From a3092cf3c65f5d227ed3ec73444e1bdadbf649a8 Mon Sep 17 00:00:00 2001 From: Mauricio Baeza Date: Fri, 1 Jan 2021 17:31:23 -0600 Subject: [PATCH] Add pac Finkok --- source/comerciodigital/comercio.py | 2 - source/finkok/__init__.py | 3 + source/finkok/conf.py.example | 38 ++ source/finkok/finkok.py | 169 +++++++ source/finkok/finkok1.py | 729 +++++++++++++++++++++++++++++ source/finkok/finkok2.py | 636 +++++++++++++++++++++++++ source/tests/tests_finkok.py | 239 ++++++++++ 7 files changed, 1814 insertions(+), 2 deletions(-) create mode 100644 source/finkok/__init__.py create mode 100644 source/finkok/conf.py.example create mode 100644 source/finkok/finkok.py create mode 100644 source/finkok/finkok1.py create mode 100644 source/finkok/finkok2.py create mode 100644 source/tests/tests_finkok.py diff --git a/source/comerciodigital/comercio.py b/source/comerciodigital/comercio.py index bcca148..04239b1 100644 --- a/source/comerciodigital/comercio.py +++ b/source/comerciodigital/comercio.py @@ -78,8 +78,6 @@ class PACComercioDigital(object): def __init__(self): self.error = '' - # ~ self.cfdi_uuid = '' - # ~ self.date_stamped = '' def _error(self, msg): self.error = str(msg) diff --git a/source/finkok/__init__.py b/source/finkok/__init__.py new file mode 100644 index 0000000..1b61fc3 --- /dev/null +++ b/source/finkok/__init__.py @@ -0,0 +1,3 @@ +#!/usr/bin/env python3 + +from .finkok import PACFinkok diff --git a/source/finkok/conf.py.example b/source/finkok/conf.py.example new file mode 100644 index 0000000..8394472 --- /dev/null +++ b/source/finkok/conf.py.example @@ -0,0 +1,38 @@ +#!/usr/bin/env python +# ~ +# ~ PAC +# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net +# ~ +# ~ This program is free software: you can redistribute it and/or modify +# ~ it under the terms of the GNU General Public License as published by +# ~ the Free Software Foundation, either version 3 of the License, or +# ~ (at your option) any later version. +# ~ +# ~ This program is distributed in the hope that it will be useful, +# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of +# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# ~ GNU General Public License for more details. +# ~ +# ~ You should have received a copy of the GNU General Public License +# ~ along with this program. If not, see . + + +# ~ Siempre consulta la documentación de PAC +# ~ AUTH = Las credenciales de timbrado proporcionadas por el PAC +# ~ NO cambies las credenciales de prueba + + +DEBUG = True + + +AUTH = { + 'user': '', + 'pass': '', +} + + +if DEBUG: + AUTH = { + 'user': 'pruebas-finkok@correolibre.net', + 'pass': '5c9a88da105bff9a8c430cb713f6d35269f51674bdc5963c1501b7316366', + } diff --git a/source/finkok/finkok.py b/source/finkok/finkok.py new file mode 100644 index 0000000..9e8f578 --- /dev/null +++ b/source/finkok/finkok.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python +# ~ +# ~ PAC +# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net +# ~ +# ~ This program is free software: you can redistribute it and/or modify +# ~ it under the terms of the GNU General Public License as published by +# ~ the Free Software Foundation, either version 3 of the License, or +# ~ (at your option) any later version. +# ~ +# ~ This program is distributed in the hope that it will be useful, +# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of +# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# ~ GNU General Public License for more details. +# ~ +# ~ You should have received a copy of the GNU General Public License +# ~ along with this program. If not, see . + + +import datetime +import logging +import os +import re +from io import BytesIO +from xml.sax.saxutils import unescape + +import lxml.etree as ET +from zeep import Client +from zeep.plugins import Plugin +from zeep.cache import SqliteCache +from zeep.transports import Transport +from zeep.exceptions import Fault, TransportError +from requests.exceptions import ConnectionError + +from .conf import DEBUG, AUTH + + +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +LOG_DATE = '%d/%m/%Y %H:%M:%S' +logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m') +logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m') +logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m') +logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE) +log = logging.getLogger(__name__) +logging.getLogger('requests').setLevel(logging.ERROR) + + +TIMEOUT = 10 +DEBUG_SOAP = False + + +class DebugPlugin(Plugin): + + def _to_string(self, envelope, name): + if DEBUG_SOAP: + data = ET.tostring(envelope, pretty_print=True, encoding='utf-8').decode() + path = f'/tmp/soap_{name}.xml' + with open(path, 'w') as f: + f.write(data) + return + + def egress(self, envelope, http_headers, operation, binding_options): + self._to_string(envelope, 'request') + return envelope, http_headers + + def ingress(self, envelope, http_headers, operation): + self._to_string(envelope, 'response') + return envelope, http_headers + + +class PACFinkok(object): + WS = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl' + if DEBUG: + WS = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl' + URL = { + 'quick_stamp': False, + 'timbra': WS.format('stamp'), + 'cancel': WS.format('cancel'), + 'client': WS.format('registration'), + 'util': WS.format('utilities'), + } + CODE = { + '200': 'Comprobante timbrado satisfactoriamente', + '205': 'No Encontrado', + '307': 'Comprobante timbrado previamente', + '702': 'No se encontro el RFC del emisor', + 'IP': 'Invalid Passphrase', + 'IPMSG': 'Frase de paso inválida', + 'NE': 'No Encontrado', + } + + def __init__(self): + self._error = '' + self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT) + self._plugins = [DebugPlugin()] + + def _validate_result(self, result): + if hasattr(result, 'CodEstatus'): + ce = result.CodEstatus + if ce == self.CODE['IP']: + self._error = self.CODE['IPMSG'] + return {} + + if self.CODE['NE'] in ce: + self._error = 'UUID ' + self.CODE['NE'] + return {} + + if self.CODE['200'] != ce: + log.error('CodEstatus', type(ce), ce) + return result + + if hasattr(result, 'Incidencias'): + fault = result.Incidencias.Incidencia[0] + cod_error = fault.CodigoError.encode('utf-8') + msg_error = fault.MensajeIncidencia.encode('utf-8') + error = 'Error: {}\n{}'.format(cod_error, msg_error) + self._error = self.CODE.get(cod_error, error) + return {} + + return result + + def _get_result(self, client, method, args): + self._error = '' + try: + result = getattr(client.service, method)(**args) + except Fault as e: + self._error = str(e) + return {} + except TransportError as e: + if '413' in str(e): + self._error = '413

