2021-07-14 00:12:21 -05:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
|
|
|
import getpass
|
|
|
|
import uuid
|
2021-07-14 12:14:58 -05:00
|
|
|
from calendar import monthrange
|
2021-07-14 00:12:21 -05:00
|
|
|
from datetime import datetime
|
|
|
|
from pathlib import Path
|
2021-07-14 23:06:48 -05:00
|
|
|
from time import sleep
|
2021-07-14 00:12:21 -05:00
|
|
|
|
|
|
|
from .cfdi_cert import SATCertificate
|
|
|
|
from .sat_web import SATWebService
|
|
|
|
from settings import log
|
|
|
|
|
|
|
|
|
|
|
|
def today():
|
|
|
|
return datetime.today()
|
|
|
|
|
|
|
|
|
|
|
|
def validate_date(year, month, day):
|
|
|
|
try:
|
|
|
|
datetime(year, month, day, 0, 0, 0)
|
|
|
|
result = True
|
|
|
|
except ValueError:
|
|
|
|
result = False
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
|
|
def is_dir(path):
|
|
|
|
return Path(path).is_dir()
|
|
|
|
|
|
|
|
|
|
|
|
def join(*paths):
|
|
|
|
return Path(paths[0]).joinpath(*paths[1:])
|
|
|
|
|
|
|
|
|
|
|
|
def _validate_fiel_args(args):
|
|
|
|
fiel_path = args.fiel_dir
|
|
|
|
fiel_name = args.fiel_nombre
|
|
|
|
|
|
|
|
if not fiel_path:
|
|
|
|
msg = 'El directorio con la FIEL es requerido'
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
if not is_dir(fiel_path):
|
|
|
|
msg = f'La ruta no existe o no es un directorio. \nRuta: {fiel_path}'
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
path_cer = join(fiel_path, f'{fiel_name}.cer')
|
|
|
|
path_key = join(fiel_path, f'{fiel_name}.key')
|
|
|
|
path_enc = join(fiel_path, f'{fiel_name}.enc')
|
|
|
|
|
|
|
|
if not path_cer.is_file():
|
|
|
|
msg = f'No se encontró el archivo CER. \nRuta: {path_cer}'
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
if not path_key.is_file():
|
|
|
|
msg = f'No se encontró el archivo KEY. \nRuta: {path_cer}'
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
data = {
|
|
|
|
'path_cer': path_cer,
|
|
|
|
'path_key': path_key,
|
|
|
|
'path_enc': path_enc,
|
|
|
|
}
|
|
|
|
return True, data
|
|
|
|
|
|
|
|
|
|
|
|
def fiel_validar(args):
|
|
|
|
result, data = _validate_fiel_args(args)
|
|
|
|
if not result:
|
|
|
|
return
|
|
|
|
|
|
|
|
password = getpass.getpass('Introduce la contraseña del archivo KEY: ')
|
|
|
|
if not password:
|
|
|
|
msg = 'La contraseña es requerida para validar la FIEL'
|
|
|
|
log.error(msg)
|
|
|
|
return
|
|
|
|
|
|
|
|
cer = data['path_cer'].read_bytes()
|
|
|
|
key = data['path_key'].read_bytes()
|
|
|
|
cert = SATCertificate(cer, key, password)
|
|
|
|
|
|
|
|
if cert.error:
|
|
|
|
msg = f'{cert.error}\n\nNo podrás conectarte el SAT.'
|
|
|
|
log.error(msg)
|
|
|
|
return
|
|
|
|
|
|
|
|
if not cert.is_fiel:
|
|
|
|
msg = 'El certificado no es FIEL'
|
|
|
|
log.error(msg)
|
|
|
|
return
|
|
|
|
|
|
|
|
data['path_enc'].write_bytes(cert.key_enc)
|
|
|
|
|
|
|
|
msg = 'Los datos del certificado son:'
|
|
|
|
log.info(msg)
|
|
|
|
log.info(f'\n{cert}')
|
|
|
|
msg = 'Ya puedes descargar del SAT'
|
|
|
|
log.info(msg)
|
2021-07-14 15:32:30 -05:00
|
|
|
|
2021-07-14 00:12:21 -05:00
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
def base_datos():
|
|
|
|
db.create_tables()
|
|
|
|
return
|
|
|
|
|
|
|
|
|
2021-07-14 21:20:57 -05:00
|
|
|
def _validate_requests_args(args):
|
2021-07-14 00:12:21 -05:00
|
|
|
result, data = _validate_fiel_args(args)
|
|
|
|
if not result:
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
if not data['path_enc'].is_file():
|
|
|
|
msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}"
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
cer = data['path_cer'].read_bytes()
|
|
|
|
key = data['path_enc'].read_bytes()
|
|
|
|
cert = SATCertificate(cer, key)
|
|
|
|
|
|
|
|
if not cert.is_valid_time:
|
|
|
|
msg = 'La FIEL no es vigente'
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
data['cert'] = cert
|
|
|
|
data['type'] = args.type
|
|
|
|
data['year'] = args.year
|
|
|
|
data['month'] = args.month
|
|
|
|
data['day'] = args.day
|
|
|
|
|
|
|
|
if data['day']:
|
|
|
|
if not validate_date(data['year'], data['month'], data['day']):
|
|
|
|
msg = 'Fecha inválida'
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
2021-07-14 12:14:58 -05:00
|
|
|
now = today()
|
|
|
|
|
|
|
|
month1 = month2 = data['month']
|
|
|
|
if month1 == 0:
|
|
|
|
month1 = 1
|
|
|
|
month2 = 12
|
|
|
|
|
|
|
|
if data['day']:
|
|
|
|
day1 = day2 = data['day']
|
|
|
|
else:
|
|
|
|
day1 = 1
|
|
|
|
day2 = monthrange(data['year'], month2)[1]
|
|
|
|
|
|
|
|
data['date_start'] = datetime(data['year'], month1, day1)
|
|
|
|
data['date_end'] = datetime(data['year'], month2, day2, 23, 59, 59)
|
|
|
|
|
2021-07-14 00:12:21 -05:00
|
|
|
return True, data
|
|
|
|
|
|
|
|
|
2021-07-14 21:20:57 -05:00
|
|
|
def _validate_verificar_args(args):
|
|
|
|
result, data = _validate_fiel_args(args)
|
|
|
|
if not result:
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
if not data['path_enc'].is_file():
|
|
|
|
msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}"
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
cer = data['path_cer'].read_bytes()
|
|
|
|
key = data['path_enc'].read_bytes()
|
|
|
|
cert = SATCertificate(cer, key)
|
|
|
|
|
|
|
|
if not cert.is_valid_time:
|
|
|
|
msg = 'La FIEL no es vigente'
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
if not args.id_request:
|
|
|
|
msg = 'El ID de solicitud de descarga es requerido'
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
data['cert'] = cert
|
|
|
|
data['id'] = args.id_request
|
|
|
|
|
|
|
|
return True, data
|
|
|
|
|
|
|
|
|
|
|
|
def _validate_download_args(args):
|
|
|
|
result, data = _validate_fiel_args(args)
|
|
|
|
if not result:
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
if not data['path_enc'].is_file():
|
|
|
|
msg = f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}"
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
cer = data['path_cer'].read_bytes()
|
|
|
|
key = data['path_enc'].read_bytes()
|
|
|
|
cert = SATCertificate(cer, key)
|
|
|
|
|
|
|
|
if not cert.is_valid_time:
|
|
|
|
msg = 'La FIEL no es vigente'
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
if not args.id_request and not args.id_file:
|
|
|
|
msg = 'El ID de solicitud o ID de archivo de descarga es requerido'
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
2021-07-14 23:06:48 -05:00
|
|
|
if not args.path_download:
|
|
|
|
msg = 'La ruta de descarga es requerida [-dd]'
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
2021-07-14 21:20:57 -05:00
|
|
|
data['cert'] = cert
|
|
|
|
data['id'] = args.id_request
|
|
|
|
data['id_file'] = args.id_file
|
2021-07-14 23:06:48 -05:00
|
|
|
data['path'] = args.path_download
|
2021-07-14 21:20:57 -05:00
|
|
|
|
|
|
|
return True, data
|
|
|
|
|
|
|
|
|
|
|
|
def solicitar_descarga(args):
|
|
|
|
result, data = _validate_requests_args(args)
|
2021-07-14 00:12:21 -05:00
|
|
|
if not result:
|
|
|
|
return
|
|
|
|
|
|
|
|
sat = SATWebService(data['cert'])
|
|
|
|
|
2021-07-14 12:14:58 -05:00
|
|
|
if not sat.is_authenticate:
|
2021-07-14 00:12:21 -05:00
|
|
|
log.error(sat.error)
|
2021-07-14 12:14:58 -05:00
|
|
|
return
|
|
|
|
|
|
|
|
result = sat.request_download(data)
|
|
|
|
print(result)
|
2021-07-14 00:12:21 -05:00
|
|
|
|
|
|
|
return
|
|
|
|
|
2021-07-14 21:20:57 -05:00
|
|
|
|
|
|
|
def verificar_descarga(args):
|
|
|
|
result, data = _validate_verificar_args(args)
|
|
|
|
if not result:
|
|
|
|
return
|
|
|
|
|
|
|
|
sat = SATWebService(data['cert'])
|
|
|
|
|
|
|
|
if not sat.is_authenticate:
|
|
|
|
log.error(sat.error)
|
|
|
|
return
|
|
|
|
|
|
|
|
result = sat.verify(data)
|
|
|
|
print(result)
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
def descargar_archivos(args):
|
|
|
|
result, data = _validate_download_args(args)
|
|
|
|
if not result:
|
|
|
|
return
|
|
|
|
|
|
|
|
sat = SATWebService(data['cert'])
|
|
|
|
|
|
|
|
if not sat.is_authenticate:
|
|
|
|
log.error(sat.error)
|
|
|
|
return
|
|
|
|
|
|
|
|
if args.id_file:
|
|
|
|
files = (args.id_file,)
|
|
|
|
else:
|
|
|
|
result = sat.verify(data)
|
|
|
|
files = result['files']
|
|
|
|
|
|
|
|
for f in files:
|
|
|
|
data['id_file'] = f
|
|
|
|
result, file_data = sat.download(data)
|
|
|
|
if file_data is None:
|
|
|
|
log.debug(result)
|
|
|
|
else:
|
2021-07-14 23:06:48 -05:00
|
|
|
path_zip = join(data['path'], f'{f}.zip')
|
|
|
|
msg = f'Guardando: {path_zip}'
|
2021-07-14 21:20:57 -05:00
|
|
|
log.info(msg)
|
2021-07-14 23:06:48 -05:00
|
|
|
with open(path_zip, 'wb') as f:
|
2021-07-14 21:20:57 -05:00
|
|
|
f.write(file_data)
|
|
|
|
msg = f'\tArchivo guardado correctamente'
|
|
|
|
log.info(msg)
|
|
|
|
return
|
|
|
|
|
|
|
|
|
2021-07-14 23:06:48 -05:00
|
|
|
def _validate_args(args):
|
|
|
|
result, data = _validate_requests_args(args)
|
|
|
|
if not result:
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
if not args.path_download:
|
|
|
|
msg = 'La ruta de descarga es requerida [-dd]'
|
|
|
|
log.error(msg)
|
|
|
|
return False, {}
|
|
|
|
|
|
|
|
data['path'] = args.path_download
|
|
|
|
|
|
|
|
return True, data
|
|
|
|
|
|
|
|
|
|
|
|
def descargar(args):
|
|
|
|
OK = '5000'
|
|
|
|
|
|
|
|
result, data = _validate_args(args)
|
|
|
|
if not result:
|
|
|
|
return
|
|
|
|
|
|
|
|
sat = SATWebService(data['cert'])
|
|
|
|
|
|
|
|
if not sat.is_authenticate:
|
|
|
|
log.error(sat.error)
|
|
|
|
return
|
|
|
|
|
|
|
|
result = sat.request_download(data)
|
|
|
|
if result['CodEstatus'] != OK:
|
|
|
|
log.error(result)
|
|
|
|
return
|
|
|
|
|
2021-07-14 23:15:38 -05:00
|
|
|
msg = 'Esperando un minuto para verificar la descarga...'
|
|
|
|
log.info(msg)
|
|
|
|
sleep(60)
|
|
|
|
|
2021-07-14 23:06:48 -05:00
|
|
|
data['id'] = result['IdSolicitud']
|
|
|
|
while True:
|
|
|
|
result = sat.verify(data)
|
|
|
|
if result['EstadoSolicitud'] in ('1', '2'):
|
2021-07-14 23:15:38 -05:00
|
|
|
msg = 'Esperando un minuto más para volver a verificar...'
|
2021-07-14 23:06:48 -05:00
|
|
|
log.info(msg)
|
|
|
|
sleep(60)
|
|
|
|
continue
|
|
|
|
|
|
|
|
if result['EstadoSolicitud'] == '3':
|
|
|
|
for f in result['files']:
|
|
|
|
data['id_file'] = f
|
|
|
|
result, file_data = sat.download(data)
|
|
|
|
if file_data is None:
|
|
|
|
log.debug(result)
|
|
|
|
else:
|
|
|
|
path_zip = join(data['path'], f'{f}.zip')
|
|
|
|
msg = f'Guardando: {path_zip}'
|
|
|
|
log.info(msg)
|
|
|
|
with open(path_zip, 'wb') as f:
|
|
|
|
f.write(file_data)
|
2021-07-14 23:12:42 -05:00
|
|
|
msg = f'\tArchivo guardado correctamente'
|
|
|
|
log.info(msg)
|
2021-07-14 23:06:48 -05:00
|
|
|
break
|
|
|
|
|
|
|
|
log.error(result)
|
2021-07-14 23:12:42 -05:00
|
|
|
break
|
2021-07-14 23:06:48 -05:00
|
|
|
|
|
|
|
return
|
2021-07-14 21:20:57 -05:00
|
|
|
|