cfdi-pac/source/finkok/finkok.py

560 lines
17 KiB
Python

#!/usr/bin/env python
# ~
# ~ PAC
# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import datetime
import logging
import os
import re
from io import BytesIO
from xml.sax.saxutils import unescape
import lxml.etree as ET
from zeep import Client
from zeep.plugins import Plugin
from zeep.cache import SqliteCache
from zeep.transports import Transport
from zeep.exceptions import Fault, TransportError
from requests.exceptions import ConnectionError
from .conf import DEBUG, AUTH
# Layout of each log record and of its timestamp.
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'

# Swap the stock level names for ANSI-coloured labels.
for _level, _label in (
        (logging.ERROR, '\033[1;41mERROR\033[1;0m'),
        (logging.DEBUG, '\x1b[33mDEBUG\033[1;0m'),
        (logging.INFO, '\x1b[32mINFO\033[1;0m'),
):
    logging.addLevelName(_level, _label)
del _level, _label

logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)

# Only ERROR and above get through from the HTTP and SOAP library loggers.
for _noisy in ('requests', 'zeep'):
    logging.getLogger(_noisy).setLevel(logging.ERROR)
del _noisy

# Timeout value handed to the zeep transport.
TIMEOUT = 10
# When True, DebugPlugin writes every SOAP envelope to a file under /tmp.
DEBUG_SOAP = False
class DebugPlugin(Plugin):
    """zeep plugin that can dump SOAP envelopes to disk for inspection.

    Nothing is written unless the module-level DEBUG_SOAP flag is true.
    """

    def _to_string(self, envelope, name):
        """Write *envelope*, pretty-printed, to ``/tmp/soap_<name>.xml``.

        Args:
            envelope: XML element accepted by ``lxml.etree.tostring``.
            name (str): Suffix of the dump file name.
        """
        if not DEBUG_SOAP:
            return
        data = ET.tostring(envelope, pretty_print=True, encoding='utf-8')
        path = f'/tmp/soap_{name}.xml'
        # Write the UTF-8 bytes as they are: a text-mode file without an
        # explicit encoding would re-encode them with the locale's codec.
        with open(path, 'wb') as f:
            f.write(data)

    def egress(self, envelope, http_headers, operation, binding_options):
        """Dump the outgoing envelope and pass it through unchanged."""
        self._to_string(envelope, 'request')
        return envelope, http_headers

    def ingress(self, envelope, http_headers, operation):
        """Dump the incoming envelope and pass it through unchanged."""
        self._to_string(envelope, 'response')
        return envelope, http_headers
# Client for the Finkok SOAP services: stamp, cancel, registration, utilities.
class PACFinkok(object):
# WSDL URL template; '{}' is filled with the service name in URL below.
WS = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl'
# With DEBUG on, the demo host is used instead.
if DEBUG:
WS = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl'
# WSDL address per service, looked up by the methods through short keys.
# NOTE(review): 'quick_stamp' is not read by any code visible here.
URL = {
'quick_stamp': False,
'timbra': WS.format('stamp'),
'cancel': WS.format('cancel'),
'client': WS.format('registration'),
'util': WS.format('utilities'),
}
# Status codes / status texts consulted by _validate_result.
# 'IP' and 'NE' are compared against CodEstatus; 'IPMSG' is the message
# stored as the error when 'IP' matches.
CODE = {
'200': 'Comprobante timbrado satisfactoriamente',
'205': 'No Encontrado',
'307': 'Comprobante timbrado previamente',
'702': 'No se encontro el RFC del emisor',
'IP': 'Invalid Passphrase',
'IPMSG': 'Frase de paso inválida',
'NE': 'No Encontrado',
}
def __init__(self):
    """Set up the state shared by every call.

    Creates the plugin list and the zeep transport (SQLite cache,
    module-level TIMEOUT) and clears the last-error message.
    """
    self._plugins = [DebugPlugin()]
    self._transport = Transport(timeout=TIMEOUT, cache=SqliteCache())
    self._error = ''
@property
def error(self):
    """Message of the last failed operation, or '' when there is none."""
    return self._error

@error.setter
def error(self, value):
    """Store *value* as the last error message.

    Makes ``obj.error = msg`` a valid assignment; without a setter such
    an assignment raises AttributeError.
    """
    self._error = value
