# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """Unit tests for Superset""" import json from typing import List from unittest.mock import patch import prison import pytest import yaml from sqlalchemy.sql import func from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn from superset.dao.exceptions import ( DAOCreateFailedError, DAODeleteFailedError, DAOUpdateFailedError, ) from superset.extensions import db, security_manager from superset.models.core import Database from superset.utils.core import backend, get_example_database, get_main_database from superset.utils.dict_import_export import export_to_dict from tests.base_tests import SupersetTestCase from tests.conftest import CTAS_SCHEMA_NAME class TestDatasetApi(SupersetTestCase): fixture_tables_names = ("ab_permission", "ab_permission_view", "ab_view_menu") @staticmethod def insert_dataset( table_name: str, schema: str, owners: List[int], database: Database ) -> SqlaTable: obj_owners = list() for owner in owners: user = db.session.query(security_manager.user_model).get(owner) obj_owners.append(user) table = SqlaTable( table_name=table_name, schema=schema, owners=obj_owners, database=database ) db.session.add(table) db.session.commit() table.fetch_metadata() return table def insert_default_dataset(self): return self.insert_dataset( "ab_permission", "", [self.get_user("admin").id], get_main_database() ) def get_fixture_datasets(self) -> List[SqlaTable]: return ( db.session.query(SqlaTable) .filter(SqlaTable.table_name.in_(self.fixture_tables_names)) .all() ) @pytest.fixture() def create_datasets(self): with self.create_app().app_context(): datasets = [] admin = self.get_user("admin") main_db = get_main_database() for tables_name in self.fixture_tables_names: datasets.append( self.insert_dataset(tables_name, "", [admin.id], main_db) ) yield datasets # rollback changes for dataset in datasets: db.session.delete(dataset) db.session.commit() @staticmethod def get_energy_usage_dataset(): example_db = get_example_database() return ( db.session.query(SqlaTable) .filter_by(database=example_db, table_name="energy_usage") .one() ) def test_get_dataset_list(self): """ Dataset API: Test get dataset list """ example_db = get_example_database() self.login(username="admin") arguments = { "filters": [ {"col": "database", "opr": "rel_o_m", "value": f"{example_db.id}"}, {"col": "table_name", "opr": "eq", "value": f"birth_names"}, ] } uri = f"api/v1/dataset/?q={prison.dumps(arguments)}" rv = self.get_assert_metric(uri, "get_list") assert rv.status_code == 200 response = json.loads(rv.data.decode("utf-8")) assert response["count"] == 1 expected_columns = [ "changed_by", "changed_by_name", "changed_by_url", "changed_on_delta_humanized", "changed_on_utc", "database", "default_endpoint", "explore_url", "id", "kind", "owners", "schema", "sql", "table_name", ] assert sorted(list(response["result"][0].keys())) == expected_columns def test_get_dataset_list_gamma(self): """ Dataset API: Test get dataset list gamma """ example_db = get_example_database() self.login(username="gamma") uri = "api/v1/dataset/" rv = self.get_assert_metric(uri, "get_list") assert rv.status_code == 200 response = json.loads(rv.data.decode("utf-8")) assert response["result"] == [] def test_get_dataset_related_database_gamma(self): """ Dataset API: Test get dataset related databases gamma """ self.login(username="gamma") uri = "api/v1/dataset/related/database" rv = self.client.get(uri) assert rv.status_code == 200 response = json.loads(rv.data.decode("utf-8")) assert response["count"] == 0 assert response["result"] == [] def test_get_dataset_item(self): """ Dataset API: Test get dataset item """ table = self.get_energy_usage_dataset() self.login(username="admin") uri = f"api/v1/dataset/{table.id}" rv = self.get_assert_metric(uri, "get") assert rv.status_code == 200 response = json.loads(rv.data.decode("utf-8")) expected_result = { "cache_timeout": None, "database": {"database_name": "examples", "id": 1}, "default_endpoint": None, "description": "Energy consumption", "fetch_values_predicate": None, "filter_select_enabled": False, "is_sqllab_view": False, "main_dttm_col": None, "offset": 0, "owners": [], "schema": None, "sql": None, "table_name": "energy_usage", "template_params": None, } assert { k: v for k, v in response["result"].items() if k in expected_result } == expected_result assert len(response["result"]["columns"]) == 3 assert len(response["result"]["metrics"]) == 2 def test_get_dataset_distinct_schema(self): """ Dataset API: Test get dataset distinct schema """ def pg_test_query_parameter(query_parameter, expected_response): uri = f"api/v1/dataset/distinct/schema?q={prison.dumps(query_parameter)}" rv = self.client.get(uri) response = json.loads(rv.data.decode("utf-8")) assert rv.status_code == 200 assert response == expected_response example_db = get_example_database() datasets = [] if example_db.backend == "postgresql": datasets.append( self.insert_dataset("ab_permission", "public", [], get_main_database()) ) datasets.append( self.insert_dataset( "columns", "information_schema", [], get_main_database() ) ) schema_values = [ "", "admin_database", "information_schema", "public", ] expected_response = { "count": 4, "result": [{"text": val, "value": val} for val in schema_values], } self.login(username="admin") uri = "api/v1/dataset/distinct/schema" rv = self.client.get(uri) response = json.loads(rv.data.decode("utf-8")) assert rv.status_code == 200 assert response == expected_response # Test filter query_parameter = {"filter": "inf"} pg_test_query_parameter( query_parameter, { "count": 1, "result": [ {"text": "information_schema", "value": "information_schema"} ], }, ) query_parameter = {"page": 0, "page_size": 1} pg_test_query_parameter( query_parameter, {"count": 4, "result": [{"text": "", "value": ""}]}, ) query_parameter = {"page": 1, "page_size": 1} pg_test_query_parameter( query_parameter, { "count": 4, "result": [{"text": "admin_database", "value": "admin_database"}], }, ) for dataset in datasets: db.session.delete(dataset) db.session.commit() def test_get_dataset_distinct_not_allowed(self): """ Dataset API: Test get dataset distinct not allowed """ self.login(username="admin") uri = "api/v1/dataset/distinct/table_name" rv = self.client.get(uri) assert rv.status_code == 404 def test_get_dataset_distinct_gamma(self): """ Dataset API: Test get dataset distinct with gamma """ dataset = self.insert_default_dataset() self.login(username="gamma") uri = "api/v1/dataset/distinct/schema" rv = self.client.get(uri) assert rv.status_code == 200 response = json.loads(rv.data.decode("utf-8")) assert response["count"] == 0 assert response["result"] == [] db.session.delete(dataset) db.session.commit() def test_get_dataset_info(self): """ Dataset API: Test get dataset info """ self.login(username="admin") uri = "api/v1/dataset/_info" rv = self.get_assert_metric(uri, "info") assert rv.status_code == 200 def test_create_dataset_item(self): """ Dataset API: Test create dataset item """ main_db = get_main_database() self.login(username="admin") table_data = { "database": main_db.id, "schema": "", "table_name": "ab_permission", } uri = "api/v1/dataset/" rv = self.post_assert_metric(uri, table_data, "post") assert rv.status_code == 201 data = json.loads(rv.data.decode("utf-8")) table_id = data.get("id") model = db.session.query(SqlaTable).get(table_id) assert model.table_name == table_data["table_name"] assert model.database_id == table_data["database"] # Assert that columns were created columns = ( db.session.query(TableColumn) .filter_by(table_id=table_id) .order_by("column_name") .all() ) assert columns[0].column_name == "id" assert columns[1].column_name == "name" # Assert that metrics were created columns = ( db.session.query(SqlMetric) .filter_by(table_id=table_id) .order_by("metric_name") .all() ) assert columns[0].expression == "COUNT(*)" db.session.delete(model) db.session.commit() def test_create_dataset_item_gamma(self): """ Dataset API: Test create dataset item gamma """ self.login(username="gamma") main_db = get_main_database() table_data = { "database": main_db.id, "schema": "", "table_name": "ab_permission", } uri = "api/v1/dataset/" rv = self.client.post(uri, json=table_data) assert rv.status_code == 401 def test_create_dataset_item_owner(self): """ Dataset API: Test create item owner """ main_db = get_main_database() self.login(username="alpha") admin = self.get_user("admin") alpha = self.get_user("alpha") table_data = { "database": main_db.id, "schema": "", "table_name": "ab_permission", "owners": [admin.id], } uri = "api/v1/dataset/" rv = self.post_assert_metric(uri, table_data, "post") assert rv.status_code == 201 data = json.loads(rv.data.decode("utf-8")) model = db.session.query(SqlaTable).get(data.get("id")) assert admin in model.owners assert alpha in model.owners db.session.delete(model) db.session.commit() def test_create_dataset_item_owners_invalid(self): """ Dataset API: Test create dataset item owner invalid """ admin = self.get_user("admin") main_db = get_main_database() self.login(username="admin") table_data = { "database": main_db.id, "schema": "", "table_name": "ab_permission", "owners": [admin.id, 1000], } uri = f"api/v1/dataset/" rv = self.post_assert_metric(uri, table_data, "post") assert rv.status_code == 422 data = json.loads(rv.data.decode("utf-8")) expected_result = {"message": {"owners": ["Owners are invalid"]}} assert data == expected_result def test_create_dataset_validate_uniqueness(self): """ Dataset API: Test create dataset validate table uniqueness """ example_db = get_example_database() self.login(username="admin") table_data = { "database": example_db.id, "schema": "", "table_name": "birth_names", } uri = "api/v1/dataset/" rv = self.post_assert_metric(uri, table_data, "post") assert rv.status_code == 422 data = json.loads(rv.data.decode("utf-8")) assert data == { "message": {"table_name": ["Datasource birth_names already exists"]} } def test_create_dataset_same_name_different_schema(self): if backend() == "sqlite": # sqlite doesn't support schemas return example_db = get_example_database() example_db.get_sqla_engine().execute( f"CREATE TABLE {CTAS_SCHEMA_NAME}.birth_names AS SELECT 2 as two" ) self.login(username="admin") table_data = { "database": example_db.id, "schema": CTAS_SCHEMA_NAME, "table_name": "birth_names", } uri = "api/v1/dataset/" rv = self.post_assert_metric(uri, table_data, "post") assert rv.status_code == 201 # cleanup data = json.loads(rv.data.decode("utf-8")) uri = f'api/v1/dataset/{data.get("id")}' rv = self.client.delete(uri) assert rv.status_code == 200 example_db.get_sqla_engine().execute( f"DROP TABLE {CTAS_SCHEMA_NAME}.birth_names" ) def test_create_dataset_validate_database(self): """ Dataset API: Test create dataset validate database exists """ self.login(username="admin") dataset_data = {"database": 1000, "schema": "", "table_name": "birth_names"} uri = "api/v1/dataset/" rv = self.post_assert_metric(uri, dataset_data, "post") assert rv.status_code == 422 data = json.loads(rv.data.decode("utf-8")) assert data == {"message": {"database": ["Database does not exist"]}} def test_create_dataset_validate_tables_exists(self): """ Dataset API: Test create dataset validate table exists """ example_db = get_example_database() self.login(username="admin") table_data = { "database": example_db.id, "schema": "", "table_name": "does_not_exist", } uri = "api/v1/dataset/" rv = self.post_assert_metric(uri, table_data, "post") assert rv.status_code == 422 @patch("superset.datasets.dao.DatasetDAO.create") def test_create_dataset_sqlalchemy_error(self, mock_dao_create): """ Dataset API: Test create dataset sqlalchemy error """ mock_dao_create.side_effect = DAOCreateFailedError() self.login(username="admin") main_db = get_main_database() dataset_data = { "database": main_db.id, "schema": "", "table_name": "ab_permission", } uri = "api/v1/dataset/" rv = self.post_assert_metric(uri, dataset_data, "post") data = json.loads(rv.data.decode("utf-8")) assert rv.status_code == 422 assert data == {"message": "Dataset could not be created."} def test_update_dataset_item(self): """ Dataset API: Test update dataset item """ dataset = self.insert_default_dataset() self.login(username="admin") dataset_data = {"description": "changed_description"} uri = f"api/v1/dataset/{dataset.id}" rv = self.put_assert_metric(uri, dataset_data, "put") assert rv.status_code == 200 model = db.session.query(SqlaTable).get(dataset.id) assert model.description == dataset_data["description"] db.session.delete(dataset) db.session.commit() def test_update_dataset_create_column(self): """ Dataset API: Test update dataset create column """ # create example dataset by Command dataset = self.insert_default_dataset() new_column_data = { "column_name": "new_col", "description": "description", "expression": "expression", "type": "INTEGER", "verbose_name": "New Col", } uri = f"api/v1/dataset/{dataset.id}" # Get current cols and append the new column self.login(username="admin") rv = self.get_assert_metric(uri, "get") data = json.loads(rv.data.decode("utf-8")) for column in data["result"]["columns"]: column.pop("changed_on", None) column.pop("created_on", None) data["result"]["columns"].append(new_column_data) rv = self.client.put(uri, json={"columns": data["result"]["columns"]}) assert rv.status_code == 200 columns = ( db.session.query(TableColumn) .filter_by(table_id=dataset.id) .order_by("column_name") .all() ) assert columns[0].column_name == "id" assert columns[1].column_name == "name" assert columns[2].column_name == new_column_data["column_name"] assert columns[2].description == new_column_data["description"] assert columns[2].expression == new_column_data["expression"] assert columns[2].type == new_column_data["type"] assert columns[2].verbose_name == new_column_data["verbose_name"] db.session.delete(dataset) db.session.commit() def test_update_dataset_update_column(self): """ Dataset API: Test update dataset columns """ dataset = self.insert_default_dataset() self.login(username="admin") uri = f"api/v1/dataset/{dataset.id}" # Get current cols and alter one rv = self.get_assert_metric(uri, "get") resp_columns = json.loads(rv.data.decode("utf-8"))["result"]["columns"] for column in resp_columns: column.pop("changed_on", None) column.pop("created_on", None) resp_columns[0]["groupby"] = False resp_columns[0]["filterable"] = False rv = self.client.put(uri, json={"columns": resp_columns}) assert rv.status_code == 200 columns = ( db.session.query(TableColumn) .filter_by(table_id=dataset.id) .order_by("column_name") .all() ) assert columns[0].column_name == "id" assert columns[1].column_name, "name" # TODO(bkyryliuk): find the reason why update is failing for the presto database if get_example_database().backend != "presto": assert columns[0].groupby is False assert columns[0].filterable is False db.session.delete(dataset) db.session.commit() def test_update_dataset_update_column_uniqueness(self): """ Dataset API: Test update dataset columns uniqueness """ dataset = self.insert_default_dataset() self.login(username="admin") uri = f"api/v1/dataset/{dataset.id}" # try to insert a new column ID that already exists data = {"columns": [{"column_name": "id", "type": "INTEGER"}]} rv = self.put_assert_metric(uri, data, "put") assert rv.status_code == 422 data = json.loads(rv.data.decode("utf-8")) expected_result = { "message": {"columns": ["One or more columns already exist"]} } assert data == expected_result db.session.delete(dataset) db.session.commit() def test_update_dataset_update_metric_uniqueness(self): """ Dataset API: Test update dataset metric uniqueness """ dataset = self.insert_default_dataset() self.login(username="admin") uri = f"api/v1/dataset/{dataset.id}" # try to insert a new column ID that already exists data = {"metrics": [{"metric_name": "count", "expression": "COUNT(*)"}]} rv = self.put_assert_metric(uri, data, "put") assert rv.status_code == 422 data = json.loads(rv.data.decode("utf-8")) expected_result = { "message": {"metrics": ["One or more metrics already exist"]} } assert data == expected_result db.session.delete(dataset) db.session.commit() def test_update_dataset_update_column_duplicate(self): """ Dataset API: Test update dataset columns duplicate """ dataset = self.insert_default_dataset() self.login(username="admin") uri = f"api/v1/dataset/{dataset.id}" # try to insert a new column ID that already exists data = { "columns": [ {"column_name": "id", "type": "INTEGER"}, {"column_name": "id", "type": "VARCHAR"}, ] } rv = self.put_assert_metric(uri, data, "put") assert rv.status_code == 422 data = json.loads(rv.data.decode("utf-8")) expected_result = { "message": {"columns": ["One or more columns are duplicated"]} } assert data == expected_result db.session.delete(dataset) db.session.commit() def test_update_dataset_update_metric_duplicate(self): """ Dataset API: Test update dataset metric duplicate """ dataset = self.insert_default_dataset() self.login(username="admin") uri = f"api/v1/dataset/{dataset.id}" # try to insert a new column ID that already exists data = { "metrics": [ {"metric_name": "dup", "expression": "COUNT(*)"}, {"metric_name": "dup", "expression": "DIFF_COUNT(*)"}, ] } rv = self.put_assert_metric(uri, data, "put") assert rv.status_code == 422 data = json.loads(rv.data.decode("utf-8")) expected_result = { "message": {"metrics": ["One or more metrics are duplicated"]} } assert data == expected_result db.session.delete(dataset) db.session.commit() def test_update_dataset_item_gamma(self): """ Dataset API: Test update dataset item gamma """ dataset = self.insert_default_dataset() self.login(username="gamma") table_data = {"description": "changed_description"} uri = f"api/v1/dataset/{dataset.id}" rv = self.client.put(uri, json=table_data) assert rv.status_code == 401 db.session.delete(dataset) db.session.commit() def test_update_dataset_item_not_owned(self): """ Dataset API: Test update dataset item not owned """ dataset = self.insert_default_dataset() self.login(username="alpha") table_data = {"description": "changed_description"} uri = f"api/v1/dataset/{dataset.id}" rv = self.put_assert_metric(uri, table_data, "put") assert rv.status_code == 403 db.session.delete(dataset) db.session.commit() def test_update_dataset_item_owners_invalid(self): """ Dataset API: Test update dataset item owner invalid """ dataset = self.insert_default_dataset() self.login(username="admin") table_data = {"description": "changed_description", "owners": [1000]} uri = f"api/v1/dataset/{dataset.id}" rv = self.put_assert_metric(uri, table_data, "put") assert rv.status_code == 422 db.session.delete(dataset) db.session.commit() def test_update_dataset_item_uniqueness(self): """ Dataset API: Test update dataset uniqueness """ dataset = self.insert_default_dataset() self.login(username="admin") ab_user = self.insert_dataset( "ab_user", "", [self.get_user("admin").id], get_main_database() ) table_data = {"table_name": "ab_user"} uri = f"api/v1/dataset/{dataset.id}" rv = self.put_assert_metric(uri, table_data, "put") data = json.loads(rv.data.decode("utf-8")) assert rv.status_code == 422 expected_response = { "message": {"table_name": ["Datasource ab_user already exists"]} } assert data == expected_response db.session.delete(dataset) db.session.delete(ab_user) db.session.commit() @patch("superset.datasets.dao.DatasetDAO.update") def test_update_dataset_sqlalchemy_error(self, mock_dao_update): """ Dataset API: Test update dataset sqlalchemy error """ mock_dao_update.side_effect = DAOUpdateFailedError() dataset = self.insert_default_dataset() self.login(username="admin") table_data = {"description": "changed_description"} uri = f"api/v1/dataset/{dataset.id}" rv = self.client.put(uri, json=table_data) data = json.loads(rv.data.decode("utf-8")) assert rv.status_code == 422 assert data == {"message": "Dataset could not be updated."} db.session.delete(dataset) db.session.commit() def test_delete_dataset_item(self): """ Dataset API: Test delete dataset item """ dataset = self.insert_default_dataset() view_menu = security_manager.find_view_menu(dataset.get_perm()) assert view_menu is not None view_menu_id = view_menu.id self.login(username="admin") uri = f"api/v1/dataset/{dataset.id}" rv = self.client.delete(uri) assert rv.status_code == 200 non_view_menu = db.session.query(security_manager.viewmenu_model).get( view_menu_id ) assert non_view_menu is None def test_delete_item_dataset_not_owned(self): """ Dataset API: Test delete item not owned """ dataset = self.insert_default_dataset() self.login(username="alpha") uri = f"api/v1/dataset/{dataset.id}" rv = self.delete_assert_metric(uri, "delete") assert rv.status_code == 403 db.session.delete(dataset) db.session.commit() def test_delete_dataset_item_not_authorized(self): """ Dataset API: Test delete item not authorized """ dataset = self.insert_default_dataset() self.login(username="gamma") uri = f"api/v1/dataset/{dataset.id}" rv = self.client.delete(uri) assert rv.status_code == 401 db.session.delete(dataset) db.session.commit() @patch("superset.datasets.dao.DatasetDAO.delete") def test_delete_dataset_sqlalchemy_error(self, mock_dao_delete): """ Dataset API: Test delete dataset sqlalchemy error """ mock_dao_delete.side_effect = DAODeleteFailedError() dataset = self.insert_default_dataset() self.login(username="admin") uri = f"api/v1/dataset/{dataset.id}" rv = self.delete_assert_metric(uri, "delete") data = json.loads(rv.data.decode("utf-8")) assert rv.status_code == 422 assert data == {"message": "Dataset could not be deleted."} db.session.delete(dataset) db.session.commit() @pytest.mark.usefixtures("create_datasets") def test_bulk_delete_dataset_items(self): """ Dataset API: Test bulk delete dataset items """ datasets = self.get_fixture_datasets() dataset_ids = [dataset.id for dataset in datasets] view_menu_names = [] for dataset in datasets: view_menu_names.append(dataset.get_perm()) self.login(username="admin") uri = f"api/v1/dataset/?q={prison.dumps(dataset_ids)}" rv = self.delete_assert_metric(uri, "bulk_delete") data = json.loads(rv.data.decode("utf-8")) assert rv.status_code == 200 expected_response = {"message": f"Deleted {len(datasets)} datasets"} assert data == expected_response datasets = ( db.session.query(SqlaTable) .filter(SqlaTable.table_name.in_(self.fixture_tables_names)) .all() ) assert datasets == [] # Assert permissions get cleaned for view_menu_name in view_menu_names: assert security_manager.find_view_menu(view_menu_name) is None @pytest.mark.usefixtures("create_datasets") def test_bulk_delete_item_dataset_not_owned(self): """ Dataset API: Test bulk delete item not owned """ datasets = self.get_fixture_datasets() dataset_ids = [dataset.id for dataset in datasets] self.login(username="alpha") uri = f"api/v1/dataset/?q={prison.dumps(dataset_ids)}" rv = self.delete_assert_metric(uri, "bulk_delete") assert rv.status_code == 403 @pytest.mark.usefixtures("create_datasets") def test_bulk_delete_item_not_found(self): """ Dataset API: Test bulk delete item not found """ datasets = self.get_fixture_datasets() dataset_ids = [dataset.id for dataset in datasets] dataset_ids.append(db.session.query(func.max(SqlaTable.id)).scalar()) self.login(username="admin") uri = f"api/v1/dataset/?q={prison.dumps(dataset_ids)}" rv = self.delete_assert_metric(uri, "bulk_delete") assert rv.status_code == 404 @pytest.mark.usefixtures("create_datasets") def test_bulk_delete_dataset_item_not_authorized(self): """ Dataset API: Test bulk delete item not authorized """ datasets = self.get_fixture_datasets() dataset_ids = [dataset.id for dataset in datasets] self.login(username="gamma") uri = f"api/v1/dataset/?q={prison.dumps(dataset_ids)}" rv = self.client.delete(uri) assert rv.status_code == 401 @pytest.mark.usefixtures("create_datasets") def test_bulk_delete_dataset_item_incorrect(self): """ Dataset API: Test bulk delete item incorrect request """ datasets = self.get_fixture_datasets() dataset_ids = [dataset.id for dataset in datasets] dataset_ids.append("Wrong") self.login(username="admin") uri = f"api/v1/dataset/?q={prison.dumps(dataset_ids)}" rv = self.client.delete(uri) assert rv.status_code == 400 def test_dataset_item_refresh(self): """ Dataset API: Test item refresh """ dataset = self.insert_default_dataset() # delete a column id_column = ( db.session.query(TableColumn) .filter_by(table_id=dataset.id, column_name="id") .one() ) db.session.delete(id_column) db.session.commit() self.login(username="admin") uri = f"api/v1/dataset/{dataset.id}/refresh" rv = self.put_assert_metric(uri, {}, "refresh") assert rv.status_code == 200 # Assert the column is restored on refresh id_column = ( db.session.query(TableColumn) .filter_by(table_id=dataset.id, column_name="id") .one() ) assert id_column is not None db.session.delete(dataset) db.session.commit() def test_dataset_item_refresh_not_found(self): """ Dataset API: Test item refresh not found dataset """ max_id = db.session.query(func.max(SqlaTable.id)).scalar() self.login(username="admin") uri = f"api/v1/dataset/{max_id + 1}/refresh" rv = self.put_assert_metric(uri, {}, "refresh") assert rv.status_code == 404 def test_dataset_item_refresh_not_owned(self): """ Dataset API: Test item refresh not owned dataset """ dataset = self.insert_default_dataset() self.login(username="alpha") uri = f"api/v1/dataset/{dataset.id}/refresh" rv = self.put_assert_metric(uri, {}, "refresh") assert rv.status_code == 403 db.session.delete(dataset) db.session.commit() def test_export_dataset(self): """ Dataset API: Test export dataset """ birth_names_dataset = self.get_birth_names_dataset() # TODO: fix test for presto # debug with dump: https://github.com/apache/incubator-superset/runs/1092546855 if birth_names_dataset.database.backend in {"presto", "hive"}: return argument = [birth_names_dataset.id] uri = f"api/v1/dataset/export/?q={prison.dumps(argument)}" self.login(username="admin") rv = self.get_assert_metric(uri, "export") assert rv.status_code == 200 cli_export = export_to_dict( session=db.session, recursive=True, back_references=False, include_defaults=False, ) cli_export_tables = cli_export["databases"][0]["tables"] expected_response = {} for export_table in cli_export_tables: if export_table["table_name"] == "birth_names": expected_response = export_table break ui_export = yaml.safe_load(rv.data.decode("utf-8")) assert ui_export[0] == expected_response def test_export_dataset_not_found(self): """ Dataset API: Test export dataset not found """ max_id = db.session.query(func.max(SqlaTable.id)).scalar() # Just one does not exist and we get 404 argument = [max_id + 1, 1] uri = f"api/v1/dataset/export/?q={prison.dumps(argument)}" self.login(username="admin") rv = self.get_assert_metric(uri, "export") assert rv.status_code == 404 def test_export_dataset_gamma(self): """ Dataset API: Test export dataset has gamma """ birth_names_dataset = self.get_birth_names_dataset() argument = [birth_names_dataset.id] uri = f"api/v1/dataset/export/?q={prison.dumps(argument)}" self.login(username="gamma") rv = self.client.get(uri) assert rv.status_code == 401 def test_get_dataset_related_objects(self): """ Dataset API: Test get chart and dashboard count related to a dataset :return: """ self.login(username="admin") table = self.get_birth_names_dataset() uri = f"api/v1/dataset/{table.id}/related_objects" rv = self.get_assert_metric(uri, "related_objects") response = json.loads(rv.data.decode("utf-8")) assert rv.status_code == 200 assert response["charts"]["count"] == 18 assert response["dashboards"]["count"] == 1 def test_get_dataset_related_objects_not_found(self): """ Dataset API: Test related objects not found """ max_id = db.session.query(func.max(SqlaTable.id)).scalar() # id does not exist and we get 404 invalid_id = max_id + 1 uri = f"api/v1/dataset/{invalid_id}/related_objects/" self.login(username="admin") rv = self.client.get(uri) assert rv.status_code == 404 self.logout() self.login(username="gamma") table = self.get_birth_names_dataset() uri = f"api/v1/dataset/{table.id}/related_objects" rv = self.client.get(uri) assert rv.status_code == 404