2021-07-14 00:12:21 -05:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
|
|
|
import getpass
|
|
|
|
import uuid
|
2021-07-14 12:14:58 -05:00
|
|
|
from calendar import monthrange
|
2021-07-28 22:36:14 -05:00
|
|
|
from datetime import datetime, timedelta
|
2021-07-14 00:12:21 -05:00
|
|
|
from pathlib import Path
|
2021-07-14 23:06:48 -05:00
|
|
|
from time import sleep
|
2021-07-14 00:12:21 -05:00
|
|
|
|
|
|
|
from .cfdi_cert import SATCertificate
|
|
|
|
from .sat_web import SATWebService
|
|
|
|
from settings import log
|
|
|
|
|
|
|
|
|
|
|
|
def today():
    """Return the current local date and time as a naive ``datetime``."""
    current = datetime.today()
    return current
|
|
|
|
|
|
|
|
|
|
|
|
def validate_date(year, month, day):
    """Tell whether ``year``, ``month`` and ``day`` form a valid date.

    Returns:
        ``True`` when ``datetime`` accepts the three values, ``False`` when
        it rejects them with ``ValueError``.
    """
    try:
        datetime(year, month, day, 0, 0, 0)
    except ValueError:
        return False
    return True
|
|
|
|
|
|
|
|
|
|
|
|
def is_dir(path):
    """Return ``True`` when ``path`` points to an existing directory."""
    candidate = Path(path)
    return candidate.is_dir()
|
|
|
|
|
|
|
|
|
|
|
|
def join(*paths):
    """Join the given path segments into a single ``Path``.

    The first segment is the base; every following segment is appended to it
    with ``Path.joinpath``.
    """
    base = paths[0]
    remainder = paths[1:]
    return Path(base).joinpath(*remainder)
|
|
|
|
|
|
|
|
|
2021-07-15 18:07:38 -05:00
|
|
|
def _validate_fiel_args(args, need_key=False):
    """Check the FIEL location taken from ``args`` and build its file paths.

    Args:
        args: Object exposing the ``fiel_dir`` and ``fiel_nombre`` attributes.
        need_key: When true, the ``.key`` file must exist too.

    Returns:
        ``(True, paths)`` on success, where ``paths`` maps ``path_cer``,
        ``path_key``, ``path_enc`` and ``path_pem`` to the files named after
        ``fiel_nombre`` inside ``fiel_dir``. On any failure the problem is
        logged as an error and ``(False, {})`` is returned.
    """
    def _fail(message):
        log.error(message)
        return False, {}

    directory = args.fiel_dir
    name = args.fiel_nombre

    if not directory:
        return _fail('El directorio con la FIEL es requerido')

    if not is_dir(directory):
        return _fail(
            f'La ruta no existe o no es un directorio. \nRuta: {directory}')

    # One entry per file extension, all living in the same directory.
    paths = {
        f'path_{ext}': join(directory, f'{name}.{ext}')
        for ext in ('cer', 'key', 'enc', 'pem')
    }

    if not paths['path_cer'].is_file():
        return _fail(
            f"No se encontró el archivo CER. \nRuta: {paths['path_cer']}")

    if need_key and not paths['path_key'].is_file():
        return _fail(
            f"No se encontró el archivo KEY. \nRuta: {paths['path_key']}")

    return True, paths
|
|
|
|
|
|
|
|
|
|
|
|
def fiel_validar(args):
    """Validate the FIEL interactively and save its ``.enc`` key file.

    Prompts for the password of the KEY file, builds a ``SATCertificate``
    from the CER and KEY files and, when the certificate reports no error and
    is a FIEL, writes ``cert.key_enc`` to the ``.enc`` path and logs the
    certificate data. Every failure is logged and ends the function early.

    Args:
        args: Object exposing the ``fiel_dir`` and ``fiel_nombre`` attributes.
    """
    ok, paths = _validate_fiel_args(args, True)
    if not ok:
        return

    password = getpass.getpass('Introduce la contraseña del archivo KEY: ')
    if not password:
        log.error('La contraseña es requerida para validar la FIEL')
        return

    cer_bytes = paths['path_cer'].read_bytes()
    key_bytes = paths['path_key'].read_bytes()
    cert = SATCertificate(cer_bytes, key_bytes, password=password)

    problem = None
    if cert.error:
        problem = f'{cert.error}\n\nNo podrás conectarte el SAT.'
    elif not cert.is_fiel:
        problem = 'El certificado no es FIEL'
    if problem is not None:
        log.error(problem)
        return

    paths['path_enc'].write_bytes(cert.key_enc)

    log.info('Los datos del certificado son:')
    log.info(f'\n{cert}')
    log.info('Ya puedes descargar del SAT')
