diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..f19ac48 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,4 @@ +v 1.0.0 [03-Abr-2024] +--------------------- + - Refactorizada + diff --git a/README.md b/README.md index c6c8ea1..64fc362 100644 --- a/README.md +++ b/README.md @@ -4,3 +4,4 @@ Maneja los certificados de sello y FIEL fácilmente ## Software libre, NO gratis. + diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..3eefcb9 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +1.0.0 diff --git a/cert/finkok.cer b/cert/finkok.cer deleted file mode 100644 index 471d739..0000000 Binary files a/cert/finkok.cer and /dev/null differ diff --git a/requirements.txt b/requirements.txt index c909f0e..7ea8cc5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1 @@ -xmlsec -cryptography==3.4.8 +cryptography==42.0.5 \ No newline at end of file diff --git a/source/__init__.py b/source/__init__.py index 516f2c1..0365c7e 100644 --- a/source/__init__.py +++ b/source/__init__.py @@ -1,3 +1,3 @@ #!/usr/bin/env python3 -from .cfdi-cert import SATCertificate +from .cfdi_cert import CertValidate, CertSign diff --git a/source/cfdi_cert.py b/source/cfdi_cert.py index ec1937f..2dc1f58 100644 --- a/source/cfdi_cert.py +++ b/source/cfdi_cert.py @@ -1,10 +1,24 @@ #!/usr/bin/env python3 -import argparse +# ~ cfdi_cert - Para trabajar con certificados del SAT fácilmente +# ~ Copyright (C) 2020-2024 Mauricio Servin (elmau) + +# ~ This program is free software: you can redistribute it and/or modify it +# ~ under the terms of the GNU General Public License as published by the +# ~ Free Software Foundation, either version 3 of the License, or (at your +# ~ option) any later version. + +# ~ This program is distributed in the hope that it will be useful, but +# ~ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# ~ or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# ~ for more details. + +# ~ You should have received a copy of the GNU General Public License along +# ~ with this program. If not, see . + + import base64 -import datetime -import getpass -from pathlib import Path +from datetime import datetime, timezone from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization @@ -14,21 +28,23 @@ from cryptography.x509.oid import ExtensionOID from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import padding -from conf import TOKEN + +def get_hash(words: tuple): + digest = hashes.Hash(hashes.SHA512(), default_backend()) + for word in words: + digest.update(word.encode()) + return digest.finalize() -class SATCertificate(object): +class CertValidate(object): - def __init__(self, cer=b'', key=b'', pem=b'', password=''): + def __init__(self): self._error = '' self._init_values() - self._get_data_cer(cer) - self._get_data_key(key, password) - self._get_data_pem(pem) def _init_values(self): self._rfc = '' - self._serial_number = '' + self._serial_number1 = '' self._serial_number2 = '' self._not_before = None self._not_after = None @@ -37,159 +53,40 @@ class SATCertificate(object): self._is_valid_time = False self._cer_pem = '' self._cer_txt = '' + self._key = None self._key_enc = b'' self._key_pem = b'' - self._p12 = b'' - self._cer_modulus = 0 - self._key_modulus = 0 + self._modulus_cer = 0 + self._modulus_key = 0 self._issuer = '' return def __str__(self): - msg = '\tRFC: {}\n'.format(self.rfc) - msg += '\tNo de Serie: {}\n'.format(self.serial_number) - msg += '\tVálido desde: {}\n'.format(self.not_before) - msg += '\tVálido hasta: {}\n'.format(self.not_after) - msg += '\tEs vigente: {}\n'.format(self.is_valid_time) - msg += '\tSon pareja: {}\n'.format(self.are_couple) - msg += '\tEs FIEL: {}\n'.format(self.is_fiel) + msg = f'''\tRFC: {self.rfc} + No de Serie: {self.serial_number1} + No de Serie: {self.serial_number2} + Válido desde: {self.not_before} + Válido hasta: {self.not_after} + Es vigente: {self.is_valid_time} + Son pareja: {self.are_couple} + Es FIEL: {self.is_fiel} + ''' return msg def __bool__(self): return self.is_valid - def _get_hash(self): - digest = hashes.Hash(hashes.SHA512(), default_backend()) - digest.update(self._rfc.encode()) - digest.update(self._serial_number.encode()) - digest.update(TOKEN.encode()) - return digest.finalize() - - def _get_data_cer(self, cer): - obj = x509.load_der_x509_certificate(cer, default_backend()) - self._rfc = obj.subject.get_attributes_for_oid( - NameOID.X500_UNIQUE_IDENTIFIER)[0].value.split(' ')[0] - self._serial_number2 = obj.serial_number - self._serial_number = '{0:x}'.format(obj.serial_number)[1::2] - self._not_before = obj.not_valid_before - self._not_after = obj.not_valid_after - self._issuer = ','.join([i.rfc4514_string() for i in obj.issuer]) - - now = datetime.datetime.utcnow() - self._is_valid_time = (now > self.not_before) and (now < self.not_after) - if not self._is_valid_time: - msg = 'El certificado no es vigente' - self._error = msg - - self._is_fiel = obj.extensions.get_extension_for_oid( - ExtensionOID.KEY_USAGE).value.key_agreement - - self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode() - self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2]) - self._cer_modulus = obj.public_key().public_numbers().n - - return - - def _get_data_key(self, key, password): - self._key_enc = key - if not key or not password: - return - - try: - obj = serialization.load_der_private_key( - key, password.encode(), default_backend()) - except ValueError: - msg = 'La contraseña es incorrecta' - self._error = msg - return - - p = self._get_hash() - self._key_enc = obj.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.BestAvailableEncryption(p) - ) - - self._key_modulus = obj.public_key().public_numbers().n - self._are_couple = self._cer_modulus == self._key_modulus - if not self._are_couple: - msg = 'El CER y el KEY no son pareja' - self._error = msg - return - - def _get_key(self, password): - if not password: - password = self._get_hash() - private_key = serialization.load_pem_private_key( - self._key_enc, password=password, backend=default_backend()) - return private_key - - def _get_key_pem(self): - obj = self._get_key('') - key_pem = obj.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() - ) - return key_pem - - def _get_data_pem(self, pem): - if not pem: - return - - self._key_pem = serialization.load_pem_private_key( - pem, None, backend=default_backend()) - return - - # Not work - def _get_p12(self): - obj = serialization.pkcs12.serialize_key_and_certificates('test', - self.key_pem, self.cer_pem, None, - encryption_algorithm=serialization.NoEncryption() - ) - return obj - - def sign(self, data, password=''): - private_key = self._get_key(password) - firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA256()) - return base64.b64encode(firma).decode() - - def _sign_with_pem(self, data, name_hash): - if name_hash == 'sha256': - type_hash = hashes.SHA256() - elif name_hash == 'sha1': - type_hash = hashes.SHA1() - - firma = self._key_pem.sign(data, padding.PKCS1v15(), type_hash) - return base64.b64encode(firma).decode() - - def sign_sha1(self, data, password=''): - if self._key_pem: - return self._sign_with_pem(data, 'sha1') - - private_key = self._get_key(password) - firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA1()) - return base64.b64encode(firma).decode() - - def sign_xml(self, tree): - import xmlsec - - node = xmlsec.tree.find_node(tree, xmlsec.constants.NodeSignature) - ctx = xmlsec.SignatureContext() - key = xmlsec.Key.from_memory(self.key_pem, xmlsec.constants.KeyDataFormatPem) - ctx.key = key - ctx.sign(node) - node = xmlsec.tree.find_node(tree, 'X509Certificate') - node.text = self.cer_txt - return tree + @property + def error(self): + return self._error @property def rfc(self): return self._rfc @property - def serial_number(self): - return self._serial_number + def serial_number1(self): + return self._serial_number1 @property def serial_number2(self): @@ -231,18 +128,104 @@ class SATCertificate(object): def key_pem(self): return self._get_key_pem() - @property - def key_enc(self): - return self._key_enc - @property def issuer(self): return self._issuer - @property - def p12(self): - return self._get_p12() + def _get_data_cer(self, cer): + obj = x509.load_der_x509_certificate(cer, default_backend()) + self._rfc = obj.subject.get_attributes_for_oid( + NameOID.X500_UNIQUE_IDENTIFIER)[0].value.split(' ')[0] + self._serial_number1 = obj.serial_number + self._serial_number2 = f'{obj.serial_number:x}'[1::2] + self._not_before = obj.not_valid_before_utc + self._not_after = obj.not_valid_after_utc + self._issuer = ','.join([i.rfc4514_string() for i in obj.issuer]) - @property - def error(self): - return self._error + now = datetime.now(timezone.utc) + self._is_valid_time = (now > self.not_before) and (now < self.not_after) + if not self._is_valid_time: + msg = 'El certificado no es vigente' + self._error = msg + + self._is_fiel = obj.extensions.get_extension_for_oid( + ExtensionOID.KEY_USAGE).value.key_agreement + + self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode() + self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2]) + self._cer_modulus = obj.public_key().public_numbers().n + return + + def _get_data_key(self, key, password): + try: + obj = serialization.load_der_private_key( + key, password.encode(), default_backend()) + except ValueError: + msg = 'La contraseña es incorrecta' + self._error = msg + return + + self._key_modulus = obj.public_key().public_numbers().n + self._are_couple = self._cer_modulus == self._key_modulus + if not self._are_couple: + msg = 'El CER y el KEY no son pareja' + self._error = msg + self._key = obj + return + + def get_key_enc(self, words: tuple): + pw = get_hash(words) + key_enc = self._key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.BestAvailableEncryption(pw) + ) + return key_enc + + def get_key_pem(self, key_enc: bytes, words: tuple): + pw = get_hash(words) + key = serialization.load_pem_private_key(key_enc, pw, default_backend()) + key_pem = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ) + return key_pem + + def validate(self, cer: bytes, key: bytes, password: str) -> bool: + self._get_data_cer(cer) + self._get_data_key(key, password) + return self.is_valid + + +class CertSign(object): + HS = { + 'sha1': hashes.SHA1, + 'sha256': hashes.SHA256, + } + + def __init__(self, key_file: bytes, words: tuple=()): + pw = None + if words: + pw = get_hash(words) + self._key = serialization.load_pem_private_key(key_file, pw, default_backend()) + + def sign(self, data, type_hash: str='sha256'): + if isinstance(data, str): + data = data.encode() + firma = self._key.sign(data, padding.PKCS1v15(), self.HS[type_hash]()) + firma = base64.b64encode(firma).decode() + return firma + + +# ~ def sign_xml(tree): + # ~ import xmlsec + + # ~ node = xmlsec.tree.find_node(tree, xmlsec.constants.NodeSignature) + # ~ ctx = xmlsec.SignatureContext() + # ~ key = xmlsec.Key.from_memory(self.key_pem, xmlsec.constants.KeyDataFormatPem) + # ~ ctx.key = key + # ~ ctx.sign(node) + # ~ node = xmlsec.tree.find_node(tree, 'X509Certificate') + # ~ node.text = self.cer_txt + # ~ return tree \ No newline at end of file diff --git a/source/conf.py.example b/source/conf.py.example deleted file mode 100644 index 4e350fb..0000000 --- a/source/conf.py.example +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env python3 - -# ~ Establece un token personalizado para encriptar las claves -# ~ from secrets import token_hex -# ~ token_hex(32) - -TOKEN = '' diff --git a/source/tests/__init_.py b/source/tests/__init_.py new file mode 100644 index 0000000..cf529d7 --- /dev/null +++ b/source/tests/__init_.py @@ -0,0 +1,2 @@ +#!/usr/bin/env python + diff --git a/cert/comercio.cer b/source/tests/certs/nopareja.cer similarity index 100% rename from cert/comercio.cer rename to source/tests/certs/nopareja.cer diff --git a/cert/finkok.key b/source/tests/certs/nopareja.key similarity index 100% rename from cert/finkok.key rename to source/tests/certs/nopareja.key diff --git a/source/tests/certs/novigente.cer b/source/tests/certs/novigente.cer new file mode 100644 index 0000000..ebcba80 Binary files /dev/null and b/source/tests/certs/novigente.cer differ diff --git a/cert/comercio.key b/source/tests/certs/novigente.key similarity index 100% rename from cert/comercio.key rename to source/tests/certs/novigente.key diff --git a/source/tests/certs/vigente.cer b/source/tests/certs/vigente.cer new file mode 100644 index 0000000..e0963d1 Binary files /dev/null and b/source/tests/certs/vigente.cer differ diff --git a/source/tests/certs/vigente.key b/source/tests/certs/vigente.key new file mode 100644 index 0000000..4f2d75d Binary files /dev/null and b/source/tests/certs/vigente.key differ diff --git a/source/tests/test.py b/source/tests/test.py new file mode 100644 index 0000000..a50b15b --- /dev/null +++ b/source/tests/test.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +# coding: utf-8 + +import sys +from pathlib import Path +p = str(Path(__file__).resolve().parent.parent) +sys.path.insert(0, p) + +import unittest +from test_config import * +from cfdi_cert import CertValidate, CertSign + + +# ~ @unittest.SkipTest + + +class BaseTest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + pass + + @classmethod + def tearDownClass(cls): + pass + + def setUp(self): + msg = f'In method: {self._testMethodName}' + print(msg) + + def tearDown(self): + pass + + +class TestCertValidate(BaseTest): + + def test_validate_not_vigente(self): + path_cer = path_certs / 'novigente.cer' + path_key = path_certs / 'novigente.key' + cer = path_cer.read_bytes() + key = path_key.read_bytes() + + cert = CertValidate() + result = cert.validate(cer, key, PASSWORD) + self.assertFalse(result) + + expected = 'El certificado no es vigente' + result = cert.error + self.assertEqual(expected, result) + return + + def test_validate_not_couple(self): + path_cer = path_certs / 'nopareja.cer' + path_key = path_certs / 'nopareja.key' + cer = path_cer.read_bytes() + key = path_key.read_bytes() + + cert = CertValidate() + result = cert.validate(cer, key, PASSWORD) + self.assertFalse(result) + + expected = 'El CER y el KEY no son pareja' + result = cert.error + self.assertEqual(expected, result) + return + + def test_validate_wrong_password(self): + cert = CertValidate() + cer = path_cer.read_bytes() + key = path_key.read_bytes() + + result = cert.validate(cer, key, 'letmein') + self.assertFalse(result) + + expected = 'La contraseña es incorrecta' + result = cert.error + self.assertEqual(expected, result) + return + + def test_get_key_enc_pem(self): + cert = CertValidate() + cer = path_cer.read_bytes() + key = path_key.read_bytes() + + result = cert.validate(cer, key, PASSWORD) + self.assertTrue(result) + + words = (cert.rfc, str(cert.serial_number1), MY_TOKEN) + result = cert.get_key_enc(words) + self.assertIsNotNone(result) + + result = cert.get_key_pem(result, words) + self.assertIsNotNone(result) + + return + + def test_validate_cert(self): + cert = CertValidate() + cer = path_cer.read_bytes() + key = path_key.read_bytes() + + result = cert.validate(cer, key, PASSWORD) + # ~ print(cert) + + self.assertTrue(result) + return + + +class TestCertSign(BaseTest): + + def test_sign(self): + cert = CertValidate() + cer = path_cer.read_bytes() + key = path_key.read_bytes() + + result = cert.validate(cer, key, PASSWORD) + self.assertTrue(result) + + words = (cert.rfc, str(cert.serial_number1), MY_TOKEN) + key_enc = cert.get_key_enc(words) + + expected = 'ZSOD/SNUP0YmogV7h94ysXxERPSy8M+EBfWK4oKWkGRIMqSM1DEGLVi0IE0YNDoZTnBSWULsozCxOwt5rJdGE1tr2OTaXaHMubvC88vhqiv62mOeU/vGCv2yPbKcbjOpKDSQ/pEGlgUd69mESwekjpPI0c0NUWlnkO81eHr+Z8v7hTnJxoopvDiMAkg82snPDIFoIBEePcB/VL8oABRLKh9/2UHFMeS0YKQJWApPEaXD1ycxUbBqXgbi2OwQgM4vWMNX0qsiHyuEI82/zUZ8WLj+GHG6m+P/VKs9nYfEurXh68wZZqT1nzUNHudQxGVFdrwgj+uh7kl3O0Swoi160w==' + + cert = CertSign(key_enc, words) + data = 'Ingrid Bergman' + result = cert.sign(data) + + self.assertEqual(expected, result) + return + + def test_sign_with_pem(self): + cert = CertValidate() + cer = path_cer.read_bytes() + key = path_key.read_bytes() + + result = cert.validate(cer, key, PASSWORD) + self.assertTrue(result) + + words = (cert.rfc, str(cert.serial_number1), MY_TOKEN) + key_enc = cert.get_key_enc(words) + key_pem = cert.get_key_pem(key_enc, words) + + expected = 'ZSOD/SNUP0YmogV7h94ysXxERPSy8M+EBfWK4oKWkGRIMqSM1DEGLVi0IE0YNDoZTnBSWULsozCxOwt5rJdGE1tr2OTaXaHMubvC88vhqiv62mOeU/vGCv2yPbKcbjOpKDSQ/pEGlgUd69mESwekjpPI0c0NUWlnkO81eHr+Z8v7hTnJxoopvDiMAkg82snPDIFoIBEePcB/VL8oABRLKh9/2UHFMeS0YKQJWApPEaXD1ycxUbBqXgbi2OwQgM4vWMNX0qsiHyuEI82/zUZ8WLj+GHG6m+P/VKs9nYfEurXh68wZZqT1nzUNHudQxGVFdrwgj+uh7kl3O0Swoi160w==' + + cert = CertSign(key_pem) + data = 'Ingrid Bergman' + result = cert.sign(data) + + self.assertEqual(expected, result) + return + + +if __name__ == '__main__': + unittest.main(exit=False) diff --git a/source/tests/test_config.py b/source/tests/test_config.py new file mode 100644 index 0000000..a280cdd --- /dev/null +++ b/source/tests/test_config.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 + +from pathlib import Path + + +__all__ = [ + 'MY_TOKEN', + 'PASSWORD', + 'path_certs', + 'path_cer', + 'path_key', + ] + + +FOLDER = 'certs' +MY_TOKEN = 'PutoSat' +NAME = 'vigente' +PASSWORD = '12345678a' + + +path_current = Path(__file__).resolve().parent +path_certs = path_current / FOLDER +path_cer = path_certs / f'{NAME}.cer' +path_key = path_certs / f'{NAME}.key'