431 lines
12 KiB
Python
431 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
|
|
#~ import re
|
|
#~ from xml.etree import ElementTree as ET
|
|
#~ from requests import Request, Session, exceptions
|
|
import datetime
|
|
import hashlib
|
|
import os
|
|
import requests
|
|
import time
|
|
from lxml import etree
|
|
from xml.dom.minidom import parseString
|
|
from xml.sax.saxutils import escape, unescape
|
|
from uuid import UUID
|
|
|
|
from logbook import Logger
|
|
from zeep import Client
|
|
from zeep.plugins import HistoryPlugin
|
|
from zeep.cache import SqliteCache
|
|
from zeep.transports import Transport
|
|
from zeep.exceptions import Fault, TransportError
|
|
from requests.exceptions import ConnectionError
|
|
|
|
|
|
class Finkok(object):
|
|
|
|
def _get_xml(self, uuid):
    """Fetch an already stamped invoice by UUID and return it pretty-printed.

    Calls the ``get_xml`` operation of the ``util`` endpoint using the
    instance credentials and ``self.rfc`` as taxpayer id; the invoice type
    sent is always ``'I'``.

    Args:
        uuid: Invoice identifier; it is checked with ``self._validate_uuid``.

    Returns:
        str: The XML re-serialized with ``toprettyxml`` (UTF-8 decoded), or
        ``''`` when the UUID is rejected, the call raises ``Fault`` or
        ``TransportError``, or the response reports an error. In the last
        two cases the reason is stored in ``self.error``.
    """
    if not self._validate_uuid(uuid):
        return ''

    soap = Client(
        URL['util'], transport=self._transport, plugins=self._plugins)

    payload = dict(
        username=self._auth['USER'],
        password=self._auth['PASS'],
        uuid=uuid,
        taxpayer_id=self.rfc,
        invoice_type='I',
    )
    try:
        response = soap.service.get_xml(**payload)
    except (Fault, TransportError) as exc:
        # Both failure kinds are reported the same way to the caller.
        self.error = str(exc)
        return ''

    if response.error:
        self.error = response.error
        return ''

    # Round-trip through minidom only to obtain an indented document.
    return parseString(response.xml).toprettyxml(
        encoding='utf-8').decode('utf-8')
|
|
|
|
def recupera_xml(self, file_xml='', uuid=''):
    """Recover a previously stamped invoice.

    When ``uuid`` is given the lookup is delegated to ``self._get_xml``.
    Otherwise ``file_xml`` is validated with ``self._validate_xml`` and sent
    to the ``stamped`` operation of the ``timbra`` endpoint.

    Args:
        file_xml: Invoice XML handed to ``self._validate_xml``; only used
            when ``uuid`` is empty.
        uuid: Invoice identifier; takes precedence over ``file_xml``.

    Returns:
        Whatever ``self._get_xml`` or ``self._check_result`` return, or
        ``''`` when validation fails or the service raises ``Fault`` (the
        fault text is stored in ``self.error``).
    """
    self.error = ''
    if uuid:
        return self._get_xml(uuid)

    method = 'timbra'
    ok, xml = self._validate_xml(file_xml)
    if not ok:
        return ''

    client = Client(
        URL[method], transport=self._transport, plugins=self._plugins)
    try:
        # Credentials use the upper-case 'USER' / 'PASS' keys, like every
        # other method that reads self._auth; the former lower-case keys
        # were inconsistent with the rest of the class.
        result = client.service.stamped(
            xml, self._auth['USER'], self._auth['PASS'])
    except Fault as e:
        self.error = str(e)
        return ''

    return self._check_result(method, result)
|
|
|
|
def estatus_xml(self, uuid):
    """Query the pending status of an invoice.

    Calls the ``query_pending`` operation of the ``timbra`` endpoint.

    Args:
        uuid: Invoice identifier; it is checked with ``self._validate_uuid``.

    Returns:
        The ``status`` attribute of the service response, or ``''`` when
        the UUID is rejected or the service raises ``Fault`` (the fault
        text is stored in ``self.error``).
    """
    if not self._validate_uuid(uuid):
        return ''

    soap = Client(
        URL['timbra'], transport=self._transport, plugins=self._plugins)
    try:
        return soap.service.query_pending(
            self._auth['USER'], self._auth['PASS'], uuid).status
    except Fault as exc:
        self.error = str(exc)
        return ''
