Source code for anndata.aio.sclite

# -*- coding: UTF-8 -*-
"""
:filename: sppas.src.anndata.aio.sclite.py
:author:   Brigitte Bigi
:contact:  develop@sppas.org
:summary:  Input/Output of SCTK formats.

.. _This file is part of SPPAS: http://www.sppas.org/
..
    -------------------------------------------------------------------------

     ___   __    __    __    ___
    /     |  \  |  \  |  \  /              the automatic
    \__   |__/  |__/  |___| \__             annotation and
       \  |     |     |   |    \             analysis
    ___/  |     |     |   | ___/              of speech

    Copyright (C) 2011-2021  Brigitte Bigi
    Laboratoire Parole et Langage, Aix-en-Provence, France

    Use of this software is governed by the GNU Public License, version 3.

    SPPAS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    SPPAS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with SPPAS. If not, see <http://www.gnu.org/licenses/>.

    This banner notice must not be removed.

    -------------------------------------------------------------------------

Sclite readers and writers: ctm, stm file formats.
The program sclite is a tool for scoring and evaluating the output of
speech recognition systems.

Sclite is part of the NIST SCTK Scoring Toolkit:
https://www.nist.gov/itl/iad/mig/tools

File formats description:
http://www1.icsi.berkeley.edu/Speech/docs/sctk-1.2/infmts.htm#ctm_fmt_name_0

Remark:
=======

Because both formats allow comment lines, the readers and writers of this
module use them as an opportunity to store metadata.

"""
import logging
import codecs
import os.path

from sppas.src.config import sg
from sppas.src.config.makeunicode import sppasUnicode

from ..anndataexc import AioLocationTypeError
from ..anndataexc import AnnDataTypeError
from ..anndataexc import AioLineFormatError
from ..ann.annotation import sppasAnnotation
from ..ann.annlocation import sppasLocation
from ..ann.annlocation import sppasPoint
from ..ann.annlocation import sppasInterval
from ..ann.annlabel import sppasLabel
from ..ann.annlabel import sppasTag

from .text import sppasBaseText
from .aioutils import format_labels
from .aioutils import serialize_labels
from .aioutils import is_ortho_tier
from .aioutils import load

# ---------------------------------------------------------------------------


class sppasBaseSclite(sppasBaseText):
    """SPPAS base Sclite reader and writer.

    Common ancestor of the CTM and STM readers/writers: it fixes the
    software name, the capability flags and the way time values are turned
    into points.

    * * * * *  Current version does not fully support alternations. * * * * *

    """

    def __init__(self, name=None):
        """Initialize a new sppasBaseSclite instance.

        :param name: (str) This transcription name. The name of the class
        is used if no name is given.

        """
        if name is None:
            name = self.__class__.__name__
        super(sppasBaseSclite, self).__init__(name)

        self.software = "SCTK"

        # Override all the capability flags.
        # NOTE(review): presumably these members are declared by the parent
        # class and describe what the file format can store -- confirm.
        self._accept_multi_tiers = True
        self._accept_no_tiers = True
        self._accept_metadata = False
        self._accept_ctrl_vocab = False
        self._accept_media = True
        self._accept_hierarchy = False
        self._accept_point = False
        self._accept_interval = True
        self._accept_disjoint = False
        self._accept_alt_localization = False
        self._accept_alt_tag = True
        self._accept_radius = False
        self._accept_gaps = True
        self._accept_overlaps = True

    # -----------------------------------------------------------------------

    @staticmethod
    def make_point(midpoint):
        """Return a point from a time value.

        The localization is a time value, so always a float.

        :param midpoint: (str, int, float) The time value.
        :returns: (sppasPoint) A point with a radius of 0.005.
        :raises: AnnDataTypeError if the value can't be converted to float.

        """
        try:
            midpoint = float(midpoint)
        except (ValueError, TypeError):
            # TypeError is raised by float() for values like None or a list:
            # report them with the same exception as a malformed string.
            raise AnnDataTypeError(midpoint, "float")

        return sppasPoint(midpoint, radius=0.005)
