# -*- coding: UTF-8 -*-
"""
:filename: sppas.src.annotations.Momel.momel.py
:author: Brigitte Bigi
:contact: develop@sppas.org
:summary: Momel algorithm.
.. _This file is part of SPPAS: http://www.sppas.org/
..
-------------------------------------------------------------------------
___ __ __ __ ___
/ | \ | \ | \ / the automatic
\__ |__/ |__/ |___| \__ annotation and
\ | | | | \ analysis
___/ | | | | ___/ of speech
Copyright (C) 2011-2021 Brigitte Bigi
Laboratoire Parole et Langage, Aix-en-Provence, France
Use of this software is governed by the GNU Public License, version 3.
SPPAS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
SPPAS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SPPAS. If not, see <http://www.gnu.org/licenses/>.
This banner notice must not be removed.
-------------------------------------------------------------------------
https://en.wikipedia.org/wiki/Momel
Different versions of the Momel algorithm have been developed
in the LPL in Aix en Provence over the last twenty years and have been
used for the phonetic modelling and symbolic coding of the intonation
patterns of a number of languages (including English, French, Italian,
Catalan, etc).
The last implementation is presented as a Praat plugin. The modelling
and coding algorithms have been implemented as a set of Praat scripts,
each corresponding to a specific step in the process.
See:
| Hirst, Daniel. (2007).
| A Praat plugin for Momel and INTSINT with improved algorithms
| for modelling and coding intonation.
| Proceedings of the 16th International Congress of Phonetic Sciences.
The quality of the F0 modelling crucially depends on the quality of
the F0 detected.
The quadratic spline function used to model the macro-melodic component
is defined by a sequence of target points, (couples <s, Hz>) each pair
of which is linked by two monotonic parabolic curves with the spline
knot occurring (by default) at the midway point between the two targets.
The first derivative of the curve thus defined is zero at each target
point and the two parabolas have the same value and same derivative at
the spline knot. This, in fact, defines the most simple mathematical
function for which the curves are both continuous and smooth.
"""
import math
from .anchor import Anchor
from .momelutil import quicksortcib
# ----------------------------------------------------------------------------
[docs]class Momel(object):
"""Implements Momel.
:author: Brigitte Bigi
:organization: Laboratoire Parole et Langage, Aix-en-Provence, France
:contact: develop@sppas.org
:license: GPL, v3
:copyright: Copyright (C) 2011-2018 Brigitte Bigi
"""
[docs] def __init__(self):
"""Create a new Momel instance."""
# Constants
self.SEUILV = 50.
self.FSIGMA = 1.
self.HALO_BORNE_TRAME = 4
self.RAPP_GLITCH = 0.05
self.ELIM_GLITCH = True
# Array of pitch values
self.hzptr = []
self.nval = 0
self.delta = 0.01
# Output of cible:
self.cib = []
# Output of reduc:
self.cibred = []
# Output of reduc2:
self.cibred2 = []
# cible window length (lfen1 est en pointes echantillon,
# pas milliseconds)
self.lfen1 = 30
# f0 threshold
self.hzinf = 50
# f0 ceiling (Hirst, Di Cristo et Espesser :
# hzsup est calcule automatiquement)
self.hzsup = 600
# maximum error (Maxec est 1+Delta en Hirst, Di Cristo et Espesser)
self.maxec = 1.04
# reduc window length (lfen2 est en pointes echantillon,
# pas milliseconds)
self.lfen2 = 20
# minimal distance (seuildiff_x est en pointes echantillon,
# pas milliseconds)
self.seuildiff_x = 5
# minimal frequency ratio
self.seuilrapp_y = 0.05
self.a0 = 0
self.a1 = 0
self.a2 = 0
# -------------------------------------------------------------------
[docs] def initialize(self):
"""Set some variables to their default values."""
# Array of pitch values
self.hzptr = []
self.nval = 0
self.delta = 0.01
# Output of cible:
self.cib = []
# Output of reduc:
self.cibred = []
# Output of reduc2:
self.cibred2 = []
# -------------------------------------------------------------------
[docs] def set_pitch_array(self, arrayvals):
self.hzptr = arrayvals
self.nval = len(self.hzptr)
[docs] def set_option_elim_glitch(self, activate=True):
self.ELIM_GLITCH = activate
[docs] def set_option_win1(self, val):
self.lfen1 = val
assert(self.lfen1 > 0)
[docs] def set_option_lo(self, val):
self.hzinf = val
[docs] def set_option_hi(self, val):
self.hzsup = val
[docs] def set_option_maxerr(self, val):
self.maxec = val
[docs] def set_option_win2(self, val):
self.lfen2 = val
[docs] def set_option_mind(self, val):
self.seuildiff_x = val
[docs] def set_option_minr(self, val):
self.seuilrapp_y = val
# ------------------------------------------------------------------
[docs] def elim_glitch(self):
"""Eliminate Glitch of the pitch values array.
Set a current pith value to 0 if left and right values
are greater than 5% more than the current value.
"""
_delta = 1.0 + self.RAPP_GLITCH
for i in range(1, self.nval-1):
cur = self.hzptr[i]
gprec = self.hzptr[i-1] * _delta
gnext = self.hzptr[i+1] * _delta
if cur > gprec and cur > gnext:
self.hzptr[i] = 0.
# ------------------------------------------------------------------
[docs] def calcrgp(self, pond, dpx, fpx):
"""From inputs, estimates: a0, a1, a2.
:param pond:
:param dpx:
:param fpx:
"""
pn = 0.
sx = sx2 = sx3 = sx4 = sy = sxy = sx2y = 0.
for ix in range(dpx, fpx+1):
p = pond[ix]
if p != 0.:
val_ix = float(ix)
y = self.hzptr[ix]
x2 = val_ix * val_ix
x3 = x2 * val_ix
x4 = x2 * x2
xy = val_ix * y
x2y = x2 * y
pn = pn + p
sx += (p * val_ix)
sx2 += (p * x2)
sx3 += (p * x3)
sx4 += (p * x4)
sy += (p * y)
sxy += (p * xy)
sx2y += (p * x2y)
if pn < 3.:
raise ValueError('pn < 3')
spdxy = sxy - (sx * sy) / pn
spdx2 = sx2 - (sx * sx) / pn
spdx3 = sx3 - (sx * sx2) / pn
spdx4 = sx4 - (sx2 * sx2) / pn
spdx2y = sx2y - (sx2 * sy) / pn
muet = (spdx2 * spdx4) - (spdx3 * spdx3)
if spdx2 == 0. or muet == 0.:
raise ValueError('spdx2 == 0. or muet == 0.')
self.a2 = (spdx2y * spdx2 - spdxy * spdx3) / muet
self.a1 = (spdxy - self.a2 * spdx3) / spdx2
self.a0 = (sy - self.a1 * sx - self.a2 * sx2) / pn
# ------------------------------------------------------------------
[docs] def cible(self):
"""Find momel target points."""
if len(self.hzptr) == 0:
raise IOError('Empty pitch array')
if self.hzsup < self.hzinf:
raise ValueError('F0 ceiling > F0 threshold')
pond = []
pondloc = [] # local copy of pond
hzes = []
for ix in range(self.nval):
hzes.append(0.)
if self.hzptr[ix] > self.SEUILV:
pond.append(1.0)
pondloc.append(1.0)
else:
pond.append(0.0)
pondloc.append(0.0)
# Examinate each pitch value
for ix in range(self.nval):
# Current interval to analyze: from dpx to fpx
dpx = ix - int(self.lfen1 / 2)
fpx = dpx + self.lfen1 + 1
# BB: do not go out of the range!
if dpx < 0:
dpx = 0
if fpx > self.nval:
fpx = self.nval
# copy original pond values for the current interval
for i in range(dpx, fpx):
pondloc[i] = pond[i]
nsup = 0
nsupr = -1
xc = yc = 0.0
ret_rgp = True
while nsup > nsupr:
nsupr = nsup
nsup = 0
try:
# Estimate values of: a0, a1, a2
self.calcrgp(pondloc, dpx, fpx-1)
except Exception:
# if calcrgp failed.
# print "calcrgp failed: ",e
ret_rgp = False
break
else:
# Estimate hzes
for ix2 in range(dpx, fpx):
hzes[ix2] = self.a0 + \
(self.a1 + self.a2 * float(ix2)) * \
float(ix2)
for x in range(dpx, fpx):
if self.hzptr[x] == 0. or \
(hzes[x] / self.hzptr[x]) > self.maxec:
nsup += 1
pondloc[x] = 0.
# Now estimate xc and yc for the new 'cible'
if ret_rgp is True and self.a2 != 0.:
vxc = (0.0 - self.a1) / (self.a2 + self.a2)
if (vxc > ix - self.lfen1) and (vxc < ix + self.lfen1):
vyc = self.a0 + (self.a1 + self.a2 * vxc) * vxc
if vyc > self.hzinf and vyc < self.hzsup:
xc = vxc
yc = vyc
c = Anchor()
c.set(xc, yc)
self.cib.append(c)
# ------------------------------------------------------------------
[docs] def reduc(self):
"""First target reduction of too close points."""
# initialisations
# ---------------
xdist = []
ydist = []
dist = []
for i in range(self.nval):
xdist.append(-1.)
ydist.append(-1.)
dist.append(-1.)
lf = int(self.lfen2 / 2)
xds = yds = 0.
np = 0
# xdist and ydist estimations
for i in range(self.nval-1):
# j1 and j2 estimations (interval min and max values)
j1 = 0
if i > lf:
j1 = i - lf
j2 = self.nval - 1
if i+lf < self.nval-1:
j2 = i + lf
# left (g means left)
sxg = syg = 0.
ng = 0
for j in range(j1, i+1):
if self.cib[j].y > self.SEUILV:
sxg = sxg + self.cib[j].x
syg = syg + self.cib[j].y
ng += 1
# right (d means right)
sxd = syd = 0.
nd = 0
for j in range(i+1, j2):
if self.cib[j].y > self.SEUILV:
sxd = sxd + self.cib[j].x
syd = syd + self.cib[j].y
nd += 1
# xdist[i] and ydist[i] evaluations
if nd * ng > 0:
xdist[i] = math.fabs(sxg / ng - sxd / nd)
ydist[i] = math.fabs(syg / ng - syd / nd)
xds = xds + xdist[i]
yds = yds + ydist[i]
np += 1
# end for
if np == 0 or xds == 0. or yds == 0.:
raise ValueError('Not enough values more than ' +
str(self.SEUILV) + ' hz \n')
# dist estimation (on pondere par la distance moyenne)
# ----------------------------------------------------
px = float(np) / xds
py = float(np) / yds
for i in range(self.nval):
if xdist[i] > 0.:
dist[i] = (xdist[i] * px + ydist[i] * py) / (px + py)
# Cherche les maxs des pics de dist > seuil
# -----------------------------------------
# Seuil = moy des dist ponderees
seuil = 2. / (px + py)
susseuil = False
# Add the start value (=0)
xd = list()
xd.append(0)
xmax = 0
for i in range(self.nval):
if len(xd) > int(self.nval/2):
raise Exception('Too many partitions (', len(xd), ')\n')
if susseuil is False:
if dist[i] > seuil:
susseuil = True
xmax = i
else:
if dist[i] > dist[xmax]:
xmax = i
if dist[i] < seuil and xmax > 0:
xd.append(xmax)
susseuil = False
# end for
# do not forget the last analyzed value!
if susseuil is True:
xd.append(xmax)
# Add the final value (=nval)
xd.append(self.nval)
# Partition sur les x
# -------------------
for ip in range(len(xd)-1):
# bornes partition courante
parinf = xd[ip]
parsup = xd[ip + 1]
sx = sx2 = sy = sy2 = 0.
n = 0
# moyenne sigma
for j in range(parinf, parsup):
# sur la pop d'une partition
if self.cib[j].y > 0.:
sx += self.cib[j].x
sx2 += self.cib[j].x * self.cib[j].x
sy += self.cib[j].y
sy2 += self.cib[j].y * self.cib[j].y
n += 1
# pour la variance
if n > 1:
xm = float(sx) / float(n)
ym = float(sy) / float(n)
varx = float(sx2) / float(n) - xm * xm
vary = float(sy2) / float(n) - ym * ym
# cas ou variance devrait etre == +epsilon
if varx <= 0.:
varx = 0.1
if vary <= 0.:
vary = 0.1
et2x = self.FSIGMA * math.sqrt(varx)
et2y = self.FSIGMA * math.sqrt(vary)
seuilbx = xm - et2x
seuilhx = xm + et2x
seuilby = ym - et2y
seuilhy = ym + et2y
# Elimination (set cib to 0)
for j in range(parinf, parsup):
if self.cib[j].y > 0. and \
(self.cib[j].x < seuilbx or
self.cib[j].x > seuilhx or
self.cib[j].y < seuilby or
self.cib[j].y > seuilhy):
self.cib[j].x = 0.
self.cib[j].y = 0.
# Recalcule moyennes
# ------------------
sx = sy = 0.
n = 0
for j in range(parinf, parsup):
if self.cib[j].y > 0.:
sx += self.cib[j].x
sy += self.cib[j].y
n += 1
# Reduit la liste des cibles
if n > 0:
cibred_cour = Anchor()
cibred_cour.set(sx/n, sy/n, n)
ncibr = len(self.cibred) - 1
if ncibr < 0:
ncibr = 0
self.cibred.append(cibred_cour)
else:
# si les cibred[].x ne sont pas strictement croissants
# on ecrase la cible ayant le poids le moins fort
if cibred_cour.x > self.cibred[ncibr].x:
# 1 cibred en + car t croissant
ncibr += 1
self.cibred.append(cibred_cour)
else:
# t <= precedent
if cibred_cour.p > self.cibred[ncibr].p:
# si p courant >, ecrase la precedente
self.cibred[ncibr].set(cibred_cour.x,
cibred_cour.y,
cibred_cour.p)
# ------------------------------------------------------------------
[docs] def reduc2(self):
"""reduc2.
2eme filtrage des cibles trop proches en t [et Hz]
"""
# classe ordre temporel croissant les cibred
c = quicksortcib(self.cibred)
self.cibred = c
self.cibred2.append(self.cibred[0])
pnred2 = 0
assert(self.seuilrapp_y > 0.)
ncibr_brut = len(self.cibred)
for i in range(1, ncibr_brut):
delta_x = self.cibred[i].x - self.cibred2[pnred2].x
if float(delta_x) < float(self.seuildiff_x):
if math.fabs(float((self.cibred[i].y - self.cibred2[pnred2].y)) /
self.cibred2[pnred2].y) < self.seuilrapp_y:
self.cibred2[pnred2].x = (self.cibred2[pnred2].x +
self.cibred[i].x) / 2.
self.cibred2[pnred2].y = (self.cibred2[pnred2].y +
self.cibred[i].y) / 2.
self.cibred2[pnred2].p = (self.cibred2[pnred2].p +
self.cibred[i].p)
else:
if self.cibred2[pnred2].p < self.cibred[i].p:
self.cibred2[pnred2].set(self.cibred[i].x,
self.cibred[i].y,
self.cibred[i].p)
else:
pnred2 += 1
self.cibred2.append(self.cibred[i])
# ------------------------------------------------------------------
[docs] def borne(self):
"""borne.
Principes:
calcul borne G (D) si 1ere (derniere) cible est
( > (debut_voisement+halo) )
( < (fin_voisement -halo) )
ce pt de debut(fin) voisement == frontiere
cible extremite == ancre
regression quadratique sur Hz de [frontiere ancre]
"""
halo = self.HALO_BORNE_TRAME
ancre = Anchor()
borne = Anchor()
# Borne gauche
# ------------
# Recherche 1er voise
premier_voise = 0
while premier_voise < self.nval and \
self.hzptr[premier_voise] < self.SEUILV:
premier_voise += 1
if int(self.cibred2[0].x) > (premier_voise + halo):
# origine des t : ancre.x, et des y : ancre.y
ancre = self.cibred2[0]
sx2y = 0.
sx4 = 0.
j = 0
for i in range(int(ancre.x), 0):
if self.hzptr[i] > self.SEUILV:
x2 = float(j) * float(j)
sx2y += (x2 * (self.hzptr[i] - ancre.y))
sx4 += (x2 * x2)
j += 1
frontiere = float(premier_voise)
a = 0.
if sx4 > 0.:
a = sx2y / sx4
borne.x = frontiere - (ancre.x - frontiere)
borne.y = ancre.y + \
(2 * a * (ancre.x - frontiere) * (ancre.x - frontiere))
# recherche dernier voisement
dernier_voise = self.nval - 1
while dernier_voise >= 0 and self.hzptr[dernier_voise] < self.SEUILV:
dernier_voise -= 1
# ################################################################## #
# nred2 = len(self.cibred2)
# if( int(self.cibred2[nred2-1].x) < (dernier_voise - halo)):
# origine des t : ancre.x, et des y : ancre.y
# ancre = self.cibred2[nred2-1]
# sx2y = 0.
# sx4 = 0.
# j = 0
# for i in range( int(ancre.x),self.nval):
# if self.hzptr[i] > self.SEUILV:
# x2 = float(j) * float(j)
# sx2y += (x2 * (self.hzptr[i] - ancre.y))
# sx4 += (x2*x2)
# j += 1
# frontiere = float(dernier_voise)
# a = 0.
# if (sx4 > 0.):
# a = sx2y / sx4
# borne.set_x( frontiere + (frontiere - ancre.x) )
# borne.set_y( ancre.y +
# (2. * a * (ancre.x - frontiere) * (ancre.x - frontiere)) )
# ------------------------------------------------------------------
[docs] def annotate(self, pitch_values):
"""Apply momel from a vector of pitch values, one each 0.01 sec.
:param pitch_values: (list)
:returns: list of selected anchors
"""
# Get pitch values
self.initialize()
self.set_pitch_array(pitch_values)
if self.ELIM_GLITCH is True:
self.elim_glitch()
self.cible()
self.reduc()
if len(self.cibred) == 0:
raise Exception("No left point after the first pass of point "
"reduction.\n")
self.reduc2()
if len(self.cibred2) == 0:
raise Exception("No left point after the second pass of point "
"reduction.\n")
self.borne()
return self.cibred2