971 lines
27 KiB
Python
971 lines
27 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# ~ Empresa Libre
|
|
# ~ Copyright (C) 2016-2019 Mauricio Baeza Servin (public@elmau.net)
|
|
# ~
|
|
# ~ This program is free software: you can redistribute it and/or modify
|
|
# ~ it under the terms of the GNU General Public License as published by
|
|
# ~ the Free Software Foundation, either version 3 of the License, or
|
|
# ~ (at your option) any later version.
|
|
# ~
|
|
# ~ This program is distributed in the hope that it will be useful,
|
|
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# ~ GNU General Public License for more details.
|
|
# ~
|
|
# ~ You should have received a copy of the GNU General Public License
|
|
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import base64
import collections
import collections.abc
import csv
import datetime
import getpass
import hashlib
import io
import json
import logging
import math
import os
import shlex
import shutil
import smtplib
import sqlite3
import ssl
import subprocess
import threading
import unicodedata
import zipfile
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from io import BytesIO
from pathlib import Path
from xml.sax.saxutils import escape

import lxml.etree as ET
import requests
import segno
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from dateutil import parser

from settings import DEBUG, DB_COMPANIES, PATHS, TEMPLATE_CANCEL, RFCS

from .cfdi_xml import CFDI
from .pacs import PACComercioDigital
from .pacs import PACFinkok
from .pacs.cfdi_cert import SATCertificate
from .pycfdi import CfdiRead
|
|
|
|
|
|
# Console logging: colorized level names, dd/mm/yyyy timestamps.
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
# Silence the ORM's query chatter.
logging.getLogger('peewee').setLevel(logging.WARNING)


TIMEOUT = 10  # seconds, used for SMTP connections
PATH_INVOICES = 'facturas'
# Postgres CLI command prefixes; under DEBUG target an explicit localhost.
PG_DUMP = 'pg_dump -U postgres'
PSQL = 'psql -U postgres'
if DEBUG:
    PG_DUMP = 'pg_dump -h localhost -U postgres'
    PSQL = 'psql -h localhost -U postgres'

# Supported PAC (stamping provider) backends, keyed by config name.
PACS = {
    'finkok': PACFinkok,
    'comercio': PACComercioDigital,
}
# CFDI 3.3 XML namespaces used for xpath queries.
NS_CFDI = {
    'cfdi': 'http://www.sat.gob.mx/cfd/3',
    'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
}
|
|
|
|
|
|
#~ https://github.com/kennethreitz/requests/blob/v1.2.3/requests/structures.py#L37
|
|
class CaseInsensitiveDict(collections.abc.MutableMapping):
    """
    A case-insensitive ``dict``-like object.

    Implements all methods and operations of
    ``collections.abc.MutableMapping`` as well as dict's ``copy``. Also
    provides ``lower_items``.

    All keys are expected to be strings. The structure remembers the
    case of the last key to be set, and ``iter(instance)``,
    ``keys()``, and ``items()`` will contain case-sensitive keys.
    However, querying and contains testing is case insensitive:

        cid = CaseInsensitiveDict()
        cid['Accept'] = 'application/json'
        cid['aCCEPT'] == 'application/json'  # True
        list(cid) == ['Accept']  # True

    For example, ``headers['content-encoding']`` will return the
    value of a ``'Content-Encoding'`` response header, regardless
    of how the header name was originally stored.

    If the constructor, ``.update``, or equality comparison
    operations are given keys that have equal ``.lower()``s, the
    behavior is undefined.

    NOTE: the base classes moved to ``collections.abc`` in Python 3.3
    and the old ``collections.MutableMapping`` alias was removed in
    Python 3.10 — the original code broke on modern interpreters.
    """

    def __init__(self, data=None, **kwargs):
        # Maps lowercased key -> (original-cased key, value).
        self._store = dict()
        if data is None:
            data = {}
        self.update(data, **kwargs)

    def __setitem__(self, key, value):
        # Use the lowercased key for lookups, but store the actual
        # key alongside the value.
        self._store[key.lower()] = (key, value)

    def __getitem__(self, key):
        return self._store[key.lower()][1]

    def __delitem__(self, key):
        del self._store[key.lower()]

    def __iter__(self):
        # Yield the original-cased keys.
        return (casedkey for casedkey, mappedvalue in self._store.values())

    def __len__(self):
        return len(self._store)

    def lower_items(self):
        """Like iteritems(), but with all lowercase keys."""
        return (
            (lowerkey, keyval[1])
            for (lowerkey, keyval)
            in self._store.items()
        )

    def __eq__(self, other):
        if isinstance(other, collections.abc.Mapping):
            other = CaseInsensitiveDict(other)
        else:
            return NotImplemented
        # Compare insensitively.
        return dict(self.lower_items()) == dict(other.lower_items())

    # Copy is required
    def copy(self):
        # _store.values() is an iterable of (key, value) pairs, which
        # MutableMapping.update accepts directly.
        return CaseInsensitiveDict(self._store.values())

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, dict(self.items()))
|
|
|
|
|
|
class SendMail(object):
    """Thin smtplib wrapper: connects and authenticates on construction,
    then sends MIME messages.

    ``config`` keys: 'server', 'port', 'user', 'pass', 'ssl', 'starttls'.
    Check ``is_connect`` after construction; on failure ``error`` holds a
    human-readable (Spanish) description.
    """

    def __init__(self, config):
        self._config = config
        self._server = None
        self._error = ''
        # Connect immediately; success/failure exposed via is_connect.
        self._is_connect = self._login()

    @property
    def is_connect(self):
        # True when _login() authenticated successfully.
        return self._is_connect

    @property
    def error(self):
        # Description of the last connection/authentication failure.
        return self._error

    def _login(self):
        """Open the SMTP connection per config and authenticate.

        Returns True on success, False on failure (detail in self._error).
        """
        try:
            if self._config['ssl'] and self._config['starttls']:
                # Plain connection upgraded via STARTTLS.
                self._server = smtplib.SMTP(
                    self._config['server'],
                    self._config['port'], timeout=TIMEOUT)
                self._server.ehlo()
                self._server.starttls()
                self._server.ehlo()
            elif self._config['ssl']:
                # Implicit TLS from the start.
                self._server = smtplib.SMTP_SSL(
                    self._config['server'],
                    self._config['port'], timeout=TIMEOUT)
                self._server.ehlo()
            else:
                # Unencrypted connection.
                self._server = smtplib.SMTP(
                    self._config['server'],
                    self._config['port'], timeout=TIMEOUT)
            self._server.login(self._config['user'], self._config['pass'])
            return True
        except smtplib.SMTPAuthenticationError as e:
            # 535: bad credentials; 534: GMail "less secure apps" block.
            if '535' in str(e):
                self._error = 'Nombre de usuario o contraseña inválidos'
                return False
            if '534' in str(e) and 'gmail' in self._config['server']:
                self._error = 'Necesitas activar el acceso a otras ' \
                    'aplicaciones en tu cuenta de GMail'
                return False
        except smtplib.SMTPException as e:
            self._error = str(e)
            return False
        except Exception as e:
            self._error = str(e)
            return False
        # NOTE(review): reached only for an SMTPAuthenticationError that
        # matched neither 535 nor 534 — returns None (falsy) with no
        # error message set.
        return

    def send(self, options):
        """Build and send one MIME message.

        ``options`` keys: 'to', 'subject', 'message' (HTML body);
        optional 'copy' (CC list), 'confirm' (read receipt), 'files'
        (iterable of (content, filename) pairs).  Returns '' on success
        or the exception text on failure.
        """
        try:
            message = MIMEMultipart()
            message['From'] = self._config['user']
            message['To'] = options['to']
            message['CC'] = options.get('copy', '')
            message['Subject'] = options['subject']
            message['Date'] = formatdate(localtime=True)
            if options.get('confirm', False):
                message['Disposition-Notification-To'] = message['From']
            message.attach(MIMEText(options['message'], 'html'))
            for f in options.get('files', ()):
                part = MIMEBase('application', 'octet-stream')
                # f[0] may be str (encode first) or ready-made bytes.
                if isinstance(f[0], str):
                    part.set_payload(f[0].encode('utf-8'))
                else:
                    part.set_payload(f[0])
                encoders.encode_base64(part)
                part.add_header(
                    'Content-Disposition',
                    "attachment; filename={}".format(f[1]))
                message.attach(part)

            receivers = options['to'].split(',') + message['CC'].split(',')
            self._server.sendmail(
                self._config['user'], receivers, message.as_string())
            return ''
        except Exception as e:
            return str(e)

    def close(self):
        """Quit the SMTP session, ignoring errors (connection may have
        never been established)."""
        try:
            self._server.quit()
        except:
            pass
        return
