empresa-libre/source/app/controllers/utils.py

427 lines
12 KiB
Python

#!/usr/bin/env python3
# ~ Empresa Libre
# ~ Copyright (C) 2016-2019 Mauricio Baeza Servin (public@elmau.net)
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import collections
import collections.abc
import datetime
import functools
import json
import logging
import math
import os
import shutil
import smtplib
import sqlite3
import subprocess
import threading
import zipfile
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from io import BytesIO
from pathlib import Path

import lxml.etree as ET
import requests
import seafileapi
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from dateutil import parser
# Layout of every log record: timestamp, level name, message.
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
# Timestamp format used for %(asctime)s (day/month/year hour:minute:second).
LOG_DATE = '%d/%m/%Y %H:%M:%S'
# Replace the level names with versions wrapped in ANSI escape sequences
# (SGR 41 = red background, 33 = yellow, 32 = green; "\033[1;0m" resets).
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
# Configure the root logger at import time; everything from DEBUG up is shown.
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
# Module-level logger used by the helpers in this file.
log = logging.getLogger(__name__)
# Only let warnings and above through from the 'peewee' logger.
logging.getLogger('peewee').setLevel(logging.WARNING)
# Timeout passed to the smtplib connections opened by SendMail.
TIMEOUT = 10
# Folder name, looked up under the user's home directory by _backup_db.
PATH_INVOICES = 'facturas'
#~ https://github.com/kennethreitz/requests/blob/v1.2.3/requests/structures.py#L37
class CaseInsensitiveDict(collections.abc.MutableMapping):
    """
    A case-insensitive ``dict``-like object.

    Implements all methods and operations of
    ``collections.abc.MutableMapping`` as well as dict's ``copy``. Also
    provides ``lower_items``.

    All keys are expected to be strings. The structure remembers the
    case of the last key to be set, and ``iter(instance)``, ``keys()``
    and ``items()`` will contain case-sensitive keys. However, querying
    and contains testing is case insensitive:

        cid = CaseInsensitiveDict()
        cid['Accept'] = 'application/json'
        cid['aCCEPT'] == 'application/json'  # True
        list(cid) == ['Accept']  # True

    For example, ``headers['content-encoding']`` will return the
    value of a ``'Content-Encoding'`` response header, regardless
    of how the header name was originally stored.

    If the constructor, ``.update``, or equality comparison
    operations are given keys that have equal ``.lower()``s, the
    behavior is undefined.
    """

    def __init__(self, data=None, **kwargs):
        # Maps lowercased key -> (key as last set, value).
        self._store = {}
        if data is None:
            data = {}
        self.update(data, **kwargs)

    def __setitem__(self, key, value):
        # Use the lowercased key for lookups, but store the actual
        # key alongside the value.
        self._store[key.lower()] = (key, value)

    def __getitem__(self, key):
        return self._store[key.lower()][1]

    def __delitem__(self, key):
        del self._store[key.lower()]

    def __iter__(self):
        return (casedkey for casedkey, mappedvalue in self._store.values())

    def __len__(self):
        return len(self._store)

    def lower_items(self):
        """Like ``items()``, but with all lowercase keys."""
        return (
            (lowerkey, keyval[1])
            for (lowerkey, keyval)
            in self._store.items()
        )

    def __eq__(self, other):
        # The ABCs live in ``collections.abc``; the aliases in the
        # top-level ``collections`` namespace were removed in Python 3.10.
        if isinstance(other, collections.abc.Mapping):
            other = CaseInsensitiveDict(other)
        else:
            return NotImplemented
        # Compare insensitively
        return dict(self.lower_items()) == dict(other.lower_items())

    # Copy is required
    def copy(self):
        # The stored values are (key, value) pairs, which ``update`` accepts.
        return CaseInsensitiveDict(self._store.values())

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, dict(self.items()))
class SendMail(object):
    """SMTP client that connects and logs in as soon as it is created.

    ``config`` is a mapping with the keys ``server``, ``port``, ``ssl``,
    ``user`` and ``pass``. After construction, ``is_connect`` tells whether
    the login succeeded and ``error`` holds the failure message, if any.
    """

    def __init__(self, config):
        self._config = config
        self._server = None
        self._error = ''
        self._is_connect = self._login()

    @property
    def is_connect(self):
        """True when the connection and login succeeded."""
        return self._is_connect

    @property
    def error(self):
        """Message describing why the login failed ('' when it did not)."""
        return self._error

    def _login(self):
        """Open the SMTP connection and authenticate.

        Returns True on success. On any failure stores a message in
        ``self._error`` and returns False.
        """
        host = self._config['server']
        # For these providers an SSL-enabled configuration is served with
        # STARTTLS over a plain connection instead of SMTP_SSL.
        use_starttls = 'gmail' in host or 'outlook' in host
        try:
            if self._config['ssl'] and use_starttls:
                self._server = smtplib.SMTP(
                    host, self._config['port'], timeout=TIMEOUT)
                self._server.ehlo()
                self._server.starttls()
                self._server.ehlo()
            elif self._config['ssl']:
                self._server = smtplib.SMTP_SSL(
                    host, self._config['port'], timeout=TIMEOUT)
                self._server.ehlo()
            else:
                self._server = smtplib.SMTP(
                    host, self._config['port'], timeout=TIMEOUT)
            self._server.login(self._config['user'], self._config['pass'])
            return True
        except smtplib.SMTPAuthenticationError as e:
            detail = str(e)
            if '535' in detail:
                self._error = 'Nombre de usuario o contraseña inválidos'
            elif '534' in detail and 'gmail' in host:
                self._error = 'Necesitas activar el acceso a otras ' \
                    'aplicaciones en tu cuenta de GMail'
            else:
                # Any other authentication failure must still be reported
                # as a failure with a non-empty message.
                self._error = detail
            return False
        except Exception as e:
            # Covers smtplib.SMTPException as well as socket/OS errors.
            self._error = str(e)
            return False

    def send(self, options):
        """Build and send an HTML e-mail.

        ``options`` keys: ``to`` and ``copy`` (comma separated addresses),
        ``subject``, ``message`` (HTML body), ``confirm`` (request a read
        receipt) and ``files`` (iterable of ``(content, filename)``).

        Returns '' on success, otherwise the error message.
        """
        try:
            copy = options.get('copy', '') or ''
            message = MIMEMultipart()
            message['From'] = self._config['user']
            message['To'] = options['to']
            if copy:
                # Do not emit an empty CC header.
                message['CC'] = copy
            message['Subject'] = options['subject']
            message['Date'] = formatdate(localtime=True)
            if options.get('confirm', False):
                message['Disposition-Notification-To'] = message['From']
            message.attach(MIMEText(options['message'], 'html'))
            for f in options.get('files', ()):
                content, name = f[0], f[1]
                if isinstance(content, str):
                    content = content.encode('utf-8')
                part = MIMEBase('application', 'octet-stream')
                part.set_payload(content)
                encoders.encode_base64(part)
                # Passing the name as a parameter lets the email package
                # quote/encode it, so names with spaces survive.
                part.add_header(
                    'Content-Disposition', 'attachment', filename=str(name))
                message.attach(part)

            addresses = options['to'].split(',') + copy.split(',')
            # Drop blanks so an empty ``copy`` does not add a '' recipient.
            receivers = [a.strip() for a in addresses if a.strip()]
            self._server.sendmail(
                self._config['user'], receivers, message.as_string())
            return ''
        except Exception as e:
            return str(e)

    def close(self):
        """Close the SMTP session, ignoring errors (e.g. never connected)."""
        try:
            self._server.quit()
        except Exception:
            pass
        return
