#!/usr/bin/env python3
"""Build, sign and stamp CFDI 4.0 XML documents from pipe-delimited text.

Pipeline implemented here:

1. ``DataToDict`` parses a text layout (one record per line, fields
   separated by ``|``) into a plain dictionary.
2. ``DictToCfdi`` turns that dictionary into the CFDI XML tree.
3. ``stamp_cfdi`` adds the certificate data and the signature (``Sello``).
4. ``stamp_pac`` sends signed XML files to the PAC and stores the answer.
"""

import base64
import os
import re
import subprocess

import lxml.etree as ET

from settings import DEBUG, log, PATH_XSLT, DELETE_FILES, PAC_AUTH
from helper.comercio import PACComercioDigital as PAC


def _call(args, data=None):
    """Run an external command and return its decoded standard output.

    Args:
        args: The command. A list is executed directly, without a shell,
            so file names containing spaces or quotes are safe. A string
            is still accepted for backward compatibility and is handed to
            the shell as before.
        data: Optional bytes written to the command's standard input.

    Returns:
        The command output decoded as text.

    Raises:
        subprocess.CalledProcessError: if the command exits with an error.
    """
    options = {'shell': isinstance(args, str)}
    # Only forward ``input`` when given: check_output replaces an explicit
    # ``input=None`` by an empty pipe instead of inheriting stdin.
    if data is not None:
        options['input'] = data
    return subprocess.check_output(args, **options).decode()


def join(*paths):
    """Join path components with the platform separator."""
    return os.path.join(*paths)


def kill(path):
    """Delete ``path`` on a best-effort basis; failures are ignored."""
    try:
        os.remove(path)
    except (OSError, TypeError, ValueError):
        # Missing file, no permission or an unusable path: nothing to do.
        # Unlike a bare ``except`` this lets KeyboardInterrupt/SystemExit
        # propagate.
        pass


class Cert():
    """Certificate data and signing, delegated to the ``openssl`` binary.

    Expects ``<name>.cer`` (read with ``-inform DER``) and ``<name>.pem``
    (the key used for signing) inside ``path``.
    """

    OPENSSL = 'openssl'

    def __init__(self, path, name):
        self._get_data(path, name)

    @property
    def serial_number(self):
        """Certificate number derived from the openssl serial."""
        return self._serial_number

    @property
    def txt(self):
        """Certificate file contents as single-line base64 text."""
        return self._cert_txt

    def _get_data(self, path, name):
        """Resolve the file paths and load serial number and certificate."""
        path_cer = join(path, f'{name}.cer')
        self._path_key = join(path, f'{name}.pem')
        self._serial_number = self._get_serial_number(path_cer)
        self._cert_txt = self._get_cert_txt(path_cer)

    def _get_serial_number(self, path_cer):
        """Return every second character of the hexadecimal serial."""
        args = [
            self.OPENSSL, 'x509', '-inform', 'DER',
            '-in', path_cer, '-noout', '-serial',
        ]
        output = _call(args)
        # Output looks like ``serial=<hex>``. Presumably the hex digits
        # encode ASCII digits (0x3N), so the odd positions are the number.
        hex_serial = output.split('=')[1].split('\n')[0]
        return hex_serial[1::2]

    def _get_cert_txt(self, path_cer):
        """Return the certificate file encoded in base64, without newlines."""
        args = [self.OPENSSL, 'enc', '-base64', '-in', path_cer]
        return _call(args).replace('\n', '')

    def sign(self, data):
        """Sign ``data`` with SHA-256 and the private key.

        Args:
            data: Bytes to sign (``str`` is accepted and encoded as UTF-8).

        Returns:
            The signature as single-line base64 text.

        Raises:
            subprocess.CalledProcessError: if openssl fails.
        """
        if isinstance(data, str):
            data = data.encode()
        # The payload travels over stdin instead of ``echo -n -e "..."``:
        # the shell form broke on quotes, ``$``, backticks and backslashes
        # and depended on which ``echo`` the system shell provides.
        args = [self.OPENSSL, 'dgst', '-sha256', '-sign', self._path_key]
        signature = subprocess.check_output(args, input=data)
        return base64.b64encode(signature).decode()


