From 48155488c3a3c35579d175baa413cd1339f62fdb Mon Sep 17 00:00:00 2001 From: Mauricio Baeza Date: Wed, 14 Jul 2021 12:14:58 -0500 Subject: [PATCH] Solicitar descarga --- requirements.txt | 1 + source/cfdi-descarga.py | 3 +- source/sat/cfdi_cert.py | 57 ++++---------------------- source/sat/sat_web.py | 88 +++++++++++++++++++++++++++++++++++------ source/sat/util.py | 25 ++++++++++-- 5 files changed, 108 insertions(+), 66 deletions(-) diff --git a/requirements.txt b/requirements.txt index 9e7e111..68153fd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ httpx peewee cryptography +lxml diff --git a/source/cfdi-descarga.py b/source/cfdi-descarga.py index 1147ef2..8160689 100755 --- a/source/cfdi-descarga.py +++ b/source/cfdi-descarga.py @@ -17,7 +17,6 @@ def main(args): def _process_command_line_arguments(): now = util.today() year = now.year - month = now.month parser = argparse.ArgumentParser(description='CFDI Descarga SAT') @@ -34,7 +33,7 @@ def _process_command_line_arguments(): help = "Año de la descarga entre 2014 y el año actual (predeterminado)." parser.add_argument('-a', '--año', help=help, dest='year', default=year, type=int, choices=range(2014, year+1)) help = "Mes de la descarga, el mes actual es el predeterminado" - parser.add_argument('-m', '--mes', help=help, dest='month', default=month, type=int, choices=range(13)) + parser.add_argument('-m', '--mes', help=help, dest='month', default=0, type=int, choices=range(13)) help = "Día de la descarga, de forma predeterminada no se usa" parser.add_argument('-d', '--dia', help=help, dest='day', default=0, type=int, choices=range(32)) diff --git a/source/sat/cfdi_cert.py b/source/sat/cfdi_cert.py index 725029c..35fc048 100644 --- a/source/sat/cfdi_cert.py +++ b/source/sat/cfdi_cert.py @@ -39,6 +39,7 @@ class SATCertificate(object): self._p12 = b'' self._cer_modulus = 0 self._key_modulus = 0 + self._issuer = '' return def __str__(self): @@ -68,6 +69,8 @@ class SATCertificate(object): self._serial_number = '{0:x}'.format(obj.serial_number)[1::2] self._not_before = obj.not_valid_before self._not_after = obj.not_valid_after + self._issuer = ','.join([i.rfc4514_string() for i in obj.issuer]) + now = datetime.datetime.utcnow() self._is_valid_time = (now > self.not_before) and (now < self.not_after) if not self._is_valid_time: @@ -80,6 +83,7 @@ class SATCertificate(object): self._cer_pem = obj.public_bytes(serialization.Encoding.PEM).decode() self._cer_txt = ''.join(self._cer_pem.split('\n')[1:-2]) self._cer_modulus = obj.public_key().public_numbers().n + return def _get_data_key(self, key, password): @@ -203,6 +207,10 @@ class SATCertificate(object): def key_enc(self): return self._key_enc + @property + def issuer(self): + return self._issuer + @property def p12(self): return self._get_p12() @@ -210,52 +218,3 @@ class SATCertificate(object): @property def error(self): return self._error - - -def main(args): - contra = getpass.getpass('Introduce la contraseña del archivo KEY: ') - #contra = '12345678a' - if not contra.strip(): - msg = 'La contraseña es requerida' - print(msg) - return - - path_cer = Path(args.cer) - path_key = Path(args.key) - - if not path_cer.is_file(): - msg = 'El archivo CER es necesario' - print(msg) - return - - if not path_key.is_file(): - msg = 'El archivo KEY es necesario' - print(msg) - return - - cer = path_cer.read_bytes() - key = path_key.read_bytes() - cert = SATCertificate(cer, key, contra) - - if cert.error: - print(cert.error) - else: - print(cert) - return - - -def _process_command_line_arguments(): - parser = argparse.ArgumentParser(description='CFDI Certificados') - - help = 'Archivo CER' - parser.add_argument('-c', '--cer', help=help, default='') - help = 'Archivo KEY' - parser.add_argument('-k', '--key', help=help, default='') - - args = parser.parse_args() - return args - - -if __name__ == '__main__': - args = _process_command_line_arguments() - main(args) diff --git a/source/sat/sat_web.py b/source/sat/sat_web.py index 241f216..57cdf0c 100644 --- a/source/sat/sat_web.py +++ b/source/sat/sat_web.py @@ -28,10 +28,11 @@ class SATWebService(): NS = { 's': 'http://schemas.xmlsoap.org/soap/envelope/', 'u': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd', - 'o': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd' + 'o': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd', 'des': 'http://DescargaMasivaTerceros.sat.gob.mx', 'xd': 'http://www.w3.org/2000/09/xmldsig#', } + NS_RESULT = {'s': NS['s'], None: XMLNS} def __init__(self, cert): self._cert = cert @@ -120,8 +121,7 @@ class SATWebService(): node_name = f"{{{self.NS['s']}}}Body" body = ET.SubElement(root, node_name) - nsmap = {None: self.XMLNS} - ET.SubElement(body, 'Autentica', nsmap=nsmap) + ET.SubElement(body, 'Autentica', nsmap=self.NS_RESULT) # ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8') soap = ET.tostring(root) @@ -146,10 +146,77 @@ class SATWebService(): return token def _get_data_req(self, args): + NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']} + FORMAT = '%Y-%m-%dT%H:%M:%S' - return + date_start = args['date_start'] + date_end = args['date_end'] - def _get_request(self, args): + node_name = f"{{{self.NS['s']}}}Envelope" + root = ET.Element(node_name, nsmap=NSMAP) + + node_name = f"{{{self.NS['s']}}}Header" + body = ET.SubElement(root, node_name) + + node_name = f"{{{self.NS['s']}}}Body" + body = ET.SubElement(root, node_name) + + node_name = f"{{{self.NS['des']}}}SolicitaDescarga" + request_down = ET.SubElement(body, node_name) + + node_name = f"{{{self.NS['des']}}}solicitud" + attr = { + 'RfcSolicitante': self._cert.rfc, + 'FechaFinal': date_end.strftime(FORMAT), + 'FechaInicial': date_start.strftime(FORMAT), + 'TipoSolicitud': 'cfdi', + } + request = ET.SubElement(request_down, node_name, attr) + # ~ if rfc_emisor is not None: + # ~ solicitud.set('RfcEmisor', rfc_emisor) + # ~ if rfc_receptor is not None: + # ~ solicitud.set('RfcReceptor', rfc_receptor) + + nsmap = {None: self.NS['xd']} + signature = ET.SubElement(request, 'Signature', nsmap=nsmap) + signed_info = ET.SubElement(signature, 'SignedInfo', nsmap=nsmap) + + node_name = 'CanonicalizationMethod' + attr1 = {'Algorithm': 'http://www.w3.org/2001/10/xml-exc-c14n#'} + canonicalization = ET.SubElement(signed_info, node_name, attr) + + node_name = 'SignatureMethod' + attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'} + signature_method = ET.SubElement(signed_info, node_name, attr) + + attr = {'URI': '#_0'} + reference = ET.SubElement(signed_info, 'Reference', attr) + transforms = ET.SubElement(reference, 'Transforms') + ET.SubElement(transforms, 'Transform') + attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#sha1'} + ET.SubElement(reference, 'DigestMethod', attr) + + dvalue = ET.tostring(request_down, method='c14n', exclusive=1) + dvalue = base64.b64encode(hashlib.new('sha1', dvalue).digest()) + ET.SubElement(reference, 'DigestValue').text = dvalue + + signature_value = ET.tostring(signed_info, method='c14n', exclusive=1) + signature_value = self._cert.sign_sha1(signature_value) + ET.SubElement(signature, 'SignatureValue').text = signature_value + + key_info = ET.SubElement(signature, 'KeyInfo') + x_data = ET.SubElement(key_info, 'X509Data') + x_issuer_serial = ET.SubElement(x_data, 'X509IssuerSerial') + ET.SubElement(x_issuer_serial, 'X509IssuerName').text = self._cert.issuer + ET.SubElement(x_issuer_serial, 'X509SerialNumber').text = self._cert.serial_number + ET.SubElement(x_data, 'X509Certificate').text = self._cert.cer_txt + + # ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8') + soap = ET.tostring(root) + + return soap + + def request_download(self, args): headers = self.HEADERS.copy() headers['SOAPAction'] = self.ACTIONS['REQ'] headers['Authorization'] = f'WRAP access_token="{self._token}"' @@ -160,12 +227,9 @@ class SATWebService(): self._error = f'Status: {response.status_code} - {response.text}' return - print(response.text) result = ET.fromstring(response.text) + node_name = 's:Body/SolicitaDescargaResponse/SolicitaDescargaResult' + node = result.find(node_name, namespaces=self.NS_RESULT) + data = dict(node.attrib) - return - - def download(self, args): - request = self._get_request(args) - print(request) - return + return data diff --git a/source/sat/util.py b/source/sat/util.py index bbd984d..4067519 100644 --- a/source/sat/util.py +++ b/source/sat/util.py @@ -2,6 +2,7 @@ import getpass import uuid +from calendar import monthrange from datetime import datetime from pathlib import Path @@ -138,6 +139,22 @@ def _validate_download_args(args): log.error(msg) return False, {} + now = today() + + month1 = month2 = data['month'] + if month1 == 0: + month1 = 1 + month2 = 12 + + if data['day']: + day1 = day2 = data['day'] + else: + day1 = 1 + day2 = monthrange(data['year'], month2)[1] + + data['date_start'] = datetime(data['year'], month1, day1) + data['date_end'] = datetime(data['year'], month2, day2, 23, 59, 59) + return True, data @@ -148,10 +165,12 @@ def sat_download(args): sat = SATWebService(data['cert']) - if sat.is_authenticate: - sat.download(data) - else: + if not sat.is_authenticate: log.error(sat.error) + return + + result = sat.request_download(data) + print(result) return