Download for UUID

This commit is contained in:
el Mau 2023-01-17 23:00:39 -06:00
parent c20d979e10
commit 33ee45af63
5 changed files with 594 additions and 6 deletions

3
.gitignore vendored
View File

@ -161,3 +161,6 @@ cython_debug/
#.idea/
portal_sat.bk.py
conf.py

View File

@ -1,3 +1,3 @@
cryptography
falcon
requests
pyOpenSSL

8
source/conf.py.example Normal file
View File

@ -0,0 +1,8 @@
#!/usr/bin/env python
# ~ Path to the directory that holds the FIEL files; inside it, a folder
# ~ named after the RFC in lowercase is looked up.
RUTA_FIEL = ''
# ~ Default base name of the FIEL files
NOMBRE_FIEL = 'fiel'

View File

@ -1,9 +1,313 @@
#!/usr/bin/env python
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
import base64
import datetime
import logging
from html.parser import HTMLParser
from uuid import UUID
from OpenSSL import crypto
import requests
from requests import Session, exceptions, adapters
# Append extra cipher rules to urllib3's default cipher string. The leading
# ':' matters: OpenSSL cipher strings are colon-separated, and without it the
# first added rule is fused with the last rule of the default list.
try:
    requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += ':HIGH:!DH:!aNULL'
except AttributeError:
    # urllib3 2.x no longer exposes DEFAULT_CIPHERS; keep the library
    # defaults instead of failing when this module is imported.
    pass

# Logging setup: coloured level names and day/month/year timestamps.
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE = '%d/%m/%Y %H:%M:%S'
logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m')
logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m')
logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m')
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE)
log = logging.getLogger(__name__)

# Timeout, in seconds, passed to the HTTP requests made by this module.
TIMEOUT = 10
# Whether the TLS certificate of the server is verified.
VERIFY_CERT = True
class FormLoginValues(HTMLParser):
    """Collect ``id`` -> ``value`` for every ``<input>`` tag fed to the parser.

    After ``feed()``, ``values`` maps the ``id`` attribute of each input to
    its ``value`` attribute. Inputs lacking either attribute are skipped.
    """

    def __init__(self):
        super().__init__()
        self.values = {}

    def handle_starttag(self, tag, attrs):
        """Store the value of an ``<input>`` under its id."""
        if tag != 'input':
            return
        attrib = dict(attrs)
        # Explicit membership test: only the "attribute is missing" case is
        # skipped, instead of silencing every possible exception.
        if 'id' in attrib and 'value' in attrib:
            self.values[attrib['id']] = attrib['value']
class FormValues(HTMLParser):
    """Collect ``name`` -> ``value`` for the hidden fields of an HTML form.

    Only ``<input>`` and ``<select>`` tags whose ``type`` attribute is
    ``hidden`` and that carry both ``name`` and ``value`` are recorded in
    ``values``.
    """

    def __init__(self):
        super().__init__()
        self.values = {}

    def handle_starttag(self, tag, attrs):
        """Record a hidden field; every other tag is ignored."""
        if tag not in ('input', 'select'):
            return
        a = dict(attrs)
        # HTMLParser lowercases attribute names but not their values, so the
        # type is compared case-insensitively. ``or ''`` covers a valueless
        # ``type`` attribute, which is reported as None.
        if (a.get('type') or '').lower() != 'hidden':
            return
        if 'name' in a and 'value' in a:
            self.values[a['name']] = a['value']
class Filters(object):
    """Form fields for a portal search by fiscal folio (UUID).

    ``args`` is a dict with the required keys ``uuid``, ``date_from`` and
    ``emitidas``, and the optional keys ``day``, ``type_cfdi`` and
    ``date_to``. ``date_from`` / ``date_to`` are ``datetime`` objects or
    None.
    """

    def __init__(self, args):
        self.uuid = args['uuid']
        self.date_from = args['date_from']
        self.day = args.get('day', False)
        self.emitidas = args['emitidas']
        # Optional end of the date range. When it is missing, ``get_post``
        # uses the end of ``date_from``'s day instead of failing.
        self.date_to = args.get('date_to')
        self.stop = False
        self.hour = False
        self.minute = False
        self.second = False
        self._init_values(args)

    def __str__(self):
        tipo = 'Emitidas' if self.emitidas else 'Recibidas'
        return '{} - {} - {}'.format('Descargar por UUID', self.uuid, tipo)

    def _init_values(self, args):
        """Build the part of the POST payload that does not depend on dates."""
        script_manager = (
            'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$BtnBusqueda'
        )
        self._post = {
            '__ASYNCPOST': 'true',
            '__EVENTTARGET': '',
            '__EVENTARGUMENT': '',
            '__LASTFOCUS': '',
            '__VIEWSTATEENCRYPTED': '',
            'ctl00$ScriptManager1': script_manager,
            'ctl00$MainContent$hfInicialBool': 'false',
            'ctl00$MainContent$BtnBusqueda': 'Buscar CFDI',
            'ctl00$MainContent$TxtUUID': self.uuid,
            'ctl00$MainContent$FiltroCentral': 'RdoFolioFiscal',
            'ctl00$MainContent$TxtRfcReceptor': '',
            'ctl00$MainContent$DdlEstadoComprobante': '-1',
            'ctl00$MainContent$ddlComplementos': args.get('type_cfdi', '-1'),
        }

    def get_post(self):
        """Return the full POST payload, including the date fields.

        The date fields are added to the internal payload dict, and that
        same dict is returned. Without ``date_from`` every date field is
        sent as ``'0'`` (or an empty string for the calendar text fields).
        """
        date_from = self.date_from
        date_to = self.date_to
        if date_from and date_to is None:
            # No explicit end of range: cover up to the last second of the
            # start day rather than dereferencing None.
            date_to = date_from.replace(hour=23, minute=59, second=59)

        if date_from:
            start_hour = str(date_from.hour)
            start_minute = str(date_from.minute)
            start_second = str(date_from.second)
            end_hour = str(date_to.hour)
            end_minute = str(date_to.minute)
            end_second = str(date_to.second)
        else:
            start_hour = start_minute = start_second = '0'
            end_hour = end_minute = end_second = '0'

        if self.emitidas:
            year1 = year2 = '0'
            start = end = ''
            if date_from:
                year1 = str(date_from.year)
                year2 = str(date_to.year)
                start = date_from.strftime('%d/%m/%Y')
                end = date_to.strftime('%d/%m/%Y')
            data = {
                'ctl00$MainContent$hfInicial': year1,
                'ctl00$MainContent$CldFechaInicial2$Calendario_text': start,
                'ctl00$MainContent$CldFechaInicial2$DdlHora': start_hour,
                'ctl00$MainContent$CldFechaInicial2$DdlMinuto': start_minute,
                'ctl00$MainContent$CldFechaInicial2$DdlSegundo': start_second,
                'ctl00$MainContent$hfFinal': year2,
                'ctl00$MainContent$CldFechaFinal2$Calendario_text': end,
                'ctl00$MainContent$CldFechaFinal2$DdlHora': end_hour,
                'ctl00$MainContent$CldFechaFinal2$DdlMinuto': end_minute,
                'ctl00$MainContent$CldFechaFinal2$DdlSegundo': end_second,
            }
        else:
            year = month = '0'
            if date_from:
                year = str(date_from.year)
                month = str(date_from.month)
            # '00' as the day is sent unless a single-day search was asked for.
            day = '00'
            if self.day:
                day = '{:02d}'.format(date_from.day)
            data = {
                'ctl00$MainContent$CldFecha$DdlAnio': year,
                'ctl00$MainContent$CldFecha$DdlMes': month,
                'ctl00$MainContent$CldFecha$DdlDia': day,
                'ctl00$MainContent$CldFecha$DdlHora': start_hour,
                'ctl00$MainContent$CldFecha$DdlMinuto': start_minute,
                'ctl00$MainContent$CldFecha$DdlSegundo': start_second,
                'ctl00$MainContent$CldFecha$DdlHoraFin': end_hour,
                'ctl00$MainContent$CldFecha$DdlMinutoFin': end_minute,
                'ctl00$MainContent$CldFecha$DdlSegundoFin': end_second,
            }
        self._post.update(data)
        return self._post
class Invoice(HTMLParser):
    """Parse the portal's result page into a list of invoices.

    After ``feed()``:

    * ``invoices`` is a list of ``(uuid, info)`` tuples, one per table row
      that contained a valid UUID; ``info`` is a dict with the row's data.
    * ``not_found`` is True when the "no results" panel is displayed inline.
    * ``limit`` is True when the "record limit" panel is present.

    Rows are only read between the ``START_PAGE`` and ``END_PAGE`` divs, and
    cell values are taken from the text of ``<span>`` elements, identified
    by the position of their ``<td>`` within the row.
    """

    START_PAGE = 'ContenedorDinamico'
    # ~ START_PAGE = 'ctl00_MainContent_ContenedorDinamico'
    URL = 'https://portalcfdi.facturaelectronica.sat.gob.mx/'
    END_PAGE = 'ctl00_MainContent_pageNavPosition'
    LIMIT_RECORDS = 'ctl00_MainContent_PnlLimiteRegistros'
    NOT_RECORDS = 'ctl00_MainContent_PnlNoResultados'
    TEMPLATE_DATE = '%Y-%m-%dT%H:%M:%S'

    # Column position (count of <td> seen in the current row) -> attribute
    # that receives the cell text unchanged. Columns 1, 9 and 10 need extra
    # processing and are handled separately in ``handle_data``.
    _TEXT_COLUMNS = {
        2: '_last_emisor_rfc',
        3: '_last_emisor',
        4: '_last_receptor_rfc',
        5: '_last_receptor',
        6: '_last_date_cfdi',
        7: '_last_date_timbre',
        8: '_last_pac',
        12: '_last_status',
        14: '_last_date_cancel',
    }

    def __init__(self):
        super().__init__()
        self._is_div_page = False
        self._current_tag = ''
        self._last_link = ''
        self._reset_row()
        self.invoices = []
        self.not_found = False
        self.limit = False

    def _reset_row(self):
        """Clear the per-row state; a pending download link is kept."""
        self._col = 0
        self._last_link_pdf = ''
        self._last_uuid = ''
        self._last_status = ''
        self._last_date_cfdi = ''
        self._last_date_timbre = ''
        self._last_pac = ''
        self._last_total = ''
        self._last_type = ''
        self._last_date_cancel = ''
        self._last_emisor_rfc = ''
        self._last_emisor = ''
        self._last_receptor_rfc = ''
        self._last_receptor = ''

    def handle_starttag(self, tag, attrs):
        """Track page markers, the column position and the download link."""
        self._current_tag = tag
        if tag == 'div':
            attrib = dict(attrs)
            div_id = attrib.get('id')
            # A panel without a ``style`` attribute is treated as not
            # displayed instead of raising KeyError.
            style = attrib.get('style') or ''
            if div_id == self.NOT_RECORDS and 'inline' in style:
                self.not_found = True
            elif div_id == self.LIMIT_RECORDS:
                self.limit = True
            elif div_id == self.START_PAGE:
                self._is_div_page = True
            elif div_id == self.END_PAGE:
                self._is_div_page = False
        elif self._is_div_page and tag == 'td':
            self._col += 1
        elif tag == 'span':
            attrib = dict(attrs)
            if attrib.get('id', '') == 'BtnDescarga':
                # The relative download URL is the first single-quoted
                # argument of the onclick handler; a button without one
                # leaves the link untouched.
                parts = (attrib.get('onclick') or '').split("'")
                if len(parts) > 1:
                    self._last_link = parts[1]

    def handle_endtag(self, tag):
        """At the end of a table row, store the invoice and reset the state.

        Raises:
            ValueError: if a row with a UUID has a CFDI/stamp/cancel date
                that does not match ``TEMPLATE_DATE`` or a total that is
                not a number.
        """
        if not (self._is_div_page and tag == 'tr'):
            return
        if self._last_uuid:
            self.invoices.append((self._last_uuid, self._build_info()))
        self._reset_row()

    def _build_info(self):
        """Return the info dict for the row currently held in ``_last_*``."""
        url_xml = ''
        if self._last_link:
            url_xml = '{}{}'.format(self.URL, self._last_link)
            # The link is consumed by the row that uses it.
            self._last_link = ''
        url_pdf = ''
        if self._last_link_pdf:
            url_pdf = '{}{}'.format(self.URL, self._last_link_pdf)
        date_cancel = None
        if self._last_date_cancel:
            date_cancel = datetime.datetime.strptime(
                self._last_date_cancel, self.TEMPLATE_DATE)
        return {
            'url': url_xml,
            'acuse': url_pdf,
            'estatus': self._last_status,
            'date_cfdi': datetime.datetime.strptime(
                self._last_date_cfdi, self.TEMPLATE_DATE),
            'date_timbre': datetime.datetime.strptime(
                self._last_date_timbre, self.TEMPLATE_DATE),
            'date_cancel': date_cancel,
            'rfc_pac': self._last_pac,
            'total': float(self._last_total),
            'tipo': self._last_type,
            'emisor': self._last_emisor,
            'rfc_emisor': self._last_emisor_rfc,
            'receptor': self._last_receptor,
            'rfc_receptor': self._last_receptor_rfc,
        }

    def handle_data(self, data):
        """Store the text of a ``<span>`` according to its column."""
        cv = data.strip()
        if not (self._is_div_page and self._current_tag == 'span' and cv):
            return
        col = self._col
        if col == 1:
            # Only text that parses as a UUID is accepted as the folio.
            try:
                UUID(cv)
            except ValueError:
                return
            self._last_uuid = cv
        elif col == 9:
            # Strip the currency symbol and thousands separators.
            self._last_total = cv.replace('$', '').replace(',', '')
        elif col == 10:
            self._last_type = cv.lower()
        elif col in self._TEXT_COLUMNS:
            setattr(self, self._TEXT_COLUMNS[col], cv)