def _validate_result(self, result):
if hasattr(result, 'CodEstatus'):
ce = result.CodEstatus
if ce is None:
return result
if ce == self.CODE['IP']:
self._error = self.CODE['IPMSG']
return {}
if self.CODE['NE'] in ce:
self._error = 'UUID ' + self.CODE['NE']
return {}
if ce == 'UUID Not Found':
self._error = 'UUID ' + self.CODE['NE']
return {}
if self.CODE['200'] != ce:
log.error('CodEstatus', type(ce), ce)
return result
if hasattr(result, 'Incidencias'):
fault = result.Incidencias.Incidencia[0]
cod_error = fault.CodigoError.encode('utf-8')
msg_error = fault.MensajeIncidencia.encode('utf-8')
error = 'Error: {}\n{}'.format(cod_error, msg_error)
self._error = self.CODE.get(cod_error, error)
return {}
return result
def _get_result(self, client, method, args):
self._error = ''
try:
result = getattr(client.service, method)(**args)
except Fault as e:
self._error = str(e)
return {}
except TransportError as e:
if '413' in str(e):
self._error = '413<BR><BR><b>Documento muy grande para timbrar</b>'
else:
self._error = str(e)
return {}
except ConnectionError as e:
msg = '502 - Error de conexión'
self._error = msg
return {}
return self._validate_result(result)
def _to_string(self, data):
    """Re-serialize XML text pretty-printed, with an XML declaration.

    Args:
        data (str): XML document as text.

    Returns:
        str: The document's root element serialized as UTF-8 and decoded.
    """
    source = BytesIO(data.encode('utf-8'))
    document = ET.parse(source)
    pretty = ET.tostring(
        document.getroot(),
        pretty_print=True,
        xml_declaration=True,
        encoding='utf-8',
    )
    return pretty.decode('utf-8')
def stamp(self, cfdi, auth=None):
    """Send a CFDI to the PAC to be stamped.

    Args:
        cfdi (str): The CFDI document as XML text.
        auth (dict): Credentials with keys 'user' and 'pass'. The
            module-level AUTH is used when this is empty or DEBUG is on.

    Returns:
        dict with keys 'xml' (pretty-printed stamped document), 'uuid'
        and 'date' on success; '' on failure, with the reason in
        ``error``.
    """
    # None instead of a shared mutable {} default; both are falsy here.
    if DEBUG or not auth:
        auth = AUTH

    method = 'timbra'
    client = Client(
        self.URL[method], transport=self._transport, plugins=self._plugins)
    args = {
        'username': auth['user'],
        'password': auth['pass'],
        'xml': cfdi.encode('utf-8'),
    }

    result = self._get_result(client, 'stamp', args)
    if self.error:
        log.error(self.error)
        return ''

    data = {
        'xml': self._to_string(result.xml),
        'uuid': result.UUID,
        'date': result.Fecha,
    }
    return data
def _get_data_cancel(self, cfdi):
    """Read the issuer RFC and the fiscal-stamp UUID from a CFDI.

    Args:
        cfdi (str): The CFDI document as XML text.

    Returns:
        tuple: ``(rfc_emisor, cfdi_uuid)``, each the string value of its
        XPath lookup.
    """
    namespaces = {
        'cfdi': 'http://www.sat.gob.mx/cfd/3',
        'tdf': 'http://www.sat.gob.mx/TimbreFiscalDigital',
    }
    queries = (
        'string(//cfdi:Comprobante/cfdi:Emisor/@Rfc)',
        'string(//cfdi:Complemento/tdf:TimbreFiscalDigital/@UUID)',
    )
    root = ET.fromstring(cfdi.encode())
    rfc_emisor, cfdi_uuid = (
        root.xpath(query, namespaces=namespaces) for query in queries)
    return rfc_emisor, cfdi_uuid
def cancel(self, cfdi, info, auth=None):
    """Ask the PAC to cancel a stamped CFDI.

    Args:
        cfdi (str): The stamped CFDI as XML text; the issuer RFC and the
            UUID are read from it.
        info (dict): Must hold 'cer' and 'key', forwarded to the PAC.
        auth (dict): Credentials with keys 'user' and 'pass'. The
            module-level AUTH is used when this is empty or DEBUG is on.

    Returns:
        dict with keys 'acuse' and 'date' on success; '' on failure,
        with the reason in ``error``.
    """
    # None instead of a shared mutable {} default; both are falsy here.
    if DEBUG or not auth:
        auth = AUTH

    rfc_emisor, cfdi_uuid = self._get_data_cancel(cfdi)

    method = 'cancel'
    client = Client(
        self.URL[method], transport=self._transport, plugins=self._plugins)
    uuid_type = client.get_type('ns1:UUIDS')
    sa = client.get_type('ns0:stringArray')
    args = {
        'UUIDS': uuid_type(uuids=sa(string=cfdi_uuid)),
        'username': auth['user'],
        'password': auth['pass'],
        'taxpayer_id': rfc_emisor,
        'cer': info['cer'],
        'key': info['key'],
        'store_pending': False,
    }

    result = self._get_result(client, 'cancel', args)
    if self.error:
        log.error(self.error)
        return ''

    folio = result['Folios']['Folio'][0]
    status = folio['EstatusUUID']
    # Any status other than '201' is only logged; the receipt is still
    # returned to the caller.
    if status != '201':
        log.debug(f'Cancel status: {status} - {cfdi_uuid}')

    data = {
        'acuse': result['Acuse'],
        'date': result['Fecha'],
    }
    return data
def cancel_xml(self, xml, auth=None, cfdi=''):
    """Cancel through the PAC's 'cancel_signature' operation.

    Args:
        xml (str): Cancellation request as XML text (presumably already
            signed -- confirm against the caller).
        auth (dict): Credentials with keys 'user' and 'pass'. The
            module-level AUTH is used when this is empty or DEBUG is on.
        cfdi (str): Not used by this method; kept for compatibility.

    Returns:
        dict with keys 'acuse' and 'date' on success; '' on failure,
        with the reason in ``error``.
    """
    # None instead of a shared mutable {} default; both are falsy here.
    if DEBUG or not auth:
        auth = AUTH

    method = 'cancel'
    client = Client(
        self.URL[method], transport=self._transport, plugins=self._plugins)
    client.set_ns_prefix('can', 'http://facturacion.finkok.com/cancel')

    args = {
        'xml': xml.encode(),
        'username': auth['user'],
        'password': auth['pass'],
        'store_pending': False,
    }

    result = self._get_result(client, 'cancel_signature', args)
    if self.error:
        log.error(self.error)
        return ''

    folio = result['Folios']['Folio'][0]
    status = folio['EstatusUUID']
    # Status '708' is reported to the caller as a retry-later error.
    if status == '708':
        self._error = 'Error 708 del SAT, intenta más tarde.'
        log.error(self.error)
        return ''

    # Any other status different from '201' is only logged.
    if status != '201':
        log.debug(f'Cancel status: {status} -')

    data = {
        'acuse': result['Acuse'],
        'date': result['Fecha'],
    }
    return data
def client_add(self, rfc, type_user=False):
    """Register a new client for stamping.

    A reseller account is required to use this method.

    Args:
        rfc (str): RFC of the new client.
        type_user (bool): False selects 'P' (prepaid), True selects
            'O' (on demand).

    Returns:
        bool: True when the PAC reports success, False otherwise. The
        PAC message is stored in ``error`` on failure, and also when the
        account already existed (True is still returned in that case).

    The PAC answers with:
        'message': 'Account Created successfully' or
            'Account Already exists'
        'success': True or False
    """
    auth = AUTH['RESELLER']
    tu = {True: 'O', False: 'P'}

    method = 'client'
    client = Client(
        self.URL[method], transport=self._transport, plugins=self._plugins)
    args = {
        'reseller_username': auth['user'],
        'reseller_password': auth['pass'],
        'taxpayer_id': rfc,
        'type_user': tu[type_user],
        # ISO timestamp cut to seconds (first 19 characters).
        'added': datetime.datetime.now().isoformat()[:19],
    }

    result = self._get_result(client, 'add', args)
    if self.error:
        return False

    if not result.success:
        # Write the backing field directly, as the rest of the class does.
        self._error = result.message
        return False

    # The PAC flags success even when the account already existed
    # (arguably it should be False); keep the message for the caller.
    msg = 'Account Already exists'
    if result.message == msg:
        self._error = msg
        return True

    return result.success
def client_get_token(self, rfc, email):
    """Generate a new stamping token for a client.

    A reseller account is required to use this method.

    Args:
        rfc (str): RFC of the client; it must already exist.
        email (str): E-mail of the client; it acts as the user name when
            stamping.

    Returns:
        str: The token (the password used for stamping), or '' on
        failure with the reason in ``error``.

    The PAC answers with:
        'username': 'username'
        'status': True or False
        'name': 'name'
        'success': True or False
        'token': the stamping token
        'message': None
    """
    auth = AUTH['RESELLER']

    method = 'util'
    client = Client(
        self.URL[method], transport=self._transport, plugins=self._plugins)
    args = {
        'username': auth['user'],
        'password': auth['pass'],
        'name': rfc,
        'token_username': email,
        'taxpayer_id': rfc,
        'status': True,
    }

    result = self._get_result(client, 'add_token', args)
    if self.error:
        log.error(self.error)
        return ''

    if not result.success:
        # Write the backing field directly, as the rest of the class does.
        self._error = result.message
        log.error(self.error)
        return ''

    return result.token
def client_add_timbres(self, rfc, credit):
    """Add stamping credit to an issuer.

    A reseller account is required.

    Args:
        rfc (str): RFC of the issuer; it must exist.
        credit (int): Number of stamps to add.

    Returns:
        The new credit reported by the PAC on success; '' when the call
        itself fails and 0 when the PAC rejects it, with the reason in
        ``error``.

    The PAC answers with:
        'success': True or False
        'credit': new credit after adding, or None
        'message': 'Success, added {credit} of credit to {RFC}.' or
            'RFC no encontrado'
    """
    auth = AUTH['RESELLER']

    method = 'client'
    client = Client(
        self.URL[method], transport=self._transport, plugins=self._plugins)
    args = {
        'username': auth['user'],
        'password': auth['pass'],
        'taxpayer_id': rfc,
        'credit': credit,
    }

    result = self._get_result(client, 'assign', args)
    if self.error:
        # Log the stored message; a bare `error` name does not exist here.
        log.error(self.error)
        return ''

    if not result.success:
        # Write the backing field directly, as the rest of the class does.
        self._error = result.message
        return 0

    return result.credit
