SPPAS 4.22

https://sppas.org/

Module sppas.src.annotations

Class sppasAlign

Description

SPPAS integration of the Alignment automatic annotation.

  • author: Brigitte Bigi
  • contact: contact@sppas.org

This class can produce from 1 to 3 tiers, with names:

  • PhonAlign
  • TokensAlign (if tokens are given in the input)
  • PhnTokAlign - option (if tokens are given in the input)

How to use sppasAlign?

Example
 >>> a = sppasAlign()
 >>> a.set_aligner('julius')
 >>> a.load_resources(model_dirname)
 >>> a.run([audio, phones, tokens], output)

Constructor

Create a new sppasAlign instance.

Log is used for a better communication of the annotation process and its results. If None, logs are redirected to the default logging system.

Parameters
  • log: (sppasLog) Human-readable logs.
View Source
def __init__(self, log=None):
    """Initialize the alignment annotation with its default components.

    The annotation is configured from 'alignment.json'. The segmenter is
    created without any acoustic model and with the 'basic' aligner, the
    tracks reader/writer is created with an empty phoneme mapping, and the
    language is set to 'und'.

    :param log: (sppasLog) Human-readable logs, forwarded to the base
    annotation. The class documentation states that None redirects the
    logs to the default logging system.

    """
    sppasBaseAnnotation.__init__(self, 'alignment.json', log)

    # Public mapping; it is a distinct instance from the one handed to
    # the tracks reader/writer below.
    self.mapping = sppasMapping()

    # No acoustic model is known yet: start with the basic aligner.
    self._segmenter = TrackSegmenter(aligner_name='basic', model=None)

    empty_mapping = sppasMapping()
    self._tracksrw = TracksReaderWriter(empty_mapping)

    # Language code used when none was given.
    self.__lang = 'und'

Public functions

load_resources

Fix the acoustic model directory.

Load the phoneme mapping of the model, if any, create the tracks reader/writer with it, and set the model to the segmenter.

Parameters
  • model: (str) Directory of the acoustic model of the language of the text
  • model_L1: (str) Directory of the acoustic model of the mother language of the speaker
  • lang: (str) Language code
View Source
def load_resources(self, model, model_L1=None, lang='und', **kwargs):
    """Fix the acoustic model directory and load its phoneme mapping.

    If model_L1 is given, both models are mixed into the 'models/models-mix'
    directory of the resources and the mixed model is used instead of the
    given one. If the mix fails, a warning is reported in the log file and
    the given model is used alone.

    The 'monophones.repl' file of the model, if it exists, is loaded as
    the phoneme mapping of a new tracks reader/writer. If it does not
    exist or can not be loaded, an empty mapping is used. Finally, the
    model is set to the segmenter.

    :param model: (str) Directory of the acoustic model of the language
    of the text
    :param model_L1: (str) Directory of the acoustic model of
    the mother language of the speaker
    :param lang: (str) Language code
    :param kwargs: Not used by this method.

    """
    self.__lang = lang

    if model_L1 is not None:
        try:
            model_mixer = sppasModelMixer()
            model_mixer.read(model, model_L1)
            output_dir = os.path.join(paths.resources, 'models', 'models-mix')
            model_mixer.mix(output_dir, gamma=0.6)
            model = output_dir
        except Exception as e:
            # Best effort: the alignment continues with the model of the
            # language of the text only.
            self.logfile.print_message(MSG_MODEL_L1_FAILED.format(str(e)), indent=2, status=annots.warning)

    mapping_filename = os.path.join(model, 'monophones.repl')
    if os.path.isfile(mapping_filename):
        try:
            mapping = sppasMapping(mapping_filename)
        except Exception as e:
            # The file exists but is unusable: fall back on an empty
            # mapping. Only Exception is caught so that KeyboardInterrupt
            # and SystemExit are not swallowed.
            mapping = sppasMapping()
            logging.warning('The mapping file {:s} was found but it can not be loaded: {:s}'.format(mapping_filename, str(e)))
    else:
        # The mapping file is optional.
        mapping = sppasMapping()

    self._tracksrw = TracksReaderWriter(mapping)
    self._segmenter.set_model(model)

