# -*- coding: UTF-8 -*-
"""
:filename: sppas.src.anndata.aio.table.py
:author: Brigitte Bigi
:contact: develop@sppas.org
:summary: Export annotated data into time-tables.
.. _This file is part of SPPAS: http://www.sppas.org/
..
-------------------------------------------------------------------------
___ __ __ __ ___
/ | \ | \ | \ / the automatic
\__ |__/ |__/ |___| \__ annotation and
\ | | | | \ analysis
___/ | | | | ___/ of speech
Copyright (C) 2011-2021 Brigitte Bigi
Laboratoire Parole et Langage, Aix-en-Provence, France
Use of this software is governed by the GNU Public License, version 3.
SPPAS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
SPPAS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SPPAS. If not, see <http://www.gnu.org/licenses/>.
This banner notice must not be removed.
-------------------------------------------------------------------------
Weka is a collection of machine learning algorithms for data mining tasks:
https://www.cs.waikato.ac.nz/ml/weka/
WEKA is supporting 2 file formats:
1. ARFF: a simple ASCII file,
2. XRFF: an XML file which can be compressed with gzip.
This file is also implementing the TRA format of SPPAS:
the Table Rich Annotations format.
ONLY writers for ARFF, XRFF and TRA are implemented.
"""
import logging
import codecs
import re
from datetime import datetime
from sppas.src.config import sg
from sppas.src.config.makeunicode import sppasUnicode
from sppas.src.config.makeunicode import b
from ..anndataexc import AioNoTiersError
from ..anndataexc import TagValueError
from ..anndataexc import AioEmptyTierError
from ..tier import sppasTier
from ..ann.annlocation import sppasLocation
from ..ann.annlocation import sppasInterval
from ..ann.annlabel import sppasLabel
from ..ann.annlabel import sppasTag
from ..ann.annlocation import sppasPoint
from .aioutils import load
from .aioutils import serialize_labels
from .basetrsio import sppasBaseIO
# ----------------------------------------------------------------------------
# Reference maximum number of classes to predict.
# sppasTable.__init__ uses one tenth of this value as its default limit.
MAX_CLASS_TAGS = 100
# Reference maximum number of attribute tags to list explicitly; attributes
# with more tags are declared with a generic type instead.
# sppasTable.__init__ uses one tenth of this value as its default limit.
MAX_ATTRIBUTES_TAGS = 200
# ----------------------------------------------------------------------------
[docs]class sppasTable(sppasBaseIO):
"""SPPAS Base writer for ARFF and XRFF formats.
The following metadata of the Transcription object can be defined:
- table_instance_step: time step for the data instances. Do not
define if "table_instance_anchor" is set to a tier.
- table_max_class_tags
- table_max_attributes_tags
- table_empty_annotation_tag
- table_empty_annotation_class_tag
- table_uncertain_annotation_tag
The following metadata can be defined in a tier:
- `table_attribute` is fixed if the tier will be used as attribute
(i.e. its data will be part of the instances). The value can
be "numeric" to use distributions of probabilities or
"label" to use the annotation labels in the vector of parameters.
- `table_class` is fixed to the tier with the annotation labels to
be inferred by the classification system. No matter of the value.
- `table_instance_anchor` is fixed if the tier has to be used to
define the time intervals of the instances.
- `table_epsilon` probability of an unobserved tag.
Notice that the anchor tier can also be either an attribute tier or
the class tier.
TODO: BUG IF ANCHOR == CLASS
"""
def __init__(self, name=None):
    """Create a table writer with its default settings.

    :param name: (str) Name of this transcription. The name of the
    class is used when no name is given.

    """
    super(sppasTable, self).__init__(
        self.__class__.__name__ if name is None else name)

    # Identification of the format
    self.software = "none"
    self.trs_type = "TABLE"

    # Default limits: one tenth of the module-level maxima
    self._max_class_tags = int(MAX_CLASS_TAGS/10)
    self._max_attributes_tags = int(MAX_ATTRIBUTES_TAGS/10)

    # Default replacement tags and probability of an unobserved tag
    self._empty_annotation_tag = "none"
    self._empty_annotation_class_tag = None
    self._uncertain_annotation_tag = "?"
    self._epsilon_proba = 0.001

    # What this format is able to represent
    self._accept_multi_tiers = True
    self._accept_no_tiers = False
    self._accept_metadata = True
    self._accept_ctrl_vocab = True
    self._accept_media = False
    self._accept_hierarchy = False
    self._accept_point = True
    self._accept_interval = True
    self._accept_disjoint = True
    self._accept_alt_localization = False
    self._accept_alt_tag = True
    self._accept_radius = False
    self._accept_gaps = True  # is True only for the reference tier
    self._accept_overlaps = True
