superset/tests/integration_tests/security/guest_token_security_tests.py
2023-07-13 10:33:12 -07:00

236 lines
8.8 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Unit tests for Superset"""
from unittest import mock
import pytest
from flask import g
from superset import db, security_manager
from superset.daos.dashboard import EmbeddedDashboardDAO
from superset.dashboards.commands.exceptions import DashboardAccessDeniedError
from superset.exceptions import SupersetSecurityException
from superset.models.dashboard import Dashboard
from superset.security.guest_token import GuestTokenResourceType
from superset.sql_parse import Table
from tests.integration_tests.base_tests import SupersetTestCase
from tests.integration_tests.fixtures.birth_names_dashboard import (
load_birth_names_dashboard_with_slices,
load_birth_names_data,
)
@mock.patch.dict(
"superset.extensions.feature_flag_manager._feature_flags",
EMBEDDED_SUPERSET=True,
)
class TestGuestUserSecurity(SupersetTestCase):
def authorized_guest(self):
return security_manager.get_guest_user_from_token(
{"user": {}, "resources": [{"type": "dashboard", "id": "some-uuid"}]}
)
def test_is_guest_user__regular_user(self):
is_guest = security_manager.is_guest_user(security_manager.find_user("admin"))
self.assertFalse(is_guest)
def test_is_guest_user__anonymous(self):
is_guest = security_manager.is_guest_user(security_manager.get_anonymous_user())
self.assertFalse(is_guest)
def test_is_guest_user__guest_user(self):
is_guest = security_manager.is_guest_user(self.authorized_guest())
self.assertTrue(is_guest)
@mock.patch.dict(
"superset.extensions.feature_flag_manager._feature_flags",
EMBEDDED_SUPERSET=False,
)
def test_is_guest_user__flag_off(self):
is_guest = security_manager.is_guest_user(self.authorized_guest())
self.assertFalse(is_guest)
def test_get_guest_user__regular_user(self):
g.user = security_manager.find_user("admin")
guest_user = security_manager.get_current_guest_user_if_guest()
self.assertIsNone(guest_user)
def test_get_guest_user__anonymous_user(self):
g.user = security_manager.get_anonymous_user()
guest_user = security_manager.get_current_guest_user_if_guest()
self.assertIsNone(guest_user)
def test_get_guest_user__guest_user(self):
g.user = self.authorized_guest()
guest_user = security_manager.get_current_guest_user_if_guest()
self.assertEqual(guest_user, g.user)
def test_get_guest_user_roles_explicit(self):
guest = self.authorized_guest()
roles = security_manager.get_user_roles(guest)
self.assertEqual(guest.roles, roles)
def test_get_guest_user_roles_implicit(self):
guest = self.authorized_guest()
g.user = guest
roles = security_manager.get_user_roles()
self.assertEqual(guest.roles, roles)
@mock.patch.dict(
"superset.extensions.feature_flag_manager._feature_flags",
EMBEDDED_SUPERSET=True,
)
@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
class TestGuestUserDashboardAccess(SupersetTestCase):
def setUp(self) -> None:
self.dash = db.session.query(Dashboard).filter_by(slug="births").first()
self.embedded = EmbeddedDashboardDAO.upsert(self.dash, [])
self.authorized_guest = security_manager.get_guest_user_from_token(
{
"user": {},
"resources": [{"type": "dashboard", "id": str(self.embedded.uuid)}],
}
)
self.unauthorized_guest = security_manager.get_guest_user_from_token(
{
"user": {},
"resources": [
{"type": "dashboard", "id": "06383667-3e02-4e5e-843f-44e9c5896b6c"}
],
}
)
def test_has_guest_access__regular_user(self):
g.user = security_manager.find_user("admin")
has_guest_access = security_manager.has_guest_access(self.dash)
self.assertFalse(has_guest_access)
def test_has_guest_access__anonymous_user(self):
g.user = security_manager.get_anonymous_user()
has_guest_access = security_manager.has_guest_access(self.dash)
self.assertFalse(has_guest_access)
def test_has_guest_access__authorized_guest_user(self):
g.user = self.authorized_guest
has_guest_access = security_manager.has_guest_access(self.dash)
self.assertTrue(has_guest_access)
def test_has_guest_access__authorized_guest_user__non_zero_resource_index(self):
# set up a user who has authorized access, plus another resource
guest = self.authorized_guest
guest.resources = [
{"type": "dashboard", "id": "not-a-real-id"}
] + guest.resources
g.user = guest
has_guest_access = security_manager.has_guest_access(self.dash)
self.assertTrue(has_guest_access)
def test_has_guest_access__unauthorized_guest_user__different_resource_id(self):
g.user = security_manager.get_guest_user_from_token(
{
"user": {},
"resources": [{"type": "dashboard", "id": "not-a-real-id"}],
}
)
has_guest_access = security_manager.has_guest_access(self.dash)
self.assertFalse(has_guest_access)
def test_has_guest_access__unauthorized_guest_user__different_resource_type(self):
g.user = security_manager.get_guest_user_from_token(
{"user": {}, "resources": [{"type": "dirt", "id": self.embedded.uuid}]}
)
has_guest_access = security_manager.has_guest_access(self.dash)
self.assertFalse(has_guest_access)
def test_chart_raise_for_access_as_guest(self):
chart = self.dash.slices[0]
g.user = self.authorized_guest
security_manager.raise_for_access(viz=chart)
def test_chart_raise_for_access_as_unauthorized_guest(self):
chart = self.dash.slices[0]
g.user = self.unauthorized_guest
with self.assertRaises(SupersetSecurityException):
security_manager.raise_for_access(viz=chart)
def test_dataset_raise_for_access_as_guest(self):
dataset = self.dash.slices[0].datasource
g.user = self.authorized_guest
security_manager.raise_for_access(datasource=dataset)
def test_dataset_raise_for_access_as_unauthorized_guest(self):
dataset = self.dash.slices[0].datasource
g.user = self.unauthorized_guest
with self.assertRaises(SupersetSecurityException):
security_manager.raise_for_access(datasource=dataset)
def test_guest_token_does_not_grant_access_to_underlying_table(self):
sqla_table = self.dash.slices[0].table
table = Table(table=sqla_table.table_name)
g.user = self.authorized_guest
with self.assertRaises(Exception):
security_manager.raise_for_access(table=table, database=sqla_table.database)
def test_raise_for_dashboard_access_as_guest(self):
g.user = self.authorized_guest
security_manager.raise_for_dashboard_access(self.dash)
def test_raise_for_dashboard_access_as_unauthorized_guest(self):
g.user = self.unauthorized_guest
with self.assertRaises(DashboardAccessDeniedError):
security_manager.raise_for_dashboard_access(self.dash)
def test_raise_for_dashboard_access_as_guest_no_rbac(self):
"""
Test that guest account has no access to other dashboards.
A bug in the ``raise_for_dashboard_access`` logic allowed the guest user to
fetch data from other dashboards, as long as the other dashboard:
- was not embedded AND
- was not published OR
- had at least 1 datasource that the user had access to.
"""
g.user = self.unauthorized_guest
# Create a draft dashboard that is not embedded
dash = Dashboard()
dash.dashboard_title = "My Dashboard"
dash.owners = []
dash.slices = []
dash.published = False
db.session.add(dash)
db.session.commit()
with self.assertRaises(DashboardAccessDeniedError):
security_manager.raise_for_dashboard_access(dash)
db.session.delete(dash)
db.session.commit()