feat: support databend for superset (#23308)

This commit is contained in:
Jeremy 2023-11-03 03:00:11 +08:00 committed by GitHub
parent b58cc24bd4
commit 5690946b1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 508 additions and 1 deletions

View File

@ -128,6 +128,7 @@ Here are some of the major database solutions that are supported:
<img src="superset-frontend/src/assets/images/pinot.png" alt="pinot" border="0" width="200" height="80"/>
<img src="superset-frontend/src/assets/images/teradata.png" alt="teradata" border="0" width="200" height="80"/>
<img src="superset-frontend/src/assets/images/yugabyte.png" alt="yugabyte" border="0" width="200" height="80"/>
<img src="superset-frontend/src/assets/images/databend.svg" alt="databend" border="0" width="200" height="80"/>
<img src="superset-frontend/src/assets/images/starrocks.png" alt="starrocks" border="0" width="200" height="80"/>
</p>

View File

@ -0,0 +1,23 @@
---
title: Databend
hide_title: true
sidebar_position: 39
version: 1
---
## Databend
The recommended connector library for Databend is [databend-sqlalchemy](https://pypi.org/project/databend-sqlalchemy/).
Superset has been tested on `databend-sqlalchemy>=0.2.3`.
The recommended connection string is:
```
databend://{username}:{password}@{host}:{port}/{database_name}
```
Here is an example connection string for connecting Superset to a Databend database:
```
databend://user:password@localhost:8000/default?secure=false
```

View File

@ -50,7 +50,6 @@ VERSION_INFO_FILE = os.path.join(BASE_DIR, "superset", "static", "version_info.j
with open(VERSION_INFO_FILE, "w") as version_file:
json.dump(version_info, version_file)
setup(
name="apache-superset",
description="A modern, enterprise-ready business intelligence web application",

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 7.1 KiB

View File

@ -0,0 +1,353 @@
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import logging
import re
from datetime import datetime
from typing import Any, cast, TYPE_CHECKING
from flask_babel import gettext as __
from marshmallow import fields, Schema
from marshmallow.validate import Range
from sqlalchemy import types
from sqlalchemy.engine.url import URL
from urllib3.exceptions import NewConnectionError
from superset.databases.utils import make_url_safe
from superset.db_engine_specs.base import (
BaseEngineSpec,
BasicParametersMixin,
BasicParametersType,
BasicPropertiesType,
)
from superset.db_engine_specs.exceptions import SupersetDBAPIDatabaseError
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.utils.core import GenericDataType
from superset.utils.hashing import md5_sha_from_str
from superset.utils.network import is_hostname_valid, is_port_open
if TYPE_CHECKING:
from superset.models.core import Database
logger = logging.getLogger(__name__)
class DatabendBaseEngineSpec(BaseEngineSpec):
    """Shared engine spec for Databend.

    Bundles what every Databend connector needs: the time-grain SQL
    templates, the native-type to SQLAlchemy-type mappings, and the
    rendering of Python datetimes as Databend SQL literals.
    """

    time_secondary_columns = True
    time_groupby_inline = True

    # ISO 8601 duration -> SQL template; ``{col}`` is the placeholder for the
    # temporal column expression.
    _time_grain_expressions = {
        None: "{col}",
        "PT1M": "to_start_of_minute(TO_DATETIME({col}))",
        "PT5M": "to_start_of_five_minutes(TO_DATETIME({col}))",
        "PT10M": "to_start_of_ten_minutes(TO_DATETIME({col}))",
        "PT15M": "to_start_of_fifteen_minutes(TO_DATETIME({col}))",
        # NOTE(review): intDiv/toUInt32 look like ClickHouse spellings --
        # confirm that Databend accepts them.
        "PT30M": "TO_DATETIME(intDiv(toUInt32(TO_DATETIME({col})), 1800)*1800)",
        "PT1H": "to_start_of_hour(TO_DATETIME({col}))",
        "P1D": "to_start_of_day(TO_DATETIME({col}))",
        "P1W": "to_monday(TO_DATETIME({col}))",
        "P1M": "to_start_of_month(TO_DATETIME({col}))",
        "P3M": "to_start_of_quarter(TO_DATETIME({col}))",
        "P1Y": "to_start_of_year(TO_DATETIME({col}))",
    }

    # (pattern, SQLAlchemy type, generic type) triples. The patterns are
    # unanchored so that wrapped names such as ``Nullable(Int64)`` match too.
    # Presumably evaluated in order with the first match winning (hence
    # DateTime before Date) -- verify against BaseEngineSpec.
    column_type_mappings = (
        (
            re.compile(r".*Varchar.*", re.IGNORECASE),
            types.String(),
            GenericDataType.STRING,
        ),
        (
            re.compile(r".*Array.*", re.IGNORECASE),
            types.String(),
            GenericDataType.STRING,
        ),
        (
            re.compile(r".*Map.*", re.IGNORECASE),
            types.String(),
            GenericDataType.STRING,
        ),
        (
            re.compile(r".*Json.*", re.IGNORECASE),
            types.JSON(),
            GenericDataType.STRING,
        ),
        (
            re.compile(r".*Bool.*", re.IGNORECASE),
            types.Boolean(),
            GenericDataType.BOOLEAN,
        ),
        (
            re.compile(r".*String.*", re.IGNORECASE),
            types.String(),
            GenericDataType.STRING,
        ),
        (
            re.compile(r".*Int\d+.*", re.IGNORECASE),
            types.INTEGER(),
            GenericDataType.NUMERIC,
        ),
        (
            # The width suffix is optional so that plain ``Float`` and
            # ``Nullable(Float)`` match, not only ``Float32``/``Float64``.
            re.compile(r".*Float\d*.*", re.IGNORECASE),
            types.FLOAT(),
            GenericDataType.NUMERIC,
        ),
        (
            # Same for ``Double``: requiring a trailing digit meant the
            # pattern could not match the un-suffixed type name.
            re.compile(r".*Double\d*.*", re.IGNORECASE),
            types.FLOAT(),
            GenericDataType.NUMERIC,
        ),
        (
            re.compile(r".*Decimal.*", re.IGNORECASE),
            types.DECIMAL(),
            GenericDataType.NUMERIC,
        ),
        (
            re.compile(r".*DateTime.*", re.IGNORECASE),
            types.DateTime(),
            GenericDataType.TEMPORAL,
        ),
        (
            re.compile(r".*Date.*", re.IGNORECASE),
            types.Date(),
            GenericDataType.TEMPORAL,
        ),
    )

    @classmethod
    def epoch_to_dttm(cls) -> str:
        """Return the template for epoch columns: the column itself, unchanged."""
        return "{col}"

    @classmethod
    def convert_dttm(
        cls, target_type: str, dttm: datetime, db_extra: dict[str, Any] | None = None
    ) -> str | None:
        """Render ``dttm`` as a SQL literal for a column of ``target_type``.

        :param target_type: Native column type name
        :param dttm: Value to render
        :param db_extra: Unused by this spec
        :return: A ``to_date(...)`` expression for date columns, a
            ``to_dateTime(...)`` expression (second precision) for datetime
            columns, or ``None`` for any other type
        """
        sqla_type = cls.get_sqla_column_type(target_type)
        if isinstance(sqla_type, types.Date):
            return f"to_date('{dttm.date().isoformat()}')"
        if isinstance(sqla_type, types.DateTime):
            return f"""to_dateTime('{dttm.isoformat(sep=" ", timespec="seconds")}')"""
        return None
class DatabendEngineSpec(DatabendBaseEngineSpec):
    """Engine spec for databend_sqlalchemy connector"""

    engine = "databend"
    engine_name = "Databend"
    # Class-level cache filled by get_function_names on first success.
    _function_names: list[str] = []
    _show_functions_column = "name"
    supports_file_upload = False

    @classmethod
    def get_dbapi_exception_mapping(cls) -> dict[type[Exception], type[Exception]]:
        """Map driver-level exception types to Superset exception types."""
        return {NewConnectionError: SupersetDBAPIDatabaseError}

    @classmethod
    def get_dbapi_mapped_exception(cls, exception: Exception) -> Exception:
        """Translate ``exception`` through ``get_dbapi_exception_mapping``.

        The lookup is by exact type. Unmapped exceptions are returned as-is;
        anything mapped to ``SupersetDBAPIDatabaseError`` is replaced by a
        fixed "Connection failed" message instead of the original text.

        :param exception: Exception raised by the driver
        :return: The exception to surface to the caller
        """
        mapped_type = cls.get_dbapi_exception_mapping().get(type(exception))
        if not mapped_type:
            return exception
        if mapped_type == SupersetDBAPIDatabaseError:
            return SupersetDBAPIDatabaseError("Connection failed")
        return mapped_type(str(exception))

    @classmethod
    def get_function_names(cls, database: Database) -> list[str]:
        """List the function names reported by ``system.functions``.

        A non-empty result is cached on the class and returned on later
        calls without querying again. Any failure is logged and yields an
        empty list.

        :param database: Database to query
        :return: Function names, or ``[]`` when the lookup fails
        """
        cached = cls._function_names
        if cached:
            return cached
        try:
            frame = database.get_df("SELECT name FROM system.functions;")
            function_names = frame["name"].tolist()
            cls._function_names = function_names
            return function_names
        except Exception as ex:  # pylint: disable=broad-except
            logger.exception("Error retrieving system.functions: %s", str(ex))
            return []
class DatabendParametersSchema(Schema):
    """Marshmallow schema describing the Databend connection parameters."""

    # Credentials; both accept None.
    username = fields.String(description=__("Username"), allow_none=True)
    password = fields.String(description=__("Password"), allow_none=True)
    # The host is the only required field.
    host = fields.String(description=__("Hostname or IP address"), required=True)
    port = fields.Integer(
        validate=Range(min=0, max=65535),
        description=__("Database port"),
        allow_none=True,
    )
    database = fields.String(description=__("Database name"), allow_none=True)
    encryption = fields.Boolean(
        description=__("Use an encrypted connection to the database"), default=True
    )
    # Free-form extra parameters: string keys, values of any type.
    query = fields.Dict(
        description=__("Additional parameters"),
        keys=fields.Str(),
        values=fields.Raw(),
    )
class DatabendConnectEngineSpec(DatabendEngineSpec, BasicParametersMixin):
    """Engine spec for databend sqlalchemy connector.

    Adds parameter-based configuration on top of ``DatabendEngineSpec``:
    building a SQLAlchemy URI from individual fields, splitting a URI back
    into those fields, and validating them.

    ``get_dbapi_mapped_exception`` and ``get_function_names`` are inherited
    from ``DatabendEngineSpec``.
    """

    engine = "databend"
    engine_name = "Databend"
    default_driver = "databend"
    # Redeclared so this class starts with its own empty function-name cache.
    _function_names: list[str] = []
    sqlalchemy_uri_placeholder = (
        "databend://user:password@host[:port][/dbname][?secure=value&=value...]"
    )
    parameters_schema = DatabendParametersSchema()
    # Query-string arguments merged into the URI when encryption is requested.
    encryption_parameters = {"secure": "true"}

    @classmethod
    def get_dbapi_exception_mapping(cls) -> dict[type[Exception], type[Exception]]:
        """Return an empty mapping: no driver exception is translated here."""
        return {}

    @classmethod
    def get_datatype(cls, type_code: str) -> str:
        """Return ``type_code`` unchanged."""
        return type_code

    @classmethod
    def build_sqlalchemy_uri(
        cls, parameters: BasicParametersType, *_args: dict[str, str] | None
    ) -> str:
        """Assemble a SQLAlchemy URI from individual connection parameters.

        When ``encryption`` is truthy, ``encryption_parameters`` is merged
        into the query string. A missing or empty database name is written as
        the ``__default__`` placeholder, which ``get_parameters_from_uri``
        maps back to an empty name.

        :param parameters: Connection parameters
        :return: The URI as a string
        """
        url_params = parameters.copy()
        if url_params.get("encryption"):
            # ``or {}`` also covers an explicit ``query=None``.
            query = dict(parameters.get("query") or {})
            query.update(cls.encryption_parameters)
            url_params["query"] = query
        if not url_params.get("database"):
            url_params["database"] = "__default__"
        # ``encryption`` is a form field, not a URL component.
        url_params.pop("encryption", None)
        # URL.create is the supported constructor; calling URL() directly is
        # deprecated in SQLAlchemy 1.4.
        return str(URL.create(f"{cls.engine}", **url_params))

    @classmethod
    def get_parameters_from_uri(
        cls, uri: str, *_args: dict[str, Any] | None
    ) -> BasicParametersType:
        """Split a SQLAlchemy URI into individual connection parameters.

        ``secure`` is removed from the query string and reported through the
        ``encryption`` flag, which is true only when its value is ``"true"``.

        :param uri: SQLAlchemy URI
        :return: The connection parameters
        """
        url = make_url_safe(uri)
        encryption = url.query.get("secure") == "true"
        # url.query is an immutable mapping on SQLAlchemy 1.4+, so build a
        # filtered copy instead of popping the key in place.
        query = {key: value for key, value in url.query.items() if key != "secure"}
        return BasicParametersType(
            username=url.username,
            password=url.password,
            host=url.host,
            port=url.port,
            database="" if url.database == "__default__" else cast(str, url.database),
            query=query,
            encryption=encryption,
        )

    @classmethod
    def default_port(cls, interface: str, secure: bool) -> int:
        """Return the default port for ``interface``.

        :param interface: Interface name; only names starting with ``http``
            are recognized
        :param secure: Whether the connection is encrypted
        :return: 443 when secure, otherwise 8000
        :raises ValueError: If the interface is not recognized
        """
        if interface.startswith("http"):
            return 443 if secure else 8000
        raise ValueError("Unrecognized Databend interface")

    @staticmethod
    def _parse_port(port: Any) -> int | None:
        """Return ``port`` as an int in 1..65535, or ``None`` if it is not one."""
        if isinstance(port, bool) or not isinstance(port, (int, str)):
            return None
        try:
            number = int(port)
        except ValueError:
            return None
        return number if 0 < number <= 65535 else None

    @classmethod
    def validate_parameters(
        cls, properties: BasicPropertiesType
    ) -> list[SupersetError]:
        """Validate host and port, returning the first problem found.

        Checks run in order: host present, host resolvable, port valid, port
        open. The default port is used only when no port was supplied; a
        supplied port that is invalid is reported, not replaced.

        :param properties: Either the parameters themselves or an object
            holding them under a ``parameters`` key
        :return: A list with one error, or an empty list when everything
            checks out
        """
        # The newest versions of superset send a "properties" object with a
        # parameters key, instead of just the parameters, so we hack to be compatible
        parameters = properties.get("parameters", properties)
        host = parameters.get("host", None)
        host = str(host) if host is not None else None
        if not host:
            return [
                SupersetError(
                    "Hostname is required",
                    SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR,
                    ErrorLevel.WARNING,
                    {"missing": ["host"]},
                )
            ]
        if not is_hostname_valid(host):
            return [
                SupersetError(
                    "The hostname provided can't be resolved.",
                    SupersetErrorType.CONNECTION_INVALID_HOSTNAME_ERROR,
                    ErrorLevel.ERROR,
                    {"invalid": ["host"]},
                )
            ]
        raw_port = parameters.get("port")
        if raw_port is None:
            encryption = bool(parameters.get("encryption", False))
            port: int | None = cls.default_port("http", encryption)
        else:
            port = cls._parse_port(raw_port)
        if port is None:
            return [
                SupersetError(
                    "Port must be a valid integer between 0 and 65535 (inclusive).",
                    SupersetErrorType.CONNECTION_INVALID_PORT_ERROR,
                    ErrorLevel.ERROR,
                    {"invalid": ["port"]},
                )
            ]
        if not is_port_open(host, port):
            return [
                SupersetError(
                    "The port is closed.",
                    SupersetErrorType.CONNECTION_PORT_CLOSED_ERROR,
                    ErrorLevel.ERROR,
                    {"invalid": ["port"]},
                )
            ]
        return []

    @staticmethod
    def _mutate_label(label: str) -> str:
        """
        Suffix with the first six characters from the md5 of the label to avoid
        collisions with original column names

        :param label: Expected expression label
        :return: Conditionally mutated label
        """
        return f"{label}_{md5_sha_from_str(label)[:6]}"

View File

@ -0,0 +1,130 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Any, Optional
from unittest.mock import Mock
import pytest
from sqlalchemy.types import (
Boolean,
Date,
DateTime,
DECIMAL,
Float,
Integer,
String,
TypeEngine,
)
from urllib3.connection import HTTPConnection
from superset.utils.core import GenericDataType
from tests.unit_tests.db_engine_specs.utils import (
assert_column_spec,
assert_convert_dttm,
)
from tests.unit_tests.fixtures.common import dttm
@pytest.mark.parametrize(
    "target_type,expected_result",
    [
        ("Date", "to_date('2019-01-02')"),
        ("DateTime", "to_dateTime('2019-01-02 03:04:05')"),
        ("UnknownType", None),
    ],
)
def test_convert_dttm(
    target_type: str, expected_result: Optional[str], dttm: datetime
) -> None:
    """Check the SQL literal rendered for each target type, None for unknown ones."""
    from superset.db_engine_specs.databend import DatabendEngineSpec

    assert_convert_dttm(
        DatabendEngineSpec,
        target_type,
        expected_result,
        dttm,
    )
def test_execute_connection_error() -> None:
    """A driver connection failure surfaces as a scrubbed Superset error.

    The cursor raises ``NewConnectionError`` carrying a marker message; the
    spec must raise ``SupersetDBAPIDatabaseError`` and must not leak that
    message to the caller.
    """
    from urllib3.exceptions import NewConnectionError

    from superset.db_engine_specs.databend import DatabendEngineSpec
    from superset.db_engine_specs.exceptions import SupersetDBAPIDatabaseError

    cursor = Mock()
    cursor.execute.side_effect = NewConnectionError(
        HTTPConnection("Dummypool"), "Exception with sensitive data"
    )
    with pytest.raises(SupersetDBAPIDatabaseError) as ex:
        DatabendEngineSpec.execute(cursor, "SELECT col1 from table1")
    # The original driver message must not appear in the mapped exception.
    assert "sensitive" not in str(ex.value)
@pytest.mark.parametrize(
    "native_type,sqla_type,attrs,generic_type,is_dttm",
    [
        # String-like and container types
        ("Varchar", String, None, GenericDataType.STRING, False),
        ("Nullable(Varchar)", String, None, GenericDataType.STRING, False),
        ("Array(UInt8)", String, None, GenericDataType.STRING, False),
        # Signed integers
        ("Int8", Integer, None, GenericDataType.NUMERIC, False),
        ("Int16", Integer, None, GenericDataType.NUMERIC, False),
        ("Int32", Integer, None, GenericDataType.NUMERIC, False),
        ("Int64", Integer, None, GenericDataType.NUMERIC, False),
        ("Int128", Integer, None, GenericDataType.NUMERIC, False),
        ("Int256", Integer, None, GenericDataType.NUMERIC, False),
        ("Nullable(Int64)", Integer, None, GenericDataType.NUMERIC, False),
        # Unsigned integers
        ("UInt8", Integer, None, GenericDataType.NUMERIC, False),
        ("UInt16", Integer, None, GenericDataType.NUMERIC, False),
        ("UInt32", Integer, None, GenericDataType.NUMERIC, False),
        ("UInt64", Integer, None, GenericDataType.NUMERIC, False),
        ("UInt128", Integer, None, GenericDataType.NUMERIC, False),
        ("UInt256", Integer, None, GenericDataType.NUMERIC, False),
        # Floating point and decimals
        ("Float", Float, None, GenericDataType.NUMERIC, False),
        ("Double", Float, None, GenericDataType.NUMERIC, False),
        ("Decimal(1, 2)", DECIMAL, None, GenericDataType.NUMERIC, False),
        ("Decimal32(2)", DECIMAL, None, GenericDataType.NUMERIC, False),
        ("Decimal64(2)", DECIMAL, None, GenericDataType.NUMERIC, False),
        ("Decimal128(2)", DECIMAL, None, GenericDataType.NUMERIC, False),
        ("Decimal256(2)", DECIMAL, None, GenericDataType.NUMERIC, False),
        # Booleans
        ("Bool", Boolean, None, GenericDataType.BOOLEAN, False),
        ("Nullable(Bool)", Boolean, None, GenericDataType.BOOLEAN, False),
        # Temporal types
        ("Date", Date, None, GenericDataType.TEMPORAL, True),
        ("Nullable(Date)", Date, None, GenericDataType.TEMPORAL, True),
        ("Datetime", DateTime, None, GenericDataType.TEMPORAL, True),
        ("Nullable(Datetime)", DateTime, None, GenericDataType.TEMPORAL, True),
    ],
)
def test_get_column_spec(
    native_type: str,
    sqla_type: type[TypeEngine],
    attrs: Optional[dict[str, Any]],
    generic_type: GenericDataType,
    is_dttm: bool,
) -> None:
    """Check the column spec resolved for each native Databend type name."""
    from superset.db_engine_specs.databend import DatabendConnectEngineSpec

    assert_column_spec(
        DatabendConnectEngineSpec,
        native_type,
        sqla_type,
        attrs,
        generic_type,
        is_dttm,
    )
@pytest.mark.parametrize(
    "column_name,expected_result",
    [
        ("time", "time_07cc69"),
        ("count", "count_e2942a"),
    ],
)
def test_make_label_compatible(column_name: str, expected_result: str) -> None:
    """Check the label produced by make_label_compatible for sample column names."""
    from superset.db_engine_specs.databend import DatabendConnectEngineSpec

    actual = DatabendConnectEngineSpec.make_label_compatible(column_name)
    assert actual == expected_result