chore: remove old alerts and configs keys (#19261)

* remove templates

* remove models and more templates

* remove view

* remove tasks

* remove views

* remove schedule models

* remove views, init files config code

* remove supersetapp init code

* remove tests and clean up pylint errors

* remove unused import

* pylint

* run black

* remove deprecation notice
This commit is contained in:
Phillip Kelley-Dotson 2022-03-25 13:25:44 -07:00 committed by GitHub
parent 2b53578ad7
commit 394c9a19fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 6 additions and 3020 deletions

View File

@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime, timedelta
import click
from colorama import Fore
@ -23,7 +22,6 @@ from flask.cli import with_appcontext
import superset.utils.database as database_utils
from superset import app, security_manager
from superset.utils.celery import session_scope
logger = logging.getLogger(__name__)
@ -88,23 +86,3 @@ def load_test_users_run() -> None:
password="general",
)
sm.get_session.commit()
@click.command()
@with_appcontext
def alert() -> None:
    """Run the alert scheduler loop"""
    # this command is just for testing purposes
    # pylint: disable=import-outside-toplevel
    from superset.models.schedules import ScheduleType
    from superset.tasks.schedules import schedule_window

    click.secho("Processing one alert loop", fg="green")
    # Run a single scheduling window spanning the last 1000 days up to "now",
    # so any pending alert is picked up regardless of its crontab phase.
    with session_scope(nullpool=True) as session:
        schedule_window(
            report_type=ScheduleType.alert,
            start_at=datetime.now() - timedelta(1000),
            stop_at=datetime.now(),
            resolution=6000,
            session=session,
        )

View File

