#!/usr/bin/env python3 import csv import datetime import hashlib import json import os import re import shlex import shutil import subprocess import sys import tempfile import threading import time from functools import wraps from pathlib import Path from pprint import pprint from string import Template from typing import Any, Union import mailbox import smtplib from smtplib import SMTPException, SMTPAuthenticationError from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email.mime.text import MIMEText from email.utils import formatdate from email import encoders import uno from com.sun.star.awt import MessageBoxButtons from com.sun.star.awt.MessageBoxResults import YES from com.sun.star.beans import PropertyValue from com.sun.star.beans.PropertyConcept import ALL from com.sun.star.ui.dialogs import TemplateDescription from .easymain import ( CTX, DATE_OFFSET, IS_MAC, IS_WIN, LANG, TITLE, log, create_instance ) from .easyuno import MessageBoxType from .easydocs import LODocuments from .messages import MESSAGES from .zazplus import mureq __all__ = [ 'Color', 'Config', 'Dates', 'Email', 'Hash', 'LOInspect', 'Macro', 'Paths', 'URL', 'Shell', 'Timer', 'catch_exception', 'debug', 'error', 'info', 'mri', 'msgbox', 'render', 'run_in_thread', 'save_log', 'sleep', ] TIMEOUT = 10 PYTHON = 'python' if IS_WIN: PYTHON = 'python.exe' FILES = { 'CONFIG': 'zaz-{}.json', } _EVENTS = {} def _(msg): if LANG == 'en': return msg if not LANG in MESSAGES: return msg return MESSAGES[LANG][msg] def debug(*messages) -> None: """Show messages debug :param messages: List of messages to debug :type messages: list[Any] """ data = [str(m) for m in messages] log.debug('\t'.join(data)) return def error(message: Any) -> None: """Show message error :param message: The message error :type message: Any """ log.error(message) return def info(*messages) -> None: """Show messages info :param messages: List of messages to debug :type messages: list[Any] """ data = [str(m) for m in messages] log.info('\t'.join(data)) return def save_log(path: str, data: Any) -> bool: """Save data in file, data append to end and automatic add current time. :param path: Path to save log :type path: str :param data: Data to save in file log :type data: Any """ result = True try: with open(path, 'a') as f: f.write(f'{str(Dates.now())} - ') pprint(data, stream=f) except Exception as e: error(e) result = False return result def mri(obj: Any) -> None: """Inspect object with MRI Extension :param obj: Any pyUno object :type obj: Any `See MRI `_ """ m = create_instance('mytools.Mri') if m is None: msg = 'Extension MRI not found' error(msg) return if hasattr(obj, 'obj'): obj = obj.obj m.inspect(obj) return def catch_exception(f): """Catch exception for any function :param f: Any Python function :type f: Function instance """ @wraps(f) def func(*args, **kwargs): try: return f(*args, **kwargs) except Exception as e: name = f.__name__ if IS_WIN: msgbox(traceback.format_exc()) log.error(name, exc_info=True) return func def set_app_config(node_name: str, key: str, new_value: Any) -> Any: """Update value for key in node name. :param node_name: Name of node :type name: str :param key: Name of key :type key: str :return: True if update sucesfully :rtype: bool `See Api ConfigurationUpdateAccess `_ """ result = True current_value = '' name = 'com.sun.star.configuration.ConfigurationProvider' service = 'com.sun.star.configuration.ConfigurationUpdateAccess' cp = create_instance(name, True) node = PropertyValue(Name='nodepath', Value=node_name) update = cp.createInstanceWithArguments(service, (node,)) try: current_value = update.getPropertyValue(key) update.setPropertyValue(key, new_value) update.commitChanges() except Exception as e: error(e) if update.hasByName(key) and current_value: update.setPropertyValue(key, current_value) update.commitChanges() result = False return result def msgbox(message: Any, title: str=TITLE, buttons=MessageBoxButtons.BUTTONS_OK, \ type_message_box=MessageBoxType.INFOBOX) -> int: """Create message box :param message: Any type message, all is converted to string. :type message: Any :param title: The title for message box :type title: str :param buttons: A combination of `com::sun::star::awt::MessageBoxButtons `_ :type buttons: long :param type_message_box: The `message box type `_ :type type_message_box: enum :return: `MessageBoxResult `_ :rtype: int `See Api XMessageBoxFactory `_ """ toolkit = create_instance('com.sun.star.awt.Toolkit') parent = toolkit.getDesktopWindow() box = toolkit.createMessageBox(parent, type_message_box, buttons, title, str(message)) return box.execute() def question(message: str, title: str=TITLE) -> bool: """Create message box question, show buttons YES and NO :param message: Message question :type message: str :param title: The title for message box :type title: str :return: True if user click YES and False if click NO :rtype: bool """ buttons = MessageBoxButtons.BUTTONS_YES_NO result = msgbox(message, title, buttons, MessageBoxType.QUERYBOX) return result == YES def warning(message: Any, title: str=TITLE) -> int: """Create message box with icon warning :param message: Any type message, all is converted to string. :type message: Any :param title: The title for message box :type title: str :return: MessageBoxResult :rtype: int """ return msgbox(message, title, type_message_box=MessageBoxType.WARNINGBOX) def errorbox(message: Any, title: str=TITLE) -> int: """Create message box with icon error :param message: Any type message, all is converted to string. :type message: Any :param title: The title for message box :type title: str :return: MessageBoxResult :rtype: int """ return msgbox(message, title, type_message_box=MessageBoxType.ERRORBOX) def sleep(seconds: int): """Sleep """ time.sleep(seconds) return def render(template, data): s = Template(template) return s.safe_substitute(**data) def run_in_thread(fn: Any) -> Any: """Run any function in thread :param fn: Any Python function (macro) :type fn: Function instance """ def run(*k, **kw): t = threading.Thread(target=fn, args=k, kwargs=kw) t.start() return t return run def set_properties(model, properties): if 'X' in properties: properties['PositionX'] = properties.pop('X') if 'Y' in properties: properties['PositionY'] = properties.pop('Y') keys = tuple(properties.keys()) values = tuple(properties.values()) model.setPropertyValues(keys, values) return # ~ https://github.com/django/django/blob/main/django/utils/functional.py#L61 class classproperty: def __init__(self, method=None): self.fget = method def __get__(self, instance, cls=None): return self.fget(cls) def getter(self, method): self.fget = method return self class Services(): @classproperty def toolkit(cls): instance = create_instance('com.sun.star.awt.Toolkit', True) return instance class LOInspect(): """Class inspect Inspired by `MRI `_ """ TYPE_CLASSES = { 'INTERFACE': '-Interface-', 'SEQUENCE': '-Sequence-', 'STRUCT': '-Struct-', } def __init__(self, obj: Any, to_doc: bool=False): """Introspection objects pyUno :param obj: Object to inspect :type obj: Any pyUno :param to_doc: If show info in new doc Calc :type to_doc: bool """ self._obj = obj if hasattr(obj, 'obj'): self._obj = obj.obj self._properties = () self._methods = () self._interfaces = () self._services = () self._listeners = () introspection = create_instance('com.sun.star.beans.Introspection') result = introspection.inspect(self._obj) if result: self._properties = self._get_properties(result) self._methods = self._get_methods(result) self._interfaces = self._get_interfaces(result) self._services = self._get_services(self._obj) self._listeners = self._get_listeners(result) self._to_doc(to_doc) def _to_doc(self, to_doc: bool): if not to_doc: return doc = LODocuments().new() sheet = doc[0] sheet.name = 'Properties' sheet['A1'].data = self.properties sheet = doc.insert('Methods') sheet['A1'].data = self.methods sheet = doc.insert('Interfaces') sheet['A1'].data = self.interfaces sheet = doc.insert('Services') sheet['A1'].data = self.services sheet = doc.insert('Listeners') sheet['A1'].data = self.listeners return def _get_value(self, p: Any): type_class = p.Type.typeClass.value if type_class in self.TYPE_CLASSES: return self.TYPE_CLASSES[type_class] value = '' try: value = getattr(self._obj, p.Name) if type_class == 'ENUM' and value: value = value.value elif type_class == 'TYPE': value = value.typeName elif value is None: value = '-void-' except: value = '-error-' return str(value) def _get_attributes(self, a: Any): PA = {1 : 'Maybe Void', 16 : 'Read Only'} attr = ', '.join([PA.get(k, '') for k in PA.keys() if a & k]) return attr def _get_property(self, p: Any): name = p.Name tipo = p.Type.typeName value = self._get_value(p) attr = self._get_attributes(p.Attributes) return name, tipo, value, attr def _get_properties(self, result: Any): properties = result.getProperties(ALL) data = [('Name', 'Type', 'Value', 'Attributes')] data += [self._get_property(p) for p in properties] return data def _get_arguments(self, m: Any): arguments = '( {} )'.format(', '.join( [' '.join(( f'[{p.aMode.value.lower()}]', p.aName, p.aType.Name)) for p in m.ParameterInfos] )) return arguments def _get_method(self, m: Any): name = m.Name arguments = self._get_arguments(m) return_type = m.ReturnType.Name class_name = m.DeclaringClass.Name return name, arguments, return_type, class_name def _get_methods(self, result: Any): methods = result.getMethods(ALL) data = [('Name', 'Arguments', 'Return Type', 'Class')] data += [self._get_method(m) for m in methods] return data def _get_interfaces(self, result: Any): methods = result.getMethods(ALL) interfaces = {m.DeclaringClass.Name for m in methods} return tuple(zip(interfaces)) def _get_services(self, obj: Any): try: data = [str(s) for s in obj.getSupportedServiceNames()] data = tuple(zip(data)) except: data = () return data def _get_listeners(self, result: Any): data = [l.typeName for l in result.getSupportedListeners()] return tuple(zip(data)) @property def properties(self): return self._properties @property def methods(self): return self._methods @property def interfaces(self): return self._interfaces @property def services(self): return self._services @property def listeners(self): return self._listeners class Dates(object): """Class for datetimes """ _start = None @classproperty def now(cls): """Current local date and time :return: Return the current local date and time, remove microseconds. :rtype: datetime """ return datetime.datetime.now().replace(microsecond=0) @classproperty def now_time(cls): """Current local time :return: Return the current local time :rtype: time """ return cls.now.time() @classproperty def today(cls): """Current local date :return: Return the current local date :rtype: date """ return datetime.date.today() @classproperty def epoch(cls): """Get unix time :return: Return unix time :rtype: int `See Unix Time `_ """ n = cls.now e = int(time.mktime(n.timetuple())) return e @classmethod def date(cls, year: int, month: int, day: int): """Get date from year, month, day :param year: Year of date :type year: int :param month: Month of date :type month: int :param day: Day of day :type day: int :return: Return the date :rtype: date `See Python date `_ """ d = datetime.date(year, month, day) return d @classmethod def time(cls, hours: int, minutes: int, seconds: int): """Get time from hour, minutes, seconds :param hours: Hours of time :type hours: int :param minutes: Minutes of time :type minutes: int :param seconds: Seconds of time :type seconds: int :return: Return the time :rtype: datetime.time """ t = datetime.time(hours, minutes, seconds) return t @classmethod def datetime(cls, year: int, month: int, day: int, hours: int, minutes: int, seconds: int): """Get datetime from year, month, day, hours, minutes and seconds :param year: Year of date :type year: int :param month: Month of date :type month: int :param day: Day of day :type day: int :param hours: Hours of time :type hours: int :param minutes: Minutes of time :type minutes: int :param seconds: Seconds of time :type seconds: int :return: Return datetime :rtype: datetime """ dt = datetime.datetime(year, month, day, hours, minutes, seconds) return dt @classmethod def str_to_date(cls, str_date: str, template: str, to_calc: bool=False): """Get date from string :param str_date: Date in string :type str_date: str :param template: Formato of date string :type template: str :param to_calc: If date is for used in Calc cell :type to_calc: bool :return: Return date or int if used in Calc :rtype: date or int `See Python strptime `_ """ d = datetime.datetime.strptime(str_date, template).date() if to_calc: d = d.toordinal() - DATE_OFFSET return d @classmethod def calc_to_date(cls, value: float): """Get date from calc value :param value: Float value from cell :type value: float :return: Return the current local date :rtype: date `See Python fromordinal `_ """ d = datetime.date.fromordinal(int(value) + DATE_OFFSET) return d @classmethod def start(cls): """Start counter """ cls._start = cls.now info('Start: ', cls._start) return @classmethod def end(cls, get_seconds: bool=True): """End counter :param get_seconds: If return value in total seconds :type get_seconds: bool :return: Return the timedelta or total seconds :rtype: timedelta or int """ e = cls.now td = e - cls._start result = str(td) if get_seconds: result = td.total_seconds() info('End: ', e) return result class Paths(object): """Class for paths """ FILE_PICKER = 'com.sun.star.ui.dialogs.FilePicker' FOLDER_PICKER = 'com.sun.star.ui.dialogs.FolderPicker' REMOTE_FILE_PICKER = 'com.sun.star.ui.dialogs.RemoteFilePicker' OFFICE_FILE_PICKER = 'com.sun.star.ui.dialogs.OfficeFilePicker' def __init__(self, path=''): if path.startswith('file://'): path = str(Path(uno.fileUrlToSystemPath(path)).resolve()) self._path = Path(path) @property def path(self): """Get base path""" return str(self._path.parent) @property def file_name(self): """Get file name""" return self._path.name @property def name(self): """Get name""" return self._path.stem @property def ext(self): """Get extension""" return self._path.suffix[1:] @property def size(self): """Get size""" return self._path.stat().st_size @property def url(self): """Get like URL""" return self._path.as_uri() @property def info(self): """Get all info like tuple""" i = (self.path, self.file_name, self.name, self.ext, self.size, self.url) return i @property def dict(self): """Get all info like dict""" data = { 'path': self.path, 'file_name': self.file_name, 'name': self.name, 'ext': self.ext, 'size': self.size, 'url': self.url, } return data @classproperty def home(self): """Get user home""" return str(Path.home()) @classproperty def documents(self): """Get user save documents""" return self.config() @classproperty def user_profile(self): """Get path user profile""" path = self.config('UserConfig') path = str(Path(path).parent) return path @classproperty def user_config(self): """Get path config in user profile""" path = self.config('UserConfig') return path @classproperty def python(self): """Get path executable python""" if IS_WIN: path = self.join(self.config('Module'), PYTHON) elif IS_MAC: path = self.join(self.config('Module'), '..', 'Resources', PYTHON) else: path = sys.executable return path @classmethod def to_url(cls, path: str) -> str: """Convert paths in format system to URL :param path: Path to convert :type path: str :return: Path in URL :rtype: str """ if not path.startswith('file://'): path = Path(path).as_uri() return path @classmethod def to_system(cls, path:str) -> str: """Convert paths in URL to system :param path: Path to convert :type path: str :return: Path system format :rtype: str """ if path.startswith('file://'): path = str(Path(uno.fileUrlToSystemPath(path)).resolve()) return path @classmethod def config(cls, name: str='Work') -> Union[str, list]: """Return path from config :param name: Name in service PathSettings, default get path documents :type name: str :return: Path in config, if exists. :rtype: str or list `See Api XPathSettings `_ """ path = create_instance('com.sun.star.util.PathSettings') path = cls.to_system(getattr(path, name)).split(';') if len(path) == 1: path = path[0] return path @classmethod def join(cls, *paths: str) -> str: """Join paths :param paths: Paths to join :type paths: list :return: New path with joins :rtype: str """ path = str(Path(paths[0]).joinpath(*paths[1:])) return path @classmethod def exists(cls, path: str) -> bool: """If exists path :param path: Path for validate :type path: str :return: True if path exists, False if not. :rtype: bool """ path = cls.to_system(path) result = Path(path).exists() return result @classmethod def exists_app(cls, name_app: str) -> bool: """If exists app in system :param name_app: Name of application :type name_app: str :return: True if app exists, False if not. :rtype: bool """ result = bool(shutil.which(name_app)) return result @classmethod def is_dir(cls, path: str): """Validate if path is directory :param path: Path for validate :type path: str :return: True if path is directory, False if not. :rtype: bool """ return Path(path).is_dir() @classmethod def is_file(cls, path: str): """Validate if path is a file :param path: Path for validate :type path: str :return: True if path is a file, False if not. :rtype: bool """ return Path(path).is_file() @classmethod def temp_file(self): """Make temporary file""" return tempfile.NamedTemporaryFile(mode='w') @classmethod def temp_dir(self): """Make temporary directory""" return tempfile.TemporaryDirectory(ignore_cleanup_errors=True) @classmethod def get(cls, init_dir: str='', filters: str='', multiple: bool=False) -> Union[str, list]: """Get path for open :param init_dir: Initial default path :type init_dir: str :param filters: Filter for show type files: 'xml' or 'txt,xml' :type filters: str :param multiple: If user can selected multiple files :type multiple: bool :return: Selected path or paths :rtype: str or list `See API `_ """ if not init_dir: init_dir = cls.documents init_dir = cls.to_url(init_dir) file_picker = create_instance(cls.FILE_PICKER) file_picker.setTitle(_('Select path')) file_picker.setDisplayDirectory(init_dir) file_picker.initialize((TemplateDescription.FILEOPEN_SIMPLE,)) file_picker.setMultiSelectionMode(multiple) if filters: for f in filters.split(','): file_picker.appendFilter(f.upper(), f'*.{f.lower()}') if file_picker.execute(): paths = [cls.to_system(p) for p in file_picker.getSelectedFiles()] if not multiple: paths = paths[0] return paths @classmethod def get_dir(cls, init_dir: str='') -> str: """Get path dir :param init_dir: Initial default path :type init_dir: str :return: Selected path :rtype: str """ folder_picker = create_instance(cls.FOLDER_PICKER) if not init_dir: init_dir = cls.documents init_dir = cls.to_url(init_dir) folder_picker.setTitle(_('Select directory')) folder_picker.setDisplayDirectory(init_dir) path = '' if folder_picker.execute(): path = cls.to_system(folder_picker.getDirectory()) return path @classmethod def get_for_save(cls, init_dir: str='', filters: str=''): """Get path for save :param init_dir: Initial default path :type init_dir: str :param filters: Filter for show type files: 'xml' or 'txt,xml' :type filters: str :return: Selected path :rtype: str """ if not init_dir: init_dir = cls.documents init_dir = cls.to_url(init_dir) file_picker = create_instance(cls.FILE_PICKER) file_picker.setTitle(_('Select file')) file_picker.setDisplayDirectory(init_dir) file_picker.initialize((TemplateDescription.FILESAVE_SIMPLE,)) if filters: for f in filters.split(','): file_picker.appendFilter(f.upper(), f'*.{f.lower()}') path = '' if file_picker.execute(): files = file_picker.getSelectedFiles() path = [cls.to_system(f) for f in files][0] return path @classmethod def files(cls, path: str, pattern: str='*'): """Get all files in path :param path: Path with files :type path: str :param pattern: For filter files, default get all. :type pattern: str :return: Files in path :rtype: list """ files = [str(p) for p in Path(path).glob(pattern) if p.is_file()] return files @classmethod def walk(cls, path, filters=''): """Get all files in path recursively :param path: Path with files :type path: str :param filters: For filter files, default get all. :type filters: str :return: Files in path :rtype: list """ paths = [] for folder, _, files in os.walk(path): if filters: pattern = re.compile(r'\.(?:{})$'.format(filters), re.IGNORECASE) paths += [cls.join(folder, f) for f in files if pattern.search(f)] else: paths += [cls.join(folder, f) for f in files] return paths @classmethod def dirs(cls, path): """Get directories in path :param path: Path to scan :type path: str :return: Directories in path :rtype: list """ dirs = [str(p) for p in Path(path).iterdir() if p.is_dir()] return dirs @classmethod def walk_dirs(cls, path, tree=False): """Get directories recursively :param path: Path to scan :type path: str :param tree: get info in a tuple (ID_FOLDER, ID_PARENT, NAME) :type tree: bool :return: Directories in path :rtype: list """ folders = [] if tree: i = 0 parents = {path: 0} for root, dirs, _ in os.walk(path): for name in dirs: i += 1 rn = cls.join(root, name) if not rn in parents: parents[rn] = i folders.append((i, parents[root], name)) else: for root, dirs, _ in os.walk(path): folders += [cls.join(root, name) for name in dirs] return folders @classmethod def extension(cls, id_ext: str): """Get path extension install from id :param id_ext: ID extension :type id_ext: str :return: Path extension :rtype: str """ pip = CTX.getValueByName('/singletons/com.sun.star.deployment.PackageInformationProvider') path = Paths.to_system(pip.getPackageLocation(id_ext)) return path @classmethod def replace_ext(cls, path: str, new_ext: str): """Replace extension in file path :param path: Path to file :type path: str :param new_ext: New extension :type new_ext: str :return: Path with new extension :rtype: str """ p = Paths(path) name = f'{p.name}.{new_ext}' path = cls.join(p.path, name) return path @classmethod def open(cls, path: str): """Open any file with default program in system :param path: Path to file :type path: str :return: PID file, only Linux :rtype: int """ pid = 0 if IS_WIN: os.startfile(path) else: pid = subprocess.Popen(['xdg-open', path]).pid return pid # ~ Save/read data @classmethod def save(cls, path: str, data: str, encoding: str='utf-8') -> bool: """Save data in path with encoding :param path: Path to file save :type path: str :param data: Data to save :type data: str :param encoding: Encoding for save data, default utf-8 :type encoding: str :return: True, if save corrrectly :rtype: bool """ result = bool(Path(path).write_text(data, encoding=encoding)) return result @classmethod def save_bin(cls, path: str, data: bytes) -> bool: """Save binary data in path :param path: Path to file save :type path: str :param data: Data to save :type data: bytes :return: True, if save corrrectly :rtype: bool """ result = bool(Path(path).write_bytes(data)) return result @classmethod def read(cls, path: str, get_lines: bool=False, encoding: str='utf-8') -> Union[str, list]: """Read data in path :param path: Path to file read :type path: str :param get_lines: If read file line by line :type get_lines: bool :return: File content :rtype: str or list """ if get_lines: with Path(path).open(encoding=encoding) as f: data = f.readlines() else: data = Path(path).read_text(encoding=encoding) return data @classmethod def read_bin(cls, path: str) -> bytes: """Read binary data in path :param path: Path to file read :type path: str :return: File content :rtype: bytes """ data = Path(path).read_bytes() return data # ~ Import/export data @classmethod def save_json(cls, path: str, data: str): """Save data in path file like json :param path: Path to file :type path: str :return: True if save correctly :rtype: bool """ data = json.dumps(data, indent=4, ensure_ascii=False, sort_keys=True) return cls.save(path, data) @classmethod def read_json(cls, path: str) -> Any: """Read path file and load json data :param path: Path to file :type path: str :return: Any data :rtype: Any """ data = json.loads(cls.read(path)) return data @classmethod def save_csv(cls, path: str, data: Any, args: dict={}): """Write CSV :param path: Path to file write csv :type path: str :param data: Data to write :type data: Iterable :param args: Any argument support for Python library :type args: dict `See CSV Writer `_ """ with open(path, 'w') as f: writer = csv.writer(f, **args) writer.writerows(data) return @classmethod def read_csv(cls, path: str, args: dict={}) -> list: """Read CSV :param path: Path to file csv :type path: str :param args: Any argument support for Python library :type args: dict :return: Data csv like tuple :rtype: tuple `See CSV Reader `_ """ with open(path) as f: rows = list(csv.reader(f, **args)) return rows @classmethod def kill(cls, path: str) -> bool: """Delete path :param path: Path to file or directory :type path: str :return: True if delete correctly :rtype: bool """ result = False p = Path(path) try: if p.is_file(): p.unlink() result = True elif p.is_dir(): shutil.rmtree(path) result = True except OSError as e: log.error(e) return result @classmethod def copy(cls, source: str, target: str='', name: str='') -> str: """Copy files :param source: Path source :type source: str :param target: Path target :type target: str :param name: New name in target :type name: str :return: Path target :rtype: str """ p, f, n, e, _, _ = Paths(source).info if target: p = target e = f'.{e}' if name: e = '' n = name path_new = cls.join(p, f'{n}{e}') shutil.copy(source, path_new) return path_new @classmethod def zip(cls, source: Union[str, tuple, list], target: str='') -> str: path_zip = target if not isinstance(source, (tuple, list)): path, _, name, _ = _P(source).info start = len(path) + 1 if not target: path_zip = f'{path}/{name}.zip' if isinstance(source, (tuple, list)): files = [(f, f[len(_P(f).path)+1:]) for f in source] elif _P.is_file(source): files = ((source, source[start:]),) else: files = [(f, f[start:]) for f in _P.walk(source)] compression = zipfile.ZIP_DEFLATED with zipfile.ZipFile(path_zip, 'w', compression=compression) as z: for f in files: z.write(f[0], f[1]) return path_zip @classmethod def unzip(cls, source: str, target: str='', members=None, pwd=None): path = target if not target: path = _P(source).path with zipfile.ZipFile(source) as z: if not pwd is None: pwd = pwd.encode() if isinstance(members, str): members = (members,) z.extractall(path, members=members, pwd=pwd) return @classmethod def zip_content(cls, path: str): with zipfile.ZipFile(path) as z: names = z.namelist() return names @classmethod def merge_zip(cls, target, zips): try: with zipfile.ZipFile(target, 'w', compression=zipfile.ZIP_DEFLATED) as t: for path in zips: with zipfile.ZipFile(path, compression=zipfile.ZIP_DEFLATED) as s: for name in s.namelist(): t.writestr(name, s.open(name).read()) except Exception as e: error(e) return False return True class Email(): """Class for send email """ class SmtpServer(object): def __init__(self, config): self._server = None self._error = '' self._sender = '' self._is_connect = self._login(config) def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self.close() @property def is_connect(self): return self._is_connect @property def error(self): return self._error def _login(self, config): name = config['server'] port = config['port'] is_ssl = config['ssl'] starttls = config.get('starttls', False) self._sender = config['user'] try: if starttls: self._server = smtplib.SMTP(name, port, timeout=TIMEOUT) self._server.ehlo() self._server.starttls() self._server.ehlo() elif is_ssl: self._server = smtplib.SMTP_SSL(name, port, timeout=TIMEOUT) self._server.ehlo() else: self._server = smtplib.SMTP(name, port, timeout=TIMEOUT) self._server.login(self._sender, config['password']) msg = 'Connect to: {}'.format(name) debug(msg) return True except smtplib.SMTPAuthenticationError as e: if '535' in str(e): self._error = _('Incorrect user or password') return False if '534' in str(e) and 'gmail' in name: self._error = _('Allow less secure apps in GMail') return False except smtplib.SMTPException as e: self._error = str(e) return False except Exception as e: self._error = str(e) return False return False def _body_validate(self, msg): body = msg.replace('\n', '
') return body def send(self, message): # ~ file_name = 'attachment; filename={}' email = MIMEMultipart('alternative') email['From'] = self._sender email['To'] = message['to'] email['Cc'] = message.get('cc', '') email['Subject'] = message['subject'] email['Date'] = formatdate(localtime=True) if message.get('confirm', False): email['Disposition-Notification-To'] = email['From'] if 'body_text' in message: email.attach(MIMEText(message['body_text'], 'plain', 'utf-8')) if 'body' in message: body = self._body_validate(message['body']) email.attach(MIMEText(body, 'html', 'utf-8')) paths = message.get('files', ()) if isinstance(paths, str): paths = (paths,) for path in paths: fn = Paths(path).file_name # ~ print('NAME', fn) part = MIMEBase('application', 'octet-stream') part.set_payload(Paths.read_bin(path)) encoders.encode_base64(part) part.add_header( 'Content-Disposition', f'attachment; filename="{fn}"') email.attach(part) receivers = ( email['To'].split(',') + email['CC'].split(',') + message.get('bcc', '').split(',')) try: self._server.sendmail(self._sender, receivers, email.as_string()) msg = 'Email sent...' debug(msg) if message.get('path', ''): self.save_message(email, message['path']) return True except Exception as e: self._error = str(e) return False return False def save_message(self, email, path): mbox = mailbox.mbox(path, create=True) mbox.lock() try: msg = mailbox.mboxMessage(email) mbox.add(msg) mbox.flush() finally: mbox.unlock() return def close(self): try: self._server.quit() msg = 'Close connection...' debug(msg) except: pass return @classmethod def _send_email(cls, server, messages): with cls.SmtpServer(server) as server: if server.is_connect: for msg in messages: server.send(msg) else: log.error(server.error) return server.error @classmethod def send(cls, server: dict, messages: Union[dict, tuple, list]): """Send email with config server, emails send in thread. :param server: Configuration for send emails :type server: dict :param server: Dictionary con message or list of messages :type server: dict or iterator """ if isinstance(messages, dict): messages = (messages,) t = threading.Thread(target=cls._send_email, args=(server, messages)) t.start() return class Macro(object): """Class for call macro `See Scripting Framework `_ """ @classmethod def call(cls, args: dict, in_thread: bool=False): """Call any macro :param args: Dictionary with macro location :type args: dict :param in_thread: If execute in thread :type in_thread: bool :return: Return None or result of call macro :rtype: Any """ result = None if in_thread: t = threading.Thread(target=cls._call, args=(args,)) t.start() else: result = cls._call(args) return result @classmethod def get_url_script(cls, args: dict): library = args['library'] name = args['name'] language = args.get('language', 'Python') location = args.get('location', 'user') module = args.get('module', '.') if language == 'Python': module = '.py$' elif language == 'Basic': module = f".{module}." if location == 'user': location = 'application' url = 'vnd.sun.star.script' url = f'{url}:{library}{module}{name}?language={language}&location={location}' return url @classmethod def _call(cls, args: dict): url = cls.get_url_script(args) args = args.get('args', ()) service = 'com.sun.star.script.provider.MasterScriptProviderFactory' factory = create_instance(service) script = factory.createScriptProvider('').getScript(url) result = script.invoke(args, None, None)[0] return result class Shell(object): """Class for subprocess `See Subprocess `_ """ @classmethod def run(cls, command: str, capture: bool=False, split: bool=False): """Execute commands :param command: Command to run :type command: str :param capture: If capture result of command :type capture: bool :param split: Some commands need split. :type split: bool :return: Result of command :rtype: Any """ if split: cmd = shlex.split(command) result = subprocess.run(cmd, capture_output=capture, text=True, shell=IS_WIN) if capture: result = result.stdout else: result = result.returncode else: if capture: result = subprocess.check_output(command, shell=True).decode() else: result = subprocess.Popen(command) return result @classmethod def popen(cls, command: str): """Execute commands and return line by line :param command: Command to run :type command: str :return: Result of command :rtype: Any """ try: proc = subprocess.Popen(shlex.split(command), shell=IS_WIN, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) for line in proc.stdout: yield line.decode().rstrip() except Exception as e: error(e) yield str(e) class Hash(object): """Class for hash """ @classmethod def digest(cls, method: str, data: str, in_hex: bool=True): """Get digest from data with method :param method: Digest method: md5, sha1, sha256, sha512, etc... :type method: str :param data: Data for get digest :type data: str :param in_hex: If True, get digest in hexadecimal, if False, get bytes :type in_hex: bool :return: bytes or hex digest :rtype: bytes or str """ result = '' obj = getattr(hashlib, method)(data.encode()) if in_hex: result = obj.hexdigest() else: result = obj.digest() return result class Config(object): """Class for set and get configurations """ @classmethod def set(cls, prefix: str, values: Any, key: str='') -> bool: """Save data config in user config like json :param prefix: Unique prefix for this data :type prefix: str :param values: Values for save :type values: Any :param key: Key for value :type key: str :return: True if save correctly :rtype: bool """ name_file = FILES['CONFIG'].format(prefix) path = Paths.join(Paths.user_config, name_file) data = values if key: data = cls.get(prefix) data[key] = values result = Paths.save_json(path, data) return result @classmethod def get(cls, prefix: str, key: str='') -> Any: """Get data config from user config like json :param prefix: Unique prefix for this data :type prefix: str :param key: Key for value :type key: str :return: data :rtype: Any """ data = {} name_file = FILES['CONFIG'].format(prefix) path = Paths.join(Paths.user_config, name_file) if not Paths.exists(path): return data data = Paths.read_json(path) if key and key in data: data = data[key] return data class Color(): """Class for colors `See Web Colors `_ """ COLORS = { 'aliceblue': 15792383, 'antiquewhite': 16444375, 'aqua': 65535, 'aquamarine': 8388564, 'azure': 15794175, 'beige': 16119260, 'bisque': 16770244, 'black': 0, 'blanchedalmond': 16772045, 'blue': 255, 'blueviolet': 9055202, 'brown': 10824234, 'burlywood': 14596231, 'cadetblue': 6266528, 'chartreuse': 8388352, 'chocolate': 13789470, 'coral': 16744272, 'cornflowerblue': 6591981, 'cornsilk': 16775388, 'crimson': 14423100, 'cyan': 65535, 'darkblue': 139, 'darkcyan': 35723, 'darkgoldenrod': 12092939, 'darkgray': 11119017, 'darkgreen': 25600, 'darkgrey': 11119017, 'darkkhaki': 12433259, 'darkmagenta': 9109643, 'darkolivegreen': 5597999, 'darkorange': 16747520, 'darkorchid': 10040012, 'darkred': 9109504, 'darksalmon': 15308410, 'darkseagreen': 9419919, 'darkslateblue': 4734347, 'darkslategray': 3100495, 'darkslategrey': 3100495, 'darkturquoise': 52945, 'darkviolet': 9699539, 'deeppink': 16716947, 'deepskyblue': 49151, 'dimgray': 6908265, 'dimgrey': 6908265, 'dodgerblue': 2003199, 'firebrick': 11674146, 'floralwhite': 16775920, 'forestgreen': 2263842, 'fuchsia': 16711935, 'gainsboro': 14474460, 'ghostwhite': 16316671, 'gold': 16766720, 'goldenrod': 14329120, 'gray': 8421504, 'grey': 8421504, 'green': 32768, 'greenyellow': 11403055, 'honeydew': 15794160, 'hotpink': 16738740, 'indianred': 13458524, 'indigo': 4915330, 'ivory': 16777200, 'khaki': 15787660, 'lavender': 15132410, 'lavenderblush': 16773365, 'lawngreen': 8190976, 'lemonchiffon': 16775885, 'lightblue': 11393254, 'lightcoral': 15761536, 'lightcyan': 14745599, 'lightgoldenrodyellow': 16448210, 'lightgray': 13882323, 'lightgreen': 9498256, 'lightgrey': 13882323, 'lightpink': 16758465, 'lightsalmon': 16752762, 'lightseagreen': 2142890, 'lightskyblue': 8900346, 'lightslategray': 7833753, 'lightslategrey': 7833753, 'lightsteelblue': 11584734, 'lightyellow': 16777184, 'lime': 65280, 'limegreen': 3329330, 'linen': 16445670, 'magenta': 16711935, 'maroon': 8388608, 'mediumaquamarine': 6737322, 'mediumblue': 205, 'mediumorchid': 12211667, 'mediumpurple': 9662683, 'mediumseagreen': 3978097, 'mediumslateblue': 8087790, 'mediumspringgreen': 64154, 'mediumturquoise': 4772300, 'mediumvioletred': 13047173, 'midnightblue': 1644912, 'mintcream': 16121850, 'mistyrose': 16770273, 'moccasin': 16770229, 'navajowhite': 16768685, 'navy': 128, 'oldlace': 16643558, 'olive': 8421376, 'olivedrab': 7048739, 'orange': 16753920, 'orangered': 16729344, 'orchid': 14315734, 'palegoldenrod': 15657130, 'palegreen': 10025880, 'paleturquoise': 11529966, 'palevioletred': 14381203, 'papayawhip': 16773077, 'peachpuff': 16767673, 'peru': 13468991, 'pink': 16761035, 'plum': 14524637, 'powderblue': 11591910, 'purple': 8388736, 'red': 16711680, 'rosybrown': 12357519, 'royalblue': 4286945, 'saddlebrown': 9127187, 'salmon': 16416882, 'sandybrown': 16032864, 'seagreen': 3050327, 'seashell': 16774638, 'sienna': 10506797, 'silver': 12632256, 'skyblue': 8900331, 'slateblue': 6970061, 'slategray': 7372944, 'slategrey': 7372944, 'snow': 16775930, 'springgreen': 65407, 'steelblue': 4620980, 'tan': 13808780, 'teal': 32896, 'thistle': 14204888, 'tomato': 16737095, 'turquoise': 4251856, 'violet': 15631086, 'wheat': 16113331, 'white': 16777215, 'whitesmoke': 16119285, 'yellow': 16776960, 'yellowgreen': 10145074, } def _get_color(self, index): if isinstance(index, tuple): color = (index[0] << 16) + (index[1] << 8) + index[2] else: if index[0] == '#': r, g, b = bytes.fromhex(index[1:]) color = (r << 16) + (g << 8) + b else: color = self.COLORS.get(index.lower(), -1) return color def __call__(self, index): return self._get_color(index) def __getitem__(self, index): return self._get_color(index) class Timer(object): """Class for timer thread""" class TimerThread(threading.Thread): def __init__(self, event, seconds, macro): threading.Thread.__init__(self) self._event = event self._seconds = seconds self._macro = macro def run(self): while not self._event.wait(self._seconds): Macro.call(self._macro) info('\tTimer stopped... ') return @classmethod def exists(cls, name): """Validate in timer **name** exists :param name: Timer name, it must be unique :type name: str :return: True if exists timer name :rtype: bool """ global _EVENTS return name in _EVENTS @classmethod def start(cls, name: str, seconds: float, macro: dict): """Start timer **name** every **seconds** and execute **macro** :param name: Timer name, it must be unique :type name: str :param seconds: Seconds for wait :type seconds: float :param macro: Macro for execute :type macro: dict """ global _EVENTS _EVENTS[name] = threading.Event() info(f"Timer '{name}' started, execute macro: '{macro['name']}'") thread = cls.TimerThread(_EVENTS[name], seconds, macro) thread.start() return @classmethod def stop(cls, name: str): """Stop timer **name** :param name: Timer name :type name: str """ global _EVENTS _EVENTS[name].set() del _EVENTS[name] return @classmethod def once(cls, name: str, seconds: float, macro: dict): """Start timer **name** only once in **seconds** and execute **macro** :param name: Timer name, it must be unique :type name: str :param seconds: Seconds for wait before execute macro :type seconds: float :param macro: Macro for execute :type macro: dict """ global _EVENTS _EVENTS[name] = threading.Timer(seconds, Macro.call, (macro,)) _EVENTS[name].start() info(f'Event: "{name}", started... execute in {seconds} seconds') return @classmethod def cancel(cls, name: str): """Cancel timer **name** only once events. :param name: Timer name, it must be unique :type name: str """ global _EVENTS if name in _EVENTS: try: _EVENTS[name].cancel() del _EVENTS[name] info(f'Cancel event: "{name}", ok...') except Exception as e: error(e) else: debug(f'Cancel event: "{name}", not exists...') return class URL(object): """Class for simple url open """ @classmethod def get(cls, url: str, **kwargs): return mureq.get(url, **kwargs) @classmethod def post(cls, url: str, body=None, **kwargs): return mureq.post(url, body, **kwargs)