diff --git a/.gitignore b/.gitignore index bdaab25..06a47c5 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ env/ +__pycache__/ diff --git a/requirements.txt b/requirements.txt index ec36d32..54fd49c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ aioreq +cryptography diff --git a/source/cfdi-descarga.py b/source/cfdi-descarga.py new file mode 100644 index 0000000..9cbd48f --- /dev/null +++ b/source/cfdi-descarga.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python + +import argparse +from sat import util + + +def main(args): + print(args) + + if args.fiel_validar: + util.fiel_validate(args) + return + + +def _process_command_line_arguments(): + parser = argparse.ArgumentParser( + description='Descarga SAT') + + parser.add_argument('-fv', '--fiel-validar', default=False, action='store_true') + help = "Directorio de ubicación de la FIEL" + parser.add_argument('-fd', '--fiel-dir', help=help, default='') + help = "Nombre de la Fiel, el predeterminado es 'fiel'" + parser.add_argument('-fn', '--fiel-nombre', help=help, default='fiel') + + return parser.parse_args() + + +if __name__ == '__main__': + args = _process_command_line_arguments() + main(args) diff --git a/source/conf.py b/source/conf.py new file mode 100644 index 0000000..fbe7578 --- /dev/null +++ b/source/conf.py @@ -0,0 +1,3 @@ +#!/usr/bin/env python + +TOKEN = '12345' diff --git a/source/sat/cfdi_cert.py b/source/sat/cfdi_cert.py new file mode 100644 index 0000000..03712cd --- /dev/null +++ b/source/sat/cfdi_cert.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 + +import argparse +import base64 +import datetime +import getpass +from pathlib import Path + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +from cryptography import x509 +from cryptography.x509.oid import NameOID +from cryptography.x509.oid import ExtensionOID +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import padding + +from conf import TOKEN + + +class SATCertificate(object): + + def __init__(self, cer): + # ~ def __init__(self, cer=b'', key=b'', pem=b'', password=''): + self._error = '' + self._init_values() + self._get_data_cer(cer) + # ~ self._get_data_pem(pem) + + def _init_values(self): + self._rfc = '' + self._serial_number_int = 0 + self._serial_number_str = '' + self._not_before = None + self._not_after = None + self._is_fiel = False + self._are_couple = False + self._is_valid_time = False + self._cer_pem = '' + self._cer_txt = '' + self._key_enc = b'' + self._key_pem = b'' + self._cer_modulus = 0 + self._key_modulus = 0 + self._issuer = '' + return + + def __str__(self): + msg = '\n\tRFC: {}\n'.format(self.rfc) + msg += '\tNo de Serie: {}\n'.format(self.serial_number_str) + msg += '\tVálido desde: {}\n'.format(self.not_before) + msg += '\tVálido hasta: {}\n'.format(self.not_after) + msg += '\tEs vigente: {}\n'.format(self.is_valid_time) + msg += '\tSon pareja: {}\n'.format(self.are_couple) + msg += '\tEs FIEL: {}\n'.format(self.is_fiel) + return msg + + def __bool__(self): + return self.is_valid + + def _get_data_cer(self, cer): + obj = x509.load_der_x509_certificate(cer, default_backend()) + self._rfc = obj.subject.get_attributes_for_oid( + NameOID.X500_UNIQUE_IDENTIFIER)[0].value.split(' ')[0] + self._serial_number_int = obj.serial_number + self._serial_number_str = f'{obj.serial_number:x}'[1::2] + self._not_before = obj.not_valid_before + self._not_after = obj.not_valid_after + self._issuer = ','.join([i.rfc4514_string() for i in obj.issuer]) + + now = datetime.datetime.utcnow() + self._is_valid_time = (now > self.not_before) and (now < self.not_after) + if not self._is_valid_time: + msg = 'El certificado no es vigente' + self._error = msg + + self._is_fiel = obj.extensions.get_extension_for_oid( + ExtensionOID.KEY_USAGE).value.key_agreement + + self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode() + self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2]) + self._cer_modulus = obj.public_key().public_numbers().n + + return + + def _get_hash(self): + digest = hashes.Hash(hashes.SHA512(), default_backend()) + digest.update(self._rfc.encode()) + digest.update(self._serial_number_str.encode()) + digest.update(TOKEN.encode()) + return digest.finalize() + + # ~ def _get_key(self, password): + # ~ if not password: + # ~ password = self._get_hash() + # ~ private_key = serialization.load_pem_private_key( + # ~ self._key_enc, password=password, backend=default_backend()) + # ~ return private_key + + # ~ def _get_key_pem(self): + # ~ obj = self._get_key('') + # ~ key_pem = obj.private_bytes( + # ~ encoding=serialization.Encoding.PEM, + # ~ format=serialization.PrivateFormat.PKCS8, + # ~ encryption_algorithm=serialization.NoEncryption() + # ~ ) + # ~ return key_pem + + # ~ def _get_data_pem(self, pem): + # ~ if not pem: + # ~ return + + # ~ self._key_pem = serialization.load_pem_private_key( + # ~ pem, None, backend=default_backend()) + # ~ return + + # ~ def _sign_with_pem(self, data, name_hash): + # ~ if name_hash == 'sha256': + # ~ type_hash = hashes.SHA256() + # ~ elif name_hash == 'sha1': + # ~ type_hash = hashes.SHA1() + + # ~ firma = self._key_pem.sign(data, padding.PKCS1v15(), type_hash) + # ~ return base64.b64encode(firma).decode() + + # ~ def sign_sha1(self, data, password=''): + # ~ if self._key_pem: + # ~ return self._sign_with_pem(data, 'sha1') + + # ~ private_key = self._get_key(password) + # ~ firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA1()) + # ~ return base64.b64encode(firma).decode() + + # ~ def sign_xml(self, tree): + # ~ import xmlsec + + # ~ node = xmlsec.tree.find_node(tree, xmlsec.constants.NodeSignature) + # ~ ctx = xmlsec.SignatureContext() + # ~ key = xmlsec.Key.from_memory(self.key_pem, xmlsec.constants.KeyDataFormatPem) + # ~ ctx.key = key + # ~ ctx.sign(node) + # ~ node = xmlsec.tree.find_node(tree, 'X509Certificate') + # ~ node.text = self.cer_txt + # ~ return tree + + @property + def rfc(self): + return self._rfc + + @property + def serial_number_str(self): + return self._serial_number_str + + @property + def serial_number_int(self): + return self._serial_number_int + + @property + def not_before(self): + return self._not_before + + @property + def not_after(self): + return self._not_after + + @property + def is_fiel(self): + return self._is_fiel + + @property + def are_couple(self): + return self._are_couple + + @property + def is_valid(self): + return not bool(self.error) + + @property + def is_valid_time(self): + return self._is_valid_time + + @property + def cer_pem(self): + return self._cer_pem.encode() + + @property + def cer_txt(self): + return self._cer_txt + + @property + def key_pem(self): + return self._key_pem + + @property + def key_enc(self): + return self._key_enc + + @property + def issuer(self): + return self._issuer + + @property + def error(self): + return self._error + + def validate_key(self, key, password): + try: + obj = serialization.load_der_private_key( + key, password.encode(), default_backend()) + except ValueError: + msg = 'La contraseña es incorrecta' + self._error = msg + return + + self._key_pem = obj.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ) + + new_pass = self._get_hash() + self._key_enc = obj.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.BestAvailableEncryption(new_pass) + ) + + self._key_modulus = obj.public_key().public_numbers().n + self._are_couple = self._cer_modulus == self._key_modulus + if not self._are_couple: + msg = 'El CER y el KEY no son pareja' + self._error = msg + + return + + def sign(self, data, password=''): + private_key = self._get_key(password) + firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA256()) + return base64.b64encode(firma).decode() diff --git a/source/sat/util.py b/source/sat/util.py new file mode 100644 index 0000000..6495361 --- /dev/null +++ b/source/sat/util.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python + +import getpass +import logging + +from pathlib import Path + +from .cfdi_cert import SATCertificate + + +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +LOG_DATE = '%d/%m/%Y %H:%M:%S' +logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m') +logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m') +logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m') +logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE) +log = logging.getLogger(__name__) + + +def fiel_validate(args): + path_fiel = args.fiel_dir + if not path_fiel: + msg = 'El argumento -fd (fiel-dir) es requerido' + log.error(msg) + return + + path_fiel = Path(path_fiel) + if not path_fiel.is_dir(): + msg = 'La ruta no es un directorio' + log.error(msg) + return + + path_fiel_cer = path_fiel / f'{args.fiel_nombre}.cer' + if not path_fiel_cer.exists(): + msg = 'No se encontró el archivo CER' + log.error(msg) + return + + path_fiel_key = path_fiel / f'{args.fiel_nombre}.key' + if not path_fiel_cer.exists(): + msg = 'No se encontró el archivo KEY' + log.error(msg) + return + + cer = path_fiel_cer.read_bytes() + key = path_fiel_key.read_bytes() + + cert = SATCertificate(cer) + + if not cert.is_valid: + log.error(cert.error) + return + + password = getpass.getpass('Captura la contraseña de la FIEL: ') + + cert.validate_key(key, password) + + if not cert.is_valid: + log.error(cert.error) + return + + if not cert.is_fiel: + msg = 'El certificado no es Fiel, no puedes usarlo para descargar.' + log.error(msg) + return + + path_fiel_enc = path_fiel / f'{args.fiel_nombre}.enc' + path_fiel_enc.write_bytes(cert.key_enc) + + msg = f'El certificado es válido.\n{cert}\n\tPuedes usarlo para descargar.\n' + log.info(msg) + + return