empresa-libre/source/app/controllers/util.py

339 lines
8.5 KiB
Python
Raw Normal View History

2017-06-27 15:43:02 -05:00
#!/usr/bin/env python
import datetime
import getpass
2017-10-08 22:01:19 -05:00
import hashlib
2017-06-27 15:43:02 -05:00
import json
import mimetypes
import os
import re
2017-09-30 23:14:44 -05:00
import sqlite3
2017-10-08 22:01:19 -05:00
import subprocess
import tempfile
import unicodedata
2017-06-27 15:43:02 -05:00
import uuid
2017-10-08 22:01:19 -05:00
from dateutil import parser
2017-06-27 15:43:02 -05:00
2017-10-08 22:01:19 -05:00
from settings import DEBUG, log, template_lookup, COMPANIES, DB_SAT
2017-06-27 15:43:02 -05:00
#~ def _get_hash(password):
#~ return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
2017-06-27 15:43:02 -05:00
#~ def validate_password(hashed, password):
#~ return bcrypt.hashpw(password.encode(), hashed.encode()) == hashed.encode()
2017-06-27 15:43:02 -05:00
2017-10-08 22:01:19 -05:00
def _call(args):
return subprocess.check_output(args, shell=True).decode()
def _save_temp(data, modo='wb'):
path = tempfile.mkstemp()[1]
with open(path, modo) as f:
f.write(data)
return path
2017-06-27 15:43:02 -05:00
def get_pass():
    """Ask for a password twice on the terminal and validate the entries.

    Returns a two-item tuple: ``(True, password)`` with the stripped
    password when both entries are equal and not blank, otherwise
    ``(False, message)`` where ``message`` explains the problem.
    """
    first = getpass.getpass('Introduce la contraseƱa: ')
    second = getpass.getpass('Confirma la contraseƱa: ')
    if first != second:
        return False, 'Las contraseƱas son diferentes'

    # The entries are compared as typed; only the accepted value is stripped.
    cleaned = first.strip()
    if cleaned:
        return True, cleaned
    return False, 'La contraseƱa es necesaria'
def get_value(arg):
    """Prompt on the console for the value named ``arg``.

    Returns the answer with surrounding whitespace removed. When the answer
    is empty an error is logged and ``''`` is returned.
    """
    prompt = 'Introduce el {}: '.format(arg)
    answer = input(prompt).strip()
    if answer:
        return answer
    log.error('El {} es requerido'.format(arg))
    return ''
def _get_args(rfc):
    """Return the value of column ``con`` stored for ``rfc``.

    The lookup is done in table ``names`` of the SQLite database located at
    ``COMPANIES``. When no row matches, an error is logged and ``''`` is
    returned.
    """
    con = sqlite3.connect(COMPANIES)
    try:
        cursor = con.cursor()
        cursor.execute("SELECT con FROM names WHERE rfc=?", (rfc,))
        values = cursor.fetchone()
        cursor.close()
    finally:
        # Close on every path: the previous early return for a missing RFC
        # skipped both close() calls and leaked the connection.
        con.close()

    if values is None:
        log.error('No se encontrĆ³ el RFC')
        return ''
    return values[0]
def get_con(rfc=''):
    """Return the decoded connection data for ``rfc``, or ``False``.

    When ``rfc`` is empty it is requested interactively with get_value().
    ``False`` is returned when no RFC is supplied or when _get_args() finds
    nothing for it; otherwise the stored JSON text is decoded with loads().
    """
    if not rfc:
        rfc = get_value('RFC').upper()
    if rfc:
        stored = _get_args(rfc.upper())
        if stored:
            return loads(stored)
    return False
def get_rfcs():
    """Return every row of table ``names`` from the ``COMPANIES`` database.

    The result is the list of row tuples produced by ``fetchall()``.
    """
    db = sqlite3.connect(COMPANIES)
    cur = db.cursor()
    cur.execute("SELECT * FROM names")
    rows = cur.fetchall()
    cur.close()
    db.close()
    return rows
