# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # pylint: disable=no-self-use, invalid-name from unittest import mock from unittest.mock import patch import pytest import yaml from sqlalchemy.exc import DBAPIError from superset import db, event_logger, security_manager from superset.commands.exceptions import CommandInvalidError from superset.commands.importers.exceptions import IncorrectVersionError from superset.connectors.sqla.models import SqlaTable from superset.databases.commands.exceptions import ( DatabaseNotFoundError, DatabaseSecurityUnsafeError, DatabaseTestConnectionDriverError, DatabaseTestConnectionFailedError, DatabaseTestConnectionUnexpectedError, ) from superset.databases.commands.export import ExportDatabasesCommand from superset.databases.commands.importers.v1 import ImportDatabasesCommand from superset.databases.commands.test_connection import TestConnectionDatabaseCommand from superset.databases.schemas import DatabaseTestConnectionSchema from superset.errors import SupersetError from superset.exceptions import SupersetSecurityException from superset.models.core import Database from superset.utils.core import backend, get_example_database from tests.base_tests import SupersetTestCase from tests.fixtures.birth_names_dashboard import load_birth_names_dashboard_with_slices from tests.fixtures.energy_dashboard import load_energy_table_with_slice from tests.fixtures.importexport import ( database_config, database_metadata_config, dataset_config, dataset_metadata_config, ) class TestExportDatabasesCommand(SupersetTestCase): @patch("superset.security.manager.g") @pytest.mark.usefixtures( "load_birth_names_dashboard_with_slices", "load_energy_table_with_slice" ) def test_export_database_command(self, mock_g): mock_g.user = security_manager.find_user("admin") example_db = get_example_database() db_uuid = example_db.uuid command = ExportDatabasesCommand([example_db.id]) contents = dict(command.run()) # TODO: this list shouldn't depend on the order in which unit tests are run # or on the backend; for now use a stable subset core_files = { "metadata.yaml", "databases/examples.yaml", "datasets/examples/energy_usage.yaml", "datasets/examples/wb_health_population.yaml", "datasets/examples/birth_names.yaml", } expected_extra = { "engine_params": {}, "metadata_cache_timeout": {}, "metadata_params": {}, "schemas_allowed_for_csv_upload": [], } if backend() == "presto": expected_extra = {"engine_params": {"connect_args": {"poll_interval": 0.1}}} assert core_files.issubset(set(contents.keys())) if example_db.backend == "postgresql": ds_type = "TIMESTAMP WITHOUT TIME ZONE" elif example_db.backend == "hive": ds_type = "TIMESTAMP" elif example_db.backend == "presto": ds_type = "VARCHAR(255)" else: ds_type = "DATETIME" if example_db.backend == "mysql": big_int_type = "BIGINT(20)" else: big_int_type = "BIGINT" metadata = yaml.safe_load(contents["databases/examples.yaml"]) assert metadata == ( { "allow_csv_upload": True, "allow_ctas": True, "allow_cvas": True, "allow_run_async": False, "cache_timeout": None, "database_name": "examples", "expose_in_sqllab": True, "extra": expected_extra, "sqlalchemy_uri": example_db.sqlalchemy_uri, "uuid": str(example_db.uuid), "version": "1.0.0", } ) metadata = yaml.safe_load(contents["datasets/examples/birth_names.yaml"]) metadata.pop("uuid") metadata["columns"].sort(key=lambda x: x["column_name"]) expected_metadata = { "cache_timeout": None, "columns": [ { "column_name": "ds", "description": None, "expression": None, "filterable": True, "groupby": True, "is_active": True, "is_dttm": True, "python_date_format": None, "type": ds_type, "verbose_name": None, }, { "column_name": "gender", "description": None, "expression": None, "filterable": True, "groupby": True, "is_active": True, "is_dttm": False, "python_date_format": None, "type": "STRING" if example_db.backend == "hive" else "VARCHAR(16)", "verbose_name": None, }, { "column_name": "name", "description": None, "expression": None, "filterable": True, "groupby": True, "is_active": True, "is_dttm": False, "python_date_format": None, "type": "STRING" if example_db.backend == "hive" else "VARCHAR(255)", "verbose_name": None, }, { "column_name": "num", "description": None, "expression": None, "filterable": True, "groupby": True, "is_active": True, "is_dttm": False, "python_date_format": None, "type": big_int_type, "verbose_name": None, }, { "column_name": "num_california", "description": None, "expression": "CASE WHEN state = 'CA' THEN num ELSE 0 END", "filterable": True, "groupby": True, "is_active": True, "is_dttm": False, "python_date_format": None, "type": None, "verbose_name": None, }, { "column_name": "state", "description": None, "expression": None, "filterable": True, "groupby": True, "is_active": True, "is_dttm": False, "python_date_format": None, "type": "STRING" if example_db.backend == "hive" else "VARCHAR(10)", "verbose_name": None, }, { "column_name": "num_boys", "description": None, "expression": None, "filterable": True, "groupby": True, "is_active": True, "is_dttm": False, "python_date_format": None, "type": big_int_type, "verbose_name": None, }, { "column_name": "num_girls", "description": None, "expression": None, "filterable": True, "groupby": True, "is_active": True, "is_dttm": False, "python_date_format": None, "type": big_int_type, "verbose_name": None, }, ], "database_uuid": str(db_uuid), "default_endpoint": None, "description": "", "extra": None, "fetch_values_predicate": "123 = 123", "filter_select_enabled": True, "main_dttm_col": "ds", "metrics": [ { "d3format": None, "description": None, "expression": "COUNT(*)", "extra": None, "metric_name": "count", "metric_type": "count", "verbose_name": "COUNT(*)", "warning_text": None, }, { "d3format": None, "description": None, "expression": "SUM(num)", "extra": None, "metric_name": "sum__num", "metric_type": None, "verbose_name": None, "warning_text": None, }, ], "offset": 0, "params": None, "schema": None, "sql": None, "table_name": "birth_names", "template_params": None, "version": "1.0.0", } expected_metadata["columns"].sort(key=lambda x: x["column_name"]) assert metadata == expected_metadata @patch("superset.security.manager.g") def test_export_database_command_no_access(self, mock_g): """Test that users can't export databases they don't have access to""" mock_g.user = security_manager.find_user("gamma") example_db = get_example_database() command = ExportDatabasesCommand([example_db.id]) contents = command.run() with self.assertRaises(DatabaseNotFoundError): next(contents) @patch("superset.security.manager.g") def test_export_database_command_invalid_database(self, mock_g): """Test that an error is raised when exporting an invalid database""" mock_g.user = security_manager.find_user("admin") command = ExportDatabasesCommand([-1]) contents = command.run() with self.assertRaises(DatabaseNotFoundError): next(contents) @patch("superset.security.manager.g") def test_export_database_command_key_order(self, mock_g): """Test that they keys in the YAML have the same order as export_fields""" mock_g.user = security_manager.find_user("admin") example_db = get_example_database() command = ExportDatabasesCommand([example_db.id]) contents = dict(command.run()) metadata = yaml.safe_load(contents["databases/examples.yaml"]) assert list(metadata.keys()) == [ "database_name", "sqlalchemy_uri", "cache_timeout", "expose_in_sqllab", "allow_run_async", "allow_ctas", "allow_cvas", "allow_csv_upload", "extra", "uuid", "version", ] class TestImportDatabasesCommand(SupersetTestCase): def test_import_v1_database(self): """Test that a database can be imported""" contents = { "metadata.yaml": yaml.safe_dump(database_metadata_config), "databases/imported_database.yaml": yaml.safe_dump(database_config), } command = ImportDatabasesCommand(contents) command.run() database = ( db.session.query(Database).filter_by(uuid=database_config["uuid"]).one() ) assert database.allow_csv_upload assert database.allow_ctas assert database.allow_cvas assert not database.allow_run_async assert database.cache_timeout is None assert database.database_name == "imported_database" assert database.expose_in_sqllab assert database.extra == "{}" assert database.sqlalchemy_uri == "sqlite:///test.db" db.session.delete(database) db.session.commit() def test_import_v1_database_multiple(self): """Test that a database can be imported multiple times""" num_databases = db.session.query(Database).count() contents = { "databases/imported_database.yaml": yaml.safe_dump(database_config), "metadata.yaml": yaml.safe_dump(database_metadata_config), } command = ImportDatabasesCommand(contents, overwrite=True) # import twice command.run() command.run() database = ( db.session.query(Database).filter_by(uuid=database_config["uuid"]).one() ) assert database.allow_csv_upload # update allow_csv_upload to False new_config = database_config.copy() new_config["allow_csv_upload"] = False contents = { "databases/imported_database.yaml": yaml.safe_dump(new_config), "metadata.yaml": yaml.safe_dump(database_metadata_config), } command = ImportDatabasesCommand(contents, overwrite=True) command.run() database = ( db.session.query(Database).filter_by(uuid=database_config["uuid"]).one() ) assert not database.allow_csv_upload # test that only one database was created new_num_databases = db.session.query(Database).count() assert new_num_databases == num_databases + 1 db.session.delete(database) db.session.commit() def test_import_v1_database_with_dataset(self): """Test that a database can be imported with datasets""" contents = { "databases/imported_database.yaml": yaml.safe_dump(database_config), "datasets/imported_dataset.yaml": yaml.safe_dump(dataset_config), "metadata.yaml": yaml.safe_dump(database_metadata_config), } command = ImportDatabasesCommand(contents) command.run() database = ( db.session.query(Database).filter_by(uuid=database_config["uuid"]).one() ) assert len(database.tables) == 1 assert str(database.tables[0].uuid) == "10808100-158b-42c4-842e-f32b99d88dfb" db.session.delete(database.tables[0]) db.session.delete(database) db.session.commit() def test_import_v1_database_with_dataset_multiple(self): """Test that a database can be imported multiple times w/o changing datasets""" contents = { "databases/imported_database.yaml": yaml.safe_dump(database_config), "datasets/imported_dataset.yaml": yaml.safe_dump(dataset_config), "metadata.yaml": yaml.safe_dump(database_metadata_config), } command = ImportDatabasesCommand(contents) command.run() dataset = ( db.session.query(SqlaTable).filter_by(uuid=dataset_config["uuid"]).one() ) assert dataset.offset == 66 new_config = dataset_config.copy() new_config["offset"] = 67 contents = { "databases/imported_database.yaml": yaml.safe_dump(database_config), "datasets/imported_dataset.yaml": yaml.safe_dump(new_config), "metadata.yaml": yaml.safe_dump(database_metadata_config), } command = ImportDatabasesCommand(contents, overwrite=True) command.run() # the underlying dataset should not be modified by the second import, since # we're importing a database, not a dataset dataset = ( db.session.query(SqlaTable).filter_by(uuid=dataset_config["uuid"]).one() ) assert dataset.offset == 66 db.session.delete(dataset) db.session.delete(dataset.database) db.session.commit() def test_import_v1_database_validation(self): """Test different validations applied when importing a database""" # metadata.yaml must be present contents = { "databases/imported_database.yaml": yaml.safe_dump(database_config), } command = ImportDatabasesCommand(contents) with pytest.raises(IncorrectVersionError) as excinfo: command.run() assert str(excinfo.value) == "Missing metadata.yaml" # version should be 1.0.0 contents["metadata.yaml"] = yaml.safe_dump( { "version": "2.0.0", "type": "Database", "timestamp": "2020-11-04T21:27:44.423819+00:00", } ) command = ImportDatabasesCommand(contents) with pytest.raises(IncorrectVersionError) as excinfo: command.run() assert str(excinfo.value) == "Must be equal to 1.0.0." # type should be Database contents["metadata.yaml"] = yaml.safe_dump(dataset_metadata_config) command = ImportDatabasesCommand(contents) with pytest.raises(CommandInvalidError) as excinfo: command.run() assert str(excinfo.value) == "Error importing database" assert excinfo.value.normalized_messages() == { "metadata.yaml": {"type": ["Must be equal to Database."]} } # must also validate datasets broken_config = dataset_config.copy() del broken_config["table_name"] contents["metadata.yaml"] = yaml.safe_dump(database_metadata_config) contents["datasets/imported_dataset.yaml"] = yaml.safe_dump(broken_config) command = ImportDatabasesCommand(contents) with pytest.raises(CommandInvalidError) as excinfo: command.run() assert str(excinfo.value) == "Error importing database" assert excinfo.value.normalized_messages() == { "datasets/imported_dataset.yaml": { "table_name": ["Missing data for required field."], } } def test_import_v1_database_masked_password(self): """Test that database imports with masked passwords are rejected""" masked_database_config = database_config.copy() masked_database_config[ "sqlalchemy_uri" ] = "postgresql://username:XXXXXXXXXX@host:12345/db" contents = { "metadata.yaml": yaml.safe_dump(database_metadata_config), "databases/imported_database.yaml": yaml.safe_dump(masked_database_config), } command = ImportDatabasesCommand(contents) with pytest.raises(CommandInvalidError) as excinfo: command.run() assert str(excinfo.value) == "Error importing database" assert excinfo.value.normalized_messages() == { "databases/imported_database.yaml": { "_schema": ["Must provide a password for the database"] } } @patch("superset.databases.commands.importers.v1.import_dataset") def test_import_v1_rollback(self, mock_import_dataset): """Test than on an exception everything is rolled back""" num_databases = db.session.query(Database).count() # raise an exception when importing the dataset, after the database has # already been imported mock_import_dataset.side_effect = Exception("A wild exception appears!") contents = { "databases/imported_database.yaml": yaml.safe_dump(database_config), "datasets/imported_dataset.yaml": yaml.safe_dump(dataset_config), "metadata.yaml": yaml.safe_dump(database_metadata_config), } command = ImportDatabasesCommand(contents) with pytest.raises(Exception) as excinfo: command.run() assert str(excinfo.value) == "Import database failed for an unknown reason" # verify that the database was not added new_num_databases = db.session.query(Database).count() assert new_num_databases == num_databases class TestTestConnectionDatabaseCommand(SupersetTestCase): @mock.patch("superset.databases.dao.Database.get_sqla_engine") @mock.patch( "superset.databases.commands.test_connection.event_logger.log_with_context" ) def test_connection_db_exception(self, mock_event_logger, mock_get_sqla_engine): """Test to make sure event_logger is called when an exception is raised""" database = get_example_database() mock_get_sqla_engine.side_effect = Exception("An error has occurred!") db_uri = database.sqlalchemy_uri_decrypted json_payload = {"sqlalchemy_uri": db_uri} command_without_db_name = TestConnectionDatabaseCommand( security_manager.find_user("admin"), json_payload ) with pytest.raises(DatabaseTestConnectionUnexpectedError) as excinfo: command_without_db_name.run() assert str(excinfo.value) == ( "Unexpected error occurred, please check your logs for details" ) mock_event_logger.assert_called() @mock.patch("superset.databases.dao.Database.get_sqla_engine") @mock.patch( "superset.databases.commands.test_connection.event_logger.log_with_context" ) def test_connection_superset_security_connection( self, mock_event_logger, mock_get_sqla_engine ): """Test to make sure event_logger is called when security connection exc is raised""" database = get_example_database() mock_get_sqla_engine.side_effect = SupersetSecurityException( SupersetError(error_type=500, message="test", level="info", extra={}) ) db_uri = database.sqlalchemy_uri_decrypted json_payload = {"sqlalchemy_uri": db_uri} command_without_db_name = TestConnectionDatabaseCommand( security_manager.find_user("admin"), json_payload ) with pytest.raises(DatabaseSecurityUnsafeError) as excinfo: command_without_db_name.run() assert str(excinfo.value) == ("Stopped an unsafe database connection") mock_event_logger.assert_called() @mock.patch("superset.databases.dao.Database.get_sqla_engine") @mock.patch( "superset.databases.commands.test_connection.event_logger.log_with_context" ) def test_connection_db_api_exc(self, mock_event_logger, mock_get_sqla_engine): """Test to make sure event_logger is called when DBAPIError is raised""" database = get_example_database() mock_get_sqla_engine.side_effect = DBAPIError( statement="error", params={}, orig={} ) db_uri = database.sqlalchemy_uri_decrypted json_payload = {"sqlalchemy_uri": db_uri} command_without_db_name = TestConnectionDatabaseCommand( security_manager.find_user("admin"), json_payload ) with pytest.raises(DatabaseTestConnectionFailedError) as excinfo: command_without_db_name.run() assert str(excinfo.value) == ( "Connection failed, please check your connection settings" ) mock_event_logger.assert_called()