Add pac Finkok

This commit is contained in:
Mauricio Baeza 2021-01-01 17:31:23 -06:00
parent d3c7c17f91
commit a3092cf3c6
7 changed files with 1814 additions and 2 deletions

View File

@ -78,8 +78,6 @@ class PACComercioDigital(object):
def __init__(self):
self.error = ''
# ~ self.cfdi_uuid = ''
# ~ self.date_stamped = ''
def _error(self, msg):
self.error = str(msg)

View File

@ -0,0 +1,3 @@
#!/usr/bin/env python3
from .finkok import PACFinkok

View File

@ -0,0 +1,38 @@
#!/usr/bin/env python
# ~
# ~ PAC
# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
# ~ Siempre consulta la documentación de PAC
# ~ AUTH = Las credenciales de timbrado proporcionadas por el PAC
# ~ NO cambies las credenciales de prueba
DEBUG = True
AUTH = {
'user': '',
'pass': '',
}
if DEBUG:
AUTH = {
'user': 'pruebas-finkok@correolibre.net',
'pass': '5c9a88da105bff9a8c430cb713f6d35269f51674bdc5963c1501b7316366',
}

169
source/finkok/finkok.py Normal file
View File

@ -0,0 +1,169 @@
#!/usr/bin/env python
# ~
# ~ PAC
# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import logging
import os
import re
from io import BytesIO
from xml.sax.saxutils import unescape
import lxml.etree as ET
from zeep import Client
from zeep.plugins import Plugin
from zeep.cache import SqliteCache
from zeep.transports import Transport
from zeep.exceptions import Fault, TransportError
from requests.exceptions import ConnectionError
from .conf import DEBUG, AUTH
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
logging.getLogger('requests').setLevel(logging.ERROR)
TIMEOUT = 10
DEBUG_SOAP = False
class DebugPlugin(Plugin):
def _to_string(self, envelope, name):
if DEBUG_SOAP:
data = ET.tostring(envelope, pretty_print=True, encoding='utf-8').decode()
path = f'/tmp/soap_{name}.xml'
with open(path, 'w') as f:
f.write(data)
return
def egress(self, envelope, http_headers, operation, binding_options):
self._to_string(envelope, 'request')
return envelope, http_headers
def ingress(self, envelope, http_headers, operation):
self._to_string(envelope, 'response')
return envelope, http_headers
class PACFinkok(object):
WS = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl'
if DEBUG:
WS = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl'
URL = {
'quick_stamp': False,
'timbra': WS.format('stamp'),
'cancel': WS.format('cancel'),
'client': WS.format('registration'),
'util': WS.format('utilities'),
}
CODE = {
'200': 'Comprobante timbrado satisfactoriamente',
'205': 'No Encontrado',
'307': 'Comprobante timbrado previamente',
'702': 'No se encontro el RFC del emisor',
'IP': 'Invalid Passphrase',
'IPMSG': 'Frase de paso inválida',
'NE': 'No Encontrado',
}
def __init__(self):
self._error = ''
self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
self._plugins = [DebugPlugin()]
def _validate_result(self, result):
if hasattr(result, 'CodEstatus'):
ce = result.CodEstatus
if ce == self.CODE['IP']:
self._error = self.CODE['IPMSG']
return {}
if self.CODE['NE'] in ce:
self._error = 'UUID ' + self.CODE['NE']
return {}
if self.CODE['200'] != ce:
log.error('CodEstatus', type(ce), ce)
return result
if hasattr(result, 'Incidencias'):
fault = result.Incidencias.Incidencia[0]
cod_error = fault.CodigoError.encode('utf-8')
msg_error = fault.MensajeIncidencia.encode('utf-8')
error = 'Error: {}\n{}'.format(cod_error, msg_error)
self._error = self.CODE.get(cod_error, error)
return {}
return result
def _get_result(self, client, method, args):
self._error = ''
try:
result = getattr(client.service, method)(**args)
except Fault as e:
self._error = str(e)
return {}
except TransportError as e:
if '413' in str(e):
self._error = '413<BR><BR><b>Documento muy grande para timbrar</b>'
else:
self._error = str(e)
return {}
except ConnectionError as e:
msg = '502 - Error de conexión'
self._error = msg
return {}
return self._validate_result(result)
def _to_string(self, data):
root = ET.parse(BytesIO(data.encode('utf-8'))).getroot()
xml = ET.tostring(root,
pretty_print=True, xml_declaration=True, encoding='utf-8')
return xml.decode('utf-8')
def stamp(self, cfdi, auth={}):
if DEBUG or not auth:
auth = AUTH
method = 'timbra'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
args = {
'username': auth['user'],
'password': auth['pass'],
'xml': cfdi,
}
result = self._get_result(client, 'stamp', args)
if self.error:
return ''
data = {
'xml': self._to_string(result.xml),
'uuid': result.UUID,
'date': result.Fecha,
}
return data

729
source/finkok/finkok1.py Normal file
View File

