empresa-libre/source/app/controllers/utils.py

991 lines
28 KiB
Python

#!/usr/bin/env python3
# ~ Empresa Libre
# ~ Copyright (C) 2016-2019 Mauricio Baeza Servin (public@elmau.net)
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import collections
import collections.abc
import csv
import datetime
import getpass
import io
import json
import logging
import math
import os
import shlex
import shutil
import smtplib
import sqlite3
import ssl
import subprocess
import threading
import unicodedata
import zipfile
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from io import BytesIO
from pathlib import Path
from xml.sax.saxutils import escape

import lxml.etree as ET
import requests
# ~ v2
import segno
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from dateutil import parser

from .cfdi_xml import CFDI
from settings import DEBUG, DB_COMPANIES, PATHS, TEMPLATE_CANCEL, RFCS
from .pacs.cfdi_cert import SATCertificate
from .pacs import PACComercioDigital
from .pacs import PACFinkok
from .pycfdi import CfdiRead
# Logging: timestamped records whose level names are wrapped in ANSI colours.
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
# peewee is only allowed to report warnings and above.
logging.getLogger('peewee').setLevel(logging.WARNING)

# Timeout value handed to the SMTP connections opened by SendMail.
TIMEOUT = 10
PATH_INVOICES = 'facturas'

# PostgreSQL command prefixes; in DEBUG mode they target localhost explicitly.
if DEBUG:
    PG_DUMP = 'pg_dump -h localhost -U postgres'
    PSQL = 'psql -h localhost -U postgres'
else:
    PG_DUMP = 'pg_dump -U postgres'
    PSQL = 'psql -U postgres'

# PAC client classes indexed by the name stored in the auth data.
PACS = {
    'finkok': PACFinkok,
    'comercio': PACComercioDigital,
}

