feat: saved queries with execution info (#11391)

* feat: add rows and last_run info to saved queries

* feat: add rows to saved query

* refactor and tests

* lint

* fix tests
This commit is contained in:
Daniel Vaz Gaspar 2020-10-26 18:20:07 +00:00 committed by GitHub
parent 604a519d8b
commit 144b279aa2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 128 additions and 4 deletions

View File

@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""add exec info to saved queries
Revision ID: 585b0b1a7b18
Revises: af30ca79208f
Create Date: 2020-10-20 17:28:22.857694
"""
# revision identifiers, used by Alembic.
revision = "585b0b1a7b18"
down_revision = "af30ca79208f"
import sqlalchemy as sa
from alembic import op
def upgrade():
op.add_column("saved_query", sa.Column("last_run", sa.DateTime(), nullable=True))
op.add_column("saved_query", sa.Column("rows", sa.Integer(), nullable=True))
def downgrade():
op.drop_column("saved_query", "rows")
op.drop_column("saved_query", "last_run")

View File

@ -23,6 +23,8 @@ import simplejson as json
import sqlalchemy as sqla
from flask import Markup
from flask_appbuilder import Model
from flask_appbuilder.models.decorators import renders
from humanize import naturaltime
from sqlalchemy import (
Boolean,
Column,
@ -181,6 +183,8 @@ class SavedQuery(Model, AuditMixinNullable, ExtraJSONMixin):
foreign_keys=[db_id],
backref=backref("saved_queries", cascade="all, delete-orphan"),
)
rows = Column(Integer, nullable=True)
last_run = Column(DateTime, nullable=True)
def __repr__(self) -> str:
return str(self.label)
@ -210,6 +214,18 @@ class SavedQuery(Model, AuditMixinNullable, ExtraJSONMixin):
def sql_tables(self) -> List[Table]:
return list(ParsedQuery(self.sql).tables)
@property
def last_run_humanized(self) -> str:
return naturaltime(datetime.now() - self.changed_on)
@property
def _last_run_delta_humanized(self) -> str:
return naturaltime(datetime.now() - self.changed_on)
@renders("changed_on")
def last_run_delta_humanized(self) -> str:
return self._last_run_delta_humanized
class TabState(Model, AuditMixinNullable, ExtraJSONMixin):

View File

@ -15,9 +15,11 @@
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime
from superset.dao.base import BaseDAO
from superset.models.sql_lab import Query
from superset.extensions import db
from superset.models.sql_lab import Query, SavedQuery
from superset.queries.filters import QueryFilter
logger = logging.getLogger(__name__)
@ -26,3 +28,23 @@ logger = logging.getLogger(__name__)
class QueryDAO(BaseDAO):
model_cls = Query
base_filter = QueryFilter
@staticmethod
def update_saved_query_exec_info(query_id: int) -> None:
"""
Propagates query execution info back to saved query if applicable
:param query_id: The query id
:return:
"""
query = db.session.query(Query).get(query_id)
related_saved_queries = (
db.session.query(SavedQuery)
.filter(SavedQuery.database == query.database)
.filter(SavedQuery.sql == query.sql)
).all()
if related_saved_queries:
for saved_query in related_saved_queries:
saved_query.rows = query.rows
saved_query.last_run = datetime.now()
db.session.commit()

View File

@ -88,6 +88,8 @@ class SavedQueryRestApi(BaseSupersetModelRestApi):
"schema",
"sql",
"sql_tables",
"rows",
"last_run_delta_humanized",
]
add_columns = ["db_id", "description", "label", "schema", "sql"]
edit_columns = add_columns
@ -96,10 +98,12 @@ class SavedQueryRestApi(BaseSupersetModelRestApi):
"label",
"description",
"sql",
"rows",
"created_by.first_name",
"database.database_name",
"created_on",
"changed_on_delta_humanized",
"last_run_delta_humanized",
]
search_columns = ["id", "database", "label", "schema"]

View File

@ -82,6 +82,7 @@ from superset.models.datasource_access_request import DatasourceAccessRequest
from superset.models.slice import Slice
from superset.models.sql_lab import Query, TabState
from superset.models.user_attributes import UserAttribute
from superset.queries.dao import QueryDAO
from superset.security.analytics_db_safety import (
check_sqlalchemy_uri,
DBSecurityException,
@ -2144,6 +2145,7 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
"""
logger.info("Query %i: Running query on a Celery worker", query.id)
# Ignore the celery future object and the request may time out.
query_id = query.id
try:
task = sql_lab.get_sql_results.delay(
query.id,
@ -2170,6 +2172,10 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
query.error_message = msg
session.commit()
return json_error_response("{}".format(msg))
# Update saved query with execution info from the query execution
QueryDAO.update_saved_query_exec_info(query_id)
resp = json_success(
json.dumps(
{"query": query.to_dict()},
@ -2204,6 +2210,7 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
is_feature_enabled("SQLLAB_BACKEND_PERSISTENCE")
and not query.select_as_cta
)
query_id = query.id
with utils.timeout(seconds=timeout, error_message=timeout_msg):
# pylint: disable=no-value-for-parameter
data = sql_lab.get_sql_results(
@ -2216,6 +2223,9 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
log_params=log_params,
)
# Update saved query if needed
QueryDAO.update_saved_query_exec_info(query_id)
payload = json.dumps(
apply_display_max_row_limit(data),
default=utils.pessimistic_json_iso_dttm_ser,

View File

@ -168,8 +168,10 @@ class TestSavedQueryApi(SupersetTestCase):
"""
admin = self.get_user("admin")
saved_queries = (
db.session.query(SavedQuery).filter(SavedQuery.created_by == admin).all()
)
db.session.query(SavedQuery)
.filter(SavedQuery.created_by == admin)
.order_by(SavedQuery.schema.asc())
).all()
self.login(username="admin")
query_string = {"order_column": "schema", "order_direction": "asc"}
uri = f"api/v1/saved_query/?q={prison.dumps(query_string)}"

View File

@ -28,7 +28,7 @@ import tests.test_app
from superset import db, security_manager
from superset.connectors.sqla.models import SqlaTable
from superset.db_engine_specs import BaseEngineSpec
from superset.models.sql_lab import Query
from superset.models.sql_lab import Query, SavedQuery
from superset.result_set import SupersetResultSet
from superset.sql_parse import CtasMethod
from superset.utils.core import (
@ -71,6 +71,36 @@ class TestSqlLab(SupersetTestCase):
data = self.run_sql("SELECT * FROM unexistant_table", "2")
self.assertLess(0, len(data["error"]))
def test_sql_json_to_saved_query_info(self):
"""
SQLLab: Test SQLLab query execution info propagation to saved queries
"""
from freezegun import freeze_time
self.login("admin")
sql_statement = "SELECT * FROM birth_names LIMIT 10"
examples_db_id = get_example_database().id
saved_query = SavedQuery(db_id=examples_db_id, sql=sql_statement)
db.session.add(saved_query)
db.session.commit()
with freeze_time("2020-01-01T00:00:00Z"):
self.run_sql(sql_statement, "1")
saved_query_ = (
db.session.query(SavedQuery)
.filter(
SavedQuery.db_id == examples_db_id, SavedQuery.sql == sql_statement
)
.one_or_none()
)
assert saved_query_.rows is not None
assert saved_query_.last_run == datetime.now()
# Rollback changes
db.session.delete(saved_query_)
db.session.commit()
@parameterized.expand([CtasMethod.TABLE, CtasMethod.VIEW])
def test_sql_json_cta_dynamic_db(self, ctas_method):
examples_db = get_example_database()