1410 lines
39 KiB
Python
1410 lines
39 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import csv
import datetime
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import traceback
import zipfile

from functools import wraps
from pathlib import Path
from pprint import pprint
from typing import Any, Union

import mailbox
import smtplib
from smtplib import SMTPException, SMTPAuthenticationError
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email.utils import formatdate
from email import encoders

import uno
from com.sun.star.awt import MessageBoxButtons
from com.sun.star.awt.MessageBoxResults import YES
from com.sun.star.beans import PropertyValue
from com.sun.star.beans.PropertyConcept import ALL
from com.sun.star.ui.dialogs import TemplateDescription

from .easymain import (
    CTX, DATE_OFFSET, IS_MAC, IS_WIN, LANG, TITLE,
    log, create_instance
)
from .easyuno import MessageBoxType
from .easydocs import LODocuments
from .messages import MESSAGES
|
|
|
|
|
|
__all__ = [
|
|
'Dates',
|
|
'Email',
|
|
'LOInspect',
|
|
'Paths',
|
|
'catch_exception',
|
|
'debug',
|
|
'error',
|
|
'info',
|
|
'mri',
|
|
'msgbox',
|
|
'save_log',
|
|
'sleep',
|
|
]
|
|
|
|
|
|
# Timeout, in seconds, passed to the SMTP connections opened by this module.
TIMEOUT = 10
# File name of the Python executable that Paths.python joins to the office
# module directory on Windows and macOS.
PYTHON = 'python.exe' if IS_WIN else 'python'
|
|
|
|
|
|
def _(msg):
    """Translate a message into the active language.

    :param msg: Message in English, used as the lookup key
    :type msg: str
    :return: The translation found in ``MESSAGES[LANG]``; the original
        message when the language is English, the language has no
        catalogue, or the message has no translation
    :rtype: str
    """
    if LANG == 'en' or LANG not in MESSAGES:
        return msg

    try:
        return MESSAGES[LANG][msg]
    except KeyError:
        # An untranslated message must not break the caller: show it as is.
        return msg
|
|
|
|
|
|
def debug(*messages) -> None:
    """Write messages to the log with level DEBUG

    Each message is converted with ``str`` and all of them are joined with
    a tab character into a single log entry.

    :param messages: Messages to log
    :type messages: Any
    """
    log.debug('\t'.join(map(str, messages)))
|
|
|
|
|
|
def error(message: Any) -> None:
    """Write a message to the log with level ERROR

    :param message: The error message, passed unchanged to the logger
    :type message: Any
    """
    log.error(message)
|
|
|
|
|
|
def info(*messages) -> None:
    """Write messages to the log with level INFO

    Each message is converted with ``str`` and all of them are joined with
    a tab character into a single log entry.

    :param messages: Messages to log
    :type messages: Any
    """
    log.info('\t'.join(map(str, messages)))
|
|
|
|
|
|
def save_log(path: str, data: Any) -> bool:
    """Append data to a file, prefixed with the current date and time.

    :param path: Path of the log file, it is opened in append mode
    :type path: str
    :param data: Data to save, it is written with ``pprint``
    :type data: Any
    :return: True if the data was written, False if any error occurred
        (the error is sent to the log)
    :rtype: bool
    """
    try:
        with open(path, 'a') as f:
            # Dates.now is a class property: it already is a datetime and
            # must not be called.
            f.write(f'{Dates.now} - ')
            pprint(data, stream=f)
    except Exception as e:
        error(e)
        return False

    return True
|
|
|
|
|
|
def mri(obj: Any) -> None:
    """Open an object in the MRI extension inspector

    If the service ``mytools.Mri`` cannot be created, an error is logged
    and nothing else happens.

    :param obj: Any pyUno object; when it has an attribute ``obj``, that
        attribute is inspected instead
    :type obj: Any

    `See MRI <https://github.com/hanya/MRI/releases>`_
    """
    inspector = create_instance('mytools.Mri')
    if inspector is None:
        error('Extension MRI not found')
        return

    inspector.inspect(getattr(obj, 'obj', obj))
