"""
:filename: sppas.src.annotations.Align.tracksio.py
:author: Brigitte Bigi
:contact: develop@sppas.org
:summary: Automatic segmentation of the data into tracks
.. _This file is part of SPPAS: http://www.sppas.org/
..
-------------------------------------------------------------------------
___ __ __ __ ___
/ | \ | \ | \ / the automatic
\__ |__/ |__/ |___| \__ annotation and
\ | | | | \ analysis
___/ | | | | ___/ of speech
Copyright (C) 2011-2021 Brigitte Bigi
Laboratoire Parole et Langage, Aix-en-Provence, France
Use of this software is governed by the GNU Public License, version 3.
SPPAS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
SPPAS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SPPAS. If not, see <http://www.gnu.org/licenses/>.
This banner notice must not be removed.
-------------------------------------------------------------------------
"""
import os
import codecs
import logging
import traceback
from sppas.src.config import NoDirectoryError
from sppas.src.config import sppasUnicode
from sppas.src.config import sg
from sppas.src.config import separators
from sppas.src.anndata import sppasTier
from sppas.src.anndata import sppasLocation
from sppas.src.anndata import sppasInterval
from sppas.src.anndata import sppasPoint
from sppas.src.anndata.aio.aioutils import serialize_labels
from sppas.src.resources.mapping import sppasMapping
from sppas.src.anndata import sppasTag, sppasLabel
from sppas.src.audiodata import autils
import sppas.src.audiodata.aio
from ..annotationsexc import BadInputError
from ..annotationsexc import SizeInputsError
from ..annotationsexc import AudioChannelError
from .aligners.alignerio import AlignerIO
# ------------------------------------------------------------------
class TracksReaderWriter(object):
    """Manager for tracks from/to tiers.

    Facade over TracksReader and TracksWriter which additionally converts
    the phonemes with a mapping table: in one direction before the tracks
    are written, and in the other direction when the time-aligned tracks
    are read back.

    """

    # Delimiters given to the mapping when a whole phonetization is mapped.
    DELIMITERS = (" ", separators.variants, separators.phonemes)

    # ------------------------------------------------------------------------

    def __init__(self, mapping):
        """Create a new TracksReaderWriter instance.

        :param mapping: (sppasMapping) a mapping table to convert the phone
        set. If None, a new sppasMapping() is created.
        :raises: TypeError if mapping is neither None nor a sppasMapping

        """
        # Mapping system for the phonemes
        if mapping is None:
            mapping = sppasMapping()
        if isinstance(mapping, sppasMapping) is False:
            # The name of the type is formatted, not the type object itself:
            # '{:s}' is not a supported format for a type object.
            raise TypeError('Expected a sppasMapping() as argument. '
                            'Got {:s} instead.'
                            ''.format(type(mapping).__name__))
        self._mapping = mapping

    # ------------------------------------------------------------------------

    def get_units(self, dir_name):
        """Return the time units of all tracks.

        :param dir_name: (str) Input directory to get files.
        :returns: the value returned by ListOfTracks.read(dir_name)

        """
        return ListOfTracks.read(dir_name)

    # ------------------------------------------------------------------------
    # Read files
    # ------------------------------------------------------------------------

    def read_aligned_tracks(self, dir_name):
        """Read time-aligned tracks in a directory.

        The tags of the phonemes tier and of the pronunciations tier are
        replaced by their mapped-back version; the tokens tier is returned
        as it was read.

        :param dir_name: (str) Input directory to get files.
        :returns: (sppasTier, sppasTier, sppasTier) phonemes, tokens and
        pronunciations tiers

        """
        tier_phn, tier_tok, tier_pron = \
            TracksReader.read_aligned_tracks(dir_name)

        # map-back phonemes
        self._mapping.set_keep_miss(True)
        self._mapping.set_reverse(False)

        # Map-back time-aligned phonemes to SAMPA
        # include the mapping of alternative tags
        self._map_back_tier(tier_phn, self._mapping.map_entry)

        # A pronunciation is a sequence of phonemes: it is mapped with
        # the phonemes separator as delimiter.
        self._map_back_tier(
            tier_pron,
            lambda text: self._mapping.map(text, [separators.phonemes]))

        return tier_phn, tier_tok, tier_pron

    # ------------------------------------------------------------------------

    @staticmethod
    def _map_back_tier(tier, convert):
        """Replace the content of each tag of a tier by its converted one.

        Labels are re-created: the alternative tags and their scores are
        all preserved, only the text content is converted.

        :param tier: (sppasTier) The tier to modify in place.
        :param convert: (callable) Return the new content of a tag from
        its current content.

        """
        for ann in tier:
            labels = list()
            for label in ann.get_labels():
                tags = list()
                scores = list()
                for tag, score in label:
                    tags.append(sppasTag(convert(tag.get_content())))
                    scores.append(score)
                labels.append(sppasLabel(tags, scores))
            ann.set_labels(labels)

    # ------------------------------------------------------------------------
    # Write files
    # ------------------------------------------------------------------------

    def split_into_tracks(self, input_audio, phon_tier, tok_tier, tok_rescue_tier, dir_align):
        """Write tracks from the given data.

        The labels of phon_tier are modified in place: each annotation
        gets a single label with the mapped phonetization.
        If the tracks can't be written with the tokens (SizeInputsError
        or BadInputError), writing is re-tried with the phonemes only.

        :param input_audio: (str) Audio file name. Or None if no needed (basic alignment).
        :param phon_tier: (sppasTier) The phonetization tier.
        :param tok_tier: (sppasTier) The tokens tier, or None.
        :param tok_rescue_tier: (sppasTier) The tokens rescue tier, or None.
        :param dir_align: (str) Output directory to store files.
        :returns: None

        """
        # Map phonemes from SAMPA to the expected ones.
        self._mapping.set_keep_miss(True)
        self._mapping.set_reverse(True)

        # Map phonetizations (even the alternatives)
        for ann in phon_tier:
            text = serialize_labels(ann.get_labels(), separator="\n", empty="", alt=True)
            content = list()
            for item in text.split('\n'):
                item = item.replace('|', separators.variants)
                # remove the braces surrounding the serialized item, if any
                if item.startswith('{') and item.endswith('}'):
                    item = item[1:-1]
                content.append(item)

            mapped = self._mapping.map(" ".join(content),
                                       TracksReaderWriter.DELIMITERS)
            ann.set_labels(sppasLabel(sppasTag(mapped)))

        try:
            TracksWriter.write_tracks(input_audio, phon_tier, tok_tier, tok_rescue_tier, dir_align)
        except (SizeInputsError, BadInputError):
            # either the number of intervals are not matching, or phonemes
            # or tokens is wrong... re-try with phonemes only
            TracksWriter.write_tracks(input_audio, phon_tier, None, None, dir_align)

    # ------------------------------------------------------------------------

    @staticmethod
    def get_filenames(track_dir, track_number):
        """Return file names corresponding to a given track.

        :param track_dir: (str)
        :param track_number: (int)
        :returns: (audio, phn, tok, align) file names

        """
        audio = TrackNamesGenerator.audio_filename(track_dir, track_number)
        phn = TrackNamesGenerator.phones_filename(track_dir, track_number)
        tok = TrackNamesGenerator.tokens_filename(track_dir, track_number)
        align = TrackNamesGenerator.align_filename(track_dir, track_number)
        return audio, phn, tok, align
