fix: table schema permissions (#23356)

This commit is contained in:
Beto Dealmeida 2023-03-14 15:18:18 -07:00 committed by GitHub
parent d2c1fb95ec
commit 1b95da7487
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 148 additions and 10 deletions

View File

@ -370,6 +370,10 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
# if True, database will be listed as option in the upload file form
supports_file_upload = True
# Is the DB engine spec able to change the default schema? This requires implementing
# a custom `adjust_database_uri` method.
dynamic_schema = False
@classmethod
def supports_url(cls, url: URL) -> bool:
"""

View File

@ -32,6 +32,8 @@ class DrillEngineSpec(BaseEngineSpec):
engine_name = "Apache Drill"
default_driver = "sadrill"
dynamic_schema = True
_time_grain_expressions = {
None: "{col}",
"PT1S": "NEARESTDATE({col}, 'SECOND')",

View File

@ -98,6 +98,8 @@ class HiveEngineSpec(PrestoEngineSpec):
allows_alias_to_source_column = True
allows_hidden_orderby_agg = False
dynamic_schema = True
# When running `SHOW FUNCTIONS`, what is the name of the column with the
# function names?
_show_functions_column = "tab_name"

View File

@ -69,6 +69,8 @@ class MySQLEngineSpec(BaseEngineSpec, BasicParametersMixin):
)
encryption_parameters = {"ssl": "1"}
dynamic_schema = True
column_type_mappings = (
(
re.compile(r"^int.*", re.IGNORECASE),

View File

@ -165,6 +165,8 @@ class PrestoBaseEngineSpec(BaseEngineSpec, metaclass=ABCMeta):
A base class that share common functions between Presto and Trino
"""
dynamic_schema = True
column_type_mappings = (
(
re.compile(r"^boolean.*", re.IGNORECASE),

View File

@ -83,6 +83,8 @@ class SnowflakeEngineSpec(PostgresBaseEngineSpec):
default_driver = "snowflake"
sqlalchemy_uri_placeholder = "snowflake://"
dynamic_schema = True
_time_grain_expressions = {
None: "{col}",
"PT1S": "DATE_TRUNC('SECOND', {col})",

View File

@ -1787,7 +1787,7 @@ class SupersetSecurityManager( # pylint: disable=too-many-public-methods
return []
def raise_for_access(
# pylint: disable=too-many-arguments,too-many-locals
# pylint: disable=too-many-arguments, too-many-locals, too-many-branches
self,
database: Optional["Database"] = None,
datasource: Optional["BaseDatasource"] = None,
@ -1823,8 +1823,20 @@ class SupersetSecurityManager( # pylint: disable=too-many-public-methods
return
if query:
# Some databases can change the default schema in which the query wil run,
# respecting the selection in SQL Lab. If that's the case, the query
# schema becomes the default one.
if database.db_engine_spec.dynamic_schema:
default_schema = query.schema
# For other databases, the selected schema in SQL Lab is used only for
# table discovery and autocomplete. In this case we need to use the
# database default schema for tables that don't have an explicit schema.
else:
with database.get_inspector_with_context() as inspector:
default_schema = inspector.default_schema_name
tables = {
Table(table_.table, table_.schema or query.schema)
Table(table_.table, table_.schema or default_schema)
for table_ in sql_parse.ParsedQuery(query.sql).tables
}
elif table:

View File

@ -274,15 +274,20 @@ def test_query_no_access(mocker: MockFixture, client) -> None:
from superset.models.core import Database
from superset.models.sql_lab import Query
inspect = mocker.patch("superset.security.manager.inspect")
inspect().default_schema_name = "public"
database = mocker.MagicMock()
mocker.patch(
query_find_by_id,
return_value=Query(database=database, sql="select * from foo"),
)
mocker.patch(query_datasources_by_name, return_value=[SqlaTable()])
mocker.patch(is_admin, return_value=False)
mocker.patch(is_owner, return_value=False)
mocker.patch(can_access, return_value=False)
with raises(SupersetSecurityException):
mocker.patch(
query_find_by_id,
return_value=Query(database=Database(), sql="select * from foo"),
)
mocker.patch(query_datasources_by_name, return_value=[SqlaTable()])
mocker.patch(is_admin, return_value=False)
mocker.patch(is_owner, return_value=False)
mocker.patch(can_access, return_value=False)
check_datasource_access(
datasource_id=1,
datasource_type=DatasourceType.QUERY,

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,91 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
from pytest_mock import MockFixture
from superset.exceptions import SupersetSecurityException
from superset.extensions import appbuilder
from superset.security.manager import SupersetSecurityManager
def test_security_manager(app_context: None) -> None:
"""
Test that the security manager can be built.
"""
sm = SupersetSecurityManager(appbuilder)
assert sm
def test_raise_for_access_query_default_schema(
mocker: MockFixture,
app_context: None,
) -> None:
"""
Test that the DB default schema is used in non-qualified table names.
For example, in Postgres, for the following query:
> SELECT * FROM foo;
We should check that the user has access to the `public` schema, regardless of the
schema set in the query.
"""
sm = SupersetSecurityManager(appbuilder)
mocker.patch.object(sm, "can_access_database", return_value=False)
mocker.patch.object(sm, "get_schema_perm", return_value="[PostgreSQL].[public]")
SqlaTable = mocker.patch("superset.connectors.sqla.models.SqlaTable")
SqlaTable.query_datasources_by_name.return_value = []
database = mocker.MagicMock()
database.db_engine_spec.dynamic_schema = False
database.get_inspector_with_context().__enter__().default_schema_name = "public"
query = mocker.MagicMock()
query.database = database
query.sql = "SELECT * FROM ab_user"
# user has access to `public` schema
mocker.patch.object(sm, "can_access", return_value=True)
assert (
sm.raise_for_access( # type: ignore
database=None,
datasource=None,
query=query,
query_context=None,
table=None,
viz=None,
)
is None
)
sm.can_access.assert_called_with("schema_access", "[PostgreSQL].[public]") # type: ignore
# user has only access to `secret` schema
mocker.patch.object(sm, "can_access", return_value=False)
with pytest.raises(SupersetSecurityException) as excinfo:
sm.raise_for_access(
database=None,
datasource=None,
query=query,
query_context=None,
table=None,
viz=None,
)
assert (
str(excinfo.value)
== """You need access to the following tables: `public.ab_user`,
`all_database_access` or `all_datasource_access` permission"""
)