diff --git a/superset/views/core.py b/superset/views/core.py
index 0a128614b..c4ee033a3 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -46,6 +46,8 @@ import pandas as pd
 import pyarrow as pa
 import simplejson as json
 from sqlalchemy import and_, or_, select
+from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.orm.session import Session
 from werkzeug.routing import BaseConverter
 
 from superset import (
@@ -2593,56 +2595,154 @@ class Superset(BaseSupersetView):
             )
             return json_error_response(f"{msg}")
 
+    def _sql_json_async(
+        self, session: Session, rendered_query: str, query: Query
+    ) -> str:
+        """
+        Send SQL JSON query to celery workers
+
+        :param session: SQLAlchemy session object
+        :param rendered_query: the rendered query to perform by workers
+        :param query: The query (SQLAlchemy) object
+        :return: String JSON response
+        """
+        logging.info(f"Query {query.id}: Running query on a Celery worker")
+        # Ignore the celery future object and the request may time out.
+        try:
+            sql_lab.get_sql_results.delay(
+                query.id,
+                rendered_query,
+                return_results=False,
+                store_results=not query.select_as_cta,
+                user_name=g.user.username if g.user else None,
+                start_time=now_as_float(),
+            )
+        except Exception as e:
+            logging.exception(f"Query {query.id}: {e}")
+            msg = _(
+                "Failed to start remote query on a worker. "
+                "Tell your administrator to verify the availability of "
+                "the message queue."
+            )
+            query.status = QueryStatus.FAILED
+            query.error_message = msg
+            session.commit()
+            return json_error_response("{}".format(msg))
+        resp = json_success(
+            json.dumps(
+                {"query": query.to_dict()},
+                default=utils.json_int_dttm_ser,
+                ignore_nan=True,
+            ),
+            status=202,
+        )
+        session.commit()
+        return resp
+
+    def _sql_json_sync(
+        self, session: Session, rendered_query: str, query: Query
+    ) -> str:
+        """
+        Execute SQL query (sql json)
+
+        :param session: SQLAlchemy session object (not used by this method)
+        :param rendered_query: The rendered query (included templates)
+        :param query: The query SQL (SQLAlchemy) object
+        :return: String JSON response
+        """
+        try:
+            timeout = config.get("SQLLAB_TIMEOUT")
+            timeout_msg = f"The query exceeded the {timeout} seconds timeout."
+            with utils.timeout(seconds=timeout, error_message=timeout_msg):
+                # pylint: disable=no-value-for-parameter
+                data = sql_lab.get_sql_results(
+                    query.id,
+                    rendered_query,
+                    return_results=True,
+                    user_name=g.user.username if g.user else None,
+                )
+
+            payload = json.dumps(
+                apply_display_max_row_limit(data),
+                default=utils.pessimistic_json_iso_dttm_ser,
+                ignore_nan=True,
+                encoding=None,
+            )
+        except Exception as e:
+            logging.exception(f"Query {query.id}: {e}")
+            return json_error_response(f"{e}")
+        if data.get("status") == QueryStatus.FAILED:
+            return json_error_response(payload=data)
+        return json_success(payload)
+
     @has_access_api
     @expose("/sql_json/", methods=["POST", "GET"])
     @event_logger.log_this
     def sql_json(self):
         """Runs arbitrary sql and returns and json"""
-        async_ = request.form.get("runAsync") == "true"
-        sql = request.form.get("sql")
-        database_id = request.form.get("database_id")
-        schema = request.form.get("schema") or None
-        template_params = json.loads(request.form.get("templateParams") or "{}")
-        limit = int(request.form.get("queryLimit", 0))
+        # Collect Values
+        database_id: int = int(request.form.get("database_id"))
+        schema: str = request.form.get("schema")
+        sql: str = request.form.get("sql")
+        try:
+            template_params: dict = json.loads(request.form.get("templateParams", "{}"))
+        except json.decoder.JSONDecodeError:
+            logging.warning(
+                f"Invalid template parameter {request.form.get('templateParams')}"
+                " specified. Defaulting to empty dict"
+            )
+            template_params = {}
+        limit = int(request.form.get("queryLimit", app.config.get("SQL_MAX_ROW")))
+        async_flag: bool = request.form.get("runAsync") == "true"
         if limit < 0:
             logging.warning(
-                "Invalid limit of {} specified. Defaulting to max limit.".format(limit)
+                f"Invalid limit of {limit} specified. Defaulting to max limit."
             )
             limit = 0
         limit = limit or app.config.get("SQL_MAX_ROW")
+        select_as_cta: bool = request.form.get("select_as_cta") == "true"
+        tmp_table_name: str = request.form.get("tmp_table_name")
+        client_id: str = request.form.get("client_id") or utils.shortid()[:10]
+        sql_editor_id: str = request.form.get("sql_editor_id")
+        tab_name: str = request.form.get("tab")
+        status: str = QueryStatus.PENDING if async_flag else QueryStatus.RUNNING
 
         session = db.session()
         mydb = session.query(models.Database).filter_by(id=database_id).one_or_none()