|
|
|
|
|
|
|
|
|
|
|
|
def base_datos():
    """Create the database tables by calling ``db.create_tables()``."""
    # NOTE(review): ``db`` is not among the imports visible at the top of
    # this file -- confirm where it is expected to come from.
    db.create_tables()
|
|
|
|
|
|
|
|
|
2021-07-15 18:07:38 -05:00
|
|
|
def _get_cert(data):
    """Build a ``SATCertificate`` from the FIEL files listed in ``data``.

    The ``.enc`` file is preferred; the ``.pem`` file is only read when the
    ``.enc`` file does not exist. Whichever one is missing is passed as
    empty bytes.

    Args:
        data: Mapping with the ``path_cer``, ``path_enc`` and ``path_pem``
            paths.

    Returns:
        The ``SATCertificate`` created from the CER bytes plus the key found.
    """
    cer_bytes = data['path_cer'].read_bytes()
    enc_bytes = b''
    pem_bytes = b''

    enc_path = data['path_enc']
    if enc_path.is_file():
        enc_bytes = enc_path.read_bytes()
    else:
        pem_path = data['path_pem']
        if pem_path.is_file():
            pem_bytes = pem_path.read_bytes()

    return SATCertificate(cer_bytes, enc_bytes, pem_bytes)
|
|
|
|
|
2021-07-30 22:43:40 -05:00
|
|
|
def _to_date(str_date, end=False):
    """Parse a ``YYYY-MM-DD`` string into a ``datetime``.

    Args:
        str_date: Date text made of exactly three ``-`` separated integers
            (year, month, day).
        end: When true the time is set to 23:59:59 (end of the day);
            otherwise to 00:00:00.

    Returns:
        ``(error, dt)``: ``('', datetime)`` on success or
        ``('Fecha inválida', None)`` when the text cannot be parsed.
    """
    try:
        # Unpacking demands exactly three components, so trailing pieces
        # such as '2021-07-14-99' are rejected instead of silently ignored.
        year, month, day = (int(part) for part in str_date.split('-'))
        hour, minute, second = (23, 59, 59) if end else (0, 0, 0)
        dt = datetime(year, month, day, hour, minute, second)
    except (AttributeError, OverflowError, TypeError, ValueError):
        # AttributeError/TypeError: the input is not a string;
        # ValueError/OverflowError: wrong shape or out-of-range values.
        return 'Fecha inválida', None

    return '', dt
