cfdi-descarga/source/sat/util.py

359 lines
8.3 KiB
Python

#!/usr/bin/env python3
import getpass
import uuid
from calendar import monthrange
from datetime import datetime
from pathlib import Path
from time import sleep
from .cfdi_cert import SATCertificate
from .sat_web import SATWebService
from settings import log
def today():
    """Return the current local date and time as a naive ``datetime``."""
    current = datetime.today()
    return current
def validate_date(year, month, day):
    """Tell whether ``year``/``month``/``day`` form a real calendar date.

    Returns:
        bool: ``True`` when ``datetime`` accepts the combination, ``False``
        when it rejects it with ``ValueError``.
    """
    try:
        # Midnight is the default time, so only the date part is being tested.
        datetime(year, month, day)
    except ValueError:
        return False
    return True
def is_dir(path):
    """Return ``True`` when ``path`` points to an existing directory."""
    candidate = Path(path)
    return candidate.is_dir()
def join(*paths):
    """Join the given path segments into a single ``Path``.

    The first segment is the base; the remaining ones are appended to it.
    At least one segment is required.
    """
    base = paths[0]
    rest = paths[1:]
    return Path(base).joinpath(*rest)
def _validate_fiel_args(args):
    """Validate the FIEL location given on the command line.

    Reads ``args.fiel_dir`` and ``args.fiel_nombre`` and checks that the
    directory exists and that it contains ``<name>.cer`` and ``<name>.key``.
    The ``<name>.enc`` path is computed but its existence is not checked here.

    Returns:
        tuple: ``(True, data)`` where ``data`` maps ``path_cer``, ``path_key``
        and ``path_enc`` to ``Path`` objects, or ``(False, {})`` after logging
        the reason when a check fails.
    """
    fiel_path = args.fiel_dir
    fiel_name = args.fiel_nombre

    if not fiel_path:
        msg = 'El directorio con la FIEL es requerido'
        log.error(msg)
        return False, {}

    if not is_dir(fiel_path):
        msg = f'La ruta no existe o no es un directorio. \nRuta: {fiel_path}'
        log.error(msg)
        return False, {}

    path_cer = join(fiel_path, f'{fiel_name}.cer')
    path_key = join(fiel_path, f'{fiel_name}.key')
    path_enc = join(fiel_path, f'{fiel_name}.enc')

    if not path_cer.is_file():
        msg = f'No se encontró el archivo CER. \nRuta: {path_cer}'
        log.error(msg)
        return False, {}

    if not path_key.is_file():
        # Report the KEY path itself so the user sees which file is missing.
        msg = f'No se encontró el archivo KEY. \nRuta: {path_key}'
        log.error(msg)
        return False, {}

    data = {
        'path_cer': path_cer,
        'path_key': path_key,
        'path_enc': path_enc,
    }
    return True, data
def fiel_validar(args):
    """Validate a FIEL interactively and store its encrypted key.

    Prompts for the KEY password, loads the certificate with
    ``SATCertificate`` and, when it loads without error and is a FIEL, writes
    ``cert.key_enc`` to the ``.enc`` path and logs the certificate details.
    Every failure is logged and ends the function early; nothing is returned.
    """
    ok, paths = _validate_fiel_args(args)
    if not ok:
        return

    password = getpass.getpass('Introduce la contraseña del archivo KEY: ')
    if not password:
        log.error('La contraseña es requerida para validar la FIEL')
        return

    cer_bytes = paths['path_cer'].read_bytes()
    key_bytes = paths['path_key'].read_bytes()
    cert = SATCertificate(cer_bytes, key_bytes, password)

    if cert.error:
        log.error(f'{cert.error}\n\nNo podrás conectarte el SAT.')
        return

    if not cert.is_fiel:
        log.error('El certificado no es FIEL')
        return

    # Persist the encrypted key; the download commands read this file instead
    # of the original KEY.
    paths['path_enc'].write_bytes(cert.key_enc)

    log.info('Los datos del certificado son:')
    log.info(f'\n{cert}')
    log.info('Ya puedes descargar del SAT')
def base_datos():
    """Create the database tables by delegating to ``db.create_tables()``."""
    # NOTE(review): `db` is not imported in the visible part of this module;
    # confirm it is in scope, otherwise this call raises NameError.
    db.create_tables()
