#!/usr/bin/env python # ~ # ~ PAC # ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net # ~ # ~ This program is free software: you can redistribute it and/or modify # ~ it under the terms of the GNU General Public License as published by # ~ the Free Software Foundation, either version 3 of the License, or # ~ (at your option) any later version. # ~ # ~ This program is distributed in the hope that it will be useful, # ~ but WITHOUT ANY WARRANTY; without even the implied warranty of # ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # ~ GNU General Public License for more details. # ~ # ~ You should have received a copy of the GNU General Public License # ~ along with this program. If not, see . import base64 import datetime import logging import os import re from io import BytesIO from xml.sax.saxutils import unescape import lxml.etree as ET from zeep import Client from zeep.plugins import Plugin from zeep.cache import SqliteCache from zeep.transports import Transport from zeep.exceptions import Fault, TransportError from requests.exceptions import ConnectionError from .conf import DEBUG, AUTH LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' LOG_DATE = '%d/%m/%Y %H:%M:%S' logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m') logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m') logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m') logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE) log = logging.getLogger(__name__) logging.getLogger('requests').setLevel(logging.ERROR) logging.getLogger('zeep').setLevel(logging.ERROR) TIMEOUT = 10 DEBUG_SOAP = True class DebugPlugin(Plugin): def _to_string(self, envelope, name): if DEBUG_SOAP: data = ET.tostring(envelope, pretty_print=True, encoding='utf-8').decode() path = f'/tmp/soap_{name}.xml' with open(path, 'w') as f: f.write(data) return def egress(self, envelope, http_headers, operation, binding_options): self._to_string(envelope, 'request') return envelope, http_headers def ingress(self, envelope, http_headers, operation): self._to_string(envelope, 'response') return envelope, http_headers class PACFinkok(object): WS = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl' if DEBUG: WS = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl' URL = { 'quick_stamp': False, 'timbra': WS.format('stamp'), 'cancel': WS.format('cancel'), 'client': WS.format('registration'), 'util': WS.format('utilities'), } CODE = { '200': 'Comprobante timbrado satisfactoriamente', '205': 'No Encontrado', '307': 'Comprobante timbrado previamente', '702': 'No se encontro el RFC del emisor', 'IP': 'Invalid Passphrase', 'IPMSG': 'Frase de paso inválida', 'NE': 'No Encontrado', } def __init__(self): self._error = '' self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT) self._plugins = [DebugPlugin()] @property def error(self): return self._error def _validate_result(self, result): if hasattr(result, 'CodEstatus'): ce = result.CodEstatus if ce is None: return result if ce == self.CODE['IP']: self._error = self.CODE['IPMSG'] return {} if self.CODE['NE'] in ce: self._error = 'UUID ' + self.CODE['NE'] return {} if self.CODE['200'] != ce: log.error('CodEstatus', type(ce), ce) return result if hasattr(result, 'Incidencias'): fault = result.Incidencias.Incidencia[0] cod_error = fault.CodigoError.encode('utf-8') msg_error = fault.MensajeIncidencia.encode('utf-8') error = 'Error: {}\n{}'.format(cod_error, msg_error) self._error = self.CODE.get(cod_error, error) return {} return result def _get_result(self, client, method, args): self._error = '' try: result = getattr(client.service, method)(**args) except Fault as e: self._error = str(e) return {} except TransportError as e: if '413' in str(e): self._error = '413

Documento muy grande para timbrar' else: self._error = str(e) return {} except ConnectionError as e: msg = '502 - Error de conexión' self._error = msg return {} return self._validate_result(result) def _to_string(self, data): root = ET.parse(BytesIO(data.encode('utf-8'))).getroot() xml = ET.tostring(root, pretty_print=True, xml_declaration=True, encoding='utf-8') return xml.decode('utf-8') def stamp(self, cfdi, auth={}): if DEBUG or not auth: auth = AUTH method = 'timbra' client = Client(self.URL[method], transport=self._transport, plugins=self._plugins) args = { 'username': auth['user'], 'password': auth['pass'], 'xml': cfdi.encode('utf-8'), } result = self._get_result(client, 'stamp', args) if self.error: log.error(self.error) return '' data = { 'xml': self._to_string(result.xml), 'uuid': result.UUID, 'date': result.Fecha, } return data def _get_data_cancel(self, cfdi): NS_CFDI = { 'cfdi': 'http://www.sat.gob.mx/cfd/3', 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital', } tree = ET.fromstring(cfdi.encode()) rfc_emisor = tree.xpath( 'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)', namespaces=NS_CFDI) cfdi_uuid = tree.xpath( 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)', namespaces=NS_CFDI) return rfc_emisor, cfdi_uuid def cancel(self, cfdi, info, auth={}): if not auth: auth = AUTH rfc_emisor, cfdi_uuid = self._get_data_cancel(cfdi) method = 'cancel' client = Client(self.URL[method], transport=self._transport, plugins=self._plugins) uuid_type = client.get_type('ns1:UUIDS') sa = client.get_type('ns0:stringArray') args = { 'UUIDS': uuid_type(uuids=sa(string=cfdi_uuid)), 'username': auth['user'], 'password': auth['pass'], 'taxpayer_id': rfc_emisor, 'cer': info['cer'], 'key': info['key'], 'store_pending': False, } result = self._get_result(client, 'cancel', args) if self.error: log.error(self.error) return '' folio = result['Folios']['Folio'][0] status = folio['EstatusUUID'] if status != '201': log.debug(f'Cancel status: {status} - {cfdi_uuid}') data = { 'acuse': result['Acuse'], 'date': result['Fecha'], } return data def cancel_xml(self, xml, auth={}): if not auth: auth = AUTH method = 'cancel' client = Client(self.URL[method], transport=self._transport, plugins=self._plugins) client.set_ns_prefix('can', 'http://facturacion.finkok.com/cancel') # ~ xml = f'\n{xml}' args = { 'xml': base64.b64encode(xml.encode()), 'username': auth['user'], 'password': auth['pass'], 'store_pending': False, } result = self._get_result(client, 'cancel_signature', args) if self.error: log.error(self.error) return '' return result def client_balance(self, auth, rfc=''): """Regresa los timbres restantes del cliente Se pueden usar las credenciales de relleser o las credenciales del emisor Args: rfc (str): El RFC del emisor Kwargs: auth (dict): Credenciales del emisor Returns: int Cantidad de timbres restantes """ if not auth: auth = AUTH['RESELLER'] method = 'client' client = Client( self.URL[method], transport=self._transport, plugins=self._plugins) args = { 'reseller_username': auth['user'], 'reseller_password': auth['pass'], 'taxpayer_id': rfc, } result = self._get_result(client, 'get', args) if self.error: log.error(self.error) return '' success = bool(self.result.users) if not success: self.error = self.result.message or 'RFC no existe' return 0 return self.result.users.ResellerUser[0].credit