diff --git a/source/app/controllers/conf.py.example b/source/app/controllers/conf.py.example
deleted file mode 100644
index 2a015b5..0000000
--- a/source/app/controllers/conf.py.example
+++ /dev/null
@@ -1,13 +0,0 @@
-#!/usr/bin/env python3
-
-
-DEBUG = False
-
-#~ Ecodex
-ID_INTEGRADOR = ''
-
-#~ Finkok
-FINKOK= {
- 'USER': '',
- 'PASS': '',
-}
diff --git a/source/app/controllers/configpac.py b/source/app/controllers/configpac.py
deleted file mode 100644
index d4c70a0..0000000
--- a/source/app/controllers/configpac.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#!/usr/bin/env python3
-
-
-from .conf import DEBUG, FINKOK
-
-DEBUG = DEBUG
-TIMEOUT = 10
-
-#~ PACs que han proporcionado un entorno de pruebas libre y abierto
-#~ ecodex, finkok
-PAC = 'finkok'
-
-
-#~ IMPORTANTE: Si quieres hacer pruebas, con tu propio correo de usuario y
-#~ contraseña, ponte en contacto con Finkok para que te asignen tus datos de
-#~ acceso, consulta su documentación para ver las diferentes opciones de acceso.
-#~ Si solo estas haciendo pruebas de timbrado y ancelación, con estos datos debería
-#~ ser suficiente.
-def finkok(debug):
- USER = FINKOK['USER']
- PASS = FINKOK['PASS']
- TOKEN = ''
- auth = {
- 'DEBUG': debug,
- 'USER': '',
- 'PASS': TOKEN or PASS,
- 'RESELLER': {'USER': USER, 'PASS': PASS}
- }
- if debug:
- USER = 'pruebas-finkok@correolibre.net'
- PASS = ''
- TOKEN = '5c9a88da105bff9a8c430cb713f6d35269f51674bdc5963c1501b7316366'
- auth = {
- 'DEBUG': debug,
- 'USER': USER,
- 'PASS': TOKEN or PASS,
- 'RESELLER': {
- 'USER': '',
- 'PASS': ''
- }
- }
-
- base_url = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl'
- if debug:
- base_url = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl'
- url = {
- 'timbra': base_url.format('stamp'),
- 'quick_stamp': False,
- 'cancel': base_url.format('cancel'),
- 'client': base_url.format('registration'),
- 'util': base_url.format('utilities'),
- 'codes': {
- '200': 'Comprobante timbrado satisfactoriamente',
- '307': 'Comprobante timbrado previamente',
- '205': 'No Encontrado',
- }
- }
- return auth, url
-
-
-AUTH, URL = globals()[PAC](DEBUG)
-
diff --git a/source/app/controllers/pac.py b/source/app/controllers/pac.py
deleted file mode 100644
index 9242773..0000000
--- a/source/app/controllers/pac.py
+++ /dev/null
@@ -1,755 +0,0 @@
-#!/usr/bin/env python3
-
-#~ import re
-#~ from xml.etree import ElementTree as ET
-#~ from requests import Request, Session, exceptions
-import datetime
-import hashlib
-import os
-import requests
-import time
-from lxml import etree
-from xml.dom.minidom import parseString
-from xml.sax.saxutils import escape, unescape
-from uuid import UUID
-
-from logbook import Logger
-from zeep import Client
-from zeep.plugins import HistoryPlugin
-from zeep.cache import SqliteCache
-from zeep.transports import Transport
-from zeep.exceptions import Fault, TransportError
-from requests.exceptions import ConnectionError
-
-
-if __name__ == '__main__':
- from configpac import DEBUG, TIMEOUT, AUTH, URL
-else:
- from .configpac import DEBUG, TIMEOUT, AUTH, URL
-
-
-log = Logger('PAC')
-#~ node = client.create_message(client.service, SERVICE, **args)
-#~ print(etree.tostring(node, pretty_print=True).decode())
-
-
-class Ecodex(object):
-
- def __init__(self, auth, url):
- self.auth = auth
- self.url = url
- self.codes = self.url['codes']
- self.error = ''
- self.message = ''
- self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
- self._plugins = None
- self._history = None
- if DEBUG:
- self._history = HistoryPlugin()
- self._plugins = [self._history]
-
- def _get_token(self, rfc):
- client = Client(self.url['seguridad'],
- transport=self._transport, plugins=self._plugins)
- try:
- result = client.service.ObtenerToken(rfc, self._get_epoch())
- except Fault as e:
- self.error = str(e)
- log.error(self.error)
- return ''
-
- s = '{}|{}'.format(self.auth['ID'], result.Token)
- return hashlib.sha1(s.encode()).hexdigest()
-
- def _get_token_rest(self, rfc):
- data = {
- 'rfc': rfc,
- 'grant_type': 'authorization_token',
- }
- headers = {'Content-type': 'application/x-www-form-urlencoded'}
- result = requests.post(URL['token'], data=data, headers=headers)
- data = result.json()
- s = '{}|{}'.format(AUTH['ID'], data['service_token'])
- return hashlib.sha1(s.encode()).hexdigest(), data['access_token']
-
- def _validate_xml(self, xml):
- NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'}
- if os.path.isfile(xml):
- tree = etree.parse(xml).getroot()
- else:
- tree = etree.fromstring(xml.encode())
-
- fecha = tree.get('Fecha')
- rfc = tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI)
- data = {
- 'ComprobanteXML': etree.tostring(tree).decode(),
- 'RFC': rfc,
- 'Token': self._get_token(rfc),
- 'TransaccionID': self._get_epoch(fecha),
- }
- return data
-
- def _get_by_hash(self, sh, rfc):
- token, access_token = self._get_token_rest(rfc)
- url = URL['hash'].format(sh)
- headers = {
- 'Authorization': 'Bearer {}'.format(access_token),
- 'X-Auth-Token': token,
- }
- result = requests.get(url, headers=headers)
- if result.status_code == 200:
- print (result.json())
- return
-
- def timbra_xml(self, xml):
- data = self._validate_xml(xml)
- client = Client(self.url['timbra'],
- transport=self._transport, plugins=self._plugins)
- try:
- result = client.service.TimbraXML(**data)
- except Fault as e:
- error = str(e)
- if self.codes['HASH'] in error:
- sh = error.split(' ')[3]
- return self._get_by_hash(sh[:40], data['RFC'])
- self.error = error
- return ''
-
- tree = parseString(result.ComprobanteXML.DatosXML)
- xml = tree.toprettyxml(encoding='utf-8').decode('utf-8')
- return xml
-
- def _get_epoch(self, date=None):
- if isinstance(date, str):
- f = '%Y-%m-%dT%H:%M:%S'
- e = int(time.mktime(time.strptime(date, f)))
- else:
- date = datetime.datetime.now()
- e = int(time.mktime(date.timetuple()))
- return e
-
- def estatus_cuenta(self, rfc):
- #~ Codigos:
- #~ 100 = Cuenta encontrada
- #~ 101 = RFC no dado de alta en el sistema ECODEX
- token = self._get_token(rfc)
- if not token:
- return {}
-
- data = {
- 'RFC': rfc,
- 'Token': token,
- 'TransaccionID': self._get_epoch()
- }
- client = Client(URL['clients'],
- transport=self._transport, plugins=self._plugins)
- try:
- result = client.service.EstatusCuenta(**data)
- except Fault as e:
- log.error(str(e))
- return
- #~ print (result)
- return result.Estatus
-
-
-class Finkok(object):
-
- def __init__(self, auth={}):
- self.codes = URL['codes']
- self.error = ''
- self.message = ''
- self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
- self._plugins = None
- self._history = None
- self.uuid = ''
- self.fecha = None
- if DEBUG:
- self._history = HistoryPlugin()
- self._plugins = [self._history]
- self._auth = AUTH
- else:
- self._auth = auth
-
- def _debug(self):
- if not DEBUG:
- return
- print('SEND', self._history.last_sent)
- print('RESULT', self._history.last_received)
- return
-
- def _check_result(self, method, result):
- # ~ print ('CODE', result.CodEstatus)
- # ~ print ('INCIDENCIAS', result.Incidencias)
- self.message = ''
- MSG = {
- 'OK': 'Comprobante timbrado satisfactoriamente',
- '307': 'Comprobante timbrado previamente',
- }
- status = result.CodEstatus
- if status is None and result.Incidencias:
- for i in result.Incidencias['Incidencia']:
- self.error += 'Error: {}\n{}\n{}'.format(
- i['CodigoError'], i['MensajeIncidencia'], i['ExtraInfo'])
- return ''
-
- if method == 'timbra' and status in (MSG['OK'], MSG['307']):
- #~ print ('UUID', result.UUID)
- #~ print ('FECHA', result.Fecha)
- if status == MSG['307']:
- self.message = MSG['307']
- tree = parseString(result.xml)
- response = tree.toprettyxml(encoding='utf-8').decode('utf-8')
- self.uuid = result.UUID
- self.fecha = result.Fecha
-
- return response
-
- def _load_file(self, path):
- try:
- with open(path, 'rb') as f:
- data = f.read()
- except Exception as e:
- self.error = str(e)
- return
- return data
-
- def _validate_xml(self, file_xml):
- if os.path.isfile(file_xml):
- try:
- with open(file_xml, 'rb') as f:
- xml = f.read()
- except Exception as e:
- self.error = str(e)
- return False, ''
- else:
- xml = file_xml.encode('utf-8')
- return True, xml
-
- def _validate_uuid(self, uuid):
- try:
- UUID(uuid)
- return True
- except ValueError:
- self.error = 'UUID no válido: {}'.format(uuid)
- return False
-
- def timbra_xml(self, file_xml):
- self.error = ''
-
- if not DEBUG and not self._auth:
- self.error = 'Sin datos para timbrar'
- return
-
- method = 'timbra'
- ok, xml = self._validate_xml(file_xml)
- if not ok:
- return ''
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
-
- args = {
- 'username': self._auth['USER'],
- 'password': self._auth['PASS'],
- 'xml': xml,
- }
- if URL['quick_stamp']:
- try:
- result = client.service.quick_stamp(**args)
- except Fault as e:
- self.error = str(e)
- return
- else:
- try:
- result = client.service.stamp(**args)
- except Fault as e:
- self.error = str(e)
- return
- except TransportError as e:
- if '413' in str(e):
- self.error = '413
Documento muy grande para timbrar'
- else:
- self.error = str(e)
- return
- except ConnectionError as e:
- msg = '502 - Error de conexión'
- self.error = msg
- return
-
- return self._check_result(method, result)
-
- def _get_xml(self, uuid):
- if not self._validate_uuid(uuid):
- return ''
-
- method = 'util'
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
-
- args = {
- 'username': self._auth['USER'],
- 'password': self._auth['PASS'],
- 'uuid': uuid,
- 'taxpayer_id': self.rfc,
- 'invoice_type': 'I',
- }
- try:
- result = client.service.get_xml(**args)
- except Fault as e:
- self.error = str(e)
- return ''
- except TransportError as e:
- self.error = str(e)
- return ''
-
- if result.error:
- self.error = result.error
- return ''
-
- tree = parseString(result.xml)
- xml = tree.toprettyxml(encoding='utf-8').decode('utf-8')
- return xml
-
- def recupera_xml(self, file_xml='', uuid=''):
- self.error = ''
- if uuid:
- return self._get_xml(uuid)
-
- method = 'timbra'
- ok, xml = self._validate_xml(file_xml)
- if not ok:
- return ''
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
- try:
- result = client.service.stamped(
- xml, self._auth['user'], self._auth['pass'])
- except Fault as e:
- self.error = str(e)
- return ''
-
- return self._check_result(method, result)
-
- def estatus_xml(self, uuid):
- method = 'timbra'
- if not self._validate_uuid(uuid):
- return ''
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
- try:
- result = client.service.query_pending(
- self._auth['USER'], self._auth['PASS'], uuid)
- return result.status
- except Fault as e:
- self.error = str(e)
- return ''
-
- def cancel_xml(self, rfc, uuid, cer, key):
- # ~ for u in uuids:
- # ~ if not self._validate_uuid(u):
- # ~ return ''
-
- method = 'cancel'
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
- uuid_type = client.get_type('ns1:UUIDS')
- sa = client.get_type('ns0:stringArray')
-
- args = {
- 'UUIDS': uuid_type(uuids=sa(string=uuid)),
- 'username': self._auth['USER'],
- 'password': self._auth['PASS'],
- 'taxpayer_id': rfc,
- 'cer': cer,
- 'key': key,
- 'store_pending': False,
- }
- try:
- result = client.service.cancel(**args)
- except Fault as e:
- self.error = str(e)
- return ''
-
- if result.CodEstatus and self.codes['205'] in result.CodEstatus:
- self.error = result.CodEstatus
- return ''
-
- return result
-
- def cancel_signature(self, file_xml):
- method = 'cancel'
- if os.path.isfile(file_xml):
- root = etree.parse(file_xml).getroot()
- else:
- root = etree.fromstring(file_xml.encode())
-
- xml = etree.tostring(root)
-
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
-
- args = {
- 'username': self._auth['USER'],
- 'password': self._auth['PASS'],
- 'xml': xml,
- 'store_pending': False,
- }
-
- try:
- result = client.service.cancel_signature(**args)
- return result
- except Fault as e:
- self.error = str(e)
- return ''
-
- def get_acuse(self, rfc, uuids, type_acuse='C'):
- for u in uuids:
- if not self._validate_uuid(u):
- return ''
-
- method = 'cancel'
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
-
- args = {
- 'username': self._auth['USER'],
- 'password': self._auth['PASS'],
- 'taxpayer_id': rfc,
- 'uuid': '',
- 'type': type_acuse,
- }
- try:
- result = []
- for u in uuids:
- args['uuid'] = u
- r = client.service.get_receipt(**args)
- result.append(r)
- except Fault as e:
- self.error = str(e)
- return ''
-
- return result
-
- def estatus_cancel(self, uuids):
- for u in uuids:
- if not self._validate_uuid(u):
- return ''
-
- method = 'cancel'
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
-
- args = {
- 'username': self._auth['USER'],
- 'password': self._auth['PASS'],
- 'uuid': '',
- }
- try:
- result = []
- for u in uuids:
- args['uuid'] = u
- r = client.service.query_pending_cancellation(**args)
- result.append(r)
- except Fault as e:
- self.error = str(e)
- return ''
-
- return result
-
- def add_token(self, rfc, email):
- """Agrega un nuevo token al cliente para timbrado.
- Se requiere cuenta de reseller para usar este método
-
- Args:
- rfc (str): El RFC del cliente, ya debe existir
- email (str): El correo del cliente, funciona como USER al timbrar
-
- Returns:
- dict
- 'username': 'username',
- 'status': True or False
- 'name': 'name',
- 'success': True or False
- 'token': 'Token de timbrado',
- 'message': None
- """
- auth = AUTH['RESELLER']
-
- method = 'util'
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
- args = {
- 'username': auth['USER'],
- 'password': auth['PASS'],
- 'name': rfc,
- 'token_username': email,
- 'taxpayer_id': rfc,
- 'status': True,
- }
- try:
- result = client.service.add_token(**args)
- except Fault as e:
- self.error = str(e)
- return ''
-
- return result
-
- def get_date(self):
- method = 'util'
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
- try:
- result = client.service.datetime(AUTH['USER'], AUTH['PASS'])
- except Fault as e:
- self.error = str(e)
- return ''
-
- if result.error:
- self.error = result.error
- return
-
- return result.datetime
-
- def add_client(self, rfc, type_user=False):
- """Agrega un nuevo cliente para timbrado.
- Se requiere cuenta de reseller para usar este método
-
- Args:
- rfc (str): El RFC del nuevo cliente
-
- Kwargs:
- type_user (bool): False == 'P' == Prepago or True == 'O' == On demand
-
- Returns:
- dict
- 'message':
- 'Account Created successfully'
- 'Account Already exists'
- 'success': True or False
- """
- auth = AUTH['RESELLER']
-
- tu = {False: 'P', True: 'O'}
- method = 'client'
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
- args = {
- 'reseller_username': auth['USER'],
- 'reseller_password': auth['PASS'],
- 'taxpayer_id': rfc,
- 'type_user': tu[type_user],
- 'added': datetime.datetime.now().isoformat()[:19],
- }
- try:
- result = client.service.add(**args)
- except Fault as e:
- self.error = str(e)
- return ''
-
- return result
-
- def edit_client(self, rfc, status=True):
- """
- Se requiere cuenta de reseller para usar este método
- status = 'A' or 'S'
- """
- auth = AUTH['RESELLER']
-
- sv = {False: 'S', True: 'A'}
- method = 'client'
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
- args = {
- 'reseller_username': auth['USER'],
- 'reseller_password': auth['PASS'],
- 'taxpayer_id': rfc,
- 'status': sv[status],
- }
- try:
- result = client.service.edit(**args)
- except Fault as e:
- self.error = str(e)
- return ''
-
- return result
-
- def get_client(self, rfc):
- """Regresa el estatus del cliente
- .
- Se requiere cuenta de reseller para usar este método
-
- Args:
- rfc (str): El RFC del emisor
-
- Returns:
- dict
- 'message': None,
- 'users': {
- 'ResellerUser': [
- {
- 'status': 'A',
- 'counter': 0,
- 'taxpayer_id': '',
- 'credit': 0
- }
- ]
- } or None si no existe
- """
- auth = AUTH['RESELLER']
-
- method = 'client'
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
- args = {
- 'reseller_username': auth['USER'],
- 'reseller_password': auth['PASS'],
- 'taxpayer_id': rfc,
- }
-
- try:
- result = client.service.get(**args)
- except Fault as e:
- self.error = str(e)
- return ''
- except TransportError as e:
- self.error = str(e)
- return ''
-
- return result
-
- def assign_client(self, rfc, credit):
- """Agregar credito a un emisor
-
- Se requiere cuenta de reseller para usar este método
-
- Args:
- rfc (str): El RFC del emisor, debe existir
- credit (int): Cantidad de folios a agregar
-
- Returns:
- dict
- 'success': True or False,
- 'credit': nuevo credito despues de agregar or None
- 'message':
- 'Success, added {credit} of credit to {RFC}'
- 'RFC no encontrado'
- """
- auth = AUTH['RESELLER']
-
- method = 'client'
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
- args = {
- 'username': auth['USER'],
- 'password': auth['PASS'],
- 'taxpayer_id': rfc,
- 'credit': credit,
- }
- try:
- result = client.service.assign(**args)
- except Fault as e:
- self.error = str(e)
- return ''
-
- return result
-
- def client_get_timbres(self, rfc):
- method = 'client'
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
- args = {
- 'reseller_username': self._auth['USER'],
- 'reseller_password': self._auth['PASS'],
- 'taxpayer_id': rfc,
- }
-
- try:
- self.result = client.service.get(**args)
- except Fault as e:
- self.error = str(e)
- return 0
- except TransportError as e:
- self.error = str(e)
- return 0
- except ConnectionError:
- self.error = 'Verifica la conexión a internet'
- return 0
-
- success = bool(self.result.users)
- if not success:
- self.error = self.result.message or 'RFC no existe'
- return 0
-
- return self.result.users.ResellerUser[0].credit
-
-
-def _get_data_sat(path):
- BF = 'string(//*[local-name()="{}"]/@{})'
- NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'}
-
- try:
- if os.path.isfile(path):
- tree = etree.parse(path).getroot()
- else:
- tree = etree.fromstring(path.encode())
-
- data = {}
- emisor = escape(
- tree.xpath('string(//cfdi:Emisor/@rfc)', namespaces=NS_CFDI) or
- tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI)
- )
- receptor = escape(
- tree.xpath('string(//cfdi:Receptor/@rfc)', namespaces=NS_CFDI) or
- tree.xpath('string(//cfdi:Receptor/@Rfc)', namespaces=NS_CFDI)
- )
- data['total'] = tree.get('total') or tree.get('Total')
- data['emisor'] = emisor
- data['receptor'] = receptor
- data['uuid'] = tree.xpath(BF.format('TimbreFiscalDigital', 'UUID'))
- except Exception as e:
- print (e)
- return {}
-
- return '?re={emisor}&rr={receptor}&tt={total}&id={uuid}'.format(**data)
-
-
-def get_status_sat(xml):
- data = _get_data_sat(xml)
- if not data:
- return 'XML inválido'
-
- data = """
-
-
-
-
-
- {}
-
-
-
- """.format(data)
- headers = {
- 'SOAPAction': '"http://tempuri.org/IConsultaCFDIService/Consulta"',
- 'Content-type': 'text/xml; charset="UTF-8"'
- }
- URL = 'https://consultaqr.facturaelectronica.sat.gob.mx/consultacfdiservice.svc'
-
- try:
- result = requests.post(URL, data=data, headers=headers)
- tree = etree.fromstring(result.text)
- node = tree.xpath("//*[local-name() = 'Estado']")[0]
- except Exception as e:
- return 'Error: {}'.format(str(e))
-
- return node.text
-
-
-def main():
- return
-
-
-if __name__ == '__main__':
- main()
diff --git a/source/app/controllers/pacs/__init__.py b/source/app/controllers/pacs/__init__.py
index afe806b..05445b6 100644
--- a/source/app/controllers/pacs/__init__.py
+++ b/source/app/controllers/pacs/__init__.py
@@ -1,3 +1,4 @@
#!/usr/bin/env python
from .comerciodigital import PACComercioDigital
+from .finkok import PACFinkok
diff --git a/source/app/controllers/cfdi_cert.py b/source/app/controllers/pacs/cfdi_cert.py
similarity index 98%
rename from source/app/controllers/cfdi_cert.py
rename to source/app/controllers/pacs/cfdi_cert.py
index 295711b..a19fe7e 100644
--- a/source/app/controllers/cfdi_cert.py
+++ b/source/app/controllers/pacs/cfdi_cert.py
@@ -15,12 +15,7 @@ from cryptography.x509.oid import ExtensionOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
-
-try:
- from .conf import TOKEN
-except ImportError:
- TOKEN = ''
- print('Agrega el TOKEN al archivo conf.py, obligatorio en v1.41.0')
+from .conf import TOKEN
class SATCertificate(object):
diff --git a/source/app/controllers/pacs/conf.py.example b/source/app/controllers/pacs/conf.py.example
new file mode 100644
index 0000000..deb384c
--- /dev/null
+++ b/source/app/controllers/pacs/conf.py.example
@@ -0,0 +1,6 @@
+#!/usr/bin/env python3
+
+
+DEBUG = False
+
+TOKEN = ''
diff --git a/source/app/controllers/pacs/finkok/__init__.py b/source/app/controllers/pacs/finkok/__init__.py
new file mode 100644
index 0000000..1b61fc3
--- /dev/null
+++ b/source/app/controllers/pacs/finkok/__init__.py
@@ -0,0 +1,3 @@
+#!/usr/bin/env python3
+
+from .finkok import PACFinkok
diff --git a/source/app/controllers/pacs/finkok/conf.py.example b/source/app/controllers/pacs/finkok/conf.py.example
new file mode 100644
index 0000000..af7b74c
--- /dev/null
+++ b/source/app/controllers/pacs/finkok/conf.py.example
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+# ~
+# ~ PAC
+# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
+# ~
+# ~ This program is free software: you can redistribute it and/or modify
+# ~ it under the terms of the GNU General Public License as published by
+# ~ the Free Software Foundation, either version 3 of the License, or
+# ~ (at your option) any later version.
+# ~
+# ~ This program is distributed in the hope that it will be useful,
+# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
+# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# ~ GNU General Public License for more details.
+# ~
+# ~ You should have received a copy of the GNU General Public License
+# ~ along with this program. If not, see .
+
+
+# ~ Siempre consulta la documentación de PAC
+# ~ AUTH = Las credenciales de timbrado proporcionadas por el PAC
+# ~ NO cambies las credenciales de prueba
+
+
+DEBUG = True
+
+
+AUTH = {
+ 'user': '',
+ 'pass': '',
+ 'RESELLER': {
+ 'user': '',
+ 'pass': ''
+ }
+}
+
+
+if DEBUG:
+ AUTH = {
+ 'user': 'pruebas-finkok@correolibre.net',
+ 'pass': '5c9a88da105bff9a8c430cb713f6d35269f51674bdc5963c1501b7316366',
+ 'RESELLER': {
+ 'user': '',
+ 'pass': ''
+ }
+ }
diff --git a/source/app/controllers/pacs/finkok/finkok.py b/source/app/controllers/pacs/finkok/finkok.py
new file mode 100644
index 0000000..8c7e16b
--- /dev/null
+++ b/source/app/controllers/pacs/finkok/finkok.py
@@ -0,0 +1,549 @@
+#!/usr/bin/env python
+# ~
+# ~ PAC
+# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
+# ~
+# ~ This program is free software: you can redistribute it and/or modify
+# ~ it under the terms of the GNU General Public License as published by
+# ~ the Free Software Foundation, either version 3 of the License, or
+# ~ (at your option) any later version.
+# ~
+# ~ This program is distributed in the hope that it will be useful,
+# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
+# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# ~ GNU General Public License for more details.
+# ~
+# ~ You should have received a copy of the GNU General Public License
+# ~ along with this program. If not, see .
+
+import base64
+import datetime
+import logging
+import os
+import re
+from io import BytesIO
+from xml.sax.saxutils import unescape
+
+import lxml.etree as ET
+from zeep import Client
+from zeep.plugins import Plugin
+from zeep.cache import SqliteCache
+from zeep.transports import Transport
+from zeep.exceptions import Fault, TransportError
+from requests.exceptions import ConnectionError
+
+from .conf import DEBUG, AUTH
+
+
+LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
+LOG_DATE = '%d/%m/%Y %H:%M:%S'
+logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
+logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
+logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
+logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
+log = logging.getLogger(__name__)
+logging.getLogger('requests').setLevel(logging.ERROR)
+logging.getLogger('zeep').setLevel(logging.ERROR)
+
+
+TIMEOUT = 10
+DEBUG_SOAP = True
+
+
+class DebugPlugin(Plugin):
+
+ def _to_string(self, envelope, name):
+ if DEBUG_SOAP:
+ data = ET.tostring(envelope, pretty_print=True, encoding='utf-8').decode()
+ path = f'/tmp/soap_{name}.xml'
+ with open(path, 'w') as f:
+ f.write(data)
+ return
+
+ def egress(self, envelope, http_headers, operation, binding_options):
+ self._to_string(envelope, 'request')
+ return envelope, http_headers
+
+ def ingress(self, envelope, http_headers, operation):
+ self._to_string(envelope, 'response')
+ return envelope, http_headers
+
+
+class PACFinkok(object):
+ WS = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl'
+ if DEBUG:
+ WS = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl'
+ URL = {
+ 'quick_stamp': False,
+ 'timbra': WS.format('stamp'),
+ 'cancel': WS.format('cancel'),
+ 'client': WS.format('registration'),
+ 'util': WS.format('utilities'),
+ }
+ CODE = {
+ '200': 'Comprobante timbrado satisfactoriamente',
+ '205': 'No Encontrado',
+ '307': 'Comprobante timbrado previamente',
+ '702': 'No se encontro el RFC del emisor',
+ 'IP': 'Invalid Passphrase',
+ 'IPMSG': 'Frase de paso inválida',
+ 'NE': 'No Encontrado',
+ }
+
+ def __init__(self):
+ self._error = ''
+ self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
+ self._plugins = [DebugPlugin()]
+
+ @property
+ def error(self):
+ return self._error
+
+ def _validate_result(self, result):
+ if hasattr(result, 'CodEstatus'):
+ ce = result.CodEstatus
+ if ce is None:
+ return result
+
+ if ce == self.CODE['IP']:
+ self._error = self.CODE['IPMSG']
+ return {}
+
+ if self.CODE['NE'] in ce:
+ self._error = 'UUID ' + self.CODE['NE']
+ return {}
+
+ if self.CODE['200'] != ce:
+ log.error('CodEstatus', type(ce), ce)
+ return result
+
+ if hasattr(result, 'Incidencias'):
+ fault = result.Incidencias.Incidencia[0]
+ cod_error = fault.CodigoError.encode('utf-8')
+ msg_error = fault.MensajeIncidencia.encode('utf-8')
+ error = 'Error: {}\n{}'.format(cod_error, msg_error)
+ self._error = self.CODE.get(cod_error, error)
+ return {}
+
+ return result
+
+ def _get_result(self, client, method, args):
+ self._error = ''
+ try:
+ result = getattr(client.service, method)(**args)
+ except Fault as e:
+ self._error = str(e)
+ return {}
+ except TransportError as e:
+ if '413' in str(e):
+ self._error = '413
Documento muy grande para timbrar'
+ else:
+ self._error = str(e)
+ return {}
+ except ConnectionError as e:
+ msg = '502 - Error de conexión'
+ self._error = msg
+ return {}
+
+ return self._validate_result(result)
+
+ def _to_string(self, data):
+ root = ET.parse(BytesIO(data.encode('utf-8'))).getroot()
+ xml = ET.tostring(root,
+ pretty_print=True, xml_declaration=True, encoding='utf-8')
+ return xml.decode('utf-8')
+
+ def stamp(self, cfdi, auth={}):
+ if DEBUG or not auth:
+ auth = AUTH
+
+ method = 'timbra'
+ client = Client(self.URL[method],
+ transport=self._transport, plugins=self._plugins)
+ args = {
+ 'username': auth['user'],
+ 'password': auth['pass'],
+ 'xml': cfdi.encode('utf-8'),
+ }
+ result = self._get_result(client, 'stamp', args)
+ if self.error:
+ log.error(self.error)
+ return ''
+
+ data = {
+ 'xml': self._to_string(result.xml),
+ 'uuid': result.UUID,
+ 'date': result.Fecha,
+ }
+ return data
+
+ def _get_data_cancel(self, cfdi):
+ NS_CFDI = {
+ 'cfdi': 'http://www.sat.gob.mx/cfd/3',
+ 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
+ }
+ tree = ET.fromstring(cfdi.encode())
+ rfc_emisor = tree.xpath(
+ 'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)',
+ namespaces=NS_CFDI)
+ cfdi_uuid = tree.xpath(
+ 'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)',
+ namespaces=NS_CFDI)
+ return rfc_emisor, cfdi_uuid
+
+ def cancel(self, cfdi, info, auth={}):
+ if not auth:
+ auth = AUTH
+
+ rfc_emisor, cfdi_uuid = self._get_data_cancel(cfdi)
+ method = 'cancel'
+ client = Client(self.URL[method],
+ transport=self._transport, plugins=self._plugins)
+ uuid_type = client.get_type('ns1:UUIDS')
+ sa = client.get_type('ns0:stringArray')
+
+ args = {
+ 'UUIDS': uuid_type(uuids=sa(string=cfdi_uuid)),
+ 'username': auth['user'],
+ 'password': auth['pass'],
+ 'taxpayer_id': rfc_emisor,
+ 'cer': info['cer'],
+ 'key': info['key'],
+ 'store_pending': False,
+ }
+
+ result = self._get_result(client, 'cancel', args)
+ if self.error:
+ log.error(self.error)
+ return ''
+
+ folio = result['Folios']['Folio'][0]
+ status = folio['EstatusUUID']
+ if status != '201':
+ log.debug(f'Cancel status: {status} - {cfdi_uuid}')
+
+ data = {
+ 'acuse': result['Acuse'],
+ 'date': result['Fecha'],
+ }
+ return data
+
+ def cancel_xml(self, xml, auth={}):
+ if not auth:
+ auth = AUTH
+
+ method = 'cancel'
+ client = Client(self.URL[method],
+ transport=self._transport, plugins=self._plugins)
+ client.set_ns_prefix('can', 'http://facturacion.finkok.com/cancel')
+ # ~ xml = f'\n{xml}'
+ # ~ xml = f'\n{xml}'
+ args = {
+ 'xml': xml.encode(),
+ 'username': auth['user'],
+ 'password': auth['pass'],
+ 'store_pending': False,
+ }
+ result = self._get_result(client, 'cancel_signature', args)
+ if self.error:
+ log.error(self.error)
+ return ''
+
+ folio = result['Folios']['Folio'][0]
+ status = folio['EstatusUUID']
+ if status != '201':
+ log.debug(f'Cancel status: {status} -')
+
+ data = {
+ 'acuse': result['Acuse'],
+ 'date': result['Fecha'],
+ }
+ return data
+
+ def client_add(self, rfc, type_user=False):
+ """Agrega un nuevo cliente para timbrado.
+ Se requiere cuenta de reseller para usar este método
+
+ Args:
+ rfc (str): El RFC del nuevo cliente
+
+ Kwargs:
+ type_user (bool):
+ False == 'P' == Prepago
+ True == 'O' == On demand
+
+ Returns:
+ True or False
+
+ origin PAC
+ 'message':
+ 'Account Created successfully'
+ 'Account Already exists'
+ 'success': True or False
+ """
+ auth = AUTH['RESELLER']
+ tu = {True: 'O', False: 'P'}
+
+ method = 'client'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+
+ args = {
+ 'reseller_username': auth['user'],
+ 'reseller_password': auth['pass'],
+ 'taxpayer_id': rfc,
+ 'type_user': tu[type_user],
+ 'added': datetime.datetime.now().isoformat()[:19],
+ }
+
+ result = self._get_result(client, 'add', args)
+ if self.error:
+ return False
+
+ if not result.success:
+ self.error = result.message
+ return False
+
+ # ~ PAC success debería ser False
+ msg = 'Account Already exists'
+ if result.message == msg:
+ self.error = msg
+ return True
+
+ return result.success
+
+ def client_get_token(self, rfc, email):
+ """Genera un nuevo token al cliente para timbrado.
+ Se requiere cuenta de reseller para usar este método
+
+ Args:
+ rfc (str): El RFC del cliente, ya debe existir
+ email (str): El correo del cliente, funciona como USER al timbrar
+
+ Returns:
+ token (str): Es la contraseña para timbrar
+
+ origin PAC
+ dict
+ 'username': 'username',
+ 'status': True or False
+ 'name': 'name',
+ 'success': True or False
+ 'token': 'Token de timbrado',
+ 'message': None
+ """
+ auth = AUTH['RESELLER']
+ method = 'util'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'username': auth['user'],
+ 'password': auth['pass'],
+ 'name': rfc,
+ 'token_username': email,
+ 'taxpayer_id': rfc,
+ 'status': True,
+ }
+
+ result = self._get_result(client, 'add_token', args)
+ if self.error:
+ log.error(self.error)
+ return ''
+
+ if not result.success:
+ self.error = result.message
+ log.error(self.error)
+ return ''
+
+ return result.token
+
+ def client_add_timbres(self, rfc, credit):
+ """Agregar credito a un emisor
+
+ Se requiere cuenta de reseller
+
+ Args:
+ rfc (str): El RFC del emisor, debe existir
+ credit (int): Cantidad de folios a agregar
+
+ Returns:
+ dict
+ 'success': True or False,
+ 'credit': nuevo credito despues de agregar or None
+ 'message':
+ 'Success, added {credit} of credit to {RFC}.'
+ 'RFC no encontrado'
+ """
+ auth = AUTH['RESELLER']
+
+ method = 'client'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'username': auth['user'],
+ 'password': auth['pass'],
+ 'taxpayer_id': rfc,
+ 'credit': credit,
+ }
+
+ result = self._get_result(client, 'assign', args)
+ if self.error:
+ log.error(error)
+ return ''
+
+ if not result.success:
+ self.error = result.message
+ return 0
+
+ return result.credit
+
+ def client_balance(self, auth={}, rfc=''):
+ """Regresa los timbres restantes del cliente
+ Se pueden usar las credenciales de relleser o las credenciales del emisor
+
+ Args:
+ rfc (str): El RFC del emisor
+
+ Kwargs:
+ auth (dict): Credenciales del emisor
+
+ Returns:
+ int Cantidad de timbres restantes
+ """
+
+ if not auth:
+ auth = AUTH['RESELLER']
+
+ method = 'client'
+ client = Client(self.URL[method],
+ transport=self._transport, plugins=self._plugins)
+ args = {
+ 'reseller_username': auth['user'],
+ 'reseller_password': auth['pass'],
+ 'taxpayer_id': rfc,
+ }
+
+ result = self._get_result(client, 'get', args)
+ if self.error:
+ log.error(self.error)
+ return ''
+
+ success = bool(result.users)
+ if not success:
+ self.error = result.message or 'RFC no existe'
+ return 0
+
+ return result.users.ResellerUser[0].credit
+
+ def client_set_status(self, rfc, status):
+ """Edita el estatus (Activo o Suspendido) de un cliente
+ Se requiere cuenta de reseller para usar este método
+
+ Args:
+ rfc (str): El RFC del cliente
+
+ Kwargs:
+ status (bool):
+ True == 'A' == Activo
+ False == 'S' == Suspendido
+
+ Returns:
+ dict
+ 'message':
+ 'Account Created successfully'
+ 'Account Already exists'
+ 'success': True or False
+ """
+ auth = AUTH['RESELLER']
+ ts = {True: 'A', False: 'S'}
+ method = 'client'
+ client = Client(self.URL[method],
+ transport=self._transport, plugins=self._plugins)
+
+ args = {
+ 'reseller_username': auth['user'],
+ 'reseller_password': auth['pass'],
+ 'taxpayer_id': rfc,
+ 'status': ts[status],
+ }
+ result = self._get_result(client, 'edit', args)
+
+ if self.error:
+ return False
+
+ if not result.success:
+ self.error = result.message
+ return False
+
+ return True
+
+ def client_switch(self, rfc, type_user):
+ """Edita el tipo de timbrado (OnDemand o Prepago) de un cliente
+ Se requiere cuenta de reseller para usar este método
+
+ Args:
+ rfc (str): El RFC del cliente
+
+ Kwargs:
+ status (bool):
+ True == 'O' == OnDemand
+ False == 'P' == Prepago
+
+ Returns:
+ dict
+ 'message':
+ 'Account Created successfully'
+ 'Account Already exists'
+ 'success': True or False
+ """
+ auth = AUTH['RESELLER']
+ tu = {True: 'O', False: 'P'}
+ method = 'client'
+ client = Client(self.URL[method],
+ transport=self._transport, plugins=self._plugins)
+
+ args = {
+ 'username': auth['user'],
+ 'password': auth['pass'],
+ 'taxpayer_id': rfc,
+ 'type_user': tu[type_user],
+ }
+ result = self._get_result(client, 'switch', args)
+
+ if self.error:
+ return False
+
+ if not result.success:
+ self.error = result.message
+ return False
+
+ return True
+
+ def client_report_folios(self, rfc, date_from, date_to, invoice_type='I'):
+ """Obtiene un reporte del total de facturas timbradas
+ """
+ auth = AUTH['RESELLER']
+
+ args = {
+ 'username': auth['user'],
+ 'password': auth['pass'],
+ 'taxpayer_id': rfc,
+ 'date_from': date_from,
+ 'date_to': date_to,
+ 'invoice_type': invoice_type,
+ }
+
+ method = 'util'
+ client = Client(self.URL[method],
+ transport=self._transport, plugins=self._plugins)
+
+ result = self._get_result(client, 'report_total', args)
+
+ if result.result is None:
+ # ~ PAC - Debería regresar RFC inexistente o sin registros
+ self.error = 'RFC no existe o no tiene registros'
+ return 0
+
+ total = result.result.ReportTotal[0].total
+
+ return total
diff --git a/source/app/controllers/util.py b/source/app/controllers/util.py
index 9b6dd25..af95d32 100644
--- a/source/app/controllers/util.py
+++ b/source/app/controllers/util.py
@@ -73,7 +73,7 @@ from settings import USAR_TOKEN, API, DECIMALES_TAX
# ~ v2
-from .cfdi_cert import SATCertificate
+from .pacs.cfdi_cert import SATCertificate
from settings import (
EXT,
diff --git a/source/app/controllers/utils.py b/source/app/controllers/utils.py
index 274635c..76275ff 100644
--- a/source/app/controllers/utils.py
+++ b/source/app/controllers/utils.py
@@ -52,11 +52,9 @@ from .cfdi_xml import CFDI
from settings import DEBUG, DB_COMPANIES, PATHS, TEMPLATE_CANCEL
-from .cfdi_cert import SATCertificate
+from .pacs.cfdi_cert import SATCertificate
from .pacs import PACComercioDigital
-# ~ from .pacs import PACFinkok
-from .pac import Finkok as PACFinkok
-# ~ from .finkok import PACFinkok
+from .pacs import PACFinkok
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
diff --git a/source/app/models/main.py b/source/app/models/main.py
index 0d2dcb1..255a754 100644
--- a/source/app/models/main.py
+++ b/source/app/models/main.py
@@ -3917,24 +3917,24 @@ class Facturas(BaseModel):
query.execute()
return
- def _cancel_signature(self, id):
- msg = 'Factura cancelada correctamente'
- auth = Emisor.get_auth()
- certificado = Certificado.select()[0]
- obj = Facturas.get(Facturas.id==id)
- data, result = util.cancel_signature(
- obj.uuid, certificado.p12, certificado.rfc, auth)
- if data['ok']:
- obj.estatus = 'Cancelada'
- obj.error = ''
- obj.cancelada = True
- obj.fecha_cancelacion = result['Fecha']
- obj.acuse = result['Acuse']
- self._actualizar_saldo_cliente(self, obj, True)
- else:
- obj.error = data['msg']
- obj.save()
- return data
+ # ~ def _cancel_signature(self, id):
+ # ~ msg = 'Factura cancelada correctamente'
+ # ~ auth = Emisor.get_auth()
+ # ~ certificado = Certificado.select()[0]
+ # ~ obj = Facturas.get(Facturas.id==id)
+ # ~ data, result = util.cancel_signature(
+ # ~ obj.uuid, certificado.p12, certificado.rfc, auth)
+ # ~ if data['ok']:
+ # ~ obj.estatus = 'Cancelada'
+ # ~ obj.error = ''
+ # ~ obj.cancelada = True
+ # ~ obj.fecha_cancelacion = result['Fecha']
+ # ~ obj.acuse = result['Acuse']
+ # ~ self._actualizar_saldo_cliente(self, obj, True)
+ # ~ else:
+ # ~ obj.error = data['msg']
+ # ~ obj.save()
+ # ~ return data
def _get_filters(self, values):
if 'start' in values and 'end' in values: