cfdi-descarga/source/sat/util.py

454 lines
11 KiB
Python

#!/usr/bin/env python3
import getpass
import uuid
from calendar import monthrange
from datetime import datetime, timedelta
from pathlib import Path
from time import sleep
from .cfdi_cert import SATCertificate
from .sat_web import SATWebService
from settings import log
def today():
return datetime.today()
def validate_date(year, month, day):
try:
datetime(year, month, day, 0, 0, 0)
result = True
except ValueError:
result = False
return result
def is_dir(path):
return Path(path).is_dir()
def join(*paths):
return Path(paths[0]).joinpath(*paths[1:])
def _validate_fiel_args(args, need_key=False):
fiel_path = args.fiel_dir
fiel_name = args.fiel_nombre
if not fiel_path:
msg = 'El directorio con la FIEL es requerido'
log.error(msg)
return False, {}
if not is_dir(fiel_path):
msg = f'La ruta no existe o no es un directorio. \nRuta: {fiel_path}'
log.error(msg)
return False, {}
path_cer = join(fiel_path, f'{fiel_name}.cer')
path_key = join(fiel_path, f'{fiel_name}.key')
path_enc = join(fiel_path, f'{fiel_name}.enc')
path_pem = join(fiel_path, f'{fiel_name}.pem')
if not path_cer.is_file():
msg = f'No se encontró el archivo CER. \nRuta: {path_cer}'
log.error(msg)
return False, {}
if need_key and not path_key.is_file():
msg = f'No se encontró el archivo KEY. \nRuta: {path_key}'
log.error(msg)
return False, {}
data = {
'path_cer': path_cer,
'path_key': path_key,
'path_enc': path_enc,
'path_pem': path_pem,
}
return True, data
def fiel_validar(args):
result, data = _validate_fiel_args(args, True)
if not result:
return
try:
password = getpass.getpass('Introduce la contraseña del archivo KEY: ')
except KeyboardInterrupt:
msg = 'Proceso cancelado'
log.info(msg)
return
if not password:
msg = 'La contraseña es requerida para validar la FIEL'
log.error(msg)
return
cer = data['path_cer'].read_bytes()
key = data['path_key'].read_bytes()
cert = SATCertificate(cer, key, password=password)
if cert.error:
msg = f'{cert.error}\n\nNo podrás conectarte el SAT.'
log.error(msg)
return
if not cert.is_fiel:
msg = 'El certificado no es FIEL'
log.error(msg)
return
data['path_enc'].write_bytes(cert.key_enc)
msg = 'Los datos del certificado son:'
log.info(msg)
log.info(f'\n{cert}')
msg = 'Ya puedes descargar del SAT'
log.info(msg)
return
def base_datos():
db.create_tables()
return
def _get_cert(data):
key = b''
pem = b''
cer = data['path_cer'].read_bytes()
if data['path_enc'].is_file():
key = data['path_enc'].read_bytes()
elif data['path_pem'].is_file():
pem = data['path_pem'].read_bytes()
cert = SATCertificate(cer, key, pem)
return cert
def _to_date(str_date, end=False):
error = ''
dt = None
try:
parts = str_date.split('-')
if end:
dt = datetime(int(parts[0]), int(parts[1]), int(parts[2]), 23, 59, 59)
else:
dt = datetime(int(parts[0]), int(parts[1]), int(parts[2]), 0, 0, 0)
except Exception as e:
error = 'Fecha inválida'
return error, dt
def _validate_requests_args(args):
result, data = _validate_fiel_args(args)
if not result:
return False, {}
if not data['path_enc'].is_file() and not data['path_pem'].is_file():
msg = f"No se encontró la FIEL [enc|pem].\nRuta: {data['path_enc']}"
log.error(msg)
return False, {}
cert = _get_cert(data)
if not cert.is_valid_time:
msg = 'La FIEL no es vigente'
log.error(msg)
return False, {}
data['cert'] = cert
data['type'] = args.type
data['year'] = args.year
data['month'] = args.month
data['day'] = args.day
data['metadata'] = args.metadata
now = today()
if args.last_days:
date_start = now.replace(hour=0, minute=0, second=0, microsecond=0) \
- timedelta(days=args.last_days)
date_end = now.replace(hour=23, minute=59, second=59, microsecond=0)
data['date_start'] = date_start
data['date_end'] = date_end
return True, data
if args.date_start and args.date_end:
error, date_start = _to_date(args.date_start)
if error:
log.error(error)
return False, {}
error, date_end = _to_date(args.date_end, True)
if error:
log.error(error)
return False, {}
data['date_start'] = date_start
data['date_end'] = date_end
return True, data
if data['day']:
if not validate_date(data['year'], data['month'], data['day']):
msg = 'Fecha inválida'
log.error(msg)
return False, {}
month1 = month2 = data['month']
if month1 == 0:
month1 = 1
month2 = 12
if data['day']:
day1 = day2 = data['day']
else:
day1 = 1
day2 = monthrange(data['year'], month2)[1]
data['date_start'] = datetime(data['year'], month1, day1)
data['date_end'] = datetime(data['year'], month2, day2, 23, 59, 59)
return True, data
def _validate_verificar_args(args):
result, data = _validate_fiel_args(args)
if not result:
return False, {}
if not data['path_enc'].is_file() and not data['path_pem'].is_file():
msg = f"No se encontró la FIEL [enc|pem].\nRuta: {data['path_enc']}"
log.error(msg)
return False, {}
cert = _get_cert(data)
if not cert.is_valid_time:
msg = 'La FIEL no es vigente'
log.error(msg)
return False, {}
if not args.id_request:
msg = 'El ID de solicitud de descarga es requerido'
log.error(msg)
return False, {}
data['cert'] = cert
data['id'] = args.id_request
return True, data
def _validate_download_args(args):
result, data = _validate_fiel_args(args)
if not result:
return False, {}
if not data['path_enc'].is_file() and not data['path_pem'].is_file():
msg = f"No se encontró la FIEL [enc|pem].\nRuta: {data['path_enc']}"
log.error(msg)
return False, {}
cert = _get_cert(data)
if not cert.is_valid_time:
msg = 'La FIEL no es vigente'
log.error(msg)
return False, {}
if not args.id_request and not args.id_file:
msg = 'El ID de solicitud o ID de archivo de descarga es requerido'
log.error(msg)
return False, {}
if not args.path_download:
msg = 'La ruta de descarga es requerida [-dd]'
log.error(msg)
return False, {}
data['cert'] = cert
data['id'] = args.id_request
data['id_file'] = args.id_file
data['path'] = args.path_download
return True, data
def _request_download(sat, data, key):
data['rfc'] = key
result = sat.request_download(data)
# ~ Usando un simple print, permite capturarlo desde cualquier lenguaje
print(result)
return result
def solicitar_descarga(args):
result, data = _validate_requests_args(args)
if not result:
return
sat = SATWebService(data['cert'])
if not sat.is_authenticate:
log.error(sat.error)
return
if data['type'] == 'e':
_request_download(sat, data, 'RfcEmisor')
elif data['type'] == 'r':
_request_download(sat, data, 'RfcReceptor')
else:
_request_download(sat, data, 'RfcEmisor')
_request_download(sat, data, 'RfcReceptor')
return
def verificar_descarga(args):
result, data = _validate_verificar_args(args)
if not result:
return
sat = SATWebService(data['cert'])
if not sat.is_authenticate:
log.error(sat.error)
return
result = sat.verify(data)
# ~ Usando un simple print, permite capturarlo desde cualquier lenguaje
print(result)
return
def descargar_archivos(args):
result, data = _validate_download_args(args)
if not result:
return
sat = SATWebService(data['cert'])
if not sat.is_authenticate:
log.error(sat.error)
return
if args.id_file:
files = (args.id_file,)
else:
result = sat.verify(data)
files = result['files']
if result['EstadoSolicitud'] in ('1', '2'):
msg = 'Solicitud aún no aceptada...'
log.error(msg)
return
for f in files:
data['id_file'] = f
result, file_data = sat.download(data)
if file_data is None:
log.debug(result)
else:
path_zip = join(data['path'], f'{f}.zip')
msg = f'Guardando: {path_zip}'
log.info(msg)
with open(path_zip, 'wb') as f:
f.write(file_data)
msg = f's\tArchivo guardado correctamente'
log.info(msg)
return
def _validate_args(args):
result, data = _validate_requests_args(args)
if not result:
return False, {}
if not args.path_download:
msg = 'La ruta de descarga es requerida [-dd]'
log.error(msg)
return False, {}
data['path'] = args.path_download
return True, data
def _download(data, key):
OK = '5000'
sat = SATWebService(data['cert'])
result = _request_download(sat, data, key)
if result['CodEstatus'] != OK:
log.error(result)
return
data['id'] = result['IdSolicitud']
msg = f"Descarga aceptada con el ID: {data['id']}\n\nEsperando un minuto para verificar la descarga..."
log.info(msg)
for i in range(60, 0, -1):
print(f'\r{i}', end=' ')
sleep(1)
while True:
sat = SATWebService(data['cert'])
result = sat.verify(data)
if result['EstadoSolicitud'] in ('1', '2'):
msg = 'Esperando un minuto más para volver a verificar...'
log.info(msg)
for i in range(60, 0, -1):
print(f'\r{i}', end=' ')
sleep(1)
continue
if result['EstadoSolicitud'] == '3':
for f in result['files']:
data['id_file'] = f
result, file_data = sat.download(data)
if file_data is None:
log.debug(result)
else:
path_zip = join(data['path'], f'{f}.zip')
msg = f'Guardando: {path_zip}'
log.info(msg)
with open(path_zip, 'wb') as f:
f.write(file_data)
msg = f'\tArchivo guardado correctamente'
log.info(msg)
break
log.error(result)
break
return
def descargar(args):
result, data = _validate_args(args)
if not result:
return
sat = SATWebService(data['cert'])
if not sat.is_authenticate:
log.error(sat.error)
return
if data['type'] == 'e':
_download(data, 'RfcEmisor')
elif data['type'] == 'r':
_download(data, 'RfcReceptor')
else:
_download(data, 'RfcEmisor')
_download(data, 'RfcReceptor')
return