532 lines
16 KiB
Python
532 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import datetime
|
|
import getpass
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
|
|
import lxml.etree as ET
|
|
|
|
from finkok import PACFinkok
|
|
from comercio import PACComercioDigital
|
|
from pycert import SATCertificate
|
|
|
|
|
|
# Console logging: timestamped records with ANSI-colored level names.
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'

# Replace the stock level names with versions wrapped in ANSI escape codes.
_LEVEL_LABELS = (
    (logging.ERROR, '\033[1;41mERROR\033[1;0m'),
    (logging.DEBUG, '\x1b[33mDEBUG\033[1;0m'),
    (logging.INFO, '\x1b[32mINFO\033[1;0m'),
)
for _level, _label in _LEVEL_LABELS:
    logging.addLevelName(_level, _label)

logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
|
|
|
|
|
|
# Registry of PAC client classes keyed by a short name. The same names are
# used as certificate file stems when sealing and are looked for in file
# paths when sending documents to be stamped.
PACs = dict(
    finkok=PACFinkok,
    comercio=PACComercioDigital,
)

# Namespace map used for XPath queries against CFDI documents.
NS_CFDI = dict(
    cfdi='http://www.sat.gob.mx/cfd/3',
)
|
|
|
|
|
|
def validar_dir(base, name):
    """Return the path ``base/name``, creating the directory when missing.

    Args:
        base: Parent directory.
        name: Directory name (may contain several path components).

    Returns:
        The joined path as a string.
    """
    target = Path(base).joinpath(name)
    if not target.exists():
        # exist_ok guards against another process creating the directory
        # between the check above and this call.
        target.mkdir(parents=True, exist_ok=True)
    return str(target)
