Add pac Finkok refactory

This commit is contained in:
Mauricio Baeza 2021-01-02 18:16:15 -06:00
parent cf189b08fa
commit 348a7f6ecb
12 changed files with 627 additions and 859 deletions

View File

@ -1,13 +0,0 @@
#!/usr/bin/env python3
DEBUG = False
#~ Ecodex
ID_INTEGRADOR = ''
#~ Finkok
FINKOK= {
'USER': '',
'PASS': '',
}

View File

@ -1,62 +0,0 @@
#!/usr/bin/env python3
from .conf import DEBUG, FINKOK
DEBUG = DEBUG
TIMEOUT = 10
#~ PACs que han proporcionado un entorno de pruebas libre y abierto
#~ ecodex, finkok
PAC = 'finkok'
#~ IMPORTANTE: Si quieres hacer pruebas, con tu propio correo de usuario y
#~ contraseña, ponte en contacto con Finkok para que te asignen tus datos de
#~ acceso, consulta su documentación para ver las diferentes opciones de acceso.
#~ Si solo estas haciendo pruebas de timbrado y ancelación, con estos datos debería
#~ ser suficiente.
def finkok(debug):
USER = FINKOK['USER']
PASS = FINKOK['PASS']
TOKEN = ''
auth = {
'DEBUG': debug,
'USER': '',
'PASS': TOKEN or PASS,
'RESELLER': {'USER': USER, 'PASS': PASS}
}
if debug:
USER = 'pruebas-finkok@correolibre.net'
PASS = ''
TOKEN = '5c9a88da105bff9a8c430cb713f6d35269f51674bdc5963c1501b7316366'
auth = {
'DEBUG': debug,
'USER': USER,
'PASS': TOKEN or PASS,
'RESELLER': {
'USER': '',
'PASS': ''
}
}
base_url = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl'
if debug:
base_url = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl'
url = {
'timbra': base_url.format('stamp'),
'quick_stamp': False,
'cancel': base_url.format('cancel'),
'client': base_url.format('registration'),
'util': base_url.format('utilities'),
'codes': {
'200': 'Comprobante timbrado satisfactoriamente',
'307': 'Comprobante timbrado previamente',
'205': 'No Encontrado',
}
}
return auth, url
AUTH, URL = globals()[PAC](DEBUG)

View File

