Compare commits

...

3 Commits

Author SHA1 Message Date
el Mau da889c9af5 Remove argument 2023-01-19 22:45:15 -06:00
el Mau 3e35c55283 Validate path download 2023-01-18 23:28:43 -06:00
el Mau 200809cc8e Validate Fiel 2023-01-18 22:10:36 -06:00
6 changed files with 377 additions and 0 deletions

2
.gitignore vendored
View File

@ -1 +1,3 @@
env/
__pycache__/
conf.py

View File

@ -1 +1,3 @@
aioreq
cryptography
peewee

34
source/cfdi-descarga.py Normal file
View File

@ -0,0 +1,34 @@
#!/usr/bin/env python
import argparse
from sat import util
def main(args):
print(args, '\n')
if args.fiel_validar:
util.fiel_validate(args)
return
util.download(args)
return
def _process_command_line_arguments():
parser = argparse.ArgumentParser(
description='Descarga SAT')
parser.add_argument('-fv', '--fiel-validar', default=False, action='store_true')
help = 'Directorio de ubicación de la FIEL'
parser.add_argument('-fd', '--fiel-dir', help=help, default='')
help = "Nombre de la Fiel, el predeterminado es 'fiel'"
parser.add_argument('-fn', '--fiel-nombre', help=help, default='fiel')
return parser.parse_args()
if __name__ == '__main__':
args = _process_command_line_arguments()
main(args)

3
source/conf.py Normal file
View File

@ -0,0 +1,3 @@
#!/usr/bin/env python
TOKEN = '12345'

218
source/sat/cfdi_cert.py Normal file
View File

@ -0,0 +1,218 @@
#!/usr/bin/env python3
import base64
import datetime
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.x509.oid import ExtensionOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from conf import TOKEN
class SATCertificate(object):
def __init__(self, cer):
self._error = ''
self._init_values()
self._get_data_cer(cer)
def _init_values(self):
self._rfc = ''
self._serial_number_int = 0
self._serial_number_str = ''
self._not_before = None
self._not_after = None
self._is_fiel = False
self._are_couple = False
self._is_valid_time = False
self._cer_pem = ''
self._cer_txt = ''
self._key = b''
self._key_enc = b''
self._key_pem = b''
self._cer_modulus = 0
self._key_modulus = 0
self._issuer = ''
return
def __str__(self):
msg = '\n\tRFC: {}\n'.format(self.rfc)
msg += '\tNo de Serie: {}\n'.format(self.serial_number_str)
msg += '\tVálido desde: {}\n'.format(self.not_before)
msg += '\tVálido hasta: {}\n'.format(self.not_after)
msg += '\tEs vigente: {}\n'.format(self.is_valid_time)
msg += '\tSon pareja: {}\n'.format(self.are_couple)
msg += '\tEs FIEL: {}\n'.format(self.is_fiel)
return msg
def __bool__(self):
return self.is_valid
def _get_data_cer(self, cer):
obj = x509.load_der_x509_certificate(cer, default_backend())
self._rfc = obj.subject.get_attributes_for_oid(
NameOID.X500_UNIQUE_IDENTIFIER)[0].value.split(' ')[0]
self._serial_number_int = obj.serial_number
self._serial_number_str = f'{obj.serial_number:x}'[1::2]
self._not_before = obj.not_valid_before
self._not_after = obj.not_valid_after
self._issuer = ','.join([i.rfc4514_string() for i in obj.issuer])
now = datetime.datetime.utcnow()
self._is_valid_time = (now > self.not_before) and (now < self.not_after)
if not self._is_valid_time:
msg = 'El certificado no es vigente'
self._error = msg
self._is_fiel = obj.extensions.get_extension_for_oid(
ExtensionOID.KEY_USAGE).value.key_agreement
self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode()
self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2])
self._cer_modulus = obj.public_key().public_numbers().n
return
def _get_hash(self):
digest = hashes.Hash(hashes.SHA512(), default_backend())
digest.update(self._rfc.encode())
digest.update(self._serial_number_str.encode())
digest.update(TOKEN.encode())
return digest.finalize()
# ~ def _get_key_pem(self):
# ~ obj = self._get_key('')
# ~ key_pem = obj.private_bytes(
# ~ encoding=serialization.Encoding.PEM,
# ~ format=serialization.PrivateFormat.PKCS8,
# ~ encryption_algorithm=serialization.NoEncryption()
# ~ )
# ~ return key_pem
# ~ def _sign_with_pem(self, data, name_hash):
# ~ if name_hash == 'sha256':
# ~ type_hash = hashes.SHA256()
# ~ elif name_hash == 'sha1':
# ~ type_hash = hashes.SHA1()
# ~ firma = self._key_pem.sign(data, padding.PKCS1v15(), type_hash)
# ~ return base64.b64encode(firma).decode()
@property
def rfc(self):
return self._rfc
@property
def serial_number_str(self):
return self._serial_number_str
@property
def serial_number_int(self):
return self._serial_number_int
@property
def not_before(self):
return self._not_before
@property
def not_after(self):
return self._not_after
@property
def is_fiel(self):
return self._is_fiel
@property
def are_couple(self):
return self._are_couple
@property
def is_valid(self):
return not bool(self.error)
@property
def is_valid_time(self):
return self._is_valid_time
@property
def cer_pem(self):
return self._cer_pem.encode()
@property
def cer_txt(self):
return self._cer_txt
@property
def key_pem(self):
return self._key_pem
@key_pem.setter
def key_pem(self, value):
self._key_pem = value
@property
def key_enc(self):
return self._key_enc
@property
def issuer(self):
return self._issuer
@property
def error(self):
return self._error
@property
def key(self):
return self._key
@key.setter
def key(self, value):
self._key = value
def validate_key(self, key, password):
try:
obj = serialization.load_der_private_key(
key, password.encode(), default_backend())
except ValueError:
msg = 'La contraseña es incorrecta'
self._error = msg
return
self._key_pem = obj.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
new_pass = self._get_hash()
self._key_enc = obj.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(new_pass)
)
self._key_modulus = obj.public_key().public_numbers().n
self._are_couple = self._cer_modulus == self._key_modulus
if not self._are_couple:
msg = 'El CER y el KEY no son pareja'
self._error = msg
return
def sign(self, data, name_hash='sha256'):
if name_hash == 'sha256':
type_hash = hashes.SHA256()
elif name_hash == 'sha1':
type_hash = hashes.SHA1()
password = self._get_hash()
private_key = serialization.load_pem_private_key(
self._key, password=password, backend=default_backend())
sign = private_key.sign(data.encode(), padding.PKCS1v15(), type_hash)
del password
del private_key
return base64.b64encode(sign).decode()

