From 712297480c5eaaacccd71ff3ab595db08fd38522 Mon Sep 17 00:00:00 2001 From: Maxime Beauchemin Date: Thu, 15 Jun 2017 17:02:17 -0700 Subject: [PATCH] [CLI] adding 'superset flower' command (flower is a UI for Celery) (#2963) * [CLI] adding 'superset flower' command (flower is a UI for Celery) * Addressing comments --- setup.py | 4 ++- superset/cli.py | 81 +++++++++++++++++++++++++++++++-------------- superset/sql_lab.py | 5 +-- superset/utils.py | 12 +++++++ 4 files changed, 74 insertions(+), 28 deletions(-) diff --git a/setup.py b/setup.py index b8f48abf8..17ebe43ee 100644 --- a/setup.py +++ b/setup.py @@ -43,7 +43,8 @@ setup( scripts=['superset/bin/superset'], install_requires=[ 'boto3==1.4.4', - 'celery==3.1.23', + 'celery==3.1.25', + 'colorama==0.3.9', 'cryptography==1.7.2', 'flask-appbuilder==1.9.0', 'flask-cache==0.13.1', @@ -52,6 +53,7 @@ setup( 'flask-sqlalchemy==2.1', 'flask-testing==0.6.2', 'flask-wtf==0.14.2', + 'flower==0.9.1', 'future>=0.16.0, <0.17', 'humanize==0.5.1', 'gunicorn==19.7.1', diff --git a/superset/cli.py b/superset/cli.py index 3bf79cb70..46b0ca794 100755 --- a/superset/cli.py +++ b/superset/cli.py @@ -5,17 +5,18 @@ from __future__ import print_function from __future__ import unicode_literals import logging -import celery from celery.bin import worker as celery_worker from datetime import datetime from subprocess import Popen +from colorama import Fore, Style from flask_migrate import MigrateCommand from flask_script import Manager -from superset import app, db, security +from superset import app, db, security, utils config = app.config +celery_app = utils.get_celery_app(config) manager = Manager(app) manager.add_command('db', MigrateCommand) @@ -41,7 +42,8 @@ def init(): '-p', '--port', default=config.get("SUPERSET_WEBSERVER_PORT"), help="Specify the port on which to run the web server") @manager.option( - '-w', '--workers', default=config.get("SUPERSET_WORKERS", 2), + '-w', '--workers', + default=config.get("SUPERSET_WORKERS", 2), help="Number of gunicorn web server workers to fire up") @manager.option( '-t', '--timeout', default=config.get("SUPERSET_WEBSERVER_TIMEOUT"), @@ -55,6 +57,13 @@ def runserver(debug, no_reload, address, port, timeout, workers, socket): """Starts a Superset web server.""" debug = debug or config.get("DEBUG") if debug: + print(Fore.BLUE + '-=' * 20) + print( + Fore.YELLOW + "Starting Superset server in " + + Fore.RED + "DEBUG" + + Fore.YELLOW + " mode") + print(Fore.BLUE + '-=' * 20) + print(Style.RESET_ALL) app.run( host='0.0.0.0', port=int(port), @@ -71,7 +80,9 @@ def runserver(debug, no_reload, address, port, timeout, workers, socket): "--limit-request-line 0 " "--limit-request-field_size 0 " "superset:app").format(**locals()) - print("Starting server with command: " + cmd) + print(Fore.GREEN + "Starting server with command: ") + print(Fore.YELLOW + cmd) + print(Style.RESET_ALL) Popen(cmd, shell=True).wait() @@ -80,14 +91,13 @@ def runserver(debug, no_reload, address, port, timeout, workers, socket): help="Show extra information") def version(verbose): """Prints the current version number""" - s = ( - "\n-----------------------\n" - "Superset {version}\n" - "-----------------------").format( - version=config.get('VERSION_STRING')) - print(s) + print(Fore.BLUE + '-=' * 15) + print(Fore.YELLOW + "Superset " + Fore.CYAN + "{version}".format( + version=config.get('VERSION_STRING'))) + print(Fore.BLUE + '-=' * 15) if verbose: print("[DB] : " + "{}".format(db.engine)) + print(Style.RESET_ALL) @manager.option( @@ -173,22 +183,43 @@ def update_datasources_cache(): @manager.option( - '-w', '--workers', default=config.get("SUPERSET_CELERY_WORKERS", 32), + '-w', '--workers', + type=int, help="Number of celery server workers to fire up") def worker(workers): """Starts a Superset worker for async SQL query execution.""" - # celery -A tasks worker --loglevel=info - print("Starting SQL Celery worker.") - if config.get('CELERY_CONFIG'): - print("Celery broker url: ") - print(config.get('CELERY_CONFIG').BROKER_URL) + if workers: + celery_app.conf.update(CELERYD_CONCURRENCY=workers) + elif config.get("SUPERSET_CELERY_WORKERS"): + celery_app.conf.update( + worker_concurrency=config.get("SUPERSET_CELERY_WORKERS")) - application = celery.current_app._get_current_object() - c_worker = celery_worker.worker(app=application) - options = { - 'broker': config.get('CELERY_CONFIG').BROKER_URL, - 'loglevel': 'INFO', - 'traceback': True, - 'concurrency': int(workers), - } - c_worker.run(**options) + worker = celery_worker.worker(app=celery_app) + worker.run() + + +@manager.option( + '-p', '--port', + default='5555', + help=('Port on which to start the Flower process')) +@manager.option( + '-a', '--address', + default='localhost', + help=('Address on which to run the service')) +def flower(port, address): + """Runs a Celery Flower web server + + Celery Flower is a UI to monitor the Celery operation on a given + broker""" + BROKER_URL = celery_app.conf.BROKER_URL + cmd = ( + "celery flower " + "--broker={BROKER_URL} " + "--port={port} " + "--address={address} " + ).format(**locals()) + print(Fore.GREEN + "Starting a Celery Flower instance") + print(Fore.BLUE + '-=' * 40) + print(Fore.YELLOW + cmd) + print(Fore.BLUE + '-=' * 40) + Popen(cmd, shell=True).wait() diff --git a/superset/sql_lab.py b/superset/sql_lab.py index 7158d9c75..176ff4cdb 100644 --- a/superset/sql_lab.py +++ b/superset/sql_lab.py @@ -16,9 +16,10 @@ from superset.models.sql_lab import Query from superset.sql_parse import SupersetQuery from superset.db_engine_specs import LimitMethod from superset.jinja_context import get_template_processor -from superset.utils import QueryStatus +from superset.utils import QueryStatus, get_celery_app -celery_app = celery.Celery(config_source=app.config.get('CELERY_CONFIG')) +config = app.config +celery_app = get_celery_app(config) def dedup(l, suffix='__'): diff --git a/superset/utils.py b/superset/utils.py index 655ccad20..49a0f4735 100644 --- a/superset/utils.py +++ b/superset/utils.py @@ -21,6 +21,8 @@ import zlib from builtins import object from datetime import date, datetime, time + +import celery from dateutil.parser import parse from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart @@ -622,3 +624,13 @@ def zlib_decompress_to_string(blob): decompressed = zlib.decompress(bytes(blob, "utf-8")) return decompressed.decode("utf-8") return zlib.decompress(blob) + +_celery_app = None + + +def get_celery_app(config): + global _celery_app + if _celery_app: + return _celery_app + _celery_app = celery.Celery(config_source=config.get('CELERY_CONFIG')) + return _celery_app