[sqllab] Using app context for Celery task (#4669)
This commit is contained in:
parent
29678680ee
commit
d49a0e7958
|
|
@ -19,7 +19,6 @@ from sqlalchemy.pool import NullPool
|
|||
|
||||
from superset import app, dataframe, db, results_backend, security_manager, utils
|
||||
from superset.db_engine_specs import LimitMethod
|
||||
from superset.jinja_context import get_template_processor
|
||||
from superset.models.sql_lab import Query
|
||||
from superset.sql_parse import SupersetQuery
|
||||
from superset.utils import get_celery_app, QueryStatus
|
||||
|
|
@ -109,13 +108,12 @@ def convert_results_to_df(cursor_description, data):
|
|||
|
||||
@celery_app.task(bind=True, soft_time_limit=SQLLAB_TIMEOUT)
|
||||
def get_sql_results(
|
||||
ctask, query_id, return_results=True, store_results=False,
|
||||
user_name=None, template_params=None):
|
||||
ctask, query_id, rendered_query, return_results=True, store_results=False,
|
||||
user_name=None):
|
||||
"""Executes the sql query returns the results."""
|
||||
try:
|
||||
return execute_sql(
|
||||
ctask, query_id, return_results, store_results, user_name,
|
||||
template_params)
|
||||
ctask, query_id, rendered_query, return_results, store_results, user_name)
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
stats_logger.incr('error_sqllab_unhandled')
|
||||
|
|
@ -129,8 +127,8 @@ def get_sql_results(
|
|||
|
||||
|
||||
def execute_sql(
|
||||
ctask, query_id, return_results=True, store_results=False, user_name=None,
|
||||
template_params=None,
|
||||
ctask, query_id, rendered_query, return_results=True, store_results=False,
|
||||
user_name=None,
|
||||
):
|
||||
"""Executes the sql query returns the results."""
|
||||
session = get_session(not ctask.request.called_directly)
|
||||
|
|
@ -161,16 +159,6 @@ def execute_sql(
|
|||
if store_results and not results_backend:
|
||||
return handle_error("Results backend isn't configured.")
|
||||
|
||||
try:
|
||||
template_processor = get_template_processor(
|
||||
database=database, query=query)
|
||||
tp = template_params or {}
|
||||
rendered_query = template_processor.process_template(query.sql, **tp)
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
msg = 'Template rendering failed: ' + utils.error_msg_from_exception(e)
|
||||
return handle_error(msg)
|
||||
|
||||
# Limit enforced only for retrieving the data, not for the CTA queries.
|
||||
superset_query = SupersetQuery(rendered_query)
|
||||
executed_sql = superset_query.stripped()
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ from superset.connectors.connector_registry import ConnectorRegistry
|
|||
from superset.connectors.sqla.models import AnnotationDatasource, SqlaTable
|
||||
from superset.exceptions import SupersetException, SupersetSecurityException
|
||||
from superset.forms import CsvToDatabaseForm
|
||||
from superset.jinja_context import get_template_processor
|
||||
from superset.legacy import cast_form_data
|
||||
import superset.models.core as models
|
||||
from superset.models.sql_lab import Query
|
||||
|
|
@ -2409,16 +2410,27 @@ class Superset(BaseSupersetView):
|
|||
raise Exception(_('Query record was not created as expected.'))
|
||||
logging.info('Triggering query_id: {}'.format(query_id))
|
||||
|
||||
try:
|
||||
template_processor = get_template_processor(
|
||||
database=query.database, query=query)
|
||||
rendered_query = template_processor.process_template(
|
||||
query.sql,
|
||||
**template_params)
|
||||
except Exception as e:
|
||||
return json_error_response(
|
||||
'Template rendering failed: {}'.format(utils.error_msg_from_exception(e)))
|
||||
|
||||
# Async request.
|
||||
if async:
|
||||
logging.info('Running query on a Celery worker')
|
||||
# Ignore the celery future object and the request may time out.
|
||||
try:
|
||||
sql_lab.get_sql_results.delay(
|
||||
query_id=query_id, return_results=False,
|
||||
query_id,
|
||||
rendered_query,
|
||||
return_results=False,
|
||||
store_results=not query.select_as_cta,
|
||||
user_name=g.user.username,
|
||||
template_params=template_params)
|
||||
user_name=g.user.username)
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
msg = (
|
||||
|
|
@ -2447,8 +2459,9 @@ class Superset(BaseSupersetView):
|
|||
error_message=timeout_msg):
|
||||
# pylint: disable=no-value-for-parameter
|
||||
data = sql_lab.get_sql_results(
|
||||
query_id=query_id, return_results=True,
|
||||
template_params=template_params)
|
||||
query_id,
|
||||
rendered_query,
|
||||
return_results=True)
|
||||
payload = json.dumps(
|
||||
data, default=utils.pessimistic_json_iso_dttm_ser)
|
||||
except Exception as e:
|
||||
|
|
|
|||
Loading…
Reference in New Issue