SPPAS 4.22

https://sppas.org/

Module sppas.preinstall

Class Installer

Description

Manage the installation of external required or optional features.

It will browse the Features() to install, according to the OS of the computer. Must be sub-classed to create the appropriate Features(). Then, the installation is launched with:

Example
 >>> class SubInstaller(Installer):
 >>> def __init__(self):
 >>> super(SubInstaller, self).__init__()
 >>> self._features = Features(req="", cmdos="")
 >>> SubInstaller().install()

Constructor

Create a new Installer instance.

View Source
def __init__(self):
    """Create a new Installer instance. """
    self.__pbar = None
    self.__pvalue = 0
    self.__nb_steps = -1
    self._features = None
    self._pypi_local_dist = " --find-links '" + paths.dist + "' "
    logging.debug('Local distributions of dependencies: {:s}'.format(self._pypi_local_dist))
    self._python = self.__search_python_cmd()
    logging.info(" ... the 'installer' is using the Python command: {:s}".format(self._python))
    self.update_pip()

Public functions

set_progress

Set the progress bar.

Parameters
  • progress: (BaseProcessProgress) The installation progress.
View Source
def set_progress(self, progress):
    """Set the progress bar.

        :param progress: (BaseProcessProgress) The installation progress.

        """
    self.__pbar = progress

get_fids

Return the list of feature identifiers.

Parameters
  • feat_type: (str) Only return features of the given type.
Returns
  • (list)
View Source
def get_fids(self, feat_type=None):
    """Return the list of feature identifiers.

        :param feat_type: (str) Only return features of the given type.
        :returns: (list)

        """
    if feat_type is None:
        return self._features.get_ids()
    f = list()
    for fid in self._features.get_ids():
        if self.feature_type(fid) == feat_type:
            f.append(fid)
    return f

feature_type

Return the feature type: deps, lang, annot.

Parameters
  • fid: (str) Identifier of a feature
View Source
def feature_type(self, fid):
    """Return the feature type: deps, lang, annot.

        :param fid: (str) Identifier of a feature

        """
    ft = self._features.feature_type(fid)
    return ft

enable

Return True if the feature is enabled and/or set it.

Parameters
  • fid: (str) Identifier of a feature
  • value: (bool or None) Enable of disable the feature.
View Source
def enable(self, fid, value=None):
    """Return True if the feature is enabled and/or set it.

        :param fid: (str) Identifier of a feature
        :param value: (bool or None) Enable of disable the feature.

        """
    return self._features.enable(fid, value)

available

Return True if the feature is available and/or set it.

Parameters
  • fid: (str) Identifier of a feature
  • value: (bool or None) Make the feature available or not.
View Source
def available(self, fid, value=None):
    """Return True if the feature is available and/or set it.

        :param fid: (str) Identifier of a feature
        :param value: (bool or None) Make the feature available or not.

        """
    return self._features.available(fid, value)

description

Return the long description of the feature.

Parameters
  • fid: (str) Identifier of a feature
Returns
  • (str)
View Source
def description(self, fid):
    """Return the long description of the feature.

        :param fid: (str) Identifier of a feature
        :return: (str)

        """
    return self._features.description(fid)

brief

Return the brief description of the feature.

Parameters
  • fid: (str) Identifier of a feature
Returns
  • (str)
View Source
def brief(self, fid):
    """Return the brief description of the feature.

        :param fid: (str) Identifier of a feature
        :return: (str)

        """
    return self._features.brief(fid)

update_pip

Update the version of pip.

View Source
def update_pip(self):
    """Update the version of pip. """
    logging.info('Update pip, the package installer for Python:')
    try:
        process = sppasExecProcess()
        process.run(self._python + ' -m pip install --upgrade pip')
    except Exception as e:
        raise sppasInstallationError(str(e))

ckeck_pypis

Update the app config file for features depending on pip packages.

CAUTION: it is supposed that if the PIP dependency is satisfied, the feature can be enabled. It is currently True but it could be false...

Parameters
  • fid: (str) Identifier of a feature
Raises

sppasInstallationError

