Compare commits

...

4 Commits

Author SHA1 Message Date
el Mau 58eb4d6727 Add type download 2023-01-27 15:38:05 -06:00
el Mau e79967aef3 Fix log error 2023-01-26 23:08:00 -06:00
el Mau 378a05df7d Add try for catch error 2023-01-25 23:33:51 -06:00
el Mau 21f87e52c6 Remove pyOpenSsl, used openssl directly 2023-01-25 23:28:38 -06:00
6 changed files with 175 additions and 48 deletions

View File

@ -1,3 +1,3 @@
falcon
requests
pyOpenSSL
python-dateutil

View File

@ -6,3 +6,6 @@ RUTA_FIEL = ''
# ~ Nombre predeterminado de los archivos FIEL
NOMBRE_FIEL = 'fiel'
TIMEOUT = 10

View File

@ -14,11 +14,11 @@ class JSONTranslator():
class AppApi(object):
def on_get(self, req, resp, rfc, cfdi):
resp.context['result'] = get_uuid(rfc, cfdi)
def on_get(self, req, resp, rfc, tipo, cfdi):
resp.context['result'] = get_uuid(rfc, tipo, cfdi)
resp.status = falcon.HTTP_200
app = falcon.App(middleware=[JSONTranslator()])
api = AppApi()
app.add_route('/api/{rfc}/{cfdi}', api)
app.add_route('/api/{rfc}/{tipo}/{cfdi}', api)

123
source/sat/cfdi_cert.py Normal file
View File

@ -0,0 +1,123 @@
#!/usr/bin/env python
import base64
import datetime
import re
from subprocess import check_output
from dateutil.parser import parse
OPENSSL = 'openssl'
class SATCertificate(object):
def __init__(self, cer, pem):
self._error = ''
self._init_values()
self._get_data_cer(cer)
self._key_pem = pem
def _init_values(self):
self._rfc = ''
self._serial_number_int = 0
self._serial_number_str = ''
self._not_before = None
self._not_after = None
self._is_fiel = False
self._are_couple = False
self._is_valid_time = False
self._cer_pem = ''
self._cer_txt = ''
self._key = b''
self._key_enc = b''
self._key_pem = b''
self._cer_modulus = 0
self._key_modulus = 0
self._issuer = ''
self._fert = ''
return
def _get_data_cer(self, cer):
# ~ RFC
cmd = f'{OPENSSL} x509 -inform der -in "{cer}" -noout -subject'
result = check_output(cmd, shell=True).decode()
pattern = r'[A-Z]{3,4}[0-9]{6}[A-Z0-9]{3}'
self._rfc = re.search(pattern, result)[0]
# ~ Serial number
sep = '='
cmd = f'{OPENSSL} x509 -inform der -in "{cer}" -noout -serial'
result = check_output(cmd, shell=True).decode()
self._serial_number2 = result.split(sep)[1]
self._serial_number_str = self._serial_number2[1::2]
# ~ Dates
cmd = f'{OPENSSL} x509 -inform der -in "{cer}" -noout -dates'
result = check_output(cmd, shell=True).decode()
data = result.split('\n')
self._not_before = parse(data[0].split('=')[1])
self._not_after = parse(data[1].split('=')[1])
self._fert = self._not_after.strftime('%y%m%d%H%M%SZ')
# ~ Issuer
cmd = f'{OPENSSL} x509 -inform der -in "{cer}" -noout -issuer'
self._issuer = check_output(cmd, shell=True).decode()[7:]
now = datetime.datetime.utcnow().timestamp()
self._is_valid_time = (now > self.not_before.timestamp()) and (now < self.not_after.timestamp())
if not self._is_valid_time:
msg = 'El certificado no es vigente'
self._error = msg
# ~ cmd = f'{OPENSSL} x509 -inform der -in "{cer}"'
# ~ result = check_output(cmd, shell=True).decode()
# ~ self._cer_txt = ''.join(result.split('\n')[1:-2])
return
def __str__(self):
msg = '\n\tRFC: {}\n'.format(self.rfc)
msg += '\tNo de Serie: {}\n'.format(self.serial_number_str)
msg += '\tVálido desde: {}\n'.format(self.not_before)
msg += '\tVálido hasta: {}\n'.format(self.not_after)
msg += '\tEs vigente: {}\n'.format(self.is_valid_time)
msg += '\tSon pareja: {}\n'.format(self.are_couple)
msg += '\tEs FIEL: {}\n'.format(self.is_fiel)
return msg
@property
def rfc(self):
return self._rfc
@property
def serial_number_str(self):
return self._serial_number_str
@property
def not_before(self):
return self._not_before
@property
def not_after(self):
return self._not_after
@property
def is_valid_time(self):
return self._is_valid_time
@property
def are_couple(self):
return self._are_couple
@property
def is_fiel(self):
return self._is_fiel
@property
def fert(self):
return self._fert
def sign(self, data, name_hash='sha256'):
cmd = f"echo -n -e '{data}' | {OPENSSL} dgst -{name_hash} -sign '{self._key_pem}' | {OPENSSL} enc -base64"
result = check_output(cmd, shell=True).decode().replace('\n', '')
return base64.b64encode(result.encode()).decode()

View File

@ -17,11 +17,13 @@ import logging
from html.parser import HTMLParser
from uuid import UUID
from OpenSSL import crypto
# ~ from OpenSSL import crypto
import requests
from requests import Session, exceptions, adapters
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += 'HIGH:!DH:!aNULL'
from conf import TIMEOUT
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
@ -32,7 +34,6 @@ logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
TIMEOUT = 10
VERIFY_CERT = True
@ -406,20 +407,6 @@ class PortalSAT(object):
post['ctl00$ScriptManager1'] = sm
return post
def _get_data_cert(self, cert):
with open(cert['cer'], 'rb') as fh:
cert = crypto.load_certificate(crypto.FILETYPE_ASN1, fh.read())
rfc = cert.get_subject().x500UniqueIdentifier.split(' ')[0]
serie = '{0:x}'.format(cert.get_serial_number())[1::2]
fert = cert.get_notAfter().decode()[2:]
return rfc, serie, fert
def _sign(self, cert, data):
with open(cert['key']) as fh:
key = crypto.load_privatekey(crypto.FILETYPE_PEM, fh.read())
sign = base64.b64encode(crypto.sign(key, data, 'sha256'))
return base64.b64encode(sign).decode('utf-8')
def _get_token(self, firma, co):
co = base64.b64encode(co.encode('utf-8')).decode('utf-8')
data = '{}#{}'.format(co, firma).encode('utf-8')
@ -427,14 +414,13 @@ class PortalSAT(object):
return token
def _make_data_form(self, cert, values):
rfc, serie, fert = self._get_data_cert(cert)
co = '{}|{}|{}'.format(values['tokenuuid'], rfc, serie)
firma = self._sign(cert, co)
co = f"{values['tokenuuid']}|{cert.rfc}|{cert.serial_number_str}"
firma = cert.sign(co)
token = self._get_token(firma, co)
keys = ('credentialsRequired', 'guid', 'ks', 'urlApplet')
data = {k: values[k] for k in keys}
data['fert'] = fert
data['fert'] = cert.fert
data['token'] = token
data['arc'] = ''
data['placer'] = ''
@ -466,6 +452,7 @@ class PortalSAT(object):
if not result:
msg = 'Error al identificarse en el SAT'
self.error = msg
log.error(msg)
return False
data = self._read_form(result)
@ -544,20 +531,16 @@ class PortalSAT(object):
return xml
def get_uuid(self, cfdi_uuid):
def get_uuid(self, tipo, cfdi_uuid):
data = {'error': '', 'xml': ''}
msg = f'Buscando UUID: {cfdi_uuid}'
log.debug(msg)
# ~ Recibidos
filters = self._get_filters(cfdi_uuid, False)
data['xml'] = self._search_by_uuid(filters)
if data['xml']:
return data
# ~ Emitidos
filters = self._get_filters(cfdi_uuid, True)
data['xml'] = self._search_by_uuid(filters)
if tipo == 'r':
filters = self._get_filters(cfdi_uuid, False)
data['xml'] = self._search_by_uuid(filters)
elif tipo == 'e':
filters = self._get_filters(cfdi_uuid, True)
data['xml'] = self._search_by_uuid(filters)
return data

View File

@ -1,13 +1,23 @@
#!/usr/bin/env python
import logging
from pathlib import Path
from uuid import UUID
from OpenSSL import crypto
from .cfdi_cert import SATCertificate
from .portal_sat import PortalSAT
from conf import RUTA_FIEL, NOMBRE_FIEL
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
def validate_uuid(value):
try:
UUID(value)
@ -34,34 +44,42 @@ def validate_fiel(rfc):
error = f'No se encontró el archivo: {path_fiel_pem}'
return cert, error
cert['key'] = str(path_fiel_pem)
cert['cer'] = str(path_fiel_cer)
cert = SATCertificate(str(path_fiel_cer), str(path_fiel_pem))
return cert, error
def get_uuid(rfc, cfdi_uuid):
def get_uuid(rfc, tipo, cfdi_uuid):
data = {'error': '', 'xml': ''}
if not validate_uuid(cfdi_uuid):
data['error'] = 'UUID inválido'
return data
if not tipo.lower() in ('e', 'r'):
data['error'] = 'Tipo inválido, debe ser e o r'
return data
cert, error = validate_fiel(rfc)
if not cert:
data['error'] = error
return data
sat = PortalSAT()
sat.login(cert)
try:
sat = PortalSAT()
sat.login(cert)
if not sat.is_connect:
if not sat.is_connect:
sat.logout()
data['error'] = sat.error
log.error(sat.error)
return data
data = sat.get_uuid(tipo, cfdi_uuid)
sat.logout()
data['error'] = sat.error
log.eror(sat.error)
return data
data = sat.get_uuid(cfdi_uuid)
sat.logout()
except Exception as e:
log.error(e)
data['xml'] = ''
data['error'] = str(e)
return data