@ -1051,18 +1051,6 @@ def SQL_QUERY_MUTATOR( # pylint: disable=invalid-name,unused-argument
return sql
# Enable / disable scheduled email reports
#
# Warning: This config key is deprecated and will be removed in version 2.0.0"
ENABLE_SCHEDULED_EMAIL_REPORTS = False
# Enable / disable Alerts, where users can define custom SQL that
# will send emails with screenshots of charts or dashboards periodically
# if it meets the criteria
#
# Warning: This config key is deprecated and will be removed in version 2.0.0"
ENABLE_ALERTS = False
# ---------------------------------------------------
# Alerts & Reports
# ---------------------------------------------------

View File

@ -150,13 +150,7 @@ class SupersetAppInitializer: # pylint: disable=too-many-public-methods
from superset.reports.logs.api import ReportExecutionLogRestApi
from superset.security.api import SecurityRestApi
from superset.views.access_requests import AccessRequestsModelView
from superset.views.alerts import (
AlertLogModelView,
AlertModelView,
AlertObservationModelView,
AlertView,
ReportView,
)
from superset.views.alerts import AlertView
from superset.views.annotations import (
AnnotationLayerModelView,
AnnotationModelView,
@ -185,10 +179,6 @@ class SupersetAppInitializer: # pylint: disable=too-many-public-methods
from superset.views.log.api import LogRestApi
from superset.views.log.views import LogModelView
from superset.views.redirects import R
from superset.views.schedules import (
DashboardEmailScheduleView,
SliceEmailScheduleView,
)
from superset.views.sql_lab import (
SavedQueryView,
SavedQueryViewApi,
@ -393,50 +383,6 @@ class SupersetAppInitializer: # pylint: disable=too-many-public-methods
#
# Conditionally setup email views
#
if self.config["ENABLE_SCHEDULED_EMAIL_REPORTS"]:
logging.warning(
"ENABLE_SCHEDULED_EMAIL_REPORTS "
"is deprecated and will be removed in version 2.0.0"
)
appbuilder.add_separator(
"Manage", cond=lambda: self.config["ENABLE_SCHEDULED_EMAIL_REPORTS"]
)
appbuilder.add_view(
DashboardEmailScheduleView,
"Dashboard Email Schedules",
label=__("Dashboard Emails"),
category="Manage",
category_label=__("Manage"),
icon="fa-search",
menu_cond=lambda: self.config["ENABLE_SCHEDULED_EMAIL_REPORTS"],
)
appbuilder.add_view(
SliceEmailScheduleView,
"Chart Emails",
label=__("Chart Email Schedules"),
category="Manage",
category_label=__("Manage"),
icon="fa-search",
menu_cond=lambda: self.config["ENABLE_SCHEDULED_EMAIL_REPORTS"],
)
if self.config["ENABLE_ALERTS"]:
logging.warning(
"ENABLE_ALERTS is deprecated and will be removed in version 2.0.0"
)
appbuilder.add_view(
AlertModelView,
"Alerts",
label=__("Alerts"),
category="Manage",
category_label=__("Manage"),
icon="fa-exclamation-triangle",
menu_cond=lambda: bool(self.config["ENABLE_ALERTS"]),
)
appbuilder.add_view_no_menu(AlertLogModelView)
appbuilder.add_view_no_menu(AlertObservationModelView)
appbuilder.add_view(
AlertView,
@ -447,7 +393,6 @@ class SupersetAppInitializer: # pylint: disable=too-many-public-methods
icon="fa-exclamation-triangle",
menu_cond=lambda: feature_flag_manager.is_feature_enabled("ALERT_REPORTS"),
)
appbuilder.add_view_no_menu(ReportView)
appbuilder.add_view(
AccessRequestsModelView,

View File

@ -14,12 +14,4 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from . import (
alerts,
core,
datasource_access_request,
dynamic_plugins,
schedules,
sql_lab,
user_attributes,
)
from . import core, datasource_access_request, dynamic_plugins, sql_lab, user_attributes

View File

@ -1,176 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Models for scheduled execution of jobs"""
import json
import textwrap
from datetime import datetime
from typing import Any, Optional
from flask_appbuilder import Model
from sqlalchemy import (
Boolean,
Column,
DateTime,
Float,
ForeignKey,
Integer,
String,
Table,
Text,
)
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import backref, relationship, RelationshipProperty
from superset import db, security_manager
from superset.models.helpers import AuditMixinNullable
metadata = Model.metadata # pylint: disable=no-member
alert_owner = Table(
"alert_owner",
metadata,
Column("id", Integer, primary_key=True),
Column("user_id", Integer, ForeignKey("ab_user.id")),
Column("alert_id", Integer, ForeignKey("alerts.id")),
)
class Alert(Model, AuditMixinNullable):
    """Schedules for emailing slices / dashboards

    An alert periodically runs ``sql`` against ``database`` and records the
    result as a SQLObservation; a validator (``validator_type`` +
    ``validator_config``) decides whether the observation should trigger.
    """

    __tablename__ = "alerts"

    id = Column(Integer, primary_key=True)
    label = Column(String(150), nullable=False)
    active = Column(Boolean, default=True, index=True)
    # TODO(bkyryliuk): enforce minimal supported frequency
    crontab = Column(String(50), nullable=False)
    alert_type = Column(String(50))
    owners = relationship(security_manager.user_model, secondary=alert_owner)
    recipients = Column(Text)
    slack_channel = Column(Text)

    # TODO(bkyryliuk): implement log_retention
    log_retention = Column(Integer, default=90)
    # seconds to wait before re-triggering; default is one day
    grace_period = Column(Integer, default=60 * 60 * 24)

    slice_id = Column(Integer, ForeignKey("slices.id"))
    slice = relationship("Slice", backref="alerts", foreign_keys=[slice_id])

    dashboard_id = Column(Integer, ForeignKey("dashboards.id"))
    dashboard = relationship("Dashboard", backref="alert", foreign_keys=[dashboard_id])

    last_eval_dttm = Column(DateTime, default=datetime.utcnow)
    last_state = Column(String(10))

    # Observation related columns
    sql = Column(Text, nullable=False)

    # Validation related columns
    validator_type = Column(String(100), nullable=False)
    validator_config = Column(
        Text,
        default=textwrap.dedent(
            """
        {
        }
        """
        ),
    )

    @declared_attr
    def database_id(self) -> int:
        return Column(Integer, ForeignKey("dbs.id"), nullable=False)

    @declared_attr
    def database(self) -> RelationshipProperty:
        return relationship(
            "Database",
            foreign_keys=[self.database_id],
            backref=backref("sql_observers", cascade="all, delete-orphan"),
        )

    def get_last_observation(self) -> Optional[Any]:
        """Return the most recent SQLObservation for this alert, or None."""
        observations = list(
            db.session.query(SQLObservation)
            .filter_by(alert_id=self.id)
            .order_by(SQLObservation.dttm.desc())
            .limit(1)
        )

        if observations:
            return observations[0]

        return None

    def __str__(self) -> str:
        return f"<{self.id}:{self.label}>"

    @property
    def pretty_config(self) -> str:
        """String representing the comparison that will trigger a validator"""
        config = json.loads(self.validator_config)

        if self.validator_type.lower() == "operator":
            return f"{config['op']} {config['threshold']}"

        if self.validator_type.lower() == "not null":
            return "!= Null or 0"

        return ""
class AlertLog(Model):
    """Keeps track of alert-related operations"""

    __tablename__ = "alert_logs"

    id = Column(Integer, primary_key=True)
    scheduled_dttm = Column(DateTime)
    dttm_start = Column(DateTime, default=datetime.utcnow)
    dttm_end = Column(DateTime, default=datetime.utcnow)
    alert_id = Column(Integer, ForeignKey("alerts.id"))
    alert = relationship("Alert", backref="logs", foreign_keys=[alert_id])
    state = Column(String(10))

    @property
    def duration(self) -> int:
        """Elapsed run time in seconds (dttm_end - dttm_start)."""
        return (self.dttm_end - self.dttm_start).total_seconds()
# TODO: Currently SQLObservation table will constantly grow with no limit,
# add some retention restriction or more to a more scalable db e.g.
# https://github.com/apache/superset/blob/master/superset/utils/log.py#L32
class SQLObservation(Model):  # pylint: disable=too-few-public-methods
    """Keeps track of the collected observations for alerts."""

    __tablename__ = "sql_observations"

    id = Column(Integer, primary_key=True)
    dttm = Column(DateTime, default=datetime.utcnow, index=True)
    alert_id = Column(Integer, ForeignKey("alerts.id"))
    alert = relationship(
        "Alert",
        foreign_keys=[alert_id],
        backref=backref("observations", cascade="all, delete-orphan"),
    )
    # value observed by the alert's SQL query; None when no numeric value
    value = Column(Float)
    # populated when the observation was invalid (see validate_observer_result)
    error_msg = Column(String(500))

View File

@ -1,104 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Models for scheduled execution of jobs"""
import enum
from typing import Optional, Type
from flask_appbuilder import Model
from sqlalchemy import Boolean, Column, Enum, ForeignKey, Integer, String, Text
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import relationship, RelationshipProperty
from superset import security_manager
from superset.models.alerts import Alert
from superset.models.helpers import AuditMixinNullable, ImportExportMixin
metadata = Model.metadata # pylint: disable=no-member
class ScheduleType(str, enum.Enum):
    """Kind of object an email/alert schedule refers to."""

    # pylint: disable=invalid-name
    slice = "slice"
    dashboard = "dashboard"
    alert = "alert"
class EmailDeliveryType(str, enum.Enum):
    """How report content is embedded into the email."""

    # pylint: disable=invalid-name
    attachment = "Attachment"
    inline = "Inline"
class SliceEmailReportFormat(str, enum.Enum):
    """Format of a chart (slice) email report."""

    # pylint: disable=invalid-name
    visualization = "Visualization"
    data = "Raw data"
class EmailSchedule:
    """Schedules for emailing slices / dashboards

    Column mixin shared by the concrete Dashboard/Slice email schedule models;
    not a SQLAlchemy model by itself.
    """

    __tablename__ = "email_schedules"

    id = Column(Integer, primary_key=True)
    active = Column(Boolean, default=True, index=True)
    crontab = Column(String(50))

    @declared_attr
    def user_id(self) -> int:
        return Column(Integer, ForeignKey("ab_user.id"))

    @declared_attr
    def user(self) -> RelationshipProperty:
        # backref is named after the concrete subclass's table
        return relationship(
            security_manager.user_model,
            backref=self.__tablename__,
            foreign_keys=[self.user_id],
        )

    recipients = Column(Text)
    slack_channel = Column(Text)
    deliver_as_group = Column(Boolean, default=False)
    delivery_type = Column(Enum(EmailDeliveryType))
class DashboardEmailSchedule(
    Model, AuditMixinNullable, ImportExportMixin, EmailSchedule
):
    """Email schedule attached to a dashboard."""

    __tablename__ = "dashboard_email_schedules"

    dashboard_id = Column(Integer, ForeignKey("dashboards.id"))
    dashboard = relationship(
        "Dashboard", backref="email_schedules", foreign_keys=[dashboard_id]
    )
class SliceEmailSchedule(Model, AuditMixinNullable, ImportExportMixin, EmailSchedule):
    """Email schedule attached to a chart (slice)."""

    __tablename__ = "slice_email_schedules"

    slice_id = Column(Integer, ForeignKey("slices.id"))
    slice = relationship("Slice", backref="email_schedules", foreign_keys=[slice_id])
    email_format = Column(Enum(SliceEmailReportFormat))
def get_scheduler_model(report_type: str) -> Optional[Type[EmailSchedule]]:
    """Return the model class backing a schedule type, or None if unknown."""
    model_by_type = (
        (ScheduleType.dashboard, DashboardEmailSchedule),
        (ScheduleType.slice, SliceEmailSchedule),
        (ScheduleType.alert, Alert),
    )
    for schedule_type, model in model_by_type:
        if report_type == schedule_type:
            return model
    return None

View File

@ -1,17 +0,0 @@
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

View File

@ -1,96 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime
from typing import Optional
import pandas as pd
from sqlalchemy.orm import Session
from superset import jinja_context
from superset.models.alerts import Alert, SQLObservation
logger = logging.getLogger("tasks.email_reports")
# Session needs to be passed along in the celery workers and db.session cannot be used.
# For more info see: https://github.com/apache/superset/issues/10530
def observe(alert_id: int, session: Session) -> Optional[str]:
    """Collect observations for the alert.

    Renders and runs the alert's SQL, validates the result, and records a
    SQLObservation row (committing the session).

    Returns an error message if the observer value was not valid
    """
    alert = session.query(Alert).filter_by(id=alert_id).one()

    value = None

    # Render Jinja in the alert's SQL against its database before executing
    tp = jinja_context.get_template_processor(database=alert.database)
    rendered_sql = tp.process_template(alert.sql)
    df = alert.database.get_df(rendered_sql)

    error_msg = validate_observer_result(df, alert.id, alert.label)

    # Only record a numeric value for valid, non-empty, non-null results
    if not error_msg and not df.empty and df.to_records()[0][1] is not None:
        value = float(df.to_records()[0][1])

    observation = SQLObservation(
        alert_id=alert_id, dttm=datetime.utcnow(), value=value, error_msg=error_msg,
    )

    session.add(observation)
    session.commit()

    return error_msg
def validate_observer_result(
    sql_result: pd.DataFrame, alert_id: int, alert_label: str
) -> Optional[str]:
    """
    Verifies if a DataFrame SQL query result to see if
    it contains a valid value for a SQLObservation.

    A valid result is empty, or exactly one row with one value column whose
    value is None or numeric.

    Returns an error message if the result is invalid.
    """
    if sql_result.empty:
        # empty results are used for the not null validator
        return None

    rows = sql_result.to_records()

    # Explicit checks instead of `assert`: assertions are stripped when
    # Python runs with -O, which would silently disable this validation.
    if len(rows) != 1:
        return f"Observer for alert <{alert_id}:{alert_label}> returned more than 1 row"

    # to_records() prepends the index, so one value column yields 2 fields
    if len(rows[0]) != 2:
        return (
            f"Observer for alert <{alert_id}:{alert_label}> "
            "returned more than 1 column"
        )

    if rows[0][1] is None:
        return None

    try:
        float(rows[0][1])
    except (TypeError, ValueError):
        return (
            f"Observer for alert <{alert_id}:{alert_label}> returned a non-number value"
        )

    return None

View File

@ -1,111 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import enum
import json
from operator import eq, ge, gt, le, lt, ne
from typing import Callable, Optional
import numpy as np
from superset.exceptions import SupersetException
from superset.models.alerts import Alert
OPERATOR_FUNCTIONS = {">=": ge, ">": gt, "<=": le, "<": lt, "==": eq, "!=": ne}
class AlertValidatorType(str, enum.Enum):
    """Names of the supported alert validators."""

    NOT_NULL = "not null"
    OPERATOR = "operator"

    @classmethod
    def valid_type(cls, validator_type: str) -> bool:
        """Return True when ``validator_type`` names a known validator."""
        known_types = {member.value for member in cls}
        return validator_type in known_types
def check_validator(validator_type: str, config: str) -> None:
    """Validate an alert's validator configuration.

    :param validator_type: one of the AlertValidatorType values
    :param config: JSON-encoded validator configuration
    :raises SupersetException: when the type is unknown, or when an operator
        validator is missing "op"/"threshold", uses an unknown operator, or
        has a non-numeric threshold
    """
    if not AlertValidatorType.valid_type(validator_type):
        raise SupersetException(
            f"Error: {validator_type} is not a valid validator type."
        )

    config_dict = json.loads(config)

    if validator_type == AlertValidatorType.OPERATOR.value:
        if not (config_dict.get("op") and config_dict.get("threshold") is not None):
            raise SupersetException(
                "Error: Operator Validator needs specified operator and threshold "
                'values. Add "op" and "threshold" to config.'
            )

        # idiom: `x not in y` instead of `not x in y.keys()`
        if config_dict["op"] not in OPERATOR_FUNCTIONS:
            raise SupersetException(
                f'Error: {config_dict["op"]} is an invalid operator type. Change '
                f'the "op" value in the config to one of '
                f'["<", "<=", ">", ">=", "==", "!="]'
            )

        if not isinstance(config_dict["threshold"], (int, float)):
            raise SupersetException(
                f'Error: {config_dict["threshold"]} is an invalid threshold value.'
                f' Change the "threshold" value in the config.'
            )
def not_null_validator(
    alert: Alert, validator_config: str  # pylint: disable=unused-argument
) -> bool:
    """Returns True if a recent observation is not NULL"""
    latest = alert.get_last_observation()

    # TODO: Validate malformed observations/observations with errors separately
    if not latest:
        return False
    if latest.error_msg:
        return False
    return latest.value not in (0, None, np.nan)
def operator_validator(alert: Alert, validator_config: str) -> bool:
    """
    Returns True if a recent observation is greater than or equal to
    the value given in the validator config
    """
    latest = alert.get_last_observation()
    if not latest or latest.value in (None, np.nan):
        return False

    config = json.loads(validator_config)
    compare = OPERATOR_FUNCTIONS[config["op"]]
    return compare(latest.value, config["threshold"])
def get_validator_function(
    validator_type: str,
) -> Optional[Callable[[Alert, str], bool]]:
    """Returns a validation function based on validator_type"""
    dispatch = {
        AlertValidatorType.NOT_NULL.value: not_null_validator,
        AlertValidatorType.OPERATOR.value: operator_validator,
    }
    return dispatch.get(validator_type.lower())

View File

@ -32,7 +32,7 @@ flask_app = create_app()
# Need to import late, as the celery_app will have been setup by "create_app()"
# pylint: disable=wrong-import-position, unused-import
from . import cache, schedules, scheduler # isort:skip
from . import cache, scheduler # isort:skip
# Export the celery app globally for Celery (as run on the cmd line) to find
app = celery_app

View File

@ -1,855 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
DEPRECATION NOTICE: this module is deprecated as of v1.0.0.
It will be removed in future versions of Superset. Please
migrate to the new scheduler: `superset.tasks.scheduler`.
"""
import logging
import time
import urllib.request
from collections import namedtuple
from datetime import datetime, timedelta
from email.utils import make_msgid, parseaddr
from enum import Enum
from typing import (
Any,
Callable,
Dict,
Iterator,
NamedTuple,
Optional,
Tuple,
TYPE_CHECKING,
Union,
)
from urllib.error import URLError
import croniter
import simplejson as json
from celery.app.task import Task
from dateutil.tz import tzlocal
from flask import current_app, render_template, url_for
from flask_babel import gettext as __
from selenium.common.exceptions import WebDriverException
from selenium.webdriver import chrome, firefox
from selenium.webdriver.remote.webdriver import WebDriver
from sqlalchemy import func
from sqlalchemy.exc import NoSuchColumnError, ResourceClosedError
from sqlalchemy.orm import Session
from superset import app, security_manager, thumbnail_cache
from superset.extensions import celery_app, machine_auth_provider_factory
from superset.models.alerts import Alert, AlertLog
from superset.models.dashboard import Dashboard
from superset.models.schedules import (
EmailDeliveryType,
get_scheduler_model,
ScheduleType,
SliceEmailReportFormat,
)
from superset.models.slice import Slice
from superset.tasks.alerts.observer import observe
from superset.tasks.alerts.validator import get_validator_function
from superset.tasks.slack_util import deliver_slack_msg
from superset.utils.celery import session_scope
from superset.utils.core import get_email_address_list, send_email_smtp
from superset.utils.retries import retry_call
from superset.utils.screenshots import ChartScreenshot, WebDriverProxy
from superset.utils.urls import get_url_path
if TYPE_CHECKING:
from flask_appbuilder.security.sqla.models import User
from werkzeug.datastructures import TypeConversionDict
# Globals
config = app.config
logger = logging.getLogger("tasks.email_reports")
logger.setLevel(logging.INFO)
stats_logger = current_app.config["STATS_LOGGER"]
EMAIL_PAGE_RENDER_WAIT = config["EMAIL_PAGE_RENDER_WAIT"]
WEBDRIVER_BASEURL = config["WEBDRIVER_BASEURL"]
WEBDRIVER_BASEURL_USER_FRIENDLY = config["WEBDRIVER_BASEURL_USER_FRIENDLY"]
ReportContent = namedtuple(
"ReportContent",
[
"body", # email body
"data", # attachments
"images", # embedded images for the email
"slack_message", # html not supported, only markdown
# attachments for the slack message, embedding not supported
"slack_attachment",
],
)
class ScreenshotData(NamedTuple):
    """A captured screenshot plus the URL it was taken from."""

    url: str  # url to chart/dashboard for this screenshot
    image: Optional[bytes]  # bytes for the screenshot
class AlertContent(NamedTuple):
    """Everything needed to render an alert notification."""

    label: str  # alert name
    sql: str  # sql statement for alert
    observation_value: str  # value from observation that triggered the alert
    validation_error_message: str  # a string of the comparison that triggered an alert
    alert_url: str  # url to alert details
    image_data: Optional[ScreenshotData]  # data for the alert screenshot
def _get_email_to_and_bcc(
    recipients: str, deliver_as_group: bool
) -> Iterator[Tuple[str, str]]:
    """Yield (to, bcc) pairs: one group email, or one email per recipient."""
    bcc = config["EMAIL_REPORT_BCC_ADDRESS"]

    if deliver_as_group:
        targets = [recipients]
    else:
        targets = get_email_address_list(recipients)

    for to in targets:
        yield (to, bcc)
# TODO(bkyryliuk): move email functionality into a separate module.
def _deliver_email(  # pylint: disable=too-many-arguments
    recipients: str,
    deliver_as_group: bool,
    subject: str,
    body: str,
    data: Optional[Dict[str, Any]],
    images: Optional[Dict[str, bytes]],
) -> None:
    """Send the report email to each (to, bcc) pair via SMTP.

    ``data`` carries attachments and ``images`` embedded images; delivery is a
    dry run when SCHEDULED_EMAIL_DEBUG_MODE is enabled.
    """
    for (to, bcc) in _get_email_to_and_bcc(recipients, deliver_as_group):
        send_email_smtp(
            to,
            subject,
            body,
            config,
            data=data,
            images=images,
            bcc=bcc,
            # "related" allows referencing embedded images from the HTML body
            mime_subtype="related",
            dryrun=config["SCHEDULED_EMAIL_DEBUG_MODE"],
        )
def _generate_report_content(
    delivery_type: EmailDeliveryType, screenshot: bytes, name: str, url: str
) -> ReportContent:
    """Build the email body/attachments and slack message for a screenshot.

    The screenshot is either attached (EmailDeliveryType.attachment) or
    embedded inline via a cid reference (EmailDeliveryType.inline).
    """
    data: Optional[Dict[str, Any]]

    # how to: https://api.slack.com/reference/surfaces/formatting
    slack_message = __(
        """
        *%(name)s*\n
        <%(url)s|Explore in Superset>
        """,
        name=name,
        url=url,
    )

    if delivery_type == EmailDeliveryType.attachment:
        images = None
        data = {"screenshot": screenshot}
        body = __(
            '<b><a href="%(url)s">Explore in Superset</a></b><p></p>',
            name=name,
            url=url,
        )
    elif delivery_type == EmailDeliveryType.inline:
        # Get the domain from the 'From' address ..
        # and make a message id without the < > in the ends
        domain = parseaddr(config["SMTP_MAIL_FROM"])[1].split("@")[1]
        msgid = make_msgid(domain)[1:-1]

        images = {msgid: screenshot}
        data = None
        body = __(
            """
            <b><a href="%(url)s">Explore in Superset</a></b><p></p>
            <img src="cid:%(msgid)s">
            """,
            name=name,
            url=url,
            msgid=msgid,
        )

    return ReportContent(body, data, images, slack_message, screenshot)
def _get_url_path(view: str, user_friendly: bool = False, **kwargs: Any) -> str:
    """Resolve a Flask view name to an absolute URL.

    ``user_friendly`` picks the externally reachable base URL instead of the
    webdriver-internal one.
    """
    with app.test_request_context():
        base_url = (
            WEBDRIVER_BASEURL_USER_FRIENDLY if user_friendly else WEBDRIVER_BASEURL
        )
        return urllib.parse.urljoin(str(base_url), url_for(view, **kwargs))
def create_webdriver(session: Session) -> WebDriver:
    """Create a selenium webdriver authenticated as the reports user."""
    return WebDriverProxy(driver_type=config["WEBDRIVER_TYPE"]).auth(
        get_reports_user(session)
    )
def get_reports_user(session: Session) -> "User":
    """Look up the configured EMAIL_REPORTS_USER (case-insensitive username)."""
    return (
        session.query(security_manager.user_model)
        .filter(
            func.lower(security_manager.user_model.username)
            == func.lower(config["EMAIL_REPORTS_USER"])
        )
        .one()
    )
def destroy_webdriver(
    driver: Union[chrome.webdriver.WebDriver, firefox.webdriver.WebDriver]
) -> None:
    """
    Destroy a driver
    """
    # This is some very flaky code in selenium. Hence the retries
    # and catch-all exceptions
    try:
        retry_call(driver.close, max_tries=2)
    except Exception:  # pylint: disable=broad-except
        pass
    try:
        driver.quit()
    except Exception:  # pylint: disable=broad-except
        pass
def deliver_dashboard(  # pylint: disable=too-many-locals
    dashboard_id: int,
    recipients: Optional[str],
    slack_channel: Optional[str],
    delivery_type: EmailDeliveryType,
    deliver_as_group: bool,
) -> None:
    """
    Given a schedule, deliver the dashboard as an email report:
    render the dashboard in a webdriver, screenshot it, then email and/or
    post it to Slack.
    """
    with session_scope(nullpool=True) as session:
        dashboard = session.query(Dashboard).filter_by(id=dashboard_id).one()

        dashboard_url = _get_url_path(
            "Superset.dashboard", dashboard_id_or_slug=dashboard.id
        )
        dashboard_url_user_friendly = _get_url_path(
            "Superset.dashboard", user_friendly=True, dashboard_id_or_slug=dashboard.id
        )

        # Create a driver, fetch the page, wait for the page to render
        driver = create_webdriver(session)
        window = config["WEBDRIVER_WINDOW"]["dashboard"]
        driver.set_window_size(*window)
        driver.get(dashboard_url)
        time.sleep(EMAIL_PAGE_RENDER_WAIT)

        # Set up a function to retry once for the element.
        # This is buggy in certain selenium versions with firefox driver
        get_element = getattr(driver, "find_element_by_class_name")
        element = retry_call(
            get_element,
            fargs=["grid-container"],
            max_tries=2,
            interval=EMAIL_PAGE_RENDER_WAIT,
        )

        try:
            screenshot = element.screenshot_as_png
        except WebDriverException:
            # Some webdrivers do not support screenshots for elements.
            # In such cases, take a screenshot of the entire page.
            screenshot = driver.screenshot()
        finally:
            destroy_webdriver(driver)

        # Generate the email body and attachments
        report_content = _generate_report_content(
            delivery_type,
            screenshot,
            dashboard.dashboard_title,
            dashboard_url_user_friendly,
        )

        subject = __(
            "%(prefix)s %(title)s",
            prefix=config["EMAIL_REPORTS_SUBJECT_PREFIX"],
            title=dashboard.dashboard_title,
        )

        if recipients:
            _deliver_email(
                recipients,
                deliver_as_group,
                subject,
                report_content.body,
                report_content.data,
                report_content.images,
            )
        if slack_channel:
            deliver_slack_msg(
                slack_channel,
                subject,
                report_content.slack_message,
                report_content.slack_attachment,
            )
def _get_slice_data(
    slc: Slice, delivery_type: EmailDeliveryType, session: Session
) -> ReportContent:
    """Fetch a slice's data as CSV (authenticated as the reports user) and
    build the report content from it."""
    slice_url = _get_url_path(
        "Superset.explore_json", csv="true", form_data=json.dumps({"slice_id": slc.id})
    )

    # URL to include in the email
    slice_url_user_friendly = _get_url_path(
        "Superset.slice", slice_id=slc.id, user_friendly=True
    )

    # Login on behalf of the "reports" user in order to get cookies to deal with auth
    auth_cookies = machine_auth_provider_factory.instance.get_auth_cookies(
        get_reports_user(session)
    )
    # Build something like "session=cool_sess.val;other-cookie=awesome_other_cookie"
    cookie_str = ";".join([f"{key}={val}" for key, val in auth_cookies.items()])

    opener = urllib.request.build_opener()
    opener.addheaders.append(("Cookie", cookie_str))
    response = opener.open(slice_url)
    if response.getcode() != 200:
        raise URLError(response.getcode())

    # TODO: Move to the csv module
    content = response.read()
    rows = [r.split(b",") for r in content.splitlines()]

    if delivery_type == EmailDeliveryType.inline:
        data = None

        # Parse the csv file and generate HTML
        columns = rows.pop(0)
        with app.app_context():
            body = render_template(
                "superset/reports/slice_data.html",
                columns=columns,
                rows=rows,
                name=slc.slice_name,
                link=slice_url_user_friendly,
            )

    elif delivery_type == EmailDeliveryType.attachment:
        data = {__("%(name)s.csv", name=slc.slice_name): content}
        body = __(
            '<b><a href="%(url)s">Explore in Superset</a></b><p></p>',
            name=slc.slice_name,
            url=slice_url_user_friendly,
        )

    # how to: https://api.slack.com/reference/surfaces/formatting
    slack_message = __(
        """
        *%(slice_name)s*\n
        <%(slice_url_user_friendly)s|Explore in Superset>
        """,
        slice_name=slc.slice_name,
        slice_url_user_friendly=slice_url_user_friendly,
    )

    return ReportContent(body, data, None, slack_message, content)
def _get_slice_screenshot(slice_id: int, session: Session) -> ScreenshotData:
    """Compute (and cache) a standalone screenshot of a slice.

    Commits the session before returning.
    """
    slice_obj = session.query(Slice).get(slice_id)

    chart_url = get_url_path("Superset.slice", slice_id=slice_obj.id, standalone="true")
    screenshot = ChartScreenshot(chart_url, slice_obj.digest)
    image_url = _get_url_path(
        "Superset.slice", user_friendly=True, slice_id=slice_obj.id,
    )

    user = security_manager.get_user_by_username(
        current_app.config["THUMBNAIL_SELENIUM_USER"], session=session
    )
    image_data = screenshot.compute_and_cache(
        user=user, cache=thumbnail_cache, force=True,
    )

    session.commit()
    return ScreenshotData(image_url, image_data)
def _get_slice_visualization(
    slc: Slice, delivery_type: EmailDeliveryType, session: Session
) -> ReportContent:
    """
    Render a chart in a headless browser and build the report content
    (email body/attachments and slack message) from its screenshot.
    """
    # Create a driver, fetch the page, wait for the page to render
    driver = create_webdriver(session)
    window = config["WEBDRIVER_WINDOW"]["slice"]
    driver.set_window_size(*window)

    slice_url = _get_url_path("Superset.slice", slice_id=slc.id)
    slice_url_user_friendly = _get_url_path(
        "Superset.slice", slice_id=slc.id, user_friendly=True
    )

    driver.get(slice_url)
    time.sleep(EMAIL_PAGE_RENDER_WAIT)

    # Set up a function to retry once for the element.
    # This is buggy in certain selenium versions with firefox driver
    element = retry_call(
        driver.find_element_by_class_name,
        fargs=["chart-container"],
        max_tries=2,
        interval=EMAIL_PAGE_RENDER_WAIT,
    )

    try:
        screenshot = element.screenshot_as_png
    except WebDriverException:
        # Some webdrivers do not support screenshots for elements.
        # In such cases, take a screenshot of the entire page.
        screenshot = driver.screenshot()
    finally:
        # Always tear down the webdriver, even when the screenshot failed,
        # so browser processes are not leaked.
        destroy_webdriver(driver)

    # Generate the email body and attachments
    return _generate_report_content(
        delivery_type, screenshot, slc.slice_name, slice_url_user_friendly
    )
def deliver_slice(  # pylint: disable=too-many-arguments
    slice_id: int,
    recipients: Optional[str],
    slack_channel: Optional[str],
    delivery_type: EmailDeliveryType,
    email_format: SliceEmailReportFormat,
    deliver_as_group: bool,
    session: Session,
) -> None:
    """
    Given a schedule, delivery the slice as an email report
    """
    slc = session.query(Slice).filter_by(id=slice_id).one()

    # Build the report content for the requested format (raw data vs. a
    # rendered visualization); anything else is a programming error.
    if email_format == SliceEmailReportFormat.data:
        content = _get_slice_data(slc, delivery_type, session)
    elif email_format == SliceEmailReportFormat.visualization:
        content = _get_slice_visualization(slc, delivery_type, session)
    else:
        raise RuntimeError("Unknown email report format")

    subject = __(
        "%(prefix)s %(title)s",
        prefix=config["EMAIL_REPORTS_SUBJECT_PREFIX"],
        title=slc.slice_name,
    )

    if recipients:
        _deliver_email(
            recipients,
            deliver_as_group,
            subject,
            content.body,
            content.data,
            content.images,
        )
    if slack_channel:
        deliver_slack_msg(
            slack_channel, subject, content.slack_message, content.slack_attachment
        )
@celery_app.task(
    name="email_reports.send",
    bind=True,
    soft_time_limit=config["EMAIL_ASYNC_TIME_LIMIT_SEC"],
)
def schedule_email_report(
    _task: Task,
    report_type: ScheduleType,
    schedule_id: int,
    recipients: Optional[str] = None,
    slack_channel: Optional[str] = None,
) -> None:
    """
    Celery task that delivers a single scheduled email report.

    :param report_type: dashboard or slice schedule type
    :param schedule_id: primary key of the schedule row
    :param recipients: optional override of the schedule's recipients
        (used e.g. for test emails)
    :param slack_channel: optional override of the schedule's slack channel
    """
    model_cls = get_scheduler_model(report_type)
    # nullpool=True gives the task its own non-pooled DB connection.
    with session_scope(nullpool=True) as session:
        schedule = session.query(model_cls).get(schedule_id)

        # The user may have disabled the schedule. If so, ignore this
        if not schedule or not schedule.active:
            logger.info("Ignoring deactivated schedule")
            return

        recipients = recipients or schedule.recipients
        slack_channel = slack_channel or schedule.slack_channel
        logger.info(
            "Starting report for slack: %s and recipients: %s.",
            slack_channel,
            recipients,
        )

        # Dispatch on the schedule type; slice delivery also needs the
        # session and email format, dashboard delivery does not.
        if report_type == ScheduleType.dashboard:
            deliver_dashboard(
                schedule.dashboard_id,
                recipients,
                slack_channel,
                schedule.delivery_type,
                schedule.deliver_as_group,
            )
        elif report_type == ScheduleType.slice:
            deliver_slice(
                schedule.slice_id,
                recipients,
                slack_channel,
                schedule.delivery_type,
                schedule.email_format,
                schedule.deliver_as_group,
                session,
            )
        else:
            raise RuntimeError("Unknown report type")
@celery_app.task(
    name="alerts.run_query",
    bind=True,
    # TODO: find cause of https://github.com/apache/superset/issues/10530
    # and remove retry
    autoretry_for=(NoSuchColumnError, ResourceClosedError,),
    retry_kwargs={"max_retries": 1},
    retry_backoff=True,
)
def schedule_alert_query(
    _task: Task,
    report_type: ScheduleType,
    schedule_id: int,
    recipients: Optional[str] = None,
    slack_channel: Optional[str] = None,
) -> None:
    """
    Celery task that evaluates a single alert.

    :param report_type: must be ScheduleType.alert; anything else raises
    :param schedule_id: primary key of the alert schedule row
    :param recipients: optional recipient override (test runs)
    :param slack_channel: optional slack channel override (test runs)
    """
    model_cls = get_scheduler_model(report_type)
    with session_scope(nullpool=True) as session:
        schedule = session.query(model_cls).get(schedule_id)

        # The user may have disabled the schedule. If so, ignore this
        if not schedule or not schedule.active:
            logger.info("Ignoring deactivated alert")
            return

        if report_type == ScheduleType.alert:
            evaluate_alert(
                schedule.id, schedule.label, session, recipients, slack_channel
            )
        else:
            raise RuntimeError("Unknown report type")
class AlertState(str, Enum):
    """Terminal states recorded for each alert evaluation (see AlertLog)."""

    ERROR = "error"  # observer query failed or returned an error message
    TRIGGER = "trigger"  # alert condition met; notifications delivered
    PASS = "pass"  # evaluated; condition not met
def deliver_alert(
    alert_id: int,
    session: Session,
    recipients: Optional[str] = None,
    slack_channel: Optional[str] = None,
) -> None:
    """
    Gathers alert information and sends out the alert
    to its respective email and slack recipients

    :param alert_id: primary key of the alert being delivered
    :param session: SQLAlchemy session used to load the alert
    :param recipients: overrides the alert's stored recipients (test runs)
    :param slack_channel: overrides the alert's stored slack channel (test runs)
    """
    alert = session.query(Alert).get(alert_id)
    # Fix: use the module-level logger (consistent with the rest of this
    # module) instead of the root logger.
    logger.info("Triggering alert: %s", alert)

    # Set all the values for the alert report
    # Alternate values are used in the case of a test alert
    # where an alert might not have a validator
    recipients = recipients or alert.recipients
    slack_channel = slack_channel or alert.slack_channel
    validation_error_message = (
        str(alert.observations[-1].value) + " " + alert.pretty_config
    )

    # A chart screenshot is only attached when the alert is tied to a slice.
    if alert.slice:
        alert_content = AlertContent(
            alert.label,
            alert.sql,
            str(alert.observations[-1].value),
            validation_error_message,
            _get_url_path("AlertModelView.show", user_friendly=True, pk=alert_id),
            _get_slice_screenshot(alert.slice.id, session),
        )
    else:
        # TODO: dashboard delivery!
        alert_content = AlertContent(
            alert.label,
            alert.sql,
            str(alert.observations[-1].value),
            validation_error_message,
            _get_url_path("AlertModelView.show", user_friendly=True, pk=alert_id),
            None,
        )

    if recipients:
        deliver_email_alert(alert_content, recipients)
    if slack_channel:
        deliver_slack_alert(alert_content, slack_channel)
def deliver_email_alert(alert_content: AlertContent, recipients: str) -> None:
    """Send a triggered alert to the given email recipients."""
    subject = f"[Superset] Triggered alert: {alert_content.label}"

    # TODO(JasonD28): add support for emails with no screenshot
    screenshot = alert_content.image_data
    image_url = screenshot.url if screenshot else None
    images = {}
    if screenshot and screenshot.image:
        images = {"screenshot": screenshot.image}

    body = render_template(
        "email/alert.txt",
        alert_url=alert_content.alert_url,
        label=alert_content.label,
        sql=alert_content.sql,
        observation_value=alert_content.observation_value,
        validation_error_message=alert_content.validation_error_message,
        image_url=image_url,
    )

    # Alerts go out individually (deliver_as_group=False) and carry no
    # CSV attachment payload (data=None).
    _deliver_email(recipients, False, subject, body, None, images)
def deliver_slack_alert(alert_content: AlertContent, slack_channel: str) -> None:
    """Post a triggered alert to the given slack channel."""
    subject = __("[Alert] %(label)s", label=alert_content.label)

    # Both templates share the same context; the screenshot variant adds
    # the image URL and ships the image bytes alongside the message.
    common_kwargs = dict(
        label=alert_content.label,
        sql=alert_content.sql,
        observation_value=alert_content.observation_value,
        validation_error_message=alert_content.validation_error_message,
        alert_url=alert_content.alert_url,
    )
    if alert_content.image_data:
        slack_message = render_template(
            "slack/alert.txt", url=alert_content.image_data.url, **common_kwargs
        )
        image = alert_content.image_data.image
    else:
        slack_message = render_template(
            "slack/alert_no_screenshot.txt", **common_kwargs
        )
        image = None

    deliver_slack_msg(slack_channel, subject, slack_message, image)
def evaluate_alert(
    alert_id: int,
    label: str,
    session: Session,
    recipients: Optional[str] = None,
    slack_channel: Optional[str] = None,
) -> None:
    """Processes an alert to see if it should be triggered.

    :param alert_id: primary key of the alert to evaluate
    :param label: human-readable alert label, used for logging only
    :param session: SQLAlchemy session for reads and the AlertLog write
    :param recipients: overrides the alert's recipients; a truthy value marks
        this as a test run, which delivers without validating
    :param slack_channel: overrides the alert's slack channel (test runs)
    """
    logger.info("Processing alert ID: %i", alert_id)

    state = None
    dttm_start = datetime.utcnow()

    try:
        logger.info("Querying observers for alert <%s:%s>", alert_id, label)
        error_msg = observe(alert_id, session)
        if error_msg:
            state = AlertState.ERROR
            # Fix: log through the module-level logger (not the root logger)
            # so messages carry this module's name and configuration.
            logger.error(error_msg)
    except Exception as exc:  # pylint: disable=broad-except
        state = AlertState.ERROR
        logger.exception(exc)
        logger.error("Failed at query observers for alert: %s (%s)", label, alert_id)

    dttm_end = datetime.utcnow()

    if state != AlertState.ERROR:
        # Don't validate alert on test runs since it may not be triggered
        if recipients or slack_channel:
            deliver_alert(alert_id, session, recipients, slack_channel)
            state = AlertState.TRIGGER
        # Validate during regular workflow and deliver only if triggered
        elif validate_observations(alert_id, label, session):
            deliver_alert(alert_id, session, recipients, slack_channel)
            state = AlertState.TRIGGER
        else:
            state = AlertState.PASS

    session.commit()
    alert = session.query(Alert).get(alert_id)
    # Only a successful evaluation advances last_eval_dttm; the state and
    # the log entry are recorded regardless.
    if state != AlertState.ERROR:
        alert.last_eval_dttm = dttm_end
    alert.last_state = state
    alert.logs.append(
        AlertLog(
            scheduled_dttm=dttm_start,
            dttm_start=dttm_start,
            dttm_end=dttm_end,
            state=state,
        )
    )
    session.commit()
def validate_observations(alert_id: int, label: str, session: Session) -> bool:
    """
    Runs an alert's validators to check if it should be triggered or not
    If so, return the name of the validator that returned true
    """
    logger.info("Validating observations for alert <%s:%s>", alert_id, label)
    alert = session.query(Alert).get(alert_id)
    validator_fn = get_validator_function(alert.validator_type)
    # No validator registered for this type -> never triggers.
    if not validator_fn:
        return False
    return bool(validator_fn(alert, alert.validator_config))
def next_schedules(
    crontab: str, start_at: datetime, stop_at: datetime, resolution: int = 0
) -> Iterator[datetime]:
    """Yield the cron run times falling inside [start_at, stop_at).

    Runs closer than ``resolution`` seconds to the previously yielded one
    are dropped, which throttles overly frequent schedules.
    """
    # Start one second early so a run exactly at start_at is included.
    cron_iter = croniter.croniter(crontab, start_at - timedelta(seconds=1))
    last_emitted = start_at - timedelta(days=1)
    min_gap = timedelta(seconds=resolution)

    for candidate in cron_iter.all_next(datetime):
        # Do not cross the time boundary
        if candidate >= stop_at:
            break
        # Skip runs before the window or too close to the last emitted one.
        if candidate >= start_at and candidate - last_emitted >= min_gap:
            yield candidate
            last_emitted = candidate
def schedule_window(
    report_type: str,
    start_at: datetime,
    stop_at: datetime,
    resolution: int,
    session: Session,
) -> None:
    """
    Find all active schedules and schedule celery tasks for
    each of them with a specific ETA (determined by parsing
    the cron schedule for the schedule)

    :param report_type: ScheduleType value selecting the model and task
    :param start_at: beginning of the scheduling window
    :param stop_at: end of the scheduling window (exclusive)
    :param resolution: minimum number of seconds between two ETAs
    :param session: SQLAlchemy session used to load the schedules
    """
    model_cls = get_scheduler_model(report_type)

    if not model_cls:
        return None

    schedules = session.query(model_cls).filter(model_cls.active.is_(True))
    for schedule in schedules:
        # Fix: use the module-level logger (consistent with the rest of this
        # module) instead of the root logger.
        logger.info("Processing schedule %s", schedule)
        args = (report_type, schedule.id)
        schedule_start_at = start_at

        # Models tracking last_eval_dttm (alerts) resume one second after the
        # last evaluation so an overlapping window does not double-schedule.
        if (
            hasattr(schedule, "last_eval_dttm")
            and schedule.last_eval_dttm
            and schedule.last_eval_dttm > start_at
        ):
            schedule_start_at = schedule.last_eval_dttm + timedelta(seconds=1)

        # Schedule the job for the specified time window
        for eta in next_schedules(
            schedule.crontab, schedule_start_at, stop_at, resolution=resolution
        ):
            logger.info("Scheduled eta %s", eta)
            get_scheduler_action(report_type).apply_async(args, eta=eta)  # type: ignore

    return None
def get_scheduler_action(report_type: str) -> Optional[Callable[..., Any]]:
    """Return the celery task that handles the given report type, if any."""
    # Dashboards and slices share the email-report task; alerts get their own.
    if report_type in (ScheduleType.dashboard, ScheduleType.slice):
        return schedule_email_report
    if report_type == ScheduleType.alert:
        return schedule_alert_query
    return None
@celery_app.task(name="email_reports.schedule_hourly")
def schedule_hourly() -> None:
    """Celery beat job meant to be invoked hourly"""
    if not config["ENABLE_SCHEDULED_EMAIL_REPORTS"]:
        logger.info("Scheduled email reports not enabled in config")
        return

    resolution = config["EMAIL_REPORTS_CRON_RESOLUTION"] * 60

    # Get the top of the hour
    window_start = datetime.now(tzlocal()).replace(minute=0, second=0, microsecond=0)
    window_stop = window_start + timedelta(hours=1)
    with session_scope(nullpool=True) as session:
        # Email reports cover dashboards and individual charts.
        for report_type in (ScheduleType.dashboard, ScheduleType.slice):
            schedule_window(report_type, window_start, window_stop, resolution, session)
@celery_app.task(name="alerts.schedule_check")
def schedule_alerts() -> None:
    """Celery beat job meant to be invoked every minute to check alerts"""
    now = datetime.utcnow()
    # Look back five minutes to pick up any tasks missed between beats, and
    # one second forward so a run landing exactly on "now" is included.
    window_start = now - timedelta(seconds=300)
    window_stop = now + timedelta(seconds=1)
    with session_scope(nullpool=True) as session:
        # resolution=0: alerts are not throttled within the window.
        schedule_window(ScheduleType.alert, window_start, window_stop, 0, session)

View File

@ -27,7 +27,6 @@ from . import (
dynamic_plugins,
health,
redirects,
schedules,
sql_lab,
tags,
)

View File

@ -14,76 +14,19 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
DEPRECATION NOTICE: this module is deprecated and will be removed on 2.0.
"""
from croniter import croniter
from flask import abort, current_app as app, flash, Markup
from flask_appbuilder import CompactCRUDMixin, permission_name
from flask import abort
from flask_appbuilder import permission_name
from flask_appbuilder.api import expose
from flask_appbuilder.hooks import before_request
from flask_appbuilder.models.sqla.interface import SQLAInterface
from flask_appbuilder.security.decorators import has_access
from flask_babel import lazy_gettext as _
from werkzeug.exceptions import NotFound
from superset import is_feature_enabled
from superset.constants import RouteMethod
from superset.models.alerts import Alert, AlertLog, SQLObservation
from superset.superset_typing import FlaskResponse
from superset.tasks.alerts.validator import check_validator
from superset.utils import core as utils
from superset.utils.core import get_email_address_str, markdown
from ..exceptions import SupersetException
from .base import BaseSupersetView, SupersetModelView
from .base import BaseSupersetView
# TODO: access control rules for this module
class EnsureEnabledMixin:
    """View mixin that rejects every request unless alerts are enabled."""

    @staticmethod
    def is_enabled() -> bool:
        # Feature switch: the alerts UI is gated on the ENABLE_ALERTS config.
        return bool(app.config["ENABLE_ALERTS"])

    @before_request
    def ensure_enabled(self) -> None:
        # Runs before every request handled by views using this mixin;
        # a disabled feature surfaces as a plain 404.
        if not self.is_enabled():
            raise NotFound()
class AlertLogModelView(
    CompactCRUDMixin, EnsureEnabledMixin, SupersetModelView
):  # pylint: disable=too-many-ancestors
    """Read-only CRUD view over alert execution logs (AlertLog rows)."""

    datamodel = SQLAInterface(AlertLog)
    # Only list and show are exposed; logs are written by the scheduler.
    include_route_methods = {RouteMethod.LIST} | {"show"}
    # Newest executions first.
    base_order = ("dttm_start", "desc")
    list_columns = (
        "scheduled_dttm",
        "dttm_start",
        "duration",
        "state",
    )
class AlertObservationModelView(
    CompactCRUDMixin, EnsureEnabledMixin, SupersetModelView
):  # pylint: disable=too-many-ancestors
    """Read-only CRUD view over observed alert query results (SQLObservation)."""

    datamodel = SQLAInterface(SQLObservation)
    # Only list and show are exposed; observations are written by the observer.
    include_route_methods = {RouteMethod.LIST} | {"show"}
    # Newest observations first.
    base_order = ("dttm", "desc")
    list_title = _("List Observations")
    show_title = _("Show Observation")
    list_columns = (
        "dttm",
        "value",
        "error_msg",
    )
    label_columns = {
        "error_msg": _("Error Message"),
    }
class BaseAlertReportView(BaseSupersetView):
route_base = "/report"
class_permission_name = "ReportSchedule"
@ -109,144 +52,3 @@ class BaseAlertReportView(BaseSupersetView):
class AlertView(BaseAlertReportView):
    """Alert flavor of the report view, served under /alert."""

    route_base = "/alert"
    class_permission_name = "ReportSchedule"
class ReportView(BaseAlertReportView):
    """Report flavor of the report view, served under /report."""

    route_base = "/report"
    class_permission_name = "ReportSchedule"
class AlertModelView(EnsureEnabledMixin, SupersetModelView):
    """Legacy CRUD view for SQL-based alerts (deprecated in favor of
    Alerts & Reports)."""

    datamodel = SQLAInterface(Alert)
    route_base = "/alerts"
    include_route_methods = RouteMethod.CRUD_SET | {"log"}

    list_columns = (
        "label",
        "owners",
        "database",
        "sql",
        "pretty_config",
        "crontab",
        "last_eval_dttm",
        "last_state",
        "active",
        "owners",
    )
    show_columns = (
        "label",
        "database",
        "sql",
        "validator_type",
        "validator_config",
        "active",
        "crontab",
        "owners",
        "slice",
        "recipients",
        "slack_channel",
        "log_retention",
        "grace_period",
        "last_eval_dttm",
        "last_state",
    )
    order_columns = ["label", "last_eval_dttm", "last_state", "active"]
    add_columns = (
        "label",
        "database",
        "sql",
        "validator_type",
        "validator_config",
        "active",
        "crontab",
        # TODO: implement different types of alerts
        # "alert_type",
        "owners",
        "recipients",
        "slack_channel",
        "slice",
        # TODO: implement dashboard screenshots with alerts
        # "dashboard",
        "log_retention",
        "grace_period",
    )
    label_columns = {
        "log_retention": _("Log Retentions (days)"),
    }
    description_columns = {
        "crontab": markdown(
            "A CRON-like expression. "
            "[Crontab Guru](https://crontab.guru/) is "
            "a helpful resource that can help you craft a CRON expression.",
            True,
        ),
        "recipients": _("A semicolon ';' delimited list of email addresses"),
        "log_retention": _("How long to keep the logs around for this alert"),
        "grace_period": _(
            "Once an alert is triggered, how long, in seconds, before "
            "Superset nags you again."
        ),
        "sql": _(
            "A SQL statement that defines whether the alert should get triggered or "
            "not. The query is expected to return either NULL or a number value."
        ),
        "validator_type": utils.markdown(
            "Determines when to trigger alert based off value from alert query. "
            "Alerts will be triggered with these validator types:"
            "<ul><li>Not Null - When the return value is Not NULL, Empty, or 0</li>"
            "<li>Operator - When `sql_return_value comparison_operator threshold`"
            " is True e.g. `50 <= 75`<br>Supports the comparison operators <, <=, "
            ">, >=, ==, and !=</li></ul>",
            True,
        ),
        "validator_config": utils.markdown(
            "JSON string containing values the validator will compare against. "
            "Each validator needs the following values:"
            "<ul><li>Not Null - Nothing. You can leave the config as it is.</li>"
            '<li>Operator<ul><li>`"op": "operator"` with an operator from ["<", '
            '"<=", ">", ">=", "==", "!="] e.g. `"op": ">="`</li>'
            '<li>`"threshold": threshold_value` e.g. `"threshold": 50`'
            '</li></ul>Example config:<br>{<br> "op":">=",<br>"threshold": 60<br>}'
            "</li></ul>",
            True,
        ),
    }

    edit_columns = add_columns
    # Show the alert's observations and execution logs inline on the page.
    related_views = [
        AlertObservationModelView,
        AlertLogModelView,
    ]

    @expose("/list/")
    @has_access
    def list(self) -> FlaskResponse:
        # Warn on every listing that this feature is deprecated.
        flash(
            Markup(
                _(
                    "This feature is deprecated and will be removed on 2.0. "
                    "Take a look at the replacement feature "
                    "<a href="
                    "'https://superset.apache.org/docs/installation/alerts-reports'>"
                    "Alerts & Reports documentation</a>"
                )
            ),
            "warning",
        )
        return super().list()

    def pre_add(self, item: "AlertModelView") -> None:
        # Normalize recipients and validate the crontab / validator config
        # before the row is persisted.
        item.recipients = get_email_address_str(item.recipients)

        if not croniter.is_valid(item.crontab):
            raise SupersetException("Invalid crontab format")

        item.validator_type = item.validator_type.lower()
        check_validator(item.validator_type, item.validator_config)

    def pre_update(self, item: "AlertModelView") -> None:
        item.validator_type = item.validator_type.lower()
        check_validator(item.validator_type, item.validator_config)

    def post_update(self, item: "AlertModelView") -> None:
        # NOTE(review): delegates to post_add, which is not defined in this
        # class — presumably inherited from a base view; confirm.
        self.post_add(item)

View File

@ -1,349 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
DEPRECATION NOTICE: this module is deprecated and will be removed on 2.0.
"""
import enum
from typing import Type, Union
import simplejson as json
from croniter import croniter
from flask import current_app as app, flash, g, Markup
from flask_appbuilder import expose
from flask_appbuilder.hooks import before_request
from flask_appbuilder.models.sqla.interface import SQLAInterface
from flask_appbuilder.security.decorators import has_access
from flask_babel import lazy_gettext as _
from werkzeug.exceptions import NotFound
from wtforms import BooleanField, Form, StringField
from superset import db, security_manager
from superset.constants import RouteMethod
from superset.exceptions import SupersetException
from superset.models.dashboard import Dashboard
from superset.models.schedules import (
DashboardEmailSchedule,
ScheduleType,
SliceEmailSchedule,
)
from superset.models.slice import Slice
from superset.superset_typing import FlaskResponse
from superset.tasks.schedules import schedule_email_report
from superset.utils.core import get_email_address_list, json_iso_dttm_ser
from superset.views.core import json_success
from .base import DeleteMixin, SupersetModelView
class EmailScheduleView(SupersetModelView, DeleteMixin):
    """Abstract CRUD view for legacy scheduled email reports.

    Subclasses bind ``schedule_type`` / ``schedule_type_model`` to either
    dashboards or slices.
    """

    include_route_methods = RouteMethod.CRUD_SET
    # Scratch space for the extra "test email" form fields, which are not
    # columns on the schedule model.
    _extra_data = {"test_email": False, "test_email_recipients": None}

    @staticmethod
    def is_enabled() -> bool:
        # Feature switch for the whole legacy scheduled-reports UI.
        return app.config["ENABLE_SCHEDULED_EMAIL_REPORTS"]

    @before_request
    def ensure_enabled(self) -> None:
        # 404 every request when the feature is disabled.
        if not self.is_enabled():
            raise NotFound()

    @property
    def schedule_type(self) -> str:
        # Must be provided by concrete subclasses.
        raise NotImplementedError()

    @property
    def schedule_type_model(self) -> Type[Union[Dashboard, Slice]]:
        # Must be provided by concrete subclasses.
        raise NotImplementedError()

    page_size = 20

    add_exclude_columns = [
        "user",
        "created_on",
        "changed_on",
        "created_by",
        "changed_by",
    ]
    edit_exclude_columns = add_exclude_columns

    description_columns = {
        "deliver_as_group": "If enabled, send a single email to all "
        "recipients (in email/To: field)",
        "crontab": "Unix style crontab schedule to deliver emails. "
        "Changes to schedules reflect in one hour.",
        "delivery_type": "Indicates how the rendered content is delivered",
    }

    add_form_extra_fields = {
        "test_email": BooleanField(
            "Send Test Email",
            default=False,
            description="If enabled, we send a test mail on create / update",
        ),
        "test_email_recipients": StringField(
            "Test Email Recipients",
            default=None,
            description="List of recipients to send test email to. "
            "If empty, we send it to the original recipients",
        ),
        "test_slack_channel": StringField(
            "Test Slack Channel",
            default=None,
            description="A slack channel to send a test message to.",
        ),
    }
    edit_form_extra_fields = add_form_extra_fields

    def process_form(self, form: Form, is_created: bool) -> None:
        # Stash the test-delivery options so post_add can act on them after
        # the model has been saved.
        if form.test_email_recipients.data:
            test_email_recipients = form.test_email_recipients.data.strip()
        else:
            test_email_recipients = None

        test_slack_channel = (
            form.test_slack_channel.data.strip()
            if form.test_slack_channel.data
            else None
        )

        self._extra_data["test_email"] = form.test_email.data
        self._extra_data["test_email_recipients"] = test_email_recipients
        self._extra_data["test_slack_channel"] = test_slack_channel

    def pre_add(self, item: "EmailScheduleView") -> None:
        # Normalize the recipients list and validate the crontab before save.
        try:
            recipients = get_email_address_list(item.recipients)
            item.recipients = ", ".join(recipients)
        except Exception as ex:
            raise SupersetException("Invalid email list") from ex

        item.user = item.user or g.user
        if not croniter.is_valid(item.crontab):
            raise SupersetException("Invalid crontab format")

    def pre_update(self, item: "EmailScheduleView") -> None:
        self.pre_add(item)

    def post_add(self, item: "EmailScheduleView") -> None:
        # Schedule a test mail if the user requested for it.
        if self._extra_data["test_email"]:
            recipients = self._extra_data["test_email_recipients"] or item.recipients
            slack_channel = self._extra_data["test_slack_channel"] or item.slack_channel
            args = (self.schedule_type, item.id)
            kwargs = dict(recipients=recipients, slack_channel=slack_channel)
            schedule_email_report.apply_async(args=args, kwargs=kwargs)

        # Notify the user that schedule changes will be activate only in the
        # next hour
        if item.active:
            flash("Schedule changes will get applied in one hour", "warning")

    def post_update(self, item: "EmailScheduleView") -> None:
        self.post_add(item)

    @has_access
    @expose("/fetch/<int:item_id>/", methods=["GET"])
    def fetch_schedules(self, item_id: int) -> FlaskResponse:
        """Return, as JSON, every schedule attached to a dashboard/slice."""
        query = db.session.query(self.datamodel.obj)
        query = query.join(self.schedule_type_model).filter(
            self.schedule_type_model.id == item_id
        )

        schedules = []
        for schedule in query.all():
            info = {"schedule": schedule.id}

            for col in self.list_columns + self.add_exclude_columns:
                info[col] = getattr(schedule, col)

                # Flatten enum members and user objects into plain strings
                # so the payload is JSON-serializable.
                if isinstance(info[col], enum.Enum):
                    info[col] = info[col].name
                elif isinstance(info[col], security_manager.user_model):
                    info[col] = info[col].username

            info["user"] = schedule.user.username
            info[self.schedule_type] = getattr(schedule, self.schedule_type).id
            schedules.append(info)

        return json_success(json.dumps(schedules, default=json_iso_dttm_ser))
class DashboardEmailScheduleView(
    EmailScheduleView
):  # pylint: disable=too-many-ancestors
    """Concrete schedule view for dashboard email reports."""

    schedule_type = ScheduleType.dashboard
    schedule_type_model = Dashboard

    add_title = _("Schedule Email Reports for Dashboards")
    edit_title = add_title
    list_title = _("Manage Email Reports for Dashboards")

    datamodel = SQLAInterface(DashboardEmailSchedule)
    order_columns = ["user", "dashboard", "created_on"]

    list_columns = [
        "dashboard",
        "active",
        "crontab",
        "user",
        "deliver_as_group",
        "delivery_type",
    ]

    add_columns = [
        "dashboard",
        "active",
        "crontab",
        "recipients",
        "slack_channel",
        "deliver_as_group",
        "delivery_type",
        "test_email",
        "test_email_recipients",
        "test_slack_channel",
    ]

    edit_columns = add_columns

    search_columns = [
        "dashboard",
        "active",
        "user",
        "deliver_as_group",
        "delivery_type",
    ]

    label_columns = {
        "dashboard": _("Dashboard"),
        "created_on": _("Created On"),
        "changed_on": _("Changed On"),
        "user": _("User"),
        "active": _("Active"),
        "crontab": _("Crontab"),
        "recipients": _("Recipients"),
        "slack_channel": _("Slack Channel"),
        "deliver_as_group": _("Deliver As Group"),
        "delivery_type": _("Delivery Type"),
    }

    @expose("/list/")
    @has_access
    def list(self) -> FlaskResponse:
        # Warn on every listing that this feature is deprecated.
        flash(
            Markup(
                _(
                    "This feature is deprecated and will be removed on 2.0. "
                    "Take a look at the replacement feature "
                    "<a href="
                    "'https://superset.apache.org/docs/installation/alerts-reports'>"
                    "Alerts & Reports documentation</a>"
                )
            ),
            "warning",
        )
        return super().list()

    def pre_add(self, item: "DashboardEmailScheduleView") -> None:
        # A schedule must reference a dashboard before base validation runs.
        if item.dashboard is None:
            raise SupersetException("Dashboard is mandatory")
        super().pre_add(item)
class SliceEmailScheduleView(EmailScheduleView):  # pylint: disable=too-many-ancestors
    """Concrete schedule view for chart (slice) email reports."""

    schedule_type = ScheduleType.slice
    schedule_type_model = Slice
    add_title = _("Schedule Email Reports for Charts")
    edit_title = add_title
    list_title = _("Manage Email Reports for Charts")

    datamodel = SQLAInterface(SliceEmailSchedule)
    order_columns = ["user", "slice", "created_on"]
    list_columns = [
        "slice",
        "active",
        "crontab",
        "user",
        "deliver_as_group",
        "delivery_type",
        "email_format",
    ]

    add_columns = [
        "slice",
        "active",
        "crontab",
        "recipients",
        "slack_channel",
        "deliver_as_group",
        "delivery_type",
        "email_format",
        "test_email",
        "test_email_recipients",
        "test_slack_channel",
    ]

    edit_columns = add_columns

    search_columns = [
        "slice",
        "active",
        "user",
        "deliver_as_group",
        "delivery_type",
        "email_format",
    ]

    label_columns = {
        "slice": _("Chart"),
        "created_on": _("Created On"),
        "changed_on": _("Changed On"),
        "user": _("User"),
        "active": _("Active"),
        "crontab": _("Crontab"),
        "recipients": _("Recipients"),
        "slack_channel": _("Slack Channel"),
        "deliver_as_group": _("Deliver As Group"),
        "delivery_type": _("Delivery Type"),
        "email_format": _("Email Format"),
    }

    @expose("/list/")
    @has_access
    def list(self) -> FlaskResponse:
        # Warn on every listing that this feature is deprecated.
        flash(
            Markup(
                _(
                    "This feature is deprecated and will be removed on 2.0. "
                    "Take a look at the replacement feature "
                    "<a href="
                    "'https://superset.apache.org/docs/installation/alerts-reports'>"
                    "Alerts & Reports documentation</a>"
                )
            ),
            "warning",
        )
        return super().list()

    def pre_add(self, item: "SliceEmailScheduleView") -> None:
        # A schedule must reference a chart before base validation runs.
        if item.slice is None:
            raise SupersetException("Slice is mandatory")
        super().pre_add(item)

View File

@ -1,414 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Unit tests for alerting in Superset"""
import json
import logging
from unittest.mock import patch
import pytest
from sqlalchemy.orm import Session
import superset.utils.database
from superset import db
from superset.exceptions import SupersetException
from superset.models.alerts import Alert, AlertLog, SQLObservation
from superset.models.slice import Slice
from superset.tasks.alerts.observer import observe
from superset.tasks.alerts.validator import (
AlertValidatorType,
check_validator,
not_null_validator,
operator_validator,
)
from superset.tasks.schedules import (
AlertState,
deliver_alert,
evaluate_alert,
validate_observations,
)
from superset.utils import core as utils
from superset.views.alerts import (
AlertLogModelView,
AlertModelView,
AlertObservationModelView,
)
from tests.integration_tests.base_tests import SupersetTestCase
from tests.integration_tests.test_app import app
from tests.integration_tests.utils import read_fixture
# Verbose logging so each parametrized case's description is visible in the
# test output; module-level logger used by the tests below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@pytest.yield_fixture(scope="module")
def setup_database():
    """Module-scoped fixture: create a two-row test table in the example
    database, yield the session, then remove all alert rows and the table."""
    with app.app_context():
        example_database = superset.utils.database.get_example_database()
        example_database.get_sqla_engine().execute(
            "CREATE TABLE test_table AS SELECT 1 as first, 2 as second"
        )
        example_database.get_sqla_engine().execute(
            "INSERT INTO test_table (first, second) VALUES (3, 4)"
        )

        yield db.session

        # Teardown: delete dependents before the Alert rows themselves.
        db.session.query(SQLObservation).delete()
        db.session.query(AlertLog).delete()
        db.session.query(Alert).delete()
        db.session.commit()
        example_database.get_sqla_engine().execute("DROP TABLE test_table")
def create_alert(
    db_session: Session,
    sql: str,
    validator_type: AlertValidatorType = AlertValidatorType.OPERATOR,
    validator_config: str = "",
) -> Alert:
    """Persist and return an active test Alert running the given SQL against
    the example database, attached to the first available slice."""
    db_session.commit()
    alert = Alert(
        label="test_alert",
        active=True,
        crontab="* * * * *",
        slice_id=db_session.query(Slice).all()[0].id,
        recipients="recipient1@superset.com",
        slack_channel="#test_channel",
        sql=sql,
        database_id=superset.utils.database.get_example_database().id,
        validator_type=validator_type,
        validator_config=validator_config,
    )
    db_session.add(alert)
    db_session.commit()
    return alert
@pytest.mark.parametrize(
    "description, query, value",
    [
        ("Test int SQL return", "SELECT 55", 55.0),
        ("Test double SQL return", "SELECT 30.0 as wage", 30.0),
        ("Test NULL result", "SELECT null as null_result", None),
        (
            "Test empty SQL return",
            "SELECT first FROM test_table WHERE first = -1",
            None,
        ),
        (
            "Test multi line query",
            """
            -- comment
            SELECT
                1 -- comment
            FROM test_table
                WHERE first = 1
            """,
            1.0,
        ),
        ("Test jinja", "SELECT {{ 2 }}", 2.0),
    ],
)
def test_alert_observer_no_error_msg(setup_database, description, query, value):
    """observe() records the expected value and no error for valid queries."""
    logger.info(description)
    db_session = setup_database
    alert = create_alert(db_session, query)
    observe(alert.id, db_session)
    if value is None:
        assert alert.observations[-1].value is None
    else:
        assert alert.observations[-1].value == value
    assert alert.observations[-1].error_msg is None
@pytest.mark.parametrize(
    "description, query",
    [
        ("Test str result", "SELECT 'test_string' as string_value"),
        ("Test two row result", "SELECT first FROM test_table"),
        (
            "Test two column result",
            "SELECT first, second FROM test_table WHERE first = 1",
        ),
    ],
)
def test_alert_observer_error_msg(setup_database, description, query):
    """observe() records no value and an error message for non-scalar
    query results (strings, multiple rows, multiple columns)."""
    logger.info(description)
    db_session = setup_database
    alert = create_alert(db_session, query)
    observe(alert.id, db_session)
    assert alert.observations[-1].value is None
    assert alert.observations[-1].error_msg is not None
@patch("superset.tasks.schedules.deliver_alert")
def test_evaluate_alert(mock_deliver_alert, setup_database):
    """evaluate_alert() logs ERROR for broken SQL, raises on malformed
    validator config, and delivers + logs TRIGGER on success."""
    db_session = setup_database

    # Test error with Observer SQL statement
    alert1 = create_alert(db_session, "$%^&")
    evaluate_alert(alert1.id, alert1.label, db_session)
    assert alert1.logs[-1].state == AlertState.ERROR

    # Test pass on alert lacking validator config
    alert2 = create_alert(db_session, "SELECT 55")
    # evaluation fails if config is malformed
    with pytest.raises(json.decoder.JSONDecodeError):
        evaluate_alert(alert2.id, alert2.label, db_session)
    assert not alert2.logs

    # Test triggering successful alert
    alert3 = create_alert(db_session, "SELECT 55", "not null", "{}")
    evaluate_alert(alert3.id, alert3.label, db_session)
    assert mock_deliver_alert.call_count == 1
    assert alert3.logs[-1].state == AlertState.TRIGGER
@pytest.mark.parametrize(
    "description, validator_type, config",
    [
        ("Test with invalid operator type", "greater than", "{}"),
        ("Test with empty config", "operator", "{}"),
        ("Test with invalid operator", "operator", '{"op": "is", "threshold":50.0}'),
        (
            "Test with invalid threshold",
            "operator",
            '{"op": "is", "threshold":"hello"}',
        ),
    ],
)
def test_check_validator_error(description, validator_type, config):
    """check_validator rejects unknown validator types, ops, and thresholds."""
    logger.info(description)
    with pytest.raises(SupersetException):
        check_validator(validator_type, config)
@pytest.mark.parametrize(
    "description, validator_type, config",
    [
        (
            "Test with float threshold and no errors",
            "operator",
            '{"op": ">=", "threshold": 50.0}',
        ),
        (
            "Test with int threshold and no errors",
            "operator",
            '{"op": ">=", "threshold": 50}',
        ),
    ],
)
def test_check_validator_no_error(description, validator_type, config):
    """Well-formed operator configs pass validation (which returns None)."""
    logger.info(description)
    assert check_validator(validator_type, config) is None
@pytest.mark.parametrize(
    "description, query, value",
    [
        ("Test passing with 'null' SQL result", "SELECT 0", False),
        (
            "Test passing with empty SQL result",
            "SELECT first FROM test_table WHERE first = -1",
            False,
        ),
        ("Test triggering alert with non-null SQL result", "SELECT 55", True),
    ],
)
def test_not_null_validator(setup_database, description, query, value):
    """not_null_validator triggers only when the observation is non-null/non-zero."""
    logger.info(description)
    session = setup_database
    alert = create_alert(session, query)
    observe(alert.id, session)
    assert not_null_validator(alert, "{}") is value
def test_operator_validator(setup_database):
    """operator_validator compares the latest observation against op/threshold."""
    session = setup_database

    # An alert whose query yields no rows never triggers, even at threshold 0.
    empty_alert = create_alert(
        session, "SELECT first FROM test_table WHERE first = -1"
    )
    observe(empty_alert.id, session)
    assert operator_validator(empty_alert, '{"op": ">=", "threshold": 60}') is False
    # ensure that 0 threshold works
    assert operator_validator(empty_alert, '{"op": ">=", "threshold": 0}') is False

    # An alert observing the value 55, checked against each operator.
    value_alert = create_alert(session, "SELECT 55")
    observe(value_alert.id, session)
    cases = [
        (">=", "60", False),  # fails a greater-than-or-equal threshold
        (">=", "40", True),  # passes a greater-than-or-equal threshold
        ("<=", "40", False),  # fails a less-than-or-equal threshold
        ("<=", "60", True),  # passes a less-than-or-equal threshold
        ("==", "60", False),  # does not equal threshold
        ("==", "55", True),  # equals threshold
        (">", "54.999", True),  # passes a decimal threshold
    ]
    for op, threshold, expected in cases:
        config = '{"op": "%s", "threshold": %s}' % (op, threshold)
        assert operator_validator(value_alert, config) is expected
@pytest.mark.parametrize(
    "description, query, validator_type, config",
    [
        ("Test False on alert with no validator", "SELECT 55", "operator", ""),
        ("Test False on alert with no observations", "SELECT 0", "not null", "{}"),
    ],
)
def test_validate_observations_no_observe(
    setup_database, description, query, validator_type, config
):
    """Without a recorded observation (or usable validator) validation is False."""
    logger.info(description)
    session = setup_database
    alert = create_alert(session, query, validator_type, config)
    assert validate_observations(alert.id, alert.label, session) is False
@pytest.mark.parametrize(
    "description, query, validator_type, config, expected",
    [
        (
            "Test False on alert that should not be triggered",
            "SELECT 0",
            "not null",
            "{}",
            False,
        ),
        (
            "Test True on alert that should be triggered",
            "SELECT 55",
            "operator",
            '{"op": "<=", "threshold": 60}',
            True,
        ),
    ],
)
def test_validate_observations_with_observe(
    setup_database, description, query, validator_type, config, expected
):
    """Once an observation exists, validation reflects the validator verdict."""
    logger.info(description)
    session = setup_database
    alert = create_alert(session, query, validator_type, config)
    observe(alert.id, session)
    assert validate_observations(alert.id, alert.label, session) is expected
def test_validate_observations(setup_database):
    """Smoke test of validate_observations for both outcomes.

    NOTE(review): this duplicates test_validate_observations_with_observe;
    consider keeping only one of the two.
    """
    session = setup_database

    # An observation of 0 does not satisfy the "not null" validator.
    quiet_alert = create_alert(session, "SELECT 0", "not null", "{}")
    observe(quiet_alert.id, session)
    assert validate_observations(quiet_alert.id, quiet_alert.label, session) is False

    # An observation of 55 satisfies "<= 60" and should trigger.
    firing_alert = create_alert(
        session, "SELECT 55", "operator", '{"op": "<=", "threshold": 60}'
    )
    observe(firing_alert.id, session)
    assert validate_observations(firing_alert.id, firing_alert.label, session) is True
@patch("superset.tasks.slack_util.WebClient.files_upload")
@patch("superset.tasks.schedules.send_email_smtp")
@patch("superset.tasks.schedules._get_url_path")
@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache")
def test_deliver_alert_screenshot(
    screenshot_mock, url_mock, email_mock, file_upload_mock, setup_database
):
    """deliver_alert embeds the chart screenshot in both the email and the
    Slack file upload, and formats the Slack message from alert metadata."""
    dbsession = setup_database
    alert = create_alert(dbsession, "SELECT 55", "not null", "{}")
    observe(alert.id, dbsession)
    screenshot = read_fixture("sample.png")
    screenshot_mock.return_value = screenshot
    # TODO: fix AlertModelView.show url call from test
    # _get_url_path is consumed twice: first for the alert-details link,
    # then for the slice explore link (order matters for side_effect).
    url_mock.side_effect = [
        f"http://0.0.0.0:8080/alerts/show/{alert.id}",
        f"http://0.0.0.0:8080/superset/slice/{alert.slice_id}/",
    ]
    deliver_alert(alert.id, dbsession)
    # The same screenshot bytes must reach both delivery channels.
    assert email_mock.call_args[1]["images"]["screenshot"] == screenshot
    assert file_upload_mock.call_args[1] == {
        "channels": alert.slack_channel,
        "file": screenshot,
        "initial_comment": f"\n*Triggered Alert: {alert.label} :redalert:*\n"
        f"*Query*:```{alert.sql}```\n"
        f"*Result*: {alert.observations[-1].value}\n"
        f"*Reason*: {alert.observations[-1].value} {alert.pretty_config}\n"
        f"<http://0.0.0.0:8080/alerts/show/{alert.id}"
        f"|View Alert Details>\n<http://0.0.0.0:8080/superset/slice/{alert.slice_id}/"
        "|*Explore in Superset*>",
        "title": f"[Alert] {alert.label}",
    }
class TestAlertsEndpoints(SupersetTestCase):
    """Verify the legacy alert model views honor their ``is_enabled`` toggles."""

    def _view_status(self, view_class, uri, enabled):
        """Return the HTTP status for *uri* with *view_class* toggled on/off."""
        with patch.object(view_class, "is_enabled", return_value=enabled):
            self.login("admin")
            return self.client.get(uri).status_code

    def test_log_model_view_disabled(self):
        status = self._view_status(
            AlertLogModelView, "/alertlogmodelview/list/", False
        )
        self.assertEqual(status, 404)

    def test_log_model_view_enabled(self):
        status = self._view_status(
            AlertLogModelView, "/alertlogmodelview/list/", True
        )
        self.assertLess(status, 400)

    def test_model_view_disabled(self):
        status = self._view_status(AlertModelView, "/alerts/list/", False)
        self.assertEqual(status, 404)

    def test_model_view_enabled(self):
        status = self._view_status(AlertModelView, "/alerts/list/", True)
        self.assertNotEqual(status, 404)

    def test_observation_view_disabled(self):
        status = self._view_status(
            AlertObservationModelView, "/alertobservationmodelview/list/", False
        )
        self.assertEqual(status, 404)

    def test_observation_view_enabled(self):
        status = self._view_status(
            AlertObservationModelView, "/alertobservationmodelview/list/", True
        )
        self.assertLess(status, 400)

View File

@ -1,596 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# isort:skip_file
from datetime import datetime, timedelta
from superset.views.schedules import DashboardEmailScheduleView, SliceEmailScheduleView
from unittest.mock import Mock, patch, PropertyMock
from flask_babel import gettext as __
import pytest
from selenium.common.exceptions import WebDriverException
from slack import errors, WebClient
from tests.integration_tests.fixtures.world_bank_dashboard import (
load_world_bank_dashboard_with_slices,
load_world_bank_data,
)
from tests.integration_tests.test_app import app
from superset import db
from superset.models.dashboard import Dashboard
from superset.models.schedules import (
DashboardEmailSchedule,
EmailDeliveryType,
SliceEmailReportFormat,
SliceEmailSchedule,
)
from superset.tasks.schedules import (
create_webdriver,
deliver_dashboard,
deliver_slice,
next_schedules,
)
from superset.models.slice import Slice
from tests.integration_tests.base_tests import SupersetTestCase
from tests.integration_tests.utils import read_fixture
class TestSchedules(SupersetTestCase):
    """Tests for the legacy scheduled email/Slack report tasks and views.

    Covers cron-window expansion (``next_schedules``), webdriver creation,
    and dashboard/slice delivery in inline, attachment, and CSV formats.
    """

    RECIPIENTS = "recipient1@superset.com, recipient2@superset.com"
    BCC = "bcc@superset.com"
    CSV = read_fixture("trends.csv")

    @pytest.fixture()
    def add_schedule_slice_and_dashboard(self):
        """Create one dashboard schedule and one slice schedule; delete both
        on teardown so tests stay isolated."""
        with app.app_context():
            self.common_data = dict(
                active=True,
                crontab="* * * * *",
                recipients=self.RECIPIENTS,
                deliver_as_group=True,
                delivery_type=EmailDeliveryType.inline,
            )
            # Pick up a sample slice and dashboard
            # (renamed from `slice` to avoid shadowing the builtin)
            slc = db.session.query(Slice).filter_by(slice_name="Region Filter").one()
            dashboard = (
                db.session.query(Dashboard)
                .filter_by(dashboard_title="World Bank's Data")
                .one()
            )

            dashboard_schedule = DashboardEmailSchedule(**self.common_data)
            dashboard_schedule.dashboard_id = dashboard.id
            dashboard_schedule.user_id = 1
            db.session.add(dashboard_schedule)

            slice_schedule = SliceEmailSchedule(**self.common_data)
            slice_schedule.slice_id = slc.id
            slice_schedule.user_id = 1
            slice_schedule.email_format = SliceEmailReportFormat.data
            slice_schedule.slack_channel = "#test_channel"
            db.session.add(slice_schedule)
            db.session.commit()

            # Keep only the ids; each test re-queries inside its own session.
            self.slice_schedule = slice_schedule.id
            self.dashboard_schedule = dashboard_schedule.id

        yield

        with app.app_context():
            db.session.query(SliceEmailSchedule).filter_by(
                id=self.slice_schedule
            ).delete()
            db.session.query(DashboardEmailSchedule).filter_by(
                id=self.dashboard_schedule
            ).delete()
            db.session.commit()

    def test_crontab_scheduler(self):
        """next_schedules honors the requested resolution within a window."""
        crontab = "* * * * *"

        start_at = datetime.now().replace(microsecond=0, second=0, minute=0)
        stop_at = start_at + timedelta(seconds=3600)

        # Fire off the task every minute
        schedules = list(next_schedules(crontab, start_at, stop_at, resolution=0))

        self.assertEqual(schedules[0], start_at)
        self.assertEqual(schedules[-1], stop_at - timedelta(seconds=60))
        self.assertEqual(len(schedules), 60)

        # Fire off the task every 10 minutes, controlled via resolution
        schedules = list(
            next_schedules(crontab, start_at, stop_at, resolution=10 * 60)
        )

        self.assertEqual(schedules[0], start_at)
        self.assertEqual(schedules[-1], stop_at - timedelta(seconds=10 * 60))
        self.assertEqual(len(schedules), 6)

        # Fire off the task every 12 minutes, controlled via resolution
        schedules = list(
            next_schedules(crontab, start_at, stop_at, resolution=12 * 60)
        )

        self.assertEqual(schedules[0], start_at)
        self.assertEqual(schedules[-1], stop_at - timedelta(seconds=12 * 60))
        self.assertEqual(len(schedules), 5)

    def test_wider_schedules(self):
        """A crontab restricted to hours 2 and 10 fires only in those hours."""
        crontab = "*/15 2,10 * * *"

        for hour in range(0, 24):
            start_at = datetime.now().replace(
                microsecond=0, second=0, minute=0, hour=hour
            )
            stop_at = start_at + timedelta(seconds=3600)
            schedules = list(next_schedules(crontab, start_at, stop_at, resolution=0))

            if hour in (2, 10):
                self.assertEqual(len(schedules), 4)
            else:
                self.assertEqual(len(schedules), 0)

    def test_complex_schedule(self):
        """A combined range/step crontab expands to the expected datetimes."""
        # Run the job on every Friday of March and May
        # On these days, run the job at
        # 5:10 pm, 5:11 pm, 5:12 pm, 5:13 pm, 5:14 pm, 5:15 pm,
        # 5:25 pm, 5:28 pm, 5:31 pm, 5:34 pm, 5:37 pm, 5:40 pm
        crontab = "10-15,25-40/3 17 * 3,5 5"
        start_at = datetime.strptime("2018/01/01", "%Y/%m/%d")
        stop_at = datetime.strptime("2018/12/31", "%Y/%m/%d")

        schedules = list(next_schedules(crontab, start_at, stop_at, resolution=60))
        self.assertEqual(len(schedules), 108)
        fmt = "%Y-%m-%d %H:%M:%S"
        self.assertEqual(schedules[0], datetime.strptime("2018-03-02 17:10:00", fmt))
        self.assertEqual(schedules[-1], datetime.strptime("2018-05-25 17:40:00", fmt))
        self.assertEqual(schedules[59], datetime.strptime("2018-03-30 17:40:00", fmt))
        self.assertEqual(schedules[60], datetime.strptime("2018-05-04 17:10:00", fmt))

    @patch("superset.tasks.schedules.firefox.webdriver.WebDriver")
    def test_create_driver(self, mock_driver_class):
        """create_webdriver installs the auth cookie on the mocked driver."""
        mock_driver = Mock()
        mock_driver_class.return_value = mock_driver
        mock_driver.find_elements_by_id.side_effect = [True, False]

        create_webdriver(db.session)
        mock_driver.add_cookie.assert_called_once()

    @pytest.mark.usefixtures(
        "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard"
    )
    @patch("superset.tasks.schedules.firefox.webdriver.WebDriver")
    @patch("superset.tasks.schedules.send_email_smtp")
    @patch("superset.tasks.schedules.time")
    def test_deliver_dashboard_inline(self, mtime, send_email_smtp, driver_class):
        """Inline delivery screenshots the dashboard element and emails once."""
        element = Mock()
        driver = Mock()
        mtime.sleep.return_value = None
        driver_class.return_value = driver

        # Ensure that we are able to login with the driver
        driver.find_elements_by_id.side_effect = [True, False]
        driver.find_element_by_class_name.return_value = element
        element.screenshot_as_png = read_fixture("sample.png")

        schedule = (
            db.session.query(DashboardEmailSchedule)
            .filter_by(id=self.dashboard_schedule)
            .one()
        )

        deliver_dashboard(
            schedule.dashboard_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.deliver_as_group,
        )
        mtime.sleep.assert_called_once()
        # Element-level screenshot is used; the full-page fallback is not.
        driver.screenshot.assert_not_called()
        send_email_smtp.assert_called_once()

    @pytest.mark.usefixtures(
        "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard"
    )
    @patch("superset.tasks.schedules.firefox.webdriver.WebDriver")
    @patch("superset.tasks.schedules.send_email_smtp")
    @patch("superset.tasks.schedules.time")
    def test_deliver_dashboard_as_attachment(
        self, mtime, send_email_smtp, driver_class
    ):
        """Attachment delivery sends the screenshot as data, not inline images."""
        element = Mock()
        driver = Mock()
        mtime.sleep.return_value = None
        driver_class.return_value = driver

        # Ensure that we are able to login with the driver
        driver.find_elements_by_id.side_effect = [True, False]
        driver.find_element_by_id.return_value = element
        driver.find_element_by_class_name.return_value = element
        element.screenshot_as_png = read_fixture("sample.png")

        schedule = (
            db.session.query(DashboardEmailSchedule)
            .filter_by(id=self.dashboard_schedule)
            .one()
        )
        schedule.delivery_type = EmailDeliveryType.attachment

        deliver_dashboard(
            schedule.dashboard_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.deliver_as_group,
        )
        mtime.sleep.assert_called_once()
        driver.screenshot.assert_not_called()
        send_email_smtp.assert_called_once()
        self.assertIsNone(send_email_smtp.call_args[1]["images"])
        self.assertEqual(
            send_email_smtp.call_args[1]["data"]["screenshot"],
            element.screenshot_as_png,
        )

    @pytest.mark.usefixtures(
        "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard"
    )
    @patch("superset.tasks.schedules.firefox.webdriver.WebDriver")
    @patch("superset.tasks.schedules.send_email_smtp")
    @patch("superset.tasks.schedules.time")
    def test_dashboard_chrome_like(self, mtime, send_email_smtp, driver_class):
        # Test functionality for chrome driver which does not support
        # element snapshots
        element = Mock()
        driver = Mock()
        mtime.sleep.return_value = None
        # Element screenshot raises, forcing the full-page fallback below.
        type(element).screenshot_as_png = PropertyMock(side_effect=WebDriverException)
        driver_class.return_value = driver

        # Ensure that we are able to login with the driver
        driver.find_elements_by_id.side_effect = [True, False]
        driver.find_element_by_id.return_value = element
        driver.find_element_by_class_name.return_value = element
        driver.screenshot.return_value = read_fixture("sample.png")

        schedule = (
            db.session.query(DashboardEmailSchedule)
            .filter_by(id=self.dashboard_schedule)
            .one()
        )

        deliver_dashboard(
            schedule.dashboard_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.deliver_as_group,
        )
        mtime.sleep.assert_called_once()
        driver.screenshot.assert_called_once()
        send_email_smtp.assert_called_once()
        self.assertEqual(send_email_smtp.call_args[0][0], self.RECIPIENTS)
        self.assertEqual(
            list(send_email_smtp.call_args[1]["images"].values())[0],
            driver.screenshot.return_value,
        )

    @pytest.mark.usefixtures(
        "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard"
    )
    @patch("superset.tasks.schedules.firefox.webdriver.WebDriver")
    @patch("superset.tasks.schedules.send_email_smtp")
    @patch("superset.tasks.schedules.time")
    def test_deliver_email_options(self, mtime, send_email_smtp, driver_class):
        """Individual (non-group) delivery sends one email per recipient,
        each carrying the configured BCC address."""
        element = Mock()
        driver = Mock()
        mtime.sleep.return_value = None
        driver_class.return_value = driver

        # Ensure that we are able to login with the driver
        driver.find_elements_by_id.side_effect = [True, False]
        driver.find_element_by_class_name.return_value = element
        element.screenshot_as_png = read_fixture("sample.png")

        schedule = (
            db.session.query(DashboardEmailSchedule)
            .filter_by(id=self.dashboard_schedule)
            .one()
        )
        # Send individual mails to the group
        schedule.deliver_as_group = False
        # Set a bcc email address
        app.config["EMAIL_REPORT_BCC_ADDRESS"] = self.BCC

        deliver_dashboard(
            schedule.dashboard_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.deliver_as_group,
        )
        mtime.sleep.assert_called_once()
        driver.screenshot.assert_not_called()
        # RECIPIENTS holds two addresses, hence two emails.
        self.assertEqual(send_email_smtp.call_count, 2)
        self.assertEqual(send_email_smtp.call_args[1]["bcc"], self.BCC)

    @pytest.mark.usefixtures(
        "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard"
    )
    @patch("superset.tasks.slack_util.WebClient.files_upload")
    @patch("superset.tasks.schedules.firefox.webdriver.WebDriver")
    @patch("superset.tasks.schedules.send_email_smtp")
    @patch("superset.tasks.schedules.time")
    def test_deliver_slice_inline_image(
        self, mtime, send_email_smtp, driver_class, files_upload
    ):
        """Visualization/inline slice delivery embeds the image in the email
        and uploads the same bytes to Slack."""
        element = Mock()
        driver = Mock()
        mtime.sleep.return_value = None
        driver_class.return_value = driver

        # Ensure that we are able to login with the driver
        driver.find_elements_by_id.side_effect = [True, False]
        driver.find_element_by_class_name.return_value = element
        element.screenshot_as_png = read_fixture("sample.png")

        schedule = (
            db.session.query(SliceEmailSchedule).filter_by(id=self.slice_schedule).one()
        )
        schedule.email_format = SliceEmailReportFormat.visualization
        # BUGFIX: was `schedule.delivery_format`, a nonexistent attribute —
        # every other test sets `delivery_type` (the model field).
        schedule.delivery_type = EmailDeliveryType.inline

        deliver_slice(
            schedule.slice_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.email_format,
            schedule.deliver_as_group,
            db.session,
        )
        mtime.sleep.assert_called_once()
        driver.screenshot.assert_not_called()
        send_email_smtp.assert_called_once()

        self.assertEqual(
            list(send_email_smtp.call_args[1]["images"].values())[0],
            element.screenshot_as_png,
        )

        self.assertEqual(
            files_upload.call_args[1],
            {
                "channels": "#test_channel",
                "file": element.screenshot_as_png,
                "initial_comment": f"\n *Region Filter*\n\n <http://0.0.0.0:8080/superset/slice/{schedule.slice_id}/|Explore in Superset>\n ",
                "title": "[Report] Region Filter",
            },
        )

    @pytest.mark.usefixtures(
        "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard"
    )
    @patch("superset.tasks.slack_util.WebClient.files_upload")
    @patch("superset.tasks.schedules.firefox.webdriver.WebDriver")
    @patch("superset.tasks.schedules.send_email_smtp")
    @patch("superset.tasks.schedules.time")
    def test_deliver_slice_attachment(
        self, mtime, send_email_smtp, driver_class, files_upload
    ):
        """Visualization/attachment slice delivery attaches the screenshot."""
        element = Mock()
        driver = Mock()
        mtime.sleep.return_value = None
        driver_class.return_value = driver

        # Ensure that we are able to login with the driver
        driver.find_elements_by_id.side_effect = [True, False]
        driver.find_element_by_class_name.return_value = element
        element.screenshot_as_png = read_fixture("sample.png")

        schedule = (
            db.session.query(SliceEmailSchedule).filter_by(id=self.slice_schedule).one()
        )
        schedule.email_format = SliceEmailReportFormat.visualization
        schedule.delivery_type = EmailDeliveryType.attachment

        deliver_slice(
            schedule.slice_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.email_format,
            schedule.deliver_as_group,
            db.session,
        )
        mtime.sleep.assert_called_once()
        driver.screenshot.assert_not_called()
        send_email_smtp.assert_called_once()

        self.assertEqual(
            send_email_smtp.call_args[1]["data"]["screenshot"],
            element.screenshot_as_png,
        )

        self.assertEqual(
            files_upload.call_args[1],
            {
                "channels": "#test_channel",
                "file": element.screenshot_as_png,
                "initial_comment": f"\n *Region Filter*\n\n <http://0.0.0.0:8080/superset/slice/{schedule.slice_id}/|Explore in Superset>\n ",
                "title": "[Report] Region Filter",
            },
        )

    @pytest.mark.usefixtures(
        "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard"
    )
    @patch("superset.tasks.slack_util.WebClient.files_upload")
    @patch("superset.tasks.schedules.urllib.request.OpenerDirector.open")
    @patch("superset.tasks.schedules.urllib.request.urlopen")
    @patch("superset.tasks.schedules.send_email_smtp")
    def test_deliver_slice_csv_attachment(
        self, send_email_smtp, mock_open, mock_urlopen, files_upload
    ):
        """Data/attachment slice delivery attaches the CSV under the slice name."""
        response = Mock()
        mock_open.return_value = response
        mock_urlopen.return_value = response
        mock_urlopen.return_value.getcode.return_value = 200
        response.read.return_value = self.CSV

        schedule = (
            db.session.query(SliceEmailSchedule).filter_by(id=self.slice_schedule).one()
        )
        schedule.email_format = SliceEmailReportFormat.data
        schedule.delivery_type = EmailDeliveryType.attachment

        deliver_slice(
            schedule.slice_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.email_format,
            schedule.deliver_as_group,
            db.session,
        )
        send_email_smtp.assert_called_once()

        # The attachment is keyed by the localized "<slice name>.csv" file name.
        file_name = __("%(name)s.csv", name=schedule.slice.slice_name)

        self.assertEqual(send_email_smtp.call_args[1]["data"][file_name], self.CSV)

        self.assertEqual(
            files_upload.call_args[1],
            {
                "channels": "#test_channel",
                "file": self.CSV,
                "initial_comment": f"\n *Region Filter*\n\n <http://0.0.0.0:8080/superset/slice/{schedule.slice_id}/|Explore in Superset>\n ",
                "title": "[Report] Region Filter",
            },
        )

    @pytest.mark.usefixtures(
        "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard"
    )
    @patch("superset.tasks.slack_util.WebClient.files_upload")
    @patch("superset.tasks.schedules.urllib.request.urlopen")
    @patch("superset.tasks.schedules.urllib.request.OpenerDirector.open")
    @patch("superset.tasks.schedules.send_email_smtp")
    def test_deliver_slice_csv_inline(
        self, send_email_smtp, mock_open, mock_urlopen, files_upload
    ):
        """Data/inline slice delivery renders the CSV as an HTML table."""
        response = Mock()
        mock_open.return_value = response
        mock_urlopen.return_value = response
        mock_urlopen.return_value.getcode.return_value = 200
        response.read.return_value = self.CSV
        schedule = (
            db.session.query(SliceEmailSchedule).filter_by(id=self.slice_schedule).one()
        )
        schedule.email_format = SliceEmailReportFormat.data
        schedule.delivery_type = EmailDeliveryType.inline

        deliver_slice(
            schedule.slice_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.email_format,
            schedule.deliver_as_group,
            db.session,
        )
        send_email_smtp.assert_called_once()

        self.assertIsNone(send_email_smtp.call_args[1]["data"])
        self.assertTrue("<table " in send_email_smtp.call_args[0][2])

        self.assertEqual(
            files_upload.call_args[1],
            {
                "channels": "#test_channel",
                "file": self.CSV,
                "initial_comment": f"\n *Region Filter*\n\n <http://0.0.0.0:8080/superset/slice/{schedule.slice_id}/|Explore in Superset>\n ",
                "title": "[Report] Region Filter",
            },
        )

    def test_dashboard_disabled(self):
        with patch.object(DashboardEmailScheduleView, "is_enabled", return_value=False):
            self.login("admin")
            uri = "/dashboardemailscheduleview/list/"
            rv = self.client.get(uri)
            self.assertEqual(rv.status_code, 404)

    def test_dashboard_enabled(self):
        with patch.object(DashboardEmailScheduleView, "is_enabled", return_value=True):
            self.login("admin")
            uri = "/dashboardemailscheduleview/list/"
            rv = self.client.get(uri)
            self.assertLess(rv.status_code, 400)

    def test_slice_disabled(self):
        with patch.object(SliceEmailScheduleView, "is_enabled", return_value=False):
            self.login("admin")
            uri = "/sliceemailscheduleview/list/"
            rv = self.client.get(uri)
            self.assertEqual(rv.status_code, 404)

    def test_slice_enabled(self):
        with patch.object(SliceEmailScheduleView, "is_enabled", return_value=True):
            self.login("admin")
            uri = "/sliceemailscheduleview/list/"
            rv = self.client.get(uri)
            self.assertLess(rv.status_code, 400)
def test_slack_client_compatibility():
    """An invalid upload against the real Slack API surfaces as SlackApiError."""
    client = WebClient()
    # slackclient >2.5.0 raises TypeError: a bytes-like object is required, not 'str'
    # and requires to path a filepath instead of the bytes directly
    with pytest.raises(errors.SlackApiError):
        client.files_upload(
            channels="#bogdan-test2", file=b"blabla", title="Test upload"
        )