"""
:filename: sppas.src.annotations.Align.aligners.alignerio.py
:author: Brigitte Bigi
:contact: develop@sppas.org
:summary: Aligners Input/Output readers and writers
.. _This file is part of SPPAS: <http://www.sppas.org/>
..
---------------------------------------------------------------------
___ __ __ __ ___
/ | \ | \ | \ / the automatic
\__ |__/ |__/ |___| \__ annotation and
\ | | | | \ analysis
___/ | | | | ___/ of speech
Copyright (C) 2011-2021 Brigitte Bigi
Laboratoire Parole et Langage, Aix-en-Provence, France
Use of this software is governed by the GNU Public License, version 3.
SPPAS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
SPPAS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SPPAS. If not, see <http://www.gnu.org/licenses/>.
This banner notice must not be removed.
---------------------------------------------------------------------
"""
import os
import codecs
import logging
from sppas.src.config import sg
from sppas.src.config import separators
from sppas.src.config import sppasUnicode
# ---------------------------------------------------------------------------
class BaseAlignersReader(object):
    """Base class for readers/writers of time-aligned files.

    Gather the helpers shared by the readers of this module: loading the
    lines of a file, extracting the words, phonemes, scores and time units
    of a Julius output, converting the units into time values and merging
    all of them into time-aligned phonemes and words.

    """

    def __init__(self):
        """Create a reader which is not bound to any file extension."""
        self.extension = ""

    # -----------------------------------------------------------------------

    @staticmethod
    def read(filename):
        """Read a time-aligned file. To be overridden by each reader.

        :param filename: (str) The input file name.
        :raises: NotImplementedError

        """
        raise NotImplementedError

    # -----------------------------------------------------------------------

    @staticmethod
    def get_lines(filename):
        """Return the lines of a file with the SPPAS encoding.

        The file is read as bytes and decoded line by line, so that a line
        which can't be decoded is logged then ignored instead of making the
        whole file unreadable.

        :param filename: file to load
        :return: list of decoded lines, without the newline character

        """
        lines = list()
        with open(filename, 'rb') as f:
            # Lines are numbered from 1 in the warning message.
            for number, line_b in enumerate(f, 1):
                try:
                    line = line_b.decode(sg.__encoding__).strip('\n')
                except UnicodeDecodeError:
                    logging.warning("UnicodeDecodeError: line {:d} is ignored."
                                    "".format(number))
                else:
                    lines.append(line)
        return lines

    # -----------------------------------------------------------------------

    @staticmethod
    def _first_line_with_prefix(lines, prefix, message):
        """Return the first line starting with the given prefix.

        :param lines: (List of str)
        :param prefix: (str) Expected beginning of the line
        :param message: (str) Message of the error if no line is matching
        :returns: (str) The first matching line
        :raises: IOError if no line starts with the prefix, including when
        the list of lines is empty

        """
        for line in lines:
            if line.startswith(prefix):
                return line
        raise IOError(message)

    # -----------------------------------------------------------------------

    @staticmethod
    def get_units_julius(lines):
        """Return the units of a palign/walign file (in frames).

        The units are the lines starting with '[' which are between the
        "begin forced alignment" and the "end forced alignment" markers.

        :param lines: (List of str)
        :returns: List of tuples (start, end)
        :raises: IOError if one of the two markers is missing

        """
        # Search for the beginning of the alignment section. The "else"
        # also covers an empty list of lines.
        for begin, line in enumerate(lines):
            if "=== begin forced alignment ===" in line:
                break
        else:
            raise IOError('Time units not found')

        units = list()
        for line in lines[begin + 1:]:
            if line.startswith('['):
                # New phonemes
                line = line.replace("[", "")
                line = line.replace("]", "")
                line = sppasUnicode(line).to_strip()
                tab = line.split()
                # tab 0: first frame
                # tab 1: last frame
                # tab 2: score of the segmentation (log proba)
                # tab 3: triphone used
                units.append((int(tab[0]), int(tab[1])))
            if "=== end forced alignment ===" in line:
                return units

        # The end of the lines was reached without the closing marker.
        raise IOError('Time units not found in alignment result')

    # -----------------------------------------------------------------------

    @staticmethod
    def get_phonemes_julius(lines):
        """Return the pronunciation of all words.

        The pronunciations are in the first line starting with 'phseq1':
        words are separated by '|' and phonemes by whitespace.

        :param lines: (List of str)
        :returns: List of tuples (ph1 ph2...phN)
        :raises: IOError if the line is missing or has no content

        """
        line = BaseAlignersReader._first_line_with_prefix(
            lines, 'phseq1', 'Phonemes sequence not found.')
        # Remove the 7 characters of the "phseq1:" label
        line = line[7:].strip()
        if len(line) == 0:
            raise IOError('Empty phonemes sequence.')

        return [tuple(phn.strip().split()) for phn in line.split('|')]

    # -----------------------------------------------------------------------

    @staticmethod
    def get_words_julius(lines):
        """Return all words.

        The words are in the first line starting with 'sentence1'.

        :param lines: (List of str)
        :returns: List
        :raises: IOError if the line is missing

        """
        line = BaseAlignersReader._first_line_with_prefix(
            lines, 'sentence1', 'Words not found in alignment result')
        # Remove the 10 characters of the "sentence1:" label
        return line[10:].strip().split()

    # -----------------------------------------------------------------------

    @staticmethod
    def get_word_scores_julius(lines):
        """Return all scores of words.

        The scores are in the first line starting with 'cmscore1'. They
        are returned as strings.

        :param lines: (List of str)
        :returns: List
        :raises: IOError if the line is missing

        """
        line = BaseAlignersReader._first_line_with_prefix(
            lines, 'cmscore1', 'Scores not found in alignment result')
        # Remove the 9 characters of the "cmscore1:" label
        return line[9:].strip().split()

    # -----------------------------------------------------------------------

    @staticmethod
    def units_to_time(units, samplerate):
        """Return the conversion of units.

        Convert units (in frames) into time values (in seconds). The end
        of each unit is fixed to the begin of the next one, except for the
        last unit which keeps its own end.

        NOTE: in the previous versions, everything was shifted of 10ms
        to the right.

        :param units: (List of tuples) Units (start, end)
        :param samplerate: (int) Sample rate to be applied to the units.
        :returns: List of tuples (start, end)

        """
        if len(units) == 0:
            return list()

        samplerate = float(samplerate)
        starts = [round(float(u[0]) / samplerate, 3) for u in units]
        last_end = round(float(units[-1][1]) / samplerate, 3)
        # Each end is the start of the next unit: no hole between units
        ends = starts[1:] + [last_end]
        return list(zip(starts, ends))

    # -----------------------------------------------------------------------

    @staticmethod
    def shift_time_units(units, delta):
        """Return the units shifted of a delta time.

        The first start time and the last end time are not shifted.

        :param units: (list of tuples) Time units
        :param delta: (float) Delta time value in range [-0.02;0.02]
        :returns: List of tuples (start, end)

        """
        # A delta out of the range is replaced by the closest boundary
        delta = max(-0.02, min(0.02, delta))

        shifted = list()
        last = len(units) - 1
        for i, (start, end) in enumerate(units):
            if i > 0:
                start += delta
            if i < last:
                end += delta
            shifted.append((round(start, 3), round(end, 3)))
        return shifted

    # -----------------------------------------------------------------------

    @staticmethod
    def make_result(units, words, phonemes, scores):
        """Make a unique data structure from the given data.

        Each phoneme of each word consumes one unit, in the given order.

        :param units: (List of tuples)
        :param words: (List of str)
        :param phonemes: (List of tuples)
        :param scores: (List of str, or None)
        :returns: Two data structures
            1. List of (start_time end_time phoneme None)
            2. List of (start_time end_time word score)
        :raises: IOError if there are less units than phonemes

        """
        if scores is None:
            scores = [None]*len(words)

        aligned_words = list()
        aligned_phones = list()
        nb_units = len(units)
        i = 0
        for wd, phn_seq, sc in zip(words, phonemes, scores):
            # The word starts with the next unit: it must exist.
            if i >= nb_units:
                raise IOError('Phonemes/Units are not matching '
                              'in alignment result')
            start_wd = units[i][0]
            for phn in phn_seq:
                if i == nb_units:
                    raise IOError('Phonemes/Units are not matching '
                                  'in alignment result')
                start_phn, end_phn = units[i]
                aligned_phones.append((start_phn, end_phn, phn, None))
                i += 1
            # The word ends with the last unit which was consumed
            end_wd = units[i - 1][1]
            aligned_words.append((start_wd, end_wd, wd, sc))

        return aligned_phones, aligned_words
