chore: Migrate /superset/queries/<last_updated_ms> to API v1 (#22611)

This commit is contained in:
Diego Medina 2023-01-27 15:20:56 -03:00 committed by GitHub
parent d00ba15c78
commit 14878a160f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1172 additions and 77 deletions

File diff suppressed because it is too large Load Diff

View File

@ -18,10 +18,12 @@
*/
import { useState } from 'react';
import { isObject } from 'lodash';
import rison from 'rison';
import {
SupersetClient,
Query,
runningQueryStateList,
QueryResponse,
} from '@superset-ui/core';
import { QueryDictionary } from 'src/SqlLab/types';
import useInterval from 'src/SqlLab/utils/useInterval';
@ -62,22 +64,30 @@ function QueryAutoRefresh({
refreshQueries,
queriesLastUpdate,
}: QueryAutoRefreshProps) {
// We do not want to spam requests in the case of slow connections and potentially recieve responses out of order
// We do not want to spam requests in the case of slow connections and potentially receive responses out of order
// pendingRequest check ensures we only have one active http call to check for query statuses
const [pendingRequest, setPendingRequest] = useState(false);
const checkForRefresh = () => {
if (!pendingRequest && shouldCheckForQueries(queries)) {
const params = rison.encode({
last_updated_ms: queriesLastUpdate - QUERY_UPDATE_BUFFER_MS,
});
setPendingRequest(true);
SupersetClient.get({
endpoint: `/superset/queries/${
queriesLastUpdate - QUERY_UPDATE_BUFFER_MS
}`,
endpoint: `/api/v1/query/updated_since?q=${params}`,
timeout: QUERY_TIMEOUT_LIMIT,
})
.then(({ json }) => {
if (json) {
refreshQueries?.(json);
const jsonPayload = json as { result?: QueryResponse[] };
const queries =
jsonPayload?.result?.reduce((acc, current) => {
acc[current.id] = current;
return acc;
}, {}) ?? {};
refreshQueries?.(queries);
}
})
.catch(() => {})

View File

@ -140,6 +140,7 @@ MODEL_API_RW_METHOD_PERMISSION_MAP = {
"get_data": "read",
"samples": "read",
"delete_ssh_tunnel": "write",
"get_updated_since": "read",
"stop_query": "read",
}

View File

@ -15,9 +15,10 @@
# specific language governing permissions and limitations
# under the License.
import logging
from typing import Any
import backoff
from flask_appbuilder.api import expose, protect, request, safe
from flask_appbuilder.api import expose, protect, request, rison, safe
from flask_appbuilder.models.sqla.interface import SQLAInterface
from superset import db, event_logger
@ -29,6 +30,7 @@ from superset.queries.dao import QueryDAO
from superset.queries.filters import QueryFilter
from superset.queries.schemas import (
openapi_spec_methods_override,
queries_get_updated_since_schema,
QuerySchema,
StopQuerySchema,
)
@ -59,6 +61,11 @@ class QueryRestApi(BaseSupersetModelRestApi):
RouteMethod.RELATED,
RouteMethod.DISTINCT,
"stop_query",
"get_updated_since",
}
apispec_parameter_schemas = {
"queries_get_updated_since_schema": queries_get_updated_since_schema,
}
list_columns = [
@ -142,6 +149,59 @@ class QueryRestApi(BaseSupersetModelRestApi):
allowed_rel_fields = {"database", "user"}
allowed_distinct_fields = {"status"}
@expose("/updated_since")
@protect()
@safe
@rison(queries_get_updated_since_schema)
@statsd_metrics
@event_logger.log_this_with_context(
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}"
f".get_updated_since",
log_to_statsd=False,
)
def get_updated_since(self, **kwargs: Any) -> FlaskResponse:
"""Get a list of queries that changed after last_updated_ms
---
get:
summary: Get a list of queries that changed after last_updated_ms
parameters:
- in: query
name: q
content:
application/json:
schema:
$ref: '#/components/schemas/queries_get_updated_since_schema'
responses:
200:
description: Queries list
content:
application/json:
schema:
type: object
properties:
result:
description: >-
A List of queries that changed after last_updated_ms
type: array
items:
$ref: '#/components/schemas/{{self.__class__.__name__}}.get'
400:
$ref: '#/components/responses/400'
401:
$ref: '#/components/responses/401'
404:
$ref: '#/components/responses/404'
500:
$ref: '#/components/responses/500'
"""
try:
last_updated_ms = kwargs["rison"].get("last_updated_ms", 0)
queries = QueryDAO.get_queries_changed_after(last_updated_ms)
payload = [q.to_dict() for q in queries]
return self.response(200, result=payload)
except SupersetException as ex:
return self.response(ex.status, message=ex.message)
@expose("/stop", methods=["POST"])
@protect()
@safe

View File

@ -16,7 +16,7 @@
# under the License.
import logging
from datetime import datetime
from typing import Any, Dict
from typing import Any, Dict, List, Union
from superset import sql_lab
from superset.common.db_query_status import QueryStatus
@ -25,6 +25,7 @@ from superset.exceptions import QueryNotFoundException, SupersetCancelQueryExcep
from superset.extensions import db
from superset.models.sql_lab import Query, SavedQuery
from superset.queries.filters import QueryFilter
from superset.utils.core import get_user_id
from superset.utils.dates import now_as_float
logger = logging.getLogger(__name__)
@ -61,6 +62,17 @@ class QueryDAO(BaseDAO):
db.session.add(query)
query.set_extra_json_key("columns", columns)
@staticmethod
def get_queries_changed_after(last_updated_ms: Union[float, int]) -> List[Query]:
# UTC date time, same that is stored in the DB.
last_updated_dt = datetime.utcfromtimestamp(last_updated_ms / 1000)
return (
db.session.query(Query)
.filter(Query.user_id == get_user_id(), Query.changed_on >= last_updated_dt)
.all()
)
@staticmethod
def stop_query(client_id: str) -> None:
query = db.session.query(Query).filter_by(client_id=client_id).one_or_none()

View File

@ -33,6 +33,14 @@ openapi_spec_methods_override = {
},
}
queries_get_updated_since_schema = {
"type": "object",
"properties": {
"last_updated_ms": {"type": "number"},
},
"required": ["last_updated_ms"],
}
class DatabaseSchema(Schema):
database_name = fields.String()

View File

@ -2485,6 +2485,7 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
@event_logger.log_this
@expose("/queries/<float:last_updated_ms>")
@expose("/queries/<int:last_updated_ms>")
@deprecated()
def queries(self, last_updated_ms: Union[float, int]) -> FlaskResponse:
"""
Get the updated queries.

View File

@ -52,6 +52,7 @@ class TestQueryApi(SupersetTestCase):
rows: int = 100,
tab_name: str = "",
status: str = "success",
changed_on: datetime = datetime(2020, 1, 1),
) -> Query:
database = db.session.query(Database).get(database_id)
user = db.session.query(security_manager.user_model).get(user_id)
@ -67,7 +68,7 @@ class TestQueryApi(SupersetTestCase):
rows=rows,
tab_name=tab_name,
status=status,
changed_on=datetime(2020, 1, 1),
changed_on=changed_on,
)
db.session.add(query)
db.session.commit()
@ -394,6 +395,60 @@ class TestQueryApi(SupersetTestCase):
db.session.delete(query)
db.session.commit()
def test_get_updated_since(self):
"""
Query API: Test get queries updated since timestamp
"""
now = datetime.utcnow()
client_id = self.get_random_string()
admin = self.get_user("admin")
example_db = get_example_database()
old_query = self.insert_query(
example_db.id,
admin.id,
self.get_random_string(),
sql="SELECT col1, col2 from table1",
select_sql="SELECT col1, col2 from table1",
executed_sql="SELECT col1, col2 from table1 LIMIT 100",
changed_on=now - timedelta(days=3),
)
updated_query = self.insert_query(
example_db.id,
admin.id,
client_id,
sql="SELECT col1, col2 from table1",
select_sql="SELECT col1, col2 from table1",
executed_sql="SELECT col1, col2 from table1 LIMIT 100",
changed_on=now - timedelta(days=1),
)
self.login(username="admin")
timestamp = datetime.timestamp(now - timedelta(days=2)) * 1000
uri = f"api/v1/query/updated_since?q={prison.dumps({'last_updated_ms': timestamp})}"
rv = self.client.get(uri)
self.assertEqual(rv.status_code, 200)
expected_result = updated_query.to_dict()
data = json.loads(rv.data.decode("utf-8"))
self.assertEqual(len(data["result"]), 1)
for key, value in data["result"][0].items():
# We can't assert timestamp
if key not in (
"changedOn",
"changed_on",
"end_time",
"start_running_time",
"start_time",
"id",
):
self.assertEqual(value, expected_result[key])
# rollback changes
db.session.delete(old_query)
db.session.delete(updated_query)
db.session.commit()
@mock.patch("superset.sql_lab.cancel_query")
@mock.patch("superset.views.core.db.session")
def test_stop_query_not_found(

View File

@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import json
from datetime import datetime, timedelta
from typing import Any, Iterator
import pytest
@ -58,6 +59,61 @@ def test_query_dao_save_metadata(session: Session) -> None:
assert query.extra.get("columns", None) == []
def test_query_dao_get_queries_changed_after(session: Session) -> None:
from superset.models.core import Database
from superset.models.sql_lab import Query
engine = session.get_bind()
Query.metadata.create_all(engine) # pylint: disable=no-member
db = Database(database_name="my_database", sqlalchemy_uri="sqlite://")
now = datetime.utcnow()
old_query_obj = Query(
client_id="foo",
database=db,
tab_name="test_tab",
sql_editor_id="test_editor_id",
sql="select * from bar",
select_sql="select * from bar",
executed_sql="select * from bar",
limit=100,
select_as_cta=False,
rows=100,
error_message="none",
results_key="abc",
changed_on=now - timedelta(days=3),
)
updated_query_obj = Query(
client_id="updated_foo",
database=db,
tab_name="test_tab",
sql_editor_id="test_editor_id",
sql="select * from foo",
select_sql="select * from foo",
executed_sql="select * from foo",
limit=100,
select_as_cta=False,
rows=100,
error_message="none",
results_key="abc",
changed_on=now - timedelta(days=1),
)
session.add(db)
session.add(old_query_obj)
session.add(updated_query_obj)
from superset.queries.dao import QueryDAO
timestamp = datetime.timestamp(now - timedelta(days=2)) * 1000
result = QueryDAO.get_queries_changed_after(timestamp)
assert len(result) == 1
assert result[0].client_id == "updated_foo"
def test_query_dao_stop_query_not_found(
mocker: MockFixture, app: Any, session: Session
) -> None: