superset/tests/integration_tests/security_tests.py

2156 lines
80 KiB
Python
Raw Normal View History

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# isort:skip_file
import inspect
feat: embedded dashboard core (#17530) * feat(dashboard): embedded dashboard UI configuration (#17175) (#17450) * setup embedded provider * update ui configuration * fix test * feat: Guest token (for embedded dashboard auth) (#17517) * generate an embed token * improve existing tests * add some auth setup, and rename token * fix the stuff for compatibility with external request loaders * docs, standard jwt claims, tweaks * black * lint * tests, and safer token decoding * linting * type annotation * prettier * add feature flag * quiet pylint * apparently typing is a problem again * Make guest role name configurable * fake being a non-anonymous user * just one log entry * customizable algo * lint * lint again * 403 works now! * get guest token from header instead of cookie * Revert "403 works now!" This reverts commit df2f49a6d4267b3cccccd66549d54e25bae8e0b6. * fix tests * Revert "Revert "403 works now!"" This reverts commit 883dff38f16537e41f0eb5d699845263c96be5cb. * rename method * correct import * feat: entry for embedded dashboard (#17529) * create entry for embedded dashboard in webpack * add cookies * lint * token message handshake * guestTokenHeaderName * use setupClient instead of calling configure * rename the webpack chunk * simplified handshake * embedded entrypoint: render a proper app * make the embedded page accept anonymous connections * format * lint * fix test # Conflicts: # superset-frontend/src/embedded/index.tsx # superset/views/core.py * lint * Update superset-frontend/src/embedded/index.tsx Co-authored-by: David Aaron Suddjian <1858430+suddjian@users.noreply.github.com> * comment out origins checks * move embedded for core to dashboard * pylint * isort Co-authored-by: David Aaron Suddjian <aasuddjian@gmail.com> Co-authored-by: David Aaron Suddjian <1858430+suddjian@users.noreply.github.com> * feat: Authorizing guest access to embedded dashboards (#17757) * helper methods and dashboard access * guest token dashboard authz * adjust csrf exempt 
list * eums don't work that way * Remove unnecessary import * move row level security tests to their own file * a bit of refactoring * add guest token security tests * refactor tests * clean imports * variable names can be too long apparently * missing argument to get_user_roles * don't redefine builtins * remove unused imports * fix test import * default to global user when getting roles * missing import * mock it * test get_user_roles * infer g.user for ease of tests * remove redundant check * tests for guest user security manager fns * use algo to get rid of warning messages * tweaking access checks * fix guest token security tests * missing imports * more tests * more testing and also some small refactoring * move validation out of parsing * fix dashboard access check again * add more test Co-authored-by: Lily Kuang <lily@preset.io> * feat: Row Level Security rules for guest tokens (#17836) * helper methods and dashboard access * guest token dashboard authz * adjust csrf exempt list * eums don't work that way * Remove unnecessary import * move row level security tests to their own file * a bit of refactoring * add guest token security tests * refactor tests * clean imports * variable names can be too long apparently * missing argument to get_user_roles * don't redefine builtins * remove unused imports * fix test import * default to global user when getting roles * missing import * mock it * test get_user_roles * infer g.user for ease of tests * remove redundant check * tests for guest user security manager fns * use algo to get rid of warning messages * tweaking access checks * fix guest token security tests * missing imports * more tests * more testing and also some small refactoring * move validation out of parsing * fix dashboard access check again * rls rules for guest tokens * test guest token rls rules * more flexible rls rules * lint * fix tests * fix test * defaults * fix some tests * fix some tests * lint Co-authored-by: Lily Kuang <lily@preset.io> * 
SupersetClient guest token test * Apply suggestions from code review Co-authored-by: Lily Kuang <lily@preset.io> Co-authored-by: Lily Kuang <lily@preset.io>
2022-01-25 19:41:32 -05:00
import time
import unittest
from collections import namedtuple
from unittest import mock
from unittest.mock import Mock, patch, call, ANY
feat: embedded dashboard core (#17530) * feat(dashboard): embedded dashboard UI configuration (#17175) (#17450) * setup embedded provider * update ui configuration * fix test * feat: Guest token (for embedded dashboard auth) (#17517) * generate an embed token * improve existing tests * add some auth setup, and rename token * fix the stuff for compatibility with external request loaders * docs, standard jwt claims, tweaks * black * lint * tests, and safer token decoding * linting * type annotation * prettier * add feature flag * quiet pylint * apparently typing is a problem again * Make guest role name configurable * fake being a non-anonymous user * just one log entry * customizable algo * lint * lint again * 403 works now! * get guest token from header instead of cookie * Revert "403 works now!" This reverts commit df2f49a6d4267b3cccccd66549d54e25bae8e0b6. * fix tests * Revert "Revert "403 works now!"" This reverts commit 883dff38f16537e41f0eb5d699845263c96be5cb. * rename method * correct import * feat: entry for embedded dashboard (#17529) * create entry for embedded dashboard in webpack * add cookies * lint * token message handshake * guestTokenHeaderName * use setupClient instead of calling configure * rename the webpack chunk * simplified handshake * embedded entrypoint: render a proper app * make the embedded page accept anonymous connections * format * lint * fix test # Conflicts: # superset-frontend/src/embedded/index.tsx # superset/views/core.py * lint * Update superset-frontend/src/embedded/index.tsx Co-authored-by: David Aaron Suddjian <1858430+suddjian@users.noreply.github.com> * comment out origins checks * move embedded for core to dashboard * pylint * isort Co-authored-by: David Aaron Suddjian <aasuddjian@gmail.com> Co-authored-by: David Aaron Suddjian <1858430+suddjian@users.noreply.github.com> * feat: Authorizing guest access to embedded dashboards (#17757) * helper methods and dashboard access * guest token dashboard authz * adjust csrf exempt 
list * eums don't work that way * Remove unnecessary import * move row level security tests to their own file * a bit of refactoring * add guest token security tests * refactor tests * clean imports * variable names can be too long apparently * missing argument to get_user_roles * don't redefine builtins * remove unused imports * fix test import * default to global user when getting roles * missing import * mock it * test get_user_roles * infer g.user for ease of tests * remove redundant check * tests for guest user security manager fns * use algo to get rid of warning messages * tweaking access checks * fix guest token security tests * missing imports * more tests * more testing and also some small refactoring * move validation out of parsing * fix dashboard access check again * add more test Co-authored-by: Lily Kuang <lily@preset.io> * feat: Row Level Security rules for guest tokens (#17836) * helper methods and dashboard access * guest token dashboard authz * adjust csrf exempt list * eums don't work that way * Remove unnecessary import * move row level security tests to their own file * a bit of refactoring * add guest token security tests * refactor tests * clean imports * variable names can be too long apparently * missing argument to get_user_roles * don't redefine builtins * remove unused imports * fix test import * default to global user when getting roles * missing import * mock it * test get_user_roles * infer g.user for ease of tests * remove redundant check * tests for guest user security manager fns * use algo to get rid of warning messages * tweaking access checks * fix guest token security tests * missing imports * more tests * more testing and also some small refactoring * move validation out of parsing * fix dashboard access check again * rls rules for guest tokens * test guest token rls rules * more flexible rls rules * lint * fix tests * fix test * defaults * fix some tests * fix some tests * lint Co-authored-by: Lily Kuang <lily@preset.io> * 
SupersetClient guest token test * Apply suggestions from code review Co-authored-by: Lily Kuang <lily@preset.io> Co-authored-by: Lily Kuang <lily@preset.io>
2022-01-25 19:41:32 -05:00
from typing import Any
feat: embedded dashboard core (#17530) * feat(dashboard): embedded dashboard UI configuration (#17175) (#17450) * setup embedded provider * update ui configuration * fix test * feat: Guest token (for embedded dashboard auth) (#17517) * generate an embed token * improve existing tests * add some auth setup, and rename token * fix the stuff for compatibility with external request loaders * docs, standard jwt claims, tweaks * black * lint * tests, and safer token decoding * linting * type annotation * prettier * add feature flag * quiet pylint * apparently typing is a problem again * Make guest role name configurable * fake being a non-anonymous user * just one log entry * customizable algo * lint * lint again * 403 works now! * get guest token from header instead of cookie * Revert "403 works now!" This reverts commit df2f49a6d4267b3cccccd66549d54e25bae8e0b6. * fix tests * Revert "Revert "403 works now!"" This reverts commit 883dff38f16537e41f0eb5d699845263c96be5cb. * rename method * correct import * feat: entry for embedded dashboard (#17529) * create entry for embedded dashboard in webpack * add cookies * lint * token message handshake * guestTokenHeaderName * use setupClient instead of calling configure * rename the webpack chunk * simplified handshake * embedded entrypoint: render a proper app * make the embedded page accept anonymous connections * format * lint * fix test # Conflicts: # superset-frontend/src/embedded/index.tsx # superset/views/core.py * lint * Update superset-frontend/src/embedded/index.tsx Co-authored-by: David Aaron Suddjian <1858430+suddjian@users.noreply.github.com> * comment out origins checks * move embedded for core to dashboard * pylint * isort Co-authored-by: David Aaron Suddjian <aasuddjian@gmail.com> Co-authored-by: David Aaron Suddjian <1858430+suddjian@users.noreply.github.com> * feat: Authorizing guest access to embedded dashboards (#17757) * helper methods and dashboard access * guest token dashboard authz * adjust csrf exempt 
list * eums don't work that way * Remove unnecessary import * move row level security tests to their own file * a bit of refactoring * add guest token security tests * refactor tests * clean imports * variable names can be too long apparently * missing argument to get_user_roles * don't redefine builtins * remove unused imports * fix test import * default to global user when getting roles * missing import * mock it * test get_user_roles * infer g.user for ease of tests * remove redundant check * tests for guest user security manager fns * use algo to get rid of warning messages * tweaking access checks * fix guest token security tests * missing imports * more tests * more testing and also some small refactoring * move validation out of parsing * fix dashboard access check again * add more test Co-authored-by: Lily Kuang <lily@preset.io> * feat: Row Level Security rules for guest tokens (#17836) * helper methods and dashboard access * guest token dashboard authz * adjust csrf exempt list * eums don't work that way * Remove unnecessary import * move row level security tests to their own file * a bit of refactoring * add guest token security tests * refactor tests * clean imports * variable names can be too long apparently * missing argument to get_user_roles * don't redefine builtins * remove unused imports * fix test import * default to global user when getting roles * missing import * mock it * test get_user_roles * infer g.user for ease of tests * remove redundant check * tests for guest user security manager fns * use algo to get rid of warning messages * tweaking access checks * fix guest token security tests * missing imports * more tests * more testing and also some small refactoring * move validation out of parsing * fix dashboard access check again * rls rules for guest tokens * test guest token rls rules * more flexible rls rules * lint * fix tests * fix test * defaults * fix some tests * fix some tests * lint Co-authored-by: Lily Kuang <lily@preset.io> * 
SupersetClient guest token test * Apply suggestions from code review Co-authored-by: Lily Kuang <lily@preset.io> Co-authored-by: Lily Kuang <lily@preset.io>
2022-01-25 19:41:32 -05:00
import jwt
import prison
import pytest
from flask import current_app, g
from flask_appbuilder.security.sqla.models import Role
from superset.daos.datasource import DatasourceDAO # noqa: F401
from superset.models.dashboard import Dashboard
from superset import app, appbuilder, db, security_manager, viz
feat: embedded dashboard core (#17530) * feat(dashboard): embedded dashboard UI configuration (#17175) (#17450) * setup embedded provider * update ui configuration * fix test * feat: Guest token (for embedded dashboard auth) (#17517) * generate an embed token * improve existing tests * add some auth setup, and rename token * fix the stuff for compatibility with external request loaders * docs, standard jwt claims, tweaks * black * lint * tests, and safer token decoding * linting * type annotation * prettier * add feature flag * quiet pylint * apparently typing is a problem again * Make guest role name configurable * fake being a non-anonymous user * just one log entry * customizable algo * lint * lint again * 403 works now! * get guest token from header instead of cookie * Revert "403 works now!" This reverts commit df2f49a6d4267b3cccccd66549d54e25bae8e0b6. * fix tests * Revert "Revert "403 works now!"" This reverts commit 883dff38f16537e41f0eb5d699845263c96be5cb. * rename method * correct import * feat: entry for embedded dashboard (#17529) * create entry for embedded dashboard in webpack * add cookies * lint * token message handshake * guestTokenHeaderName * use setupClient instead of calling configure * rename the webpack chunk * simplified handshake * embedded entrypoint: render a proper app * make the embedded page accept anonymous connections * format * lint * fix test # Conflicts: # superset-frontend/src/embedded/index.tsx # superset/views/core.py * lint * Update superset-frontend/src/embedded/index.tsx Co-authored-by: David Aaron Suddjian <1858430+suddjian@users.noreply.github.com> * comment out origins checks * move embedded for core to dashboard * pylint * isort Co-authored-by: David Aaron Suddjian <aasuddjian@gmail.com> Co-authored-by: David Aaron Suddjian <1858430+suddjian@users.noreply.github.com> * feat: Authorizing guest access to embedded dashboards (#17757) * helper methods and dashboard access * guest token dashboard authz * adjust csrf exempt 
list * eums don't work that way * Remove unnecessary import * move row level security tests to their own file * a bit of refactoring * add guest token security tests * refactor tests * clean imports * variable names can be too long apparently * missing argument to get_user_roles * don't redefine builtins * remove unused imports * fix test import * default to global user when getting roles * missing import * mock it * test get_user_roles * infer g.user for ease of tests * remove redundant check * tests for guest user security manager fns * use algo to get rid of warning messages * tweaking access checks * fix guest token security tests * missing imports * more tests * more testing and also some small refactoring * move validation out of parsing * fix dashboard access check again * add more test Co-authored-by: Lily Kuang <lily@preset.io> * feat: Row Level Security rules for guest tokens (#17836) * helper methods and dashboard access * guest token dashboard authz * adjust csrf exempt list * eums don't work that way * Remove unnecessary import * move row level security tests to their own file * a bit of refactoring * add guest token security tests * refactor tests * clean imports * variable names can be too long apparently * missing argument to get_user_roles * don't redefine builtins * remove unused imports * fix test import * default to global user when getting roles * missing import * mock it * test get_user_roles * infer g.user for ease of tests * remove redundant check * tests for guest user security manager fns * use algo to get rid of warning messages * tweaking access checks * fix guest token security tests * missing imports * more tests * more testing and also some small refactoring * move validation out of parsing * fix dashboard access check again * rls rules for guest tokens * test guest token rls rules * more flexible rls rules * lint * fix tests * fix test * defaults * fix some tests * fix some tests * lint Co-authored-by: Lily Kuang <lily@preset.io> * 
SupersetClient guest token test * Apply suggestions from code review Co-authored-by: Lily Kuang <lily@preset.io> Co-authored-by: Lily Kuang <lily@preset.io>
2022-01-25 19:41:32 -05:00
from superset.connectors.sqla.models import SqlaTable
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import SupersetSecurityException
from superset.models.core import Database
from superset.models.slice import Slice
from superset.sql_parse import Table
2021-12-15 20:06:40 -05:00
from superset.utils.core import (
DatasourceType,
2021-12-15 20:06:40 -05:00
backend,
get_example_default_schema,
override_user,
2021-12-15 20:06:40 -05:00
)
from superset.utils import json
from superset.utils.database import get_example_database
from superset.utils.urls import get_url_host
from tests.integration_tests.base_tests import SupersetTestCase
from tests.integration_tests.constants import GAMMA_USERNAME
from tests.integration_tests.conftest import with_feature_flags
from tests.integration_tests.fixtures.public_role import (
public_role_like_gamma, # noqa: F401
public_role_like_test_role, # noqa: F401
)
from tests.integration_tests.fixtures.birth_names_dashboard import (
load_birth_names_dashboard_with_slices, # noqa: F401
load_birth_names_data, # noqa: F401
)
from tests.integration_tests.fixtures.world_bank_dashboard import (
load_world_bank_dashboard_with_slices, # noqa: F401
load_world_bank_data, # noqa: F401
)
# Tuple of view names grouped under the "new security converge" label.
# NOTE(review): the code consuming this tuple is not visible in this part of
# the file; presumably these are the views whose permissions follow the
# converged permission model -- confirm against the tests that use it.
NEW_SECURITY_CONVERGE_VIEWS = (
"Annotation",
"Database",
"Dataset",
"Dashboard",
"CssTemplate",
"Chart",
"Query",
"SavedQuery",
)
def get_perm_tuples(role_name):
    """Collect a role's permissions as ``(permission, view_menu)`` name pairs.

    :param role_name: name of the role, resolved through
        ``security_manager.find_role``
    :returns: a set holding one ``(permission name, view menu name)`` tuple
        for every permission attached to the role
    """
    role = security_manager.find_role(role_name)
    return {(pvm.permission.name, pvm.view_menu.name) for pvm in role.permissions}
# Name of the temporary role that TestRolePermission.setUp creates, that the
# schema permission helpers below look up, and that tearDown deletes again.
SCHEMA_ACCESS_ROLE = "schema_access_role"
def create_schema_perm(view_menu_name: str) -> None:
    """Grant ``schema_access`` on a view menu to the schema access role.

    Registers the ``schema_access`` permission for ``view_menu_name``, looks
    the resulting permission/view-menu pair back up and attaches it to the
    role named by ``SCHEMA_ACCESS_ROLE``.

    :param view_menu_name: view menu name of the schema, e.g.
        ``"[examples].[temp_schema]"``
    """
    perm_name = "schema_access"
    security_manager.add_permission_view_menu(perm_name, view_menu_name)
    pvm = security_manager.find_permission_view_menu(perm_name, view_menu_name)
    role = security_manager.find_role(SCHEMA_ACCESS_ROLE)
    security_manager.add_permission_role(role, pvm)
def delete_schema_perm(view_menu_name: str) -> None:
    """Revoke and delete the ``schema_access`` permission of a view menu.

    Counterpart of ``create_schema_perm``: the permission/view-menu pair for
    ``view_menu_name`` is detached from the role named by
    ``SCHEMA_ACCESS_ROLE`` and then deleted.

    The previous implementation ignored ``view_menu_name`` and always operated
    on the hard-coded ``"[examples].[2]"`` view menu, so the permission that
    ``create_schema_perm`` had registered was never cleaned up.

    :param view_menu_name: view menu name of the schema whose permission
        should be removed
    """
    permission = "schema_access"
    pv = security_manager.find_permission_view_menu(permission, view_menu_name)
    security_manager.del_permission_role(
        security_manager.find_role(SCHEMA_ACCESS_ROLE), pv
    )
    security_manager.del_permission_view_menu(permission, view_menu_name)
    return None
class TestRolePermission(SupersetTestCase):
"""Testing export role permissions."""
def setUp(self):
    """Move ``wb_health_population`` into ``temp_schema`` and grant access.

    Creates the ``SCHEMA_ACCESS_ROLE`` role, re-homes the example dataset and
    the schema permission of its charts to ``temp_schema``, registers the
    matching ``schema_access`` permission and adds the role to the ``gamma``
    user.
    """
    default_schema = get_example_default_schema()
    security_manager.add_role(SCHEMA_ACCESS_ROLE)
    db.session.commit()

    dataset = (
        db.session.query(SqlaTable)
        .filter_by(table_name="wb_health_population", schema=default_schema)
        .first()
    )
    dataset.schema = "temp_schema"
    dataset.schema_perm = dataset.get_schema_perm()

    # Keep the charts built on the dataset in sync with its new schema perm.
    charts = (
        db.session.query(Slice)
        .filter_by(datasource_type=DatasourceType.TABLE)
        .filter_by(datasource_id=dataset.id)
        .all()
    )
    for chart in charts:
        chart.schema_perm = dataset.schema_perm

    create_schema_perm("[examples].[temp_schema]")
    gamma = security_manager.find_user(username="gamma")
    schema_role = security_manager.find_role(SCHEMA_ACCESS_ROLE)
    gamma.roles.append(schema_role)
    db.session.commit()
def tearDown(self):
    """Undo ``setUp``: restore the dataset's schema and drop the temp role."""
    dataset = (
        db.session.query(SqlaTable)
        .filter_by(table_name="wb_health_population", schema="temp_schema")
        .first()
    )
    # Remember the temporary schema permission before it is cleared below;
    # it is handed to delete_schema_perm afterwards.
    temp_schema_perm = dataset.schema_perm
    dataset.schema = get_example_default_schema()
    dataset.schema_perm = None

    charts = (
        db.session.query(Slice)
        .filter_by(datasource_type=DatasourceType.TABLE)
        .filter_by(datasource_id=dataset.id)
        .all()
    )
    for chart in charts:
        chart.schema_perm = None

    delete_schema_perm(temp_schema_perm)
    schema_role = security_manager.find_role(SCHEMA_ACCESS_ROLE)
    db.session.delete(schema_role)
    db.session.commit()
    super().tearDown()
def test_after_insert_dataset(self):
    """Inserting a dataset creates its datasource and schema permissions.

    Also checks that ``on_permission_view_after_insert`` was called at least
    twice in a row and that its most recent call carried the schema
    permission.
    """
    # NOTE(review): both hooks are replaced directly on security_manager and
    # are not restored inside this test.
    security_manager.on_view_menu_after_insert = Mock()
    security_manager.on_permission_view_after_insert = Mock()

    database = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
    db.session.add(database)
    dataset = SqlaTable(
        schema="tmp_schema",
        table_name="tmp_perm_table",
        database=database,
    )
    db.session.add(dataset)
    db.session.commit()

    dataset = (
        db.session.query(SqlaTable).filter_by(table_name="tmp_perm_table").one()
    )
    assert dataset.perm == f"[tmp_db1].[tmp_perm_table](id:{dataset.id})"

    dataset_pvm = security_manager.find_permission_view_menu(
        "datasource_access", dataset.perm
    )
    schema_pvm = security_manager.find_permission_view_menu(
        "schema_access", dataset.schema_perm
    )

    # The dataset permission exists and the locally stored perms are correct.
    assert dataset_pvm is not None
    assert dataset.perm == f"[tmp_db1].[tmp_perm_table](id:{dataset.id})"
    assert dataset.schema_perm == "[tmp_db1].[tmp_schema]"
    assert schema_pvm is not None

    # The permission hook was invoked; the last call carried the schema PVM.
    insert_hook = security_manager.on_permission_view_after_insert
    assert insert_hook.call_args.args[2].id == schema_pvm.id
    expected_calls = [call(ANY, ANY, ANY), call(ANY, ANY, ANY)]
    insert_hook.assert_has_calls(expected_calls)

    # Cleanup
    db.session.delete(dataset)
    db.session.delete(database)
    db.session.commit()
def test_after_insert_dataset_rollback(self):
    """A rolled-back dataset insert leaves no dataset and no permission."""

    def find_dataset_pvm(dataset_id):
        return security_manager.find_permission_view_menu(
            "datasource_access", f"[tmp_db1].[tmp_table](id:{dataset_id})"
        )

    database = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
    db.session.add(database)
    db.session.commit()

    dataset = SqlaTable(
        schema="tmp_schema",
        table_name="tmp_table",
        database=database,
    )
    db.session.add(dataset)
    db.session.flush()

    # After the flush the permission is visible inside the open transaction.
    assert find_dataset_pvm(dataset.id) is not None

    # Keep the id around: the dataset itself is gone after the rollback.
    dataset_id = dataset.id
    db.session.rollback()

    dataset = (
        db.session.query(SqlaTable).filter_by(table_name="tmp_table").one_or_none()
    )
    assert dataset is None
    assert find_dataset_pvm(dataset_id) is None

    db.session.delete(database)
    db.session.commit()
def test_after_insert_dataset_table_none(self):
    """A dataset created with only ``database_id`` still gets a proper perm.

    The dataset is built without a ``database`` relationship object; the
    assertions check that the stored permission exists and that no permission
    named after a ``None`` database was created.
    """
    dataset = SqlaTable(
        schema="tmp_schema",
        table_name="tmp_perm_table",
        # Only the foreign key is set, not the ``database`` relationship.
        database_id=get_example_database().id,
    )
    db.session.add(dataset)
    db.session.commit()

    stored = db.session.query(SqlaTable).filter_by(table_name="tmp_perm_table").one()

    # The permission matching the stored perm string exists ...
    real_pvm = security_manager.find_permission_view_menu(
        "datasource_access", stored.perm
    )
    assert real_pvm is not None

    # ... and no bogus "[None]" permission was created alongside it.
    bogus_pvm = security_manager.find_permission_view_menu(
        "datasource_access", f"[None].[tmp_perm_table](id:{stored.id})"
    )
    assert bogus_pvm is None

    # Cleanup
    db.session.delete(dataset)
    db.session.commit()
def test_after_insert_database(self):
    """Adding a database creates its ``database_access`` permission.

    Also checks that ``on_permission_view_after_insert`` was called and that
    its most recent call carried the database permission.
    """
    security_manager.on_permission_view_after_insert = Mock()

    database = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
    db.session.add(database)

    # No explicit flush/commit precedes this query.
    # NOTE(review): presumably relies on the session autoflushing -- confirm.
    database = db.session.query(Database).filter_by(database_name="tmp_db1").one()
    assert database.perm == f"[tmp_db1].(id:{database.id})"

    database_pvm = security_manager.find_permission_view_menu(
        "database_access", database.perm
    )
    assert database_pvm is not None

    # The hook was called, most recently with the database permission.
    insert_hook = security_manager.on_permission_view_after_insert
    insert_hook.assert_has_calls([call(ANY, ANY, ANY)])
    assert insert_hook.call_args.args[2].id == database_pvm.id

    db.session.delete(database)
    db.session.commit()
def test_after_insert_database_rollback(self):
    """A rolled-back database insert leaves no ``database_access`` perm.

    The permission name is built once, before the rollback, and reused for
    both lookups. The earlier version checked ``"[tmp_db1](id:...)"`` (missing
    the ``.``) after the rollback, a name that never exists, so the final
    assertion could not fail.
    """
    tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
    db.session.add(tmp_db1)
    db.session.flush()

    # Same "[name].(id:N)" format the other database tests assert on.
    database_perm = f"[tmp_db1].(id:{tmp_db1.id})"

    pvm_database = security_manager.find_permission_view_menu(
        "database_access", database_perm
    )
    self.assertIsNotNone(pvm_database)

    db.session.rollback()

    pvm_database = security_manager.find_permission_view_menu(
        "database_access", database_perm
    )
    self.assertIsNone(pvm_database)
def test_after_update_database__perm_database_access(self):
    """Renaming a database renames its ``database_access`` permission.

    Also checks that ``on_view_menu_after_update`` was called with the
    renamed view menu.
    """

    def find_database_pvm(view_menu_name):
        return security_manager.find_permission_view_menu(
            "database_access", view_menu_name
        )

    security_manager.on_view_menu_after_update = Mock()

    database = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
    db.session.add(database)
    db.session.commit()

    database = db.session.query(Database).filter_by(database_name="tmp_db1").one()
    assert find_database_pvm(database.perm) is not None

    database.database_name = "tmp_db2"
    db.session.commit()

    # The permission under the old name is gone ...
    assert find_database_pvm(f"[tmp_db1].(id:{database.id})") is None
    # ... and exists under the new name.
    assert find_database_pvm(f"[tmp_db2].(id:{database.id})") is not None

    # The update hook was called with the renamed view menu.
    renamed_view_menu = security_manager.find_view_menu(
        f"[tmp_db2].(id:{database.id})"
    )
    update_hook = security_manager.on_view_menu_after_update
    update_hook.assert_has_calls([call(ANY, ANY, renamed_view_menu)])

    db.session.delete(database)
    db.session.commit()
def test_after_update_database_rollback(self):
    """Rolling back a database rename restores the original permission."""

    def find_database_pvm(view_menu_name):
        return security_manager.find_permission_view_menu(
            "database_access", view_menu_name
        )

    database = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
    db.session.add(database)
    db.session.commit()

    database = db.session.query(Database).filter_by(database_name="tmp_db1").one()
    assert find_database_pvm(database.perm) is not None

    database.database_name = "tmp_db2"
    db.session.flush()

    # Inside the open transaction the permission has moved to the new name.
    assert find_database_pvm(f"[tmp_db1].(id:{database.id})") is None
    assert find_database_pvm(f"[tmp_db2].(id:{database.id})") is not None

    db.session.rollback()

    # After the rollback the permission is back under the original name ...
    assert find_database_pvm(f"[tmp_db1].(id:{database.id})") is not None
    # ... and nothing remains under the new name.
    assert find_database_pvm(f"[tmp_db2].(id:{database.id})") is None

    db.session.delete(database)
    db.session.commit()
def test_after_update_database__perm_database_access_exists(self):
    """Renaming a database works when the target permission already exists.

    A ``database_access`` permission for the new name is created up front;
    after the rename the old name must be gone, the new one present, and
    ``on_permission_view_after_delete`` must have been called.
    """

    def find_database_pvm(view_menu_name):
        return security_manager.find_permission_view_menu(
            "database_access", view_menu_name
        )

    security_manager.on_permission_view_after_delete = Mock()

    database = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
    db.session.add(database)
    db.session.commit()
    database = db.session.query(Database).filter_by(database_name="tmp_db1").one()

    # Add a bogus permission under the future name before the rename happens.
    security_manager.add_permission_view_menu(
        "database_access", f"[tmp_db2].(id:{database.id})"
    )
    assert find_database_pvm(database.perm) is not None

    database.database_name = "tmp_db2"
    db.session.commit()

    # The permission under the old name is gone ...
    assert find_database_pvm(f"[tmp_db1].(id:{database.id})") is None
    # ... and one exists under the new name.
    assert find_database_pvm(f"[tmp_db2].(id:{database.id})") is not None

    delete_hook = security_manager.on_permission_view_after_delete
    delete_hook.assert_has_calls([call(ANY, ANY, ANY)])

    db.session.delete(database)
    db.session.commit()
def test_after_update_database__perm_datasource_access(self):
    """Renaming a database renames the permissions of its datasets.

    Two datasets and one chart are created on ``tmp_db1``. After renaming the
    database to ``tmp_db2`` the ``datasource_access`` permissions and the
    ``perm`` strings of the datasets and the chart must carry the new name,
    and ``on_view_menu_after_update`` must have been called for the database
    and both datasets, in that order.
    """

    def find_dataset_pvm(view_menu_name):
        return security_manager.find_permission_view_menu(
            "datasource_access", view_menu_name
        )

    security_manager.on_view_menu_after_update = Mock()

    database = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
    db.session.add(database)
    db.session.commit()

    first = SqlaTable(
        schema="tmp_schema",
        table_name="tmp_table1",
        database=database,
    )
    db.session.add(first)
    second = SqlaTable(
        schema="tmp_schema",
        table_name="tmp_table2",
        database=database,
    )
    db.session.add(second)
    db.session.commit()

    chart = Slice(
        datasource_id=first.id,
        datasource_type=DatasourceType.TABLE,
        datasource_name="tmp_table1",
        slice_name="tmp_slice1",
    )
    db.session.add(chart)
    db.session.commit()

    chart = db.session.query(Slice).filter_by(slice_name="tmp_slice1").one()
    first = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
    second = db.session.query(SqlaTable).filter_by(table_name="tmp_table2").one()

    # Initial state: everything is named after tmp_db1.
    assert find_dataset_pvm(f"[tmp_db1].[tmp_table1](id:{first.id})") is not None
    assert find_dataset_pvm(f"[tmp_db1].[tmp_table2](id:{second.id})") is not None
    assert chart.perm == f"[tmp_db1].[tmp_table1](id:{first.id})"
    assert first.perm == f"[tmp_db1].[tmp_table1](id:{first.id})"
    assert second.perm == f"[tmp_db1].[tmp_table2](id:{second.id})"

    # Re-fetch the database and rename it.
    database = db.session.query(Database).filter_by(database_name="tmp_db1").one()
    database.database_name = "tmp_db2"
    db.session.commit()

    # The permissions under the old database name are gone ...
    assert find_dataset_pvm(f"[tmp_db1].[tmp_table1](id:{first.id})") is None
    assert find_dataset_pvm(f"[tmp_db1].[tmp_table2](id:{second.id})") is None

    # ... and exist under the new database name.
    assert find_dataset_pvm(f"[tmp_db2].[tmp_table1](id:{first.id})") is not None
    assert find_dataset_pvm(f"[tmp_db2].[tmp_table2](id:{second.id})") is not None
    assert chart.perm == f"[tmp_db2].[tmp_table1](id:{first.id})"
    assert first.perm == f"[tmp_db2].[tmp_table1](id:{first.id})"
    assert second.perm == f"[tmp_db2].[tmp_table2](id:{second.id})"

    # The update hook fired for the database and for each dataset.
    database_view_menu = security_manager.find_view_menu(
        f"[tmp_db2].(id:{database.id})"
    )
    first_view_menu = security_manager.find_view_menu(
        f"[tmp_db2].[tmp_table1](id:{first.id})"
    )
    second_view_menu = security_manager.find_view_menu(
        f"[tmp_db2].[tmp_table2](id:{second.id})"
    )
    expected_calls = [
        call(ANY, ANY, view_menu)
        for view_menu in (database_view_menu, first_view_menu, second_view_menu)
    ]
    security_manager.on_view_menu_after_update.assert_has_calls(expected_calls)

    # Cleanup, dependants first.
    for obj in (chart, first, second, database):
        db.session.delete(obj)
    db.session.commit()
def test_after_delete_database(self):
    """
    Security: deleting a database removes its ``database_access`` PVM,
    both from the roles that held it and from the permission tables.
    """
    database = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
    db.session.add(database)
    db.session.commit()
    database = db.session.query(Database).filter_by(database_name="tmp_db1").one()

    # Grant the database permission to a throwaway role
    granted_pvm = security_manager.find_permission_view_menu(
        "database_access", database.perm
    )
    self.assertIsNotNone(granted_pvm)
    role = Role(name="tmp_role1")
    role.permissions.append(granted_pvm)
    db.session.add(role)
    db.session.commit()

    db.session.delete(database)
    db.session.commit()

    # The PVM is no longer attached to the role ...
    role = security_manager.find_role("tmp_role1")
    self.assertEqual(role.permissions, [])
    # ... and can no longer be looked up at all
    stale_pvm = security_manager.find_permission_view_menu(
        "database_access", f"[tmp_db1].(id:{database.id})"
    )
    self.assertIsNone(stale_pvm)

    # Cleanup
    db.session.delete(role)
    db.session.commit()
def test_after_delete_database_rollback(self):
    """
    Security: a database delete that is flushed and then rolled back
    leaves the role's ``database_access`` permission in place.
    """
    tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
    db.session.add(tmp_db1)
    db.session.commit()
    tmp_db1 = db.session.query(Database).filter_by(database_name="tmp_db1").one()
    database_pvm = security_manager.find_permission_view_menu(
        "database_access", tmp_db1.perm
    )
    self.assertIsNotNone(database_pvm)
    # Attach the database permission to a temporary role
    role1 = Role(name="tmp_role1")
    role1.permissions.append(database_pvm)
    db.session.add(role1)
    db.session.commit()
    # Flush (rather than commit) the delete so it can still be rolled back
    db.session.delete(tmp_db1)
    db.session.flush()
    # Before the rollback the permission is gone from the role and the tables
    role1 = security_manager.find_role("tmp_role1")
    self.assertEqual(role1.permissions, [])
    self.assertIsNone(
        security_manager.find_permission_view_menu(
            "database_access", f"[tmp_db1].(id:{tmp_db1.id})"
        )
    )
    db.session.rollback()
    # Test a rollback reverts everything
    database_pvm = security_manager.find_permission_view_menu(
        "database_access", f"[tmp_db1].(id:{tmp_db1.id})"
    )
    role1 = security_manager.find_role("tmp_role1")
    self.assertEqual(role1.permissions, [database_pvm])
    # Cleanup
    db.session.delete(role1)
    db.session.delete(tmp_db1)
    db.session.commit()
def test_after_delete_dataset(self):
    """
    Security: deleting a dataset removes its ``datasource_access`` PVM and
    view menu, detaches the PVM from roles, and fires the
    ``on_permission_view_after_delete`` hook.
    """
    # Patch the hook only for the duration of this test. Assigning a Mock
    # directly onto the shared security manager would leave it in place
    # for every test that runs afterwards.
    with patch.object(
        security_manager, "on_permission_view_after_delete"
    ) as delete_hook:
        tmp_db = Database(database_name="tmp_db", sqlalchemy_uri="sqlite://")
        db.session.add(tmp_db)
        db.session.commit()
        table1 = SqlaTable(
            schema="tmp_schema",
            table_name="tmp_table1",
            database=tmp_db,
        )
        db.session.add(table1)
        db.session.commit()
        table1_pvm = security_manager.find_permission_view_menu(
            "datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
        )
        self.assertIsNotNone(table1_pvm)
        # Attach the dataset permission to a temporary role
        role1 = Role(name="tmp_role1")
        role1.permissions.append(table1_pvm)
        db.session.add(role1)
        db.session.commit()
        # refresh
        table1 = (
            db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
        )
        # Test delete
        db.session.delete(table1)
        db.session.commit()
        role1 = security_manager.find_role("tmp_role1")
        self.assertEqual(role1.permissions, [])
        table1_pvm = security_manager.find_permission_view_menu(
            "datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
        )
        self.assertIsNone(table1_pvm)
        table1_view_menu = security_manager.find_view_menu(
            f"[tmp_db].[tmp_table1](id:{table1.id})"
        )
        self.assertIsNone(table1_view_menu)
        # Assert the hook is called
        delete_hook.assert_has_calls(
            [
                call(ANY, ANY, ANY),
            ]
        )
        # cleanup
        db.session.delete(role1)
        db.session.delete(tmp_db)
        db.session.commit()
def test_after_delete_dataset_rollback(self):
    """
    Security: a dataset delete that is flushed and then rolled back
    restores the dataset's ``datasource_access`` PVM on the role.
    """
    tmp_db = Database(database_name="tmp_db", sqlalchemy_uri="sqlite://")
    db.session.add(tmp_db)
    db.session.commit()
    table1 = SqlaTable(
        schema="tmp_schema",
        table_name="tmp_table1",
        database=tmp_db,
    )
    db.session.add(table1)
    db.session.commit()
    table1_pvm = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
    )
    self.assertIsNotNone(table1_pvm)
    # Attach the dataset permission to a temporary role
    role1 = Role(name="tmp_role1")
    role1.permissions.append(table1_pvm)
    db.session.add(role1)
    db.session.commit()
    # refresh
    table1 = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
    # Test delete, permissions are correctly deleted
    # (flushed, not committed, so the delete can still be rolled back)
    db.session.delete(table1)
    db.session.flush()
    role1 = security_manager.find_role("tmp_role1")
    self.assertEqual(role1.permissions, [])
    table1_pvm = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
    )
    self.assertIsNone(table1_pvm)
    # Test rollback: the permission exists again and is back on the role
    db.session.rollback()
    role1 = security_manager.find_role("tmp_role1")
    table1_pvm = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
    )
    self.assertIsNotNone(table1_pvm)
    self.assertEqual(role1.permissions, [table1_pvm])
    # cleanup
    db.session.delete(table1)
    db.session.delete(role1)
    db.session.delete(tmp_db)
    db.session.commit()
def test_after_update_dataset__name_changes(self):
    """
    Security: renaming a dataset replaces its ``datasource_access`` PVM,
    updates the ``perm`` of the dataset and its charts, and fires the
    ``on_view_menu_after_update`` hook with the renamed view menu.
    """
    # Patch the hook only for the duration of this test. Assigning a Mock
    # directly onto the shared security manager would leave it in place
    # for every test that runs afterwards.
    with patch.object(security_manager, "on_view_menu_after_update") as update_hook:
        tmp_db = Database(database_name="tmp_db", sqlalchemy_uri="sqlite://")
        db.session.add(tmp_db)
        db.session.commit()
        table1 = SqlaTable(
            schema="tmp_schema",
            table_name="tmp_table1",
            database=tmp_db,
        )
        db.session.add(table1)
        db.session.commit()
        slice1 = Slice(
            datasource_id=table1.id,
            datasource_type=DatasourceType.TABLE,
            datasource_name="tmp_table1",
            slice_name="tmp_slice1",
        )
        db.session.add(slice1)
        db.session.commit()
        table1_pvm = security_manager.find_permission_view_menu(
            "datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
        )
        self.assertIsNotNone(table1_pvm)
        # refresh
        table1 = (
            db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
        )
        # Test update
        table1.table_name = "tmp_table1_changed"
        db.session.commit()
        # Test old permission does not exist
        old_table1_pvm = security_manager.find_permission_view_menu(
            "datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
        )
        self.assertIsNone(old_table1_pvm)
        # Test new permission exist
        new_table1_pvm = security_manager.find_permission_view_menu(
            "datasource_access", f"[tmp_db].[tmp_table1_changed](id:{table1.id})"
        )
        self.assertIsNotNone(new_table1_pvm)
        # test dataset permission changed
        changed_table1 = (
            db.session.query(SqlaTable)
            .filter_by(table_name="tmp_table1_changed")
            .one()
        )
        self.assertEqual(
            changed_table1.perm, f"[tmp_db].[tmp_table1_changed](id:{table1.id})"
        )
        # Test Chart permission changed
        slice1 = db.session.query(Slice).filter_by(slice_name="tmp_slice1").one()
        self.assertEqual(
            slice1.perm, f"[tmp_db].[tmp_table1_changed](id:{table1.id})"
        )
        # Assert hook is called
        view_menu_dataset = security_manager.find_view_menu(
            f"[tmp_db].[tmp_table1_changed](id:{table1.id})"
        )
        update_hook.assert_has_calls(
            [
                call(ANY, ANY, view_menu_dataset),
            ]
        )
        # cleanup
        db.session.delete(slice1)
        db.session.delete(table1)
        db.session.delete(tmp_db)
        db.session.commit()
def test_after_update_dataset_rollback(self):
    """
    Security: a dataset rename that is flushed and then rolled back
    restores the original ``datasource_access`` PVM.
    """
    tmp_db = Database(database_name="tmp_db", sqlalchemy_uri="sqlite://")
    db.session.add(tmp_db)
    db.session.commit()
    table1 = SqlaTable(
        schema="tmp_schema",
        table_name="tmp_table1",
        database=tmp_db,
    )
    db.session.add(table1)
    db.session.commit()
    slice1 = Slice(
        datasource_id=table1.id,
        datasource_type=DatasourceType.TABLE,
        datasource_name="tmp_table1",
        slice_name="tmp_slice1",
    )
    db.session.add(slice1)
    db.session.commit()
    # refresh
    table1 = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
    # Test update (flushed, not committed, so it can still be rolled back)
    table1.table_name = "tmp_table1_changed"
    db.session.flush()
    # Test old permission does not exist
    old_table1_pvm = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
    )
    self.assertIsNone(old_table1_pvm)
    # Test new permission exist
    new_table1_pvm = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db].[tmp_table1_changed](id:{table1.id})"
    )
    self.assertIsNotNone(new_table1_pvm)
    # Test rollback: the permission under the original name is back
    db.session.rollback()
    old_table1_pvm = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db].[tmp_table1](id:{table1.id})"
    )
    self.assertIsNotNone(old_table1_pvm)
    # cleanup
    db.session.delete(slice1)
    db.session.delete(table1)
    db.session.delete(tmp_db)
    db.session.commit()