# ---------------------------------------------------------------------------
class palign(BaseAlignersReader):
    """palign reader/writer of time-aligned files (Julius CSR Engine).

    """

    def __init__(self):
        """Create a palign instance to read palign files of Julius."""
        super(palign, self).__init__()
        self.extension = "palign"

    # -----------------------------------------------------------------------

    @staticmethod
    def read(filename):
        """Read an alignment file in the format of Julius CSR engine.

        Units are converted from frames to seconds with a rate of 100,
        then shifted of 0.01 second.

        :param filename: (str) The input file name.
        :returns: 3 lists of tuples
            1. List of (start-time end-time phoneme None)
            2. List of (start-time end-time word None)
            3. List of (start-time end-time pron_word score)
        :raises: IOError if a section is missing or if the numbers of
        words, phonemes and scores are not matching

        """
        reader = BaseAlignersReader
        lines = reader.get_lines(filename)

        try:
            phonemes = reader.get_phonemes_julius(lines)
        except IOError:
            logging.error('Got no time-aligned phonemes in file {:s}:'
                          ''.format(filename))
            raise
        words = reader.get_words_julius(lines)
        # The pronunciation of a word is the joined sequence of its phonemes
        pron_words = [separators.phonemes.join(phn) for phn in phonemes]
        scores = reader.get_word_scores_julius(lines)

        nb_words = len(words)
        if nb_words != len(phonemes):
            logging.error('Words/Phonemes are not matching in file: {:s}'
                          ''.format(filename))
            logging.error(' - words: {}'.format(words))
            logging.error(' - phonemes: {}'.format(phonemes))
            raise IOError("Words/Phonemes are not matching "
                          "in alignment result of file {:s}".format(filename))
        if nb_words != len(scores):
            logging.error('Words/Scores are not matching in file: {:s}'
                          ''.format(filename))
            logging.error(' - words: {}'.format(words))
            logging.error(' - scores: {}'.format(scores))
            raise IOError("Words/Scores are not matching in alignment result "
                          "of file {:s}".format(filename))

        units = reader.shift_time_units(
            reader.units_to_time(reader.get_units_julius(lines), 100),
            0.01)

        data_phon, data_words = reader.make_result(
            units, words, phonemes, None)
        # Only the pronunciations are kept from this second result
        _, data_pron = reader.make_result(
            units, pron_words, phonemes, scores)

        return data_phon, data_words, data_pron

    # -----------------------------------------------------------------------

    @staticmethod
    def write(phoneslist, tokenslist, alignments, outputfilename):
        """Write an alignment output file.

        :param phoneslist: (list) The phonetization of each token
        :param tokenslist: (list) Each token
        :param alignments: (list) Tuples (start-time end-time phoneme)
        :param outputfilename: (str) Output file name (a Julius-like output).

        """
        with codecs.open(outputfilename, 'w', sg.__encoding__) as fp:
            tokens_line = "{:s}\n".format(" ".join(tokenslist))
            phones_line = "{:s}\n".format(" | ".join(phoneslist))
            # A null score is written for each item of the phonetization
            scores_line = "{:s}\n".format("0.000 "*len(phoneslist))

            content = (
                "----------------------- System Information begin "
                "---------------------\n",
                "\n",
                " Basic Alignment\n",
                "\n",
                "----------------------- System Information end "
                "-----------------------\n",
                "\n### Recognition: 1st pass\n",
                "pass1_best: ", tokens_line,
                "pass1_best_wordseq: ", tokens_line,
                "pass1_best_phonemeseq: ", phones_line,
                "\n### Recognition: 2nd pass\n",
                "ALIGN: === phoneme alignment begin ===\n",
                "sentence1: ", tokens_line,
                "wseq1: ", tokens_line,
                "phseq1: ", phones_line,
                "cmscore1: ", scores_line,
                "=== begin forced alignment ===\n",
                "-- phoneme alignment --\n",
                " id: from to n_score unit\n",
                " ----------------------------------------\n",
            )
            for chunk in content:
                fp.write(chunk)

            # One line per aligned phoneme, all with the same score
            for tv1, tv2, phon in alignments:
                fp.write("[ {:d} ".format(tv1)
                         + " {:d}]".format(tv2)
                         + " -30.000000 " + str(phon) + "\n")

            fp.write("=== end forced alignment ===\n")
