#!/usr/bin/env python
"""Client wrappers for the Ecodex and Finkok web services.

The module also exposes ``get_status_sat`` which queries the SAT
``ConsultaCFDIService`` for the status of a CFDI document.

All endpoints, credentials and flags come from ``.configpac``
(``URL``, ``AUTH``, ``TIMEOUT``, ``DEBUG``).
"""

import datetime
import hashlib
import os
import time
from uuid import UUID
from xml.dom.minidom import parseString
from xml.sax.saxutils import escape, unescape

import requests
from logbook import Logger
from lxml import etree
from zeep import Client
from zeep.cache import SqliteCache
from zeep.exceptions import Fault, TransportError
from zeep.plugins import HistoryPlugin
from zeep.transports import Transport

from .configpac import DEBUG, TIMEOUT, AUTH, URL


log = Logger('PAC')

# Debugging tip: to inspect the SOAP envelope zeep would send for a call:
#   node = client.create_message(client.service, SERVICE, **args)
#   print(etree.tostring(node, pretty_print=True).decode())


def _parse_xml(source):
    """Return the root element of *source*.

    *source* may be a path to an existing file or the XML document itself
    (``str`` or ``bytes``).  Text input is encoded to UTF-8 before parsing
    because lxml refuses ``str`` documents that carry an encoding
    declaration.
    """
    if os.path.isfile(source):
        return etree.parse(source).getroot()
    if isinstance(source, str):
        source = source.encode('utf-8')
    return etree.fromstring(source)


def _pretty_xml(raw):
    """Return *raw* XML re-serialised by minidom as an indented UTF-8 string."""
    return parseString(raw).toprettyxml(encoding='utf-8').decode('utf-8')


