feat(dashboard_rbac): dashboard_view access enforcement (#12875)
* test: dashboard_view_test failing * test: tests works first time * fix: pre-commit and some refactoring * fix: after CR * fix: replace not_published with draft * fix: after CR * fix: pre-commit fixes * fix: pre-commit and lint fixes * fix: remove unused * fix: remove unused import * fix: wrap the decorator to not block others * chore: reuse dashboard from decorator into function
This commit is contained in:
parent
9982fdebd8
commit
b472d1841c
|
|
@ -75,3 +75,7 @@ class DashboardForbiddenError(ForbiddenError):
|
|||
|
||||
class DashboardImportError(ImportFailedError):
|
||||
message = _("Import dashboard failed for an unknown reason")
|
||||
|
||||
|
||||
class DashboardAccessDeniedError(ForbiddenError):
|
||||
message = _("You don't have access to this dashboard.")
|
||||
|
|
|
|||
|
|
@ -14,12 +14,15 @@
|
|||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from functools import partial
|
||||
from typing import Any, Callable, Dict, List, Set, Union
|
||||
|
||||
import sqlalchemy as sqla
|
||||
from flask import g
|
||||
from flask_appbuilder import Model
|
||||
from flask_appbuilder.models.decorators import renders
|
||||
from flask_appbuilder.security.sqla.models import User
|
||||
|
|
@ -45,6 +48,7 @@ from superset import app, ConnectorRegistry, db, is_feature_enabled, security_ma
|
|||
from superset.connectors.base.models import BaseDatasource
|
||||
from superset.connectors.druid.models import DruidColumn, DruidMetric
|
||||
from superset.connectors.sqla.models import SqlMetric, TableColumn
|
||||
from superset.dashboards.commands.exceptions import DashboardAccessDeniedError
|
||||
from superset.extensions import cache_manager
|
||||
from superset.models.helpers import AuditMixinNullable, ImportExportMixin
|
||||
from superset.models.slice import Slice
|
||||
|
|
@ -356,6 +360,17 @@ class Dashboard( # pylint: disable=too-many-instance-attributes
|
|||
indent=4,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get(cls, id_or_slug: str) -> Dashboard:
|
||||
session = db.session()
|
||||
qry = session.query(Dashboard)
|
||||
if id_or_slug.isdigit():
|
||||
qry = qry.filter_by(id=int(id_or_slug))
|
||||
else:
|
||||
qry = qry.filter_by(slug=id_or_slug)
|
||||
|
||||
return qry.one_or_none()
|
||||
|
||||
|
||||
OnDashboardChange = Callable[[Mapper, Connection, Dashboard], Any]
|
||||
|
||||
|
|
@ -407,3 +422,22 @@ if is_feature_enabled("DASHBOARD_CACHE"):
|
|||
sqla.event.listen(TableColumn, "after_update", clear_dashboard_cache)
|
||||
sqla.event.listen(DruidMetric, "after_update", clear_dashboard_cache)
|
||||
sqla.event.listen(DruidColumn, "after_update", clear_dashboard_cache)
|
||||
|
||||
|
||||
def raise_for_dashboard_access(dashboard: Dashboard) -> None:
|
||||
from superset.views.base import get_user_roles, is_user_admin
|
||||
from superset.views.utils import is_owner
|
||||
|
||||
if is_feature_enabled("DASHBOARD_RBAC"):
|
||||
has_rbac_access = any(
|
||||
dashboard_role.id in [user_role.id for user_role in get_user_roles()]
|
||||
for dashboard_role in dashboard.roles
|
||||
)
|
||||
can_access = (
|
||||
is_user_admin()
|
||||
or is_owner(dashboard, g.user)
|
||||
or (dashboard.published and has_rbac_access)
|
||||
)
|
||||
|
||||
if not can_access:
|
||||
raise DashboardAccessDeniedError()
|
||||
|
|
|
|||
|
|
@ -15,11 +15,16 @@
|
|||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import time
|
||||
from functools import wraps
|
||||
from typing import Any, Callable, Dict, Iterator, Union
|
||||
|
||||
from contextlib2 import contextmanager
|
||||
from flask import Response
|
||||
|
||||
from superset import is_feature_enabled
|
||||
from superset.dashboards.commands.exceptions import DashboardAccessDeniedError
|
||||
from superset.stats_logger import BaseStatsLogger
|
||||
from superset.utils import core as utils
|
||||
from superset.utils.dates import now_as_float
|
||||
|
||||
|
||||
|
|
@ -69,3 +74,35 @@ def debounce(duration: Union[float, int] = 0.1) -> Callable[..., Any]:
|
|||
return wrapped
|
||||
|
||||
return decorate
|
||||
|
||||
|
||||
def on_security_exception(self: Any, ex: Exception) -> Response:
|
||||
return self.response(403, **{"message": utils.error_msg_from_exception(ex)})
|
||||
|
||||
|
||||
# noinspection PyPackageRequirements
|
||||
def check_dashboard_access(
|
||||
on_error: Callable[..., Any] = on_security_exception
|
||||
) -> Callable[..., Any]:
|
||||
def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
|
||||
@wraps(f)
|
||||
def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
|
||||
from superset.models.dashboard import (
|
||||
Dashboard,
|
||||
raise_for_dashboard_access,
|
||||
)
|
||||
|
||||
dashboard = Dashboard.get(str(kwargs["dashboard_id_or_slug"]))
|
||||
if is_feature_enabled("DASHBOARD_RBAC"):
|
||||
try:
|
||||
raise_for_dashboard_access(dashboard)
|
||||
except DashboardAccessDeniedError as ex:
|
||||
return on_error(self, ex)
|
||||
except Exception as exception:
|
||||
raise exception
|
||||
|
||||
return f(self, *args, dashboard=dashboard, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
|
|
|||
|
|
@ -104,6 +104,7 @@ from superset.utils import core as utils
|
|||
from superset.utils.async_query_manager import AsyncQueryTokenException
|
||||
from superset.utils.cache import etag_cache
|
||||
from superset.utils.dates import now_as_float
|
||||
from superset.utils.decorators import check_dashboard_access
|
||||
from superset.views.base import (
|
||||
api,
|
||||
BaseSupersetView,
|
||||
|
|
@ -160,7 +161,6 @@ DATABASE_KEYS = [
|
|||
"id",
|
||||
]
|
||||
|
||||
|
||||
DATASOURCE_MISSING_ERR = __("The data source seems to have been deleted")
|
||||
USER_MISSING_ERR = __("The user seems to have been deleted")
|
||||
PARAMETER_MISSING_ERR = (
|
||||
|
|
@ -867,7 +867,8 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
|
|||
Those should not be saved when saving the chart"""
|
||||
return [f for f in filters if not f.get("isExtra")]
|
||||
|
||||
def save_or_overwrite_slice( # pylint: disable=too-many-arguments,too-many-locals,no-self-use
|
||||
def save_or_overwrite_slice(
|
||||
# pylint: disable=too-many-arguments,too-many-locals,no-self-use
|
||||
self,
|
||||
slc: Optional[Slice],
|
||||
slice_add_perm: bool,
|
||||
|
|
@ -1785,33 +1786,34 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
|
|||
@has_access
|
||||
@expose("/dashboard/<dashboard_id_or_slug>/")
|
||||
@event_logger.log_this_with_extra_payload
|
||||
@check_dashboard_access(
|
||||
on_error=lambda self, ex: Response(
|
||||
utils.error_msg_from_exception(ex), status=403
|
||||
)
|
||||
)
|
||||
def dashboard( # pylint: disable=too-many-locals
|
||||
self,
|
||||
dashboard_id_or_slug: str,
|
||||
# this parameter is added by `log_this_with_manual_updates`,
|
||||
# set a default value to appease pylint
|
||||
self, # pylint: disable=no-self-use
|
||||
dashboard_id_or_slug: str, # pylint: disable=unused-argument
|
||||
add_extra_log_payload: Callable[..., None] = lambda **kwargs: None,
|
||||
dashboard: Optional[Dashboard] = None,
|
||||
) -> FlaskResponse:
|
||||
"""Server side rendering for a dashboard"""
|
||||
session = db.session()
|
||||
qry = session.query(Dashboard)
|
||||
if dashboard_id_or_slug.isdigit():
|
||||
qry = qry.filter_by(id=int(dashboard_id_or_slug))
|
||||
else:
|
||||
qry = qry.filter_by(slug=dashboard_id_or_slug)
|
||||
|
||||
dash = qry.one_or_none()
|
||||
if not dash:
|
||||
"""
|
||||
Server side rendering for a dashboard
|
||||
:param dashboard_id_or_slug: identifier for dashboard. used in the decorators
|
||||
:param add_extra_log_payload: added by `log_this_with_manual_updates`, set a default value to appease pylint
|
||||
:param dashboard: added by `check_dashboard_access`
|
||||
"""
|
||||
if not dashboard:
|
||||
abort(404)
|
||||
|
||||
data = dash.full_data()
|
||||
data = dashboard.full_data()
|
||||
|
||||
if config["ENABLE_ACCESS_REQUEST"]:
|
||||
for datasource in data["datasources"].values():
|
||||
datasource = ConnectorRegistry.get_datasource(
|
||||
datasource_type=datasource["type"],
|
||||
datasource_id=datasource["id"],
|
||||
session=session,
|
||||
session=db.session(),
|
||||
)
|
||||
if datasource and not security_manager.can_access_datasource(
|
||||
datasource=datasource
|
||||
|
|
@ -1822,10 +1824,12 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
|
|||
),
|
||||
"danger",
|
||||
)
|
||||
return redirect(f"/superset/request_access/?dashboard_id={dash.id}")
|
||||
return redirect(
|
||||
f"/superset/request_access/?dashboard_id={dashboard.id}"
|
||||
)
|
||||
|
||||
dash_edit_perm = check_ownership(
|
||||
dash, raise_if_false=False
|
||||
dashboard, raise_if_false=False
|
||||
) and security_manager.can_access("can_save_dash", "Superset")
|
||||
dash_save_perm = security_manager.can_access("can_save_dash", "Superset")
|
||||
superset_can_explore = security_manager.can_access("can_explore", "Superset")
|
||||
|
|
@ -1840,7 +1844,7 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
|
|||
)
|
||||
|
||||
add_extra_log_payload(
|
||||
dashboard_id=dash.id,
|
||||
dashboard_id=dashboard.id,
|
||||
dashboard_version="v2",
|
||||
dash_edit_perm=dash_edit_perm,
|
||||
edit_mode=edit_mode,
|
||||
|
|
@ -1885,8 +1889,8 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
|
|||
"superset/dashboard.html",
|
||||
entry="dashboard",
|
||||
standalone_mode=standalone_mode,
|
||||
title=dash.dashboard_title,
|
||||
custom_css=dash.css,
|
||||
title=dashboard.dashboard_title,
|
||||
custom_css=dashboard.css,
|
||||
bootstrap_data=json.dumps(
|
||||
bootstrap_data, default=utils.pessimistic_json_iso_dttm_ser
|
||||
),
|
||||
|
|
@ -2238,7 +2242,8 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
|
|||
@has_access_api
|
||||
@event_logger.log_this
|
||||
@expose("/validate_sql_json/", methods=["POST", "GET"])
|
||||
def validate_sql_json( # pylint: disable=too-many-locals,too-many-return-statements,no-self-use
|
||||
def validate_sql_json(
|
||||
# pylint: disable=too-many-locals,too-many-return-statements,no-self-use
|
||||
self,
|
||||
) -> FlaskResponse:
|
||||
"""Validates that arbitrary sql is acceptable for the given database.
|
||||
|
|
|
|||
|
|
@ -160,7 +160,7 @@ class TestDashboardDatasetSecurity(DashboardTestCase):
|
|||
# arrange
|
||||
admin_user = security_manager.find_user(ADMIN_USERNAME)
|
||||
gamma_user = security_manager.find_user(GAMMA_USERNAME)
|
||||
admin_and_not_published_dashboard = create_dashboard_to_db(
|
||||
admin_and_draft_dashboard = create_dashboard_to_db(
|
||||
dashboard_title="admin_owned_unpublished_dash", owners=[admin_user]
|
||||
)
|
||||
|
||||
|
|
@ -171,7 +171,7 @@ class TestDashboardDatasetSecurity(DashboardTestCase):
|
|||
|
||||
# assert
|
||||
self.assertNotIn(
|
||||
admin_and_not_published_dashboard.url, get_dashboards_response_as_gamma
|
||||
admin_and_draft_dashboard.url, get_dashboards_response_as_gamma
|
||||
)
|
||||
|
||||
@pytest.mark.usefixtures("load_energy_table_with_slice", "load_dashboard")
|
||||
|
|
|
|||
|
|
@ -31,6 +31,136 @@ from tests.dashboards.superset_factory_util import (
|
|||
"superset.extensions.feature_flag_manager._feature_flags", DASHBOARD_RBAC=True,
|
||||
)
|
||||
class TestDashboardRoleBasedSecurity(BaseTestDashboardSecurity):
|
||||
def test_get_dashboard_view__admin_can_access(self):
|
||||
# arrange
|
||||
dashboard_to_access = create_dashboard_to_db(
|
||||
owners=[], slices=[create_slice_to_db()], published=False
|
||||
)
|
||||
self.login("admin")
|
||||
|
||||
# act
|
||||
response = self.get_dashboard_view_response(dashboard_to_access)
|
||||
|
||||
# assert
|
||||
self.assert_dashboard_view_response(response, dashboard_to_access)
|
||||
|
||||
def test_get_dashboard_view__owner_can_access(self):
|
||||
# arrange
|
||||
username = random_str()
|
||||
new_role = f"role_{random_str()}"
|
||||
owner = self.create_user_with_roles(
|
||||
username, [new_role], should_create_roles=True
|
||||
)
|
||||
dashboard_to_access = create_dashboard_to_db(
|
||||
owners=[owner], slices=[create_slice_to_db()], published=False
|
||||
)
|
||||
self.login(username)
|
||||
|
||||
# act
|
||||
response = self.get_dashboard_view_response(dashboard_to_access)
|
||||
|
||||
# assert
|
||||
self.assert_dashboard_view_response(response, dashboard_to_access)
|
||||
|
||||
def test_get_dashboard_view__user_can_not_access_without_permission(self):
|
||||
username = random_str()
|
||||
new_role = f"role_{random_str()}"
|
||||
self.create_user_with_roles(username, [new_role], should_create_roles=True)
|
||||
dashboard_to_access = create_dashboard_to_db(published=True)
|
||||
self.login(username)
|
||||
|
||||
# act
|
||||
response = self.get_dashboard_view_response(dashboard_to_access)
|
||||
|
||||
# assert
|
||||
self.assert403(response)
|
||||
|
||||
def test_get_dashboard_view__user_with_dashboard_permission_can_not_access_draft(
|
||||
self,
|
||||
):
|
||||
# arrange
|
||||
dashboard_to_access = create_dashboard_to_db(published=False)
|
||||
username = random_str()
|
||||
new_role = f"role_{random_str()}"
|
||||
self.create_user_with_roles(username, [new_role], should_create_roles=True)
|
||||
grant_access_to_dashboard(dashboard_to_access, new_role)
|
||||
self.login(username)
|
||||
|
||||
# act
|
||||
response = self.get_dashboard_view_response(dashboard_to_access)
|
||||
|
||||
# assert
|
||||
self.assert403(response)
|
||||
|
||||
# post
|
||||
revoke_access_to_dashboard(dashboard_to_access, new_role)
|
||||
|
||||
def test_get_dashboard_view__user_access_with_dashboard_permission(self):
|
||||
# arrange
|
||||
|
||||
username = random_str()
|
||||
new_role = f"role_{random_str()}"
|
||||
self.create_user_with_roles(username, [new_role], should_create_roles=True)
|
||||
|
||||
dashboard_to_access = create_dashboard_to_db(
|
||||
published=True, slices=[create_slice_to_db()]
|
||||
)
|
||||
self.login(username)
|
||||
grant_access_to_dashboard(dashboard_to_access, new_role)
|
||||
|
||||
# act
|
||||
response = self.get_dashboard_view_response(dashboard_to_access)
|
||||
|
||||
# assert
|
||||
self.assert_dashboard_view_response(response, dashboard_to_access)
|
||||
|
||||
# post
|
||||
revoke_access_to_dashboard(dashboard_to_access, new_role)
|
||||
|
||||
def test_get_dashboard_view__public_user_can_not_access_without_permission(self):
|
||||
dashboard_to_access = create_dashboard_to_db(published=True)
|
||||
self.logout()
|
||||
|
||||
# act
|
||||
response = self.get_dashboard_view_response(dashboard_to_access)
|
||||
|
||||
# assert
|
||||
self.assert403(response)
|
||||
|
||||
def test_get_dashboard_view__public_user_with_dashboard_permission_can_not_access_draft(
|
||||
self,
|
||||
):
|
||||
# arrange
|
||||
dashboard_to_access = create_dashboard_to_db(published=False)
|
||||
grant_access_to_dashboard(dashboard_to_access, "Public")
|
||||
self.logout()
|
||||
# act
|
||||
response = self.get_dashboard_view_response(dashboard_to_access)
|
||||
|
||||
# assert
|
||||
self.assert403(response)
|
||||
|
||||
# post
|
||||
revoke_access_to_dashboard(dashboard_to_access, "Public")
|
||||
|
||||
def test_get_dashboard_view__public_user_access_with_dashboard_permission(self):
|
||||
# arrange
|
||||
dashboard_to_access = create_dashboard_to_db(
|
||||
published=True, slices=[create_slice_to_db()]
|
||||
)
|
||||
grant_access_to_dashboard(dashboard_to_access, "Public")
|
||||
|
||||
self.logout()
|
||||
|
||||
# act
|
||||
response = self.get_dashboard_view_response(dashboard_to_access)
|
||||
|
||||
# assert
|
||||
self.assert_dashboard_view_response(response, dashboard_to_access)
|
||||
|
||||
# post
|
||||
revoke_access_to_dashboard(dashboard_to_access, "Public")
|
||||
|
||||
def test_get_dashboards_list__admin_get_all_dashboards(self):
|
||||
# arrange
|
||||
create_dashboard_to_db(
|
||||
|
|
@ -48,6 +178,20 @@ class TestDashboardRoleBasedSecurity(BaseTestDashboardSecurity):
|
|||
|
||||
def test_get_dashboards_list__owner_get_all_owned_dashboards(self):
|
||||
# arrange
|
||||
(
|
||||
not_owned_dashboards,
|
||||
owned_dashboards,
|
||||
) = self._create_sample_dashboards_with_owner_access()
|
||||
|
||||
# act
|
||||
response = self.get_dashboards_list_response()
|
||||
|
||||
# assert
|
||||
self.assert_dashboards_list_view_response(
|
||||
response, 2, owned_dashboards, not_owned_dashboards
|
||||
)
|
||||
|
||||
def _create_sample_dashboards_with_owner_access(self):
|
||||
username = random_str()
|
||||
new_role = f"role_{random_str()}"
|
||||
owner = self.create_user_with_roles(
|
||||
|
|
@ -67,16 +211,8 @@ class TestDashboardRoleBasedSecurity(BaseTestDashboardSecurity):
|
|||
slices=[create_slice_to_db(datasource_id=table.id)], published=True
|
||||
)
|
||||
]
|
||||
|
||||
self.login(username)
|
||||
|
||||
# act
|
||||
response = self.get_dashboards_list_response()
|
||||
|
||||
# assert
|
||||
self.assert_dashboards_list_view_response(
|
||||
response, 2, owned_dashboards, not_owned_dashboards
|
||||
)
|
||||
return not_owned_dashboards, owned_dashboards
|
||||
|
||||
def test_get_dashboards_list__user_without_any_permissions_get_empty_list(self):
|
||||
|
||||
|
|
@ -96,39 +232,41 @@ class TestDashboardRoleBasedSecurity(BaseTestDashboardSecurity):
|
|||
|
||||
def test_get_dashboards_list__user_get_only_published_permitted_dashboards(self):
|
||||
# arrange
|
||||
username = random_str()
|
||||
new_role = f"role_{random_str()}"
|
||||
self.create_user_with_roles(username, [new_role], should_create_roles=True)
|
||||
|
||||
published_dashboards = [
|
||||
create_dashboard_to_db(published=True),
|
||||
create_dashboard_to_db(published=True),
|
||||
]
|
||||
not_published_dashboards = [
|
||||
create_dashboard_to_db(published=False),
|
||||
create_dashboard_to_db(published=False),
|
||||
]
|
||||
|
||||
for dash in published_dashboards + not_published_dashboards:
|
||||
grant_access_to_dashboard(dash, new_role)
|
||||
|
||||
self.login(username)
|
||||
(
|
||||
new_role,
|
||||
draft_dashboards,
|
||||
published_dashboards,
|
||||
) = self._create_sample_only_published_dashboard_with_roles()
|
||||
|
||||
# act
|
||||
response = self.get_dashboards_list_response()
|
||||
|
||||
# assert
|
||||
self.assert_dashboards_list_view_response(
|
||||
response,
|
||||
len(published_dashboards),
|
||||
published_dashboards,
|
||||
not_published_dashboards,
|
||||
response, len(published_dashboards), published_dashboards, draft_dashboards,
|
||||
)
|
||||
|
||||
# post
|
||||
for dash in published_dashboards + not_published_dashboards:
|
||||
for dash in published_dashboards + draft_dashboards:
|
||||
revoke_access_to_dashboard(dash, new_role)
|
||||
|
||||
def _create_sample_only_published_dashboard_with_roles(self):
|
||||
username = random_str()
|
||||
new_role = f"role_{random_str()}"
|
||||
self.create_user_with_roles(username, [new_role], should_create_roles=True)
|
||||
published_dashboards = [
|
||||
create_dashboard_to_db(published=True),
|
||||
create_dashboard_to_db(published=True),
|
||||
]
|
||||
draft_dashboards = [
|
||||
create_dashboard_to_db(published=False),
|
||||
create_dashboard_to_db(published=False),
|
||||
]
|
||||
for dash in published_dashboards + draft_dashboards:
|
||||
grant_access_to_dashboard(dash, new_role)
|
||||
self.login(username)
|
||||
return new_role, draft_dashboards, published_dashboards
|
||||
|
||||
def test_get_dashboards_list__public_user_without_any_permissions_get_empty_list(
|
||||
self,
|
||||
):
|
||||
|
|
@ -148,27 +286,26 @@ class TestDashboardRoleBasedSecurity(BaseTestDashboardSecurity):
|
|||
create_dashboard_to_db(published=True),
|
||||
create_dashboard_to_db(published=True),
|
||||
]
|
||||
not_published_dashboards = [
|
||||
draft_dashboards = [
|
||||
create_dashboard_to_db(published=False),
|
||||
create_dashboard_to_db(published=False),
|
||||
]
|
||||
|
||||
for dash in published_dashboards + not_published_dashboards:
|
||||
for dash in published_dashboards + draft_dashboards:
|
||||
grant_access_to_dashboard(dash, "Public")
|
||||
|
||||
self.logout()
|
||||
|
||||
# act
|
||||
response = self.get_dashboards_list_response()
|
||||
|
||||
# assert
|
||||
self.assert_dashboards_list_view_response(
|
||||
response,
|
||||
len(published_dashboards),
|
||||
published_dashboards,
|
||||
not_published_dashboards,
|
||||
response, len(published_dashboards), published_dashboards, draft_dashboards,
|
||||
)
|
||||
|
||||
# post
|
||||
for dash in published_dashboards + not_published_dashboards:
|
||||
for dash in published_dashboards + draft_dashboards:
|
||||
revoke_access_to_dashboard(dash, "Public")
|
||||
|
||||
def test_get_dashboards_api__admin_get_all_dashboards(self):
|
||||
|
|
@ -188,27 +325,10 @@ class TestDashboardRoleBasedSecurity(BaseTestDashboardSecurity):
|
|||
|
||||
def test_get_dashboards_api__owner_get_all_owned_dashboards(self):
|
||||
# arrange
|
||||
username = random_str()
|
||||
new_role = f"role_{random_str()}"
|
||||
owner = self.create_user_with_roles(
|
||||
username, [new_role], should_create_roles=True
|
||||
)
|
||||
database = create_database_to_db()
|
||||
table = create_datasource_table_to_db(db_id=database.id, owners=[owner])
|
||||
first_dash = create_dashboard_to_db(
|
||||
owners=[owner], slices=[create_slice_to_db(datasource_id=table.id)]
|
||||
)
|
||||
second_dash = create_dashboard_to_db(
|
||||
owners=[owner], slices=[create_slice_to_db(datasource_id=table.id)]
|
||||
)
|
||||
owned_dashboards = [first_dash, second_dash]
|
||||
not_owned_dashboards = [
|
||||
create_dashboard_to_db(
|
||||
slices=[create_slice_to_db(datasource_id=table.id)], published=True
|
||||
)
|
||||
]
|
||||
|
||||
self.login(username)
|
||||
(
|
||||
not_owned_dashboards,
|
||||
owned_dashboards,
|
||||
) = self._create_sample_dashboards_with_owner_access()
|
||||
|
||||
# act
|
||||
response = self.get_dashboards_api_response()
|
||||
|
|
@ -232,43 +352,29 @@ class TestDashboardRoleBasedSecurity(BaseTestDashboardSecurity):
|
|||
self.assert_dashboards_api_response(response, 0)
|
||||
|
||||
def test_get_dashboards_api__user_get_only_published_permitted_dashboards(self):
|
||||
username = random_str()
|
||||
new_role = f"role_{random_str()}"
|
||||
self.create_user_with_roles(username, [new_role], should_create_roles=True)
|
||||
# arrange
|
||||
published_dashboards = [
|
||||
create_dashboard_to_db(published=True),
|
||||
create_dashboard_to_db(published=True),
|
||||
]
|
||||
not_published_dashboards = [
|
||||
create_dashboard_to_db(published=False),
|
||||
create_dashboard_to_db(published=False),
|
||||
]
|
||||
|
||||
for dash in published_dashboards + not_published_dashboards:
|
||||
grant_access_to_dashboard(dash, new_role)
|
||||
|
||||
self.login(username)
|
||||
(
|
||||
new_role,
|
||||
draft_dashboards,
|
||||
published_dashboards,
|
||||
) = self._create_sample_only_published_dashboard_with_roles()
|
||||
|
||||
# act
|
||||
response = self.get_dashboards_api_response()
|
||||
|
||||
# assert
|
||||
self.assert_dashboards_api_response(
|
||||
response,
|
||||
len(published_dashboards),
|
||||
published_dashboards,
|
||||
not_published_dashboards,
|
||||
response, len(published_dashboards), published_dashboards, draft_dashboards,
|
||||
)
|
||||
|
||||
# post
|
||||
for dash in published_dashboards + not_published_dashboards:
|
||||
for dash in published_dashboards + draft_dashboards:
|
||||
revoke_access_to_dashboard(dash, new_role)
|
||||
|
||||
def test_get_dashboards_api__public_user_without_any_permissions_get_empty_list(
|
||||
self,
|
||||
):
|
||||
create_dashboard_to_db(published=True)
|
||||
self.logout()
|
||||
|
||||
# act
|
||||
response = self.get_dashboards_api_response()
|
||||
|
|
@ -284,25 +390,24 @@ class TestDashboardRoleBasedSecurity(BaseTestDashboardSecurity):
|
|||
create_dashboard_to_db(published=True),
|
||||
create_dashboard_to_db(published=True),
|
||||
]
|
||||
not_published_dashboards = [
|
||||
draft_dashboards = [
|
||||
create_dashboard_to_db(published=False),
|
||||
create_dashboard_to_db(published=False),
|
||||
]
|
||||
|
||||
for dash in published_dashboards + not_published_dashboards:
|
||||
for dash in published_dashboards + draft_dashboards:
|
||||
grant_access_to_dashboard(dash, "Public")
|
||||
|
||||
self.logout()
|
||||
|
||||
# act
|
||||
response = self.get_dashboards_api_response()
|
||||
|
||||
# assert
|
||||
self.assert_dashboards_api_response(
|
||||
response,
|
||||
len(published_dashboards),
|
||||
published_dashboards,
|
||||
not_published_dashboards,
|
||||
response, len(published_dashboards), published_dashboards, draft_dashboards,
|
||||
)
|
||||
|
||||
# post
|
||||
for dash in published_dashboards + not_published_dashboards:
|
||||
for dash in published_dashboards + draft_dashboards:
|
||||
revoke_access_to_dashboard(dash, "Public")
|
||||
|
|
|
|||
Loading…
Reference in New Issue