# forked from elmau/empresa-libre
#!/usr/bin/env python3
|
|
|
|
# ~ Empresa Libre
|
|
# ~ Copyright (C) 2016-2019 Mauricio Baeza Servin (public@elmau.net)
|
|
# ~
|
|
# ~ This program is free software: you can redistribute it and/or modify
|
|
# ~ it under the terms of the GNU General Public License as published by
|
|
# ~ the Free Software Foundation, either version 3 of the License, or
|
|
# ~ (at your option) any later version.
|
|
# ~
|
|
# ~ This program is distributed in the hope that it will be useful,
|
|
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# ~ GNU General Public License for more details.
|
|
# ~
|
|
# ~ You should have received a copy of the GNU General Public License
|
|
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import base64
import collections
import datetime
import functools
import getpass
import hashlib
import json
import logging
import math
import os
import shlex
import shutil
import smtplib
import sqlite3
import subprocess
import threading
import zipfile

from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from io import BytesIO
from pathlib import Path

import lxml.etree as ET
import requests
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from dateutil import parser

from settings import DEBUG, DB_COMPANIES, PATHS, TEMPLATE_CANCEL

from .cfdi_xml import CFDI
from .cfdi_cert import SATCertificate
from .pacs import PACComercioDigital
# ~ from .pacs import PACFinkok
from .pac import Finkok as PACFinkok
# ~ from .finkok import PACFinkok
|
|
|
|
|
# Logging setup: shared format plus ANSI-colored level names
# (red background for ERROR, yellow DEBUG, green INFO).
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
# Silence peewee's per-query debug output.
logging.getLogger('peewee').setLevel(logging.WARNING)


TIMEOUT = 10  # seconds; used for SMTP connections below
PATH_INVOICES = 'facturas'
# PostgreSQL client command prefixes; in DEBUG the host is forced to
# localhost (development box) instead of the default socket.
PG_DUMP = 'pg_dump -U postgres'
PSQL = 'psql -U postgres'
if DEBUG:
    PG_DUMP = 'pg_dump -h localhost -U postgres'
    PSQL = 'psql -h localhost -U postgres'