class CfdiToDict(object):
    """Extract complement data from a CFDI XML document into a dict.

    ``xml`` is the document as a string. After construction ``values``
    holds the extracted data; currently only the ``divisas`` complement
    is read.
    """

    NS = {
        'cfdi': 'http://www.sat.gob.mx/cfd/3',
        'divisas': 'http://www.sat.gob.mx/divisas',
    }

    def __init__(self, xml):
        self._values = {}
        self._root = ET.parse(BytesIO(xml.encode())).getroot()
        self._get_values()

    @property
    def values(self):
        """Dict with the data collected from the document."""
        return self._values

    def _get_values(self):
        self._complementos()
        return

    def _complementos(self):
        """Store the attributes of the ``divisas:Divisas`` node, if present."""
        complementos = self._root.xpath(
            '//cfdi:Complemento', namespaces=self.NS)
        if not complementos:
            # A document without a Complemento node has nothing to extract;
            # indexing the empty result would raise IndexError.
            return

        # NOTE: the leading '//' makes this search the whole document, not
        # only the descendants of the Complemento node.
        divisas = complementos[0].xpath(
            '//divisas:Divisas', namespaces=self.NS)
        if divisas:
            d = CaseInsensitiveDict(divisas[0].attrib)
            # The version attribute is not part of the returned values.
            d.pop('version', '')
            self._values.update({'divisas': d})
        return
