forked from elmau/empresa-libre
Fix - Guardar certificados
This commit is contained in:
parent
4f26f820cc
commit
9eb5b63dc8
|
@ -53,7 +53,7 @@ def _get_md5(data):
|
||||||
return hashlib.md5(data.encode()).hexdigest()
|
return hashlib.md5(data.encode()).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
def _save_temp(data, modo='wb'):
|
def save_temp(data, modo='wb'):
|
||||||
path = tempfile.mkstemp()[1]
|
path = tempfile.mkstemp()[1]
|
||||||
with open(path, modo) as f:
|
with open(path, modo) as f:
|
||||||
f.write(data)
|
f.write(data)
|
||||||
|
@ -277,21 +277,22 @@ def to_slug(string):
|
||||||
|
|
||||||
class Certificado(object):
|
class Certificado(object):
|
||||||
|
|
||||||
def __init__(self, key, cer):
|
def __init__(self, paths):
|
||||||
self._key = key
|
self._path_key = paths['path_key']
|
||||||
self._cer = cer
|
self._path_cer = paths['path_cer']
|
||||||
self._modulus = ''
|
self._modulus = ''
|
||||||
self._save_files()
|
#~ self._save_files()
|
||||||
self.error = ''
|
self.error = ''
|
||||||
|
|
||||||
def _save_files(self):
|
#~ def _save_files(self):
|
||||||
try:
|
#~ try:
|
||||||
self._path_key = _save_temp(self._key)
|
#~ self._path_key = _save_temp(bytes(self._key))
|
||||||
self._path_cer = _save_temp(self._cer)
|
#~ self._path_cer = _save_temp(bytes(self._cer))
|
||||||
except:
|
#~ except Exception as e:
|
||||||
self._path_key = ''
|
#~ log.error(e)
|
||||||
self._path_cer = ''
|
#~ self._path_key = ''
|
||||||
return
|
#~ self._path_cer = ''
|
||||||
|
#~ return
|
||||||
|
|
||||||
def _kill(self, path):
|
def _kill(self, path):
|
||||||
try:
|
try:
|
||||||
|
@ -342,7 +343,7 @@ class Certificado(object):
|
||||||
hasta = parser.parse(dates[1].split('=')[1])
|
hasta = parser.parse(dates[1].split('=')[1])
|
||||||
self._modulus = _call(args.format(self._path_cer, 'modulus'))
|
self._modulus = _call(args.format(self._path_cer, 'modulus'))
|
||||||
|
|
||||||
data['cer'] = self._cer
|
data['cer'] = read_file(self._path_cer)
|
||||||
data['cer_tmp'] = None
|
data['cer_tmp'] = None
|
||||||
data['cer_pem'] = cer_pem
|
data['cer_pem'] = cer_pem
|
||||||
data['cer_txt'] = cer_txt.replace('\n', '')
|
data['cer_txt'] = cer_txt.replace('\n', '')
|
||||||
|
@ -366,7 +367,8 @@ class Certificado(object):
|
||||||
'pass:"{}" -out "{}"'
|
'pass:"{}" -out "{}"'
|
||||||
_call(args.format(tmp_cer, tmp_key, rfc,
|
_call(args.format(tmp_cer, tmp_key, rfc,
|
||||||
hashlib.md5(rfc.encode()).hexdigest(), tmp_p12))
|
hashlib.md5(rfc.encode()).hexdigest(), tmp_p12))
|
||||||
data = open(tmp_p12, 'rb').read()
|
#~ data = open(tmp_p12, 'rb').read()
|
||||||
|
data = read_file(tmp_p12)
|
||||||
|
|
||||||
self._kill(tmp_cer)
|
self._kill(tmp_cer)
|
||||||
self._kill(tmp_key)
|
self._kill(tmp_key)
|
||||||
|
@ -397,7 +399,7 @@ class Certificado(object):
|
||||||
self._path_key, password, _get_md5(rfc))
|
self._path_key, password, _get_md5(rfc))
|
||||||
key_enc = _call(args)
|
key_enc = _call(args)
|
||||||
|
|
||||||
data['key'] = self._key
|
data['key'] = read_file(self._path_key)
|
||||||
data['key_tmp'] = None
|
data['key_tmp'] = None
|
||||||
data['key_enc'] = key_enc
|
data['key_enc'] = key_enc
|
||||||
data['p12'] = self._get_p12(password, rfc)
|
data['p12'] = self._get_p12(password, rfc)
|
||||||
|
@ -405,7 +407,7 @@ class Certificado(object):
|
||||||
|
|
||||||
def validate(self, password, rfc):
|
def validate(self, password, rfc):
|
||||||
if not self._path_key or not self._path_cer:
|
if not self._path_key or not self._path_cer:
|
||||||
self.error = 'Error al cargar el certificado'
|
self.error = 'Error en las rutas temporales del certificado'
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
data = self._get_info_cer(rfc)
|
data = self._get_info_cer(rfc)
|
||||||
|
@ -433,9 +435,9 @@ def make_xml(data, certificado):
|
||||||
data = {
|
data = {
|
||||||
'xsltproc': PATH_XSLTPROC,
|
'xsltproc': PATH_XSLTPROC,
|
||||||
'xslt': _join(PATH_XSLT, 'cadena.xslt'),
|
'xslt': _join(PATH_XSLT, 'cadena.xslt'),
|
||||||
'xml': _save_temp(xml, 'w'),
|
'xml': save_temp(xml, 'w'),
|
||||||
'openssl': PATH_OPENSSL,
|
'openssl': PATH_OPENSSL,
|
||||||
'key': _save_temp(certificado.key_enc, 'w'),
|
'key': save_temp(certificado.key_enc, 'w'),
|
||||||
'pass': _get_md5(certificado.rfc)
|
'pass': _get_md5(certificado.rfc)
|
||||||
}
|
}
|
||||||
args = '"{xsltproc}" "{xslt}" "{xml}" | ' \
|
args = '"{xsltproc}" "{xslt}" "{xml}" | ' \
|
||||||
|
@ -1062,9 +1064,9 @@ def cancel_cfdi(uuid, pk12, rfc, auth):
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
'xmlsec': PATH_XMLSEC,
|
'xmlsec': PATH_XMLSEC,
|
||||||
'pk12': _save_temp(pk12),
|
'pk12': save_temp(pk12),
|
||||||
'pass': _get_md5(rfc),
|
'pass': _get_md5(rfc),
|
||||||
'template': _save_temp(template, 'w'),
|
'template': save_temp(template, 'w'),
|
||||||
}
|
}
|
||||||
args = '"{xmlsec}" --sign --pkcs12 "{pk12}" --pwd {pass} ' \
|
args = '"{xmlsec}" --sign --pkcs12 "{pk12}" --pwd {pass} ' \
|
||||||
'"{template}"'.format(**data)
|
'"{template}"'.format(**data)
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
socket = 127.0.0.1:3033
|
socket = 127.0.0.1:3033
|
||||||
uid = nginx
|
uid = nginx
|
||||||
gid = nginx
|
gid = nginx
|
||||||
|
#~ Establece una ruta accesible para nginx
|
||||||
chdir = /srv/app/empresa-libre/app
|
chdir = /srv/app/empresa-libre/app
|
||||||
wsgi-file = main.py
|
wsgi-file = main.py
|
||||||
callable = app
|
callable = app
|
||||||
|
@ -10,4 +11,5 @@ processes = 4
|
||||||
threads = 4
|
threads = 4
|
||||||
thunder-lock = true
|
thunder-lock = true
|
||||||
#~ stats = 127.0.0.1:9191
|
#~ stats = 127.0.0.1:9191
|
||||||
|
#~ Establece una ruta accesible para nginx
|
||||||
logger = file:/srv/log/empresalibre-uwsgi.log
|
logger = file:/srv/log/empresalibre-uwsgi.log
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
[uwsgi]
|
[uwsgi]
|
||||||
http = 127.0.0.1:8000
|
http = 127.0.0.1:8000
|
||||||
#~ http = 37.228.132.181:9000
|
|
||||||
wsgi-file = main.py
|
wsgi-file = main.py
|
||||||
callable = app
|
callable = app
|
||||||
master = true
|
master = true
|
||||||
|
|
|
@ -20,8 +20,8 @@ class StorageEngine(object):
|
||||||
def add_config(self, values):
|
def add_config(self, values):
|
||||||
return main.Configuracion.add(values)
|
return main.Configuracion.add(values)
|
||||||
|
|
||||||
def add_cert(self, file_object):
|
def add_cert(self, file_obj):
|
||||||
return main.Certificado.add(file_object)
|
return main.Certificado.add(file_obj)
|
||||||
|
|
||||||
def validate_cert(self, values, session):
|
def validate_cert(self, values, session):
|
||||||
return main.Certificado.validate(values, session)
|
return main.Certificado.validate(values, session)
|
||||||
|
|
|
@ -64,6 +64,9 @@ class Configuracion(BaseModel):
|
||||||
clave = TextField(unique=True)
|
clave = TextField(unique=True)
|
||||||
valor = TextField(default='')
|
valor = TextField(default='')
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return '{} = {}'.format(self.clave, self.valor)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_(cls, keys):
|
def get_(cls, keys):
|
||||||
if keys['fields'] == 'correo':
|
if keys['fields'] == 'correo':
|
||||||
|
@ -74,7 +77,14 @@ class Configuracion(BaseModel):
|
||||||
.select()
|
.select()
|
||||||
.where(Configuracion.clave.in_(fields))
|
.where(Configuracion.clave.in_(fields))
|
||||||
)
|
)
|
||||||
values = {r.clave: r.valor for r in data}
|
elif keys['fields'] == 'path_cer':
|
||||||
|
fields = ('path_key', 'path_cer')
|
||||||
|
data = (Configuracion
|
||||||
|
.select()
|
||||||
|
.where(Configuracion.clave.in_(fields))
|
||||||
|
)
|
||||||
|
|
||||||
|
values = {r.clave: r.valor for r in data}
|
||||||
return values
|
return values
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -316,28 +326,26 @@ class Certificado(BaseModel):
|
||||||
return row
|
return row
|
||||||
|
|
||||||
def get_(cls):
|
def get_(cls):
|
||||||
if Certificado.select().count():
|
return Certificado.select()[0]
|
||||||
obj = Certificado.select()[0]
|
|
||||||
else:
|
|
||||||
obj = Certificado()
|
|
||||||
return obj
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def add(cls, file_object):
|
def add(cls, file_obj):
|
||||||
obj = cls.get_(cls)
|
if file_obj.filename.endswith('key'):
|
||||||
if file_object.filename.endswith('key'):
|
path_key = util.save_temp(file_obj.file.read())
|
||||||
obj.key_tmp = file_object.file.read()
|
Configuracion.add({'path_key': path_key})
|
||||||
elif file_object.filename.endswith('cer'):
|
elif file_obj.filename.endswith('cer'):
|
||||||
obj.cer_tmp = file_object.file.read()
|
path_cer = util.save_temp(file_obj.file.read())
|
||||||
obj.save()
|
Configuracion.add({'path_cer': path_cer})
|
||||||
return {'status': 'server'}
|
return {'status': 'server'}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def validate(cls, values, session):
|
def validate(cls, values, session):
|
||||||
row = {}
|
row = {}
|
||||||
result = False
|
result = False
|
||||||
|
|
||||||
obj = cls.get_(cls)
|
obj = cls.get_(cls)
|
||||||
cert = util.Certificado(obj.key_tmp, obj.cer_tmp)
|
paths = Configuracion.get_({'fields': 'path_cer'})
|
||||||
|
cert = util.Certificado(paths)
|
||||||
data = cert.validate(values['contra'], session['rfc'])
|
data = cert.validate(values['contra'], session['rfc'])
|
||||||
if data:
|
if data:
|
||||||
msg = 'Certificado guardado correctamente'
|
msg = 'Certificado guardado correctamente'
|
||||||
|
@ -352,9 +360,12 @@ class Certificado(BaseModel):
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
msg = cert.error
|
msg = cert.error
|
||||||
obj.key_tmp = None
|
#~ obj.key_tmp = None
|
||||||
obj.cer_tmp = None
|
#~ obj.cer_tmp = None
|
||||||
obj.save()
|
#~ obj.save()
|
||||||
|
|
||||||
|
Configuracion.add({'path_key': ''})
|
||||||
|
Configuracion.add({'path_cer': ''})
|
||||||
|
|
||||||
return {'ok': result, 'msg': msg, 'data': row}
|
return {'ok': result, 'msg': msg, 'data': row}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue