diff --git a/app/models.py b/app/models.py index 49eb37014..cb7e04d3b 100644 --- a/app/models.py +++ b/app/models.py @@ -37,6 +37,16 @@ class Database(Model, AuditMixin): def __repr__(self): return self.database_name + def get_sqla_engine(self): + return create_engine(self.sqlalchemy_uri) + + def get_table(self): + meta = MetaData() + return sqlaTable( + self.table_name, meta, + autoload=True, + autoload_with=self.get_sqla_engine()) + class Table(Model, AuditMixin, Queryable): __tablename__ = 'tables' @@ -48,6 +58,10 @@ class Table(Model, AuditMixin, Queryable): database = relationship( 'Database', backref='tables', foreign_keys=[database_id]) + @property + def name(self): + return self.table_name + @property def table_link(self): url = "/panoramix/table/{}/".format(self.id) @@ -57,18 +71,62 @@ class Table(Model, AuditMixin, Queryable): def metrics_combo(self): return sorted( [ - ( - 'sum__{}'.format(m.column_name), - 'SUM({})'.format(m.column_name), - ) - for m in self.columns if m.sum], + (m.metric_name, m.verbose_name) + for m in self.metrics], key=lambda x: x[1]) + def query( + self, groupby, metrics, + granularity, + from_dttm, to_dttm, + limit_spec=None, + filter=None, + is_timeseries=True, + timeseries_limit=15, row_limit=None): + from pandas import read_sql_query + metrics_exprs = [ + "{} AS {}".format(m.expression, m.metric_name) + for m in self.metrics if m.metric_name in metrics] + from_dttm_iso = from_dttm.isoformat() + to_dttm_iso = to_dttm.isoformat() + + select_exprs = [] + groupby_exprs = [] + + if groupby: + select_exprs = groupby + groupby_exprs = [s for s in groupby] + select_exprs += metrics_exprs + if granularity != "all": + select_exprs += ['ds as timestamp'] + groupby_exprs += ['ds'] + + select_exprs = ",\n".join(select_exprs) + groupby_exprs = ",\n".join(groupby_exprs) + + where_clause = [ + "ds >= '{from_dttm_iso}'", + "ds < '{to_dttm_iso}'" + ] + where_clause = " AND\n".join(where_clause).format(**locals()) + sql = """ + SELECT + {select_exprs} + FROM {self.table_name} + WHERE + {where_clause} + GROUP BY + {groupby_exprs} + """.format(**locals()) + df = read_sql_query( + sql=sql, + con=self.database.get_sqla_engine() + ) + return df + + def fetch_metadata(self): - engine = create_engine(self.database.sqlalchemy_uri) - meta = MetaData() - table = sqlaTable( - self.table_name, meta, autoload=True, autoload_with=engine) + table = self.database.get_table(self.table_name) TC = TableColumn for col in table.columns: dbcol = ( @@ -90,13 +148,28 @@ class Table(Model, AuditMixin, Queryable): db.session.commit() +class SqlMetric(Model): + __tablename__ = 'sql_metrics' + id = Column(Integer, primary_key=True) + metric_name = Column(String(512)) + verbose_name = Column(String(1024)) + metric_type = Column(String(32)) + table_id = Column( + String(256), + ForeignKey('tables.id')) + table = relationship( + 'Table', backref='metrics', foreign_keys=[table_id]) + expression = Column(Text) + description = Column(Text) + + class TableColumn(Model, AuditMixin): __tablename__ = 'table_columns' id = Column(Integer, primary_key=True) table_id = Column( String(256), ForeignKey('tables.id')) - table = relationship('Table', backref='columns') + table = relationship('Table', backref='columns', foreign_keys=[table_id]) column_name = Column(String(256)) is_dttm = Column(Boolean, default=True) is_active = Column(Boolean, default=True) @@ -138,11 +211,8 @@ class Cluster(Model, AuditMixin): ).format(self=self) datasources = json.loads(requests.get(endpoint).text) for datasource in datasources: - #try: - Datasource.sync_to_db(datasource, self) - #except Exception as e: - # logging.exception(e) - # logging.error("Failed at syncing " + datasource) + Datasource.sync_to_db(datasource, self) + class Datasource(Model, AuditMixin, Queryable): __tablename__ = 'datasources' @@ -165,6 +235,10 @@ class Datasource(Model, AuditMixin, Queryable): [(m.metric_name, m.verbose_name) for m in self.metrics], key=lambda x: x[1]) + @property + def name(self): + return self.datasource_name + def __repr__(self): return self.datasource_name @@ -227,7 +301,37 @@ class Datasource(Model, AuditMixin, Queryable): col_obj.datasource = datasource col_obj.generate_metrics() #session.commit() + def query( + self, groupby, metrics, + granularity, + from_dttm, to_dttm, + limit_spec=None, + filter=None, + is_timeseries=True, + timeseries_limit=15, row_limit=None): + aggregations = { + m.metric_name: m.json_obj + for m in self.metrics if m.metric_name in metrics + } + if not isinstance(granularity, basestring): + granularity = {"type": "duration", "duration": granularity} + + qry = dict( + datasource=self.datasource_name, + dimensions=groupby, + aggregations=aggregations, + granularity=granularity, + intervals= from_dttm.isoformat() + '/' + to_dttm.isoformat(), + ) + if filter: + qry['filter'] = filter + if limit_spec: + qry['limit_spec'] = limit_spec + client = self.cluster.get_pydruid_client() + client.groupby(**qry) + df = client.export_pandas() + return df class Metric(Model): diff --git a/app/templates/panoramix/datasource.html b/app/templates/panoramix/datasource.html index 0a53cc654..599d80efb 100644 --- a/app/templates/panoramix/datasource.html +++ b/app/templates/panoramix/datasource.html @@ -22,7 +22,7 @@ form input.form-control {

- {{ datasource.datasource_name }} + {{ datasource.name }} {% if datasource.description %} {% endif %} diff --git a/app/views.py b/app/views.py index c3ab428a6..7f6693814 100644 --- a/app/views.py +++ b/app/views.py @@ -62,6 +62,16 @@ class ColumnInlineView(CompactCRUDMixin, ModelView): appbuilder.add_view_no_menu(ColumnInlineView) +class SqlMetricInlineView(CompactCRUDMixin, ModelView): + datamodel = SQLAInterface(models.SqlMetric) + list_columns = ['metric_name', 'verbose_name', 'metric_type' ] + edit_columns = [ + 'metric_name', 'description', 'verbose_name', 'metric_type', + 'table', 'expression'] + add_columns = edit_columns + page_size = 100 +appbuilder.add_view_no_menu(SqlMetricInlineView) + class MetricInlineView(CompactCRUDMixin, ModelView): datamodel = SQLAInterface(models.Metric) @@ -115,7 +125,7 @@ class TableView(ModelView, DeleteMixin): list_columns = ['table_link', 'database'] add_columns = ['table_name', 'database'] edit_columns = add_columns - related_views = [TableColumnInlineView] + related_views = [TableColumnInlineView, SqlMetricInlineView] def post_insert(self, table): table.fetch_metadata() diff --git a/app/viz.py b/app/viz.py index 7bde8a373..2bcfcea55 100644 --- a/app/viz.py +++ b/app/viz.py @@ -15,30 +15,6 @@ CHART_ARGS = { 'target_div': 'chart', } -class BaseQuery(object): - def __init__( - self, groupby, metrics, filters, - is_timeseries, - timeseries_limit=15, row_limit=None): - self.groupby = groupby - self.metrics = metrics - self.filters = filters - self.is_timeseries = is_timeseries - self.timeseries_limit = timeseries_limit - self.row_limit = row_limit - - def run(self): - start = datetime.now() - self._execute() - self.duration = (datetime.now() - start).total_seconds() - - def _execution(self): - raise NotImplemented() - - def pandas_df(self): - raise NotImplemented() - - class OmgWtForm(Form): field_order = ( @@ -146,39 +122,39 @@ class BaseViz(object): filters = cond return filters + def bake_query(self): + return self.datasource.query(**self.query_obj()) + def query_obj(self): ds = self.datasource args = self.form_data groupby = args.getlist("groupby") or [] + metrics = args.getlist("metrics") or ['count'] granularity = args.get("granularity", "1 day") - granularity = utils.parse_human_timedelta(granularity).total_seconds() * 1000 - aggregations = { - m.metric_name: m.json_obj - for m in ds.metrics if m.metric_name in self.metrics - } + granularity = utils.parse_human_timedelta( + granularity).total_seconds() * 1000 limit = int( args.get("limit", config.ROW_LIMIT)) or config.ROW_LIMIT since = args.get("since", "1 year ago") from_dttm = utils.parse_human_datetime(since) if from_dttm > datetime.now(): from_dttm = datetime.now() - (from_dttm-datetime.now()) - from_dttm = from_dttm.isoformat() until = args.get("until", "now") - to_dttm = utils.parse_human_datetime(until).isoformat() + to_dttm = utils.parse_human_datetime(until) if from_dttm >= to_dttm: flash("The date range doesn't seem right.", "danger") from_dttm = to_dttm # Making them identicial to not raise d = { - 'datasource': ds.datasource_name, - 'granularity': {"type": "duration", "duration": granularity}, - 'intervals': from_dttm + '/' + to_dttm, - 'dimensions': groupby, - 'aggregations': aggregations, + 'granularity': granularity, + 'from_dttm': from_dttm, + 'to_dttm': to_dttm, + 'groupby': groupby, + 'metrics': metrics, 'limit_spec': { "type": "default", "limit": limit, "columns": [{ - "dimension": self.metrics[0], + "dimension": metrics[0] if metrics else self.metrics[0], "direction": "descending", }], }, @@ -188,17 +164,7 @@ class BaseViz(object): d['filter'] = filters return d - def bake_query(self): - client = self.datasource.cluster.get_pydruid_client() - client.groupby(**self.query_obj()) - return client.export_pandas() - - def get_query(self): - client = self.datasource.cluster.get_pydruid_client() - client.groupby(**self.query_obj()) - return client.query_dict - - def df_prep(self, ): + def df_prep(self): pass def form_prep(self): @@ -290,14 +256,12 @@ class TimeSeriesViz(HighchartsViz): """ Doing a 2 phase query where we limit the number of series. """ - client = self.datasource.cluster.get_pydruid_client() qry = self.query_obj() orig_filter = qry['filter'] if 'filter' in qry else '' qry['granularity'] = "all" - client.groupby(**qry) - df = client.export_pandas() + df = self.datasource.query(**qry) if not df is None: - dims = qry['dimensions'] + dims = qry['groupby'] filters = [] for index, row in df.iterrows(): fields = [] @@ -318,9 +282,8 @@ class TimeSeriesViz(HighchartsViz): qry['filter'] = Filter(type="and", fields=[ Filter.build_filter(ff), Filter.build_filter(orig_filter)]) - del qry['limit_spec'] - client.groupby(**qry) - return client.export_pandas() + qry['limit_spec'] = None + return self.datasource.query(**qry) class TimeSeriesCompareViz(TimeSeriesViz): verbose_name = "Time Series - Percent Change"