def test_after_update_dataset__db_changes(self):
    """
    Security: moving a dataset to another database replaces its
    ``datasource_access`` PVM and updates ``perm`` / ``schema_perm`` on
    the dataset and on its charts.
    """
    tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
    tmp_db2 = Database(database_name="tmp_db2", sqlalchemy_uri="sqlite://")
    db.session.add(tmp_db1)
    db.session.add(tmp_db2)
    db.session.commit()
    table1 = SqlaTable(
        schema="tmp_schema",
        table_name="tmp_table1",
        database=tmp_db1,
    )
    db.session.add(table1)
    db.session.commit()
    slice1 = Slice(
        datasource_id=table1.id,
        datasource_type=DatasourceType.TABLE,
        datasource_name="tmp_table1",
        slice_name="tmp_slice1",
    )
    db.session.add(slice1)
    db.session.commit()
    table1_pvm = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db1].[tmp_table1](id:{table1.id})"
    )
    self.assertIsNotNone(table1_pvm)
    # refresh
    table1 = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
    # Test update
    table1.database = tmp_db2
    db.session.commit()
    # Test old permission does not exist
    table1_pvm = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db1].[tmp_table1](id:{table1.id})"
    )
    self.assertIsNone(table1_pvm)
    # Test new permission exist
    table1_pvm = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db2].[tmp_table1](id:{table1.id})"
    )
    self.assertIsNotNone(table1_pvm)
    # test dataset permission and schema permission changed
    changed_table1 = (
        db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
    )
    self.assertEqual(changed_table1.perm, f"[tmp_db2].[tmp_table1](id:{table1.id})")
    self.assertEqual(changed_table1.schema_perm, "[tmp_db2].[tmp_schema]")
    # Test Chart permission changed
    slice1 = db.session.query(Slice).filter_by(slice_name="tmp_slice1").one()
    self.assertEqual(slice1.perm, f"[tmp_db2].[tmp_table1](id:{table1.id})")
    # Plain literal: the expected value has no placeholders, so no f-prefix
    self.assertEqual(slice1.schema_perm, "[tmp_db2].[tmp_schema]")
    # cleanup
    db.session.delete(slice1)
    db.session.delete(table1)
    db.session.delete(tmp_db1)
    db.session.delete(tmp_db2)
    db.session.commit()
def test_after_update_dataset__schema_changes(self):
    """
    Security: changing a dataset's schema keeps its ``datasource_access``
    PVM and ``perm`` but updates ``schema_perm`` on the dataset and its
    charts.
    """
    database = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
    db.session.add(database)
    db.session.commit()

    dataset = SqlaTable(
        schema="tmp_schema",
        table_name="tmp_table1",
        database=database,
    )
    db.session.add(dataset)
    db.session.commit()

    chart = Slice(
        datasource_id=dataset.id,
        datasource_type=DatasourceType.TABLE,
        datasource_name="tmp_table1",
        slice_name="tmp_slice1",
    )
    db.session.add(chart)
    db.session.commit()

    pvm_before = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db1].[tmp_table1](id:{dataset.id})"
    )
    self.assertIsNotNone(pvm_before)

    # Reload the dataset, then move it to another schema
    dataset = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
    dataset.schema = "tmp_schema_changed"
    db.session.commit()

    # The datasource permission is untouched by a schema change
    pvm_after = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db1].[tmp_table1](id:{dataset.id})"
    )
    self.assertIsNotNone(pvm_after)

    # Dataset: same perm, new schema perm
    reloaded_dataset = (
        db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
    )
    self.assertEqual(
        reloaded_dataset.perm, f"[tmp_db1].[tmp_table1](id:{dataset.id})"
    )
    self.assertEqual(reloaded_dataset.schema_perm, "[tmp_db1].[tmp_schema_changed]")

    # Chart: same perm, new schema perm
    chart = db.session.query(Slice).filter_by(slice_name="tmp_slice1").one()
    self.assertEqual(chart.perm, f"[tmp_db1].[tmp_table1](id:{dataset.id})")
    self.assertEqual(chart.schema_perm, "[tmp_db1].[tmp_schema_changed]")

    # Cleanup
    db.session.delete(chart)
    db.session.delete(dataset)
    db.session.delete(database)
    db.session.commit()
