425 lines
17 KiB
Python
425 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import base64
|
|
import hashlib
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
|
|
import httpx
|
|
import lxml.etree as ET
|
|
|
|
|
|
class SATWebService():
    """Client for the SAT bulk CFDI download SOAP web services.

    The flow implemented here is: authenticate (done in ``__init__``),
    ``request_download``, ``verify`` and finally ``download``.

    ``cert`` is any object exposing the attributes this class reads:
    ``cer_txt``, ``rfc``, ``issuer``, ``serial_number2`` and the method
    ``sign_sha1(data)``.

    Failures that can be reported without raising (non-200 HTTP status or
    an expected result node missing from the response) are stored in
    ``error`` and the call returns ``None``.
    """

    BASE = 'https://cfdidescargamasivasolicitud.clouda.sat.gob.mx'
    URL = {
        'AUTH': f'{BASE}/Autenticacion/Autenticacion.svc',
        'REQ': f'{BASE}/SolicitaDescargaService.svc',
        'VSD': f'{BASE}/VerificaSolicitudDescargaService.svc',
        'DOWN': 'https://cfdidescargamasiva.clouda.sat.gob.mx/DescargaMasivaService.svc',
    }
    XMLNS = 'http://DescargaMasivaTerceros.gob.mx'
    XMLNS2 = 'http://DescargaMasivaTerceros.sat.gob.mx'
    ACTIONS = {
        'AUTH': f'{XMLNS}/IAutenticacion/Autentica',
        'REQ': f'{XMLNS2}/ISolicitaDescargaService/SolicitaDescarga',
        'VSD': f'{XMLNS2}/IVerificaSolicitudDescargaService/VerificaSolicitudDescarga',
        'DOWN': f'{XMLNS2}/IDescargaMasivaTercerosService/Descargar',
    }
    HEADERS = {
        'Content-type': 'text/xml;charset="utf-8"',
        'Accept': 'text/xml',
        'Cache-Control': 'no-cache',
    }
    NS = {
        's': 'http://schemas.xmlsoap.org/soap/envelope/',
        'u': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd',
        'o': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd',
        'des': 'http://DescargaMasivaTerceros.sat.gob.mx',
        'xd': 'http://www.w3.org/2000/09/xmldsig#',
    }
    NS_RESULT = {'s': NS['s'], None: XMLNS}
    NS_RESULT2 = {'s': NS['s'], None: XMLNS2}

    # Algorithm / profile identifiers written into the signature elements.
    _ALG_C14N = 'http://www.w3.org/2001/10/xml-exc-c14n#'
    _ALG_RSA_SHA1 = 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'
    _ALG_SHA1 = 'http://www.w3.org/2000/09/xmldsig#sha1'
    _X509_VALUE_TYPE = 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3'
    _BASE64_ENCODING = 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary'

    def __init__(self, cert):
        """Store ``cert`` and immediately request an authentication token."""
        self._cert = cert
        self._error = ''
        self._token = self._get_token()

    @property
    def is_authenticate(self):
        """``True`` when a non-empty token was obtained."""
        return bool(self._token)

    @property
    def error(self):
        """Text of the last recorded error ('' if none was recorded)."""
        return self._error

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _tag(self, prefix, name):
        """Return ``name`` in Clark notation for the namespace ``prefix``."""
        return f"{{{self.NS[prefix]}}}{name}"

    @staticmethod
    def _c14n(node):
        """Return the exclusive-C14N serialization of ``node`` as bytes."""
        return ET.tostring(node, method='c14n', exclusive=True)

    @classmethod
    def _digest(cls, node):
        """Return the base64 SHA-1 digest of the canonicalized ``node``."""
        return base64.b64encode(hashlib.new('sha1', cls._c14n(node)).digest())

    def _fill_signed_info(self, signed_info):
        """Add the fixed SignedInfo children and return the Reference node.

        The caller is responsible for appending ``DigestValue`` to the
        returned node.
        """
        c14n = {'Algorithm': self._ALG_C14N}
        ET.SubElement(signed_info, 'CanonicalizationMethod', c14n)
        ET.SubElement(signed_info, 'SignatureMethod', {'Algorithm': self._ALG_RSA_SHA1})
        reference = ET.SubElement(signed_info, 'Reference', {'URI': '#_0'})
        transforms = ET.SubElement(reference, 'Transforms')
        ET.SubElement(transforms, 'Transform', c14n)
        ET.SubElement(reference, 'DigestMethod', {'Algorithm': self._ALG_SHA1})
        return reference

    def _new_envelope(self, operation_name):
        """Build Envelope/Header/Body plus the ``des:`` operation element.

        Returns ``(root, operation)``.
        """
        nsmap = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']}
        root = ET.Element(self._tag('s', 'Envelope'), nsmap=nsmap)
        ET.SubElement(root, self._tag('s', 'Header'))
        body = ET.SubElement(root, self._tag('s', 'Body'))
        operation = ET.SubElement(body, self._tag('des', operation_name))
        return root, operation

    def _sign_request(self, operation, request):
        """Append the signature block to ``request``.

        The statement order matters: the whole (still empty) signature
        skeleton is created first, then the digest of ``operation`` is
        computed and stored, and only then ``SignedInfo`` is canonicalized
        and signed, so that the signed bytes include the digest value.
        """
        nsmap = {None: self.NS['xd']}
        signature = ET.SubElement(request, 'Signature', nsmap=nsmap)
        signed_info = ET.SubElement(signature, 'SignedInfo', nsmap=nsmap)
        reference = self._fill_signed_info(signed_info)
        digest_value = ET.SubElement(reference, 'DigestValue')
        signature_value = ET.SubElement(signature, 'SignatureValue')

        key_info = ET.SubElement(signature, 'KeyInfo')
        x_data = ET.SubElement(key_info, 'X509Data')
        issuer_serial = ET.SubElement(x_data, 'X509IssuerSerial')
        issuer_name = ET.SubElement(issuer_serial, 'X509IssuerName')
        serial_number = ET.SubElement(issuer_serial, 'X509SerialNumber')
        certificate = ET.SubElement(x_data, 'X509Certificate')

        digest_value.text = self._digest(operation)
        # The signature value is always computed over SignedInfo.
        signature_value.text = self._cert.sign_sha1(self._c14n(signed_info))

        issuer_name.text = self._cert.issuer
        serial_number.text = str(self._cert.serial_number2)
        certificate.text = self._cert.cer_txt

    def _post(self, service, data):
        """POST ``data`` to ``service`` ('AUTH', 'REQ', 'VSD' or 'DOWN').

        Returns the response, or ``None`` (after recording ``error``) when
        the status code is not 200.
        """
        headers = self.HEADERS.copy()
        headers['SOAPAction'] = self.ACTIONS[service]
        if service != 'AUTH':
            headers['Authorization'] = f'WRAP access_token="{self._token}"'

        # ``content=`` is httpx's parameter for a raw bytes body.
        response = httpx.post(self.URL[service], content=data, headers=headers)
        if response.status_code != httpx.codes.OK:
            self._error = f'Status: {response.status_code} - {response.text}'
            return None
        return response

    @staticmethod
    def _parse(response, huge_tree=False):
        """Parse the response body; bytes are used so that an XML encoding
        declaration in the payload cannot make lxml reject the input."""
        parser = ET.XMLParser(huge_tree=True) if huge_tree else None
        return ET.fromstring(response.content, parser=parser)

    def _find(self, root, path, namespaces):
        """Return the node at ``path`` or record an error and return None."""
        node = root.find(path, namespaces=namespaces)
        if node is None:
            self._error = f'Node not found in response: {path}'
        return node

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    def _get_data_auth(self):
        """Build the signed WS-Security envelope for ``Autentica`` (bytes).

        The timestamp is valid for 300 seconds starting at the current
        UTC time.
        """
        fmt = '%Y-%m-%dT%H:%M:%S.%fZ'
        token_id = str(uuid.uuid4())

        now = datetime.utcnow()
        date_created = now.strftime(fmt)
        date_expires = (now + timedelta(seconds=300)).strftime(fmt)

        nsmap = {'s': self.NS['s'], 'u': self.NS['u']}
        root = ET.Element(self._tag('s', 'Envelope'), nsmap=nsmap)
        header = ET.SubElement(root, self._tag('s', 'Header'))

        security = ET.SubElement(
            header,
            self._tag('o', 'Security'),
            {self._tag('s', 'mustUnderstand'): '1'},
            nsmap={'o': self.NS['o']},
        )

        # The timestamp carries Id "_0", the target of the Reference URI.
        timestamp = ET.SubElement(
            security, self._tag('u', 'Timestamp'), {self._tag('u', 'Id'): '_0'})
        ET.SubElement(timestamp, self._tag('u', 'Created')).text = date_created
        ET.SubElement(timestamp, self._tag('u', 'Expires')).text = date_expires

        attr = {
            self._tag('u', 'Id'): token_id,
            'ValueType': self._X509_VALUE_TYPE,
            'EncodingType': self._BASE64_ENCODING,
        }
        ET.SubElement(
            security, self._tag('o', 'BinarySecurityToken'), attr
        ).text = self._cert.cer_txt

        signature = ET.SubElement(
            security, 'Signature', nsmap={None: self.NS['xd']})
        signed_info = ET.SubElement(signature, 'SignedInfo')
        reference = self._fill_signed_info(signed_info)
        ET.SubElement(reference, 'DigestValue').text = self._digest(timestamp)

        ET.SubElement(signature, 'SignatureValue').text = self._cert.sign_sha1(
            self._c14n(signed_info))

        key_info = ET.SubElement(signature, 'KeyInfo')
        token_reference = ET.SubElement(
            key_info, self._tag('o', 'SecurityTokenReference'))
        attr = {
            'ValueType': self._X509_VALUE_TYPE,
            'URI': f'#{token_id}',
        }
        ET.SubElement(token_reference, self._tag('o', 'Reference'), attr)

        body = ET.SubElement(root, self._tag('s', 'Body'))
        ET.SubElement(body, 'Autentica', nsmap=self.NS_RESULT)

        return ET.tostring(root)

    def _get_token(self):
        """Call the authentication service and return the token text.

        Returns ``None`` (with ``error`` set) on failure.
        """
        response = self._post('AUTH', self._get_data_auth())
        if response is None:
            return None

        node = self._find(
            self._parse(response),
            's:Body/AutenticaResponse/AutenticaResult',
            self.NS_RESULT,
        )
        if node is None:
            return None
        return node.text

    # ------------------------------------------------------------------
    # Download request
    # ------------------------------------------------------------------

    def _get_data_req(self, args):
        """Build the signed ``SolicitaDescarga`` envelope (bytes).

        ``args`` keys read here: ``date_start`` and ``date_end`` (objects
        with ``strftime``) and ``rfc``, which is used as the *name* of the
        attribute that receives the certificate RFC (presumably
        'RfcEmisor' or 'RfcReceptor' -- confirm against the caller).
        """
        fmt = '%Y-%m-%dT%H:%M:%S'
        date_start = args['date_start']
        date_end = args['date_end']

        root, operation = self._new_envelope('SolicitaDescarga')
        attr = {
            'RfcSolicitante': self._cert.rfc,
            'FechaFinal': date_end.strftime(fmt),
            'FechaInicial': date_start.strftime(fmt),
            'TipoSolicitud': 'CFDI',
            args['rfc']: self._cert.rfc,
        }
        request = ET.SubElement(operation, self._tag('des', 'solicitud'), attr)
        self._sign_request(operation, request)

        return ET.tostring(root)

    def request_download(self, args):
        """Submit a download request.

        Returns a dict with the attributes of ``SolicitaDescargaResult``,
        or ``None`` (with ``error`` set) on failure.
        """
        response = self._post('REQ', self._get_data_req(args))
        if response is None:
            return None

        node = self._find(
            self._parse(response),
            's:Body/SolicitaDescargaResponse/SolicitaDescargaResult',
            self.NS_RESULT2,
        )
        if node is None:
            return None
        return dict(node.attrib)

    # ------------------------------------------------------------------
    # Request verification
    # ------------------------------------------------------------------

    def _get_data_verify(self, args):
        """Build the signed ``VerificaSolicitudDescarga`` envelope (bytes).

        ``args['id']`` is written as the ``IdSolicitud`` attribute.
        """
        root, operation = self._new_envelope('VerificaSolicitudDescarga')
        attr = {
            'IdSolicitud': args['id'],
            'RfcSolicitante': self._cert.rfc,
        }
        request = ET.SubElement(operation, self._tag('des', 'solicitud'), attr)
        self._sign_request(operation, request)

        return ET.tostring(root)

    def verify(self, args):
        """Query the status of a download request.

        Returns a dict with the attributes of
        ``VerificaSolicitudDescargaResult`` plus ``'files'``, the text of
        each child node, or ``None`` (with ``error`` set) on failure.
        """
        response = self._post('VSD', self._get_data_verify(args))
        if response is None:
            return None

        node = self._find(
            self._parse(response),
            's:Body/VerificaSolicitudDescargaResponse/VerificaSolicitudDescargaResult',
            self.NS_RESULT2,
        )
        if node is None:
            return None

        data = dict(node.attrib)
        data['files'] = [child.text for child in node]
        return data

    # ------------------------------------------------------------------
    # Package download
    # ------------------------------------------------------------------

    def _get_data_download(self, args):
        """Build the signed ``PeticionDescargaMasivaTercerosEntrada``
        envelope (bytes).

        ``args['id_file']`` is written as the ``IdPaquete`` attribute.
        """
        root, operation = self._new_envelope(
            'PeticionDescargaMasivaTercerosEntrada')
        attr = {
            'IdPaquete': args['id_file'],
            'RfcSolicitante': self._cert.rfc,
        }
        request = ET.SubElement(
            operation, self._tag('des', 'peticionDescarga'), attr)
        self._sign_request(operation, request)

        return ET.tostring(root)

    def download(self, args):
        """Download one package.

        Returns ``(data, archivo)`` where ``data`` is a dict with the
        attributes of the ``respuesta`` header node and ``archivo`` is the
        base64-decoded ``Paquete`` content, or ``None`` when the package
        node is empty or absent. Returns a bare ``None`` (with ``error``
        set) when the HTTP call fails or the header node is missing.
        """
        response = self._post('DOWN', self._get_data_download(args))
        if response is None:
            return None

        # Packages can be large; allow lxml to parse very big text nodes.
        result = self._parse(response, huge_tree=True)

        namespaces = self.NS_RESULT2.copy()
        namespaces['h'] = 'http://DescargaMasivaTerceros.sat.gob.mx'
        respuesta = self._find(result, 's:Header/h:respuesta', namespaces)
        if respuesta is None:
            return None
        data = dict(respuesta.attrib)

        node = result.find(
            's:Body/RespuestaDescargaMasivaTercerosSalida/Paquete',
            namespaces=self.NS_RESULT2,
        )
        archivo = None
        if node is not None and node.text is not None:
            archivo = base64.b64decode(node.text)

        return data, archivo
|
|
|