forked from elmau/empresa-libre
698 lines
20 KiB
Python
698 lines
20 KiB
Python
#!/usr/bin/env python
|
|
|
|
#~ import re
|
|
#~ from xml.etree import ElementTree as ET
|
|
#~ from requests import Request, Session, exceptions
|
|
import datetime
|
|
import hashlib
|
|
import os
|
|
import requests
|
|
import time
|
|
from lxml import etree
|
|
from xml.dom.minidom import parseString
|
|
from xml.sax.saxutils import escape, unescape
|
|
from uuid import UUID
|
|
|
|
from logbook import Logger
|
|
from zeep import Client
|
|
from zeep.plugins import HistoryPlugin
|
|
from zeep.cache import SqliteCache
|
|
from zeep.transports import Transport
|
|
from zeep.exceptions import Fault, TransportError
|
|
|
|
if __name__ == '__main__':
|
|
from configpac import DEBUG, TIMEOUT, AUTH, URL
|
|
else:
|
|
from .configpac import DEBUG, TIMEOUT, AUTH, URL
|
|
|
|
|
|
# Module-level logbook logger, named 'PAC', used by the client classes below.
log = Logger('PAC')
|
|
#~ node = client.create_message(client.service, SERVICE, **args)
|
|
#~ print(etree.tostring(node, pretty_print=True).decode())
|
|
|
|
|
|
class Ecodex(object):
    """Client for the Ecodex stamping services.

    SOAP operations go through zeep clients that share one cached transport;
    the hash lookup uses the REST endpoints configured in ``URL``. The text
    of the last failure is kept in ``error``.
    """

    def __init__(self):
        """Create the shared zeep transport and, when DEBUG is on, a history plugin."""
        self.codes = URL['codes']
        self.error = ''
        self.message = ''
        self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
        self._plugins = None
        self._history = None
        if DEBUG:
            self._history = HistoryPlugin()
            self._plugins = [self._history]

    def _new_client(self, service):
        """Return a zeep client for the WSDL stored under ``URL[service]``."""
        return Client(
            URL[service], transport=self._transport, plugins=self._plugins)

    def _get_token(self, rfc):
        """Request a service token for ``rfc`` and derive the call token.

        Returns:
            str: SHA-1 hex digest of ``'<AUTH ID>|<service token>'``, or ''
            when the service answers with a Fault (its text is stored in
            ``error`` and logged).
        """
        client = self._new_client('seguridad')
        try:
            result = client.service.ObtenerToken(rfc, self._get_epoch())
        except Fault as e:
            self.error = str(e)
            log.error(self.error)
            return ''

        s = '{}|{}'.format(AUTH['ID'], result.Token)
        return hashlib.sha1(s.encode()).hexdigest()

    def _get_token_rest(self, rfc):
        """Obtain the tokens needed by the REST endpoints.

        Returns:
            tuple: ``(token, access_token)`` where ``token`` is the SHA-1 hex
            digest of ``'<AUTH ID>|<service_token>'`` and ``access_token`` is
            taken verbatim from the JSON response.
        """
        data = {
            'rfc': rfc,
            'grant_type': 'authorization_token',
        }
        headers = {'Content-type': 'application/x-www-form-urlencoded'}
        # Without a timeout requests can wait forever on an unresponsive host.
        result = requests.post(
            URL['token'], data=data, headers=headers, timeout=TIMEOUT)
        data = result.json()
        s = '{}|{}'.format(AUTH['ID'], data['service_token'])
        return hashlib.sha1(s.encode()).hexdigest(), data['access_token']

    def _validate_xml(self, xml):
        """Build the keyword arguments for the ``TimbraXML`` operation.

        Args:
            xml (str): Path to an XML file, or the XML document itself.

        Returns:
            dict: ``ComprobanteXML``, ``RFC``, ``Token`` and ``TransaccionID``
            entries. The RFC is read from the ``cfdi:Emisor`` node and the
            transaction id is the epoch of the root ``Fecha`` attribute.
        """
        NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'}
        if os.path.isfile(xml):
            tree = etree.parse(xml).getroot()
        else:
            tree = etree.fromstring(xml.encode())

        fecha = tree.get('Fecha')
        rfc = tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI)
        data = {
            'ComprobanteXML': etree.tostring(tree).decode(),
            'RFC': rfc,
            'Token': self._get_token(rfc),
            'TransaccionID': self._get_epoch(fecha),
        }
        return data

    def _get_by_hash(self, sh, rfc):
        """Query the REST hash endpoint for ``sh`` and print the JSON answer.

        Only prints the response on HTTP 200; always returns None.
        """
        token, access_token = self._get_token_rest(rfc)
        url = URL['hash'].format(sh)
        headers = {
            'Authorization': 'Bearer {}'.format(access_token),
            'X-Auth-Token': token,
        }
        result = requests.get(url, headers=headers, timeout=TIMEOUT)
        if result.status_code == 200:
            print (result.json())
        return

    def timbra_xml(self, xml):
        """Send a document to the ``TimbraXML`` operation.

        Args:
            xml (str): Path to an XML file, or the XML document itself.

        Returns:
            str: The stamped XML, pretty-printed. On a Fault whose text
            contains the configured HASH code, the result of
            ``_get_by_hash`` (None). On any other Fault, '' with the text
            stored in ``error``.
        """
        data = self._validate_xml(xml)
        client = self._new_client('timbra')
        try:
            result = client.service.TimbraXML(**data)
        except Fault as e:
            error = str(e)
            words = error.split(' ')
            # The hash is taken from the fourth space-separated word of the
            # fault text; a shorter message is reported as a plain error
            # instead of raising IndexError.
            if self.codes['HASH'] in error and len(words) > 3:
                return self._get_by_hash(words[3][:40], data['RFC'])
            self.error = error
            return ''

        tree = parseString(result.ComprobanteXML.DatosXML)
        xml = tree.toprettyxml(encoding='utf-8').decode('utf-8')
        return xml

    def _get_epoch(self, date=None):
        """Return seconds since the epoch as an int, using local time.

        Args:
            date (str): Timestamp formatted as ``%Y-%m-%dT%H:%M:%S``. Any
                non-string value (including None) means "now".
        """
        if isinstance(date, str):
            f = '%Y-%m-%dT%H:%M:%S'
            return int(time.mktime(time.strptime(date, f)))

        now = datetime.datetime.now()
        return int(time.mktime(now.timetuple()))

    def estatus_cuenta(self, rfc):
        """Query the ``EstatusCuenta`` operation for ``rfc``.

        Codes noted by the original author:
            100 = account found
            101 = RFC not registered in the ECODEX system

        Returns:
            The ``Estatus`` attribute of the response; ``{}`` when no token
            could be obtained; None when the call raises a Fault (logged).
        """
        token = self._get_token(rfc)
        if not token:
            return {}

        data = {
            'RFC': rfc,
            'Token': token,
            'TransaccionID': self._get_epoch()
        }
        client = self._new_client('clients')
        try:
            result = client.service.EstatusCuenta(**data)
        except Fault as e:
            log.error(str(e))
            return
        return result.Estatus
