diff --git a/source/comerciodigital/comercio.py b/source/comerciodigital/comercio.py
index bcca148..04239b1 100644
--- a/source/comerciodigital/comercio.py
+++ b/source/comerciodigital/comercio.py
@@ -78,8 +78,6 @@ class PACComercioDigital(object):
def __init__(self):
self.error = ''
- # ~ self.cfdi_uuid = ''
- # ~ self.date_stamped = ''
def _error(self, msg):
self.error = str(msg)
diff --git a/source/finkok/__init__.py b/source/finkok/__init__.py
new file mode 100644
index 0000000..1b61fc3
--- /dev/null
+++ b/source/finkok/__init__.py
@@ -0,0 +1,3 @@
+#!/usr/bin/env python3
+
+from .finkok import PACFinkok
diff --git a/source/finkok/conf.py.example b/source/finkok/conf.py.example
new file mode 100644
index 0000000..8394472
--- /dev/null
+++ b/source/finkok/conf.py.example
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+# ~
+# ~ PAC
+# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
+# ~
+# ~ This program is free software: you can redistribute it and/or modify
+# ~ it under the terms of the GNU General Public License as published by
+# ~ the Free Software Foundation, either version 3 of the License, or
+# ~ (at your option) any later version.
+# ~
+# ~ This program is distributed in the hope that it will be useful,
+# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
+# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# ~ GNU General Public License for more details.
+# ~
+# ~ You should have received a copy of the GNU General Public License
+# ~ along with this program. If not, see .
+
+
+# ~ Siempre consulta la documentación de PAC
+# ~ AUTH = Las credenciales de timbrado proporcionadas por el PAC
+# ~ NO cambies las credenciales de prueba
+
+
+DEBUG = True
+
+
+AUTH = {
+ 'user': '',
+ 'pass': '',
+}
+
+
+if DEBUG:
+ AUTH = {
+ 'user': 'pruebas-finkok@correolibre.net',
+ 'pass': '5c9a88da105bff9a8c430cb713f6d35269f51674bdc5963c1501b7316366',
+ }
diff --git a/source/finkok/finkok.py b/source/finkok/finkok.py
new file mode 100644
index 0000000..9e8f578
--- /dev/null
+++ b/source/finkok/finkok.py
@@ -0,0 +1,169 @@
+#!/usr/bin/env python
+# ~
+# ~ PAC
+# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
+# ~
+# ~ This program is free software: you can redistribute it and/or modify
+# ~ it under the terms of the GNU General Public License as published by
+# ~ the Free Software Foundation, either version 3 of the License, or
+# ~ (at your option) any later version.
+# ~
+# ~ This program is distributed in the hope that it will be useful,
+# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
+# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# ~ GNU General Public License for more details.
+# ~
+# ~ You should have received a copy of the GNU General Public License
+# ~ along with this program. If not, see .
+
+
+import datetime
+import logging
+import os
+import re
+from io import BytesIO
+from xml.sax.saxutils import unescape
+
+import lxml.etree as ET
+from zeep import Client
+from zeep.plugins import Plugin
+from zeep.cache import SqliteCache
+from zeep.transports import Transport
+from zeep.exceptions import Fault, TransportError
+from requests.exceptions import ConnectionError
+
+from .conf import DEBUG, AUTH
+
+
+LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
+LOG_DATE = '%d/%m/%Y %H:%M:%S'
+logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
+logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
+logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
+logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
+log = logging.getLogger(__name__)
+logging.getLogger('requests').setLevel(logging.ERROR)
+
+
+TIMEOUT = 10
+DEBUG_SOAP = False
+
+
+class DebugPlugin(Plugin):
+
+ def _to_string(self, envelope, name):
+ if DEBUG_SOAP:
+ data = ET.tostring(envelope, pretty_print=True, encoding='utf-8').decode()
+ path = f'/tmp/soap_{name}.xml'
+ with open(path, 'w') as f:
+ f.write(data)
+ return
+
+ def egress(self, envelope, http_headers, operation, binding_options):
+ self._to_string(envelope, 'request')
+ return envelope, http_headers
+
+ def ingress(self, envelope, http_headers, operation):
+ self._to_string(envelope, 'response')
+ return envelope, http_headers
+
+
+class PACFinkok(object):
+ WS = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl'
+ if DEBUG:
+ WS = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl'
+ URL = {
+ 'quick_stamp': False,
+ 'timbra': WS.format('stamp'),
+ 'cancel': WS.format('cancel'),
+ 'client': WS.format('registration'),
+ 'util': WS.format('utilities'),
+ }
+ CODE = {
+ '200': 'Comprobante timbrado satisfactoriamente',
+ '205': 'No Encontrado',
+ '307': 'Comprobante timbrado previamente',
+ '702': 'No se encontro el RFC del emisor',
+ 'IP': 'Invalid Passphrase',
+ 'IPMSG': 'Frase de paso inválida',
+ 'NE': 'No Encontrado',
+ }
+
+ def __init__(self):
+ self._error = ''
+ self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
+ self._plugins = [DebugPlugin()]
+
+ def _validate_result(self, result):
+ if hasattr(result, 'CodEstatus'):
+ ce = result.CodEstatus
+ if ce == self.CODE['IP']:
+ self._error = self.CODE['IPMSG']
+ return {}
+
+ if self.CODE['NE'] in ce:
+ self._error = 'UUID ' + self.CODE['NE']
+ return {}
+
+ if self.CODE['200'] != ce:
+ log.error('CodEstatus', type(ce), ce)
+ return result
+
+ if hasattr(result, 'Incidencias'):
+ fault = result.Incidencias.Incidencia[0]
+ cod_error = fault.CodigoError.encode('utf-8')
+ msg_error = fault.MensajeIncidencia.encode('utf-8')
+ error = 'Error: {}\n{}'.format(cod_error, msg_error)
+ self._error = self.CODE.get(cod_error, error)
+ return {}
+
+ return result
+
+ def _get_result(self, client, method, args):
+ self._error = ''
+ try:
+ result = getattr(client.service, method)(**args)
+ except Fault as e:
+ self._error = str(e)
+ return {}
+ except TransportError as e:
+ if '413' in str(e):
+ self._error = '413
Documento muy grande para timbrar'
+ else:
+ self._error = str(e)
+ return {}
+ except ConnectionError as e:
+ msg = '502 - Error de conexión'
+ self._error = msg
+ return {}
+
+ return self._validate_result(result)
+
+ def _to_string(self, data):
+ root = ET.parse(BytesIO(data.encode('utf-8'))).getroot()
+ xml = ET.tostring(root,
+ pretty_print=True, xml_declaration=True, encoding='utf-8')
+ return xml.decode('utf-8')
+
+ def stamp(self, cfdi, auth={}):
+ if DEBUG or not auth:
+ auth = AUTH
+
+ method = 'timbra'
+ client = Client(self.URL[method],
+ transport=self._transport, plugins=self._plugins)
+ args = {
+ 'username': auth['user'],
+ 'password': auth['pass'],
+ 'xml': cfdi,
+ }
+ result = self._get_result(client, 'stamp', args)
+ if self.error:
+ return ''
+
+ data = {
+ 'xml': self._to_string(result.xml),
+ 'uuid': result.UUID,
+ 'date': result.Fecha,
+ }
+ return data
diff --git a/source/finkok/finkok1.py b/source/finkok/finkok1.py
new file mode 100644
index 0000000..dda9195
--- /dev/null
+++ b/source/finkok/finkok1.py
@@ -0,0 +1,729 @@
+#!/usr/bin/env python
+# ~
+# ~ PAC
+# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
+# ~
+# ~ This program is free software: you can redistribute it and/or modify
+# ~ it under the terms of the GNU General Public License as published by
+# ~ the Free Software Foundation, either version 3 of the License, or
+# ~ (at your option) any later version.
+# ~
+# ~ This program is distributed in the hope that it will be useful,
+# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
+# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# ~ GNU General Public License for more details.
+# ~
+# ~ You should have received a copy of the GNU General Public License
+# ~ along with this program. If not, see .
+
+
+import datetime
+import os
+import re
+from io import BytesIO
+from xml.sax.saxutils import unescape
+
+import lxml.etree as ET
+from zeep import Client
+from zeep.plugins import Plugin
+from zeep.cache import SqliteCache
+from zeep.transports import Transport
+from zeep.exceptions import Fault, TransportError
+from requests.exceptions import ConnectionError
+
+from conf import DEBUG, FINKOK
+
+
+TIMEOUT = 10
+DEBUG_SOAP = False
+
+
+class DebugPlugin(Plugin):
+
+ def _to_string(self, envelope, name):
+ if DEBUG_SOAP:
+ data = ET.tostring(envelope, pretty_print=True, encoding='utf-8').decode()
+ path = f'/tmp/soap_{name}.xml'
+ with open(path, 'w') as f:
+ f.write(data)
+ return
+
+ def egress(self, envelope, http_headers, operation, binding_options):
+ self._to_string(envelope, 'request')
+ return envelope, http_headers
+
+ def ingress(self, envelope, http_headers, operation):
+ self._to_string(envelope, 'response')
+ return envelope, http_headers
+
+
+class PACFinkok(object):
+ URL = {
+ 'quick_stamp': False,
+ 'timbra': FINKOK['WS'].format('stamp'),
+ 'cancel': FINKOK['WS'].format('cancel'),
+ 'client': FINKOK['WS'].format('registration'),
+ 'util': FINKOK['WS'].format('utilities'),
+ }
+ CODE = {
+ '200': 'Comprobante timbrado satisfactoriamente',
+ '205': 'No Encontrado',
+ '307': 'Comprobante timbrado previamente',
+ '702': 'No se encontro el RFC del emisor',
+ 'IP': 'Invalid Passphrase',
+ 'IPMSG': 'Frase de paso inválida',
+ 'NE': 'No Encontrado',
+ }
+
+ def __init__(self):
+ self._error = ''
+ self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
+ self._plugins = [DebugPlugin()]
+
+ def _validate_result(self, result):
+ if hasattr(result, 'CodEstatus'):
+ ce = result.CodEstatus
+ if ce == self.CODE['IP']:
+ self.error = self.CODE['IPMSG']
+ return {}
+
+ if self.CODE['NE'] in ce:
+ self.error = 'UUID ' + self.CODE['NE']
+ return {}
+
+ if self.CODE['200'] != ce:
+ print('CodEstatus', type(ce), ce)
+ return result
+
+ if hasattr(result, 'Incidencias'):
+ fault = result.Incidencias.Incidencia[0]
+ cod_error = fault.CodigoError.encode('utf-8')
+ msg_error = fault.MensajeIncidencia.encode('utf-8')
+ error = 'Error: {}\n{}'.format(cod_error, msg_error)
+ self.error = self.CODE.get(cod_error, error)
+ return {}
+
+ return result
+
+ def _get_result(self, client, method, args):
+ self.error = ''
+ try:
+ result = getattr(client.service, method)(**args)
+ except Fault as e:
+ self.error = str(e)
+ return {}
+ except TransportError as e:
+ if '413' in str(e):
+ self.error = '413
Documento muy grande para timbrar'
+ else:
+ self.error = str(e)
+ return {}
+ except ConnectionError as e:
+ msg = '502 - Error de conexión'
+ self.error = msg
+ return {}
+
+ return self._validate_result(result)
+
+ def cfdi_stamp(self, cfdi, auth={}):
+ if not auth:
+ auth = FINKOK['AUTH']
+
+ method = 'timbra'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'username': auth['USER'],
+ 'password': auth['PASS'],
+ 'xml': cfdi,
+ }
+
+ result = self._get_result(client, 'stamp', args)
+ if self.error:
+ return {}
+
+ data = {
+ 'xml': self._to_string(result.xml),
+ 'uuid': result.UUID,
+ 'fecha': result.Fecha,
+ }
+ return data
+
+ def cfdi_cancel(self, rfc, uuid, cer, key, auth={}):
+ if not auth:
+ auth = FINKOK['AUTH']
+
+ method = 'cancel'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+ uuid_type = client.get_type('ns1:UUIDS')
+ sa = client.get_type('ns0:stringArray')
+
+ args = {
+ 'UUIDS': uuid_type(uuids=sa(string=uuid)),
+ 'username': auth['USER'],
+ 'password': auth['PASS'],
+ 'taxpayer_id': rfc,
+ 'cer': cer,
+ 'key': key,
+ 'store_pending': False,
+ }
+
+ result = self._get_result(client, 'cancel', args)
+ if self.error:
+ return {}
+
+ return result
+
+ def cfdi_status(self, uuid, auth={}):
+ if not auth:
+ auth = FINKOK['AUTH']
+
+ method = 'timbra'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'username': auth['USER'],
+ 'password': auth['PASS'],
+ 'uuid': uuid,
+ }
+
+ result = self._get_result(client, 'query_pending', args)
+ if self.error:
+ return {}
+
+ STATUS = {
+ 'C': 'Cancelado',
+ 'S': 'Timbrado, aún no eviado al SAT',
+ 'F': 'Timbrado y enviado al SAT',
+ }
+
+ data = {
+ 'estatus': STATUS[result.status],
+ 'xml': self._to_string(unescape(result.xml)),
+ 'fecha': result.date,
+ }
+
+ return data
+
+ def client_add(self, rfc, type_user=False):
+ """Agrega un nuevo cliente para timbrado.
+ Se requiere cuenta de reseller para usar este método
+
+ Args:
+ rfc (str): El RFC del nuevo cliente
+
+ Kwargs:
+ type_user (bool): False == 'P' == Prepago or True == 'O' == On demand
+
+ Returns:
+ True or False
+
+ origin PAC
+ 'message':
+ 'Account Created successfully'
+ 'Account Already exists'
+ 'success': True or False
+ """
+ auth = FINKOK['RESELLER']
+ tu = {True: 'O', False: 'P'}
+
+ method = 'client'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+
+ args = {
+ 'reseller_username': auth['USER'],
+ 'reseller_password': auth['PASS'],
+ 'taxpayer_id': rfc,
+ 'type_user': tu[type_user],
+ 'added': datetime.datetime.now().isoformat()[:19],
+ }
+
+ result = self._get_result(client, 'add', args)
+ if self.error:
+ return False
+
+ if not result.success:
+ self.error = result.message
+ return False
+
+ # ~ PAC success debería ser False
+ msg = 'Account Already exists'
+ if result.message == msg:
+ self.error = msg
+ return True
+
+ return result.success
+
+ def client_add_token(self, rfc, email):
+ """Agrega un nuevo token al cliente para timbrado.
+ Se requiere cuenta de reseller para usar este método
+
+ Args:
+ rfc (str): El RFC del cliente, ya debe existir
+ email (str): El correo del cliente, funciona como USER al timbrar
+
+ Returns:
+ token (str): Es la contraseña para timbrar
+
+ origin PAC
+ dict
+ 'username': 'username',
+ 'status': True or False
+ 'name': 'name',
+ 'success': True or False
+ 'token': 'Token de timbrado',
+ 'message': None
+ """
+ auth = FINKOK['RESELLER']
+ method = 'util'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'username': auth['USER'],
+ 'password': auth['PASS'],
+ 'name': rfc,
+ 'token_username': email,
+ 'taxpayer_id': rfc,
+ 'status': True,
+ }
+
+ result = self._get_result(client, 'add_token', args)
+ if self.error:
+ return ''
+
+ if not result.success:
+ self.error = result.message
+ return ''
+
+ return result.token
+
+ # ~ Send issue to PAC
+ def client_reset_token(self, email):
+ auth = FINKOK['RESELLER']
+ method = 'util'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'username': auth['USER'],
+ 'password': auth['PASS'],
+ 'token': email,
+ }
+
+ result = self._get_result(client, 'reset_token', args)
+ if self.error:
+ return ''
+
+ if not result.success:
+ self.error = result.message
+ return ''
+
+ return result.token
+
+ def client_add_timbres(self, rfc, credit):
+ """Agregar credito a un emisor
+
+ Se requiere cuenta de reseller
+
+ Args:
+ rfc (str): El RFC del emisor, debe existir
+ credit (int): Cantidad de folios a agregar
+
+ Returns:
+ dict
+ 'success': True or False,
+ 'credit': nuevo credito despues de agregar or None
+ 'message':
+ 'Success, added {credit} of credit to {RFC}.'
+ 'RFC no encontrado'
+ """
+ auth = FINKOK['RESELLER']
+
+ if not isinstance(credit, int):
+ self.error = 'El credito debe ser un entero'
+ return 0
+
+ method = 'client'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'username': auth['USER'],
+ 'password': auth['PASS'],
+ 'taxpayer_id': rfc,
+ 'credit': credit,
+ }
+
+ result = self._get_result(client, 'assign', args)
+ if self.error:
+ return ''
+
+ if not result.success:
+ self.error = result.message
+ return 0
+
+ return result.credit
+
+ def client_edit(self, rfc, status=True):
+ """Edita el estatus (Activo o Suspendido) de un cliente
+ Se requiere cuenta de reseller para usar este método
+
+ Args:
+ rfc (str): El RFC del cliente
+
+ Kwargs:
+ status (bool): True == 'A' == Activo or False == 'S' == Suspendido
+
+ Returns:
+ dict
+ 'message':
+ 'Account Created successfully'
+ 'Account Already exists'
+ 'success': True or False
+ """
+ auth = FINKOK['RESELLER']
+ ts = {True: 'A', False: 'S'}
+ method = 'client'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+
+ args = {
+ 'reseller_username': auth['USER'],
+ 'reseller_password': auth['PASS'],
+ 'taxpayer_id': rfc,
+ 'status': ts[status],
+ }
+ result = self._get_result(client, 'edit', args)
+ if self.error:
+ return False
+
+ if not result.success:
+ self.error = result.message
+ return False
+
+ return True
+
+ def client_get(self, rfc):
+ """Regresa el estatus del cliente
+ Se requiere cuenta de reseller para usar este método
+
+ Args:
+ rfc (str): El RFC del emisor
+
+ Returns:
+ dict
+ 'message': None,
+ 'users': {
+ 'ResellerUser': [
+ {
+ 'status': 'A',
+ 'counter': 0,
+ 'taxpayer_id': '',
+ 'credit': 0
+ }
+ ]
+ } or None si no existe
+ """
+ auth = FINKOK['RESELLER']
+
+ method = 'client'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+
+ args = {
+ 'reseller_username': auth['USER'],
+ 'reseller_password': auth['PASS'],
+ 'taxpayer_id': rfc,
+ }
+
+ try:
+ self.result = client.service.get(**args)
+ except Fault as e:
+ self.error = str(e)
+ return {}
+ except TransportError as e:
+ self.error = str(e)
+ return {}
+ except ConnectionError:
+ self.error = 'Verifica la conexión a internet'
+ return {}
+
+ success = bool(self.result.users)
+ if not success:
+ self.error = self.result.message or 'RFC no existe'
+ return {}
+
+ data = self.result.users.ResellerUser[0]
+ client = {
+ 'status': data.status,
+ 'counter': data.counter,
+ 'credit': data.credit,
+ }
+ return client
+
+ def client_get_timbres(self, rfc, auth={}):
+ """Regresa los timbres restantes del cliente
+ Se pueden usar las credenciales de relleser o las credenciales del emisor
+
+ Args:
+ rfc (str): El RFC del emisor
+
+ Kwargs:
+ auth (dict): Credenciales del emisor
+
+ Returns:
+ int Cantidad de timbres restantes
+ """
+
+ if not auth:
+ auth = FINKOK['RESELLER']
+
+ method = 'client'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'reseller_username': auth['USER'],
+ 'reseller_password': auth['PASS'],
+ 'taxpayer_id': rfc,
+ }
+
+ try:
+ self.result = client.service.get(**args)
+ except Fault as e:
+ self.error = str(e)
+ return 0
+ except TransportError as e:
+ self.error = str(e)
+ return 0
+ except ConnectionError:
+ self.error = 'Verifica la conexión a internet'
+ return 0
+
+ success = bool(self.result.users)
+ if not success:
+ self.error = self.result.message or 'RFC no existe'
+ return 0
+
+ return self.result.users.ResellerUser[0].credit
+
+ def get_server_datetime(self):
+ """Regresa la fecha y hora del servidor de timbrado del PAC
+ """
+ auth = FINKOK['RESELLER']
+
+ method = 'util'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+ try:
+ self.result = client.service.datetime(auth['USER'], auth['PASS'])
+ except Fault as e:
+ self.error = str(e)
+ return None
+ except TransportError as e:
+ self.error = str(e)
+ return None
+ except ConnectionError:
+ self.error = 'Verifica la conexión a internet'
+ return None
+
+ try:
+ dt = datetime.datetime.strptime(
+ self.result.datetime, '%Y-%m-%dT%H:%M:%S')
+ except ValueError:
+ self.error = 'Error al obtener la fecha'
+ return None
+
+ return dt
+
+ def get_report_credit(self, rfc):
+ """Obtiene un reporte de los timbres agregados
+ """
+ auth = FINKOK['RESELLER']
+
+ args = {
+ 'username': auth['USER'],
+ 'password': auth['PASS'],
+ 'taxpayer_id': rfc,
+ }
+
+ method = 'util'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+ try:
+ self.result = client.service.report_credit(**args)
+ except Fault as e:
+ self.error = str(e)
+ return []
+ except TransportError as e:
+ self.error = str(e)
+ return []
+ except ConnectionError:
+ self.error = 'Verifica la conexión a internet'
+ return []
+
+ if self.result.result is None:
+ # ~ PAC - Debería regresar RFC inexistente o sin registros
+ self.error = 'RFC no existe o no tiene registros'
+ return []
+
+ return self.result.result.ReportTotalCredit
+
+ def get_report_total(self, rfc, date_from, date_to, invoice_type='I'):
+ """Obtiene un reporte del total de facturas timbradas
+ """
+ auth = FINKOK['RESELLER']
+
+ args = {
+ 'username': auth['USER'],
+ 'password': auth['PASS'],
+ 'taxpayer_id': rfc,
+ 'date_from': date_from,
+ 'date_to': date_to,
+ 'invoice_type': invoice_type,
+ }
+
+ method = 'util'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+ try:
+ self.result = client.service.report_total(**args)
+ except Fault as e:
+ self.error = str(e)
+ return 0
+ except TransportError as e:
+ self.error = str(e)
+ return 0
+ except ConnectionError:
+ self.error = 'Verifica la conexión a internet'
+ return 0
+
+ if self.result.result is None:
+ # ~ PAC - Debería regresar RFC inexistente o sin registros
+ self.error = 'RFC no existe o no tiene registros'
+ return 0
+
+ return self.result.result.ReportTotal[0].total or 0
+
+ def get_report_uuid(self, rfc, date_from, date_to, invoice_type='I'):
+ """Obtiene un reporte de los CFDI timbrados
+ """
+ auth = FINKOK['RESELLER']
+
+ args = {
+ 'username': auth['USER'],
+ 'password': auth['PASS'],
+ 'taxpayer_id': rfc,
+ 'date_from': date_from,
+ 'date_to': date_to,
+ 'invoice_type': invoice_type,
+ }
+
+ method = 'util'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+ try:
+ self.result = client.service.report_uuid(**args)
+ except Fault as e:
+ self.error = str(e)
+ return []
+ except TransportError as e:
+ self.error = str(e)
+ return []
+ except ConnectionError:
+ self.error = 'Verifica la conexión a internet'
+ return []
+
+ if self.result.invoices is None:
+ # ~ PAC - Debería regresar RFC inexistente o sin registros
+ self.error = 'RFC no existe o no tiene registros'
+ return []
+
+ return self.result.invoices.ReportUUID
+
+ def _to_string(self, data):
+ root = ET.parse(BytesIO(data.encode('utf-8'))).getroot()
+ xml = ET.tostring(root,
+ pretty_print=True, xml_declaration=True, encoding='utf-8')
+ return xml.decode('utf-8')
+
+ def cfdi_get_by_xml(self, xml, auth):
+ if not auth:
+ auth = FINKOK['AUTH']
+
+ method = 'timbra'
+ client = Client(
+ self.URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'username': auth['USER'],
+ 'password': auth['PASS'],
+ 'xml': xml,
+ }
+
+ try:
+ result = client.service.stamped(**args)
+ except Fault as e:
+ self.error = str(e)
+ return {}
+ except TransportError as e:
+ self.error = str(e)
+ return {}
+ except ConnectionError as e:
+ msg = '502 - Error de conexión'
+ self.error = msg
+ return {}
+
+ print(result)
+
+ error = 'Error: {}\n{}'.format(code_error, msg_error)
+ self.error = self.CODE.get(code_error, error)
+ return {}
+
+ def cfdi_get_by_uuid(self, uuid, rfc, invoice_type='I', auth={}):
+ if not auth:
+ auth = FINKOK['AUTH']
+
+ method = 'util'
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+
+ args = {
+ 'username': auth['USER'],
+ 'password': auth['PASS'],
+ 'uuid': uuid,
+ 'taxpayer_id': rfc,
+ 'invoice_type': invoice_type,
+ }
+ try:
+ result = client.service.get_xml(**args)
+ except Fault as e:
+ self.error = str(e)
+ return {}
+ except TransportError as e:
+ self.error = str(e)
+ return {}
+ except ConnectionError as e:
+ msg = '502 - Error de conexión'
+ self.error = msg
+ return {}
+
+ print(result)
+
+ error = 'Error: {}\n{}'.format(code_error, msg_error)
+ self.error = self.CODE.get(code_error, error)
+ return {}
+
+
+def main():
+ rfc = 'TEST740115999'
+ # ~ rfc = 'TCM970625MB1'
+ email = 'test999@empresalibre.mx'
+ pac = PACFinkok()
+ result = pac.client_get(rfc)
+ print(result)
+ result = pac.client_add_timbres(rfc, 10)
+ print(result)
+ return
+
+
+if __name__ == '__main__':
+ main()
diff --git a/source/finkok/finkok2.py b/source/finkok/finkok2.py
new file mode 100644
index 0000000..80f5807
--- /dev/null
+++ b/source/finkok/finkok2.py
@@ -0,0 +1,636 @@
+#!/usr/bin/env python3
+
+#~ import re
+#~ from xml.etree import ElementTree as ET
+#~ from requests import Request, Session, exceptions
+import datetime
+import hashlib
+import os
+import requests
+import time
+from lxml import etree
+from xml.dom.minidom import parseString
+from xml.sax.saxutils import escape, unescape
+from uuid import UUID
+
+from logbook import Logger
+from zeep import Client
+from zeep.plugins import HistoryPlugin
+from zeep.cache import SqliteCache
+from zeep.transports import Transport
+from zeep.exceptions import Fault, TransportError
+from requests.exceptions import ConnectionError
+
+
+if __name__ == '__main__':
+ from configpac import DEBUG, TIMEOUT, AUTH, URL
+else:
+ from .configpac import DEBUG, TIMEOUT, AUTH, URL
+
+
+log = Logger('PAC')
+#~ node = client.create_message(client.service, SERVICE, **args)
+#~ print(etree.tostring(node, pretty_print=True).decode())
+
+
+class Finkok(object):
+
+ def __init__(self, auth={}):
+ self.codes = URL['codes']
+ self.error = ''
+ self.message = ''
+ self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
+ self._plugins = None
+ self._history = None
+ self.uuid = ''
+ self.fecha = None
+ if DEBUG:
+ self._history = HistoryPlugin()
+ self._plugins = [self._history]
+ self._auth = AUTH
+ else:
+ self._auth = auth
+
+ def _debug(self):
+ if not DEBUG:
+ return
+ print('SEND', self._history.last_sent)
+ print('RESULT', self._history.last_received)
+ return
+
+ def _check_result(self, method, result):
+ # ~ print ('CODE', result.CodEstatus)
+ # ~ print ('INCIDENCIAS', result.Incidencias)
+ self.message = ''
+ MSG = {
+ 'OK': 'Comprobante timbrado satisfactoriamente',
+ '307': 'Comprobante timbrado previamente',
+ }
+ status = result.CodEstatus
+ if status is None and result.Incidencias:
+ for i in result.Incidencias['Incidencia']:
+ self.error += 'Error: {}\n{}\n{}'.format(
+ i['CodigoError'], i['MensajeIncidencia'], i['ExtraInfo'])
+ return ''
+
+ if method == 'timbra' and status in (MSG['OK'], MSG['307']):
+ #~ print ('UUID', result.UUID)
+ #~ print ('FECHA', result.Fecha)
+ if status == MSG['307']:
+ self.message = MSG['307']
+ tree = parseString(result.xml)
+ response = tree.toprettyxml(encoding='utf-8').decode('utf-8')
+ self.uuid = result.UUID
+ self.fecha = result.Fecha
+
+ return response
+
+ def _load_file(self, path):
+ try:
+ with open(path, 'rb') as f:
+ data = f.read()
+ except Exception as e:
+ self.error = str(e)
+ return
+ return data
+
+ def _validate_xml(self, file_xml):
+ if os.path.isfile(file_xml):
+ try:
+ with open(file_xml, 'rb') as f:
+ xml = f.read()
+ except Exception as e:
+ self.error = str(e)
+ return False, ''
+ else:
+ xml = file_xml.encode('utf-8')
+ return True, xml
+
+ def _validate_uuid(self, uuid):
+ try:
+ UUID(uuid)
+ return True
+ except ValueError:
+ self.error = 'UUID no válido: {}'.format(uuid)
+ return False
+
+ def timbra_xml(self, file_xml):
+ self.error = ''
+
+ if not DEBUG and not self._auth:
+ self.error = 'Sin datos para timbrar'
+ return
+
+ method = 'timbra'
+ ok, xml = self._validate_xml(file_xml)
+ if not ok:
+ return ''
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+
+ args = {
+ 'username': self._auth['USER'],
+ 'password': self._auth['PASS'],
+ 'xml': xml,
+ }
+ if URL['quick_stamp']:
+ try:
+ result = client.service.quick_stamp(**args)
+ except Fault as e:
+ self.error = str(e)
+ return
+ else:
+ try:
+ result = client.service.stamp(**args)
+ except Fault as e:
+ self.error = str(e)
+ return
+ except TransportError as e:
+ if '413' in str(e):
+ self.error = '413
Documento muy grande para timbrar'
+ else:
+ self.error = str(e)
+ return
+ except ConnectionError as e:
+ msg = '502 - Error de conexión'
+ self.error = msg
+ return
+
+ return self._check_result(method, result)
+
+ def _get_xml(self, uuid):
+ if not self._validate_uuid(uuid):
+ return ''
+
+ method = 'util'
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+
+ args = {
+ 'username': self._auth['USER'],
+ 'password': self._auth['PASS'],
+ 'uuid': uuid,
+ 'taxpayer_id': self.rfc,
+ 'invoice_type': 'I',
+ }
+ try:
+ result = client.service.get_xml(**args)
+ except Fault as e:
+ self.error = str(e)
+ return ''
+ except TransportError as e:
+ self.error = str(e)
+ return ''
+
+ if result.error:
+ self.error = result.error
+ return ''
+
+ tree = parseString(result.xml)
+ xml = tree.toprettyxml(encoding='utf-8').decode('utf-8')
+ return xml
+
+ def recupera_xml(self, file_xml='', uuid=''):
+ self.error = ''
+ if uuid:
+ return self._get_xml(uuid)
+
+ method = 'timbra'
+ ok, xml = self._validate_xml(file_xml)
+ if not ok:
+ return ''
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+ try:
+ result = client.service.stamped(
+ xml, self._auth['user'], self._auth['pass'])
+ except Fault as e:
+ self.error = str(e)
+ return ''
+
+ return self._check_result(method, result)
+
+ def estatus_xml(self, uuid):
+ method = 'timbra'
+ if not self._validate_uuid(uuid):
+ return ''
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+ try:
+ result = client.service.query_pending(
+ self._auth['USER'], self._auth['PASS'], uuid)
+ return result.status
+ except Fault as e:
+ self.error = str(e)
+ return ''
+
+ def cancel_xml(self, rfc, uuid, cer, key):
+ # ~ for u in uuids:
+ # ~ if not self._validate_uuid(u):
+ # ~ return ''
+
+ method = 'cancel'
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+ uuid_type = client.get_type('ns1:UUIDS')
+ sa = client.get_type('ns0:stringArray')
+
+ args = {
+ 'UUIDS': uuid_type(uuids=sa(string=uuid)),
+ 'username': self._auth['USER'],
+ 'password': self._auth['PASS'],
+ 'taxpayer_id': rfc,
+ 'cer': cer,
+ 'key': key,
+ 'store_pending': False,
+ }
+ try:
+ result = client.service.cancel(**args)
+ except Fault as e:
+ self.error = str(e)
+ return ''
+
+ if result.CodEstatus and self.codes['205'] in result.CodEstatus:
+ self.error = result.CodEstatus
+ return ''
+
+ return result
+
+ def cancel_signature(self, file_xml):
+ method = 'cancel'
+ if os.path.isfile(file_xml):
+ root = etree.parse(file_xml).getroot()
+ else:
+ root = etree.fromstring(file_xml.encode())
+
+ xml = etree.tostring(root)
+
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+
+ args = {
+ 'username': self._auth['USER'],
+ 'password': self._auth['PASS'],
+ 'xml': xml,
+ 'store_pending': False,
+ }
+
+ try:
+ result = client.service.cancel_signature(**args)
+ return result
+ except Fault as e:
+ self.error = str(e)
+ return ''
+
+ def get_acuse(self, rfc, uuids, type_acuse='C'):
+ for u in uuids:
+ if not self._validate_uuid(u):
+ return ''
+
+ method = 'cancel'
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+
+ args = {
+ 'username': self._auth['USER'],
+ 'password': self._auth['PASS'],
+ 'taxpayer_id': rfc,
+ 'uuid': '',
+ 'type': type_acuse,
+ }
+ try:
+ result = []
+ for u in uuids:
+ args['uuid'] = u
+ r = client.service.get_receipt(**args)
+ result.append(r)
+ except Fault as e:
+ self.error = str(e)
+ return ''
+
+ return result
+
+ def estatus_cancel(self, uuids):
+ for u in uuids:
+ if not self._validate_uuid(u):
+ return ''
+
+ method = 'cancel'
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+
+ args = {
+ 'username': self._auth['USER'],
+ 'password': self._auth['PASS'],
+ 'uuid': '',
+ }
+ try:
+ result = []
+ for u in uuids:
+ args['uuid'] = u
+ r = client.service.query_pending_cancellation(**args)
+ result.append(r)
+ except Fault as e:
+ self.error = str(e)
+ return ''
+
+ return result
+
+ def add_token(self, rfc, email):
+ """Agrega un nuevo token al cliente para timbrado.
+ Se requiere cuenta de reseller para usar este método
+
+ Args:
+ rfc (str): El RFC del cliente, ya debe existir
+ email (str): El correo del cliente, funciona como USER al timbrar
+
+ Returns:
+ dict
+ 'username': 'username',
+ 'status': True or False
+ 'name': 'name',
+ 'success': True or False
+ 'token': 'Token de timbrado',
+ 'message': None
+ """
+ auth = AUTH['RESELLER']
+
+ method = 'util'
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'username': auth['USER'],
+ 'password': auth['PASS'],
+ 'name': rfc,
+ 'token_username': email,
+ 'taxpayer_id': rfc,
+ 'status': True,
+ }
+ try:
+ result = client.service.add_token(**args)
+ except Fault as e:
+ self.error = str(e)
+ return ''
+
+ return result
+
+ def get_date(self):
+ method = 'util'
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+ try:
+ result = client.service.datetime(AUTH['USER'], AUTH['PASS'])
+ except Fault as e:
+ self.error = str(e)
+ return ''
+
+ if result.error:
+ self.error = result.error
+ return
+
+ return result.datetime
+
+ def add_client(self, rfc, type_user=False):
+ """Agrega un nuevo cliente para timbrado.
+ Se requiere cuenta de reseller para usar este método
+
+ Args:
+ rfc (str): El RFC del nuevo cliente
+
+ Kwargs:
+ type_user (bool): False == 'P' == Prepago or True == 'O' == On demand
+
+ Returns:
+ dict
+ 'message':
+ 'Account Created successfully'
+ 'Account Already exists'
+ 'success': True or False
+ """
+ auth = AUTH['RESELLER']
+
+ tu = {False: 'P', True: 'O'}
+ method = 'client'
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'reseller_username': auth['USER'],
+ 'reseller_password': auth['PASS'],
+ 'taxpayer_id': rfc,
+ 'type_user': tu[type_user],
+ 'added': datetime.datetime.now().isoformat()[:19],
+ }
+ try:
+ result = client.service.add(**args)
+ except Fault as e:
+ self.error = str(e)
+ return ''
+
+ return result
+
+ def edit_client(self, rfc, status=True):
+ """
+ Se requiere cuenta de reseller para usar este método
+ status = 'A' or 'S'
+ """
+ auth = AUTH['RESELLER']
+
+ sv = {False: 'S', True: 'A'}
+ method = 'client'
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'reseller_username': auth['USER'],
+ 'reseller_password': auth['PASS'],
+ 'taxpayer_id': rfc,
+ 'status': sv[status],
+ }
+ try:
+ result = client.service.edit(**args)
+ except Fault as e:
+ self.error = str(e)
+ return ''
+
+ return result
+
+ def get_client(self, rfc):
+ """Regresa el estatus del cliente
+ .
+ Se requiere cuenta de reseller para usar este método
+
+ Args:
+ rfc (str): El RFC del emisor
+
+ Returns:
+ dict
+ 'message': None,
+ 'users': {
+ 'ResellerUser': [
+ {
+ 'status': 'A',
+ 'counter': 0,
+ 'taxpayer_id': '',
+ 'credit': 0
+ }
+ ]
+ } or None si no existe
+ """
+ auth = AUTH['RESELLER']
+
+ method = 'client'
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'reseller_username': auth['USER'],
+ 'reseller_password': auth['PASS'],
+ 'taxpayer_id': rfc,
+ }
+
+ try:
+ result = client.service.get(**args)
+ except Fault as e:
+ self.error = str(e)
+ return ''
+ except TransportError as e:
+ self.error = str(e)
+ return ''
+
+ return result
+
+ def assign_client(self, rfc, credit):
+ """Agregar credito a un emisor
+
+ Se requiere cuenta de reseller para usar este método
+
+ Args:
+ rfc (str): El RFC del emisor, debe existir
+ credit (int): Cantidad de folios a agregar
+
+ Returns:
+ dict
+ 'success': True or False,
+ 'credit': nuevo credito despues de agregar or None
+ 'message':
+ 'Success, added {credit} of credit to {RFC}'
+ 'RFC no encontrado'
+ """
+ auth = AUTH['RESELLER']
+
+ method = 'client'
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'username': auth['USER'],
+ 'password': auth['PASS'],
+ 'taxpayer_id': rfc,
+ 'credit': credit,
+ }
+ try:
+ result = client.service.assign(**args)
+ except Fault as e:
+ self.error = str(e)
+ return ''
+
+ return result
+
+ def client_get_timbres(self, rfc):
+ method = 'client'
+ client = Client(
+ URL[method], transport=self._transport, plugins=self._plugins)
+ args = {
+ 'reseller_username': self._auth['USER'],
+ 'reseller_password': self._auth['PASS'],
+ 'taxpayer_id': rfc,
+ }
+
+ try:
+ self.result = client.service.get(**args)
+ except Fault as e:
+ self.error = str(e)
+ return 0
+ except TransportError as e:
+ self.error = str(e)
+ return 0
+ except ConnectionError:
+ self.error = 'Verifica la conexión a internet'
+ return 0
+
+ success = bool(self.result.users)
+ if not success:
+ self.error = self.result.message or 'RFC no existe'
+ return 0
+
+ return self.result.users.ResellerUser[0].credit
+
+
+def _get_data_sat(path):
+ BF = 'string(//*[local-name()="{}"]/@{})'
+ NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'}
+
+ try:
+ if os.path.isfile(path):
+ tree = etree.parse(path).getroot()
+ else:
+ tree = etree.fromstring(path.encode())
+
+ data = {}
+ emisor = escape(
+ tree.xpath('string(//cfdi:Emisor/@rfc)', namespaces=NS_CFDI) or
+ tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI)
+ )
+ receptor = escape(
+ tree.xpath('string(//cfdi:Receptor/@rfc)', namespaces=NS_CFDI) or
+ tree.xpath('string(//cfdi:Receptor/@Rfc)', namespaces=NS_CFDI)
+ )
+ data['total'] = tree.get('total') or tree.get('Total')
+ data['emisor'] = emisor
+ data['receptor'] = receptor
+ data['uuid'] = tree.xpath(BF.format('TimbreFiscalDigital', 'UUID'))
+ except Exception as e:
+ print (e)
+ return {}
+
+ return '?re={emisor}&rr={receptor}&tt={total}&id={uuid}'.format(**data)
+
+
+def get_status_sat(xml):
+ data = _get_data_sat(xml)
+ if not data:
+ return 'XML inválido'
+
+ data = """
+
+
+
+
+
+ {}
+
+
+
+ """.format(data)
+ headers = {
+ 'SOAPAction': '"http://tempuri.org/IConsultaCFDIService/Consulta"',
+ 'Content-type': 'text/xml; charset="UTF-8"'
+ }
+ URL = 'https://consultaqr.facturaelectronica.sat.gob.mx/consultacfdiservice.svc'
+
+ try:
+ result = requests.post(URL, data=data, headers=headers)
+ tree = etree.fromstring(result.text)
+ node = tree.xpath("//*[local-name() = 'Estado']")[0]
+ except Exception as e:
+ return 'Error: {}'.format(str(e))
+
+ return node.text
+
+
+def main():
+ return
+
+
+if __name__ == '__main__':
+ main()
diff --git a/source/tests/tests_finkok.py b/source/tests/tests_finkok.py
new file mode 100644
index 0000000..0197e55
--- /dev/null
+++ b/source/tests/tests_finkok.py
@@ -0,0 +1,239 @@
+#!/usr/bin/env python3
+
+import base64
+import datetime
+import sys
+import time
+import unittest
+import uuid
+import lxml.etree as ET
+from io import BytesIO
+from pathlib import Path
+
+sys.path.append('..')
+from pycert import SATCertificate
+from finkok import PACFinkok
+
+
+NAME = 'finkok'
+
+
+TEMPLATE_CFDI = """
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+
+
+TEMPLATE_CANCEL = """
+
+ {uuid}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+
+
+class TestCfdi(object):
+
+ def __init__(self):
+ self._xml = ''
+ self._make_cfdi()
+
+ @property
+ def xml(self):
+ return self._xml.decode()
+
+ def _make_cfdi(self):
+ path = Path(__file__)
+ path_cer = Path(path.parent).joinpath('certificados', f'{NAME}.cer')
+ path_key = Path(path.parent).joinpath('certificados', f'{NAME}.enc')
+ path_xslt = Path(path.parent).joinpath('xslt', 'cadena.xslt')
+ self._cer_ori = cer = path_cer.read_bytes()
+ self._key_ori = key = path_key.read_bytes()
+
+ self._cert = SATCertificate(cer, key)
+ self._doc = ET.parse(BytesIO(TEMPLATE_CFDI.encode()))
+ self._root = self._doc.getroot()
+ self._root.attrib['Fecha'] = datetime.datetime.now().isoformat()[:19]
+ self._root.attrib['NoCertificado'] = self._cert.serial_number
+ self._root.attrib['Certificado'] = self._cert.cer_txt
+
+ self._add_stamp(path_xslt)
+
+ self._xml = ET.tostring(self._root,
+ pretty_print=True, doctype='')
+ return
+
+ def _add_stamp(self, path_xslt):
+ xslt = open(path_xslt, 'rb')
+ transfor = ET.XSLT(ET.parse(xslt))
+ cadena = str(transfor(self._doc)).encode()
+ stamp = self._cert.sign(cadena)
+ self._root.attrib['Sello'] = stamp
+ xslt.close()
+ return
+
+ def sign_xml(self, template):
+ tree = ET.fromstring(template.encode())
+ tree = self._cert.sign_xml(tree)
+ xml = ET.tostring(tree).decode()
+ return xml
+
+ @property
+ def cert(self):
+ cer = base64.b64encode(self._cer_ori).decode()
+ key = base64.b64encode(self._key_ori).decode()
+ return key, cer
+
+
+class TestStamp(unittest.TestCase):
+
+ def setUp(self):
+ print(f'In method: {self._testMethodName}')
+ self.pac = PACFinkok()
+
+ def test_cfdi_stamp(self):
+ cfdi = TestCfdi().xml
+ result = self.pac.stamp(cfdi)
+ cfdi_uuid = result['uuid']
+
+ self.assertFalse(bool(self.pac.error))
+ self.assertTrue(bool(uuid.UUID(cfdi_uuid)))
+
+ def test_cfdi_cancel(self):
+ expected = '201'
+ cfdi = TestCfdi()
+ result = self.pac.stamp(cfdi.xml)
+ cfdi_uuid = self.pac.cfdi_uuid
+
+ self.assertFalse(bool(self.pac.error))
+ self.assertTrue(bool(uuid.UUID(cfdi_uuid)))
+
+ time.sleep(1)
+ cert = cfdi.cert
+ info = {
+ 'key': cert[0],
+ 'cer': cert[1],
+ 'pass': '12345678a',
+ 'tipo': 'cfdi3.3',
+ }
+ result = self.pac.cancel(result, info)
+ self.assertFalse(bool(self.pac.error))
+
+ tree = ET.fromstring(result)
+ cancel_uuid = tree.xpath('string(//Acuse/Folios/UUID)')
+ status = tree.xpath('string(//Acuse/Folios/EstatusUUID)')
+
+ self.assertEqual(cfdi_uuid, cancel_uuid)
+ self.assertEqual(status, expected)
+
+ def test_cfdi_cancel_xml(self):
+ expected = '201'
+ cfdi = TestCfdi()
+ result = self.pac.stamp(cfdi.xml)
+ cfdi_uuid = self.pac.cfdi_uuid
+
+ self.assertFalse(bool(self.pac.error))
+ self.assertTrue(bool(uuid.UUID(cfdi_uuid)))
+
+ NS_CFDI = {
+ 'cfdi': 'http://www.sat.gob.mx/cfd/3',
+ 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
+ }
+ tree = ET.fromstring(result.encode())
+ rfc_emisor = tree.xpath(
+ 'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)',
+ namespaces=NS_CFDI)
+
+ time.sleep(1)
+ data = {
+ 'rfc': rfc_emisor,
+ 'fecha': datetime.datetime.now().isoformat()[:19],
+ 'uuid': cfdi_uuid,
+ }
+ template = TEMPLATE_CANCEL.format(**data)
+ sign_xml = cfdi.sign_xml(template)
+ info = {
+ 'tipo': 'cfdi3.3',
+ }
+ result = self.pac.cancel_xml(result, sign_xml, info)
+ tree = ET.fromstring(result)
+ uid = tree.xpath('string(//Acuse/Folios/UUID)')
+ status = tree.xpath('string(//Acuse/Folios/EstatusUUID)')
+
+ self.assertEqual(cfdi_uuid, uid)
+ self.assertEqual(status, expected)
+
+ def test_cfdi_status(self):
+ expected = ''
+ cfdi = TestCfdi()
+ result = self.pac.stamp(cfdi.xml)
+ cfdi_uuid = self.pac.cfdi_uuid
+
+ self.assertFalse(bool(self.pac.error))
+ self.assertTrue(bool(uuid.UUID(cfdi_uuid)))
+
+ NS_CFDI = {
+ 'cfdi': 'http://www.sat.gob.mx/cfd/3',
+ 'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
+ }
+ tree = ET.fromstring(result.encode())
+ rfc_emisor = tree.xpath(
+ 'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)',
+ namespaces=NS_CFDI)
+ rfc_receptor = tree.xpath(
+ 'string(//cfdi:Comprobante/cfdi:Receptor/@Rfc)',
+ namespaces=NS_CFDI)
+ total = tree.xpath(
+ 'string(//cfdi:Comprobante/@Total)',
+ namespaces=NS_CFDI)
+
+ time.sleep(3)
+ data = {
+ 'rfc_receptor': rfc_receptor,
+ 'rfc_emisor': rfc_emisor,
+ 'total': total,
+ 'uuid': cfdi_uuid,
+ }
+ result = self.pac.status(data)
+ self.assertEqual(result, expected)
+
+
+if __name__ == '__main__':
+ unittest.main()