# XML namespaces used when querying stamped CFDI documents.
NS_CFDI = {
    'cfdi': 'http://www.sat.gob.mx/cfd/3',
    'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
}
#~ https://github.com/kennethreitz/requests/blob/v1.2.3/requests/structures.py#L37
class CaseInsensitiveDict(collections.abc.MutableMapping):
    """
    A case-insensitive ``dict``-like object.

    Implements all methods and operations of
    ``collections.abc.MutableMapping`` as well as dict's ``copy``. Also
    provides ``lower_items``.

    All keys are expected to be strings. The structure remembers the
    case of the last key to be set, and ``iter(instance)``, ``keys()``
    and ``items()`` will contain case-sensitive keys. However, querying
    and contains testing is case insensitive:

        cid = CaseInsensitiveDict()
        cid['Accept'] = 'application/json'
        cid['aCCEPT'] == 'application/json'  # True
        list(cid) == ['Accept']  # True

    For example, ``headers['content-encoding']`` will return the
    value of a ``'Content-Encoding'`` response header, regardless
    of how the header name was originally stored.

    If the constructor, ``.update``, or equality comparison
    operations are given keys that have equal ``.lower()``s, the
    behavior is undefined.
    """

    def __init__(self, data=None, **kwargs):
        self._store = dict()
        if data is None:
            data = {}
        self.update(data, **kwargs)

    def __setitem__(self, key, value):
        # Use the lowercased key for lookups, but store the actual
        # key alongside the value.
        self._store[key.lower()] = (key, value)

    def __getitem__(self, key):
        return self._store[key.lower()][1]

    def __delitem__(self, key):
        del self._store[key.lower()]

    def __iter__(self):
        return (casedkey for casedkey, mappedvalue in self._store.values())

    def __len__(self):
        return len(self._store)

    def lower_items(self):
        """Like ``items()``, but with all lowercase keys."""
        return (
            (lowerkey, keyval[1])
            for (lowerkey, keyval)
            in self._store.items()
        )

    def __eq__(self, other):
        # The ABC aliases in ``collections`` were removed in Python 3.10,
        # so the check goes through ``collections.abc``.
        if isinstance(other, collections.abc.Mapping):
            other = CaseInsensitiveDict(other)
        else:
            return NotImplemented
        # Compare insensitively
        return dict(self.lower_items()) == dict(other.lower_items())

    # Copy is required
    def copy(self):
        # ``_store`` values are (original_key, value) pairs, which
        # ``update`` accepts as an iterable of items.
        return CaseInsensitiveDict(self._store.values())

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, dict(self.items()))
class SendMail(object):
    """Thin SMTP client: logs in on construction and sends HTML messages.

    ``config`` is a mapping read with the keys ``server``, ``port``,
    ``ssl``, ``starttls``, ``user`` and ``pass``. After construction check
    ``is_connect``; when it is False, ``error`` holds the reason.
    """

    def __init__(self, config):
        self._config = config
        self._server = None
        self._error = ''
        self._is_connect = self._login()

    @property
    def is_connect(self):
        return self._is_connect

    @property
    def error(self):
        return self._error

    def _login(self):
        """Open the connection and authenticate.

        Returns True on success. On any failure returns False and stores a
        message in ``self._error`` (never returns None).
        """
        try:
            if self._config['ssl'] and self._config['starttls']:
                self._server = smtplib.SMTP(
                    self._config['server'],
                    self._config['port'], timeout=TIMEOUT)
                self._server.ehlo()
                self._server.starttls()
                self._server.ehlo()
            elif self._config['ssl']:
                self._server = smtplib.SMTP_SSL(
                    self._config['server'],
                    self._config['port'], timeout=TIMEOUT)
                self._server.ehlo()
            else:
                self._server = smtplib.SMTP(
                    self._config['server'],
                    self._config['port'], timeout=TIMEOUT)
            self._server.login(self._config['user'], self._config['pass'])
            return True
        except smtplib.SMTPAuthenticationError as e:
            text = str(e)
            if '535' in text:
                self._error = 'Nombre de usuario o contraseña inválidos'
            elif '534' in text and 'gmail' in self._config['server']:
                self._error = 'Necesitas activar el acceso a otras ' \
                    'aplicaciones en tu cuenta de GMail'
            else:
                # Unrecognised authentication failure: report it verbatim
                # instead of falling through with an empty error.
                self._error = text
            return False
        except Exception as e:
            # Covers smtplib.SMTPException, socket errors and bad config.
            self._error = str(e)
            return False

    def send(self, options):
        """Send one message; return '' on success or the error text.

        ``options`` is read with the keys ``to`` and ``subject`` and
        ``message`` (HTML body), plus optional ``copy`` (CC addresses),
        ``confirm`` (request a read receipt) and ``files`` (iterable of
        ``(content, filename)`` pairs).
        """
        try:
            copy = options.get('copy', '')
            message = MIMEMultipart()
            message['From'] = self._config['user']
            message['To'] = options['to']
            if copy:
                message['CC'] = copy
            message['Subject'] = options['subject']
            message['Date'] = formatdate(localtime=True)
            if options.get('confirm', False):
                message['Disposition-Notification-To'] = message['From']
            message.attach(MIMEText(options['message'], 'html'))
            for content, filename in options.get('files', ()):
                part = MIMEBase('application', 'octet-stream')
                if isinstance(content, str):
                    content = content.encode('utf-8')
                part.set_payload(content)
                encoders.encode_base64(part)
                # Passing the name as a parameter lets the email package
                # quote/encode names containing spaces or non-ASCII text.
                part.add_header(
                    'Content-Disposition', 'attachment', filename=filename)
                message.attach(part)

            # Drop empty entries so a missing CC does not produce an empty
            # recipient address.
            addresses = options['to'].split(',') + copy.split(',')
            receivers = [a.strip() for a in addresses if a.strip()]
            self._server.sendmail(
                self._config['user'], receivers, message.as_string())
            return ''
        except Exception as e:
            return str(e)

    def close(self):
        """Close the SMTP session; failures while closing are ignored."""
        if self._server is None:
            return
        try:
            self._server.quit()
        except (smtplib.SMTPException, OSError):
            # Best effort: the connection may already be gone.
            pass
        return
