From 348a7f6ecb67c587f109937a7a147a27b53ddf40 Mon Sep 17 00:00:00 2001 From: Mauricio Baeza Date: Sat, 2 Jan 2021 18:16:15 -0600 Subject: [PATCH] Add pac Finkok refactory --- source/app/controllers/conf.py.example | 13 - source/app/controllers/configpac.py | 62 -- source/app/controllers/pac.py | 755 ------------------ source/app/controllers/pacs/__init__.py | 1 + .../app/controllers/{ => pacs}/cfdi_cert.py | 7 +- source/app/controllers/pacs/conf.py.example | 6 + .../app/controllers/pacs/finkok/__init__.py | 3 + .../controllers/pacs/finkok/conf.py.example | 46 ++ source/app/controllers/pacs/finkok/finkok.py | 549 +++++++++++++ source/app/controllers/util.py | 2 +- source/app/controllers/utils.py | 6 +- source/app/models/main.py | 36 +- 12 files changed, 627 insertions(+), 859 deletions(-) delete mode 100644 source/app/controllers/conf.py.example delete mode 100644 source/app/controllers/configpac.py delete mode 100644 source/app/controllers/pac.py rename source/app/controllers/{ => pacs}/cfdi_cert.py (98%) create mode 100644 source/app/controllers/pacs/conf.py.example create mode 100644 source/app/controllers/pacs/finkok/__init__.py create mode 100644 source/app/controllers/pacs/finkok/conf.py.example create mode 100644 source/app/controllers/pacs/finkok/finkok.py diff --git a/source/app/controllers/conf.py.example b/source/app/controllers/conf.py.example deleted file mode 100644 index 2a015b5..0000000 --- a/source/app/controllers/conf.py.example +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/env python3 - - -DEBUG = False - -#~ Ecodex -ID_INTEGRADOR = '' - -#~ Finkok -FINKOK= { - 'USER': '', - 'PASS': '', -} diff --git a/source/app/controllers/configpac.py b/source/app/controllers/configpac.py deleted file mode 100644 index d4c70a0..0000000 --- a/source/app/controllers/configpac.py +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/env python3 - - -from .conf import DEBUG, FINKOK - -DEBUG = DEBUG -TIMEOUT = 10 - -#~ PACs que han proporcionado un entorno de pruebas libre y abierto -#~ ecodex, finkok -PAC = 'finkok' - - -#~ IMPORTANTE: Si quieres hacer pruebas, con tu propio correo de usuario y -#~ contraseña, ponte en contacto con Finkok para que te asignen tus datos de -#~ acceso, consulta su documentación para ver las diferentes opciones de acceso. -#~ Si solo estas haciendo pruebas de timbrado y ancelación, con estos datos debería -#~ ser suficiente. -def finkok(debug): - USER = FINKOK['USER'] - PASS = FINKOK['PASS'] - TOKEN = '' - auth = { - 'DEBUG': debug, - 'USER': '', - 'PASS': TOKEN or PASS, - 'RESELLER': {'USER': USER, 'PASS': PASS} - } - if debug: - USER = 'pruebas-finkok@correolibre.net' - PASS = '' - TOKEN = '5c9a88da105bff9a8c430cb713f6d35269f51674bdc5963c1501b7316366' - auth = { - 'DEBUG': debug, - 'USER': USER, - 'PASS': TOKEN or PASS, - 'RESELLER': { - 'USER': '', - 'PASS': '' - } - } - - base_url = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl' - if debug: - base_url = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl' - url = { - 'timbra': base_url.format('stamp'), - 'quick_stamp': False, - 'cancel': base_url.format('cancel'), - 'client': base_url.format('registration'), - 'util': base_url.format('utilities'), - 'codes': { - '200': 'Comprobante timbrado satisfactoriamente', - '307': 'Comprobante timbrado previamente', - '205': 'No Encontrado', - } - } - return auth, url - - -AUTH, URL = globals()[PAC](DEBUG) - diff --git a/source/app/controllers/pac.py b/source/app/controllers/pac.py deleted file mode 100644 index 9242773..0000000 --- a/source/app/controllers/pac.py +++ /dev/null @@ -1,755 +0,0 @@ -#!/usr/bin/env python3 - -#~ import re -#~ from xml.etree import ElementTree as ET -#~ from requests import Request, Session, exceptions -import datetime -import hashlib -import os -import requests -import time -from lxml import etree -from xml.dom.minidom import parseString -from xml.sax.saxutils import escape, unescape -from uuid import UUID - -from logbook import Logger -from zeep import Client -from zeep.plugins import HistoryPlugin -from zeep.cache import SqliteCache -from zeep.transports import Transport -from zeep.exceptions import Fault, TransportError -from requests.exceptions import ConnectionError - - -if __name__ == '__main__': - from configpac import DEBUG, TIMEOUT, AUTH, URL -else: - from .configpac import DEBUG, TIMEOUT, AUTH, URL - - -log = Logger('PAC') -#~ node = client.create_message(client.service, SERVICE, **args) -#~ print(etree.tostring(node, pretty_print=True).decode()) - - -class Ecodex(object): - - def __init__(self, auth, url): - self.auth = auth - self.url = url - self.codes = self.url['codes'] - self.error = '' - self.message = '' - self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT) - self._plugins = None - self._history = None - if DEBUG: - self._history = HistoryPlugin() - self._plugins = [self._history] - - def _get_token(self, rfc): - client = Client(self.url['seguridad'], - transport=self._transport, plugins=self._plugins) - try: - result = client.service.ObtenerToken(rfc, self._get_epoch()) - except Fault as e: - self.error = str(e) - log.error(self.error) - return '' - - s = '{}|{}'.format(self.auth['ID'], result.Token) - return hashlib.sha1(s.encode()).hexdigest() - - def _get_token_rest(self, rfc): - data = { - 'rfc': rfc, - 'grant_type': 'authorization_token', - } - headers = {'Content-type': 'application/x-www-form-urlencoded'} - result = requests.post(URL['token'], data=data, headers=headers) - data = result.json() - s = '{}|{}'.format(AUTH['ID'], data['service_token']) - return hashlib.sha1(s.encode()).hexdigest(), data['access_token'] - - def _validate_xml(self, xml): - NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'} - if os.path.isfile(xml): - tree = etree.parse(xml).getroot() - else: - tree = etree.fromstring(xml.encode()) - - fecha = tree.get('Fecha') - rfc = tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI) - data = { - 'ComprobanteXML': etree.tostring(tree).decode(), - 'RFC': rfc, - 'Token': self._get_token(rfc), - 'TransaccionID': self._get_epoch(fecha), - } - return data - - def _get_by_hash(self, sh, rfc): - token, access_token = self._get_token_rest(rfc) - url = URL['hash'].format(sh) - headers = { - 'Authorization': 'Bearer {}'.format(access_token), - 'X-Auth-Token': token, - } - result = requests.get(url, headers=headers) - if result.status_code == 200: - print (result.json()) - return - - def timbra_xml(self, xml): - data = self._validate_xml(xml) - client = Client(self.url['timbra'], - transport=self._transport, plugins=self._plugins) - try: - result = client.service.TimbraXML(**data) - except Fault as e: - error = str(e) - if self.codes['HASH'] in error: - sh = error.split(' ')[3] - return self._get_by_hash(sh[:40], data['RFC']) - self.error = error - return '' - - tree = parseString(result.ComprobanteXML.DatosXML) - xml = tree.toprettyxml(encoding='utf-8').decode('utf-8') - return xml - - def _get_epoch(self, date=None): - if isinstance(date, str): - f = '%Y-%m-%dT%H:%M:%S' - e = int(time.mktime(time.strptime(date, f))) - else: - date = datetime.datetime.now() - e = int(time.mktime(date.timetuple())) - return e - - def estatus_cuenta(self, rfc): - #~ Codigos: - #~ 100 = Cuenta encontrada - #~ 101 = RFC no dado de alta en el sistema ECODEX - token = self._get_token(rfc) - if not token: - return {} - - data = { - 'RFC': rfc, - 'Token': token, - 'TransaccionID': self._get_epoch() - } - client = Client(URL['clients'], - transport=self._transport, plugins=self._plugins) - try: - result = client.service.EstatusCuenta(**data) - except Fault as e: - log.error(str(e)) - return - #~ print (result) - return result.Estatus - - -class Finkok(object): - - def __init__(self, auth={}): - self.codes = URL['codes'] - self.error = '' - self.message = '' - self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT) - self._plugins = None - self._history = None - self.uuid = '' - self.fecha = None - if DEBUG: - self._history = HistoryPlugin() - self._plugins = [self._history] - self._auth = AUTH - else: - self._auth = auth - - def _debug(self): - if not DEBUG: - return - print('SEND', self._history.last_sent) - print('RESULT', self._history.last_received) - return - - def _check_result(self, method, result): - # ~ print ('CODE', result.CodEstatus) - # ~ print ('INCIDENCIAS', result.Incidencias) - self.message = '' - MSG = { - 'OK': 'Comprobante timbrado satisfactoriamente', - '307': 'Comprobante timbrado previamente', - } - status = result.CodEstatus - if status is None and result.Incidencias: - for i in result.Incidencias['Incidencia']: - self.error += 'Error: {}\n{}\n{}'.format( - i['CodigoError'], i['MensajeIncidencia'], i['ExtraInfo']) - return '' - - if method == 'timbra' and status in (MSG['OK'], MSG['307']): - #~ print ('UUID', result.UUID) - #~ print ('FECHA', result.Fecha) - if status == MSG['307']: - self.message = MSG['307'] - tree = parseString(result.xml) - response = tree.toprettyxml(encoding='utf-8').decode('utf-8') - self.uuid = result.UUID - self.fecha = result.Fecha - - return response - - def _load_file(self, path): - try: - with open(path, 'rb') as f: - data = f.read() - except Exception as e: - self.error = str(e) - return - return data - - def _validate_xml(self, file_xml): - if os.path.isfile(file_xml): - try: - with open(file_xml, 'rb') as f: - xml = f.read() - except Exception as e: - self.error = str(e) - return False, '' - else: - xml = file_xml.encode('utf-8') - return True, xml - - def _validate_uuid(self, uuid): - try: - UUID(uuid) - return True - except ValueError: - self.error = 'UUID no válido: {}'.format(uuid) - return False - - def timbra_xml(self, file_xml): - self.error = '' - - if not DEBUG and not self._auth: - self.error = 'Sin datos para timbrar' - return - - method = 'timbra' - ok, xml = self._validate_xml(file_xml) - if not ok: - return '' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - - args = { - 'username': self._auth['USER'], - 'password': self._auth['PASS'], - 'xml': xml, - } - if URL['quick_stamp']: - try: - result = client.service.quick_stamp(**args) - except Fault as e: - self.error = str(e) - return - else: - try: - result = client.service.stamp(**args) - except Fault as e: - self.error = str(e) - return - except TransportError as e: - if '413' in str(e): - self.error = '413

