Start unit test

This commit is contained in:
Mauricio Baeza 2020-08-25 12:32:58 -05:00
parent b7dbbc1ebb
commit 4527ca3da6
3 changed files with 2378 additions and 78 deletions

View File

@ -23,7 +23,10 @@ import getpass
import logging
import os
import platform
import socket
import subprocess
import sys
import time
from enum import IntEnum
from functools import wraps
@ -53,7 +56,7 @@ try:
from peewee import Database, DateTimeField, DateField, TimeField, \
__exception_wrapper__
except ImportError as e:
debug('Install peewee')
print('Install peewee')
peewee = None
@ -981,7 +984,7 @@ class FirebirdDatabase(Database):
def last_insert_id(self, cursor, query_type=None):
# ~ debug('LAST_ID', cursor)
return 1
return 0
def rows_affected(self, cursor):
return self._db.rows_affected
@ -991,6 +994,10 @@ class FirebirdDatabase(Database):
return self._db.path
class BaseRow:
pass
class BaseQuery(object):
PY_TYPES = {
'SQL_LONG': 'getLong',
@ -1010,6 +1017,9 @@ class BaseQuery(object):
self._names = query.Columns.ElementNames
self._data = self._get_data()
def __getitem__(self, index):
return self._data[index]
def __iter__(self):
self._index = 0
return self
@ -1029,20 +1039,29 @@ class BaseQuery(object):
value = _struct_to_date(value)
return value
def _get_row(self):
row = BaseRow()
for i in range(1, self._cols + 1):
column_name = self._meta.getColumnName(i)
value = self._to_python(i)
setattr(row, column_name, value)
return row
def _get_data(self):
data = []
while self._query.next():
row = [self._to_python(i) for i in range(1, self._cols + 1)]
data.append(tuple(row))
row = self._get_row()
data.append(row)
return data
@property
def tuples(self):
return tuple(self._data)
data = [tuple(r.__dict__.values()) for r in self._data]
return tuple(data)
@property
def dicts(self):
data = [dict(zip(self._names, row)) for row in self.tuples]
data = [r.__dict__ for r in self._data]
return tuple(data)
@ -1070,17 +1089,6 @@ class LOBase(object):
# ~ setPropertyValue
# ~ setRef
PY_TYPES = {
'SQL_LONG': 'getLong',
'SQL_VARYING': 'getString',
'SQL_FLOAT': 'getFloat',
'SQL_BOOLEAN': 'getBoolean',
'SQL_TYPE_DATE': 'getDate',
'SQL_TYPE_TIME': 'getTime',
'SQL_TIMESTAMP': 'getTimestamp',
}
TYPES_DATE = ('SQL_TYPE_DATE', 'SQL_TYPE_TIME', 'SQL_TIMESTAMP')
def __init__(self, obj, args={}):
self._obj = obj
self._type = BASE
@ -1110,6 +1118,9 @@ class LOBase(object):
db = self._dbc.getByName(self.name)
self._con = db.getConnection('', '')
def __contains__(self, item):
return item in self.tables
@property
def obj(self):
return self._obj
@ -1165,12 +1176,15 @@ class LOBase(object):
return
def _validate_sql(self, sql, params):
limit = ' LIMIT '
for p in params:
sql = sql.replace('?', str(p), 1)
sql = sql.replace('?', f"'{p}'", 1)
if limit in sql:
sql = sql.split(limit)[0]
sql = sql.replace('SELECT', f'SELECT FIRST {params[-1]}')
return sql
def cursor(self, sql, params):
print(1, sql)
if sql.startswith('SELECT'):
sql = self._validate_sql(sql, params)
cursor = self._con.prepareStatement(sql)
@ -1189,7 +1203,6 @@ class LOBase(object):
getattr(cursor, self.DB_TYPES[t])(i, v)
return cursor
@catch_exception
def execute(self, sql, params):
debug(sql, params)
cursor = self.cursor(sql, params)
@ -1204,26 +1217,8 @@ class LOBase(object):
result = cursor.execute(sql)
self.save()
print('RESULT', result)
return result
def _to_python(self, meta, query, index):
i = index + 1
type_field = meta.getColumnTypeName(i)
value = getattr(query, self.PY_TYPES[type_field])(i)
if type_field in self.TYPES_DATE:
value = _struct_to_date(value)
return value
def _query_to_python(self, query):
data = []
meta = query.MetaData
cols = meta.ColumnCount
while query.next():
row = [self._to_python(meta, query, i) for i in range(cols)]
data.append(row)
return data
def select(self, sql):
debug('SELECT', sql)
if not sql.startswith('SELECT'):
@ -1231,7 +1226,6 @@ class LOBase(object):
cursor = self._con.prepareStatement(sql)
query = cursor.executeQuery()
# ~ data = self._query_to_python(query)
return BaseQuery(query)
def get_query(self, query):
@ -1989,6 +1983,8 @@ def __getattr__(name):
return Rectangle()
if name == 'paths':
return Paths()
if name == 'docs':
return LODocs()
raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
@ -2002,7 +1998,7 @@ def get_fonts():
return device.FontDescriptors
docs = LODocs()
# ~ docs = LODocs()
sheets = LOSheets()
cells = LOCells()
@ -2170,3 +2166,75 @@ def get_color(value):
COLOR_ON_FOCUS = get_color('LightYellow')
class LOServer(object):
HOST = 'localhost'
PORT = '8100'
ARG = f'socket,host={HOST},port={PORT};urp;StarOffice.ComponentContext'
CMD = ['soffice',
'-env:SingleAppInstance=false',
'-env:UserInstallation=file:///tmp/LO_Process8100',
'--headless', '--norestore', '--invisible',
f'--accept={ARG}']
def __init__(self):
self._server = None
self._ctx = None
self._sm = None
self._start_server()
self._init_values()
def _init_values(self):
global CTX
global SM
if not self.is_running:
return
ctx = uno.getComponentContext()
service = 'com.sun.star.bridge.UnoUrlResolver'
resolver = ctx.ServiceManager.createInstanceWithContext(service, ctx)
self._ctx = resolver.resolve('uno:{}'.format(self.ARG))
self._sm = self._ctx.getServiceManager()
CTX = self._ctx
SM = self._sm
return
@property
def is_running(self):
try:
s = socket.create_connection((self.HOST, self.PORT), 5.0)
s.close()
debug('LibreOffice is running...')
return True
except ConnectionRefusedError:
return False
def _start_server(self):
if self.is_running:
return
for i in range(3):
self._server = subprocess.Popen(self.CMD,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
time.sleep(3)
if self.is_running:
break
return
def stop(self):
if self._server is None:
print('Search pgrep soffice')
else:
self._server.terminate()
debug('LibreOffice is stop...')
return
def create_instance(self, name, with_context=True):
if with_context:
instance = self._sm.createInstanceWithContext(name, self._ctx)
else:
instance = self._sm.createInstance(name)
return instance

2236
source/libo.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,25 +1,22 @@
#!/usr/bin/env python3
# coding: utf-8
import inspect
import unittest
import easymacro as app
from com.sun.star.uno import XInterface
import easymacro2 as app
class BaseTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.server = app.LIBOServer()
cls.server = app.LOServer()
@classmethod
def tearDownClass(cls):
cls.server.stop()
def setUp(self):
msg = 'In method: {}'.format(self._testMethodName)
msg = f'In method: {self._testMethodName}'
app.debug(msg)
# ~ @unittest.SkipTest
@ -27,47 +24,47 @@ class BaseTest(unittest.TestCase):
pass
class TestPruebas(BaseTest):
# ~ class TestPruebas(BaseTest):
def test_new_doc(self):
pass
# ~ def test_new_doc(self):
# ~ pass
class TestDocuments(BaseTest):
# ~ class TestDocuments(BaseTest):
def test_new_doc(self):
result = app.new_doc()
self.assertIsInstance(result, app.LOCalc)
result.close()
# ~ def test_new_doc(self):
# ~ result = app.new_doc()
# ~ self.assertIsInstance(result, app.LOCalc)
# ~ result.close()
def test_get_type_doc(self):
expected = 'calc'
result = app.new_doc()
self.assertEqual(result.type, expected)
result.close()
# ~ def test_get_type_doc(self):
# ~ expected = 'calc'
# ~ result = app.new_doc()
# ~ self.assertEqual(result.type, expected)
# ~ result.close()
class TestTools(BaseTest):
# ~ class TestTools(BaseTest):
def test_create_instance_not_exists(self):
result = app.create_instance('no.exists')
self.assertIsNone(result)
# ~ def test_create_instance_not_exists(self):
# ~ result = app.create_instance('no.exists')
# ~ self.assertIsNone(result)
def test_create_instance(self):
result = app.create_instance('com.sun.star.frame.Desktop')
self.assertIsNotNone(result)
# ~ def test_create_instance(self):
# ~ result = app.create_instance('com.sun.star.frame.Desktop')
# ~ self.assertIsNotNone(result)
def test_set_get_config(self):
expected = 'TEST'
result = app.set_config('test', 'TEST', 'test')
self.assertTrue(result)
result = app.get_config('test', '', 'test')
self.assertEqual(result, expected)
# ~ def test_set_get_config(self):
# ~ expected = 'TEST'
# ~ result = app.set_config('test', 'TEST', 'test')
# ~ self.assertTrue(result)
# ~ result = app.get_config('test', '', 'test')
# ~ self.assertEqual(result, expected)
def test_msgbox(self):
expected = 0
result = app.msgbox('TEST')
self.assertEqual(result, expected)
# ~ def test_msgbox(self):
# ~ expected = 0
# ~ result = app.msgbox('TEST')
# ~ self.assertEqual(result, expected)
class TestVars(BaseTest):
@ -78,7 +75,7 @@ class TestVars(BaseTest):
self.assertEqual(result, expected)
def test_version(self):
expected = '6.2'
expected = '7.0'
result = app.VERSION
self.assertEqual(result, expected)
@ -119,4 +116,3 @@ class TestVars(BaseTest):
if __name__ == '__main__':
unittest.main()