Source code for annotations.SearchIPUs.sppassearchipus

# -*- coding: UTF-8 -*-
"""
:filename: sppas.src.annotations.SearchIPUs.sppassearchipus.py
:author:   Brigitte Bigi
:contact:  develop@sppas.org
:summary:  SPPAS integration of Search for IPUs automatic annotation

.. _This file is part of SPPAS: <http://www.sppas.org/>
..
    -------------------------------------------------------------------------

     ___   __    __    __    ___
    /     |  \  |  \  |  \  /              the automatic
    \__   |__/  |__/  |___| \__             annotation and
       \  |     |     |   |    \             analysis
    ___/  |     |     |   | ___/              of speech

    Copyright (C) 2011-2021  Brigitte Bigi
    Laboratoire Parole et Langage, Aix-en-Provence, France

    Use of this software is governed by the GNU Public License, version 3.

    SPPAS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    SPPAS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with SPPAS. If not, see <http://www.gnu.org/licenses/>.

    This banner notice must not be removed.

    -------------------------------------------------------------------------

"""

import os

from sppas.src.config import symbols
from sppas.src.config import annots
from sppas.src.config import info
from sppas.src.config import u
import sppas.src.audiodata.aio
from sppas.src.anndata import sppasTranscription
from sppas.src.anndata import sppasTier
from sppas.src.anndata import sppasMedia
from sppas.src.anndata import sppasLocation
from sppas.src.anndata import sppasInterval
from sppas.src.anndata import sppasPoint
from sppas.src.anndata import sppasLabel
from sppas.src.anndata import sppasTag
from sppas.src.anndata import sppasTrsRW

from ..autils import SppasFiles
from ..annotationsexc import AnnotationOptionError
from ..baseannot import sppasBaseAnnotation

from .searchipus import SearchIPUs

# ---------------------------------------------------------------------------

SIL_ORTHO = list(
    symbols.ortho.keys()
    )[list(symbols.ortho.values()).index("silence")]

# ---------------------------------------------------------------------------


def _info(msg_id):
    return u(info(msg_id, "annotations"))

# ---------------------------------------------------------------------------