# Registry of supported PAC (CFDI stamping provider) implementations,
# keyed by the name stored in each company's configuration.
PACS = {
    'finkok': PACFinkok,
    'comercio': PACComercioDigital,
}
|
|
|
|
#~ https://github.com/kennethreitz/requests/blob/v1.2.3/requests/structures.py#L37
class CaseInsensitiveDict(collections.abc.MutableMapping):
    """
    A case-insensitive ``dict``-like object.

    Implements all methods and operations of
    ``collections.abc.MutableMapping`` as well as dict's ``copy``. Also
    provides ``lower_items``.

    All keys are expected to be strings. The structure remembers the
    case of the last key to be set, and ``iter(instance)``,
    ``keys()``, ``items()``, ``iterkeys()``, and ``iteritems()``
    will contain case-sensitive keys. However, querying and contains
    testing is case insensitive:

        cid = CaseInsensitiveDict()
        cid['Accept'] = 'application/json'
        cid['aCCEPT'] == 'application/json'  # True
        list(cid) == ['Accept']  # True

    For example, ``headers['content-encoding']`` will return the
    value of a ``'Content-Encoding'`` response header, regardless
    of how the header name was originally stored.

    If the constructor, ``.update``, or equality comparison
    operations are given keys that have equal ``.lower()``s, the
    behavior is undefined.
    """

    def __init__(self, data=None, **kwargs):
        self._store = dict()
        if data is None:
            data = {}
        self.update(data, **kwargs)

    def __setitem__(self, key, value):
        # Use the lowercased key for lookups, but store the actual
        # key alongside the value.
        self._store[key.lower()] = (key, value)

    def __getitem__(self, key):
        return self._store[key.lower()][1]

    def __delitem__(self, key):
        del self._store[key.lower()]

    def __iter__(self):
        return (casedkey for casedkey, mappedvalue in self._store.values())

    def __len__(self):
        return len(self._store)

    def lower_items(self):
        """Like iteritems(), but with all lowercase keys."""
        return (
            (lowerkey, keyval[1])
            for (lowerkey, keyval)
            in self._store.items()
        )

    def __eq__(self, other):
        # BUG FIX: the ABC aliases collections.Mapping and
        # collections.MutableMapping were removed in Python 3.10;
        # the ABCs live in collections.abc.
        if isinstance(other, collections.abc.Mapping):
            other = CaseInsensitiveDict(other)
        else:
            return NotImplemented
        # Compare insensitively
        return dict(self.lower_items()) == dict(other.lower_items())

    # Copy is required
    def copy(self):
        # _store.values() is an iterable of (cased_key, value) pairs,
        # which update() consumes as key/value tuples.
        return CaseInsensitiveDict(self._store.values())

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, dict(self.items()))
|
|
|
class SendMail(object):
    """Thin smtplib wrapper that connects and authenticates on construction.

    *config* is a dict with keys 'server', 'port', 'user', 'pass' and
    'ssl' (bool).  After construction check ``is_connect`` and ``error``.
    """

    def __init__(self, config):
        self._config = config
        self._server = None
        self._error = ''
        # Connect eagerly; the outcome is exposed via is_connect.
        self._is_connect = self._login()

    @property
    def is_connect(self):
        # True on successful login, False on a handled error.
        # NOTE(review): _login can also fall through and return None (see it).
        return self._is_connect

    @property
    def error(self):
        # Human-readable (Spanish) description of the last connection error.
        return self._error

    def _login(self):
        """Open the SMTP connection and authenticate.

        Returns True on success, False on a handled failure.
        """
        # GMail/Outlook expect STARTTLS on a plain connection even when the
        # account is configured with 'ssl' enabled.
        hosts = ('gmail' in self._config['server'] or
            'outlook' in self._config['server'])
        try:
            if self._config['ssl'] and hosts:
                # Plain SMTP upgraded to TLS via STARTTLS.
                self._server = smtplib.SMTP(
                    self._config['server'],
                    self._config['port'], timeout=TIMEOUT)
                self._server.ehlo()
                self._server.starttls()
                self._server.ehlo()
            elif self._config['ssl']:
                # Implicit TLS from the first byte.
                self._server = smtplib.SMTP_SSL(
                    self._config['server'],
                    self._config['port'], timeout=TIMEOUT)
                self._server.ehlo()
            else:
                # Unencrypted SMTP.
                self._server = smtplib.SMTP(
                    self._config['server'],
                    self._config['port'], timeout=TIMEOUT)
            self._server.login(self._config['user'], self._config['pass'])
            return True
        except smtplib.SMTPAuthenticationError as e:
            # 535: invalid credentials.
            if '535' in str(e):
                self._error = 'Nombre de usuario o contraseña inválidos'
                return False
            # 534: GMail rejects "less secure" applications.
            if '534' in str(e) and 'gmail' in self._config['server']:
                self._error = 'Necesitas activar el acceso a otras ' \
                    'aplicaciones en tu cuenta de GMail'
                return False
        except smtplib.SMTPException as e:
            self._error = str(e)
            return False
        except Exception as e:
            # Catch-all: DNS failures, timeouts, refused connections, etc.
            self._error = str(e)
            return False
        # NOTE(review): reached only when SMTPAuthenticationError matched
        # neither 535 nor 534 — returns None (falsy) and leaves _error empty.
        return

    def send(self, options):
        """Build and send one message; return '' on success or the error text.

        *options* keys: 'to', 'subject', 'message' (HTML body), plus optional
        'copy' (CC list), 'confirm' (read receipt) and 'files'
        (iterable of (payload, filename) pairs).
        """
        try:
            message = MIMEMultipart()
            message['From'] = self._config['user']
            message['To'] = options['to']
            message['CC'] = options.get('copy', '')
            message['Subject'] = options['subject']
            message['Date'] = formatdate(localtime=True)
            if options.get('confirm', False):
                # Request a disposition (read) notification.
                message['Disposition-Notification-To'] = message['From']
            message.attach(MIMEText(options['message'], 'html'))
            for f in options.get('files', ()):
                part = MIMEBase('application', 'octet-stream')
                # Payload may come in as str or bytes.
                if isinstance(f[0], str):
                    part.set_payload(f[0].encode('utf-8'))
                else:
                    part.set_payload(f[0])
                encoders.encode_base64(part)
                part.add_header(
                    'Content-Disposition',
                    "attachment; filename={}".format(f[1]))
                message.attach(part)

            # NOTE(review): with no 'copy' this appends [''] to the list;
            # smtplib tolerates the empty recipient, but it is untidy.
            receivers = options['to'].split(',') + message['CC'].split(',')
            self._server.sendmail(
                self._config['user'], receivers, message.as_string())
            return ''
        except Exception as e:
            return str(e)

    def close(self):
        """Quit the SMTP session, best effort — any error is ignored."""
        try:
            self._server.quit()
        except:
            pass
        return
