refactor: use exception status for logging (#21971)

This commit is contained in:
Elizabeth Thompson 2022-11-14 14:14:24 -08:00 committed by GitHub
parent c3f1873c43
commit ce145c676c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 273 additions and 31 deletions

View File

@ -90,6 +90,9 @@ class SupersetErrorType(str, Enum):
INVALID_PAYLOAD_FORMAT_ERROR = "INVALID_PAYLOAD_FORMAT_ERROR"
INVALID_PAYLOAD_SCHEMA_ERROR = "INVALID_PAYLOAD_SCHEMA_ERROR"
# Report errors
REPORT_NOTIFICATION_ERROR = "REPORT_NOTIFICATION_ERROR"
ISSUE_CODES = {
1000: _("The datasource is too large to query."),

View File

@ -14,6 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import List
from flask_babel import lazy_gettext as _
from superset.commands.exceptions import (
@ -24,6 +26,7 @@ from superset.commands.exceptions import (
ForbiddenError,
ValidationError,
)
from superset.exceptions import SupersetError, SupersetErrorsException
from superset.reports.models import ReportScheduleType
@ -258,6 +261,17 @@ class ReportScheduleStateNotFoundError(CommandException):
message = _("Report Schedule state not found")
class ReportScheduleSystemErrorsException(CommandException, SupersetErrorsException):
errors: List[SupersetError] = []
message = _("Report schedule system error")
class ReportScheduleClientErrorsException(CommandException, SupersetErrorsException):
status = 400
errors: List[SupersetError] = []
message = _("Report schedule client error")
class ReportScheduleUnexpectedError(CommandException):
message = _("Report schedule unexpected error")

View File

@ -31,10 +31,13 @@ from superset.common.chart_data import ChartDataResultFormat, ChartDataResultTyp
from superset.dashboards.permalink.commands.create import (
CreateDashboardPermalinkCommand,
)
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import SupersetErrorsException, SupersetException
from superset.extensions import feature_flag_manager, machine_auth_provider_factory
from superset.reports.commands.alert import AlertCommand
from superset.reports.commands.exceptions import (
ReportScheduleAlertGracePeriodError,
ReportScheduleClientErrorsException,
ReportScheduleCsvFailedError,
ReportScheduleCsvTimeout,
ReportScheduleDataFrameFailedError,
@ -45,6 +48,7 @@ from superset.reports.commands.exceptions import (
ReportScheduleScreenshotFailedError,
ReportScheduleScreenshotTimeout,
ReportScheduleStateNotFoundError,
ReportScheduleSystemErrorsException,
ReportScheduleUnexpectedError,
ReportScheduleWorkingTimeoutError,
)
@ -102,7 +106,6 @@ class BaseReportState:
Update the report schedule state et al. and reflect the change in the execution
log.
"""
self.update_report_schedule(state)
self.create_log(error_message)
@ -384,9 +387,9 @@ class BaseReportState:
"""
Sends a notification to all recipients
:raises: NotificationError
:raises: CommandException
"""
notification_errors: List[NotificationError] = []
notification_errors: List[SupersetError] = []
for recipient in recipients:
notification = create_notification(recipient, notification_content)
try:
@ -398,19 +401,32 @@ class BaseReportState:
)
else:
notification.send()
except NotificationError as ex:
# collect notification errors but keep processing them
notification_errors.append(ex)
except (NotificationError, SupersetException) as ex:
# collect errors but keep processing them
notification_errors.append(
SupersetError(
message=ex.message,
error_type=SupersetErrorType.REPORT_NOTIFICATION_ERROR,
level=ErrorLevel.ERROR
if ex.status >= 500
else ErrorLevel.WARNING,
)
)
if notification_errors:
# raise errors separately so that we can utilize error status codes
# log all errors but raise based on the most severe
for error in notification_errors:
raise error
logger.warning(str(error))
if any(error.level == ErrorLevel.ERROR for error in notification_errors):
raise ReportScheduleSystemErrorsException(errors=notification_errors)
if any(error.level == ErrorLevel.WARNING for error in notification_errors):
raise ReportScheduleClientErrorsException(errors=notification_errors)
def send(self) -> None:
"""
Creates the notification content and sends them to all recipients
:raises: NotificationError
:raises: CommandException
"""
notification_content = self._get_notification_content()
self._send(notification_content, self._report_schedule.recipients)
@ -419,7 +435,7 @@ class BaseReportState:
"""
Creates and sends a notification for an error, to all recipients
:raises: NotificationError
:raises: CommandException
"""
header_data = self._get_log_data()
logger.info(
@ -517,25 +533,34 @@ class ReportNotTriggeredErrorState(BaseReportState):
return
self.send()
self.update_report_schedule_and_log(ReportState.SUCCESS)
except Exception as first_ex:
except (SupersetErrorsException, Exception) as first_ex:
error_message = str(first_ex)
if isinstance(first_ex, SupersetErrorsException):
error_message = ";".join([error.message for error in first_ex.errors])
self.update_report_schedule_and_log(
ReportState.ERROR, error_message=str(first_ex)
ReportState.ERROR, error_message=error_message
)
# TODO (dpgaspar) convert this logic to a new state eg: ERROR_ON_GRACE
if not self.is_in_error_grace_period():
second_error_message = REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER
try:
self.send_error(
f"Error occurred for {self._report_schedule.type}:"
f" {self._report_schedule.name}",
str(first_ex),
)
self.update_report_schedule_and_log(
ReportState.ERROR,
error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
except SupersetErrorsException as second_ex:
second_error_message = ";".join(
[error.message for error in second_ex.errors]
)
except Exception as second_ex: # pylint: disable=broad-except
second_error_message = str(second_ex)
finally:
self.update_report_schedule_and_log(
ReportState.ERROR, error_message=str(second_ex)
ReportState.ERROR, error_message=second_error_message
)
raise first_ex

View File

@ -26,6 +26,7 @@ import bleach
from flask_babel import gettext as __
from superset import app
from superset.exceptions import SupersetErrorsException
from superset.reports.models import ReportRecipientType
from superset.reports.notifications.base import BaseNotification
from superset.reports.notifications.exceptions import NotificationError
@ -210,5 +211,9 @@ class EmailNotification(BaseNotification): # pylint: disable=too-few-public-met
logger.info(
"Report sent to email, notification content is %s", content.header_data
)
except SupersetErrorsException as ex:
raise NotificationError(
";".join([error.message for error in ex.errors])
) from ex
except Exception as ex:
raise NotificationError(ex) from ex
raise NotificationError(str(ex)) from ex

View File

@ -14,7 +14,33 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from superset.exceptions import SupersetException
class NotificationError(Exception):
pass
class NotificationError(SupersetException):
"""
Generic unknown exception - only used when
bubbling up unknown exceptions from lower levels
"""
class NotificationParamException(SupersetException):
status = 422
class NotificationAuthorizationException(SupersetException):
status = 401
class NotificationUnprocessableException(SupersetException):
"""
When a third party client service is down.
The request should be retried. There is no further
action required on our part or the user's other than to retry
"""
status = 400
class NotificationMalformedException(SupersetException):
status = 400

View File

@ -23,12 +23,27 @@ from typing import Sequence, Union
import backoff
from flask_babel import gettext as __
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError, SlackClientError
from slack_sdk.errors import (
BotUserAccessError,
SlackApiError,
SlackClientConfigurationError,
SlackClientError,
SlackClientNotConnectedError,
SlackObjectFormationError,
SlackRequestError,
SlackTokenRotationError,
)
from superset import app
from superset.reports.models import ReportRecipientType
from superset.reports.notifications.base import BaseNotification
from superset.reports.notifications.exceptions import NotificationError
from superset.reports.notifications.exceptions import (
NotificationAuthorizationException,
NotificationError,
NotificationMalformedException,
NotificationParamException,
NotificationUnprocessableException,
)
from superset.utils.decorators import statsd_gauge
from superset.utils.urls import modify_url_query
@ -173,5 +188,18 @@ Error: %(text)s
else:
client.chat_postMessage(channel=channel, text=body)
logger.info("Report sent to slack")
except SlackClientError as ex:
except (
BotUserAccessError,
SlackRequestError,
SlackClientConfigurationError,
) as ex:
raise NotificationParamException from ex
except SlackObjectFormationError as ex:
raise NotificationMalformedException from ex
except SlackTokenRotationError as ex:
raise NotificationAuthorizationException from ex
except SlackClientNotConnectedError as ex:
raise NotificationUnprocessableException from ex
except (SlackClientError, SlackApiError) as ex:
# any other slack errors not caught above
raise NotificationError(ex) from ex

View File

@ -29,6 +29,8 @@ from superset.reports.commands.log_prune import AsyncPruneReportScheduleLogComma
from superset.reports.dao import ReportScheduleDAO
from superset.tasks.cron_util import cron_schedule_window
from superset.utils.celery import session_scope
from superset.utils.core import LoggerLevel
from superset.utils.log import get_logger_from_status
logger = logging.getLogger(__name__)
@ -92,11 +94,13 @@ def execute(self: Celery.task, report_schedule_id: int, scheduled_dttm: str) ->
"An unexpected occurred while executing the report: %s", task_id
)
self.update_state(state="FAILURE")
except CommandException:
logger.exception(
"A downstream exception occurred while generating" " a report: %s", task_id
except CommandException as ex:
logger_func, level = get_logger_from_status(ex.status)
logger_func(
"A downstream %s occurred while generating a report: %s", level, task_id
)
self.update_state(state="FAILURE")
if level == LoggerLevel.EXCEPTION:
self.update_state(state="FAILURE")
@celery_app.task(name="reports.prune_log")

View File

@ -190,6 +190,12 @@ class DatasourceType(str, Enum):
VIEW = "view"
class LoggerLevel(str, Enum):
INFO = "info"
WARNING = "warning"
EXCEPTION = "exception"
class HeaderDataType(TypedDict):
notification_format: str
owners: List[int]

View File

@ -31,6 +31,7 @@ from typing import (
Dict,
Iterator,
Optional,
Tuple,
Type,
TYPE_CHECKING,
Union,
@ -41,11 +42,13 @@ from flask_appbuilder.const import API_URI_RIS_KEY
from sqlalchemy.exc import SQLAlchemyError
from typing_extensions import Literal
from superset.utils.core import get_user_id
from superset.utils.core import get_user_id, LoggerLevel
if TYPE_CHECKING:
from superset.stats_logger import BaseStatsLogger
logger = logging.getLogger(__name__)
def collect_request_payload() -> Dict[str, Any]:
"""Collect log payload identifiable from request context"""
@ -75,6 +78,24 @@ def collect_request_payload() -> Dict[str, Any]:
return payload
def get_logger_from_status(
status: int,
) -> Tuple[Callable[..., None], str]:
"""
Return logger method by status of exception.
Maps logger level to status code level
"""
log_map = {
"2": LoggerLevel.INFO,
"3": LoggerLevel.INFO,
"4": LoggerLevel.WARNING,
"5": LoggerLevel.EXCEPTION,
}
log_level = log_map[str(status)[0]]
return (getattr(logger, log_level), log_level)
class AbstractEventLogger(ABC):
def __call__(
self,

View File

@ -18,7 +18,7 @@ import json
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from unittest.mock import Mock, patch
from unittest.mock import call, Mock, patch
from uuid import uuid4
import pytest
@ -29,6 +29,7 @@ from freezegun import freeze_time
from sqlalchemy.sql import func
from superset import db
from superset.exceptions import SupersetException
from superset.models.core import Database
from superset.models.dashboard import Dashboard
from superset.models.slice import Slice
@ -37,16 +38,22 @@ from superset.reports.commands.exceptions import (
AlertQueryInvalidTypeError,
AlertQueryMultipleColumnsError,
AlertQueryMultipleRowsError,
ReportScheduleClientErrorsException,
ReportScheduleCsvFailedError,
ReportScheduleCsvTimeout,
ReportScheduleForbiddenError,
ReportScheduleNotFoundError,
ReportSchedulePreviousWorkingError,
ReportScheduleScreenshotFailedError,
ReportScheduleScreenshotTimeout,
ReportScheduleSystemErrorsException,
ReportScheduleUnexpectedError,
ReportScheduleWorkingTimeoutError,
)
from superset.reports.commands.execute import AsyncExecuteReportScheduleCommand
from superset.reports.commands.execute import (
AsyncExecuteReportScheduleCommand,
BaseReportState,
)
from superset.reports.commands.log_prune import AsyncPruneReportScheduleLogCommand
from superset.reports.models import (
ReportDataFormat,
@ -56,6 +63,10 @@ from superset.reports.models import (
ReportScheduleValidatorType,
ReportState,
)
from superset.reports.notifications.exceptions import (
NotificationError,
NotificationParamException,
)
from superset.reports.types import ReportScheduleExecutor
from superset.utils.database import get_example_database
from tests.integration_tests.fixtures.birth_names_dashboard import (
@ -115,7 +126,6 @@ def assert_log(state: str, error_message: Optional[str] = None):
if state == ReportState.ERROR:
# On error we send an email
print(logs)
assert len(logs) == 3
else:
assert len(logs) == 2
@ -1392,7 +1402,7 @@ def test_email_dashboard_report_fails(
screenshot_mock.return_value = SCREENSHOT_FILE
email_mock.side_effect = SMTPException("Could not connect to SMTP XPTO")
with pytest.raises(ReportScheduleUnexpectedError):
with pytest.raises(ReportScheduleSystemErrorsException):
AsyncExecuteReportScheduleCommand(
TEST_ID, create_report_email_dashboard.id, datetime.utcnow()
).run()
@ -1886,3 +1896,66 @@ def test_prune_log_soft_time_out(bulk_delete_logs, create_report_email_dashboard
with pytest.raises(SoftTimeLimitExceeded) as excinfo:
AsyncPruneReportScheduleLogCommand().run()
assert str(excinfo.value) == "SoftTimeLimitExceeded()"
@patch("superset.reports.commands.execute.logger")
@patch("superset.reports.commands.execute.create_notification")
def test__send_with_client_errors(notification_mock, logger_mock):
notification_content = "I am some content"
recipients = ["test@foo.com"]
notification_mock.return_value.send.side_effect = NotificationParamException()
with pytest.raises(ReportScheduleClientErrorsException) as excinfo:
BaseReportState._send(BaseReportState, notification_content, recipients)
assert excinfo.errisinstance(SupersetException)
logger_mock.warning.assert_called_with(
(
"SupersetError(message='', error_type=<SupersetErrorType.REPORT_NOTIFICATION_ERROR: 'REPORT_NOTIFICATION_ERROR'>, level=<ErrorLevel.WARNING: 'warning'>, extra=None)"
)
)
@patch("superset.reports.commands.execute.logger")
@patch("superset.reports.commands.execute.create_notification")
def test__send_with_multiple_errors(notification_mock, logger_mock):
notification_content = "I am some content"
recipients = ["test@foo.com", "test2@bar.com"]
notification_mock.return_value.send.side_effect = [
NotificationParamException(),
NotificationError(),
]
# it raises the error with a 500 status if present
with pytest.raises(ReportScheduleSystemErrorsException) as excinfo:
BaseReportState._send(BaseReportState, notification_content, recipients)
assert excinfo.errisinstance(SupersetException)
# it logs both errors as warnings
logger_mock.warning.assert_has_calls(
[
call(
"SupersetError(message='', error_type=<SupersetErrorType.REPORT_NOTIFICATION_ERROR: 'REPORT_NOTIFICATION_ERROR'>, level=<ErrorLevel.WARNING: 'warning'>, extra=None)"
),
call(
"SupersetError(message='', error_type=<SupersetErrorType.REPORT_NOTIFICATION_ERROR: 'REPORT_NOTIFICATION_ERROR'>, level=<ErrorLevel.ERROR: 'error'>, extra=None)"
),
]
)
@patch("superset.reports.commands.execute.logger")
@patch("superset.reports.commands.execute.create_notification")
def test__send_with_server_errors(notification_mock, logger_mock):
notification_content = "I am some content"
recipients = ["test@foo.com"]
notification_mock.return_value.send.side_effect = NotificationError()
with pytest.raises(ReportScheduleSystemErrorsException) as excinfo:
BaseReportState._send(BaseReportState, notification_content, recipients)
assert excinfo.errisinstance(SupersetException)
# it logs the error
logger_mock.warning.assert_called_with(
(
"SupersetError(message='', error_type=<SupersetErrorType.REPORT_NOTIFICATION_ERROR: 'REPORT_NOTIFICATION_ERROR'>, level=<ErrorLevel.ERROR: 'error'>, extra=None)"
)
)

View File

@ -0,0 +1,37 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from superset.utils.log import get_logger_from_status
def test_log_from_status_exception() -> None:
(func, log_level) = get_logger_from_status(500)
assert func.__name__ == "exception"
assert log_level == "exception"
def test_log_from_status_warning() -> None:
(func, log_level) = get_logger_from_status(422)
assert func.__name__ == "warning"
assert log_level == "warning"
def test_log_from_status_info() -> None:
(func, log_level) = get_logger_from_status(300)
assert func.__name__ == "info"
assert log_level == "info"