|
|
|
|
|
2021-07-15 18:07:38 -05:00
|
|
|
|
2021-07-14 21:20:57 -05:00
|
|
|
def _validate_requests_args(args):
    """Validate the arguments of a download request and compute its dates.

    After checking the FIEL files and that the certificate is currently
    valid, the date range is resolved from the first option present:

    1. ``args.last_days``: from midnight that many days ago until 23:59:59
       of today.
    2. ``args.date_start`` and ``args.date_end``: parsed with ``_to_date``.
    3. ``args.year`` / ``args.month`` / ``args.day``: a single day when
       ``day`` is set, otherwise the whole month; a ``month`` of ``0``
       selects January through December.

    Args:
        args: Object exposing ``fiel_dir``, ``fiel_nombre``, ``type``,
            ``year``, ``month``, ``day``, ``last_days``, ``date_start`` and
            ``date_end``.

    Returns:
        ``(True, data)`` with ``cert``, ``type``, ``year``, ``month``,
        ``day``, ``date_start`` and ``date_end`` added to the FIEL paths, or
        ``(False, {})`` after logging the error.
    """
    result, data = _validate_fiel_args(args)
    if not result:
        return False, {}

    if not data['path_enc'].is_file() and not data['path_pem'].is_file():
        msg = f"No se encontró la FIEL [enc|pem].\nRuta: {data['path_enc']}"
        log.error(msg)
        return False, {}

    cert = _get_cert(data)
    if not cert.is_valid_time:
        msg = 'La FIEL no es vigente'
        log.error(msg)
        return False, {}

    data['cert'] = cert
    data['type'] = args.type
    data['year'] = args.year
    data['month'] = args.month
    data['day'] = args.day

    if args.last_days:
        now = today()
        midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
        data['date_start'] = midnight - timedelta(days=args.last_days)
        data['date_end'] = now.replace(
            hour=23, minute=59, second=59, microsecond=0)
        return True, data

    if args.date_start and args.date_end:
        error, date_start = _to_date(args.date_start)
        if error:
            log.error(error)
            return False, {}

        error, date_end = _to_date(args.date_end, True)
        if error:
            log.error(error)
            return False, {}

        data['date_start'] = date_start
        data['date_end'] = date_end
        return True, data

    year, month, day = data['year'], data['month'], data['day']

    if day and not validate_date(year, month, day):
        msg = 'Fecha inválida'
        log.error(msg)
        return False, {}

    month1 = month2 = month
    if month1 == 0:
        # Month 0 selects the whole year.
        month1 = 1
        month2 = 12

    try:
        if day:
            day1 = day2 = day
        else:
            day1 = 1
            day2 = monthrange(year, month2)[1]

        date_start = datetime(year, month1, day1)
        date_end = datetime(year, month2, day2, 23, 59, 59)
    except ValueError:
        # Without a day no validation has happened yet, so an out-of-range
        # month or year is reported here instead of raising to the caller.
        msg = 'Fecha inválida'
        log.error(msg)
        return False, {}

    data['date_start'] = date_start
    data['date_end'] = date_end
    return True, data
|
|
|
|
|
|
|
|
|
2021-07-14 21:20:57 -05:00
|
|
|
def _validate_verificar_args(args):
    """Validate the arguments needed to verify a download request.

    Requires the FIEL ``.cer`` and ``.enc`` files, a certificate that is
    currently valid and a non-empty ``args.id_request``.

    Args:
        args: Object exposing ``fiel_dir``, ``fiel_nombre`` and
            ``id_request``.

    Returns:
        ``(True, data)`` with ``cert`` and ``id`` added to the FIEL paths, or
        ``(False, {})`` after logging the error.
    """
    def _fail(message):
        log.error(message)
        return False, {}

    ok, data = _validate_fiel_args(args)
    if not ok:
        return False, {}

    enc_path = data['path_enc']
    if not enc_path.is_file():
        return _fail(
            f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}")

    cert = SATCertificate(data['path_cer'].read_bytes(), enc_path.read_bytes())
    if not cert.is_valid_time:
        return _fail('La FIEL no es vigente')

    if not args.id_request:
        return _fail('El ID de solicitud de descarga es requerido')

    data.update(cert=cert, id=args.id_request)
    return True, data
|
|
|
|
|
|
|
|
|
|
|
|
def _validate_download_args(args):
    """Validate the arguments needed to download already requested files.

    Requires the FIEL ``.cer`` and ``.enc`` files, a certificate that is
    currently valid, at least one of ``args.id_request`` / ``args.id_file``
    and a non-empty ``args.path_download``.

    Args:
        args: Object exposing ``fiel_dir``, ``fiel_nombre``, ``id_request``,
            ``id_file`` and ``path_download``.

    Returns:
        ``(True, data)`` with ``cert``, ``id``, ``id_file`` and ``path``
        added to the FIEL paths, or ``(False, {})`` after logging the error.
    """
    def _fail(message):
        log.error(message)
        return False, {}

    ok, data = _validate_fiel_args(args)
    if not ok:
        return False, {}

    enc_path = data['path_enc']
    if not enc_path.is_file():
        return _fail(
            f"No se encontró la FIEL encriptada. \nRuta: {data['path_enc']}")

    cert = SATCertificate(data['path_cer'].read_bytes(), enc_path.read_bytes())
    if not cert.is_valid_time:
        return _fail('La FIEL no es vigente')

    if not (args.id_request or args.id_file):
        return _fail(
            'El ID de solicitud o ID de archivo de descarga es requerido')

    if not args.path_download:
        return _fail('La ruta de descarga es requerida [-dd]')

    data['cert'] = cert
    data['id'] = args.id_request
    data['id_file'] = args.id_file
    data['path'] = args.path_download
    return True, data