View Source
def ckeck_pypis(self):
    """Update the app config file for features depending on pip packages.

        CAUTION: it is supposed that if the PIP dependency is satisfied, the
        feature can be enabled. It is currently True but it could be false...

        :param fid: (str) Identifier of a feature
        :raises: sppasInstallationError

        """
    for fid in self._features.get_ids('deps'):
        for package, version in self._features.pypi(fid).items():
            cfg.set_feature(fid, self._show_pypi(package))
    cfg.save()

install

Process the installation.

None is used to install all feature types.

Parameters
  • feat_type: (str or None) Install features of the given type.
Returns
  • (list) Error messages.
View Source
def install(self, feat_type=None):
    """Process the installation.

        None is used to install all feature types.

        :param feat_type: (str or None) Install features of the given type.
        :return: (list) Error messages.

        """
    errors = list()
    self.__pvalue = 1
    self.__nb_steps = len(self._features.get_ids(feat_type))
    for step, fid in enumerate(self._features.get_ids(feat_type)):
        self.__pheader(self.__message('beginning_feature', fid))
        if self._features.available(fid) is False:
            pmesg = 'available_false'
        elif self._features.enable(fid) is False:
            if self._features.feature_type(fid) == 'deps':
                if cfg.feature_installed(fid) is False:
                    cfg.set_feature(fid, False)
            pmesg = 'enable_false'
        else:
            try:
                fid_output = self.__install_feature(fid)
                if len(fid_output) > 0:
                    msg = 'Installation of feature {} returned information message(s): '.format(fid)
                    errors.append(fid_output)
                    logging.warning(msg + fid_output)
            except sppasInstallationError as e:
                self._features.enable(fid, False)
                pmesg = 'install_failed'
                if self._features.feature_type(fid) == 'deps':
                    cfg.set_feature(fid, False)
                errors.append(str(e))
                logging.error(str(e))
            except NotImplementedError:
                self._features.available(fid, False)
                self._features.enable(fid, False)
                pmesg = 'install_failed'
                msg = 'Installation of feature {} is not implemented yet for this os.'.format(fid)
                errors.append(msg)
                logging.error(msg)
            except Exception as e:
                logging.debug(traceback.format_exc())
                self._features.enable(fid, False)
                pmesg = 'install_failed'
                msg = 'An un-expected error occurred: {:s}'.format(str(e))
                errors.append(msg)
                logging.error(msg)
            else:
                self._features.enable(fid, True)
                cfg.set_feature(fid, True)
                pmesg = 'install_success'
        self.__pvalue = int(100.0 * float(step + 1) / self.__nb_steps)
        self.__pupdate(pmesg, fid)
    cfg.save()
    return errors

install_resource

Install the given tar/zip file in the resources of SPPAS.

Parameters
  • web_url: (str) URL of the directory
  • file_path: (str) Tar/Zip filename to download and install
View Source
@staticmethod
def install_resource(web_url, file_path):
    """Install the given tar/zip file in the resources of SPPAS.

        :param web_url: (str) URL of the directory
        :param file_path: (str) Tar/Zip filename to download and install

        """
    url = web_url + file_path
    try:
        req = urllib.request.Request(url)
        err = ''
    except ValueError as e:
        err = str(e)
    else:
        tmp_file = os.path.join(paths.resources, file_path)
        try:
            response = urllib.request.urlopen(req)
            data = response.read()
        except urllib.error.URLError as e:
            logging.debug(traceback.format_exc())
            if hasattr(e, 'reason'):
                err = 'Failed to establish a connection to the url {}: {}'.format(url, e.reason)
            elif hasattr(e, 'code'):
                err = "The web server couldn't fulfill the request for url {}. Error code: {}".format(url, e.code)
            else:
                err = 'Unknown connection error.'
        except http.client.IncompleteRead:
            logging.debug(traceback.format_exc())
            err = 'The server closed the connection before sending the complete file. The file can be downloaded at the following URL: {} and uncompressed in {}.'.format(url, paths.resources)
        except Exception as e:
            logging.debug(traceback.format_exc())
            err = str(e)
        else:
            read_ok = False
            err = ''
            try:
                with open(tmp_file, 'wb') as out_file:
                    out_file.write(data)
                read_ok = True
            except:
                logging.debug(traceback.format_exc())
                try:
                    zipped = response.read()
                    with open(tmp_file, 'wb') as f:
                        f.write(zipped)
                    read_ok = True
                except Exception as e:
                    logging.debug(traceback.format_exc())
                    err = str(e)
            response.close()
            if read_ok:
                if tmp_file.lower().endswith('zip') is True:
                    err = Installer.__unzip(tmp_file)
                elif tmp_file.lower().endswith('tar') is True:
                    err = Installer.__untar(tmp_file)
                else:
                    err = 'Unsupported file extension for file {:s}. Expected zip or tar.'.format(tmp_file)
        if os.path.exists(tmp_file) is True and len(err) == 0:
            os.remove(tmp_file)
    if len(err) > 0:
        raise sppasInstallationError(err)