118
source/sat/util.py Normal file
View File

@ -0,0 +1,118 @@
#!/usr/bin/env python
import getpass
import logging
from pathlib import Path
from .cfdi_cert import SATCertificate
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
def fiel_validate(args):
path_fiel = args.fiel_dir
if not path_fiel:
msg = 'El argumento -fd (fiel-dir) es requerido'
log.error(msg)
return
path_fiel = Path(path_fiel)
if not path_fiel.is_dir():
msg = 'La ruta no es un directorio'
log.error(msg)
return
path_fiel_cer = path_fiel / f'{args.fiel_nombre}.cer'
if not path_fiel_cer.exists():
msg = 'No se encontró el archivo CER'
log.error(msg)
return
path_fiel_key = path_fiel / f'{args.fiel_nombre}.key'
if not path_fiel_cer.exists():
msg = 'No se encontró el archivo KEY'
log.error(msg)
return
cer = path_fiel_cer.read_bytes()
key = path_fiel_key.read_bytes()
cert = SATCertificate(cer)
if not cert.is_valid:
log.error(cert.error)
return
password = getpass.getpass('Captura la contraseña de la FIEL: ')
cert.validate_key(key, password)
if not cert.is_valid:
log.error(cert.error)
return
if not cert.is_fiel:
msg = 'El certificado no es Fiel, no puedes usarlo para descargar.'
log.error(msg)
return
path_fiel_enc = path_fiel / f'{args.fiel_nombre}.enc'
path_fiel_enc.write_bytes(cert.key_enc)
msg = f'El certificado es válido.\n{cert}\n\tPuedes usarlo para descargar.\n'
log.info(msg)
return
def _get_certificate(path_fiel, name_fiel):
if not path_fiel:
msg = 'El argumento -fd (fiel-dir) es requerido'
log.error(msg)
return
path_fiel = Path(path_fiel)
path_fiel_cer = path_fiel / f'{name_fiel}.cer'
path_fiel_key = path_fiel / f'{name_fiel}.enc'
path_fiel_pem = path_fiel / f'{name_fiel}.pem'
cer = path_fiel_cer.read_bytes()
certificate = SATCertificate(cer)
if not certificate.is_valid:
log.error(certificate.error)
return
if path_fiel_key.exists():
certificate.key = path_fiel_key.read_bytes()
if path_fiel_pem.exists():
certificate.key_pem = path_fiel_pem.read_bytes()
return certificate
def _validate_arguments(args):
cert = _get_certificate(args.fiel_dir, args.fiel_nombre)
if cert is None:
return {}, None
data = {'path_dowload': ''}
return data, cert
def download(args):
data, cert = _validate_arguments(args)
if not data:
return
print(data)
return