empresa-libre/source/app/controllers/pacs/cfdi_cert.py

306 lines
8.8 KiB
Python
Raw Normal View History

2020-12-30 22:45:57 -06:00
#!/usr/bin/env python3
import argparse
import base64
import datetime
import getpass
2021-01-06 21:30:31 -06:00
import subprocess
2020-12-30 22:45:57 -06:00
from pathlib import Path
import xmlsec
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.x509.oid import ExtensionOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
2021-01-05 18:43:34 -06:00
from conf import TOKEN
2020-12-30 22:45:57 -06:00
class SATCertificate(object):
    """Load a DER certificate (CER) and, optionally, its private key (KEY).

    The certificate is parsed eagerly in the constructor.  When both a key
    and its password are supplied, the key is decrypted, checked against the
    certificate, and re-encrypted as PKCS#8 PEM under a passphrase derived
    from the RFC, the serial number and the application ``TOKEN``.

    Validation problems do not raise: the last one detected is exposed via
    ``error`` and the instance is falsy while ``error`` is non-empty.
    A certificate that cannot be parsed at all still raises from the
    underlying ``cryptography`` loader.

    Args:
        cer: Certificate bytes in DER format.
        key: Private key bytes.  DER (encrypted) when ``password`` is given;
            otherwise stored as-is and expected to already be the
            re-encrypted PEM produced by this class.
        password: Password of the DER private key, as ``str``.
    """

    def __init__(self, cer=b'', key=b'', password=''):
        self._error = ''
        self._init_values()
        self._get_data_cer(cer)
        self._get_data_key(key, password)

    def _init_values(self):
        """Reset every derived attribute to its empty default."""
        self._rfc = ''
        self._serial_number = ''
        self._not_before = None
        self._not_after = None
        self._is_fiel = False
        self._are_couple = False
        self._is_valid_time = False
        self._cer = b''
        self._cer_pem = ''
        self._cer_txt = ''
        self._key_enc = b''
        self._p12 = b''
        self._cer_modulus = 0
        self._key_modulus = 0

    def __str__(self):
        msg = '\tRFC: {}\n'.format(self.rfc)
        msg += '\tNo de Serie: {}\n'.format(self.serial_number)
        msg += '\tVálido desde: {}\n'.format(self.not_before)
        msg += '\tVálido hasta: {}\n'.format(self.not_after)
        msg += '\tEs vigente: {}\n'.format(self.is_valid_time)
        msg += '\tSon pareja: {}\n'.format(self.are_couple)
        msg += '\tEs FIEL: {}\n'.format(self.is_fiel)
        return msg

    def __bool__(self):
        return self.is_valid

    @staticmethod
    def _to_bytes(value):
        """Return ``value`` as bytes, encoding it first when it is a ``str``."""
        if isinstance(value, str):
            return value.encode()
        return value

    def _get_hash(self):
        """Return the SHA-512 digest used as passphrase for the stored key."""
        digest = hashes.Hash(hashes.SHA512(), default_backend())
        digest.update(self._rfc.encode())
        digest.update(self._serial_number.encode())
        digest.update(TOKEN.encode())
        return digest.finalize()

    def _get_data_cer(self, cer):
        """Parse the DER certificate and fill the certificate attributes.

        Sets ``error`` when the current UTC time is outside the validity
        window of the certificate.
        """
        self._cer = cer
        obj = x509.load_der_x509_certificate(cer, default_backend())

        unique_id = obj.subject.get_attributes_for_oid(
            NameOID.X500_UNIQUE_IDENTIFIER)[0].value
        # Only the first space-separated token of the identifier is kept.
        self._rfc = unique_id.split(' ')[0]
        # Keep every second hex character of the serial number.
        # NOTE(review): presumably the serial is ASCII digits packed as hex
        # ('3x' pairs), so this yields the decimal digits - confirm.
        self._serial_number = '{0:x}'.format(obj.serial_number)[1::2]

        self._not_before = obj.not_valid_before
        self._not_after = obj.not_valid_after
        # Naive UTC "now", comparable with the naive validity dates above
        # (same value as the deprecated ``datetime.utcnow()``).
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        self._is_valid_time = self._not_before < now < self._not_after
        if not self._is_valid_time:
            self._error = 'El certificado no es vigente'

        self._is_fiel = obj.extensions.get_extension_for_oid(
            ExtensionOID.KEY_USAGE).value.key_agreement

        self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode()
        # Drop the BEGIN/END armor lines (and the trailing empty split item)
        # and join the base64 payload into a single line.
        self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2])
        self._cer_modulus = obj.public_key().public_numbers().n

    def _get_data_key(self, key, password):
        """Decrypt the DER key, re-encrypt it and verify it matches the CER.

        Without a key or a password the key is only stored unchanged.
        Sets ``error`` on a wrong password or when the key modulus differs
        from the certificate modulus.
        """
        self._key_enc = key
        if not key or not password:
            return

        try:
            obj = serialization.load_der_private_key(
                key, password.encode(), default_backend())
        except ValueError:
            self._error = 'La contraseña es incorrecta'
            return

        # From here on the key is kept only as PKCS#8 PEM, encrypted with
        # the derived passphrase instead of the user's password.
        passphrase = self._get_hash()
        self._key_enc = obj.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.BestAvailableEncryption(passphrase)
        )

        self._key_modulus = obj.public_key().public_numbers().n
        self._are_couple = self._cer_modulus == self._key_modulus
        if not self._are_couple:
            self._error = 'El CER y el KEY no son pareja'

    def _get_key(self, password):
        """Load the stored PEM key.

        Args:
            password: Passphrase as ``str`` or ``bytes``; when empty, the
                derived passphrase from ``_get_hash`` is used.
        """
        if not password:
            password = self._get_hash()
        # ``load_pem_private_key`` only accepts bytes for the password.
        return serialization.load_pem_private_key(
            self._key_enc,
            password=self._to_bytes(password),
            backend=default_backend())

    def _get_key_pem(self):
        """Return the private key as unencrypted PKCS#8 PEM bytes."""
        private_key = self._get_key('')
        return private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )

    def _get_p12(self):
        """Return the key and certificate as unencrypted PKCS#12 bytes."""
        # The pkcs12 submodule is not loaded by importing ``serialization``.
        from cryptography.hazmat.primitives.serialization import pkcs12

        # The serializer needs key/certificate objects and a bytes name,
        # not PEM blobs and a str.
        certificate = x509.load_der_x509_certificate(
            self._cer, default_backend())
        return pkcs12.serialize_key_and_certificates(
            b'test', self._get_key(''), certificate, None,
            serialization.NoEncryption()
        )

    def sign(self, data, password=''):
        """Sign ``data`` (bytes) with PKCS#1 v1.5 / SHA-256.

        Returns:
            The signature, base64-encoded, as ``str``.
        """
        private_key = self._get_key(password)
        firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA256())
        return base64.b64encode(firma).decode()

    def sign_xml(self, tree):
        """Sign the Signature node of ``tree`` in place and return ``tree``.

        After signing, the ``X509Certificate`` node text is set to the
        single-line base64 certificate.
        """
        node = xmlsec.tree.find_node(tree, xmlsec.constants.NodeSignature)
        ctx = xmlsec.SignatureContext()
        ctx.key = xmlsec.Key.from_memory(
            self.key_pem, xmlsec.constants.KeyDataFormatPem)
        ctx.sign(node)

        node = xmlsec.tree.find_node(tree, 'X509Certificate')
        node.text = self.cer_txt
        return tree

    @property
    def rfc(self):
        return self._rfc

    @property
    def serial_number(self):
        return self._serial_number

    @property
    def not_before(self):
        return self._not_before

    @property
    def not_after(self):
        return self._not_after

    @property
    def is_fiel(self):
        return self._is_fiel

    @property
    def are_couple(self):
        return self._are_couple

    @property
    def is_valid(self):
        return not bool(self.error)

    @property
    def is_valid_time(self):
        return self._is_valid_time

    @property
    def cer(self):
        return self._cer

    @property
    def cer_pem(self):
        return self._cer_pem.encode()

    @property
    def cer_txt(self):
        return self._cer_txt

    @property
    def key_pem(self):
        return self._get_key_pem()

    @property
    def key_enc(self):
        return self._key_enc

    @property
    def p12(self):
        return self._get_p12()

    @property
    def error(self):
        return self._error

    @classmethod
    def save_cert(cls, obj):
        """Re-encrypt the key stored on ``obj`` and save the updated record.

        ``obj`` must expose ``cer_pem``, ``key_enc``, ``rfc``, ``serie`` and
        ``save()``.  Its key is decrypted with ``openssl`` using the md5 hex
        digest of the RFC as passphrase, then encrypted again with the
        SHA-512 passphrase (RFC + serie + ``TOKEN``) before the record
        fields are refreshed from the parsed certificate.

        Raises:
            subprocess.CalledProcessError: if ``openssl`` cannot decrypt
                the key.
        """
        import hashlib

        cer = x509.load_pem_x509_certificate(
            obj.cer_pem.encode(), default_backend())
        cer_der = cer.public_bytes(serialization.Encoding.DER)

        # md5 is only used to rebuild the passphrase of the existing key.
        token = hashlib.md5(obj.rfc.encode()).hexdigest()
        # Argument list (no shell) so record data can never be interpreted
        # by a shell; the key goes through stdin so it is never written to
        # a world-readable temporary file.
        command = [
            'openssl', 'rsa', '-inform', 'PEM', '-outform', 'PEM',
            '-passin', f'pass:{token}',
        ]
        pem = subprocess.check_output(
            command, input=cls._to_bytes(obj.key_enc))

        key = serialization.load_pem_private_key(
            pem, password=None, backend=default_backend())

        digest = hashes.Hash(hashes.SHA512(), default_backend())
        digest.update(obj.rfc.encode())
        digest.update(obj.serie.encode())
        digest.update(TOKEN.encode())
        passphrase = digest.finalize()

        key_enc = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.BestAvailableEncryption(passphrase)
        )

        # No password: the already re-encrypted PEM is stored untouched.
        cert = cls(cer_der, key_enc)
        obj.key_enc = cert.key_enc
        obj.cer = cert.cer
        obj.serie = cert.serial_number
        obj.desde = cert.not_before
        obj.hasta = cert.not_after
        obj.save()
