diff --git a/source/finkok/finkok1.py b/source/finkok/finkok1.py
index dda9195..a86b0ef 100644
--- a/source/finkok/finkok1.py
+++ b/source/finkok/finkok1.py
@@ -38,116 +38,7 @@ TIMEOUT = 10
DEBUG_SOAP = False
-class DebugPlugin(Plugin):
-
- def _to_string(self, envelope, name):
- if DEBUG_SOAP:
- data = ET.tostring(envelope, pretty_print=True, encoding='utf-8').decode()
- path = f'/tmp/soap_{name}.xml'
- with open(path, 'w') as f:
- f.write(data)
- return
-
- def egress(self, envelope, http_headers, operation, binding_options):
- self._to_string(envelope, 'request')
- return envelope, http_headers
-
- def ingress(self, envelope, http_headers, operation):
- self._to_string(envelope, 'response')
- return envelope, http_headers
-
-
class PACFinkok(object):
- URL = {
- 'quick_stamp': False,
- 'timbra': FINKOK['WS'].format('stamp'),
- 'cancel': FINKOK['WS'].format('cancel'),
- 'client': FINKOK['WS'].format('registration'),
- 'util': FINKOK['WS'].format('utilities'),
- }
- CODE = {
- '200': 'Comprobante timbrado satisfactoriamente',
- '205': 'No Encontrado',
- '307': 'Comprobante timbrado previamente',
- '702': 'No se encontro el RFC del emisor',
- 'IP': 'Invalid Passphrase',
- 'IPMSG': 'Frase de paso inválida',
- 'NE': 'No Encontrado',
- }
-
- def __init__(self):
- self._error = ''
- self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
- self._plugins = [DebugPlugin()]
-
- def _validate_result(self, result):
- if hasattr(result, 'CodEstatus'):
- ce = result.CodEstatus
- if ce == self.CODE['IP']:
- self.error = self.CODE['IPMSG']
- return {}
-
- if self.CODE['NE'] in ce:
- self.error = 'UUID ' + self.CODE['NE']
- return {}
-
- if self.CODE['200'] != ce:
- print('CodEstatus', type(ce), ce)
- return result
-
- if hasattr(result, 'Incidencias'):
- fault = result.Incidencias.Incidencia[0]
- cod_error = fault.CodigoError.encode('utf-8')
- msg_error = fault.MensajeIncidencia.encode('utf-8')
- error = 'Error: {}\n{}'.format(cod_error, msg_error)
- self.error = self.CODE.get(cod_error, error)
- return {}
-
- return result
-
- def _get_result(self, client, method, args):
- self.error = ''
- try:
- result = getattr(client.service, method)(**args)
- except Fault as e:
- self.error = str(e)
- return {}
- except TransportError as e:
- if '413' in str(e):
- self.error = '413
Documento muy grande para timbrar'
- else:
- self.error = str(e)
- return {}
- except ConnectionError as e:
- msg = '502 - Error de conexión'
- self.error = msg
- return {}
-
- return self._validate_result(result)
-
- def cfdi_stamp(self, cfdi, auth={}):
- if not auth:
- auth = FINKOK['AUTH']
-
- method = 'timbra'
- client = Client(
- self.URL[method], transport=self._transport, plugins=self._plugins)
- args = {
- 'username': auth['USER'],
- 'password': auth['PASS'],
- 'xml': cfdi,
- }
-
- result = self._get_result(client, 'stamp', args)
- if self.error:
- return {}
-
- data = {
- 'xml': self._to_string(result.xml),
- 'uuid': result.UUID,
- 'fecha': result.Fecha,
- }
- return data
def cfdi_cancel(self, rfc, uuid, cer, key, auth={}):
if not auth:
@@ -711,19 +602,3 @@ class PACFinkok(object):
error = 'Error: {}\n{}'.format(code_error, msg_error)
self.error = self.CODE.get(code_error, error)
return {}
-
-
-def main():
- rfc = 'TEST740115999'
- # ~ rfc = 'TCM970625MB1'
- email = 'test999@empresalibre.mx'
- pac = PACFinkok()
- result = pac.client_get(rfc)
- print(result)
- result = pac.client_add_timbres(rfc, 10)
- print(result)
- return
-
-
-if __name__ == '__main__':
- main()
diff --git a/source/finkok/finkok2.py b/source/finkok/finkok2.py
index 80f5807..7efc333 100644
--- a/source/finkok/finkok2.py
+++ b/source/finkok/finkok2.py
@@ -22,142 +22,8 @@ from zeep.exceptions import Fault, TransportError
from requests.exceptions import ConnectionError
-if __name__ == '__main__':
- from configpac import DEBUG, TIMEOUT, AUTH, URL
-else:
- from .configpac import DEBUG, TIMEOUT, AUTH, URL
-
-
-log = Logger('PAC')
-#~ node = client.create_message(client.service, SERVICE, **args)
-#~ print(etree.tostring(node, pretty_print=True).decode())
-
-
class Finkok(object):
- def __init__(self, auth={}):
- self.codes = URL['codes']
- self.error = ''
- self.message = ''
- self._transport = Transport(cache=SqliteCache(), timeout=TIMEOUT)
- self._plugins = None
- self._history = None
- self.uuid = ''
- self.fecha = None
- if DEBUG:
- self._history = HistoryPlugin()
- self._plugins = [self._history]
- self._auth = AUTH
- else:
- self._auth = auth
-
- def _debug(self):
- if not DEBUG:
- return
- print('SEND', self._history.last_sent)
- print('RESULT', self._history.last_received)
- return
-
- def _check_result(self, method, result):
- # ~ print ('CODE', result.CodEstatus)
- # ~ print ('INCIDENCIAS', result.Incidencias)
- self.message = ''
- MSG = {
- 'OK': 'Comprobante timbrado satisfactoriamente',
- '307': 'Comprobante timbrado previamente',
- }
- status = result.CodEstatus
- if status is None and result.Incidencias:
- for i in result.Incidencias['Incidencia']:
- self.error += 'Error: {}\n{}\n{}'.format(
- i['CodigoError'], i['MensajeIncidencia'], i['ExtraInfo'])
- return ''
-
- if method == 'timbra' and status in (MSG['OK'], MSG['307']):
- #~ print ('UUID', result.UUID)
- #~ print ('FECHA', result.Fecha)
- if status == MSG['307']:
- self.message = MSG['307']
- tree = parseString(result.xml)
- response = tree.toprettyxml(encoding='utf-8').decode('utf-8')
- self.uuid = result.UUID
- self.fecha = result.Fecha
-
- return response
-
- def _load_file(self, path):
- try:
- with open(path, 'rb') as f:
- data = f.read()
- except Exception as e:
- self.error = str(e)
- return
- return data
-
- def _validate_xml(self, file_xml):
- if os.path.isfile(file_xml):
- try:
- with open(file_xml, 'rb') as f:
- xml = f.read()
- except Exception as e:
- self.error = str(e)
- return False, ''
- else:
- xml = file_xml.encode('utf-8')
- return True, xml
-
- def _validate_uuid(self, uuid):
- try:
- UUID(uuid)
- return True
- except ValueError:
- self.error = 'UUID no válido: {}'.format(uuid)
- return False
-
- def timbra_xml(self, file_xml):
- self.error = ''
-
- if not DEBUG and not self._auth:
- self.error = 'Sin datos para timbrar'
- return
-
- method = 'timbra'
- ok, xml = self._validate_xml(file_xml)
- if not ok:
- return ''
- client = Client(
- URL[method], transport=self._transport, plugins=self._plugins)
-
- args = {
- 'username': self._auth['USER'],
- 'password': self._auth['PASS'],
- 'xml': xml,
- }
- if URL['quick_stamp']:
- try:
- result = client.service.quick_stamp(**args)
- except Fault as e:
- self.error = str(e)
- return
- else:
- try:
- result = client.service.stamp(**args)
- except Fault as e:
- self.error = str(e)
- return
- except TransportError as e:
- if '413' in str(e):
- self.error = '413
Documento muy grande para timbrar'
- else:
- self.error = str(e)
- return
- except ConnectionError as e:
- msg = '502 - Error de conexión'
- self.error = msg
- return
-
- return self._check_result(method, result)
-
def _get_xml(self, uuid):
if not self._validate_uuid(uuid):
return ''
@@ -562,75 +428,3 @@ class Finkok(object):
return self.result.users.ResellerUser[0].credit
-
-def _get_data_sat(path):
- BF = 'string(//*[local-name()="{}"]/@{})'
- NS_CFDI = {'cfdi': 'http://www.sat.gob.mx/cfd/3'}
-
- try:
- if os.path.isfile(path):
- tree = etree.parse(path).getroot()
- else:
- tree = etree.fromstring(path.encode())
-
- data = {}
- emisor = escape(
- tree.xpath('string(//cfdi:Emisor/@rfc)', namespaces=NS_CFDI) or
- tree.xpath('string(//cfdi:Emisor/@Rfc)', namespaces=NS_CFDI)
- )
- receptor = escape(
- tree.xpath('string(//cfdi:Receptor/@rfc)', namespaces=NS_CFDI) or
- tree.xpath('string(//cfdi:Receptor/@Rfc)', namespaces=NS_CFDI)
- )
- data['total'] = tree.get('total') or tree.get('Total')
- data['emisor'] = emisor
- data['receptor'] = receptor
- data['uuid'] = tree.xpath(BF.format('TimbreFiscalDigital', 'UUID'))
- except Exception as e:
- print (e)
- return {}
-
- return '?re={emisor}&rr={receptor}&tt={total}&id={uuid}'.format(**data)
-
-
-def get_status_sat(xml):
- data = _get_data_sat(xml)
- if not data:
- return 'XML inválido'
-
- data = """
-
-
-
-
-
- {}
-
-
-
- """.format(data)
- headers = {
- 'SOAPAction': '"http://tempuri.org/IConsultaCFDIService/Consulta"',
- 'Content-type': 'text/xml; charset="UTF-8"'
- }
- URL = 'https://consultaqr.facturaelectronica.sat.gob.mx/consultacfdiservice.svc'
-
- try:
- result = requests.post(URL, data=data, headers=headers)
- tree = etree.fromstring(result.text)
- node = tree.xpath("//*[local-name() = 'Estado']")[0]
- except Exception as e:
- return 'Error: {}'.format(str(e))
-
- return node.text
-
-
-def main():
- return
-
-
-if __name__ == '__main__':
- main()