cfdi-test/source/util.py

464 lines
14 KiB
Python

#!/usr/bin/env python3
import datetime
import getpass
import json
import logging
import os
import re
from io import BytesIO
from pathlib import Path
import lxml.etree as ET
from finkok import PACFinkok
from comercio import PACComercioDigital
from pycert import SATCertificate
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
PACs = {
'finkok': PACFinkok,
'comercio': PACComercioDigital,
}
NS_CFDI = {
'cfdi': 'http://www.sat.gob.mx/cfd/3',
}
def _exists(path):
if not path:
return False
return Path(path).exists()
def _join(*paths):
return str(Path(paths[0]).joinpath(*paths[1:]))
class CFDI(object):
_version = '3.3'
_prefix = 'cfdi'
_xmlns = 'http://www.sat.gob.mx/cfd/3'
schema = f'{_xmlns} http://www.sat.gob.mx/sitio_internet/cfd/3/cfdv33.xsd'
_pagos = 'http://www.sat.gob.mx/Pagos'
PAGOS = {
'version': '1.0',
'prefix': _pagos,
'ns': {'pago10': _pagos},
'schema': f' {_pagos} http://www.sat.gob.mx/sitio_internet/cfd/Pagos/Pagos10.xsd',
}
_nomina = 'http://www.sat.gob.mx/nomina12'
NOMINA = {
'version': '1.2',
'prefix': _nomina,
'ns': {'nomina12': _nomina},
'schema': f' {_nomina} http://www.sat.gob.mx/sitio_internet/cfd/nomina/nomina12.xsd',
}
def __init__(self, data):
self.error = ''
self._root = None
self._cfdi = self._make_cfdi(data)
@property
def xml(self):
return self._cfdi
def save(self, path):
doc = ET.ElementTree(self._root)
doc.write(path, pretty_print=True, encoding='utf-8',
doctype='<?xml version="1.0" encoding="utf-8"?>')
return
def _make_cfdi(self, data):
self._validate_data(data)
self._comprobante(data['comprobante'])
self._relacionados(data.get('relacionados', {}))
self._emisor(data['emisor'])
self._receptor(data['receptor'])
self._conceptos(data['conceptos'])
self._impuestos(data.get('impuestos', {}))
self._complementos(data)
xml = ET.tostring(self._root,
pretty_print=True, xml_declaration=True, encoding='utf-8')
return xml.decode()
def _validate_data(self, data):
self._schema = self.schema
self._exists_pagos = 'pagos' in data
self._exists_nomina = 'nomina' in data
self._node_complement = False
if self._exists_pagos:
self._node_complement = True
self._schema += self.PAGOS['schema']
if self._exists_nomina:
self._node_complement = True
self._schema += self.NOMINA['schema']
return
def _comprobante(self, attr):
NSMAP = {
self._prefix: self._xmlns,
'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
}
if self._exists_pagos:
NSMAP.update(self.PAGOS['ns'])
if self._exists_nomina:
NSMAP.update(self.NOMINA['ns'])
attr_qname = ET.QName(
'http://www.w3.org/2001/XMLSchema-instance', 'schemaLocation')
schema = {attr_qname: self._schema}
if not 'Version' in attr:
attr['Version'] = self._version
node_name = f'{{{self._xmlns}}}Comprobante'
self._root = ET.Element(node_name, schema, **attr, nsmap=NSMAP)
return
def _relacionados(self, attr):
return
def _emisor(self, attr):
node_name = f'{{{self._xmlns}}}Emisor'
emisor = ET.SubElement(self._root, node_name, attr)
return
def _receptor(self, attr):
node_name = f'{{{self._xmlns}}}Receptor'
emisor = ET.SubElement(self._root, node_name, attr)
return
def _conceptos(self, data):
node_name = f'{{{self._xmlns}}}Conceptos'
conceptos = ET.SubElement(self._root, node_name)
for row in data:
complemento = row.pop('complemento', {})
taxes = row.pop('impuestos', {})
node_name = f'{{{self._xmlns}}}Concepto'
concepto = ET.SubElement(conceptos, node_name, row)
if not taxes:
continue
if taxes['traslados'] or taxes['retenciones']:
node_name = f'{{{self._xmlns}}}Impuestos'
impuestos = ET.SubElement(concepto, node_name)
if 'traslados' in taxes and taxes['traslados']:
node_name = f'{{{self._xmlns}}}Traslados'
traslados = ET.SubElement(impuestos, node_name)
node_name = f'{{{self._xmlns}}}Traslado'
for traslado in taxes['traslados']:
ET.SubElement(traslados, node_name, traslado)
if 'retenciones' in taxes and taxes['retenciones']:
node_name = f'{{{self._xmlns}}}Retenciones'
retenciones = ET.SubElement(impuestos, node_name)
node_name = f'{{{self._xmlns}}}Retencion'
for retencion in taxes['retenciones']:
ET.SubElement(retenciones, node_name, retencion)
return
def _impuestos(self, data):
if not data:
return
node_name = f'{{{self._xmlns}}}Impuestos'
# ~ ET.SubElement(self._root, node_name)
retenciones = data.pop('retenciones', ())
traslados = data.pop('traslados', ())
taxes = ET.SubElement(self._root, node_name, data)
if retenciones:
node_name = f'{{{self._xmlns}}}Retenciones'
subnode = ET.SubElement(taxes, node_name)
node_name = f'{{{self._xmlns}}}Retencion'
for row in retenciones:
ET.SubElement(subnode, node_name, row)
if traslados:
node_name = f'{{{self._xmlns}}}Traslados'
subnode = ET.SubElement(taxes, node_name)
node_name = f'{{{self._xmlns}}}Traslado'
for row in traslados:
ET.SubElement(subnode, node_name, row)
return
def _complementos(self, data):
if not self._node_complement:
return
node_name = f'{{{self._xmlns}}}Complemento'
complemento = ET.SubElement(self._root, node_name)
if self._exists_pagos:
self._pagos(complemento, data['pagos'])
if self._exists_nomina:
self._nomina(complemento, data['nomina'])
return
def _pagos(self, complemento, data):
node_name = f"{{{self.PAGOS['prefix']}}}Pagos"
attr = {'Version': self.PAGOS['version']}
node_pagos = ET.SubElement(complemento, node_name, attr)
for pago in data:
documentos = pago.pop('documentos')
node_name = f"{{{self.PAGOS['prefix']}}}Pago"
node_pago = ET.SubElement(node_pagos, node_name, pago)
node_name = f"{{{self.PAGOS['prefix']}}}DoctoRelacionado"
for doc in documentos:
ET.SubElement(node_pago, node_name, doc)
return
def _nomina(self, complemento, data):
emisor = data.pop('emisor')
receptor = data.pop('receptor')
percepciones = data.pop('percepciones', {})
deducciones = data.pop('deducciones', {})
otros = data.pop('otros', ())
if not 'Version' in data:
data['Version'] = self.NOMINA['version']
node_name = f"{{{self.NOMINA['prefix']}}}Nomina"
node_nomina = ET.SubElement(complemento, node_name, data)
node_name = f"{{{self.NOMINA['prefix']}}}Emisor"
ET.SubElement(node_nomina, node_name, emisor)
node_name = f"{{{self.NOMINA['prefix']}}}Receptor"
ET.SubElement(node_nomina, node_name, receptor)
if percepciones:
attr = percepciones
percepciones = attr.pop('percepciones')
node_name = f"{{{self.NOMINA['prefix']}}}Percepciones"
node = ET.SubElement(node_nomina, node_name, attr)
node_name = f"{{{self.NOMINA['prefix']}}}Percepcion"
for percepcion in percepciones:
ET.SubElement(node, node_name, percepcion)
if deducciones:
attr = deducciones
deducciones = attr.pop('deducciones')
node_name = f"{{{self.NOMINA['prefix']}}}Deducciones"
node = ET.SubElement(node_nomina, node_name, attr)
node_name = f"{{{self.NOMINA['prefix']}}}Deduccion"
for deduccion in deducciones:
ET.SubElement(node, node_name, deduccion)
if otros:
node_name = f"{{{self.NOMINA['prefix']}}}OtrosPagos"
node = ET.SubElement(node_nomina, node_name)
node_name = f"{{{self.NOMINA['prefix']}}}OtroPago"
for otro in otros:
subsidio = otro.pop('subsidio', {})
sub_node = ET.SubElement(node, node_name, otro)
if subsidio:
sub_name = f"{{{self.NOMINA['prefix']}}}SubsidioAlEmpleo"
ET.SubElement(sub_node, sub_name, subsidio)
return
def _validate_args_validar_cert(args):
data = {}
path_cert = args.dir_cert
name = args.nombre
if not _exists(path_cert):
msg = 'La ruta con los certificados es requerida'
log.error(msg)
return False, data
path_cer = _join(path_cert, f'{name}.cer')
path_key = _join(path_cert, f'{name}.key')
path_enc = _join(path_cert, f'{name}.enc')
if not _exists(path_cer):
msg = f'No se encontró el archivo CER en: {path_cer}'
log.error(msg)
return False, data
if not _exists(path_key):
msg = f'No se encontró el archivo KEY en: {path_key}'
log.error(msg)
return False, data
password = getpass.getpass('Introduce tu clave del archivo KEY: ')
if not password.strip():
msg = 'La clave es requerida'
log.error(msg)
return False, data
data['cer'] = path_cer
data['key'] = path_key
data['pass'] = password
data['enc'] = path_enc
return True, data
def validar_cert(args):
result, data = _validate_args_validar_cert(args)
if not result:
return
cer = open(data['cer'], 'rb').read()
key = open(data['key'], 'rb').read()
cert = SATCertificate(cer, key, password=data['pass'])
if cert.error:
log.error(cert.error)
return
if cert.is_fiel:
msg = 'El certificado es FIEL, no podrás timbrar.'
log.error(msg)
return
Path(data['enc']).write_bytes(cert.key_enc)
msg = 'Los datos del certificado son:'
log.info(msg)
log.info(f'\n{cert}')
msg = 'Puedes timbrar con estos certificados'
log.info(msg)
return
def _make(path, target):
with open(path, 'r', encoding='utf-8') as f:
data = json.loads(f.read())
cfdi = CFDI(data)
p = Path(path)
new_path = f'{target}/{p.stem}.xml'
cfdi.save(new_path)
log.info(f'\tGenerado: {new_path}')
return
def make(source, target):
docs = get_files(source, 'json')
for doc in docs:
log.info(f'Generando: {doc}')
_make(doc, target)
return
def _stamp(path, target):
current_path = os.path.dirname(__file__)
path_cer = join(current_path, 'certificados')
path_xslt = join(current_path, 'xslt', 'cadena.xslt')
doc = ET.parse(path)
for pac in PACs.keys():
xslt = open(path_xslt, 'rb')
cer = read_file(join(path_cer, f'{pac}.cer'))
key = read_file(join(path_cer, f'{pac}.enc'))
cert = SATCertificate(cer, key)
# ~ print(cert)
root = doc.getroot()
root.attrib['Fecha'] = datetime.datetime.now().isoformat()[:19]
root.attrib['NoCertificado'] = cert.serial_number
root.attrib['Certificado'] = cert.cer_txt
emisor = root.xpath('//cfdi:Emisor', namespaces=NS_CFDI)[0]
emisor.attrib['Rfc'] = cert.rfc
emisor.attrib['RegimenFiscal'] = '601'
transfor = ET.XSLT(ET.parse(xslt))
cadena = str(transfor(doc)).encode()
stamp = cert.sign(cadena)
root.attrib['Sello'] = stamp
xslt.close()
p = Path(path)
new_path = f'{target}/{p.stem}_{pac}.xml'
doc.write(new_path, pretty_print=True, encoding='utf-8',
doctype='<?xml version="1.0" encoding="utf-8"?>')
log.info(f'\tSellado: {new_path}')
return
def stamp(source, target):
docs = get_files(source)
for doc in docs:
log.info(f'Sellando: {doc}')
_stamp(doc, target)
return
def _stamp_pac(path, target):
pac = None
for key in PACs.keys():
if key in path:
pac = PACs[key]()
break
if pac is None:
msg = 'Diferente PAC en documento'
log.debug(msg)
return
data = open(path, 'rb').read()
xml = pac.stamp(data)
if pac.error:
log.error(pac.error)
return
p = Path(path)
new_path = f'{target}/{p.stem}_timbrada.xml'
with open(new_path, 'w') as f:
f.write(xml)
log.info(f'\tTimbrada: {new_path}')
return
def stamp_pac(source, target):
docs = get_files(source)
for doc in docs:
log.info(f'Enviando: {doc}')
_stamp_pac(doc, target)
return
def read_file(path):
with open(path, 'rb') as f:
data = f.read()
return data
def get_files(path, ext='xml'):
docs = []
for folder, _, files in os.walk(path):
pattern = re.compile('\.{}'.format(ext), re.IGNORECASE)
docs += [join(folder,f) for f in files if pattern.search(f)]
return tuple(docs)
def _replace_ext(path, ext):
return str(Path(path).with_suffix(ext))