From e432200a35bb6cfda44e78343013ed3e11454494 Mon Sep 17 00:00:00 2001 From: Mauricio Baeza Date: Fri, 1 Jan 2021 18:39:34 -0600 Subject: [PATCH] Clean review methods --- source/finkok/finkok1.py | 125 ------------------------ source/finkok/finkok2.py | 206 --------------------------------------- 2 files changed, 331 deletions(-) diff --git a/source/finkok/finkok1.py b/source/finkok/finkok1.py index dda9195..a86b0ef 100644 --- a/source/finkok/finkok1.py +++ b/source/finkok/finkok1.py @@ -38,116 +38,7 @@ TIMEOUT = 10 DEBUG_SOAP = False -class DebugPlugin(Plugin): - - def _to_string(self, envelope, name): - if DEBUG_SOAP: - data = ET.tostring(envelope, pretty_print=True, encoding='utf-8').decode() - path = f'/tmp/soap_{name}.xml' - with open(path, 'w') as f: - f.write(data) - return - - def egress(self, envelope, http_headers, operation, binding_options): - self._to_string(envelope, 'request') - return envelope, http_headers - - def ingress(self, envelope, http_headers, operation): - self._to_string(envelope, 'response') - return envelope, http_headers - - class PACFinkok(object): - URL = { - 'quick_stamp': False, - 'timbra': FINKOK['WS'].format('stamp'), - 'cancel': FINKOK['WS'].format('cancel'), - 'client': FINKOK['WS'].format('registration'), - 'util': FINKOK['WS'].format('utilities'), - } - CODE = { - '200': 'Comprobante timbrado satisfactoriamente', - '205': 'No Encontrado', - '307': 'Comprobante timbrado previamente', - '702': 'No se encontro el RFC del emisor', - 'IP': 'Invalid Passphrase', - 'IPMSG': 'Frase de paso inválida', - 'NE': 'No Encontrado', - } - - def __init__(self): - self._error = '' - self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT) - self._plugins = [DebugPlugin()] - - def _validate_result(self, result): - if hasattr(result, 'CodEstatus'): - ce = result.CodEstatus - if ce == self.CODE['IP']: - self.error = self.CODE['IPMSG'] - return {} - - if self.CODE['NE'] in ce: - self.error = 'UUID ' + self.CODE['NE'] - return {} - - if self.CODE['200'] != ce: - print('CodEstatus', type(ce), ce) - return result - - if hasattr(result, 'Incidencias'): - fault = result.Incidencias.Incidencia[0] - cod_error = fault.CodigoError.encode('utf-8') - msg_error = fault.MensajeIncidencia.encode('utf-8') - error = 'Error: {}\n{}'.format(cod_error, msg_error) - self.error = self.CODE.get(cod_error, error) - return {} - - return result - - def _get_result(self, client, method, args): - self.error = '' - try: - result = getattr(client.service, method)(**args) - except Fault as e: - self.error = str(e) - return {} - except TransportError as e: - if '413' in str(e): - self.error = '413

