#!/usr/bin/env python
# ~
# ~ PAC
# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program.  If not, see <http://www.gnu.org/licenses/>.
# ~

import base64
import datetime
import logging
import os
import re
from io import BytesIO
from xml.sax.saxutils import unescape

import lxml.etree as ET
from zeep import Client
from zeep.plugins import Plugin
from zeep.cache import SqliteCache
from zeep.transports import Transport
from zeep.exceptions import Fault, TransportError
from requests.exceptions import ConnectionError

from .conf import DEBUG, AUTH


LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)

# Silence the HTTP/SOAP libraries; only their errors are of interest here.
logging.getLogger('requests').setLevel(logging.ERROR)
logging.getLogger('zeep').setLevel(logging.ERROR)


# Timeout handed to the zeep Transport.
TIMEOUT = 10
# When true, every SOAP request/response envelope is dumped to /tmp.
DEBUG_SOAP = True


class DebugPlugin(Plugin):
    """zeep plugin that dumps outgoing and incoming SOAP envelopes to disk."""

    def _to_string(self, envelope, name):
        """Write ``envelope`` pretty-printed to ``/tmp/soap_<name>.xml``.

        Does nothing when ``DEBUG_SOAP`` is false.  Each call overwrites the
        previous dump with the same ``name``.
        """
        if DEBUG_SOAP:
            data = ET.tostring(envelope, pretty_print=True, encoding='utf-8')
            path = f'/tmp/soap_{name}.xml'
            # Write the UTF-8 bytes directly so the dump does not depend on
            # the locale's default text encoding.
            with open(path, 'wb') as f:
                f.write(data)
            # ~ print(data)
        return

    def egress(self, envelope, http_headers, operation, binding_options):
        """Dump the outgoing request and pass it through unchanged."""
        self._to_string(envelope, 'request')
        return envelope, http_headers

    def ingress(self, envelope, http_headers, operation):
        """Dump the incoming response and pass it through unchanged."""
        self._to_string(envelope, 'response')
        return envelope, http_headers


