feat: Mutate SQL query executed by alerts (#31840)
This commit is contained in:
parent
d4bd20ffb4
commit
754ccd0448
|
|
@ -20,6 +20,7 @@ import logging
|
|||
from operator import eq, ge, gt, le, lt, ne
|
||||
from timeit import default_timer
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
|
@ -40,6 +41,7 @@ from superset.reports.models import ReportSchedule, ReportScheduleValidatorType
|
|||
from superset.tasks.utils import get_executor
|
||||
from superset.utils import json
|
||||
from superset.utils.core import override_user
|
||||
from superset.utils.decorators import logs_context
|
||||
from superset.utils.retries import retry_call
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -52,8 +54,9 @@ OPERATOR_FUNCTIONS = {">=": ge, ">": gt, "<=": le, "<": lt, "==": eq, "!=": ne}
|
|||
|
||||
|
||||
class AlertCommand(BaseCommand):
|
||||
def __init__(self, report_schedule: ReportSchedule):
|
||||
def __init__(self, report_schedule: ReportSchedule, execution_id: UUID):
|
||||
self._report_schedule = report_schedule
|
||||
self._execution_id = execution_id
|
||||
self._result: float | None = None
|
||||
|
||||
def run(self) -> bool:
|
||||
|
|
@ -135,6 +138,13 @@ class AlertCommand(BaseCommand):
|
|||
self._report_schedule.validator_type == ReportScheduleValidatorType.OPERATOR
|
||||
)
|
||||
|
||||
def _get_alert_metadata_from_object(self) -> dict[str, Any]:
|
||||
return {
|
||||
"report_schedule_id": self._report_schedule.id,
|
||||
"execution_id": self._execution_id,
|
||||
}
|
||||
|
||||
@logs_context(context_func=_get_alert_metadata_from_object)
|
||||
def _execute_query(self) -> pd.DataFrame:
|
||||
"""
|
||||
Executes the actual alert SQL query template
|
||||
|
|
@ -152,6 +162,13 @@ class AlertCommand(BaseCommand):
|
|||
rendered_sql, ALERT_SQL_LIMIT
|
||||
)
|
||||
|
||||
if app.config["MUTATE_ALERT_QUERY"]:
|
||||
limited_rendered_sql = (
|
||||
self._report_schedule.database.mutate_sql_based_on_config(
|
||||
limited_rendered_sql
|
||||
)
|
||||
)
|
||||
|
||||
executor, username = get_executor( # pylint: disable=unused-variable
|
||||
executor_types=app.config["ALERT_REPORTS_EXECUTE_AS"],
|
||||
model=self._report_schedule,
|
||||
|
|
|
|||
|
|
@ -698,7 +698,7 @@ class ReportNotTriggeredErrorState(BaseReportState):
|
|||
try:
|
||||
# If it's an alert check if the alert is triggered
|
||||
if self._report_schedule.type == ReportScheduleType.ALERT:
|
||||
if not AlertCommand(self._report_schedule).run():
|
||||
if not AlertCommand(self._report_schedule, self._execution_id).run():
|
||||
self.update_report_schedule_and_log(ReportState.NOOP)
|
||||
return
|
||||
self.send()
|
||||
|
|
@ -782,7 +782,7 @@ class ReportSuccessState(BaseReportState):
|
|||
return
|
||||
self.update_report_schedule_and_log(ReportState.WORKING)
|
||||
try:
|
||||
if not AlertCommand(self._report_schedule).run():
|
||||
if not AlertCommand(self._report_schedule, self._execution_id).run():
|
||||
self.update_report_schedule_and_log(ReportState.NOOP)
|
||||
return
|
||||
except Exception as ex:
|
||||
|
|
|
|||
|
|
@ -1380,6 +1380,10 @@ def SQL_QUERY_MUTATOR( # pylint: disable=invalid-name,unused-argument # noqa:
|
|||
MUTATE_AFTER_SPLIT = False
|
||||
|
||||
|
||||
# Boolean config that determines if alert SQL queries should also be mutated or not.
|
||||
MUTATE_ALERT_QUERY = False
|
||||
|
||||
|
||||
# This allows for a user to add header data to any outgoing emails. For example,
|
||||
# if you need to include metadata in the header or you want to change the specifications
|
||||
# of the email title, header, or sender.
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
# pylint: disable=invalid-name, unused-argument, import-outside-toplevel
|
||||
import uuid
|
||||
from contextlib import nullcontext, suppress
|
||||
from typing import Optional, Union
|
||||
|
||||
|
|
@ -84,7 +85,7 @@ def test_execute_query_as_report_executor(
|
|||
database=get_example_database(),
|
||||
validator_config_json='{"op": "==", "threshold": 1}',
|
||||
)
|
||||
command = AlertCommand(report_schedule=report_schedule)
|
||||
command = AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4())
|
||||
override_user_mock = mocker.patch("superset.commands.report.alert.override_user")
|
||||
cm = (
|
||||
pytest.raises(type(expected_result))
|
||||
|
|
@ -98,6 +99,83 @@ def test_execute_query_as_report_executor(
|
|||
app.config["ALERT_REPORTS_EXECUTE_AS"] = original_config
|
||||
|
||||
|
||||
def test_execute_query_mutate_query_enabled(
|
||||
mocker: MockerFixture,
|
||||
app_context: AppContext,
|
||||
get_user,
|
||||
) -> None:
|
||||
from superset.commands.report.alert import AlertCommand
|
||||
from superset.reports.models import ReportSchedule
|
||||
|
||||
default_alert_mutate_ff = app.config["MUTATE_ALERT_QUERY"]
|
||||
|
||||
app.config["MUTATE_ALERT_QUERY"] = True
|
||||
mocker.patch("superset.commands.report.alert.override_user")
|
||||
mock_df = mocker.MagicMock(spec=pd.DataFrame)
|
||||
mock_df.empty = True
|
||||
mock_database = get_example_database()
|
||||
mock_get_df = mocker.patch.object(mock_database, "get_df", return_value=mock_df)
|
||||
mock_limited_sql = mocker.patch.object(mock_database, "apply_limit_to_sql")
|
||||
mock_mutate_call = mocker.patch.object(mock_database, "mutate_sql_based_on_config")
|
||||
|
||||
report_schedule = ReportSchedule(
|
||||
created_by=get_user("admin"),
|
||||
owners=[get_user("admin")],
|
||||
type=ReportScheduleType.ALERT,
|
||||
description="description",
|
||||
crontab="0 9 * * *",
|
||||
creation_method=ReportCreationMethod.ALERTS_REPORTS,
|
||||
sql="SELECT 1",
|
||||
grace_period=14400,
|
||||
working_timeout=3600,
|
||||
database=mock_database,
|
||||
validator_config_json='{"op": "==", "threshold": 1}',
|
||||
)
|
||||
AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()).run()
|
||||
|
||||
mock_mutate_call.assert_called_once_with(mock_limited_sql.return_value)
|
||||
mock_get_df.assert_called_once_with(sql=mock_mutate_call.return_value)
|
||||
|
||||
app.config["MUTATE_ALERT_QUERY"] = default_alert_mutate_ff
|
||||
|
||||
|
||||
def test_execute_query_mutate_query_disabled(
|
||||
mocker: MockerFixture,
|
||||
app_context: AppContext,
|
||||
get_user,
|
||||
) -> None:
|
||||
from superset.commands.report.alert import AlertCommand
|
||||
from superset.reports.models import ReportSchedule
|
||||
|
||||
default_alert_mutate_ff = app.config["MUTATE_ALERT_QUERY"]
|
||||
|
||||
app.config["MUTATE_ALERT_QUERY"] = False
|
||||
mocker.patch("superset.commands.report.alert.override_user")
|
||||
mock_database = mocker.MagicMock()
|
||||
|
||||
report_schedule = ReportSchedule(
|
||||
created_by=get_user("admin"),
|
||||
owners=[get_user("admin")],
|
||||
type=ReportScheduleType.ALERT,
|
||||
description="description",
|
||||
crontab="0 9 * * *",
|
||||
creation_method=ReportCreationMethod.ALERTS_REPORTS,
|
||||
sql="SELECT 1",
|
||||
grace_period=14400,
|
||||
working_timeout=3600,
|
||||
database=mock_database,
|
||||
validator_config_json='{"op": "==", "threshold": 1}',
|
||||
)
|
||||
AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()).run()
|
||||
|
||||
mock_database.mutate_sql_based_on_config.assert_not_called()
|
||||
mock_database.get_df.assert_called_once_with(
|
||||
sql=mock_database.apply_limit_to_sql.return_value
|
||||
)
|
||||
|
||||
app.config["MUTATE_ALERT_QUERY"] = default_alert_mutate_ff
|
||||
|
||||
|
||||
def test_execute_query_succeeded_no_retry(
|
||||
mocker: MockerFixture, app_context: None
|
||||
) -> None:
|
||||
|
|
@ -108,7 +186,7 @@ def test_execute_query_succeeded_no_retry(
|
|||
side_effect=lambda: pd.DataFrame([{"sample_col": 0}]),
|
||||
)
|
||||
|
||||
command = AlertCommand(report_schedule=mocker.Mock())
|
||||
command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4())
|
||||
|
||||
command.validate()
|
||||
|
||||
|
|
@ -140,7 +218,7 @@ def test_execute_query_succeeded_with_retries(
|
|||
execute_query_mock.side_effect = _mocked_execute_query
|
||||
execute_query_mock.__name__ = "mocked_execute_query"
|
||||
|
||||
command = AlertCommand(report_schedule=mocker.Mock())
|
||||
command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4())
|
||||
|
||||
command.validate()
|
||||
|
||||
|
|
@ -162,7 +240,7 @@ def test_execute_query_failed_no_retry(
|
|||
execute_query_mock.side_effect = _mocked_execute_query
|
||||
execute_query_mock.__name__ = "mocked_execute_query"
|
||||
|
||||
command = AlertCommand(report_schedule=mocker.Mock())
|
||||
command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4())
|
||||
|
||||
with suppress(AlertQueryTimeout):
|
||||
command.validate()
|
||||
|
|
@ -184,9 +262,42 @@ def test_execute_query_failed_max_retries(
|
|||
execute_query_mock.side_effect = _mocked_execute_query
|
||||
execute_query_mock.__name__ = "mocked_execute_query"
|
||||
|
||||
command = AlertCommand(report_schedule=mocker.Mock())
|
||||
command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4())
|
||||
|
||||
with suppress(AlertQueryError):
|
||||
command.validate()
|
||||
# Should match the value defined in superset_test_config.py
|
||||
assert execute_query_mock.call_count == 3
|
||||
|
||||
|
||||
def test_get_alert_metadata_from_object(
|
||||
mocker: MockerFixture,
|
||||
app_context: AppContext,
|
||||
get_user,
|
||||
) -> None:
|
||||
from superset.commands.report.alert import AlertCommand
|
||||
from superset.reports.models import ReportSchedule
|
||||
|
||||
app.config["ALERT_REPORTS_EXECUTE_AS"] = [ExecutorType.OWNER]
|
||||
|
||||
mock_database = mocker.MagicMock()
|
||||
mock_exec_id = uuid.uuid4()
|
||||
report_schedule = ReportSchedule(
|
||||
created_by=get_user("admin"),
|
||||
owners=[get_user("admin")],
|
||||
type=ReportScheduleType.ALERT,
|
||||
description="description",
|
||||
crontab="0 9 * * *",
|
||||
creation_method=ReportCreationMethod.ALERTS_REPORTS,
|
||||
sql="SELECT 1",
|
||||
grace_period=14400,
|
||||
working_timeout=3600,
|
||||
database=mock_database,
|
||||
validator_config_json='{"op": "==", "threshold": 1}',
|
||||
)
|
||||
|
||||
cm = AlertCommand(report_schedule=report_schedule, execution_id=mock_exec_id)
|
||||
assert cm._get_alert_metadata_from_object() == {
|
||||
"report_schedule_id": report_schedule.id,
|
||||
"execution_id": mock_exec_id,
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue