diff --git a/.gitignore b/.gitignore index f8b73e7..87f0ff9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,7 @@ # ---> Python + +conf.py + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/source/cfdi-descarga.py b/source/cfdi-descarga.py new file mode 100755 index 0000000..1147ef2 --- /dev/null +++ b/source/cfdi-descarga.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 + +import argparse +from sat import util + + +def main(args): + if args.fiel_validar: + util.fiel_validar(args) + return + + util.sat_download(args) + + return + + +def _process_command_line_arguments(): + now = util.today() + year = now.year + month = now.month + + parser = argparse.ArgumentParser(description='CFDI Descarga SAT') + + help = 'Valida la FIEL' + parser.add_argument('-fv', '--fiel-validar', help=help, + action='store_true', default=False, required=False) + help = 'Ruta al directorio con la FIEL' + parser.add_argument('-fd', '--fiel-dir', help=help, default='') + help = "Nombre de los archivos FIEL, el predeterminado es 'fiel'" + parser.add_argument('-fn', '--fiel-nombre', help=help, default='fiel') + + help = "Descargar por Tipo: t=todos(default), e=emitidas, r=recibidas" + parser.add_argument('-t', '--tipo', help=help, dest='type', default='t', choices=['t', 'e', 'r']) + help = "Año de la descarga entre 2014 y el año actual (predeterminado)." + parser.add_argument('-a', '--año', help=help, dest='year', default=year, type=int, choices=range(2014, year+1)) + help = "Mes de la descarga, el mes actual es el predeterminado" + parser.add_argument('-m', '--mes', help=help, dest='month', default=month, type=int, choices=range(13)) + help = "Día de la descarga, de forma predeterminada no se usa" + parser.add_argument('-d', '--dia', help=help, dest='day', default=0, type=int, choices=range(32)) + + args = parser.parse_args() + return args + + +if __name__ == '__main__': + args = _process_command_line_arguments() + main(args) diff --git a/source/sat/cfdi_cert.py b/source/sat/cfdi_cert.py new file mode 100644 index 0000000..725029c --- /dev/null +++ b/source/sat/cfdi_cert.py @@ -0,0 +1,261 @@ +#!/usr/bin/env python3 + +import argparse +import base64 +import datetime +import getpass +from pathlib import Path + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +from cryptography import x509 +from cryptography.x509.oid import NameOID +from cryptography.x509.oid import ExtensionOID +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import padding + +from conf import TOKEN + + +class SATCertificate(object): + + def __init__(self, cer=b'', key=b'', password=''): + self._error = '' + self._init_values() + self._get_data_cer(cer) + self._get_data_key(key, password) + + def _init_values(self): + self._rfc = '' + self._serial_number = '' + self._not_before = None + self._not_after = None + self._is_fiel = False + self._are_couple = False + self._is_valid_time = False + self._cer_pem = '' + self._cer_txt = '' + self._key_enc = b'' + self._p12 = b'' + self._cer_modulus = 0 + self._key_modulus = 0 + return + + def __str__(self): + msg = '\tRFC: {}\n'.format(self.rfc) + msg += '\tNo de Serie: {}\n'.format(self.serial_number) + msg += '\tVálido desde: {}\n'.format(self.not_before) + msg += '\tVálido hasta: {}\n'.format(self.not_after) + msg += '\tEs vigente: {}\n'.format(self.is_valid_time) + msg += '\tSon pareja: {}\n'.format(self.are_couple) + msg += '\tEs FIEL: {}\n'.format(self.is_fiel) + return msg + + def __bool__(self): + return self.is_valid + + def _get_hash(self): + digest = hashes.Hash(hashes.SHA512(), default_backend()) + digest.update(self._rfc.encode()) + digest.update(self._serial_number.encode()) + digest.update(TOKEN.encode()) + return digest.finalize() + + def _get_data_cer(self, cer): + obj = x509.load_der_x509_certificate(cer, default_backend()) + self._rfc = obj.subject.get_attributes_for_oid( + NameOID.X500_UNIQUE_IDENTIFIER)[0].value.split(' ')[0] + self._serial_number = '{0:x}'.format(obj.serial_number)[1::2] + self._not_before = obj.not_valid_before + self._not_after = obj.not_valid_after + now = datetime.datetime.utcnow() + self._is_valid_time = (now > self.not_before) and (now < self.not_after) + if not self._is_valid_time: + msg = 'El certificado no es vigente' + self._error = msg + + self._is_fiel = obj.extensions.get_extension_for_oid( + ExtensionOID.KEY_USAGE).value.key_agreement + + self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode() + self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2]) + self._cer_modulus = obj.public_key().public_numbers().n + return + + def _get_data_key(self, key, password): + self._key_enc = key + if not key or not password: + return + + try: + obj = serialization.load_der_private_key( + key, password.encode(), default_backend()) + except ValueError: + msg = 'La contraseña es incorrecta' + self._error = msg + return + + p = self._get_hash() + self._key_enc = obj.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.BestAvailableEncryption(p) + ) + + self._key_modulus = obj.public_key().public_numbers().n + self._are_couple = self._cer_modulus == self._key_modulus + if not self._are_couple: + msg = 'El CER y el KEY no son pareja' + self._error = msg + return + + def _get_key(self, password): + if not password: + password = self._get_hash() + private_key = serialization.load_pem_private_key( + self._key_enc, password=password, backend=default_backend()) + return private_key + + def _get_key_pem(self): + obj = self._get_key('') + key_pem = obj.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ) + return key_pem + + # Not work + def _get_p12(self): + obj = serialization.pkcs12.serialize_key_and_certificates('test', + self.key_pem, self.cer_pem, None, + encryption_algorithm=serialization.NoEncryption() + ) + return obj + + def sign(self, data, password=''): + private_key = self._get_key(password) + firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA256()) + return base64.b64encode(firma).decode() + + def sign_sha1(self, data, password=''): + private_key = self._get_key(password) + firma = private_key.sign(data, padding.PKCS1v15(), hashes.SHA1()) + return base64.b64encode(firma).decode() + + def sign_xml(self, tree): + import xmlsec + + node = xmlsec.tree.find_node(tree, xmlsec.constants.NodeSignature) + ctx = xmlsec.SignatureContext() + key = xmlsec.Key.from_memory(self.key_pem, xmlsec.constants.KeyDataFormatPem) + ctx.key = key + ctx.sign(node) + node = xmlsec.tree.find_node(tree, 'X509Certificate') + node.text = self.cer_txt + return tree + + @property + def rfc(self): + return self._rfc + + @property + def serial_number(self): + return self._serial_number + + @property + def not_before(self): + return self._not_before + + @property + def not_after(self): + return self._not_after + + @property + def is_fiel(self): + return self._is_fiel + + @property + def are_couple(self): + return self._are_couple + + @property + def is_valid(self): + return not bool(self.error) + + @property + def is_valid_time(self): + return self._is_valid_time + + @property + def cer_pem(self): + return self._cer_pem.encode() + + @property + def cer_txt(self): + return self._cer_txt + + @property + def key_pem(self): + return self._get_key_pem() + + @property + def key_enc(self): + return self._key_enc + + @property + def p12(self): + return self._get_p12() + + @property + def error(self): + return self._error + + +def main(args): + contra = getpass.getpass('Introduce la contraseña del archivo KEY: ') + #contra = '12345678a' + if not contra.strip(): + msg = 'La contraseña es requerida' + print(msg) + return + + path_cer = Path(args.cer) + path_key = Path(args.key) + + if not path_cer.is_file(): + msg = 'El archivo CER es necesario' + print(msg) + return + + if not path_key.is_file(): + msg = 'El archivo KEY es necesario' + print(msg) + return + + cer = path_cer.read_bytes() + key = path_key.read_bytes() + cert = SATCertificate(cer, key, contra) + + if cert.error: + print(cert.error) + else: + print(cert) + return + + +def _process_command_line_arguments(): + parser = argparse.ArgumentParser(description='CFDI Certificados') + + help = 'Archivo CER' + parser.add_argument('-c', '--cer', help=help, default='') + help = 'Archivo KEY' + parser.add_argument('-k', '--key', help=help, default='') + + args = parser.parse_args() + return args + + +if __name__ == '__main__': + args = _process_command_line_arguments() + main(args) diff --git a/source/sat/sat_web.py b/source/sat/sat_web.py new file mode 100644 index 0000000..241f216 --- /dev/null +++ b/source/sat/sat_web.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 + +import base64 +import hashlib +import uuid +from datetime import datetime, timedelta + +import httpx +import lxml.etree as ET + + +class SATWebService(): + BASE = 'https://cfdidescargamasivasolicitud.clouda.sat.gob.mx' + URL = { + 'AUTH': f'{BASE}/Autenticacion/Autenticacion.svc', + 'REQ': f'{BASE}/SolicitaDescargaService.svc', + } + XMLNS = 'http://DescargaMasivaTerceros.gob.mx' + ACTIONS = { + 'AUTH': f'{XMLNS}/IAutenticacion/Autentica', + 'REQ': f'{XMLNS}/ISolicitaDescargaService/SolicitaDescarga', + } + HEADERS = { + 'Content-type': 'text/xml;charset="utf-8"', + 'Accept': 'text/xml', + 'Cache-Control': 'no-cache', + } + NS = { + 's': 'http://schemas.xmlsoap.org/soap/envelope/', + 'u': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd', + 'o': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd' + 'des': 'http://DescargaMasivaTerceros.sat.gob.mx', + 'xd': 'http://www.w3.org/2000/09/xmldsig#', + } + + def __init__(self, cert): + self._cert = cert + self._error = '' + self._token = self._get_token() + + @property + def is_authenticate(self): + return bool(self._token) + + @property + def error(self): + return self._error + + def _get_data_auth(self): + NSMAP = {'s': self.NS['s'], 'u': self.NS['u']} + FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' + UID = str(uuid.uuid4()) + + now = datetime.utcnow() + date_created = now.strftime(FORMAT) + date_expires = (now + timedelta(seconds=300)).strftime(FORMAT) + + node_name = f"{{{self.NS['s']}}}Envelope" + root = ET.Element(node_name, nsmap=NSMAP) + + node_name = f"{{{self.NS['s']}}}Header" + header = ET.SubElement(root, node_name) + + node_name = f"{{{self.NS['o']}}}Security" + nsmap = {'o': self.NS['o']} + attr_name = f"{{{self.NS['s']}}}mustUnderstand" + attr = {attr_name: '1'} + security = ET.SubElement(header, node_name, attr, nsmap=nsmap) + + node_name = f"{{{self.NS['u']}}}Timestamp" + attr_name = f"{{{self.NS['u']}}}Id" + attr = {attr_name: '_0'} + timestamp = ET.SubElement(security, node_name, attr) + node_name = f"{{{self.NS['u']}}}Created" + ET.SubElement(timestamp, node_name).text = date_created + node_name = f"{{{self.NS['u']}}}Expires" + ET.SubElement(timestamp, node_name).text = date_expires + + node_name = f"{{{self.NS['o']}}}BinarySecurityToken" + attr = { + f"{{{self.NS['u']}}}Id": UID, + 'ValueType': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3', + 'EncodingType': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary' + } + ET.SubElement(security, node_name, attr).text = self._cert.cer_txt + + nsmap = {None: 'http://www.w3.org/2000/09/xmldsig#'} + signature = ET.SubElement(security, 'Signature', nsmap=nsmap) + signedinfo = ET.SubElement(signature, 'SignedInfo') + attr1 = {'Algorithm': 'http://www.w3.org/2001/10/xml-exc-c14n#'} + ET.SubElement(signedinfo, 'CanonicalizationMethod', attr1) + attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'} + ET.SubElement(signedinfo, 'SignatureMethod', attr) + + attr = {'URI': '#_0'} + reference = ET.SubElement(signedinfo, 'Reference', attr) + transforms = ET.SubElement(reference, 'Transforms') + ET.SubElement(transforms, 'Transform', attr1) + attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#sha1'} + ET.SubElement(reference, 'DigestMethod', attr) + + dvalue = ET.tostring(timestamp, method='c14n', exclusive=1) + dvalue = base64.b64encode(hashlib.new('sha1', dvalue).digest()) + ET.SubElement(reference, 'DigestValue').text = dvalue + + signature_value = ET.tostring(signedinfo, method='c14n', exclusive=1) + signature_value = self._cert.sign_sha1(signature_value) + ET.SubElement(signature, 'SignatureValue').text = signature_value + keyinfo = ET.SubElement(signature, 'KeyInfo') + + node_name = f"{{{self.NS['o']}}}SecurityTokenReference" + security_token = ET.SubElement(keyinfo, node_name) + + node_name = f"{{{self.NS['o']}}}Reference" + attr = { + 'ValueType': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3', + 'URI': f'#{UID}', + } + ET.SubElement(security_token, node_name, attr) + + node_name = f"{{{self.NS['s']}}}Body" + body = ET.SubElement(root, node_name) + nsmap = {None: self.XMLNS} + ET.SubElement(body, 'Autentica', nsmap=nsmap) + + # ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8') + soap = ET.tostring(root) + + return soap + + def _get_token(self): + headers = self.HEADERS.copy() + headers['SOAPAction'] = self.ACTIONS['AUTH'] + data = self._get_data_auth() + + response = httpx.post(self.URL['AUTH'], data=data, headers=headers) + if response.status_code != httpx.codes.OK: + self._error = f'Status: {response.status_code} - {response.text}' + return + + result = ET.fromstring(response.text) + nsmap = {'s': self.NS['s'], None: self.XMLNS} + node_name = 's:Body/AutenticaResponse/AutenticaResult' + token = result.find(node_name, namespaces=nsmap).text + + return token + + def _get_data_req(self, args): + + return + + def _get_request(self, args): + headers = self.HEADERS.copy() + headers['SOAPAction'] = self.ACTIONS['REQ'] + headers['Authorization'] = f'WRAP access_token="{self._token}"' + data = self._get_data_req(args) + + response = httpx.post(self.URL['REQ'], data=data, headers=headers) + if response.status_code != httpx.codes.OK: + self._error = f'Status: {response.status_code} - {response.text}' + return + + print(response.text) + result = ET.fromstring(response.text) + + return + + def download(self, args): + request = self._get_request(args) + print(request) + return diff --git a/source/sat/util.py b/source/sat/util.py new file mode 100644 index 0000000..bbd984d --- /dev/null +++ b/source/sat/util.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 + +import getpass +import uuid +from datetime import datetime +from pathlib import Path + +from .cfdi_cert import SATCertificate +from .sat_web import SATWebService +from settings import log + + +def today(): + return datetime.today() + + +def validate_date(year, month, day): + try: + datetime(year, month, day, 0, 0, 0) + result = True + except ValueError: + result = False + return result + + +def is_dir(path): + return Path(path).is_dir() + + +def join(*paths): + return Path(paths[0]).joinpath(*paths[1:]) + + +def _validate_fiel_args(args): + fiel_path = args.fiel_dir + fiel_name = args.fiel_nombre + + if not fiel_path: + msg = 'El directorio con la FIEL es requerido' + log.error(msg) + return False, {} + + if not is_dir(fiel_path): + msg = f'La ruta no existe o no es un directorio. \nRuta: {fiel_path}' + log.error(msg) + return False, {} + + path_cer = join(fiel_path, f'{fiel_name}.cer') + path_key = join(fiel_path, f'{fiel_name}.key') + path_enc = join(fiel_path, f'{fiel_name}.enc') + + if not path_cer.is_file(): + msg = f'No se encontró el archivo CER. \nRuta: {path_cer}' + log.error(msg) + return False, {} + + if not path_key.is_file(): + msg = f'No se encontró el archivo KEY. \nRuta: {path_cer}' + log.error(msg) + return False, {} + + data = { + 'path_cer': path_cer, + 'path_key': path_key, + 'path_enc': path_enc, + } + return True, data + + +def fiel_validar(args): + result, data = _validate_fiel_args(args) + if not result: + return + + password = getpass.getpass('Introduce la contraseña del archivo KEY: ') + if not password: + msg = 'La contraseña es requerida para validar la FIEL' + log.error(msg) + return + + cer = data['path_cer'].read_bytes() + key = data['path_key'].read_bytes() + cert = SATCertificate(cer, key, password) + + if cert.error: + msg = f'{cert.error}\n\nNo podrás conectarte el SAT.' + log.error(msg) + return + + if not cert.is_fiel: + msg = 'El certificado no es FIEL' + log.error(msg) + return + + data['path_enc'].write_bytes(cert.key_enc) + + msg = 'Los datos del certificado son:' + log.info(msg) + log.info(f'\n{cert}') + msg = 'Ya puedes descargar del SAT' + log.info(msg) + return + + +def base_datos(): + db.create_tables() + return + + +def _validate_download_args(args): + result, data = _validate_fiel_args(args) + if not result: + return False, {} + + if not data['path_enc'].is_file(): + msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}" + log.error(msg) + return False, {} + + cer = data['path_cer'].read_bytes() + key = data['path_enc'].read_bytes() + cert = SATCertificate(cer, key) + + if not cert.is_valid_time: + msg = 'La FIEL no es vigente' + log.error(msg) + return False, {} + + data['cert'] = cert + data['type'] = args.type + data['year'] = args.year + data['month'] = args.month + data['day'] = args.day + + if data['day']: + if not validate_date(data['year'], data['month'], data['day']): + msg = 'Fecha inválida' + log.error(msg) + return False, {} + + return True, data + + +def sat_download(args): + result, data = _validate_download_args(args) + if not result: + return + + sat = SATWebService(data['cert']) + + if sat.is_authenticate: + sat.download(data) + else: + log.error(sat.error) + + return + diff --git a/source/settings.py b/source/settings.py new file mode 100644 index 0000000..7b2d006 --- /dev/null +++ b/source/settings.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 + +import logging +from conf import DEBUG + + +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +LOG_DATE = '%d/%m/%Y %H:%M:%S' +logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m') +logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m') +logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m') +logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE) +log = logging.getLogger(__name__) +