# -----------------------------------------------------------------------
# Getters and setters
# -----------------------------------------------------------------------
# -----------------------------------------------------------------------
# -----------------------------------------------------------------------
# -----------------------------------------------------------------------
# -----------------------------------------------------------------------
# -----------------------------------------------------------------------
def set_empty_annotation_tag(self, tag_str):
    """Set the string used in place of an empty annotation.

    The given string is normalized with sppasUnicode.clear_whitespace()
    before being stored.

    :param tag_str: (str) Replacement string
    :raises: TagValueError if the normalized string is empty

    """
    cleaned = sppasUnicode(tag_str).clear_whitespace()
    if len(cleaned) == 0:
        raise TagValueError(tag_str)
    self._empty_annotation_tag = cleaned
# -----------------------------------------------------------------------
def set_empty_annotation_class_tag(self, tag_str=None):
    """Set the string used in place of an empty annotation of the class tier.

    The given string is normalized with sppasUnicode.clear_whitespace()
    before being stored.

    :param tag_str: (str or None) Replacement string. None means that
    unlabelled annotations of the class tier are NOT filled, so they are
    ignored in the data.
    :raises: TagValueError if the normalized string is empty

    """
    if tag_str is None:
        self._empty_annotation_class_tag = None
        return

    cleaned = sppasUnicode(tag_str).clear_whitespace()
    if len(cleaned) == 0:
        raise TagValueError(tag_str)
    self._empty_annotation_class_tag = cleaned
# -----------------------------------------------------------------------
def set_uncertain_annotation_tag(self, tag_str):
    """Set the string which mentions an uncertain label in the annotations.

    The given string is normalized with sppasUnicode.clear_whitespace()
    before being stored.

    :param tag_str: (str) String of the uncertain label
    :raises: TagValueError if the normalized string is empty

    """
    cleaned = sppasUnicode(tag_str).clear_whitespace()
    if len(cleaned) == 0:
        raise TagValueError(tag_str)
    self._uncertain_annotation_tag = cleaned
# -----------------------------------------------------------------------
# Validation methods
# -----------------------------------------------------------------------
# -----------------------------------------------------------------------
def validate_annotations(self):
    """Prepare data to be compatible with the expected format.

    Only the tiers with the "table_attribute" or the "table_class"
    metadata are modified; the other tiers are left untouched.

        - Convert tier names
        - Delete the existing controlled vocabularies, then re-create them
        - Convert tags: fill empty tags, replace whitespace by underscores

    :raises: AioNoTiersError if there is no tier, or if no min/max
    localization can be found

    """
    if self.is_empty():
        raise AioNoTiersError("WEKA")
    min_time_point = self.get_min_loc()
    max_time_point = self.get_max_loc()
    if min_time_point is None or max_time_point is None:
        # it means there are only empty tiers in the transcription
        raise AioNoTiersError("WEKA")

    for tier in self:
        is_tier_class = tier.is_meta_key("table_class")
        is_tier_att = tier.is_meta_key("table_attribute")
        # only change the tiers to be used (no matter of the other ones!)
        if is_tier_att is False and is_tier_class is False:
            continue

        # Name of the tier: no whitespace
        name = tier.get_name()
        tier.set_name(sppasUnicode(name).clear_whitespace())

        # Delete current controlled vocabulary.
        # New ones are created once all the tags are converted.
        if tier.get_ctrl_vocab() is not None:
            tier.set_ctrl_vocab(None)

        # Convert annotation tags.
        for ann in tier:
            if ann.is_labelled():
                for label in ann.get_labels():
                    if len(label) > 0:
                        # NOTE(review): the label is modified (remove_tag
                        # and append) while it is iterated -- confirm
                        # that its iterator supports it.
                        for tag, score in label:
                            if tag.get_type() == "str":
                                # Replace whitespace by underscore and check for an empty tag.
                                tag_text = sppasUnicode(tag.get_content()).clear_whitespace()
                                if len(tag_text) == 0:
                                    # The tag is empty. We have to fill it (or not).
                                    if is_tier_class is False:
                                        tag_text = self._empty_annotation_tag
                                    else:
                                        # In the class tier, the tag is
                                        # left empty if no replacement
                                        # was fixed.
                                        if self._empty_annotation_class_tag is not None:
                                            tag_text = self._empty_annotation_class_tag
                                new_tag = sppasTag(tag_text)
                                # Set the new version of the tag to the label
                                if new_tag != tag:
                                    ann.remove_tag(tag)
                                    label.append(new_tag, score)
                    else:
                        # The label has no tag at all.
                        if is_tier_class is False:
                            # The annotation was not labelled.
                            # We have to do it.
                            ann.set_labels(sppasLabel(sppasTag(self._empty_annotation_tag)))
                        else:
                            if self._empty_annotation_class_tag is not None:
                                ann.set_labels(sppasLabel(sppasTag(self._empty_annotation_class_tag)))
            else:
                if is_tier_class is False:
                    # The annotation was not labelled. We have to do it.
                    ann.set_labels(sppasLabel(sppasTag(self._empty_annotation_tag)))
                else:
                    # An unlabelled annotation of the class tier is
                    # filled only if a replacement tag was fixed.
                    if self._empty_annotation_class_tag is not None:
                        ann.set_labels(sppasLabel(sppasTag(self._empty_annotation_class_tag)))

    # Set the controlled vocabularies
    self._create_ctrl_vocab()