def test_after_update_dataset__schema_none(self):
    """
    Security: clearing a dataset's schema leaves ``perm`` unchanged and
    sets ``schema_perm`` to ``None``.
    """
    database = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
    db.session.add(database)
    db.session.commit()

    dataset = SqlaTable(
        schema="tmp_schema",
        table_name="tmp_table1",
        database=database,
    )
    db.session.add(dataset)
    db.session.commit()

    chart = Slice(
        datasource_id=dataset.id,
        datasource_type=DatasourceType.TABLE,
        datasource_name="tmp_table1",
        slice_name="tmp_slice1",
    )
    db.session.add(chart)
    db.session.commit()

    dataset_pvm = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db1].[tmp_table1](id:{dataset.id})"
    )
    self.assertIsNotNone(dataset_pvm)

    # Reload the dataset, then drop its schema
    dataset = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
    dataset.schema = None
    db.session.commit()

    # Reload again and check the resulting permissions
    dataset = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
    self.assertEqual(dataset.perm, f"[tmp_db1].[tmp_table1](id:{dataset.id})")
    self.assertIsNone(dataset.schema_perm)

    # Cleanup
    db.session.delete(chart)
    db.session.delete(dataset)
    db.session.delete(database)
    db.session.commit()
def test_after_update_dataset__name_db_changes(self):
    """
    Security: renaming a dataset and moving it to another database in the
    same commit replaces its ``datasource_access`` PVM and updates
    ``perm`` / ``schema_perm`` on the dataset and on its charts.
    """
    tmp_db1 = Database(database_name="tmp_db1", sqlalchemy_uri="sqlite://")
    tmp_db2 = Database(database_name="tmp_db2", sqlalchemy_uri="sqlite://")
    db.session.add(tmp_db1)
    db.session.add(tmp_db2)
    db.session.commit()
    table1 = SqlaTable(
        schema="tmp_schema",
        table_name="tmp_table1",
        database=tmp_db1,
    )
    db.session.add(table1)
    db.session.commit()
    slice1 = Slice(
        datasource_id=table1.id,
        datasource_type=DatasourceType.TABLE,
        datasource_name="tmp_table1",
        slice_name="tmp_slice1",
    )
    db.session.add(slice1)
    db.session.commit()
    table1_pvm = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db1].[tmp_table1](id:{table1.id})"
    )
    self.assertIsNotNone(table1_pvm)
    # refresh
    table1 = db.session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
    # Test update
    table1.table_name = "tmp_table1_changed"
    table1.database = tmp_db2
    db.session.commit()
    # Test old permission does not exist
    table1_pvm = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db1].[tmp_table1](id:{table1.id})"
    )
    self.assertIsNone(table1_pvm)
    # Test new permission exist
    table1_pvm = security_manager.find_permission_view_menu(
        "datasource_access", f"[tmp_db2].[tmp_table1_changed](id:{table1.id})"
    )
    self.assertIsNotNone(table1_pvm)
    # test dataset permission and schema permission changed
    changed_table1 = (
        db.session.query(SqlaTable).filter_by(table_name="tmp_table1_changed").one()
    )
    self.assertEqual(
        changed_table1.perm, f"[tmp_db2].[tmp_table1_changed](id:{table1.id})"
    )
    self.assertEqual(changed_table1.schema_perm, "[tmp_db2].[tmp_schema]")
    # Test Chart permission changed
    slice1 = db.session.query(Slice).filter_by(slice_name="tmp_slice1").one()
    self.assertEqual(slice1.perm, f"[tmp_db2].[tmp_table1_changed](id:{table1.id})")
    # Plain literal: the expected value has no placeholders, so no f-prefix
    self.assertEqual(slice1.schema_perm, "[tmp_db2].[tmp_schema]")
    # cleanup
    db.session.delete(slice1)
    db.session.delete(table1)
    db.session.delete(tmp_db1)
    db.session.delete(tmp_db2)
    db.session.commit()
def test_hybrid_perm_database(self):
    """
    Security: ``Database.perm`` can be used as a query filter and agrees
    with ``get_perm()`` on the fetched row.
    """
    database = Database(database_name="tmp_database3", sqlalchemy_uri="sqlite://")
    db.session.add(database)

    database_id = (
        db.session.query(Database.id)
        .filter_by(database_name="tmp_database3")
        .scalar()
    )
    # Look the row up again, this time through its perm value
    expected_perm = f"[tmp_database3].(id:{database_id})"
    fetched = db.session.query(Database).filter_by(perm=expected_perm).one()

    self.assertEqual(fetched.get_perm(), fetched.perm)
    self.assertEqual(fetched.id, database_id)
    self.assertEqual(fetched.database_name, "tmp_database3")

    db.session.delete(database)
    db.session.commit()
def test_set_perm_slice(self):
    """
    Security: a chart's ``perm`` and ``schema_perm`` mirror those of its
    dataset, both at creation and after the dataset's schema and name
    change.
    """
    database = Database(database_name="tmp_database", sqlalchemy_uri="sqlite://")
    table = SqlaTable(table_name="tmp_perm_table", database=database)
    db.session.add(database)
    db.session.add(table)
    db.session.commit()
    # no schema permission
    # (the chart is named `chart`, not `slice`, to avoid shadowing the builtin)
    chart = Slice(
        datasource_id=table.id,
        datasource_type=DatasourceType.TABLE,
        datasource_name="tmp_perm_table",
        slice_name="slice_name",
    )
    db.session.add(chart)
    db.session.commit()
    chart = db.session.query(Slice).filter_by(slice_name="slice_name").one()
    self.assertEqual(chart.perm, table.perm)
    self.assertEqual(chart.perm, f"[tmp_database].[tmp_perm_table](id:{table.id})")
    self.assertEqual(chart.schema_perm, table.schema_perm)
    self.assertIsNone(chart.schema_perm)
    # Give the dataset a schema and a new name in one commit
    table.schema = "tmp_perm_schema"
    table.table_name = "tmp_perm_table_v2"
    db.session.commit()
    table = (
        db.session.query(SqlaTable).filter_by(table_name="tmp_perm_table_v2").one()
    )
    self.assertEqual(chart.perm, table.perm)
    self.assertEqual(
        chart.perm, f"[tmp_database].[tmp_perm_table_v2](id:{table.id})"
    )
    self.assertEqual(
        table.perm, f"[tmp_database].[tmp_perm_table_v2](id:{table.id})"
    )
    self.assertEqual(chart.schema_perm, table.schema_perm)
    self.assertEqual(chart.schema_perm, "[tmp_database].[tmp_perm_schema]")
    # cleanup
    db.session.delete(chart)
    db.session.delete(table)
    db.session.delete(database)
    db.session.commit()