# ---------------------------------------------------------------------------
class sppasCTM(sppasBaseSclite):
    """SPPAS ctm reader and writer.

    This is the reader/writer of the time marked conversation input files to
    be used for scoring the output of speech recognizers via the NIST
    sclite() program. This file format is as follows (in BNF):

        CTM :== <F> <C> <BT> <DUR> word [ <CONF> ]

    where:

        <F> -> The waveform filename.
            NOTE: no path-names or extensions are expected.
        <C> -> The waveform channel. Either "A" or "B".
            The text of the waveform channel is not restricted by sclite.
            The text can be any text string without whitespace so long as
            the matching string is found in both the reference and
            hypothesis input files.
        <BT> -> The begin time (seconds) of the word, measured from the
            start time of the file.
        <DUR> -> The duration (seconds) of the word.
        <CONF> -> Optional confidence score.

    The file must be sorted by the first three columns: the first and the
    second in ASCII order, and the third by a numeric order.

    Lines beginning with ';;' are considered comments and ignored by sclite.
    Blank lines are also ignored.

    * * *  NOT IMPLEMENTED * * *
    ============================

    Alternations are also accepted in some extended CTM.
    Examples:

        ;;
        7654 A * * <ALT_BEGIN>
        7654 A 12.00 0.34 UM
        7654 A * * <ALT>
        7654 A 12.00 0.34 UH
        7654 A * * <ALT_END>
        ;;
        5555 A * * <ALT_BEGIN>
        5555 A 222.77 0.32 BYEBYE
        5555 A * * <ALT>
        5555 A 222.78 0.12 BYE
        5555 A 222.93 0.16 BYE
        5555 A * * <ALT_END>
        ;;
        5555 A * * <ALT_BEGIN>
        5555 A 186.32 0.01 D-
        5555 A * * <ALT>
        5555 A * * <ALT_END>

    """

    @staticmethod
    def detect(filename):
        """Check whether a file is of CTM format or not.

        Every line of the file must be a comment, a blank line or a
        well-formed annotation line.

        :param filename: (str) Name of the file to check.
        :returns: (bool)

        """
        # Open and load the content. Any failure means "not a CTM file",
        # but system-exiting exceptions are not intercepted.
        try:
            lines = load(filename)
        except Exception:
            return False

        # Check each line
        for line in lines:
            try:
                # a comment, a blank line, an annotation
                sppasCTM.check_line(line.strip())
            except (AioLineFormatError, ValueError):
                # not the right number of columns, or
                # can't convert begin/duration into float
                return False

        return True

    # -----------------------------------------------------------------------

    @staticmethod
    def check_line(line, line_number=0):
        """Check whether a line is an annotation or not.

        Raises AioLineFormatError() or ValueError() in case of a
        malformed line.

        :param line: (str)
        :param line_number: (int) Only used in the error message.
        :returns: (bool) False for a comment or a blank line, True for an
        annotation line or an alternation marker.

        """
        # Comment
        if sppasBaseSclite.is_comment(line):
            return False

        # Blank line (including a line made only of whitespace)
        if len(line.strip()) == 0:
            return False

        # A column-delimited line: <F> <C> <BT> <DUR> word [ <CONF> ]
        # The word is mandatory, so 5 columns at least are expected.
        tab_line = line.split()
        if len(tab_line) < 5 or len(tab_line) > 6:
            raise AioLineFormatError(line_number, line)

        # Not an alternation marker: time values must be numbers
        if tab_line[2] != "*":
            float(tab_line[2])  # begin
            float(tab_line[3])  # duration

        return True

    # -----------------------------------------------------------------------

    def __init__(self, name=None):
        """Initialize a new CTM instance.

        :param name: (str) This transcription name.

        """
        if name is None:
            name = self.__class__.__name__
        super(sppasCTM, self).__init__(name)

        self.default_extension = "ctm"

    # -----------------------------------------------------------------------
    # proceedReader
    # -----------------------------------------------------------------------

    def get_tier(self, line):
        """Return the tier related to the given line.

        Find the tier or create it. The name of the tier is made of the
        waveform name and the channel, i.e. the first two columns.

        :param line: (str) An annotation line
        :returns: (sppasTier)

        """
        tab_line = line.split()
        tier_name = tab_line[0] + "-" + tab_line[1]

        tier = self.find(tier_name)
        if tier is None:
            # Create the media linked to the tier
            media = sppasBaseText.create_media(tab_line[0].strip(), self)

            # Create the tier and set metadata
            tier = self.create_tier(tier_name, media=media)
            tier.set_meta("media_channel", tab_line[1])

            # Do some communication
            if is_ortho_tier(tier_name) is False:
                logging.info(
                    'Tier {:s} is not an orthographic transcription. '
                    'Whitespace in annotations are interpreted as a '
                    'label separator.'.format(tier_name))

        return tier

    # -----------------------------------------------------------------------

    @staticmethod
    def get_score(line):
        """Return the score of the label of a given line.

        The score is the 6th column, if any.

        :param line: (str)
        :returns: (float) or None if no score is given or if it is not
        a number

        """
        tab_line = line.split()
        if len(tab_line) > 5:
            try:
                return float(tab_line[-1])
            except ValueError:
                # a score which is not a number is ignored
                pass
        return None

    # -----------------------------------------------------------------------

    def read(self, filename):
        """Read a ctm file and fill the Transcription.

        It creates a tier for each media-channel observed in the file.

        :param filename: (str)

        """
        content = load(filename)
        self._parse_lines(content)

    # -----------------------------------------------------------------------

    def _parse_lines(self, lines):
        """Fill the transcription from the lines of the CTM file.

        :param lines: (list of str)
        :raises: AioLineFormatError, ValueError if a line is malformed

        """
        # the number of the current alternation (0 = not in an alternation)
        in_alt = 0
        # the annotations of the alternations
        alternates = dict()
        # the current tier to fill
        tier = None

        # Extract rows, create tiers and metadata.
        for i, line in enumerate(lines):
            line = sppasUnicode(line).to_strip()

            # a comment can contain metadata: of the transcription as long
            # as no tier was found, of the current tier after that
            if sppasBaseSclite.is_comment(line):
                if tier is None:
                    sppasBaseSclite._parse_comment(line, self)
                else:
                    sppasBaseSclite._parse_comment(line, tier)

            # ignore comments and blank lines
            if sppasCTM.check_line(line, i+1) is False:
                continue

            # check for the tier (find it or create it)
            tier = self.get_tier(line)

            # extract information of this annotation
            # (the first two columns were already used to get the tier)
            begin, duration, word = line.strip().split()[2:5]
            score = sppasCTM.get_score(line)

            if begin != "*":
                # a regular annotation line
                ann = sppasCTM._create_annotation(begin, duration,
                                                  word, score)
                if in_alt == 0:
                    tier.add(ann)
                else:
                    alternates[in_alt].append(ann)

            elif word == "<ALT_BEGIN>":
                alternates = {1: list()}
                in_alt = 1

            elif word == "<ALT>":
                in_alt += 1
                alternates[in_alt] = list()

            else:
                # todo: we SHOULD add ALL the alternations into the tier
                # but we add only the first one...
                # An end marker without a begin marker has nothing to add.
                sppasCTM._add_alt_annotations(tier,
                                              alternates.get(1, list()))
                # re-init
                alternates = dict()
                in_alt = 0

    # -----------------------------------------------------------------------

    @staticmethod
    def _add_alt_annotations(tier, annotations):
        """Add the annotations into the tier.

        This is a best-effort: an annotation the tier refuses is reported
        in the log and the next ones are still added.

        :TODO: deal with annotation alternations.

        :param tier: (sppasTier)
        :param annotations: (list of sppasAnnotation)

        """
        for ann in annotations:
            try:
                tier.add(ann)
            except Exception as e:
                logging.warning(
                    "An annotation of an alternation was not added into "
                    "the tier %s: %s", tier.get_name(), e)

    # -----------------------------------------------------------------------

    @staticmethod
    def _create_annotation(begin, duration, word, score):
        """Return the annotation corresponding to data of a line.

        :param begin: (str, float) Begin time value
        :param duration: (str, float) Duration, added to begin to get the end
        :param word: (str) Content of the tag
        :param score: (float, None) Score of the tag
        :returns: (sppasAnnotation)

        """
        word = sppasUnicode(word).clear_whitespace()
        label = sppasLabel(sppasTag(word), score)

        begin = float(begin)
        end = begin + float(duration)
        location = sppasLocation(
            sppasInterval(sppasBaseSclite.make_point(begin),
                          sppasBaseSclite.make_point(end)))

        return sppasAnnotation(location, label)

    # -----------------------------------------------------------------------
    # Writer
    # -----------------------------------------------------------------------

    def write(self, filename):
        """Write a transcription into a file.

        :param filename: (str)
        :raises: AioLocationTypeError if an annotation is located by a point

        """
        # the file is closed by the context manager
        with codecs.open(filename, 'w', sg.__encoding__, buffering=8096) as fp:

            # write an header with the metadata
            fp.write(sppasBaseSclite.serialize_header(filename, self))

            for i, tier in enumerate(self):

                # fix the name of the waveform (for 1st column)
                waveform = "waveform-"+str(i)
                if tier.get_media() is not None:
                    waveform = os.path.basename(
                        tier.get_media().get_filename())

                # fix the name of the channel (for 2nd column)
                channel = "A"
                if tier.is_meta_key('media_channel'):
                    channel = tier.get_meta('media_channel')

                # serialize annotations
                for ann in tier:
                    if ann.get_location().is_point():
                        raise AioLocationTypeError('Sclite CTM', 'points')
                    fp.write(sppasCTM._serialize_annotation(ann,
                                                            waveform,
                                                            channel))

                # write the metadata of this tier
                fp.write(sppasBaseText.serialize_metadata(tier))
                fp.write('\n')

    # -----------------------------------------------------------------------

    @staticmethod
    def _serialize_annotation(ann, waveform, channel):
        """Convert an annotation into lines for CTM files.

        Empty labels are replaced by "@". A label with several tags is
        written as an alternation.

        :param ann: (sppasAnnotation)
        :param waveform: (str) Content of the 1st column
        :param channel: (str) Content of the 2nd column
        :returns: (str)

        """
        # fix location information
        best = ann.get_location().get_best()
        begin = best.get_begin().get_midpoint()
        duration = best.get_end().get_midpoint() - begin

        # no label
        labels = ann.get_labels()
        if len(labels) == 0:
            return sppasCTM._serialize_tag(waveform, channel,
                                           begin, duration, sppasTag(""))

        # all labels will have the same begin/duration.
        # todo: check if sequences of labels are supported by CTM.
        lines = list()
        for label in labels:

            if len(label) == 0:
                # a label without tag is written like an empty one
                lines.append(
                    sppasCTM._serialize_tag(waveform, channel,
                                            begin, duration, sppasTag("")))
                continue

            # each tag of THIS label, with its own score
            entries = [sppasCTM._serialize_tag(waveform, channel,
                                               begin, duration, tag, score)
                       for tag, score in label]

            if len(label) == 1:
                # only one tag in the label: no alternation
                lines.extend(entries)
            else:
                # label with alternation tags
                lines.append(
                    "{:s} {:s} * * <ALT_BEGIN>\n".format(waveform, channel))
                lines.append(
                    "{:s} {:s} * * <ALT>\n".format(waveform, channel).join(
                        entries))
                lines.append(
                    "{:s} {:s} * * <ALT_END>\n".format(waveform, channel))

        return "".join(lines)

    # -----------------------------------------------------------------------

    @staticmethod
    def _serialize_tag(waveform, channel, begin, duration, tag, score=None):
        """Convert a tag with its score into a line for CTM files.

        :param waveform: (str) Content of the 1st column
        :param channel: (str) Content of the 2nd column
        :param begin: (float) Begin time value
        :param duration: (float) Duration
        :param tag: (sppasTag) An empty tag is written "@"
        :param score: (float, None) Appended as a 6th column if not None
        :returns: (str) A line, ended by a newline

        """
        if tag.is_empty():
            tag_content = "@"
        else:
            tag_content = tag.get_content()

        # serialize the content
        content = "{:s} {:s} {:s} {:s} {:s}" \
                  "".format(waveform, channel, str(begin), str(duration),
                            tag_content)
        if score is not None:
            content += " {:s}" \
                       "".format(str(score))

        return content+"\n"