# ----------------------------------------------------------------------------
class TrackNamesGenerator:
    """Manage names of the files for a given track number.

    All the files of a track share the same base name, made of the prefix
    'track_' followed by the track number zero-padded to 6 digits. Only
    the extension differs.

    """

    @staticmethod
    def _track_path(track_dir, track_number, ext=None):
        """Return the path of a file of the track, in the given directory.

        :param track_dir: (str) Directory of the tracks
        :param track_number: (int) Number of the track
        :param ext: (str) Extension without the dot, or None for no extension
        :returns: (str)

        """
        name = "track_{:06d}".format(track_number)
        if ext is not None:
            name = "{:s}.{:s}".format(name, ext)
        return os.path.join(track_dir, name)

    # ------------------------------------------------------------------------

    @staticmethod
    def audio_filename(track_dir, track_number):
        """Return the name of the audio file."""
        return TrackNamesGenerator._track_path(track_dir, track_number, "wav")

    @staticmethod
    def phones_filename(track_dir, track_number):
        """Return the name of the file with Phonetization."""
        return TrackNamesGenerator._track_path(track_dir, track_number, "phn")

    @staticmethod
    def tokens_filename(track_dir, track_number):
        """Return the name of the file with Tokenization."""
        return TrackNamesGenerator._track_path(track_dir, track_number, "tok")

    @staticmethod
    def align_filename(track_dir, track_number, ext=None):
        """Return the name of the time-aligned file.

        :param track_dir: (str) Directory of the tracks
        :param track_number: (int) Number of the track
        :param ext: (str) Extension without the dot. If None, the name
        is returned without extension.
        :returns: (str)

        """
        return TrackNamesGenerator._track_path(track_dir, track_number, ext)