|
|
|
|
|
|
class CfdiToDict(object):
    """Extract complement data (Divisas, LeyendasFiscales, CartaPorte 2.0)
    from a CFDI XML string into a plain dict exposed as ``values``."""

    # Namespaces for the complements this class understands.
    NS = {
        'cfdi': 'http://www.sat.gob.mx/cfd/3',
        'divisas': 'http://www.sat.gob.mx/divisas',
        'leyendasFisc': 'http://www.sat.gob.mx/leyendasFiscales',
        'cartaporte20': 'http://www.sat.gob.mx/CartaPorte20',
    }
    # SAT catalogue: transport figure codes -> display labels.
    tipo_figura = {
        '01': '[01] Operador',
        '02': '[02] Propietario',
        '03': '[03] Arrendador',
        '04': '[04] Notificado',
    }
    # Country codes -> display names (only MEX is expected).
    PAISES = {
        'MEX': 'México',
    }
    # SAT state codes -> display names, for address formatting.
    ESTADOS = {
        'AGU': 'Aguascalientes',
        'BCN': 'Baja California',
        'BCS': 'Baja California Sur',
        'CAM': 'Campeche',
        'CHP': 'Chiapas',
        'CHH': 'Chihuahua',
        'COA': 'Coahuila',
        'COL': 'Colima',
        'DIF': 'Ciudad de México',
        'DUR': 'Durango',
        'GUA': 'Guanajuato',
        'GRO': 'Guerrero',
        'HID': 'Hidalgo',
        'JAL': 'Jalisco',
        'MEX': 'México',
        'MIC': 'Michoacán',
        'MOR': 'Morelos',
        'NAC': 'Nacional',
        'NAY': 'Nayarit',
        'NLE': 'Nuevo León',
        'OAX': 'Oaxaca',
        'PUE': 'Puebla',
        'QUE': 'Querétaro',
        'ROO': 'Quintana Roo',
        'SLP': 'San Luis Potosí',
        'SIN': 'Sinaloa',
        'SON': 'Sonora',
        'TAB': 'Tabasco',
        'TAM': 'Tamaulipas',
        'TLA': 'Tlaxcala',
        'VER': 'Veracruz',
        'YUC': 'Yucatán',
        'ZAC': 'Zacatecas',
    }

    def __init__(self, xml):
        # xml: the CFDI document as a str.
        self._values = {
            'leyendas': (),
        }
        self._root = ET.parse(BytesIO(xml.encode())).getroot()
        self._get_values()

    @property
    def values(self):
        # Extracted data, populated during __init__.
        return self._values

    def _get_values(self):
        self._complementos()
        return

    def _set_carta_porte_domicilio(self, data):
        """Format an address node's attributes as
        'Municipio, Estado, País, C.P. xxxxx'."""
        municipio = data['Municipio']
        estado = self.ESTADOS[data['Estado']]
        pais = self.PAISES[data['Pais']]
        domicilio = f"{municipio}, {estado}, {pais}, C.P. {data['CodigoPostal']}"
        return domicilio

    def _complementos(self):
        """Walk //cfdi:Complemento and pull each known complement into
        ``self._values``.

        NOTE(review): assumes the document contains a Complemento node —
        raises IndexError otherwise.
        """
        path = '//cfdi:Complemento'
        complemento = self._root.xpath(path, namespaces=self.NS)[0]

        path = '//divisas:Divisas'
        divisas = complemento.xpath(path, namespaces=self.NS)
        if divisas:
            d = CaseInsensitiveDict(divisas[0].attrib)
            # The schema version attribute is noise for consumers.
            d.pop('version', '')
            self._values.update({'divisas': d})

        path = '//leyendasFisc:Leyenda'
        node = complemento.xpath(path, namespaces=self.NS)
        if node:
            leyendas = [CaseInsensitiveDict(n.attrib) for n in node]
            self._values['leyendas'] = leyendas

        path = '//cartaporte20:CartaPorte'
        carta_porte = complemento.xpath(path, namespaces=self.NS)
        if carta_porte:
            values = CaseInsensitiveDict(carta_porte[0].attrib)
            for node in carta_porte[0]:
                if 'FiguraTransporte' in node.tag:
                    # Only the first TiposFigura child is captured.
                    figuras = CaseInsensitiveDict(node[0].attrib)
                    figuras['TipoFigura'] = self.tipo_figura[figuras['TipoFigura']]
                    values['figuras'] = figuras
                elif 'Mercancias' in node.tag:
                    mercancias = CaseInsensitiveDict(node.attrib)
                    detalle = [CaseInsensitiveDict(n.attrib)
                        for n in node if 'Mercancia' in n.tag]
                    values['mercancias'] = {
                        'mercancias': mercancias,
                        'detalle': detalle,
                    }

                    # Autotransporte data is flattened into one dict:
                    # base attrs + IdentificacionVehicular + Seguros +
                    # the first Remolque.  Each lookup assumes the node
                    # exists (IndexError otherwise).
                    path = '//cartaporte20:Autotransporte'
                    node_auto = node.xpath(path, namespaces=self.NS)[0]
                    values_auto = CaseInsensitiveDict(node_auto.attrib)
                    values['autotransporte'] = values_auto

                    path = '//cartaporte20:IdentificacionVehicular'
                    node_tmp = node_auto.xpath(path, namespaces=self.NS)[0]
                    values_auto = CaseInsensitiveDict(node_tmp.attrib)
                    values['autotransporte'].update(values_auto)

                    path = '//cartaporte20:Seguros'
                    node_tmp = node_auto.xpath(path, namespaces=self.NS)[0]
                    values_auto = CaseInsensitiveDict(node_tmp.attrib)
                    values['autotransporte'].update(values_auto)

                    path = '//cartaporte20:Remolques'
                    node_tmp = node_auto.xpath(path, namespaces=self.NS)[0][0]
                    values_auto = CaseInsensitiveDict(node_tmp.attrib)
                    values['autotransporte'].update(values_auto)
                elif 'Ubicaciones' in node.tag:
                    ubicaciones = []
                    for n in node:
                        ubicacion = CaseInsensitiveDict(n.attrib)
                        # First child is the Domicilio node.
                        ubicacion['domicilio'] = self._set_carta_porte_domicilio(
                            CaseInsensitiveDict(n[0].attrib))
                        ubicaciones.append(ubicacion)

                    values['ubicaciones'] = ubicaciones

            self._values['carta_porte'] = values

        return