fix_options

Fix all options.

Available options are:

  • clean
  • basic
  • aligner
Parameters
  • options: (sppasOption)
View Source
def fix_options(self, options):
    """Apply each of the given options to this annotation.

    Recognized keys are:

        - clean
        - basic
        - aligner
        - any key containing 'pattern': its value is stored as-is

    :param options: (sppasOption) Iterable of options; each one gives its
    key with get_key() and its value with get_value().
    :raises: AnnotationOptionError if a key is not recognized.

    """
    # Keys handled by the setter of the same name: set_<key>().
    keys_with_setter = ('clean', 'basic', 'aligner')

    for option in options:
        name = option.get_key()
        if name in keys_with_setter:
            setter = getattr(self, 'set_' + name)
            setter(option.get_value())
        elif 'pattern' in name:
            self._options[name] = option.get_value()
        else:
            raise AnnotationOptionError(name)

set_clean

Fix the clean option.

Parameters
  • clean: (bool) If clean is set to True then temporary files will be removed.
View Source
def set_clean(self, clean):
    """Store the value of the 'clean' option.

    :param clean: (bool) True to remove the temporary files, False to
    keep them.

    """
    options = self._options
    options['clean'] = clean

set_aligner

Fix the name of the aligner.

Parameters
  • aligner_name: (str) Case-insensitive name of the aligner.
View Source
def set_aligner(self, aligner_name):
    """Store the name of the aligner in the 'aligner' option.

    The name is stored as given, without any validation.

    :param aligner_name: (str) Case-insensitive name of the aligner.

    """
    options = self._options
    options['aligner'] = aligner_name

set_basic

Fix the basic option.

Parameters
  • basic: (bool) If basic is set to True, a basic segmentation will be performed if the main aligner fails.
View Source
def set_basic(self, basic):
    """Store the value of the 'basic' option.

    :param basic: (bool) True to perform a basic segmentation of a track
    when the main aligner fails on it.

    """
    options = self._options
    options['basic'] = basic

convert

Perform speech segmentation of data.

Parameters
  • phon_tier: (Tier) phonetization.
  • tok_tier: (Tier) tokenization, or None.
  • tok_faked_tier: (Tier) rescue tokenization, or None.
  • input_audio: (str) Audio file name.
  • workdir: (str) The working directory
Returns
  • tier_phn, tier_tok, tier_pron
View Source
def convert(self, phon_tier, tok_tier, tok_faked_tier, input_audio, workdir):
    """Perform speech segmentation of data.

    The input tiers are split into tracks stored in the working directory,
    each track is segmented, then the aligned tracks are read back and
    merged into tiers to which the alignment metadata are added.

    :param phon_tier: (Tier) phonetization.
    :param tok_tier: (Tier) tokenization, or None.
    :param tok_faked_tier: (Tier) rescue tokenization, or None.
    :param input_audio: (str) Audio file name.
    :param workdir: (str) The working directory. It must already exist.
    :raises: NoDirectoryError if workdir does not exist.
    :returns: (tier_phn, tier_tok, tier_pron)

    """
    # Keep the option in sync with the aligner name the segmenter
    # actually reports after the request.
    self._segmenter.set_aligner(self._options['aligner'])
    self._options['aligner'] = self._segmenter.get_aligner_name()

    # The working directory is never created here: a missing one is an error.
    if os.path.exists(workdir) is False:
        raise NoDirectoryError(workdir)

    # Split the tiers into tracks.
    self.logfile.print_message(MSG_ACTION_SPLIT_INTERVALS, indent=1)
    if phon_tier.is_point() is True:
        self._tracksrw.set_radius(0.0)
    else:
        self._tracksrw.set_radius(0.005)
    errmsg = self._tracksrw.split_into_tracks(input_audio, phon_tier, tok_tier, tok_faked_tier, workdir)
    if len(errmsg) > 0:
        self.logfile.print_message(errmsg, indent=2, status=annots.warning)

    # Time-align each track.
    self._segment_tracks(workdir)

    # Merge the aligned tracks into tiers.
    self.logfile.print_message(MSG_ACTION_MERGE_INTERVALS, indent=1)
    tier_phn, tier_tok, tier_pron = self._tracksrw.read_aligned_tracks(workdir)
    self.__add_meta_in_tier(tier_phn)
    self.__add_meta_in_tier(tier_tok)
    self.__add_meta_in_tier(tier_pron)
    return (tier_phn, tier_tok, tier_pron)

