diff --git a/.gitignore b/.gitignore index b8c2fe7..ac7ddec 100644 --- a/.gitignore +++ b/.gitignore @@ -161,3 +161,6 @@ cython_debug/ #.idea/ portal_sat.bk.py +conf.py + + diff --git a/requirements.txt b/requirements.txt index e0ca368..b698e0f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -cryptography falcon requests +pyOpenSSL diff --git a/source/conf.py.example b/source/conf.py.example new file mode 100644 index 0000000..0d6c94a --- /dev/null +++ b/source/conf.py.example @@ -0,0 +1,8 @@ +#!/usr/bin/env python + +# ~ Ruta al directorio con las FIEL, dentro de este, se busca +# ~ la carpeta por RFC en minúsculas. +RUTA_FIEL = '' + +# ~ Nombre predeterminado de los archivos FIEL +NOMBRE_FIEL = 'fiel' diff --git a/source/sat/portal_sat.py b/source/sat/portal_sat.py index 51a4e07..9be98af 100644 --- a/source/sat/portal_sat.py +++ b/source/sat/portal_sat.py @@ -1,9 +1,313 @@ +#!/usr/bin/env python +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 3, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details.
+import base64 +import datetime +import logging + +from html.parser import HTMLParser +from uuid import UUID + +from OpenSSL import crypto import requests from requests import Session, exceptions, adapters requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += 'HIGH:!DH:!aNULL' +LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' +LOG_DATE = '%d/%m/%Y %H:%M:%S' +logging.addLevelName(logging.ERROR, '\033[1;41mERROR\033[1;0m') +logging.addLevelName(logging.DEBUG, '\x1b[33mDEBUG\033[1;0m') +logging.addLevelName(logging.INFO, '\x1b[32mINFO\033[1;0m') +logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=LOG_DATE) +log = logging.getLogger(__name__) + + +TIMEOUT = 10 +VERIFY_CERT = True + + +class FormLoginValues(HTMLParser): + + def __init__(self): + super().__init__() + self.values = {} + + def handle_starttag(self, tag, attrs): + if tag == 'input': + attrib = dict(attrs) + try: + self.values[attrib['id']] = attrib['value'] + except: + pass + + +class FormValues(HTMLParser): + + def __init__(self): + super().__init__() + self.values = {} + + def handle_starttag(self, tag, attrs): + if tag in ('input', 'select'): + a = dict(attrs) + if a.get('type', '') and a['type'] == 'hidden': + if 'name' in a and 'value' in a: + self.values[a['name']] = a['value'] + +class Filters(object): + + def __init__(self, args): + self.uuid = args['uuid'] + + self.date_from = args['date_from'] + self.day = args.get('day', False) + self.emitidas = args['emitidas'] + self.date_to = None + self.stop = False + self.hour = False + self.minute = False + self.second = False + self._init_values(args) + + def __str__(self): + msg = 'Descargar por UUID' + tipo = 'Recibidas' + if self.emitidas: + tipo = 'Emitidas' + return '{} - {} - {}'.format(msg, self.uuid, tipo) + + def _init_values(self, args): + #~ print ('ARGS', args) + status = '-1' + type_cfdi = args.get('type_cfdi', '-1') + center_filter = 'RdoFolioFiscal' + rfc_receptor = '' + + script_manager = 
'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$BtnBusqueda' + self._post = { + '__ASYNCPOST': 'true', + '__EVENTTARGET': '', + '__EVENTARGUMENT': '', + '__LASTFOCUS': '', + '__VIEWSTATEENCRYPTED': '', + 'ctl00$ScriptManager1': script_manager, + 'ctl00$MainContent$hfInicialBool': 'false', + 'ctl00$MainContent$BtnBusqueda': 'Buscar CFDI', + 'ctl00$MainContent$TxtUUID': self.uuid, + 'ctl00$MainContent$FiltroCentral': center_filter, + 'ctl00$MainContent$TxtRfcReceptor': rfc_receptor, + 'ctl00$MainContent$DdlEstadoComprobante': status, + 'ctl00$MainContent$ddlComplementos': type_cfdi, + } + return + + def get_post(self): + start_hour = '0' + start_minute = '0' + start_second = '0' + end_hour = '0' + end_minute = '0' + end_second = '0' + + if self.date_from: + start_hour = str(self.date_from.hour) + start_minute = str(self.date_from.minute) + start_second = str(self.date_from.second) + end_hour = str(self.date_to.hour) + end_minute = str(self.date_to.minute) + end_second = str(self.date_to.second) + + if self.emitidas: + year1 = '0' + year2 = '0' + start = '' + end = '' + if self.date_from: + year1 = str(self.date_from.year) + year2 = str(self.date_to.year) + start = self.date_from.strftime('%d/%m/%Y') + end = self.date_to.strftime('%d/%m/%Y') + data = { + 'ctl00$MainContent$hfInicial': year1, + 'ctl00$MainContent$CldFechaInicial2$Calendario_text': start, + 'ctl00$MainContent$CldFechaInicial2$DdlHora': start_hour, + 'ctl00$MainContent$CldFechaInicial2$DdlMinuto': start_minute, + 'ctl00$MainContent$CldFechaInicial2$DdlSegundo': start_second, + 'ctl00$MainContent$hfFinal': year2, + 'ctl00$MainContent$CldFechaFinal2$Calendario_text': end, + 'ctl00$MainContent$CldFechaFinal2$DdlHora': end_hour, + 'ctl00$MainContent$CldFechaFinal2$DdlMinuto': end_minute, + 'ctl00$MainContent$CldFechaFinal2$DdlSegundo': end_second, + } + else: + year = '0' + month = '0' + if self.date_from: + year = str(self.date_from.year) + month = str(self.date_from.month) + day = '00' + if self.day: + 
day = '{:02d}'.format(self.date_from.day) + data = { + 'ctl00$MainContent$CldFecha$DdlAnio': year, + 'ctl00$MainContent$CldFecha$DdlMes': month, + 'ctl00$MainContent$CldFecha$DdlDia': day, + 'ctl00$MainContent$CldFecha$DdlHora': start_hour, + 'ctl00$MainContent$CldFecha$DdlMinuto': start_minute, + 'ctl00$MainContent$CldFecha$DdlSegundo': start_second, + 'ctl00$MainContent$CldFecha$DdlHoraFin': end_hour, + 'ctl00$MainContent$CldFecha$DdlMinutoFin': end_minute, + 'ctl00$MainContent$CldFecha$DdlSegundoFin': end_second, + } + self._post.update(data) + return self._post + + +class Invoice(HTMLParser): + START_PAGE = 'ContenedorDinamico' + # ~ START_PAGE = 'ctl00_MainContent_ContenedorDinamico' + URL = 'https://portalcfdi.facturaelectronica.sat.gob.mx/' + END_PAGE = 'ctl00_MainContent_pageNavPosition' + LIMIT_RECORDS = 'ctl00_MainContent_PnlLimiteRegistros' + NOT_RECORDS = 'ctl00_MainContent_PnlNoResultados' + TEMPLATE_DATE = '%Y-%m-%dT%H:%M:%S' + + def __init__(self): + super().__init__() + self._is_div_page = False + self._col = 0 + self._current_tag = '' + self._last_link = '' + self._last_link_pdf = '' + self._last_uuid = '' + self._last_status = '' + self._last_date_cfdi = '' + self._last_date_timbre = '' + self._last_pac = '' + self._last_total = '' + self._last_type = '' + self._last_date_cancel = '' + self._last_emisor_rfc = '' + self._last_emisor = '' + self._last_receptor_rfc = '' + self._last_receptor = '' + self.invoices = [] + self.not_found = False + self.limit = False + + def handle_starttag(self, tag, attrs): + self._current_tag = tag + if tag == 'div': + attrib = dict(attrs) + if 'id' in attrib and attrib['id'] == self.NOT_RECORDS \ + and 'inline' in attrib['style']: + self.not_found = True + elif 'id' in attrib and attrib['id'] == self.LIMIT_RECORDS: + self.limit = True + elif 'id' in attrib and attrib['id'] == self.START_PAGE: + self._is_div_page = True + elif 'id' in attrib and attrib['id'] == self.END_PAGE: + self._is_div_page = False + elif 
self._is_div_page and tag == 'td': + self._col +=1 + elif tag == 'span': + attrib = dict(attrs) + if attrib.get('id', '') == 'BtnDescarga': + self._last_link = attrib['onclick'].split("'")[1] + + def handle_endtag(self, tag): + if self._is_div_page and tag == 'tr': + if self._last_uuid: + url_xml = '' + if self._last_link: + url_xml = '{}{}'.format(self.URL, self._last_link) + self._last_link = '' + url_pdf = '' + if self._last_link_pdf: + url_pdf = '{}{}'.format(self.URL, self._last_link_pdf) + + date_cancel = None + if self._last_date_cancel: + date_cancel = datetime.datetime.strptime( + self._last_date_cancel, self.TEMPLATE_DATE) + invoice = (self._last_uuid, + { + 'url': url_xml, + 'acuse': url_pdf, + 'estatus': self._last_status, + 'date_cfdi': datetime.datetime.strptime( + self._last_date_cfdi, self.TEMPLATE_DATE), + 'date_timbre': datetime.datetime.strptime( + self._last_date_timbre, self.TEMPLATE_DATE), + 'date_cancel': date_cancel, + 'rfc_pac': self._last_pac, + 'total': float(self._last_total), + 'tipo': self._last_type, + 'emisor': self._last_emisor, + 'rfc_emisor': self._last_emisor_rfc, + 'receptor': self._last_receptor, + 'rfc_receptor': self._last_receptor_rfc, + } + ) + self.invoices.append(invoice) + self._last_link_pdf = '' + self._last_uuid = '' + self._last_status = '' + self._last_date_cancel = '' + self._last_emisor_rfc = '' + self._last_emisor = '' + self._last_receptor_rfc = '' + self._last_receptor = '' + self._last_date_cfdi = '' + self._last_date_timbre = '' + self._last_pac = '' + self._last_total = '' + self._last_type = '' + self._col = 0 + + def handle_data(self, data): + cv = data.strip() + if self._is_div_page and self._current_tag == 'span' and cv: + if self._col == 1: + try: + UUID(cv) + self._last_uuid = cv + except ValueError: + pass + elif self._col == 2: + self._last_emisor_rfc = cv + elif self._col == 3: + self._last_emisor = cv + elif self._col == 4: + self._last_receptor_rfc = cv + elif self._col == 5: + self._last_receptor 
= cv + elif self._col == 6: + self._last_date_cfdi = cv + elif self._col == 7: + self._last_date_timbre = cv + elif self._col == 8: + self._last_pac = cv + elif self._col == 9: + self._last_total = cv.replace('$', '').replace(',', '') + elif self._col == 10: + self._last_type = cv.lower() + elif self._col == 12: + self._last_status = cv + elif self._col == 14: + self._last_date_cancel = cv + + class PortalSAT(object): URL_MAIN = 'https://portalcfdi.facturaelectronica.sat.gob.mx/' HOST = 'cfdiau.sat.gob.mx' @@ -20,12 +324,240 @@ class PortalSAT(object): URL_EMISOR = URL_PORTAL + 'ConsultaEmisor.aspx' URL_LOGOUT = URL_PORTAL + 'logout.aspx?salir=y' - def __init__(self, cert, cfdi_uuid): - self._cert = cert - self._uuid = cfdi_uuid + def __init__(self): self.error = '' self.is_connect = False + self._emitidas = False self._session = Session() a = adapters.HTTPAdapter(pool_connections=512, pool_maxsize=512, max_retries=5) self._session.mount('https://', a) + def _read_form(self, html, form=''): + if form == 'login': + parser = FormLoginValues() + else: + parser = FormValues() + parser.feed(html) + return parser.values + + def _response(self, url, method='get', headers={}, data={}): + # ~ log.debug('URL: {}'.format(url)) + try: + if method == 'get': + result = self._session.get(url, timeout=TIMEOUT, + verify=VERIFY_CERT) + else: + result = self._session.post(url, data=data, + timeout=TIMEOUT, verify=VERIFY_CERT) + msg = '{} {} {}'.format(result.status_code, method.upper(), url) + if result.status_code == 200: + return result.text + else: + log.error(msg) + return '' + except exceptions.Timeout: + msg = 'Tiempo de espera agotado' + self.not_network = True + log.error(msg) + self.error = msg + return '' + except exceptions.ConnectionError: + msg = 'Revisa la conexión a Internet' + self.not_network = True + log.error(msg) + self.error = msg + return '' + + def _get_headers(self, host, referer, ajax=False): + user_agent = 'Mozilla/5.0 (X11; Linux x86_64; rv:49.0) 
Gecko/20100101 Firefox/49.0' + acept = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' + + headers = { + 'Accept': acept, + 'Accept-Encoding': 'gzip, deflate, br', + 'Accept-Language': 'en-US,en;q=0.5', + 'Connection': 'keep-alive', + 'DNT': '1', + 'Host': host, + 'Referer': referer, + 'Upgrade-Insecure-Requests': '1', + 'User-Agent': self.BROWSER, + 'Content-Type': 'application/x-www-form-urlencoded', + } + if ajax: + headers.update({ + 'Cache-Control': 'no-cache', + 'X-MicrosoftAjax': 'Delta=true', + 'x-requested-with': 'XMLHttpRequest', + 'Pragma': 'no-cache', + }) + return headers + + def _get_post_type_search(self, html): + tipo_busqueda = 'RdoTipoBusquedaReceptor' + if self._emitidas: + tipo_busqueda = 'RdoTipoBusquedaEmisor' + sm = 'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$BtnBusqueda' + post = self._read_form(html) + post['ctl00$MainContent$TipoBusqueda'] = tipo_busqueda + post['__ASYNCPOST'] = 'true' + post['__EVENTTARGET'] = '' + post['__EVENTARGUMENT'] = '' + post['ctl00$ScriptManager1'] = sm + return post + + def _get_data_cert(self, cert): + with open(cert['cer'], 'rb') as fh: + cert = crypto.load_certificate(crypto.FILETYPE_ASN1, fh.read()) + rfc = cert.get_subject().x500UniqueIdentifier.split(' ')[0] + serie = '{0:x}'.format(cert.get_serial_number())[1::2] + fert = cert.get_notAfter().decode()[2:] + return rfc, serie, fert + + def _sign(self, cert, data): + with open(cert['key']) as fh: + key = crypto.load_privatekey(crypto.FILETYPE_PEM, fh.read()) + sign = base64.b64encode(crypto.sign(key, data, 'sha256')) + return base64.b64encode(sign).decode('utf-8') + + def _get_token(self, firma, co): + co = base64.b64encode(co.encode('utf-8')).decode('utf-8') + data = '{}#{}'.format(co, firma).encode('utf-8') + token = base64.b64encode(data).decode('utf-8') + return token + + def _make_data_form(self, cert, values): + rfc, serie, fert = self._get_data_cert(cert) + co = '{}|{}|{}'.format(values['tokenuuid'], rfc, serie) + firma = 
self._sign(cert, co) + token = self._get_token(firma, co) + + keys = ('credentialsRequired', 'guid', 'ks', 'urlApplet') + data = {k: values[k] for k in keys} + data['fert'] = fert + data['token'] = token + data['arc'] = '' + data['placer'] = '' + data['secuence'] = '' + data['seeder'] = '' + data['tan'] = '' + return data + + def login(self, cert): + HOST = 'cfdicontribuyentes.accesscontrol.windows.net' + REFERER = 'https://cfdiau.sat.gob.mx/nidp/wsfed/ep?id=SATUPCFDiCon&sid=0&option=credential&sid=0' + + url_login = 'https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0' + + result = self._session.get(self.URL_MAIN) + url_redirect = result.history[-1].headers['Location'] + self._session.headers['Host'] = self.HOST + result = self._response(url_redirect) + + self._session.headers['User-Agent'] = self.BROWSER + self._session.headers['Referer'] = REFERER.format(url_redirect) + result = self._response(url_login, 'post') + + values = self._read_form(result, 'login') + data = self._make_data_form(cert, values) + headers = self._get_headers(self.HOST, self.REFERER) + self._session.headers.update(headers) + result = self._response(url_login, 'post', data=data) + + if not result: + msg = 'Error al identificarse en el SAT' + log.error(msg) + return False + data = self._read_form(result) + + # Inicio + response = self._response(self.URL_MAIN, 'post', data=data) + data = self._get_post_type_search(response) + headers = self._get_headers(self.HOST, self.URL_MAIN) + + # Consulta + response = self._response(self.URL_CONSULTA, 'post', headers, data) + msg = 'Se ha identificado en el SAT' + log.info(msg) + self.is_connect = True + return True + + def logout(self): + msg = 'Cerrando sessión en el SAT' + log.debug(msg) + respuesta = self._response(self.URL_LOGOUT) + self.is_connect = False + msg = 'Sesión cerrada en el SAT' + log.info(msg) + return + + def _get_filters(self, cfdi_uuid, emitidas=True): + filters = [] + data = {'uuid': cfdi_uuid} + + 
data['day'] = False + data['emitidas'] = emitidas + data['rfc_emisor'] = '' + data['rfc_receptor'] = '' + data['type_cfdi'] = '-1' + data['date_from'] = None + + filters.append(Filters(data)) + + return tuple(filters) + + def _merge(self, list1, list2): + result = list1.copy() + result.update(list2) + return result + + def _get_download_links(self, html): + parser = Invoice() + parser.feed(html) + return parser.not_found, parser.limit, parser.invoices + + def _search_by_uuid(self, filters): + f = filters[0] + log.info(str(f)) + url_search = self.URL_RECEPTOR + if f.emitidas: + url_search = self.URL_EMISOR + + result = self._response(url_search, 'get') + post = self._read_form(result) + post = self._merge(post, f.get_post()) + headers = self._get_headers(self.PORTAL, url_search) + html = self._response(url_search, 'post', headers, post) + not_found, limit, invoices = self._get_download_links(html) + + if not_found: + msg = f'\n\tNo se encontraron documentos en el filtro:\n\t{str(f)}' + log.info(msg) + return '' + + url = invoices[0][1]['url'] + xml = '' + + r = self._session.get(url, timeout=TIMEOUT) + if r.status_code == 200: + xml = r.text + + return xml + + def get_uuid(self, cfdi_uuid): + data = {'error': '', 'xml': ''} + msg = f'Buscando UUID: {cfdi_uuid}' + log.debug(msg) + + # ~ Recibidos + filters = self._get_filters(cfdi_uuid, False) + data['xml'] = self._search_by_uuid(filters) + + if data['xml']: + return data + + # ~ Emitidos + filters = self._get_filters(cfdi_uuid, True) + data['xml'] = self._search_by_uuid(filters) + + return data diff --git a/source/sat/util.py b/source/sat/util.py index 0592311..962cbc7 100644 --- a/source/sat/util.py +++ b/source/sat/util.py @@ -1,7 +1,11 @@ #!/usr/bin/env python +from pathlib import Path from uuid import UUID +from OpenSSL import crypto + from .portal_sat import PortalSAT +from conf import RUTA_FIEL, NOMBRE_FIEL def validate_uuid(value): @@ -12,11 +16,52 @@ def validate_uuid(value): return False -def get_uuid(rfc, 
def validate_fiel(rfc):
    """Locate the FIEL certificate and private-key files for a taxpayer.

    The files are looked up as ``<NOMBRE_FIEL>.cer`` and
    ``<NOMBRE_FIEL>.pem`` inside ``RUTA_FIEL/<rfc in lower case>/``.

    Args:
        rfc: Taxpayer RFC; its lower-cased form is the folder name.

    Returns:
        A ``(cert, error)`` tuple. On success ``cert`` maps ``'cer'`` and
        ``'key'`` to the file paths (as strings) and ``error`` is ``''``.
        On failure ``cert`` is an empty dict and ``error`` says what is
        wrong.
    """
    cert = {}

    folder = rfc.lower()
    # The RFC becomes a path component. Joining an absolute value discards
    # the base directory, and separators or '..' walk out of it, so only a
    # plain single component is accepted.
    if folder in ('.', '..') or Path(folder).name != folder:
        return cert, 'RFC inválido'

    path_fiel = Path(RUTA_FIEL) / folder
    path_fiel_cer = path_fiel / f'{NOMBRE_FIEL}.cer'
    path_fiel_pem = path_fiel / f'{NOMBRE_FIEL}.pem'

    # Certificate first, then key: report the first file that is missing.
    for path in (path_fiel_cer, path_fiel_pem):
        if not path.exists():
            return cert, f'No se encontró el archivo: {path}'

    cert['key'] = str(path_fiel_pem)
    cert['cer'] = str(path_fiel_cer)

    return cert, ''


def get_uuid(rfc, cfdi_uuid):
    """Download the XML of one CFDI from the SAT portal.

    Args:
        rfc: RFC whose FIEL files are used to log in.
        cfdi_uuid: UUID (folio fiscal) of the CFDI to fetch.

    Returns:
        A dict with the keys ``'error'`` and ``'xml'``. When validation or
        login fails, ``'error'`` carries the message and ``'xml'`` is
        ``''``; otherwise the dict is whatever ``PortalSAT.get_uuid``
        returns.
    """
    # Imported here because this module has no module-level logger.
    import logging

    data = {'error': '', 'xml': ''}

    if not validate_uuid(cfdi_uuid):
        data['error'] = 'UUID inválido'
        return data

    cert, error = validate_fiel(rfc)
    if not cert:
        data['error'] = error
        return data

    sat = PortalSAT()
    sat.login(cert)

    if not sat.is_connect:
        sat.logout()
        # login() can fail without storing a message in sat.error; never
        # hand back an empty error for a failed login.
        data['error'] = sat.error or 'Error al identificarse en el SAT'
        logging.getLogger(__name__).error(data['error'])
        return data

    try:
        data = sat.get_uuid(cfdi_uuid)
    finally:
        # Close the portal session even when the search raises.
        sat.logout()

    return data