class DictToCfdi():
    """Build the CFDI XML document from the dictionary made by DataToDict."""

    _PREFIX = 'cfdi'
    _XMLNS = 'http://www.sat.gob.mx/cfd/4'
    _SCHEMA = f'{_XMLNS} http://www.sat.gob.mx/sitio_internet/cfd/4/cfdv40.xsd'
    _LEYENDAS = {
        'version': '1.0',
        'prefix': 'leyendasFisc',
        'xmlns': 'http://www.sat.gob.mx/leyendasFiscales',
        'schema': ' http://www.sat.gob.mx/leyendasFiscales http://www.sat.gob.mx/sitio_internet/cfd/leyendasFiscales/leyendasFisc.xsd',
    }

    def __init__(self, data):
        self._data = data
        self._cfdi = None
        self._root = None
        self._attr_complementos = {}
        self._make_cfdi()

    @property
    def cfdi(self):
        """The generated XML as text, including the XML declaration."""
        return self._cfdi

    def _tag(self, name):
        """Return ``name`` qualified with the CFDI namespace."""
        return f'{{{self._XMLNS}}}{name}'

    def _make_cfdi(self):
        """Create every node in document order and serialize the tree."""
        self._validate_data()
        self._comprobante()
        self._relacionados()
        self._emisor()
        self._receptor()
        self._conceptos()
        self._impuestos()
        self._complementos()
        xml = ET.tostring(
            self._root, pretty_print=True,
            xml_declaration=True, encoding='utf-8')
        self._cfdi = xml.decode()

    def _validate_data(self):
        """Collect the schema locations and namespaces the document needs."""
        self._schema = self._SCHEMA
        if 'leyendas' in (self._data.get('complementos') or {}):
            self._schema += self._LEYENDAS['schema']
            self._attr_complementos['leyendas'] = {
                self._LEYENDAS['prefix']: self._LEYENDAS['xmlns'],
            }

    def _comprobante(self):
        """Create the root ``Comprobante`` element."""
        attr = self._data['comprobante']
        xsi = 'http://www.w3.org/2001/XMLSchema-instance'
        nsmap = {self._PREFIX: self._XMLNS, 'xsi': xsi}
        for namespaces in self._attr_complementos.values():
            nsmap.update(namespaces)
        schema = {ET.QName(xsi, 'schemaLocation'): self._schema}
        self._root = ET.Element(
            self._tag('Comprobante'), schema, **attr, nsmap=nsmap)

    def _relacionados(self):
        """Add ``CfdiRelacionados`` when related documents were given."""
        # The key only exists when the source had that record.
        data = self._data.get('relacionados')
        if not data:
            return
        attr = {'TipoRelacion': data['TipoRelacion']}
        node = ET.SubElement(self._root, self._tag('CfdiRelacionados'), attr)
        tag = self._tag('CfdiRelacionado')
        for uuid in data['UUID']:
            ET.SubElement(node, tag, {'UUID': uuid})

    def _emisor(self):
        """Add the ``Emisor`` node."""
        ET.SubElement(self._root, self._tag('Emisor'), self._data['emisor'])

    def _receptor(self):
        """Add the ``Receptor`` node."""
        ET.SubElement(
            self._root, self._tag('Receptor'), self._data['receptor'])

    def _add_group(self, parent, group, item, rows):
        """Add a ``group`` node holding one ``item`` node per row, if any."""
        if not rows:
            return
        node = ET.SubElement(parent, self._tag(group))
        tag = self._tag(item)
        for row in rows:
            ET.SubElement(node, tag, row)

    def _conceptos(self):
        """Add ``Conceptos`` with each ``Concepto`` and its taxes."""
        node = ET.SubElement(self._root, self._tag('Conceptos'))
        tag = self._tag('Concepto')
        for product in self._data['conceptos']:
            # Work on a copy so the caller's dictionary is left untouched.
            attr = dict(product)
            attr.pop('complemento', None)
            taxes = attr.pop('impuestos', {})
            # NOTE(review): a 'pedimento' key produced by DataToDict is
            # written here as a plain attribute of Concepto - confirm that
            # this is the intended output.
            node_product = ET.SubElement(node, tag, attr)
            if not taxes:
                continue
            node_taxes = ET.SubElement(node_product, self._tag('Impuestos'))
            self._add_group(
                node_taxes, 'Traslados', 'Traslado',
                taxes.get('traslados', []))
            self._add_group(
                node_taxes, 'Retenciones', 'Retencion',
                taxes.get('retenciones', []))

    def _impuestos(self):
        """Add the document-level ``Impuestos`` node when there are taxes."""
        # Copy before popping: the input dictionary must not be modified.
        taxes = dict(self._data.get('impuestos') or {})
        if not taxes:
            return
        retenciones = taxes.pop('retenciones', ())
        traslados = taxes.pop('traslados', ())
        node = ET.SubElement(self._root, self._tag('Impuestos'), taxes)
        self._add_group(node, 'Retenciones', 'Retencion', retenciones)
        self._add_group(node, 'Traslados', 'Traslado', traslados)

    def _complementos(self):
        """Add the ``Complemento`` node and the supported complements."""
        complementos = self._data.get('complementos')
        if not complementos:
            return
        node = ET.SubElement(self._root, self._tag('Complemento'))
        if 'leyendas' in complementos:
            self._complemento_leyendas(complementos['leyendas'], node)

    def _complemento_leyendas(self, data, node):
        """Add ``LeyendasFiscales`` with one ``Leyenda`` per entry."""
        xmlns = self._LEYENDAS['xmlns']
        attr = {'version': self._LEYENDAS['version']}
        node_leyendas = ET.SubElement(
            node, f'{{{xmlns}}}LeyendasFiscales', attr)
        tag = f'{{{xmlns}}}Leyenda'
        for leyenda in data:
            ET.SubElement(node_leyendas, tag, leyenda)


class DataToDict():
    """Parse the pipe-delimited text layout into a dictionary.

    Each line is ``<record type>|<ignored field>|<field>|...``; the record
    type selects the handler through ``NODES``.
    """

    TRASLADO = 'T'
    RETENCION = 'R'
    NODES = {
        '01': '_comprobante',
        '02': '_relacionados',
        '03': '_emisor',
        '04': '_receptor',
        '05': '_conceptos',
        '06': '_impuestos',
        '10': '_leyendas',
    }
    _TAX_FIELDS = ('Base', 'Impuesto', 'TipoFactor', 'TasaOCuota', 'Importe')

    def __init__(self, data):
        self._data = data
        self._cfdi = {'conceptos': [], 'complementos': {}}
        self._process_data()

    @property
    def cfdi(self):
        """The parsed document as a dictionary."""
        return self._cfdi

    @staticmethod
    def _pick(fields, data, skip_empty=True):
        """Pair ``fields`` with ``data`` positionally.

        Fields beyond the end of ``data`` are left out instead of raising
        IndexError; empty values are dropped unless ``skip_empty`` is false.
        """
        return {
            field: value
            for field, value in zip(fields, data)
            if value or not skip_empty
        }

    @staticmethod
    def _groups(data, size):
        """Yield consecutive slices of ``size`` items, skipping a short tail.

        A trailing separator in the source line leaves an incomplete group
        at the end, which previously raised IndexError.
        """
        for start in range(0, len(data) - size + 1, size):
            yield data[start:start + size]

    def _process_data(self):
        """Dispatch every line to the handler of its record type."""
        for line in self._data.split('\n'):
            # Tolerate Windows line endings.
            parts = line.rstrip('\r').split('|')
            if not parts[0]:
                continue
            header = self.NODES.get(parts[0], '')
            if not header:
                log.debug(f'No existe: {parts[0]}')
                continue
            handler = getattr(self, header, None)
            if handler is not None:
                # parts[1] is not used by any handler.
                handler(parts[2:])

    def _comprobante(self, data):
        """Store the non-empty ``Comprobante`` attributes."""
        fields = (
            'Version', 'Serie', 'Folio', 'Fecha', 'FormaPago',
            'CondicionesDePago', 'SubTotal', 'Descuento', 'Moneda',
            'TipoCambio', 'Total', 'TipoDeComprobante', 'MetodoPago',
            'LugarExpedicion', 'Confirmacion', 'Exportacion',
        )
        self._cfdi['comprobante'] = self._pick(fields, data)

    def _relacionados(self, data):
        """Store the relation type and the related UUIDs, when present."""
        self._cfdi['relacionados'] = {}
        if data and data[0]:
            self._cfdi['relacionados']['TipoRelacion'] = data[0]
            # Empty entries (e.g. from a trailing separator) are not UUIDs.
            uuids = [uuid for uuid in data[1:] if uuid]
            self._cfdi['relacionados']['UUID'] = uuids

    def _emisor(self, data):
        """Store the ``Emisor`` attributes (empty values are kept)."""
        fields = ('Rfc', 'Nombre', 'RegimenFiscal')
        self._cfdi['emisor'] = self._pick(fields, data, skip_empty=False)

    def _receptor(self, data):
        """Store the non-empty ``Receptor`` attributes."""
        fields = (
            'Rfc', 'Nombre', 'DomicilioFiscalReceptor', 'ResidenciaFiscal',
            'NumRegIdTrib', 'RegimenFiscalReceptor', 'UsoCFDI',
        )
        self._cfdi['receptor'] = self._pick(fields, data)

    def _get_taxes_by_concept(self, data):
        """Return the taxes of one concept, grouped by kind.

        ``data`` holds groups of six values: the tax type followed by
        Base, Impuesto, TipoFactor, TasaOCuota and Importe.
        """
        traslados = []
        retenciones = []
        for type_tax, *values in self._groups(data, 6):
            tax = dict(zip(self._TAX_FIELDS, values))
            if type_tax == self.TRASLADO:
                traslados.append(tax)
            elif type_tax == self.RETENCION:
                retenciones.append(tax)
        taxes = {}
        if traslados:
            taxes['traslados'] = traslados
        if retenciones:
            taxes['retenciones'] = retenciones
        return taxes

    def _conceptos(self, data):
        """Append one concept with its attributes, pedimento and taxes."""
        fields = (
            'ClaveProdServ', 'NoIdentificacion', 'Cantidad', 'ClaveUnidad',
            'Unidad', 'Descripcion', 'ValorUnitario', 'Importe', 'Descuento',
            'ObjetoImp',
        )
        concepto = self._pick(fields, data)
        # Layout after the attributes: pedimento, then the tax groups.
        position = len(fields)
        pedimento = data[position] if len(data) > position else ''
        if pedimento:
            concepto['pedimento'] = pedimento
        taxes = self._get_taxes_by_concept(data[position + 1:])
        concepto['impuestos'] = taxes
        self._cfdi['conceptos'].append(concepto)

    def _get_taxes(self, data):
        """Store the document-level taxes found in ``data``.

        Same six-value groups as the concept taxes, but withholdings only
        keep ``Impuesto`` and ``Importe``.
        """
        traslados = []
        retenciones = []
        for type_tax, *values in self._groups(data, 6):
            tax = dict(zip(self._TAX_FIELDS, values))
            if type_tax == self.TRASLADO:
                traslados.append(tax)
            elif type_tax == self.RETENCION:
                retenciones.append({
                    'Impuesto': tax['Impuesto'],
                    'Importe': tax['Importe'],
                })
        if traslados:
            self._cfdi['impuestos']['traslados'] = traslados
        if retenciones:
            self._cfdi['impuestos']['retenciones'] = retenciones

    def _impuestos(self, data):
        """Store the tax totals and the document-level taxes."""
        fields = ('TotalImpuestosRetenidos', 'TotalImpuestosTrasladados')
        self._cfdi['impuestos'] = self._pick(fields, data)
        self._get_taxes(data[len(fields):])

    def _leyendas(self, data):
        """Store the fiscal legends, read in groups of three values."""
        if not data:
            return
        fields = ('disposicionFiscal', 'norma', 'textoLeyenda')
        leyendas = [
            dict(zip(fields, group)) for group in self._groups(data, 3)
        ]
        self._cfdi['complementos']['leyendas'] = leyendas


def stamp_cfdi(cfdi, cert):
    """Add certificate data and the signature to a CFDI document.

    Args:
        cfdi: The XML document as text.
        cert: Object exposing ``serial_number``, ``txt`` and ``sign``.

    Returns:
        The signed XML as text (serialized without an XML declaration
        argument, exactly as before).
    """
    root = ET.fromstring(cfdi.encode())
    root.attrib['NoCertificado'] = cert.serial_number
    root.attrib['Certificado'] = cert.txt
    # The stylesheet is compiled inside the ``with`` block so the handle
    # is released even when parsing fails.
    with open(PATH_XSLT, 'rb') as xslt:
        transform = ET.XSLT(ET.parse(xslt))
    cadena = str(transform(root)).encode()
    root.attrib['Sello'] = cert.sign(cadena)
    xml = ET.tostring(root, pretty_print=True, encoding='utf-8')
    return xml.decode()


def _get_files(path, ext='xml'):
    """Return every file under ``path`` whose name ends in ``.<ext>``.

    The comparison ignores case. The extension is escaped and anchored to
    the end of the name, so ``a.xml.bak`` is not reported for ``xml``.
    """
    pattern = re.compile(r'\.{}$'.format(re.escape(ext)), re.IGNORECASE)
    paths = []
    for folder, _, files in os.walk(path):
        paths += [join(folder, f) for f in files if pattern.search(f)]
    return paths