@ -1,755 +0,0 @@
#!/usr/bin/env python3
#~ import re
#~ from xml.etree import ElementTree as ET
#~ from requests import Request, Session, exceptions
import datetime
import hashlib
import os
import requests
import time
from lxml import etree
from xml.dom.minidom import parseString
from xml.sax.saxutils import escape, unescape
from uuid import UUID
from logbook import Logger
from zeep import Client
from zeep.plugins import HistoryPlugin
from zeep.cache import SqliteCache
from zeep.transports import Transport
from zeep.exceptions import Fault, TransportError
from requests.exceptions import ConnectionError
if __name__ == '__main__':
from configpac import DEBUG, TIMEOUT, AUTH, URL
else:
from .configpac import DEBUG, TIMEOUT, AUTH, URL
log = Logger('PAC')
#~ node = client.create_message(client.service, SERVICE, **args)
#~ print(etree.tostring(node, pretty_print=True).decode())
class Ecodex(object):
def __init__(self, auth, url):
self.auth = auth
self.url = url
self.codes = self.url['codes']
self.error = ''
self.message = ''
self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
self._plugins = None
self._history = None
if DEBUG:
self._history = HistoryPlugin()
self._plugins = [self._history]
def _get_token(self, rfc):
client = Client(self.url['seguridad'],
transport=self._transport, plugins=self._plugins)
try:
result = client.service.ObtenerToken(rfc, self._get_epoch())
except Fault as e:
self.error = str(e)
log.error(self.error)
return ''
s = '{}|{}'.format(self.auth['ID'], result.Token)
return hashlib.sha1(s.encode()).hexdigest()
def _get_token_rest(self, rfc):
data = {
'rfc': rfc,
'grant_type': 'authorization_token',
}
headers = {'Content-type': 'application/x-www-form-urlencoded'}
result = requests.post(URL['token'], data=data, headers=headers)
data = result.json()
s = '{}|{}'.format(AUTH['ID'], data['service_token'])
return hashlib.sha1(s.encode()).hexdigest(), data['access_token']
def _validate_xml(self, xml):
NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'}
if os.path.isfile(xml):
tree = etree.parse(xml).getroot()
else:
tree = etree.fromstring(xml.encode())
fecha = tree.get('Fecha')
rfc = tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI)
data = {
'ComprobanteXML': etree.tostring(tree).decode(),
'RFC': rfc,
'Token': self._get_token(rfc),
'TransaccionID': self._get_epoch(fecha),
}
return data
def _get_by_hash(self, sh, rfc):
token, access_token = self._get_token_rest(rfc)
url = URL['hash'].format(sh)
headers = {
'Authorization': 'Bearer {}'.format(access_token),
'X-Auth-Token': token,
}
result = requests.get(url, headers=headers)
if result.status_code == 200:
print (result.json())
return
def timbra_xml(self, xml):
data = self._validate_xml(xml)
client = Client(self.url['timbra'],
transport=self._transport, plugins=self._plugins)
try:
result = client.service.TimbraXML(**data)
except Fault as e:
error = str(e)
if self.codes['HASH'] in error:
sh = error.split(' ')[3]
return self._get_by_hash(sh[:40], data['RFC'])
self.error = error
return ''
tree = parseString(result.ComprobanteXML.DatosXML)
xml = tree.toprettyxml(encoding='utf-8').decode('utf-8')
return xml
def _get_epoch(self, date=None):
if isinstance(date, str):
f = '%Y-%m-%dT%H:%M:%S'
e = int(time.mktime(time.strptime(date, f)))
else:
date = datetime.datetime.now()
e = int(time.mktime(date.timetuple()))
return e
def estatus_cuenta(self, rfc):
#~ Codigos:
#~ 100 = Cuenta encontrada
#~ 101 = RFC no dado de alta en el sistema ECODEX
token = self._get_token(rfc)
if not token:
return {}
data = {
'RFC': rfc,
'Token': token,
'TransaccionID': self._get_epoch()
}
client = Client(URL['clients'],
transport=self._transport, plugins=self._plugins)
try:
result = client.service.EstatusCuenta(**data)
except Fault as e:
log.error(str(e))
return
#~ print (result)
return result.Estatus
class Finkok(object):
def __init__(self, auth={}):
self.codes = URL['codes']
self.error = ''
self.message = ''
self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
self._plugins = None
self._history = None
self.uuid = ''
self.fecha = None
if DEBUG:
self._history = HistoryPlugin()
self._plugins = [self._history]
self._auth = AUTH
else:
self._auth = auth
def _debug(self):
if not DEBUG:
return
print('SEND', self._history.last_sent)
print('RESULT', self._history.last_received)
return
def _check_result(self, method, result):
# ~ print ('CODE', result.CodEstatus)
# ~ print ('INCIDENCIAS', result.Incidencias)
self.message = ''
MSG = {
'OK': 'Comprobante timbrado satisfactoriamente',
'307': 'Comprobante timbrado previamente',
}
status = result.CodEstatus
if status is None and result.Incidencias:
for i in result.Incidencias['Incidencia']:
self.error += 'Error: {}\n{}\n{}'.format(
i['CodigoError'], i['MensajeIncidencia'], i['ExtraInfo'])
return ''
if method == 'timbra' and status in (MSG['OK'], MSG['307']):
#~ print ('UUID', result.UUID)
#~ print ('FECHA', result.Fecha)
if status == MSG['307']:
self.message = MSG['307']
tree = parseString(result.xml)
response = tree.toprettyxml(encoding='utf-8').decode('utf-8')
self.uuid = result.UUID
self.fecha = result.Fecha
return response
def _load_file(self, path):
try:
with open(path, 'rb') as f:
data = f.read()
except Exception as e:
self.error = str(e)
return
return data
def _validate_xml(self, file_xml):
if os.path.isfile(file_xml):
try:
with open(file_xml, 'rb') as f:
xml = f.read()
except Exception as e:
self.error = str(e)
return False, ''
else:
xml = file_xml.encode('utf-8')
return True, xml
def _validate_uuid(self, uuid):
try:
UUID(uuid)
return True
except ValueError:
self.error = 'UUID no válido: {}'.format(uuid)
return False
def timbra_xml(self, file_xml):
self.error = ''
if not DEBUG and not self._auth:
self.error = 'Sin datos para timbrar'
return
method = 'timbra'
ok, xml = self._validate_xml(file_xml)
if not ok:
return ''
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'xml': xml,
}
if URL['quick_stamp']:
try:
result = client.service.quick_stamp(**args)
except Fault as e:
self.error = str(e)
return
else:
try:
result = client.service.stamp(**args)
except Fault as e:
self.error = str(e)
return
except TransportError as e:
if '413' in str(e):
self.error = '413<BR><BR><b>Documento muy grande para timbrar</b>'
else:
self.error = str(e)
return
except ConnectionError as e:
msg = '502 - Error de conexión'
self.error = msg
return
return self._check_result(method, result)
def _get_xml(self, uuid):
if not self._validate_uuid(uuid):
return ''
method = 'util'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'uuid': uuid,
'taxpayer_id': self.rfc,
'invoice_type': 'I',
}
try:
result = client.service.get_xml(**args)
except Fault as e:
self.error = str(e)
return ''
except TransportError as e:
self.error = str(e)
return ''
if result.error:
self.error = result.error
return ''
tree = parseString(result.xml)
xml = tree.toprettyxml(encoding='utf-8').decode('utf-8')
return xml
def recupera_xml(self, file_xml='', uuid=''):
self.error = ''
if uuid:
return self._get_xml(uuid)
method = 'timbra'
ok, xml = self._validate_xml(file_xml)
if not ok:
return ''
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
try:
result = client.service.stamped(
xml, self._auth['user'], self._auth['pass'])
except Fault as e:
self.error = str(e)
return ''
return self._check_result(method, result)
def estatus_xml(self, uuid):
method = 'timbra'
if not self._validate_uuid(uuid):
return ''
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
try:
result = client.service.query_pending(
self._auth['USER'], self._auth['PASS'], uuid)
return result.status
except Fault as e:
self.error = str(e)
return ''
def cancel_xml(self, rfc, uuid, cer, key):
# ~ for u in uuids:
# ~ if not self._validate_uuid(u):
# ~ return ''
method = 'cancel'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
uuid_type = client.get_type('ns1:UUIDS')
sa = client.get_type('ns0:stringArray')
args = {
'UUIDS': uuid_type(uuids=sa(string=uuid)),
'username': self._auth['USER'],
'password': self._auth['PASS'],
'taxpayer_id': rfc,
'cer': cer,
'key': key,
'store_pending': False,
}
try:
result = client.service.cancel(**args)
except Fault as e:
self.error = str(e)
return ''
if result.CodEstatus and self.codes['205'] in result.CodEstatus:
self.error = result.CodEstatus
return ''
return result
def cancel_signature(self, file_xml):
method = 'cancel'
if os.path.isfile(file_xml):
root = etree.parse(file_xml).getroot()
else:
root = etree.fromstring(file_xml.encode())
xml = etree.tostring(root)
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'xml': xml,
'store_pending': False,
}
try:
result = client.service.cancel_signature(**args)
return result
except Fault as e:
self.error = str(e)
return ''
def get_acuse(self, rfc, uuids, type_acuse='C'):
for u in uuids:
if not self._validate_uuid(u):
return ''
method = 'cancel'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'taxpayer_id': rfc,
'uuid': '',
'type': type_acuse,
}
try:
result = []
for u in uuids:
args['uuid'] = u
r = client.service.get_receipt(**args)
result.append(r)
except Fault as e:
self.error = str(e)
return ''
return result
def estatus_cancel(self, uuids):
for u in uuids:
if not self._validate_uuid(u):
return ''
method = 'cancel'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': self._auth['USER'],
'password': self._auth['PASS'],
'uuid': '',
}
try:
result = []
for u in uuids:
args['uuid'] = u
r = client.service.query_pending_cancellation(**args)
result.append(r)
except Fault as e:
self.error = str(e)
return ''
return result
def add_token(self, rfc, email):
"""Agrega un nuevo token al cliente para timbrado.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del cliente, ya debe existir
email (str): El correo del cliente, funciona como USER al timbrar
Returns:
dict
'username': 'username',
'status': True or False
'name': 'name',
'success': True or False
'token': 'Token de timbrado',
'message': None
"""
auth = AUTH['RESELLER']
method = 'util'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['USER'],
'password': auth['PASS'],
'name': rfc,
'token_username': email,
'taxpayer_id': rfc,
'status': True,
}
try:
result = client.service.add_token(**args)
except Fault as e:
self.error = str(e)
return ''
return result
def get_date(self):
method = 'util'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
try:
result = client.service.datetime(AUTH['USER'], AUTH['PASS'])
except Fault as e:
self.error = str(e)
return ''
if result.error:
self.error = result.error
return
return result.datetime
def add_client(self, rfc, type_user=False):
"""Agrega un nuevo cliente para timbrado.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del nuevo cliente
Kwargs:
type_user (bool): False == 'P' == Prepago or True == 'O' == On demand
Returns:
dict
'message':
'Account Created successfully'
'Account Already exists'
'success': True or False
"""
auth = AUTH['RESELLER']
tu = {False: 'P', True: 'O'}
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['USER'],
'reseller_password': auth['PASS'],
'taxpayer_id': rfc,
'type_user': tu[type_user],
'added': datetime.datetime.now().isoformat()[:19],
}
try:
result = client.service.add(**args)
except Fault as e:
self.error = str(e)
return ''
return result
def edit_client(self, rfc, status=True):
"""
Se requiere cuenta de reseller para usar este método
status = 'A' or 'S'
"""
auth = AUTH['RESELLER']
sv = {False: 'S', True: 'A'}
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['USER'],
'reseller_password': auth['PASS'],
'taxpayer_id': rfc,
'status': sv[status],
}
try:
result = client.service.edit(**args)
except Fault as e:
self.error = str(e)
return ''
return result
def get_client(self, rfc):
"""Regresa el estatus del cliente
.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del emisor
Returns:
dict
'message': None,
'users': {
'ResellerUser': [
{
'status': 'A',
'counter': 0,
'taxpayer_id': '',
'credit': 0
}
]
} or None si no existe
"""
auth = AUTH['RESELLER']
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['USER'],
'reseller_password': auth['PASS'],
'taxpayer_id': rfc,
}
try:
result = client.service.get(**args)
except Fault as e:
self.error = str(e)
return ''
except TransportError as e:
self.error = str(e)
return ''
return result
def assign_client(self, rfc, credit):
"""Agregar credito a un emisor
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del emisor, debe existir
credit (int): Cantidad de folios a agregar
Returns:
dict
'success': True or False,
'credit': nuevo credito despues de agregar or None
'message':
'Success, added {credit} of credit to {RFC}'
'RFC no encontrado'
"""
auth = AUTH['RESELLER']
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['USER'],
'password': auth['PASS'],
'taxpayer_id': rfc,
'credit': credit,
}
try:
result = client.service.assign(**args)
except Fault as e:
self.error = str(e)
return ''
return result
def client_get_timbres(self, rfc):
method = 'client'
client = Client(
URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': self._auth['USER'],
'reseller_password': self._auth['PASS'],
'taxpayer_id': rfc,
}
try:
self.result = client.service.get(**args)
except Fault as e:
self.error = str(e)
return 0
except TransportError as e:
self.error = str(e)
return 0
except ConnectionError:
self.error = 'Verifica la conexión a internet'
return 0
success = bool(self.result.users)
if not success:
self.error = self.result.message or 'RFC no existe'
return 0
return self.result.users.ResellerUser[0].credit
def _get_data_sat(path):
BF = 'string(//*[local-name()="{}"]/@{})'
NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'}
try:
if os.path.isfile(path):
tree = etree.parse(path).getroot()
else:
tree = etree.fromstring(path.encode())
data = {}
emisor = escape(
tree.xpath('string(//cfdi:Emisor/@rfc)', namespaces=NS_CFDI) or
tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI)
)
receptor = escape(
tree.xpath('string(//cfdi:Receptor/@rfc)', namespaces=NS_CFDI) or
tree.xpath('string(//cfdi:Receptor/@Rfc)', namespaces=NS_CFDI)
)
data['total'] = tree.get('total') or tree.get('Total')
data['emisor'] = emisor
data['receptor'] = receptor
data['uuid'] = tree.xpath(BF.format('TimbreFiscalDigital', 'UUID'))
except Exception as e:
print (e)
return {}
return '?re={emisor}&amp;rr={receptor}&amp;tt={total}&amp;id={uuid}'.format(**data)
def get_status_sat(xml):
data = _get_data_sat(xml)
if not data:
return 'XML inválido'
data = """<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body>
<Consulta xmlns="http://tempuri.org/">
<expresionImpresa>
{}
</expresionImpresa>
</Consulta>
</soap:Body>
</soap:Envelope>""".format(data)
headers = {
'SOAPAction': '"http://tempuri.org/IConsultaCFDIService/Consulta"',
'Content-type': 'text/xml; charset="UTF-8"'
}
URL = 'https://consultaqr.facturaelectronica.sat.gob.mx/consultacfdiservice.svc'
try:
result = requests.post(URL, data=data, headers=headers)
tree = etree.fromstring(result.text)
node = tree.xpath("//*[local-name() = 'Estado']")[0]
except Exception as e:
return 'Error: {}'.format(str(e))
return node.text
def main():
return
if __name__ == '__main__':
main()