# ----------------------------------------------------------------------------
class TracksReader:
    """Read time-aligned tracks.

    Manage tracks for the time-aligned phonemes and tokens.

    """

    RADIUS = 0.005   # Half-size of a frame in the acoustic model

    # ------------------------------------------------------------------------

    @staticmethod
    def read_aligned_tracks(dir_name):
        """Read a set of alignment files and set as tiers.

        The alignment file of a track which can't be read (IOError) is
        ignored: nothing is added into the tiers for this track.

        :param dir_name: (str) input directory containing a set of units
        :returns: (sppasTier, sppasTier, sppasTier) the tiers named
        PhonAlign, TokensAlign and PronTokAlign
        :raises: IOError if the list of tracks is missing

        """
        # Read the time values of each track from a file
        units = ListOfTracks.read(dir_name)

        # Check if the directory exists
        # NOTE(review): ListOfTracks.read() already raised IOError if the
        # list file does not exist, which includes a missing directory, so
        # this check is not expected to fail. Order kept as is in order to
        # not change the raised exception.
        if os.path.exists(dir_name) is False:
            raise NoDirectoryError(dirname=dir_name)

        # Create new tiers
        tier_phn = sppasTier("PhonAlign")
        tier_tok = sppasTier("TokensAlign")
        tier_pron = sppasTier("PronTokAlign")

        # Explore each unit to get alignments. Tracks are numbered from 1.
        for track_number, (unit_start, unit_end) in enumerate(units, start=1):

            # Fix filename to read, and load the content
            basename = \
                TrackNamesGenerator.align_filename(dir_name, track_number)
            try:
                _phons, _words, _prons = AlignerIO.read_aligned(basename)
            except IOError:
                _phons, _words, _prons = [], [], []

            # Append alignments in tiers
            TracksReader._add_aligned_track_into_tier(tier_phn, _phons, unit_start, unit_end)
            TracksReader._add_aligned_track_into_tier(tier_tok, _words, unit_start, unit_end)
            TracksReader._add_aligned_track_into_tier(tier_pron, _prons, unit_start, unit_end)

        return tier_phn, tier_tok, tier_pron

    # ------------------------------------------------------------------------

    @staticmethod
    def _add_aligned_track_into_tier(tier, tdata, delta, unitend):
        """Append a list of (start, end, text, score) into the tier.

        Shift start/end of a delta value and set the last end value.
        An error is logged instead of being raised: the annotations
        created before the error are kept in the tier, the remaining
        ones are not added.

        :param tier: (sppasTier) The tier to append annotations into.
        :param tdata: (list) Tuples (start, end, contents, scores) in which
        contents - and scores if not None, are '|'-separated strings.
        :param delta: (float) Value added to each start and end.
        :param unitend: (float) End value assigned to the last item.
        :returns: (bool) False if an error occurred, True otherwise.

        """
        try:
            last = len(tdata) - 1
            for i, (loc_s, loc_e, contents, scores) in enumerate(tdata):

                # fix the location - an interval
                loc_s += delta
                loc_e = unitend if i == last else loc_e + delta
                location = sppasLocation(
                    sppasInterval(
                        sppasPoint(loc_s, TracksReader.RADIUS),
                        sppasPoint(loc_e, TracksReader.RADIUS)
                    ))

                # fix the label
                # allow to work with alternative tags
                tags = [sppasTag(c) for c in contents.split('|')]
                if scores is not None:
                    tag_scores = [float(s) for s in scores.split('|')]
                else:
                    tag_scores = None
                label = sppasLabel(tags, tag_scores)

                tier.create_annotation(location, label)

        except Exception:
            # Best effort: malformed data must not stop the whole reading.
            # KeyboardInterrupt and SystemExit are not intercepted.
            logging.error('The following data were not added to the tier '
                          '{:s} at position {:f}: {:s}'
                          ''.format(tier.get_name(), delta, str(tdata)))
            logging.error(traceback.format_exc())
            return False

        return True