-
         if not mydb:
             return json_error_response(f"Database with id {database_id} is missing.")
 
-        select_as_cta = request.form.get("select_as_cta") == "true"
-        tmp_table_name = request.form.get("tmp_table_name")
+        # Set tmp_table_name for CTA
         if select_as_cta and mydb.force_ctas_schema:
-            tmp_table_name = "{}.{}".format(mydb.force_ctas_schema, tmp_table_name)
+            tmp_table_name = f"{mydb.force_ctas_schema}.{tmp_table_name}"
 
-        client_id = request.form.get("client_id") or utils.shortid()[:10]
+        # Save current query
         query = Query(
-            database_id=int(database_id),
+            database_id=database_id,
             sql=sql,
             schema=schema,
             select_as_cta=select_as_cta,
             start_time=now_as_float(),
-            tab_name=request.form.get("tab"),
-            status=QueryStatus.PENDING if async_ else QueryStatus.RUNNING,
-            sql_editor_id=request.form.get("sql_editor_id"),
+            tab_name=tab_name,
+            status=status,
+            sql_editor_id=sql_editor_id,
             tmp_table_name=tmp_table_name,
             user_id=g.user.get_id() if g.user else None,
             client_id=client_id,
         )
-        session.add(query)
-        session.flush()
-        query_id = query.id
-        session.commit()  # shouldn't be necessary
+        try:
+            session.add(query)
+            session.flush()
+            query_id = query.id
+            session.commit()  # shouldn't be necessary
+        except SQLAlchemyError as e:
+            logging.error(f"Errors saving query details {e}")
+            session.rollback()
+            raise Exception(_("Query record was not created as expected."))
         if not query_id:
             raise Exception(_("Query record was not created as expected."))
-        logging.info("Triggering query_id: {}".format(query_id))
+
+        logging.info(f"Triggering query_id: {query_id}")
 
         rejected_tables = security_manager.rejected_tables(sql, mydb, schema)
         if rejected_tables:
@@ -2662,10 +2762,9 @@ class Superset(BaseSupersetView):
                 query.sql, **template_params
             )
         except Exception as e:
+            error_msg = utils.error_msg_from_exception(e)
             return json_error_response(
-                "Query {}: Template rendering failed: {}".format(
-                    query_id, utils.error_msg_from_exception(e)
-                )
+                f"Query {query_id}: Template rendering failed: {error_msg}"
             )
 
         # set LIMIT after template processing
@@ -2673,66 +2772,10 @@ class Superset(BaseSupersetView):
         query.limit = min(lim for lim in limits if lim is not None)
 
         # Async request.
-        if async_:
-            logging.info(f"Query {query_id}: Running query on a Celery worker")
-            # Ignore the celery future object and the request may time out.
-            try:
-                sql_lab.get_sql_results.delay(
-                    query_id,
-                    rendered_query,
-                    return_results=False,
-                    store_results=not query.select_as_cta,
-                    user_name=g.user.username if g.user else None,
-                    start_time=now_as_float(),
-                )
-            except Exception as e:
-                logging.exception(f"Query {query_id}: {e}")
-                msg = _(
-                    "Failed to start remote query on a worker. "
-                    "Tell your administrator to verify the availability of "
-                    "the message queue."
-                )
-                query.status = QueryStatus.FAILED
-                query.error_message = msg
-                session.commit()
-                return json_error_response("{}".format(msg))
-
-            resp = json_success(
-                json.dumps(
-                    {"query": query.to_dict()},
-                    default=utils.json_int_dttm_ser,
-                    ignore_nan=True,
-                ),
-                status=202,
-            )
-            session.commit()
-            return resp
-
+        if async_flag:
+            return self._sql_json_async(session, rendered_query, query)
         # Sync request.
-        try:
-            timeout = config.get("SQLLAB_TIMEOUT")
-            timeout_msg = f"The query exceeded the {timeout} seconds timeout."
-            with utils.timeout(seconds=timeout, error_message=timeout_msg):
-                # pylint: disable=no-value-for-parameter
-                data = sql_lab.get_sql_results(
-                    query_id,
-                    rendered_query,
-                    return_results=True,
-                    user_name=g.user.username if g.user else None,
-                )
-
-            payload = json.dumps(
-                apply_display_max_row_limit(data),
-                default=utils.pessimistic_json_iso_dttm_ser,
-                ignore_nan=True,
-                encoding=None,
-            )
-        except Exception as e:
-            logging.exception(f"Query {query_id}: {e}")
-            return json_error_response("{}".format(e))
-        if data.get("status") == QueryStatus.FAILED:
-            return json_error_response(payload=data)
-        return json_success(payload)
+        return self._sql_json_sync(session, rendered_query, query)
 
     @has_access
     @expose("/csv/")