|
|
|
|
def cancel_xml(self, rfc, uuid, cer, key):
    """Request the cancellation of stamped invoices.

    Calls the ``cancel`` operation of the ``cancel`` endpoint with
    ``store_pending`` set to ``False``.

    NOTE: unlike other methods of this class, the UUID value is forwarded
    without going through ``self._validate_uuid``.

    Args:
        rfc: Taxpayer id sent as ``taxpayer_id``.
        uuid: Value wrapped in the service's ``stringArray`` / ``UUIDS``
            types (presumably a list of UUID strings; confirm with callers).
        cer: Certificate, forwarded as received.
        key: Private key, forwarded as received.

    Returns:
        The service response, or ``''`` when the service raises ``Fault``
        or the response's ``CodEstatus`` contains ``self.codes['205']``;
        ``self.error`` holds the reason in both cases.
    """
    soap = Client(
        URL['cancel'], transport=self._transport, plugins=self._plugins)
    uuids_factory = soap.get_type('ns1:UUIDS')
    string_array = soap.get_type('ns0:stringArray')

    payload = dict(
        UUIDS=uuids_factory(uuids=string_array(string=uuid)),
        username=self._auth['USER'],
        password=self._auth['PASS'],
        taxpayer_id=rfc,
        cer=cer,
        key=key,
        store_pending=False,
    )
    try:
        response = soap.service.cancel(**payload)
    except Fault as exc:
        self.error = str(exc)
        return ''

    status = response.CodEstatus
    if status and self.codes['205'] in status:
        self.error = status
        return ''

    return response
|
|
|
|
def cancel_signature(self, file_xml):
    """Send an XML cancellation request to the PAC.

    Calls the ``cancel_signature`` operation of the ``cancel`` endpoint
    with ``store_pending`` set to ``False``.

    Args:
        file_xml: Either the path of an existing file or the XML document
            itself as a string.

    Returns:
        The service response, or ``''`` when the service raises ``Fault``
        (the fault text is stored in ``self.error``).
    """
    # An existing path is parsed from disk; anything else is treated as
    # the XML text itself.
    root = (
        etree.parse(file_xml).getroot()
        if os.path.isfile(file_xml)
        else etree.fromstring(file_xml.encode())
    )
    document = etree.tostring(root)

    soap = Client(
        URL['cancel'], transport=self._transport, plugins=self._plugins)

    payload = dict(
        username=self._auth['USER'],
        password=self._auth['PASS'],
        xml=document,
        store_pending=False,
    )

    try:
        return soap.service.cancel_signature(**payload)
    except Fault as exc:
        self.error = str(exc)
        return ''
|
|
|
|
def get_acuse(self, rfc, uuids, type_acuse='C'):
    """Retrieve the receipt of each given invoice.

    Calls the ``get_receipt`` operation of the ``cancel`` endpoint once per
    UUID.

    Args:
        rfc: Taxpayer id sent as ``taxpayer_id``.
        uuids: Iterable of invoice identifiers; every one must pass
            ``self._validate_uuid`` before any request is made.
        type_acuse: Value sent as ``type`` (defaults to ``'C'``).

    Returns:
        list: One service response per UUID, in order. ``''`` is returned
        instead when any UUID is rejected or the service raises ``Fault``
        (the fault text is stored in ``self.error``).
    """
    if not all(self._validate_uuid(u) for u in uuids):
        return ''

    soap = Client(
        URL['cancel'], transport=self._transport, plugins=self._plugins)

    base = {
        'username': self._auth['USER'],
        'password': self._auth['PASS'],
        'taxpayer_id': rfc,
        'uuid': '',
        'type': type_acuse,
    }
    try:
        # Any fault discards the receipts gathered so far.
        receipts = [
            soap.service.get_receipt(**{**base, 'uuid': u}) for u in uuids
        ]
    except Fault as exc:
        self.error = str(exc)
        return ''

    return receipts
