Add support for zip/unzip

This commit is contained in:
Mauricio Baeza 2019-09-11 23:53:29 -05:00
parent c7be0b9c98
commit 118395df71
2 changed files with 246 additions and 18 deletions

View File

@ -23,7 +23,7 @@ import logging
# ~ 1 = normal extension
# ~ 2 = new component
# ~ 3 = Calc addin
TYPE_EXTENSION = 2
TYPE_EXTENSION = 1
# ~ https://semver.org/
VERSION = '0.1.0'

View File

@ -18,15 +18,25 @@
# ~ along with ZAZ. If not, see <https://www.gnu.org/licenses/>.
import errno
import getpass
import logging
import os
import platform
import shlex
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from datetime import datetime
from functools import wraps
from pathlib import Path, PurePath
from pprint import pprint
from subprocess import PIPE
import uno
import unohelper
@ -66,11 +76,15 @@ logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
OS = sys.platform
OS = platform.system()
USER = getpass.getuser()
PC = platform.node()
WIN = 'win32'
IS_WIN = OS == 'Windows'
LOG_NAME = 'ZAZ'
CALC = 'calc'
WRITER = 'writer'
OBJ_CELL = 'ScCellObj'
OBJ_RANGE = 'ScCellRangeObj'
OBJ_RANGES = 'ScCellRangesObj'
@ -122,17 +136,6 @@ def mri(obj):
return
def debug(*info):
for i in info:
log.debug(i)
return
def error(info):
log.error(info)
return
def catch_exception(f):
@wraps(f)
def func(*args, **kwargs):
@ -143,6 +146,51 @@ def catch_exception(f):
return func
class LogWin(object):
def __init__(self, doc):
self.doc = doc
def write(self, info):
text = self.doc.Text
cursor = text.createTextCursor()
cursor.gotoEnd(False)
text.insertString(cursor, str(info), 0)
return
def info(data):
log.info(data)
return
def debug(info):
if IS_WIN:
# ~ app = LOApp(self.ctx, self.sm, self.desktop, self.toolkit)
# ~ doc = app.getDoc(FILE_NAME_DEBUG)
# ~ if not doc:
# ~ doc = app.newDoc(WRITER)
# ~ out = OutputDoc(doc)
# ~ sys.stdout = out
pprint(info)
return
log.debug(info)
return
def error(info):
log.error(info)
return
def save_log(path, data):
with open(path, 'a') as out:
out.write('{} -{}- '.format(str(datetime.now())[:19], LOG_NAME))
pprint(data, stream=out)
return
def run_in_thread(fn):
def run(*k, **kw):
t = threading.Thread(target=fn, args=k, kwargs=kw)
@ -206,6 +254,20 @@ def _path_system(path):
return path
def exists_app(name):
try:
dn = subprocess.DEVNULL
subprocess.Popen([name, ''], stdout=dn, stderr=dn).terminate()
except OSError as e:
if e.errno == errno.ENOENT:
return False
return True
def exists(path):
return Path(path).exists()
def get_type_doc(obj):
services = {
'calc': 'com.sun.star.sheet.SpreadsheetDocument',
@ -289,10 +351,23 @@ class LODocument(object):
obj = self.obj.createInstance(name)
return obj
def save(self, path='', **kwargs):
opt = _properties(kwargs)
if path:
self._obj.storeAsURL(_path_url(path), opt)
else:
self._obj.store()
return True
def close(self):
self.obj.close(True)
return
def focus(self):
w = self._cc.getFrame().getComponentWindow()
w.setFocus()
return
class LOCalc(LODocument):
@ -1002,7 +1077,6 @@ def create_dialog(properties):
return LODialog(properties)
@catch_exception
def set_properties(model, properties):
if 'X' in properties:
properties['PositionX'] = properties.pop('X')
@ -1109,7 +1183,13 @@ def inputbox(message, default='', title=TITLE):
return ''
def open(path, **kwargs):
def new_doc(type_doc=CALC):
path = 'private:factory/s{}'.format(type_doc)
doc = get_desktop().loadComponentFromURL(path, '_default', 0, ())
return _get_class_doc(doc)
def open_doc(path, **kwargs):
""" Open document in path
Usually options:
Hidden: True or False
@ -1129,12 +1209,160 @@ def open(path, **kwargs):
def open_file(path):
if OS == WIN:
if IS_WIN:
os.startfile(path)
else:
subprocess.call(['xdg-open', path])
subprocess.Popen(['xdg-open', path])
return
def join(*paths):
return os.path.join(*paths)
def is_dir(path):
return Path(path).is_dir()
def is_file(path):
return Path(path).is_file()
def get_file_size(path):
return Path(path).stat().st_size
def is_created(path):
return is_file(path) and bool(get_file_size(path))
def replace_ext(path, ext):
path, _, name, _ = get_info_path(path)
return '{}/{}.{}'.format(path, name, ext)
def zip_names(path):
with zipfile.ZipFile(path) as z:
names = z.namelist()
return names
def run(command, wait=False):
# ~ debug(command)
# ~ debug(shlex.split(command))
try:
if wait:
p = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE)
p.wait()
else:
p = subprocess.Popen(shlex.split(command), stdin=None,
stdout=None, stderr=None, close_fds=True)
result, er = p.communicate()
except subprocess.CalledProcessError as e:
msg = ("run [ERROR]: output = %s, error code = %s\n"
% (e.output, e.returncode))
error(msg)
return False
if result is None:
return True
return result.decode()
def _zippwd(source, target, pwd):
if IS_WIN:
return False
if not exists_app('zip'):
return False
cmd = 'zip'
opt = '-j '
args = "{} --password {} ".format(cmd, pwd)
if isinstance(source, (tuple, list)):
if not target:
return False
args += opt + target + ' ' + ' '.join(source)
else:
if is_file(source) and not target:
target = replace_ext(source, 'zip')
elif is_dir(source) and not target:
target = join(PurePath(source).parent,
'{}.zip'.format(PurePath(source).name))
opt = '-r '
args += opt + target + ' ' + source
result = run(args, True)
if not result:
return False
return is_created(target)
def zip(source, target='', mode='w', pwd=''):
if pwd:
return _zippwd(source, target, pwd)
if isinstance(source, (tuple, list)):
if not target:
return False
with zipfile.ZipFile(target, mode, compression=zipfile.ZIP_DEFLATED) as z:
for path in source:
_, name, _, _ = get_info_path(path)
z.write(path, name)
return is_created(target)
if is_file(source):
if not target:
target = replace_ext(source, 'zip')
z = zipfile.ZipFile(target, mode, compression=zipfile.ZIP_DEFLATED)
_, name, _, _ = get_info_path(source)
z.write(source, name)
z.close()
return is_created(target)
if not target:
target = join(
PurePath(source).parent,
'{}.zip'.format(PurePath(source).name))
z = zipfile.ZipFile(target, mode, compression=zipfile.ZIP_DEFLATED)
root_len = len(os.path.abspath(source))
for root, dirs, files in os.walk(source):
relative = os.path.abspath(root)[root_len:]
for f in files:
fullpath = join(root, f)
file_name = join(relative, f)
z.write(fullpath, file_name)
z.close()
return is_created(target)
def unzip(source, path='', members=None, pwd=None):
if not path:
path, _, _, _ = get_info_path(source)
with zipfile.ZipFile(source) as z:
if not pwd is None:
pwd = pwd.encode()
if isinstance(members, str):
members = (members,)
z.extractall(path, members=members, pwd=pwd)
return True
def merge_zip(target, zips):
try:
with zipfile.ZipFile(target, 'w', compression=zipfile.ZIP_DEFLATED) as t:
for path in zips:
with zipfile.ZipFile(path, compression=zipfile.ZIP_DEFLATED) as s:
for name in s.namelist():
t.writestr(name, s.open(name).read())
except Exception as e:
error(e)
return False
return True