# -----------------------------------------------------------------------
def validate(self):
    """Verify that the tiers can be exported as a table.

    The checks are:

        1. there are at least 2 tiers;
        2. a class tier is defined, i.e. a tier with "table_class" in
           its metadata. If none, a faked one is created. The class
           tier must not be empty;
        3. attributes are fixed, i.e. at least one tier has
           "table_attribute" in its metadata. No attribute tier is
           empty and the class tier is not an attribute;
        4. the instances can be localized: either the metadata
           "table_instance_step" is a number, or a tier has
           "table_instance_anchor" in its metadata.

    :raises: AioNoTiersError, AioEmptyTierError, IOError, ValueError

    """
    if self.is_empty() is True:
        raise AioNoTiersError("WEKA")
    if len(self) == 1:
        raise IOError("The transcription must contain at least 2 tiers.")

    # 1. The class tier
    class_tier = self._get_class_tier()
    if class_tier is None:
        logging.warning("The transcription should contain a class tier but it doesn't. A new empty one is added.")
        class_tier = self.create_class_tier()
    if class_tier.is_empty():
        raise AioEmptyTierError("WEKA", class_tier.get_name())
    sppasTable.check_max_class_tags(len(class_tier.get_ctrl_vocab()))

    # 2. The attribute tiers
    attribute_tiers = list()
    for candidate in self:
        if not candidate.is_meta_key("table_attribute"):
            continue
        attribute_tiers.append(candidate)
        if candidate is class_tier:
            raise IOError("A tier can be either an attribute or "
                          "the class. It can't be both.")
    if len(attribute_tiers) == 0:
        raise IOError("The transcription must contain attributes.")
    for candidate in attribute_tiers:
        if candidate.is_empty():
            raise AioEmptyTierError("WEKA", candidate.get_name())

    # 3. The way to localize the instances
    if self.is_meta_key("table_instance_step") is False:
        # No time step: an anchor tier is required. All tiers are queried.
        anchored = any(
            [candidate.is_meta_key("table_instance_anchor")
             for candidate in self])
    else:
        try:
            float(self.get_meta("table_instance_step"))
        except ValueError:
            raise ValueError(
                "The instance step must be a numerical value. "
                "Got {:s}".format(self.get_meta("table_instance_step")))
        anchored = True

    if not anchored:
        raise IOError("An instance time step or an anchor tier "
                      "must be defined.")
# -----------------------------------------------------------------------
def create_class_tier(self):
    """Create, append and return a faked class tier.

    The tier is named "class", it has the metadata "table_class" and
    a single annotation labelled "-" from the min to the max
    localization of this transcription.

    :returns: (sppasTier) The tier appended to this transcription
    :raises: IOError if there is no min or max localization

    """
    faked = sppasTier("class")
    faked.set_meta("table_class", "faked")

    no_bounds = self.get_min_loc() is None or self.get_max_loc() is None
    if no_bounds:
        raise IOError("A class tier can't be created: no min/max localization.")

    # One annotation covering the whole transcription
    whole = sppasInterval(self.get_min_loc().copy(),
                          self.get_max_loc().copy())
    faked.create_annotation(sppasLocation(whole), sppasLabel(sppasTag("-")))
    faked.create_ctrl_vocab()

    self.append(faked)
    return faked
# -----------------------------------------------------------------------
# Private
# -----------------------------------------------------------------------
def _create_ctrl_vocab(self):
"""Fix the controlled vocabularies of attribute tiers."""
for tier in self:
if tier.is_meta_key("table_attribute") or \
tier.is_meta_key("table_class"):
tier.create_ctrl_vocab()
# -----------------------------------------------------------------------
@staticmethod
def _tier_is_attribute(tier):
"""Check if a tier is an attribute for the classification.
:param tier: (sppasTier)
:returns: (is attribute, is numeric)
"""
if tier.is_meta_key("table_class"):
return False, False
is_att = False
is_numeric = False
if tier.is_meta_key("table_attribute"):
is_att = True
is_numeric = "numeric" in tier.get_meta("table_attribute").lower()
return is_att, is_numeric
# -----------------------------------------------------------------------
def _get_class_tier(self):
"""Return the tier which is the class or None."""
for tier in self:
if tier.is_meta_key("table_class"):
return tier
return None
# -----------------------------------------------------------------------
def _get_anchor_tier(self):
"""Return the tier which will be used to create the instances.
:return: (sppasTier or None)
"""
for tier in self:
if tier.is_meta_key("table_instance_anchor"):
return tier
return None
# -----------------------------------------------------------------------
def _get_labels(self, localization, tier):
    """Return the labels observed at the given time in the given tier.

    :param localization: (sppasPoint)
    :param tier: (sppasTier)
    :return: (list of sppasLabel) The list is never empty: it contains
    a label with the empty annotation tag if nothing was observed at
    the given time.

    """
    # Index of the annotation at the given time.
    # It is the first one in case of overlapping annotations.
    if tier.is_point() is True:
        ann_idx = tier.index(localization)
    else:
        ann_idx = tier.mindex(localization, bound=10)

    # TODO: return all sppasLabel() during the localization
    # (i.e. during the period including the vagueness) and
    # not only at the midpoint of the localization.
    # And in the same idea, we have to deal with overlapping
    # annotations.

    observed = list()
    if ann_idx != -1:
        annotation = tier[ann_idx]
        if annotation.is_labelled():
            for label in annotation.get_labels():
                for tag, score in label:
                    if tag.get_content() != "":
                        # The label itself is added, once per filled tag
                        observed.append(label)
                    else:
                        # An empty tag is replaced, its score is kept
                        observed.append(
                            sppasLabel(sppasTag(self._empty_annotation_tag), score))

    # Either the observed labels or a default one
    return observed if len(observed) > 0 \
        else [sppasLabel(sppasTag(self._empty_annotation_tag))]
