dungdm93 / sqlalchemy-trino
Trino (f.k.a. PrestoSQL) dialect for SQLAlchemy.
License: Apache License 2.0
Originally filed as sqlalchemy/sqlalchemy#7905
Here is an issue filed against Pandas in which connecting to a Trino database through SQLAlchemy yields a bad result:
Trino Iceberg connector with Hive metastore.
Here is a reproducible example (copied from the Pandas issue):
#!/usr/bin/env python
from dotenv import dotenv_values, load_dotenv
import osc_ingest_trino as osc
import os
import pathlib
dotenv_dir = os.environ.get('CREDENTIAL_DOTENV_DIR', os.environ.get('PWD', '/opt/app-root/src'))
dotenv_path = pathlib.Path(dotenv_dir) / 'credentials.env'
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path=dotenv_path, override=True)
import trino
from sqlalchemy.engine import create_engine
env_var_prefix = 'TRINO'
sqlstring = 'trino://{user}@{host}:{port}/'.format(
    user=os.environ[f'{env_var_prefix}_USER'],
    host=os.environ[f'{env_var_prefix}_HOST'],
    port=os.environ[f'{env_var_prefix}_PORT'],
)
sqlargs = {
    'auth': trino.auth.JWTAuthentication(os.environ[f'{env_var_prefix}_PASSWD']),
    'http_scheme': 'https',
    'catalog': 'osc_datacommons_dev'
}
engine = create_engine(sqlstring, connect_args = sqlargs)
connection = engine.connect()
ingest_catalog = 'osc_datacommons_dev'
ingest_schema = 'sandbox'
import pandas as pd
ticker_df = pd.DataFrame({'tname':['aapl','msft','goog','amzn','tsla','fb','ge','brk-a'],
'cik':[320193,789019,1652044,1018724,1318605,1326801,40545,1067983]}).convert_dtypes()
ingest_table = 'ticker_test'
columnschema = osc.create_table_schema_pairs(ticker_df)
qres = engine.execute(f"drop table if exists {ingest_catalog}.{ingest_schema}.{ingest_table}")
qres.fetchall()
tabledef = f"""
create table if not exists {ingest_catalog}.{ingest_schema}.{ingest_table}(
{columnschema}
) with (
partitioning = array['bucket(tname,20)'],
format = 'ORC'
)
"""
print(tabledef)
qres = engine.execute(tabledef)
print(qres.fetchall())
ticker_df.to_sql(ingest_table,
                 con=engine, schema=ingest_schema, if_exists='append',
                 index=False,
                 method=osc.TrinoBatchInsert(batch_size=12000, verbose=True))
qres = engine.execute(f"show create table {ingest_schema}.{ingest_table}")
orc_table = qres.fetchall()
print(orc_table)
ticker_df.to_sql(ingest_table,
                 con=engine, schema=ingest_schema, if_exists='replace',
                 index=False,
                 method=osc.TrinoBatchInsert(batch_size=12000, verbose=True))
qres = engine.execute(f"show create table {ingest_schema}.{ingest_table}")
replaced_table = qres.fetchall()
assert(orc_table==replaced_table)
We are using Trino version 373, Pandas 1.4.1, and SQLAlchemy 1.4.27. Here are the details reported to the Pandas team:
python : 3.8.8.final.0
python-bits : 64
OS : Linux
OS-release : 4.18.0-305.34.2.el8_4.x86_64
Version : pandas-dev/pandas#1 SMP Mon Jan 17 09:42:23 EST 2022
machine : x86_64
processor : x86_64
byteorder : little
LC_ALL : en_US.UTF-8
LANG : en_US.UTF-8
LOCALE : en_US.UTF-8
pandas : 1.4.2
numpy : 1.22.3
pytz : 2021.3
dateutil : 2.8.2
pip : 22.0.4
setuptools : 60.9.3
Cython : None
pytest : None
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : 4.6.4
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 3.0.3
IPython : 8.1.1
pandas_datareader: None
bs4 : 4.6.3
bottleneck : None
brotli : None
fastparquet : 0.8.1
fsspec : 2022.3.0
gcsfs : None
markupsafe : 2.1.0
matplotlib : None
numba : None
numexpr : None
odfpy : None
openpyxl : 3.0.9
pandas_gbq : None
pyarrow : 7.0.0
pyreadstat : None
pyxlsb : None
s3fs : None
scipy : None
snappy : None
sqlalchemy : 1.4.27
tables : None
tabulate : 0.8.9
xarray : None
xlrd : None
xlwt : None
zstandard : None
I'm playing around with something like the following, trying to create (or populate) a table with the Trino Iceberg connector:
df = pd.DataFrame({
    "version": [1, 1, 1],
    "value": [1, 2, 3],
})
df.to_sql('test1',
          schema='eje_test_iceberg',
          con=engine,
          method=None,
          if_exists="replace",
          index=False,
          dtype={"version": Integer(), "value": Integer()})
Getting the following error:
NotSupportedError: (trino.exceptions.NotSupportedError)
[SQL: INSERT INTO eje_test_iceberg.test2 (version, value) VALUES (?, ?)]
[parameters: ((1, 1), (1, 2), (1, 3))]
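A workaround in the spirit of osc_ingest_trino.TrinoBatchInsert from the first report is to pass to_sql a custom method callable that inlines values as SQL literals instead of issuing a parameterized INSERT. This is a minimal sketch, assuming the failure comes from Trino rejecting parameterized inserts; render_value is a hypothetical helper that only handles ints and strings:

```python
def render_value(v):
    # Render a Python value as a SQL literal (ints and strings only).
    if isinstance(v, str):
        return "'" + v.replace("'", "''") + "'"
    return str(v)

def literal_insert(table, conn, keys, data_iter):
    # pandas to_sql `method` callable: (table, conn, keys, data_iter).
    # Build one INSERT with inlined literal values instead of a
    # parameterized executemany.
    rows = ", ".join(
        "(" + ", ".join(render_value(v) for v in row) + ")"
        for row in data_iter
    )
    cols = ", ".join(keys)
    sql = f"INSERT INTO {table.schema}.{table.name} ({cols}) VALUES {rows}"
    conn.execute(sql)
```

Hypothetical usage: df.to_sql('test1', con=engine, schema='eje_test_iceberg', index=False, method=literal_insert).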
Hi,
We have a Trino instance with LDAP and HTTPS enabled but we can't seem to get Superset to connect with our Trino instance.
What we did:
The error message we keep getting in the log is as follows:
[2021-02-25 20:00:00 +0800] [240613] [WARNING] Error sending message to statsd
Traceback (most recent call last):
File "/root/superset_env/env/lib/python3.8/site-packages/gunicorn/instrument/statsd.py", line 127, in _sock_send
self.sock.send(msg)
ConnectionRefusedError: [Errno 111] Connection refused
We confirmed the following:
We can't find much documentation online about this issue.
This is a question rather than an issue. I am deciding between using Presto or Trino in Superset to query data in MinIO/S3. My understanding is that Trino is a rebranding of Presto.
Would someone be able to point out the differences between these two drivers in Superset?
Thank you and best regards.
~/anaconda3/lib/python3.8/site-packages/sqlalchemy_trino/result.py in <module>
1 from typing import Optional
2
----> 3 from sqlalchemy.engine.result import ResultProxy, ResultMetaData
4 from trino.client import TrinoQuery, TrinoResult # noqa
5 from trino.dbapi import Cursor
ImportError: cannot import name 'ResultProxy' from 'sqlalchemy.engine.result' (/home/ray/anaconda3/lib/python3.8/site-packages/sqlalchemy/engine/result.py)
Looks like a SQLAlchemy version issue.
I am working in a database that only works with the following configuration:
from trino import dbapi, auth

conn = dbapi.connect(
    host=host,
    port=port,
    user=username,
    catalog=dialect,
    schema=database_name,
    http_scheme='https',
    auth=auth.BasicAuthentication(username, password),
)
conn._http_session.verify = False
Can basic auth, http_scheme, and http_session.verify be provided in the connection URL?
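These options cannot all be encoded in the URL itself, but they can be passed through create_engine's connect_args, which is forwarded to the DB-API connect() shown above. A sketch; whether a given client version accepts 'verify' as a keyword is an assumption, and if it does not, the _http_session workaround above still applies:

```python
from sqlalchemy.engine import create_engine
from trino.auth import BasicAuthentication

# Everything that cannot go into the URL goes into connect_args, which
# sqlalchemy-trino forwards to trino.dbapi.connect().
engine = create_engine(
    'trino://username@host:443/catalog/schema',
    connect_args={
        'http_scheme': 'https',
        'auth': BasicAuthentication('username', 'password'),
        'verify': False,  # assumption: forwarded to the HTTP session
    },
)
```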
Hello,
Since SQLAlchemy 1.4.5, third-party dialects are required to declare whether they support statement caching by setting the supports_statement_cache
attribute. More details in the docs.
If the attribute is not set, warnings like the following are raised:
lib/python3.8/site-packages/sqlalchemy_trino/dialect.py:259: SAWarning: Dialect trino:rest will not make use of SQL compilation caching as it does not set the 'supports_statement_cache' attribute to ``True``.
This can have significant performance implications including some performance degradations in comparison to prior SQLAlchemy versions.
Dialect maintainers should seek to set this attribute to True after appropriate development and testing for SQLAlchemy 1.4 caching support.
Alternatively, this attribute may be set to False which will disable this warning. (Background on this error at: https://sqlalche.me/e/14/cprf)
The Trino JDBC driver has supported the sessionUser connection parameter (for user impersonation) since release 352; see trinodb/trino#6549.
Apache Superset supports impersonation for Trino, but it looks like the implementation (apache/superset#14843) needs adjustment.
Support the sessionUser parameter in the query string, so that it can be sent to Trino via the X-Trino-User header.
Notice that this differs from the user in the SQLAlchemy connection string, which acts as the principal (the default value for X-Trino-User) for authentication purposes.
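A sketch of how the dialect could pull sessionUser out of the connection URL using only the standard library; whether sqlalchemy-trino actually forwards query-string parameters is exactly the open question here:

```python
from urllib.parse import urlparse, parse_qs

url = 'trino://alice@trino-host:443/hive?sessionUser=bob'
parsed = urlparse(url)

# The URL user authenticates (the principal)...
principal = parsed.username

# ...while sessionUser is the impersonated user that would be sent to
# Trino as the X-Trino-User header.
session_user = parse_qs(parsed.query).get('sessionUser', [principal])[0]
```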
Hi, the following code fails with TypeError: can't pickle PyCapsule objects:
import sqlalchemy
import trino
from sqlalchemy.engine import create_engine
from textwrap import dedent

auth = trino.auth.KerberosAuthentication(service_name=KerberosRemoteServiceName,
                                         principal=principal, ca_bundle=ca_bundle)
engine = create_engine(
    f'trino://{username}@{presto_host}:{presto_port_ssl}/',
    connect_args={'auth': auth, 'http_scheme': protocol, 'catalog': catalog},
)
table_name = 'chenhao_test_pandastotrino2'
schema = "uip_chenhao_db"
query = dedent('''
SELECT "table_name"
FROM "information_schema"."tables"
WHERE "table_schema" = :schema
AND "table_name" = :table
''').strip()
sql = sqlalchemy.sql.text(query)
res = engine.execute(sql, schema=schema, table=table_name)
Here is the error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/tmp/ipykernel_2376/3114537824.py in <module>
----> 1 res = engine.execute(sql, schema=schema, table=table_name)
/opt/python3common/lib/python3.7/site-packages/sqlalchemy/engine/base.py in execute(self, statement, *multiparams, **params)
2233
2234 connection = self._contextual_connect(close_with_result=True)
-> 2235 return connection.execute(statement, *multiparams, **params)
2236
2237 def scalar(self, statement, *multiparams, **params):
/opt/python3common/lib/python3.7/site-packages/sqlalchemy/engine/base.py in execute(self, object_, *multiparams, **params)
1009 )
1010 else:
-> 1011 return meth(self, multiparams, params)
1012
1013 def _execute_function(self, func, multiparams, params):
/opt/python3common/lib/python3.7/site-packages/sqlalchemy/sql/elements.py in _execute_on_connection(self, connection, multiparams, params)
296 def _execute_on_connection(self, connection, multiparams, params):
297 if self.supports_execution:
--> 298 return connection._execute_clauseelement(self, multiparams, params)
299 else:
300 raise exc.ObjectNotExecutableError(self)
/opt/python3common/lib/python3.7/site-packages/sqlalchemy/engine/base.py in _execute_clauseelement(self, elem, multiparams, params)
1128 distilled_params,
1129 compiled_sql,
-> 1130 distilled_params,
1131 )
1132 if self._has_events or self.engine._has_events:
/opt/python3common/lib/python3.7/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1315 except BaseException as e:
1316 self._handle_dbapi_exception(
-> 1317 e, statement, parameters, cursor, context
1318 )
1319
/opt/python3common/lib/python3.7/site-packages/sqlalchemy/engine/base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
1512 )
1513 else:
-> 1514 util.raise_(exc_info[1], with_traceback=exc_info[2])
1515
1516 finally:
/opt/python3common/lib/python3.7/site-packages/sqlalchemy/util/compat.py in raise_(***failed resolving arguments***)
180
181 try:
--> 182 raise exception
183 finally:
184 # credit to
/opt/python3common/lib/python3.7/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1275 if not evt_handled:
1276 self.dialect.do_execute(
-> 1277 cursor, statement, parameters, context
1278 )
1279
/opt/python3common/lib/python3.7/site-packages/sqlalchemy_trino/dialect.py in do_execute(self, cursor, statement, parameters, context)
268 def do_execute(self, cursor: Cursor, statement: str, parameters: Tuple[Any, ...],
269 context: DefaultExecutionContext = None):
--> 270 cursor.execute(statement, parameters)
271 if context and context.should_autocommit:
272 # SQL statement only submitted to Trino server when cursor.fetch*() is called.
/opt/python3common/lib/python3.7/site-packages/trino/dbapi.py in execute(self, operation, params)
390 # At this point the query can be deallocated since it has already
391 # been executed
--> 392 self._deallocate_prepare_statement(added_prepare_header, statement_name)
393
394 else:
/opt/python3common/lib/python3.7/site-packages/trino/dbapi.py in _deallocate_prepare_statement(self, added_prepare_header, statement_name)
342 # Send deallocate statement. Copy the _request object to avoid poluting the
343 # one that is going to be used to execute the actual operation.
--> 344 query = trino.client.TrinoQuery(copy.deepcopy(self._request), sql=sql)
345 result = query.execute(
346 additional_http_headers={
/opt/sfdc/python37/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
178 y = x
179 else:
--> 180 y = _reconstruct(x, memo, *rv)
181
182 # If is its own copy, don't memoize.
/opt/sfdc/python37/lib/python3.7/copy.py in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
278 if state is not None:
279 if deep:
--> 280 state = deepcopy(state, memo)
281 if hasattr(y, '__setstate__'):
282 y.__setstate__(state)
/opt/sfdc/python37/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
148 copier = _deepcopy_dispatch.get(cls)
149 if copier:
--> 150 y = copier(x, memo)
151 else:
152 try:
/opt/sfdc/python37/lib/python3.7/copy.py in _deepcopy_dict(x, memo, deepcopy)
238 memo[id(x)] = y
239 for key, value in x.items():
--> 240 y[deepcopy(key, memo)] = deepcopy(value, memo)
241 return y
242 d[dict] = _deepcopy_dict
/opt/sfdc/python37/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
178 y = x
179 else:
--> 180 y = _reconstruct(x, memo, *rv)
181
182 # If is its own copy, don't memoize.
/opt/sfdc/python37/lib/python3.7/copy.py in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
278 if state is not None:
279 if deep:
--> 280 state = deepcopy(state, memo)
281 if hasattr(y, '__setstate__'):
282 y.__setstate__(state)
/opt/sfdc/python37/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
148 copier = _deepcopy_dispatch.get(cls)
149 if copier:
--> 150 y = copier(x, memo)
151 else:
152 try:
/opt/sfdc/python37/lib/python3.7/copy.py in _deepcopy_dict(x, memo, deepcopy)
238 memo[id(x)] = y
239 for key, value in x.items():
--> 240 y[deepcopy(key, memo)] = deepcopy(value, memo)
241 return y
242 d[dict] = _deepcopy_dict
/opt/sfdc/python37/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
178 y = x
179 else:
--> 180 y = _reconstruct(x, memo, *rv)
181
182 # If is its own copy, don't memoize.
/opt/sfdc/python37/lib/python3.7/copy.py in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
278 if state is not None:
279 if deep:
--> 280 state = deepcopy(state, memo)
281 if hasattr(y, '__setstate__'):
282 y.__setstate__(state)
/opt/sfdc/python37/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
148 copier = _deepcopy_dispatch.get(cls)
149 if copier:
--> 150 y = copier(x, memo)
151 else:
152 try:
/opt/sfdc/python37/lib/python3.7/copy.py in _deepcopy_dict(x, memo, deepcopy)
238 memo[id(x)] = y
239 for key, value in x.items():
--> 240 y[deepcopy(key, memo)] = deepcopy(value, memo)
241 return y
242 d[dict] = _deepcopy_dict
/opt/sfdc/python37/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
148 copier = _deepcopy_dispatch.get(cls)
149 if copier:
--> 150 y = copier(x, memo)
151 else:
152 try:
/opt/sfdc/python37/lib/python3.7/copy.py in _deepcopy_dict(x, memo, deepcopy)
238 memo[id(x)] = y
239 for key, value in x.items():
--> 240 y[deepcopy(key, memo)] = deepcopy(value, memo)
241 return y
242 d[dict] = _deepcopy_dict
/opt/sfdc/python37/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
167 reductor = getattr(x, "__reduce_ex__", None)
168 if reductor:
--> 169 rv = reductor(4)
170 else:
171 reductor = getattr(x, "__reduce__", None)
TypeError: can't pickle PyCapsule objects
Can anyone please take a look? Thanks!
I tried both sqlalchemy-trino 0.3 and 0.4, but got the same error.
This is a follow-up to trinodb/trino-python-client#79 (released in 0.306.0), which introduced JWT authentication in the Trino Python client.
A data scientist could use the pandas API to create a dataframe from a SQL query against a Trino instance that requires JWT authentication.
For instance:
from sqlalchemy import create_engine
import pandas as pd
engine = create_engine(
    'trino://<host>:<port>',
    connect_args={'jwt_token': 'a-jwt-token'},
)
sql_df = pd.read_sql(
    "SELECT * FROM catalog-a.schema-b.table-c",
    con=engine,
    parse_dates=[
        'created_at',
    ],
)
Notice that the above code example already works for a Trino instance protected by user/password authentication.
In our deployment, we are using Superset to connect to Trino. We recently forked Trino and now build our own images (so we can incorporate our changes before they are merged upstream). However, the following line is throwing:
WARNING:superset.views.base:invalid literal for int() with base 10: '354-161-g84416f6'
sqlalchemy-trino/sqlalchemy_trino/dialect.py
Line 267 in 5af0cc9
After much digging, it seems that the Airlift parent POM sets the Implementation-Version field in the MANIFEST.MF file to the value returned by git describe. That value happens to be parsable as an integer when the commit is from a tag creation (since Trino's tags are integers, e.g. '355'), but any other build will have an Implementation-Version matching the format above.
So my question is... Is there a reason that line 267 tries to coerce this value into an int?
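A tolerant parser, sketched below (the real fix would live in dialect.py), keeps only the leading tag number of whatever git describe produced:

```python
import re

def parse_server_version(raw):
    # 'git describe' output is either a plain tag ('355') or
    # '<tag>-<commits>-g<sha>' ('354-161-g84416f6'); keep the leading tag
    # instead of int()-ing the whole string.
    m = re.match(r'(\d+)', raw)
    return int(m.group(1)) if m else None
```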
some relevant package versions:
pandas==1.3.2
SQLAlchemy==1.3.0
sqlalchemy-trino==0.3.0
trino==0.305.0
Code:
# running in kubernetes
sqlstring = 'trino://user:[email protected]:443/hive'
engine = sqlalchemy.create_engine(sqlstring)
pd.read_sql('select * from hive.team1.cat', engine)
Error:
/opt/app-root/lib64/python3.8/site-packages/sqlalchemy_trino/result.py in _load(self)
18 return
19 if not self.__cursor.description:
---> 20 raise Exception("Cursor must be loaded before call TrinoResultMetaData")
21 self.__delegator = ResultMetaData(self.__parent, self.__cursor.description)
22
Exception: Cursor must be loaded before call TrinoResultMetaData
Currently, the default catalog for the connection string is set to hive. This is a problem if no catalog is specified in the connection URL: the hive catalog is then used even if it is not configured on the Trino side, which leads to a connection error in the Superset UI. To avoid this it is better to use a generic catalog such as 'system'.
Slack Thread Reference: https://trinodb.slack.com/archives/CP1MUNEUX/p1619614580484800
If the connection string does not contain a catalog name, the Superset UI shows a connection error.
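Under that proposal, the connection string would default to the built-in system catalog, which exists on every Trino cluster; a sketch with placeholder credentials:

```python
# 'system' is always present, unlike 'hive', so it is a safe default
# catalog for the connection URL.
default_catalog = 'system'
sqlstring = 'trino://{user}@{host}:{port}/{catalog}'.format(
    user='admin', host='localhost', port=8080, catalog=default_catalog)
```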
Recently, when I was trying sqlalchemy-trino with Superset, I was getting an error in spite of setting the correct driver per the Superset documentation. Commit e1289d4 breaks this library for older versions of Trino, specifically version 351 or below, because the version() function that sqlalchemy-trino uses as its initial query while connecting is not available there:
trino> select version();
Query 20210417_051800_00001_e3jb2 failed: line 1:8: Function 'version' not registered
select version()
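A defensive sketch of the version probe (not the library's actual code): fall back gracefully when version() is not registered on Trino 351 or below. The execute callable here is a hypothetical hook standing in for the dialect's scalar query execution:

```python
def get_server_version_info(execute):
    # `execute` runs a SQL string and returns a scalar; hypothetical hook.
    try:
        return execute('SELECT version()')
    except Exception:
        # Trino <= 351 has no version() function; report unknown instead
        # of failing the whole connection.
        return None
```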
Hi,
we are using Trino with Superset and Iceberg to process and persist our data. We found out that when we use data backed by Iceberg whose schema contains a timestamp-type field, Superset is unable to download its schema. It fails at
sqlalchemy-trino/sqlalchemy_trino/datatype.py
Line 145 in 5a01b48
In Superset logs, I can see this error.
ERROR:root:too many values to unpack (expected 2)
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/flask_appbuilder/api/__init__.py", line 84, in wraps
return f(self, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/superset/views/base_api.py", line 80, in wraps
duration, response = time_function(f, self, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/superset/utils/core.py", line 1368, in time_function
response = func(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/superset/utils/log.py", line 224, in wrapper
value = f(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/superset/databases/api.py", line 489, in table_metadata
table_info = get_table_metadata(database, table_name, schema_name)
File "/usr/local/lib/python3.8/site-packages/superset/databases/utils.py", line 73, in get_table_metadata
indexes = get_indexes_metadata(database, table_name, schema_name)
File "/usr/local/lib/python3.8/site-packages/superset/databases/utils.py", line 38, in get_indexes_metadata
indexes = database.get_indexes(table_name, schema_name)
File "/usr/local/lib/python3.8/site-packages/superset/models/core.py", line 624, in get_indexes
indexes = self.inspector.get_indexes(table_name, schema)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/reflection.py", line 513, in get_indexes
return self.dialect.get_indexes(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy_trino/dialect.py", line 192, in get_indexes
partitioned_columns = self._get_columns(connection, f'{table_name}$partitions', schema, **kw)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy_trino/dialect.py", line 118, in _get_columns
type=datatype.parse_sqltype(record.data_type),
File "/usr/local/lib/python3.8/site-packages/sqlalchemy_trino/datatype.py", line 145, in parse_sqltype
name, attr_type_str = split(attr_str.strip(), delimiter=' ')
ValueError: too many values to unpack (expected 2)
I am quite sure that the problem is with this line:
sqlalchemy-trino/sqlalchemy_trino/datatype.py
Line 145 in 5a01b48
When I ran the SQL query for retrieving data types, I found data types that are not handled correctly.
Specifically, there is the data type row(min timestamp(6) with time zone, max timestamp(6) with time zone, null_count bigint), which is split on the ',' character, yielding fragments like "min timestamp(6) with time zone" that the code then tries to split on ' ' into exactly two attributes.
Do you have any suggestions on how to solve it?
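A minimal sketch of a fix for that split: break each attribute definition on the first space only, so multi-word types such as timestamp(6) with time zone stay intact. The function name is hypothetical:

```python
def parse_attr(attr_str):
    # 'min timestamp(6) with time zone'
    #   -> ('min', 'timestamp(6) with time zone')
    # Splitting on the FIRST space keeps multi-word type names whole.
    name, attr_type_str = attr_str.strip().split(' ', 1)
    return name, attr_type_str
```

Note that splitting the row(...) body on ',' would still break for types with commas inside parentheses (e.g. decimal(10,2)); that needs a parenthesis-aware split and is a separate problem.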
Thanks for creating the library! When I run the following code, I get the exception below:
import sqlalchemy as db
from sqlalchemy import text

engine = db.create_engine('trino://CREDENTIALS@TRINO_URL:443/metriql')
with engine.connect() as con:
    statement = text("""SELECT 1""")
    rs = con.execute(statement)
    for row in rs:
        print(row)
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2262, in _wrap_pool_connect
return fn()
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 294, in unique_connection
return _ConnectionFairy._checkout(self)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 751, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 483, in checkout
rec = pool._do_get()
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/pool/impl.py", line 138, in _do_get
self._dec_overflow()
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 129, in reraise
raise value
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/pool/impl.py", line 135, in _do_get
return self._create_connection()
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 299, in _create_connection
return _ConnectionRecord(self)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 428, in __init__
self.__connect(first_connect_check=True)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 638, in __connect
pool.dispatch.first_connect.for_modify(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/event/attr.py", line 287, in exec_once
self(*args, **kw)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/event/attr.py", line 297, in __call__
fn(*args, **kw)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 1443, in go
return once_fn(*arg, **kw)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/strategies.py", line 199, in first_connect
dialect.initialize(c)
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 288, in initialize
self.server_version_info = self._get_server_version_info(
File "/usr/local/lib/python3.9/site-packages/sqlalchemy_trino/dialect.py", line 270, in _get_server_version_info
res = connection.execute(sql.text(query)).scalar()
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 1315, in scalar
row = self.first()
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 1300, in first
return self.process_rows([row])[0]
File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 1181, in process_rows
keymap = metadata._keymap
File "/usr/local/lib/python3.9/site-packages/sqlalchemy_trino/result.py", line 24, in __getattr__
self._load()
File "/usr/local/lib/python3.9/site-packages/sqlalchemy_trino/result.py", line 20, in _load
raise Exception("Cursor must be loaded before call TrinoResultMetaData")
Exception: Cursor must be loaded before call TrinoResultMetaData
Trino returns metadata lazily because fetching it can be expensive, but the library assumes that metadata is available as soon as the query is constructed.
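The pattern the dialect would need can be sketched like this (names are hypothetical, not the library's): defer metadata construction and, if the cursor has no description yet, fetch one row to force query submission, buffering it so it is not lost:

```python
class LazyMetaData:
    # Sketch: defer building result metadata until the cursor actually
    # has a description, forcing a fetch if necessary.
    def __init__(self, cursor):
        self._cursor = cursor
        self._first_row = None

    def load(self):
        if self._cursor.description is None:
            # Trino only submits the query on fetch; pull one row and
            # buffer it so the caller can still consume it later.
            self._first_row = self._cursor.fetchone()
        return self._cursor.description
```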
The current implementation keeps using the default TCP port 8080 from the underlying DB-API driver even when using HTTPS:
sqlalchemy-trino/sqlalchemy_trino/dialect.py
Lines 92 to 94 in 5aea362
This should either be documented, or the connection will fail with a connection timeout to port 8080.
I found this error message from superset:
F [SupersetError(message="HTTPSConnectionPool(host='<trino-url>', port=8080): Max retries exceeded with url: /v1/statement (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x7f722688acd0>, 'Connection to <trino-url> timed out. (connect timeout=30.0)'))", error_type=<SupersetErrorType.GENERIC_DB_ENGINE_ERROR: 'GENERIC_DB_ENGINE_ERROR'>, level=<ErrorLevel.ERROR: 'error'>, extra={'engine_name': 'Trino', 'issue_codes': [{'code': 1002, 'message': 'Issue 1002 - The database returned an unexpected error.'}]})]
This is obviously a problem with the port number (the HTTPS port should be used instead of 8080).
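A sketch of the expected fix (function name hypothetical): derive the default port from the HTTP scheme instead of hard-coding 8080:

```python
def default_port(http_scheme, explicit_port=None):
    # Prefer a port given explicitly in the URL; otherwise follow the
    # scheme rather than hard-coding 8080.
    if explicit_port is not None:
        return explicit_port
    return 443 if http_scheme == 'https' else 8080
```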
Hi. I have installed the Trino database driver and successfully connected Superset to Trino (which itself is connected to a Pinot database). However, I have encountered issues when trying to add a table to Superset. The following is the error message I get in the console when attempting to add it:
trino.exceptions.TrinoUserError: TrinoUserError(type=USER_ERROR, name=NOT_FOUND, message="Table 'default.tablename$partitions' not found", query_id=20210303_142123_00066_gfkn6)
According to Beto Dealmeida, it is a bug in the connector, triggered when the call to partitions is made while trying to figure out the latest partition.
When Superset tries to obtain metadata on a Pinot table to show the table schema, it fails with an "An error occurred while fetching table metadata" message.
Looking at the superset logs I see the following error:
ERROR:root:TrinoUserError(type=USER_ERROR, name=NOT_FOUND, message="Table 'default.hitexecutionview$partitions' not found", query_id=20210616_104957_00099_unjsj)
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/flask_appbuilder/api/__init__.py", line 84, in wraps
return f(self, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/superset/utils/log.py", line 70, in wrapper
value = f(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/superset/views/base_api.py", line 79, in wraps
duration, response = time_function(f, self, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/superset/utils/core.py", line 1311, in time_function
response = func(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/superset/databases/api.py", line 454, in table_metadata
table_info = get_table_metadata(database, table_name, schema_name)
File "/usr/local/lib/python3.8/site-packages/superset/databases/utils.py", line 73, in get_table_metadata
indexes = get_indexes_metadata(database, table_name, schema_name)
File "/usr/local/lib/python3.8/site-packages/superset/databases/utils.py", line 38, in get_indexes_metadata
indexes = database.get_indexes(table_name, schema_name)
File "/usr/local/lib/python3.8/site-packages/superset/models/core.py", line 623, in get_indexes
return self.inspector.get_indexes(table_name, schema)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/reflection.py", line 513, in get_indexes
return self.dialect.get_indexes(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy_trino/dialect.py", line 195, in get_indexes
partitioned_columns = self._get_columns(connection, f'{table_name}$partitions', schema, **kw)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy_trino/dialect.py", line 118, in _get_columns
for record in res:
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/result.py", line 1010, in __iter__
row = self.fetchone()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/result.py", line 1343, in fetchone
self.connection._handle_dbapi_exception(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1514, in _handle_dbapi_exception
util.raise_(exc_info[1], with_traceback=exc_info[2])
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/result.py", line 1336, in fetchone
row = self._fetchone_impl()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/result.py", line 1215, in _fetchone_impl
return self.cursor.fetchone()
File "/usr/local/lib/python3.8/site-packages/trino/dbapi.py", line 427, in fetchone
return next(self._iterator)
File "/usr/local/lib/python3.8/site-packages/trino/client.py", line 456, in __iter__
rows = self._query.fetch()
File "/usr/local/lib/python3.8/site-packages/trino/client.py", line 532, in fetch
status = self._request.process(response)
File "/usr/local/lib/python3.8/site-packages/trino/client.py", line 402, in process
raise self._process_error(response["error"], response.get("id"))
trino.exceptions.TrinoUserError: TrinoUserError(type=USER_ERROR, name=NOT_FOUND, message="Table 'default.hitexecutionview$partitions' not found", query_id=20210616_104957_00099_unjsj)
It looks like the sqlalchemy-trino package is not setting the catalog to pinot?
sqlalchemy-trino version: 0.3.0
I am getting errors while connecting to a Trino server configured with LDAP/SSL.
>>> from sqlalchemy import *
>>> from sqlalchemy.engine import create_engine
>>> from sqlalchemy.schema import *
>>> engine = create_engine('trino://trino:[email protected]:8443/example/', connect_args={ "verify": "/u01/venv/lib/python3.8/site-packages/superset/trino.pem","auth": "ldap"})
>>>
>>> with engine.connect() as con:
... rs = con.execute('SELECT now()')
... for row in rs:
... print(row)
...
Traceback (most recent call last):
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1204, in _execute_context
context = constructor(dialect, self, conn, *args)
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 821, in _init_compiled
self.cursor = self.create_cursor()
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 1188, in create_cursor
return self._dbapi_connection.cursor()
File "/u01/venv/lib/python3.8/site-packages/trino/dbapi.py", line 169, in cursor
request = self._create_request()
File "/u01/venv/lib/python3.8/site-packages/trino/dbapi.py", line 144, in _create_request
return trino.client.TrinoRequest(
File "/u01/venv/lib/python3.8/site-packages/trino/client.py", line 236, in __init__
self._auth.set_http_session(self._http_session)
AttributeError: 'str' object has no attribute 'set_http_session'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2263, in connect
return self._connection_cls(self, **kwargs)
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 104, in __init__
else engine.raw_connection()
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2369, in raw_connection
return self._wrap_pool_connect(
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2336, in _wrap_pool_connect
return fn()
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 304, in unique_connection
return _ConnectionFairy._checkout(self)
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 778, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 495, in checkout
rec = pool._do_get()
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 140, in _do_get
self._dec_overflow()
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.raise_(
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 137, in _do_get
return self._create_connection()
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 309, in _create_connection
return _ConnectionRecord(self)
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 440, in __init__
self.__connect(first_connect_check=True)
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 664, in __connect
pool.dispatch.first_connect.for_modify(
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/event/attr.py", line 314, in exec_once_unless_exception
self._exec_once_impl(True, *args, **kw)
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/event/attr.py", line 285, in _exec_once_impl
self(*args, **kw)
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/event/attr.py", line 322, in __call__
fn(*args, **kw)
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 1406, in go
return once_fn(*arg, **kw)
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/strategies.py", line 199, in first_connect
dialect.initialize(c)
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 311, in initialize
self.server_version_info = self._get_server_version_info(
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy_trino/dialect.py", line 259, in _get_server_version_info
res = connection.execute(sql.text(query)).scalar()
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
ret = self._execute_context(
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1206, in _execute_context
self._handle_dbapi_exception(
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
util.raise_(
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1204, in _execute_context
context = constructor(dialect, self, conn, *args)
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 821, in _init_compiled
self.cursor = self.create_cursor()
File "/u01/venv/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 1188, in create_cursor
return self._dbapi_connection.cursor()
File "/u01/venv/lib/python3.8/site-packages/trino/dbapi.py", line 169, in cursor
request = self._create_request()
File "/u01/venv/lib/python3.8/site-packages/trino/dbapi.py", line 144, in _create_request
return trino.client.TrinoRequest(
File "/u01/venv/lib/python3.8/site-packages/trino/client.py", line 236, in __init__
self._auth.set_http_session(self._http_session)
sqlalchemy.exc.StatementError: (builtins.AttributeError) 'str' object has no attribute 'set_http_session'
[SQL: SELECT version()]
>>>
>>>
I am not well versed in Python; I'd like to know where the issue could be.
Python Version: 3.8
sqlalchemy_trino-0.4.1
Trino 372
Thanks
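For what it's worth, the traceback shows that `auth` reached `trino.client.TrinoRequest` as a plain string (the JWT token itself) rather than an `Authentication` object. A minimal stand-in illustrating the failure mode (class and argument names here are illustrative, not the real trino client):

```python
# Stand-in for trino.client.TrinoRequest: the client calls
# auth.set_http_session(...), which a bare str does not have.
class FakeTrinoRequest:
    def __init__(self, auth=None):
        self._auth = auth
        if self._auth is not None:
            # trino.client calls this on whatever it was given
            self._auth.set_http_session(None)

try:
    FakeTrinoRequest(auth="my-jwt-token")  # a bare token string
except AttributeError as exc:
    print(exc)  # 'str' object has no attribute 'set_http_session'
```

The usual remedy is to pass `trino.auth.JWTAuthentication(token)` through `connect_args={"auth": ...}` rather than letting the token arrive as a string.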
It seems like the official Trino Python DB-API client now supports SQLAlchemy bindings, so I was wondering whether this package should be deprecated.
Currently the only way I know to tell sqlalchemy-trino the Trino catalog name is in the connection URL.
That works pretty well, but it would be conceptually cleaner to support df.to_sql(catalog=name, schema=name ...)
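To illustrate the current behavior, the catalog is packed into the URL's database portion as `<catalog>/<schema>`, so today it takes one engine per catalog (host and names below are placeholders):

```python
# Sketch: SQLAlchemy parses everything after host:port/ as the "database",
# which sqlalchemy-trino then splits into catalog and schema.
from sqlalchemy.engine.url import make_url

url = make_url("trino://user@host:8080/hive/default")
print(url.database)  # hive/default
```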
Following the example in the README:
from sqlalchemy.engine import create_engine
import pandas as pd

def load_df():
    engine = create_engine("trino://trino:8080/tpch/sf1")
    with engine.connect() as connection:
        df = pd.read_sql("SELECT * FROM public.foobar", connection)
        print(df)
I get the following error:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy_trino.dialect.TrinoDialect object at 0x4025246be0>, dbapi_conn = <trino.dbapi.Connection object at 0x402526ad00>
    def get_isolation_level(self, dbapi_conn: trino_dbapi.Connection) -> str:
        level_names = ['AUTOCOMMIT',
                       'READ_UNCOMMITTED',
                       'READ_COMMITTED',
                       'REPEATABLE_READ',
                       'SERIALIZABLE']
>       return level_names[dbapi_conn.isolation_level]
E       TypeError: list indices must be integers or slices, not IsolationLevel
/usr/local/lib/python3.8/site-packages/sqlalchemy_trino/dialect.py:322: TypeError
It's probably me being silly somewhere. But I'm new to Trino and any help will be appreciated.
Many Thanks
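For reference, the TypeError above happens because an Enum member is not an int, so it cannot index a list directly; coercing to its `.value` sidesteps it. A hedged sketch (the enum here is a stand-in for trino's IsolationLevel, not the real class):

```python
# Stand-in for trino.transaction.IsolationLevel
from enum import Enum

class IsolationLevel(Enum):
    AUTOCOMMIT = 0
    READ_UNCOMMITTED = 1
    READ_COMMITTED = 2
    REPEATABLE_READ = 3
    SERIALIZABLE = 4

LEVEL_NAMES = ['AUTOCOMMIT', 'READ_UNCOMMITTED', 'READ_COMMITTED',
               'REPEATABLE_READ', 'SERIALIZABLE']

def get_isolation_level_name(level) -> str:
    # Accept either the enum member or a raw int
    idx = level.value if isinstance(level, Enum) else int(level)
    return LEVEL_NAMES[idx]

print(get_isolation_level_name(IsolationLevel.SERIALIZABLE))  # SERIALIZABLE
```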
Hello,
I've connected Trino to Superset and am able to load data into Superset from Trino. I have tried to enable CSV upload to my Trino cluster from Superset, but am receiving an error:
Unable to upload CSV file "sqllab_query_sf_population_polygons_20210315T102218.csv" to table "sample" in database "LAKE". Error message: 'Cursor' object has no attribute 'lastrowid'
I'm assuming this is a sqlalchemy-trino issue, as that is the driver I am using. CSV upload to other databases, such as Postgres, works fine.
Here is an example of my dataset.
> cat sqllab_query_sf_population_polygons_20210315T102218.csv | column -t -s ","
zipcode population area
94107 26599 6.11
94105 5846 1.23
94129 3183 6.11
94121 41203 5.69
94118 38319 4.07
94123 23088 2.74
94133 26237 1.91
94109 55984 2.74
94111 3713 0.9
94104 406 0.18
I appreciate that there are a lot of moving parts here, and that the error could be in Superset, or pandas, or sqlalchemy-trino, or Trino, or my MinIO-based backing store.
Any suggestions on how I might go about eliminating sqlalchemy-trino as the root cause?
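One way to narrow this down: PEP 249 (DB-API 2.0) treats `lastrowid` as an optional cursor attribute, so a driver may legitimately omit it, and checking the cursor directly bypasses Superset and pandas entirely. The stdlib sqlite3 driver is used below only to show the check; the same `hasattr` probe against a trino cursor would confirm whether the driver layer is the culprit:

```python
# DB-API 2.0 makes lastrowid optional; sqlite3 happens to provide it.
import sqlite3

cur = sqlite3.connect(":memory:").cursor()
print(hasattr(cur, "lastrowid"))  # True
```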
Thanks for putting this project together! It's really great :)
We started using it in our analytics work and ran into an issue when trying to leverage the pandas/sqlalchemy integration. The following code throws the error "Cursor must be loaded before call TrinoResultMetaData":
from sqlalchemy import create_engine
import pandas as pd
engine = create_engine("trino://localhost:8086/s3_testing")
df = pd.read_sql("SELECT 1", engine)
Output:
~/anaconda/lib/python3.7/site-packages/sqlalchemy_trino/result.py in _load(self)
18 return
19 if not self._is_fetched():
---> 20 raise Exception("Cursor must be loaded before call TrinoResultMetaData")
21 self.__delegator = ResultMetaData(self.__parent, self.__cursor.description)
22
Exception: Cursor must be loaded before call TrinoResultMetaData
Any idea what might cause that?
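A hedged workaround that sometimes helps with metadata-timing issues like this is to hand pandas an explicit live connection instead of the bare engine, so results are read from an already-fetched cursor. Demonstrated against in-memory SQLite below; substitute the `trino://` URL in practice:

```python
from sqlalchemy import create_engine
import pandas as pd

# In-memory SQLite stands in for the Trino engine from the report
engine = create_engine("sqlite://")
with engine.connect() as conn:
    df = pd.read_sql("SELECT 1 AS one", conn)
print(int(df["one"].iloc[0]))  # 1
```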
While trying to fetch table comments, it does not work if the catalog does not support the feature. I got an error with the Iceberg catalog:
(snip)
trino.exceptions.TrinoUserError: TrinoUserError(type=USER_ERROR, name=NOT_SUPPORTED, message="Invalid Iceberg table name (unknown type 'properties'): mytbl$properties", query_id=20210502_013728_03976_krdnf)
Add support for args/params so that extraCredentials can be passed via headers to Trino.
Thinking this link may be of use:
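For context, Trino accepts extra credentials as comma-separated key=value pairs in the X-Trino-Extra-Credential HTTP header. A sketch of the formatting the dialect would need to forward (the helper name is hypothetical, not part of any existing API):

```python
# Hypothetical helper: format extra credentials the way the
# X-Trino-Extra-Credential header expects ("key=value", comma-separated).
def build_extra_credential_header(creds: dict) -> str:
    return ", ".join(f"{key}={value}" for key, value in creds.items())

print(build_extra_credential_header({"s3.access-key": "abc"}))  # s3.access-key=abc
```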