superset/tests/unit_tests/security/manager_test.py

307 lines
9.0 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
from flask_appbuilder.security.sqla.models import Role, User
from pytest_mock import MockFixture
from superset.common.query_object import QueryObject
from superset.connectors.sqla.models import Database, SqlaTable
from superset.exceptions import SupersetSecurityException
from superset.extensions import appbuilder
from superset.models.slice import Slice
from superset.security.manager import SupersetSecurityManager
from superset.utils.core import override_user
def test_security_manager(app_context: None) -> None:
    """
    Smoke test: a ``SupersetSecurityManager`` can be instantiated from the
    application builder and the resulting object is truthy.
    """
    security_manager = SupersetSecurityManager(appbuilder)
    assert security_manager
def test_raise_for_access_guest_user(
    mocker: MockFixture,
    app_context: None,
) -> None:
    """
    Test that a guest user cannot alter the chart payload.

    The security manager is patched so the current user is a guest with
    ``can_access`` always granted. A request matching the stored chart
    (id 42, one SQL metric) must pass; a request with a different slice id,
    or with a different metric in either the form data or the query objects,
    must raise ``SupersetSecurityException``.
    """
    security_manager = SupersetSecurityManager(appbuilder)
    mocker.patch.object(security_manager, "is_guest_user", return_value=True)
    mocker.patch.object(security_manager, "can_access", return_value=True)

    original_metric = {
        "aggregate": None,
        "column": None,
        "datasourceWarning": False,
        "expressionType": "SQL",
        "hasCustomLabel": False,
        "label": "COUNT(*) + 1",
        "optionName": "metric_ssa1gwimio_cxpyjc7vj3s",
        "sqlExpression": "COUNT(*) + 1",
    }
    stored_metrics = [original_metric]
    # Same metric, but with the label and SQL expression altered.
    tampered_metrics = [
        {
            **original_metric,
            "label": "COUNT(*) + 2",
            "sqlExpression": "COUNT(*) + 2",
        }
    ]

    query_context = mocker.MagicMock()
    query_context.slice_.id = 42
    query_context.slice_.params_dict = {"metrics": stored_metrics}

    # normal request: everything matches the stored chart
    query_context.form_data = {"slice_id": 42, "metrics": stored_metrics}
    query_context.queries = [QueryObject(metrics=stored_metrics)]  # type: ignore
    security_manager.raise_for_access(query_context=query_context)

    # tampered requests: (slice id, form data metrics, query object metrics)
    tampered_requests = [
        (43, stored_metrics, stored_metrics),
        (42, tampered_metrics, stored_metrics),
        (42, stored_metrics, tampered_metrics),
    ]
    for slice_id, form_metrics, query_metrics in tampered_requests:
        query_context.form_data = {"slice_id": slice_id, "metrics": form_metrics}
        query_context.queries = [
            QueryObject(metrics=query_metrics)  # type: ignore
        ]
        with pytest.raises(SupersetSecurityException):
            security_manager.raise_for_access(query_context=query_context)
def test_raise_for_access_query_default_schema(
    mocker: MockFixture,
    app_context: None,
) -> None:
    """
    Test that the DB default schema is used in non-qualified table names.

    For example, in Postgres, for the following query:

        > SELECT * FROM foo;

    We should check that the user has access to the `public` schema, regardless of the
    schema set in the query.
    """
    sm = SupersetSecurityManager(appbuilder)
    mocker.patch.object(sm, "can_access_database", return_value=False)
    mocker.patch.object(sm, "get_schema_perm", return_value="[PostgreSQL].[public]")
    # Use a distinct local name so the mock does not shadow the imported class.
    sqla_table_mock = mocker.patch("superset.connectors.sqla.models.SqlaTable")
    sqla_table_mock.query_datasources_by_name.return_value = []

    database = mocker.MagicMock()
    database.get_default_schema_for_query.return_value = "public"
    query = mocker.MagicMock()
    query.database = database
    query.sql = "SELECT * FROM ab_user"

    # user has access to `public` schema
    can_access = mocker.patch.object(sm, "can_access", return_value=True)
    assert (
        sm.raise_for_access(  # type: ignore
            database=None,
            datasource=None,
            query=query,
            query_context=None,
            table=None,
            viz=None,
        )
        is None
    )
    can_access.assert_called_with("schema_access", "[PostgreSQL].[public]")

    # user has only access to `secret` schema, so every `can_access` check fails
    mocker.patch.object(sm, "can_access", return_value=False)
    with pytest.raises(SupersetSecurityException) as excinfo:
        sm.raise_for_access(  # type: ignore
            database=None,
            datasource=None,
            query=query,
            query_context=None,
            table=None,
            viz=None,
        )
    # The expected message contains a line break; collapse whitespace runs so
    # the comparison does not depend on how the continuation line is indented.
    normalized_message = " ".join(str(excinfo.value).split())
    assert normalized_message == (
        "You need access to the following tables: `public.ab_user`, "
        "`all_database_access` or `all_datasource_access` permission"
    )
def test_raise_for_access_chart_for_datasource_permission(
    mocker: MockFixture,
    app_context: None,
) -> None:
    """
    Test that chart access follows access to the chart's datasource.

    With ``can_access_datasource`` patched to return ``False`` the security
    manager must raise for an Alpha user; with it patched to return ``True``
    the same call must succeed.
    """
    sm = SupersetSecurityManager(appbuilder)
    session = sm.get_session
    Slice.metadata.create_all(session.get_bind())  # pylint: disable=no-member

    alpha_user = User(
        first_name="Alice",
        last_name="Doe",
        email="adoe@example.org",
        username="admin",
        roles=[Role(name="Alpha")],
    )

    # Persist a dataset first so the chart can reference its generated id.
    database = Database(database_name="my_database", sqlalchemy_uri="sqlite://")
    dataset = SqlaTable(
        table_name="test_table",
        metrics=[],
        main_dttm_col=None,
        database=database,
    )
    session.add(dataset)
    session.flush()

    chart = Slice(
        id=1,
        datasource_id=dataset.id,
        datasource_type="table",
        datasource_name="tmp_perm_table",
        slice_name="slice_name",
    )
    session.add(chart)
    session.flush()

    # Datasource access denied: chart access must be denied as well.
    mocker.patch.object(sm, "can_access_datasource", return_value=False)
    with override_user(alpha_user), pytest.raises(
        SupersetSecurityException
    ) as excinfo:
        sm.raise_for_access(chart=chart)
    assert str(excinfo.value) == "You don't have access to this chart."

    # Datasource access granted: the same call must not raise.
    mocker.patch.object(sm, "can_access_datasource", return_value=True)
    with override_user(alpha_user):
        sm.raise_for_access(chart=chart)
def test_raise_for_access_chart_on_admin(
    app_context: None,
) -> None:
    """
    Test that a user with the Admin role passes the chart access check.

    No datasource permission is patched and no dataset is created here; the
    test passes only if ``raise_for_access`` does not raise for the admin.
    """
    sm = SupersetSecurityManager(appbuilder)
    session = sm.get_session
    engine = session.get_bind()
    Slice.metadata.create_all(engine)  # pylint: disable=no-member

    admin = User(
        first_name="Alice",
        last_name="Doe",
        email="adoe@example.org",
        username="admin",
        roles=[Role(name="Admin")],
    )

    # Named `chart` rather than `slice` to avoid shadowing the builtin.
    chart = Slice(
        id=1,
        datasource_id=1,
        datasource_type="table",
        datasource_name="tmp_perm_table",
        slice_name="slice_name",
    )
    session.add(chart)
    session.flush()

    # Must complete without raising SupersetSecurityException.
    with override_user(admin):
        sm.raise_for_access(
            chart=chart,
        )
def test_raise_for_access_chart_owner(
    app_context: None,
) -> None:
    """
    Test that a chart owner passes the chart access check.

    The user only has the Alpha role but is listed in the chart's ``owners``;
    the test passes only if ``raise_for_access`` does not raise for them.
    """
    sm = SupersetSecurityManager(appbuilder)
    session = sm.get_session
    engine = session.get_bind()
    Slice.metadata.create_all(engine)  # pylint: disable=no-member

    alpha = User(
        first_name="Alice",
        last_name="Doe",
        email="adoe@example.org",
        username="admin",
        roles=[Role(name="Alpha")],
    )

    # Named `chart` rather than `slice` to avoid shadowing the builtin.
    chart = Slice(
        id=1,
        datasource_id=1,
        datasource_type="table",
        datasource_name="tmp_perm_table",
        slice_name="slice_name",
        owners=[alpha],
    )
    session.add(chart)
    session.flush()

    # Must complete without raising SupersetSecurityException.
    with override_user(alpha):
        sm.raise_for_access(
            chart=chart,
        )