SPPAS integration of the fill in IPUs automatic annotation.
Module sppas.src.annotations
Class sppasFillIPUs
Description
Constructor
Create a new sppasFillIPUs instance.
Log is used for a better communication of the annotation process and its results. If None, logs are redirected to the default logging system.
Parameters
- log: (sppasLog) Human-readable logs.
View Source
def __init__(self, log=None):
"""Create a new sppasFillIPUs instance.
Log is used for a better communication of the annotation process and its
results. If None, logs are redirected to the default logging system.
:param log: (sppasLog) Human-readable logs.
"""
super(sppasFillIPUs, self).__init__('fillipus.json', log)
self.__audio_filename = None
Public functions
fix_options
Fix all options.
Available options are:
- threshold: volume threshold to decide a window is silence or not
- win_length: length of window for a estimation or volume values
- min_sil: minimum duration of a silence
- min_ipu: minimum duration of an ipu
- shift_start: start boundary shift value.
- shift_end: end boundary shift value.
Parameters
- options: (sppasOption)
View Source
def fix_options(self, options):
"""Fix all options.
Available options are:
- threshold: volume threshold to decide a window is silence or not
- win_length: length of window for a estimation or volume values
- min_sil: minimum duration of a silence
- min_ipu: minimum duration of an ipu
- shift_start: start boundary shift value.
- shift_end: end boundary shift value.
:param options: (sppasOption)
"""
for opt in options:
key = opt.get_key()
if 'min_sil' == key:
self.set_min_sil(opt.get_value())
elif 'min_ipu' == key:
self.set_min_ipu(opt.get_value())
elif 'shift_start' == key:
self.set_shift_start(opt.get_value())
elif 'shift_end' == key:
self.set_shift_end(opt.get_value())
elif 'pattern' in key:
self._options[key] = opt.get_value()
else:
raise AnnotationOptionError(key)
get_min_sil
View Source
def get_min_sil(self):
return self._options['min_sil']
get_min_ipu
View Source
def get_min_ipu(self):
return self._options['min_ipu']
get_shift_start
View Source
def get_shift_start(self):
return self._options['shift_start']
get_shift_end
View Source
def get_shift_end(self):
return self._options['shift_end']
set_min_sil
Fix the initial minimum duration of a silence.
Parameters
- value: (float) Duration in seconds.
View Source
def set_min_sil(self, value):
"""Fix the initial minimum duration of a silence.
:param value: (float) Duration in seconds.
"""
self._options['min_sil'] = value
set_min_ipu
Fix the initial minimum duration of an IPU.
Parameters
- value: (float) Duration in seconds.
View Source
def set_min_ipu(self, value):
"""Fix the initial minimum duration of an IPU.
:param value: (float) Duration in seconds.
"""
self._options['min_ipu'] = value
set_shift_start
Fix the start boundary shift value.
Parameters
- value: (float) Duration in seconds.
View Source
def set_shift_start(self, value):
"""Fix the start boundary shift value.
:param value: (float) Duration in seconds.
"""
self._options['shift_start'] = value
set_shift_end
Fix the end boundary shift value.
Parameters
- value: (float) Duration in seconds.
View Source
def set_shift_end(self, value):
"""Fix the end boundary shift value.
:param value: (float) Duration in seconds.
"""
self._options['shift_end'] = value
convert
Return a tier with transcription aligned to the audio.
Parameters
- channel: (Channel) Input audio channel
- text_tier: (sppasTier) Input transcription text in a PointTier
Returns
- (sppasTier | None) Tier with transcription aligned to the audio
View Source
def convert(self, channel, text_tier):
"""Return a tier with transcription aligned to the audio.
:param channel: (Channel) Input audio channel
:param text_tier: (sppasTier) Input transcription text in a PointTier
:return: (sppasTier | None) Tier with transcription aligned to the audio
"""
units = list()
for a in text_tier:
units.append(serialize_labels(a.get_labels()))
ipus = [unit for unit in units if unit != SIL_ORTHO]
filler = FillIPUs(channel, units)
filler.set_min_ipu(self._options['min_ipu'])
filler.set_min_sil(self._options['min_sil'])
filler.set_shift_start(self._options['shift_start'])
filler.set_shift_end(self._options['shift_end'])
n = filler.fix_threshold_durations()
if n != len(ipus):
return None
tracks = filler.get_tracks(time_domain=True)
tier = sppasSearchIPUs.tracks_to_tier(tracks, channel.get_duration(), filler.get_vagueness())
tier.set_name('Transcription')
self._set_meta(filler, tier)
i = 0
for a in tier:
if a.get_best_tag().is_silence() is False:
a.set_labels([sppasLabel(sppasTag(ipus[i]))])
i += 1
return tier
get_inputs
Return the channel and the tier with ipus.
Parameters
- input_files: (list)
Raises
NoTierInputError
Returns
- (Channel, sppasTier)
View Source
def get_inputs(self, input_files):
"""Return the channel and the tier with ipus.
:param input_files: (list)
:raise: NoTierInputError
:return: (Channel, sppasTier)
"""
ext = self.get_input_extensions()
audio_ext = ext[0]
annot_ext = ext[1]
tier = None
channel = None
audio_filename = ''
for filename in input_files:
fn, fe = os.path.splitext(filename)
if channel is None and fe in audio_ext:
audio_speech = audioopy.aio.open(filename)
n = audio_speech.get_nchannels()
if n != 1:
audio_speech.close()
raise AudioChannelError(n)
idx = audio_speech.extract_channel()
channel = audio_speech.get_channel(idx)
audio_filename = filename
audio_speech.close()
elif tier is None and fe in annot_ext:
parser = sppasTrsRW(filename)
trs_input = parser.read()
tier = sppasFindTier.transcription(trs_input)
if 'raw' not in tier.get_name().lower():
tier = None
if tier is None:
logging.error('A tier with the raw transcription was not found.')
raise NoTierInputError
if tier.is_point() is False:
logging.error('The tier with the raw transcription should be of type: Point.')
raise AnnDataTypeError(tier.get_name(), 'PointTier')
if tier.is_empty() is True:
raise EmptyInputError(tier.get_name())
if channel is None:
logging.error('No audio file found or invalid one. An audio file with only one channel was expected.')
raise NoChannelInputError
extm = os.path.splitext(audio_filename)[1].lower()[1:]
media = sppasMedia(os.path.abspath(audio_filename), mime_type='audio/' + extm)
tier.set_media(media)
return (channel, tier)
run
Run the automatic annotation process on an input.
input_filename is a tuple (audio, raw transcription)
Parameters
- input_files: (list of str) (audio, ortho)
- output: (str) the output file name
Returns
- (sppasTranscription)
View Source
def run(self, input_files, output=None):
"""Run the automatic annotation process on an input.
input_filename is a tuple (audio, raw transcription)
:param input_files: (list of str) (audio, ortho)
:param output: (str) the output file name
:returns: (sppasTranscription)
"""
input_channel, input_tier = self.get_inputs(input_files)
framerate = input_channel.get_framerate()
tier = self.convert(input_channel, input_tier)
if tier is None:
self.logfile.print_message(_info(1296), indent=2, status=-1)
return []
trs_output = sppasTranscription(self.name)
trs_output.set_meta('media_sample_rate', str(framerate))
trs_output.set_meta('annotation_result_of', input_files[0])
trs_output.append(tier)
tier.set_media(input_tier.get_media())
if output is not None:
output_file = self.fix_out_file_ext(output)
parser = sppasTrsRW(output_file)
parser.write(trs_output)
return [output_file]
return trs_output
run_for_batch_processing
Perform the annotation on a file.
This method is called by 'batch_processing'. It fixes the name of the output file, and call the run method.
Override to NOT ANNOTATE if an annotation is already existing.
Parameters
- input_files: (list of str) the required inputs for a run
Returns
- output file name or None
View Source
def run_for_batch_processing(self, input_files):
"""Perform the annotation on a file.
This method is called by 'batch_processing'. It fixes the name of the
output file, and call the run method.
Override to NOT ANNOTATE if an annotation is already existing.
:param input_files: (list of str) the required inputs for a run
:returns: output file name or None
"""
root_pattern = self.get_out_name(input_files[0])
ext = []
for e in sppasTrsRW.annot_extensions():
if e not in ('.txt',):
ext.append(e)
exists_out_name = sppasBaseAnnotation._get_filename(root_pattern, ext)
new_files = list()
if exists_out_name is not None:
out_name = self.fix_out_file_ext(root_pattern)
if exists_out_name.lower() == out_name.lower():
return list()
else:
try:
parser = sppasTrsRW(exists_out_name)
t = parser.read()
parser.set_filename(out_name)
parser.write(t)
self.logfile.print_message(_info(1302).format(out_name), indent=2, status=annots.info)
return [out_name]
except:
pass
else:
try:
new_files = self.run(input_files, root_pattern)
except Exception as e:
self.logfile.print_message('{:s}\n'.format(str(e)), indent=1, status=-1)
return new_files
get_output_pattern
Pattern this annotation uses in an output filename.
View Source
def get_output_pattern(self):
"""Pattern this annotation uses in an output filename."""
return self._options.get('outputpattern', '')
get_input_patterns
Pattern this annotation expects for its input filename.
View Source
def get_input_patterns(self):
"""Pattern this annotation expects for its input filename."""
return [self._options.get('inputpattern1', ''), self._options.get('inputpattern2', '')]
get_input_extensions
Extensions that the annotation expects for its input filename.
View Source
@staticmethod
def get_input_extensions():
"""Extensions that the annotation expects for its input filename."""
return [sppasFiles.get_informat_extensions('AUDIO'), ['.txt']]
Private functions
_set_meta
Set meta values to the tier.
Parameters
- filler
- tier
View Source
def _set_meta(self, filler, tier):
"""Set meta values to the tier."""
tier.set_meta('threshold_volume', str(filler.get_vol_threshold()))
tier.set_meta('minimum_silence_duration', str(filler.get_min_sil_dur()))
tier.set_meta('minimum_ipus_duration', str(filler.get_min_ipu_dur()))
tier.set_meta('shift_ipus_start', str(filler.get_shift_start()))
tier.set_meta('shift_ipus_end', str(filler.get_shift_end()))
self.logfile.print_message(_info(1058), indent=1)
m1 = _info(1290).format(int(filler.get_vol_threshold()))
m2 = _info(1292).format(filler.get_min_sil_dur())
m3 = _info(1294).format(filler.get_min_ipu_dur())
for m in (m1, m2, m3):
self.logfile.print_message(m, indent=2)