diff --git a/README.md b/README.md index 29bcd8eb2..6c076fd8b 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,7 @@ fabmanager create-admin # Start the web server python run.py + ``` After installation, you should be able to point your browser to the right @@ -80,3 +81,13 @@ the credential you entered while creating the admin account, and navigate to `Menu -> Admin -> Refresh Metadata`. This action should bring in all of your datasources for Panoramix to be aware of, and they should show up in `Menu -> Datasources`, from where you can start playing with your data! + +Configuration +------------- +* From the UI, enter the information about your clusters in the +``Admin->Clusters`` menu by hitting the + sign. + +* Once the Druid cluster connection information is entered, hit the +``Admin->Refresh Metadata`` menu item to populate your datasources + +* Navigate to your datasources diff --git a/TODO.md b/TODO.md index c8c17f05e..b85494863 100644 --- a/TODO.md +++ b/TODO.md @@ -6,7 +6,3 @@ * Add verbose_name and label method to metrics and columns * csv * Save / bookmark / url shortener -* on save, process metadata / generate metrics - -* multi cluster - diff --git a/app/__init__.py b/app/__init__.py index 9bbdd8ea9..97c830559 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -20,4 +20,6 @@ appbuilder = AppBuilder( app, db.session, base_template='panoramix/base.html', indexview=MyIndexView) +get_session = appbuilder.get_session + from app import views diff --git a/app/models.py b/app/models.py index 0c5b266fc..f0591881b 100644 --- a/app/models.py +++ b/app/models.py @@ -1,15 +1,50 @@ from flask.ext.appbuilder import Model -from datetime import datetime, timedelta -from flask.ext.appbuilder.models.mixins import AuditMixin, FileColumn, ImageColumn -from flask.ext.appbuilder.security.sqla.models import User -from sqlalchemy import Column, Integer, String, ForeignKey, Text, Boolean +from pydruid import client +from datetime import timedelta +from flask.ext.appbuilder.models.mixins 
import AuditMixin, FileColumn +from sqlalchemy import Column, Integer, String, ForeignKey, Text, Boolean, DateTime from sqlalchemy.orm import relationship -from app import db, utils +from app import get_session from dateutil.parser import parse + +import logging import json +import requests -client = utils.get_pydruid_client() +class Cluster(Model, AuditMixin): + __tablename__ = 'clusters' + id = Column(Integer, primary_key=True) + cluster_name = Column(String(256), unique=True) + coordinator_host = Column(String(256)) + coordinator_port = Column(Integer) + coordinator_endpoint = Column(String(256)) + broker_host = Column(String(256)) + broker_port = Column(Integer) + broker_endpoint = Column(String(256)) + metadata_last_refreshed = Column(DateTime) + + def __repr__(self): + return self.cluster_name + + def get_pydruid_client(self): + cli = client.PyDruid( + "http://{0}:{1}/".format(self.broker_host, self.broker_port), + self.broker_endpoint) + return cli + + def refresh_datasources(self): + endpoint = ( + "http://{self.coordinator_host}:{self.coordinator_port}/" + "{self.coordinator_endpoint}/datasources" + ).format(self=self) + datasources = json.loads(requests.get(endpoint).text) + for datasource in datasources: + #try: + Datasource.sync_to_db(datasource, self) + #except Exception as e: + # logging.exception(e) + # logging.error("Failed at syncing " + datasource) class Datasource(Model, AuditMixin): __tablename__ = 'datasources' @@ -22,6 +57,9 @@ class Datasource(Model, AuditMixin): user_id = Column(Integer, ForeignKey('ab_user.id')) owner = relationship('User', backref='datasources', foreign_keys=[user_id]) + cluster_name = Column(Integer, + ForeignKey('clusters.cluster_name')) + cluster = relationship('Cluster', backref='datasources', foreign_keys=[cluster_name]) @property def metrics_combo(self): @@ -43,15 +81,15 @@ class Datasource(Model, AuditMixin): if m.metric_name == metric_name ][0] - @classmethod - def latest_metadata(cls, name): - results = 
client.time_boundary(datasource=name) + def latest_metadata(self): + client = self.cluster.get_pydruid_client() + results = client.time_boundary(datasource=self.datasource_name) max_time = results[0]['result']['minTime'] max_time = parse(max_time) intervals = (max_time - timedelta(seconds=1)).isoformat() + '/' intervals += (max_time + timedelta(seconds=1)).isoformat() segment_metadata = client.segment_metadata( - datasource=name, + datasource=self.datasource_name, intervals=intervals) if segment_metadata: return segment_metadata[-1]['columns'] @@ -61,16 +99,20 @@ class Datasource(Model, AuditMixin): col.generate_metrics() @classmethod - def sync_to_db(cls, name): - datasource = db.session.query(cls).filter_by(datasource_name=name).first() + def sync_to_db(cls, name, cluster): + session = get_session() + datasource = session.query(cls).filter_by(datasource_name=name).first() if not datasource: - db.session.add(cls(datasource_name=name)) - cols = cls.latest_metadata(name) + datasource = cls(datasource_name=name) + session.add(datasource) + datasource.cluster = cluster + + cols = datasource.latest_metadata() if not cols: return for col in cols: col_obj = ( - db.session + session .query(Column) .filter_by(datasource_name=name, column_name=col) .first() @@ -78,14 +120,14 @@ class Datasource(Model, AuditMixin): datatype = cols[col]['type'] if not col_obj: col_obj = Column(datasource_name=name, column_name=col) - db.session.add(col_obj) + session.add(col_obj) if datatype == "STRING": col_obj.groupby = True col_obj.filterable = True if col_obj: col_obj.type = cols[col]['type'] col_obj.generate_metrics() - db.session.commit() + #session.commit() @property def column_names(self): @@ -154,8 +196,7 @@ class Column(Model, AuditMixin): metric_name='count', verbose_name='COUNT(*)', metric_type='count', - json=json.dumps({ - 'type': 'count', 'name': 'count'}) + json=json.dumps({'type': 'count', 'name': 'count'}) )) if self.sum and self.isnum: @@ -200,14 +241,15 @@ class 
Column(Model, AuditMixin): 'name': name, 'fieldNames': [self.column_name]}) )) + session = get_session() for metric in metrics: m = ( - db.session.query(M) + session.query(M) .filter(M.datasource_name==self.datasource_name) .filter(M.metric_name==metric.metric_name) .first() ) metric.datasource_name = self.datasource_name if not m: - db.session.add(metric) - db.session.commit() + session.add(metric) + session.commit() diff --git a/app/templates/panoramix/viz_highcharts.html b/app/templates/panoramix/viz_highcharts.html index efb4d1927..50f392737 100644 --- a/app/templates/panoramix/viz_highcharts.html +++ b/app/templates/panoramix/viz_highcharts.html @@ -7,7 +7,7 @@ {% if form.compare %}