empresa-libre/source/app/controllers/util.py

402 lines
9.8 KiB
Python

#!/usr/bin/env python
import datetime
import getpass
import hashlib
import json
import mimetypes
import os
import re
import sqlite3
import subprocess
import tempfile
import unicodedata
import uuid
from dateutil import parser
from settings import DEBUG, log, template_lookup, COMPANIES, DB_SAT, \
PATH_XSLT, PATH_XSLTPROC, PATH_OPENSSL
#~ def _get_hash(password):
#~ return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
#~ def validate_password(hashed, password):
#~ return bcrypt.hashpw(password.encode(), hashed.encode()) == hashed.encode()
def _call(args):
    """Run the command line *args* through the system shell.

    Returns the command's standard output decoded to ``str``.
    ``subprocess.CalledProcessError`` propagates on a non-zero exit status.
    The shell is required because callers pass pipelines (``a | b``).
    """
    output = subprocess.check_output(args, shell=True)
    return output.decode()
def _get_md5(data):
    """Return the hexadecimal MD5 digest of the string *data*."""
    digest = hashlib.md5()
    digest.update(data.encode())
    return digest.hexdigest()
def _save_temp(data, modo='wb'):
    """Write *data* to a new temporary file and return its path.

    Args:
        data: Content to write; ``bytes`` for binary modes, ``str`` for
            text modes.
        modo: Mode used to open the temporary file (default ``'wb'``).

    Returns:
        Path of the temporary file. The caller is responsible for deleting it.
    """
    fd, path = tempfile.mkstemp()
    # mkstemp() returns an already-open descriptor; wrap it so it is closed
    # together with the file object instead of being leaked.
    with os.fdopen(fd, modo) as f:
        f.write(data)
    return path
def _join(*paths):
    """Join the given path components with ``os.path.join``."""
    joined = os.path.join(*paths)
    return joined
def _kill(path):
    """Delete the file at *path*, ignoring failures (best effort).

    A missing file, a permission problem or an invalid *path* value is
    silently ignored. Unlike a bare ``except``, ``KeyboardInterrupt`` and
    ``SystemExit`` are allowed to propagate.
    """
    try:
        os.remove(path)
    except (OSError, TypeError, ValueError):
        # OSError: file missing / not removable; TypeError / ValueError:
        # *path* is not a usable path (e.g. None or embedded NUL).
        pass
    return
def get_pass():
    """Ask twice for a password on the terminal and validate the entry.

    Returns:
        ``(True, password)`` with surrounding whitespace stripped on success,
        or ``(False, message)`` when the two entries differ or the password
        is empty after stripping.
    """
    first = getpass.getpass('Introduce la contraseña: ')
    second = getpass.getpass('Confirma la contraseña: ')

    # The comparison is done on the raw entries, before stripping.
    if first != second:
        return False, 'Las contraseñas son diferentes'

    cleaned = first.strip()
    if cleaned:
        return True, cleaned
    return False, 'La contraseña es necesaria'
def get_value(arg):
    """Prompt on the terminal for the value named *arg*.

    Returns the stripped answer, or ``''`` (after logging an error) when the
    user entered nothing.
    """
    prompt = 'Introduce el {}: '.format(arg)
    answer = input(prompt).strip()
    if answer:
        return answer

    log.error('El {} es requerido'.format(arg))
    return ''
def _get_args(rfc):
    """Look up the ``con`` column stored for *rfc* in the companies database.

    Args:
        rfc: RFC used as lookup key in the ``names`` table.

    Returns:
        The value of the ``con`` column, or ``''`` (after logging an error)
        when no row matches.
    """
    con = sqlite3.connect(COMPANIES)
    try:
        cursor = con.cursor()
        try:
            cursor.execute("SELECT con FROM names WHERE rfc=?", (rfc,))
            values = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Always release the connection, including on the "not found" path
        # and when the query raises.
        con.close()

    if values is None:
        msg = 'No se encontró el RFC'
        log.error(msg)
        return ''
    return values[0]