@patch("superset.utils.core.g")
@patch("superset.security.manager.g")
def test_schemas_accessible_by_user_admin(self, mock_sm_g, mock_g):
    """Security: for the admin user the requested schema set comes back unchanged."""
    admin = security_manager.find_user("admin")
    mock_g.user = admin
    mock_sm_g.user = admin
    requested = {"1", "2", "3"}
    with self.client.application.test_request_context():
        accessible = security_manager.get_schemas_accessible_by_user(
            get_example_database(), None, requested
        )
    self.assertEqual(accessible, {"1", "2", "3"})  # no changes
@patch("superset.utils.core.g")
@patch("superset.security.manager.g")
def test_schemas_accessible_by_user_schema_access(self, mock_sm_g, mock_g):
    """
    Security: with the ``[examples].[1]`` schema permission created, the
    gamma user gets only schema ``1`` back from the requested set.
    """
    # User has schema access to the schema 1
    create_schema_perm("[examples].[1]")
    # Remove the permission even when an assertion fails, so it cannot
    # leak into the other schema-access tests.
    try:
        mock_g.user = mock_sm_g.user = security_manager.find_user("gamma")
        with self.client.application.test_request_context():
            database = get_example_database()
            schemas = security_manager.get_schemas_accessible_by_user(
                database, None, {"1", "2", "3"}
            )
        # temp_schema is not passed in the params
        self.assertEqual(schemas, {"1"})
    finally:
        delete_schema_perm("[examples].[1]")
def test_schemas_accessible_by_user_datasource_access(self):
    """
    Security: the gamma user gets only ``temp_schema`` back from the
    requested schema set.
    """
    # User has schema access to the datasource temp_schema.wb_health_population in examples DB.
    examples_db = get_example_database()
    requested = {"temp_schema", "2", "3"}
    with self.client.application.test_request_context(), override_user(
        security_manager.find_user("gamma")
    ):
        accessible = security_manager.get_schemas_accessible_by_user(
            examples_db, None, requested
        )
        self.assertEqual(accessible, {"temp_schema"})
def test_schemas_accessible_by_user_datasource_and_schema_access(self):
    """
    Security: with the ``[examples].[2]`` schema permission created, the
    gamma user gets both ``temp_schema`` and ``2`` back from the
    requested set.
    """
    # Schema 2 is granted explicitly here; temp_schema is expected to be
    # reachable through datasource access (temp_schema.wb_health_population).
    create_schema_perm("[examples].[2]")
    # Remove the permission even when an assertion fails, so it cannot
    # leak into the other schema-access tests.
    try:
        with self.client.application.test_request_context():
            database = get_example_database()
            with override_user(security_manager.find_user("gamma")):
                schemas = security_manager.get_schemas_accessible_by_user(
                    database, None, {"temp_schema", "2", "3"}
                )
                self.assertEqual(schemas, {"temp_schema", "2"})
        vm = security_manager.find_permission_view_menu(
            "schema_access", "[examples].[2]"
        )
        self.assertIsNotNone(vm)
    finally:
        delete_schema_perm("[examples].[2]")
@pytest.mark.usefixtures("load_world_bank_dashboard_with_slices")
def test_gamma_user_schema_access_to_dashboards(self):
    """
    Security: the dashboard list served to the gamma user contains the
    published ``world_health`` dashboard and not ``births``.
    """
    world_health = (
        db.session.query(Dashboard).filter_by(slug="world_health").first()
    )
    world_health.published = True
    db.session.commit()

    self.login(GAMMA_USERNAME)
    response = self.client.get("api/v1/dashboard/")
    payload = str(response.data)
    self.assertIn("/superset/dashboard/world_health/", payload)
    self.assertNotIn("/superset/dashboard/births/", payload)
@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
@pytest.mark.usefixtures("public_role_like_gamma")
def test_public_sync_role_data_perms(self):
    """
    Security: syncing role definitions keeps a data access permission
    already granted to the public role and drops a non data access
    permission that was added to it.
    """
    birth_names = (
        db.session.query(SqlaTable).filter_by(table_name="birth_names").one()
    )
    self.grant_public_access_to_table(birth_names)

    # Add a permission that the sync is expected to strip again
    role = security_manager.get_public_role()
    security_menu_pvm = security_manager.find_permission_view_menu(
        "menu_access", "Security"
    )
    role.permissions.append(security_menu_pvm)
    db.session.commit()

    security_manager.sync_role_definitions()

    role = security_manager.get_public_role()
    view_menu_names = [pvm.view_menu.name for pvm in role.permissions]
    assert birth_names.get_perm() in view_menu_names
    assert "Security" not in view_menu_names

    # Cleanup
    self.revoke_public_access_to_table(birth_names)
@pytest.mark.usefixtures("public_role_like_test_role")
def test_public_sync_role_builtin_perms(self):
    """
    Security: every entry configured under ``FAB_ROLES["TestRole"]`` is
    present on the public role.
    """
    role = security_manager.get_public_role()
    # [view menu name, permission name] pairs held by the public role
    granted_pairs = [
        [pvm.view_menu.name, pvm.permission.name] for pvm in role.permissions
    ]
    expected_pairs = current_app.config["FAB_ROLES"]["TestRole"]
    for expected in expected_pairs:
        assert expected in granted_pairs
@pytest.mark.usefixtures("load_world_bank_dashboard_with_slices")
def test_sqllab_gamma_user_schema_access_to_sqllab(self):
    """
    Security: the database list API, filtered to databases exposed in
    SQL Lab, returns exactly one database for the gamma user.
    """
    examples_db = (
        db.session.query(Database).filter_by(database_name="examples").one()
    )
    examples_db.expose_in_sqllab = True
    db.session.commit()

    rison_query = {
        "keys": ["none"],
        "columns": ["expose_in_sqllab"],
        "filters": [{"col": "expose_in_sqllab", "opr": "eq", "value": True}],
        "order_columns": "database_name",
        "order_direction": "asc",
        "page": 0,
        "page_size": -1,
    }
    endpoint = f"/api/v1/database/?q={prison.dumps(rison_query)}"

    self.login(GAMMA_USERNAME)
    response = self.client.get(endpoint)
    self.assertEqual(response.json["count"], 1)
def assert_can_read(self, view_menu, permissions_set):
    """
    Assert that ``permissions_set`` grants read access to ``view_menu``.

    Views listed in ``NEW_SECURITY_CONVERGE_VIEWS`` are checked for
    ``can_read``; all other views are checked for ``can_list``.
    """
    converged = view_menu in NEW_SECURITY_CONVERGE_VIEWS
    read_permission = "can_read" if converged else "can_list"
    self.assertIn((read_permission, view_menu), permissions_set)
def assert_can_write(self, view_menu, permissions_set):
    """
    Assert that ``permissions_set`` grants write access to ``view_menu``.

    Views listed in ``NEW_SECURITY_CONVERGE_VIEWS`` are checked for
    ``can_write``; all other views are checked for ``can_add``,
    ``can_delete`` and ``can_edit``.
    """
    if view_menu in NEW_SECURITY_CONVERGE_VIEWS:
        required = ("can_write",)
    else:
        required = ("can_add", "can_delete", "can_edit")
    for permission_name in required:
        self.assertIn((permission_name, view_menu), permissions_set)
def assert_cannot_write(self, view_menu, permissions_set):
    """
    Assert that ``permissions_set`` grants no write access to ``view_menu``.

    Views listed in ``NEW_SECURITY_CONVERGE_VIEWS`` are checked for the
    absence of ``can_write``; all other views are checked for the absence
    of ``can_add``, ``can_delete``, ``can_edit`` and ``can_save``.
    """
    if view_menu in NEW_SECURITY_CONVERGE_VIEWS:
        forbidden = ("can_write",)
    else:
        forbidden = ("can_add", "can_delete", "can_edit", "can_save")
    for permission_name in forbidden:
        self.assertNotIn((permission_name, view_menu), permissions_set)
def assert_can_all(self, view_menu, permissions_set):
    """Assert both read and write access to ``view_menu``, read first."""
    for check in (self.assert_can_read, self.assert_can_write):
        check(view_menu, permissions_set)
def assert_can_menu(self, view_menu, permissions_set):
    """Assert that ``permissions_set`` contains ``menu_access`` on ``view_menu``."""
    menu_permission = ("menu_access", view_menu)
    self.assertIn(menu_permission, permissions_set)
def assert_cannot_menu(self, view_menu, permissions_set):
    """Assert that ``permissions_set`` lacks ``menu_access`` on ``view_menu``."""
    menu_permission = ("menu_access", view_menu)
    self.assertNotIn(menu_permission, permissions_set)
def assert_cannot_gamma(self, perm_set):
    """
    Assert that ``perm_set`` lacks the write, menu and CSV-upload
    permissions listed below.
    """
    for view in ("Annotation", "CssTemplate"):
        self.assert_cannot_write(view, perm_set)

    hidden_menus = (
        "SQL Lab",
        "CSS Templates",
        "Annotation Layers",
        "Manage",
        "Queries",
        "Import dashboards",
        "Upload a CSV",
        "ReportSchedule",
        "Alerts & Report",
    )
    for menu in hidden_menus:
        self.assert_cannot_menu(menu, perm_set)

    self.assertNotIn(("can_csv_upload", "Database"), perm_set)
def assert_can_gamma(self, perm_set):
    """
    Assert that ``perm_set`` contains the read, write, Superset-view and
    menu permissions listed below.
    """
    self.assert_can_read("Dataset", perm_set)

    # make sure that user can create slices and dashboards
    self.assert_can_all("Dashboard", perm_set)
    self.assert_can_all("Chart", perm_set)

    # Each permission is asserted exactly once
    expected_permissions = (
        ("can_csv", "Superset"),
        ("can_dashboard", "Superset"),
        ("can_explore", "Superset"),
        ("can_share_chart", "Superset"),
        ("can_share_dashboard", "Superset"),
        ("can_explore_json", "Superset"),
        ("can_userinfo", "UserDBModelView"),
        ("can_view_chart_as_table", "Dashboard"),
        ("can_view_query", "Dashboard"),
    )
    for permission in expected_permissions:
        self.assertIn(permission, perm_set)

    for menu in ("Databases", "Datasets", "Data", "Charts", "Dashboards"):
        self.assert_can_menu(menu, perm_set)
def assert_can_alpha(self, perm_set):
    """
    Assert that ``perm_set`` contains the full-access, database, menu and
    ``all_datasource_access`` permissions listed below.
    """
    self.assert_can_all("Annotation", perm_set)
    self.assert_can_all("CssTemplate", perm_set)
    self.assert_can_all("Dataset", perm_set)
    self.assert_can_read("Database", perm_set)
    self.assertIn(("can_csv_upload", "Database"), perm_set)
    self.assert_can_menu("Manage", perm_set)
    self.assert_can_menu("Annotation Layers", perm_set)
    self.assert_can_menu("CSS Templates", perm_set)
    self.assertIn(("all_datasource_access", "all_datasource_access"), perm_set)
def assert_cannot_alpha(self, perm_set):
    """
    Assert that ``perm_set`` grants no write access to queries, roles,
    users or databases.
    """
    self.assert_cannot_write("Queries", perm_set)
    self.assert_cannot_write("RoleModelView", perm_set)
    self.assert_cannot_write("UserDBModelView", perm_set)
    self.assert_cannot_write("Database", perm_set)
def assert_can_admin(self, perm_set):
    """
    Assert that ``perm_set`` contains full access to databases, roles and
    users, ``all_database_access`` and the security menus.
    """
    self.assert_can_all("Database", perm_set)
    self.assert_can_all("RoleModelView", perm_set)
    self.assert_can_all("UserDBModelView", perm_set)

    self.assertIn(("all_database_access", "all_database_access"), perm_set)

    self.assert_can_menu("Security", perm_set)
    self.assert_can_menu("List Users", perm_set)
    self.assert_can_menu("List Roles", perm_set)
def test_is_admin_only(self):
    """
    Security: ``_is_admin_only`` is false for dataset read and
    ``all_datasource_access``, and true for log read and user edit.
    """
    self.assertFalse(
        security_manager._is_admin_only(
            security_manager.find_permission_view_menu("can_read", "Dataset")
        )
    )
    self.assertFalse(
        security_manager._is_admin_only(
            security_manager.find_permission_view_menu(
                "all_datasource_access", "all_datasource_access"
            )
        )
    )

    log_permissions = ["can_read"]
    for log_permission in log_permissions:
        self.assertTrue(
            security_manager._is_admin_only(
                security_manager.find_permission_view_menu(log_permission, "Log")
            )
        )

    self.assertTrue(
        security_manager._is_admin_only(
            security_manager.find_permission_view_menu(
                "can_edit", "UserDBModelView"
            )
        )
    )