# -----------------------------------------------------------------------
def _fix_all_possible_instance_steps(self, start_time, end_time, time_step=None, anchor_tier=None):
"""Fix all the possible time-points of the instances.
If an anchor tier is given, only labelled annotations are used
to create the instances.
:param start_time: (float)
:param end_time: (float)
:param time_step: (float)
:param anchor_tier: (sppasTier)
:returns: list of sppasPoint()
"""
class_tier = self._get_class_tier()
# Create the list of all possible points for the instances
all_points = list()
all_tags = list()
# A timer is used to fix the steps
if time_step is not None:
time_value = start_time
while (time_value + (time_step/2.)) < end_time:
# Fix the anchor point of the instance
midpoint = time_value + (time_step/2.)
radius = time_step/2.
point = sppasPoint(midpoint, radius)
all_points.append(point)
# next...
time_value += time_step
# An anchor class is used to fix the steps
# Only labelled annotations are selected
elif anchor_tier is not None:
for ann in anchor_tier:
localization = ann.get_location().get_best()
if ann.label_is_filled() is True:
if localization.is_point():
all_points.append(localization)
else:
# Fix the anchor point of the instance
duration = localization.duration()
midpoint = \
localization.get_begin().get_midpoint() + \
(duration.get_value() / 2.)
radius = (duration.get_value() +
duration.get_margin()) / 2.
point = sppasPoint(midpoint, radius)
all_points.append(point)
return all_points
# -----------------------------------------------------------------------
def _fix_instance_steps(self):
"""Fix the time-points to create the instances.
And fix the tag of the class to predict by the classification system.
The instances are created only for the labelled annotations of
the class tier.
If several classes were assigned, the instance is also ignored.
(we also could choose to predict the one with the better score)
:returns: List of (sppasPoint, tag content)
"""
class_tier = self._get_class_tier()
# The localization point to start the instances
begin = class_tier.get_first_point().get_midpoint()
# The localization point to finish the instances
end = class_tier.get_last_point().get_midpoint()
# Fix the list of candidates for the instance points
time_step = None
if self.is_meta_key("table_instance_step") is True:
time_step = float(self.get_meta("table_instance_step"))
anchor_tier = self._get_anchor_tier()
all_points = self._fix_all_possible_instance_steps(begin, end, time_step, anchor_tier)
logging.info("{:f} examples will be extracted".format(len(all_points)))
# Create the list of points for the instances
instance_points = list()
for point in all_points:
labels = self._get_labels(point, class_tier)
tags = self.__get_tags(labels)
# Append only if the class was labelled
# * * * WITH ONLY ONE LABEL * * *
if len(tags) == 1:
instance_points.append((point, tags[0]))
return instance_points
# -----------------------------------------------------------------------
def __get_tags(self, labels):
tags = list()
for label in labels:
if label is not None and label.is_tagged():
tag = label.get_best()
if tag.get_content() != self._empty_annotation_tag:
tags.append(tag.get_content())
return tags
# -----------------------------------------------------------------------
@staticmethod
def _scores_to_probas(tags):
"""Convert scores of a set of tags to probas.
"""
if len(tags) == 0:
return False
# Convert "None" scores into a numerical value
# then convert numerical values into probabilities.
if len(tags) == 1:
for tag in tags:
tags[tag] = 1.
else:
# Search for the minimum score
min_score = None
for tag in tags:
score = tags[tag]
if score is not None:
if min_score is None or min_score > score:
min_score = score
if min_score is None:
# None of the tags had a score.
min_score = 2.
# Assign a score to the tags if needed
for tag in tags:
score = tags[tag]
if score is None:
tags[tag] = min_score / 2.
# Convert scores to probabilities
total = float(sum(tags[tag] for tag in tags))
for tag in tags:
score = tags[tag]
tags[tag] = float(score) / total
# -----------------------------------------------------------------------
def _fix_data_instance(self, point):
    """Fix the data content of an instance.

    Create the instance at the given point with annotations of all
    attribute tiers. The class is not part of the returned list.

    A numeric attribute tier gives one probability for each tag of
    its controlled vocabulary. Any other attribute tier gives one
    string: the contents of the best tags of its labels, separated by
    a whitespace.

    TODO: fix data instances if points tiers, boolean/int/float tags
        - tiers with points
        - tiers with boolean tags
        - tiers with int/float tags: should be converted to labels

    :param point: (sppasPoint) The moment to be used
    :returns: list of attributes (str)

    """
    instances_data = list()
    for tier in self:
        is_att, is_numeric = sppasTable._tier_is_attribute(tier)
        if is_att is False:
            continue

        # Get all labels of the annotation
        labels = self._get_labels(point, tier)

        if is_numeric is True:
            # Create a list of tags: the scores of a same tag are added
            # NOTE(review): a "None" score can't be added to a previous
            # one -- confirm that scores are always numbers here.
            tags = dict()
            for label in labels:
                if label is None:
                    continue
                if len(label) == 0:
                    continue
                for tag, score in label:
                    if tag in tags:
                        tags[tag] += score
                    else:
                        tags[tag] = score

            # Scores of observed tags are converted to probabilities
            self._scores_to_probas(tags)

            # Score of un-observed tags are all set to an
            # epsilon probability: the default one or the one fixed
            # in the metadata of the tier
            nb_eps_tags = len(tier.get_ctrl_vocab()) - len(tags)
            epsilon = self._epsilon_proba
            if tier.is_meta_key('table_epsilon'):
                epsilon = float(tier.get_meta('table_epsilon'))

            # ... if an uncertain tag is observed, its probability is
            # shared among the un-observed tags, and it is then removed
            # from the observed ones
            uncertain_tag = sppasTag(self._uncertain_annotation_tag)
            if uncertain_tag in tags:
                score = tags[uncertain_tag]
                nb_eps_tags += 1
                epsilon = score / float(nb_eps_tags)
                del tags[uncertain_tag]

            # All possible tags are written: epsilon for an un-observed
            # tag, and the probability minus the total given to the
            # un-observed tags for an observed one
            # NOTE(review): a value is appended for each tag of the
            # vocabulary, the uncertain one included, whereas the ARFF
            # and XRFF writers do not declare an attribute for it --
            # confirm that the number of columns matches.
            for tag in tier.get_ctrl_vocab():
                proba = epsilon
                if tag in tags:
                    proba = tags[tag] - (nb_eps_tags * epsilon)
                instances_data.append(str(proba))

        else:
            # Concatenate the best tag of each non-empty label
            content = ""
            for label in labels:
                if label is None:
                    continue
                if len(label) == 0:
                    continue
                content += label.get_best().get_content() + " "
            content = content.strip()
            if len(content) == 0:
                content = self._empty_annotation_tag
            instances_data.append(content)

    return instances_data
