SPPAS 4.22

https://sppas.org/

Module sppas.src.wkps

Class sppasWorkspace

Description

Represent the data linked to a list of files and a list of references.

sppasWorkspace is the container for a list of files and a catalog. It organizes files hierarchically as a collection of FilePath instances, each of which is a collection of FileRoot instances, each of which is a collection of FileName. The catalog is a list of sppasCatReference instances each of which is a list of key/att-value.

Constructor

Constructor of a sppasWorkspace.

Parameters
  • identifier: (str)
View Source
def __init__(self, identifier=None):
    """Constructor of a sppasWorkspace.

    :param identifier: (str, None) Identifier of the workspace. If None,
        a random UUID4 string is generated for this instance.

    """
    # The identifier must be generated at call time. A default written as
    # ``str(uuid.uuid4())`` in the signature is evaluated only once, when
    # the function is defined, so every workspace created without an
    # explicit identifier would share the very same id.
    if identifier is None:
        identifier = str(uuid.uuid4())
    self._id = identifier
    # Both containers start empty
    self.__paths = list()
    self.__refs = list()

Public functions

get_id

Return the identifier of the workspace (str).

View Source
def get_id(self):
    """Give the identifier stored in this workspace.

    :returns: (str) The identifier given to the constructor

    """
    identifier = self._id
    return identifier

add

Add an object into the data.

IMPLEMENTED ONLY FOR paths and references.

Parameters
  • file_object: (FileBase)
Raises

sppasTypeError, FileAddValueError, NotImplementedError

View Source
def add(self, file_object):
    """Add an object into the data.

    IMPLEMENTED ONLY FOR paths and references.

    :param file_object: (FileBase) A FilePath or a sppasCatReference
    :raises: sppasTypeError if the object is not one of FileName,
        FileRoot, FilePath, sppasCatReference
    :raises: FileAddValueError if an object with the same id is already
        in the workspace
    :raises: NotImplementedError for a FileName or a FileRoot

    """
    if not isinstance(file_object, (FileName, FileRoot, FilePath, sppasCatReference)):
        # An object of a wrong type may have no 'id' attribute: fall back
        # on the object itself so that the documented sppasTypeError is
        # raised instead of an AttributeError.
        raise sppasTypeError(getattr(file_object, 'id', file_object), 'FileBase-subclass')

    # Refuse an object whose identifier is already known by the workspace
    if self.get_object(file_object.id) is not None:
        raise FileAddValueError(file_object.id)

    if isinstance(file_object, FilePath):
        self.__paths.append(file_object)
    elif isinstance(file_object, sppasCatReference):
        self.add_ref(file_object)
    else:
        # Only FileName and FileRoot can reach this branch
        raise NotImplementedError('Adding a {} in a workspace is not implemented yet.'.format(type(file_object)))

add_file

Add file(s) in the list from a file name.

Parameters
  • filename: (str) Absolute or relative name of a file
  • brothers: (bool) Add also all files sharing the same root as the given file
  • ctime: (float) Add files only if created/modified after time in seconds since the epoch
Returns
  • (list of FileBase or None) List of added objects
Raises

OSError

View Source
def add_file(self, filename, brothers=False, ctime=0.0):
    """Append one or several files to the workspace from a file name.

    The file is appended to the stored FilePath with the same identifier
    as the directory of the file, or to a new FilePath. A new FilePath is
    stored only if its append() did not return None.

    :param filename: (str) Absolute or relative name of a file
    :param brothers: (bool) Add also all files sharing the same root as the given file
    :param ctime: (float) Add files only if created/modified after time in seconds since the epoch
    :returns: (list of FileBase) List of added objects; an empty list if
        the append() of the path returned None
    :raises: OSError

    """
    target = FilePath(os.path.dirname(filename))
    # Prefer the path already stored with this identifier, if any
    for known in self.__paths:
        if known.id == target.id:
            target = known

    appended = target.append(filename, brothers, ctime)
    if appended is None:
        return list()

    if target not in self.__paths:
        self.__paths.append(target)
    return appended

