Add support for querys

This commit is contained in:
Mauricio Baeza 2020-08-23 23:53:37 -05:00
parent 93bfb3cb78
commit b7dbbc1ebb
1 changed files with 147 additions and 17 deletions

View File

@ -302,14 +302,17 @@ def _date_to_struct(value):
def _struct_to_date(value):
d = None
if isinstance(value, Time):
d = datetime.time(value.Hours, value.Minutes, value.Seconds)
elif isinstance(value, Date):
d = datetime.date(value.Year, value.Month, value.Day)
if value != Date():
d = datetime.date(value.Year, value.Month, value.Day)
elif isinstance(value, DateTime):
d = datetime.datetime(
value.Year, value.Month, value.Day,
value.Hours, value.Minutes, value.Seconds)
if value.Year > 0:
d = datetime.datetime(
value.Year, value.Month, value.Day,
value.Hours, value.Minutes, value.Seconds)
return d
@ -977,16 +980,74 @@ class FirebirdDatabase(Database):
return cursor
def last_insert_id(self, cursor, query_type=None):
# ~ debug(cursor)
# ~ debug('LAST_ID', cursor)
return 1
def rows_affected(self, cursor):
return self._db.rows_affected
@property
def path(self):
return self._db.path
class BaseQuery(object):
PY_TYPES = {
'SQL_LONG': 'getLong',
'SQL_VARYING': 'getString',
'SQL_FLOAT': 'getFloat',
'SQL_BOOLEAN': 'getBoolean',
'SQL_TYPE_DATE': 'getDate',
'SQL_TYPE_TIME': 'getTime',
'SQL_TIMESTAMP': 'getTimestamp',
}
TYPES_DATE = ('SQL_TYPE_DATE', 'SQL_TYPE_TIME', 'SQL_TIMESTAMP')
def __init__(self, query):
self._query = query
self._meta = query.MetaData
self._cols = self._meta.ColumnCount
self._names = query.Columns.ElementNames
self._data = self._get_data()
def __iter__(self):
self._index = 0
return self
def __next__(self):
try:
row = self._data[self._index]
except IndexError:
raise StopIteration
self._index += 1
return row
def _to_python(self, index):
type_field = self._meta.getColumnTypeName(index)
value = getattr(self._query, self.PY_TYPES[type_field])(index)
if type_field in self.TYPES_DATE:
value = _struct_to_date(value)
return value
def _get_data(self):
data = []
while self._query.next():
row = [self._to_python(i) for i in range(1, self._cols + 1)]
data.append(tuple(row))
return data
@property
def tuples(self):
return tuple(self._data)
@property
def dicts(self):
data = [dict(zip(self._names, row)) for row in self.tuples]
return tuple(data)
class LOBase(object):
TYPES = {
DB_TYPES = {
str: 'setString',
int: 'setInt',
float: 'setFloat',
@ -1009,11 +1070,23 @@ class LOBase(object):
# ~ setPropertyValue
# ~ setRef
PY_TYPES = {
'SQL_LONG': 'getLong',
'SQL_VARYING': 'getString',
'SQL_FLOAT': 'getFloat',
'SQL_BOOLEAN': 'getBoolean',
'SQL_TYPE_DATE': 'getDate',
'SQL_TYPE_TIME': 'getTime',
'SQL_TIMESTAMP': 'getTimestamp',
}
TYPES_DATE = ('SQL_TYPE_DATE', 'SQL_TYPE_TIME', 'SQL_TIMESTAMP')
def __init__(self, obj, args={}):
self._obj = obj
self._type = BASE
self._path = args.get('path', '')
self._dbc = create_instance('com.sun.star.sdb.DatabaseContext')
self._rows_affected = 0
if self._path:
self._name = Path(self._path).name
path_url = _path_url(self._path)
@ -1058,6 +1131,10 @@ class LOBase(object):
tables = [t.Name.lower() for t in self._con.getTables()]
return tables
@property
def rows_affected(self):
return self._rows_affected
def register(self):
if not self.is_registered:
path_url = _path_url(self._path)
@ -1087,27 +1164,80 @@ class LOBase(object):
db.create_tables(tables)
return
def _validate_sql(self, sql, params):
for p in params:
sql = sql.replace('?', str(p), 1)
return sql
def cursor(self, sql, params):
print(1, sql)
if sql.startswith('SELECT'):
sql = self._validate_sql(sql, params)
cursor = self._con.prepareStatement(sql)
return cursor
if not params:
cursor = self._con.createStatement()
return cursor
cursor = self._con.prepareStatement(sql)
for i, v in enumerate(params, 1):
t = type(v)
if not t in self.TYPES:
if not t in self.DB_TYPES:
error('Type not support')
debug((i, t, v, self.TYPES[t]))
getattr(cursor, self.TYPES[t])(i, v)
debug((i, t, v, self.DB_TYPES[t]))
getattr(cursor, self.DB_TYPES[t])(i, v)
return cursor
@catch_exception
def execute(self, sql, params):
debug(sql, params)
if params:
cursor = self.cursor(sql, params)
cursor.execute()
cursor = self.cursor(sql, params)
if sql.startswith('SELECT'):
result = cursor.executeQuery()
elif params:
result = cursor.executeUpdate()
self._rows_affected = result
self.save()
else:
cursor = self._con.createStatement()
cursor.execute(sql)
self.save()
return cursor
result = cursor.execute(sql)
self.save()
print('RESULT', result)
return result
def _to_python(self, meta, query, index):
i = index + 1
type_field = meta.getColumnTypeName(i)
value = getattr(query, self.PY_TYPES[type_field])(i)
if type_field in self.TYPES_DATE:
value = _struct_to_date(value)
return value
def _query_to_python(self, query):
data = []
meta = query.MetaData
cols = meta.ColumnCount
while query.next():
row = [self._to_python(meta, query, i) for i in range(cols)]
data.append(row)
return data
def select(self, sql):
debug('SELECT', sql)
if not sql.startswith('SELECT'):
return ()
cursor = self._con.prepareStatement(sql)
query = cursor.executeQuery()
# ~ data = self._query_to_python(query)
return BaseQuery(query)
def get_query(self, query):
sql, args = query.sql()
sql = self._validate_sql(sql, args)
return self.select(sql)
class LOMath(LODocument):