class PortalSAT(object):
URL_MAIN = 'https://portalcfdi.facturaelectronica.sat.gob.mx/'
HOST = 'cfdiau.sat.gob.mx'
@ -20,12 +324,240 @@ class PortalSAT(object):
URL_EMISOR = URL_PORTAL + 'ConsultaEmisor.aspx'
URL_LOGOUT = URL_PORTAL + 'logout.aspx?salir=y'
def __init__(self, cert, cfdi_uuid):
self._cert = cert
self._uuid = cfdi_uuid
def __init__(self):
    """Create a disconnected client with its own HTTP session."""
    self.error = ''
    self.is_connect = False
    # Set to True by ``_response`` on timeouts and connection errors;
    # initialised here so it can be read before any request has failed.
    self.not_network = False
    self._emitidas = False
    self._session = Session()
    # Every https:// request of the session goes through this adapter.
    a = adapters.HTTPAdapter(
        pool_connections=512, pool_maxsize=512, max_retries=5)
    self._session.mount('https://', a)
def _read_form(self, html, form=''):
    """Parse ``html`` and return the form values found in it as a dict.

    ``form='login'`` selects the login-page parser (inputs keyed by id);
    any other value selects the hidden-field parser (keyed by name).
    """
    parser_class = FormLoginValues if form == 'login' else FormValues
    reader = parser_class()
    reader.feed(html)
    return reader.values
def _response(self, url, method='get', headers=None, data=None):
    """Perform a GET or POST with the session and return the body text.

    Args:
        url: address to request.
        method: ``'get'`` issues a GET; any other value issues a POST.
        headers: accepted for interface compatibility.
            NOTE(review): this value is not forwarded to the request; only
            the headers stored on the session are sent. Confirm whether
            that is intended before changing it.
        data: form fields for a POST; ignored for a GET.

    Returns:
        The response text when the status code is 200, otherwise ``''``.
        On a timeout or connection error ``self.error`` and
        ``self.not_network`` are set and ``''`` is returned.
    """
    # None instead of a shared mutable ``{}`` default.
    data = {} if data is None else data
    try:
        if method == 'get':
            result = self._session.get(
                url, timeout=TIMEOUT, verify=VERIFY_CERT)
        else:
            result = self._session.post(
                url, data=data, timeout=TIMEOUT, verify=VERIFY_CERT)
    except exceptions.Timeout:
        msg = 'Tiempo de espera agotado'
        self.not_network = True
        log.error(msg)
        self.error = msg
        return ''
    except exceptions.ConnectionError:
        msg = 'Revisa la conexión a Internet'
        self.not_network = True
        log.error(msg)
        self.error = msg
        return ''

    if result.status_code == 200:
        return result.text
    log.error('{} {} {}'.format(result.status_code, method.upper(), url))
    return ''
def _get_headers(self, host, referer, ajax=False):
user_agent = 'Mozilla/5.0 (X11; Linux x86_64; rv:49.0) Gecko/20100101 Firefox/49.0'
acept = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
headers = {
'Accept': acept,
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
'DNT': '1',
'Host': host,
'Referer': referer,
'Upgrade-Insecure-Requests': '1',
'User-Agent': self.BROWSER,
'Content-Type': 'application/x-www-form-urlencoded',
}
if ajax:
headers.update({
'Cache-Control': 'no-cache',
'X-MicrosoftAjax': 'Delta=true',
'x-requested-with': 'XMLHttpRequest',
'Pragma': 'no-cache',
})
return headers
def _get_post_type_search(self, html):
tipo_busqueda = 'RdoTipoBusquedaReceptor'
if self._emitidas:
tipo_busqueda = 'RdoTipoBusquedaEmisor'
sm = 'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$BtnBusqueda'
post = self._read_form(html)
post['ctl00$MainContent$TipoBusqueda'] = tipo_busqueda
post['__ASYNCPOST'] = 'true'
post['__EVENTTARGET'] = ''
post['__EVENTARGUMENT'] = ''
post['ctl00$ScriptManager1'] = sm
return post
def _get_data_cert(self, cert):
    """Read the certificate file and return ``(rfc, serie, fert)``.

    ``cert['cer']`` is the path of a DER-encoded certificate.

    * ``rfc``: first space-separated token of the subject's
      x500UniqueIdentifier.
    * ``serie``: every second character (odd positions) of the serial
      number written in hexadecimal.
    * ``fert``: the notAfter timestamp without its first two characters.
    """
    with open(cert['cer'], 'rb') as fh:
        der_bytes = fh.read()
    x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, der_bytes)

    unique_id = x509.get_subject().x500UniqueIdentifier
    rfc = unique_id.split(' ')[0]

    serial_hex = '{0:x}'.format(x509.get_serial_number())
    serie = serial_hex[1::2]

    not_after = x509.get_notAfter().decode()
    fert = not_after[2:]
    return rfc, serie, fert
def _sign(self, cert, data):
    """Sign ``data`` with the private key and return the encoded signature.

    Args:
        cert: dict whose ``'key'`` entry is the path of the PEM private key.
        data: text or bytes to sign with SHA-256.

    Returns:
        The signature as text, base64-encoded twice.
        NOTE(review): the double encoding is kept exactly as found; confirm
        against the login endpoint before simplifying it.
    """
    with open(cert['key']) as fh:
        key = crypto.load_privatekey(crypto.FILETYPE_PEM, fh.read())
    # crypto.sign works on bytes; encode text explicitly instead of relying
    # on pyOpenSSL's deprecated implicit text-to-bytes conversion.
    if isinstance(data, str):
        data = data.encode('utf-8')
    sign = base64.b64encode(crypto.sign(key, data, 'sha256'))
    return base64.b64encode(sign).decode('utf-8')
def _get_token(self, firma, co):
co = base64.b64encode(co.encode('utf-8')).decode('utf-8')
data = '{}#{}'.format(co, firma).encode('utf-8')
token = base64.b64encode(data).decode('utf-8')
return token
def _make_data_form(self, cert, values):
rfc, serie, fert = self._get_data_cert(cert)
co = '{}|{}|{}'.format(values['tokenuuid'], rfc, serie)
firma = self._sign(cert, co)
token = self._get_token(firma, co)
keys = ('credentialsRequired', 'guid', 'ks', 'urlApplet')
data = {k: values[k] for k in keys}
data['fert'] = fert
data['token'] = token
data['arc'] = ''
data['placer'] = ''
data['secuence'] = ''
data['seeder'] = ''
data['tan'] = ''
return data
def login(self, cert):
    """Identify on the portal with the FIEL files described by ``cert``.

    Args:
        cert: dict with the paths of the certificate (``'cer'``) and the
            private key (``'key'``).

    Returns:
        True when the login sequence completed (``is_connect`` is then
        True); False otherwise, with ``self.error`` holding a message.
    """
    referer = 'https://cfdiau.sat.gob.mx/nidp/wsfed/ep?id=SATUPCFDiCon&sid=0&option=credential&sid=0'
    url_login = 'https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0'
    msg_error = 'Error al identificarse en el SAT'

    def failed():
        # Keep a more specific message already stored by ``_response``.
        self.error = self.error or msg_error
        log.error(msg_error)
        return False

    # The main page redirects to the identity provider; the address of the
    # last redirect is the next page to visit.
    try:
        result = self._session.get(
            self.URL_MAIN, timeout=TIMEOUT, verify=VERIFY_CERT)
    except exceptions.RequestException:
        return failed()
    if not result.history:
        return failed()
    url_redirect = result.history[-1].headers.get('Location')
    if not url_redirect:
        return failed()

    self._session.headers['Host'] = self.HOST
    self._response(url_redirect)

    self._session.headers['User-Agent'] = self.BROWSER
    self._session.headers['Referer'] = referer
    result = self._response(url_login, 'post')
    values = self._read_form(result, 'login')
    try:
        data = self._make_data_form(cert, values)
    except KeyError:
        # The login form did not contain the expected fields (for example
        # because the previous request returned nothing).
        return failed()

    headers = self._get_headers(self.HOST, self.REFERER)
    self._session.headers.update(headers)
    result = self._response(url_login, 'post', data=data)
    if not result:
        return failed()
    data = self._read_form(result)

    # Inicio
    response = self._response(self.URL_MAIN, 'post', data=data)
    data = self._get_post_type_search(response)
    headers = self._get_headers(self.HOST, self.URL_MAIN)

    # Consulta
    self._response(self.URL_CONSULTA, 'post', headers, data)

    log.info('Se ha identificado en el SAT')
    self.is_connect = True
    return True
def logout(self):
    """Request the logout page and mark the client as disconnected."""
    log.debug('Cerrando sessión en el SAT')
    # The body of the logout page is not needed.
    self._response(self.URL_LOGOUT)
    self.is_connect = False
    log.info('Sesión cerrada en el SAT')
def _get_filters(self, cfdi_uuid, emitidas=True):
    """Return a one-element tuple with the ``Filters`` for a UUID search.

    ``emitidas`` selects issued (True) or received (False) documents; no
    date range, RFC or document type restriction is applied.
    """
    args = {
        'uuid': cfdi_uuid,
        'day': False,
        'emitidas': emitidas,
        'rfc_emisor': '',
        'rfc_receptor': '',
        'type_cfdi': '-1',
        'date_from': None,
    }
    return (Filters(args),)
def _merge(self, list1, list2):
result = list1.copy()
result.update(list2)
return result
def _get_download_links(self, html):
    """Parse a result page and return ``(not_found, limit, invoices)``."""
    page = Invoice()
    page.feed(html)
    return page.not_found, page.limit, page.invoices