def _validate_requests_args(args):
    """Validate the arguments needed to request a download from the SAT.

    Builds on ``_validate_fiel_args`` and additionally requires the encrypted
    FIEL file to exist and the certificate to report ``is_valid_time``. The
    requested period is derived from ``args.year``, ``args.month`` and
    ``args.day``:

    * a truthy ``day`` selects that single day (the full date must be valid);
    * ``month == 0`` without a day selects the whole year;
    * otherwise the whole month is selected.

    Returns:
        tuple: ``(True, data)`` on success, where ``data`` holds the FIEL
        paths plus ``cert``, ``type``, ``year``, ``month``, ``day``,
        ``date_start`` and ``date_end``; ``(False, {})`` after logging the
        reason when a check fails.
    """
    result, data = _validate_fiel_args(args)
    if not result:
        return False, {}

    if not data['path_enc'].is_file():
        msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}"
        log.error(msg)
        return False, {}

    cer = data['path_cer'].read_bytes()
    key = data['path_enc'].read_bytes()
    cert = SATCertificate(cer, key)

    if not cert.is_valid_time:
        msg = 'La FIEL no es vigente'
        log.error(msg)
        return False, {}

    data['cert'] = cert
    data['type'] = args.type
    data['year'] = args.year
    data['month'] = args.month
    data['day'] = args.day

    if data['day']:
        if not validate_date(data['year'], data['month'], data['day']):
            msg = 'Fecha inválida'
            log.error(msg)
            return False, {}

    month1 = month2 = data['month']
    if month1 == 0:
        # Month 0 means "the whole year".
        month1 = 1
        month2 = 12

    if data['day']:
        day1 = day2 = data['day']
    else:
        # Without a day the date has not been checked yet; reject an
        # out-of-range year or month here instead of letting monthrange or
        # datetime raise below.
        if not validate_date(data['year'], month2, 1):
            msg = 'Fecha inválida'
            log.error(msg)
            return False, {}
        day1 = 1
        day2 = monthrange(data['year'], month2)[1]

    data['date_start'] = datetime(data['year'], month1, day1)
    data['date_end'] = datetime(data['year'], month2, day2, 23, 59, 59)
    return True, data
def _validate_verificar_args(args):
    """Validate the arguments needed to verify a download request.

    Requires a usable FIEL (paths from ``_validate_fiel_args``, an existing
    encrypted file and a certificate reporting ``is_valid_time``) and a
    non-empty ``args.id_request``.

    Returns:
        tuple: ``(True, data)`` with the FIEL paths plus ``cert`` and ``id``,
        or ``(False, {})`` after logging the reason when a check fails.
    """
    failure = (False, {})

    ok, data = _validate_fiel_args(args)
    if not ok:
        return failure

    enc_path = data['path_enc']
    if not enc_path.is_file():
        log.error(f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}")
        return failure

    cer_bytes = data['path_cer'].read_bytes()
    enc_bytes = enc_path.read_bytes()
    cert = SATCertificate(cer_bytes, enc_bytes)
    if not cert.is_valid_time:
        log.error('La FIEL no es vigente')
        return failure

    if not args.id_request:
        log.error('El ID de solicitud de descarga es requerido')
        return failure

    data['cert'] = cert
    data['id'] = args.id_request
    return True, data
def _validate_download_args(args):
    """Validate the arguments needed to download files from the SAT.

    Requires a usable FIEL (paths from ``_validate_fiel_args``, an existing
    encrypted file and a certificate reporting ``is_valid_time``), at least
    one of ``args.id_request`` / ``args.id_file``, and ``args.path_download``.

    Returns:
        tuple: ``(True, data)`` with the FIEL paths plus ``cert``, ``id``,
        ``id_file`` and ``path``, or ``(False, {})`` after logging the reason
        when a check fails.
    """
    failure = (False, {})

    ok, data = _validate_fiel_args(args)
    if not ok:
        return failure

    enc_path = data['path_enc']
    if not enc_path.is_file():
        log.error(f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}")
        return failure

    cer_bytes = data['path_cer'].read_bytes()
    enc_bytes = enc_path.read_bytes()
    cert = SATCertificate(cer_bytes, enc_bytes)
    if not cert.is_valid_time:
        log.error('La FIEL no es vigente')
        return failure

    # Either identifier is enough to locate something to download.
    if not (args.id_request or args.id_file):
        log.error('El ID de solicitud o ID de archivo de descarga es requerido')
        return failure

    if not args.path_download:
        log.error('La ruta de descarga es requerida [-dd]')
        return failure

    data.update({
        'cert': cert,
        'id': args.id_request,
        'id_file': args.id_file,
        'path': args.path_download,
    })
    return True, data
