#!/usr/bin/env python import datetime import getpass import hashlib import json import locale import mimetypes import os import re import sqlite3 import socket import subprocess import tempfile import threading import time import unicodedata import uuid import zipfile from io import BytesIO from pathlib import Path # ~ from smtplib import SMTPException, SMTPAuthenticationError from xml.etree import ElementTree as ET try: import uno from com.sun.star.beans import PropertyValue from com.sun.star.awt import Size from com.sun.star.view.PaperFormat import LETTER APP_LIBO = True except: APP_LIBO = False import pyqrcode from dateutil import parser from .helper import CaseInsensitiveDict, NumLet, SendMail, TemplateInvoice, \ SeaFileAPI, PrintTicket from settings import DEBUG, MV, log, template_lookup, COMPANIES, DB_SAT, \ PATH_XSLT, PATH_XSLTPROC, PATH_OPENSSL, PATH_TEMPLATES, PATH_MEDIA, PRE, \ PATH_XMLSEC, TEMPLATE_CANCEL, DEFAULT_SAT_PRODUCTO, DECIMALES, DIR_FACTURAS from settings import SEAFILE_SERVER, USAR_TOKEN from .configpac import AUTH def _call(args): return subprocess.check_output(args, shell=True).decode() def _get_md5(data): return hashlib.md5(data.encode()).hexdigest() def save_temp(data, modo='wb'): path = tempfile.mkstemp()[1] with open(path, modo) as f: f.write(data) return path def save_file(path, data, modo='wb'): try: with open(path, modo) as f: f.write(data) return True except: return False def _join(*paths): return os.path.join(*paths) def _kill(path): try: os.remove(path) except: pass return def get_pass(): password = getpass.getpass('Introduce la contraseña: ') pass2 = getpass.getpass('Confirma la contraseña: ') if password != pass2: msg = 'Las contraseñas son diferentes' return False, msg password = password.strip() if not password: msg = 'La contraseña es necesaria' return False, msg return True, password def get_value(arg): value = input('Introduce el {}: '.format(arg)).strip() if not value: msg = 'El {} es requerido'.format(arg) log.error(msg) return '' return value def _valid_db_companies(): con = sqlite3.connect(COMPANIES) sql = """ CREATE TABLE IF NOT EXISTS names( rfc TEXT NOT NULL COLLATE NOCASE UNIQUE, con TEXT NOT NULL ); """ cursor = con.cursor() cursor.executescript(sql) cursor.close() con.close() return def _get_args(rfc): _valid_db_companies() con = sqlite3.connect(COMPANIES) cursor = con.cursor() sql = "SELECT con FROM names WHERE rfc=?" cursor.execute(sql, (rfc,)) values = cursor.fetchone() if values is None: msg = 'No se encontró el RFC' log.error(msg) return '' cursor.close() con.close() return values[0] def get_rfcs(): _valid_db_companies() con = sqlite3.connect(COMPANIES) cursor = con.cursor() sql = "SELECT * FROM names" cursor.execute(sql) values = cursor.fetchall() cursor.close() con.close() return values def get_con(rfc=''): if not rfc: rfc = get_value('RFC').upper() if not rfc: return {} args = _get_args(rfc.upper()) if not args: return {} return loads(args) def get_sat_key(table, key): con = sqlite3.connect(DB_SAT) cursor = con.cursor() sql = 'SELECT key, name FROM {} WHERE key=?'.format(table) cursor.execute(sql, (key,)) data = cursor.fetchone() cursor.close() con.close() if data is None: return {'ok': False, 'text': 'No se encontró la clave'} return {'ok': True, 'text': data[1]} def get_sat_monedas(key): con = sqlite3.connect(DB_SAT) con.row_factory = sqlite3.Row cursor = con.cursor() filtro = '%{}%'.format(key) sql = "SELECT * FROM monedas WHERE key LIKE ? OR name LIKE ?" cursor.execute(sql, [filtro, filtro]) data = cursor.fetchall() cursor.close() con.close() if data is None: return () data = [dict(r) for r in data] return tuple(data) def get_sat_unidades(key): con = sqlite3.connect(DB_SAT) con.row_factory = sqlite3.Row cursor = con.cursor() filtro = '%{}%'.format(key) sql = "SELECT * FROM unidades WHERE key LIKE ? OR name LIKE ?" cursor.execute(sql, [filtro, filtro]) data = cursor.fetchall() cursor.close() con.close() if data is None: return () data = [dict(r) for r in data] return tuple(data) def get_sat_productos(key): con = sqlite3.connect(DB_SAT) con.row_factory = sqlite3.Row cursor = con.cursor() filtro = '%{}%'.format(key) sql = "SELECT * FROM productos WHERE key LIKE ? OR name LIKE ?" cursor.execute(sql, [filtro, filtro]) data = cursor.fetchall() cursor.close() con.close() if data is None: return () data = [dict(r) for r in data] return tuple(data) def now(): return datetime.datetime.now().replace(microsecond=0) def today(): return datetime.date.today() def get_token(): return _get_hash(uuid.uuid4().hex) def get_mimetype(path): mt = mimetypes.guess_type(path)[0] return mt or 'application/octet-stream' def is_file(path): return os.path.isfile(path) def get_stream(path): return get_file(path), get_size(path) def get_file(path): return open(path, 'rb') def get_files(path, ext='xml'): docs = [] for folder, _, files in os.walk(path): pattern = re.compile('\.{}'.format(ext), re.IGNORECASE) docs += [os.path.join(folder,f) for f in files if pattern.search(f)] return tuple(docs) def read_file(path, mode='rb'): return open(path, mode).read() def get_size(path): return os.path.getsize(path) def get_template(name, data={}): # ~ print ('NAME', name) template = template_lookup.get_template(name) return template.render(**data) def get_custom_styles(name, default='plantilla_factura.json'): path = _join(PATH_MEDIA, 'templates', name.lower()) if is_file(path): with open(path) as fh: return loads(fh.read()) path = _join(PATH_TEMPLATES, default) if is_file(path): with open(path) as fh: return loads(fh.read()) return {} def get_template_ods(name, default='plantilla_factura.ods'): path = _join(PATH_MEDIA, 'templates', name.lower()) if is_file(path): return path path = _join(PATH_TEMPLATES, default) if is_file(path): return path return '' def dumps(data): return json.dumps(data, default=str) def loads(data): return json.loads(data) def import_json(path): return loads(read_file(path, 'r')) def clean(values): for k, v in values.items(): if isinstance(v, str): values[k] = v.strip() return values def parse_con(values): data = values.split('|') try: con = {'type': data[0]} if con['type'] == 'sqlite': con['name'] = data[1] else: if data[1]: con['host'] = data[1] if data[2]: con['port'] = data[2] con['name'] = data[3] con['user'] = data[4] con['password'] = data[5] return con except IndexError: return {} def spaces(value): return '\n'.join([' '.join(l.split()) for l in value.split('\n')]) def to_slug(string): value = (unicodedata.normalize('NFKD', string) .encode('ascii', 'ignore') .decode('ascii').lower()) return value.replace(' ', '_') class Certificado(object): def __init__(self, paths): self._path_key = paths['path_key'] self._path_cer = paths['path_cer'] self._modulus = '' self.error = '' def _kill(self, path): try: os.remove(path) except: pass return def _get_info_cer(self, session_rfc): data = {} args = 'openssl x509 -inform DER -in {}' try: cer_pem = _call(args.format(self._path_cer)) except Exception as e: self.error = 'No se pudo convertir el CER en PEM' return data args = 'openssl enc -base64 -in {}' try: cer_txt = _call(args.format(self._path_cer)) except Exception as e: self.error = 'No se pudo convertir el CER en TXT' return data args = 'openssl x509 -inform DER -in {} -noout -{}' try: result = _call(args.format(self._path_cer, 'purpose')).split('\n')[3] except Exception as e: self.error = 'No se puede saber si es FIEL' return data if result == 'SSL server : No': self.error = 'El certificado es FIEL' return data result = _call(args.format(self._path_cer, 'serial')) serie = result.split('=')[1].split('\n')[0][1::2] result = _call(args.format(self._path_cer, 'subject')) #~ Verificar si es por la version de OpenSSL t1 = 'x500UniqueIdentifier = ' t2 = 'x500UniqueIdentifier=' if t1 in result: rfc = result.split(t1)[1][:13].strip() elif t2 in result: rfc = result.split(t2)[1][:13].strip() else: self.error = 'No se pudo obtener el RFC del certificado' print ('\n', result) return data if not DEBUG: if not rfc == session_rfc: self.error = 'El RFC del certificado no corresponde.' return data dates = _call(args.format(self._path_cer, 'dates')).split('\n') desde = parser.parse(dates[0].split('=')[1]) hasta = parser.parse(dates[1].split('=')[1]) self._modulus = _call(args.format(self._path_cer, 'modulus')) data['cer'] = read_file(self._path_cer) data['cer_pem'] = cer_pem data['cer_txt'] = cer_txt.replace('\n', '') data['serie'] = serie data['rfc'] = rfc data['desde'] = desde.replace(tzinfo=None) data['hasta'] = hasta.replace(tzinfo=None) return data def _get_p12(self, password, rfc, token): tmp_cer = tempfile.mkstemp()[1] tmp_key = tempfile.mkstemp()[1] tmp_p12 = tempfile.mkstemp()[1] args = 'openssl x509 -inform DER -in "{}" -out "{}"' _call(args.format(self._path_cer, tmp_cer)) args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:"{}" -out "{}"' _call(args.format(self._path_key, password, tmp_key)) args = 'openssl pkcs12 -export -in "{}" -inkey "{}" -name "{}" ' \ '-passout pass:"{}" -out "{}"' _call(args.format(tmp_cer, tmp_key, rfc, token, tmp_p12)) data = read_file(tmp_p12) self._kill(tmp_cer) self._kill(tmp_key) self._kill(tmp_p12) return data def _get_info_key(self, password, rfc, token): data = {} args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:"{}"' try: result = _call(args.format(self._path_key, password)) except Exception as e: self.error = 'Contraseña incorrecta' return data args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:"{}" | ' \ 'openssl rsa -noout -modulus' mod_key = _call(args.format(self._path_key, password)) if self._modulus != mod_key: self.error = 'Los archivos no son pareja' return data args = "openssl pkcs8 -inform DER -in '{}' -passin pass:'{}' | " \ "openssl rsa -des3 -passout pass:'{}'".format( self._path_key, password, token) key_enc = _call(args) data['key'] = read_file(self._path_key) data['key_enc'] = key_enc data['p12'] = self._get_p12(password, rfc, token) return data def validate(self, password, rfc, auth): token = _get_md5(rfc) if USAR_TOKEN: token = auth['PASS'] if AUTH['DEBUG']: token = AUTH['PASS'] if not self._path_key or not self._path_cer: self.error = 'Error en las rutas temporales del certificado' return {} data = self._get_info_cer(rfc) if not data: return {} llave = self._get_info_key(password, rfc, token) if not llave: return {} data.update(llave) self._kill(self._path_key) self._kill(self._path_cer) return data def make_xml(data, certificado, auth): from .cfdi_xml import CFDI token = _get_md5(certificado.rfc) if USAR_TOKEN: token = auth['PASS'] if AUTH['DEBUG']: token = AUTH['PASS'] if DEBUG: data['emisor']['Rfc'] = certificado.rfc data['emisor']['RegimenFiscal'] = '603' cfdi = CFDI() xml = cfdi.get_xml(data) data = { 'xsltproc': PATH_XSLTPROC, 'xslt': _join(PATH_XSLT, 'cadena.xslt'), 'xml': save_temp(xml, 'w'), 'openssl': PATH_OPENSSL, 'key': save_temp(certificado.key_enc, 'w'), 'pass': token, } args = '"{xsltproc}" "{xslt}" "{xml}" | ' \ '"{openssl}" dgst -sha256 -sign "{key}" -passin pass:"{pass}" | ' \ '"{openssl}" enc -base64 -A'.format(**data) sello = _call(args) _kill(data['xml']) _kill(data['key']) return cfdi.add_sello(sello) def timbra_xml(xml, auth): from .pac import Finkok as PAC if not DEBUG and not auth: msg = 'Sin datos para timbrar' result = {'ok': True, 'error': msg} return result result = {'ok': True, 'error': ''} pac = PAC(auth) xml = pac.timbra_xml(xml) if not xml: result['ok'] = False result['error'] = pac.error return result result['xml'] = xml result['uuid'] = pac.uuid result['fecha'] = pac.fecha return result def get_sat(xml): from .pac import get_status_sat return get_status_sat(xml) class LIBO(object): HOST = 'localhost' PORT = '8100' ARG = 'socket,host={},port={};urp;StarOffice.ComponentContext'.format( HOST, PORT) def __init__(self): self._app = None self._start_office() self._init_values() def _init_values(self): self._es_pre = False self._ctx = None self._sm = None self._desktop = None if self.is_running: ctx = uno.getComponentContext() service = 'com.sun.star.bridge.UnoUrlResolver' resolver = ctx.ServiceManager.createInstanceWithContext(service, ctx) self._ctx = resolver.resolve('uno:{}'.format(self.ARG)) self._sm = self._ctx.ServiceManager self._desktop = self._create_instance('com.sun.star.frame.Desktop') return def _create_instance(self, name, with_context=True): if with_context: instance = self._sm.createInstanceWithContext(name, self._ctx) else: instance = self._sm.createInstance(name) return instance @property def is_running(self): try: s = socket.create_connection((self.HOST, self.PORT), 5.0) s.close() return True except ConnectionRefusedError: return False def _start_office(self): if self.is_running: return c = 1 while c < 4: c += 1 self.app = subprocess.Popen([ 'soffice', '--headless', '--accept={}'.format(self.ARG)], stdout=subprocess.PIPE, stderr=subprocess.PIPE) time.sleep(5) if self.is_running: return return def _set_properties(self, properties): pl = [] for k, v in properties.items(): pv = PropertyValue() pv.Name = k pv.Value = v pl.append(pv) return tuple(pl) def _doc_open(self, path, options): options = self._set_properties(options) path = self._path_url(path) try: doc = self._desktop.loadComponentFromURL(path, '_blank', 0, options) return doc except: return None def _path_url(self, path): if path.startswith('file://'): return path return uno.systemPathToFileUrl(path) def close(self): if self.is_running: if not self._desktop is None: self._desktop.terminate() if not self._app is None: self._app.terminate() return def _read(self, path): try: return open(path, 'rb').read() except: return b'' def _clean(self): self._sd.SearchRegularExpression = True self._sd.setSearchString("\{(\w.+)\}") self._search.replaceAll(self._sd) return def _cancelado(self, cancel): if not cancel: pd = self._sheet.getDrawPage() if pd.getCount(): pd.remove(pd.getByIndex(0)) return def _set_search(self): self._sheet = self._template.getSheets().getByIndex(0) try: self._search = self._sheet.getPrintAreas()[0] except IndexError: self._search = self._sheet.getRangeAddress() self._search = self._sheet.getCellRangeByPosition( self._search.StartColumn, self._search.StartRow, self._search.EndColumn, self._search.EndRow ) self._sd = self._sheet.createSearchDescriptor() try: self._sd.SearchCaseSensitive = False except: print ('SD', self._sd) return def _next_cell(self, cell): col = cell.getCellAddress().Column row = cell.getCellAddress().Row + 1 return self._sheet.getCellByPosition(col, row) def _copy_cell(self, cell): destino = self._next_cell(cell) self._sheet.copyRange(destino.getCellAddress(), cell.getRangeAddress()) return destino def _set_cell(self, k='', v=None, cell=None, value=False): if k: self._sd.setSearchString(k) ranges = self._search.findAll(self._sd) if ranges: ranges = ranges.getRangeAddressesAsString().split(';') for r in ranges: for c in r.split(','): cell = self._sheet.getCellRangeByName(c) if v is None: return cell if cell.getImplementationName() == 'ScCellObj': pattern = re.compile(k, re.IGNORECASE) nv = pattern.sub(v, cell.getString()) if value: cell.setValue(nv) else: cell.setString(nv) return cell if cell: if cell.getImplementationName() == 'ScCellObj': ca = cell.getCellAddress() new_cell = self._sheet.getCellByPosition(ca.Column, ca.Row + 1) if value: new_cell.setValue(v) else: new_cell.setString(v) return new_cell def _comprobante(self, data): for k, v in data.items(): if k in ('total', 'descuento', 'subtotal'): self._set_cell('{cfdi.%s}' % k, v, value=True) else: self._set_cell('{cfdi.%s}' % k, v) return def _emisor(self, data): for k, v in data.items(): self._set_cell('{emisor.%s}' % k, v) return def _receptor(self, data): for k, v in data.items(): self._set_cell('{receptor.%s}' % k, v) return def _copy_row(self, cell): row = cell.getCellAddress().Row source = self._sheet.getRows().getByIndex(row) nc = self._next_cell(cell) self._sheet.copyRange(nc.getCellAddress(), source.getRangeAddress()) return def _conceptos(self, data): first = True for concepto in data: key = concepto.get('noidentificacion', '') description = concepto['descripcion'] unidad = concepto['unidad'] cantidad = concepto['cantidad'] valor_unitario = concepto['valorunitario'] importe = concepto['importe'] if first: first = False cell_1 = self._set_cell('{noidentificacion}', key) cell_2 = self._set_cell('{descripcion}', description) cell_3 = self._set_cell('{unidad}', unidad) cell_4 = self._set_cell('{cantidad}', cantidad, value=True) cell_5 = self._set_cell('{valorunitario}', valor_unitario, value=True) cell_6 = self._set_cell('{importe}', importe, value=True) if len(data) > 1: row = cell_1.getCellAddress().Row + 1 self._sheet.getRows().insertByIndex(row, len(data)-1) else: self._copy_row(cell_1) cell_1 = self._set_cell(v=key, cell=cell_1) cell_2 = self._set_cell(v=description, cell=cell_2) cell_3 = self._set_cell(v=unidad, cell=cell_3) cell_4 = self._set_cell(v=cantidad, cell=cell_4, value=True) cell_5 = self._set_cell(v=valor_unitario, cell=cell_5, value=True) cell_6 = self._set_cell(v=importe, cell=cell_6, value=True) return def _add_totales(self, data): currency = data['moneda'] value = data['total'] cell_value = self._set_cell('{total}', value, value=True) if cell_value is None: return False cell_value.CellStyle = currency return True def _totales(self, data): cell_styles = { 'EUR': 'euro', } currency = data['moneda'] cell_title = self._set_cell('{subtotal.titulo}', 'SubTotal') value = data['subtotal'] cell_value = self._set_cell('{subtotal}', value, value=True) if not cell_value is None: cell_value.CellStyle = cell_styles.get(currency, 'peso') #~ Si encuentra el campo {total}, se asume que los totales e impuestos #~ están declarados de forma independiente cada uno if self._add_totales(data): return #~ Si no se encuentra, copia las celdas hacia abajo de #~ {subtotal.titulo} y {subtotal} #~ print (data['descuento']) if 'descuento' in data: self._copy_cell(cell_title) self._copy_cell(cell_value) cell_title = self._set_cell(v='Descuento', cell=cell_title) value = data['descuento'] cell_value = self._set_cell(v=value, cell=cell_value, value=True) cell_value.CellStyle = currency for tax in data['traslados']: self._copy_cell(cell_title) self._copy_cell(cell_value) cell_title = self._set_cell(v=tax[0], cell=cell_title) cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True) cell_value.CellStyle = currency for tax in data['retenciones']: self._copy_cell(cell_title) self._copy_cell(cell_value) cell_title = self._set_cell(v=tax[0], cell=cell_title) cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True) cell_value.CellStyle = currency for tax in data['taxlocales']: self._copy_cell(cell_title) self._copy_cell(cell_value) cell_title = self._set_cell(v=tax[0], cell=cell_title) cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True) cell_value.CellStyle = currency self._copy_cell(cell_title) self._copy_cell(cell_value) cell_title = self._set_cell(v='Total', cell=cell_title) value = data['total'] cell_value = self._set_cell(v=value, cell=cell_value, value=True) cell_value.CellStyle = currency return def _timbre(self, data): if self._es_pre or self._is_ticket: return for k, v in data.items(): self._set_cell('{timbre.%s}' % k, v) pd = self._sheet.getDrawPage() image = self._template.createInstance('com.sun.star.drawing.GraphicObjectShape') image.GraphicURL = data['path_cbb'] pd.add(image) s = Size() s.Width = 4150 s.Height = 4500 image.setSize(s) image.Anchor = self._set_cell('{timbre.cbb}') return def _donataria(self, data): if not data: return for k, v in data.items(): self._set_cell('{donataria.%s}' % k, v) return def _ine(self, data): if not data: return for k, v in data.items(): self._set_cell('{ine.%s}' % k, v) return def _render(self, data): self._set_search() self._es_pre = data.pop('es_pre', False) self._is_ticket = data.pop('is_ticket', False) self._comprobante(data['comprobante']) self._emisor(data['emisor']) self._receptor(data['receptor']) self._conceptos(data['conceptos']) self._totales(data['totales']) self._timbre(data['timbre']) self._donataria(data['donataria']) self._ine(data['ine']) self._cancelado(data['cancelada']) self._clean() return def pdf(self, path, data, ods=False): options = {'AsTemplate': True, 'Hidden': True} self._template = self._doc_open(path, options) if self._template is None: return b'' self._template.setPrinter(self._set_properties({'PaperFormat': LETTER})) self._render(data) path = '{}.ods'.format(tempfile.mkstemp()[1]) self._template.storeToURL(self._path_url(path), ()) if ods: return self._read(path) doc = self._doc_open(path, {'Hidden': True}) options = {'FilterName': 'calc_pdf_Export'} path = tempfile.mkstemp()[1] doc.storeToURL(self._path_url(path), self._set_properties(options)) doc.close(True) self._template.close(True) return self._read(path) def _get_data(self, doc, name=0): try: sheet = doc.getSheets()[name] cursor = sheet.createCursorByRange(sheet['A1']) cursor.collapseToCurrentRegion() except KeyError: msg = 'Hoja no existe' return (), msg return cursor.getDataArray(), '' def products(self, path): options = {'AsTemplate': True, 'Hidden': True} doc = self._doc_open(path, options) if doc is None: return (), 'No se pudo abrir la plantilla' data, msg = self._get_data(doc) doc.close(True) if len(data) == 1: msg = 'Sin datos para importar' return (), msg fields = ( 'categoria', 'clave', 'clave_sat', 'descripcion', 'unidad', 'valor_unitario', 'inventario', 'existencia', 'codigo_barras', 'impuestos', ) rows = [dict(zip(fields, r)) for r in data[1:]] return rows, '' def employees(self, path): options = {'AsTemplate': True, 'Hidden': True} doc = self._doc_open(path, options) if doc is None: return () data, msg = self._get_data(doc, 'Empleados') doc.close(True) if len(data) == 1: msg = 'Sin datos para importar' return (), msg fields = ( 'num_empleado', 'rfc', 'curp', 'nombre', 'paterno', 'materno', 'fecha_ingreso', 'imss', 'tipo_contrato', 'es_sindicalizado', 'tipo_jornada', 'tipo_regimen', 'departamento', 'puesto', 'riesgo_puesto', 'periodicidad_pago', 'banco', 'cuenta_bancaria', 'clabe', 'salario_base', 'salario_diario', 'estado', 'codigo_postal', 'notas', 'correo', ) rows = tuple([dict(zip(fields, r)) for r in data[1:]]) msg = 'Empleados importados correctamente' return rows, msg def invoice(self, path): options = {'AsTemplate': True, 'Hidden': True} doc = self._doc_open(path, options) if doc is None: return (), 'No se pudo abrir la plantilla' data, msg = self._get_data(doc) doc.close(True) if len(data) == 1: msg = 'Sin datos para importar' return (), msg rows = tuple(data[1:]) return rows, '' def to_pdf(data, emisor_rfc, ods=False): rfc = data['emisor']['rfc'] if DEBUG: rfc = emisor_rfc version = data['comprobante']['version'] if APP_LIBO: app = LIBO() if app.is_running: donativo = '' if data['donativo']: donativo = '_donativo' name = '{}_{}{}.ods'.format(rfc.lower(), version, donativo) path = get_template_ods(name) if path: return app.pdf(path, data, ods) name = '{}_{}.json'.format(rfc, version) custom_styles = get_custom_styles(name) path = get_path_temp() pdf = TemplateInvoice(path) pdf.custom_styles = custom_styles pdf.data = data pdf.render() return read_file(path) def import_employees(rfc): name = '{}_employees.ods'.format(rfc.lower()) path = _join(PATH_MEDIA, 'tmp', name) if not is_file(path): return () if APP_LIBO: app = LIBO() if app.is_running: return app.employees(path) return () def parse_xml(xml): return ET.fromstring(xml) def get_dict(data): return CaseInsensitiveDict(data) def to_letters(value, moneda): return NumLet(value, moneda).letras def get_qr(data): path = tempfile.mkstemp()[1] qr = pyqrcode.create(data, mode='binary') qr.png(path, scale=7) return path def _get_relacionados(doc, version): node = doc.find('{}CfdiRelacionados'.format(PRE[version])) if node is None: return '' uuids = ['UUID: {}'.format(n.attrib['UUID']) for n in node.getchildren()] return '\n'.join(uuids) def _comprobante(doc, options): data = CaseInsensitiveDict(doc.attrib.copy()) del data['certificado'] serie = '' if 'serie' in data: serie = '{} -'.format(data['serie']) data['seriefolio'] = '{}{}'.format(serie, data['folio']) data['totalenletras'] = to_letters(float(data['total']), data['moneda']) if data['version'] == '3.3': tipos = { 'I': 'ingreso', 'E': 'egreso', 'T': 'traslado', } data['tipodecomprobante'] = tipos.get(data['tipodecomprobante']) data['lugarexpedicion'] = \ 'C.P. de Expedición: {}'.format(data['lugarexpedicion']) data['metododepago'] = options['metododepago'] data['formadepago'] = options['formadepago'] if 'condicionesdepago' in data: data['condicionesdepago'] = \ 'Condiciones de pago: {}'.format(data['condicionesdepago']) data['moneda'] = options['moneda'] data['tiporelacion'] = options.get('tiporelacion', '') data['relacionados'] = _get_relacionados(doc, data['version']) else: fields = { 'formaDePago': 'Forma de Pago: {}\n', 'metodoDePago': 'Método de pago: {}\n', 'condicionesDePago': 'Condiciones de Pago: {}\n', 'NumCtaPago': 'Número de Cuenta de Pago: {}\n', 'Moneda': 'Moneda: {}\n', 'TipoCambio': 'Tipo de Cambio: {}', } datos = '' for k, v in fields.items(): if k in data: datos += v.format(data[k]) data['datos'] = datos data['hora'] = data['fecha'].split('T')[1] fecha = parser.parse(data['fecha']) try: locale.setlocale(locale.LC_TIME, "es_MX.UTF-8") except: pass data['fechaformato'] = fecha.strftime('%A, %d de %B de %Y') data['tipocambio'] = 'Tipo de Cambio: $ {:0.2f}'.format( float(data['tipocambio'])) data['notas'] = options['notas'] return data def _emisor(doc, version, values): emisor = doc.find('{}Emisor'.format(PRE[version])) data = CaseInsensitiveDict(emisor.attrib.copy()) node = emisor.find('{}DomicilioFiscal'.format(PRE[version])) if not node is None: data.update(CaseInsensitiveDict(node.attrib.copy())) if version == '3.2': node = emisor.find('{}RegimenFiscal'.format(PRE[version])) if not node is None: data['regimenfiscal'] = node.attrib['Regimen'] data['regimen'] = node.attrib['Regimen'] else: data['regimenfiscal'] = values['regimenfiscal'] path = _join(PATH_MEDIA, 'logos', '{}.png'.format(data['rfc'].lower())) if is_file(path): data['logo'] = path return data def _receptor(doc, version, values): node = doc.find('{}Receptor'.format(PRE[version])) data = CaseInsensitiveDict(node.attrib.copy()) node = node.find('{}Domicilio'.format(PRE[version])) if not node is None: data.update(node.attrib.copy()) if version == '3.2': return data data['usocfdi'] = values['usocfdi'] return data def _conceptos(doc, version): data = [] conceptos = doc.find('{}Conceptos'.format(PRE[version])) for c in conceptos.getchildren(): values = CaseInsensitiveDict(c.attrib.copy()) if version == '3.3': values['noidentificacion'] = '{}\n(SAT {})'.format( values['noidentificacion'], values['ClaveProdServ']) values['unidad'] = '({})\n{}'.format( values['ClaveUnidad'], values['unidad']) n = c.find('{}CuentaPredial'.format(PRE[version])) if n is not None: v = CaseInsensitiveDict(n.attrib.copy()) info = '\nCuenta Predial Número: {}'.format(v['numero']) values['descripcion'] += info n = c.find('{}InformacionAduanera'.format(PRE[version])) if n is not None: v = CaseInsensitiveDict(n.attrib.copy()) info = '\nNúmero Pedimento: {}'.format(v['numeropedimento']) values['descripcion'] += info data.append(values) return data def _totales(doc, cfdi, version): data = {} data['moneda'] = doc.attrib['Moneda'] data['subtotal'] = cfdi['subtotal'] if 'descuento' in cfdi: data['descuento'] = cfdi['descuento'] data['total'] = cfdi['total'] tn = { '001': 'ISR', '002': 'IVA', '003': 'IEPS', } traslados = [] retenciones = [] taxlocales = [] imp = doc.find('{}Impuestos'.format(PRE[version])) if imp is not None: tmp = CaseInsensitiveDict(imp.attrib.copy()) for k, v in tmp.items(): data[k] = v node = imp.find('{}Traslados'.format(PRE[version])) if node is not None: for n in node.getchildren(): tmp = CaseInsensitiveDict(n.attrib.copy()) if version == '3.3': title = 'Traslado {} {}'.format( tn.get(tmp['impuesto']), tmp['tasaocuota']) else: title = 'Traslado {} {}'.format(tmp['impuesto'], tmp['tasa']) traslados.append((title, float(tmp['importe']))) node = imp.find('{}Retenciones'.format(PRE[version])) if node is not None: for n in node.getchildren(): tmp = CaseInsensitiveDict(n.attrib.copy()) if version == '3.3': title = 'Retención {} {}'.format( tn.get(tmp['impuesto']), '') else: title = 'Retención {} {}'.format(tmp['impuesto'], '') retenciones.append((title, float(tmp['importe']))) node = doc.find('{}Complemento/{}ImpuestosLocales'.format( PRE[version], PRE['LOCALES'])) if node is not None: for otro in list(node): if otro.tag == '{}RetencionesLocales'.format(PRE['LOCALES']): tipo = 'Retención ' name = 'ImpLocRetenido' tasa = 'TasadeRetencion' else: tipo = 'Traslado ' name = 'ImpLocTrasladado' tasa = 'TasadeTraslado' title = '{} {} {}%'.format( tipo, otro.attrib[name], otro.attrib[tasa]) importe = float(otro.attrib['Importe']) taxlocales.append((title, importe)) data['traslados'] = traslados data['retenciones'] = retenciones data['taxlocales'] = taxlocales return data def _timbre(doc, version, values): CADENA = '||{version}|{UUID}|{FechaTimbrado}|{selloCFD}|{noCertificadoSAT}||' if version == '3.3': CADENA = '||{Version}|{UUID}|{FechaTimbrado}|{SelloCFD}|{NoCertificadoSAT}||' node = doc.find('{}Complemento/{}TimbreFiscalDigital'.format( PRE[version], PRE['TIMBRE'])) data = CaseInsensitiveDict(node.attrib.copy()) total_s = '%017.06f' % float(values['total']) qr_data = '?re=%s&rr=%s&tt=%s&id=%s' % ( values['rfc_emisor'], values['rfc_receptor'], total_s, node.attrib['UUID']) data['path_cbb'] = get_qr(qr_data) data['cadenaoriginal'] = CADENA.format(**node.attrib) return data def _donataria(doc, version, fechadof): node = doc.find('{}Complemento/{}Donatarias'.format( PRE[version], PRE['DONATARIA'])) if node is None: return {} data = CaseInsensitiveDict(node.attrib.copy()) data['fechadof'] = fechadof return data def _ine(doc, version): node = doc.find('{}Complemento/{}INE'.format(PRE[version], PRE['INE'])) if node is None: return {} values = ( ('TipoComite', 'Tipo de Comite: {}'), ('TipoProceso', 'Tipo de Proceso: {}'), ('IdContabilidad', 'ID de Contabilidad: {}'), ) data = CaseInsensitiveDict(node.attrib.copy()) for k, v in values: data[k] = v.format(data[k]) return data def get_data_from_xml(invoice, values): data = {'cancelada': invoice.cancelada, 'donativo': invoice.donativo} doc = parse_xml(invoice.xml) data['comprobante'] = _comprobante(doc, values) version = data['comprobante']['version'] data['emisor'] = _emisor(doc, version, values) data['receptor'] = _receptor(doc, version, values) data['conceptos'] = _conceptos(doc, version) data['totales'] = _totales(doc, data['comprobante'], version) data['donataria'] = _donataria(doc, version, values['fechadof']) data['ine'] = _ine(doc, version) options = { 'rfc_emisor': data['emisor']['rfc'], 'rfc_receptor': data['receptor']['rfc'], 'total': data['comprobante']['total'], } data['timbre'] = _timbre(doc, version, options) del data['timbre']['version'] data['comprobante'].update(data['timbre']) return data def to_zip(*files): zip_buffer = BytesIO() with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED, False) as zip_file: for data, file_name in files: zip_file.writestr(file_name, data) return zip_buffer.getvalue() def make_fields(xml): doc = ET.fromstring(xml) data = CaseInsensitiveDict(doc.attrib.copy()) data.pop('certificado') data.pop('sello') version = data['version'] receptor = doc.find('{}Receptor'.format(PRE[version])) receptor = CaseInsensitiveDict(receptor.attrib.copy()) data['receptor_nombre'] = receptor['nombre'] data['receptor_rfc'] = receptor['rfc'] data = {k.lower(): v for k, v in data.items()} return data def make_info_mail(data, fields): try: return data.format(**fields).replace('\n', '
') except: log.error(data) log.error(fields) return data.replace('\n', '
') def send_mail(data): msg = '' server = SendMail(data['server']) is_connect = server.is_connect if is_connect: msg = server.send(data['options']) else: msg = server.error server.close() return {'ok': is_connect, 'msg': msg} def get_path_info(path): path, filename = os.path.split(path) name, extension = os.path.splitext(filename) return (path, filename, name, extension) def get_path_temp(): return tempfile.mkstemp()[1] def get_date(value, next_day=False): d = parser.parse(value) if next_day: return d + datetime.timedelta(days=1) return d def upload_file(rfc, opt, file_obj): if opt == 'emisorlogo': tmp = file_obj.filename.split('.') name = '{}.{}'.format(rfc.lower(), tmp[-1].lower()) path = _join(PATH_MEDIA, 'logos', name) elif opt == 'txt_plantilla_factura_32': tmp = file_obj.filename.split('.') ext = tmp[-1].lower() if ext != 'ods': msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS' return {'status': 'server', 'name': msg, 'ok': False} name = '{}_3.2.ods'.format(rfc.lower()) path = _join(PATH_MEDIA, 'templates', name) elif opt == 'txt_plantilla_factura_33': tmp = file_obj.filename.split('.') ext = tmp[-1].lower() if ext != 'ods': msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS' return {'status': 'server', 'name': msg, 'ok': False} name = '{}_3.3.ods'.format(rfc.lower()) path = _join(PATH_MEDIA, 'templates', name) elif opt == 'txt_plantilla_factura_33j': tmp = file_obj.filename.split('.') ext = tmp[-1].lower() if ext != 'json': msg = 'Extensión de archivo incorrecta, selecciona un archivo JSON' return {'status': 'server', 'name': msg, 'ok': False} name = '{}_3.3.json'.format(rfc.lower()) path = _join(PATH_MEDIA, 'templates', name) elif opt == 'txt_plantilla_ticket': tmp = file_obj.filename.split('.') ext = tmp[-1].lower() if ext != 'ods': msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS' return {'status': 'server', 'name': msg, 'ok': False} name = '{}_ticket.ods'.format(rfc.lower()) path = _join(PATH_MEDIA, 'templates', name) elif opt == 'txt_plantilla_donataria': tmp = file_obj.filename.split('.') ext = tmp[-1].lower() if ext != 'ods': msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS' return {'status': 'server', 'name': msg, 'ok': False} name = '{}_3.3_donativo.ods'.format(rfc.lower()) path = _join(PATH_MEDIA, 'templates', name) elif opt == 'bdfl': tmp = file_obj.filename.split('.') ext = tmp[-1].lower() if ext != 'sqlite': msg = 'Extensión de archivo incorrecta, selecciona un archivo SQLite' return {'status': 'server', 'name': msg, 'ok': False} name = '{}.sqlite'.format(rfc.lower()) path = _join('/tmp', name) elif opt == 'products': tmp = file_obj.filename.split('.') ext = tmp[-1].lower() if ext != 'ods': msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS' return {'status': 'server', 'name': msg, 'ok': False} name = '{}_products.ods'.format(rfc.lower()) path = _join(PATH_MEDIA, 'tmp', name) elif opt == 'invoiceods': tmp = file_obj.filename.split('.') ext = tmp[-1].lower() if ext != 'ods': msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS' return {'status': 'server', 'name': msg, 'ok': False} name = '{}_invoice.ods'.format(rfc.lower()) path = _join(PATH_MEDIA, 'tmp', name) elif opt == 'employees': tmp = file_obj.filename.split('.') ext = tmp[-1].lower() if ext != 'ods': msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS' return {'status': 'server', 'name': msg, 'ok': False} name = '{}_employees.ods'.format(rfc.lower()) path = _join(PATH_MEDIA, 'tmp', name) if save_file(path, file_obj.file.read()): return {'status': 'server', 'name': file_obj.filename, 'ok': True} return {'status': 'error', 'ok': False} def _get_pem_from_pfx(cert): tmp_p12 = save_temp(cert.p12) args = "openssl pkcs12 -in '{}' -clcerts -nodes -nocerts " \ "-passin pass:'{}' | openssl rsa".format(tmp_p12, _get_md5(cert.rfc)) result = _call(args) _kill(tmp_p12) return result.encode() def cancel_xml(auth, uuid, certificado): from .pac import Finkok as PAC if DEBUG: auth = {} else: if not auth: msg = 'Sin datos para cancelar' data = {'ok': False, 'error': msg} return data, result msg = 'Factura cancelada correctamente' data = {'ok': True, 'msg': msg, 'row': {'estatus': 'Cancelada'}} pac = PAC(auth) result = pac.cancel_xml(certificado.rfc, str(uuid).upper(), certificado.cer_pem.encode(), _get_pem_from_pfx(certificado)) if result: codes = {None: '', 'Could not get UUID Text': 'UUID no encontrado', 'Invalid Passphrase': 'Contraseña inválida', } if not result['CodEstatus'] is None: data['ok'] = False data['msg'] = codes.get(result['CodEstatus'], result['CodEstatus']) else: data['ok'] = False data['msg'] = pac.error return data, result def cancel_signature(uuid, pk12, rfc, auth): from .pac import Finkok as PAC token = _get_md5(rfc) if USAR_TOKEN: token = auth['PASS'] if AUTH['DEBUG']: token = AUTH['PASS'] template = read_file(TEMPLATE_CANCEL, 'r') data = { 'rfc': rfc, 'fecha': datetime.datetime.now().isoformat()[:19], 'uuid': str(uuid).upper(), } template = template.format(**data) data = { 'xmlsec': PATH_XMLSEC, 'pk12': save_temp(pk12), 'pass': token, 'template': save_temp(template, 'w'), } args = '"{xmlsec}" --sign --pkcs12 "{pk12}" --pwd {pass} ' \ '"{template}"'.format(**data) xml_sign = _call(args) if DEBUG: auth = {} else: if not auth: msg = 'Sin datos para cancelar' result = {'ok': False, 'error': msg} return result msg = 'Factura cancelada correctamente' data = {'ok': True, 'msg': msg, 'row': {'estatus': 'Cancelada'}} pac = PAC(auth) result = pac.cancel_signature(xml_sign) if result: codes = {None: '', 'Could not get UUID Text': 'UUID no encontrado'} if not result['CodEstatus'] is None: data['ok'] = False data['msg'] = codes.get(result['CodEstatus'], result['CodEstatus']) else: data['ok'] = False data['msg'] = pac.error return data, result def run_in_thread(fn): def run(*k, **kw): t = threading.Thread(target=fn, args=k, kwargs=kw) t.start() return t return run def get_bool(value): if not value: return False if value == '1': return True return False def get_float(value): return round(float(value), DECIMALES) def crear_rol(user, contra=''): if not contra: contra = user args = 'psql -U postgres -c "CREATE ROLE {} WITH LOGIN ENCRYPTED ' \ 'PASSWORD \'{}\';"'.format(user, contra) try: result = _call(args) if result == 'CREATE ROLE\n': return True except Exception as e: log.info(e) return False def crear_db(nombre): args = 'psql -U postgres -c "CREATE DATABASE {0} WITH ' \ 'OWNER {0};"'.format(nombre) try: result = _call(args) print (result) if result == 'CREATE DATABASE\n': return True except Exception as e: log.info(e) return False def _backup_db(user): dt = datetime.datetime.now().strftime('%y%m%d_%H%M') path_bk = _join(PATH_MEDIA, 'tmp', '{}_{}.bk'.format(user, dt)) args = 'pg_dump -U postgres -Fc {} > "{}"'.format(user, path_bk) _call(args) return def delete_db(user, bk=True): if bk: _backup_db(user) args = 'psql -U postgres -c "DROP DATABASE {0};"'.format(user) _call(args) args = 'psql -U postgres -c "DROP ROLE {0};"'.format(user) _call(args) return def _to_seafile(path_db, data): if DEBUG: return _, filename = os.path.split(path_db) if SEAFILE_SERVER: msg = '\tSincronizando backup general...' log.info(msg) seafile = SeaFileAPI( SEAFILE_SERVER['URL'], SEAFILE_SERVER['USER'], SEAFILE_SERVER['PASS']) if seafile.is_connect: msg = '\tSincronizando: {} '.format(filename) log.info(msg) seafile.update_file( path_db, SEAFILE_SERVER['REPO'], '/', SEAFILE_SERVER['PASS']) msg = '\tRespaldo general de {} sincronizado'.format(filename) log.info(msg) msg = '\tSin datos para sincronización particular de {}'.format(filename) if len(data) < 2: log.info(msg) return if not data[0] or not data[1] or not data[2]: log.info(msg) return msg = '\tSincronizando backup particular...' log.info(msg) seafile = SeaFileAPI(SEAFILE_SERVER['URL'], data[0], data[1]) if seafile.is_connect: msg = '\t\tSincronizando: {} '.format(filename) log.info(msg) seafile.update_file(path_db, data[2], 'Base de datos/', data[1]) msg = '\t\tRespaldo partícular de {} sincronizado'.format(filename) log.info(msg) return @run_in_thread def _backup_and_sync(rfc, data): msg = 'Generando backup de: {}'.format(rfc) log.info(msg) sql = 'select correo_timbrado, token_timbrado, token_soporte from emisor;' path_bk = _join(PATH_MEDIA, 'tmp', '{}.bk'.format(rfc.lower())) if data['type'] == 'postgres': args = 'pg_dump -U postgres -Fc {} > "{}"'.format( data['name'], path_bk) sql = 'psql -U postgres -d {} -Atc "{}"'.format(data['name'], sql) elif data['type'] == 'sqlite': args = 'gzip -c "{}" > "{}"'.format(data['name'], path_bk) sql = 'sqlite3 "{}" "{}"'.format(data['name'], sql) try: result = _call(args) msg = '\tBackup generado de {}'.format(rfc) log.info(msg) result = _call(sql).strip().split('|') _to_seafile(path_bk, result) except Exception as e: log.info(e) return @run_in_thread def _backup_companies(): if DEBUG: return _, filename = os.path.split(COMPANIES) if SEAFILE_SERVER: msg = '\tSincronizando backup RFCs...' log.info(msg) seafile = SeaFileAPI( SEAFILE_SERVER['URL'], SEAFILE_SERVER['USER'], SEAFILE_SERVER['PASS']) if seafile.is_connect: msg = '\tSincronizando: {} '.format(filename) log.info(msg) seafile.update_file( COMPANIES, SEAFILE_SERVER['REPO'], '/', SEAFILE_SERVER['PASS']) msg = '\tRespaldo general de {} sincronizado'.format(filename) log.info(msg) return def backup_dbs(): con = sqlite3.connect(COMPANIES) cursor = con.cursor() sql = "SELECT * FROM names" cursor.execute(sql) rows = cursor.fetchall() if rows is None: return cursor.close() con.close() for rfc, data in rows: args = loads(data) _backup_and_sync(rfc, args) _backup_companies() return def _validar_directorios(path_bk, target): path = Path(_join(path_bk, target)) path.mkdir(parents=True, exist_ok=True) return str(path) def local_copy(files): if not MV: return path_bk = _join(str(Path.home()), DIR_FACTURAS) if not os.path.isdir(path_bk): msg = 'No existe la carpeta: facturas' log.error(msg) return args = 'df -P {} | tail -1 | cut -d" " -f 1'.format(path_bk) try: result = _call(args) log.info(result) except: pass # ~ if result != 'empresalibre\n': # ~ log.info(result) # ~ msg = 'Asegurate de que exista la carpeta para sincronizar' # ~ log.error(msg) # ~ return # ~ except subprocess.CalledProcessError: # ~ msg = 'No se pudo obtener la ruta para sincronizar' # ~ log.error(msg) # ~ return try: for obj, name, target in files: path = _validar_directorios(path_bk, target) path_file = _join(path, name) m = 'wb' if name.endswith('xml'): m = 'w' save_file(path_file, obj, m) except Exception as e: log.error(e) return def sync_cfdi(auth, files): local_copy(files) if DEBUG: return if not auth['REPO']: return seafile = SeaFileAPI(SEAFILE_SERVER['URL'], auth['USER'], auth['PASS']) if seafile.is_connect: for f in files: seafile.update_file( f, auth['REPO'], 'Facturas/{}/'.format(f[2]), auth['PASS']) return class ImportFacturaLibreGambas(object): def __init__(self, conexion, rfc): self._rfc = rfc self._con = None self._cursor = None self._error = '' self._is_connect = self._connect(conexion) self._clientes = [] self._clientes_rfc = [] @property def error(self): return self._error @property def is_connect(self): return self._is_connect def _validate_rfc(self): sql = "SELECT rfc FROM emisor LIMIT 1" self._cursor.execute(sql) obj = self._cursor.fetchone() if obj is None: self._error = 'No se encontró al emisor: {}'.format(self._rfc) return False if not DEBUG: if obj['rfc'] != self._rfc: self._error = 'Los datos no corresponden al RFC: {}'.format(self._rfc) return False return True def _connect(self, conexion): import psycopg2 import psycopg2.extras try: self._con = psycopg2.connect(conexion) self._cursor = self._con.cursor(cursor_factory=psycopg2.extras.DictCursor) return self._validate_rfc() except Exception as e: log.error(e) self._error = 'No se pudo conectar a la base de datos' return False def close(self): try: self._cursor.close() self._con.close() except: pass return def import_data(self): data = {} tables = ( ('receptores', 'Socios'), ('cfdifacturas', 'Facturas'), ('categorias', 'Categorias'), ('productos', 'Productos'), ('tickets', 'Tickets'), ) for source, target in tables: data[target] = self._get_table(source) data['Socios'] += self._clientes return data def _get_table(self, table): return getattr(self, '_{}'.format(table))() def _tickets(self): sql = "SELECT * FROM tickets" self._cursor.execute(sql) rows = self._cursor.fetchall() fields = ( ('serie', 'serie'), ('folio', 'folio'), ('fecha', 'fecha'), ('formadepago', 'forma_pago'), ('subtotal', 'subtotal'), ('descuento', 'descuento'), ('total', 'total'), ('notas', 'notas'), ('factura', 'factura'), ('cancelada', 'cancelado'), ('vendedor', 'vendedor'), ) data = [] totals = len(rows) for i, row in enumerate(rows): msg = '\tImportando ticket {} de {}'.format(i+1, totals) log.info(msg) new = {t: row[s] for s, t in fields} new['notas'] = '' new['fecha'] = new['fecha'].replace(microsecond=0) new['estatus'] = 'Generado' if new['cancelado']: new['estatus'] = 'Cancelado' new['factura'] = self._get_invoice_ticket(new['factura']) new['details'] = self._get_details_ticket(row['id']) new['taxes'] = self._get_taxes_ticket(row['id']) data.append(new) return data def _get_invoice_ticket(self, invoice): if not invoice: return None sql = "SELECT serie, folio FROM cfdifacturas WHERE id=%s" self._cursor.execute(sql, [invoice]) row = self._cursor.fetchone() if row is None: return {} return dict(row) def _get_details_ticket(self, id): sql = "SELECT * FROM t_detalle WHERE id_cfdi=%s" self._cursor.execute(sql, [id]) rows = self._cursor.fetchall() fields = ( ('descripcion', 'descripcion'), ('cantidad', 'cantidad'), ('valorunitario', 'valor_unitario'), ('importe', 'importe'), ('precio', 'precio_final'), ) data = [] for row in rows: new = {t: row[s] for s, t in fields if row[s]} data.append(new) return data def _get_taxes_ticket(self, id): sql = "SELECT * FROM t_impuestos WHERE id_cfdi=%s" self._cursor.execute(sql, [id]) rows = self._cursor.fetchall() tasas = { '0': 0.0, '16': 0.16, '16.00': 0.16, '0.16': 0.16, '11': 0.11, '-10': 0.10, '-2': 0.02, '-0.5': 0.005, '-2/3': 0.106667, '-10.6667': 0.106667, '-10.6666': 0.106667, '-10.666666': 0.106667, '-10.66660': 0.106667, } data = [] for row in rows: filtro = { 'name': row['impuesto'], 'tasa': tasas[row['tasa']], 'tipo': row['tipo'][0], } new = { 'import': row['importe'], 'filter': filtro } data.append(new) return data def _productos(self): UNIDADES = { 'k': 'KGM', 'kg': 'KGM', 'kg.': 'KGM', 'pieza': 'H87', 'pza': 'H87', 'pz': 'H87', 'bulto': 'H87', 'b': 'H87', 'exb': 'H87', 'ex': 'H87', 'caja': 'XBX', 'c': 'XBX', 'rollo': 'XRO', 'tira': 'SR', 't': 'SR', 'cono': 'XAJ', 'paquete': 'XPK', 'pq': 'XPK', } sql = "SELECT * FROM productos" self._cursor.execute(sql) rows = self._cursor.fetchall() fields = ( ('id_categoria', 'categoria'), ('noidentificacion', 'clave'), ('descripcion', 'descripcion'), # ~ ('unidad', 'unidad'), ('id_unidad', 'unidad'), # ~ ('costo', 'ultimo_costo'), ('valorunitario', 'valor_unitario'), # ~ ('existencia', 'existencia'), # ~ ('minimo', 'minimo'), ('inventario', 'inventario'), ('codigobarras', 'codigo_barras'), ('cuentapredial', 'cuenta_predial'), ) data = [] sql = """ SELECT nombre, tasa, tipo FROM impuestos WHERE id=%s """ totals = len(rows) for i, row in enumerate(rows): msg = '\tImportando producto {} de {}'.format(i+1, totals) log.info(msg) # ~ print (i, dict(row)) new = {t: row[s] for s, t in fields} # ~ print (new['unidad']) if new['unidad'] == 2: new['unidad'] = 'servicio' u = new['unidad'].lower().strip() if u in ('sin',): continue if not u: u = 'pieza' if not new['categoria']: new['categoria'] = None new['codigo_barras'] = new['codigo_barras'] or '' new['cuenta_predial'] = new['cuenta_predial'] or '' new['descripcion'] = ' '.join(new['descripcion'].split()) new['clave_sat'] = DEFAULT_SAT_PRODUCTO new['unidad'] = UNIDADES.get(u, new['unidad']) self._cursor.execute(sql, [row['id_impuesto1']]) impuestos = self._cursor.fetchall() new['impuestos'] = tuple(impuestos) data.append(new) return data def _categorias(self): sql = "SELECT * FROM categorias ORDER BY id_padre" self._cursor.execute(sql) rows = self._cursor.fetchall() fields = ( ('id', 'id'), ('categoria', 'categoria'), ('id_padre', 'padre'), ) data = [] for row in rows: new = {t: row[s] for s, t in fields} if new['padre'] == 0: new['padre'] = None data.append(new) return data def _get_cliente(self, invoice): sql = "SELECT rfc, nombre FROM receptores WHERE id=%s" self._cursor.execute(sql, [invoice['id_cliente']]) obj = self._cursor.fetchone() if not obj is None: data = { 'rfc': obj['rfc'], 'slug': to_slug(obj['nombre']), } return data if not invoice['xml']: return {} doc = parse_xml(invoice['xml']) version = doc.attrib['version'] node = doc.find('{}Receptor'.format(PRE[version])) rfc = node.attrib['rfc'] nombre = node.attrib['nombre'] tipo_persona = 1 if rfc == 'XEXX010101000': tipo_persona = 4 elif rfc == 'XAXX010101000': tipo_persona = 3 elif len(rfc) == 12: tipo_persona = 2 data = { 'tipo_persona': tipo_persona, 'rfc': rfc, 'nombre': nombre, 'slug': to_slug(nombre), 'es_cliente': True, 'es_activo': False, } if not rfc in self._clientes_rfc: self._clientes_rfc.append(rfc) self._clientes.append(data) data = { 'rfc': data['rfc'], 'slug': data['slug'], } return data def _get_detalles(self, id): sql = "SELECT * FROM cfdidetalle WHERE id_cfdi=%s" self._cursor.execute(sql, [id]) rows = self._cursor.fetchall() fields = ( ('categoria', 'categoria'), ('cantidad', 'cantidad'), ('unidad', 'unidad'), ('noidentificacion', 'clave'), ('descripcion', 'descripcion'), ('valorunitario', 'valor_unitario'), ('importe', 'importe'), ('numero', 'pedimento'), ('fecha', 'fecha_pedimento'), ('aduana', 'aduana'), ('cuentapredial', 'cuenta_predial'), ('descuento', 'descuento'), ('precio', 'precio_final'), ) data = [] for row in rows: new = {t: row[s] for s, t in fields if row[s]} data.append(new) return data def _get_impuestos(self, id): sql = "SELECT * FROM cfdiimpuestos WHERE id_cfdi=%s" self._cursor.execute(sql, [id]) rows = self._cursor.fetchall() tasas = { '0': 0.0, '16': 0.16, '16.00': 0.16, '11': 0.11, '-10': 0.10, '-2': 0.02, '-0.5': 0.005, '-2/3': 0.106667, '-10.6666': 0.106667, '-10.666666': 0.106667, '-10.66660': 0.106667, '-4': 0.04, } data = [] for row in rows: filtro = { 'name': row['impuesto'], 'tasa': tasas[row['tasa']], 'tipo': row['tipo'][0], } new = { 'importe': row['importe'], 'filtro': filtro } data.append(new) return data def _cfdifacturas(self): sql = "SELECT * FROM cfdifacturas" self._cursor.execute(sql) rows = self._cursor.fetchall() fields = ( ('version', 'version'), ('serie', 'serie'), ('folio', 'folio'), ('fecha', 'fecha'), ('fecha_timbrado', 'fecha_timbrado'), ('formadepago', 'forma_pago'), ('condicionesdepago', 'condiciones_pago'), ('subtotal', 'subtotal'), ('descuento', 'descuento'), ('tipocambio', 'tipo_cambio'), ('moneda', 'moneda'), ('total', 'total'), ('tipodecomprobante', 'tipo_comprobante'), ('metododepago', 'metodo_pago'), ('lugarexpedicion', 'lugar_expedicion'), ('totalimpuestosretenidos', 'total_retenciones'), ('totalimpuestostrasladados', 'total_traslados'), ('xml', 'xml'), ('id_cliente', 'cliente'), ('notas', 'notas'), ('uuid', 'uuid'), ('cancelada', 'cancelada'), ) data = [] totals = len(rows) for i, row in enumerate(rows): msg = '\tImportando factura {} de {}'.format(i+1, totals) log.info(msg) new = {t: row[s] for s, t in fields} for _, f in fields: new[f] = new[f] or '' new['fecha'] = new['fecha'].replace(microsecond=0) if new['fecha_timbrado']: new['fecha_timbrado'] = new['fecha_timbrado'].replace(microsecond=0) else: new['fecha_timbrado'] = None new['estatus'] = 'Timbrada' if new['cancelada']: new['estatus'] = 'Cancelada' if not new['uuid']: new['uuid'] = None elif new['uuid'] in('ok', '123', '??', 'X'): new['uuid'] = None new['estatus'] = 'Cancelada' new['cancelada'] = True if new['xml'] is None: new['xml'] = '' new['pagada'] = True new['total_mn'] = round(row['tipocambio'] * row['total'], 2) new['detalles'] = self._get_detalles(row['id']) new['impuestos'] = self._get_impuestos(row['id']) new['cliente'] = self._get_cliente(row) data.append(new) return data def _receptores(self): sql = "SELECT * FROM receptores" self._cursor.execute(sql) rows = self._cursor.fetchall() fields = ( ('rfc', 'rfc'), ('nombre', 'nombre'), ('calle', 'calle'), ('noexterior', 'no_exterior'), ('nointerior', 'no_interior'), ('colonia', 'colonia'), ('municipio', 'municipio'), ('estado', 'estado'), ('pais', 'pais'), ('codigopostal', 'codigo_postal'), ('extranjero', 'es_extranjero'), ('activo', 'es_activo'), ('fechaalta', 'fecha_alta'), ('notas', 'notas'), ) data = [] sql1 = "SELECT correo FROM correos WHERE id_padre=%s" sql2 = "SELECT telefono FROM telefonos WHERE id_padre=%s" totals = len(rows) for i, row in enumerate(rows): msg = '\tImportando cliente {} de {}'.format(i+1, totals) log.info(msg) new = {t: row[s] for s, t in fields} new['slug'] = to_slug(new['nombre']) new['es_cliente'] = True if new['fecha_alta'] is None: new['fecha_alta'] = str(now()) else: new['fecha_alta'] = str(new['fecha_alta']) for _, f in fields: new[f] = new[f] or '' if new['es_extranjero']: new['tipo_persona'] = 4 elif new['rfc'] == 'XAXX010101000': new['tipo_persona'] = 3 elif len(new['rfc']) == 12: new['tipo_persona'] = 2 self._cursor.execute(sql1, (row['id'],)) tmp = self._cursor.fetchall() if tmp: new['correo_facturas'] = ', '.join([r[0] for r in tmp]) self._cursor.execute(sql2, (row['id'],)) tmp = self._cursor.fetchall() if tmp: new['telefonos'] = ', '.join([r[0] for r in tmp]) data.append(new) return data class ImportFacturaLibre(object): def __init__(self, path, rfc): self._rfc = rfc self._con = None self._cursor = None self._error = '' self._is_connect = self._connect(path) self._clientes = [] self._clientes_rfc = [] @property def error(self): return self._error @property def is_connect(self): return self._is_connect def _validate_rfc(self): sql = "SELECT rfc FROM emisor LIMIT 1" self._cursor.execute(sql) obj = self._cursor.fetchone() if obj is None: self._error = 'No se encontró al emisor: {}'.format(self._rfc) return False if not DEBUG: if obj['rfc'] != self._rfc: self._error = 'Los datos no corresponden al RFC: {}'.format(self._rfc) return False return True def _connect(self, path): try: self._con = sqlite3.connect(path) self._con.row_factory = sqlite3.Row self._cursor = self._con.cursor() return self._validate_rfc() except Exception as e: log.error(e) self._error = 'No se pudo conectar a la base de datos' return False def close(self): try: self._cursor.close() self._con.close() except: pass return def import_data(self): data = {} tables = ( ('receptores', 'Socios'), ('cfdfacturas', 'Facturas'), ('categorias', 'Categorias'), ) for source, target in tables: data[target] = self._get_table(source) data['Socios'] += self._clientes return data def _get_table(self, table): return getattr(self, '_{}'.format(table))() def import_productos(self): sql = "SELECT * FROM productos" self._cursor.execute(sql) rows = self._cursor.fetchall() fields = ( ('id_categoria', 'categoria'), ('noIdentificacion', 'clave'), ('descripcion', 'descripcion'), ('unidad', 'unidad'), ('valorUnitario', 'valor_unitario'), ('existencia', 'existencia'), ('inventario', 'inventario'), ('codigobarras', 'codigo_barras'), ('CuentaPredial', 'cuenta_predial'), ('precio_compra', 'ultimo_precio'), ('minimo', 'minimo'), ) data = [] sql = """ SELECT nombre, tasa, tipo FROM impuestos, productos, productosimpuestos WHERE productos.id=productosimpuestos.id_producto AND productosimpuestos.id_impuesto=impuestos.id AND productos.id = ? """ for row in rows: new = {t: row[s] for s, t in fields} if new['categoria'] == 0: new['categoria'] = None new['descripcion'] = ' '.join(new['descripcion'].split()) new['clave_sat'] = DEFAULT_SAT_PRODUCTO self._cursor.execute(sql, (row['id'],)) impuestos = self._cursor.fetchall() new['impuestos'] = tuple(impuestos) data.append(new) return data def _categorias(self): sql = "SELECT * FROM categorias ORDER BY id_padre" self._cursor.execute(sql) rows = self._cursor.fetchall() fields = ( ('id', 'id'), ('categoria', 'categoria'), ('id_padre', 'padre'), ) data = [] for row in rows: new = {t: row[s] for s, t in fields} if new['padre'] == 0: new['padre'] = None data.append(new) return data def _get_cliente(self, invoice): sql = "SELECT rfc, nombre FROM receptores WHERE id=?" self._cursor.execute(sql, [invoice['id_cliente']]) obj = self._cursor.fetchone() if not obj is None: data = { 'rfc': obj['rfc'], 'slug': to_slug(obj['nombre']), } return data if not invoice['xml']: return {} doc = parse_xml(invoice['xml']) version = doc.attrib['version'] node = doc.find('{}Receptor'.format(PRE[version])) rfc = node.attrib['rfc'] nombre = node.attrib['nombre'] # ~ Validaciones especiales tipo_persona = 1 if rfc == 'XEXX010101000': tipo_persona = 4 elif rfc == 'XAXX010101000': tipo_persona = 3 elif len(rfc) == 12: tipo_persona = 2 data = { 'tipo_persona': tipo_persona, 'rfc': rfc, 'nombre': nombre, 'slug': to_slug(nombre), 'es_cliente': True, 'es_activo': False, } if not rfc in self._clientes_rfc: self._clientes_rfc.append(rfc) self._clientes.append(data) data = { 'rfc': data['rfc'], 'slug': data['slug'], } return data def _get_detalles(self, id): sql = "SELECT * FROM cfddetalle WHERE id_cfd=?" self._cursor.execute(sql, [id]) rows = self._cursor.fetchall() fields = ( ('categoria', 'categoria'), ('cantidad', 'cantidad'), ('unidad', 'unidad'), ('noIdentificacion', 'clave'), ('descripcion', 'descripcion'), ('valorUnitario', 'valor_unitario'), ('importe', 'importe'), ('numero', 'pedimento'), ('fecha', 'fecha_pedimento'), ('aduana', 'aduana'), ('CuentaPredial', 'cuenta_predial'), ('alumno', 'alumno'), ('curp', 'curp'), ('nivel', 'nivel'), ('autorizacion', 'autorizacion'), ) data = [] for row in rows: new = {t: row[s] for s, t in fields if row[s]} data.append(new) return data def _get_impuestos(self, id): sql = "SELECT * FROM cfdimpuestos WHERE id_cfd=?" self._cursor.execute(sql, [id]) rows = self._cursor.fetchall() tasas = { '0': 0.0, '16': 0.16, '16.00': 0.16, '-16': 0.16, '11': 0.11, '-10': 0.10, '-2': 0.02, '-0.5': 0.005, '-2/3': 0.106667, '-10.6667': 0.106667, '-10.6666': 0.106667, '-10.666666': 0.106667, '-10.66660': 0.106667, '-10.67': 0.106667, '-10.66666666666667': 0.106667, '-4': 0.04, '1': 0.01, '25': 0.25, '26.5': 0.265, '30': 0.30, '8': 0.08, } data = [] for row in rows: # ~ print (id, dict(row)) filtro = { 'name': row['nombre'], 'tasa': tasas[row['tasa']], 'tipo': row['tipo'][0], } new = { 'importe': row['importe'], 'filtro': filtro } data.append(new) return data def _cfdfacturas(self): sql = "SELECT * FROM cfdfacturas" self._cursor.execute(sql) rows = self._cursor.fetchall() fields = ( ('version', 'version'), ('serie', 'serie'), ('folio', 'folio'), ('fecha', 'fecha'), ('fecha_timbrado', 'fecha_timbrado'), ('formaDePago', 'forma_pago'), ('condicionesDePago', 'condiciones_pago'), ('subTotal', 'subtotal'), ('descuento', 'descuento'), ('TipoCambio', 'tipo_cambio'), ('Moneda', 'moneda'), ('total', 'total'), ('tipoDeComprobante', 'tipo_comprobante'), ('metodoDePago', 'metodo_pago'), ('LugarExpedicion', 'lugar_expedicion'), ('totalImpuestosRetenidos', 'total_retenciones'), ('totalImpuestosTrasladados', 'total_traslados'), ('xml', 'xml'), ('id_cliente', 'cliente'), ('notas', 'notas'), ('uuid', 'uuid'), ('donativo', 'donativo'), ('estatus', 'estatus'), ('regimen', 'regimen_fiscal'), ('xml_acuse', 'acuse'), ) data = [] for row in rows: row = dict(row) if not 'xml_acuse'in row: row['xml_acuse'] = '' new = {t: row[s] for s, t in fields} if not 'uuid' in new or not new['uuid']: new['uuid'] = None if not 'xml' in new or new['xml'] is None: new['xml'] = '' if row['estatus'] == 'Pagada': new['pagada'] = True elif row['estatus'] in ('Cancelada', 'Validada'): new['cancelada'] = True if new['fecha'] is None: new['fecha'] = str(now()) new['total_mn'] = round(row['TipoCambio'] * row['total'], 2) new['detalles'] = self._get_detalles(row['id']) new['impuestos'] = self._get_impuestos(row['id']) new['cliente'] = self._get_cliente(row) data.append(new) return data def _receptores(self): sql = "SELECT * FROM receptores" self._cursor.execute(sql) rows = self._cursor.fetchall() fields = ( ('rfc', 'rfc'), ('nombre', 'nombre'), ('calle', 'calle'), ('noExterior', 'no_exterior'), ('noInterior', 'no_interior'), ('colonia', 'colonia'), ('municipio', 'municipio'), ('estado', 'estado'), ('pais', 'pais'), ('codigoPostal', 'codigo_postal'), ('extranjero', 'es_extranjero'), ('activo', 'es_activo'), ('fechaalta', 'fecha_alta'), ('notas', 'notas'), ('cuentaCliente', 'cuenta_cliente'), ('cuentaProveedor', 'cuenta_proveedor'), ('saldoCliente', 'saldo_cliente'), ('saldoProveedor', 'saldo_proveedor'), ('esCliente', 'es_cliente'), ('esProveedor', 'es_proveedor'), ) data = [] sql1 = "SELECT correo FROM correos WHERE id_cliente=?" sql2 = "SELECT telefono FROM telefonos WHERE id_cliente=?" for row in rows: new = {t: row[s] for s, t in fields} new['slug'] = to_slug(new['nombre']) new['fecha_alta'] = str(parser.parse(new['fecha_alta'])) for _, f in fields: new[f] = new[f] or '' if new['es_extranjero']: new['tipo_persona'] = 4 elif new['rfc'] == 'XAXX010101000': new['tipo_persona'] = 3 elif len(new['rfc']) == 12: new['tipo_persona'] = 2 self._cursor.execute(sql1, (row['id'],)) tmp = self._cursor.fetchall() if tmp: new['correo_facturas'] = ', '.join([r[0] for r in tmp]) self._cursor.execute(sql2, (row['id'],)) tmp = self._cursor.fetchall() if tmp: new['telefonos'] = ', '.join([r[0] for r in tmp]) data.append(new) return data def print_ticket(data, info): p = PrintTicket(info) return p.printer(data) def import_products(rfc): name = '{}_products.ods'.format(rfc.lower()) path = _join(PATH_MEDIA, 'tmp', name) if not is_file(path): return (), 'No se encontró la plantilla' if APP_LIBO: app = LIBO() if app.is_running: return app.products(path) return (), 'No se encontro LibreOffice' def import_invoice(rfc): name = '{}_invoice.ods'.format(rfc.lower()) path = _join(PATH_MEDIA, 'tmp', name) if not is_file(path): return (), 'No se encontró la plantilla' if APP_LIBO: app = LIBO() if app.is_running: return app.invoice(path) return (), 'No se encontro LibreOffice' def calc_to_date(value): return datetime.date.fromordinal(int(value) + 693594)