# -*- coding: UTF-8 -*-
"""
:filename: sppas.src.annotations.SearchIPUs.silences.py
:author: Brigitte Bigi
:contact: develop@sppas.org
:summary: search for silences in a channel.
.. _This file is part of SPPAS: http://www.sppas.org/
..
-------------------------------------------------------------------------
___ __ __ __ ___
/ | \ | \ | \ / the automatic
\__ |__/ |__/ |___| \__ annotation and
\ | | | | \ analysis
___/ | | | | ___/ of speech
Copyright (C) 2011-2021 Brigitte Bigi
Laboratoire Parole et Langage, Aix-en-Provence, France
Use of this software is governed by the GNU Public License, version 3.
SPPAS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
SPPAS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SPPAS. If not, see <http://www.gnu.org/licenses/>.
This banner notice must not be removed.
-------------------------------------------------------------------------
"""
import logging
from sppas.src.audiodata.channel import sppasChannel
from sppas.src.audiodata.channelvolume import sppasChannelVolume
# ---------------------------------------------------------------------------
class sppasSilences(object):
    """Silence search on a channel of an audio file.

    Silences are stored in a list of (from_pos, to_pos) tuples, indicating
    the frame positions at which each silence begins and ends respectively.

    The instance is sized (``len``), iterable and indexable: all three
    operate on the current list of silences.

    """

    def __init__(self, channel, win_len=0.020, vagueness=0.005):
        """Create a sppasSilences instance.

        :param channel: (sppasChannel) the input channel, or None
        :param win_len: (float) duration of a window, in seconds
        :param vagueness: (float) window length used to refine the
            boundaries. Its maximum value is win_len: a larger value is
            reduced to win_len, exactly like set_vagueness() does.

        The duration of a window (win_len) is the one used to estimate
        the rms values.

        NOTE(review): the original documentation relates the vagueness to
        the radius of the boundaries (see sppasPoint) but states the factor
        inconsistently -- to be confirmed against the callers.

        """
        self._win_len = win_len
        # Enforce the documented maximum, consistently with set_vagueness().
        self._vagueness = min(vagueness, win_len)
        self._channel = None
        self.__volume_stats = None
        self.__silences = list()
        if channel is not None:
            self.set_channel(channel)

    # -----------------------------------------------------------------------
    # Getters and setters
    # -----------------------------------------------------------------------

    def set_vagueness(self, vagueness):
        """Set the window length used to refine the boundaries.

        :param vagueness: (float) Duration in seconds. A value greater
            than win_len is reduced to win_len.

        """
        self._vagueness = min(vagueness, self._win_len)

    # -----------------------------------------------------------------------

    def get_vagueness(self):
        """Return the vagueness value (float)."""
        return self._vagueness

    # -----------------------------------------------------------------------

    def set_channel(self, channel):
        """Set a channel, then reset all previous results.

        The volume statistics are re-estimated on the new channel with
        windows of win_len, and the list of silences is emptied.

        :param channel: (sppasChannel)
        :raises: TypeError if channel is not a sppasChannel

        """
        if not isinstance(channel, sppasChannel):
            raise TypeError('Expected a sppasChannel, got {:s} instead'
                            ''.format(str(type(channel))))
        self._channel = channel
        self.__volume_stats = sppasChannelVolume(channel, self._win_len)
        self.__silences = list()

    # -----------------------------------------------------------------------

    def get_volstats(self):
        """Return the sppasChannelVolume() estimated on the channel.

        It is None as long as no channel was set.

        """
        return self.__volume_stats

    # -----------------------------------------------------------------------

    def set_silences(self, silences):
        """Fix manually the silences.

        To be used carefully!

        :param silences: (list of (start_pos, end_pos)) Each item is an
            indexable pair (tuple or list); both positions are converted
            to int.
        :raises: TypeError if silences is not a list

        """
        if not isinstance(silences, list):
            # str() is required: a type object does not support '{:s}'.
            raise TypeError('Expected a list, got {:s}'
                            ''.format(str(type(silences))))
        # Build new tuples instead of assigning into the items: the
        # documented items are tuples, which are immutable.
        self.__silences = [(int(v[0]), int(v[1])) for v in silences]

    # -----------------------------------------------------------------------

    def reset_silences(self):
        """Reset silences to an empty list."""
        self.__silences = list()

    # -----------------------------------------------------------------------
    # Utility methods for tracks
    # -----------------------------------------------------------------------

    def track_data(self, tracks):
        """Yield the track data: a set of frames for each track.

        Nothing is yielded if no channel was set. The reading position
        of the channel is moved by this method.

        :param tracks: (list of tuples) List of (from_pos, to_pos)
        :raises: ValueError if a from_pos is more than 10 frames after
            the end of the channel

        """
        if self._channel is None:
            return
        nframes = self._channel.get_nframes()
        for from_pos, to_pos in tracks:
            if from_pos > nframes:
                # Accept a "DELTA" of 10 frames, in case of corrupted data.
                if from_pos - 10 > nframes:
                    raise ValueError("Position {:d} not in range({:d})"
                                     "".format(from_pos, nframes))
                from_pos = nframes
            # Go to the provided position
            self._channel.seek(from_pos)
            # Give the related frames
            yield self._channel.get_frames(to_pos - from_pos)

    # -----------------------------------------------------------------------
    # Silence detection
    # -----------------------------------------------------------------------

    def fix_threshold_vol(self):
        """Fix the threshold for tracks/silences segmentation.

        This is an observation of the distribution of the rms values.

        Side effect: when the median is higher than the mean, the rms
        values above the one at 85% of the sorted distribution are
        replaced by that value in the volume statistics.

        :returns: (int) volume value

        """
        volumes = sorted(self.__volume_stats.volumes())
        vmin = max(self.__volume_stats.min(), 0)  # prevent negative values
        logging.info("RMS min={:d}".format(int(vmin)))
        vmean = self.__volume_stats.mean()
        logging.info("RMS mean={:.2f}".format(vmean))
        vmedian = self.__volume_stats.median()
        logging.info("RMS median={:.2f}".format(vmedian))
        vvar = self.__volume_stats.coefvariation()
        logging.info("RMS coef. var={:.2f}".format(vvar))

        # Remove very high volume values (outliers)
        # only for distributions with a too high variability
        if vmedian > vmean:
            logging.debug('The RMS distribution need to be normalized.')
            rms_threshold = volumes[int(0.85 * len(volumes))]
            for i, v in enumerate(self.__volume_stats):
                if v > rms_threshold:
                    self.__volume_stats.set_volume_value(i, rms_threshold)
            # The statistics changed: estimate them again.
            vmean = self.__volume_stats.mean()
            vmedian = self.__volume_stats.median()
            vvar = self.__volume_stats.coefvariation()

        # Normal situation... (more than 75% of the files!!!)
        vcvar = 1.5 * vvar
        threshold = int(vmin) + int((vmean - vcvar))

        # Alternative, in case the audio is not as good as expected!
        # (too low volume, or outliers which make the coeff var very high)
        if vmedian > vmean:
            # often means a lot of low volume values and some very high.
            # 'volumes' was sorted before the clipping above; the clipping
            # only modifies values above the one at index 85%, so the value
            # picked here is not affected.
            median_index = 0.55 * len(volumes)
            threshold = volumes[int(median_index)]
            logging.debug(' ... threshold: estimator exception 1 - median > mean')
        elif vcvar > vmean:
            if vmedian < (vmean * 0.2):
                # for distributions with a too low variability
                threshold = int(vmin) + int((vmean - vmedian))
                logging.debug(' ... threshold: estimator exception 2 - median < 0.2*mean')
            else:
                # often means some crazy values (very rare)
                threshold = int(vmin) + int(0.2 * float(vmean))
                logging.debug(' ... threshold: estimator exception 3 - vcvar > mean')
        else:
            logging.debug(' ... threshold: normal estimator')

        logging.info('Threshold value for the search of silences: {:d}'
                     ''.format(int(threshold)))
        return threshold

    # -----------------------------------------------------------------------

    def search_silences(self, threshold=0):
        """Search windows with a volume lesser than a given threshold.

        This is then a search for silences. All windows with a volume
        higher than or equal to the threshold are considered as tracks and
        are not included in the result. Blocks of silences which are not
        longer than 2*win_len are also removed from the result.

        Any previously stored list of silences is replaced.

        :param threshold: (int) Expected minimum volume (rms value)
            If threshold is set to 0, fix_threshold_vol() will assign a value.
        :returns: the threshold that was used, or 0 if no channel was set

        """
        if self._channel is None:
            return 0
        if threshold == 0:
            threshold = self.fix_threshold_vol()

        # Number of frames in a window of the volume statistics
        win_frames = self.__volume_stats.get_winlen() * self._channel.get_framerate()

        # Scan the volumes: consecutive windows with a volume lower than
        # the threshold are merged into a single silence.
        silences = list()
        idx_begin = None   # index of the first window of the current silence
        for i, v in enumerate(self.__volume_stats):
            if v < threshold:
                # It's a small enough volume to consider the window a silence
                if idx_begin is None:
                    # It's the beginning of a block of silences
                    idx_begin = i
                # else: it's the continuation of a silence
            elif idx_begin is not None:
                # It's the first window of an IPU, so the previous window
                # was the last one of a silence. The silence is stored as
                # ending where this last silent window begins.
                from_pos = int(idx_begin * win_frames)
                to_pos = int((i - 1) * win_frames)
                silences.append((from_pos, to_pos))
                idx_begin = None
            # else: it's the continuation of an IPU

        # Last interval: the channel ends inside a silence.
        if idx_begin is not None:
            # Same position estimation as inside the loop, so that a given
            # window index always gives the same frame position.
            silences.append((int(idx_begin * win_frames),
                             self._channel.get_nframes()))

        self.__silences = silences
        # Filter the current very small windows
        self.__filter_silences(2. * self._win_len)
        return threshold

    # -----------------------------------------------------------------------

    def filter_silences(self, threshold, min_sil_dur=0.200):
        """Filter the current silences.

        Both boundaries of each silence are adjusted with a window of
        'vagueness' duration, then the silences which are not longer than
        min_sil_dur are removed.

        :param threshold: (int) Expected minimum volume (rms value)
            If threshold is set to 0, fix_threshold_vol() will assign a value.
        :param min_sil_dur: (float) Minimum silence duration in seconds
        :returns: Number of silences with the expected minimum duration

        """
        if not self.__silences:
            return 0
        if threshold == 0:
            threshold = self.fix_threshold_vol()

        # Adjust boundaries of the silences
        self.__silences = [
            (self.__adjust_bound(from_pos, threshold, direction=-1),
             self.__adjust_bound(to_pos, threshold, direction=1))
            for (from_pos, to_pos) in self.__silences
        ]

        # Re-filter
        self.__filter_silences(min_sil_dur)
        return len(self.__silences)

    # -----------------------------------------------------------------------

    def filter_silences_from_tracks(self, min_track_dur=0.60):
        """Filter the given silences to remove very small tracks.

        The silences are re-created from the tracks which are longer than
        min_track_dur. Nothing is changed if there are less than 3
        silences, or if no track is long enough.

        NOTE(review): this relies on self.extract_tracks(), which is not
        defined in the part of the class visible here -- presumably it is
        provided elsewhere (e.g. by a subclass); to be confirmed.

        :param min_track_dur: (float) Minimum duration of a track, in seconds
        :returns: None. The stored silences are updated in place.

        """
        if len(self.__silences) < 3:
            return
        tracks = self.extract_tracks(min_track_dur, 0., 0.)
        if not tracks:
            # No track at all: nothing to re-create the silences from.
            return

        # Remove too short tracks
        framerate = float(self._channel.get_framerate())
        keep_tracks = [
            (from_track, to_track)
            for (from_track, to_track) in tracks
            if float(to_track - from_track) / framerate > min_track_dur
        ]
        if not keep_tracks:
            # All the tracks are too short: keep the silences unchanged
            # instead of failing on keep_tracks[0].
            return

        # Re-create silences from the selected tracks
        filtered_sil = list()

        # first silence
        # NOTE(review): it ends at the end of the 1st silence, not at the
        # beginning of the 1st kept track -- confirm this is intended when
        # the first tracks are removed.
        if self.__silences[0][0] < keep_tracks[0][0]:
            filtered_sil.append((self.__silences[0][0], self.__silences[0][1]))

        # silences between tracks
        prev_track_end = -1
        for (from_track, to_track) in keep_tracks:
            if prev_track_end > -1:
                filtered_sil.append((int(prev_track_end), int(from_track)))
            prev_track_end = to_track

        # last silence
        # NOTE(review): it begins at the end of the last track of 'tracks',
        # even if this track was not kept -- confirm this is intended.
        to_pos = self._channel.get_nframes()
        last_track_end = tracks[-1][1]
        if (to_pos - last_track_end) > 0:
            filtered_sil.append((int(last_track_end), int(to_pos)))

        self.__silences = filtered_sil

    # -----------------------------------------------------------------------

    def __filter_silences(self, min_sil_dur=0.200):
        """Remove the silences which are not longer than a given duration.

        :param min_sil_dur: (float) Minimum silence duration in seconds
        :returns: None. The stored silences are updated in place.

        """
        if not self.__silences:
            return
        framerate = float(self._channel.get_framerate())
        self.__silences = [
            (start_pos, end_pos)
            for (start_pos, end_pos) in self.__silences
            if float(end_pos - start_pos) / framerate > min_sil_dur
        ]

    # -----------------------------------------------------------------------

    def __adjust_bound(self, pos, threshold, direction=0):
        """Adjust the position of a silence around a given position.

        Here "around" the position means the frames which are read from
        1.5 windows before the position (or from the beginning of the
        channel) and for a duration of 4.5 windows. The rms values of
        these frames are estimated with windows of 'vagueness' duration.

        The position is returned unchanged if the vagueness is equal to
        win_len, if the direction is neither -1 nor 1, or if no rms value
        reaches the threshold.

        The reading position of the channel is moved by this method.

        :param pos: (int) Initial position of the silence
        :param threshold: (int) RMS threshold value for a silence
        :param direction: (int) -1 to adjust the beginning of a silence
            (ipu | silence), 1 to adjust its end (silence | ipu)
        :returns: new position

        """
        if self._vagueness == self._win_len:
            return pos
        if direction not in (-1, 1):
            return pos

        # Extract the frames of the windows around the pos
        framerate = self._channel.get_framerate()
        delta = int(1.5 * self.__volume_stats.get_winlen() * framerate)
        start_pos = int(max(pos - delta, 0))
        self._channel.seek(start_pos)
        frames = self._channel.get_frames(int(delta * 3))

        # Create a channel and estimate volume values with a window
        # of vagueness, i.e. more precise than the original one.
        c = sppasChannel(framerate, self._channel.get_sampwidth(), frames)
        vol_stats = sppasChannelVolume(c, self._vagueness)

        # Number of frames of a window of vagueness
        step = int(self._vagueness * framerate)

        # Can reduce the silence? the idea is to not miss low-level volume
        # of the beginning of the first phoneme or the end of the last one.
        new_pos = pos
        if direction == 1:  # silence | ipu
            # start to compare at the beginning of the extract: as soon as
            # a rms value is higher than the threshold, the silence ended.
            for idx, v in enumerate(vol_stats):
                if v > threshold:
                    new_pos = start_pos + idx * step
                    break
        else:  # direction == -1: ipu | silence
            # start to compare at the end of the extract: as soon as a rms
            # value is higher than or equal to the threshold, the silence
            # starts at the end of the corresponding window.
            idx = len(vol_stats)
            for v in reversed(vol_stats):
                if v >= threshold:
                    new_pos = start_pos + idx * step
                    break
                idx -= 1

        return new_pos

    # -----------------------------------------------------------------------
    # overloads
    # -----------------------------------------------------------------------

    def __len__(self):
        """Return the number of silences."""
        return len(self.__silences)

    def __iter__(self):
        """Iterate over the (from_pos, to_pos) silences."""
        for x in self.__silences:
            yield x

    def __getitem__(self, i):
        """Return the i-th (from_pos, to_pos) silence."""
        return self.__silences[i]