# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """Unit tests for alerting in Superset""" import logging from contextlib import contextmanager from unittest.mock import patch import pytest from flask_appbuilder.security.sqla.models import Role from superset.extensions import db, security_manager from superset.migrations.shared.security_converge import ( add_pvms, migrate_roles, Pvm, PvmMigrationMapType, ) from tests.integration_tests.test_app import app logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @contextmanager def create_old_role(pvm_map: PvmMigrationMapType, external_pvms): with app.app_context(): pvms = [] for old_pvm, new_pvms in pvm_map.items(): pvms.append( security_manager.add_permission_view_menu( old_pvm.permission, old_pvm.view ) ) for external_pvm in external_pvms: pvms.append( security_manager.find_permission_view_menu( external_pvm.permission, external_pvm.view ) ) new_role = Role(name="Dummy Role", permissions=pvms) db.session.add(new_role) db.session.commit() yield new_role new_role = ( db.session.query(Role).filter(Role.name == "Dummy Role").one_or_none() ) new_role.permissions = [] for old_pvm, new_pvms in pvm_map.items(): security_manager.del_permission_view_menu(old_pvm.permission, old_pvm.view) for new_pvm in new_pvms: security_manager.del_permission_view_menu( new_pvm.permission, new_pvm.view ) db.session.delete(new_role) db.session.commit() @pytest.mark.parametrize( "description, new_pvms, pvm_map, external_pvms, deleted_views, deleted_permissions", [ ( "Many to one readonly", {"NewDummy": ("can_read",)}, { Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),), Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_read"),), }, (), ("DummyView",), (), ), ( "Many to one with new permission", {"NewDummy": ("can_new_perm", "can_write")}, { Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_new_perm"),), Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_write"),), }, (), ("DummyView",), (), ), ( "Many to one with multiple permissions", { "NewDummy": ( "can_read", "can_write", ) }, { Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),), Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_read"),), Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),), Pvm("DummyView", "can_delete"): (Pvm("NewDummy", "can_write"),), }, (), ("DummyView",), (), ), ( "Many to one with multiple views", { "NewDummy": ( "can_read", "can_write", ) }, { Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),), Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_read"),), Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),), Pvm("DummyView", "can_delete"): (Pvm("NewDummy", "can_write"),), Pvm("DummySecondView", "can_list"): (Pvm("NewDummy", "can_read"),), Pvm("DummySecondView", "can_show"): (Pvm("NewDummy", "can_read"),), Pvm("DummySecondView", "can_add"): (Pvm("NewDummy", "can_write"),), Pvm("DummySecondView", "can_delete"): (Pvm("NewDummy", "can_write"),), }, (), ("DummyView", "DummySecondView"), (), ), ( "Many to one with existing permission-view (pvm)", { "NewDummy": ( "can_read", "can_write", ) }, { Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),), Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),), }, (Pvm("UserDBModelView", "can_list"),), ("DummyView",), (), ), ( "Many to one with existing multiple permission-view (pvm)", { "NewDummy": ( "can_read", "can_write", ) }, { Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),), Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),), Pvm("DummySecondView", "can_list"): (Pvm("NewDummy", "can_read"),), Pvm("DummySecondView", "can_add"): (Pvm("NewDummy", "can_write"),), }, ( Pvm("UserDBModelView", "can_list"), Pvm("UserDBModelView", "can_add"), ), ("DummyView",), (), ), ( "Many to one with old permission that gets deleted", { "NewDummy": ( "can_read", "can_write", ) }, { Pvm("DummyView", "can_new_perm"): (Pvm("NewDummy", "can_read"),), Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),), }, (), ("DummyView",), ("can_new_perm",), ), ( "Many to Many (normally should be a downgrade)", { "DummyView": ( "can_list", "can_show", "can_add", ) }, { Pvm("NewDummy", "can_read"): ( Pvm("DummyView", "can_list"), Pvm("DummyView", "can_show"), ), Pvm("NewDummy", "can_write"): (Pvm("DummyView", "can_add"),), }, (), ("NewDummy",), (), ), ( "Many to Many delete old permissions", { "DummyView": ( "can_list", "can_show", "can_add", ) }, { Pvm("NewDummy", "can_new_perm1"): ( Pvm("DummyView", "can_list"), Pvm("DummyView", "can_show"), ), Pvm( "NewDummy", "can_new_perm2", ): (Pvm("DummyView", "can_add"),), }, (), ("NewDummy",), ("can_new_perm1", "can_new_perm2"), ), ], ) def test_migrate_role( description, new_pvms, pvm_map, external_pvms, deleted_views, deleted_permissions ): """ Permission migration: generic tests """ logger.info(description) with create_old_role(pvm_map, external_pvms) as old_role: role_name = old_role.name # Run migrations add_pvms(db.session, new_pvms) migrate_roles(db.session, pvm_map) role = db.session.query(Role).filter(Role.name == role_name).one_or_none() for old_pvm, new_pvms in pvm_map.items(): old_pvm_model = security_manager.find_permission_view_menu( old_pvm.permission, old_pvm.view ) assert old_pvm_model is None new_pvm_model = security_manager.find_permission_view_menu( new_pvms[0].permission, new_pvms[0].view ) assert new_pvm_model is not None assert new_pvm_model in role.permissions # assert deleted view menus for deleted_view in deleted_views: assert security_manager.find_view_menu(deleted_view) is None # assert deleted permissions for deleted_permission in deleted_permissions: assert security_manager.find_permission(deleted_permission) is None # assert externals are still there for external_pvm in external_pvms: assert ( security_manager.find_permission_view_menu( external_pvm.permission, external_pvm.view ) is not None )