From ff9506fec229a170b0ad7cdacb1fccfd8eff6ef3 Mon Sep 17 00:00:00 2001 From: agrawaldevesh Date: Tue, 5 Feb 2019 17:04:19 -0800 Subject: [PATCH] Adding support for Pinot (#6719) Summary: Added limited support for visualizations with Pinot via SQLAlchemy. Pinot QL (PQL) is a bit weird and limited, and this patch hacks Superset to deal with that weirdness: 1. Pinot's grouping by time is best done as a long epoch. Grouping by a time string is really slow and times out. 2. Pinot's response does not respect column aliases. So columns are not named what they are expected to be. So we remember the given column aliases and then stamp them back onto the dataframe. 3. Pinot's JSON REST call does not return the output types. Instead everything is cast to string. So when grouping by time, the group key is integral and has to be treated specially when casting back to the dataframe __timestamp column. 4. Finally, Pinot does support grouping by expressions. But those expressions cannot then appear in the select clause. They are returned regardless in the response. i.e., 'select foo, count(*) from bar group by foo' is okay, but 'select expr(foo), count(*) from bar group by expr(foo)' is not. One must use 'select count(*) from bar group by expr(foo)'. I also fixed a couple of things that looked like bugs to me: for example, the row-ordering-limit should always come at the end. Test Plan: Tested with the modified pinotdb SQLAlchemy driver and an internal Pinot cluster. The pinotdb driver changes are in https://github.com/agrawaldevesh/pinot-dbapi. Pinot does not support orderby-limit for aggregated queries. To annotate a query as an aggregate query, this patch adds a hint to the prepared select statement that the pinotdb SQLAlchemy driver then heeds. 
--- superset/connectors/sqla/models.py | 63 +++++++++++++---------- superset/db_engine_specs.py | 80 ++++++++++++++++++++++++++++++ superset/viz.py | 19 ++++++- 3 files changed, 135 insertions(+), 27 deletions(-) diff --git a/superset/connectors/sqla/models.py b/superset/connectors/sqla/models.py index 64b38cfd5..86d927393 100644 --- a/superset/connectors/sqla/models.py +++ b/superset/connectors/sqla/models.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. # pylint: disable=C,R,W -from collections import OrderedDict +from collections import namedtuple, OrderedDict from datetime import datetime import logging @@ -46,6 +46,9 @@ from superset.utils import core as utils, import_datasource config = app.config metadata = Model.metadata # pylint: disable=no-member +SqlaQuery = namedtuple('SqlaQuery', ['sqla_query', 'labels_expected']) +QueryStringExtended = namedtuple('QueryStringExtended', ['sql', 'labels_expected']) + class AnnotationDatasource(BaseDatasource): """ Dummy object so we can query annotations using 'Viz' objects just like @@ -141,23 +144,20 @@ class TableColumn(Model, BaseColumn): """Getting the time component of the query""" label = self.table.get_label(utils.DTTM_ALIAS) + db = self.table.database pdf = self.python_date_format is_epoch = pdf in ('epoch_s', 'epoch_ms') if not self.expression and not time_grain and not is_epoch: return column(self.column_name, type_=DateTime).label(label) - - expr = self.expression or self.column_name - if is_epoch: - # if epoch, translate to DATE using db specific conf - db_spec = self.table.database.db_engine_spec - if pdf == 'epoch_s': - expr = db_spec.epoch_to_dttm().format(col=expr) - elif pdf == 'epoch_ms': - expr = db_spec.epoch_ms_to_dttm().format(col=expr) + grain = None if time_grain: - grain = self.table.database.grains_dict().get(time_grain) - if grain: - expr = grain.function.format(col=expr) + grain = db.grains_dict().get(time_grain) + if not grain: + raise 
NotImplementedError( + f'No grain spec for {time_grain} for database {db.database_name}') + expr = db.db_engine_spec.get_time_expr( + self.expression or self.column_name, + pdf, time_grain, grain) return literal_column(expr, type_=DateTime).label(label) @classmethod @@ -476,15 +476,18 @@ class SqlaTable(Model, BaseDatasource): return get_template_processor( table=self, database=self.database, **kwargs) - def get_query_str(self, query_obj): - qry = self.get_sqla_query(**query_obj) - sql = self.database.compile_sqla_query(qry) + def get_query_str_extended(self, query_obj): + sqlaq = self.get_sqla_query(**query_obj) + sql = self.database.compile_sqla_query(sqlaq.sqla_query) logging.info(sql) sql = sqlparse.format(sql, reindent=True) if query_obj['is_prequery']: query_obj['prequeries'].append(sql) sql = self.mutate_query_from_config(sql) - return sql + return QueryStringExtended(labels_expected=sqlaq.labels_expected, sql=sql) + + def get_query_str(self, query_obj): + return self.get_query_str_extended(query_obj).sql def get_sqla_table(self): tbl = table(self.table_name) @@ -517,12 +520,11 @@ class SqlaTable(Model, BaseDatasource): if expression_type == utils.ADHOC_METRIC_EXPRESSION_TYPES['SIMPLE']: column_name = metric.get('column').get('column_name') - sqla_column = column(column_name) table_column = cols.get(column_name) - if table_column: sqla_column = table_column.get_sqla_col() - + else: + sqla_column = column(column_name) sqla_metric = self.sqla_aggregations[metric.get('aggregate')](sqla_column) sqla_metric = sqla_metric.label(label) return sqla_metric @@ -551,7 +553,7 @@ class SqlaTable(Model, BaseDatasource): order_desc=True, prequeries=None, is_prequery=False, - ): + ): """Querying any sqla table from this common interface""" template_kwargs = { 'from_dttm': from_dttm, @@ -640,6 +642,12 @@ class SqlaTable(Model, BaseDatasource): time_filters.append(dttm_col.get_time_filter(from_dttm, to_dttm)) select_exprs += metrics_exprs + + labels_expected = [str(c.name) 
for c in select_exprs] + + select_exprs = db_engine_spec.make_select_compatible( + groupby_exprs_with_timestamp.values(), + select_exprs) qry = sa.select(select_exprs) tbl = self.get_from_clause(template_processor) @@ -793,7 +801,8 @@ class SqlaTable(Model, BaseDatasource): groupby_exprs_sans_timestamp) qry = qry.where(top_groups) - return qry.select_from(tbl) + return SqlaQuery(sqla_query=qry.select_from(tbl), + labels_expected=labels_expected) def _get_top_groups(self, df, dimensions, groupby_exprs): groups = [] @@ -807,19 +816,21 @@ class SqlaTable(Model, BaseDatasource): def query(self, query_obj): qry_start_dttm = datetime.now() - sql = self.get_query_str(query_obj) + query_str_ext = self.get_query_str_extended(query_obj) + sql = query_str_ext.sql status = utils.QueryStatus.SUCCESS error_message = None df = None + db_engine_spec = self.database.db_engine_spec try: df = self.database.get_df(sql, self.schema) if self.mutated_labels: df = df.rename(index=str, columns=self.mutated_labels) + db_engine_spec.mutate_df_columns(df, sql, query_str_ext.labels_expected) except Exception as e: status = utils.QueryStatus.FAILED - logging.exception(e) - error_message = ( - self.database.db_engine_spec.extract_error_message(e)) + logging.exception(f'Query {sql} on schema {self.schema} failed') + error_message = db_engine_spec.extract_error_message(e) # if this is a main query with prequeries, combine them together if not query_obj['is_prequery']: diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index 3ae433c98..40d1ea520 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -114,6 +114,18 @@ class BaseEngineSpec(object): force_column_alias_quotes = False arraysize = None + @classmethod + def get_time_expr(cls, expr, pdf, time_grain, grain): + # if epoch, translate to DATE using db specific conf + if pdf == 'epoch_s': + expr = cls.epoch_to_dttm().format(col=expr) + elif pdf == 'epoch_ms': + expr = 
cls.epoch_ms_to_dttm().format(col=expr) + + if grain: + expr = grain.function.format(col=expr) + return expr + @classmethod def get_time_grains(cls): blacklist = config.get('TIME_GRAIN_BLACKLIST', []) @@ -124,6 +136,16 @@ class BaseEngineSpec(object): grain_functions.update(grain_addon_functions.get(cls.engine, {})) return _create_time_grains_tuple(grains, grain_functions, blacklist) + @classmethod + def make_select_compatible(cls, groupby_exprs, select_exprs): + # Some databases will just return the group-by field into the select, but don't + # allow the group-by field to be put into the select list. + return select_exprs + + @classmethod + def mutate_df_columns(cls, df, sql, labels_expected): + pass + @classmethod def fetch_data(cls, cursor, limit): if cls.arraysize: @@ -1413,6 +1435,64 @@ class AthenaEngineSpec(BaseEngineSpec): return 'from_unixtime({col})' +class PinotEngineSpec(BaseEngineSpec): + engine = 'pinot' + allows_subquery = False + inner_joins = False + + _time_grain_to_datetimeconvert = { + 'PT1S': '1:SECONDS', + 'PT1M': '1:MINUTES', + 'PT1H': '1:HOURS', + 'P1D': '1:DAYS', + 'P1Y': '1:YEARS', + 'P1M': '1:MONTHS', + } + + # Pinot does its own conversion below + time_grain_functions = {k: None for k in _time_grain_to_datetimeconvert.keys()} + + @classmethod + def get_time_expr(cls, expr, pdf, time_grain, grain): + is_epoch = pdf in ('epoch_s', 'epoch_ms') + if not is_epoch: + raise NotImplementedError('Pinot currently only supports epochs') + # The DATETIMECONVERT pinot udf is documented at + # Per https://github.com/apache/incubator-pinot/wiki/dateTimeConvert-UDF + # We are not really converting any time units, just bucketing them. 
+ seconds_or_ms = 'MILLISECONDS' if pdf == 'epoch_ms' else 'SECONDS' + tf = f'1:{seconds_or_ms}:EPOCH' + granularity = cls._time_grain_to_datetimeconvert.get(time_grain) + if not granularity: + raise NotImplementedError('No pinot grain spec for ' + str(time_grain)) + # In pinot the output is a string since there is no timestamp column like pg + return f'DATETIMECONVERT({expr}, "{tf}", "{tf}", "{granularity}")' + + @classmethod + def make_select_compatible(cls, groupby_exprs, select_exprs): + # Pinot does not want the group by expr's to appear in the select clause + select_sans_groupby = [] + # We want identity and not equality, so doing the filtering manually + for s in select_exprs: + for gr in groupby_exprs: + if s is gr: + break + else: + select_sans_groupby.append(s) + return select_sans_groupby + + @classmethod + def mutate_df_columns(cls, df, sql, labels_expected): + if df is not None and \ + not df.empty and \ + labels_expected is not None: + if len(df.columns) != len(labels_expected): + raise Exception(f'For {sql}, df.columns: {df.columns}' + f' differs from {labels_expected}') + else: + df.columns = labels_expected + + class ClickHouseEngineSpec(BaseEngineSpec): """Dialect for ClickHouse analytical DB.""" diff --git a/superset/viz.py b/superset/viz.py index 51fab12b5..a88860e25 100644 --- a/superset/viz.py +++ b/superset/viz.py @@ -96,6 +96,8 @@ class BaseViz(object): self.time_shift = timedelta() self.status = None + self.error_msg = '' + self.results = None self.error_message = None self.force = force @@ -226,7 +228,22 @@ class BaseViz(object): if DTTM_ALIAS in df.columns: if timestamp_format in ('epoch_s', 'epoch_ms'): # Column has already been formatted as a timestamp. 
- df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(pd.Timestamp) + dttm_col = df[DTTM_ALIAS] + one_ts_val = dttm_col[0] + + # convert time column to pandas Timestamp, but different + # ways to convert depending on string or int types + try: + int(one_ts_val) + is_integral = True + except ValueError: + is_integral = False + if is_integral: + unit = 's' if timestamp_format == 'epoch_s' else 'ms' + df[DTTM_ALIAS] = pd.to_datetime(dttm_col, utc=False, unit=unit, + origin='unix') + else: + df[DTTM_ALIAS] = dttm_col.apply(pd.Timestamp) else: df[DTTM_ALIAS] = pd.to_datetime( df[DTTM_ALIAS], utc=False, format=timestamp_format)