402 lines
9.8 KiB
Python
402 lines
9.8 KiB
Python
#!/usr/bin/env python
|
|
|
|
import datetime
|
|
import getpass
|
|
import hashlib
|
|
import json
|
|
import mimetypes
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import subprocess
|
|
import tempfile
|
|
import unicodedata
|
|
import uuid
|
|
|
|
from dateutil import parser
|
|
|
|
from settings import DEBUG, log, template_lookup, COMPANIES, DB_SAT, \
|
|
PATH_XSLT, PATH_XSLTPROC, PATH_OPENSSL
|
|
|
|
|
|
#~ def _get_hash(password):
|
|
#~ return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
|
|
|
|
|
|
#~ def validate_password(hashed, password):
|
|
#~ return bcrypt.hashpw(password.encode(), hashed.encode()) == hashed.encode()
|
|
|
|
|
|
def _call(args):
|
|
return subprocess.check_output(args, shell=True).decode()
|
|
|
|
|
|
def _get_md5(data):
|
|
return hashlib.md5(data.encode()).hexdigest()
|
|
|
|
|
|
def _save_temp(data, modo='wb'):
|
|
path = tempfile.mkstemp()[1]
|
|
with open(path, modo) as f:
|
|
f.write(data)
|
|
return path
|
|
|
|
|
|
def _join(*paths):
|
|
return os.path.join(*paths)
|
|
|
|
|
|
def _kill(path):
|
|
try:
|
|
os.remove(path)
|
|
except:
|
|
pass
|
|
return
|
|
|
|
|
|
def get_pass():
|
|
password = getpass.getpass('Introduce la contraseña: ')
|
|
pass2 = getpass.getpass('Confirma la contraseña: ')
|
|
|
|
if password != pass2:
|
|
msg = 'Las contraseñas son diferentes'
|
|
return False, msg
|
|
password = password.strip()
|
|
if not password:
|
|
msg = 'La contraseña es necesaria'
|
|
return False, msg
|
|
|
|
return True, password
|
|
|
|
|
|
def get_value(arg):
|
|
value = input('Introduce el {}: '.format(arg)).strip()
|
|
if not value:
|
|
msg = 'El {} es requerido'.format(arg)
|
|
log.error(msg)
|
|
return ''
|
|
return value
|
|
|
|
|
|
def _get_args(rfc):
|
|
con = sqlite3.connect(COMPANIES)
|
|
cursor = con.cursor()
|
|
sql = "SELECT con FROM names WHERE rfc=?"
|
|
cursor.execute(sql, (rfc,))
|
|
values = cursor.fetchone()
|
|
if values is None:
|
|
msg = 'No se encontró el RFC'
|
|
log.error(msg)
|
|
return ''
|
|
|
|
cursor.close()
|
|
con.close()
|
|
return values[0]
|
|
|
|
|
|
def get_con(rfc=''):
|
|
if not rfc:
|
|
rfc = get_value('RFC').upper()
|
|
if not rfc:
|
|
return False
|
|
|
|
args = _get_args(rfc.upper())
|
|
if not args:
|
|
return False
|
|
return loads(args)
|
|
|
|
|
|
def get_rfcs():
|
|
con = sqlite3.connect(COMPANIES)
|
|
cursor = con.cursor()
|
|
sql = "SELECT * FROM names"
|
|
cursor.execute(sql)
|
|
values = cursor.fetchall()
|
|
cursor.close()
|
|
con.close()
|
|
return values
|
|
|
|
|
|
def get_sat_key(table, key):
|
|
con = sqlite3.connect(DB_SAT)
|
|
cursor = con.cursor()
|
|
sql = 'SELECT key, description FROM {} WHERE key=?'.format(table)
|
|
cursor.execute(sql, (key,))
|
|
data = cursor.fetchone()
|
|
cursor.close()
|
|
con.close()
|
|
if data is None:
|
|
return {'ok': False, 'text': 'No se encontró la clave'}
|
|
return {'ok': True, 'text': data[1]}
|
|
|
|
|
|
def now():
|
|
return datetime.datetime.now().replace(microsecond=0)
|
|
|
|
|
|
def get_token():
|
|
return _get_hash(uuid.uuid4().hex)
|
|
|
|
|
|
def get_mimetype(path):
|
|
mt = mimetypes.guess_type(path)[0]
|
|
return mt or 'application/octet-stream'
|
|
|
|
|
|
def is_file(path):
|
|
return os.path.isfile(path)
|
|
|
|
|
|
def get_stream(path):
|
|
return get_file(path), get_size(path)
|
|
|
|
|
|
def get_file(path):
|
|
return open(path, 'rb')
|
|
|
|
|
|
def get_size(path):
|
|
return os.path.getsize(path)
|
|
|
|
|
|
def get_template(name, data={}):
|
|
#~ print ('NAME', name, data)
|
|
template = template_lookup.get_template(name)
|
|
return template.render(**data)
|
|
|
|
|
|
def dumps(data):
|
|
return json.dumps(data, default=str)
|
|
|
|
|
|
def loads(data):
|
|
return json.loads(data)
|
|
|
|
|
|
def clean(values):
|
|
for k, v in values.items():
|
|
if isinstance(v, str):
|
|
values[k] = v.strip()
|
|
return values
|
|
|
|
|
|
def parse_con(values):
|
|
data = values.split('|')
|
|
try:
|
|
con = {'type': data[0]}
|
|
if con['type'] == 'sqlite':
|
|
con['name'] = data[1]
|
|
else:
|
|
if data[1]:
|
|
con['host'] = data[1]
|
|
if data[2]:
|
|
con['port'] = data[2]
|
|
con['name'] = data[3]
|
|
con['user'] = data[4]
|
|
con['password'] = data[5]
|
|
return con
|
|
except IndexError:
|
|
return {}
|
|
|
|
|
|
def spaces(value):
|
|
return ' '.join(value.split())
|
|
|
|
|
|
def to_slug(string):
|
|
value = (unicodedata.normalize('NFKD', string)
|
|
.encode('ascii', 'ignore')
|
|
.decode('ascii').lower())
|
|
return value
|
|
|
|
|
|
class Certificado(object):
|
|
|
|
def __init__(self, key, cer):
|
|
self._key = key
|
|
self._cer = cer
|
|
self._modulus = ''
|
|
self._save_files()
|
|
self.error = ''
|
|
|
|
def _save_files(self):
|
|
try:
|
|
self._path_key = _save_temp(self._key)
|
|
self._path_cer = _save_temp(self._cer)
|
|
except:
|
|
self._path_key = ''
|
|
self._path_cer = ''
|
|
return
|
|
|
|
def _kill(self, path):
|
|
try:
|
|
os.remove(path)
|
|
except:
|
|
pass
|
|
return
|
|
|
|
def _get_info_cer(self, session_rfc):
|
|
data = {}
|
|
args = 'openssl x509 -inform DER -in {}'
|
|
try:
|
|
cer_pem = _call(args.format(self._path_cer))
|
|
except Exception as e:
|
|
self.error = 'No se pudo convertir el CER en PEM'
|
|
return data
|
|
|
|
args = 'openssl enc -base64 -in {}'
|
|
try:
|
|
cer_txt = _call(args.format(self._path_cer))
|
|
except Exception as e:
|
|
self.error = 'No se pudo convertir el CER en TXT'
|
|
return data
|
|
|
|
args = 'openssl x509 -inform DER -in {} -noout -{}'
|
|
try:
|
|
result = _call(args.format(self._path_cer, 'purpose')).split('\n')[3]
|
|
except Exception as e:
|
|
self.error = 'No se puede saber si es FIEL'
|
|
return data
|
|
|
|
if result == 'SSL server : No':
|
|
self.error = 'El certificado es FIEL'
|
|
return data
|
|
|
|
result = _call(args.format(self._path_cer, 'serial'))
|
|
serie = result.split('=')[1].split('\n')[0][1::2]
|
|
result = _call(args.format(self._path_cer, 'subject'))
|
|
rfc = result.split('=')[5].split('/')[0].strip()
|
|
|
|
if not DEBUG:
|
|
if not rfc == session_rfc:
|
|
self.error = 'El RFC del certificado no corresponde.'
|
|
return data
|
|
|
|
dates = _call(args.format(self._path_cer, 'dates')).split('\n')
|
|
desde = parser.parse(dates[0].split('=')[1])
|
|
hasta = parser.parse(dates[1].split('=')[1])
|
|
self._modulus = _call(args.format(self._path_cer, 'modulus'))
|
|
|
|
data['cer'] = self._cer
|
|
data['cer_tmp'] = None
|
|
data['cer_pem'] = cer_pem
|
|
data['cer_txt'] = cer_txt.replace('\n', '')
|
|
data['serie'] = serie
|
|
data['rfc'] = rfc
|
|
data['desde'] = desde
|
|
data['hasta'] = hasta
|
|
return data
|
|
|
|
def _get_p12(self, password, rfc):
|
|
tmp_cer = tempfile.mkstemp()[1]
|
|
tmp_key = tempfile.mkstemp()[1]
|
|
tmp_p12 = tempfile.mkstemp()[1]
|
|
|
|
args = 'openssl x509 -inform DER -in "{}" -out "{}"'
|
|
_call(args.format(self._path_cer, tmp_cer))
|
|
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{} -out "{}"'
|
|
_call(args.format(self._path_key, password, tmp_key))
|
|
|
|
args = 'openssl pkcs12 -export -in "{}" -inkey "{}" -name "{}" -passout ' \
|
|
'pass:"{}" -out "{}"'
|
|
_call(args.format(tmp_cer, tmp_key, rfc,
|
|
hashlib.md5(rfc.encode()).hexdigest(), tmp_p12))
|
|
data = open(tmp_p12, 'rb').read()
|
|
|
|
self._kill(tmp_cer)
|
|
self._kill(tmp_key)
|
|
self._kill(tmp_p12)
|
|
|
|
return data
|
|
|
|
def _get_info_key(self, password, rfc):
|
|
data = {}
|
|
|
|
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{}'
|
|
try:
|
|
result = _call(args.format(self._path_key, password))
|
|
except Exception as e:
|
|
self.error = 'Contraseña incorrecta'
|
|
return data
|
|
|
|
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{} | ' \
|
|
'openssl rsa -noout -modulus'
|
|
mod_key = _call(args.format(self._path_key, password))
|
|
|
|
if self._modulus != mod_key:
|
|
self.error = 'Los archivos no son pareja'
|
|
return data
|
|
|
|
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{} | ' \
|
|
'openssl rsa -des3 -passout pass:{}'.format(
|
|
self._path_key, password, _get_md5(rfc))
|
|
key_enc = _call(args)
|
|
|
|
data['key'] = self._key
|
|
data['key_tmp'] = None
|
|
data['key_enc'] = key_enc
|
|
data['p12'] = self._get_p12(password, rfc)
|
|
return data
|
|
|
|
def validate(self, password, rfc):
|
|
if not self._path_key or not self._path_cer:
|
|
self.error = 'Error al cargar el certificado'
|
|
return {}
|
|
|
|
data = self._get_info_cer(rfc)
|
|
llave = self._get_info_key(password, data['rfc'])
|
|
if not llave:
|
|
return {}
|
|
|
|
data.update(llave)
|
|
|
|
self._kill(self._path_key)
|
|
self._kill(self._path_cer)
|
|
return data
|
|
|
|
|
|
def make_xml(data, certificado):
|
|
from .cfdi_xml import CFDI
|
|
|
|
if DEBUG:
|
|
data['emisor']['Rfc'] = certificado.rfc
|
|
data['emisor']['RegimenFiscal'] = '603'
|
|
|
|
cfdi = CFDI()
|
|
xml = cfdi.get_xml(data)
|
|
|
|
data = {
|
|
'xsltproc': PATH_XSLTPROC,
|
|
'xslt': _join(PATH_XSLT, 'cadena.xslt'),
|
|
'xml': _save_temp(xml, 'w'),
|
|
'openssl': PATH_OPENSSL,
|
|
'key': _save_temp(certificado.key_enc, 'w'),
|
|
'pass': _get_md5(certificado.rfc)
|
|
}
|
|
args = '"{xsltproc}" "{xslt}" "{xml}" | ' \
|
|
'"{openssl}" dgst -sha256 -sign "{key}" -passin pass:{pass} | ' \
|
|
'"{openssl}" enc -base64 -A'.format(**data)
|
|
sello = _call(args)
|
|
|
|
_kill(data['xml'])
|
|
_kill(data['key'])
|
|
|
|
return cfdi.add_sello(sello)
|
|
|
|
|
|
def timbra_xml(xml):
|
|
from .pac import Finkok as PAC
|
|
|
|
result = {'ok': True, 'error': ''}
|
|
pac = PAC()
|
|
xml = pac.timbra_xml(xml)
|
|
if not xml:
|
|
result['ok'] = False
|
|
result['error'] = pac.error
|
|
return result
|
|
|
|
result['xml'] = xml
|
|
result['uuid'] = pac.uuid
|
|
result['fecha'] = pac.fecha
|
|
return result
|