From 808622414c0f901bbecc53678fca0085af41a04c Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Mon, 10 Dec 2018 22:29:29 -0800 Subject: [PATCH] [SIP-3] Scheduled email reports for Slices / Dashboards (#5294) * [scheduled reports] Add support for scheduled reports * Scheduled email reports for slice and dashboard visualization (attachment or inline) * Scheduled email reports for slice data (CSV attachment on inline table) * Each schedule has a list of recipients (all of them can receive a single mail, or separate mails) * All outgoing mails can have a mandatory bcc - for audit purposes. * Each dashboard/slice can have multiple schedules. In addition, this PR also makes a few minor improvements to the celery infrastructure. * Create a common celery app * Added more celery annotations for the tasks * Introduced celery beat * Update docs about concurrency / pools * [scheduled reports] - Debug mode for scheduled emails * [scheduled reports] - Ability to send test mails * [scheduled reports] - Test email functionality - minor improvements * [scheduled reports] - Rebase with master. 
Minor fixes * [scheduled reports] - Add warning messages * [scheduled reports] - flake8 * [scheduled reports] - fix rebase * [scheduled reports] - fix rebase * [scheduled reports] - fix flake8 * [scheduled reports] Rebase in prep for merge * Fixed alembic tree after rebase * Updated requirements to latest version of packages (and tested) * Removed py2 stuff * [scheduled reports] - fix flake8 * [scheduled reports] - address review comments * [scheduled reports] - rebase with master --- .gitignore | 9 + docs/faq.rst | 2 +- docs/installation.rst | 116 ++++- requirements.txt | 3 + setup.py | 3 + superset/config.py | 91 +++- .../6c7537a6004a_models_for_email_reports.py | 69 +++ superset/models/__init__.py | 1 + superset/models/schedules.py | 93 ++++ superset/sql_lab.py | 9 +- superset/tasks/__init__.py | 2 + superset/tasks/celery_app.py | 11 + superset/tasks/schedules.py | 441 ++++++++++++++++++ .../superset/reports/slice_data.html | 31 ++ superset/utils/core.py | 33 +- superset/views/__init__.py | 1 + superset/views/core.py | 1 - superset/views/schedules.py | 279 +++++++++++ tests/email_tests.py | 35 ++ tests/fixtures/sample.png | Bin 0 -> 4481 bytes tests/fixtures/trends.csv | 3 + tests/schedules_test.py | 368 +++++++++++++++ tests/utils.py | 8 +- 23 files changed, 1569 insertions(+), 40 deletions(-) create mode 100644 superset/migrations/versions/6c7537a6004a_models_for_email_reports.py create mode 100644 superset/models/schedules.py create mode 100644 superset/tasks/__init__.py create mode 100644 superset/tasks/celery_app.py create mode 100644 superset/tasks/schedules.py create mode 100644 superset/templates/superset/reports/slice_data.html create mode 100644 superset/views/schedules.py create mode 100644 tests/fixtures/sample.png create mode 100644 tests/fixtures/trends.csv create mode 100644 tests/schedules_test.py diff --git a/.gitignore b/.gitignore index c6cc07bf35..2ace393240 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,12 @@ yarn-error.log *.iml venv 
@eaDir/ + +# Test data +celery_results.sqlite +celerybeat-schedule +celerydb.sqlite +celerybeat.pid +geckodriver.log +ghostdriver.log +testCSV.csv diff --git a/docs/faq.rst b/docs/faq.rst index 3b69044563..9deba3c0a1 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -88,7 +88,7 @@ It's easy: use the ``Filter Box`` widget, build a slice, and add it to your dashboard. The ``Filter Box`` widget allows you to define a query to populate dropdowns -that can be use for filtering. To build the list of distinct values, we +that can be used for filtering. To build the list of distinct values, we run a query, and sort the result by the metric you provide, sorting descending. diff --git a/docs/installation.rst b/docs/installation.rst index 7224cd2f8c..fd6efec17a 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -603,14 +603,12 @@ Upgrading should be as straightforward as running:: superset db upgrade superset init -SQL Lab -------- -SQL Lab is a powerful SQL IDE that works with all SQLAlchemy compatible -databases. By default, queries are executed in the scope of a web -request so they -may eventually timeout as queries exceed the maximum duration of a web -request in your environment, whether it'd be a reverse proxy or the Superset -server itself. +Celery Tasks +------------ +On large analytic databases, it's common to run background jobs, reports +and/or queries that execute for minutes or hours. In certain cases, we need +to support long running tasks that execute beyond the typical web request's +timeout (30-60 seconds). On large analytic databases, it's common to run queries that execute for minutes or hours. @@ -634,15 +632,41 @@ have the same configuration. 
class CeleryConfig(object): BROKER_URL = 'redis://localhost:6379/0' - CELERY_IMPORTS = ('superset.sql_lab', ) + CELERY_IMPORTS = ( + 'superset.sql_lab', + 'superset.tasks', + ) CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' - CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}} + CELERYD_LOG_LEVEL = 'DEBUG' + CELERYD_PREFETCH_MULTIPLIER = 10 + CELERY_ACKS_LATE = True + CELERY_ANNOTATIONS = { + 'sql_lab.get_sql_results': { + 'rate_limit': '100/s', + }, + 'email_reports.send': { + 'rate_limit': '1/s', + 'time_limit': 120, + 'soft_time_limit': 150, + 'ignore_result': True, + }, + } + CELERYBEAT_SCHEDULE = { + 'email_reports.schedule_hourly': { + 'task': 'email_reports.schedule_hourly', + 'schedule': crontab(minute=1, hour='*'), + }, + } CELERY_CONFIG = CeleryConfig -To start a Celery worker to leverage the configuration run: :: +* To start a Celery worker to leverage the configuration run: :: - celery worker --app=superset.sql_lab:celery_app --pool=gevent -Ofair + celery worker --app=superset.tasks.celery_app:app --pool=prefork -Ofair -c 4 + +* To start a job which schedules periodic background jobs, run :: + + celery beat --app=superset.tasks.celery_app:app To setup a result backend, you need to pass an instance of a derivative of ``werkzeug.contrib.cache.BaseCache`` to the ``RESULTS_BACKEND`` @@ -665,11 +689,65 @@ look something like: RESULTS_BACKEND = RedisCache( host='localhost', port=6379, key_prefix='superset_results') -Note that it's important that all the worker nodes and web servers in -the Superset cluster share a common metadata database. -This means that SQLite will not work in this context since it has -limited support for concurrency and -typically lives on the local file system. +**Important notes** + +* It is important that all the worker nodes and web servers in + the Superset cluster share a common metadata database. 
+ This means that SQLite will not work in this context since it has + limited support for concurrency and + typically lives on the local file system. + +* There should only be one instance of ``celery beat`` running in your + entire setup. If not, background jobs can get scheduled multiple times + resulting in weird behaviors like duplicate delivery of reports, + higher than expected load / traffic etc. + + +Email Reports +------------- +Email reports allow users to schedule email reports for + +* slice and dashboard visualization (Attachment or inline) +* slice data (CSV attachment or inline table) + +Schedules are defined in crontab format and each schedule +can have a list of recipients (all of them can receive a single mail, +or separate mails). For audit purposes, all outgoing mails can have a +mandatory bcc. + +**Requirements** + +* A selenium compatible driver & headless browser + + * `geckodriver `_ and Firefox is preferred + * `chromedriver `_ is a good option too +* Run `celery worker` and `celery beat` as follows :: + + celery worker --app=superset.tasks.celery_app:app --pool=prefork -Ofair -c 4 + celery beat --app=superset.tasks.celery_app:app + +**Important notes** + +* Be mindful of the concurrency setting for celery (using ``-c 4``). + Selenium/webdriver instances can consume a lot of CPU / memory on your servers. + +* In some cases, if you notice a lot of leaked ``geckodriver`` processes, try running + your celery processes with :: + + celery worker --pool=prefork --max-tasks-per-child=128 ... + +* It is recommended to run separate workers for ``sql_lab`` and + ``email_reports`` tasks. Can be done by using ``queue`` field in ``CELERY_ANNOTATIONS`` + +SQL Lab +------- +SQL Lab is a powerful SQL IDE that works with all SQLAlchemy compatible +databases.
By default, queries are executed in the scope of a web +request so they may eventually timeout as queries exceed the maximum duration of a web +request in your environment, whether it'd be a reverse proxy or the Superset +server itself. In such cases, it is preferred to use ``celery`` to run the queries +in the background. Please follow the examples/notes mentioned above to get your +celery setup working. Also note that SQL Lab supports Jinja templating in queries and that it's possible to overload @@ -684,6 +762,8 @@ in this dictionary are made available for users to use in their SQL. } +Celery Flower +------------- Flower is a web based tool for monitoring the Celery cluster which you can install from pip: :: @@ -691,7 +771,7 @@ install from pip: :: and run via: :: - celery flower --app=superset.sql_lab:celery_app + celery flower --app=superset.tasks.celery_app:app Building from source --------------------- diff --git a/requirements.txt b/requirements.txt index 081ca70520..fedd2b177e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,6 +20,7 @@ chardet==3.0.4 # via requests click==6.7 colorama==0.3.9 contextlib2==0.5.5 +croniter==0.3.26 cryptography==1.9 defusedxml==0.5.0 # via python3-openid docutils==0.14 # via botocore @@ -69,9 +70,11 @@ python3-openid==3.1.0 # via flask-openid pytz==2018.5 # via babel, celery, flower, pandas pyyaml==3.13 requests==2.18.4 +retry==0.9.2 rfc3986==1.1.0 # via tableschema s3transfer==0.1.13 # via boto3 sasl==0.2.1 # via thrift-sasl +selenium==3.141.0 simplejson==3.15.0 six==1.11.0 # via bleach, cryptography, isodate, jsonlines, linear-tsv, pathlib2, polyline, pydruid, python-dateutil, sasl, sqlalchemy-utils, tableschema, tabulator, thrift sqlalchemy-utils==0.32.21 diff --git a/setup.py b/setup.py index 542ca3fbec..1b55314e93 100644 --- a/setup.py +++ b/setup.py @@ -94,6 +94,9 @@ setup( 'thrift-sasl>=0.2.1', 'unicodecsv', 'unidecode>=0.04.21', + 'croniter==0.3.25', + 'selenium==3.14.0', + 'retry==0.9.2', ], 
extras_require={ 'cors': ['flask-cors>=2.0.0'], diff --git a/superset/config.py b/superset/config.py index 1613e75df7..37afdd5f37 100644 --- a/superset/config.py +++ b/superset/config.py @@ -11,6 +11,7 @@ import json import os import sys +from celery.schedules import crontab from dateutil import tz from flask_appbuilder.security.manager import AUTH_DB @@ -309,19 +310,43 @@ WARNING_MSG = None # Default celery config is to use SQLA as a broker, in a production setting # you'll want to use a proper broker as specified here: # http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html -""" -# Example: + + class CeleryConfig(object): - BROKER_URL = 'sqla+sqlite:///celerydb.sqlite' - CELERY_IMPORTS = ('superset.sql_lab', ) - CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite' - CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}} - CELERYD_LOG_LEVEL = 'DEBUG' - CELERYD_PREFETCH_MULTIPLIER = 1 - CELERY_ACKS_LATE = True + BROKER_URL = 'sqla+sqlite:///celerydb.sqlite' + CELERY_IMPORTS = ( + 'superset.sql_lab', + 'superset.tasks', + ) + CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite' + CELERYD_LOG_LEVEL = 'DEBUG' + CELERYD_PREFETCH_MULTIPLIER = 1 + CELERY_ACKS_LATE = True + CELERY_ANNOTATIONS = { + 'sql_lab.get_sql_results': { + 'rate_limit': '100/s', + }, + 'email_reports.send': { + 'rate_limit': '1/s', + 'time_limit': 120, + 'soft_time_limit': 150, + 'ignore_result': True, + }, + } + CELERYBEAT_SCHEDULE = { + 'email_reports.schedule_hourly': { + 'task': 'email_reports.schedule_hourly', + 'schedule': crontab(minute=1, hour='*'), + }, + } + + CELERY_CONFIG = CeleryConfig + """ +# Set celery config to None to disable all the above configuration CELERY_CONFIG = None +""" # static http headers to be served by your Superset server. 
# This header prevents iFrames from other domains and @@ -463,6 +488,54 @@ SQL_QUERY_MUTATOR = None # using flask-compress ENABLE_FLASK_COMPRESS = True +# Enable / disable scheduled email reports +ENABLE_SCHEDULED_EMAIL_REPORTS = False + +# If enabled, certain features are run in debug mode +# Current list: +# * Emails are sent using dry-run mode (logging only) +SCHEDULED_EMAIL_DEBUG_MODE = False + +# Email reports - minimum time resolution (in minutes) for the crontab +EMAIL_REPORTS_CRON_RESOLUTION = 15 + +# Email report configuration +# From address in emails +EMAIL_REPORT_FROM_ADDRESS = 'reports@superset.org' + +# Send bcc of all reports to this address. Set to None to disable. +# This is useful for maintaining an audit trail of all email deliveries. +EMAIL_REPORT_BCC_ADDRESS = None + +# User credentials to use for generating reports +# This user should have permissions to browse all the dashboards and +# slices. +# TODO: In the future, login as the owner of the item to generate reports +EMAIL_REPORTS_USER = 'admin' +EMAIL_REPORTS_SUBJECT_PREFIX = '[Report] ' + +# The webdriver to use for generating reports.
Use one of the following +# firefox +# Requires: geckodriver and firefox installations +# Limitations: can be buggy at times +# chrome: +# Requires: headless chrome +# Limitations: unable to generate screenshots of elements +EMAIL_REPORTS_WEBDRIVER = 'firefox' + +# Window size - this will impact the rendering of the data +WEBDRIVER_WINDOW = { + 'dashboard': (1600, 2000), + 'slice': (3000, 1200), +} + +# Any config options to be passed as-is to the webdriver +WEBDRIVER_CONFIGURATION = {} + +# The base URL to query for accessing the user interface +WEBDRIVER_BASEURL = 'http://0.0.0.0:8080/' + + try: if CONFIG_PATH_ENV_VAR in os.environ: # Explicitly import config module that is not in pythonpath; useful diff --git a/superset/migrations/versions/6c7537a6004a_models_for_email_reports.py b/superset/migrations/versions/6c7537a6004a_models_for_email_reports.py new file mode 100644 index 0000000000..fdfbf8c33c --- /dev/null +++ b/superset/migrations/versions/6c7537a6004a_models_for_email_reports.py @@ -0,0 +1,69 @@ +"""models for email reports + +Revision ID: 6c7537a6004a +Revises: e502db2af7be +Create Date: 2018-05-15 20:28:51.977572 + +""" + +# revision identifiers, used by Alembic. +revision = '6c7537a6004a' +down_revision = 'a61b40f9f57f' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table('dashboard_email_schedules', + sa.Column('created_on', sa.DateTime(), nullable=True), + sa.Column('changed_on', sa.DateTime(), nullable=True), + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('active', sa.Boolean(), nullable=True), + sa.Column('crontab', sa.String(length=50), nullable=True), + sa.Column('recipients', sa.Text(), nullable=True), + sa.Column('deliver_as_group', sa.Boolean(), nullable=True), + sa.Column('delivery_type', sa.Enum('attachment', 'inline', name='emaildeliverytype'), nullable=True), + sa.Column('dashboard_id', sa.Integer(), nullable=True), + sa.Column('created_by_fk', sa.Integer(), nullable=True), + sa.Column('changed_by_fk', sa.Integer(), nullable=True), + sa.Column('user_id', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ), + sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ), + sa.ForeignKeyConstraint(['dashboard_id'], ['dashboards.id'], ), + sa.ForeignKeyConstraint(['user_id'], ['ab_user.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_dashboard_email_schedules_active'), 'dashboard_email_schedules', ['active'], unique=False) + op.create_table('slice_email_schedules', + sa.Column('created_on', sa.DateTime(), nullable=True), + sa.Column('changed_on', sa.DateTime(), nullable=True), + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('active', sa.Boolean(), nullable=True), + sa.Column('crontab', sa.String(length=50), nullable=True), + sa.Column('recipients', sa.Text(), nullable=True), + sa.Column('deliver_as_group', sa.Boolean(), nullable=True), + sa.Column('delivery_type', sa.Enum('attachment', 'inline', name='emaildeliverytype'), nullable=True), + sa.Column('slice_id', sa.Integer(), nullable=True), + sa.Column('email_format', sa.Enum('visualization', 'data', name='sliceemailreportformat'), nullable=True), + sa.Column('created_by_fk', sa.Integer(), nullable=True), + sa.Column('changed_by_fk', sa.Integer(), nullable=True), 
+ sa.Column('user_id', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ), + sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ), + sa.ForeignKeyConstraint(['slice_id'], ['slices.id'], ), + sa.ForeignKeyConstraint(['user_id'], ['ab_user.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_slice_email_schedules_active'), 'slice_email_schedules', ['active'], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_slice_email_schedules_active'), table_name='slice_email_schedules') + op.drop_table('slice_email_schedules') + op.drop_index(op.f('ix_dashboard_email_schedules_active'), table_name='dashboard_email_schedules') + op.drop_table('dashboard_email_schedules') + # ### end Alembic commands ### diff --git a/superset/models/__init__.py b/superset/models/__init__.py index ed8b591125..437d194919 100644 --- a/superset/models/__init__.py +++ b/superset/models/__init__.py @@ -1,3 +1,4 @@ from . import core # noqa from . import sql_lab # noqa from . import user_attributes # noqa +from . 
import schedules # noqa diff --git a/superset/models/schedules.py b/superset/models/schedules.py new file mode 100644 index 0000000000..fe9997dd41 --- /dev/null +++ b/superset/models/schedules.py @@ -0,0 +1,93 @@ +# pylint: disable=C,R,W +"""Models for scheduled execution of jobs""" + +import enum + +from flask_appbuilder import Model +from sqlalchemy import ( + Boolean, Column, Enum, ForeignKey, Integer, String, Text, +) +from sqlalchemy.ext.declarative import declared_attr +from sqlalchemy.orm import relationship + +from superset import security_manager +from superset.models.helpers import AuditMixinNullable, ImportMixin + + +metadata = Model.metadata # pylint: disable=no-member + + +class ScheduleType(enum.Enum): + slice = 'slice' + dashboard = 'dashboard' + + +class EmailDeliveryType(enum.Enum): + attachment = 'Attachment' + inline = 'Inline' + + +class SliceEmailReportFormat(enum.Enum): + visualization = 'Visualization' + data = 'Raw data' + + +class EmailSchedule(): + + """Schedules for emailing slices / dashboards""" + + __tablename__ = 'email_schedules' + + id = Column(Integer, primary_key=True) + active = Column(Boolean, default=True, index=True) + crontab = Column(String(50)) + + @declared_attr + def user_id(self): + return Column(Integer, ForeignKey('ab_user.id')) + + @declared_attr + def user(self): + return relationship( + security_manager.user_model, + backref=self.__tablename__, + foreign_keys=[self.user_id], + ) + + recipients = Column(Text) + deliver_as_group = Column(Boolean, default=False) + delivery_type = Column(Enum(EmailDeliveryType)) + + +class DashboardEmailSchedule(Model, + AuditMixinNullable, + ImportMixin, + EmailSchedule): + __tablename__ = 'dashboard_email_schedules' + dashboard_id = Column(Integer, ForeignKey('dashboards.id')) + dashboard = relationship( + 'Dashboard', + backref='email_schedules', + foreign_keys=[dashboard_id], + ) + + +class SliceEmailSchedule(Model, + AuditMixinNullable, + ImportMixin, + EmailSchedule): + 
__tablename__ = 'slice_email_schedules' + slice_id = Column(Integer, ForeignKey('slices.id')) + slice = relationship( + 'Slice', + backref='email_schedules', + foreign_keys=[slice_id], + ) + email_format = Column(Enum(SliceEmailReportFormat)) + + +def get_scheduler_model(report_type): + if report_type == ScheduleType.dashboard.value: + return DashboardEmailSchedule + elif report_type == ScheduleType.slice.value: + return SliceEmailSchedule diff --git a/superset/sql_lab.py b/superset/sql_lab.py index 09691476cd..0c76c7cc16 100644 --- a/superset/sql_lab.py +++ b/superset/sql_lab.py @@ -14,8 +14,8 @@ from sqlalchemy.pool import NullPool from superset import app, dataframe, db, results_backend, security_manager from superset.models.sql_lab import Query from superset.sql_parse import SupersetQuery +from superset.tasks.celery_app import app as celery_app from superset.utils.core import ( - get_celery_app, json_iso_dttm_ser, now_as_float, QueryStatus, @@ -23,8 +23,7 @@ from superset.utils.core import ( ) config = app.config -celery_app = get_celery_app(config) -stats_logger = app.config.get('STATS_LOGGER') +stats_logger = config.get('STATS_LOGGER') SQLLAB_TIMEOUT = config.get('SQLLAB_ASYNC_TIME_LIMIT_SEC', 600) log_query = config.get('QUERY_LOGGER') @@ -77,7 +76,9 @@ def session_scope(nullpool): session.close() -@celery_app.task(bind=True, soft_time_limit=SQLLAB_TIMEOUT) +@celery_app.task(name='sql_lab.get_sql_results', + bind=True, + soft_time_limit=SQLLAB_TIMEOUT) def get_sql_results( ctask, query_id, rendered_query, return_results=True, store_results=False, user_name=None, start_time=None): diff --git a/superset/tasks/__init__.py b/superset/tasks/__init__.py new file mode 100644 index 0000000000..d7259b9cb5 --- /dev/null +++ b/superset/tasks/__init__.py @@ -0,0 +1,2 @@ +# -*- coding: utf-8 -*- +from . 
import schedules # noqa diff --git a/superset/tasks/celery_app.py b/superset/tasks/celery_app.py new file mode 100644 index 0000000000..028ce7df26 --- /dev/null +++ b/superset/tasks/celery_app.py @@ -0,0 +1,11 @@ +# pylint: disable=C,R,W + +"""Utility functions used across Superset""" + +# Superset framework imports +from superset import app +from superset.utils.core import get_celery_app + +# Globals +config = app.config +app = get_celery_app(config) diff --git a/superset/tasks/schedules.py b/superset/tasks/schedules.py new file mode 100644 index 0000000000..a8f1eb13b6 --- /dev/null +++ b/superset/tasks/schedules.py @@ -0,0 +1,441 @@ +# pylint: disable=C,R,W + +"""Utility functions used across Superset""" + +from collections import namedtuple +from datetime import datetime, timedelta +from email.utils import make_msgid, parseaddr +import logging +import time + + +import croniter +from dateutil.tz import tzlocal +from flask import render_template, Response, session, url_for +from flask_babel import gettext as __ +from flask_login import login_user +import requests +from retry.api import retry_call +from selenium.common.exceptions import WebDriverException +from selenium.webdriver import chrome, firefox +import simplejson as json +from six.moves import urllib +from werkzeug.utils import parse_cookie + +# Superset framework imports +from superset import app, db, security_manager +from superset.models.schedules import ( + EmailDeliveryType, + get_scheduler_model, + ScheduleType, + SliceEmailReportFormat, +) +from superset.tasks.celery_app import app as celery_app +from superset.utils.core import ( + get_email_address_list, + send_email_smtp, +) + +# Globals +config = app.config +logging.getLogger('tasks.email_reports').setLevel(logging.INFO) + +# Time in seconds, we will wait for the page to load and render +PAGE_RENDER_WAIT = 30 + + +EmailContent = namedtuple('EmailContent', ['body', 'data', 'images']) + + +def _get_recipients(schedule): + bcc = 
config.get('EMAIL_REPORT_BCC_ADDRESS', None) + + if schedule.deliver_as_group: + to = schedule.recipients + yield (to, bcc) + else: + for to in get_email_address_list(schedule.recipients): + yield (to, bcc) + + +def _deliver_email(schedule, subject, email): + for (to, bcc) in _get_recipients(schedule): + send_email_smtp( + to, subject, email.body, config, + data=email.data, + images=email.images, + bcc=bcc, + mime_subtype='related', + dryrun=config.get('SCHEDULED_EMAIL_DEBUG_MODE'), + ) + + +def _generate_mail_content(schedule, screenshot, name, url): + if schedule.delivery_type == EmailDeliveryType.attachment: + images = None + data = { + 'screenshot.png': screenshot, + } + body = __( + 'Explore in Superset

', + name=name, + url=url, + ) + elif schedule.delivery_type == EmailDeliveryType.inline: + # Get the domain from the 'From' address .. + # and make a message id without the < > in the ends + domain = parseaddr(config.get('SMTP_MAIL_FROM'))[1].split('@')[1] + msgid = make_msgid(domain)[1:-1] + + images = { + msgid: screenshot, + } + data = None + body = __( + """ + Explore in Superset

+ + """, + name=name, url=url, msgid=msgid, + ) + + return EmailContent(body, data, images) + + +def _get_auth_cookies(): + # Login with the user specified to get the reports + with app.test_request_context(): + user = security_manager.find_user(config.get('EMAIL_REPORTS_USER')) + login_user(user) + + # A mock response object to get the cookie information from + response = Response() + app.session_interface.save_session(app, session, response) + + cookies = [] + + # Set the cookies in the driver + for name, value in response.headers: + if name.lower() == 'set-cookie': + cookie = parse_cookie(value) + cookies.append(cookie['session']) + + return cookies + + +def _get_url_path(view, **kwargs): + with app.test_request_context(): + return urllib.parse.urljoin( + str(config.get('WEBDRIVER_BASEURL')), + url_for(view, **kwargs), + ) + + +def create_webdriver(): + # Create a webdriver for use in fetching reports + if config.get('EMAIL_REPORTS_WEBDRIVER') == 'firefox': + driver_class = firefox.webdriver.WebDriver + options = firefox.options.Options() + elif config.get('EMAIL_REPORTS_WEBDRIVER') == 'chrome': + driver_class = chrome.webdriver.WebDriver + options = chrome.options.Options() + + options.add_argument('--headless') + + # Prepare args for the webdriver init + kwargs = dict( + options=options, + ) + kwargs.update(config.get('WEBDRIVER_CONFIGURATION')) + + # Initialize the driver + driver = driver_class(**kwargs) + + # Some webdrivers need an initial hit to the welcome URL + # before we set the cookie + welcome_url = _get_url_path('Superset.welcome') + + # Hit the welcome URL and check if we were asked to login + driver.get(welcome_url) + elements = driver.find_elements_by_id('loginbox') + + # This indicates that we were not prompted for a login box. 
+ if not elements: + return driver + + # Set the cookies in the driver + for cookie in _get_auth_cookies(): + info = dict(name='session', value=cookie) + driver.add_cookie(info) + + return driver + + +def destroy_webdriver(driver): + """ + Destroy a driver + """ + + # This is some very flaky code in selenium. Hence the retries + # and catch-all exceptions + try: + retry_call(driver.close, tries=2) + except Exception: + pass + try: + driver.quit() + except Exception: + pass + + +def deliver_dashboard(schedule): + """ + Given a schedule, delivery the dashboard as an email report + """ + dashboard = schedule.dashboard + + dashboard_url = _get_url_path( + 'Superset.dashboard', + dashboard_id=dashboard.id, + ) + + # Create a driver, fetch the page, wait for the page to render + driver = create_webdriver() + window = config.get('WEBDRIVER_WINDOW')['dashboard'] + driver.set_window_size(*window) + driver.get(dashboard_url) + time.sleep(PAGE_RENDER_WAIT) + + # Set up a function to retry once for the element. + # This is buggy in certain selenium versions with firefox driver + get_element = getattr(driver, 'find_element_by_class_name') + element = retry_call( + get_element, + fargs=['grid-container'], + tries=2, + delay=PAGE_RENDER_WAIT, + ) + + try: + screenshot = element.screenshot_as_png + except WebDriverException: + # Some webdrivers do not support screenshots for elements. + # In such cases, take a screenshot of the entire page. 
+ screenshot = driver.screenshot() # pylint: disable=no-member + finally: + destroy_webdriver(driver) + + # Generate the email body and attachments + email = _generate_mail_content( + schedule, + screenshot, + dashboard.dashboard_title, + dashboard_url, + ) + + subject = __( + '%(prefix)s %(title)s', + prefix=config.get('EMAIL_REPORTS_SUBJECT_PREFIX'), + title=dashboard.dashboard_title, + ) + + _deliver_email(schedule, subject, email) + + +def _get_slice_data(schedule): + slc = schedule.slice + + slice_url = _get_url_path( + 'Superset.explore_json', + csv='true', + form_data=json.dumps({'slice_id': slc.id}), + ) + + # URL to include in the email + url = _get_url_path( + 'Superset.slice', + slice_id=slc.id, + ) + + cookies = {} + for cookie in _get_auth_cookies(): + cookies['session'] = cookie + + response = requests.get(slice_url, cookies=cookies) + response.raise_for_status() + + # TODO: Move to the csv module + rows = [r.split(b',') for r in response.content.splitlines()] + + if schedule.delivery_type == EmailDeliveryType.inline: + data = None + + # Parse the csv file and generate HTML + columns = rows.pop(0) + with app.app_context(): + body = render_template( + 'superset/reports/slice_data.html', + columns=columns, + rows=rows, + name=slc.slice_name, + link=url, + ) + + elif schedule.delivery_type == EmailDeliveryType.attachment: + data = { + __('%(name)s.csv', name=slc.slice_name): response.content, + } + body = __( + 'Explore in Superset

', + name=slc.slice_name, + url=url, + ) + + return EmailContent(body, data, None) + + +def _get_slice_visualization(schedule): + slc = schedule.slice + + # Create a driver, fetch the page, wait for the page to render + driver = create_webdriver() + window = config.get('WEBDRIVER_WINDOW')['slice'] + driver.set_window_size(*window) + + slice_url = _get_url_path( + 'Superset.slice', + slice_id=slc.id, + ) + + driver.get(slice_url) + time.sleep(PAGE_RENDER_WAIT) + + # Set up a function to retry once for the element. + # This is buggy in certain selenium versions with firefox driver + element = retry_call( + driver.find_element_by_class_name, + fargs=['chart-container'], + tries=2, + delay=PAGE_RENDER_WAIT, + ) + + try: + screenshot = element.screenshot_as_png + except WebDriverException: + # Some webdrivers do not support screenshots for elements. + # In such cases, take a screenshot of the entire page. + screenshot = driver.screenshot() # pylint: disable=no-member + finally: + destroy_webdriver(driver) + + # Generate the email body and attachments + return _generate_mail_content( + schedule, + screenshot, + slc.slice_name, + slice_url, + ) + + +def deliver_slice(schedule): + """ + Given a schedule, delivery the slice as an email report + """ + if schedule.email_format == SliceEmailReportFormat.data: + email = _get_slice_data(schedule) + elif schedule.email_format == SliceEmailReportFormat.visualization: + email = _get_slice_visualization(schedule) + else: + raise RuntimeError('Unknown email report format') + + subject = __( + '%(prefix)s %(title)s', + prefix=config.get('EMAIL_REPORTS_SUBJECT_PREFIX'), + title=schedule.slice.slice_name, + ) + + _deliver_email(schedule, subject, email) + + +@celery_app.task(name='email_reports.send', bind=True, soft_time_limit=300) +def schedule_email_report(task, report_type, schedule_id, recipients=None): + model_cls = get_scheduler_model(report_type) + schedule = db.session.query(model_cls).get(schedule_id) + + # The user may have 
disabled the schedule. If so, ignore this + if not schedule or not schedule.active: + logging.info('Ignoring deactivated schedule') + return + + # TODO: Detach the schedule object from the db session + if recipients is not None: + schedule.id = schedule_id + schedule.recipients = recipients + + if report_type == ScheduleType.dashboard.value: + deliver_dashboard(schedule) + elif report_type == ScheduleType.slice.value: + deliver_slice(schedule) + else: + raise RuntimeError('Unknown report type') + + +def next_schedules(crontab, start_at, stop_at, resolution=0): + crons = croniter.croniter(crontab, start_at - timedelta(seconds=1)) + previous = start_at - timedelta(days=1) + + for eta in crons.all_next(datetime): + # Do not cross the time boundary + if eta >= stop_at: + break + + if eta < start_at: + continue + + # Do not allow very frequent tasks + if eta - previous < timedelta(seconds=resolution): + continue + + yield eta + previous = eta + + +def schedule_window(report_type, start_at, stop_at, resolution): + """ + Find all active schedules and schedule celery tasks for + each of them with a specific ETA (determined by parsing + the cron schedule for the schedule) + """ + model_cls = get_scheduler_model(report_type) + dbsession = db.create_scoped_session() + schedules = dbsession.query(model_cls).filter(model_cls.active.is_(True)) + + for schedule in schedules: + args = ( + report_type, + schedule.id, + ) + + # Schedule the job for the specified time window + for eta in next_schedules(schedule.crontab, + start_at, + stop_at, + resolution=resolution): + schedule_email_report.apply_async(args, eta=eta) + + +@celery_app.task(name='email_reports.schedule_hourly') +def schedule_hourly(): + """ Celery beat job meant to be invoked hourly """ + + if not config.get('ENABLE_SCHEDULED_EMAIL_REPORTS'): + logging.info('Scheduled email reports not enabled in config') + return + + resolution = config.get('EMAIL_REPORTS_CRON_RESOLUTION', 0) * 60 + + # Get the top of the hour + 
start_at = datetime.now(tzlocal()).replace(microsecond=0, second=0, minute=0) + stop_at = start_at + timedelta(seconds=3600) + schedule_window(ScheduleType.dashboard.value, start_at, stop_at, resolution) + schedule_window(ScheduleType.slice.value, start_at, stop_at, resolution) diff --git a/superset/templates/superset/reports/slice_data.html b/superset/templates/superset/reports/slice_data.html new file mode 100644 index 0000000000..c5bb3d1035 --- /dev/null +++ b/superset/templates/superset/reports/slice_data.html @@ -0,0 +1,31 @@ + + + + +

+ Explore this data in Superset +

+ + + + + + + {%- for column in columns %} + + {%- endfor %} + + + + {%- for row in rows %} + + {%- for column in row %} + + {%- endfor %} + + {%- endfor %} + +
{{ name }}
{{ column.decode('utf-8') | replace('_', ' ') | title }}
{{ column.decode('utf-8') }}
+ + diff --git a/superset/utils/core.py b/superset/utils/core.py index bd23d0776c..2a002b8c16 100644 --- a/superset/utils/core.py +++ b/superset/utils/core.py @@ -4,6 +4,7 @@ from builtins import object from datetime import date, datetime, time, timedelta import decimal from email.mime.application import MIMEApplication +from email.mime.image import MIMEImage from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.utils import formatdate @@ -582,21 +583,23 @@ def notify_user_about_perm_udate( dryrun=not config.get('EMAIL_NOTIFICATIONS')) -def send_email_smtp(to, subject, html_content, config, files=None, - dryrun=False, cc=None, bcc=None, mime_subtype='mixed'): +def send_email_smtp(to, subject, html_content, config, + files=None, data=None, images=None, dryrun=False, + cc=None, bcc=None, mime_subtype='mixed'): """ Send an email with html content, eg: send_email_smtp( 'test@example.com', 'foo', 'Foo bar',['/dev/null'], dryrun=True) """ smtp_mail_from = config.get('SMTP_MAIL_FROM') - to = get_email_address_list(to) msg = MIMEMultipart(mime_subtype) msg['Subject'] = subject msg['From'] = smtp_mail_from msg['To'] = ', '.join(to) + msg.preamble = 'This is a multi-part message in MIME format.' 
+ recipients = to if cc: cc = get_email_address_list(cc) @@ -612,6 +615,7 @@ def send_email_smtp(to, subject, html_content, config, files=None, mime_text = MIMEText(html_content, 'html') msg.attach(mime_text) + # Attach files by reading them from disk for fname in files or []: basename = os.path.basename(fname) with open(fname, 'rb') as f: @@ -621,6 +625,23 @@ def send_email_smtp(to, subject, html_content, config, files=None, Content_Disposition="attachment; filename='%s'" % basename, Name=basename)) + # Attach any files passed directly + for name, body in (data or {}).items(): + msg.attach( + MIMEApplication( + body, + Content_Disposition="attachment; filename='%s'" % name, + Name=name, + )) + + # Attach any inline images, which may be required for display in + # HTML content (inline) + for msgid, body in (images or {}).items(): + image = MIMEImage(body) + image.add_header('Content-ID', '<%s>' % msgid) + image.add_header('Content-Disposition', 'inline') + msg.attach(image) + send_MIME_email(smtp_mail_from, recipients, msg, config, dryrun=dryrun) @@ -639,7 +660,7 @@ def send_MIME_email(e_from, e_to, mime_msg, config, dryrun=False): s.starttls() if SMTP_USER and SMTP_PASSWORD: s.login(SMTP_USER, SMTP_PASSWORD) - logging.info('Sent an alert email to ' + str(e_to)) + logging.info('Sent an email to ' + str(e_to)) s.sendmail(e_from, e_to, mime_msg.as_string()) s.quit() else: @@ -651,11 +672,13 @@ def get_email_address_list(address_string): if isinstance(address_string, basestring): if ',' in address_string: address_string = address_string.split(',') + elif '\n' in address_string: + address_string = address_string.split('\n') elif ';' in address_string: address_string = address_string.split(';') else: address_string = [address_string] - return address_string + return [x.strip() for x in address_string if x.strip()] def choicify(values): diff --git a/superset/views/__init__.py b/superset/views/__init__.py index eed1ff2326..6bd52d9c14 100644 --- 
a/superset/views/__init__.py +++ b/superset/views/__init__.py @@ -4,3 +4,4 @@ from . import core # noqa from . import sql_lab # noqa from . import annotations # noqa from . import datasource # noqa +from . import schedules # noqa diff --git a/superset/views/core.py b/superset/views/core.py index c565e3a52e..3c0ffa026b 100755 --- a/superset/views/core.py +++ b/superset/views/core.py @@ -1635,7 +1635,6 @@ class Superset(BaseSupersetView): @staticmethod def _set_dash_metadata(dashboard, data): positions = data['positions'] - # find slices in the position data slice_ids = [] slice_id_to_name = {} diff --git a/superset/views/schedules.py b/superset/views/schedules.py new file mode 100644 index 0000000000..3e576214a3 --- /dev/null +++ b/superset/views/schedules.py @@ -0,0 +1,279 @@ +# pylint: disable=C,R,W + +import enum + +from croniter import croniter +from flask import flash, g +from flask_appbuilder import expose +from flask_appbuilder.models.sqla.interface import SQLAInterface +from flask_appbuilder.security.decorators import has_access +from flask_babel import gettext as __ +from flask_babel import lazy_gettext as _ +import simplejson as json +from wtforms import BooleanField, StringField + +from superset import app, appbuilder, db, security_manager +from superset.exceptions import SupersetException +from superset.models.core import Dashboard, Slice +from superset.models.schedules import ( + DashboardEmailSchedule, + ScheduleType, + SliceEmailSchedule, +) +from superset.tasks.schedules import schedule_email_report +from superset.utils.core import ( + get_email_address_list, + json_iso_dttm_ser, +) +from superset.views.core import json_success +from .base import DeleteMixin, SupersetModelView + + +class EmailScheduleView(SupersetModelView, DeleteMixin): + _extra_data = { + 'test_email': False, + 'test_email_recipients': None, + } + schedule_type = None + schedule_type_model = None + + page_size = 20 + + add_exclude_columns = [ + 'user', + 'created_on', + 
'changed_on', + 'created_by', + 'changed_by', + ] + + edit_exclude_columns = add_exclude_columns + + description_columns = { + 'deliver_as_group': 'If enabled, send a single email to all ' + 'recipients (in email/To: field)', + 'crontab': 'Unix style crontab schedule to deliver emails. ' + 'Changes to schedules reflect in one hour.', + 'delivery_type': 'Indicates how the rendered content is delivered', + } + + add_form_extra_fields = { + 'test_email': BooleanField( + 'Send Test Email', + default=False, + description='If enabled, we send a test mail on create / update', + ), + 'test_email_recipients': StringField( + 'Test Email Recipients', + default=None, + description='List of recipients to send test email to. ' + 'If empty, we send it to the original recipients', + ), + } + + edit_form_extra_fields = add_form_extra_fields + + def process_form(self, form, is_created): + recipients = form.test_email_recipients.data.strip() or None + self._extra_data['test_email'] = form.test_email.data + self._extra_data['test_email_recipients'] = recipients + + def pre_add(self, obj): + try: + recipients = get_email_address_list(obj.recipients) + obj.recipients = ', '.join(recipients) + except Exception: + raise SupersetException('Invalid email list') + + obj.user = obj.user or g.user + if not croniter.is_valid(obj.crontab): + raise SupersetException('Invalid crontab format') + + def pre_update(self, obj): + self.pre_add(obj) + + def post_add(self, obj): + # Schedule a test mail if the user requested for it. 
+ if self._extra_data['test_email']: + recipients = self._extra_data['test_email_recipients'] + args = (self.schedule_type, obj.id) + kwargs = dict(recipients=recipients) + schedule_email_report.apply_async(args=args, kwargs=kwargs) + + # Notify the user that schedule changes will be activate only in the + # next hour + if obj.active: + flash('Schedule changes will get applied in one hour', 'warning') + + def post_update(self, obj): + self.post_add(obj) + + @has_access + @expose('/fetch//', methods=['GET']) + def fetch_schedules(self, item_id): + + query = db.session.query(self.datamodel.obj) + query = query.join(self.schedule_type_model).filter( + self.schedule_type_model.id == item_id) + + schedules = [] + for schedule in query.all(): + info = {'schedule': schedule.id} + + for col in self.list_columns + self.add_exclude_columns: + info[col] = getattr(schedule, col) + + if isinstance(info[col], enum.Enum): + info[col] = info[col].name + elif isinstance(info[col], security_manager.user_model): + info[col] = info[col].username + + info['user'] = schedule.user.username + info[self.schedule_type] = getattr(schedule, self.schedule_type).id + schedules.append(info) + + return json_success(json.dumps(schedules, default=json_iso_dttm_ser)) + + +class DashboardEmailScheduleView(EmailScheduleView): + schedule_type = ScheduleType.dashboard.name + schedule_type_model = Dashboard + + add_title = _('Schedule Email Reports for Dashboards') + edit_title = add_title + list_title = _('Manage Email Reports for Dashboards') + + datamodel = SQLAInterface(DashboardEmailSchedule) + order_columns = ['user', 'dashboard', 'created_on'] + + list_columns = [ + 'dashboard', + 'active', + 'crontab', + 'user', + 'deliver_as_group', + 'delivery_type', + ] + + add_columns = [ + 'dashboard', + 'active', + 'crontab', + 'recipients', + 'deliver_as_group', + 'delivery_type', + 'test_email', + 'test_email_recipients', + ] + + edit_columns = add_columns + + search_columns = [ + 'dashboard', + 'active', 
+ 'user', + 'deliver_as_group', + 'delivery_type', + ] + + label_columns = { + 'dashboard': _('Dashboard'), + 'created_on': _('Created On'), + 'changed_on': _('Changed On'), + 'user': _('User'), + 'active': _('Active'), + 'crontab': _('Crontab'), + 'recipients': _('Recipients'), + 'deliver_as_group': _('Deliver As Group'), + 'delivery_type': _('Delivery Type'), + } + + def pre_add(self, obj): + if obj.dashboard is None: + raise SupersetException('Dashboard is mandatory') + super(DashboardEmailScheduleView, self).pre_add(obj) + + +class SliceEmailScheduleView(EmailScheduleView): + schedule_type = ScheduleType.slice.name + schedule_type_model = Slice + add_title = _('Schedule Email Reports for Charts') + edit_title = add_title + list_title = _('Manage Email Reports for Charts') + + datamodel = SQLAInterface(SliceEmailSchedule) + order_columns = ['user', 'slice', 'created_on'] + list_columns = [ + 'slice', + 'active', + 'crontab', + 'user', + 'deliver_as_group', + 'delivery_type', + 'email_format', + ] + + add_columns = [ + 'slice', + 'active', + 'crontab', + 'recipients', + 'deliver_as_group', + 'delivery_type', + 'email_format', + 'test_email', + 'test_email_recipients', + ] + + edit_columns = add_columns + + search_columns = [ + 'slice', + 'active', + 'user', + 'deliver_as_group', + 'delivery_type', + 'email_format', + ] + + label_columns = { + 'slice': _('Chart'), + 'created_on': _('Created On'), + 'changed_on': _('Changed On'), + 'user': _('User'), + 'active': _('Active'), + 'crontab': _('Crontab'), + 'recipients': _('Recipients'), + 'deliver_as_group': _('Deliver As Group'), + 'delivery_type': _('Delivery Type'), + 'email_format': _('Email Format'), + } + + def pre_add(self, obj): + if obj.slice is None: + raise SupersetException('Slice is mandatory') + super(SliceEmailScheduleView, self).pre_add(obj) + + +def _register_schedule_menus(): + appbuilder.add_separator('Manage') + + appbuilder.add_view( + DashboardEmailScheduleView, + 'Dashboard Email Schedules', + 
label=__('Dashboard Emails'), + category='Manage', + category_label=__('Manage'), + icon='fa-search') + + appbuilder.add_view( + SliceEmailScheduleView, + 'Chart Emails', + label=__('Chart Email Schedules'), + category='Manage', + category_label=__('Manage'), + icon='fa-search') + + +if app.config.get('ENABLE_SCHEDULED_EMAIL_REPORTS'): + _register_schedule_menus() diff --git a/tests/email_tests.py b/tests/email_tests.py index 559372b2af..229eb8be09 100644 --- a/tests/email_tests.py +++ b/tests/email_tests.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- """Unit tests for email service in Superset""" from email.mime.application import MIMEApplication +from email.mime.image import MIMEImage from email.mime.multipart import MIMEMultipart import logging import tempfile @@ -10,6 +11,7 @@ import mock from superset import app from superset.utils import core as utils +from .utils import read_fixture send_email_test = mock.Mock() @@ -37,6 +39,39 @@ class EmailSmtpTest(unittest.TestCase): mimeapp = MIMEApplication('attachment') assert msg.get_payload()[-1].get_payload() == mimeapp.get_payload() + @mock.patch('superset.utils.core.send_MIME_email') + def test_send_smtp_data(self, mock_send_mime): + utils.send_email_smtp( + 'to', 'subject', 'content', app.config, data={'1.txt': b'data'}) + assert mock_send_mime.called + call_args = mock_send_mime.call_args[0] + logging.debug(call_args) + assert call_args[0] == app.config.get('SMTP_MAIL_FROM') + assert call_args[1] == ['to'] + msg = call_args[2] + assert msg['Subject'] == 'subject' + assert msg['From'] == app.config.get('SMTP_MAIL_FROM') + assert len(msg.get_payload()) == 2 + mimeapp = MIMEApplication('data') + assert msg.get_payload()[-1].get_payload() == mimeapp.get_payload() + + @mock.patch('superset.utils.core.send_MIME_email') + def test_send_smtp_inline_images(self, mock_send_mime): + image = read_fixture('sample.png') + utils.send_email_smtp( + 'to', 'subject', 'content', app.config, images=dict(blah=image)) + assert 
mock_send_mime.called + call_args = mock_send_mime.call_args[0] + logging.debug(call_args) + assert call_args[0] == app.config.get('SMTP_MAIL_FROM') + assert call_args[1] == ['to'] + msg = call_args[2] + assert msg['Subject'] == 'subject' + assert msg['From'] == app.config.get('SMTP_MAIL_FROM') + assert len(msg.get_payload()) == 2 + mimeapp = MIMEImage(image) + assert msg.get_payload()[-1].get_payload() == mimeapp.get_payload() + @mock.patch('superset.utils.core.send_MIME_email') def test_send_bcc_smtp(self, mock_send_mime): attachment = tempfile.NamedTemporaryFile() diff --git a/tests/fixtures/sample.png b/tests/fixtures/sample.png new file mode 100644 index 0000000000000000000000000000000000000000..a3522415108e9f8a12217512738cf384d5ff5644 GIT binary patch literal 4481 zcmZWsbySpX(_awjPC;S?B!wj;M7ojgrOTyUmIYS2W9jY|kdjtd>5!6K2}zNZ?p6?9 ze4gif-{+j~I%n>gYi52k*UX&zkBQXMP$I&o!v_EWL@LViI(MAPOmuMZct|lg3q0N`)8Mw@E8{$Tgh%=! zH1v(N5;c`60aqb-jEeOBH0I|*W_JkL?r$p@2*m8kjFwk2W^?;_+lgQD)yD8MyBoLt zrjrrC8)*ON$cUmo;4|48K~acrW~9o1+Bi!ffKn@njt}A{-z2H6y$l%KJUKYt$F4`Z zepMvjvDmn+D0B5y!PZr#Bs^DsTVvVrBWN2`R&kKhgwKg2 zOf$%`fvb^uyi5H`(v?^)A33<$)b_95@O6W%Clxv3(%{0#OQ$S8K4aSuhxG`>lTXlR z&@JOy8a@MmMQH6Xjp|w?dAHItxYrGa0P%x#_K0M+ZcC4!ibH;?t{;ll&S!+p-L>i~ zM`tj!WroWRQhjGE--}ts3(OX1A2)pev98oR*tpDri{szP#254+PLGe^C8qzh^aE3%+Zqc)GQnq<92cL(^4|dhh#We z%=#e@<1%MK_+bP*_ls&H(MlJE17*`!HUGpzoOLxm+S5<=LC3o~9cW^au*zJx#W#xM z@NX{I)kM(4NInXtZx7eku^JWyDdSzP42GDdlWT7znVQT8bOuyLnP2aYT*eyxe$ac+ zLg%t9xT~ZJeddn)JUq9D*U||rXlcz}OI=CvoUO|y_fUQguc$&nwF%NCdaeDi#kA<2ubT;@FU^~?v*es;Ii`5q|mp(tq1@a z%SmFfGObP=RMDOJ_2-(;+H0(?7+W$h7Lr8&HKAj?UM;S7aodF@n^==nXENPizzDId zxb=cCd;VtpD*1{rKYI#q^0rWOC7&^xO;T(=g_s~(OW_Re^>7920Wcj~54CkM_`X4` zkY&}Oghj7+9mXz;M$eWg#mAsF+L-=jdT|nqh?*Xe)do)rf4Yt+sUDo88Z0`Qa7zU^ zdnhS6-d5;rD7ox?E0Sg0Wx{LNFWiF&dFBv%MMTgdgq%NFG`=yZF?B{6%6?ADL=mpc znnXPiwWboqTBq{!E%`Xxxa2tAIF%05);*)dN=-`W$CNKOzPue^KPNsk+s&Ww+_K zE&p2ZjPy+HOyKPIw%K+Fog*EE(2CHyy`BU8#3sa&P*)L`CA%)mCBY@+$RTH6G~cDw zg}YBErbS@CZQWBnjZO1|-UscI=5= 
zPR^!x^1s}Mly{w4*$Q%JmS%=vi(z+RTZiOgn_+h&ZILcW1!OGpB9=q3TG2sqCsr&5 zJN7J={UKVUTBO~k&~n8w$?E;sTF!uvuHXdL1f*o71U>R}M0OZ|WP4;kb7Ht{UnN9e`}#{dOTMoc z@N^4*Lw!cgp8vpF#T%fQ(hwC{uEl07YJH!RW zMbn$mTd3D(ea>!lTH#@Do_JXb(O}T9$T7I8cP2 zyf#^zT0b9Kn3Y)k{pQMoQJB$)(MKXrQy{x1yWVHqMpI4`wV{DF2bCB`O%vCcJKu9g zc{E(Gq>)7r&2@XP+bkzUtI8jNC-RWw&Lq<;-MhLi31=kiDT!#6FZTT3$lu+!zhw z+*a6e8*VbLjJz9b1Yx8pjhEEm6E4v9N6sEjZQXYz;<2dsJSIP0B%`39t@CgTcyZ=y zoyY3%YL5%U(csa<%FfCn`z3n{`zf1-vT1@Rr^@rl{K+J>7jc^3lf|D3S#BfC{U&ls zQq@}c7ofj#wrFha(Vz2Npe4s@=fiTGp(&v$ZJhbDiMx*HT8_%wX_GQ>Om-|A_tSgL z(0dzR$Aezzaq)@4iM&#uk{ZDXoagf$*(zg|0guNeQ>STItLV1TD4j9APQJ7XGL+Hp z3ZFgtKr*78%aSqiY}$1{stJvMk02m39KAZcli|!4FmN4C;7h%08u3KUY^LtBz4lRI zd#_CNt8}fjmpnh)K3A8uZtjhT7FYE)_eMuE-BOA!#6#xOSBHKMwHSz2 z&(yEH;{14OooQyIFuReOlS&hSx@|h%NYrj0aubW?P5R@G5Ok9{d(sA8;A^8fI=s$}F%)pkZ--PeB3{^
LoELSZ4djIq)k$0oIWQaH2qoxlH1DtrtTBeP3O(NRLRZQ1@lGkq4Yrhy8xJ%!_D!bMP-K=4h7DY zIi5vqtNBmG9rcm%BccDpAr+7t5{ep z+~wYX#IQ#vFZ^Vc4c)k6K#@-_T##RL++yi3IT1LvXy@Op33za2Be%P~;qf2BP|;yM z1OPCy9rVBmu)3O@aeyS|1$go6bJoz{=drkyH|f{@0ulvFAn-|<0bJo z56HXkWDuc>#(G5-wa?hxJ)$3Of|#u&=*-MPDCq^row=qVFHlL=s${Dkt!@LM_H zuOj^dWnAKL;VmC`>N35W8CFIg@pEMJ!fgz{_n2cS;6b^&P0`DIaleNvzSpunzn4+isB>pu*IF&{t0A%+9#W`KLQJMg z{$XQRQtcM1-ofDUe9kX7n5*wiT+Ry=jLjZ}uP;}JE}REr2iFJ>JP8cA*&e#cJkA4d z>gjCjZG7LhTI!QGgc_#)l1*_WdOE$x-E%?xlP@49@b*axx3-(n>WnJL!cX@|P}bDr zvfi$BkE?4|h1p@4FPu?wK-k?bmN5Jh&W025HiHH+`c%<6@yZjIkL=U^22JKOorGRq z@JQ7`zbdK8oFg+o&({$sr=dmmXjm}Gxnkvtt&7y=ji-9At449i8rh3B*Y8+8Jxup_ zlRK{jB6%23G^p@_n7!fA;)zfhtH`YMnKYXtv0~h(m6I$S^&Qt zU7^lQ9s89vT`n2HEEX9geeH@d3urR=mi7e>U^WQ2QauI|vm47i zeI#~?Z8Qt=7VYy;)SuK!^_U_{|K5LRJKM@xH4Wr-&7a9s6)wGKL+S-7h5?$$}csH zs1d-@6)eKS@Ot9>-iM;q^Oia!I#;gdTr}TPTu{8u-!mf?rj`n3Rev%KayTEeHq?Ht zpfEtDNr-;is+_rZzqnbC-Xv8aou)cs%6h&bWw`jep2{`}Zwj5j-cTRkpC3&{K|{V; H)-w1%S{FaS literal 0 HcmV?d00001 diff --git a/tests/fixtures/trends.csv b/tests/fixtures/trends.csv new file mode 100644 index 0000000000..1e347d9150 --- /dev/null +++ b/tests/fixtures/trends.csv @@ -0,0 +1,3 @@ +t1,t2,t3__sum +c11,c12,c13 +c21,c22,c23 diff --git a/tests/schedules_test.py b/tests/schedules_test.py new file mode 100644 index 0000000000..9ade8ebffd --- /dev/null +++ b/tests/schedules_test.py @@ -0,0 +1,368 @@ +from datetime import datetime, timedelta +import unittest + +from flask_babel import gettext as __ +from mock import Mock, patch, PropertyMock +from selenium.common.exceptions import WebDriverException + +from superset import app, db +from superset.models.core import Dashboard, Slice +from superset.models.schedules import ( + DashboardEmailSchedule, + EmailDeliveryType, + SliceEmailReportFormat, + SliceEmailSchedule, +) +from superset.tasks.schedules import ( + create_webdriver, + deliver_dashboard, + deliver_slice, + next_schedules, +) +from .utils import read_fixture + + +class 
SchedulesTestCase(unittest.TestCase): + + RECIPIENTS = 'recipient1@superset.com, recipient2@superset.com' + BCC = 'bcc@superset.com' + CSV = read_fixture('trends.csv') + + @classmethod + def setUpClass(cls): + cls.common_data = dict( + active=True, + crontab='* * * * *', + recipients=cls.RECIPIENTS, + deliver_as_group=True, + delivery_type=EmailDeliveryType.inline, + ) + + # Pick up a random slice and dashboard + slce = db.session.query(Slice).all()[0] + dashboard = db.session.query(Dashboard).all()[0] + + dashboard_schedule = DashboardEmailSchedule(**cls.common_data) + dashboard_schedule.dashboard_id = dashboard.id + dashboard_schedule.user_id = 1 + db.session.add(dashboard_schedule) + + slice_schedule = SliceEmailSchedule(**cls.common_data) + slice_schedule.slice_id = slce.id + slice_schedule.user_id = 1 + slice_schedule.email_format = SliceEmailReportFormat.data + + db.session.add(slice_schedule) + db.session.commit() + + cls.slice_schedule = slice_schedule.id + cls.dashboard_schedule = dashboard_schedule.id + + @classmethod + def tearDownClass(cls): + db.session.query(SliceEmailSchedule).filter_by(id=cls.slice_schedule).delete() + db.session.query(DashboardEmailSchedule).filter_by( + id=cls.dashboard_schedule).delete() + db.session.commit() + + def test_crontab_scheduler(self): + crontab = '* * * * *' + + start_at = datetime.now().replace(microsecond=0, second=0, minute=0) + stop_at = start_at + timedelta(seconds=3600) + + # Fire off the task every minute + schedules = list(next_schedules(crontab, start_at, stop_at, resolution=0)) + + self.assertEqual(schedules[0], start_at) + self.assertEqual(schedules[-1], stop_at - timedelta(seconds=60)) + self.assertEqual(len(schedules), 60) + + # Fire off the task every 10 minutes, controlled via resolution + schedules = list(next_schedules(crontab, start_at, stop_at, resolution=10 * 60)) + + self.assertEqual(schedules[0], start_at) + self.assertEqual(schedules[-1], stop_at - timedelta(seconds=10 * 60)) + 
self.assertEqual(len(schedules), 6) + + # Fire off the task every 12 minutes, controlled via resolution + schedules = list(next_schedules(crontab, start_at, stop_at, resolution=12 * 60)) + + self.assertEqual(schedules[0], start_at) + self.assertEqual(schedules[-1], stop_at - timedelta(seconds=12 * 60)) + self.assertEqual(len(schedules), 5) + + def test_wider_schedules(self): + crontab = '*/15 2,10 * * *' + + for hour in range(0, 24): + start_at = datetime.now().replace( + microsecond=0, second=0, minute=0, hour=hour) + stop_at = start_at + timedelta(seconds=3600) + schedules = list(next_schedules(crontab, start_at, stop_at, resolution=0)) + + if hour in (2, 10): + self.assertEqual(len(schedules), 4) + else: + self.assertEqual(len(schedules), 0) + + def test_complex_schedule(self): + # Run the job on every Friday of March and May + # On these days, run the job at + # 5:10 pm + # 5:11 pm + # 5:12 pm + # 5:13 pm + # 5:14 pm + # 5:15 pm + # 5:25 pm + # 5:28 pm + # 5:31 pm + # 5:34 pm + # 5:37 pm + # 5:40 pm + crontab = '10-15,25-40/3 17 * 3,5 5' + start_at = datetime.strptime('2018/01/01', '%Y/%m/%d') + stop_at = datetime.strptime('2018/12/31', '%Y/%m/%d') + + schedules = list(next_schedules(crontab, start_at, stop_at, resolution=60)) + self.assertEqual(len(schedules), 108) + fmt = '%Y-%m-%d %H:%M:%S' + self.assertEqual(schedules[0], datetime.strptime('2018-03-02 17:10:00', fmt)) + self.assertEqual(schedules[-1], datetime.strptime('2018-05-25 17:40:00', fmt)) + self.assertEqual(schedules[59], datetime.strptime('2018-03-30 17:40:00', fmt)) + self.assertEqual(schedules[60], datetime.strptime('2018-05-04 17:10:00', fmt)) + + @patch('superset.tasks.schedules.firefox.webdriver.WebDriver') + def test_create_driver(self, mock_driver_class): + mock_driver = Mock() + mock_driver_class.return_value = mock_driver + mock_driver.find_elements_by_id.side_effect = [True, False] + + create_webdriver() + create_webdriver() + mock_driver.add_cookie.assert_called_once() + + 
@patch('superset.tasks.schedules.firefox.webdriver.WebDriver') + @patch('superset.tasks.schedules.send_email_smtp') + @patch('superset.tasks.schedules.time') + def test_deliver_dashboard_inline(self, mtime, send_email_smtp, driver_class): + element = Mock() + driver = Mock() + mtime.sleep.return_value = None + + driver_class.return_value = driver + + # Ensure that we are able to login with the driver + driver.find_elements_by_id.side_effect = [True, False] + driver.find_element_by_class_name.return_value = element + element.screenshot_as_png = read_fixture('sample.png') + + schedule = db.session.query(DashboardEmailSchedule).filter_by( + id=self.dashboard_schedule).all()[0] + + deliver_dashboard(schedule) + mtime.sleep.assert_called_once() + driver.screenshot.assert_not_called() + send_email_smtp.assert_called_once() + + @patch('superset.tasks.schedules.firefox.webdriver.WebDriver') + @patch('superset.tasks.schedules.send_email_smtp') + @patch('superset.tasks.schedules.time') + def test_deliver_dashboard_as_attachment(self, mtime, send_email_smtp, driver_class): + element = Mock() + driver = Mock() + mtime.sleep.return_value = None + + driver_class.return_value = driver + + # Ensure that we are able to login with the driver + driver.find_elements_by_id.side_effect = [True, False] + driver.find_element_by_id.return_value = element + driver.find_element_by_class_name.return_value = element + element.screenshot_as_png = read_fixture('sample.png') + + schedule = db.session.query(DashboardEmailSchedule).filter_by( + id=self.dashboard_schedule).all()[0] + + schedule.delivery_type = EmailDeliveryType.attachment + deliver_dashboard(schedule) + + mtime.sleep.assert_called_once() + driver.screenshot.assert_not_called() + send_email_smtp.assert_called_once() + self.assertIsNone(send_email_smtp.call_args[1]['images']) + self.assertEquals( + send_email_smtp.call_args[1]['data']['screenshot.png'], + element.screenshot_as_png, + ) + + 
@patch('superset.tasks.schedules.firefox.webdriver.WebDriver') + @patch('superset.tasks.schedules.send_email_smtp') + @patch('superset.tasks.schedules.time') + def test_dashboard_chrome_like(self, mtime, send_email_smtp, driver_class): + # Test functionality for chrome driver which does not support + # element snapshots + element = Mock() + driver = Mock() + mtime.sleep.return_value = None + type(element).screenshot_as_png = PropertyMock(side_effect=WebDriverException) + + driver_class.return_value = driver + + # Ensure that we are able to login with the driver + driver.find_elements_by_id.side_effect = [True, False] + driver.find_element_by_id.return_value = element + driver.find_element_by_class_name.return_value = element + driver.screenshot.return_value = read_fixture('sample.png') + + schedule = db.session.query(DashboardEmailSchedule).filter_by( + id=self.dashboard_schedule).all()[0] + + deliver_dashboard(schedule) + mtime.sleep.assert_called_once() + driver.screenshot.assert_called_once() + send_email_smtp.assert_called_once() + + self.assertEquals(send_email_smtp.call_args[0][0], self.RECIPIENTS) + self.assertEquals( + list(send_email_smtp.call_args[1]['images'].values())[0], + driver.screenshot.return_value, + ) + + @patch('superset.tasks.schedules.firefox.webdriver.WebDriver') + @patch('superset.tasks.schedules.send_email_smtp') + @patch('superset.tasks.schedules.time') + def test_deliver_email_options(self, mtime, send_email_smtp, driver_class): + element = Mock() + driver = Mock() + mtime.sleep.return_value = None + + driver_class.return_value = driver + + # Ensure that we are able to login with the driver + driver.find_elements_by_id.side_effect = [True, False] + driver.find_element_by_class_name.return_value = element + element.screenshot_as_png = read_fixture('sample.png') + + schedule = db.session.query(DashboardEmailSchedule).filter_by( + id=self.dashboard_schedule).all()[0] + + # Send individual mails to the group + schedule.deliver_as_group = 
False + + # Set a bcc email address + app.config['EMAIL_REPORT_BCC_ADDRESS'] = self.BCC + + deliver_dashboard(schedule) + mtime.sleep.assert_called_once() + driver.screenshot.assert_not_called() + + self.assertEquals(send_email_smtp.call_count, 2) + self.assertEquals(send_email_smtp.call_args[1]['bcc'], self.BCC) + + @patch('superset.tasks.schedules.firefox.webdriver.WebDriver') + @patch('superset.tasks.schedules.send_email_smtp') + @patch('superset.tasks.schedules.time') + def test_deliver_slice_inline_image(self, mtime, send_email_smtp, driver_class): + element = Mock() + driver = Mock() + mtime.sleep.return_value = None + + driver_class.return_value = driver + + # Ensure that we are able to login with the driver + driver.find_elements_by_id.side_effect = [True, False] + driver.find_element_by_class_name.return_value = element + element.screenshot_as_png = read_fixture('sample.png') + + schedule = db.session.query(SliceEmailSchedule).filter_by( + id=self.slice_schedule).all()[0] + + schedule.email_format = SliceEmailReportFormat.visualization + schedule.delivery_format = EmailDeliveryType.inline + + deliver_slice(schedule) + mtime.sleep.assert_called_once() + driver.screenshot.assert_not_called() + send_email_smtp.assert_called_once() + + self.assertEquals( + list(send_email_smtp.call_args[1]['images'].values())[0], + element.screenshot_as_png, + ) + + @patch('superset.tasks.schedules.firefox.webdriver.WebDriver') + @patch('superset.tasks.schedules.send_email_smtp') + @patch('superset.tasks.schedules.time') + def test_deliver_slice_attachment(self, mtime, send_email_smtp, driver_class): + element = Mock() + driver = Mock() + mtime.sleep.return_value = None + + driver_class.return_value = driver + + # Ensure that we are able to login with the driver + driver.find_elements_by_id.side_effect = [True, False] + driver.find_element_by_class_name.return_value = element + element.screenshot_as_png = read_fixture('sample.png') + + schedule = 
db.session.query(SliceEmailSchedule).filter_by( + id=self.slice_schedule).all()[0] + + schedule.email_format = SliceEmailReportFormat.visualization + schedule.delivery_type = EmailDeliveryType.attachment + + deliver_slice(schedule) + mtime.sleep.assert_called_once() + driver.screenshot.assert_not_called() + send_email_smtp.assert_called_once() + + self.assertEquals( + send_email_smtp.call_args[1]['data']['screenshot.png'], + element.screenshot_as_png, + ) + + @patch('superset.tasks.schedules.requests.get') + @patch('superset.tasks.schedules.send_email_smtp') + def test_deliver_slice_csv_attachment(self, send_email_smtp, get): + response = Mock() + get.return_value = response + response.raise_for_status.return_value = None + response.content = self.CSV + + schedule = db.session.query(SliceEmailSchedule).filter_by( + id=self.slice_schedule).all()[0] + + schedule.email_format = SliceEmailReportFormat.data + schedule.delivery_type = EmailDeliveryType.attachment + + deliver_slice(schedule) + send_email_smtp.assert_called_once() + + file_name = __('%(name)s.csv', name=schedule.slice.slice_name) + + self.assertEquals( + send_email_smtp.call_args[1]['data'][file_name], + self.CSV, + ) + + @patch('superset.tasks.schedules.requests.get') + @patch('superset.tasks.schedules.send_email_smtp') + def test_deliver_slice_csv_inline(self, send_email_smtp, get): + response = Mock() + get.return_value = response + response.raise_for_status.return_value = None + response.content = self.CSV + + schedule = db.session.query(SliceEmailSchedule).filter_by( + id=self.slice_schedule).all()[0] + + schedule.email_format = SliceEmailReportFormat.data + schedule.delivery_type = EmailDeliveryType.inline + + deliver_slice(schedule) + send_email_smtp.assert_called_once() + + self.assertIsNone(send_email_smtp.call_args[1]['data']) + self.assertTrue('