def _call(args):
    """Run the command line *args* through the shell and return its stdout.

    Raises ``subprocess.CalledProcessError`` when the command exits with a
    non-zero status.
    """
    raw_output = subprocess.check_output(args, shell=True)
    return raw_output.decode()
def _join(*paths):
    """Join the given path components with ``os.path.join``."""
    joined = os.path.join(*paths)
    return joined
def run_in_thread(fn):
    """Decorator: run *fn* in a new thread each time it is called.

    The decorated function returns the started ``threading.Thread``
    immediately; the return value of *fn* itself is discarded.
    """
    # Keep the name and docstring of the wrapped function.
    @functools.wraps(fn)
    def run(*k, **kw):
        t = threading.Thread(target=fn, args=k, kwargs=kw)
        t.start()
        return t
    return run
def send_mail(data):
    """Send one e-mail and report the outcome.

    ``data['server']`` is the configuration handed to ``SendMail`` and
    ``data['mail']`` the options handed to ``SendMail.send``.

    Returns ``{'ok': bool, 'msg': str}``. ``ok`` only reflects whether the
    connection/login succeeded; ``msg`` is the value returned by ``send``
    or, when not connected, the login error.
    """
    mailer = SendMail(data['server'])
    if mailer.is_connect:
        outcome = {'ok': True, 'msg': mailer.send(data['mail'])}
    else:
        outcome = {'ok': False, 'msg': mailer.error}
    mailer.close()
    return outcome
def round_up(value):
    """Return *value* rounded up to the nearest integer, as an ``int``."""
    ceiling = math.ceil(value)
    return int(ceiling)
def _get_key(password):
    """Derive the key used by ``encrypt``/``decrypt`` from *password*.

    The key is the URL-safe base64 encoding of the SHA-256 digest of the
    password, returned as bytes.
    """
    hasher = hashes.Hash(hashes.SHA256(), backend=default_backend())
    hasher.update(password.encode())
    return base64.urlsafe_b64encode(hasher.finalize())
def encrypt(data, password):
    """Encrypt the string *data* with a key derived from *password*.

    Returns the Fernet token as a string.
    """
    cipher = Fernet(_get_key(password))
    token = cipher.encrypt(data.encode())
    return token.decode()
def decrypt(data, password):
    """Decrypt the Fernet token *data* with a key derived from *password*.

    Returns the original text as a string.
    """
    cipher = Fernet(_get_key(password))
    plain = cipher.decrypt(data.encode())
    return plain.decode()
def to_bool(value):
    """Convert *value* to ``int`` and return whether it is non-zero."""
    number = int(value)
    return number != 0
def get_url(url):
    """Fetch *url* with an HTTP GET and return the response body as text.

    Raises the ``requests`` exceptions for connection problems, including
    a timeout when the server does not answer within ``TIMEOUT`` seconds.
    """
    # requests never times out unless told to; without this a silent
    # server would block the caller indefinitely.
    response = requests.get(url, timeout=TIMEOUT)
    return response.text
def parse_date(value, next_day=False):
    """Parse the date string *value* with ``dateutil``.

    When *next_day* is true the result is shifted one day forward.
    """
    parsed = parser.parse(value)
    if not next_day:
        return parsed
    return parsed + datetime.timedelta(days=1)
def to_zip(files):
    """Pack *files* into an in-memory ZIP archive and return its bytes.

    *files* maps each archive member name to its content.
    """
    buffer = BytesIO()
    archive = zipfile.ZipFile(
        buffer,
        mode='a',
        compression=zipfile.ZIP_DEFLATED,
        allowZip64=False,
    )
    with archive:
        for name in files:
            archive.writestr(name, files[name])
    return buffer.getvalue()
