fix(sqllab): reinstate "Force trino client async execution" (#25680)
This commit is contained in:
parent
e7cdfeeb2c
commit
4e94fc4cff
|
|
@ -52,12 +52,12 @@ Some of the recommended packages are shown below. Please refer to [setup.py](htt
|
||||||
| [Oracle](/docs/databases/oracle) | `pip install cx_Oracle` | `oracle://` |
|
| [Oracle](/docs/databases/oracle) | `pip install cx_Oracle` | `oracle://` |
|
||||||
| [PostgreSQL](/docs/databases/postgres) | `pip install psycopg2` | `postgresql://<UserName>:<DBPassword>@<Database Host>/<Database Name>` |
|
| [PostgreSQL](/docs/databases/postgres) | `pip install psycopg2` | `postgresql://<UserName>:<DBPassword>@<Database Host>/<Database Name>` |
|
||||||
| [Presto](/docs/databases/presto) | `pip install pyhive` | `presto://` |
|
| [Presto](/docs/databases/presto) | `pip install pyhive` | `presto://` |
|
||||||
| [Rockset](/docs/databases/rockset) | `pip install rockset-sqlalchemy` | `rockset://<api_key>:@<api_server>`
|
| [Rockset](/docs/databases/rockset) | `pip install rockset-sqlalchemy` | `rockset://<api_key>:@<api_server>` |
|
||||||
| [SAP Hana](/docs/databases/hana) | `pip install hdbcli sqlalchemy-hana or pip install apache-superset[hana]` | `hana://{username}:{password}@{host}:{port}` |
|
| [SAP Hana](/docs/databases/hana) | `pip install hdbcli sqlalchemy-hana or pip install apache-superset[hana]` | `hana://{username}:{password}@{host}:{port}` |
|
||||||
| [StarRocks](/docs/databases/starrocks) | `pip install starrocks` | `starrocks://<User>:<Password>@<Host>:<Port>/<Catalog>.<Database>` |
|
| [StarRocks](/docs/databases/starrocks) | `pip install starrocks` | `starrocks://<User>:<Password>@<Host>:<Port>/<Catalog>.<Database>` |
|
||||||
| [Snowflake](/docs/databases/snowflake) | `pip install snowflake-sqlalchemy` | `snowflake://{user}:{password}@{account}.{region}/{database}?role={role}&warehouse={warehouse}` |
|
| [Snowflake](/docs/databases/snowflake) | `pip install snowflake-sqlalchemy` | `snowflake://{user}:{password}@{account}.{region}/{database}?role={role}&warehouse={warehouse}` |
|
||||||
| SQLite | No additional library needed | `sqlite://` |
|
| SQLite | No additional library needed | `sqlite://path/to/file.db?check_same_thread=false` |
|
||||||
| [SQL Server](/docs/databases/sql-server) | `pip install pymssql` | `mssql+pymssql://` |
|
| [SQL Server](/docs/databases/sql-server) | `pip install pymssql` | `mssql+pymssql://` |
|
||||||
| [Teradata](/docs/databases/teradata) | `pip install teradatasqlalchemy` | `teradatasql://{user}:{password}@{host}` |
|
| [Teradata](/docs/databases/teradata) | `pip install teradatasqlalchemy` | `teradatasql://{user}:{password}@{host}` |
|
||||||
| [TimescaleDB](/docs/databases/timescaledb) | `pip install psycopg2` | `postgresql://<UserName>:<DBPassword>@<Database Host>:<Port>/<Database Name>` |
|
| [TimescaleDB](/docs/databases/timescaledb) | `pip install psycopg2` | `postgresql://<UserName>:<DBPassword>@<Database Host>:<Port>/<Database Name>` |
|
||||||
| [Trino](/docs/databases/trino) | `pip install trino` | `trino://{username}:{password}@{hostname}:{port}/{catalog}` |
|
| [Trino](/docs/databases/trino) | `pip install trino` | `trino://{username}:{password}@{hostname}:{port}/{catalog}` |
|
||||||
|
|
|
||||||
|
|
@ -142,7 +142,7 @@ Another workaround is to change where superset stores the sqlite database by add
|
||||||
`superset_config.py`:
|
`superset_config.py`:
|
||||||
|
|
||||||
```
|
```
|
||||||
SQLALCHEMY_DATABASE_URI = 'sqlite:////new/location/superset.db'
|
SQLALCHEMY_DATABASE_URI = 'sqlite:////new/location/superset.db?check_same_thread=false'
|
||||||
```
|
```
|
||||||
|
|
||||||
You can read more about customizing Superset using the configuration file
|
You can read more about customizing Superset using the configuration file
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,9 @@ SECRET_KEY = 'YOUR_OWN_RANDOM_GENERATED_SECRET_KEY'
|
||||||
# superset metadata (slices, connections, tables, dashboards, ...).
|
# superset metadata (slices, connections, tables, dashboards, ...).
|
||||||
# Note that the connection information to connect to the datasources
|
# Note that the connection information to connect to the datasources
|
||||||
# you want to explore are managed directly in the web UI
|
# you want to explore are managed directly in the web UI
|
||||||
SQLALCHEMY_DATABASE_URI = 'sqlite:////path/to/superset.db'
|
# The check_same_thread=false property ensures the sqlite client does not attempt
|
||||||
|
# to enforce single-threaded access, which may be problematic in some edge cases
|
||||||
|
SQLALCHEMY_DATABASE_URI = 'sqlite:////path/to/superset.db?check_same_thread=false'
|
||||||
|
|
||||||
# Flask-WTF flag for CSRF
|
# Flask-WTF flag for CSRF
|
||||||
WTF_CSRF_ENABLED = True
|
WTF_CSRF_ENABLED = True
|
||||||
|
|
|
||||||
|
|
@ -184,7 +184,10 @@ SQLALCHEMY_TRACK_MODIFICATIONS = False
|
||||||
SECRET_KEY = os.environ.get("SUPERSET_SECRET_KEY") or CHANGE_ME_SECRET_KEY
|
SECRET_KEY = os.environ.get("SUPERSET_SECRET_KEY") or CHANGE_ME_SECRET_KEY
|
||||||
|
|
||||||
# The SQLAlchemy connection string.
|
# The SQLAlchemy connection string.
|
||||||
SQLALCHEMY_DATABASE_URI = "sqlite:///" + os.path.join(DATA_DIR, "superset.db")
|
SQLALCHEMY_DATABASE_URI = (
|
||||||
|
f"""sqlite:///{os.path.join(DATA_DIR, "superset.db")}?check_same_thread=false"""
|
||||||
|
)
|
||||||
|
|
||||||
# SQLALCHEMY_DATABASE_URI = 'mysql://myapp@localhost/myapp'
|
# SQLALCHEMY_DATABASE_URI = 'mysql://myapp@localhost/myapp'
|
||||||
# SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp'
|
# SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp'
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1066,6 +1066,24 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
|
||||||
query object"""
|
query object"""
|
||||||
# TODO: Fix circular import error caused by importing sql_lab.Query
|
# TODO: Fix circular import error caused by importing sql_lab.Query
|
||||||
|
|
||||||
|
@classmethod
def execute_with_cursor(
    cls, cursor: Any, sql: str, query: Query, session: Session
) -> None:
    """
    Run a SQL statement on a cursor and then process that cursor.

    The default behaviour is simply `execute` (requested with
    `async_=True`) followed by `handle_cursor`. Engine specs whose client
    library cannot run a statement asynchronously (e.g. Trino) can override
    this hook to do something more involved, so that information on the
    cursor is still picked up in a timely manner and operations such as
    stopping a query keep working.

    :param cursor: Cursor the statement is executed on
    :param sql: SQL statement to execute
    :param query: Query model the execution belongs to; passed on to
        `handle_cursor`
    :param session: Session passed on to `handle_cursor`
    """
    logger.debug("Query %d: Running query: %s", query.id, sql)
    cls.execute(cursor, sql, async_=True)

    logger.debug("Query %d: Handling cursor", query.id)
    cls.handle_cursor(cursor, query, session)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def extract_error_message(cls, ex: Exception) -> str:
|
def extract_error_message(cls, ex: Exception) -> str:
|
||||||
return f"{cls.engine} error: {cls._extract_error_message(ex)}"
|
return f"{cls.engine} error: {cls._extract_error_message(ex)}"
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,8 @@ from __future__ import annotations
|
||||||
|
|
||||||
import contextlib
|
import contextlib
|
||||||
import logging
|
import logging
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
from typing import Any, TYPE_CHECKING
|
from typing import Any, TYPE_CHECKING
|
||||||
|
|
||||||
import simplejson as json
|
import simplejson as json
|
||||||
|
|
@ -151,14 +153,21 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None:
|
def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None:
|
||||||
if tracking_url := cls.get_tracking_url(cursor):
|
"""
|
||||||
query.tracking_url = tracking_url
|
Handle a trino client cursor.
|
||||||
|
|
||||||
|
WARNING: if you execute a query, it will block until complete and you
|
||||||
|
will not be able to handle the cursor until complete. Use
|
||||||
|
`execute_with_cursor` instead, to handle this asynchronously.
|
||||||
|
"""
|
||||||
|
|
||||||
# Adds the executed query id to the extra payload so the query can be cancelled
|
# Adds the executed query id to the extra payload so the query can be cancelled
|
||||||
query.set_extra_json_key(
|
cancel_query_id = cursor.query_id
|
||||||
key=QUERY_CANCEL_KEY,
|
logger.debug("Query %d: queryId %s found in cursor", query.id, cancel_query_id)
|
||||||
value=(cancel_query_id := cursor.stats["queryId"]),
|
query.set_extra_json_key(key=QUERY_CANCEL_KEY, value=cancel_query_id)
|
||||||
)
|
|
||||||
|
if tracking_url := cls.get_tracking_url(cursor):
|
||||||
|
query.tracking_url = tracking_url
|
||||||
|
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
|
|
@ -173,6 +182,51 @@ class TrinoEngineSpec(PrestoBaseEngineSpec):
|
||||||
|
|
||||||
super().handle_cursor(cursor=cursor, query=query, session=session)
|
super().handle_cursor(cursor=cursor, query=query, session=session)
|
||||||
|
|
||||||
|
@classmethod
def execute_with_cursor(
    cls, cursor: Any, sql: str, query: Query, session: Session
) -> None:
    """
    Trigger execution of a query and handle the resulting cursor.

    Trino's client blocks until the query is complete, so the statement is
    run in a worker thread while this thread waits for the query ID to
    appear on the cursor and then invokes `handle_cursor` in parallel.

    :param cursor: Cursor the statement is executed on
    :param sql: SQL statement to execute
    :param query: Query model the execution belongs to; passed on to
        `handle_cursor`
    :param session: Session passed on to `handle_cursor`
    :raises Exception: Re-raises, in the calling thread, whatever exception
        `execute` raised in the worker thread
    """
    execute_result: dict[str, Any] = {}

    def _execute(results: dict[str, Any]) -> None:
        logger.debug("Query %d: Running query: %s", query.id, sql)

        # Pass exception information back to the parent thread
        try:
            cls.execute(cursor, sql)
        except Exception as ex:  # pylint: disable=broad-except
            results["error"] = ex

    execute_thread = threading.Thread(target=_execute, args=(execute_result,))
    execute_thread.start()

    # Wait for a query ID to be available before handling the cursor, as
    # it's required by that method; it may never become available on error.
    # Waiting on the thread itself (rather than on a flag the thread has to
    # set) guarantees the loop ends however the worker terminates.
    while not cursor.query_id and execute_thread.is_alive():
        execute_thread.join(timeout=0.1)

    logger.debug("Query %d: Handling cursor", query.id)
    cls.handle_cursor(cursor, query, session)

    # Block until the query completes; same behaviour as the client itself
    logger.debug("Query %d: Waiting for query to complete", query.id)
    execute_thread.join()

    # Unfortunately we'll mangle the stack trace due to the thread, but
    # throwing the original exception allows mapping database errors as normal
    if err := execute_result.get("error"):
        raise err
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def prepare_cancel_query(cls, query: Query, session: Session) -> None:
|
def prepare_cancel_query(cls, query: Query, session: Session) -> None:
|
||||||
if QUERY_CANCEL_KEY not in query.extra:
|
if QUERY_CANCEL_KEY not in query.extra:
|
||||||
|
|
|
||||||
|
|
@ -191,7 +191,7 @@ def get_sql_results( # pylint: disable=too-many-arguments
|
||||||
return handle_query_error(ex, query, session)
|
return handle_query_error(ex, query, session)
|
||||||
|
|
||||||
|
|
||||||
def execute_sql_statement( # pylint: disable=too-many-arguments,too-many-statements
|
def execute_sql_statement( # pylint: disable=too-many-arguments
|
||||||
sql_statement: str,
|
sql_statement: str,
|
||||||
query: Query,
|
query: Query,
|
||||||
session: Session,
|
session: Session,
|
||||||
|
|
@ -271,10 +271,7 @@ def execute_sql_statement( # pylint: disable=too-many-arguments,too-many-statem
|
||||||
)
|
)
|
||||||
session.commit()
|
session.commit()
|
||||||
with stats_timing("sqllab.query.time_executing_query", stats_logger):
|
with stats_timing("sqllab.query.time_executing_query", stats_logger):
|
||||||
logger.debug("Query %d: Running query: %s", query.id, sql)
|
db_engine_spec.execute_with_cursor(cursor, sql, query, session)
|
||||||
db_engine_spec.execute(cursor, sql, async_=True)
|
|
||||||
logger.debug("Query %d: Handling cursor", query.id)
|
|
||||||
db_engine_spec.handle_cursor(cursor, query, session)
|
|
||||||
|
|
||||||
with stats_timing("sqllab.query.time_fetching_results", stats_logger):
|
with stats_timing("sqllab.query.time_fetching_results", stats_logger):
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
|
|
||||||
|
|
@ -352,7 +352,7 @@ def test_handle_cursor_early_cancel(
|
||||||
query_id = "myQueryId"
|
query_id = "myQueryId"
|
||||||
|
|
||||||
cursor_mock = engine_mock.return_value.__enter__.return_value
|
cursor_mock = engine_mock.return_value.__enter__.return_value
|
||||||
cursor_mock.stats = {"queryId": query_id}
|
cursor_mock.query_id = query_id
|
||||||
session_mock = mocker.MagicMock()
|
session_mock = mocker.MagicMock()
|
||||||
|
|
||||||
query = Query()
|
query = Query()
|
||||||
|
|
@ -366,3 +366,32 @@ def test_handle_cursor_early_cancel(
|
||||||
assert cancel_query_mock.call_args[1]["cancel_query_id"] == query_id
|
assert cancel_query_mock.call_args[1]["cancel_query_id"] == query_id
|
||||||
else:
|
else:
|
||||||
assert cancel_query_mock.call_args is None
|
assert cancel_query_mock.call_args is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_execute_with_cursor_in_parallel(mocker: MockerFixture):
    """Check that `execute_with_cursor` records the query ID found on the cursor"""
    from superset.db_engine_specs.trino import TrinoEngineSpec

    expected_query_id = "myQueryId"

    # The cursor starts out without a query ID; one is only assigned as a
    # side effect of `cursor.execute` being called.
    cursor = mocker.MagicMock()
    cursor.query_id = None

    def _assign_query_id(*args, **kwargs):
        cursor.query_id = expected_query_id

    cursor.execute.side_effect = _assign_query_id

    query = mocker.MagicMock()
    session = mocker.MagicMock()

    TrinoEngineSpec.execute_with_cursor(
        cursor=cursor,
        sql="SELECT 1 FROM foo",
        query=query,
        session=session,
    )

    # The ID set during execution must be stored as the cancel key
    query.set_extra_json_key.assert_called_once_with(
        key=QUERY_CANCEL_KEY, value=expected_query_id
    )
|
||||||
|
|
|
||||||
|
|
@ -55,8 +55,8 @@ def test_execute_sql_statement(mocker: MockerFixture, app: None) -> None:
|
||||||
)
|
)
|
||||||
|
|
||||||
database.apply_limit_to_sql.assert_called_with("SELECT 42 AS answer", 2, force=True)
|
database.apply_limit_to_sql.assert_called_with("SELECT 42 AS answer", 2, force=True)
|
||||||
db_engine_spec.execute.assert_called_with(
|
db_engine_spec.execute_with_cursor.assert_called_with(
|
||||||
cursor, "SELECT 42 AS answer LIMIT 2", async_=True
|
cursor, "SELECT 42 AS answer LIMIT 2", query, session
|
||||||
)
|
)
|
||||||
SupersetResultSet.assert_called_with([(42,)], cursor.description, db_engine_spec)
|
SupersetResultSet.assert_called_with([(42,)], cursor.description, db_engine_spec)
|
||||||
|
|
||||||
|
|
@ -106,10 +106,8 @@ def test_execute_sql_statement_with_rls(
|
||||||
101,
|
101,
|
||||||
force=True,
|
force=True,
|
||||||
)
|
)
|
||||||
db_engine_spec.execute.assert_called_with(
|
db_engine_spec.execute_with_cursor.assert_called_with(
|
||||||
cursor,
|
cursor, "SELECT * FROM sales WHERE organization_id=42 LIMIT 101", query, session
|
||||||
"SELECT * FROM sales WHERE organization_id=42 LIMIT 101",
|
|
||||||
async_=True,
|
|
||||||
)
|
)
|
||||||
SupersetResultSet.assert_called_with([(42,)], cursor.description, db_engine_spec)
|
SupersetResultSet.assert_called_with([(42,)], cursor.description, db_engine_spec)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue