From 4ee0aabbffbc2fca971e2494a5ae1812f87eafd1 Mon Sep 17 00:00:00 2001 From: el Mau Date: Fri, 20 Jan 2023 23:20:34 -0600 Subject: [PATCH] Login in portal SAT --- source/sat/cfdi_cert.py | 8 +- source/sat/sat.py | 169 ++++++++++++++++++++++++++++++++++++++++ source/sat/util.py | 6 +- 3 files changed, 180 insertions(+), 3 deletions(-) create mode 100644 source/sat/sat.py diff --git a/source/sat/cfdi_cert.py b/source/sat/cfdi_cert.py index 3310cea..7b62c0e 100644 --- a/source/sat/cfdi_cert.py +++ b/source/sat/cfdi_cert.py @@ -38,6 +38,7 @@ class SATCertificate(object): self._cer_modulus = 0 self._key_modulus = 0 self._issuer = '' + self._fert = '' return def __str__(self): @@ -62,6 +63,7 @@ class SATCertificate(object): self._not_before = obj.not_valid_before self._not_after = obj.not_valid_after self._issuer = ','.join([i.rfc4514_string() for i in obj.issuer]) + self._fert = self._not_after.strftime('%y%m%d%H%M%SZ') now = datetime.datetime.utcnow() self._is_valid_time = (now > self.not_before) and (now < self.not_after) @@ -123,6 +125,10 @@ class SATCertificate(object): def not_after(self): return self._not_after + @property + def fert(self): + return self._fert + @property def is_fiel(self): return self._is_fiel @@ -215,4 +221,4 @@ class SATCertificate(object): sign = private_key.sign(data.encode(), padding.PKCS1v15(), type_hash) del password del private_key - return base64.b64encode(sign).decode() + return base64.b64encode(sign) diff --git a/source/sat/sat.py b/source/sat/sat.py new file mode 100644 index 0000000..d2b4c35 --- /dev/null +++ b/source/sat/sat.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python + +import asyncio +import base64 +import logging +import ssl + +import aiohttp +import lxml.html + + +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +LOG_DATE = '%d/%m/%Y %H:%M:%S' +logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m') +logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m') +logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m') +logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE) +log = logging.getLogger(__name__) + + +BROWSER = 'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/109.0' +HOST = 'cfdiau.sat.gob.mx' +REFERER = 'https://cfdiau.sat.gob.mx/nidp/app/login?id=SATUPCFDiCon&sid=0&option=credential&sid=0' +URL_MAIN = 'https://portalcfdi.facturaelectronica.sat.gob.mx/' +URL = { + 'LOGIN': 'https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0', + 'CONSULTA': URL_MAIN + 'Consulta.aspx', + 'LOGOUT': URL_MAIN + 'logout.aspx?salir=y', +} + + +ssl_context = ssl.create_default_context() +ssl_context.set_ciphers('HIGH:!DH:!aNULL') + + +async def _get(client, url, headers={}): + async with client.get(url, headers=headers, ssl=ssl_context) as r: + # ~ assert r.status == 200 + return await r.text() + + +async def _post(client, url, data={}, headers={}): + async with client.post(url, data=data, headers=headers, ssl=ssl_context) as r: + # ~ assert r.status == 200 + return await r.text() + + +def _get_headers(host, referer, ajax=False): + headers = { + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', + 'Accept-Encoding': 'gzip, deflate, br', + 'Accept-Language': 'en-US,en;q=0.5', + 'Connection': 'keep-alive', + 'DNT': '1', + 'Host': host, + 'Referer': referer, + 'Upgrade-Insecure-Requests': '1', + 'User-Agent': BROWSER, + 'Content-Type': 'application/x-www-form-urlencoded', + } + if ajax: + headers.update({ + 'Cache-Control': 'no-cache', + 'X-MicrosoftAjax': 'Delta=true', + 'x-requested-with': 'XMLHttpRequest', + 'Pragma': 'no-cache', + }) + return headers + + +def _get_data_form(html): + tree = lxml.html.fromstring(html) + data = {} + for tag in tree.xpath("//input[@type='hidden']"): + if 'name' in tag.attrib and 'value' in tag.attrib: + data[tag.attrib['name']] = tag.attrib['value'] + return data + + +def _get_post_type_search(html, emitidas=False): + tipo_busqueda = 'RdoTipoBusquedaReceptor' + if emitidas: + tipo_busqueda = 'RdoTipoBusquedaEmisor' + sm = 'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$BtnBusqueda' + + post = _get_data_form(html) + post['ctl00$MainContent$TipoBusqueda'] = tipo_busqueda + post['__ASYNCPOST'] = 'true' + post['__EVENTTARGET'] = '' + post['__EVENTARGUMENT'] = '' + post['ctl00$ScriptManager1'] = sm + return post + + +async def _login(client, cert): + msg = 'Init login...' + log.info(msg) + + async with client.get(URL_MAIN, ssl=ssl_context) as response: + url = str(response.url) + + headers = {'Host': 'cfdiau.sat.gob.mx'} + html = await _get(client, url, headers) + + headers = {'User-Agent': BROWSER, 'Referer': url} + html = await _post(client, URL['LOGIN'], headers=headers) + + tree = lxml.html.fromstring(html) + values = {} + for tag in tree.xpath('//input'): + if 'id' in tag.attrib and 'value' in tag.attrib: + values[tag.attrib['id']] = tag.attrib['value'] + + co = f"{values['tokenuuid']}|{cert.rfc}|{cert.serial_number_str}" + firma = base64.b64encode(cert.sign(co)).decode('utf-8') + co = base64.b64encode(co.encode('utf-8')).decode('utf-8') + data = '{}#{}'.format(co, firma).encode('utf-8') + token = base64.b64encode(data).decode('utf-8') + + keys = ('credentialsRequired', 'guid', 'ks', 'urlApplet') + data = {k: values[k] for k in keys} + data['fert'] = cert.fert + data['token'] = token + + headers = _get_headers(HOST, REFERER) + html = await _post(client, URL['LOGIN'], data, headers) + + if not html: + msg = 'Error al identificarse en el SAT' + log.error(msg) + return + + data = _get_data_form(html) + + html = await _post(client, URL_MAIN, data) + data = _get_post_type_search(html) + headers = _get_headers(HOST, URL_MAIN) + + html = await _post(client, URL['CONSULTA'], data, headers) + + msg = '\tLogin Ok...' + log.info(msg) + + return + + +async def _logout(client): + html = await _get(client, URL['LOGOUT']) + msg = 'Logout Ok...' + log.info(msg) + return + + +async def _search(client, data): + print(data) + return + + +async def _main(data, cert): + async with aiohttp.ClientSession() as client: + await _login(client, cert) + await _search(client, data) + await _logout(client) + + +def portal_sat(data, cert): + loop = asyncio.get_event_loop() + loop.run_until_complete(_main(data, cert)) + return diff --git a/source/sat/util.py b/source/sat/util.py index 7b7778d..a8ce691 100644 --- a/source/sat/util.py +++ b/source/sat/util.py @@ -6,6 +6,7 @@ import logging from pathlib import Path from .cfdi_cert import SATCertificate +from .sat import portal_sat LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' @@ -104,15 +105,16 @@ def _validate_arguments(args): if cert is None: return {}, None - data = {'path_dowload': ''} + data = {'ok': True} return data, cert + def download(args): data, cert = _validate_arguments(args) if not data: return - print(data) + portal_sat(data, cert) return