diff --git a/source/cfdi-descarga.py b/source/cfdi-descarga.py index a05bb3c..2606839 100755 --- a/source/cfdi-descarga.py +++ b/source/cfdi-descarga.py @@ -9,7 +9,17 @@ def main(args): util.fiel_validar(args) return - util.sat_download(args) + if args.solicitar_descarga: + util.solicitar_descarga(args) + return + + if args.verificar_descarga: + util.verificar_descarga(args) + return + + if args.descargar_archivos: + util.descargar_archivos(args) + return return @@ -37,9 +47,22 @@ def _process_command_line_arguments(): help = "Día de la descarga, de forma predeterminada no se usa" parser.add_argument('-d', '--dia', help=help, dest='day', default=0, type=int, choices=range(32)) - help = 'Verificar estatus de descarga' - parser.add_argument('-ve', '--fiel-validar', help=help, + help = 'Solicitar descarga' + parser.add_argument('-sd', '--solicitar-descarga', help=help, action='store_true', default=False, required=False) + help = 'Verificar estatus de descarga' + parser.add_argument('-vd', '--verificar-descarga', help=help, + action='store_true', default=False, required=False) + help = 'Descargar archivos' + parser.add_argument('-da', '--descargar-archivos', help=help, + action='store_true', default=False, required=False) + help = 'ID de solicitud' + parser.add_argument('-id', '--id-solicitud', dest='id_request', help=help, default='') + help = 'ID archivo' + parser.add_argument('-ida', '--id-archivo', dest='id_file', help=help, default='') + help = 'Ruta de descarga de archivos' + parser.add_argument('-dd', '--directorio-descargas', dest='path_download', + help=help, default='') args = parser.parse_args() return args diff --git a/source/conf.py.ejemplo b/source/conf.py.ejemplo new file mode 100644 index 0000000..8f1fe03 --- /dev/null +++ b/source/conf.py.ejemplo @@ -0,0 +1,7 @@ +#!/usr/bin/env python + +DEBUG = False + +# ~ Este valor se usa para cifrar la FIEL +# ~ Si la cambias en producción, debes de validar de nuevo las FIELs +TOKEN = '' diff --git a/source/sat/sat_web.py b/source/sat/sat_web.py index 46bc447..a032478 100644 --- a/source/sat/sat_web.py +++ b/source/sat/sat_web.py @@ -14,12 +14,16 @@ class SATWebService(): URL = { 'AUTH': f'{BASE}/Autenticacion/Autenticacion.svc', 'REQ': f'{BASE}/SolicitaDescargaService.svc', + 'VSD': f'{BASE}/VerificaSolicitudDescargaService.svc', + 'DOWN': 'https://cfdidescargamasiva.clouda.sat.gob.mx/DescargaMasivaService.svc', } XMLNS = 'http://DescargaMasivaTerceros.gob.mx' XMLNS2 = 'http://DescargaMasivaTerceros.sat.gob.mx' ACTIONS = { 'AUTH': f'{XMLNS}/IAutenticacion/Autentica', 'REQ': f'{XMLNS2}/ISolicitaDescargaService/SolicitaDescarga', + 'VSD': f'{XMLNS2}/IVerificaSolicitudDescargaService/VerificaSolicitudDescarga', + 'DOWN': f'{XMLNS2}/IDescargaMasivaTercerosService/Descargar', } HEADERS = { 'Content-type': 'text/xml;charset="utf-8"', @@ -242,3 +246,183 @@ class SATWebService(): data = dict(node.attrib) return data + + def _get_data_verify(self, args): + NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']} + + node_name = f"{{{self.NS['s']}}}Envelope" + root = ET.Element(node_name, nsmap=NSMAP) + + node_name = f"{{{self.NS['s']}}}Header" + body = ET.SubElement(root, node_name) + + node_name = f"{{{self.NS['s']}}}Body" + body = ET.SubElement(root, node_name) + + node_name = f"{{{self.NS['des']}}}VerificaSolicitudDescarga" + verify_download = ET.SubElement(body, node_name) + + node_name = f"{{{self.NS['des']}}}solicitud" + attr = { + 'IdSolicitud': args['id'], + 'RfcSolicitante': self._cert.rfc, + } + request = ET.SubElement(verify_download, node_name, attr) + + nsmap = {None: self.NS['xd']} + signature = ET.SubElement(request, 'Signature', nsmap=nsmap) + signed_info = ET.SubElement(signature, 'SignedInfo', nsmap=nsmap) + + node_name = 'CanonicalizationMethod' + attr1 = {'Algorithm': 'http://www.w3.org/2001/10/xml-exc-c14n#'} + canonicalization = ET.SubElement(signed_info, node_name, attr1) + + node_name = 'SignatureMethod' + attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'} + signature_method = ET.SubElement(signed_info, node_name, attr) + + attr = {'URI': '#_0'} + reference = ET.SubElement(signed_info, 'Reference', attr) + transforms = ET.SubElement(reference, 'Transforms') + ET.SubElement(transforms, 'Transform', attr1) + attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#sha1'} + ET.SubElement(reference, 'DigestMethod', attr) + digest_value = ET.SubElement(reference, 'DigestValue') + signature_value = ET.SubElement(signature, 'SignatureValue') + + key_info = ET.SubElement(signature, 'KeyInfo') + x_data = ET.SubElement(key_info, 'X509Data') + x_issuer_serial = ET.SubElement(x_data, 'X509IssuerSerial') + x_issuer = ET.SubElement(x_issuer_serial, 'X509IssuerName') + x_serial_number = ET.SubElement(x_issuer_serial, 'X509SerialNumber') + x_cert = ET.SubElement(x_data, 'X509Certificate') + + dvalue = ET.tostring(verify_download, method='c14n', exclusive=1) + dvalue = base64.b64encode(hashlib.new('sha1', dvalue).digest()) + digest_value.text = dvalue + + sign = ET.tostring(signed_info, method='c14n', exclusive=1) + sign = self._cert.sign_sha1(sign) + signature_value.text = sign + + x_issuer.text = self._cert.issuer + x_serial_number.text = str(self._cert.serial_number2) + x_cert.text = self._cert.cer_txt + + # ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8') + soap = ET.tostring(root) + + return soap + + def verify(self, args): + headers = self.HEADERS.copy() + headers['SOAPAction'] = self.ACTIONS['VSD'] + headers['Authorization'] = f'WRAP access_token="{self._token}"' + data = self._get_data_verify(args) + + response = httpx.post(self.URL['VSD'], data=data, headers=headers) + if response.status_code != httpx.codes.OK: + self._error = f'Status: {response.status_code} - {response.text}' + return + + result = ET.fromstring(response.text) + node_name = 's:Body/VerificaSolicitudDescargaResponse/VerificaSolicitudDescargaResult' + node = result.find(node_name, namespaces=self.NS_RESULT2) + data = dict(node.attrib) + data['files'] = [n.text for n in node] + + return data + + def _get_data_download(self, args): + NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']} + + node_name = f"{{{self.NS['s']}}}Envelope" + root = ET.Element(node_name, nsmap=NSMAP) + + node_name = f"{{{self.NS['s']}}}Header" + body = ET.SubElement(root, node_name) + + node_name = f"{{{self.NS['s']}}}Body" + body = ET.SubElement(root, node_name) + + node_name = f"{{{self.NS['des']}}}PeticionDescargaMasivaTercerosEntrada" + request_download = ET.SubElement(body, node_name) + + node_name = f"{{{self.NS['des']}}}peticionDescarga" + attr = { + 'IdPaquete': args['id_file'], + 'RfcSolicitante': self._cert.rfc, + } + request = ET.SubElement(request_download, node_name, attr) + + nsmap = {None: self.NS['xd']} + signature = ET.SubElement(request, 'Signature', nsmap=nsmap) + signed_info = ET.SubElement(signature, 'SignedInfo', nsmap=nsmap) + + node_name = 'CanonicalizationMethod' + attr1 = {'Algorithm': 'http://www.w3.org/2001/10/xml-exc-c14n#'} + canonicalization = ET.SubElement(signed_info, node_name, attr1) + + node_name = 'SignatureMethod' + attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'} + signature_method = ET.SubElement(signed_info, node_name, attr) + + attr = {'URI': '#_0'} + reference = ET.SubElement(signed_info, 'Reference', attr) + transforms = ET.SubElement(reference, 'Transforms') + ET.SubElement(transforms, 'Transform', attr1) + attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#sha1'} + ET.SubElement(reference, 'DigestMethod', attr) + digest_value = ET.SubElement(reference, 'DigestValue') + signature_value = ET.SubElement(signature, 'SignatureValue') + + key_info = ET.SubElement(signature, 'KeyInfo') + x_data = ET.SubElement(key_info, 'X509Data') + x_issuer_serial = ET.SubElement(x_data, 'X509IssuerSerial') + x_issuer = ET.SubElement(x_issuer_serial, 'X509IssuerName') + x_serial_number = ET.SubElement(x_issuer_serial, 'X509SerialNumber') + x_cert = ET.SubElement(x_data, 'X509Certificate') + + dvalue = ET.tostring(request_download, method='c14n', exclusive=1) + dvalue = base64.b64encode(hashlib.new('sha1', dvalue).digest()) + digest_value.text = dvalue + + sign = ET.tostring(request_download, method='c14n', exclusive=1) + sign = self._cert.sign_sha1(sign) + signature_value.text = sign + + x_issuer.text = self._cert.issuer + x_serial_number.text = str(self._cert.serial_number2) + x_cert.text = self._cert.cer_txt + + # ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8') + soap = ET.tostring(root) + + return soap + + def download(self, args): + headers = self.HEADERS.copy() + headers['SOAPAction'] = self.ACTIONS['DOWN'] + headers['Authorization'] = f'WRAP access_token="{self._token}"' + data = self._get_data_download(args) + + response = httpx.post(self.URL['DOWN'], data=data, headers=headers) + if response.status_code != httpx.codes.OK: + self._error = f'Status: {response.status_code} - {response.text}' + return + + result = ET.fromstring(response.text) + namespaces = self.NS_RESULT2.copy() + namespaces['h'] = 'http://DescargaMasivaTerceros.sat.gob.mx' + + respuesta = result.find('s:Header/h:respuesta', namespaces=namespaces) + data = dict(respuesta.attrib) + + node_name = 's:Body/RespuestaDescargaMasivaTercerosSalida/Paquete' + node = result.find(node_name, namespaces=self.NS_RESULT2) + archivo = None + if not node.text is None: + archivo = base64.b64decode(node.text) + + return data, archivo + diff --git a/source/sat/util.py b/source/sat/util.py index 8c78a90..8b93dbb 100644 --- a/source/sat/util.py +++ b/source/sat/util.py @@ -109,7 +109,7 @@ def base_datos(): return -def _validate_download_args(args): +def _validate_requests_args(args): result, data = _validate_fiel_args(args) if not result: return False, {} @@ -159,8 +159,69 @@ def _validate_download_args(args): return True, data -def sat_download(args): - result, data = _validate_download_args(args) +def _validate_verificar_args(args): + result, data = _validate_fiel_args(args) + if not result: + return False, {} + + if not data['path_enc'].is_file(): + msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}" + log.error(msg) + return False, {} + + cer = data['path_cer'].read_bytes() + key = data['path_enc'].read_bytes() + cert = SATCertificate(cer, key) + + if not cert.is_valid_time: + msg = 'La FIEL no es vigente' + log.error(msg) + return False, {} + + if not args.id_request: + msg = 'El ID de solicitud de descarga es requerido' + log.error(msg) + return False, {} + + data['cert'] = cert + data['id'] = args.id_request + + return True, data + + +def _validate_download_args(args): + result, data = _validate_fiel_args(args) + if not result: + return False, {} + + if not data['path_enc'].is_file(): + msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}" + log.error(msg) + return False, {} + + cer = data['path_cer'].read_bytes() + key = data['path_enc'].read_bytes() + cert = SATCertificate(cer, key) + + if not cert.is_valid_time: + msg = 'La FIEL no es vigente' + log.error(msg) + return False, {} + + if not args.id_request and not args.id_file: + msg = 'El ID de solicitud o ID de archivo de descarga es requerido' + log.error(msg) + return False, {} + + data['cert'] = cert + data['id'] = args.id_request + data['id_file'] = args.id_file + + return True, data + + +def solicitar_descarga(args): + result, data = _validate_requests_args(args) if not result: return @@ -175,3 +236,54 @@ def sat_download(args): return + +def verificar_descarga(args): + result, data = _validate_verificar_args(args) + if not result: + return + + sat = SATWebService(data['cert']) + + if not sat.is_authenticate: + log.error(sat.error) + return + + result = sat.verify(data) + print(result) + + return + + +def descargar_archivos(args): + result, data = _validate_download_args(args) + if not result: + return + + sat = SATWebService(data['cert']) + + if not sat.is_authenticate: + log.error(sat.error) + return + + if args.id_file: + files = (args.id_file,) + else: + result = sat.verify(data) + files = result['files'] + + for f in files: + data['id_file'] = f + result, file_data = sat.download(data) + if file_data is None: + log.debug(result) + else: + msg = f'Guardando: {f}.zip' + log.info(msg) + with open(f"{f}.zip", 'wb') as f: + f.write(file_data) + msg = f'\tArchivo guardado correctamente' + log.info(msg) + return + + +