ALL OF SQL is coming along nicely
This commit is contained in:
parent
2d192d1ae8
commit
34acc90f3f
134
app/models.py
134
app/models.py
|
|
@ -37,6 +37,16 @@ class Database(Model, AuditMixin):
|
|||
def __repr__(self):
|
||||
return self.database_name
|
||||
|
||||
def get_sqla_engine(self):
    """Return the result of ``create_engine`` for ``self.sqlalchemy_uri``.

    A fresh ``create_engine`` call is made on every invocation; nothing is
    cached on the instance.
    """
    uri = self.sqlalchemy_uri
    return create_engine(uri)
|
||||
|
||||
def get_table(self, table_name=None):
    """Reflect one table of this database via ``sqlaTable`` autoloading.

    :param table_name: name of the table to reflect. ``fetch_metadata``
        calls ``self.database.get_table(self.table_name)``, so the name must
        be accepted as an argument. When omitted, the method falls back to
        ``self.table_name`` exactly as the previous zero-argument version
        did, so existing no-argument callers keep their behavior.
    :returns: whatever ``sqlaTable(..., autoload=True)`` returns when bound
        to a fresh ``MetaData`` and ``self.get_sqla_engine()``.
    """
    if table_name is None:
        # Legacy path: the original implementation read the name from self.
        table_name = self.table_name
    meta = MetaData()
    return sqlaTable(
        table_name, meta,
        autoload=True,
        autoload_with=self.get_sqla_engine())
|
||||
|
||||
|
||||
class Table(Model, AuditMixin, Queryable):
|
||||
__tablename__ = 'tables'
|
||||
|
|
@ -48,6 +58,10 @@ class Table(Model, AuditMixin, Queryable):
|
|||
database = relationship(
|
||||
'Database', backref='tables', foreign_keys=[database_id])
|
||||
|
||||
@property
def name(self):
    """Expose ``table_name`` under the generic attribute ``name``."""
    return self.table_name
|
||||
|
||||
@property
|
||||
def table_link(self):
|
||||
url = "/panoramix/table/{}/".format(self.id)
|
||||
|
|
@ -57,18 +71,62 @@ class Table(Model, AuditMixin, Queryable):
|
|||
def metrics_combo(self):
|
||||
return sorted(
|
||||
[
|
||||
(
|
||||
'sum__{}'.format(m.column_name),
|
||||
'SUM({})'.format(m.column_name),
|
||||
)
|
||||
for m in self.columns if m.sum],
|
||||
(m.metric_name, m.verbose_name)
|
||||
for m in self.metrics],
|
||||
key=lambda x: x[1])
|
||||
|
||||
def query(
        self, groupby, metrics,
        granularity,
        from_dttm, to_dttm,
        limit_spec=None,
        filter=None,
        is_timeseries=True,
        timeseries_limit=15, row_limit=None):
    """Build and run an aggregate SQL query, returning a pandas DataFrame.

    :param groupby: list of column expressions to group by (may be empty
        or ``None``). The list is never modified.
    :param metrics: names of the metrics to compute; each name is matched
        against ``metric_name`` of the objects in ``self.metrics`` and the
        matching ``expression`` is selected ``AS`` that name.
    :param granularity: when not the string ``"all"``, the ``ds`` column is
        additionally selected as ``timestamp`` and grouped on.
    :param from_dttm: inclusive lower bound on ``ds``; ``isoformat()`` is
        called on it.
    :param to_dttm: exclusive upper bound on ``ds``; ``isoformat()`` is
        called on it.
    :param limit_spec, filter, is_timeseries, timeseries_limit, row_limit:
        accepted so the signature stays call-compatible, but not used by
        this implementation.
    :returns: the DataFrame produced by ``pandas.read_sql_query`` against
        ``self.database.get_sqla_engine()``.
    """
    from pandas import read_sql_query

    # Work on copies: the caller's ``groupby`` list must stay untouched so
    # the same argument dict can be reused for a follow-up query.
    groupby_exprs = list(groupby or [])
    select_exprs = list(groupby_exprs)
    select_exprs += [
        "{} AS {}".format(m.expression, m.metric_name)
        for m in self.metrics if m.metric_name in metrics]

    if granularity != "all":
        select_exprs.append('ds as timestamp')
        groupby_exprs.append('ds')

    where_clause = " AND\n".join([
        "ds >= '{}'".format(from_dttm.isoformat()),
        "ds < '{}'".format(to_dttm.isoformat()),
    ])

    # NOTE(review): the table name, groupby columns and metric expressions
    # are interpolated verbatim into the statement; they have to come from
    # trusted configuration, not from raw user input.
    sql_parts = [
        "SELECT",
        ",\n".join(select_exprs),
        "FROM {}".format(self.table_name),
        "WHERE",
        where_clause,
    ]
    if groupby_exprs:
        # An empty GROUP BY clause is a syntax error, so only emit the
        # clause when there is something to group on.
        sql_parts += ["GROUP BY", ",\n".join(groupby_exprs)]
    sql = "\n".join(sql_parts)

    return read_sql_query(
        sql=sql,
        con=self.database.get_sqla_engine()
    )
|
||||
|
||||
|
||||
def fetch_metadata(self):
|
||||
engine = create_engine(self.database.sqlalchemy_uri)
|
||||
meta = MetaData()
|
||||
table = sqlaTable(
|
||||
self.table_name, meta, autoload=True, autoload_with=engine)
|
||||
table = self.database.get_table(self.table_name)
|
||||
TC = TableColumn
|
||||
for col in table.columns:
|
||||
dbcol = (
|
||||
|
|
@ -90,13 +148,28 @@ class Table(Model, AuditMixin, Queryable):
|
|||
db.session.commit()
|
||||
|
||||
|
||||
class SqlMetric(Model):
    """A named SQL expression stored in ``sql_metrics`` and linked to a Table.

    ``Table.query`` selects ``expression`` aliased as ``metric_name`` for
    each metric whose name was requested.
    """

    __tablename__ = 'sql_metrics'

    id = Column(Integer, primary_key=True)
    metric_name = Column(String(512))
    verbose_name = Column(String(1024))
    metric_type = Column(String(32))
    # NOTE(review): declared as String(256) while referencing tables.id --
    # confirm this matches the type of that primary key.
    table_id = Column(String(256), ForeignKey('tables.id'))
    # Adds a ``metrics`` collection on the Table side through the backref.
    table = relationship(
        'Table', foreign_keys=[table_id], backref='metrics')
    expression = Column(Text)
    description = Column(Text)
|
||||
|
||||
|
||||
class TableColumn(Model, AuditMixin):
|
||||
__tablename__ = 'table_columns'
|
||||
id = Column(Integer, primary_key=True)
|
||||
table_id = Column(
|
||||
String(256),
|
||||
ForeignKey('tables.id'))
|
||||
table = relationship('Table', backref='columns')
|
||||
table = relationship('Table', backref='columns', foreign_keys=[table_id])
|
||||
column_name = Column(String(256))
|
||||
is_dttm = Column(Boolean, default=True)
|
||||
is_active = Column(Boolean, default=True)
|
||||
|
|
@ -138,11 +211,8 @@ class Cluster(Model, AuditMixin):
|
|||
).format(self=self)
|
||||
datasources = json.loads(requests.get(endpoint).text)
|
||||
for datasource in datasources:
|
||||
#try:
|
||||
Datasource.sync_to_db(datasource, self)
|
||||
#except Exception as e:
|
||||
# logging.exception(e)
|
||||
# logging.error("Failed at syncing " + datasource)
|
||||
Datasource.sync_to_db(datasource, self)
|
||||
|
||||
|
||||
class Datasource(Model, AuditMixin, Queryable):
|
||||
__tablename__ = 'datasources'
|
||||
|
|
@ -165,6 +235,10 @@ class Datasource(Model, AuditMixin, Queryable):
|
|||
[(m.metric_name, m.verbose_name) for m in self.metrics],
|
||||
key=lambda x: x[1])
|
||||
|
||||
@property
def name(self):
    """Expose ``datasource_name`` under the generic attribute ``name``."""
    return self.datasource_name
|
||||
|
||||
def __repr__(self):
|
||||
return self.datasource_name
|
||||
|
||||
|
|
@ -227,7 +301,37 @@ class Datasource(Model, AuditMixin, Queryable):
|
|||
col_obj.datasource = datasource
|
||||
col_obj.generate_metrics()
|
||||
#session.commit()
|
||||
def query(
        self, groupby, metrics,
        granularity,
        from_dttm, to_dttm,
        limit_spec=None,
        filter=None,
        is_timeseries=True,
        timeseries_limit=15, row_limit=None):
    """Issue a groupby through the cluster's pydruid client.

    :param groupby: passed to the client as ``dimensions``.
    :param metrics: names used to pick entries of ``self.metrics``; the
        ``json_obj`` of each match becomes an aggregation keyed by name.
    :param granularity: forwarded unchanged when it is a string, otherwise
        wrapped as ``{"type": "duration", "duration": granularity}``.
    :param from_dttm, to_dttm: joined as ``<from iso>/<to iso>`` to form
        ``intervals``.
    :param limit_spec, filter: only sent when truthy.
    :param is_timeseries, timeseries_limit, row_limit: accepted but not
        used by this implementation.
    :returns: the value of ``client.export_pandas()`` after the groupby.
    """
    aggregations = {}
    for metric in self.metrics:
        if metric.metric_name in metrics:
            aggregations[metric.metric_name] = metric.json_obj

    if not isinstance(granularity, basestring):
        granularity = {"type": "duration", "duration": granularity}

    qry = {
        'datasource': self.datasource_name,
        'dimensions': groupby,
        'aggregations': aggregations,
        'granularity': granularity,
        'intervals': '/'.join(
            [from_dttm.isoformat(), to_dttm.isoformat()]),
    }
    # Optional keys are left out entirely rather than sent as empty values.
    if filter:
        qry['filter'] = filter
    if limit_spec:
        qry['limit_spec'] = limit_spec

    client = self.cluster.get_pydruid_client()
    client.groupby(**qry)
    return client.export_pandas()