|
|
|
|
def estatus_cancel(self, uuids):
    """Query the pending-cancellation status of each given invoice.

    Calls the ``query_pending_cancellation`` operation of the ``cancel``
    endpoint once per UUID.

    Args:
        uuids: Iterable of invoice identifiers; every one must pass
            ``self._validate_uuid`` before any request is made.

    Returns:
        list: One service response per UUID, in order. ``''`` is returned
        instead when any UUID is rejected or the service raises ``Fault``
        (the fault text is stored in ``self.error``).
    """
    if not all(self._validate_uuid(u) for u in uuids):
        return ''

    soap = Client(
        URL['cancel'], transport=self._transport, plugins=self._plugins)

    base = {
        'username': self._auth['USER'],
        'password': self._auth['PASS'],
        'uuid': '',
    }
    try:
        # Any fault discards the statuses gathered so far.
        statuses = [
            soap.service.query_pending_cancellation(**{**base, 'uuid': u})
            for u in uuids
        ]
    except Fault as exc:
        self.error = str(exc)
        return ''

    return statuses
|
|
|
|
def add_token(self, rfc, email):
    """Add a new stamping token to a client.

    A reseller account is required to use this method.

    Args:
        rfc (str): The client's RFC; it must already exist.
        email (str): The client's e-mail; it works as USER when stamping.

    Returns:
        The service response, documented upstream as a dict-like object:
            'username': 'username',
            'status': True or False
            'name': 'name',
            'success': True or False
            'token': 'stamping token',
            'message': None
        ``''`` is returned when the service raises ``Fault`` (the fault
        text is stored in ``self.error``).
    """
    reseller = AUTH['RESELLER']

    soap = Client(
        URL['util'], transport=self._transport, plugins=self._plugins)
    payload = dict(
        username=reseller['USER'],
        password=reseller['PASS'],
        name=rfc,
        token_username=email,
        taxpayer_id=rfc,
        status=True,
    )
    try:
        return soap.service.add_token(**payload)
    except Fault as exc:
        self.error = str(exc)
        return ''
|
|
|
|
def get_date(self):
    """Ask the PAC for its current date and time.

    Calls the ``datetime`` operation of the ``util`` endpoint with the
    module-level ``AUTH`` credentials.

    Returns:
        The ``datetime`` attribute of the service response. ``''`` is
        returned when the service raises ``Fault`` and ``None`` when the
        response reports an error; ``self.error`` holds the reason in both
        cases.
    """
    soap = Client(
        URL['util'], transport=self._transport, plugins=self._plugins)
    try:
        response = soap.service.datetime(AUTH['USER'], AUTH['PASS'])
    except Fault as exc:
        self.error = str(exc)
        return ''

    if response.error:
        self.error = response.error
        # This path yields None, not '' as the Fault path does.
        return None

    return response.datetime
|
|
|
|
def add_client(self, rfc, type_user=False):
    """Register a new client for stamping.

    A reseller account is required to use this method.

    Args:
        rfc (str): The RFC of the new client.
        type_user (bool): ``False`` sends ``'P'`` (prepaid) and ``True``
            sends ``'O'`` (on demand).

    Returns:
        The service response, documented upstream as a dict-like object:
            'message':
                'Account Created successfully'
                'Account Already exists'
            'success': True or False
        ``''`` is returned when the service raises ``Fault`` (the fault
        text is stored in ``self.error``).
    """
    reseller = AUTH['RESELLER']

    user_type_codes = {False: 'P', True: 'O'}
    soap = Client(
        URL['client'], transport=self._transport, plugins=self._plugins)

    # Local time in ISO format, cut to whole seconds (19 characters).
    added_at = datetime.datetime.now().isoformat()[:19]
    payload = dict(
        reseller_username=reseller['USER'],
        reseller_password=reseller['PASS'],
        taxpayer_id=rfc,
        type_user=user_type_codes[type_user],
        added=added_at,
    )
    try:
        return soap.service.add(**payload)
    except Fault as exc:
        self.error = str(exc)
        return ''