def _read_file(path, encoding='utf-8'):
    """Return the text content of ``path``.

    NOTE(review): a former comment mentioned 'ISO-8859-1' as a Windows
    codec; pass ``encoding`` explicitly if such files must be read.
    """
    with open(path, 'r', encoding=encoding) as f:
        data = f.read()
    if DEBUG:
        msg = f'Archivo leido: {path}'
        log.debug(msg)
    return data


def _save_file(path, target, data):
    """Write ``data`` into ``target`` as ``<source file name>.xml``.

    Returns:
        The path of the file written.
    """
    _, filename = os.path.split(path)
    name, _ = os.path.splitext(filename)
    path_new = join(target, f'{name}.xml')
    # NOTE(review): only a newline is prepended here; presumably an XML
    # declaration was meant to precede it - confirm before changing.
    data = f'\n{data}'
    with open(path_new, 'w', encoding='utf-8') as f:
        f.write(data)
    if DEBUG:
        msg = f'Archivo sellado: {path}'
        log.debug(msg)
    return path_new


def make_cfdi(source, target, dir_cert, nombre):
    """Convert every ``.txt`` file under ``source`` into a signed CFDI.

    Args:
        source: Directory searched recursively for text layouts.
        target: Directory where the XML files are written.
        dir_cert: Directory holding the certificate and key files.
        nombre: Base name of the certificate and key files.
    """
    cert = Cert(dir_cert, nombre)
    for path in _get_files(source, 'txt'):
        data = DataToDict(_read_file(path)).cfdi
        cfdi = DictToCfdi(data).cfdi
        cfdi = stamp_cfdi(cfdi, cert)
        path_xml = _save_file(path, target, cfdi)
        msg = f'CFDI: {path_xml}'
        log.info(msg)


def stamp_pac(source, target):
    """Send every XML file under ``source`` to the PAC.

    Stamped documents are written to ``target`` under the same file name;
    files rejected by the PAC are logged and skipped.
    """
    pac = PAC()
    for path in _get_files(source):
        log.info(f'\tEnviar: {path}')
        filename = os.path.basename(path)
        # Files are written as UTF-8 by this module, so read them the same
        # way, and close the handle deterministically.
        with open(path, 'r', encoding='utf-8') as f:
            data = f.read()
        result = pac.stamp(data, PAC_AUTH)
        if pac.error:
            log.error(pac.error)
            continue
        new_path = join(target, filename)
        with open(new_path, 'w', encoding='utf-8') as f:
            f.write(result['xml'])
        log.info(f'\tTimbrada: {new_path}')