diff --git a/superset/sql_lab.py b/superset/sql_lab.py index 8e2cafcd9..a2732d17c 100644 --- a/superset/sql_lab.py +++ b/superset/sql_lab.py @@ -14,10 +14,16 @@ import sqlalchemy from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import NullPool -from superset import app, dataframe, db, results_backend, security_manager, utils +from superset import app, dataframe, db, results_backend, security_manager from superset.models.sql_lab import Query from superset.sql_parse import SupersetQuery -from superset.utils import get_celery_app, QueryStatus +from superset.utils import ( + get_celery_app, + json_iso_dttm_ser, + now_as_float, + QueryStatus, + zlib_compress, +) config = app.config celery_app = get_celery_app(config) @@ -103,7 +109,7 @@ def execute_sql( if store_results and start_time: # only asynchronous queries stats_logger.timing( - 'sqllab.query.time_pending', utils.now_as_float() - start_time) + 'sqllab.query.time_pending', now_as_float() - start_time) query = get_query(query_id, session) payload = dict(query_id=query_id) @@ -160,7 +166,7 @@ def execute_sql( query.executed_sql = executed_sql query.status = QueryStatus.RUNNING - query.start_running_time = utils.now_as_float() + query.start_running_time = now_as_float() session.merge(query) session.commit() logging.info("Set query to 'running'") @@ -175,7 +181,7 @@ def execute_sql( cursor = conn.cursor() logging.info('Running query: \n{}'.format(executed_sql)) logging.info(query.executed_sql) - query_start_time = utils.now_as_float() + query_start_time = now_as_float() db_engine_spec.execute(cursor, query.executed_sql, async_=True) logging.info('Handling cursor') db_engine_spec.handle_cursor(cursor, query, session) @@ -183,7 +189,7 @@ def execute_sql( data = db_engine_spec.fetch_data(cursor, query.limit) stats_logger.timing( 'sqllab.query.time_executing_query', - utils.now_as_float() - query_start_time) + now_as_float() - query_start_time) except SoftTimeLimitExceeded as e: logging.exception(e) if conn is not None: @@ -203,7 +209,7 @@ def execute_sql( conn.commit() conn.close() - if query.status == utils.QueryStatus.STOPPED: + if query.status == QueryStatus.STOPPED: return handle_error('The query has been stopped') cdf = dataframe.SupersetDataFrame(data, cursor_description, db_engine_spec) @@ -219,7 +225,7 @@ def execute_sql( schema=database.force_ctas_schema, show_cols=False, latest_partition=False)) - query.end_time = utils.now_as_float() + query.end_time = now_as_float() session.merge(query) session.flush() @@ -232,17 +238,17 @@ def execute_sql( if store_results: key = '{}'.format(uuid.uuid4()) logging.info('Storing results in results backend, key: {}'.format(key)) - write_to_results_backend_start = utils.now_as_float() + write_to_results_backend_start = now_as_float() json_payload = json.dumps( - payload, default=utils.json_iso_dttm_ser, ignore_nan=True) + payload, default=json_iso_dttm_ser, ignore_nan=True) cache_timeout = database.cache_timeout if cache_timeout is None: cache_timeout = config.get('CACHE_DEFAULT_TIMEOUT', 0) - results_backend.set(key, utils.zlib_compress(json_payload), cache_timeout) + results_backend.set(key, zlib_compress(json_payload), cache_timeout) query.results_key = key stats_logger.timing( 'sqllab.query.results_backend_write', - utils.now_as_float() - write_to_results_backend_start) + now_as_float() - write_to_results_backend_start) session.merge(query) session.commit()