fix: Use Celery task ETA for alert/report schedule (#24537)
This commit is contained in:
parent
0986423d30
commit
e402c94a9f
|
|
@ -18,7 +18,6 @@ import logging
|
|||
|
||||
from celery import Celery
|
||||
from celery.exceptions import SoftTimeLimitExceeded
|
||||
from dateutil import parser
|
||||
|
||||
from superset import app, is_feature_enabled
|
||||
from superset.commands.exceptions import CommandException
|
||||
|
|
@ -64,21 +63,15 @@ def scheduler() -> None:
|
|||
active_schedule.working_timeout
|
||||
+ app.config["ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG"]
|
||||
)
|
||||
execute.apply_async(
|
||||
(
|
||||
active_schedule.id,
|
||||
schedule,
|
||||
),
|
||||
**async_options,
|
||||
)
|
||||
execute.apply_async((active_schedule.id,), **async_options)
|
||||
|
||||
|
||||
@celery_app.task(name="reports.execute", bind=True)
|
||||
def execute(self: Celery.task, report_schedule_id: int, scheduled_dttm: str) -> None:
|
||||
def execute(self: Celery.task, report_schedule_id: int) -> None:
|
||||
task_id = None
|
||||
try:
|
||||
task_id = execute.request.id
|
||||
scheduled_dttm_ = parser.parse(scheduled_dttm)
|
||||
scheduled_dttm = execute.request.eta
|
||||
logger.info(
|
||||
"Executing alert/report, task id: %s, scheduled_dttm: %s",
|
||||
task_id,
|
||||
|
|
@ -87,7 +80,7 @@ def execute(self: Celery.task, report_schedule_id: int, scheduled_dttm: str) ->
|
|||
AsyncExecuteReportScheduleCommand(
|
||||
task_id,
|
||||
report_schedule_id,
|
||||
scheduled_dttm_,
|
||||
scheduled_dttm,
|
||||
).run()
|
||||
except ReportScheduleUnexpectedError:
|
||||
logger.exception(
|
||||
|
|
|
|||
|
|
@ -171,7 +171,7 @@ def test_execute_task(update_state_mock, command_mock, init_mock, owners):
|
|||
init_mock.return_value = None
|
||||
command_mock.side_effect = ReportScheduleUnexpectedError("Unexpected error")
|
||||
with freeze_time("2020-01-01T09:00:00Z"):
|
||||
execute(report_schedule.id, "2020-01-01T09:00:00Z")
|
||||
execute(report_schedule.id)
|
||||
update_state_mock.assert_called_with(state="FAILURE")
|
||||
|
||||
db.session.delete(report_schedule)
|
||||
|
|
@ -199,7 +199,7 @@ def test_execute_task_with_command_exception(
|
|||
init_mock.return_value = None
|
||||
command_mock.side_effect = CommandException("Unexpected error")
|
||||
with freeze_time("2020-01-01T09:00:00Z"):
|
||||
execute(report_schedule.id, "2020-01-01T09:00:00Z")
|
||||
execute(report_schedule.id)
|
||||
update_state_mock.assert_called_with(state="FAILURE")
|
||||
logger_mock.exception.assert_called_with(
|
||||
"A downstream exception occurred while generating a report: None. Unexpected error",
|
||||
|
|
|
|||
Loading…
Reference in New Issue