[SQLLab] Refactor sql json endpoint (#8163)

* [superset] sql_json endpoint refactor

* [superset] sql_json endpoint refactor sync

* [superset] Fix, wrong parameter name

* [superset] flake8 and black

* [SQLLab] Don't need to pass request to helper SQLJsonParams

* [SQLLab] try except on query add

* [SQLLab] Remove unwanted changes

* [SQLLab] refactor (cont.)

* [superset] black

* [SQLLab] Remove SQLJsonParams class

* [SQLLab] Fix, add missing type annotations

* [superset] Better exception handling when saving query

* [superset] Remove unnecessary commit

* [superset] one or none instead of first

* [superset] Fix, missing return on error sql_json

* [superset] Fix, black

* [database] Use f strings
Daniel Vaz Gaspar 2019-09-19 20:15:40 +01:00 committed by John Bodley
parent 2cd631a26c
commit 8bc5cd7dc0
1 changed file with 125 additions and 84 deletions
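Two of the robustness changes called out in the commit message above ("one or none instead of first" and "try except on query add") are standard SQLAlchemy patterns. The following is a minimal, self-contained sketch of both, assuming SQLAlchemy 1.4+, an in-memory SQLite database, and stand-in Database/Query models rather than Superset's own:

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Database(Base):  # stand-in for superset.models.Database
    __tablename__ = "dbs"
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)

class Query(Base):  # stand-in for the SQL Lab Query model
    __tablename__ = "query"
    id = Column(Integer, primary_key=True)
    database_id = Column(Integer, nullable=False)
    sql = Column(String, nullable=False)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# one_or_none() returns None when no row matches (and raises if more than one
# does), so an unknown database_id becomes an explicit error response instead
# of whatever first() happens to return.
database_id = 42
mydb = session.query(Database).filter_by(id=database_id).one_or_none()
if not mydb:
    print(f"Database with id {database_id} is missing.")

# Guard the insert: on failure, roll back so the session stays usable and
# surface the error to the caller instead of continuing with no query record.
query = Query(database_id=database_id, sql="SELECT 1")
try:
    session.add(query)
    session.flush()  # assigns query.id
    query_id = query.id
    session.commit()
except SQLAlchemyError as exc:
    session.rollback()
    raise RuntimeError("Query record was not created as expected.") from exc
print(f"Triggering query_id: {query_id}")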


@@ -46,6 +46,8 @@ import pandas as pd
import pyarrow as pa
import simplejson as json
from sqlalchemy import and_, or_, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm.session import Session
from werkzeug.routing import BaseConverter
from superset import (
@@ -2593,56 +2595,152 @@ class Superset(BaseSupersetView):
)
return json_error_response(f"{msg}")
def _sql_json_async(
self, session: Session, rendered_query: str, query: Query
) -> str:
"""
Send SQL JSON query to celery workers
:param session: SQLAlchemy session object
:param rendered_query: the rendered query to be run by the workers
:param query: The query (SQLAlchemy) object
:return: String JSON response
"""
logging.info(f"Query {query.id}: Running query on a Celery worker")
# Ignore the celery future object and the request may time out.
try:
sql_lab.get_sql_results.delay(
query.id,
rendered_query,
return_results=False,
store_results=not query.select_as_cta,
user_name=g.user.username if g.user else None,
start_time=now_as_float(),
)
except Exception as e:
logging.exception(f"Query {query.id}: {e}")
msg = _(
"Failed to start remote query on a worker. "
"Tell your administrator to verify the availability of "
"the message queue."
)
query.status = QueryStatus.FAILED
query.error_message = msg
session.commit()
return json_error_response("{}".format(msg))
resp = json_success(
json.dumps(
{"query": query.to_dict()},
default=utils.json_int_dttm_ser,
ignore_nan=True,
),
status=202,
)
session.commit()
return resp
def _sql_json_sync(
self, session: Session, rendered_query: str, query: Query
) -> str:
"""
Execute SQL query synchronously (sql json)
:param session: SQLAlchemy session object
:param rendered_query: The rendered query (templates already applied)
:param query: The query (SQLAlchemy) object
:return: String JSON response
"""
try:
timeout = config.get("SQLLAB_TIMEOUT")
timeout_msg = f"The query exceeded the {timeout} seconds timeout."
with utils.timeout(seconds=timeout, error_message=timeout_msg):
# pylint: disable=no-value-for-parameter
data = sql_lab.get_sql_results(
query.id,
rendered_query,
return_results=True,
user_name=g.user.username if g.user else None,
)
payload = json.dumps(
apply_display_max_row_limit(data),
default=utils.pessimistic_json_iso_dttm_ser,
ignore_nan=True,
encoding=None,
)
except Exception as e:
logging.exception(f"Query {query.id}: {e}")
return json_error_response(f"{e}")
if data.get("status") == QueryStatus.FAILED:
return json_error_response(payload=data)
return json_success(payload)
@has_access_api
@expose("/sql_json/", methods=["POST", "GET"])
@event_logger.log_this
def sql_json(self):
"""Runs arbitrary sql and returns the result as json"""
async_ = request.form.get("runAsync") == "true"
sql = request.form.get("sql")
database_id = request.form.get("database_id")
schema = request.form.get("schema") or None
template_params = json.loads(request.form.get("templateParams") or "{}")
limit = int(request.form.get("queryLimit", 0))
# Collect Values
database_id: int = int(request.form.get("database_id"))
schema: str = request.form.get("schema")
sql: str = request.form.get("sql")
try:
template_params: dict = json.loads(request.form.get("templateParams", "{}"))
except json.decoder.JSONDecodeError:
logging.warning(
f"Invalid template parameter {request.form.get('templateParams')}"
" specified. Defaulting to empty dict"
)
template_params = {}
limit = int(request.form.get("queryLimit", app.config.get("SQL_MAX_ROW")))
async_flag: bool = request.form.get("runAsync") == "true"
if limit < 0:
logging.warning(
"Invalid limit of {} specified. Defaulting to max limit.".format(limit)
f"Invalid limit of {limit} specified. Defaulting to max limit."
)
limit = 0
limit = limit or app.config.get("SQL_MAX_ROW")
select_as_cta: bool = request.form.get("select_as_cta") == "true"
tmp_table_name: str = request.form.get("tmp_table_name")
client_id: str = request.form.get("client_id") or utils.shortid()[:10]
sql_editor_id: str = request.form.get("sql_editor_id")
tab_name: str = request.form.get("tab")
status: str = QueryStatus.PENDING if async_flag else QueryStatus.RUNNING
session = db.session()
mydb = session.query(models.Database).filter_by(id=database_id).one_or_none()
if not mydb:
return json_error_response(f"Database with id {database_id} is missing.")
select_as_cta = request.form.get("select_as_cta") == "true"
tmp_table_name = request.form.get("tmp_table_name")
# Set tmp_table_name for CTA
if select_as_cta and mydb.force_ctas_schema:
tmp_table_name = "{}.{}".format(mydb.force_ctas_schema, tmp_table_name)
tmp_table_name = f"{mydb.force_ctas_schema}.{tmp_table_name}"
client_id = request.form.get("client_id") or utils.shortid()[:10]
# Save current query
query = Query(
database_id=int(database_id),
database_id=database_id,
sql=sql,
schema=schema,
select_as_cta=select_as_cta,
start_time=now_as_float(),
tab_name=request.form.get("tab"),
status=QueryStatus.PENDING if async_ else QueryStatus.RUNNING,
sql_editor_id=request.form.get("sql_editor_id"),
tab_name=tab_name,
status=status,
sql_editor_id=sql_editor_id,
tmp_table_name=tmp_table_name,
user_id=g.user.get_id() if g.user else None,
client_id=client_id,
)
session.add(query)
session.flush()
query_id = query.id
session.commit() # shouldn't be necessary
try:
session.add(query)
session.flush()
query_id = query.id
session.commit() # shouldn't be necessary
except SQLAlchemyError as e:
logging.error(f"Errors saving query details {e}")
session.rollback()
raise Exception(_("Query record was not created as expected."))
if not query_id:
raise Exception(_("Query record was not created as expected."))
logging.info("Triggering query_id: {}".format(query_id))
logging.info(f"Triggering query_id: {query_id}")
rejected_tables = security_manager.rejected_tables(sql, mydb, schema)
if rejected_tables:
@@ -2662,10 +2760,9 @@ class Superset(BaseSupersetView):
query.sql, **template_params
)
except Exception as e:
error_msg = utils.error_msg_from_exception(e)
return json_error_response(
"Query {}: Template rendering failed: {}".format(
query_id, utils.error_msg_from_exception(e)
)
f"Query {query_id}: Template rendering failed: {error_msg}"
)
# set LIMIT after template processing
@@ -2673,66 +2770,10 @@ class Superset(BaseSupersetView):
query.limit = min(lim for lim in limits if lim is not None)
# Async request.
if async_:
logging.info(f"Query {query_id}: Running query on a Celery worker")
# Ignore the celery future object and the request may time out.
try:
sql_lab.get_sql_results.delay(
query_id,
rendered_query,
return_results=False,
store_results=not query.select_as_cta,
user_name=g.user.username if g.user else None,
start_time=now_as_float(),
)
except Exception as e:
logging.exception(f"Query {query_id}: {e}")
msg = _(
"Failed to start remote query on a worker. "
"Tell your administrator to verify the availability of "
"the message queue."
)
query.status = QueryStatus.FAILED
query.error_message = msg
session.commit()
return json_error_response("{}".format(msg))
resp = json_success(
json.dumps(
{"query": query.to_dict()},
default=utils.json_int_dttm_ser,
ignore_nan=True,
),
status=202,
)
session.commit()
return resp
if async_flag:
return self._sql_json_async(session, rendered_query, query)
# Sync request.
try:
timeout = config.get("SQLLAB_TIMEOUT")
timeout_msg = f"The query exceeded the {timeout} seconds timeout."
with utils.timeout(seconds=timeout, error_message=timeout_msg):
# pylint: disable=no-value-for-parameter
data = sql_lab.get_sql_results(
query_id,
rendered_query,
return_results=True,
user_name=g.user.username if g.user else None,
)
payload = json.dumps(
apply_display_max_row_limit(data),
default=utils.pessimistic_json_iso_dttm_ser,
ignore_nan=True,
encoding=None,
)
except Exception as e:
logging.exception(f"Query {query_id}: {e}")
return json_error_response("{}".format(e))
if data.get("status") == QueryStatus.FAILED:
return json_error_response(payload=data)
return json_success(payload)
return self._sql_json_sync(session, rendered_query, query)
@has_access
@expose("/csv/<client_id>")
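For reference, the endpoint being refactored is driven by a form-encoded POST from SQL Lab. Below is a minimal client-side sketch using the requests library, assuming a hypothetical local Superset instance at http://localhost:8088, that the view is mounted under /superset (the default for this view class), and that login and CSRF handling have already been performed on the session; the field names mirror the request.form.get() calls in the diff above:

import json
import requests

BASE = "http://localhost:8088"  # hypothetical local Superset instance
client = requests.Session()
# Login and CSRF token handling are assumed to have been done on `client`.

form = {
    "database_id": "1",
    "schema": "public",
    "sql": "SELECT 1",
    "templateParams": json.dumps({}),
    "queryLimit": "1000",
    "runAsync": "true",  # queue on a Celery worker; an HTTP 202 response is expected
    "select_as_cta": "false",
    "tmp_table_name": "",
    "client_id": "abc123def4",
    "sql_editor_id": "1",
    "tab": "Untitled Query 1",
}
resp = client.post(f"{BASE}/superset/sql_json/", data=form)
print(resp.status_code, resp.text)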