mirror of https://github.com/apache/superset.git
feat: Alerts! allowing users to set SQL-based email alerts with screenshots (#9944)
* feat: add support for alerts
* Add ModelViews
* First pass at adding scheduled Alert support
* Fix syntax errors
* Add log_retention and database to model
* Improving screenshots
* Still refactoring screenshots
* Pass down thumb_size properly
* Progress on screenshot endpoint
* Add alerts.last_eval_dttm and alert query logic
* Getting ready to split compute_screenshot and screenshot/{SHA}
* split the endpoints
* Improving alerting loop
* empty methods
* Add CLI command 'superset alert' that runs a sched loop
* Getting things to work
* email delivery
* A working email!
* Add feature flag
* Add some description_columns to AlertModelView
* Little tweaks
* Use database.get_df, eval cells for truthiness
* Migrate thumbnail/screenshot functions to use new arg pattern
* Addressing PR feedback
* Update alerts DB migration down_revision
* Resolve _deliver_email arg conflict
* Make mypy happy
* Make isort happy
* Make pylint happy

Co-authored-by: Rob DiCiuccio <rob.diciuccio@gmail.com>
This commit is contained in:
parent b7c45fed80
commit 318e5347bc
@@ -38,7 +38,7 @@ combine_as_imports = true
 include_trailing_comma = true
 line_length = 88
 known_first_party = superset
-known_third_party =alembic,apispec,backoff,bleach,cachelib,celery,click,colorama,contextlib2,croniter,cryptography,dateutil,flask,flask_appbuilder,flask_babel,flask_caching,flask_compress,flask_login,flask_migrate,flask_sqlalchemy,flask_talisman,flask_testing,flask_wtf,geohash,geopy,humanize,isodate,jinja2,markdown,markupsafe,marshmallow,msgpack,numpy,pandas,parameterized,parsedatetime,pathlib2,polyline,prison,pyarrow,pyhive,pytz,retry,selenium,setuptools,simplejson,slack,sphinx_rtd_theme,sqlalchemy,sqlalchemy_utils,sqlparse,werkzeug,wtforms,wtforms_json,yaml
+known_third_party =alembic,apispec,backoff,bleach,cachelib,celery,click,colorama,contextlib2,croniter,cryptography,dataclasses,dateutil,flask,flask_appbuilder,flask_babel,flask_caching,flask_compress,flask_login,flask_migrate,flask_sqlalchemy,flask_talisman,flask_testing,flask_wtf,geohash,geopy,humanize,isodate,jinja2,markdown,markupsafe,marshmallow,msgpack,numpy,pandas,parameterized,parsedatetime,pathlib2,polyline,prison,pyarrow,pyhive,pytz,retry,selenium,setuptools,simplejson,slack,sphinx_rtd_theme,sqlalchemy,sqlalchemy_utils,sqlparse,werkzeug,wtforms,wtforms_json,yaml
 multi_line_output = 3
 order_by_type = false

@@ -171,6 +171,10 @@ class SupersetAppInitializer:
            DashboardEmailScheduleView,
            SliceEmailScheduleView,
        )
+        from superset.views.alerts import (
+            AlertModelView,
+            AlertLogModelView,
+        )
        from superset.views.sql_lab import (
            QueryView,
            SavedQueryViewApi,
@@ -395,6 +399,17 @@ class SupersetAppInitializer:
            icon="fa-search",
        )
+
+        if self.config["ENABLE_ALERTS"]:
+            appbuilder.add_view(
+                AlertModelView,
+                "Alerts",
+                label=__("Alerts"),
+                category="Manage",
+                category_label=__("Manage"),
+                icon="fa-exclamation-triangle",
+            )
+            appbuilder.add_view_no_menu(AlertLogModelView)

        #
        # Conditionally add Access Request Model View
        #
@@ -49,6 +49,7 @@ from superset.charts.schemas import (
    ChartPutSchema,
    get_delete_ids_schema,
    openapi_spec_methods_override,
+    screenshot_query_schema,
    thumbnail_query_schema,
 )
 from superset.constants import RouteMethod
@@ -58,6 +59,7 @@ from superset.models.slice import Slice
 from superset.tasks.thumbnails import cache_chart_thumbnail
 from superset.utils.core import ChartDataResultFormat, json_int_dttm_ser
 from superset.utils.screenshots import ChartScreenshot
+from superset.utils.urls import get_url_path
 from superset.views.base_api import (
    BaseSupersetModelRestApi,
    RelatedFieldFilter,
@@ -166,7 +168,11 @@ class ChartRestApi(BaseSupersetModelRestApi):

    def __init__(self) -> None:
        if is_feature_enabled("THUMBNAILS"):
-            self.include_route_methods = self.include_route_methods | {"thumbnail"}
+            self.include_route_methods = self.include_route_methods | {
+                "thumbnail",
+                "screenshot",
+                "cache_screenshot",
+            }
        super().__init__()

    @expose("/", methods=["POST"])
@@ -479,18 +485,16 @@ class ChartRestApi(BaseSupersetModelRestApi):

        raise self.response_400(message=f"Unsupported result_format: {result_format}")

-    @expose("/<pk>/thumbnail/<digest>/", methods=["GET"])
+    @expose("/<pk>/cache_screenshot/", methods=["GET"])
    @protect()
-    @rison(thumbnail_query_schema)
+    @rison(screenshot_query_schema)
    @safe
    @statsd_metrics
-    def thumbnail(
-        self, pk: int, digest: str, **kwargs: Dict[str, bool]
-    ) -> WerkzeugResponse:
-        """Get Chart thumbnail
+    def cache_screenshot(self, pk: int, **kwargs: Dict[str, bool]) -> WerkzeugResponse:
+        """Get Chart screenshot
        ---
        get:
-          description: Compute or get already computed chart thumbnail from cache.
+          description: Compute or get already computed screenshot from cache.
          parameters:
          - in: path
            schema:
@@ -500,6 +504,83 @@ class ChartRestApi(BaseSupersetModelRestApi):
            schema:
              type: string
            name: sha
+          responses:
+            200:
+              description: Chart thumbnail image
+              content:
+                application/json:
+                  schema:
+                    type: object
+                    properties:
+                      cache_key:
+                        type: string
+                      chart_url:
+                        type: string
+                      image_url:
+                        type: string
+            302:
+              description: Redirects to the current digest
+            400:
+              $ref: '#/components/responses/400'
+            401:
+              $ref: '#/components/responses/401'
+            404:
+              $ref: '#/components/responses/404'
+            500:
+              $ref: '#/components/responses/500'
+        """
+        rison_dict = kwargs["rison"]
+        window_size = rison_dict.get("window_size") or (800, 600)
+
+        # Don't shrink the image if thumb_size is not specified
+        thumb_size = rison_dict.get("thumb_size") or window_size
+
+        chart = self.datamodel.get(pk, self._base_filters)
+        if not chart:
+            return self.response_404()
+
+        chart_url = get_url_path("Superset.slice", slice_id=chart.id, standalone="true")
+        screenshot_obj = ChartScreenshot(chart_url, chart.digest)
+        cache_key = screenshot_obj.cache_key(window_size, thumb_size)
+        image_url = get_url_path(
+            "ChartRestApi.screenshot", pk=chart.id, digest=cache_key
+        )
+
+        def trigger_celery() -> WerkzeugResponse:
+            logger.info("Triggering screenshot ASYNC")
+            kwargs = {
+                "url": chart_url,
+                "digest": chart.digest,
+                "force": True,
+                "window_size": window_size,
+                "thumb_size": thumb_size,
+            }
+            cache_chart_thumbnail.delay(**kwargs)
+            return self.response(
+                202, cache_key=cache_key, chart_url=chart_url, image_url=image_url,
+            )
+
+        return trigger_celery()
+
+    @expose("/<pk>/screenshot/<digest>/", methods=["GET"])
+    @protect()
+    @rison(screenshot_query_schema)
+    @safe
+    @statsd_metrics
+    def screenshot(self, pk: int, digest: str) -> WerkzeugResponse:
+        """Get Chart screenshot
+        ---
+        get:
+          description: Get a computed screenshot from cache.
+          parameters:
+          - in: path
+            schema:
+              type: integer
+            name: pk
+          - in: path
+            schema:
+              type: string
+            name: digest
          responses:
            200:
              description: Chart thumbnail image
@@ -520,16 +601,83 @@ class ChartRestApi(BaseSupersetModelRestApi):
              $ref: '#/components/responses/500'
        """
        chart = self.datamodel.get(pk, self._base_filters)
+
+        # Making sure the chart still exists
        if not chart:
            return self.response_404()
+
+        # TODO make sure the user has access to the chart
+
+        # fetch the chart screenshot using the current user and cache if set
+        img = ChartScreenshot.get_from_cache_key(thumbnail_cache, digest)
+        if img:
+            return Response(
+                FileWrapper(img), mimetype="image/png", direct_passthrough=True
+            )
+        # TODO: return an empty image
+        return self.response_404()
+
+    @expose("/<pk>/thumbnail/<digest>/", methods=["GET"])
+    @protect()
+    @rison(thumbnail_query_schema)
+    @safe
+    @statsd_metrics
+    def thumbnail(
+        self, pk: int, digest: str, **kwargs: Dict[str, bool]
+    ) -> WerkzeugResponse:
+        """Get Chart thumbnail
+        ---
+        get:
+          description: Compute or get already computed chart thumbnail from cache.
+          parameters:
+          - in: path
+            schema:
+              type: integer
+            name: pk
+          - in: path
+            schema:
+              type: string
+            name: digest
+          responses:
+            200:
+              description: Chart thumbnail image
+              content:
+                image/*:
+                  schema:
+                    type: string
+                    format: binary
+            302:
+              description: Redirects to the current digest
+            400:
+              $ref: '#/components/responses/400'
+            401:
+              $ref: '#/components/responses/401'
+            404:
+              $ref: '#/components/responses/404'
+            500:
+              $ref: '#/components/responses/500'
+        """
+        chart = self.datamodel.get(pk, self._base_filters)
+        if not chart:
+            return self.response_404()
+
+        url = get_url_path("Superset.slice", slice_id=chart.id, standalone="true")
        if kwargs["rison"].get("force", False):
-            cache_chart_thumbnail.delay(chart.id, force=True)
+            logger.info(
+                "Triggering thumbnail compute (chart id: %s) ASYNC", str(chart.id)
+            )
+            cache_chart_thumbnail.delay(url, chart.digest, force=True)
            return self.response(202, message="OK Async")
        # fetch the chart screenshot using the current user and cache if set
-        screenshot = ChartScreenshot(pk).get_from_cache(cache=thumbnail_cache)
+        screenshot = ChartScreenshot(url, chart.digest).get_from_cache(
+            cache=thumbnail_cache
+        )
        # If not screenshot then send request to compute thumb to celery
        if not screenshot:
-            cache_chart_thumbnail.delay(chart.id, force=True)
+            logger.info(
+                "Triggering thumbnail compute (chart id: %s) ASYNC", str(chart.id)
+            )
+            cache_chart_thumbnail.delay(url, chart.digest, force=True)
            return self.response(202, message="OK Async")
        # If digests
        if chart.digest != digest:
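
Note (illustrative, not part of the commit): the two new endpoints form a two-step flow — cache_screenshot enqueues the Celery render and returns a cache key, then screenshot/{digest} serves the cached PNG. A minimal client sketch, assuming an already-authenticated requests session and a hypothetical host and chart id:

    import requests  # session auth against Superset is assumed, not shown

    base = "http://localhost:8088/api/v1/chart"
    # rison-encoded query: force a 1600x1200 render, thumbnailed to 800x600
    q = "(force:!t,thumb_size:!(800,600),window_size:!(1600,1200))"

    resp = requests.get(f"{base}/1/cache_screenshot/?q={q}")  # responds 202
    payload = resp.json()  # {"cache_key": ..., "chart_url": ..., "image_url": ...}

    # poll image_url until the worker has written the PNG into the cache;
    # per the handler above it returns 404 while the screenshot is pending
    img = requests.get(payload["image_url"])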
@@ -28,9 +28,22 @@ from superset.utils import core as utils
 # RISON/JSON schemas for query parameters
 #
 get_delete_ids_schema = {"type": "array", "items": {"type": "integer"}}
+width_height_schema = {
+    "type": "array",
+    "items": [{"type": "integer"}, {"type": "integer"},],
+}
 thumbnail_query_schema = {
    "type": "object",
-    "properties": {"force": {"type": "boolean"}},
+    "properties": {"force": {"type": "boolean"},},
+}
+
+screenshot_query_schema = {
+    "type": "object",
+    "properties": {
+        "force": {"type": "boolean"},
+        "window_size": width_height_schema,
+        "thumb_size": width_height_schema,
+    },
 }

 #
@@ -85,7 +98,6 @@ openapi_spec_methods_override = {
        "get": {"description": "Get a list of all possible owners for a chart."}
    },
 }
-""" Overrides GET methods OpenApi descriptions """


 def validate_json(value: Union[bytes, bytearray, str]) -> None:
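
Note (illustrative, not part of the commit): width_height_schema uses JSON Schema's tuple-style "items", which validates array members positionally. A quick sketch with the jsonschema package (the @rison decorator's actual enforcement may differ):

    from jsonschema import validate

    validate({"force": True, "window_size": [1600, 1200]}, screenshot_query_schema)
    # there is no "minItems", so under tuple semantics a one-element
    # window_size also passes validation
    validate({"window_size": [1600]}, screenshot_query_schema)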
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import logging
-from datetime import datetime
+from datetime import datetime, timedelta
 from subprocess import Popen
 from sys import stdout
 from typing import Any, Dict, Type, Union
@@ -34,6 +34,7 @@ from superset import app, appbuilder, security_manager
 from superset.app import create_app
 from superset.extensions import celery_app, db
 from superset.utils import core as utils
+from superset.utils.urls import get_url_path

 logger = logging.getLogger(__name__)

@@ -524,7 +525,13 @@ def compute_thumbnails(
            action = "Processing"
            msg = f'{action} {friendly_type} "{model}" ({i+1}/{count})'
            click.secho(msg, fg="green")
-            func(model.id, force=force)
+            if friendly_type == "chart":
+                url = get_url_path(
+                    "Superset.slice", slice_id=model.id, standalone="true"
+                )
+            else:
+                url = get_url_path("Superset.dashboard", dashboard_id=model.id)
+            func(url, model.digest, force=force)

    if not charts_only:
        compute_generic_thumbnail(
@@ -601,3 +608,17 @@ def sync_tags() -> None:
    add_types(db.engine, metadata)
    add_owners(db.engine, metadata)
    add_favorites(db.engine, metadata)
+
+
+@superset.command()
+@with_appcontext
+def alert() -> None:
+    """Run the alert scheduler loop"""
+    # this command is just for testing purposes
+    from superset.tasks.schedules import schedule_window
+    from superset.models.schedules import ScheduleType
+
+    click.secho("Processing one alert loop", fg="green")
+    schedule_window(
+        ScheduleType.alert, datetime.now() - timedelta(1000), datetime.now(), 6000
+    )
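
Note (illustrative, not part of the commit): the new `superset alert` command is a thin wrapper over schedule_window; the equivalent call from a `superset shell` session would be:

    from datetime import datetime, timedelta

    from superset.models.schedules import ScheduleType
    from superset.tasks.schedules import schedule_window

    # same arguments the CLI body passes: a ~1000-day lookback window with a
    # coarse 6000-second resolution, so every active alert is considered once
    schedule_window(
        ScheduleType.alert, datetime.now() - timedelta(1000), datetime.now(), 6000
    )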
@@ -734,6 +734,11 @@ SQL_QUERY_MUTATOR = None
 # Enable / disable scheduled email reports
 ENABLE_SCHEDULED_EMAIL_REPORTS = False

+# Enable / disable Alerts, where users can define custom SQL that
+# will send emails with screenshots of charts or dashboards periodically
+# if it meets the criteria
+ENABLE_ALERTS = False
+
 # Slack API token for the superset reports
 SLACK_API_TOKEN = None
 SLACK_PROXY = None
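
Note (illustrative, not part of the commit): the flag defaults to off; opting in happens through the usual superset_config.py override, e.g.:

    # superset_config.py -- registers the Alerts views (see the app
    # initializer hunk above) and enables the alert scheduling path
    ENABLE_ALERTS = True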
@@ -51,6 +51,7 @@ from superset.dashboards.schemas import (
 from superset.models.dashboard import Dashboard
 from superset.tasks.thumbnails import cache_dashboard_thumbnail
 from superset.utils.screenshots import DashboardScreenshot
+from superset.utils.urls import get_url_path
 from superset.views.base import generate_download_headers
 from superset.views.base_api import (
    BaseSupersetModelRestApi,
@@ -504,15 +505,19 @@ class DashboardRestApi(BaseSupersetModelRestApi):
        dashboard = self.datamodel.get(pk, self._base_filters)
        if not dashboard:
            return self.response_404()
+
+        dashboard_url = get_url_path("Superset.dashboard", dashboard_id=dashboard.id)
        # If force, request a screenshot from the workers
        if kwargs["rison"].get("force", False):
-            cache_dashboard_thumbnail.delay(dashboard.id, force=True)
+            cache_dashboard_thumbnail.delay(dashboard_url, dashboard.digest, force=True)
            return self.response(202, message="OK Async")
        # fetch the dashboard screenshot using the current user and cache if set
-        screenshot = DashboardScreenshot(pk).get_from_cache(cache=thumbnail_cache)
+        screenshot = DashboardScreenshot(
+            dashboard_url, dashboard.digest
+        ).get_from_cache(cache=thumbnail_cache)
        # If the screenshot does not exist, request one from the workers
        if not screenshot:
-            cache_dashboard_thumbnail.delay(dashboard.id, force=True)
+            cache_dashboard_thumbnail.delay(dashboard_url, dashboard.digest, force=True)
            return self.response(202, message="OK Async")
        # If digests
        if dashboard.digest != digest:
@@ -15,7 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint: disable=unused-argument
-import dataclasses
 import hashlib
 import json
 import logging
@@ -34,6 +33,7 @@ from typing import (
    Union,
 )

+import dataclasses
 import pandas as pd
 import sqlparse
 from flask import g
@@ -15,10 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint: disable=too-few-public-methods,invalid-name
-from dataclasses import dataclass
 from enum import Enum
 from typing import Any, Dict, Optional

+from dataclasses import dataclass
+

 class SupersetErrorType(str, Enum):
    """
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""add_alerts
+
+Revision ID: 2f1d15e8a6af
+Revises: a72cb0ebeb22
+Create Date: 2020-05-26 23:21:50.059635
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = "2f1d15e8a6af"
+down_revision = "a72cb0ebeb22"
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mysql
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table(
+        "alerts",
+        sa.Column("id", sa.Integer(), nullable=False),
+        sa.Column("label", sa.String(length=150), nullable=False),
+        sa.Column("active", sa.Boolean(), nullable=True),
+        sa.Column("crontab", sa.String(length=50), nullable=True),
+        sa.Column("sql", sa.Text(), nullable=True),
+        sa.Column("alert_type", sa.String(length=50), nullable=True),
+        sa.Column("log_retention", sa.Integer(), nullable=False, default=90),
+        sa.Column("grace_period", sa.Integer(), nullable=False, default=60 * 60 * 24),
+        sa.Column("recipients", sa.Text(), nullable=True),
+        sa.Column("slice_id", sa.Integer(), nullable=True),
+        sa.Column("database_id", sa.Integer(), nullable=False),
+        sa.Column("dashboard_id", sa.Integer(), nullable=True),
+        sa.Column("last_eval_dttm", sa.DateTime(), nullable=True),
+        sa.Column("last_state", sa.String(length=10), nullable=True),
+        sa.ForeignKeyConstraint(["dashboard_id"], ["dashboards.id"],),
+        sa.ForeignKeyConstraint(["slice_id"], ["slices.id"],),
+        sa.PrimaryKeyConstraint("id"),
+    )
+    op.create_index(op.f("ix_alerts_active"), "alerts", ["active"], unique=False)
+    op.create_table(
+        "alert_logs",
+        sa.Column("id", sa.Integer(), nullable=False),
+        sa.Column("scheduled_dttm", sa.DateTime(), nullable=True),
+        sa.Column("dttm_start", sa.DateTime(), nullable=True),
+        sa.Column("dttm_end", sa.DateTime(), nullable=True),
+        sa.Column("alert_id", sa.Integer(), nullable=True),
+        sa.Column("state", sa.String(length=10), nullable=True),
+        sa.ForeignKeyConstraint(["alert_id"], ["alerts.id"],),
+        sa.PrimaryKeyConstraint("id"),
+    )
+    op.create_table(
+        "alert_owner",
+        sa.Column("id", sa.Integer(), nullable=False),
+        sa.Column("user_id", sa.Integer(), nullable=True),
+        sa.Column("alert_id", sa.Integer(), nullable=True),
+        sa.ForeignKeyConstraint(["alert_id"], ["alerts.id"],),
+        sa.ForeignKeyConstraint(["user_id"], ["ab_user.id"],),
+        sa.PrimaryKeyConstraint("id"),
+    )
+
+
+def downgrade():
+    op.drop_index(op.f("ix_alerts_active"), table_name="alerts")
+    op.drop_table("alert_owner")
+    op.drop_table("alert_logs")
+    op.drop_table("alerts")
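
Note (illustrative, not part of the commit): the revision is applied with the standard `superset db upgrade`; a programmatic sketch of the same thing, assuming the stock Flask-Migrate setup Superset ships with:

    from flask_migrate import upgrade

    from superset.app import create_app

    app = create_app()
    with app.app_context():
        upgrade()  # walks pending revisions, a72cb0ebeb22 -> 2f1d15e8a6af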
@@ -14,4 +14,11 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from . import core, datasource_access_request, schedules, sql_lab, user_attributes
+from . import (
+    alerts,
+    core,
+    datasource_access_request,
+    schedules,
+    sql_lab,
+    user_attributes,
+)
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Models for scheduled execution of jobs"""
+from datetime import datetime
+
+from flask_appbuilder import Model
+from sqlalchemy import (
+    Boolean,
+    Column,
+    DateTime,
+    ForeignKey,
+    Integer,
+    String,
+    Table,
+    Text,
+)
+from sqlalchemy.orm import backref, relationship
+
+from superset import security_manager
+
+metadata = Model.metadata  # pylint: disable=no-member
+
+
+alert_owner = Table(
+    "alert_owner",
+    metadata,
+    Column("id", Integer, primary_key=True),
+    Column("user_id", Integer, ForeignKey("ab_user.id")),
+    Column("alert_id", Integer, ForeignKey("alerts.id")),
+)
+
+
+class Alert(Model):
+
+    """Schedules for emailing slices / dashboards"""
+
+    __tablename__ = "alerts"
+
+    id = Column(Integer, primary_key=True)
+    label = Column(String(150))
+    active = Column(Boolean, default=True, index=True)
+    crontab = Column(String(50))
+    sql = Column(Text)
+
+    alert_type = Column(String(50))
+    owners = relationship(security_manager.user_model, secondary=alert_owner)
+    recipients = Column(Text)
+
+    log_retention = Column(Integer, default=90)
+    grace_period = Column(Integer, default=60 * 60 * 24)
+
+    slice_id = Column(Integer, ForeignKey("slices.id"))
+    slice = relationship("Slice", backref="alerts", foreign_keys=[slice_id])
+
+    dashboard_id = Column(Integer, ForeignKey("dashboards.id"))
+    dashboard = relationship("Dashboard", backref="alert", foreign_keys=[dashboard_id])
+
+    database_id = Column(Integer, ForeignKey("dbs.id"), nullable=False)
+    database = relationship(
+        "Database",
+        foreign_keys=[database_id],
+        backref=backref("alerts", cascade="all, delete-orphan"),
+    )
+
+    last_eval_dttm = Column(DateTime, default=datetime.utcnow)
+    last_state = Column(String(10))
+
+    def __str__(self) -> str:
+        return f"<{self.id}:{self.label}>"
+
+
+class AlertLog(Model):
+    """Keeps track of alert-related operations"""
+
+    __tablename__ = "alert_logs"
+
+    id = Column(Integer, primary_key=True)
+    scheduled_dttm = Column(DateTime)
+    dttm_start = Column(DateTime, default=datetime.utcnow)
+    dttm_end = Column(DateTime, default=datetime.utcnow)
+    alert_id = Column(Integer, ForeignKey("alerts.id"))
+    alert = relationship("Alert", backref="logs", foreign_keys=[alert_id])
+    state = Column(String(10))
+
+    @property
+    def duration(self) -> int:
+        return (self.dttm_end - self.dttm_start).total_seconds()
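
Note (illustrative, not part of the commit): a sketch of creating an alert row directly, with hypothetical ids and SQL — the keyword arguments are exactly the Alert columns defined above:

    from superset import db
    from superset.models.alerts import Alert

    alert = Alert(
        label="error spike",
        active=True,
        crontab="0 * * * *",  # evaluated by the scheduler loop
        sql="SELECT COUNT(*) FROM logs WHERE level = 'ERROR'",
        database_id=1,   # hypothetical: id of an existing dbs row
        slice_id=42,     # hypothetical: chart to screenshot when triggered
        recipients="alerts@example.com",
    )
    db.session.add(alert)
    db.session.commit()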
@@ -51,6 +51,7 @@ from superset.utils.dashboard_filter_scopes_converter import (
    convert_filter_scopes,
    copy_filter_scopes,
 )
+from superset.utils.urls import get_url_path

 if TYPE_CHECKING:
    # pylint: disable=unused-import
@@ -480,7 +481,8 @@ class Dashboard(  # pylint: disable=too-many-instance-attributes
 def event_after_dashboard_changed(  # pylint: disable=unused-argument
    mapper: Mapper, connection: Connection, target: Dashboard
 ) -> None:
-    cache_dashboard_thumbnail.delay(target.id, force=True)
+    url = get_url_path("Superset.dashboard", dashboard_id=target.id)
+    cache_dashboard_thumbnail.delay(url, target.digest, force=True)


 # events for updating tags
@@ -24,6 +24,7 @@ from sqlalchemy.ext.declarative import declared_attr
 from sqlalchemy.orm import relationship, RelationshipProperty

 from superset import security_manager
+from superset.models.alerts import Alert
 from superset.models.helpers import AuditMixinNullable, ImportMixin

 metadata = Model.metadata  # pylint: disable=no-member
@@ -32,6 +33,7 @@ metadata = Model.metadata  # pylint: disable=no-member
 class ScheduleType(str, enum.Enum):
    slice = "slice"
    dashboard = "dashboard"
+    alert = "alert"


 class EmailDeliveryType(str, enum.Enum):
@@ -87,9 +89,11 @@ class SliceEmailSchedule(Model, AuditMixinNullable, ImportMixin, EmailSchedule):
    email_format = Column(Enum(SliceEmailReportFormat))


-def get_scheduler_model(report_type: ScheduleType) -> Optional[Type[EmailSchedule]]:
+def get_scheduler_model(report_type: str) -> Optional[Type[EmailSchedule]]:
    if report_type == ScheduleType.dashboard:
        return DashboardEmailSchedule
    if report_type == ScheduleType.slice:
        return SliceEmailSchedule
+    if report_type == ScheduleType.alert:
+        return Alert
    return None
@@ -34,6 +34,7 @@ from superset.models.helpers import AuditMixinNullable, ImportMixin
 from superset.models.tags import ChartUpdater
 from superset.tasks.thumbnails import cache_chart_thumbnail
 from superset.utils import core as utils
+from superset.utils.urls import get_url_path

 if is_feature_enabled("SIP_38_VIZ_REARCHITECTURE"):
    from superset.viz_sip38 import BaseViz, viz_types
@@ -340,7 +341,8 @@ def set_related_perm(mapper: Mapper, connection: Connection, target: Slice) -> N
 def event_after_chart_changed(  # pylint: disable=unused-argument
    mapper: Mapper, connection: Connection, target: Slice
 ) -> None:
-    cache_chart_thumbnail.delay(target.id, force=True)
+    url = get_url_path("Superset.slice", slice_id=target.id, standalone="true")
+    cache_chart_thumbnail.delay(url, target.digest, force=True)


 sqla.event.listen(Slice, "before_insert", set_related_perm)
@@ -15,12 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 import logging
-from dataclasses import dataclass
 from enum import Enum
 from typing import List, Optional, Set
 from urllib import parse

 import sqlparse
+from dataclasses import dataclass
 from sqlparse.sql import Identifier, IdentifierList, remove_quotes, Token, TokenList
 from sqlparse.tokens import Keyword, Name, Punctuation, String, Whitespace
 from sqlparse.utils import imt
@@ -18,29 +18,42 @@
 """Utility functions used across Superset"""

 import logging
+import textwrap
 import time
 import urllib.request
 from collections import namedtuple
 from datetime import datetime, timedelta
 from email.utils import make_msgid, parseaddr
-from typing import Any, Dict, Iterator, List, Optional, Tuple, TYPE_CHECKING, Union
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterator,
+    List,
+    Optional,
+    Tuple,
+    TYPE_CHECKING,
+    Union,
+)
 from urllib.error import URLError  # pylint: disable=ungrouped-imports

 import croniter
+import pandas as pd
 import simplejson as json
 from celery.app.task import Task
 from dateutil.tz import tzlocal
-from flask import render_template, Response, session, url_for
+from flask import current_app, render_template, Response, session, url_for
 from flask_babel import gettext as __
 from flask_login import login_user
 from retry.api import retry_call
 from selenium.common.exceptions import WebDriverException
 from selenium.webdriver import chrome, firefox
+from sqlalchemy.orm import Session
 from werkzeug.http import parse_cookie

-# Superset framework imports
-from superset import app, db, security_manager
+from superset import app, db, security_manager, thumbnail_cache
 from superset.extensions import celery_app
+from superset.models.alerts import Alert, AlertLog
 from superset.models.dashboard import Dashboard
 from superset.models.schedules import (
    EmailDeliveryType,
@@ -49,8 +62,13 @@ from superset.models.schedules import (
    SliceEmailReportFormat,
 )
 from superset.models.slice import Slice
+from superset.sql_parse import ParsedQuery
 from superset.tasks.slack_util import deliver_slack_msg
 from superset.utils.core import get_email_address_list, send_email_smtp
+from superset.utils.screenshots import ChartScreenshot
+from superset.utils.urls import get_url_path
+
+# pylint: disable=too-few-public-methods

 if TYPE_CHECKING:
    # pylint: disable=unused-import
@@ -99,7 +117,7 @@ def _deliver_email(  # pylint: disable=too-many-arguments
    subject: str,
    body: str,
    data: Optional[Dict[str, Any]],
-    images: Optional[Dict[str, str]],
+    images: Optional[Dict[str, bytes]],
 ) -> None:
    for (to, bcc) in _get_email_to_and_bcc(recipients, deliver_as_group):
        send_email_smtp(
@@ -132,7 +150,7 @@ def _generate_report_content(

    if delivery_type == EmailDeliveryType.attachment:
        images = None
-        data = {"screenshot.png": screenshot}
+        data = {"screenshot": screenshot}
        body = __(
            '<b><a href="%(url)s">Explore in Superset</a></b><p></p>',
            name=name,
@@ -512,6 +530,140 @@ def schedule_email_report(  # pylint: disable=unused-argument
    raise RuntimeError("Unknown report type")


+@celery_app.task(
+    name="alerts.run_query",
+    bind=True,
+    soft_time_limit=config["EMAIL_ASYNC_TIME_LIMIT_SEC"],
+)
+def schedule_alert_query(  # pylint: disable=unused-argument
+    task: Task,
+    report_type: ScheduleType,
+    schedule_id: int,
+    recipients: Optional[str] = None,
+) -> None:
+    model_cls = get_scheduler_model(report_type)
+    dbsession = db.create_scoped_session()
+    schedule = dbsession.query(model_cls).get(schedule_id)
+
+    # The user may have disabled the schedule. If so, ignore this
+    if not schedule or not schedule.active:
+        logger.info("Ignoring deactivated alert")
+        return
+
+    if report_type == ScheduleType.alert:
+        if run_alert_query(schedule, dbsession):
+            # deliver_dashboard OR deliver_slice
+            return
+    else:
+        raise RuntimeError("Unknown report type")
+
+
+class AlertState:
+    ERROR = "error"
+    TRIGGER = "trigger"
+    PASS = "pass"
+
+
+def deliver_alert(alert: Alert) -> None:
+    logging.info("Triggering alert: %s", alert)
+    img_data = None
+    images = {}
+    if alert.slice:
+
+        chart_url = get_url_path(
+            "Superset.slice", slice_id=alert.slice.id, standalone="true"
+        )
+        screenshot = ChartScreenshot(chart_url, alert.slice.digest)
+        cache_key = screenshot.cache_key()
+        image_url = get_url_path(
+            "ChartRestApi.screenshot", pk=alert.slice.id, digest=cache_key
+        )
+
+        user = security_manager.find_user(current_app.config["THUMBNAIL_SELENIUM_USER"])
+        img_data = screenshot.compute_and_cache(
+            user=user, cache=thumbnail_cache, force=True,
+        )
+    else:
+        # TODO: dashboard delivery!
+        image_url = "https://media.giphy.com/media/dzaUX7CAG0Ihi/giphy.gif"
+
+    # generate the email
+    subject = f"[Superset] Triggered alert: {alert.label}"
+    deliver_as_group = False
+    data = None
+    if img_data:
+        images = {"screenshot": img_data}
+    body = __(
+        textwrap.dedent(
+            """\
+            <h2>Alert: %(label)s</h2>
+            <img src="cid:screenshot" alt="%(label)s" />
+            """
+        ),
+        label=alert.label,
+        image_url=image_url,
+    )
+
+    _deliver_email(alert.recipients, deliver_as_group, subject, body, data, images)
+
+
+def run_alert_query(alert: Alert, dbsession: Session) -> Optional[bool]:
+    """
+    Execute alert.sql and return value if any rows are returned
+    """
+    logger.info("Processing alert ID: %i", alert.id)
+    database = alert.database
+    if not database:
+        logger.error("Alert database not preset")
+        return None
+
+    if not alert.sql:
+        logger.error("Alert SQL not preset")
+        return None
+
+    parsed_query = ParsedQuery(alert.sql)
+    sql = parsed_query.stripped()
+
+    state = None
+    dttm_start = datetime.utcnow()
+
+    df = pd.DataFrame()
+    try:
+        logger.info("Evaluating SQL for alert %s", alert)
+        df = database.get_df(sql)
+    except Exception as exc:  # pylint: disable=broad-except
+        state = AlertState.ERROR
+        logging.exception(exc)
+        logging.error("Failed at evaluating alert: %s (%s)", alert.label, alert.id)
+
+    dttm_end = datetime.utcnow()
+
+    if state != AlertState.ERROR:
+        alert.last_eval_dttm = datetime.utcnow()
+        if not df.empty:
+            # Looking for truthy cells
+            for row in df.to_records():
+                if any(row):
+                    state = AlertState.TRIGGER
+                    deliver_alert(alert)
+                    break
+        if not state:
+            state = AlertState.PASS
+
+    alert.last_state = state
+    alert.logs.append(
+        AlertLog(
+            scheduled_dttm=dttm_start,
+            dttm_start=dttm_start,
+            dttm_end=dttm_end,
+            state=state,
+        )
+    )
+    dbsession.commit()
+
+    return None
+
+
 def next_schedules(
    crontab: str, start_at: datetime, stop_at: datetime, resolution: int = 0
 ) -> Iterator[datetime]:
@@ -535,7 +687,7 @@ def next_schedules(


 def schedule_window(
-    report_type: ScheduleType, start_at: datetime, stop_at: datetime, resolution: int
+    report_type: str, start_at: datetime, stop_at: datetime, resolution: int
 ) -> None:
    """
    Find all active schedules and schedule celery tasks for
@@ -551,17 +703,36 @@ def schedule_window(
    schedules = dbsession.query(model_cls).filter(model_cls.active.is_(True))

    for schedule in schedules:
+        logging.info("Processing schedule %s", schedule)
        args = (report_type, schedule.id)
+
+        if (
+            hasattr(schedule, "last_eval_dttm")
+            and schedule.last_eval_dttm
+            and schedule.last_eval_dttm > start_at
+        ):
+            # start_at = schedule.last_eval_dttm + timedelta(seconds=1)
+            pass
        # Schedule the job for the specified time window
        for eta in next_schedules(
            schedule.crontab, start_at, stop_at, resolution=resolution
        ):
-            schedule_email_report.apply_async(args, eta=eta)
+            get_scheduler_action(report_type).apply_async(args, eta=eta)  # type: ignore
+            break

    return None
+
+
+def get_scheduler_action(report_type: str) -> Optional[Callable[..., Any]]:
+    if report_type == ScheduleType.dashboard:
+        return schedule_email_report
+    if report_type == ScheduleType.slice:
+        return schedule_email_report
+    if report_type == ScheduleType.alert:
+        return schedule_alert_query
+    return None


 @celery_app.task(name="email_reports.schedule_hourly")
 def schedule_hourly() -> None:
    """ Celery beat job meant to be invoked hourly """
@@ -577,3 +748,16 @@ def schedule_hourly() -> None:
    stop_at = start_at + timedelta(seconds=3600)
    schedule_window(ScheduleType.dashboard, start_at, stop_at, resolution)
    schedule_window(ScheduleType.slice, start_at, stop_at, resolution)
+
+
+@celery_app.task(name="alerts.schedule_check")
+def schedule_alerts() -> None:
+    """ Celery beat job meant to be invoked every minute to check alerts """
+    resolution = 0
+    now = datetime.utcnow()
+    start_at = now - timedelta(
+        seconds=3600
+    )  # process any missed tasks in the past hour
+    stop_at = now + timedelta(seconds=1)
+
+    schedule_window(ScheduleType.alert, start_at, stop_at, resolution)
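
Note (illustrative, not part of the commit): alerts.schedule_check is a beat job, so it must be scheduled in the worker config. A sketch following the CeleryConfig pattern used in superset/config.py (broker URL hypothetical):

    from celery.schedules import crontab

    class CeleryConfig:
        BROKER_URL = "redis://localhost:6379/0"
        CELERY_IMPORTS = ("superset.sql_lab", "superset.tasks")
        CELERYBEAT_SCHEDULE = {
            "alerts.schedule_check": {
                "task": "alerts.schedule_check",
                # every minute, matching the task docstring above
                "schedule": crontab(minute="*", hour="*"),
            },
        }

    CELERY_CONFIG = CeleryConfig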
@@ -18,6 +18,7 @@
 """Utility functions used across Superset"""

 import logging
+from typing import Optional, Tuple

 from flask import current_app

@@ -27,28 +28,45 @@ from superset.utils.screenshots import ChartScreenshot, DashboardScreenshot

 logger = logging.getLogger(__name__)

+WindowSize = Tuple[int, int]
+

 @celery_app.task(name="cache_chart_thumbnail", soft_time_limit=300)
-def cache_chart_thumbnail(chart_id: int, force: bool = False) -> None:
+def cache_chart_thumbnail(
+    url: str,
+    digest: str,
+    force: bool = False,
+    window_size: Optional[WindowSize] = None,
+    thumb_size: Optional[WindowSize] = None,
+) -> None:
    with app.app_context():  # type: ignore
        if not thumbnail_cache:
            logger.warning("No cache set, refusing to compute")
-            return
-        logging.info("Caching chart %i", chart_id)
-        screenshot = ChartScreenshot(model_id=chart_id)
+            return None
+        logging.info("Caching chart at {url}")
+        screenshot = ChartScreenshot(url, digest)
        user = security_manager.find_user(current_app.config["THUMBNAIL_SELENIUM_USER"])
-        screenshot.compute_and_cache(user=user, cache=thumbnail_cache, force=force)
+        screenshot.compute_and_cache(
+            user=user,
+            cache=thumbnail_cache,
+            force=force,
+            window_size=window_size,
+            thumb_size=thumb_size,
+        )
+        return None


 @celery_app.task(name="cache_dashboard_thumbnail", soft_time_limit=300)
-def cache_dashboard_thumbnail(  # pylint: disable=inconsistent-return-statements
-    dashboard_id: int, force: bool = False
+def cache_dashboard_thumbnail(
+    url: str, digest: str, force: bool = False, thumb_size: Optional[WindowSize] = None
 ) -> None:
    with app.app_context():  # type: ignore
        if not thumbnail_cache:
            logging.warning("No cache set, refusing to compute")
            return
-        logger.info("Caching dashboard %i", dashboard_id)
-        screenshot = DashboardScreenshot(model_id=dashboard_id)
+        logger.info("Caching dashboard: %s", url)
+        screenshot = DashboardScreenshot(url, digest)
        user = security_manager.find_user(current_app.config["THUMBNAIL_SELENIUM_USER"])
-        screenshot.compute_and_cache(user=user, cache=thumbnail_cache, force=force)
+        screenshot.compute_and_cache(
+            user=user, cache=thumbnail_cache, force=force, thumb_size=thumb_size,
+        )
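
Note (illustrative, not part of the commit): under the new arg pattern the caller resolves the URL and digest before enqueueing, instead of passing a model id. A sketch with hypothetical values:

    from superset.tasks.thumbnails import cache_chart_thumbnail
    from superset.utils.urls import get_url_path

    url = get_url_path("Superset.slice", slice_id=42, standalone="true")
    cache_chart_thumbnail.delay(
        url,
        "8523d129f4a38d8d",  # hypothetical chart digest
        force=True,
        window_size=(1600, 1200),
        thumb_size=(800, 600),
    )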
@@ -721,7 +721,7 @@ def send_email_smtp(  # pylint: disable=invalid-name,too-many-arguments,too-many
    config: Dict[str, Any],
    files: Optional[List[str]] = None,
    data: Optional[Dict[str, str]] = None,
-    images: Optional[Dict[str, str]] = None,
+    images: Optional[Dict[str, bytes]] = None,
    dryrun: bool = False,
    cc: Optional[str] = None,
    bcc: Optional[str] = None,
@@ -778,8 +778,8 @@ def send_email_smtp(  # pylint: disable=invalid-name,too-many-arguments,too-many

    # Attach any inline images, which may be required for display in
    # HTML content (inline)
-    for msgid, body in (images or {}).items():
-        image = MIMEImage(body)
+    for msgid, imgdata in (images or {}).items():
+        image = MIMEImage(imgdata)
        image.add_header("Content-ID", "<%s>" % msgid)
        image.add_header("Content-Disposition", "inline")
        msg.attach(image)
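
Note (illustrative, not part of the commit): inline images are now raw bytes keyed by Content-ID, and the HTML body references them with cid: URLs, as deliver_alert does. A sketch (path and recipient hypothetical):

    with open("/tmp/screenshot.png", "rb") as f:
        img_bytes = f.read()

    images = {"screenshot": img_bytes}  # key becomes the MIME Content-ID
    html = '<img src="cid:screenshot" alt="alert screenshot" />'
    # send_email_smtp("to@example.com", "subject", html, config,
    #                 images=images, dryrun=True)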
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import hashlib
+import json
+from typing import Any, Dict
+
+
+def md5_sha_from_str(val: str) -> str:
+    return hashlib.md5(val.encode("utf-8")).hexdigest()
+
+
+def md5_sha_from_dict(opts: Dict[Any, Any]) -> str:
+    json_data = json.dumps(opts, sort_keys=True)
+    return md5_sha_from_str(json_data)
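
Note (illustrative, not part of the commit): md5_sha_from_dict serializes with sort_keys=True, so logically-equal option dicts hash identically regardless of key order — which is what makes it usable for the screenshot cache keys below:

    from superset.utils.hashing import md5_sha_from_dict, md5_sha_from_str

    a = md5_sha_from_dict({"window_size": [800, 600], "digest": "abc"})
    b = md5_sha_from_dict({"digest": "abc", "window_size": [800, 600]})
    assert a == b                            # canonical JSON form
    assert len(md5_sha_from_str("x")) == 32  # hex-encoded md5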
@@ -16,11 +16,10 @@
 # under the License.
 import logging
 import time
-import urllib.parse
 from io import BytesIO
-from typing import Any, Callable, Dict, List, Optional, Tuple, TYPE_CHECKING
+from typing import Any, Callable, Dict, List, Optional, Tuple, TYPE_CHECKING, Union

-from flask import current_app, request, Response, session, url_for
+from flask import current_app, request, Response, session
 from flask_login import login_user
 from retry.api import retry_call
 from selenium.common.exceptions import TimeoutException, WebDriverException
@@ -31,6 +30,9 @@ from selenium.webdriver.support import expected_conditions as EC
 from selenium.webdriver.support.ui import WebDriverWait
 from werkzeug.http import parse_cookie

+from superset.utils.hashing import md5_sha_from_dict
+from superset.utils.urls import headless_url
+
 logger = logging.getLogger(__name__)

 try:
@@ -87,15 +89,6 @@ def auth_driver(driver: WebDriver, user: "User") -> WebDriver:
     return driver


-def headless_url(path: str) -> str:
-    return urllib.parse.urljoin(current_app.config.get("WEBDRIVER_BASEURL", ""), path)
-
-
-def get_url_path(view: str, **kwargs: Any) -> str:
-    with current_app.test_request_context():
-        return headless_url(url_for(view, **kwargs))
-
-
 class AuthWebDriverProxy:
     def __init__(
         self,
@@ -119,6 +112,9 @@ class AuthWebDriverProxy:
             options = chrome.options.Options()
             arg: str = f"--window-size={self._window[0]},{self._window[1]}"
             options.add_argument(arg)
+            # TODO: 2 lines attempting retina PPI don't seem to be working
+            options.add_argument("--force-device-scale-factor=2.0")
+            options.add_argument("--high-dpi-support=2.0")
         else:
             raise Exception(f"Webdriver name ({self._driver_type}) not supported")
         # Prepare args for the webdriver init
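A standalone sketch of the Chrome setup this constructor performs, assuming selenium is installed and a chromedriver binary is on PATH; the retina flags are reproduced from the diff, and per the TODO they may have no effect:

    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options

    options = Options()
    options.add_argument("--headless")
    options.add_argument("--window-size=800,600")
    # Retina flags from the diff above; the TODO notes they may be no-ops.
    options.add_argument("--force-device-scale-factor=2.0")
    options.add_argument("--high-dpi-support=2.0")

    driver = webdriver.Chrome(options=options)
    driver.get("http://localhost:8088/")  # placeholder URL
    png_bytes = driver.get_screenshot_as_png()
    driver.quit()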
@@ -149,7 +145,11 @@ class AuthWebDriverProxy:
         pass

     def get_screenshot(
-        self, url: str, element_name: str, user: "User", retries: int = SELENIUM_RETRIES
+        self,
+        url: str,
+        element_name: str,
+        user: "User",
+        retries: int = SELENIUM_RETRIES,
     ) -> Optional[bytes]:
         driver = self.auth(user)
         driver.set_window_size(*self._window)
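The retries parameter feeds retry_call from the `retry` package, imported near the top of this file. A minimal sketch of that helper with a hypothetical flaky function, not the method body itself:

    from retry.api import retry_call

    def flaky_fetch() -> bytes:
        ...  # e.g. a Selenium interaction that may raise

    # Calls flaky_fetch up to 5 times, sleeping 1s between attempts;
    # the last exception propagates if every try fails.
    payload = retry_call(flaky_fetch, tries=5, delay=1)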
@@ -187,21 +187,36 @@ class BaseScreenshot:
     window_size: WindowSize = (800, 600)
     thumb_size: WindowSize = (400, 300)

-    def __init__(self, model_id: int):
-        self.model_id: int = model_id
+    def __init__(self, url: str, digest: str):
+        self.digest: str = digest
+        self.url = url
         self.screenshot: Optional[bytes] = None
-        self._driver = AuthWebDriverProxy(self.driver_type, self.window_size)

-    @property
-    def cache_key(self) -> str:
-        return f"thumb__{self.thumbnail_type}__{self.model_id}"
+    def driver(self, window_size: Optional[WindowSize] = None) -> AuthWebDriverProxy:
+        window_size = window_size or self.window_size
+        return AuthWebDriverProxy(self.driver_type, window_size)

-    @property
-    def url(self) -> str:
-        raise NotImplementedError()
+    def cache_key(
+        self,
+        window_size: Optional[Union[bool, WindowSize]] = None,
+        thumb_size: Optional[Union[bool, WindowSize]] = None,
+    ) -> str:
+        window_size = window_size or self.window_size
+        thumb_size = thumb_size or self.thumb_size
+        args = {
+            "thumbnail_type": self.thumbnail_type,
+            "digest": self.digest,
+            "type": "thumb",
+            "window_size": window_size,
+            "thumb_size": thumb_size,
+        }
+        return md5_sha_from_dict(args)

-    def get_screenshot(self, user: "User") -> Optional[bytes]:
-        self.screenshot = self._driver.get_screenshot(self.url, self.element, user)
+    def get_screenshot(
+        self, user: "User", window_size: Optional[WindowSize] = None
+    ) -> Optional[bytes]:
+        driver = self.driver(window_size)
+        self.screenshot = driver.get_screenshot(self.url, self.element, user)
         return self.screenshot

     def get(
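With this change the cache key becomes an MD5 digest over the screenshot parameters, so the same chart rendered at different sizes occupies distinct cache slots. A sketch of that property using the new hashing helper; "abc123" is a placeholder digest:

    from superset.utils.hashing import md5_sha_from_dict

    def key(thumb_size):
        # Mirrors the args dict built in cache_key above.
        return md5_sha_from_dict(
            {
                "thumbnail_type": "chart",
                "digest": "abc123",
                "type": "thumb",
                "window_size": (800, 600),
                "thumb_size": thumb_size,
            }
        )

    # Different thumb sizes map to different cache entries.
    assert key((400, 300)) != key((800, 600))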
@@ -218,28 +233,41 @@ class BaseScreenshot:
         :param thumb_size: Override thumbnail site
         """
         payload: Optional[bytes] = None
-        thumb_size = thumb_size or self.thumb_size
+        cache_key = self.cache_key(self.window_size, thumb_size)
         if cache:
-            payload = cache.get(self.cache_key)
+            payload = cache.get(cache_key)
         if not payload:
             payload = self.compute_and_cache(
                 user=user, thumb_size=thumb_size, cache=cache
             )
         else:
-            logger.info("Loaded thumbnail from cache: %s", self.cache_key)
+            logger.info("Loaded thumbnail from cache: %s", cache_key)
         if payload:
             return BytesIO(payload)
         return None

-    def get_from_cache(self, cache: "Cache") -> Optional[BytesIO]:
-        payload = cache.get(self.cache_key)
+    def get_from_cache(
+        self,
+        cache: "Cache",
+        window_size: Optional[WindowSize] = None,
+        thumb_size: Optional[WindowSize] = None,
+    ) -> Optional[BytesIO]:
+        cache_key = self.cache_key(window_size, thumb_size)
+        return self.get_from_cache_key(cache, cache_key)
+
+    @staticmethod
+    def get_from_cache_key(cache: "Cache", cache_key: str) -> Optional[BytesIO]:
+        logger.info("Attempting to get from cache: %s", cache_key)
+        payload = cache.get(cache_key)
         if payload:
             return BytesIO(payload)
+        logger.info("Failed at getting from cache: %s", cache_key)
         return None

     def compute_and_cache(  # pylint: disable=too-many-arguments
         self,
         user: "User" = None,
+        window_size: Optional[WindowSize] = None,
         thumb_size: Optional[WindowSize] = None,
         cache: "Cache" = None,
         force: bool = True,
@@ -254,22 +282,23 @@ class BaseScreenshot:
         :param force: Will force the computation even if it's already cached
         :return: Image payload
         """
-        cache_key = self.cache_key
+        cache_key = self.cache_key(window_size, thumb_size)
+        window_size = window_size or self.window_size
+        thumb_size = thumb_size or self.thumb_size
         if not force and cache and cache.get(cache_key):
             logger.info("Thumb already cached, skipping...")
             return None
-        thumb_size = thumb_size or self.thumb_size
         logger.info("Processing url for thumbnail: %s", cache_key)

         payload = None

         # Assuming all sorts of things can go wrong with Selenium
         try:
-            payload = self.get_screenshot(user=user)
+            payload = self.get_screenshot(user=user, window_size=window_size)
         except Exception as ex:  # pylint: disable=broad-except
             logger.error("Failed at generating thumbnail %s", ex)

-        if payload and self.window_size != thumb_size:
+        if payload and window_size != thumb_size:
             try:
                 payload = self.resize_image(payload, thumb_size=thumb_size)
             except Exception as ex:  # pylint: disable=broad-except
@@ -277,8 +306,9 @@ class BaseScreenshot:
             payload = None

         if payload and cache:
-            logger.info("Caching thumbnail: %s %s", cache_key, str(cache))
+            logger.info("Caching thumbnail: %s", cache_key)
             cache.set(cache_key, payload)
+            logger.info("Done caching thumbnail")
         return payload

     @classmethod
@@ -310,12 +340,8 @@ class BaseScreenshot:
 class ChartScreenshot(BaseScreenshot):
     thumbnail_type: str = "chart"
     element: str = "chart-container"
-    window_size: WindowSize = (600, int(600 * 0.75))
-    thumb_size: WindowSize = (300, int(300 * 0.75))
-
-    @property
-    def url(self) -> str:
-        return get_url_path("Superset.slice", slice_id=self.model_id, standalone="true")
+    window_size: WindowSize = (800, 600)
+    thumb_size: WindowSize = (800, 600)


 class DashboardScreenshot(BaseScreenshot):
@@ -323,7 +349,3 @@ class DashboardScreenshot(BaseScreenshot):
     element: str = "grid-container"
     window_size: WindowSize = (1600, int(1600 * 0.75))
     thumb_size: WindowSize = (400, int(400 * 0.75))
-
-    @property
-    def url(self) -> str:
-        return get_url_path("Superset.dashboard", dashboard_id=self.model_id)
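Taken together, callers now build the URL themselves and pass it in alongside a content digest, instead of the class deriving a URL from a model id. A hedged usage sketch; the slice id and digest are placeholders, and get_url_path needs an active Flask app context:

    from superset.utils.screenshots import ChartScreenshot
    from superset.utils.urls import get_url_path

    # 42 and "abc123" stand in for a real slice id and digest.
    url = get_url_path("Superset.slice", slice_id=42, standalone="true")
    screenshot = ChartScreenshot(url, "abc123")
    # screenshot.get_screenshot(user=some_user) would drive Selenium;
    # screenshot.cache_key() now varies with window and thumb sizes.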
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import urllib
+from typing import Any
+
+from flask import current_app, url_for
+
+
+def headless_url(path: str) -> str:
+    base_url = current_app.config.get("WEBDRIVER_BASEURL", "")
+    return urllib.parse.urljoin(base_url, path)
+
+
+def get_url_path(view: str, **kwargs: Any) -> str:
+    with current_app.test_request_context():
+        return headless_url(url_for(view, **kwargs))
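headless_url simply joins the configured WEBDRIVER_BASEURL with a path. A sanity-check sketch, assuming that config key is set to "http://superset:8088/" (a made-up value) and run inside an app context, as the tests do:

    from superset.utils.urls import headless_url
    from tests.test_app import app

    # current_app.config is only available inside an application context.
    with app.app_context():
        assert headless_url("/alert/list/") == "http://superset:8088/alert/list/"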
@@ -16,6 +16,7 @@
 # under the License.
 from . import (
     access_requests,
+    alerts,
     annotations,
     api,
     base,
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from flask_appbuilder import CompactCRUDMixin
+from flask_appbuilder.models.sqla.interface import SQLAInterface
+from flask_babel import lazy_gettext as _
+
+from superset.constants import RouteMethod
+from superset.models.alerts import Alert, AlertLog
+from superset.utils.core import markdown
+
+from .base import SupersetModelView
+
+# TODO: access control rules for this module
+
+
+class AlertLogModelView(
+    CompactCRUDMixin, SupersetModelView
+):  # pylint: disable=too-many-ancestors
+    datamodel = SQLAInterface(AlertLog)
+    include_route_methods = {RouteMethod.LIST} | {"show"}
+    list_columns = (
+        "scheduled_dttm",
+        "dttm_start",
+        "duration",
+        "state",
+    )
+
+
+class AlertModelView(SupersetModelView):  # pylint: disable=too-many-ancestors
+    datamodel = SQLAInterface(Alert)
+    route_base = "/alert"
+    include_route_methods = RouteMethod.CRUD_SET
+
+    list_columns = (
+        "label",
+        "database",
+        "crontab",
+        "last_eval_dttm",
+        "last_state",
+        "active",
+    )
+    add_columns = (
+        "label",
+        "active",
+        "crontab",
+        "database",
+        "sql",
+        "alert_type",
+        "owners",
+        "recipients",
+        "slice",
+        "dashboard",
+        "log_retention",
+        "grace_period",
+    )
+    label_columns = {
+        "sql": "SQL",
+        "log_retention": _("Log Retentions (days)"),
+    }
+    description_columns = {
+        "sql": _(
+            "A SQL statement that defines whether the alert should get "
+            "triggered or not. If the statement return no row, the alert "
+            "is not triggered. If the statement returns one or many rows, "
+            "the cells will be evaluated to see if they are 'truthy' "
+            "if any cell is truthy, the alert will fire. Truthy values "
+            "are non zero, non null, non empty strings."
+        ),
+        "crontab": markdown(
+            "A CRON-like expression. "
+            "[Crontab Guru](https://crontab.guru/) is "
+            "a helpful resource that can help you craft a CRON expression.",
+            True,
+        ),
+        "recipients": _("A semicolon ';' delimited list of email addresses"),
+        "log_retention": _("How long to keep the logs around for this alert"),
+        "grace_period": _(
+            "Once an alert is triggered, how long, in seconds, before "
+            "Superset nags you again."
+        ),
+    }
+    edit_columns = add_columns
+    related_views = [AlertLogModelView]
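The sql help text above describes the triggering rule, but the evaluation code itself is not part of this hunk. The following is therefore only a hypothetical pandas sketch of the rule as described (empty result means no trigger; any truthy cell fires), not the actual alert-runner implementation:

    import pandas as pd

    def alert_should_fire(df: pd.DataFrame) -> bool:
        # No rows returned: the alert does not trigger.
        if df.empty:
            return False
        # Any truthy cell (non-zero, non-null, non-empty string) fires.
        return any(
            (not pd.isna(cell)) and bool(cell)
            for cell in df.to_numpy().ravel()
        )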
@@ -14,13 +14,13 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import dataclasses
 import functools
 import logging
 import traceback
 from datetime import datetime
 from typing import Any, Callable, cast, Dict, List, Optional, TYPE_CHECKING, Union

+import dataclasses
 import simplejson as json
 import yaml
 from flask import abort, flash, g, get_flashed_messages, redirect, Response, session
@@ -21,8 +21,6 @@ These objects represent the backend of all the visualizations that
 Superset can render.
 """
 import copy
-import dataclasses
-import hashlib
 import inspect
 import logging
 import math
@@ -33,6 +31,7 @@ from datetime import datetime, timedelta
 from itertools import product
 from typing import Any, cast, Dict, List, Optional, Set, Tuple, TYPE_CHECKING, Union

+import dataclasses
 import geohash
 import numpy as np
 import pandas as pd
@@ -62,6 +61,7 @@ from superset.utils.core import (
     QueryMode,
     to_adhoc,
 )
+from superset.utils.hashing import md5_sha_from_str

 if TYPE_CHECKING:
     from superset.connectors.base.models import BaseDatasource
@@ -411,7 +411,7 @@ class BaseViz:
         )
         cache_dict["changed_on"] = self.datasource.changed_on
         json_data = self.json_dumps(cache_dict, sort_keys=True)
-        return hashlib.md5(json_data.encode("utf-8")).hexdigest()
+        return md5_sha_from_str(json_data)

     def get_payload(self, query_obj: Optional[QueryObjectDict] = None) -> VizPayload:
         """Returns a payload of metadata and data"""
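This swap is behavior-preserving: md5_sha_from_str wraps exactly the hashlib call it replaces, just under a shared name. A quick equivalence check:

    import hashlib

    from superset.utils.hashing import md5_sha_from_str

    payload = '{"changed_on": "2020-06-01T00:00:00"}'
    # Same digest either way; only the call site changes.
    assert md5_sha_from_str(payload) == hashlib.md5(
        payload.encode("utf-8")
    ).hexdigest()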
@@ -22,7 +22,6 @@ Superset can render.
 """
 # mypy: ignore-errors
 import copy
-import dataclasses
 import hashlib
 import inspect
 import logging
@@ -34,6 +33,7 @@ from datetime import datetime, timedelta
 from itertools import product
 from typing import Any, Dict, List, Optional, Set, Tuple, TYPE_CHECKING

+import dataclasses
 import geohash
 import numpy as np
 import pandas as pd
@@ -242,7 +242,7 @@ class TestSchedules(SupersetTestCase):
         send_email_smtp.assert_called_once()
         self.assertIsNone(send_email_smtp.call_args[1]["images"])
         self.assertEqual(
-            send_email_smtp.call_args[1]["data"]["screenshot.png"],
+            send_email_smtp.call_args[1]["data"]["screenshot"],
             element.screenshot_as_png,
         )

@@ -425,7 +425,7 @@ class TestSchedules(SupersetTestCase):
         send_email_smtp.assert_called_once()

         self.assertEqual(
-            send_email_smtp.call_args[1]["data"]["screenshot.png"],
+            send_email_smtp.call_args[1]["data"]["screenshot"],
             element.screenshot_as_png,
         )
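For reference, call_args[1] is the kwargs dict of a unittest.mock mock's most recent call, which is how these assertions read the renamed "screenshot" key. A standalone sketch with made-up values:

    from unittest.mock import MagicMock

    send_email_smtp = MagicMock()
    send_email_smtp("to@example.org", data={"screenshot": b"png-bytes"}, images=None)
    # call_args[1] holds the keyword arguments of the last call.
    assert send_email_smtp.call_args[1]["data"]["screenshot"] == b"png-bytes"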
@@ -33,6 +33,7 @@ from superset.utils.screenshots import (
     DashboardScreenshot,
     get_auth_cookies,
 )
+from superset.utils.urls import get_url_path
 from tests.test_app import app

 from .base_tests import SupersetTestCase
@@ -163,8 +164,9 @@ class TestThumbnails(SupersetTestCase):
         Thumbnails: Simple get chart with wrong digest
         """
         chart = db.session.query(Slice).all()[0]
+        chart_url = get_url_path("Superset.slice", slice_id=chart.id, standalone="true")
         # Cache a test "image"
-        screenshot = ChartScreenshot(model_id=chart.id)
+        screenshot = ChartScreenshot(chart_url, chart.digest)
         thumbnail_cache.set(screenshot.cache_key, self.mock_image)
         self.login(username="admin")
         uri = f"api/v1/chart/{chart.id}/thumbnail/1234/"
@@ -178,8 +180,9 @@ class TestThumbnails(SupersetTestCase):
         Thumbnails: Simple get cached dashboard screenshot
         """
         dashboard = db.session.query(Dashboard).all()[0]
+        dashboard_url = get_url_path("Superset.dashboard", dashboard_id=dashboard.id)
         # Cache a test "image"
-        screenshot = DashboardScreenshot(model_id=dashboard.id)
+        screenshot = DashboardScreenshot(dashboard_url, dashboard.digest)
         thumbnail_cache.set(screenshot.cache_key, self.mock_image)
         self.login(username="admin")
         uri = f"api/v1/dashboard/{dashboard.id}/thumbnail/{dashboard.digest}/"
@@ -193,8 +196,9 @@ class TestThumbnails(SupersetTestCase):
         Thumbnails: Simple get cached chart screenshot
         """
         chart = db.session.query(Slice).all()[0]
+        chart_url = get_url_path("Superset.slice", slice_id=chart.id, standalone="true")
         # Cache a test "image"
-        screenshot = ChartScreenshot(model_id=chart.id)
+        screenshot = ChartScreenshot(chart_url, chart.digest)
         thumbnail_cache.set(screenshot.cache_key, self.mock_image)
         self.login(username="admin")
         uri = f"api/v1/chart/{chart.id}/thumbnail/{chart.digest}/"
@@ -208,8 +212,9 @@ class TestThumbnails(SupersetTestCase):
         Thumbnails: Simple get dashboard with wrong digest
         """
         dashboard = db.session.query(Dashboard).all()[0]
+        dashboard_url = get_url_path("Superset.dashboard", dashboard_id=dashboard.id)
         # Cache a test "image"
-        screenshot = DashboardScreenshot(model_id=dashboard.id)
+        screenshot = DashboardScreenshot(dashboard_url, dashboard.digest)
         thumbnail_cache.set(screenshot.cache_key, self.mock_image)
         self.login(username="admin")
         uri = f"api/v1/dashboard/{dashboard.id}/thumbnail/1234/"