feat: add timezones to report cron (#15849)

* add timezones to report cron

* fix test
This commit is contained in:
Elizabeth Thompson 2021-07-27 18:28:24 -07:00 committed by GitHub
parent a3f54a6c7b
commit ea49aa3d2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 351 additions and 49 deletions

View File

@ -111,6 +111,7 @@ class ReportSchedule(Model, AuditMixinNullable):
creation_method = Column(
String(255), server_default=ReportCreationMethodType.ALERTS_REPORTS
)
timezone = Column(String(100), default="UTC", nullable=False)
report_format = Column(String(50), default=ReportDataFormat.VISUALIZATION)
sql = Column(Text())
# (Alerts/Reports) M-O to chart

View File

@ -106,6 +106,7 @@ class ReportScheduleRestApi(BaseSupersetModelRestApi):
"recipients.type",
"report_format",
"sql",
"timezone",
"type",
"validator_config_json",
"validator_type",
@ -136,6 +137,7 @@ class ReportScheduleRestApi(BaseSupersetModelRestApi):
"owners.last_name",
"recipients.id",
"recipients.type",
"timezone",
"type",
]
add_columns = [
@ -154,6 +156,7 @@ class ReportScheduleRestApi(BaseSupersetModelRestApi):
"recipients",
"report_format",
"sql",
"timezone",
"type",
"validator_config_json",
"validator_type",

View File

@ -57,6 +57,7 @@ crontab_description = (
"[Crontab Guru](https://crontab.guru/) is "
"a helpful resource that can help you craft a CRON expression."
)
timezone_description = "A timezone string that represents the location of the timezone."
sql_description = (
"A SQL statement that defines whether the alert should get triggered or "
"not. The query is expected to return either NULL or a number value."
@ -152,6 +153,7 @@ class ReportSchedulePostSchema(Schema):
allow_none=False,
required=True,
)
timezone = fields.String(description=timezone_description, default="UTC")
sql = fields.String(
description=sql_description, example="SELECT value FROM time_series_table"
)
@ -231,6 +233,7 @@ class ReportSchedulePutSchema(Schema):
validate=[validate_crontab, Length(1, 1000)],
required=False,
)
timezone = fields.String(description=timezone_description, default="UTC")
sql = fields.String(
description=sql_description,
example="SELECT value FROM time_series_table",

View File

@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime, timedelta, timezone as dt_timezone
from typing import Iterator
import pytz
from croniter import croniter
from superset import app
def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]:
window_size = app.config["ALERT_REPORTS_CRON_WINDOW_SIZE"]
# create a time-aware datetime in utc
time_now = datetime.now(tz=dt_timezone.utc)
tz = pytz.timezone(timezone)
utc = pytz.timezone("UTC")
# convert the current time to the user's local time for comparison
time_now = time_now.astimezone(tz)
start_at = time_now - timedelta(seconds=1)
stop_at = time_now + timedelta(seconds=window_size)
crons = croniter(cron, start_at)
for schedule in crons.all_next(datetime):
if schedule >= stop_at:
break
# convert schedule back to utc
yield schedule.astimezone(utc).replace(tzinfo=None)

View File

