Login and logout refactorice

This commit is contained in:
El Mau 2024-04-05 22:02:20 -06:00
parent 910cd0bedb
commit f964e4120b
9 changed files with 369 additions and 73 deletions

View File

@ -1,5 +1,10 @@
# Lista de cambios
## v 1.0.0 [00-abr-24]
---
* Se refactoriza la descarga
## v 0.6.0 [27-Mar-22]
---
* Fix - Cambios del SAT

View File

@ -1 +1 @@
0.6.0
1.0.0

View File

@ -1,4 +1,3 @@
httpx
peewee
cryptography==3.4.8
lxml
cryptography==42.0.5
requests
lxml

View File

@ -1,86 +1,93 @@
#!/usr/bin/env python3
import argparse
from sat import util
from util import app
def main(args):
if args.fiel_validar:
util.fiel_validar(args)
if args.descarga_web:
app.download_web(args)
return
if args.solicitar_descarga:
util.solicitar_descarga(args)
return
# ~ if args.fiel_validar:
# ~ util.fiel_validar(args)
# ~ return
if args.verificar_descarga:
util.verificar_descarga(args)
return
# ~ if args.solicitar_descarga:
# ~ util.solicitar_descarga(args)
# ~ return
if args.descargar_archivos:
util.descargar_archivos(args)
return
# ~ if args.verificar_descarga:
# ~ util.verificar_descarga(args)
# ~ return
util.descargar(args)
# ~ if args.descargar_archivos:
# ~ util.descargar_archivos(args)
# ~ return
# ~ util.descargar(args)
return
def _process_command_line_arguments():
now = util.today()
year = now.year
parser = argparse.ArgumentParser(description='CFDI Descarga SAT')
help = 'Valida la FIEL'
parser.add_argument('-fv', '--fiel-validar', help=help,
# ~ help = 'Valida la FIEL'
# ~ parser.add_argument('-fv', '--fiel-validar', help=help,
# ~ action='store_true', default=False, required=False)
# ~ help = "Descargar por Tipo: t=todos(default), e=emitidas, r=recibidas"
# ~ parser.add_argument('-t', '--tipo', help=help,
# ~ dest='type', default='t', choices=['t', 'e', 'r'])
# ~ help = "Año de la descarga entre 2014 y el año actual (predeterminado)."
# ~ parser.add_argument('-a', '--año', help=help,
# ~ dest='year', default=year, type=int, choices=range(2014, year+1))
# ~ help = "Mes de la descarga, el mes actual es el predeterminado"
# ~ parser.add_argument('-m', '--mes', help=help,
# ~ dest='month', default=0, type=int, choices=range(13))
# ~ help = "Día de la descarga, de forma predeterminada no se usa"
# ~ parser.add_argument('-d', '--dia', help=help,
# ~ dest='day', default=0, type=int, choices=range(31))
# ~ help = "Intervalo de días a partir de la fecha actual y hacia a atras"
# ~ parser.add_argument('-ud', '--ultimos-dias', help=help,
# ~ dest='last_days', default=0, type=int, choices=range(30))
# ~ help = "Fecha inicial AAAA-MM-DD"
# ~ parser.add_argument('-fi', '--fecha-inicial', help=help,
# ~ dest='date_start', default='')
# ~ help = "Fecha final AAAA-MM-DD"
# ~ parser.add_argument('-ff', '--fecha-final', help=help,
# ~ dest='date_end', default='')
# ~ help = 'Solicitar descarga'
# ~ parser.add_argument('-sd', '--solicitar-descarga', help=help,
# ~ action='store_true', default=False, required=False)
# ~ help = 'Verificar estatus de descarga'
# ~ parser.add_argument('-vd', '--verificar-descarga', help=help,
# ~ action='store_true', default=False, required=False)
# ~ help = 'Descargar archivos'
# ~ parser.add_argument('-da', '--descargar-archivos', help=help,
# ~ action='store_true', default=False, required=False)
# ~ help = 'ID de solicitud'
# ~ parser.add_argument('-id', '--id-solicitud', dest='id_request',
# ~ help=help, default='')
# ~ help = 'ID archivo'
# ~ parser.add_argument('-ida', '--id-archivo', dest='id_file',
# ~ help=help, default='')
# ~ help = 'Ruta de descarga de archivos'
# ~ parser.add_argument('-dd', '--directorio-descargas', dest='path_download',
# ~ help=help, default='')
help = 'Descargar de la página web'
parser.add_argument('-dw', '--descarga-web', help=help,
action='store_true', default=False, required=False)
help = 'Ruta al directorio con la FIEL'
parser.add_argument('-fd', '--fiel-dir', help=help, default='')
parser.add_argument('-fd', '--fiel-directorio', help=help, default='')
help = "Nombre de los archivos FIEL, el predeterminado es 'fiel'"
parser.add_argument('-fn', '--fiel-nombre', help=help, default='fiel')
help = "Descargar por Tipo: t=todos(default), e=emitidas, r=recibidas"
parser.add_argument('-t', '--tipo', help=help,
dest='type', default='t', choices=['t', 'e', 'r'])
help = "Año de la descarga entre 2014 y el año actual (predeterminado)."
parser.add_argument('-a', '--año', help=help,
dest='year', default=year, type=int, choices=range(2014, year+1))
help = "Mes de la descarga, el mes actual es el predeterminado"
parser.add_argument('-m', '--mes', help=help,
dest='month', default=0, type=int, choices=range(13))
help = "Día de la descarga, de forma predeterminada no se usa"
parser.add_argument('-d', '--dia', help=help,
dest='day', default=0, type=int, choices=range(31))
help = "Intervalo de días a partir de la fecha actual y hacia a atras"
parser.add_argument('-ud', '--ultimos-dias', help=help,
dest='last_days', default=0, type=int, choices=range(30))
help = "Fecha inicial AAAA-MM-DD"
parser.add_argument('-fi', '--fecha-inicial', help=help,
dest='date_start', default='')
help = "Fecha final AAAA-MM-DD"
parser.add_argument('-ff', '--fecha-final', help=help,
dest='date_end', default='')
help = 'Solicitar descarga'
parser.add_argument('-sd', '--solicitar-descarga', help=help,
action='store_true', default=False, required=False)
help = 'Verificar estatus de descarga'
parser.add_argument('-vd', '--verificar-descarga', help=help,
action='store_true', default=False, required=False)
help = 'Descargar archivos'
parser.add_argument('-da', '--descargar-archivos', help=help,
action='store_true', default=False, required=False)
help = 'ID de solicitud'
parser.add_argument('-id', '--id-solicitud', dest='id_request',
help=help, default='')
help = 'ID archivo'
parser.add_argument('-ida', '--id-archivo', dest='id_file',
help=help, default='')
help = 'Ruta de descarga de archivos'
parser.add_argument('-dd', '--directorio-descargas', dest='path_download',
help=help, default='')
help = 'Descargar solo metadatos'
parser.add_argument('-md', '--metadata', help=help,
action='store_true', default=False, required=False)

View File

@ -1,7 +0,0 @@
#!/usr/bin/env python
DEBUG = False
# ~ Este valor se usa para cifrar la FIEL
# ~ Si la cambias en producción, debes de validar de nuevo las FIELs
TOKEN = ''

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3
import logging
from conf import DEBUG
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
@ -12,3 +11,5 @@ logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)
TIMEOUT = 60