class Ecodex(object):
    """Client for the Ecodex services configured in ``URL``.

    After a failed call ``self.error`` holds the text of the SOAP fault.
    """

    def __init__(self):
        self.codes = URL['codes']
        self.error = ''
        self.message = ''
        self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
        self._plugins = None
        self._history = None
        if DEBUG:
            # Keep the last request/response around for inspection.
            self._history = HistoryPlugin()
            self._plugins = [self._history]

    def _soap_client(self, key):
        """Build a zeep client for the WSDL stored under ``URL[key]``."""
        return Client(
            URL[key], transport=self._transport, plugins=self._plugins)

    def _get_token(self, rfc):
        """Return the SHA-1 hex digest of ``'<AUTH ID>|<service token>'``.

        The service token is requested with ``ObtenerToken`` for *rfc*.
        On a SOAP fault the error is stored and logged and ``''`` is
        returned.
        """
        client = self._soap_client('seguridad')
        try:
            result = client.service.ObtenerToken(rfc, self._get_epoch())
        except Fault as e:
            self.error = str(e)
            log.error(self.error)
            return ''

        s = '{}|{}'.format(AUTH['ID'], result.Token)
        return hashlib.sha1(s.encode()).hexdigest()

    def _get_token_rest(self, rfc):
        """Return ``(hashed service token, access token)`` from the REST API.

        Exceptions raised by ``requests`` or by JSON decoding are not
        caught here.
        """
        data = {
            'rfc': rfc,
            'grant_type': 'authorization_token',
        }
        headers = {'Content-type': 'application/x-www-form-urlencoded'}
        # Without a timeout requests may block indefinitely.
        result = requests.post(
            URL['token'], data=data, headers=headers, timeout=TIMEOUT)
        data = result.json()
        s = '{}|{}'.format(AUTH['ID'], data['service_token'])
        return hashlib.sha1(s.encode()).hexdigest(), data['access_token']

    def _validate_xml(self, xml):
        """Parse *xml* (path or document) and build the ``TimbraXML`` arguments.

        The transaction id is the epoch of the document's ``Fecha``
        attribute and the RFC is read from ``cfdi:Emisor/@Rfc``.
        """
        NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'}
        tree = _parse_xml(xml)
        fecha = tree.get('Fecha')
        rfc = tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI)
        return {
            'ComprobanteXML': etree.tostring(tree).decode(),
            'RFC': rfc,
            'Token': self._get_token(rfc),
            'TransaccionID': self._get_epoch(fecha),
        }

    def _get_by_hash(self, sh, rfc):
        """Request the document identified by hash *sh* and print the reply.

        Only prints the JSON body on HTTP 200; always returns ``None``.
        """
        token, access_token = self._get_token_rest(rfc)
        url = URL['hash'].format(sh)
        headers = {
            'Authorization': 'Bearer {}'.format(access_token),
            'X-Auth-Token': token,
        }
        result = requests.get(url, headers=headers, timeout=TIMEOUT)
        if result.status_code == 200:
            print (result.json())
        return

    def timbra_xml(self, xml):
        """Send *xml* to ``TimbraXML`` and return the resulting XML, indented.

        If the fault text contains ``codes['HASH']`` the result of
        ``_get_by_hash`` is returned instead; any other fault stores the
        error and returns ``''``.
        """
        data = self._validate_xml(xml)
        client = self._soap_client('timbra')
        try:
            result = client.service.TimbraXML(**data)
        except Fault as e:
            error = str(e)
            if self.codes['HASH'] in error:
                # The hash is taken from the fourth word of the fault text.
                sh = error.split(' ')[3]
                return self._get_by_hash(sh[:40], data['RFC'])
            self.error = error
            return ''

        return _pretty_xml(result.ComprobanteXML.DatosXML)

    def _get_epoch(self, date=None):
        """Return an integer epoch (local time, via ``time.mktime``).

        *date* is parsed as ``%Y-%m-%dT%H:%M:%S`` when it is a string;
        for any other value the current time is used.
        """
        if isinstance(date, str):
            parsed = time.strptime(date, '%Y-%m-%dT%H:%M:%S')
        else:
            parsed = datetime.datetime.now().timetuple()
        return int(time.mktime(parsed))

    def estatus_cuenta(self, rfc):
        """Return ``result.Estatus`` from ``EstatusCuenta`` for *rfc*.

        Returns ``{}`` when no token could be obtained and ``None`` when
        the call raises a SOAP fault (the fault is logged).
        """
        # Codes:
        #   100 = Account found
        #   101 = RFC not registered in the ECODEX system
        token = self._get_token(rfc)
        if not token:
            return {}

        data = {
            'RFC': rfc,
            'Token': token,
            'TransaccionID': self._get_epoch()
        }
        client = self._soap_client('clients')
        try:
            result = client.service.EstatusCuenta(**data)
        except Fault as e:
            log.error(str(e))
            return
        return result.Estatus


