feat: auto sync table columns when change dataset (#15887)

* feat: auto sync dataset metadata when change dataset

* disable sync button when edit mode

* handle undefined schema

* fix py UT

* fix FE UT

* improve test coverage

* fix UT
This commit is contained in:
Yongjie Zhao 2021-08-02 09:55:31 +01:00 committed by GitHub
parent 46188c1011
commit a786373fff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 261 additions and 82 deletions

View File

@ -40,8 +40,7 @@ const props = {
addDangerToast: () => {},
onChange: () => {},
};
const DATASOURCE_ENDPOINT = 'glob:*/datasource/external_metadata/*';
const DATASOURCE_ENDPOINT = 'glob:*/datasource/external_metadata_by_name/*';
describe('DatasourceEditor', () => {
const mockStore = configureStore([thunk]);

View File

@ -358,6 +358,9 @@ class DatasourceEditor extends React.PureComponent {
this.onChangeEditMode = this.onChangeEditMode.bind(this);
this.onDatasourcePropChange = this.onDatasourcePropChange.bind(this);
this.onDatasourceChange = this.onDatasourceChange.bind(this);
this.tableChangeAndSyncMetadata = this.tableChangeAndSyncMetadata.bind(
this,
);
this.syncMetadata = this.syncMetadata.bind(this);
this.setColumns = this.setColumns.bind(this);
this.validateAndChange = this.validateAndChange.bind(this);
@ -387,8 +390,8 @@ class DatasourceEditor extends React.PureComponent {
this.setState(prevState => ({ isEditMode: !prevState.isEditMode }));
}
onDatasourceChange(datasource) {
this.setState({ datasource }, this.validateAndChange);
// Replace the datasource in component state. `callback` runs after React
// commits the state update (callers pass validateAndChange, or
// tableChangeAndSyncMetadata when the table name changed).
onDatasourceChange(datasource, callback) {
this.setState({ datasource }, callback);
}
onDatasourcePropChange(attr, value) {
@ -397,7 +400,9 @@ class DatasourceEditor extends React.PureComponent {
prevState => ({
datasource: { ...prevState.datasource, [attr]: value },
}),
this.onDatasourceChange(datasource),
attr === 'table_name'
? this.onDatasourceChange(datasource, this.tableChangeAndSyncMetadata)
: this.onDatasourceChange(datasource, this.validateAndChange),
);
}
@ -414,6 +419,13 @@ class DatasourceEditor extends React.PureComponent {
this.validate(this.onChange);
}
// Invoked when the physical table name changes: re-validate the datasource,
// then (via the validation callback) pull fresh column metadata from the
// source system and notify the parent through onChange.
tableChangeAndSyncMetadata() {
this.validate(() => {
this.syncMetadata();
this.onChange();
});
}
updateColumns(cols) {
const { databaseColumns } = this.state;
const databaseColumnNames = cols.map(col => col.name);
@ -473,9 +485,11 @@ class DatasourceEditor extends React.PureComponent {
syncMetadata() {
const { datasource } = this.state;
const endpoint = `/datasource/external_metadata/${
const endpoint = `/datasource/external_metadata_by_name/${
datasource.type || datasource.datasource_type
}/${datasource.id}/`;
}/${datasource.database.database_name}/${datasource.schema}/${
datasource.table_name
}/`;
this.setState({ metadataLoading: true });
SupersetClient.get({ endpoint })
@ -1081,6 +1095,7 @@ class DatasourceEditor extends React.PureComponent {
buttonStyle="tertiary"
onClick={this.syncMetadata}
className="sync-from-source"
disabled={this.state.isEditMode}
>
<i className="fa fa-database" />{' '}
{t('Sync columns from source')}

View File

@ -19,7 +19,6 @@ import json
import logging
import re
from collections import defaultdict, OrderedDict
from contextlib import closing
from dataclasses import dataclass, field # pylint: disable=wrong-import-order
from datetime import datetime, timedelta
from typing import (
@ -67,17 +66,15 @@ from sqlalchemy.sql import column, ColumnElement, literal_column, table, text
from sqlalchemy.sql.elements import ColumnClause
from sqlalchemy.sql.expression import Label, Select, TextAsFrom, TextClause
from sqlalchemy.sql.selectable import Alias, TableClause
from sqlalchemy.types import TypeEngine
from superset import app, db, is_feature_enabled, security_manager
from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric
from superset.db_engine_specs.base import BaseEngineSpec, TimestampExpression
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import (
QueryObjectValidationError,
SupersetGenericDBErrorException,
SupersetSecurityException,
from superset.connectors.sqla.utils import (
get_physical_table_metadata,
get_virtual_table_metadata,
)
from superset.db_engine_specs.base import BaseEngineSpec, TimestampExpression
from superset.exceptions import QueryObjectValidationError
from superset.jinja_context import (
BaseTemplateProcessor,
ExtraCache,
@ -86,7 +83,6 @@ from superset.jinja_context import (
from superset.models.annotations import Annotation
from superset.models.core import Database
from superset.models.helpers import AuditMixinNullable, QueryResult
from superset.result_set import SupersetResultSet
from superset.sql_parse import ParsedQuery
from superset.typing import AdhocMetric, Metric, OrderBy, QueryObjectDict
from superset.utils import core as utils
@ -652,72 +648,11 @@ class SqlaTable( # pylint: disable=too-many-public-methods,too-many-instance-at
return self.database.sql_url + "?table_name=" + str(self.table_name)
def external_metadata(self) -> List[Dict[str, str]]:
db_engine_spec = self.db_engine_spec
if self.sql:
engine = self.database.get_sqla_engine(schema=self.schema)
sql = self.get_template_processor().process_template(
self.sql, **self.template_params_dict
)
parsed_query = ParsedQuery(sql)
if not db_engine_spec.is_readonly_query(parsed_query):
raise SupersetSecurityException(
SupersetError(
error_type=SupersetErrorType.DATASOURCE_SECURITY_ACCESS_ERROR,
message=_("Only `SELECT` statements are allowed"),
level=ErrorLevel.ERROR,
)
)
statements = parsed_query.get_statements()
if len(statements) > 1:
raise SupersetSecurityException(
SupersetError(
error_type=SupersetErrorType.DATASOURCE_SECURITY_ACCESS_ERROR,
message=_("Only single queries supported"),
level=ErrorLevel.ERROR,
)
)
# TODO(villebro): refactor to use same code that's used by
# sql_lab.py:execute_sql_statements
try:
with closing(engine.raw_connection()) as conn:
cursor = conn.cursor()
query = self.database.apply_limit_to_sql(statements[0])
db_engine_spec.execute(cursor, query)
result = db_engine_spec.fetch_data(cursor, limit=1)
result_set = SupersetResultSet(
result, cursor.description, db_engine_spec
)
cols = result_set.columns
except Exception as exc:
raise SupersetGenericDBErrorException(message=str(exc))
else:
db_dialect = self.database.get_dialect()
cols = self.database.get_columns(
self.table_name, schema=self.schema or None
)
for col in cols:
try:
if isinstance(col["type"], TypeEngine):
db_type = db_engine_spec.column_datatype_to_string(
col["type"], db_dialect
)
type_spec = db_engine_spec.get_column_spec(db_type)
col.update(
{
"type": db_type,
"type_generic": type_spec.generic_type
if type_spec
else None,
"is_dttm": type_spec.is_dttm if type_spec else None,
}
)
# Broad exception catch, because there are multiple possible exceptions
# from different drivers that fall outside CompileError
except Exception: # pylint: disable=broad-except
col.update(
{"type": "UNKNOWN", "generic_type": None, "is_dttm": None,}
)
return cols
return get_virtual_table_metadata(dataset=self)
return get_physical_table_metadata(
database=self.database, table_name=self.table_name, schema_name=self.schema,
)
@property
def time_column_grains(self) -> Dict[str, Any]:

View File

@ -0,0 +1,110 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from contextlib import closing
from typing import Dict, List, Optional, TYPE_CHECKING
from flask_babel import lazy_gettext as _
from sqlalchemy.sql.type_api import TypeEngine
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import (
SupersetGenericDBErrorException,
SupersetSecurityException,
)
from superset.models.core import Database
from superset.result_set import SupersetResultSet
from superset.sql_parse import ParsedQuery
if TYPE_CHECKING:
from superset.connectors.sqla.models import SqlaTable
def get_physical_table_metadata(
    database: Database, table_name: str, schema_name: Optional[str] = None,
) -> List[Dict[str, str]]:
    """Use the SQLAlchemy inspector to get physical table metadata.

    :param database: database containing the table
    :param table_name: name of the physical table to inspect
    :param schema_name: optional schema; falsy values (e.g. ``""``) are
        normalized to ``None`` so the inspector uses the default schema
    :return: list of column dicts; each gets ``type``/``type_generic``/
        ``is_dttm`` resolved when possible, or ``type: "UNKNOWN"`` on failure
    """
    db_engine_spec = database.db_engine_spec
    db_dialect = database.get_dialect()
    # ensure empty schema is passed as None rather than ""
    _schema_name = schema_name if schema_name else None
    cols = database.get_columns(table_name, schema=_schema_name)
    for col in cols:
        try:
            if isinstance(col["type"], TypeEngine):
                db_type = db_engine_spec.column_datatype_to_string(
                    col["type"], db_dialect
                )
                type_spec = db_engine_spec.get_column_spec(db_type)
                col.update(
                    {
                        "type": db_type,
                        "type_generic": type_spec.generic_type if type_spec else None,
                        "is_dttm": type_spec.is_dttm if type_spec else None,
                    }
                )
        # Broad exception catch, because there are multiple possible exceptions
        # from different drivers that fall outside CompileError
        except Exception:  # pylint: disable=broad-except
            # Bug fix: the fallback previously wrote the key "generic_type"
            # while the success path writes "type_generic" — use the same key
            # in both branches so consumers see a consistent schema.
            col.update(
                {"type": "UNKNOWN", "type_generic": None, "is_dttm": None}
            )
    return cols
def get_virtual_table_metadata(dataset: "SqlaTable") -> List[Dict[str, str]]:
    """Use the SQL parser to derive column metadata for a virtual dataset.

    Renders the dataset's templated SQL, validates it (read-only, single
    statement), executes it with a row limit of 1 and reads the column
    metadata off the result set.

    :param dataset: virtual dataset whose ``sql`` defines the columns
    :raises SupersetGenericDBErrorException: if the query is empty or the
        database raises while executing it
    :raises SupersetSecurityException: for non-SELECT or multi-statement SQL
    """
    if not dataset.sql:
        raise SupersetGenericDBErrorException(
            message=_("Virtual dataset query cannot be empty"),
        )
    db_engine_spec = dataset.database.db_engine_spec
    engine = dataset.database.get_sqla_engine(schema=dataset.schema)
    sql = dataset.get_template_processor().process_template(
        dataset.sql, **dataset.template_params_dict
    )
    parsed_query = ParsedQuery(sql)
    if not db_engine_spec.is_readonly_query(parsed_query):
        raise SupersetSecurityException(
            SupersetError(
                error_type=SupersetErrorType.DATASOURCE_SECURITY_ACCESS_ERROR,
                message=_("Only `SELECT` statements are allowed"),
                level=ErrorLevel.ERROR,
            )
        )
    statements = parsed_query.get_statements()
    if len(statements) > 1:
        raise SupersetSecurityException(
            SupersetError(
                error_type=SupersetErrorType.DATASOURCE_SECURITY_ACCESS_ERROR,
                message=_("Only single queries supported"),
                level=ErrorLevel.ERROR,
            )
        )
    # TODO(villebro): refactor to use same code that's used by
    # sql_lab.py:execute_sql_statements
    try:
        with closing(engine.raw_connection()) as conn:
            cursor = conn.cursor()
            query = dataset.database.apply_limit_to_sql(statements[0])
            db_engine_spec.execute(cursor, query)
            result = db_engine_spec.fetch_data(cursor, limit=1)
            result_set = SupersetResultSet(result, cursor.description, db_engine_spec)
            cols = result_set.columns
    except Exception as exc:  # pylint: disable=broad-except
        # Fix: chain the original exception so the driver-level traceback
        # is preserved for debugging instead of being discarded.
        raise SupersetGenericDBErrorException(message=str(exc)) from exc
    return cols

View File

@ -179,6 +179,7 @@ class SupersetSecurityManager( # pylint: disable=too-many-public-methods
"can_list",
"can_get",
"can_external_metadata",
"can_external_metadata_by_name",
"can_read",
}

View File

@ -24,11 +24,14 @@ from flask_babel import _
from superset import app, db, event_logger
from superset.connectors.connector_registry import ConnectorRegistry
from superset.connectors.sqla.utils import get_physical_table_metadata
from superset.datasets.commands.exceptions import DatasetForbiddenError
from superset.exceptions import SupersetException, SupersetSecurityException
from superset.models.core import Database
from superset.typing import FlaskResponse
from superset.views.base import check_ownership
from ..utils.core import parse_js_uri_path_item
from .base import api, BaseSupersetView, handle_api_exception, json_error_response
@ -118,3 +121,46 @@ class Datasource(BaseSupersetView):
except SupersetException as ex:
return json_error_response(str(ex), status=400)
return self.json_response(external_metadata)
@expose(
"/external_metadata_by_name/<datasource_type>/<database_name>/"
"<schema_name>/<table_name>/"
)
@has_access_api
@api
@handle_api_exception
def external_metadata_by_name(
self,
datasource_type: str,
database_name: str,
schema_name: str,
table_name: str,
) -> FlaskResponse:
"""Gets table metadata from the source system and SQLAlchemy inspector.

Resolves the dataset by (type, database, schema, table) names. If a
registered datasource is found, its own external_metadata() is used;
otherwise the physical table is inspected directly via SQLAlchemy.
Any SupersetException is mapped to a 400 JSON error response.
"""
# Path segments arrive URL-encoded; eval_undefined=True lets a literal
# "undefined"/"null" schema segment stand for an empty schema (see the
# integration tests, which pass "undefined" for schemaless tables).
database_name = parse_js_uri_path_item(database_name) or ""
schema_name = parse_js_uri_path_item(schema_name, eval_undefined=True) or ""
table_name = parse_js_uri_path_item(table_name) or ""
datasource = ConnectorRegistry.get_datasource_by_name(
session=db.session,
datasource_type=datasource_type,
database_name=database_name,
schema=schema_name,
datasource_name=table_name,
)
try:
if datasource is not None:
# Registered dataset: delegate to its own metadata implementation
# (handles both physical and virtual datasets).
external_metadata = datasource.external_metadata()
else:
# Use the SQLAlchemy inspector to get columns
# NOTE(review): .one() raises if the database name is unknown;
# presumably handled upstream by @handle_api_exception — verify.
database = (
db.session.query(Database)
.filter_by(database_name=database_name)
.one()
)
external_metadata = get_physical_table_metadata(
database=database, table_name=table_name, schema_name=schema_name,
)
except SupersetException as ex:
return json_error_response(str(ex), status=400)
return self.json_response(external_metadata)

View File

@ -16,6 +16,7 @@
# under the License.
"""Unit tests for Superset"""
import json
from contextlib import contextmanager
from copy import deepcopy
from unittest import mock
@ -24,7 +25,8 @@ import pytest
from superset import app, ConnectorRegistry, db
from superset.connectors.sqla.models import SqlaTable
from superset.datasets.commands.exceptions import DatasetNotFoundError
from superset.exceptions import SupersetException, SupersetGenericDBErrorException
from superset.exceptions import SupersetGenericDBErrorException
from superset.models.core import Database
from superset.utils.core import get_example_database
from tests.integration_tests.fixtures.birth_names_dashboard import (
load_birth_names_dashboard_with_slices,
@ -34,6 +36,22 @@ from .base_tests import db_insert_temp_object, SupersetTestCase
from .fixtures.datasource import datasource_post
@contextmanager
def create_test_table_context(database: Database):
    """Create a small ``test_table`` in ``database`` for the duration of the
    context, yielding the shared session, and guarantee the table is dropped
    afterwards.

    :param database: database in which to create the temporary table
    """
    # Hoist the engine lookup instead of resolving it for every statement.
    engine = database.get_sqla_engine()
    engine.execute("CREATE TABLE test_table AS SELECT 1 as first, 2 as second")
    engine.execute("INSERT INTO test_table (first, second) VALUES (1, 2)")
    engine.execute("INSERT INTO test_table (first, second) VALUES (3, 4)")
    try:
        yield db.session
    finally:
        # Fix: drop the table even when the context body raises, so a failing
        # test does not leak test_table into subsequent tests.
        engine.execute("DROP TABLE test_table")
class TestDatasource(SupersetTestCase):
def setUp(self):
self.original_attrs = {}
@ -75,6 +93,61 @@ class TestDatasource(SupersetTestCase):
session.delete(table)
session.commit()
@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
def test_external_metadata_by_name_for_physical_table(self):
    """Physical dataset metadata is served by database/schema/table name."""
    self.login(username="admin")
    table = self.get_table_by_name("birth_names")
    # An empty schema is encoded as the literal segment "undefined".
    endpoint = (
        "/datasource/external_metadata_by_name/table/"
        f"{table.database.database_name}/undefined/{table.table_name}/"
    )
    response = self.get_json_resp(endpoint)
    observed = {col.get("name") for col in response}
    expected = {"num_boys", "num", "gender", "name", "ds", "state", "num_girls"}
    self.assertEqual(observed, expected)
def test_external_metadata_by_name_for_virtual_table(self):
    """Virtual dataset metadata is derived from its SQL by name lookup."""
    self.login(username="admin")
    session = db.session
    table = SqlaTable(
        table_name="dummy_sql_table",
        database=get_example_database(),
        sql="select 123 as intcol, 'abc' as strcol",
    )
    session.add(table)
    session.commit()
    try:
        table = self.get_table_by_name("dummy_sql_table")
        # empty schema need to be represented by undefined
        url = (
            f"/datasource/external_metadata_by_name/table/"
            f"{table.database.database_name}/undefined/{table.table_name}/"
        )
        resp = self.get_json_resp(url)
        assert {o.get("name") for o in resp} == {"intcol", "strcol"}
    finally:
        # Fix: always remove the temporary dataset — previously a failing
        # assertion skipped the cleanup and leaked "dummy_sql_table" into
        # subsequent tests.
        session.delete(table)
        session.commit()
def test_external_metadata_by_name_from_sqla_inspector(self):
    """Unregistered tables fall back to SQLAlchemy-inspector metadata."""
    self.login(username="admin")
    example_database = get_example_database()
    with create_test_table_context(example_database):
        endpoint = (
            "/datasource/external_metadata_by_name/table/"
            f"{example_database.database_name}/undefined/test_table/"
        )
        response = self.get_json_resp(endpoint)
        self.assertEqual(
            {col.get("name") for col in response}, {"first", "second"}
        )

    # An unknown database name yields an error payload, not a hard failure.
    bad_endpoint = (
        "/datasource/external_metadata_by_name/table/foobar/undefined/foobar/"
    )
    response = self.get_json_resp(bad_endpoint, raise_on_error=False)
    self.assertIn("error", response)
def test_external_metadata_for_virtual_table_template_params(self):
self.login(username="admin")
session = db.session