empresa-libre/source/app/controllers/util.py

1721 lines
50 KiB
Python

#!/usr/bin/env python
import datetime
import getpass
import hashlib
import json
import locale
import mimetypes
import os
import re
import sqlite3
import socket
import subprocess
import tempfile
import threading
import time
import unicodedata
import uuid
import zipfile
from io import BytesIO
from smtplib import SMTPException, SMTPAuthenticationError
from xml.etree import ElementTree as ET
try:
import uno
from com.sun.star.beans import PropertyValue
from com.sun.star.awt import Size
APP_LIBO = True
except:
APP_LIBO = False
import pyqrcode
from dateutil import parser
from .helper import CaseInsensitiveDict, NumLet, SendMail, TemplateInvoice
from settings import DEBUG, log, template_lookup, COMPANIES, DB_SAT, \
PATH_XSLT, PATH_XSLTPROC, PATH_OPENSSL, PATH_TEMPLATES, PATH_MEDIA, PRE, \
PATH_XMLSEC, TEMPLATE_CANCEL, DEFAULT_SAT_PRODUCTO, DECIMALES
#~ def _get_hash(password):
#~ return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
#~ def validate_password(hashed, password):
#~ return bcrypt.hashpw(password.encode(), hashed.encode()) == hashed.encode()
def _call(args):
return subprocess.check_output(args, shell=True).decode()
def _get_md5(data):
return hashlib.md5(data.encode()).hexdigest()
def save_temp(data, modo='wb'):
path = tempfile.mkstemp()[1]
with open(path, modo) as f:
f.write(data)
return path
def save_file(path, data, modo='wb'):
try:
with open(path, modo) as f:
f.write(data)
return True
except:
return False
def _join(*paths):
return os.path.join(*paths)
def _kill(path):
try:
os.remove(path)
except:
pass
return
def get_pass():
password = getpass.getpass('Introduce la contraseña: ')
pass2 = getpass.getpass('Confirma la contraseña: ')
if password != pass2:
msg = 'Las contraseñas son diferentes'
return False, msg
password = password.strip()
if not password:
msg = 'La contraseña es necesaria'
return False, msg
return True, password
def get_value(arg):
value = input('Introduce el {}: '.format(arg)).strip()
if not value:
msg = 'El {} es requerido'.format(arg)
log.error(msg)
return ''
return value
def _valid_db_companies():
con = sqlite3.connect(COMPANIES)
sql = """
CREATE TABLE IF NOT EXISTS names(
rfc TEXT NOT NULL COLLATE NOCASE UNIQUE,
con TEXT NOT NULL
);
"""
cursor = con.cursor()
cursor.executescript(sql)
cursor.close()
con.close()
return
def _get_args(rfc):
_valid_db_companies()
con = sqlite3.connect(COMPANIES)
cursor = con.cursor()
sql = "SELECT con FROM names WHERE rfc=?"
cursor.execute(sql, (rfc,))
values = cursor.fetchone()
if values is None:
msg = 'No se encontró el RFC'
log.error(msg)
return ''
cursor.close()
con.close()
return values[0]
def get_rfcs():
_valid_db_companies()
con = sqlite3.connect(COMPANIES)
cursor = con.cursor()
sql = "SELECT * FROM names"
cursor.execute(sql)
values = cursor.fetchall()
cursor.close()
con.close()
return values
def get_con(rfc=''):
if not rfc:
rfc = get_value('RFC').upper()
if not rfc:
return {}
args = _get_args(rfc.upper())
if not args:
return {}
return loads(args)
def get_sat_key(table, key):
con = sqlite3.connect(DB_SAT)
cursor = con.cursor()
sql = 'SELECT key, name FROM {} WHERE key=?'.format(table)
cursor.execute(sql, (key,))
data = cursor.fetchone()
cursor.close()
con.close()
if data is None:
return {'ok': False, 'text': 'No se encontró la clave'}
return {'ok': True, 'text': data[1]}
def get_sat_unidades(key):
con = sqlite3.connect(DB_SAT)
con.row_factory = sqlite3.Row
cursor = con.cursor()
filtro = '%{}%'.format(key)
sql = "SELECT * FROM unidades WHERE key LIKE ? OR name LIKE ?"
cursor.execute(sql, [filtro, filtro])
data = cursor.fetchall()
cursor.close()
con.close()
if data is None:
return ()
data = [dict(r) for r in data]
return tuple(data)
def get_sat_productos(key):
con = sqlite3.connect(DB_SAT)
con.row_factory = sqlite3.Row
cursor = con.cursor()
filtro = '%{}%'.format(key)
sql = "SELECT * FROM productos WHERE key LIKE ? OR name LIKE ?"
cursor.execute(sql, [filtro, filtro])
data = cursor.fetchall()
cursor.close()
con.close()
if data is None:
return ()
data = [dict(r) for r in data]
return tuple(data)
def now():
return datetime.datetime.now().replace(microsecond=0)
def get_token():
return _get_hash(uuid.uuid4().hex)
def get_mimetype(path):
mt = mimetypes.guess_type(path)[0]
return mt or 'application/octet-stream'
def is_file(path):
return os.path.isfile(path)
def get_stream(path):
return get_file(path), get_size(path)
def get_file(path):
return open(path, 'rb')
def read_file(path, mode='rb'):
return open(path, mode).read()
def get_size(path):
return os.path.getsize(path)
def get_template(name, data={}):
#~ print ('NAME', name, data)
template = template_lookup.get_template(name)
return template.render(**data)
def get_custom_styles(name, default='plantilla_factura.json'):
path = _join(PATH_MEDIA, 'templates', name.lower())
if is_file(path):
with open(path) as fh:
return loads(fh.read())
path = _join(PATH_TEMPLATES, default)
if is_file(path):
with open(path) as fh:
return loads(fh.read())
return {}
def get_template_ods(name, default='plantilla_factura.ods'):
path = _join(PATH_MEDIA, 'templates', name.lower())
if is_file(path):
return path
path = _join(PATH_TEMPLATES, default)
if is_file(path):
return path
return ''
def dumps(data):
return json.dumps(data, default=str)
def loads(data):
return json.loads(data)
def clean(values):
for k, v in values.items():
if isinstance(v, str):
values[k] = v.strip()
return values
def parse_con(values):
data = values.split('|')
try:
con = {'type': data[0]}
if con['type'] == 'sqlite':
con['name'] = data[1]
else:
if data[1]:
con['host'] = data[1]
if data[2]:
con['port'] = data[2]
con['name'] = data[3]
con['user'] = data[4]
con['password'] = data[5]
return con
except IndexError:
return {}
def spaces(value):
return ' '.join(value.split())
def to_slug(string):
value = (unicodedata.normalize('NFKD', string)
.encode('ascii', 'ignore')
.decode('ascii').lower())
return value.replace(' ', '_')
class Certificado(object):
def __init__(self, paths):
self._path_key = paths['path_key']
self._path_cer = paths['path_cer']
self._modulus = ''
#~ self._save_files()
self.error = ''
#~ def _save_files(self):
#~ try:
#~ self._path_key = _save_temp(bytes(self._key))
#~ self._path_cer = _save_temp(bytes(self._cer))
#~ except Exception as e:
#~ log.error(e)
#~ self._path_key = ''
#~ self._path_cer = ''
#~ return
def _kill(self, path):
try:
os.remove(path)
except:
pass
return
def _get_info_cer(self, session_rfc):
data = {}
args = 'openssl x509 -inform DER -in {}'
try:
cer_pem = _call(args.format(self._path_cer))
except Exception as e:
self.error = 'No se pudo convertir el CER en PEM'
return data
args = 'openssl enc -base64 -in {}'
try:
cer_txt = _call(args.format(self._path_cer))
except Exception as e:
self.error = 'No se pudo convertir el CER en TXT'
return data
args = 'openssl x509 -inform DER -in {} -noout -{}'
try:
result = _call(args.format(self._path_cer, 'purpose')).split('\n')[3]
except Exception as e:
self.error = 'No se puede saber si es FIEL'
return data
if result == 'SSL server : No':
self.error = 'El certificado es FIEL'
return data
result = _call(args.format(self._path_cer, 'serial'))
serie = result.split('=')[1].split('\n')[0][1::2]
result = _call(args.format(self._path_cer, 'subject'))
#~ Verificar si es por la version de OpenSSL
t1 = 'x500UniqueIdentifier = '
t2 = 'x500UniqueIdentifier='
if t1 in result:
rfc = result.split(t1)[1][:13].strip()
elif t2 in result:
rfc = result.split(t2)[1][:13].strip()
else:
self.error = 'No se pudo obtener el RFC del certificado'
print ('\n', result)
return data
if not DEBUG:
if not rfc == session_rfc:
self.error = 'El RFC del certificado no corresponde.'
return data
dates = _call(args.format(self._path_cer, 'dates')).split('\n')
desde = parser.parse(dates[0].split('=')[1])
hasta = parser.parse(dates[1].split('=')[1])
self._modulus = _call(args.format(self._path_cer, 'modulus'))
data['cer'] = read_file(self._path_cer)
data['cer_pem'] = cer_pem
data['cer_txt'] = cer_txt.replace('\n', '')
data['serie'] = serie
data['rfc'] = rfc
data['desde'] = desde.replace(tzinfo=None)
data['hasta'] = hasta.replace(tzinfo=None)
return data
def _get_p12(self, password, rfc):
tmp_cer = tempfile.mkstemp()[1]
tmp_key = tempfile.mkstemp()[1]
tmp_p12 = tempfile.mkstemp()[1]
args = 'openssl x509 -inform DER -in "{}" -out "{}"'
_call(args.format(self._path_cer, tmp_cer))
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:"{}" -out "{}"'
_call(args.format(self._path_key, password, tmp_key))
args = 'openssl pkcs12 -export -in "{}" -inkey "{}" -name "{}" ' \
'-passout pass:"{}" -out "{}"'
_call(args.format(tmp_cer, tmp_key, rfc, _get_md5(rfc), tmp_p12))
data = read_file(tmp_p12)
self._kill(tmp_cer)
self._kill(tmp_key)
self._kill(tmp_p12)
return data
def _get_info_key(self, password, rfc):
data = {}
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:"{}"'
try:
result = _call(args.format(self._path_key, password))
except Exception as e:
self.error = 'Contraseña incorrecta'
return data
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:"{}" | ' \
'openssl rsa -noout -modulus'
mod_key = _call(args.format(self._path_key, password))
if self._modulus != mod_key:
self.error = 'Los archivos no son pareja'
return data
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:"{}" | ' \
'openssl rsa -des3 -passout pass:"{}"'.format(
self._path_key, password, _get_md5(rfc))
key_enc = _call(args)
data['key'] = read_file(self._path_key)
data['key_enc'] = key_enc
data['p12'] = self._get_p12(password, rfc)
return data
def validate(self, password, rfc):
if not self._path_key or not self._path_cer:
self.error = 'Error en las rutas temporales del certificado'
return {}
data = self._get_info_cer(rfc)
if not data:
return {}
llave = self._get_info_key(password, data['rfc'])
if not llave:
return {}
data.update(llave)
self._kill(self._path_key)
self._kill(self._path_cer)
return data
def make_xml(data, certificado):
from .cfdi_xml import CFDI
if DEBUG:
data['emisor']['Rfc'] = certificado.rfc
data['emisor']['RegimenFiscal'] = '603'
cfdi = CFDI()
xml = cfdi.get_xml(data)
data = {
'xsltproc': PATH_XSLTPROC,
'xslt': _join(PATH_XSLT, 'cadena.xslt'),
'xml': save_temp(xml, 'w'),
'openssl': PATH_OPENSSL,
'key': save_temp(certificado.key_enc, 'w'),
'pass': _get_md5(certificado.rfc)
}
args = '"{xsltproc}" "{xslt}" "{xml}" | ' \
'"{openssl}" dgst -sha256 -sign "{key}" -passin pass:"{pass}" | ' \
'"{openssl}" enc -base64 -A'.format(**data)
sello = _call(args)
_kill(data['xml'])
_kill(data['key'])
return cfdi.add_sello(sello)
def timbra_xml(xml, auth):
from .pac import Finkok as PAC
if DEBUG:
auth = {}
else:
if not auth:
msg = 'Sin datos para timbrar'
result = {'ok': True, 'error': msg}
return result
result = {'ok': True, 'error': ''}
pac = PAC(auth)
xml = pac.timbra_xml(xml)
if not xml:
result['ok'] = False
result['error'] = pac.error
return result
result['xml'] = xml
result['uuid'] = pac.uuid
result['fecha'] = pac.fecha
return result
def get_sat(xml):
from .pac import get_status_sat
return get_status_sat(xml)
class LIBO(object):
HOST = 'localhost'
PORT = '8100'
ARG = 'socket,host={},port={};urp;StarOffice.ComponentContext'.format(
HOST, PORT)
def __init__(self):
self._app = None
self._start_office()
self._init_values()
def _init_values(self):
self._es_pre = False
self._ctx = None
self._sm = None
self._desktop = None
if self.is_running:
ctx = uno.getComponentContext()
service = 'com.sun.star.bridge.UnoUrlResolver'
resolver = ctx.ServiceManager.createInstanceWithContext(service, ctx)
self._ctx = resolver.resolve('uno:{}'.format(self.ARG))
self._sm = self._ctx.ServiceManager
self._desktop = self._create_instance('com.sun.star.frame.Desktop')
return
def _create_instance(self, name, with_context=True):
if with_context:
instance = self._sm.createInstanceWithContext(name, self._ctx)
else:
instance = self._sm.createInstance(name)
return instance
@property
def is_running(self):
try:
s = socket.create_connection((self.HOST, self.PORT), 5.0)
s.close()
return True
except ConnectionRefusedError:
return False
def _start_office(self):
if self.is_running:
return
c = 1
while c < 4:
c += 1
self.app = subprocess.Popen([
'soffice', '--headless', '--accept={}'.format(self.ARG)],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
time.sleep(5)
if self.is_running:
return
return
def _set_properties(self, properties):
pl = []
for k, v in properties.items():
pv = PropertyValue()
pv.Name = k
pv.Value = v
pl.append(pv)
return tuple(pl)
def _doc_open(self, path, options):
options = self._set_properties(options)
path = self._path_url(path)
try:
doc = self._desktop.loadComponentFromURL(path, '_blank', 0, options)
return doc
except:
return None
def _path_url(self, path):
if path.startswith('file://'):
return path
return uno.systemPathToFileUrl(path)
def close(self):
if self.is_running:
if not self._desktop is None:
self._desktop.terminate()
if not self._app is None:
self._app.terminate()
return
def _read(self, path):
try:
return open(path, 'rb').read()
except:
return b''
def _clean(self):
self._sd.SearchRegularExpression = True
self._sd.setSearchString("\{(\w.+)\}")
self._search.replaceAll(self._sd)
return
def _cancelado(self, cancel):
if not cancel:
pd = self._sheet.getDrawPage()
if pd.getCount():
pd.remove(pd.getByIndex(0))
return
def _set_search(self):
self._sheet = self._template.getSheets().getByIndex(0)
self._search = self._sheet.getPrintAreas()[0]
self._search = self._sheet.getCellRangeByPosition(
self._search.StartColumn,
self._search.StartRow,
self._search.EndColumn,
self._search.EndRow
)
self._sd = self._sheet.createSearchDescriptor()
self._sd.SearchCaseSensitive = False
return
def _next_cell(self, cell):
col = cell.getCellAddress().Column
row = cell.getCellAddress().Row + 1
return self._sheet.getCellByPosition(col, row)
def _copy_cell(self, cell):
destino = self._next_cell(cell)
self._sheet.copyRange(destino.getCellAddress(), cell.getRangeAddress())
return destino
def _set_cell(self, k='', v=None, cell=None, value=False):
if k:
self._sd.setSearchString(k)
ranges = self._search.findAll(self._sd)
if ranges:
ranges = ranges.getRangeAddressesAsString().split(';')
for r in ranges:
for c in r.split(','):
cell = self._sheet.getCellRangeByName(c)
if v is None:
return cell
if cell.getImplementationName() == 'ScCellObj':
pattern = re.compile(k, re.IGNORECASE)
nv = pattern.sub(v, cell.getString())
if value:
cell.setValue(nv)
else:
cell.setString(nv)
return cell
if cell:
if cell.getImplementationName() == 'ScCellObj':
ca = cell.getCellAddress()
new_cell = self._sheet.getCellByPosition(ca.Column, ca.Row + 1)
if value:
new_cell.setValue(v)
else:
new_cell.setString(v)
return new_cell
def _comprobante(self, data):
for k, v in data.items():
if k in ('total', 'descuento', 'subtotal'):
self._set_cell('{cfdi.%s}' % k, v, value=True)
else:
self._set_cell('{cfdi.%s}' % k, v)
return
def _emisor(self, data):
for k, v in data.items():
self._set_cell('{emisor.%s}' % k, v)
return
def _receptor(self, data):
for k, v in data.items():
self._set_cell('{receptor.%s}' % k, v)
return
def _conceptos(self, data):
first = True
for concepto in data:
key = concepto.get('noidentificacion', '')
description = concepto['descripcion']
unidad = concepto['unidad']
cantidad = concepto['cantidad']
valor_unitario = concepto['valorunitario']
importe = concepto['importe']
if first:
first = False
cell_1 = self._set_cell('{noidentificacion}', key)
cell_2 = self._set_cell('{descripcion}', description)
cell_3 = self._set_cell('{unidad}', unidad)
cell_4 = self._set_cell('{cantidad}', cantidad, value=True)
cell_5 = self._set_cell('{valorunitario}', valor_unitario, value=True)
cell_6 = self._set_cell('{importe}', importe, value=True)
else:
row = cell_2.getCellAddress().Row + 1
self._sheet.getRows().insertByIndex(row, 1)
if cell_1:
self._copy_cell(cell_1)
cell_1 = self._set_cell(v=key, cell=cell_1)
if cell_3:
self._copy_cell(cell_3)
cell_3 = self._set_cell(v=unidad, cell=cell_3)
self._copy_cell(cell_2)
self._copy_cell(cell_4)
self._copy_cell(cell_5)
self._copy_cell(cell_6)
cell_2 = self._set_cell(v=description, cell=cell_2)
cell_4 = self._set_cell(v=cantidad, cell=cell_4, value=True)
cell_5 = self._set_cell(v=valor_unitario, cell=cell_5, value=True)
cell_6 = self._set_cell(v=importe, cell=cell_6, value=True)
return
def _totales(self, data):
currency = data['moneda']
cell_title = self._set_cell('{subtotal.titulo}', 'SubTotal')
value = data['subtotal']
cell_value = self._set_cell('{subtotal}', value, value=True)
cell_value.CellStyle = currency
#~ Si encuentra el campo {total}, se asume que los totales e impuestos
#~ están declarados de forma independiente cada uno
#~ if self._add_totales(xml):
#~ return
#~ Si no se encuentra, copia las celdas hacia abajo de
#~ {subtotal.titulo} y {subtotal}
#~ print (data['descuento'])
if 'descuento' in data:
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v='Descuento', cell=cell_title)
value = data['descuento']
cell_value = self._set_cell(v=value, cell=cell_value, value=True)
cell_value.CellStyle = currency
for tax in data['traslados']:
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v=tax[0], cell=cell_title)
cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True)
cell_value.CellStyle = currency
for tax in data['retenciones']:
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v=tax[0], cell=cell_title)
cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True)
cell_value.CellStyle = currency
for tax in data['taxlocales']:
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v=tax[0], cell=cell_title)
cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True)
cell_value.CellStyle = currency
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v='Total', cell=cell_title)
value = data['total']
cell_value = self._set_cell(v=value, cell=cell_value, value=True)
cell_value.CellStyle = currency
return
def _timbre(self, data):
if self._es_pre:
return
for k, v in data.items():
self._set_cell('{timbre.%s}' % k, v)
pd = self._sheet.getDrawPage()
image = self._template.createInstance('com.sun.star.drawing.GraphicObjectShape')
image.GraphicURL = data['path_cbb']
pd.add(image)
s = Size()
s.Width = 4150
s.Height = 4500
image.setSize(s)
image.Anchor = self._set_cell('{timbre.cbb}')
return
def _donataria(self, data):
if not data:
return
for k, v in data.items():
self._set_cell('{donataria.%s}' % k, v)
return
def _ine(self, data):
if not data:
return
for k, v in data.items():
self._set_cell('{ine.%s}' % k, v)
return
def _render(self, data):
self._set_search()
self._es_pre = data.pop('es_pre', False)
self._comprobante(data['comprobante'])
self._emisor(data['emisor'])
self._receptor(data['receptor'])
self._conceptos(data['conceptos'])
self._totales(data['totales'])
self._timbre(data['timbre'])
self._donataria(data['donataria'])
self._ine(data['ine'])
self._cancelado(data['cancelada'])
self._clean()
return
def pdf(self, path, data):
options = {'AsTemplate': True, 'Hidden': True}
self._template = self._doc_open(path, options)
if self._template is None:
return b''
self._render(data)
path = '{}.ods'.format(tempfile.mkstemp()[1])
self._template.storeToURL(self._path_url(path), ())
doc = self._doc_open(path, {'Hidden': True})
options = {'FilterName': 'calc_pdf_Export'}
path = tempfile.mkstemp()[1]
doc.storeToURL(self._path_url(path), self._set_properties(options))
doc.close(True)
self._template.close(True)
return self._read(path)
def to_pdf(data, emisor_rfc):
rfc = data['emisor']['rfc']
if DEBUG:
rfc = emisor_rfc
version = data['comprobante']['version']
if APP_LIBO:
app = LIBO()
if app.is_running:
donativo = ''
if data['donativo']:
donativo = '_donativo'
name = '{}_{}{}.ods'.format(rfc.lower(), version, donativo)
path = get_template_ods(name)
if path:
return app.pdf(path, data)
name = '{}_{}.json'.format(rfc, version)
custom_styles = get_custom_styles(name)
path = get_path_temp()
pdf = TemplateInvoice(path)
pdf.custom_styles = custom_styles
pdf.data = data
pdf.render()
return read_file(path)
def parse_xml(xml):
return ET.fromstring(xml)
def get_dict(data):
return CaseInsensitiveDict(data)
def to_letters(value, moneda):
monedas = {
'MXN': 'peso',
'USD': 'dólar',
'EUR': 'euro',
}
return NumLet(value, monedas.get(moneda, moneda)).letras
def get_qr(data):
path = tempfile.mkstemp()[1]
qr = pyqrcode.create(data, mode='binary')
qr.png(path, scale=7)
return path
def _get_relacionados(doc, version):
node = doc.find('{}CfdiRelacionados'.format(PRE[version]))
if node is None:
return ''
uuids = ['UUID: {}'.format(n.attrib['UUID']) for n in node.getchildren()]
return '\n'.join(uuids)
def _comprobante(doc, options):
data = CaseInsensitiveDict(doc.attrib.copy())
del data['certificado']
data['seriefolio'] = '{} - {}'.format(data['serie'], data['folio'])
data['totalenletras'] = to_letters(float(data['total']), data['moneda'])
if data['version'] == '3.3':
tipos = {
'I': 'ingreso',
'E': 'egreso',
'T': 'traslado',
}
data['tipodecomprobante'] = tipos.get(data['tipodecomprobante'])
data['lugarexpedicion'] = \
'C.P. de Expedición: {}'.format(data['lugarexpedicion'])
data['metododepago'] = options['metododepago']
data['formadepago'] = options['formadepago']
if 'condicionesdepago' in data:
data['condicionesdepago'] = \
'Condiciones de pago: {}'.format(data['condicionesdepago'])
data['moneda'] = options['moneda']
data['tiporelacion'] = options.get('tiporelacion', '')
data['relacionados'] = _get_relacionados(doc, data['version'])
else:
fields = {
'formaDePago': 'Forma de Pago: {}\n',
'metodoDePago': 'Método de pago: {}\n',
'condicionesDePago': 'Condiciones de Pago: {}\n',
'NumCtaPago': 'Número de Cuenta de Pago: {}\n',
'Moneda': 'Moneda: {}\n',
'TipoCambio': 'Tipo de Cambio: {}',
}
datos = ''
for k, v in fields.items():
if k in data:
datos += v.format(data[k])
data['datos'] = datos
data['hora'] = data['fecha'].split('T')[1]
fecha = parser.parse(data['fecha'])
try:
locale.setlocale(locale.LC_TIME, "es_MX.UTF-8")
except:
pass
data['fechaformato'] = fecha.strftime('%A, %d de %B de %Y')
data['tipocambio'] = 'Tipo de Cambio: $ {:0.2f}'.format(
float(data['tipocambio']))
return data
def _emisor(doc, version, values):
emisor = doc.find('{}Emisor'.format(PRE[version]))
data = CaseInsensitiveDict(emisor.attrib.copy())
node = emisor.find('{}DomicilioFiscal'.format(PRE[version]))
if not node is None:
data.update(CaseInsensitiveDict(node.attrib.copy()))
if version == '3.2':
node = emisor.find('{}RegimenFiscal'.format(PRE[version]))
if not node is None:
data['regimenfiscal'] = node.attrib['Regimen']
data['regimen'] = node.attrib['Regimen']
else:
data['regimenfiscal'] = values['regimenfiscal']
path = _join(PATH_MEDIA, 'logos', '{}.png'.format(data['rfc'].lower()))
if is_file(path):
data['logo'] = path
return data
def _receptor(doc, version, values):
node = doc.find('{}Receptor'.format(PRE[version]))
data = CaseInsensitiveDict(node.attrib.copy())
node = node.find('{}Domicilio'.format(PRE[version]))
if not node is None:
data.update(node.attrib.copy())
if version == '3.2':
return data
data['usocfdi'] = values['usocfdi']
return data
def _conceptos(doc, version):
data = []
conceptos = doc.find('{}Conceptos'.format(PRE[version]))
for c in conceptos.getchildren():
values = CaseInsensitiveDict(c.attrib.copy())
if version == '3.3':
values['noidentificacion'] = '{}\n(SAT {})'.format(
values['noidentificacion'], values['ClaveProdServ'])
values['unidad'] = '({})\n{}'.format(
values['ClaveUnidad'], values['unidad'])
data.append(values)
return data
def _totales(doc, cfdi, version):
data = {}
data['moneda'] = doc.attrib['Moneda']
data['subtotal'] = cfdi['subtotal']
if 'descuento' in cfdi:
data['descuento'] = cfdi['descuento']
data['total'] = cfdi['total']
tn = {
'001': 'ISR',
'002': 'IVA',
'003': 'IEPS',
}
traslados = []
retenciones = []
taxlocales = []
imp = doc.find('{}Impuestos'.format(PRE[version]))
if imp is not None:
tmp = CaseInsensitiveDict(imp.attrib.copy())
for k, v in tmp.items():
data[k] = v
node = imp.find('{}Traslados'.format(PRE[version]))
if node is not None:
for n in node.getchildren():
tmp = CaseInsensitiveDict(n.attrib.copy())
if version == '3.3':
title = 'Traslado {} {}'.format(
tn.get(tmp['impuesto']), tmp['tasaocuota'])
else:
title = 'Traslado {} {}'.format(tmp['impuesto'], tmp['tasa'])
traslados.append((title, float(tmp['importe'])))
node = imp.find('{}Retenciones'.format(PRE[version]))
if node is not None:
for n in node.getchildren():
tmp = CaseInsensitiveDict(n.attrib.copy())
if version == '3.3':
title = 'Retención {} {}'.format(
tn.get(tmp['impuesto']), '')
else:
title = 'Retención {} {}'.format(tmp['impuesto'], '')
retenciones.append((title, float(tmp['importe'])))
node = doc.find('{}Complemento/{}ImpuestosLocales'.format(
PRE[version], PRE['LOCALES']))
if node is not None:
for otro in list(node):
if otro.tag == '{}RetencionesLocales'.format(PRE['LOCALES']):
tipo = 'Retención '
name = 'ImpLocRetenido'
tasa = 'TasadeRetencion'
else:
tipo = 'Traslado '
name = 'ImpLocTrasladado'
tasa = 'TasadeTraslado'
title = '{} {} {}%'.format(
tipo, otro.attrib[name], otro.attrib[tasa])
importe = float(otro.attrib['Importe'])
taxlocales.append((title, importe))
data['traslados'] = traslados
data['retenciones'] = retenciones
data['taxlocales'] = taxlocales
return data
def _timbre(doc, version, values):
CADENA = '||{version}|{UUID}|{FechaTimbrado}|{selloCFD}|{noCertificadoSAT}||'
if version == '3.3':
CADENA = '||{Version}|{UUID}|{FechaTimbrado}|{SelloCFD}|{NoCertificadoSAT}||'
node = doc.find('{}Complemento/{}TimbreFiscalDigital'.format(
PRE[version], PRE['TIMBRE']))
data = CaseInsensitiveDict(node.attrib.copy())
total_s = '%017.06f' % float(values['total'])
qr_data = '?re=%s&rr=%s&tt=%s&id=%s' % (
values['rfc_emisor'],
values['rfc_receptor'],
total_s,
node.attrib['UUID'])
data['path_cbb'] = get_qr(qr_data)
data['cadenaoriginal'] = CADENA.format(**node.attrib)
return data
def _donataria(doc, version, fechadof):
node = doc.find('{}Complemento/{}Donatarias'.format(
PRE[version], PRE['DONATARIA']))
if node is None:
return {}
data = CaseInsensitiveDict(node.attrib.copy())
data['fechadof'] = fechadof
return data
def _ine(doc, version):
node = doc.find('{}Complemento/{}INE'.format(PRE[version], PRE['INE']))
if node is None:
return {}
values = (
('TipoComite', 'Tipo de Comite: {}'),
('TipoProceso', 'Tipo de Proceso: {}'),
('IdContabilidad', 'ID de Contabilidad: {}'),
)
data = CaseInsensitiveDict(node.attrib.copy())
for k, v in values:
data[k] = v.format(data[k])
return data
def get_data_from_xml(invoice, values):
data = {'cancelada': invoice.cancelada, 'donativo': invoice.donativo}
doc = parse_xml(invoice.xml)
data['comprobante'] = _comprobante(doc, values)
version = data['comprobante']['version']
data['emisor'] = _emisor(doc, version, values)
data['receptor'] = _receptor(doc, version, values)
data['conceptos'] = _conceptos(doc, version)
data['totales'] = _totales(doc, data['comprobante'], version)
data['donataria'] = _donataria(doc, version, values['fechadof'])
data['ine'] = _ine(doc, version)
options = {
'rfc_emisor': data['emisor']['rfc'],
'rfc_receptor': data['receptor']['rfc'],
'total': data['comprobante']['total'],
}
data['timbre'] = _timbre(doc, version, options)
del data['timbre']['version']
data['comprobante'].update(data['timbre'])
return data
def to_zip(*files):
zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED, False) as zip_file:
for data, file_name in files:
zip_file.writestr(file_name, data)
return zip_buffer.getvalue()
def make_fields(xml):
doc = ET.fromstring(xml)
data = CaseInsensitiveDict(doc.attrib.copy())
data.pop('certificado')
data.pop('sello')
version = data['version']
receptor = doc.find('{}Receptor'.format(PRE[version]))
receptor = CaseInsensitiveDict(receptor.attrib.copy())
data['receptor_nombre'] = receptor['nombre']
data['receptor_rfc'] = receptor['rfc']
data = {k.lower(): v for k, v in data.items()}
return data
def make_info_mail(data, fields):
return data.format(**fields).replace('\n', '<br/>')
def send_mail(data):
msg = ''
server = SendMail(data['server'])
is_connect = server.is_connect
if is_connect:
msg = server.send(data['options'])
else:
msg = server.error
server.close()
return {'ok': is_connect, 'msg': msg}
def get_path_info(path):
path, filename = os.path.split(path)
name, extension = os.path.splitext(filename)
return (path, filename, name, extension)
def get_path_temp():
return tempfile.mkstemp()[1]
def get_date(value, next_day=False):
d = parser.parse(value)
if next_day:
return d + datetime.timedelta(days=1)
return d
def upload_file(rfc, opt, file_obj):
if opt == 'emisorlogo':
tmp = file_obj.filename.split('.')
name = '{}.{}'.format(rfc.lower(), tmp[-1].lower())
path = _join(PATH_MEDIA, 'logos', name)
elif opt == 'txt_plantilla_factura_32':
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
if ext != 'ods':
msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS'
return {'status': 'server', 'name': msg, 'ok': False}
name = '{}_3.2.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'templates', name)
elif opt == 'txt_plantilla_factura_33':
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
if ext != 'ods':
msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS'
return {'status': 'server', 'name': msg, 'ok': False}
name = '{}_3.3.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'templates', name)
elif opt == 'txt_plantilla_factura_33j':
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
if ext != 'json':
msg = 'Extensión de archivo incorrecta, selecciona un archivo JSON'
return {'status': 'server', 'name': msg, 'ok': False}
name = '{}_3.3.json'.format(rfc.lower())
path = _join(PATH_MEDIA, 'templates', name)
elif opt == 'txt_plantilla_donataria':
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
if ext != 'ods':
msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS'
return {'status': 'server', 'name': msg, 'ok': False}
name = '{}_3.3_donativo.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'templates', name)
elif opt == 'bdfl':
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
if ext != 'sqlite':
msg = 'Extensión de archivo incorrecta, selecciona un archivo SQLite'
return {'status': 'server', 'name': msg, 'ok': False}
name = '{}.sqlite'.format(rfc.lower())
path = _join('/tmp', name)
if save_file(path, file_obj.file.read()):
return {'status': 'server', 'name': file_obj.filename, 'ok': True}
return {'status': 'error', 'ok': False}
def cancel_cfdi(uuid, pk12, rfc, auth):
from .pac import Finkok as PAC
template = read_file(TEMPLATE_CANCEL, 'r')
data = {
'rfc': rfc,
'fecha': datetime.datetime.now().isoformat()[:19],
'uuid': str(uuid).upper(),
}
template = template.format(**data)
data = {
'xmlsec': PATH_XMLSEC,
'pk12': save_temp(pk12),
'pass': _get_md5(rfc),
'template': save_temp(template, 'w'),
}
args = '"{xmlsec}" --sign --pkcs12 "{pk12}" --pwd {pass} ' \
'"{template}"'.format(**data)
xml_sign = _call(args)
if DEBUG:
auth = {}
else:
if not auth:
msg = 'Sin datos para cancelar'
result = {'ok': False, 'error': msg}
return result
msg = 'Factura cancelada correctamente'
data = {'ok': True, 'msg': msg, 'row': {'estatus': 'Cancelada'}}
pac = PAC(auth)
result = pac.cancel_signature(xml_sign)
if result:
codes = {None: '',
'Could not get UUID Text': 'UUID no encontrado'}
if not result['CodEstatus'] is None:
data['ok'] = False
data['msg'] = codes.get(result['CodEstatus'], result['CodEstatus'])
else:
data['ok'] = False
data['msg'] = pac.error
return data, result
def run_in_thread(fn):
def run(*k, **kw):
t = threading.Thread(target=fn, args=k, kwargs=kw)
t.start()
return t
return run
def get_bool(value):
if not value:
return False
if value == '1':
return True
return False
def get_float(value):
return round(float(value), DECIMALES)
def crear_rol(user, contra=''):
if not contra:
contra = user
args = 'psql -U postgres -c "CREATE ROLE {} WITH LOGIN ENCRYPTED ' \
'PASSWORD \'{}\';"'.format(user, contra)
try:
result = _call(args)
if result == 'CREATE ROLE\n':
return True
except Exception as e:
log.info(e)
return False
def crear_db(nombre):
args = 'psql -U postgres -c "CREATE DATABASE {0} WITH ' \
'OWNER {0};"'.format(nombre)
try:
result = _call(args)
print (result)
if result == 'CREATE DATABASE\n':
return True
except Exception as e:
log.info(e)
return False
class ImportFacturaLibre(object):
def __init__(self, path, rfc):
self._rfc = rfc
self._con = None
self._cursor = None
self._error = ''
self._is_connect = self._connect(path)
self._clientes = []
self._clientes_rfc = []
@property
def error(self):
return self._error
@property
def is_connect(self):
return self._is_connect
def _validate_rfc(self):
sql = "SELECT rfc FROM emisor LIMIT 1"
self._cursor.execute(sql)
obj = self._cursor.fetchone()
if obj is None:
self._error = 'No se encontró al emisor: {}'.format(self._rfc)
return False
if not DEBUG:
if obj['rfc'] != self._rfc:
self._error = 'Los datos no corresponden al RFC: {}'.format(self._rfc)
return False
return True
def _connect(self, path):
try:
self._con = sqlite3.connect(path)
self._con.row_factory = sqlite3.Row
self._cursor = self._con.cursor()
return self._validate_rfc()
except Exception as e:
log.error(e)
self._error = 'No se pudo conectar a la base de datos'
return False
def close(self):
try:
self._cursor.close()
self._con.close()
except:
pass
return
def import_data(self):
data = {}
tables = (
('receptores', 'Socios'),
('cfdfacturas', 'Facturas'),
('categorias', 'Categorias'),
)
for source, target in tables:
data[target] = self._get_table(source)
data['Socios'] += self._clientes
return data
def _get_table(self, table):
return getattr(self, '_{}'.format(table))()
def import_productos(self):
sql = "SELECT * FROM productos"
self._cursor.execute(sql)
rows = self._cursor.fetchall()
fields = (
('id_categoria', 'categoria'),
('noIdentificacion', 'clave'),
('descripcion', 'descripcion'),
('unidad', 'unidad'),
('valorUnitario', 'valor_unitario'),
('existencia', 'existencia'),
('inventario', 'inventario'),
('codigobarras', 'codigo_barras'),
('CuentaPredial', 'cuenta_predial'),
('precio_compra', 'ultimo_precio'),
('minimo', 'minimo'),
)
data = []
sql = """
SELECT nombre, tasa, tipo
FROM impuestos, productos, productosimpuestos
WHERE productos.id=productosimpuestos.id_producto
AND productosimpuestos.id_impuesto=impuestos.id
AND productos.id = ?
"""
for row in rows:
new = {t: row[s] for s, t in fields}
new['descripcion'] = ' '.join(new['descripcion'].split())
new['clave_sat'] = DEFAULT_SAT_PRODUCTO
self._cursor.execute(sql, (row['id'],))
impuestos = self._cursor.fetchall()
new['impuestos'] = tuple(impuestos)
data.append(new)
return data
def _categorias(self):
sql = "SELECT * FROM categorias ORDER BY id_padre"
self._cursor.execute(sql)
rows = self._cursor.fetchall()
fields = (
('id', 'id'),
('categoria', 'categoria'),
('id_padre', 'padre'),
)
data = []
for row in rows:
new = {t: row[s] for s, t in fields}
if new['padre'] == 0:
new['padre'] = None
data.append(new)
return data
def _get_cliente(self, invoice):
sql = "SELECT rfc, nombre FROM receptores WHERE id=?"
self._cursor.execute(sql, [invoice['id_cliente']])
obj = self._cursor.fetchone()
if not obj is None:
data = {
'rfc': obj['rfc'],
'slug': to_slug(obj['nombre']),
}
return data
if not invoice['xml']:
return {}
doc = parse_xml(invoice['xml'])
version = doc.attrib['version']
node = doc.find('{}Receptor'.format(PRE[version]))
rfc = node.attrib['rfc']
nombre = node.attrib['nombre']
tipo_persona = 1
if rfc == 'XEXX010101000':
tipo_persona = 4
elif rfc == 'XAXX010101000':
tipo_persona = 3
elif len(rfc) == 12:
tipo_persona = 2
data = {
'tipo_persona': tipo_persona,
'rfc': rfc,
'nombre': nombre,
'slug': to_slug(nombre),
'es_cliente': True,
'es_activo': False,
}
if not rfc in self._clientes_rfc:
self._clientes_rfc.append(rfc)
self._clientes.append(data)
data = {
'rfc': data['rfc'],
'slug': data['slug'],
}
return data
def _get_detalles(self, id):
sql = "SELECT * FROM cfddetalle WHERE id_cfd=?"
self._cursor.execute(sql, [id])
rows = self._cursor.fetchall()
fields = (
('categoria', 'categoria'),
('cantidad', 'cantidad'),
('unidad', 'unidad'),
('noIdentificacion', 'clave'),
('descripcion', 'descripcion'),
('valorUnitario', 'valor_unitario'),
('importe', 'importe'),
('numero', 'pedimento'),
('fecha', 'fecha_pedimento'),
('aduana', 'aduana'),
('CuentaPredial', 'cuenta_predial'),
('alumno', 'alumno'),
('curp', 'curp'),
('nivel', 'nivel'),
('autorizacion', 'autorizacion'),
)
data = []
for row in rows:
new = {t: row[s] for s, t in fields}
data.append(new)
return data
def _get_impuestos(self, id):
sql = "SELECT * FROM cfdimpuestos WHERE id_cfd=?"
self._cursor.execute(sql, [id])
rows = self._cursor.fetchall()
tasas = {
'16': 0.16,
'11': 0.11,
'-10': 0.10,
'0': 0.0,
'-2/3': 0.666667,
'-0.5': 0.005,
}
data = []
for row in rows:
filtro = {
'name': row['nombre'],
'tasa': tasas[row['tasa']],
'tipo': row['tipo'][0],
}
new = {
'importe': row['importe'],
'filtro': filtro
}
data.append(new)
return data
def _cfdfacturas(self):
sql = "SELECT * FROM cfdfacturas"
self._cursor.execute(sql)
rows = self._cursor.fetchall()
fields = (
('version', 'version'),
('serie', 'serie'),
('folio', 'folio'),
('fecha', 'fecha'),
('fecha_timbrado', 'fecha_timbrado'),
('formaDePago', 'forma_pago'),
('condicionesDePago', 'condiciones_pago'),
('subTotal', 'subtotal'),
('descuento', 'descuento'),
('TipoCambio', 'tipo_cambio'),
('Moneda', 'moneda'),
('total', 'total'),
('tipoDeComprobante', 'tipo_comprobante'),
('metodoDePago', 'metodo_pago'),
('LugarExpedicion', 'lugar_expedicion'),
('totalImpuestosRetenidos', 'total_retenciones'),
('totalImpuestosTrasladados', 'total_traslados'),
('xml', 'xml'),
('id_cliente', 'cliente'),
('notas', 'notas'),
('uuid', 'uuid'),
('donativo', 'donativo'),
('estatus', 'estatus'),
('regimen', 'regimen_fiscal'),
('xml_acuse', 'acuse'),
)
data = []
for row in rows:
new = {t: row[s] for s, t in fields}
if not new['uuid']:
new['uuid'] = None
if new['xml'] is None:
new['xml'] = ''
if row['estatus'] == 'Pagada':
new['pagada'] = True
elif row['estatus'] == 'Cancelada':
new['cancelada'] = True
new['total_mn'] = round(row['TipoCambio'] * row['total'], 2)
new['detalles'] = self._get_detalles(row['id'])
new['impuestos'] = self._get_impuestos(row['id'])
new['cliente'] = self._get_cliente(row)
data.append(new)
return data
def _receptores(self):
sql = "SELECT * FROM receptores"
self._cursor.execute(sql)
rows = self._cursor.fetchall()
fields = (
('rfc', 'rfc'),
('nombre', 'nombre'),
('calle', 'calle'),
('noExterior', 'no_exterior'),
('noInterior', 'no_interior'),
('colonia', 'colonia'),
('municipio', 'municipio'),
('estado', 'estado'),
('pais', 'pais'),
('codigoPostal', 'codigo_postal'),
('extranjero', 'es_extranjero'),
('activo', 'es_activo'),
('fechaalta', 'fecha_alta'),
('notas', 'notas'),
('cuentaCliente', 'cuenta_cliente'),
('cuentaProveedor', 'cuenta_proveedor'),
('saldoCliente', 'saldo_cliente'),
('saldoProveedor', 'saldo_proveedor'),
('esCliente', 'es_cliente'),
('esProveedor', 'es_proveedor'),
)
data = []
sql1 = "SELECT correo FROM correos WHERE id_cliente=?"
sql2 = "SELECT telefono FROM telefonos WHERE id_cliente=?"
for row in rows:
new = {t: row[s] for s, t in fields}
new['slug'] = to_slug(new['nombre'])
if new['es_extranjero']:
new['tipo_persona'] = 4
elif new['rfc'] == 'XAXX010101000':
new['tipo_persona'] = 3
elif len(new['rfc']) == 12:
new['tipo_persona'] = 2
self._cursor.execute(sql1, (row['id'],))
tmp = self._cursor.fetchall()
if tmp:
new['correo_facturas'] = ', '.join([r[0] for r in tmp])
self._cursor.execute(sql2, (row['id'],))
tmp = self._cursor.fetchall()
if tmp:
new['telefonos'] = ', '.join([r[0] for r in tmp])
data.append(new)
return data