@ -0,0 +1,729 @@
#!/usr/bin/env python
# ~
# ~ PAC
# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import os
import re
from io import BytesIO
from xml.sax.saxutils import unescape
import lxml.etree as ET
from zeep import Client
from zeep.plugins import Plugin
from zeep.cache import SqliteCache
from zeep.transports import Transport
from zeep.exceptions import Fault, TransportError
from requests.exceptions import ConnectionError
from conf import DEBUG, FINKOK
TIMEOUT = 10
DEBUG_SOAP = False
class DebugPlugin(Plugin):
def _to_string(self, envelope, name):
if DEBUG_SOAP:
data = ET.tostring(envelope, pretty_print=True, encoding='utf-8').decode()
path = f'/tmp/soap_{name}.xml'
with open(path, 'w') as f:
f.write(data)
return
def egress(self, envelope, http_headers, operation, binding_options):
self._to_string(envelope, 'request')
return envelope, http_headers
def ingress(self, envelope, http_headers, operation):
self._to_string(envelope, 'response')
return envelope, http_headers
class PACFinkok(object):
URL = {
'quick_stamp': False,
'timbra': FINKOK['WS'].format('stamp'),
'cancel': FINKOK['WS'].format('cancel'),
'client': FINKOK['WS'].format('registration'),
'util': FINKOK['WS'].format('utilities'),
}
CODE = {
'200': 'Comprobante timbrado satisfactoriamente',
'205': 'No Encontrado',
'307': 'Comprobante timbrado previamente',
'702': 'No se encontro el RFC del emisor',
'IP': 'Invalid Passphrase',
'IPMSG': 'Frase de paso inválida',
'NE': 'No Encontrado',
}
def __init__(self):
self._error = ''
self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
self._plugins = [DebugPlugin()]
def _validate_result(self, result):
if hasattr(result, 'CodEstatus'):
ce = result.CodEstatus
if ce == self.CODE['IP']:
self.error = self.CODE['IPMSG']
return {}
if self.CODE['NE'] in ce:
self.error = 'UUID ' + self.CODE['NE']
return {}
if self.CODE['200'] != ce:
print('CodEstatus', type(ce), ce)
return result
if hasattr(result, 'Incidencias'):
fault = result.Incidencias.Incidencia[0]
cod_error = fault.CodigoError.encode('utf-8')
msg_error = fault.MensajeIncidencia.encode('utf-8')
error = 'Error: {}\n{}'.format(cod_error, msg_error)
self.error = self.CODE.get(cod_error, error)
return {}
return result
def _get_result(self, client, method, args):
self.error = ''
try:
result = getattr(client.service, method)(**args)
except Fault as e:
self.error = str(e)
return {}
except TransportError as e:
if '413' in str(e):
self.error = '413<BR><BR><b>Documento muy grande para timbrar</b>'
else:
self.error = str(e)
return {}
except ConnectionError as e:
msg = '502 - Error de conexión'
self.error = msg
return {}
return self._validate_result(result)
def cfdi_stamp(self, cfdi, auth={}):
if not auth:
auth = FINKOK['AUTH']
method = 'timbra'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['USER'],
'password': auth['PASS'],
'xml': cfdi,
}
result = self._get_result(client, 'stamp', args)
if self.error:
return {}
data = {
'xml': self._to_string(result.xml),
'uuid': result.UUID,
'fecha': result.Fecha,
}
return data
def cfdi_cancel(self, rfc, uuid, cer, key, auth={}):
if not auth:
auth = FINKOK['AUTH']
method = 'cancel'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
uuid_type = client.get_type('ns1:UUIDS')
sa = client.get_type('ns0:stringArray')
args = {
'UUIDS': uuid_type(uuids=sa(string=uuid)),
'username': auth['USER'],
'password': auth['PASS'],
'taxpayer_id': rfc,
'cer': cer,
'key': key,
'store_pending': False,
}
result = self._get_result(client, 'cancel', args)
if self.error:
return {}
return result
def cfdi_status(self, uuid, auth={}):
if not auth:
auth = FINKOK['AUTH']
method = 'timbra'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['USER'],
'password': auth['PASS'],
'uuid': uuid,
}
result = self._get_result(client, 'query_pending', args)
if self.error:
return {}
STATUS = {
'C': 'Cancelado',
'S': 'Timbrado, aún no eviado al SAT',
'F': 'Timbrado y enviado al SAT',
}
data = {
'estatus': STATUS[result.status],
'xml': self._to_string(unescape(result.xml)),
'fecha': result.date,
}
return data
def client_add(self, rfc, type_user=False):
"""Agrega un nuevo cliente para timbrado.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del nuevo cliente
Kwargs:
type_user (bool): False == 'P' == Prepago or True == 'O' == On demand
Returns:
True or False
origin PAC
'message':
'Account Created successfully'
'Account Already exists'
'success': True or False
"""
auth = FINKOK['RESELLER']
tu = {True: 'O', False: 'P'}
method = 'client'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['USER'],
'reseller_password': auth['PASS'],
'taxpayer_id': rfc,
'type_user': tu[type_user],
'added': datetime.datetime.now().isoformat()[:19],
}
result = self._get_result(client, 'add', args)
if self.error:
return False
if not result.success:
self.error = result.message
return False
# ~ PAC success debería ser False
msg = 'Account Already exists'
if result.message == msg:
self.error = msg
return True
return result.success
def client_add_token(self, rfc, email):
"""Agrega un nuevo token al cliente para timbrado.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del cliente, ya debe existir
email (str): El correo del cliente, funciona como USER al timbrar
Returns:
token (str): Es la contraseña para timbrar
origin PAC
dict
'username': 'username',
'status': True or False
'name': 'name',
'success': True or False
'token': 'Token de timbrado',
'message': None
"""
auth = FINKOK['RESELLER']
method = 'util'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['USER'],
'password': auth['PASS'],
'name': rfc,
'token_username': email,
'taxpayer_id': rfc,
'status': True,
}
result = self._get_result(client, 'add_token', args)
if self.error:
return ''
if not result.success:
self.error = result.message
return ''
return result.token
# ~ Send issue to PAC
def client_reset_token(self, email):
auth = FINKOK['RESELLER']
method = 'util'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['USER'],
'password': auth['PASS'],
'token': email,
}
result = self._get_result(client, 'reset_token', args)
if self.error:
return ''
if not result.success:
self.error = result.message
return ''
return result.token
def client_add_timbres(self, rfc, credit):
"""Agregar credito a un emisor
Se requiere cuenta de reseller
Args:
rfc (str): El RFC del emisor, debe existir
credit (int): Cantidad de folios a agregar
Returns:
dict
'success': True or False,
'credit': nuevo credito despues de agregar or None
'message':
'Success, added {credit} of credit to {RFC}.'
'RFC no encontrado'
"""
auth = FINKOK['RESELLER']
if not isinstance(credit, int):
self.error = 'El credito debe ser un entero'
return 0
method = 'client'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['USER'],
'password': auth['PASS'],
'taxpayer_id': rfc,
'credit': credit,
}
result = self._get_result(client, 'assign', args)
if self.error:
return ''
if not result.success:
self.error = result.message
return 0
return result.credit
def client_edit(self, rfc, status=True):
"""Edita el estatus (Activo o Suspendido) de un cliente
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del cliente
Kwargs:
status (bool): True == 'A' == Activo or False == 'S' == Suspendido
Returns:
dict
'message':
'Account Created successfully'
'Account Already exists'
'success': True or False
"""
auth = FINKOK['RESELLER']
ts = {True: 'A', False: 'S'}
method = 'client'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['USER'],
'reseller_password': auth['PASS'],
'taxpayer_id': rfc,
'status': ts[status],
}
result = self._get_result(client, 'edit', args)
if self.error:
return False
if not result.success:
self.error = result.message
return False
return True
def client_get(self, rfc):
"""Regresa el estatus del cliente
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del emisor
Returns:
dict
'message': None,
'users': {
'ResellerUser': [
{
'status': 'A',
'counter': 0,
'taxpayer_id': '',
'credit': 0
}
]
} or None si no existe
"""
auth = FINKOK['RESELLER']
method = 'client'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['USER'],
'reseller_password': auth['PASS'],
'taxpayer_id': rfc,
}
try:
self.result = client.service.get(**args)
except Fault as e:
self.error = str(e)
return {}
except TransportError as e:
self.error = str(e)
return {}
except ConnectionError:
self.error = 'Verifica la conexión a internet'
return {}
success = bool(self.result.users)
if not success:
self.error = self.result.message or 'RFC no existe'
return {}
data = self.result.users.ResellerUser[0]
client = {
'status': data.status,
'counter': data.counter,
'credit': data.credit,
}
return client
def client_get_timbres(self, rfc, auth={}):
"""Regresa los timbres restantes del cliente
Se pueden usar las credenciales de relleser o las credenciales del emisor
Args:
rfc (str): El RFC del emisor
Kwargs:
auth (dict): Credenciales del emisor
Returns:
int Cantidad de timbres restantes
"""
if not auth:
auth = FINKOK['RESELLER']
method = 'client'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['USER'],
'reseller_password': auth['PASS'],
'taxpayer_id': rfc,
}
try:
self.result = client.service.get(**args)
except Fault as e:
self.error = str(e)
return 0
except TransportError as e:
self.error = str(e)
return 0
except ConnectionError:
self.error = 'Verifica la conexión a internet'
return 0
success = bool(self.result.users)
if not success:
self.error = self.result.message or 'RFC no existe'
return 0
return self.result.users.ResellerUser[0].credit
def get_server_datetime(self):
"""Regresa la fecha y hora del servidor de timbrado del PAC
"""
auth = FINKOK['RESELLER']
method = 'util'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
try:
self.result = client.service.datetime(auth['USER'], auth['PASS'])
except Fault as e:
self.error = str(e)
return None
except TransportError as e:
self.error = str(e)
return None
except ConnectionError:
self.error = 'Verifica la conexión a internet'
return None
try:
dt = datetime.datetime.strptime(
self.result.datetime, '%Y-%m-%dT%H:%M:%S')
except ValueError:
self.error = 'Error al obtener la fecha'
return None
return dt
def get_report_credit(self, rfc):
"""Obtiene un reporte de los timbres agregados
"""
auth = FINKOK['RESELLER']
args = {
'username': auth['USER'],
'password': auth['PASS'],
'taxpayer_id': rfc,
}
method = 'util'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
try:
self.result = client.service.report_credit(**args)
except Fault as e:
self.error = str(e)
return []
except TransportError as e:
self.error = str(e)
return []
except ConnectionError:
self.error = 'Verifica la conexión a internet'
return []
if self.result.result is None:
# ~ PAC - Debería regresar RFC inexistente o sin registros
self.error = 'RFC no existe o no tiene registros'
return []
return self.result.result.ReportTotalCredit
def get_report_total(self, rfc, date_from, date_to, invoice_type='I'):
"""Obtiene un reporte del total de facturas timbradas
"""
auth = FINKOK['RESELLER']
args = {
'username': auth['USER'],
'password': auth['PASS'],
'taxpayer_id': rfc,
'date_from': date_from,
'date_to': date_to,
'invoice_type': invoice_type,
}
method = 'util'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
try:
self.result = client.service.report_total(**args)
except Fault as e:
self.error = str(e)
return 0
except TransportError as e:
self.error = str(e)
return 0
except ConnectionError:
self.error = 'Verifica la conexión a internet'
return 0
if self.result.result is None:
# ~ PAC - Debería regresar RFC inexistente o sin registros
self.error = 'RFC no existe o no tiene registros'
return 0
return self.result.result.ReportTotal[0].total or 0
def get_report_uuid(self, rfc, date_from, date_to, invoice_type='I'):
"""Obtiene un reporte de los CFDI timbrados
"""
auth = FINKOK['RESELLER']
args = {
'username': auth['USER'],
'password': auth['PASS'],
'taxpayer_id': rfc,
'date_from': date_from,
'date_to': date_to,
'invoice_type': invoice_type,
}
method = 'util'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
try:
self.result = client.service.report_uuid(**args)
except Fault as e:
self.error = str(e)
return []
except TransportError as e:
self.error = str(e)
return []
except ConnectionError:
self.error = 'Verifica la conexión a internet'
return []
if self.result.invoices is None:
# ~ PAC - Debería regresar RFC inexistente o sin registros
self.error = 'RFC no existe o no tiene registros'
return []
return self.result.invoices.ReportUUID
def _to_string(self, data):
root = ET.parse(BytesIO(data.encode('utf-8'))).getroot()
xml = ET.tostring(root,
pretty_print=True, xml_declaration=True, encoding='utf-8')
return xml.decode('utf-8')
def cfdi_get_by_xml(self, xml, auth):
if not auth:
auth = FINKOK['AUTH']
method = 'timbra'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['USER'],
'password': auth['PASS'],
'xml': xml,
}
try:
result = client.service.stamped(**args)
except Fault as e:
self.error = str(e)
return {}
except TransportError as e:
self.error = str(e)
return {}
except ConnectionError as e:
msg = '502 - Error de conexión'
self.error = msg
return {}
print(result)
error = 'Error: {}\n{}'.format(code_error, msg_error)
self.error = self.CODE.get(code_error, error)
return {}
def cfdi_get_by_uuid(self, uuid, rfc, invoice_type='I', auth={}):
if not auth:
auth = FINKOK['AUTH']
method = 'util'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['USER'],
'password': auth['PASS'],
'uuid': uuid,
'taxpayer_id': rfc,
'invoice_type': invoice_type,
}
try:
result = client.service.get_xml(**args)
except Fault as e:
self.error = str(e)
return {}
except TransportError as e:
self.error = str(e)
return {}
except ConnectionError as e:
msg = '502 - Error de conexión'
self.error = msg
return {}
print(result)
error = 'Error: {}\n{}'.format(code_error, msg_error)
self.error = self.CODE.get(code_error, error)
return {}
def main():
rfc = 'TEST740115999'
# ~ rfc = 'TCM970625MB1'
email = 'test999@empresalibre.mx'
pac = PACFinkok()
result = pac.client_get(rfc)
print(result)
result = pac.client_add_timbres(rfc, 10)
print(result)
return
if __name__ == '__main__':
main()