class Finkok(object):
    """Client for the Finkok services configured in ``URL``.

    Failed calls store a message in ``self.error``.  A successful stamp
    stores ``self.uuid`` and ``self.fecha``.  ``self.rfc`` is the taxpayer
    id sent by ``_get_xml`` and must be set by the caller beforehand.
    """

    def __init__(self):
        self.codes = URL['codes']
        self.error = ''
        self.message = ''
        self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
        self._plugins = None
        self._history = None
        self.uuid = ''
        self.fecha = None
        # Read by _get_xml; initialised so that it never raises
        # AttributeError when the caller has not assigned it.
        self.rfc = ''
        if DEBUG:
            self._history = HistoryPlugin()
            self._plugins = [self._history]

    def _soap_client(self, key):
        """Build a zeep client for the WSDL stored under ``URL[key]``."""
        return Client(
            URL[key], transport=self._transport, plugins=self._plugins)

    def _debug(self):
        """Print the last SOAP request and response when ``DEBUG`` is set."""
        if not DEBUG:
            return
        print('SEND', self._history.last_sent)
        print('RESULT', self._history.last_received)
        return

    def _check_result(self, method, result):
        """Turn a stamping *result* into pretty XML, or record its errors.

        Returns ``''`` when the service reported incidents (appended to
        ``self.error``) or when the status is not one of the accepted
        messages.  On success ``self.uuid`` and ``self.fecha`` are updated.
        """
        self.message = ''
        MSG = {
            'OK': 'Comprobante timbrado satisfactoriamente',
            '307': 'Comprobante timbrado previamente',
        }
        status = result.CodEstatus
        if status is None and result.Incidencias:
            for i in result.Incidencias['Incidencia']:
                self.error += 'Error: {}\n{}'.format(
                    i['CodigoError'], i['MensajeIncidencia'])
            return ''

        # Default so an unrecognised status yields an empty result
        # rather than an unbound local.
        response = ''
        if method == 'timbra' and status in (MSG['OK'], MSG['307']):
            if status == MSG['307']:
                self.message = MSG['307']
            response = _pretty_xml(result.xml)
            self.uuid = result.UUID
            self.fecha = result.Fecha

        return response

    def _load_file(self, path):
        """Return the bytes of *path*, or ``None`` (error stored) on failure."""
        try:
            with open(path, 'rb') as f:
                data = f.read()
        except Exception as e:
            self.error = str(e)
            return
        return data

    def _validate_xml(self, file_xml):
        """Return ``(ok, xml_bytes)`` for a path or an XML string.

        When *file_xml* is an existing file its bytes are read; otherwise
        the value itself is UTF-8 encoded.  A read failure stores the
        error and returns ``(False, '')``.
        """
        if not os.path.isfile(file_xml):
            return True, file_xml.encode('utf-8')

        try:
            with open(file_xml, 'rb') as f:
                xml = f.read()
        except Exception as e:
            self.error = str(e)
            return False, ''
        return True, xml

    def _validate_uuid(self, uuid):
        """Return whether *uuid* parses as a UUID; store an error if not."""
        try:
            UUID(uuid)
            return True
        except ValueError:
            self.error = 'UUID no válido: {}'.format(uuid)
            return False

    def timbra_xml(self, file_xml):
        """Stamp *file_xml* (path or XML string) and return the pretty XML.

        Uses ``quick_stamp`` when ``URL['quick_stamp']`` is truthy,
        ``stamp`` otherwise.  Returns ``''`` if the input cannot be read
        and ``None`` on a SOAP fault (``self.error`` is set).
        """
        self.error = ''
        method = 'timbra'
        ok, xml = self._validate_xml(file_xml)
        if not ok:
            return ''

        client = self._soap_client(method)
        args = {
            'username': AUTH['USER'],
            'password': AUTH['PASS'],
            'xml': xml,
        }
        service_name = 'quick_stamp' if URL['quick_stamp'] else 'stamp'
        try:
            result = getattr(client.service, service_name)(**args)
        except Fault as e:
            self.error = str(e)
            return

        return self._check_result(method, result)

    def _get_xml(self, uuid):
        """Fetch the XML for *uuid* with ``get_xml``; ``''`` on any error.

        ``self.rfc`` is sent as ``taxpayer_id``.
        """
        if not self._validate_uuid(uuid):
            return ''

        client = self._soap_client('util')
        args = {
            'username': AUTH['USER'],
            'password': AUTH['PASS'],
            'uuid': uuid,
            'taxpayer_id': self.rfc,
            'invoice_type': 'I',
        }
        try:
            result = client.service.get_xml(**args)
        except (Fault, TransportError) as e:
            self.error = str(e)
            return ''

        if result.error:
            self.error = result.error
            return ''

        return _pretty_xml(result.xml)

    def recupera_xml(self, file_xml='', uuid=''):
        """Recover an already stamped document.

        With *uuid* the document is fetched through ``_get_xml``;
        otherwise *file_xml* is sent to the ``stamped`` service.
        Returns ``''`` on error.
        """
        self.error = ''
        if uuid:
            return self._get_xml(uuid)

        method = 'timbra'
        ok, xml = self._validate_xml(file_xml)
        if not ok:
            return ''

        client = self._soap_client(method)
        try:
            result = client.service.stamped(xml, AUTH['USER'], AUTH['PASS'])
        except Fault as e:
            self.error = str(e)
            return ''

        return self._check_result(method, result)

    def estatus_xml(self, uuid):
        """Return ``result.status`` from ``query_pending``; ``''`` on error."""
        if not self._validate_uuid(uuid):
            return ''

        client = self._soap_client('timbra')
        try:
            result = client.service.query_pending(
                AUTH['USER'], AUTH['PASS'], uuid)
            return result.status
        except Fault as e:
            self.error = str(e)
            return ''

    def cancel_xml(self, rfc, uuids, path_cer, path_key):
        """Cancel *uuids* for taxpayer *rfc* using the given cer/key files.

        Returns the service result, or ``''`` (with ``self.error`` set)
        when a UUID is invalid, a file cannot be read, the call faults, or
        the status contains ``codes['205']``.
        """
        for u in uuids:
            if not self._validate_uuid(u):
                return ''

        cer = self._load_file(path_cer)
        key = self._load_file(path_key)
        if cer is None or key is None:
            # _load_file already stored the reason in self.error.
            return ''

        client = self._soap_client('cancel')
        uuid_type = client.get_type('ns1:UUIDS')
        sa = client.get_type('ns0:stringArray')

        args = {
            'UUIDS': uuid_type(uuids=sa(string=uuids)),
            'username': AUTH['USER'],
            'password': AUTH['PASS'],
            'taxpayer_id': rfc,
            'cer': cer,
            'key': key,
            'store_pending': True,
        }
        try:
            result = client.service.cancel(**args)
        except Fault as e:
            self.error = str(e)
            return ''

        if result.CodEstatus and self.codes['205'] in result.CodEstatus:
            self.error = result.CodEstatus
            return ''

        return result

    def cancel_signature(self, file_xml):
        """Send an already built cancellation XML to ``cancel_signature``.

        *file_xml* may be a path or the document itself.  Parsing errors
        and SOAP faults are not caught here.
        """
        xml = etree.tostring(_parse_xml(file_xml))

        client = self._soap_client('cancel')
        args = {
            'username': AUTH['USER'],
            'password': AUTH['PASS'],
            'xml': xml,
            'store_pending': True,
        }
        return client.service.cancel_signature(**args)

    def get_acuse(self, rfc, uuids, type_acuse='C'):
        """Return the ``get_receipt`` result for every UUID in *uuids*.

        Returns ``''`` if any UUID is invalid or any call faults.
        """
        for u in uuids:
            if not self._validate_uuid(u):
                return ''

        client = self._soap_client('cancel')
        args = {
            'username': AUTH['USER'],
            'password': AUTH['PASS'],
            'taxpayer_id': rfc,
            'uuid': '',
            'type': type_acuse,
        }
        result = []
        try:
            for u in uuids:
                args['uuid'] = u
                result.append(client.service.get_receipt(**args))
        except Fault as e:
            self.error = str(e)
            return ''

        return result

    def estatus_cancel(self, uuids):
        """Return the ``query_pending_cancellation`` result for each UUID.

        Returns ``''`` if any UUID is invalid or any call faults.
        """
        for u in uuids:
            if not self._validate_uuid(u):
                return ''

        client = self._soap_client('cancel')
        args = {
            'username': AUTH['USER'],
            'password': AUTH['PASS'],
            'uuid': '',
        }
        result = []
        try:
            for u in uuids:
                args['uuid'] = u
                result.append(
                    client.service.query_pending_cancellation(**args))
        except Fault as e:
            self.error = str(e)
            return ''

        return result

    def add_token(self, rfc, email):
        """Call ``add_token`` for *rfc* / *email*.

        A reseller account is required to use this method.
        SOAP faults are not caught here.
        """
        client = self._soap_client('util')
        args = {
            'username': AUTH['USER'],
            'password': AUTH['PASS'],
            'name': rfc,
            'token_username': email,
            'taxpayer_id': rfc,
            'status': True,
        }
        return client.service.add_token(**args)

    def get_date(self):
        """Return ``result.datetime`` from the ``datetime`` service.

        Returns ``''`` on a SOAP fault and ``None`` when the service
        reports an error; ``self.error`` is set in both cases.
        """
        client = self._soap_client('util')
        try:
            result = client.service.datetime(AUTH['USER'], AUTH['PASS'])
        except Fault as e:
            self.error = str(e)
            return ''

        if result.error:
            self.error = result.error
            return

        return result.datetime

    def add_client(self, rfc, type_user=False):
        """Register *rfc* through the ``add`` client service.

        A reseller account is required to use this method.
        type_user: False == 'P' == prepaid, True == 'O' == on demand.
        Returns ``''`` on a SOAP fault.
        """
        tu = {False: 'P', True: 'O'}
        client = self._soap_client('client')
        args = {
            'reseller_username': AUTH['USER'],
            'reseller_password': AUTH['PASS'],
            'taxpayer_id': rfc,
            'type_user': tu[type_user],
            # ISO timestamp truncated to seconds.
            'added': datetime.datetime.now().isoformat()[:19],
        }
        try:
            result = client.service.add(**args)
        except Fault as e:
            self.error = str(e)
            return ''

        return result

    def edit_client(self, rfc, status=True):
        """Change the status of *rfc* through the ``edit`` client service.

        A reseller account is required to use this method.
        status: True sends 'A', False sends 'S'.
        Returns ``''`` on a SOAP fault.
        """
        sv = {False: 'S', True: 'A'}
        client = self._soap_client('client')
        args = {
            'reseller_username': AUTH['USER'],
            'reseller_password': AUTH['PASS'],
            'taxpayer_id': rfc,
            'status': sv[status],
        }
        try:
            result = client.service.edit(**args)
        except Fault as e:
            self.error = str(e)
            return ''

        return result

    def get_client(self, rfc):
        """Return the ``get`` client service result for *rfc*.

        A reseller account is required to use this method.
        Returns ``''`` on a SOAP fault or transport error.
        """
        client = self._soap_client('client')
        args = {
            'reseller_username': AUTH['USER'],
            'reseller_password': AUTH['PASS'],
            'taxpayer_id': rfc,
        }
        try:
            result = client.service.get(**args)
        except (Fault, TransportError) as e:
            self.error = str(e)
            return ''

        return result

    def assign_client(self, rfc, credit):
        """Assign *credit* to *rfc* through the ``assign`` client service.

        A reseller account is required to use this method.
        Returns ``''`` on a SOAP fault.
        """
        client = self._soap_client('client')
        args = {
            'username': AUTH['USER'],
            'password': AUTH['PASS'],
            'taxpayer_id': rfc,
            'credit': credit,
        }
        try:
            result = client.service.assign(**args)
        except Fault as e:
            self.error = str(e)
            return ''

        return result


def _get_data_sat(path):
    """Build the ``?re=..&rr=..&tt=..&id=..`` expression for a CFDI.

    *path* may be a file path or the XML document.  Both attribute
    spellings (``rfc``/``Rfc``, ``total``/``Total``) are tried.  Any
    failure is printed and ``{}`` is returned.
    """
    BF = 'string(//*[local-name()="{}"]/@{})'
    NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'}

    def _rfc(node):
        # XML-escape the value, trying the lowercase attribute first.
        return escape(
            tree.xpath(
                'string(//cfdi:{}/@rfc)'.format(node), namespaces=NS_CFDI) or
            tree.xpath(
                'string(//cfdi:{}/@Rfc)'.format(node), namespaces=NS_CFDI)
        )

    try:
        tree = _parse_xml(path)
        data = {
            'total': tree.get('total') or tree.get('Total'),
            'emisor': _rfc('Emisor'),
            'receptor': _rfc('Receptor'),
            'uuid': tree.xpath(BF.format('TimbreFiscalDigital', 'UUID')),
        }
    except Exception as e:
        print (e)
        return {}

    return '?re={emisor}&rr={receptor}&tt={total}&id={uuid}'.format(**data)


def get_status_sat(xml):
    """Return the ``Estado`` reported by the SAT ``Consulta`` service.

    Returns ``None`` when the document data cannot be extracted and
    ``'Error: <message>'`` when the service call fails.
    """
    data = _get_data_sat(xml)
    if not data:
        return

    # Local name chosen so the imported URL config mapping is not shadowed.
    wsdl = 'https://consultaqr.facturaelectronica.sat.gob.mx/ConsultaCFDIService.svc?wsdl'
    client = Client(wsdl, transport=Transport(cache=SqliteCache()))
    try:
        result = client.service.Consulta(expresionImpresa=data)
    except Exception as e:
        return 'Error: {}'.format(str(e))

    return result.Estado


def main():
    """Script entry point; currently does nothing."""
    return


if __name__ == '__main__':
    main()