@ -15,10 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime, timedelta
from typing import Iterator
import croniter
from celery.exceptions import SoftTimeLimitExceeded
from dateutil import parser
@ -29,23 +26,12 @@ from superset.reports.commands.exceptions import ReportScheduleUnexpectedError
from superset.reports.commands.execute import AsyncExecuteReportScheduleCommand
from superset.reports.commands.log_prune import AsyncPruneReportScheduleLogCommand
from superset.reports.dao import ReportScheduleDAO
from superset.tasks.cron_util import cron_schedule_window
from superset.utils.celery import session_scope
logger = logging.getLogger(__name__)
def cron_schedule_window(cron: str) -> Iterator[datetime]:
window_size = app.config["ALERT_REPORTS_CRON_WINDOW_SIZE"]
utc_now = datetime.utcnow()
start_at = utc_now - timedelta(seconds=1)
stop_at = utc_now + timedelta(seconds=window_size)
crons = croniter.croniter(cron, start_at)
for schedule in crons.all_next(datetime):
if schedule >= stop_at:
break
yield schedule
@celery_app.task(name="reports.scheduler")
def scheduler() -> None:
"""
@ -54,7 +40,9 @@ def scheduler() -> None:
with session_scope(nullpool=True) as session:
active_schedules = ReportScheduleDAO.find_active(session)
for active_schedule in active_schedules:
for schedule in cron_schedule_window(active_schedule.crontab):
for schedule in cron_schedule_window(
active_schedule.crontab, active_schedule.timezone
):
logger.info(
"Scheduling alert %s eta: %s", active_schedule.name, schedule
)

View File

@ -60,10 +60,10 @@ class TestReportSchedulesApi(SupersetTestCase):
report_schedule = insert_report_schedule(
type=ReportScheduleType.ALERT,
name=f"name_working",
crontab=f"* * * * *",
sql=f"SELECT value from table",
description=f"Report working",
name="name_working",
crontab="* * * * *",
sql="SELECT value from table",
description="Report working",
chart=chart,
database=example_db,
owners=[admin_user, alpha_user],
@ -205,6 +205,7 @@ class TestReportSchedulesApi(SupersetTestCase):
"type": "Email",
}
],
"timezone": report_schedule.timezone,
"type": report_schedule.type,
"validator_config_json": report_schedule.validator_config_json,
"validator_type": report_schedule.validator_type,
@ -279,6 +280,7 @@ class TestReportSchedulesApi(SupersetTestCase):
"name",
"owners",
"recipients",
"timezone",
"type",
]
assert rv.status_code == 200
@ -302,7 +304,7 @@ class TestReportSchedulesApi(SupersetTestCase):
ReportSchedule Api: Test sorting on get list report schedules
"""
self.login(username="admin")
uri = f"api/v1/report/"
uri = "api/v1/report/"
order_columns = [
"active",
@ -674,6 +676,61 @@ class TestReportSchedulesApi(SupersetTestCase):
rv = self.client.post(uri, json=report_schedule_data)
assert rv.status_code == 201
# Test that report cannot be created with null timezone
report_schedule_data = {
"type": ReportScheduleType.ALERT,
"name": "new5",
"description": "description",
"creation_method": ReportCreationMethodType.ALERTS_REPORTS,
"crontab": "0 9 * * *",
"recipients": [
{
"type": ReportRecipientType.EMAIL,
"recipient_config_json": {"target": "target@superset.org"},
},
{
"type": ReportRecipientType.SLACK,
"recipient_config_json": {"target": "channel"},
},
],
"working_timeout": 3600,
"timezone": None,
"dashboard": dashboard.id,
"database": example_db.id,
}
rv = self.client.post(uri, json=report_schedule_data)
assert rv.status_code == 400
data = json.loads(rv.data.decode("utf-8"))
assert data == {"message": {"timezone": ["Field may not be null."]}}
# Test that report should reflect the timezone value passed in
report_schedule_data = {
"type": ReportScheduleType.ALERT,
"name": "new6",
"description": "description",
"creation_method": ReportCreationMethodType.ALERTS_REPORTS,
"crontab": "0 9 * * *",
"recipients": [
{
"type": ReportRecipientType.EMAIL,
"recipient_config_json": {"target": "target@superset.org"},
},
{
"type": ReportRecipientType.SLACK,
"recipient_config_json": {"target": "channel"},
},
],
"working_timeout": 3600,
"timezone": "America/Los_Angeles",
"dashboard": dashboard.id,
"database": example_db.id,
}
uri = "api/v1/report/"
rv = self.client.post(uri, json=report_schedule_data)
data = json.loads(rv.data.decode("utf-8"))
assert data["result"]["timezone"] == "America/Los_Angeles"
assert rv.status_code == 201
@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
def test_create_report_schedule_chart_dash_validation(self):
"""

View File

@ -23,43 +23,23 @@ from freezegun.api import FakeDatetime # type: ignore
from superset.extensions import db
from superset.models.reports import ReportScheduleType
from superset.tasks.scheduler import cron_schedule_window, scheduler
from superset.tasks.scheduler import scheduler
from tests.integration_tests.reports.utils import insert_report_schedule
from tests.integration_tests.test_app import app
@pytest.mark.parametrize(
"current_dttm, cron, excepted",
[
("2020-01-01T08:59:01Z", "0 9 * * *", []),
("2020-01-01T08:59:02Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]),
("2020-01-01T08:59:59Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]),
("2020-01-01T09:00:00Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]),
("2020-01-01T09:00:01Z", "0 9 * * *", []),
],
)
def test_cron_schedule_window(
current_dttm: str, cron: str, excepted: List[FakeDatetime]
):
"""
Reports scheduler: Test cron schedule window
"""
with app.app_context():
with freeze_time(current_dttm):
datetimes = cron_schedule_window(cron)
assert list(datetimes) == excepted
@patch("superset.tasks.scheduler.execute.apply_async")
def test_scheduler_celery_timeout(execute_mock):
def test_scheduler_celery_timeout_ny(execute_mock):
"""
Reports scheduler: Test scheduler setting celery soft and hard timeout
"""
with app.app_context():
report_schedule = insert_report_schedule(
type=ReportScheduleType.ALERT, name=f"report", crontab=f"0 9 * * *",
type=ReportScheduleType.ALERT,
name="report",
crontab="0 4 * * *",
timezone="America/New_York",
)
with freeze_time("2020-01-01T09:00:00Z"):
@ -71,14 +51,17 @@ def test_scheduler_celery_timeout(execute_mock):
@patch("superset.tasks.scheduler.execute.apply_async")
def test_scheduler_celery_no_timeout(execute_mock):
def test_scheduler_celery_no_timeout_ny(execute_mock):
"""
Reports scheduler: Test scheduler setting celery soft and hard timeout
"""
with app.app_context():
app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"] = False
report_schedule = insert_report_schedule(
type=ReportScheduleType.ALERT, name=f"report", crontab=f"0 9 * * *",
type=ReportScheduleType.ALERT,
name="report",
crontab="0 4 * * *",
timezone="America/New_York",
)
with freeze_time("2020-01-01T09:00:00Z"):
@ -86,3 +69,49 @@ def test_scheduler_celery_no_timeout(execute_mock):
assert execute_mock.call_args[1] == {"eta": FakeDatetime(2020, 1, 1, 9, 0)}
db.session.delete(report_schedule)
db.session.commit()
app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"] = True
@patch("superset.tasks.scheduler.execute.apply_async")
def test_scheduler_celery_timeout_utc(execute_mock):
"""
Reports scheduler: Test scheduler setting celery soft and hard timeout
"""
with app.app_context():
report_schedule = insert_report_schedule(
type=ReportScheduleType.ALERT,
name="report",
crontab="0 9 * * *",
timezone="UTC",
)
with freeze_time("2020-01-01T09:00:00Z"):
scheduler()
print(execute_mock.call_args)
assert execute_mock.call_args[1]["soft_time_limit"] == 3601
assert execute_mock.call_args[1]["time_limit"] == 3610
db.session.delete(report_schedule)
db.session.commit()
@patch("superset.tasks.scheduler.execute.apply_async")
def test_scheduler_celery_no_timeout_utc(execute_mock):
"""
Reports scheduler: Test scheduler setting celery soft and hard timeout
"""
with app.app_context():
app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"] = False
report_schedule = insert_report_schedule(
type=ReportScheduleType.ALERT,
name="report",
crontab="0 9 * * *",
timezone="UTC",
)
with freeze_time("2020-01-01T09:00:00Z"):
scheduler()
assert execute_mock.call_args[1] == {"eta": FakeDatetime(2020, 1, 1, 9, 0)}
db.session.delete(report_schedule)
db.session.commit()
app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"] = True

View File

@ -36,6 +36,7 @@ def insert_report_schedule(
type: str,
name: str,
crontab: str,
timezone: Optional[str] = None,
sql: Optional[str] = None,
description: Optional[str] = None,
chart: Optional[Slice] = None,
@ -59,6 +60,7 @@ def insert_report_schedule(
type=type,
name=name,
crontab=crontab,
timezone=timezone,
sql=sql,
description=description,
chart=chart,

View File

@ -0,0 +1,177 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=unused-argument, invalid-name
from datetime import datetime
from typing import List
import pytest
import pytz
from dateutil import parser
from flask.ctx import AppContext
from freezegun import freeze_time
from freezegun.api import FakeDatetime # type: ignore
from superset.tasks.cron_util import cron_schedule_window
@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
("2020-01-01T08:59:01Z", "0 1 * * *", []),
(
"2020-01-01T08:59:02Z",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T08:59:59Z",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T09:00:00Z",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-01-01T09:00:01Z", "0 1 * * *", []),
],
)
def test_cron_schedule_window_los_angeles(
app_context: AppContext, current_dttm: str, cron: str, expected: List[FakeDatetime]
) -> None:
"""
Reports scheduler: Test cron schedule window for "America/Los_Angeles"
"""
with freeze_time(current_dttm):
datetimes = cron_schedule_window(cron, "America/Los_Angeles")
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
== expected
)
@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
("2020-01-01T05:59:01Z", "0 1 * * *", []),
(
"2020-01-01T05:59:02Z",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T5:59:59Z",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T6:00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-01-01T6:00:01Z", "0 1 * * *", []),
],
)
def test_cron_schedule_window_new_york(
app_context: AppContext, current_dttm: str, cron: str, expected: List[FakeDatetime]
) -> None:
"""
Reports scheduler: Test cron schedule window for "America/New_York"
"""
with freeze_time(current_dttm, tz_offset=0):
datetimes = cron_schedule_window(cron, "America/New_York")
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
== expected
)
@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
("2020-01-01T06:59:01Z", "0 1 * * *", []),
(
"2020-01-01T06:59:02Z",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T06:59:59Z",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T07:00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-01-01T07:00:01Z", "0 1 * * *", []),
],
)
def test_cron_schedule_window_chicago(
app_context: AppContext, current_dttm: str, cron: str, expected: List[FakeDatetime]
) -> None:
"""
Reports scheduler: Test cron schedule window for "America/Chicago"
"""
with freeze_time(current_dttm, tz_offset=0):
datetimes = cron_schedule_window(cron, "America/Chicago")
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
== expected
)
@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
("2020-07-01T05:59:01Z", "0 1 * * *", []),
(
"2020-07-01T05:59:02Z",
"0 1 * * *",
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-07-01T05:59:59Z",
"0 1 * * *",
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-07-01T06:00:00",
"0 1 * * *",
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-07-01T06:00:01Z", "0 1 * * *", []),
],
)
def test_cron_schedule_window_chicago_daylight(
app_context: AppContext, current_dttm: str, cron: str, expected: List[FakeDatetime]
) -> None:
"""
Reports scheduler: Test cron schedule window for "America/Chicago"
"""
with freeze_time(current_dttm, tz_offset=0):
datetimes = cron_schedule_window(cron, "America/Chicago")
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
== expected
)