Private functions

_search_package

To be overridden. Return True if package is already installed.

Parameters
  • package: (str) The system package to search.
View Source
def _search_package(self, package):
    """To be overridden. Return True if package is already installed.

        :param package: (str) The system package to search.

        """
    raise NotImplementedError

_install_package

To be overridden. Install package.

Parameters
  • package: (str) The system package to install.
Returns
  • False or None
View Source
def _install_package(self, package):
    """To be overridden. Install package.

        :param package: (str) The system package to install.
        :returns: False or None

        """
    raise NotImplementedError

_version_package

To be overridden. Return True if the package is up to date.

Parameters
  • package: (str) The system package to search.
  • req_version: (str) The minimum version required.
View Source
def _version_package(self, package, req_version):
    """To be overridden. Return True if the package is up to date.

        :param package: (str) The system package to search.
        :param req_version: (str) The minimum version required.

        """
    raise NotImplementedError

_need_update_package

To be overridden. Return True if the package need to be updated.

Parameters
  • stdout_show: (str) The stdout of the command.
  • req_version: (str) The minimum version required.
View Source
@staticmethod
def _need_update_package(stdout_show, req_version):
    """To be overridden. Return True if the package need to be updated.

        :param stdout_show: (str) The stdout of the command.
        :param req_version: (str) The minimum version required.

        """
    raise NotImplementedError

_update_package

To be overridden. Update package.

Parameters
  • package: (str) The system package to update.
Returns
  • False or None
View Source
def _update_package(self, package, req_version):
    """To be overridden. Update package.

        :param package: (str) The system package to update.
        :returns: False or None

        """
    raise NotImplementedError

_show_pypi

Return True if given Pypi package is already installed.

Parameters
  • package: (str) The pip package to search.
View Source
def _show_pypi(self, package):
    """Return True if given Pypi package is already installed.

        :param package: (str) The pip package to search.

        """
    try:
        command = self._python + ' -m pip show ' + package
        process = sppasExecProcess()
        process.run(command)
        logging.debug('Search pypi return status: {}'.format(process.status()))
        err = process.error()
        stdout = process.out()
        stdout = stdout.replace("b''", '')
        if len(err) > 3 or len(stdout) == 0:
            return False
    except Exception as e:
        raise sppasInstallationError(str(e))
    return True

_install_pypi

Install a Python Pypi package.

Parameters
  • package: (str) The pip package to install
  • version: (str) The version constraint
Raises

sppasInstallationError

Returns
  • (str)
View Source
def _install_pypi(self, package, version, options=''):
    """Install a Python Pypi package.

        :param package: (str) The pip package to install
        :param version: (str) The version constraint
        :raises: sppasInstallationError
        :return: (str)

        """
    command = self._python + ' -m pip cache purge'
    process = sppasExecProcess()
    process.run(command)
    logging.debug('python pip cache purge return status: {}'.format(process.status()))
    logging.info('Try to install the stable version of {:s} python package.'.format(package))
    if 'env' not in self._python:
        command = self._python + ' -m pip install' + ' --only-binary=:all: ' + options + " '" + package + version + "' " + self._pypi_local_dist + ' --user --no-warn-script-location'
    else:
        command = self._python + ' -m pip install --only-binary=:all: ' + options + " '" + package + version + "' " + self._pypi_local_dist + ' --no-warn-script-location'
    process = sppasExecProcess()
    process.run(command)
    err = u(process.error().strip())
    stdout = process.out()
    if len(stdout) > 3:
        logging.info(stdout)
    if len(err) > 0:
        if len(options) > 0:
            logging.warning('An error occurred when installing: {} {} (stable)'.format(package, version))
            logging.warning("The stable package can't be installed from the official python package repository: {:s}".format(str(err)))
            err = self._install_prebuild_pypi(package, version, options)
        else:
            logging.error('The following error occurred: ' + err)
            logging.info(" ... but we'll attempt another way to install the package:")
            command = self._python + ' -m pip cache purge'
            process = sppasExecProcess()
            process.run(command)
            logging.debug(' ... ... python pip cache purge return status: {}'.format(process.status()))
            if 'env' not in self._python:
                command = self._python + " -m pip install '" + package + version + "' --user --no-warn-script-location"
            else:
                command = self._python + " -m pip install '" + package + version + "' --no-warn-script-location"
            process = sppasExecProcess()
            process.run(command)
            err = u(process.error().strip())
            stdout = process.out()
            if len(stdout) > 3:
                logging.info(stdout)
    if len(err) > 0:
        if self._version_pypi(package, version) is False:
            raise sppasInstallationError(err)
        else:
            return err
    return ''

_install_prebuild_pypi

Install a pre-build version of a Python Pypi package.

Parameters
  • package: (str) The pip package to install
  • version: (str) The version constraint
Raises

sppasInstallationError

View Source
def _install_prebuild_pypi(self, package, version, options=''):
    """Install a pre-build version of a Python Pypi package.

        :param package: (str) The pip package to install
        :param version: (str) The version constraint
        :raises: sppasInstallationError

        """
    command = self._python + ' -m pip cache purge'
    process = sppasExecProcess()
    process.run(command)
    logging.debug('python pip cache purge return status: {}'.format(process.status()))
    logging.info('Try to install a pre-build version of {:s} python package.'.format(package))
    if 'env' not in self._python:
        command = self._python + ' -m pip install --pre ' + options + " '" + package + version + "' --user --no-warn-script-location"
    else:
        command = self._python + ' -m pip install --pre ' + options + " '" + package + version + "' --no-warn-script-location"
    process = sppasExecProcess()
    process.run(command)
    err = u(process.error().strip())
    stdout = process.out()
    if len(stdout) > 3:
        logging.info(stdout)
    if len(err) > 0:
        if self._version_pypi(package, version) is False:
            raise sppasInstallationError(err)
        else:
            return err
    return ''

_version_pypi

Returns True if package is up-to-date.

Parameters
  • package: (str) The pip package to search.
  • req_version: (str) The minimum version required.
View Source
def _version_pypi(self, package, req_version):
    """Returns True if package is up-to-date.

        :param package: (str) The pip package to search.
        :param req_version: (str) The minimum version required.

        """
    try:
        command = self._python + ' -m pip show ' + package
        process = sppasExecProcess()
        process.run(command)
        logging.debug('Version pypi return status: {}'.format(process.status()))
        err = process.error()
        if len(err) > 3:
            return False
        stdout = process.out()
        return not Installer._need_update_pypi(stdout, req_version)
    except Exception:
        return False

_need_update_pypi

Return True if the package needs to be updated.

Parameters
  • stdout_show: (str) The stdout of the command.
  • req_version: (str) The minimum version required.
View Source
@staticmethod
def _need_update_pypi(stdout_show, req_version):
    """Return True if the package needs to be updated.

        :param stdout_show: (str) The stdout of the command.
        :param req_version: (str) The minimum version required.

        """
    stdout_show = str(stdout_show)
    req_version = str(req_version)
    version = stdout_show.split('\\r\\n')[1].split(':')[1].replace(' ', '')
    v = ''
    i = 0
    for letter in version:
        if letter.isalpha() is False:
            if letter == '.':
                i += 1
            if i == 2 or letter == ' ':
                break
            v += letter
        else:
            break
    req_version = req_version.split(';', maxsplit=1)
    comparator = req_version[0]
    comparator += '='
    v = v.strip()
    v = float(v)
    version = float(req_version[1])
    if comparator == '>=':
        return v < version
    raise ValueError('The comparator: ' + comparator + ' does not refer to a valid comparator')