class CfdiToDict(object):
    """Read the complements of a CFDI XML string into plain mappings.

    The parsed data is exposed through ``values``; it always contains the
    key ``leyendas`` and, when present in the document, ``divisas`` and
    ``carta_porte``.
    """

    NS_VERSION = {
        'cfdi3.3': 'http://www.sat.gob.mx/cfd/3',
        'cfdi4.0': 'http://www.sat.gob.mx/cfd/4',
    }
    NS = {
        'divisas': 'http://www.sat.gob.mx/divisas',
        'leyendasFisc': 'http://www.sat.gob.mx/leyendasFiscales',
        'cartaporte20': 'http://www.sat.gob.mx/CartaPorte20',
    }
    tipo_figura = {
        '01': '[01] Operador',
        '02': '[02] Propietario',
        '03': '[03] Arrendador',
        '04': '[04] Notificado',
    }
    PAISES = {
        'MEX': 'México',
    }
    ESTADOS = {
        'AGU': 'Aguascalientes',
        'BCN': 'Baja California',
        'BCS': 'Baja California Sur',
        'CAM': 'Campeche',
        'CHP': 'Chiapas',
        'CHH': 'Chihuahua',
        'COA': 'Coahuila',
        'COL': 'Colima',
        'DIF': 'Ciudad de México',
        'DUR': 'Durango',
        'GUA': 'Guanajuato',
        'GRO': 'Guerrero',
        'HID': 'Hidalgo',
        'JAL': 'Jalisco',
        'MEX': 'México',
        'MIC': 'Michoacán',
        'MOR': 'Morelos',
        'NAC': 'Nacional',
        'NAY': 'Nayarit',
        'NLE': 'Nuevo León',
        'OAX': 'Oaxaca',
        'PUE': 'Puebla',
        'QUE': 'Querétaro',
        'ROO': 'Quintana Roo',
        'SLP': 'San Luis Potosí',
        'SIN': 'Sinaloa',
        'SON': 'Sonora',
        'TAB': 'Tabasco',
        'TAM': 'Tamaulipas',
        'TLA': 'Tlaxcala',
        'VER': 'Veracruz',
        'YUC': 'Yucatán',
        'ZAC': 'Zacatecas',
    }

    def __init__(self, xml):
        self._values = {
            'leyendas': (),
        }
        # Per-instance copy: _get_values() adds the version-specific 'cfdi'
        # prefix, which must not leak into the class-level dict shared by
        # every other instance.
        self.NS = dict(type(self).NS)
        self._root = ET.parse(BytesIO(xml.encode())).getroot()
        self._get_values()

    @property
    def values(self):
        return self._values

    def _get_values(self):
        """Select the CFDI namespace from the root ``Version`` attribute."""
        version = self._root.attrib['Version']
        ns = f'cfdi{version}'
        self.NS['cfdi'] = self.NS_VERSION[ns]
        self._complementos()
        return

    def _set_carta_porte_domicilio(self, data):
        """Format an address mapping as one display line.

        State and country codes that are missing from ``ESTADOS`` /
        ``PAISES`` are shown as-is instead of raising ``KeyError``; empty
        components are skipped.
        """
        municipio = data.get('Municipio', '')
        estado = self.ESTADOS.get(data.get('Estado', ''), data.get('Estado', ''))
        pais = self.PAISES.get(data.get('Pais', ''), data.get('Pais', ''))
        parts = [p for p in (municipio, estado, pais) if p]
        parts.append(f"C.P. {data['CodigoPostal']}")
        return ', '.join(parts)

    def _carta_porte(self, root):
        """Collect the attributes of a CartaPorte element and its children."""
        values = CaseInsensitiveDict(root.attrib)
        for node in root:
            if 'FiguraTransporte' in node.tag:
                figuras = CaseInsensitiveDict(node[0].attrib)
                code = figuras['TipoFigura']
                figuras['TipoFigura'] = self.tipo_figura.get(code, code)
                values['figuras'] = figuras
            elif 'Mercancias' in node.tag:
                mercancias = CaseInsensitiveDict(node.attrib)
                detalle = [CaseInsensitiveDict(n.attrib)
                    for n in node if 'Mercancia' in n.tag]
                values['mercancias'] = {
                    'mercancias': mercancias,
                    'detalle': detalle,
                }

                path = '//cartaporte20:Autotransporte'
                node_auto = node.xpath(path, namespaces=self.NS)[0]
                values['autotransporte'] = CaseInsensitiveDict(node_auto.attrib)

                for path in ('//cartaporte20:IdentificacionVehicular',
                             '//cartaporte20:Seguros'):
                    node_tmp = node_auto.xpath(path, namespaces=self.NS)[0]
                    values['autotransporte'].update(
                        CaseInsensitiveDict(node_tmp.attrib))

                # Remolques is optional: ignore it when absent or empty.
                path = '//cartaporte20:Remolques'
                try:
                    node_tmp = node_auto.xpath(path, namespaces=self.NS)[0][0]
                except IndexError:
                    pass
                else:
                    values['autotransporte'].update(
                        CaseInsensitiveDict(node_tmp.attrib))
            elif 'Ubicaciones' in node.tag:
                ubicaciones = []
                for n in node:
                    ubicacion = CaseInsensitiveDict(n.attrib)
                    # The first child is read as the address node; a
                    # location without children gets an empty address.
                    if len(n):
                        ubicacion['domicilio'] = self._set_carta_porte_domicilio(
                            CaseInsensitiveDict(n[0].attrib))
                    else:
                        ubicacion['domicilio'] = ''
                    ubicaciones.append(ubicacion)
                values['ubicaciones'] = ubicaciones
        return values

    def _complementos(self):
        """Fill ``self._values`` from the nodes under cfdi:Complemento."""
        path = '//cfdi:Complemento'
        found = self._root.xpath(path, namespaces=self.NS)
        if not found:
            # Document without complements: keep the defaults.
            return
        complemento = found[0]

        path = '//divisas:Divisas'
        divisas = complemento.xpath(path, namespaces=self.NS)
        if divisas:
            d = CaseInsensitiveDict(divisas[0].attrib)
            d.pop('version', '')
            self._values.update({'divisas': d})

        path = '//leyendasFisc:Leyenda'
        node = complemento.xpath(path, namespaces=self.NS)
        if node:
            leyendas = [CaseInsensitiveDict(n.attrib) for n in node]
            self._values['leyendas'] = leyendas

        path = '//cartaporte20:CartaPorte'
        carta_porte = complemento.xpath(path, namespaces=self.NS)
        if carta_porte:
            self._values['carta_porte'] = self._carta_porte(carta_porte[0])
        return
