Merge pull request 'flatpak' (#13) from AmourSpirit/zaz-pip:flatpak into develop

Reviewed-on: #13
This commit is contained in:
Mauricio 2023-11-01 19:18:49 -06:00
commit c93c182953
6 changed files with 358 additions and 10 deletions

View File

@ -1,6 +1,8 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest:manifest xmlns:manifest="urn:oasis:names:tc:opendocument:xmlns:manifest:1.0" xmlns:loext="urn:org:documentfoundation:names:experimental:office:xmlns:loext:1.0" manifest:version="1.2">
<manifest:file-entry manifest:full-path="job.xcu" manifest:media-type="application/vnd.sun.star.configuration-data" />
<manifest:file-entry manifest:full-path="ZAZPip.py" manifest:media-type="application/vnd.sun.star.uno-component;type=Python"/>
<manifest:file-entry manifest:full-path="pip_runner.py" manifest:media-type="application/vnd.sun.star.uno-component;type=Python" />
<manifest:file-entry manifest:full-path="Office/Accelerators.xcu" manifest:media-type="application/vnd.sun.star.configuration-data"/>
<manifest:file-entry manifest:full-path="Addons.xcu" manifest:media-type="application/vnd.sun.star.configuration-data"/>
</manifest:manifest>

20
source/job.xcu Normal file
View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE oor:component-data SYSTEM "../../../../component-update.dtd">
<oor:component-data oor:name="Jobs" oor:package="org.openoffice.Office"
xmlns:oor="http://openoffice.org/2001/registry" xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<node oor:name="Jobs">
<node oor:name="PipRunnerJob" oor:op="fuse">
<prop oor:name="Service">
<value>net.elmau.zaz.pip.PipRunner</value>
</prop>
</node>
</node>
<node oor:name="Events">
<node oor:name="OnStartApp" oor:op="fuse">
<node oor:name="JobList">
<node oor:name="PipRunnerJob" oor:op="fuse" />
</node>
</node>
</node>
</oor:component-data>

56
source/pip_runner.py Normal file
View File

@ -0,0 +1,56 @@
# region Imports
from __future__ import unicode_literals, annotations
import os
import sys
from typing import TYPE_CHECKING, Tuple
from pathlib import Path
import uno
import unohelper
from com.sun.star.task import XJob
if TYPE_CHECKING:
# just for design time
from com.sun.star.beans import NamedValue
# endregion Imports
# region Constants
implementation_name = "net.elmau.zaz.pip.PipRunner"
implementation_services = ("com.sun.star.task.Job",)
# endregion Constants
# region XJob
class PipRunner(unohelper.Base, XJob):
def __init__(self, ctx):
self._is_flatpak = bool(os.getenv("FLATPAK_ID", ""))
if self._is_flatpak:
site_packages = self.get_flatpak_site_packages_dir()
ss = str(site_packages)
if site_packages.exists() and ss not in sys.path:
sys.path.append(ss)
def execute(self, *args: Tuple[NamedValue, ...]) -> None:
pass
def get_flatpak_site_packages_dir(self) -> Path:
# should never be all users
sand_box = os.getenv("FLATPAK_SANDBOX_DIR", "") or str(
Path.home() / ".var/app/org.libreoffice.LibreOffice/sandbox"
)
major_minor = f"{sys.version_info.major}.{sys.version_info.minor}"
site_packages = Path(sand_box) / f"lib/python{major_minor}/site-packages"
site_packages.mkdir(parents=True, exist_ok=True)
return site_packages
# endregion XJob
# region Implementation
g_TypeTable = {}
g_ImplementationHelper = unohelper.ImplementationHelper()
g_ImplementationHelper.addImplementation(PipRunner, implementation_name, implementation_services)
# endregion Implementation

View File

@ -167,6 +167,7 @@ OBJ_GRAPHIC = 'SwXTextGraphicObject'
OBJ_TEXTS = 'SwXTextRanges'
OBJ_TEXT = 'SwXTextRange'
IS_FLATPAK = bool(os.getenv("FLATPAK_ID", ""))
# ~ from com.sun.star.sheet.FilterOperator import EMPTY, NO_EMPTY, EQUAL, NOT_EQUAL
@ -595,13 +596,24 @@ def call_macro(args, in_thread=False):
result = _call_macro(args)
return result
def get_env():
"""
Gets Environment used for subprocess.
"""
my_env = os.environ.copy()
py_path = ""
p_sep = ";" if IS_WIN else ":"
for d in sys.path:
py_path = py_path + d + p_sep
my_env["PYTHONPATH"] = py_path
return my_env
def run(command, capture=False, split=True):
if not split:
return subprocess.check_output(command, shell=True).decode()
return subprocess.check_output(command, shell=True, env=get_env()).decode()
cmd = shlex.split(command)
result = subprocess.run(cmd, capture_output=capture, text=True, shell=IS_WIN)
result = subprocess.run(cmd, capture_output=capture, text=True, shell=IS_WIN, env=get_env())
if capture:
result = result.stdout
else:
@ -612,7 +624,7 @@ def run(command, capture=False, split=True):
def popen(command):
try:
proc = subprocess.Popen(shlex.split(command), shell=IS_WIN,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=get_env())
for line in proc.stdout:
yield line.decode().rstrip()
except Exception as e:
@ -624,6 +636,15 @@ def sleep(seconds):
time.sleep(seconds)
return
def get_flatpak_site_packages_dir() -> str:
# should never be all users
sand_box = os.getenv("FLATPAK_SANDBOX_DIR", "") or str(
Path.home() / ".var/app/org.libreoffice.LibreOffice/sandbox"
)
major_minor = f"{sys.version_info.major}.{sys.version_info.minor}"
site_packages = Path(sand_box) / f"lib/python{major_minor}/site-packages"
site_packages.mkdir(parents=True, exist_ok=True)
return str(site_packages)
class TimerThread(threading.Thread):
@ -4146,6 +4167,7 @@ class EventsFocus(EventsListenerBase, XFocusListener):
if service in self.CONTROLS:
obj = event.Source.Model
obj.BackgroundColor = COLOR_ON_FOCUS
obj.TextColor = TEXT_COLOR_ON_FOCUS
return
def focusLost(self, event):
@ -6310,7 +6332,7 @@ class Paths(object):
if IS_WIN:
os.startfile(path)
else:
pid = subprocess.Popen(['xdg-open', path]).pid
pid = subprocess.Popen(['xdg-open', path, ], env=get_env()).pid
return
@classmethod
@ -6843,6 +6865,7 @@ def get_color(value):
COLOR_ON_FOCUS = get_color('LightYellow')
TEXT_COLOR_ON_FOCUS = get_color('black')
class LOServer(object):
@ -6894,7 +6917,7 @@ class LOServer(object):
for i in range(3):
self._server = subprocess.Popen(self.CMD,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=get_env())
time.sleep(3)
if self.is_running:
break

View File

@ -0,0 +1,214 @@
from __future__ import annotations
from typing import cast
import os
import sys
import tempfile
import subprocess
from pathlib import Path
from typing import Any, List, Dict
import easymacro as app
from contextlib import contextmanager
import uno
@contextmanager
def change_dir(directory):
"""
A context manager that changes the current working directory to the specified directory
temporarily and then changes it back when the context is exited.
"""
current_dir = os.getcwd()
os.chdir(directory)
try:
yield
finally:
os.chdir(current_dir)
class InstallPipFromWheel:
"""Download and install PIP from wheel url"""
def __init__(self, pip_wheel_url: str, lo_identifier: str) -> None:
self._pip_url = pip_wheel_url
self._lo_identifier = lo_identifier
def install(self, dst: str | Path = "") -> None:
"""
Install pip from wheel file.
Downloads the pip wheel file from the url provided in the config file and unzips it to the destination directory.
Args:
dst (str | Path, Optional): The destination directory where the pip wheel file will be installed. If not provided, the ``pythonpath`` location will be used.
Returns:
None:
Raises:
None:
"""
if not self._pip_url:
app.error("PIP installation has failed - No wheel url")
return
if not dst:
root_pth = Path(app.Paths.from_id(self._lo_identifier))
dst = root_pth / "pythonpath"
with tempfile.TemporaryDirectory() as temp_dir:
# temp_dir = tempfile.gettempdir()
path_pip = Path(temp_dir)
filename = path_pip / "pip-wheel.whl"
data, _, err = app.url_open(self._pip_url, verify=False)
if err:
app.error("Unable to download PIP installation wheel file")
return
filename.write_bytes(data)
if filename.exists():
app.info("PIP wheel file has been saved")
else:
app.error("PIP wheel file has not been saved")
return
try:
self._unzip_wheel(filename=filename, dst=dst)
except Exception:
return
# now that pip has been installed from wheel force a reinstall to ensure it is the latest version
self._force_install_pip()
def _unzip_wheel(self, filename: Path, dst: str | Path) -> None:
"""Unzip the downloaded wheel file"""
if isinstance(dst, str):
dst = Path(dst)
try:
# app.zip.unzip(source=str(filename), target=str(dst))
self.unzip_file(zip_file=filename, dest_dir=dst)
if dst.exists():
app.debug(f"PIP wheel file has been unzipped to {dst}")
else:
app.error("PIP wheel file has not been unzipped")
raise Exception("PIP wheel file has not been unzipped")
except Exception as err:
app.error(f"Unable to unzip wheel file: {err}", exc_info=True)
raise
def _force_install_pip(self) -> None:
"""Now that pip has been installed, force reinstall it to ensure it is the latest version"""
cmd = [app.paths.python, "-m", "pip", "install", "--upgrade", "pip"]
app.popen(command=" ".join(cmd))
def unzip_file(self, zip_file: str | Path, dest_dir: str | Path = "") -> None:
"""
Unzip the given zip file to the specified destination directory.
Args:
zip_file (str | Path): The zip file to unzip.
dest_dir (str | Path, optional): The destination directory to unzip to.
Returns:
None:
"""
from zipfile import ZipFile
zip_file_path = Path(zip_file) if isinstance(zip_file, str) else zip_file
if not zip_file_path.is_file():
raise ValueError(f"Expected file, got '{zip_file_path}'")
if not zip_file_path.is_absolute():
zip_file_path = zip_file_path.absolute()
if not zip_file_path.exists():
raise FileNotFoundError(f"File '{zip_file_path}' not found")
if isinstance(dest_dir, str):
dest_dir = zip_file_path.parent if dest_dir == "" else Path(dest_dir)
else:
dest_dir = dest_dir.absolute()
if not dest_dir.is_dir():
raise ValueError(f"Expected folder, got '{dest_dir}'")
if not dest_dir.exists():
try:
dest_dir.mkdir(parents=True)
except Exception as e:
raise FileNotFoundError(
f"Folder '{dest_dir}' not found, unable to create folder."
) from e
if not dest_dir.exists():
raise FileNotFoundError(f"Folder '{dest_dir}' not found")
with change_dir(dest_dir):
with ZipFile(zip_file_path) as f:
f.extractall(dest_dir)
# with change_dir(dest_dir):
# shutil.unpack_archive(zip_file_path, dest_dir)
class FlatpakInstaller:
"""class for the PIP install."""
def __init__(self, pip_wheel_url: str, lo_identifier: str) -> None:
self.path_python = app.Paths.python
app.debug(f"Python path: {self.path_python}")
self._pip_url = pip_wheel_url
self._lo_identifier = lo_identifier
self._site_packages = cast(str, app.get_flatpak_site_packages_dir())
def install_pip(self) -> None:
if self.is_pip_installed():
app.info("PIP is already installed")
return
if self._install_wheel():
if self.is_pip_installed():
app.info("PIP was installed successfully")
else:
app.error("PIP installation has failed")
return
def _get_pip_cmd(self, filename: Path) -> List[str]:
return [str(self.path_python), f"{filename}", "--user"]
def _get_env(self) -> Dict[str, str]:
"""
Gets Environment used for subprocess.
"""
my_env = os.environ.copy()
py_path = ""
p_sep = ";" if app.IS_WIN else ":"
for d in sys.path:
py_path = py_path + d + p_sep
my_env["PYTHONPATH"] = py_path
return my_env
def _cmd_pip(self, *args: str) -> List[str]:
cmd: List[str] = [str(self.path_python), "-m", "pip", *args]
return cmd
def _install_wheel(self) -> bool:
result = False
installer = InstallPipFromWheel(
pip_wheel_url=self._pip_url, lo_identifier=self._lo_identifier
)
try:
installer.install(self._site_packages)
if self._site_packages not in sys.path:
sys.path.append(self._site_packages)
result = True
except Exception as err:
app.error(err)
return result
return result
def is_pip_installed(self) -> bool:
"""Check if PIP is installed."""
# cmd = self._cmd_pip("--version")
# cmd = '"{}" -m pip -V'.format(self.path_python)
cmd = [str(self.path_python), "-m", "pip", "-V"]
result = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self._get_env()
)
return result.returncode == 0