2017-06-27 15:43:02 -05:00
2017-10-04 00:11:49 -05:00
def get_sat_key(table, key):
    """Look up ``key`` in ``table`` of the SQLite database at ``DB_SAT``.

    Returns ``{'ok': True, 'text': description}`` when a row with that key
    exists, otherwise ``{'ok': False, 'text': <not-found message>}``.
    """
    # A table name cannot be bound as a SQL parameter, so it is quoted as an
    # identifier (embedded double quotes doubled) instead of being pasted raw
    # into the statement, where a crafted value could inject SQL.
    quoted = '"{}"'.format(str(table).replace('"', '""'))
    sql = 'SELECT key, description FROM {} WHERE key=?'.format(quoted)

    con = sqlite3.connect(DB_SAT)
    try:
        cursor = con.cursor()
        cursor.execute(sql, (key,))
        data = cursor.fetchone()
        cursor.close()
    finally:
        # Release the connection even when the query raises.
        con.close()

    if data is None:
        return {'ok': False, 'text': 'No se encontrĆ³ la clave'}
    return {'ok': True, 'text': data[1]}
2017-06-27 15:43:02 -05:00
def now():
    """Return the current local date and time truncated to whole seconds."""
    current = datetime.datetime.now()
    return current.replace(microsecond=0)
2017-06-27 15:43:02 -05:00
def get_token():
    """Return a new random token as a 64-character hexadecimal string.

    The token is the SHA-256 digest of a freshly generated UUID4.
    """
    # The previous body called _get_hash(), which exists in this module only
    # as commented-out code, so every call raised NameError. hashlib is
    # already imported by the module and keeps the "hash of a random UUID"
    # shape without needing an extra dependency.
    seed = uuid.uuid4().hex
    return hashlib.sha256(seed.encode()).hexdigest()
def get_mimetype(path):
    """Guess the MIME type of ``path`` from its file name.

    Falls back to ``'application/octet-stream'`` when no type can be
    guessed.
    """
    guessed, _encoding = mimetypes.guess_type(path)
    if guessed:
        return guessed
    return 'application/octet-stream'
def is_file(path):
    """Return True when ``path`` points to an existing regular file."""
    exists = os.path.isfile(path)
    return exists
def get_stream(path):
    """Return a ``(file object, size)`` pair for ``path``.

    The file object comes from get_file() and the size from get_size();
    the caller is responsible for closing the returned file.
    """
    stream = get_file(path)
    size = get_size(path)
    return stream, size
def get_file(path):
    """Open ``path`` for binary reading and return the open file object.

    The caller is responsible for closing it.
    """
    handle = open(path, 'rb')
    return handle
def get_size(path):
    """Return the size of the file at ``path`` in bytes."""
    size = os.path.getsize(path)
    return size
def get_template(name, data=None):
    """Render the template called ``name`` and return the result.

    ``data`` is an optional mapping whose items are passed to the
    template's ``render()`` as keyword arguments; ``None`` (the default)
    renders the template without any.
    """
    # ``None`` replaces the former ``{}`` default: a mutable default is one
    # shared object across all calls, which is an accident waiting to happen
    # even though this function only reads it.
    context = {} if data is None else data
    template = template_lookup.get_template(name)
    return template.render(**context)
def dumps(data):
    """Serialize ``data`` to a JSON string.

    Values the json module cannot encode natively are converted with
    ``str()``.
    """
    encoded = json.dumps(data, default=str)
    return encoded
2017-09-30 23:14:44 -05:00
def loads(data):
    """Decode the JSON document ``data`` and return the resulting object."""
    decoded = json.loads(data)
    return decoded
2017-06-27 15:43:02 -05:00
def clean(values):
    """Strip surrounding whitespace from every string value of ``values``.

    The mapping is modified in place and the same object is returned;
    values that are not ``str`` are left untouched.
    """
    for key, item in values.items():
        if not isinstance(item, str):
            continue
        # Reassigning an existing key does not change the dict size, so it
        # is safe while iterating.
        values[key] = item.strip()
    return values
2017-09-30 00:22:55 -05:00
def parse_con(values):
    """Parse a ``|``-separated connection description into a dict.

    Accepted layouts:

    * ``sqlite|name``
    * ``type|host|port|name|user|password`` where an empty ``host`` or
      ``port`` field is simply left out of the result.

    Returns ``{}`` when the text has fewer fields than its layout needs.
    """
    fields = values.split('|')
    try:
        kind = fields[0]
        if kind == 'sqlite':
            return {'type': kind, 'name': fields[1]}

        con = {'type': kind}
        host = fields[1]
        port = fields[2]
        if host:
            con['host'] = host
        if port:
            con['port'] = port
        con['name'] = fields[3]
        con['user'] = fields[4]
        con['password'] = fields[5]
    except IndexError:
        return {}
    return con
