diff --git a/source/conf.py.example b/source/conf.py.example index 106177d..713c604 100644 --- a/source/conf.py.example +++ b/source/conf.py.example @@ -23,7 +23,7 @@ import logging # ~ 1 = normal extension # ~ 2 = new component # ~ 3 = Calc addin -TYPE_EXTENSION = 2 +TYPE_EXTENSION = 1 # ~ https://semver.org/ VERSION = '0.1.0' diff --git a/source/easymacro.py b/source/easymacro.py index 138df70..51da1b3 100644 --- a/source/easymacro.py +++ b/source/easymacro.py @@ -18,15 +18,25 @@ # ~ along with ZAZ. If not, see . + +import errno import getpass import logging import os import platform +import shlex +import subprocess import sys import tempfile import threading import time +import zipfile + +from datetime import datetime from functools import wraps +from pathlib import Path, PurePath +from pprint import pprint +from subprocess import PIPE import uno import unohelper @@ -66,11 +76,15 @@ logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE) log = logging.getLogger(__name__) -OS = sys.platform +OS = platform.system() USER = getpass.getuser() PC = platform.node() -WIN = 'win32' +IS_WIN = OS == 'Windows' +LOG_NAME = 'ZAZ' + +CALC = 'calc' +WRITER = 'writer' OBJ_CELL = 'ScCellObj' OBJ_RANGE = 'ScCellRangeObj' OBJ_RANGES = 'ScCellRangesObj' @@ -122,17 +136,6 @@ def mri(obj): return -def debug(*info): - for i in info: - log.debug(i) - return - - -def error(info): - log.error(info) - return - - def catch_exception(f): @wraps(f) def func(*args, **kwargs): @@ -143,6 +146,51 @@ def catch_exception(f): return func +class LogWin(object): + + def __init__(self, doc): + self.doc = doc + + def write(self, info): + text = self.doc.Text + cursor = text.createTextCursor() + cursor.gotoEnd(False) + text.insertString(cursor, str(info), 0) + return + + +def info(data): + log.info(data) + return + + +def debug(info): + if IS_WIN: + # ~ app = LOApp(self.ctx, self.sm, self.desktop, self.toolkit) + # ~ doc = app.getDoc(FILE_NAME_DEBUG) + # ~ if not doc: + # ~ doc = app.newDoc(WRITER) + # ~ out = OutputDoc(doc) + # ~ sys.stdout = out + pprint(info) + return + + log.debug(info) + return + + +def error(info): + log.error(info) + return + + +def save_log(path, data): + with open(path, 'a') as out: + out.write('{} -{}- '.format(str(datetime.now())[:19], LOG_NAME)) + pprint(data, stream=out) + return + + def run_in_thread(fn): def run(*k, **kw): t = threading.Thread(target=fn, args=k, kwargs=kw) @@ -206,6 +254,20 @@ def _path_system(path): return path +def exists_app(name): + try: + dn = subprocess.DEVNULL + subprocess.Popen([name, ''], stdout=dn, stderr=dn).terminate() + except OSError as e: + if e.errno == errno.ENOENT: + return False + return True + + +def exists(path): + return Path(path).exists() + + def get_type_doc(obj): services = { 'calc': 'com.sun.star.sheet.SpreadsheetDocument', @@ -289,10 +351,23 @@ class LODocument(object): obj = self.obj.createInstance(name) return obj + def save(self, path='', **kwargs): + opt = _properties(kwargs) + if path: + self._obj.storeAsURL(_path_url(path), opt) + else: + self._obj.store() + return True + def close(self): self.obj.close(True) return + def focus(self): + w = self._cc.getFrame().getComponentWindow() + w.setFocus() + return + class LOCalc(LODocument): @@ -1002,7 +1077,6 @@ def create_dialog(properties): return LODialog(properties) -@catch_exception def set_properties(model, properties): if 'X' in properties: properties['PositionX'] = properties.pop('X') @@ -1109,7 +1183,13 @@ def inputbox(message, default='', title=TITLE): return '' -def open(path, **kwargs): +def new_doc(type_doc=CALC): + path = 'private:factory/s{}'.format(type_doc) + doc = get_desktop().loadComponentFromURL(path, '_default', 0, ()) + return _get_class_doc(doc) + + +def open_doc(path, **kwargs): """ Open document in path Usually options: Hidden: True or False @@ -1129,12 +1209,160 @@ def open(path, **kwargs): def open_file(path): - if OS == WIN: + if IS_WIN: os.startfile(path) else: - subprocess.call(['xdg-open', path]) + subprocess.Popen(['xdg-open', path]) return def join(*paths): return os.path.join(*paths) + + +def is_dir(path): + return Path(path).is_dir() + + +def is_file(path): + return Path(path).is_file() + + +def get_file_size(path): + return Path(path).stat().st_size + + +def is_created(path): + return is_file(path) and bool(get_file_size(path)) + + +def replace_ext(path, ext): + path, _, name, _ = get_info_path(path) + return '{}/{}.{}'.format(path, name, ext) + + +def zip_names(path): + with zipfile.ZipFile(path) as z: + names = z.namelist() + return names + + +def run(command, wait=False): + # ~ debug(command) + # ~ debug(shlex.split(command)) + try: + if wait: + p = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE) + p.wait() + else: + p = subprocess.Popen(shlex.split(command), stdin=None, + stdout=None, stderr=None, close_fds=True) + result, er = p.communicate() + except subprocess.CalledProcessError as e: + msg = ("run [ERROR]: output = %s, error code = %s\n" + % (e.output, e.returncode)) + error(msg) + return False + + if result is None: + return True + + return result.decode() + + +def _zippwd(source, target, pwd): + if IS_WIN: + return False + if not exists_app('zip'): + return False + + cmd = 'zip' + opt = '-j ' + args = "{} --password {} ".format(cmd, pwd) + + if isinstance(source, (tuple, list)): + if not target: + return False + args += opt + target + ' ' + ' '.join(source) + else: + if is_file(source) and not target: + target = replace_ext(source, 'zip') + elif is_dir(source) and not target: + target = join(PurePath(source).parent, + '{}.zip'.format(PurePath(source).name)) + opt = '-r ' + args += opt + target + ' ' + source + + result = run(args, True) + if not result: + return False + + return is_created(target) + + +def zip(source, target='', mode='w', pwd=''): + if pwd: + return _zippwd(source, target, pwd) + + if isinstance(source, (tuple, list)): + if not target: + return False + + with zipfile.ZipFile(target, mode, compression=zipfile.ZIP_DEFLATED) as z: + for path in source: + _, name, _, _ = get_info_path(path) + z.write(path, name) + + return is_created(target) + + if is_file(source): + if not target: + target = replace_ext(source, 'zip') + z = zipfile.ZipFile(target, mode, compression=zipfile.ZIP_DEFLATED) + _, name, _, _ = get_info_path(source) + z.write(source, name) + z.close() + return is_created(target) + + if not target: + target = join( + PurePath(source).parent, + '{}.zip'.format(PurePath(source).name)) + z = zipfile.ZipFile(target, mode, compression=zipfile.ZIP_DEFLATED) + root_len = len(os.path.abspath(source)) + for root, dirs, files in os.walk(source): + relative = os.path.abspath(root)[root_len:] + for f in files: + fullpath = join(root, f) + file_name = join(relative, f) + z.write(fullpath, file_name) + z.close() + + return is_created(target) + + +def unzip(source, path='', members=None, pwd=None): + if not path: + path, _, _, _ = get_info_path(source) + with zipfile.ZipFile(source) as z: + if not pwd is None: + pwd = pwd.encode() + if isinstance(members, str): + members = (members,) + z.extractall(path, members=members, pwd=pwd) + return True + + +def merge_zip(target, zips): + try: + with zipfile.ZipFile(target, 'w', compression=zipfile.ZIP_DEFLATED) as t: + for path in zips: + with zipfile.ZipFile(path, compression=zipfile.ZIP_DEFLATED) as s: + for name in s.namelist(): + t.writestr(name, s.open(name).read()) + except Exception as e: + error(e) + return False + + return True +