[docs]class sppasSearchIPUs(sppasBaseAnnotation): """SPPAS integration of the IPUs detection. """
[docs] def __init__(self, log=None): """Create a new sppasSearchIPUs instance. :param log: (sppasLog) Human-readable logs. """ super(sppasSearchIPUs, self).__init__("searchipus.json", log)
# ----------------------------------------------------------------------- # Methods to fix options # -----------------------------------------------------------------------
[docs] def fix_options(self, options): """Fix all options. Available options are: - threshold: volume threshold to decide a window is silence or not - win_length: length of window for a estimation or volume values - min_sil: minimum duration of a silence - min_ipu: minimum duration of an ipu - shift_start: start boundary shift value. - shift_end: end boundary shift value. :param options: (sppasOption) """ for opt in options: key = opt.get_key() if "threshold" == key: self.set_threshold(opt.get_value()) elif "win_length" == key: self.set_win_length(opt.get_value()) elif "min_sil" == key: self.set_min_sil(opt.get_value()) elif "min_ipu" == key: self.set_min_ipu(opt.get_value()) elif "shift_start" == key: self.set_shift_start(opt.get_value()) elif "shift_end" == key: self.set_shift_end(opt.get_value()) elif "pattern" in key: self._options[key] = opt.get_value() else: raise AnnotationOptionError(key)
# -----------------------------------------------------------------------
[docs] def get_threshold(self): return self._options['threshold']
[docs] def get_win_length(self): return self._options['win_length']
[docs] def get_min_sil(self): return self._options['min_sil']
[docs] def get_min_ipu(self): return self._options['min_ipu']
[docs] def get_shift_start(self): return self._options['shift_start']
[docs] def get_shift_end(self): return self._options['shift_end']
# -----------------------------------------------------------------------
[docs] def set_threshold(self, value): """Fix the threshold volume. :param value: (int) RMS value used as volume threshold """ self._options['threshold'] = value
# -----------------------------------------------------------------------
[docs] def set_win_length(self, value): """Set a new length of window for a estimation or volume values. TAKE CARE: it cancels any previous estimation of volume and silence search. :param value: (float) generally between 0.01 and 0.04 seconds. """ self._options['win_length'] = value
# -----------------------------------------------------------------------
[docs] def set_min_sil(self, value): """Fix the default minimum duration of a silence. :param value: (float) Duration in seconds. """ self._options['min_sil'] = value
# -----------------------------------------------------------------------
[docs] def set_min_ipu(self, value): """Fix the default minimum duration of an IPU. :param value: (float) Duration in seconds. """ self._options['min_ipu'] = value
# -----------------------------------------------------------------------
[docs] def set_shift_start(self, value): """Fix the start boundary shift value. :param value: (float) Duration in seconds. """ self._options['shift_start'] = value
# -----------------------------------------------------------------------
[docs] def set_shift_end(self, value): """Fix the end boundary shift value. :param value: (float) Duration in seconds. """ self._options['shift_end'] = value
# ----------------------------------------------------------------------- # Annotate # -----------------------------------------------------------------------
[docs] @staticmethod def tracks_to_tier(tracks, end_time, vagueness): """Create a sppasTier object from tracks. :param tracks: (List of tuple) with (from, to) values in seconds :param end_time: (float) End-time of the tier :param vagueness: (float) vagueness used for silence search """ if len(tracks) == 0: raise IOError('No IPUs to write.\n') tier = sppasTier("IPUs") tier.set_meta('number_of_ipus', str(len(tracks))) i = 0 to_prec = 0. for (from_time, to_time) in tracks: if from_time == 0. or to_time == end_time: radius = 0. else: radius = vagueness / 2. # From the previous track to the current track: silence if to_prec < from_time: tier.create_annotation( sppasLocation( sppasInterval(sppasPoint(to_prec, radius), sppasPoint(from_time, radius))), sppasLabel( sppasTag(SIL_ORTHO)) ) # New track with speech tier.create_annotation( sppasLocation( sppasInterval(sppasPoint(from_time, radius), sppasPoint(to_time, radius))), sppasLabel( sppasTag("ipu_%d" % (i+1))) ) # Go to the next i += 1 to_prec = to_time # The end is a silence? Fill... begin = sppasPoint(to_prec, vagueness / 2.) if begin < end_time: tier.create_annotation( sppasLocation( sppasInterval(begin, sppasPoint(end_time))), sppasLabel( sppasTag(SIL_ORTHO)) ) return tier
# ----------------------------------------------------------------------- def _set_meta(self, searcher, tier): """Set meta values to the tier.""" tier.set_meta('required_threshold_volume', str(searcher.get_vol_threshold())) tier.set_meta('estimated_threshold_volume', str(searcher.get_effective_threshold())) tier.set_meta('minimum_silence_duration', str(searcher.get_min_sil_dur())) tier.set_meta('minimum_ipus_duration', str(searcher.get_min_ipu_dur())) tier.set_meta('shift_ipus_start', str(searcher.get_shift_start())) tier.set_meta('shift_ipus_end', str(searcher.get_shift_end())) meta = ("rms_min", "rms_max", "rms_mean", "rms_median", "rms_coefvar") for key, value in zip(meta, searcher.get_rms_stats()): tier.set_meta(str(key), str(value)) self.logfile.print_message("Information: ", indent=1) if searcher.get_vol_threshold() == 0: self.logfile.print_message( "Automatically estimated threshold volume value: {:d}"\ "".format(searcher.get_effective_threshold()), indent=2) self.logfile.print_message( "Number of IPUs found: {:s}".format(tier.get_meta("number_of_ipus")), indent=2) # ----------------------------------------------------------------------- # Apply the annotation on one or several given files # -----------------------------------------------------------------------
[docs] def convert(self, channel): """Search for IPUs in the given channel. :param channel: (sppasChannel) Input channel :returns: (sppasTier) """ # Create the searcher instance searcher = SearchIPUs(channel=channel) # Fix options searcher.set_vol_threshold(self._options['threshold']) searcher.set_win_length(self._options['win_length']) searcher.set_min_sil(self._options['min_sil']) searcher.set_min_ipu(self._options['min_ipu']) searcher.set_shift_start(self._options['shift_start']) searcher.set_shift_end(self._options['shift_end']) # Process the data. tracks = searcher.get_tracks(time_domain=True) tier = self.tracks_to_tier( tracks, channel.get_duration(), searcher.get_vagueness() ) self._set_meta(searcher, tier) return tier
# -----------------------------------------------------------------------
[docs] def run(self, input_files, output=None): """Run the automatic annotation process on an input. :param input_files: (list of str) Audio :param output: (str) the output file name :returns: (sppasTranscription) """ # Get audio and the channel we'll work on audio_speech = sppas.src.audiodata.aio.open(input_files[0]) framerate = audio_speech.get_framerate() n = audio_speech.get_nchannels() if n != 1: raise IOError("An audio file with only one channel is expected. " "Got {:d} channels.".format(n)) # Extract the channel idx = audio_speech.extract_channel(0) channel = audio_speech.get_channel(idx) tier = self.convert(channel) # Create the transcription to put the result trs_output = sppasTranscription(self.name) trs_output.set_meta("media_sample_rate", str(framerate)) trs_output.set_meta('annotation_result_of', input_files[0]) trs_output.append(tier) extm = os.path.splitext(input_files[0])[1].lower()[1:] media = sppasMedia(os.path.abspath(input_files[0]), mime_type="audio/"+extm) tier.set_media(media) # Save in a file if output is not None: output_file = self.fix_out_file_ext(output) parser = sppasTrsRW(output_file) parser.write(trs_output) return [output_file] return trs_output
# -----------------------------------------------------------------------
[docs] def run_for_batch_processing(self, input_files): """Perform the annotation on a file. This method is called by 'batch_processing'. It fixes the name of the output file. If the output file is already existing, the annotation is cancelled (the file won't be overridden). If not, it calls the run method. :param input_files: (list of str) the inputs to perform a run :returns: output file name or None """ # Fix input/output file name root_pattern = self.get_out_name(input_files[0]) # Is there already an existing output file (in any format)! ext = [] for e in sppasTrsRW.annot_extensions(): if e not in ('.txt', ): ext.append(e) exist_out_name = sppasBaseAnnotation._get_filename(root_pattern, ext) # it's existing... but not in the expected format: convert! if exist_out_name is not None: out_name = self.fix_out_file_ext(root_pattern) if exist_out_name.lower() == out_name.lower(): return list() else: try: parser = sppasTrsRW(exist_out_name) t = parser.read() parser.set_filename(out_name) parser.write(t) self.logfile.print_message(_info(1302).format(out_name), indent=2, status=annots.warning) return [out_name] except: pass try: # Execute annotation new_files = self.run(input_files, root_pattern) except Exception as e: new_files = list() self.logfile.print_message("{:s}\n".format(str(e)), indent=2, status=-1) return new_files
# -----------------------------------------------------------------------
[docs] @staticmethod def get_input_extensions(): """Extensions that the annotation expects for its input filename.""" return [SppasFiles.get_informat_extensions("AUDIO")]