636
source/finkok/finkok2.py Normal file
View File

@ -0,0 +1,636 @@
#!/usr/bin/env python3
#~ import re
#~ from xml.etree import ElementTree as ET
#~ from requests import Request, Session, exceptions
import datetime
import hashlib
import os
import requests
import time
from lxml import etree
from xml.dom.minidom import parseString
from xml.sax.saxutils import escape, unescape
from uuid import UUID
from logbook import Logger
from zeep import Client
from zeep.plugins import HistoryPlugin
from zeep.cache import SqliteCache
from zeep.transports import Transport
from zeep.exceptions import Fault, TransportError
from requests.exceptions import ConnectionError
if __name__ == '__main__':
from configpac import DEBUG, TIMEOUT, AUTH, URL
else:
from .configpac import DEBUG, TIMEOUT, AUTH, URL
log = Logger('PAC')
#~ node = client.create_message(client.service, SERVICE, **args)
#~ print(etree.tostring(node, pretty_print=True).decode())
class Finkok(object):
def __init__(self, auth={}):
self.codes = URL['codes']
self.error = ''
self.message = ''
self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
self._plugins = None
self._history = None
self.uuid = ''
self.fecha = None
if DEBUG:
self._history = HistoryPlugin()
self._plugins = [self._history]
self._auth = AUTH
else:
self._auth = auth
def _debug(self):
if not DEBUG:
return
print('SEND', self._history.last_sent)
print('RESULT', self._history.last_received)
return
def _check_result(self, method, result):
# ~ print ('CODE', result.CodEstatus)
# ~ print ('INCIDENCIAS', result.Incidencias)
self.message = ''
MSG = {
'OK': 'Comprobante timbrado satisfactoriamente',
'307': 'Comprobante timbrado previamente',
}
status = result.CodEstatus
if status is None and result.Incidencias:
for i in result.Incidencias['Incidencia']:
self.error += 'Error: {}\n{}\n{}'.format(
i['CodigoError'], i['MensajeIncidencia'], i['ExtraInfo'])
return ''
if method == 'timbra' and status in (MSG['OK'], MSG['307']):
#~ print ('UUID', result.UUID)
#~ print ('FECHA', result.Fecha)
if status == MSG['307']:
self.message = MSG['307']
tree = parseString(result.xml)
response = tree.toprettyxml(encoding='utf-8').decode('utf-8')
self.uuid = result.UUID
self.fecha = result.Fecha
return response
def _load_file(self, path):
try:
with open(path, 'rb') as f:
data = f.read()
except Exception as e:
self.error = str(e)
return
return data
def _validate_xml(self, file_xml):
if os.path.isfile(file_xml):
try:
with open(file_xml, 'rb') as f:
xml = f.read()
except Exception as e:
self.error = str(e)
return False, ''
else:
xml = file_xml.encode('utf-8')
return True, xml
def _validate_uuid(self, uuid):
try:
UUID(uuid)
return True
except ValueError:
self.error = 'UUID no válido: {}'.format(uuid)
return False
def timbra_xml(self, file_xml):
self.error = ''
if not DEBUG and not self._auth:
self.error = 'Sin datos para timbrar'
return
method = 'timbra'
ok, xml = self._validate_xml(file_xml)
if not ok:
return ''
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'xml': xml,
}
if URL['quick_stamp']:
try:
result = client.service.quick_stamp(**args)
except Fault as e:
self.error = str(e)
return
else:
try:
result = client.service.stamp(**args)
except Fault as e:
self.error = str(e)
return
except TransportError as e:
if '413' in str(e):
self.error = '413<BR><BR><b>Documento muy grande para timbrar</b>'
else:
self.error = str(e)
return
except ConnectionError as e:
msg = '502 - Error de conexión'
self.error = msg
return
return self._check_result(method, result)
def _get_xml(self, uuid):
if not self._validate_uuid(uuid):
return ''
method = 'util'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'uuid': uuid,
'taxpayer_id': self.rfc,
'invoice_type': 'I',
}
try:
result = client.service.get_xml(**args)
except Fault as e:
self.error = str(e)
return ''
except TransportError as e:
self.error = str(e)
return ''
if result.error:
self.error = result.error
return ''
tree = parseString(result.xml)
xml = tree.toprettyxml(encoding='utf-8').decode('utf-8')
return xml
def recupera_xml(self, file_xml='', uuid=''):
self.error = ''
if uuid:
return self._get_xml(uuid)
method = 'timbra'
ok, xml = self._validate_xml(file_xml)
if not ok:
return ''
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
try:
result = client.service.stamped(
xml, self._auth['user'], self._auth['pass'])
except Fault as e:
self.error = str(e)
return ''
return self._check_result(method, result)
def estatus_xml(self, uuid):
method = 'timbra'
if not self._validate_uuid(uuid):
return ''
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
try:
result = client.service.query_pending(
self._auth['USER'], self._auth['PASS'], uuid)
return result.status
except Fault as e:
self.error = str(e)
return ''
def cancel_xml(self, rfc, uuid, cer, key):
# ~ for u in uuids:
# ~ if not self._validate_uuid(u):
# ~ return ''
method = 'cancel'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
uuid_type = client.get_type('ns1:UUIDS')
sa = client.get_type('ns0:stringArray')
args = {
'UUIDS': uuid_type(uuids=sa(string=uuid)),
'username': self._auth['USER'],
'password': self._auth['PASS'],
'taxpayer_id': rfc,
'cer': cer,
'key': key,
'store_pending': False,
}
try:
result = client.service.cancel(**args)
except Fault as e:
self.error = str(e)
return ''
if result.CodEstatus and self.codes['205'] in result.CodEstatus:
self.error = result.CodEstatus
return ''
return result
def cancel_signature(self, file_xml):
method = 'cancel'
if os.path.isfile(file_xml):
root = etree.parse(file_xml).getroot()
else:
root = etree.fromstring(file_xml.encode())
xml = etree.tostring(root)
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'xml': xml,
'store_pending': False,
}
try:
result = client.service.cancel_signature(**args)
return result
except Fault as e:
self.error = str(e)
return ''
def get_acuse(self, rfc, uuids, type_acuse='C'):
for u in uuids:
if not self._validate_uuid(u):
return ''
method = 'cancel'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'taxpayer_id': rfc,
'uuid': '',
'type': type_acuse,
}
try:
result = []
for u in uuids:
args['uuid'] = u
r = client.service.get_receipt(**args)
result.append(r)
except Fault as e:
self.error = str(e)
return ''
return result
def estatus_cancel(self, uuids):
for u in uuids:
if not self._validate_uuid(u):
return ''
method = 'cancel'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'uuid': '',
}
try:
result = []
for u in uuids:
args['uuid'] = u
r = client.service.query_pending_cancellation(**args)
result.append(r)
except Fault as e:
self.error = str(e)
return ''
return result
def add_token(self, rfc, email):
"""Agrega un nuevo token al cliente para timbrado.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del cliente, ya debe existir
email (str): El correo del cliente, funciona como USER al timbrar
Returns:
dict
'username': 'username',
'status': True or False
'name': 'name',
'success': True or False
'token': 'Token de timbrado',
'message': None
"""
auth = AUTH['RESELLER']
method = 'util'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['USER'],
'password': auth['PASS'],
'name': rfc,
'token_username': email,
'taxpayer_id': rfc,
'status': True,
}
try:
result = client.service.add_token(**args)
except Fault as e:
self.error = str(e)
return ''
return result
def get_date(self):
method = 'util'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
try:
result = client.service.datetime(AUTH['USER'], AUTH['PASS'])
except Fault as e:
self.error = str(e)
return ''
if result.error:
self.error = result.error
return
return result.datetime
def add_client(self, rfc, type_user=False):
"""Agrega un nuevo cliente para timbrado.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del nuevo cliente
Kwargs:
type_user (bool): False == 'P' == Prepago or True == 'O' == On demand
Returns:
dict
'message':
'Account Created successfully'
'Account Already exists'
'success': True or False
"""
auth = AUTH['RESELLER']
tu = {False: 'P', True: 'O'}
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['USER'],
'reseller_password': auth['PASS'],
'taxpayer_id': rfc,
'type_user': tu[type_user],
'added': datetime.datetime.now().isoformat()[:19],
}
try:
result = client.service.add(**args)
except Fault as e:
self.error = str(e)
return ''
return result
def edit_client(self, rfc, status=True):
"""
Se requiere cuenta de reseller para usar este método
status = 'A' or 'S'
"""
auth = AUTH['RESELLER']
sv = {False: 'S', True: 'A'}
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['USER'],
'reseller_password': auth['PASS'],
'taxpayer_id': rfc,
'status': sv[status],
}
try:
result = client.service.edit(**args)
except Fault as e:
self.error = str(e)
return ''
return result
def get_client(self, rfc):
"""Regresa el estatus del cliente
.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del emisor
Returns:
dict
'message': None,
'users': {
'ResellerUser': [
{
'status': 'A',
'counter': 0,
'taxpayer_id': '',
'credit': 0
}
]
} or None si no existe
"""
auth = AUTH['RESELLER']
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['USER'],
'reseller_password': auth['PASS'],
'taxpayer_id': rfc,
}
try:
result = client.service.get(**args)
except Fault as e:
self.error = str(e)
return ''
except TransportError as e:
self.error = str(e)
return ''
return result
def assign_client(self, rfc, credit):
"""Agregar credito a un emisor
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del emisor, debe existir
credit (int): Cantidad de folios a agregar
Returns:
dict
'success': True or False,
'credit': nuevo credito despues de agregar or None
'message':
'Success, added {credit} of credit to {RFC}'
'RFC no encontrado'
"""
auth = AUTH['RESELLER']
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['USER'],
'password': auth['PASS'],
'taxpayer_id': rfc,
'credit': credit,
}
try:
result = client.service.assign(**args)
except Fault as e:
self.error = str(e)
return ''
return result
def client_get_timbres(self, rfc):
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': self._auth['USER'],
'reseller_password': self._auth['PASS'],
'taxpayer_id': rfc,
}
try:
self.result = client.service.get(**args)
except Fault as e:
self.error = str(e)
return 0
except TransportError as e:
self.error = str(e)
return 0
except ConnectionError:
self.error = 'Verifica la conexión a internet'
return 0
success = bool(self.result.users)
if not success:
self.error = self.result.message or 'RFC no existe'
return 0
return self.result.users.ResellerUser[0].credit
def _get_data_sat(path):
BF = 'string(//*[local-name()="{}"]/@{})'
NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'}
try:
if os.path.isfile(path):
tree = etree.parse(path).getroot()
else:
tree = etree.fromstring(path.encode())
data = {}
emisor = escape(
tree.xpath('string(//cfdi:Emisor/@rfc)', namespaces=NS_CFDI) or
tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI)
)
receptor = escape(
tree.xpath('string(//cfdi:Receptor/@rfc)', namespaces=NS_CFDI) or
tree.xpath('string(//cfdi:Receptor/@Rfc)', namespaces=NS_CFDI)
)
data['total'] = tree.get('total') or tree.get('Total')
data['emisor'] = emisor
data['receptor'] = receptor
data['uuid'] = tree.xpath(BF.format('TimbreFiscalDigital', 'UUID'))
except Exception as e:
print (e)
return {}
return '?re={emisor}&amp;rr={receptor}&amp;tt={total}&amp;id={uuid}'.format(**data)
def get_status_sat(xml):
data = _get_data_sat(xml)
if not data:
return 'XML inválido'
data = """<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body>
<Consulta xmlns="http://tempuri.org/">
<expresionImpresa>
{}
</expresionImpresa>
</Consulta>
</soap:Body>
</soap:Envelope>""".format(data)
headers = {
'SOAPAction': '"http://tempuri.org/IConsultaCFDIService/Consulta"',
'Content-type': 'text/xml; charset="UTF-8"'
}
URL = 'https://consultaqr.facturaelectronica.sat.gob.mx/consultacfdiservice.svc'
try:
result = requests.post(URL, data=data, headers=headers)
tree = etree.fromstring(result.text)
node = tree.xpath("//*[local-name() = 'Estado']")[0]
except Exception as e:
return 'Error: {}'.format(str(e))
return node.text
def main():
return
if __name__ == '__main__':
main()

