mirror of
https://github.com/apache/superset.git
synced 2024-09-19 20:19:37 -04:00
279 lines
9.4 KiB
Python
279 lines
9.4 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
"""Unit tests for alerting in Superset"""
|
|
import logging
|
|
from contextlib import contextmanager
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from flask_appbuilder.security.sqla.models import Role
|
|
|
|
from superset.extensions import db, security_manager
|
|
from superset.migrations.shared.security_converge import (
|
|
add_pvms,
|
|
migrate_roles,
|
|
Pvm,
|
|
PvmMigrationMapType,
|
|
)
|
|
from tests.integration_tests.test_app import app
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@contextmanager
|
|
def create_old_role(pvm_map: PvmMigrationMapType, external_pvms):
|
|
with app.app_context():
|
|
pvms = []
|
|
for old_pvm, new_pvms in pvm_map.items():
|
|
pvms.append(
|
|
security_manager.add_permission_view_menu(
|
|
old_pvm.permission, old_pvm.view
|
|
)
|
|
)
|
|
for external_pvm in external_pvms:
|
|
pvms.append(
|
|
security_manager.find_permission_view_menu(
|
|
external_pvm.permission, external_pvm.view
|
|
)
|
|
)
|
|
|
|
new_role = Role(name="Dummy Role", permissions=pvms)
|
|
db.session.add(new_role)
|
|
db.session.commit()
|
|
|
|
yield new_role
|
|
|
|
new_role = (
|
|
db.session.query(Role).filter(Role.name == "Dummy Role").one_or_none()
|
|
)
|
|
new_role.permissions = []
|
|
for old_pvm, new_pvms in pvm_map.items():
|
|
security_manager.del_permission_view_menu(old_pvm.permission, old_pvm.view)
|
|
for new_pvm in new_pvms:
|
|
security_manager.del_permission_view_menu(
|
|
new_pvm.permission, new_pvm.view
|
|
)
|
|
|
|
db.session.delete(new_role)
|
|
db.session.commit()
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"description, new_pvms, pvm_map, external_pvms, deleted_views, deleted_permissions",
|
|
[
|
|
(
|
|
"Many to one readonly",
|
|
{"NewDummy": ("can_read",)},
|
|
{
|
|
Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),),
|
|
Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_read"),),
|
|
},
|
|
(),
|
|
("DummyView",),
|
|
(),
|
|
),
|
|
(
|
|
"Many to one with new permission",
|
|
{"NewDummy": ("can_new_perm", "can_write")},
|
|
{
|
|
Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_new_perm"),),
|
|
Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_write"),),
|
|
},
|
|
(),
|
|
("DummyView",),
|
|
(),
|
|
),
|
|
(
|
|
"Many to one with multiple permissions",
|
|
{
|
|
"NewDummy": (
|
|
"can_read",
|
|
"can_write",
|
|
)
|
|
},
|
|
{
|
|
Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),),
|
|
Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_read"),),
|
|
Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),),
|
|
Pvm("DummyView", "can_delete"): (Pvm("NewDummy", "can_write"),),
|
|
},
|
|
(),
|
|
("DummyView",),
|
|
(),
|
|
),
|
|
(
|
|
"Many to one with multiple views",
|
|
{
|
|
"NewDummy": (
|
|
"can_read",
|
|
"can_write",
|
|
)
|
|
},
|
|
{
|
|
Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),),
|
|
Pvm("DummyView", "can_show"): (Pvm("NewDummy", "can_read"),),
|
|
Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),),
|
|
Pvm("DummyView", "can_delete"): (Pvm("NewDummy", "can_write"),),
|
|
Pvm("DummySecondView", "can_list"): (Pvm("NewDummy", "can_read"),),
|
|
Pvm("DummySecondView", "can_show"): (Pvm("NewDummy", "can_read"),),
|
|
Pvm("DummySecondView", "can_add"): (Pvm("NewDummy", "can_write"),),
|
|
Pvm("DummySecondView", "can_delete"): (Pvm("NewDummy", "can_write"),),
|
|
},
|
|
(),
|
|
("DummyView", "DummySecondView"),
|
|
(),
|
|
),
|
|
(
|
|
"Many to one with existing permission-view (pvm)",
|
|
{
|
|
"NewDummy": (
|
|
"can_read",
|
|
"can_write",
|
|
)
|
|
},
|
|
{
|
|
Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),),
|
|
Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),),
|
|
},
|
|
(Pvm("UserDBModelView", "can_list"),),
|
|
("DummyView",),
|
|
(),
|
|
),
|
|
(
|
|
"Many to one with existing multiple permission-view (pvm)",
|
|
{
|
|
"NewDummy": (
|
|
"can_read",
|
|
"can_write",
|
|
)
|
|
},
|
|
{
|
|
Pvm("DummyView", "can_list"): (Pvm("NewDummy", "can_read"),),
|
|
Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),),
|
|
Pvm("DummySecondView", "can_list"): (Pvm("NewDummy", "can_read"),),
|
|
Pvm("DummySecondView", "can_add"): (Pvm("NewDummy", "can_write"),),
|
|
},
|
|
(
|
|
Pvm("UserDBModelView", "can_list"),
|
|
Pvm("UserDBModelView", "can_add"),
|
|
),
|
|
("DummyView",),
|
|
(),
|
|
),
|
|
(
|
|
"Many to one with old permission that gets deleted",
|
|
{
|
|
"NewDummy": (
|
|
"can_read",
|
|
"can_write",
|
|
)
|
|
},
|
|
{
|
|
Pvm("DummyView", "can_new_perm"): (Pvm("NewDummy", "can_read"),),
|
|
Pvm("DummyView", "can_add"): (Pvm("NewDummy", "can_write"),),
|
|
},
|
|
(),
|
|
("DummyView",),
|
|
("can_new_perm",),
|
|
),
|
|
(
|
|
"Many to Many (normally should be a downgrade)",
|
|
{
|
|
"DummyView": (
|
|
"can_list",
|
|
"can_show",
|
|
"can_add",
|
|
)
|
|
},
|
|
{
|
|
Pvm("NewDummy", "can_read"): (
|
|
Pvm("DummyView", "can_list"),
|
|
Pvm("DummyView", "can_show"),
|
|
),
|
|
Pvm("NewDummy", "can_write"): (Pvm("DummyView", "can_add"),),
|
|
},
|
|
(),
|
|
("NewDummy",),
|
|
(),
|
|
),
|
|
(
|
|
"Many to Many delete old permissions",
|
|
{
|
|
"DummyView": (
|
|
"can_list",
|
|
"can_show",
|
|
"can_add",
|
|
)
|
|
},
|
|
{
|
|
Pvm("NewDummy", "can_new_perm1"): (
|
|
Pvm("DummyView", "can_list"),
|
|
Pvm("DummyView", "can_show"),
|
|
),
|
|
Pvm(
|
|
"NewDummy",
|
|
"can_new_perm2",
|
|
): (Pvm("DummyView", "can_add"),),
|
|
},
|
|
(),
|
|
("NewDummy",),
|
|
("can_new_perm1", "can_new_perm2"),
|
|
),
|
|
],
|
|
)
|
|
def test_migrate_role(
|
|
description, new_pvms, pvm_map, external_pvms, deleted_views, deleted_permissions
|
|
):
|
|
"""
|
|
Permission migration: generic tests
|
|
"""
|
|
logger.info(description)
|
|
with create_old_role(pvm_map, external_pvms) as old_role:
|
|
role_name = old_role.name
|
|
session = db.session
|
|
|
|
# Run migrations
|
|
add_pvms(session, new_pvms)
|
|
migrate_roles(session, pvm_map)
|
|
|
|
role = db.session.query(Role).filter(Role.name == role_name).one_or_none()
|
|
for old_pvm, new_pvms in pvm_map.items():
|
|
old_pvm_model = security_manager.find_permission_view_menu(
|
|
old_pvm.permission, old_pvm.view
|
|
)
|
|
assert old_pvm_model is None
|
|
new_pvm_model = security_manager.find_permission_view_menu(
|
|
new_pvms[0].permission, new_pvms[0].view
|
|
)
|
|
assert new_pvm_model is not None
|
|
assert new_pvm_model in role.permissions
|
|
# assert deleted view menus
|
|
for deleted_view in deleted_views:
|
|
assert security_manager.find_view_menu(deleted_view) is None
|
|
# assert deleted permissions
|
|
for deleted_permission in deleted_permissions:
|
|
assert security_manager.find_permission(deleted_permission) is None
|
|
# assert externals are still there
|
|
for external_pvm in external_pvms:
|
|
assert (
|
|
security_manager.find_permission_view_menu(
|
|
external_pvm.permission, external_pvm.view
|
|
)
|
|
is not None
|
|
)
|