|
|
|
|
|
|
class CfdiToDict(object):
    """Extract complement data (divisas, fiscal legends) from a CFDI XML string."""

    NS = {
        'cfdi': 'http://www.sat.gob.mx/cfd/3',
        'divisas': 'http://www.sat.gob.mx/divisas',
        'leyendasFisc': 'http://www.sat.gob.mx/leyendasFiscales',
    }

    def __init__(self, xml):
        self._values = {'leyendas': ()}
        self._root = ET.parse(BytesIO(xml.encode())).getroot()
        self._get_values()

    @property
    def values(self):
        """Parsed complement data: 'divisas' (if present) and 'leyendas'."""
        return self._values

    def _get_values(self):
        # Currently only the Complemento node is inspected.
        self._complementos()
        return

    def _complementos(self):
        # The document is expected to carry a Complemento node.
        complemento = self._root.xpath('//cfdi:Complemento', namespaces=self.NS)[0]

        divisas_nodes = complemento.xpath('//divisas:Divisas', namespaces=self.NS)
        if divisas_nodes:
            divisas = CaseInsensitiveDict(divisas_nodes[0].attrib)
            divisas.pop('version', '')
            self._values['divisas'] = divisas

        leyenda_nodes = complemento.xpath('//leyendasFisc:Leyenda', namespaces=self.NS)
        if leyenda_nodes:
            self._values['leyendas'] = [
                CaseInsensitiveDict(n.attrib) for n in leyenda_nodes]
        return
|
|
|
|
|
|
def _call(args):
|
|
return subprocess.check_output(args, shell=True).decode()
|
|
|
|
|
|
def _run(args, wait=False):
|
|
result = ''
|
|
cmd = shlex.split(args)
|
|
if wait:
|
|
result = subprocess.run(cmd, shell=True, check=True).stdout.decode()
|
|
else:
|
|
subprocess.run(cmd)
|
|
return result
|
|
|
|
|
|
def _join(*paths):
|
|
return os.path.join(*paths)
|
|
|
|
|
|
def run_in_thread(fn):
    """Decorator that executes *fn* in a new thread.

    The wrapped callable starts the thread and returns the
    ``threading.Thread`` object instead of *fn*'s result.
    """
    # functools.wraps preserves the decorated function's name/docstring,
    # which the original version discarded.
    @functools.wraps(fn)
    def run(*args, **kwargs):
        t = threading.Thread(target=fn, args=args, kwargs=kwargs)
        t.start()
        return t
    return run
|
|
|
|
|
|
def send_mail(data):
    """Send data['mail'] through a server described by data['server'].

    Returns {'ok': bool, 'msg': str} — 'msg' is the send error ('' on
    success) or the connection error when login failed.
    """
    server = SendMail(data['server'])
    ok = bool(server.is_connect)
    if ok:
        msg = server.send(data['mail'])
    else:
        msg = server.error
    server.close()

    return {'ok': ok, 'msg': msg}