def spaces(value):
    """Collapse every run of whitespace in ``value`` into a single space.

    Leading and trailing whitespace is removed as well.
    """
    words = value.split()
    return ' '.join(words)
def to_slug(string):
    """Return ``string`` in lowercase ASCII.

    The text is decomposed with NFKD, characters that cannot be encoded as
    ASCII (such as the combining accents left by the decomposition) are
    dropped, and the result is lowercased. Spaces and punctuation are kept.
    """
    decomposed = unicodedata.normalize('NFKD', string)
    ascii_only = decomposed.encode('ascii', 'ignore').decode('ascii')
    return ascii_only.lower()
2017-10-08 22:01:19 -05:00
class Certificado(object):
    """Validate a DER certificate / private key pair with the openssl CLI.

    The raw ``key`` and ``cer`` contents are written to temporary files and
    inspected by running ``openssl`` through ``_call``. ``validate()`` is the
    entry point; whenever it returns ``{}`` the reason is left in ``error``.
    """

    def __init__(self, key, cer):
        self._key = key
        self._cer = cer
        self._modulus = ''
        self.error = ''
        self._save_files()

    @staticmethod
    def _quote(value):
        """Return ``value`` escaped as a single shell word."""
        # Local import: the module's top-level imports do not include shlex.
        import shlex
        return shlex.quote(str(value))

    @staticmethod
    def _temp_path():
        """Create an empty temporary file and return its path."""
        # mkstemp() returns an open descriptor; close it so it is not leaked.
        fd, path = tempfile.mkstemp()
        os.close(fd)
        return path

    def _save_files(self):
        """Write key and certificate to temp files; both paths are '' on failure."""
        self._path_key = ''
        self._path_cer = ''
        try:
            self._path_key = _save_temp(self._key)
            self._path_cer = _save_temp(self._cer)
        except Exception:
            # Best effort, as before, but without trapping KeyboardInterrupt
            # or SystemExit, and without leaving the key file behind when
            # only the certificate could not be written.
            if self._path_key:
                self._kill(self._path_key)
            self._path_key = ''
            self._path_cer = ''
        return

    def _kill(self, path):
        """Delete ``path``; a missing or invalid path is silently ignored."""
        try:
            os.remove(path)
        except (OSError, TypeError, ValueError):
            # Cleanup is best effort: the file may already be gone.
            pass
        return

    def _get_info_cer(self, session_rfc):
        """Extract the certificate data, or return ``{}`` and set ``error``.

        On success the dict has the keys ``cer``, ``cer_tmp``, ``cer_pem``,
        ``cer_txt``, ``serie``, ``rfc``, ``desde`` and ``hasta``, and the
        certificate modulus is remembered for the later key comparison.
        Unless ``DEBUG`` is set, the RFC read from the certificate subject
        must equal ``session_rfc``.
        """
        data = {}
        path = self._quote(self._path_cer)

        try:
            cer_pem = _call('openssl x509 -inform DER -in {}'.format(path))
        except Exception:
            self.error = 'No se pudo convertir el CER en PEM'
            return data

        try:
            cer_txt = _call('openssl enc -base64 -in {}'.format(path))
        except Exception:
            self.error = 'No se pudo convertir el CER en TXT'
            return data

        args = 'openssl x509 -inform DER -in {} -noout -{}'
        try:
            # Fourth line of the "purpose" report (the "SSL server" entry).
            result = _call(args.format(path, 'purpose')).split('\n')[3]
        except Exception:
            self.error = 'No se puede saber si es FIEL'
            return data

        if result == 'SSL server : No':
            self.error = 'El certificado es FIEL'
            return data

        result = _call(args.format(path, 'serial'))
        # Keeps every second character of the reported serial.
        # NOTE(review): presumably the serial is hex-encoded ASCII digits
        # ("3X" pairs) - confirm against a real certificate.
        serie = result.split('=')[1].split('\n')[0][1::2]

        result = _call(args.format(path, 'subject'))
        # NOTE(review): relies on the RFC being the sixth "="-separated field
        # of the subject line; this depends on openssl's output format.
        rfc = result.split('=')[5].split('/')[0].strip()

        if not DEBUG and rfc != session_rfc:
            self.error = 'El RFC del certificado no corresponde.'
            return data

        dates = _call(args.format(path, 'dates')).split('\n')
        desde = parser.parse(dates[0].split('=')[1])
        hasta = parser.parse(dates[1].split('=')[1])

        self._modulus = _call(args.format(path, 'modulus'))

        data['cer'] = self._cer
        data['cer_tmp'] = None
        data['cer_pem'] = cer_pem
        data['cer_txt'] = cer_txt.replace('\n', '')
        data['serie'] = serie
        data['rfc'] = rfc
        data['desde'] = desde
        data['hasta'] = hasta
        return data

    def _get_p12(self, password, rfc):
        """Build a PKCS#12 bundle from the pair and return its bytes.

        The bundle is named ``rfc`` and protected with the MD5 hex digest of
        ``rfc``. Errors raised by ``_call`` propagate to the caller.
        """
        quote = self._quote
        tmp_cer = self._temp_path()
        tmp_key = self._temp_path()
        tmp_p12 = self._temp_path()
        try:
            args = 'openssl x509 -inform DER -in {} -out {}'
            _call(args.format(quote(self._path_cer), quote(tmp_cer)))

            # The password is user input: quote the whole "pass:..." word so
            # spaces or shell metacharacters cannot break or hijack the command.
            args = 'openssl pkcs8 -inform DER -in {} -passin {} -out {}'
            _call(args.format(
                quote(self._path_key),
                quote('pass:{}'.format(password)),
                quote(tmp_key)))

            passout = 'pass:{}'.format(hashlib.md5(rfc.encode()).hexdigest())
            args = 'openssl pkcs12 -export -in {} -inkey {} -name {} ' \
                '-passout {} -out {}'
            _call(args.format(
                quote(tmp_cer), quote(tmp_key), quote(rfc),
                quote(passout), quote(tmp_p12)))

            with open(tmp_p12, 'rb') as f:
                return f.read()
        finally:
            # Always remove the intermediates (one holds the decrypted key),
            # including when an openssl step fails.
            self._kill(tmp_cer)
            self._kill(tmp_key)
            self._kill(tmp_p12)

    def _get_info_key(self, password, rfc):
        """Check the private key, or return ``{}`` and set ``error``.

        Verifies that ``password`` opens the key and that the key modulus
        equals the certificate modulus stored by ``_get_info_cer``. On
        success the dict has the keys ``key``, ``key_tmp``, ``key_enc`` and
        ``p12``.
        """
        data = {}
        # Quote user-supplied values before they reach the shell.
        decrypt = 'openssl pkcs8 -inform DER -in {} -passin {}'.format(
            self._quote(self._path_key),
            self._quote('pass:{}'.format(password)))

        try:
            _call(decrypt)
        except Exception:
            self.error = 'ContraseƱa incorrecta'
            return data

        mod_key = _call(decrypt + ' | openssl rsa -noout -modulus')
        if self._modulus != mod_key:
            self.error = 'Los archivos no son pareja'
            return data

        # NOTE(review): the re-encrypted key is protected only by the MD5 of
        # the RFC; kept unchanged because stored data depends on it.
        passout = 'pass:{}'.format(hashlib.md5(rfc.encode()).hexdigest())
        key_enc = _call(
            decrypt + ' | openssl rsa -des3 -passout {}'.format(
                self._quote(passout)))

        data['key'] = self._key
        data['key_tmp'] = None
        data['key_enc'] = key_enc
        data['p12'] = self._get_p12(password, rfc)
        return data

    def validate(self, password, rfc):
        """Validate the pair and return the merged certificate and key data.

        Returns ``{}`` and sets ``error`` when the files could not be written,
        when the certificate is rejected, or when the key or password check
        fails. The temporary files are deleted before returning, on success
        and on failure alike.
        """
        self.error = ''
        if not (self._path_key and self._path_cer):
            # The temp files are removed after every run; recreate them so
            # the same instance can be validated again.
            self._save_files()
        if not self._path_key or not self._path_cer:
            self.error = 'Error al cargar el certificado'
            return {}

        try:
            data = self._get_info_cer(rfc)
            if not data:
                # Stop here so the certificate error is the one reported;
                # continuing would overwrite it with a misleading key error.
                return {}

            llave = self._get_info_key(password, rfc)
            if not llave:
                return {}

            data.update(llave)
            return data
        finally:
            # Never leave the key or certificate on disk, even on failure.
            self._kill(self._path_key)
            self._kill(self._path_cer)
            self._path_key = ''
            self._path_cer = ''