diff --git a/superset/connectors/sqla/models.py b/superset/connectors/sqla/models.py index 64b38cfd5..86d927393 100644 --- a/superset/connectors/sqla/models.py +++ b/superset/connectors/sqla/models.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. # pylint: disable=C,R,W -from collections import OrderedDict +from collections import namedtuple, OrderedDict from datetime import datetime import logging @@ -46,6 +46,9 @@ from superset.utils import core as utils, import_datasource config = app.config metadata = Model.metadata # pylint: disable=no-member +SqlaQuery = namedtuple('SqlaQuery', ['sqla_query', 'labels_expected']) +QueryStringExtended = namedtuple('QueryStringExtended', ['sql', 'labels_expected']) + class AnnotationDatasource(BaseDatasource): """ Dummy object so we can query annotations using 'Viz' objects just like @@ -141,23 +144,20 @@ class TableColumn(Model, BaseColumn): """Getting the time component of the query""" label = self.table.get_label(utils.DTTM_ALIAS) + db = self.table.database pdf = self.python_date_format is_epoch = pdf in ('epoch_s', 'epoch_ms') if not self.expression and not time_grain and not is_epoch: return column(self.column_name, type_=DateTime).label(label) - - expr = self.expression or self.column_name - if is_epoch: - # if epoch, translate to DATE using db specific conf - db_spec = self.table.database.db_engine_spec - if pdf == 'epoch_s': - expr = db_spec.epoch_to_dttm().format(col=expr) - elif pdf == 'epoch_ms': - expr = db_spec.epoch_ms_to_dttm().format(col=expr) + grain = None if time_grain: - grain = self.table.database.grains_dict().get(time_grain) - if grain: - expr = grain.function.format(col=expr) + grain = db.grains_dict().get(time_grain) + if not grain: + raise NotImplementedError( + f'No grain spec for {time_grain} for database {db.database_name}') + expr = db.db_engine_spec.get_time_expr( + self.expression or self.column_name, + pdf, time_grain, grain) return 
literal_column(expr, type_=DateTime).label(label) @classmethod @@ -476,15 +476,18 @@ class SqlaTable(Model, BaseDatasource): return get_template_processor( table=self, database=self.database, **kwargs) - def get_query_str(self, query_obj): - qry = self.get_sqla_query(**query_obj) - sql = self.database.compile_sqla_query(qry) + def get_query_str_extended(self, query_obj): + sqlaq = self.get_sqla_query(**query_obj) + sql = self.database.compile_sqla_query(sqlaq.sqla_query) logging.info(sql) sql = sqlparse.format(sql, reindent=True) if query_obj['is_prequery']: query_obj['prequeries'].append(sql) sql = self.mutate_query_from_config(sql) - return sql + return QueryStringExtended(labels_expected=sqlaq.labels_expected, sql=sql) + + def get_query_str(self, query_obj): + return self.get_query_str_extended(query_obj).sql def get_sqla_table(self): tbl = table(self.table_name) @@ -517,12 +520,11 @@ class SqlaTable(Model, BaseDatasource): if expression_type == utils.ADHOC_METRIC_EXPRESSION_TYPES['SIMPLE']: column_name = metric.get('column').get('column_name') - sqla_column = column(column_name) table_column = cols.get(column_name) - if table_column: sqla_column = table_column.get_sqla_col() - + else: + sqla_column = column(column_name) sqla_metric = self.sqla_aggregations[metric.get('aggregate')](sqla_column) sqla_metric = sqla_metric.label(label) return sqla_metric @@ -551,7 +553,7 @@ class SqlaTable(Model, BaseDatasource): order_desc=True, prequeries=None, is_prequery=False, - ): + ): """Querying any sqla table from this common interface""" template_kwargs = { 'from_dttm': from_dttm, @@ -640,6 +642,12 @@ class SqlaTable(Model, BaseDatasource): time_filters.append(dttm_col.get_time_filter(from_dttm, to_dttm)) select_exprs += metrics_exprs + + labels_expected = [str(c.name) for c in select_exprs] + + select_exprs = db_engine_spec.make_select_compatible( + groupby_exprs_with_timestamp.values(), + select_exprs) qry = sa.select(select_exprs) tbl = 
self.get_from_clause(template_processor) @@ -793,7 +801,8 @@ class SqlaTable(Model, BaseDatasource): groupby_exprs_sans_timestamp) qry = qry.where(top_groups) - return qry.select_from(tbl) + return SqlaQuery(sqla_query=qry.select_from(tbl), + labels_expected=labels_expected) def _get_top_groups(self, df, dimensions, groupby_exprs): groups = [] @@ -807,19 +816,21 @@ class SqlaTable(Model, BaseDatasource): def query(self, query_obj): qry_start_dttm = datetime.now() - sql = self.get_query_str(query_obj) + query_str_ext = self.get_query_str_extended(query_obj) + sql = query_str_ext.sql status = utils.QueryStatus.SUCCESS error_message = None df = None + db_engine_spec = self.database.db_engine_spec try: df = self.database.get_df(sql, self.schema) if self.mutated_labels: df = df.rename(index=str, columns=self.mutated_labels) + db_engine_spec.mutate_df_columns(df, sql, query_str_ext.labels_expected) except Exception as e: status = utils.QueryStatus.FAILED - logging.exception(e) - error_message = ( - self.database.db_engine_spec.extract_error_message(e)) + logging.exception(f'Query {sql} on schema {self.schema} failed') + error_message = db_engine_spec.extract_error_message(e) # if this is a main query with prequeries, combine them together if not query_obj['is_prequery']: diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index 3ae433c98..40d1ea520 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -114,6 +114,18 @@ class BaseEngineSpec(object): force_column_alias_quotes = False arraysize = None + @classmethod + def get_time_expr(cls, expr, pdf, time_grain, grain): + # if epoch, translate to DATE using db specific conf + if pdf == 'epoch_s': + expr = cls.epoch_to_dttm().format(col=expr) + elif pdf == 'epoch_ms': + expr = cls.epoch_ms_to_dttm().format(col=expr) + + if grain: + expr = grain.function.format(col=expr) + return expr + @classmethod def get_time_grains(cls): blacklist = config.get('TIME_GRAIN_BLACKLIST', []) @@ 
-124,6 +136,16 @@ class BaseEngineSpec(object): grain_functions.update(grain_addon_functions.get(cls.engine, {})) return _create_time_grains_tuple(grains, grain_functions, blacklist) + @classmethod + def make_select_compatible(cls, groupby_exprs, select_exprs): + # Some databases automatically return the group-by fields in the result set, + # but do not allow them to appear in the select list; override to drop them. + return select_exprs + + @classmethod + def mutate_df_columns(cls, df, sql, labels_expected): + pass + @classmethod def fetch_data(cls, cursor, limit): if cls.arraysize: @@ -1413,6 +1435,64 @@ class AthenaEngineSpec(BaseEngineSpec): return 'from_unixtime({col})' +class PinotEngineSpec(BaseEngineSpec): + engine = 'pinot' + allows_subquery = False + inner_joins = False + + _time_grain_to_datetimeconvert = { + 'PT1S': '1:SECONDS', + 'PT1M': '1:MINUTES', + 'PT1H': '1:HOURS', + 'P1D': '1:DAYS', + 'P1Y': '1:YEARS', + 'P1M': '1:MONTHS', + } + + # Pinot does its own conversion below + time_grain_functions = {k: None for k in _time_grain_to_datetimeconvert.keys()} + + @classmethod + def get_time_expr(cls, expr, pdf, time_grain, grain): + is_epoch = pdf in ('epoch_s', 'epoch_ms') + if not is_epoch: + raise NotImplementedError('Pinot currently only supports epochs') + # The DATETIMECONVERT Pinot UDF is documented at + # https://github.com/apache/incubator-pinot/wiki/dateTimeConvert-UDF + # We are not really converting any time units, just bucketing them. 
+ seconds_or_ms = 'MILLISECONDS' if pdf == 'epoch_ms' else 'SECONDS' + tf = f'1:{seconds_or_ms}:EPOCH' + granularity = cls._time_grain_to_datetimeconvert.get(time_grain) + if not granularity: + raise NotImplementedError('No pinot grain spec for ' + str(time_grain)) + # In pinot the output is a string since there is no timestamp column like pg + return f'DATETIMECONVERT({expr}, "{tf}", "{tf}", "{granularity}")' + + @classmethod + def make_select_compatible(cls, groupby_exprs, select_exprs): + # Pinot does not want the group by expr's to appear in the select clause + select_sans_groupby = [] + # We want identity and not equality, so doing the filtering manually + for s in select_exprs: + for gr in groupby_exprs: + if s is gr: + break + else: + select_sans_groupby.append(s) + return select_sans_groupby + + @classmethod + def mutate_df_columns(cls, df, sql, labels_expected): + if df is not None and \ + not df.empty and \ + labels_expected is not None: + if len(df.columns) != len(labels_expected): + raise Exception(f'For {sql}, df.columns: {df.columns}' + f' differs from {labels_expected}') + else: + df.columns = labels_expected + + class ClickHouseEngineSpec(BaseEngineSpec): """Dialect for ClickHouse analytical DB.""" diff --git a/superset/viz.py b/superset/viz.py index 51fab12b5..a88860e25 100644 --- a/superset/viz.py +++ b/superset/viz.py @@ -96,6 +96,8 @@ class BaseViz(object): self.time_shift = timedelta() self.status = None + self.error_msg = '' + self.results = None self.error_message = None self.force = force @@ -226,7 +228,22 @@ class BaseViz(object): if DTTM_ALIAS in df.columns: if timestamp_format in ('epoch_s', 'epoch_ms'): # Column has already been formatted as a timestamp. 
- df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(pd.Timestamp) + dttm_col = df[DTTM_ALIAS] + one_ts_val = dttm_col[0] + + # convert time column to pandas Timestamp, but different + # ways to convert depending on string or int types + try: + int(one_ts_val) + is_integral = True + except ValueError: + is_integral = False + if is_integral: + unit = 's' if timestamp_format == 'epoch_s' else 'ms' + df[DTTM_ALIAS] = pd.to_datetime(dttm_col, utc=False, unit=unit, + origin='unix') + else: + df[DTTM_ALIAS] = dttm_col.apply(pd.Timestamp) else: df[DTTM_ALIAS] = pd.to_datetime( df[DTTM_ALIAS], utc=False, format=timestamp_format)