|
|
|
|
|
|
def round_up(value):
    """Smallest integer greater than or equal to *value*."""
    return math.ceil(value)
|
|
|
|
|
|
def _get_key(password):
|
|
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
|
|
digest.update(password.encode())
|
|
key = base64.urlsafe_b64encode(digest.finalize())
|
|
return key
|
|
|
|
|
|
def encrypt(data, password):
    """Encrypt the string *data* with a Fernet key derived from *password*."""
    cipher = Fernet(_get_key(password))
    token = cipher.encrypt(data.encode())
    return token.decode()
|
|
|
|
|
|
def decrypt(data, password):
    """Decrypt a Fernet token *data* with a key derived from *password*."""
    cipher = Fernet(_get_key(password))
    plain = cipher.decrypt(data.encode())
    return plain.decode()
|
|
|
|
|
|
def to_bool(value):
    """Interpret *value* (an int or a numeric string) as a boolean."""
    return int(value) != 0
|
|
|
|
|
|
def get_url(url):
    """Return the body of an HTTP GET to *url* as text.

    Uses the module-wide TIMEOUT so a stalled server cannot hang the
    caller indefinitely (the original request had no timeout at all).
    """
    return requests.get(url, timeout=TIMEOUT).text
|
|
|
|
|
|
def parse_date(value, next_day=False):
    """Parse *value* into a datetime; optionally shift one day forward."""
    result = parser.parse(value)
    if next_day:
        result = result + datetime.timedelta(days=1)
    return result
|
|
|
|
|
|
def to_zip(files):
    """Pack a {name: data} mapping into an in-memory ZIP; return its bytes."""
    buffer = BytesIO()

    with zipfile.ZipFile(buffer, 'a', zipfile.ZIP_DEFLATED, False) as archive:
        for name, content in files.items():
            archive.writestr(name, content)

    return buffer.getvalue()
|
|
|
|
|
|
def dumps(data):
    """Serialize *data* as JSON; non-serializable values fall back to str()."""
    text = json.dumps(data, default=str)
    return text
|
|
|
|
|
|
def loads(data):
    """Parse the JSON document *data* into Python objects."""
    obj = json.loads(data)
    return obj
|
|
|
|
|
|
def json_loads(path):
    """Read the file at *path* and parse it as JSON.

    BUG FIX: the original left the file handle open; a context manager
    now closes it deterministically.
    """
    with open(path, 'r') as fh:
        return json.load(fh)
|
|
|
|
|
|
def _validate_db_rfc():
    """Ensure the companies registry database contains the ``names`` table."""
    sql = """
        CREATE TABLE IF NOT EXISTS names(
            rfc TEXT NOT NULL COLLATE NOCASE UNIQUE,
            con TEXT NOT NULL
        );
    """
    con = sqlite3.connect(DB_COMPANIES)
    # Connection.executescript is a shortcut that creates and uses a
    # cursor internally, exactly like the explicit cursor did.
    con.executescript(sql)
    con.close()
    return
|
|
|
|
|
|
def _sql_companies(sql, args=()):
    """Run *sql* (with bound *args*) against the companies registry.

    Returns the fetched rows, or False when an integrity error occurred.
    BUG FIX: the original returned on IntegrityError without closing the
    cursor or the connection; try/finally now guarantees cleanup.
    """
    _validate_db_rfc()
    con = sqlite3.connect(DB_COMPANIES)
    cursor = con.cursor()
    try:
        cursor.execute(sql, args)
        data = cursor.fetchall()
        con.commit()
    except sqlite3.IntegrityError as e:
        log.error(e)
        return False
    finally:
        cursor.close()
        con.close()
    return data
|
|
|
|
|
|
def get_data_con(rfc):
    """Return the stored connection settings (a dict) for *rfc*."""
    rows = rfc_get(rfc)
    return loads(rows[0][0])
|
|
|
|
|
|
def rfc_get(rfc=''):
    """Fetch one company's connection JSON, or every row when *rfc* is empty."""
    if rfc:
        return _sql_companies("SELECT con FROM names WHERE rfc = ?", (rfc,))
    return _sql_companies("SELECT * FROM names", ())
|
|
|
|
|
|
def rfc_exists(rfc):
    """True/False whether *rfc* is registered; None when the query failed."""
    sql = "SELECT rfc FROM names WHERE rfc = ?"
    rows = _sql_companies(sql, (rfc,))
    # _sql_companies signals failure by returning False (a bool).
    if isinstance(rows, bool):
        return None
    return bool(rows)
|
|
|
|
|
|
def rfc_add(rfc, con):
    """Register *rfc* with its connection settings in the companies DB.

    Returns True on success.  BUG FIX: the original returned True even
    when the INSERT failed with an integrity error (_sql_companies
    returns False in that case).
    """
    sql = "INSERT INTO names VALUES (?, ?)"
    data = _sql_companies(sql, (rfc.upper(), dumps(con)))
    return data is not False
|
|
|
|
|
|
def db_create(user):
    """Create a PostgreSQL role and database both named *user*.

    NOTE: *user* is interpolated into shell commands — callers must pass
    trusted values only.
    """
    commands = (
        f'{PSQL} -c "CREATE ROLE {user} WITH LOGIN ENCRYPTED PASSWORD \'{user}\';"',
        f'{PSQL} -c "CREATE DATABASE {user} WITH OWNER {user};"',
    )
    for cmd in commands:
        _run(cmd)
    return True
|
|
|
|
|
|
def db_delete(user, path, no_database=False):
    """Unregister *user* and (unless *no_database*) back up and drop its
    PostgreSQL database and role.

    A final dump is written to *path* as '<user>_<timestamp>.bk' before
    anything is dropped.  Always returns True.
    """
    # Remove the registry row regardless of whether the database is dropped.
    sql = "DELETE FROM names WHERE rfc = ?"
    data = _sql_companies(sql, (user,))

    if no_database:
        return True

    # Normalize to a usable PostgreSQL identifier ('&' occurs in some RFCs).
    user = user.replace('&', '').lower()
    dt = now().strftime('%y%m%d_%H%M')
    path_bk = _join(path, f'{user}_{dt}.bk')

    # Custom-format dump (-Fc) taken as a safety net before dropping.
    args = f'{PG_DUMP} -d {user} -Fc -f "{path_bk}"'
    _run(args)

    args = f'{PSQL} -c "DROP DATABASE {user};"'
    _run(args)

    args = f'{PSQL} -c "DROP ROLE {user};"'
    _run(args)
    return True
|
|
|
|
|
|
def _get_pass(rfc):
    """Return the database password for *rfc*.

    NOTE(review): currently the RFC itself is used as the password —
    looks like a placeholder; confirm before relying on this.
    """
    return rfc
|
|
|
|
|
|
def _backup_db(rfc, is_mv, url_seafile):
    """Dump the PostgreSQL database for *rfc* into PATHS['BK'] and, when
    *is_mv* is set, copy the dump into the shared local folder.

    NOTE(review): *url_seafile* is accepted but never used here —
    presumably a planned remote upload; confirm before removing it.
    """
    log.info(f'Generando backup de: {rfc.upper()}')
    bk_name = f'{rfc}.bk'
    path = _join(PATHS['BK'], bk_name)
    # Custom-format dump (-Fc) so it can be restored with pg_restore.
    args = f'{PG_DUMP} -d {rfc} -Fc -f "{path}"'
    _run(args)
    log.info('\tBackup local generado...')

    if is_mv:
        # Mirror the dump into the shared folder when it exists.
        path_target = _validate_path_local()
        if path_target:
            path_target = _join(path_target, bk_name)
            shutil.copy(path, path_target)
        else:
            log.error('\tNo existe la carpeta compartida...')
    return