fix_workingdir

Fix the working directory to store temporarily the data.

Parameters
  • inputaudio: (str) Audio file name
View Source
@staticmethod
def fix_workingdir(inputaudio=None):
    """Create a new working directory to store temporarily the data.

    A random name which does not already exist is chosen and the directory
    is created. If an audio file is given, it is copied into this
    directory under the name returned by sppasFileUtils.format() for its
    base name.

    :param inputaudio: (str) Audio file name, or None
    :returns: (str) Name of the created working directory

    """
    name_maker = sppasFileUtils()
    workdir = name_maker.set_random()
    # Draw another name as long as the proposed one already exists.
    while os.path.exists(workdir):
        workdir = name_maker.set_random()
    os.mkdir(workdir)

    if inputaudio is None:
        return workdir

    audio_basename = os.path.basename(inputaudio)
    formatted_audio_file = sppasFileUtils(audio_basename).format()
    shutil.copy(inputaudio, os.path.join(workdir, formatted_audio_file))
    return workdir

get_inputs

Return the audio file name and the tiers.

Parameters
  • input_files: (list)
Raises

NoTierInputError

Returns
  • (audio filename, sppasTier, sppasTier, sppasTier)
View Source
def get_inputs(self, input_files):
    """Return the audio file name and the tiers found in the input files.

    The first file with an audio extension is the audio file. Each file
    with an annotation extension is read and searched for the tiers which
    were not already found in a previous file. A warning is reported in
    the log file if no tokenization tier was found.

    :param input_files: (list) File names; None entries are ignored.
    :raise: NoTierInputError if no phonetization tier was found
    :return: (audio filename, phonetization tier, std tokenization tier,
    tokenization tier); any of them except the phonetization can be None

    """
    extensions = self.get_input_extensions()
    audio_extensions = extensions[0]

    audio_filename = None
    tier_phon = None
    tier_tok_std = None
    tier_tok_faked = None

    for filename in input_files:
        if filename is None:
            continue
        file_ext = os.path.splitext(filename)[1]

        if audio_filename is None and file_ext in audio_extensions:
            audio_filename = filename

        if file_ext not in extensions[1]:
            continue

        # An annotated file: pick up the tiers still missing.
        trs_input = sppasTrsRW(filename).read()
        if tier_phon is None:
            tier_phon = sppasFindTier.phonetization(trs_input)
        if tier_tok_std is None:
            tier_tok_std = sppasFindTier.tokenization(trs_input, 'std')
        if tier_tok_faked is None:
            tier_tok_faked = sppasFindTier.tokenization(trs_input)

    no_tokens = tier_tok_std is None and tier_tok_faked is None
    if no_tokens:
        self.logfile.print_message(MSG_TOKENS_DISABLED, indent=2, status=annots.warning)

    if tier_phon is None:
        logging.error('A tier with a phonetization is required but was not found')
        raise NoTierInputError

    return (audio_filename, tier_phon, tier_tok_std, tier_tok_faked)

run

Run the automatic annotation process on an input.

Parameters
  • input_files: (list of str) Phonemes, and optionally tokens, audio
  • output: (str) the output name
Returns
  • (sppasTranscription)
