fix: Improve the reliability of alerts & reports (#25239)
This commit is contained in:
parent
a724850c84
commit
f672d5da5c
|
|
@ -914,6 +914,10 @@ DASHBOARD_AUTO_REFRESH_INTERVALS = [
|
|||
[86400, "24 hours"],
|
||||
]
|
||||
|
||||
# This is used as a workaround for the alerts & reports scheduler task to get the time
|
||||
# celery beat triggered it (the task's expiry minus this value), see https://github.com/celery/celery/issues/6974 for details
|
||||
CELERY_BEAT_SCHEDULER_EXPIRES = timedelta(weeks=1)
|
||||
|
||||
# Default celery config is to use SQLA as a broker, in a production setting
|
||||
# you'll want to use a proper broker as specified here:
|
||||
# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html
|
||||
|
|
@ -942,6 +946,7 @@ class CeleryConfig: # pylint: disable=too-few-public-methods
|
|||
"reports.scheduler": {
|
||||
"task": "reports.scheduler",
|
||||
"schedule": crontab(minute="*", hour="*"),
|
||||
"options": {"expires": int(CELERY_BEAT_SCHEDULER_EXPIRES.total_seconds())},
|
||||
},
|
||||
"reports.prune_log": {
|
||||
"task": "reports.prune_log",
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
import logging
|
||||
from collections.abc import Iterator
|
||||
from datetime import datetime, timedelta, timezone as dt_timezone
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from croniter import croniter
|
||||
from pytz import timezone as pytz_timezone, UnknownTimeZoneError
|
||||
|
|
@ -27,10 +27,10 @@ from superset import app
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]:
|
||||
def cron_schedule_window(
|
||||
triggered_at: datetime, cron: str, timezone: str
|
||||
) -> Iterator[datetime]:
|
||||
window_size = app.config["ALERT_REPORTS_CRON_WINDOW_SIZE"]
|
||||
# create a time-aware datetime in utc
|
||||
time_now = datetime.now(tz=dt_timezone.utc)
|
||||
try:
|
||||
tz = pytz_timezone(timezone)
|
||||
except UnknownTimeZoneError:
|
||||
|
|
@ -39,9 +39,9 @@ def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]:
|
|||
logger.warning("Timezone %s was invalid. Falling back to 'UTC'", timezone)
|
||||
utc = pytz_timezone("UTC")
|
||||
# convert the trigger time to the user's local time for comparison
|
||||
time_now = time_now.astimezone(tz)
|
||||
start_at = time_now - timedelta(seconds=1)
|
||||
stop_at = time_now + timedelta(seconds=window_size)
|
||||
time_now = triggered_at.astimezone(tz)
|
||||
start_at = time_now - timedelta(seconds=window_size / 2)
|
||||
stop_at = time_now + timedelta(seconds=window_size / 2)
|
||||
crons = croniter(cron, start_at)
|
||||
for schedule in crons.all_next(datetime):
|
||||
if schedule >= stop_at:
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
from celery import Celery
|
||||
from celery.exceptions import SoftTimeLimitExceeded
|
||||
|
|
@ -47,9 +48,15 @@ def scheduler() -> None:
|
|||
return
|
||||
with session_scope(nullpool=True) as session:
|
||||
active_schedules = ReportScheduleDAO.find_active(session)
|
||||
triggered_at = (
|
||||
datetime.fromisoformat(scheduler.request.expires)
|
||||
- app.config["CELERY_BEAT_SCHEDULER_EXPIRES"]
|
||||
if scheduler.request.expires
|
||||
else datetime.utcnow()
|
||||
)
|
||||
for active_schedule in active_schedules:
|
||||
for schedule in cron_schedule_window(
|
||||
active_schedule.crontab, active_schedule.timezone
|
||||
triggered_at, active_schedule.crontab, active_schedule.timezone
|
||||
):
|
||||
logger.info(
|
||||
"Scheduling alert %s eta: %s", active_schedule.name, schedule
|
||||
|
|
|
|||
|
|
@ -14,11 +14,9 @@
|
|||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from datetime import datetime
|
||||
|
||||
import pytest
|
||||
import pytz
|
||||
from dateutil import parser
|
||||
from freezegun import freeze_time
|
||||
from freezegun.api import FakeDatetime
|
||||
|
||||
from superset.tasks.cron_util import cron_schedule_window
|
||||
|
|
@ -27,23 +25,28 @@ from superset.tasks.cron_util import cron_schedule_window
|
|||
@pytest.mark.parametrize(
|
||||
"current_dttm, cron, expected",
|
||||
[
|
||||
("2020-01-01T08:59:01Z", "0 1 * * *", []),
|
||||
("2020-01-01T08:59:01+00:00", "0 1 * * *", []),
|
||||
(
|
||||
"2020-01-01T08:59:02Z",
|
||||
"2020-01-01T08:59:32+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
(
|
||||
"2020-01-01T08:59:59Z",
|
||||
"2020-01-01T08:59:59+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
(
|
||||
"2020-01-01T09:00:00Z",
|
||||
"2020-01-01T09:00:00+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
("2020-01-01T09:00:01Z", "0 1 * * *", []),
|
||||
(
|
||||
"2020-01-01T09:00:01+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
("2020-01-01T09:00:30+00:00", "0 1 * * *", []),
|
||||
],
|
||||
)
|
||||
def test_cron_schedule_window_los_angeles(
|
||||
|
|
@ -53,34 +56,40 @@ def test_cron_schedule_window_los_angeles(
|
|||
Reports scheduler: Test cron schedule window for "America/Los_Angeles"
|
||||
"""
|
||||
|
||||
with freeze_time(current_dttm):
|
||||
datetimes = cron_schedule_window(cron, "America/Los_Angeles")
|
||||
assert (
|
||||
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
|
||||
== expected
|
||||
)
|
||||
datetimes = cron_schedule_window(
|
||||
datetime.fromisoformat(current_dttm), cron, "America/Los_Angeles"
|
||||
)
|
||||
assert (
|
||||
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"current_dttm, cron, expected",
|
||||
[
|
||||
("2020-01-01T00:59:01Z", "0 1 * * *", []),
|
||||
("2020-01-01T00:59:01+00:00", "0 1 * * *", []),
|
||||
("2020-01-01T00:59:02+00:00", "0 1 * * *", []),
|
||||
(
|
||||
"2020-01-01T00:59:02Z",
|
||||
"2020-01-01T00:59:59+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
(
|
||||
"2020-01-01T00:59:59Z",
|
||||
"2020-01-01T01:00:00+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
(
|
||||
"2020-01-01T01:00:00Z",
|
||||
"2020-01-01T01:00:01+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
("2020-01-01T01:00:01Z", "0 1 * * *", []),
|
||||
(
|
||||
"2020-01-01T01:00:29+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
("2020-01-01T01:00:30+00:00", "0 1 * * *", []),
|
||||
],
|
||||
)
|
||||
def test_cron_schedule_window_invalid_timezone(
|
||||
|
|
@ -90,35 +99,41 @@ def test_cron_schedule_window_invalid_timezone(
|
|||
Reports scheduler: Test cron schedule window for "invalid timezone"
|
||||
"""
|
||||
|
||||
with freeze_time(current_dttm):
|
||||
datetimes = cron_schedule_window(cron, "invalid timezone")
|
||||
# it should default to UTC
|
||||
assert (
|
||||
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
|
||||
== expected
|
||||
)
|
||||
datetimes = cron_schedule_window(
|
||||
datetime.fromisoformat(current_dttm), cron, "invalid timezone"
|
||||
)
|
||||
# it should default to UTC
|
||||
assert (
|
||||
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"current_dttm, cron, expected",
|
||||
[
|
||||
("2020-01-01T05:59:01Z", "0 1 * * *", []),
|
||||
("2020-01-01T05:59:01+00:00", "0 1 * * *", []),
|
||||
("2020-01-01T05:59:02+00:00", "0 1 * * *", []),
|
||||
(
|
||||
"2020-01-01T05:59:02Z",
|
||||
"2020-01-01T05:59:59+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
(
|
||||
"2020-01-01T5:59:59Z",
|
||||
"2020-01-01T06:00:00+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
(
|
||||
"2020-01-01T6:00:00",
|
||||
"2020-01-01T06:00:01+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
("2020-01-01T6:00:01Z", "0 1 * * *", []),
|
||||
(
|
||||
"2020-01-01T06:00:29+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
("2020-01-01T06:00:30+00:00", "0 1 * * *", []),
|
||||
],
|
||||
)
|
||||
def test_cron_schedule_window_new_york(
|
||||
|
|
@ -128,34 +143,40 @@ def test_cron_schedule_window_new_york(
|
|||
Reports scheduler: Test cron schedule window for "America/New_York"
|
||||
"""
|
||||
|
||||
with freeze_time(current_dttm, tz_offset=0):
|
||||
datetimes = cron_schedule_window(cron, "America/New_York")
|
||||
assert (
|
||||
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
|
||||
== expected
|
||||
)
|
||||
datetimes = cron_schedule_window(
|
||||
datetime.fromisoformat(current_dttm), cron, "America/New_York"
|
||||
)
|
||||
assert (
|
||||
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"current_dttm, cron, expected",
|
||||
[
|
||||
("2020-01-01T06:59:01Z", "0 1 * * *", []),
|
||||
("2020-01-01T06:59:01+00:00", "0 1 * * *", []),
|
||||
("2020-01-01T06:59:02+00:00", "0 1 * * *", []),
|
||||
(
|
||||
"2020-01-01T06:59:02Z",
|
||||
"2020-01-01T06:59:59+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
(
|
||||
"2020-01-01T06:59:59Z",
|
||||
"2020-01-01T07:00:00+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
(
|
||||
"2020-01-01T07:00:00",
|
||||
"2020-01-01T07:00:01+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
("2020-01-01T07:00:01Z", "0 1 * * *", []),
|
||||
(
|
||||
"2020-01-01T07:00:29+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
("2020-01-01T07:00:30+00:00", "0 1 * * *", []),
|
||||
],
|
||||
)
|
||||
def test_cron_schedule_window_chicago(
|
||||
|
|
@ -165,34 +186,40 @@ def test_cron_schedule_window_chicago(
|
|||
Reports scheduler: Test cron schedule window for "America/Chicago"
|
||||
"""
|
||||
|
||||
with freeze_time(current_dttm, tz_offset=0):
|
||||
datetimes = cron_schedule_window(cron, "America/Chicago")
|
||||
assert (
|
||||
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
|
||||
== expected
|
||||
)
|
||||
datetimes = cron_schedule_window(
|
||||
datetime.fromisoformat(current_dttm), cron, "America/Chicago"
|
||||
)
|
||||
assert (
|
||||
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"current_dttm, cron, expected",
|
||||
[
|
||||
("2020-07-01T05:59:01Z", "0 1 * * *", []),
|
||||
("2020-07-01T05:59:01+00:00", "0 1 * * *", []),
|
||||
("2020-07-01T05:59:02+00:00", "0 1 * * *", []),
|
||||
(
|
||||
"2020-07-01T05:59:02Z",
|
||||
"2020-07-01T05:59:59+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
(
|
||||
"2020-07-01T05:59:59Z",
|
||||
"2020-07-01T06:00:00+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
(
|
||||
"2020-07-01T06:00:00",
|
||||
"2020-07-01T06:00:01+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
("2020-07-01T06:00:01Z", "0 1 * * *", []),
|
||||
(
|
||||
"2020-07-01T06:00:29+00:00",
|
||||
"0 1 * * *",
|
||||
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
|
||||
),
|
||||
("2020-07-01T06:00:30+00:00", "0 1 * * *", []),
|
||||
],
|
||||
)
|
||||
def test_cron_schedule_window_chicago_daylight(
|
||||
|
|
@ -202,9 +229,9 @@ def test_cron_schedule_window_chicago_daylight(
|
|||
Reports scheduler: Test cron schedule window for "America/Chicago" during daylight saving time
|
||||
"""
|
||||
|
||||
with freeze_time(current_dttm, tz_offset=0):
|
||||
datetimes = cron_schedule_window(cron, "America/Chicago")
|
||||
assert (
|
||||
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
|
||||
== expected
|
||||
)
|
||||
datetimes = cron_schedule_window(
|
||||
datetime.fromisoformat(current_dttm), cron, "America/Chicago"
|
||||
)
|
||||
assert (
|
||||
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
|
||||
)
|
||||
|
|
|
|||
Loading…
Reference in New Issue