235 lines
6.9 KiB
Python
235 lines
6.9 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# ~ cfdi_cert - Para trabajar con certificados del SAT fácilmente
|
|
# ~ Copyright (C) 2020-2024 Mauricio Servin (elmau)
|
|
|
|
# ~ This program is free software: you can redistribute it and/or modify it
|
|
# ~ under the terms of the GNU General Public License as published by the
|
|
# ~ Free Software Foundation, either version 3 of the License, or (at your
|
|
# ~ option) any later version.
|
|
|
|
# ~ This program is distributed in the hope that it will be useful, but
|
|
# ~ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
|
# ~ or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
# ~ for more details.
|
|
|
|
# ~ You should have received a copy of the GNU General Public License along
|
|
# ~ with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
|
import base64
|
|
from datetime import datetime, timezone
|
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography import x509
|
|
from cryptography.x509.oid import NameOID
|
|
from cryptography.x509.oid import ExtensionOID
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
|
|
|
|
def get_hash(words: tuple):
    """Return the SHA-512 digest of all *words*, hashed in order.

    Each element is encoded with ``str.encode()`` (UTF-8 by default) and fed
    to a single running hash, so the result equals the digest of the
    concatenated encodings.
    """
    hasher = hashes.Hash(hashes.SHA512(), default_backend())
    encoded_parts = (part.encode() for part in words)
    for chunk in encoded_parts:
        hasher.update(chunk)
    result = hasher.finalize()
    return result
|
|
|
|
|
|
class CertValidate:
    """Load a certificate (CER) and its private key (KEY) and validate them.

    After ``validate()`` the instance exposes the data read from the
    certificate through read-only properties. ``error`` holds the last
    validation message (empty when everything is fine) and the instance is
    truthy only when no error was recorded.
    """

    def __init__(self):
        self._error = ''
        self._init_values()

    def _init_values(self):
        """Reset every value read from the certificate / key to its default."""
        self._rfc = ''
        self._serial_number1 = ''
        self._serial_number2 = ''
        self._not_before = None
        self._not_after = None
        self._is_fiel = False
        self._are_couple = False
        self._is_valid_time = False
        self._cer_pem = ''
        self._cer_txt = ''
        self._key = None
        self._key_enc = b''
        self._key_pem = b''
        # These names match the ones read and written by get_data_cer() and
        # _get_data_key(); initializing them here avoids an AttributeError
        # when the key is processed before any certificate was loaded.
        self._cer_modulus = 0
        self._key_modulus = 0
        self._issuer = ''
        return

    def __str__(self):
        # NOTE(review): the leading whitespace of the continuation lines of
        # the original multi-line literal could not be determined; the text
        # below reproduces the literal as it is visible. Confirm the layout.
        lines = (
            f'\tRFC: {self.rfc}',
            f'No de Serie: {self.serial_number1}',
            f'No de Serie: {self.serial_number2}',
            f'Válido desde: {self.not_before}',
            f'Válido hasta: {self.not_after}',
            f'Es vigente: {self.is_valid_time}',
            f'Son pareja: {self.are_couple}',
            f'Es FIEL: {self.is_fiel}',
            '',
        )
        return '\n'.join(lines)

    def __bool__(self):
        return self.is_valid

    @property
    def error(self):
        """Last validation error message; empty string when there is none."""
        return self._error

    @property
    def rfc(self):
        return self._rfc

    @property
    def serial_number1(self):
        """Certificate serial number as the integer stored in the certificate."""
        return self._serial_number1

    @property
    def serial_number2(self):
        """Serial number built from every second hex digit of the serial."""
        return self._serial_number2

    @property
    def not_before(self):
        return self._not_before

    @property
    def not_after(self):
        return self._not_after

    @property
    def is_fiel(self):
        """Value of the ``key_agreement`` flag of the KeyUsage extension."""
        return self._is_fiel

    @property
    def are_couple(self):
        """True when the certificate and key share the same modulus."""
        return self._are_couple

    @property
    def is_valid(self):
        """True when no error has been recorded."""
        return not bool(self.error)

    @property
    def is_valid_time(self):
        return self._is_valid_time

    @property
    def cer_pem(self):
        """Certificate in PEM format, as bytes."""
        return self._cer_pem.encode()

    @property
    def cer_txt(self):
        """PEM body of the certificate on one line, without BEGIN/END lines."""
        return self._cer_txt

    @property
    def key_pem(self):
        """Loaded private key as unencrypted PEM bytes (``b''`` if no key)."""
        return self._get_key_pem()

    @property
    def issuer(self):
        return self._issuer

    def _get_key_pem(self) -> bytes:
        """Serialize the loaded private key as unencrypted PKCS8 PEM.

        Returns ``b''`` when no key has been loaded yet. The property
        ``key_pem`` relied on this helper, which was missing.
        """
        if self._key is None:
            return b''
        return self._key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

    def get_data_cer(self, cer: bytes):
        """Parse a DER certificate and store its data on the instance.

        Sets ``error`` when the current UTC time is outside the validity
        period of the certificate. Exceptions raised by the parser are not
        caught here.
        """
        obj = x509.load_der_x509_certificate(cer, default_backend())

        # The RFC is the first space-separated token of the unique identifier.
        unique_id = obj.subject.get_attributes_for_oid(
            NameOID.X500_UNIQUE_IDENTIFIER)[0].value
        self._rfc = unique_id.split(' ')[0]

        self._serial_number1 = obj.serial_number
        # Keeps every second hex digit (odd positions) of the serial number;
        # presumably each digit is stored as an ASCII byte (0x3N) - TODO confirm.
        self._serial_number2 = f'{obj.serial_number:x}'[1::2]
        self._not_before = obj.not_valid_before_utc
        self._not_after = obj.not_valid_after_utc
        self._issuer = ','.join([i.rfc4514_string() for i in obj.issuer])

        now = datetime.now(timezone.utc)
        self._is_valid_time = self.not_before < now < self.not_after
        if not self._is_valid_time:
            msg = 'El certificado no es vigente'
            self._error = msg

        self._is_fiel = obj.extensions.get_extension_for_oid(
            ExtensionOID.KEY_USAGE).value.key_agreement

        self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode()
        # Drop the BEGIN line, the END line and the trailing empty element.
        self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2])
        self._cer_modulus = obj.public_key().public_numbers().n
        return

    def _get_data_key(self, key, password):
        """Load the DER private key and check that it matches the certificate.

        A ``ValueError`` from the loader is reported as a wrong password in
        ``error``; a modulus mismatch is reported as CER and KEY not being a
        pair. The key object is kept in both of the latter outcomes.
        """
        try:
            obj = serialization.load_der_private_key(
                key, password.encode(), default_backend())
        except ValueError:
            msg = 'La contraseña es incorrecta'
            self._error = msg
            return

        self._key_modulus = obj.public_key().public_numbers().n
        self._are_couple = self._cer_modulus == self._key_modulus
        if not self._are_couple:
            msg = 'El CER y el KEY no son pareja'
            self._error = msg
        self._key = obj
        return

    def get_key_enc(self, words: tuple):
        """Return the loaded key as PKCS8 PEM encrypted with ``get_hash(words)``.

        Requires a key loaded by ``validate()``; otherwise ``self._key`` is
        ``None`` and the call fails with ``AttributeError``.
        """
        pw = get_hash(words)
        key_enc = self._key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.BestAvailableEncryption(pw),
        )
        return key_enc

    def get_key_pem(self, key_enc: bytes, words: tuple):
        """Decrypt ``key_enc`` with ``get_hash(words)``; return unencrypted PEM."""
        pw = get_hash(words)
        key = serialization.load_pem_private_key(key_enc, pw, default_backend())
        key_pem = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
        return key_pem

    def validate(self, cer: bytes, key: bytes, password: str) -> bool:
        """Validate a CER/KEY pair and return ``True`` when no error was found.

        State left by a previous validation is cleared first, so an instance
        can be reused without carrying over an old error.
        """
        self._error = ''
        self._init_values()
        self.get_data_cer(cer)
        self._get_data_key(key, password)
        return self.is_valid
|
|
|
|
|
|
class CertSign:
    """Sign data with a PEM private key, optionally password protected."""

    # Supported hash names mapped to their hash algorithm classes.
    HS = {
        'sha1': hashes.SHA1,
        'sha256': hashes.SHA256,
    }

    def __init__(self, key_file: bytes, words: tuple = ()):
        """Load the PEM private key.

        When ``words`` is not empty, ``get_hash(words)`` is used as the
        password of the key; otherwise the key is loaded without password.
        """
        pw = get_hash(words) if words else None
        self._key = serialization.load_pem_private_key(
            key_file, pw, default_backend())

    def get_data_cer(self, cer: bytes):
        """Parse a DER certificate and return the populated ``CertValidate``.

        ``CertValidate.get_data_cer`` stores its results on the instance and
        returns ``None``, so the instance itself is returned here; returning
        the method's result would always hand ``None`` back to the caller.
        """
        cert = CertValidate()
        cert.get_data_cer(cer)
        return cert

    def sign(self, data, type_hash: str = 'sha256'):
        """Sign ``data`` with PKCS1v15 padding; return the base64 signature.

        ``data`` may be ``str`` (encoded with ``str.encode()``) or bytes.
        ``type_hash`` must be a key of ``HS``; otherwise ``KeyError`` is raised.
        """
        if isinstance(data, str):
            data = data.encode()
        algorithm = self.HS[type_hash]()
        firma = self._key.sign(data, padding.PKCS1v15(), algorithm)
        return base64.b64encode(firma).decode()
|
|
|
|
|
|
# ~ def sign_xml(tree):
|
|
# ~ import xmlsec
|
|
|
|
# ~ node = xmlsec.tree.find_node(tree, xmlsec.constants.NodeSignature)
|
|
# ~ ctx = xmlsec.SignatureContext()
|
|
# ~ key = xmlsec.Key.from_memory(self.key_pem, xmlsec.constants.KeyDataFormatPem)
|
|
# ~ ctx.key = key
|
|
# ~ ctx.sign(node)
|
|
# ~ node = xmlsec.tree.find_node(tree, 'X509Certificate')
|
|
# ~ node.text = self.cer_txt
|
|
# ~ return tree |