remove_file

Remove a file in the list from its file name.

Its root and path are also removed if they become empty; otherwise their state is updated.

Parameters
  • filename: (str) Absolute or relative name of a file
Returns
  • (list) Identifiers of removed objects
Raises

OSError

View Source
def remove_file(self, filename):
    """Remove a file in the list from its file name.

    Its root and path are also removed if they become empty; otherwise
    their state is updated.

    :param filename: (str, FileName) Absolute or relative name of a file,
        or a FileName instance
    :returns: (list) Identifiers of removed objects
    :raises: OSError

    """
    if isinstance(filename, FileName):
        fn_id = filename.get_id()
        # os.path.dirname() does not accept a FileName instance, so the
        # directory is extracted from its identifier.
        # NOTE(review): assumes the identifier of a FileName is the path
        # of the file -- confirm against FileName.
        dir_name = os.path.dirname(fn_id)
    else:
        fn_id = FileName(filename).get_id()
        dir_name = os.path.dirname(filename)
    given_fp = FilePath(dir_name)

    path = None
    root = None
    removed = list()
    # Search for the file only in the stored path(s) matching its directory
    for fp in self.__paths:
        if fp.get_id() == given_fp.get_id():
            for fr in fp:
                rem_id = fr.remove(fn_id)
                if rem_id is not None:
                    # The file was in this root: remember where it was
                    removed.append(rem_id)
                    root = fr
                    path = fp
                    break

    if root is not None:
        # Drop the root if it has no file left, else refresh its state
        if len(root) == 0:
            removed.append(root.get_id())
            path.remove(root)
        else:
            root.update_state()
        # Same for the path
        if len(path) == 0:
            removed.append(path.get_id())
            self.__paths.remove(path)
        else:
            path.update_state()
    return removed

add_ref

Add a reference to the list of references.

Parameters
  • ref: (sppasCatReference) Reference to add
Raises

sppasTypeError, FileAddValueError

View Source
def add_ref(self, ref):
    """Append a reference to the list of references of the workspace.

    :param ref: (sppasCatReference) Reference to add
    :raises: sppasTypeError if ref is not a sppasCatReference
    :raises: FileAddValueError if a stored reference has the same id

    """
    if not isinstance(ref, sppasCatReference):
        raise sppasTypeError(ref, 'sppasCatReference')

    # First stored reference with the same identifier, if any
    clash = next((known for known in self.__refs if known.id == ref.id), None)
    if clash is not None:
        raise FileAddValueError(clash.id)

    self.__refs.append(ref)

remove_refs

Remove all references of the given state.

Parameters
  • state: (States)
Returns
  • (int) Number of removed refs
View Source
def remove_refs(self, state=States().CHECKED):
    """Remove every reference whose state is the given one.

    The references are first removed from each root of each path, then
    from the list of references of the workspace.

    :param state: (States)
    :returns: (int) Number of removed refs

    """
    doomed = [ref for ref in self.__refs if ref.state == state]

    # Detach the selected references from all the roots
    for path in self.__paths:
        for root in path:
            for ref in doomed:
                root.remove_ref(ref)

    # Then forget them in the workspace itself
    for ref in reversed(doomed):
        self.__refs.remove(ref)

    return len(doomed)

get_refs

Return the list of references.

View Source
def get_refs(self):
    """Give the references of the workspace.

    :returns: (list) The stored list itself, not a copy

    """
    references = self.__refs
    return references

update

Update the data: missing files, properties changed.

Empty FileRoot and FilePath are removed.

View Source
def update(self):
    """Refresh the properties of the files and the states of their parents.

    The roots of a path and the files of a root are browsed in reverse
    order. Each file updates its properties, then its root updates its
    state, and finally the path updates its state.

    NOTE(review): this method does not remove anything by itself; whether
    update_state() drops empty roots or paths cannot be told from here.

    """
    for path in self.__paths:
        for root in reversed(path):
            for file_name in reversed(root):
                file_name.update_properties()
            # All the files of the root are up-to-date
            root.update_state()
        # All the roots of the path are up-to-date
        path.update_state()