Documento muy grande para timbrar' - else: - self.error = str(e) - return {} - except ConnectionError as e: - msg = '502 - Error de conexión' - self.error = msg - return {} - - return self._validate_result(result) - - def cfdi_stamp(self, cfdi, auth={}): - if not auth: - auth = FINKOK['AUTH'] - - method = 'timbra' - client = Client( - self.URL[method], transport=self._transport, plugins=self._plugins) - args = { - 'username': auth['USER'], - 'password': auth['PASS'], - 'xml': cfdi, - } - - result = self._get_result(client, 'stamp', args) - if self.error: - return {} - - data = { - 'xml': self._to_string(result.xml), - 'uuid': result.UUID, - 'fecha': result.Fecha, - } - return data def cfdi_cancel(self, rfc, uuid, cer, key, auth={}): if not auth: @@ -711,19 +602,3 @@ class PACFinkok(object): error = 'Error: {}\n{}'.format(code_error, msg_error) self.error = self.CODE.get(code_error, error) return {} - - -def main(): - rfc = 'TEST740115999' - # ~ rfc = 'TCM970625MB1' - email = 'test999@empresalibre.mx' - pac = PACFinkok() - result = pac.client_get(rfc) - print(result) - result = pac.client_add_timbres(rfc, 10) - print(result) - return - - -if __name__ == '__main__': - main() diff --git a/source/finkok/finkok2.py b/source/finkok/finkok2.py index 80f5807..7efc333 100644 --- a/source/finkok/finkok2.py +++ b/source/finkok/finkok2.py @@ -22,142 +22,8 @@ from zeep.exceptions import Fault, TransportError from requests.exceptions import ConnectionError -if __name__ == '__main__': - from configpac import DEBUG, TIMEOUT, AUTH, URL -else: - from .configpac import DEBUG, TIMEOUT, AUTH, URL - - -log = Logger('PAC') -#~ node = client.create_message(client.service, SERVICE, **args) -#~ print(etree.tostring(node, pretty_print=True).decode()) - - class Finkok(object): - def __init__(self, auth={}): - self.codes = URL['codes'] - self.error = '' - self.message = '' - self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT) - self._plugins = None - self._history = None - self.uuid = '' - self.fecha = None - if DEBUG: - self._history = HistoryPlugin() - self._plugins = [self._history] - self._auth = AUTH - else: - self._auth = auth - - def _debug(self): - if not DEBUG: - return - print('SEND', self._history.last_sent) - print('RESULT', self._history.last_received) - return - - def _check_result(self, method, result): - # ~ print ('CODE', result.CodEstatus) - # ~ print ('INCIDENCIAS', result.Incidencias) - self.message = '' - MSG = { - 'OK': 'Comprobante timbrado satisfactoriamente', - '307': 'Comprobante timbrado previamente', - } - status = result.CodEstatus - if status is None and result.Incidencias: - for i in result.Incidencias['Incidencia']: - self.error += 'Error: {}\n{}\n{}'.format( - i['CodigoError'], i['MensajeIncidencia'], i['ExtraInfo']) - return '' - - if method == 'timbra' and status in (MSG['OK'], MSG['307']): - #~ print ('UUID', result.UUID) - #~ print ('FECHA', result.Fecha) - if status == MSG['307']: - self.message = MSG['307'] - tree = parseString(result.xml) - response = tree.toprettyxml(encoding='utf-8').decode('utf-8') - self.uuid = result.UUID - self.fecha = result.Fecha - - return response - - def _load_file(self, path): - try: - with open(path, 'rb') as f: - data = f.read() - except Exception as e: - self.error = str(e) - return - return data - - def _validate_xml(self, file_xml): - if os.path.isfile(file_xml): - try: - with open(file_xml, 'rb') as f: - xml = f.read() - except Exception as e: - self.error = str(e) - return False, '' - else: - xml = file_xml.encode('utf-8') - return True, xml - - def _validate_uuid(self, uuid): - try: - UUID(uuid) - return True - except ValueError: - self.error = 'UUID no válido: {}'.format(uuid) - return False - - def timbra_xml(self, file_xml): - self.error = '' - - if not DEBUG and not self._auth: - self.error = 'Sin datos para timbrar' - return - - method = 'timbra' - ok, xml = self._validate_xml(file_xml) - if not ok: - return '' - client = Client( - URL[method], transport=self._transport, plugins=self._plugins) - - args = { - 'username': self._auth['USER'], - 'password': self._auth['PASS'], - 'xml': xml, - } - if URL['quick_stamp']: - try: - result = client.service.quick_stamp(**args) - except Fault as e: - self.error = str(e) - return - else: - try: - result = client.service.stamp(**args) - except Fault as e: - self.error = str(e) - return - except TransportError as e: - if '413' in str(e): - self.error = '413

Documento muy grande para timbrar' - else: - self.error = str(e) - return - except ConnectionError as e: - msg = '502 - Error de conexión' - self.error = msg - return - - return self._check_result(method, result) - def _get_xml(self, uuid): if not self._validate_uuid(uuid): return '' @@ -562,75 +428,3 @@ class Finkok(object): return self.result.users.ResellerUser[0].credit - -def _get_data_sat(path): - BF = 'string(//*[local-name()="{}"]/@{})' - NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'} - - try: - if os.path.isfile(path): - tree = etree.parse(path).getroot() - else: - tree = etree.fromstring(path.encode()) - - data = {} - emisor = escape( - tree.xpath('string(//cfdi:Emisor/@rfc)', namespaces=NS_CFDI) or - tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI) - ) - receptor = escape( - tree.xpath('string(//cfdi:Receptor/@rfc)', namespaces=NS_CFDI) or - tree.xpath('string(//cfdi:Receptor/@Rfc)', namespaces=NS_CFDI) - ) - data['total'] = tree.get('total') or tree.get('Total') - data['emisor'] = emisor - data['receptor'] = receptor - data['uuid'] = tree.xpath(BF.format('TimbreFiscalDigital', 'UUID')) - except Exception as e: - print (e) - return {} - - return '?re={emisor}&rr={receptor}&tt={total}&id={uuid}'.format(**data) - - -def get_status_sat(xml): - data = _get_data_sat(xml) - if not data: - return 'XML inválido' - - data = """ - - - - - - {} - - - - """.format(data) - headers = { - 'SOAPAction': '"http://tempuri.org/IConsultaCFDIService/Consulta"', - 'Content-type': 'text/xml; charset="UTF-8"' - } - URL = 'https://consultaqr.facturaelectronica.sat.gob.mx/consultacfdiservice.svc' - - try: - result = requests.post(URL, data=data, headers=headers) - tree = etree.fromstring(result.text) - node = tree.xpath("//*[local-name() = 'Estado']")[0] - except Exception as e: - return 'Error: {}'.format(str(e)) - - return node.text - - -def main(): - return - - -if __name__ == '__main__': - main()