def get_con(rfc=''):
    """Return the decoded connection arguments for *rfc*.

    When *rfc* is empty the user is prompted for it. Returns ``False`` when
    no RFC is supplied or no stored arguments are found; otherwise the
    JSON-decoded value stored for the (upper-cased) RFC.
    """
    rfc = rfc or get_value('RFC').upper()
    if not rfc:
        return False

    args = _get_args(rfc.upper())
    return loads(args) if args else False
def get_rfcs():
    """Return every row of the ``names`` table in the companies database.

    Returns:
        A list of row tuples as produced by ``cursor.fetchall()``.
    """
    con = sqlite3.connect(COMPANIES)
    try:
        cursor = con.cursor()
        try:
            cursor.execute("SELECT * FROM names")
            values = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query raises.
        con.close()
    return values
def get_sat_key(table, key):
    """Look up *key* in *table* of the SAT catalog database.

    Args:
        table: Name of the table to query.
        key: Value matched against the table's ``key`` column.

    Returns:
        ``{'ok': True, 'text': description}`` when the key exists, otherwise
        ``{'ok': False, 'text': 'No se encontró la clave'}``.
    """
    # NOTE(review): *table* is interpolated into the SQL text because
    # identifiers cannot be bound as parameters; callers must only pass
    # trusted table names.
    sql = 'SELECT key, description FROM {} WHERE key=?'.format(table)

    con = sqlite3.connect(DB_SAT)
    try:
        cursor = con.cursor()
        try:
            cursor.execute(sql, (key,))
            data = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query raises (e.g. bad table).
        con.close()

    if data is None:
        return {'ok': False, 'text': 'No se encontró la clave'}
    return {'ok': True, 'text': data[1]}
def now():
    """Return the current local date and time truncated to whole seconds."""
    current = datetime.datetime.now()
    return current.replace(microsecond=0)
def get_token():
    """Return a new random token as a 64-character hexadecimal string.

    The token is the SHA-256 digest of a freshly generated UUID4.

    NOTE(review): the previous body called ``_get_hash``, which only exists
    as commented-out (bcrypt based) code in this module, so calling this
    function raised ``NameError``. Confirm that no caller depends on a
    bcrypt-formatted token.
    """
    seed = uuid.uuid4().hex
    return hashlib.sha256(seed.encode()).hexdigest()
def get_mimetype(path):
    """Guess the MIME type of *path* from its name.

    Falls back to ``'application/octet-stream'`` when the type is unknown.
    """
    guessed, _encoding = mimetypes.guess_type(path)
    if guessed:
        return guessed
    return 'application/octet-stream'
def is_file(path):
    """Return ``True`` when *path* exists and is a regular file."""
    exists_as_file = os.path.isfile(path)
    return exists_as_file
def get_stream(path):
    """Return ``(file_object, size)`` for the file at *path*.

    The file is opened through ``get_file`` and is left open; the caller is
    responsible for closing it.
    """
    handle = get_file(path)
    size = get_size(path)
    return handle, size
def get_file(path):
    """Open *path* for binary reading and return the open file object.

    The caller is responsible for closing the returned file.
    """
    handle = open(path, 'rb')
    return handle
def get_size(path):
    """Return the size in bytes of the file at *path*."""
    size = os.path.getsize(path)
    return size
def get_template(name, data=None):
    """Render the template *name* and return the result.

    Args:
        name: Template name resolved through ``template_lookup``.
        data: Optional mapping whose items are passed to the template as
            keyword arguments. ``None`` (the default) renders the template
            without variables.

    Returns:
        Whatever ``template.render`` returns for the given variables.
    """
    # ``None`` replaces the former mutable ``{}`` default argument.
    template = template_lookup.get_template(name)
    return template.render(**(data or {}))
def dumps(data):
    """Serialize *data* to a JSON string.

    Objects that JSON cannot represent natively are converted with ``str``.
    """
    serialized = json.dumps(data, default=str)
    return serialized
def loads(data):
    """Parse the JSON document *data* and return the resulting object."""
    parsed = json.loads(data)
    return parsed
def clean(values):
    """Strip surrounding whitespace from every string value of *values*.

    The mapping is modified in place and also returned; non-string values
    are left untouched.
    """
    stripped = {
        key: item.strip()
        for key, item in values.items()
        if isinstance(item, str)
    }
    for key, text in stripped.items():
        values[key] = text
    return values
def parse_con(values):
    """Parse a ``|``-separated connection description into a dict.

    Accepted layouts:

    * ``sqlite|name``
    * ``type|host|port|name|user|password`` (``host`` and ``port`` may be
      empty, in which case they are omitted from the result)

    Returns ``{}`` when fields are missing. Extra fields are ignored.
    """
    fields = values.split('|')
    kind = fields[0]  # split() always yields at least one element

    if kind == 'sqlite':
        if len(fields) < 2:
            return {}
        return {'type': kind, 'name': fields[1]}

    if len(fields) < 6:
        return {}

    host, port, name, user, password = fields[1:6]
    con = {'type': kind}
    if host:
        con['host'] = host
    if port:
        con['port'] = port
    con.update(name=name, user=user, password=password)
    return con
def spaces(value):
    """Collapse every run of whitespace in *value* into a single space.

    Leading and trailing whitespace is removed.
    """
    words = value.split()
    return ' '.join(words)
def to_slug(string):
    """Return *string* lower-cased with non-ASCII characters removed.

    The text is NFKD-normalized first, so accented letters keep their base
    letter (``'Á'`` becomes ``'a'``). Spaces and punctuation are preserved.
    """
    decomposed = unicodedata.normalize('NFKD', string)
    ascii_only = decomposed.encode('ascii', 'ignore').decode('ascii')
    return ascii_only.lower()