remove_files

Remove all files of the given state.

Do not remove empty roots or paths.

Parameters
  • state: (States)
Returns
  • (int)
View Source
def remove_files(self, state=States().CHECKED):
    """Remove every file whose state is the given one.

    Roots and paths are not removed here, even if they have no file
    left: only their state is updated.

    :param state: (States)
    :returns: (int) Number of files for which a removal was requested

    """
    count = 0
    for path in self.__paths:
        for root in reversed(path):
            # Reverse order, because files are removed while browsing
            for file_name in reversed(root):
                selected = file_name.get_state() == state
                if selected:
                    root.remove(file_name)
                    count += 1
            root.update_state()
        path.update_state()
    return count

remove_empties

Remove paths and roots without files.

View Source
def remove_empties(self):
    """Drop the roots without file, then the paths without root.

    Paths and roots are browsed in reverse order because items are
    removed during the iteration.

    """
    for path in reversed(self.__paths):
        for root in reversed(path):
            if not len(root):
                path.remove(root)
        if not len(path):
            self.__paths.remove(path)

get_paths

Return all the stored paths.

Returns
  • (list of FilePath)
View Source
def get_paths(self):
    """Give the paths of the workspace.

    :returns: (list of FilePath) The stored list itself, not a copy

    """
    stored_paths = self.__paths
    return stored_paths

get_object

Return the file object matching the given identifier.

Parameters
  • identifier: (str)
Returns
  • (sppasWorkspace, FilePath, FileRoot, FileName, sppasCatReference)
View Source
def get_object(self, identifier):
    """Search for the object with the given identifier.

    The workspace itself is tested first, then its references, then
    each stored path is asked for the object.

    :param identifier: (str)
    :returns: (sppasWorkspace, FilePath, FileRoot, FileName,
        sppasCatReference) or None if nothing matches

    """
    if self.id == identifier:
        return self

    # First reference with this identifier, if any
    ref_match = next((ref for ref in self.get_refs() if ref.id == identifier), None)
    if ref_match is not None:
        return ref_match

    # Delegate the search to each path, and stop at the first answer
    for path in self.__paths:
        found = path.get_object(identifier)
        if found is not None:
            return found

    return None

set_object_state

Set the state of any or all FileBase within sppasWorkspace.

The default case is to set the state of all FilePath and sppasCatReference instances.

It is not allowed to manually assign one of the "AT_LEAST" states. They are automatically fixed depending on the paths states.

Parameters
  • state: (States) state to set the file to
  • file_obj: (FileBase) the specific file to set the state to. None to set all files
Raises

sppasTypeError, sppasValueError

Returns
  • list of modified objects
View Source
def set_object_state(self, state, file_obj=None):
    """Set the state of one object, or of all paths and references.

    With no given object, the state is set to every stored path and to
    every stored reference.

    The original documentation states that it is not allowed to manually
    assign one of the "AT_LEAST" states, because they are automatically
    fixed depending on the paths states. That check is not performed in
    this method (presumably it is done by the objects -- to be confirmed).

    NOTE(review): the result of set_state() is compared to True when no
    object is given, but it is returned as the list of modified objects
    for a FilePath, and it is ignored for a reference -- to be confirmed.

    :param state: (States) state to set the file to
    :param file_obj: (FileBase) the specific file to set the state to. None to set all files
    :raises: sppasTypeError, sppasValueError
    :return: list of modified objects

    """
    if file_obj is None:
        # All the paths first, then all the references
        changed = [path for path in self.__paths if path.set_state(state) is True]
        changed.extend(ref for ref in self.__refs if ref.set_state(state) is True)
        return changed

    if isinstance(file_obj, sppasCatReference):
        file_obj.set_state(state)
        return [file_obj]

    if isinstance(file_obj, FilePath):
        return file_obj.set_state(state)

    if isinstance(file_obj, (FileRoot, FileName)):
        changed = list()
        for path in self.__paths:
            # Only the first path knowing this object is modified
            if path.get_object(file_obj.id) is not None:
                result = path.set_object_state(state, file_obj)
                if len(result) > 0:
                    changed.extend(result)
                break
        return changed

    logging.error('Wrong type of the object: {:s}'.format(str(type(file_obj))))
    raise sppasTypeError(file_obj, 'FileBase')

set_state

Set the state of this sppasWorkspace instance.

Parameters
  • value: (States)
View Source
def set_state(self, value):
    """Deprecated. Only emit a DeprecationWarning.

    The given value is ignored: nothing is stored.

    :param value: (States)

    """
    message = 'Do not set a state: A workspace has no state anymore.'
    warnings.warn(message, DeprecationWarning)

associate

View Source
def associate(self):
    """Add the checked references to the checked roots.

    Each reference in the CHECKED state is added to each root whose
    state is either CHECKED or AT_LEAST_ONE_CHECKED.

    :returns: (int) Number of times add_ref() of a root returned True

    """
    checked_refs = self.get_reference_from_state(States().CHECKED)
    if len(checked_refs) == 0:
        return 0

    count = 0
    for path in self.__paths:
        for root in path:
            if root.get_state() not in (States().AT_LEAST_ONE_CHECKED, States().CHECKED):
                continue
            for ref in checked_refs:
                if root.add_ref(ref) is True:
                    count += 1
    return count

dissociate

View Source
def dissociate(self):
    """Remove the checked references from the checked roots.

    Each reference in the CHECKED state is removed from each root whose
    state is either CHECKED or AT_LEAST_ONE_CHECKED.

    :returns: (int) Number of times remove_ref() of a root returned True

    """
    checked_refs = self.get_reference_from_state(States().CHECKED)
    if len(checked_refs) == 0:
        return 0

    count = 0
    for path in self.__paths:
        for root in path:
            if root.get_state() not in (States().AT_LEAST_ONE_CHECKED, States().CHECKED):
                continue
            for ref in checked_refs:
                if root.remove_ref(ref) is True:
                    count += 1
    return count

is_empty

Return True if the instance contains no path and no reference.

View Source
def is_empty(self):
    """Return True if the workspace has neither a path nor a reference."""
    nb_paths = len(self.__paths)
    nb_refs = len(self.__refs)
    return nb_paths + nb_refs == 0

get_filepath_from_state

Return every FilePath of the given state.

Parameters
  • state
View Source
def get_filepath_from_state(self, state):
    """Collect the stored paths whose state is the given one.

    :param state: (States)
    :returns: (list of FilePath) A new list, in the order of the stored paths

    """
    return [path for path in self.__paths if path.get_state() == state]

get_fileroot_from_state

Return every FileRoot in the given state.

Parameters
  • state
View Source
def get_fileroot_from_state(self, state):
    """Collect the roots of all paths whose state is the given one.

    :param state: (States)
    :returns: (list of FileRoot) A new list, in browsing order

    """
    selected = list()
    for path in self.__paths:
        selected.extend(root for root in path if root.get_state() == state)
    return selected

get_fileroot_with_ref

Return every FileRoot with the given reference.

Parameters
  • ref
View Source
def get_fileroot_with_ref(self, ref):
    """Collect the roots of all paths that have the given reference.

    A root is selected only if its has_ref() returns exactly True.

    :param ref: The reference to search for
    :returns: (list of FileRoot) A new list, in browsing order

    """
    selected = list()
    for path in self.__paths:
        selected.extend(root for root in path if root.has_ref(ref) is True)
    return selected

get_filename_from_state

Return every FileName in the given state.

Parameters
  • state
View Source
def get_filename_from_state(self, state):
    """Collect the files of all roots whose state is the given one.

    :param state: (States)
    :returns: (list of FileName) A new list, in browsing order; it is
        empty if the workspace has no path

    """
    selected = list()
    for path in self.__paths:
        for root in path:
            selected.extend(fn for fn in root if fn.get_state() == state)
    return selected

get_reference_from_state

Return every Reference in the given state.

Parameters
  • state
View Source
def get_reference_from_state(self, state):
    """Collect the stored references whose state is the given one.

    :param state: (States)
    :returns: (list) A new list, in the order of the stored references;
        it is empty if the workspace has no reference

    """
    return [ref for ref in self.__refs if ref.get_state() == state]

has_locked_files

View Source
def has_locked_files(self):
    """Tell whether a stored path is in a locked state.

    :returns: (bool) True if the state of at least one path is either
        LOCKED or AT_LEAST_ONE_LOCKED

    """
    return any(
        path.get_state() in (States().AT_LEAST_ONE_LOCKED, States().LOCKED)
        for path in self.__paths
    )

get_parent

Return the parent of an object.

Parameters
  • filebase: (FileName or FileRoot).
Returns
  • (FileRoot or FilePath)
Raises

sppasTypeError

View Source
def get_parent(self, filebase):
    """Search for the stored parent of a file name or of a root.

    The identifier of the parent is obtained by creating a FileRoot from
    the id of the file name, or a FilePath from the directory name of
    the id of the root. The result is whatever get_object() gives for
    that identifier.

    :param filebase: (FileName or FileRoot).
    :returns: (FileRoot or FilePath)
    :raises: sppasTypeError

    """
    if isinstance(filebase, FileName):
        root_id = FileRoot(filebase.id).id
        return self.get_object(root_id)

    if isinstance(filebase, FileRoot):
        path_id = FilePath(os.path.dirname(filebase.id)).id
        return self.get_object(path_id)

    raise sppasTypeError(filebase, 'FileName, FileRoot')

unlock

Unlock the given list of files.

Parameters
  • entries: (list, None) List of FileName to unlock
Returns
  • number of unlocked entries
View Source
def unlock(self, entries=None):
    """Unlock all the files, or only the given ones.

    With no entries, each stored path is asked to unlock. With a list of
    entries, each listed file in the LOCKED state is set to CHECKED.
    Entries of any other type are ignored.

    NOTE(review): the counter is cumulative, so once a file was unlocked,
    update_state() is also called on each following root and path, even
    if none of their files changed -- confirm this is intended.

    :param entries: (list, None) List of FileName to unlock
    :returns: number of unlocked entries

    """
    unlocked = 0

    if entries is None:
        for path in self.__paths:
            unlocked += path.unlock()
        return unlocked

    if not isinstance(entries, list):
        return unlocked

    for path in self.__paths:
        for root in path:
            for file_name in root:
                if file_name in entries and file_name.get_state() == States().LOCKED:
                    file_name.set_state(States().CHECKED)
                    unlocked += 1
            if unlocked > 0:
                root.update_state()
        if unlocked > 0:
            path.update_state()
    return unlocked

Overloads

__contains__

View Source
def __contains__(self, ident):
    """Tell whether an identifier is the one of a stored path or reference.

    Only the paths and the references themselves are tested, not the
    roots nor the files of the paths.

    :param ident: Identifier to search for
    :returns: (bool)

    """
    if any(path.id == ident for path in self.__paths):
        return True
    return any(ref.id == ident for ref in self.__refs)

__hash__

View Source
def __hash__(self):
    """Return the hash of the workspace, computed from its identifier.

    The lists of paths and references are not part of the hash: a list
    is not hashable, so hashing them raised a TypeError, and their
    content changes over time whereas a hash must remain stable.

    NOTE(review): assumes that two workspaces comparing equal have the
    same identifier -- confirm against __eq__ if the class defines one.

    :returns: (int)

    """
    return hash(self.get_id())