cfdi-descarga/source/sat/sat_web.py

476 lines
19 KiB
Python

#!/usr/bin/env python3
import base64
import hashlib
import uuid
from datetime import datetime, timedelta
import httpx
import lxml.etree as ET
from conf import TIMEOUT
class SATWebService():
BASE = 'https://cfdidescargamasivasolicitud.clouda.sat.gob.mx'
URL = {
'AUTH': f'{BASE}/Autenticacion/Autenticacion.svc',
'REQ': f'{BASE}/SolicitaDescargaService.svc',
'VSD': f'{BASE}/VerificaSolicitudDescargaService.svc',
'DOWN': 'https://cfdidescargamasiva.clouda.sat.gob.mx/DescargaMasivaService.svc',
}
XMLNS = 'http://DescargaMasivaTerceros.gob.mx'
XMLNS2 = 'http://DescargaMasivaTerceros.sat.gob.mx'
ACTIONS = {
'AUTH': f'{XMLNS}/IAutenticacion/Autentica',
'REQ': f'{XMLNS2}/ISolicitaDescargaService/SolicitaDescarga',
'VSD': f'{XMLNS2}/IVerificaSolicitudDescargaService/VerificaSolicitudDescarga',
'DOWN': f'{XMLNS2}/IDescargaMasivaTercerosService/Descargar',
}
HEADERS = {
'Content-type': 'text/xml;charset="utf-8"',
'Accept': 'text/xml',
'Cache-Control': 'no-cache',
'Expect': '100-continue',
'Accept-Encoding': 'gzip, deflate',
}
NS = {
's': 'http://schemas.xmlsoap.org/soap/envelope/',
'u': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd',
'o': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd',
'des': 'http://DescargaMasivaTerceros.sat.gob.mx',
'xd': 'http://www.w3.org/2000/09/xmldsig#',
}
NS_RESULT = {'s': NS['s'], None: XMLNS}
NS_RESULT2 = {'s': NS['s'], None: XMLNS2}
def __init__(self, cert):
self._cert = cert
self._error = ''
self._token = self._get_token()
@property
def is_authenticate(self):
return bool(self._token)
@property
def error(self):
return self._error
def _get_data_auth(self):
NSMAP = {'s': self.NS['s'], 'u': self.NS['u']}
FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
UID = str(uuid.uuid4())
now = datetime.utcnow()
date_created = now.strftime(FORMAT)
date_expires = (now + timedelta(seconds=300)).strftime(FORMAT)
node_name = f"{{{self.NS['s']}}}Envelope"
root = ET.Element(node_name, nsmap=NSMAP)
node_name = f"{{{self.NS['s']}}}Header"
header = ET.SubElement(root, node_name)
node_name = f"{{{self.NS['o']}}}Security"
nsmap = {'o': self.NS['o']}
attr_name = f"{{{self.NS['s']}}}mustUnderstand"
attr = {attr_name: '1'}
security = ET.SubElement(header, node_name, attr, nsmap=nsmap)
node_name = f"{{{self.NS['u']}}}Timestamp"
attr_name = f"{{{self.NS['u']}}}Id"
attr = {attr_name: '_0'}
timestamp = ET.SubElement(security, node_name, attr)
node_name = f"{{{self.NS['u']}}}Created"
ET.SubElement(timestamp, node_name).text = date_created
node_name = f"{{{self.NS['u']}}}Expires"
ET.SubElement(timestamp, node_name).text = date_expires
node_name = f"{{{self.NS['o']}}}BinarySecurityToken"
attr = {
f"{{{self.NS['u']}}}Id": UID,
'ValueType': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3',
'EncodingType': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary'
}
ET.SubElement(security, node_name, attr).text = self._cert.cer_txt
nsmap = {None: 'http://www.w3.org/2000/09/xmldsig#'}
signature = ET.SubElement(security, 'Signature', nsmap=nsmap)
signedinfo = ET.SubElement(signature, 'SignedInfo')
attr1 = {'Algorithm': 'http://www.w3.org/2001/10/xml-exc-c14n#'}
ET.SubElement(signedinfo, 'CanonicalizationMethod', attr1)
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'}
ET.SubElement(signedinfo, 'SignatureMethod', attr)
attr = {'URI': '#_0'}
reference = ET.SubElement(signedinfo, 'Reference', attr)
transforms = ET.SubElement(reference, 'Transforms')
ET.SubElement(transforms, 'Transform', attr1)
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#sha1'}
ET.SubElement(reference, 'DigestMethod', attr)
dvalue = ET.tostring(timestamp, method='c14n', exclusive=1)
dvalue = base64.b64encode(hashlib.new('sha1', dvalue).digest())
ET.SubElement(reference, 'DigestValue').text = dvalue
signature_value = ET.tostring(signedinfo, method='c14n', exclusive=1)
signature_value = self._cert.sign_sha1(signature_value)
ET.SubElement(signature, 'SignatureValue').text = signature_value
keyinfo = ET.SubElement(signature, 'KeyInfo')
node_name = f"{{{self.NS['o']}}}SecurityTokenReference"
security_token = ET.SubElement(keyinfo, node_name)
node_name = f"{{{self.NS['o']}}}Reference"
attr = {
'ValueType': 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3',
'URI': f'#{UID}',
}
ET.SubElement(security_token, node_name, attr)
node_name = f"{{{self.NS['s']}}}Body"
body = ET.SubElement(root, node_name)
ET.SubElement(body, 'Autentica', nsmap=self.NS_RESULT)
# ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8')
soap = ET.tostring(root)
return soap
def _get_token(self):
headers = self.HEADERS.copy()
headers['SOAPAction'] = self.ACTIONS['AUTH']
data = self._get_data_auth()
response = httpx.post(self.URL['AUTH'], data=data, headers=headers)
if response.status_code != httpx.codes.OK:
self._error = f'Status: {response.status_code} - {response.text}'
return
result = ET.fromstring(response.text)
nsmap = {'s': self.NS['s'], None: self.XMLNS}
node_name = 's:Body/AutenticaResponse/AutenticaResult'
token = result.find(node_name, namespaces=nsmap).text
# ~ print(f'Token: {token}')
return token
def _get_data_req(self, args):
# ~ NSMAP = {'s': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']}
# ~ NSMAP = {'s': self.NS['s'], 'xd': self.NS['xd']}
NSMAP = {'s': self.NS['s']}
FORMAT = '%Y-%m-%dT%H:%M:%S'
date_start = args['date_start']
date_end = args['date_end']
msg = f'Descarga desde: {date_start} hasta: {date_end}'
print(msg)
node_name = f"{{{self.NS['s']}}}Envelope"
root = ET.Element(node_name, nsmap=NSMAP)
node_name = f"{{{self.NS['s']}}}Header"
header = ET.SubElement(root, node_name)
node_name = 'ActivityId'
attr = {'CorrelationId': '806aad0d-ef46-443b-9741-040c8e8e8c7d'}
nsmap = {None: 'http://schemas.microsoft.com/2004/09/ServiceModel/Diagnostics'}
activity = ET.SubElement(header, node_name, attr, nsmap=nsmap)
activity.text = 'e906cfb4-f706-43de-94d0-5cc935be1aaa'
node_name = f"{{{self.NS['s']}}}Body"
nsmap = {
'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
'xsd': 'http://www.w3.org/2001/XMLSchema',
}
body = ET.SubElement(root, node_name, nsmap=nsmap)
# ~ node_name = f"{{{self.NS['des']}}}SolicitaDescarga"
node_name = "SolicitaDescarga"
nsmap = {None: 'http://DescargaMasivaTerceros.sat.gob.mx'}
request_down = ET.SubElement(body, node_name, nsmap=nsmap)
# ~ node_name = f"{{{self.NS['des']}}}solicitud"
node_name = "solicitud"
type_request = 'CFDI'
if args['metadata']:
type_request = 'Metadata'
attr = {
'FechaInicial': date_start.strftime(FORMAT),
'FechaFinal': date_end.strftime(FORMAT),
'RfcSolicitante': self._cert.rfc,
'TipoSolicitud': type_request,
# ~ 'RfcACuentaTerceros': '',
}
if args['rfc'] == 'RfcEmisor':
attr['RfcEmisor'] = self._cert.rfc
request = ET.SubElement(request_down, node_name, attr)
node_name = 'RfcReceptores'
node_receptores = ET.SubElement(request, node_name)
node_name = 'RfcReceptor'
node_receptor = ET.SubElement(node_receptores, node_name)
if args['rfc'] == 'RfcReceptor':
node_receptor.text = self._cert.rfc
nsmap = {None: self.NS['xd']}
signature = ET.SubElement(request, 'Signature', nsmap=nsmap)
signed_info = ET.SubElement(signature, 'SignedInfo', nsmap=nsmap)
node_name = 'CanonicalizationMethod'
attr1 = {'Algorithm': 'http://www.w3.org/2001/10/xml-exc-c14n#'}
canonicalization = ET.SubElement(signed_info, node_name, attr1)
node_name = 'SignatureMethod'
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'}
signature_method = ET.SubElement(signed_info, node_name, attr)
# ~ attr = {'URI': '#_0'}
attr = {'URI': ''}
reference = ET.SubElement(signed_info, 'Reference', attr)
transforms = ET.SubElement(reference, 'Transforms')
ET.SubElement(transforms, 'Transform', attr1)
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#sha1'}
ET.SubElement(reference, 'DigestMethod', attr)
digest_value = ET.SubElement(reference, 'DigestValue')
signature_value = ET.SubElement(signature, 'SignatureValue')
key_info = ET.SubElement(signature, 'KeyInfo')
x_data = ET.SubElement(key_info, 'X509Data')
x_issuer_serial = ET.SubElement(x_data, 'X509IssuerSerial')
x_issuer = ET.SubElement(x_issuer_serial, 'X509IssuerName')
x_serial_number = ET.SubElement(x_issuer_serial, 'X509SerialNumber')
x_cert = ET.SubElement(x_data, 'X509Certificate')
dvalue = ET.tostring(request_down, method='c14n', exclusive=1)
dvalue = base64.b64encode(hashlib.new('sha1', dvalue).digest())
digest_value.text = dvalue
sign = ET.tostring(signed_info, method='c14n', exclusive=1)
sign = self._cert.sign_sha1(sign)
signature_value.text = sign
x_issuer.text = self._cert.issuer
x_serial_number.text = str(self._cert.serial_number2)
x_cert.text = self._cert.cer_txt
# ~ soap = b'<?xml version="1.0"?>\n' + ET.tostring(root, pretty_print=True, encoding='utf-8')
soap = ET.tostring(root, pretty_print=True)
# ~ print(soap.decode())
return soap
def request_download(self, args):
headers = self.HEADERS.copy()
headers['SOAPAction'] = self.ACTIONS['REQ']
headers['Host'] = 'srvsolicituddescargamaster.cloudapp.net'
headers['Authorization'] = f'WRAP access_token="{self._token}"'
data = self._get_data_req(args)
try:
response = httpx.post(self.URL['REQ'],
data=data, headers=headers, timeout=TIMEOUT)
except httpx.TimeoutException as exc:
print(exc)
return
if response.status_code != httpx.codes.OK:
self._error = f'Status: {response.status_code} - {response.text}'
return
result = ET.fromstring(response.text)
node_name = 's:Body/SolicitaDescargaResponse/SolicitaDescargaResult'
node = result.find(node_name, namespaces=self.NS_RESULT2)
data = dict(node.attrib)
return data
def _get_data_verify(self, args):
NSMAP = {'soapenv': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']}
node_name = f"{{{self.NS['s']}}}Envelope"
root = ET.Element(node_name, nsmap=NSMAP)
node_name = f"{{{self.NS['s']}}}Header"
body = ET.SubElement(root, node_name)
node_name = f"{{{self.NS['s']}}}Body"
body = ET.SubElement(root, node_name)
node_name = f"{{{self.NS['des']}}}VerificaSolicitudDescarga"
verify_download = ET.SubElement(body, node_name)
node_name = f"{{{self.NS['des']}}}solicitud"
attr = {
'IdSolicitud': args['id'],
'RfcSolicitante': self._cert.rfc,
}
request = ET.SubElement(verify_download, node_name, attr)
nsmap = {None: self.NS['xd']}
signature = ET.SubElement(request, 'Signature', nsmap=nsmap)
signed_info = ET.SubElement(signature, 'SignedInfo', nsmap=nsmap)
node_name = 'CanonicalizationMethod'
attr1 = {'Algorithm': 'http://www.w3.org/2001/10/xml-exc-c14n#'}
canonicalization = ET.SubElement(signed_info, node_name, attr1)
node_name = 'SignatureMethod'
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'}
signature_method = ET.SubElement(signed_info, node_name, attr)
# ~ attr = {'URI': '#_0'}
attr = {'URI': ''}
reference = ET.SubElement(signed_info, 'Reference', attr)
transforms = ET.SubElement(reference, 'Transforms')
ET.SubElement(transforms, 'Transform', attr1)
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#sha1'}
ET.SubElement(reference, 'DigestMethod', attr)
digest_value = ET.SubElement(reference, 'DigestValue')
signature_value = ET.SubElement(signature, 'SignatureValue')
key_info = ET.SubElement(signature, 'KeyInfo')
x_data = ET.SubElement(key_info, 'X509Data')
x_issuer_serial = ET.SubElement(x_data, 'X509IssuerSerial')
x_issuer = ET.SubElement(x_issuer_serial, 'X509IssuerName')
x_serial_number = ET.SubElement(x_issuer_serial, 'X509SerialNumber')
x_cert = ET.SubElement(x_data, 'X509Certificate')
dvalue = ET.tostring(verify_download, method='c14n', exclusive=1)
dvalue = base64.b64encode(hashlib.new('sha1', dvalue).digest())
digest_value.text = dvalue
sign = ET.tostring(signed_info, method='c14n', exclusive=1)
sign = self._cert.sign_sha1(sign)
signature_value.text = sign
x_issuer.text = self._cert.issuer
x_serial_number.text = str(self._cert.serial_number2)
x_cert.text = self._cert.cer_txt
# ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8')
soap = ET.tostring(root)
return soap
def verify(self, args):
headers = self.HEADERS.copy()
headers['SOAPAction'] = self.ACTIONS['VSD']
headers['Authorization'] = f'WRAP access_token="{self._token}"'
data = self._get_data_verify(args)
response = httpx.post(self.URL['VSD'], data=data, headers=headers)
if response.status_code != httpx.codes.OK:
self._error = f'Status: {response.status_code} - {response.text}'
return
result = ET.fromstring(response.text)
node_name = 's:Body/VerificaSolicitudDescargaResponse/VerificaSolicitudDescargaResult'
node = result.find(node_name, namespaces=self.NS_RESULT2)
data = dict(node.attrib)
data['files'] = [n.text for n in node]
return data
def _get_data_download(self, args):
NSMAP = {'soapenv': self.NS['s'], 'des': self.NS['des'], 'xd': self.NS['xd']}
node_name = f"{{{self.NS['s']}}}Envelope"
root = ET.Element(node_name, nsmap=NSMAP)
node_name = f"{{{self.NS['s']}}}Header"
body = ET.SubElement(root, node_name)
node_name = f"{{{self.NS['s']}}}Body"
body = ET.SubElement(root, node_name)
node_name = f"{{{self.NS['des']}}}PeticionDescargaMasivaTercerosEntrada"
request_download = ET.SubElement(body, node_name)
node_name = f"{{{self.NS['des']}}}peticionDescarga"
attr = {
'IdPaquete': args['id_file'],
'RfcSolicitante': self._cert.rfc,
}
request = ET.SubElement(request_download, node_name, attr)
nsmap = {None: self.NS['xd']}
signature = ET.SubElement(request, 'Signature', nsmap=nsmap)
signed_info = ET.SubElement(signature, 'SignedInfo', nsmap=nsmap)
node_name = 'CanonicalizationMethod'
attr1 = {'Algorithm': 'http://www.w3.org/2001/10/xml-exc-c14n#'}
canonicalization = ET.SubElement(signed_info, node_name, attr1)
node_name = 'SignatureMethod'
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#rsa-sha1'}
signature_method = ET.SubElement(signed_info, node_name, attr)
# ~ attr = {'URI': '#_0'}
attr = {'URI': ''}
reference = ET.SubElement(signed_info, 'Reference', attr)
transforms = ET.SubElement(reference, 'Transforms')
ET.SubElement(transforms, 'Transform', attr1)
attr = {'Algorithm': 'http://www.w3.org/2000/09/xmldsig#sha1'}
ET.SubElement(reference, 'DigestMethod', attr)
digest_value = ET.SubElement(reference, 'DigestValue')
signature_value = ET.SubElement(signature, 'SignatureValue')
key_info = ET.SubElement(signature, 'KeyInfo')
x_data = ET.SubElement(key_info, 'X509Data')
x_issuer_serial = ET.SubElement(x_data, 'X509IssuerSerial')
x_issuer = ET.SubElement(x_issuer_serial, 'X509IssuerName')
x_serial_number = ET.SubElement(x_issuer_serial, 'X509SerialNumber')
x_cert = ET.SubElement(x_data, 'X509Certificate')
dvalue = ET.tostring(request_download, method='c14n', exclusive=1)
dvalue = base64.b64encode(hashlib.new('sha1', dvalue).digest())
digest_value.text = dvalue
sign = ET.tostring(request_download, method='c14n', exclusive=1)
sign = self._cert.sign_sha1(sign)
signature_value.text = sign
x_issuer.text = self._cert.issuer
x_serial_number.text = str(self._cert.serial_number2)
x_cert.text = self._cert.cer_txt
# ~ soap = ET.tostring(root, pretty_print=True, encoding='utf-8')
soap = ET.tostring(root)
return soap
def download(self, args):
headers = self.HEADERS.copy()
headers['SOAPAction'] = self.ACTIONS['DOWN']
headers['Authorization'] = f'WRAP access_token="{self._token}"'
data = self._get_data_download(args)
try:
response = httpx.post(self.URL['DOWN'],
data=data, headers=headers, timeout=TIMEOUT)
except httpx.TimeoutException as exc:
print(exc)
return
if response.status_code != httpx.codes.OK:
self._error = f'Status: {response.status_code} - {response.text}'
return
result = ET.fromstring(response.text, parser=ET.XMLParser(huge_tree=True))
namespaces = self.NS_RESULT2.copy()
namespaces['h'] = 'http://DescargaMasivaTerceros.sat.gob.mx'
respuesta = result.find('s:Header/h:respuesta', namespaces=namespaces)
data = dict(respuesta.attrib)
node_name = 's:Body/RespuestaDescargaMasivaTercerosSalida/Paquete'
node = result.find(node_name, namespaces=self.NS_RESULT2)
archivo = None
if not node.text is None:
archivo = base64.b64decode(node.text)
return data, archivo