# ---------------------------------------------------------------------------
class walign(BaseAlignersReader):
    """walign reader of time-aligned files (Julius CSR Engine).

    """

    def __init__(self):
        """Create a walign instance to read walign files of Julius."""
        super(walign, self).__init__()
        self.extension = "walign"

    # -----------------------------------------------------------------------

    @staticmethod
    def read(filename):
        """Read an alignment file in the format of Julius CSR engine.

        Units are converted from frames to seconds with a rate of 100,
        then shifted of 0.01 second. The i-th word is associated to the
        i-th unit.

        :param filename: (str) The input file name.
        :returns: A list of tuples (start-time end-time word score)
        :raises: IOError if a section is missing, or if the numbers of
        words, scores and units are not matching

        """
        reader = BaseAlignersReader
        lines = reader.get_lines(filename)

        words = reader.get_words_julius(lines)
        scores = reader.get_word_scores_julius(lines)
        if len(words) != len(scores):
            logging.error('Got words: {}'.format(words))
            logging.error('Got scores: {}'.format(scores))
            raise IOError("Words/Scores are not matching in alignment result")

        units = reader.shift_time_units(
            reader.units_to_time(reader.get_units_julius(lines), 100),
            0.01)

        # Each word needs its own unit. Extra units are ignored.
        if len(units) < len(words):
            logging.error('Got words: {}'.format(words))
            logging.error('Got units: {}'.format(units))
            raise IOError('Phonemes/Units are not matching '
                          'in alignment result')

        return [(unit[0], unit[1], wd, sc)
                for unit, wd, sc in zip(units, words, scores)]