2020-12-30 22:45:57 -06:00
def main(args):
    """Load the CER/KEY pair named in ``args`` and print its details.

    The KEY password is requested interactively so it is never stored in
    the source code.  On any problem a message is printed and the function
    returns without raising.

    Args:
        args: Parsed command-line namespace with ``cer`` and ``key`` paths.
    """
    contra = getpass.getpass('Introduce la contraseña del archivo KEY: ')
    if not contra.strip():
        msg = 'La contraseña es requerida'
        print(msg)
        return

    path_cer = Path(args.cer)
    path_key = Path(args.key)

    if not path_cer.is_file():
        msg = 'El archivo CER es necesario'
        print(msg)
        return

    if not path_key.is_file():
        msg = 'El archivo KEY es necesario'
        print(msg)
        return

    cer = path_cer.read_bytes()
    key = path_key.read_bytes()
    cert = SATCertificate(cer, key, contra)

    # Show the validation error when there is one, the summary otherwise.
    if cert.error:
        print(cert.error)
    else:
        print(cert)
def _process_command_line_arguments():
    """Parse ``sys.argv`` and return the namespace with ``cer`` and ``key``.

    Both options are optional and default to an empty string.
    """
    parser = argparse.ArgumentParser(description='CFDI Certificados')
    options = (
        ('-c', '--cer', 'Archivo CER'),
        ('-k', '--key', 'Archivo KEY'),
    )
    for short_flag, long_flag, description in options:
        parser.add_argument(
            short_flag, long_flag, help=description, default='')
    return parser.parse_args()
# Script entry point: parse the command line and run the certificate check.
if __name__ == '__main__':
    main(_process_command_line_arguments())