# ---------------------------------------------------------------------------
[docs]class sppasTRA(sppasTable):
"""SPPAS TRA writer: the Table Rich Annotations format.
This format contains one instance per line; the fields of an instance are
separated by ','. It can be easily parsed like a CSV file.
"""
@staticmethod
def detect(filename):
    """Check whether a file is of CSV format or not.

    Each line of the file must be made of at least two fields separated
    by a comma. A field is either free of quotes and commas, or it is
    surrounded by double quotes with any inner double quote doubled.

    :param filename: (str) Name of the file to check.
    :returns: (bool) False if the file can't be loaded or if one of
    its lines does not match.

    """
    csv_line = re.compile(
        '^(("([^"]|"")*"|[^",]*),)+("([^"]|"")*"|[^",]*)$')

    # Open and load the content. A failure means "not this format", but
    # KeyboardInterrupt and SystemExit must not be swallowed.
    try:
        lines = load(filename)
    except Exception:
        return False

    return all(csv_line.match(line) for line in lines)
# -----------------------------------------------------------------------
def __init__(self, name=None):
    """Create a TRA writer.

    :param name: (str) Name of this transcription. The name of the
    class is used when no name is given.

    """
    super(sppasTRA, self).__init__(
        self.__class__.__name__ if name is None else name)

    # Identification of the format
    self.default_extension = "tra"
    self.software = "SPPAS4"
