Enhancement of query context and object. (#6962)

* added more functionality for the query context and query object
* fixed the cache logic
* added a default value for groupby
* updated comments and removed print statements
parent aded70a156
commit d5b9795f87
@@ -14,30 +14,247 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# pylint: disable=R
+# pylint: disable=C,R,W
+from datetime import datetime, timedelta
+import logging
+import pickle as pkl
+import traceback
 from typing import Dict, List
 
+import numpy as np
+import pandas as pd
+
+from superset import app, cache
 from superset import db
 from superset.connectors.connector_registry import ConnectorRegistry
+from superset.utils import core as utils
+from superset.utils.core import DTTM_ALIAS
 from .query_object import QueryObject
 
+config = app.config
+stats_logger = config.get('STATS_LOGGER')
+
 
 class QueryContext:
     """
     The query context contains the query object and additional fields necessary
     to retrieve the data payload for a given viz.
     """
 
+    default_fillna = 0
+    cache_type = 'df'
+    enforce_numerical_metrics = True
+
     # TODO: Type datasource and query_object dictionary with TypedDict when it becomes
     # a vanilla python type https://github.com/python/mypy/issues/5288
     def __init__(
             self,
             datasource: Dict,
             queries: List[Dict],
+            force: bool = False,
+            custom_cache_timeout: int = None,
     ):
         self.datasource = ConnectorRegistry.get_datasource(datasource.get('type'),
                                                            int(datasource.get('id')),
                                                            db.session)
         self.queries = list(map(lambda query_obj: QueryObject(**query_obj), queries))
 
-    def get_data(self):
-        raise NotImplementedError()
+        self.force = force
+
+        self.custom_cache_timeout = custom_cache_timeout
+
+        self.enforce_numerical_metrics = True
+
+    def get_query_result(self, query_object):
+        """Returns a pandas dataframe based on the query object"""
+
+        # Here, we assume that all the queries will use the same datasource, which
+        # is a valid assumption for the current setting. In the long term, we may
+        # or may not support multiple queries from different data sources.
+
+        timestamp_format = None
+        if self.datasource.type == 'table':
+            dttm_col = self.datasource.get_col(query_object.granularity)
+            if dttm_col:
+                timestamp_format = dttm_col.python_date_format
+
+        # The datasource here can be a different backend but the interface is common
+        result = self.datasource.query(query_object.to_dict())
+
+        df = result.df
+        # Transform the timestamp we received from the database to a pandas-supported
+        # datetime format. If no python_date_format is specified, the pattern is
+        # treated as the default ISO date format. If the datetime format is unix,
+        # the corresponding parsing logic is used.
+        if df is not None and not df.empty:
+            if DTTM_ALIAS in df.columns:
+                if timestamp_format in ('epoch_s', 'epoch_ms'):
+                    # Column has already been formatted as a timestamp.
+                    df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(pd.Timestamp)
+                else:
+                    df[DTTM_ALIAS] = pd.to_datetime(
+                        df[DTTM_ALIAS], utc=False, format=timestamp_format)
+                if self.datasource.offset:
+                    df[DTTM_ALIAS] += timedelta(hours=self.datasource.offset)
+                df[DTTM_ALIAS] += query_object.time_shift
+
+            if self.enforce_numerical_metrics:
+                self.df_metrics_to_num(df, query_object)
+
+            # DataFrame.replace returns a copy, so assign it back.
+            df = df.replace([np.inf, -np.inf], np.nan)
+            df = self.handle_nulls(df)
+        return {
+            'query': result.query,
+            'status': result.status,
+            'error_message': result.error_message,
+            'df': df,
+        }
+
+    def df_metrics_to_num(self, df, query_object):
+        """Converts metrics to numeric when pandas.read_sql cannot"""
+        metrics = [metric for metric in query_object.metrics]
+        for col, dtype in df.dtypes.items():
+            if dtype.type == np.object_ and col in metrics:
+                df[col] = pd.to_numeric(df[col], errors='coerce')
+
+    def handle_nulls(self, df):
+        fillna = self.get_fillna_for_columns(df.columns)
+        return df.fillna(fillna)
+
+    def get_fillna_for_col(self, col):
+        """Returns the value to use as filler for a specific Column.type"""
+        if col and col.is_string:
+            return ' NULL'
+        return self.default_fillna
+
+    def get_fillna_for_columns(self, columns=None):
+        """Returns a dict or scalar that can be passed to DataFrame.fillna"""
+        if columns is None:
+            return self.default_fillna
+        columns_dict = {col.column_name: col for col in self.datasource.columns}
+        fillna = {
+            c: self.get_fillna_for_col(columns_dict.get(c))
+            for c in columns
+        }
+        return fillna
+
+    def get_data(self, df):
+        return df.to_dict(orient='records')
+
+    def get_single_payload(self, query_obj):
+        """Returns a payload of metadata and data"""
+        payload = self.get_df_payload(query_obj)
+        df = payload.get('df')
+        status = payload.get('status')
+        if status != utils.QueryStatus.FAILED:
+            if df is not None and df.empty:
+                payload['error'] = 'No data'
+            else:
+                payload['data'] = self.get_data(df)
+        if 'df' in payload:
+            del payload['df']
+        return payload
+
+    def get_payload(self):
+        """Returns all the payloads from the queries array"""
+        return [self.get_single_payload(query_object) for query_object in self.queries]
+
+    @property
+    def cache_timeout(self):
+        if self.custom_cache_timeout is not None:
+            return self.custom_cache_timeout
+        if self.datasource.cache_timeout is not None:
+            return self.datasource.cache_timeout
+        if (
+                hasattr(self.datasource, 'database') and
+                self.datasource.database.cache_timeout is not None):
+            return self.datasource.database.cache_timeout
+        return config.get('CACHE_DEFAULT_TIMEOUT')
+
+    def get_df_payload(self, query_obj, **kwargs):
+        """Handles caching around the df payload retrieval"""
+        cache_key = query_obj.cache_key(
+            datasource=self.datasource.uid, **kwargs) if query_obj else None
+        logging.info('Cache key: {}'.format(cache_key))
+        is_loaded = False
+        stacktrace = None
+        df = None
+        cached_dttm = datetime.utcnow().isoformat().split('.')[0]
+        cache_value = None
+        status = None
+        query = ''
+        error_message = None
+        if cache_key and cache and not self.force:
+            cache_value = cache.get(cache_key)
+            if cache_value:
+                stats_logger.incr('loaded_from_cache')
+                try:
+                    cache_value = pkl.loads(cache_value)
+                    df = cache_value['df']
+                    query = cache_value['query']
+                    status = utils.QueryStatus.SUCCESS
+                    is_loaded = True
+                except Exception as e:
+                    logging.exception(e)
+                    logging.error('Error reading cache: ' +
+                                  utils.error_msg_from_exception(e))
+                logging.info('Serving from cache')
+
+        if query_obj and not is_loaded:
+            try:
+                query_result = self.get_query_result(query_obj)
+                status = query_result['status']
+                query = query_result['query']
+                error_message = query_result['error_message']
+                df = query_result['df']
+                if status != utils.QueryStatus.FAILED:
+                    stats_logger.incr('loaded_from_source')
+                    is_loaded = True
+            except Exception as e:
+                logging.exception(e)
+                if not error_message:
+                    error_message = '{}'.format(e)
+                status = utils.QueryStatus.FAILED
+                stacktrace = traceback.format_exc()
+
+        if (
+                is_loaded and
+                cache_key and
+                cache and
+                status != utils.QueryStatus.FAILED):
+            try:
+                cache_value = dict(
+                    dttm=cached_dttm,
+                    df=df if df is not None else None,
+                    query=query,
+                )
+                cache_value = pkl.dumps(
+                    cache_value, protocol=pkl.HIGHEST_PROTOCOL)
+
+                logging.info('Caching {} chars at key {}'.format(
+                    len(cache_value), cache_key))
+
+                stats_logger.incr('set_cache_key')
+                cache.set(
+                    cache_key,
+                    cache_value,
+                    timeout=self.cache_timeout)
+            except Exception as e:
+                # cache.set call can fail if the backend is down or if
+                # the key is too large or for whatever other reason
+                logging.warning('Could not cache key {}'.format(cache_key))
+                logging.exception(e)
+                cache.delete(cache_key)
+        return {
+            'cache_key': cache_key,
+            'cached_dttm': cache_value['dttm'] if cache_value is not None else None,
+            'cache_timeout': self.cache_timeout,
+            'df': df,
+            'error': error_message,
+            'is_cached': cache_key is not None,
+            'query': query,
+            'status': status,
+            'stacktrace': stacktrace,
+            'rowcount': len(df.index) if df is not None else 0,
+        }
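For orientation, here is a minimal sketch of how the new QueryContext flow fits together. The datasource id, time column ('ds'), and metric label ('count') are hypothetical, and a configured Superset environment (app, cache, registered connectors) is assumed:

    from superset.common.query_context import QueryContext

    query_context = QueryContext(
        datasource={'type': 'table', 'id': 1},       # hypothetical datasource
        queries=[{
            'granularity': 'ds',                     # hypothetical time column
            'metrics': [{'label': 'count'}],         # non-ad-hoc metric in the new format
            'time_range': 'Last week',               # groupby defaults to [] when omitted
        }],
        force=False,                                 # True bypasses the cache read
        custom_cache_timeout=3600,                   # overrides datasource/database timeouts
    )
    payloads = query_context.get_payload()           # one payload dict per query object

Each payload carries the executed query, cache metadata ('cache_key', 'cached_dttm', 'is_cached') and, on success, the records produced by get_data().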
@@ -15,15 +15,17 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint: disable=R
-from typing import Dict, List, Optional
+import hashlib
+from typing import Dict, List, Optional, Union
 
+import simplejson as json
+
 from superset import app
+from superset.utils import core as utils
 
 
 # TODO: Type Metrics dictionary with TypedDict when it becomes a vanilla python type
 # https://github.com/python/mypy/issues/5288
 Metric = Dict
 
 
 class QueryObject:
     """
@@ -33,31 +35,87 @@ class QueryObject:
     def __init__(
             self,
             granularity: str,
+            metrics: List[Union[Dict, str]],
             groupby: List[str] = None,
-            metrics: List[Metric] = None,
             filters: List[str] = None,
             time_range: Optional[str] = None,
             time_shift: Optional[str] = None,
             is_timeseries: bool = False,
+            timeseries_limit: int = 0,
             row_limit: int = app.config.get('ROW_LIMIT'),
-            limit: int = 0,
-            timeseries_limit_metric: Optional[Metric] = None,
+            timeseries_limit_metric: Optional[Dict] = None,
             order_desc: bool = True,
             extras: Optional[Dict] = None,
+            prequeries: Optional[Dict] = None,
+            is_prequery: bool = False,
             columns: List[str] = None,
            orderby: List[List] = None,
     ):
         self.granularity = granularity
         self.from_dttm, self.to_dttm = utils.get_since_until(time_range, time_shift)
         self.is_timeseries = is_timeseries
-        self.groupby = groupby or []
-        self.metrics = metrics or []
-        self.filter = filters or []
         self.time_range = time_range
         self.time_shift = utils.parse_human_timedelta(time_shift)
+        self.groupby = groupby if groupby is not None else []
+
+        # Temporary solution for a backward compatibility issue
+        # due to the new format of non-ad-hoc metrics.
+        self.metrics = [metric if 'expressionType' in metric else metric['label']
+                        for metric in metrics]
         self.row_limit = row_limit
-        self.timeseries_limit = int(limit)
+        self.filter = filters if filters is not None else []
+        self.timeseries_limit = timeseries_limit
         self.timeseries_limit_metric = timeseries_limit_metric
         self.order_desc = order_desc
-        self.prequeries = []
-        self.is_prequery = False
-        self.extras = extras
+        self.prequeries = prequeries
+        self.is_prequery = is_prequery
+        self.extras = extras if extras is not None else {}
         self.columns = columns if columns is not None else []
         self.orderby = orderby if orderby is not None else []
 
     def to_dict(self):
-        raise NotImplementedError()
+        query_object_dict = {
+            'granularity': self.granularity,
+            'from_dttm': self.from_dttm,
+            'to_dttm': self.to_dttm,
+            'is_timeseries': self.is_timeseries,
+            'groupby': self.groupby,
+            'metrics': self.metrics,
+            'row_limit': self.row_limit,
+            'filter': self.filter,
+            'timeseries_limit': self.timeseries_limit,
+            'timeseries_limit_metric': self.timeseries_limit_metric,
+            'order_desc': self.order_desc,
+            'prequeries': self.prequeries,
+            'is_prequery': self.is_prequery,
+            'extras': self.extras,
+            'columns': self.columns,
+            'orderby': self.orderby,
+        }
+        return query_object_dict
+
+    def cache_key(self, **extra):
+        """
+        The cache key is made out of the key/values in `query_obj`, plus any
+        other key/values in `extra`.
+        We remove datetime bounds that are hard values, and replace them with
+        the user-provided inputs to bounds, which may be time-relative (as in
+        "5 days ago" or "now").
+        """
+        cache_dict = self.to_dict()
+        cache_dict.update(extra)
+
+        for k in ['from_dttm', 'to_dttm']:
+            del cache_dict[k]
+        if self.time_range:
+            cache_dict['time_range'] = self.time_range
+        json_data = self.json_dumps(cache_dict, sort_keys=True)
+        return hashlib.md5(json_data.encode('utf-8')).hexdigest()
+
+    def json_dumps(self, obj, sort_keys=False):
+        return json.dumps(
+            obj,
+            default=utils.json_int_dttm_ser,
+            ignore_nan=True,
+            sort_keys=sort_keys,
+        )
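A small sketch of the cache-key behaviour added above, reusing the same hypothetical column and metric names: hard datetime bounds (from_dttm/to_dttm) are dropped from the key and the user-provided time_range is hashed instead, so a relative query like "Last week" produces a stable key across runs:

    from superset.common.query_object import QueryObject

    obj = QueryObject(
        granularity='ds',                            # hypothetical time column
        metrics=[{'label': 'count'}],                # reduced to 'count' (no expressionType)
        time_range='Last week',
    )
    assert obj.metrics == ['count']
    key = obj.cache_key(datasource='table__1')       # hypothetical datasource uid
    print(key)                                       # 32-character md5 hexdigest

The `extra` key/values (here the datasource uid, as passed by QueryContext.get_df_payload) are folded into the serialized dict before hashing, so two datasources never share a cache entry for an otherwise identical query.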
@@ -15,16 +15,18 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint: disable=R
-import json
-
-from flask import g, request
+from flask import request
 from flask_appbuilder import expose
 from flask_appbuilder.security.decorators import has_access_api
+import simplejson as json
 
-from superset import appbuilder, security_manager
+from superset import appbuilder, db, security_manager
 from superset.common.query_context import QueryContext
+from superset.legacy import update_time_range
+import superset.models.core as models
 from superset.models.core import Log
-from .base import api, BaseSupersetView, data_payload_response, handle_api_exception
+from superset.utils import core as utils
+from .base import api, BaseSupersetView, handle_api_exception
 
 
 class Api(BaseSupersetView):
@@ -37,11 +39,37 @@ class Api(BaseSupersetView):
         """
         Takes a query_obj constructed in the client and returns payload data response
         for the given query_obj.
         params: query_context: json_blob
         """
         query_context = QueryContext(**json.loads(request.form.get('query_context')))
-        security_manager.assert_datasource_permission(query_context.datasource, g.user)
-        payload_json = query_context.get_data()
-        return data_payload_response(payload_json)
+        security_manager.assert_datasource_permission(query_context.datasource)
+        payload_json = query_context.get_payload()
+        return json.dumps(
+            payload_json,
+            default=utils.json_int_dttm_ser,
+            ignore_nan=True,
+        )
+
+    @Log.log_this
+    @api
+    @handle_api_exception
+    @has_access_api
+    @expose('/v1/form_data/', methods=['GET'])
+    def query_form_data(self):
+        """
+        Get the form_data stored in the database for an existing slice.
+        params: slice_id: integer
+        """
+        form_data = {}
+        slice_id = request.args.get('slice_id')
+        if slice_id:
+            slc = db.session.query(models.Slice).filter_by(id=slice_id).one_or_none()
+            if slc:
+                form_data = slc.form_data.copy()
+
+        update_time_range(form_data)
+
+        return json.dumps(form_data)
 
 
 appbuilder.add_view_no_menu(Api)
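A sketch of how a client might exercise these endpoints. Only the 'query_context' form field and the '/v1/form_data/' GET route appear in this diff; the host, the '/api' route prefix, the query route, the slice id, and an already-authenticated session are assumptions:

    import json

    import requests

    query_context = {
        'datasource': {'type': 'table', 'id': 1},    # hypothetical datasource
        'queries': [{
            'granularity': 'ds',
            'metrics': [{'label': 'count'}],
            'time_range': 'Last week',
        }],
    }
    resp = requests.post(
        'http://localhost:8088/api/v1/query/',       # assumed host and route
        data={'query_context': json.dumps(query_context)},
    )
    payloads = json.loads(resp.text)                 # list: one payload per query object

    form_data = json.loads(requests.get(
        'http://localhost:8088/api/v1/form_data/',   # route from this diff; host assumed
        params={'slice_id': 42},                     # hypothetical slice id
    ).text)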