mirror of https://github.com/apache/superset.git
fix(reports): Clear last value when state is WORKING (#19941)
* fix(reports): Clear last value when state is WORKING * Update cbe71abde154_fix_report_schedule_and_log.py Co-authored-by: John Bodley <john.bodley@airbnb.com>
This commit is contained in:
parent
449d08b25e
commit
89a844a40f
|
@ -0,0 +1,84 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing,
|
||||||
|
# software distributed under the License is distributed on an
|
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
# KIND, either express or implied. See the License for the
|
||||||
|
# specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
"""fix report schedule and execution log
|
||||||
|
|
||||||
|
Revision ID: cbe71abde154
|
||||||
|
Revises: a9422eeaae74
|
||||||
|
Create Date: 2022-05-03 19:39:32.074608
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = "cbe71abde154"
|
||||||
|
down_revision = "a9422eeaae74"
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
from sqlalchemy import Column, Float, Integer, String, Text
|
||||||
|
from sqlalchemy.ext.declarative import declarative_base
|
||||||
|
|
||||||
|
from superset import db
|
||||||
|
from superset.models.reports import ReportState
|
||||||
|
|
||||||
|
Base = declarative_base()
|
||||||
|
|
||||||
|
|
||||||
|
class ReportExecutionLog(Base):
|
||||||
|
__tablename__ = "report_execution_log"
|
||||||
|
|
||||||
|
id = Column(Integer, primary_key=True)
|
||||||
|
state = Column(String(50), nullable=False)
|
||||||
|
value = Column(Float)
|
||||||
|
value_row_json = Column(Text)
|
||||||
|
|
||||||
|
|
||||||
|
class ReportSchedule(Base):
|
||||||
|
__tablename__ = "report_schedule"
|
||||||
|
|
||||||
|
id = Column(Integer, primary_key=True)
|
||||||
|
last_state = Column(String(50))
|
||||||
|
last_value = Column(Float)
|
||||||
|
last_value_row_json = Column(Text)
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
bind = op.get_bind()
|
||||||
|
session = db.Session(bind=bind)
|
||||||
|
|
||||||
|
for schedule in (
|
||||||
|
session.query(ReportSchedule)
|
||||||
|
.filter(ReportSchedule.last_state == ReportState.WORKING)
|
||||||
|
.all()
|
||||||
|
):
|
||||||
|
schedule.last_value = None
|
||||||
|
schedule.last_value_row_json = None
|
||||||
|
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
for execution_log in (
|
||||||
|
session.query(ReportExecutionLog)
|
||||||
|
.filter(ReportExecutionLog.state == ReportState.WORKING)
|
||||||
|
.all()
|
||||||
|
):
|
||||||
|
execution_log.value = None
|
||||||
|
execution_log.value_row_json = None
|
||||||
|
|
||||||
|
session.commit()
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
pass
|
|
@ -94,35 +94,39 @@ class BaseReportState:
|
||||||
self._start_dttm = datetime.utcnow()
|
self._start_dttm = datetime.utcnow()
|
||||||
self._execution_id = execution_id
|
self._execution_id = execution_id
|
||||||
|
|
||||||
def set_state_and_log(
|
def update_report_schedule_and_log(
|
||||||
self,
|
self,
|
||||||
state: ReportState,
|
state: ReportState,
|
||||||
error_message: Optional[str] = None,
|
error_message: Optional[str] = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Updates current ReportSchedule state and TS. If on final state writes the log
|
Update the report schedule state et al. and reflect the change in the execution
|
||||||
for this execution
|
log.
|
||||||
"""
|
"""
|
||||||
now_dttm = datetime.utcnow()
|
|
||||||
self.set_state(state, now_dttm)
|
|
||||||
self.create_log(
|
|
||||||
state,
|
|
||||||
error_message=error_message,
|
|
||||||
)
|
|
||||||
|
|
||||||
def set_state(self, state: ReportState, dttm: datetime) -> None:
|
self.update_report_schedule(state)
|
||||||
|
self.create_log(error_message)
|
||||||
|
|
||||||
|
def update_report_schedule(self, state: ReportState) -> None:
|
||||||
"""
|
"""
|
||||||
Set the current report schedule state, on this case we want to
|
Update the report schedule state et al.
|
||||||
commit immediately
|
|
||||||
|
When the report state is WORKING we must ensure that the values from the last
|
||||||
|
execution run are cleared to ensure that they are not propagated to the
|
||||||
|
execution log.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if state == ReportState.WORKING:
|
||||||
|
self._report_schedule.last_value = None
|
||||||
|
self._report_schedule.last_value_row_json = None
|
||||||
|
|
||||||
self._report_schedule.last_state = state
|
self._report_schedule.last_state = state
|
||||||
self._report_schedule.last_eval_dttm = dttm
|
self._report_schedule.last_eval_dttm = datetime.utcnow()
|
||||||
|
|
||||||
self._session.merge(self._report_schedule)
|
self._session.merge(self._report_schedule)
|
||||||
self._session.commit()
|
self._session.commit()
|
||||||
|
|
||||||
def create_log(
|
def create_log(self, error_message: Optional[str] = None) -> None:
|
||||||
self, state: ReportState, error_message: Optional[str] = None
|
|
||||||
) -> None:
|
|
||||||
"""
|
"""
|
||||||
Creates a Report execution log, uses the current computed last_value for Alerts
|
Creates a Report execution log, uses the current computed last_value for Alerts
|
||||||
"""
|
"""
|
||||||
|
@ -132,7 +136,7 @@ class BaseReportState:
|
||||||
end_dttm=datetime.utcnow(),
|
end_dttm=datetime.utcnow(),
|
||||||
value=self._report_schedule.last_value,
|
value=self._report_schedule.last_value,
|
||||||
value_row_json=self._report_schedule.last_value_row_json,
|
value_row_json=self._report_schedule.last_value_row_json,
|
||||||
state=state,
|
state=self._report_schedule.last_state,
|
||||||
error_message=error_message,
|
error_message=error_message,
|
||||||
report_schedule=self._report_schedule,
|
report_schedule=self._report_schedule,
|
||||||
uuid=self._execution_id,
|
uuid=self._execution_id,
|
||||||
|
@ -489,17 +493,19 @@ class ReportNotTriggeredErrorState(BaseReportState):
|
||||||
initial = True
|
initial = True
|
||||||
|
|
||||||
def next(self) -> None:
|
def next(self) -> None:
|
||||||
self.set_state_and_log(ReportState.WORKING)
|
self.update_report_schedule_and_log(ReportState.WORKING)
|
||||||
try:
|
try:
|
||||||
# If it's an alert check if the alert is triggered
|
# If it's an alert check if the alert is triggered
|
||||||
if self._report_schedule.type == ReportScheduleType.ALERT:
|
if self._report_schedule.type == ReportScheduleType.ALERT:
|
||||||
if not AlertCommand(self._report_schedule).run():
|
if not AlertCommand(self._report_schedule).run():
|
||||||
self.set_state_and_log(ReportState.NOOP)
|
self.update_report_schedule_and_log(ReportState.NOOP)
|
||||||
return
|
return
|
||||||
self.send()
|
self.send()
|
||||||
self.set_state_and_log(ReportState.SUCCESS)
|
self.update_report_schedule_and_log(ReportState.SUCCESS)
|
||||||
except CommandException as first_ex:
|
except CommandException as first_ex:
|
||||||
self.set_state_and_log(ReportState.ERROR, error_message=str(first_ex))
|
self.update_report_schedule_and_log(
|
||||||
|
ReportState.ERROR, error_message=str(first_ex)
|
||||||
|
)
|
||||||
# TODO (dpgaspar) convert this logic to a new state eg: ERROR_ON_GRACE
|
# TODO (dpgaspar) convert this logic to a new state eg: ERROR_ON_GRACE
|
||||||
if not self.is_in_error_grace_period():
|
if not self.is_in_error_grace_period():
|
||||||
try:
|
try:
|
||||||
|
@ -508,12 +514,12 @@ class ReportNotTriggeredErrorState(BaseReportState):
|
||||||
f" {self._report_schedule.name}",
|
f" {self._report_schedule.name}",
|
||||||
str(first_ex),
|
str(first_ex),
|
||||||
)
|
)
|
||||||
self.set_state_and_log(
|
self.update_report_schedule_and_log(
|
||||||
ReportState.ERROR,
|
ReportState.ERROR,
|
||||||
error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
|
error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
|
||||||
)
|
)
|
||||||
except CommandException as second_ex:
|
except CommandException as second_ex:
|
||||||
self.set_state_and_log(
|
self.update_report_schedule_and_log(
|
||||||
ReportState.ERROR, error_message=str(second_ex)
|
ReportState.ERROR, error_message=str(second_ex)
|
||||||
)
|
)
|
||||||
raise first_ex
|
raise first_ex
|
||||||
|
@ -532,13 +538,13 @@ class ReportWorkingState(BaseReportState):
|
||||||
def next(self) -> None:
|
def next(self) -> None:
|
||||||
if self.is_on_working_timeout():
|
if self.is_on_working_timeout():
|
||||||
exception_timeout = ReportScheduleWorkingTimeoutError()
|
exception_timeout = ReportScheduleWorkingTimeoutError()
|
||||||
self.set_state_and_log(
|
self.update_report_schedule_and_log(
|
||||||
ReportState.ERROR,
|
ReportState.ERROR,
|
||||||
error_message=str(exception_timeout),
|
error_message=str(exception_timeout),
|
||||||
)
|
)
|
||||||
raise exception_timeout
|
raise exception_timeout
|
||||||
exception_working = ReportSchedulePreviousWorkingError()
|
exception_working = ReportSchedulePreviousWorkingError()
|
||||||
self.set_state_and_log(
|
self.update_report_schedule_and_log(
|
||||||
ReportState.WORKING,
|
ReportState.WORKING,
|
||||||
error_message=str(exception_working),
|
error_message=str(exception_working),
|
||||||
)
|
)
|
||||||
|
@ -559,15 +565,15 @@ class ReportSuccessState(BaseReportState):
|
||||||
def next(self) -> None:
|
def next(self) -> None:
|
||||||
if self._report_schedule.type == ReportScheduleType.ALERT:
|
if self._report_schedule.type == ReportScheduleType.ALERT:
|
||||||
if self.is_in_grace_period():
|
if self.is_in_grace_period():
|
||||||
self.set_state_and_log(
|
self.update_report_schedule_and_log(
|
||||||
ReportState.GRACE,
|
ReportState.GRACE,
|
||||||
error_message=str(ReportScheduleAlertGracePeriodError()),
|
error_message=str(ReportScheduleAlertGracePeriodError()),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
self.set_state_and_log(ReportState.WORKING)
|
self.update_report_schedule_and_log(ReportState.WORKING)
|
||||||
try:
|
try:
|
||||||
if not AlertCommand(self._report_schedule).run():
|
if not AlertCommand(self._report_schedule).run():
|
||||||
self.set_state_and_log(ReportState.NOOP)
|
self.update_report_schedule_and_log(ReportState.NOOP)
|
||||||
return
|
return
|
||||||
except CommandException as ex:
|
except CommandException as ex:
|
||||||
self.send_error(
|
self.send_error(
|
||||||
|
@ -575,7 +581,7 @@ class ReportSuccessState(BaseReportState):
|
||||||
f" {self._report_schedule.name}",
|
f" {self._report_schedule.name}",
|
||||||
str(ex),
|
str(ex),
|
||||||
)
|
)
|
||||||
self.set_state_and_log(
|
self.update_report_schedule_and_log(
|
||||||
ReportState.ERROR,
|
ReportState.ERROR,
|
||||||
error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
|
error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
|
||||||
)
|
)
|
||||||
|
@ -583,9 +589,11 @@ class ReportSuccessState(BaseReportState):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.send()
|
self.send()
|
||||||
self.set_state_and_log(ReportState.SUCCESS)
|
self.update_report_schedule_and_log(ReportState.SUCCESS)
|
||||||
except CommandException as ex:
|
except CommandException as ex:
|
||||||
self.set_state_and_log(ReportState.ERROR, error_message=str(ex))
|
self.update_report_schedule_and_log(
|
||||||
|
ReportState.ERROR, error_message=str(ex)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class ReportScheduleStateMachine: # pylint: disable=too-few-public-methods
|
class ReportScheduleStateMachine: # pylint: disable=too-few-public-methods
|
||||||
|
|
|
@ -122,6 +122,11 @@ def assert_log(state: str, error_message: Optional[str] = None):
|
||||||
assert state in log_states
|
assert state in log_states
|
||||||
assert error_message in [log.error_message for log in logs]
|
assert error_message in [log.error_message for log in logs]
|
||||||
|
|
||||||
|
for log in logs:
|
||||||
|
if log.state == ReportState.WORKING:
|
||||||
|
assert log.value is None
|
||||||
|
assert log.value_row_json is None
|
||||||
|
|
||||||
|
|
||||||
def create_report_notification(
|
def create_report_notification(
|
||||||
email_target: Optional[str] = None,
|
email_target: Optional[str] = None,
|
||||||
|
@ -370,11 +375,15 @@ def create_report_slack_chart_working():
|
||||||
)
|
)
|
||||||
report_schedule.last_state = ReportState.WORKING
|
report_schedule.last_state = ReportState.WORKING
|
||||||
report_schedule.last_eval_dttm = datetime(2020, 1, 1, 0, 0)
|
report_schedule.last_eval_dttm = datetime(2020, 1, 1, 0, 0)
|
||||||
|
report_schedule.last_value = None
|
||||||
|
report_schedule.last_value_row_json = None
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
log = ReportExecutionLog(
|
log = ReportExecutionLog(
|
||||||
scheduled_dttm=report_schedule.last_eval_dttm,
|
scheduled_dttm=report_schedule.last_eval_dttm,
|
||||||
start_dttm=report_schedule.last_eval_dttm,
|
start_dttm=report_schedule.last_eval_dttm,
|
||||||
end_dttm=report_schedule.last_eval_dttm,
|
end_dttm=report_schedule.last_eval_dttm,
|
||||||
|
value=report_schedule.last_value,
|
||||||
|
value_row_json=report_schedule.last_value_row_json,
|
||||||
state=ReportState.WORKING,
|
state=ReportState.WORKING,
|
||||||
report_schedule=report_schedule,
|
report_schedule=report_schedule,
|
||||||
uuid=uuid4(),
|
uuid=uuid4(),
|
||||||
|
|
Loading…
Reference in New Issue