SPPAS integration of the IPUs detection.
Module sppas.src.annotations
Class sppasSearchIPUs
Description
Constructor
Create a new sppasSearchIPUs instance.
Parameters
- log: (sppasLog) Human-readable logs.
View Source
def __init__(self, log=None):
    """Create a new sppasSearchIPUs instance.

    The parent constructor is called with 'searchipus.json' (presumably
    the name of the annotation configuration file -- to be confirmed
    against the base class) and the given log.

    :param log: (sppasLog) Human-readable logs.

    """
    config_name = 'searchipus.json'
    super(sppasSearchIPUs, self).__init__(config_name, log)
Public functions
fix_options
Fix all options.
Available options are:
- threshold: volume threshold to decide whether a window is silence or not
- win_length: length of the window used for the estimation of volume values
- min_sil: minimum duration of a silence
- min_ipu: minimum duration of an ipu
- shift_start: start boundary shift value.
- shift_end: end boundary shift value.
Parameters
- options: (sppasOption)
View Source
def fix_options(self, options):
    """Fix all options.

    Available options are:

        - threshold: volume threshold to decide whether a window is silence
        - win_length: length of the window used to estimate volume values
        - min_sil: minimum duration of a silence
        - min_ipu: minimum duration of an ipu
        - shift_start: start boundary shift value.
        - shift_end: end boundary shift value.

    Any option whose key contains 'pattern' is stored as-is in the options.

    :param options: (sppasOption) iterable of options, each one giving
        access to its key and value with get_key() and get_value()
    :raises: AnnotationOptionError if a key is none of the above

    """
    # Options handled by the dedicated setter named 'set_<key>'.
    with_setter = (
        'threshold', 'win_length', 'min_sil',
        'min_ipu', 'shift_start', 'shift_end',
    )

    for opt in options:
        key = opt.get_key()

        if key in with_setter:
            getattr(self, 'set_' + key)(opt.get_value())
            continue

        if 'pattern' in key:
            self._options[key] = opt.get_value()
            continue

        raise AnnotationOptionError(key)
get_threshold
View Source
def get_threshold(self):
    """Return the value stored under the 'threshold' option."""
    options = self._options
    return options['threshold']
get_win_length
View Source
def get_win_length(self):
    """Return the value stored under the 'win_length' option."""
    options = self._options
    return options['win_length']
get_min_sil
View Source
def get_min_sil(self):
    """Return the value stored under the 'min_sil' option."""
    options = self._options
    return options['min_sil']
get_min_ipu
View Source
def get_min_ipu(self):
    """Return the value stored under the 'min_ipu' option."""
    options = self._options
    return options['min_ipu']
get_shift_start
View Source
def get_shift_start(self):
    """Return the value stored under the 'shift_start' option."""
    options = self._options
    return options['shift_start']
get_shift_end
View Source
def get_shift_end(self):
    """Return the value stored under the 'shift_end' option."""
    options = self._options
    return options['shift_end']
set_threshold
Fix the threshold volume.
Parameters
- value: (int) RMS value used as volume threshold
View Source
def set_threshold(self, value):
    """Fix the threshold volume.

    The value is stored under the 'threshold' option without validation.

    :param value: (int) RMS value used as volume threshold

    """
    options = self._options
    options['threshold'] = value
set_win_length
Set a new length of window for the estimation of volume values.
TAKE CARE: it cancels any previous estimation of volume and silence search.
Parameters
- value: (float) generally between 0.01 and 0.04 seconds.
View Source
def set_win_length(self, value):
    """Set a new length of window for the estimation of volume values.

    The value is stored under the 'win_length' option without validation.

    NOTE(review): the previous documentation warned that changing this
    value cancels any previous estimation of volume and silence search.
    This method itself only stores the value; confirm where that reset
    actually happens.

    :param value: (float) generally between 0.01 and 0.04 seconds.

    """
    options = self._options
    options['win_length'] = value
set_min_sil
Fix the default minimum duration of a silence.
Parameters
- value: (float) Duration in seconds.
View Source
def set_min_sil(self, value):
    """Fix the default minimum duration of a silence.

    The value is stored under the 'min_sil' option without validation.

    :param value: (float) Duration in seconds.

    """
    options = self._options
    options['min_sil'] = value
set_min_ipu
Fix the default minimum duration of an IPU.
Parameters
- value: (float) Duration in seconds.
View Source
def set_min_ipu(self, value):
    """Fix the default minimum duration of an IPU.

    The value is stored under the 'min_ipu' option without validation.

    :param value: (float) Duration in seconds.

    """
    options = self._options
    options['min_ipu'] = value
set_shift_start
Fix the start boundary shift value.
Parameters
- value: (float) Duration in seconds.
View Source
def set_shift_start(self, value):
    """Fix the start boundary shift value.

    The value is stored under the 'shift_start' option without validation.

    :param value: (float) Duration in seconds.

    """
    options = self._options
    options['shift_start'] = value
set_shift_end
Fix the end boundary shift value.
Parameters
- value: (float) Duration in seconds.
View Source
def set_shift_end(self, value):
    """Fix the end boundary shift value.

    The value is stored under the 'shift_end' option without validation.

    :param value: (float) Duration in seconds.

    """
    options = self._options
    options['shift_end'] = value
tracks_to_tier
Create a sppasTier object from tracks.
Parameters
- tracks: (List of tuple) with (from, to) values in seconds
- end_time: (float) End-time of the tier
- vagueness: (float) vagueness used for silence search
View Source
@staticmethod
def tracks_to_tier(tracks, end_time, vagueness):
    """Create a sppasTier object from tracks.

    The tier is named 'IPUs'. Each track becomes an interval labelled
    'ipu_<n>', numbered from 1 in the order of the tracks. Any gap before
    a track, and the gap between the last track and end_time, becomes an
    interval labelled with SIL_ORTHO. The number of tracks is stored in
    the 'number_of_ipus' metadata of the tier.

    :param tracks: (List of tuple) with (from, to) values in seconds
    :param end_time: (float) End-time of the tier
    :param vagueness: (float) vagueness used for silence search
    :returns: (sppasTier)
    :raises: IOError if tracks is empty

    """
    if len(tracks) == 0:
        raise IOError('No IPUs to write.\n')

    tier = sppasTier('IPUs')
    tier.set_meta('number_of_ipus', str(len(tracks)))

    previous_end = 0.0
    for number, (from_time, to_time) in enumerate(tracks, start=1):
        # A track touching the very beginning or the very end of the
        # tier gets a null radius; otherwise half of the vagueness.
        touches_edge = from_time == 0.0 or to_time == end_time
        radius = 0.0 if touches_edge else vagueness / 2.0

        # Fill the hole between the previous track and this one.
        if previous_end < from_time:
            silence = sppasInterval(
                sppasPoint(previous_end, radius),
                sppasPoint(from_time, radius))
            tier.create_annotation(
                sppasLocation(silence),
                sppasLabel(sppasTag(SIL_ORTHO)))

        ipu = sppasInterval(
            sppasPoint(from_time, radius),
            sppasPoint(to_time, radius))
        tier.create_annotation(
            sppasLocation(ipu),
            sppasLabel(sppasTag('ipu_%d' % number)))

        previous_end = to_time

    # Trailing silence, from the end of the last track to end_time.
    begin = sppasPoint(previous_end, vagueness / 2.0)
    if begin < end_time:
        trailing = sppasInterval(begin, sppasPoint(end_time))
        tier.create_annotation(
            sppasLocation(trailing),
            sppasLabel(sppasTag(SIL_ORTHO)))

    return tier
convert
Search for IPUs in the given channel.
Parameters
- channel: (Channel) Input channel
Returns
- (sppasTier)
View Source
def convert(self, channel):
    """Search for IPUs in the given channel.

    A SearchForIPUs is created on the channel and configured with the
    current options; its tracks are then turned into a tier and the
    search parameters are stored in the tier metadata.

    :param channel: (Channel) Input channel
    :return: (sppasTier)

    """
    opts = self._options

    # Configure the search with the options of this annotation.
    searcher = SearchForIPUs(channel=channel)
    searcher.set_vol_threshold(opts['threshold'])
    searcher.set_win_length(opts['win_length'])
    searcher.set_min_sil(opts['min_sil'])
    searcher.set_min_ipu(opts['min_ipu'])
    searcher.set_shift_start(opts['shift_start'])
    searcher.set_shift_end(opts['shift_end'])

    # Tracks are requested in the time domain, then turned into a tier.
    tracks = searcher.get_tracks(time_domain=True)
    duration = channel.get_duration()
    vagueness = searcher.get_vagueness()
    tier = self.tracks_to_tier(tracks, duration, vagueness)

    self._set_meta(searcher, tier)
    return tier
run
Run the automatic annotation process on an input.
Parameters
- input_files: (list of str) Audio
- output: (str) the output file name
Returns
- (sppasTranscription)
View Source
def run(self, input_files, output=None):
    """Run the automatic annotation process on an input.

    Only the first entry of input_files is used. It must be an audio file
    with exactly one channel.

    :param input_files: (list of str) Audio
    :param output: (str) the output file name
    :returns: (sppasTranscription) if output is None; otherwise a list
        with the name of the file that was written
    :raises: IOError if the audio does not have exactly one channel

    """
    audio_path = input_files[0]

    # Open the audio and make sure it has a single channel.
    audio_speech = audioopy.aio.open(audio_path)
    framerate = audio_speech.get_framerate()
    n = audio_speech.get_nchannels()
    if n != 1:
        raise IOError('An audio file with only one channel is expected. Got {:d} channels.'.format(n))

    # Search for the IPUs of the channel.
    idx = audio_speech.extract_channel(0)
    tier = self.convert(audio_speech.get_channel(idx))

    # Wrap the tier into a transcription with its metadata.
    trs_output = sppasTranscription(self.name)
    trs_output.set_meta('media_sample_rate', str(framerate))
    trs_output.set_meta('annotation_result_of', audio_path)
    trs_output.append(tier)

    # The mime type is built from the lowered extension without its dot.
    extension = os.path.splitext(audio_path)[1]
    extm = extension.lower()[1:]
    tier.set_media(sppasMedia(os.path.abspath(audio_path), mime_type='audio/' + extm))

    if output is None:
        return trs_output

    output_file = self.fix_out_file_ext(output)
    sppasTrsRW(output_file).write(trs_output)
    return [output_file]
