working with flatpak

This commit is contained in:
Barry-Thomas-Paul: Moss 2023-10-31 19:28:22 -04:00
parent 4aa0ca68c8
commit 7b9ac8eee0
6 changed files with 185 additions and 161 deletions

Binary file not shown.

View File

@ -18,7 +18,6 @@ if TYPE_CHECKING:
implementation_name = "net.elmau.zaz.pip.PipRunner"
implementation_services = ("com.sun.star.task.Job",)
# endregion Constants
@ -51,10 +50,7 @@ class PipRunner(unohelper.Base, XJob):
# region Implementation
g_TypeTable = {}
# python loader looks for a static g_ImplementationHelper variable
g_ImplementationHelper = unohelper.ImplementationHelper()
# add the FormatFactory class to the implementation container,
# which the loader uses to register/instantiate the component.
g_ImplementationHelper.addImplementation(PipRunner, implementation_name, implementation_services)
# endregion Implementation

View File

@ -596,13 +596,24 @@ def call_macro(args, in_thread=False):
result = _call_macro(args)
return result
def get_env():
"""
Gets Environment used for subprocess.
"""
my_env = os.environ.copy()
py_path = ""
p_sep = ";" if IS_WIN else ":"
for d in sys.path:
py_path = py_path + d + p_sep
my_env["PYTHONPATH"] = py_path
return my_env
def run(command, capture=False, split=True):
if not split:
return subprocess.check_output(command, shell=True).decode()
return subprocess.check_output(command, shell=True, env=get_env()).decode()
cmd = shlex.split(command)
result = subprocess.run(cmd, capture_output=capture, text=True, shell=IS_WIN)
result = subprocess.run(cmd, capture_output=capture, text=True, shell=IS_WIN, env=get_env())
if capture:
result = result.stdout
else:
@ -613,7 +624,7 @@ def run(command, capture=False, split=True):
def popen(command):
try:
proc = subprocess.Popen(shlex.split(command), shell=IS_WIN,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=get_env())
for line in proc.stdout:
yield line.decode().rstrip()
except Exception as e:
@ -6320,7 +6331,7 @@ class Paths(object):
if IS_WIN:
os.startfile(path)
else:
pid = subprocess.Popen(['xdg-open', path]).pid
pid = subprocess.Popen(['xdg-open', path, ], env=get_env()).pid
return
@classmethod
@ -6904,7 +6915,7 @@ class LOServer(object):
for i in range(3):
self._server = subprocess.Popen(self.CMD,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=get_env())
time.sleep(3)
if self.is_running:
break

View File