def db_delete(user, path):
    """Back up, then drop, the PostgreSQL database and role named *user*.

    The dump is written to ``<path>/tmp/<user>_<yymmdd_HHMM>.bk``. Every
    step is best-effort: a failing command is logged and the remaining
    steps still run.

    NOTE: *user* and *path* are interpolated into shell command lines
    without quoting, so they must come from a trusted source.
    """
    stamp = datetime.datetime.now().strftime('%y%m%d_%H%M')
    path_bk = _join(path, 'tmp', '{}_{}.bk'.format(user, stamp))
    commands = (
        'pg_dump -U postgres -Fc {} > "{}"'.format(user, path_bk),
        'psql -U postgres -c "DROP DATABASE {0};"'.format(user),
        'psql -U postgres -c "DROP ROLE {0};"'.format(user),
    )
    for command in commands:
        try:
            _call(command)
        except Exception as e:
            # Keep going, but leave a trace instead of hiding the failure.
            log.error('No se pudo ejecutar: %s (%s)', command, e)
    return
def _get_pass(rfc):
    """Return the password associated with *rfc* (currently *rfc* itself)."""
    password = rfc
    return password
def _backup_db(rfc, data, path_bk, is_mv, url_seafile):
    """Dump one company database to ``<path_bk>/<rfc lowercased>.bk``.

    *data* is the connection info of the company; only the ``type`` and
    ``name`` keys are read, and anything other than ``'postgres'`` is
    skipped. When *is_mv* is true the dump is also copied to the
    ``PATH_INVOICES`` folder under the user's home directory, if it exists.

    *url_seafile* is accepted for the remote backup, which is not
    implemented yet.

    Raises ``subprocess.CalledProcessError`` when ``pg_dump`` or ``psql``
    fail.
    """
    if data['type'] != 'postgres':
        return

    log.info('Generando backup de: {}'.format(rfc))
    bk_name = '{}.bk'.format(rfc.lower())
    path_db = _join(path_bk, bk_name)
    args = 'pg_dump -U postgres -Fc {} > "{}"'.format(data['name'], path_db)
    _call(args)
    log.info('\tBackup local generado...')

    if is_mv:
        path_target = _join(Path.home(), PATH_INVOICES)
        if Path(path_target).exists():
            shutil.copy(path_db, _join(path_target, bk_name))
        else:
            log.error('\tNo existe la carpeta compartida...')

    sql = 'select correo_timbrado, token_soporte from emisor;'
    args = 'psql -U postgres -d {} -Atc "{}"'.format(data['name'], sql)
    # Strip before testing so whitespace-only output counts as "no data".
    result = _call(args).strip()
    if not result:
        log.error('\tSin datos para backup remoto')
        return

    # psql -A separates the columns with '|': (correo_timbrado, token_soporte)
    fields = result.split('|')
    if len(fields) < 2 or not fields[1]:
        log.error('\tSin token de soporte')
        return

    # TODO: upload the dump to Seafile (url_seafile) using the e-mail in
    # fields[0], the repository id in fields[1] and _get_pass(rfc).
    return
def db_backup(path_companies, path_bk, is_mv, url_seafile):
    """Back up every company registered in the SQLite file *path_companies*.

    Each row of the ``names`` table is expected to hold two columns: the
    RFC and a JSON document with the connection data, which are passed to
    ``_backup_db`` together with *path_bk*, *is_mv* and *url_seafile*.
    """
    con = sqlite3.connect(path_companies)
    try:
        # fetchall() always returns a list (possibly empty), never None.
        rows = con.execute("SELECT * FROM names").fetchall()
    finally:
        # Release the connection even when the query fails.
        con.close()

    for rfc, data in rows:
        _backup_db(rfc, json.loads(data), path_bk, is_mv, url_seafile)
    return
def now():
    """Return the current local date and time truncated to whole seconds."""
    current = datetime.datetime.now()
    return current.replace(microsecond=0)
def get_days(date):
    """Return the number of whole days elapsed between *date* and now."""
    elapsed = now() - date
    return elapsed.days