class Certificado(object):
    """Validate a certificate / private key pair with the ``openssl`` CLI.

    The raw contents of the ``.key`` and ``.cer`` files (both DER encoded)
    are written to temporary files when the object is created.
    :meth:`validate` extracts the certificate data, checks that the key
    matches it and always removes the temporary files, so an instance is
    meant to be validated once.

    After a failure, ``error`` holds a user-facing message.

    NOTE(review): the key password is interpolated into shell command lines
    (``-passin pass:...``); passwords containing shell metacharacters will
    break or alter those commands. Not changed here because portable quoting
    depends on the shell in use.
    """

    def __init__(self, key, cer):
        self._key = key
        self._cer = cer
        self._modulus = ''
        self.error = ''
        self._save_files()

    def _save_files(self):
        """Write key and certificate to temporary files.

        On any failure both paths are left as ``''`` so that ``validate``
        can report the problem.
        """
        self._path_key = ''
        self._path_cer = ''
        try:
            self._path_key = _save_temp(self._key)
            self._path_cer = _save_temp(self._cer)
        except Exception:
            # Do not leave a half-written pair behind (the key may already
            # have been saved when the certificate fails).
            self._kill(self._path_key)
            self._path_key = ''
            self._path_cer = ''
        return

    def _kill(self, path):
        """Delete *path*, ignoring a missing or non-removable file."""
        try:
            os.remove(path)
        except OSError:
            pass
        return

    def _get_info_cer(self, session_rfc):
        """Extract the certificate data with ``openssl``.

        Args:
            session_rfc: RFC the certificate must belong to (only enforced
                when ``DEBUG`` is false).

        Returns:
            A dict with the keys ``cer``, ``cer_tmp``, ``cer_pem``,
            ``cer_txt``, ``serie``, ``rfc``, ``desde`` and ``hasta``; or an
            empty dict, with ``self.error`` set, when a check fails.
        """
        data = {}
        path = self._path_cer

        try:
            cer_pem = _call('openssl x509 -inform DER -in {}'.format(path))
        except Exception:
            self.error = 'No se pudo convertir el CER en PEM'
            return data

        try:
            cer_txt = _call('openssl enc -base64 -in {}'.format(path))
        except Exception:
            self.error = 'No se pudo convertir el CER en TXT'
            return data

        args = 'openssl x509 -inform DER -in {} -noout -{}'
        try:
            # Fourth line of the ``-purpose`` report.
            result = _call(args.format(path, 'purpose')).split('\n')[3]
        except Exception:
            self.error = 'No se puede saber si es FIEL'
            return data

        if result == 'SSL server : No':
            self.error = 'El certificado es FIEL'
            return data

        result = _call(args.format(path, 'serial'))
        # Keeps every second character of the reported serial; presumably
        # the hex serial encodes ASCII digits - TODO confirm.
        serie = result.split('=')[1].split('\n')[0][1::2]

        result = _call(args.format(path, 'subject'))
        # NOTE(review): depends on the exact layout of openssl's subject line.
        rfc = result.split('=')[5].split('/')[0].strip()

        if not DEBUG and rfc != session_rfc:
            self.error = 'El RFC del certificado no corresponde.'
            return data

        dates = _call(args.format(path, 'dates')).split('\n')
        desde = parser.parse(dates[0].split('=')[1])
        hasta = parser.parse(dates[1].split('=')[1])

        # Kept to be compared later against the modulus of the private key.
        self._modulus = _call(args.format(path, 'modulus'))

        data['cer'] = self._cer
        data['cer_tmp'] = None
        data['cer_pem'] = cer_pem
        data['cer_txt'] = cer_txt.replace('\n', '')
        data['serie'] = serie
        data['rfc'] = rfc
        data['desde'] = desde
        data['hasta'] = hasta
        return data

    def _get_p12(self, password, rfc):
        """Build a PKCS#12 bundle from the certificate and key.

        The bundle is protected with the MD5 hex digest of *rfc*. Returns the
        bundle as ``bytes``. Intermediate files (which include the decrypted
        key) are always removed.
        """
        tmp_paths = []
        try:
            for _ in range(3):
                fd, tmp_path = tempfile.mkstemp()
                # Only the path is needed; close the descriptor right away.
                os.close(fd)
                tmp_paths.append(tmp_path)
            tmp_cer, tmp_key, tmp_p12 = tmp_paths

            args = 'openssl x509 -inform DER -in "{}" -out "{}"'
            _call(args.format(self._path_cer, tmp_cer))

            args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{} -out "{}"'
            _call(args.format(self._path_key, password, tmp_key))

            args = 'openssl pkcs12 -export -in "{}" -inkey "{}" -name "{}" -passout ' \
                'pass:"{}" -out "{}"'
            _call(args.format(tmp_cer, tmp_key, rfc,
                hashlib.md5(rfc.encode()).hexdigest(), tmp_p12))

            with open(tmp_p12, 'rb') as f:
                return f.read()
        finally:
            for tmp_path in tmp_paths:
                self._kill(tmp_path)

    def _get_info_key(self, password, rfc):
        """Check the private key against the certificate and re-encrypt it.

        Returns a dict with the keys ``key``, ``key_tmp``, ``key_enc`` and
        ``p12``; or an empty dict, with ``self.error`` set, when the password
        is wrong or the key does not match the certificate.
        """
        data = {}

        # Output is discarded: the call only proves the password opens the key.
        args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{}'
        try:
            _call(args.format(self._path_key, password))
        except Exception:
            self.error = 'Contraseña incorrecta'
            return data

        args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{} | ' \
            'openssl rsa -noout -modulus'
        mod_key = _call(args.format(self._path_key, password))
        if self._modulus != mod_key:
            self.error = 'Los archivos no son pareja'
            return data

        # Re-encrypt the key (3DES) using the MD5 of the RFC as pass phrase.
        args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{} | ' \
            'openssl rsa -des3 -passout pass:{}'.format(
                self._path_key, password, _get_md5(rfc))
        key_enc = _call(args)

        data['key'] = self._key
        data['key_tmp'] = None
        data['key_enc'] = key_enc
        data['p12'] = self._get_p12(password, rfc)
        return data

    def validate(self, password, rfc):
        """Validate the pair and return the combined certificate/key data.

        Args:
            password: Password of the private key.
            rfc: RFC the certificate is expected to belong to.

        Returns:
            The merged dicts of ``_get_info_cer`` and ``_get_info_key``, or
            ``{}`` with ``self.error`` set when any step fails.
        """
        if not self._path_key or not self._path_cer:
            self.error = 'Error al cargar el certificado'
            return {}

        try:
            data = self._get_info_cer(rfc)
            if not data:
                # self.error was set by _get_info_cer; without this guard the
                # lookup of data['rfc'] below raised KeyError.
                return {}

            llave = self._get_info_key(password, data['rfc'])
            if not llave:
                return {}

            data.update(llave)
            return data
        finally:
            # Never leave the private key or certificate on disk, whether
            # validation succeeded, failed or raised.
            self._kill(self._path_key)
            self._kill(self._path_cer)
def make_xml(data, certificado):
    """Build the CFDI XML for *data* and add its signature (sello).

    Args:
        data: Invoice data passed to ``CFDI.get_xml``. When ``DEBUG`` is
            true, ``data['emisor']`` is overwritten with the certificate RFC
            and the fixed tax regime ``'603'``.
        certificado: Object exposing ``rfc`` and ``key_enc`` (the private key
            encrypted with the MD5 of the RFC as pass phrase).

    Returns:
        The result of ``cfdi.add_sello(sello)``.

    Raises:
        subprocess.CalledProcessError: If the xsltproc/openssl pipeline fails.
    """
    from .cfdi_xml import CFDI

    if DEBUG:
        data['emisor']['Rfc'] = certificado.rfc
        data['emisor']['RegimenFiscal'] = '603'

    cfdi = CFDI()
    xml = cfdi.get_xml(data)

    path_xml = ''
    path_key = ''
    try:
        path_xml = _save_temp(xml, 'w')
        path_key = _save_temp(certificado.key_enc, 'w')
        params = {
            'xsltproc': PATH_XSLTPROC,
            'xslt': _join(PATH_XSLT, 'cadena.xslt'),
            'xml': path_xml,
            'openssl': PATH_OPENSSL,
            'key': path_key,
            'pass': _get_md5(certificado.rfc)
        }
        # Original string (XSLT) -> SHA-256 signature -> base64 on one line.
        args = '"{xsltproc}" "{xslt}" "{xml}" | ' \
            '"{openssl}" dgst -sha256 -sign "{key}" -passin pass:{pass} | ' \
            '"{openssl}" enc -base64 -A'.format(**params)
        sello = _call(args)
    finally:
        # Remove the temporary XML and key even when signing fails.
        for path in (path_xml, path_key):
            if path:
                _kill(path)

    return cfdi.add_sello(sello)
def timbra_xml(xml):
    """Send *xml* to the PAC (Finkok) to be stamped.

    Returns:
        On success ``{'ok': True, 'error': '', 'xml': ..., 'uuid': ...,
        'fecha': ...}`` with the values reported by the PAC; on failure
        ``{'ok': False, 'error': pac.error}``.
    """
    from .pac import Finkok as PAC

    pac = PAC()
    stamped = pac.timbra_xml(xml)
    if not stamped:
        return {'ok': False, 'error': pac.error}

    return {
        'ok': True,
        'error': '',
        'xml': stamped,
        'uuid': pac.uuid,
        'fecha': pac.fecha,
    }