def _call(args):
return subprocess.check_output(args, shell=True).decode()
def _run(args, wait=False):
result = ''
cmd = shlex.split(args)
if wait:
result = subprocess.run(cmd, shell=True, check=True).stdout.decode()
else:
subprocess.run(cmd)
return result
def _join(*paths):
return os.path.join(*paths)
def run_in_thread(fn):
    """Decorator: each call starts *fn* in a new thread and returns it."""
    def run(*args, **kwargs):
        worker = threading.Thread(target=fn, args=args, kwargs=kwargs)
        worker.start()
        return worker
    return run
def send_mail(data):
    """Send one e-mail and report the outcome.

    ``data['server']`` is the SMTP configuration handed to ``SendMail`` and
    ``data['mail']`` the message options handed to ``SendMail.send``.

    Returns ``{'ok': bool, 'msg': str}``. ``ok`` is False both when the
    login fails and when ``send`` returns an error message.
    """
    server = SendMail(data['server'])
    try:
        if server.is_connect:
            msg = server.send(data['mail'])
            # send() returns '' on success and the error text otherwise.
            ok = not msg
        else:
            msg = server.error
            ok = False
    finally:
        # Always release the connection, even if the input is malformed.
        server.close()
    return {'ok': ok, 'msg': msg}
def round_up(value):
    """Return the ceiling of *value* as an ``int``."""
    ceiling = math.ceil(value)
    return int(ceiling)
def _get_key(password):
    """Derive a key: URL-safe base64 of the SHA-256 digest of *password*."""
    hasher = hashes.Hash(hashes.SHA256(), backend=default_backend())
    hasher.update(password.encode())
    return base64.urlsafe_b64encode(hasher.finalize())
def encrypt(data, password):
    """Encrypt the string *data* with Fernet, keyed from *password*."""
    cipher = Fernet(_get_key(password))
    token = cipher.encrypt(data.encode())
    return token.decode()
def decrypt(data, password):
    """Decrypt a Fernet token string produced with the same *password*."""
    cipher = Fernet(_get_key(password))
    plain = cipher.decrypt(data.encode())
    return plain.decode()
def to_bool(value):
    """Convert an integer-like value to ``bool`` (zero is False)."""
    number = int(value)
    return number != 0
def get_url(url):
    """Fetch *url* with an HTTP GET and return the response body as text.

    The request uses the module-level ``TIMEOUT`` so an unresponsive host
    cannot block the caller indefinitely; on expiry ``requests`` raises a
    ``requests.exceptions.Timeout``.
    """
    response = requests.get(url, timeout=TIMEOUT)
    return response.text
def parse_date(value, next_day=False):
    """Parse *value* with dateutil; optionally shift the result by one day."""
    parsed = parser.parse(value)
    if not next_day:
        return parsed
    return parsed + datetime.timedelta(days=1)
def to_zip(files):
    """Build an in-memory deflated ZIP from a ``{name: content}`` mapping.

    Returns the archive as ``bytes``.
    """
    buffer = BytesIO()
    archive = zipfile.ZipFile(
        buffer, mode='a', compression=zipfile.ZIP_DEFLATED, allowZip64=False)
    with archive:
        for name, content in files.items():
            archive.writestr(name, content)
    return buffer.getvalue()
def dumps(data):
    """Serialize *data* to JSON; non-serializable objects go through ``str``."""
    options = {'default': str}
    return json.dumps(data, **options)
def loads(data):
    """Deserialize a JSON document and return the resulting object."""
    decoded = json.loads(data)
    return decoded
def json_loads(path):
    """Read the JSON file at *path* and return the decoded object.

    The file is opened as UTF-8 and closed deterministically by the
    context manager instead of being left to the garbage collector.
    """
    with open(path, 'r', encoding='utf-8') as fh:
        return json.load(fh)
def _validate_db_rfc():
    """Create the ``names`` table in the companies database if missing."""
    schema = """
CREATE TABLE IF NOT EXISTS names(
rfc TEXT NOT NULL COLLATE NOCASE UNIQUE,
con TEXT NOT NULL
);
"""
    connection = sqlite3.connect(DB_COMPANIES)
    cursor = connection.cursor()
    cursor.executescript(schema)
    cursor.close()
    connection.close()
    return
def _sql_companies(sql, args=()):
    """Execute one statement against the companies database.

    Returns the fetched rows (a list, empty for statements that produce no
    rows) after committing, or ``False`` when the statement violates an
    integrity constraint. The cursor and connection are always closed,
    including on the error path.
    """
    _validate_db_rfc()
    con = sqlite3.connect(DB_COMPANIES)
    try:
        cursor = con.cursor()
        try:
            cursor.execute(sql, args)
            data = cursor.fetchall()
        except sqlite3.IntegrityError as e:
            log.error(e)
            return False
        finally:
            cursor.close()
        con.commit()
        return data
    finally:
        con.close()
def get_data_con(rfc):
    """Return the decoded connection data stored for *rfc*."""
    rows = rfc_get(rfc)
    first_row = rows[0]
    return loads(first_row[0])
def rfc_get(rfc=''):
    """Query the ``names`` table.

    With *rfc*, select only the ``con`` column of that RFC; without it,
    select every row.
    """
    if not rfc:
        return _sql_companies("SELECT * FROM names", ())
    return _sql_companies("SELECT con FROM names WHERE rfc = ?", (rfc,))
def rfc_exists(rfc):
    """Tell whether *rfc* is registered.

    Returns ``None`` when the query helper reports a failure (a bool
    result), otherwise True/False depending on whether rows were found.
    """
    rows = _sql_companies("SELECT rfc FROM names WHERE rfc = ?", (rfc,))
    return None if isinstance(rows, bool) else bool(rows)
def rfc_add(rfc, con):
    """Register *rfc* (upper-cased) with its connection data as JSON.

    Returns True when the row was inserted and False when the insert was
    rejected (``_sql_companies`` returns ``False`` on an integrity error,
    e.g. a duplicated RFC), instead of reporting success unconditionally.
    """
    sql = "INSERT INTO names VALUES (?, ?)"
    data = _sql_companies(sql, (rfc.upper(), dumps(con)))
    return data is not False
def db_create(user):
    """Create a PostgreSQL role named *user* and a database it owns.

    The role's password is set to the same text as its name.
    """
    statements = (
        f"CREATE ROLE {user} WITH LOGIN ENCRYPTED PASSWORD '{user}';",
        f"CREATE DATABASE {user} WITH OWNER {user};",
    )
    for statement in statements:
        _run(f'{PSQL} -c "{statement}"')
    return True
def db_delete(user, path, no_database=False):
    """Unregister *user* and, unless *no_database*, drop its database.

    Before dropping, a timestamped dump of the database is written into
    *path*; then the database and the role are removed.
    """
    _sql_companies("DELETE FROM names WHERE rfc = ?", (user,))
    if no_database:
        return True

    user = user.replace('&', '').lower()
    stamp = now().strftime('%y%m%d_%H%M')
    path_bk = _join(path, f'{user}_{stamp}.bk')
    commands = (
        f'{PG_DUMP} -d {user} -Fc -f "{path_bk}"',
        f'{PSQL} -c "DROP DATABASE {user};"',
        f'{PSQL} -c "DROP ROLE {user};"',
    )
    for command in commands:
        _run(command)
    return True
def _get_pass(rfc):
return rfc
def _backup_db(rfc, is_mv, url_seafile):
    """Dump the database *rfc* into the backup folder.

    When *is_mv* is true the dump is also copied to the local shared
    folder, if that folder exists. *url_seafile* is not used here.
    """
    log.info(f'Generando backup de: {rfc.upper()}')
    bk_name = f'{rfc}.bk'
    path = _join(PATHS['BK'], bk_name)
    _run(f'{PG_DUMP} -d {rfc} -Fc -f "{path}"')
    log.info('\tBackup local generado...')

    if not is_mv:
        return

    path_target = _validate_path_local()
    if not path_target:
        log.error('\tNo existe la carpeta compartida...')
        return
    shutil.copy(path, _join(path_target, bk_name))
    return
def db_backup(is_mv, url_seafile):
    """Back up the database of every registered RFC."""
    rows = rfc_get()
    if len(rows) == 0:
        log.info('Sin bases de datos a respaldar')
        return

    for rfc, _ in rows:
        _backup_db(rfc.lower(), is_mv, url_seafile)
    return
def _validate_path_local():
    """Return the local backup folder under the user's home, or ''.

    The empty string is returned when the folder does not exist.
    """
    candidate = _join(str(Path.home()), PATHS['LOCAL'])
    return candidate if os.path.isdir(candidate) else ''
def db_backup_local():
    """Dump every registered database into the local backup folder.

    Returns ``{'ok': bool, 'msg': str}``.
    """
    path_bk = _validate_path_local()
    if not path_bk:
        return {'ok': False, 'msg': 'No existe la carpeta local'}

    rows = rfc_get()
    if len(rows) == 0:
        return {'ok': False, 'msg': 'Sin bases de datos a respaldar'}

    for row in rows:
        user = row[0].lower()
        # The stored connection data is decoded and its 'name' read, as
        # before; the value itself is not used by the dump command.
        db = loads(row[1])['name']
        path = _join(path_bk, '{}.bk'.format(user))
        _run(f'{PG_DUMP} -d {user} -Fc -f "{path}"')

    return {'ok': True, 'msg': 'Bases de datos respaldadas correctamente'}
def now():
    """Return the current local datetime truncated to whole seconds."""
    current = datetime.datetime.now()
    return current.replace(microsecond=0)
def get_days(date):
    """Return the number of whole days elapsed between *date* and now."""
    elapsed = now() - date
    return elapsed.days
def get_pass():
    """Prompt twice for a password on the terminal.

    Returns ``(True, password)`` when both entries match and the stripped
    password is not empty, otherwise ``(False, message)``.
    """
    first = getpass.getpass('Introduce la contraseña: ')
    second = getpass.getpass('Confirma la contraseña: ')

    if first != second:
        return False, 'Las contraseñas son diferentes'

    password = first.strip()
    if password:
        return True, password
    return False, 'La contraseña es necesaria'
def xml_stamp(xml, auth):
    """Stamp *xml* through the PAC named in ``auth['pac']``.

    Returns a dict with ``ok`` and ``error``; on success it is extended
    with the PAC response.
    """
    if not (DEBUG or auth):
        return {'ok': False, 'error': 'Sin datos para timbrar'}

    pac = PACS[auth['pac']]()
    response = pac.stamp(xml, auth)
    if not response:
        return {'ok': False, 'error': pac.error}

    result = {'ok': True, 'error': ''}
    result.update(response)
    return result
def xml_cancel(xml, auth, cert, name):
    """Placeholder for cancelling a CFDI through the PAC called *name*.

    The actual PAC call is disabled in this block, so the function always
    reports failure with the message ``'Error Test'``.

    Returns ``(data, result)`` where ``data`` is the status dict and
    ``result`` is ``None`` because no PAC request is made. Previously
    ``result`` was never assigned and the return raised ``NameError``.
    """
    # Instantiating the client keeps the KeyError for unknown PAC names.
    pac = PACS[name]()
    # NOTE(review): the disabled implementation called pac.cancel_xml(...)
    # and mapped result['CodEstatus'] to a message; restore it here once
    # the PAC call is available again.
    result = None
    data = {
        'ok': False,
        'msg': 'Error Test',
        'row': {'estatus': 'Cancelada'},
    }
    return data, result
def get_client_balance(auth, rfc=''):
    """Ask the PAC for the client balance; return 'p/e' if it reports an error."""
    pac = PACS[auth['pac']]()
    balance = pac.client_balance(auth, rfc)
    return 'p/e' if pac.error else balance
def get_cert(args):
    """Build a ``SATCertificate`` from uploaded ``cer``/``key`` values.

    Each value is split on ',' and the second piece is base64-decoded;
    ``args['contra']`` is passed along as the third argument.
    """
    def payload(field):
        encoded = args[field].split(',')[1]
        return base64.b64decode(encoded)

    cer = payload('cer')
    key = payload('key')
    return SATCertificate(cer, key, args['contra'])
def make_xml(data, certificado):
    """Generate the CFDI XML for *data* and add the seal to it.

    The XML produced by ``CFDI.get_xml`` is transformed with
    ``cadena.xslt``, the result is signed with the certificate and the
    signature plus certificate text are added through ``CFDI.add_sello``.
    """
    cert = SATCertificate(certificado.cer, certificado.key_enc.encode())
    # ~ if DEBUG:
        # ~ data['emisor']['Rfc'] = certificado.rfc
        # ~ data['emisor']['RegimenFiscal'] = '603'

    cfdi = CFDI()
    xml = ET.parse(BytesIO(cfdi.get_xml(data).encode()))

    path_xslt = _join(PATHS['xslt'], 'cadena.xslt')
    # The context manager closes the stylesheet even if parsing or the
    # transformation raises.
    with open(path_xslt, 'rb') as xslt:
        transfor = ET.XSLT(ET.parse(xslt))
        cadena = str(transfor(xml)).encode()
    stamp = cert.sign(cadena)

    return cfdi.add_sello(stamp, cert.cer_txt)
def get_pac_by_rfc(cfdi):
    """Look up in ``RFCS`` the ``RfcProvCertif`` of the CFDI's stamp node."""
    query = 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@RfcProvCertif)'
    document = ET.fromstring(cfdi.encode())
    rfc_pac = document.xpath(query, namespaces=NS_CFDI)
    return RFCS[rfc_pac]
