From 6032daf5f01266fc4414973ac84afed5041ada82 Mon Sep 17 00:00:00 2001 From: Maxime Date: Thu, 30 Jul 2015 00:33:37 +0000 Subject: [PATCH 1/3] Multi clusters --- app/models.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/app/models.py b/app/models.py index 0c5b266fc..b3e7bb646 100644 --- a/app/models.py +++ b/app/models.py @@ -2,7 +2,7 @@ from flask.ext.appbuilder import Model from datetime import datetime, timedelta from flask.ext.appbuilder.models.mixins import AuditMixin, FileColumn, ImageColumn from flask.ext.appbuilder.security.sqla.models import User -from sqlalchemy import Column, Integer, String, ForeignKey, Text, Boolean +from sqlalchemy import Column, Integer, String, ForeignKey, Text, Boolean, DateTime from sqlalchemy.orm import relationship from app import db, utils from dateutil.parser import parse @@ -11,6 +11,20 @@ import json client = utils.get_pydruid_client() + +class Cluster(Model, AuditMixin): + __tablename__ = 'clusters' + id = Column(Integer, primary_key=True) + cluster_name = Column(String(256), unique=True) + coordinator_host = Column(String(256)) + coordinator_port = Column(Integer) + coordinator_endpoint = Column(String(256)) + broker_host = Column(String(256)) + broker_port = Column(Integer) + broker_endpoint = Column(String(256)) + metadata_last_refreshed = Column(DateTime) + + class Datasource(Model, AuditMixin): __tablename__ = 'datasources' id = Column(Integer, primary_key=True) @@ -22,6 +36,9 @@ class Datasource(Model, AuditMixin): user_id = Column(Integer, ForeignKey('ab_user.id')) owner = relationship('User', backref='datasources', foreign_keys=[user_id]) + cluster_name = Column(Integer, + ForeignKey('clusters.cluster_name')) + cluster = relationship('Cluster', backref='datasources', foreign_keys=[cluster_name]) @property def metrics_combo(self): From 374802e437720ebe4a230115e3b4d7c61f2fff2c Mon Sep 17 00:00:00 2001 From: Maxime Date: Thu, 30 Jul 2015 06:39:30 +0000 Subject: [PATCH 2/3] Now enabling multi-cluster, connection info managed in UI --- app/__init__.py | 2 ++ app/models.py | 71 +++++++++++++++++++++++++++++++++---------------- app/utils.py | 8 ++---- app/views.py | 51 ++++++++++++++++++++++------------- app/viz.py | 6 ++--- config.py | 8 ------ 6 files changed, 88 insertions(+), 58 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 9bbdd8ea9..97c830559 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -20,4 +20,6 @@ appbuilder = AppBuilder( app, db.session, base_template='panoramix/base.html', indexview=MyIndexView) +get_session = appbuilder.get_session + from app import views diff --git a/app/models.py b/app/models.py index b3e7bb646..f0591881b 100644 --- a/app/models.py +++ b/app/models.py @@ -1,15 +1,15 @@ from flask.ext.appbuilder import Model -from datetime import datetime, timedelta -from flask.ext.appbuilder.models.mixins import AuditMixin, FileColumn, ImageColumn -from flask.ext.appbuilder.security.sqla.models import User +from pydruid import client +from datetime import timedelta +from flask.ext.appbuilder.models.mixins import AuditMixin, FileColumn from sqlalchemy import Column, Integer, String, ForeignKey, Text, Boolean, DateTime from sqlalchemy.orm import relationship -from app import db, utils +from app import get_session from dateutil.parser import parse + +import logging import json - - -client = utils.get_pydruid_client() +import requests class Cluster(Model, AuditMixin): @@ -24,6 +24,27 @@ class Cluster(Model, AuditMixin): broker_endpoint = Column(String(256)) metadata_last_refreshed = Column(DateTime) + def __repr__(self): + return self.cluster_name + + def get_pydruid_client(self): + cli = client.PyDruid( + "http://{0}:{1}/".format(self.broker_host, self.broker_port), + self.broker_endpoint) + return cli + + def refresh_datasources(self): + endpoint = ( + "http://{self.coordinator_host}:{self.coordinator_port}/" + "{self.coordinator_endpoint}/datasources" + ).format(self=self) + datasources = json.loads(requests.get(endpoint).text) + for datasource in datasources: + #try: + Datasource.sync_to_db(datasource, self) + #except Exception as e: + # logging.exception(e) + # logging.error("Failed at syncing " + datasource) class Datasource(Model, AuditMixin): __tablename__ = 'datasources' @@ -60,15 +81,15 @@ class Datasource(Model, AuditMixin): if m.metric_name == metric_name ][0] - @classmethod - def latest_metadata(cls, name): - results = client.time_boundary(datasource=name) + def latest_metadata(self): + client = self.cluster.get_pydruid_client() + results = client.time_boundary(datasource=self.datasource_name) max_time = results[0]['result']['minTime'] max_time = parse(max_time) intervals = (max_time - timedelta(seconds=1)).isoformat() + '/' intervals += (max_time + timedelta(seconds=1)).isoformat() segment_metadata = client.segment_metadata( - datasource=name, + datasource=self.datasource_name, intervals=intervals) if segment_metadata: return segment_metadata[-1]['columns'] @@ -78,16 +99,20 @@ class Datasource(Model, AuditMixin): col.generate_metrics() @classmethod - def sync_to_db(cls, name): - datasource = db.session.query(cls).filter_by(datasource_name=name).first() + def sync_to_db(cls, name, cluster): + session = get_session() + datasource = session.query(cls).filter_by(datasource_name=name).first() if not datasource: - db.session.add(cls(datasource_name=name)) - cols = cls.latest_metadata(name) + datasource = cls(datasource_name=name) + session.add(datasource) + datasource.cluster = cluster + + cols = datasource.latest_metadata() if not cols: return for col in cols: col_obj = ( - db.session + session .query(Column) .filter_by(datasource_name=name, column_name=col) .first() @@ -95,14 +120,14 @@ class Datasource(Model, AuditMixin): datatype = cols[col]['type'] if not col_obj: col_obj = Column(datasource_name=name, column_name=col) - db.session.add(col_obj) + session.add(col_obj) if datatype == "STRING": col_obj.groupby = True col_obj.filterable = True if col_obj: col_obj.type = cols[col]['type'] col_obj.generate_metrics() - db.session.commit() + #session.commit() @property def column_names(self): @@ -171,8 +196,7 @@ class Column(Model, AuditMixin): metric_name='count', verbose_name='COUNT(*)', metric_type='count', - json=json.dumps({ - 'type': 'count', 'name': 'count'}) + json=json.dumps({'type': 'count', 'name': 'count'}) )) if self.sum and self.isnum: @@ -217,14 +241,15 @@ class Column(Model, AuditMixin): 'name': name, 'fieldNames': [self.column_name]}) )) + session = get_session() for metric in metrics: m = ( - db.session.query(M) + session.query(M) .filter(M.datasource_name==self.datasource_name) .filter(M.metric_name==metric.metric_name) .first() ) metric.datasource_name = self.datasource_name if not m: - db.session.add(metric) - db.session.commit() + session.add(metric) + session.commit() diff --git a/app/utils.py b/app/utils.py index 59ced72a1..6276b002e 100644 --- a/app/utils.py +++ b/app/utils.py @@ -1,13 +1,9 @@ import config -from datetime import timedelta, datetime +from datetime import datetime import parsedatetime +from app import db -def get_pydruid_client(): - from pydruid import client - return client.PyDruid( - "http://{0}:{1}/".format(config.DRUID_HOST, config.DRUID_PORT), - config.DRUID_BASE_ENDPOINT) def parse_human_datetime(s): diff --git a/app/views.py b/app/views.py index c843f36ee..26c04b8bb 100644 --- a/app/views.py +++ b/app/views.py @@ -1,11 +1,11 @@ -from datetime import timedelta +from datetime import datetime import logging import json from flask import request, redirect, flash, Response from flask.ext.appbuilder.models.sqla.interface import SQLAInterface from flask.ext.appbuilder import ModelView, CompactCRUDMixin, BaseView, expose -from app import appbuilder, db, models, viz, utils, app +from app import appbuilder, db, models, viz, utils, app, get_session from flask.ext.appbuilder.security.decorators import has_access, permission_name import config from pydruid.client import doublesum @@ -62,13 +62,32 @@ class MetricInlineView(CompactCRUDMixin, ModelView): appbuilder.add_view_no_menu(MetricInlineView) +class ClusterModelView(ModelView, DeleteMixin): + datamodel = SQLAInterface(models.Cluster) + add_columns = [ + 'cluster_name', + 'coordinator_host', 'coordinator_port', 'coordinator_endpoint', + 'broker_host', 'broker_port', 'broker_endpoint', + ] + edit_columns = add_columns + list_columns = ['cluster_name', 'metadata_last_refreshed'] + +appbuilder.add_view( + ClusterModelView, + "Clusters", + icon="fa-server", + category="Admin", + category_icon='fa-envelope') + + class DatasourceModelView(ModelView, DeleteMixin): datamodel = SQLAInterface(models.Datasource) - list_columns = ['datasource_link', 'owner', 'is_featured', 'is_hidden'] + list_columns = [ + 'datasource_link', 'cluster', 'owner', 'is_featured', 'is_hidden'] related_views = [ColumnInlineView, MetricInlineView] edit_columns = [ - 'datasource_name', 'description', 'owner', 'is_featured', 'is_hidden', - 'default_endpoint'] + 'datasource_name', 'cluster', 'description', 'owner', + 'is_featured', 'is_hidden', 'default_endpoint'] page_size = 100 base_order = ('datasource_name', 'asc') @@ -129,19 +148,15 @@ class Panoramix(BaseView): @permission_name('refresh_datasources') @expose("/refresh_datasources/") def refresh_datasources(self): - import requests - endpoint = ( - "http://{COORDINATOR_HOST}:{COORDINATOR_PORT}/" - "{COORDINATOR_BASE_ENDPOINT}/datasources" - ).format(**config.__dict__) - datasources = json.loads(requests.get(endpoint).text) - for datasource in datasources: - try: - models.Datasource.sync_to_db(datasource) - except Exception as e: - logging.exception(e) - logging.error("Failed at syncing " + datasource) - flash("Refreshed metadata from Druid!", 'info') + session = db.session() + for cluster in session.query(models.Cluster).all(): + cluster.refresh_datasources() + cluster.metadata_last_refreshed = datetime.now() + flash( + "Refreshed metadata from cluster " + "[" + cluster.cluster_name + "]", + 'info') + session.commit() return redirect("/datasourcemodelview/list/") @expose("/autocomplete///") diff --git a/app/viz.py b/app/viz.py index 700c0d611..8dac33c97 100644 --- a/app/viz.py +++ b/app/viz.py @@ -164,12 +164,12 @@ class BaseViz(object): return d def bake_query(self): - client = utils.get_pydruid_client() + client = self.datasource.cluster.get_pydruid_client() client.groupby(**self.query_obj()) return client.export_pandas() def get_query(self): - client = utils.get_pydruid_client() + client = self.datasource.cluster.get_pydruid_client() client.groupby(**self.query_obj()) return client.query_dict @@ -265,7 +265,7 @@ class TimeSeriesViz(HighchartsViz): """ Doing a 2 phase query where we limit the number of series. """ - client = utils.get_pydruid_client() + client = self.datasource.cluster.get_pydruid_client() qry = self.query_obj() orig_filter = qry['filter'] if 'filter' in qry else '' qry['granularity'] = "all" diff --git a/config.py b/config.py index 87d19d2cd..ed71108fc 100644 --- a/config.py +++ b/config.py @@ -14,14 +14,6 @@ There' a ``from local_config import *`` at the end of this file. #--------------------------------------------------------- ROW_LIMIT = 5000 -DRUID_HOST = '0.0.0.0' -DRUID_PORT = '8084' -DRUID_BASE_ENDPOINT = 'druid/v2' - -COORDINATOR_HOST = '0.0.0.0' -COORDINATOR_PORT = '8081' -COORDINATOR_BASE_ENDPOINT = 'druid/coordinator/v1' - PANORAMIX_WEBSERVER_PORT = 8088 #--------------------------------------------------------- From 5e8fcabf10eed37b585d83230345400630252098 Mon Sep 17 00:00:00 2001 From: Maxime Date: Thu, 30 Jul 2015 21:44:26 +0000 Subject: [PATCH 3/3] Removing compare feature --- README.md | 11 +++++++++++ TODO.md | 4 ---- app/templates/panoramix/viz_highcharts.html | 2 +- app/viz.py | 2 +- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 29bcd8eb2..6c076fd8b 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,7 @@ fabmanager create-admin # Start the web server python run.py + ``` After installation, you should be able to point your browser to the right @@ -80,3 +81,13 @@ the credential you entered while creating the admin account, and navigate to `Menu -> Admin -> Refresh Metadata`. This action should bring in all of your datasources for Panoramix to be aware of, and they should show up in `Menu -> Datasources`, from where you can start playing with your data! + +Configuration +------------- +* From the UI, enter the information about your clusters in the +``Admin->Clusters`` menu by hitting the + sign. + +* Once the Druid cluster connection information is entered, hit the +``Admin->Refresh Metadata`` menu item to populate + +* Navigate to your datasources diff --git a/TODO.md b/TODO.md index c8c17f05e..b85494863 100644 --- a/TODO.md +++ b/TODO.md @@ -6,7 +6,3 @@ * Add verbose_name and label method to metrics and columns * csv * Save / bookmark / url shortener -* on save, process metadata / generate metrics - -* multi cluster - diff --git a/app/templates/panoramix/viz_highcharts.html b/app/templates/panoramix/viz_highcharts.html index efb4d1927..50f392737 100644 --- a/app/templates/panoramix/viz_highcharts.html +++ b/app/templates/panoramix/viz_highcharts.html @@ -7,7 +7,7 @@ {% if form.compare %}
{{ form.compare.label }}: {{ form.compare(class_="form-control") }}
{% endif %} - {% if form.compare %} + {% if form.rolling_type %}
{{ form.rolling_type.label }}: {{ form.rolling_type(class_="form-control select2") }} {{ form.rolling_periods.label }}: {{ form.rolling_periods(class_="form-control") }} diff --git a/app/viz.py b/app/viz.py index 8dac33c97..022126395 100644 --- a/app/viz.py +++ b/app/viz.py @@ -254,7 +254,7 @@ class TimeSeriesViz(HighchartsViz): def form_class(self): return form_factory(self.datasource, request.args, extra_fields_dict={ - 'compare': TextField('Period Compare',), + #'compare': TextField('Period Compare',), 'rolling_type': SelectField( 'Rolling', choices=[(s, s) for s in ['mean', 'sum', 'std']]),