feat(rbac): add customizable related filters (#22526)

This commit is contained in:
Ville Brofeldt 2023-01-05 16:42:08 +02:00 committed by GitHub
parent b3529479ab
commit 037deb9a1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 102 additions and 4 deletions

View File

@ -44,6 +44,7 @@ from typing import (
Tuple,
Type,
TYPE_CHECKING,
TypedDict,
Union,
)
@ -54,6 +55,7 @@ from dateutil import tz
from flask import Blueprint
from flask_appbuilder.security.manager import AUTH_DB
from pandas._libs.parsers import STR_NA_VALUES # pylint: disable=no-name-in-module
from sqlalchemy.orm.query import Query
from superset.advanced_data_type.plugins.internet_address import internet_address
from superset.advanced_data_type.plugins.internet_port import internet_port
@ -1502,6 +1504,32 @@ ENVIRONMENT_TAG_CONFIG = {
},
}
# Extra related query filters make it possible to limit which objects are shown
# in the UI. For examples, to only show "admin" or users starting with the letter "b" in
# the "Owners" dropdowns, you could add the following in your config:
# def user_filter(query: Query, *args, *kwargs):
# from superset import security_manager
#
# user_model = security_manager.user_model
# filters = [
# user_model.username == "admin",
# user_model.username.ilike("b%"),
# ]
# return query.filter(or_(*filters))
#
# EXTRA_RELATED_QUERY_FILTERS = {"user": user_filter}
#
# Similarly, to restrict the roles in the "Roles" dropdown you can provide a custom
# filter callback for the "role" key.
class ExtraRelatedQueryFilters(TypedDict, total=False):
role: Callable[[Query], Query]
user: Callable[[Query], Query]
EXTRA_RELATED_QUERY_FILTERS: ExtraRelatedQueryFilters = {}
# -------------------------------------------------------------------
# * WARNING: STOP EDITING HERE *
# -------------------------------------------------------------------

View File

@ -95,7 +95,11 @@ from superset.views.base_api import (
requires_json,
statsd_metrics,
)
from superset.views.filters import BaseFilterRelatedUsers, FilterRelatedOwners
from superset.views.filters import (
BaseFilterRelatedRoles,
BaseFilterRelatedUsers,
FilterRelatedOwners,
)
logger = logging.getLogger(__name__)
@ -244,7 +248,9 @@ class DashboardRestApi(BaseSupersetModelRestApi):
base_related_field_filters = {
"owners": [["id", BaseFilterRelatedUsers, lambda: []]],
"created_by": [["id", BaseFilterRelatedUsers, lambda: []]],
"roles": [["id", BaseFilterRelatedRoles, lambda: []]],
}
related_field_filters = {
"owners": RelatedFieldFilter("first_name", FilterRelatedOwners),
"roles": RelatedFieldFilter("name", FilterRelatedRoles),

View File

@ -72,11 +72,35 @@ class BaseFilterRelatedUsers(BaseFilter): # pylint: disable=too-few-public-meth
arg_name = "username"
def apply(self, query: Query, value: Optional[Any]) -> Query:
user_model = security_manager.user_model
if extra_filters := current_app.config["EXTRA_RELATED_QUERY_FILTERS"].get(
"user",
):
query = extra_filters(query)
exclude_users = (
security_manager.get_exclude_users_from_lists()
if current_app.config["EXCLUDE_USERS_FROM_LISTS"] is None
else current_app.config["EXCLUDE_USERS_FROM_LISTS"]
)
query_ = query.filter(and_(user_model.username.not_in(exclude_users)))
return query_
if exclude_users:
user_model = security_manager.user_model
return query.filter(and_(user_model.username.not_in(exclude_users)))
return query
class BaseFilterRelatedRoles(BaseFilter): # pylint: disable=too-few-public-methods
"""
Filter to apply on related roles.
"""
name = lazy_gettext("role")
arg_name = "role"
def apply(self, query: Query, value: Optional[Any]) -> Query:
if extra_filters := current_app.config["EXTRA_RELATED_QUERY_FILTERS"].get(
"role",
):
return extra_filters(query)
return query

View File

@ -219,6 +219,26 @@ class ApiOwnersTestCaseMixin:
for expected_user in expected_users:
assert expected_user in response_users
def test_get_related_owners_with_extra_filters(self):
"""
API: Test get related owners with extra related query filters
"""
self.login(username="admin")
def _base_filter(query):
return query.filter_by(username="alpha")
with patch.dict(
"superset.views.filters.current_app.config",
{"EXTRA_RELATED_QUERY_FILTERS": {"user": _base_filter}},
):
uri = f"api/v1/{self.resource_name}/related/owners"
rv = self.client.get(uri)
assert rv.status_code == 200
response = json.loads(rv.data.decode("utf-8"))
response_users = [result["text"] for result in response["result"]]
assert response_users == ["alpha user"]
def test_get_related_owners_paginated(self):
"""
API: Test get related owners with pagination

View File

@ -1828,6 +1828,26 @@ class TestDashboardApi(SupersetTestCase, ApiOwnersTestCaseMixin, InsertChartMixi
response_roles = [result["text"] for result in response["result"]]
assert "Alpha" in response_roles
def test_get_all_related_roles_with_with_extra_filters(self):
"""
API: Test get filter related roles with extra related query filters
"""
self.login(username="admin")
def _base_filter(query):
return query.filter_by(name="Alpha")
with patch.dict(
"superset.views.filters.current_app.config",
{"EXTRA_RELATED_QUERY_FILTERS": {"role": _base_filter}},
):
uri = f"api/v1/dashboard/related/roles"
rv = self.client.get(uri)
assert rv.status_code == 200
response = json.loads(rv.data.decode("utf-8"))
response_roles = [result["text"] for result in response["result"]]
assert response_roles == ["Alpha"]
@pytest.mark.usefixtures("load_world_bank_dashboard_with_slices")
def test_embedded_dashboards(self):
self.login(username="admin")