_update_pypi

Update package.

Parameters
  • package: (str) The pip package to update.
  • version: (str) The version constraint
Raises

sppasInstallationError

View Source
def _update_pypi(self, package, version):
    """Update package.

        :param package: (str) The pip package to update.
        :param version: (str) The version constraint
        :raises: sppasInstallationError

        """
    try:
        command = self._python + ' -m pip cache purge'
        process = sppasExecProcess()
        process.run(command)
        logging.debug('python pip cache purge return status: {}'.format(process.status()))
        command = self._python + ' -m pip install ' + self._pypi_local_dist + " -U '" + package + version + "'"
        command += ' --no-warn-script-location'
        process = sppasExecProcess()
        process.run(command)
        logging.debug('Update pypi return status: {}'.format(process.status()))
    except Exception as e:
        raise sppasInstallationError(str(e))
    err = u(process.error().strip())
    stdout = u(process.out())
    if len(stdout) > 3:
        logging.info(stdout)
    if len(err) > 0:
        if self._version_pypi(package, version) is False:
            raise sppasInstallationError(err)
        else:
            return err
    return ''

Protected functions

__search_python_cmd

Search for a valid python command. Raise SystemError if not found.

View Source
def __search_python_cmd(self):
    """Search for a valid python command. Raise SystemError if not found."""
    logging.info("Search for a 'python' command that this installer can launch...")
    logging.info(' ... Try: {:s}'.format(sys.executable))
    command = quote(sys.executable)
    command += " -c 'import sys; print(sys.version_info.major)' "
    pyprocess = sppasExecProcess()
    try:
        pyprocess.run(command)
        out = pyprocess.out().replace("b'", '')
        out = out.replace("'", '')
        out = out.strip()
        logging.info('Command returned: {}'.format(out))
        if len(out) == 1:
            return quote(sys.executable)
    except Exception as e:
        logging.error(str(e))
    command = "python3 -c 'import sys; print(sys.version_info.major)' "
    logging.info(' ... Try: python3')
    try:
        pyprocess.run(command)
        out = pyprocess.out().strip()
        out = out.replace("b'", '')
        out = out.replace("'", '')
        out = out.strip()
        logging.info('Command returned: {}'.format(out))
        if len(out) == 1:
            pyversion = int(out)
            if pyversion == 3:
                return 'python3'
    except Exception as e:
        logging.error(str(e))
    command = "python -c 'import sys; print(sys.version_info.major)' "
    logging.info(' ... Try: python')
    try:
        pyprocess.run(command)
        out = pyprocess.out().replace("b'", '')
        out = out.replace("'", '')
        out = out.strip()
        logging.info('Command returned: {}'.format(out))
        if len(out) == 1:
            pyversion = int(out)
            if pyversion == 3:
                return 'python'
    except Exception as e:
        logging.error(str(e))
    raise SystemError('No valid python command can be invoked by the installer system.')

__install_feature

Install the given feature depending on its type.

Parameters
  • fid
View Source
def __install_feature(self, fid):
    """Install the given feature depending on its type.

        """
    ft = self._features.feature_type(fid)
    out_msg = list()
    if ft == 'deps':
        if len(self._features.packages(fid)) > 0:
            out_install = self.__install_packages(fid)
            if len(out_install) > 0:
                out_msg.append(out_install)
        if len(self._features.pypi(fid)) > 0:
            try:
                out_install = self.__install_pypis(fid)
                if len(out_install) > 0:
                    out_msg.append(out_install)
            except:
                out_install = self.__install_pypis(fid, alt=True)
                if len(out_install) > 0:
                    out_msg.append(out_install)
        if len(self._features.cmd(fid)) > 0:
            out_install = self.__install_cmd(fid)
            if len(out_install) > 0:
                out_msg.append(out_install)
    elif ft == 'lang':
        self.__install_lang(fid)
    elif ft == 'annot':
        self.__install_annot(fid)
    else:
        self._features.available(fid, False)
        raise sppasInstallationError('Unknown feature type {}.'.format(fid))
    return '\n\n'.join(out_msg)

