mirror of https://github.com/apache/superset.git
chore(metastore-cache): add codec support (#24586)
This commit is contained in:
parent
226c7f807d
commit
a4880cabd4
|
@ -49,6 +49,7 @@ from superset.advanced_data_type.plugins.internet_port import internet_port
|
||||||
from superset.advanced_data_type.types import AdvancedDataType
|
from superset.advanced_data_type.types import AdvancedDataType
|
||||||
from superset.constants import CHANGE_ME_SECRET_KEY
|
from superset.constants import CHANGE_ME_SECRET_KEY
|
||||||
from superset.jinja_context import BaseTemplateProcessor
|
from superset.jinja_context import BaseTemplateProcessor
|
||||||
|
from superset.key_value.types import JsonKeyValueCodec
|
||||||
from superset.stats_logger import DummyStatsLogger
|
from superset.stats_logger import DummyStatsLogger
|
||||||
from superset.superset_typing import CacheConfig
|
from superset.superset_typing import CacheConfig
|
||||||
from superset.tasks.types import ExecutorType
|
from superset.tasks.types import ExecutorType
|
||||||
|
@ -681,20 +682,32 @@ CACHE_CONFIG: CacheConfig = {"CACHE_TYPE": "NullCache"}
|
||||||
# Cache for datasource metadata and query results
|
# Cache for datasource metadata and query results
|
||||||
DATA_CACHE_CONFIG: CacheConfig = {"CACHE_TYPE": "NullCache"}
|
DATA_CACHE_CONFIG: CacheConfig = {"CACHE_TYPE": "NullCache"}
|
||||||
|
|
||||||
# Cache for dashboard filter state (`CACHE_TYPE` defaults to `SimpleCache` when
|
# Cache for dashboard filter state. `CACHE_TYPE` defaults to `SupersetMetastoreCache`
|
||||||
# running in debug mode unless overridden)
|
# that stores the values in the key-value table in the Superset metastore, as it's
|
||||||
|
# required for Superset to operate correctly, but can be replaced by any
|
||||||
|
# `Flask-Caching` backend.
|
||||||
FILTER_STATE_CACHE_CONFIG: CacheConfig = {
|
FILTER_STATE_CACHE_CONFIG: CacheConfig = {
|
||||||
|
"CACHE_TYPE": "SupersetMetastoreCache",
|
||||||
"CACHE_DEFAULT_TIMEOUT": int(timedelta(days=90).total_seconds()),
|
"CACHE_DEFAULT_TIMEOUT": int(timedelta(days=90).total_seconds()),
|
||||||
# should the timeout be reset when retrieving a cached value
|
# Should the timeout be reset when retrieving a cached value?
|
||||||
"REFRESH_TIMEOUT_ON_RETRIEVAL": True,
|
"REFRESH_TIMEOUT_ON_RETRIEVAL": True,
|
||||||
|
# The following parameter only applies to `MetastoreCache`:
|
||||||
|
# How should entries be serialized/deserialized?
|
||||||
|
"CODEC": JsonKeyValueCodec(),
|
||||||
}
|
}
|
||||||
|
|
||||||
# Cache for explore form data state (`CACHE_TYPE` defaults to `SimpleCache` when
|
# Cache for explore form data state. `CACHE_TYPE` defaults to `SupersetMetastoreCache`
|
||||||
# running in debug mode unless overridden)
|
# that stores the values in the key-value table in the Superset metastore, as it's
|
||||||
|
# required for Superset to operate correctly, but can be replaced by any
|
||||||
|
# `Flask-Caching` backend.
|
||||||
EXPLORE_FORM_DATA_CACHE_CONFIG: CacheConfig = {
|
EXPLORE_FORM_DATA_CACHE_CONFIG: CacheConfig = {
|
||||||
|
"CACHE_TYPE": "SupersetMetastoreCache",
|
||||||
"CACHE_DEFAULT_TIMEOUT": int(timedelta(days=7).total_seconds()),
|
"CACHE_DEFAULT_TIMEOUT": int(timedelta(days=7).total_seconds()),
|
||||||
# should the timeout be reset when retrieving a cached value
|
# Should the timeout be reset when retrieving a cached value?
|
||||||
"REFRESH_TIMEOUT_ON_RETRIEVAL": True,
|
"REFRESH_TIMEOUT_ON_RETRIEVAL": True,
|
||||||
|
# The following parameter only applies to `MetastoreCache`:
|
||||||
|
# How should entries be serialized/deserialized?
|
||||||
|
"CODEC": JsonKeyValueCodec(),
|
||||||
}
|
}
|
||||||
|
|
||||||
# store cache keys by datasource UID (via CacheKey) for custom processing/invalidation
|
# store cache keys by datasource UID (via CacheKey) for custom processing/invalidation
|
||||||
|
|
|
@ -14,26 +14,37 @@
|
||||||
# KIND, either express or implied. See the License for the
|
# KIND, either express or implied. See the License for the
|
||||||
# specific language governing permissions and limitations
|
# specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
import logging
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from typing import Any, Optional
|
from typing import Any, Optional
|
||||||
from uuid import UUID, uuid3
|
from uuid import UUID, uuid3
|
||||||
|
|
||||||
from flask import Flask
|
from flask import current_app, Flask, has_app_context
|
||||||
from flask_caching import BaseCache
|
from flask_caching import BaseCache
|
||||||
|
|
||||||
from superset.key_value.exceptions import KeyValueCreateFailedError
|
from superset.key_value.exceptions import KeyValueCreateFailedError
|
||||||
from superset.key_value.types import KeyValueResource, PickleKeyValueCodec
|
from superset.key_value.types import (
|
||||||
|
KeyValueCodec,
|
||||||
|
KeyValueResource,
|
||||||
|
PickleKeyValueCodec,
|
||||||
|
)
|
||||||
from superset.key_value.utils import get_uuid_namespace
|
from superset.key_value.utils import get_uuid_namespace
|
||||||
|
|
||||||
RESOURCE = KeyValueResource.METASTORE_CACHE
|
RESOURCE = KeyValueResource.METASTORE_CACHE
|
||||||
CODEC = PickleKeyValueCodec()
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class SupersetMetastoreCache(BaseCache):
|
class SupersetMetastoreCache(BaseCache):
|
||||||
def __init__(self, namespace: UUID, default_timeout: int = 300) -> None:
|
def __init__(
|
||||||
|
self,
|
||||||
|
namespace: UUID,
|
||||||
|
codec: KeyValueCodec,
|
||||||
|
default_timeout: int = 300,
|
||||||
|
) -> None:
|
||||||
super().__init__(default_timeout)
|
super().__init__(default_timeout)
|
||||||
self.namespace = namespace
|
self.namespace = namespace
|
||||||
|
self.codec = codec
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def factory(
|
def factory(
|
||||||
|
@ -41,6 +52,17 @@ class SupersetMetastoreCache(BaseCache):
|
||||||
) -> BaseCache:
|
) -> BaseCache:
|
||||||
seed = config.get("CACHE_KEY_PREFIX", "")
|
seed = config.get("CACHE_KEY_PREFIX", "")
|
||||||
kwargs["namespace"] = get_uuid_namespace(seed)
|
kwargs["namespace"] = get_uuid_namespace(seed)
|
||||||
|
codec = config.get("CODEC") or PickleKeyValueCodec()
|
||||||
|
if (
|
||||||
|
has_app_context()
|
||||||
|
and not current_app.debug
|
||||||
|
and isinstance(codec, PickleKeyValueCodec)
|
||||||
|
):
|
||||||
|
logger.warning(
|
||||||
|
"Using PickleKeyValueCodec with SupersetMetastoreCache may be unsafe, "
|
||||||
|
"use at your own risk."
|
||||||
|
)
|
||||||
|
kwargs["codec"] = codec
|
||||||
return cls(*args, **kwargs)
|
return cls(*args, **kwargs)
|
||||||
|
|
||||||
def get_key(self, key: str) -> UUID:
|
def get_key(self, key: str) -> UUID:
|
||||||
|
@ -69,7 +91,7 @@ class SupersetMetastoreCache(BaseCache):
|
||||||
resource=RESOURCE,
|
resource=RESOURCE,
|
||||||
key=self.get_key(key),
|
key=self.get_key(key),
|
||||||
value=value,
|
value=value,
|
||||||
codec=CODEC,
|
codec=self.codec,
|
||||||
expires_on=self._get_expiry(timeout),
|
expires_on=self._get_expiry(timeout),
|
||||||
).run()
|
).run()
|
||||||
return True
|
return True
|
||||||
|
@ -82,7 +104,7 @@ class SupersetMetastoreCache(BaseCache):
|
||||||
CreateKeyValueCommand(
|
CreateKeyValueCommand(
|
||||||
resource=RESOURCE,
|
resource=RESOURCE,
|
||||||
value=value,
|
value=value,
|
||||||
codec=CODEC,
|
codec=self.codec,
|
||||||
key=self.get_key(key),
|
key=self.get_key(key),
|
||||||
expires_on=self._get_expiry(timeout),
|
expires_on=self._get_expiry(timeout),
|
||||||
).run()
|
).run()
|
||||||
|
@ -98,7 +120,7 @@ class SupersetMetastoreCache(BaseCache):
|
||||||
return GetKeyValueCommand(
|
return GetKeyValueCommand(
|
||||||
resource=RESOURCE,
|
resource=RESOURCE,
|
||||||
key=self.get_key(key),
|
key=self.get_key(key),
|
||||||
codec=CODEC,
|
codec=self.codec,
|
||||||
).run()
|
).run()
|
||||||
|
|
||||||
def has(self, key: str) -> bool:
|
def has(self, key: str) -> bool:
|
||||||
|
|
|
@ -16,17 +16,27 @@
|
||||||
# under the License.
|
# under the License.
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from contextlib import nullcontext
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from typing import TYPE_CHECKING
|
from typing import Any, TYPE_CHECKING
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from flask.ctx import AppContext
|
from flask.ctx import AppContext
|
||||||
from freezegun import freeze_time
|
from freezegun import freeze_time
|
||||||
|
|
||||||
|
from superset.key_value.exceptions import KeyValueCodecEncodeException
|
||||||
|
from superset.key_value.types import (
|
||||||
|
JsonKeyValueCodec,
|
||||||
|
KeyValueCodec,
|
||||||
|
PickleKeyValueCodec,
|
||||||
|
)
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from superset.extensions.metastore_cache import SupersetMetastoreCache
|
from superset.extensions.metastore_cache import SupersetMetastoreCache
|
||||||
|
|
||||||
|
NAMESPACE = UUID("ee173d1b-ccf3-40aa-941c-985c15224496")
|
||||||
|
|
||||||
FIRST_KEY = "foo"
|
FIRST_KEY = "foo"
|
||||||
FIRST_KEY_INITIAL_VALUE = {"foo": "bar"}
|
FIRST_KEY_INITIAL_VALUE = {"foo": "bar"}
|
||||||
FIRST_KEY_UPDATED_VALUE = "foo"
|
FIRST_KEY_UPDATED_VALUE = "foo"
|
||||||
|
@ -40,8 +50,9 @@ def cache() -> SupersetMetastoreCache:
|
||||||
from superset.extensions.metastore_cache import SupersetMetastoreCache
|
from superset.extensions.metastore_cache import SupersetMetastoreCache
|
||||||
|
|
||||||
return SupersetMetastoreCache(
|
return SupersetMetastoreCache(
|
||||||
namespace=UUID("ee173d1b-ccf3-40aa-941c-985c15224496"),
|
namespace=NAMESPACE,
|
||||||
default_timeout=600,
|
default_timeout=600,
|
||||||
|
codec=PickleKeyValueCodec(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -75,3 +86,37 @@ def test_expiry(app_context: AppContext, cache: SupersetMetastoreCache) -> None:
|
||||||
with freeze_time(dttm + delta + timedelta(seconds=1)):
|
with freeze_time(dttm + delta + timedelta(seconds=1)):
|
||||||
assert cache.has(FIRST_KEY) is False
|
assert cache.has(FIRST_KEY) is False
|
||||||
assert cache.get(FIRST_KEY) is None
|
assert cache.get(FIRST_KEY) is None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"input_,codec,expected_result",
|
||||||
|
[
|
||||||
|
({"foo": "bar"}, JsonKeyValueCodec(), {"foo": "bar"}),
|
||||||
|
(("foo", "bar"), JsonKeyValueCodec(), ["foo", "bar"]),
|
||||||
|
(complex(1, 1), JsonKeyValueCodec(), KeyValueCodecEncodeException()),
|
||||||
|
({"foo": "bar"}, PickleKeyValueCodec(), {"foo": "bar"}),
|
||||||
|
(("foo", "bar"), PickleKeyValueCodec(), ("foo", "bar")),
|
||||||
|
(complex(1, 1), PickleKeyValueCodec(), complex(1, 1)),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_codec(
|
||||||
|
input_: Any,
|
||||||
|
codec: KeyValueCodec,
|
||||||
|
expected_result: Any,
|
||||||
|
app_context: AppContext,
|
||||||
|
) -> None:
|
||||||
|
from superset.extensions.metastore_cache import SupersetMetastoreCache
|
||||||
|
|
||||||
|
cache = SupersetMetastoreCache(
|
||||||
|
namespace=NAMESPACE,
|
||||||
|
default_timeout=600,
|
||||||
|
codec=codec,
|
||||||
|
)
|
||||||
|
cm = (
|
||||||
|
pytest.raises(type(expected_result))
|
||||||
|
if isinstance(expected_result, Exception)
|
||||||
|
else nullcontext()
|
||||||
|
)
|
||||||
|
with cm:
|
||||||
|
cache.set(FIRST_KEY, input_)
|
||||||
|
assert cache.get(FIRST_KEY) == expected_result
|
||||||
|
|
Loading…
Reference in New Issue