feat: new dataset/table/column models (#17543)

* feat: new models for SIP-68

* feat: new dataset models DB migration

* Add shadow write (WIP)

* Physical dataset shadow write (WIP)

* Virtual dataset shadow write (WIP)

* Update migration to populate models

* Cascade delete columns

* after_delete

* Update hook

* Add metric test

* Do not rename tables

* Small fixes

* Fix migration

* Fix tests

* Fix more tests

* Even more tests

* Addressing comments (WIP)

* Fix migration

* Rebase and update head

* Fix unit tests

* Add external management cols

* Small fixes
Beto Dealmeida 2022-02-24 11:02:01 -08:00 committed by GitHub
parent 9fd18e98ac
commit 00c99c91ec
26 changed files with 3013 additions and 169 deletions


@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


@ -0,0 +1,99 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Column model.
This model was introduced in SIP-68 (https://github.com/apache/superset/issues/14909),
and represents a "column" in a table or dataset. In addition to a column, new models for
tables, metrics, and datasets were also introduced.
These models are not fully implemented, and shouldn't be used yet.
"""
import sqlalchemy as sa
from flask_appbuilder import Model
from superset.models.helpers import (
AuditMixinNullable,
ExtraJSONMixin,
ImportExportMixin,
)
class Column(
Model, AuditMixinNullable, ExtraJSONMixin, ImportExportMixin,
):
"""
A "column".
The definition of column here is overloaded: it can represent a physical column in a
database relation (table or view); a computed/derived column on a dataset; or an
aggregation expression representing a metric.
"""
__tablename__ = "sl_columns"
id = sa.Column(sa.Integer, primary_key=True)
# We use ``sa.Text`` for these attributes because (1) in modern databases the
# performance is the same as ``VARCHAR``[1] and (2) some table names can be
# **really** long (eg, Google Sheets URLs).
#
# [1] https://www.postgresql.org/docs/9.1/datatype-character.html
name = sa.Column(sa.Text)
type = sa.Column(sa.Text)
# Columns are defined by expressions. For tables, these are the actual column names,
# and should match the ``name`` attribute. For datasets, these can be any valid SQL
# expression. If the SQL expression is an aggregation the column is a metric,
# otherwise it's a computed column.
expression = sa.Column(sa.Text)
# Does the expression point directly to a physical column?
is_physical = sa.Column(sa.Boolean, default=True)
# Additional metadata describing the column.
description = sa.Column(sa.Text)
warning_text = sa.Column(sa.Text)
unit = sa.Column(sa.Text)
# Is this a time column? Useful for plotting time series.
is_temporal = sa.Column(sa.Boolean, default=False)
# Is this a spatial column? This could be leveraged in the future for spatial
# visualizations.
is_spatial = sa.Column(sa.Boolean, default=False)
# Is this column a partition? Useful for scheduling queries and previewing the latest
# data.
is_partition = sa.Column(sa.Boolean, default=False)
# Is this column an aggregation (metric)?
is_aggregation = sa.Column(sa.Boolean, default=False)
# Assuming the column is an aggregation, is it additive? Useful for determining which
# aggregations can be done on the metric. Eg, ``COUNT(DISTINCT user_id)`` is not
# additive, so it shouldn't be used in a ``SUM``.
is_additive = sa.Column(sa.Boolean, default=False)
# Is an increase desired? Useful for displaying the results of A/B tests, or setting
# up alerts. Eg, this is true for "revenue", but false for "latency".
is_increase_desired = sa.Column(sa.Boolean, default=True)
# Column is managed externally and should be read-only inside Superset
is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
external_url = sa.Column(sa.Text, nullable=True)
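
To make the overloaded definition of "column" concrete, the sketch below (illustrative only, since these models are explicitly not meant to be used yet; all column and expression names are hypothetical) shows how a physical column, a computed column, and a metric would all map onto the same ``Column`` model.

from superset.columns.models import Column

# physical column: the expression is just the (conditionally quoted) column name
ds = Column(name="ds", type="TIMESTAMP", expression='"ds"', is_physical=True, is_temporal=True)

# computed (derived) column: any non-aggregating SQL expression
revenue_per_user = Column(
    name="revenue_per_user",
    type="FLOAT",
    expression="revenue / NULLIF(num_users, 0)",
    is_physical=False,
)

# metric: an aggregation expression; COUNT(DISTINCT ...) is not additive
unique_users = Column(
    name="unique_users",
    type="Unknown",
    expression="COUNT(DISTINCT user_id)",
    is_aggregation=True,
    is_additive=False,
    is_physical=False,
)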


@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Schema for the column model.
This model was introduced in SIP-68 (https://github.com/apache/superset/issues/14909),
and represents a "column" in a table or dataset. In addition to a column, new models for
tables, metrics, and datasets were also introduced.
These models are not fully implemented, and shouldn't be used yet.
"""
from marshmallow_sqlalchemy import SQLAlchemyAutoSchema
from superset.columns.models import Column
class ColumnSchema(SQLAlchemyAutoSchema):
"""
Schema for the ``Column`` model.
"""
class Meta: # pylint: disable=too-few-public-methods
model = Column
load_instance = True
include_relationships = True
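
As a quick illustration of how the auto-generated schema could be used (a sketch; serialization only, so no session is required, and ``column`` is assumed to be an existing ``Column`` instance):

from superset.columns.schemas import ColumnSchema

schema = ColumnSchema()
payload = schema.dump(column)
# ``payload`` is a plain dict, e.g. {"name": "ds", "type": "TIMESTAMP", "is_temporal": True, ...}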


@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=too-many-lines
# pylint: disable=too-many-lines, redefined-outer-name
import dataclasses
import json
import logging
@ -53,6 +53,7 @@ from sqlalchemy import (
desc,
Enum,
ForeignKey,
inspect,
Integer,
or_,
select,
@ -71,12 +72,14 @@ from sqlalchemy.sql.expression import Label, Select, TextAsFrom
from sqlalchemy.sql.selectable import Alias, TableClause
from superset import app, db, is_feature_enabled, security_manager
from superset.columns.models import Column as NewColumn
from superset.common.db_query_status import QueryStatus
from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric
from superset.connectors.sqla.utils import (
get_physical_table_metadata,
get_virtual_table_metadata,
)
from superset.datasets.models import Dataset as NewDataset
from superset.db_engine_specs.base import BaseEngineSpec, CTE_ALIAS, TimestampExpression
from superset.exceptions import QueryObjectValidationError
from superset.jinja_context import (
@ -86,8 +89,14 @@ from superset.jinja_context import (
)
from superset.models.annotations import Annotation
from superset.models.core import Database
from superset.models.helpers import AuditMixinNullable, CertificationMixin, QueryResult
from superset.models.helpers import (
AuditMixinNullable,
CertificationMixin,
clone_model,
QueryResult,
)
from superset.sql_parse import ParsedQuery
from superset.tables.models import Table as NewTable
from superset.typing import AdhocColumn, AdhocMetric, Metric, OrderBy, QueryObjectDict
from superset.utils import core as utils
from superset.utils.core import (
@ -104,6 +113,13 @@ logger = logging.getLogger(__name__)
VIRTUAL_TABLE_ALIAS = "virtual_table"
# a non-exhaustive set of additive metrics
ADDITIVE_METRIC_TYPES = {
"count",
"sum",
"doubleSum",
}
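# For example, a metric whose ``metric_type`` is "SUM" is treated as additive (the
# membership check below is case-insensitive), while types such as "count_distinct"
# or "avg" are not, since they cannot safely be re-aggregated.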
class SqlaQuery(NamedTuple):
applied_template_filters: List[str]
@ -1830,23 +1846,474 @@ class SqlaTable(Model, BaseDatasource): # pylint: disable=too-many-public-metho
raise Exception(get_dataset_exist_error_msg(target.full_name))
@staticmethod
def update_table(
_mapper: Mapper, _connection: Connection, obj: Union[SqlMetric, TableColumn]
def update_table( # pylint: disable=unused-argument
mapper: Mapper, connection: Connection, target: Union[SqlMetric, TableColumn]
) -> None:
"""
Forces an update to the table's changed_on value when a metric or column on the
table is updated. This busts the cache key for all charts that use the table.
:param _mapper: Unused.
:param _connection: Unused.
:param obj: The metric or column that was updated.
:param mapper: Unused.
:param connection: Unused.
:param target: The metric or column that was updated.
"""
db.session.execute(update(SqlaTable).where(SqlaTable.id == obj.table.id))
inspector = inspect(target)
session = inspector.session
# get DB-specific conditional quoter for expressions that point to columns or
# table names
database = (
target.table.database
or session.query(Database).filter_by(id=target.database_id).one()
)
engine = database.get_sqla_engine(schema=target.table.schema)
conditional_quote = engine.dialect.identifier_preparer.quote
session.execute(update(SqlaTable).where(SqlaTable.id == target.table.id))
# update ``Column`` model as well
dataset = (
session.query(NewDataset).filter_by(sqlatable_id=target.table.id).one()
)
if isinstance(target, TableColumn):
columns = [
column
for column in dataset.columns
if column.name == target.column_name
]
if not columns:
return
column = columns[0]
extra_json = json.loads(target.extra or "{}")
for attr in {"groupby", "filterable", "verbose_name", "python_date_format"}:
value = getattr(target, attr)
if value:
extra_json[attr] = value
column.name = target.column_name
column.type = target.type or "Unknown"
column.expression = target.expression or conditional_quote(
target.column_name
)
column.description = target.description
column.is_temporal = target.is_dttm
column.is_physical = target.expression is None
column.extra_json = json.dumps(extra_json) if extra_json else None
else: # SqlMetric
columns = [
column
for column in dataset.columns
if column.name == target.metric_name
]
if not columns:
return
column = columns[0]
extra_json = json.loads(target.extra or "{}")
for attr in {"verbose_name", "metric_type", "d3format"}:
value = getattr(target, attr)
if value:
extra_json[attr] = value
is_additive = (
target.metric_type
and target.metric_type.lower() in ADDITIVE_METRIC_TYPES
)
column.name = target.metric_name
column.expression = target.expression
column.warning_text = target.warning_text
column.description = target.description
column.is_additive = is_additive
column.extra_json = json.dumps(extra_json) if extra_json else None
@staticmethod
def after_insert( # pylint: disable=too-many-locals
mapper: Mapper, connection: Connection, target: "SqlaTable",
) -> None:
"""
Shadow write the dataset to new models.
The ``SqlaTable`` model is currently being migrated to two new models, ``Table``
and ``Dataset``. In the first phase of the migration the new models are populated
whenever ``SqlaTable`` is modified (created, updated, or deleted).
In the second phase of the migration reads will be done from the new models.
Finally, in the third phase of the migration the old models will be removed.
For more context: https://github.com/apache/superset/issues/14909
"""
# set permissions
security_manager.set_perm(mapper, connection, target)
session = inspect(target).session
# get DB-specific conditional quoter for expressions that point to columns or
# table names
database = (
target.database
or session.query(Database).filter_by(id=target.database_id).one()
)
engine = database.get_sqla_engine(schema=target.schema)
conditional_quote = engine.dialect.identifier_preparer.quote
# create columns
columns = []
for column in target.columns:
# ``is_active`` might be ``None`` at this point, but it defaults to ``True``.
if column.is_active is False:
continue
extra_json = json.loads(column.extra or "{}")
for attr in {"groupby", "filterable", "verbose_name", "python_date_format"}:
value = getattr(column, attr)
if value:
extra_json[attr] = value
columns.append(
NewColumn(
name=column.column_name,
type=column.type or "Unknown",
expression=column.expression
or conditional_quote(column.column_name),
description=column.description,
is_temporal=column.is_dttm,
is_aggregation=False,
is_physical=column.expression is None,
is_spatial=False,
is_partition=False,
is_increase_desired=True,
extra_json=json.dumps(extra_json) if extra_json else None,
is_managed_externally=target.is_managed_externally,
external_url=target.external_url,
),
)
# create metrics
for metric in target.metrics:
extra_json = json.loads(metric.extra or "{}")
for attr in {"verbose_name", "metric_type", "d3format"}:
value = getattr(metric, attr)
if value:
extra_json[attr] = value
is_additive = (
metric.metric_type
and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES
)
columns.append(
NewColumn(
name=metric.metric_name,
type="Unknown", # figuring this out would require a type inferrer
expression=metric.expression,
warning_text=metric.warning_text,
description=metric.description,
is_aggregation=True,
is_additive=is_additive,
is_physical=False,
is_spatial=False,
is_partition=False,
is_increase_desired=True,
extra_json=json.dumps(extra_json) if extra_json else None,
is_managed_externally=target.is_managed_externally,
external_url=target.external_url,
),
)
# physical dataset
tables = []
if target.sql is None:
physical_columns = [column for column in columns if column.is_physical]
# create table
table = NewTable(
name=target.table_name,
schema=target.schema,
catalog=None, # currently not supported
database_id=target.database_id,
columns=physical_columns,
is_managed_externally=target.is_managed_externally,
external_url=target.external_url,
)
tables.append(table)
# virtual dataset
else:
# mark all columns as virtual (not physical)
for column in columns:
column.is_physical = False
# find referenced tables
parsed = ParsedQuery(target.sql)
referenced_tables = parsed.tables
# predicate for finding the referenced tables
predicate = or_(
*[
and_(
NewTable.schema == (table.schema or target.schema),
NewTable.name == table.table,
)
for table in referenced_tables
]
)
tables = session.query(NewTable).filter(predicate).all()
# create the new dataset
dataset = NewDataset(
sqlatable_id=target.id,
name=target.table_name,
expression=target.sql or conditional_quote(target.table_name),
tables=tables,
columns=columns,
is_physical=target.sql is None,
is_managed_externally=target.is_managed_externally,
external_url=target.external_url,
)
session.add(dataset)
@staticmethod
def after_delete( # pylint: disable=unused-argument
mapper: Mapper, connection: Connection, target: "SqlaTable",
) -> None:
"""
Shadow write the dataset to new models.
The ``SqlaTable`` model is currently being migrated to two new models, ``Table``
and ``Dataset``. In the first phase of the migration the new models are populated
whenever ``SqlaTable`` is modified (created, updated, or deleted).
In the second phase of the migration reads will be done from the new models.
Finally, in the third phase of the migration the old models will be removed.
For more context: https://github.com/apache/superset/issues/14909
"""
session = inspect(target).session
dataset = (
session.query(NewDataset).filter_by(sqlatable_id=target.id).one_or_none()
)
if dataset:
session.delete(dataset)
@staticmethod
def after_update( # pylint: disable=too-many-branches, too-many-locals, too-many-statements
mapper: Mapper, connection: Connection, target: "SqlaTable",
) -> None:
"""
Shadow write the dataset to new models.
The ``SqlaTable`` model is currently being migrated to two new models, ``Table``
and ``Dataset``. In the first phase of the migration the new models are populated
whenever ``SqlaTable`` is modified (created, updated, or deleted).
In the second phase of the migration reads will be done from the new models.
Finally, in the third phase of the migration the old models will be removed.
For more context: https://github.com/apache/superset/issues/14909
"""
inspector = inspect(target)
session = inspector.session
# double-check that ``UPDATE``s are actually pending (this method is called even
# for instances that have no net changes to their column-based attributes)
if not session.is_modified(target, include_collections=True):
return
# set permissions
security_manager.set_perm(mapper, connection, target)
dataset = (
session.query(NewDataset).filter_by(sqlatable_id=target.id).one_or_none()
)
if not dataset:
return
# get DB-specific conditional quoter for expressions that point to columns or
# table names
database = (
target.database
or session.query(Database).filter_by(id=target.database_id).one()
)
engine = database.get_sqla_engine(schema=target.schema)
conditional_quote = engine.dialect.identifier_preparer.quote
# update columns
if inspector.attrs.columns.history.has_changes():
# handle deleted columns
if inspector.attrs.columns.history.deleted:
column_names = {
column.column_name
for column in inspector.attrs.columns.history.deleted
}
dataset.columns = [
column
for column in dataset.columns
if column.name not in column_names
]
# handle inserted columns
for column in inspector.attrs.columns.history.added:
# ``is_active`` might be ``None``, but it defaults to ``True``.
if column.is_active is False:
continue
extra_json = json.loads(column.extra or "{}")
for attr in {
"groupby",
"filterable",
"verbose_name",
"python_date_format",
}:
value = getattr(column, attr)
if value:
extra_json[attr] = value
dataset.columns.append(
NewColumn(
name=column.column_name,
type=column.type or "Unknown",
expression=column.expression
or conditional_quote(column.column_name),
description=column.description,
is_temporal=column.is_dttm,
is_aggregation=False,
is_physical=column.expression is None,
is_spatial=False,
is_partition=False,
is_increase_desired=True,
extra_json=json.dumps(extra_json) if extra_json else None,
is_managed_externally=target.is_managed_externally,
external_url=target.external_url,
)
)
# update metrics
if inspector.attrs.metrics.history.has_changes():
# handle deleted metrics
if inspector.attrs.metrics.history.deleted:
column_names = {
metric.metric_name
for metric in inspector.attrs.metrics.history.deleted
}
dataset.columns = [
column
for column in dataset.columns
if column.name not in column_names
]
# handle inserted metrics
for metric in inspector.attrs.metrics.history.added:
extra_json = json.loads(metric.extra or "{}")
for attr in {"verbose_name", "metric_type", "d3format"}:
value = getattr(metric, attr)
if value:
extra_json[attr] = value
is_additive = (
metric.metric_type
and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES
)
dataset.columns.append(
NewColumn(
name=metric.metric_name,
type="Unknown",
expression=metric.expression,
warning_text=metric.warning_text,
description=metric.description,
is_aggregation=True,
is_additive=is_additive,
is_physical=False,
is_spatial=False,
is_partition=False,
is_increase_desired=True,
extra_json=json.dumps(extra_json) if extra_json else None,
is_managed_externally=target.is_managed_externally,
external_url=target.external_url,
)
)
# physical dataset
if target.sql is None:
physical_columns = [
column for column in dataset.columns if column.is_physical
]
# if the table name changed we should create a new table instance, instead
# of reusing the original one
if (
inspector.attrs.table_name.history.has_changes()
or inspector.attrs.schema.history.has_changes()
or inspector.attrs.database_id.history.has_changes()
):
# does the dataset point to an existing table?
table = (
session.query(NewTable)
.filter_by(
database_id=target.database_id,
schema=target.schema,
name=target.table_name,
)
.first()
)
if not table:
# create new columns
physical_columns = [
clone_model(column, ignore=["uuid"])
for column in physical_columns
]
# create new table
table = NewTable(
name=target.table_name,
schema=target.schema,
catalog=None,
database_id=target.database_id,
columns=physical_columns,
is_managed_externally=target.is_managed_externally,
external_url=target.external_url,
)
dataset.tables = [table]
elif dataset.tables:
table = dataset.tables[0]
table.columns = physical_columns
# virtual dataset
else:
# mark all columns as virtual (not physical)
for column in dataset.columns:
column.is_physical = False
# update referenced tables if SQL changed
if inspector.attrs.sql.history.has_changes():
parsed = ParsedQuery(target.sql)
referenced_tables = parsed.tables
predicate = or_(
*[
and_(
NewTable.schema == (table.schema or target.schema),
NewTable.name == table.table,
)
for table in referenced_tables
]
)
dataset.tables = session.query(NewTable).filter(predicate).all()
# update other attributes
dataset.name = target.table_name
dataset.expression = target.sql or conditional_quote(target.table_name)
dataset.is_physical = target.sql is None
sa.event.listen(SqlaTable, "after_insert", security_manager.set_perm)
sa.event.listen(SqlaTable, "after_update", security_manager.set_perm)
sa.event.listen(SqlaTable, "before_update", SqlaTable.before_update)
sa.event.listen(SqlaTable, "after_insert", SqlaTable.after_insert)
sa.event.listen(SqlaTable, "after_delete", SqlaTable.after_delete)
sa.event.listen(SqlaTable, "after_update", SqlaTable.after_update)
sa.event.listen(SqlMetric, "after_update", SqlaTable.update_table)
sa.event.listen(TableColumn, "after_update", SqlaTable.update_table)
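
Taken together, the listeners above mean that creating, updating, or deleting a ``SqlaTable`` is mirrored into the new models. A rough sketch of the expected behaviour (illustrative only; ``example_db`` is a placeholder ``Database`` instance):

physical = SqlaTable(table_name="cleaned_sales_data", schema="public", database=example_db)
db.session.add(physical)
db.session.commit()  # fires SqlaTable.after_insert, which shadow-writes the new models

dataset = db.session.query(NewDataset).filter_by(sqlatable_id=physical.id).one()
assert dataset.is_physical          # no ``sql`` set, so this is a physical dataset
assert len(dataset.tables) == 1     # backed by a single ``NewTable`` row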


@ -0,0 +1,92 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Dataset model.
This model was introduced in SIP-68 (https://github.com/apache/superset/issues/14909),
and represents a "dataset" -- either a physical table or a virtual. In addition to a
dataset, new models for columns, metrics, and tables were also introduced.
These models are not fully implemented, and shouldn't be used yet.
"""
from typing import List
import sqlalchemy as sa
from flask_appbuilder import Model
from sqlalchemy.orm import relationship
from superset.columns.models import Column
from superset.models.helpers import (
AuditMixinNullable,
ExtraJSONMixin,
ImportExportMixin,
)
from superset.tables.models import Table
column_association_table = sa.Table(
"sl_dataset_columns",
Model.metadata, # pylint: disable=no-member
sa.Column("dataset_id", sa.ForeignKey("sl_datasets.id")),
sa.Column("column_id", sa.ForeignKey("sl_columns.id")),
)
table_association_table = sa.Table(
"sl_dataset_tables",
Model.metadata, # pylint: disable=no-member
sa.Column("dataset_id", sa.ForeignKey("sl_datasets.id")),
sa.Column("table_id", sa.ForeignKey("sl_tables.id")),
)
class Dataset(Model, AuditMixinNullable, ExtraJSONMixin, ImportExportMixin):
"""
A dataset, backed either by a physical table or by a virtual SQL expression.
"""
__tablename__ = "sl_datasets"
id = sa.Column(sa.Integer, primary_key=True)
# A temporary column, used for shadow writing to the new model. Once the ``SqlaTable``
# model has been deleted this column can be removed.
sqlatable_id = sa.Column(sa.Integer, nullable=True, unique=True)
# We use ``sa.Text`` for these attributes because (1) in modern databases the
# performance is the same as ``VARCHAR``[1] and (2) some table names can be
# **really** long (eg, Google Sheets URLs).
#
# [1] https://www.postgresql.org/docs/9.1/datatype-character.html
name = sa.Column(sa.Text)
expression = sa.Column(sa.Text)
# n:n relationship
tables: List[Table] = relationship("Table", secondary=table_association_table)
# The relationship between datasets and columns is 1:n, but we use a many-to-many
# association to distinguish it from the relationship between tables and columns.
columns: List[Column] = relationship(
"Column", secondary=column_association_table, cascade="all, delete"
)
# Does the dataset point directly to a ``Table``?
is_physical = sa.Column(sa.Boolean, default=False)
# Column is managed externally and should be read-only inside Superset
is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
external_url = sa.Column(sa.Text, nullable=True)
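
For a virtual dataset, the SQL expression is stored on the dataset itself, the referenced ``Table`` rows are linked through ``sl_dataset_tables``, and all of its columns are virtual. A minimal sketch (hypothetical names; ``events_table`` is assumed to be an existing ``Table`` row):

weekly_signups = Dataset(
    name="weekly_signups",
    expression="SELECT ds, COUNT(*) AS signups FROM events GROUP BY ds",
    is_physical=False,
    tables=[events_table],
    columns=[
        Column(name="ds", type="TIMESTAMP", expression="ds", is_temporal=True, is_physical=False),
        Column(
            name="signups",
            type="Unknown",
            expression="COUNT(*)",
            is_aggregation=True,
            is_additive=True,
            is_physical=False,
        ),
    ],
)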


@ -21,6 +21,9 @@ from typing import Any, Dict
from flask_babel import lazy_gettext as _
from marshmallow import fields, pre_load, Schema, ValidationError
from marshmallow.validate import Length
from marshmallow_sqlalchemy import SQLAlchemyAutoSchema
from superset.datasets.models import Dataset
get_delete_ids_schema = {"type": "array", "items": {"type": "integer"}}
get_export_ids_schema = {"type": "array", "items": {"type": "integer"}}
@ -209,3 +212,14 @@ class ImportV1DatasetSchema(Schema):
version = fields.String(required=True)
database_uuid = fields.UUID(required=True)
data = fields.URL()
class DatasetSchema(SQLAlchemyAutoSchema):
"""
Schema for the ``Dataset`` model.
"""
class Meta: # pylint: disable=too-few-public-methods
model = Dataset
load_instance = True
include_relationships = True


@ -0,0 +1,598 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=too-few-public-methods
"""New dataset models
Revision ID: b8d3a24d9131
Revises: 5afbb1a5849b
Create Date: 2021-11-11 16:41:53.266965
"""
import json
from typing import Any, Dict, List, Optional, Type
from uuid import uuid4
import sqlalchemy as sa
from alembic import op
from sqlalchemy import and_, inspect, or_
from sqlalchemy.engine import create_engine, Engine
from sqlalchemy.engine.url import make_url, URL
from sqlalchemy.exc import ArgumentError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import backref, relationship, Session
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy_utils import UUIDType
from superset import app, db, db_engine_specs
from superset.connectors.sqla.models import ADDITIVE_METRIC_TYPES
from superset.extensions import encrypted_field_factory, security_manager
from superset.sql_parse import ParsedQuery
from superset.utils.memoized import memoized
# revision identifiers, used by Alembic.
revision = "b8d3a24d9131"
down_revision = "5afbb1a5849b"
Base = declarative_base()
custom_password_store = app.config["SQLALCHEMY_CUSTOM_PASSWORD_STORE"]
DB_CONNECTION_MUTATOR = app.config["DB_CONNECTION_MUTATOR"]
class Database(Base):
__tablename__ = "dbs"
__table_args__ = (UniqueConstraint("database_name"),)
id = sa.Column(sa.Integer, primary_key=True)
database_name = sa.Column(sa.String(250), unique=True, nullable=False)
sqlalchemy_uri = sa.Column(sa.String(1024), nullable=False)
password = sa.Column(encrypted_field_factory.create(sa.String(1024)))
impersonate_user = sa.Column(sa.Boolean, default=False)
encrypted_extra = sa.Column(encrypted_field_factory.create(sa.Text), nullable=True)
extra = sa.Column(
sa.Text,
default=json.dumps(
dict(
metadata_params={},
engine_params={},
metadata_cache_timeout={},
schemas_allowed_for_file_upload=[],
)
),
)
server_cert = sa.Column(encrypted_field_factory.create(sa.Text), nullable=True)
@property
def sqlalchemy_uri_decrypted(self) -> str:
try:
url = make_url(self.sqlalchemy_uri)
except (ArgumentError, ValueError):
return "dialect://invalid_uri"
if custom_password_store:
url.password = custom_password_store(url)
else:
url.password = self.password
return str(url)
@property
def backend(self) -> str:
sqlalchemy_url = make_url(self.sqlalchemy_uri_decrypted)
return sqlalchemy_url.get_backend_name() # pylint: disable=no-member
@classmethod
@memoized
def get_db_engine_spec_for_backend(
cls, backend: str
) -> Type[db_engine_specs.BaseEngineSpec]:
engines = db_engine_specs.get_engine_specs()
return engines.get(backend, db_engine_specs.BaseEngineSpec)
@property
def db_engine_spec(self) -> Type[db_engine_specs.BaseEngineSpec]:
return self.get_db_engine_spec_for_backend(self.backend)
def get_extra(self) -> Dict[str, Any]:
return self.db_engine_spec.get_extra_params(self)
def get_effective_user(
self, object_url: URL, user_name: Optional[str] = None,
) -> Optional[str]:
effective_username = None
if self.impersonate_user:
effective_username = object_url.username
if user_name:
effective_username = user_name
return effective_username
def get_encrypted_extra(self) -> Dict[str, Any]:
return json.loads(self.encrypted_extra) if self.encrypted_extra else {}
@memoized(watch=("impersonate_user", "sqlalchemy_uri_decrypted", "extra"))
def get_sqla_engine(self, schema: Optional[str] = None) -> Engine:
extra = self.get_extra()
sqlalchemy_url = make_url(self.sqlalchemy_uri_decrypted)
self.db_engine_spec.adjust_database_uri(sqlalchemy_url, schema)
effective_username = self.get_effective_user(sqlalchemy_url, "admin")
# If using MySQL or Presto, for example, this will set url.username
self.db_engine_spec.modify_url_for_impersonation(
sqlalchemy_url, self.impersonate_user, effective_username
)
params = extra.get("engine_params", {})
connect_args = params.get("connect_args", {})
if self.impersonate_user:
self.db_engine_spec.update_impersonation_config(
connect_args, str(sqlalchemy_url), effective_username
)
if connect_args:
params["connect_args"] = connect_args
params.update(self.get_encrypted_extra())
if DB_CONNECTION_MUTATOR:
sqlalchemy_url, params = DB_CONNECTION_MUTATOR(
sqlalchemy_url,
params,
effective_username,
security_manager,
"migration",
)
return create_engine(sqlalchemy_url, **params)
class TableColumn(Base):
__tablename__ = "table_columns"
__table_args__ = (UniqueConstraint("table_id", "column_name"),)
id = sa.Column(sa.Integer, primary_key=True)
table_id = sa.Column(sa.Integer, sa.ForeignKey("tables.id"))
is_active = sa.Column(sa.Boolean, default=True)
extra = sa.Column(sa.Text)
column_name = sa.Column(sa.String(255), nullable=False)
type = sa.Column(sa.String(32))
expression = sa.Column(sa.Text)
description = sa.Column(sa.Text)
is_dttm = sa.Column(sa.Boolean, default=False)
filterable = sa.Column(sa.Boolean, default=True)
groupby = sa.Column(sa.Boolean, default=True)
verbose_name = sa.Column(sa.String(1024))
python_date_format = sa.Column(sa.String(255))
class SqlMetric(Base):
__tablename__ = "sql_metrics"
__table_args__ = (UniqueConstraint("table_id", "metric_name"),)
id = sa.Column(sa.Integer, primary_key=True)
table_id = sa.Column(sa.Integer, sa.ForeignKey("tables.id"))
extra = sa.Column(sa.Text)
metric_type = sa.Column(sa.String(32))
metric_name = sa.Column(sa.String(255), nullable=False)
expression = sa.Column(sa.Text, nullable=False)
warning_text = sa.Column(sa.Text)
description = sa.Column(sa.Text)
d3format = sa.Column(sa.String(128))
verbose_name = sa.Column(sa.String(1024))
class SqlaTable(Base):
__tablename__ = "tables"
__table_args__ = (UniqueConstraint("database_id", "schema", "table_name"),)
def fetch_columns_and_metrics(self, session: Session) -> None:
self.columns = session.query(TableColumn).filter(
TableColumn.table_id == self.id
)
self.metrics = session.query(SqlMetric).filter(SqlMetric.table_id == self.id)
id = sa.Column(sa.Integer, primary_key=True)
columns: List[TableColumn] = []
column_class = TableColumn
metrics: List[SqlMetric] = []
metric_class = SqlMetric
database_id = sa.Column(sa.Integer, sa.ForeignKey("dbs.id"), nullable=False)
database: Database = relationship(
"Database",
backref=backref("tables", cascade="all, delete-orphan"),
foreign_keys=[database_id],
)
schema = sa.Column(sa.String(255))
table_name = sa.Column(sa.String(250), nullable=False)
sql = sa.Column(sa.Text)
is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
external_url = sa.Column(sa.Text, nullable=True)
table_column_association_table = sa.Table(
"sl_table_columns",
Base.metadata,
sa.Column("table_id", sa.ForeignKey("sl_tables.id")),
sa.Column("column_id", sa.ForeignKey("sl_columns.id")),
)
dataset_column_association_table = sa.Table(
"sl_dataset_columns",
Base.metadata,
sa.Column("dataset_id", sa.ForeignKey("sl_datasets.id")),
sa.Column("column_id", sa.ForeignKey("sl_columns.id")),
)
dataset_table_association_table = sa.Table(
"sl_dataset_tables",
Base.metadata,
sa.Column("dataset_id", sa.ForeignKey("sl_datasets.id")),
sa.Column("table_id", sa.ForeignKey("sl_tables.id")),
)
class NewColumn(Base):
__tablename__ = "sl_columns"
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Text)
type = sa.Column(sa.Text)
expression = sa.Column(sa.Text)
is_physical = sa.Column(sa.Boolean, default=True)
description = sa.Column(sa.Text)
warning_text = sa.Column(sa.Text)
is_temporal = sa.Column(sa.Boolean, default=False)
is_aggregation = sa.Column(sa.Boolean, default=False)
is_additive = sa.Column(sa.Boolean, default=False)
is_spatial = sa.Column(sa.Boolean, default=False)
is_partition = sa.Column(sa.Boolean, default=False)
is_increase_desired = sa.Column(sa.Boolean, default=True)
is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
external_url = sa.Column(sa.Text, nullable=True)
extra_json = sa.Column(sa.Text, default="{}")
class NewTable(Base):
__tablename__ = "sl_tables"
__table_args__ = (UniqueConstraint("database_id", "catalog", "schema", "name"),)
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.Text)
schema = sa.Column(sa.Text)
catalog = sa.Column(sa.Text)
database_id = sa.Column(sa.Integer, sa.ForeignKey("dbs.id"), nullable=False)
database: Database = relationship(
"Database",
backref=backref("new_tables", cascade="all, delete-orphan"),
foreign_keys=[database_id],
)
columns: List[NewColumn] = relationship(
"NewColumn", secondary=table_column_association_table, cascade="all, delete"
)
is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
external_url = sa.Column(sa.Text, nullable=True)
class NewDataset(Base):
__tablename__ = "sl_datasets"
id = sa.Column(sa.Integer, primary_key=True)
sqlatable_id = sa.Column(sa.Integer, nullable=True, unique=True)
name = sa.Column(sa.Text)
expression = sa.Column(sa.Text)
tables: List[NewTable] = relationship(
"NewTable", secondary=dataset_table_association_table
)
columns: List[NewColumn] = relationship(
"NewColumn", secondary=dataset_column_association_table, cascade="all, delete"
)
is_physical = sa.Column(sa.Boolean, default=False)
is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
external_url = sa.Column(sa.Text, nullable=True)
def after_insert(target: SqlaTable) -> None: # pylint: disable=too-many-locals
"""
Copy old datasets to the new models.
"""
session = inspect(target).session
# get DB-specific conditional quoter for expressions that point to columns or
# table names
database = (
target.database
or session.query(Database).filter_by(id=target.database_id).one()
)
engine = database.get_sqla_engine(schema=target.schema)
conditional_quote = engine.dialect.identifier_preparer.quote
# create columns
columns = []
for column in target.columns:
# ``is_active`` might be ``None`` at this point, but it defaults to ``True``.
if column.is_active is False:
continue
extra_json = json.loads(column.extra or "{}")
for attr in {"groupby", "filterable", "verbose_name", "python_date_format"}:
value = getattr(column, attr)
if value:
extra_json[attr] = value
columns.append(
NewColumn(
name=column.column_name,
type=column.type or "Unknown",
expression=column.expression or conditional_quote(column.column_name),
description=column.description,
is_temporal=column.is_dttm,
is_aggregation=False,
is_physical=column.expression is None or column.expression == "",
is_spatial=False,
is_partition=False,
is_increase_desired=True,
extra_json=json.dumps(extra_json) if extra_json else None,
is_managed_externally=target.is_managed_externally,
external_url=target.external_url,
),
)
# create metrics
for metric in target.metrics:
extra_json = json.loads(metric.extra or "{}")
for attr in {"verbose_name", "metric_type", "d3format"}:
value = getattr(metric, attr)
if value:
extra_json[attr] = value
is_additive = (
metric.metric_type and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES
)
columns.append(
NewColumn(
name=metric.metric_name,
type="Unknown", # figuring this out would require a type inferrer
expression=metric.expression,
warning_text=metric.warning_text,
description=metric.description,
is_aggregation=True,
is_additive=is_additive,
is_physical=False,
is_spatial=False,
is_partition=False,
is_increase_desired=True,
extra_json=json.dumps(extra_json) if extra_json else None,
is_managed_externally=target.is_managed_externally,
external_url=target.external_url,
),
)
# physical dataset
tables = []
if target.sql is None:
physical_columns = [column for column in columns if column.is_physical]
# create table
table = NewTable(
name=target.table_name,
schema=target.schema,
catalog=None, # currently not supported
database_id=target.database_id,
columns=physical_columns,
is_managed_externally=target.is_managed_externally,
external_url=target.external_url,
)
tables.append(table)
# virtual dataset
else:
# mark all columns as virtual (not physical)
for column in columns:
column.is_physical = False
# find referenced tables
parsed = ParsedQuery(target.sql)
referenced_tables = parsed.tables
# predicate for finding the referenced tables
predicate = or_(
*[
and_(
NewTable.schema == (table.schema or target.schema),
NewTable.name == table.table,
)
for table in referenced_tables
]
)
tables = session.query(NewTable).filter(predicate).all()
# create the new dataset
dataset = NewDataset(
sqlatable_id=target.id,
name=target.table_name,
expression=target.sql or conditional_quote(target.table_name),
tables=tables,
columns=columns,
is_physical=target.sql is None,
is_managed_externally=target.is_managed_externally,
external_url=target.external_url,
)
session.add(dataset)
def upgrade():
# Create tables for the new models.
op.create_table(
"sl_columns",
# AuditMixinNullable
sa.Column("created_on", sa.DateTime(), nullable=True),
sa.Column("changed_on", sa.DateTime(), nullable=True),
sa.Column("created_by_fk", sa.Integer(), nullable=True),
sa.Column("changed_by_fk", sa.Integer(), nullable=True),
# ExtraJSONMixin
sa.Column("extra_json", sa.Text(), nullable=True),
# ImportExportMixin
sa.Column("uuid", UUIDType(binary=True), primary_key=False, default=uuid4),
# Column
sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False),
sa.Column("name", sa.TEXT(), nullable=False),
sa.Column("type", sa.TEXT(), nullable=False),
sa.Column("expression", sa.TEXT(), nullable=False),
sa.Column("is_physical", sa.BOOLEAN(), nullable=False, default=True,),
sa.Column("description", sa.TEXT(), nullable=True),
sa.Column("warning_text", sa.TEXT(), nullable=True),
sa.Column("unit", sa.TEXT(), nullable=True),
sa.Column("is_temporal", sa.BOOLEAN(), nullable=False),
sa.Column("is_spatial", sa.BOOLEAN(), nullable=False, default=False,),
sa.Column("is_partition", sa.BOOLEAN(), nullable=False, default=False,),
sa.Column("is_aggregation", sa.BOOLEAN(), nullable=False, default=False,),
sa.Column("is_additive", sa.BOOLEAN(), nullable=False, default=False,),
sa.Column("is_increase_desired", sa.BOOLEAN(), nullable=False, default=True,),
sa.Column(
"is_managed_externally",
sa.Boolean(),
nullable=False,
server_default=sa.false(),
),
sa.Column("external_url", sa.Text(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
with op.batch_alter_table("sl_columns") as batch_op:
batch_op.create_unique_constraint("uq_sl_columns_uuid", ["uuid"])
op.create_table(
"sl_tables",
# AuditMixinNullable
sa.Column("created_on", sa.DateTime(), nullable=True),
sa.Column("changed_on", sa.DateTime(), nullable=True),
sa.Column("created_by_fk", sa.Integer(), nullable=True),
sa.Column("changed_by_fk", sa.Integer(), nullable=True),
# ExtraJSONMixin
sa.Column("extra_json", sa.Text(), nullable=True),
# ImportExportMixin
sa.Column("uuid", UUIDType(binary=True), primary_key=False, default=uuid4),
# Table
sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False),
sa.Column("database_id", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("catalog", sa.TEXT(), nullable=True),
sa.Column("schema", sa.TEXT(), nullable=True),
sa.Column("name", sa.TEXT(), nullable=False),
sa.Column(
"is_managed_externally",
sa.Boolean(),
nullable=False,
server_default=sa.false(),
),
sa.Column("external_url", sa.Text(), nullable=True),
sa.ForeignKeyConstraint(["database_id"], ["dbs.id"], name="sl_tables_ibfk_1"),
sa.PrimaryKeyConstraint("id"),
)
with op.batch_alter_table("sl_tables") as batch_op:
batch_op.create_unique_constraint("uq_sl_tables_uuid", ["uuid"])
op.create_table(
"sl_table_columns",
sa.Column("table_id", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("column_id", sa.INTEGER(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(
["column_id"], ["sl_columns.id"], name="sl_table_columns_ibfk_2"
),
sa.ForeignKeyConstraint(
["table_id"], ["sl_tables.id"], name="sl_table_columns_ibfk_1"
),
)
op.create_table(
"sl_datasets",
# AuditMixinNullable
sa.Column("created_on", sa.DateTime(), nullable=True),
sa.Column("changed_on", sa.DateTime(), nullable=True),
sa.Column("created_by_fk", sa.Integer(), nullable=True),
sa.Column("changed_by_fk", sa.Integer(), nullable=True),
# ExtraJSONMixin
sa.Column("extra_json", sa.Text(), nullable=True),
# ImportExportMixin
sa.Column("uuid", UUIDType(binary=True), primary_key=False, default=uuid4),
# Dataset
sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False),
sa.Column("sqlatable_id", sa.INTEGER(), nullable=True),
sa.Column("name", sa.TEXT(), nullable=False),
sa.Column("expression", sa.TEXT(), nullable=False),
sa.Column("is_physical", sa.BOOLEAN(), nullable=False, default=False,),
sa.Column(
"is_managed_externally",
sa.Boolean(),
nullable=False,
server_default=sa.false(),
),
sa.Column("external_url", sa.Text(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
with op.batch_alter_table("sl_datasets") as batch_op:
batch_op.create_unique_constraint("uq_sl_datasets_uuid", ["uuid"])
batch_op.create_unique_constraint(
"uq_sl_datasets_sqlatable_id", ["sqlatable_id"]
)
op.create_table(
"sl_dataset_columns",
sa.Column("dataset_id", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("column_id", sa.INTEGER(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(
["column_id"], ["sl_columns.id"], name="sl_dataset_columns_ibfk_2"
),
sa.ForeignKeyConstraint(
["dataset_id"], ["sl_datasets.id"], name="sl_dataset_columns_ibfk_1"
),
)
op.create_table(
"sl_dataset_tables",
sa.Column("dataset_id", sa.INTEGER(), autoincrement=False, nullable=False),
sa.Column("table_id", sa.INTEGER(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(
["dataset_id"], ["sl_datasets.id"], name="sl_dataset_tables_ibfk_1"
),
sa.ForeignKeyConstraint(
["table_id"], ["sl_tables.id"], name="sl_dataset_tables_ibfk_2"
),
)
# migrate existing datasets to the new models
bind = op.get_bind()
session = db.Session(bind=bind) # pylint: disable=no-member
datasets = session.query(SqlaTable).all()
for dataset in datasets:
dataset.fetch_columns_and_metrics(session)
after_insert(target=dataset)
def downgrade():
op.drop_table("sl_dataset_columns")
op.drop_table("sl_dataset_tables")
op.drop_table("sl_datasets")
op.drop_table("sl_table_columns")
op.drop_table("sl_tables")
op.drop_table("sl_columns")


@ -29,6 +29,7 @@ import pytz
import sqlalchemy as sa
import yaml
from flask import escape, g, Markup
from flask_appbuilder import Model
from flask_appbuilder.models.decorators import renders
from flask_appbuilder.models.mixins import AuditMixin
from flask_appbuilder.security.sqla.models import User
@ -510,3 +511,22 @@ class CertificationMixin:
@property
def warning_markdown(self) -> Optional[str]:
return self.get_extra_dict().get("warning_markdown")
def clone_model(
target: Model, ignore: Optional[List[str]] = None, **kwargs: Any
) -> Model:
"""
Clone a SQLAlchemy model.
"""
ignore = ignore or []
table = target.__table__
data = {
attr: getattr(target, attr)
for attr in table.columns.keys()
if attr not in table.primary_key.columns.keys() and attr not in ignore
}
data.update(kwargs)
return target.__class__(**data)
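
A brief usage sketch for the helper above (``column`` and ``session`` are assumed to exist); this mirrors how ``after_update`` clones physical columns when a dataset starts pointing at a new table:

# copy every non-primary-key attribute, skip the original UUID so the mixin
# default generates a fresh one, and override the name via **kwargs
new_column = clone_model(column, ignore=["uuid"], name=f"{column.name}_copy")
session.add(new_column)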


@ -950,6 +950,7 @@ class SupersetSecurityManager( # pylint: disable=too-many-public-methods
.where(link_table.c.id == target.id)
.values(perm=target.get_perm())
)
target.perm = target.get_perm()
if (
hasattr(target, "schema_perm")
@ -960,6 +961,7 @@ class SupersetSecurityManager( # pylint: disable=too-many-public-methods
.where(link_table.c.id == target.id)
.values(schema_perm=target.get_schema_perm())
)
target.schema_perm = target.get_schema_perm()
pvm_names = []
if target.__tablename__ in {"dbs", "clusters"}:


@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

superset/tables/models.py

@ -0,0 +1,92 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Table model.
This model was introduced in SIP-68 (https://github.com/apache/superset/issues/14909),
and represents a "table" in a given database -- either a physical table or a view. In
addition to a table, new models for columns, metrics, and datasets were also introduced.
These models are not fully implemented, and shouldn't be used yet.
"""
from typing import List
import sqlalchemy as sa
from flask_appbuilder import Model
from sqlalchemy.orm import backref, relationship
from sqlalchemy.schema import UniqueConstraint
from superset.columns.models import Column
from superset.models.core import Database
from superset.models.helpers import (
AuditMixinNullable,
ExtraJSONMixin,
ImportExportMixin,
)
association_table = sa.Table(
"sl_table_columns",
Model.metadata, # pylint: disable=no-member
sa.Column("table_id", sa.ForeignKey("sl_tables.id")),
sa.Column("column_id", sa.ForeignKey("sl_columns.id")),
)
class Table(Model, AuditMixinNullable, ExtraJSONMixin, ImportExportMixin):
"""
A table/view in a database.
"""
__tablename__ = "sl_tables"
# Note this uniqueness constraint is not part of the physical schema, i.e., it does
# not exist in the migrations. The reason it does not physically exist is that MySQL,
# PostgreSQL, etc. interpret uniqueness differently when NULLs are involved, which is
# problematic given that the catalog and schema are optional.
__table_args__ = (UniqueConstraint("database_id", "catalog", "schema", "name"),)
id = sa.Column(sa.Integer, primary_key=True)
database_id = sa.Column(sa.Integer, sa.ForeignKey("dbs.id"), nullable=False)
database: Database = relationship(
"Database",
# TODO (betodealmeida): rename the backref to ``tables`` once we get rid of the
# old models.
backref=backref("new_tables", cascade="all, delete-orphan"),
foreign_keys=[database_id],
)
# We use ``sa.Text`` for these attributes because (1) in modern databases the
# performance is the same as ``VARCHAR``[1] and (2) some table names can be
# **really** long (eg, Google Sheets URLs).
#
# [1] https://www.postgresql.org/docs/9.1/datatype-character.html
catalog = sa.Column(sa.Text)
schema = sa.Column(sa.Text)
name = sa.Column(sa.Text)
# The relationship between tables and columns is 1:n, but we use a many-to-many
# association to distinguish it from the relationship between datasets and
# columns.
columns: List[Column] = relationship(
"Column", secondary=association_table, cascade="all, delete"
)
# Column is managed externally and should be read-only inside Superset
is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False)
external_url = sa.Column(sa.Text, nullable=True)
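
Mirroring the ``Column`` and ``Dataset`` sketches above, a physical table row would look roughly like this (illustrative; ``example_db`` is a placeholder ``Database``):

sales = Table(
    database=example_db,
    catalog=None,
    schema="public",
    name="cleaned_sales_data",
    columns=[
        Column(name="price", type="FLOAT", expression='"price"', is_physical=True),
        Column(name="ds", type="TIMESTAMP", expression='"ds"', is_temporal=True),
    ],
)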


@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Schema for the table model.
This model was introduced in SIP-68 (https://github.com/apache/superset/issues/14909),
and represents a "table" in a given database -- either a physical table or a view. In
addition to a table, new models for columns, metrics, and datasets were also introduced.
These models are not fully implemented, and shouldn't be used yet.
"""
from marshmallow_sqlalchemy import SQLAlchemyAutoSchema
from superset.tables.models import Table
class TableSchema(SQLAlchemyAutoSchema):
"""
Schema for the ``Table`` model.
"""
class Meta: # pylint: disable=too-few-public-methods
model = Table
load_instance = True
include_relationships = True


@ -21,7 +21,7 @@ from typing import Any, Dict, Generator, List, TYPE_CHECKING
import pytest
from superset import security_manager as sm
from superset import db, security_manager as sm
from superset.dashboards.filter_sets.consts import (
DESCRIPTION_FIELD,
JSON_METADATA_FIELD,
@ -66,20 +66,6 @@ if TYPE_CHECKING:
security_manager: BaseSecurityManager = sm
# @pytest.fixture(autouse=True, scope="session")
# def setup_sample_data() -> Any:
# pass
@pytest.fixture(autouse=True)
def expire_on_commit_true() -> Generator[None, None, None]:
ctx: AppContext
with app.app_context() as ctx:
ctx.app.appbuilder.get_session.configure(expire_on_commit=False)
yield
ctx.app.appbuilder.get_session.configure(expire_on_commit=True)
@pytest.fixture(autouse=True, scope="module")
def test_users() -> Generator[Dict[str, int], None, None]:
usernames = [
@ -92,17 +78,14 @@ def test_users() -> Generator[Dict[str, int], None, None]:
filter_set_role = build_filter_set_role()
admin_role: Role = security_manager.find_role("Admin")
usernames_to_ids = create_test_users(admin_role, filter_set_role, usernames)
yield usernames_to_ids
ctx: AppContext
delete_users(usernames_to_ids)
yield usernames_to_ids
delete_users(usernames_to_ids)
def delete_users(usernames_to_ids: Dict[str, int]) -> None:
with app.app_context() as ctx:
session: Session = ctx.app.appbuilder.get_session
for username in usernames_to_ids.keys():
session.delete(security_manager.find_user(username))
session.commit()
for username in usernames_to_ids.keys():
db.session.delete(security_manager.find_user(username))
db.session.commit()
def create_test_users(
@ -150,106 +133,86 @@ def client() -> Generator[FlaskClient[Any], None, None]:
@pytest.fixture
def dashboard() -> Generator[Dashboard, None, None]:
dashboard: Dashboard
slice_: Slice
datasource: SqlaTable
database: Database
session: Session
try:
with app.app_context() as ctx:
dashboard_owner_user = security_manager.find_user(DASHBOARD_OWNER_USERNAME)
database = create_database("test_database_filter_sets")
datasource = create_datasource_table(
name="test_datasource", database=database, owners=[dashboard_owner_user]
)
slice_ = create_slice(
datasource=datasource, name="test_slice", owners=[dashboard_owner_user]
)
dashboard = create_dashboard(
dashboard_title="test_dashboard",
published=True,
slices=[slice_],
owners=[dashboard_owner_user],
)
session = ctx.app.appbuilder.get_session
session.add(dashboard)
session.commit()
yield dashboard
except Exception as ex:
print(str(ex))
finally:
with app.app_context() as ctx:
session = ctx.app.appbuilder.get_session
try:
dashboard.owners = []
slice_.owners = []
datasource.owners = []
session.merge(dashboard)
session.merge(slice_)
session.merge(datasource)
session.commit()
session.delete(dashboard)
session.delete(slice_)
session.delete(datasource)
session.delete(database)
session.commit()
except Exception as ex:
print(str(ex))
def dashboard(app_context) -> Generator[Dashboard, None, None]:
dashboard_owner_user = security_manager.find_user(DASHBOARD_OWNER_USERNAME)
database = create_database("test_database_filter_sets")
datasource = create_datasource_table(
name="test_datasource", database=database, owners=[dashboard_owner_user]
)
slice_ = create_slice(
datasource=datasource, name="test_slice", owners=[dashboard_owner_user]
)
dashboard = create_dashboard(
dashboard_title="test_dashboard",
published=True,
slices=[slice_],
owners=[dashboard_owner_user],
)
db.session.add(dashboard)
db.session.commit()
yield dashboard
db.session.delete(dashboard)
db.session.delete(slice_)
db.session.delete(datasource)
db.session.delete(database)
db.session.commit()
@pytest.fixture
def dashboard_id(dashboard) -> int:
return dashboard.id
def dashboard_id(dashboard: Dashboard) -> Generator[int, None, None]:
yield dashboard.id
@pytest.fixture
def filtersets(
dashboard_id: int, test_users: Dict[str, int], dumped_valid_json_metadata: str
) -> Generator[Dict[str, List[FilterSet]], None, None]:
try:
with app.app_context() as ctx:
session: Session = ctx.app.appbuilder.get_session
first_filter_set = FilterSet(
name="filter_set_1_of_" + str(dashboard_id),
dashboard_id=dashboard_id,
json_metadata=dumped_valid_json_metadata,
owner_id=dashboard_id,
owner_type="Dashboard",
)
second_filter_set = FilterSet(
name="filter_set_2_of_" + str(dashboard_id),
json_metadata=dumped_valid_json_metadata,
dashboard_id=dashboard_id,
owner_id=dashboard_id,
owner_type="Dashboard",
)
third_filter_set = FilterSet(
name="filter_set_3_of_" + str(dashboard_id),
json_metadata=dumped_valid_json_metadata,
dashboard_id=dashboard_id,
owner_id=test_users[FILTER_SET_OWNER_USERNAME],
owner_type="User",
)
forth_filter_set = FilterSet(
name="filter_set_4_of_" + str(dashboard_id),
json_metadata=dumped_valid_json_metadata,
dashboard_id=dashboard_id,
owner_id=test_users[FILTER_SET_OWNER_USERNAME],
owner_type="User",
)
session.add(first_filter_set)
session.add(second_filter_set)
session.add(third_filter_set)
session.add(forth_filter_set)
session.commit()
yv = {
"Dashboard": [first_filter_set, second_filter_set],
FILTER_SET_OWNER_USERNAME: [third_filter_set, forth_filter_set],
}
yield yv
except Exception as ex:
print(str(ex))
first_filter_set = FilterSet(
name="filter_set_1_of_" + str(dashboard_id),
dashboard_id=dashboard_id,
json_metadata=dumped_valid_json_metadata,
owner_id=dashboard_id,
owner_type="Dashboard",
)
second_filter_set = FilterSet(
name="filter_set_2_of_" + str(dashboard_id),
json_metadata=dumped_valid_json_metadata,
dashboard_id=dashboard_id,
owner_id=dashboard_id,
owner_type="Dashboard",
)
third_filter_set = FilterSet(
name="filter_set_3_of_" + str(dashboard_id),
json_metadata=dumped_valid_json_metadata,
dashboard_id=dashboard_id,
owner_id=test_users[FILTER_SET_OWNER_USERNAME],
owner_type="User",
)
fourth_filter_set = FilterSet(
name="filter_set_4_of_" + str(dashboard_id),
json_metadata=dumped_valid_json_metadata,
dashboard_id=dashboard_id,
owner_id=test_users[FILTER_SET_OWNER_USERNAME],
owner_type="User",
)
db.session.add(first_filter_set)
db.session.add(second_filter_set)
db.session.add(third_filter_set)
db.session.add(fourth_filter_set)
db.session.commit()
yield {
"Dashboard": [first_filter_set, second_filter_set],
FILTER_SET_OWNER_USERNAME: [third_filter_set, fourth_filter_set],
}
db.session.delete(first_filter_set)
db.session.delete(second_filter_set)
db.session.delete(third_filter_set)
db.session.delete(fourth_filter_set)
db.session.commit()
@pytest.fixture
@ -299,8 +262,8 @@ def valid_filter_set_data_for_update(
@pytest.fixture
def not_exists_dashboard(dashboard_id: int) -> int:
return dashboard_id + 1
def not_exists_dashboard_id(dashboard_id: int) -> Generator[int, None, None]:
yield dashboard_id + 1
@pytest.fixture


@ -94,14 +94,14 @@ class TestCreateFilterSetsApi:
def test_with_dashboard_not_exists__404(
self,
not_exists_dashboard: int,
not_exists_dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# act
login(client, "admin")
response = call_create_filter_set(
client, not_exists_dashboard, valid_filter_set_data_for_create
client, not_exists_dashboard_id, valid_filter_set_data_for_create
)
# assert


@ -61,7 +61,7 @@ class TestDeleteFilterSet:
def test_with_dashboard_not_exists_filterset_not_exists__404(
self,
not_exists_dashboard: int,
not_exists_dashboard_id: int,
filtersets: Dict[str, List[FilterSet]],
client: FlaskClient[Any],
):
@ -70,14 +70,14 @@ class TestDeleteFilterSet:
filter_set_id = max(collect_all_ids(filtersets)) + 1
response = call_delete_filter_set(
client, {"id": filter_set_id}, not_exists_dashboard
client, {"id": filter_set_id}, not_exists_dashboard_id
)
# assert
assert response.status_code == 404
def test_with_dashboard_not_exists_filterset_exists__404(
self,
not_exists_dashboard: int,
not_exists_dashboard_id: int,
dashboard_based_filter_set_dict: Dict[str, Any],
client: FlaskClient[Any],
):
@ -86,7 +86,7 @@ class TestDeleteFilterSet:
# act
response = call_delete_filter_set(
client, dashboard_based_filter_set_dict, not_exists_dashboard
client, dashboard_based_filter_set_dict, not_exists_dashboard_id
)
# assert
assert response.status_code == 404


@ -37,13 +37,13 @@ if TYPE_CHECKING:
class TestGetFilterSetsApi:
def test_with_dashboard_not_exists__404(
self, not_exists_dashboard: int, client: FlaskClient[Any],
self, not_exists_dashboard_id: int, client: FlaskClient[Any],
):
# arrange
login(client, "admin")
# act
response = call_get_filter_sets(client, not_exists_dashboard)
response = call_get_filter_sets(client, not_exists_dashboard_id)
# assert
assert response.status_code == 404


@ -85,7 +85,7 @@ class TestUpdateFilterSet:
def test_with_dashboard_not_exists_filterset_not_exists__404(
self,
not_exists_dashboard: int,
not_exists_dashboard_id: int,
filtersets: Dict[str, List[FilterSet]],
client: FlaskClient[Any],
):
@ -94,14 +94,14 @@ class TestUpdateFilterSet:
filter_set_id = max(collect_all_ids(filtersets)) + 1
response = call_update_filter_set(
client, {"id": filter_set_id}, {}, not_exists_dashboard
client, {"id": filter_set_id}, {}, not_exists_dashboard_id
)
# assert
assert response.status_code == 404
def test_with_dashboard_not_exists_filterset_exists__404(
self,
not_exists_dashboard: int,
not_exists_dashboard_id: int,
dashboard_based_filter_set_dict: Dict[str, Any],
client: FlaskClient[Any],
):
@ -110,7 +110,7 @@ class TestUpdateFilterSet:
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, {}, not_exists_dashboard
client, dashboard_based_filter_set_dict, {}, not_exists_dashboard_id
)
# assert
assert response.status_code == 404


@ -20,7 +20,7 @@ from typing import List, Optional
from flask_appbuilder import Model
from flask_appbuilder.security.sqla.models import User
from superset import appbuilder
from superset import db
from superset.connectors.sqla.models import SqlaTable, sqlatable_user
from superset.models.core import Database
from superset.models.dashboard import (
@ -38,7 +38,7 @@ from tests.integration_tests.dashboards.dashboard_test_utils import (
logger = logging.getLogger(__name__)
session = appbuilder.get_session
session = db.session
inserted_dashboards_ids = []
inserted_databases_ids = []
@ -192,9 +192,11 @@ def delete_all_inserted_objects() -> None:
def delete_all_inserted_dashboards():
try:
dashboards_to_delete: List[Dashboard] = session.query(Dashboard).filter(
Dashboard.id.in_(inserted_dashboards_ids)
).all()
dashboards_to_delete: List[Dashboard] = (
session.query(Dashboard)
.filter(Dashboard.id.in_(inserted_dashboards_ids))
.all()
)
for dashboard in dashboards_to_delete:
try:
delete_dashboard(dashboard, False)
@ -239,9 +241,9 @@ def delete_dashboard_slices_associations(dashboard: Dashboard) -> None:
def delete_all_inserted_slices():
try:
slices_to_delete: List[Slice] = session.query(Slice).filter(
Slice.id.in_(inserted_slices_ids)
).all()
slices_to_delete: List[Slice] = (
session.query(Slice).filter(Slice.id.in_(inserted_slices_ids)).all()
)
for slice in slices_to_delete:
try:
delete_slice(slice, False)
@ -270,9 +272,11 @@ def delete_slice_users_associations(slice_: Slice) -> None:
def delete_all_inserted_tables():
try:
tables_to_delete: List[SqlaTable] = session.query(SqlaTable).filter(
SqlaTable.id.in_(inserted_sqltables_ids)
).all()
tables_to_delete: List[SqlaTable] = (
session.query(SqlaTable)
.filter(SqlaTable.id.in_(inserted_sqltables_ids))
.all()
)
for table in tables_to_delete:
try:
delete_sqltable(table, False)
@ -303,9 +307,11 @@ def delete_table_users_associations(table: SqlaTable) -> None:
def delete_all_inserted_dbs():
try:
dbs_to_delete: List[Database] = session.query(Database).filter(
Database.id.in_(inserted_databases_ids)
).all()
dbs_to_delete: List[Database] = (
session.query(Database)
.filter(Database.id.in_(inserted_databases_ids))
.all()
)
for db in dbs_to_delete:
try:
delete_database(db, False)


@ -500,6 +500,7 @@ class TestDatasetApi(SupersetTestCase):
"message": {"table_name": ["Dataset energy_usage already exists"]}
}
@unittest.skip("test is failing stochastically")
def test_create_dataset_same_name_different_schema(self):
if backend() == "sqlite":
# sqlite doesn't support schemas


@ -52,6 +52,10 @@ from tests.integration_tests.fixtures.public_role import (
public_role_like_gamma,
public_role_like_test_role,
)
from tests.integration_tests.fixtures.birth_names_dashboard import (
load_birth_names_dashboard_with_slices,
load_birth_names_data,
)
from tests.integration_tests.fixtures.world_bank_dashboard import (
load_world_bank_dashboard_with_slices,
load_world_bank_data,
@ -224,7 +228,7 @@ class TestRolePermission(SupersetTestCase):
)
# database change
new_db = Database(sqlalchemy_uri="some_uri", database_name="tmp_db")
new_db = Database(sqlalchemy_uri="sqlite://", database_name="tmp_db")
session.add(new_db)
stored_table.database = (
session.query(Database).filter_by(database_name="tmp_db").one()
@ -358,9 +362,7 @@ class TestRolePermission(SupersetTestCase):
def test_set_perm_database(self):
session = db.session
database = Database(
database_name="tmp_database", sqlalchemy_uri="sqlite://test"
)
database = Database(database_name="tmp_database", sqlalchemy_uri="sqlite://")
session.add(database)
stored_db = (
@ -411,9 +413,7 @@ class TestRolePermission(SupersetTestCase):
db.session.commit()
def test_hybrid_perm_database(self):
database = Database(
database_name="tmp_database3", sqlalchemy_uri="sqlite://test"
)
database = Database(database_name="tmp_database3", sqlalchemy_uri="sqlite://")
db.session.add(database)
@ -437,9 +437,7 @@ class TestRolePermission(SupersetTestCase):
def test_set_perm_slice(self):
session = db.session
database = Database(
database_name="tmp_database", sqlalchemy_uri="sqlite://test"
)
database = Database(database_name="tmp_database", sqlalchemy_uri="sqlite://")
table = SqlaTable(table_name="tmp_perm_table", database=database)
session.add(database)
session.add(table)
@ -573,6 +571,7 @@ class TestRolePermission(SupersetTestCase):
) # wb_health_population slice, has access
self.assertNotIn("Girl Name Cloud", data) # birth_names slice, no access
@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
@pytest.mark.usefixtures("public_role_like_gamma")
def test_public_sync_role_data_perms(self):
"""


@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=import-outside-toplevel, unused-argument
from sqlalchemy.orm.session import Session
def test_column_model(app_context: None, session: Session) -> None:
"""
Test basic attributes of a ``Column``.
"""
from superset.columns.models import Column
engine = session.get_bind()
Column.metadata.create_all(engine) # pylint: disable=no-member
column = Column(name="ds", type="TIMESTAMP", expression="ds",)
session.add(column)
session.flush()
assert column.id == 1
assert column.uuid is not None
assert column.name == "ds"
assert column.type == "TIMESTAMP"
assert column.expression == "ds"
# test that default values are set correctly
assert column.description is None
assert column.warning_text is None
assert column.unit is None
assert column.is_temporal is False
assert column.is_spatial is False
assert column.is_partition is False
assert column.is_aggregation is False
assert column.is_additive is False
assert column.is_increase_desired is True
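Summarised as a hypothetical stand-in (inferred only from the assertions above, not the actual superset.columns.models.Column definition), the defaults exercised by this test look roughly like this:

import sqlalchemy as sa
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class ColumnSketch(Base):
    # Hypothetical model mirroring the defaults asserted in test_column_model.
    __tablename__ = "sl_columns_sketch"

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.Text)
    type = sa.Column(sa.Text)
    expression = sa.Column(sa.Text)

    # Descriptive fields are nullable and default to None.
    description = sa.Column(sa.Text)
    warning_text = sa.Column(sa.Text)
    unit = sa.Column(sa.Text)

    # Boolean flags default to False, except is_increase_desired which defaults to True.
    is_temporal = sa.Column(sa.Boolean, default=False)
    is_spatial = sa.Column(sa.Boolean, default=False)
    is_partition = sa.Column(sa.Boolean, default=False)
    is_aggregation = sa.Column(sa.Boolean, default=False)
    is_additive = sa.Column(sa.Boolean, default=False)
    is_increase_desired = sa.Column(sa.Boolean, default=True)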


@ -22,15 +22,14 @@ from typing import Any, Dict
from sqlalchemy.orm.session import Session
from superset.datasets.schemas import ImportV1DatasetSchema
def test_import_(app_context: None, session: Session) -> None:
def test_import_dataset(app_context: None, session: Session) -> None:
"""
Test importing a dataset.
"""
from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
from superset.datasets.commands.importers.v1.utils import import_dataset
from superset.datasets.schemas import ImportV1DatasetSchema
from superset.models.core import Database
engine = session.get_bind()
@ -120,11 +119,11 @@ def test_import_(app_context: None, session: Session) -> None:
assert len(sqla_table.columns) == 1
assert sqla_table.columns[0].column_name == "profit"
assert sqla_table.columns[0].verbose_name is None
assert sqla_table.columns[0].is_dttm is False
assert sqla_table.columns[0].is_active is True
assert sqla_table.columns[0].is_dttm is None
assert sqla_table.columns[0].is_active is None
assert sqla_table.columns[0].type == "INTEGER"
assert sqla_table.columns[0].groupby is True
assert sqla_table.columns[0].filterable is True
assert sqla_table.columns[0].groupby is None
assert sqla_table.columns[0].filterable is None
assert sqla_table.columns[0].expression == "revenue-expenses"
assert sqla_table.columns[0].description is None
assert sqla_table.columns[0].python_date_format is None
@ -139,6 +138,7 @@ def test_import_column_extra_is_string(app_context: None, session: Session) -> N
"""
from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
from superset.datasets.commands.importers.v1.utils import import_dataset
from superset.datasets.schemas import ImportV1DatasetSchema
from superset.models.core import Database
engine = session.get_bind()

File diff suppressed because it is too large


@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


@ -0,0 +1,50 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=import-outside-toplevel, unused-argument
from sqlalchemy.orm.session import Session
def test_table_model(app_context: None, session: Session) -> None:
"""
Test basic attributes of a ``Table``.
"""
from superset.columns.models import Column
from superset.models.core import Database
from superset.tables.models import Table
engine = session.get_bind()
Table.metadata.create_all(engine) # pylint: disable=no-member
table = Table(
name="my_table",
schema="my_schema",
catalog="my_catalog",
database=Database(database_name="my_database", sqlalchemy_uri="test://"),
columns=[Column(name="ds", type="TIMESTAMP", expression="ds",)],
)
session.add(table)
session.flush()
assert table.id == 1
assert table.uuid is not None
assert table.database_id == 1
assert table.catalog == "my_catalog"
assert table.schema == "my_schema"
assert table.name == "my_table"
assert [column.name for column in table.columns] == ["ds"]
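As a usage illustration (hypothetical, and assuming only the attributes the test itself exercises), the same session could then be queried like this:

from typing import List

from sqlalchemy.orm.session import Session


def get_column_names(session: Session, table_name: str) -> List[str]:
    # Fetch a Table row by name and return the names of its columns,
    # mirroring the test's final assertion.
    from superset.tables.models import Table  # local import, as in the tests above

    table = session.query(Table).filter_by(name=table_name).one()
    return [column.name for column in table.columns]


# Within the test, get_column_names(session, "my_table") would return ["ds"].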