View Source
def run(self, input_files, output=None):
    """Run the automatic annotation process on an input.

    The audio file, if any, must have exactly one channel. If there is no
    audio file or if AUDIOOPY is not True, the 'aligner' option is set to
    'basic'. A working directory is created to store the tracks; if the
    'clean' option is True, it is removed before returning or raising.

    :param input_files: (list of str) Phonemes, and optionally tokens, audio
    :param output: (str) the output name
    :raises: AudioChannelError if the audio has not exactly one channel
    :returns: (sppasTranscription) if output is None, or a list with the
    name of the written file

    """
    audio_filename, phon_tier, tok_tier, tok_faked_tier = self.get_inputs(input_files)

    framerate = None
    if audio_filename is not None and AUDIOOPY is True:
        audio_speech = audioopy.aio.open(audio_filename)
        try:
            n = audio_speech.get_nchannels()
            framerate = audio_speech.get_framerate()
        finally:
            # Always release the audio, even if reading its properties failed.
            audio_speech.close()
        if n != 1:
            raise AudioChannelError(n)
    else:
        self.logfile.print_message("Audio is unavailable. Aligner is set to 'basic' and no extra option available.", indent=1, status=annots.warning)
        self._options['aligner'] = 'basic'

    workdir = sppasAlign.fix_workingdir(audio_filename)
    try:
        if self._options['clean'] is False:
            # The directory is kept: tell the user where it is.
            self.logfile.print_message(MSG_WORKDIR.format(dirname=workdir), indent=3, status=None)

        media = None
        if audio_filename is not None:
            extm = os.path.splitext(audio_filename)[1].lower()[1:]
            media = sppasMedia(audio_filename, mime_type='audio/' + extm)
            logging.info('Alignment of {:s}'.format(audio_filename))

        try:
            tier_phn, tier_tok, tier_pron = self.convert(phon_tier, tok_tier, tok_faked_tier, audio_filename, workdir)
            if media is not None:
                tier_phn.set_media(media)

            trs_output = sppasTranscription(self.name)
            trs_output.set_meta('annotation_result_of', input_files[0])
            trs_output.set_meta('aligner_name', self._segmenter.get_aligner_name())
            trs_output.set_meta('language_iso', 'iso639-3')
            trs_output.set_meta('language_name_0', 'Undetermined')
            if len(self.__lang) == 3:
                trs_output.set_meta('language_code_0', self.__lang)
                trs_output.set_meta('language_url_0', 'https://iso639-3.sil.org/code/' + self.__lang)
            else:
                trs_output.set_meta('language_code_0', 'und')
                trs_output.set_meta('language_url_0', 'https://iso639-3.sil.org/code/und')
            if framerate is not None:
                trs_output.set_meta('media_sample_rate', str(framerate))

            trs_output.append(tier_phn)
            # The media is assigned only once to the optional tiers, and
            # only if the tier exists.
            for tier in (tier_tok, tier_pron):
                if tier is not None:
                    tier.set_media(media)
                    trs_output.append(tier)
        except Exception as e:
            self.logfile.print_message(str(e))
            raise

        if output is None:
            return trs_output

        output_file = self.fix_out_file_ext(output)
        parser = sppasTrsRW(output_file)
        parser.write(trs_output)
        return [output_file]
    finally:
        # Whatever the outcome, remove the temporary data if requested.
        if self._options['clean'] is True:
            shutil.rmtree(workdir)

get_output_pattern

Pattern this annotation uses in an output filename.

View Source
def get_output_pattern(self):
    """Return the pattern this annotation uses in an output filename.

    :returns: Value of the 'outputpattern' option, or '-palign' if this
    option is not set.

    """
    default_pattern = '-palign'
    return self._options.get('outputpattern', default_pattern)

get_input_patterns

Pattern this annotation expects for its input filename.

View Source
def get_input_patterns(self):
    """Return the patterns this annotation expects for its input filenames.

    :returns: (list) Values of the options 'inputpattern1',
    'inputpattern2' and 'inputpattern3', in this order. The default of an
    unset option is respectively '', '-phon' and '-token'.

    """
    defaults = (
        ('inputpattern1', ''),
        ('inputpattern2', '-phon'),
        ('inputpattern3', '-token'),
    )
    return [self._options.get(key, fallback) for key, fallback in defaults]

get_input_extensions

Extensions that the annotation expects for its input filename.

View Source
@staticmethod
def get_input_extensions():
    """Return the extensions this annotation expects for its input files.

    :returns: (list) Three items, in this order: the 'AUDIO' extensions,
    then the 'ANNOT_ANNOT' extensions twice.

    """
    audio_extensions = sppasFiles.get_informat_extensions('AUDIO')
    first_annot_extensions = sppasFiles.get_informat_extensions('ANNOT_ANNOT')
    second_annot_extensions = sppasFiles.get_informat_extensions('ANNOT_ANNOT')
    return [audio_extensions, first_annot_extensions, second_annot_extensions]

