#!/usr/bin/env python3

import base64
import os
import re
import subprocess

import lxml.etree as ET

from settings import DEBUG, log, PATH_XSLT, DELETE_FILES, PAC_AUTH
from helper.comercio import PACComercioDigital as PAC


def _call(args, data=None):
    """Run an external command and return its decoded standard output.

    Args:
        args: The command. A list/tuple is executed directly without a
            shell; a plain string is still handed to the shell so existing
            string-based callers keep working.
        data: Optional bytes written to the command's standard input.

    Raises:
        subprocess.CalledProcessError: If the command exits with an error.
    """
    kwargs = {} if data is None else {'input': data}
    use_shell = isinstance(args, str)
    return subprocess.check_output(args, shell=use_shell, **kwargs).decode()


def join(*paths):
    """Join path components with ``os.path.join``."""
    return os.path.join(*paths)


def kill(path):
    """Delete *path*, ignoring the error when it cannot be removed."""
    try:
        os.remove(path)
    except OSError:
        # Best effort: a missing or locked file is not fatal here.
        pass
    return


class Cert:
    """Certificate (``<name>.cer``) and private key (``<name>.pem``) pair.

    The serial number is obtained through the ``openssl`` executable; the
    same executable is used to sign data with the private key.
    """

    OPENSSL = 'openssl'

    def __init__(self, path, name):
        self._get_data(path, name)

    @property
    def serial_number(self):
        return self._serial_number

    @property
    def txt(self):
        return self._cert_txt

    def _get_data(self, path, name):
        """Locate the files under *path* and cache serial and base64 text."""
        path_cer = join(path, f'{name}.cer')
        self._path_key = join(path, f'{name}.pem')
        self._serial_number = self._get_serial_number(path_cer)
        self._cert_txt = self._get_cert_txt(path_cer)
        return

    def _get_serial_number(self, path_cer):
        """Return every second character of the serial printed by openssl."""
        args = [
            self.OPENSSL, 'x509', '-inform', 'DER',
            '-in', path_cer, '-noout', '-serial',
        ]
        # openssl prints ``serial=<value>``; keep the characters at odd
        # positions of <value>, exactly as the slice [1::2] selects them.
        output = _call(args)
        return output.split('=')[1].split('\n')[0].strip()[1::2]

    def _get_cert_txt(self, path_cer):
        """Return the certificate file encoded as single-line base64."""
        with open(path_cer, 'rb') as f:
            return base64.b64encode(f.read()).decode()

    def sign(self, data):
        """Sign *data* (bytes) with SHA-256 and return the base64 signature.

        The bytes are piped to openssl through stdin instead of being
        interpolated into an ``echo`` shell command, so quotes, ``$``,
        backslashes, etc. in the data are signed verbatim.
        """
        args = [self.OPENSSL, 'dgst', '-sha256', '-sign', self._path_key]
        signature = subprocess.check_output(args, input=data)
        return base64.b64encode(signature).decode()


class DictToCfdi:
    """Build the CFDI XML text from the dictionary made by ``DataToDict``.

    The input dictionary is not modified. The resulting document is
    available through the ``cfdi`` property.
    """

    _PREFIX = 'cfdi'
    _XMLNS = 'http://www.sat.gob.mx/cfd/4'
    _SCHEMA = f'{_XMLNS} http://www.sat.gob.mx/sitio_internet/cfd/4/cfdv40.xsd'
    _XSI = 'http://www.w3.org/2001/XMLSchema-instance'
    _LEYENDAS = {
        'version': '1.0',
        'prefix': 'leyendasFisc',
        'xmlns': 'http://www.sat.gob.mx/leyendasFiscales',
        'schema': ' http://www.sat.gob.mx/leyendasFiscales http://www.sat.gob.mx/sitio_internet/cfd/leyendasFiscales/leyendasFisc.xsd',
    }
    _PAGOS = {
        'version': '2.0',
        'prefix': 'pago20',
        'xmlns': 'http://www.sat.gob.mx/Pagos20',
        'schema': ' http://www.sat.gob.mx/Pagos20 http://www.sat.gob.mx/sitio_internet/cfd/Pagos/Pagos20.xsd',
    }
    _COMERCIO = {
        'version': '1.1',
        'prefix': 'cce11',
        'xmlns': 'http://www.sat.gob.mx/ComercioExterior11',
        'schema': ' http://www.sat.gob.mx/ComercioExterior11 http://www.sat.gob.mx/sitio_internet/cfd/ComercioExterior11/ComercioExterior11.xsd',
    }
    # Client addenda type -> (namespace prefix, namespace URI).
    _CLIENTS = {
        '1': ('PMT', 'http://www.vwnovedades.com/volkswagen/kanseilab/shcp/2009/Addenda/PMT'),
        '2': ('PSV', 'http://www.vwnovedades.com/volkswagen/kanseilab/shcp/2009/Addenda/PSV'),
        '4': ('PMT', 'http://www.sas-automative/en/locations/local-offices-and-plants/mexico/plant-puebla.html'),
    }

    def __init__(self, data):
        self._data = data
        self._cfdi = None
        self._root = None
        self._attr_complementos = {}
        self._node_addenda = None
        self._make_cfdi()

    @property
    def cfdi(self):
        return self._cfdi

    def _tag(self, name, xmlns=None):
        """Return *name* qualified with *xmlns* (CFDI namespace by default)."""
        return f'{{{xmlns or self._XMLNS}}}{name}'

    def _add_group(self, parent, group, item, rows, xmlns=None):
        """Append ``<group>`` holding one ``<item>`` per dict in *rows*.

        Nothing is added when *rows* is empty or missing.
        """
        if not rows:
            return
        node = ET.SubElement(parent, self._tag(group, xmlns))
        tag = self._tag(item, xmlns)
        for row in rows:
            ET.SubElement(node, tag, row)
        return

    def _get_addenda_node(self):
        """Return the ``Addenda`` node, creating it on first use."""
        if self._node_addenda is None:
            self._node_addenda = ET.SubElement(
                self._root, self._tag('Addenda'))
        return self._node_addenda

    def _make_cfdi(self):
        """Create every node in document order and serialize the tree."""
        self._validate_data()
        self._comprobante()
        # InformacionGlobal precedes CfdiRelacionados in the CFDI 4.0
        # schema sequence, so it has to be appended first.
        self._global()
        self._relacionados()
        self._emisor()
        self._receptor()
        self._conceptos()
        self._impuestos()
        self._complementos()
        self._addenda()
        xml = ET.tostring(
            self._root, pretty_print=True,
            xml_declaration=True, encoding='utf-8')
        self._cfdi = xml.decode()
        return

    def _validate_data(self):
        """Collect schema locations and namespaces of the complements used."""
        self._schema = self._SCHEMA
        complementos = self._data.get('complementos') or {}
        known = (
            ('leyendas', self._LEYENDAS),
            ('pagos', self._PAGOS),
            ('comercio', self._COMERCIO),
        )
        for key, spec in known:
            if key not in complementos:
                continue
            self._schema += spec['schema']
            self._attr_complementos[key] = {spec['prefix']: spec['xmlns']}
        return

    def _comprobante(self):
        """Create the root ``Comprobante`` element."""
        attr = self._data['comprobante']
        nsmap = {self._PREFIX: self._XMLNS, 'xsi': self._XSI}
        for value in self._attr_complementos.values():
            nsmap.update(value)
        schema = {ET.QName(self._XSI, 'schemaLocation'): self._schema}
        self._root = ET.Element(
            self._tag('Comprobante'), schema, nsmap=nsmap, **attr)
        return

    def _global(self):
        """Append ``InformacionGlobal`` when the data has a 'global' entry."""
        data = self._data.get('global', {})
        if not data:
            return
        ET.SubElement(self._root, self._tag('InformacionGlobal'), data)
        return

    def _relacionados(self):
        """Append ``CfdiRelacionados`` with one child per related UUID."""
        data = self._data.get('relacionados')
        if not data:
            return
        attr = {'TipoRelacion': data['TipoRelacion']}
        node = ET.SubElement(self._root, self._tag('CfdiRelacionados'), attr)
        tag = self._tag('CfdiRelacionado')
        for uuid in data['UUID']:
            ET.SubElement(node, tag, {'UUID': uuid})
        return

    def _emisor(self):
        """Append the ``Emisor`` node."""
        ET.SubElement(self._root, self._tag('Emisor'), self._data['emisor'])
        return

    def _receptor(self):
        """Append the ``Receptor`` node."""
        ET.SubElement(
            self._root, self._tag('Receptor'), self._data['receptor'])
        return

    def _conceptos(self):
        """Append ``Conceptos`` with each ``Concepto`` and its taxes."""
        node = ET.SubElement(self._root, self._tag('Conceptos'))
        for concepto in self._data['conceptos']:
            # Work on a copy so the caller's dictionary is left untouched.
            attr = dict(concepto)
            attr.pop('complemento', None)
            taxes = attr.pop('impuestos', {})
            node_product = ET.SubElement(node, self._tag('Concepto'), attr)
            if not taxes:
                continue
            node_taxes = ET.SubElement(node_product, self._tag('Impuestos'))
            self._add_group(
                node_taxes, 'Traslados', 'Traslado', taxes.get('traslados'))
            self._add_group(
                node_taxes, 'Retenciones', 'Retencion',
                taxes.get('retenciones'))
        return

    def _impuestos(self):
        """Append the document-level ``Impuestos`` node."""
        taxes = dict(self._data.get('impuestos') or {})
        if not taxes:
            return
        retenciones = taxes.pop('retenciones', ())
        traslados = taxes.pop('traslados', ())
        # What remains in ``taxes`` are the totals (node attributes).
        node = ET.SubElement(self._root, self._tag('Impuestos'), taxes)
        self._add_group(node, 'Retenciones', 'Retencion', retenciones)
        self._add_group(node, 'Traslados', 'Traslado', traslados)
        return

    def _complementos(self):
        """Append ``Complemento`` with every complement present in the data."""
        complementos = self._data.get('complementos')
        if not complementos:
            return
        node = ET.SubElement(self._root, self._tag('Complemento'))
        # Independent checks, mirroring the namespaces declared in
        # _validate_data for each complement.
        if 'leyendas' in complementos:
            self._complemento_leyendas(complementos['leyendas'], node)
        if 'pagos' in complementos:
            self._complemento_pagos(complementos['pagos'], node)
        if 'comercio' in complementos:
            self._complemento_comercio(complementos['comercio'], node)
        return

    def _complemento_leyendas(self, data, node):
        """Append ``LeyendasFiscales`` with one ``Leyenda`` per item."""
        xmlns = self._LEYENDAS['xmlns']
        attr = {'version': self._LEYENDAS['version']}
        node_leyendas = ET.SubElement(
            node, self._tag('LeyendasFiscales', xmlns), attr)
        tag = self._tag('Leyenda', xmlns)
        for leyenda in data:
            ET.SubElement(node_leyendas, tag, leyenda)
        return

    def _complemento_pagos(self, data, node):
        """Append the ``Pagos`` complement (totals, payment, documents, taxes)."""
        xmlns = self._PAGOS['xmlns']
        data = dict(data)  # do not consume the caller's dictionary
        pago = data.pop('pago')
        docs = data.pop('docs')
        taxesd = data.pop('taxesd')
        taxes = data.pop('taxes')
        attr = {'Version': data.pop('Version')}
        node_pagos = ET.SubElement(node, self._tag('Pagos', xmlns), attr)
        # Everything left in ``data`` belongs to the Totales node.
        ET.SubElement(node_pagos, self._tag('Totales', xmlns), data)
        node_pago = ET.SubElement(node_pagos, self._tag('Pago', xmlns), pago)

        for i, doc in enumerate(docs):
            node_doc = ET.SubElement(
                node_pago, self._tag('DoctoRelacionado', xmlns), doc)
            # A document may come without a tax row of its own.
            doc_taxes = taxesd[i] if i < len(taxesd) else {}
            if not doc_taxes:
                continue
            node_taxes_doc = ET.SubElement(
                node_doc, self._tag('ImpuestosDR', xmlns))
            self._add_group(
                node_taxes_doc, 'RetencionesDR', 'RetencionDR',
                doc_taxes.get('retenciones'), xmlns)
            self._add_group(
                node_taxes_doc, 'TrasladosDR', 'TrasladoDR',
                doc_taxes.get('traslados'), xmlns)

        # Only emit ImpuestosP when the payment actually carries taxes.
        retenciones = taxes.get('retenciones')
        traslados = taxes.get('traslados')
        if retenciones or traslados:
            node_taxes = ET.SubElement(
                node_pago, self._tag('ImpuestosP', xmlns))
            self._add_group(
                node_taxes, 'RetencionesP', 'RetencionP', retenciones, xmlns)
            self._add_group(
                node_taxes, 'TrasladosP', 'TrasladoP', traslados, xmlns)
        return

    def _complemento_comercio(self, data, node):
        """Append the ``ComercioExterior`` complement."""
        xmlns = self._COMERCIO['xmlns']
        data = dict(data)  # do not consume the caller's dictionary
        mercancias = data.pop('mercancias', {})
        emisor = dict(data.pop('emisor', {}))
        propietario = data.pop('propietario', {})
        receptor = dict(data.pop('receptor', {}))
        destinatario = dict(data.pop('destinatario', {}))

        node_comercio = ET.SubElement(
            node, self._tag('ComercioExterior', xmlns), data)
        tag_domicilio = self._tag('Domicilio', xmlns)

        if emisor:
            attr = {}
            if 'Curp' in emisor:
                attr = {'Curp': emisor.pop('Curp')}
            node_emisor = ET.SubElement(
                node_comercio, self._tag('Emisor', xmlns), attr)
            ET.SubElement(node_emisor, tag_domicilio, emisor)

        if propietario:
            ET.SubElement(
                node_comercio, self._tag('Propietario', xmlns), propietario)

        if receptor:
            attr = {}
            if 'NumRegIdTrib' in receptor:
                attr = {'NumRegIdTrib': receptor.pop('NumRegIdTrib')}
            node_receptor = ET.SubElement(
                node_comercio, self._tag('Receptor', xmlns), attr)
            ET.SubElement(node_receptor, tag_domicilio, receptor)

        if destinatario:
            attr = {}
            if 'NumRegIdTrib' in destinatario:
                attr['NumRegIdTrib'] = destinatario.pop('NumRegIdTrib')
            if 'Nombre' in destinatario:
                attr['Nombre'] = destinatario.pop('Nombre')
            node_destinatario = ET.SubElement(
                node_comercio, self._tag('Destinatario', xmlns), attr)
            ET.SubElement(node_destinatario, tag_domicilio, destinatario)

        if mercancias:
            node_mercancias = ET.SubElement(
                node_comercio, self._tag('Mercancias', xmlns))
            for mercancia in mercancias:
                mercancia = dict(mercancia)
                description = mercancia.pop('description', {})
                node_mercancia = ET.SubElement(
                    node_mercancias, self._tag('Mercancia', xmlns), mercancia)
                if description:
                    ET.SubElement(
                        node_mercancia,
                        self._tag('DescripcionesEspecificas', xmlns),
                        description)
        return

    def _addenda(self):
        """Append the Boveda addenda and, when requested, the client one."""
        addenda = self._data.get('addenda', {})
        type_boveda = addenda.get('type', '')
        type_client = addenda.get('type_client', '')

        self._boveda(
            type_boveda, addenda.get('boveda', False),
            addenda.get('lotes', ()))

        if type_client:
            self._addenda_client(
                type_client, addenda.get('cliente', False),
                addenda.get('partes', ()))
        return

    def _boveda(self, type_boveda, data, lotes):
        """Append the ``BOVEDAFISCAL`` addenda for types '01' and '02'."""
        if not data:
            return

        XMLNS = 'http://kontender.mx/namespace/boveda'
        NSMAP = {
            'bovadd': 'http://kontender.mx/namespace/boveda',
            'kon': 'http://kontender.mx/namespace',
        }
        schema = 'http://kontender.mx/namespace/boveda http://kontender.mx/namespace/boveda/BOVEDAFISCAL.xsd http://kontender.mx/namespace http://kontender.mx/namespace/AddendaK.xsd'
        attr = {ET.QName(self._XSI, 'schemaLocation'): schema}
        node = ET.SubElement(
            self._get_addenda_node(), self._tag('BOVEDAFISCAL', XMLNS),
            attr, nsmap=NSMAP)

        if type_boveda == '01':
            # Type 01: every entry is a node whose value is its attributes.
            for k, v in data.items():
                ET.SubElement(node, self._tag(k, XMLNS), v)
            if lotes:
                node_lotes = ET.SubElement(node, self._tag('Lotes', XMLNS))
                for lote in lotes:
                    node_lote = ET.SubElement(
                        node_lotes, self._tag('Lote', XMLNS))
                    for k2, v2 in lote.items():
                        sn = ET.SubElement(node_lote, self._tag(k2, XMLNS))
                        sn.text = v2
        elif type_boveda == '02':
            # Type 02: every entry is a node whose value is its text.
            for k, v in data.items():
                n = ET.SubElement(node, self._tag(k, XMLNS))
                n.text = v
        return

    def _addenda_client(self, type_client, data, partes):
        """Append the client addenda (``Factura`` and optional ``Partes``)."""
        if not data:
            return

        if type_client not in self._CLIENTS:
            log.error(f'Tipo de addenda de cliente desconocido: {type_client}')
            return

        prefix, xmlns = self._CLIENTS[type_client]
        nsmap = {prefix: xmlns}

        data = dict(data)  # do not consume the caller's dictionary
        attr = data.pop('Factura')
        # The Addenda node may not exist yet when there is no Boveda data.
        node = ET.SubElement(
            self._get_addenda_node(), self._tag('Factura', xmlns),
            attr, nsmap=nsmap)

        for key, attr in data.items():
            ET.SubElement(node, self._tag(key, xmlns), attr)

        if not partes:
            return

        node = ET.SubElement(node, self._tag('Partes', xmlns))
        for parte in partes:
            parte = dict(parte)
            referencias = parte.pop('referencias')
            sub_node = ET.SubElement(node, self._tag('Parte', xmlns), parte)
            ET.SubElement(
                sub_node, self._tag('Referencias', xmlns), referencias)
        return


class DataToDict:
    """Parse the pipe-delimited text layout into the CFDI dictionary.

    Each line starts with a two-character record code (see ``NODES``); the
    first two fields of the line are dropped and the rest is handed to the
    method mapped to that code. The result is available through ``cfdi``.
    """

    TRASLADO = 'T'
    RETENCION = 'R'
    NODES = {
        '01': '_comprobante',
        '02': '_relacionados',
        '03': '_emisor',
        '04': '_receptor',
        '05': '_conceptos',
        '06': '_impuestos',
        '07': '_global',
        '10': '_leyendas',
        '11': '_complemento',
        '12': '_complemento_12',
        '13': '_complemento_13',
        '14': '_complemento_14',
        '15': '_complemento_15',
        '16': '_complemento_16',
        '17': '_complemento_17',
        '49': '_addenda_lotes',
        '50': '_boveda',
        '51': '_addenda',
        '52': '_addenda_partes',
    }

    def __init__(self, data):
        self._data = data
        self._cfdi = {
            'conceptos': [],
            'impuestos': {},
            'complementos': {},
            'addenda': {},
        }
        self._complement = ''
        self._type_header = ''
        self._partes = []
        self._lotes = []
        self._ce_mercancias = []
        self._process_data()

    @property
    def cfdi(self):
        return self._cfdi

    def _process_data(self):
        """Dispatch every line to its handler and attach collected lists."""
        for line in self._data.split('\n'):
            # Files with CRLF line endings would otherwise leave a stray
            # carriage return in the last field of every record.
            parts = line.rstrip('\r').split('|')
            if not parts[0]:
                continue
            header = self.NODES.get(parts[0], '')
            if not header:
                log.debug(f'No existe: {parts[0]}')
                continue
            if hasattr(self, header):
                getattr(self, header)(parts[2:])

        self._cfdi['addenda']['partes'] = self._partes
        self._cfdi['addenda']['lotes'] = self._lotes
        if self._ce_mercancias:
            self._cfdi['complementos']['comercio']['mercancias'] = \
                self._ce_mercancias
        return

    def _fields_to_dict(self, fields, data):
        """Map *fields* to the values of *data* by position, skipping blanks.

        Raises:
            IndexError: If *data* has fewer items than *fields*.
        """
        return {
            field: data[index]
            for index, field in enumerate(fields) if data[index]
        }

    def _comprobante(self, data):
        fields = (
            'Version',
            'Serie',
            'Folio',
            'Fecha',
            'FormaPago',
            'CondicionesDePago',
            'SubTotal',
            'Descuento',
            'Moneda',
            'TipoCambio',
            'Total',
            'TipoDeComprobante',
            'MetodoPago',
            'LugarExpedicion',
            'Confirmacion',
            'Exportacion',
        )
        self._cfdi['comprobante'] = self._fields_to_dict(fields, data)
        return

    def _relacionados(self, data):
        """Store the relation type and the non-empty related UUIDs."""
        self._cfdi['relacionados'] = {}
        if data and data[0]:
            self._cfdi['relacionados']['TipoRelacion'] = data[0]
            # Blank trailing fields are not UUIDs.
            self._cfdi['relacionados']['UUID'] = [u for u in data[1:] if u]
        return

    def _emisor(self, data):
        # Unlike the other records, blank values are kept here.
        fields = (
            'Rfc',
            'Nombre',
            'RegimenFiscal',
        )
        self._cfdi['emisor'] = {
            field: data[index] for index, field in enumerate(fields)
        }
        return

    def _receptor(self, data):
        fields = (
            'Rfc',
            'Nombre',
            'DomicilioFiscalReceptor',
            'ResidenciaFiscal',
            'NumRegIdTrib',
            'RegimenFiscalReceptor',
            'UsoCFDI',
        )
        self._cfdi['receptor'] = self._fields_to_dict(fields, data)
        return

    def _get_taxes_by_concept(self, data):
        """Group the six-field tax records of a concept by tax type."""
        taxes = {}
        traslados = []
        retenciones = []
        for i in range(0, len(data), 6):
            type_tax = data[i]
            # Skip unknown/blank groups before indexing their fields.
            if type_tax not in (self.TRASLADO, self.RETENCION):
                continue
            tax = {
                'Base': data[i + 1],
                'Impuesto': data[i + 2],
                'TipoFactor': data[i + 3],
            }
            if data[i + 4]:
                tax['TasaOCuota'] = data[i + 4]
            if data[i + 5]:
                tax['Importe'] = data[i + 5]

            if type_tax == self.TRASLADO:
                traslados.append(tax)
            else:
                retenciones.append(tax)

        if traslados:
            taxes['traslados'] = traslados
        if retenciones:
            taxes['retenciones'] = retenciones
        return taxes

    def _conceptos(self, data):
        fields = (
            'ClaveProdServ',
            'NoIdentificacion',
            'Cantidad',
            'ClaveUnidad',
            'Unidad',
            'Descripcion',
            'ValorUnitario',
            'Importe',
            'Descuento',
            'ObjetoImp',
        )
        concepto = self._fields_to_dict(fields, data)
        # After the fixed fields: one 'pedimento' field, then the taxes.
        count = len(fields)
        pedimento = data[count]
        if pedimento:
            concepto['pedimento'] = pedimento
        concepto['impuestos'] = self._get_taxes_by_concept(data[count + 1:])
        self._cfdi['conceptos'].append(concepto)
        return

    def _get_taxes(self, data):
        """Store the document-level six-field tax records by tax type."""
        traslados = []
        retenciones = []
        for i in range(0, len(data), 6):
            type_tax = data[i]
            if type_tax == self.TRASLADO:
                tax = {
                    'Base': data[i + 1],
                    'Impuesto': data[i + 2],
                    'TipoFactor': data[i + 3],
                    'TasaOCuota': data[i + 4],
                    'Importe': data[i + 5],
                }
                traslados.append(tax)
            elif type_tax == self.RETENCION:
                tax = {
                    'Impuesto': data[i + 2],
                    'Importe': data[i + 5],
                }
                retenciones.append(tax)
        if traslados:
            self._cfdi['impuestos']['traslados'] = traslados
        if retenciones:
            self._cfdi['impuestos']['retenciones'] = retenciones
        return

    def _impuestos(self, data):
        fields = (
            'TotalImpuestosRetenidos',
            'TotalImpuestosTrasladados',
        )
        self._cfdi['impuestos'].update(self._fields_to_dict(fields, data))
        self._get_taxes(data[len(fields):])
        return

    def _global(self, data):
        fields = (
            'Periodicidad',
            'Meses',
            'Año',
        )
        self._cfdi['global'] = self._fields_to_dict(fields, data)
        return

    def _leyendas(self, data):
        if not data:
            return

        leyendas = []
        for i in range(0, len(data), 3):
            leyenda = {
                'disposicionFiscal': data[i],
                'norma': data[i + 1],
                'textoLeyenda': data[i + 2],
            }
            leyendas.append(leyenda)
        self._cfdi['complementos']['leyendas'] = leyendas
        return

    def _complemento(self, data):
        """Record which complement follows: '1' payments, '2' foreign trade."""
        if not data:
            return

        self._complement = data[0]
        version = {'Version': data[1]}
        if self._complement == '1':
            self._cfdi['complementos']['pagos'] = version
            self._cfdi['complementos']['pagos']['docs'] = []
            self._cfdi['complementos']['pagos']['taxes'] = {}
            self._cfdi['complementos']['pagos']['taxesd'] = []
        elif self._complement == '2':
            self._cfdi['complementos']['comercio'] = version
        return

    def _complemento_12(self, data):
        if not data:
            return

        if self._complement == '1':
            fields = (
                'TotalRetencionesIVA',
                'TotalRetencionesISR',
                'TotalRetencionesIEPS',
                'TotalTrasladosBaseIVA16',
                'TotalTrasladosImpuestoIVA16',
                'TotalTrasladosBaseIVA8',
                'TotalTrasladosImpuestoIVA8',
                'TotalTrasladosBaseIVA0',
                'TotalTrasladosImpuestoIVA0',
                'TotalTrasladosBaseIVAExento',
                'MontoTotalPagos',
            )
            attr = self._fields_to_dict(fields, data)
            self._cfdi['complementos']['pagos'].update(attr)
        elif self._complement == '2':
            fields = (
                'MotivoTraslado',
                'TipoOperacion',
                'ClaveDePedimento',
                'CertificadoOrigen',
                'NumCertificadoOrigen',
                'NumeroExportadorConfiable',
                'Incoterm',
                'Subdivision',
                'Observaciones',
                'TipoCambioUSD',
                'TotalUSD',
            )
            attr = self._fields_to_dict(fields, data)
            self._cfdi['complementos']['comercio'].update(attr)
        return

    def _complemento_13(self, data):
        if not data:
            return

        if self._complement == '1':
            fields = (
                'FechaPago',
                'FormaDePagoP',
                'MonedaP',
                'TipoCambioP',
                'Monto',
                'NumOperacion',
                'RfcEmisorCtaOrd',
                'NomBancoOrdExt',
                'CtaOrdenante',
                'RfcEmisorCtaBen',
                'CtaBeneficiario',
                'TipoCadPago',
                'CertPago',
                'CadPago',
                'SelloPago',
            )
            attr = self._fields_to_dict(fields, data)
            self._cfdi['complementos']['pagos']['pago'] = attr
        elif self._complement == '2':
            fields = (
                'Curp',
                'Calle',
                'NumeroExterior',
                'NumeroInterior',
                'Colonia',
                'Localidad',
                'Referencia',
                'Municipio',
                'Estado',
                'Pais',
                'CodigoPostal',
            )
            attr = self._fields_to_dict(fields, data)
            self._cfdi['complementos']['comercio']['emisor'] = attr
        return

    def _get_retenciones_by_pay(self, data):
        """Read the two-field withholding records of a payment."""
        retenciones = []
        for i in range(0, len(data), 2):
            tax = {
                'ImpuestoP': data[i],
                'ImporteP': data[i + 1],
            }
            retenciones.append(tax)
        return retenciones

    def _complemento_14(self, data):
        if not data:
            return

        if self._complement == '1':
            attr = self._get_retenciones_by_pay(data)
            self._cfdi['complementos']['pagos']['taxes']['retenciones'] = attr
        elif self._complement == '2':
            fields = ('NumRegIdTrib', 'ResidenciaFiscal')
            attr = self._fields_to_dict(fields, data)
            if attr:
                self._cfdi['complementos']['comercio']['propietario'] = attr
        return

    def _get_traslados_by_pay(self, data):
        """Read the five-field transferred-tax records of a payment."""
        traslados = []
        for i in range(0, len(data), 5):
            tax = {
                'BaseP': data[i],
                'ImpuestoP': data[i + 1],
                'TipoFactorP': data[i + 2],
                'TasaOCuotaP': data[i + 3],
                'ImporteP': data[i + 4],
            }
            traslados.append(tax)
        return traslados

    def _complemento_15(self, data):
        if not data:
            return

        if self._complement == '1':
            attr = self._get_traslados_by_pay(data)
            self._cfdi['complementos']['pagos']['taxes']['traslados'] = attr
        elif self._complement == '2':
            fields = (
                'NumRegIdTrib',
                'Calle',
                'NumeroExterior',
                'NumeroInterior',
                'Colonia',
                'Localidad',
                'Referencia',
                'Municipio',
                'Estado',
                'Pais',
                'CodigoPostal',
            )
            attr = self._fields_to_dict(fields, data)
            self._cfdi['complementos']['comercio']['receptor'] = attr
        return

    def _complemento_16(self, data):
        if not data:
            return

        if self._complement == '1':
            fields = (
                'IdDocumento',
                'Serie',
                'Folio',
                'MonedaDR',
                'EquivalenciaDR',
                'NumParcialidad',
                'ImpSaldoAnt',
                'ImpPagado',
                'ImpSaldoInsoluto',
                'ObjetoImpDR',
            )
            attr = self._fields_to_dict(fields, data)
            self._cfdi['complementos']['pagos']['docs'].append(attr)
        elif self._complement == '2':
            fields = (
                'NumRegIdTrib',
                'Nombre',
                'Calle',
                'NumeroExterior',
                'NumeroInterior',
                'Colonia',
                'Localidad',
                'Referencia',
                'Municipio',
                'Estado',
                'Pais',
                'CodigoPostal',
            )
            attr = self._fields_to_dict(fields, data)
            self._cfdi['complementos']['comercio']['destinatario'] = attr
        return

    def _get_taxes_by_doc(self, data):
        """Group the six-field tax records of a related document by type."""
        taxes = {}
        traslados = []
        retenciones = []
        for i in range(0, len(data), 6):
            type_tax = data[i]
            # Skip unknown/blank groups before indexing their fields.
            if type_tax not in (self.TRASLADO, self.RETENCION):
                continue
            tax = {
                'BaseDR': data[i + 1],
                'ImpuestoDR': data[i + 2],
                'TipoFactorDR': data[i + 3],
                'TasaOCuotaDR': data[i + 4],
                'ImporteDR': data[i + 5],
            }

            if type_tax == self.TRASLADO:
                traslados.append(tax)
            else:
                retenciones.append(tax)

        if traslados:
            taxes['traslados'] = traslados
        if retenciones:
            taxes['retenciones'] = retenciones
        return taxes

    def _complemento_17(self, data):
        if not data:
            return

        if self._complement == '1':
            attr = self._get_taxes_by_doc(data)
            self._cfdi['complementos']['pagos']['taxesd'].append(attr)
        elif self._complement == '2':
            fields = (
                'NoIdentificacion',
                'FraccionArancelaria',
                'CantidadAduana',
                'UnidadAduana',
                'ValorUnitarioAduana',
                'ValorDolares',
            )
            mercancia = self._fields_to_dict(fields, data)

            fields = (
                'Marca',
                'Modelo',
                'SubModelo',
                'NumeroSerie',
            )
            description = self._fields_to_dict(fields, data[6:])
            if description:
                mercancia['description'] = description
            self._ce_mercancias.append(mercancia)
        return

    def _boveda(self, data):
        """Parse the Boveda addenda record; its first field is the type."""
        type_addenda = data[0]
        if type_addenda == '01':
            # Each tuple is (node name, attribute names...); the attribute
            # values are consumed from the record in this same order.
            fields = (
                ('ImporteLetra', 'importe'),
                ('UsoCFDI', 'UsoCFDI'),
                ('MetodosPago', 'MetodoPagoSAT'),
                ('FormaPago', 'FormaPagoSAT'),
                ('TipoDoctoElectronico', 'TipoDocumento'),
                ('BovedaFiscal', 'almacen', 'condicion', 'correoEmisor',
                    'correoReceptor', 'numeroCliente', 'razonSocialCliente',
                    'tipo'),
                ('DireccionEmisor', 'Calle', 'CodigoPostal', 'Colonia',
                    'Estado', 'Localidad', 'Municipio', 'NoExterior',
                    'NoInterior', 'Pais', 'Referencia', 'Telefono'),
                ('DireccionSucursal', 'Calle', 'Ciudad', 'CodigoPostal',
                    'Colonia', 'Estado', 'Localidad', 'Municipio',
                    'NoExterior', 'NoInterior', 'Pais', 'Referencia'),
                ('DireccionReceptor', 'Calle', 'Ciudad', 'CodigoPostal',
                    'Colonia', 'Delegacion', 'Estado', 'Localidad',
                    'Municipio', 'NoExterior', 'NoInterior', 'Pais',
                    'Referencia'),
                ('DireccionReceptorSucursal', 'Nombre', 'Calle', 'Ciudad',
                    'CodigoPostal', 'Estado', 'Pais', 'Comentario',
                    'Dato01', 'Dato02', 'Dato03', 'Dato04', 'Dato05',
                    'Dato06', 'Dato07', 'Dato08', 'Dato09', 'Dato10'),
                ('NombreComercial', 'Nombre'),
                ('ClaveTipoFolio', 'clave'),
                ('TR', 'transaccion'),
                ('OrdenCompra', 'folio'),
                ('NotaDeVenta', 'folio'),
            )
            boveda = {}
            i = 1
            for f in fields:
                k = f[0]
                attr = {}
                for a in f[1:]:
                    try:
                        attr[a] = data[i]
                    except IndexError:
                        log.error('Faltan datos en addenda Boveda')
                        attr[a] = ''
                    i += 1
                boveda[k] = attr
        elif type_addenda == '02':
            fields = (
                'Razon_Social_destino',
                'Calle_Destino',
                'Colonia_Destino',
                'Ciudad_Destino',
                'Estado_Destino',
                'Pais_Destino',
                'CP_Destino_consigan',
                'RFC_Destino_consigna',
                'Telefono_Receptor',
                'Peso_Bruto',
                'Peso_Neto',
                'Incoterm',
                'leyenda_pie',
                'R.vto',
                'TIPO_CAMBIO_FACTURA',
                'R.cte',
                'RI_Solicitante',
                'R.fefa',
                'Razon_Social_facturado',
                'Calle_facturado',
                'Colonia_facturado',
                'RFC_destino',
                'Telefono_facturado',
                'NUMCTAPAGO',
            )
            boveda = {}
            for i, f in enumerate(fields):
                boveda[f] = data[i + 1]
        else:
            # Unknown type: nothing to store (previously an unbound name).
            log.error(f'Tipo de addenda Boveda desconocido: {type_addenda}')
            return

        self._cfdi['addenda']['type'] = type_addenda
        self._cfdi['addenda']['boveda'] = boveda
        return

    def _addenda(self, header):
        """Parse the client addenda header; its second field is the type."""
        self._type_header = header[1]
        self._cfdi['addenda']['type_client'] = self._type_header
        data = {}
        if self._type_header == '2':
            data['Factura'] = {
                'version': header[3],
                'tipoDocumentoFiscal': header[4],
                'tipoDocumentoVWM': header[5],
                'division': header[6]}
            data['Moneda'] = {
                'tipoMoneda': header[7],
                'tipoCambio': header[8]}
            data['Proveedor'] = {
                'codigo': header[10],
                'nombre': header[11],
                'correoContacto': header[12]}
            data['Origen'] = {'codigo': header[13]}
            data['Destino'] = {
                'codigo': header[14],
                'naveReciboMaterial': header[15]}
            data['Referencias'] = {'referenciaProveedor': header[16]}
            data['Solicitante'] = {'correo': header[17]}
            data['Archivo'] = {'datos': header[20], 'tipo': 'ZIP'}
        elif self._type_header == '4':
            data['Factura'] = {'version': header[3]}
            data['Moneda'] = {'tipoMoneda': header[7]}
            data['Proveedor'] = {'codigo': header[10], 'nombre': header[11]}
            data['Referencias'] = {'referenciaProveedor': header[16]}

        self._cfdi['addenda']['cliente'] = data
        return

    def _addenda_lotes(self, lote):
        attr = {
            'IVM_InvoiceID': '',
            'POline': lote[0],
            'NoIdentificacion': lote[1],
            'Lote': lote[2],
            'Cantidad': lote[3],
            'Fecha': lote[4],
        }
        self._lotes.append(attr)
        return

    def _addenda_partes(self, parte):
        """Parse one part record using the layout of the current client type."""
        if self._type_header == '2':
            attr = {
                'posicion': parte[0],
                'numeroMaterial': parte[1],
                'descripcionMaterial': parte[2],
                'cantidadMaterial': parte[3],
                'unidadMedida': parte[4],
                'precioUnitario': parte[5],
                'montoLinea': parte[6],
                'codigoImpuesto': parte[8],
                'referencias': {
                    'ordenCompra': parte[7],
                },
            }
        elif self._type_header == '4':
            attr = {
                'posicion': parte[0],
                'numeroMaterial': parte[1],
                'descripcionMaterial': parte[2],
                'referencias': {
                    'ordenCompra': parte[7],
                    'numeroPKN': parte[8],
                    'numeroASN': parte[9],
                },
            }
        else:
            # No layout for this client type (previously an unbound name).
            log.error(
                f'Tipo de addenda de cliente desconocido: {self._type_header}')
            return
        self._partes.append(attr)
        return


def stamp_cfdi(cfdi, cert):
    """Add certificate data and the signature (``Sello``) to a CFDI.

    Args:
        cfdi: The CFDI XML as text.
        cert: A ``Cert`` providing ``serial_number``, ``txt`` and ``sign``.

    Returns:
        The signed XML as text.
    """
    root = ET.fromstring(cfdi.encode())
    root.attrib['NoCertificado'] = cert.serial_number
    root.attrib['Certificado'] = cert.txt

    # The stylesheet produces the string that gets signed.
    with open(PATH_XSLT, 'rb') as xslt:
        transfor = ET.XSLT(ET.parse(xslt))
    cadena = str(transfor(root)).encode()
    root.attrib['Sello'] = cert.sign(cadena)

    xml = ET.tostring(root, pretty_print=True, encoding='utf-8')
    return xml.decode()


def _get_files(path, ext='xml'):
    """Return the files under *path* (recursively) whose name ends in *ext*.

    The comparison is case-insensitive and anchored to the end of the name,
    so ``a.txt.bak`` is not returned for ``ext='txt'``.
    """
    pattern = re.compile(rf'\.{re.escape(ext)}$', re.IGNORECASE)
    paths = []
    for folder, _, files in os.walk(path):
        paths += [join(folder, f) for f in files if pattern.search(f)]
    return paths