def _cancel_with_cert(invoice, args, auth, certificado):
    """Cancel *invoice* by handing the certificate material to the PAC.

    Returns a status dict with ``ok``, ``msg`` and ``row``; on success it
    also carries ``date`` and ``acuse`` from the PAC result.
    """
    cert = SATCertificate(certificado.cer, certificado.key_enc.encode())
    pac = PACS[auth['pac']]()

    try:
        contra = decrypt(bytes(certificado.p12).decode(), certificado.serie)
    except Exception as e:
        log.error(e)
        contra = ''
        # Only the 'comercio' PAC aborts when the password is unavailable.
        if auth['pac'] == 'comercio':
            return {
                'ok': False,
                'msg': 'Es necesario subir de nuevo los certificados de sello',
                'row': {},
            }

    info = {
        'cer': cert.cer_pem,
        'key': cert.key_pem,
        'cer_ori': cert.cer,
        'key_enc': certificado.key,
        'pass': contra,
        'args': args,
    }
    result = pac.cancel(invoice.xml, info, auth)

    if pac.error:
        return {'ok': False, 'msg': pac.error, 'row': {}}

    return {
        'ok': True,
        'msg': 'Factura cancelada correctamente',
        'row': {'estatus': 'Cancelada'},
        'date': result['date'],
        'acuse': result['acuse'],
    }
def _cancel_with_signed_xml(invoice, args, auth, certificado):
    """Cancel *invoice* by sending a locally signed cancellation request.

    This is the code that used to sit, unreachable, after the
    unconditional return of ``cancel_xml_sign``; it is kept as a helper and
    is not called at present.
    """
    cert = SATCertificate(certificado.cer, certificado.key_enc.encode())
    pac = PACS[auth['pac']]()

    folio_new = ''
    if args['uuid']:
        folio_new = f' FolioSustitucion="{args["uuid"]}"'

    data = {
        'rfc': certificado.rfc,
        'fecha': now().isoformat()[:19],
        'uuid': str(invoice.uuid).upper(),
        'motivo': args['reason'],
        'folio': folio_new,
    }
    template = TEMPLATE_CANCEL.format(**data)
    tree = ET.fromstring(template.encode())
    sign_xml = cert.sign_xml(tree)

    result = pac.cancel_xml(sign_xml, auth, invoice.xml)
    if pac.error:
        data = {'ok': False, 'msg': pac.error, 'row': {}}
        return data

    msg = 'Factura cancelada correctamente'
    data = {'ok': True, 'msg': msg, 'row': {'estatus': 'Cancelada'},
        'date': result['date'], 'acuse': result['acuse']}
    return data


def cancel_xml_sign(invoice, args, auth, certificado):
    """Cancel *invoice* and return the status dict.

    Every PAC currently goes through ``_cancel_with_cert``; the signed-XML
    variant lives in ``_cancel_with_signed_xml``.
    """
    # ~ if auth['pac'] == 'finkok':
    return _cancel_with_cert(invoice, args, auth, certificado)
def _get_data_sat(xml):
    """Build the query expression sent to the SAT status service.

    Reads the issuer RFC, receiver RFC, total and UUID from *xml* and
    returns ``'?re=…&amp;rr=…&amp;tt=…&amp;id=…'`` (ampersands already
    escaped for embedding in XML), or ``''`` if the document cannot be
    parsed.

    Nodes are matched by local name, so the lookup does not depend on the
    CFDI namespace version (``cfd/3`` or ``cfd/4``).
    """
    BF = 'string(//*[local-name()="{}"]/@{})'

    def first_attr(tree, node, *names):
        # Return the first non-empty attribute among *names*.
        for name in names:
            value = tree.xpath(BF.format(node, name))
            if value:
                return value
        return ''

    try:
        tree = ET.fromstring(xml.encode())
        emisor = escape(first_attr(tree, 'Emisor', 'rfc', 'Rfc'))
        receptor = escape(first_attr(tree, 'Receptor', 'rfc', 'Rfc'))
        total = tree.get('total') or tree.get('Total')
        uuid = tree.xpath(BF.format('TimbreFiscalDigital', 'UUID'))
    except Exception:
        return ''

    data = f'?re={emisor}&amp;rr={receptor}&amp;tt={total}&amp;id={uuid}'
    return data
def get_status_sat(xml):
    """Query the SAT web service for the status of the CFDI in *xml*.

    Returns the text of the ``Estado`` node of the response,
    ``'XML inválido'`` when the query data cannot be extracted, or
    ``'Error: …'`` when the request or the response parsing fails.
    """
    data = _get_data_sat(xml)
    if not data:
        return 'XML inválido'

    data = """<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body>
<Consulta xmlns="http://tempuri.org/">
<expresionImpresa>
{}
</expresionImpresa>
</Consulta>
</soap:Body>
</soap:Envelope>""".format(data)
    headers = {
        'SOAPAction': '"http://tempuri.org/IConsultaCFDIService/Consulta"',
        'Content-type': 'text/xml; charset="UTF-8"'
    }
    URL = 'https://consultaqr.facturaelectronica.sat.gob.mx/consultacfdiservice.svc'

    try:
        # A timeout keeps an unresponsive service from blocking the caller;
        # its exception is reported through the generic handler below.
        result = requests.post(
            URL, data=data, headers=headers, timeout=TIMEOUT)
        tree = ET.fromstring(result.text)
        node = tree.xpath("//*[local-name() = 'Estado']")[0]
    except Exception as e:
        return 'Error: {}'.format(str(e))

    return node.text