View File

@ -1,3 +1,4 @@
#!/usr/bin/env python
from .comerciodigital import PACComercioDigital
from .finkok import PACFinkok

View File

@ -15,12 +15,7 @@ from cryptography.x509.oid import ExtensionOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
try:
from .conf import TOKEN
except ImportError:
TOKEN = ''
print('Agrega el TOKEN al archivo conf.py, obligatorio en v1.41.0')
from .conf import TOKEN
class SATCertificate(object):

View File

@ -0,0 +1,6 @@
#!/usr/bin/env python3
DEBUG = False
TOKEN = ''

View File

@ -0,0 +1,3 @@
#!/usr/bin/env python3
from .finkok import PACFinkok

View File

@ -0,0 +1,46 @@
#!/usr/bin/env python
# ~
# ~ PAC
# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
# ~ Siempre consulta la documentación de PAC
# ~ AUTH = Las credenciales de timbrado proporcionadas por el PAC
# ~ NO cambies las credenciales de prueba
DEBUG = True
AUTH = {
'user': '',
'pass': '',
'RESELLER': {
'user': '',
'pass': ''
}
}
if DEBUG:
AUTH = {
'user': 'pruebas-finkok@correolibre.net',
'pass': '5c9a88da105bff9a8c430cb713f6d35269f51674bdc5963c1501b7316366',
'RESELLER': {
'user': '',
'pass': ''
}
}

View File

@ -0,0 +1,549 @@
#!/usr/bin/env python
# ~
# ~ PAC
# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import datetime
import logging
import os
import re
from io import BytesIO
from xml.sax.saxutils import unescape
import lxml.etree as ET
from zeep import Client
from zeep.plugins import Plugin
from zeep.cache import SqliteCache
from zeep.transports import Transport
from zeep.exceptions import Fault, TransportError
from requests.exceptions import ConnectionError
from .conf import DEBUG, AUTH
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
logging.getLogger('requests').setLevel(logging.ERROR)
logging.getLogger('zeep').setLevel(logging.ERROR)
TIMEOUT = 10
DEBUG_SOAP = True
class DebugPlugin(Plugin):
def _to_string(self, envelope, name):
if DEBUG_SOAP:
data = ET.tostring(envelope, pretty_print=True, encoding='utf-8').decode()
path = f'/tmp/soap_{name}.xml'
with open(path, 'w') as f:
f.write(data)
return
def egress(self, envelope, http_headers, operation, binding_options):
self._to_string(envelope, 'request')
return envelope, http_headers
def ingress(self, envelope, http_headers, operation):
self._to_string(envelope, 'response')
return envelope, http_headers
class PACFinkok(object):
WS = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl'
if DEBUG:
WS = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl'
URL = {
'quick_stamp': False,
'timbra': WS.format('stamp'),
'cancel': WS.format('cancel'),
'client': WS.format('registration'),
'util': WS.format('utilities'),
}
CODE = {
'200': 'Comprobante timbrado satisfactoriamente',
'205': 'No Encontrado',
'307': 'Comprobante timbrado previamente',
'702': 'No se encontro el RFC del emisor',
'IP': 'Invalid Passphrase',
'IPMSG': 'Frase de paso inválida',
'NE': 'No Encontrado',
}
def __init__(self):
self._error = ''
self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
self._plugins = [DebugPlugin()]
@property
def error(self):
return self._error
def _validate_result(self, result):
if hasattr(result, 'CodEstatus'):
ce = result.CodEstatus
if ce is None:
return result
if ce == self.CODE['IP']:
self._error = self.CODE['IPMSG']
return {}
if self.CODE['NE'] in ce:
self._error = 'UUID ' + self.CODE['NE']
return {}
if self.CODE['200'] != ce:
log.error('CodEstatus', type(ce), ce)
return result
if hasattr(result, 'Incidencias'):
fault = result.Incidencias.Incidencia[0]
cod_error = fault.CodigoError.encode('utf-8')
msg_error = fault.MensajeIncidencia.encode('utf-8')
error = 'Error: {}\n{}'.format(cod_error, msg_error)
self._error = self.CODE.get(cod_error, error)
return {}
return result
def _get_result(self, client, method, args):
self._error = ''
try:
result = getattr(client.service, method)(**args)
except Fault as e:
self._error = str(e)
return {}
except TransportError as e:
if '413' in str(e):
self._error = '413<BR><BR><b>Documento muy grande para timbrar</b>'
else:
self._error = str(e)
return {}
except ConnectionError as e:
msg = '502 - Error de conexión'
self._error = msg
return {}
return self._validate_result(result)
def _to_string(self, data):
root = ET.parse(BytesIO(data.encode('utf-8'))).getroot()
xml = ET.tostring(root,
pretty_print=True, xml_declaration=True, encoding='utf-8')
return xml.decode('utf-8')
def stamp(self, cfdi, auth={}):
if DEBUG or not auth:
auth = AUTH
method = 'timbra'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
args = {
'username': auth['user'],
'password': auth['pass'],
'xml': cfdi.encode('utf-8'),
}
result = self._get_result(client, 'stamp', args)
if self.error:
log.error(self.error)
return ''
data = {
'xml': self._to_string(result.xml),
'uuid': result.UUID,
'date': result.Fecha,
}
return data
def _get_data_cancel(self, cfdi):
NS_CFDI = {
'cfdi': 'http://www.sat.gob.mx/cfd/3',
'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
}
tree = ET.fromstring(cfdi.encode())
rfc_emisor = tree.xpath(
'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)',
namespaces=NS_CFDI)
cfdi_uuid = tree.xpath(
'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)',
namespaces=NS_CFDI)
return rfc_emisor, cfdi_uuid
def cancel(self, cfdi, info, auth={}):
if not auth:
auth = AUTH
rfc_emisor, cfdi_uuid = self._get_data_cancel(cfdi)
method = 'cancel'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
uuid_type = client.get_type('ns1:UUIDS')
sa = client.get_type('ns0:stringArray')
args = {
'UUIDS': uuid_type(uuids=sa(string=cfdi_uuid)),
'username': auth['user'],
'password': auth['pass'],
'taxpayer_id': rfc_emisor,
'cer': info['cer'],
'key': info['key'],
'store_pending': False,
}
result = self._get_result(client, 'cancel', args)
if self.error:
log.error(self.error)
return ''
folio = result['Folios']['Folio'][0]
status = folio['EstatusUUID']
if status != '201':
log.debug(f'Cancel status: {status} - {cfdi_uuid}')
data = {
'acuse': result['Acuse'],
'date': result['Fecha'],
}
return data
def cancel_xml(self, xml, auth={}):
if not auth:
auth = AUTH
method = 'cancel'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
client.set_ns_prefix('can', 'http://facturacion.finkok.com/cancel')
# ~ xml = f'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\n{xml}'
# ~ xml = f'<?xml version="1.0" encoding="UTF-8" standalone="true"?>\n{xml}'
args = {
'xml': xml.encode(),
'username': auth['user'],
'password': auth['pass'],
'store_pending': False,
}
result = self._get_result(client, 'cancel_signature', args)
if self.error:
log.error(self.error)
return ''
folio = result['Folios']['Folio'][0]
status = folio['EstatusUUID']
if status != '201':
log.debug(f'Cancel status: {status} -')
data = {
'acuse': result['Acuse'],
'date': result['Fecha'],
}
return data
def client_add(self, rfc, type_user=False):
"""Agrega un nuevo cliente para timbrado.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del nuevo cliente
Kwargs:
type_user (bool):
False == 'P' == Prepago
True == 'O' == On demand
Returns:
True or False
origin PAC
'message':
'Account Created successfully'
'Account Already exists'
'success': True or False
"""
auth = AUTH['RESELLER']
tu = {True: 'O', False: 'P'}
method = 'client'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['user'],
'reseller_password': auth['pass'],
'taxpayer_id': rfc,
'type_user': tu[type_user],
'added': datetime.datetime.now().isoformat()[:19],
}
result = self._get_result(client, 'add', args)
if self.error:
return False
if not result.success:
self.error = result.message
return False
# ~ PAC success debería ser False
msg = 'Account Already exists'
if result.message == msg:
self.error = msg
return True
return result.success
def client_get_token(self, rfc, email):
"""Genera un nuevo token al cliente para timbrado.
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del cliente, ya debe existir
email (str): El correo del cliente, funciona como USER al timbrar
Returns:
token (str): Es la contraseña para timbrar
origin PAC
dict
'username': 'username',
'status': True or False
'name': 'name',
'success': True or False
'token': 'Token de timbrado',
'message': None
"""
auth = AUTH['RESELLER']
method = 'util'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['user'],
'password': auth['pass'],
'name': rfc,
'token_username': email,
'taxpayer_id': rfc,
'status': True,
}
result = self._get_result(client, 'add_token', args)
if self.error:
log.error(self.error)
return ''
if not result.success:
self.error = result.message
log.error(self.error)
return ''
return result.token
def client_add_timbres(self, rfc, credit):
"""Agregar credito a un emisor
Se requiere cuenta de reseller
Args:
rfc (str): El RFC del emisor, debe existir
credit (int): Cantidad de folios a agregar
Returns:
dict
'success': True or False,
'credit': nuevo credito despues de agregar or None
'message':
'Success, added {credit} of credit to {RFC}.'
'RFC no encontrado'
"""
auth = AUTH['RESELLER']
method = 'client'
client = Client(
self.URL[method], transport=self._transport, plugins=self._plugins)
args = {
'username': auth['user'],
'password': auth['pass'],
'taxpayer_id': rfc,
'credit': credit,
}
result = self._get_result(client, 'assign', args)
if self.error:
log.error(error)
return ''
if not result.success:
self.error = result.message
return 0
return result.credit
def client_balance(self, auth={}, rfc=''):
"""Regresa los timbres restantes del cliente
Se pueden usar las credenciales de relleser o las credenciales del emisor
Args:
rfc (str): El RFC del emisor
Kwargs:
auth (dict): Credenciales del emisor
Returns:
int Cantidad de timbres restantes
"""
if not auth:
auth = AUTH['RESELLER']
method = 'client'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['user'],
'reseller_password': auth['pass'],
'taxpayer_id': rfc,
}
result = self._get_result(client, 'get', args)
if self.error:
log.error(self.error)
return ''
success = bool(result.users)
if not success:
self.error = result.message or 'RFC no existe'
return 0
return result.users.ResellerUser[0].credit
def client_set_status(self, rfc, status):
"""Edita el estatus (Activo o Suspendido) de un cliente
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del cliente
Kwargs:
status (bool):
True == 'A' == Activo
False == 'S' == Suspendido
Returns:
dict
'message':
'Account Created successfully'
'Account Already exists'
'success': True or False
"""
auth = AUTH['RESELLER']
ts = {True: 'A', False: 'S'}
method = 'client'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
args = {
'reseller_username': auth['user'],
'reseller_password': auth['pass'],
'taxpayer_id': rfc,
'status': ts[status],
}
result = self._get_result(client, 'edit', args)
if self.error:
return False
if not result.success:
self.error = result.message
return False
return True
def client_switch(self, rfc, type_user):
"""Edita el tipo de timbrado (OnDemand o Prepago) de un cliente
Se requiere cuenta de reseller para usar este método
Args:
rfc (str): El RFC del cliente
Kwargs:
status (bool):
True == 'O' == OnDemand
False == 'P' == Prepago
Returns:
dict
'message':
'Account Created successfully'
'Account Already exists'
'success': True or False
"""
auth = AUTH['RESELLER']
tu = {True: 'O', False: 'P'}
method = 'client'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
args = {
'username': auth['user'],
'password': auth['pass'],
'taxpayer_id': rfc,
'type_user': tu[type_user],
}
result = self._get_result(client, 'switch', args)
if self.error:
return False
if not result.success:
self.error = result.message
return False
return True
def client_report_folios(self, rfc, date_from, date_to, invoice_type='I'):
"""Obtiene un reporte del total de facturas timbradas
"""
auth = AUTH['RESELLER']
args = {
'username': auth['user'],
'password': auth['pass'],
'taxpayer_id': rfc,
'date_from': date_from,
'date_to': date_to,
'invoice_type': invoice_type,
}
method = 'util'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
result = self._get_result(client, 'report_total', args)
if result.result is None:
# ~ PAC - Debería regresar RFC inexistente o sin registros
self.error = 'RFC no existe o no tiene registros'
return 0
total = result.result.ReportTotal[0].total
return total

