feat: Logic added to limiting factor column in Query model (#13521)
* Sqllab limit * Add migration script * Set default values * initial push * revisions * Update superset/views/core.py Co-authored-by: Beto Dealmeida <roberto@dealmeida.net> * moving migration to separate PR * with migration * revisions * Fix apply_limit_to_sql * all but tests * added unit tests * revisions * Update superset/sql_lab.py Co-authored-by: Beto Dealmeida <roberto@dealmeida.net> * Update superset/sql_parse.py Co-authored-by: Beto Dealmeida <roberto@dealmeida.net> * fixed black issue * Update superset/views/core.py Co-authored-by: Beto Dealmeida <roberto@dealmeida.net> * updated logic Co-authored-by: Beto Dealmeida <roberto@dealmeida.net>
This commit is contained in:
parent
efcdc8cb9b
commit
e507508b48
|
|
@ -334,6 +334,7 @@ export default function sqlLabReducer(state = {}, action) {
|
|||
results: action.results,
|
||||
rows: action?.results?.data?.length,
|
||||
state: 'success',
|
||||
limitingFactor: action?.results?.query?.limitingFactor,
|
||||
tempSchema: action?.results?.query?.tempSchema,
|
||||
tempTable: action?.results?.query?.tempTable,
|
||||
errorMessage: null,
|
||||
|
|
|
|||
|
|
@ -65,4 +65,5 @@ export type Query = {
|
|||
templateParams: any;
|
||||
rows: number;
|
||||
queryLimit: number;
|
||||
limitingFactor: string;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -569,7 +569,9 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
|
|||
return {}
|
||||
|
||||
@classmethod
|
||||
def apply_limit_to_sql(cls, sql: str, limit: int, database: "Database") -> str:
|
||||
def apply_limit_to_sql(
|
||||
cls, sql: str, limit: int, database: "Database", force: bool = False
|
||||
) -> str:
|
||||
"""
|
||||
Alters the SQL statement to apply a LIMIT clause
|
||||
|
||||
|
|
@ -590,7 +592,7 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
|
|||
|
||||
if cls.limit_method == LimitMethod.FORCE_LIMIT:
|
||||
parsed_query = sql_parse.ParsedQuery(sql)
|
||||
sql = parsed_query.set_or_update_query_limit(limit)
|
||||
sql = parsed_query.set_or_update_query_limit(limit, force=force)
|
||||
|
||||
return sql
|
||||
|
||||
|
|
|
|||
|
|
@ -439,8 +439,10 @@ class Database(
|
|||
cols=cols,
|
||||
)
|
||||
|
||||
def apply_limit_to_sql(self, sql: str, limit: int = 1000) -> str:
|
||||
return self.db_engine_spec.apply_limit_to_sql(sql, limit, self)
|
||||
def apply_limit_to_sql(
|
||||
self, sql: str, limit: int = 1000, force: bool = False
|
||||
) -> str:
|
||||
return self.db_engine_spec.apply_limit_to_sql(sql, limit, self, force=force)
|
||||
|
||||
def safe_sqlalchemy_uri(self) -> str:
|
||||
return self.sqlalchemy_uri
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""A collection of ORM sqlalchemy models for SQL Lab"""
|
||||
import enum
|
||||
import re
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List
|
||||
|
|
@ -29,6 +30,7 @@ from sqlalchemy import (
|
|||
Boolean,
|
||||
Column,
|
||||
DateTime,
|
||||
Enum,
|
||||
ForeignKey,
|
||||
Integer,
|
||||
Numeric,
|
||||
|
|
@ -49,6 +51,14 @@ from superset.sql_parse import CtasMethod, ParsedQuery, Table
|
|||
from superset.utils.core import QueryStatus, user_label
|
||||
|
||||
|
||||
class LimitingFactor(str, enum.Enum):
|
||||
QUERY = "QUERY"
|
||||
DROPDOWN = "DROPDOWN"
|
||||
QUERY_AND_DROPDOWN = "QUERY_AND_DROPDOWN"
|
||||
NOT_LIMITED = "NOT_LIMITED"
|
||||
UNKNOWN = "UNKNOWN"
|
||||
|
||||
|
||||
class Query(Model, ExtraJSONMixin):
|
||||
"""ORM model for SQL query
|
||||
|
||||
|
|
@ -76,6 +86,9 @@ class Query(Model, ExtraJSONMixin):
|
|||
executed_sql = Column(Text)
|
||||
# Could be configured in the superset config.
|
||||
limit = Column(Integer)
|
||||
limiting_factor = Column(
|
||||
Enum(LimitingFactor), server_default=LimitingFactor.UNKNOWN
|
||||
)
|
||||
select_as_cta = Column(Boolean)
|
||||
select_as_cta_used = Column(Boolean, default=False)
|
||||
ctas_method = Column(String(16), default=CtasMethod.TABLE)
|
||||
|
|
@ -120,6 +133,7 @@ class Query(Model, ExtraJSONMixin):
|
|||
"id": self.client_id,
|
||||
"queryId": self.id,
|
||||
"limit": self.limit,
|
||||
"limitingFactor": self.limiting_factor,
|
||||
"progress": self.progress,
|
||||
"rows": self.rows,
|
||||
"schema": self.schema,
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ from superset.dataframe import df_to_records
|
|||
from superset.db_engine_specs import BaseEngineSpec
|
||||
from superset.extensions import celery_app
|
||||
from superset.models.core import Database
|
||||
from superset.models.sql_lab import Query
|
||||
from superset.models.sql_lab import LimitingFactor, Query
|
||||
from superset.result_set import SupersetResultSet
|
||||
from superset.sql_parse import CtasMethod, ParsedQuery
|
||||
from superset.utils.celery import session_scope
|
||||
|
|
@ -179,7 +179,7 @@ def get_sql_results( # pylint: disable=too-many-arguments
|
|||
return handle_query_error(str(ex), query, session)
|
||||
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
# pylint: disable=too-many-arguments, too-many-locals
|
||||
def execute_sql_statement(
|
||||
sql_statement: str,
|
||||
query: Query,
|
||||
|
|
@ -194,6 +194,10 @@ def execute_sql_statement(
|
|||
db_engine_spec = database.db_engine_spec
|
||||
parsed_query = ParsedQuery(sql_statement)
|
||||
sql = parsed_query.stripped()
|
||||
# This is a test to see if the query is being
|
||||
# limited by either the dropdown or the sql.
|
||||
# We are testing to see if more rows exist than the limit.
|
||||
increased_limit = None if query.limit is None else query.limit + 1
|
||||
|
||||
if not db_engine_spec.is_readonly_query(parsed_query) and not database.allow_dml:
|
||||
raise SqlLabSecurityException(
|
||||
|
|
@ -219,11 +223,14 @@ def execute_sql_statement(
|
|||
if SQL_MAX_ROW and (not query.limit or query.limit > SQL_MAX_ROW):
|
||||
query.limit = SQL_MAX_ROW
|
||||
if query.limit:
|
||||
sql = database.apply_limit_to_sql(sql, query.limit)
|
||||
# We are fetching one more than the requested limit in order
|
||||
# to test whether there are more rows than the limit.
|
||||
# Later, the extra row will be dropped before sending
|
||||
# the results back to the user.
|
||||
sql = database.apply_limit_to_sql(sql, increased_limit, force=True)
|
||||
|
||||
# Hook to allow environment-specific mutation (usually comments) to the SQL
|
||||
sql = SQL_QUERY_MUTATOR(sql, user_name, security_manager, database)
|
||||
|
||||
try:
|
||||
if log_query:
|
||||
log_query(
|
||||
|
|
@ -249,7 +256,12 @@ def execute_sql_statement(
|
|||
query.id,
|
||||
str(query.to_dict()),
|
||||
)
|
||||
data = db_engine_spec.fetch_data(cursor, query.limit)
|
||||
data = db_engine_spec.fetch_data(cursor, increased_limit)
|
||||
if query.limit is None or len(data) <= query.limit:
|
||||
query.limiting_factor = LimitingFactor.NOT_LIMITED
|
||||
else:
|
||||
# return 1 row less than increased_query
|
||||
data = data[:-1]
|
||||
except Exception as ex:
|
||||
logger.error("Query %d: %s", query.id, type(ex), exc_info=True)
|
||||
logger.debug("Query %d: %s", query.id, ex)
|
||||
|
|
|
|||
|
|
@ -311,7 +311,7 @@ class ParsedQuery:
|
|||
if any(not self._is_identifier(token2) for token2 in item.tokens):
|
||||
self._extract_from_token(item)
|
||||
|
||||
def set_or_update_query_limit(self, new_limit: int) -> str:
|
||||
def set_or_update_query_limit(self, new_limit: int, force: bool = False) -> str:
|
||||
"""Returns the query with the specified limit.
|
||||
|
||||
Does not change the underlying query if user did not apply the limit,
|
||||
|
|
@ -332,8 +332,8 @@ class ParsedQuery:
|
|||
break
|
||||
_, limit = statement.token_next(idx=limit_pos)
|
||||
# Override the limit only when it exceeds the configured value.
|
||||
if limit.ttype == sqlparse.tokens.Literal.Number.Integer and new_limit < int(
|
||||
limit.value
|
||||
if limit.ttype == sqlparse.tokens.Literal.Number.Integer and (
|
||||
force or new_limit < int(limit.value)
|
||||
):
|
||||
limit.value = new_limit
|
||||
elif limit.is_group:
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ from superset.models.core import Database, FavStar, Log
|
|||
from superset.models.dashboard import Dashboard
|
||||
from superset.models.datasource_access_request import DatasourceAccessRequest
|
||||
from superset.models.slice import Slice
|
||||
from superset.models.sql_lab import Query, TabState
|
||||
from superset.models.sql_lab import LimitingFactor, Query, TabState
|
||||
from superset.models.user_attributes import UserAttribute
|
||||
from superset.queries.dao import QueryDAO
|
||||
from superset.security.analytics_db_safety import check_sqlalchemy_uri
|
||||
|
|
@ -2393,6 +2393,7 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
|
|||
# Update saved query if needed
|
||||
QueryDAO.update_saved_query_exec_info(query_id)
|
||||
|
||||
# TODO: set LimitingFactor to display?
|
||||
payload = json.dumps(
|
||||
apply_display_max_row_limit(data),
|
||||
default=utils.pessimistic_json_iso_dttm_ser,
|
||||
|
|
@ -2548,6 +2549,12 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
|
|||
if not (config.get("SQLLAB_CTAS_NO_LIMIT") and select_as_cta):
|
||||
# set LIMIT after template processing
|
||||
limits = [mydb.db_engine_spec.get_limit_from_sql(rendered_query), limit]
|
||||
if limits[0] is None or limits[0] > limits[1]:
|
||||
query.limiting_factor = LimitingFactor.DROPDOWN
|
||||
elif limits[1] > limits[0]:
|
||||
query.limiting_factor = LimitingFactor.QUERY
|
||||
else: # limits[0] == limits[1]
|
||||
query.limiting_factor = LimitingFactor.QUERY_AND_DROPDOWN
|
||||
query.limit = min(lim for lim in limits if lim is not None)
|
||||
|
||||
# Flag for whether or not to expand data
|
||||
|
|
@ -2571,7 +2578,9 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
|
|||
@has_access
|
||||
@event_logger.log_this
|
||||
@expose("/csv/<client_id>")
|
||||
def csv(self, client_id: str) -> FlaskResponse: # pylint: disable=no-self-use
|
||||
def csv( # pylint: disable=no-self-use,too-many-locals
|
||||
self, client_id: str
|
||||
) -> FlaskResponse:
|
||||
"""Download the query results as csv."""
|
||||
logger.info("Exporting CSV file [%s]", client_id)
|
||||
query = db.session.query(Query).filter_by(client_id=client_id).one()
|
||||
|
|
@ -2599,8 +2608,21 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
|
|||
logger.info("Using pandas to convert to CSV")
|
||||
else:
|
||||
logger.info("Running a query to turn into CSV")
|
||||
sql = query.select_sql or query.executed_sql
|
||||
df = query.database.get_df(sql, query.schema)
|
||||
if query.select_sql:
|
||||
sql = query.select_sql
|
||||
limit = None
|
||||
else:
|
||||
sql = query.executed_sql
|
||||
limit = ParsedQuery(sql).limit
|
||||
if limit is not None and query.limiting_factor in {
|
||||
LimitingFactor.QUERY,
|
||||
LimitingFactor.DROPDOWN,
|
||||
LimitingFactor.QUERY_AND_DROPDOWN,
|
||||
}:
|
||||
# remove extra row from `increased_limit`
|
||||
limit -= 1
|
||||
df = query.database.get_df(sql, query.schema)[:limit]
|
||||
|
||||
csv_data = csv.df_to_escaped_csv(df, index=False, **config["CSV_EXPORT"])
|
||||
quoted_csv_name = parse.quote(query.name)
|
||||
response = CsvResponse(
|
||||
|
|
|
|||
|
|
@ -81,6 +81,19 @@ class TestDbEngineSpecs(TestDbEngineSpec):
|
|||
"SELECT * FROM (SELECT * FROM a LIMIT 10) LIMIT 1000",
|
||||
)
|
||||
|
||||
def test_limit_query_without_force(self):
|
||||
self.sql_limit_regex(
|
||||
"SELECT * FROM a LIMIT 10", "SELECT * FROM a LIMIT 10", limit=11,
|
||||
)
|
||||
|
||||
def test_limit_query_with_force(self):
|
||||
self.sql_limit_regex(
|
||||
"SELECT * FROM a LIMIT 10",
|
||||
"SELECT * FROM a LIMIT 11",
|
||||
limit=11,
|
||||
force=True,
|
||||
)
|
||||
|
||||
def test_limit_with_expr(self):
|
||||
self.sql_limit_regex(
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -25,8 +25,13 @@ from superset.models.core import Database
|
|||
|
||||
class TestDbEngineSpec(SupersetTestCase):
|
||||
def sql_limit_regex(
|
||||
self, sql, expected_sql, engine_spec_class=MySQLEngineSpec, limit=1000
|
||||
self,
|
||||
sql,
|
||||
expected_sql,
|
||||
engine_spec_class=MySQLEngineSpec,
|
||||
limit=1000,
|
||||
force=False,
|
||||
):
|
||||
main = Database(database_name="test_database", sqlalchemy_uri="sqlite://")
|
||||
limited = engine_spec_class.apply_limit_to_sql(sql, limit, main)
|
||||
limited = engine_spec_class.apply_limit_to_sql(sql, limit, main, force)
|
||||
self.assertEqual(expected_sql, limited)
|
||||
|
|
|
|||
|
|
@ -31,10 +31,11 @@ from superset.connectors.sqla.models import SqlaTable
|
|||
from superset.db_engine_specs import BaseEngineSpec
|
||||
from superset.errors import ErrorLevel, SupersetErrorType
|
||||
from superset.models.core import Database
|
||||
from superset.models.sql_lab import Query, SavedQuery
|
||||
from superset.models.sql_lab import LimitingFactor, Query, SavedQuery
|
||||
from superset.result_set import SupersetResultSet
|
||||
from superset.sql_lab import (
|
||||
execute_sql_statements,
|
||||
execute_sql_statement,
|
||||
get_sql_results,
|
||||
SqlLabException,
|
||||
SqlLabTimeoutException,
|
||||
|
|
@ -119,7 +120,6 @@ class TestSqlLab(SupersetTestCase):
|
|||
)
|
||||
assert saved_query_.rows is not None
|
||||
assert saved_query_.last_run == datetime.now()
|
||||
|
||||
# Rollback changes
|
||||
db.session.delete(saved_query_)
|
||||
db.session.commit()
|
||||
|
|
@ -507,18 +507,44 @@ class TestSqlLab(SupersetTestCase):
|
|||
"SELECT * FROM birth_names", client_id="sql_limit_2", query_limit=test_limit
|
||||
)
|
||||
self.assertEqual(len(data["data"]), test_limit)
|
||||
|
||||
data = self.run_sql(
|
||||
"SELECT * FROM birth_names LIMIT {}".format(test_limit),
|
||||
client_id="sql_limit_3",
|
||||
query_limit=test_limit + 1,
|
||||
)
|
||||
self.assertEqual(len(data["data"]), test_limit)
|
||||
self.assertEqual(data["query"]["limitingFactor"], LimitingFactor.QUERY)
|
||||
|
||||
data = self.run_sql(
|
||||
"SELECT * FROM birth_names LIMIT {}".format(test_limit + 1),
|
||||
client_id="sql_limit_4",
|
||||
query_limit=test_limit,
|
||||
)
|
||||
self.assertEqual(len(data["data"]), test_limit)
|
||||
self.assertEqual(data["query"]["limitingFactor"], LimitingFactor.DROPDOWN)
|
||||
|
||||
data = self.run_sql(
|
||||
"SELECT * FROM birth_names LIMIT {}".format(test_limit),
|
||||
client_id="sql_limit_5",
|
||||
query_limit=test_limit,
|
||||
)
|
||||
self.assertEqual(len(data["data"]), test_limit)
|
||||
self.assertEqual(
|
||||
data["query"]["limitingFactor"], LimitingFactor.QUERY_AND_DROPDOWN
|
||||
)
|
||||
|
||||
data = self.run_sql(
|
||||
"SELECT * FROM birth_names", client_id="sql_limit_6", query_limit=10000,
|
||||
)
|
||||
self.assertEqual(len(data["data"]), 1200)
|
||||
self.assertEqual(data["query"]["limitingFactor"], LimitingFactor.NOT_LIMITED)
|
||||
|
||||
data = self.run_sql(
|
||||
"SELECT * FROM birth_names", client_id="sql_limit_7", query_limit=1200,
|
||||
)
|
||||
self.assertEqual(len(data["data"]), 1200)
|
||||
self.assertEqual(data["query"]["limitingFactor"], LimitingFactor.NOT_LIMITED)
|
||||
|
||||
def test_query_api_filter(self) -> None:
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ if "sqlite" in SQLALCHEMY_DATABASE_URI:
|
|||
PRESTO_POLL_INTERVAL = 0.1
|
||||
HIVE_POLL_INTERVAL = 0.1
|
||||
|
||||
SQL_MAX_ROW = 666
|
||||
SQL_MAX_ROW = 10000
|
||||
SQLLAB_CTAS_NO_LIMIT = True # SQL_MAX_ROW will not take affect for the CTA queries
|
||||
FEATURE_FLAGS = {
|
||||
**FEATURE_FLAGS,
|
||||
|
|
|
|||
Loading…
Reference in New Issue