Documento muy grande para timbrar' + else: + self._error = str(e) + return {} + except ConnectionError as e: + msg = '502 - Error de conexión' + self._error = msg + return {} + + return self._validate_result(result) + + def _to_string(self, data): + root = ET.parse(BytesIO(data.encode('utf-8'))).getroot() + xml = ET.tostring(root, + pretty_print=True, xml_declaration=True, encoding='utf-8') + return xml.decode('utf-8') + + def stamp(self, cfdi, auth={}): + if DEBUG or not auth: + auth = AUTH + + method = 'timbra' + client = Client(self.URL[method], + transport=self._transport, plugins=self._plugins) + args = { + 'username': auth['user'], + 'password': auth['pass'], + 'xml': cfdi, + } + result = self._get_result(client, 'stamp', args) + if self.error: + return '' + + data = { + 'xml': self._to_string(result.xml), + 'uuid': result.UUID, + 'date': result.Fecha, + } + return data diff --git a/source/finkok/finkok1.py b/source/finkok/finkok1.py new file mode 100644 index 0000000..dda9195 --- /dev/null +++ b/source/finkok/finkok1.py @@ -0,0 +1,729 @@ +#!/usr/bin/env python +# ~ +# ~ PAC +# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net +# ~ +# ~ This program is free software: you can redistribute it and/or modify +# ~ it under the terms of the GNU General Public License as published by +# ~ the Free Software Foundation, either version 3 of the License, or +# ~ (at your option) any later version. +# ~ +# ~ This program is distributed in the hope that it will be useful, +# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of +# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# ~ GNU General Public License for more details. +# ~ +# ~ You should have received a copy of the GNU General Public License +# ~ along with this program. If not, see . + + +import datetime +import os +import re +from io import BytesIO +from xml.sax.saxutils import unescape + +import lxml.etree as ET +from zeep import Client +from zeep.plugins import Plugin +from zeep.cache import SqliteCache +from zeep.transports import Transport +from zeep.exceptions import Fault, TransportError +from requests.exceptions import ConnectionError + +from conf import DEBUG, FINKOK + + +TIMEOUT = 10 +DEBUG_SOAP = False + + +class DebugPlugin(Plugin): + + def _to_string(self, envelope, name): + if DEBUG_SOAP: + data = ET.tostring(envelope, pretty_print=True, encoding='utf-8').decode() + path = f'/tmp/soap_{name}.xml' + with open(path, 'w') as f: + f.write(data) + return + + def egress(self, envelope, http_headers, operation, binding_options): + self._to_string(envelope, 'request') + return envelope, http_headers + + def ingress(self, envelope, http_headers, operation): + self._to_string(envelope, 'response') + return envelope, http_headers + + +class PACFinkok(object): + URL = { + 'quick_stamp': False, + 'timbra': FINKOK['WS'].format('stamp'), + 'cancel': FINKOK['WS'].format('cancel'), + 'client': FINKOK['WS'].format('registration'), + 'util': FINKOK['WS'].format('utilities'), + } + CODE = { + '200': 'Comprobante timbrado satisfactoriamente', + '205': 'No Encontrado', + '307': 'Comprobante timbrado previamente', + '702': 'No se encontro el RFC del emisor', + 'IP': 'Invalid Passphrase', + 'IPMSG': 'Frase de paso inválida', + 'NE': 'No Encontrado', + } + + def __init__(self): + self._error = '' + self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT) + self._plugins = [DebugPlugin()] + + def _validate_result(self, result): + if hasattr(result, 'CodEstatus'): + ce = result.CodEstatus + if ce == self.CODE['IP']: + self.error = self.CODE['IPMSG'] + return {} + + if self.CODE['NE'] in ce: + self.error = 'UUID ' + self.CODE['NE'] + return {} + + if self.CODE['200'] != ce: + print('CodEstatus', type(ce), ce) + return result + + if hasattr(result, 'Incidencias'): + fault = result.Incidencias.Incidencia[0] + cod_error = fault.CodigoError.encode('utf-8') + msg_error = fault.MensajeIncidencia.encode('utf-8') + error = 'Error: {}\n{}'.format(cod_error, msg_error) + self.error = self.CODE.get(cod_error, error) + return {} + + return result + + def _get_result(self, client, method, args): + self.error = '' + try: + result = getattr(client.service, method)(**args) + except Fault as e: + self.error = str(e) + return {} + except TransportError as e: + if '413' in str(e): + self.error = '413