# ---------------------------------------------------------------------------
class TracksWriter:
    """Write non-aligned track files.

    Manage tracks for the audio, the phonetization and the tokenization.

    """

    @staticmethod
    def write_tracks(input_audio, phon_tier, tok_tier, tok_rescue_tier, dir_align):
        """Main method to write tracks from the given data.

        Write the text files of each track, the audio file of each track
        if an audio is given, and the file with the list of tracks.

        :param input_audio: (src) File name of the audio file, or None.
        :param phon_tier: (Tier) Tier with phonetization to split.
        :param tok_tier: (Tier) Tier with tokenization to split, or None.
        :param tok_rescue_tier: (Tier) Tier with tokens to split, or None.
        :param dir_align: (str) Directory to put units.
        :returns: None
        :raises: BadInputError if an audio is given but the phonetization,
        or both tokens tiers, are not interval tiers
        :raises: SizeInputsError if no tokens tier has as many annotations
        as the phonetization

        """
        # In any case, the phonetization is written
        TracksWriter._write_text_tracks(phon_tier, tok_tier, tok_rescue_tier, dir_align)

        # No need of an audio if basic alignment
        if input_audio is not None:
            if phon_tier.is_interval() is False:
                raise BadInputError
            if tok_tier is not None and tok_tier.is_interval() is False:
                # The rescue tier is optional: without it, nothing can
                # replace the invalid tokens tier.
                if tok_rescue_tier is None or tok_rescue_tier.is_interval() is False:
                    raise BadInputError

            tracks = phon_tier.get_midpoint_intervals()
            TracksWriter._write_audio_tracks(input_audio, tracks, dir_align)

        elif phon_tier.is_interval() is True:
            tracks = phon_tier.get_midpoint_intervals()
        else:
            # probably basic alignment of a written text!
            tracks = phon_tier.get_midpoint_points()

        # Write the time values of each track into a file
        ListOfTracks.write(dir_align, tracks)

    # ------------------------------------------------------------------------

    @staticmethod
    def _write_audio_tracks(input_audio, units, dir_align, silence=0.):
        """Write the first channel of an audio file into separated track files.

        Re-sample to 16000 Hz, 16 bits.

        :param input_audio: (src) File name of the audio file.
        :param units: (list) List of tuples (start-time,end-time) of tracks.
        :param dir_align: (str) Directory to write audio tracks.
        :param silence: (float) Duration of a silence to surround the tracks.
        :raises: AudioChannelError if the audio has not exactly one channel

        """
        audio = sppas.src.audiodata.aio.open(input_audio)
        try:
            nbc = audio.get_nchannels()
            if nbc != 1:
                raise AudioChannelError(nb=nbc)
            idx = audio.extract_channel(0)
            channel = audio.get_channel(idx)
        finally:
            # the audio is closed even if the number of channels is wrong
            audio.close()

        channel = autils.format_channel(channel, 16000, 2)

        # Tracks are numbered from 1
        for track, (start, end) in enumerate(units):
            track_channel = \
                autils.extract_channel_fragment(channel, start, end, silence)
            track_name = \
                TrackNamesGenerator.audio_filename(dir_align, track + 1)
            autils.write_channel(track_name, track_channel)

    # ------------------------------------------------------------------------

    @staticmethod
    def _write_text_tracks(phon_tier, tok_tier, tok_rescue_tier, dir_align):
        """Write tokenization and phonetization into separated track files.

        For each track, the tokens are those of the first of the given
        tokens tier, the rescue tier, then a tier of generated tokens,
        having as many whitespace-separated items as the phonetization.

        :param phon_tier: (sppasTier) time-aligned tier with phonetization
        :param tok_tier: (sppasTier) time-aligned tier with tokenization
        :param tok_rescue_tier: (sppasTier) time-aligned tier with tokenization
        :param dir_align: (str) the directory to write tracks.
        :raises: SizeInputsError if neither the tokens tier nor the rescue
        tier has as many annotations as the phonetization tier

        """
        last_chance_tier = TracksWriter._create_tok_tier(phon_tier)
        if tok_rescue_tier is None:
            tok_rescue_tier = last_chance_tier
        if tok_tier is None:
            tok_tier = tok_rescue_tier

        nb_tracks = len(phon_tier)
        if len(tok_tier) != nb_tracks:
            if len(tok_rescue_tier) != nb_tracks:
                raise SizeInputsError(nb_tracks, len(tok_tier))
            # Annotations are paired by index with the phonetization ones:
            # only a tier of the same size can be used.
            tok_tier = tok_rescue_tier
        elif len(tok_rescue_tier) != nb_tracks:
            tok_rescue_tier = last_chance_tier

        for i, phon_ann in enumerate(phon_tier):
            nb_phons = TracksWriter._count_items(phon_ann)
            TracksWriter._write_phonemes(phon_ann, dir_align, i + 1)

            tok_ann = tok_tier[i]
            if TracksWriter._count_items(tok_ann) != nb_phons:
                tok_ann = tok_rescue_tier[i]
                logging.warning("Alignment of tokens rescued at interval {}".format(i))
                if TracksWriter._count_items(tok_ann) != nb_phons:
                    tok_ann = last_chance_tier[i]
            TracksWriter._write_tokens(tok_ann, dir_align, i + 1)

    # ------------------------------------------------------------------------

    @staticmethod
    def _count_items(annotation):
        """Return the number of whitespace-separated items of the labels.

        :param annotation: (sppasAnnotation)
        :returns: (int)

        """
        return len(serialize_labels(annotation.get_labels()).split())

    # ------------------------------------------------------------------------

    @staticmethod
    def _create_tok_tier(phon_tier):
        """Create a tier with tokens like 'w_1 w_2...w_n' from phonemes.

        The annotations with a silence as best tag are left unchanged.

        :param phon_tier: (sppasTier) time-aligned tier with phonetization
        :returns: (sppasTier) a modified copy of the given tier

        """
        tok_tier = phon_tier.copy()
        for ann in tok_tier:
            tag = ann.get_best_tag()
            if tag.is_silence() is False:
                phonemes = serialize_labels(ann.get_labels(), " ", "", alt=True)
                nb_phonemes = len(phonemes.split(' '))
                tokens = " ".join(
                    ["w_" + str(i + 1) for i in range(nb_phonemes)]
                )
                ann.set_labels([sppasLabel(sppasTag(tokens))])

        return tok_tier

    # ------------------------------------------------------------------------

    @staticmethod
    def _write_labels(annotation, filename):
        """Write the serialized labels of an annotation in a file.

        :param annotation: (sppasAnnotation)
        :param filename: (str) Name of the file to write

        """
        content = serialize_labels(annotation.get_labels(),
                                   separator=" ", empty="", alt=True)
        with codecs.open(filename, "w", sg.__encoding__) as fp:
            fp.write(content)

    # ------------------------------------------------------------------------

    @staticmethod
    def _write_phonemes(annotation, dir_align, number):
        """Write the phonetization of a track in a file.

        :param annotation: (sppasAnnotation)
        :param dir_align: (str)
        :param number: (int)

        """
        fnp = TrackNamesGenerator.phones_filename(dir_align, number)
        TracksWriter._write_labels(annotation, fnp)

    # ------------------------------------------------------------------------

    @staticmethod
    def _write_tokens(annotation, dir_align, number):
        """Write the tokenization of a track in a file.

        :param annotation: (sppasAnnotation)
        :param dir_align: (str)
        :param number: (int)

        """
        fnt = TrackNamesGenerator.tokens_filename(dir_align, number)
        TracksWriter._write_labels(annotation, fnt)