run_for_batch_processing
Perform the annotation on a file.
This method is called by 'batch_processing'. It fixes the name of the output file. If the output file already exists, the annotation is cancelled (the file won't be overwritten). If not, it calls the run method.
Parameters
- input_files: (list of str) the inputs to perform a run
Returns
- output file name or None
View Source
def run_for_batch_processing(self, input_files):
    """Perform the annotation on a file.

    This method is called by 'batch_processing'. It fixes the name of the
    output file, then:

        - if a file with the expected output name already exists, nothing
          is done (the file won't be overwritten);
        - if a file with the same root name but another annotation
          extension exists, it is read and written again under the
          expected output name instead of running the annotation;
        - otherwise, or if that conversion fails, the run method is called.

    An error raised by the run method is reported in the log file and is
    not propagated.

    :param input_files: (list of str) the inputs to perform a run
    :returns: (list of str) the output file name, or an empty list if
        nothing was created

    """
    root_pattern = self.get_out_name(input_files[0])

    # Look for an already existing result, whatever its extension
    # ('.txt' is not considered).
    ext = [e for e in sppasTrsRW.annot_extensions() if e not in ('.txt',)]
    exist_out_name = sppasBaseAnnotation._get_filename(root_pattern, ext)

    if exist_out_name is not None:
        out_name = self.fix_out_file_ext(root_pattern)
        if exist_out_name.lower() == out_name.lower():
            # The expected output is already there: nothing to do.
            return list()

        try:
            # Convert the existing file to the expected output format.
            parser = sppasTrsRW(exist_out_name)
            t = parser.read()
            parser.set_filename(out_name)
            parser.write(t)
            self.logfile.print_message(_info(1302).format(out_name), indent=2, status=annots.warning)
            return [out_name]
        except Exception:
            # Best-effort conversion: on failure, fall back to a regular
            # run below. Only Exception is caught, so that
            # KeyboardInterrupt and SystemExit are no longer swallowed.
            pass

    try:
        new_files = self.run(input_files, root_pattern)
    except Exception as e:
        new_files = list()
        self.logfile.print_message('{:s}\n'.format(str(e)), indent=2, status=-1)

    return new_files
get_input_extensions
Extensions that the annotation expects for its input filename.
View Source
@staticmethod
def get_input_extensions():
    """Extensions that the annotation expects for its input filename.

    :returns: (list) a single-item list, whose item is the value given by
        sppasFiles.get_informat_extensions('AUDIO')

    """
    audio_extensions = sppasFiles.get_informat_extensions('AUDIO')
    return [audio_extensions]
Private functions
_set_meta
Set meta values to the tier.
Parameters
- searcher
- tier
View Source
def _set_meta(self, searcher, tier):
    """Set meta values to the tier.

    The search parameters and the RMS statistics of the searcher are
    stored, as strings, in the metadata of the tier. A short summary is
    then printed in the log file.

    :param searcher: (SearchForIPUs) the searcher used to find the IPUs
    :param tier: (sppasTier) the tier to annotate with metadata; its
        'number_of_ipus' metadata is expected to be already set

    """
    # Metadata key -> getter of the searcher providing its value.
    parameters = (
        ('required_threshold_volume', searcher.get_vol_threshold),
        ('estimated_threshold_volume', searcher.get_effective_threshold),
        ('minimum_silence_duration', searcher.get_min_sil_dur),
        ('minimum_ipus_duration', searcher.get_min_ipu_dur),
        ('shift_ipus_start', searcher.get_shift_start),
        ('shift_ipus_end', searcher.get_shift_end),
    )
    for meta_key, getter in parameters:
        tier.set_meta(meta_key, str(getter()))

    # The statistics are paired with these keys by position.
    # NOTE(review): assumes get_rms_stats() yields them in this order -- confirm.
    rms_keys = ('rms_min', 'rms_max', 'rms_mean', 'rms_median', 'rms_coefvar')
    for rms_key, rms_value in zip(rms_keys, searcher.get_rms_stats()):
        tier.set_meta(str(rms_key), str(rms_value))

    self.logfile.print_message('Information: ', indent=1)
    # The estimated threshold is reported only when the required one is 0.
    if searcher.get_vol_threshold() == 0:
        estimated = searcher.get_effective_threshold()
        self.logfile.print_message('Automatically estimated threshold volume value: {:d}'.format(estimated), indent=2)
    nb_ipus = tier.get_meta('number_of_ipus')
    self.logfile.print_message('Number of IPUs found: {:s}'.format(nb_ipus), indent=2)