fix(alerts/reports): working timeout with celery kill and logic fix (#13911)

* fix: working timeout with celery kill and logic fix

* add config flags

* fix typo

* fix python lint

* log query time for alerts

* add tests

* fix lint
This commit is contained in:
Daniel Vaz Gaspar 2021-04-08 11:23:31 +01:00 committed by GitHub
parent b427a80a4f
commit 89817d4cee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 166 additions and 11 deletions

View File

@ -929,6 +929,14 @@ ENABLE_ALERTS = False
# Used for Alerts/Reports (Feature flag ALERT_REPORTS) to set the size for the
# sliding cron window size, should be synced with the celery beat config minus 1 second
ALERT_REPORTS_CRON_WINDOW_SIZE = 59
ALERT_REPORTS_WORKING_TIME_OUT_KILL = True
# if ALERT_REPORTS_WORKING_TIME_OUT_KILL is True, set a celery hard timeout
# Equal to working timeout + ALERT_REPORTS_WORKING_TIME_OUT_LAG
ALERT_REPORTS_WORKING_TIME_OUT_LAG = 10
# if ALERT_REPORTS_WORKING_TIME_OUT_KILL is True, set a celery soft timeout
# Equal to working timeout + ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG
ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG = 1
# A custom prefix to use on all Alerts & Reports emails
EMAIL_REPORTS_SUBJECT_PREFIX = "[Report] "

View File

@ -17,6 +17,7 @@
import json
import logging
from operator import eq, ge, gt, le, lt, ne
from timeit import default_timer
from typing import Optional
import numpy as np
@ -145,7 +146,15 @@ class AlertCommand(BaseCommand):
limited_rendered_sql = self._report_schedule.database.apply_limit_to_sql(
rendered_sql, ALERT_SQL_LIMIT
)
return self._report_schedule.database.get_df(limited_rendered_sql)
start = default_timer()
df = self._report_schedule.database.get_df(limited_rendered_sql)
stop = default_timer()
logger.info(
"Query for %s took %.2f ms",
self._report_schedule.name,
(stop - start) * 1000.0,
)
return df
except SoftTimeLimitExceeded:
raise AlertQueryTimeout()
except Exception as ex:

View File

@ -292,12 +292,17 @@ class BaseReportState:
"""
Checks if an alert is on a working timeout
"""
last_working = ReportScheduleDAO.find_last_entered_working_log(
self._report_schedule, session=self._session
)
if not last_working:
return False
return (
self._report_schedule.working_timeout is not None
and self._report_schedule.last_eval_dttm is not None
and datetime.utcnow()
- timedelta(seconds=self._report_schedule.working_timeout)
> self._report_schedule.last_eval_dttm
> last_working.end_dttm
)
def next(self) -> None:

View File

@ -226,6 +226,25 @@ class ReportScheduleDAO(BaseDAO):
.first()
)
@staticmethod
def find_last_entered_working_log(
    report_schedule: ReportSchedule, session: Optional[Session] = None,
) -> Optional[ReportExecutionLog]:
    """
    Find the most recent execution log entry that entered the WORKING
    state (and carries no error message) for the given report schedule.

    :param report_schedule: The report schedule whose logs are searched
    :param session: Optional SQLAlchemy session; defaults to the global
        ``db.session`` when not provided
    :return: The latest matching ``ReportExecutionLog`` (ordered by
        ``end_dttm`` descending), or ``None`` if no WORKING entry exists
    """
    session = session or db.session
    return (
        session.query(ReportExecutionLog)
        .filter(
            # Only WORKING-state entries that did not end in an error;
            # used by the working-timeout check to know when work started
            ReportExecutionLog.state == ReportState.WORKING,
            ReportExecutionLog.report_schedule == report_schedule,
            ReportExecutionLog.error_message.is_(None),
        )
        .order_by(ReportExecutionLog.end_dttm.desc())
        .first()
    )