Private functions

_segment_track_with_basic

Segmentation of a track with the basic alignment system.

Parameters
  • audio
  • phn
  • token
  • align
View Source
def _segment_track_with_basic(self, audio, phn, token, align):
    """Segment a track with the basic alignment system.

    The segmenter is switched to the 'basic' aligner only for the time of
    this segmentation: the aligner it had before is restored afterwards,
    even if the segmentation raises an exception.

    :param audio: Forwarded as-is to the segment() method of the segmenter
    :param phn: Forwarded as-is to the segment() method of the segmenter
    :param token: Forwarded as-is to the segment() method of the segmenter
    :param align: Forwarded as-is to the segment() method of the segmenter

    """
    self.logfile.print_message(MSG_BASIC, indent=2)
    aligner_id = self._segmenter.get_aligner_name()
    self._segmenter.set_aligner('basic')
    try:
        msg = self._segmenter.segment(audio, phn, token, align)
        if len(msg) > 0:
            self.logfile.print_message(msg, indent=2, status=annots.info)
    finally:
        # Never leave the segmenter stuck on 'basic' for the next tracks.
        self._segmenter.set_aligner(aligner_id)

_segment_tracks

Call the Aligner to align each unit of a directory.

Parameters
  • workdir: (str) directory to get units and put alignments.
View Source
def _segment_tracks(self, workdir):
    """Call the aligner on each track of a directory.

    Tracks are numbered from 1. If the alignment of a track fails, the
    error is reported in the logs, then the track is either segmented with
    the basic aligner if the 'basic' option is True, or segmented again
    without phonemes nor tokens.

    :param workdir: (str) directory to get units and put alignments.
    :raises: EmptyDirectoryError if no unit is found in workdir.

    """
    nb_tracks = len(self._tracksrw.get_units(workdir))
    if nb_tracks == 0:
        raise EmptyDirectoryError(workdir)

    for track_number in range(1, nb_tracks + 1):
        track_title = MSG_ALIGN_TRACK.format(number=track_number)
        logging.info(track_title)
        audio, phn, token, align = self._tracksrw.get_filenames(workdir, track_number)

        try:
            msg = self._segmenter.segment(audio, phn, token, align)
            if len(msg) > 0:
                self.logfile.print_message(track_title, indent=1)
                self.logfile.print_message(msg, indent=2, status=annots.warning)

        except Exception as e:
            # Report which track failed, with which aligner, and why.
            self.logfile.print_message(track_title, indent=1)
            failure = MSG_ALIGN_FAILED.format(name=self._segmenter.get_aligner_name())
            self.logfile.print_message(failure, indent=2, status=annots.error)
            self.logfile.print_message(str(e), indent=3, status=annots.error)
            logging.error(traceback.format_exc())

            if self._options['basic'] is not True:
                # No rescue requested: segment without phonemes nor tokens.
                self._segmenter.segment(audio, None, None, align)
                continue

            self.logfile.print_message('Segment is rescued by assigning the same duration to each phoneme.', indent=2, status=annots.error)
            self._segment_track_with_basic(audio, phn, token, align)

Protected functions

__add_meta_in_tier

Add metadata into an aligned tier.

Parameters
  • tier
View Source
def __add_meta_in_tier(self, tier):
    """Add the alignment metadata into a tier.

    Three metadata are set: 'linguistic_resource_model' with the model
    directory name of the segmenter, 'aligner_name' with the aligner name
    of the segmenter, and 'language' with '0'.

    :param tier: Tier to be updated with set_meta().

    """
    segmenter = self._segmenter

    model_dirname = segmenter.get_model_dirname()
    tier.set_meta('linguistic_resource_model', model_dirname)

    aligner_name = segmenter.get_aligner_name()
    tier.set_meta('aligner_name', aligner_name)

    tier.set_meta('language', '0')