Public functions
get_id
Return the identifier of the workspace (str).
View Source
def get_id(self):
"""Return the identifier of the workspace (str)."""
return self._id
add
Add an object into the data.
IMPLEMENTED ONLY FOR paths and references.
Parameters
Raises
sppasTypeError, FileAddValueError, NotImplementedError
View Source
def add(self, file_object):
"""Add an object into the data.
IMPLEMENTED ONLY FOR paths and references.
:param file_object: (FileBase)
:raises: sppasTypeError, FileAddValueError, NotImplementedError
"""
if isinstance(file_object, (FileName, FileRoot, FilePath, sppasCatReference)) is False:
raise sppasTypeError(file_object.id, 'FileBase-subclass')
test_obj = self.get_object(file_object.id)
if test_obj is not None:
raise FileAddValueError(file_object.id)
if isinstance(file_object, FilePath):
self.__paths.append(file_object)
elif isinstance(file_object, sppasCatReference):
self.add_ref(file_object)
else:
raise NotImplementedError('Adding a {} in a workspace is not implemented yet.'.format(type(file_object)))
add_file
Add file(s) in the list from a file name.
Parameters
- filename: (str) Absolute or relative name of a file
- brothers: (bool) Add also all files sharing the same root as the given file
- ctime: (float) Add files only if created/modified after time in seconds since the epoch
Returns
- (list of FileBase or None) List of added objects
Raises
OSError
View Source
def add_file(self, filename, brothers=False, ctime=0.0):
"""Add file(s) in the list from a file name.
:param filename: (str) Absolute or relative name of a file
:param brothers: (bool) Add also all files sharing the same root as the given file
:param ctime: (float) Add files only if created/modified after time in seconds since the epoch
:returns: (list of FileBase or None) List of added objects
:raises: OSError
"""
new_fp = FilePath(os.path.dirname(filename))
for fp in self.__paths:
if fp.id == new_fp.id:
new_fp = fp
added = new_fp.append(filename, brothers, ctime)
if added is None:
added = list()
elif added is not None and new_fp not in self.__paths:
self.__paths.append(new_fp)
return added
remove_file
Remove a file in the list from its file name.
Its root and path are also removed if empties, or their state is
updated.
Parameters
- filename: (str) Absolute or relative name of a file
Returns
- (list) Identifiers of removed objects
Raises
OSError
View Source
def remove_file(self, filename):
"""Remove a file in the list from its file name.
Its root and path are also removed if empties, or their state is
updated.
:param filename: (str) Absolute or relative name of a file
:returns: (list) Identifiers of removed objects
:raises: OSError
"""
if isinstance(filename, FileName):
fn_id = filename.get_id()
else:
fn_id = FileName(filename).get_id()
given_fp = FilePath(os.path.dirname(filename))
path = None
root = None
removed = list()
for fp in self.__paths:
if fp.get_id() == given_fp.get_id():
for fr in fp:
rem_id = fr.remove(fn_id)
if rem_id is not None:
removed.append(rem_id)
root = fr
path = fp
break
if root is not None:
if len(root) == 0:
removed.append(root.get_id())
path.remove(root)
else:
root.update_state()
if len(path) == 0:
removed.append(path.get_id())
self.__paths.remove(path)
else:
path.update_state()
return removed
add_ref
Add a reference in the list from its file name.
Parameters
- ref: (sppasCatReference) Reference to add
Raises
sppasTypeError, FileAddValueError
View Source
def add_ref(self, ref):
"""Add a reference in the list from its file name.
:param ref: (sppasCatReference) Reference to add
:raises: sppasTypeError, FileAddValueError
"""
if isinstance(ref, sppasCatReference) is False:
raise sppasTypeError(ref, 'sppasCatReference')
for refe in self.__refs:
if refe.id == ref.id:
raise FileAddValueError(refe.id)
self.__refs.append(ref)
remove_refs
Remove all references of the given state.
Parameters
Returns
- (int) Number of removed refs
View Source
def remove_refs(self, state=States().CHECKED):
"""Remove all references of the given state.
:param state: (States)
:returns: (int) Number of removed refs
"""
removes = list()
for ref in self.__refs:
if ref.state == state:
removes.append(ref)
for fp in self.__paths:
for fr in fp:
for fc in removes:
fr.remove_ref(fc)
nb = len(removes)
for ref in reversed(removes):
self.__refs.remove(ref)
return nb
get_refs
Return the list of references.
View Source
def get_refs(self):
"""Return the list of references."""
return self.__refs
update
Update the data: missing files, properties changed.
Empty FileRoot and FilePath are removed.
View Source
def update(self):
"""Update the data: missing files, properties changed.
Empty FileRoot and FilePath are removed.
"""
for fp in self.__paths:
for fr in reversed(fp):
for fn in reversed(fr):
fn.update_properties()
fr.update_state()
fp.update_state()
remove_files
Remove all files of the given state.
Do not remove empty roots or paths.
Parameters
Returns
View Source
def remove_files(self, state=States().CHECKED):
"""Remove all files of the given state.
Do not remove empty roots or paths.
:param state: (States)
:returns: (int)
"""
nb = 0
for fp in self.__paths:
for fr in reversed(fp):
for fn in reversed(fr):
if fn.get_state() == state:
fr.remove(fn)
nb += 1
fr.update_state()
fp.update_state()
return nb
remove_empties
Remove paths and roots without files.
View Source
def remove_empties(self):
"""Remove paths and roots without files."""
for fp in reversed(self.__paths):
for fr in reversed(fp):
if len(fr) == 0:
fp.remove(fr)
if len(fp) == 0:
self.__paths.remove(fp)
get_paths
Return all the stored paths.
Returns
View Source
def get_paths(self):
"""Return all the stored paths.
:returns: (list of FilePath)
"""
return self.__paths
get_object
Return the file object matching the given identifier.
Parameters
Returns
- (sppasWorkspace, FilePath, FileRoot, FileName, sppasCatReference)
View Source
def get_object(self, identifier):
"""Return the file object matching the given identifier.
:param identifier: (str)
:returns: (sppasWorkspace, FilePath, FileRoot, FileName, sppasCatReference)
"""
if self.id == identifier:
return self
for ref in self.get_refs():
if ref.id == identifier:
return ref
for fp in self.__paths:
obj = fp.get_object(identifier)
if obj is not None:
return obj
return None
set_object_state
Set the state of any or all FileBase within sppasWorkspace.
The default case is to set the state to all FilePath and FileRefence.
It is not allowed to manually assign one of the "AT_LEAST" states.
They are automatically fixed depending on the paths states.
Parameters
- state: (States) state to set the file to
- file_obj: (FileBase) the specific file to set the state to. None to set all files
Raises
sppasTypeError, sppasValueError
Returns
View Source
def set_object_state(self, state, file_obj=None):
"""Set the state of any or all FileBase within sppasWorkspace.
The default case is to set the state to all FilePath and FileRefence.
It is not allowed to manually assign one of the "AT_LEAST" states.
They are automatically fixed depending on the paths states.
:param state: (States) state to set the file to
:param file_obj: (FileBase) the specific file to set the state to. None to set all files
:raises: sppasTypeError, sppasValueError
:return: list of modified objects
"""
modified = list()
if file_obj is None:
for fp in self.__paths:
m = fp.set_state(state)
if m is True:
modified.append(fp)
for ref in self.__refs:
m = ref.set_state(state)
if m is True:
modified.append(ref)
elif isinstance(file_obj, sppasCatReference):
file_obj.set_state(state)
modified.append(file_obj)
elif isinstance(file_obj, FilePath):
modified = file_obj.set_state(state)
elif isinstance(file_obj, (FileRoot, FileName)):
for fp in self.__paths:
cur_obj = fp.get_object(file_obj.id)
if cur_obj is not None:
m = fp.set_object_state(state, file_obj)
if len(m) > 0:
modified.extend(m)
break
else:
logging.error('Wrong type of the object: {:s}'.format(str(type(file_obj))))
raise sppasTypeError(file_obj, 'FileBase')
return modified
set_state
Set the state of this sppasWorkspace instance.
Parameters
View Source
def set_state(self, value):
"""Set the state of this sppasWorkspace instance.
:param value: (States)
"""
warnings.warn('Do not set a state: A workspace has no state anymore.', DeprecationWarning)
associate
View Source
def associate(self):
ref_checked = self.get_reference_from_state(States().CHECKED)
if len(ref_checked) == 0:
return 0
associed = 0
for fp in self.__paths:
for fr in fp:
if fr.get_state() in (States().AT_LEAST_ONE_CHECKED, States().CHECKED):
for ref in ref_checked:
added = fr.add_ref(ref)
if added is True:
associed += 1
return associed
dissociate
View Source
def dissociate(self):
ref_checked = self.get_reference_from_state(States().CHECKED)
if len(ref_checked) == 0:
return 0
dissocied = 0
for fp in self.__paths:
for fr in fp:
if fr.get_state() in (States().AT_LEAST_ONE_CHECKED, States().CHECKED):
for ref in ref_checked:
removed = fr.remove_ref(ref)
if removed is True:
dissocied += 1
return dissocied
is_empty
Return if the instance contains information.
View Source
def is_empty(self):
"""Return if the instance contains information."""
return len(self.__paths) + len(self.__refs) == 0
get_filepath_from_state
Return every FilePath of the given state.
Parameters
View Source
def get_filepath_from_state(self, state):
"""Return every FilePath of the given state.
"""
paths = list()
for fp in self.__paths:
if fp.get_state() == state:
paths.append(fp)
return paths
get_fileroot_from_state
Return every FileRoot in the given state.
Parameters
View Source
def get_fileroot_from_state(self, state):
"""Return every FileRoot in the given state.
"""
roots = list()
for fp in self.__paths:
for fr in fp:
if fr.get_state() == state:
roots.append(fr)
return roots
get_fileroot_with_ref
Return every FileRoot with the given reference.
Parameters
View Source
def get_fileroot_with_ref(self, ref):
"""Return every FileRoot with the given reference."""
roots = list()
for fp in self.__paths:
for fr in fp:
if fr.has_ref(ref) is True:
roots.append(fr)
return roots
get_filename_from_state
Return every FileName in the given state.
Parameters
View Source
def get_filename_from_state(self, state):
"""Return every FileName in the given state.
"""
if len(self.__paths) == 0:
return list()
files = list()
for fp in self.__paths:
for fr in fp:
for fn in fr:
if fn.get_state() == state:
files.append(fn)
return files
get_reference_from_state
Return every Reference in the given state.
Parameters
View Source
def get_reference_from_state(self, state):
"""Return every Reference in the given state.
"""
if len(self.__refs) == 0:
return list()
refs = list()
for r in self.__refs:
if r.get_state() == state:
refs.append(r)
return refs
has_locked_files
View Source
def has_locked_files(self):
for fp in self.__paths:
if fp.get_state() in (States().AT_LEAST_ONE_LOCKED, States().LOCKED):
return True
return False
get_parent
Return the parent of an object.
Parameters
- filebase: (FileName or FileRoot).
Returns
Raises
sppasTypeError
View Source
def get_parent(self, filebase):
"""Return the parent of an object.
:param filebase: (FileName or FileRoot).
:returns: (FileRoot or FilePath)
:raises: sppasTypeError
"""
if isinstance(filebase, FileName):
fr = FileRoot(filebase.id)
root = self.get_object(fr.id)
return root
if isinstance(filebase, FileRoot):
fp = FilePath(os.path.dirname(filebase.id))
return self.get_object(fp.id)
raise sppasTypeError(filebase, 'FileName, FileRoot')
unlock
Unlock the given list of files.
Parameters
- entries: (list, None) List of FileName to unlock
Returns
- number of unlocked entries
View Source
def unlock(self, entries=None):
"""Unlock the given list of files.
:param entries: (list, None) List of FileName to unlock
:returns: number of unlocked entries
"""
i = 0
if entries is None:
for fp in self.__paths:
i += fp.unlock()
elif isinstance(entries, list):
for fp in self.__paths:
for fr in fp:
for fn in fr:
if fn in entries and fn.get_state() == States().LOCKED:
fn.set_state(States().CHECKED)
i += 1
if i > 0:
fr.update_state()
if i > 0:
fp.update_state()
return i