@staticmethod
def find_last_error_notification(
report_schedule: ReportSchedule, session: Optional[Session] = None,

View File

@ -57,7 +57,20 @@ def scheduler() -> None:
logger.info(
"Scheduling alert %s eta: %s", active_schedule.name, schedule
)
execute.apply_async((active_schedule.id, schedule,), eta=schedule)
async_options = {"eta": schedule}
if (
active_schedule.working_timeout is not None
and app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"]
):
async_options["time_limit"] = (
active_schedule.working_timeout
+ app.config["ALERT_REPORTS_WORKING_TIME_OUT_LAG"]
)
async_options["soft_time_limit"] = (
active_schedule.working_timeout
+ app.config["ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG"]
)
execute.apply_async((active_schedule.id, schedule,), **async_options)
@celery_app.task(name="reports.execute")

View File

@ -100,9 +100,9 @@ def assert_log(state: str, error_message: Optional[str] = None):
db.session.commit()
logs = db.session.query(ReportExecutionLog).all()
if state == ReportState.WORKING:
assert len(logs) == 1
assert logs[0].error_message == error_message
assert logs[0].state == state
assert len(logs) == 2
assert logs[1].error_message == error_message
assert logs[1].state == state
return
# On error we send an email
if state == ReportState.ERROR:
@ -232,6 +232,17 @@ def create_report_slack_chart_working():
report_schedule.last_state = ReportState.WORKING
report_schedule.last_eval_dttm = datetime(2020, 1, 1, 0, 0)
db.session.commit()
log = ReportExecutionLog(
scheduled_dttm=report_schedule.last_eval_dttm,
start_dttm=report_schedule.last_eval_dttm,
end_dttm=report_schedule.last_eval_dttm,
state=ReportState.WORKING,
report_schedule=report_schedule,
uuid=uuid4(),
)
db.session.add(log)
db.session.commit()
yield report_schedule
cleanup_report_schedule(report_schedule)
@ -638,12 +649,11 @@ def test_report_schedule_working_timeout(create_report_slack_chart_working):
"""
ExecuteReport Command: Test report schedule still working but should timed out
"""
# setup screenshot mock
current_time = create_report_slack_chart_working.last_eval_dttm + timedelta(
seconds=create_report_slack_chart_working.working_timeout + 1
)
with freeze_time(current_time):
with pytest.raises(ReportScheduleWorkingTimeoutError):
AsyncExecuteReportScheduleCommand(
test_id, create_report_slack_chart_working.id, datetime.utcnow()
@ -652,9 +662,10 @@ def test_report_schedule_working_timeout(create_report_slack_chart_working):
# Only needed for MySQL; TODO: understand why this extra commit is required
db.session.commit()
logs = db.session.query(ReportExecutionLog).all()
assert len(logs) == 1
assert logs[0].error_message == ReportScheduleWorkingTimeoutError.message
assert logs[0].state == ReportState.ERROR
# Two logs, first is created by fixture
assert len(logs) == 2
assert logs[1].error_message == ReportScheduleWorkingTimeoutError.message
assert logs[1].state == ReportState.ERROR
assert create_report_slack_chart_working.last_state == ReportState.ERROR

View File

@ -0,0 +1,88 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import List
from unittest.mock import patch
import pytest
from freezegun import freeze_time
from freezegun.api import FakeDatetime # type: ignore
from superset.extensions import db
from superset.models.reports import ReportScheduleType
from superset.tasks.scheduler import cron_schedule_window, scheduler
from tests.reports.utils import insert_report_schedule
from tests.test_app import app
@pytest.mark.parametrize(
    "current_dttm, cron, expected",
    [
        ("2020-01-01T08:59:01Z", "0 9 * * *", []),
        ("2020-01-01T08:59:02Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]),
        ("2020-01-01T08:59:59Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]),
        ("2020-01-01T09:00:00Z", "0 9 * * *", [FakeDatetime(2020, 1, 1, 9, 0)]),
        ("2020-01-01T09:00:01Z", "0 9 * * *", []),
    ],
)
def test_cron_schedule_window(
    current_dttm: str, cron: str, expected: List[FakeDatetime]
) -> None:
    """
    Reports scheduler: Test cron schedule window

    The cron trigger should fire only while the frozen "now" falls inside
    the sliding window around the cron tick (from just under the window
    size before the tick, up to and including the tick itself).
    """
    with app.app_context():
        with freeze_time(current_dttm):
            datetimes = cron_schedule_window(cron)
            # Materialize the returned iterator to compare with the
            # expected list of trigger datetimes
            assert list(datetimes) == expected
@patch("superset.tasks.scheduler.execute.apply_async")
def test_scheduler_celery_timeout(execute_mock) -> None:
    """
    Reports scheduler: Test scheduler setting celery soft and hard timeout

    With ALERT_REPORTS_WORKING_TIME_OUT_KILL enabled (the test config
    default), the scheduler must pass ``soft_time_limit`` and
    ``time_limit`` to ``apply_async``, computed as the schedule's
    working_timeout plus the configured soft/hard lag.
    """
    with app.app_context():
        report_schedule = insert_report_schedule(
            # Plain strings: no interpolation is needed here
            type=ReportScheduleType.ALERT, name="report", crontab="0 9 * * *",
        )
        try:
            with freeze_time("2020-01-01T09:00:00Z"):
                scheduler()
                # working_timeout (3600) + soft lag (1) and hard lag (10)
                assert execute_mock.call_args[1]["soft_time_limit"] == 3601
                assert execute_mock.call_args[1]["time_limit"] == 3610
        finally:
            # Always remove the inserted schedule, even if an assertion
            # fails, so other tests start from a clean database
            db.session.delete(report_schedule)
            db.session.commit()
@patch("superset.tasks.scheduler.execute.apply_async")
def test_scheduler_celery_no_timeout(execute_mock) -> None:
    """
    Reports scheduler: Test scheduler NOT setting celery timeouts

    When ALERT_REPORTS_WORKING_TIME_OUT_KILL is disabled, ``apply_async``
    must be called with only the ``eta`` option and no soft/hard time
    limits.
    """
    with app.app_context():
        # Flip the kill switch off for this test only; restore the
        # original value afterwards so other tests keep the default
        original_flag = app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"]
        app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"] = False
        report_schedule = insert_report_schedule(
            # Plain strings: no interpolation is needed here
            type=ReportScheduleType.ALERT, name="report", crontab="0 9 * * *",
        )
        try:
            with freeze_time("2020-01-01T09:00:00Z"):
                scheduler()
                # Only the eta should be present: no time limits were set
                assert execute_mock.call_args[1] == {
                    "eta": FakeDatetime(2020, 1, 1, 9, 0)
                }
        finally:
            # Restore config and clean up even if an assertion fails
            app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"] = original_flag
            db.session.delete(report_schedule)
            db.session.commit()

View File

@ -97,6 +97,8 @@ DATA_CACHE_CONFIG = {
GLOBAL_ASYNC_QUERIES_JWT_SECRET = "test-secret-change-me-test-secret-change-me"
ALERT_REPORTS_WORKING_TIME_OUT_KILL = True
class CeleryConfig(object):
BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_CELERY_DB}"