Validate Fiel

This commit is contained in:
el Mau 2023-01-18 22:10:36 -06:00
parent 1ac4b4f8ec
commit 200809cc8e
6 changed files with 346 additions and 0 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
env/
__pycache__/

View File

@ -1 +1,2 @@
aioreq
cryptography

30
source/cfdi-descarga.py Normal file
View File

@ -0,0 +1,30 @@
#!/usr/bin/env python
import argparse
from sat import util
def main(args):
print(args)
if args.fiel_validar:
util.fiel_validate(args)
return
def _process_command_line_arguments():
parser = argparse.ArgumentParser(
description='Descarga SAT')
parser.add_argument('-fv', '--fiel-validar', default=False, action='store_true')
help = "Directorio de ubicación de la FIEL"
parser.add_argument('-fd', '--fiel-dir', help=help, default='')
help = "Nombre de la Fiel, el predeterminado es 'fiel'"
parser.add_argument('-fn', '--fiel-nombre', help=help, default='fiel')
return parser.parse_args()
if __name__ == '__main__':
args = _process_command_line_arguments()
main(args)

3
source/conf.py Normal file
View File

@ -0,0 +1,3 @@
#!/usr/bin/env python
TOKEN = '12345'

238
source/sat/cfdi_cert.py Normal file
View File

@ -0,0 +1,238 @@
#!/usr/bin/env python3
import argparse
import base64
import datetime
import getpass
from pathlib import Path
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.x509.oid import ExtensionOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from conf import TOKEN
class SATCertificate(object):
def __init__(self, cer):
# ~ def __init__(self, cer=b'', key=b'', pem=b'', password=''):
self._error = ''
self._init_values()
self._get_data_cer(cer)
# ~ self._get_data_pem(pem)
def _init_values(self):
self._rfc = ''
self._serial_number_int = 0
self._serial_number_str = ''
self._not_before = None
self._not_after = None
self._is_fiel = False
self._are_couple = False
self._is_valid_time = False
self._cer_pem = ''
self._cer_txt = ''
self._key_enc = b''
self._key_pem = b''
self._cer_modulus = 0
self._key_modulus = 0
self._issuer = ''
return
def __str__(self):
msg = '\n\tRFC: {}\n'.format(self.rfc)
msg += '\tNo de Serie: {}\n'.format(self.serial_number_str)
msg += '\tVálido desde: {}\n'.format(self.not_before)
msg += '\tVálido hasta: {}\n'.format(self.not_after)
msg += '\tEs vigente: {}\n'.format(self.is_valid_time)
msg += '\tSon pareja: {}\n'.format(self.are_couple)
msg += '\tEs FIEL: {}\n'.format(self.is_fiel)
return msg
def __bool__(self):
return self.is_valid
def _get_data_cer(self, cer):
obj = x509.load_der_x509_certificate(cer, default_backend())
self._rfc = obj.subject.get_attributes_for_oid(
NameOID.X500_UNIQUE_IDENTIFIER)[0].value.split(' ')[0]
self._serial_number_int = obj.serial_number
self._serial_number_str = f'{obj.serial_number:x}'[1::2]
self._not_before = obj.not_valid_before
self._not_after = obj.not_valid_after
self._issuer = ','.join([i.rfc4514_string() for i in obj.issuer])
now = datetime.datetime.utcnow()
self._is_valid_time = (now > self.not_before) and (now < self.not_after)
if not self._is_valid_time:
msg = 'El certificado no es vigente'
self._error = msg
self._is_fiel = obj.extensions.get_extension_for_oid(
ExtensionOID.KEY_USAGE).value.key_agreement
self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode()
self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2])
self._cer_modulus = obj.public_key().public_numbers().n
return
def _get_hash(self):
digest = hashes.Hash(hashes.SHA512(), default_backend())
digest.update(self._rfc.encode())
digest.update(self._serial_number_str.encode())
digest.update(TOKEN.encode())
return digest.finalize()
# ~ def _get_key(self, password):
# ~ if not password:
# ~ password = self._get_hash()
# ~ private_key = serialization.load_pem_private_key(
# ~ self._key_enc, password=password, backend=default_backend())
# ~ return private_key
# ~ def _get_key_pem(self):
# ~ obj = self._get_key('')
# ~ key_pem = obj.private_bytes(
# ~ encoding=serialization.Encoding.PEM,
# ~ format=serialization.PrivateFormat.PKCS8,
# ~ encryption_algorithm=serialization.NoEncryption()
# ~ )
# ~ return key_pem
# ~ def _get_data_pem(self, pem):
# ~ if not pem:
# ~ return
# ~ self._key_pem = serialization.load_pem_private_key(
# ~ pem, None, backend=default_backend())
# ~ return
# ~ def _sign_with_pem(self, data, name_hash):
# ~ if name_hash == 'sha256':
# ~ type_hash = hashes.SHA256()
# ~ elif name_hash == 'sha1':
# ~ type_hash = hashes.SHA1()
# ~ firma = self._key_pem.sign(data, padding.PKCS1v15(), type_hash)
# ~ return base64.b64encode(firma).decode()
# ~ def sign_sha1(self, data, password=''):
# ~ if self._key_pem:
# ~ return self._sign_with_pem(data, 'sha1')
# ~ private_key = self._get_key(password)
# ~ firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA1())
# ~ return base64.b64encode(firma).decode()
# ~ def sign_xml(self, tree):
# ~ import xmlsec
# ~ node = xmlsec.tree.find_node(tree, xmlsec.constants.NodeSignature)
# ~ ctx = xmlsec.SignatureContext()
# ~ key = xmlsec.Key.from_memory(self.key_pem, xmlsec.constants.KeyDataFormatPem)
# ~ ctx.key = key
# ~ ctx.sign(node)
# ~ node = xmlsec.tree.find_node(tree, 'X509Certificate')
# ~ node.text = self.cer_txt
# ~ return tree
@property
def rfc(self):
return self._rfc
@property
def serial_number_str(self):
return self._serial_number_str
@property
def serial_number_int(self):
return self._serial_number_int
@property
def not_before(self):
return self._not_before
@property
def not_after(self):
return self._not_after
@property
def is_fiel(self):
return self._is_fiel
@property
def are_couple(self):
return self._are_couple
@property
def is_valid(self):
return not bool(self.error)
@property
def is_valid_time(self):
return self._is_valid_time
@property
def cer_pem(self):
return self._cer_pem.encode()
@property
def cer_txt(self):
return self._cer_txt
@property
def key_pem(self):
return self._key_pem
@property
def key_enc(self):
return self._key_enc
@property
def issuer(self):
return self._issuer
@property
def error(self):
return self._error
def validate_key(self, key, password):
try:
obj = serialization.load_der_private_key(
key, password.encode(), default_backend())
except ValueError:
msg = 'La contraseña es incorrecta'
self._error = msg
return
self._key_pem = obj.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
new_pass = self._get_hash()
self._key_enc = obj.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(new_pass)
)
self._key_modulus = obj.public_key().public_numbers().n
self._are_couple = self._cer_modulus == self._key_modulus
if not self._are_couple:
msg = 'El CER y el KEY no son pareja'
self._error = msg
return
def sign(self, data, password=''):
private_key = self._get_key(password)
firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA256())
return base64.b64encode(firma).decode()

73
source/sat/util.py Normal file
View File

@ -0,0 +1,73 @@
#!/usr/bin/env python
import getpass
import logging
from pathlib import Path
from .cfdi_cert import SATCertificate
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
def fiel_validate(args):
path_fiel = args.fiel_dir
if not path_fiel:
msg = 'El argumento -fd (fiel-dir) es requerido'
log.error(msg)
return
path_fiel = Path(path_fiel)
if not path_fiel.is_dir():
msg = 'La ruta no es un directorio'
log.error(msg)
return
path_fiel_cer = path_fiel / f'{args.fiel_nombre}.cer'
if not path_fiel_cer.exists():
msg = 'No se encontró el archivo CER'
log.error(msg)
return
path_fiel_key = path_fiel / f'{args.fiel_nombre}.key'
if not path_fiel_cer.exists():
msg = 'No se encontró el archivo KEY'
log.error(msg)
return
cer = path_fiel_cer.read_bytes()
key = path_fiel_key.read_bytes()
cert = SATCertificate(cer)
if not cert.is_valid:
log.error(cert.error)
return
password = getpass.getpass('Captura la contraseña de la FIEL: ')
cert.validate_key(key, password)
if not cert.is_valid:
log.error(cert.error)
return
if not cert.is_fiel:
msg = 'El certificado no es Fiel, no puedes usarlo para descargar.'
log.error(msg)
return
path_fiel_enc = path_fiel / f'{args.fiel_nombre}.enc'
path_fiel_enc.write_bytes(cert.key_enc)
msg = f'El certificado es válido.\n{cert}\n\tPuedes usarlo para descargar.\n'
log.info(msg)
return