|
|
|
|
|
|
def _call(args):
|
|
return subprocess.check_output(args, shell=True).decode()
|
|
|
|
|
|
def _run(args, wait=False):
|
|
result = ''
|
|
cmd = shlex.split(args)
|
|
if wait:
|
|
result = subprocess.run(cmd, shell=True, check=True).stdout.decode()
|
|
else:
|
|
subprocess.run(cmd)
|
|
return result
|
|
|
|
|
|
def _join(*paths):
|
|
return os.path.join(*paths)
|
|
|
|
|
|
def run_in_thread(fn):
    """Decorator: run *fn* in a new thread and return the started Thread."""
    def run(*args, **kwargs):
        worker = threading.Thread(target=fn, args=args, kwargs=kwargs)
        worker.start()
        return worker
    return run
|
|
|
|
|
|
def send_mail(data):
    """Connect with data['server'], send data['mail'], close, and
    return {'ok': bool, 'msg': str}."""
    ok = True
    server = SendMail(data['server'])
    if server.is_connect:
        msg = server.send(data['mail'])
    else:
        msg = server.error
        ok = False
    server.close()
    return {'ok': ok, 'msg': msg}
|
|
|
|
|
|
def round_up(value):
    """Round *value* up to the nearest integer (as an int)."""
    ceiling = math.ceil(value)
    return int(ceiling)
|
|
|
|
|
|
def _get_key(password):
|
|
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
|
|
digest.update(password.encode())
|
|
key = base64.urlsafe_b64encode(digest.finalize())
|
|
return key
|
|
|
|
|
|
def encrypt(data, password):
    """Encrypt the string *data* with a Fernet key derived from *password*;
    returns the token as a string."""
    fernet = Fernet(_get_key(password))
    token = fernet.encrypt(data.encode())
    return token.decode()
|
|
|
|
|
|
def decrypt(data, password):
    """Decrypt a Fernet token *data* (str) produced by ``encrypt`` with
    the same *password*; returns the plaintext string."""
    fernet = Fernet(_get_key(password))
    plain = fernet.decrypt(data.encode())
    return plain.decode()
|
|
|
|
|
|
def to_bool(value):
    """Interpret *value* ('0'/'1', 0/1, ...) as a boolean via int()."""
    return int(value) != 0
|
|
|
|
|
|
def get_url(url):
    """HTTP GET *url* and return the response body as text.

    Adds a timeout — the original call had none and could hang the
    caller indefinitely on an unresponsive host.
    """
    r = requests.get(url, timeout=TIMEOUT).text
    return r
|
|
|
|
|
|
def parse_date(value, next_day=False):
    """Parse *value* with dateutil; when *next_day*, shift one day forward."""
    parsed = parser.parse(value)
    if next_day:
        parsed = parsed + datetime.timedelta(days=1)
    return parsed
|
|
|
|
|
|
def to_zip(files):
    """Pack *files* ({name: content}) into an in-memory deflated ZIP and
    return the archive bytes."""
    buffer = BytesIO()
    with zipfile.ZipFile(buffer, 'a', zipfile.ZIP_DEFLATED, False) as archive:
        for name, content in files.items():
            archive.writestr(name, content)
    return buffer.getvalue()
|
|
|
|
|
|
def dumps(data):
    """Serialize *data* to JSON, stringifying non-serializable values
    (dates, Decimals, ...) via ``default=str``."""
    return json.dumps(data, default=str)
|
|
|
|
|
|
def loads(data):
    """Parse a JSON document back into Python objects."""
    return json.loads(data)
|
|
|
|
|
|
def json_loads(path):
    """Load and parse the JSON document at *path*.

    Uses a ``with`` block so the file handle is closed — the original
    ``open(path).read()`` leaked it.
    """
    with open(path, 'r') as f:
        return json.load(f)
|
|
|
|
|
|
def _validate_db_rfc():
    """Ensure the companies registry table exists (idempotent).

    The connection and cursor are now closed in ``finally`` blocks —
    the original leaked both if ``executescript`` raised.
    """
    sql = """
        CREATE TABLE IF NOT EXISTS names(
            rfc TEXT NOT NULL COLLATE NOCASE UNIQUE,
            con TEXT NOT NULL
        );
    """
    con = sqlite3.connect(DB_COMPANIES)
    try:
        cursor = con.cursor()
        try:
            cursor.executescript(sql)
        finally:
            cursor.close()
    finally:
        con.close()
    return
|
|
|
|
|
|
def _sql_companies(sql, args=()):
    """Run *sql* (with *args*) on the companies registry database.

    Returns the fetched rows, or False on an IntegrityError (e.g.
    duplicate RFC).  The connection is now always closed — the original
    leaked it on the error path.
    """
    _validate_db_rfc()
    con = sqlite3.connect(DB_COMPANIES)
    cursor = con.cursor()
    try:
        cursor.execute(sql, args)
        data = cursor.fetchall()
        con.commit()
    except sqlite3.IntegrityError as e:
        log.error(e)
        return False
    finally:
        cursor.close()
        con.close()
    return data
|
|
|
|
|
|
def get_data_con(rfc):
    """Deserialize the stored connection config for *rfc*."""
    rows = rfc_get(rfc)
    return loads(rows[0][0])
|
|
|
|
|
|
def rfc_get(rfc=''):
    """Fetch one company's serialized config (by RFC) or every row."""
    if rfc:
        sql = "SELECT con FROM names WHERE rfc = ?"
        params = (rfc,)
    else:
        sql = "SELECT * FROM names"
        params = ()
    return _sql_companies(sql, params)
|
|
|
|
|
|
def rfc_exists(rfc):
    """True/False if *rfc* is registered; None when the query failed."""
    sql = "SELECT rfc FROM names WHERE rfc = ?"
    data = _sql_companies(sql, (rfc,))
    # _sql_companies returns False (a bool) on query failure.
    if isinstance(data, bool):
        return
    return bool(data)
|
|
|
|
|
|
def rfc_add(rfc, con):
    """Register *rfc* (uppercased) with its serialized config *con*.

    Returns False when the INSERT failed (e.g. duplicate RFC) — the
    original discarded the result and always returned True, hiding
    IntegrityError failures from callers.
    """
    sql = "INSERT INTO names VALUES (?, ?)"
    data = _sql_companies(sql, (rfc.upper(), dumps(con)))
    # _sql_companies returns False on IntegrityError, [] on success.
    return data is not False
|
|
|
|
|
|
def db_create(user):
    """Create a Postgres role and database both named *user*.

    NOTE(security): *user* is interpolated straight into the command
    line — only call with trusted, validated names.
    """
    _run(f'{PSQL} -c "CREATE ROLE {user} WITH LOGIN ENCRYPTED PASSWORD \'{user}\';"')
    _run(f'{PSQL} -c "CREATE DATABASE {user} WITH OWNER {user};"')
    return True
|
|
|
|
|
|
def db_delete(user, path, no_database=False):
    """Remove a company: registry row plus, unless *no_database*, its
    Postgres database and role (after a timestamped backup in *path*)."""
    sql = "DELETE FROM names WHERE rfc = ?"
    _sql_companies(sql, (user,))

    if no_database:
        return True

    user = user.replace('&', '').lower()
    dt = now().strftime('%y%m%d_%H%M')
    path_bk = _join(path, f'{user}_{dt}.bk')

    # Keep a backup before dropping anything.
    _run(f'{PG_DUMP} -d {user} -Fc -f "{path_bk}"')
    _run(f'{PSQL} -c "DROP DATABASE {user};"')
    _run(f'{PSQL} -c "DROP ROLE {user};"')
    return True
|
|
|
|
|
|
def _get_pass(rfc):
|
|
return rfc
|
|
|
|
|
|
def _backup_db(rfc, is_mv, url_seafile):
    """Dump *rfc*'s database into PATHS['BK'] and, when *is_mv*, copy
    the dump to the shared local folder.

    *url_seafile* is currently unused; kept for interface compatibility.
    """
    log.info(f'Generando backup de: {rfc.upper()}')
    bk_name = f'{rfc}.bk'
    path = _join(PATHS['BK'], bk_name)
    _run(f'{PG_DUMP} -d {rfc} -Fc -f "{path}"')
    log.info('\tBackup local generado...')

    if is_mv:
        path_target = _validate_path_local()
        if path_target:
            shutil.copy(path, _join(path_target, bk_name))
        else:
            log.error('\tNo existe la carpeta compartida...')
    return
|
|
|
|
|
|
def db_backup(is_mv, url_seafile):
    """Back up every registered company database."""
    data = rfc_get()
    if not len(data):
        log.info('Sin bases de datos a respaldar')
        return

    for rfc, _ in data:
        _backup_db(rfc.lower(), is_mv, url_seafile)
    return
|
|
|
|
def _validate_path_local():
    """Return the shared local backup folder, or '' if it is missing."""
    path_bk = _join(str(Path.home()), PATHS['LOCAL'])
    return path_bk if os.path.isdir(path_bk) else ''