def _search_by_uuid(self, filters):
    """Search with the first filter and return the XML of the first hit.

    Args:
        filters: sequence whose first element is a ``Filters`` instance.

    Returns:
        The XML text of the first invoice found, or ``''`` when nothing
        was found, the invoice has no download link, or the download did
        not succeed.
    """
    f = filters[0]
    log.info(str(f))

    url_search = self.URL_EMISOR if f.emitidas else self.URL_RECEPTOR

    result = self._response(url_search, 'get')
    post = self._merge(self._read_form(result), f.get_post())
    headers = self._get_headers(self.PORTAL, url_search)
    html = self._response(url_search, 'post', headers, post)

    not_found, _, invoices = self._get_download_links(html)
    # An empty list (for example after a failed request) is handled like an
    # explicit "no results" page instead of indexing into it.
    if not_found or not invoices:
        msg = f'\n\tNo se encontraron documentos en el filtro:\n\t{str(f)}'
        log.info(msg)
        return ''

    url = invoices[0][1]['url']
    if not url:
        # The row has no download link; there is nothing to fetch.
        return ''
    # Same GET as before, but through the shared helper so timeouts and
    # connection errors are reported instead of propagating.
    return self._response(url)
def get_uuid(self, cfdi_uuid):
    """Look up a CFDI by UUID and return ``{'error': '', 'xml': text}``.

    Received documents are searched first; issued documents are searched
    only when that finds nothing. ``xml`` is ``''`` when neither search
    returns a document.
    """
    log.debug(f'Buscando UUID: {cfdi_uuid}')
    data = {'error': '', 'xml': ''}
    # ~ Received first, then issued
    for emitidas in (False, True):
        filters = self._get_filters(cfdi_uuid, emitidas)
        data['xml'] = self._search_by_uuid(filters)
        if data['xml']:
            break
    return data

View File

@ -1,7 +1,11 @@
#!/usr/bin/env python
from pathlib import Path
from uuid import UUID
from OpenSSL import crypto
from .portal_sat import PortalSAT
from conf import RUTA_FIEL, NOMBRE_FIEL
def validate_uuid(value):
@ -12,11 +16,52 @@ def validate_uuid(value):
return False
def get_uuid(rfc, cfdi):
def validate_fiel(rfc):
    """Locate the FIEL files of ``rfc`` and return ``(cert, error)``.

    The files are expected in ``RUTA_FIEL/<rfc in lowercase>/`` and named
    ``NOMBRE_FIEL`` with the ``.cer`` and ``.pem`` extensions. On success
    ``cert`` holds their paths under ``'key'`` and ``'cer'`` and ``error``
    is ``''``; when a file is missing ``cert`` is empty and ``error`` names
    the missing file.
    """
    folder = Path(RUTA_FIEL) / rfc.lower()
    path_cer = folder / f'{NOMBRE_FIEL}.cer'
    path_pem = folder / f'{NOMBRE_FIEL}.pem'

    # The certificate is checked before the key.
    for candidate in (path_cer, path_pem):
        if not candidate.exists():
            return {}, f'No se encontró el archivo: {candidate}'

    return {'key': str(path_pem), 'cer': str(path_cer)}, ''
def get_uuid(rfc, cfdi_uuid):
    """Download the XML of a CFDI using the FIEL stored for ``rfc``.

    Args:
        rfc: RFC whose FIEL files are used to log in.
        cfdi_uuid: fiscal folio (UUID) of the document.

    Returns:
        A dict with the keys ``'error'`` and ``'xml'``. ``'error'`` is set
        when the UUID is invalid, the FIEL files are missing or the login
        fails.
    """
    # Local import: this module defines no logger of its own.
    import logging

    data = {'error': '', 'xml': ''}

    if not validate_uuid(cfdi_uuid):
        data['error'] = 'UUID inválido'
        return data

    cert, error = validate_fiel(rfc)
    if not cert:
        data['error'] = error
        return data

    sat = PortalSAT()
    try:
        sat.login(cert)
        if not sat.is_connect:
            # Fall back to a generic message when the client did not
            # record a specific one.
            data['error'] = sat.error or 'Error al identificarse en el SAT'
            logging.getLogger(__name__).error(data['error'])
            return data
        return sat.get_uuid(cfdi_uuid)
    finally:
        # The logout page is requested on every path, as before.
        sat.logout()