fix(API): Updating assets via the API should preserve ownership configuration (#27364)

This commit is contained in:
Vitor Avila 2024-03-06 13:40:41 -03:00 committed by GitHub
parent 69d870cb7a
commit 66bf70172f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 404 additions and 17 deletions

View File

@ -19,7 +19,7 @@ from typing import Any, Optional
from flask_appbuilder.security.sqla.models import User
from superset.commands.utils import populate_owners
from superset.commands.utils import compute_owner_list, populate_owner_list
class BaseCommand(ABC):
@ -55,7 +55,7 @@ class CreateMixin: # pylint: disable=too-few-public-methods
:raises OwnersNotFoundValidationError: if at least one owner can't be resolved
:returns: Final list of owners
"""
return populate_owners(owner_ids, default_to_user=True)
return populate_owner_list(owner_ids, default_to_user=True)
class UpdateMixin: # pylint: disable=too-few-public-methods
@ -69,4 +69,18 @@ class UpdateMixin: # pylint: disable=too-few-public-methods
:raises OwnersNotFoundValidationError: if at least one owner can't be resolved
:returns: Final list of owners
"""
return populate_owners(owner_ids, default_to_user=False)
return populate_owner_list(owner_ids, default_to_user=False)
@staticmethod
def compute_owners(
current_owners: Optional[list[User]],
new_owners: Optional[list[int]],
) -> list[User]:
"""
Handle list of owners for update events.
:param current_owners: list of current owners
:param new_owners: list of new owners specified in the update payload
:returns: Final list of owners
"""
return compute_owner_list(current_owners, new_owners)

View File

@ -90,7 +90,10 @@ class UpdateChartCommand(UpdateMixin, BaseCommand):
if not is_query_context_update(self._properties):
try:
security_manager.raise_for_ownership(self._model)
owners = self.populate_owners(owner_ids)
owners = self.compute_owners(
self._model.owners,
owner_ids,
)
self._properties["owners"] = owners
except SupersetSecurityException as ex:
raise ChartForbiddenError() from ex

View File

@ -66,7 +66,7 @@ class UpdateDashboardCommand(UpdateMixin, BaseCommand):
def validate(self) -> None:
exceptions: list[ValidationError] = []
owners_ids: Optional[list[int]] = self._properties.get("owners")
owner_ids: Optional[list[int]] = self._properties.get("owners")
roles_ids: Optional[list[int]] = self._properties.get("roles")
slug: Optional[str] = self._properties.get("slug")
@ -85,10 +85,11 @@ class UpdateDashboardCommand(UpdateMixin, BaseCommand):
exceptions.append(DashboardSlugExistsValidationError())
# Validate/Populate owner
if owners_ids is None:
owners_ids = [owner.id for owner in self._model.owners]
try:
owners = self.populate_owners(owners_ids)
owners = self.compute_owners(
self._model.owners,
owner_ids,
)
self._properties["owners"] = owners
except ValidationError as ex:
exceptions.append(ex)

View File

@ -100,7 +100,10 @@ class UpdateDatasetCommand(UpdateMixin, BaseCommand):
exceptions.append(DatabaseChangeValidationError())
# Validate/Populate owner
try:
owners = self.populate_owners(owner_ids)
owners = self.compute_owners(
self._model.owners,
owner_ids,
)
self._properties["owners"] = owners
except ValidationError as ex:
exceptions.append(ex)

View File

@ -118,10 +118,11 @@ class UpdateReportScheduleCommand(UpdateMixin, BaseReportScheduleCommand):
raise ReportScheduleForbiddenError() from ex
# Validate/Populate owner
if owner_ids is None:
owner_ids = [owner.id for owner in self._model.owners]
try:
owners = self.populate_owners(owner_ids)
owners = self.compute_owners(
self._model.owners,
owner_ids,
)
self._properties["owners"] = owners
except ValidationError as ex:
exceptions.append(ex)

View File

@ -35,7 +35,7 @@ if TYPE_CHECKING:
from superset.connectors.sqla.models import BaseDatasource
def populate_owners(
def populate_owner_list(
owner_ids: list[int] | None,
default_to_user: bool,
) -> list[User]:
@ -62,6 +62,25 @@ def populate_owners(
return owners
def compute_owner_list(
current_owners: list[User] | None,
new_owners: list[int] | None,
) -> list[User]:
"""
Helper function for update commands, to properly handle the owners list.
Preserve the previous configuration unless included in the update payload.
:param current_owners: list of current owners
:param new_owners: list of new owners specified in the update payload
:returns: Final list of owners
"""
current_owners = current_owners or []
owners_ids = (
[owner.id for owner in current_owners] if new_owners is None else new_owners
)
return populate_owner_list(owners_ids, default_to_user=False)
def populate_roles(role_ids: list[int] | None = None) -> list[Role]:
"""
Helper function for commands, will fetch all roles from roles id's

View File

@ -31,7 +31,7 @@ from superset.commands.dataset.exceptions import (
DatasetForbiddenError,
DatasetNotFoundError,
)
from superset.commands.utils import populate_owners
from superset.commands.utils import populate_owner_list
from superset.connectors.sqla.models import SqlaTable
from superset.connectors.sqla.utils import get_physical_table_metadata
from superset.daos.datasource import DatasourceDAO
@ -94,7 +94,7 @@ class Datasource(BaseSupersetView):
except SupersetSecurityException as ex:
raise DatasetForbiddenError() from ex
datasource_dict["owners"] = populate_owners(
datasource_dict["owners"] = populate_owner_list(
datasource_dict["owners"], default_to_user=False
)

View File

@ -735,10 +735,59 @@ class TestChartApi(ApiOwnersTestCaseMixin, InsertChartMixin, SupersetTestCase):
db.session.delete(model)
db.session.commit()
@pytest.mark.usefixtures("add_dashboard_to_chart")
def test_update_chart_preserve_ownership(self):
"""
Chart API: Test update chart preserves owner list (if un-changed)
"""
chart_data = {
"slice_name": "title1_changed",
}
admin = self.get_user("admin")
self.login(username="admin")
uri = f"api/v1/chart/{self.chart.id}"
rv = self.put_assert_metric(uri, chart_data, "put")
self.assertEqual(rv.status_code, 200)
self.assertEqual([admin], self.chart.owners)
@pytest.mark.usefixtures("add_dashboard_to_chart")
def test_update_chart_clear_owner_list(self):
"""
Chart API: Test update chart admin can clear owner list
"""
chart_data = {"slice_name": "title1_changed", "owners": []}
admin = self.get_user("admin")
self.login(username="admin")
uri = f"api/v1/chart/{self.chart.id}"
rv = self.put_assert_metric(uri, chart_data, "put")
self.assertEqual(rv.status_code, 200)
self.assertEqual([], self.chart.owners)
def test_update_chart_populate_owner(self):
"""
Chart API: Test update admin can update chart with
no owners to a different owner
"""
gamma = self.get_user("gamma")
admin = self.get_user("admin")
chart_id = self.insert_chart("title", [], 1).id
model = db.session.query(Slice).get(chart_id)
self.assertEqual(model.owners, [])
chart_data = {"owners": [gamma.id]}
self.login(username="admin")
uri = f"api/v1/chart/{chart_id}"
rv = self.put_assert_metric(uri, chart_data, "put")
self.assertEqual(rv.status_code, 200)
model_updated = db.session.query(Slice).get(chart_id)
self.assertNotIn(admin, model_updated.owners)
self.assertIn(gamma, model_updated.owners)
db.session.delete(model_updated)
db.session.commit()
@pytest.mark.usefixtures("add_dashboard_to_chart")
def test_update_chart_new_dashboards(self):
"""
Chart API: Test update set new owner to current user
Chart API: Test update chart associating it with new dashboard
"""
chart_data = {
"slice_name": "title1_changed",
@ -754,7 +803,7 @@ class TestChartApi(ApiOwnersTestCaseMixin, InsertChartMixin, SupersetTestCase):
@pytest.mark.usefixtures("add_dashboard_to_chart")
def test_not_update_chart_none_dashboards(self):
"""
Chart API: Test update set new owner to current user
Chart API: Test update chart without changing dashboards configuration
"""
chart_data = {"slice_name": "title1_changed_again"}
self.login(username="admin")

View File

@ -1550,6 +1550,43 @@ class TestDashboardApi(ApiOwnersTestCaseMixin, InsertChartMixin, SupersetTestCas
db.session.delete(model)
db.session.commit()
def test_update_dashboard_clear_owner_list(self):
"""
Dashboard API: Test update admin can clear up owners list
"""
admin = self.get_user("admin")
dashboard_id = self.insert_dashboard("title1", "slug1", [admin.id]).id
self.login(username="admin")
uri = f"api/v1/dashboard/{dashboard_id}"
dashboard_data = {"owners": []}
rv = self.client.put(uri, json=dashboard_data)
self.assertEqual(rv.status_code, 200)
model = db.session.query(Dashboard).get(dashboard_id)
self.assertEqual([], model.owners)
db.session.delete(model)
db.session.commit()
def test_update_dashboard_populate_owner(self):
"""
Dashboard API: Test update admin can update dashboard with
no owners to a different owner
"""
self.login(username="admin")
gamma = self.get_user("gamma")
dashboard = self.insert_dashboard(
"title1",
"slug1",
[],
)
uri = f"api/v1/dashboard/{dashboard.id}"
dashboard_data = {"owners": [gamma.id]}
rv = self.client.put(uri, json=dashboard_data)
self.assertEqual(rv.status_code, 200)
model = db.session.query(Dashboard).get(dashboard.id)
self.assertEqual([gamma], model.owners)
db.session.delete(model)
db.session.commit()
def test_update_dashboard_slug_formatting(self):
"""
Dashboard API: Test update slug formatting

View File

@ -833,12 +833,66 @@ class TestDatasetApi(SupersetTestCase):
assert rv.status_code == 422
assert data == {"message": "Dataset could not be created."}
def test_update_dataset_preserve_ownership(self):
"""
Dataset API: Test update dataset preserves owner list (if un-changed)
"""
dataset = self.insert_default_dataset()
current_owners = dataset.owners
self.login(username="admin")
dataset_data = {"description": "new description"}
uri = f"api/v1/dataset/{dataset.id}"
rv = self.put_assert_metric(uri, dataset_data, "put")
assert rv.status_code == 200
model = db.session.query(SqlaTable).get(dataset.id)
assert model.owners == current_owners
db.session.delete(dataset)
db.session.commit()
def test_update_dataset_clear_owner_list(self):
"""
Dataset API: Test update dataset admin can clear ownership config
"""
dataset = self.insert_default_dataset()
self.login(username="admin")
dataset_data = {"owners": []}
uri = f"api/v1/dataset/{dataset.id}"
rv = self.put_assert_metric(uri, dataset_data, "put")
assert rv.status_code == 200
model = db.session.query(SqlaTable).get(dataset.id)
assert model.owners == []
db.session.delete(dataset)
db.session.commit()
def test_update_dataset_populate_owner(self):
"""
Dataset API: Test update admin can update dataset with
no owners to a different owner
"""
self.login(username="admin")
gamma = self.get_user("gamma")
dataset = self.insert_dataset("ab_permission", [], get_main_database())
dataset_data = {"owners": [gamma.id]}
uri = f"api/v1/dataset/{dataset.id}"
rv = self.put_assert_metric(uri, dataset_data, "put")
assert rv.status_code == 200
model = db.session.query(SqlaTable).get(dataset.id)
assert model.owners == [gamma]
db.session.delete(dataset)
db.session.commit()
def test_update_dataset_item(self):
"""
Dataset API: Test update dataset item
"""
dataset = self.insert_default_dataset()
current_owners = dataset.owners
self.login(username="admin")
dataset_data = {"description": "changed_description"}
uri = f"api/v1/dataset/{dataset.id}"
@ -846,6 +900,7 @@ class TestDatasetApi(SupersetTestCase):
assert rv.status_code == 200
model = db.session.query(SqlaTable).get(dataset.id)
assert model.description == dataset_data["description"]
assert model.owners == current_owners
db.session.delete(dataset)
db.session.commit()

View File

@ -1458,6 +1458,93 @@ class TestReportSchedulesApi(SupersetTestCase):
rv = self.put_assert_metric(uri, report_schedule_data, "put")
self.assertEqual(rv.status_code, 403)
@pytest.mark.usefixtures("create_report_schedules")
def test_update_report_preserve_ownership(self):
"""
ReportSchedule API: Test update report preserves owner list (if un-changed)
"""
self.login(username="admin")
existing_report = (
db.session.query(ReportSchedule)
.filter(ReportSchedule.name == "name1")
.one_or_none()
)
current_owners = existing_report.owners
report_schedule_data = {
"description": "Updated description",
}
uri = f"api/v1/report/{existing_report.id}"
rv = self.put_assert_metric(uri, report_schedule_data, "put")
updated_report = (
db.session.query(ReportSchedule)
.filter(ReportSchedule.name == "name1")
.one_or_none()
)
assert updated_report.owners == current_owners
@pytest.mark.usefixtures("create_report_schedules")
def test_update_report_clear_owner_list(self):
"""
ReportSchedule API: Test update report admin can clear ownership config
"""
self.login(username="admin")
existing_report = (
db.session.query(ReportSchedule)
.filter(ReportSchedule.name == "name1")
.one_or_none()
)
report_schedule_data = {
"owners": [],
}
uri = f"api/v1/report/{existing_report.id}"
rv = self.put_assert_metric(uri, report_schedule_data, "put")
updated_report = (
db.session.query(ReportSchedule)
.filter(ReportSchedule.name == "name1")
.one_or_none()
)
assert updated_report.owners == []
@pytest.mark.usefixtures("create_report_schedules")
def test_update_report_populate_owner(self):
"""
ReportSchedule API: Test update admin can update report with
no owners to a different owner
"""
gamma = self.get_user("gamma")
self.login(username="admin")
# Modify an existing report to make remove all owners
existing_report = (
db.session.query(ReportSchedule)
.filter(ReportSchedule.name == "name1")
.one_or_none()
)
report_update_data = {
"owners": [],
}
uri = f"api/v1/report/{existing_report.id}"
rv = self.put_assert_metric(uri, report_update_data, "put")
updated_report = (
db.session.query(ReportSchedule)
.filter(ReportSchedule.name == "name1")
.one_or_none()
)
assert updated_report.owners == []
# Populate the field
report_update_data = {
"owners": [gamma.id],
}
uri = f"api/v1/report/{updated_report.id}"
rv = self.put_assert_metric(uri, report_update_data, "put")
updated_report = (
db.session.query(ReportSchedule)
.filter(ReportSchedule.name == "name1")
.one_or_none()
)
assert updated_report.owners == [gamma]
@pytest.mark.usefixtures("create_report_schedules")
def test_delete_report_schedule(self):
"""

View File

@ -0,0 +1,118 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest.mock import MagicMock, patch
import pytest
from superset.commands.utils import compute_owner_list, populate_owner_list, User
@patch("superset.commands.utils.g")
def test_populate_owner_list_default_to_user(mock_user):
owner_list = populate_owner_list([], True)
assert owner_list == [mock_user.user]
@patch("superset.commands.utils.g")
def test_populate_owner_list_default_to_user_handle_none(mock_user):
owner_list = populate_owner_list(None, True)
assert owner_list == [mock_user.user]
@patch("superset.commands.utils.g")
@patch("superset.commands.utils.security_manager")
@patch("superset.commands.utils.get_user_id")
def test_populate_owner_list_admin_user(mock_user_id, mock_sm, mock_g):
test_user = User(id=1, first_name="First", last_name="Last")
mock_g.user = User(id=4, first_name="Admin", last_name="User")
mock_user_id.return_value = 4
mock_sm.is_admin = MagicMock(return_value=True)
mock_sm.get_user_by_id = MagicMock(return_value=test_user)
owner_list = populate_owner_list([1], False)
assert owner_list == [test_user]
@patch("superset.commands.utils.g")
@patch("superset.commands.utils.security_manager")
@patch("superset.commands.utils.get_user_id")
def test_populate_owner_list_admin_user_empty_list(mock_user_id, mock_sm, mock_g):
mock_g.user = User(id=4, first_name="Admin", last_name="User")
mock_user_id.return_value = 4
mock_sm.is_admin = MagicMock(return_value=True)
owner_list = populate_owner_list([], False)
assert owner_list == []
@patch("superset.commands.utils.g")
@patch("superset.commands.utils.security_manager")
@patch("superset.commands.utils.get_user_id")
def test_populate_owner_list_non_admin(mock_user_id, mock_sm, mock_g):
test_user = User(id=1, first_name="First", last_name="Last")
mock_g.user = User(id=4, first_name="Non", last_name="admin")
mock_user_id.return_value = 4
mock_sm.is_admin = MagicMock(return_value=False)
mock_sm.get_user_by_id = MagicMock(return_value=test_user)
owner_list = populate_owner_list([1], False)
assert owner_list == [mock_g.user, test_user]
@patch("superset.commands.utils.populate_owner_list")
def test_compute_owner_list_new_owners(mock_populate_owner_list):
current_owners = [User(id=1), User(id=2), User(id=3)]
new_owners = [4, 5, 6]
compute_owner_list(current_owners, new_owners)
mock_populate_owner_list.assert_called_once_with(new_owners, default_to_user=False)
@patch("superset.commands.utils.populate_owner_list")
def test_compute_owner_list_no_new_owners(mock_populate_owner_list):
current_owners = [User(id=1), User(id=2), User(id=3)]
new_owners = None
compute_owner_list(current_owners, new_owners)
mock_populate_owner_list.assert_called_once_with([1, 2, 3], default_to_user=False)
@patch("superset.commands.utils.populate_owner_list")
def test_compute_owner_list_new_owner_empty_list(mock_populate_owner_list):
current_owners = [User(id=1), User(id=2), User(id=3)]
new_owners = []
compute_owner_list(current_owners, new_owners)
mock_populate_owner_list.assert_called_once_with(new_owners, default_to_user=False)
@patch("superset.commands.utils.populate_owner_list")
def test_compute_owner_list_no_owners(mock_populate_owner_list):
current_owners = []
new_owners = [4, 5, 6]
compute_owner_list(current_owners, new_owners)
mock_populate_owner_list.assert_called_once_with(new_owners, default_to_user=False)
@patch("superset.commands.utils.populate_owner_list")
def test_compute_owner_list_no_owners_handle_none(mock_populate_owner_list):
current_owners = None
new_owners = [4, 5, 6]
compute_owner_list(current_owners, new_owners)
mock_populate_owner_list.assert_called_once_with(new_owners, default_to_user=False)