[CLI] adding 'superset flower' command (flower is a UI for Celery) (#2963)

* [CLI] adding 'superset flower' command (flower is a UI for Celery)

* Addressing comments
This commit is contained in:
Maxime Beauchemin 2017-06-15 17:02:17 -07:00 committed by GitHub
parent 6ddccaaa9b
commit 712297480c
4 changed files with 74 additions and 28 deletions

View File

@ -43,7 +43,8 @@ setup(
scripts=['superset/bin/superset'], scripts=['superset/bin/superset'],
install_requires=[ install_requires=[
'boto3==1.4.4', 'boto3==1.4.4',
'celery==3.1.23', 'celery==3.1.25',
'colorama==0.3.9',
'cryptography==1.7.2', 'cryptography==1.7.2',
'flask-appbuilder==1.9.0', 'flask-appbuilder==1.9.0',
'flask-cache==0.13.1', 'flask-cache==0.13.1',
@ -52,6 +53,7 @@ setup(
'flask-sqlalchemy==2.1', 'flask-sqlalchemy==2.1',
'flask-testing==0.6.2', 'flask-testing==0.6.2',
'flask-wtf==0.14.2', 'flask-wtf==0.14.2',
'flower==0.9.1',
'future>=0.16.0, <0.17', 'future>=0.16.0, <0.17',
'humanize==0.5.1', 'humanize==0.5.1',
'gunicorn==19.7.1', 'gunicorn==19.7.1',

View File

@ -5,17 +5,18 @@ from __future__ import print_function
from __future__ import unicode_literals from __future__ import unicode_literals
import logging import logging
import celery
from celery.bin import worker as celery_worker from celery.bin import worker as celery_worker
from datetime import datetime from datetime import datetime
from subprocess import Popen from subprocess import Popen
from colorama import Fore, Style
from flask_migrate import MigrateCommand from flask_migrate import MigrateCommand
from flask_script import Manager from flask_script import Manager
from superset import app, db, security from superset import app, db, security, utils
config = app.config config = app.config
celery_app = utils.get_celery_app(config)
manager = Manager(app) manager = Manager(app)
manager.add_command('db', MigrateCommand) manager.add_command('db', MigrateCommand)
@ -41,7 +42,8 @@ def init():
'-p', '--port', default=config.get("SUPERSET_WEBSERVER_PORT"), '-p', '--port', default=config.get("SUPERSET_WEBSERVER_PORT"),
help="Specify the port on which to run the web server") help="Specify the port on which to run the web server")
@manager.option( @manager.option(
'-w', '--workers', default=config.get("SUPERSET_WORKERS", 2), '-w', '--workers',
default=config.get("SUPERSET_WORKERS", 2),
help="Number of gunicorn web server workers to fire up") help="Number of gunicorn web server workers to fire up")
@manager.option( @manager.option(
'-t', '--timeout', default=config.get("SUPERSET_WEBSERVER_TIMEOUT"), '-t', '--timeout', default=config.get("SUPERSET_WEBSERVER_TIMEOUT"),
@ -55,6 +57,13 @@ def runserver(debug, no_reload, address, port, timeout, workers, socket):
"""Starts a Superset web server.""" """Starts a Superset web server."""
debug = debug or config.get("DEBUG") debug = debug or config.get("DEBUG")
if debug: if debug:
print(Fore.BLUE + '-=' * 20)
print(
Fore.YELLOW + "Starting Superset server in " +
Fore.RED + "DEBUG" +
Fore.YELLOW + " mode")
print(Fore.BLUE + '-=' * 20)
print(Style.RESET_ALL)
app.run( app.run(
host='0.0.0.0', host='0.0.0.0',
port=int(port), port=int(port),
@ -71,7 +80,9 @@ def runserver(debug, no_reload, address, port, timeout, workers, socket):
"--limit-request-line 0 " "--limit-request-line 0 "
"--limit-request-field_size 0 " "--limit-request-field_size 0 "
"superset:app").format(**locals()) "superset:app").format(**locals())
print("Starting server with command: " + cmd) print(Fore.GREEN + "Starting server with command: ")
print(Fore.YELLOW + cmd)
print(Style.RESET_ALL)
Popen(cmd, shell=True).wait() Popen(cmd, shell=True).wait()
@ -80,14 +91,13 @@ def runserver(debug, no_reload, address, port, timeout, workers, socket):
help="Show extra information") help="Show extra information")
def version(verbose): def version(verbose):
"""Prints the current version number""" """Prints the current version number"""
s = ( print(Fore.BLUE + '-=' * 15)
"\n-----------------------\n" print(Fore.YELLOW + "Superset " + Fore.CYAN + "{version}".format(
"Superset {version}\n" version=config.get('VERSION_STRING')))
"-----------------------").format( print(Fore.BLUE + '-=' * 15)
version=config.get('VERSION_STRING'))
print(s)
if verbose: if verbose:
print("[DB] : " + "{}".format(db.engine)) print("[DB] : " + "{}".format(db.engine))
print(Style.RESET_ALL)
@manager.option( @manager.option(
@ -173,22 +183,43 @@ def update_datasources_cache():
@manager.option( @manager.option(
'-w', '--workers', default=config.get("SUPERSET_CELERY_WORKERS", 32), '-w', '--workers',
type=int,
help="Number of celery server workers to fire up") help="Number of celery server workers to fire up")
def worker(workers): def worker(workers):
"""Starts a Superset worker for async SQL query execution.""" """Starts a Superset worker for async SQL query execution."""
# celery -A tasks worker --loglevel=info if workers:
print("Starting SQL Celery worker.") celery_app.conf.update(CELERYD_CONCURRENCY=workers)
if config.get('CELERY_CONFIG'): elif config.get("SUPERSET_CELERY_WORKERS"):
print("Celery broker url: ") celery_app.conf.update(
print(config.get('CELERY_CONFIG').BROKER_URL) worker_concurrency=config.get("SUPERSET_CELERY_WORKERS"))
application = celery.current_app._get_current_object() worker = celery_worker.worker(app=celery_app)
c_worker = celery_worker.worker(app=application) worker.run()
options = {
'broker': config.get('CELERY_CONFIG').BROKER_URL,
'loglevel': 'INFO', @manager.option(
'traceback': True, '-p', '--port',
'concurrency': int(workers), default='5555',
} help=('Port on which to start the Flower process'))
c_worker.run(**options) @manager.option(
'-a', '--address',
default='localhost',
help=('Address on which to run the service'))
def flower(port, address):
"""Runs a Celery Flower web server
Celery Flower is a UI to monitor the Celery operation on a given
broker"""
BROKER_URL = celery_app.conf.BROKER_URL
cmd = (
"celery flower "
"--broker={BROKER_URL} "
"--port={port} "
"--address={address} "
).format(**locals())
print(Fore.GREEN + "Starting a Celery Flower instance")
print(Fore.BLUE + '-=' * 40)
print(Fore.YELLOW + cmd)
print(Fore.BLUE + '-=' * 40)
Popen(cmd, shell=True).wait()

View File

@ -16,9 +16,10 @@ from superset.models.sql_lab import Query
from superset.sql_parse import SupersetQuery from superset.sql_parse import SupersetQuery
from superset.db_engine_specs import LimitMethod from superset.db_engine_specs import LimitMethod
from superset.jinja_context import get_template_processor from superset.jinja_context import get_template_processor
from superset.utils import QueryStatus from superset.utils import QueryStatus, get_celery_app
celery_app = celery.Celery(config_source=app.config.get('CELERY_CONFIG')) config = app.config
celery_app = get_celery_app(config)
def dedup(l, suffix='__'): def dedup(l, suffix='__'):

View File

@ -21,6 +21,8 @@ import zlib
from builtins import object from builtins import object
from datetime import date, datetime, time from datetime import date, datetime, time
import celery
from dateutil.parser import parse from dateutil.parser import parse
from email.mime.text import MIMEText from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
@ -622,3 +624,13 @@ def zlib_decompress_to_string(blob):
decompressed = zlib.decompress(bytes(blob, "utf-8")) decompressed = zlib.decompress(bytes(blob, "utf-8"))
return decompressed.decode("utf-8") return decompressed.decode("utf-8")
return zlib.decompress(blob) return zlib.decompress(blob)
_celery_app = None
def get_celery_app(config):
global _celery_app
if _celery_app:
return _celery_app
_celery_app = celery.Celery(config_source=config.get('CELERY_CONFIG'))
return _celery_app