mirror of https://github.com/apache/superset.git
fix(alerts&reports): add celery soft timeout support (#13436)
* fix(alerts&reports): add celery soft timeout support * make a specific exception for screenshots timeout * fix docs, add new test
This commit is contained in:
parent
49eeab6f55
commit
139c7878a5
|
@ -20,6 +20,8 @@ from operator import eq, ge, gt, le, lt, ne
|
|||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from celery.exceptions import SoftTimeLimitExceeded
|
||||
from flask_babel import lazy_gettext as _
|
||||
|
||||
from superset import jinja_context
|
||||
|
@ -30,6 +32,7 @@ from superset.reports.commands.exceptions import (
|
|||
AlertQueryInvalidTypeError,
|
||||
AlertQueryMultipleColumnsError,
|
||||
AlertQueryMultipleRowsError,
|
||||
AlertQueryTimeout,
|
||||
AlertValidatorConfigError,
|
||||
)
|
||||
|
||||
|
@ -48,6 +51,20 @@ class AlertCommand(BaseCommand):
|
|||
self._result: Optional[float] = None
|
||||
|
||||
def run(self) -> bool:
|
||||
"""
|
||||
Executes an alert SQL query and validates it.
|
||||
Will set the report_schedule.last_value or last_value_row_json
|
||||
with the query result
|
||||
|
||||
:return: bool, if the alert triggered or not
|
||||
:raises AlertQueryError: SQL query is not valid
|
||||
:raises AlertQueryInvalidTypeError: The output from the SQL query
|
||||
is not an allowed type
|
||||
:raises AlertQueryMultipleColumnsError: The SQL query returned multiple columns
|
||||
:raises AlertQueryMultipleRowsError: The SQL query returned multiple rows
|
||||
:raises AlertQueryTimeout: The SQL query received a celery soft timeout
|
||||
:raises AlertValidatorConfigError: The validator query data is not valid
|
||||
"""
|
||||
self.validate()
|
||||
|
||||
if self._is_validator_not_null:
|
||||
|
@ -112,9 +129,13 @@ class AlertCommand(BaseCommand):
|
|||
self._report_schedule.validator_type == ReportScheduleValidatorType.OPERATOR
|
||||
)
|
||||
|
||||
def validate(self) -> None:
|
||||
def _execute_query(self) -> pd.DataFrame:
|
||||
"""
|
||||
Validate the query result as a Pandas DataFrame
|
||||
Executes the actual alert SQL query template
|
||||
|
||||
:return: A pandas dataframe
|
||||
:raises AlertQueryError: SQL query is not valid
|
||||
:raises AlertQueryTimeout: The SQL query received a celery soft timeout
|
||||
"""
|
||||
sql_template = jinja_context.get_template_processor(
|
||||
database=self._report_schedule.database
|
||||
|
@ -124,10 +145,18 @@ class AlertCommand(BaseCommand):
|
|||
limited_rendered_sql = self._report_schedule.database.apply_limit_to_sql(
|
||||
rendered_sql, ALERT_SQL_LIMIT
|
||||
)
|
||||
df = self._report_schedule.database.get_df(limited_rendered_sql)
|
||||
return self._report_schedule.database.get_df(limited_rendered_sql)
|
||||
except SoftTimeLimitExceeded:
|
||||
raise AlertQueryTimeout()
|
||||
except Exception as ex:
|
||||
raise AlertQueryError(message=str(ex))
|
||||
|
||||
def validate(self) -> None:
|
||||
"""
|
||||
Validate the query result as a Pandas DataFrame
|
||||
"""
|
||||
df = self._execute_query()
|
||||
|
||||
if df.empty and self._is_validator_not_null:
|
||||
self._result = None
|
||||
return
|
||||
|
|
|
@ -155,6 +155,14 @@ class AlertQueryError(CommandException):
|
|||
message = _("Alert found an error while executing a query.")
|
||||
|
||||
|
||||
class AlertQueryTimeout(CommandException):
    """Command error reported when an alert query times out."""

    message = _("A timeout occurred while executing the query.")
|
||||
|
||||
|
||||
class ReportScheduleScreenshotTimeout(CommandException):
    """Command error reported when taking a screenshot times out."""

    message = _("A timeout occurred while taking a screenshot.")
|
||||
|
||||
|
||||
class ReportScheduleAlertGracePeriodError(CommandException):
    """Command error whose message states that an alert fired during grace period."""

    message = _("Alert fired during grace period.")
|
||||
|
||||
|
|
|
@ -18,10 +18,11 @@ import logging
|
|||
from datetime import datetime, timedelta
|
||||
from typing import Any, List, Optional
|
||||
|
||||
from celery.exceptions import SoftTimeLimitExceeded
|
||||
from flask_appbuilder.security.sqla.models import User
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from superset import app, thumbnail_cache
|
||||
from superset import app
|
||||
from superset.commands.base import BaseCommand
|
||||
from superset.commands.exceptions import CommandException
|
||||
from superset.models.reports import (
|
||||
|
@ -39,6 +40,7 @@ from superset.reports.commands.exceptions import (
|
|||
ReportScheduleNotificationError,
|
||||
ReportSchedulePreviousWorkingError,
|
||||
ReportScheduleScreenshotFailedError,
|
||||
ReportScheduleScreenshotTimeout,
|
||||
ReportScheduleSelleniumUserNotFoundError,
|
||||
ReportScheduleStateNotFoundError,
|
||||
ReportScheduleUnexpectedError,
|
||||
|
@ -172,9 +174,14 @@ class BaseReportState:
|
|||
)
|
||||
image_url = self._get_url(user_friendly=True)
|
||||
user = self._get_screenshot_user()
|
||||
image_data = screenshot.compute_and_cache(
|
||||
user=user, cache=thumbnail_cache, force=True,
|
||||
)
|
||||
try:
|
||||
image_data = screenshot.get_screenshot(user=user)
|
||||
except SoftTimeLimitExceeded:
|
||||
raise ReportScheduleScreenshotTimeout()
|
||||
except Exception as ex:
|
||||
raise ReportScheduleScreenshotFailedError(
|
||||
f"Failed taking a screenshot {str(ex)}"
|
||||
)
|
||||
if not image_data:
|
||||
raise ReportScheduleScreenshotFailedError()
|
||||
return ScreenshotData(url=image_url, image=image_data)
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
import json
|
||||
from datetime import datetime, timedelta
|
||||
from typing import List, Optional
|
||||
from unittest.mock import Mock, patch
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from contextlib2 import contextmanager
|
||||
|
@ -46,6 +46,8 @@ from superset.reports.commands.exceptions import (
|
|||
ReportScheduleNotFoundError,
|
||||
ReportScheduleNotificationError,
|
||||
ReportSchedulePreviousWorkingError,
|
||||
ReportScheduleScreenshotFailedError,
|
||||
ReportScheduleScreenshotTimeout,
|
||||
ReportScheduleWorkingTimeoutError,
|
||||
)
|
||||
from superset.reports.commands.execute import AsyncExecuteReportScheduleCommand
|
||||
|
@ -503,7 +505,7 @@ def create_invalid_sql_alert_email_chart(request):
|
|||
"load_birth_names_dashboard_with_slices", "create_report_email_chart"
|
||||
)
|
||||
@patch("superset.reports.notifications.email.send_email_smtp")
|
||||
@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache")
|
||||
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
|
||||
def test_email_chart_report_schedule(
|
||||
screenshot_mock, email_mock, create_report_email_chart
|
||||
):
|
||||
|
@ -541,7 +543,7 @@ def test_email_chart_report_schedule(
|
|||
"load_birth_names_dashboard_with_slices", "create_report_email_dashboard"
|
||||
)
|
||||
@patch("superset.reports.notifications.email.send_email_smtp")
|
||||
@patch("superset.utils.screenshots.DashboardScreenshot.compute_and_cache")
|
||||
@patch("superset.utils.screenshots.DashboardScreenshot.get_screenshot")
|
||||
def test_email_dashboard_report_schedule(
|
||||
screenshot_mock, email_mock, create_report_email_dashboard
|
||||
):
|
||||
|
@ -573,7 +575,7 @@ def test_email_dashboard_report_schedule(
|
|||
"load_birth_names_dashboard_with_slices", "create_report_slack_chart"
|
||||
)
|
||||
@patch("superset.reports.notifications.slack.WebClient.files_upload")
|
||||
@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache")
|
||||
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
|
||||
def test_slack_chart_report_schedule(
|
||||
screenshot_mock, file_upload_mock, create_report_slack_chart
|
||||
):
|
||||
|
@ -694,7 +696,7 @@ def test_report_schedule_success_grace_end(create_alert_slack_chart_grace):
|
|||
|
||||
@pytest.mark.usefixtures("create_alert_email_chart")
|
||||
@patch("superset.reports.notifications.email.send_email_smtp")
|
||||
@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache")
|
||||
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
|
||||
def test_alert_limit_is_applied(screenshot_mock, email_mock, create_alert_email_chart):
|
||||
"""
|
||||
ExecuteReport Command: Test that all alerts apply a SQL limit to stmts
|
||||
|
@ -718,7 +720,7 @@ def test_alert_limit_is_applied(screenshot_mock, email_mock, create_alert_email_
|
|||
"load_birth_names_dashboard_with_slices", "create_report_email_dashboard"
|
||||
)
|
||||
@patch("superset.reports.notifications.email.send_email_smtp")
|
||||
@patch("superset.utils.screenshots.DashboardScreenshot.compute_and_cache")
|
||||
@patch("superset.utils.screenshots.DashboardScreenshot.get_screenshot")
|
||||
def test_email_dashboard_report_fails(
|
||||
screenshot_mock, email_mock, create_report_email_dashboard
|
||||
):
|
||||
|
@ -744,7 +746,7 @@ def test_email_dashboard_report_fails(
|
|||
"load_birth_names_dashboard_with_slices", "create_alert_email_chart"
|
||||
)
|
||||
@patch("superset.reports.notifications.email.send_email_smtp")
|
||||
@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache")
|
||||
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
|
||||
def test_slack_chart_alert(screenshot_mock, email_mock, create_alert_email_chart):
|
||||
"""
|
||||
ExecuteReport Command: Test chart slack alert
|
||||
|
@ -794,6 +796,89 @@ def test_email_mul_alert(create_mul_alert_email_chart):
|
|||
).run()
|
||||
|
||||
|
||||
@pytest.mark.usefixtures(
    "load_birth_names_dashboard_with_slices", "create_alert_email_chart"
)
@patch("superset.reports.notifications.email.send_email_smtp")
def test_soft_timeout_alert(email_mock, create_alert_email_chart):
    """
    ExecuteReport Command: a celery soft timeout raised while the alert
    query executes surfaces as AlertQueryTimeout, is e-mailed to the
    alert's first target and is logged as an error.
    """
    from celery.exceptions import SoftTimeLimitExceeded
    from superset.reports.commands.exceptions import AlertQueryTimeout

    engine_spec = create_alert_email_chart.database.db_engine_spec
    command = AsyncExecuteReportScheduleCommand(
        create_alert_email_chart.id, datetime.utcnow()
    )

    # Make the engine's ``execute`` raise the soft time limit signal.
    with patch.object(
        engine_spec,
        "execute",
        return_value=None,
        side_effect=SoftTimeLimitExceeded(),
    ):
        with pytest.raises(AlertQueryTimeout):
            command.run()

    # The mocked SMTP sender must have been addressed to the first
    # notification target, i.e. a notification with the error was sent.
    targets = get_target_from_report_schedule(create_alert_email_chart)
    assert email_mock.call_args[0][0] == targets[0]

    assert_log(
        ReportState.ERROR, error_message="A timeout occurred while executing the query."
    )