# ---------------------------------------------------------------------------
class ListOfTracks:
    """Manage the file with a list of tracks (units, ipus...).

    The file contains one line per track, with the start and the end
    values of the track separated by a whitespace.

    """

    DEFAULT_FILENAME = "tracks.list"

    # ------------------------------------------------------------------

    @staticmethod
    def read(dir_name):
        """Return a list of (start-time end-time).

        Lines with less than two fields are ignored.

        :param dir_name: Name of the directory with the file to read.
        :returns: list of units, each one is a tuple of 2 float values
        :raises: IOError if the file with the list of tracks is missing
        :raises: ValueError if one of the two first fields of a line
        can't be converted to float

        """
        filename = os.path.join(dir_name, ListOfTracks.DEFAULT_FILENAME)
        if os.path.exists(filename) is False:
            raise IOError('The list of tracks is missing of the directory '
                          '{:s}'.format(dir_name))

        # Each line corresponds to a track,
        # with a couple 'start end' of float values.
        _units = list()
        with open(filename, 'r') as fp:
            for line in fp:
                _tab = sppasUnicode(line).to_strip().split()
                if len(_tab) >= 2:
                    _units.append((float(_tab[0]), float(_tab[1])))

        return _units

    # ------------------------------------------------------------------

    @staticmethod
    def write(dir_name, units):
        """Write a list file (start-time end-time).

        :param dir_name: Name of the directory with the file to write.
        :param units: List of units to write. Either tuples/lists with
        (start, end) or single values. The type of the first item
        decides for the whole list.
        :raises: IOError if there is no unit to write

        """
        if len(units) == 0:
            raise IOError('No filled tracks were founds in the annotations.')

        # convert points into intervals
        # can happen mainly when written text: IPUs are ranked (1, 2, 3 ...)
        if isinstance(units[0], (tuple, list)) is False:
            units = ListOfTracks._points_to_intervals(units)

        filename = os.path.join(dir_name, ListOfTracks.DEFAULT_FILENAME)
        with open(filename, 'w') as fp:
            for start, end in units:
                # '6' is a minimum width; the precision is the default one
                # of the 'f' type, i.e. 6 digits after the decimal point.
                fp.write("{:6f} {:6f}\n".format(start, end))

    # ------------------------------------------------------------------

    @staticmethod
    def _points_to_intervals(points):
        """Return a list of (start, end) created from single values.

        Each interval starts at the value and covers 90% of the distance
        to the next one. The value following the last one is considered
        to be the last one + 1.

        :param points: (list) Values which can be converted to float
        :returns: (list) List of tuples of 2 float values

        """
        intervals = list()
        for i, midpoint in enumerate(points):
            midpoint = float(midpoint)
            if i + 1 < len(points):
                end = float(points[i + 1])
            else:
                end = midpoint + 1.
            intervals.append((midpoint, midpoint + (0.9 * (end - midpoint))))
        return intervals