# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # isort:skip_file import inspect import re import unittest from unittest.mock import Mock, patch from typing import Any, Dict import prison import pytest from flask import current_app, g from superset.models.dashboard import Dashboard from superset import app, appbuilder, db, security_manager, viz, ConnectorRegistry from superset.connectors.druid.models import DruidCluster, DruidDatasource from superset.connectors.sqla.models import RowLevelSecurityFilter, SqlaTable from superset.errors import ErrorLevel, SupersetError, SupersetErrorType from superset.exceptions import SupersetSecurityException from superset.models.core import Database from superset.models.slice import Slice from superset.sql_parse import Table from superset.utils.core import get_example_database from .base_tests import SupersetTestCase from tests.fixtures.birth_names_dashboard import load_birth_names_dashboard_with_slices from tests.fixtures.energy_dashboard import load_energy_table_with_slice from tests.fixtures.public_role import ( public_role_like_gamma, public_role_like_test_role, ) from tests.fixtures.unicode_dashboard import load_unicode_dashboard_with_slice from tests.fixtures.world_bank_dashboard import load_world_bank_dashboard_with_slices NEW_SECURITY_CONVERGE_VIEWS = ( "Annotation", "Database", "Dataset", "Dashboard", "CssTemplate", "Chart", "Query", "SavedQuery", ) def get_perm_tuples(role_name): perm_set = set() for perm in security_manager.find_role(role_name).permissions: perm_set.add((perm.permission.name, perm.view_menu.name)) return perm_set SCHEMA_ACCESS_ROLE = "schema_access_role" def create_schema_perm(view_menu_name: str) -> None: permission = "schema_access" security_manager.add_permission_view_menu(permission, view_menu_name) perm_view = security_manager.find_permission_view_menu(permission, view_menu_name) security_manager.add_permission_role( security_manager.find_role(SCHEMA_ACCESS_ROLE), perm_view ) return None def delete_schema_perm(view_menu_name: str) -> None: pv = security_manager.find_permission_view_menu("schema_access", "[examples].[2]") security_manager.del_permission_role( security_manager.find_role(SCHEMA_ACCESS_ROLE), pv ) security_manager.del_permission_view_menu("schema_access", "[examples].[2]") return None class TestRolePermission(SupersetTestCase): """Testing export role permissions.""" def setUp(self): session = db.session security_manager.add_role(SCHEMA_ACCESS_ROLE) session.commit() ds = ( db.session.query(SqlaTable) .filter_by(table_name="wb_health_population") .first() ) ds.schema = "temp_schema" ds.schema_perm = ds.get_schema_perm() ds_slices = ( session.query(Slice) .filter_by(datasource_type="table") .filter_by(datasource_id=ds.id) .all() ) for s in ds_slices: s.schema_perm = ds.schema_perm create_schema_perm("[examples].[temp_schema]") gamma_user = security_manager.find_user(username="gamma") gamma_user.roles.append(security_manager.find_role(SCHEMA_ACCESS_ROLE)) session.commit() def tearDown(self): session = db.session ds = ( session.query(SqlaTable) .filter_by(table_name="wb_health_population") .first() ) schema_perm = ds.schema_perm ds.schema = None ds.schema_perm = None ds_slices = ( session.query(Slice) .filter_by(datasource_type="table") .filter_by(datasource_id=ds.id) .all() ) for s in ds_slices: s.schema_perm = None delete_schema_perm(schema_perm) session.delete(security_manager.find_role(SCHEMA_ACCESS_ROLE)) session.commit() def test_set_perm_sqla_table(self): session = db.session table = SqlaTable( schema="tmp_schema", table_name="tmp_perm_table", database=get_example_database(), ) session.add(table) session.commit() stored_table = ( session.query(SqlaTable).filter_by(table_name="tmp_perm_table").one() ) self.assertEqual( stored_table.perm, f"[examples].[tmp_perm_table](id:{stored_table.id})" ) self.assertIsNotNone( security_manager.find_permission_view_menu( "datasource_access", stored_table.perm ) ) self.assertEqual(stored_table.schema_perm, "[examples].[tmp_schema]") self.assertIsNotNone( security_manager.find_permission_view_menu( "schema_access", stored_table.schema_perm ) ) # table name change stored_table.table_name = "tmp_perm_table_v2" session.commit() stored_table = ( session.query(SqlaTable).filter_by(table_name="tmp_perm_table_v2").one() ) self.assertEqual( stored_table.perm, f"[examples].[tmp_perm_table_v2](id:{stored_table.id})" ) self.assertIsNotNone( security_manager.find_permission_view_menu( "datasource_access", stored_table.perm ) ) # no changes in schema self.assertEqual(stored_table.schema_perm, "[examples].[tmp_schema]") self.assertIsNotNone( security_manager.find_permission_view_menu( "schema_access", stored_table.schema_perm ) ) # schema name change stored_table.schema = "tmp_schema_v2" session.commit() stored_table = ( session.query(SqlaTable).filter_by(table_name="tmp_perm_table_v2").one() ) self.assertEqual( stored_table.perm, f"[examples].[tmp_perm_table_v2](id:{stored_table.id})" ) self.assertIsNotNone( security_manager.find_permission_view_menu( "datasource_access", stored_table.perm ) ) # no changes in schema self.assertEqual(stored_table.schema_perm, "[examples].[tmp_schema_v2]") self.assertIsNotNone( security_manager.find_permission_view_menu( "schema_access", stored_table.schema_perm ) ) # database change new_db = Database(sqlalchemy_uri="some_uri", database_name="tmp_db") session.add(new_db) stored_table.database = ( session.query(Database).filter_by(database_name="tmp_db").one() ) session.commit() stored_table = ( session.query(SqlaTable).filter_by(table_name="tmp_perm_table_v2").one() ) self.assertEqual( stored_table.perm, f"[tmp_db].[tmp_perm_table_v2](id:{stored_table.id})" ) self.assertIsNotNone( security_manager.find_permission_view_menu( "datasource_access", stored_table.perm ) ) # no changes in schema self.assertEqual(stored_table.schema_perm, "[tmp_db].[tmp_schema_v2]") self.assertIsNotNone( security_manager.find_permission_view_menu( "schema_access", stored_table.schema_perm ) ) # no schema stored_table.schema = None session.commit() stored_table = ( session.query(SqlaTable).filter_by(table_name="tmp_perm_table_v2").one() ) self.assertEqual( stored_table.perm, f"[tmp_db].[tmp_perm_table_v2](id:{stored_table.id})" ) self.assertIsNotNone( security_manager.find_permission_view_menu( "datasource_access", stored_table.perm ) ) self.assertIsNone(stored_table.schema_perm) session.delete(new_db) session.delete(stored_table) session.commit() @pytest.mark.usefixtures("load_world_bank_dashboard_with_slices") def test_set_perm_druid_datasource(self): self.create_druid_test_objects() session = db.session druid_cluster = ( session.query(DruidCluster).filter_by(cluster_name="druid_test").one() ) datasource = DruidDatasource( datasource_name="tmp_datasource", cluster=druid_cluster, cluster_id=druid_cluster.id, ) session.add(datasource) session.commit() # store without a schema stored_datasource = ( session.query(DruidDatasource) .filter_by(datasource_name="tmp_datasource") .one() ) self.assertEqual( stored_datasource.perm, f"[druid_test].[tmp_datasource](id:{stored_datasource.id})", ) self.assertIsNotNone( security_manager.find_permission_view_menu( "datasource_access", stored_datasource.perm ) ) self.assertIsNone(stored_datasource.schema_perm) # store with a schema stored_datasource.datasource_name = "tmp_schema.tmp_datasource" session.commit() self.assertEqual( stored_datasource.perm, f"[druid_test].[tmp_schema.tmp_datasource](id:{stored_datasource.id})", ) self.assertIsNotNone( security_manager.find_permission_view_menu( "datasource_access", stored_datasource.perm ) ) self.assertIsNotNone(stored_datasource.schema_perm, "[druid_test].[tmp_schema]") self.assertIsNotNone( security_manager.find_permission_view_menu( "schema_access", stored_datasource.schema_perm ) ) session.delete(stored_datasource) session.commit() def test_set_perm_druid_cluster(self): session = db.session cluster = DruidCluster(cluster_name="tmp_druid_cluster") session.add(cluster) stored_cluster = ( session.query(DruidCluster) .filter_by(cluster_name="tmp_druid_cluster") .one() ) self.assertEqual( stored_cluster.perm, f"[tmp_druid_cluster].(id:{stored_cluster.id})" ) self.assertIsNotNone( security_manager.find_permission_view_menu( "database_access", stored_cluster.perm ) ) stored_cluster.cluster_name = "tmp_druid_cluster2" session.commit() self.assertEqual( stored_cluster.perm, f"[tmp_druid_cluster2].(id:{stored_cluster.id})" ) self.assertIsNotNone( security_manager.find_permission_view_menu( "database_access", stored_cluster.perm ) ) session.delete(stored_cluster) session.commit() def test_set_perm_database(self): session = db.session database = Database( database_name="tmp_database", sqlalchemy_uri="sqlite://test" ) session.add(database) stored_db = ( session.query(Database).filter_by(database_name="tmp_database").one() ) self.assertEqual(stored_db.perm, f"[tmp_database].(id:{stored_db.id})") self.assertIsNotNone( security_manager.find_permission_view_menu( "database_access", stored_db.perm ) ) stored_db.database_name = "tmp_database2" session.commit() stored_db = ( session.query(Database).filter_by(database_name="tmp_database2").one() ) self.assertEqual(stored_db.perm, f"[tmp_database2].(id:{stored_db.id})") self.assertIsNotNone( security_manager.find_permission_view_menu( "database_access", stored_db.perm ) ) session.delete(stored_db) session.commit() def test_hybrid_perm_druid_cluster(self): cluster = DruidCluster(cluster_name="tmp_druid_cluster3") db.session.add(cluster) id_ = ( db.session.query(DruidCluster.id) .filter_by(cluster_name="tmp_druid_cluster3") .scalar() ) record = ( db.session.query(DruidCluster) .filter_by(perm=f"[tmp_druid_cluster3].(id:{id_})") .one() ) self.assertEqual(record.get_perm(), record.perm) self.assertEqual(record.id, id_) self.assertEqual(record.cluster_name, "tmp_druid_cluster3") db.session.delete(cluster) db.session.commit() def test_hybrid_perm_database(self): database = Database( database_name="tmp_database3", sqlalchemy_uri="sqlite://test" ) db.session.add(database) id_ = ( db.session.query(Database.id) .filter_by(database_name="tmp_database3") .scalar() ) record = ( db.session.query(Database) .filter_by(perm=f"[tmp_database3].(id:{id_})") .one() ) self.assertEqual(record.get_perm(), record.perm) self.assertEqual(record.id, id_) self.assertEqual(record.database_name, "tmp_database3") db.session.delete(database) db.session.commit() def test_set_perm_slice(self): session = db.session database = Database( database_name="tmp_database", sqlalchemy_uri="sqlite://test" ) table = SqlaTable(table_name="tmp_perm_table", database=database) session.add(database) session.add(table) session.commit() # no schema permission slice = Slice( datasource_id=table.id, datasource_type="table", datasource_name="tmp_perm_table", slice_name="slice_name", ) session.add(slice) session.commit() slice = session.query(Slice).filter_by(slice_name="slice_name").one() self.assertEqual(slice.perm, table.perm) self.assertEqual(slice.perm, f"[tmp_database].[tmp_perm_table](id:{table.id})") self.assertEqual(slice.schema_perm, table.schema_perm) self.assertIsNone(slice.schema_perm) table.schema = "tmp_perm_schema" table.table_name = "tmp_perm_table_v2" session.commit() # TODO(bogdan): modify slice permissions on the table update. self.assertNotEquals(slice.perm, table.perm) self.assertEqual(slice.perm, f"[tmp_database].[tmp_perm_table](id:{table.id})") self.assertEqual( table.perm, f"[tmp_database].[tmp_perm_table_v2](id:{table.id})" ) # TODO(bogdan): modify slice schema permissions on the table update. self.assertNotEquals(slice.schema_perm, table.schema_perm) self.assertIsNone(slice.schema_perm) # updating slice refreshes the permissions slice.slice_name = "slice_name_v2" session.commit() self.assertEqual(slice.perm, table.perm) self.assertEqual( slice.perm, f"[tmp_database].[tmp_perm_table_v2](id:{table.id})" ) self.assertEqual(slice.schema_perm, table.schema_perm) self.assertEqual(slice.schema_perm, "[tmp_database].[tmp_perm_schema]") session.delete(slice) session.delete(table) session.delete(database) session.commit() # TODO test slice permission @patch("superset.security.manager.g") def test_schemas_accessible_by_user_admin(self, mock_g): mock_g.user = security_manager.find_user("admin") with self.client.application.test_request_context(): database = get_example_database() schemas = security_manager.get_schemas_accessible_by_user( database, ["1", "2", "3"] ) self.assertEqual(schemas, ["1", "2", "3"]) # no changes @patch("superset.security.manager.g") def test_schemas_accessible_by_user_schema_access(self, mock_g): # User has schema access to the schema 1 create_schema_perm("[examples].[1]") mock_g.user = security_manager.find_user("gamma") with self.client.application.test_request_context(): database = get_example_database() schemas = security_manager.get_schemas_accessible_by_user( database, ["1", "2", "3"] ) # temp_schema is not passed in the params self.assertEqual(schemas, ["1"]) delete_schema_perm("[examples].[1]") @patch("superset.security.manager.g") def test_schemas_accessible_by_user_datasource_access(self, mock_g): # User has schema access to the datasource temp_schema.wb_health_population in examples DB. mock_g.user = security_manager.find_user("gamma") with self.client.application.test_request_context(): database = get_example_database() schemas = security_manager.get_schemas_accessible_by_user( database, ["temp_schema", "2", "3"] ) self.assertEqual(schemas, ["temp_schema"]) @patch("superset.security.manager.g") def test_schemas_accessible_by_user_datasource_and_schema_access(self, mock_g): # User has schema access to the datasource temp_schema.wb_health_population in examples DB. create_schema_perm("[examples].[2]") mock_g.user = security_manager.find_user("gamma") with self.client.application.test_request_context(): database = get_example_database() schemas = security_manager.get_schemas_accessible_by_user( database, ["temp_schema", "2", "3"] ) self.assertEqual(schemas, ["temp_schema", "2"]) vm = security_manager.find_permission_view_menu( "schema_access", "[examples].[2]" ) self.assertIsNotNone(vm) delete_schema_perm("[examples].[2]") @pytest.mark.usefixtures("load_world_bank_dashboard_with_slices") def test_gamma_user_schema_access_to_dashboards(self): dash = db.session.query(Dashboard).filter_by(slug="world_health").first() dash.published = True db.session.commit() self.login(username="gamma") data = str(self.client.get("api/v1/dashboard/").data) self.assertIn("/superset/dashboard/world_health/", data) self.assertNotIn("/superset/dashboard/births/", data) def test_gamma_user_schema_access_to_tables(self): self.login(username="gamma") data = str(self.client.get("tablemodelview/list/").data) self.assertIn("wb_health_population", data) self.assertNotIn("birth_names", data) @pytest.mark.usefixtures("load_world_bank_dashboard_with_slices") def test_gamma_user_schema_access_to_charts(self): self.login(username="gamma") data = str(self.client.get("api/v1/chart/").data) self.assertIn( "Life Expectancy VS Rural %", data ) # wb_health_population slice, has access self.assertIn( "Parallel Coordinates", data ) # wb_health_population slice, has access self.assertNotIn("Girl Name Cloud", data) # birth_names slice, no access @pytest.mark.usefixtures("public_role_like_gamma") def test_public_sync_role_data_perms(self): """ Security: Tests if the sync role method preserves data access permissions if they already exist on a public role. Also check that non data access permissions are removed """ table = db.session.query(SqlaTable).filter_by(table_name="birth_names").one() self.grant_public_access_to_table(table) public_role = security_manager.get_public_role() unwanted_pvm = security_manager.find_permission_view_menu( "menu_access", "Security" ) public_role.permissions.append(unwanted_pvm) db.session.commit() security_manager.sync_role_definitions() public_role = security_manager.get_public_role() public_role_resource_names = [ permission.view_menu.name for permission in public_role.permissions ] assert table.get_perm() in public_role_resource_names assert "Security" not in public_role_resource_names # Cleanup self.revoke_public_access_to_table(table) @pytest.mark.usefixtures("public_role_like_test_role") def test_public_sync_role_builtin_perms(self): """ Security: Tests public role creation based on a builtin role """ public_role = security_manager.get_public_role() public_role_resource_names = [ [permission.view_menu.name, permission.permission.name] for permission in public_role.permissions ] for pvm in current_app.config["FAB_ROLES"]["TestRole"]: assert pvm in public_role_resource_names def test_sqllab_gamma_user_schema_access_to_sqllab(self): session = db.session example_db = session.query(Database).filter_by(database_name="examples").one() example_db.expose_in_sqllab = True session.commit() arguments = { "keys": ["none"], "filters": [{"col": "expose_in_sqllab", "opr": "eq", "value": True}], "order_columns": "database_name", "order_direction": "asc", "page": 0, "page_size": -1, } NEW_FLASK_GET_SQL_DBS_REQUEST = f"/api/v1/database/?q={prison.dumps(arguments)}" self.login(username="gamma") databases_json = self.client.get(NEW_FLASK_GET_SQL_DBS_REQUEST).json self.assertEqual(databases_json["count"], 1) self.logout() def assert_can_read(self, view_menu, permissions_set): if view_menu in NEW_SECURITY_CONVERGE_VIEWS: self.assertIn(("can_read", view_menu), permissions_set) else: self.assertIn(("can_list", view_menu), permissions_set) def assert_can_write(self, view_menu, permissions_set): if view_menu in NEW_SECURITY_CONVERGE_VIEWS: self.assertIn(("can_write", view_menu), permissions_set) else: self.assertIn(("can_add", view_menu), permissions_set) self.assertIn(("can_delete", view_menu), permissions_set) self.assertIn(("can_edit", view_menu), permissions_set) def assert_cannot_write(self, view_menu, permissions_set): if view_menu in NEW_SECURITY_CONVERGE_VIEWS: self.assertNotIn(("can_write", view_menu), permissions_set) else: self.assertNotIn(("can_add", view_menu), permissions_set) self.assertNotIn(("can_delete", view_menu), permissions_set) self.assertNotIn(("can_edit", view_menu), permissions_set) self.assertNotIn(("can_save", view_menu), permissions_set) def assert_can_all(self, view_menu, permissions_set): self.assert_can_read(view_menu, permissions_set) self.assert_can_write(view_menu, permissions_set) def assert_can_menu(self, view_menu, permissions_set): self.assertIn(("menu_access", view_menu), permissions_set) def assert_can_gamma(self, perm_set): self.assert_can_read("Dataset", perm_set) # make sure that user can create slices and dashboards self.assert_can_all("Dashboard", perm_set) self.assert_can_all("Chart", perm_set) self.assertIn(("can_add_slices", "Superset"), perm_set) self.assertIn(("can_copy_dash", "Superset"), perm_set) self.assertIn(("can_created_dashboards", "Superset"), perm_set) self.assertIn(("can_created_slices", "Superset"), perm_set) self.assertIn(("can_csv", "Superset"), perm_set) self.assertIn(("can_dashboard", "Superset"), perm_set) self.assertIn(("can_explore", "Superset"), perm_set) self.assertIn(("can_share_chart", "Superset"), perm_set) self.assertIn(("can_share_dashboard", "Superset"), perm_set) self.assertIn(("can_explore_json", "Superset"), perm_set) self.assertIn(("can_fave_dashboards", "Superset"), perm_set) self.assertIn(("can_fave_slices", "Superset"), perm_set) self.assertIn(("can_save_dash", "Superset"), perm_set) self.assertIn(("can_slice", "Superset"), perm_set) self.assertIn(("can_explore_json", "Superset"), perm_set) self.assertIn(("can_userinfo", "UserDBModelView"), perm_set) self.assert_can_menu("Databases", perm_set) self.assert_can_menu("Datasets", perm_set) self.assert_can_menu("Data", perm_set) self.assert_can_menu("Charts", perm_set) self.assert_can_menu("Dashboards", perm_set) def assert_can_alpha(self, perm_set): self.assert_can_all("Annotation", perm_set) self.assert_can_all("CssTemplate", perm_set) self.assert_can_all("Dataset", perm_set) self.assert_can_read("Query", perm_set) self.assert_can_read("Database", perm_set) self.assertIn(("can_import_dashboards", "Superset"), perm_set) self.assertIn(("can_this_form_post", "CsvToDatabaseView"), perm_set) self.assertIn(("can_this_form_get", "CsvToDatabaseView"), perm_set) self.assert_can_menu("Manage", perm_set) self.assert_can_menu("Annotation Layers", perm_set) self.assert_can_menu("CSS Templates", perm_set) self.assert_can_menu("Upload a CSV", perm_set) self.assertIn(("all_datasource_access", "all_datasource_access"), perm_set) def assert_cannot_alpha(self, perm_set): if app.config["ENABLE_ACCESS_REQUEST"]: self.assert_cannot_write("AccessRequestsModelView", perm_set) self.assert_can_all("AccessRequestsModelView", perm_set) self.assert_cannot_write("Queries", perm_set) self.assert_cannot_write("RoleModelView", perm_set) self.assert_cannot_write("UserDBModelView", perm_set) self.assert_cannot_write("Database", perm_set) def assert_can_admin(self, perm_set): self.assert_can_all("Database", perm_set) self.assert_can_all("RoleModelView", perm_set) self.assert_can_all("UserDBModelView", perm_set) self.assertIn(("all_database_access", "all_database_access"), perm_set) self.assertIn(("can_override_role_permissions", "Superset"), perm_set) self.assertIn(("can_sync_druid_source", "Superset"), perm_set) self.assertIn(("can_override_role_permissions", "Superset"), perm_set) self.assertIn(("can_approve", "Superset"), perm_set) self.assert_can_menu("Security", perm_set) self.assert_can_menu("List Users", perm_set) self.assert_can_menu("List Roles", perm_set) def test_is_admin_only(self): self.assertFalse( security_manager._is_admin_only( security_manager.find_permission_view_menu("can_read", "Dataset") ) ) self.assertFalse( security_manager._is_admin_only( security_manager.find_permission_view_menu( "all_datasource_access", "all_datasource_access" ) ) ) log_permissions = ["can_read"] for log_permission in log_permissions: self.assertTrue( security_manager._is_admin_only( security_manager.find_permission_view_menu(log_permission, "Log") ) ) if app.config["ENABLE_ACCESS_REQUEST"]: self.assertTrue( security_manager._is_admin_only( security_manager.find_permission_view_menu( "can_list", "AccessRequestsModelView" ) ) ) self.assertTrue( security_manager._is_admin_only( security_manager.find_permission_view_menu( "can_edit", "UserDBModelView" ) ) ) self.assertTrue( security_manager._is_admin_only( security_manager.find_permission_view_menu("can_approve", "Superset") ) ) @unittest.skipUnless( SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed" ) def test_is_alpha_only(self): self.assertFalse( security_manager._is_alpha_only( security_manager.find_permission_view_menu("can_read", "Dataset") ) ) self.assertTrue( security_manager._is_alpha_only( security_manager.find_permission_view_menu("can_write", "Dataset") ) ) self.assertTrue( security_manager._is_alpha_only( security_manager.find_permission_view_menu( "all_datasource_access", "all_datasource_access" ) ) ) self.assertTrue( security_manager._is_alpha_only( security_manager.find_permission_view_menu( "all_database_access", "all_database_access" ) ) ) def test_is_gamma_pvm(self): self.assertTrue( security_manager._is_gamma_pvm( security_manager.find_permission_view_menu("can_read", "Dataset") ) ) def test_gamma_permissions_basic(self): self.assert_can_gamma(get_perm_tuples("Gamma")) self.assert_cannot_alpha(get_perm_tuples("Gamma")) @pytest.mark.usefixtures("public_role_like_gamma") def test_public_permissions_basic(self): self.assert_can_gamma(get_perm_tuples("Public")) @unittest.skipUnless( SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed" ) def test_alpha_permissions(self): alpha_perm_tuples = get_perm_tuples("Alpha") self.assert_can_gamma(alpha_perm_tuples) self.assert_can_alpha(alpha_perm_tuples) self.assert_cannot_alpha(alpha_perm_tuples) @unittest.skipUnless( SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed" ) @pytest.mark.usefixtures("load_world_bank_dashboard_with_slices") def test_admin_permissions(self): self.assert_can_gamma(get_perm_tuples("Admin")) self.assert_can_alpha(get_perm_tuples("Admin")) self.assert_can_admin(get_perm_tuples("Admin")) def test_sql_lab_permissions(self): sql_lab_set = get_perm_tuples("sql_lab") self.assertIn(("can_csv", "Superset"), sql_lab_set) self.assertIn(("can_read", "Database"), sql_lab_set) self.assertIn(("can_read", "SavedQuery"), sql_lab_set) self.assertIn(("can_sql_json", "Superset"), sql_lab_set) self.assertIn(("can_sqllab_viz", "Superset"), sql_lab_set) self.assertIn(("can_sqllab_table_viz", "Superset"), sql_lab_set) self.assertIn(("can_sqllab", "Superset"), sql_lab_set) self.assertIn(("menu_access", "SQL Lab"), sql_lab_set) self.assertIn(("menu_access", "SQL Editor"), sql_lab_set) self.assertIn(("menu_access", "Saved Queries"), sql_lab_set) self.assertIn(("menu_access", "Query Search"), sql_lab_set) self.assert_cannot_alpha(sql_lab_set) def test_granter_permissions(self): granter_set = get_perm_tuples("granter") self.assertIn(("can_override_role_permissions", "Superset"), granter_set) self.assertIn(("can_approve", "Superset"), granter_set) self.assert_cannot_alpha(granter_set) def test_gamma_permissions(self): gamma_perm_set = set() for perm in security_manager.find_role("Gamma").permissions: gamma_perm_set.add((perm.permission.name, perm.view_menu.name)) # check read only perms # make sure that user can create slices and dashboards self.assert_can_all("Dashboard", gamma_perm_set) self.assert_can_read("Dataset", gamma_perm_set) # make sure that user can create slices and dashboards self.assert_can_all("Chart", gamma_perm_set) self.assert_cannot_write("UserDBModelView", gamma_perm_set) self.assert_cannot_write("RoleModelView", gamma_perm_set) self.assertIn(("can_add_slices", "Superset"), gamma_perm_set) self.assertIn(("can_copy_dash", "Superset"), gamma_perm_set) self.assertIn(("can_created_dashboards", "Superset"), gamma_perm_set) self.assertIn(("can_created_slices", "Superset"), gamma_perm_set) self.assertIn(("can_csv", "Superset"), gamma_perm_set) self.assertIn(("can_dashboard", "Superset"), gamma_perm_set) self.assertIn(("can_explore", "Superset"), gamma_perm_set) self.assertIn(("can_share_chart", "Superset"), gamma_perm_set) self.assertIn(("can_share_dashboard", "Superset"), gamma_perm_set) self.assertIn(("can_explore_json", "Superset"), gamma_perm_set) self.assertIn(("can_fave_dashboards", "Superset"), gamma_perm_set) self.assertIn(("can_fave_slices", "Superset"), gamma_perm_set) self.assertIn(("can_save_dash", "Superset"), gamma_perm_set) self.assertIn(("can_slice", "Superset"), gamma_perm_set) self.assertIn(("can_userinfo", "UserDBModelView"), gamma_perm_set) def test_views_are_secured(self): """Preventing the addition of unsecured views without has_access decorator""" # These FAB views are secured in their body as opposed to by decorators method_allowlist = ("action", "action_post") # List of redirect & other benign views views_allowlist = [ ["MyIndexView", "index"], ["UtilView", "back"], ["LocaleView", "index"], ["AuthDBView", "login"], ["AuthDBView", "logout"], ["R", "index"], ["Superset", "log"], ["Superset", "theme"], ["Superset", "welcome"], ["SecurityApi", "login"], ["SecurityApi", "refresh"], ["SupersetIndexView", "index"], ] unsecured_views = [] for view_class in appbuilder.baseviews: class_name = view_class.__class__.__name__ for name, value in inspect.getmembers( view_class, predicate=inspect.ismethod ): if ( name not in method_allowlist and [class_name, name] not in views_allowlist and hasattr(value, "_urls") and not hasattr(value, "_permission_name") ): unsecured_views.append((class_name, name)) if unsecured_views: view_str = "\n".join([str(v) for v in unsecured_views]) raise Exception(f"Some views are not secured:\n{view_str}") class TestSecurityManager(SupersetTestCase): """ Testing the Security Manager. """ @patch("superset.security.SupersetSecurityManager.raise_for_access") def test_can_access_datasource(self, mock_raise_for_access): datasource = self.get_datasource_mock() mock_raise_for_access.return_value = None self.assertTrue(security_manager.can_access_datasource(datasource=datasource)) mock_raise_for_access.side_effect = SupersetSecurityException( SupersetError( "dummy", SupersetErrorType.DATASOURCE_SECURITY_ACCESS_ERROR, ErrorLevel.ERROR, ) ) self.assertFalse(security_manager.can_access_datasource(datasource=datasource)) @patch("superset.security.SupersetSecurityManager.raise_for_access") def test_can_access_table(self, mock_raise_for_access): database = get_example_database() table = Table("bar", "foo") mock_raise_for_access.return_value = None self.assertTrue(security_manager.can_access_table(database, table)) mock_raise_for_access.side_effect = SupersetSecurityException( SupersetError( "dummy", SupersetErrorType.TABLE_SECURITY_ACCESS_ERROR, ErrorLevel.ERROR, ) ) self.assertFalse(security_manager.can_access_table(database, table)) @patch("superset.security.SupersetSecurityManager.can_access") @patch("superset.security.SupersetSecurityManager.can_access_schema") def test_raise_for_access_datasource(self, mock_can_access_schema, mock_can_access): datasource = self.get_datasource_mock() mock_can_access_schema.return_value = True security_manager.raise_for_access(datasource=datasource) mock_can_access.return_value = False mock_can_access_schema.return_value = False with self.assertRaises(SupersetSecurityException): security_manager.raise_for_access(datasource=datasource) @patch("superset.security.SupersetSecurityManager.can_access") def test_raise_for_access_query(self, mock_can_access): query = Mock( database=get_example_database(), schema="bar", sql="SELECT * FROM foo" ) mock_can_access.return_value = True security_manager.raise_for_access(query=query) mock_can_access.return_value = False with self.assertRaises(SupersetSecurityException): security_manager.raise_for_access(query=query) @patch("superset.security.SupersetSecurityManager.can_access") @patch("superset.security.SupersetSecurityManager.can_access_schema") def test_raise_for_access_query_context( self, mock_can_access_schema, mock_can_access ): query_context = Mock(datasource=self.get_datasource_mock()) mock_can_access_schema.return_value = True security_manager.raise_for_access(query_context=query_context) mock_can_access.return_value = False mock_can_access_schema.return_value = False with self.assertRaises(SupersetSecurityException): security_manager.raise_for_access(query_context=query_context) @patch("superset.security.SupersetSecurityManager.can_access") def test_raise_for_access_table(self, mock_can_access): database = get_example_database() table = Table("bar", "foo") mock_can_access.return_value = True security_manager.raise_for_access(database=database, table=table) mock_can_access.return_value = False with self.assertRaises(SupersetSecurityException): security_manager.raise_for_access(database=database, table=table) @patch("superset.security.SupersetSecurityManager.can_access") @patch("superset.security.SupersetSecurityManager.can_access_schema") def test_raise_for_access_viz(self, mock_can_access_schema, mock_can_access): test_viz = viz.TableViz(self.get_datasource_mock(), form_data={}) mock_can_access_schema.return_value = True security_manager.raise_for_access(viz=test_viz) mock_can_access.return_value = False mock_can_access_schema.return_value = False with self.assertRaises(SupersetSecurityException): security_manager.raise_for_access(viz=test_viz) class TestRowLevelSecurity(SupersetTestCase): """ Testing Row Level Security """ rls_entry = None query_obj: Dict[str, Any] = dict( groupby=[], metrics=None, filter=[], is_timeseries=False, columns=["value"], granularity=None, from_dttm=None, to_dttm=None, extras={}, ) NAME_AB_ROLE = "NameAB" NAME_Q_ROLE = "NameQ" NAMES_A_REGEX = re.compile(r"name like 'A%'") NAMES_B_REGEX = re.compile(r"name like 'B%'") NAMES_Q_REGEX = re.compile(r"name like 'Q%'") BASE_FILTER_REGEX = re.compile(r"gender = 'boy'") def setUp(self): session = db.session # Create roles security_manager.add_role(self.NAME_AB_ROLE) security_manager.add_role(self.NAME_Q_ROLE) gamma_user = security_manager.find_user(username="gamma") gamma_user.roles.append(security_manager.find_role(self.NAME_AB_ROLE)) gamma_user.roles.append(security_manager.find_role(self.NAME_Q_ROLE)) self.create_user_with_roles("NoRlsRoleUser", ["Gamma"]) session.commit() # Create regular RowLevelSecurityFilter (energy_usage, unicode_test) self.rls_entry1 = RowLevelSecurityFilter() self.rls_entry1.tables.extend( session.query(SqlaTable) .filter(SqlaTable.table_name.in_(["energy_usage", "unicode_test"])) .all() ) self.rls_entry1.filter_type = "Regular" self.rls_entry1.clause = "value > {{ cache_key_wrapper(1) }}" self.rls_entry1.group_key = None self.rls_entry1.roles.append(security_manager.find_role("Gamma")) self.rls_entry1.roles.append(security_manager.find_role("Alpha")) db.session.add(self.rls_entry1) # Create regular RowLevelSecurityFilter (birth_names name starts with A or B) self.rls_entry2 = RowLevelSecurityFilter() self.rls_entry2.tables.extend( session.query(SqlaTable) .filter(SqlaTable.table_name.in_(["birth_names"])) .all() ) self.rls_entry2.filter_type = "Regular" self.rls_entry2.clause = "name like 'A%' or name like 'B%'" self.rls_entry2.group_key = "name" self.rls_entry2.roles.append(security_manager.find_role("NameAB")) db.session.add(self.rls_entry2) # Create Regular RowLevelSecurityFilter (birth_names name starts with Q) self.rls_entry3 = RowLevelSecurityFilter() self.rls_entry3.tables.extend( session.query(SqlaTable) .filter(SqlaTable.table_name.in_(["birth_names"])) .all() ) self.rls_entry3.filter_type = "Regular" self.rls_entry3.clause = "name like 'Q%'" self.rls_entry3.group_key = "name" self.rls_entry3.roles.append(security_manager.find_role("NameQ")) db.session.add(self.rls_entry3) # Create Base RowLevelSecurityFilter (birth_names boys) self.rls_entry4 = RowLevelSecurityFilter() self.rls_entry4.tables.extend( session.query(SqlaTable) .filter(SqlaTable.table_name.in_(["birth_names"])) .all() ) self.rls_entry4.filter_type = "Base" self.rls_entry4.clause = "gender = 'boy'" self.rls_entry4.group_key = "gender" self.rls_entry4.roles.append(security_manager.find_role("Admin")) db.session.add(self.rls_entry4) db.session.commit() def tearDown(self): session = db.session session.delete(self.rls_entry1) session.delete(self.rls_entry2) session.delete(self.rls_entry3) session.delete(self.rls_entry4) session.delete(security_manager.find_role("NameAB")) session.delete(security_manager.find_role("NameQ")) session.delete(self.get_user("NoRlsRoleUser")) session.commit() @pytest.mark.usefixtures("load_energy_table_with_slice") def test_rls_filter_alters_energy_query(self): g.user = self.get_user(username="alpha") tbl = self.get_table_by_name("energy_usage") sql = tbl.get_query_str(self.query_obj) assert tbl.get_extra_cache_keys(self.query_obj) == [1] assert "value > 1" in sql @pytest.mark.usefixtures("load_energy_table_with_slice") def test_rls_filter_doesnt_alter_energy_query(self): g.user = self.get_user( username="admin" ) # self.login() doesn't actually set the user tbl = self.get_table_by_name("energy_usage") sql = tbl.get_query_str(self.query_obj) assert tbl.get_extra_cache_keys(self.query_obj) == [] assert "value > 1" not in sql @pytest.mark.usefixtures("load_unicode_dashboard_with_slice") def test_multiple_table_filter_alters_another_tables_query(self): g.user = self.get_user( username="alpha" ) # self.login() doesn't actually set the user tbl = self.get_table_by_name("unicode_test") sql = tbl.get_query_str(self.query_obj) assert tbl.get_extra_cache_keys(self.query_obj) == [1] assert "value > 1" in sql @pytest.mark.usefixtures("load_birth_names_dashboard_with_slices") def test_rls_filter_alters_gamma_birth_names_query(self): g.user = self.get_user(username="gamma") tbl = self.get_table_by_name("birth_names") sql = tbl.get_query_str(self.query_obj) # establish that the filters are grouped together correctly with # ANDs, ORs and parens in the correct place assert ( "WHERE ((name like 'A%'\n or name like 'B%')\n OR (name like 'Q%'))\n AND (gender = 'boy');" in sql ) @pytest.mark.usefixtures("load_birth_names_dashboard_with_slices") def test_rls_filter_alters_no_role_user_birth_names_query(self): g.user = self.get_user(username="NoRlsRoleUser") tbl = self.get_table_by_name("birth_names") sql = tbl.get_query_str(self.query_obj) # gamma's filters should not be present query assert not self.NAMES_A_REGEX.search(sql) assert not self.NAMES_B_REGEX.search(sql) assert not self.NAMES_Q_REGEX.search(sql) # base query should be present assert self.BASE_FILTER_REGEX.search(sql) @pytest.mark.usefixtures("load_birth_names_dashboard_with_slices") def test_rls_filter_doesnt_alter_admin_birth_names_query(self): g.user = self.get_user(username="admin") tbl = self.get_table_by_name("birth_names") sql = tbl.get_query_str(self.query_obj) # no filters are applied for admin user assert not self.NAMES_A_REGEX.search(sql) assert not self.NAMES_B_REGEX.search(sql) assert not self.NAMES_Q_REGEX.search(sql) assert not self.BASE_FILTER_REGEX.search(sql)