refactor: optimize backend log payload (#11927)
This commit is contained in:
parent
77cae64ccd
commit
76f9f185fb
|
|
@ -30,7 +30,7 @@ combine_as_imports = true
|
|||
include_trailing_comma = true
|
||||
line_length = 88
|
||||
known_first_party = superset
|
||||
known_third_party =alembic,apispec,backoff,bleach,cachelib,celery,click,colorama,contextlib2,cron_descriptor,croniter,cryptography,dateutil,flask,flask_appbuilder,flask_babel,flask_caching,flask_compress,flask_login,flask_migrate,flask_sqlalchemy,flask_talisman,flask_testing,flask_wtf,freezegun,geohash,geopy,humanize,isodate,jinja2,jwt,markdown,markupsafe,marshmallow,msgpack,numpy,pandas,parameterized,parsedatetime,pathlib2,pgsanity,pkg_resources,polyline,prison,pyarrow,pyhive,pytest,pytz,redis,retry,selenium,setuptools,simplejson,slack,sqlalchemy,sqlalchemy_utils,sqlparse,werkzeug,wtforms,wtforms_json,yaml
|
||||
known_third_party =alembic,apispec,backoff,bleach,cachelib,celery,click,colorama,contextlib2,cron_descriptor,croniter,cryptography,dateutil,flask,flask_appbuilder,flask_babel,flask_caching,flask_compress,flask_login,flask_migrate,flask_sqlalchemy,flask_talisman,flask_testing,flask_wtf,freezegun,geohash,geopy,humanize,isodate,jinja2,jwt,markdown,markupsafe,marshmallow,msgpack,numpy,pandas,parameterized,parsedatetime,pathlib2,pgsanity,pkg_resources,polyline,prison,pyarrow,pyhive,pytest,pytz,redis,retry,selenium,setuptools,simplejson,slack,sqlalchemy,sqlalchemy_utils,sqlparse,typing_extensions,werkzeug,wtforms,wtforms_json,yaml
|
||||
multi_line_output = 3
|
||||
order_by_type = false
|
||||
|
||||
|
|
|
|||
|
|
@ -22,19 +22,39 @@ import textwrap
|
|||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from contextlib import contextmanager
|
||||
from typing import Any, Callable, cast, Iterator, Optional, Type
|
||||
from typing import Any, Callable, cast, Dict, Iterator, Optional, Type, Union
|
||||
|
||||
from flask import current_app, g, request
|
||||
from flask_appbuilder.const import API_URI_RIS_KEY
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from typing_extensions import Literal
|
||||
|
||||
from superset.stats_logger import BaseStatsLogger
|
||||
|
||||
|
||||
def strip_int_from_path(path: Optional[str]) -> str:
    """Replace all-digit segments of a '/'-separated path with ``<int>``.

    :param path: the path to normalise; may be ``None`` or an empty string
    :returns: the path with every segment made up only of digits replaced by
        the literal ``<int>``, or an empty string when ``path`` is falsy
    """
    if not path:
        return ""
    # a generator is enough here: ``str.join`` consumes the segments once
    return "/".join(
        "<int>" if segment.isdigit() else segment for segment in path.split("/")
    )
|
||||
def collect_request_payload() -> Dict[str, Any]:
    """Collect log payload identifiable from request context.

    The payload starts with the request path, then the POST form fields, then
    the URL search params. Later entries win, so a search param overrides a
    form field of the same name (and either would override ``path``).

    :returns: a new dict holding the collected payload
    """
    payload: Dict[str, Any] = {
        "path": request.path,
        **request.form.to_dict(),
        # url search params can overwrite POST body
        **request.args.to_dict(),
    }

    # save URL match pattern in addition to the request path; skip it when no
    # rule matched, otherwise ``str(None)`` would be logged as "None"
    matched_rule = request.url_rule
    if matched_rule is not None:
        url_rule = str(matched_rule)
        if url_rule != request.path:
            payload["url_rule"] = url_rule

    if "rison" in payload:
        # remove rison raw string (q=xxx in search params) in favor of
        # rison object (could come from `payload_override`)
        if API_URI_RIS_KEY in payload:
            del payload[API_URI_RIS_KEY]
        # delete empty rison object
        if not payload["rison"]:
            del payload["rison"]

    return payload
|
||||
|
||||
|
||||
class AbstractEventLogger(ABC):
|
||||
|
|
@ -53,26 +73,37 @@ class AbstractEventLogger(ABC):
|
|||
pass
|
||||
|
||||
@contextmanager
|
||||
def log_context(
|
||||
self, action: str, ref: Optional[str] = None, log_to_statsd: bool = True,
|
||||
def log_context( # pylint: disable=too-many-locals
|
||||
self, action: str, object_ref: Optional[str] = None, log_to_statsd: bool = True,
|
||||
) -> Iterator[Callable[..., None]]:
|
||||
"""
|
||||
Log an event while reading information from the request context.
|
||||
`kwargs` will be appended directly to the log payload.
|
||||
Log an event with additional information from the request context.
|
||||
|
||||
:param action: a name to identify the event
|
||||
:param object_ref: reference to the Python object that triggered this action
|
||||
:param log_to_statsd: whether to update statsd counter for the action
|
||||
"""
|
||||
from superset.views.core import get_form_data
|
||||
|
||||
start_time = time.time()
|
||||
referrer = request.referrer[:1000] if request.referrer else None
|
||||
user_id = g.user.get_id() if hasattr(g, "user") and g.user else None
|
||||
payload = request.form.to_dict() or {}
|
||||
# request parameters can overwrite post body
|
||||
payload.update(request.args.to_dict())
|
||||
payload_override = {}
|
||||
|
||||
# yield a helper to update additional kwargs
|
||||
yield lambda **kwargs: payload.update(kwargs)
|
||||
# yield a helper to add additional payload
|
||||
yield lambda **kwargs: payload_override.update(kwargs)
|
||||
|
||||
dashboard_id = payload.get("dashboard_id")
|
||||
payload = collect_request_payload()
|
||||
if object_ref:
|
||||
payload["object_ref"] = object_ref
|
||||
# manual updates from the context come last
|
||||
payload.update(payload_override)
|
||||
|
||||
dashboard_id: Optional[int] = None
|
||||
try:
|
||||
dashboard_id = int(payload.get("dashboard_id")) # type: ignore
|
||||
except (TypeError, ValueError):
|
||||
dashboard_id = None
|
||||
|
||||
if "form_data" in payload:
|
||||
form_data, _ = get_form_data()
|
||||
|
|
@ -89,15 +120,8 @@ class AbstractEventLogger(ABC):
|
|||
if log_to_statsd:
|
||||
self.stats_logger.incr(action)
|
||||
|
||||
payload.update(
|
||||
{
|
||||
"path": request.path,
|
||||
"path_no_param": strip_int_from_path(request.path),
|
||||
"ref": ref,
|
||||
}
|
||||
)
|
||||
# bulk insert
|
||||
try:
|
||||
# bulk insert
|
||||
explode_by = payload.get("explode")
|
||||
records = json.loads(payload.get(explode_by)) # type: ignore
|
||||
except Exception: # pylint: disable=broad-except
|
||||
|
|
@ -114,16 +138,30 @@ class AbstractEventLogger(ABC):
|
|||
)
|
||||
|
||||
def _wrapper(
|
||||
self, f: Callable[..., Any], **wrapper_kwargs: Any
|
||||
self,
|
||||
f: Callable[..., Any],
|
||||
action: Optional[Union[str, Callable[..., str]]] = None,
|
||||
object_ref: Optional[Union[str, Callable[..., str], Literal[False]]] = None,
|
||||
allow_extra_payload: Optional[bool] = False,
|
||||
**wrapper_kwargs: Any,
|
||||
) -> Callable[..., Any]:
|
||||
action_str = wrapper_kwargs.get("action") or f.__name__
|
||||
ref = f.__qualname__ if hasattr(f, "__qualname__") else None
|
||||
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args: Any, **kwargs: Any) -> Any:
|
||||
with self.log_context(action_str, ref, **wrapper_kwargs) as log:
|
||||
value = f(*args, **kwargs)
|
||||
action_str = (
|
||||
action(*args, **kwargs) if callable(action) else action
|
||||
) or f.__name__
|
||||
object_ref_str = (
|
||||
object_ref(*args, **kwargs) if callable(object_ref) else object_ref
|
||||
) or (f.__qualname__ if object_ref is not False else None)
|
||||
with self.log_context(
|
||||
action=action_str, object_ref=object_ref_str, **wrapper_kwargs
|
||||
) as log:
|
||||
log(**kwargs)
|
||||
if allow_extra_payload:
|
||||
# add a payload updater to the decorated function
|
||||
value = f(*args, add_extra_log_payload=log, **kwargs)
|
||||
else:
|
||||
value = f(*args, **kwargs)
|
||||
return value
|
||||
|
||||
return wrapper
|
||||
|
|
@ -140,18 +178,9 @@ class AbstractEventLogger(ABC):
|
|||
|
||||
return func
|
||||
|
||||
def log_manually(self, f: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator that lets the decorated function update its own log payload.

    The decorated function runs inside ``self.log_context``, with the
    function's ``__name__`` as the action, and receives the helper yielded by
    that context as the ``update_log_payload`` keyword argument.

    :param f: the function to decorate; it must accept ``update_log_payload``
    :returns: the wrapping function
    """

    @functools.wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        with self.log_context(f.__name__) as log:
            # `update_log_payload` is passed by keyword, so it should be
            # either the last positional argument or one of the named
            # arguments of the decorated function
            value = f(*args, update_log_payload=log, **kwargs)
        return value

    return wrapper
|
||||
def log_this_with_extra_payload(self, f: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator that adds an `add_extra_log_payload` helper to kwargs.

    Delegates to ``self._wrapper`` with ``allow_extra_payload=True``.

    :param f: the function to decorate; it must accept the
        ``add_extra_log_payload`` keyword argument
    :returns: whatever ``self._wrapper`` returns for ``f``
    """
    return self._wrapper(f, allow_extra_payload=True)
|
||||
|
||||
@property
|
||||
def stats_logger(self) -> BaseStatsLogger:
|
||||
|
|
@ -217,9 +246,8 @@ class DBEventLogger(AbstractEventLogger):
|
|||
) -> None:
|
||||
from superset.models.core import Log
|
||||
|
||||
records = kwargs.get("records", list())
|
||||
|
||||
logs = list()
|
||||
records = kwargs.get("records", [])
|
||||
logs = []
|
||||
for record in records:
|
||||
json_string: Optional[str]
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -49,7 +49,6 @@ get_related_schema = {
|
|||
"filter": {"type": "string"},
|
||||
},
|
||||
}
|
||||
log_context = event_logger.log_context
|
||||
|
||||
|
||||
class RelatedResultResponseSchema(Schema):
|
||||
|
|
@ -310,65 +309,83 @@ class BaseSupersetModelRestApi(ModelRestApi):
|
|||
if time_delta:
|
||||
self.timing_stats("time", key, time_delta)
|
||||
|
||||
@event_logger.log_this_with_context(
|
||||
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.info",
|
||||
object_ref=False,
|
||||
log_to_statsd=False,
|
||||
)
|
||||
def info_headless(self, **kwargs: Any) -> Response:
|
||||
"""
|
||||
Add statsd metrics to builtin FAB _info endpoint
|
||||
"""
|
||||
ref = f"{self.__class__.__name__}.info"
|
||||
with log_context(ref, ref, log_to_statsd=False):
|
||||
duration, response = time_function(super().info_headless, **kwargs)
|
||||
self.send_stats_metrics(response, self.info.__name__, duration)
|
||||
return response
|
||||
duration, response = time_function(super().info_headless, **kwargs)
|
||||
self.send_stats_metrics(response, self.info.__name__, duration)
|
||||
return response
|
||||
|
||||
@event_logger.log_this_with_context(
|
||||
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.get",
|
||||
object_ref=False,
|
||||
log_to_statsd=False,
|
||||
)
|
||||
def get_headless(self, pk: int, **kwargs: Any) -> Response:
|
||||
"""
|
||||
Add statsd metrics to builtin FAB GET endpoint
|
||||
"""
|
||||
ref = f"{self.__class__.__name__}.get"
|
||||
with log_context(ref, ref, log_to_statsd=False):
|
||||
duration, response = time_function(super().get_headless, pk, **kwargs)
|
||||
self.send_stats_metrics(response, self.get.__name__, duration)
|
||||
return response
|
||||
duration, response = time_function(super().get_headless, pk, **kwargs)
|
||||
self.send_stats_metrics(response, self.get.__name__, duration)
|
||||
return response
|
||||
|
||||
@event_logger.log_this_with_context(
|
||||
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.get_list",
|
||||
object_ref=False,
|
||||
log_to_statsd=False,
|
||||
)
|
||||
def get_list_headless(self, **kwargs: Any) -> Response:
|
||||
"""
|
||||
Add statsd metrics to builtin FAB GET list endpoint
|
||||
"""
|
||||
ref = f"{self.__class__.__name__}.get_list"
|
||||
with log_context(ref, ref, log_to_statsd=False):
|
||||
duration, response = time_function(super().get_list_headless, **kwargs)
|
||||
self.send_stats_metrics(response, self.get_list.__name__, duration)
|
||||
return response
|
||||
duration, response = time_function(super().get_list_headless, **kwargs)
|
||||
self.send_stats_metrics(response, self.get_list.__name__, duration)
|
||||
return response
|
||||
|
||||
@event_logger.log_this_with_context(
|
||||
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.post",
|
||||
object_ref=False,
|
||||
log_to_statsd=False,
|
||||
)
|
||||
def post_headless(self) -> Response:
|
||||
"""
|
||||
Add statsd metrics to builtin FAB POST endpoint
|
||||
"""
|
||||
ref = f"{self.__class__.__name__}.post"
|
||||
with log_context(ref, ref, log_to_statsd=False):
|
||||
duration, response = time_function(super().post_headless)
|
||||
self.send_stats_metrics(response, self.post.__name__, duration)
|
||||
return response
|
||||
duration, response = time_function(super().post_headless)
|
||||
self.send_stats_metrics(response, self.post.__name__, duration)
|
||||
return response
|
||||
|
||||
@event_logger.log_this_with_context(
|
||||
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.put",
|
||||
object_ref=False,
|
||||
log_to_statsd=False,
|
||||
)
|
||||
def put_headless(self, pk: int) -> Response:
|
||||
"""
|
||||
Add statsd metrics to builtin FAB PUT endpoint
|
||||
"""
|
||||
ref = f"{self.__class__.__name__}.put"
|
||||
with log_context(ref, ref, log_to_statsd=False):
|
||||
duration, response = time_function(super().put_headless, pk)
|
||||
self.send_stats_metrics(response, self.put.__name__, duration)
|
||||
return response
|
||||
duration, response = time_function(super().put_headless, pk)
|
||||
self.send_stats_metrics(response, self.put.__name__, duration)
|
||||
return response
|
||||
|
||||
@event_logger.log_this_with_context(
|
||||
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.delete",
|
||||
object_ref=False,
|
||||
log_to_statsd=False,
|
||||
)
|
||||
def delete_headless(self, pk: int) -> Response:
|
||||
"""
|
||||
Add statsd metrics to builtin FAB DELETE endpoint
|
||||
"""
|
||||
ref = f"{self.__class__.__name__}.delete"
|
||||
with log_context(ref, ref, log_to_statsd=False):
|
||||
duration, response = time_function(super().delete_headless, pk)
|
||||
self.send_stats_metrics(response, self.delete.__name__, duration)
|
||||
return response
|
||||
duration, response = time_function(super().delete_headless, pk)
|
||||
self.send_stats_metrics(response, self.delete.__name__, duration)
|
||||
return response
|
||||
|
||||
@expose("/related/<column_name>", methods=["GET"])
|
||||
@protect()
|
||||
|
|
|
|||
|
|
@ -1752,13 +1752,13 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
|
|||
|
||||
@has_access
|
||||
@expose("/dashboard/<dashboard_id_or_slug>/")
|
||||
@event_logger.log_manually
|
||||
@event_logger.log_this_with_extra_payload
|
||||
def dashboard( # pylint: disable=too-many-locals
|
||||
self,
|
||||
dashboard_id_or_slug: str,
|
||||
# this parameter is added by `log_manually`,
|
||||
# this parameter is added by `log_this_with_extra_payload`,
|
||||
# set a default value to appease pylint
|
||||
update_log_payload: Callable[..., None] = lambda **kwargs: None,
|
||||
add_extra_log_payload: Callable[..., None] = lambda **kwargs: None,
|
||||
) -> FlaskResponse:
|
||||
"""Server side rendering for a dashboard"""
|
||||
session = db.session()
|
||||
|
|
@ -1807,7 +1807,7 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
|
|||
request.args.get(utils.ReservedUrlParameters.EDIT_MODE.value) == "true"
|
||||
)
|
||||
|
||||
update_log_payload(
|
||||
add_extra_log_payload(
|
||||
dashboard_id=dash.id,
|
||||
dashboard_version="v2",
|
||||
dash_edit_perm=dash_edit_perm,
|
||||
|
|
|
|||
|
|
@ -17,26 +17,23 @@
|
|||
import logging
|
||||
import time
|
||||
import unittest
|
||||
from datetime import datetime
|
||||
from unittest.mock import patch
|
||||
|
||||
from superset.utils.log import (
|
||||
AbstractEventLogger,
|
||||
DBEventLogger,
|
||||
get_event_logger_from_cfg_value,
|
||||
)
|
||||
from superset.utils.log import DBEventLogger, get_event_logger_from_cfg_value
|
||||
from tests.test_app import app
|
||||
|
||||
|
||||
class TestEventLogger(unittest.TestCase):
|
||||
def test_returns_configured_object_if_correct(self):
|
||||
# test that assignment of concrete AbstractBaseClass impl returns unmodified object
|
||||
def test_correct_config_object(self):
|
||||
# test that assignment of concrete AbstractBaseClass impl returns
|
||||
# unmodified object
|
||||
obj = DBEventLogger()
|
||||
res = get_event_logger_from_cfg_value(obj)
|
||||
self.assertTrue(obj is res)
|
||||
self.assertIs(obj, res)
|
||||
|
||||
def test_event_logger_config_class_deprecation(self):
|
||||
# test that assignment of a class object to EVENT_LOGGER is correctly deprecated
|
||||
def test_config_class_deprecation(self):
|
||||
# test that assignment of a class object to EVENT_LOGGER is correctly
|
||||
# deprecated
|
||||
res = None
|
||||
|
||||
# print warning if a class is assigned to EVENT_LOGGER
|
||||
|
|
@ -46,13 +43,14 @@ class TestEventLogger(unittest.TestCase):
|
|||
# class is instantiated and returned
|
||||
self.assertIsInstance(res, DBEventLogger)
|
||||
|
||||
def test_raises_typerror_if_not_abc_impl(self):
|
||||
# test that assignment of non AbstractEventLogger derived type raises TypeError
|
||||
def test_raises_typerror_if_not_abc(self):
|
||||
# test that assignment of non AbstractEventLogger derived type raises
|
||||
# TypeError
|
||||
with self.assertRaises(TypeError):
|
||||
get_event_logger_from_cfg_value(logging.getLogger())
|
||||
|
||||
@patch.object(DBEventLogger, "log")
|
||||
def test_log_this_decorator(self, mock_log):
|
||||
def test_log_this(self, mock_log):
|
||||
logger = DBEventLogger()
|
||||
|
||||
@logger.log_this
|
||||
|
|
@ -60,27 +58,46 @@ class TestEventLogger(unittest.TestCase):
|
|||
time.sleep(0.05)
|
||||
return 1
|
||||
|
||||
with app.test_request_context():
|
||||
with app.test_request_context("/superset/dashboard/1/?myparam=foo"):
|
||||
result = test_func()
|
||||
payload = mock_log.call_args[1]
|
||||
self.assertEqual(result, 1)
|
||||
assert mock_log.call_args[1]["duration_ms"] >= 50
|
||||
self.assertEqual(
|
||||
payload["records"],
|
||||
[
|
||||
{
|
||||
"myparam": "foo",
|
||||
"path": "/superset/dashboard/1/",
|
||||
"url_rule": "/superset/dashboard/<dashboard_id_or_slug>/",
|
||||
"object_ref": test_func.__qualname__,
|
||||
}
|
||||
],
|
||||
)
|
||||
self.assertGreaterEqual(payload["duration_ms"], 50)
|
||||
|
||||
@patch.object(DBEventLogger, "log")
|
||||
def test_log_manually_decorator(self, mock_log):
|
||||
def test_log_this_with_extra_payload(self, mock_log):
|
||||
logger = DBEventLogger()
|
||||
|
||||
@logger.log_manually
|
||||
def test_func(arg1, update_log_payload, karg1=1):
|
||||
@logger.log_this_with_extra_payload
|
||||
def test_func(arg1, add_extra_log_payload, karg1=1):
|
||||
time.sleep(0.1)
|
||||
update_log_payload(foo="bar")
|
||||
add_extra_log_payload(foo="bar")
|
||||
return arg1 * karg1
|
||||
|
||||
with app.test_request_context():
|
||||
result = test_func(1, karg1=2) # pylint: disable=no-value-for-parameter
|
||||
payload = mock_log.call_args[1]
|
||||
self.assertEqual(result, 2)
|
||||
# should contain only manual payload
|
||||
self.assertEqual(
|
||||
mock_log.call_args[1]["records"],
|
||||
[{"foo": "bar", "path": "/", "path_no_param": "/", "ref": None}],
|
||||
payload["records"],
|
||||
[
|
||||
{
|
||||
"foo": "bar",
|
||||
"path": "/",
|
||||
"karg1": 2,
|
||||
"object_ref": test_func.__qualname__,
|
||||
}
|
||||
],
|
||||
)
|
||||
assert mock_log.call_args[1]["duration_ms"] >= 100
|
||||
self.assertGreaterEqual(payload["duration_ms"], 100)
|
||||
|
|
|
|||
Loading…
Reference in New Issue