Documento muy grande para timbrar' + else: + self.error = str(e) + return {} + except ConnectionError as e: + msg = '502 - Error de conexión' + self.error = msg + return {} + + return self._validate_result(result) + + def cfdi_stamp(self, cfdi, auth={}): + if not auth: + auth = FINKOK['AUTH'] + + method = 'timbra' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'username': auth['USER'], + 'password': auth['PASS'], + 'xml': cfdi, + } + + result = self._get_result(client, 'stamp', args) + if self.error: + return {} + + data = { + 'xml': self._to_string(result.xml), + 'uuid': result.UUID, + 'fecha': result.Fecha, + } + return data + + def cfdi_cancel(self, rfc, uuid, cer, key, auth={}): + if not auth: + auth = FINKOK['AUTH'] + + method = 'cancel' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + uuid_type = client.get_type('ns1:UUIDS') + sa = client.get_type('ns0:stringArray') + + args = { + 'UUIDS': uuid_type(uuids=sa(string=uuid)), + 'username': auth['USER'], + 'password': auth['PASS'], + 'taxpayer_id': rfc, + 'cer': cer, + 'key': key, + 'store_pending': False, + } + + result = self._get_result(client, 'cancel', args) + if self.error: + return {} + + return result + + def cfdi_status(self, uuid, auth={}): + if not auth: + auth = FINKOK['AUTH'] + + method = 'timbra' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'username': auth['USER'], + 'password': auth['PASS'], + 'uuid': uuid, + } + + result = self._get_result(client, 'query_pending', args) + if self.error: + return {} + + STATUS = { + 'C': 'Cancelado', + 'S': 'Timbrado, aún no eviado al SAT', + 'F': 'Timbrado y enviado al SAT', + } + + data = { + 'estatus': STATUS[result.status], + 'xml': self._to_string(unescape(result.xml)), + 'fecha': result.date, + } + + return data + + def client_add(self, rfc, type_user=False): + """Agrega un nuevo cliente para timbrado. + Se requiere cuenta de reseller para usar este método + + Args: + rfc (str): El RFC del nuevo cliente + + Kwargs: + type_user (bool): False == 'P' == Prepago or True == 'O' == On demand + + Returns: + True or False + + origin PAC + 'message': + 'Account Created successfully' + 'Account Already exists' + 'success': True or False + """ + auth = FINKOK['RESELLER'] + tu = {True: 'O', False: 'P'} + + method = 'client' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + + args = { + 'reseller_username': auth['USER'], + 'reseller_password': auth['PASS'], + 'taxpayer_id': rfc, + 'type_user': tu[type_user], + 'added': datetime.datetime.now().isoformat()[:19], + } + + result = self._get_result(client, 'add', args) + if self.error: + return False + + if not result.success: + self.error = result.message + return False + + # ~ PAC success debería ser False + msg = 'Account Already exists' + if result.message == msg: + self.error = msg + return True + + return result.success + + def client_add_token(self, rfc, email): + """Agrega un nuevo token al cliente para timbrado. + Se requiere cuenta de reseller para usar este método + + Args: + rfc (str): El RFC del cliente, ya debe existir + email (str): El correo del cliente, funciona como USER al timbrar + + Returns: + token (str): Es la contraseña para timbrar + + origin PAC + dict + 'username': 'username', + 'status': True or False + 'name': 'name', + 'success': True or False + 'token': 'Token de timbrado', + 'message': None + """ + auth = FINKOK['RESELLER'] + method = 'util' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'username': auth['USER'], + 'password': auth['PASS'], + 'name': rfc, + 'token_username': email, + 'taxpayer_id': rfc, + 'status': True, + } + + result = self._get_result(client, 'add_token', args) + if self.error: + return '' + + if not result.success: + self.error = result.message + return '' + + return result.token + + # ~ Send issue to PAC + def client_reset_token(self, email): + auth = FINKOK['RESELLER'] + method = 'util' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'username': auth['USER'], + 'password': auth['PASS'], + 'token': email, + } + + result = self._get_result(client, 'reset_token', args) + if self.error: + return '' + + if not result.success: + self.error = result.message + return '' + + return result.token + + def client_add_timbres(self, rfc, credit): + """Agregar credito a un emisor + + Se requiere cuenta de reseller + + Args: + rfc (str): El RFC del emisor, debe existir + credit (int): Cantidad de folios a agregar + + Returns: + dict + 'success': True or False, + 'credit': nuevo credito despues de agregar or None + 'message': + 'Success, added {credit} of credit to {RFC}.' + 'RFC no encontrado' + """ + auth = FINKOK['RESELLER'] + + if not isinstance(credit, int): + self.error = 'El credito debe ser un entero' + return 0 + + method = 'client' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'username': auth['USER'], + 'password': auth['PASS'], + 'taxpayer_id': rfc, + 'credit': credit, + } + + result = self._get_result(client, 'assign', args) + if self.error: + return '' + + if not result.success: + self.error = result.message + return 0 + + return result.credit + + def client_edit(self, rfc, status=True): + """Edita el estatus (Activo o Suspendido) de un cliente + Se requiere cuenta de reseller para usar este método + + Args: + rfc (str): El RFC del cliente + + Kwargs: + status (bool): True == 'A' == Activo or False == 'S' == Suspendido + + Returns: + dict + 'message': + 'Account Created successfully' + 'Account Already exists' + 'success': True or False + """ + auth = FINKOK['RESELLER'] + ts = {True: 'A', False: 'S'} + method = 'client' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + + args = { + 'reseller_username': auth['USER'], + 'reseller_password': auth['PASS'], + 'taxpayer_id': rfc, + 'status': ts[status], + } + result = self._get_result(client, 'edit', args) + if self.error: + return False + + if not result.success: + self.error = result.message + return False + + return True + + def client_get(self, rfc): + """Regresa el estatus del cliente + Se requiere cuenta de reseller para usar este método + + Args: + rfc (str): El RFC del emisor + + Returns: + dict + 'message': None, + 'users': { + 'ResellerUser': [ + { + 'status': 'A', + 'counter': 0, + 'taxpayer_id': '', + 'credit': 0 + } + ] + } or None si no existe + """ + auth = FINKOK['RESELLER'] + + method = 'client' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + + args = { + 'reseller_username': auth['USER'], + 'reseller_password': auth['PASS'], + 'taxpayer_id': rfc, + } + + try: + self.result = client.service.get(**args) + except Fault as e: + self.error = str(e) + return {} + except TransportError as e: + self.error = str(e) + return {} + except ConnectionError: + self.error = 'Verifica la conexión a internet' + return {} + + success = bool(self.result.users) + if not success: + self.error = self.result.message or 'RFC no existe' + return {} + + data = self.result.users.ResellerUser[0] + client = { + 'status': data.status, + 'counter': data.counter, + 'credit': data.credit, + } + return client + + def client_get_timbres(self, rfc, auth={}): + """Regresa los timbres restantes del cliente + Se pueden usar las credenciales de relleser o las credenciales del emisor + + Args: + rfc (str): El RFC del emisor + + Kwargs: + auth (dict): Credenciales del emisor + + Returns: + int Cantidad de timbres restantes + """ + + if not auth: + auth = FINKOK['RESELLER'] + + method = 'client' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'reseller_username': auth['USER'], + 'reseller_password': auth['PASS'], + 'taxpayer_id': rfc, + } + + try: + self.result = client.service.get(**args) + except Fault as e: + self.error = str(e) + return 0 + except TransportError as e: + self.error = str(e) + return 0 + except ConnectionError: + self.error = 'Verifica la conexión a internet' + return 0 + + success = bool(self.result.users) + if not success: + self.error = self.result.message or 'RFC no existe' + return 0 + + return self.result.users.ResellerUser[0].credit + + def get_server_datetime(self): + """Regresa la fecha y hora del servidor de timbrado del PAC + """ + auth = FINKOK['RESELLER'] + + method = 'util' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + try: + self.result = client.service.datetime(auth['USER'], auth['PASS']) + except Fault as e: + self.error = str(e) + return None + except TransportError as e: + self.error = str(e) + return None + except ConnectionError: + self.error = 'Verifica la conexión a internet' + return None + + try: + dt = datetime.datetime.strptime( + self.result.datetime, '%Y-%m-%dT%H:%M:%S') + except ValueError: + self.error = 'Error al obtener la fecha' + return None + + return dt + + def get_report_credit(self, rfc): + """Obtiene un reporte de los timbres agregados + """ + auth = FINKOK['RESELLER'] + + args = { + 'username': auth['USER'], + 'password': auth['PASS'], + 'taxpayer_id': rfc, + } + + method = 'util' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + try: + self.result = client.service.report_credit(**args) + except Fault as e: + self.error = str(e) + return [] + except TransportError as e: + self.error = str(e) + return [] + except ConnectionError: + self.error = 'Verifica la conexión a internet' + return [] + + if self.result.result is None: + # ~ PAC - Debería regresar RFC inexistente o sin registros + self.error = 'RFC no existe o no tiene registros' + return [] + + return self.result.result.ReportTotalCredit + + def get_report_total(self, rfc, date_from, date_to, invoice_type='I'): + """Obtiene un reporte del total de facturas timbradas + """ + auth = FINKOK['RESELLER'] + + args = { + 'username': auth['USER'], + 'password': auth['PASS'], + 'taxpayer_id': rfc, + 'date_from': date_from, + 'date_to': date_to, + 'invoice_type': invoice_type, + } + + method = 'util' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + try: + self.result = client.service.report_total(**args) + except Fault as e: + self.error = str(e) + return 0 + except TransportError as e: + self.error = str(e) + return 0 + except ConnectionError: + self.error = 'Verifica la conexión a internet' + return 0 + + if self.result.result is None: + # ~ PAC - Debería regresar RFC inexistente o sin registros + self.error = 'RFC no existe o no tiene registros' + return 0 + + return self.result.result.ReportTotal[0].total or 0 + + def get_report_uuid(self, rfc, date_from, date_to, invoice_type='I'): + """Obtiene un reporte de los CFDI timbrados + """ + auth = FINKOK['RESELLER'] + + args = { + 'username': auth['USER'], + 'password': auth['PASS'], + 'taxpayer_id': rfc, + 'date_from': date_from, + 'date_to': date_to, + 'invoice_type': invoice_type, + } + + method = 'util' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + try: + self.result = client.service.report_uuid(**args) + except Fault as e: + self.error = str(e) + return [] + except TransportError as e: + self.error = str(e) + return [] + except ConnectionError: + self.error = 'Verifica la conexión a internet' + return [] + + if self.result.invoices is None: + # ~ PAC - Debería regresar RFC inexistente o sin registros + self.error = 'RFC no existe o no tiene registros' + return [] + + return self.result.invoices.ReportUUID + + def _to_string(self, data): + root = ET.parse(BytesIO(data.encode('utf-8'))).getroot() + xml = ET.tostring(root, + pretty_print=True, xml_declaration=True, encoding='utf-8') + return xml.decode('utf-8') + + def cfdi_get_by_xml(self, xml, auth): + if not auth: + auth = FINKOK['AUTH'] + + method = 'timbra' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'username': auth['USER'], + 'password': auth['PASS'], + 'xml': xml, + } + + try: + result = client.service.stamped(**args) + except Fault as e: + self.error = str(e) + return {} + except TransportError as e: + self.error = str(e) + return {} + except ConnectionError as e: + msg = '502 - Error de conexión' + self.error = msg + return {} + + print(result) + + error = 'Error: {}\n{}'.format(code_error, msg_error) + self.error = self.CODE.get(code_error, error) + return {} + + def cfdi_get_by_uuid(self, uuid, rfc, invoice_type='I', auth={}): + if not auth: + auth = FINKOK['AUTH'] + + method = 'util' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + + args = { + 'username': auth['USER'], + 'password': auth['PASS'], + 'uuid': uuid, + 'taxpayer_id': rfc, + 'invoice_type': invoice_type, + } + try: + result = client.service.get_xml(**args) + except Fault as e: + self.error = str(e) + return {} + except TransportError as e: + self.error = str(e) + return {} + except ConnectionError as e: + msg = '502 - Error de conexión' + self.error = msg + return {} + + print(result) + + error = 'Error: {}\n{}'.format(code_error, msg_error) + self.error = self.CODE.get(code_error, error) + return {} + + +def main(): + rfc = 'TEST740115999' + # ~ rfc = 'TCM970625MB1' + email = 'test999@empresalibre.mx' + pac = PACFinkok() + result = pac.client_get(rfc) + print(result) + result = pac.client_add_timbres(rfc, 10) + print(result) + return + + +if __name__ == '__main__': + main() diff --git a/source/finkok/finkok2.py b/source/finkok/finkok2.py new file mode 100644 index 0000000..80f5807 --- /dev/null +++ b/source/finkok/finkok2.py @@ -0,0 +1,636 @@ +#!/usr/bin/env python3 + +#~ import re +#~ from xml.etree import ElementTree as ET +#~ from requests import Request, Session, exceptions +import datetime +import hashlib +import os +import requests +import time +from lxml import etree +from xml.dom.minidom import parseString +from xml.sax.saxutils import escape, unescape +from uuid import UUID + +from logbook import Logger +from zeep import Client +from zeep.plugins import HistoryPlugin +from zeep.cache import SqliteCache +from zeep.transports import Transport +from zeep.exceptions import Fault, TransportError +from requests.exceptions import ConnectionError + + +if __name__ == '__main__': + from configpac import DEBUG, TIMEOUT, AUTH, URL +else: + from .configpac import DEBUG, TIMEOUT, AUTH, URL + + +log = Logger('PAC') +#~ node = client.create_message(client.service, SERVICE, **args) +#~ print(etree.tostring(node, pretty_print=True).decode()) + + +class Finkok(object): + + def __init__(self, auth={}): + self.codes = URL['codes'] + self.error = '' + self.message = '' + self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT) + self._plugins = None + self._history = None + self.uuid = '' + self.fecha = None + if DEBUG: + self._history = HistoryPlugin() + self._plugins = [self._history] + self._auth = AUTH + else: + self._auth = auth + + def _debug(self): + if not DEBUG: + return + print('SEND', self._history.last_sent) + print('RESULT', self._history.last_received) + return + + def _check_result(self, method, result): + # ~ print ('CODE', result.CodEstatus) + # ~ print ('INCIDENCIAS', result.Incidencias) + self.message = '' + MSG = { + 'OK': 'Comprobante timbrado satisfactoriamente', + '307': 'Comprobante timbrado previamente', + } + status = result.CodEstatus + if status is None and result.Incidencias: + for i in result.Incidencias['Incidencia']: + self.error += 'Error: {}\n{}\n{}'.format( + i['CodigoError'], i['MensajeIncidencia'], i['ExtraInfo']) + return '' + + if method == 'timbra' and status in (MSG['OK'], MSG['307']): + #~ print ('UUID', result.UUID) + #~ print ('FECHA', result.Fecha) + if status == MSG['307']: + self.message = MSG['307'] + tree = parseString(result.xml) + response = tree.toprettyxml(encoding='utf-8').decode('utf-8') + self.uuid = result.UUID + self.fecha = result.Fecha + + return response + + def _load_file(self, path): + try: + with open(path, 'rb') as f: + data = f.read() + except Exception as e: + self.error = str(e) + return + return data + + def _validate_xml(self, file_xml): + if os.path.isfile(file_xml): + try: + with open(file_xml, 'rb') as f: + xml = f.read() + except Exception as e: + self.error = str(e) + return False, '' + else: + xml = file_xml.encode('utf-8') + return True, xml + + def _validate_uuid(self, uuid): + try: + UUID(uuid) + return True + except ValueError: + self.error = 'UUID no válido: {}'.format(uuid) + return False + + def timbra_xml(self, file_xml): + self.error = '' + + if not DEBUG and not self._auth: + self.error = 'Sin datos para timbrar' + return + + method = 'timbra' + ok, xml = self._validate_xml(file_xml) + if not ok: + return '' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + + args = { + 'username': self._auth['USER'], + 'password': self._auth['PASS'], + 'xml': xml, + } + if URL['quick_stamp']: + try: + result = client.service.quick_stamp(**args) + except Fault as e: + self.error = str(e) + return + else: + try: + result = client.service.stamp(**args) + except Fault as e: + self.error = str(e) + return + except TransportError as e: + if '413' in str(e): + self.error = '413