# ---------------------------------------------------------------------------
class sppasSTM(sppasBaseSclite):
    """SPPAS stm reader and writer.

    This is the reader/writer for the segment time marked files to be used
    for scoring the output of speech recognizers via the NIST sclite()
    program.

        STM :== <F> <C> <S> <BT> <ET> [ <LABEL> ] transcript . . .

    where:

        <F> -> The waveform filename.
            NOTE: no pathnames or extensions are expected.
        <C> -> The waveform channel. Either "A" or "B".
            The text of the waveform channel is not restricted by sclite.
            The text can be any text string without whitespace so long as
            the matching string is found in both the reference and
            hypothesis input files.
        <S> -> The speaker id, no restrictions apply to this name.
        <BT> -> The begin time (seconds) of the word, measured from the
            start time of the file.
        <ET> -> The end time (seconds) of the segment.
        <LABEL> -> A comma separated list of subset identifiers enclosed
            in angle brackets
        transcript -> The transcript can take on two forms:
            1) a whitespace separated list of words, or
            2) the string "IGNORE_TIME_SEGMENT_IN_SCORING".
            The list of words can contain a transcript alternation using
            the following BNF format:
                ALTERNATE :== "{" <text> ALT+ "}"
                ALT :== "|" <text>
                TEXT :== 1 thru n words | "@" | ALTERNATE

    The file must be sorted by the first and second columns in ASCII order,
    and the fourth in numeric order.

    Lines beginning with ';;' are considered comments and are ignored.
    Blank lines are also ignored.

    """

    @staticmethod
    def detect(filename):
        """Check whether a file is of STM format or not.

        Every line of the file must be a comment, a blank line or a
        well-formed annotation line.

        :param filename: (str) Name of the file to check.
        :returns: (bool)

        """
        # Open and load the content. Any failure means "not an STM file",
        # but system-exiting exceptions are not intercepted.
        try:
            lines = load(filename)
        except Exception:
            return False

        # Check each line
        for line in lines:
            try:
                # a comment, a blank line, an annotation
                sppasSTM.check_line(line.strip())
            except (AioLineFormatError, ValueError):
                # not the right number of columns, or
                # can't convert begin/end into float
                return False

        return True

    # -----------------------------------------------------------------------

    @staticmethod
    def check_line(line, line_number=0):
        """Check whether a line is an annotation or not.

        Raises AioLineFormatError() or ValueError() in case of a
        malformed line.

        :param line: (str)
        :param line_number: (int) Only used in the error message.
        :returns: (bool) False for a comment or a blank line, True for an
        annotation line.

        """
        # Comment
        if sppasBaseSclite.is_comment(line):
            return False

        # Blank line (including a line made only of whitespace)
        if len(line.strip()) == 0:
            return False

        # A column-delimited line: 5 columns and the transcript at least
        tab_line = line.split()
        if len(tab_line) < 6:
            raise AioLineFormatError(line_number, line)

        # Time values must be numbers
        float(tab_line[3])  # begin
        float(tab_line[4])  # end

        return True

    # -----------------------------------------------------------------------

    def __init__(self, name=None):
        """Initialize a new STM instance.

        :param name: (str) This transcription name.

        """
        if name is None:
            name = self.__class__.__name__
        super(sppasSTM, self).__init__(name)

        self.default_extension = "stm"

    # -----------------------------------------------------------------------
    # proceedReader
    # -----------------------------------------------------------------------

    def get_tier(self, line):
        """Return the tier related to the given line.

        Find the tier or create it. The name of the tier is made of the
        waveform name, the channel and the speaker, i.e. the first three
        columns.

        :param line: (str) An annotation line
        :returns: (sppasTier)

        """
        tab_line = line.split()
        tier_name = tab_line[0] + "-" + tab_line[1] + "-" + tab_line[2]

        tier = self.find(tier_name)
        if tier is None:
            # Create the media linked to the tier
            media = sppasBaseSclite.create_media(tab_line[0].strip(), self)

            # Create the tier and set metadata
            tier = self.create_tier(tier_name, media=media)
            tier.set_meta("media_channel", tab_line[1])
            tier.set_meta("speaker_id", tab_line[2])

        return tier

    # -----------------------------------------------------------------------

    def read(self, filename):
        """Read an stm file and fill the Transcription.

        It creates a tier for each media-channel-speaker observed in the
        file.

        :param filename: (str)

        """
        content = load(filename)
        self._parse_lines(content)

    # -----------------------------------------------------------------------

    def _parse_lines(self, lines):
        """Fill the transcription from the lines of the STM file.

        :param lines: (list of str)
        :raises: AioLineFormatError, ValueError if a line is malformed

        """
        # the current tier to fill
        tier = None

        # Extract rows, create tiers and metadata.
        for i, line in enumerate(lines):
            line = sppasUnicode(line).to_strip()

            # a comment can contain metadata: of the transcription as long
            # as no tier was found, of the current tier after that
            if sppasBaseSclite.is_comment(line):
                if tier is None:
                    sppasBaseSclite._parse_comment(line, self)
                else:
                    sppasBaseSclite._parse_comment(line, tier)

            # ignore comments and blank lines
            if sppasSTM.check_line(line, i+1) is False:
                continue

            # check for the tier (find it or create it)
            tier = self.get_tier(line)

            # extract information of this annotation.
            # NOTE(review): an optional <LABEL> column is not separated
            # from the transcript, it is kept at its beginning.
            tab_line = line.split()
            utterance = " ".join(tab_line[5:])
            if is_ortho_tier(tier.get_name()) is False:
                # each whitespace-separated item turns into a label
                utterance = utterance.replace(" ", "\n")

            sppasSTM._create_annotation(tab_line[3], tab_line[4],
                                        utterance, tier)

    # -----------------------------------------------------------------------

    @staticmethod
    def _create_annotation(begin, end, utterance, tier):
        """Add into the tier the annotation corresponding to data of a line.

        :param begin: (str, float) Begin time value
        :param end: (str, float) End time value
        :param utterance: (str) Labels, separated by a newline
        :param tier: (sppasTier) The tier to add the annotation in

        """
        labels = format_labels(utterance, separator="\n")
        location = sppasLocation(
            sppasInterval(sppasBaseSclite.make_point(begin),
                          sppasBaseSclite.make_point(end)))

        tier.create_annotation(location, labels)

    # -----------------------------------------------------------------------
    # Writer
    # -----------------------------------------------------------------------

    def write(self, filename):
        """Write a transcription into a file.

        :param filename: (str)
        :raises: AioLocationTypeError if an annotation is located by a point

        """
        # the file is closed by the context manager
        with codecs.open(filename, 'w', sg.__encoding__, buffering=8096) as fp:

            # write an header with the metadata
            fp.write(sppasBaseSclite.serialize_header(filename, self))

            for i, tier in enumerate(self):

                # fix the name of the waveform (for 1st column)
                waveform = "waveform-"+str(i)
                if tier.get_media() is not None:
                    waveform = os.path.basename(
                        tier.get_media().get_filename())

                # fix the name of the channel (for 2nd column)
                channel = "A"
                if tier.is_meta_key('media_channel'):
                    channel = tier.get_meta('media_channel')

                # fix the speaker (for 3rd column)
                speaker = "A"
                if tier.is_meta_key('speaker_id'):
                    speaker = tier.get_meta('speaker_id')
                elif tier.is_meta_key('speaker_name'):
                    speaker = tier.get_meta('speaker_name')

                # serialize annotations
                for ann in tier:
                    if ann.get_location().is_point():
                        raise AioLocationTypeError('Sclite STM', 'points')
                    fp.write(sppasSTM._serialize_annotation(ann,
                                                            waveform,
                                                            channel,
                                                            speaker))

                # write the metadata of this tier
                fp.write(sppasBaseText.serialize_metadata(tier))
                fp.write('\n')

    # -----------------------------------------------------------------------

    @staticmethod
    def _serialize_annotation(ann, waveform, channel, speaker):
        """Convert an annotation into lines for STM files.

        Empty labels are replaced by "IGNORE_TIME_SEGMENT_IN_SCORING".
        Alternative tags are included.

        :param ann: (sppasAnnotation)
        :param waveform: (str) Content of the 1st column
        :param channel: (str) Content of the 2nd column
        :param speaker: (str) Content of the 3rd column
        :returns: (str) A line, ended by a newline

        """
        # fix location information
        best = ann.get_location().get_best()
        begin = best.get_begin().get_midpoint()
        end = best.get_end().get_midpoint()

        # fix label information
        content = serialize_labels(ann.get_labels(),
                                   separator=" ",
                                   empty="IGNORE_TIME_SEGMENT_IN_SCORING",
                                   alt=True)

        return "{wav} {cha} {spk} {beg} {end} {lab}\n".format(
            wav=waveform,
            cha=channel,
            spk=speaker,
            beg=str(begin),
            end=str(end),
            lab=content
        )