__install_lang

Download, unzip and install resources for a given language.

Parameters
  • fid: (str) Identifier of a feature
Raises

sppasInstallationError

View Source
def __install_lang(self, fid):
    """Download, unzip and install resources for a given language.

        :param fid: (str) Identifier of a feature
        :raises: sppasInstallationError

        """
    self.__pmessage('Download, unzip and install linguistic resources for {} language'.format(fid))
    zip_path = self._features.lang(fid) + '.zip'
    url = paths.urlresources
    if url.endswith('/') is False:
        url += '/'
    url += 'lang/'
    Installer.install_resource(url, zip_path)

__install_annot

Download, unzip and install resources for a given annotation.

Parameters
  • fid: (str) Identifier of a feature
Raises

sppasInstallationError

View Source
def __install_annot(self, fid):
    """Download, unzip and install resources for a given annotation.

        :param fid: (str) Identifier of a feature
        :raises: sppasInstallationError

        """
    self.__pmessage('Download, unzip and install resources for {} annotation'.format(fid))
    tar_path = self._features.annot(fid) + '.tar'
    url = paths.urlresources
    if url.endswith('/') is False:
        url += '/'
    url += 'annot/'
    Installer.install_resource(url, tar_path)

__unzip

View Source
@staticmethod
def __unzip(zip_filename):
    dest = os.path.join(paths.resources)
    try:
        z = zipfile.ZipFile(zip_filename)
        z.extractall(dest)
        z.close()
    except zipfile.error as e:
        logging.debug(traceback.format_exc())
        return str(e)
    return ''

__untar

View Source
@staticmethod
def __untar(tar_filename):
    dest = os.path.join(paths.resources)
    try:
        with tarfile.open(tar_filename, 'r') as tf:
            tf.extractall(path=dest)
    except Exception as e:
        logging.debug(traceback.format_exc())
        return str(e)
    return ''

__install_cmd

Execute a system command for a feature.

Parameters
  • fid: (str) Identifier of a feature
Raises

sppasInstallationError

View Source
def __install_cmd(self, fid):
    """Execute a system command for a feature.

        :param fid: (str) Identifier of a feature
        :raises: sppasInstallationError

        """
    command = self._features.cmd(fid)
    logging.info("The installer is testing the command '{}' for feature {}".format(command, fid))
    try:
        process = sppasExecProcess()
        process.run(self._features.cmd(fid))
        err = u(process.error().strip())
        stdout = process.out()
        logging.info('Command return code is {}'.format(process.status()))
        if len(stdout) > 3:
            logging.info(stdout)
    except Exception as e:
        raise sppasInstallationError(str(e))
    if process.status() != 0:
        raise sppasInstallationError(err)
    return err

__install_packages

Manage installation of system packages.

Parameters
  • fid: (str) Identifier of a feature
Raises

sppasInstallationError

View Source
def __install_packages(self, fid):
    """Manage installation of system packages.

        :param fid: (str) Identifier of a feature
        :raises: sppasInstallationError

        """
    out_cmd = list()
    for package, version in self._features.packages(fid).items():
        logging.info(' ... package: {}'.format(package))
        if self._search_package(package) is False:
            logging.info(' ... ... is going to be installed.')
            out_install = self._install_package(package)
            if len(out_install) > 0:
                out_cmd.append(out_install)
        elif self._version_package(package, version) is False:
            logging.info(' ... ... is going to be updated.')
            try:
                out_install = self._update_package(package, version)
                if len(out_install) > 0:
                    out_cmd.append(out_install)
            except Exception as e:
                logging.error(' ... ... Update failed with the error: {}'.format(str(e)))
                out_cmd.append(str(e))
        else:
            logging.info(' ... ... is already installed and up-to-date.')
        self.__pvalue += self.__eval_step(fid) // self.__nb_steps
        self.__pupdate('install_success', package + version)
    return '\n'.join(out_cmd)

__install_pypis

Manage the installation of pip packages.

