#!/usr/bin/env python import datetime import getpass import hashlib import json import locale import mimetypes import os import re import sqlite3 import socket import subprocess import tempfile import threading import time import unicodedata import uuid import zipfile from io import BytesIO from smtplib import SMTPException, SMTPAuthenticationError from xml.etree import ElementTree as ET try: import uno from com.sun.star.beans import PropertyValue from com.sun.star.awt import Size APP_LIBO = True except: APP_LIBO = False import pyqrcode from dateutil import parser from .helper import CaseInsensitiveDict, NumLet, SendMail, TemplateInvoice, \ SeaFileAPI from settings import DEBUG, log, template_lookup, COMPANIES, DB_SAT, \ PATH_XSLT, PATH_XSLTPROC, PATH_OPENSSL, PATH_TEMPLATES, PATH_MEDIA, PRE, \ PATH_XMLSEC, TEMPLATE_CANCEL, DEFAULT_SAT_PRODUCTO, DECIMALES from settings import SEAFILE_SERVER def _call(args): return subprocess.check_output(args, shell=True).decode() def _get_md5(data): return hashlib.md5(data.encode()).hexdigest() def save_temp(data, modo='wb'): path = tempfile.mkstemp()[1] with open(path, modo) as f: f.write(data) return path def save_file(path, data, modo='wb'): try: with open(path, modo) as f: f.write(data) return True except: return False def _join(*paths): return os.path.join(*paths) def _kill(path): try: os.remove(path) except: pass return def get_pass(): password = getpass.getpass('Introduce la contraseña: ') pass2 = getpass.getpass('Confirma la contraseña: ') if password != pass2: msg = 'Las contraseñas son diferentes' return False, msg password = password.strip() if not password: msg = 'La contraseña es necesaria' return False, msg return True, password def get_value(arg): value = input('Introduce el {}: '.format(arg)).strip() if not value: msg = 'El {} es requerido'.format(arg) log.error(msg) return '' return value def _valid_db_companies(): con = sqlite3.connect(COMPANIES) sql = """ CREATE TABLE IF NOT EXISTS names( rfc TEXT NOT NULL COLLATE NOCASE UNIQUE, con TEXT NOT NULL ); """ cursor = con.cursor() cursor.executescript(sql) cursor.close() con.close() return def _get_args(rfc): _valid_db_companies() con = sqlite3.connect(COMPANIES) cursor = con.cursor() sql = "SELECT con FROM names WHERE rfc=?" cursor.execute(sql, (rfc,)) values = cursor.fetchone() if values is None: msg = 'No se encontró el RFC' log.error(msg) return '' cursor.close() con.close() return values[0] def get_rfcs(): _valid_db_companies() con = sqlite3.connect(COMPANIES) cursor = con.cursor() sql = "SELECT * FROM names" cursor.execute(sql) values = cursor.fetchall() cursor.close() con.close() return values def get_con(rfc=''): if not rfc: rfc = get_value('RFC').upper() if not rfc: return {} args = _get_args(rfc.upper()) if not args: return {} return loads(args) def get_sat_key(table, key): con = sqlite3.connect(DB_SAT) cursor = con.cursor() sql = 'SELECT key, name FROM {} WHERE key=?'.format(table) cursor.execute(sql, (key,)) data = cursor.fetchone() cursor.close() con.close() if data is None: return {'ok': False, 'text': 'No se encontró la clave'} return {'ok': True, 'text': data[1]} def get_sat_unidades(key): con = sqlite3.connect(DB_SAT) con.row_factory = sqlite3.Row cursor = con.cursor() filtro = '%{}%'.format(key) sql = "SELECT * FROM unidades WHERE key LIKE ? OR name LIKE ?" cursor.execute(sql, [filtro, filtro]) data = cursor.fetchall() cursor.close() con.close() if data is None: return () data = [dict(r) for r in data] return tuple(data) def get_sat_productos(key): con = sqlite3.connect(DB_SAT) con.row_factory = sqlite3.Row cursor = con.cursor() filtro = '%{}%'.format(key) sql = "SELECT * FROM productos WHERE key LIKE ? OR name LIKE ?" cursor.execute(sql, [filtro, filtro]) data = cursor.fetchall() cursor.close() con.close() if data is None: return () data = [dict(r) for r in data] return tuple(data) def now(): return datetime.datetime.now().replace(microsecond=0) def get_token(): return _get_hash(uuid.uuid4().hex) def get_mimetype(path): mt = mimetypes.guess_type(path)[0] return mt or 'application/octet-stream' def is_file(path): return os.path.isfile(path) def get_stream(path): return get_file(path), get_size(path) def get_file(path): return open(path, 'rb') def read_file(path, mode='rb'): return open(path, mode).read() def get_size(path): return os.path.getsize(path) def get_template(name, data={}): #~ print ('NAME', name, data) template = template_lookup.get_template(name) return template.render(**data) def get_custom_styles(name, default='plantilla_factura.json'): path = _join(PATH_MEDIA, 'templates', name.lower()) if is_file(path): with open(path) as fh: return loads(fh.read()) path = _join(PATH_TEMPLATES, default) if is_file(path): with open(path) as fh: return loads(fh.read()) return {} def get_template_ods(name, default='plantilla_factura.ods'): path = _join(PATH_MEDIA, 'templates', name.lower()) if is_file(path): return path path = _join(PATH_TEMPLATES, default) if is_file(path): return path return '' def dumps(data): return json.dumps(data, default=str) def loads(data): return json.loads(data) def clean(values): for k, v in values.items(): if isinstance(v, str): values[k] = v.strip() return values def parse_con(values): data = values.split('|') try: con = {'type': data[0]} if con['type'] == 'sqlite': con['name'] = data[1] else: if data[1]: con['host'] = data[1] if data[2]: con['port'] = data[2] con['name'] = data[3] con['user'] = data[4] con['password'] = data[5] return con except IndexError: return {} def spaces(value): return ' '.join(value.split()) def to_slug(string): value = (unicodedata.normalize('NFKD', string) .encode('ascii', 'ignore') .decode('ascii').lower()) return value.replace(' ', '_') class Certificado(object): def __init__(self, paths): self._path_key = paths['path_key'] self._path_cer = paths['path_cer'] self._modulus = '' #~ self._save_files() self.error = '' #~ def _save_files(self): #~ try: #~ self._path_key = _save_temp(bytes(self._key)) #~ self._path_cer = _save_temp(bytes(self._cer)) #~ except Exception as e: #~ log.error(e) #~ self._path_key = '' #~ self._path_cer = '' #~ return def _kill(self, path): try: os.remove(path) except: pass return def _get_info_cer(self, session_rfc): data = {} args = 'openssl x509 -inform DER -in {}' try: cer_pem = _call(args.format(self._path_cer)) except Exception as e: self.error = 'No se pudo convertir el CER en PEM' return data args = 'openssl enc -base64 -in {}' try: cer_txt = _call(args.format(self._path_cer)) except Exception as e: self.error = 'No se pudo convertir el CER en TXT' return data args = 'openssl x509 -inform DER -in {} -noout -{}' try: result = _call(args.format(self._path_cer, 'purpose')).split('\n')[3] except Exception as e: self.error = 'No se puede saber si es FIEL' return data if result == 'SSL server : No': self.error = 'El certificado es FIEL' return data result = _call(args.format(self._path_cer, 'serial')) serie = result.split('=')[1].split('\n')[0][1::2] result = _call(args.format(self._path_cer, 'subject')) #~ Verificar si es por la version de OpenSSL t1 = 'x500UniqueIdentifier = ' t2 = 'x500UniqueIdentifier=' if t1 in result: rfc = result.split(t1)[1][:13].strip() elif t2 in result: rfc = result.split(t2)[1][:13].strip() else: self.error = 'No se pudo obtener el RFC del certificado' print ('\n', result) return data if not DEBUG: if not rfc == session_rfc: self.error = 'El RFC del certificado no corresponde.' return data dates = _call(args.format(self._path_cer, 'dates')).split('\n') desde = parser.parse(dates[0].split('=')[1]) hasta = parser.parse(dates[1].split('=')[1]) self._modulus = _call(args.format(self._path_cer, 'modulus')) data['cer'] = read_file(self._path_cer) data['cer_pem'] = cer_pem data['cer_txt'] = cer_txt.replace('\n', '') data['serie'] = serie data['rfc'] = rfc data['desde'] = desde.replace(tzinfo=None) data['hasta'] = hasta.replace(tzinfo=None) return data def _get_p12(self, password, rfc): tmp_cer = tempfile.mkstemp()[1] tmp_key = tempfile.mkstemp()[1] tmp_p12 = tempfile.mkstemp()[1] args = 'openssl x509 -inform DER -in "{}" -out "{}"' _call(args.format(self._path_cer, tmp_cer)) args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:"{}" -out "{}"' _call(args.format(self._path_key, password, tmp_key)) args = 'openssl pkcs12 -export -in "{}" -inkey "{}" -name "{}" ' \ '-passout pass:"{}" -out "{}"' _call(args.format(tmp_cer, tmp_key, rfc, _get_md5(rfc), tmp_p12)) data = read_file(tmp_p12) self._kill(tmp_cer) self._kill(tmp_key) self._kill(tmp_p12) return data def _get_info_key(self, password, rfc): data = {} args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:"{}"' try: result = _call(args.format(self._path_key, password)) except Exception as e: self.error = 'Contraseña incorrecta' return data args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:"{}" | ' \ 'openssl rsa -noout -modulus' mod_key = _call(args.format(self._path_key, password)) if self._modulus != mod_key: self.error = 'Los archivos no son pareja' return data args = 'openssl pkcs8 -inform DER -in "{}" -passin pass:"{}" | ' \ 'openssl rsa -des3 -passout pass:"{}"'.format( self._path_key, password, _get_md5(rfc)) key_enc = _call(args) data['key'] = read_file(self._path_key) data['key_enc'] = key_enc data['p12'] = self._get_p12(password, rfc) return data def validate(self, password, rfc): if not self._path_key or not self._path_cer: self.error = 'Error en las rutas temporales del certificado' return {} data = self._get_info_cer(rfc) if not data: return {} llave = self._get_info_key(password, data['rfc']) if not llave: return {} data.update(llave) self._kill(self._path_key) self._kill(self._path_cer) return data def make_xml(data, certificado): from .cfdi_xml import CFDI if DEBUG: data['emisor']['Rfc'] = certificado.rfc data['emisor']['RegimenFiscal'] = '603' cfdi = CFDI() xml = cfdi.get_xml(data) data = { 'xsltproc': PATH_XSLTPROC, 'xslt': _join(PATH_XSLT, 'cadena.xslt'), 'xml': save_temp(xml, 'w'), 'openssl': PATH_OPENSSL, 'key': save_temp(certificado.key_enc, 'w'), 'pass': _get_md5(certificado.rfc) } args = '"{xsltproc}" "{xslt}" "{xml}" | ' \ '"{openssl}" dgst -sha256 -sign "{key}" -passin pass:"{pass}" | ' \ '"{openssl}" enc -base64 -A'.format(**data) sello = _call(args) _kill(data['xml']) _kill(data['key']) return cfdi.add_sello(sello) def timbra_xml(xml, auth): from .pac import Finkok as PAC if DEBUG: auth = {} else: if not auth: msg = 'Sin datos para timbrar' result = {'ok': True, 'error': msg} return result result = {'ok': True, 'error': ''} pac = PAC(auth) xml = pac.timbra_xml(xml) if not xml: result['ok'] = False result['error'] = pac.error return result result['xml'] = xml result['uuid'] = pac.uuid result['fecha'] = pac.fecha return result def get_sat(xml): from .pac import get_status_sat return get_status_sat(xml) class LIBO(object): HOST = 'localhost' PORT = '8100' ARG = 'socket,host={},port={};urp;StarOffice.ComponentContext'.format( HOST, PORT) def __init__(self): self._app = None self._start_office() self._init_values() def _init_values(self): self._es_pre = False self._ctx = None self._sm = None self._desktop = None if self.is_running: ctx = uno.getComponentContext() service = 'com.sun.star.bridge.UnoUrlResolver' resolver = ctx.ServiceManager.createInstanceWithContext(service, ctx) self._ctx = resolver.resolve('uno:{}'.format(self.ARG)) self._sm = self._ctx.ServiceManager self._desktop = self._create_instance('com.sun.star.frame.Desktop') return def _create_instance(self, name, with_context=True): if with_context: instance = self._sm.createInstanceWithContext(name, self._ctx) else: instance = self._sm.createInstance(name) return instance @property def is_running(self): try: s = socket.create_connection((self.HOST, self.PORT), 5.0) s.close() return True except ConnectionRefusedError: return False def _start_office(self): if self.is_running: return c = 1 while c < 4: c += 1 self.app = subprocess.Popen([ 'soffice', '--headless', '--accept={}'.format(self.ARG)], stdout=subprocess.PIPE, stderr=subprocess.PIPE) time.sleep(5) if self.is_running: return return def _set_properties(self, properties): pl = [] for k, v in properties.items(): pv = PropertyValue() pv.Name = k pv.Value = v pl.append(pv) return tuple(pl) def _doc_open(self, path, options): options = self._set_properties(options) path = self._path_url(path) try: doc = self._desktop.loadComponentFromURL(path, '_blank', 0, options) return doc except: return None def _path_url(self, path): if path.startswith('file://'): return path return uno.systemPathToFileUrl(path) def close(self): if self.is_running: if not self._desktop is None: self._desktop.terminate() if not self._app is None: self._app.terminate() return def _read(self, path): try: return open(path, 'rb').read() except: return b'' def _clean(self): self._sd.SearchRegularExpression = True self._sd.setSearchString("\{(\w.+)\}") self._search.replaceAll(self._sd) return def _cancelado(self, cancel): if not cancel: pd = self._sheet.getDrawPage() if pd.getCount(): pd.remove(pd.getByIndex(0)) return def _set_search(self): self._sheet = self._template.getSheets().getByIndex(0) self._search = self._sheet.getPrintAreas()[0] self._search = self._sheet.getCellRangeByPosition( self._search.StartColumn, self._search.StartRow, self._search.EndColumn, self._search.EndRow ) self._sd = self._sheet.createSearchDescriptor() self._sd.SearchCaseSensitive = False return def _next_cell(self, cell): col = cell.getCellAddress().Column row = cell.getCellAddress().Row + 1 return self._sheet.getCellByPosition(col, row) def _copy_cell(self, cell): destino = self._next_cell(cell) self._sheet.copyRange(destino.getCellAddress(), cell.getRangeAddress()) return destino def _set_cell(self, k='', v=None, cell=None, value=False): if k: self._sd.setSearchString(k) ranges = self._search.findAll(self._sd) if ranges: ranges = ranges.getRangeAddressesAsString().split(';') for r in ranges: for c in r.split(','): cell = self._sheet.getCellRangeByName(c) if v is None: return cell if cell.getImplementationName() == 'ScCellObj': pattern = re.compile(k, re.IGNORECASE) nv = pattern.sub(v, cell.getString()) if value: cell.setValue(nv) else: cell.setString(nv) return cell if cell: if cell.getImplementationName() == 'ScCellObj': ca = cell.getCellAddress() new_cell = self._sheet.getCellByPosition(ca.Column, ca.Row + 1) if value: new_cell.setValue(v) else: new_cell.setString(v) return new_cell def _comprobante(self, data): for k, v in data.items(): if k in ('total', 'descuento', 'subtotal'): self._set_cell('{cfdi.%s}' % k, v, value=True) else: self._set_cell('{cfdi.%s}' % k, v) return def _emisor(self, data): for k, v in data.items(): self._set_cell('{emisor.%s}' % k, v) return def _receptor(self, data): for k, v in data.items(): self._set_cell('{receptor.%s}' % k, v) return def _conceptos(self, data): first = True for concepto in data: key = concepto.get('noidentificacion', '') description = concepto['descripcion'] unidad = concepto['unidad'] cantidad = concepto['cantidad'] valor_unitario = concepto['valorunitario'] importe = concepto['importe'] if first: first = False cell_1 = self._set_cell('{noidentificacion}', key) cell_2 = self._set_cell('{descripcion}', description) cell_3 = self._set_cell('{unidad}', unidad) cell_4 = self._set_cell('{cantidad}', cantidad, value=True) cell_5 = self._set_cell('{valorunitario}', valor_unitario, value=True) cell_6 = self._set_cell('{importe}', importe, value=True) else: row = cell_2.getCellAddress().Row + 1 self._sheet.getRows().insertByIndex(row, 1) if cell_1: self._copy_cell(cell_1) cell_1 = self._set_cell(v=key, cell=cell_1) if cell_3: self._copy_cell(cell_3) cell_3 = self._set_cell(v=unidad, cell=cell_3) self._copy_cell(cell_2) self._copy_cell(cell_4) self._copy_cell(cell_5) self._copy_cell(cell_6) cell_2 = self._set_cell(v=description, cell=cell_2) cell_4 = self._set_cell(v=cantidad, cell=cell_4, value=True) cell_5 = self._set_cell(v=valor_unitario, cell=cell_5, value=True) cell_6 = self._set_cell(v=importe, cell=cell_6, value=True) return def _totales(self, data): currency = data['moneda'] cell_title = self._set_cell('{subtotal.titulo}', 'SubTotal') value = data['subtotal'] cell_value = self._set_cell('{subtotal}', value, value=True) cell_value.CellStyle = currency #~ Si encuentra el campo {total}, se asume que los totales e impuestos #~ están declarados de forma independiente cada uno #~ if self._add_totales(xml): #~ return #~ Si no se encuentra, copia las celdas hacia abajo de #~ {subtotal.titulo} y {subtotal} #~ print (data['descuento']) if 'descuento' in data: self._copy_cell(cell_title) self._copy_cell(cell_value) cell_title = self._set_cell(v='Descuento', cell=cell_title) value = data['descuento'] cell_value = self._set_cell(v=value, cell=cell_value, value=True) cell_value.CellStyle = currency for tax in data['traslados']: self._copy_cell(cell_title) self._copy_cell(cell_value) cell_title = self._set_cell(v=tax[0], cell=cell_title) cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True) cell_value.CellStyle = currency for tax in data['retenciones']: self._copy_cell(cell_title) self._copy_cell(cell_value) cell_title = self._set_cell(v=tax[0], cell=cell_title) cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True) cell_value.CellStyle = currency for tax in data['taxlocales']: self._copy_cell(cell_title) self._copy_cell(cell_value) cell_title = self._set_cell(v=tax[0], cell=cell_title) cell_value = self._set_cell(v=tax[1], cell=cell_value, value=True) cell_value.CellStyle = currency self._copy_cell(cell_title) self._copy_cell(cell_value) cell_title = self._set_cell(v='Total', cell=cell_title) value = data['total'] cell_value = self._set_cell(v=value, cell=cell_value, value=True) cell_value.CellStyle = currency return def _timbre(self, data): if self._es_pre: return for k, v in data.items(): self._set_cell('{timbre.%s}' % k, v) pd = self._sheet.getDrawPage() image = self._template.createInstance('com.sun.star.drawing.GraphicObjectShape') image.GraphicURL = data['path_cbb'] pd.add(image) s = Size() s.Width = 4150 s.Height = 4500 image.setSize(s) image.Anchor = self._set_cell('{timbre.cbb}') return def _donataria(self, data): if not data: return for k, v in data.items(): self._set_cell('{donataria.%s}' % k, v) return def _ine(self, data): if not data: return for k, v in data.items(): self._set_cell('{ine.%s}' % k, v) return def _render(self, data): self._set_search() self._es_pre = data.pop('es_pre', False) self._comprobante(data['comprobante']) self._emisor(data['emisor']) self._receptor(data['receptor']) self._conceptos(data['conceptos']) self._totales(data['totales']) self._timbre(data['timbre']) self._donataria(data['donataria']) self._ine(data['ine']) self._cancelado(data['cancelada']) self._clean() return def pdf(self, path, data): options = {'AsTemplate': True, 'Hidden': True} self._template = self._doc_open(path, options) if self._template is None: return b'' self._render(data) path = '{}.ods'.format(tempfile.mkstemp()[1]) self._template.storeToURL(self._path_url(path), ()) doc = self._doc_open(path, {'Hidden': True}) options = {'FilterName': 'calc_pdf_Export'} path = tempfile.mkstemp()[1] doc.storeToURL(self._path_url(path), self._set_properties(options)) doc.close(True) self._template.close(True) return self._read(path) def to_pdf(data, emisor_rfc): rfc = data['emisor']['rfc'] if DEBUG: rfc = emisor_rfc version = data['comprobante']['version'] if APP_LIBO: app = LIBO() if app.is_running: donativo = '' if data['donativo']: donativo = '_donativo' name = '{}_{}{}.ods'.format(rfc.lower(), version, donativo) path = get_template_ods(name) if path: return app.pdf(path, data) name = '{}_{}.json'.format(rfc, version) custom_styles = get_custom_styles(name) path = get_path_temp() pdf = TemplateInvoice(path) pdf.custom_styles = custom_styles pdf.data = data pdf.render() return read_file(path) def parse_xml(xml): return ET.fromstring(xml) def get_dict(data): return CaseInsensitiveDict(data) def to_letters(value, moneda): monedas = { 'MXN': 'peso', 'USD': 'dólar', 'EUR': 'euro', } return NumLet(value, monedas.get(moneda, moneda)).letras def get_qr(data): path = tempfile.mkstemp()[1] qr = pyqrcode.create(data, mode='binary') qr.png(path, scale=7) return path def _get_relacionados(doc, version): node = doc.find('{}CfdiRelacionados'.format(PRE[version])) if node is None: return '' uuids = ['UUID: {}'.format(n.attrib['UUID']) for n in node.getchildren()] return '\n'.join(uuids) def _comprobante(doc, options): data = CaseInsensitiveDict(doc.attrib.copy()) del data['certificado'] data['seriefolio'] = '{} - {}'.format(data['serie'], data['folio']) data['totalenletras'] = to_letters(float(data['total']), data['moneda']) if data['version'] == '3.3': tipos = { 'I': 'ingreso', 'E': 'egreso', 'T': 'traslado', } data['tipodecomprobante'] = tipos.get(data['tipodecomprobante']) data['lugarexpedicion'] = \ 'C.P. de Expedición: {}'.format(data['lugarexpedicion']) data['metododepago'] = options['metododepago'] data['formadepago'] = options['formadepago'] if 'condicionesdepago' in data: data['condicionesdepago'] = \ 'Condiciones de pago: {}'.format(data['condicionesdepago']) data['moneda'] = options['moneda'] data['tiporelacion'] = options.get('tiporelacion', '') data['relacionados'] = _get_relacionados(doc, data['version']) else: fields = { 'formaDePago': 'Forma de Pago: {}\n', 'metodoDePago': 'Método de pago: {}\n', 'condicionesDePago': 'Condiciones de Pago: {}\n', 'NumCtaPago': 'Número de Cuenta de Pago: {}\n', 'Moneda': 'Moneda: {}\n', 'TipoCambio': 'Tipo de Cambio: {}', } datos = '' for k, v in fields.items(): if k in data: datos += v.format(data[k]) data['datos'] = datos data['hora'] = data['fecha'].split('T')[1] fecha = parser.parse(data['fecha']) try: locale.setlocale(locale.LC_TIME, "es_MX.UTF-8") except: pass data['fechaformato'] = fecha.strftime('%A, %d de %B de %Y') data['tipocambio'] = 'Tipo de Cambio: $ {:0.2f}'.format( float(data['tipocambio'])) return data def _emisor(doc, version, values): emisor = doc.find('{}Emisor'.format(PRE[version])) data = CaseInsensitiveDict(emisor.attrib.copy()) node = emisor.find('{}DomicilioFiscal'.format(PRE[version])) if not node is None: data.update(CaseInsensitiveDict(node.attrib.copy())) if version == '3.2': node = emisor.find('{}RegimenFiscal'.format(PRE[version])) if not node is None: data['regimenfiscal'] = node.attrib['Regimen'] data['regimen'] = node.attrib['Regimen'] else: data['regimenfiscal'] = values['regimenfiscal'] path = _join(PATH_MEDIA, 'logos', '{}.png'.format(data['rfc'].lower())) if is_file(path): data['logo'] = path return data def _receptor(doc, version, values): node = doc.find('{}Receptor'.format(PRE[version])) data = CaseInsensitiveDict(node.attrib.copy()) node = node.find('{}Domicilio'.format(PRE[version])) if not node is None: data.update(node.attrib.copy()) if version == '3.2': return data data['usocfdi'] = values['usocfdi'] return data def _conceptos(doc, version): data = [] conceptos = doc.find('{}Conceptos'.format(PRE[version])) for c in conceptos.getchildren(): values = CaseInsensitiveDict(c.attrib.copy()) if version == '3.3': values['noidentificacion'] = '{}\n(SAT {})'.format( values['noidentificacion'], values['ClaveProdServ']) values['unidad'] = '({})\n{}'.format( values['ClaveUnidad'], values['unidad']) n = c.find('{}CuentaPredial'.format(PRE[version])) if n is not None: v = CaseInsensitiveDict(n.attrib.copy()) info = '\nCuenta Predial Número: {}'.format(v['numero']) values['descripcion'] += info data.append(values) return data def _totales(doc, cfdi, version): data = {} data['moneda'] = doc.attrib['Moneda'] data['subtotal'] = cfdi['subtotal'] if 'descuento' in cfdi: data['descuento'] = cfdi['descuento'] data['total'] = cfdi['total'] tn = { '001': 'ISR', '002': 'IVA', '003': 'IEPS', } traslados = [] retenciones = [] taxlocales = [] imp = doc.find('{}Impuestos'.format(PRE[version])) if imp is not None: tmp = CaseInsensitiveDict(imp.attrib.copy()) for k, v in tmp.items(): data[k] = v node = imp.find('{}Traslados'.format(PRE[version])) if node is not None: for n in node.getchildren(): tmp = CaseInsensitiveDict(n.attrib.copy()) if version == '3.3': title = 'Traslado {} {}'.format( tn.get(tmp['impuesto']), tmp['tasaocuota']) else: title = 'Traslado {} {}'.format(tmp['impuesto'], tmp['tasa']) traslados.append((title, float(tmp['importe']))) node = imp.find('{}Retenciones'.format(PRE[version])) if node is not None: for n in node.getchildren(): tmp = CaseInsensitiveDict(n.attrib.copy()) if version == '3.3': title = 'Retención {} {}'.format( tn.get(tmp['impuesto']), '') else: title = 'Retención {} {}'.format(tmp['impuesto'], '') retenciones.append((title, float(tmp['importe']))) node = doc.find('{}Complemento/{}ImpuestosLocales'.format( PRE[version], PRE['LOCALES'])) if node is not None: for otro in list(node): if otro.tag == '{}RetencionesLocales'.format(PRE['LOCALES']): tipo = 'Retención ' name = 'ImpLocRetenido' tasa = 'TasadeRetencion' else: tipo = 'Traslado ' name = 'ImpLocTrasladado' tasa = 'TasadeTraslado' title = '{} {} {}%'.format( tipo, otro.attrib[name], otro.attrib[tasa]) importe = float(otro.attrib['Importe']) taxlocales.append((title, importe)) data['traslados'] = traslados data['retenciones'] = retenciones data['taxlocales'] = taxlocales return data def _timbre(doc, version, values): CADENA = '||{version}|{UUID}|{FechaTimbrado}|{selloCFD}|{noCertificadoSAT}||' if version == '3.3': CADENA = '||{Version}|{UUID}|{FechaTimbrado}|{SelloCFD}|{NoCertificadoSAT}||' node = doc.find('{}Complemento/{}TimbreFiscalDigital'.format( PRE[version], PRE['TIMBRE'])) data = CaseInsensitiveDict(node.attrib.copy()) total_s = '%017.06f' % float(values['total']) qr_data = '?re=%s&rr=%s&tt=%s&id=%s' % ( values['rfc_emisor'], values['rfc_receptor'], total_s, node.attrib['UUID']) data['path_cbb'] = get_qr(qr_data) data['cadenaoriginal'] = CADENA.format(**node.attrib) return data def _donataria(doc, version, fechadof): node = doc.find('{}Complemento/{}Donatarias'.format( PRE[version], PRE['DONATARIA'])) if node is None: return {} data = CaseInsensitiveDict(node.attrib.copy()) data['fechadof'] = fechadof return data def _ine(doc, version): node = doc.find('{}Complemento/{}INE'.format(PRE[version], PRE['INE'])) if node is None: return {} values = ( ('TipoComite', 'Tipo de Comite: {}'), ('TipoProceso', 'Tipo de Proceso: {}'), ('IdContabilidad', 'ID de Contabilidad: {}'), ) data = CaseInsensitiveDict(node.attrib.copy()) for k, v in values: data[k] = v.format(data[k]) return data def get_data_from_xml(invoice, values): data = {'cancelada': invoice.cancelada, 'donativo': invoice.donativo} doc = parse_xml(invoice.xml) data['comprobante'] = _comprobante(doc, values) version = data['comprobante']['version'] data['emisor'] = _emisor(doc, version, values) data['receptor'] = _receptor(doc, version, values) data['conceptos'] = _conceptos(doc, version) data['totales'] = _totales(doc, data['comprobante'], version) data['donataria'] = _donataria(doc, version, values['fechadof']) data['ine'] = _ine(doc, version) options = { 'rfc_emisor': data['emisor']['rfc'], 'rfc_receptor': data['receptor']['rfc'], 'total': data['comprobante']['total'], } data['timbre'] = _timbre(doc, version, options) del data['timbre']['version'] data['comprobante'].update(data['timbre']) return data def to_zip(*files): zip_buffer = BytesIO() with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED, False) as zip_file: for data, file_name in files: zip_file.writestr(file_name, data) return zip_buffer.getvalue() def make_fields(xml): doc = ET.fromstring(xml) data = CaseInsensitiveDict(doc.attrib.copy()) data.pop('certificado') data.pop('sello') version = data['version'] receptor = doc.find('{}Receptor'.format(PRE[version])) receptor = CaseInsensitiveDict(receptor.attrib.copy()) data['receptor_nombre'] = receptor['nombre'] data['receptor_rfc'] = receptor['rfc'] data = {k.lower(): v for k, v in data.items()} return data def make_info_mail(data, fields): return data.format(**fields).replace('\n', '
') def send_mail(data): msg = '' server = SendMail(data['server']) is_connect = server.is_connect if is_connect: msg = server.send(data['options']) else: msg = server.error server.close() return {'ok': is_connect, 'msg': msg} def get_path_info(path): path, filename = os.path.split(path) name, extension = os.path.splitext(filename) return (path, filename, name, extension) def get_path_temp(): return tempfile.mkstemp()[1] def get_date(value, next_day=False): d = parser.parse(value) if next_day: return d + datetime.timedelta(days=1) return d def upload_file(rfc, opt, file_obj): if opt == 'emisorlogo': tmp = file_obj.filename.split('.') name = '{}.{}'.format(rfc.lower(), tmp[-1].lower()) path = _join(PATH_MEDIA, 'logos', name) elif opt == 'txt_plantilla_factura_32': tmp = file_obj.filename.split('.') ext = tmp[-1].lower() if ext != 'ods': msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS' return {'status': 'server', 'name': msg, 'ok': False} name = '{}_3.2.ods'.format(rfc.lower()) path = _join(PATH_MEDIA, 'templates', name) elif opt == 'txt_plantilla_factura_33': tmp = file_obj.filename.split('.') ext = tmp[-1].lower() if ext != 'ods': msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS' return {'status': 'server', 'name': msg, 'ok': False} name = '{}_3.3.ods'.format(rfc.lower()) path = _join(PATH_MEDIA, 'templates', name) elif opt == 'txt_plantilla_factura_33j': tmp = file_obj.filename.split('.') ext = tmp[-1].lower() if ext != 'json': msg = 'Extensión de archivo incorrecta, selecciona un archivo JSON' return {'status': 'server', 'name': msg, 'ok': False} name = '{}_3.3.json'.format(rfc.lower()) path = _join(PATH_MEDIA, 'templates', name) elif opt == 'txt_plantilla_donataria': tmp = file_obj.filename.split('.') ext = tmp[-1].lower() if ext != 'ods': msg = 'Extensión de archivo incorrecta, selecciona un archivo ODS' return {'status': 'server', 'name': msg, 'ok': False} name = '{}_3.3_donativo.ods'.format(rfc.lower()) path = _join(PATH_MEDIA, 'templates', name) elif opt == 'bdfl': tmp = file_obj.filename.split('.') ext = tmp[-1].lower() if ext != 'sqlite': msg = 'Extensión de archivo incorrecta, selecciona un archivo SQLite' return {'status': 'server', 'name': msg, 'ok': False} name = '{}.sqlite'.format(rfc.lower()) path = _join('/tmp', name) if save_file(path, file_obj.file.read()): return {'status': 'server', 'name': file_obj.filename, 'ok': True} return {'status': 'error', 'ok': False} def cancel_cfdi(uuid, pk12, rfc, auth): from .pac import Finkok as PAC template = read_file(TEMPLATE_CANCEL, 'r') data = { 'rfc': rfc, 'fecha': datetime.datetime.now().isoformat()[:19], 'uuid': str(uuid).upper(), } template = template.format(**data) data = { 'xmlsec': PATH_XMLSEC, 'pk12': save_temp(pk12), 'pass': _get_md5(rfc), 'template': save_temp(template, 'w'), } args = '"{xmlsec}" --sign --pkcs12 "{pk12}" --pwd {pass} ' \ '"{template}"'.format(**data) xml_sign = _call(args) if DEBUG: auth = {} else: if not auth: msg = 'Sin datos para cancelar' result = {'ok': False, 'error': msg} return result msg = 'Factura cancelada correctamente' data = {'ok': True, 'msg': msg, 'row': {'estatus': 'Cancelada'}} pac = PAC(auth) result = pac.cancel_signature(xml_sign) if result: codes = {None: '', 'Could not get UUID Text': 'UUID no encontrado'} if not result['CodEstatus'] is None: data['ok'] = False data['msg'] = codes.get(result['CodEstatus'], result['CodEstatus']) else: data['ok'] = False data['msg'] = pac.error return data, result def run_in_thread(fn): def run(*k, **kw): t = threading.Thread(target=fn, args=k, kwargs=kw) t.start() return t return run def get_bool(value): if not value: return False if value == '1': return True return False def get_float(value): return round(float(value), DECIMALES) def crear_rol(user, contra=''): if not contra: contra = user args = 'psql -U postgres -c "CREATE ROLE {} WITH LOGIN ENCRYPTED ' \ 'PASSWORD \'{}\';"'.format(user, contra) try: result = _call(args) if result == 'CREATE ROLE\n': return True except Exception as e: log.info(e) return False def crear_db(nombre): args = 'psql -U postgres -c "CREATE DATABASE {0} WITH ' \ 'OWNER {0};"'.format(nombre) try: result = _call(args) print (result) if result == 'CREATE DATABASE\n': return True except Exception as e: log.info(e) return False def _to_seafile(path_db, data): # ~ if DEBUG: # ~ return _, filename = os.path.split(path_db) if SEAFILE_SERVER: msg = '\tSincronizando backup general...' log.info(msg) seafile = SeaFileAPI( SEAFILE_SERVER['URL'], SEAFILE_SERVER['USER'], SEAFILE_SERVER['PASS']) if seafile.is_connect: msg = '\tSincronizando: {} '.format(filename) log.info(msg) seafile.update_file( path_db, SEAFILE_SERVER['REPO'], '/', SEAFILE_SERVER['PASS']) msg = '\tRespaldo general de {} sincronizado'.format(filename) log.info(msg) msg = '\tSin datos para sincronización particular de {}'.format(filename) if len(data) < 2: log.info(msg) return if not data[0] or not data[1] or not data[2]: log.info(msg) return msg = '\tSincronizando backup particular...' log.info(msg) seafile = SeaFileAPI(SEAFILE_SERVER['URL'], data[0], data[1]) if seafile.is_connect: msg = '\t\tSincronizando: {} '.format(filename) log.info(msg) seafile.update_file(path_db, data[2], 'Base de datos/', data[1]) msg = '\t\tRespaldo partícular de {} sincronizado'.format(filename) log.info(msg) return @run_in_thread def _backup_and_sync(rfc, data): msg = 'Generando backup de: {}'.format(rfc) log.info(msg) sql = 'select correo_timbrado, token_timbrado, token_soporte from emisor;' path_bk = _join(PATH_MEDIA, 'tmp', '{}.bk'.format(rfc.lower())) if data['type'] == 'postgres': args = 'pg_dump -U postgres -Fc {} > "{}"'.format( data['name'], path_bk) sql = 'psql -U postgres -d {} -Atc "{}"'.format(data['name'], sql) elif data['type'] == 'sqlite': args = 'gzip -c "{}" > "{}"'.format(data['name'], path_bk) sql = 'sqlite3 "{}" "{}"'.format(data['name'], sql) try: result = _call(args) msg = '\tBackup generado de {}'.format(rfc) log.info(msg) result = _call(sql).strip().split('|') _to_seafile(path_bk, result) except Exception as e: log.info(e) return def backup_dbs(): con = sqlite3.connect(COMPANIES) cursor = con.cursor() sql = "SELECT * FROM names" cursor.execute(sql) rows = cursor.fetchall() if rows is None: return cursor.close() con.close() for rfc, data in rows: args = loads(data) _backup_and_sync(rfc, args) return class ImportFacturaLibre(object): def __init__(self, path, rfc): self._rfc = rfc self._con = None self._cursor = None self._error = '' self._is_connect = self._connect(path) self._clientes = [] self._clientes_rfc = [] @property def error(self): return self._error @property def is_connect(self): return self._is_connect def _validate_rfc(self): sql = "SELECT rfc FROM emisor LIMIT 1" self._cursor.execute(sql) obj = self._cursor.fetchone() if obj is None: self._error = 'No se encontró al emisor: {}'.format(self._rfc) return False if not DEBUG: if obj['rfc'] != self._rfc: self._error = 'Los datos no corresponden al RFC: {}'.format(self._rfc) return False return True def _connect(self, path): try: self._con = sqlite3.connect(path) self._con.row_factory = sqlite3.Row self._cursor = self._con.cursor() return self._validate_rfc() except Exception as e: log.error(e) self._error = 'No se pudo conectar a la base de datos' return False def close(self): try: self._cursor.close() self._con.close() except: pass return def import_data(self): data = {} tables = ( ('receptores', 'Socios'), ('cfdfacturas', 'Facturas'), ('categorias', 'Categorias'), ) for source, target in tables: data[target] = self._get_table(source) data['Socios'] += self._clientes return data def _get_table(self, table): return getattr(self, '_{}'.format(table))() def import_productos(self): sql = "SELECT * FROM productos" self._cursor.execute(sql) rows = self._cursor.fetchall() fields = ( ('id_categoria', 'categoria'), ('noIdentificacion', 'clave'), ('descripcion', 'descripcion'), ('unidad', 'unidad'), ('valorUnitario', 'valor_unitario'), ('existencia', 'existencia'), ('inventario', 'inventario'), ('codigobarras', 'codigo_barras'), ('CuentaPredial', 'cuenta_predial'), ('precio_compra', 'ultimo_precio'), ('minimo', 'minimo'), ) data = [] sql = """ SELECT nombre, tasa, tipo FROM impuestos, productos, productosimpuestos WHERE productos.id=productosimpuestos.id_producto AND productosimpuestos.id_impuesto=impuestos.id AND productos.id = ? """ for row in rows: new = {t: row[s] for s, t in fields} new['descripcion'] = ' '.join(new['descripcion'].split()) new['clave_sat'] = DEFAULT_SAT_PRODUCTO self._cursor.execute(sql, (row['id'],)) impuestos = self._cursor.fetchall() new['impuestos'] = tuple(impuestos) data.append(new) return data def _categorias(self): sql = "SELECT * FROM categorias ORDER BY id_padre" self._cursor.execute(sql) rows = self._cursor.fetchall() fields = ( ('id', 'id'), ('categoria', 'categoria'), ('id_padre', 'padre'), ) data = [] for row in rows: new = {t: row[s] for s, t in fields} if new['padre'] == 0: new['padre'] = None data.append(new) return data def _get_cliente(self, invoice): sql = "SELECT rfc, nombre FROM receptores WHERE id=?" self._cursor.execute(sql, [invoice['id_cliente']]) obj = self._cursor.fetchone() if not obj is None: data = { 'rfc': obj['rfc'], 'slug': to_slug(obj['nombre']), } return data if not invoice['xml']: return {} doc = parse_xml(invoice['xml']) version = doc.attrib['version'] node = doc.find('{}Receptor'.format(PRE[version])) rfc = node.attrib['rfc'] nombre = node.attrib['nombre'] tipo_persona = 1 if rfc == 'XEXX010101000': tipo_persona = 4 elif rfc == 'XAXX010101000': tipo_persona = 3 elif len(rfc) == 12: tipo_persona = 2 data = { 'tipo_persona': tipo_persona, 'rfc': rfc, 'nombre': nombre, 'slug': to_slug(nombre), 'es_cliente': True, 'es_activo': False, } if not rfc in self._clientes_rfc: self._clientes_rfc.append(rfc) self._clientes.append(data) data = { 'rfc': data['rfc'], 'slug': data['slug'], } return data def _get_detalles(self, id): sql = "SELECT * FROM cfddetalle WHERE id_cfd=?" self._cursor.execute(sql, [id]) rows = self._cursor.fetchall() fields = ( ('categoria', 'categoria'), ('cantidad', 'cantidad'), ('unidad', 'unidad'), ('noIdentificacion', 'clave'), ('descripcion', 'descripcion'), ('valorUnitario', 'valor_unitario'), ('importe', 'importe'), ('numero', 'pedimento'), ('fecha', 'fecha_pedimento'), ('aduana', 'aduana'), ('CuentaPredial', 'cuenta_predial'), ('alumno', 'alumno'), ('curp', 'curp'), ('nivel', 'nivel'), ('autorizacion', 'autorizacion'), ) data = [] for row in rows: new = {t: row[s] for s, t in fields} data.append(new) return data def _get_impuestos(self, id): sql = "SELECT * FROM cfdimpuestos WHERE id_cfd=?" self._cursor.execute(sql, [id]) rows = self._cursor.fetchall() tasas = { '16': 0.16, '11': 0.11, '-10': 0.10, '0': 0.0, '-2/3': 0.106667, '-0.5': 0.005, } data = [] for row in rows: filtro = { 'name': row['nombre'], 'tasa': tasas[row['tasa']], 'tipo': row['tipo'][0], } new = { 'importe': row['importe'], 'filtro': filtro } data.append(new) return data def _cfdfacturas(self): sql = "SELECT * FROM cfdfacturas" self._cursor.execute(sql) rows = self._cursor.fetchall() fields = ( ('version', 'version'), ('serie', 'serie'), ('folio', 'folio'), ('fecha', 'fecha'), ('fecha_timbrado', 'fecha_timbrado'), ('formaDePago', 'forma_pago'), ('condicionesDePago', 'condiciones_pago'), ('subTotal', 'subtotal'), ('descuento', 'descuento'), ('TipoCambio', 'tipo_cambio'), ('Moneda', 'moneda'), ('total', 'total'), ('tipoDeComprobante', 'tipo_comprobante'), ('metodoDePago', 'metodo_pago'), ('LugarExpedicion', 'lugar_expedicion'), ('totalImpuestosRetenidos', 'total_retenciones'), ('totalImpuestosTrasladados', 'total_traslados'), ('xml', 'xml'), ('id_cliente', 'cliente'), ('notas', 'notas'), ('uuid', 'uuid'), ('donativo', 'donativo'), ('estatus', 'estatus'), ('regimen', 'regimen_fiscal'), ('xml_acuse', 'acuse'), ) data = [] for row in rows: new = {t: row[s] for s, t in fields} if not new['uuid']: new['uuid'] = None if new['xml'] is None: new['xml'] = '' if row['estatus'] == 'Pagada': new['pagada'] = True elif row['estatus'] == 'Cancelada': new['cancelada'] = True new['total_mn'] = round(row['TipoCambio'] * row['total'], 2) new['detalles'] = self._get_detalles(row['id']) new['impuestos'] = self._get_impuestos(row['id']) new['cliente'] = self._get_cliente(row) data.append(new) return data def _receptores(self): sql = "SELECT * FROM receptores" self._cursor.execute(sql) rows = self._cursor.fetchall() fields = ( ('rfc', 'rfc'), ('nombre', 'nombre'), ('calle', 'calle'), ('noExterior', 'no_exterior'), ('noInterior', 'no_interior'), ('colonia', 'colonia'), ('municipio', 'municipio'), ('estado', 'estado'), ('pais', 'pais'), ('codigoPostal', 'codigo_postal'), ('extranjero', 'es_extranjero'), ('activo', 'es_activo'), ('fechaalta', 'fecha_alta'), ('notas', 'notas'), ('cuentaCliente', 'cuenta_cliente'), ('cuentaProveedor', 'cuenta_proveedor'), ('saldoCliente', 'saldo_cliente'), ('saldoProveedor', 'saldo_proveedor'), ('esCliente', 'es_cliente'), ('esProveedor', 'es_proveedor'), ) data = [] sql1 = "SELECT correo FROM correos WHERE id_cliente=?" sql2 = "SELECT telefono FROM telefonos WHERE id_cliente=?" for row in rows: new = {t: row[s] for s, t in fields} new['slug'] = to_slug(new['nombre']) if new['es_extranjero']: new['tipo_persona'] = 4 elif new['rfc'] == 'XAXX010101000': new['tipo_persona'] = 3 elif len(new['rfc']) == 12: new['tipo_persona'] = 2 self._cursor.execute(sql1, (row['id'],)) tmp = self._cursor.fetchall() if tmp: new['correo_facturas'] = ', '.join([r[0] for r in tmp]) self._cursor.execute(sql2, (row['id'],)) tmp = self._cursor.fetchall() if tmp: new['telefonos'] = ', '.join([r[0] for r in tmp]) data.append(new) return data