Adding support for Pinot (#6719)

Summary: Added limited support for visualizations with Pinot via
Sqlalchemy.

Pinot QL (PQL) is a bit weird and limited, and this patch hacks superset to
deal with that weirdness:

1. Pinot's grouping by time is best done as a long epoch. Grouping by a
time string is really slow and times out.

2. Pinot's response does not respect column aliases, so columns are not
named as expected. We therefore remember the given column aliases and
stamp them back onto the dataframe.

3. Pinot's JSON REST call does not return the output types. Instead
everything is cast to string. So when grouping by time, the group key
is integral and has to be treated specially when casting back to the
dataframe __timestamp column.

4. Finally, Pinot does support grouping by expressions. But those
expressions cannot then appear on the select clause. They are returned
regardless in the response. I.e., 'select foo, count(*) from bar group by
foo' is okay, but 'select expr(foo), count(*) from bar group by
expr(foo)' ain't. One must use 'select count(*) from bar group by
expr(foo)'.

I also fixed a couple of things that looked like bugs to me: for
example, the row-ordering limit should always come at the end.

Test Plan: Tested with the modified pinotdb sqlalchemy driver and an
internal pinot cluster. The pinotdb driver changes are in
https://github.com/agrawaldevesh/pinot-dbapi.

Pinot does not support orderby-limit for aggregated queries. To annotate
a query as an aggregate query, this patch adds a hint to the prepared
select statement that the pinotdb sqlalchemy driver then heeds.
This commit is contained in:
agrawaldevesh 2019-02-05 17:04:19 -08:00 committed by Beto Dealmeida
parent 6e9130550d
commit ff9506fec2
3 changed files with 135 additions and 27 deletions

View File

@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=C,R,W
from collections import OrderedDict
from collections import namedtuple, OrderedDict
from datetime import datetime
import logging
@ -46,6 +46,9 @@ from superset.utils import core as utils, import_datasource
config = app.config
metadata = Model.metadata # pylint: disable=no-member
SqlaQuery = namedtuple('SqlaQuery', ['sqla_query', 'labels_expected'])
QueryStringExtended = namedtuple('QueryStringExtended', ['sql', 'labels_expected'])
class AnnotationDatasource(BaseDatasource):
""" Dummy object so we can query annotations using 'Viz' objects just like
@ -141,23 +144,20 @@ class TableColumn(Model, BaseColumn):
"""Getting the time component of the query"""
label = self.table.get_label(utils.DTTM_ALIAS)
db = self.table.database
pdf = self.python_date_format
is_epoch = pdf in ('epoch_s', 'epoch_ms')
if not self.expression and not time_grain and not is_epoch:
return column(self.column_name, type_=DateTime).label(label)
expr = self.expression or self.column_name
if is_epoch:
# if epoch, translate to DATE using db specific conf
db_spec = self.table.database.db_engine_spec
if pdf == 'epoch_s':
expr = db_spec.epoch_to_dttm().format(col=expr)
elif pdf == 'epoch_ms':
expr = db_spec.epoch_ms_to_dttm().format(col=expr)
grain = None
if time_grain:
grain = self.table.database.grains_dict().get(time_grain)
if grain:
expr = grain.function.format(col=expr)
grain = db.grains_dict().get(time_grain)
if not grain:
raise NotImplementedError(
f'No grain spec for {time_grain} for database {db.database_name}')
expr = db.db_engine_spec.get_time_expr(
self.expression or self.column_name,
pdf, time_grain, grain)
return literal_column(expr, type_=DateTime).label(label)
@classmethod
@ -476,15 +476,18 @@ class SqlaTable(Model, BaseDatasource):
return get_template_processor(
table=self, database=self.database, **kwargs)
def get_query_str(self, query_obj):
qry = self.get_sqla_query(**query_obj)
sql = self.database.compile_sqla_query(qry)
def get_query_str_extended(self, query_obj):
sqlaq = self.get_sqla_query(**query_obj)
sql = self.database.compile_sqla_query(sqlaq.sqla_query)
logging.info(sql)
sql = sqlparse.format(sql, reindent=True)
if query_obj['is_prequery']:
query_obj['prequeries'].append(sql)
sql = self.mutate_query_from_config(sql)
return sql
return QueryStringExtended(labels_expected=sqlaq.labels_expected, sql=sql)
def get_query_str(self, query_obj):
return self.get_query_str_extended(query_obj).sql
def get_sqla_table(self):
tbl = table(self.table_name)
@ -517,12 +520,11 @@ class SqlaTable(Model, BaseDatasource):
if expression_type == utils.ADHOC_METRIC_EXPRESSION_TYPES['SIMPLE']:
column_name = metric.get('column').get('column_name')
sqla_column = column(column_name)
table_column = cols.get(column_name)
if table_column:
sqla_column = table_column.get_sqla_col()
else:
sqla_column = column(column_name)
sqla_metric = self.sqla_aggregations[metric.get('aggregate')](sqla_column)
sqla_metric = sqla_metric.label(label)
return sqla_metric
@ -551,7 +553,7 @@ class SqlaTable(Model, BaseDatasource):
order_desc=True,
prequeries=None,
is_prequery=False,
):
):
"""Querying any sqla table from this common interface"""
template_kwargs = {
'from_dttm': from_dttm,
@ -640,6 +642,12 @@ class SqlaTable(Model, BaseDatasource):
time_filters.append(dttm_col.get_time_filter(from_dttm, to_dttm))
select_exprs += metrics_exprs
labels_expected = [str(c.name) for c in select_exprs]
select_exprs = db_engine_spec.make_select_compatible(
groupby_exprs_with_timestamp.values(),
select_exprs)
qry = sa.select(select_exprs)
tbl = self.get_from_clause(template_processor)
@ -793,7 +801,8 @@ class SqlaTable(Model, BaseDatasource):
groupby_exprs_sans_timestamp)
qry = qry.where(top_groups)
return qry.select_from(tbl)
return SqlaQuery(sqla_query=qry.select_from(tbl),
labels_expected=labels_expected)
def _get_top_groups(self, df, dimensions, groupby_exprs):
groups = []
@ -807,19 +816,21 @@ class SqlaTable(Model, BaseDatasource):
def query(self, query_obj):
qry_start_dttm = datetime.now()
sql = self.get_query_str(query_obj)
query_str_ext = self.get_query_str_extended(query_obj)
sql = query_str_ext.sql
status = utils.QueryStatus.SUCCESS
error_message = None
df = None
db_engine_spec = self.database.db_engine_spec
try:
df = self.database.get_df(sql, self.schema)
if self.mutated_labels:
df = df.rename(index=str, columns=self.mutated_labels)
db_engine_spec.mutate_df_columns(df, sql, query_str_ext.labels_expected)
except Exception as e:
status = utils.QueryStatus.FAILED
logging.exception(e)
error_message = (
self.database.db_engine_spec.extract_error_message(e))
logging.exception(f'Query {sql} on schema {self.schema} failed')
error_message = db_engine_spec.extract_error_message(e)
# if this is a main query with prequeries, combine them together
if not query_obj['is_prequery']:

View File

@ -114,6 +114,18 @@ class BaseEngineSpec(object):
force_column_alias_quotes = False
arraysize = None
@classmethod
def get_time_expr(cls, expr, pdf, time_grain, grain):
    """Build the SQL time expression for a temporal column.

    ``expr`` is the raw column or expression; ``pdf`` (python date
    format) selects an epoch-to-datetime conversion; ``grain``, when
    given, wraps the result in the grain's truncation function.
    ``time_grain`` is unused here but consumed by engine-specific
    overrides.
    """
    # Epoch columns are first translated to a datetime using the
    # engine-specific conversion SQL.
    epoch_translators = {
        'epoch_s': cls.epoch_to_dttm,
        'epoch_ms': cls.epoch_ms_to_dttm,
    }
    translator = epoch_translators.get(pdf)
    if translator is not None:
        expr = translator().format(col=expr)
    if grain:
        expr = grain.function.format(col=expr)
    return expr
@classmethod
def get_time_grains(cls):
blacklist = config.get('TIME_GRAIN_BLACKLIST', [])
@ -124,6 +136,16 @@ class BaseEngineSpec(object):
grain_functions.update(grain_addon_functions.get(cls.engine, {}))
return _create_time_grains_tuple(grains, grain_functions, blacklist)
@classmethod
def make_select_compatible(cls, groupby_exprs, select_exprs):
    """Return a SELECT list compatible with this engine's GROUP BY rules.

    Hook for engines (e.g. Pinot) that return the group-by fields in
    the response but reject queries repeating them in the SELECT list.
    The base implementation is a deliberate no-op that returns
    ``select_exprs`` unchanged.
    """
    # Some databases will just return the group-by field into the select, but don't
    # allow the group-by field to be put into the select list.
    return select_exprs
@classmethod
def mutate_df_columns(cls, df, sql, labels_expected):
    """Mutate the result DataFrame for ``sql`` in place, if needed.

    Hook for engines whose responses do not respect column aliases
    (e.g. Pinot): overrides may rename ``df``'s columns back to
    ``labels_expected``. The base implementation does nothing.
    """
    pass
@classmethod
def fetch_data(cls, cursor, limit):
if cls.arraysize:
@ -1413,6 +1435,64 @@ class AthenaEngineSpec(BaseEngineSpec):
return 'from_unixtime({col})'
class PinotEngineSpec(BaseEngineSpec):
    """Engine spec for Apache Pinot (PQL), which has several quirks:

    * grouping by time must be done on a long epoch column via the
      DATETIMECONVERT UDF (grouping by a time string is too slow),
    * group-by expressions must not be repeated in the SELECT list
      (they are returned in the response regardless), and
    * the JSON response does not respect column aliases, so the
      expected labels are stamped back onto the result DataFrame.
    """
    engine = 'pinot'
    allows_subquery = False
    inner_joins = False

    # Maps superset time grains to the DATETIMECONVERT output granularity.
    _time_grain_to_datetimeconvert = {
        'PT1S': '1:SECONDS',
        'PT1M': '1:MINUTES',
        'PT1H': '1:HOURS',
        'P1D': '1:DAYS',
        'P1Y': '1:YEARS',
        'P1M': '1:MONTHS',
    }

    # Pinot does its own conversion (see get_time_expr), so no per-grain
    # SQL snippets are registered here.
    time_grain_functions = dict.fromkeys(_time_grain_to_datetimeconvert)

    @classmethod
    def get_time_expr(cls, expr, pdf, time_grain, grain):
        """Return a DATETIMECONVERT expression bucketing ``expr``.

        Only epoch columns ('epoch_s'/'epoch_ms') are supported; raises
        NotImplementedError otherwise, or when ``time_grain`` has no
        Pinot mapping.
        """
        is_epoch = pdf in ('epoch_s', 'epoch_ms')
        if not is_epoch:
            raise NotImplementedError('Pinot currently only supports epochs')
        # The DATETIMECONVERT pinot udf is documented at
        # Per https://github.com/apache/incubator-pinot/wiki/dateTimeConvert-UDF
        # We are not really converting any time units, just bucketing them.
        seconds_or_ms = 'MILLISECONDS' if pdf == 'epoch_ms' else 'SECONDS'
        tf = f'1:{seconds_or_ms}:EPOCH'
        granularity = cls._time_grain_to_datetimeconvert.get(time_grain)
        if not granularity:
            raise NotImplementedError('No pinot grain spec for ' + str(time_grain))
        # In pinot the output is a string since there is no timestamp column like pg
        return f'DATETIMECONVERT({expr}, "{tf}", "{tf}", "{granularity}")'

    @classmethod
    def make_select_compatible(cls, groupby_exprs, select_exprs):
        """Drop group-by expressions from the SELECT list.

        Pinot does not want the group-by exprs to appear in the select
        clause. Identity (``is``), not equality, is used deliberately:
        comparing SQLAlchemy expressions with ``==`` builds SQL instead
        of returning a boolean.
        """
        return [s for s in select_exprs
                if not any(s is gr for gr in groupby_exprs)]

    @classmethod
    def mutate_df_columns(cls, df, sql, labels_expected):
        """Stamp the expected column labels back onto ``df`` in place.

        Pinot's response does not respect column aliases, so the result
        columns are renamed to ``labels_expected``. Raises ValueError
        when the column counts do not line up.
        """
        if df is None or df.empty or labels_expected is None:
            return
        if len(df.columns) != len(labels_expected):
            # ValueError (an Exception subclass, so still caught by the
            # broad handler in SqlaTable.query) is more precise than a
            # bare Exception.
            raise ValueError(f'For {sql}, df.columns: {df.columns}'
                             f' differs from {labels_expected}')
        df.columns = labels_expected
class ClickHouseEngineSpec(BaseEngineSpec):
"""Dialect for ClickHouse analytical DB."""

View File

@ -96,6 +96,8 @@ class BaseViz(object):
self.time_shift = timedelta()
self.status = None
self.error_msg = ''
self.results = None
self.error_message = None
self.force = force
@ -226,7 +228,22 @@ class BaseViz(object):
if DTTM_ALIAS in df.columns:
if timestamp_format in ('epoch_s', 'epoch_ms'):
# Column has already been formatted as a timestamp.
df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(pd.Timestamp)
dttm_col = df[DTTM_ALIAS]
one_ts_val = dttm_col[0]
# convert time column to pandas Timestamp, but different
# ways to convert depending on string or int types
try:
int(one_ts_val)
is_integral = True
except ValueError:
is_integral = False
if is_integral:
unit = 's' if timestamp_format == 'epoch_s' else 'ms'
df[DTTM_ALIAS] = pd.to_datetime(dttm_col, utc=False, unit=unit,
origin='unix')
else:
df[DTTM_ALIAS] = dttm_col.apply(pd.Timestamp)
else:
df[DTTM_ALIAS] = pd.to_datetime(
df[DTTM_ALIAS], utc=False, format=timestamp_format)