|
|
|
|
|
|
def _exists(path):
|
|
if not path:
|
|
return False
|
|
return Path(path).exists()
|
|
|
|
|
|
def _join(*paths):
|
|
return str(Path(paths[0]).joinpath(*paths[1:]))
|
|
|
|
|
|
class CFDI(object):
    """Build a CFDI XML document from a plain dictionary.

    The constructor receives a mapping with the required keys
    ``comprobante``, ``emisor``, ``receptor`` and ``conceptos`` and the
    optional keys ``relacionados``, ``impuestos``, ``pagos``, ``nomina`` and
    ``cartaporte``. Each entry carries the XML attributes of the matching
    node (they are handed to lxml unchanged, so presumably all values are
    strings). The input dictionaries are copied before keys are removed, so
    the caller's data is left untouched.

    The serialized document is available through :attr:`xml` and can be
    written to disk with :meth:`save`.
    """

    _version = '3.3'
    _prefix = 'cfdi'
    _xmlns = 'http://www.sat.gob.mx/cfd/3'
    schema = f'{_xmlns} http://www.sat.gob.mx/sitio_internet/cfd/3/cfdv33.xsd'

    # NOTE: the namespace constants _pagos, _nomina and _cartaporte are only
    # used while this class body is evaluated; the methods with the same
    # names defined further down replace them as class attributes. The URIs
    # stay reachable through the 'prefix' entry of each dictionary (which,
    # despite its name, holds the namespace URI).
    _pagos = 'http://www.sat.gob.mx/Pagos'
    PAGOS = {
        'version': '1.0',
        'prefix': _pagos,
        'ns': {'pago10': _pagos},
        'schema': f' {_pagos} http://www.sat.gob.mx/sitio_internet/cfd/Pagos/Pagos10.xsd',
    }
    _nomina = 'http://www.sat.gob.mx/nomina12'
    NOMINA = {
        'version': '1.2',
        'prefix': _nomina,
        'ns': {'nomina12': _nomina},
        'schema': f' {_nomina} http://www.sat.gob.mx/sitio_internet/cfd/nomina/nomina12.xsd',
    }
    _cartaporte = 'http://www.sat.gob.mx/CartaPorte20'
    CARTAPORTE = {
        'version': '2.0',
        'prefix': _cartaporte,
        'ns': {'cartaporte20': _cartaporte},
        'schema': f' {_cartaporte} http://www.sat.gob.mx/sitio_internet/cfd/CartaPorte/CartaPorte20.xsd',
    }

    def __init__(self, data):
        """Build the complete document from *data* immediately."""
        self.error = ''
        self._root = None
        self._cfdi = self._make_cfdi(data)

    @property
    def xml(self):
        """The serialized document as a string."""
        return self._cfdi

    def save(self, path):
        """Write the document to *path* as pretty-printed UTF-8 XML."""
        doc = ET.ElementTree(self._root)
        # The XML declaration is injected through the doctype argument.
        doc.write(path, pretty_print=True, encoding='utf-8',
            doctype='<?xml version="1.0" encoding="utf-8"?>')
        return

    @staticmethod
    def _qname(namespace, local):
        """Return the ``{namespace}local`` tag name lxml expects."""
        return f'{{{namespace}}}{local}'

    def _add_group(self, parent, namespace, group, item, rows):
        """Append a *group* node holding one *item* node per entry of *rows*.

        Nothing is added when *rows* is empty or None.
        """
        if not rows:
            return
        node = ET.SubElement(parent, self._qname(namespace, group))
        tag = self._qname(namespace, item)
        for row in rows:
            ET.SubElement(node, tag, row)
        return

    def _make_cfdi(self, data):
        """Create every node in document order and return the XML text."""
        self._validate_data(data)
        self._comprobante(data['comprobante'])
        self._relacionados(data.get('relacionados', {}))
        self._emisor(data['emisor'])
        self._receptor(data['receptor'])
        self._conceptos(data['conceptos'])
        self._impuestos(data.get('impuestos', {}))
        self._complementos(data)

        xml = ET.tostring(self._root,
            pretty_print=True, xml_declaration=True, encoding='utf-8')
        return xml.decode()

    def _validate_data(self, data):
        """Record which complements *data* contains.

        Sets the ``_exists_*`` flags, ``_node_complement`` and the
        ``_schema`` string (base schema plus one entry per complement).
        """
        self._exists_pagos = 'pagos' in data
        self._exists_nomina = 'nomina' in data
        self._exists_cartaporte = 'cartaporte' in data

        complements = (
            (self._exists_pagos, self.PAGOS),
            (self._exists_nomina, self.NOMINA),
            (self._exists_cartaporte, self.CARTAPORTE),
        )
        self._schema = self.schema + ''.join(
            info['schema'] for present, info in complements if present)
        self._node_complement = any(present for present, _ in complements)
        return

    def _comprobante(self, attr):
        """Create the root ``Comprobante`` element with the attributes *attr*.

        ``Version`` defaults to the class version when *attr* omits it.
        """
        xsi = 'http://www.w3.org/2001/XMLSchema-instance'
        nsmap = {
            self._prefix: self._xmlns,
            'xsi': xsi,
        }
        if self._exists_pagos:
            nsmap.update(self.PAGOS['ns'])
        if self._exists_nomina:
            nsmap.update(self.NOMINA['ns'])
        if self._exists_cartaporte:
            nsmap.update(self.CARTAPORTE['ns'])

        schema = {ET.QName(xsi, 'schemaLocation'): self._schema}

        # Work on a copy so the caller's dictionary is not modified.
        attr = dict(attr)
        attr.setdefault('Version', self._version)

        node_name = self._qname(self._xmlns, 'Comprobante')
        self._root = ET.Element(node_name, schema, **attr, nsmap=nsmap)
        return

    def _relacionados(self, attr):
        """Placeholder: no node is produced for related documents."""
        return

    def _emisor(self, attr):
        """Append the ``Emisor`` node with the attributes *attr*."""
        ET.SubElement(self._root, self._qname(self._xmlns, 'Emisor'), attr)
        return

    def _receptor(self, attr):
        """Append the ``Receptor`` node with the attributes *attr*."""
        ET.SubElement(self._root, self._qname(self._xmlns, 'Receptor'), attr)
        return

    def _conceptos(self, data):
        """Append ``Conceptos`` with one ``Concepto`` per entry of *data*.

        Each entry may carry an ``impuestos`` mapping with optional
        ``traslados`` and/or ``retenciones`` lists; either one may be
        missing. A ``complemento`` key is removed and otherwise ignored.
        """
        ns = self._xmlns
        conceptos = ET.SubElement(self._root, self._qname(ns, 'Conceptos'))
        for row in data:
            attr = dict(row)
            # Not an XML attribute; drop it so it is not emitted.
            attr.pop('complemento', None)
            taxes = attr.pop('impuestos', {})

            concepto = ET.SubElement(
                conceptos, self._qname(ns, 'Concepto'), attr)

            if not taxes:
                continue

            # .get() tolerates a mapping that has only one of the two keys.
            traslados = taxes.get('traslados')
            retenciones = taxes.get('retenciones')
            if not (traslados or retenciones):
                continue

            impuestos = ET.SubElement(concepto, self._qname(ns, 'Impuestos'))
            self._add_group(impuestos, ns, 'Traslados', 'Traslado', traslados)
            self._add_group(
                impuestos, ns, 'Retenciones', 'Retencion', retenciones)
        return

    def _impuestos(self, data):
        """Append the document-level ``Impuestos`` node, if *data* is given.

        ``retenciones`` and ``traslados`` become child groups; the remaining
        keys of *data* are the node's attributes.
        """
        if not data:
            return

        ns = self._xmlns
        attr = dict(data)
        retenciones = attr.pop('retenciones', ())
        traslados = attr.pop('traslados', ())
        taxes = ET.SubElement(self._root, self._qname(ns, 'Impuestos'), attr)

        self._add_group(taxes, ns, 'Retenciones', 'Retencion', retenciones)
        self._add_group(taxes, ns, 'Traslados', 'Traslado', traslados)
        return

    def _complementos(self, data):
        """Append ``Complemento`` and every complement present in *data*."""
        if not self._node_complement:
            return

        node_name = self._qname(self._xmlns, 'Complemento')
        complemento = ET.SubElement(self._root, node_name)

        if self._exists_pagos:
            self._pagos(complemento, data['pagos'])

        if self._exists_nomina:
            self._nomina(complemento, data['nomina'])

        if self._exists_cartaporte:
            self._cartaporte(complemento, data['cartaporte'])
        return

    def _pagos(self, complemento, data):
        """Append the ``Pagos`` complement.

        *data* is a list of payments; each one must have a ``documentos``
        list that becomes its ``DoctoRelacionado`` children.
        """
        ns = self.PAGOS['prefix']
        attr = {'Version': self.PAGOS['version']}
        node_pagos = ET.SubElement(complemento, self._qname(ns, 'Pagos'), attr)
        for pago in data:
            pago = dict(pago)
            documentos = pago.pop('documentos')
            node_pago = ET.SubElement(
                node_pagos, self._qname(ns, 'Pago'), pago)
            node_name = self._qname(ns, 'DoctoRelacionado')
            for doc in documentos:
                ET.SubElement(node_pago, node_name, doc)
        return

    def _nomina(self, complemento, data):
        """Append the ``Nomina`` complement.

        *data* must contain ``emisor`` and ``receptor``; ``percepciones``,
        ``deducciones`` and ``otros`` are optional. ``Version`` defaults to
        the class value when missing.
        """
        ns = self.NOMINA['prefix']
        attr = dict(data)
        emisor = attr.pop('emisor')
        receptor = attr.pop('receptor')
        percepciones = attr.pop('percepciones', {})
        deducciones = attr.pop('deducciones', {})
        otros = attr.pop('otros', ())

        attr.setdefault('Version', self.NOMINA['version'])

        node_nomina = ET.SubElement(
            complemento, self._qname(ns, 'Nomina'), attr)
        ET.SubElement(node_nomina, self._qname(ns, 'Emisor'), emisor)
        ET.SubElement(node_nomina, self._qname(ns, 'Receptor'), receptor)

        if percepciones:
            group_attr = dict(percepciones)
            rows = group_attr.pop('percepciones')
            node = ET.SubElement(
                node_nomina, self._qname(ns, 'Percepciones'), group_attr)
            node_name = self._qname(ns, 'Percepcion')
            for percepcion in rows:
                ET.SubElement(node, node_name, percepcion)

        if deducciones:
            group_attr = dict(deducciones)
            rows = group_attr.pop('deducciones')
            node = ET.SubElement(
                node_nomina, self._qname(ns, 'Deducciones'), group_attr)
            node_name = self._qname(ns, 'Deduccion')
            for deduccion in rows:
                ET.SubElement(node, node_name, deduccion)

        if otros:
            node = ET.SubElement(node_nomina, self._qname(ns, 'OtrosPagos'))
            node_name = self._qname(ns, 'OtroPago')
            for otro in otros:
                otro = dict(otro)
                subsidio = otro.pop('subsidio', {})
                sub_node = ET.SubElement(node, node_name, otro)
                if subsidio:
                    sub_name = self._qname(ns, 'SubsidioAlEmpleo')
                    ET.SubElement(sub_node, sub_name, subsidio)
        return

    def _cartaporte(self, complemento, data):
        """Append the ``CartaPorte`` complement.

        *data* must contain ``ubicaciones`` and ``mercancias`` (the latter
        with nested ``mercancias`` and ``autotransporte`` entries);
        ``tiposfigura`` is optional. ``Version`` defaults to the class value
        when missing.
        """
        ns = self.CARTAPORTE['prefix']
        attr = dict(data)
        ubicaciones = attr.pop('ubicaciones')
        mercancias = attr.pop('mercancias')
        tiposfigura = attr.pop('tiposfigura', ())

        attr.setdefault('Version', self.CARTAPORTE['version'])

        node_carta = ET.SubElement(
            complemento, self._qname(ns, 'CartaPorte'), attr)

        node = ET.SubElement(node_carta, self._qname(ns, 'Ubicaciones'))
        node_name = self._qname(ns, 'Ubicacion')
        for ubicacion in ubicaciones:
            ET.SubElement(node, node_name, ubicacion)

        merc_attr = dict(mercancias)
        rows = merc_attr.pop('mercancias')
        autotransporte = dict(merc_attr.pop('autotransporte'))
        node = ET.SubElement(
            node_carta, self._qname(ns, 'Mercancias'), merc_attr)

        node_name = self._qname(ns, 'Mercancia')
        for mercancia in rows:
            ET.SubElement(node, node_name, mercancia)

        identificacion = autotransporte.pop('identificacion')
        seguros = autotransporte.pop('seguros')

        sub_node = ET.SubElement(
            node, self._qname(ns, 'Autotransporte'), autotransporte)
        ET.SubElement(
            sub_node, self._qname(ns, 'IdentificacionVehicular'),
            identificacion)
        ET.SubElement(sub_node, self._qname(ns, 'Seguros'), seguros)

        if tiposfigura:
            node = ET.SubElement(
                node_carta, self._qname(ns, 'FiguraTransporte'))
            node_name = self._qname(ns, 'TiposFigura')
            for figura in tiposfigura:
                ET.SubElement(node, node_name, figura)
        return