Documento muy grande para timbrar' + else: + self.error = str(e) + return + except ConnectionError as e: + msg = '502 - Error de conexión' + self.error = msg + return + + return self._check_result(method, result) + + def _get_xml(self, uuid): + if not self._validate_uuid(uuid): + return '' + + method = 'util' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + + args = { + 'username': self._auth['USER'], + 'password': self._auth['PASS'], + 'uuid': uuid, + 'taxpayer_id': self.rfc, + 'invoice_type': 'I', + } + try: + result = client.service.get_xml(**args) + except Fault as e: + self.error = str(e) + return '' + except TransportError as e: + self.error = str(e) + return '' + + if result.error: + self.error = result.error + return '' + + tree = parseString(result.xml) + xml = tree.toprettyxml(encoding='utf-8').decode('utf-8') + return xml + + def recupera_xml(self, file_xml='', uuid=''): + self.error = '' + if uuid: + return self._get_xml(uuid) + + method = 'timbra' + ok, xml = self._validate_xml(file_xml) + if not ok: + return '' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + try: + result = client.service.stamped( + xml, self._auth['user'], self._auth['pass']) + except Fault as e: + self.error = str(e) + return '' + + return self._check_result(method, result) + + def estatus_xml(self, uuid): + method = 'timbra' + if not self._validate_uuid(uuid): + return '' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + try: + result = client.service.query_pending( + self._auth['USER'], self._auth['PASS'], uuid) + return result.status + except Fault as e: + self.error = str(e) + return '' + + def cancel_xml(self, rfc, uuid, cer, key): + # ~ for u in uuids: + # ~ if not self._validate_uuid(u): + # ~ return '' + + method = 'cancel' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + uuid_type = client.get_type('ns1:UUIDS') + sa = client.get_type('ns0:stringArray') + + args = { + 'UUIDS': uuid_type(uuids=sa(string=uuid)), + 'username': self._auth['USER'], + 'password': self._auth['PASS'], + 'taxpayer_id': rfc, + 'cer': cer, + 'key': key, + 'store_pending': False, + } + try: + result = client.service.cancel(**args) + except Fault as e: + self.error = str(e) + return '' + + if result.CodEstatus and self.codes['205'] in result.CodEstatus: + self.error = result.CodEstatus + return '' + + return result + + def cancel_signature(self, file_xml): + method = 'cancel' + if os.path.isfile(file_xml): + root = etree.parse(file_xml).getroot() + else: + root = etree.fromstring(file_xml.encode()) + + xml = etree.tostring(root) + + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + + args = { + 'username': self._auth['USER'], + 'password': self._auth['PASS'], + 'xml': xml, + 'store_pending': False, + } + + try: + result = client.service.cancel_signature(**args) + return result + except Fault as e: + self.error = str(e) + return '' + + def get_acuse(self, rfc, uuids, type_acuse='C'): + for u in uuids: + if not self._validate_uuid(u): + return '' + + method = 'cancel' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + + args = { + 'username': self._auth['USER'], + 'password': self._auth['PASS'], + 'taxpayer_id': rfc, + 'uuid': '', + 'type': type_acuse, + } + try: + result = [] + for u in uuids: + args['uuid'] = u + r = client.service.get_receipt(**args) + result.append(r) + except Fault as e: + self.error = str(e) + return '' + + return result + + def estatus_cancel(self, uuids): + for u in uuids: + if not self._validate_uuid(u): + return '' + + method = 'cancel' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + + args = { + 'username': self._auth['USER'], + 'password': self._auth['PASS'], + 'uuid': '', + } + try: + result = [] + for u in uuids: + args['uuid'] = u + r = client.service.query_pending_cancellation(**args) + result.append(r) + except Fault as e: + self.error = str(e) + return '' + + return result + + def add_token(self, rfc, email): + """Agrega un nuevo token al cliente para timbrado. + Se requiere cuenta de reseller para usar este método + + Args: + rfc (str): El RFC del cliente, ya debe existir + email (str): El correo del cliente, funciona como USER al timbrar + + Returns: + dict + 'username': 'username', + 'status': True or False + 'name': 'name', + 'success': True or False + 'token': 'Token de timbrado', + 'message': None + """ + auth = AUTH['RESELLER'] + + method = 'util' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'username': auth['USER'], + 'password': auth['PASS'], + 'name': rfc, + 'token_username': email, + 'taxpayer_id': rfc, + 'status': True, + } + try: + result = client.service.add_token(**args) + except Fault as e: + self.error = str(e) + return '' + + return result + + def get_date(self): + method = 'util' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + try: + result = client.service.datetime(AUTH['USER'], AUTH['PASS']) + except Fault as e: + self.error = str(e) + return '' + + if result.error: + self.error = result.error + return + + return result.datetime + + def add_client(self, rfc, type_user=False): + """Agrega un nuevo cliente para timbrado. + Se requiere cuenta de reseller para usar este método + + Args: + rfc (str): El RFC del nuevo cliente + + Kwargs: + type_user (bool): False == 'P' == Prepago or True == 'O' == On demand + + Returns: + dict + 'message': + 'Account Created successfully' + 'Account Already exists' + 'success': True or False + """ + auth = AUTH['RESELLER'] + + tu = {False: 'P', True: 'O'} + method = 'client' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'reseller_username': auth['USER'], + 'reseller_password': auth['PASS'], + 'taxpayer_id': rfc, + 'type_user': tu[type_user], + 'added': datetime.datetime.now().isoformat()[:19], + } + try: + result = client.service.add(**args) + except Fault as e: + self.error = str(e) + return '' + + return result + + def edit_client(self, rfc, status=True): + """ + Se requiere cuenta de reseller para usar este método + status = 'A' or 'S' + """ + auth = AUTH['RESELLER'] + + sv = {False: 'S', True: 'A'} + method = 'client' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'reseller_username': auth['USER'], + 'reseller_password': auth['PASS'], + 'taxpayer_id': rfc, + 'status': sv[status], + } + try: + result = client.service.edit(**args) + except Fault as e: + self.error = str(e) + return '' + + return result + + def get_client(self, rfc): + """Regresa el estatus del cliente + . + Se requiere cuenta de reseller para usar este método + + Args: + rfc (str): El RFC del emisor + + Returns: + dict + 'message': None, + 'users': { + 'ResellerUser': [ + { + 'status': 'A', + 'counter': 0, + 'taxpayer_id': '', + 'credit': 0 + } + ] + } or None si no existe + """ + auth = AUTH['RESELLER'] + + method = 'client' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'reseller_username': auth['USER'], + 'reseller_password': auth['PASS'], + 'taxpayer_id': rfc, + } + + try: + result = client.service.get(**args) + except Fault as e: + self.error = str(e) + return '' + except TransportError as e: + self.error = str(e) + return '' + + return result + + def assign_client(self, rfc, credit): + """Agregar credito a un emisor + + Se requiere cuenta de reseller para usar este método + + Args: + rfc (str): El RFC del emisor, debe existir + credit (int): Cantidad de folios a agregar + + Returns: + dict + 'success': True or False, + 'credit': nuevo credito despues de agregar or None + 'message': + 'Success, added {credit} of credit to {RFC}' + 'RFC no encontrado' + """ + auth = AUTH['RESELLER'] + + method = 'client' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'username': auth['USER'], + 'password': auth['PASS'], + 'taxpayer_id': rfc, + 'credit': credit, + } + try: + result = client.service.assign(**args) + except Fault as e: + self.error = str(e) + return '' + + return result + + def client_get_timbres(self, rfc): + method = 'client' + client = Client( + URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'reseller_username': self._auth['USER'], + 'reseller_password': self._auth['PASS'], + 'taxpayer_id': rfc, + } + + try: + self.result = client.service.get(**args) + except Fault as e: + self.error = str(e) + return 0 + except TransportError as e: + self.error = str(e) + return 0 + except ConnectionError: + self.error = 'Verifica la conexión a internet' + return 0 + + success = bool(self.result.users) + if not success: + self.error = self.result.message or 'RFC no existe' + return 0 + + return self.result.users.ResellerUser[0].credit + + +def _get_data_sat(path): + BF = 'string(//*[local-name()="{}"]/@{})' + NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'} + + try: + if os.path.isfile(path): + tree = etree.parse(path).getroot() + else: + tree = etree.fromstring(path.encode()) + + data = {} + emisor = escape( + tree.xpath('string(//cfdi:Emisor/@rfc)', namespaces=NS_CFDI) or + tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI) + ) + receptor = escape( + tree.xpath('string(//cfdi:Receptor/@rfc)', namespaces=NS_CFDI) or + tree.xpath('string(//cfdi:Receptor/@Rfc)', namespaces=NS_CFDI) + ) + data['total'] = tree.get('total') or tree.get('Total') + data['emisor'] = emisor + data['receptor'] = receptor + data['uuid'] = tree.xpath(BF.format('TimbreFiscalDigital', 'UUID')) + except Exception as e: + print (e) + return {} + + return '?re={emisor}&rr={receptor}&tt={total}&id={uuid}'.format(**data) + + +def get_status_sat(xml): + data = _get_data_sat(xml) + if not data: + return 'XML inválido' + + data = """ + + + + + + {} + + + + """.format(data) + headers = { + 'SOAPAction': '"http://tempuri.org/IConsultaCFDIService/Consulta"', + 'Content-type': 'text/xml; charset="UTF-8"' + } + URL = 'https://consultaqr.facturaelectronica.sat.gob.mx/consultacfdiservice.svc' + + try: + result = requests.post(URL, data=data, headers=headers) + tree = etree.fromstring(result.text) + node = tree.xpath("//*[local-name() = 'Estado']")[0] + except Exception as e: + return 'Error: {}'.format(str(e)) + + return node.text + + +def main(): + return + + +if __name__ == '__main__': + main() diff --git a/source/tests/tests_finkok.py b/source/tests/tests_finkok.py new file mode 100644 index 0000000..0197e55 --- /dev/null +++ b/source/tests/tests_finkok.py @@ -0,0 +1,239 @@ +#!/usr/bin/env python3 + +import base64 +import datetime +import sys +import time +import unittest +import uuid +import lxml.etree as ET +from io import BytesIO +from pathlib import Path + +sys.path.append('..') +from pycert import SATCertificate +from finkok import PACFinkok + + +NAME = 'finkok' + + +TEMPLATE_CFDI = """ + + + + + + + + + + + + + + + + + +""" + + +TEMPLATE_CANCEL = """ + + {uuid} + + + + + + + + + + + + + + + + + + + + + + + + +""" + + +class TestCfdi(object): + + def __init__(self): + self._xml = '' + self._make_cfdi() + + @property + def xml(self): + return self._xml.decode() + + def _make_cfdi(self): + path = Path(__file__) + path_cer = Path(path.parent).joinpath('certificados', f'{NAME}.cer') + path_key = Path(path.parent).joinpath('certificados', f'{NAME}.enc') + path_xslt = Path(path.parent).joinpath('xslt', 'cadena.xslt') + self._cer_ori = cer = path_cer.read_bytes() + self._key_ori = key = path_key.read_bytes() + + self._cert = SATCertificate(cer, key) + self._doc = ET.parse(BytesIO(TEMPLATE_CFDI.encode())) + self._root = self._doc.getroot() + self._root.attrib['Fecha'] = datetime.datetime.now().isoformat()[:19] + self._root.attrib['NoCertificado'] = self._cert.serial_number + self._root.attrib['Certificado'] = self._cert.cer_txt + + self._add_stamp(path_xslt) + + self._xml = ET.tostring(self._root, + pretty_print=True, doctype='') + return + + def _add_stamp(self, path_xslt): + xslt = open(path_xslt, 'rb') + transfor = ET.XSLT(ET.parse(xslt)) + cadena = str(transfor(self._doc)).encode() + stamp = self._cert.sign(cadena) + self._root.attrib['Sello'] = stamp + xslt.close() + return + + def sign_xml(self, template): + tree = ET.fromstring(template.encode()) + tree = self._cert.sign_xml(tree) + xml = ET.tostring(tree).decode() + return xml + + @property + def cert(self): + cer = base64.b64encode(self._cer_ori).decode() + key = base64.b64encode(self._key_ori).decode() + return key, cer + + +class TestStamp(unittest.TestCase): + + def setUp(self): + print(f'In method: {self._testMethodName}') + self.pac = PACFinkok() + + def test_cfdi_stamp(self): + cfdi = TestCfdi().xml + result = self.pac.stamp(cfdi) + cfdi_uuid = result['uuid'] + + self.assertFalse(bool(self.pac.error)) + self.assertTrue(bool(uuid.UUID(cfdi_uuid))) + + def test_cfdi_cancel(self): + expected = '201' + cfdi = TestCfdi() + result = self.pac.stamp(cfdi.xml) + cfdi_uuid = self.pac.cfdi_uuid + + self.assertFalse(bool(self.pac.error)) + self.assertTrue(bool(uuid.UUID(cfdi_uuid))) + + time.sleep(1) + cert = cfdi.cert + info = { + 'key': cert[0], + 'cer': cert[1], + 'pass': '12345678a', + 'tipo': 'cfdi3.3', + } + result = self.pac.cancel(result, info) + self.assertFalse(bool(self.pac.error)) + + tree = ET.fromstring(result) + cancel_uuid = tree.xpath('string(//Acuse/Folios/UUID)') + status = tree.xpath('string(//Acuse/Folios/EstatusUUID)') + + self.assertEqual(cfdi_uuid, cancel_uuid) + self.assertEqual(status, expected) + + def test_cfdi_cancel_xml(self): + expected = '201' + cfdi = TestCfdi() + result = self.pac.stamp(cfdi.xml) + cfdi_uuid = self.pac.cfdi_uuid + + self.assertFalse(bool(self.pac.error)) + self.assertTrue(bool(uuid.UUID(cfdi_uuid))) + + NS_CFDI = { + 'cfdi': 'http://www.sat.gob.mx/cfd/3', + 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital', + } + tree = ET.fromstring(result.encode()) + rfc_emisor = tree.xpath( + 'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)', + namespaces=NS_CFDI) + + time.sleep(1) + data = { + 'rfc': rfc_emisor, + 'fecha': datetime.datetime.now().isoformat()[:19], + 'uuid': cfdi_uuid, + } + template = TEMPLATE_CANCEL.format(**data) + sign_xml = cfdi.sign_xml(template) + info = { + 'tipo': 'cfdi3.3', + } + result = self.pac.cancel_xml(result, sign_xml, info) + tree = ET.fromstring(result) + uid = tree.xpath('string(//Acuse/Folios/UUID)') + status = tree.xpath('string(//Acuse/Folios/EstatusUUID)') + + self.assertEqual(cfdi_uuid, uid) + self.assertEqual(status, expected) + + def test_cfdi_status(self): + expected = '' + cfdi = TestCfdi() + result = self.pac.stamp(cfdi.xml) + cfdi_uuid = self.pac.cfdi_uuid + + self.assertFalse(bool(self.pac.error)) + self.assertTrue(bool(uuid.UUID(cfdi_uuid))) + + NS_CFDI = { + 'cfdi': 'http://www.sat.gob.mx/cfd/3', + 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital', + } + tree = ET.fromstring(result.encode()) + rfc_emisor = tree.xpath( + 'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)', + namespaces=NS_CFDI) + rfc_receptor = tree.xpath( + 'string(//cfdi:Comprobante/cfdi:Receptor/@Rfc)', + namespaces=NS_CFDI) + total = tree.xpath( + 'string(//cfdi:Comprobante/@Total)', + namespaces=NS_CFDI) + + time.sleep(3) + data = { + 'rfc_receptor': rfc_receptor, + 'rfc_emisor': rfc_emisor, + 'total': total, + 'uuid': cfdi_uuid, + } + result = self.pac.status(data) + self.assertEqual(result, expected) + + +if __name__ == '__main__': + unittest.main()