View File

@ -0,0 +1,239 @@
#!/usr/bin/env python3
import base64
import datetime
import sys
import time
import unittest
import uuid
import lxml.etree as ET
from io import BytesIO
from pathlib import Path
sys.path.append('..')
from pycert import SATCertificate
from finkok import PACFinkok
NAME = 'finkok'
TEMPLATE_CFDI = """<cfdi:Comprobante xmlns:cfdi="http://www.sat.gob.mx/cfd/3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" LugarExpedicion="06850" Moneda="MXN" SubTotal="10000.00" TipoCambio="1" TipoDeComprobante="I" Total="11600.00" FormaPago="01" MetodoPago="PUE" Version="3.3" xsi:schemaLocation="http://www.sat.gob.mx/cfd/3 http://www.sat.gob.mx/sitio_internet/cfd/3/cfdv33.xsd">
<cfdi:Emisor Rfc="EKU9003173C9" RegimenFiscal="601"/>
<cfdi:Receptor Rfc="BASM740115RW0" UsoCFDI="G01"/>
<cfdi:Conceptos>
<cfdi:Concepto Cantidad="1.0" ClaveProdServ="60121001" ClaveUnidad="ACT" Descripcion="Asesoría en desarrollo" Importe="10000.00" ValorUnitario="10000.00">
<cfdi:Impuestos>
<cfdi:Traslados>
<cfdi:Traslado Base="10000.00" Importe="1600.00" Impuesto="002" TasaOCuota="0.160000" TipoFactor="Tasa"/>
</cfdi:Traslados>
</cfdi:Impuestos>
</cfdi:Concepto>
</cfdi:Conceptos>
<cfdi:Impuestos TotalImpuestosTrasladados="1600.00">
<cfdi:Traslados>
<cfdi:Traslado Importe="1600.00" Impuesto="002" TasaOCuota="0.160000" TipoFactor="Tasa"/>
</cfdi:Traslados>
</cfdi:Impuestos>
</cfdi:Comprobante>
"""
TEMPLATE_CANCEL = """<Cancelacion RfcEmisor="{rfc}" Fecha="{fecha}" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://cancelacfd.sat.gob.mx">
<Folios>
<UUID>{uuid}</UUID>
</Folios>
<Signature xmlns="http://www.w3.org/2000/09/xmldsig#">
<SignedInfo>
<CanonicalizationMethod Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315" />
<SignatureMethod Algorithm="http://www.w3.org/2000/09/xmldsig#rsa-sha1" />
<Reference URI="">
<Transforms>
<Transform Algorithm="http://www.w3.org/2000/09/xmldsig#enveloped-signature" />
</Transforms>
<DigestMethod Algorithm="http://www.w3.org/2000/09/xmldsig#sha1" />
<DigestValue />
</Reference>
</SignedInfo>
<SignatureValue />
<KeyInfo>
<X509Data>
<X509SubjectName />
<X509IssuerSerial />
<X509Certificate />
</X509Data>
<KeyValue />
</KeyInfo>
</Signature>
</Cancelacion>
"""
class TestCfdi(object):
def __init__(self):
self._xml = ''
self._make_cfdi()
@property
def xml(self):
return self._xml.decode()
def _make_cfdi(self):
path = Path(__file__)
path_cer = Path(path.parent).joinpath('certificados', f'{NAME}.cer')
path_key = Path(path.parent).joinpath('certificados', f'{NAME}.enc')
path_xslt = Path(path.parent).joinpath('xslt', 'cadena.xslt')
self._cer_ori = cer = path_cer.read_bytes()
self._key_ori = key = path_key.read_bytes()
self._cert = SATCertificate(cer, key)
self._doc = ET.parse(BytesIO(TEMPLATE_CFDI.encode()))
self._root = self._doc.getroot()
self._root.attrib['Fecha'] = datetime.datetime.now().isoformat()[:19]
self._root.attrib['NoCertificado'] = self._cert.serial_number
self._root.attrib['Certificado'] = self._cert.cer_txt
self._add_stamp(path_xslt)
self._xml = ET.tostring(self._root,
pretty_print=True, doctype='<?xml version="1.0" encoding="utf-8"?>')
return
def _add_stamp(self, path_xslt):
xslt = open(path_xslt, 'rb')
transfor = ET.XSLT(ET.parse(xslt))
cadena = str(transfor(self._doc)).encode()
stamp = self._cert.sign(cadena)
self._root.attrib['Sello'] = stamp
xslt.close()
return
def sign_xml(self, template):
tree = ET.fromstring(template.encode())
tree = self._cert.sign_xml(tree)
xml = ET.tostring(tree).decode()
return xml
@property
def cert(self):
cer = base64.b64encode(self._cer_ori).decode()
key = base64.b64encode(self._key_ori).decode()
return key, cer
class TestStamp(unittest.TestCase):
def setUp(self):
print(f'In method: {self._testMethodName}')
self.pac = PACFinkok()
def test_cfdi_stamp(self):
cfdi = TestCfdi().xml
result = self.pac.stamp(cfdi)
cfdi_uuid = result['uuid']
self.assertFalse(bool(self.pac.error))
self.assertTrue(bool(uuid.UUID(cfdi_uuid)))
def test_cfdi_cancel(self):
expected = '201'
cfdi = TestCfdi()
result = self.pac.stamp(cfdi.xml)
cfdi_uuid = self.pac.cfdi_uuid
self.assertFalse(bool(self.pac.error))
self.assertTrue(bool(uuid.UUID(cfdi_uuid)))
time.sleep(1)
cert = cfdi.cert
info = {
'key': cert[0],
'cer': cert[1],
'pass': '12345678a',
'tipo': 'cfdi3.3',
}
result = self.pac.cancel(result, info)
self.assertFalse(bool(self.pac.error))
tree = ET.fromstring(result)
cancel_uuid = tree.xpath('string(//Acuse/Folios/UUID)')
status = tree.xpath('string(//Acuse/Folios/EstatusUUID)')
self.assertEqual(cfdi_uuid, cancel_uuid)
self.assertEqual(status, expected)
def test_cfdi_cancel_xml(self):
expected = '201'
cfdi = TestCfdi()
result = self.pac.stamp(cfdi.xml)
cfdi_uuid = self.pac.cfdi_uuid
self.assertFalse(bool(self.pac.error))
self.assertTrue(bool(uuid.UUID(cfdi_uuid)))
NS_CFDI = {
'cfdi': 'http://www.sat.gob.mx/cfd/3',
'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
}
tree = ET.fromstring(result.encode())
rfc_emisor = tree.xpath(
'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)',
namespaces=NS_CFDI)
time.sleep(1)
data = {
'rfc': rfc_emisor,
'fecha': datetime.datetime.now().isoformat()[:19],
'uuid': cfdi_uuid,
}
template = TEMPLATE_CANCEL.format(**data)
sign_xml = cfdi.sign_xml(template)
info = {
'tipo': 'cfdi3.3',
}
result = self.pac.cancel_xml(result, sign_xml, info)
tree = ET.fromstring(result)
uid = tree.xpath('string(//Acuse/Folios/UUID)')
status = tree.xpath('string(//Acuse/Folios/EstatusUUID)')
self.assertEqual(cfdi_uuid, uid)
self.assertEqual(status, expected)
def test_cfdi_status(self):
expected = ''
cfdi = TestCfdi()
result = self.pac.stamp(cfdi.xml)
cfdi_uuid = self.pac.cfdi_uuid
self.assertFalse(bool(self.pac.error))
self.assertTrue(bool(uuid.UUID(cfdi_uuid)))
NS_CFDI = {
'cfdi': 'http://www.sat.gob.mx/cfd/3',
'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
}
tree = ET.fromstring(result.encode())
rfc_emisor = tree.xpath(
'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)',
namespaces=NS_CFDI)
rfc_receptor = tree.xpath(
'string(//cfdi:Comprobante/cfdi:Receptor/@Rfc)',
namespaces=NS_CFDI)
total = tree.xpath(
'string(//cfdi:Comprobante/@Total)',
namespaces=NS_CFDI)
time.sleep(3)
data = {
'rfc_receptor': rfc_receptor,
'rfc_emisor': rfc_emisor,
'total': total,
'uuid': cfdi_uuid,
}
result = self.pac.status(data)
self.assertEqual(result, expected)
if __name__ == '__main__':
unittest.main()