Compare commits
4 Commits
Author | SHA1 | Date |
---|---|---|
el Mau | 58eb4d6727 | |
el Mau | e79967aef3 | |
el Mau | 378a05df7d | |
el Mau | 21f87e52c6 |
|
@ -1,3 +1,3 @@
|
|||
falcon
|
||||
requests
|
||||
pyOpenSSL
|
||||
python-dateutil
|
||||
|
|
|
@ -6,3 +6,6 @@ RUTA_FIEL = ''
|
|||
|
||||
# ~ Nombre predeterminado de los archivos FIEL
|
||||
NOMBRE_FIEL = 'fiel'
|
||||
|
||||
|
||||
TIMEOUT = 10
|
||||
|
|
|
@ -14,11 +14,11 @@ class JSONTranslator():
|
|||
|
||||
class AppApi(object):
|
||||
|
||||
def on_get(self, req, resp, rfc, cfdi):
|
||||
resp.context['result'] = get_uuid(rfc, cfdi)
|
||||
def on_get(self, req, resp, rfc, tipo, cfdi):
|
||||
resp.context['result'] = get_uuid(rfc, tipo, cfdi)
|
||||
resp.status = falcon.HTTP_200
|
||||
|
||||
|
||||
app = falcon.App(middleware=[JSONTranslator()])
|
||||
api = AppApi()
|
||||
app.add_route('/api/{rfc}/{cfdi}', api)
|
||||
app.add_route('/api/{rfc}/{tipo}/{cfdi}', api)
|
||||
|
|
|
@ -0,0 +1,123 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import base64
|
||||
import datetime
|
||||
import re
|
||||
from subprocess import check_output
|
||||
from dateutil.parser import parse
|
||||
|
||||
|
||||
OPENSSL = 'openssl'
|
||||
|
||||
|
||||
class SATCertificate(object):
|
||||
|
||||
def __init__(self, cer, pem):
|
||||
self._error = ''
|
||||
self._init_values()
|
||||
self._get_data_cer(cer)
|
||||
self._key_pem = pem
|
||||
|
||||
def _init_values(self):
|
||||
self._rfc = ''
|
||||
self._serial_number_int = 0
|
||||
self._serial_number_str = ''
|
||||
self._not_before = None
|
||||
self._not_after = None
|
||||
self._is_fiel = False
|
||||
self._are_couple = False
|
||||
self._is_valid_time = False
|
||||
self._cer_pem = ''
|
||||
self._cer_txt = ''
|
||||
self._key = b''
|
||||
self._key_enc = b''
|
||||
self._key_pem = b''
|
||||
self._cer_modulus = 0
|
||||
self._key_modulus = 0
|
||||
self._issuer = ''
|
||||
self._fert = ''
|
||||
return
|
||||
|
||||
def _get_data_cer(self, cer):
|
||||
# ~ RFC
|
||||
cmd = f'{OPENSSL} x509 -inform der -in "{cer}" -noout -subject'
|
||||
result = check_output(cmd, shell=True).decode()
|
||||
pattern = r'[A-Z]{3,4}[0-9]{6}[A-Z0-9]{3}'
|
||||
self._rfc = re.search(pattern, result)[0]
|
||||
|
||||
# ~ Serial number
|
||||
sep = '='
|
||||
cmd = f'{OPENSSL} x509 -inform der -in "{cer}" -noout -serial'
|
||||
result = check_output(cmd, shell=True).decode()
|
||||
self._serial_number2 = result.split(sep)[1]
|
||||
self._serial_number_str = self._serial_number2[1::2]
|
||||
|
||||
# ~ Dates
|
||||
cmd = f'{OPENSSL} x509 -inform der -in "{cer}" -noout -dates'
|
||||
result = check_output(cmd, shell=True).decode()
|
||||
data = result.split('\n')
|
||||
self._not_before = parse(data[0].split('=')[1])
|
||||
self._not_after = parse(data[1].split('=')[1])
|
||||
self._fert = self._not_after.strftime('%y%m%d%H%M%SZ')
|
||||
|
||||
# ~ Issuer
|
||||
cmd = f'{OPENSSL} x509 -inform der -in "{cer}" -noout -issuer'
|
||||
self._issuer = check_output(cmd, shell=True).decode()[7:]
|
||||
|
||||
now = datetime.datetime.utcnow().timestamp()
|
||||
self._is_valid_time = (now > self.not_before.timestamp()) and (now < self.not_after.timestamp())
|
||||
if not self._is_valid_time:
|
||||
msg = 'El certificado no es vigente'
|
||||
self._error = msg
|
||||
|
||||
# ~ cmd = f'{OPENSSL} x509 -inform der -in "{cer}"'
|
||||
# ~ result = check_output(cmd, shell=True).decode()
|
||||
# ~ self._cer_txt = ''.join(result.split('\n')[1:-2])
|
||||
return
|
||||
|
||||
def __str__(self):
|
||||
msg = '\n\tRFC: {}\n'.format(self.rfc)
|
||||
msg += '\tNo de Serie: {}\n'.format(self.serial_number_str)
|
||||
msg += '\tVálido desde: {}\n'.format(self.not_before)
|
||||
msg += '\tVálido hasta: {}\n'.format(self.not_after)
|
||||
msg += '\tEs vigente: {}\n'.format(self.is_valid_time)
|
||||
msg += '\tSon pareja: {}\n'.format(self.are_couple)
|
||||
msg += '\tEs FIEL: {}\n'.format(self.is_fiel)
|
||||
return msg
|
||||
|
||||
@property
|
||||
def rfc(self):
|
||||
return self._rfc
|
||||
|
||||
@property
|
||||
def serial_number_str(self):
|
||||
return self._serial_number_str
|
||||
|
||||
@property
|
||||
def not_before(self):
|
||||
return self._not_before
|
||||
|
||||
@property
|
||||
def not_after(self):
|
||||
return self._not_after
|
||||
|
||||
@property
|
||||
def is_valid_time(self):
|
||||
return self._is_valid_time
|
||||
|
||||
@property
|
||||
def are_couple(self):
|
||||
return self._are_couple
|
||||
|
||||
@property
|
||||
def is_fiel(self):
|
||||
return self._is_fiel
|
||||
|
||||
@property
|
||||
def fert(self):
|
||||
return self._fert
|
||||
|
||||
def sign(self, data, name_hash='sha256'):
|
||||
cmd = f"echo -n -e '{data}' | {OPENSSL} dgst -{name_hash} -sign '{self._key_pem}' | {OPENSSL} enc -base64"
|
||||
result = check_output(cmd, shell=True).decode().replace('\n', '')
|
||||
return base64.b64encode(result.encode()).decode()
|
|
@ -17,11 +17,13 @@ import logging
|
|||
from html.parser import HTMLParser
|
||||
from uuid import UUID
|
||||
|
||||
from OpenSSL import crypto
|
||||
# ~ from OpenSSL import crypto
|
||||
import requests
|
||||
from requests import Session, exceptions, adapters
|
||||
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += 'HIGH:!DH:!aNULL'
|
||||
|
||||
from conf import TIMEOUT
|
||||
|
||||
|
||||
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
|
||||
LOG_DATE = '%d/%m/%Y %H:%M:%S'
|
||||
|
@ -32,7 +34,6 @@ logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
TIMEOUT = 10
|
||||
VERIFY_CERT = True
|
||||
|
||||
|
||||
|
@ -406,20 +407,6 @@ class PortalSAT(object):
|
|||
post['ctl00$ScriptManager1'] = sm
|
||||
return post
|
||||
|
||||
def _get_data_cert(self, cert):
|
||||
with open(cert['cer'], 'rb') as fh:
|
||||
cert = crypto.load_certificate(crypto.FILETYPE_ASN1, fh.read())
|
||||
rfc = cert.get_subject().x500UniqueIdentifier.split(' ')[0]
|
||||
serie = '{0:x}'.format(cert.get_serial_number())[1::2]
|
||||
fert = cert.get_notAfter().decode()[2:]
|
||||
return rfc, serie, fert
|
||||
|
||||
def _sign(self, cert, data):
|
||||
with open(cert['key']) as fh:
|
||||
key = crypto.load_privatekey(crypto.FILETYPE_PEM, fh.read())
|
||||
sign = base64.b64encode(crypto.sign(key, data, 'sha256'))
|
||||
return base64.b64encode(sign).decode('utf-8')
|
||||
|
||||
def _get_token(self, firma, co):
|
||||
co = base64.b64encode(co.encode('utf-8')).decode('utf-8')
|
||||
data = '{}#{}'.format(co, firma).encode('utf-8')
|
||||
|
@ -427,14 +414,13 @@ class PortalSAT(object):
|
|||
return token
|
||||
|
||||
def _make_data_form(self, cert, values):
|
||||
rfc, serie, fert = self._get_data_cert(cert)
|
||||
co = '{}|{}|{}'.format(values['tokenuuid'], rfc, serie)
|
||||
firma = self._sign(cert, co)
|
||||
co = f"{values['tokenuuid']}|{cert.rfc}|{cert.serial_number_str}"
|
||||
firma = cert.sign(co)
|
||||
token = self._get_token(firma, co)
|
||||
|
||||
keys = ('credentialsRequired', 'guid', 'ks', 'urlApplet')
|
||||
data = {k: values[k] for k in keys}
|
||||
data['fert'] = fert
|
||||
data['fert'] = cert.fert
|
||||
data['token'] = token
|
||||
data['arc'] = ''
|
||||
data['placer'] = ''
|
||||
|
@ -466,6 +452,7 @@ class PortalSAT(object):
|
|||
|
||||
if not result:
|
||||
msg = 'Error al identificarse en el SAT'
|
||||
self.error = msg
|
||||
log.error(msg)
|
||||
return False
|
||||
data = self._read_form(result)
|
||||
|
@ -544,20 +531,16 @@ class PortalSAT(object):
|
|||
|
||||
return xml
|
||||
|
||||
def get_uuid(self, cfdi_uuid):
|
||||
def get_uuid(self, tipo, cfdi_uuid):
|
||||
data = {'error': '', 'xml': ''}
|
||||
msg = f'Buscando UUID: {cfdi_uuid}'
|
||||
log.debug(msg)
|
||||
|
||||
# ~ Recibidos
|
||||
filters = self._get_filters(cfdi_uuid, False)
|
||||
data['xml'] = self._search_by_uuid(filters)
|
||||
|
||||
if data['xml']:
|
||||
return data
|
||||
|
||||
# ~ Emitidos
|
||||
filters = self._get_filters(cfdi_uuid, True)
|
||||
data['xml'] = self._search_by_uuid(filters)
|
||||
if tipo == 'r':
|
||||
filters = self._get_filters(cfdi_uuid, False)
|
||||
data['xml'] = self._search_by_uuid(filters)
|
||||
elif tipo == 'e':
|
||||
filters = self._get_filters(cfdi_uuid, True)
|
||||
data['xml'] = self._search_by_uuid(filters)
|
||||
|
||||
return data
|
||||
|
|
|
@ -1,13 +1,23 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from uuid import UUID
|
||||
from OpenSSL import crypto
|
||||
|
||||
from .cfdi_cert import SATCertificate
|
||||
from .portal_sat import PortalSAT
|
||||
from conf import RUTA_FIEL, NOMBRE_FIEL
|
||||
|
||||
|
||||
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
|
||||
LOG_DATE = '%d/%m/%Y %H:%M:%S'
|
||||
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
|
||||
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
|
||||
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
|
||||
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def validate_uuid(value):
|
||||
try:
|
||||
UUID(value)
|
||||
|
@ -34,34 +44,42 @@ def validate_fiel(rfc):
|
|||
error = f'No se encontró el archivo: {path_fiel_pem}'
|
||||
return cert, error
|
||||
|
||||
cert['key'] = str(path_fiel_pem)
|
||||
cert['cer'] = str(path_fiel_cer)
|
||||
cert = SATCertificate(str(path_fiel_cer), str(path_fiel_pem))
|
||||
|
||||
return cert, error
|
||||
|
||||
|
||||
def get_uuid(rfc, cfdi_uuid):
|
||||
def get_uuid(rfc, tipo, cfdi_uuid):
|
||||
data = {'error': '', 'xml': ''}
|
||||
|
||||
if not validate_uuid(cfdi_uuid):
|
||||
data['error'] = 'UUID inválido'
|
||||
return data
|
||||
|
||||
if not tipo.lower() in ('e', 'r'):
|
||||
data['error'] = 'Tipo inválido, debe ser e o r'
|
||||
return data
|
||||
|
||||
cert, error = validate_fiel(rfc)
|
||||
if not cert:
|
||||
data['error'] = error
|
||||
return data
|
||||
|
||||
sat = PortalSAT()
|
||||
sat.login(cert)
|
||||
try:
|
||||
sat = PortalSAT()
|
||||
sat.login(cert)
|
||||
|
||||
if not sat.is_connect:
|
||||
if not sat.is_connect:
|
||||
sat.logout()
|
||||
data['error'] = sat.error
|
||||
log.error(sat.error)
|
||||
return data
|
||||
|
||||
data = sat.get_uuid(tipo, cfdi_uuid)
|
||||
sat.logout()
|
||||
data['error'] = sat.error
|
||||
log.eror(sat.error)
|
||||
return data
|
||||
|
||||
data = sat.get_uuid(cfdi_uuid)
|
||||
sat.logout()
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
data['xml'] = ''
|
||||
data['error'] = str(e)
|
||||
|
||||
return data
|
||||
|
|
Loading…
Reference in New Issue