empresa-libre/source/app/controllers/util.py

1078 lines
31 KiB
Python

#!/usr/bin/env python
import datetime
import getpass
import hashlib
import json
import mimetypes
import os
import re
import sqlite3
import socket
import subprocess
import tempfile
import time
import unicodedata
import uuid
import zipfile
from io import BytesIO
from smtplib import SMTPException, SMTPAuthenticationError
from xml.etree import ElementTree as ET
#~ import uno
#~ from com.sun.star.beans import PropertyValue
#~ from com.sun.star.awt import Size
import pyqrcode
from dateutil import parser
from .helper import CaseInsensitiveDict, NumLet, SendMail, TemplateInvoice
from settings import DEBUG, log, template_lookup, COMPANIES, DB_SAT, \
PATH_XSLT, PATH_XSLTPROC, PATH_OPENSSL, PATH_TEMPLATES, PRE
#~ def _get_hash(password):
#~ return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
#~ def validate_password(hashed, password):
#~ return bcrypt.hashpw(password.encode(), hashed.encode()) == hashed.encode()
def _call(args):
return subprocess.check_output(args, shell=True).decode()
def _get_md5(data):
return hashlib.md5(data.encode()).hexdigest()
def _save_temp(data, modo='wb'):
path = tempfile.mkstemp()[1]
with open(path, modo) as f:
f.write(data)
return path
def _join(*paths):
return os.path.join(*paths)
def _kill(path):
try:
os.remove(path)
except:
pass
return
def get_pass():
password = getpass.getpass('Introduce la contraseña: ')
pass2 = getpass.getpass('Confirma la contraseña: ')
if password != pass2:
msg = 'Las contraseñas son diferentes'
return False, msg
password = password.strip()
if not password:
msg = 'La contraseña es necesaria'
return False, msg
return True, password
def get_value(arg):
value = input('Introduce el {}: '.format(arg)).strip()
if not value:
msg = 'El {} es requerido'.format(arg)
log.error(msg)
return ''
return value
def _get_args(rfc):
con = sqlite3.connect(COMPANIES)
cursor = con.cursor()
sql = "SELECT con FROM names WHERE rfc=?"
cursor.execute(sql, (rfc,))
values = cursor.fetchone()
if values is None:
msg = 'No se encontró el RFC'
log.error(msg)
return ''
cursor.close()
con.close()
return values[0]
def get_con(rfc=''):
if not rfc:
rfc = get_value('RFC').upper()
if not rfc:
return False
args = _get_args(rfc.upper())
if not args:
return False
return loads(args)
def get_rfcs():
con = sqlite3.connect(COMPANIES)
cursor = con.cursor()
sql = "SELECT * FROM names"
cursor.execute(sql)
values = cursor.fetchall()
cursor.close()
con.close()
return values
def get_sat_key(table, key):
con = sqlite3.connect(DB_SAT)
cursor = con.cursor()
sql = 'SELECT key, description FROM {} WHERE key=?'.format(table)
cursor.execute(sql, (key,))
data = cursor.fetchone()
cursor.close()
con.close()
if data is None:
return {'ok': False, 'text': 'No se encontró la clave'}
return {'ok': True, 'text': data[1]}
def now():
return datetime.datetime.now().replace(microsecond=0)
def get_token():
return _get_hash(uuid.uuid4().hex)
def get_mimetype(path):
mt = mimetypes.guess_type(path)[0]
return mt or 'application/octet-stream'
def is_file(path):
return os.path.isfile(path)
def get_stream(path):
return get_file(path), get_size(path)
def get_file(path):
return open(path, 'rb')
def read_file(path, mode='rb'):
return open(path, mode).read()
def get_size(path):
return os.path.getsize(path)
def get_template(name, data={}):
#~ print ('NAME', name, data)
template = template_lookup.get_template(name)
return template.render(**data)
def get_custom_styles(name, default='plantilla_factura.json'):
path = _join(PATH_TEMPLATES, name)
if is_file(path):
with open(path) as fh:
return loads(fh.read())
path = _join(PATH_TEMPLATES, default)
if is_file(path):
with open(path) as fh:
return loads(fh.read())
return {}
def dumps(data):
return json.dumps(data, default=str)
def loads(data):
return json.loads(data)
def clean(values):
for k, v in values.items():
if isinstance(v, str):
values[k] = v.strip()
return values
def parse_con(values):
data = values.split('|')
try:
con = {'type': data[0]}
if con['type'] == 'sqlite':
con['name'] = data[1]
else:
if data[1]:
con['host'] = data[1]
if data[2]:
con['port'] = data[2]
con['name'] = data[3]
con['user'] = data[4]
con['password'] = data[5]
return con
except IndexError:
return {}
def spaces(value):
return ' '.join(value.split())
def to_slug(string):
value = (unicodedata.normalize('NFKD', string)
.encode('ascii', 'ignore')
.decode('ascii').lower())
return value.replace(' ', '_')
class Certificado(object):
def __init__(self, key, cer):
self._key = key
self._cer = cer
self._modulus = ''
self._save_files()
self.error = ''
def _save_files(self):
try:
self._path_key = _save_temp(self._key)
self._path_cer = _save_temp(self._cer)
except:
self._path_key = ''
self._path_cer = ''
return
def _kill(self, path):
try:
os.remove(path)
except:
pass
return
def _get_info_cer(self, session_rfc):
data = {}
args = 'openssl x509 -inform DER -in {}'
try:
cer_pem = _call(args.format(self._path_cer))
except Exception as e:
self.error = 'No se pudo convertir el CER en PEM'
return data
args = 'openssl enc -base64 -in {}'
try:
cer_txt = _call(args.format(self._path_cer))
except Exception as e:
self.error = 'No se pudo convertir el CER en TXT'
return data
args = 'openssl x509 -inform DER -in {} -noout -{}'
try:
result = _call(args.format(self._path_cer, 'purpose')).split('\n')[3]
except Exception as e:
self.error = 'No se puede saber si es FIEL'
return data
if result == 'SSL server : No':
self.error = 'El certificado es FIEL'
return data
result = _call(args.format(self._path_cer, 'serial'))
serie = result.split('=')[1].split('\n')[0][1::2]
result = _call(args.format(self._path_cer, 'subject'))
rfc = result.split('=')[5].split('/')[0].strip()
if not DEBUG:
if not rfc == session_rfc:
self.error = 'El RFC del certificado no corresponde.'
return data
dates = _call(args.format(self._path_cer, 'dates')).split('\n')
desde = parser.parse(dates[0].split('=')[1])
hasta = parser.parse(dates[1].split('=')[1])
self._modulus = _call(args.format(self._path_cer, 'modulus'))
data['cer'] = self._cer
data['cer_tmp'] = None
data['cer_pem'] = cer_pem
data['cer_txt'] = cer_txt.replace('\n', '')
data['serie'] = serie
data['rfc'] = rfc
data['desde'] = desde
data['hasta'] = hasta
return data
def _get_p12(self, password, rfc):
tmp_cer = tempfile.mkstemp()[1]
tmp_key = tempfile.mkstemp()[1]
tmp_p12 = tempfile.mkstemp()[1]
args = 'openssl x509 -inform DER -in "{}" -out "{}"'
_call(args.format(self._path_cer, tmp_cer))
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{} -out "{}"'
_call(args.format(self._path_key, password, tmp_key))
args = 'openssl pkcs12 -export -in "{}" -inkey "{}" -name "{}" -passout ' \
'pass:"{}" -out "{}"'
_call(args.format(tmp_cer, tmp_key, rfc,
hashlib.md5(rfc.encode()).hexdigest(), tmp_p12))
data = open(tmp_p12, 'rb').read()
self._kill(tmp_cer)
self._kill(tmp_key)
self._kill(tmp_p12)
return data
def _get_info_key(self, password, rfc):
data = {}
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{}'
try:
result = _call(args.format(self._path_key, password))
except Exception as e:
self.error = 'Contraseña incorrecta'
return data
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{} | ' \
'openssl rsa -noout -modulus'
mod_key = _call(args.format(self._path_key, password))
if self._modulus != mod_key:
self.error = 'Los archivos no son pareja'
return data
args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:{} | ' \
'openssl rsa -des3 -passout pass:{}'.format(
self._path_key, password, _get_md5(rfc))
key_enc = _call(args)
data['key'] = self._key
data['key_tmp'] = None
data['key_enc'] = key_enc
data['p12'] = self._get_p12(password, rfc)
return data
def validate(self, password, rfc):
if not self._path_key or not self._path_cer:
self.error = 'Error al cargar el certificado'
return {}
data = self._get_info_cer(rfc)
llave = self._get_info_key(password, data['rfc'])
if not llave:
return {}
data.update(llave)
self._kill(self._path_key)
self._kill(self._path_cer)
return data
def make_xml(data, certificado):
from .cfdi_xml import CFDI
if DEBUG:
data['emisor']['Rfc'] = certificado.rfc
data['emisor']['RegimenFiscal'] = '603'
cfdi = CFDI()
xml = cfdi.get_xml(data)
data = {
'xsltproc': PATH_XSLTPROC,
'xslt': _join(PATH_XSLT, 'cadena.xslt'),
'xml': _save_temp(xml, 'w'),
'openssl': PATH_OPENSSL,
'key': _save_temp(certificado.key_enc, 'w'),
'pass': _get_md5(certificado.rfc)
}
args = '"{xsltproc}" "{xslt}" "{xml}" | ' \
'"{openssl}" dgst -sha256 -sign "{key}" -passin pass:{pass} | ' \
'"{openssl}" enc -base64 -A'.format(**data)
sello = _call(args)
_kill(data['xml'])
_kill(data['key'])
return cfdi.add_sello(sello)
def timbra_xml(xml):
from .pac import Finkok as PAC
result = {'ok': True, 'error': ''}
pac = PAC()
xml = pac.timbra_xml(xml)
if not xml:
result['ok'] = False
result['error'] = pac.error
return result
result['xml'] = xml
result['uuid'] = pac.uuid
result['fecha'] = pac.fecha
return result
class LIBO(object):
HOST = 'localhost'
PORT = '8100'
ARG = 'socket,host={},port={};urp;StarOffice.ComponentContext'.format(
HOST, PORT)
def __init__(self):
self._app = None
self._start_office()
self._init_values()
def _init_values(self):
self._ctx = None
self._sm = None
self._desktop = None
if self.is_running:
ctx = uno.getComponentContext()
service = 'com.sun.star.bridge.UnoUrlResolver'
resolver = ctx.ServiceManager.createInstanceWithContext(service, ctx)
self._ctx = resolver.resolve('uno:{}'.format(self.ARG))
self._sm = self._ctx.ServiceManager
self._desktop = self._create_instance('com.sun.star.frame.Desktop')
return
def _create_instance(self, name, with_context=True):
if with_context:
instance = self._sm.createInstanceWithContext(name, self._ctx)
else:
instance = self._sm.createInstance(name)
return instance
@property
def is_running(self):
try:
s = socket.create_connection((self.HOST, self.PORT), 5.0)
s.close()
return True
except ConnectionRefusedError:
return False
def _start_office(self):
if self.is_running:
return
c = 1
while c < 4:
c += 1
self.app = subprocess.Popen([
'soffice', '--headless', '--accept={}'.format(self.ARG)],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
time.sleep(5)
if self.is_running:
return
return
def _set_properties(self, properties):
pl = []
for k, v in properties.items():
pv = PropertyValue()
pv.Name = k
pv.Value = v
pl.append(pv)
return tuple(pl)
def _doc_open(self, path, options):
options = self._set_properties(options)
path = self._path_url(path)
try:
doc = self._desktop.loadComponentFromURL(path, '_blank', 0, options)
return doc
except:
return None
def _path_url(self, path):
if path.startswith('file://'):
return path
return uno.systemPathToFileUrl(path)
def close(self):
if self.is_running:
if not self._desktop is None:
self._desktop.terminate()
if not self._app is None:
self._app.terminate()
return
def _read(self, path):
try:
return open(path, 'rb').read()
except:
return b''
def _clean(self):
self._sd.SearchRegularExpression = True
self._sd.setSearchString("\{(\w.+)\}")
self._search.replaceAll(self._sd)
return
def _cancelado(self, cancel):
if not cancel:
pd = self._sheet.getDrawPage()
if pd.getCount():
pd.remove(pd.getByIndex(0))
return
def _set_search(self):
self._sheet = self._template.getSheets().getByIndex(0)
self._search = self._sheet.getPrintAreas()[0]
self._search = self._sheet.getCellRangeByPosition(
self._search.StartColumn,
self._search.StartRow,
self._search.EndColumn,
self._search.EndRow
)
self._sd = self._sheet.createSearchDescriptor()
self._sd.SearchCaseSensitive = False
return
def _next_cell(self, cell):
col = cell.getCellAddress().Column
row = cell.getCellAddress().Row + 1
return self._sheet.getCellByPosition(col, row)
def _copy_cell(self, cell):
destino = self._next_cell(cell)
self._sheet.copyRange(destino.getCellAddress(), cell.getRangeAddress())
return destino
def _set_cell(self, k='', v=None, cell=None, value=False):
if k:
self._sd.setSearchString(k)
ranges = self._search.findAll(self._sd)
if ranges:
ranges = ranges.getRangeAddressesAsString().split(';')
for r in ranges:
for c in r.split(','):
cell = self._sheet.getCellRangeByName(c)
if v is None:
return cell
if cell.getImplementationName() == 'ScCellObj':
pattern = re.compile(k, re.IGNORECASE)
nv = pattern.sub(v, cell.getString())
if value:
cell.setValue(nv)
else:
cell.setString(nv)
return cell
if cell:
if cell.getImplementationName() == 'ScCellObj':
ca = cell.getCellAddress()
new_cell = self._sheet.getCellByPosition(ca.Column, ca.Row + 1)
if value:
new_cell.setValue(v)
else:
new_cell.setString(v)
return new_cell
def _comprobante(self, data):
for k, v in data.items():
if k in ('total', 'descuento', 'subtotal'):
self._set_cell('{cfdi.%s}' % k, v, value=True)
else:
self._set_cell('{cfdi.%s}' % k, v)
return
def _emisor(self, data):
for k, v in data.items():
self._set_cell('{emisor.%s}' % k, v)
return
def _receptor(self, data):
for k, v in data.items():
self._set_cell('{receptor.%s}' % k, v)
return
def _conceptos(self, data):
first = True
for concepto in data:
key = concepto.get('noidentificacion', '')
description = concepto['descripcion']
unidad = concepto['unidad']
cantidad = concepto['cantidad']
valor_unitario = concepto['valorunitario']
importe = concepto['importe']
if first:
first = False
cell_1 = self._set_cell('{noidentificacion}', key)
cell_2 = self._set_cell('{descripcion}', description)
cell_3 = self._set_cell('{unidad}', unidad)
cell_4 = self._set_cell('{cantidad}', cantidad, value=True)
cell_5 = self._set_cell('{valorunitario}', valor_unitario, value=True)
cell_6 = self._set_cell('{importe}', importe, value=True)
return
def _totales(self, data):
currency = data['moneda']
cell_title = self._set_cell('{subtotal.titulo}', 'SubTotal')
value = data['subtotal']
cell_value = self._set_cell('{subtotal}', value, value=True)
cell_value.CellStyle = currency
#~ Si encuentra el campo {total}, se asume que los totales e impuestos
#~ están declarados de forma independiente cada uno
#~ if self._add_totales(xml):
#~ return
#~ Si no se encuentra, copia las celdas hacia abajo de
#~ {subtotal.titulo} y {subtotal}
if 'descuento' in data:
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v='Descuento', cell=cell_title)
value = data['descuento']
cell_value = self._set_cell(v=value, cell=cell_value, value=True)
cell_value.CellStyle = currency
for tax in data['traslados']:
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v=tax[0], cell=cell_title)
cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True)
cell_value.CellStyle = currency
for tax in data['retenciones']:
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v=tax[0], cell=cell_title)
cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True)
cell_value.CellStyle = currency
for tax in data['taxlocales']:
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v=tax[0], cell=cell_title)
cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True)
cell_value.CellStyle = currency
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v='Total', cell=cell_title)
value = data['total']
cell_value = self._set_cell(v=value, cell=cell_value, value=True)
cell_value.CellStyle = currency
return
def _timbre(self, data):
for k, v in data.items():
self._set_cell('{timbre.%s}' % k, v)
pd = self._sheet.getDrawPage()
image = self._template.createInstance('com.sun.star.drawing.GraphicObjectShape')
image.GraphicURL = data['path_cbb']
pd.add(image)
s = Size()
s.Width = 4250
s.Height = 4500
image.setSize(s)
image.Anchor = self._set_cell('{timbre.cbb}')
return
def _render(self, data):
self._set_search()
self._comprobante(data['comprobante'])
self._emisor(data['emisor'])
self._receptor(data['receptor'])
self._conceptos(data['conceptos'])
self._totales(data['totales'])
self._timbre(data['timbre'])
self._cancelado(data['cancelada'])
self._clean()
return
def pdf(self, path, data):
options = {'AsTemplate': True, 'Hidden': True}
self._template = self._doc_open(path, options)
if self._template is None:
return b''
self._render(data)
path = '{}.ods'.format(tempfile.mkstemp()[1])
self._template.storeToURL(self._path_url(path), ())
doc = self._doc_open(path, {'Hidden': True})
options = {'FilterName': 'calc_pdf_Export'}
path = tempfile.mkstemp()[1]
doc.storeToURL(self._path_url(path), self._set_properties(options))
doc.close(True)
self._template.close(True)
return self._read(path)
def to_pdf(styles, data):
#~ app = LIBO()
#~ if not app.is_running:
#~ return b''
#~ return app.pdf(path, data)
path = get_path_temp()
pdf = TemplateInvoice(path)
pdf.custom_styles = styles
pdf.data = data
pdf.render()
return read_file(path)
def parse_xml(xml):
return ET.fromstring(xml)
def get_dict(data):
return CaseInsensitiveDict(data)
def to_letters(value, moneda):
monedas = {
'MXN': 'peso',
'USD': 'dólar',
'EUR': 'euro',
}
return NumLet(value, monedas[moneda]).letras
def get_qr(data):
path = tempfile.mkstemp()[1]
qr = pyqrcode.create(data, mode='binary')
qr.png(path, scale=7)
return path
def _comprobante(values, options):
data = CaseInsensitiveDict(values)
del data['certificado']
data['totalenletras'] = to_letters(float(data['total']), data['moneda'])
if data['version'] == '3.3':
tipos = {
'I': 'ingreso',
'E': 'egreso',
'T': 'traslado',
}
data['tipodecomprobante'] = tipos.get(data['tipodecomprobante'])
data['lugarexpedicion'] = 'C.P. Expedición: {}'.format(data['lugarexpedicion'])
data['metododepago'] = options['metododepago']
data['formadepago'] = options['formadepago']
data['moneda'] = options['moneda']
data['tipocambio'] = 'Tipo de Cambio: $ {:0.2f}'.format(
float(data['tipocambio']))
return data
def _emisor(doc, version, values):
node = doc.find('{}Emisor'.format(PRE[version]))
data = CaseInsensitiveDict(node.attrib.copy())
node = node.find('{}DomicilioFiscal'.format(PRE[version]))
if not node is None:
data.update(CaseInsensitiveDict(node.attrib.copy()))
data['regimenfiscal'] = values['regimenfiscal']
return data
def _receptor(doc, version, values):
node = doc.find('{}Receptor'.format(PRE[version]))
data = CaseInsensitiveDict(node.attrib.copy())
node = node.find('{}Domicilio'.format(PRE[version]))
if not node is None:
data.update(node.attrib.copy())
data['usocfdi'] = values['usocfdi']
return data
def _conceptos(doc, version):
data = []
conceptos = doc.find('{}Conceptos'.format(PRE[version]))
for c in conceptos.getchildren():
values = CaseInsensitiveDict(c.attrib.copy())
if version == '3.3':
values['noidentificacion'] = '{}\n(SAT {})'.format(
values['noidentificacion'], values['ClaveProdServ'])
values['unidad'] = '({}) {}'.format(
values['ClaveUnidad'], values['unidad'])
data.append(values)
return data
def _totales(doc, cfdi, version):
data = {}
data['moneda'] = doc.attrib['Moneda']
data['subtotal'] = cfdi['subtotal']
if 'descuento' in cfdi:
data['descuento'] = cfdi['descuento']
data['total'] = cfdi['total']
tn = {
'001': 'ISR',
'002': 'IVA',
'003': 'IEPS',
}
traslados = []
retenciones = []
taxlocales = []
imp = doc.find('{}Impuestos'.format(PRE[version]))
if imp is not None:
tmp = CaseInsensitiveDict(imp.attrib.copy())
for k, v in tmp.items():
data[k] = v
node = imp.find('{}Traslados'.format(PRE[version]))
if node is not None:
for n in node.getchildren():
tmp = CaseInsensitiveDict(n.attrib.copy())
if version == '3.3':
title = 'Traslado {} {}'.format(
tn.get(tmp['impuesto']), tmp['tasaocuota'])
else:
title = 'Traslado {} {}'.format(tmp['impuesto'], tmp['tasa'])
traslados.append((title, float(tmp['importe'])))
node = imp.find('{}Retenciones'.format(PRE[version]))
if node is not None:
for n in node.getchildren():
tmp = CaseInsensitiveDict(n.attrib.copy())
if version == '3.3':
title = 'Retención {} {}'.format(
tn.get(tmp['impuesto']), '')
else:
title = 'Retención {} {}'.format(tmp['impuesto'], '')
retenciones.append((title, float(tmp['importe'])))
#~ com = xml.find('%sComplemento' % PRE)
#~ if com is not None:
#~ otros = com.find('%sImpuestosLocales' % IMP_LOCAL)
#~ if otros is not None:
#~ for otro in list(otros):
#~ if otro.tag == '%sRetencionesLocales' % IMP_LOCAL:
#~ name = 'ImpLocRetenido'
#~ tasa = 'TasadeRetencion'
#~ else:
#~ name = 'ImpLocTrasladado'
#~ tasa = 'TasadeTraslado'
#~ title = '%s %s %%' % (otro.attrib[name], otro.attrib[tasa])
#~ value = otro.attrib['Importe']
#~ self._copy_cell(cell_title)
#~ self._copy_cell(cell_value)
#~ cell_title = self._set_cell(v=title, cell=cell_title)
#~ cell_value = self._set_cell(v=value, cell=cell_value, value=True)
#~ cell_value.CellStyle = currency
data['traslados'] = traslados
data['retenciones'] = retenciones
data['taxlocales'] = taxlocales
return data
def _timbre(doc, version, values):
CADENA = '||{version}|{UUID}|{FechaTimbrado}|{selloCFD}|{noCertificadoSAT}||'
if version == '3.3':
CADENA = '||{Version}|{UUID}|{FechaTimbrado}|{SelloCFD}|{NoCertificadoSAT}||'
node = doc.find('{}Complemento/{}TimbreFiscalDigital'.format(
PRE[version], PRE['TIMBRE']))
data = CaseInsensitiveDict(node.attrib.copy())
total_s = '%017.06f' % float(values['total'])
qr_data = '?re=%s&rr=%s&tt=%s&id=%s' % (
values['rfc_emisor'],
values['rfc_receptor'],
total_s,
node.attrib['UUID'])
data['path_cbb'] = get_qr(qr_data)
data['cadenaoriginal'] = CADENA.format(**node.attrib)
return data
def get_data_from_xml(invoice, rfc, values):
name = '{}_factura.json'.format(rfc.lower())
custom_styles = get_custom_styles(name)
data = {'cancelada': invoice.cancelada}
doc = parse_xml(invoice.xml)
data['comprobante'] = _comprobante(doc.attrib.copy(), values)
version = data['comprobante']['version']
data['emisor'] = _emisor(doc, version, values)
data['receptor'] = _receptor(doc, version, values)
data['conceptos'] = _conceptos(doc, version)
data['totales'] = _totales(doc, data['comprobante'], version)
options = {
'rfc_emisor': data['emisor']['rfc'],
'rfc_receptor': data['receptor']['rfc'],
'total': data['comprobante']['total'],
}
data['timbre'] = _timbre(doc, version, options)
return custom_styles, data
def to_zip(*files):
zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED, False) as zip_file:
for data, file_name in files:
zip_file.writestr(file_name, data)
return zip_buffer.getvalue()
def make_fields(xml):
doc = ET.fromstring(xml)
data = CaseInsensitiveDict(doc.attrib.copy())
data.pop('certificado')
data.pop('sello')
version = data['version']
receptor = doc.find('{}Receptor'.format(PRE[version]))
receptor = CaseInsensitiveDict(receptor.attrib.copy())
data['receptor_nombre'] = receptor['nombre']
data['receptor_rfc'] = receptor['rfc']
data = {k.lower(): v for k, v in data.items()}
return data
def make_info_mail(data, fields):
return data.format(**fields).replace('\n', '<br/>')
def send_mail(data):
msg = ''
server = SendMail(data['server'])
is_connect = server.is_connect
if is_connect:
msg = server.send(data['options'])
else:
msg = server.error
server.close()
return {'ok': is_connect, 'msg': msg}
def get_path_info(path):
path, filename = os.path.split(path)
name, extension = os.path.splitext(filename)
return (path, filename, name, extension)
def get_path_temp():
return tempfile.mkstemp()[1]
class ImportFacturaLibre(object):
def __init__(self, path):
self._con = None
self._cursor = None
self._is_connect = self._connect(path)
@property
def is_connect(self):
return self._is_connect
def _connect(self, path):
try:
self._con = sqlite3.connect(path)
self._con.row_factory = sqlite3.Row
self._cursor = self._con.cursor()
return True
except Exception as e:
log.error(e)
return False
def close(self):
try:
self._cursor.close()
self._con.close()
except:
pass
return
def import_data(self):
data = {}
tables = (
('receptores', 'Socios'),
)
for source, target in tables:
data[target] = self._get_table(source)
return data
def _get_table(self, table):
return getattr(self, '_{}'.format(table))()
def _receptores(self):
sql = "SELECT * FROM receptores"
self._cursor.execute(sql)
rows = self._cursor.fetchall()
#~ names = [d[0] for d in self._cursor.description]
fields = (
('id', 'id'),
('rfc', 'rfc'),
('nombre', 'nombre'),
('calle', 'calle'),
('noExterior', 'no_exterior'),
('noInterior', 'no_interior'),
('colonia', 'colonia'),
('municipio', 'municipio'),
('estado', 'estado'),
('pais', 'pais'),
('codigoPostal', 'codigo_postal'),
('extranjero', 'es_extranjero'),
('activo', 'es_activo'),
('fechaalta', 'fecha_alta'),
('notas', 'notas'),
('cuentaCliente', 'cuenta_cliente'),
('cuentaProveedor', 'cuenta_proveedor'),
('saldoCliente', 'saldo_cliente'),
('saldoProveedor', 'saldo_proveedor'),
('esCliente', 'es_cliente'),
('esProveedor', 'es_proveedor'),
)
data = []
sql1 = "SELECT correo FROM correos WHERE id_cliente=?"
sql2 = "SELECT telefono FROM telefonos WHERE id_cliente=?"
for row in rows:
new = {t: row[s] for s, t in fields}
new['slug'] = to_slug(new['nombre'])
if new['es_extranjero']:
new['tipo_persona'] = 4
elif new['rfc'] == 'XAXX010101000':
new['tipo_persona'] = 3
elif len(new['rfc']) == 12:
new['tipo_persona'] = 2
self._cursor.execute(sql1, (new['id'],))
tmp = self._cursor.fetchall()
if tmp:
new['correo_facturas'] = ', '.join([r[0] for r in tmp])
self._cursor.execute(sql2, (new['id'],))
tmp = self._cursor.fetchall()
if tmp:
new['telefonos'] = ', '.join([r[0] for r in tmp])
data.append(new)
return data