|
|
|
|
|
|
def db_backup(is_mv, url_seafile):
    """Back up every registered company database (see _backup_db)."""
    data = rfc_get()
    if not data:
        msg = 'Sin bases de datos a respaldar'
        log.info(msg)
        return

    for rfc, _ in data:
        _backup_db(rfc.lower(), is_mv, url_seafile)

    return
|
|
|
|
def _validate_path_local():
    """Return the shared local backup folder, or '' when it does not exist."""
    candidate = _join(str(Path.home()), PATHS['LOCAL'])
    if os.path.isdir(candidate):
        return candidate
    return ''
|
|
|
|
|
|
def db_backup_local():
    """Dump every registered database into the shared local folder.

    Returns {'ok': bool, 'msg': str}.  The unused local that parsed each
    row's JSON for a never-read 'name' key was removed.
    """
    path_bk = _validate_path_local()
    if not path_bk:
        msg = 'No existe la carpeta local'
        return {'ok': False, 'msg': msg}

    data = rfc_get()
    if not data:
        msg = 'Sin bases de datos a respaldar'
        return {'ok': False, 'msg': msg}

    for row in data:
        user = row[0].lower()
        path = _join(path_bk, '{}.bk'.format(user))
        args = f'{PG_DUMP} -d {user} -Fc -f "{path}"'
        _run(args)

    msg = 'Bases de datos respaldadas correctamente'
    return {'ok': True, 'msg': msg}
|
|
|
|
|
|
def now():
    """Current local datetime truncated to whole seconds."""
    current = datetime.datetime.now()
    return current.replace(microsecond=0)
|
|
|
|
|
|
def get_days(date):
    """Whole days elapsed between *date* and the current time."""
    elapsed = now() - date
    return elapsed.days
|
|
|
|
|
|
def get_pass():
    """Prompt twice for a password on the console.

    Returns (True, password) on success, or (False, error_message) when
    the entries differ or the stripped password is empty.
    """
    first = getpass.getpass('Introduce la contraseña: ')
    second = getpass.getpass('Confirma la contraseña: ')

    if first != second:
        return False, 'Las contraseñas son diferentes'

    password = first.strip()
    if not password:
        return False, 'La contraseña es necesaria'

    return True, password
|
|
|
|
|
|
def xml_stamp(xml, auth):
    """Request a PAC stamp for *xml*.

    Returns a dict with 'ok' and 'error', merged with the PAC response
    data on success.
    """
    if not DEBUG and not auth:
        return {'ok': False, 'error': 'Sin datos para timbrar'}

    pac = PACS[auth['pac']]()
    response = pac.stamp(xml, auth)

    result = {'ok': True, 'error': ''}
    if not response:
        result['ok'] = False
        result['error'] = pac.error
        return result

    result.update(response)
    return result
|
|
|
|
|
|
def xml_cancel(xml, auth, cert, name):
    """Cancel an invoice through the PAC registered under *name*.

    NOTE(review): this function appears unfinished/disabled — the call
    that would produce ``result`` is commented out, the PAC error is
    overwritten with a test string, and the final return references an
    undefined name.
    """
    msg = 'Factura cancelada correctamente'
    data = {'ok': True, 'msg': msg, 'row': {'estatus': 'Cancelada'}}

    pac = PACS[name]()

    # ~ result = pac.cancel_xml(certificado.rfc, str(uuid).upper(),
        # ~ certificado.cer_pem.encode(), _get_pem_from_pfx(certificado))
    # ~ if result:
        # ~ codes = {None: '',
            # ~ 'Could not get UUID Text': 'UUID no encontrado',
            # ~ 'Invalid Passphrase': 'Contraseña inválida',
        # ~ }
        # ~ if not result['CodEstatus'] is None:
            # ~ data['ok'] = False
            # ~ data['msg'] = codes.get(result['CodEstatus'], result['CodEstatus'])
    # ~ else:
    data['ok'] = False
    data['msg'] = pac.error
    # NOTE(review): debug leftover — unconditionally overwrites pac.error.
    data['msg'] = 'Error Test'

    # BUG(review): 'result' is never assigned (its assignment is commented
    # out above), so this line raises NameError when executed.
    return data, result
