From b9e1c3332afdd836900e1a4fb543c1be21402b02 Mon Sep 17 00:00:00 2001 From: El Mau Date: Thu, 31 Mar 2022 13:35:28 -0600 Subject: [PATCH] Cambios en el SAT --- CHANGELOG.md | 5 +++ VERSION | 2 +- source/cfdi-descarga.py | 6 ++- source/conf.py.ejemplo | 2 + source/sat/sat_web.py | 99 +++++++++++++++++++++++++++++++---------- source/sat/util.py | 57 ++++++++++++++---------- 6 files changed, 121 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 10f6d5e..075da2a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ # Lista de cambios +## v 0.3.0 [31-Mar-22] +--- +* Cambios en el SAT + + ## v 0.2.1 [16-Jul-21] --- * Fix - Issue #1 diff --git a/VERSION b/VERSION index 0c62199..0d91a54 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.2.1 +0.3.0 diff --git a/source/cfdi-descarga.py b/source/cfdi-descarga.py index c62d548..9e70eec 100755 --- a/source/cfdi-descarga.py +++ b/source/cfdi-descarga.py @@ -51,7 +51,7 @@ def _process_command_line_arguments(): dest='month', default=0, type=int, choices=range(13)) help = "Día de la descarga, de forma predeterminada no se usa" parser.add_argument('-d', '--dia', help=help, - dest='day', default=0, type=int, choices=range(32)) + dest='day', default=0, type=int, choices=range(31)) help = "Intervalo de días a partir de la fecha actual y hacia a atras" parser.add_argument('-ud', '--ultimos-dias', help=help, dest='last_days', default=0, type=int, choices=range(30)) @@ -81,6 +81,10 @@ def _process_command_line_arguments(): parser.add_argument('-dd', '--directorio-descargas', dest='path_download', help=help, default='') + help = 'Descargar solo metadatos' + parser.add_argument('-md', '--metadata', help=help, + action='store_true', default=False, required=False) + args = parser.parse_args() return args diff --git a/source/conf.py.ejemplo b/source/conf.py.ejemplo index 8f1fe03..d4df8fb 100644 --- a/source/conf.py.ejemplo +++ b/source/conf.py.ejemplo @@ -5,3 +5,5 @@ DEBUG = False # ~ Este valor se usa para cifrar la FIEL # ~ Si la cambias en producción, debes de validar de nuevo las FIELs TOKEN = '' + +TIMEOUT = 10 diff --git a/source/sat/sat_web.py b/source/sat/sat_web.py index 2df7737..47df420 100644 --- a/source/sat/sat_web.py +++ b/source/sat/sat_web.py @@ -8,6 +8,8 @@ from datetime import datetime, timedelta import httpx import lxml.etree as ET +from conf import TIMEOUT + class SATWebService(): BASE = 'https://cfdidescargamasivasolicitud.clouda.sat.gob.mx' @@ -29,6 +31,8 @@ class SATWebService(): 'Content-type': 'text/xml;charset="utf-8"', 'Accept': 'text/xml', 'Cache-Control': 'no-cache', + 'Expect': '100-continue', + 'Accept-Encoding': 'gzip, deflate', } NS = { 's': 'http://schemas.xmlsoap.org/soap/envelope/', @@ -148,38 +152,69 @@ class SATWebService(): nsmap = {'s': self.NS['s'], None: self.XMLNS} node_name = 's:Body/AutenticaResponse/AutenticaResult' token = result.find(node_name, namespaces=nsmap).text + # ~ print(f'Token: {token}') return token def _get_data_req(self, args): - NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']} + # ~ NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']} + # ~ NSMAP = {'s': self.NS['s'], 'xd': self.NS['xd']} + NSMAP = {'s': self.NS['s']} FORMAT = '%Y-%m-%dT%H:%M:%S' date_start = args['date_start'] date_end = args['date_end'] + msg = f'Descarga desde: {date_start} hasta: {date_end}' + print(msg) node_name = f"{{{self.NS['s']}}}Envelope" root = ET.Element(node_name, nsmap=NSMAP) node_name = f"{{{self.NS['s']}}}Header" - body = ET.SubElement(root, node_name) + header = ET.SubElement(root, node_name) + + node_name = 'ActivityId' + attr = {'CorrelationId': '806aad0d-ef46-443b-9741-040c8e8e8c7d'} + nsmap = {None: 'http://schemas.microsoft.com/2004/09/ServiceModel/Diagnostics'} + activity = ET.SubElement(header, node_name, attr, nsmap=nsmap) + activity.text = 'e906cfb4-f706-43de-94d0-5cc935be1aaa' node_name = f"{{{self.NS['s']}}}Body" - body = ET.SubElement(root, node_name) - - node_name = f"{{{self.NS['des']}}}SolicitaDescarga" - request_down = ET.SubElement(body, node_name) - - node_name = f"{{{self.NS['des']}}}solicitud" - attr = { - 'RfcSolicitante': self._cert.rfc, - 'FechaFinal': date_end.strftime(FORMAT), - 'FechaInicial': date_start.strftime(FORMAT), - 'TipoSolicitud': 'CFDI', - args['rfc']: self._cert.rfc, + nsmap = { + 'xsi': 'http://www.w3.org/2001/XMLSchema-instance', + 'xsd': 'http://www.w3.org/2001/XMLSchema', } + body = ET.SubElement(root, node_name, nsmap=nsmap) + + # ~ node_name = f"{{{self.NS['des']}}}SolicitaDescarga" + node_name = "SolicitaDescarga" + nsmap = {None: 'http://DescargaMasivaTerceros.sat.gob.mx'} + request_down = ET.SubElement(body, node_name, nsmap=nsmap) + + # ~ node_name = f"{{{self.NS['des']}}}solicitud" + node_name = "solicitud" + + type_request = 'CFDI' + if args['metadata']: + type_request = 'Metadata' + attr = { + 'FechaInicial': date_start.strftime(FORMAT), + 'FechaFinal': date_end.strftime(FORMAT), + 'RfcSolicitante': self._cert.rfc, + 'TipoSolicitud': type_request, + # ~ 'RfcACuentaTerceros': '', + } + if args['rfc'] == 'RfcEmisor': + attr['RfcEmisor'] = self._cert.rfc request = ET.SubElement(request_down, node_name, attr) + node_name = 'RfcReceptores' + node_receptores = ET.SubElement(request, node_name) + node_name = 'RfcReceptor' + node_receptor = ET.SubElement(node_receptores, node_name) + if args['rfc'] == 'RfcReceptor': + node_receptor.text = self._cert.rfc + nsmap = {None: self.NS['xd']} signature = ET.SubElement(request, 'Signature', nsmap=nsmap) signed_info = ET.SubElement(signature, 'SignedInfo', nsmap=nsmap) @@ -192,7 +227,8 @@ class SATWebService(): attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'} signature_method = ET.SubElement(signed_info, node_name, attr) - attr = {'URI': '#_0'} + # ~ attr = {'URI': '#_0'} + attr = {'URI': ''} reference = ET.SubElement(signed_info, 'Reference', attr) transforms = ET.SubElement(reference, 'Transforms') ET.SubElement(transforms, 'Transform', attr1) @@ -220,18 +256,25 @@ class SATWebService(): x_serial_number.text = str(self._cert.serial_number2) x_cert.text = self._cert.cer_txt - # ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8') - soap = ET.tostring(root) - + # ~ soap = b'\n' + ET.tostring(root, pretty_print=True, encoding='utf-8') + soap = ET.tostring(root, pretty_print=True) + # ~ print(soap.decode()) return soap def request_download(self, args): headers = self.HEADERS.copy() headers['SOAPAction'] = self.ACTIONS['REQ'] + headers['Host'] = 'srvsolicituddescargamaster.cloudapp.net' headers['Authorization'] = f'WRAP access_token="{self._token}"' data = self._get_data_req(args) - response = httpx.post(self.URL['REQ'], data=data, headers=headers) + try: + response = httpx.post(self.URL['REQ'], + data=data, headers=headers, timeout=TIMEOUT) + except httpx.TimeoutException as exc: + print(exc) + return + if response.status_code != httpx.codes.OK: self._error = f'Status: {response.status_code} - {response.text}' return @@ -244,7 +287,7 @@ class SATWebService(): return data def _get_data_verify(self, args): - NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']} + NSMAP = {'soapenv': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']} node_name = f"{{{self.NS['s']}}}Envelope" root = ET.Element(node_name, nsmap=NSMAP) @@ -277,7 +320,8 @@ class SATWebService(): attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'} signature_method = ET.SubElement(signed_info, node_name, attr) - attr = {'URI': '#_0'} + # ~ attr = {'URI': '#_0'} + attr = {'URI': ''} reference = ET.SubElement(signed_info, 'Reference', attr) transforms = ET.SubElement(reference, 'Transforms') ET.SubElement(transforms, 'Transform', attr1) @@ -330,7 +374,7 @@ class SATWebService(): return data def _get_data_download(self, args): - NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']} + NSMAP = {'soapenv': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']} node_name = f"{{{self.NS['s']}}}Envelope" root = ET.Element(node_name, nsmap=NSMAP) @@ -363,7 +407,8 @@ class SATWebService(): attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'} signature_method = ET.SubElement(signed_info, node_name, attr) - attr = {'URI': '#_0'} + # ~ attr = {'URI': '#_0'} + attr = {'URI': ''} reference = ET.SubElement(signed_info, 'Reference', attr) transforms = ET.SubElement(reference, 'Transforms') ET.SubElement(transforms, 'Transform', attr1) @@ -402,7 +447,13 @@ class SATWebService(): headers['Authorization'] = f'WRAP access_token="{self._token}"' data = self._get_data_download(args) - response = httpx.post(self.URL['DOWN'], data=data, headers=headers) + try: + response = httpx.post(self.URL['DOWN'], + data=data, headers=headers, timeout=TIMEOUT) + except httpx.TimeoutException as exc: + print(exc) + return + if response.status_code != httpx.codes.OK: self._error = f'Status: {response.status_code} - {response.text}' return diff --git a/source/sat/util.py b/source/sat/util.py index 9acfcd8..e3ca8ce 100644 --- a/source/sat/util.py +++ b/source/sat/util.py @@ -76,7 +76,13 @@ def fiel_validar(args): if not result: return - password = getpass.getpass('Introduce la contraseña del archivo KEY: ') + try: + password = getpass.getpass('Introduce la contraseña del archivo KEY: ') + except KeyboardInterrupt: + msg = 'Proceso cancelado' + log.info(msg) + return + if not password: msg = 'La contraseña es requerida para validar la FIEL' log.error(msg) @@ -115,14 +121,12 @@ def base_datos(): def _get_cert(data): key = b'' pem = b'' - - # ~ cer = data['path_cer'].read_bytes() - # ~ if data['path_enc'].is_file(): - # ~ key = data['path_enc'].read_bytes() - # ~ elif data['path_pem'].is_file(): - # ~ pem = data['path_pem'].read_bytes() - - cert = SATCertificate(data['path_cer'], key, data['path_pem']) + cer = data['path_cer'].read_bytes() + if data['path_enc'].is_file(): + key = data['path_enc'].read_bytes() + elif data['path_pem'].is_file(): + pem = data['path_pem'].read_bytes() + cert = SATCertificate(cer, key, pem) return cert @@ -163,6 +167,7 @@ def _validate_requests_args(args): data['year'] = args.year data['month'] = args.month data['day'] = args.day + data['metadata'] = args.metadata now = today() @@ -217,14 +222,12 @@ def _validate_verificar_args(args): if not result: return False, {} - if not data['path_enc'].is_file(): - msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}" + if not data['path_enc'].is_file() and not data['path_pem'].is_file(): + msg = f"No se encontró la FIEL [enc|pem].\nRuta: {data['path_enc']}" log.error(msg) return False, {} - cer = data['path_cer'].read_bytes() - key = data['path_enc'].read_bytes() - cert = SATCertificate(cer, key) + cert = _get_cert(data) if not cert.is_valid_time: msg = 'La FIEL no es vigente' @@ -247,14 +250,12 @@ def _validate_download_args(args): if not result: return False, {} - if not data['path_enc'].is_file(): - msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}" + if not data['path_enc'].is_file() and not data['path_pem'].is_file(): + msg = f"No se encontró la FIEL [enc|pem].\nRuta: {data['path_enc']}" log.error(msg) return False, {} - cer = data['path_cer'].read_bytes() - key = data['path_enc'].read_bytes() - cert = SATCertificate(cer, key) + cert = _get_cert(data) if not cert.is_valid_time: msg = 'La FIEL no es vigente' @@ -342,6 +343,10 @@ def descargar_archivos(args): else: result = sat.verify(data) files = result['files'] + if result['EstadoSolicitud'] in ('1', '2'): + msg = 'Solicitud aún no aceptada...' + log.error(msg) + return for f in files: data['id_file'] = f @@ -374,9 +379,11 @@ def _validate_args(args): return True, data -def _download(sat, data, key): +def _download(data, key): OK = '5000' + sat = SATWebService(data['cert']) + result = _request_download(sat, data, key) if result['CodEstatus'] != OK: log.error(result) @@ -391,7 +398,9 @@ def _download(sat, data, key): sleep(1) while True: + sat = SATWebService(data['cert']) result = sat.verify(data) + if result['EstadoSolicitud'] in ('1', '2'): msg = 'Esperando un minuto más para volver a verificar...' log.info(msg) @@ -434,12 +443,12 @@ def descargar(args): return if data['type'] == 'e': - _download(sat, data, 'RfcEmisor') + _download(data, 'RfcEmisor') elif data['type'] == 'r': - _download(sat, data, 'RfcReceptor') + _download(data, 'RfcReceptor') else: - _download(sat, data, 'RfcEmisor') - _download(sat, data, 'RfcReceptor') + _download(data, 'RfcEmisor') + _download(data, 'RfcReceptor') return