# -*- coding: UTF-8 -*-
"""
:filename: sppas.src.annotations.TGA.sppastga.py
:author: Brigitte Bigi
:contact: develop@sppas.org
:summary: SPPAS integration of TGA automatic annotation
.. _This file is part of SPPAS: <http://www.sppas.org/>
..
-------------------------------------------------------------------------
___ __ __ __ ___
/ | \ | \ | \ / the automatic
\__ |__/ |__/ |___| \__ annotation and
\ | | | | \ analysis
___/ | | | | ___/ of speech
Copyright (C) 2011-2021 Brigitte Bigi
Laboratoire Parole et Langage, Aix-en-Provence, France
Use of this software is governed by the GNU Public License, version 3.
SPPAS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
SPPAS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SPPAS. If not, see <http://www.gnu.org/licenses/>.
This banner notice must not be removed.
-------------------------------------------------------------------------
"""
import logging
import os
from sppas.src.config import symbols
from sppas.src.config import sppasUnicode
from sppas.src.anndata import sppasTrsRW
from sppas.src.anndata import sppasTranscription
from sppas.src.anndata import sppasTier
from sppas.src.anndata import sppasTag
from sppas.src.anndata import sppasLabel
from sppas.src.anndata.aio.aioutils import serialize_labels
from ..baseannot import sppasBaseAnnotation
from ..searchtier import sppasFindTier
from ..annotationsexc import AnnotationOptionError
from ..annotationsexc import EmptyOutputError
from ..annotationsexc import NoTierInputError
from .timegroupanalysis import TimeGroupAnalysis
# ----------------------------------------------------------------------------
[docs]class sppasTGA(sppasBaseAnnotation):
"""Estimate TGA on a tier -- from D. Gibbon.
Create time groups then map them into a dictionary where:
- key is a label assigned to the time group;
- value is the list of observed durations of segments in this TG.
"""
[docs] def __init__(self, log=None):
"""Create a new sppasTGA instance.
Log is used for a better communication of the annotation process and its
results. If None, logs are redirected to the default logging system.
:param log: (sppasLog) Human-readable logs.
"""
super(sppasTGA, self).__init__("tga.json", log)
# List of the symbols used to create the time groups
self._tg_separators = list(symbols.phone.keys())
# for backward compatibility, we can't simply use the symbols.phone
self._tg_separators.append('#')
self._tg_separators.append('@@')
self._tg_separators.append('+')
self._tg_separators.append('gb')
self._tg_separators.append('lg')
self._tg_separators.append('_')
# -----------------------------------------------------------------------
# Methods to fix options
# -----------------------------------------------------------------------
[docs] def fix_options(self, options):
"""Fix all options.
Available options are:
- with_radius
- original
- annotationpro
- tg_prefix_label
:param options: (sppasOption)
"""
for opt in options:
key = opt.get_key()
if "with_radius" == key:
self.set_with_radius(opt.get_value())
elif "original" == key:
self.set_intercept_slope_original(opt.get_value())
elif "annotationpro" == key:
self.set_intercept_slope_annotationpro(opt.get_value())
elif "tg_prefix_label" == key:
self.set_tg_prefix_label(opt.get_value())
elif "pattern" in key:
self._options[key] = opt.get_value()
else:
raise AnnotationOptionError(key)
# -----------------------------------------------------------------------
[docs] def set_tg_prefix_label(self, prefix):
"""Fix the prefix to add to each TG.
:param prefix: (str) Default is 'tg_'
"""
sp = sppasUnicode(prefix)
tg = sp.to_strip()
if len(tg) > 0:
self._options['tg_prefix_label'] = tg
# -----------------------------------------------------------------------
[docs] def set_with_radius(self, with_radius):
"""Set the with_radius option, used to estimate the duration.
:param with_radius: (int)
- 0 means to use Midpoint;
- negative value means to use R-;
- positive radius means to use R+.
"""
try:
w = int(with_radius)
self._options['with_radius'] = w
except ValueError:
raise
# -----------------------------------------------------------------------
[docs] def set_intercept_slope_original(self, value):
"""Estimate intercepts and slopes with the original method.
Default is False.
:param value: (boolean)
"""
self._options['original'] = bool(value)
# -----------------------------------------------------------------------
[docs] def set_intercept_slope_annotationpro(self, value):
"""Estimate intercepts and slopes with the method of annotationpro.
Default is True.
:param value: (boolean)
"""
self._options['annotationpro'] = bool(value)
# -----------------------------------------------------------------------
# Workers
# -----------------------------------------------------------------------
[docs] def syllables_to_timegroups(self, syllables):
"""Create the time group intervals.
:param syllables: (sppasTier)
:returns: (sppasTier) Time groups
"""
intervals = syllables.export_to_intervals(self._tg_separators)
intervals.set_name("TGA-TimeGroups")
for i, tg in enumerate(intervals):
tag_str = self._options['tg_prefix_label']
tag_str += str(i+1)
tg.append_label(sppasLabel(sppasTag(tag_str)))
return intervals
# ----------------------------------------------------------------------
[docs] def syllables_to_timesegments(self, syllables):
"""Create the time segments intervals.
Time segments are time groups with serialized syllables.
:param syllables:
:returns: (sppasTier) Time segments
"""
intervals = syllables.export_to_intervals(self._tg_separators)
intervals.set_name("TGA-Segments")
for i, tg in enumerate(intervals):
syll_anns = syllables.find(tg.get_lowest_localization(),
tg.get_highest_localization())
tag_str = ""
for ann in syll_anns:
tag_str += serialize_labels(ann.get_labels(), separator=" ")
tag_str += " "
tg.append_label(sppasLabel(sppasTag(tag_str)))
return intervals
# ----------------------------------------------------------------------
[docs] def timegroups_to_durations(self, syllables, timegroups):
"""Return a dict with timegroups and the syllable durations.
:param syllables: (sppasTier) Syllables
:param timegroups: (sppasTier) Time groups
:returns: (dict)
"""
tg_dur = dict()
for tg_ann in timegroups:
tg_label = serialize_labels(tg_ann.get_labels())
tg_dur[tg_label] = list()
syll_anns = syllables.find(tg_ann.get_lowest_localization(),
tg_ann.get_highest_localization())
for syll_ann in syll_anns:
loc = syll_ann.get_location().get_best()
# Fix the duration value of this syllable
dur = loc.duration()
value = dur.get_value()
if self._options['with_radius'] < 0:
value -= dur.get_margin()
if self._options['with_radius'] > 0:
value += dur.get_margin()
# Append in the list of values of this TG
tg_dur[tg_label].append(value)
return tg_dur
# -----------------------------------------------------------------------
[docs] @staticmethod
def tga_to_tier(tga_result, timegroups, tier_name, tag_type="float"):
"""Create a tier from one of the TGA result.
:param tga_result: One of the results of TGA
:param timegroups: (sppasTier) Time groups
:param tier_name: (str) Name of the output tier
:param tag_type: (str) Type of the sppasTag to be included
:returns: (sppasTier)
"""
tier = sppasTier(tier_name)
for tg_ann in timegroups:
tg_label = serialize_labels(tg_ann.get_labels())
tag_value = tga_result[tg_label]
if tag_type == "float":
tag_value = round(tag_value, 5)
tier.create_annotation(
tg_ann.get_location().copy(),
sppasLabel(sppasTag(tag_value, tag_type)))
return tier
# ----------------------------------------------------------------------
[docs] @staticmethod
def tga_to_tier_reglin(tga_result, timegroups, intercept=True):
"""Create tiers of intercept,slope from one of the TGA result.
:param tga_result: One of the results of TGA
:param timegroups: (sppasTier) Time groups
:param intercept: (boolean) Export the intercept.
If False, export Slope.
:returns: (sppasTier)
"""
if intercept is True:
tier = sppasTier('TGA-Intercept')
else:
tier = sppasTier('TGA-Slope')
for tg_ann in timegroups:
tg_label = serialize_labels(tg_ann.get_labels())
loc = tg_ann.get_location().copy()
if intercept is True:
tag_value = tga_result[tg_label][0]
else:
tag_value = tga_result[tg_label][1]
tag_value = round(tag_value, 5)
tier.create_annotation(loc,
sppasLabel(sppasTag(tag_value, "float")))
return tier
# ----------------------------------------------------------------------
[docs] def convert(self, syllables):
"""Estimate TGA on the given syllables.
:param syllables: (sppasTier)
:returns: (sppasTranscription)
"""
trs_out = sppasTranscription("TimeGroupAnalyser")
# Create the time groups: intervals of consecutive syllables
timegroups = self.syllables_to_timegroups(syllables)
timegroups.set_meta('timegroups_of_tier', syllables.get_name())
trs_out.append(timegroups)
# Create the time segments
timesegs = self.syllables_to_timesegments(syllables)
trs_out.append(timesegs)
# trs_out.add_hierarchy_link("TimeAssociation", timegroups, timesegs)
# Get the duration of each syllable, grouped into the timegroups
tg_dur = self.timegroups_to_durations(syllables, timegroups)
# here, we could add an option to add durations and
# delta durations into the transcription output
# Estimate TGA
ts = TimeGroupAnalysis(tg_dur)
# Put TGA non-optional results into tiers
tier = sppasTGA.tga_to_tier(ts.len(), timegroups, "TGA-Occurrences", "int")
trs_out.append(tier)
# trs_out.add_hierarchy_link("TimeAssociation", timegroups, tier)
tier = sppasTGA.tga_to_tier(ts.total(), timegroups, "TGA-Total")
trs_out.append(tier)
# trs_out.add_hierarchy_link("TimeAssociation", timegroups, tier)
tier = sppasTGA.tga_to_tier(ts.mean(), timegroups, "TGA-Mean")
trs_out.append(tier)
# trs_out.add_hierarchy_link("TimeAssociation", timegroups, tier)
tier = sppasTGA.tga_to_tier(ts.median(), timegroups, "TGA-Median")
trs_out.append(tier)
# trs_out.add_hierarchy_link("TimeAssociation", timegroups, tier)
tier = sppasTGA.tga_to_tier(ts.stdev(), timegroups, "TGA-StdDev")
trs_out.append(tier)
# trs_out.add_hierarchy_link("TimeAssociation", timegroups, tier)
tier = sppasTGA.tga_to_tier(ts.nPVI(), timegroups, "TGA-nPVI")
trs_out.append(tier)
# trs_out.add_hierarchy_link("TimeAssociation", timegroups, tier)
# Put TGA Intercept/Slope results
if self._options['original'] is True:
tier = sppasTGA.tga_to_tier_reglin(
ts.intercept_slope_original(),
timegroups,
True)
tier.set_name('TGA-Intercept-original')
trs_out.append(tier)
# trs_out.add_hierarchy_link("TimeAssociation", timegroups, tier)
tier = sppasTGA.tga_to_tier_reglin(
ts.intercept_slope_original(),
timegroups,
False)
tier.set_name('TGA-Slope-original')
trs_out.append(tier)
# trs_out.add_hierarchy_link("TimeAssociation", timegroups, tier)
if self._options['annotationpro'] is True:
tier = sppasTGA.tga_to_tier_reglin(
ts.intercept_slope(),
timegroups,
True)
tier.set_name('TGA-Intercept-timestamps')
trs_out.append(tier)
# trs_out.add_hierarchy_link("TimeAssociation", timegroups, tier)
tier = sppasTGA.tga_to_tier_reglin(
ts.intercept_slope(),
timegroups,
False)
tier.set_name('TGA-Slope-timestamps')
trs_out.append(tier)
# trs_out.add_hierarchy_link("TimeAssociation", timegroups, tier)
return trs_out
# ----------------------------------------------------------------------
# Apply the annotation on one given file
# -----------------------------------------------------------------------
# -----------------------------------------------------------------------
[docs] def run(self, input_files, output=None):
"""Run the automatic annotation process on an input.
:param input_files: (list of str) Syllabification
:param output: (str) the output file name
:returns: (sppasTranscription)
"""
# Get the tier to syllabify
tier_input = self.get_inputs(input_files)
# Create the transcription result
trs_output = sppasTranscription(self.name)
trs_output.set_meta('annotation_result_of', input_files[0])
# Estimate TGA on the tier
trs_output = self.convert(tier_input)
# Save in a file
if output is not None:
if len(trs_output) > 0:
output_file = self.fix_out_file_ext(output)
parser = sppasTrsRW(output_file)
parser.write(trs_output)
return [output_file]
else:
raise EmptyOutputError
return trs_output
# ----------------------------------------------------------------------
[docs] def get_output_pattern(self):
"""Pattern this annotation uses in an output filename."""
return self._options.get("outputpattern", "-tga")