339 lines
8.5 KiB
Python
339 lines
8.5 KiB
Python
#!/usr/bin/env python
|
|
|
|
import datetime
import getpass
import hashlib
import json
import mimetypes
import os
import re
import shlex
import sqlite3
import subprocess
import tempfile
import unicodedata
import uuid

from dateutil import parser

from settings import DEBUG, log, template_lookup, COMPANIES, DB_SAT
|
|
|
|
|
|
#~ def _get_hash(password):
|
|
#~ return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
|
|
|
|
|
|
#~ def validate_password(hashed, password):
|
|
#~ return bcrypt.hashpw(password.encode(), hashed.encode()) == hashed.encode()
|
|
|
|
|
|
def _call(args):
    """Run the command line *args* through the shell and return its stdout.

    The raw bytes captured from the process are decoded with the default
    codec.  A non-zero exit status surfaces as the exception raised by
    ``subprocess.check_output``.
    """
    raw_output = subprocess.check_output(args, shell=True)
    return raw_output.decode()
|
|
|
|
|
|
def _save_temp(data, modo='wb'):
    """Write *data* to a new temporary file and return the file's path.

    Args:
        data: Content to write; bytes for the default binary mode, text when
            *modo* is a text mode.
        modo: Mode used to open the temporary file (``'wb'`` by default).

    Returns:
        Path of the temporary file.  The caller is responsible for
        removing it.
    """
    # mkstemp() returns an already-open descriptor; wrap it instead of
    # discarding it so it is closed when the ``with`` block ends.
    fd, path = tempfile.mkstemp()
    with os.fdopen(fd, modo) as f:
        f.write(data)
    return path
|
|
|
|
|
|
def get_pass():
    """Prompt twice for a password on the terminal and validate the answer.

    Returns:
        ``(True, password)`` with the stripped password when both entries
        match and are not blank, otherwise ``(False, message)`` with the
        reason for the rejection.
    """
    first = getpass.getpass('Introduce la contraseña: ')
    second = getpass.getpass('Confirma la contraseña: ')

    # The two entries are compared exactly as typed, before stripping.
    if first != second:
        return False, 'Las contraseñas son diferentes'

    stripped = first.strip()
    if stripped:
        return True, stripped
    return False, 'La contraseña es necesaria'
|
|
|
|
|
|
def get_value(arg):
    """Ask the user for the value named *arg* and return it stripped.

    When the answer is blank the problem is logged as an error and an
    empty string is returned.
    """
    answer = input('Introduce el {}: '.format(arg)).strip()
    if answer:
        return answer

    log.error('El {} es requerido'.format(arg))
    return ''
|
|
|
|
|
|
def _get_args(rfc):
    """Return the ``con`` column stored for *rfc* in the companies database.

    Args:
        rfc: RFC to look up in the ``names`` table.

    Returns:
        The stored value, or an empty string (after logging an error) when
        no row matches.
    """
    sql = "SELECT con FROM names WHERE rfc=?"
    con = sqlite3.connect(COMPANIES)
    try:
        cursor = con.cursor()
        cursor.execute(sql, (rfc,))
        values = cursor.fetchone()
        cursor.close()
    finally:
        # Always release the connection, including on the not-found path
        # and when the query raises.
        con.close()

    if values is None:
        msg = 'No se encontró el RFC'
        log.error(msg)
        return ''

    return values[0]
|
|
|
|
|
|
def get_con(rfc=''):
    """Return the decoded connection settings stored for *rfc*.

    When *rfc* is empty the user is prompted for it.  Returns ``False``
    when no RFC is supplied or when nothing is stored for it; otherwise
    the stored JSON text is decoded and returned.
    """
    rfc = rfc or get_value('RFC').upper()
    if not rfc:
        return False

    stored = _get_args(rfc.upper())
    return loads(stored) if stored else False
|
|
|
|
|
|
def get_rfcs():
    """Return every row of the ``names`` table in the companies database.

    Returns:
        A list of row tuples as produced by ``cursor.fetchall()``.
    """
    con = sqlite3.connect(COMPANIES)
    try:
        cursor = con.cursor()
        cursor.execute("SELECT * FROM names")
        values = cursor.fetchall()
        cursor.close()
    finally:
        # Release the connection even when the query raises.
        con.close()
    return values