@ -2,11 +2,148 @@ from __future__ import annotations
from typing import cast
import os
import sys
import tempfile
import subprocess
from pathlib import Path
from typing import Any, List, Dict
import easymacro as app
from install_pip_from_wheel import InstallPipFromWheel
from contextlib import contextmanager
import uno
@contextmanager
def change_dir(directory):
"""
A context manager that changes the current working directory to the specified directory
temporarily and then changes it back when the context is exited.
"""
current_dir = os.getcwd()
os.chdir(directory)
try:
yield
finally:
os.chdir(current_dir)
class InstallPipFromWheel:
"""Download and install PIP from wheel url"""
def __init__(self, pip_wheel_url: str, lo_identifier: str) -> None:
self._pip_url = pip_wheel_url
self._lo_identifier = lo_identifier
def install(self, dst: str | Path = "") -> None:
"""
Install pip from wheel file.
Downloads the pip wheel file from the url provided in the config file and unzips it to the destination directory.
Args:
dst (str | Path, Optional): The destination directory where the pip wheel file will be installed. If not provided, the ``pythonpath`` location will be used.
Returns:
None:
Raises:
None:
"""
if not self._pip_url:
app.error("PIP installation has failed - No wheel url")
return
if not dst:
root_pth = Path(app.Paths.from_id(self._lo_identifier))
dst = root_pth / "pythonpath"
with tempfile.TemporaryDirectory() as temp_dir:
# temp_dir = tempfile.gettempdir()
path_pip = Path(temp_dir)
filename = path_pip / "pip-wheel.whl"
data, _, err = app.url_open(self._pip_url, verify=False)
if err:
app.error("Unable to download PIP installation wheel file")
return
filename.write_bytes(data)
if filename.exists():
app.info("PIP wheel file has been saved")
else:
app.error("PIP wheel file has not been saved")
return
try:
self._unzip_wheel(filename=filename, dst=dst)
except Exception:
return
# now that pip has been installed from wheel force a reinstall to ensure it is the latest version
self._force_install_pip()
def _unzip_wheel(self, filename: Path, dst: str | Path) -> None:
"""Unzip the downloaded wheel file"""
if isinstance(dst, str):
dst = Path(dst)
try:
# app.zip.unzip(source=str(filename), target=str(dst))
self.unzip_file(zip_file=filename, dest_dir=dst)
if dst.exists():
app.debug(f"PIP wheel file has been unzipped to {dst}")
else:
app.error("PIP wheel file has not been unzipped")
raise Exception("PIP wheel file has not been unzipped")
except Exception as err:
app.error(f"Unable to unzip wheel file: {err}", exc_info=True)
raise
def _force_install_pip(self) -> None:
"""Now that pip has been installed, force reinstall it to ensure it is the latest version"""
cmd = [app.paths.python, "-m", "pip", "install", "--upgrade", "pip"]
app.popen(command=" ".join(cmd))
def unzip_file(self, zip_file: str | Path, dest_dir: str | Path = "") -> None:
"""
Unzip the given zip file to the specified destination directory.
Args:
zip_file (str | Path): The zip file to unzip.
dest_dir (str | Path, optional): The destination directory to unzip to.
Returns:
None:
"""
from zipfile import ZipFile
zip_file_path = Path(zip_file) if isinstance(zip_file, str) else zip_file
if not zip_file_path.is_file():
raise ValueError(f"Expected file, got '{zip_file_path}'")
if not zip_file_path.is_absolute():
zip_file_path = zip_file_path.absolute()
if not zip_file_path.exists():
raise FileNotFoundError(f"File '{zip_file_path}' not found")
if isinstance(dest_dir, str):
dest_dir = zip_file_path.parent if dest_dir == "" else Path(dest_dir)
else:
dest_dir = dest_dir.absolute()
if not dest_dir.is_dir():
raise ValueError(f"Expected folder, got '{dest_dir}'")
if not dest_dir.exists():
try:
dest_dir.mkdir(parents=True)
except Exception as e:
raise FileNotFoundError(
f"Folder '{dest_dir}' not found, unable to create folder."
) from e
if not dest_dir.exists():
raise FileNotFoundError(f"Folder '{dest_dir}' not found")
with change_dir(dest_dir):
with ZipFile(zip_file_path) as f:
f.extractall(dest_dir)
# with change_dir(dest_dir):
# shutil.unpack_archive(zip_file_path, dest_dir)
class FlatpakInstaller:
@ -53,7 +190,9 @@ class FlatpakInstaller:
def _install_wheel(self) -> bool:
result = False
installer = InstallPipFromWheel(pip_wheel_url=self._pip_url, lo_identifier=self._lo_identifier)
installer = InstallPipFromWheel(
pip_wheel_url=self._pip_url, lo_identifier=self._lo_identifier
)
try:
installer.install(self._site_packages)
if self._site_packages not in sys.path:
@ -64,12 +203,12 @@ class FlatpakInstaller:
return result
return result
def is_pip_installed(self) -> bool:
"""Check if PIP is installed."""
# cmd = self._cmd_pip("--version")
# cmd = '"{}" -m pip -V'.format(self.path_python)
cmd = [str(self.path_python), "-m", "pip", "-V"]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self._get_env())
result = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self._get_env()
)
return result.returncode == 0

View File