# -----------------------------------------------------------------------
# Write data
# -----------------------------------------------------------------------
def write(self, filename, signed=True):
    """Write a raw text file with data in a table.

    One line is written for each instance, with comma-separated fields:
    the midpoint and the radius of the instance, the anchor annotation
    if an anchor tier was fixed, then all the attributes and the class
    surrounded by double quotes.

    If signed is False, the default encoding is used.

    :param filename: (str)
    :param signed: (bool) Indicate if the encoding is UTF-8 signed.
    :raises: AioNoTiersError if there is no tier, and any error
    raised while the data are validated

    """
    if self.is_empty() is True:
        raise AioNoTiersError(self.default_extension)

    # Check metadata
    self.check_metadata()

    # Check the annotation tags
    self.validate_annotations()

    # Check if the metadata are properly fixed.
    self.validate()

    # OK, the given data are correct.
    enc = 'utf-8-sig' if signed is True else sg.__encoding__

    # OK, we are ready to write.
    # The stream is closed when the "with" block is exited.
    anchor = self._get_anchor_tier()
    with codecs.open(filename, 'w', enc) as fp:

        for point, class_str in self._fix_instance_steps():
            # First: write the reference point
            fp.write("{:.3f},".format(point.get_midpoint()))
            fp.write("{:.3f},".format(point.get_radius()))

            # Second: write the anchor annotation, if an anchor tier was fixed
            if anchor is not None:
                self.__write_anchor(anchor, point, fp)

            # Then, write all attributes and the class. A double quote
            # inside a quoted field is doubled, as expected by detect().
            for attribute in self._fix_data_instance(point):
                fp.write('"{}",'.format(attribute.replace('"', '""')))
            fp.write('"{:s}"\n'.format(str(class_str).replace('"', '""')))
# -----------------------------------------------------------------------
def __write_anchor(self, anchor_tier, time_point, stream):
    """Write the anchor annotation found at the given point.

    Three comma-ended fields are written: the begin, the end (left
    empty if the anchor tier is a point tier) and the quoted serialized
    labels. Times are written with 3 decimals, like the reference
    point of the instance.

    NOTE(review): nothing is written if no annotation is found, so
    such a line has fewer fields than the other ones -- confirm it
    is intended.

    :param anchor_tier: (sppasTier)
    :param time_point: (sppasPoint) If the anchor tier is not a point
    tier, its radius is set to 0.
    :param stream: Stream to write into

    """
    if anchor_tier.is_point():
        idx = anchor_tier.index(time_point)
        if idx != -1:
            ann = anchor_tier[idx]
            content = serialize_labels(ann.get_labels(), separator=" ", empty="", alt=True)
            midpoint = ann.get_lowest_localization().get_midpoint()
            stream.write('{:.3f},,"{:s}",'.format(midpoint, content))
    else:
        # The annotation is searched at the midpoint only.
        # Notice that the point given by the caller is modified.
        time_point.set_radius(0.)
        idx = anchor_tier.mindex(time_point)
        if idx != -1:
            ann = anchor_tier[idx]
            content = serialize_labels(ann.get_labels(), separator=" ", empty="", alt=True)
            begin = ann.get_lowest_localization().get_midpoint()
            end = ann.get_highest_localization().get_midpoint()
            stream.write('{:.3f},{:.3f},"{:s}",'.format(begin, end, content))
# ---------------------------------------------------------------------------
[docs]class sppasARFF(sppasTable):
"""SPPAS ARFF writer.
ARFF format description is at the following URL:
http://weka.wikispaces.com/ARFF+(book+version)
An ARFF file for WEKA has the following structure:
1. Several lines starting by '%' with any kind of comment,
2. The name of the relation,
3. The set of attributes,
4. The set of instances.
"""
@staticmethod
def detect(filename):
    """Check whether a file is of ARFF format or not.

    The file is of ARFF format if "@relation" is found, whatever the
    case, in one of the first 200 lines read from the file.

    :param filename: (str) Name of the file to check.
    :returns: (bool) False if the file can't be opened or decoded.

    """
    try:
        with codecs.open(filename, 'r', sg.__encoding__) as stream:
            # Lines are read one by one, only until a match is found
            head = (stream.readline() for _ in range(200))
            if any("@relation" in text.lower() for text in head):
                return True
        return False
    except (IOError, UnicodeDecodeError):
        return False
# -----------------------------------------------------------------------
def __init__(self, name=None):
    """Create an ARFF writer.

    :param name: (str) Name of this transcription. The name of the
    class is used when no name is given.

    """
    super(sppasARFF, self).__init__(
        self.__class__.__name__ if name is None else name)

    # Identification of the format
    self.default_extension = "arff"
    self.software = "weka"
# -----------------------------------------------------------------------
# Write data
# -----------------------------------------------------------------------
def write(self, filename):
    """Write an ARFF file.

    The header, the metadata, the relation, the attributes and the
    data are written, in this order.

    :param filename: (str)
    :raises: AioNoTiersError if there is no tier, and any error
    raised while the data are validated

    """
    if self.is_empty() is True:
        raise AioNoTiersError(self.default_extension)

    # Check metadata
    self.check_metadata()

    # Check the annotation tags.
    self.validate_annotations()

    # Check if the metadata are properly fixed.
    self.validate()

    # OK, we are ready to write.
    # The stream is closed when the "with" block is exited.
    with codecs.open(filename, 'w', sg.__encoding__, buffering=8096) as fp:
        fp.write(sppasARFF._serialize_header())
        fp.write(self._serialize_metadata())
        fp.write(self._serialize_relation())
        fp.write(self._serialize_attributes())
        self._write_data(fp)
# -----------------------------------------------------------------------
# Private
# -----------------------------------------------------------------------
@staticmethod
def _serialize_header():
    """Return a standard header, each line being an ARFF comment.

    It mentions the creator, its version, the current date, the author
    and the license.

    :returns: (str)

    """
    rows = [
        "% creator: {:s}\n".format(sg.__name__),
        "% version: {:s}\n".format(sg.__version__),
        "% date: {:s}\n".format(datetime.now().strftime("%Y-%m-%d")),
        "% author: {:s}\n".format(sg.__author__),
        "% license: {:s}\n".format(sg.__copyright__),
        "% \n",
    ]
    return "".join(rows)
# -----------------------------------------------------------------------
def _serialize_metadata(self):
"""Serialize metadata in comments."""
content = ""
for key in self.get_meta_keys():
# todo: we should ignore metadata already in the header.
value = self.get_meta(key)
content += "% {:s}: {:s}\n".format(key, value)
content += "\n\n"
return content
# -----------------------------------------------------------------------
def _serialize_relation(self):
"""Serialize the relation of the ARFF file."""
content = "@RELATION {:s}\n".format(self.get_name())
content += "\n"
return content
# -----------------------------------------------------------------------
@staticmethod
def _serialize_attributes_ctrl_vocab(tier, is_class=False):
"""Serialize the controlled vocabulary in an attribute set.
:param tier: (sppasTier)
"""
# Prepare the list of strings to write
tags = list()
for tag in tier.get_ctrl_vocab():
tags.append(tag.get_content())
# Write the name of the attribute serie
content = "@ATTRIBUTES "
if is_class is True:
content += "class "
else:
content += "{:s} ".format(tier.get_name())
# Write the attributes
content += "{"
content += "{:s}".format(",".join(tags))
content += "}\n"
return content
# -----------------------------------------------------------------------
def _serialize_attributes(self):
"""Write the attributes of the ARFF file.
Attributes are corresponding to the controlled vocabulary.
They are the list of possible tags of the annotations, except
for the numerical ones.
It is supposed that the transcription has been already validated.
"""
content = ""
for tier in self:
is_att, is_numeric = sppasTable._tier_is_attribute(tier)
if is_att is False:
continue
if is_numeric is True:
# Tags will be converted to probabilities
for tag in tier.get_ctrl_vocab():
# Do not write an uncertain label in that situation.
if tag.get_content() != self._uncertain_annotation_tag:
attribute_name = tier.get_name() + "-" + tag.get_content()
content += "@ATTRIBUTES {:s} NUMERIC\n" \
"".format(attribute_name)
else:
# Either a generic "string" or we can explicitly fix the list
if len(tier.get_ctrl_vocab()) > self._max_attributes_tags:
content += "@ATTRIBUTES {:s} STRING\n" \
"".format(tier.get_name())
else:
content += sppasARFF._serialize_attributes_ctrl_vocab(tier)
tier = self._get_class_tier()
content += sppasARFF._serialize_attributes_ctrl_vocab(
tier,
is_class=True)
content += "\n"
return content
# -----------------------------------------------------------------------
def _write_data(self, fp):
    """Write the data content of the ARFF file.

    Data are the tags of the annotations or distributions of
    probabilities.

    * Each instance is represented on a single line, with carriage
    returns denoting the end of the instance.
    * Attribute values for each instance are delimited by commas.
    They must appear in the order that they were declared in the header.
    * Missing values are represented by a single question mark
    * Values of string and nominal attributes are case sensitive,
    and any that contain space must be quoted

    NOTE(review): what is written here is converted with b() whereas
    the rest of the file is written as it is -- confirm that the
    stream accepts both.

    :param fp: FileDescriptor

    """
    fp.write(b("@DATA\n"))

    for point, class_str in self._fix_instance_steps():
        # The attributes, each one followed by a comma, then the class
        fields = ["" + attribute + ","
                  for attribute in self._fix_data_instance(point)]
        fields.append(str(class_str))
        fields.append("\n")
        fp.write(b("".join(fields)))
# ----------------------------------------------------------------------------
[docs]class sppasXRFF(sppasTable):
"""SPPAS XRFF writer.
XML-based format of WEKA software tool.
XRFF format description is at the following URL:
http://weka.wikispaces.com/XRFF
This class is limited to:
1. Only the writers are implemented. No readers.
2. Sparse option is not supported by both writers.
3. XRFF output file is not gzipped.
4. XRFF format supports the followings that are not currently \
implemented into this class:
- attribute weights;
- instance weights.
-- !!!!!!!! No guarantee !!!!!! --
This class has never been tested.
-- !!!!!!!! No guarantee !!!!!! --
"""
[docs] @staticmethod
def detect(filename):
try:
with codecs.open(filename, 'r', 'utf-8') as fp:
for i in range(200):
line = fp.readline()
if "<dataset " in line.lower():
return True
except Exception:
return False
return False
# -----------------------------------------------------------------------
def __init__(self, name=None):
    """Create an XRFF writer.

    :param name: (str) Name of this transcription. The name of the
    class is used when no name is given.

    """
    super(sppasXRFF, self).__init__(
        self.__class__.__name__ if name is None else name)

    # Identification of the format
    self.default_extension = "xrff"
