diff --git a/extension/ZAZPip_v0.10.2.oxt b/extension/ZAZPip_v0.10.2.oxt index 2c37398..6332e55 100644 Binary files a/extension/ZAZPip_v0.10.2.oxt and b/extension/ZAZPip_v0.10.2.oxt differ diff --git a/source/pip_runner.py b/source/pip_runner.py index 36ee398..f23a058 100644 --- a/source/pip_runner.py +++ b/source/pip_runner.py @@ -18,7 +18,6 @@ if TYPE_CHECKING: implementation_name = "net.elmau.zaz.pip.PipRunner" implementation_services = ("com.sun.star.task.Job",) - # endregion Constants @@ -51,10 +50,7 @@ class PipRunner(unohelper.Base, XJob): # region Implementation g_TypeTable = {} -# python loader looks for a static g_ImplementationHelper variable g_ImplementationHelper = unohelper.ImplementationHelper() -# add the FormatFactory class to the implementation container, -# which the loader uses to register/instantiate the component. g_ImplementationHelper.addImplementation(PipRunner, implementation_name, implementation_services) # endregion Implementation \ No newline at end of file diff --git a/source/pythonpath/easymacro.py b/source/pythonpath/easymacro.py index 5dc49b3..9b115a4 100644 --- a/source/pythonpath/easymacro.py +++ b/source/pythonpath/easymacro.py @@ -596,13 +596,24 @@ def call_macro(args, in_thread=False): result = _call_macro(args) return result - +def get_env(): + """ + Gets Environment used for subprocess. + """ + my_env = os.environ.copy() + py_path = "" + p_sep = ";" if IS_WIN else ":" + for d in sys.path: + py_path = py_path + d + p_sep + my_env["PYTHONPATH"] = py_path + return my_env + def run(command, capture=False, split=True): if not split: - return subprocess.check_output(command, shell=True).decode() + return subprocess.check_output(command, shell=True, env=get_env()).decode() cmd = shlex.split(command) - result = subprocess.run(cmd, capture_output=capture, text=True, shell=IS_WIN) + result = subprocess.run(cmd, capture_output=capture, text=True, shell=IS_WIN, env=get_env()) if capture: result = result.stdout else: @@ -613,7 +624,7 @@ def run(command, capture=False, split=True): def popen(command): try: proc = subprocess.Popen(shlex.split(command), shell=IS_WIN, - stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=get_env()) for line in proc.stdout: yield line.decode().rstrip() except Exception as e: @@ -6320,7 +6331,7 @@ class Paths(object): if IS_WIN: os.startfile(path) else: - pid = subprocess.Popen(['xdg-open', path]).pid + pid = subprocess.Popen(['xdg-open', path, ], env=get_env()).pid return @classmethod @@ -6904,7 +6915,7 @@ class LOServer(object): for i in range(3): self._server = subprocess.Popen(self.CMD, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=get_env()) time.sleep(3) if self.is_running: break diff --git a/source/pythonpath/install_flatpak.py b/source/pythonpath/install_flatpak.py index cd310fa..5f98199 100644 --- a/source/pythonpath/install_flatpak.py +++ b/source/pythonpath/install_flatpak.py @@ -2,11 +2,148 @@ from __future__ import annotations from typing import cast import os import sys +import tempfile import subprocess from pathlib import Path from typing import Any, List, Dict import easymacro as app -from install_pip_from_wheel import InstallPipFromWheel +from contextlib import contextmanager +import uno + + +@contextmanager +def change_dir(directory): + """ + A context manager that changes the current working directory to the specified directory + temporarily and then changes it back when the context is exited. + """ + current_dir = os.getcwd() + os.chdir(directory) + try: + yield + finally: + os.chdir(current_dir) + + +class InstallPipFromWheel: + """Download and install PIP from wheel url""" + + def __init__(self, pip_wheel_url: str, lo_identifier: str) -> None: + self._pip_url = pip_wheel_url + self._lo_identifier = lo_identifier + + def install(self, dst: str | Path = "") -> None: + """ + Install pip from wheel file. + + Downloads the pip wheel file from the url provided in the config file and unzips it to the destination directory. + + Args: + dst (str | Path, Optional): The destination directory where the pip wheel file will be installed. If not provided, the ``pythonpath`` location will be used. + + Returns: + None: + + Raises: + None: + """ + if not self._pip_url: + app.error("PIP installation has failed - No wheel url") + return + + if not dst: + root_pth = Path(app.Paths.from_id(self._lo_identifier)) + dst = root_pth / "pythonpath" + + with tempfile.TemporaryDirectory() as temp_dir: + # temp_dir = tempfile.gettempdir() + path_pip = Path(temp_dir) + + filename = path_pip / "pip-wheel.whl" + + data, _, err = app.url_open(self._pip_url, verify=False) + if err: + app.error("Unable to download PIP installation wheel file") + return + filename.write_bytes(data) + + if filename.exists(): + app.info("PIP wheel file has been saved") + else: + app.error("PIP wheel file has not been saved") + return + + try: + self._unzip_wheel(filename=filename, dst=dst) + except Exception: + return + # now that pip has been installed from wheel force a reinstall to ensure it is the latest version + self._force_install_pip() + + def _unzip_wheel(self, filename: Path, dst: str | Path) -> None: + """Unzip the downloaded wheel file""" + if isinstance(dst, str): + dst = Path(dst) + try: + # app.zip.unzip(source=str(filename), target=str(dst)) + self.unzip_file(zip_file=filename, dest_dir=dst) + if dst.exists(): + app.debug(f"PIP wheel file has been unzipped to {dst}") + else: + app.error("PIP wheel file has not been unzipped") + raise Exception("PIP wheel file has not been unzipped") + except Exception as err: + app.error(f"Unable to unzip wheel file: {err}", exc_info=True) + raise + + def _force_install_pip(self) -> None: + """Now that pip has been installed, force reinstall it to ensure it is the latest version""" + cmd = [app.paths.python, "-m", "pip", "install", "--upgrade", "pip"] + app.popen(command=" ".join(cmd)) + + def unzip_file(self, zip_file: str | Path, dest_dir: str | Path = "") -> None: + """ + Unzip the given zip file to the specified destination directory. + + Args: + zip_file (str | Path): The zip file to unzip. + dest_dir (str | Path, optional): The destination directory to unzip to. + + Returns: + None: + """ + from zipfile import ZipFile + + zip_file_path = Path(zip_file) if isinstance(zip_file, str) else zip_file + if not zip_file_path.is_file(): + raise ValueError(f"Expected file, got '{zip_file_path}'") + if not zip_file_path.is_absolute(): + zip_file_path = zip_file_path.absolute() + if not zip_file_path.exists(): + raise FileNotFoundError(f"File '{zip_file_path}' not found") + + if isinstance(dest_dir, str): + dest_dir = zip_file_path.parent if dest_dir == "" else Path(dest_dir) + else: + dest_dir = dest_dir.absolute() + + if not dest_dir.is_dir(): + raise ValueError(f"Expected folder, got '{dest_dir}'") + if not dest_dir.exists(): + try: + dest_dir.mkdir(parents=True) + except Exception as e: + raise FileNotFoundError( + f"Folder '{dest_dir}' not found, unable to create folder." + ) from e + if not dest_dir.exists(): + raise FileNotFoundError(f"Folder '{dest_dir}' not found") + + with change_dir(dest_dir): + with ZipFile(zip_file_path) as f: + f.extractall(dest_dir) + # with change_dir(dest_dir): + # shutil.unpack_archive(zip_file_path, dest_dir) class FlatpakInstaller: @@ -53,7 +190,9 @@ class FlatpakInstaller: def _install_wheel(self) -> bool: result = False - installer = InstallPipFromWheel(pip_wheel_url=self._pip_url, lo_identifier=self._lo_identifier) + installer = InstallPipFromWheel( + pip_wheel_url=self._pip_url, lo_identifier=self._lo_identifier + ) try: installer.install(self._site_packages) if self._site_packages not in sys.path: @@ -64,12 +203,12 @@ class FlatpakInstaller: return result return result - def is_pip_installed(self) -> bool: """Check if PIP is installed.""" # cmd = self._cmd_pip("--version") # cmd = '"{}" -m pip -V'.format(self.path_python) cmd = [str(self.path_python), "-m", "pip", "-V"] - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self._get_env()) + result = subprocess.run( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self._get_env() + ) return result.returncode == 0 - diff --git a/source/pythonpath/install_pip_from_wheel.py b/source/pythonpath/install_pip_from_wheel.py deleted file mode 100644 index 3f9950b..0000000 --- a/source/pythonpath/install_pip_from_wheel.py +++ /dev/null @@ -1,141 +0,0 @@ -from __future__ import annotations -import os -from typing import Any -import tempfile -from pathlib import Path -import easymacro as app -from contextlib import contextmanager -import uno - - -@contextmanager -def change_dir(directory): - """ - A context manager that changes the current working directory to the specified directory - temporarily and then changes it back when the context is exited. - """ - current_dir = os.getcwd() - os.chdir(directory) - try: - yield - finally: - os.chdir(current_dir) - - -class InstallPipFromWheel: - """Download and install PIP from wheel url""" - - def __init__(self, pip_wheel_url: str, lo_identifier: str) -> None: - self._pip_url = pip_wheel_url - self._lo_identifier = lo_identifier - - def install(self, dst: str | Path = "") -> None: - """ - Install pip from wheel file. - - Downloads the pip wheel file from the url provided in the config file and unzips it to the destination directory. - - Args: - dst (str | Path, Optional): The destination directory where the pip wheel file will be installed. If not provided, the ``pythonpath`` location will be used. - - Returns: - None: - - Raises: - None: - """ - if not self._pip_url: - app.error("PIP installation has failed - No wheel url") - return - - if not dst: - root_pth = Path(app.Paths.from_id(self._lo_identifier)) - dst = root_pth / "pythonpath" - - with tempfile.TemporaryDirectory() as temp_dir: - # temp_dir = tempfile.gettempdir() - path_pip = Path(temp_dir) - - filename = path_pip / "pip-wheel.whl" - - data, _, err = app.url_open(self._pip_url, verify=False) - if err: - app.error("Unable to download PIP installation wheel file") - return - filename.write_bytes(data) - - if filename.exists(): - app.info("PIP wheel file has been saved") - else: - app.error("PIP wheel file has not been saved") - return - - try: - self._unzip_wheel(filename=filename, dst=dst) - except Exception: - return - # now that pip has been installed from wheel force a reinstall to ensure it is the latest version - self._force_install_pip() - - def _unzip_wheel(self, filename: Path, dst: str | Path) -> None: - """Unzip the downloaded wheel file""" - if isinstance(dst, str): - dst = Path(dst) - try: - # app.zip.unzip(source=str(filename), target=str(dst)) - self.unzip_file(zip_file=filename, dest_dir=dst) - if dst.exists(): - app.debug(f"PIP wheel file has been unzipped to {dst}") - else: - app.error("PIP wheel file has not been unzipped") - raise Exception("PIP wheel file has not been unzipped") - except Exception as err: - app.error(f"Unable to unzip wheel file: {err}", exc_info=True) - raise - - def _force_install_pip(self) -> None: - """Now that pip has been installed, force reinstall it to ensure it is the latest version""" - cmd = [app.paths.python, "-m", "pip", "install", "--upgrade", "pip"] - app.popen(command=" ".join(cmd)) - - def unzip_file(self, zip_file: str | Path, dest_dir: str | Path = "") -> None: - """ - Unzip the given zip file to the specified destination directory. - - Args: - zip_file (str | Path): The zip file to unzip. - dest_dir (str | Path, optional): The destination directory to unzip to. - - Returns: - None: - """ - from zipfile import ZipFile - - zip_file_path = Path(zip_file) if isinstance(zip_file, str) else zip_file - if not zip_file_path.is_file(): - raise ValueError(f"Expected file, got '{zip_file_path}'") - if not zip_file_path.is_absolute(): - zip_file_path = zip_file_path.absolute() - if not zip_file_path.exists(): - raise FileNotFoundError(f"File '{zip_file_path}' not found") - - if isinstance(dest_dir, str): - dest_dir = zip_file_path.parent if dest_dir == "" else Path(dest_dir) - else: - dest_dir = dest_dir.absolute() - - if not dest_dir.is_dir(): - raise ValueError(f"Expected folder, got '{dest_dir}'") - if not dest_dir.exists(): - try: - dest_dir.mkdir(parents=True) - except Exception as e: - raise FileNotFoundError(f"Folder '{dest_dir}' not found, unable to create folder.") from e - if not dest_dir.exists(): - raise FileNotFoundError(f"Folder '{dest_dir}' not found") - - with change_dir(dest_dir): - with ZipFile(zip_file_path) as f: - f.extractall(dest_dir) - # with change_dir(dest_dir): - # shutil.unpack_archive(zip_file_path, dest_dir) diff --git a/source/pythonpath/main.py b/source/pythonpath/main.py index bd66353..e0c47ce 100644 --- a/source/pythonpath/main.py +++ b/source/pythonpath/main.py @@ -120,7 +120,7 @@ class Controllers(object): self.d.lbl_pip.value = label self.d.cmd_install_pip.visible = False self.d.cmd_admin_pip.visible = True - msg = _('PIP was installed sucessfully') + msg = _('PIP was installed successfully') app.msgbox(msg) else: msg = _('PIP installation has failed, see log') @@ -131,9 +131,25 @@ class Controllers(object): return def _install_pip_flatpak(self) -> None: - from install_flatpak import FlatpakInstaller - installer = FlatpakInstaller(pip_wheel_url=URL_PIP_WHEEL, lo_identifier=ID_EXTENSION) - installer.install_pip() + try: + from install_flatpak import FlatpakInstaller + installer = FlatpakInstaller(pip_wheel_url=URL_PIP_WHEEL, lo_identifier=ID_EXTENSION) + installer.install_pip() + msg = _('PIP was installed successfully') + # app.msgbox(msg) + cmd = self._cmd_pip('-V') + label = app.run(cmd, True) + if label: + self.d.lbl_pip.value = label + self.d.cmd_install_pip.visible = False + self.d.cmd_admin_pip.visible = True + msg = _('PIP was installed successfully') + app.msgbox(msg) + else: + msg = _('PIP installation has failed, see log') + app.warning(msg) + except Exception as e: + app.errorbox(e) @app.run_in_thread def _install_pip(self): @@ -244,7 +260,10 @@ class Controllers(object): self.d.lst_log.visible = True line = '' - cmd = ' install --upgrade --user' + if self.is_flatpak: + cmd = f' install --upgrade --target={app.get_flatpak_site_packages_dir()}' + else: + cmd = ' install --upgrade --user' if value: name = value.split(' ')[0].strip() cmd = self._cmd_pip(f'{cmd} {name}')