|
||||
|
||||
|
||||
class Metric(Model):
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ form input.form-control {
|
|||
<div class="container-fluid">
|
||||
<div class="col-md-3">
|
||||
<h3>
|
||||
{{ datasource.datasource_name }}
|
||||
{{ datasource.name }}
|
||||
{% if datasource.description %}
|
||||
<i class="fa fa-info-circle" data-toggle="tooltip" data-placement="bottom" title="{{ datasource.description }}"></i>
|
||||
{% endif %}
|
||||
|
|
|
|||
12
app/views.py
12
app/views.py
|
|
@ -62,6 +62,16 @@ class ColumnInlineView(CompactCRUDMixin, ModelView):
|
|||
|
||||
appbuilder.add_view_no_menu(ColumnInlineView)
|
||||
|
||||
class SqlMetricInlineView(CompactCRUDMixin, ModelView):
    """Compact CRUD view over ``models.SqlMetric``, registered without a menu."""

    datamodel = SQLAInterface(models.SqlMetric)
    page_size = 100
    list_columns = ['metric_name', 'verbose_name', 'metric_type']
    edit_columns = [
        'metric_name',
        'description',
        'verbose_name',
        'metric_type',
        'table',
        'expression',
    ]
    # The add form shares the very same column list as the edit form.
    add_columns = edit_columns


appbuilder.add_view_no_menu(SqlMetricInlineView)
|
||||
|
||||
|
||||
class MetricInlineView(CompactCRUDMixin, ModelView):
|
||||
datamodel = SQLAInterface(models.Metric)
|
||||
|
|
@ -115,7 +125,7 @@ class TableView(ModelView, DeleteMixin):
|
|||
list_columns = ['table_link', 'database']
|
||||
add_columns = ['table_name', 'database']
|
||||
edit_columns = add_columns
|
||||
related_views = [TableColumnInlineView]
|
||||
related_views = [TableColumnInlineView, SqlMetricInlineView]
|
||||
|
||||
def post_insert(self, table):
|
||||
table.fetch_metadata()
|
||||
|
|
|
|||
73
app/viz.py
73
app/viz.py
|
|
@ -15,30 +15,6 @@ CHART_ARGS = {
|
|||
'target_div': 'chart',
|
||||
}
|
||||
|
||||
class BaseQuery(object):
    """Base class holding query parameters and timing the execution.

    Subclasses implement ``_execute`` (the work timed by ``run``) and
    ``pandas_df``.
    """

    def __init__(
            self, groupby, metrics, filters,
            is_timeseries,
            timeseries_limit=15, row_limit=None):
        """Store the query parameters on the instance unchanged."""
        self.groupby = groupby
        self.metrics = metrics
        self.filters = filters
        self.is_timeseries = is_timeseries
        self.timeseries_limit = timeseries_limit
        self.row_limit = row_limit

    def run(self):
        """Call ``_execute`` and record the elapsed seconds in ``duration``."""
        start = datetime.now()
        self._execute()
        self.duration = (datetime.now() - start).total_seconds()

    def _execute(self):
        """Perform the query; must be overridden by subclasses.

        :raises NotImplementedError: always, in this base class.
        """
        # ``NotImplemented`` is a comparison sentinel and is not callable;
        # the exception class is ``NotImplementedError``.
        raise NotImplementedError()

    def _execution(self):
        """Former name of the hook; kept as an alias for ``_execute``."""
        return self._execute()

    def pandas_df(self):
        """Return the result as a DataFrame; must be overridden.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()
|
||||
|
||||
|
||||
|
||||
class OmgWtForm(Form):
|
||||
field_order = (
|
||||
|
|
@ -146,39 +122,39 @@ class BaseViz(object):
|
|||
filters = cond
|
||||
return filters
|
||||
|
||||
def bake_query(self):
    """Run ``self.query_obj()`` through ``self.datasource.query``.

    The dict returned by ``query_obj`` is expanded into keyword arguments,
    and whatever the datasource returns is handed back unchanged.
    """
    run_query = self.datasource.query
    return run_query(**self.query_obj())
|
||||
|
||||
def query_obj(self):
|
||||
ds = self.datasource
|
||||
args = self.form_data
|
||||
groupby = args.getlist("groupby") or []
|
||||
metrics = args.getlist("metrics") or ['count']
|
||||
granularity = args.get("granularity", "1 day")
|
||||
granularity = utils.parse_human_timedelta(granularity).total_seconds() * 1000
|
||||
aggregations = {
|
||||
m.metric_name: m.json_obj
|
||||
for m in ds.metrics if m.metric_name in self.metrics
|
||||
}
|
||||
granularity = utils.parse_human_timedelta(
|
||||
granularity).total_seconds() * 1000
|
||||
limit = int(
|
||||
args.get("limit", config.ROW_LIMIT)) or config.ROW_LIMIT
|
||||
since = args.get("since", "1 year ago")
|
||||
from_dttm = utils.parse_human_datetime(since)
|
||||
if from_dttm > datetime.now():
|
||||
from_dttm = datetime.now() - (from_dttm-datetime.now())
|
||||
from_dttm = from_dttm.isoformat()
|
||||
until = args.get("until", "now")
|
||||
to_dttm = utils.parse_human_datetime(until).isoformat()
|
||||
to_dttm = utils.parse_human_datetime(until)
|
||||
if from_dttm >= to_dttm:
|
||||
flash("The date range doesn't seem right.", "danger")
|
||||
from_dttm = to_dttm # Making them identicial to not raise
|
||||
d = {
|
||||
'datasource': ds.datasource_name,
|
||||
'granularity': {"type": "duration", "duration": granularity},
|
||||
'intervals': from_dttm + '/' + to_dttm,
|
||||
'dimensions': groupby,
|
||||
'aggregations': aggregations,
|
||||
'granularity': granularity,
|
||||
'from_dttm': from_dttm,
|
||||
'to_dttm': to_dttm,
|
||||
'groupby': groupby,
|
||||
'metrics': metrics,
|
||||
'limit_spec': {
|
||||
"type": "default",
|
||||
"limit": limit,
|
||||
"columns": [{
|
||||
"dimension": self.metrics[0],
|
||||
"dimension": metrics[0] if metrics else self.metrics[0],
|
||||
"direction": "descending",
|
||||
}],
|
||||
},
|
||||
|
|
@ -188,17 +164,7 @@ class BaseViz(object):
|
|||
d['filter'] = filters
|
||||
return d
|
||||
|
||||
def bake_query(self):
    """Send ``self.query_obj()`` as a pydruid groupby and export the result.

    :returns: the value of the client's ``export_pandas()``.
    """
    druid = self.datasource.cluster.get_pydruid_client()
    submit = druid.groupby
    submit(**self.query_obj())
    return druid.export_pandas()
|
||||
|
||||
def get_query(self):
    """Send ``self.query_obj()`` as a pydruid groupby and return ``query_dict``.

    :returns: the client's ``query_dict`` attribute as it stands after the
        groupby call.
    """
    druid = self.datasource.cluster.get_pydruid_client()
    submit = druid.groupby
    submit(**self.query_obj())
    return druid.query_dict
|
||||
|
||||
def df_prep(self, ):
|
||||
def df_prep(self):
|
||||
pass
|
||||
|
||||
def form_prep(self):
|
||||
|
|
@ -290,14 +256,12 @@ class TimeSeriesViz(HighchartsViz):
|
|||
"""
|
||||
Doing a 2 phase query where we limit the number of series.
|
||||
"""
|
||||
client = self.datasource.cluster.get_pydruid_client()
|
||||
qry = self.query_obj()
|
||||
orig_filter = qry['filter'] if 'filter' in qry else ''
|
||||
qry['granularity'] = "all"
|
||||
client.groupby(**qry)
|
||||
df = client.export_pandas()
|
||||
df = self.datasource.query(**qry)
|
||||
if not df is None:
|
||||
dims = qry['dimensions']
|
||||
dims = qry['groupby']
|
||||
filters = []
|
||||
for index, row in df.iterrows():
|
||||
fields = []
|
||||
|
|
@ -318,9 +282,8 @@ class TimeSeriesViz(HighchartsViz):
|
|||
qry['filter'] = Filter(type="and", fields=[
|
||||
Filter.build_filter(ff),
|
||||
Filter.build_filter(orig_filter)])
|
||||
del qry['limit_spec']
|
||||
client.groupby(**qry)
|
||||
return client.export_pandas()
|
||||
qry['limit_spec'] = None
|
||||
return self.datasource.query(**qry)
|
||||
|
||||
class TimeSeriesCompareViz(TimeSeriesViz):
|
||||
verbose_name = "Time Series - Percent Change"
|
||||
|
|
|
|||
Loading…
Reference in New Issue