Allowing for druid post aggregations (#418)
* Druid post aggregations * Fixing tests
This commit is contained in:
parent
0ca3f5ec80
commit
26d273643b
|
|
@ -24,6 +24,7 @@ from flask.ext.appbuilder.models.mixins import AuditMixin
|
|||
from pydruid.client import PyDruid
|
||||
from flask.ext.appbuilder.models.decorators import renders
|
||||
from pydruid.utils.filters import Dimension, Filter
|
||||
from pydruid.utils.postaggregator import Postaggregator
|
||||
from six import string_types
|
||||
from sqlalchemy import (
|
||||
Column, Integer, String, ForeignKey, Text, Boolean, DateTime, Date,
|
||||
|
|
@ -44,6 +45,17 @@ config = app.config
|
|||
QueryResult = namedtuple('namedtuple', ['df', 'query', 'duration'])
|
||||
|
||||
|
||||
class JavascriptPostAggregator(Postaggregator):
|
||||
def __init__(self, name, field_names, function):
|
||||
self.post_aggregator = {
|
||||
'type': 'javascript',
|
||||
'fieldNames': field_names,
|
||||
'name': name,
|
||||
'function': function,
|
||||
}
|
||||
self.name = name
|
||||
|
||||
|
||||
class AuditMixinNullable(AuditMixin):
|
||||
|
||||
"""Altering the AuditMixin to use nullable fields
|
||||
|
|
@ -319,6 +331,10 @@ class Queryable(object):
|
|||
def dttm_cols(self):
|
||||
return []
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
return '/{}/edit/{}'.format(self.baselink, self.id)
|
||||
|
||||
|
||||
class Database(Model, AuditMixinNullable):
|
||||
|
||||
|
|
@ -467,10 +483,6 @@ class SqlaTable(Model, Queryable, AuditMixinNullable):
|
|||
def description_markeddown(self):
|
||||
return utils.markdown(self.description)
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
return '/tablemodelview/edit/{}'.format(self.id)
|
||||
|
||||
@property
|
||||
def link(self):
|
||||
return '<a href="{self.url}">{self.table_name}</a>'.format(**locals())
|
||||
|
|
@ -896,7 +908,7 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
|
|||
|
||||
type = "druid"
|
||||
|
||||
baselink = "datasourcemodelview"
|
||||
baselink = "druiddatasourcemodelview"
|
||||
|
||||
__tablename__ = 'datasources'
|
||||
id = Column(Integer, primary_key=True)
|
||||
|
|
@ -930,10 +942,6 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
|
|||
"[{obj.cluster_name}].[{obj.datasource_name}]"
|
||||
"(id:{obj.id})").format(obj=self)
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
return '/datasourcemodelview/edit/{}'.format(self.id)
|
||||
|
||||
@property
|
||||
def link(self):
|
||||
return (
|
||||
|
|
@ -1047,9 +1055,34 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
|
|||
to_dttm = to_dttm.replace(tzinfo=config.get("DRUID_TZ"))
|
||||
|
||||
query_str = ""
|
||||
metrics_dict = {m.metric_name: m for m in self.metrics}
|
||||
all_metrics = []
|
||||
post_aggs = {}
|
||||
for metric_name in metrics:
|
||||
metric = metrics_dict[metric_name]
|
||||
if metric.metric_type != 'postagg':
|
||||
all_metrics.append(metric_name)
|
||||
else:
|
||||
conf = metric.json_obj
|
||||
fields = conf.get('fields', [])
|
||||
all_metrics += [
|
||||
f.get('fieldName') for f in fields
|
||||
if f.get('type') == 'fieldAccess']
|
||||
all_metrics += conf.get('fieldNames', [])
|
||||
if conf.get('type') == 'javascript':
|
||||
post_aggs[metric_name] = JavascriptPostAggregator(
|
||||
name=conf.get('name'),
|
||||
field_names=conf.get('fieldNames'),
|
||||
function=conf.get('function'))
|
||||
else:
|
||||
post_aggs[metric_name] = Postaggregator(
|
||||
conf.get('fn', "/"),
|
||||
conf.get('fields', []),
|
||||
conf.get('name', ''))
|
||||
aggregations = {
|
||||
m.metric_name: m.json_obj
|
||||
for m in self.metrics if m.metric_name in metrics
|
||||
for m in self.metrics
|
||||
if m.metric_name in all_metrics
|
||||
}
|
||||
granularity = granularity or "all"
|
||||
if granularity != "all":
|
||||
|
|
@ -1067,6 +1100,7 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
|
|||
dimensions=groupby,
|
||||
aggregations=aggregations,
|
||||
granularity=granularity,
|
||||
post_aggregations=post_aggs,
|
||||
intervals=from_dttm.isoformat() + '/' + to_dttm.isoformat(),
|
||||
)
|
||||
filters = None
|
||||
|
|
@ -1171,7 +1205,6 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
|
|||
cols += ['timestamp']
|
||||
cols += [col for col in groupby if col in df.columns]
|
||||
cols += [col for col in metrics if col in df.columns]
|
||||
cols += [col for col in df.columns if col not in cols]
|
||||
df = df[cols]
|
||||
return QueryResult(
|
||||
df=df,
|
||||
|
|
|
|||
|
|
@ -159,14 +159,20 @@ class DruidMetricInlineView(CompactCRUDMixin, CaravelModelView): # noqa
|
|||
datamodel = SQLAInterface(models.DruidMetric)
|
||||
list_columns = ['metric_name', 'verbose_name', 'metric_type']
|
||||
edit_columns = [
|
||||
'metric_name', 'description', 'verbose_name', 'metric_type',
|
||||
'datasource', 'json']
|
||||
add_columns = [
|
||||
'metric_name', 'verbose_name', 'metric_type', 'datasource', 'json']
|
||||
'metric_name', 'description', 'verbose_name', 'metric_type', 'json',
|
||||
'datasource']
|
||||
add_columns = edit_columns
|
||||
page_size = 500
|
||||
validators_columns = {
|
||||
'json': [validate_json],
|
||||
}
|
||||
description_columns = {
|
||||
'metric_type': utils.markdown(
|
||||
"use `postagg` as the metric type if you are defining a "
|
||||
"[Druid Post Aggregation]"
|
||||
"(http://druid.io/docs/latest/querying/post-aggregations.html)",
|
||||
True),
|
||||
}
|
||||
appbuilder.add_view_no_menu(DruidMetricInlineView)
|
||||
|
||||
|
||||
|
|
@ -390,10 +396,7 @@ appbuilder.add_view(
|
|||
class DruidDatasourceModelView(CaravelModelView, DeleteMixin): # noqa
|
||||
datamodel = SQLAInterface(models.DruidDatasource)
|
||||
list_columns = [
|
||||
'datasource_link', 'cluster', 'owner',
|
||||
'creator', 'created_on',
|
||||
'changed_by_', 'changed_on',
|
||||
'offset']
|
||||
'datasource_link', 'cluster', 'changed_by_', 'modified', 'offset']
|
||||
related_views = [DruidColumnInlineView, DruidMetricInlineView]
|
||||
edit_columns = [
|
||||
'datasource_name', 'cluster', 'description', 'owner',
|
||||
|
|
|
|||
|
|
@ -260,7 +260,7 @@ class DruidTests(CaravelTestCase):
|
|||
df = pd.DataFrame(nres)
|
||||
instance.export_pandas.return_value = df
|
||||
instance.query_dict = {}
|
||||
resp = self.client.get('/caravel/explore/druid/1/?viz_type=table&granularity=one+day&druid_time_origin=&since=7+days+ago&until=now&row_limit=5000&include_search=false&metrics=count&flt_col_0=dim1&flt_op_0=in&flt_eq_0=&slice_id=&slice_name=&collapsed_fieldsets=&action=&datasource_name=test_datasource&datasource_id=1&datasource_type=druid&previous_viz_type=table&json=true&force=true')
|
||||
resp = self.client.get('/caravel/explore/druid/1/?viz_type=table&granularity=one+day&druid_time_origin=&since=7+days+ago&until=now&row_limit=5000&include_search=false&metrics=count&groupby=name&flt_col_0=dim1&flt_op_0=in&flt_eq_0=&slice_id=&slice_name=&collapsed_fieldsets=&action=&datasource_name=test_datasource&datasource_id=1&datasource_type=druid&previous_viz_type=table&json=true&force=true')
|
||||
print('-'*300)
|
||||
print(resp.data.decode('utf-8'))
|
||||
assert "Canada" in resp.data.decode('utf-8')
|
||||
|
|
|
|||
Loading…
Reference in New Issue