[log] New, make action log configurable and generic (#7705)

* [log] New, make action log configurable and generic

* [log] Fix, missing apache license

* [log] Fix, user_id is a required parameter on event logs

* [log] Fix, Rename Action to Event

* [log] Fix, flake8

* [logger] Change all log_this decorators to new abstract one

* [logger] [docs] Simple docs to show how to override the event log

* [style] Fix, single quote to double quote

* [style] Fix, single quote to double quote
This commit is contained in:
Daniel Vaz Gaspar 2019-07-08 17:38:12 +01:00 committed by Maxime Beauchemin
parent f5839c413e
commit 1ab04190cd
6 changed files with 186 additions and 98 deletions

View File

@ -678,6 +678,40 @@ environment variable: ::
*Adapted from http://flask.pocoo.org/snippets/69/*
Event Logging
-------------
Superset by default logs special action event on it's database. These log can be accessed on the UI navigating to
"Security" -> "Action Log". You can freely customize these logs by implementing your own event log class.
Example of a simple JSON to Stdout class::
class JSONStdOutEventLogger(AbstractEventLogger):
def log(self, user_id, action, *args, **kwargs):
records = kwargs.get('records', list())
dashboard_id = kwargs.get('dashboard_id')
slice_id = kwargs.get('slice_id')
duration_ms = kwargs.get('duration_ms')
referrer = kwargs.get('referrer')
for record in records:
log = dict(
action=action,
json=record,
dashboard_id=dashboard_id,
slice_id=slice_id,
duration_ms=duration_ms,
referrer=referrer,
user_id=user_id
)
print(json.dumps(log))
Then on Superset's config reference the class you want to use::
EVENT_LOGGER = JSONStdOutEventLogger
Upgrading
---------

View File

@ -36,6 +36,7 @@ from superset import config
from superset.connectors.connector_registry import ConnectorRegistry
from superset.security import SupersetSecurityManager
from superset.utils.core import pessimistic_connection_handling, setup_cache
from superset.utils.log import DBEventLogger
wtforms_json.init()
@ -217,6 +218,9 @@ results_backend = app.config.get("RESULTS_BACKEND")
_feature_flags = app.config.get("DEFAULT_FEATURE_FLAGS") or {}
_feature_flags.update(app.config.get("FEATURE_FLAGS") or {})
# Event Logger
event_logger = app.config.get("EVENT_LOGGER", DBEventLogger)()
def get_feature_flags():
GET_FEATURE_FLAGS_FUNC = app.config.get("GET_FEATURE_FLAGS_FUNC")

View File

@ -19,7 +19,6 @@
from contextlib import closing
from copy import copy, deepcopy
from datetime import datetime
import functools
import json
import logging
import textwrap
@ -1197,69 +1196,6 @@ class Log(Model):
duration_ms = Column(Integer)
referrer = Column(String(1024))
@classmethod
def log_this(cls, f):
"""Decorator to log user actions"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
user_id = None
if g.user:
user_id = g.user.get_id()
d = request.form.to_dict() or {}
# request parameters can overwrite post body
request_params = request.args.to_dict()
d.update(request_params)
d.update(kwargs)
slice_id = d.get("slice_id")
dashboard_id = d.get("dashboard_id")
try:
slice_id = int(
slice_id or json.loads(d.get("form_data")).get("slice_id")
)
except (ValueError, TypeError):
slice_id = 0
stats_logger.incr(f.__name__)
start_dttm = datetime.now()
value = f(*args, **kwargs)
duration_ms = (datetime.now() - start_dttm).total_seconds() * 1000
# bulk insert
try:
explode_by = d.get("explode")
records = json.loads(d.get(explode_by))
except Exception:
records = [d]
referrer = request.referrer[:1000] if request.referrer else None
logs = []
for record in records:
try:
json_string = json.dumps(record)
except Exception:
json_string = None
log = cls(
action=f.__name__,
json=json_string,
dashboard_id=dashboard_id,
slice_id=slice_id,
duration_ms=duration_ms,
referrer=referrer,
user_id=user_id,
)
logs.append(log)
sesh = db.session()
sesh.bulk_save_objects(logs)
sesh.commit()
return value
return wrapper
class FavStar(Model):
__tablename__ = "favstar"

115
superset/utils/log.py Normal file
View File

@ -0,0 +1,115 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=C,R,W
from abc import ABC, abstractmethod
from datetime import datetime
import functools
import json
from flask import current_app, g, request
class AbstractEventLogger(ABC):
@abstractmethod
def log(self, user_id, action, *args, **kwargs):
pass
def log_this(self, f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
user_id = None
if g.user:
user_id = g.user.get_id()
d = request.form.to_dict() or {}
# request parameters can overwrite post body
request_params = request.args.to_dict()
d.update(request_params)
d.update(kwargs)
slice_id = d.get("slice_id")
dashboard_id = d.get("dashboard_id")
try:
slice_id = int(
slice_id or json.loads(d.get("form_data")).get("slice_id")
)
except (ValueError, TypeError):
slice_id = 0
self.stats_logger.incr(f.__name__)
start_dttm = datetime.now()
value = f(*args, **kwargs)
duration_ms = (datetime.now() - start_dttm).total_seconds() * 1000
# bulk insert
try:
explode_by = d.get("explode")
records = json.loads(d.get(explode_by))
except Exception:
records = [d]
referrer = request.referrer[:1000] if request.referrer else None
self.log(
user_id,
f.__name__,
records=records,
dashboard_id=dashboard_id,
slice_id=slice_id,
duration_ms=duration_ms,
referrer=referrer,
)
return value
return wrapper
@property
def stats_logger(self):
return current_app.config.get("STATS_LOGGER")
class DBEventLogger(AbstractEventLogger):
def log(self, user_id, action, *args, **kwargs):
from superset.models.core import Log
records = kwargs.get("records", list())
dashboard_id = kwargs.get("dashboard_id")
slice_id = kwargs.get("slice_id")
duration_ms = kwargs.get("duration_ms")
referrer = kwargs.get("referrer")
logs = list()
for record in records:
try:
json_string = json.dumps(record)
except Exception:
json_string = None
log = Log(
action=action,
json=json_string,
dashboard_id=dashboard_id,
slice_id=slice_id,
duration_ms=duration_ms,
referrer=referrer,
user_id=user_id,
)
logs.append(log)
sesh = current_app.appbuilder.get_session
sesh.bulk_save_objects(logs)
sesh.commit()

View File

@ -20,17 +20,16 @@ from flask_appbuilder import expose
from flask_appbuilder.security.decorators import has_access_api
import simplejson as json
from superset import appbuilder, db, security_manager
from superset import appbuilder, db, event_logger, security_manager
from superset.common.query_context import QueryContext
from superset.legacy import update_time_range
import superset.models.core as models
from superset.models.core import Log
from superset.utils import core as utils
from .base import api, BaseSupersetView, handle_api_exception
class Api(BaseSupersetView):
@Log.log_this
@event_logger.log_this
@api
@handle_api_exception
@has_access_api
@ -48,7 +47,7 @@ class Api(BaseSupersetView):
payload_json, default=utils.json_int_dttm_ser, ignore_nan=True
)
@Log.log_this
@event_logger.log_this
@api
@handle_api_exception
@has_access_api

View File

@ -56,6 +56,7 @@ from superset import (
cache,
conf,
db,
event_logger,
get_feature_flags,
results_backend,
security_manager,
@ -108,7 +109,6 @@ from .utils import (
config = app.config
CACHE_DEFAULT_TIMEOUT = config.get("CACHE_DEFAULT_TIMEOUT", 0)
stats_logger = config.get("STATS_LOGGER")
log_this = models.Log.log_this
DAR = models.DatasourceAccessRequest
QueryStatus = utils.QueryStatus
@ -838,7 +838,7 @@ class DashboardModelView(SupersetModelView, DeleteMixin): # noqa
ids = "".join("&id={}".format(d.id) for d in items)
return redirect("/dashboard/export_dashboards_form?{}".format(ids[1:]))
@log_this
@event_logger.log_this
@has_access
@expose("/export_dashboards_form")
def download_dashboards(self):
@ -953,7 +953,7 @@ class KV(BaseSupersetView):
"""Used for storing and retrieving key value pairs"""
@log_this
@event_logger.log_this
@has_access_api
@expose("/store/", methods=["POST"])
def store(self):
@ -966,7 +966,7 @@ class KV(BaseSupersetView):
return json_error_response(e)
return Response(json.dumps({"id": obj.id}), status=200)
@log_this
@event_logger.log_this
@has_access_api
@expose("/<key_id>/", methods=["GET"])
def get_value(self, key_id):
@ -985,7 +985,7 @@ class R(BaseSupersetView):
"""used for short urls"""
@log_this
@event_logger.log_this
@expose("/<url_id>")
def index(self, url_id):
url = db.session.query(models.Url).filter_by(id=url_id).first()
@ -1000,7 +1000,7 @@ class R(BaseSupersetView):
flash("URL to nowhere...", "danger")
return redirect("/")
@log_this
@event_logger.log_this
@has_access_api
@expose("/shortner/", methods=["POST"])
def shortner(self):
@ -1082,7 +1082,7 @@ class Superset(BaseSupersetView):
{"granted": granted_perms, "requested": list(db_ds_names)}, status=201
)
@log_this
@event_logger.log_this
@has_access
@expose("/request_access/")
def request_access(self):
@ -1127,7 +1127,7 @@ class Superset(BaseSupersetView):
datasource_names=", ".join([o.name for o in datasources]),
)
@log_this
@event_logger.log_this
@has_access
@expose("/approve")
def approve(self):
@ -1365,7 +1365,7 @@ class Superset(BaseSupersetView):
payload = viz_obj.get_payload()
return data_payload_response(*viz_obj.payload_json_and_has_error(payload))
@log_this
@event_logger.log_this
@api
@has_access_api
@expose("/slice_json/<slice_id>")
@ -1382,7 +1382,7 @@ class Superset(BaseSupersetView):
)
return self.generate_json(viz_obj)
@log_this
@event_logger.log_this
@api
@has_access_api
@expose("/annotation_json/<layer_id>")
@ -1395,7 +1395,7 @@ class Superset(BaseSupersetView):
payload = viz_obj.get_payload()
return data_payload_response(*viz_obj.payload_json_and_has_error(payload))
@log_this
@event_logger.log_this
@api
@has_access_api
@handle_api_exception
@ -1438,7 +1438,7 @@ class Superset(BaseSupersetView):
viz_obj, csv=csv, query=query, results=results, samples=samples
)
@log_this
@event_logger.log_this
@has_access
@expose("/import_dashboards", methods=["GET", "POST"])
def import_dashboards(self):
@ -1468,7 +1468,7 @@ class Superset(BaseSupersetView):
return redirect("/dashboard/list/")
return self.render_template("superset/import_dashboards.html")
@log_this
@event_logger.log_this
@has_access
@expose("/explorev2/<datasource_type>/<datasource_id>/")
def explorev2(self, datasource_type, datasource_id):
@ -1482,7 +1482,7 @@ class Superset(BaseSupersetView):
)
)
@log_this
@event_logger.log_this
@has_access
@expose("/explore/<datasource_type>/<datasource_id>/", methods=["GET", "POST"])
@expose("/explore/", methods=["GET", "POST"])
@ -2448,7 +2448,7 @@ class Superset(BaseSupersetView):
edit_mode = request.args.get("edit") == "true"
# Hack to log the dashboard_id properly, even when getting a slug
@log_this
@event_logger.log_this
def dashboard(**kwargs): # noqa
pass
@ -2491,14 +2491,14 @@ class Superset(BaseSupersetView):
)
@api
@log_this
@event_logger.log_this
@expose("/log/", methods=["POST"])
def log(self):
return Response(status=200)
@has_access
@expose("/sync_druid/", methods=["POST"])
@log_this
@event_logger.log_this
def sync_druid_source(self):
"""Syncs the druid datasource in main db with the provided config.
@ -2554,7 +2554,7 @@ class Superset(BaseSupersetView):
@has_access
@expose("/sqllab_viz/", methods=["POST"])
@log_this
@event_logger.log_this
def sqllab_viz(self):
SqlaTable = ConnectorRegistry.sources["table"]
data = json.loads(request.form.get("data"))
@ -2591,7 +2591,7 @@ class Superset(BaseSupersetView):
@has_access
@expose("/table/<database_id>/<table_name>/<schema>/")
@log_this
@event_logger.log_this
def table(self, database_id, table_name, schema):
schema = utils.parse_js_uri_path_item(schema, eval_undefined=True)
table_name = utils.parse_js_uri_path_item(table_name)
@ -2655,7 +2655,7 @@ class Superset(BaseSupersetView):
@has_access
@expose("/extra_table_metadata/<database_id>/<table_name>/<schema>/")
@log_this
@event_logger.log_this
def extra_table_metadata(self, database_id, table_name, schema):
schema = utils.parse_js_uri_path_item(schema, eval_undefined=True)
table_name = utils.parse_js_uri_path_item(table_name)
@ -2666,7 +2666,7 @@ class Superset(BaseSupersetView):
@has_access
@expose("/select_star/<database_id>/<table_name>")
@expose("/select_star/<database_id>/<table_name>/<schema>")
@log_this
@event_logger.log_this
def select_star(self, database_id, table_name, schema=None):
mydb = db.session.query(models.Database).filter_by(id=database_id).first()
schema = utils.parse_js_uri_path_item(schema, eval_undefined=True)
@ -2681,7 +2681,7 @@ class Superset(BaseSupersetView):
@has_access_api
@expose("/cached_key/<key>/")
@log_this
@event_logger.log_this
def cached_key(self, key):
"""Returns a key from the cache"""
resp = cache.get(key)
@ -2691,7 +2691,7 @@ class Superset(BaseSupersetView):
@has_access_api
@expose("/cache_key_exist/<key>/")
@log_this
@event_logger.log_this
def cache_key_exist(self, key):
"""Returns if a key from cache exist"""
key_exist = True if cache.get(key) else False
@ -2700,7 +2700,7 @@ class Superset(BaseSupersetView):
@has_access_api
@expose("/results/<key>/")
@log_this
@event_logger.log_this
def results(self, key):
"""Serves a key off of the results backend"""
if not results_backend:
@ -2743,7 +2743,7 @@ class Superset(BaseSupersetView):
@has_access_api
@expose("/stop_query/", methods=["POST"])
@log_this
@event_logger.log_this
def stop_query(self):
client_id = request.form.get("client_id")
try:
@ -2756,7 +2756,7 @@ class Superset(BaseSupersetView):
@has_access_api
@expose("/validate_sql_json/", methods=["POST", "GET"])
@log_this
@event_logger.log_this
def validate_sql_json(self):
"""Validates that arbitrary sql is acceptable for the given database.
Returns a list of error/warning annotations as json.
@ -2819,7 +2819,7 @@ class Superset(BaseSupersetView):
@has_access_api
@expose("/sql_json/", methods=["POST", "GET"])
@log_this
@event_logger.log_this
def sql_json(self):
"""Runs arbitrary sql and returns and json"""
async_ = request.form.get("runAsync") == "true"
@ -2959,7 +2959,7 @@ class Superset(BaseSupersetView):
@has_access
@expose("/csv/<client_id>")
@log_this
@event_logger.log_this
def csv(self, client_id):
"""Download the query results as csv."""
logging.info("Exporting CSV file [{}]".format(client_id))
@ -3006,7 +3006,7 @@ class Superset(BaseSupersetView):
@handle_api_exception
@has_access
@expose("/fetch_datasource_metadata")
@log_this
@event_logger.log_this
def fetch_datasource_metadata(self):
datasource_id, datasource_type = request.args.get("datasourceKey").split("__")
datasource = ConnectorRegistry.get_datasource(
@ -3048,7 +3048,7 @@ class Superset(BaseSupersetView):
@has_access
@expose("/search_queries")
@log_this
@event_logger.log_this
def search_queries(self) -> Response:
"""
Search for previously run sqllab queries. Used for Sqllab Query Search