#!/usr/bin/env python3
"""Command helpers to validate a FIEL and request/verify/download CFDI
packages through the SAT web service.

Every public function receives the parsed command-line ``args`` object,
logs any problem through ``log`` and returns ``None``.
"""

import getpass
import uuid
from calendar import monthrange
from datetime import datetime
from pathlib import Path
from time import sleep

from .cfdi_cert import SATCertificate
from .sat_web import SATWebService
from settings import log


def today():
    """Return the current local date and time."""
    return datetime.today()


def validate_date(year, month, day):
    """Return True when year/month/day form a valid calendar date."""
    try:
        datetime(year, month, day, 0, 0, 0)
    except ValueError:
        return False
    return True


def is_dir(path):
    """Return True when *path* exists and is a directory."""
    return Path(path).is_dir()


def join(*paths):
    """Join the given path segments and return the result as a ``Path``."""
    return Path(paths[0]).joinpath(*paths[1:])


def _validate_fiel_args(args):
    """Check that the FIEL directory and its CER/KEY files exist.

    Reads ``args.fiel_dir`` and ``args.fiel_nombre``.

    Returns:
        ``(True, data)`` where *data* holds ``path_cer``, ``path_key`` and
        ``path_enc`` (``Path`` objects), or ``(False, {})`` after logging
        the problem.
    """
    fiel_path = args.fiel_dir
    fiel_name = args.fiel_nombre

    if not fiel_path:
        msg = 'El directorio con la FIEL es requerido'
        log.error(msg)
        return False, {}

    if not is_dir(fiel_path):
        msg = f'La ruta no existe o no es un directorio. \nRuta: {fiel_path}'
        log.error(msg)
        return False, {}

    path_cer = join(fiel_path, f'{fiel_name}.cer')
    path_key = join(fiel_path, f'{fiel_name}.key')
    path_enc = join(fiel_path, f'{fiel_name}.enc')

    if not path_cer.is_file():
        msg = f'No se encontró el archivo CER. \nRuta: {path_cer}'
        log.error(msg)
        return False, {}

    if not path_key.is_file():
        # Report the KEY path here (the CER path was logged by mistake).
        msg = f'No se encontró el archivo KEY. \nRuta: {path_key}'
        log.error(msg)
        return False, {}

    data = {
        'path_cer': path_cer,
        'path_key': path_key,
        'path_enc': path_enc,
    }
    return True, data


def fiel_validar(args):
    """Validate the FIEL with its password and store the encrypted key.

    Prompts for the KEY password, builds a ``SATCertificate`` and, when the
    certificate has no error and is a FIEL, writes ``cert.key_enc`` to the
    ``.enc`` file next to the CER/KEY files.
    """
    result, data = _validate_fiel_args(args)
    if not result:
        return

    password = getpass.getpass('Introduce la contraseña del archivo KEY: ')
    if not password:
        msg = 'La contraseña es requerida para validar la FIEL'
        log.error(msg)
        return

    cer = data['path_cer'].read_bytes()
    key = data['path_key'].read_bytes()
    cert = SATCertificate(cer, key, password)

    if cert.error:
        msg = f'{cert.error}\n\nNo podrás conectarte el SAT.'
        log.error(msg)
        return

    if not cert.is_fiel:
        msg = 'El certificado no es FIEL'
        log.error(msg)
        return

    data['path_enc'].write_bytes(cert.key_enc)

    msg = 'Los datos del certificado son:'
    log.info(msg)
    log.info(f'\n{cert}')
    msg = 'Ya puedes descargar del SAT'
    log.info(msg)


def base_datos():
    """Create the database tables."""
    # NOTE(review): `db` is not imported in this module as shown; confirm
    # where it is expected to come from before relying on this command.
    db.create_tables()


def _load_fiel_cert(args):
    """Validate the FIEL arguments and load the encrypted certificate.

    Shared by every command that talks to the SAT: requires the ``.enc``
    file to exist and the certificate to report ``is_valid_time``.

    Returns:
        ``(True, data)`` with ``data['cert']`` set, or ``(False, {})`` after
        logging the problem.
    """
    result, data = _validate_fiel_args(args)
    if not result:
        return False, {}

    if not data['path_enc'].is_file():
        msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}"
        log.error(msg)
        return False, {}

    cer = data['path_cer'].read_bytes()
    key = data['path_enc'].read_bytes()
    cert = SATCertificate(cer, key)

    if not cert.is_valid_time:
        msg = 'La FIEL no es vigente'
        log.error(msg)
        return False, {}

    data['cert'] = cert
    return True, data


def _validate_requests_args(args):
    """Validate the arguments of a download request and build its date range.

    Reads ``args.type``, ``args.year``, ``args.month`` and ``args.day``.
    A month of ``0`` selects January through December; a falsy day selects
    the whole month(s).

    Returns:
        ``(True, data)`` including ``date_start`` and ``date_end``, or
        ``(False, {})`` after logging the problem.
    """
    result, data = _load_fiel_cert(args)
    if not result:
        return False, {}

    data['type'] = args.type
    data['year'] = args.year
    data['month'] = args.month
    data['day'] = args.day

    if data['day']:
        if not validate_date(data['year'], data['month'], data['day']):
            msg = 'Fecha inválida'
            log.error(msg)
            return False, {}

    month1 = month2 = data['month']
    if month1 == 0:
        month1 = 1
        month2 = 12

    if data['day']:
        day1 = day2 = data['day']
    else:
        day1 = 1
        # Last day of the closing month.
        day2 = monthrange(data['year'], month2)[1]

    data['date_start'] = datetime(data['year'], month1, day1)
    data['date_end'] = datetime(data['year'], month2, day2, 23, 59, 59)
    return True, data


def _validate_verificar_args(args):
    """Validate the arguments needed to verify a request.

    Requires ``args.id_request``.

    Returns:
        ``(True, data)`` with ``data['id']`` set, or ``(False, {})`` after
        logging the problem.
    """
    result, data = _load_fiel_cert(args)
    if not result:
        return False, {}

    if not args.id_request:
        msg = 'El ID de solicitud de descarga es requerido'
        log.error(msg)
        return False, {}

    data['id'] = args.id_request
    return True, data


def _validate_download_args(args):
    """Validate the arguments needed to download files.

    Requires ``args.id_request`` or ``args.id_file``, plus
    ``args.path_download``.

    Returns:
        ``(True, data)`` with ``id``, ``id_file`` and ``path`` set, or
        ``(False, {})`` after logging the problem.
    """
    result, data = _load_fiel_cert(args)
    if not result:
        return False, {}

    if not args.id_request and not args.id_file:
        msg = 'El ID de solicitud o ID de archivo de descarga es requerido'
        log.error(msg)
        return False, {}

    if not args.path_download:
        msg = 'La ruta de descarga es requerida [-dd]'
        log.error(msg)
        return False, {}

    data['id'] = args.id_request
    data['id_file'] = args.id_file
    data['path'] = args.path_download
    return True, data


def _authenticate(cert):
    """Open a SAT web-service session; return it, or None after logging."""
    sat = SATWebService(cert)
    if not sat.is_authenticate:
        log.error(sat.error)
        return None
    return sat


def _rfc_keys(kind):
    """Map the request type to the RFC field(s) to query.

    ``'e'`` selects ``RfcEmisor``, ``'r'`` selects ``RfcReceptor`` and any
    other value selects both, in that order.
    """
    if kind == 'e':
        return ('RfcEmisor',)
    if kind == 'r':
        return ('RfcReceptor',)
    return ('RfcEmisor', 'RfcReceptor')


def _countdown(seconds=60):
    """Sleep *seconds* seconds while printing a countdown on one line."""
    for i in range(seconds, 0, -1):
        # No newline is written, so flush explicitly to make the countdown
        # visible on line-buffered terminals.
        print(f'\r{i}', end=' ', flush=True)
        sleep(1)


def _save_files(sat, data, files):
    """Download every file id in *files* and save it as ``<id>.zip``.

    Files are written under ``data['path']``. When ``sat.download`` returns
    no content, its result is logged at debug level and the file is skipped.
    """
    for id_file in files:
        data['id_file'] = id_file
        result, file_data = sat.download(data)
        if file_data is None:
            log.debug(result)
            continue

        path_zip = join(data['path'], f'{id_file}.zip')
        msg = f'Guardando: {path_zip}'
        log.info(msg)
        path_zip.write_bytes(file_data)
        msg = '\tArchivo guardado correctamente'
        log.info(msg)


def _request_download(sat, data, key):
    """Send a download request for the RFC field *key* and return the result."""
    data['rfc'] = key
    result = sat.request_download(data)
    # A plain print lets a caller written in any language capture the result.
    print(result)
    return result


def solicitar_descarga(args):
    """Request a download from the SAT for the selected RFC role(s)."""
    result, data = _validate_requests_args(args)
    if not result:
        return

    sat = _authenticate(data['cert'])
    if sat is None:
        return

    for key in _rfc_keys(data['type']):
        _request_download(sat, data, key)


def verificar_descarga(args):
    """Verify the status of a download request and print the result."""
    result, data = _validate_verificar_args(args)
    if not result:
        return

    sat = _authenticate(data['cert'])
    if sat is None:
        return

    result = sat.verify(data)
    # A plain print lets a caller written in any language capture the result.
    print(result)


def descargar_archivos(args):
    """Download the files of a request, or the single file ``args.id_file``."""
    result, data = _validate_download_args(args)
    if not result:
        return

    sat = _authenticate(data['cert'])
    if sat is None:
        return

    if args.id_file:
        files = (args.id_file,)
    else:
        result = sat.verify(data)
        files = result['files']

    _save_files(sat, data, files)


def _validate_args(args):
    """Validate the arguments of the all-in-one download command.

    Same checks as a download request, plus a required
    ``args.path_download``.
    """
    result, data = _validate_requests_args(args)
    if not result:
        return False, {}

    if not args.path_download:
        msg = 'La ruta de descarga es requerida [-dd]'
        log.error(msg)
        return False, {}

    data['path'] = args.path_download
    return True, data


def _download(sat, data, key):
    """Request, poll and download the files for the RFC field *key*.

    The request must come back with ``CodEstatus`` ``'5000'``. The request is
    then verified once a minute: ``EstadoSolicitud`` ``'1'`` or ``'2'``
    keeps waiting, ``'3'`` downloads the listed files, and any other value
    is logged as an error.
    """
    OK = '5000'

    result = _request_download(sat, data, key)
    if result['CodEstatus'] != OK:
        log.error(result)
        return

    data['id'] = result['IdSolicitud']
    msg = f"Descarga aceptada con el ID: {data['id']}\n\nEsperando un minuto para verificar la descarga..."
    log.info(msg)
    _countdown()

    while True:
        result = sat.verify(data)

        if result['EstadoSolicitud'] in ('1', '2'):
            msg = 'Esperando un minuto más para volver a verificar...'
            log.info(msg)
            _countdown()
            continue

        if result['EstadoSolicitud'] == '3':
            _save_files(sat, data, result['files'])
            break

        log.error(result)
        break


def descargar(args):
    """Request, wait for and download CFDI packages in a single command."""
    result, data = _validate_args(args)
    if not result:
        return

    sat = _authenticate(data['cert'])
    if sat is None:
        return

    for key in _rfc_keys(data['type']):
        _download(sat, data, key)