|
|
|
|
|
|
def _validate_args_validar_cert(args):
    """Check the certificate arguments and ask for the KEY password.

    Reads ``args.dir_cert`` and ``args.nombre``, verifies that the directory
    and the ``<nombre>.cer`` / ``<nombre>.key`` files exist, then prompts for
    the password. Every failure is logged as an error.

    Returns:
        ``(True, data)`` with the keys ``cer``, ``key``, ``pass`` and ``enc``
        on success, ``(False, {})`` otherwise.
    """
    cert_dir = args.dir_cert
    base_name = args.nombre

    if not _exists(cert_dir):
        log.error('La ruta con los certificados es requerida')
        return False, {}

    path_cer = _join(cert_dir, f'{base_name}.cer')
    path_key = _join(cert_dir, f'{base_name}.key')
    path_enc = _join(cert_dir, f'{base_name}.enc')

    if not _exists(path_cer):
        log.error(f'No se encontró el archivo CER en: {path_cer}')
        return False, {}

    if not _exists(path_key):
        log.error(f'No se encontró el archivo KEY en: {path_key}')
        return False, {}

    password = getpass.getpass('Introduce tu clave del archivo KEY: ')
    if not password.strip():
        log.error('La clave es requerida')
        return False, {}

    # The password is stored exactly as typed (not stripped).
    return True, {
        'cer': path_cer,
        'key': path_key,
        'pass': password,
        'enc': path_enc,
    }