|
|
|
|
|
|
|
|
|
2021-07-15 13:14:14 -05:00
|
|
|
def _request_download(sat, data, key):
    """Send one download request through ``sat`` and echo its response.

    Args:
        sat: Object providing ``request_download(data)``.
        data: Request mapping; its ``rfc`` entry is overwritten with ``key``.
        key: Value stored under ``data['rfc']`` before the request is sent.

    Returns:
        Whatever ``sat.request_download(data)`` returns.
    """
    data.update(rfc=key)
    response = sat.request_download(data)
    # A plain print lets the output be captured from any language.
    print(response)
    return response
|
|
|
|
|
|
|
|
|
2021-07-14 21:20:57 -05:00
|
|
|
def solicitar_descarga(args):
    """Request a download from the SAT without waiting for its files.

    Validates ``args``, authenticates with the certificate and sends one
    request per RFC role: ``'e'`` requests ``RfcEmisor``, ``'r'`` requests
    ``RfcReceptor`` and any other ``type`` requests both, in that order.
    """
    ok, data = _validate_requests_args(args)
    if not ok:
        return

    sat = SATWebService(data['cert'])
    if not sat.is_authenticate:
        log.error(sat.error)
        return

    kind = data['type']
    if kind == 'e':
        rfc_keys = ('RfcEmisor',)
    elif kind == 'r':
        rfc_keys = ('RfcReceptor',)
    else:
        rfc_keys = ('RfcEmisor', 'RfcReceptor')

    for rfc_key in rfc_keys:
        _request_download(sat, data, rfc_key)
|
|
|
|
|
2021-07-14 21:20:57 -05:00
|
|
|
|
|
|
|
def verificar_descarga(args):
    """Verify a download request and print the result of ``sat.verify``.

    Nothing is printed when the arguments are invalid; an authentication
    failure is logged as an error instead.
    """
    ok, data = _validate_verificar_args(args)
    if not ok:
        return

    sat = SATWebService(data['cert'])
    if sat.is_authenticate:
        # A plain print lets the output be captured from any language.
        print(sat.verify(data))
    else:
        log.error(sat.error)
|
|
|
|
|
|
|
|
|
|
|
|
def descargar_archivos(args):
    """Download the ZIP files of a request into ``args.path_download``.

    When ``args.id_file`` is given only that file is downloaded; otherwise
    the request is verified and every entry of the result's ``files`` is
    downloaded. Each file is saved as ``<id_file>.zip`` in the download
    directory. A download that returns no data is logged at debug level and
    skipped.

    Args:
        args: Object exposing ``fiel_dir``, ``fiel_nombre``, ``id_request``,
            ``id_file`` and ``path_download``.
    """
    result, data = _validate_download_args(args)
    if not result:
        return

    sat = SATWebService(data['cert'])
    if not sat.is_authenticate:
        log.error(sat.error)
        return

    if args.id_file:
        files = (args.id_file,)
    else:
        # NOTE(review): assumes the verify() result always has a 'files'
        # entry -- confirm for requests that are not finished yet.
        files = sat.verify(data)['files']

    for id_file in files:
        data['id_file'] = id_file
        result, file_data = sat.download(data)
        if file_data is None:
            log.debug(result)
            continue

        path_zip = join(data['path'], f'{id_file}.zip')
        log.info(f'Guardando: {path_zip}')
        path_zip.write_bytes(file_data)
        log.info('\tArchivo guardado correctamente')
|
|
|
|
|
|
|
|
|
2021-07-14 23:06:48 -05:00
|
|
|
def _validate_args(args):
    """Validate the arguments of the one-step request-and-download command.

    Runs the download-request validation and additionally requires a
    non-empty ``args.path_download``.

    Returns:
        ``(True, data)`` with ``path`` added to the request data, or
        ``(False, {})`` after logging the error.
    """
    ok, data = _validate_requests_args(args)
    if not ok:
        return False, {}

    target = args.path_download
    if target:
        data['path'] = target
        return True, data

    log.error('La ruta de descarga es requerida [-dd]')
    return False, {}
|
|
|
|
|
|
|
|
|
2021-07-15 13:14:14 -05:00
|
|
|
def _wait_with_countdown(seconds=60):
    """Sleep ``seconds`` seconds while printing the remaining time in place."""
    for remaining in range(seconds, 0, -1):
        # No newline is written, so flush explicitly or the countdown may
        # stay in the output buffer and never be shown while waiting.
        print(f'\r{remaining}', end=' ', flush=True)
        sleep(1)


def _download(sat, data, key):
    """Request a download, wait until it is ready and save its files.

    The request is sent with ``_request_download``; unless its
    ``CodEstatus`` is ``'5000'`` the response is logged as an error and
    nothing else happens. Otherwise the request is verified once per minute:

    * ``EstadoSolicitud`` ``'1'`` or ``'2'``: wait another minute and retry.
    * ``EstadoSolicitud`` ``'3'``: download every entry of ``files`` into
      ``data['path']`` as ``<id_file>.zip`` and stop.
    * anything else: log the verification result as an error and stop.

    Args:
        sat: Authenticated web service object providing ``verify`` and
            ``download``.
        data: Request mapping; ``rfc``, ``id`` and ``id_file`` are written
            into it and ``path`` is read from it.
        key: RFC role stored in ``data['rfc']`` for the request.
    """
    status_ok = '5000'

    result = _request_download(sat, data, key)
    if result['CodEstatus'] != status_ok:
        log.error(result)
        return

    data['id'] = result['IdSolicitud']

    msg = f"Descarga aceptada con el ID: {data['id']}\n\nEsperando un minuto para verificar la descarga..."
    log.info(msg)
    _wait_with_countdown()

    while True:
        status = sat.verify(data)
        state = status['EstadoSolicitud']

        if state in ('1', '2'):
            log.info('Esperando un minuto más para volver a verificar...')
            _wait_with_countdown()
            continue

        if state != '3':
            log.error(status)
            return

        for id_file in status['files']:
            data['id_file'] = id_file
            response, file_data = sat.download(data)
            if file_data is None:
                log.debug(response)
                continue

            path_zip = join(data['path'], f'{id_file}.zip')
            log.info(f'Guardando: {path_zip}')
            path_zip.write_bytes(file_data)
            log.info('\tArchivo guardado correctamente')
        return
|
2021-07-14 21:20:57 -05:00
|
|
|
|
2021-07-15 13:14:14 -05:00
|
|
|
|
|
|
|
def descargar(args):
    """Request, wait for and download CFDI files in a single step.

    Validates ``args``, authenticates with the certificate and runs
    ``_download`` once per RFC role: ``'e'`` uses ``RfcEmisor``, ``'r'`` uses
    ``RfcReceptor`` and any other ``type`` uses both, in that order.
    """
    ok, data = _validate_args(args)
    if not ok:
        return

    sat = SATWebService(data['cert'])
    if not sat.is_authenticate:
        log.error(sat.error)
        return

    kind = data['type']
    if kind == 'e':
        rfc_keys = ('RfcEmisor',)
    elif kind == 'r':
        rfc_keys = ('RfcReceptor',)
    else:
        rfc_keys = ('RfcEmisor', 'RfcReceptor')

    for rfc_key in rfc_keys:
        _download(sat, data, rfc_key)
|
|
|
|
|