@unittest.skipUnless(
    SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_is_alpha_only(self):
    """
    Security: ``_is_alpha_only`` is false for dataset read, and true for
    dataset write, ``all_datasource_access`` and ``all_database_access``.
    """
    self.assertFalse(
        security_manager._is_alpha_only(
            security_manager.find_permission_view_menu("can_read", "Dataset")
        )
    )

    self.assertTrue(
        security_manager._is_alpha_only(
            security_manager.find_permission_view_menu("can_write", "Dataset")
        )
    )
    self.assertTrue(
        security_manager._is_alpha_only(
            security_manager.find_permission_view_menu(
                "all_datasource_access", "all_datasource_access"
            )
        )
    )
    self.assertTrue(
        security_manager._is_alpha_only(
            security_manager.find_permission_view_menu(
                "all_database_access", "all_database_access"
            )
        )
    )
def test_is_gamma_pvm(self):
    """Security: ``_is_gamma_pvm`` is true for the dataset read permission."""
    self.assertTrue(
        security_manager._is_gamma_pvm(
            security_manager.find_permission_view_menu("can_read", "Dataset")
        )
    )
def test_gamma_permissions_basic(self):
    """
    Security: the Gamma role passes the gamma checks and fails none of
    the "cannot alpha" / "cannot gamma" checks.
    """
    # Fetch the role's permissions once and reuse them for every check
    gamma_perm_tuples = get_perm_tuples("Gamma")
    self.assert_can_gamma(gamma_perm_tuples)
    self.assert_cannot_alpha(gamma_perm_tuples)
    self.assert_cannot_gamma(gamma_perm_tuples)
@pytest.mark.usefixtures("public_role_like_gamma")
def test_public_permissions_basic(self):
    """Security: the Public role passes the gamma permission checks."""
    public_perm_tuples = get_perm_tuples("Public")
    self.assert_can_gamma(public_perm_tuples)
@unittest.skipUnless(
    SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_alpha_permissions(self):
    """
    Security: the Alpha role passes the gamma and alpha checks and holds
    neither of the ``UserInfoEditView`` form permissions.
    """
    perms = get_perm_tuples("Alpha")
    for check in (
        self.assert_can_gamma,
        self.assert_can_alpha,
        self.assert_cannot_alpha,
    ):
        check(perms)
    for form_permission in ("can_this_form_get", "can_this_form_post"):
        self.assertNotIn((form_permission, "UserInfoEditView"), perms)
@pytest.mark.usefixtures("load_world_bank_dashboard_with_slices")
def test_admin_permissions(self):
    """
    Security: the Admin role passes the gamma, alpha and admin checks.

    The test returns early, without asserting anything, when the backend
    is ``hive``.
    """
    if backend() == "hive":
        return

    # Fetch the role's permissions once and reuse them for every check
    admin_perm_tuples = get_perm_tuples("Admin")
    self.assert_can_gamma(admin_perm_tuples)
    self.assert_can_alpha(admin_perm_tuples)
    self.assert_can_admin(admin_perm_tuples)
def test_sql_lab_permissions(self):
    """
    Security: the ``sql_lab`` role holds exactly the permissions listed
    below and none of the "cannot alpha" write permissions.
    """
    sql_lab_set = get_perm_tuples("sql_lab")
    self.assertEqual(
        sql_lab_set,
        {
            ("can_activate", "TabStateView"),
            ("can_csv", "Superset"),
            ("can_delete_query", "TabStateView"),
            ("can_delete", "TabStateView"),
            ("can_execute_sql_query", "SQLLab"),
            ("can_export", "SavedQuery"),
            ("can_export_csv", "SQLLab"),
            ("can_get", "TabStateView"),
            ("can_get_results", "SQLLab"),
            ("can_migrate_query", "TabStateView"),
            ("can_sqllab", "Superset"),
            ("can_sqllab_history", "Superset"),
            ("can_put", "TabStateView"),
            ("can_post", "TabStateView"),
            ("can_write", "SavedQuery"),
            ("can_read", "Query"),
            ("can_read", "Database"),
            ("can_read", "SQLLab"),
            ("can_read", "SavedQuery"),
            ("menu_access", "Query Search"),
            ("menu_access", "Saved Queries"),
            ("menu_access", "SQL Editor"),
            ("menu_access", "SQL Lab"),
        },
    )

    self.assert_cannot_alpha(sql_lab_set)
def test_gamma_permissions(self):
    """
    Security: the Gamma role can fully manage dashboards and charts, read
    datasets, use the listed Superset views, and cannot write users or
    roles.
    """
    gamma_perm_set = {
        (perm.permission.name, perm.view_menu.name)
        for perm in security_manager.find_role("Gamma").permissions
    }

    # make sure that user can create slices and dashboards
    self.assert_can_all("Dashboard", gamma_perm_set)
    self.assert_can_all("Chart", gamma_perm_set)

    # check read only perms
    self.assert_can_read("Dataset", gamma_perm_set)

    self.assert_cannot_write("UserDBModelView", gamma_perm_set)
    self.assert_cannot_write("RoleModelView", gamma_perm_set)

    self.assertIn(("can_csv", "Superset"), gamma_perm_set)
    self.assertIn(("can_dashboard", "Superset"), gamma_perm_set)
    self.assertIn(("can_explore", "Superset"), gamma_perm_set)
    self.assertIn(("can_share_chart", "Superset"), gamma_perm_set)
    self.assertIn(("can_share_dashboard", "Superset"), gamma_perm_set)
    self.assertIn(("can_explore_json", "Superset"), gamma_perm_set)
    self.assertIn(("can_userinfo", "UserDBModelView"), gamma_perm_set)
    self.assertIn(("can_view_chart_as_table", "Dashboard"), gamma_perm_set)
    self.assertIn(("can_view_query", "Dashboard"), gamma_perm_set)
def test_views_are_secured(self):
    """Fail when a registered view exposes a URL without a permission name.

    Every bound method of every view in ``appbuilder.baseviews`` that has a
    ``_urls`` attribute must also have ``_permission_name``, unless the
    method or the (class, method) pair is allowlisted below.
    """
    # These FAB views are secured in their body as opposed to by decorators
    method_allowlist = ("action", "action_post")
    # List of redirect & other benign views
    views_allowlist = [
        ["MyIndexView", "index"],
        ["UtilView", "back"],
        ["LocaleView", "index"],
        ["AuthDBView", "login"],
        ["AuthDBView", "logout"],
        ["CurrentUserRestApi", "get_me"],
        ["CurrentUserRestApi", "get_my_roles"],
        ["UserRestApi", "avatar"],
        # TODO (embedded) remove Dashboard:embedded after uuids have been shipped
        ["Dashboard", "embedded"],
        ["EmbeddedView", "embedded"],
        ["R", "index"],
        ["Superset", "log"],
        ["Superset", "theme"],
        ["Superset", "welcome"],
        ["SecurityApi", "login"],
        ["SecurityApi", "refresh"],
        ["SupersetIndexView", "index"],
        ["DatabaseRestApi", "oauth2"],
    ]

    unsecured_views = []
    for view in appbuilder.baseviews:
        class_name = view.__class__.__name__
        bound_methods = inspect.getmembers(view, predicate=inspect.ismethod)
        for name, member in bound_methods:
            if name in method_allowlist:
                continue
            if [class_name, name] in views_allowlist:
                continue
            if not hasattr(member, "_urls"):
                # Not exposed as an endpoint.
                continue
            if hasattr(member, "_permission_name"):
                # Exposed, but carries a permission name.
                continue
            unsecured_views.append((class_name, name))

    if unsecured_views:
        view_str = "\n".join(map(str, unsecured_views))
        raise Exception(f"Some views are not secured:\n{view_str}")
class TestSecurityManager(SupersetTestCase):
    """
    Exercises the access-check entry points of the security manager.
    """

    @patch("superset.security.SupersetSecurityManager.raise_for_access")
    def test_can_access_datasource(self, mock_raise_for_access):
        """``can_access_datasource`` is True unless ``raise_for_access`` raises."""
        mocked_datasource = self.get_datasource_mock()

        # The patched check returns normally: access is reported as granted.
        mock_raise_for_access.return_value = None
        allowed = security_manager.can_access_datasource(datasource=mocked_datasource)
        self.assertTrue(allowed)

        # The patched check raises: access is reported as denied.
        access_error = SupersetError(
            "dummy",
            SupersetErrorType.DATASOURCE_SECURITY_ACCESS_ERROR,
            ErrorLevel.ERROR,
        )
        mock_raise_for_access.side_effect = SupersetSecurityException(access_error)
        allowed = security_manager.can_access_datasource(datasource=mocked_datasource)
        self.assertFalse(allowed)

    @patch("superset.security.SupersetSecurityManager.raise_for_access")
    def test_can_access_table(self, mock_raise_for_access):
        """``can_access_table`` is True unless ``raise_for_access`` raises."""
        example_db = get_example_database()
        target_table = Table("bar", "foo")

        mock_raise_for_access.return_value = None
        allowed = security_manager.can_access_table(example_db, target_table)
        self.assertTrue(allowed)

        access_error = SupersetError(
            "dummy", SupersetErrorType.TABLE_SECURITY_ACCESS_ERROR, ErrorLevel.ERROR
        )
        mock_raise_for_access.side_effect = SupersetSecurityException(access_error)
        allowed = security_manager.can_access_table(example_db, target_table)
        self.assertFalse(allowed)

    @patch("superset.security.SupersetSecurityManager.is_owner")
    @patch("superset.security.SupersetSecurityManager.can_access")
    @patch("superset.security.SupersetSecurityManager.can_access_schema")
    def test_raise_for_access_datasource(
        self, mock_can_access_schema, mock_can_access, mock_is_owner
    ):
        """Datasource access passes with schema access and raises without any."""
        mocked_datasource = self.get_datasource_mock()

        mock_can_access_schema.return_value = True
        security_manager.raise_for_access(datasource=mocked_datasource)

        # Turn every patched check off; the call must now be rejected.
        for check in (mock_can_access, mock_can_access_schema, mock_is_owner):
            check.return_value = False

        with self.assertRaises(SupersetSecurityException):
            security_manager.raise_for_access(datasource=mocked_datasource)

    @patch("superset.security.SupersetSecurityManager.is_owner")
    @patch("superset.security.SupersetSecurityManager.can_access")
    def test_raise_for_access_query(self, mock_can_access, mock_is_owner):
        """Query access follows the patched ``can_access`` / ``is_owner``."""
        mocked_query = Mock(
            database=get_example_database(), schema="bar", sql="SELECT * FROM foo"
        )

        mock_can_access.return_value = True
        security_manager.raise_for_access(query=mocked_query)

        for check in (mock_can_access, mock_is_owner):
            check.return_value = False

        with self.assertRaises(SupersetSecurityException):
            security_manager.raise_for_access(query=mocked_query)

    def test_raise_for_access_sql_fails(self):
        """Raw SQL access raises for the ``gamma`` user (nothing is patched)."""
        gamma = security_manager.find_user("gamma")
        with override_user(gamma), self.assertRaises(SupersetSecurityException):
            security_manager.raise_for_access(
                database=get_example_database(),
                schema="bar",
                sql="SELECT * FROM foo",
            )

    @patch("superset.security.SupersetSecurityManager.is_owner")
    @patch("superset.security.SupersetSecurityManager.can_access")
    def test_raise_for_access_sql(self, mock_can_access, mock_is_owner):
        """Raw SQL access does not raise when the patched checks return True."""
        for check in (mock_can_access, mock_is_owner):
            check.return_value = True

        gamma = security_manager.find_user("gamma")
        with override_user(gamma):
            security_manager.raise_for_access(
                database=get_example_database(), schema="bar", sql="SELECT * FROM foo"
            )

    @patch("superset.security.SupersetSecurityManager.is_owner")
    @patch("superset.security.SupersetSecurityManager.can_access")
    @patch("superset.security.SupersetSecurityManager.can_access_schema")
    def test_raise_for_access_query_context(
        self, mock_can_access_schema, mock_can_access, mock_is_owner
    ):
        """Query-context access passes with schema access, raises without any."""
        mocked_context = Mock(datasource=self.get_datasource_mock(), form_data={})

        mock_can_access_schema.return_value = True
        security_manager.raise_for_access(query_context=mocked_context)

        for check in (mock_can_access, mock_can_access_schema, mock_is_owner):
            check.return_value = False

        gamma = security_manager.find_user("gamma")
        with override_user(gamma), self.assertRaises(SupersetSecurityException):
            security_manager.raise_for_access(query_context=mocked_context)

    @patch("superset.security.SupersetSecurityManager.can_access")
    def test_raise_for_access_table(self, mock_can_access):
        """Table access follows the patched ``can_access``."""
        example_db = get_example_database()
        target_table = Table("bar", "foo")

        mock_can_access.return_value = True
        security_manager.raise_for_access(database=example_db, table=target_table)

        mock_can_access.return_value = False
        with self.assertRaises(SupersetSecurityException):
            security_manager.raise_for_access(database=example_db, table=target_table)

    @patch("superset.security.SupersetSecurityManager.is_owner")
    @patch("superset.security.SupersetSecurityManager.can_access")
    @patch("superset.security.SupersetSecurityManager.can_access_schema")
    def test_raise_for_access_viz(
        self, mock_can_access_schema, mock_can_access, mock_is_owner
    ):
        """Viz access passes with schema access and raises without any."""
        time_table_viz = viz.TimeTableViz(self.get_datasource_mock(), form_data={})

        mock_can_access_schema.return_value = True
        security_manager.raise_for_access(viz=time_table_viz)

        for check in (mock_can_access, mock_can_access_schema, mock_is_owner):
            check.return_value = False

        gamma = security_manager.find_user("gamma")
        with override_user(gamma), self.assertRaises(SupersetSecurityException):
            security_manager.raise_for_access(viz=time_table_viz)

    @pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
    @pytest.mark.usefixtures("load_world_bank_dashboard_with_slices")
    @with_feature_flags(DASHBOARD_RBAC=True)
    @patch("superset.security.SupersetSecurityManager.is_owner")
    @patch("superset.security.SupersetSecurityManager.can_access")
    @patch("superset.security.SupersetSecurityManager.can_access_schema")
    def test_raise_for_access_rbac(
        self,
        mock_can_access_schema,
        mock_can_access,
        mock_is_owner,
    ):
        """Check ``raise_for_access`` with the DASHBOARD_RBAC flag enabled.

        All three patched checks return False. For both the ``query_context``
        and the ``viz`` keyword, the call must raise for each rejected form
        data below and must not raise for the two accepted ones.
        """
        births = self.get_dash_by_slug("births")
        girls = self.get_slice("Girls")
        birth_names = girls.datasource

        world_health = self.get_dash_by_slug("world_health")
        treemap = self.get_slice("Treemap")

        native_filters = [
            {
                "id": "NATIVE_FILTER-ABCDEFGH",
                "targets": [{"datasetId": birth_names.id}],
            },
            {
                "id": "NATIVE_FILTER-IJKLMNOP",
                "targets": [{"datasetId": treemap.id}],
            },
        ]
        births.json_metadata = json.dumps(
            {"native_filter_configuration": native_filters}
        )

        for check in (mock_is_owner, mock_can_access, mock_can_access_schema):
            check.return_value = False

        def request_access(kwarg, form_data):
            # Call raise_for_access with a mock carrying the birth_names
            # datasource and the given form data under the given keyword.
            security_manager.raise_for_access(
                **{kwarg: Mock(datasource=birth_names, form_data=form_data)}
            )

        with override_user(security_manager.find_user("gamma")):
            for kwarg in ["query_context", "viz"]:
                births.roles = []

                # No dashboard roles.
                with self.assertRaises(SupersetSecurityException):
                    request_access(
                        kwarg,
                        {"dashboardId": births.id, "slice_id": girls.id},
                    )

                births.roles = [self.get_role("Gamma")]

                # Undefined dashboard.
                with self.assertRaises(SupersetSecurityException):
                    request_access(kwarg, {})

                # Undefined dashboard chart.
                with self.assertRaises(SupersetSecurityException):
                    request_access(kwarg, {"dashboardId": births.id})

                # Ill-defined dashboard chart.
                with self.assertRaises(SupersetSecurityException):
                    request_access(
                        kwarg,
                        {"dashboardId": births.id, "slice_id": treemap.id},
                    )

                # Dashboard chart not associated with said datasource.
                with self.assertRaises(SupersetSecurityException):
                    request_access(
                        kwarg,
                        {"dashboardId": world_health.id, "slice_id": treemap.id},
                    )

                # Dashboard chart associated with said datasource.
                request_access(
                    kwarg,
                    {"dashboardId": births.id, "slice_id": girls.id},
                )

                # Ill-defined native filter.
                with self.assertRaises(SupersetSecurityException):
                    request_access(
                        kwarg,
                        {"dashboardId": births.id, "type": "NATIVE_FILTER"},
                    )

                # Native filter not associated with said datasource.
                with self.assertRaises(SupersetSecurityException):
                    request_access(
                        kwarg,
                        {
                            "dashboardId": births.id,
                            "native_filter_id": "NATIVE_FILTER-IJKLMNOP",
                            "type": "NATIVE_FILTER",
                        },
                    )

                # Native filter associated with said datasource.
                request_access(
                    kwarg,
                    {
                        "dashboardId": births.id,
                        "native_filter_id": "NATIVE_FILTER-ABCDEFGH",
                        "type": "NATIVE_FILTER",
                    },
                )

    def test_get_user_roles(self):
        """``get_user_roles`` returns the roles of the overridden user."""
        admin = security_manager.find_user("admin")
        with override_user(admin):
            current_roles = security_manager.get_user_roles()
            self.assertEqual(admin.roles, current_roles)

    def test_get_anonymous_roles(self):
        """For the anonymous user, ``get_user_roles`` is just the public role."""
        anonymous = security_manager.get_anonymous_user()
        with override_user(anonymous):
            current_roles = security_manager.get_user_roles()
            self.assertEqual([security_manager.get_public_role()], current_roles)

    def test_all_database_access(self):
        """``all_database_access`` grants access to all databases and datasources."""
        gamma_user = security_manager.find_user(username="gamma")
        g.user = gamma_user

        # Double checking that gamma users can't access all databases
        assert not security_manager.can_access_all_databases()
        assert not security_manager.can_access_datasource(self.get_datasource_mock())

        all_db_pvm = ("all_database_access", "all_database_access")
        with self.temporary_user(gamma_user, extra_pvms=[all_db_pvm]):
            assert security_manager.can_access_all_databases()
            assert security_manager.can_access_datasource(self.get_datasource_mock())
class TestDatasources(SupersetTestCase):
    """Tests for ``security_manager.get_user_datasources``.

    ``SqlaTable.get_all_datasources``, ``can_access_database`` and the
    ``get_session`` query chain are all mocked.
    """

    @patch("superset.security.SupersetSecurityManager.can_access_database")
    @patch("superset.security.SupersetSecurityManager.get_session")
    def test_get_user_datasources_admin(
        self, mock_get_session, mock_can_access_database
    ):
        """With database access allowed, admin gets every datasource."""
        Datasource = namedtuple("Datasource", ["database", "schema", "name"])
        every_datasource = [
            Datasource("database1", "schema1", "table1"),
            Datasource("database1", "schema1", "table2"),
            Datasource("database2", None, "table1"),
        ]

        mock_can_access_database.return_value = True
        mock_get_session.query.return_value.filter.return_value.all.return_value = []

        with mock.patch.object(
            SqlaTable, "get_all_datasources", return_value=list(every_datasource)
        ):
            with override_user(security_manager.find_user("admin")):
                found = security_manager.get_user_datasources()
                assert sorted(found) == every_datasource

    @patch("superset.security.SupersetSecurityManager.can_access_database")
    @patch("superset.security.SupersetSecurityManager.get_session")
    def test_get_user_datasources_gamma(
        self, mock_get_session, mock_can_access_database
    ):
        """With no database access and an empty query result, gamma gets nothing."""
        Datasource = namedtuple("Datasource", ["database", "schema", "name"])
        every_datasource = [
            Datasource("database1", "schema1", "table1"),
            Datasource("database1", "schema1", "table2"),
            Datasource("database2", None, "table1"),
        ]

        mock_can_access_database.return_value = False
        mock_get_session.query.return_value.filter.return_value.all.return_value = []

        with mock.patch.object(
            SqlaTable, "get_all_datasources", return_value=list(every_datasource)
        ):
            with override_user(security_manager.find_user("gamma")):
                found = security_manager.get_user_datasources()
                assert found == []

    @patch("superset.security.SupersetSecurityManager.can_access_database")
    @patch("superset.security.SupersetSecurityManager.get_session")
    def test_get_user_datasources_gamma_with_schema(
        self, mock_get_session, mock_can_access_database
    ):
        """Gamma gets exactly the datasources returned by the mocked query."""
        Datasource = namedtuple("Datasource", ["database", "schema", "name"])
        every_datasource = [
            Datasource("database1", "schema1", "table1"),
            Datasource("database1", "schema1", "table2"),
            Datasource("database2", None, "table1"),
        ]
        # The first two entries are what the mocked session query yields.
        queried_datasources = every_datasource[:2]

        mock_can_access_database.return_value = False
        mock_get_session.query.return_value.filter.return_value.all.return_value = (
            list(queried_datasources)
        )

        with mock.patch.object(
            SqlaTable, "get_all_datasources", return_value=list(every_datasource)
        ):
            with override_user(security_manager.find_user("gamma")):
                found = security_manager.get_user_datasources()
                assert sorted(found) == queried_datasources
feat: embedded dashboard core (#17530) * feat(dashboard): embedded dashboard UI configuration (#17175) (#17450) * setup embedded provider * update ui configuration * fix test * feat: Guest token (for embedded dashboard auth) (#17517) * generate an embed token * improve existing tests * add some auth setup, and rename token * fix the stuff for compatibility with external request loaders * docs, standard jwt claims, tweaks * black * lint * tests, and safer token decoding * linting * type annotation * prettier * add feature flag * quiet pylint * apparently typing is a problem again * Make guest role name configurable * fake being a non-anonymous user * just one log entry * customizable algo * lint * lint again * 403 works now! * get guest token from header instead of cookie * Revert "403 works now!" This reverts commit df2f49a6d4267b3cccccd66549d54e25bae8e0b6. * fix tests * Revert "Revert "403 works now!"" This reverts commit 883dff38f16537e41f0eb5d699845263c96be5cb. * rename method * correct import * feat: entry for embedded dashboard (#17529) * create entry for embedded dashboard in webpack * add cookies * lint * token message handshake * guestTokenHeaderName * use setupClient instead of calling configure * rename the webpack chunk * simplified handshake * embedded entrypoint: render a proper app * make the embedded page accept anonymous connections * format * lint * fix test # Conflicts: # superset-frontend/src/embedded/index.tsx # superset/views/core.py * lint * Update superset-frontend/src/embedded/index.tsx Co-authored-by: David Aaron Suddjian <1858430+suddjian@users.noreply.github.com> * comment out origins checks * move embedded for core to dashboard * pylint * isort Co-authored-by: David Aaron Suddjian <aasuddjian@gmail.com> Co-authored-by: David Aaron Suddjian <1858430+suddjian@users.noreply.github.com> * feat: Authorizing guest access to embedded dashboards (#17757) * helper methods and dashboard access * guest token dashboard authz * adjust csrf exempt 
list * eums don't work that way * Remove unnecessary import * move row level security tests to their own file * a bit of refactoring * add guest token security tests * refactor tests * clean imports * variable names can be too long apparently * missing argument to get_user_roles * don't redefine builtins * remove unused imports * fix test import * default to global user when getting roles * missing import * mock it * test get_user_roles * infer g.user for ease of tests * remove redundant check * tests for guest user security manager fns * use algo to get rid of warning messages * tweaking access checks * fix guest token security tests * missing imports * more tests * more testing and also some small refactoring * move validation out of parsing * fix dashboard access check again * add more test Co-authored-by: Lily Kuang <lily@preset.io> * feat: Row Level Security rules for guest tokens (#17836) * helper methods and dashboard access * guest token dashboard authz * adjust csrf exempt list * eums don't work that way * Remove unnecessary import * move row level security tests to their own file * a bit of refactoring * add guest token security tests * refactor tests * clean imports * variable names can be too long apparently * missing argument to get_user_roles * don't redefine builtins * remove unused imports * fix test import * default to global user when getting roles * missing import * mock it * test get_user_roles * infer g.user for ease of tests * remove redundant check * tests for guest user security manager fns * use algo to get rid of warning messages * tweaking access checks * fix guest token security tests * missing imports * more tests * more testing and also some small refactoring * move validation out of parsing * fix dashboard access check again * rls rules for guest tokens * test guest token rls rules * more flexible rls rules * lint * fix tests * fix test * defaults * fix some tests * fix some tests * lint Co-authored-by: Lily Kuang <lily@preset.io> * 
SupersetClient guest token test * Apply suggestions from code review Co-authored-by: Lily Kuang <lily@preset.io> Co-authored-by: Lily Kuang <lily@preset.io>
2022-01-25 19:41:32 -05:00
class FakeRequest:
    """Minimal stand-in for the request object handed to the security manager.

    Only the two attributes the guest-token tests populate are provided:
    ``headers`` and ``form``. Each instance owns its own dictionaries so that a
    token written by one test cannot be observed through a request object
    created by another test.
    """

    headers: Any
    form: Any

    def __init__(self) -> None:
        # Create the containers per instance: dict defaults declared on the
        # class body would be shared (and mutated) by every FakeRequest.
        self.headers = {}
        self.form = {}
feat: embedded dashboard core (#17530) * feat(dashboard): embedded dashboard UI configuration (#17175) (#17450) * setup embedded provider * update ui configuration * fix test * feat: Guest token (for embedded dashboard auth) (#17517) * generate an embed token * improve existing tests * add some auth setup, and rename token * fix the stuff for compatibility with external request loaders * docs, standard jwt claims, tweaks * black * lint * tests, and safer token decoding * linting * type annotation * prettier * add feature flag * quiet pylint * apparently typing is a problem again * Make guest role name configurable * fake being a non-anonymous user * just one log entry * customizable algo * lint * lint again * 403 works now! * get guest token from header instead of cookie * Revert "403 works now!" This reverts commit df2f49a6d4267b3cccccd66549d54e25bae8e0b6. * fix tests * Revert "Revert "403 works now!"" This reverts commit 883dff38f16537e41f0eb5d699845263c96be5cb. * rename method * correct import * feat: entry for embedded dashboard (#17529) * create entry for embedded dashboard in webpack * add cookies * lint * token message handshake * guestTokenHeaderName * use setupClient instead of calling configure * rename the webpack chunk * simplified handshake * embedded entrypoint: render a proper app * make the embedded page accept anonymous connections * format * lint * fix test # Conflicts: # superset-frontend/src/embedded/index.tsx # superset/views/core.py * lint * Update superset-frontend/src/embedded/index.tsx Co-authored-by: David Aaron Suddjian <1858430+suddjian@users.noreply.github.com> * comment out origins checks * move embedded for core to dashboard * pylint * isort Co-authored-by: David Aaron Suddjian <aasuddjian@gmail.com> Co-authored-by: David Aaron Suddjian <1858430+suddjian@users.noreply.github.com> * feat: Authorizing guest access to embedded dashboards (#17757) * helper methods and dashboard access * guest token dashboard authz * adjust csrf exempt 
list * eums don't work that way * Remove unnecessary import * move row level security tests to their own file * a bit of refactoring * add guest token security tests * refactor tests * clean imports * variable names can be too long apparently * missing argument to get_user_roles * don't redefine builtins * remove unused imports * fix test import * default to global user when getting roles * missing import * mock it * test get_user_roles * infer g.user for ease of tests * remove redundant check * tests for guest user security manager fns * use algo to get rid of warning messages * tweaking access checks * fix guest token security tests * missing imports * more tests * more testing and also some small refactoring * move validation out of parsing * fix dashboard access check again * add more test Co-authored-by: Lily Kuang <lily@preset.io> * feat: Row Level Security rules for guest tokens (#17836) * helper methods and dashboard access * guest token dashboard authz * adjust csrf exempt list * eums don't work that way * Remove unnecessary import * move row level security tests to their own file * a bit of refactoring * add guest token security tests * refactor tests * clean imports * variable names can be too long apparently * missing argument to get_user_roles * don't redefine builtins * remove unused imports * fix test import * default to global user when getting roles * missing import * mock it * test get_user_roles * infer g.user for ease of tests * remove redundant check * tests for guest user security manager fns * use algo to get rid of warning messages * tweaking access checks * fix guest token security tests * missing imports * more tests * more testing and also some small refactoring * move validation out of parsing * fix dashboard access check again * rls rules for guest tokens * test guest token rls rules * more flexible rls rules * lint * fix tests * fix test * defaults * fix some tests * fix some tests * lint Co-authored-by: Lily Kuang <lily@preset.io> * 
SupersetClient guest token test * Apply suggestions from code review Co-authored-by: Lily Kuang <lily@preset.io> Co-authored-by: Lily Kuang <lily@preset.io>
2022-01-25 19:41:32 -05:00
class TestGuestTokens(SupersetTestCase):
def create_guest_token(self):
    """Return a guest access token for the fixed ``test_guest`` user.

    The token is produced by ``security_manager.create_guest_access_token``
    from one placeholder resource and one ``rls`` rule dict.
    """
    guest = {"username": "test_guest"}
    guest_resources = [{"some": "resource"}]
    rls_rules = [{"dataset": 1, "clause": "access = 1"}]
    token = security_manager.create_guest_access_token(
        guest, guest_resources, rls_rules
    )
    return token
@patch("superset.security.SupersetSecurityManager._get_current_epoch_time")
def test_create_guest_access_token(self, get_time_mock):
    """A newly created guest token decodes back to the claims it was built from."""
    # Pin the security manager's clock so ``iat`` and ``exp`` are predictable.
    issued_at = time.time()
    get_time_mock.return_value = issued_at

    guest = {"username": "test_guest"}
    guest_resources = [{"some": "resource"}]
    rls_rules = [{"dataset": 1, "clause": "access = 1"}]
    token = security_manager.create_guest_access_token(
        guest, guest_resources, rls_rules
    )
    expected_audience = get_url_host()

    # unfortunately we cannot mock time in the jwt lib
    claims = jwt.decode(
        token,
        self.app.config["GUEST_TOKEN_JWT_SECRET"],
        algorithms=[self.app.config["GUEST_TOKEN_JWT_ALGO"]],
        audience=expected_audience,
    )

    self.assertEqual(guest, claims["user"])
    self.assertEqual(guest_resources, claims["resources"])
    self.assertEqual(issued_at, claims["iat"])
    self.assertEqual(expected_audience, claims["aud"])
    self.assertEqual("guest", claims["type"])

    # Expiry is the (mocked) issue time plus the configured token lifetime.
    expected_expiry = issued_at + (self.app.config["GUEST_TOKEN_JWT_EXP_SECONDS"])
    self.assertEqual(expected_expiry, claims["exp"])
def test_get_guest_user(self):
    """A token sent in the configured guest-token header yields ``test_guest``."""
    guest_token = self.create_guest_token()
    request_stub = FakeRequest()
    header_name = current_app.config["GUEST_TOKEN_HEADER_NAME"]
    request_stub.headers[header_name] = guest_token

    resolved = security_manager.get_guest_user_from_request(request_stub)

    self.assertIsNotNone(resolved)
    self.assertEqual("test_guest", resolved.username)
def test_get_guest_user_with_request_form(self):
    """A token supplied via the ``guest_token`` form field yields ``test_guest``."""
    guest_token = self.create_guest_token()
    request_stub = FakeRequest()
    header_name = current_app.config["GUEST_TOKEN_HEADER_NAME"]
    # The header is explicitly empty; the token is only present in the form.
    request_stub.headers[header_name] = None
    request_stub.form["guest_token"] = guest_token

    resolved = security_manager.get_guest_user_from_request(request_stub)

    self.assertIsNotNone(resolved)
    self.assertEqual("test_guest", resolved.username)
feat: embedded dashboard core (#17530) * feat(dashboard): embedded dashboard UI configuration (#17175) (#17450) * setup embedded provider * update ui configuration * fix test * feat: Guest token (for embedded dashboard auth) (#17517) * generate an embed token * improve existing tests * add some auth setup, and rename token * fix the stuff for compatibility with external request loaders * docs, standard jwt claims, tweaks * black * lint * tests, and safer token decoding * linting * type annotation * prettier * add feature flag * quiet pylint * apparently typing is a problem again * Make guest role name configurable * fake being a non-anonymous user * just one log entry * customizable algo * lint * lint again * 403 works now! * get guest token from header instead of cookie * Revert "403 works now!" This reverts commit df2f49a6d4267b3cccccd66549d54e25bae8e0b6. * fix tests * Revert "Revert "403 works now!"" This reverts commit 883dff38f16537e41f0eb5d699845263c96be5cb. * rename method * correct import * feat: entry for embedded dashboard (#17529) * create entry for embedded dashboard in webpack * add cookies * lint * token message handshake * guestTokenHeaderName * use setupClient instead of calling configure * rename the webpack chunk * simplified handshake * embedded entrypoint: render a proper app * make the embedded page accept anonymous connections * format * lint * fix test # Conflicts: # superset-frontend/src/embedded/index.tsx # superset/views/core.py * lint * Update superset-frontend/src/embedded/index.tsx Co-authored-by: David Aaron Suddjian <1858430+suddjian@users.noreply.github.com> * comment out origins checks * move embedded for core to dashboard * pylint * isort Co-authored-by: David Aaron Suddjian <aasuddjian@gmail.com> Co-authored-by: David Aaron Suddjian <1858430+suddjian@users.noreply.github.com> * feat: Authorizing guest access to embedded dashboards (#17757) * helper methods and dashboard access * guest token dashboard authz * adjust csrf exempt 
list * eums don't work that way * Remove unnecessary import * move row level security tests to their own file * a bit of refactoring * add guest token security tests * refactor tests * clean imports * variable names can be too long apparently * missing argument to get_user_roles * don't redefine builtins * remove unused imports * fix test import * default to global user when getting roles * missing import * mock it * test get_user_roles * infer g.user for ease of tests * remove redundant check * tests for guest user security manager fns * use algo to get rid of warning messages * tweaking access checks * fix guest token security tests * missing imports * more tests * more testing and also some small refactoring * move validation out of parsing * fix dashboard access check again * add more test Co-authored-by: Lily Kuang <lily@preset.io> * feat: Row Level Security rules for guest tokens (#17836) * helper methods and dashboard access * guest token dashboard authz * adjust csrf exempt list * eums don't work that way * Remove unnecessary import * move row level security tests to their own file * a bit of refactoring * add guest token security tests * refactor tests * clean imports * variable names can be too long apparently * missing argument to get_user_roles * don't redefine builtins * remove unused imports * fix test import * default to global user when getting roles * missing import * mock it * test get_user_roles * infer g.user for ease of tests * remove redundant check * tests for guest user security manager fns * use algo to get rid of warning messages * tweaking access checks * fix guest token security tests * missing imports * more tests * more testing and also some small refactoring * move validation out of parsing * fix dashboard access check again * rls rules for guest tokens * test guest token rls rules * more flexible rls rules * lint * fix tests * fix test * defaults * fix some tests * fix some tests * lint Co-authored-by: Lily Kuang <lily@preset.io> * 
SupersetClient guest token test * Apply suggestions from code review Co-authored-by: Lily Kuang <lily@preset.io> Co-authored-by: Lily Kuang <lily@preset.io>
2022-01-25 19:41:32 -05:00
@patch("superset.security.SupersetSecurityManager._get_current_epoch_time")
def test_get_guest_user_expired_token(self, get_time_mock):
    """A token issued far enough in the past resolves to no guest user."""
    # Back-date the mocked issue time by 1000x the configured lifetime (plus
    # one), so the token created below is already expired when it is read.
    lifetime = self.app.config["GUEST_TOKEN_JWT_EXP_SECONDS"]
    get_time_mock.return_value = time.time() - (lifetime * 1000) - 1

    expired_token = self.create_guest_token()
    request_stub = FakeRequest()
    header_name = current_app.config["GUEST_TOKEN_HEADER_NAME"]
    request_stub.headers[header_name] = expired_token

    resolved = security_manager.get_guest_user_from_request(request_stub)

    self.assertIsNone(resolved)
def test_get_guest_user_no_user(self):
    """A token created with ``user=None`` must not resolve to a guest user.

    The only observable outcome checked here is that
    ``get_guest_user_from_request`` returns ``None``.
    """
    user = None
    resources = [{"type": "dashboard", "id": 1}]
    rls = {}
    token = security_manager.create_guest_access_token(user, resources, rls)

    fake_request = FakeRequest()
    fake_request.headers[current_app.config["GUEST_TOKEN_HEADER_NAME"]] = token
    guest_user = security_manager.get_guest_user_from_request(fake_request)

    # No ``assertRaisesRegex`` here: called without a callable and outside a
    # ``with`` block it only builds an unused context manager and checks
    # nothing, and the call above returns normally rather than raising.
    self.assertIsNone(guest_user)
def test_get_guest_user_no_resource(self):
    """Run guest-user lookup on a token built from an empty resource list.

    The test passes as long as ``get_guest_user_from_request`` returns without
    raising; its return value is not inspected.
    """
    user = {"username": "test_guest"}
    resources = []
    rls = {}
    token = security_manager.create_guest_access_token(user, resources, rls)

    fake_request = FakeRequest()
    fake_request.headers[current_app.config["GUEST_TOKEN_HEADER_NAME"]] = token
    security_manager.get_guest_user_from_request(fake_request)
    # NOTE(review): nothing is asserted about the result. A bare
    # ``assertRaisesRegex(ValueError, ...)`` with no callable and no ``with``
    # block would only create an unused context manager, so it is not used.
    # Confirm the intended outcome for an empty resource list and assert it.
def test_get_guest_user_not_guest_type(self):
    """A correctly signed token whose ``type`` is not ``"guest"`` yields no user.

    The token is encoded by hand (rather than via the security manager) so the
    ``type`` claim can be set to ``"not_guest"``.
    """
    now = time.time()
    user = {"username": "test_guest"}
    resources = [{"some": "resource"}]
    aud = get_url_host()

    claims = {
        "user": user,
        "resources": resources,
        "rls_rules": [],
        # standard jwt claims:
        "aud": aud,
        "iat": now,  # issued at
        "type": "not_guest",
    }
    token = jwt.encode(
        claims,
        self.app.config["GUEST_TOKEN_JWT_SECRET"],
        algorithm=self.app.config["GUEST_TOKEN_JWT_ALGO"],
    )

    fake_request = FakeRequest()
    fake_request.headers[current_app.config["GUEST_TOKEN_HEADER_NAME"]] = token
    guest_user = security_manager.get_guest_user_from_request(fake_request)

    # No ``assertRaisesRegex`` here: without a callable or a ``with`` block it
    # checks nothing, and the lookup above returns normally.
    self.assertIsNone(guest_user)
def test_get_guest_user_bad_audience(self):
    """A correctly signed token with ``aud`` set to ``"bad_audience"`` yields no user.

    The token is encoded by hand so the audience claim can be set to a fixed
    value that is not taken from ``get_url_host()``.
    """
    now = time.time()
    user = {"username": "test_guest"}
    resources = [{"some": "resource"}]

    claims = {
        "user": user,
        "resources": resources,
        "rls_rules": [],
        # standard jwt claims:
        "aud": "bad_audience",
        "iat": now,  # issued at
        "type": "guest",
    }
    token = jwt.encode(
        claims,
        self.app.config["GUEST_TOKEN_JWT_SECRET"],
        algorithm=self.app.config["GUEST_TOKEN_JWT_ALGO"],
    )

    fake_request = FakeRequest()
    fake_request.headers[current_app.config["GUEST_TOKEN_HEADER_NAME"]] = token
    guest_user = security_manager.get_guest_user_from_request(fake_request)

    # No ``assertRaisesRegex`` here: without a callable or a ``with`` block it
    # checks nothing, and the lookup above returns normally.
    self.assertIsNone(guest_user)
@patch("superset.security.SupersetSecurityManager._get_current_epoch_time")
def test_create_guest_access_token_callable_audience(self, get_time_mock):
    """A callable ``GUEST_TOKEN_JWT_AUDIENCE`` is invoked to produce ``aud``.

    The config entry is replaced by a ``Mock`` returning ``"cool_code"``; the
    created token must decode with that audience and the mock must have been
    called exactly once.
    """
    now = time.time()
    get_time_mock.return_value = now

    # Remember the current setting (``None`` if unset) so it can be restored
    # even when an assertion below fails; otherwise the mock would stay in the
    # shared app config and affect later tests.
    previous_audience = app.config.get("GUEST_TOKEN_JWT_AUDIENCE")
    app.config["GUEST_TOKEN_JWT_AUDIENCE"] = Mock(return_value="cool_code")
    try:
        user = {"username": "test_guest"}
        resources = [{"some": "resource"}]
        rls = [{"dataset": 1, "clause": "access = 1"}]
        token = security_manager.create_guest_access_token(user, resources, rls)

        decoded_token = jwt.decode(
            token,
            self.app.config["GUEST_TOKEN_JWT_SECRET"],
            algorithms=[self.app.config["GUEST_TOKEN_JWT_ALGO"]],
            audience="cool_code",
        )

        app.config["GUEST_TOKEN_JWT_AUDIENCE"].assert_called_once()
        self.assertEqual("cool_code", decoded_token["aud"])
        self.assertEqual("guest", decoded_token["type"])
    finally:
        app.config["GUEST_TOKEN_JWT_AUDIENCE"] = previous_audience