Documento muy grande para timbrar' - else: - self.error = str(e) - return - except ConnectionError as e: - msg = '502 - Error de conexión' - self.error = msg - return - - return self._check_result(method, result) - - def _get_xml(self, uuid): - if not self._validate_uuid(uuid): - return '' - - method = 'util' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - - args = { - 'username': self._auth['USER'], - 'password': self._auth['PASS'], - 'uuid': uuid, - 'taxpayer_id': self.rfc, - 'invoice_type': 'I', - } - try: - result = client.service.get_xml(**args) - except Fault as e: - self.error = str(e) - return '' - except TransportError as e: - self.error = str(e) - return '' - - if result.error: - self.error = result.error - return '' - - tree = parseString(result.xml) - xml = tree.toprettyxml(encoding='utf-8').decode('utf-8') - return xml - - def recupera_xml(self, file_xml='', uuid=''): - self.error = '' - if uuid: - return self._get_xml(uuid) - - method = 'timbra' - ok, xml = self._validate_xml(file_xml) - if not ok: - return '' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - try: - result = client.service.stamped( - xml, self._auth['user'], self._auth['pass']) - except Fault as e: - self.error = str(e) - return '' - - return self._check_result(method, result) - - def estatus_xml(self, uuid): - method = 'timbra' - if not self._validate_uuid(uuid): - return '' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - try: - result = client.service.query_pending( - self._auth['USER'], self._auth['PASS'], uuid) - return result.status - except Fault as e: - self.error = str(e) - return '' - - def cancel_xml(self, rfc, uuid, cer, key): - # ~ for u in uuids: - # ~ if not self._validate_uuid(u): - # ~ return '' - - method = 'cancel' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - uuid_type = client.get_type('ns1:UUIDS') - sa = client.get_type('ns0:stringArray') - - args = { - 'UUIDS': uuid_type(uuids=sa(string=uuid)), - 'username': self._auth['USER'], - 'password': self._auth['PASS'], - 'taxpayer_id': rfc, - 'cer': cer, - 'key': key, - 'store_pending': False, - } - try: - result = client.service.cancel(**args) - except Fault as e: - self.error = str(e) - return '' - - if result.CodEstatus and self.codes['205'] in result.CodEstatus: - self.error = result.CodEstatus - return '' - - return result - - def cancel_signature(self, file_xml): - method = 'cancel' - if os.path.isfile(file_xml): - root = etree.parse(file_xml).getroot() - else: - root = etree.fromstring(file_xml.encode()) - - xml = etree.tostring(root) - - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - - args = { - 'username': self._auth['USER'], - 'password': self._auth['PASS'], - 'xml': xml, - 'store_pending': False, - } - - try: - result = client.service.cancel_signature(**args) - return result - except Fault as e: - self.error = str(e) - return '' - - def get_acuse(self, rfc, uuids, type_acuse='C'): - for u in uuids: - if not self._validate_uuid(u): - return '' - - method = 'cancel' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - - args = { - 'username': self._auth['USER'], - 'password': self._auth['PASS'], - 'taxpayer_id': rfc, - 'uuid': '', - 'type': type_acuse, - } - try: - result = [] - for u in uuids: - args['uuid'] = u - r = client.service.get_receipt(**args) - result.append(r) - except Fault as e: - self.error = str(e) - return '' - - return result - - def estatus_cancel(self, uuids): - for u in uuids: - if not self._validate_uuid(u): - return '' - - method = 'cancel' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - - args = { - 'username': self._auth['USER'], - 'password': self._auth['PASS'], - 'uuid': '', - } - try: - result = [] - for u in uuids: - args['uuid'] = u - r = client.service.query_pending_cancellation(**args) - result.append(r) - except Fault as e: - self.error = str(e) - return '' - - return result - - def add_token(self, rfc, email): - """Agrega un nuevo token al cliente para timbrado. - Se requiere cuenta de reseller para usar este método - - Args: - rfc (str): El RFC del cliente, ya debe existir - email (str): El correo del cliente, funciona como USER al timbrar - - Returns: - dict - 'username': 'username', - 'status': True or False - 'name': 'name', - 'success': True or False - 'token': 'Token de timbrado', - 'message': None - """ - auth = AUTH['RESELLER'] - - method = 'util' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - args = { - 'username': auth['USER'], - 'password': auth['PASS'], - 'name': rfc, - 'token_username': email, - 'taxpayer_id': rfc, - 'status': True, - } - try: - result = client.service.add_token(**args) - except Fault as e: - self.error = str(e) - return '' - - return result - - def get_date(self): - method = 'util' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - try: - result = client.service.datetime(AUTH['USER'], AUTH['PASS']) - except Fault as e: - self.error = str(e) - return '' - - if result.error: - self.error = result.error - return - - return result.datetime - - def add_client(self, rfc, type_user=False): - """Agrega un nuevo cliente para timbrado. - Se requiere cuenta de reseller para usar este método - - Args: - rfc (str): El RFC del nuevo cliente - - Kwargs: - type_user (bool): False == 'P' == Prepago or True == 'O' == On demand - - Returns: - dict - 'message': - 'Account Created successfully' - 'Account Already exists' - 'success': True or False - """ - auth = AUTH['RESELLER'] - - tu = {False: 'P', True: 'O'} - method = 'client' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - args = { - 'reseller_username': auth['USER'], - 'reseller_password': auth['PASS'], - 'taxpayer_id': rfc, - 'type_user': tu[type_user], - 'added': datetime.datetime.now().isoformat()[:19], - } - try: - result = client.service.add(**args) - except Fault as e: - self.error = str(e) - return '' - - return result - - def edit_client(self, rfc, status=True): - """ - Se requiere cuenta de reseller para usar este método - status = 'A' or 'S' - """ - auth = AUTH['RESELLER'] - - sv = {False: 'S', True: 'A'} - method = 'client' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - args = { - 'reseller_username': auth['USER'], - 'reseller_password': auth['PASS'], - 'taxpayer_id': rfc, - 'status': sv[status], - } - try: - result = client.service.edit(**args) - except Fault as e: - self.error = str(e) - return '' - - return result - - def get_client(self, rfc): - """Regresa el estatus del cliente - . - Se requiere cuenta de reseller para usar este método - - Args: - rfc (str): El RFC del emisor - - Returns: - dict - 'message': None, - 'users': { - 'ResellerUser': [ - { - 'status': 'A', - 'counter': 0, - 'taxpayer_id': '', - 'credit': 0 - } - ] - } or None si no existe - """ - auth = AUTH['RESELLER'] - - method = 'client' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - args = { - 'reseller_username': auth['USER'], - 'reseller_password': auth['PASS'], - 'taxpayer_id': rfc, - } - - try: - result = client.service.get(**args) - except Fault as e: - self.error = str(e) - return '' - except TransportError as e: - self.error = str(e) - return '' - - return result - - def assign_client(self, rfc, credit): - """Agregar credito a un emisor - - Se requiere cuenta de reseller para usar este método - - Args: - rfc (str): El RFC del emisor, debe existir - credit (int): Cantidad de folios a agregar - - Returns: - dict - 'success': True or False, - 'credit': nuevo credito despues de agregar or None - 'message': - 'Success, added {credit} of credit to {RFC}' - 'RFC no encontrado' - """ - auth = AUTH['RESELLER'] - - method = 'client' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - args = { - 'username': auth['USER'], - 'password': auth['PASS'], - 'taxpayer_id': rfc, - 'credit': credit, - } - try: - result = client.service.assign(**args) - except Fault as e: - self.error = str(e) - return '' - - return result - - def client_get_timbres(self, rfc): - method = 'client' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - args = { - 'reseller_username': self._auth['USER'], - 'reseller_password': self._auth['PASS'], - 'taxpayer_id': rfc, - } - - try: - self.result = client.service.get(**args) - except Fault as e: - self.error = str(e) - return 0 - except TransportError as e: - self.error = str(e) - return 0 - except ConnectionError: - self.error = 'Verifica la conexión a internet' - return 0 - - success = bool(self.result.users) - if not success: - self.error = self.result.message or 'RFC no existe' - return 0 - - return self.result.users.ResellerUser[0].credit - - -def _get_data_sat(path): - BF = 'string(//*[local-name()="{}"]/@{})' - NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'} - - try: - if os.path.isfile(path): - tree = etree.parse(path).getroot() - else: - tree = etree.fromstring(path.encode()) - - data = {} - emisor = escape( - tree.xpath('string(//cfdi:Emisor/@rfc)', namespaces=NS_CFDI) or - tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI) - ) - receptor = escape( - tree.xpath('string(//cfdi:Receptor/@rfc)', namespaces=NS_CFDI) or - tree.xpath('string(//cfdi:Receptor/@Rfc)', namespaces=NS_CFDI) - ) - data['total'] = tree.get('total') or tree.get('Total') - data['emisor'] = emisor - data['receptor'] = receptor - data['uuid'] = tree.xpath(BF.format('TimbreFiscalDigital', 'UUID')) - except Exception as e: - print (e) - return {} - - return '?re={emisor}&rr={receptor}&tt={total}&id={uuid}'.format(**data) - - -def get_status_sat(xml): - data = _get_data_sat(xml) - if not data: - return 'XML inválido' - - data = """ - - - - - - {} - - - - """.format(data) - headers = { - 'SOAPAction': '"http://tempuri.org/IConsultaCFDIService/Consulta"', - 'Content-type': 'text/xml; charset="UTF-8"' - } - URL = 'https://consultaqr.facturaelectronica.sat.gob.mx/consultacfdiservice.svc' - - try: - result = requests.post(URL, data=data, headers=headers) - tree = etree.fromstring(result.text) - node = tree.xpath("//*[local-name() = 'Estado']")[0] - except Exception as e: - return 'Error: {}'.format(str(e)) - - return node.text - - -def main(): - return - - -if __name__ == '__main__': - main() diff --git a/source/app/controllers/pacs/__init__.py b/source/app/controllers/pacs/__init__.py index afe806b..05445b6 100644 --- a/source/app/controllers/pacs/__init__.py +++ b/source/app/controllers/pacs/__init__.py @@ -1,3 +1,4 @@ #!/usr/bin/env python from .comerciodigital import PACComercioDigital +from .finkok import PACFinkok diff --git a/source/app/controllers/cfdi_cert.py b/source/app/controllers/pacs/cfdi_cert.py similarity index 98% rename from source/app/controllers/cfdi_cert.py rename to source/app/controllers/pacs/cfdi_cert.py index 295711b..a19fe7e 100644 --- a/source/app/controllers/cfdi_cert.py +++ b/source/app/controllers/pacs/cfdi_cert.py @@ -15,12 +15,7 @@ from cryptography.x509.oid import ExtensionOID from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import padding - -try: - from .conf import TOKEN -except ImportError: - TOKEN = '' - print('Agrega el TOKEN al archivo conf.py, obligatorio en v1.41.0') +from .conf import TOKEN class SATCertificate(object): diff --git a/source/app/controllers/pacs/conf.py.example b/source/app/controllers/pacs/conf.py.example new file mode 100644 index 0000000..deb384c --- /dev/null +++ b/source/app/controllers/pacs/conf.py.example @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + + +DEBUG = False + +TOKEN = '' diff --git a/source/app/controllers/pacs/finkok/__init__.py b/source/app/controllers/pacs/finkok/__init__.py new file mode 100644 index 0000000..1b61fc3 --- /dev/null +++ b/source/app/controllers/pacs/finkok/__init__.py @@ -0,0 +1,3 @@ +#!/usr/bin/env python3 + +from .finkok import PACFinkok diff --git a/source/app/controllers/pacs/finkok/conf.py.example b/source/app/controllers/pacs/finkok/conf.py.example new file mode 100644 index 0000000..af7b74c --- /dev/null +++ b/source/app/controllers/pacs/finkok/conf.py.example @@ -0,0 +1,46 @@ +#!/usr/bin/env python +# ~ +# ~ PAC +# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net +# ~ +# ~ This program is free software: you can redistribute it and/or modify +# ~ it under the terms of the GNU General Public License as published by +# ~ the Free Software Foundation, either version 3 of the License, or +# ~ (at your option) any later version. +# ~ +# ~ This program is distributed in the hope that it will be useful, +# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of +# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# ~ GNU General Public License for more details. +# ~ +# ~ You should have received a copy of the GNU General Public License +# ~ along with this program. If not, see . + + +# ~ Siempre consulta la documentación de PAC +# ~ AUTH = Las credenciales de timbrado proporcionadas por el PAC +# ~ NO cambies las credenciales de prueba + + +DEBUG = True + + +AUTH = { + 'user': '', + 'pass': '', + 'RESELLER': { + 'user': '', + 'pass': '' + } +} + + +if DEBUG: + AUTH = { + 'user': 'pruebas-finkok@correolibre.net', + 'pass': '5c9a88da105bff9a8c430cb713f6d35269f51674bdc5963c1501b7316366', + 'RESELLER': { + 'user': '', + 'pass': '' + } + } diff --git a/source/app/controllers/pacs/finkok/finkok.py b/source/app/controllers/pacs/finkok/finkok.py new file mode 100644 index 0000000..8c7e16b --- /dev/null +++ b/source/app/controllers/pacs/finkok/finkok.py @@ -0,0 +1,549 @@ +#!/usr/bin/env python +# ~ +# ~ PAC +# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net +# ~ +# ~ This program is free software: you can redistribute it and/or modify +# ~ it under the terms of the GNU General Public License as published by +# ~ the Free Software Foundation, either version 3 of the License, or +# ~ (at your option) any later version. +# ~ +# ~ This program is distributed in the hope that it will be useful, +# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of +# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# ~ GNU General Public License for more details. +# ~ +# ~ You should have received a copy of the GNU General Public License +# ~ along with this program. If not, see . + +import base64 +import datetime +import logging +import os +import re +from io import BytesIO +from xml.sax.saxutils import unescape + +import lxml.etree as ET +from zeep import Client +from zeep.plugins import Plugin +from zeep.cache import SqliteCache +from zeep.transports import Transport +from zeep.exceptions import Fault, TransportError +from requests.exceptions import ConnectionError + +from .conf import DEBUG, AUTH + + +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +LOG_DATE = '%d/%m/%Y %H:%M:%S' +logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m') +logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m') +logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m') +logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE) +log = logging.getLogger(__name__) +logging.getLogger('requests').setLevel(logging.ERROR) +logging.getLogger('zeep').setLevel(logging.ERROR) + + +TIMEOUT = 10 +DEBUG_SOAP = True + + +class DebugPlugin(Plugin): + + def _to_string(self, envelope, name): + if DEBUG_SOAP: + data = ET.tostring(envelope, pretty_print=True, encoding='utf-8').decode() + path = f'/tmp/soap_{name}.xml' + with open(path, 'w') as f: + f.write(data) + return + + def egress(self, envelope, http_headers, operation, binding_options): + self._to_string(envelope, 'request') + return envelope, http_headers + + def ingress(self, envelope, http_headers, operation): + self._to_string(envelope, 'response') + return envelope, http_headers + + +class PACFinkok(object): + WS = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl' + if DEBUG: + WS = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl' + URL = { + 'quick_stamp': False, + 'timbra': WS.format('stamp'), + 'cancel': WS.format('cancel'), + 'client': WS.format('registration'), + 'util': WS.format('utilities'), + } + CODE = { + '200': 'Comprobante timbrado satisfactoriamente', + '205': 'No Encontrado', + '307': 'Comprobante timbrado previamente', + '702': 'No se encontro el RFC del emisor', + 'IP': 'Invalid Passphrase', + 'IPMSG': 'Frase de paso inválida', + 'NE': 'No Encontrado', + } + + def __init__(self): + self._error = '' + self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT) + self._plugins = [DebugPlugin()] + + @property + def error(self): + return self._error + + def _validate_result(self, result): + if hasattr(result, 'CodEstatus'): + ce = result.CodEstatus + if ce is None: + return result + + if ce == self.CODE['IP']: + self._error = self.CODE['IPMSG'] + return {} + + if self.CODE['NE'] in ce: + self._error = 'UUID ' + self.CODE['NE'] + return {} + + if self.CODE['200'] != ce: + log.error('CodEstatus', type(ce), ce) + return result + + if hasattr(result, 'Incidencias'): + fault = result.Incidencias.Incidencia[0] + cod_error = fault.CodigoError.encode('utf-8') + msg_error = fault.MensajeIncidencia.encode('utf-8') + error = 'Error: {}\n{}'.format(cod_error, msg_error) + self._error = self.CODE.get(cod_error, error) + return {} + + return result + + def _get_result(self, client, method, args): + self._error = '' + try: + result = getattr(client.service, method)(**args) + except Fault as e: + self._error = str(e) + return {} + except TransportError as e: + if '413' in str(e): + self._error = '413

Documento muy grande para timbrar' + else: + self._error = str(e) + return {} + except ConnectionError as e: + msg = '502 - Error de conexión' + self._error = msg + return {} + + return self._validate_result(result) + + def _to_string(self, data): + root = ET.parse(BytesIO(data.encode('utf-8'))).getroot() + xml = ET.tostring(root, + pretty_print=True, xml_declaration=True, encoding='utf-8') + return xml.decode('utf-8') + + def stamp(self, cfdi, auth={}): + if DEBUG or not auth: + auth = AUTH + + method = 'timbra' + client = Client(self.URL[method], + transport=self._transport, plugins=self._plugins) + args = { + 'username': auth['user'], + 'password': auth['pass'], + 'xml': cfdi.encode('utf-8'), + } + result = self._get_result(client, 'stamp', args) + if self.error: + log.error(self.error) + return '' + + data = { + 'xml': self._to_string(result.xml), + 'uuid': result.UUID, + 'date': result.Fecha, + } + return data + + def _get_data_cancel(self, cfdi): + NS_CFDI = { + 'cfdi': 'http://www.sat.gob.mx/cfd/3', + 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital', + } + tree = ET.fromstring(cfdi.encode()) + rfc_emisor = tree.xpath( + 'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)', + namespaces=NS_CFDI) + cfdi_uuid = tree.xpath( + 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)', + namespaces=NS_CFDI) + return rfc_emisor, cfdi_uuid + + def cancel(self, cfdi, info, auth={}): + if not auth: + auth = AUTH + + rfc_emisor, cfdi_uuid = self._get_data_cancel(cfdi) + method = 'cancel' + client = Client(self.URL[method], + transport=self._transport, plugins=self._plugins) + uuid_type = client.get_type('ns1:UUIDS') + sa = client.get_type('ns0:stringArray') + + args = { + 'UUIDS': uuid_type(uuids=sa(string=cfdi_uuid)), + 'username': auth['user'], + 'password': auth['pass'], + 'taxpayer_id': rfc_emisor, + 'cer': info['cer'], + 'key': info['key'], + 'store_pending': False, + } + + result = self._get_result(client, 'cancel', args) + if self.error: + log.error(self.error) + return '' + + folio = result['Folios']['Folio'][0] + status = folio['EstatusUUID'] + if status != '201': + log.debug(f'Cancel status: {status} - {cfdi_uuid}') + + data = { + 'acuse': result['Acuse'], + 'date': result['Fecha'], + } + return data + + def cancel_xml(self, xml, auth={}): + if not auth: + auth = AUTH + + method = 'cancel' + client = Client(self.URL[method], + transport=self._transport, plugins=self._plugins) + client.set_ns_prefix('can', 'http://facturacion.finkok.com/cancel') + # ~ xml = f'\n{xml}' + # ~ xml = f'\n{xml}' + args = { + 'xml': xml.encode(), + 'username': auth['user'], + 'password': auth['pass'], + 'store_pending': False, + } + result = self._get_result(client, 'cancel_signature', args) + if self.error: + log.error(self.error) + return '' + + folio = result['Folios']['Folio'][0] + status = folio['EstatusUUID'] + if status != '201': + log.debug(f'Cancel status: {status} -') + + data = { + 'acuse': result['Acuse'], + 'date': result['Fecha'], + } + return data + + def client_add(self, rfc, type_user=False): + """Agrega un nuevo cliente para timbrado. + Se requiere cuenta de reseller para usar este método + + Args: + rfc (str): El RFC del nuevo cliente + + Kwargs: + type_user (bool): + False == 'P' == Prepago + True == 'O' == On demand + + Returns: + True or False + + origin PAC + 'message': + 'Account Created successfully' + 'Account Already exists' + 'success': True or False + """ + auth = AUTH['RESELLER'] + tu = {True: 'O', False: 'P'} + + method = 'client' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + + args = { + 'reseller_username': auth['user'], + 'reseller_password': auth['pass'], + 'taxpayer_id': rfc, + 'type_user': tu[type_user], + 'added': datetime.datetime.now().isoformat()[:19], + } + + result = self._get_result(client, 'add', args) + if self.error: + return False + + if not result.success: + self.error = result.message + return False + + # ~ PAC success debería ser False + msg = 'Account Already exists' + if result.message == msg: + self.error = msg + return True + + return result.success + + def client_get_token(self, rfc, email): + """Genera un nuevo token al cliente para timbrado. + Se requiere cuenta de reseller para usar este método + + Args: + rfc (str): El RFC del cliente, ya debe existir + email (str): El correo del cliente, funciona como USER al timbrar + + Returns: + token (str): Es la contraseña para timbrar + + origin PAC + dict + 'username': 'username', + 'status': True or False + 'name': 'name', + 'success': True or False + 'token': 'Token de timbrado', + 'message': None + """ + auth = AUTH['RESELLER'] + method = 'util' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'username': auth['user'], + 'password': auth['pass'], + 'name': rfc, + 'token_username': email, + 'taxpayer_id': rfc, + 'status': True, + } + + result = self._get_result(client, 'add_token', args) + if self.error: + log.error(self.error) + return '' + + if not result.success: + self.error = result.message + log.error(self.error) + return '' + + return result.token + + def client_add_timbres(self, rfc, credit): + """Agregar credito a un emisor + + Se requiere cuenta de reseller + + Args: + rfc (str): El RFC del emisor, debe existir + credit (int): Cantidad de folios a agregar + + Returns: + dict + 'success': True or False, + 'credit': nuevo credito despues de agregar or None + 'message': + 'Success, added {credit} of credit to {RFC}.' + 'RFC no encontrado' + """ + auth = AUTH['RESELLER'] + + method = 'client' + client = Client( + self.URL[method], transport=self._transport, plugins=self._plugins) + args = { + 'username': auth['user'], + 'password': auth['pass'], + 'taxpayer_id': rfc, + 'credit': credit, + } + + result = self._get_result(client, 'assign', args) + if self.error: + log.error(error) + return '' + + if not result.success: + self.error = result.message + return 0 + + return result.credit + + def client_balance(self, auth={}, rfc=''): + """Regresa los timbres restantes del cliente + Se pueden usar las credenciales de relleser o las credenciales del emisor + + Args: + rfc (str): El RFC del emisor + + Kwargs: + auth (dict): Credenciales del emisor + + Returns: + int Cantidad de timbres restantes + """ + + if not auth: + auth = AUTH['RESELLER'] + + method = 'client' + client = Client(self.URL[method], + transport=self._transport, plugins=self._plugins) + args = { + 'reseller_username': auth['user'], + 'reseller_password': auth['pass'], + 'taxpayer_id': rfc, + } + + result = self._get_result(client, 'get', args) + if self.error: + log.error(self.error) + return '' + + success = bool(result.users) + if not success: + self.error = result.message or 'RFC no existe' + return 0 + + return result.users.ResellerUser[0].credit + + def client_set_status(self, rfc, status): + """Edita el estatus (Activo o Suspendido) de un cliente + Se requiere cuenta de reseller para usar este método + + Args: + rfc (str): El RFC del cliente + + Kwargs: + status (bool): + True == 'A' == Activo + False == 'S' == Suspendido + + Returns: + dict + 'message': + 'Account Created successfully' + 'Account Already exists' + 'success': True or False + """ + auth = AUTH['RESELLER'] + ts = {True: 'A', False: 'S'} + method = 'client' + client = Client(self.URL[method], + transport=self._transport, plugins=self._plugins) + + args = { + 'reseller_username': auth['user'], + 'reseller_password': auth['pass'], + 'taxpayer_id': rfc, + 'status': ts[status], + } + result = self._get_result(client, 'edit', args) + + if self.error: + return False + + if not result.success: + self.error = result.message + return False + + return True + + def client_switch(self, rfc, type_user): + """Edita el tipo de timbrado (OnDemand o Prepago) de un cliente + Se requiere cuenta de reseller para usar este método + + Args: + rfc (str): El RFC del cliente + + Kwargs: + status (bool): + True == 'O' == OnDemand + False == 'P' == Prepago + + Returns: + dict + 'message': + 'Account Created successfully' + 'Account Already exists' + 'success': True or False + """ + auth = AUTH['RESELLER'] + tu = {True: 'O', False: 'P'} + method = 'client' + client = Client(self.URL[method], + transport=self._transport, plugins=self._plugins) + + args = { + 'username': auth['user'], + 'password': auth['pass'], + 'taxpayer_id': rfc, + 'type_user': tu[type_user], + } + result = self._get_result(client, 'switch', args) + + if self.error: + return False + + if not result.success: + self.error = result.message + return False + + return True + + def client_report_folios(self, rfc, date_from, date_to, invoice_type='I'): + """Obtiene un reporte del total de facturas timbradas + """ + auth = AUTH['RESELLER'] + + args = { + 'username': auth['user'], + 'password': auth['pass'], + 'taxpayer_id': rfc, + 'date_from': date_from, + 'date_to': date_to, + 'invoice_type': invoice_type, + } + + method = 'util' + client = Client(self.URL[method], + transport=self._transport, plugins=self._plugins) + + result = self._get_result(client, 'report_total', args) + + if result.result is None: + # ~ PAC - Debería regresar RFC inexistente o sin registros + self.error = 'RFC no existe o no tiene registros' + return 0 + + total = result.result.ReportTotal[0].total + + return total diff --git a/source/app/controllers/util.py b/source/app/controllers/util.py index 9b6dd25..af95d32 100644 --- a/source/app/controllers/util.py +++ b/source/app/controllers/util.py @@ -73,7 +73,7 @@ from settings import USAR_TOKEN, API, DECIMALES_TAX # ~ v2 -from .cfdi_cert import SATCertificate +from .pacs.cfdi_cert import SATCertificate from settings import ( EXT, diff --git a/source/app/controllers/utils.py b/source/app/controllers/utils.py index 274635c..76275ff 100644 --- a/source/app/controllers/utils.py +++ b/source/app/controllers/utils.py @@ -52,11 +52,9 @@ from .cfdi_xml import CFDI from settings import DEBUG, DB_COMPANIES, PATHS, TEMPLATE_CANCEL -from .cfdi_cert import SATCertificate +from .pacs.cfdi_cert import SATCertificate from .pacs import PACComercioDigital -# ~ from .pacs import PACFinkok -from .pac import Finkok as PACFinkok -# ~ from .finkok import PACFinkok +from .pacs import PACFinkok LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' diff --git a/source/app/models/main.py b/source/app/models/main.py index 0d2dcb1..255a754 100644 --- a/source/app/models/main.py +++ b/source/app/models/main.py @@ -3917,24 +3917,24 @@ class Facturas(BaseModel): query.execute() return - def _cancel_signature(self, id): - msg = 'Factura cancelada correctamente' - auth = Emisor.get_auth() - certificado = Certificado.select()[0] - obj = Facturas.get(Facturas.id==id) - data, result = util.cancel_signature( - obj.uuid, certificado.p12, certificado.rfc, auth) - if data['ok']: - obj.estatus = 'Cancelada' - obj.error = '' - obj.cancelada = True - obj.fecha_cancelacion = result['Fecha'] - obj.acuse = result['Acuse'] - self._actualizar_saldo_cliente(self, obj, True) - else: - obj.error = data['msg'] - obj.save() - return data + # ~ def _cancel_signature(self, id): + # ~ msg = 'Factura cancelada correctamente' + # ~ auth = Emisor.get_auth() + # ~ certificado = Certificado.select()[0] + # ~ obj = Facturas.get(Facturas.id==id) + # ~ data, result = util.cancel_signature( + # ~ obj.uuid, certificado.p12, certificado.rfc, auth) + # ~ if data['ok']: + # ~ obj.estatus = 'Cancelada' + # ~ obj.error = '' + # ~ obj.cancelada = True + # ~ obj.fecha_cancelacion = result['Fecha'] + # ~ obj.acuse = result['Acuse'] + # ~ self._actualizar_saldo_cliente(self, obj, True) + # ~ else: + # ~ obj.error = data['msg'] + # ~ obj.save() + # ~ return data def _get_filters(self, values): if 'start' in values and 'end' in values: