# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # pylint: disable=import-outside-toplevel, unused-argument, redefined-outer-name, invalid-name from functools import partial from typing import Any, TYPE_CHECKING import pytest from pytest_mock import MockerFixture from sqlalchemy.orm.session import Session from superset import db if TYPE_CHECKING: from superset.connectors.sqla.models import SqlaTable FULL_DTTM_DEFAULTS_EXAMPLE = { "main_dttm_col": "id", "dttm_columns": { "dttm": { "python_date_format": "epoch_s", "expression": "CAST(dttm as INTEGER)", }, "id": {"python_date_format": "epoch_ms"}, "month": { "python_date_format": "%Y-%m-%d", "expression": ( "CASE WHEN length(month) = 7 THEN month || '-01' ELSE month END" ), }, }, } def apply_dttm_defaults(table: "SqlaTable", dttm_defaults: dict[str, Any]) -> None: """Applies dttm defaults to the table, mutates in place.""" for dbcol in table.columns: # Set is_dttm is column is listed in dttm_columns. if dbcol.column_name in dttm_defaults.get("dttm_columns", {}): dbcol.is_dttm = True # Skip non dttm columns. if dbcol.column_name not in dttm_defaults.get("dttm_columns", {}): continue # Set table main_dttm_col. if dbcol.column_name == dttm_defaults.get("main_dttm_col"): table.main_dttm_col = dbcol.column_name # Apply defaults if empty. dttm_column_defaults = dttm_defaults.get("dttm_columns", {}).get( dbcol.column_name, {} ) dbcol.is_dttm = True if ( not dbcol.python_date_format and "python_date_format" in dttm_column_defaults ): dbcol.python_date_format = dttm_column_defaults["python_date_format"] if not dbcol.expression and "expression" in dttm_column_defaults: dbcol.expression = dttm_column_defaults["expression"] @pytest.fixture def test_table(session: Session) -> "SqlaTable": """ Fixture that generates an in-memory table. """ from superset.connectors.sqla.models import SqlaTable, TableColumn from superset.models.core import Database engine = db.session.get_bind() SqlaTable.metadata.create_all(engine) # pylint: disable=no-member columns = [ TableColumn(column_name="ds", is_dttm=1, type="TIMESTAMP"), TableColumn(column_name="event_time", is_dttm=1, type="TIMESTAMP"), TableColumn(column_name="id", type="INTEGER"), TableColumn(column_name="dttm", type="INTEGER"), TableColumn(column_name="duration_ms", type="INTEGER"), ] return SqlaTable( table_name="test_table", columns=columns, metrics=[], main_dttm_col=None, database=Database(database_name="my_database", sqlalchemy_uri="sqlite://"), ) def test_main_dttm_col(mocker: MockerFixture, test_table: "SqlaTable") -> None: """ Test the ``SQLA_TABLE_MUTATOR`` config. """ dttm_defaults = { "main_dttm_col": "event_time", "dttm_columns": {"ds": {}, "event_time": {}}, } mocker.patch( "superset.connectors.sqla.models.config", new={ "SQLA_TABLE_MUTATOR": partial( apply_dttm_defaults, dttm_defaults=dttm_defaults, ) }, ) mocker.patch( "superset.connectors.sqla.models.get_physical_table_metadata", return_value=[ {"column_name": "ds", "type": "TIMESTAMP", "is_dttm": True}, {"column_name": "event_time", "type": "TIMESTAMP", "is_dttm": True}, {"column_name": "id", "type": "INTEGER", "is_dttm": False}, ], ) assert test_table.main_dttm_col is None test_table.fetch_metadata() assert test_table.main_dttm_col == "event_time" def test_main_dttm_col_nonexistent( mocker: MockerFixture, test_table: "SqlaTable", ) -> None: """ Test the ``SQLA_TABLE_MUTATOR`` config when main datetime column doesn't exist. """ dttm_defaults = { "main_dttm_col": "nonexistent", } mocker.patch( "superset.connectors.sqla.models.config", new={ "SQLA_TABLE_MUTATOR": partial( apply_dttm_defaults, dttm_defaults=dttm_defaults, ) }, ) mocker.patch( "superset.connectors.sqla.models.get_physical_table_metadata", return_value=[ {"column_name": "ds", "type": "TIMESTAMP", "is_dttm": True}, {"column_name": "event_time", "type": "TIMESTAMP", "is_dttm": True}, {"column_name": "id", "type": "INTEGER", "is_dttm": False}, ], ) assert test_table.main_dttm_col is None test_table.fetch_metadata() # fall back to ds assert test_table.main_dttm_col == "ds" def test_main_dttm_col_nondttm( mocker: MockerFixture, test_table: "SqlaTable", ) -> None: """ Test the ``SQLA_TABLE_MUTATOR`` config when main datetime column has wrong type. """ dttm_defaults = { "main_dttm_col": "id", } mocker.patch( "superset.connectors.sqla.models.config", new={ "SQLA_TABLE_MUTATOR": partial( apply_dttm_defaults, dttm_defaults=dttm_defaults, ) }, ) mocker.patch( "superset.connectors.sqla.models.get_physical_table_metadata", return_value=[ {"column_name": "ds", "type": "TIMESTAMP", "is_dttm": True}, {"column_name": "event_time", "type": "TIMESTAMP", "is_dttm": True}, {"column_name": "id", "type": "INTEGER", "is_dttm": False}, ], ) assert test_table.main_dttm_col is None test_table.fetch_metadata() # fall back to ds assert test_table.main_dttm_col == "ds" def test_python_date_format_by_column_name( mocker: MockerFixture, test_table: "SqlaTable", ) -> None: """ Test the ``SQLA_TABLE_MUTATOR`` setting for "python_date_format". """ table_defaults = { "dttm_columns": { "id": {"python_date_format": "epoch_ms"}, "dttm": {"python_date_format": "epoch_s"}, }, } mocker.patch( "superset.connectors.sqla.models.config", new={ "SQLA_TABLE_MUTATOR": partial( apply_dttm_defaults, dttm_defaults=table_defaults, ) }, ) mocker.patch( "superset.connectors.sqla.models.get_physical_table_metadata", return_value=[ {"column_name": "id", "type": "INTEGER", "is_dttm": False}, {"column_name": "dttm", "type": "INTEGER", "is_dttm": False}, ], ) test_table.fetch_metadata() id_col = [c for c in test_table.columns if c.column_name == "id"][0] assert id_col.is_dttm assert id_col.python_date_format == "epoch_ms" dttm_col = [c for c in test_table.columns if c.column_name == "dttm"][0] assert dttm_col.is_dttm assert dttm_col.python_date_format == "epoch_s" def test_expression_by_column_name( mocker: MockerFixture, test_table: "SqlaTable", ) -> None: """ Test the ``SQLA_TABLE_MUTATOR`` setting for expression. """ table_defaults = { "dttm_columns": { "dttm": {"expression": "CAST(dttm as INTEGER)"}, "duration_ms": {"expression": "CAST(duration_ms as DOUBLE)"}, }, } mocker.patch( "superset.connectors.sqla.models.config", new={ "SQLA_TABLE_MUTATOR": partial( apply_dttm_defaults, dttm_defaults=table_defaults, ) }, ) mocker.patch( "superset.connectors.sqla.models.get_physical_table_metadata", return_value=[ {"column_name": "dttm", "type": "INTEGER", "is_dttm": False}, {"column_name": "duration_ms", "type": "INTEGER", "is_dttm": False}, ], ) test_table.fetch_metadata() dttm_col = [c for c in test_table.columns if c.column_name == "dttm"][0] assert dttm_col.is_dttm assert dttm_col.expression == "CAST(dttm as INTEGER)" duration_ms_col = [c for c in test_table.columns if c.column_name == "duration_ms"][ 0 ] assert duration_ms_col.is_dttm assert duration_ms_col.expression == "CAST(duration_ms as DOUBLE)" def test_full_setting( mocker: MockerFixture, test_table: "SqlaTable", ) -> None: """ Test the ``SQLA_TABLE_MUTATOR`` with full settings. """ mocker.patch( "superset.connectors.sqla.models.config", new={ "SQLA_TABLE_MUTATOR": partial( apply_dttm_defaults, dttm_defaults=FULL_DTTM_DEFAULTS_EXAMPLE, ) }, ) mocker.patch( "superset.connectors.sqla.models.get_physical_table_metadata", return_value=[ {"column_name": "id", "type": "INTEGER", "is_dttm": False}, {"column_name": "dttm", "type": "INTEGER", "is_dttm": False}, {"column_name": "duration_ms", "type": "INTEGER", "is_dttm": False}, ], ) test_table.fetch_metadata() id_col = [c for c in test_table.columns if c.column_name == "id"][0] assert id_col.is_dttm assert id_col.python_date_format == "epoch_ms" assert id_col.expression == "" dttm_col = [c for c in test_table.columns if c.column_name == "dttm"][0] assert dttm_col.is_dttm assert dttm_col.python_date_format == "epoch_s" assert dttm_col.expression == "CAST(dttm as INTEGER)"