remove duplicated utils (#5851)
parent b453cd2bf2
commit fdd44ace27
@@ -14,10 +14,16 @@ import sqlalchemy
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.pool import NullPool

-from superset import app, dataframe, db, results_backend, security_manager, utils
+from superset import app, dataframe, db, results_backend, security_manager
 from superset.models.sql_lab import Query
 from superset.sql_parse import SupersetQuery
-from superset.utils import get_celery_app, QueryStatus
+from superset.utils import (
+    get_celery_app,
+    json_iso_dttm_ser,
+    now_as_float,
+    QueryStatus,
+    zlib_compress,
+)

 config = app.config
 celery_app = get_celery_app(config)
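The hunk above swaps module-qualified `utils.*` access for direct imports. The imported names are real `superset.utils` helpers, but the bodies below are illustrative assumptions only, sketched to show the contracts the rest of the diff relies on: a float timestamp for duration math, an ISO-8601 datetime hook for `json.dumps(default=...)`, and zlib compression for the results payload.

# Illustrative sketches only -- the real implementations live in
# superset.utils; these assumed bodies just show the expected behavior.
import zlib
from datetime import date, datetime


def now_as_float():
    # Assumption: current UTC time as a float, so phase durations can be
    # computed by simple subtraction.
    return datetime.utcnow().timestamp()


def json_iso_dttm_ser(obj):
    # Assumption: a json.dumps(default=...) hook that renders datetimes
    # and dates as ISO 8601 strings and rejects everything else.
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError('{!r} is not JSON serializable'.format(obj))


def zlib_compress(data):
    # Assumption: zlib-compress the JSON payload before it is written to
    # the results backend, encoding str input first.
    if isinstance(data, str):
        data = data.encode('utf-8')
    return zlib.compress(data)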
@@ -103,7 +109,7 @@ def execute_sql(
     if store_results and start_time:
         # only asynchronous queries
         stats_logger.timing(
-            'sqllab.query.time_pending', utils.now_as_float() - start_time)
+            'sqllab.query.time_pending', now_as_float() - start_time)
     query = get_query(query_id, session)
     payload = dict(query_id=query_id)

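Every timing hunk in this diff follows the same pattern: capture a float timestamp at the start of a phase, then report the elapsed delta to `stats_logger.timing`. A minimal standalone sketch of that pattern, using a hypothetical `PrintStatsLogger` in place of Superset's configured stats logger:

import time


class PrintStatsLogger(object):
    # Hypothetical stand-in for Superset's configured stats logger.
    def timing(self, key, value):
        print('{}: {:.6f}'.format(key, value))


stats_logger = PrintStatsLogger()

start_time = time.time()  # analogous to now_as_float() when the phase begins
time.sleep(0.01)          # stands in for the pending/executing phase
stats_logger.timing('sqllab.query.time_pending', time.time() - start_time)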
@@ -160,7 +166,7 @@ def execute_sql(

     query.executed_sql = executed_sql
     query.status = QueryStatus.RUNNING
-    query.start_running_time = utils.now_as_float()
+    query.start_running_time = now_as_float()
     session.merge(query)
     session.commit()
     logging.info("Set query to 'running'")
@@ -175,7 +181,7 @@ def execute_sql(
         cursor = conn.cursor()
         logging.info('Running query: \n{}'.format(executed_sql))
         logging.info(query.executed_sql)
-        query_start_time = utils.now_as_float()
+        query_start_time = now_as_float()
         db_engine_spec.execute(cursor, query.executed_sql, async_=True)
         logging.info('Handling cursor')
         db_engine_spec.handle_cursor(cursor, query, session)
@@ -183,7 +189,7 @@ def execute_sql(
         data = db_engine_spec.fetch_data(cursor, query.limit)
         stats_logger.timing(
             'sqllab.query.time_executing_query',
-            utils.now_as_float() - query_start_time)
+            now_as_float() - query_start_time)
     except SoftTimeLimitExceeded as e:
         logging.exception(e)
         if conn is not None:
@@ -203,7 +209,7 @@ def execute_sql(
         conn.commit()
         conn.close()

-    if query.status == utils.QueryStatus.STOPPED:
+    if query.status == QueryStatus.STOPPED:
         return handle_error('The query has been stopped')

     cdf = dataframe.SupersetDataFrame(data, cursor_description, db_engine_spec)
@@ -219,7 +225,7 @@ def execute_sql(
             schema=database.force_ctas_schema,
             show_cols=False,
             latest_partition=False))
-    query.end_time = utils.now_as_float()
+    query.end_time = now_as_float()
     session.merge(query)
     session.flush()

@@ -232,17 +238,17 @@ def execute_sql(
     if store_results:
         key = '{}'.format(uuid.uuid4())
         logging.info('Storing results in results backend, key: {}'.format(key))
-        write_to_results_backend_start = utils.now_as_float()
+        write_to_results_backend_start = now_as_float()
         json_payload = json.dumps(
-            payload, default=utils.json_iso_dttm_ser, ignore_nan=True)
+            payload, default=json_iso_dttm_ser, ignore_nan=True)
         cache_timeout = database.cache_timeout
         if cache_timeout is None:
             cache_timeout = config.get('CACHE_DEFAULT_TIMEOUT', 0)
-        results_backend.set(key, utils.zlib_compress(json_payload), cache_timeout)
+        results_backend.set(key, zlib_compress(json_payload), cache_timeout)
         query.results_key = key
         stats_logger.timing(
             'sqllab.query.results_backend_write',
-            utils.now_as_float() - write_to_results_backend_start)
+            now_as_float() - write_to_results_backend_start)
         session.merge(query)
         session.commit()

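The final hunk is the store-results path: serialize the payload with the ISO datetime hook, compress it, and write it under a UUID key using the database's cache timeout (falling back to `CACHE_DEFAULT_TIMEOUT`). A self-contained sketch of that flow; the dict-backed backend, `backend_set`, and `default=str` serializer are hypothetical stand-ins for the real results backend and `json_iso_dttm_ser`:

import json
import uuid
import zlib

# Hypothetical in-memory stand-in for the configured results backend;
# the real one honors `timeout` as a TTL, which this sketch ignores.
backend = {}


def backend_set(key, value, timeout):
    backend[key] = value


payload = {'query_id': 42, 'columns': ['col'], 'data': [{'col': 1}]}
key = '{}'.format(uuid.uuid4())
json_payload = json.dumps(payload, default=str)  # stand-in for json_iso_dttm_ser
backend_set(key, zlib.compress(json_payload.encode('utf-8')), timeout=0)
print('stored {} bytes under key {}'.format(len(backend[key]), key))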