|
|
|
|
|
|
|
|
def validar_cert(args):
    """Validate a CER/KEY pair and write the ``.enc`` key file next to it.

    The arguments are checked with ``_validate_args_validar_cert``. When the
    certificate loads without error and is not a FIEL, ``cert.key_enc`` is
    written to the ``.enc`` path and the certificate details are logged.
    Failures are logged and the function returns without writing anything.
    """
    result, data = _validate_args_validar_cert(args)
    if not result:
        return

    # read_bytes() closes the files; the previous open(...).read() calls
    # left both handles to the garbage collector.
    cer = Path(data['cer']).read_bytes()
    key = Path(data['key']).read_bytes()
    cert = SATCertificate(cer, key, password=data['pass'])

    if cert.error:
        log.error(cert.error)
        return

    if cert.is_fiel:
        msg = 'El certificado es FIEL, no podrás timbrar.'
        log.error(msg)
        return

    Path(data['enc']).write_bytes(cert.key_enc)

    msg = 'Los datos del certificado son:'
    log.info(msg)
    log.info(f'\n{cert}')
    msg = 'Puedes timbrar con estos certificados'
    log.info(msg)
    return
|
|
|
|
|
|
def _make(path, target):
    """Build the CFDI described by the JSON file *path*.

    The XML is saved in *target* under the source file's stem with an
    ``.xml`` extension.
    """
    with open(path, 'r', encoding='utf-8') as handle:
        payload = json.load(handle)

    document = CFDI(payload)
    destination = f'{target}/{Path(path).stem}.xml'
    document.save(destination)
    log.info(f'\tGenerado: {destination}')
|
|
|
|
|
|
def make(source, target):
    """Generate an XML file in *target* for every JSON file under *source*."""
    for json_path in get_files(source, 'json'):
        log.info(f'Generando: {json_path}')
        _make(json_path, target)
|
|
|
|
|
|
def _stamp(path, target):
    """Seal the XML file *path* once per registered PAC.

    For every key of ``PACs`` the matching ``<pac>.cer`` / ``<pac>.enc`` pair
    is read from the ``certificados`` folder next to this module. The
    document's ``Fecha``, ``NoCertificado``, ``Certificado`` and the
    emisor's ``Rfc`` / ``RegimenFiscal`` are overwritten, the XSLT output is
    signed and stored in ``Sello``, and the result is written to
    ``<target>/<stem>_<pac>.xml``.
    """
    current_path = os.path.dirname(__file__)
    path_cer = _join(current_path, 'certificados')
    path_xslt = _join(current_path, 'xslt', 'cadena.xslt')
    doc = ET.parse(path)
    root = doc.getroot()
    stem = Path(path).stem

    # The stylesheet does not depend on the PAC: compile it once, and let the
    # context manager close the file even when parsing fails.
    with open(path_xslt, 'rb') as xslt:
        transfor = ET.XSLT(ET.parse(xslt))

    for pac in PACs:
        cer = read_file(_join(path_cer, f'{pac}.cer'))
        key = read_file(_join(path_cer, f'{pac}.enc'))
        cert = SATCertificate(cer, key)

        # Truncate to seconds: YYYY-MM-DDTHH:MM:SS.
        root.attrib['Fecha'] = datetime.datetime.now().isoformat()[:19]
        root.attrib['NoCertificado'] = cert.serial_number
        root.attrib['Certificado'] = cert.cer_txt
        emisor = root.xpath('//cfdi:Emisor', namespaces=NS_CFDI)[0]
        emisor.attrib['Rfc'] = cert.rfc
        # NOTE(review): the tax regime is hard-coded here.
        emisor.attrib['RegimenFiscal'] = '601'

        cadena = str(transfor(doc)).encode()
        root.attrib['Sello'] = cert.sign(cadena)

        new_path = f'{target}/{stem}_{pac}.xml'
        doc.write(new_path, pretty_print=True, encoding='utf-8',
            doctype='<?xml version="1.0" encoding="utf-8"?>')
        log.info(f'\tSellado: {new_path}')

    return