def solicitar_descarga(args):
    """Request a download from the SAT and print the service response.

    Stops silently when argument validation fails (the validator logs the
    reason) and logs ``sat.error`` when the web service is not authenticated.
    """
    ok, data = _validate_requests_args(args)
    if not ok:
        return

    service = SATWebService(data['cert'])
    if not service.is_authenticate:
        log.error(service.error)
        return

    print(service.request_download(data))
def verificar_descarga(args):
    """Verify a download request in the SAT and print the service response.

    Stops silently when argument validation fails (the validator logs the
    reason) and logs ``sat.error`` when the web service is not authenticated.
    """
    ok, data = _validate_verificar_args(args)
    if not ok:
        return

    service = SATWebService(data['cert'])
    if not service.is_authenticate:
        log.error(service.error)
        return

    print(service.verify(data))
def descargar_archivos(args):
    """Download one or more ZIP packages from the SAT into a directory.

    When ``args.id_file`` is given only that file is downloaded; otherwise
    the request is verified first and every entry of the ``files`` key in the
    response is downloaded. Each package is saved as ``<id_file>.zip`` inside
    the download path. Validation and authentication failures are logged and
    end the function early; nothing is returned.
    """
    result, data = _validate_download_args(args)
    if not result:
        return

    sat = SATWebService(data['cert'])
    if not sat.is_authenticate:
        log.error(sat.error)
        return

    if args.id_file:
        files = (args.id_file,)
    else:
        # NOTE(review): assumes the verify response always carries a 'files'
        # key; confirm it is present when the request is not finished yet.
        verification = sat.verify(data)
        files = verification['files']

    for id_file in files:
        data['id_file'] = id_file
        response, file_data = sat.download(data)
        if file_data is None:
            log.debug(response)
            continue

        path_zip = join(data['path'], f'{id_file}.zip')
        msg = f'Guardando: {path_zip}'
        log.info(msg)
        # A separate name for the handle keeps the file id intact.
        with open(path_zip, 'wb') as zip_file:
            zip_file.write(file_data)
        msg = '\tArchivo guardado correctamente'
        log.info(msg)
    return
def _validate_args(args):
    """Validate the arguments for the combined request-and-download command.

    Runs ``_validate_requests_args`` and then requires ``args.path_download``.

    Returns:
        tuple: ``(True, data)`` with the request data plus ``path``, or
        ``(False, {})`` when a check fails (the reason is logged).
    """
    ok, data = _validate_requests_args(args)
    if not ok:
        return False, {}

    target_dir = args.path_download
    if not target_dir:
        log.error('La ruta de descarga es requerida [-dd]')
        return False, {}

    data['path'] = args.path_download
    return True, data
def descargar(args):
    """Request a download, wait until it is ready and save every package.

    Flow:
        1. Validate the arguments and authenticate against the SAT.
        2. Send the download request; stop with an error log unless
           ``CodEstatus`` equals ``'5000'``.
        3. Verify the request once per minute while ``EstadoSolicitud`` is
           ``'1'`` or ``'2'``.
        4. On ``'3'`` download each entry of ``files`` to ``<id_file>.zip``
           in the download path; on any other state log the response as an
           error.

    Nothing is returned; progress and failures are reported through ``log``.
    """
    OK = '5000'

    result, data = _validate_args(args)
    if not result:
        return

    sat = SATWebService(data['cert'])
    if not sat.is_authenticate:
        log.error(sat.error)
        return

    request = sat.request_download(data)
    if request['CodEstatus'] != OK:
        log.error(request)
        return

    data['id'] = request['IdSolicitud']

    while True:
        status = sat.verify(data)
        state = status['EstadoSolicitud']

        if state in ('1', '2'):
            msg = 'Esperando un minuto para volver a verificar...'
            log.info(msg)
            sleep(60)
            continue

        if state != '3':
            # Any other state ends the polling; report the response.
            log.error(status)
            return

        # Dedicated names keep `status` and the file id from being
        # overwritten while the file list is iterated.
        for id_file in status['files']:
            data['id_file'] = id_file
            response, file_data = sat.download(data)
            if file_data is None:
                log.debug(response)
                continue

            path_zip = join(data['path'], f'{id_file}.zip')
            msg = f'Guardando: {path_zip}'
            log.info(msg)
            with open(path_zip, 'wb') as zip_file:
                zip_file.write(file_data)
            msg = '\tArchivo guardado correctamente'
            log.info(msg)
        break
    return