# ---------------------------------------------------------------------------
class mlf(BaseAlignersReader):
    """mlf reader of time-aligned files (HTK Toolkit).

    When the -m option is used, the transcriptions output by HVITE would,
    by default, contain both the model level and word level transcriptions.
    For example, a typical fragment of the output might be:

        7500000 8700000 f -1081.604736 FOUR 30.000000
        8700000 9800000 ao -903.821350
        9800000 10400000 r -665.931641
        10400000 10400000 sp -0.103585
        10400000 11700000 s -1266.470093 SEVEN 22.860001
        11700000 12500000 eh -765.568237
        12500000 13000000 v -476.323334
        13000000 14400000 n -1285.369629
        14400000 14400000 sp -0.103585

    A line with at least 5 columns is then the first phoneme of a word.

    """

    def __init__(self):
        """Create a mlf instance to parse mlf files from HVite."""
        super(mlf, self).__init__()
        self.extension = "mlf"

    # -----------------------------------------------------------------------

    @staticmethod
    def is_integer(s):
        """Check whether a string is an integer or not.

        The answer is True only if int() is able to convert the given
        entry, so that a positive answer can safely be followed by such
        a conversion.

        :param s: (str or unicode)
        :returns: (bool)

        """
        try:
            int(s)
        except (TypeError, ValueError):
            return False
        return True

    # -----------------------------------------------------------------------

    @staticmethod
    def _aligned_columns(lines):
        """Yield the columns of each line describing an aligned phoneme.

        Such a line has more than 3 columns and its two first ones are
        integers. The other lines are ignored.

        :param lines: (List of str)

        """
        for line in lines:
            columns = line.split()
            if len(columns) <= 3:
                continue
            if mlf.is_integer(columns[0]) and mlf.is_integer(columns[1]):
                yield columns

    # -----------------------------------------------------------------------

    @staticmethod
    def get_units(lines):
        """Return the units of a mlf file.

        Values are returned like they are in the file, without conversion.
        NOTE(review): read() divides them by 10e6 to get seconds, which
        matches the 100 ns time unit of HTK - to be confirmed.

        :param lines: (List of str)
        :returns: List of tuples (start, end)

        """
        return [(int(columns[0]), int(columns[1]))
                for columns in mlf._aligned_columns(lines)]

    # -----------------------------------------------------------------------

    @staticmethod
    def get_phonemes(lines):
        """Return the pronunciation of all words.

        :param lines: (List of str)
        :returns: List of tuples (ph1 ph2...phN)

        """
        phonemes = list()
        phon_seq = list()
        for columns in mlf._aligned_columns(lines):
            if len(columns) >= 5 and len(phon_seq) > 0:
                # A new word is starting: the previous sequence is done
                phonemes.append(tuple(phon_seq))
                phon_seq = list()
            phon_seq.append(columns[2])

        # The phonemes of the last word
        if len(phon_seq) > 0:
            phonemes.append(tuple(phon_seq))

        return phonemes

    # -----------------------------------------------------------------------

    @staticmethod
    def get_words(lines):
        """Return all words.

        :param lines: (List of str)
        :returns: List

        """
        return [columns[4]
                for columns in mlf._aligned_columns(lines)
                if len(columns) >= 5]

    # -----------------------------------------------------------------------

    @staticmethod
    def read(filename):
        """Read an alignment file (a mlf file).

        :param filename: (str) the input file (a HVite mlf output file).
        :returns: 3 lists of tuples:
            - (start-time end-time phoneme None)
            - (start-time end-time word None)
            - (start-time end-time pron_word None)
        :raises: IOError if the numbers of words and of sequences of
        phonemes are not matching, or if units are missing

        """
        b = BaseAlignersReader()
        lines = b.get_lines(filename)

        units = mlf.get_units(lines)
        # 10e6 units are making 1 second
        units = b.units_to_time(units, 10e6)
        units = b.shift_time_units(units, 0.01)

        phonemes = mlf.get_phonemes(lines)
        # The pronunciation of a word is the joined sequence of its phonemes
        pron_words = [separators.phonemes.join(phn) for phn in phonemes]
        words = mlf.get_words(lines)
        if len(words) != len(phonemes):
            logging.error('Got words: {}'.format(words))
            logging.error('Got phonemes: {}'.format(phonemes))
            raise IOError("Words/Phonemes are not matching "
                          "in alignment result")

        data_phon, data_words = b.make_result(units, words, phonemes, None)
        data_phon, data_pron = b.make_result(units, pron_words, phonemes, None)

        return data_phon, data_words, data_pron