|
|
|
|
def edit_client(self, rfc, status=True):
    """Change the status of an existing client.

    A reseller account is required to use this method.

    Args:
        rfc: Taxpayer id of the client to edit.
        status (bool): ``True`` sends ``'A'`` and ``False`` sends ``'S'``
            (presumably active / suspended; confirm against the PAC
            documentation).

    Returns:
        The service response, or ``''`` when the service raises ``Fault``
        (the fault text is stored in ``self.error``).
    """
    reseller = AUTH['RESELLER']

    status_codes = {False: 'S', True: 'A'}
    soap = Client(
        URL['client'], transport=self._transport, plugins=self._plugins)
    payload = dict(
        reseller_username=reseller['USER'],
        reseller_password=reseller['PASS'],
        taxpayer_id=rfc,
        status=status_codes[status],
    )
    try:
        return soap.service.edit(**payload)
    except Fault as exc:
        self.error = str(exc)
        return ''
|
|
|
|
def get_client(self, rfc):
    """Return the status of a client.

    A reseller account is required to use this method.

    Args:
        rfc (str): The issuer's RFC.

    Returns:
        The service response, documented upstream as a dict-like object:
            'message': None,
            'users': {
                'ResellerUser': [
                    {
                        'status': 'A',
                        'counter': 0,
                        'taxpayer_id': '',
                        'credit': 0
                    }
                ]
            } or None if the client does not exist
        ``''`` is returned when the service raises ``Fault`` or
        ``TransportError`` (the error text is stored in ``self.error``).
    """
    reseller = AUTH['RESELLER']

    soap = Client(
        URL['client'], transport=self._transport, plugins=self._plugins)
    payload = dict(
        reseller_username=reseller['USER'],
        reseller_password=reseller['PASS'],
        taxpayer_id=rfc,
    )

    try:
        return soap.service.get(**payload)
    except (Fault, TransportError) as exc:
        # Both failure kinds are reported the same way to the caller.
        self.error = str(exc)
        return ''
|
|
|
|
def assign_client(self, rfc, credit):
    """Add credit to an issuer.

    A reseller account is required to use this method.

    Args:
        rfc (str): The issuer's RFC; it must exist.
        credit (int): Number of folios to add.

    Returns:
        The service response, documented upstream as a dict-like object:
            'success': True or False,
            'credit': new credit after the addition, or None
            'message':
                'Success, added {credit} of credit to {RFC}'
                'RFC no encontrado'
        ``''`` is returned when the service raises ``Fault`` (the fault
        text is stored in ``self.error``).
    """
    reseller = AUTH['RESELLER']

    soap = Client(
        URL['client'], transport=self._transport, plugins=self._plugins)
    payload = dict(
        username=reseller['USER'],
        password=reseller['PASS'],
        taxpayer_id=rfc,
        credit=credit,
    )
    try:
        return soap.service.assign(**payload)
    except Fault as exc:
        self.error = str(exc)
        return ''
|
|
|
|
def client_get_timbres(self, rfc):
    """Return the credit available to a client.

    Calls the ``get`` operation of the ``client`` endpoint with the
    instance credentials and keeps the raw response in ``self.result``.

    Args:
        rfc: Taxpayer id of the client to look up.

    Returns:
        The ``credit`` of the first ``ResellerUser`` entry in the response,
        or ``0`` when the call fails or the response has no users;
        ``self.error`` holds the reason in those cases.
    """
    soap = Client(
        URL['client'], transport=self._transport, plugins=self._plugins)
    payload = dict(
        reseller_username=self._auth['USER'],
        reseller_password=self._auth['PASS'],
        taxpayer_id=rfc,
    )

    try:
        self.result = soap.service.get(**payload)
    except (Fault, TransportError) as exc:
        self.error = str(exc)
        return 0
    except ConnectionError:
        # Network-level failure: report a fixed, user-facing message.
        self.error = 'Verifica la conexión a internet'
        return 0

    if not bool(self.result.users):
        self.error = self.result.message or 'RFC no existe'
        return 0

    return self.result.users.ResellerUser[0].credit
|
|
|