# -----------------------------------------------------------------------
# Write data
# -----------------------------------------------------------------------
def write(self, filename):
    """Write a XRFF file.

    The "dataset" element is the root of the document: it contains
    the "header" with the attributes, then the "body" with the
    instances.

    :param filename: (str)
    :raises: AioNoTiersError if there is no tier, and any error
    raised while the data are validated

    """
    if self.is_empty() is True:
        raise AioNoTiersError(self.default_extension)

    # Check metadata
    self.check_metadata()

    # Check the annotation tags.
    self.validate_annotations()

    # Check if the metadata are properly fixed.
    self.validate()

    # OK, we are ready to write.
    # The stream is closed when the "with" block is exited.
    with codecs.open(filename, 'w', sg.__encoding__, buffering=8096) as fp:
        fp.write(b('<?xml version="1.0" encoding="utf-8"?>\n'))
        fp.write(b("\n"))
        # The root element is opened here and closed at the end, so
        # that the header and the body are its children.
        fp.write(b('<dataset name="{:s}">\n'.format(self.get_name())))
        fp.write(b("\n"))
        fp.write(b('<header>\n'))
        self._write_attributes(fp)
        fp.write(b('</header>\n'))
        fp.write(b('\n'))
        fp.write(b('<body>\n'))
        self._write_instances(fp)
        fp.write(b('</body>\n'))
        fp.write(b('</dataset>\n'))
# -----------------------------------------------------------------------
# Private
# -----------------------------------------------------------------------
@staticmethod
def _write_attribute_ctrl_vocab(tier, fp, is_class=False):
    """Write a nominal attribute with the controlled vocabulary as labels.

    The name of the attribute is the name of the tier, and one label
    is written for each tag of its controlled vocabulary.

    :param tier: (sppasTier)
    :param fp: FileDescription
    :param is_class: (boolean) Mention that the attribute is the class

    """
    def emit(text):
        fp.write(b(text))

    # Open the attribute
    emit(' <attribute ')
    if is_class is True:
        emit('class="yes" ')
    emit('name="{:s}" type="nominal">\n'.format(tier.get_name()))

    # Its labels
    emit(' <labels>\n')
    for tag in tier.get_ctrl_vocab():
        emit(" <label>{:s}</label>\n".format(tag.get_content()))
    emit(' </labels>\n')

    # Close the attribute
    emit(' </attribute>\n')
# -----------------------------------------------------------------------
def _write_attributes(self, fp):
    """Write the attributes of the XRFF file.

    Attributes are corresponding to the controlled vocabulary.
    They are the list of possible tags of the annotations, except
    for the numerical ones: one numeric attribute is written for
    each tag of a numeric tier, the uncertain tag excepted.
    The attribute of the class tier is written at last.

    It is supposed that the transcription has been already validated.

    :param fp: FileDescriptor

    """
    fp.write(b(' <attributes>\n'))
    for tier in self:
        is_att, is_numeric = sppasTable._tier_is_attribute(tier)
        if is_att is False:
            continue

        if is_numeric is True:
            # Tags will be converted to probabilities
            for tag in tier.get_ctrl_vocab():
                # Do not write an uncertain label in that situation.
                if tag.get_content() != self._uncertain_annotation_tag:
                    attribute_name = \
                        tier.get_name() + "-" + tag.get_content()
                    fp.write(b(' <attribute name="{:s}" '
                               'type="numeric" />\n'
                               ''.format(attribute_name)))

        elif len(tier.get_ctrl_vocab()) > self._max_attributes_tags:
            # Too many tags to be listed: a generic "string". A nominal
            # attribute can't be used because no label is listed.
            fp.write(b(' <attribute name="{:s}" '
                       'type="string" />\n'
                       ''.format(tier.get_name())))

        else:
            # The controlled vocabulary is explicitly listed
            self._write_attribute_ctrl_vocab(tier, fp)

    # The class to predict
    tier = self._get_class_tier()
    self._write_attribute_ctrl_vocab(tier, fp, is_class=True)
    fp.write(b(' </attributes>\n'))
# -----------------------------------------------------------------------
def _write_instances(self, fp):
    """Write the data content of the XRFF file.

    Data are the tags of the annotations or distributions of
    probabilities. One "instance" element is written for each
    instance: it contains a "value" element for each attribute,
    then one for the class.

    :param fp: FileDescriptor

    """
    def emit(text):
        fp.write(b(text))

    emit(" <instances>\n")
    for point, class_str in self._fix_instance_steps():
        # The attributes are followed by the class
        values = list(self._fix_data_instance(point))
        values.append(class_str)

        emit(" <instance>\n")
        for value in values:
            emit(" <value>{!s:s}</value>\n".format(value))
        emit(" </instance>\n")
    emit(" </instances>\n")