Compare commits
3 Commits
1ac4b4f8ec
...
da889c9af5
Author | SHA1 | Date |
---|---|---|
el Mau | da889c9af5 | |
el Mau | 3e35c55283 | |
el Mau | 200809cc8e |
|
@ -1 +1,3 @@
|
|||
env/
|
||||
__pycache__/
|
||||
conf.py
|
||||
|
|
|
@ -1 +1,3 @@
|
|||
aioreq
|
||||
cryptography
|
||||
peewee
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import argparse
|
||||
from sat import util
|
||||
|
||||
|
||||
def main(args):
|
||||
print(args, '\n')
|
||||
|
||||
if args.fiel_validar:
|
||||
util.fiel_validate(args)
|
||||
return
|
||||
|
||||
util.download(args)
|
||||
|
||||
return
|
||||
|
||||
|
||||
def _process_command_line_arguments():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Descarga SAT')
|
||||
|
||||
parser.add_argument('-fv', '--fiel-validar', default=False, action='store_true')
|
||||
help = 'Directorio de ubicación de la FIEL'
|
||||
parser.add_argument('-fd', '--fiel-dir', help=help, default='')
|
||||
help = "Nombre de la Fiel, el predeterminado es 'fiel'"
|
||||
parser.add_argument('-fn', '--fiel-nombre', help=help, default='fiel')
|
||||
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
args = _process_command_line_arguments()
|
||||
main(args)
|
|
@ -0,0 +1,3 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
TOKEN = '12345'
|
|
@ -0,0 +1,218 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import base64
|
||||
import datetime
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography import x509
|
||||
from cryptography.x509.oid import NameOID
|
||||
from cryptography.x509.oid import ExtensionOID
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import padding
|
||||
|
||||
from conf import TOKEN
|
||||
|
||||
|
||||
class SATCertificate(object):
|
||||
|
||||
def __init__(self, cer):
|
||||
self._error = ''
|
||||
self._init_values()
|
||||
self._get_data_cer(cer)
|
||||
|
||||
def _init_values(self):
|
||||
self._rfc = ''
|
||||
self._serial_number_int = 0
|
||||
self._serial_number_str = ''
|
||||
self._not_before = None
|
||||
self._not_after = None
|
||||
self._is_fiel = False
|
||||
self._are_couple = False
|
||||
self._is_valid_time = False
|
||||
self._cer_pem = ''
|
||||
self._cer_txt = ''
|
||||
self._key = b''
|
||||
self._key_enc = b''
|
||||
self._key_pem = b''
|
||||
self._cer_modulus = 0
|
||||
self._key_modulus = 0
|
||||
self._issuer = ''
|
||||
return
|
||||
|
||||
def __str__(self):
|
||||
msg = '\n\tRFC: {}\n'.format(self.rfc)
|
||||
msg += '\tNo de Serie: {}\n'.format(self.serial_number_str)
|
||||
msg += '\tVálido desde: {}\n'.format(self.not_before)
|
||||
msg += '\tVálido hasta: {}\n'.format(self.not_after)
|
||||
msg += '\tEs vigente: {}\n'.format(self.is_valid_time)
|
||||
msg += '\tSon pareja: {}\n'.format(self.are_couple)
|
||||
msg += '\tEs FIEL: {}\n'.format(self.is_fiel)
|
||||
return msg
|
||||
|
||||
def __bool__(self):
|
||||
return self.is_valid
|
||||
|
||||
def _get_data_cer(self, cer):
|
||||
obj = x509.load_der_x509_certificate(cer, default_backend())
|
||||
self._rfc = obj.subject.get_attributes_for_oid(
|
||||
NameOID.X500_UNIQUE_IDENTIFIER)[0].value.split(' ')[0]
|
||||
self._serial_number_int = obj.serial_number
|
||||
self._serial_number_str = f'{obj.serial_number:x}'[1::2]
|
||||
self._not_before = obj.not_valid_before
|
||||
self._not_after = obj.not_valid_after
|
||||
self._issuer = ','.join([i.rfc4514_string() for i in obj.issuer])
|
||||
|
||||
now = datetime.datetime.utcnow()
|
||||
self._is_valid_time = (now > self.not_before) and (now < self.not_after)
|
||||
if not self._is_valid_time:
|
||||
msg = 'El certificado no es vigente'
|
||||
self._error = msg
|
||||
|
||||
self._is_fiel = obj.extensions.get_extension_for_oid(
|
||||
ExtensionOID.KEY_USAGE).value.key_agreement
|
||||
|
||||
self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode()
|
||||
self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2])
|
||||
self._cer_modulus = obj.public_key().public_numbers().n
|
||||
|
||||
return
|
||||
|
||||
def _get_hash(self):
|
||||
digest = hashes.Hash(hashes.SHA512(), default_backend())
|
||||
digest.update(self._rfc.encode())
|
||||
digest.update(self._serial_number_str.encode())
|
||||
digest.update(TOKEN.encode())
|
||||
return digest.finalize()
|
||||
|
||||
# ~ def _get_key_pem(self):
|
||||
# ~ obj = self._get_key('')
|
||||
# ~ key_pem = obj.private_bytes(
|
||||
# ~ encoding=serialization.Encoding.PEM,
|
||||
# ~ format=serialization.PrivateFormat.PKCS8,
|
||||
# ~ encryption_algorithm=serialization.NoEncryption()
|
||||
# ~ )
|
||||
# ~ return key_pem
|
||||
|
||||
# ~ def _sign_with_pem(self, data, name_hash):
|
||||
# ~ if name_hash == 'sha256':
|
||||
# ~ type_hash = hashes.SHA256()
|
||||
# ~ elif name_hash == 'sha1':
|
||||
# ~ type_hash = hashes.SHA1()
|
||||
|
||||
# ~ firma = self._key_pem.sign(data, padding.PKCS1v15(), type_hash)
|
||||
# ~ return base64.b64encode(firma).decode()
|
||||
|
||||
@property
|
||||
def rfc(self):
|
||||
return self._rfc
|
||||
|
||||
@property
|
||||
def serial_number_str(self):
|
||||
return self._serial_number_str
|
||||
|
||||
@property
|
||||
def serial_number_int(self):
|
||||
return self._serial_number_int
|
||||
|
||||
@property
|
||||
def not_before(self):
|
||||
return self._not_before
|
||||
|
||||
@property
|
||||
def not_after(self):
|
||||
return self._not_after
|
||||
|
||||
@property
|
||||
def is_fiel(self):
|
||||
return self._is_fiel
|
||||
|
||||
@property
|
||||
def are_couple(self):
|
||||
return self._are_couple
|
||||
|
||||
@property
|
||||
def is_valid(self):
|
||||
return not bool(self.error)
|
||||
|
||||
@property
|
||||
def is_valid_time(self):
|
||||
return self._is_valid_time
|
||||
|
||||
@property
|
||||
def cer_pem(self):
|
||||
return self._cer_pem.encode()
|
||||
|
||||
@property
|
||||
def cer_txt(self):
|
||||
return self._cer_txt
|
||||
|
||||
@property
|
||||
def key_pem(self):
|
||||
return self._key_pem
|
||||
@key_pem.setter
|
||||
def key_pem(self, value):
|
||||
self._key_pem = value
|
||||
|
||||
@property
|
||||
def key_enc(self):
|
||||
return self._key_enc
|
||||
|
||||
@property
|
||||
def issuer(self):
|
||||
return self._issuer
|
||||
|
||||
@property
|
||||
def error(self):
|
||||
return self._error
|
||||
|
||||
@property
|
||||
def key(self):
|
||||
return self._key
|
||||
@key.setter
|
||||
def key(self, value):
|
||||
self._key = value
|
||||
|
||||
def validate_key(self, key, password):
|
||||
try:
|
||||
obj = serialization.load_der_private_key(
|
||||
key, password.encode(), default_backend())
|
||||
except ValueError:
|
||||
msg = 'La contraseña es incorrecta'
|
||||
self._error = msg
|
||||
return
|
||||
|
||||
self._key_pem = obj.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
)
|
||||
|
||||
new_pass = self._get_hash()
|
||||
self._key_enc = obj.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm=serialization.BestAvailableEncryption(new_pass)
|
||||
)
|
||||
|
||||
self._key_modulus = obj.public_key().public_numbers().n
|
||||
self._are_couple = self._cer_modulus == self._key_modulus
|
||||
if not self._are_couple:
|
||||
msg = 'El CER y el KEY no son pareja'
|
||||
self._error = msg
|
||||
|
||||
return
|
||||
|
||||
def sign(self, data, name_hash='sha256'):
|
||||
if name_hash == 'sha256':
|
||||
type_hash = hashes.SHA256()
|
||||
elif name_hash == 'sha1':
|
||||
type_hash = hashes.SHA1()
|
||||
|
||||
password = self._get_hash()
|
||||
private_key = serialization.load_pem_private_key(
|
||||
self._key, password=password, backend=default_backend())
|
||||
sign = private_key.sign(data.encode(), padding.PKCS1v15(), type_hash)
|
||||
del password
|
||||
del private_key
|
||||
return base64.b64encode(sign).decode()
|
|
@ -0,0 +1,118 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import getpass
|
||||
import logging
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from .cfdi_cert import SATCertificate
|
||||
|
||||
|
||||
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
|
||||
LOG_DATE = '%d/%m/%Y %H:%M:%S'
|
||||
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
|
||||
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
|
||||
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
|
||||
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def fiel_validate(args):
|
||||
path_fiel = args.fiel_dir
|
||||
if not path_fiel:
|
||||
msg = 'El argumento -fd (fiel-dir) es requerido'
|
||||
log.error(msg)
|
||||
return
|
||||
|
||||
path_fiel = Path(path_fiel)
|
||||
if not path_fiel.is_dir():
|
||||
msg = 'La ruta no es un directorio'
|
||||
log.error(msg)
|
||||
return
|
||||
|
||||
path_fiel_cer = path_fiel / f'{args.fiel_nombre}.cer'
|
||||
if not path_fiel_cer.exists():
|
||||
msg = 'No se encontró el archivo CER'
|
||||
log.error(msg)
|
||||
return
|
||||
|
||||
path_fiel_key = path_fiel / f'{args.fiel_nombre}.key'
|
||||
if not path_fiel_cer.exists():
|
||||
msg = 'No se encontró el archivo KEY'
|
||||
log.error(msg)
|
||||
return
|
||||
|
||||
cer = path_fiel_cer.read_bytes()
|
||||
key = path_fiel_key.read_bytes()
|
||||
|
||||
cert = SATCertificate(cer)
|
||||
|
||||
if not cert.is_valid:
|
||||
log.error(cert.error)
|
||||
return
|
||||
|
||||
password = getpass.getpass('Captura la contraseña de la FIEL: ')
|
||||
|
||||
cert.validate_key(key, password)
|
||||
|
||||
if not cert.is_valid:
|
||||
log.error(cert.error)
|
||||
return
|
||||
|
||||
if not cert.is_fiel:
|
||||
msg = 'El certificado no es Fiel, no puedes usarlo para descargar.'
|
||||
log.error(msg)
|
||||
return
|
||||
|
||||
path_fiel_enc = path_fiel / f'{args.fiel_nombre}.enc'
|
||||
path_fiel_enc.write_bytes(cert.key_enc)
|
||||
|
||||
msg = f'El certificado es válido.\n{cert}\n\tPuedes usarlo para descargar.\n'
|
||||
log.info(msg)
|
||||
|
||||
return
|
||||
|
||||
|
||||
def _get_certificate(path_fiel, name_fiel):
|
||||
if not path_fiel:
|
||||
msg = 'El argumento -fd (fiel-dir) es requerido'
|
||||
log.error(msg)
|
||||
return
|
||||
|
||||
path_fiel = Path(path_fiel)
|
||||
path_fiel_cer = path_fiel / f'{name_fiel}.cer'
|
||||
path_fiel_key = path_fiel / f'{name_fiel}.enc'
|
||||
path_fiel_pem = path_fiel / f'{name_fiel}.pem'
|
||||
|
||||
cer = path_fiel_cer.read_bytes()
|
||||
|
||||
certificate = SATCertificate(cer)
|
||||
if not certificate.is_valid:
|
||||
log.error(certificate.error)
|
||||
return
|
||||
|
||||
if path_fiel_key.exists():
|
||||
certificate.key = path_fiel_key.read_bytes()
|
||||
if path_fiel_pem.exists():
|
||||
certificate.key_pem = path_fiel_pem.read_bytes()
|
||||
|
||||
return certificate
|
||||
|
||||
|
||||
def _validate_arguments(args):
|
||||
cert = _get_certificate(args.fiel_dir, args.fiel_nombre)
|
||||
if cert is None:
|
||||
return {}, None
|
||||
|
||||
data = {'path_dowload': ''}
|
||||
|
||||
return data, cert
|
||||
|
||||
def download(args):
|
||||
data, cert = _validate_arguments(args)
|
||||
if not data:
|
||||
return
|
||||
|
||||
print(data)
|
||||
|
||||
return
|
Loading…
Reference in New Issue