|
||||
|
||||
|
||||
@pytest.mark.usefixtures(
    "load_birth_names_dashboard_with_slices", "create_alert_email_chart"
)
@patch("superset.reports.notifications.email.send_email_smtp")
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
def test_soft_timeout_screenshot(screenshot_mock, email_mock, create_alert_email_chart):
    """
    ExecuteReport Command: Test soft timeout on screenshot

    The mocked ``get_screenshot`` raises celery's SoftTimeLimitExceeded; the
    command is expected to raise ReportScheduleScreenshotTimeout, send an
    e-mail to the alert's first target and log the timeout as an error.
    """
    # Local import kept: only this name is actually used by the test.
    from celery.exceptions import SoftTimeLimitExceeded

    screenshot_mock.side_effect = SoftTimeLimitExceeded()
    with pytest.raises(ReportScheduleScreenshotTimeout):
        AsyncExecuteReportScheduleCommand(
            create_alert_email_chart.id, datetime.utcnow()
        ).run()

    notification_targets = get_target_from_report_schedule(create_alert_email_chart)
    # Assert the email smtp address, asserts a notification was sent with the error
    assert email_mock.call_args[0][0] == notification_targets[0]

    assert_log(
        ReportState.ERROR, error_message="A timeout occurred while taking a screenshot."
    )
