fix: add retry to SQL-based alerting celery task (#10542)
* added retry and minimized sqlalchemy object lives * pylint * added try catch * adjusted naming * added scoped session * update tests for dbsession * added requested changes * nit todo Co-authored-by: Jason Davis <@dropbox.com>
This commit is contained in:
parent
5e944e5730
commit
8b9292ed05
|
|
@ -47,12 +47,13 @@ from flask_login import login_user
|
|||
from retry.api import retry_call
|
||||
from selenium.common.exceptions import WebDriverException
|
||||
from selenium.webdriver import chrome, firefox
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.exc import NoSuchColumnError, ResourceClosedError
|
||||
from werkzeug.http import parse_cookie
|
||||
|
||||
from superset import app, db, security_manager, thumbnail_cache
|
||||
from superset.extensions import celery_app
|
||||
from superset.models.alerts import Alert, AlertLog
|
||||
from superset.models.core import Database
|
||||
from superset.models.dashboard import Dashboard
|
||||
from superset.models.schedules import (
|
||||
EmailDeliveryType,
|
||||
|
|
@ -79,6 +80,7 @@ config = app.config
|
|||
logger = logging.getLogger("tasks.email_reports")
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
stats_logger = current_app.config["STATS_LOGGER"]
|
||||
EMAIL_PAGE_RENDER_WAIT = config["EMAIL_PAGE_RENDER_WAIT"]
|
||||
WEBDRIVER_BASEURL = config["WEBDRIVER_BASEURL"]
|
||||
WEBDRIVER_BASEURL_USER_FRIENDLY = config["WEBDRIVER_BASEURL_USER_FRIENDLY"]
|
||||
|
|
@ -533,6 +535,11 @@ def schedule_email_report( # pylint: disable=unused-argument
|
|||
name="alerts.run_query",
|
||||
bind=True,
|
||||
soft_time_limit=config["EMAIL_ASYNC_TIME_LIMIT_SEC"],
|
||||
# TODO: find cause of https://github.com/apache/incubator-superset/issues/10530
|
||||
# and remove retry
|
||||
autoretry_for=(NoSuchColumnError, ResourceClosedError,),
|
||||
retry_kwargs={"max_retries": 5},
|
||||
retry_backoff=True,
|
||||
)
|
||||
def schedule_alert_query( # pylint: disable=unused-argument
|
||||
task: Task,
|
||||
|
|
@ -542,24 +549,33 @@ def schedule_alert_query( # pylint: disable=unused-argument
|
|||
is_test_alert: Optional[bool] = False,
|
||||
) -> None:
|
||||
model_cls = get_scheduler_model(report_type)
|
||||
dbsession = db.create_scoped_session()
|
||||
schedule = dbsession.query(model_cls).get(schedule_id)
|
||||
|
||||
# The user may have disabled the schedule. If so, ignore this
|
||||
if not schedule or not schedule.active:
|
||||
logger.info("Ignoring deactivated alert")
|
||||
return
|
||||
try:
|
||||
schedule = db.session.query(model_cls).get(schedule_id)
|
||||
|
||||
if report_type == ScheduleType.alert:
|
||||
if is_test_alert and recipients:
|
||||
deliver_alert(schedule.id, recipients)
|
||||
# The user may have disabled the schedule. If so, ignore this
|
||||
if not schedule or not schedule.active:
|
||||
logger.info("Ignoring deactivated alert")
|
||||
return
|
||||
|
||||
if run_alert_query(schedule.id, dbsession):
|
||||
# deliver_dashboard OR deliver_slice
|
||||
return
|
||||
else:
|
||||
raise RuntimeError("Unknown report type")
|
||||
if report_type == ScheduleType.alert:
|
||||
if is_test_alert and recipients:
|
||||
deliver_alert(schedule.id, recipients)
|
||||
return
|
||||
|
||||
if run_alert_query(
|
||||
schedule.id, schedule.database_id, schedule.sql, schedule.label
|
||||
):
|
||||
# deliver_dashboard OR deliver_slice
|
||||
return
|
||||
else:
|
||||
raise RuntimeError("Unknown report type")
|
||||
except NoSuchColumnError as column_error:
|
||||
stats_logger.incr("run_alert_task.error.nosuchcolumnerror")
|
||||
raise column_error
|
||||
except ResourceClosedError as resource_error:
|
||||
stats_logger.incr("run_alert_task.error.resourceclosederror")
|
||||
raise resource_error
|
||||
|
||||
|
||||
class AlertState:
|
||||
|
|
@ -618,23 +634,23 @@ def deliver_alert(alert_id: int, recipients: Optional[str] = None) -> None:
|
|||
_deliver_email(recipients, deliver_as_group, subject, body, data, images)
|
||||
|
||||
|
||||
def run_alert_query(alert_id: int, dbsession: Session) -> Optional[bool]:
|
||||
def run_alert_query(
|
||||
alert_id: int, database_id: int, sql: str, label: str
|
||||
) -> Optional[bool]:
|
||||
"""
|
||||
Execute alert.sql and return value if any rows are returned
|
||||
"""
|
||||
alert = db.session.query(Alert).get(alert_id)
|
||||
|
||||
logger.info("Processing alert ID: %i", alert.id)
|
||||
database = alert.database
|
||||
logger.info("Processing alert ID: %i", alert_id)
|
||||
database = db.session.query(Database).get(database_id)
|
||||
if not database:
|
||||
logger.error("Alert database not preset")
|
||||
return None
|
||||
|
||||
if not alert.sql:
|
||||
if not sql:
|
||||
logger.error("Alert SQL not preset")
|
||||
return None
|
||||
|
||||
parsed_query = ParsedQuery(alert.sql)
|
||||
parsed_query = ParsedQuery(sql)
|
||||
sql = parsed_query.stripped()
|
||||
|
||||
state = None
|
||||
|
|
@ -642,27 +658,31 @@ def run_alert_query(alert_id: int, dbsession: Session) -> Optional[bool]:
|
|||
|
||||
df = pd.DataFrame()
|
||||
try:
|
||||
logger.info("Evaluating SQL for alert %s", alert)
|
||||
logger.info("Evaluating SQL for alert <%s:%s>", alert_id, label)
|
||||
df = database.get_df(sql)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
state = AlertState.ERROR
|
||||
logging.exception(exc)
|
||||
logging.error("Failed at evaluating alert: %s (%s)", alert.label, alert.id)
|
||||
logging.error("Failed at evaluating alert: %s (%s)", label, alert_id)
|
||||
|
||||
dttm_end = datetime.utcnow()
|
||||
last_eval_dttm = datetime.utcnow()
|
||||
|
||||
if state != AlertState.ERROR:
|
||||
alert.last_eval_dttm = datetime.utcnow()
|
||||
if not df.empty:
|
||||
# Looking for truthy cells
|
||||
for row in df.to_records():
|
||||
if any(row):
|
||||
state = AlertState.TRIGGER
|
||||
deliver_alert(alert.id)
|
||||
deliver_alert(alert_id)
|
||||
break
|
||||
if not state:
|
||||
state = AlertState.PASS
|
||||
|
||||
db.session.commit()
|
||||
alert = db.session.query(Alert).get(alert_id)
|
||||
if state != AlertState.ERROR:
|
||||
alert.last_eval_dttm = last_eval_dttm
|
||||
alert.last_state = state
|
||||
alert.logs.append(
|
||||
AlertLog(
|
||||
|
|
@ -672,7 +692,7 @@ def run_alert_query(alert_id: int, dbsession: Session) -> Optional[bool]:
|
|||
state=state,
|
||||
)
|
||||
)
|
||||
dbsession.commit()
|
||||
db.session.commit()
|
||||
|
||||
return None
|
||||
|
||||
|
|
|
|||
|
|
@ -38,41 +38,42 @@ def setup_database():
|
|||
slice_id = db.session.query(Slice).all()[0].id
|
||||
database_id = utils.get_example_database().id
|
||||
|
||||
alert1 = Alert(
|
||||
id=1,
|
||||
label="alert_1",
|
||||
active=True,
|
||||
crontab="*/1 * * * *",
|
||||
sql="SELECT 0",
|
||||
alert_type="email",
|
||||
slice_id=slice_id,
|
||||
database_id=database_id,
|
||||
)
|
||||
alert2 = Alert(
|
||||
id=2,
|
||||
label="alert_2",
|
||||
active=True,
|
||||
crontab="*/1 * * * *",
|
||||
sql="SELECT 55",
|
||||
alert_type="email",
|
||||
slice_id=slice_id,
|
||||
database_id=database_id,
|
||||
)
|
||||
alert3 = Alert(
|
||||
id=3,
|
||||
label="alert_3",
|
||||
active=False,
|
||||
crontab="*/1 * * * *",
|
||||
sql="UPDATE 55",
|
||||
alert_type="email",
|
||||
slice_id=slice_id,
|
||||
database_id=database_id,
|
||||
)
|
||||
alert4 = Alert(id=4, active=False, label="alert_4", database_id=-1)
|
||||
alert5 = Alert(id=5, active=False, label="alert_5", database_id=database_id)
|
||||
alerts = [
|
||||
Alert(
|
||||
id=1,
|
||||
label="alert_1",
|
||||
active=True,
|
||||
crontab="*/1 * * * *",
|
||||
sql="SELECT 0",
|
||||
alert_type="email",
|
||||
slice_id=slice_id,
|
||||
database_id=database_id,
|
||||
),
|
||||
Alert(
|
||||
id=2,
|
||||
label="alert_2",
|
||||
active=True,
|
||||
crontab="*/1 * * * *",
|
||||
sql="SELECT 55",
|
||||
alert_type="email",
|
||||
slice_id=slice_id,
|
||||
database_id=database_id,
|
||||
),
|
||||
Alert(
|
||||
id=3,
|
||||
label="alert_3",
|
||||
active=False,
|
||||
crontab="*/1 * * * *",
|
||||
sql="UPDATE 55",
|
||||
alert_type="email",
|
||||
slice_id=slice_id,
|
||||
database_id=database_id,
|
||||
),
|
||||
Alert(id=4, active=False, label="alert_4", database_id=-1),
|
||||
Alert(id=5, active=False, label="alert_5", database_id=database_id),
|
||||
]
|
||||
|
||||
for num in range(1, 6):
|
||||
eval(f"db.session.add(alert{num})")
|
||||
db.session.bulk_save_objects(alerts)
|
||||
db.session.commit()
|
||||
yield db.session
|
||||
|
||||
|
|
@ -82,45 +83,46 @@ def setup_database():
|
|||
|
||||
@patch("superset.tasks.schedules.deliver_alert")
|
||||
@patch("superset.tasks.schedules.logging.Logger.error")
|
||||
def test_run_alert_query(mock_error, mock_deliver, setup_database):
|
||||
database = setup_database
|
||||
run_alert_query(database.query(Alert).filter_by(id=1).one().id, database)
|
||||
alert1 = database.query(Alert).filter_by(id=1).one()
|
||||
assert mock_deliver.call_count == 0
|
||||
assert len(alert1.logs) == 1
|
||||
assert alert1.logs[0].alert_id == 1
|
||||
assert alert1.logs[0].state == "pass"
|
||||
def test_run_alert_query(mock_error, mock_deliver_alert, setup_database):
|
||||
dbsession = setup_database
|
||||
|
||||
run_alert_query(database.query(Alert).filter_by(id=2).one().id, database)
|
||||
alert2 = database.query(Alert).filter_by(id=2).one()
|
||||
assert mock_deliver.call_count == 1
|
||||
assert len(alert2.logs) == 1
|
||||
assert alert2.logs[0].alert_id == 2
|
||||
assert alert2.logs[0].state == "trigger"
|
||||
# Test passing alert with null SQL result
|
||||
alert1 = dbsession.query(Alert).filter_by(id=1).one()
|
||||
run_alert_query(alert1.id, alert1.database_id, alert1.sql, alert1.label)
|
||||
assert mock_deliver_alert.call_count == 0
|
||||
assert mock_error.call_count == 0
|
||||
|
||||
run_alert_query(database.query(Alert).filter_by(id=3).one().id, database)
|
||||
alert3 = database.query(Alert).filter_by(id=3).one()
|
||||
assert mock_deliver.call_count == 1
|
||||
# Test passing alert with True SQL result
|
||||
alert2 = dbsession.query(Alert).filter_by(id=2).one()
|
||||
run_alert_query(alert2.id, alert2.database_id, alert2.sql, alert2.label)
|
||||
assert mock_deliver_alert.call_count == 1
|
||||
assert mock_error.call_count == 0
|
||||
|
||||
# Test passing alert with error in SQL query
|
||||
alert3 = dbsession.query(Alert).filter_by(id=3).one()
|
||||
run_alert_query(alert3.id, alert3.database_id, alert3.sql, alert3.label)
|
||||
assert mock_deliver_alert.call_count == 1
|
||||
assert mock_error.call_count == 2
|
||||
assert len(alert3.logs) == 1
|
||||
assert alert3.logs[0].alert_id == 3
|
||||
assert alert3.logs[0].state == "error"
|
||||
|
||||
run_alert_query(database.query(Alert).filter_by(id=4).one().id, database)
|
||||
assert mock_deliver.call_count == 1
|
||||
# Test passing alert with invalid database
|
||||
alert4 = dbsession.query(Alert).filter_by(id=4).one()
|
||||
run_alert_query(alert4.id, alert4.database_id, alert4.sql, alert4.label)
|
||||
assert mock_deliver_alert.call_count == 1
|
||||
assert mock_error.call_count == 3
|
||||
|
||||
run_alert_query(database.query(Alert).filter_by(id=5).one().id, database)
|
||||
assert mock_deliver.call_count == 1
|
||||
# Test passing alert with no SQL statement
|
||||
alert5 = dbsession.query(Alert).filter_by(id=5).one()
|
||||
run_alert_query(alert5.id, alert5.database_id, alert5.sql, alert5.label)
|
||||
assert mock_deliver_alert.call_count == 1
|
||||
assert mock_error.call_count == 4
|
||||
|
||||
|
||||
@patch("superset.tasks.schedules.deliver_alert")
|
||||
@patch("superset.tasks.schedules.run_alert_query")
|
||||
def test_schedule_alert_query(mock_run_alert, mock_deliver_alert, setup_database):
|
||||
database = setup_database
|
||||
active_alert = database.query(Alert).filter_by(id=1).one()
|
||||
inactive_alert = database.query(Alert).filter_by(id=3).one()
|
||||
dbsession = setup_database
|
||||
active_alert = dbsession.query(Alert).filter_by(id=1).one()
|
||||
inactive_alert = dbsession.query(Alert).filter_by(id=3).one()
|
||||
|
||||
# Test that inactive alerts are not processed
|
||||
schedule_alert_query(report_type=ScheduleType.alert, schedule_id=inactive_alert.id)
|
||||
|
|
|
|||
Loading…
Reference in New Issue