|
|
|
|
|
|
def get_sat_key(table, key):
    """Look up *key* in *table* of the SAT catalogue database.

    Args:
        table: Name of the table to query.
        key: Value matched against the table's ``key`` column.

    Returns:
        ``{'ok': True, 'text': description}`` when a row is found,
        otherwise ``{'ok': False, 'text': 'No se encontró la clave'}``.
    """
    # NOTE(review): table names cannot be bound as SQL parameters, so
    # *table* is interpolated into the statement; callers must only pass
    # trusted table names.
    sql = 'SELECT key, description FROM {} WHERE key=?'.format(table)
    con = sqlite3.connect(DB_SAT)
    try:
        cursor = con.cursor()
        cursor.execute(sql, (key,))
        data = cursor.fetchone()
        cursor.close()
    finally:
        # Release the connection even when the query raises.
        con.close()

    if data is None:
        return {'ok': False, 'text': 'No se encontró la clave'}
    return {'ok': True, 'text': data[1]}
|
|
|
|
|
|
def now():
    """Return the current local date and time truncated to whole seconds."""
    current = datetime.datetime.now()
    return current.replace(microsecond=0)
|
|
|
|
|
|
def get_token():
    """Return a new random token as a 64-character hexadecimal string.

    The token is the SHA-256 digest of a freshly generated UUID4.  The
    ``_get_hash`` helper this function used to call only exists as
    commented-out code in this file, so the previous implementation raised
    ``NameError`` on every call.
    """
    seed = uuid.uuid4().hex
    return hashlib.sha256(seed.encode()).hexdigest()
|
|
|
|
|
|
def get_mimetype(path):
    """Guess the MIME type of *path* from its name.

    Falls back to ``'application/octet-stream'`` when no type can be
    guessed.
    """
    guessed, _encoding = mimetypes.guess_type(path)
    if guessed:
        return guessed
    return 'application/octet-stream'
|
|
|
|
|
|
def is_file(path):
    """Tell whether *path* refers to an existing regular file."""
    exists = os.path.isfile(path)
    return exists
|
|
|
|
|
|
def get_stream(path):
    """Return ``(file_object, size)`` for *path*.

    The file object comes from ``get_file`` and the size from
    ``get_size``; the caller is responsible for closing the file.
    """
    handle = get_file(path)
    size = get_size(path)
    return handle, size
|
|
|
|
|
|
def get_file(path):
    """Open *path* for binary reading and return the open file object.

    The caller is responsible for closing the returned file.
    """
    handle = open(path, 'rb')
    return handle
|
|
|
|
|
|
def get_size(path):
    """Return the size of the file at *path* in bytes."""
    size = os.path.getsize(path)
    return size
|
|
|
|
|
|
def get_template(name, data=None):
    """Render the template called *name* and return the result.

    Args:
        name: Template name resolved through ``template_lookup``.
        data: Optional mapping whose items are passed to the template as
            keyword arguments.  ``None`` (the default) renders the
            template without arguments.

    Returns:
        Whatever the template's ``render`` method returns.
    """
    # A None default avoids sharing one mutable dict between calls.
    if data is None:
        data = {}
    template = template_lookup.get_template(name)
    return template.render(**data)
|
|
|
|
|
|
def dumps(data):
    """Serialize *data* to a JSON string.

    Objects that JSON cannot encode natively are converted with ``str``.
    """
    encoded = json.dumps(data, default=str)
    return encoded
|
|
|
|
|
|
def loads(data):
    """Decode the JSON document *data* and return the resulting object."""
    decoded = json.loads(data)
    return decoded
|
|
|
|
|
|
def clean(values):
    """Strip surrounding whitespace from every string value of *values*.

    The mapping is modified in place and also returned; non-string values
    are left untouched.
    """
    stripped = {
        name: item.strip()
        for name, item in values.items()
        if isinstance(item, str)
    }
    # Only existing keys are overwritten, so key order is unchanged.
    values.update(stripped)
    return values
|
|
|
|
|
|
def parse_con(values):
    """Parse a ``|``-separated connection string into a dict.

    For the ``sqlite`` type the second field is the database name.  For
    any other type the fields are, in order: type, host, port, name, user
    and password; host and port are only included when not empty.

    Returns an empty dict when a required field is missing.
    """
    fields = values.split('|')
    kind = fields[0]
    try:
        if kind == 'sqlite':
            return {'type': kind, 'name': fields[1]}

        con = {'type': kind}
        optional = (('host', fields[1]), ('port', fields[2]))
        con.update((label, text) for label, text in optional if text)
        con.update(name=fields[3], user=fields[4], password=fields[5])
        return con
    except IndexError:
        return {}
|
|
|
|
|
|
def spaces(value):
    """Collapse every run of whitespace in *value* into a single space.

    Leading and trailing whitespace is removed as well.
    """
    words = value.split()
    return ' '.join(words)
|
|
|
|
|
|
def to_slug(string):
    """Return *string* lowercased and reduced to plain ASCII.

    The text is NFKD-normalized first so that accented letters keep their
    base letter; characters that still cannot be encoded as ASCII are
    dropped.
    """
    decomposed = unicodedata.normalize('NFKD', string)
    ascii_bytes = decomposed.encode('ascii', 'ignore')
    return ascii_bytes.decode('ascii').lower()
|
|
|
|
|
|
class Certificado(object):
    """Validate a certificate/private-key pair with the ``openssl`` CLI.

    The raw DER contents of both files are written to temporary files so
    they can be handed to ``openssl``.  ``validate`` returns the extracted
    data; when something fails it returns an empty dict and leaves a
    user-facing message in ``error``.
    """

    def __init__(self, key, cer):
        """Store the raw *key* and *cer* contents and write the temp copies."""
        self._key = key
        self._cer = cer
        self._modulus = ''
        self._path_key = ''
        self._path_cer = ''
        self.error = ''
        self._save_files()

    def _save_files(self):
        """Write key and certificate to temp files; blank both paths on failure."""
        self._path_key = ''
        self._path_cer = ''
        try:
            self._path_key = _save_temp(self._key)
            self._path_cer = _save_temp(self._cer)
        except Exception:
            # Deliberately broad: any failure is reported later by
            # validate().  Remove a file that was already written so the
            # key material is not left behind.
            self._remove_files()
        return

    def _remove_files(self):
        """Delete the temp copies of key and certificate and forget their paths."""
        for path in (self._path_key, self._path_cer):
            if path:
                self._kill(path)
        self._path_key = ''
        self._path_cer = ''

    def _kill(self, path):
        """Remove *path*, ignoring the error when it cannot be removed."""
        try:
            os.remove(path)
        except OSError:
            # Best-effort cleanup of temporary files.
            pass
        return

    @staticmethod
    def _new_temp_path():
        """Create an empty temp file and return its path with no open descriptor."""
        fd, path = tempfile.mkstemp()
        os.close(fd)
        return path

    def _get_info_cer(self, session_rfc):
        """Extract the certificate data, checking it against *session_rfc*.

        Returns a dict with the keys ``cer``, ``cer_tmp``, ``cer_pem``,
        ``cer_txt``, ``serie``, ``rfc``, ``desde`` and ``hasta``.  On a
        handled failure an empty dict is returned and ``self.error`` is
        set.  Failures while reading serial, subject, dates or modulus are
        not handled here and propagate to the caller.
        """
        data = {}
        cer_path = shlex.quote(self._path_cer)

        try:
            cer_pem = _call('openssl x509 -inform DER -in {}'.format(cer_path))
        except Exception:
            self.error = 'No se pudo convertir el CER en PEM'
            return data

        try:
            cer_txt = _call('openssl enc -base64 -in {}'.format(cer_path))
        except Exception:
            self.error = 'No se pudo convertir el CER en TXT'
            return data

        args = 'openssl x509 -inform DER -in {} -noout -{}'
        try:
            # The fourth line of the "purpose" report is the SSL server line.
            result = _call(args.format(cer_path, 'purpose')).split('\n')[3]
        except Exception:
            self.error = 'No se puede saber si es FIEL'
            return data

        if result == 'SSL server : No':
            self.error = 'El certificado es FIEL'
            return data

        result = _call(args.format(cer_path, 'serial'))
        # Keep every second character of the reported serial; presumably
        # the serial is hex-encoded ASCII digits -- TODO confirm.
        serie = result.split('=')[1].split('\n')[0][1::2]
        result = _call(args.format(cer_path, 'subject'))
        rfc = result.split('=')[5].split('/')[0].strip()

        # The RFC check is skipped when DEBUG is enabled.
        if not DEBUG and rfc != session_rfc:
            self.error = 'El RFC del certificado no corresponde.'
            return data

        dates = _call(args.format(cer_path, 'dates')).split('\n')
        desde = parser.parse(dates[0].split('=')[1])
        hasta = parser.parse(dates[1].split('=')[1])
        # Kept so _get_info_key can check that the key matches.
        self._modulus = _call(args.format(cer_path, 'modulus'))

        data['cer'] = self._cer
        data['cer_tmp'] = None
        data['cer_pem'] = cer_pem
        data['cer_txt'] = cer_txt.replace('\n', '')
        data['serie'] = serie
        data['rfc'] = rfc
        data['desde'] = desde
        data['hasta'] = hasta
        return data

    def _get_p12(self, password, rfc):
        """Build a PKCS#12 bundle from the pair and return its bytes.

        The bundle is named *rfc* and protected with the MD5 hex digest of
        *rfc*.  All intermediate temp files are removed even when one of
        the ``openssl`` calls fails.
        """
        # NOTE(review): the passwords are passed on the command line and
        # are visible in the process list while openssl runs.
        tmp_paths = []
        try:
            for _ in range(3):
                tmp_paths.append(self._new_temp_path())
            tmp_cer, tmp_key, tmp_p12 = tmp_paths

            args = 'openssl x509 -inform DER -in {} -out {}'
            _call(args.format(shlex.quote(self._path_cer),
                              shlex.quote(tmp_cer)))

            args = 'openssl pkcs8 -inform DER -in {} -passin {} -out {}'
            _call(args.format(shlex.quote(self._path_key),
                              shlex.quote('pass:{}'.format(password)),
                              shlex.quote(tmp_key)))

            passout = 'pass:{}'.format(hashlib.md5(rfc.encode()).hexdigest())
            args = ('openssl pkcs12 -export -in {} -inkey {} -name {} '
                    '-passout {} -out {}')
            _call(args.format(shlex.quote(tmp_cer), shlex.quote(tmp_key),
                              shlex.quote(rfc), shlex.quote(passout),
                              shlex.quote(tmp_p12)))

            with open(tmp_p12, 'rb') as f:
                return f.read()
        finally:
            for path in tmp_paths:
                self._kill(path)

    def _get_info_key(self, password, rfc):
        """Check the private key against the certificate and re-encrypt it.

        Returns a dict with the keys ``key``, ``key_tmp``, ``key_enc`` and
        ``p12``.  When *password* does not open the key, or the key's
        modulus differs from the certificate's, an empty dict is returned
        and ``self.error`` is set.
        """
        data = {}

        # Quote everything user-supplied: the commands run through a shell.
        decrypt = 'openssl pkcs8 -inform DER -in {} -passin {}'.format(
            shlex.quote(self._path_key),
            shlex.quote('pass:{}'.format(password)))
        try:
            _call(decrypt)
        except Exception:
            self.error = 'Contraseña incorrecta'
            return data

        mod_key = _call(decrypt + ' | openssl rsa -noout -modulus')
        if self._modulus != mod_key:
            self.error = 'Los archivos no son pareja'
            return data

        passout = 'pass:{}'.format(hashlib.md5(rfc.encode()).hexdigest())
        key_enc = _call(decrypt + ' | openssl rsa -des3 -passout {}'.format(
            shlex.quote(passout)))

        data['key'] = self._key
        data['key_tmp'] = None
        data['key_enc'] = key_enc
        data['p12'] = self._get_p12(password, rfc)
        return data

    def validate(self, password, rfc):
        """Validate the pair and return the combined certificate/key data.

        Args:
            password: Password of the private key.
            rfc: RFC the certificate must belong to (only enforced when
                DEBUG is off).

        Returns:
            The merged dicts of ``_get_info_cer`` and ``_get_info_key``, or
            an empty dict when validation fails; ``self.error`` then holds
            the reason.
        """
        self.error = ''
        if not self._path_key or not self._path_cer:
            # The temp copies are removed after every run; recreate them so
            # the same instance can be validated again.
            self._save_files()
        if not self._path_key or not self._path_cer:
            self.error = 'Error al cargar el certificado'
            return {}

        try:
            data = self._get_info_cer(rfc)
            if not data:
                # Stop here so the certificate error is not overwritten by
                # a misleading key error.
                return {}

            llave = self._get_info_key(password, rfc)
            if not llave:
                return {}

            data.update(llave)
            return data
        finally:
            # Never leave the private key on disk, even on failure.
            self._remove_files()
|