Compare commits
4 Commits
Author | SHA1 | Date |
---|---|---|
el Mau | 58eb4d6727 | |
el Mau | e79967aef3 | |
el Mau | 378a05df7d | |
el Mau | 21f87e52c6 |
|
@ -1,3 +1,3 @@
|
||||||
falcon
|
falcon
|
||||||
requests
|
requests
|
||||||
pyOpenSSL
|
python-dateutil
|
||||||
|
|
|
@ -6,3 +6,6 @@ RUTA_FIEL = ''
|
||||||
|
|
||||||
# ~ Nombre predeterminado de los archivos FIEL
|
# ~ Nombre predeterminado de los archivos FIEL
|
||||||
NOMBRE_FIEL = 'fiel'
|
NOMBRE_FIEL = 'fiel'
|
||||||
|
|
||||||
|
|
||||||
|
TIMEOUT = 10
|
||||||
|
|
|
@ -14,11 +14,11 @@ class JSONTranslator():
|
||||||
|
|
||||||
class AppApi(object):
|
class AppApi(object):
|
||||||
|
|
||||||
def on_get(self, req, resp, rfc, cfdi):
|
def on_get(self, req, resp, rfc, tipo, cfdi):
|
||||||
resp.context['result'] = get_uuid(rfc, cfdi)
|
resp.context['result'] = get_uuid(rfc, tipo, cfdi)
|
||||||
resp.status = falcon.HTTP_200
|
resp.status = falcon.HTTP_200
|
||||||
|
|
||||||
|
|
||||||
app = falcon.App(middleware=[JSONTranslator()])
|
app = falcon.App(middleware=[JSONTranslator()])
|
||||||
api = AppApi()
|
api = AppApi()
|
||||||
app.add_route('/api/{rfc}/{cfdi}', api)
|
app.add_route('/api/{rfc}/{tipo}/{cfdi}', api)
|
||||||
|
|
|
@ -0,0 +1,123 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import datetime
|
||||||
|
import re
|
||||||
|
from subprocess import check_output
|
||||||
|
from dateutil.parser import parse
|
||||||
|
|
||||||
|
|
||||||
|
OPENSSL = 'openssl'
|
||||||
|
|
||||||
|
|
||||||
|
class SATCertificate(object):
|
||||||
|
|
||||||
|
def __init__(self, cer, pem):
|
||||||
|
self._error = ''
|
||||||
|
self._init_values()
|
||||||
|
self._get_data_cer(cer)
|
||||||
|
self._key_pem = pem
|
||||||
|
|
||||||
|
def _init_values(self):
|
||||||
|
self._rfc = ''
|
||||||
|
self._serial_number_int = 0
|
||||||
|
self._serial_number_str = ''
|
||||||
|
self._not_before = None
|
||||||
|
self._not_after = None
|
||||||
|
self._is_fiel = False
|
||||||
|
self._are_couple = False
|
||||||
|
self._is_valid_time = False
|
||||||
|
self._cer_pem = ''
|
||||||
|
self._cer_txt = ''
|
||||||
|
self._key = b''
|
||||||
|
self._key_enc = b''
|
||||||
|
self._key_pem = b''
|
||||||
|
self._cer_modulus = 0
|
||||||
|
self._key_modulus = 0
|
||||||
|
self._issuer = ''
|
||||||
|
self._fert = ''
|
||||||
|
return
|
||||||
|
|
||||||
|
def _get_data_cer(self, cer):
|
||||||
|
# ~ RFC
|
||||||
|
cmd = f'{OPENSSL} x509 -inform der -in "{cer}" -noout -subject'
|
||||||
|
result = check_output(cmd, shell=True).decode()
|
||||||
|
pattern = r'[A-Z]{3,4}[0-9]{6}[A-Z0-9]{3}'
|
||||||
|
self._rfc = re.search(pattern, result)[0]
|
||||||
|
|
||||||
|
# ~ Serial number
|
||||||
|
sep = '='
|
||||||
|
cmd = f'{OPENSSL} x509 -inform der -in "{cer}" -noout -serial'
|
||||||
|
result = check_output(cmd, shell=True).decode()
|
||||||
|
self._serial_number2 = result.split(sep)[1]
|
||||||
|
self._serial_number_str = self._serial_number2[1::2]
|
||||||
|
|
||||||
|
# ~ Dates
|
||||||
|
cmd = f'{OPENSSL} x509 -inform der -in "{cer}" -noout -dates'
|
||||||
|
result = check_output(cmd, shell=True).decode()
|
||||||
|
data = result.split('\n')
|
||||||
|
self._not_before = parse(data[0].split('=')[1])
|
||||||
|
self._not_after = parse(data[1].split('=')[1])
|
||||||
|
self._fert = self._not_after.strftime('%y%m%d%H%M%SZ')
|
||||||
|
|
||||||
|
# ~ Issuer
|
||||||
|
cmd = f'{OPENSSL} x509 -inform der -in "{cer}" -noout -issuer'
|
||||||
|
self._issuer = check_output(cmd, shell=True).decode()[7:]
|
||||||
|
|
||||||
|
now = datetime.datetime.utcnow().timestamp()
|
||||||
|
self._is_valid_time = (now > self.not_before.timestamp()) and (now < self.not_after.timestamp())
|
||||||
|
if not self._is_valid_time:
|
||||||
|
msg = 'El certificado no es vigente'
|
||||||
|
self._error = msg
|
||||||
|
|
||||||
|
# ~ cmd = f'{OPENSSL} x509 -inform der -in "{cer}"'
|
||||||
|
# ~ result = check_output(cmd, shell=True).decode()
|
||||||
|
# ~ self._cer_txt = ''.join(result.split('\n')[1:-2])
|
||||||
|
return
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
msg = '\n\tRFC: {}\n'.format(self.rfc)
|
||||||
|
msg += '\tNo de Serie: {}\n'.format(self.serial_number_str)
|
||||||
|
msg += '\tVálido desde: {}\n'.format(self.not_before)
|
||||||
|
msg += '\tVálido hasta: {}\n'.format(self.not_after)
|
||||||
|
msg += '\tEs vigente: {}\n'.format(self.is_valid_time)
|
||||||
|
msg += '\tSon pareja: {}\n'.format(self.are_couple)
|
||||||
|
msg += '\tEs FIEL: {}\n'.format(self.is_fiel)
|
||||||
|
return msg
|
||||||
|
|
||||||
|
@property
|
||||||
|
def rfc(self):
|
||||||
|
return self._rfc
|
||||||
|
|
||||||
|
@property
|
||||||
|
def serial_number_str(self):
|
||||||
|
return self._serial_number_str
|
||||||
|
|
||||||
|
@property
|
||||||
|
def not_before(self):
|
||||||
|
return self._not_before
|
||||||
|
|
||||||
|
@property
|
||||||
|
def not_after(self):
|
||||||
|
return self._not_after
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_valid_time(self):
|
||||||
|
return self._is_valid_time
|
||||||
|
|
||||||
|
@property
|
||||||
|
def are_couple(self):
|
||||||
|
return self._are_couple
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_fiel(self):
|
||||||
|
return self._is_fiel
|
||||||
|
|
||||||
|
@property
|
||||||
|
def fert(self):
|
||||||
|
return self._fert
|
||||||
|
|
||||||
|
def sign(self, data, name_hash='sha256'):
|
||||||
|
cmd = f"echo -n -e '{data}' | {OPENSSL} dgst -{name_hash} -sign '{self._key_pem}' | {OPENSSL} enc -base64"
|
||||||
|
result = check_output(cmd, shell=True).decode().replace('\n', '')
|
||||||
|
return base64.b64encode(result.encode()).decode()
|
|
@ -17,11 +17,13 @@ import logging
|
||||||
from html.parser import HTMLParser
|
from html.parser import HTMLParser
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
from OpenSSL import crypto
|
# ~ from OpenSSL import crypto
|
||||||
import requests
|
import requests
|
||||||
from requests import Session, exceptions, adapters
|
from requests import Session, exceptions, adapters
|
||||||
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += 'HIGH:!DH:!aNULL'
|
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += 'HIGH:!DH:!aNULL'
|
||||||
|
|
||||||
|
from conf import TIMEOUT
|
||||||
|
|
||||||
|
|
||||||
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
|
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
|
||||||
LOG_DATE = '%d/%m/%Y %H:%M:%S'
|
LOG_DATE = '%d/%m/%Y %H:%M:%S'
|
||||||
|
@ -32,7 +34,6 @@ logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
TIMEOUT = 10
|
|
||||||
VERIFY_CERT = True
|
VERIFY_CERT = True
|
||||||
|
|
||||||
|
|
||||||
|
@ -406,20 +407,6 @@ class PortalSAT(object):
|
||||||
post['ctl00$ScriptManager1'] = sm
|
post['ctl00$ScriptManager1'] = sm
|
||||||
return post
|
return post
|
||||||
|
|
||||||
def _get_data_cert(self, cert):
|
|
||||||
with open(cert['cer'], 'rb') as fh:
|
|
||||||
cert = crypto.load_certificate(crypto.FILETYPE_ASN1, fh.read())
|
|
||||||
rfc = cert.get_subject().x500UniqueIdentifier.split(' ')[0]
|
|
||||||
serie = '{0:x}'.format(cert.get_serial_number())[1::2]
|
|
||||||
fert = cert.get_notAfter().decode()[2:]
|
|
||||||
return rfc, serie, fert
|
|
||||||
|
|
||||||
def _sign(self, cert, data):
|
|
||||||
with open(cert['key']) as fh:
|
|
||||||
key = crypto.load_privatekey(crypto.FILETYPE_PEM, fh.read())
|
|
||||||
sign = base64.b64encode(crypto.sign(key, data, 'sha256'))
|
|
||||||
return base64.b64encode(sign).decode('utf-8')
|
|
||||||
|
|
||||||
def _get_token(self, firma, co):
|
def _get_token(self, firma, co):
|
||||||
co = base64.b64encode(co.encode('utf-8')).decode('utf-8')
|
co = base64.b64encode(co.encode('utf-8')).decode('utf-8')
|
||||||
data = '{}#{}'.format(co, firma).encode('utf-8')
|
data = '{}#{}'.format(co, firma).encode('utf-8')
|
||||||
|
@ -427,14 +414,13 @@ class PortalSAT(object):
|
||||||
return token
|
return token
|
||||||
|
|
||||||
def _make_data_form(self, cert, values):
|
def _make_data_form(self, cert, values):
|
||||||
rfc, serie, fert = self._get_data_cert(cert)
|
co = f"{values['tokenuuid']}|{cert.rfc}|{cert.serial_number_str}"
|
||||||
co = '{}|{}|{}'.format(values['tokenuuid'], rfc, serie)
|
firma = cert.sign(co)
|
||||||
firma = self._sign(cert, co)
|
|
||||||
token = self._get_token(firma, co)
|
token = self._get_token(firma, co)
|
||||||
|
|
||||||
keys = ('credentialsRequired', 'guid', 'ks', 'urlApplet')
|
keys = ('credentialsRequired', 'guid', 'ks', 'urlApplet')
|
||||||
data = {k: values[k] for k in keys}
|
data = {k: values[k] for k in keys}
|
||||||
data['fert'] = fert
|
data['fert'] = cert.fert
|
||||||
data['token'] = token
|
data['token'] = token
|
||||||
data['arc'] = ''
|
data['arc'] = ''
|
||||||
data['placer'] = ''
|
data['placer'] = ''
|
||||||
|
@ -466,6 +452,7 @@ class PortalSAT(object):
|
||||||
|
|
||||||
if not result:
|
if not result:
|
||||||
msg = 'Error al identificarse en el SAT'
|
msg = 'Error al identificarse en el SAT'
|
||||||
|
self.error = msg
|
||||||
log.error(msg)
|
log.error(msg)
|
||||||
return False
|
return False
|
||||||
data = self._read_form(result)
|
data = self._read_form(result)
|
||||||
|
@ -544,20 +531,16 @@ class PortalSAT(object):
|
||||||
|
|
||||||
return xml
|
return xml
|
||||||
|
|
||||||
def get_uuid(self, cfdi_uuid):
|
def get_uuid(self, tipo, cfdi_uuid):
|
||||||
data = {'error': '', 'xml': ''}
|
data = {'error': '', 'xml': ''}
|
||||||
msg = f'Buscando UUID: {cfdi_uuid}'
|
msg = f'Buscando UUID: {cfdi_uuid}'
|
||||||
log.debug(msg)
|
log.debug(msg)
|
||||||
|
|
||||||
# ~ Recibidos
|
if tipo == 'r':
|
||||||
filters = self._get_filters(cfdi_uuid, False)
|
filters = self._get_filters(cfdi_uuid, False)
|
||||||
data['xml'] = self._search_by_uuid(filters)
|
data['xml'] = self._search_by_uuid(filters)
|
||||||
|
elif tipo == 'e':
|
||||||
if data['xml']:
|
filters = self._get_filters(cfdi_uuid, True)
|
||||||
return data
|
data['xml'] = self._search_by_uuid(filters)
|
||||||
|
|
||||||
# ~ Emitidos
|
|
||||||
filters = self._get_filters(cfdi_uuid, True)
|
|
||||||
data['xml'] = self._search_by_uuid(filters)
|
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
|
|
@ -1,13 +1,23 @@
|
||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
from OpenSSL import crypto
|
|
||||||
|
|
||||||
|
from .cfdi_cert import SATCertificate
|
||||||
from .portal_sat import PortalSAT
|
from .portal_sat import PortalSAT
|
||||||
from conf import RUTA_FIEL, NOMBRE_FIEL
|
from conf import RUTA_FIEL, NOMBRE_FIEL
|
||||||
|
|
||||||
|
|
||||||
|
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
|
||||||
|
LOG_DATE = '%d/%m/%Y %H:%M:%S'
|
||||||
|
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
|
||||||
|
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
|
||||||
|
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
|
||||||
|
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def validate_uuid(value):
|
def validate_uuid(value):
|
||||||
try:
|
try:
|
||||||
UUID(value)
|
UUID(value)
|
||||||
|
@ -34,34 +44,42 @@ def validate_fiel(rfc):
|
||||||
error = f'No se encontró el archivo: {path_fiel_pem}'
|
error = f'No se encontró el archivo: {path_fiel_pem}'
|
||||||
return cert, error
|
return cert, error
|
||||||
|
|
||||||
cert['key'] = str(path_fiel_pem)
|
cert = SATCertificate(str(path_fiel_cer), str(path_fiel_pem))
|
||||||
cert['cer'] = str(path_fiel_cer)
|
|
||||||
|
|
||||||
return cert, error
|
return cert, error
|
||||||
|
|
||||||
|
|
||||||
def get_uuid(rfc, cfdi_uuid):
|
def get_uuid(rfc, tipo, cfdi_uuid):
|
||||||
data = {'error': '', 'xml': ''}
|
data = {'error': '', 'xml': ''}
|
||||||
|
|
||||||
if not validate_uuid(cfdi_uuid):
|
if not validate_uuid(cfdi_uuid):
|
||||||
data['error'] = 'UUID inválido'
|
data['error'] = 'UUID inválido'
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
if not tipo.lower() in ('e', 'r'):
|
||||||
|
data['error'] = 'Tipo inválido, debe ser e o r'
|
||||||
|
return data
|
||||||
|
|
||||||
cert, error = validate_fiel(rfc)
|
cert, error = validate_fiel(rfc)
|
||||||
if not cert:
|
if not cert:
|
||||||
data['error'] = error
|
data['error'] = error
|
||||||
return data
|
return data
|
||||||
|
|
||||||
sat = PortalSAT()
|
try:
|
||||||
sat.login(cert)
|
sat = PortalSAT()
|
||||||
|
sat.login(cert)
|
||||||
|
|
||||||
if not sat.is_connect:
|
if not sat.is_connect:
|
||||||
|
sat.logout()
|
||||||
|
data['error'] = sat.error
|
||||||
|
log.error(sat.error)
|
||||||
|
return data
|
||||||
|
|
||||||
|
data = sat.get_uuid(tipo, cfdi_uuid)
|
||||||
sat.logout()
|
sat.logout()
|
||||||
data['error'] = sat.error
|
except Exception as e:
|
||||||
log.eror(sat.error)
|
log.error(e)
|
||||||
return data
|
data['xml'] = ''
|
||||||
|
data['error'] = str(e)
|
||||||
data = sat.get_uuid(cfdi_uuid)
|
|
||||||
sat.logout()
|
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
|
Loading…
Reference in New Issue