Source code for wkps.wio.wjson

# -*- coding: utf-8 -*-
"""
:filename: sppas.src.wkps.wio.wjson.py
:author:   Brigitte Bigi
:contact:  develop@sppas.org
:summary:  A reader/writer of wjson workspace file format.

.. _This file is part of SPPAS: http://www.sppas.org/
..
    -------------------------------------------------------------------------

     ___   __    __    __    ___
    /     |  \  |  \  |  \  /              the automatic
    \__   |__/  |__/  |___| \__             annotation and
       \  |     |     |   |    \             analysis
    ___/  |     |     |   | ___/              of speech

    Copyright (C) 2011-2021  Brigitte Bigi
    Laboratoire Parole et Langage, Aix-en-Provence, France

    Use of this software is governed by the GNU Public License, version 3.

    SPPAS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    SPPAS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with SPPAS. If not, see <http://www.gnu.org/licenses/>.

    This banner notice must not be removed.

    -------------------------------------------------------------------------

"""

import os
import json
import logging
import codecs
from collections import OrderedDict

from sppas.src.config import sg

from ..filebase import FileBase, States
from ..filestructure import FilePath, FileRoot, FileName
from ..fileref import sppasCatReference, sppasRefAttribute
from ..wkpexc import FileOSError

from .basewkpio import sppasBaseWkpIO

# ---------------------------------------------------------------------------


[docs]class sppasWJSON(sppasBaseWkpIO): """Reader and writer of a workspace in wjson format. """
[docs] def __init__(self, name=None): """Initialize a sppasWJSON instance. :param name: (str) The name of the workspace """ if name is None: name = self.__class__.__name__ super(sppasWJSON, self).__init__(name) self.default_extension = "wjson" self.software = sg.__name__
# -----------------------------------------------------------------------
[docs] @staticmethod def detect(filename): """Check whether a file is of wjson format or not. :param filename: (str) Name of the file to detect :returns: (bool) """ try: with open(filename, 'r') as f: f.readline() doctype_line = f.readline().strip() f.close() except IOError: return False except UnicodeDecodeError: return False return "wjson" in doctype_line
# -----------------------------------------------------------------------
[docs] def read(self, filename): """Read a wjson file and fill the the sppasWSJON. :param filename: (str) """ if os.path.exists(filename) is False: raise FileOSError(filename) with codecs.open(filename, 'r', "UTF-8") as f: d = json.load(f) self._parse(d)
# -----------------------------------------------------------------------
[docs] def write(self, filename): """Write in the filename. :param filename: (str) """ serialized_dict = self._serialize() with codecs.open(filename, 'w', "UTF-8") as f: json.dump(serialized_dict, f, indent=4, separators=(',', ': '))
# ----------------------------------------------------------------------- def _serialize(self): """Convert this sppasWJSON instance into a serializable structure. :returns: (dict) a dictionary that can be serialized """ d = OrderedDict() # Factual information about this file and this sppasWJSON d['wjson'] = "2.0" # WJSON format version d['software'] = self.software # the tool that is writing this file d['version'] = sg.__version__ # the version of the tool d['id'] = self.id # identifier of the wkp # The list of paths/roots/files stored in this sppasWorkspace() d['paths'] = list() for fp in self.get_paths(): d['paths'].append(self._serialize_path(fp)) # The list of references/attributes stored in this sppasWorkspace() d['catalogue'] = list() for fref in self.get_refs(): d['catalogue'].append(self._serialize_ref(fref)) return d # ----------------------------------------------------------------------- def _serialize_ref(self, fref): """Convert a sppasCatReference into a serializable structure. :param fref: (sppasCatReference) :returns: (dict) a dictionary that can be serialized """ dict_ref = dict() dict_ref["id"] = fref.get_id() dict_ref["state"] = fref.get_state() dict_ref["type"] = fref.get_type() dict_ref["subjoin"] = fref.subjoined dict_ref["attributes"] = list() # serialize the attributes in a reference for att in fref: dict_ref["attributes"].append(self._serialize_attributes(att)) return dict_ref # ----------------------------------------------------------------------- @staticmethod def _serialize_attributes(att): """Convert a sppasRefAttribute into a serializable structure. :param att: (sppasRefAttribute) :returns: (dict) a dictionary that can be serialized """ dict_att = dict() dict_att["id"] = att.get_id() dict_att["value"] = att.get_value() dict_att["type"] = att.get_value_type() dict_att["descr"] = att.get_description() return dict_att # ----------------------------------------------------------------------- def _serialize_path(self, fp): """Convert a FilePath into a serializable structure. :param fp: (FilePath) :returns: (dict) a dictionary that can be serialize """ dict_path = dict() # Systematically save the path with the "/" instead of the os separator path = fp.get_id() path = path.replace(os.sep, "/") dict_path["id"] = path # Save the relative path try: rel_path = os.path.relpath(fp.get_id()) rel_path = rel_path.replace(os.sep, "/") dict_path["rel"] = rel_path except ValueError: # On Windows, a ValueError is raised if the current directory and # the start path are not on the same drive. dict_path["rel"] = fp.get_id() # serialize the roots dict_path["roots"] = list() for fr in fp: dict_path["roots"].append(self._serialize_root(fr)) if fp.subjoined is not None: dict_path['subjoin'] = fp.subjoined return dict_path # ----------------------------------------------------------------------- def _serialize_root(self, fr): """Convert a FileRoot into a serializable structure. :param fr: (FileRoot) :returns: (dict) a dictionary that can be serialize """ dict_root = dict() dict_root["id"] = fr.get_id().split(os.sep)[-1] # serialize files dict_root["files"] = list() for fn in fr: dict_root["files"].append(self._serialize_files(fn)) # references identifiers are stored into a list dict_root["refids"] = list() for ref in fr.get_references(): dict_root["refids"].append(ref.get_id()) # subjoined data are simply added as-it # (it's risky, the embedded data could be un-serializable by json...) if fr.subjoined is not None: dict_root['subjoin'] = fr.subjoined return dict_root # ----------------------------------------------------------------------- @staticmethod def _serialize_files(fn): """Convert a FileName into a serializable structure. :param fn: (FileName) :returns: (dict) a dictionary that can be serialized """ dict_file = dict() dict_file["id"] = fn.get_id().split(os.sep)[-1] dict_file["state"] = fn.get_state() # subjoined data are simply added as-it if fn.subjoined is not None: dict_file['subjoin'] = fn.subjoined return dict_file # ----------------------------------------------------------------------- def _parse(self, d): """Fill the data of a sppasWJSON reader with the given dictionary. :param d: (dict) :returns: the id of the workspace """ try: wid = d.get('id', None) if wid is None: raise KeyError("Workspace 'id' is missing of the dictionary to parse. ") self._id = FileBase.validate_id(wid) except ValueError as e: # We keep our current 'id' logging.warning(str(e)) except KeyError as e: logging.warning(str(e)) version = d.get('wjson', None) if version is None: raise KeyError("Version of the wjson is missing of the dictionary to parse. ") if 'catalogue' in d: for dict_ref in d['catalogue']: try: self._parse_ref(dict_ref) except KeyError as e: logging.error("Reference can't be saved to the workspace: {:s}" "".format(str(e))) if 'paths' in d: for dict_path in d['paths']: try: self._parse_path(dict_path, version) except KeyError as e: logging.error("Path can't be saved to the workspace: {:s}" "".format(str(e))) return self.id # ----------------------------------------------------------------------- def _parse_ref(self, d): """Fill in the ref of a sppasWJSON reader with the given dictionary. :param d: (dict) :returns: (sppasCatReference) """ if "id" not in d: raise KeyError("reference 'id' is missing of the dictionary to parse.") fr = sppasCatReference(d["id"]) if 'type' in d: fr.set_type(d["type"]) if 'attributes' in d: for att_dict in d["attributes"]: fr.append(self._parse_attribute(att_dict)) # parse the state value s = d.get('state', States().UNUSED) if s > 0: fr.set_state(States().CHECKED) else: fr.set_state(States().UNUSED) self.add(fr) return fr # ----------------------------------------------------------------------- def _parse_path(self, d, version): """Fill in the paths of a sppasWJSON reader with the given dictionary. :param d: (dict) :param version: (str) Indicate the version of the wjson :returns: (FilePath) """ if 'id' not in d: raise KeyError("path 'id' is missing of the dictionary to parse.") path = d["id"] # Ensure we'll use the separator of the current system path = path.replace("/", os.sep) # check if the entry path exists if os.path.exists(d["id"]) is False: logging.debug("Absolute path {:s} does not exist.".format(path)) # check if the relative path exists. "rel" was introduced in v2.0. # check if "rel" exists for compatibility with v1.0. if "rel" in d: rel_path = os.path.abspath(d["rel"]) rel_path = rel_path.replace("/", os.sep) if os.path.exists(rel_path) is True: logging.debug("Relative path {:s} exists.".format(rel_path)) path = rel_path else: logging.debug("Relative path {:s} does not exist too.".format(rel_path)) # in any case, create the corresponding object fp = FilePath(path) # parse its roots if 'roots' in d: for dict_root in d["roots"]: fr = self._parse_root(dict_root, path, version) fp.append(fr) # append subjoined fp.subjoined = d.get('subjoin', None) self.add(fp) return fp # ----------------------------------------------------------------------- def _parse_root(self, d, path, version): """Fill in the root of a sppasWJSON reader with the given dictionary. :param d: (dict) :param path: (str) path of the file used to create the whole path of the file as only the name of the file is kept in the wjson file :param version: (str) Indicate the version of the wjson :returns: (FileRoot) """ if "id" not in d: raise KeyError("root 'id' is missing of the dictionary to parse.") if version == "1.0": fr = FileRoot(d["id"]) else: fr = FileRoot(path + os.sep + d["id"]) if "files" in d: for dict_file in d["files"]: fr.append(self._parse_file(dict_file, path, version)) for ref in d["refids"]: all_refs = self.get_refs() refe = None for r in all_refs: if r.get_id() == ref: refe = r break if refe is None: # if the file is well-formed, this should never happen... refe = sppasCatReference(ref) fr.add_ref(refe) # append subjoined dict "as it" fr.subjoined = d.get('subjoin', None) return fr # ----------------------------------------------------------------------- @staticmethod def _parse_file(d, path, version): """Fill in the files of a sppasWJSON reader with the given dictionary. :param d: (dict) :param path: (str) path of the file used to create the whole path of the file as only the name of the file is kept in the wjson file :param version: (str) Indicate the version of the wjson :returns: (FileName) """ if 'id' not in d: raise KeyError("file 'id' is missing of the dictionary to parse.") if version == "1.0": fn = FileName(d["id"]) else: fn = FileName(path + os.sep + d["id"]) # parse the state value s = d.get('state', States().UNUSED) if s > 0: fn.set_state(States().CHECKED) else: fn.set_state(States().UNUSED) # append subjoined dict "as it" fn.subjoined = d.get('subjoin', None) return fn # ----------------------------------------------------------------------- @staticmethod def _parse_attribute(d): """Fill in the attribute of a sppasWJSON reader with the given dictionary. :param d: (dict) :returns: (sppasRefAttribute) """ if 'id' not in d: raise KeyError("attribute 'id' is missing of the dictionary to parse.") att = sppasRefAttribute(d['id']) att.set_value(d["value"]) att.set_value_type(d["type"]) att.set_description(d["descr"]) return att