Login in portal SAT

This commit is contained in:
el Mau 2023-01-20 23:20:34 -06:00
parent da889c9af5
commit 4ee0aabbff
3 changed files with 180 additions and 3 deletions

View File

@ -38,6 +38,7 @@ class SATCertificate(object):
self._cer_modulus = 0
self._key_modulus = 0
self._issuer = ''
self._fert = ''
return
def __str__(self):
@ -62,6 +63,7 @@ class SATCertificate(object):
self._not_before = obj.not_valid_before
self._not_after = obj.not_valid_after
self._issuer = ','.join([i.rfc4514_string() for i in obj.issuer])
self._fert = self._not_after.strftime('%y%m%d%H%M%SZ')
now = datetime.datetime.utcnow()
self._is_valid_time = (now > self.not_before) and (now < self.not_after)
@ -123,6 +125,10 @@ class SATCertificate(object):
def not_after(self):
return self._not_after
@property
def fert(self):
return self._fert
@property
def is_fiel(self):
return self._is_fiel
@ -215,4 +221,4 @@ class SATCertificate(object):
sign = private_key.sign(data.encode(), padding.PKCS1v15(), type_hash)
del password
del private_key
return base64.b64encode(sign).decode()
return base64.b64encode(sign)

169
source/sat/sat.py Normal file
View File

@ -0,0 +1,169 @@
#!/usr/bin/env python
import asyncio
import base64
import logging
import ssl
import aiohttp
import lxml.html
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
BROWSER = 'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/109.0'
HOST = 'cfdiau.sat.gob.mx'
REFERER = 'https://cfdiau.sat.gob.mx/nidp/app/login?id=SATUPCFDiCon&sid=0&option=credential&sid=0'
URL_MAIN = 'https://portalcfdi.facturaelectronica.sat.gob.mx/'
URL = {
'LOGIN': 'https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0',
'CONSULTA': URL_MAIN + 'Consulta.aspx',
'LOGOUT': URL_MAIN + 'logout.aspx?salir=y',
}
ssl_context = ssl.create_default_context()
ssl_context.set_ciphers('HIGH:!DH:!aNULL')
async def _get(client, url, headers={}):
async with client.get(url, headers=headers, ssl=ssl_context) as r:
# ~ assert r.status == 200
return await r.text()
async def _post(client, url, data={}, headers={}):
async with client.post(url, data=data, headers=headers, ssl=ssl_context) as r:
# ~ assert r.status == 200
return await r.text()
def _get_headers(host, referer, ajax=False):
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
'DNT': '1',
'Host': host,
'Referer': referer,
'Upgrade-Insecure-Requests': '1',
'User-Agent': BROWSER,
'Content-Type': 'application/x-www-form-urlencoded',
}
if ajax:
headers.update({
'Cache-Control': 'no-cache',
'X-MicrosoftAjax': 'Delta=true',
'x-requested-with': 'XMLHttpRequest',
'Pragma': 'no-cache',
})
return headers
def _get_data_form(html):
tree = lxml.html.fromstring(html)
data = {}
for tag in tree.xpath("//input[@type='hidden']"):
if 'name' in tag.attrib and 'value' in tag.attrib:
data[tag.attrib['name']] = tag.attrib['value']
return data
def _get_post_type_search(html, emitidas=False):
tipo_busqueda = 'RdoTipoBusquedaReceptor'
if emitidas:
tipo_busqueda = 'RdoTipoBusquedaEmisor'
sm = 'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$BtnBusqueda'
post = _get_data_form(html)
post['ctl00$MainContent$TipoBusqueda'] = tipo_busqueda
post['__ASYNCPOST'] = 'true'
post['__EVENTTARGET'] = ''
post['__EVENTARGUMENT'] = ''
post['ctl00$ScriptManager1'] = sm
return post
async def _login(client, cert):
msg = 'Init login...'
log.info(msg)
async with client.get(URL_MAIN, ssl=ssl_context) as response:
url = str(response.url)
headers = {'Host': 'cfdiau.sat.gob.mx'}
html = await _get(client, url, headers)
headers = {'User-Agent': BROWSER, 'Referer': url}
html = await _post(client, URL['LOGIN'], headers=headers)
tree = lxml.html.fromstring(html)
values = {}
for tag in tree.xpath('//input'):
if 'id' in tag.attrib and 'value' in tag.attrib:
values[tag.attrib['id']] = tag.attrib['value']
co = f"{values['tokenuuid']}|{cert.rfc}|{cert.serial_number_str}"
firma = base64.b64encode(cert.sign(co)).decode('utf-8')
co = base64.b64encode(co.encode('utf-8')).decode('utf-8')
data = '{}#{}'.format(co, firma).encode('utf-8')
token = base64.b64encode(data).decode('utf-8')
keys = ('credentialsRequired', 'guid', 'ks', 'urlApplet')
data = {k: values[k] for k in keys}
data['fert'] = cert.fert
data['token'] = token
headers = _get_headers(HOST, REFERER)
html = await _post(client, URL['LOGIN'], data, headers)
if not html:
msg = 'Error al identificarse en el SAT'
log.error(msg)
return
data = _get_data_form(html)
html = await _post(client, URL_MAIN, data)
data = _get_post_type_search(html)
headers = _get_headers(HOST, URL_MAIN)
html = await _post(client, URL['CONSULTA'], data, headers)
msg = '\tLogin Ok...'
log.info(msg)
return
async def _logout(client):
html = await _get(client, URL['LOGOUT'])
msg = 'Logout Ok...'
log.info(msg)
return
async def _search(client, data):
print(data)
return
async def _main(data, cert):
async with aiohttp.ClientSession() as client:
await _login(client, cert)
await _search(client, data)
await _logout(client)
def portal_sat(data, cert):
loop = asyncio.get_event_loop()
loop.run_until_complete(_main(data, cert))
return

View File

@ -6,6 +6,7 @@ import logging
from pathlib import Path
from .cfdi_cert import SATCertificate
from .sat import portal_sat
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
@ -104,15 +105,16 @@ def _validate_arguments(args):
if cert is None:
return {}, None
data = {'path_dowload': ''}
data = {'ok': True}
return data, cert
def download(args):
data, cert = _validate_arguments(args)
if not data:
return
print(data)
portal_sat(data, cert)
return