[SIP-3] Scheduled email reports for Slices / Dashboards (#5294)

* [scheduled reports] Add support for scheduled reports

* Scheduled email reports for slice and dashboard visualizations
  (attachment or inline)
* Scheduled email reports for slice data (CSV attachment or inline table)
* Each schedule has a list of recipients (all of them can receive a single mail,
  or separate mails)
* All outgoing mails can have a mandatory BCC for audit purposes.
* Each dashboard/slice can have multiple schedules.

This PR also makes a few minor improvements to the celery
infrastructure:
* Creates a common celery app
* Adds more celery annotations for the tasks
* Introduces celery beat
* Updates docs about concurrency / pools
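The celery pieces listed above fit together roughly as follows. This is a sketch assuming celery is installed; the broker URL, task names, and beat entry mirror the config examples later in this diff, using the pre-4.0 uppercase setting names that the PR itself uses:

```python
from celery import Celery
from celery.schedules import crontab

class CeleryConfig(object):
    BROKER_URL = 'redis://localhost:6379/0'
    # The common app imports both task modules
    CELERY_IMPORTS = ('superset.sql_lab', 'superset.tasks')
    # Annotations allow per-task tuning instead of one global setting
    CELERY_ANNOTATIONS = {
        'email_reports.send': {'rate_limit': '1/s', 'ignore_result': True},
    }
    # celery beat enqueues this task at minute 1 of every hour
    CELERYBEAT_SCHEDULE = {
        'email_reports.schedule_hourly': {
            'task': 'email_reports.schedule_hourly',
            'schedule': crontab(minute=1, hour='*'),
        },
    }

app = Celery()
app.config_from_object(CeleryConfig)
```

A single beat process reads `CELERYBEAT_SCHEDULE` and submits tasks; the workers consume them subject to the annotations.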

* [scheduled reports] - Debug mode for scheduled emails

* [scheduled reports] - Ability to send test mails

* [scheduled reports] - Test email functionality - minor improvements

* [scheduled reports] - Rebase with master. Minor fixes

* [scheduled reports] - Add warning messages

* [scheduled reports] - flake8

* [scheduled reports] - fix rebase

* [scheduled reports] - fix rebase

* [scheduled reports] - fix flake8

* [scheduled reports] Rebase in prep for merge

* Fixed alembic tree after rebase
* Updated requirements to latest version of packages (and tested)
* Removed py2 stuff

* [scheduled reports] - fix flake8

* [scheduled reports] - address review comments

* [scheduled reports] - rebase with master
Mahendra M 2018-12-10 22:29:29 -08:00 committed by Maxime Beauchemin
parent f366bbe735
commit 808622414c
23 changed files with 1569 additions and 40 deletions

.gitignore

@@ -44,3 +44,12 @@ yarn-error.log
*.iml
venv
@eaDir/
# Test data
celery_results.sqlite
celerybeat-schedule
celerydb.sqlite
celerybeat.pid
geckodriver.log
ghostdriver.log
testCSV.csv


@@ -88,7 +88,7 @@ It's easy: use the ``Filter Box`` widget, build a slice, and add it to your
dashboard.
The ``Filter Box`` widget allows you to define a query to populate dropdowns
that can be use for filtering. To build the list of distinct values, we
that can be used for filtering. To build the list of distinct values, we
run a query, and sort the result by the metric you provide, sorting
descending.


@@ -603,14 +603,12 @@ Upgrading should be as straightforward as running::
superset db upgrade
superset init
SQL Lab
-------
SQL Lab is a powerful SQL IDE that works with all SQLAlchemy compatible
databases. By default, queries are executed in the scope of a web
request so they
may eventually timeout as queries exceed the maximum duration of a web
request in your environment, whether it'd be a reverse proxy or the Superset
server itself.
Celery Tasks
------------
On large analytic databases, it's common to run background jobs, reports
and/or queries that execute for minutes or hours. In certain cases, we need
to support long running tasks that execute beyond the typical web request's
timeout (30-60 seconds).
On large analytic databases, it's common to run queries that
execute for minutes or hours.
@@ -634,15 +632,41 @@ have the same configuration.
class CeleryConfig(object):
BROKER_URL = 'redis://localhost:6379/0'
CELERY_IMPORTS = ('superset.sql_lab', )
CELERY_IMPORTS = (
'superset.sql_lab',
'superset.tasks',
)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
CELERYD_LOG_LEVEL = 'DEBUG'
CELERYD_PREFETCH_MULTIPLIER = 10
CELERY_ACKS_LATE = True
CELERY_ANNOTATIONS = {
'sql_lab.get_sql_results': {
'rate_limit': '100/s',
},
'email_reports.send': {
'rate_limit': '1/s',
'time_limit': 120,
'soft_time_limit': 150,
'ignore_result': True,
},
}
CELERYBEAT_SCHEDULE = {
'email_reports.schedule_hourly': {
'task': 'email_reports.schedule_hourly',
'schedule': crontab(minute=1, hour='*'),
},
}
CELERY_CONFIG = CeleryConfig
To start a Celery worker to leverage the configuration run: ::
* To start a Celery worker to leverage the configuration run: ::
celery worker --app=superset.sql_lab:celery_app --pool=gevent -Ofair
celery worker --app=superset.tasks.celery_app:app --pool=prefork -Ofair -c 4
* To start a job which schedules periodic background jobs, run ::
celery beat --app=superset.tasks.celery_app:app
To setup a result backend, you need to pass an instance of a derivative
of ``werkzeug.contrib.cache.BaseCache`` to the ``RESULTS_BACKEND``
@@ -665,11 +689,65 @@ look something like:
RESULTS_BACKEND = RedisCache(
host='localhost', port=6379, key_prefix='superset_results')
Note that it's important that all the worker nodes and web servers in
the Superset cluster share a common metadata database.
This means that SQLite will not work in this context since it has
limited support for concurrency and
typically lives on the local file system.
**Important notes**
* It is important that all the worker nodes and web servers in
the Superset cluster share a common metadata database.
This means that SQLite will not work in this context since it has
limited support for concurrency and
typically lives on the local file system.
* There should only be one instance of ``celery beat`` running in your
entire setup. If not, background jobs can get scheduled multiple times
resulting in weird behaviors like duplicate delivery of reports,
higher than expected load / traffic etc.
Email Reports
-------------
Email reports allow users to schedule email reports for
* slice and dashboard visualizations (attachment or inline)
* slice data (CSV attachment or inline table)
Schedules are defined in crontab format and each schedule
can have a list of recipients (all of them can receive a single mail,
or separate mails). For audit purposes, all outgoing mails can have a
mandatory bcc.
**Requirements**
* A selenium compatible driver & headless browser
* `geckodriver <https://github.com/mozilla/geckodriver>`_ and Firefox is preferred
* `chromedriver <http://chromedriver.chromium.org/>`_ is a good option too
* Run `celery worker` and `celery beat` as follows ::
celery worker --app=superset.tasks.celery_app:app --pool=prefork -Ofair -c 4
celery beat --app=superset.tasks.celery_app:app
**Important notes**
* Be mindful of the concurrency setting for celery (using ``-c 4``).
Selenium/webdriver instances can consume a lot of CPU / memory on your servers.
* In some cases, if you notice a lot of leaked ``geckodriver`` processes, try running
your celery processes with ::
celery worker --pool=prefork --max-tasks-per-child=128 ...
* It is recommended to run separate workers for the ``sql_lab`` and
  ``email_reports`` tasks. This can be done using the ``queue`` field in ``CELERY_ANNOTATIONS``.
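One possible shape for that split (a sketch, not taken from this PR: it assumes the ``queue`` attribute set via annotations is honored at submission time; ``CELERY_ROUTES`` is the more conventional routing mechanism):

```python
# Route each task family to its own queue via annotations
CELERY_ANNOTATIONS = {
    'sql_lab.get_sql_results': {'queue': 'sql_lab'},
    'email_reports.send': {'queue': 'email_reports'},
    'email_reports.schedule_hourly': {'queue': 'email_reports'},
}

# Then start one worker per queue, e.g.:
#   celery worker --app=superset.tasks.celery_app:app -Q sql_lab -c 4
#   celery worker --app=superset.tasks.celery_app:app -Q email_reports -c 2
```

This keeps a burst of heavy webdriver-backed report tasks from starving SQL Lab queries, and vice versa.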
SQL Lab
-------
SQL Lab is a powerful SQL IDE that works with all SQLAlchemy compatible
databases. By default, queries are executed in the scope of a web
request so they may eventually time out as queries exceed the maximum duration of a web
request in your environment, whether that be a reverse proxy or the Superset
server itself. In such cases, it is preferable to use ``celery`` to run the queries
in the background. Please follow the examples/notes mentioned above to get your
celery setup working.
Also note that SQL Lab supports Jinja templating in queries and that it's
possible to overload
@@ -684,6 +762,8 @@ in this dictionary are made available for users to use in their SQL.
}
Celery Flower
-------------
Flower is a web based tool for monitoring the Celery cluster which you can
install from pip: ::
@@ -691,7 +771,7 @@ install from pip: ::
and run via: ::
celery flower --app=superset.sql_lab:celery_app
celery flower --app=superset.tasks.celery_app:app
Building from source
---------------------


@@ -20,6 +20,7 @@ chardet==3.0.4 # via requests
click==6.7
colorama==0.3.9
contextlib2==0.5.5
croniter==0.3.26
cryptography==1.9
defusedxml==0.5.0 # via python3-openid
docutils==0.14 # via botocore
@@ -69,9 +70,11 @@ python3-openid==3.1.0 # via flask-openid
pytz==2018.5 # via babel, celery, flower, pandas
pyyaml==3.13
requests==2.18.4
retry==0.9.2
rfc3986==1.1.0 # via tableschema
s3transfer==0.1.13 # via boto3
sasl==0.2.1 # via thrift-sasl
selenium==3.141.0
simplejson==3.15.0
six==1.11.0 # via bleach, cryptography, isodate, jsonlines, linear-tsv, pathlib2, polyline, pydruid, python-dateutil, sasl, sqlalchemy-utils, tableschema, tabulator, thrift
sqlalchemy-utils==0.32.21


@@ -94,6 +94,9 @@ setup(
'thrift-sasl>=0.2.1',
'unicodecsv',
'unidecode>=0.04.21',
'croniter==0.3.25',
'selenium==3.14.0',
'retry==0.9.2',
],
extras_require={
'cors': ['flask-cors>=2.0.0'],


@@ -11,6 +11,7 @@ import json
import os
import sys
from celery.schedules import crontab
from dateutil import tz
from flask_appbuilder.security.manager import AUTH_DB
@@ -309,19 +310,43 @@ WARNING_MSG = None
# Default celery config is to use SQLA as a broker, in a production setting
# you'll want to use a proper broker as specified here:
# http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
"""
# Example:
class CeleryConfig(object):
BROKER_URL = 'sqla+sqlite:///celerydb.sqlite'
CELERY_IMPORTS = ('superset.sql_lab', )
CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite'
CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
CELERYD_LOG_LEVEL = 'DEBUG'
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_ACKS_LATE = True
BROKER_URL = 'sqla+sqlite:///celerydb.sqlite'
CELERY_IMPORTS = (
'superset.sql_lab',
'superset.tasks',
)
CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite'
CELERYD_LOG_LEVEL = 'DEBUG'
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_ACKS_LATE = True
CELERY_ANNOTATIONS = {
'sql_lab.get_sql_results': {
'rate_limit': '100/s',
},
'email_reports.send': {
'rate_limit': '1/s',
'time_limit': 120,
'soft_time_limit': 150,
'ignore_result': True,
},
}
CELERYBEAT_SCHEDULE = {
'email_reports.schedule_hourly': {
'task': 'email_reports.schedule_hourly',
'schedule': crontab(minute=1, hour='*'),
},
}
CELERY_CONFIG = CeleryConfig
"""
# Set celery config to None to disable all the above configuration
CELERY_CONFIG = None
"""
# static http headers to be served by your Superset server.
# This header prevents iFrames from other domains and
@@ -463,6 +488,54 @@ SQL_QUERY_MUTATOR = None
# using flask-compress
ENABLE_FLASK_COMPRESS = True
# Enable / disable scheduled email reports
ENABLE_SCHEDULED_EMAIL_REPORTS = False
# If enabled, certain features are run in debug mode
# Current list:
# * Emails are sent using dry-run mode (logging only)
SCHEDULED_EMAIL_DEBUG_MODE = False
# Email reports - minimum time resolution (in minutes) for the crontab
EMAIL_REPORTS_CRON_RESOLUTION = 15
# Email report configuration
# From address in emails
EMAIL_REPORT_FROM_ADDRESS = 'reports@superset.org'
# Send bcc of all reports to this address. Set to None to disable.
# This is useful for maintaining an audit trail of all email deliveries.
EMAIL_REPORT_BCC_ADDRESS = None
# User credentials to use for generating reports
# This user should have permissions to browse all the dashboards and
# slices.
# TODO: In the future, login as the owner of the item to generate reports
EMAIL_REPORTS_USER = 'admin'
EMAIL_REPORTS_SUBJECT_PREFIX = '[Report] '
# The webdriver to use for generating reports. Use one of the following
# firefox
# Requires: geckodriver and firefox installations
# Limitations: can be buggy at times
# chrome:
# Requires: headless chrome
# Limitations: unable to generate screenshots of elements
EMAIL_REPORTS_WEBDRIVER = 'firefox'
# Window size - this will impact the rendering of the data
WEBDRIVER_WINDOW = {
'dashboard': (1600, 2000),
'slice': (3000, 1200),
}
# Any config options to be passed as-is to the webdriver
WEBDRIVER_CONFIGURATION = {}
# The base URL to query for accessing the user interface
WEBDRIVER_BASEURL = 'http://0.0.0.0:8080/'
try:
if CONFIG_PATH_ENV_VAR in os.environ:
# Explicitly import config module that is not in pythonpath; useful


@@ -0,0 +1,69 @@
"""models for email reports
Revision ID: 6c7537a6004a
Revises: e502db2af7be
Create Date: 2018-05-15 20:28:51.977572
"""
# revision identifiers, used by Alembic.
revision = '6c7537a6004a'
down_revision = 'a61b40f9f57f'
from alembic import op
import sqlalchemy as sa
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('dashboard_email_schedules',
sa.Column('created_on', sa.DateTime(), nullable=True),
sa.Column('changed_on', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('active', sa.Boolean(), nullable=True),
sa.Column('crontab', sa.String(length=50), nullable=True),
sa.Column('recipients', sa.Text(), nullable=True),
sa.Column('deliver_as_group', sa.Boolean(), nullable=True),
sa.Column('delivery_type', sa.Enum('attachment', 'inline', name='emaildeliverytype'), nullable=True),
sa.Column('dashboard_id', sa.Integer(), nullable=True),
sa.Column('created_by_fk', sa.Integer(), nullable=True),
sa.Column('changed_by_fk', sa.Integer(), nullable=True),
sa.Column('user_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
sa.ForeignKeyConstraint(['dashboard_id'], ['dashboards.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['ab_user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_dashboard_email_schedules_active'), 'dashboard_email_schedules', ['active'], unique=False)
op.create_table('slice_email_schedules',
sa.Column('created_on', sa.DateTime(), nullable=True),
sa.Column('changed_on', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('active', sa.Boolean(), nullable=True),
sa.Column('crontab', sa.String(length=50), nullable=True),
sa.Column('recipients', sa.Text(), nullable=True),
sa.Column('deliver_as_group', sa.Boolean(), nullable=True),
sa.Column('delivery_type', sa.Enum('attachment', 'inline', name='emaildeliverytype'), nullable=True),
sa.Column('slice_id', sa.Integer(), nullable=True),
sa.Column('email_format', sa.Enum('visualization', 'data', name='sliceemailreportformat'), nullable=True),
sa.Column('created_by_fk', sa.Integer(), nullable=True),
sa.Column('changed_by_fk', sa.Integer(), nullable=True),
sa.Column('user_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
sa.ForeignKeyConstraint(['slice_id'], ['slices.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['ab_user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_slice_email_schedules_active'), 'slice_email_schedules', ['active'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_slice_email_schedules_active'), table_name='slice_email_schedules')
op.drop_table('slice_email_schedules')
op.drop_index(op.f('ix_dashboard_email_schedules_active'), table_name='dashboard_email_schedules')
op.drop_table('dashboard_email_schedules')
# ### end Alembic commands ###


@@ -1,3 +1,4 @@
from . import core # noqa
from . import sql_lab # noqa
from . import user_attributes # noqa
from . import schedules # noqa


@@ -0,0 +1,93 @@
# pylint: disable=C,R,W
"""Models for scheduled execution of jobs"""
import enum
from flask_appbuilder import Model
from sqlalchemy import (
Boolean, Column, Enum, ForeignKey, Integer, String, Text,
)
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import relationship
from superset import security_manager
from superset.models.helpers import AuditMixinNullable, ImportMixin
metadata = Model.metadata # pylint: disable=no-member
class ScheduleType(enum.Enum):
slice = 'slice'
dashboard = 'dashboard'
class EmailDeliveryType(enum.Enum):
attachment = 'Attachment'
inline = 'Inline'
class SliceEmailReportFormat(enum.Enum):
visualization = 'Visualization'
data = 'Raw data'
class EmailSchedule():
"""Schedules for emailing slices / dashboards"""
__tablename__ = 'email_schedules'
id = Column(Integer, primary_key=True)
active = Column(Boolean, default=True, index=True)
crontab = Column(String(50))
@declared_attr
def user_id(self):
return Column(Integer, ForeignKey('ab_user.id'))
@declared_attr
def user(self):
return relationship(
security_manager.user_model,
backref=self.__tablename__,
foreign_keys=[self.user_id],
)
recipients = Column(Text)
deliver_as_group = Column(Boolean, default=False)
delivery_type = Column(Enum(EmailDeliveryType))
class DashboardEmailSchedule(Model,
AuditMixinNullable,
ImportMixin,
EmailSchedule):
__tablename__ = 'dashboard_email_schedules'
dashboard_id = Column(Integer, ForeignKey('dashboards.id'))
dashboard = relationship(
'Dashboard',
backref='email_schedules',
foreign_keys=[dashboard_id],
)
class SliceEmailSchedule(Model,
AuditMixinNullable,
ImportMixin,
EmailSchedule):
__tablename__ = 'slice_email_schedules'
slice_id = Column(Integer, ForeignKey('slices.id'))
slice = relationship(
'Slice',
backref='email_schedules',
foreign_keys=[slice_id],
)
email_format = Column(Enum(SliceEmailReportFormat))
def get_scheduler_model(report_type):
if report_type == ScheduleType.dashboard.value:
return DashboardEmailSchedule
elif report_type == ScheduleType.slice.value:
return SliceEmailSchedule


@@ -14,8 +14,8 @@ from sqlalchemy.pool import NullPool
from superset import app, dataframe, db, results_backend, security_manager
from superset.models.sql_lab import Query
from superset.sql_parse import SupersetQuery
from superset.tasks.celery_app import app as celery_app
from superset.utils.core import (
get_celery_app,
json_iso_dttm_ser,
now_as_float,
QueryStatus,
@@ -23,8 +23,7 @@ from superset.utils.core import (
)
config = app.config
celery_app = get_celery_app(config)
stats_logger = app.config.get('STATS_LOGGER')
stats_logger = config.get('STATS_LOGGER')
SQLLAB_TIMEOUT = config.get('SQLLAB_ASYNC_TIME_LIMIT_SEC', 600)
log_query = config.get('QUERY_LOGGER')
@@ -77,7 +76,9 @@ def session_scope(nullpool):
session.close()
@celery_app.task(bind=True, soft_time_limit=SQLLAB_TIMEOUT)
@celery_app.task(name='sql_lab.get_sql_results',
bind=True,
soft_time_limit=SQLLAB_TIMEOUT)
def get_sql_results(
ctask, query_id, rendered_query, return_results=True, store_results=False,
user_name=None, start_time=None):


@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
from . import schedules # noqa


@@ -0,0 +1,11 @@
# pylint: disable=C,R,W
"""Celery app for Superset background tasks"""
# Superset framework imports
from superset import app
from superset.utils.core import get_celery_app
# Globals
config = app.config
app = get_celery_app(config)

superset/tasks/schedules.py

@@ -0,0 +1,441 @@
# pylint: disable=C,R,W
"""Tasks for scheduled email reports"""
from collections import namedtuple
from datetime import datetime, timedelta
from email.utils import make_msgid, parseaddr
import logging
import time
import croniter
from dateutil.tz import tzlocal
from flask import render_template, Response, session, url_for
from flask_babel import gettext as __
from flask_login import login_user
import requests
from retry.api import retry_call
from selenium.common.exceptions import WebDriverException
from selenium.webdriver import chrome, firefox
import simplejson as json
from six.moves import urllib
from werkzeug.utils import parse_cookie
# Superset framework imports
from superset import app, db, security_manager
from superset.models.schedules import (
EmailDeliveryType,
get_scheduler_model,
ScheduleType,
SliceEmailReportFormat,
)
from superset.tasks.celery_app import app as celery_app
from superset.utils.core import (
get_email_address_list,
send_email_smtp,
)
# Globals
config = app.config
logging.getLogger('tasks.email_reports').setLevel(logging.INFO)
# Time in seconds to wait for the page to load and render
PAGE_RENDER_WAIT = 30
EmailContent = namedtuple('EmailContent', ['body', 'data', 'images'])
def _get_recipients(schedule):
bcc = config.get('EMAIL_REPORT_BCC_ADDRESS', None)
if schedule.deliver_as_group:
to = schedule.recipients
yield (to, bcc)
else:
for to in get_email_address_list(schedule.recipients):
yield (to, bcc)
def _deliver_email(schedule, subject, email):
for (to, bcc) in _get_recipients(schedule):
send_email_smtp(
to, subject, email.body, config,
data=email.data,
images=email.images,
bcc=bcc,
mime_subtype='related',
dryrun=config.get('SCHEDULED_EMAIL_DEBUG_MODE'),
)
def _generate_mail_content(schedule, screenshot, name, url):
if schedule.delivery_type == EmailDeliveryType.attachment:
images = None
data = {
'screenshot.png': screenshot,
}
body = __(
'<b><a href="%(url)s">Explore in Superset</a></b><p></p>',
name=name,
url=url,
)
elif schedule.delivery_type == EmailDeliveryType.inline:
# Get the domain from the 'From' address,
# and make a message id without the < > at the ends
domain = parseaddr(config.get('SMTP_MAIL_FROM'))[1].split('@')[1]
msgid = make_msgid(domain)[1:-1]
images = {
msgid: screenshot,
}
data = None
body = __(
"""
<b><a href="%(url)s">Explore in Superset</a></b><p></p>
<img src="cid:%(msgid)s">
""",
name=name, url=url, msgid=msgid,
)
return EmailContent(body, data, images)
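The inline branch above leans on two stdlib helpers; here is a standalone illustration (the sample address is the default From address from this PR's config; note that `make_msgid` takes the domain as a keyword argument):

```python
from email.utils import make_msgid, parseaddr

# Pull the bare address out of a possibly-decorated From header,
# then keep everything after the '@'
domain = parseaddr('Superset Reports <reports@superset.org>')[1].split('@')[1]

# make_msgid returns '<...@domain>'; stripping the angle brackets
# yields a value usable in an HTML 'cid:' reference
msgid = make_msgid(domain=domain)[1:-1]
```

The stripped message id is what the email body's `<img src="cid:...">` points at.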
def _get_auth_cookies():
# Login with the user specified to get the reports
with app.test_request_context():
user = security_manager.find_user(config.get('EMAIL_REPORTS_USER'))
login_user(user)
# A mock response object to get the cookie information from
response = Response()
app.session_interface.save_session(app, session, response)
cookies = []
# Set the cookies in the driver
for name, value in response.headers:
if name.lower() == 'set-cookie':
cookie = parse_cookie(value)
cookies.append(cookie['session'])
return cookies
def _get_url_path(view, **kwargs):
with app.test_request_context():
return urllib.parse.urljoin(
str(config.get('WEBDRIVER_BASEURL')),
url_for(view, **kwargs),
)
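`_get_url_path` combines the configured base URL with a Flask-generated path; the join behaves like this (the path is illustrative):

```python
from urllib.parse import urljoin

base = 'http://0.0.0.0:8080/'  # the WEBDRIVER_BASEURL default in this PR

# url_for() produces an absolute path; urljoin replaces the path
# portion of the base URL with it
full = urljoin(base, '/superset/dashboard/1/')
```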
def create_webdriver():
# Create a webdriver for use in fetching reports
if config.get('EMAIL_REPORTS_WEBDRIVER') == 'firefox':
driver_class = firefox.webdriver.WebDriver
options = firefox.options.Options()
elif config.get('EMAIL_REPORTS_WEBDRIVER') == 'chrome':
driver_class = chrome.webdriver.WebDriver
options = chrome.options.Options()
options.add_argument('--headless')
# Prepare args for the webdriver init
kwargs = dict(
options=options,
)
kwargs.update(config.get('WEBDRIVER_CONFIGURATION'))
# Initialize the driver
driver = driver_class(**kwargs)
# Some webdrivers need an initial hit to the welcome URL
# before we set the cookie
welcome_url = _get_url_path('Superset.welcome')
# Hit the welcome URL and check if we were asked to login
driver.get(welcome_url)
elements = driver.find_elements_by_id('loginbox')
# This indicates that we were not prompted for a login box.
if not elements:
return driver
# Set the cookies in the driver
for cookie in _get_auth_cookies():
info = dict(name='session', value=cookie)
driver.add_cookie(info)
return driver
def destroy_webdriver(driver):
"""
Destroy a driver
"""
# This is some very flaky code in selenium. Hence the retries
# and catch-all exceptions
try:
retry_call(driver.close, tries=2)
except Exception:
pass
try:
driver.quit()
except Exception:
pass
def deliver_dashboard(schedule):
"""
Given a schedule, deliver the dashboard as an email report
"""
dashboard = schedule.dashboard
dashboard_url = _get_url_path(
'Superset.dashboard',
dashboard_id=dashboard.id,
)
# Create a driver, fetch the page, wait for the page to render
driver = create_webdriver()
window = config.get('WEBDRIVER_WINDOW')['dashboard']
driver.set_window_size(*window)
driver.get(dashboard_url)
time.sleep(PAGE_RENDER_WAIT)
# Set up a function to retry once for the element.
# This is buggy in certain selenium versions with firefox driver
get_element = getattr(driver, 'find_element_by_class_name')
element = retry_call(
get_element,
fargs=['grid-container'],
tries=2,
delay=PAGE_RENDER_WAIT,
)
try:
screenshot = element.screenshot_as_png
except WebDriverException:
# Some webdrivers do not support screenshots for elements.
# In such cases, take a screenshot of the entire page.
screenshot = driver.screenshot() # pylint: disable=no-member
finally:
destroy_webdriver(driver)
# Generate the email body and attachments
email = _generate_mail_content(
schedule,
screenshot,
dashboard.dashboard_title,
dashboard_url,
)
subject = __(
'%(prefix)s %(title)s',
prefix=config.get('EMAIL_REPORTS_SUBJECT_PREFIX'),
title=dashboard.dashboard_title,
)
_deliver_email(schedule, subject, email)
def _get_slice_data(schedule):
slc = schedule.slice
slice_url = _get_url_path(
'Superset.explore_json',
csv='true',
form_data=json.dumps({'slice_id': slc.id}),
)
# URL to include in the email
url = _get_url_path(
'Superset.slice',
slice_id=slc.id,
)
cookies = {}
for cookie in _get_auth_cookies():
cookies['session'] = cookie
response = requests.get(slice_url, cookies=cookies)
response.raise_for_status()
# TODO: Move to the csv module
rows = [r.split(b',') for r in response.content.splitlines()]
if schedule.delivery_type == EmailDeliveryType.inline:
data = None
# Parse the csv file and generate HTML
columns = rows.pop(0)
with app.app_context():
body = render_template(
'superset/reports/slice_data.html',
columns=columns,
rows=rows,
name=slc.slice_name,
link=url,
)
elif schedule.delivery_type == EmailDeliveryType.attachment:
data = {
__('%(name)s.csv', name=slc.slice_name): response.content,
}
body = __(
'<b><a href="%(url)s">Explore in Superset</a></b><p></p>',
name=slc.slice_name,
url=url,
)
return EmailContent(body, data, None)
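Per the TODO above, the naive `split(b',')` breaks on quoted fields; the stdlib `csv` module handles them (the payload below is illustrative, not actual Superset output):

```python
import csv
import io

# Illustrative CSV bytes as explore_json?csv=true might return them
payload = b'name,value\n"Doe, Jane",10\nbob,20\n'

# csv.reader copes with the quoted comma that a plain split would not
rows = list(csv.reader(io.StringIO(payload.decode('utf-8'))))
columns, body = rows[0], rows[1:]
```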
def _get_slice_visualization(schedule):
slc = schedule.slice
# Create a driver, fetch the page, wait for the page to render
driver = create_webdriver()
window = config.get('WEBDRIVER_WINDOW')['slice']
driver.set_window_size(*window)
slice_url = _get_url_path(
'Superset.slice',
slice_id=slc.id,
)
driver.get(slice_url)
time.sleep(PAGE_RENDER_WAIT)
# Set up a function to retry once for the element.
# This is buggy in certain selenium versions with firefox driver
element = retry_call(
driver.find_element_by_class_name,
fargs=['chart-container'],
tries=2,
delay=PAGE_RENDER_WAIT,
)
try:
screenshot = element.screenshot_as_png
except WebDriverException:
# Some webdrivers do not support screenshots for elements.
# In such cases, take a screenshot of the entire page.
screenshot = driver.screenshot() # pylint: disable=no-member
finally:
destroy_webdriver(driver)
# Generate the email body and attachments
return _generate_mail_content(
schedule,
screenshot,
slc.slice_name,
slice_url,
)
def deliver_slice(schedule):
"""
Given a schedule, deliver the slice as an email report
"""
if schedule.email_format == SliceEmailReportFormat.data:
email = _get_slice_data(schedule)
elif schedule.email_format == SliceEmailReportFormat.visualization:
email = _get_slice_visualization(schedule)
else:
raise RuntimeError('Unknown email report format')
subject = __(
'%(prefix)s %(title)s',
prefix=config.get('EMAIL_REPORTS_SUBJECT_PREFIX'),
title=schedule.slice.slice_name,
)
_deliver_email(schedule, subject, email)
@celery_app.task(name='email_reports.send', bind=True, soft_time_limit=300)
def schedule_email_report(task, report_type, schedule_id, recipients=None):
model_cls = get_scheduler_model(report_type)
schedule = db.session.query(model_cls).get(schedule_id)
# The user may have disabled the schedule. If so, ignore this
if not schedule or not schedule.active:
logging.info('Ignoring deactivated schedule')
return
# TODO: Detach the schedule object from the db session
if recipients is not None:
schedule.id = schedule_id
schedule.recipients = recipients
if report_type == ScheduleType.dashboard.value:
deliver_dashboard(schedule)
elif report_type == ScheduleType.slice.value:
deliver_slice(schedule)
else:
raise RuntimeError('Unknown report type')
def next_schedules(crontab, start_at, stop_at, resolution=0):
crons = croniter.croniter(crontab, start_at - timedelta(seconds=1))
previous = start_at - timedelta(days=1)
for eta in crons.all_next(datetime):
# Do not cross the time boundary
if eta >= stop_at:
break
if eta < start_at:
continue
# Do not allow very frequent tasks
if eta - previous < timedelta(seconds=resolution):
continue
yield eta
previous = eta
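The resolution filter in `next_schedules` can be illustrated without croniter: given candidate ETAs, anything outside the window is skipped and runs spaced closer than `resolution` seconds are dropped. A standalone sketch mirroring the loop above, fed a synthetic list of ETAs:

```python
from datetime import datetime, timedelta

def filter_etas(etas, start_at, stop_at, resolution=0):
    # Mirror of next_schedules' filtering: stay inside [start_at, stop_at)
    # and drop ETAs that fire more often than the configured resolution
    previous = start_at - timedelta(days=1)
    for eta in etas:
        if eta >= stop_at:
            break
        if eta < start_at:
            continue
        if eta - previous < timedelta(seconds=resolution):
            continue
        yield eta
        previous = eta

start = datetime(2018, 12, 10, 10, 0)
candidates = [start + timedelta(minutes=m) for m in (0, 5, 10, 15, 30, 70)]
# With a 15-minute resolution, the 10:05 and 10:10 runs are suppressed
# and 11:10 falls outside the one-hour window
kept = list(filter_etas(candidates, start, start + timedelta(hours=1),
                        resolution=900))
```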
def schedule_window(report_type, start_at, stop_at, resolution):
"""
Find all active schedules and schedule celery tasks for
each of them with a specific ETA (determined by parsing
the cron schedule for the schedule)
"""
model_cls = get_scheduler_model(report_type)
dbsession = db.create_scoped_session()
schedules = dbsession.query(model_cls).filter(model_cls.active.is_(True))
for schedule in schedules:
args = (
report_type,
schedule.id,
)
# Schedule the job for the specified time window
for eta in next_schedules(schedule.crontab,
start_at,
stop_at,
resolution=resolution):
schedule_email_report.apply_async(args, eta=eta)
@celery_app.task(name='email_reports.schedule_hourly')
def schedule_hourly():
""" Celery beat job meant to be invoked hourly """
if not config.get('ENABLE_SCHEDULED_EMAIL_REPORTS'):
logging.info('Scheduled email reports not enabled in config')
return
resolution = config.get('EMAIL_REPORTS_CRON_RESOLUTION', 0) * 60
# Get the top of the hour
start_at = datetime.now(tzlocal()).replace(microsecond=0, second=0, minute=0)
stop_at = start_at + timedelta(seconds=3600)
schedule_window(ScheduleType.dashboard.value, start_at, stop_at, resolution)
schedule_window(ScheduleType.slice.value, start_at, stop_at, resolution)
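The hourly window computed above can be shown concretely (timezone handling dropped for brevity; since the beat entry fires at minute 1, the window anchored at the top of that hour is already in progress when scheduling happens):

```python
from datetime import datetime, timedelta

# Suppose beat fires the task at 10:01:23
now = datetime(2018, 12, 10, 10, 1, 23, 456789)

# Truncate to the top of the hour, then cover the next 3600 seconds
start_at = now.replace(microsecond=0, second=0, minute=0)
stop_at = start_at + timedelta(seconds=3600)
```

Every active schedule whose crontab fires inside `[start_at, stop_at)` gets a task submitted with a matching ETA.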


@@ -0,0 +1,31 @@
<html>
<head>
</head>
<body>
<p></p>
<b><a href="{{ link }}">Explore this data in Superset</a></b>
<p></p>
<table border='1' cellspacing='0' cellpadding='3'
style='border: 1px solid #c0c0c0; border-collapse: collapse;'>
<thead>
<tr>
<th colspan={{ columns | length }} bgcolor='#c0c0c0'>{{ name }}</th>
</tr>
<tr>
{%- for column in columns %}
<th bgcolor='#f0f0f0'>{{ column.decode('utf-8') | replace('_', ' ') | title }}</th>
{%- endfor %}
</tr>
</thead>
<tbody>
{%- for row in rows %}
<tr>
{%- for column in row %}
<td>{{ column.decode('utf-8') }}</td>
{%- endfor %}
</tr>
{%- endfor %}
</tbody>
</table>
</body>
</html>


@@ -4,6 +4,7 @@ from builtins import object
from datetime import date, datetime, time, timedelta
import decimal
from email.mime.application import MIMEApplication
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
@@ -582,21 +583,23 @@ def notify_user_about_perm_udate(
dryrun=not config.get('EMAIL_NOTIFICATIONS'))
def send_email_smtp(to, subject, html_content, config, files=None,
dryrun=False, cc=None, bcc=None, mime_subtype='mixed'):
def send_email_smtp(to, subject, html_content, config,
files=None, data=None, images=None, dryrun=False,
cc=None, bcc=None, mime_subtype='mixed'):
"""
Send an email with html content, eg:
send_email_smtp(
'test@example.com', 'foo', '<b>Foo</b> bar',['/dev/null'], dryrun=True)
"""
smtp_mail_from = config.get('SMTP_MAIL_FROM')
to = get_email_address_list(to)
msg = MIMEMultipart(mime_subtype)
msg['Subject'] = subject
msg['From'] = smtp_mail_from
msg['To'] = ', '.join(to)
msg.preamble = 'This is a multi-part message in MIME format.'
recipients = to
if cc:
cc = get_email_address_list(cc)
@@ -612,6 +615,7 @@ def send_email_smtp(to, subject, html_content, config, files=None,
mime_text = MIMEText(html_content, 'html')
msg.attach(mime_text)
# Attach files by reading them from disk
for fname in files or []:
basename = os.path.basename(fname)
with open(fname, 'rb') as f:
@@ -621,6 +625,23 @@
Content_Disposition="attachment; filename='%s'" % basename,
Name=basename))
# Attach any files passed directly
for name, body in (data or {}).items():
msg.attach(
MIMEApplication(
body,
Content_Disposition="attachment; filename='%s'" % name,
Name=name,
))
# Attach any inline images, which may be required for display in
# HTML content (inline)
for msgid, body in (images or {}).items():
image = MIMEImage(body)
image.add_header('Content-ID', '<%s>' % msgid)
image.add_header('Content-Disposition', 'inline')
msg.attach(image)
send_MIME_email(smtp_mail_from, recipients, msg, config, dryrun=dryrun)
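The hunk above attaches inline images by `Content-ID` so that an HTML body can reference them with `cid:` URLs. A minimal stdlib sketch of the same mechanism (the `chart` msgid and placeholder PNG bytes are illustrative, not from Superset):

```python
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

# Build a multipart message whose HTML body references an inline
# image through its Content-ID (the "cid:chart" URL below).
msg = MIMEMultipart('mixed')
msg['Subject'] = 'Weekly report'
msg.attach(MIMEText('<img src="cid:chart">', 'html'))

png_bytes = b'\x89PNG\r\n\x1a\n'  # placeholder bytes, not a real image
image = MIMEImage(png_bytes, _subtype='png')  # subtype given explicitly
image.add_header('Content-ID', '<chart>')
image.add_header('Content-Disposition', 'inline')
msg.attach(image)

print(msg.get_payload()[1]['Content-ID'])  # -> <chart>
```

The angle brackets around the msgid in the header are required; the `cid:` reference in the HTML omits them.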
@ -639,7 +660,7 @@ def send_MIME_email(e_from, e_to, mime_msg, config, dryrun=False):
s.starttls()
if SMTP_USER and SMTP_PASSWORD:
s.login(SMTP_USER, SMTP_PASSWORD)
logging.info('Sent an alert email to ' + str(e_to))
logging.info('Sent an email to ' + str(e_to))
s.sendmail(e_from, e_to, mime_msg.as_string())
s.quit()
else:
@ -651,11 +672,13 @@ def get_email_address_list(address_string):
if isinstance(address_string, basestring):
if ',' in address_string:
address_string = address_string.split(',')
elif '\n' in address_string:
address_string = address_string.split('\n')
elif ';' in address_string:
address_string = address_string.split(';')
else:
address_string = [address_string]
return address_string
return [x.strip() for x in address_string if x.strip()]
def choicify(values):
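After this change, `get_email_address_list` also trims whitespace and drops empty entries, so trailing delimiters no longer produce blank recipients. A standalone copy of the parsing logic for illustration:

```python
def get_email_address_list(address_string):
    # Split on the first delimiter found: comma, newline, or semicolon.
    if isinstance(address_string, str):
        if ',' in address_string:
            address_string = address_string.split(',')
        elif '\n' in address_string:
            address_string = address_string.split('\n')
        elif ';' in address_string:
            address_string = address_string.split(';')
        else:
            address_string = [address_string]
    # Strip whitespace and drop entries that are empty after stripping.
    return [x.strip() for x in address_string if x.strip()]

print(get_email_address_list('a@x.com; b@x.com; '))  # -> ['a@x.com', 'b@x.com']
```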


@ -4,3 +4,4 @@ from . import core # noqa
from . import sql_lab # noqa
from . import annotations # noqa
from . import datasource # noqa
from . import schedules # noqa


@ -1635,7 +1635,6 @@ class Superset(BaseSupersetView):
@staticmethod
def _set_dash_metadata(dashboard, data):
positions = data['positions']
# find slices in the position data
slice_ids = []
slice_id_to_name = {}

superset/views/schedules.py Normal file

@ -0,0 +1,279 @@
# pylint: disable=C,R,W
import enum
from croniter import croniter
from flask import flash, g
from flask_appbuilder import expose
from flask_appbuilder.models.sqla.interface import SQLAInterface
from flask_appbuilder.security.decorators import has_access
from flask_babel import gettext as __
from flask_babel import lazy_gettext as _
import simplejson as json
from wtforms import BooleanField, StringField
from superset import app, appbuilder, db, security_manager
from superset.exceptions import SupersetException
from superset.models.core import Dashboard, Slice
from superset.models.schedules import (
DashboardEmailSchedule,
ScheduleType,
SliceEmailSchedule,
)
from superset.tasks.schedules import schedule_email_report
from superset.utils.core import (
get_email_address_list,
json_iso_dttm_ser,
)
from superset.views.core import json_success
from .base import DeleteMixin, SupersetModelView
class EmailScheduleView(SupersetModelView, DeleteMixin):
_extra_data = {
'test_email': False,
'test_email_recipients': None,
}
schedule_type = None
schedule_type_model = None
page_size = 20
add_exclude_columns = [
'user',
'created_on',
'changed_on',
'created_by',
'changed_by',
]
edit_exclude_columns = add_exclude_columns
description_columns = {
'deliver_as_group': 'If enabled, send a single email to all '
'recipients (in email/To: field)',
'crontab': 'Unix style crontab schedule to deliver emails. '
'Changes to schedules take effect within one hour.',
'delivery_type': 'Indicates how the rendered content is delivered',
}
add_form_extra_fields = {
'test_email': BooleanField(
'Send Test Email',
default=False,
description='If enabled, we send a test mail on create / update',
),
'test_email_recipients': StringField(
'Test Email Recipients',
default=None,
description='List of recipients to send test email to. '
'If empty, we send it to the original recipients',
),
}
edit_form_extra_fields = add_form_extra_fields
def process_form(self, form, is_created):
recipients = (form.test_email_recipients.data or '').strip() or None
self._extra_data['test_email'] = form.test_email.data
self._extra_data['test_email_recipients'] = recipients
def pre_add(self, obj):
try:
recipients = get_email_address_list(obj.recipients)
obj.recipients = ', '.join(recipients)
except Exception:
raise SupersetException('Invalid email list')
obj.user = obj.user or g.user
if not croniter.is_valid(obj.crontab):
raise SupersetException('Invalid crontab format')
def pre_update(self, obj):
self.pre_add(obj)
def post_add(self, obj):
# Schedule a test mail if the user requested it.
if self._extra_data['test_email']:
recipients = self._extra_data['test_email_recipients']
args = (self.schedule_type, obj.id)
kwargs = dict(recipients=recipients)
schedule_email_report.apply_async(args=args, kwargs=kwargs)
# Notify the user that schedule changes will take effect only in the
# next hour
if obj.active:
flash('Schedule changes will take effect within one hour', 'warning')
def post_update(self, obj):
self.post_add(obj)
@has_access
@expose('/fetch/<int:item_id>/', methods=['GET'])
def fetch_schedules(self, item_id):
query = db.session.query(self.datamodel.obj)
query = query.join(self.schedule_type_model).filter(
self.schedule_type_model.id == item_id)
schedules = []
for schedule in query.all():
info = {'schedule': schedule.id}
for col in self.list_columns + self.add_exclude_columns:
info[col] = getattr(schedule, col)
if isinstance(info[col], enum.Enum):
info[col] = info[col].name
elif isinstance(info[col], security_manager.user_model):
info[col] = info[col].username
info['user'] = schedule.user.username
info[self.schedule_type] = getattr(schedule, self.schedule_type).id
schedules.append(info)
return json_success(json.dumps(schedules, default=json_iso_dttm_ser))
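`fetch_schedules` above flattens model attributes into JSON-safe values: `Enum` members become their names, user objects become usernames, and datetimes go through `json_iso_dttm_ser`. A stdlib sketch of the same idea (`DeliveryType` is a hypothetical stand-in for the model's enum):

```python
import enum
import json
from datetime import datetime

class DeliveryType(enum.Enum):  # hypothetical stand-in for the model's enum
    attachment = 1
    inline = 2

def to_json_value(value):
    # Mirror the view's conversions: Enum -> name, datetime -> ISO 8601.
    if isinstance(value, enum.Enum):
        return value.name
    if isinstance(value, datetime):
        return value.isoformat()
    return value

record = {'delivery_type': DeliveryType.inline,
          'created_on': datetime(2018, 12, 10, 22, 29)}
payload = json.dumps({k: to_json_value(v) for k, v in record.items()})
print(payload)  # -> {"delivery_type": "inline", "created_on": "2018-12-10T22:29:00"}
```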
class DashboardEmailScheduleView(EmailScheduleView):
schedule_type = ScheduleType.dashboard.name
schedule_type_model = Dashboard
add_title = _('Schedule Email Reports for Dashboards')
edit_title = add_title
list_title = _('Manage Email Reports for Dashboards')
datamodel = SQLAInterface(DashboardEmailSchedule)
order_columns = ['user', 'dashboard', 'created_on']
list_columns = [
'dashboard',
'active',
'crontab',
'user',
'deliver_as_group',
'delivery_type',
]
add_columns = [
'dashboard',
'active',
'crontab',
'recipients',
'deliver_as_group',
'delivery_type',
'test_email',
'test_email_recipients',
]
edit_columns = add_columns
search_columns = [
'dashboard',
'active',
'user',
'deliver_as_group',
'delivery_type',
]
label_columns = {
'dashboard': _('Dashboard'),
'created_on': _('Created On'),
'changed_on': _('Changed On'),
'user': _('User'),
'active': _('Active'),
'crontab': _('Crontab'),
'recipients': _('Recipients'),
'deliver_as_group': _('Deliver As Group'),
'delivery_type': _('Delivery Type'),
}
def pre_add(self, obj):
if obj.dashboard is None:
raise SupersetException('Dashboard is mandatory')
super(DashboardEmailScheduleView, self).pre_add(obj)
class SliceEmailScheduleView(EmailScheduleView):
schedule_type = ScheduleType.slice.name
schedule_type_model = Slice
add_title = _('Schedule Email Reports for Charts')
edit_title = add_title
list_title = _('Manage Email Reports for Charts')
datamodel = SQLAInterface(SliceEmailSchedule)
order_columns = ['user', 'slice', 'created_on']
list_columns = [
'slice',
'active',
'crontab',
'user',
'deliver_as_group',
'delivery_type',
'email_format',
]
add_columns = [
'slice',
'active',
'crontab',
'recipients',
'deliver_as_group',
'delivery_type',
'email_format',
'test_email',
'test_email_recipients',
]
edit_columns = add_columns
search_columns = [
'slice',
'active',
'user',
'deliver_as_group',
'delivery_type',
'email_format',
]
label_columns = {
'slice': _('Chart'),
'created_on': _('Created On'),
'changed_on': _('Changed On'),
'user': _('User'),
'active': _('Active'),
'crontab': _('Crontab'),
'recipients': _('Recipients'),
'deliver_as_group': _('Deliver As Group'),
'delivery_type': _('Delivery Type'),
'email_format': _('Email Format'),
}
def pre_add(self, obj):
if obj.slice is None:
raise SupersetException('Slice is mandatory')
super(SliceEmailScheduleView, self).pre_add(obj)
def _register_schedule_menus():
appbuilder.add_separator('Manage')
appbuilder.add_view(
DashboardEmailScheduleView,
'Dashboard Email Schedules',
label=__('Dashboard Emails'),
category='Manage',
category_label=__('Manage'),
icon='fa-search')
appbuilder.add_view(
SliceEmailScheduleView,
'Chart Email Schedules',
label=__('Chart Emails'),
category='Manage',
category_label=__('Manage'),
icon='fa-search')
if app.config.get('ENABLE_SCHEDULED_EMAIL_REPORTS'):
_register_schedule_menus()
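As the column description above says, `deliver_as_group` decides between one email addressed to the whole recipient list and an individual copy per recipient. A minimal sketch of that fan-out decision (the helper name is hypothetical, not Superset's):

```python
def recipient_batches(recipients, deliver_as_group):
    """Yield one To: list per outgoing email."""
    if deliver_as_group:
        yield list(recipients)        # single email, everyone on To:
    else:
        for recipient in recipients:  # one email per recipient
            yield [recipient]

batches = list(recipient_batches(['r1@x.com', 'r2@x.com'], deliver_as_group=False))
print(batches)  # -> [['r1@x.com'], ['r2@x.com']]
```

This matches the behavior exercised by `test_deliver_email_options` below, where disabling `deliver_as_group` doubles the `send_email_smtp` call count for two recipients.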


@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
"""Unit tests for email service in Superset"""
from email.mime.application import MIMEApplication
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
import logging
import tempfile
@ -10,6 +11,7 @@ import mock
from superset import app
from superset.utils import core as utils
from .utils import read_fixture
send_email_test = mock.Mock()
@ -37,6 +39,39 @@ class EmailSmtpTest(unittest.TestCase):
mimeapp = MIMEApplication('attachment')
assert msg.get_payload()[-1].get_payload() == mimeapp.get_payload()
@mock.patch('superset.utils.core.send_MIME_email')
def test_send_smtp_data(self, mock_send_mime):
utils.send_email_smtp(
'to', 'subject', 'content', app.config, data={'1.txt': b'data'})
assert mock_send_mime.called
call_args = mock_send_mime.call_args[0]
logging.debug(call_args)
assert call_args[0] == app.config.get('SMTP_MAIL_FROM')
assert call_args[1] == ['to']
msg = call_args[2]
assert msg['Subject'] == 'subject'
assert msg['From'] == app.config.get('SMTP_MAIL_FROM')
assert len(msg.get_payload()) == 2
mimeapp = MIMEApplication('data')
assert msg.get_payload()[-1].get_payload() == mimeapp.get_payload()
@mock.patch('superset.utils.core.send_MIME_email')
def test_send_smtp_inline_images(self, mock_send_mime):
image = read_fixture('sample.png')
utils.send_email_smtp(
'to', 'subject', 'content', app.config, images=dict(blah=image))
assert mock_send_mime.called
call_args = mock_send_mime.call_args[0]
logging.debug(call_args)
assert call_args[0] == app.config.get('SMTP_MAIL_FROM')
assert call_args[1] == ['to']
msg = call_args[2]
assert msg['Subject'] == 'subject'
assert msg['From'] == app.config.get('SMTP_MAIL_FROM')
assert len(msg.get_payload()) == 2
mimeapp = MIMEImage(image)
assert msg.get_payload()[-1].get_payload() == mimeapp.get_payload()
@mock.patch('superset.utils.core.send_MIME_email')
def test_send_bcc_smtp(self, mock_send_mime):
attachment = tempfile.NamedTemporaryFile()

tests/fixtures/sample.png vendored Normal file

Binary file not shown.


tests/fixtures/trends.csv vendored Normal file

@ -0,0 +1,3 @@
t1,t2,t3__sum
c11,c12,c13
c21,c22,c23

tests/schedules_test.py Normal file

@ -0,0 +1,368 @@
from datetime import datetime, timedelta
import unittest
from flask_babel import gettext as __
from mock import Mock, patch, PropertyMock
from selenium.common.exceptions import WebDriverException
from superset import app, db
from superset.models.core import Dashboard, Slice
from superset.models.schedules import (
DashboardEmailSchedule,
EmailDeliveryType,
SliceEmailReportFormat,
SliceEmailSchedule,
)
from superset.tasks.schedules import (
create_webdriver,
deliver_dashboard,
deliver_slice,
next_schedules,
)
from .utils import read_fixture
class SchedulesTestCase(unittest.TestCase):
RECIPIENTS = 'recipient1@superset.com, recipient2@superset.com'
BCC = 'bcc@superset.com'
CSV = read_fixture('trends.csv')
@classmethod
def setUpClass(cls):
cls.common_data = dict(
active=True,
crontab='* * * * *',
recipients=cls.RECIPIENTS,
deliver_as_group=True,
delivery_type=EmailDeliveryType.inline,
)
# Pick an arbitrary slice and dashboard
slce = db.session.query(Slice).all()[0]
dashboard = db.session.query(Dashboard).all()[0]
dashboard_schedule = DashboardEmailSchedule(**cls.common_data)
dashboard_schedule.dashboard_id = dashboard.id
dashboard_schedule.user_id = 1
db.session.add(dashboard_schedule)
slice_schedule = SliceEmailSchedule(**cls.common_data)
slice_schedule.slice_id = slce.id
slice_schedule.user_id = 1
slice_schedule.email_format = SliceEmailReportFormat.data
db.session.add(slice_schedule)
db.session.commit()
cls.slice_schedule = slice_schedule.id
cls.dashboard_schedule = dashboard_schedule.id
@classmethod
def tearDownClass(cls):
db.session.query(SliceEmailSchedule).filter_by(id=cls.slice_schedule).delete()
db.session.query(DashboardEmailSchedule).filter_by(
id=cls.dashboard_schedule).delete()
db.session.commit()
def test_crontab_scheduler(self):
crontab = '* * * * *'
start_at = datetime.now().replace(microsecond=0, second=0, minute=0)
stop_at = start_at + timedelta(seconds=3600)
# Fire off the task every minute
schedules = list(next_schedules(crontab, start_at, stop_at, resolution=0))
self.assertEqual(schedules[0], start_at)
self.assertEqual(schedules[-1], stop_at - timedelta(seconds=60))
self.assertEqual(len(schedules), 60)
# Fire off the task every 10 minutes, controlled via resolution
schedules = list(next_schedules(crontab, start_at, stop_at, resolution=10 * 60))
self.assertEqual(schedules[0], start_at)
self.assertEqual(schedules[-1], stop_at - timedelta(seconds=10 * 60))
self.assertEqual(len(schedules), 6)
# Fire off the task every 12 minutes, controlled via resolution
schedules = list(next_schedules(crontab, start_at, stop_at, resolution=12 * 60))
self.assertEqual(schedules[0], start_at)
self.assertEqual(schedules[-1], stop_at - timedelta(seconds=12 * 60))
self.assertEqual(len(schedules), 5)
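The test above shows `next_schedules` expanding a crontab over a one-hour window and thinning the firings so consecutive ones are at least `resolution` seconds apart. A stdlib sketch of just the thinning step (the real expansion uses croniter, so plain minute ticks stand in here):

```python
from datetime import datetime, timedelta

def thin_schedules(times, resolution):
    """Drop any time closer than `resolution` seconds to the last kept one."""
    last = None
    for ts in times:
        if last is None or (ts - last).total_seconds() >= resolution:
            last = ts
            yield ts

start = datetime(2018, 1, 1)
ticks = [start + timedelta(minutes=i) for i in range(60)]  # every minute
kept = list(thin_schedules(ticks, resolution=10 * 60))     # every 10 minutes
print(len(kept))  # -> 6
```

With a 10-minute resolution the hour yields 6 firings, matching the assertion in `test_crontab_scheduler`.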
def test_wider_schedules(self):
crontab = '*/15 2,10 * * *'
for hour in range(0, 24):
start_at = datetime.now().replace(
microsecond=0, second=0, minute=0, hour=hour)
stop_at = start_at + timedelta(seconds=3600)
schedules = list(next_schedules(crontab, start_at, stop_at, resolution=0))
if hour in (2, 10):
self.assertEqual(len(schedules), 4)
else:
self.assertEqual(len(schedules), 0)
def test_complex_schedule(self):
# Run the job on every Friday of March and May
# On these days, run the job at
# 5:10 pm
# 5:11 pm
# 5:12 pm
# 5:13 pm
# 5:14 pm
# 5:15 pm
# 5:25 pm
# 5:28 pm
# 5:31 pm
# 5:34 pm
# 5:37 pm
# 5:40 pm
crontab = '10-15,25-40/3 17 * 3,5 5'
start_at = datetime.strptime('2018/01/01', '%Y/%m/%d')
stop_at = datetime.strptime('2018/12/31', '%Y/%m/%d')
schedules = list(next_schedules(crontab, start_at, stop_at, resolution=60))
self.assertEqual(len(schedules), 108)
fmt = '%Y-%m-%d %H:%M:%S'
self.assertEqual(schedules[0], datetime.strptime('2018-03-02 17:10:00', fmt))
self.assertEqual(schedules[-1], datetime.strptime('2018-05-25 17:40:00', fmt))
self.assertEqual(schedules[59], datetime.strptime('2018-03-30 17:40:00', fmt))
self.assertEqual(schedules[60], datetime.strptime('2018-05-04 17:10:00', fmt))
@patch('superset.tasks.schedules.firefox.webdriver.WebDriver')
def test_create_driver(self, mock_driver_class):
mock_driver = Mock()
mock_driver_class.return_value = mock_driver
mock_driver.find_elements_by_id.side_effect = [True, False]
create_webdriver()
create_webdriver()
mock_driver.add_cookie.assert_called_once()
@patch('superset.tasks.schedules.firefox.webdriver.WebDriver')
@patch('superset.tasks.schedules.send_email_smtp')
@patch('superset.tasks.schedules.time')
def test_deliver_dashboard_inline(self, mtime, send_email_smtp, driver_class):
element = Mock()
driver = Mock()
mtime.sleep.return_value = None
driver_class.return_value = driver
# Ensure that we are able to login with the driver
driver.find_elements_by_id.side_effect = [True, False]
driver.find_element_by_class_name.return_value = element
element.screenshot_as_png = read_fixture('sample.png')
schedule = db.session.query(DashboardEmailSchedule).filter_by(
id=self.dashboard_schedule).all()[0]
deliver_dashboard(schedule)
mtime.sleep.assert_called_once()
driver.screenshot.assert_not_called()
send_email_smtp.assert_called_once()
@patch('superset.tasks.schedules.firefox.webdriver.WebDriver')
@patch('superset.tasks.schedules.send_email_smtp')
@patch('superset.tasks.schedules.time')
def test_deliver_dashboard_as_attachment(self, mtime, send_email_smtp, driver_class):
element = Mock()
driver = Mock()
mtime.sleep.return_value = None
driver_class.return_value = driver
# Ensure that we are able to login with the driver
driver.find_elements_by_id.side_effect = [True, False]
driver.find_element_by_id.return_value = element
driver.find_element_by_class_name.return_value = element
element.screenshot_as_png = read_fixture('sample.png')
schedule = db.session.query(DashboardEmailSchedule).filter_by(
id=self.dashboard_schedule).all()[0]
schedule.delivery_type = EmailDeliveryType.attachment
deliver_dashboard(schedule)
mtime.sleep.assert_called_once()
driver.screenshot.assert_not_called()
send_email_smtp.assert_called_once()
self.assertIsNone(send_email_smtp.call_args[1]['images'])
self.assertEqual(
send_email_smtp.call_args[1]['data']['screenshot.png'],
element.screenshot_as_png,
)
@patch('superset.tasks.schedules.firefox.webdriver.WebDriver')
@patch('superset.tasks.schedules.send_email_smtp')
@patch('superset.tasks.schedules.time')
def test_dashboard_chrome_like(self, mtime, send_email_smtp, driver_class):
# Test functionality for chrome driver which does not support
# element snapshots
element = Mock()
driver = Mock()
mtime.sleep.return_value = None
type(element).screenshot_as_png = PropertyMock(side_effect=WebDriverException)
driver_class.return_value = driver
# Ensure that we are able to login with the driver
driver.find_elements_by_id.side_effect = [True, False]
driver.find_element_by_id.return_value = element
driver.find_element_by_class_name.return_value = element
driver.screenshot.return_value = read_fixture('sample.png')
schedule = db.session.query(DashboardEmailSchedule).filter_by(
id=self.dashboard_schedule).all()[0]
deliver_dashboard(schedule)
mtime.sleep.assert_called_once()
driver.screenshot.assert_called_once()
send_email_smtp.assert_called_once()
self.assertEqual(send_email_smtp.call_args[0][0], self.RECIPIENTS)
self.assertEqual(
list(send_email_smtp.call_args[1]['images'].values())[0],
driver.screenshot.return_value,
)
@patch('superset.tasks.schedules.firefox.webdriver.WebDriver')
@patch('superset.tasks.schedules.send_email_smtp')
@patch('superset.tasks.schedules.time')
def test_deliver_email_options(self, mtime, send_email_smtp, driver_class):
element = Mock()
driver = Mock()
mtime.sleep.return_value = None
driver_class.return_value = driver
# Ensure that we are able to login with the driver
driver.find_elements_by_id.side_effect = [True, False]
driver.find_element_by_class_name.return_value = element
element.screenshot_as_png = read_fixture('sample.png')
schedule = db.session.query(DashboardEmailSchedule).filter_by(
id=self.dashboard_schedule).all()[0]
# Send individual mails to the group
schedule.deliver_as_group = False
# Set a bcc email address
app.config['EMAIL_REPORT_BCC_ADDRESS'] = self.BCC
deliver_dashboard(schedule)
mtime.sleep.assert_called_once()
driver.screenshot.assert_not_called()
self.assertEqual(send_email_smtp.call_count, 2)
self.assertEqual(send_email_smtp.call_args[1]['bcc'], self.BCC)
@patch('superset.tasks.schedules.firefox.webdriver.WebDriver')
@patch('superset.tasks.schedules.send_email_smtp')
@patch('superset.tasks.schedules.time')
def test_deliver_slice_inline_image(self, mtime, send_email_smtp, driver_class):
element = Mock()
driver = Mock()
mtime.sleep.return_value = None
driver_class.return_value = driver
# Ensure that we are able to login with the driver
driver.find_elements_by_id.side_effect = [True, False]
driver.find_element_by_class_name.return_value = element
element.screenshot_as_png = read_fixture('sample.png')
schedule = db.session.query(SliceEmailSchedule).filter_by(
id=self.slice_schedule).all()[0]
schedule.email_format = SliceEmailReportFormat.visualization
schedule.delivery_type = EmailDeliveryType.inline
deliver_slice(schedule)
mtime.sleep.assert_called_once()
driver.screenshot.assert_not_called()
send_email_smtp.assert_called_once()
self.assertEqual(
list(send_email_smtp.call_args[1]['images'].values())[0],
element.screenshot_as_png,
)
@patch('superset.tasks.schedules.firefox.webdriver.WebDriver')
@patch('superset.tasks.schedules.send_email_smtp')
@patch('superset.tasks.schedules.time')
def test_deliver_slice_attachment(self, mtime, send_email_smtp, driver_class):
element = Mock()
driver = Mock()
mtime.sleep.return_value = None
driver_class.return_value = driver
# Ensure that we are able to login with the driver
driver.find_elements_by_id.side_effect = [True, False]
driver.find_element_by_class_name.return_value = element
element.screenshot_as_png = read_fixture('sample.png')
schedule = db.session.query(SliceEmailSchedule).filter_by(
id=self.slice_schedule).all()[0]
schedule.email_format = SliceEmailReportFormat.visualization
schedule.delivery_type = EmailDeliveryType.attachment
deliver_slice(schedule)
mtime.sleep.assert_called_once()
driver.screenshot.assert_not_called()
send_email_smtp.assert_called_once()
self.assertEqual(
send_email_smtp.call_args[1]['data']['screenshot.png'],
element.screenshot_as_png,
)
@patch('superset.tasks.schedules.requests.get')
@patch('superset.tasks.schedules.send_email_smtp')
def test_deliver_slice_csv_attachment(self, send_email_smtp, get):
response = Mock()
get.return_value = response
response.raise_for_status.return_value = None
response.content = self.CSV
schedule = db.session.query(SliceEmailSchedule).filter_by(
id=self.slice_schedule).all()[0]
schedule.email_format = SliceEmailReportFormat.data
schedule.delivery_type = EmailDeliveryType.attachment
deliver_slice(schedule)
send_email_smtp.assert_called_once()
file_name = __('%(name)s.csv', name=schedule.slice.slice_name)
self.assertEqual(
send_email_smtp.call_args[1]['data'][file_name],
self.CSV,
)
@patch('superset.tasks.schedules.requests.get')
@patch('superset.tasks.schedules.send_email_smtp')
def test_deliver_slice_csv_inline(self, send_email_smtp, get):
response = Mock()
get.return_value = response
response.raise_for_status.return_value = None
response.content = self.CSV
schedule = db.session.query(SliceEmailSchedule).filter_by(
id=self.slice_schedule).all()[0]
schedule.email_format = SliceEmailReportFormat.data
schedule.delivery_type = EmailDeliveryType.inline
deliver_slice(schedule)
send_email_smtp.assert_called_once()
self.assertIsNone(send_email_smtp.call_args[1]['data'])
self.assertIn('<table ', send_email_smtp.call_args[0][2])
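The inline-CSV test only checks that the rendered body contains a `<table` element. One way to produce such a body from CSV text, sketched with the stdlib (the actual Superset task may render the table differently, e.g. via a DataFrame):

```python
import csv
import io

def csv_to_html_table(csv_text):
    """Render CSV text as a minimal HTML table (header row + body rows)."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    head = ''.join('<th>%s</th>' % cell for cell in rows[0])
    body = ''.join(
        '<tr>%s</tr>' % ''.join('<td>%s</td>' % cell for cell in row)
        for row in rows[1:])
    return '<table border="1"><tr>%s</tr>%s</table>' % (head, body)

html = csv_to_html_table('t1,t2,t3__sum\nc11,c12,c13\nc21,c22,c23\n')
print('<table ' in html)  # -> True
```

Fed the `trends.csv` fixture above, this produces a body the test's `'<table '` check would accept.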


@ -4,6 +4,10 @@ from os import path
FIXTURES_DIR = 'tests/fixtures'
def read_fixture(fixture_file_name):
with open(path.join(FIXTURES_DIR, fixture_file_name), 'rb') as fixture_file:
return fixture_file.read()
def load_fixture(fixture_file_name):
with open(path.join(FIXTURES_DIR, fixture_file_name)) as fixture_file:
return json.load(fixture_file)
return json.loads(read_fixture(fixture_file_name))