|
||||
|
||||
|
||||
@pytest.mark.usefixtures(
    "load_birth_names_dashboard_with_slices", "create_alert_email_chart"
)
@patch("superset.reports.notifications.email.send_email_smtp")
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
def test_fail_screenshot(screenshot_mock, email_mock, create_alert_email_chart):
    """
    ExecuteReport Command: Test unexpected failure on screenshot

    The mocked ``get_screenshot`` raises a generic Exception; the command is
    expected to raise ReportScheduleScreenshotFailedError, send an e-mail to
    the alert's first target and log the failure, including the original
    exception text, as an error.
    """
    screenshot_mock.side_effect = Exception("Unexpected error")
    with pytest.raises(ReportScheduleScreenshotFailedError):
        AsyncExecuteReportScheduleCommand(
            create_alert_email_chart.id, datetime.utcnow()
        ).run()

    notification_targets = get_target_from_report_schedule(create_alert_email_chart)
    # Assert the email smtp address, asserts a notification was sent with the error
    assert email_mock.call_args[0][0] == notification_targets[0]

    assert_log(
        ReportState.ERROR, error_message="Failed taking a screenshot Unexpected error"
    )
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("create_invalid_sql_alert_email_chart")
|
||||
@patch("superset.reports.notifications.email.send_email_smtp")
|
||||
def test_invalid_sql_alert(email_mock, create_invalid_sql_alert_email_chart):
|
||||
|
@ -860,7 +945,7 @@ def test_grace_period_error(email_mock, create_invalid_sql_alert_email_chart):
|
|||
|
||||
@pytest.mark.usefixtures("create_invalid_sql_alert_email_chart")
|
||||
@patch("superset.reports.notifications.email.send_email_smtp")
|
||||
@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache")
|
||||
@patch("superset.utils.screenshots.ChartScreenshot.get_screenshot")
|
||||
def test_grace_period_error_flap(
|
||||
screenshot_mock, email_mock, create_invalid_sql_alert_email_chart
|
||||
):
|
||||
|
|
Loading…
Reference in New Issue