cfdi-pac/source/finkok/finkok.py

175 lines
5.5 KiB
Python

#!/usr/bin/env python
# ~
# ~ PAC
# ~ Copyright (C) 2018-2019 Mauricio Baeza Servin - public [AT] elmau [DOT] net
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import logging
import os
import re
from io import BytesIO
from xml.sax.saxutils import unescape
import lxml.etree as ET
from zeep import Client
from zeep.plugins import Plugin
from zeep.cache import SqliteCache
from zeep.transports import Transport
from zeep.exceptions import Fault, TransportError
from requests.exceptions import ConnectionError
from .conf import DEBUG, AUTH
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
logging.getLogger('requests').setLevel(logging.ERROR)
logging.getLogger('zeep').setLevel(logging.ERROR)
TIMEOUT = 10
DEBUG_SOAP = False
class DebugPlugin(Plugin):
def _to_string(self, envelope, name):
if DEBUG_SOAP:
data = ET.tostring(envelope, pretty_print=True, encoding='utf-8').decode()
path = f'/tmp/soap_{name}.xml'
with open(path, 'w') as f:
f.write(data)
return
def egress(self, envelope, http_headers, operation, binding_options):
self._to_string(envelope, 'request')
return envelope, http_headers
def ingress(self, envelope, http_headers, operation):
self._to_string(envelope, 'response')
return envelope, http_headers
class PACFinkok(object):
WS = 'https://facturacion.finkok.com/servicios/soap/{}.wsdl'
if DEBUG:
WS = 'http://demo-facturacion.finkok.com/servicios/soap/{}.wsdl'
URL = {
'quick_stamp': False,
'timbra': WS.format('stamp'),
'cancel': WS.format('cancel'),
'client': WS.format('registration'),
'util': WS.format('utilities'),
}
CODE = {
'200': 'Comprobante timbrado satisfactoriamente',
'205': 'No Encontrado',
'307': 'Comprobante timbrado previamente',
'702': 'No se encontro el RFC del emisor',
'IP': 'Invalid Passphrase',
'IPMSG': 'Frase de paso inválida',
'NE': 'No Encontrado',
}
def __init__(self):
self._error = ''
self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
self._plugins = [DebugPlugin()]
@property
def error(self):
return self._error
def _validate_result(self, result):
if hasattr(result, 'CodEstatus'):
ce = result.CodEstatus
if ce == self.CODE['IP']:
self._error = self.CODE['IPMSG']
return {}
if self.CODE['NE'] in ce:
self._error = 'UUID ' + self.CODE['NE']
return {}
if self.CODE['200'] != ce:
log.error('CodEstatus', type(ce), ce)
return result
if hasattr(result, 'Incidencias'):
fault = result.Incidencias.Incidencia[0]
cod_error = fault.CodigoError.encode('utf-8')
msg_error = fault.MensajeIncidencia.encode('utf-8')
error = 'Error: {}\n{}'.format(cod_error, msg_error)
self._error = self.CODE.get(cod_error, error)
return {}
return result
def _get_result(self, client, method, args):
self._error = ''
try:
result = getattr(client.service, method)(**args)
except Fault as e:
self._error = str(e)
return {}
except TransportError as e:
if '413' in str(e):
self._error = '413<BR><BR><b>Documento muy grande para timbrar</b>'
else:
self._error = str(e)
return {}
except ConnectionError as e:
msg = '502 - Error de conexión'
self._error = msg
return {}
return self._validate_result(result)
def _to_string(self, data):
root = ET.parse(BytesIO(data.encode('utf-8'))).getroot()
xml = ET.tostring(root,
pretty_print=True, xml_declaration=True, encoding='utf-8')
return xml.decode('utf-8')
def stamp(self, cfdi, auth={}):
if DEBUG or not auth:
auth = AUTH
method = 'timbra'
client = Client(self.URL[method],
transport=self._transport, plugins=self._plugins)
args = {
'username': auth['user'],
'password': auth['pass'],
'xml': cfdi.encode('utf-8'),
}
result = self._get_result(client, 'stamp', args)
if self._error:
return ''
data = {
'xml': self._to_string(result.xml),
'uuid': result.UUID,
'date': result.Fecha,
}
return data