Descarga de archivo correcta

This commit is contained in:
Mauricio Baeza 2021-07-14 21:20:57 -05:00
parent 82033da207
commit 98ba0df3af
4 changed files with 332 additions and 6 deletions

View File

@ -9,7 +9,17 @@ def main(args):
util.fiel_validar(args)
return
util.sat_download(args)
if args.solicitar_descarga:
util.solicitar_descarga(args)
return
if args.verificar_descarga:
util.verificar_descarga(args)
return
if args.descargar_archivos:
util.descargar_archivos(args)
return
return
@ -37,9 +47,22 @@ def _process_command_line_arguments():
help = "Día de la descarga, de forma predeterminada no se usa"
parser.add_argument('-d', '--dia', help=help, dest='day', default=0, type=int, choices=range(32))
help = 'Verificar estatus de descarga'
parser.add_argument('-ve', '--fiel-validar', help=help,
help = 'Solicitar descarga'
parser.add_argument('-sd', '--solicitar-descarga', help=help,
action='store_true', default=False, required=False)
help = 'Verificar estatus de descarga'
parser.add_argument('-vd', '--verificar-descarga', help=help,
action='store_true', default=False, required=False)
help = 'Descargar archivos'
parser.add_argument('-da', '--descargar-archivos', help=help,
action='store_true', default=False, required=False)
help = 'ID de solicitud'
parser.add_argument('-id', '--id-solicitud', dest='id_request', help=help, default='')
help = 'ID archivo'
parser.add_argument('-ida', '--id-archivo', dest='id_file', help=help, default='')
help = 'Ruta de descarga de archivos'
parser.add_argument('-dd', '--directorio-descargas', dest='path_download',
help=help, default='')
args = parser.parse_args()
return args

7
source/conf.py.ejemplo Normal file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env python
DEBUG = False
# ~ Este valor se usa para cifrar la FIEL
# ~ Si la cambias en producción, debes de validar de nuevo las FIELs
TOKEN = ''

View File

@ -14,12 +14,16 @@ class SATWebService():
URL = {
'AUTH': f'{BASE}/Autenticacion/Autenticacion.svc',
'REQ': f'{BASE}/SolicitaDescargaService.svc',
'VSD': f'{BASE}/VerificaSolicitudDescargaService.svc',
'DOWN': 'https://cfdidescargamasiva.clouda.sat.gob.mx/DescargaMasivaService.svc',
}
XMLNS = 'http://DescargaMasivaTerceros.gob.mx'
XMLNS2 = 'http://DescargaMasivaTerceros.sat.gob.mx'
ACTIONS = {
'AUTH': f'{XMLNS}/IAutenticacion/Autentica',
'REQ': f'{XMLNS2}/ISolicitaDescargaService/SolicitaDescarga',
'VSD': f'{XMLNS2}/IVerificaSolicitudDescargaService/VerificaSolicitudDescarga',
'DOWN': f'{XMLNS2}/IDescargaMasivaTercerosService/Descargar',
}
HEADERS = {
'Content-type': 'text/xml;charset="utf-8"',
@ -242,3 +246,183 @@ class SATWebService():
data = dict(node.attrib)
return data
def _get_data_verify(self, args):
NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']}
node_name = f"{{{self.NS['s']}}}Envelope"
root = ET.Element(node_name, nsmap=NSMAP)
node_name = f"{{{self.NS['s']}}}Header"
body = ET.SubElement(root, node_name)
node_name = f"{{{self.NS['s']}}}Body"
body = ET.SubElement(root, node_name)
node_name = f"{{{self.NS['des']}}}VerificaSolicitudDescarga"
verify_download = ET.SubElement(body, node_name)
node_name = f"{{{self.NS['des']}}}solicitud"
attr = {
'IdSolicitud': args['id'],
'RfcSolicitante': self._cert.rfc,
}
request = ET.SubElement(verify_download, node_name, attr)
nsmap = {None: self.NS['xd']}
signature = ET.SubElement(request, 'Signature', nsmap=nsmap)
signed_info = ET.SubElement(signature, 'SignedInfo', nsmap=nsmap)
node_name = 'CanonicalizationMethod'
attr1 = {'Algorithm': 'http://www.w3.org/2001/10/xml-exc-c14n#'}
canonicalization = ET.SubElement(signed_info, node_name, attr1)
node_name = 'SignatureMethod'
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'}
signature_method = ET.SubElement(signed_info, node_name, attr)
attr = {'URI': '#_0'}
reference = ET.SubElement(signed_info, 'Reference', attr)
transforms = ET.SubElement(reference, 'Transforms')
ET.SubElement(transforms, 'Transform', attr1)
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#sha1'}
ET.SubElement(reference, 'DigestMethod', attr)
digest_value = ET.SubElement(reference, 'DigestValue')
signature_value = ET.SubElement(signature, 'SignatureValue')
key_info = ET.SubElement(signature, 'KeyInfo')
x_data = ET.SubElement(key_info, 'X509Data')
x_issuer_serial = ET.SubElement(x_data, 'X509IssuerSerial')
x_issuer = ET.SubElement(x_issuer_serial, 'X509IssuerName')
x_serial_number = ET.SubElement(x_issuer_serial, 'X509SerialNumber')
x_cert = ET.SubElement(x_data, 'X509Certificate')
dvalue = ET.tostring(verify_download, method='c14n', exclusive=1)
dvalue = base64.b64encode(hashlib.new('sha1', dvalue).digest())
digest_value.text = dvalue
sign = ET.tostring(signed_info, method='c14n', exclusive=1)
sign = self._cert.sign_sha1(sign)
signature_value.text = sign
x_issuer.text = self._cert.issuer
x_serial_number.text = str(self._cert.serial_number2)
x_cert.text = self._cert.cer_txt
# ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8')
soap = ET.tostring(root)
return soap
def verify(self, args):
headers = self.HEADERS.copy()
headers['SOAPAction'] = self.ACTIONS['VSD']
headers['Authorization'] = f'WRAP access_token="{self._token}"'
data = self._get_data_verify(args)
response = httpx.post(self.URL['VSD'], data=data, headers=headers)
if response.status_code != httpx.codes.OK:
self._error = f'Status: {response.status_code} - {response.text}'
return
result = ET.fromstring(response.text)
node_name = 's:Body/VerificaSolicitudDescargaResponse/VerificaSolicitudDescargaResult'
node = result.find(node_name, namespaces=self.NS_RESULT2)
data = dict(node.attrib)
data['files'] = [n.text for n in node]
return data
def _get_data_download(self, args):
NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']}
node_name = f"{{{self.NS['s']}}}Envelope"
root = ET.Element(node_name, nsmap=NSMAP)
node_name = f"{{{self.NS['s']}}}Header"
body = ET.SubElement(root, node_name)
node_name = f"{{{self.NS['s']}}}Body"
body = ET.SubElement(root, node_name)
node_name = f"{{{self.NS['des']}}}PeticionDescargaMasivaTercerosEntrada"
request_download = ET.SubElement(body, node_name)
node_name = f"{{{self.NS['des']}}}peticionDescarga"
attr = {
'IdPaquete': args['id_file'],
'RfcSolicitante': self._cert.rfc,
}
request = ET.SubElement(request_download, node_name, attr)
nsmap = {None: self.NS['xd']}
signature = ET.SubElement(request, 'Signature', nsmap=nsmap)
signed_info = ET.SubElement(signature, 'SignedInfo', nsmap=nsmap)
node_name = 'CanonicalizationMethod'
attr1 = {'Algorithm': 'http://www.w3.org/2001/10/xml-exc-c14n#'}
canonicalization = ET.SubElement(signed_info, node_name, attr1)
node_name = 'SignatureMethod'
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'}
signature_method = ET.SubElement(signed_info, node_name, attr)
attr = {'URI': '#_0'}
reference = ET.SubElement(signed_info, 'Reference', attr)
transforms = ET.SubElement(reference, 'Transforms')
ET.SubElement(transforms, 'Transform', attr1)
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#sha1'}
ET.SubElement(reference, 'DigestMethod', attr)
digest_value = ET.SubElement(reference, 'DigestValue')
signature_value = ET.SubElement(signature, 'SignatureValue')
key_info = ET.SubElement(signature, 'KeyInfo')
x_data = ET.SubElement(key_info, 'X509Data')
x_issuer_serial = ET.SubElement(x_data, 'X509IssuerSerial')
x_issuer = ET.SubElement(x_issuer_serial, 'X509IssuerName')
x_serial_number = ET.SubElement(x_issuer_serial, 'X509SerialNumber')
x_cert = ET.SubElement(x_data, 'X509Certificate')
dvalue = ET.tostring(request_download, method='c14n', exclusive=1)
dvalue = base64.b64encode(hashlib.new('sha1', dvalue).digest())
digest_value.text = dvalue
sign = ET.tostring(request_download, method='c14n', exclusive=1)
sign = self._cert.sign_sha1(sign)
signature_value.text = sign
x_issuer.text = self._cert.issuer
x_serial_number.text = str(self._cert.serial_number2)
x_cert.text = self._cert.cer_txt
# ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8')
soap = ET.tostring(root)
return soap
def download(self, args):
headers = self.HEADERS.copy()
headers['SOAPAction'] = self.ACTIONS['DOWN']
headers['Authorization'] = f'WRAP access_token="{self._token}"'
data = self._get_data_download(args)
response = httpx.post(self.URL['DOWN'], data=data, headers=headers)
if response.status_code != httpx.codes.OK:
self._error = f'Status: {response.status_code} - {response.text}'
return
result = ET.fromstring(response.text)
namespaces = self.NS_RESULT2.copy()
namespaces['h'] = 'http://DescargaMasivaTerceros.sat.gob.mx'
respuesta = result.find('s:Header/h:respuesta', namespaces=namespaces)
data = dict(respuesta.attrib)
node_name = 's:Body/RespuestaDescargaMasivaTercerosSalida/Paquete'
node = result.find(node_name, namespaces=self.NS_RESULT2)
archivo = None
if not node.text is None:
archivo = base64.b64decode(node.text)
return data, archivo

View File

@ -109,7 +109,7 @@ def base_datos():
return
def _validate_download_args(args):
def _validate_requests_args(args):
result, data = _validate_fiel_args(args)
if not result:
return False, {}
@ -159,8 +159,69 @@ def _validate_download_args(args):
return True, data
def sat_download(args):
result, data = _validate_download_args(args)
def _validate_verificar_args(args):
result, data = _validate_fiel_args(args)
if not result:
return False, {}
if not data['path_enc'].is_file():
msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}"
log.error(msg)
return False, {}
cer = data['path_cer'].read_bytes()
key = data['path_enc'].read_bytes()
cert = SATCertificate(cer, key)
if not cert.is_valid_time:
msg = 'La FIEL no es vigente'
log.error(msg)
return False, {}
if not args.id_request:
msg = 'El ID de solicitud de descarga es requerido'
log.error(msg)
return False, {}
data['cert'] = cert
data['id'] = args.id_request
return True, data
def _validate_download_args(args):
result, data = _validate_fiel_args(args)
if not result:
return False, {}
if not data['path_enc'].is_file():
msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}"
log.error(msg)
return False, {}
cer = data['path_cer'].read_bytes()
key = data['path_enc'].read_bytes()
cert = SATCertificate(cer, key)
if not cert.is_valid_time:
msg = 'La FIEL no es vigente'
log.error(msg)
return False, {}
if not args.id_request and not args.id_file:
msg = 'El ID de solicitud o ID de archivo de descarga es requerido'
log.error(msg)
return False, {}
data['cert'] = cert
data['id'] = args.id_request
data['id_file'] = args.id_file
return True, data
def solicitar_descarga(args):
result, data = _validate_requests_args(args)
if not result:
return
@ -175,3 +236,54 @@ def sat_download(args):
return
def verificar_descarga(args):
result, data = _validate_verificar_args(args)
if not result:
return
sat = SATWebService(data['cert'])
if not sat.is_authenticate:
log.error(sat.error)
return
result = sat.verify(data)
print(result)
return
def descargar_archivos(args):
result, data = _validate_download_args(args)
if not result:
return
sat = SATWebService(data['cert'])
if not sat.is_authenticate:
log.error(sat.error)
return
if args.id_file:
files = (args.id_file,)
else:
result = sat.verify(data)
files = result['files']
for f in files:
data['id_file'] = f
result, file_data = sat.download(data)
if file_data is None:
log.debug(result)
else:
msg = f'Guardando: {f}.zip'
log.info(msg)
with open(f"{f}.zip", 'wb') as f:
f.write(file_data)
msg = f'\tArchivo guardado correctamente'
log.info(msg)
return