|
|
|
|
|
|
def db_backup_local():
    """Dump every registered database into the shared local folder.

    Returns {'ok': bool, 'msg': str}.  Drops the original's unused
    ``db`` local (which also deserialized each row's config for no
    reason and could raise spuriously).
    """
    path_bk = _validate_path_local()
    if not path_bk:
        msg = 'No existe la carpeta local'
        return {'ok': False, 'msg': msg}

    data = rfc_get()
    if not len(data):
        msg = 'Sin bases de datos a respaldar'
        return {'ok': False, 'msg': msg}

    for row in data:
        user = row[0].lower()
        path = _join(path_bk, '{}.bk'.format(user))
        _run(f'{PG_DUMP} -d {user} -Fc -f "{path}"')

    msg = 'Bases de datos respaldadas correctamente'
    return {'ok': True, 'msg': msg}
|
|
|
|
|
|
def now():
    """Current local datetime, truncated to whole seconds."""
    current = datetime.datetime.now()
    return current.replace(microsecond=0)
|
|
|
|
|
|
def get_days(date):
    """Whole days elapsed from *date* until now (negative if in future)."""
    delta = now() - date
    return delta.days
|
|
|
|
|
|
def get_pass():
    """Prompt twice for a password on the console.

    Returns (True, password) on success, or (False, error_message)
    when the entries differ or the password is empty after stripping.
    """
    pass1 = getpass.getpass('Introduce la contraseña: ')
    pass2 = getpass.getpass('Confirma la contraseña: ')

    if pass1 != pass2:
        return False, 'Las contraseñas son diferentes'

    password = pass1.strip()
    if not password:
        return False, 'La contraseña es necesaria'

    return True, password
|
|
|
|
|
|
def xml_stamp(xml, auth):
    """Stamp *xml* through the PAC selected by auth['pac'].

    Returns a dict with 'ok'/'error' plus the PAC's response fields on
    success.
    """
    if not DEBUG and not auth:
        return {'ok': False, 'error': 'Sin datos para timbrar'}

    result = {'ok': True, 'error': ''}
    pac = PACS[auth['pac']]()
    response = pac.stamp(xml, auth)

    if not response:
        result['ok'] = False
        result['error'] = pac.error
        return result

    result.update(response)
    return result
|
|
|
|
|
|
def xml_cancel(xml, auth, cert, name):
    """Cancel an invoice through PAC *name*.

    Returns (data, result): *data* is the UI status dict, *result* the
    raw PAC response (currently always None — the actual cancellation
    call is disabled in this code path).

    Fixes two defects: the original returned a ``result`` name that was
    never defined (NameError at runtime), and it overwrote the real PAC
    error with the leftover debug string 'Error Test'.
    """
    msg = 'Factura cancelada correctamente'
    data = {'ok': True, 'msg': msg, 'row': {'estatus': 'Cancelada'}}

    pac = PACS[name]()
    result = None  # raw PAC response; no cancellation request is issued here

    # Until the PAC call above is re-enabled, report failure with the
    # PAC's own error text.
    data['ok'] = False
    data['msg'] = pac.error

    return data, result
|
|
|
|
|
|
def get_client_balance(auth, rfc=''):
    """Query the PAC for the client's remaining stamp balance; returns
    'p/e' when the query failed."""
    pac = PACS[auth['pac']]()
    balance = pac.client_balance(auth, rfc)
    return 'p/e' if pac.error else balance
|
|
|
|
|
|
def get_cert(args):
    """Build a SATCertificate from base64 data-URL fields 'cer', 'key'
    and the password in 'contra'."""
    cer_b64 = args['cer'].split(',')[1]
    key_b64 = args['key'].split(',')[1]
    return SATCertificate(
        base64.b64decode(cer_b64), base64.b64decode(key_b64), args['contra'])
|
|
|
|
|
|
def make_xml(data, certificado):
    """Build a CFDI XML from *data*, compute its cadena original with the
    SAT XSLT, sign it with *certificado*, and return the sealed XML.

    The stylesheet is now opened in a ``with`` block — the original
    leaked the handle if the transform raised.
    """
    cert = SATCertificate(certificado.cer, certificado.key_enc.encode())

    cfdi = CFDI()
    xml = ET.parse(BytesIO(cfdi.get_xml(data).encode()))

    path_xslt = _join(PATHS['xslt'], 'cadena.xslt')
    with open(path_xslt, 'rb') as xslt:
        transform = ET.XSLT(ET.parse(xslt))
    # The cadena original is the canonical string the seal is computed over.
    cadena = str(transform(xml)).encode()
    stamp = cert.sign(cadena)

    return cfdi.add_sello(stamp, cert.cer_txt)
|
|
|
|
|
|
def get_pac_by_rfc(cfdi):
    """Return the PAC name registered for the stamping provider's RFC
    (RfcProvCertif) found in the CFDI's TimbreFiscalDigital."""
    root = ET.fromstring(cfdi.encode())
    query = 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@RfcProvCertif)'
    rfc_pac = root.xpath(query, namespaces=NS_CFDI)
    return RFCS[rfc_pac]
|
|
|
|
|
|
def _cancel_with_cert(invoice, args, auth, certificado):
    """Cancel via a PAC that takes the raw certificate PEMs (Finkok)."""
    cert = SATCertificate(certificado.cer, certificado.key_enc.encode())
    pac = PACS[auth['pac']]()
    info = {'cer': cert.cer_pem, 'key': cert.key_pem, 'pass': '', 'args': args}

    result = pac.cancel(invoice.xml, info, auth)
    if pac.error:
        return {'ok': False, 'msg': pac.error, 'row': {}}

    return {
        'ok': True,
        'msg': 'Factura cancelada correctamente',
        'row': {'estatus': 'Cancelada'},
        'date': result['date'],
        'acuse': result['acuse'],
    }
|
|
|
|
|
|
def cancel_xml_sign(invoice, args, auth, certificado):
    """Cancel *invoice* with a signed SAT cancellation request.

    Finkok is routed to ``_cancel_with_cert`` (it takes the raw
    certificate instead of a pre-signed request).

    args keys: 'reason' (SAT cancellation code) and 'uuid' (substitute
    folio; may be empty).  Returns the UI status dict; on success it
    also carries 'date' and 'acuse'.

    Removed a leftover debug ``print(sign_xml)`` that dumped the signed
    request to stdout on every call.
    """
    if auth['pac'] == 'finkok':
        return _cancel_with_cert(invoice, args, auth, certificado)

    cert = SATCertificate(certificado.cer, certificado.key_enc.encode())
    pac = PACS[auth['pac']]()

    folio_new = ''
    if args['uuid']:
        folio_new = f' FolioSustitucion="{args["uuid"]}"'
    data = {
        'rfc': certificado.rfc,
        'fecha': now().isoformat()[:19],
        'uuid': str(invoice.uuid).upper(),
        'motivo': args['reason'],
        'folio': folio_new,
    }
    template = TEMPLATE_CANCEL.format(**data)
    tree = ET.fromstring(template.encode())
    sign_xml = cert.sign_xml(tree)

    result = pac.cancel_xml(sign_xml, auth, invoice.xml)

    if pac.error:
        data = {'ok': False, 'msg': pac.error, 'row': {}}
        return data

    msg = 'Factura cancelada correctamente'
    data = {'ok': True, 'msg': msg, 'row': {'estatus': 'Cancelada'},
        'date': result['date'], 'acuse': result['acuse']}
    return data
|
|
|
|
|
|
def _get_data_sat(xml):
    """Build the SAT status-query expression string
    '?re=...&rr=...&tt=...&id=...' from a CFDI XML string.

    Returns '' when the document cannot be parsed or queried.
    """
    BF = 'string(//*[local-name()="{}"]/@{})'
    NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'}

    try:
        tree = ET.fromstring(xml.encode())
        # CFDI 3.2 used lowercase attribute names; 3.3 capitalizes them —
        # try both spellings.
        emisor = escape(
            tree.xpath('string(//cfdi:Emisor/@rfc)', namespaces=NS_CFDI) or
            tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI)
        )
        receptor = escape(
            tree.xpath('string(//cfdi:Receptor/@rfc)', namespaces=NS_CFDI) or
            tree.xpath('string(//cfdi:Receptor/@Rfc)', namespaces=NS_CFDI)
        )
        total = tree.get('total') or tree.get('Total')
        uuid = tree.xpath(BF.format('TimbreFiscalDigital', 'UUID'))
    except Exception as e:
        return ''

    data = f'?re={emisor}&rr={receptor}&tt={total}&id={uuid}'
    return data
|
|
|
|
|
|
def get_status_sat(xml):
    """Ask the SAT web service for the status of the CFDI in *xml*.

    Returns the <Estado> text from the SOAP response (e.g. 'Vigente',
    'Cancelado'), 'XML inválido' when the document could not be parsed,
    or an 'Error: ...' string on any request/parse failure.
    """
    data = _get_data_sat(xml)
    if not data:
        return 'XML inválido'

    # SOAP envelope for the SAT consulta service; the query expression
    # is injected into <expresionImpresa>.
    data = """<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope
    xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
    xmlns:xsd="http://www.w3.org/2001/XMLSchema"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <soap:Header/>
    <soap:Body>
        <Consulta xmlns="http://tempuri.org/">
            <expresionImpresa>
                {}
            </expresionImpresa>
        </Consulta>
    </soap:Body>
</soap:Envelope>""".format(data)
    headers = {
        'SOAPAction': '"http://tempuri.org/IConsultaCFDIService/Consulta"',
        'Content-type': 'text/xml; charset="UTF-8"'
    }
    URL = 'https://consultaqr.facturaelectronica.sat.gob.mx/consultacfdiservice.svc'

    try:
        result = requests.post(URL, data=data, headers=headers)
        tree = ET.fromstring(result.text)
        # The status lives in the <Estado> element of the response body.
        node = tree.xpath("//*[local-name() = 'Estado']")[0]
    except Exception as e:
        return 'Error: {}'.format(str(e))

    return node.text
|
|
|
|
|
|
def spaces(value):
    """Collapse runs of whitespace to single spaces, line by line."""
    cleaned = []
    for line in value.split('\n'):
        cleaned.append(' '.join(line.split()))
    return '\n'.join(cleaned)
|
|
|
|
|
|
def to_slug(string):
    """ASCII-fold *string* (strip accents), lowercase it, and replace
    spaces with underscores."""
    normalized = unicodedata.normalize('NFKD', string)
    ascii_only = normalized.encode('ascii', 'ignore').decode('ascii')
    return ascii_only.lower().replace(' ', '_')
|
|
|
|
|
|
def read_csv(path, args=None):
    """Read the delimited file at *path* into a list of dicts.

    *args* are keyword arguments for ``csv.DictReader``; defaults to a
    '|' delimiter.  Replaces the original's mutable-default-argument
    dict, and opens with ``newline=''`` as the csv module requires.
    """
    opts = {'delimiter': '|'} if args is None else args
    with open(path, newline='') as f:
        rows = list(csv.DictReader(f, **opts))
    return rows
|
|
|
|
|
|
def _products_from_xml(rfc, data):
    """Parse an uploaded supplier CFDI (*data*, raw bytes) and map its
    concepts to product rows ready for import.

    Rejects the document when its receiver RFC does not match *rfc*
    (check skipped under DEBUG).  Returns a dict with 'status',
    'error' and, on success, 'data'.
    """
    result = {'status': 'server', 'error': ''}
    cfdi = CfdiRead(data)

    if not DEBUG and rfc != cfdi.rfc_receptor:
        msg = f'El receptor no es: {rfc}'
        result['error'] = msg
        return result

    result['data'] = cfdi.data
    # Strip signature/certificate blobs — not needed by the client side.
    del result['data']['cfdi']['Certificado']
    del result['data']['cfdi']['NoCertificado']
    del result['data']['cfdi']['Sello']
    del result['data']['timbre']['SelloCFD']
    del result['data']['timbre']['SelloSAT']

    # Normalize emitter keys to the lowercase names the UI expects.
    emisor = result['data']['emisor']
    emisor['rfc'] = emisor.pop('Rfc')
    emisor['nombre'] = emisor.pop('Nombre')
    result['data']['emisor'] = emisor

    # Map CFDI concept attributes to product-row field names.
    products = result['data']['conceptos']
    rows = []
    for p in products:
        row = {
            'key': p['NoIdentificacion'],
            'key_sat': p['ClaveProdServ'],
            'description': p['Descripcion'],
            'unit': p['Unidad'],
            'unit_value': p['ValorUnitario'],
            'import': p['Importe'],
            'cant': p['Cantidad'],
        }
        rows.append(row)
    result['data']['conceptos'] = rows

    return result
|
|
|
|
|
|
def save_file(path, data, modo='wb'):
    """Best-effort write of *data* to *path* (mode *modo*, binary by
    default).  Returns True on success, False on any failure.

    The bare ``except:`` was narrowed to ``except Exception`` so
    KeyboardInterrupt/SystemExit are no longer swallowed; the
    deliberate best-effort contract is otherwise preserved.
    """
    try:
        with open(path, modo) as f:
            f.write(data)
        return True
    except Exception:
        return False
|
|
|
|
|
|
def _save_template(rfc, name, file_obj):
    """Persist an uploaded template file for *rfc*.

    *name* encodes the expected extension in its last three characters;
    the upload is rejected when the file's extension differs.
    """
    result = {'status': 'server', 'ok': False}

    expected_ext = name[-3:]
    uploaded_ext = file_obj.filename.split('.')[-1].lower()
    if expected_ext != uploaded_ext:
        result['error'] = f'Extensión incorrecta del archivo: {uploaded_ext}'
        return result

    path = _join(PATHS['USER'], f'{rfc.lower()}{name}')
    if save_file(path, file_obj.file.read()):
        result['ok'] = True

    return result
|
|
|
|
|
|
def upload_file(rfc, name, file_obj):
    """Dispatch an upload: 'productsadd' imports products from a CFDI
    XML; anything else is saved as a template file."""
    if name == 'productsadd':
        return _products_from_xml(rfc, file_obj.file.read())
    return _save_template(rfc, name, file_obj)
|
|
|
|
|
|
def get_qr(data, kind='svg', in_base64=False):
    """Render *data* as a QR code.

    Returns a BytesIO with the image (*kind* format), or a base64
    string of it when *in_base64* is True.
    """
    buffer = io.BytesIO()
    segno.make(data).save(buffer, kind=kind, scale=8, border=2)
    if in_base64:
        return base64.b64encode(buffer.getvalue()).decode()
    return buffer
|