@ -1,141 +0,0 @@
from __future__ import annotations
import os
from typing import Any
import tempfile
from pathlib import Path
import easymacro as app
from contextlib import contextmanager
import uno
@contextmanager
def change_dir(directory):
"""
A context manager that changes the current working directory to the specified directory
temporarily and then changes it back when the context is exited.
"""
current_dir = os.getcwd()
os.chdir(directory)
try:
yield
finally:
os.chdir(current_dir)
class InstallPipFromWheel:
"""Download and install PIP from wheel url"""
def __init__(self, pip_wheel_url: str, lo_identifier: str) -> None:
self._pip_url = pip_wheel_url
self._lo_identifier = lo_identifier
def install(self, dst: str | Path = "") -> None:
"""
Install pip from wheel file.
Downloads the pip wheel file from the url provided in the config file and unzips it to the destination directory.
Args:
dst (str | Path, Optional): The destination directory where the pip wheel file will be installed. If not provided, the ``pythonpath`` location will be used.
Returns:
None:
Raises:
None:
"""
if not self._pip_url:
app.error("PIP installation has failed - No wheel url")
return
if not dst:
root_pth = Path(app.Paths.from_id(self._lo_identifier))
dst = root_pth / "pythonpath"
with tempfile.TemporaryDirectory() as temp_dir:
# temp_dir = tempfile.gettempdir()
path_pip = Path(temp_dir)
filename = path_pip / "pip-wheel.whl"
data, _, err = app.url_open(self._pip_url, verify=False)
if err:
app.error("Unable to download PIP installation wheel file")
return
filename.write_bytes(data)
if filename.exists():
app.info("PIP wheel file has been saved")
else:
app.error("PIP wheel file has not been saved")
return
try:
self._unzip_wheel(filename=filename, dst=dst)
except Exception:
return
# now that pip has been installed from wheel force a reinstall to ensure it is the latest version
self._force_install_pip()
def _unzip_wheel(self, filename: Path, dst: str | Path) -> None:
"""Unzip the downloaded wheel file"""
if isinstance(dst, str):
dst = Path(dst)
try:
# app.zip.unzip(source=str(filename), target=str(dst))
self.unzip_file(zip_file=filename, dest_dir=dst)
if dst.exists():
app.debug(f"PIP wheel file has been unzipped to {dst}")
else:
app.error("PIP wheel file has not been unzipped")
raise Exception("PIP wheel file has not been unzipped")
except Exception as err:
app.error(f"Unable to unzip wheel file: {err}", exc_info=True)
raise
def _force_install_pip(self) -> None:
"""Now that pip has been installed, force reinstall it to ensure it is the latest version"""
cmd = [app.paths.python, "-m", "pip", "install", "--upgrade", "pip"]
app.popen(command=" ".join(cmd))
def unzip_file(self, zip_file: str | Path, dest_dir: str | Path = "") -> None:
"""
Unzip the given zip file to the specified destination directory.
Args:
zip_file (str | Path): The zip file to unzip.
dest_dir (str | Path, optional): The destination directory to unzip to.
Returns:
None:
"""
from zipfile import ZipFile
zip_file_path = Path(zip_file) if isinstance(zip_file, str) else zip_file
if not zip_file_path.is_file():
raise ValueError(f"Expected file, got '{zip_file_path}'")
if not zip_file_path.is_absolute():
zip_file_path = zip_file_path.absolute()
if not zip_file_path.exists():
raise FileNotFoundError(f"File '{zip_file_path}' not found")
if isinstance(dest_dir, str):
dest_dir = zip_file_path.parent if dest_dir == "" else Path(dest_dir)
else:
dest_dir = dest_dir.absolute()
if not dest_dir.is_dir():
raise ValueError(f"Expected folder, got '{dest_dir}'")
if not dest_dir.exists():
try:
dest_dir.mkdir(parents=True)
except Exception as e:
raise FileNotFoundError(f"Folder '{dest_dir}' not found, unable to create folder.") from e
if not dest_dir.exists():
raise FileNotFoundError(f"Folder '{dest_dir}' not found")
with change_dir(dest_dir):
with ZipFile(zip_file_path) as f:
f.extractall(dest_dir)
# with change_dir(dest_dir):
# shutil.unpack_archive(zip_file_path, dest_dir)

View File

@ -120,7 +120,7 @@ class Controllers(object):
self.d.lbl_pip.value = label
self.d.cmd_install_pip.visible = False
self.d.cmd_admin_pip.visible = True
msg = _('PIP was installed sucessfully')
msg = _('PIP was installed successfully')
app.msgbox(msg)
else:
msg = _('PIP installation has failed, see log')
@ -131,9 +131,25 @@ class Controllers(object):
return
def _install_pip_flatpak(self) -> None:
from install_flatpak import FlatpakInstaller
installer = FlatpakInstaller(pip_wheel_url=URL_PIP_WHEEL, lo_identifier=ID_EXTENSION)
installer.install_pip()
try:
from install_flatpak import FlatpakInstaller
installer = FlatpakInstaller(pip_wheel_url=URL_PIP_WHEEL, lo_identifier=ID_EXTENSION)
installer.install_pip()
msg = _('PIP was installed successfully')
# app.msgbox(msg)
cmd = self._cmd_pip('-V')
label = app.run(cmd, True)
if label:
self.d.lbl_pip.value = label
self.d.cmd_install_pip.visible = False
self.d.cmd_admin_pip.visible = True
msg = _('PIP was installed successfully')
app.msgbox(msg)
else:
msg = _('PIP installation has failed, see log')
app.warning(msg)
except Exception as e:
app.errorbox(e)
@app.run_in_thread
def _install_pip(self):
@ -244,7 +260,10 @@ class Controllers(object):
self.d.lst_log.visible = True
line = ''
cmd = ' install --upgrade --user'
if self.is_flatpak:
cmd = f' install --upgrade --target={app.get_flatpak_site_packages_dir()}'
else:
cmd = ' install --upgrade --user'
if value:
name = value.split(' ')[0].strip()
cmd = self._cmd_pip(f'{cmd} {name}')