Parameters
  • fid: (str) Identifier of a feature
  • alt: (bool) Install alternative pipys instead of the regular ones.
Raises

sppasInstallationError

View Source
def __install_pypis(self, fid, alt=False):
    """Manage the installation of pip packages.

        :param fid: (str) Identifier of a feature
        :param alt: (bool) Install alternative pipys instead of the regular ones.
        :raises: sppasInstallationError

        """
    out_pip = list()
    pip_options = self._features.pypi_opt(fid)
    if alt is False:
        pip_items = self._features.pypi(fid).items()
    else:
        pip_items = self._features.pypi_alt(fid).items()
    for package, version in pip_items:
        logging.info(' ... python library: {}'.format(package))
        if self._show_pypi(package) is False:
            try:
                logging.info(' ... ... is going to be installed.')
                err = self._install_pypi(package, version, pip_options)
                if len(err) > 0:
                    out_pip.append(err)
            except sppasInstallationError as e:
                if 'DEPRECATION: ' in str(e):
                    logging.warning(' ... ... returned a warning message. {}'.format(str(e)))
                else:
                    raise
        elif self._version_pypi(package, version) is False:
            try:
                logging.info(' ... ... is going to be updated.')
                self._update_pypi(package, version)
            except sppasInstallationError as e:
                logging.error(' ... ... Update failed with the error: {}'.format(str(e)))
                out_pip.append(e)
        else:
            logging.info(' ... ... is already installed and up-to-date.')
        time.sleep(2)
        self.__pvalue += self.__eval_step(fid) // self.__nb_steps
        if self._show_pypi(package) is True:
            self.__pupdate('install_success', package + version)
        else:
            self.__pupdate('install_failed', package + version)
    return '\n'.join(out_pip)

__pheader

View Source
def __pheader(self, text):
    if self.__pbar is not None:
        self.__pbar.set_header(text)
        self.__pbar.set_fraction(self.__pvalue)
        self.__pbar.set_text('')
    logging.info(' ==> {:d} percents'.format(self.__pvalue))
    logging.info('    * * * * *   {}   * * * * * '.format(text))

__pmessage

View Source
def __pmessage(self, text):
    if self.__pbar is not None:
        self.__pbar.set_text(text)
    logging.info('  ==> {text}'.format(text=text))

__pfraction

View Source
def __pfraction(self):
    if self.__pbar is not None:
        self.__pbar.set_fraction(self.__pvalue)
    logging.info('  ==> {:d} percents'.format(self.__pvalue))

__pupdate

text is either a text or a mid.

Parameters
  • text
  • fid
View Source
def __pupdate(self, text, fid):
    """text is either a text or a mid."""
    msg = self.__message(text, fid)
    if self.__pvalue > 95:
        self.__pvalue = 100
    if self.__pbar is not None:
        self.__pbar.update(self.__pvalue, msg)
    logging.info(' ==> {:s} ({:d} percents)'.format(str(msg), self.__pvalue))

__message

View Source
def __message(self, mid, fid):
    if mid in MESSAGES:
        return MESSAGES[mid].format(name=fid)
    else:
        return mid

__eval_step

Return the percentage of 1 step in progression for a given feature.

Parameters
  • fid: (str) Identifier of a feature
Returns
  • (float)
View Source
def __eval_step(self, fid):
    """Return the percentage of 1 step in progression for a given feature.

        :param fid: (str) Identifier of a feature
        :return: (float)

        """
    nb_total = 0
    ft = self._features.feature_type(fid)
    if ft == 'deps':
        nb_cmd = 0
        if len(self._features.cmd(fid)) > 0:
            nb_cmd = 1
        nb_packages = len(self._features.packages(fid))
        nb_pypi = len(self._features.pypi(fid))
        nb_total = nb_cmd + nb_packages + nb_pypi
    elif ft == 'annot':
        if len(self._features.annot(fid)) > 0:
            nb_total = 1
    elif ft == 'lang':
        if len(self._features.lang(fid)) > 0:
            nb_total = 1
    if nb_total > 0:
        return int(round(1.0 / float(nb_total), 2) * 100.0)
    return 0