import logging

# SPPAS-internal imports. The module paths below are assumed from the
# usual SPPAS package layout and may need to be adapted:
from sppas.src.anndata import sppasTranscription, sppasTier
from sppas.src.anndata import sppasLocation, sppasInterval
from sppas.src.anndata import sppasTrsRW
from sppas.src.annotations.baseannot import sppasBaseAnnotation
from sppas.src.annotations.searchtier import sppasFindTier
from sppas.src.annotations.annotationsexc import AnnotationOptionError
from sppas.src.annotations.annotationsexc import NoTierInputError
from sppas.src.annotations.annotationsexc import EmptyOutputError

from .datastructs import DataSpeaker, LexReprise
from .rules import OtherRules


class sppasLexRep(sppasBaseAnnotation):
    """SPPAS integration of the speaker lexical variation annotation.

    Main differences compared to repetitions: the 'span' option fixes the
    maximum number of continuous tokens to analyze, and the span window
    has a duration limit.

    """

    def __init__(self, log=None):
        """Create a new sppasLexRep instance.

        Log is used for a better communication of the annotation process
        and its results. If None, logs are redirected to the default
        logging system.

        :param log: (sppasLog) Human-readable logs.

        """
        super(sppasLexRep, self).__init__('lexrep.json', log)
        self.__rules = OtherRules(self._stop_words)

    def fix_options(self, options):
        """Fix all options.

        :param options: list of sppasOption instances

        """
        for opt in options:
            key = opt.get_key()
            if 'spandur' == key:
                self.set_span_duration(opt.get_value())
            elif 'span' == key:
                self.set_span(opt.get_value())
            elif 'stopwords' == key:
                self.set_stopwords(opt.get_value())
            elif 'alpha' == key:
                self.set_alpha(opt.get_value())
            elif 'pattern' in key:
                self._options[key] = opt.get_value()
            else:
                raise AnnotationOptionError(key)

    def set_span(self, value):
        """Set the max span, in number of words.

        :param value: (int) Max number of tokens in a span window.
        :raises: ValueError if value is out of the [5; 200] range.

        """
        value = int(value)
        if value < 5 or value > 200:
            raise ValueError('Invalid span value {:d}: an integer in '
                             'range [5; 200] was expected.'.format(value))
        self._options['span'] = value

    def set_span_duration(self, value):
        """Set the spandur option.

        :param value: (float, int) Max duration of a span window, in
            seconds.

        """
        self._options['spandur'] = float(value)

    def set_stopwords(self, value):
        """Set the stopwords option.

        :param value: (bool) Add tiers with the estimated stop words
            to the result.

        """
        self._options['stopwords'] = bool(value)

    def set_alpha(self, value):
        """Set the alpha option.

        :param value: (float) Coefficient used to estimate the stop words.
        :raises: ValueError if value is out of the [0.1; 2.0] range.

        """
        value = float(value)
        if value < 0.1 or value > 2.0:
            raise ValueError('Invalid alpha value {:f}: a value in range '
                             '[0.1; 2.0] was expected.'.format(value))
        self._options['alpha'] = value

    @staticmethod
    def tier_to_list(tier, loc=False):
        """Create a list with the tokens contained in a tier.

        :param tier: (sppasTier)
        :param loc: (bool) if True, also create the corresponding list
            of sppasLocation()
        :returns: (list, list) list of unicode contents and list of
            locations

        """
        content = list()
        localiz = list()
        for ann in tier:
            for label in ann.get_labels():
                for tag, score in label:
                    # Only speech tokens are kept; silences, pauses and
                    # other non-speech events are skipped.
                    if tag.is_speech():
                        content.append(tag.get_content())
                        if loc is True:
                            localiz.append(ann.get_location())
        return content, localiz
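
    # Illustrative sketch of tier_to_list(); 'tier' and its labels are
    # invented. For a tier with two annotations labelled "bonjour" and
    # "#" (a silence):
    #
    #     content, localiz = sppasLexRep.tier_to_list(tier, loc=True)
    #     # content -> ["bonjour"]: the non-speech tag is skipped,
    #     # localiz -> [location of the "bonjour" annotation]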

    @staticmethod
    def get_longest(speaker1, speaker2):
        """Return the index of the last token of the longest repeated sequence.

        No matter if a non-speech event occurs in the middle of the
        repeated sequence or in the middle of the source sequence, and
        no matter if tokens are not repeated in the same order.

        :param speaker1: (DataSpeaker) Entries of speaker 1
        :param speaker2: (DataSpeaker) Entries of speaker 2
        :returns: (int) Index or -1

        """
        last_token = -1
        for index1 in range(len(speaker1)):
            # Non-speech events are ignored: the scan simply continues.
            if speaker1.is_word(index1) is True:
                repet_idx = speaker1.is_word_repeated(index1, 0, speaker2)
                if repet_idx > -1:
                    last_token = index1
                else:
                    # Stop at the first word that is not repeated.
                    break
        return last_token
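
    # Illustrative sketch of get_longest(), assuming DataSpeaker wraps
    # a plain token list as in windowing() below; token values invented:
    #
    #     spk1 = DataSpeaker(["le", "petit", "chat"])
    #     spk2 = DataSpeaker(["chat", "le", "petit"])
    #     sppasLexRep.get_longest(spk1, spk2)   # -> 2
    #
    # All three tokens of spk1 are found in spk2, in any order, so the
    # index of the last repeated token of spk1 is returned.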

    def select(self, index1, speaker1, speaker2):
        """Decide whether a repetition has to be kept.

        :param index1: (int) End index of the entry of the source (speaker1)
        :param speaker1: (DataSpeaker) Entries of speaker 1
        :param speaker2: (DataSpeaker) Entries of speaker 2
        :returns: (bool) True if the repetition is selected

        """
        keep_me = self.__rules.rule_syntagme(0, index1, speaker1)
        if keep_me is False:
            keep_me = self.__rules.rule_strict(0, index1, speaker1, speaker2)
        return keep_me

    @staticmethod
    def create_tier(sources, locations):
        """Create a tier from the content and localization lists.

        :param sources: (list) LexReprise sources -- in fact, the token
            indexes.
        :param locations: (list) list of locations corresponding to the
            tokens
        :returns: (sppasTier)

        """
        tier_content = sppasTier('LexRepContent')
        for lexreprise in sources:
            start_idx = lexreprise.get_start()
            end_idx = start_idx + lexreprise.get_end()
            # The annotation ranges from the beginning of the first
            # token to the end of the last token of the reprise.
            loc_begin = locations[start_idx]
            loc_end = locations[end_idx]
            begin_point = loc_begin.get_lowest_localization()
            end_point = loc_end.get_highest_localization()
            location = sppasLocation(sppasInterval(begin_point, end_point))
            tier_content.create_annotation(location, lexreprise.get_labels())
        return tier_content

    def windowing(self, content, location=None):
        """Return the list of DataSpeaker matching the given content.

        :param content: (list) List of entries
        :param location: (list) List of locations of the entries
        :returns: list of DataSpeaker

        """
        span_tok = self._options['span']
        span_dur = self._options['spandur']
        windows = list()
        for i in range(len(content)):
            end_size = min(span_tok, len(content) - i)
            if location is not None and end_size > 1:
                # Reduce the window size, token by token, until its
                # duration is below the 'spandur' limit.
                win_loc = location[i:i + end_size]
                start_point = win_loc[0].get_lowest_localization()
                end_point = win_loc[end_size - 1].get_highest_localization()
                win_dur = end_point.get_midpoint() - start_point.get_midpoint()
                while win_dur > span_dur and end_size > 0:
                    end_point = win_loc[end_size - 1].get_highest_localization()
                    win_dur = end_point.get_midpoint() - start_point.get_midpoint()
                    if win_dur <= span_dur:
                        break
                    end_size -= 1
                if end_size < min(span_tok, len(content) - i):
                    logging.debug(
                        ' ... window was reduced to {:d} tokens.'
                        ''.format(end_size))
            win_tok = content[i:i + end_size]
            windows.append(DataSpeaker(win_tok))
        return windows
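
    # Numeric sketch of the duration cap in windowing(), with invented
    # values: for tokens lasting 1 second each, span=8 and spandur=6.0,
    # the window starting at a given token is first 8 tokens long (8s),
    # then shrunk one token at a time down to 6 tokens (6s <= spandur).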

    def lexical_variation_detect(self, tier1, tier2):
        """Detect the lexical variations between 2 tiers.

        :param tier1: (sppasTier)
        :param tier2: (sppasTier)
        :returns: (sppasTier, sppasTier)

        """
        # Get the tokens and their locations.
        content_tier1, loc_tier1 = self.tier_to_list(tier1, True)
        content_tier2, loc_tier2 = self.tier_to_list(tier2, True)
        # Cut the contents into span windows.
        window_list1 = self.windowing(content_tier1, loc_tier1)
        window_list2 = self.windowing(content_tier2, loc_tier2)
        # Detect the sources of each speaker echoed by the other one.
        sources1 = self._detect_all_sources(window_list1, window_list2)
        sources2 = self._detect_all_sources(window_list2, window_list1)
        # Turn the detected sources into tiers.
        tier_lexrep1 = self.create_tier(sources1, loc_tier1)
        tier_lexrep2 = self.create_tier(sources2, loc_tier2)
        tier_lexrep1.set_name(tier_lexrep1.get_name() + '-1')
        tier_lexrep2.set_name(tier_lexrep2.get_name() + '-2')
        return tier_lexrep1, tier_lexrep2

    def get_inputs(self, input_files):
        """Return 2 tiers with time-aligned tokens.

        :param input_files: (list) The 2 lists of input file names: the
            source speaker, then the echoing speaker.
        :raise: NoTierInputError
        :return: (sppasTier, sppasTier)

        """
        if len(input_files) != 2:
            raise Exception('Invalid format of input files: exactly 2 '
                            'lists of file names were expected.')

        # Search for the source tier in the files of the 1st speaker.
        tier_src = None
        for filename in input_files[0]:
            parser = sppasTrsRW(filename)
            trs_input = parser.read()
            if tier_src is None:
                tier_src = sppasFindTier.aligned_tokens(trs_input)
        if tier_src is None:
            logging.error('A source tier with time-aligned tokens was '
                          'expected but not found.')
            raise NoTierInputError

        # Search for the echo tier in the files of the 2nd speaker.
        tier_echo = None
        for filename in input_files[1]:
            parser = sppasTrsRW(filename)
            trs_input = parser.read()
            if tier_echo is None:
                tier_echo = sppasFindTier.aligned_tokens(trs_input)
        if tier_echo is None:
            logging.error('An echo tier with time-aligned tokens was '
                          'expected but not found.')
            raise NoTierInputError

        return tier_src, tier_echo

    def run(self, input_files, output=None):
        """Run the automatic annotation process on an input.

        :param input_files: (list of list of str) Time-aligned tokens
            of the 2 speakers
        :param output: (str) The output file name
        :returns: (sppasTranscription)

        """
        # Get the tiers with the time-aligned tokens of both speakers.
        tier_tokens_src, tier_tokens_echo = self.get_inputs(input_files)
        tier1, tier2 = self.lexical_variation_detect(
            tier_tokens_src, tier_tokens_echo)

        # Create the transcription result.
        trs_output = sppasTranscription(self.name)
        trs_output.set_meta('annotation_result_of', input_files[0][0])

        # Tiers of the source speaker.
        if len(self._word_strain) > 0:
            tier_tokens_src.set_name(tier_tokens_src.get_name() + '-1')
            trs_output.append(tier_tokens_src)
        if self._options['stopwords'] is True:
            stopwords1 = self.make_stop_words(tier_tokens_src)
            stopwords1.set_name(stopwords1.get_name() + '-1')
            trs_output.append(stopwords1)
        trs_output.append(tier1)

        # Tiers of the echoing speaker.
        if len(self._word_strain) > 0:
            tier_tokens_echo.set_name(tier_tokens_echo.get_name() + '-2')
            trs_output.append(tier_tokens_echo)
        if self._options['stopwords'] is True:
            stopwords2 = self.make_stop_words(tier_tokens_echo)
            stopwords2.set_name(stopwords2.get_name() + '-2')
            trs_output.append(stopwords2)
        trs_output.append(tier2)

        # Save the result in a file.
        if output is not None:
            if len(trs_output) > 0:
                output_file = self.fix_out_file_ext(output)
                parser = sppasTrsRW(output_file)
                parser.write(trs_output)
                return [output_file]
            else:
                raise EmptyOutputError

        return trs_output

    def get_output_pattern(self):
        """Pattern this annotation uses in an output filename."""
        return self._options.get('outputpattern', '-rms')

    def _get_longest_selected(self, data_spk1, data_spk2):
        """Return the end-index of the longest selected sequence."""
        spk2_echo_idx = sppasLexRep.get_longest(data_spk1, data_spk2)
        if spk2_echo_idx != -1:
            if self.select(spk2_echo_idx, data_spk1, data_spk2):
                return spk2_echo_idx
        return -1

    @staticmethod
    def _add_source(sources, win_idx, end, dataspk):
        """Add the source in the list of sources."""
        # The membership test assumes a LexReprise compares equal to
        # its (start, end) tuple.
        if (win_idx, end) not in sources:
            lex_reprise = LexReprise(win_idx, end)
            lex_reprise.set_content(dataspk)
            sources.append(lex_reprise)

    def _detect_all_sources(self, win_spk1, win_spk2):
        """Return all reprises of speaker1 echoed by speaker2.

        :param win_spk1: (list of DataSpeaker) Windows of speaker 1
        :param win_spk2: (list of DataSpeaker) Windows of speaker 2
        :return: (list) LexReprise sources, each one holding its start
            index, its end index and its content

        """
        sources = list()
        prev_max_index = -1
        spk1_widx = 0
        while spk1_widx < len(win_spk1):
            data_spk1 = win_spk1[spk1_widx]

            # Search for the longest echo of this window in any of the
            # windows of the other speaker.
            max_index = -1
            spk2_widx = 0
            while spk2_widx < len(win_spk2):
                data_spk2 = win_spk2[spk2_widx]
                spk2_echo_idx = self._get_longest_selected(data_spk1, data_spk2)
                # Ignore a source that ends inside the previously
                # stored one.
                if spk2_echo_idx > -1 and spk1_widx + spk2_echo_idx > prev_max_index:
                    if spk2_echo_idx > max_index:
                        max_index = spk2_echo_idx
                # The echo can't be longer than the span value.
                if max_index == self._options['span']:
                    break
                spk2_widx += 1

            # Store the longest selected echo as a source.
            if max_index > -1:
                sppasLexRep._add_source(
                    sources, win_idx=spk1_widx, end=max_index,
                    dataspk=data_spk1)
                prev_max_index = spk1_widx + max_index

            spk1_widx += 1

        return sources

    def _merge_sources(self, sources):
        """Merge sources if their content is the same."""
        # Not implemented yet: the sources are returned unchanged.
        return sources
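

if __name__ == '__main__':
    # Minimal usage sketch: the file names are invented and must point
    # to existing annotated files with a tier of time-aligned tokens
    # (one list of input files per speaker).
    ann = sppasLexRep()
    ann.set_span(20)             # at most 20 tokens in a window...
    ann.set_span_duration(10.)   # ... within at most 10 seconds
    ann.set_stopwords(True)
    ann.run([["spk1-palign.xra"], ["spk2-palign.xra"]],
            output="spk1-lexrep.xra")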