def _read_file(path, encoding='utf-8'):
    """Return the text content of *path*."""
    # ~ CODEC_WIN = 'ISO-8859-1'
    with open(path, 'r', encoding=encoding) as f:
        data = f.read()
    if DEBUG:
        msg = f'Archivo leido: {path}'
        log.debug(msg)
    return data


def _save_file(path, target, data):
    """Write *data* to ``<target>/<stem of path>.xml`` and return that path."""
    _, filename = os.path.split(path)
    name, _ = os.path.splitext(filename)
    path_new = join(target, f'{name}.xml')

    data = f'\n{data}'

    with open(path_new, 'w', encoding='utf-8') as f:
        f.write(data)

    if DEBUG:
        msg = f'Archivo sellado: {path}'
        log.debug(msg)

    return path_new


def make_cfdi(source, target, dir_cert, nombre):
    """Convert every ``.txt`` layout under *source* into a signed XML.

    Args:
        source: Folder searched recursively for layout files.
        target: Folder where the XML files are written.
        dir_cert: Folder holding ``<nombre>.cer`` and ``<nombre>.pem``.
        nombre: Base name of the certificate files.
    """
    cert = Cert(dir_cert, nombre)
    paths = _get_files(source, 'txt')
    for path in paths:
        # ~ _version33(path, target)
        # ~ continue
        data = _read_file(path)
        data = DataToDict(data).cfdi
        cfdi = DictToCfdi(data).cfdi
        cfdi = stamp_cfdi(cfdi, cert)
        path_xml = _save_file(path, target, cfdi)
        msg = f'CFDI: {path_xml}'
        log.info(msg)
    return


def stamp_pac(source, target):
    """Send every XML under *source* to the PAC and save the stamped result.

    Files the PAC reports an error for are logged and skipped.
    """
    pac = PAC()
    paths = _get_files(source)
    for path in paths:
        log.info(f'\tEnviar: {path}')
        _, filename = os.path.split(path)
        # The XML files are written as UTF-8 by _save_file.
        with open(path, 'r', encoding='utf-8') as f:
            data = f.read()
        result = pac.stamp(data, PAC_AUTH)
        if pac.error:
            log.error(pac.error)
            continue

        new_path = f'{target}/{filename}'
        with open(new_path, 'w', encoding='utf-8') as f:
            f.write(result['xml'])
        log.info(f'\tTimbrada: {new_path}')
    return