|
|
|
|
|
|
def catch_exception(f):
    """Decorator that logs, instead of propagating, any exception of ``f``.

    On Windows the traceback is also shown in a message box.

    :param f: Any Python function
    :type f: Function instance
    :return: Wrapper that returns the result of ``f``, or None when ``f``
        raised an exception
    """

    @wraps(f)
    def func(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception:
            if IS_WIN:
                msgbox(traceback.format_exc())
            # exc_info=True attaches the active traceback to the log entry.
            log.error(f.__name__, exc_info=True)

    return func
|
|
|
|
|
|
def set_app_config(node_name: str, key: str, new_value: Any) -> Any:
    """Set the value of a key in a configuration node.

    If reading, writing or committing fails, the error is logged and, when
    the key exists and had a non-empty value, that value is written back.

    :param node_name: Path of the configuration node
    :type node_name: str
    :param key: Name of the key
    :type key: str
    :param new_value: Value to store in the key
    :type new_value: Any
    :return: True if the update was committed, False otherwise
    :rtype: bool

    `See Api ConfigurationUpdateAccess <https://api.libreoffice.org/docs/idl/ref/servicecom_1_1sun_1_1star_1_1configuration_1_1ConfigurationUpdateAccess.html>`_
    """
    provider = create_instance(
        'com.sun.star.configuration.ConfigurationProvider', True)
    node = PropertyValue(Name='nodepath', Value=node_name)
    update = provider.createInstanceWithArguments(
        'com.sun.star.configuration.ConfigurationUpdateAccess', (node,))

    previous = ''
    try:
        previous = update.getPropertyValue(key)
        update.setPropertyValue(key, new_value)
        update.commitChanges()
    except Exception as e:
        error(e)
        if update.hasByName(key) and previous:
            update.setPropertyValue(key, previous)
            update.commitChanges()
        return False

    return True
|
|
|
|
|
|
def msgbox(message: Any, title: str=TITLE, buttons=MessageBoxButtons.BUTTONS_OK,
        type_message_box=MessageBoxType.INFOBOX) -> int:
    """Show a message box over the desktop window

    :param message: Message to show, it is converted with ``str``
    :type message: Any
    :param title: The title for message box
    :type title: str
    :param buttons: A combination of `com::sun::star::awt::MessageBoxButtons <https://api.libreoffice.org/docs/idl/ref/namespacecom_1_1sun_1_1star_1_1awt_1_1MessageBoxButtons.html>`_
    :type buttons: long
    :param type_message_box: The `message box type <https://api.libreoffice.org/docs/idl/ref/namespacecom_1_1sun_1_1star_1_1awt.html#ad249d76933bdf54c35f4eaf51a5b7965>`_
    :type type_message_box: enum
    :return: Value returned by ``execute``, see `MessageBoxResult <https://api.libreoffice.org/docs/idl/ref/namespacecom_1_1sun_1_1star_1_1awt_1_1MessageBoxResults.html>`_
    :rtype: int

    `See Api XMessageBoxFactory <http://api.libreoffice.org/docs/idl/ref/interfacecom_1_1sun_1_1star_1_1awt_1_1XMessageBoxFactory.html>`_
    """
    toolkit = create_instance('com.sun.star.awt.Toolkit')
    box = toolkit.createMessageBox(
        toolkit.getDesktopWindow(), type_message_box, buttons, title,
        str(message))
    return box.execute()
|
|
|
|
|
|
def question(message: str, title: str=TITLE) -> bool:
    """Ask a question in a message box with the buttons YES and NO

    :param message: Text of the question
    :type message: str
    :param title: The title for message box
    :type title: str
    :return: True if the result of the message box is YES, False otherwise
    :rtype: bool
    """
    answer = msgbox(
        message, title, MessageBoxButtons.BUTTONS_YES_NO,
        MessageBoxType.QUERYBOX)
    return answer == YES
|
|
|
|
|
|
def warning(message: Any, title: str=TITLE) -> int:
    """Show a message box of type WARNINGBOX

    :param message: Message to show, it is converted to string.
    :type message: Any
    :param title: The title for message box
    :type title: str
    :return: Result returned by ``msgbox``
    :rtype: int
    """
    box_type = MessageBoxType.WARNINGBOX
    return msgbox(message, title, type_message_box=box_type)
|
|
|
|
|
|
def errorbox(message: Any, title: str=TITLE) -> int:
    """Show a message box of type ERRORBOX

    :param message: Message to show, it is converted to string.
    :type message: Any
    :param title: The title for message box
    :type title: str
    :return: Result returned by ``msgbox``
    :rtype: int
    """
    box_type = MessageBoxType.ERRORBOX
    return msgbox(message, title, type_message_box=box_type)
|
|
|
|
|
|
def sleep(seconds: int):
    """Suspend execution with ``time.sleep``

    :param seconds: Time to wait, in seconds
    :type seconds: int
    """
    time.sleep(seconds)
|
|
|
|
|
|
def set_properties(model, properties):
    """Set several properties of a model with a single call.

    The keys ``X`` and ``Y`` are renamed to ``PositionX`` and ``PositionY``
    in the received dictionary itself, which is therefore modified.

    :param model: Object with the method ``setPropertyValues``
    :param properties: Names and values of the properties to set
    :type properties: dict
    """
    for short, full in (('X', 'PositionX'), ('Y', 'PositionY')):
        if short in properties:
            properties[full] = properties.pop(short)

    model.setPropertyValues(tuple(properties), tuple(properties.values()))
|
|
|
|
|
|
# ~ https://github.com/django/django/blob/main/django/utils/functional.py#L61
|
|
class classproperty:
    """Descriptor that exposes a method as a read-only property of a class.

    The wrapped method receives the owner class, so the value can be read
    from the class itself as well as from its instances.
    """

    def __init__(self, method=None):
        # Function called with the owner class on every attribute access.
        self.fget = method

    def __get__(self, instance, cls=None):
        # The instance is ignored on purpose: the value depends on the class.
        return self.fget(cls)

    def getter(self, method):
        """Replace the wrapped method and return this same descriptor."""
        self.fget = method
        return self
|
|
|
|
|
|
class Services():
    """Shortcuts for creating UNO service instances."""

    @classproperty
    def toolkit(cls):
        """Create an instance of the service com.sun.star.awt.Toolkit"""
        return create_instance('com.sun.star.awt.Toolkit', True)
|
|
|
|
|
|
class LOInspect():
    """Introspection of pyUno objects

    Collects the properties, methods, interfaces, services and listeners
    of an object as rows of strings.

    Inspired by `MRI <https://github.com/hanya/MRI/releases>`_
    """
    # Type classes whose value is shown as a fixed placeholder instead of
    # being read from the object.
    TYPE_CLASSES = {
        'INTERFACE': '-Interface-',
        'SEQUENCE': '-Sequence-',
        'STRUCT': '-Struct-',
    }
    # Bits of the property attributes that are reported, with their labels.
    _PROPERTY_ATTRIBUTES = {1: 'Maybe Void', 16: 'Read Only'}

    def __init__(self, obj: Any, to_doc: bool=False):
        """Inspect an object

        :param obj: Object to inspect; when it has an attribute ``obj``,
            that attribute is inspected instead
        :type obj: Any pyUno
        :param to_doc: If True, show the information in a new document
        :type to_doc: bool
        """
        self._obj = obj
        if hasattr(obj, 'obj'):
            self._obj = obj.obj
        self._properties = ()
        self._methods = ()
        self._interfaces = ()
        self._services = ()
        self._listeners = ()

        introspection = create_instance('com.sun.star.beans.Introspection')
        result = introspection.inspect(self._obj)
        if result:
            self._properties = self._get_properties(result)
            self._methods = self._get_methods(result)
            self._interfaces = self._get_interfaces(result)
            self._services = self._get_services(self._obj)
            self._listeners = self._get_listeners(result)
            # NOTE(review): the indentation of this call was lost in the
            # listing; exporting only after a successful inspection is
            # assumed -- confirm against the original file.
            self._to_doc(to_doc)

    def _to_doc(self, to_doc: bool):
        """Write each group of data in its own sheet of a new document."""
        if not to_doc:
            return

        doc = LODocuments().new()
        # The first sheet already exists, the others are inserted.
        sheet = doc[0]
        sheet.name = 'Properties'
        sheet['A1'].data = self.properties

        others = (
            ('Methods', self.methods),
            ('Interfaces', self.interfaces),
            ('Services', self.services),
            ('Listeners', self.listeners),
        )
        for name, data in others:
            sheet = doc.insert(name)
            sheet['A1'].data = data

    def _get_value(self, p: Any):
        """Get the value of property ``p`` of the inspected object as text.

        :return: A placeholder for the type classes in ``TYPE_CLASSES``,
            ``-void-`` for None, ``-error-`` if the value can not be read,
            otherwise ``str`` of the value
        :rtype: str
        """
        type_class = p.Type.typeClass.value
        if type_class in self.TYPE_CLASSES:
            return self.TYPE_CLASSES[type_class]

        value = ''
        try:
            value = getattr(self._obj, p.Name)
            if type_class == 'ENUM' and value:
                value = value.value
            elif type_class == 'TYPE':
                value = value.typeName
            elif value is None:
                value = '-void-'
        except Exception:
            # Reading a property can fail for many reasons; any failure is
            # reported in the cell, but KeyboardInterrupt and SystemExit
            # are not swallowed.
            value = '-error-'

        return str(value)

    def _get_attributes(self, a: Any):
        """Get the labels, joined with ', ', of the known bits set in ``a``."""
        labels = self._PROPERTY_ATTRIBUTES
        return ', '.join(label for flag, label in labels.items() if a & flag)

    def _get_property(self, p: Any):
        """Get the row (name, type, value, attributes) of property ``p``."""
        return (
            p.Name,
            p.Type.typeName,
            self._get_value(p),
            self._get_attributes(p.Attributes),
        )

    def _get_properties(self, result: Any):
        """Get a header row followed by one row for each property."""
        data = [('Name', 'Type', 'Value', 'Attributes')]
        data += [self._get_property(p) for p in result.getProperties(ALL)]
        return data

    def _get_arguments(self, m: Any):
        """Get the parameters of method ``m`` like ``( [mode] name type, ... )``."""
        params = [
            ' '.join((f'[{p.aMode.value.lower()}]', p.aName, p.aType.Name))
            for p in m.ParameterInfos
        ]
        return '( {} )'.format(', '.join(params))

    def _get_method(self, m: Any):
        """Get the row (name, arguments, return type, class) of method ``m``."""
        return (
            m.Name,
            self._get_arguments(m),
            m.ReturnType.Name,
            m.DeclaringClass.Name,
        )

    def _get_methods(self, result: Any):
        """Get a header row followed by one row for each method."""
        data = [('Name', 'Arguments', 'Return Type', 'Class')]
        data += [self._get_method(m) for m in result.getMethods(ALL)]
        return data

    def _get_interfaces(self, result: Any):
        """Get one row for each distinct class that declares a method."""
        names = {m.DeclaringClass.Name for m in result.getMethods(ALL)}
        # Sorted so the order of the rows does not change between runs.
        return tuple(zip(sorted(names)))

    def _get_services(self, obj: Any):
        """Get one row for each supported service name.

        :return: Empty tuple if the names can not be obtained
        """
        try:
            names = [str(s) for s in obj.getSupportedServiceNames()]
        except Exception:
            # Not every object provides getSupportedServiceNames.
            return ()
        return tuple(zip(names))

    def _get_listeners(self, result: Any):
        """Get one row for each supported listener type name."""
        names = [l.typeName for l in result.getSupportedListeners()]
        return tuple(zip(names))

    @property
    def properties(self):
        """Rows of properties, the first row is the header"""
        return self._properties

    @property
    def methods(self):
        """Rows of methods, the first row is the header"""
        return self._methods

    @property
    def interfaces(self):
        """Rows with the names of the interfaces"""
        return self._interfaces

    @property
    def services(self):
        """Rows with the names of the services"""
        return self._services

    @property
    def listeners(self):
        """Rows with the type names of the listeners"""
        return self._listeners
|
|
|
|
|
|
class Dates(object):
    """Helpers for dates and times, and a simple counter of elapsed time

    Return annotations are omitted on purpose: inside the class body the
    names ``date``, ``time`` and ``datetime`` are bound to the methods
    defined below, not to the standard library.
    """
    # Moment stored by start(), end() measures the time elapsed since it.
    _start = None

    @classproperty
    def now(cls):
        """Current local date and time

        :return: Current local date and time, without microseconds.
        :rtype: datetime
        """
        return datetime.datetime.now().replace(microsecond=0)

    @classproperty
    def today(cls):
        """Current local date

        :return: Current local date
        :rtype: date
        """
        return datetime.date.today()

    @classproperty
    def epoch(cls):
        """Current unix time

        :return: Unix time of the property ``now``
        :rtype: int

        `See Unix Time <https://en.wikipedia.org/wiki/Unix_time>`_
        """
        return int(time.mktime(cls.now.timetuple()))

    @classmethod
    def date(cls, year: int, month: int, day: int):
        """Make a date

        :param year: Year of date
        :type year: int
        :param month: Month of date
        :type month: int
        :param day: Day of date
        :type day: int
        :return: The date
        :rtype: date

        `See Python date <https://docs.python.org/3/library/datetime.html#date-objects>`_
        """
        return datetime.date(year, month, day)

    @classmethod
    def time(cls, hours: int, minutes: int, seconds: int):
        """Make a time

        :param hours: Hours of time
        :type hours: int
        :param minutes: Minutes of time
        :type minutes: int
        :param seconds: Seconds of time
        :type seconds: int
        :return: The time
        :rtype: datetime.time
        """
        return datetime.time(hours, minutes, seconds)

    @classmethod
    def datetime(cls, year: int, month: int, day: int, hours: int, minutes: int, seconds: int):
        """Make a datetime

        :param year: Year of date
        :type year: int
        :param month: Month of date
        :type month: int
        :param day: Day of date
        :type day: int
        :param hours: Hours of time
        :type hours: int
        :param minutes: Minutes of time
        :type minutes: int
        :param seconds: Seconds of time
        :type seconds: int
        :return: The datetime
        :rtype: datetime
        """
        return datetime.datetime(year, month, day, hours, minutes, seconds)

    @classmethod
    def str_to_date(cls, str_date: str, template: str, to_calc: bool=False):
        """Parse a date from a string

        :param str_date: Date in string
        :type str_date: str
        :param template: Format of the string, like in ``strptime``
        :type template: str
        :param to_calc: If True, return the ordinal of the date minus
            ``DATE_OFFSET``, for use in a Calc cell
        :type to_calc: bool
        :return: The date, or an integer if ``to_calc`` is True
        :rtype: date or int

        `See Python strptime <https://docs.python.org/3/library/datetime.html#datetime.datetime.strptime>`_
        """
        parsed = datetime.datetime.strptime(str_date, template).date()
        if not to_calc:
            return parsed
        return parsed.toordinal() - DATE_OFFSET

    @classmethod
    def calc_to_date(cls, value: float):
        """Get the date from a value of a Calc cell

        The decimal part of the value is discarded.

        :param value: Float value from cell
        :type value: float
        :return: The date with ordinal ``int(value) + DATE_OFFSET``
        :rtype: date

        `See Python fromordinal <https://docs.python.org/3/library/datetime.html#datetime.datetime.fromordinal>`_
        """
        return datetime.date.fromordinal(int(value) + DATE_OFFSET)

    @classmethod
    def start(cls):
        """Start the counter: store the current moment and log it"""
        cls._start = cls.now
        info('Start: ', cls._start)

    @classmethod
    def end(cls, get_seconds: bool=True):
        """Stop the counter and log the current moment

        ``start`` must be called before.

        :param get_seconds: If True return the total seconds, if False
            return the elapsed time as string
        :type get_seconds: bool
        :return: Time elapsed since the call to ``start``
        :rtype: float or str
        """
        finish = cls.now
        elapsed = finish - cls._start
        result = elapsed.total_seconds() if get_seconds else str(elapsed)
        info('End: ', finish)
        return result
|
|
|
|
|
|
class Paths(object):
|
|
"""Class for paths
|
|
"""
|
|
FILE_PICKER = 'com.sun.star.ui.dialogs.FilePicker'
|
|
FOLDER_PICKER = 'com.sun.star.ui.dialogs.FolderPicker'
|
|
REMOTE_FILE_PICKER = 'com.sun.star.ui.dialogs.RemoteFilePicker'
|
|
OFFICE_FILE_PICKER = 'com.sun.star.ui.dialogs.OfficeFilePicker'
|
|
|
|
def __init__(self, path=''):
|
|
if path.startswith('file://'):
|
|
path = str(Path(uno.fileUrlToSystemPath(path)).resolve())
|
|
self._path = Path(path)
|
|
|
|
@property
|
|
def path(self):
|
|
"""Get base path"""
|
|
return str(self._path.parent)
|
|
|
|
@property
|
|
def file_name(self):
|
|
"""Get file name"""
|
|
return self._path.name
|
|
|
|
@property
|
|
def name(self):
|
|
"""Get name"""
|
|
return self._path.stem
|
|
|
|
@property
|
|
def ext(self):
|
|
"""Get extension"""
|
|
return self._path.suffix[1:]
|
|
|
|
@property
|
|
def size(self):
|
|
"""Get size"""
|
|
return self._path.stat().st_size
|
|
|
|
@property
|
|
def url(self):
|
|
"""Get like URL"""
|
|
return self._path.as_uri()
|
|
|
|
@property
|
|
def info(self):
|
|
"""Get all info like tuple"""
|
|
i = (self.path, self.file_name, self.name, self.ext, self.size, self.url)
|
|
return i
|
|
|
|
@property
|
|
def dict(self):
|
|
"""Get all info like dict"""
|
|
data = {
|
|
'path': self.path,
|
|
'file_name': self.file_name,
|
|
'name': self.name,
|
|
'ext': self.ext,
|
|
'size': self.size,
|
|
'url': self.url,
|
|
}
|
|
return data
|
|
|
|
@classproperty
|
|
def home(self):
|
|
"""Get user home"""
|
|
return str(Path.home())
|
|
|
|
@classproperty
|
|
def documents(self):
|
|
"""Get user save documents"""
|
|
return self.config()
|
|
|
|
@classproperty
|
|
def user_profile(self):
|
|
"""Get path user profile"""
|
|
path = self.config('UserConfig')
|
|
path = str(Path(path).parent)
|
|
return path
|
|
|
|
@classproperty
|
|
def user_config(self):
|
|
"""Get path config in user profile"""
|
|
path = self.config('UserConfig')
|
|
return path
|
|
|
|
@classproperty
|
|
def python(self):
|
|
"""Get path executable python"""
|
|
if IS_WIN:
|
|
path = self.join(self.config('Module'), PYTHON)
|
|
elif IS_MAC:
|
|
path = self.join(self.config('Module'), '..', 'Resources', PYTHON)
|
|
else:
|
|
path = sys.executable
|
|
return path
|
|
|
|
@classmethod
|
|
def to_url(cls, path: str) -> str:
|
|
"""Convert paths in format system to URL
|
|
|
|
:param path: Path to convert
|
|
:type path: str
|
|
:return: Path in URL
|
|
:rtype: str
|
|
"""
|
|
if not path.startswith('file://'):
|
|
path = Path(path).as_uri()
|
|
return path
|
|
|
|
@classmethod
|
|
def to_system(cls, path:str) -> str:
|
|
"""Convert paths in URL to system
|
|
|
|
:param path: Path to convert
|
|
:type path: str
|
|
:return: Path system format
|
|
:rtype: str
|
|
"""
|
|
if path.startswith('file://'):
|
|
path = str(Path(uno.fileUrlToSystemPath(path)).resolve())
|
|
return path
|
|
|
|
@classmethod
|
|
def config(cls, name: str='Work') -> Union[str, list]:
|
|
"""Return path from config
|
|
|
|
:param name: Name in service PathSettings, default get path documents
|
|
:type name: str
|
|
:return: Path in config, if exists.
|
|
:rtype: str or list
|
|
|
|
`See Api XPathSettings <http://api.libreoffice.org/docs/idl/ref/interfacecom_1_1sun_1_1star_1_1util_1_1XPathSettings.html>`_
|
|
"""
|
|
path = create_instance('com.sun.star.util.PathSettings')
|
|
path = cls.to_system(getattr(path, name)).split(';')
|
|
if len(path) == 1:
|
|
path = path[0]
|
|
return path
|
|
|
|
@classmethod
|
|
def join(cls, *paths: str) -> str:
|
|
"""Join paths
|
|
|
|
:param paths: Paths to join
|
|
:type paths: list
|
|
:return: New path with joins
|
|
:rtype: str
|
|
"""
|
|
path = str(Path(paths[0]).joinpath(*paths[1:]))
|
|
return path
|
|
|
|
@classmethod
|
|
def exists(cls, path: str) -> bool:
|
|
"""If exists path
|
|
|
|
:param path: Path for validate
|
|
:type path: str
|
|
:return: True if path exists, False if not.
|
|
:rtype: bool
|
|
"""
|
|
path = cls.to_system(path)
|
|
result = Path(path).exists()
|
|
return result
|
|
|
|
@classmethod
|
|
def exists_app(cls, name_app: str) -> bool:
|
|
"""If exists app in system
|
|
|
|
:param name_app: Name of application
|
|
:type name_app: str
|
|
:return: True if app exists, False if not.
|
|
:rtype: bool
|
|
"""
|
|
result = bool(shutil.which(name_app))
|
|
return result
|
|
|
|
@classmethod
|
|
def is_dir(cls, path: str):
|
|
"""Validate if path is directory
|
|
|
|
:param path: Path for validate
|
|
:type path: str
|
|
:return: True if path is directory, False if not.
|
|
:rtype: bool
|
|
"""
|
|
return Path(path).is_dir()
|
|
|
|
@classmethod
|
|
def is_file(cls, path: str):
|
|
"""Validate if path is a file
|
|
|
|
:param path: Path for validate
|
|
:type path: str
|
|
:return: True if path is a file, False if not.
|
|
:rtype: bool
|
|
"""
|
|
return Path(path).is_file()
|
|
|
|
@classmethod
|
|
def temp_file(self):
|
|
"""Make temporary file"""
|
|
return tempfile.NamedTemporaryFile(mode='w')
|
|
|
|
@classmethod
|
|
def temp_dir(self):
|
|
"""Make temporary directory"""
|
|
return tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
|
|
|
|
@classmethod
|
|
def get(cls, init_dir: str='', filters: str='', multiple: bool=False) -> Union[str, list]:
|
|
"""Get path for open
|
|
|
|
:param init_dir: Initial default path
|
|
:type init_dir: str
|
|
:param filters: Filter for show type files: 'xml' or 'txt,xml'
|
|
:type filters: str
|
|
:param multiple: If user can selected multiple files
|
|
:type multiple: bool
|
|
:return: Selected path or paths
|
|
:rtype: str or list
|
|
|
|
`See API <https://api.libreoffice.org/docs/idl/ref/namespacecom_1_1sun_1_1star_1_1ui_1_1dialogs_1_1TemplateDescription.html>`_
|
|
"""
|
|
if not init_dir:
|
|
init_dir = cls.documents
|
|
init_dir = cls.to_url(init_dir)
|
|
file_picker = create_instance(cls.FILE_PICKER)
|
|
file_picker.setTitle(_('Select path'))
|
|
file_picker.setDisplayDirectory(init_dir)
|
|
file_picker.initialize((TemplateDescription.FILEOPEN_SIMPLE,))
|
|
file_picker.setMultiSelectionMode(multiple)
|
|
if filters:
|
|
for f in filters.split(','):
|
|
file_picker.appendFilter(f.upper(), f'*.{f.lower()}')
|
|
|
|
if file_picker.execute():
|
|
paths = [cls.to_system(p) for p in file_picker.getSelectedFiles()]
|
|
if not multiple:
|
|
paths = paths[0]
|
|
return paths
|
|
|
|
@classmethod
|
|
def get_dir(cls, init_dir: str='') -> str:
|
|
"""Get path dir
|
|
|
|
:param init_dir: Initial default path
|
|
:type init_dir: str
|
|
:return: Selected path
|
|
:rtype: str
|
|
"""
|
|
folder_picker = create_instance(cls.FOLDER_PICKER)
|
|
if not init_dir:
|
|
init_dir = cls.documents
|
|
init_dir = cls.to_url(init_dir)
|
|
folder_picker.setTitle(_('Select directory'))
|
|
folder_picker.setDisplayDirectory(init_dir)
|
|
|
|
path = ''
|
|
if folder_picker.execute():
|
|
path = cls.to_system(folder_picker.getDirectory())
|
|
return path
|
|
|
|
@classmethod
|
|
def get_for_save(cls, init_dir: str='', filters: str=''):
|
|
"""Get path for save
|
|
|
|
:param init_dir: Initial default path
|
|
:type init_dir: str
|
|
:param filters: Filter for show type files: 'xml' or 'txt,xml'
|
|
:type filters: str
|
|
:return: Selected path
|
|
:rtype: str
|
|
"""
|
|
if not init_dir:
|
|
init_dir = cls.documents
|
|
init_dir = cls.to_url(init_dir)
|
|
|
|
file_picker = create_instance(cls.FILE_PICKER)
|
|
file_picker.setTitle(_('Select file'))
|
|
file_picker.setDisplayDirectory(init_dir)
|
|
file_picker.initialize((TemplateDescription.FILESAVE_SIMPLE,))
|
|
|
|
if filters:
|
|
for f in filters.split(','):
|
|
file_picker.appendFilter(f.upper(), f'*.{f.lower()}')
|
|
|
|
path = ''
|
|
if file_picker.execute():
|
|
files = file_picker.getSelectedFiles()
|
|
path = [cls.to_system(f) for f in files][0]
|
|
return path
|
|
|
|
@classmethod
|
|
def files(cls, path: str, pattern: str='*'):
|
|
"""Get all files in path
|
|
|
|
:param path: Path with files
|
|
:type path: str
|
|
:param pattern: For filter files, default get all.
|
|
:type pattern: str
|
|
:return: Files in path
|
|
:rtype: list
|
|
"""
|
|
files = [str(p) for p in Path(path).glob(pattern) if p.is_file()]
|
|
return files
|
|
|
|
@classmethod
|
|
def walk(cls, path, filters=''):
|
|
"""Get all files in path recursively
|
|
|
|
:param path: Path with files
|
|
:type path: str
|
|
:param filters: For filter files, default get all.
|
|
:type filters: str
|
|
:return: Files in path
|
|
:rtype: list
|
|
"""
|
|
paths = []
|
|
for folder, _, files in os.walk(path):
|
|
if filters:
|
|
pattern = re.compile(r'\.(?:{})$'.format(filters), re.IGNORECASE)
|
|
paths += [cls.join(folder, f) for f in files if pattern.search(f)]
|
|
else:
|
|
paths += [cls.join(folder, f) for f in files]
|
|
return paths
|
|
|
|
@classmethod
|
|
def dirs(cls, path):
|
|
"""Get directories in path
|
|
|
|
:param path: Path to scan
|
|
:type path: str
|
|
:return: Directories in path
|
|
:rtype: list
|
|
"""
|
|
dirs = [str(p) for p in Path(path).iterdir() if p.is_dir()]
|
|
return dirs
|
|
|
|
@classmethod
|
|
def walk_dirs(cls, path, tree=False):
|
|
"""Get directories recursively
|
|
|
|
:param path: Path to scan
|
|
:type path: str
|
|
:param tree: get info in a tuple (ID_FOLDER, ID_PARENT, NAME)
|
|
:type tree: bool
|
|
:return: Directories in path
|
|
:rtype: list
|
|
"""
|
|
folders = []
|
|
if tree:
|
|
i = 0
|
|
parents = {path: 0}
|
|
for root, dirs, _ in os.walk(path):
|
|
for name in dirs:
|
|
i += 1
|
|
rn = cls.join(root, name)
|
|
if not rn in parents:
|
|
parents[rn] = i
|
|
folders.append((i, parents[root], name))
|
|
else:
|
|
for root, dirs, _ in os.walk(path):
|
|
folders += [cls.join(root, name) for name in dirs]
|
|
return folders
|
|
|
|
@classmethod
|
|
def extension(cls, id_ext: str):
|
|
"""Get path extension install from id
|
|
|
|
:param id_ext: ID extension
|
|
:type id_ext: str
|
|
:return: Path extension
|
|
:rtype: str
|
|
"""
|
|
pip = CTX.getValueByName('/singletons/com.sun.star.deployment.PackageInformationProvider')
|
|
path = Paths.to_system(pip.getPackageLocation(id_ext))
|
|
return path
|
|
|
|
@classmethod
|
|
def replace_ext(cls, path: str, new_ext: str):
|
|
"""Replace extension in file path
|
|
|
|
:param path: Path to file
|
|
:type path: str
|
|
:param new_ext: New extension
|
|
:type new_ext: str
|
|
:return: Path with new extension
|
|
:rtype: str
|
|
"""
|
|
p = Paths(path)
|
|
name = f'{p.name}.{new_ext}'
|
|
path = cls.join(p.path, name)
|
|
return path
|
|
|
|
@classmethod
|
|
def open(cls, path: str):
|
|
"""Open any file with default program in system
|
|
|
|
:param path: Path to file
|
|
:type path: str
|
|
:return: PID file, only Linux
|
|
:rtype: int
|
|
"""
|
|
pid = 0
|
|
if IS_WIN:
|
|
os.startfile(path)
|
|
else:
|
|
pid = subprocess.Popen(['xdg-open', path]).pid
|
|
return pid
|
|
|
|
# ~ Save/read data
|
|
|
|
@classmethod
|
|
def save(cls, path: str, data: str, encoding: str='utf-8') -> bool:
|
|
"""Save data in path with encoding
|
|
|
|
:param path: Path to file save
|
|
:type path: str
|
|
:param data: Data to save
|
|
:type data: str
|
|
:param encoding: Encoding for save data, default utf-8
|
|
:type encoding: str
|
|
:return: True, if save corrrectly
|
|
:rtype: bool
|
|
"""
|
|
result = bool(Path(path).write_text(data, encoding=encoding))
|
|
return result
|
|
|
|
@classmethod
|
|
def save_bin(cls, path: str, data: bytes) -> bool:
|
|
"""Save binary data in path
|
|
|
|
:param path: Path to file save
|
|
:type path: str
|
|
:param data: Data to save
|
|
:type data: bytes
|
|
:return: True, if save corrrectly
|
|
:rtype: bool
|
|
"""
|
|
result = bool(Path(path).write_bytes(data))
|
|
return result
|
|
|
|
@classmethod
|
|
def read(cls, path: str, get_lines: bool=False, encoding: str='utf-8') -> Union[str, list]:
|
|
"""Read data in path
|
|
|
|
:param path: Path to file read
|
|
:type path: str
|
|
:param get_lines: If read file line by line
|
|
:type get_lines: bool
|
|
:return: File content
|
|
:rtype: str or list
|
|
"""
|
|
if get_lines:
|
|
with Path(path).open(encoding=encoding) as f:
|
|
data = f.readlines()
|
|
else:
|
|
data = Path(path).read_text(encoding=encoding)
|
|
return data
|
|
|
|
@classmethod
|
|
def read_bin(cls, path: str) -> bytes:
|
|
"""Read binary data in path
|
|
|
|
:param path: Path to file read
|
|
:type path: str
|
|
:return: File content
|
|
:rtype: bytes
|
|
"""
|
|
data = Path(path).read_bytes()
|
|
return data
|
|
|
|
# ~ Import/export data
|
|
|
|
@classmethod
|
|
def save_json(cls, path: str, data: str):
|
|
"""Save data in path file like json
|
|
|
|
:param path: Path to file
|
|
:type path: str
|
|
:return: True if save correctly
|
|
:rtype: bool
|
|
"""
|
|
data = json.dumps(data, indent=4, ensure_ascii=False, sort_keys=True)
|
|
return cls.save(path, data)
|
|
|
|
@classmethod
|
|
def read_json(cls, path: str) -> Any:
|
|
"""Read path file and load json data
|
|
|
|
:param path: Path to file
|
|
:type path: str
|
|
:return: Any data
|
|
:rtype: Any
|
|
"""
|
|
data = json.loads(cls.read(path))
|
|
return data
|
|
|
|
@classmethod
|
|
def save_csv(cls, path: str, data: Any, args: dict={}):
|
|
"""Write CSV
|
|
|
|
:param path: Path to file write csv
|
|
:type path: str
|
|
:param data: Data to write
|
|
:type data: Iterable
|
|
:param args: Any argument support for Python library
|
|
:type args: dict
|
|
|
|
`See CSV Writer <https://docs.python.org/3.8/library/csv.html#csv.writer>`_
|
|
"""
|
|
with open(path, 'w') as f:
|
|
writer = csv.writer(f, **args)
|
|
writer.writerows(data)
|
|
return
|
|
|
|
@classmethod
|
|
def read_csv(cls, path: str, args: dict={}) -> list:
|
|
"""Read CSV
|
|
|
|
:param path: Path to file csv
|
|
:type path: str
|
|
:param args: Any argument support for Python library
|
|
:type args: dict
|
|
:return: Data csv like tuple
|
|
:rtype: tuple
|
|
|
|
`See CSV Reader <https://docs.python.org/3.8/library/csv.html#csv.reader>`_
|
|
"""
|
|
with open(path) as f:
|
|
rows = list(csv.reader(f, **args))
|
|
return rows
|
|
|
|
@classmethod
|
|
def kill(cls, path: str) -> bool:
|
|
"""Delete path
|
|
|
|
:param path: Path to file or directory
|
|
:type path: str
|
|
:return: True if delete correctly
|
|
:rtype: bool
|
|
"""
|
|
result = False
|
|
|
|
p = Path(path)
|
|
try:
|
|
if p.is_file():
|
|
p.unlink()
|
|
result = True
|
|
elif p.is_dir():
|
|
shutil.rmtree(path)
|
|
result = True
|
|
except OSError as e:
|
|
log.error(e)
|
|
|
|
return result
|
|
|
|
@classmethod
|
|
def copy(cls, source: str, target: str='', name: str='') -> str:
|
|
"""Copy files
|
|
|
|
:param source: Path source
|
|
:type source: str
|
|
:param target: Path target
|
|
:type target: str
|
|
:param name: New name in target
|
|
:type name: str
|
|
:return: Path target
|
|
:rtype: str
|
|
"""
|
|
p, f, n, e, _, _ = Paths(source).info
|
|
if target:
|
|
p = target
|
|
e = f'.{e}'
|
|
if name:
|
|
e = ''
|
|
n = name
|
|
path_new = cls.join(p, f'{n}{e}')
|
|
shutil.copy(source, path_new)
|
|
return path_new
|
|
|
|
@classmethod
|
|
def zip(cls, source: Union[str, tuple, list], target: str='') -> str:
|
|
path_zip = target
|
|
if not isinstance(source, (tuple, list)):
|
|
path, _, name, _ = _P(source).info
|
|
start = len(path) + 1
|
|
if not target:
|
|
path_zip = f'{path}/{name}.zip'
|
|
|
|
if isinstance(source, (tuple, list)):
|
|
files = [(f, f[len(_P(f).path)+1:]) for f in source]
|
|
elif _P.is_file(source):
|
|
files = ((source, source[start:]),)
|
|
else:
|
|
files = [(f, f[start:]) for f in _P.walk(source)]
|
|
|
|
compression = zipfile.ZIP_DEFLATED
|
|
with zipfile.ZipFile(path_zip, 'w', compression=compression) as z:
|
|
for f in files:
|
|
z.write(f[0], f[1])
|
|
return path_zip
|
|
|
|
@classmethod
def unzip(cls, source: str, target: str='', members=None, pwd=None):
    """Extract a zip archive

    :param source: Path to the zip file
    :type source: str
    :param target: Directory to extract into; when empty, the path
        reported by ``_P(source).path`` is used
    :type target: str
    :param members: Name or names of the members to extract, all of them
        when None
    :type members: str, list, tuple or None
    :param pwd: Password for encrypted members
    :type pwd: str, bytes or None
    """
    # zipfile is not part of the module level imports.
    import zipfile

    path = target or _P(source).path
    # zipfile expects the password as bytes; accept text for convenience
    # and pass bytes through untouched.
    if isinstance(pwd, str):
        pwd = pwd.encode()
    if isinstance(members, str):
        members = (members,)

    with zipfile.ZipFile(source) as z:
        z.extractall(path, members=members, pwd=pwd)
    return
|
|
|
|
@classmethod
def zip_content(cls, path: str):
    """Get the names of the members of a zip archive

    :param path: Path to the zip file
    :type path: str
    :return: Member names, in archive order
    :rtype: list
    """
    # zipfile is not part of the module level imports.
    import zipfile

    with zipfile.ZipFile(path) as z:
        return z.namelist()
|
|
|
|
@classmethod
def merge_zip(cls, target, zips):
    """Merge the content of several zip archives into a new one

    Members are copied by name, in the order the archives are given, and
    recompressed with deflate. Any failure is reported through ``error``
    and turns the result into ``False``.

    :param target: Path of the zip file to create
    :type target: str
    :param zips: Paths of the zip files to merge
    :type zips: list or tuple
    :return: True if all archives were merged
    :rtype: bool
    """
    # zipfile is not part of the module level imports.
    import zipfile

    try:
        with zipfile.ZipFile(target, 'w', compression=zipfile.ZIP_DEFLATED) as merged:
            for path in zips:
                with zipfile.ZipFile(path) as part:
                    for name in part.namelist():
                        # ZipFile.read opens and closes the member itself,
                        # so no file handle is left dangling.
                        merged.writestr(name, part.read(name))
    except Exception as e:
        error(e)
        return False

    return True
|
|
|
|
|
|
class Email():
    """Class for send email
    """

    class SmtpServer(object):
        """Authenticated SMTP connection usable as a context manager

        The connection and the login are attempted in the constructor.
        Check :attr:`is_connect` before sending and read :attr:`error` to
        get the reason of the last failure.
        """

        def __init__(self, config):
            """Connect and log in

            :param config: Connection data. Keys ``server``, ``port``,
                ``user`` and ``password`` are required, the flags ``ssl``
                and ``starttls`` are optional
            :type config: dict
            """
            self._server = None
            self._error = ''
            self._sender = ''
            self._is_connect = self._login(config)

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            self.close()

        @property
        def is_connect(self):
            """True if the connection and the login succeeded"""
            return self._is_connect

        @property
        def error(self):
            """Text of the last error, empty if there was none"""
            return self._error

        def _login(self, config):
            """Open the connection and authenticate

            ``starttls`` takes precedence over ``ssl``; without either
            flag a plain connection is used.

            :param config: Connection data, see the constructor
            :type config: dict
            :return: True if the login succeeded
            :rtype: bool
            """
            name = config['server']
            port = config['port']
            is_ssl = config.get('ssl', False)
            starttls = config.get('starttls', False)
            self._sender = config['user']
            try:
                if starttls:
                    self._server = smtplib.SMTP(name, port, timeout=TIMEOUT)
                    self._server.ehlo()
                    self._server.starttls()
                    self._server.ehlo()
                elif is_ssl:
                    self._server = smtplib.SMTP_SSL(name, port, timeout=TIMEOUT)
                    self._server.ehlo()
                else:
                    self._server = smtplib.SMTP(name, port, timeout=TIMEOUT)

                self._server.login(self._sender, config['password'])
                debug('Connect to: {}'.format(name))
                return True
            except smtplib.SMTPAuthenticationError as e:
                reason = str(e)
                if '535' in reason:
                    self._error = _('Incorrect user or password')
                elif '534' in reason and 'gmail' in name:
                    self._error = _('Allow less secure apps in GMail')
                else:
                    # Never leave the error empty after a failed login.
                    self._error = reason
            except Exception as e:
                self._error = str(e)
            return False

        def _body(self, msg):
            """Turn line breaks into HTML line breaks"""
            return msg.replace('\n', '<BR>')

        @staticmethod
        def _receivers(message):
            """Collect the recipients from the ``to``, ``cc`` and ``bcc`` keys

            Each value is a comma separated string; surrounding blanks and
            empty entries are discarded.

            :param message: Message data, ``to`` is required
            :type message: dict
            :return: Addresses in the order to, cc, bcc
            :rtype: list
            """
            fields = (
                message['to'],
                message.get('cc') or '',
                message.get('bcc') or '',
            )
            return [
                address.strip()
                for field in fields
                for address in field.split(',')
                if address.strip()
            ]

        def send(self, message):
            """Build and send one message

            :param message: Keys ``to``, ``subject`` and ``body`` are
                required. Optional keys: ``cc`` and ``bcc`` (comma
                separated addresses), ``confirm`` (ask for a read
                notification), ``files`` (path or paths to attach) and
                ``path`` (mbox file where the sent message is saved)
            :type message: dict
            :return: True if the message was sent (and saved, when ``path``
                is given); otherwise False and :attr:`error` holds the reason
            :rtype: bool
            """
            mail = MIMEMultipart()
            mail['From'] = self._sender
            mail['To'] = message['to']
            # Only add the header when there is something to put in it.
            cc = message.get('cc') or ''
            if cc:
                mail['Cc'] = cc
            mail['Subject'] = message['subject']
            mail['Date'] = formatdate(localtime=True)
            if message.get('confirm', False):
                mail['Disposition-Notification-To'] = mail['From']
            mail.attach(MIMEText(self._body(message['body']), 'html'))

            paths = message.get('files', ())
            if isinstance(paths, str):
                paths = (paths,)
            for path in paths:
                fn = Paths(path).file_name
                part = MIMEBase('application', 'octet-stream')
                part.set_payload(Paths.read_bin(path))
                encoders.encode_base64(part)
                part.add_header(
                    'Content-Disposition', f'attachment; filename="{fn}"')
                mail.attach(part)

            receivers = self._receivers(message)
            try:
                self._server.sendmail(self._sender, receivers, mail.as_string())
                debug('Email sent...')
                if message.get('path', ''):
                    self.save_message(mail, message['path'])
                return True
            except Exception as e:
                self._error = str(e)
            return False

        def save_message(self, email, path):
            """Append a message to an mbox file, creating it if needed

            :param email: Message to save
            :type email: email.message.Message
            :param path: Path to the mbox file
            :type path: str
            """
            mbox = mailbox.mbox(path, create=True)
            try:
                mbox.lock()
                mbox.add(mailbox.mboxMessage(email))
                mbox.flush()
            finally:
                # close() flushes, releases the lock if it is held and
                # closes the underlying file.
                mbox.close()
            return

        def close(self):
            """Close the connection, ignoring any error while doing it"""
            if self._server is None:
                return
            try:
                self._server.quit()
                debug('Close connection...')
            except Exception:
                # Best effort: the server may already have dropped us.
                pass
            return

    @classmethod
    def _send_email(cls, server, messages):
        """Send every message through one connection

        :param server: Configuration for send emails
        :type server: dict
        :param messages: Messages to send
        :type messages: tuple or list
        :return: Text of the last error, empty if there was none
        :rtype: str
        """
        with cls.SmtpServer(server) as smtp:
            if smtp.is_connect:
                for msg in messages:
                    if not smtp.send(msg):
                        log.error(smtp.error)
            else:
                log.error(smtp.error)
        return smtp.error

    @classmethod
    def send(cls, server: dict, messages: Union[dict, tuple, list]):
        """Send email with config server, emails are sent in a thread.

        The call returns as soon as the thread is started; failures are
        only written to the log.

        :param server: Configuration for send emails
        :type server: dict
        :param messages: Dictionary with one message or list of messages
        :type messages: dict or iterator
        """
        if isinstance(messages, dict):
            messages = (messages,)
        t = threading.Thread(target=cls._send_email, args=(server, messages))
        t.start()
        return
|