71
source/util/app.py Normal file
View File

@ -0,0 +1,71 @@
#!/usr/bin/env python3
from pathlib import Path
from .sat import SATPortal
from .cfdi_cert import CertSign
from settings import log
from time import sleep
def is_dir(path):
return Path(path).is_dir()
def join(*paths):
return Path(paths[0]).joinpath(*paths[1:])
def _validate_fiel_args(args):
fiel_path = args.fiel_directorio
fiel_name = args.fiel_nombre
if not fiel_path:
msg = 'El directorio con la FIEL es requerido'
log.error(msg)
return False, {}
if not is_dir(fiel_path):
msg = f'La ruta no existe o no es un directorio. \nRuta: {fiel_path}'
log.error(msg)
return False, {}
path_cer = join(fiel_path, f'{fiel_name}.cer')
path_key = join(fiel_path, f'{fiel_name}.key')
path_enc = join(fiel_path, f'{fiel_name}.enc')
path_pem = join(fiel_path, f'{fiel_name}.pem')
if not path_cer.is_file():
msg = f'No se encontró el archivo CER. \nRuta: {path_cer}'
log.error(msg)
return False, {}
if not path_key.is_file():
msg = f'No se encontró el archivo KEY. \nRuta: {path_key}'
log.error(msg)
return False, {}
data = {
'cer': path_cer.read_bytes(),
'key': path_key.read_bytes(),
# ~ 'enc': path_enc.read_bytes(),
'pem': path_pem.read_bytes(),
}
return True, data
def download_web(args):
# ~ print(args)
result, files = _validate_fiel_args(args)
if not result:
return
fiel = CertSign(files['pem'])
fiel.get_data_cer(files['cer'])
sat = SATPortal()
if sat.login(fiel):
sleep(3)
sat.logout()
return