class PACFinkok(object):
    """Client for the Finkok SOAP web services (stamp, cancel, accounts).

    Every public method resets and then sets ``error`` (a read-only property
    backed by ``_error``); callers check it after each call to find out
    whether the operation failed.
    """

    WS = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl'
    if DEBUG:
        # NOTE(review): the demo endpoint uses plain http while production
        # uses https -- confirm whether that is intentional.
        WS = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl'
    URL = {
        'quick_stamp': False,
        'timbra': WS.format('stamp'),
        'cancel': WS.format('cancel'),
        'client': WS.format('registration'),
        'util': WS.format('utilities'),
    }
    # Known status codes/messages and the text reported for them.
    CODE = {
        '200': 'Comprobante timbrado satisfactoriamente',
        '205': 'No Encontrado',
        '307': 'Comprobante timbrado previamente',
        '702': 'No se encontro el RFC del emisor',
        'IP': 'Invalid Passphrase',
        'IPMSG': 'Frase de paso inválida',
        'NE': 'No Encontrado',
    }

    def __init__(self):
        self._error = ''
        self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
        self._plugins = [DebugPlugin()]

    @property
    def error(self):
        """Message of the last failure, or ``''`` if the last call succeeded."""
        return self._error

    def _validate_result(self, result):
        """Inspect a service response and translate failures into ``error``.

        Returns ``result`` untouched when it is considered successful and an
        empty dict (with ``self._error`` set) otherwise.
        """
        incidents = getattr(result, 'Incidencias', None)
        # Guard against an incidents container that carries no entries, which
        # would otherwise fail on the ``[0]`` lookup below.
        faults = getattr(incidents, 'Incidencia', None) if incidents is not None else None
        if faults:
            fault = faults[0]
            cod_error = fault.CodigoError
            msg_error = fault.MensajeIncidencia
            # 307 (already stamped) is passed through as a valid result.
            if cod_error == '307':
                return result
            error = 'Error: {}\n{}'.format(cod_error, msg_error)
            self._error = self.CODE.get(cod_error, error)
            return {}

        if hasattr(result, 'CodEstatus'):
            ce = result.CodEstatus
            if ce is None:
                return result

            if ce == self.CODE['IP']:
                self._error = self.CODE['IPMSG']
                return {}

            if self.CODE['NE'] in ce:
                self._error = 'UUID ' + self.CODE['NE']
                return {}

            if ce == 'UUID Not Found':
                self._error = 'UUID ' + self.CODE['NE']
                return {}

            if self.CODE['200'] != ce:
                self._error = ce
                return {}

            return result

        return result

    def _get_result(self, client, method, args):
        """Call ``client.service.<method>(**args)`` and validate the response.

        Resets ``error`` first.  SOAP faults, transport errors and connection
        errors are caught and stored in ``self._error``; in those cases an
        empty dict is returned.
        """
        self._error = ''
        try:
            result = getattr(client.service, method)(**args)
        except Fault as e:
            self._error = str(e)
            return {}
        except TransportError as e:
            if '413' in str(e):
                # NOTE(review): the separator between '413' and the text shows
                # up as line breaks in the reviewed source -- confirm the
                # intended separator.
                self._error = '413\n\nDocumento muy grande para timbrar'
            else:
                self._error = str(e)
            return {}
        except ConnectionError:
            msg = '502 - Error de conexión'
            self._error = msg
            return {}

        return self._validate_result(result)

    def _to_string(self, data):
        """Re-serialize the XML text ``data`` pretty-printed with a declaration."""
        root = ET.parse(BytesIO(data.encode('utf-8'))).getroot()
        xml = ET.tostring(root,
            pretty_print=True, xml_declaration=True, encoding='utf-8')
        return xml.decode('utf-8')

    def stamp(self, cfdi, auth=None):
        """Stamp a CFDI.

        Args:
            cfdi (str): The XML document to stamp.
            auth (dict): Credentials with keys ``user`` and ``pass``.  The
                module-level ``AUTH`` is used when omitted or when ``DEBUG``
                is set.

        Returns:
            dict with keys ``xml``, ``uuid`` and ``date`` on success, or
            ``''`` on failure (see ``error``).
        """
        if DEBUG or not auth:
            auth = AUTH

        method = 'timbra'
        client = Client(self.URL[method],
            transport=self._transport, plugins=self._plugins)
        args = {
            'username': auth['user'],
            'password': auth['pass'],
            'xml': cfdi.encode('utf-8'),
        }

        result = self._get_result(client, 'stamp', args)
        if self.error:
            log.error(self.error)
            return ''

        data = {
            'xml': self._to_string(result.xml),
            'uuid': result.UUID,
            'date': result.Fecha,
        }
        return data

    def _get_data_cancel(self, cfdi):
        """Extract the issuer RFC and the stamp UUID from a CFDI XML string.

        Returns:
            tuple ``(rfc_emisor, cfdi_uuid)``; either is an empty string when
            the attribute is not present.
        """
        NS_CFDI = {
            'cfdi': 'http://www.sat.gob.mx/cfd/3',
            'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
        }
        tree = ET.fromstring(cfdi.encode())
        rfc_emisor = tree.xpath(
            'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)',
            namespaces=NS_CFDI)
        cfdi_uuid = tree.xpath(
            'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)',
            namespaces=NS_CFDI)
        return rfc_emisor, cfdi_uuid

    def cancel(self, cfdi, info, auth=None):
        """Cancel a stamped CFDI.

        Args:
            cfdi (str): The stamped XML; issuer RFC and UUID are read from it.
            info (dict): Must provide ``cer``, ``key`` and
                ``args`` (itself a dict with ``uuid`` -- the substitution
                folio -- and ``reason``).
            auth (dict): Credentials with keys ``user`` and ``pass``.  The
                module-level ``AUTH`` is used when omitted or when ``DEBUG``
                is set.

        Returns:
            dict with keys ``acuse`` and ``date`` on success, or ``''`` on
            failure (see ``error``).
        """
        if DEBUG or not auth:
            auth = AUTH

        rfc_emisor, cfdi_uuid = self._get_data_cancel(cfdi)
        method = 'cancel'
        client = Client(self.URL[method],
            transport=self._transport, plugins=self._plugins)
        uuid_type = client.get_type('ns1:UUIDS')
        # ~ sa = client.get_type('ns0:stringArray')
        ns1_uuid = client.get_type('ns1:UUID')
        data_uuid = {
            'UUID': cfdi_uuid,
            'FolioSustitucion': info['args']['uuid'],
            'Motivo': info['args']['reason'],
        }

        # ~ 'UUIDS': uuid_type(uuids=sa(string=cfdi_uuid)),
        args = {
            'UUIDS': uuid_type(ns1_uuid(**data_uuid)),
            'username': auth['user'],
            'password': auth['pass'],
            'taxpayer_id': rfc_emisor,
            'cer': info['cer'],
            'key': info['key'],
            'store_pending': False,
        }

        result = self._get_result(client, 'cancel', args)
        if self.error:
            log.error(self.error)
            return ''

        folio = result['Folios']['Folio'][0]
        status = folio['EstatusUUID']
        # Any status other than '201' is only logged; the receipt is still
        # returned to the caller.
        if status != '201':
            log.debug(f'Cancel status: {status} - {cfdi_uuid}')

        data = {
            'acuse': result['Acuse'],
            'date': result['Fecha'],
        }
        return data

    def cancel_xml(self, xml, auth=None, cfdi=''):
        """Cancel using an already signed cancellation XML.

        Args:
            xml (str): The signed cancellation request.
            auth (dict): Credentials with keys ``user`` and ``pass``.  The
                module-level ``AUTH`` is used when omitted or when ``DEBUG``
                is set.
            cfdi (str): Unused; kept for interface compatibility.

        Returns:
            dict with keys ``acuse`` and ``date`` on success, or ``''`` on
            failure (see ``error``).
        """
        if DEBUG or not auth:
            auth = AUTH

        method = 'cancel'
        client = Client(self.URL[method],
            transport=self._transport, plugins=self._plugins)

        client.set_ns_prefix('can', 'http://facturacion.finkok.com/cancel')
        args = {
            'xml': xml.encode(),
            'username': auth['user'],
            'password': auth['pass'],
            'store_pending': False,
        }

        result = self._get_result(client, 'cancel_signature', args)
        if self.error:
            log.error(self.error)
            return ''

        folio = result['Folios']['Folio'][0]
        status = folio['EstatusUUID']
        if status == '708':
            self._error = 'Error 708 del SAT, intenta más tarde.'
            log.error(self.error)
            return ''

        if status != '201':
            log.debug(f'Cancel status: {status} -')

        data = {
            'acuse': result['Acuse'],
            'date': result['Fecha'],
        }
        return data

    def client_add(self, rfc, type_user=False):
        """Register a new client for stamping.

        A reseller account is required to use this method.

        Args:
            rfc (str): The RFC of the new client.

        Kwargs:
            type_user (bool): False == 'P' == prepaid
                              True == 'O' == on demand

        Returns:
            True or False

        Original PAC response:
            'message':
                'Account Created successfully'
                'Account Already exists'
            'success': True or False
        """
        auth = AUTH['RESELLER']
        tu = {True: 'O', False: 'P'}

        method = 'client'
        client = Client(
            self.URL[method], transport=self._transport, plugins=self._plugins)
        args = {
            'reseller_username': auth['user'],
            'reseller_password': auth['pass'],
            'taxpayer_id': rfc,
            'type_user': tu[type_user],
            'added': datetime.datetime.now().isoformat()[:19],
        }

        result = self._get_result(client, 'add', args)
        if self.error:
            return False

        # ``error`` is a read-only property, so the backing field is set.
        if not result.success:
            self._error = result.message
            return False

        # ~ The PAC reports success for this case; it arguably should be False
        msg = 'Account Already exists'
        if result.message == msg:
            self._error = msg
            return True

        return result.success

    def client_get_token(self, rfc, email):
        """Generate a new stamping token for a client.

        A reseller account is required to use this method.

        Args:
            rfc (str): The RFC of the client; it must already exist.
            email (str): The client's e-mail, used as the user when stamping.

        Returns:
            token (str): The password used for stamping, or ``''`` on failure
            (see ``error``).

        Original PAC response (dict):
            'username': 'username',
            'status': True or False
            'name': 'name',
            'success': True or False
            'token': 'stamping token',
            'message': None
        """
        auth = AUTH['RESELLER']

        method = 'util'
        client = Client(
            self.URL[method], transport=self._transport, plugins=self._plugins)
        args = {
            'username': auth['user'],
            'password': auth['pass'],
            'name': rfc,
            'token_username': email,
            'taxpayer_id': rfc,
            'status': True,
        }

        result = self._get_result(client, 'add_token', args)
        if self.error:
            log.error(self.error)
            return ''

        if not result.success:
            self._error = result.message
            log.error(self.error)
            return ''

        return result.token

    def client_add_timbres(self, rfc, credit):
        """Add stamping credit to an issuer.

        A reseller account is required.

        Args:
            rfc (str): The RFC of the issuer; it must exist.
            credit (int): Number of folios to add.

        Returns:
            The new credit after adding, ``0`` when the PAC reports failure,
            or ``''`` when the call itself failed (see ``error``).

        Original PAC response (dict):
            'success': True or False,
            'credit': new credit after adding, or None
            'message':
                'Success, added {credit} of credit to {RFC}.'
                'RFC no encontrado'
        """
        auth = AUTH['RESELLER']

        method = 'client'
        client = Client(
            self.URL[method], transport=self._transport, plugins=self._plugins)
        args = {
            'username': auth['user'],
            'password': auth['pass'],
            'taxpayer_id': rfc,
            'credit': credit,
        }

        result = self._get_result(client, 'assign', args)
        if self.error:
            log.error(self.error)
            return ''

        if not result.success:
            self._error = result.message
            return 0

        return result.credit

    def client_balance(self, auth=None, rfc=''):
        """Return the remaining stamps of a client.

        Either the reseller credentials or the issuer's own credentials can
        be used.

        Args:
            auth (dict): Issuer credentials; ``AUTH['RESELLER']`` is used
                when omitted.
            rfc (str): The RFC of the issuer.

        Returns:
            The remaining credit, ``0`` when no user is returned, or ``''``
            when the call itself failed (see ``error``).
        """
        if not auth:
            auth = AUTH['RESELLER']

        method = 'client'
        client = Client(self.URL[method],
            transport=self._transport, plugins=self._plugins)
        args = {
            'reseller_username': auth['user'],
            'reseller_password': auth['pass'],
            'taxpayer_id': rfc,
        }

        result = self._get_result(client, 'get', args)
        if self.error:
            log.error(self.error)
            return ''

        success = bool(result.users)
        if not success:
            self._error = result.message or 'RFC no existe'
            log.error(self.error)
            return 0

        return result.users.ResellerUser[0].credit

    def client_set_status(self, rfc, status):
        """Edit the status (active or suspended) of a client.

        A reseller account is required to use this method.

        Args:
            rfc (str): The RFC of the client.

        Kwargs:
            status (bool): True == 'A' == active
                           False == 'S' == suspended

        Returns:
            True on success, False otherwise (see ``error``).
        """
        auth = AUTH['RESELLER']
        ts = {True: 'A', False: 'S'}
        method = 'client'

        client = Client(self.URL[method],
            transport=self._transport, plugins=self._plugins)
        args = {
            'reseller_username': auth['user'],
            'reseller_password': auth['pass'],
            'taxpayer_id': rfc,
            'status': ts[status],
        }

        result = self._get_result(client, 'edit', args)
        if self.error:
            return False

        if not result.success:
            self._error = result.message
            return False

        return True

    def client_switch(self, rfc, type_user):
        """Edit the stamping type (on demand or prepaid) of a client.

        A reseller account is required to use this method.

        Args:
            rfc (str): The RFC of the client.

        Kwargs:
            type_user (bool): True == 'O' == on demand
                              False == 'P' == prepaid

        Returns:
            True on success, False otherwise (see ``error``).
        """
        auth = AUTH['RESELLER']
        tu = {True: 'O', False: 'P'}
        method = 'client'

        client = Client(self.URL[method],
            transport=self._transport, plugins=self._plugins)
        args = {
            'username': auth['user'],
            'password': auth['pass'],
            'taxpayer_id': rfc,
            'type_user': tu[type_user],
        }

        result = self._get_result(client, 'switch', args)
        if self.error:
            return False

        if not result.success:
            self._error = result.message
            return False

        return True

    def client_report_folios(self, rfc, date_from, date_to, invoice_type='I'):
        """Get a report with the total number of stamped invoices.

        Args:
            rfc (str): The RFC to report on.
            date_from: Start of the period, passed through to the service.
            date_to: End of the period, passed through to the service.
            invoice_type (str): Invoice type filter, ``'I'`` by default.

        Returns:
            The reported total, or ``0`` when the call fails or the PAC
            returns no records (see ``error``).
        """
        auth = AUTH['RESELLER']

        args = {
            'username': auth['user'],
            'password': auth['pass'],
            'taxpayer_id': rfc,
            'date_from': date_from,
            'date_to': date_to,
            'invoice_type': invoice_type,
        }

        method = 'util'
        client = Client(self.URL[method],
            transport=self._transport, plugins=self._plugins)

        result = self._get_result(client, 'report_total', args)
        # On failure ``result`` is an empty dict without a ``result``
        # attribute, so bail out before touching it.
        if self.error:
            log.error(self.error)
            return 0

        if result.result is None:
            # ~ The PAC should report a non-existent RFC or no records itself
            self._error = 'RFC no existe o no tiene registros'
            return 0

        total = result.result.ReportTotal[0].total
        return total