View File

@ -10,6 +10,7 @@ _ = None
TITLE = 'ZAZ-PIP'
URL_PIP = 'https://bootstrap.pypa.io/get-pip.py'
URL_PIP_WHEEL = 'https://files.pythonhosted.org/packages/47/6a/453160888fab7c6a432a6e25f8afe6256d0d9f2cbd25971021da6491d899/pip-23.3.1-py3-none-any.whl'
URL_TEST = 'http://duckduckgo.com'
PIP = 'pip'
URL_GIT = 'https://git.cuates.net/elmau/zaz-pip'
@ -66,6 +67,7 @@ class Controllers(object):
def __init__(self, dialog):
self.d = dialog
self.path_python = app.paths.python
self.is_flatpak = app.IS_FLATPAK
def _set_state(self, state):
self._states = {
@ -85,8 +87,7 @@ class Controllers(object):
self._install_pip()
return
@app.run_in_thread
def _install_pip(self):
def _install_pip_normal(self) -> None:
self.d.link_proyect.visible = False
self.d.lst_log.visible = True
path_pip = app.paths.tmp()
@ -119,7 +120,7 @@ class Controllers(object):
self.d.lbl_pip.value = label
self.d.cmd_install_pip.visible = False
self.d.cmd_admin_pip.visible = True
msg = _('PIP was installed sucessfully')
msg = _('PIP was installed successfully')
app.msgbox(msg)
else:
msg = _('PIP installation has failed, see log')
@ -129,6 +130,35 @@ class Controllers(object):
return
def _install_pip_flatpak(self) -> None:
try:
from install_flatpak import FlatpakInstaller
installer = FlatpakInstaller(pip_wheel_url=URL_PIP_WHEEL, lo_identifier=ID_EXTENSION)
installer.install_pip()
msg = _('PIP was installed successfully')
# app.msgbox(msg)
cmd = self._cmd_pip('-V')
label = app.run(cmd, True)
if label:
self.d.lbl_pip.value = label
self.d.cmd_install_pip.visible = False
self.d.cmd_admin_pip.visible = True
msg = _('PIP was installed successfully')
app.msgbox(msg)
else:
msg = _('PIP installation has failed, see log')
app.warning(msg)
except Exception as e:
app.errorbox(e)
@app.run_in_thread
def _install_pip(self):
if app.IS_FLATPAK:
self._install_pip_flatpak()
else:
self._install_pip_normal()
return
def _cmd_pip(self, args):
cmd = '"{}" -m pip {}'.format(self.path_python, args)
return cmd
@ -230,7 +260,10 @@ class Controllers(object):
self.d.lst_log.visible = True
line = ''
cmd = ' install --upgrade --user'
if self.is_flatpak:
cmd = f' install --upgrade --target={app.get_flatpak_site_packages_dir()}'
else:
cmd = ' install --upgrade --user'
if value:
name = value.split(' ')[0].strip()
cmd = self._cmd_pip(f'{cmd} {name}')