cfdi-descarga/source/sat/cfdi_cert.py

219 lines
6.3 KiB
Python

#!/usr/bin/env python3
import base64
import datetime
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.x509.oid import ExtensionOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from conf import TOKEN
class SATCertificate(object):
def __init__(self, cer):
self._error = ''
self._init_values()
self._get_data_cer(cer)
def _init_values(self):
self._rfc = ''
self._serial_number_int = 0
self._serial_number_str = ''
self._not_before = None
self._not_after = None
self._is_fiel = False
self._are_couple = False
self._is_valid_time = False
self._cer_pem = ''
self._cer_txt = ''
self._key = b''
self._key_enc = b''
self._key_pem = b''
self._cer_modulus = 0
self._key_modulus = 0
self._issuer = ''
return
def __str__(self):
msg = '\n\tRFC: {}\n'.format(self.rfc)
msg += '\tNo de Serie: {}\n'.format(self.serial_number_str)
msg += '\tVálido desde: {}\n'.format(self.not_before)
msg += '\tVálido hasta: {}\n'.format(self.not_after)
msg += '\tEs vigente: {}\n'.format(self.is_valid_time)
msg += '\tSon pareja: {}\n'.format(self.are_couple)
msg += '\tEs FIEL: {}\n'.format(self.is_fiel)
return msg
def __bool__(self):
return self.is_valid
def _get_data_cer(self, cer):
obj = x509.load_der_x509_certificate(cer, default_backend())
self._rfc = obj.subject.get_attributes_for_oid(
NameOID.X500_UNIQUE_IDENTIFIER)[0].value.split(' ')[0]
self._serial_number_int = obj.serial_number
self._serial_number_str = f'{obj.serial_number:x}'[1::2]
self._not_before = obj.not_valid_before
self._not_after = obj.not_valid_after
self._issuer = ','.join([i.rfc4514_string() for i in obj.issuer])
now = datetime.datetime.utcnow()
self._is_valid_time = (now > self.not_before) and (now < self.not_after)
if not self._is_valid_time:
msg = 'El certificado no es vigente'
self._error = msg
self._is_fiel = obj.extensions.get_extension_for_oid(
ExtensionOID.KEY_USAGE).value.key_agreement
self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode()
self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2])
self._cer_modulus = obj.public_key().public_numbers().n
return
def _get_hash(self):
digest = hashes.Hash(hashes.SHA512(), default_backend())
digest.update(self._rfc.encode())
digest.update(self._serial_number_str.encode())
digest.update(TOKEN.encode())
return digest.finalize()
# ~ def _get_key_pem(self):
# ~ obj = self._get_key('')
# ~ key_pem = obj.private_bytes(
# ~ encoding=serialization.Encoding.PEM,
# ~ format=serialization.PrivateFormat.PKCS8,
# ~ encryption_algorithm=serialization.NoEncryption()
# ~ )
# ~ return key_pem
# ~ def _sign_with_pem(self, data, name_hash):
# ~ if name_hash == 'sha256':
# ~ type_hash = hashes.SHA256()
# ~ elif name_hash == 'sha1':
# ~ type_hash = hashes.SHA1()
# ~ firma = self._key_pem.sign(data, padding.PKCS1v15(), type_hash)
# ~ return base64.b64encode(firma).decode()
@property
def rfc(self):
return self._rfc
@property
def serial_number_str(self):
return self._serial_number_str
@property
def serial_number_int(self):
return self._serial_number_int
@property
def not_before(self):
return self._not_before
@property
def not_after(self):
return self._not_after
@property
def is_fiel(self):
return self._is_fiel
@property
def are_couple(self):
return self._are_couple
@property
def is_valid(self):
return not bool(self.error)
@property
def is_valid_time(self):
return self._is_valid_time
@property
def cer_pem(self):
return self._cer_pem.encode()
@property
def cer_txt(self):
return self._cer_txt
@property
def key_pem(self):
return self._key_pem
@key_pem.setter
def key_pem(self, value):
self._key_pem = value
@property
def key_enc(self):
return self._key_enc
@property
def issuer(self):
return self._issuer
@property
def error(self):
return self._error
@property
def key(self):
return self._key
@key.setter
def key(self, value):
self._key = value
def validate_key(self, key, password):
try:
obj = serialization.load_der_private_key(
key, password.encode(), default_backend())
except ValueError:
msg = 'La contraseña es incorrecta'
self._error = msg
return
self._key_pem = obj.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
new_pass = self._get_hash()
self._key_enc = obj.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(new_pass)
)
self._key_modulus = obj.public_key().public_numbers().n
self._are_couple = self._cer_modulus == self._key_modulus
if not self._are_couple:
msg = 'El CER y el KEY no son pareja'
self._error = msg
return
def sign(self, data, name_hash='sha256'):
if name_hash == 'sha256':
type_hash = hashes.SHA256()
elif name_hash == 'sha1':
type_hash = hashes.SHA1()
password = self._get_hash()
private_key = serialization.load_pem_private_key(
self._key, password=password, backend=default_backend())
sign = private_key.sign(data.encode(), padding.PKCS1v15(), type_hash)
del password
del private_key
return base64.b64encode(sign).decode()