231 lines
6.2 KiB
Python
231 lines
6.2 KiB
Python
#!/usr/bin/env python3
|
|
|
|
#~ import re
|
|
#~ from xml.etree import ElementTree as ET
|
|
#~ from requests import Request, Session, exceptions
|
|
import datetime
|
|
import hashlib
|
|
import os
|
|
import requests
|
|
import time
|
|
from lxml import etree
|
|
from xml.dom.minidom import parseString
|
|
from xml.sax.saxutils import escape, unescape
|
|
from uuid import UUID
|
|
|
|
from logbook import Logger
|
|
from zeep import Client
|
|
from zeep.plugins import HistoryPlugin
|
|
from zeep.cache import SqliteCache
|
|
from zeep.transports import Transport
|
|
from zeep.exceptions import Fault, TransportError
|
|
from requests.exceptions import ConnectionError
|
|
|
|
|
|
class Finkok(object):
|
|
|
|
def _get_xml(self, uuid):
|
|
if not self._validate_uuid(uuid):
|
|
return ''
|
|
|
|
method = 'util'
|
|
client = Client(
|
|
URL[method], transport=self._transport, plugins=self._plugins)
|
|
|
|
args = {
|
|
'username': self._auth['USER'],
|
|
'password': self._auth['PASS'],
|
|
'uuid': uuid,
|
|
'taxpayer_id': self.rfc,
|
|
'invoice_type': 'I',
|
|
}
|
|
try:
|
|
result = client.service.get_xml(**args)
|
|
except Fault as e:
|
|
self.error = str(e)
|
|
return ''
|
|
except TransportError as e:
|
|
self.error = str(e)
|
|
return ''
|
|
|
|
if result.error:
|
|
self.error = result.error
|
|
return ''
|
|
|
|
tree = parseString(result.xml)
|
|
xml = tree.toprettyxml(encoding='utf-8').decode('utf-8')
|
|
return xml
|
|
|
|
def recupera_xml(self, file_xml='', uuid=''):
|
|
self.error = ''
|
|
if uuid:
|
|
return self._get_xml(uuid)
|
|
|
|
method = 'timbra'
|
|
ok, xml = self._validate_xml(file_xml)
|
|
if not ok:
|
|
return ''
|
|
client = Client(
|
|
URL[method], transport=self._transport, plugins=self._plugins)
|
|
try:
|
|
result = client.service.stamped(
|
|
xml, self._auth['user'], self._auth['pass'])
|
|
except Fault as e:
|
|
self.error = str(e)
|
|
return ''
|
|
|
|
return self._check_result(method, result)
|
|
|
|
def estatus_xml(self, uuid):
|
|
method = 'timbra'
|
|
if not self._validate_uuid(uuid):
|
|
return ''
|
|
client = Client(
|
|
URL[method], transport=self._transport, plugins=self._plugins)
|
|
try:
|
|
result = client.service.query_pending(
|
|
self._auth['USER'], self._auth['PASS'], uuid)
|
|
return result.status
|
|
except Fault as e:
|
|
self.error = str(e)
|
|
return ''
|
|
|
|
def get_acuse(self, rfc, uuids, type_acuse='C'):
|
|
for u in uuids:
|
|
if not self._validate_uuid(u):
|
|
return ''
|
|
|
|
method = 'cancel'
|
|
client = Client(
|
|
URL[method], transport=self._transport, plugins=self._plugins)
|
|
|
|
args = {
|
|
'username': self._auth['USER'],
|
|
'password': self._auth['PASS'],
|
|
'taxpayer_id': rfc,
|
|
'uuid': '',
|
|
'type': type_acuse,
|
|
}
|
|
try:
|
|
result = []
|
|
for u in uuids:
|
|
args['uuid'] = u
|
|
r = client.service.get_receipt(**args)
|
|
result.append(r)
|
|
except Fault as e:
|
|
self.error = str(e)
|
|
return ''
|
|
|
|
return result
|
|
|
|
def estatus_cancel(self, uuids):
|
|
for u in uuids:
|
|
if not self._validate_uuid(u):
|
|
return ''
|
|
|
|
method = 'cancel'
|
|
client = Client(
|
|
URL[method], transport=self._transport, plugins=self._plugins)
|
|
|
|
args = {
|
|
'username': self._auth['USER'],
|
|
'password': self._auth['PASS'],
|
|
'uuid': '',
|
|
}
|
|
try:
|
|
result = []
|
|
for u in uuids:
|
|
args['uuid'] = u
|
|
r = client.service.query_pending_cancellation(**args)
|
|
result.append(r)
|
|
except Fault as e:
|
|
self.error = str(e)
|
|
return ''
|
|
|
|
return result
|
|
|
|
def get_date(self):
|
|
method = 'util'
|
|
client = Client(
|
|
URL[method], transport=self._transport, plugins=self._plugins)
|
|
try:
|
|
result = client.service.datetime(AUTH['USER'], AUTH['PASS'])
|
|
except Fault as e:
|
|
self.error = str(e)
|
|
return ''
|
|
|
|
if result.error:
|
|
self.error = result.error
|
|
return
|
|
|
|
return result.datetime
|
|
|
|
def edit_client(self, rfc, status=True):
|
|
"""
|
|
Se requiere cuenta de reseller para usar este método
|
|
status = 'A' or 'S'
|
|
"""
|
|
auth = AUTH['RESELLER']
|
|
|
|
sv = {False: 'S', True: 'A'}
|
|
method = 'client'
|
|
client = Client(
|
|
URL[method], transport=self._transport, plugins=self._plugins)
|
|
args = {
|
|
'reseller_username': auth['USER'],
|
|
'reseller_password': auth['PASS'],
|
|
'taxpayer_id': rfc,
|
|
'status': sv[status],
|
|
}
|
|
try:
|
|
result = client.service.edit(**args)
|
|
except Fault as e:
|
|
self.error = str(e)
|
|
return ''
|
|
|
|
return result
|
|
|
|
def get_client(self, rfc):
|
|
"""Regresa el estatus del cliente
|
|
.
|
|
Se requiere cuenta de reseller para usar este método
|
|
|
|
Args:
|
|
rfc (str): El RFC del emisor
|
|
|
|
Returns:
|
|
dict
|
|
'message': None,
|
|
'users': {
|
|
'ResellerUser': [
|
|
{
|
|
'status': 'A',
|
|
'counter': 0,
|
|
'taxpayer_id': '',
|
|
'credit': 0
|
|
}
|
|
]
|
|
} or None si no existe
|
|
"""
|
|
auth = AUTH['RESELLER']
|
|
|
|
method = 'client'
|
|
client = Client(
|
|
URL[method], transport=self._transport, plugins=self._plugins)
|
|
args = {
|
|
'reseller_username': auth['USER'],
|
|
'reseller_password': auth['PASS'],
|
|
'taxpayer_id': rfc,
|
|
}
|
|
|
|
try:
|
|
result = client.service.get(**args)
|
|
except Fault as e:
|
|
self.error = str(e)
|
|
return ''
|
|
except TransportError as e:
|
|
self.errorcancel = str(e)
|
|
return ''
|
|
|
|
return result
|