# ---------------------------------------------------------------------------
class AlignerIO(object):
    """Reader/writer of the output files of the aligners.

    AlignerIO implements methods to read/write files of the external aligner
    systems.

    """

    # File extensions this class is able to read, each one associated to
    # the class of its reader.
    EXTENSIONS_READ = dict(
        (reader().extension, reader) for reader in (palign, mlf, walign)
    )

    # File extensions this class is able to write.
    EXTENSIONS_WRITE = dict(
        (writer().extension, writer) for writer in (palign, )
    )

    # -----------------------------------------------------------------------

    @staticmethod
    def read_aligned(basename):
        """Find an aligned file and read it.

        The extensions of EXTENSIONS_READ are tested one after the other:
        the first existing file is read and the others are ignored.

        :param basename: (str) File name without extension
        :returns: The result of the reader of the file which was found,
        i.e. lists of tuples like:
            - (start-time end-time phoneme score)
            - (start-time end-time word score)
        The score can be None. The number of lists depends on the reader:
        three lists with palign and mlf, a single one with walign.
        :raises: IOError if no file is matching

        todo: The "phoneme" column can be a sequence of alternative phonemes.

        """
        for ext, reader_class in AlignerIO.EXTENSIONS_READ.items():
            track_name = basename + "." + ext
            if os.path.isfile(track_name):
                return reader_class().read(track_name)

        raise IOError('No time-aligned file was found for {:s}'
                      ''.format(basename))