|
|
|
|
|
|
class Finkok(object):
    """Client for the Finkok web services used for stamping and cancellation.

    Every public method stores the text of the last failure in ``error``.
    ``message`` carries an informational note, and ``uuid`` / ``fecha`` are
    filled in by ``_check_result`` after an accepted stamp response.
    """

    def __init__(self, auth=None):
        """Create the shared zeep transport and pick the credentials.

        Args:
            auth (dict): Credentials with 'USER' and 'PASS' keys. When DEBUG
                is on, the module-level ``AUTH`` is used instead.
        """
        self.codes = URL['codes']
        self.error = ''
        self.message = ''
        self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
        self._plugins = None
        self._history = None
        self.uuid = ''
        self.fecha = None
        # Read by _get_xml as the taxpayer id; initialised here so that call
        # does not fail with AttributeError when the caller never set it.
        self.rfc = ''
        if DEBUG:
            self._history = HistoryPlugin()
            self._plugins = [self._history]
            self._auth = AUTH
        else:
            # A fresh dict per instance avoids sharing a mutable default.
            self._auth = {} if auth is None else auth

    def _new_client(self, method):
        """Return a zeep client for the WSDL stored under ``URL[method]``."""
        return Client(
            URL[method], transport=self._transport, plugins=self._plugins)

    def _debug(self):
        """Print the last SOAP envelope sent and received (DEBUG only)."""
        if not DEBUG:
            return
        print('SEND', self._history.last_sent)
        print('RESULT', self._history.last_received)
        return

    def _check_result(self, method, result):
        """Turn a stamp-style response into pretty-printed XML.

        Args:
            method (str): Name of the service group; only 'timbra' yields XML.
            result: Response object exposing ``CodEstatus``, ``Incidencias``,
                ``xml``, ``UUID`` and ``Fecha``.

        Returns:
            str: The stamped XML when the status is one of the two accepted
            texts, otherwise ''. Incidences are appended to ``error``.
        """
        self.message = ''
        MSG = {
            'OK': 'Comprobante timbrado satisfactoriamente',
            '307': 'Comprobante timbrado previamente',
        }
        status = result.CodEstatus
        if status is None and result.Incidencias:
            for i in result.Incidencias['Incidencia']:
                self.error += 'Error: {}\n{}'.format(
                    i['CodigoError'], i['MensajeIncidencia'])
            return ''

        # Default for any status that is not accepted below; without it the
        # final return raised UnboundLocalError.
        response = ''
        if method == 'timbra' and status in (MSG['OK'], MSG['307']):
            if status == MSG['307']:
                self.message = MSG['307']
            tree = parseString(result.xml)
            response = tree.toprettyxml(encoding='utf-8').decode('utf-8')
            self.uuid = result.UUID
            self.fecha = result.Fecha

        return response

    def _load_file(self, path):
        """Return the bytes of ``path``, or None (with ``error`` set) on failure."""
        try:
            with open(path, 'rb') as f:
                data = f.read()
        except Exception as e:
            self.error = str(e)
            return
        return data

    def _validate_xml(self, file_xml):
        """Load the XML to send as bytes.

        Args:
            file_xml (str): Path to an XML file, or the XML document itself.

        Returns:
            tuple: ``(True, xml_bytes)``, or ``(False, '')`` when the file
            exists but cannot be read (``error`` is set).
        """
        if os.path.isfile(file_xml):
            try:
                with open(file_xml, 'rb') as f:
                    xml = f.read()
            except Exception as e:
                self.error = str(e)
                return False, ''
        else:
            xml = file_xml.encode('utf-8')
        return True, xml

    def _validate_uuid(self, uuid):
        """Return True when ``uuid`` parses as a UUID; otherwise set ``error``.

        Non-string values (None, numbers) are reported as invalid as well,
        instead of letting TypeError/AttributeError escape.
        """
        try:
            UUID(uuid)
            return True
        except (ValueError, TypeError, AttributeError):
            self.error = 'UUID no válido: {}'.format(uuid)
            return False

    def timbra_xml(self, file_xml):
        """Stamp a document through ``quick_stamp`` or ``stamp``.

        The operation is chosen by the ``URL['quick_stamp']`` setting.

        Returns:
            str: Pretty-printed stamped XML (see ``_check_result``); '' when
            the XML cannot be loaded; None when credentials are missing or
            the service raises a Fault (``error`` is set).
        """
        self.error = ''

        if not DEBUG and not self._auth:
            self.error = 'Sin datos para timbrar'
            return

        method = 'timbra'
        ok, xml = self._validate_xml(file_xml)
        if not ok:
            return ''
        client = self._new_client(method)

        args = {
            'username': self._auth['USER'],
            'password': self._auth['PASS'],
            'xml': xml,
        }
        operation = 'quick_stamp' if URL['quick_stamp'] else 'stamp'
        try:
            result = getattr(client.service, operation)(**args)
        except Fault as e:
            self.error = str(e)
            return

        return self._check_result(method, result)

    def _get_xml(self, uuid):
        """Fetch a stored document by UUID through the ``get_xml`` operation.

        Uses ``self.rfc`` as the taxpayer id and a fixed invoice type 'I'.

        Returns:
            str: Pretty-printed XML, or '' on any failure (``error`` is set).
        """
        if not self._validate_uuid(uuid):
            return ''

        method = 'util'
        client = self._new_client(method)

        args = {
            'username': self._auth['USER'],
            'password': self._auth['PASS'],
            'uuid': uuid,
            'taxpayer_id': self.rfc,
            'invoice_type': 'I',
        }
        try:
            result = client.service.get_xml(**args)
        except (Fault, TransportError) as e:
            self.error = str(e)
            return ''

        if result.error:
            self.error = result.error
            return ''

        tree = parseString(result.xml)
        xml = tree.toprettyxml(encoding='utf-8').decode('utf-8')
        return xml

    def recupera_xml(self, file_xml='', uuid=''):
        """Recover a previously stamped document.

        With ``uuid`` the document is fetched by ``_get_xml``; otherwise
        ``file_xml`` is sent to the ``stamped`` operation.

        Returns:
            str: Pretty-printed XML, or '' on failure (``error`` is set).
        """
        self.error = ''
        if uuid:
            return self._get_xml(uuid)

        method = 'timbra'
        ok, xml = self._validate_xml(file_xml)
        if not ok:
            return ''
        client = self._new_client(method)
        try:
            # Credentials use the same upper-case keys as every other method.
            result = client.service.stamped(
                xml, self._auth['USER'], self._auth['PASS'])
        except Fault as e:
            self.error = str(e)
            return ''

        return self._check_result(method, result)

    def estatus_xml(self, uuid):
        """Return the ``status`` reported by ``query_pending`` for ``uuid``.

        Returns '' when the UUID is invalid or the service raises a Fault.
        """
        method = 'timbra'
        if not self._validate_uuid(uuid):
            return ''
        client = self._new_client(method)
        try:
            result = client.service.query_pending(
                self._auth['USER'], self._auth['PASS'], uuid)
            return result.status
        except Fault as e:
            self.error = str(e)
            return ''

    def cancel_xml(self, rfc, uuids, path_cer, path_key):
        """Cancel documents by UUID using the issuer's certificate and key.

        Args:
            rfc (str): Taxpayer id sent as ``taxpayer_id``.
            uuids (list): UUID strings to cancel.
            path_cer (str): Path to the certificate file.
            path_key (str): Path to the key file.

        Returns:
            The service response, or '' when a UUID is invalid, a file cannot
            be read, the service raises a Fault, or the response status
            contains the configured '205' code (``error`` is set).
        """
        for u in uuids:
            if not self._validate_uuid(u):
                return ''

        cer = self._load_file(path_cer)
        key = self._load_file(path_key)
        # _load_file already stored the reason in self.error.
        if cer is None or key is None:
            return ''

        method = 'cancel'
        client = self._new_client(method)
        uuid_type = client.get_type('ns1:UUIDS')
        sa = client.get_type('ns0:stringArray')

        args = {
            'UUIDS': uuid_type(uuids=sa(string=uuids)),
            'username': self._auth['USER'],
            'password': self._auth['PASS'],
            'taxpayer_id': rfc,
            'cer': cer,
            'key': key,
            'store_pending': True,
        }
        try:
            result = client.service.cancel(**args)
        except Fault as e:
            self.error = str(e)
            return ''

        if result.CodEstatus and self.codes['205'] in result.CodEstatus:
            self.error = result.CodEstatus
            return ''

        return result

    def cancel_signature(self, file_xml):
        """Send an XML document to the ``cancel_signature`` operation.

        Args:
            file_xml (str): Path to an XML file, or the XML document itself.

        Returns:
            The service response, or '' on a Fault (``error`` is set).
        """
        method = 'cancel'
        if os.path.isfile(file_xml):
            root = etree.parse(file_xml).getroot()
        else:
            root = etree.fromstring(file_xml.encode())

        xml = etree.tostring(root)

        client = self._new_client(method)

        args = {
            'username': self._auth['USER'],
            'password': self._auth['PASS'],
            'xml': xml,
            'store_pending': True,
        }

        try:
            result = client.service.cancel_signature(**args)
            return result
        except Fault as e:
            self.error = str(e)
            return ''

    def get_acuse(self, rfc, uuids, type_acuse='C'):
        """Fetch one receipt per UUID through the ``get_receipt`` operation.

        Returns:
            list: One service response per UUID, or '' when a UUID is invalid
            or any call raises a Fault (``error`` is set).
        """
        for u in uuids:
            if not self._validate_uuid(u):
                return ''

        method = 'cancel'
        client = self._new_client(method)

        args = {
            'username': self._auth['USER'],
            'password': self._auth['PASS'],
            'taxpayer_id': rfc,
            'uuid': '',
            'type': type_acuse,
        }
        try:
            result = []
            for u in uuids:
                args['uuid'] = u
                r = client.service.get_receipt(**args)
                result.append(r)
        except Fault as e:
            self.error = str(e)
            return ''

        return result

    def estatus_cancel(self, uuids):
        """Query ``query_pending_cancellation`` once per UUID.

        Returns:
            list: One service response per UUID, or '' when a UUID is invalid
            or any call raises a Fault (``error`` is set).
        """
        for u in uuids:
            if not self._validate_uuid(u):
                return ''

        method = 'cancel'
        client = self._new_client(method)

        args = {
            'username': self._auth['USER'],
            'password': self._auth['PASS'],
            'uuid': '',
        }
        try:
            result = []
            for u in uuids:
                args['uuid'] = u
                r = client.service.query_pending_cancellation(**args)
                result.append(r)
        except Fault as e:
            self.error = str(e)
            return ''

        return result

    def add_token(self, rfc, email):
        """Add a new stamping token for a client.

        A reseller account is required to use this method.

        Args:
            rfc (str): The client's RFC; it must already exist.
            email (str): The client's e-mail; it works as USER when stamping.

        Returns:
            dict
                'username': 'username',
                'status': True or False
                'name': 'name',
                'success': True or False
                'token': 'Stamping token',
                'message': None
            or '' when the service raises a Fault (``error`` is set).
        """
        auth = AUTH['RESELLER']

        method = 'util'
        client = self._new_client(method)
        args = {
            'username': auth['USER'],
            'password': auth['PASS'],
            'name': rfc,
            'token_username': email,
            'taxpayer_id': rfc,
            'status': True,
        }
        try:
            result = client.service.add_token(**args)
        except Fault as e:
            self.error = str(e)
            return ''

        return result

    def get_date(self):
        """Return the ``datetime`` value reported by the service.

        Returns '' on a Fault and None when the response carries an error
        (``error`` is set in both cases).
        """
        method = 'util'
        client = self._new_client(method)
        try:
            # NOTE(review): uses the module-level AUTH rather than self._auth,
            # unlike the other non-reseller calls - confirm this is intended.
            result = client.service.datetime(AUTH['USER'], AUTH['PASS'])
        except Fault as e:
            self.error = str(e)
            return ''

        if result.error:
            self.error = result.error
            return

        return result.datetime

    def add_client(self, rfc, type_user=False):
        """Add a new client for stamping.

        A reseller account is required to use this method.

        Args:
            rfc (str): The new client's RFC.

        Kwargs:
            type_user (bool): False == 'P' == Prepaid or True == 'O' == On demand

        Returns:
            dict
                'message':
                    'Account Created successfully'
                    'Account Already exists'
                'success': True or False
            or '' when the service raises a Fault (``error`` is set).
        """
        auth = AUTH['RESELLER']

        tu = {False: 'P', True: 'O'}
        method = 'client'
        client = self._new_client(method)
        args = {
            'reseller_username': auth['USER'],
            'reseller_password': auth['PASS'],
            'taxpayer_id': rfc,
            'type_user': tu[type_user],
            # ISO timestamp truncated to whole seconds.
            'added': datetime.datetime.now().isoformat()[:19],
        }
        try:
            result = client.service.add(**args)
        except Fault as e:
            self.error = str(e)
            return ''

        return result

    def edit_client(self, rfc, status=True):
        """Change a client's status.

        A reseller account is required to use this method.

        Args:
            rfc (str): The client's RFC.
            status (bool): True is sent as 'A', False as 'S'.

        Returns:
            The service response, or '' on a Fault (``error`` is set).
        """
        auth = AUTH['RESELLER']

        sv = {False: 'S', True: 'A'}
        method = 'client'
        client = self._new_client(method)
        args = {
            'reseller_username': auth['USER'],
            'reseller_password': auth['PASS'],
            'taxpayer_id': rfc,
            'status': sv[status],
        }
        try:
            result = client.service.edit(**args)
        except Fault as e:
            self.error = str(e)
            return ''

        return result

    def get_client(self, rfc):
        """Return the client's status.

        A reseller account is required to use this method.

        Args:
            rfc (str): The issuer's RFC.

        Returns:
            dict
                'message': None,
                'users': {
                    'ResellerUser': [
                        {
                            'status': 'A',
                            'counter': 0,
                            'taxpayer_id': '',
                            'credit': 0
                        }
                    ]
                } or None if it does not exist
            or '' when the call fails (``error`` is set).
        """
        auth = AUTH['RESELLER']

        method = 'client'
        client = self._new_client(method)
        args = {
            'reseller_username': auth['USER'],
            'reseller_password': auth['PASS'],
            'taxpayer_id': rfc,
        }

        try:
            result = client.service.get(**args)
        except (Fault, TransportError) as e:
            self.error = str(e)
            return ''

        return result

    def assign_client(self, rfc, credit):
        """Add credit to an issuer.

        A reseller account is required to use this method.

        Args:
            rfc (str): The issuer's RFC; it must exist.
            credit (int): Number of stamps (folios) to add.

        Returns:
            dict
                'success': True or False,
                'credit': new credit after adding, or None
                'message':
                    'Success, added {credit} of credit to {RFC}'
                    'RFC no encontrado'
            or '' when the service raises a Fault (``error`` is set).
        """
        auth = AUTH['RESELLER']

        method = 'client'
        client = self._new_client(method)
        args = {
            'username': auth['USER'],
            'password': auth['PASS'],
            'taxpayer_id': rfc,
            'credit': credit,
        }
        try:
            result = client.service.assign(**args)
        except Fault as e:
            self.error = str(e)
            return ''

        return result
|
|
|
|
|
|
def _get_data_sat(path):
    """Build the query expression sent to the SAT status service.

    Args:
        path (str): Path to a CFDI XML file, or the XML document itself.

    Returns:
        str: ``'?re=<emisor>&rr=<receptor>&tt=<total>&id=<uuid>'``. The RFC
        attributes are looked up as ``rfc`` first and ``Rfc`` second, the
        total as ``total`` first and ``Total`` second. Any exception is
        printed and an empty dict is returned instead.
    """
    attr_query = 'string(//*[local-name()="{}"]/@{})'
    ns = {'cfdi': 'http://www.sat.gob.mx/cfd/3'}

    try:
        if os.path.isfile(path):
            root = etree.parse(path).getroot()
        else:
            root = etree.fromstring(path.encode())

        issuer = (
            root.xpath('string(//cfdi:Emisor/@rfc)', namespaces=ns) or
            root.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=ns)
        )
        receiver = (
            root.xpath('string(//cfdi:Receptor/@rfc)', namespaces=ns) or
            root.xpath('string(//cfdi:Receptor/@Rfc)', namespaces=ns)
        )
        fields = {
            'total': root.get('total') or root.get('Total'),
            # RFCs are XML-escaped before being placed in the expression.
            'emisor': escape(issuer),
            'receptor': escape(receiver),
            'uuid': root.xpath(attr_query.format('TimbreFiscalDigital', 'UUID')),
        }
    except Exception as e:
        print (e)
        return {}

    return '?re={emisor}&rr={receptor}&tt={total}&id={uuid}'.format(**fields)
|
|
|
|
|
|
def get_status_sat(xml):
    """Ask the SAT ``Consulta`` service for the state of a CFDI.

    Args:
        xml (str): Path to a CFDI XML file, or the XML document itself.

    Returns:
        str: The ``Estado`` attribute of the response, 'XML inválido' when no
        query expression could be built, or 'Error: <text>' when the call
        raises.
    """
    expression = _get_data_sat(xml)
    if not expression:
        return 'XML inválido'

    wsdl = 'https://consultaqr.facturaelectronica.sat.gob.mx/ConsultaCFDIService.svc?wsdl'
    transport = Transport(cache=SqliteCache())
    client = Client(wsdl, transport=transport)
    try:
        answer = client.service.Consulta(expresionImpresa=expression)
    except Exception as e:
        return 'Error: {}'.format(str(e))

    return answer.Estado
|
|
|
|
|
|
def main():
    """Script entry point; currently a placeholder that does nothing."""
    return None


if __name__ == '__main__':
    main()
|