diff --git a/superset/errors.py b/superset/errors.py index 9198a82d3..8bc84a742 100644 --- a/superset/errors.py +++ b/superset/errors.py @@ -90,6 +90,9 @@ class SupersetErrorType(str, Enum): INVALID_PAYLOAD_FORMAT_ERROR = "INVALID_PAYLOAD_FORMAT_ERROR" INVALID_PAYLOAD_SCHEMA_ERROR = "INVALID_PAYLOAD_SCHEMA_ERROR" + # Report errors + REPORT_NOTIFICATION_ERROR = "REPORT_NOTIFICATION_ERROR" + ISSUE_CODES = { 1000: _("The datasource is too large to query."), diff --git a/superset/reports/commands/exceptions.py b/superset/reports/commands/exceptions.py index 89f2c82fb..a068b3c62 100644 --- a/superset/reports/commands/exceptions.py +++ b/superset/reports/commands/exceptions.py @@ -14,6 +14,8 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from typing import List + from flask_babel import lazy_gettext as _ from superset.commands.exceptions import ( @@ -24,6 +26,7 @@ from superset.commands.exceptions import ( ForbiddenError, ValidationError, ) +from superset.exceptions import SupersetError, SupersetErrorsException from superset.reports.models import ReportScheduleType @@ -258,6 +261,17 @@ class ReportScheduleStateNotFoundError(CommandException): message = _("Report Schedule state not found") +class ReportScheduleSystemErrorsException(CommandException, SupersetErrorsException): + errors: List[SupersetError] = [] + message = _("Report schedule system error") + + +class ReportScheduleClientErrorsException(CommandException, SupersetErrorsException): + status = 400 + errors: List[SupersetError] = [] + message = _("Report schedule client error") + + class ReportScheduleUnexpectedError(CommandException): message = _("Report schedule unexpected error") diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py index 0ab6c6252..d20775ffd 100644 --- a/superset/reports/commands/execute.py +++ b/superset/reports/commands/execute.py @@ -31,10 +31,13 @@ from superset.common.chart_data import ChartDataResultFormat, ChartDataResultTyp from superset.dashboards.permalink.commands.create import ( CreateDashboardPermalinkCommand, ) +from superset.errors import ErrorLevel, SupersetError, SupersetErrorType +from superset.exceptions import SupersetErrorsException, SupersetException from superset.extensions import feature_flag_manager, machine_auth_provider_factory from superset.reports.commands.alert import AlertCommand from superset.reports.commands.exceptions import ( ReportScheduleAlertGracePeriodError, + ReportScheduleClientErrorsException, ReportScheduleCsvFailedError, ReportScheduleCsvTimeout, ReportScheduleDataFrameFailedError, @@ -45,6 +48,7 @@ from superset.reports.commands.exceptions import ( ReportScheduleScreenshotFailedError, ReportScheduleScreenshotTimeout, ReportScheduleStateNotFoundError, + ReportScheduleSystemErrorsException, ReportScheduleUnexpectedError, ReportScheduleWorkingTimeoutError, ) @@ -102,7 +106,6 @@ class BaseReportState: Update the report schedule state et al. and reflect the change in the execution log. """ - self.update_report_schedule(state) self.create_log(error_message) @@ -384,9 +387,9 @@ class BaseReportState: """ Sends a notification to all recipients - :raises: NotificationError + :raises: CommandException """ - notification_errors: List[NotificationError] = [] + notification_errors: List[SupersetError] = [] for recipient in recipients: notification = create_notification(recipient, notification_content) try: @@ -398,19 +401,32 @@ class BaseReportState: ) else: notification.send() - except NotificationError as ex: - # collect notification errors but keep processing them - notification_errors.append(ex) + except (NotificationError, SupersetException) as ex: + # collect errors but keep processing them + notification_errors.append( + SupersetError( + message=ex.message, + error_type=SupersetErrorType.REPORT_NOTIFICATION_ERROR, + level=ErrorLevel.ERROR + if ex.status >= 500 + else ErrorLevel.WARNING, + ) + ) if notification_errors: - # raise errors separately so that we can utilize error status codes + # log all errors but raise based on the most severe for error in notification_errors: - raise error + logger.warning(str(error)) + + if any(error.level == ErrorLevel.ERROR for error in notification_errors): + raise ReportScheduleSystemErrorsException(errors=notification_errors) + if any(error.level == ErrorLevel.WARNING for error in notification_errors): + raise ReportScheduleClientErrorsException(errors=notification_errors) def send(self) -> None: """ Creates the notification content and sends them to all recipients - :raises: NotificationError + :raises: CommandException """ notification_content = self._get_notification_content() self._send(notification_content, self._report_schedule.recipients) @@ -419,7 +435,7 @@ class BaseReportState: """ Creates and sends a notification for an error, to all recipients - :raises: NotificationError + :raises: CommandException """ header_data = self._get_log_data() logger.info( @@ -517,25 +533,34 @@ class ReportNotTriggeredErrorState(BaseReportState): return self.send() self.update_report_schedule_and_log(ReportState.SUCCESS) - except Exception as first_ex: + except (SupersetErrorsException, Exception) as first_ex: + error_message = str(first_ex) + if isinstance(first_ex, SupersetErrorsException): + error_message = ";".join([error.message for error in first_ex.errors]) + self.update_report_schedule_and_log( - ReportState.ERROR, error_message=str(first_ex) + ReportState.ERROR, error_message=error_message ) + # TODO (dpgaspar) convert this logic to a new state eg: ERROR_ON_GRACE if not self.is_in_error_grace_period(): + second_error_message = REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER try: self.send_error( f"Error occurred for {self._report_schedule.type}:" f" {self._report_schedule.name}", str(first_ex), ) - self.update_report_schedule_and_log( - ReportState.ERROR, - error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER, + + except SupersetErrorsException as second_ex: + second_error_message = ";".join( + [error.message for error in second_ex.errors] ) except Exception as second_ex: # pylint: disable=broad-except + second_error_message = str(second_ex) + finally: self.update_report_schedule_and_log( - ReportState.ERROR, error_message=str(second_ex) + ReportState.ERROR, error_message=second_error_message ) raise first_ex diff --git a/superset/reports/notifications/email.py b/superset/reports/notifications/email.py index 1f042ded8..f8b38cc3e 100644 --- a/superset/reports/notifications/email.py +++ b/superset/reports/notifications/email.py @@ -26,6 +26,7 @@ import bleach from flask_babel import gettext as __ from superset import app +from superset.exceptions import SupersetErrorsException from superset.reports.models import ReportRecipientType from superset.reports.notifications.base import BaseNotification from superset.reports.notifications.exceptions import NotificationError @@ -210,5 +211,9 @@ class EmailNotification(BaseNotification): # pylint: disable=too-few-public-met logger.info( "Report sent to email, notification content is %s", content.header_data ) + except SupersetErrorsException as ex: + raise NotificationError( + ";".join([error.message for error in ex.errors]) + ) from ex except Exception as ex: - raise NotificationError(ex) from ex + raise NotificationError(str(ex)) from ex diff --git a/superset/reports/notifications/exceptions.py b/superset/reports/notifications/exceptions.py index 749a91fd9..aa06906f8 100644 --- a/superset/reports/notifications/exceptions.py +++ b/superset/reports/notifications/exceptions.py @@ -14,7 +14,33 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from superset.exceptions import SupersetException -class NotificationError(Exception): - pass +class NotificationError(SupersetException): + """ + Generic unknown exception - only used when + bubbling up unknown exceptions from lower levels + """ + + +class NotificationParamException(SupersetException): + status = 422 + + +class NotificationAuthorizationException(SupersetException): + status = 401 + + +class NotificationUnprocessableException(SupersetException): + """ + When a third party client service is down. + The request should be retried. There is no further + action required on our part or the user's other than to retry + """ + + status = 400 + + +class NotificationMalformedException(SupersetException): + status = 400 diff --git a/superset/reports/notifications/slack.py b/superset/reports/notifications/slack.py index 2ac311c7f..898397273 100644 --- a/superset/reports/notifications/slack.py +++ b/superset/reports/notifications/slack.py @@ -23,12 +23,27 @@ from typing import Sequence, Union import backoff from flask_babel import gettext as __ from slack_sdk import WebClient -from slack_sdk.errors import SlackApiError, SlackClientError +from slack_sdk.errors import ( + BotUserAccessError, + SlackApiError, + SlackClientConfigurationError, + SlackClientError, + SlackClientNotConnectedError, + SlackObjectFormationError, + SlackRequestError, + SlackTokenRotationError, +) from superset import app from superset.reports.models import ReportRecipientType from superset.reports.notifications.base import BaseNotification -from superset.reports.notifications.exceptions import NotificationError +from superset.reports.notifications.exceptions import ( + NotificationAuthorizationException, + NotificationError, + NotificationMalformedException, + NotificationParamException, + NotificationUnprocessableException, +) from superset.utils.decorators import statsd_gauge from superset.utils.urls import modify_url_query @@ -173,5 +188,18 @@ Error: %(text)s else: client.chat_postMessage(channel=channel, text=body) logger.info("Report sent to slack") - except SlackClientError as ex: + except ( + BotUserAccessError, + SlackRequestError, + SlackClientConfigurationError, + ) as ex: + raise NotificationParamException from ex + except SlackObjectFormationError as ex: + raise NotificationMalformedException from ex + except SlackTokenRotationError as ex: + raise NotificationAuthorizationException from ex + except SlackClientNotConnectedError as ex: + raise NotificationUnprocessableException from ex + except (SlackClientError, SlackApiError) as ex: + # any other slack errors not caught above raise NotificationError(ex) from ex diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 2db721de7..7472f6855 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -29,6 +29,8 @@ from superset.reports.commands.log_prune import AsyncPruneReportScheduleLogComma from superset.reports.dao import ReportScheduleDAO from superset.tasks.cron_util import cron_schedule_window from superset.utils.celery import session_scope +from superset.utils.core import LoggerLevel +from superset.utils.log import get_logger_from_status logger = logging.getLogger(__name__) @@ -92,11 +94,13 @@ def execute(self: Celery.task, report_schedule_id: int, scheduled_dttm: str) -> "An unexpected occurred while executing the report: %s", task_id ) self.update_state(state="FAILURE") - except CommandException: - logger.exception( - "A downstream exception occurred while generating" " a report: %s", task_id + except CommandException as ex: + logger_func, level = get_logger_from_status(ex.status) + logger_func( + "A downstream %s occurred while generating a report: %s", level, task_id ) - self.update_state(state="FAILURE") + if level == LoggerLevel.EXCEPTION: + self.update_state(state="FAILURE") @celery_app.task(name="reports.prune_log") diff --git a/superset/utils/core.py b/superset/utils/core.py index cd992250e..ea2bd6588 100644 --- a/superset/utils/core.py +++ b/superset/utils/core.py @@ -190,6 +190,12 @@ class DatasourceType(str, Enum): VIEW = "view" +class LoggerLevel(str, Enum): + INFO = "info" + WARNING = "warning" + EXCEPTION = "exception" + + class HeaderDataType(TypedDict): notification_format: str owners: List[int] diff --git a/superset/utils/log.py b/superset/utils/log.py index ef7b290b2..a52556ae6 100644 --- a/superset/utils/log.py +++ b/superset/utils/log.py @@ -31,6 +31,7 @@ from typing import ( Dict, Iterator, Optional, + Tuple, Type, TYPE_CHECKING, Union, @@ -41,11 +42,13 @@ from flask_appbuilder.const import API_URI_RIS_KEY from sqlalchemy.exc import SQLAlchemyError from typing_extensions import Literal -from superset.utils.core import get_user_id +from superset.utils.core import get_user_id, LoggerLevel if TYPE_CHECKING: from superset.stats_logger import BaseStatsLogger +logger = logging.getLogger(__name__) + def collect_request_payload() -> Dict[str, Any]: """Collect log payload identifiable from request context""" @@ -75,6 +78,24 @@ def collect_request_payload() -> Dict[str, Any]: return payload +def get_logger_from_status( + status: int, +) -> Tuple[Callable[..., None], str]: + """ + Return logger method by status of exception. + Maps logger level to status code level + """ + log_map = { + "2": LoggerLevel.INFO, + "3": LoggerLevel.INFO, + "4": LoggerLevel.WARNING, + "5": LoggerLevel.EXCEPTION, + } + log_level = log_map[str(status)[0]] + + return (getattr(logger, log_level), log_level) + + class AbstractEventLogger(ABC): def __call__( self, diff --git a/tests/integration_tests/reports/commands_tests.py b/tests/integration_tests/reports/commands_tests.py index 2dd1c461c..288e6746c 100644 --- a/tests/integration_tests/reports/commands_tests.py +++ b/tests/integration_tests/reports/commands_tests.py @@ -18,7 +18,7 @@ import json from contextlib import contextmanager from datetime import datetime, timedelta, timezone from typing import List, Optional -from unittest.mock import Mock, patch +from unittest.mock import call, Mock, patch from uuid import uuid4 import pytest @@ -29,6 +29,7 @@ from freezegun import freeze_time from sqlalchemy.sql import func from superset import db +from superset.exceptions import SupersetException from superset.models.core import Database from superset.models.dashboard import Dashboard from superset.models.slice import Slice @@ -37,16 +38,22 @@ from superset.reports.commands.exceptions import ( AlertQueryInvalidTypeError, AlertQueryMultipleColumnsError, AlertQueryMultipleRowsError, + ReportScheduleClientErrorsException, ReportScheduleCsvFailedError, ReportScheduleCsvTimeout, + ReportScheduleForbiddenError, ReportScheduleNotFoundError, ReportSchedulePreviousWorkingError, ReportScheduleScreenshotFailedError, ReportScheduleScreenshotTimeout, + ReportScheduleSystemErrorsException, ReportScheduleUnexpectedError, ReportScheduleWorkingTimeoutError, ) -from superset.reports.commands.execute import AsyncExecuteReportScheduleCommand +from superset.reports.commands.execute import ( + AsyncExecuteReportScheduleCommand, + BaseReportState, +) from superset.reports.commands.log_prune import AsyncPruneReportScheduleLogCommand from superset.reports.models import ( ReportDataFormat, @@ -56,6 +63,10 @@ from superset.reports.models import ( ReportScheduleValidatorType, ReportState, ) +from superset.reports.notifications.exceptions import ( + NotificationError, + NotificationParamException, +) from superset.reports.types import ReportScheduleExecutor from superset.utils.database import get_example_database from tests.integration_tests.fixtures.birth_names_dashboard import ( @@ -115,7 +126,6 @@ def assert_log(state: str, error_message: Optional[str] = None): if state == ReportState.ERROR: # On error we send an email - print(logs) assert len(logs) == 3 else: assert len(logs) == 2 @@ -1392,7 +1402,7 @@ def test_email_dashboard_report_fails( screenshot_mock.return_value = SCREENSHOT_FILE email_mock.side_effect = SMTPException("Could not connect to SMTP XPTO") - with pytest.raises(ReportScheduleUnexpectedError): + with pytest.raises(ReportScheduleSystemErrorsException): AsyncExecuteReportScheduleCommand( TEST_ID, create_report_email_dashboard.id, datetime.utcnow() ).run() @@ -1886,3 +1896,66 @@ def test_prune_log_soft_time_out(bulk_delete_logs, create_report_email_dashboard with pytest.raises(SoftTimeLimitExceeded) as excinfo: AsyncPruneReportScheduleLogCommand().run() assert str(excinfo.value) == "SoftTimeLimitExceeded()" + + +@patch("superset.reports.commands.execute.logger") +@patch("superset.reports.commands.execute.create_notification") +def test__send_with_client_errors(notification_mock, logger_mock): + notification_content = "I am some content" + recipients = ["test@foo.com"] + notification_mock.return_value.send.side_effect = NotificationParamException() + with pytest.raises(ReportScheduleClientErrorsException) as excinfo: + BaseReportState._send(BaseReportState, notification_content, recipients) + + assert excinfo.errisinstance(SupersetException) + logger_mock.warning.assert_called_with( + ( + "SupersetError(message='', error_type=, level=, extra=None)" + ) + ) + + +@patch("superset.reports.commands.execute.logger") +@patch("superset.reports.commands.execute.create_notification") +def test__send_with_multiple_errors(notification_mock, logger_mock): + notification_content = "I am some content" + recipients = ["test@foo.com", "test2@bar.com"] + notification_mock.return_value.send.side_effect = [ + NotificationParamException(), + NotificationError(), + ] + # it raises the error with a 500 status if present + with pytest.raises(ReportScheduleSystemErrorsException) as excinfo: + BaseReportState._send(BaseReportState, notification_content, recipients) + + assert excinfo.errisinstance(SupersetException) + # it logs both errors as warnings + logger_mock.warning.assert_has_calls( + [ + call( + "SupersetError(message='', error_type=, level=, extra=None)" + ), + call( + "SupersetError(message='', error_type=, level=, extra=None)" + ), + ] + ) + + +@patch("superset.reports.commands.execute.logger") +@patch("superset.reports.commands.execute.create_notification") +def test__send_with_server_errors(notification_mock, logger_mock): + + notification_content = "I am some content" + recipients = ["test@foo.com"] + notification_mock.return_value.send.side_effect = NotificationError() + with pytest.raises(ReportScheduleSystemErrorsException) as excinfo: + BaseReportState._send(BaseReportState, notification_content, recipients) + + assert excinfo.errisinstance(SupersetException) + # it logs the error + logger_mock.warning.assert_called_with( + ( + "SupersetError(message='', error_type=, level=, extra=None)" + ) + ) diff --git a/tests/unit_tests/utils/log_tests.py b/tests/unit_tests/utils/log_tests.py new file mode 100644 index 000000000..5b031b577 --- /dev/null +++ b/tests/unit_tests/utils/log_tests.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +from superset.utils.log import get_logger_from_status + + +def test_log_from_status_exception() -> None: + (func, log_level) = get_logger_from_status(500) + assert func.__name__ == "exception" + assert log_level == "exception" + + +def test_log_from_status_warning() -> None: + (func, log_level) = get_logger_from_status(422) + assert func.__name__ == "warning" + assert log_level == "warning" + + +def test_log_from_status_info() -> None: + (func, log_level) = get_logger_from_status(300) + assert func.__name__ == "info" + assert log_level == "info"