Support week_ending_saturday for Druid. (#1491)

* Support week_ending_saturday for Druid.

* Use period granularity

* Use ISO 8601 for period definitions.

* Fix tests

* More flexibility for the freeform choices.
This commit is contained in:
Bogdan 2016-11-02 18:45:10 -07:00 committed by GitHub
parent 1700a807e9
commit ae46561648
4 changed files with 71 additions and 16 deletions

View File

@ -356,6 +356,10 @@ class FormFactory(object):
('6 hour', _('6 hour')),
('1 day', _('1 day')),
('7 days', _('7 days')),
('week', _('week')),
('week_starting_sunday', _('week_starting_sunday')),
('week_ending_saturday', _('week_ending_saturday')),
('month', _('month')),
),
"description": _(
"The time granularity for the visualization. Note that you "

View File

@ -1604,7 +1604,9 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
return {
"time_columns": [
'all', '5 seconds', '30 seconds', '1 minute',
'5 minutes', '1 hour', '6 hour', '1 day', '7 days'
'5 minutes', '1 hour', '6 hour', '1 day', '7 days',
'week', 'week_starting_sunday', 'week_ending_saturday',
'month',
],
"time_grains": ['now']
}
@ -1793,6 +1795,56 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
col_obj.generate_metrics()
session.flush()
@staticmethod
def time_offset(granularity):
if granularity == 'week_ending_saturday':
return 6 * 24 * 3600 * 1000 # 6 days
return 0
# uses https://en.wikipedia.org/wiki/ISO_8601
# http://druid.io/docs/0.8.0/querying/granularities.html
# TODO: pass origin from the UI
@staticmethod
def granularity(period_name, timezone=None):
if not period_name or period_name == 'all':
return 'all'
iso_8601_dict = {
'5 seconds': 'PT5S',
'30 seconds': 'PT30S',
'1 minute': 'PT1M',
'5 minutes': 'PT5M',
'1 hour': 'PT1H',
'6 hour': 'PT6H',
'one day': 'P1D',
'1 day': 'P1D',
'7 days': 'P7D',
'week': 'P1W',
'week_starting_sunday': 'P1W',
'week_ending_saturday': 'P1W',
'month': 'P1M',
}
granularity = {'type': 'period'}
if timezone:
granularity['timezone'] = timezone
if period_name in iso_8601_dict:
granularity['period'] = iso_8601_dict[period_name]
if period_name in ('week_ending_saturday', 'week_starting_sunday'):
# use Sunday as start of the week
granularity['origin'] = '2016-01-03T00:00:00'
elif not isinstance(period_name, string_types):
granularity['type'] = 'duration'
granularity['duration'] = period_name
elif period_name.startswith('P'):
# identify if the string is the iso_8601 period
granularity['period'] = period_name
else:
granularity['type'] = 'duration'
granularity['duration'] = utils.parse_human_timedelta(
period_name).total_seconds() * 1000
return granularity
def query( # druid
self, groupby, metrics,
granularity,
@ -1820,6 +1872,7 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
# add tzinfo to native datetime with config
from_dttm = from_dttm.replace(tzinfo=config.get("DRUID_TZ"))
to_dttm = to_dttm.replace(tzinfo=config.get("DRUID_TZ"))
timezone = from_dttm.tzname()
query_str = ""
metrics_dict = {m.metric_name: m for m in self.metrics}
@ -1835,7 +1888,6 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
field_names.append(_f.get('fieldName'))
elif _type == 'arithmetic':
field_names += recursive_get_fields(_f)
return list(set(field_names))
for metric_name in metrics:
@ -1874,22 +1926,12 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
"Access to the metrics denied: " + ', '.join(rejected_metrics)
)
granularity = granularity or "all"
if granularity != "all":
granularity = utils.parse_human_timedelta(
granularity).total_seconds() * 1000
if not isinstance(granularity, string_types):
granularity = {"type": "duration", "duration": granularity}
origin = extras.get('druid_time_origin')
if origin:
dttm = utils.parse_human_datetime(origin)
granularity['origin'] = dttm.isoformat()
qry = dict(
datasource=self.datasource_name,
dimensions=groupby,
aggregations=aggregations,
granularity=granularity,
granularity=DruidDatasource.granularity(
granularity, timezone=timezone),
post_aggregations=post_aggs,
intervals=from_dttm.isoformat() + '/' + to_dttm.isoformat(),
)
@ -1995,6 +2037,16 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
cols += [col for col in groupby if col in df.columns]
cols += [col for col in metrics if col in df.columns]
df = df[cols]
time_offset = DruidDatasource.time_offset(granularity)
def increment_timestamp(ts):
dt = utils.parse_human_datetime(ts).replace(
tzinfo=config.get("DRUID_TZ"))
return dt + timedelta(milliseconds=time_offset)
if 'timestamp' in df.columns and time_offset:
df.timestamp = df.timestamp.apply(increment_timestamp)
return QueryResult(
df=df,
query=query_str,

View File

@ -128,7 +128,6 @@ class CaravelTestCase(unittest.TestCase):
return db.session.query(models.DruidDatasource).filter_by(
datasource_name=name).first()
def get_resp(self, url):
"""Shortcut to get the parsed results while following redirects"""
resp = self.client.get(url, follow_redirects=True)

View File

@ -115,7 +115,7 @@ class DruidTests(CaravelTestCase):
resp = self.get_resp('/caravel/explore/druid/{}/'.format(
datasource_id))
assert "[test_cluster].[test_datasource]" in resp
self.assertIn("[test_cluster].[test_datasource]", resp)
# One groupby
url = (