empresa-libre/source/app/controllers/util.py

3018 lines
91 KiB
Python

#!/usr/bin/env python3
# ~ Empresa Libre
# ~ Copyright (C) 2016-2018 Mauricio Baeza Servin (web@correolibre.net)
# ~
# ~ This program is free software: you can redistribute it and/or modify
# ~ it under the terms of the GNU General Public License as published by
# ~ the Free Software Foundation, either version 3 of the License, or
# ~ (at your option) any later version.
# ~
# ~ This program is distributed in the hope that it will be useful,
# ~ but WITHOUT ANY WARRANTY; without even the implied warranty of
# ~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# ~ GNU General Public License for more details.
# ~
# ~ You should have received a copy of the GNU General Public License
# ~ along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import datetime
import getpass
import hashlib
import io
import json
import locale
import mimetypes
import os
import re
import requests
import sqlite3
import socket
import subprocess
import tempfile
import textwrap
import threading
import time
import unicodedata
import uuid
import zipfile
from io import BytesIO
from math import trunc
from pathlib import Path
from xml.etree import ElementTree as ET
from xml.dom.minidom import parseString
try:
import uno
from com.sun.star.beans import PropertyValue
from com.sun.star.awt import Size
from com.sun.star.view.PaperFormat import LETTER
APP_LIBO = True
except ImportError:
APP_LIBO = False
# ~ import pyqrcode
from dateutil import parser
from lxml import etree
import mako.runtime
from mako.exceptions import TopLevelLookupException
mako.runtime.UNDEFINED = ''
from .helper import CaseInsensitiveDict, NumLet, SendMail, TemplateInvoice, \
SeaFileAPI, PrintTicket
from settings import DEBUG, MV, log, template_lookup, COMPANIES, DB_SAT, \
PATH_XSLT, PATH_XSLTPROC, PATH_OPENSSL, PATH_TEMPLATES, PATH_MEDIA, PRE, \
PATH_XMLSEC, TEMPLATE_CANCEL, DEFAULT_SAT_PRODUCTO, DECIMALES, DIR_FACTURAS
from settings import USAR_TOKEN, API, DECIMALES_TAX
# ~ from .configpac import AUTH
from .utils import get_qr
# ~ v2
import segno
from .pacs.cfdi_cert import SATCertificate
from settings import (
CFDI_VERSIONS,
EXT,
MXN,
PATHS,
PRE_DEFAULT,
)
def _call(args):
return subprocess.check_output(args, shell=True).decode()
def _get_md5(data):
return hashlib.md5(data.encode()).hexdigest()
def save_temp(data, modo='wb'):
path = tempfile.mkstemp()[1]
with open(path, modo) as f:
f.write(data)
return path
def save_file(path, data, modo='wb'):
try:
with open(path, modo) as f:
f.write(data)
return True
except:
return False
def _join(*paths):
return os.path.join(*paths)
def _kill(path):
try:
os.remove(path)
except:
pass
return
def get_pass():
password = getpass.getpass('Introduce la contraseña: ')
pass2 = getpass.getpass('Confirma la contraseña: ')
if password != pass2:
msg = 'Las contraseñas son diferentes'
return False, msg
password = password.strip()
if not password:
msg = 'La contraseña es necesaria'
return False, msg
return True, password
def get_value(arg):
value = input('Introduce el {}: '.format(arg)).strip()
if not value:
msg = 'El {} es requerido'.format(arg)
log.error(msg)
return ''
return value
def _valid_db_companies():
con = sqlite3.connect(COMPANIES)
sql = """
CREATE TABLE IF NOT EXISTS names(
rfc TEXT NOT NULL COLLATE NOCASE UNIQUE,
con TEXT NOT NULL
);
"""
cursor = con.cursor()
cursor.executescript(sql)
cursor.close()
con.close()
return
def _get_args(rfc):
_valid_db_companies()
con = sqlite3.connect(COMPANIES)
cursor = con.cursor()
sql = "SELECT con FROM names WHERE rfc=?"
cursor.execute(sql, (rfc,))
values = cursor.fetchone()
if values is None:
msg = 'No se encontró el RFC'
log.error(msg)
return ''
cursor.close()
con.close()
return values[0]
def get_rfcs():
_valid_db_companies()
con = sqlite3.connect(COMPANIES)
cursor = con.cursor()
sql = "SELECT * FROM names"
cursor.execute(sql)
values = cursor.fetchall()
cursor.close()
con.close()
return values
def get_con(rfc=''):
if not rfc:
rfc = get_value('RFC').upper()
if not rfc:
return {}
args = _get_args(rfc.upper())
if not args:
return {}
return loads(args)
def get_sat_key(table, key):
con = sqlite3.connect(DB_SAT)
cursor = con.cursor()
sql = 'SELECT key, name FROM {} WHERE key=?'.format(table)
cursor.execute(sql, (key,))
data = cursor.fetchone()
cursor.close()
con.close()
if data is None:
return {'ok': False, 'text': 'No se encontró la clave'}
return {'ok': True, 'text': data[1]}
def get_sat_monedas(key):
con = sqlite3.connect(DB_SAT)
con.row_factory = sqlite3.Row
cursor = con.cursor()
filtro = '%{}%'.format(key)
sql = "SELECT * FROM monedas WHERE key LIKE ? OR name LIKE ?"
cursor.execute(sql, [filtro, filtro])
data = cursor.fetchall()
cursor.close()
con.close()
if data is None:
return ()
data = [dict(r) for r in data]
return tuple(data)
def get_sat_unidades(key):
con = sqlite3.connect(DB_SAT)
con.row_factory = sqlite3.Row
cursor = con.cursor()
filtro = '%{}%'.format(key)
sql = "SELECT * FROM unidades WHERE key LIKE ? OR name LIKE ?"
cursor.execute(sql, [filtro, filtro])
data = cursor.fetchall()
cursor.close()
con.close()
if data is None:
return ()
data = [dict(r) for r in data]
return tuple(data)
def get_sat_unidadespeso(key):
con = sqlite3.connect(DB_SAT)
con.row_factory = sqlite3.Row
cursor = con.cursor()
filtro = '%{}%'.format(key)
sql = "SELECT * FROM unidad_peso WHERE key LIKE ? OR name LIKE ?"
cursor.execute(sql, [filtro, filtro])
data = cursor.fetchall()
cursor.close()
con.close()
if data is None:
return ()
data = tuple([dict(r) for r in data])
return data
def get_sat_productos(key):
con = sqlite3.connect(DB_SAT)
con.row_factory = sqlite3.Row
cursor = con.cursor()
filtro = '%{}%'.format(key)
sql = "SELECT * FROM productos WHERE key LIKE ? OR name LIKE ?"
cursor.execute(sql, [filtro, filtro])
data = cursor.fetchall()
cursor.close()
con.close()
if data is None:
return ()
data = [dict(r) for r in data]
return tuple(data)
def now():
n = datetime.datetime.now().replace(microsecond=0)
return n
def today():
return datetime.date.today()
def get_token():
return _get_hash(uuid.uuid4().hex)
def get_mimetype(path):
mt = mimetypes.guess_type(path)[0]
return mt or 'application/octet-stream'
def is_file(path):
return os.path.isfile(path)
def get_stream(path):
return get_file(path), get_size(path)
def get_file(path):
return open(path, 'rb')
def get_files(path, ext='xml'):
docs = []
for folder, _, files in os.walk(path):
pattern = re.compile('\.{}'.format(ext), re.IGNORECASE)
docs += [os.path.join(folder, f) for f in files if pattern.search(f)]
return tuple(docs)
def read_file(path, mode='rb'):
return open(path, mode).read()
def get_size(path):
return os.path.getsize(path)
def get_template(name, data={}):
# ~ print ('NAME', name)
template = template_lookup.get_template(name)
return template.render(**data)
def get_custom_styles(name, default='plantilla_factura.json'):
path = _join(PATH_MEDIA, 'templates', name.lower())
if is_file(path):
with open(path) as fh:
return loads(fh.read())
path = _join(PATH_TEMPLATES, default)
if is_file(path):
with open(path) as fh:
return loads(fh.read())
return {}
def get_template_ods(name, default='plantilla_factura.ods'):
path = _join(PATH_MEDIA, 'templates', name.lower())
if is_file(path):
return path
if 'pagos' in name:
default='plantilla_pagos.ods'
path = _join(PATH_TEMPLATES, default)
if is_file(path):
return path
return ''
def dumps(data):
return json.dumps(data, default=str)
def loads(data):
return json.loads(data)
def import_json(path):
return loads(read_file(path, 'r'))
def clean(values):
for k, v in values.items():
if isinstance(v, str):
values[k] = v.strip()
return values
def parse_con(values):
data = values.split('|')
try:
con = {'type': data[0]}
if con['type'] == 'sqlite':
con['name'] = data[1]
else:
if data[1]:
con['host'] = data[1]
if data[2]:
con['port'] = data[2]
con['name'] = data[3]
con['user'] = data[4]
con['password'] = data[5]
return con
except IndexError:
return {}
def spaces(value):
return '\n'.join([' '.join(l.split()) for l in value.split('\n')])
def to_slug(string):
value = (unicodedata.normalize('NFKD', string)
.encode('ascii', 'ignore')
.decode('ascii').lower())
return value.replace(' ', '_')
def timbra_xml(xml, auth):
from .pac import Finkok as PAC
if not DEBUG and not auth:
msg = 'Sin datos para timbrar'
result = {'ok': False, 'error': msg}
return result
result = {'ok': True, 'error': ''}
pac = PAC(auth)
new_xml = pac.timbra_xml(xml)
if not new_xml:
result['ok'] = False
result['error'] = pac.error
if pac.error.startswith('413'):
return _ecodex_timbra_xml(xml)
else:
return result
result['xml'] = new_xml
result['uuid'] = pac.uuid
result['fecha'] = pac.fecha
return result
def _get_uuid_fecha(xml):
doc = parse_xml(xml)
version = doc.attrib['Version']
node = doc.find('{}Complemento/{}TimbreFiscalDigital'.format(
PRE[version], PRE['TIMBRE']))
return node.attrib['UUID'], node.attrib['FechaTimbrado']
# ~ def get_sat(xml):
# ~ from .pac import get_status_sat
# ~ return get_status_sat(xml)
class LIBO(object):
HOST = 'localhost'
PORT = '8100'
ARG = 'socket,host={},port={};urp;StarOffice.ComponentContext'.format(
HOST, PORT)
CMD = ['soffice',
'-env:SingleAppInstance=false',
'-env:UserInstallation=file:///tmp/LIBO_Process8100',
'--headless', '--norestore', '--nologo', '--accept={}'.format(ARG)]
CELL_STYLE = {
'EUR': 'euro',
}
def __init__(self):
self._app = None
self._start_office()
self._init_values()
def _init_values(self):
self._es_pre = False
self._ctx = None
self._sm = None
self._desktop = None
self._currency = MXN
self._total_cantidades = 0
if self.is_running:
ctx = uno.getComponentContext()
service = 'com.sun.star.bridge.UnoUrlResolver'
resolver = ctx.ServiceManager.createInstanceWithContext(service, ctx)
self._ctx = resolver.resolve('uno:{}'.format(self.ARG))
self._sm = self._ctx.ServiceManager
self._desktop = self._create_instance('com.sun.star.frame.Desktop')
return
def _create_instance(self, name, with_context=True):
if with_context:
instance = self._sm.createInstanceWithContext(name, self._ctx)
else:
instance = self._sm.createInstance(name)
return instance
@property
def is_running(self):
try:
s = socket.create_connection((self.HOST, self.PORT), 5.0)
s.close()
return True
except ConnectionRefusedError:
return False
def _start_office(self):
if self.is_running:
return
for i in range(3):
self.app = subprocess.Popen(self.CMD,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
time.sleep(5)
if self.is_running:
break
return
def _set_properties(self, properties):
pl = []
for k, v in properties.items():
pv = PropertyValue()
pv.Name = k
pv.Value = v
pl.append(pv)
return tuple(pl)
def _doc_open(self, path, options):
options = self._set_properties(options)
path = self._path_url(path)
try:
doc = self._desktop.loadComponentFromURL(path, '_blank', 0, options)
return doc
except:
return None
def _path_url(self, path):
if path.startswith('file://'):
return path
return uno.systemPathToFileUrl(path)
def close(self):
if self.is_running:
if not self._desktop is None:
self._desktop.terminate()
if not self._app is None:
self._app.terminate()
return
def _read(self, path):
try:
return open(path, 'rb').read()
except:
return b''
def _clean(self):
self._sd.SearchRegularExpression = True
self._sd.setSearchString("\{(\w.+)\}")
self._search.replaceAll(self._sd)
return
def _cancelado(self, cancel):
if not cancel:
pd = self._sheet.getDrawPage()
if pd.getCount():
pd.remove(pd.getByIndex(0))
return
def _set_search(self):
self._sheet = self._template.getSheets().getByIndex(0)
try:
self._search = self._sheet.getPrintAreas()[0]
except IndexError:
self._search = self._sheet.getRangeAddress()
self._search = self._sheet.getCellRangeByPosition(
self._search.StartColumn,
self._search.StartRow,
self._search.EndColumn,
self._search.EndRow
)
self._sd = self._sheet.createSearchDescriptor()
try:
self._sd.SearchCaseSensitive = False
except:
print ('SD', self._sd)
return
def _next_cell(self, cell):
col = cell.getCellAddress().Column
row = cell.getCellAddress().Row + 1
return self._sheet.getCellByPosition(col, row)
def _copy_cell(self, cell):
destino = self._next_cell(cell)
self._sheet.copyRange(destino.getCellAddress(), cell.getRangeAddress())
return destino
def _set_cell(self, k='', v=None, cell=None, value=False):
if k:
self._sd.setSearchString(k)
ranges = self._search.findAll(self._sd)
if ranges:
ranges = ranges.getRangeAddressesAsString().split(';')
for r in ranges:
for c in r.split(','):
cell = self._sheet.getCellRangeByName(c)
if v is None:
return cell
if cell.getImplementationName() == 'ScCellObj':
pattern = re.compile(k, re.IGNORECASE)
nv = pattern.sub(v, cell.getString())
if value:
cell.setValue(nv)
else:
cell.setString(nv)
return cell
if cell:
if cell.getImplementationName() == 'ScCellObj':
ca = cell.getCellAddress()
new_cell = self._sheet.getCellByPosition(ca.Column, ca.Row + 1)
if value:
new_cell.setValue(v)
else:
new_cell.setString(v)
return new_cell
def _comprobante(self, data):
for k, v in data.items():
if k.lower() in ('total', 'descuento', 'subtotal', 'totalgravado', 'totalexento'):
self._set_cell('{cfdi.%s}' % k, v, value=True)
else:
self._set_cell('{cfdi.%s}' % k, v)
return
def _informacion_global(self, data):
for k, v in data.items():
print(k, v)
self._set_cell('{cfdi.%s}' % k, v)
return
def _emisor(self, data):
for k, v in data.items():
self._set_cell('{emisor.%s}' % k, v)
return
def _receptor(self, data):
for k, v in data.items():
if k.lower() in ('salariobasecotapor', 'salariodiariointegrado'):
self._set_cell('{receptor.%s}' % k, v, value=True)
else:
self._set_cell('{receptor.%s}' % k, v)
return
def _copy_row(self, cell):
row = cell.getCellAddress().Row
source = self._sheet.getRows().getByIndex(row)
nc = self._next_cell(cell)
self._sheet.copyRange(nc.getCellAddress(), source.getRangeAddress())
return
def _clean_rows(self, row, count):
for i in range(count):
source = self._sheet.getRows().getByIndex(row + i)
source.clearContents(5)
return
def _copy_paste_rows(self, cell, count):
dispatch = self._create_instance('com.sun.star.frame.DispatchHelper')
row = cell.getCellAddress().Row
source = self._sheet.getRows().getByIndex(row)
self._template.getCurrentController().select(source)
frame = self._template.getCurrentController().getFrame()
dispatch.executeDispatch(frame, '.uno:Copy', '', 0, ())
target = self._sheet.getCellRangeByPosition(0, row + 1, 0, row + count)
self._template.getCurrentController().select(target)
dispatch.executeDispatch(frame, '.uno:Paste', '', 0, ())
return
def _get_style(self, cell):
if cell is None:
return ''
match = re.match(r"([a-z]+)([0-9]+)", cell.CellStyle, re.I)
if not match:
return ''
currency = self.CELL_STYLE.get(self._currency, 'peso')
return '{}{}'.format(currency, match.groups()[1])
def _conceptos(self, data, pakings):
first = True
col1 = []
col2 = []
col3 = []
col4 = []
col5 = []
col6 = []
col7 = []
col8 = []
count = len(data) - 1
for i, concepto in enumerate(data):
key = concepto.get('noidentificacion', '')
description = concepto['descripcion']
unidad = concepto['unidad']
cantidad = concepto['cantidad']
valor_unitario = concepto['valorunitario']
importe = concepto['importe']
descuento = concepto.get('descuento', '0.0')
if first:
first = False
cell_1 = self._set_cell('{noidentificacion}', key)
cell_2 = self._set_cell('{descripcion}', description)
cell_3 = self._set_cell('{unidad}', unidad)
cell_4 = self._set_cell('{cantidad}', cantidad, value=True)
cell_5 = self._set_cell('{valorunitario}', valor_unitario, value=True)
cell_6 = self._set_cell('{importe}', importe, value=True)
cell_7 = self._set_cell('{descuento}', descuento, value=True)
if pakings:
cell_8 = self._set_cell('{empaque}', pakings[i], value=True)
if len(data) > 1:
row = cell_1.getCellAddress().Row + 1
self._sheet.getRows().insertByIndex(row, count)
self._copy_paste_rows(cell_1, count)
row = cell_1.getCellAddress().Row
else:
col1.append((key,))
col2.append((description,))
col3.append((unidad,))
col4.append((float(cantidad),))
col5.append((float(valor_unitario),))
col6.append((float(importe),))
col7.append((float(descuento),))
if pakings:
col8.append((pakings[i],))
self._total_cantidades += float(cantidad)
if not count:
if not cell_5 is None:
cell_5.CellStyle = self._get_style(cell_5)
if not cell_6 is None:
cell_6.CellStyle = self._get_style(cell_6)
return
style_5 = self._get_style(cell_5)
style_6 = self._get_style(cell_6)
style_7 = self._get_style(cell_7)
style_8 = ''
if pakings:
style_8 = self._get_style(cell_8)
col = cell_1.getCellAddress().Column
target1 = self._sheet.getCellRangeByPosition(col, row+1, col, row+count)
col = cell_2.getCellAddress().Column
target2 = self._sheet.getCellRangeByPosition(col, row+1, col, row+count)
col = cell_3.getCellAddress().Column
target3 = self._sheet.getCellRangeByPosition(col, row+1, col, row+count)
col = cell_4.getCellAddress().Column
target4 = self._sheet.getCellRangeByPosition(col, row+1, col, row+count)
col = cell_5.getCellAddress().Column
target5 = self._sheet.getCellRangeByPosition(col, row+1, col, row+count)
col = cell_6.getCellAddress().Column
target6 = self._sheet.getCellRangeByPosition(col, row+1, col, row+count)
target7 = None
target8 = None
if not cell_7 is None:
col = cell_7.getCellAddress().Column
target7 = self._sheet.getCellRangeByPosition(col, row+1, col, row+count)
if pakings and cell_8:
col = cell_8.getCellAddress().Column
target8 = self._sheet.getCellRangeByPosition(col, row+1, col, row+count)
target1.setFormulaArray(tuple(col1))
target2.setDataArray(tuple(col2))
target3.setFormulaArray(tuple(col3))
target4.setDataArray(tuple(col4))
target5.setDataArray(tuple(col5))
target6.setDataArray(tuple(col6))
if not target7 is None:
target7.setDataArray(tuple(col7))
if not target8 is None:
target8.setDataArray(tuple(col8))
if style_5:
cell_5.CellStyle = style_5
target5.CellStyle = style_5
if style_6:
cell_6.CellStyle = style_6
target6.CellStyle = style_6
if style_7:
cell_7.CellStyle = style_7
target7.CellStyle = style_7
if style_8:
cell_8.CellStyle = style_8
target8.CellStyle = style_8
return
def _add_totales(self, data):
currency = data['moneda']
value = data['total']
cell_value = self._set_cell('{total}', value, value=True)
if cell_value is None:
return False
cell_value.CellStyle = currency
return True
def _totales(self, data):
cell_styles = {
'EUR': 'euro',
}
currency = data['moneda']
self._set_cell('{total_cantidades}', str(self._total_cantidades))
if self._pagos:
return
cell_title = self._set_cell('{subtotal.titulo}', 'SubTotal')
value = data['subtotal']
cell_value = self._set_cell('{subtotal}', value, value=True)
if not cell_value is None:
cell_value.CellStyle = cell_styles.get(currency, 'peso')
#~ Si encuentra el campo {total}, se asume que los totales e impuestos
#~ están declarados de forma independiente cada uno
if self._add_totales(data):
return
#~ Si no se encuentra, copia las celdas hacia abajo de
#~ {subtotal.titulo} y {subtotal}
#~ print (data['descuento'])
if 'descuento' in data:
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v='Descuento', cell=cell_title)
value = data['descuento']
cell_value = self._set_cell(v=value, cell=cell_value, value=True)
cell_value.CellStyle = currency
for tax in data['traslados']:
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v=tax[0], cell=cell_title)
cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True)
cell_value.CellStyle = currency
for tax in data['retenciones']:
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v=tax[0], cell=cell_title)
cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True)
cell_value.CellStyle = currency
for tax in data['taxlocales']:
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v=tax[0], cell=cell_title)
cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True)
cell_value.CellStyle = currency
self._copy_cell(cell_title)
self._copy_cell(cell_value)
cell_title = self._set_cell(v='Total', cell=cell_title)
value = data['total']
cell_value = self._set_cell(v=value, cell=cell_value, value=True)
cell_value.CellStyle = currency
return
def _timbre(self, data):
if self._es_pre or self._is_ticket:
return
qr = data.pop('cbb')
for k, v in data.items():
self._set_cell('{timbre.%s}' % k, v)
pd = self._sheet.getDrawPage()
image = self._template.createInstance('com.sun.star.drawing.GraphicObjectShape')
gp = self._create_instance('com.sun.star.graphic.GraphicProvider')
pd.add(image)
instance = 'com.sun.star.io.SequenceInputStream'
stream = self._create_instance(instance)
stream.initialize((uno.ByteSequence(qr.getvalue()),))
properties = self._set_properties({'InputStream': stream})
image.Graphic = gp.queryGraphic(properties)
s = Size()
s.Width = 4000
s.Height = 4000
image.setSize(s)
image.Anchor = self._set_cell('{timbre.cbb}')
return
def _donataria(self, data):
if not data:
return
for k, v in data.items():
self._set_cell('{donataria.%s}' % k, v)
return
def _ine(self, data):
if not data:
return
for k, v in data.items():
self._set_cell('{ine.%s}' % k, v)
return
def _divisas(self, data):
if data:
for k, v in data.items():
self._set_cell(f'{{divisas.{k}}}', v)
return
def _leyendas(self, data):
if not data:
return
first = True
for row in data:
leyenda = row['textoLeyenda']
norma = row.get('norma', '')
disposicion = row.get('disposicionFiscal', '')
if first:
first = False
cell1 = self._set_cell('{textoLeyenda}', leyenda)
cell2 = self._set_cell('{norma}', norma)
cell3 = self._set_cell('{disposicionFiscal}', disposicion)
else:
row = cell1.CellAddress.Row + 1
self._sheet.getRows().insertByIndex(row, 1)
cell1 = self._set_cell(v=leyenda, cell=cell1)
cell2 = self._set_cell(v=norma, cell=cell2)
cell3 = self._set_cell(v=disposicion, cell=cell3)
return
def _carta_porte(self, data):
if not data:
return
# ~ print(data)
figuras = data.pop('figuras')
mercancias = data.pop('mercancias')
detalle = mercancias.pop('detalle')
mercancias = mercancias.pop('mercancias')
autotransporte = data.pop('autotransporte')
ubicaciones = data.pop('ubicaciones')
for k, v in data.items():
self._set_cell(f'{{cp.{k}}}', v)
for k, v in figuras.items():
self._set_cell(f'{{cp.{k}}}', v)
for k, v in autotransporte.items():
self._set_cell(f'{{cp.{k}}}', v)
for k, v in mercancias.items():
self._set_cell(f'{{cp.{k}}}', v)
first = True
count = len(ubicaciones) - 1
for i, ubicacion in enumerate(ubicaciones):
tipo = ubicacion['TipoUbicacion']
nombre = ubicacion['NombreRemitenteDestinatario']
rfc = ubicacion['RFCRemitenteDestinatario']
nombre_rfc = f"{nombre} ({rfc})"
fecha = ubicacion['FechaHoraSalidaLlegada']
domicilio = ubicacion['domicilio']
if first:
first = False
cell_1 = self._set_cell('{cp.TipoUbicacion}', tipo)
cell_2 = self._set_cell('{cp.NombreRemitenteDestinatario}', nombre)
cell_3 = self._set_cell('{cp.RFCRemitenteDestinatario}', rfc)
cell_4 = self._set_cell('{cp.FechaHoraSalidaLlegada}', fecha)
cell_5 = self._set_cell('{cp.Domicilio}', domicilio)
row = cell_1.CellAddress.Row + 1
self._sheet.getRows().insertByIndex(row, count)
self._copy_paste_rows(cell_1, count)
else:
cell_1 = self._set_cell(v=tipo, cell=cell_1)
cell_2 = self._set_cell(v=nombre, cell=cell_2)
cell_3 = self._set_cell(v=rfc, cell=cell_3)
cell_4 = self._set_cell(v=fecha, cell=cell_4)
cell_5 = self._set_cell(v=domicilio, cell=cell_5)
first = True
count = len(detalle) - 1
for i, mercancia in enumerate(detalle):
clave = mercancia['BienesTransp']
descripcion = mercancia['Descripcion']
unidad = mercancia['ClaveUnidad']
cantidad = mercancia['Cantidad']
peso = mercancia['PesoEnKg']
if first:
first = False
cell_1 = self._set_cell('{cp.BienesTransp}', clave)
cell_2 = self._set_cell('{cp.Descripcion}', descripcion)
cell_3 = self._set_cell('{cp.ClaveUnidad}', unidad)
cell_4 = self._set_cell('{cp.Cantidad}', cantidad)
cell_5 = self._set_cell('{cp.PesoEnKg}', peso)
if count > 0:
row = cell_1.CellAddress.Row + 1
self._sheet.getRows().insertByIndex(row, count)
self._copy_paste_rows(cell_1, count)
else:
cell_1 = self._set_cell(v=clave, cell=cell_1)
cell_2 = self._set_cell(v=descripcion, cell=cell_2)
cell_3 = self._set_cell(v=unidad, cell=cell_3)
cell_4 = self._set_cell(v=cantidad, cell=cell_4)
cell_5 = self._set_cell(v=peso, cell=cell_5)
return
def _nomina(self, data):
if not data:
return
percepciones = data.pop('percepciones', [])
deducciones = data.pop('deducciones', [])
otrospagos = data.pop('otrospagos', [])
incapacidades = data.pop('incapacidades', [])
for k, v in data.items():
if k.lower() in ('totalpercepciones', 'totaldeducciones',
'totalotrospagos', 'subsidiocausado'):
self._set_cell('{nomina.%s}' % k, v, value=True)
else:
self._set_cell('{nomina.%s}' % k, v)
count = len(percepciones)
if len(deducciones) > count:
count = len(deducciones)
count -= 1
first = True
separacion = {}
for r in percepciones:
if 'TotalPagado' in r:
separacion = r
continue
tipo = r.get('TipoPercepcion')
concepto = r.get('Concepto')
gravado = r.get('ImporteGravado')
exento = r.get('ImporteExento')
if first:
first = False
cell_1 = self._set_cell('{percepcion.TipoPercepcion}', tipo)
cell_2 = self._set_cell('{percepcion.Concepto}', concepto)
cell_3 = self._set_cell('{percepcion.ImporteGravado}', gravado, value=True)
cell_4 = self._set_cell('{percepcion.ImporteExento}', exento, value=True)
if count:
row = cell_1.getCellAddress().Row + 1
self._sheet.getRows().insertByIndex(row, count)
self._copy_paste_rows(cell_1, count)
self._clean_rows(row, count)
else:
cell_1 = self._set_cell(v=tipo, cell=cell_1)
cell_2 = self._set_cell(v=concepto, cell=cell_2)
cell_3 = self._set_cell(v=gravado, cell=cell_3, value=True)
cell_4 = self._set_cell(v=exento, cell=cell_4, value=True)
first = True
for r in deducciones:
tipo = r.get('TipoDeduccion')
concepto = r.get('Concepto')
importe = r.get('Importe')
if first:
first = False
cell_1 = self._set_cell('{deduccion.TipoDeduccion}', tipo)
cell_2 = self._set_cell('{deduccion.Concepto}', concepto)
cell_3 = self._set_cell('{deduccion.Importe}', importe, value=True)
else:
cell_1 = self._set_cell(v=tipo, cell=cell_1)
cell_2 = self._set_cell(v=concepto, cell=cell_2)
cell_3 = self._set_cell(v=importe, cell=cell_3, value=True)
count = len(otrospagos) - 1
first = True
for r in otrospagos:
tipo = r.get('TipoOtroPago')
concepto = r.get('Concepto')
importe = r.get('Importe')
if first:
first = False
cell_1 = self._set_cell('{otropago.TipoOtroPago}', tipo)
cell_2 = self._set_cell('{otropago.Concepto}', concepto)
cell_3 = self._set_cell('{otropago.Importe}', importe, value=True)
if count:
row = cell_1.getCellAddress().Row + 1
self._sheet.getRows().insertByIndex(row, count)
self._copy_paste_rows(cell_1, count)
self._clean_rows(row, count)
else:
cell_1 = self._set_cell(v=tipo, cell=cell_1)
cell_2 = self._set_cell(v=concepto, cell=cell_2)
cell_3 = self._set_cell(v=importe, cell=cell_3, value=True)
count = len(incapacidades) - 1
first = True
for r in incapacidades:
tipo = r.get('TipoIncapacidad')
days = r.get('DiasIncapacidad')
importe = r.get('ImporteMonetario')
if first:
first = False
cell_1 = self._set_cell('{incapacidad.TipoIncapacidad}', tipo)
cell_2 = self._set_cell('{incapacidad.DiasIncapacidad}', days)
cell_3 = self._set_cell('{incapacidad.ImporteMonetario}', importe, value=True)
# ~ if count:
# ~ row = cell_1.getCellAddress().Row + 1
# ~ self._sheet.getRows().insertByIndex(row, count)
# ~ self._copy_paste_rows(cell_1, count)
# ~ self._clean_rows(row, count)
# ~ else:
# ~ cell_1 = self._set_cell(v=tipo, cell=cell_1)
# ~ cell_2 = self._set_cell(v=concepto, cell=cell_2)
# ~ cell_3 = self._set_cell(v=importe, cell=cell_3, value=True)
return
def _cfdipays(self, data):
VERSION2 = '2.0'
version = data['Version']
related = data.pop('related', [])
for k, v in data.items():
if k.lower() in ('monto',):
self._set_cell('{pago.%s}' % k, v, value=True)
else:
self._set_cell('{pago.%s}' % k, v)
col1 = []
col2 = []
col3 = []
col4 = []
col5 = []
col6 = []
col7 = []
col8 = []
col9 = []
count = len(related)
for i, doc in enumerate(related):
uuid = doc['IdDocumento'].upper()
serie = doc.get('Serie', '')
folio = doc['Folio']
metodo_pago = doc['MetodoDePagoDR']
moneda = doc['MonedaDR']
parcialidad = doc['NumParcialidad']
saldo_anterior = doc['ImpSaldoAnt']
importe_pagado = doc['ImpPagado']
saldo_insoluto = doc['ImpSaldoInsoluto']
if i == 0:
cell_1 = self._set_cell('{doc.uuid}', uuid)
cell_2 = self._set_cell('{doc.serie}', serie)
cell_3 = self._set_cell('{doc.folio}', folio)
if version != VERSION2:
cell_4 = self._set_cell('{doc.metodopago}', metodo_pago)
cell_5 = self._set_cell('{doc.moneda}', moneda)
cell_6 = self._set_cell('{doc.parcialidad}', parcialidad)
cell_7 = self._set_cell('{doc.saldoanterior}', saldo_anterior, value=True)
cell_8 = self._set_cell('{doc.importepagado}', importe_pagado, value=True)
cell_9 = self._set_cell('{doc.saldoinsoluto}', saldo_insoluto, value=True)
else:
col1.append((uuid,))
col2.append((serie,))
col3.append((folio,))
if version != VERSION2:
col4.append((metodo_pago,))
col5.append((moneda,))
col6.append((parcialidad,))
col7.append((float(saldo_anterior),))
col8.append((float(importe_pagado),))
col9.append((float(saldo_insoluto),))
if count == 1:
return
count -= 1
row1 = cell_1.getCellAddress().Row + 1
row2 = row1 + count - 1
self._sheet.getRows().insertByIndex(row1, count)
self._copy_paste_rows(cell_1, count)
# ~ style_7 = self._get_style(cell_7)
# ~ style_8 = self._get_style(cell_8)
# ~ style_9 = self._get_style(cell_9)
col = cell_1.getCellAddress().Column
target1 = self._sheet.getCellRangeByPosition(col, row1, col, row2)
col = cell_2.getCellAddress().Column
target2 = self._sheet.getCellRangeByPosition(col, row1, col, row2)
col = cell_3.getCellAddress().Column
target3 = self._sheet.getCellRangeByPosition(col, row1, col, row2)
if version != VERSION2:
col = cell_4.getCellAddress().Column
target4 = self._sheet.getCellRangeByPosition(col, row1, col, row2)
col = cell_5.getCellAddress().Column
target5 = self._sheet.getCellRangeByPosition(col, row1, col, row2)
col = cell_6.getCellAddress().Column
target6 = self._sheet.getCellRangeByPosition(col, row1, col, row2)
col = cell_7.getCellAddress().Column
target7 = self._sheet.getCellRangeByPosition(col, row1, col, row2)
col = cell_8.getCellAddress().Column
target8 = self._sheet.getCellRangeByPosition(col, row1, col, row2)
col = cell_9.getCellAddress().Column
target9 = self._sheet.getCellRangeByPosition(col, row1, col, row2)
target1.setFormulaArray(tuple(col1))
target2.setDataArray(tuple(col2))
target3.setFormulaArray(tuple(col3))
if version != VERSION2:
target4.setDataArray(tuple(col4))
target5.setDataArray(tuple(col5))
target6.setDataArray(tuple(col6))
target7.setDataArray(tuple(col7))
target8.setDataArray(tuple(col8))
target9.setDataArray(tuple(col9))
return
def _render(self, data):
self._set_search()
self._es_pre = data.pop('es_pre', False)
self._is_ticket = data.pop('is_ticket', False)
self._currency = data['totales']['moneda']
self._pagos = data.pop('pagos', False)
pakings = data.pop('pakings', [])
self._comprobante(data['comprobante'])
self._informacion_global(data.get('informacion_global', {}))
self._emisor(data['emisor'])
self._receptor(data['receptor'])
self._conceptos(data['conceptos'], pakings)
if self._pagos:
self._cfdipays(data['pays'])
if 'nomina' in data and data['nomina']:
self._nomina(data['nomina'])
else:
self._totales(data['totales'])
self._donataria(data['donataria'])
self._ine(data['ine'])
self._divisas(data.get('divisas', {}))
self._leyendas(data.get('leyendas', ''))
self._carta_porte(data.get('carta_porte', {}))
self._timbre(data['timbre'])
self._cancelado(data['cancelada'])
self._others_values(data)
self._clean()
return
def _others_values(self, data):
el_version = data.get('el.version', '')
if el_version:
self._set_cell('{el.version}', el_version)
return
def pdf(self, path, data, ods=False):
options = {'AsTemplate': True, 'Hidden': True}
log.debug('Abrir plantilla...')
self._template = self._doc_open(path, options)
if self._template is None:
return b''
self._template.setPrinter(self._set_properties({'PaperFormat': LETTER}))
self._render(data)
path_ods = get_path_temp('.ods')
self._template.storeToURL(self._path_url(path_ods), ())
if ods:
data = self._read(path_ods)
_kill(path_ods)
return data
options = {'FilterName': 'calc_pdf_Export'}
path_pdf = get_path_temp('.pdf')
self._template.storeToURL(self._path_url(path_pdf), self._set_properties(options))
try:
self._template.close(True)
except:
pass
data = self._read(path_pdf)
_kill(path_ods)
_kill(path_pdf)
return data
def _get_data(self, doc, name=0):
try:
sheet = doc.getSheets()[name]
cursor = sheet.createCursorByRange(sheet['A1'])
cursor.collapseToCurrentRegion()
except KeyError:
msg = 'Hoja no existe'
return (), msg
return cursor.getDataArray(), ''
def products(self, path):
options = {'AsTemplate': True, 'Hidden': True}
doc = self._doc_open(path, options)
if doc is None:
return (), 'No se pudo abrir la plantilla'
data, msg = self._get_data(doc)
doc.close(True)
if len(data) == 1:
msg = 'Sin datos para importar'
return (), msg
fields = (
'categoria',
'clave',
'clave_sat',
'descripcion',
'unidad',
'valor_unitario',
'inventario',
'existencia',
'codigo_barras',
'impuestos',
)
rows = [dict(zip(fields, r)) for r in data[1:]]
return rows, ''
def employees(self, path):
options = {'AsTemplate': True, 'Hidden': True}
doc = self._doc_open(path, options)
if doc is None:
return ()
data, msg = self._get_data(doc, 'Empleados')
doc.close(True)
if len(data) == 1:
msg = 'Sin datos para importar'
return (), msg
fields = (
'num_empleado',
'rfc',
'curp',
'nombre',
'paterno',
'materno',
'fecha_ingreso',
'imss',
'tipo_contrato',
'es_sindicalizado',
'tipo_jornada',
'tipo_regimen',
'departamento',
'puesto',
'riesgo_puesto',
'periodicidad_pago',
'banco',
'cuenta_bancaria',
'clabe',
'salario_base',
'salario_diario',
'estado',
'codigo_postal',
'notas',
'correo',
'regimen_fiscal',
)
rows = tuple([dict(zip(fields, r)) for r in data[1:]])
msg = 'Empleados importados correctamente'
return rows, msg
def _get_nomina(self, doc):
rows, msg = self._get_data(doc, 'Nomina')
if len(rows) == 2:
msg = 'Sin datos para importar'
return {}, msg
fields = (
'rfc',
'tipo_nomina',
'fecha_pago',
'fecha_inicial_pago',
'fecha_final_pago',
'relacionados',
'dias_pagados',
)
data = tuple([dict(zip(fields, r[1:])) for r in rows[2:]])
return data, ''
def _get_percepciones(self, doc, count):
rows, msg = self._get_data(doc, 'Percepciones')
if len(rows) == 2:
msg = 'Sin Percepciones'
return {}, msg
if len(rows[0][2:]) % 2:
msg = 'Las Percepciones deben ir en pares: Gravado y Exento'
return {}, msg
data = tuple([r[2:] for r in rows[:count+2]])
return data, ''
def _get_deducciones(self, doc, count):
rows, msg = self._get_data(doc, 'Deducciones')
if len(rows) == 2:
msg = 'Sin Deducciones'
return {}, msg
data = tuple([r[2:] for r in rows[:count+2]])
sheet = doc.Sheets['Deducciones']
notes = sheet.getAnnotations()
new_titles = {}
for n in notes:
col = n.getPosition().Column - 2
if data[0][col] == '004':
new_titles[col] = n.getString()
return data, new_titles, ''
def _get_otros_pagos(self, doc, count):
rows, msg = self._get_data(doc, 'OtrosPagos')
if len(rows) == 2:
msg = 'Sin Otros Pagos'
return {}, msg
data = tuple([r[2:] for r in rows[:count+2]])
return data, ''
def _get_separacion(self, doc, count):
rows, msg = self._get_data(doc, 'Separacion')
if len(rows) == 2:
msg = 'Sin Separacion'
return {}, msg
data = tuple([r[1:] for r in rows[:count+2]])
return data, ''
def _get_horas_extras(self, doc, count):
rows, msg = self._get_data(doc, 'HorasExtras')
if len(rows) == 2:
msg = 'Sin Horas Extras'
return {}, msg
if len(rows[1][1:]) % 4:
msg = 'Las Horas Extras deben ir grupos de 4 columnas'
return {}, msg
data = tuple([r[1:] for r in rows[:count+2]])
return data, ''
def _get_incapacidades(self, doc, count):
rows, msg = self._get_data(doc, 'Incapacidades')
if len(rows) == 2:
msg = 'Sin Incapacidades'
return {}, msg
if len(rows[1][1:]) % 3:
msg = 'Las Incapacidades deben ir grupos de 3 columnas'
return {}, msg
data = tuple([r[1:] for r in rows[:count+2]])
return data, ''
def nomina(self, path):
options = {'AsTemplate': True, 'Hidden': True}
doc = self._doc_open(path, options)
if doc is None:
msg = 'No se pudo abrir la plantilla'
return {}, msg
data = {}
nomina, msg = self._get_nomina(doc)
if msg:
doc.close(True)
return {}, msg
percepciones, msg = self._get_percepciones(doc, len(nomina))
if msg:
doc.close(True)
return {}, msg
deducciones, new_titles, msg = self._get_deducciones(doc, len(nomina))
if msg:
doc.close(True)
return {}, msg
otros_pagos, msg = self._get_otros_pagos(doc, len(nomina))
if msg:
doc.close(True)
return {}, msg
separacion, msg = self._get_separacion(doc, len(nomina))
if msg:
doc.close(True)
return {}, msg
horas_extras, msg = self._get_horas_extras(doc, len(nomina))
if msg:
doc.close(True)
return {}, msg
incapacidades, msg = self._get_incapacidades(doc, len(nomina))
if msg:
doc.close(True)
return {}, msg
doc.close(True)
rows = len(nomina) + 2
if rows != len(percepciones):
msg = 'Cantidad de filas incorrecta en: Percepciones'
return {}, msg
if rows != len(deducciones):
msg = 'Cantidad de filas incorrecta en: Deducciones'
return {}, msg
if rows != len(otros_pagos):
msg = 'Cantidad de filas incorrecta en: Otros Pagos'
return {}, msg
if rows != len(separacion):
msg = 'Cantidad de filas incorrecta en: Separación'
return {}, msg
if rows != len(horas_extras):
msg = 'Cantidad de filas incorrecta en: Horas Extras'
return {}, msg
if rows != len(incapacidades):
msg = 'Cantidad de filas incorrecta en: Incapacidades'
return {}, msg
data['nomina'] = nomina
data['percepciones'] = percepciones
data['deducciones'] = deducciones
data['otros_pagos'] = otros_pagos
data['separacion'] = separacion
data['horas_extras'] = horas_extras
data['incapacidades'] = incapacidades
data['new_titles'] = new_titles
return data, ''
def invoice(self, path):
options = {'AsTemplate': True, 'Hidden': True}
doc = self._doc_open(path, options)
if doc is None:
return (), 'No se pudo abrir la plantilla'
data, msg = self._get_data(doc)
doc.close(True)
if len(data) == 1:
msg = 'Sin datos para importar'
return (), msg
rows = tuple(data[1:])
return rows, ''
def to_pdf(data, emisor_rfc, ods=False, pdf_from='1'):
rfc = data['emisor']['rfc']
if DEBUG:
rfc = emisor_rfc
version = data['comprobante']['version']
default = f'plantilla_factura_{version}.ods'
if pdf_from == '2':
return to_pdf_from_json(rfc, version, data)
if 'nomina' in data and data['nomina']:
version_nomina = data['nomina']['version']
default = f'plantilla_nomina_{version}_{version_nomina}.ods'
version = f'{version}_cn_{version_nomina}'
if 'carta_porte' in data:
default = 'plantilla_factura_ccp.ods'
version = '{}_ccp_{}'.format(version, data['carta_porte']['version'])
if data.get('pagos', False):
version_pagos = data['pays']['version']
default = f'plantilla_pagos_{version}_{version_pagos}.ods'
version = f'{version}_cp_{version_pagos}'
if data['donativo']:
version_donatarias = data['donataria']['version']
default = f'plantilla_donatarias_{version}_{version_donatarias}.ods'
version = f'{version}_cd_{version_donatarias}'
template_name = f'{rfc.lower()}_{version}.ods'
# ~ print('T', template_name, default)
if APP_LIBO:
app = LIBO()
if app.is_running:
path = get_template_ods(template_name, default)
if path:
return app.pdf(path, data, ods)
return to_pdf_from_json(rfc, version, data)
def to_pdf_from_json(rfc, version, data):
rfc = rfc.lower()
name = '{}_{}.json'.format(rfc, version)
print('name', name)
custom_styles = get_custom_styles(name)
path_logo = _join(PATHS['LOGOS'], f"{rfc}.png")
if exists(path_logo):
data['emisor']['logo'] = path_logo
path_logo = _join(PATHS['LOGOS'], f"{rfc}_2.png")
if exists(path_logo):
data['emisor']['logo2'] = path_logo
buffer = io.BytesIO()
pdf = TemplateInvoice(buffer)
pdf.custom_styles = custom_styles
pdf.data = data
pdf.render()
return buffer.getvalue()
def format_currency(value, currency, digits=2):
c = {
MXN: '$',
'USD': '$',
'EUR': '',
}
s = c.get(currency, MXN)
return f'{s} {float(value):,.{digits}f}'
def to_html(data):
name = f"{data['rfc']}_{data['version']}.html"
try:
template = template_lookup.get_template(name)
except TopLevelLookupException:
template = template_lookup.get_template('plantilla_factura.html')
data['rfc'] = 'invoice'
# ~ data['cfdi_sello'] = textwrap.fill(data['cfdi_sello'], 50)
# ~ data['timbre_sellosat'] = textwrap.fill(data['timbre_sellosat'], 110)
# ~ data['timbre_cadenaoriginal'] = textwrap.fill(data['timbre_cadenaoriginal'], 140)
# ~ data['cfdi_sello'] = 'X'*100 + '<BR>' + 'X'*100 + '<BR>' + 'X'*100
# ~ data['timbre_sellosat'] = 'X'*100 + '<BR>' + 'X'*100 + '<BR>' + 'X'*100
return template.render(**data)
def html_to_pdf(data):
path_pdf = '/home/mau/test.pdf'
css = '/home/mau/projects/empresa-libre/source/static/css/invoice.css'
# ~ font_config = FontConfiguration()
# ~ html = HTML(string=data)
# ~ css = CSS(filename=path_css)
# ~ html.write_pdf(path_pdf, stylesheets=[css], font_config=font_config)
options = {
'page-size': 'Letter',
'margin-top': '0.50in',
'margin-right': '0.50in',
'margin-bottom': '0.50in',
'margin-left': '0.50in',
'encoding': "UTF-8",
}
pdfkit.from_string(data.decode(), path_pdf, options=options, css=css)
return
def import_employees(rfc):
msg = 'No se pudo cargar el archivo'
name = '{}_employees.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'tmp', name)
if not is_file(path):
return (), msg
msg = 'LibreOffice no se pudo iniciar'
if APP_LIBO:
app = LIBO()
if app.is_running:
return app.employees(path)
return (), msg
def import_nomina(rfc):
name = '{}_nomina.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'tmp', name)
if not is_file(path):
return ()
if APP_LIBO:
app = LIBO()
if app.is_running:
return app.nomina(path)
return ()
def parse_xml(xml):
try:
return ET.fromstring(xml)
except ET.ParseError:
return None
def to_pretty_xml(xml):
tree = parseString(xml)
return tree.toprettyxml(encoding='utf-8').decode('utf-8')
def get_dict(data):
return CaseInsensitiveDict(data)
def to_letters(value, currency):
return NumLet(value, currency).letras
# ~ def get_qr(data, p=True):
# ~ qr = pyqrcode.create(data, mode='binary')
# ~ if p:
# ~ path = get_path_temp('.qr')
# ~ qr.png(path, scale=7)
# ~ return path
# ~ buffer = io.BytesIO()
# ~ qr.png(buffer, scale=8)
# ~ return base64.b64encode(buffer.getvalue()).decode()
# ~ def get_qr2(data, kind='svg'):
# ~ buffer = io.BytesIO()
# ~ segno.make(data).save(buffer, kind=kind, scale=8, border=2)
# ~ return buffer
def _get_relacionados(doc, version):
node = doc.find('{}CfdiRelacionados'.format(PRE[version]))
if node is None:
return ''
uuids = ['UUID: {}'.format(n.attrib['UUID']) for n in list(node)]
return '\n'.join(uuids)
def _comprobante(doc, options):
data = CaseInsensitiveDict(doc.attrib.copy())
del data['certificado']
serie = ''
if 'serie' in data:
serie = '{}-'.format(data['serie'])
data['seriefolio'] = '{}{}'.format(serie, data.get('folio', ''))
data['totalenletras'] = to_letters(float(data['total']), data['moneda'])
is_nomina = options.get('is_nomina', False)
if is_nomina:
data['formadepago'] = options['formadepago']
data['periodicidaddepago'] = options['periodicidaddepago']
data['tiporelacion'] = options.get('tiporelacion', '')
return data
if data['version'] in CFDI_VERSIONS:
tipos = {
'I': 'ingreso',
'E': 'egreso',
'T': 'traslado',
'P': 'pago',
}
data['tipodecomprobante'] = tipos.get(data['tipodecomprobante'])
data['lugarexpedicion'] = \
'C.P. de Expedición: {}'.format(data['lugarexpedicion'])
if 'metododepago' in options:
data['metododepago'] = options['metododepago']
if 'formadepago' in options:
data['formadepago'] = options['formadepago']
if 'condicionesdepago' in data:
data['condicionesdepago'] = \
'Condiciones de pago: {}'.format(data['condicionesdepago'])
data['moneda'] = options['moneda']
data['tiporelacion'] = options.get('tiporelacion', '')
data['relacionados'] = _get_relacionados(doc, data['version'])
else:
fields = {
'formaDePago': 'Forma de Pago: {}\n',
'metodoDePago': 'Método de pago: {}\n',
'condicionesDePago': 'Condiciones de Pago: {}\n',
'NumCtaPago': 'Número de Cuenta de Pago: {}\n',
'Moneda': 'Moneda: {}\n',
'TipoCambio': 'Tipo de Cambio: {}',
}
datos = ''
for k, v in fields.items():
if k in data:
datos += v.format(data[k])
data['datos'] = datos
data['hora'] = data['fecha'].split('T')[1]
fecha = parser.parse(data['fecha'])
try:
locale.setlocale(locale.LC_TIME, "es_MX.UTF-8")
except:
pass
data['fechaformato'] = fecha.strftime('%A, %d de %B de %Y')
if 'tipocambio' in data:
data['tipocambio'] = 'Tipo de Cambio: $ {:0.4f}'.format(
float(data['tipocambio']))
data['notas'] = options['notas']
return data
def _emisor(doc, version, values):
emisor = doc.find('{}Emisor'.format(PRE[version]))
data = CaseInsensitiveDict(emisor.attrib.copy())
node = emisor.find('{}DomicilioFiscal'.format(PRE[version]))
if not node is None:
data.update(CaseInsensitiveDict(node.attrib.copy()))
if version == '3.2':
node = emisor.find('{}RegimenFiscal'.format(PRE[version]))
if not node is None:
data['regimenfiscal'] = node.attrib['Regimen']
data['regimen'] = node.attrib['Regimen']
else:
data['regimenfiscal'] = values['regimenfiscal']
path = _join(PATH_MEDIA, 'logos', '{}.png'.format(data['rfc'].lower()))
if is_file(path):
data['logo'] = path
return data
def _receptor(doc, version, values):
node = doc.find('{}Receptor'.format(PRE[version]))
data = CaseInsensitiveDict(node.attrib.copy())
node = node.find('{}Domicilio'.format(PRE[version]))
if not node is None:
data.update(node.attrib.copy())
if version == '3.2':
return data
data['usocfdi'] = values['usocfdi']
# ~ data.update(values['receptor'])
return data
def _conceptos(doc, version, options):
is_nomina = options.get('is_nomina', False)
data = []
conceptos = doc.find('{}Conceptos'.format(PRE[version]))
# ~ for c in conceptos.getchildren():
for c in list(conceptos):
values = CaseInsensitiveDict(c.attrib.copy())
if is_nomina:
values['noidentificacion'] = values['ClaveProdServ']
values['unidad'] = values['ClaveUnidad']
data.append(values)
continue
if version in CFDI_VERSIONS:
if 'noidentificacion' in values:
values['noidentificacion'] = '{}\n(SAT {})'.format(
values['noidentificacion'], values['ClaveProdServ'])
else:
values['noidentificacion'] = 'SAT {}'.format(
values['ClaveProdServ'])
if 'unidad' in values:
values['unidad'] = '({})\n{}'.format(
values['ClaveUnidad'], values['unidad'])
else:
values['unidad'] = '{}'.format(values['ClaveUnidad'])
n = c.find('{}CuentaPredial'.format(PRE[version]))
if n is not None:
v = CaseInsensitiveDict(n.attrib.copy())
info = '\nCuenta Predial Número: {}'.format(v['numero'])
values['descripcion'] += info
n = c.find('{}InformacionAduanera'.format(PRE[version]))
if n is not None:
v = CaseInsensitiveDict(n.attrib.copy())
info = '\nNúmero Pedimento: {}'.format(v['numeropedimento'])
values['descripcion'] += info
n = c.find('{}ComplementoConcepto'.format(PRE[version]))
if n is not None:
v = CaseInsensitiveDict(n[0].attrib.copy())
info = '\nAlumno: {} (CURP: {})\nNivel: {}, Autorización: {}'.format(
v['nombreAlumno'], v['CURP'], v['nivelEducativo'], v['autRVOE'])
values['descripcion'] += info
data.append(values)
return data
def _totales(doc, cfdi, version):
data = {}
data['moneda'] = doc.attrib['Moneda']
data['subtotal'] = cfdi['subtotal']
if 'descuento' in cfdi:
data['descuento'] = cfdi['descuento']
data['total'] = cfdi['total']
tn = {
'001': 'ISR',
'002': 'IVA',
'003': 'IEPS',
}
traslados = []
retenciones = []
taxlocales = []
imp = doc.find('{}Impuestos'.format(PRE[version]))
if imp is not None:
tmp = CaseInsensitiveDict(imp.attrib.copy())
for k, v in tmp.items():
data[k] = v
node = imp.find('{}Traslados'.format(PRE[version]))
if node is not None:
# ~ for n in node.getchildren():
for n in list(node):
tmp = CaseInsensitiveDict(n.attrib.copy())
if version in CFDI_VERSIONS:
tasa = ''
if 'tasaocuota' in tmp:
tasa = round(float(tmp['tasaocuota']), DECIMALES)
title = 'Traslado {} {}'.format(tn.get(tmp['impuesto']), tasa)
else:
title = 'Traslado {} {}'.format(tmp['impuesto'], tmp['tasa'])
if 'importe' in tmp:
traslados.append((title, float(tmp['importe'])))
node = imp.find('{}Retenciones'.format(PRE[version]))
if node is not None:
# ~ for n in node.getchildren():
for n in list(node):
tmp = CaseInsensitiveDict(n.attrib.copy())
if version in CFDI_VERSIONS:
title = 'Retención {} {}'.format(
tn.get(tmp['impuesto']), '')
else:
title = 'Retención {} {}'.format(tmp['impuesto'], '')
retenciones.append((title, float(tmp['importe'])))
node = doc.find('{}Complemento/{}ImpuestosLocales'.format(
PRE[version], PRE['LOCALES']))
if node is not None:
for otro in list(node):
if otro.tag == '{}RetencionesLocales'.format(PRE['LOCALES']):
tipo = 'Retención '
name = 'ImpLocRetenido'
tasa = 'TasadeRetencion'
else:
tipo = 'Traslado '
name = 'ImpLocTrasladado'
tasa = 'TasadeTraslado'
title = '{} {} {}%'.format(
tipo, otro.attrib[name], otro.attrib[tasa])
importe = float(otro.attrib['Importe'])
taxlocales.append((title, importe))
data['traslados'] = traslados
data['retenciones'] = retenciones
data['taxlocales'] = taxlocales
return data
def _timbre(doc, version, values, pdf_from='1'):
CADENA = '||{version}|{UUID}|{FechaTimbrado}|{selloCFD}|{noCertificadoSAT}||'
if version in CFDI_VERSIONS:
CADENA = '||{Version}|{UUID}|{FechaTimbrado}|{SelloCFD}|{NoCertificadoSAT}||'
node = doc.find('{}Complemento/{}TimbreFiscalDigital'.format(
PRE[version], PRE['TIMBRE']))
data = CaseInsensitiveDict(node.attrib.copy())
qr_data = {
'url': 'https://verificacfdi.facturaelectronica.sat.gob.mx/default.aspx?',
'uuid': '&id={}'.format(data['uuid']),
'emisor': '&re={}'.format(values['rfc_emisor']),
'receptor': '&rr={}'.format(values['rfc_receptor']),
'total': '&tt={}'.format(values['total']),
'sello': '&fe={}'.format(data['sellocfd'][-8:]),
}
qr_data = '{url}{uuid}{emisor}{receptor}{total}{sello}'.format(**qr_data)
data['cbb'] = get_qr(qr_data, 'png')
# ~ if pdf_from == '1':
# ~ data['cbb'] = get_qr(qr_data, 'png')
# ~ else:
# ~ data['cbb'] = get_qr(qr_data)
data['cadenaoriginal'] = CADENA.format(**data)
return data
def _donataria(doc, version, fechadof):
node = doc.find('{}Complemento/{}Donatarias'.format(
PRE[version], PRE['DONATARIA']))
if node is None:
return {}
data = CaseInsensitiveDict(node.attrib.copy())
data['fechadof'] = fechadof
return data
def _ine(doc, version):
node = doc.find('{}Complemento/{}INE'.format(PRE[version], PRE['INE']))
if node is None:
return {}
values = (
('TipoComite', 'Tipo de Comite: {}'),
('TipoProceso', 'Tipo de Proceso: {}'),
('IdContabilidad', 'ID de Contabilidad: {}'),
)
data = CaseInsensitiveDict(node.attrib.copy())
for k, v in values:
if k in data:
data[k] = v.format(data[k])
try:
node = node[0]
attr = CaseInsensitiveDict(node.attrib.copy())
values = (
('ClaveEntidad', 'Clave de la Entidad: {}'),
('Ambito', 'Ámbito: {}'),
)
for k, v in values:
if k in attr:
data[k] = v.format(attr[k])
node = node[0]
attr = CaseInsensitiveDict(node.attrib.copy())
values = (
('IdContabilidad', 'ID de Contabilidad: {}'),
)
for k, v in values:
if k in attr:
data[k] = v.format(attr[k])
except Exception as e:
print(e)
return data
def _nomina(doc, data, values, version_cfdi):
is_nomina = values.get('is_nomina', False)
if not is_nomina:
return {}
version = values['version']
node_nomina = doc.find('{}Complemento/{}Nomina'.format(
PRE[version_cfdi], PRE['NOMINA'][version]))
if node_nomina is None:
return {}
info = CaseInsensitiveDict(node_nomina.attrib.copy())
node = node_nomina.find('{}Emisor'.format(PRE['NOMINA'][version]))
if not node is None:
data['emisor'].update(CaseInsensitiveDict(node.attrib.copy()))
node = node_nomina.find('{}Receptor'.format(PRE['NOMINA'][version]))
data['receptor'].update(CaseInsensitiveDict(node.attrib.copy()))
node = node_nomina.find('{}Percepciones'.format(PRE['NOMINA'][version]))
if not node is None:
data['comprobante'].update(CaseInsensitiveDict(node.attrib.copy()))
info['percepciones'] = []
for p in list(node):
info['percepciones'].append(CaseInsensitiveDict(p.attrib.copy()))
node = node_nomina.find('{}Deducciones'.format(PRE['NOMINA'][version]))
if not node is None:
data['comprobante'].update(CaseInsensitiveDict(node.attrib.copy()))
info['deducciones'] = []
for d in list(node):
info['deducciones'].append(CaseInsensitiveDict(d.attrib.copy()))
node = node_nomina.find('{}OtrosPagos'.format(PRE['NOMINA'][version]))
if not node is None:
info['otrospagos'] = []
for o in list(node):
info['otrospagos'].append(CaseInsensitiveDict(o.attrib.copy()))
n = o.find('{}SubsidioAlEmpleo'.format(PRE['NOMINA'][version]))
if not n is None:
info.update(CaseInsensitiveDict(n.attrib.copy()))
node = node_nomina.find('{}Incapacidades'.format(PRE['NOMINA'][version]))
if not node is None:
info['incapacidades'] = []
for i in list(node):
info['incapacidades'].append(CaseInsensitiveDict(i.attrib.copy()))
return info
def _get_info_pays_2(node):
pre_pays = PRE_DEFAULT['PAGOS']['PRE']
data = CaseInsensitiveDict(node.attrib.copy())
path = f"{pre_pays}Totales"
totales = node.find(path)
data.update(CaseInsensitiveDict(totales.attrib.copy()))
path = f"{pre_pays}Pago"
node_pay = node.find(path)
data.update(CaseInsensitiveDict(node_pay.attrib.copy()))
related = []
for n in node_pay:
attr = CaseInsensitiveDict(n.attrib.copy())
if attr:
attr['metododepagodr'] = ''
related.append(attr)
data['related'] = related
return data
def _cfdipays(doc, data, version):
pre_pays = PRE_DEFAULT['PAGOS']['PRE']
path = f"{PRE[version]}Complemento/{pre_pays}Pagos"
node = doc.find(path)
if node is None:
pre_pays = PRE['PAGOS']['1.0']
path = f"{PRE[version]}Complemento/{pre_pays}Pagos"
node = doc.find(path)
if node is None:
log.error('Node pays not found...')
return {}
if version == '4.0':
info = _get_info_pays_2(node)
else:
info = CaseInsensitiveDict(node.attrib.copy())
related = []
for n1 in node:
info.update(CaseInsensitiveDict(n1.attrib.copy()))
for n2 in n1:
related.append(CaseInsensitiveDict(n2.attrib.copy()))
info['related'] = related
data['comprobante']['totalenletras'] = to_letters(
float(info['monto']), info['monedap'])
data['comprobante']['moneda'] = info['monedap']
return info
def get_data_from_xml(invoice, values, pdf_from='1'):
data = {'cancelada': invoice.cancelada, 'donativo': False}
if hasattr(invoice, 'donativo'):
data['donativo'] = invoice.donativo
doc = parse_xml(invoice.xml)
data['comprobante'] = _comprobante(doc, values)
version = data['comprobante']['version']
data['emisor'] = _emisor(doc, version, values)
data['receptor'] = _receptor(doc, version, values)
data['conceptos'] = _conceptos(doc, version, values)
data['totales'] = _totales(doc, data['comprobante'], version)
data['donataria'] = _donataria(doc, version, values['fechadof'])
data['ine'] = _ine(doc, version)
options = {
'rfc_emisor': data['emisor']['rfc'],
'rfc_receptor': data['receptor']['rfc'],
'total': data['comprobante']['total'],
}
data['timbre'] = _timbre(doc, version, options, pdf_from)
del data['timbre']['version']
data['comprobante'].update(data['timbre'])
data['nomina'] = _nomina(doc, data, values, version)
data['pagos'] = values.get('pagos', False)
if data['pagos']:
data['pays'] = _cfdipays(doc, data, version)
data['pakings'] = values.get('pakings', [])
data['el.version'] = values['el.version']
return data
def to_zip(*files):
zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED, False) as zip_file:
for data, file_name in files:
zip_file.writestr(file_name, data)
return zip_buffer.getvalue()
def make_fields(xml):
doc = ET.fromstring(xml)
data = CaseInsensitiveDict(doc.attrib.copy())
data.pop('certificado')
data.pop('sello')
version = data['version']
receptor = doc.find('{}Receptor'.format(PRE[version]))
receptor = CaseInsensitiveDict(receptor.attrib.copy())
data['receptor_nombre'] = receptor['nombre']
data['receptor_rfc'] = receptor['rfc']
data = {k.lower(): v for k, v in data.items()}
return data
def make_info_mail(data, fields):
try:
return data.format(**fields).replace('\n', '<br/>')
except:
log.error(data)
log.error(fields)
return data.replace('\n', '<br/>')
def send_mail(data):
msg = ''
server = SendMail(data['server'])
is_connect = server.is_connect
if is_connect:
msg = server.send(data['options'])
else:
msg = server.error
server.close()
return {'ok': is_connect, 'msg': msg}
def exists(path):
return os.path.exists(path)
def get_path_info(path):
path, filename = os.path.split(path)
name, extension = os.path.splitext(filename)
return (path, filename, name, extension)
def get_path_temp(s=''):
return tempfile.mkstemp(s)[1]
def get_date(value, next_day=False):
d = parser.parse(value)
if next_day:
return d + datetime.timedelta(days=1)
return d
class UpFile(object):
def __init__(self):
self._init_values()
def _init_values(self):
return
def save_template(rfc, opt, file_obj):
result = {'status': 'error', 'ok': False}
name_template = f'{rfc}{opt}'
path_template = _join(PATH_MEDIA, 'templates', name_template)
if save_file(path_template, file_obj.file.read()):
result = {'status': 'server', 'name': file_obj.filename, 'ok': True}
return result
def upload_file(rfc, opt, file_obj):
rfc = rfc.lower()
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
versions = ('_3.2.ods',
'_3.3.ods', '_3.3_cd_1.1.ods', '_3.3_cp_1.0.ods', '_3.3_cn_1.2.ods', '_3.3_ccp_2.0.ods', '_3.3.json',
'_4.0.ods',
'_4.0_cn_1.2.ods',
'_4.0_cp_2.0.ods',
'_4.0_ccp_2.0.ods',
'_4.0_cd_1.1.ods',
'_4.0.json',
)
if opt in versions:
return save_template(rfc, opt, file_obj)
EXTENSIONS = {
'txt_plantilla_factura_32': EXT['ODS'],
'txt_plantilla_factura_33': EXT['ODS'],
'txt_plantilla_factura_html': EXT['HTML'],
'txt_plantilla_factura_css': EXT['CSS'],
'txt_plantilla_factura_json': EXT['JSON'],
}
if opt in EXTENSIONS:
if ext != EXTENSIONS[opt]:
msg = (
f"Extensión de archivo incorrecta, "
f"selecciona un archivo {EXTENSIONS[opt].upper()}"
)
return {'status': 'server', 'name': msg, 'ok': False}
NAMES = {
'txt_plantilla_factura_32': f"{rfc}_3.2.ods",
'txt_plantilla_factura_33': f"{rfc}_3.3.ods",
'txt_plantilla_factura_html': f"{rfc}_3.3.html",
'txt_plantilla_factura_css': f"{rfc}.css",
'txt_plantilla_factura_json': f"{rfc}_3.3.json",
}
name = NAMES[opt]
paths = {
'txt_plantilla_factura_32': _join(PATHS['USER'], name),
'txt_plantilla_factura_33': _join(PATHS['USER'], name),
'txt_plantilla_factura_html': _join(PATHS['USER'], name),
'txt_plantilla_factura_css': _join(PATHS['CSS'], name),
'txt_plantilla_factura_json': _join(PATHS['USER'], name),
}
if save_file(paths[opt], file_obj.file.read()):
return {'status': 'server', 'name': file_obj.filename, 'ok': True}
return {'status': 'error', 'ok': False}
if opt == 'txt_plantilla_ticket':
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
if ext != 'ods':
msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS'
return {'status': 'server', 'name': msg, 'ok': False}
name = '{}_ticket.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'templates', name)
elif opt == 'txt_plantilla_donataria':
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
if ext != 'ods':
msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS'
return {'status': 'server', 'name': msg, 'ok': False}
name = '{}_3.3_donativo.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'templates', name)
elif opt == 'txt_plantilla_nomina1233':
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
if ext != 'ods':
msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS'
return {'status': 'server', 'name': msg, 'ok': False}
name = '{}_1.2_3.3.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'templates', name)
elif opt == 'txt_plantilla_pagos10':
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
if ext != 'ods':
msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS'
return {'status': 'server', 'name': msg, 'ok': False}
name = '{}_pagos_1.0.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'templates', name)
elif opt == 'bdfl':
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
if ext != 'sqlite':
msg = 'Extensión de archivo incorrecta, selecciona un archivo SQLite'
return {'status': 'server', 'name': msg, 'ok': False}
name = '{}.sqlite'.format(rfc.lower())
path = _join('/tmp', name)
elif opt == 'products':
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
if ext != 'ods':
msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS'
return {'status': 'server', 'name': msg, 'ok': False}
name = '{}_products.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'tmp', name)
elif opt == 'invoiceods':
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
if ext != 'ods':
msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS'
return {'status': 'server', 'name': msg, 'ok': False}
name = '{}_invoice.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'tmp', name)
elif opt == 'employees':
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
if ext != 'ods':
msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS'
return {'status': 'server', 'name': msg, 'ok': False}
name = '{}_employees.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'tmp', name)
elif opt == 'nomina':
tmp = file_obj.filename.split('.')
ext = tmp[-1].lower()
if ext != 'ods':
msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS'
return {'status': 'server', 'name': msg, 'ok': False}
name = '{}_nomina.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'tmp', name)
if save_file(path, file_obj.file.read()):
return {'status': 'server', 'name': file_obj.filename, 'ok': True}
return {'status': 'error', 'ok': False}
def _get_pem_from_pfx(cert):
tmp_p12 = save_temp(cert.p12)
args = "openssl pkcs12 -in '{}' -clcerts -nodes -nocerts " \
"-passin pass:'{}' | openssl rsa".format(tmp_p12, _get_md5(cert.rfc))
result = _call(args)
_kill(tmp_p12)
return result.encode()
def cancel_xml(auth, uuid, certificado):
from .pac import Finkok as PAC
if DEBUG:
auth = {}
else:
if not auth:
msg = 'Sin datos para cancelar'
data = {'ok': False, 'error': msg}
return data, result
msg = 'Factura cancelada correctamente'
data = {'ok': True, 'msg': msg, 'row': {'estatus': 'Cancelada'}}
pac = PAC(auth)
result = pac.cancel_xml(certificado.rfc, str(uuid).upper(),
certificado.cer_pem.encode(), _get_pem_from_pfx(certificado))
if result:
codes = {None: '',
'Could not get UUID Text': 'UUID no encontrado',
'Invalid Passphrase': 'Contraseña inválida',
}
if not result['CodEstatus'] is None:
data['ok'] = False
data['msg'] = codes.get(result['CodEstatus'], result['CodEstatus'])
else:
data['ok'] = False
data['msg'] = pac.error
return data, result
def cancel_signature(uuid, pk12, rfc, auth):
from .pac import Finkok as PAC
token = _get_md5(rfc)
if USAR_TOKEN:
token = auth['PASS']
if AUTH['DEBUG']:
token = AUTH['PASS']
template = read_file(TEMPLATE_CANCEL, 'r')
data = {
'rfc': rfc,
'fecha': datetime.datetime.now().isoformat()[:19],
'uuid': str(uuid).upper(),
}
template = template.format(**data)
data = {
'xmlsec': PATH_XMLSEC,
'pk12': save_temp(pk12),
'pass': token,
'template': save_temp(template, 'w'),
}
args = '"{xmlsec}" --sign --pkcs12 "{pk12}" --pwd {pass} ' \
'"{template}"'.format(**data)
xml_sign = _call(args)
if DEBUG:
auth = {}
else:
if not auth:
msg = 'Sin datos para cancelar'
result = {'ok': False, 'error': msg}
return result
msg = 'Factura cancelada correctamente'
data = {'ok': True, 'msg': msg, 'row': {'estatus': 'Cancelada'}}
pac = PAC(auth)
result = pac.cancel_signature(xml_sign)
if result:
codes = {None: '',
'Could not get UUID Text': 'UUID no encontrado'}
if not result['CodEstatus'] is None:
data['ok'] = False
data['msg'] = codes.get(result['CodEstatus'], result['CodEstatus'])
else:
data['ok'] = False
data['msg'] = pac.error
return data, result
def run_in_thread(fn):
def run(*k, **kw):
t = threading.Thread(target=fn, args=k, kwargs=kw)
t.start()
return t
return run
def get_bool(value):
if not value:
return False
if value == '1':
return True
return False
def get_float(value, four=False):
if four:
return round(float(value), DECIMALES_TAX)
return round(float(value), DECIMALES)
def crear_rol(user, contra=''):
if not contra:
contra = user
args = 'psql -U postgres -c "CREATE ROLE {} WITH LOGIN ENCRYPTED ' \
'PASSWORD \'{}\';"'.format(user, contra)
try:
result = _call(args)
if result == 'CREATE ROLE\n':
return True
except Exception as e:
log.info(e)
return False
def crear_db(nombre):
args = 'psql -U postgres -c "CREATE DATABASE {0} WITH ' \
'OWNER {0};"'.format(nombre)
try:
result = _call(args)
print (result)
if result == 'CREATE DATABASE\n':
return True
except Exception as e:
log.info(e)
return False
def _backup_db(user):
dt = datetime.datetime.now().strftime('%y%m%d_%H%M')
path_bk = _join(PATH_MEDIA, 'tmp', '{}_{}.bk'.format(user, dt))
args = 'pg_dump -U postgres -Fc {} > "{}"'.format(user, path_bk)
_call(args)
return
def delete_db(user, bk=True):
if bk:
_backup_db(user)
args = 'psql -U postgres -c "DROP DATABASE {0};"'.format(user)
_call(args)
args = 'psql -U postgres -c "DROP ROLE {0};"'.format(user)
_call(args)
return
def _to_seafile(path_db, data):
if DEBUG:
return
_, filename = os.path.split(path_db)
if SEAFILE_SERVER:
msg = '\tSincronizando backup general...'
log.info(msg)
seafile = SeaFileAPI(
SEAFILE_SERVER['URL'],
SEAFILE_SERVER['USER'],
SEAFILE_SERVER['PASS'])
if seafile.is_connect:
msg = '\tSincronizando: {} '.format(filename)
log.info(msg)
seafile.update_file(
path_db, SEAFILE_SERVER['REPO'], '/', SEAFILE_SERVER['PASS'])
msg = '\tRespaldo general de {} sincronizado'.format(filename)
log.info(msg)
msg = '\tSin datos para sincronización particular de {}'.format(filename)
if len(data) < 2:
log.info(msg)
return
if not data[0] or not data[1] or not data[2]:
log.info(msg)
return
msg = '\tSincronizando backup particular...'
log.info(msg)
seafile = SeaFileAPI(SEAFILE_SERVER['URL'], data[0], data[1])
if seafile.is_connect:
msg = '\t\tSincronizando: {} '.format(filename)
log.info(msg)
seafile.update_file(path_db, data[2], 'Base de datos/', data[1])
msg = '\t\tRespaldo partícular de {} sincronizado'.format(filename)
log.info(msg)
return
@run_in_thread
def _backup_and_sync(rfc, data):
msg = 'Generando backup de: {}'.format(rfc)
log.info(msg)
sql = 'select correo_timbrado, token_timbrado, token_soporte from emisor;'
path_bk = _join(PATH_MEDIA, 'tmp', '{}.bk'.format(rfc.lower()))
if data['type'] == 'postgres':
args = 'pg_dump -U postgres -Fc {} > "{}"'.format(
data['name'], path_bk)
sql = 'psql -U postgres -d {} -Atc "{}"'.format(data['name'], sql)
elif data['type'] == 'sqlite':
args = 'gzip -c "{}" > "{}"'.format(data['name'], path_bk)
sql = 'sqlite3 "{}" "{}"'.format(data['name'], sql)
try:
result = _call(args)
msg = '\tBackup generado de {}'.format(rfc)
log.info(msg)
result = _call(sql).strip().split('|')
_to_seafile(path_bk, result)
except Exception as e:
log.info(e)
return
@run_in_thread
def _backup_companies():
if DEBUG:
return
_, filename = os.path.split(COMPANIES)
if SEAFILE_SERVER:
msg = '\tSincronizando backup RFCs...'
log.info(msg)
seafile = SeaFileAPI(
SEAFILE_SERVER['URL'],
SEAFILE_SERVER['USER'],
SEAFILE_SERVER['PASS'])
if seafile.is_connect:
msg = '\tSincronizando: {} '.format(filename)
log.info(msg)
seafile.update_file(
COMPANIES, SEAFILE_SERVER['REPO'], '/', SEAFILE_SERVER['PASS'])
msg = '\tRespaldo general de {} sincronizado'.format(filename)
log.info(msg)
return
def backup_dbs():
con = sqlite3.connect(COMPANIES)
cursor = con.cursor()
sql = "SELECT * FROM names"
cursor.execute(sql)
rows = cursor.fetchall()
if rows is None:
return
cursor.close()
con.close()
for rfc, data in rows:
args = loads(data)
_backup_and_sync(rfc, args)
_backup_companies()
return
def _validar_directorios(path_bk, target):
path = Path(_join(path_bk, target))
path.mkdir(parents=True, exist_ok=True)
return str(path)
def local_copy(files):
if not MV:
return
path_bk = _join(str(Path.home()), DIR_FACTURAS)
if not os.path.isdir(path_bk):
msg = 'No existe la carpeta: facturas'
log.error(msg)
return
# ~ args = 'df -P {} | tail -1 | cut -d" " -f 1'.format(path_bk)
# ~ try:
# ~ result = _call(args)
# ~ log.info(result)
# ~ except:
# ~ pass
# ~ if result != 'empresalibre\n':
# ~ log.info(result)
# ~ msg = 'Asegurate de que exista la carpeta para sincronizar'
# ~ log.error(msg)
# ~ return
# ~ except subprocess.CalledProcessError:
# ~ msg = 'No se pudo obtener la ruta para sincronizar'
# ~ log.error(msg)
# ~ return
try:
for obj, name, target in files:
path = _validar_directorios(path_bk, target)
path_file = _join(path, name)
m = 'wb'
if name.endswith('xml'):
m = 'w'
save_file(path_file, obj, m)
except Exception as e:
log.error(e)
return
def sync_files(files, auth={}):
if not MV:
return
path_bk = _join(str(Path.home()), DIR_FACTURAS)
if not os.path.isdir(path_bk):
msg = 'No existe la carpeta: facturas'
log.error(msg)
return
for obj, name, target in files:
path = _validar_directorios(path_bk, target)
path_file = _join(path, name)
m = 'wb'
if name.endswith('xml'):
m = 'w'
save_file(path_file, obj, m)
return
def sync_cfdi(files):
local_copy(files)
if DEBUG:
return
# ~ if not auth['REPO'] or not SEAFILE_SERVER:
# ~ return
# ~ seafile = SeaFileAPI(SEAFILE_SERVER['URL'], auth['USER'], auth['PASS'])
# ~ if seafile.is_connect:
# ~ for f in files:
# ~ seafile.update_file(
# ~ f, auth['REPO'], 'Facturas/{}/'.format(f[2]), auth['PASS'])
return
class ImportCFDI(object):
def __init__(self, xml):
self._doc = xml
self._pre = ''
def _relacionados(self):
data = {}
node = self._doc.find('{}CfdiRelacionados'.format(self._pre))
if not node is None:
data = CaseInsensitiveDict(node.attrib.copy())
return data
def _emisor(self):
emisor = self._doc.find('{}Emisor'.format(self._pre))
data = CaseInsensitiveDict(emisor.attrib.copy())
node = emisor.find('{}RegimenFiscal'.format(self._pre))
if not node is None:
data['regimen_fiscal'] = node.attrib['Regimen']
return data
def _receptor(self):
node = self._doc.find('{}Receptor'.format(self._pre))
data = CaseInsensitiveDict(node.attrib.copy())
node = node.find('{}Domicilio'.format(self._pre))
if not node is None:
data.update(node.attrib.copy())
return data
def _conceptos(self):
data = []
conceptos = self._doc.find('{}Conceptos'.format(self._pre))
for c in list(conceptos):
values = CaseInsensitiveDict(c.attrib.copy())
data.append(values)
return data
def _impuestos(self):
data = {}
node = self._doc.find('{}Impuestos'.format(self._pre))
if not node is None:
data = CaseInsensitiveDict(node.attrib.copy())
return data
def _timbre(self):
node = self._doc.find('{}Complemento/{}TimbreFiscalDigital'.format(
self._pre, PRE['TIMBRE']))
data = CaseInsensitiveDict(node.attrib.copy())
data.pop('SelloCFD', None)
data.pop('SelloSAT', None)
data.pop('Version', None)
return data
def get_data(self):
invoice = CaseInsensitiveDict(self._doc.attrib.copy())
invoice.pop('certificado', '')
invoice.pop('sello', '')
self._pre = PRE[invoice['version']]
relacionados = self._relacionados()
emisor = self._emisor()
receptor = self._receptor()
conceptos = self._conceptos()
impuestos = self._impuestos()
timbre = self._timbre()
invoice.update(relacionados)
invoice.update(emisor)
invoice.update(receptor)
invoice.update(impuestos)
invoice.update(timbre)
data = {
'invoice': invoice,
'emisor': emisor,
'receptor': receptor,
'conceptos': conceptos,
}
return data
def print_ticket(data, info):
p = PrintTicket(info)
return p.printer(data)
def import_products(rfc):
name = '{}_products.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'tmp', name)
if not is_file(path):
return (), 'No se encontró la plantilla'
if APP_LIBO:
app = LIBO()
if app.is_running:
return app.products(path)
return (), 'No se encontro LibreOffice'
def import_invoice(rfc):
name = '{}_invoice.ods'.format(rfc.lower())
path = _join(PATH_MEDIA, 'tmp', name)
if not is_file(path):
return (), 'No se encontró la plantilla'
if APP_LIBO:
app = LIBO()
if app.is_running:
return app.invoice(path)
return (), 'No se encontro LibreOffice'
def calc_to_date(value):
return datetime.date.fromordinal(int(value) + 693594)
def get_days(start, end):
return (end - start).days + 1
def log_file(name, msg='', kill=False):
path = _join(PATH_MEDIA, 'tmp', '{}.log'.format(name))
if kill:
_kill(path)
return
with open(path, 'a') as fh:
line = '{} : {}\n'.format(str(now()), msg)
fh.write(line)
return
def get_log(name):
data = ''
name = '{}.log'.format(name)
path = _join(PATH_MEDIA, 'tmp', name)
if is_file(path):
data = open(path).read()
return data, name
def get_timbres(auth):
from .pac import Finkok as PAC
if DEBUG:
return '-d'
pac = PAC(auth)
timbres = pac.client_get_timbres(auth['RFC'])
if pac.error:
return '-e'
return timbres
def truncate(value):
return trunc(value * 100) / 100
def validate_path_bk():
path_bk = _join(str(Path.home()), DIR_FACTURAS)
if not os.path.isdir(path_bk):
msg = 'No existe la carpeta'
return {'ok': False, 'msg': msg}
return {'ok': True, 'msg': path_bk}
def respaldar_db(values, path_bk):
user = values[0].lower()
db = loads(values[1])['name']
path = _join(path_bk, '{}.bk'.format(user))
args = 'pg_dump -U postgres -Fc {} > "{}"'.format(db, path)
_call(args)
return
def validate_rfc(value):
msg = ''
if len(value) < 12:
msg = 'Longitud inválida del RFC'
return msg
l = 4
if len(value)==12:
l = 3
s = value[0:l]
r = re.match('[A-ZÑ&]{%s}' % l, s)
msg = 'Caracteres inválidos al {} del RFC'
if not r:
return msg.format('inicio')
s = value[-3:]
r = re.match('[A-Z0-9]{3}', s)
if not r:
return msg.format('final')
s = value[l:l+6]
r = re.match('[0-9]{6}', s)
msg = 'Fecha inválida en el RFC'
if not r:
return msg
try:
datetime.datetime.strptime(s, '%y%m%d')
return ''
except:
return msg
def parse_xml2(xml_str):
return etree.fromstring(xml_str.encode('utf-8'))