def client_balance(self, auth=None, rfc=''):
    """Return the stamps a client has left.

    Either the reseller credentials or the issuer's own credentials can
    be used.

    Args:
        auth (dict): Credentials with keys 'user' and 'pass'; the
            reseller credentials from AUTH are used when empty.
        rfc (str): RFC of the issuer.

    Returns:
        The remaining credit of the first user reported by the PAC; ''
        when the call itself fails and 0 when no user is returned, with
        the reason in ``error``.
    """
    # None instead of a shared mutable {} default; both are falsy here.
    if not auth:
        auth = AUTH['RESELLER']

    method = 'client'
    client = Client(
        self.URL[method], transport=self._transport, plugins=self._plugins)
    args = {
        'reseller_username': auth['user'],
        'reseller_password': auth['pass'],
        'taxpayer_id': rfc,
    }

    result = self._get_result(client, 'get', args)
    if self.error:
        log.error(self.error)
        return ''

    success = bool(result.users)
    if not success:
        # Write the backing field directly, as the rest of the class does.
        self._error = result.message or 'RFC no existe'
        return 0

    return result.users.ResellerUser[0].credit
def client_set_status(self, rfc, status):
    """Change the status (active or suspended) of a client.

    A reseller account is required to use this method.

    Args:
        rfc (str): RFC of the client.
        status (bool): True selects 'A' (active), False selects
            'S' (suspended).

    Returns:
        bool: True when the PAC reports success, False otherwise with
        the reason in ``error``.
    """
    auth = AUTH['RESELLER']
    ts = {True: 'A', False: 'S'}

    method = 'client'
    client = Client(
        self.URL[method], transport=self._transport, plugins=self._plugins)
    args = {
        'reseller_username': auth['user'],
        'reseller_password': auth['pass'],
        'taxpayer_id': rfc,
        'status': ts[status],
    }

    result = self._get_result(client, 'edit', args)
    if self.error:
        return False

    if not result.success:
        # Write the backing field directly, as the rest of the class does.
        self._error = result.message
        return False

    return True
def client_switch(self, rfc, type_user):
    """Change the stamping mode (on demand or prepaid) of a client.

    A reseller account is required to use this method.

    Args:
        rfc (str): RFC of the client.
        type_user (bool): True selects 'O' (on demand), False selects
            'P' (prepaid).

    Returns:
        bool: True when the PAC reports success, False otherwise with
        the reason in ``error``.
    """
    auth = AUTH['RESELLER']
    tu = {True: 'O', False: 'P'}

    method = 'client'
    client = Client(
        self.URL[method], transport=self._transport, plugins=self._plugins)
    args = {
        'username': auth['user'],
        'password': auth['pass'],
        'taxpayer_id': rfc,
        'type_user': tu[type_user],
    }

    result = self._get_result(client, 'switch', args)
    if self.error:
        return False

    if not result.success:
        # Write the backing field directly, as the rest of the class does.
        self._error = result.message
        return False

    return True
def client_report_folios(self, rfc, date_from, date_to, invoice_type='I'):
    """Get the total number of invoices stamped in a period.

    The reseller credentials from AUTH are used.

    Args:
        rfc (str): RFC of the issuer.
        date_from: Start of the period, forwarded to the PAC as is.
        date_to: End of the period, forwarded to the PAC as is.
        invoice_type (str): Invoice type filter, 'I' by default.

    Returns:
        The total of the first report entry, or 0 when the call fails or
        the PAC returns no report; the reason is then in ``error``.
    """
    auth = AUTH['RESELLER']
    args = {
        'username': auth['user'],
        'password': auth['pass'],
        'taxpayer_id': rfc,
        'date_from': date_from,
        'date_to': date_to,
        'invoice_type': invoice_type,
    }

    method = 'util'
    client = Client(
        self.URL[method], transport=self._transport, plugins=self._plugins)

    result = self._get_result(client, 'report_total', args)
    if self.error:
        # A failed call yields {}, which has no `.result` attribute.
        log.error(self.error)
        return 0

    if result.result is None:
        # The PAC should say "unknown RFC" or "no records" itself, but
        # it only returns an empty result, so the message is set here.
        self._error = 'RFC no existe o no tiene registros'
        return 0

    total = result.result.ReportTotal[0].total
    return total