1
source/util/cfdi_cert.py Symbolic link
View File

@ -0,0 +1 @@
/home/elmau/Projects/cfdi/cfdi-cert/source/cfdi_cert.py

219
source/util/sat.py Normal file
View File

@ -0,0 +1,219 @@
#!/usr/bin/env python
# ~ import niquests
import base64
import requests
from requests import Session, exceptions
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
from urllib3 import PoolManager
from lxml import etree
from settings import TIMEOUT, log
requests.packages.urllib3.disable_warnings()
class CipherAdapter(HTTPAdapter):
def init_poolmanager(self, connections, maxsize, block=False):
ctx = create_urllib3_context(ciphers=":HIGH:!DH:!aNULL")
self.poolmanager = PoolManager(
num_pools=connections, maxsize=maxsize, block=block, ssl_context=ctx)
class SATPortal(object):
BROWSER = 'Mozilla/5.0 (X11; Linux x86_64; rv:55.0) Gecko/20100101 Firefox/55.0'
HOST = 'cfdiau.sat.gob.mx'
REFERER = 'https://cfdiau.sat.gob.mx/nidp/app/login?id=SATUPCFDiCon&sid=0&option=credential&sid=0'
URL = {
'MAIN': 'https://portalcfdi.facturaelectronica.sat.gob.mx/',
'PORTAL': 'https://portalcfdi.facturaelectronica.sat.gob.mx/',
}
URL['LOGOUT'] = f"{URL['PORTAL']}logout.aspx?salir=y"
URL['CONSULTA'] = f"{URL['PORTAL']}Consulta.aspx"
def __init__(self):
# ~ self._s = niquests.Session()
self._s = Session()
self._s.mount('https://', CipherAdapter())
def _add_headers(self, headers):
for k, v in headers.items():
self._s.headers[k] = v
return
def _get(self, url):
# ~ log.debug('URL: {}'.format(url))
try:
result = self._s.get(url, timeout=TIMEOUT)
if result.status_code != 200:
msg = f'Code: {result.status_code}. URL: {url}'
log.error(msg)
return ''
except exceptions.Timeout:
msg = 'Tiempo de espera agotado'
self.not_network = True
log.error(msg)
self.error = msg
return ''
except exceptions.ConnectionError:
msg = 'Revisa la conexión a Internet'
self.not_network = True
log.error(msg)
self.error = msg
return ''
return result.text
def _post(self, url, data={}):
# ~ log.debug('URL: {}'.format(url))
try:
result = self._s.post(url, data=data, timeout=TIMEOUT)
if result.status_code != 200:
msg = f'Code: {result.status_code}. URL: {url}'
log.error(msg)
return ''
except exceptions.Timeout:
msg = 'Tiempo de espera agotado'
self.not_network = True
log.error(msg)
self.error = msg
return ''
except exceptions.ConnectionError:
msg = 'Revisa la conexión a Internet'
self.not_network = True
log.error(msg)
self.error = msg
return ''
return result.text
def _read_form(self, html, login=False):
values = {}
parser = etree.HTMLParser()
tree = etree.fromstring(html, parser)
if login:
for node in tree.xpath('//input'):
try:
values[node.attrib['id']] = node.attrib['value']
except:
pass
else:
for node in tree.xpath('//input | //select'):
a = node.attrib
if a.get('type', '') and a['type'] == 'hidden':
if 'name' in a and 'value' in a:
values[a['name']] = a['value']
return values
def _get_headers(self, host, referer, ajax=False):
acept = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
headers = {
'Accept': acept,
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
'DNT': '1',
'Host': host,
'Referer': referer,
'Upgrade-Insecure-Requests': '1',
'User-Agent': self.BROWSER,
'Content-Type': 'application/x-www-form-urlencoded',
}
if ajax:
headers.update({
'Cache-Control': 'no-cache',
'X-MicrosoftAjax': 'Delta=true',
'x-requested-with': 'XMLHttpRequest',
'Pragma': 'no-cache',
})
return headers
def _get_token(self, firma, co):
co = base64.b64encode(co.encode('utf-8')).decode('utf-8')
data = f'{co}#{firma}'.encode('utf-8')
token = base64.b64encode(data).decode('utf-8')
return token
def _sign(self, values, fiel):
fert = f'{fiel.cer.not_after:%y%m%d%H%M%SZ}'
co = f"{values['tokenuuid']}|{fiel.cer.rfc}|{fiel.cer.serial_number2}"
firma = base64.b64encode(fiel.sign(co).encode()).decode('utf-8')
token = self._get_token(firma, co)
keys = ('credentialsRequired', 'guid', 'ks', 'urlApplet')
data = {k: values[k] for k in keys}
data['fert'] = fert
data['token'] = token
data['arc'] = ''
data['placer'] = ''
data['secuence'] = ''
data['seeder'] = ''
data['tan'] = ''
return data
def login(self, fiel):
HOST = 'cfdicontribuyentes.accesscontrol.windows.net'
REFERER = 'https://cfdiau.sat.gob.mx/nidp/wsfed/ep?id=SATUPCFDiCon&sid=0&option=credential&sid=0'
URL_LOGIN = 'https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0'
result = self._s.get(self.URL['MAIN'])
url_redirect = result.history[-1].headers['Location']
self._add_headers({'Host': self.HOST})
result = self._get(url_redirect)
headers = {
'User-Agent': self.BROWSER,
'Referer': f'{REFERER}{url_redirect}'}
self._add_headers(headers)
result = self._post(URL_LOGIN)
values = self._read_form(result, True)
data = self._sign(values, fiel)
headers = self._get_headers(self.HOST, self.REFERER)
self._add_headers(headers)
result = self._post(URL_LOGIN, data)
if not result:
msg = 'Error al identificarse en el SAT'
log.error(msg)
return False
data = self._read_form(result)
# Inicio
result = self._post(self.URL['MAIN'], data)
data = self._get_post_type_search(result)
headers = self._get_headers(self.HOST, self.URL['MAIN'])
# Consulta
self._add_headers(headers)
result = self._post(self.URL['CONSULTA'], data)
msg = 'Se ha identificado en el SAT'
log.info(msg)
self.is_connect = True
return True
def _get_post_type_search(self, html, emitidas=False):
tipo_busqueda = 'RdoTipoBusquedaReceptor'
if emitidas:
tipo_busqueda = 'RdoTipoBusquedaEmisor'
sm = 'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$BtnBusqueda'
post = self._read_form(html)
post['ctl00$MainContent$TipoBusqueda'] = tipo_busqueda
post['__ASYNCPOST'] = 'true'
post['__EVENTTARGET'] = ''
post['__EVENTARGUMENT'] = ''
post['ctl00$ScriptManager1'] = sm
return post
def logout(self):
msg = 'Cerrando sessión en el SAT'
log.debug(msg)
result = self._get(self.URL['LOGOUT'])
self.is_connect = False
msg = 'Sesión cerrada en el SAT'
log.info(msg)
return