def spaces(value):
    """Collapse runs of whitespace inside every line of *value*.

    Each line is also stripped at both ends; line breaks are kept.
    """
    lines = value.split('\n')
    squeezed = (' '.join(line.split()) for line in lines)
    return '\n'.join(squeezed)
def to_slug(string):
    """Lower-case *string*, drop non-ASCII marks and turn spaces into '_'."""
    decomposed = unicodedata.normalize('NFKD', string)
    ascii_only = decomposed.encode('ascii', 'ignore').decode('ascii')
    return ascii_only.lower().replace(' ', '_')
def read_csv(path, args=None):
    """Read the CSV file at *path* and return its rows as a list of dicts.

    *args* holds keyword arguments for ``csv.DictReader``; when omitted,
    ``{'delimiter': '|'}`` is used. The default is created per call rather
    than shared as a mutable default argument.
    """
    if args is None:
        args = {'delimiter': '|'}
    # newline='' is what the csv module asks for, so that line breaks
    # embedded in quoted fields are handled correctly.
    with open(path, newline='') as f:
        return list(csv.DictReader(f, **args))
def _products_from_xml(rfc, data):
    """Read an uploaded CFDI and return its data with simplified products.

    Outside DEBUG the receiver RFC must equal *rfc*; otherwise the result
    carries an ``error`` message and no data. Seals and certificate
    fields are removed, and every concept is reduced to a flat row.
    """
    result = {'status': 'server', 'error': ''}
    cfdi = CfdiRead(data)

    if not DEBUG and rfc != cfdi.rfc_receptor:
        result['error'] = f'El receptor no es: {rfc}'
        return result

    info = cfdi.data
    result['data'] = info
    # ~ result['data']['xml'] = cfdi.source

    for field in ('Certificado', 'NoCertificado', 'Sello'):
        del info['cfdi'][field]
    for field in ('SelloCFD', 'SelloSAT'):
        del info['timbre'][field]

    emisor = info['emisor']
    emisor['rfc'] = emisor.pop('Rfc')
    emisor['nombre'] = emisor.pop('Nombre')
    info['emisor'] = emisor

    info['conceptos'] = [
        {
            'key': p['NoIdentificacion'],
            'key_sat': p['ClaveProdServ'],
            'description': p['Descripcion'],
            'unit': p['Unidad'],
            'unit_value': p['ValorUnitario'],
            'import': p['Importe'],
            'cant': p['Cantidad'],
        }
        for p in info['conceptos']
    ]
    return result
def save_file(path, data, modo='wb'):
    """Write *data* to *path* using the open mode *modo*.

    Returns True on success and False when the write fails. Only ordinary
    errors are absorbed; ``KeyboardInterrupt`` and ``SystemExit`` are no
    longer swallowed as they were with the bare ``except``.
    """
    try:
        with open(path, modo) as f:
            f.write(data)
        return True
    except Exception:
        return False
def _save_template(rfc, name, file_obj):
    """Store an uploaded template file in the user folder.

    The last three characters of *name* must equal the uploaded file's
    extension; the file is saved as ``<rfc lower-cased><name>``.
    """
    expected = name[-3:]
    received = file_obj.filename.split('.')[-1].lower()
    result = {'status': 'server', 'ok': False}

    if expected != received:
        result['error'] = f'Extensión incorrecta del archivo: {received}'
        return result

    target = _join(PATHS['USER'], f'{rfc.lower()}{name}')
    if save_file(target, file_obj.file.read()):
        result['ok'] = True
    return result
def upload_file(rfc, name, file_obj):
    """Dispatch an upload: product import for 'productsadd', else a template."""
    if name != 'productsadd':
        return _save_template(rfc, name, file_obj)
    content = file_obj.file.read()
    return _products_from_xml(rfc, content)
def get_qr(data, kind='svg', in_base64=False):
    """Render *data* as a QR code with segno.

    Returns the ``BytesIO`` buffer holding the image, or its base64 text
    when *in_base64* is true.
    """
    buffer = io.BytesIO()
    code = segno.make(data)
    code.save(buffer, kind=kind, scale=8, border=2)

    if not in_base64:
        return buffer
    return base64.b64encode(buffer.getvalue()).decode()