|
|
|
|
|
|
def get_client_balance(auth):
    """Query the remaining stamp credits with the configured PAC.

    Returns '-d' in DEBUG mode and 'p/e' when the PAC reported an error.
    """
    if DEBUG:
        return '-d'

    pac = PACS[auth['pac']]()
    balance = pac.client_balance(auth)
    if pac.error:
        balance = 'p/e'

    return balance
|
|
|
|
|
|
def get_cert(args):
    """Build a SATCertificate from base64 data-URL fields in *args*.

    Expects 'cer' and 'key' as data URLs ("<header>,<base64>") and
    'contra' as the key passphrase.
    """
    cer_bytes = base64.b64decode(args['cer'].split(',')[1])
    key_bytes = base64.b64decode(args['key'].split(',')[1])
    return SATCertificate(cer_bytes, key_bytes, args['contra'])
|
|
|
|
|
|
def make_xml(data, certificado):
    """Build a CFDI XML from *data*, compute its 'cadena original' via the
    SAT XSLT, sign it with *certificado* and return the sealed XML.
    """
    cert = SATCertificate(certificado.cer, certificado.key_enc.encode())
    if DEBUG:
        # In development the issuer is forced to the test certificate.
        data['emisor']['Rfc'] = certificado.rfc
        data['emisor']['RegimenFiscal'] = '603'

    cfdi = CFDI()
    xml = ET.parse(BytesIO(cfdi.get_xml(data).encode()))

    path_xslt = _join(PATHS['xslt'], 'cadena.xslt')
    # BUG FIX: a context manager guarantees the stylesheet handle is
    # closed even when parsing or the transform raises (the original
    # leaked the handle on error).
    with open(path_xslt, 'rb') as xslt:
        transform = ET.XSLT(ET.parse(xslt))
    cadena = str(transform(xml)).encode()
    stamp = cert.sign(cadena)

    return cfdi.add_sello(stamp, cert.cer_txt)
|
|
|
|
|
|
def cancel_xml_sign(invoice, auth, certificado):
    """Build, sign and submit a CFDI cancellation request for *invoice*.

    Returns a dict with 'ok', 'msg' and 'row'; on success it also carries
    the cancellation date ('Fecha') and the raw PAC acknowledgment XML
    ('Acuse').
    """
    cert = SATCertificate(certificado.cer, certificado.key_enc.encode())
    pac = PACS[auth['pac']]()
    # Values interpolated into the cancellation XML template.
    data = {
        'rfc': certificado.rfc,
        'fecha': now().isoformat()[:19],
        'uuid': invoice.uuid,
    }
    template = TEMPLATE_CANCEL.format(**data)
    tree = ET.fromstring(template.encode())
    # Sign the cancellation request with the company certificate.
    tree = cert.sign_xml(tree)
    sign_xml = ET.tostring(tree).decode()

    result = pac.cancel_xml(invoice.xml, sign_xml, auth)
    if pac.error:
        result = {'ok': False, 'msg': pac.error, 'row': {}}
        return result

    # The PAC answers with an 'Acuse' XML; pull out the cancellation date.
    tree = ET.fromstring(result)
    date_cancel = tree.xpath('string(//Acuse/@Fecha)')[:19]

    # NOTE(review): 'msg' below is never used — the returned 'msg' is ''.
    msg = 'Factura cancelada correctamente'
    result = {'ok': True, 'msg': '', 'row': {'estatus': 'Cancelada'},
        'Fecha': date_cancel, 'Acuse': result}
    return result
|