|
|
|
|
|
|
def stamp(source, target):
    """Seal every XML file found under *source*, writing into *target*."""
    for xml_path in get_files(source):
        log.info(f'Sellando: {xml_path}')
        _stamp(xml_path, target)
|
|
|
|
|
|
def _stamp_pac(path, target):
    """Send the sealed XML file *path* to its PAC and save the response.

    The PAC is the first key of ``PACs`` that occurs anywhere in *path*;
    files that match none are skipped with a debug message. On success the
    stamped XML is written to ``<target>/<stem>_timbrada.xml``; a PAC error
    is logged and nothing is written.
    """
    pac = None
    # NOTE(review): this is a substring test on the whole path, so a
    # directory name containing a PAC name also matches.
    for key in PACs:
        if key in path:
            pac = PACs[key]()
            break

    if pac is None:
        msg = 'Diferente PAC en documento'
        log.debug(msg)
        return

    # read_bytes() closes the file; open(...).read() left the handle open.
    data = Path(path).read_bytes()
    xml = pac.stamp(data)

    if pac.error:
        log.error(pac.error)
        return

    p = Path(path)
    new_path = f'{target}/{p.stem}_timbrada.xml'

    # Explicit UTF-8 so the output does not depend on the platform's default
    # encoding.
    with open(new_path, 'w', encoding='utf-8') as f:
        f.write(xml)
    log.info(f'\tTimbrada: {new_path}')
    return
|
|
|
|
|
|
def stamp_pac(source, target):
    """Send every XML file found under *source* to its PAC."""
    for xml_path in get_files(source):
        log.info(f'Enviando: {xml_path}')
        _stamp_pac(xml_path, target)
|
|
|
|
|
|
def read_file(path):
    """Return the complete contents of the file *path* as bytes."""
    with open(path, 'rb') as handle:
        return handle.read()
|
|
|
|
|
|
def get_files(path, ext='xml'):
    """Collect the files under *path* whose name ends with ``.<ext>``.

    The directory tree is walked recursively and the extension is compared
    case-insensitively. Only the end of the file name is considered, so a
    name such as ``a.xml.bak`` is not returned for ``ext='xml'``.

    Args:
        path: Root directory to walk.
        ext: Extension to look for, without the leading dot.

    Returns:
        A tuple with the matching file paths as strings.
    """
    suffix = f'.{ext}'.lower()
    docs = []
    for folder, _, files in os.walk(path):
        docs.extend(
            str(Path(folder, name))
            for name in files
            if name.lower().endswith(suffix)
        )
    return tuple(docs)
|
|
|
|
|
|
def _replace_ext(path, ext):
|
|
return str(Path(path).with_suffix(ext))
|