View File

@ -73,7 +73,7 @@ from settings import USAR_TOKEN, API, DECIMALES_TAX
# ~ v2
from .cfdi_cert import SATCertificate
from .pacs.cfdi_cert import SATCertificate
from settings import (
EXT,

View File

@ -52,11 +52,9 @@ from .cfdi_xml import CFDI
from settings import DEBUG, DB_COMPANIES, PATHS, TEMPLATE_CANCEL
from .cfdi_cert import SATCertificate
from .pacs.cfdi_cert import SATCertificate
from .pacs import PACComercioDigital
# ~ from .pacs import PACFinkok
from .pac import Finkok as PACFinkok
# ~ from .finkok import PACFinkok
from .pacs import PACFinkok
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

View File

@ -3917,24 +3917,24 @@ class Facturas(BaseModel):
query.execute()
return
def _cancel_signature(self, id):
msg = 'Factura cancelada correctamente'
auth = Emisor.get_auth()
certificado = Certificado.select()[0]
obj = Facturas.get(Facturas.id==id)
data, result = util.cancel_signature(
obj.uuid, certificado.p12, certificado.rfc, auth)
if data['ok']:
obj.estatus = 'Cancelada'
obj.error = ''
obj.cancelada = True
obj.fecha_cancelacion = result['Fecha']
obj.acuse = result['Acuse']
self._actualizar_saldo_cliente(self, obj, True)
else:
obj.error = data['msg']
obj.save()
return data
# ~ def _cancel_signature(self, id):
# ~ msg = 'Factura cancelada correctamente'
# ~ auth = Emisor.get_auth()
# ~ certificado = Certificado.select()[0]
# ~ obj = Facturas.get(Facturas.id==id)
# ~ data, result = util.cancel_signature(
# ~ obj.uuid, certificado.p12, certificado.rfc, auth)
# ~ if data['ok']:
# ~ obj.estatus = 'Cancelada'
# ~ obj.error = ''
# ~ obj.cancelada = True
# ~ obj.fecha_cancelacion = result['Fecha']
# ~ obj.acuse = result['Acuse']
# ~ self._actualizar_saldo_cliente(self, obj, True)
# ~ else:
# ~ obj.error = data['msg']
# ~ obj.save()
# ~ return data
def _get_filters(self, values):
if 'start' in values and 'end' in values: