#!/usr/bin/env python import datetime import getpass import hashlib import json import mimetypes import os import re import sqlite3 import subprocess import tempfile import unicodedata import uuid from dateutil import parser from settings import DEBUG, log, template_lookup, COMPANIES, DB_SAT, \ PATH_XSLT, PATH_XSLTPROC, PATH_OPENSSL #~ def _get_hash(password): #~ return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() #~ def validate_password(hashed, password): #~ return bcrypt.hashpw(password.encode(), hashed.encode()) == hashed.encode() def _call(args): return subprocess.check_output(args, shell=True).decode() def _get_md5(data): return hashlib.md5(data.encode()).hexdigest() def _save_temp(data, modo='wb'): path = tempfile.mkstemp()[1] with open(path, modo) as f: f.write(data) return path def _join(*paths): return os.path.join(*paths) def _kill(path): try: os.remove(path) except: pass return def get_pass(): password = getpass.getpass('Introduce la contraseña: ') pass2 = getpass.getpass('Confirma la contraseña: ') if password != pass2: msg = 'Las contraseñas son diferentes' return False, msg password = password.strip() if not password: msg = 'La contraseña es necesaria' return False, msg return True, password def get_value(arg): value = input('Introduce el {}: '.format(arg)).strip() if not value: msg = 'El {} es requerido'.format(arg) log.error(msg) return '' return value def _get_args(rfc): con = sqlite3.connect(COMPANIES) cursor = con.cursor() sql = "SELECT con FROM names WHERE rfc=?" cursor.execute(sql, (rfc,)) values = cursor.fetchone() if values is None: msg = 'No se encontró el RFC' log.error(msg) return '' cursor.close() con.close() return values[0] def get_con(rfc=''): if not rfc: rfc = get_value('RFC').upper() if not rfc: return False args = _get_args(rfc.upper()) if not args: return False return loads(args) def get_rfcs(): con = sqlite3.connect(COMPANIES) cursor = con.cursor() sql = "SELECT * FROM names" cursor.execute(sql) values = cursor.fetchall() cursor.close() con.close() return values def get_sat_key(table, key): con = sqlite3.connect(DB_SAT) cursor = con.cursor() sql = 'SELECT key, description FROM {} WHERE key=?'.format(table) cursor.execute(sql, (key,)) data = cursor.fetchone() cursor.close() con.close() if data is None: return {'ok': False, 'text': 'No se encontró la clave'} return {'ok': True, 'text': data[1]} def now(): return datetime.datetime.now().replace(microsecond=0) def get_token(): return _get_hash(uuid.uuid4().hex) def get_mimetype(path): mt = mimetypes.guess_type(path)[0] return mt or 'application/octet-stream' def is_file(path): return os.path.isfile(path) def get_stream(path): return get_file(path), get_size(path) def get_file(path): return open(path, 'rb') def get_size(path): return os.path.getsize(path) def get_template(name, data={}): #~ print ('NAME', name, data) template = template_lookup.get_template(name) return template.render(**data) def dumps(data): return json.dumps(data, default=str) def loads(data): return json.loads(data) def clean(values): for k, v in values.items(): if isinstance(v, str): values[k] = v.strip() return values def parse_con(values): data = values.split('|') try: con = {'type': data[0]} if con['type'] == 'sqlite': con['name'] = data[1] else: if data[1]: con['host'] = data[1] if data[2]: con['port'] = data[2] con['name'] = data[3] con['user'] = data[4] con['password'] = data[5] return con except IndexError: return {} def spaces(value): return ' '.join(value.split()) def to_slug(string): value = (unicodedata.normalize('NFKD', string) .encode('ascii', 'ignore') .decode('ascii').lower()) return value class Certificado(object): def __init__(self, key, cer): self._key = key self._cer = cer self._modulus = '' self._save_files() self.error = '' def _save_files(self): try: self._path_key = _save_temp(self._key) self._path_cer = _save_temp(self._cer) except: self._path_key = '' self._path_cer = '' return def _kill(self, path): try: os.remove(path) except: pass return def _get_info_cer(self, session_rfc): data = {} args = 'openssl x509 -inform DER -in {}' try: cer_pem = _call(args.format(self._path_cer)) except Exception as e: self.error = 'No se pudo convertir el CER en PEM' return data args = 'openssl enc -base64 -in {}' try: cer_txt = _call(args.format(self._path_cer)) except Exception as e: self.error = 'No se pudo convertir el CER en TXT' return data args = 'openssl x509 -inform DER -in {} -noout -{}' try: result = _call(args.format(self._path_cer, 'purpose')).split('\n')[3] except Exception as e: self.error = 'No se puede saber si es FIEL' return data if result == 'SSL server : No': self.error = 'El certificado es FIEL' return data result = _call(args.format(self._path_cer, 'serial')) serie = result.split('=')[1].split('\n')[0][1::2] result = _call(args.format(self._path_cer, 'subject')) rfc = result.split('=')[5].split('/')[0].strip() if not DEBUG: if not rfc == session_rfc: self.error = 'El RFC del certificado no corresponde.' return data dates = _call(args.format(self._path_cer, 'dates')).split('\n') desde = parser.parse(dates[0].split('=')[1]) hasta = parser.parse(dates[1].split('=')[1]) self._modulus = _call(args.format(self._path_cer, 'modulus')) data['cer'] = self._cer data['cer_tmp'] = None data['cer_pem'] = cer_pem data['cer_txt'] = cer_txt.replace('\n', '') data['serie'] = serie data['rfc'] = rfc data['desde'] = desde data['hasta'] = hasta return data def _get_p12(self, password, rfc): tmp_cer = tempfile.mkstemp()[1] tmp_key = tempfile.mkstemp()[1] tmp_p12 = tempfile.mkstemp()[1] args = 'openssl x509 -inform DER -in "{}" -out "{}"' _call(args.format(self._path_cer, tmp_cer)) args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{} -out "{}"' _call(args.format(self._path_key, password, tmp_key)) args = 'openssl pkcs12 -export -in "{}" -inkey "{}" -name "{}" -passout ' \ 'pass:"{}" -out "{}"' _call(args.format(tmp_cer, tmp_key, rfc, hashlib.md5(rfc.encode()).hexdigest(), tmp_p12)) data = open(tmp_p12, 'rb').read() self._kill(tmp_cer) self._kill(tmp_key) self._kill(tmp_p12) return data def _get_info_key(self, password, rfc): data = {} args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{}' try: result = _call(args.format(self._path_key, password)) except Exception as e: self.error = 'Contraseña incorrecta' return data args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{} | ' \ 'openssl rsa -noout -modulus' mod_key = _call(args.format(self._path_key, password)) if self._modulus != mod_key: self.error = 'Los archivos no son pareja' return data args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{} | ' \ 'openssl rsa -des3 -passout pass:{}'.format( self._path_key, password, _get_md5(rfc)) key_enc = _call(args) data['key'] = self._key data['key_tmp'] = None data['key_enc'] = key_enc data['p12'] = self._get_p12(password, rfc) return data def validate(self, password, rfc): if not self._path_key or not self._path_cer: self.error = 'Error al cargar el certificado' return {} data = self._get_info_cer(rfc) llave = self._get_info_key(password, data['rfc']) if not llave: return {} data.update(llave) self._kill(self._path_key) self._kill(self._path_cer) return data def make_xml(data, certificado): from .cfdi_xml import CFDI if DEBUG: data['emisor']['Rfc'] = certificado.rfc data['emisor']['RegimenFiscal'] = '603' cfdi = CFDI() xml = cfdi.get_xml(data) data = { 'xsltproc': PATH_XSLTPROC, 'xslt': _join(PATH_XSLT, 'cadena.xslt'), 'xml': _save_temp(xml, 'w'), 'openssl': PATH_OPENSSL, 'key': _save_temp(certificado.key_enc, 'w'), 'pass': _get_md5(certificado.rfc) } args = '"{xsltproc}" "{xslt}" "{xml}" | ' \ '"{openssl}" dgst -sha256 -sign "{key}" -passin pass:{pass} | ' \ '"{openssl}" enc -base64 -A'.format(**data) sello = _call(args) _kill(data['xml']) _kill(data['key']) return cfdi.add_sello(sello) def timbra_xml(xml): from .pac import Finkok as PAC result = {'ok': True, 'error': ''} pac = PAC() xml = pac.timbra_xml(xml) if not xml: result['ok'] = False result['error'] = pac.error return result result['xml'] = xml result['uuid'] = pac.uuid result['fecha'] = pac.fecha return result