feat: estimate query cost in Postgres (#12130)
* feat: estimate query cost in Postgres * Add example in config * Fix lint
This commit is contained in:
parent
fa27ed1147
commit
877b153331
|
|
@ -70,6 +70,7 @@ class ImportModelsCommand(BaseCommand):
|
|||
db.session.rollback()
|
||||
raise self.import_error()
|
||||
|
||||
# pylint: disable=too-many-branches
|
||||
def validate(self) -> None:
|
||||
exceptions: List[ValidationError] = []
|
||||
|
||||
|
|
@ -99,6 +100,10 @@ class ImportModelsCommand(BaseCommand):
|
|||
|
||||
# validate objects
|
||||
for file_name, content in self.contents.items():
|
||||
# skip directories
|
||||
if not content:
|
||||
continue
|
||||
|
||||
prefix = file_name.split("/")[0]
|
||||
schema = self.schemas.get(f"{prefix}/")
|
||||
if schema:
|
||||
|
|
|
|||
|
|
@ -608,6 +608,37 @@ SQLLAB_ASYNC_TIME_LIMIT_SEC = 60 * 60 * 6
|
|||
# query costs before they run. These EXPLAIN queries should have a small
|
||||
# timeout.
|
||||
SQLLAB_QUERY_COST_ESTIMATE_TIMEOUT = 10 # seconds
|
||||
# The feature is off by default, and currently only supported in Presto and Postgres.
|
||||
# It also need to be enabled on a per-database basis, by adding the key/value pair
|
||||
# `cost_estimate_enabled: true` to the database `extra` attribute.
|
||||
ESTIMATE_QUERY_COST = False
|
||||
# The cost returned by the databases is a relative value; in order to map the cost to
|
||||
# a tangible value you need to define a custom formatter that takes into consideration
|
||||
# your specific infrastructure. For example, you could analyze queries a posteriori by
|
||||
# running EXPLAIN on them, and compute a histogram of relative costs to present the
|
||||
# cost as a percentile:
|
||||
#
|
||||
# def postgres_query_cost_formatter(
|
||||
# result: List[Dict[str, Any]]
|
||||
# ) -> List[Dict[str, str]]:
|
||||
# # 25, 50, 75% percentiles
|
||||
# percentile_costs = [100.0, 1000.0, 10000.0]
|
||||
#
|
||||
# out = []
|
||||
# for row in result:
|
||||
# relative_cost = row["Total cost"]
|
||||
# percentile = bisect.bisect_left(percentile_costs, relative_cost) + 1
|
||||
# out.append({
|
||||
# "Relative cost": relative_cost,
|
||||
# "Percentile": str(percentile * 25) + "%",
|
||||
# })
|
||||
#
|
||||
# return out
|
||||
#
|
||||
# DEFAULT_FEATURE_FLAGS = {
|
||||
# "ESTIMATE_QUERY_COST": True,
|
||||
# "QUERY_COST_FORMATTERS_BY_ENGINE": {"postgresql": postgres_query_cost_formatter},
|
||||
# }
|
||||
|
||||
# Flag that controls if limit should be enforced on the CTA (create table as queries).
|
||||
SQLLAB_CTAS_NO_LIMIT = False
|
||||
|
|
|
|||
|
|
@ -418,6 +418,7 @@ class ImportV1DatabaseExtraSchema(Schema):
|
|||
engine_params = fields.Dict(keys=fields.Str(), values=fields.Raw())
|
||||
metadata_cache_timeout = fields.Dict(keys=fields.Str(), values=fields.Integer())
|
||||
schemas_allowed_for_csv_upload = fields.List(fields.String)
|
||||
cost_estimate_enabled = fields.Boolean()
|
||||
|
||||
|
||||
class ImportV1DatabaseSchema(Schema):
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ from sqlalchemy.sql import quoted_name, text
|
|||
from sqlalchemy.sql.expression import ColumnClause, ColumnElement, Select, TextAsFrom
|
||||
from sqlalchemy.types import TypeEngine
|
||||
|
||||
from superset import app, sql_parse
|
||||
from superset import app, security_manager, sql_parse
|
||||
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
|
||||
from superset.models.sql_lab import Query
|
||||
from superset.sql_parse import ParsedQuery, Table
|
||||
|
|
@ -203,7 +203,7 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
|
|||
return any(pattern.match(db_column_type) for pattern in patterns)
|
||||
|
||||
@classmethod
|
||||
def get_allow_cost_estimate(cls, version: Optional[str] = None) -> bool:
|
||||
def get_allow_cost_estimate(cls, extra: Dict[str, Any]) -> bool:
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
|
|
@ -790,16 +790,12 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
|
|||
return sql
|
||||
|
||||
@classmethod
|
||||
def estimate_statement_cost(
|
||||
cls, statement: str, database: "Database", cursor: Any, user_name: str
|
||||
) -> Dict[str, Any]:
|
||||
def estimate_statement_cost(cls, statement: str, cursor: Any,) -> Dict[str, Any]:
|
||||
"""
|
||||
Generate a SQL query that estimates the cost of a given statement.
|
||||
|
||||
:param statement: A single SQL statement
|
||||
:param database: Database instance
|
||||
:param cursor: Cursor instance
|
||||
:param username: Effective username
|
||||
:return: Dictionary with different costs
|
||||
"""
|
||||
raise Exception("Database does not support cost estimation")
|
||||
|
|
@ -816,10 +812,31 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
|
|||
"""
|
||||
raise Exception("Database does not support cost estimation")
|
||||
|
||||
@classmethod
|
||||
def process_statement(
|
||||
cls, statement: str, database: "Database", user_name: str
|
||||
) -> str:
|
||||
"""
|
||||
Process a SQL statement by stripping and mutating it.
|
||||
|
||||
:param statement: A single SQL statement
|
||||
:param database: Database instance
|
||||
:param username: Effective username
|
||||
:return: Dictionary with different costs
|
||||
"""
|
||||
parsed_query = ParsedQuery(statement)
|
||||
sql = parsed_query.stripped()
|
||||
|
||||
sql_query_mutator = config["SQL_QUERY_MUTATOR"]
|
||||
if sql_query_mutator:
|
||||
sql = sql_query_mutator(sql, user_name, security_manager, database)
|
||||
|
||||
return sql
|
||||
|
||||
@classmethod
|
||||
def estimate_query_cost(
|
||||
cls, database: "Database", schema: str, sql: str, source: Optional[str] = None
|
||||
) -> List[Dict[str, str]]:
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Estimate the cost of a multiple statement SQL query.
|
||||
|
||||
|
|
@ -828,8 +845,8 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
|
|||
:param sql: SQL query with possibly multiple statements
|
||||
:param source: Source of the query (eg, "sql_lab")
|
||||
"""
|
||||
database_version = database.get_extra().get("version")
|
||||
if not cls.get_allow_cost_estimate(database_version):
|
||||
extra = database.get_extra() or {}
|
||||
if not cls.get_allow_cost_estimate(extra):
|
||||
raise Exception("Database does not support cost estimation")
|
||||
|
||||
user_name = g.user.username if g.user else None
|
||||
|
|
@ -841,10 +858,11 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
|
|||
with closing(engine.raw_connection()) as conn:
|
||||
with closing(conn.cursor()) as cursor:
|
||||
for statement in statements:
|
||||
processed_statement = cls.process_statement(
|
||||
statement, database, user_name
|
||||
)
|
||||
costs.append(
|
||||
cls.estimate_statement_cost(
|
||||
statement, database, cursor, user_name
|
||||
)
|
||||
cls.estimate_statement_cost(processed_statement, cursor)
|
||||
)
|
||||
return costs
|
||||
|
||||
|
|
|
|||
|
|
@ -14,8 +14,9 @@
|
|||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import re
|
||||
from datetime import datetime
|
||||
from typing import Any, List, Optional, Tuple, TYPE_CHECKING
|
||||
from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING
|
||||
|
||||
from pytz import _FixedOffset # type: ignore
|
||||
from sqlalchemy.dialects.postgresql.base import PGInspector
|
||||
|
|
@ -71,6 +72,31 @@ class PostgresEngineSpec(PostgresBaseEngineSpec):
|
|||
max_column_name_length = 63
|
||||
try_remove_schema_from_table_name = False
|
||||
|
||||
@classmethod
|
||||
def get_allow_cost_estimate(cls, extra: Dict[str, Any]) -> bool:
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def estimate_statement_cost(cls, statement: str, cursor: Any) -> Dict[str, Any]:
|
||||
sql = f"EXPLAIN {statement}"
|
||||
cursor.execute(sql)
|
||||
|
||||
result = cursor.fetchone()[0]
|
||||
match = re.search(r"cost=([\d\.]+)\.\.([\d\.]+)", result)
|
||||
if match:
|
||||
return {
|
||||
"Start-up cost": float(match.group(1)),
|
||||
"Total cost": float(match.group(2)),
|
||||
}
|
||||
|
||||
return {}
|
||||
|
||||
@classmethod
|
||||
def query_cost_formatter(
|
||||
cls, raw_cost: List[Dict[str, Any]]
|
||||
) -> List[Dict[str, str]]:
|
||||
return [{k: str(v) for k, v in row.items()} for row in raw_cost]
|
||||
|
||||
@classmethod
|
||||
def get_table_names(
|
||||
cls, database: "Database", inspector: PGInspector, schema: Optional[str]
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ from sqlalchemy.engine.url import URL
|
|||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.sql.expression import ColumnClause, Select
|
||||
|
||||
from superset import app, cache_manager, is_feature_enabled, security_manager
|
||||
from superset import app, cache_manager, is_feature_enabled
|
||||
from superset.db_engine_specs.base import BaseEngineSpec
|
||||
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
|
||||
from superset.exceptions import SupersetTemplateException
|
||||
|
|
@ -132,7 +132,8 @@ class PrestoEngineSpec(BaseEngineSpec): # pylint: disable=too-many-public-metho
|
|||
}
|
||||
|
||||
@classmethod
|
||||
def get_allow_cost_estimate(cls, version: Optional[str] = None) -> bool:
|
||||
def get_allow_cost_estimate(cls, extra: Dict[str, Any]) -> bool:
|
||||
version = extra.get("version")
|
||||
return version is not None and StrictVersion(version) >= StrictVersion("0.319")
|
||||
|
||||
@classmethod
|
||||
|
|
@ -484,7 +485,7 @@ class PrestoEngineSpec(BaseEngineSpec): # pylint: disable=too-many-public-metho
|
|||
|
||||
@classmethod
|
||||
def estimate_statement_cost( # pylint: disable=too-many-locals
|
||||
cls, statement: str, database: "Database", cursor: Any, user_name: str
|
||||
cls, statement: str, cursor: Any
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Run a SQL query that estimates the cost of a given statement.
|
||||
|
|
@ -495,14 +496,7 @@ class PrestoEngineSpec(BaseEngineSpec): # pylint: disable=too-many-public-metho
|
|||
:param username: Effective username
|
||||
:return: JSON response from Presto
|
||||
"""
|
||||
parsed_query = ParsedQuery(statement)
|
||||
sql = parsed_query.stripped()
|
||||
|
||||
sql_query_mutator = config["SQL_QUERY_MUTATOR"]
|
||||
if sql_query_mutator:
|
||||
sql = sql_query_mutator(sql, user_name, security_manager, database)
|
||||
|
||||
sql = f"EXPLAIN (TYPE IO, FORMAT JSON) {sql}"
|
||||
sql = f"EXPLAIN (TYPE IO, FORMAT JSON) {statement}"
|
||||
cursor.execute(sql)
|
||||
|
||||
# the output from Presto is a single column and a single row containing
|
||||
|
|
|
|||
|
|
@ -183,14 +183,11 @@ class Database(
|
|||
|
||||
@property
|
||||
def allows_cost_estimate(self) -> bool:
|
||||
extra = self.get_extra()
|
||||
|
||||
database_version = extra.get("version")
|
||||
extra = self.get_extra() or {}
|
||||
cost_estimate_enabled: bool = extra.get("cost_estimate_enabled") # type: ignore
|
||||
|
||||
return (
|
||||
self.db_engine_spec.get_allow_cost_estimate(database_version)
|
||||
and cost_estimate_enabled
|
||||
self.db_engine_spec.get_allow_cost_estimate(extra) and cost_estimate_enabled
|
||||
)
|
||||
|
||||
@property
|
||||
|
|
|
|||
Loading…
Reference in New Issue