feat(key-value): add superset metastore cache (#19232)

This commit is contained in:
Ville Brofeldt 2022-03-21 19:46:56 +02:00 committed by GitHub
parent 82a6811e7e
commit 72b9a7fa5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 540 additions and 45 deletions

View File

@ -7,7 +7,7 @@ version: 1
## Caching ## Caching
Superset uses [Flask-Caching](https://flask-caching.readthedocs.io/) for caching purpose. Configuring caching is as easy as providing a custom cache config in your Superset uses [Flask-Caching](https://flask-caching.readthedocs.io/) for caching purposes. Configuring caching is as easy as providing a custom cache config in your
`superset_config.py` that complies with [the Flask-Caching specifications](https://flask-caching.readthedocs.io/en/latest/#configuring-flask-caching). `superset_config.py` that complies with [the Flask-Caching specifications](https://flask-caching.readthedocs.io/en/latest/#configuring-flask-caching).
Flask-Caching supports various caching backends, including Redis, Memcached, SimpleCache (in-memory), or the Flask-Caching supports various caching backends, including Redis, Memcached, SimpleCache (in-memory), or the
local filesystem. Custom cache backends are also supported. See [here](https://flask-caching.readthedocs.io/en/latest/#custom-cache-backends) for specifics. local filesystem. Custom cache backends are also supported. See [here](https://flask-caching.readthedocs.io/en/latest/#custom-cache-backends) for specifics.
@ -18,10 +18,17 @@ The following cache configurations can be customized:
- Dashboard filter state (required): `FILTER_STATE_CACHE_CONFIG`. - Dashboard filter state (required): `FILTER_STATE_CACHE_CONFIG`.
- Explore chart form data (required): `EXPLORE_FORM_DATA_CACHE_CONFIG` - Explore chart form data (required): `EXPLORE_FORM_DATA_CACHE_CONFIG`
Please note, that Dashboard and Explore caching is required. When running Superset in debug mode, both Explore and Dashboard caches will default to `SimpleCache`; Please note, that Dashboard and Explore caching is required. If these caches are undefined, Superset falls back to using a built-in cache that stores data
However, trying to run Superset in non-debug mode without defining a cache for these will cause the application to fail on startup. When running in the metadata database. While it is recommended to use a dedicated cache, the built-in cache can also be used to cache other data.
superset in single-worker mode, any cache backend is supported. However, when running Superset in on a multi-worker setup, a dedicated cache is required. For this For example, to use the built-in cache to store chart data, use the following config:
we recommend using either Redis or Memcached:
```python
DATA_CACHE_CONFIG = {
"CACHE_TYPE": "SupersetMetastoreCache",
"CACHE_KEY_PREFIX": "superset_results", # make sure this string is unique to avoid collisions
"CACHE_DEFAULT_TIMEOUT": 86400, # 60 seconds * 60 minutes * 24 hours
}
```
- Redis (recommended): we recommend the [redis](https://pypi.python.org/pypi/redis) Python package - Redis (recommended): we recommend the [redis](https://pypi.python.org/pypi/redis) Python package
- Memcached: we recommend using [pylibmc](https://pypi.org/project/pylibmc/) client library as - Memcached: we recommend using [pylibmc](https://pypi.org/project/pylibmc/) client library as

View File

@ -51,7 +51,10 @@ class CreateDashboardPermalinkCommand(BaseDashboardPermalinkCommand):
"state": self.state, "state": self.state,
} }
return CreateKeyValueCommand( return CreateKeyValueCommand(
self.actor, self.resource, value, self.key_type actor=self.actor,
resource=self.resource,
value=value,
key_type=self.key_type,
).run() ).run()
except SQLAlchemyError as ex: except SQLAlchemyError as ex:
logger.exception("Error running create command") logger.exception("Error running create command")

View File

@ -44,7 +44,7 @@ class GetDashboardPermalinkCommand(BaseDashboardPermalinkCommand):
self.validate() self.validate()
try: try:
command = GetKeyValueCommand( command = GetKeyValueCommand(
self.resource, self.key, key_type=self.key_type resource=self.resource, key=self.key, key_type=self.key_type
) )
value: Optional[DashboardPermalinkValue] = command.run() value: Optional[DashboardPermalinkValue] = command.run()
if value: if value:

View File

@ -49,7 +49,10 @@ class CreateExplorePermalinkCommand(BaseExplorePermalinkCommand):
"state": self.state, "state": self.state,
} }
command = CreateKeyValueCommand( command = CreateKeyValueCommand(
self.actor, self.resource, value, self.key_type actor=self.actor,
resource=self.resource,
value=value,
key_type=self.key_type,
) )
return command.run() return command.run()
except SQLAlchemyError as ex: except SQLAlchemyError as ex:

View File

@ -44,7 +44,7 @@ class GetExplorePermalinkCommand(BaseExplorePermalinkCommand):
self.validate() self.validate()
try: try:
value: Optional[ExplorePermalinkValue] = GetKeyValueCommand( value: Optional[ExplorePermalinkValue] = GetKeyValueCommand(
self.resource, self.key, key_type=self.key_type resource=self.resource, key=self.key, key_type=self.key_type
).run() ).run()
if value: if value:
chart_id: Optional[int] = value.get("chartId") chart_id: Optional[int] = value.get("chartId")

View File

@ -16,6 +16,7 @@
# under the License. # under the License.
import json import json
import os import os
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional from typing import Any, Callable, Dict, List, Optional
import celery import celery
@ -108,7 +109,7 @@ class ProfilingExtension: # pylint: disable=too-few-public-methods
app.wsgi_app = SupersetProfiler(app.wsgi_app, self.interval) # type: ignore app.wsgi_app = SupersetProfiler(app.wsgi_app, self.interval) # type: ignore
APP_DIR = os.path.dirname(__file__) APP_DIR = os.path.join(os.path.dirname(__file__), os.path.pardir)
appbuilder = AppBuilder(update_perms=False) appbuilder = AppBuilder(update_perms=False)
async_query_manager = AsyncQueryManager() async_query_manager = AsyncQueryManager()
cache_manager = CacheManager() cache_manager = CacheManager()

View File

@ -0,0 +1,117 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime, timedelta
from hashlib import md5
from typing import Any, Dict, List, Optional
from uuid import UUID, uuid3
from flask import Flask
from flask_caching import BaseCache
from superset.key_value.exceptions import KeyValueCreateFailedError
from superset.key_value.types import KeyType
RESOURCE = "superset_metastore_cache"
KEY_TYPE: KeyType = "uuid"
class SupersetMetastoreCache(BaseCache):
def __init__(self, namespace: UUID, default_timeout: int = 300) -> None:
super().__init__(default_timeout)
self.namespace = namespace
@classmethod
def factory(
cls, app: Flask, config: Dict[str, Any], args: List[Any], kwargs: Dict[str, Any]
) -> BaseCache:
# base namespace for generating deterministic UUIDs
md5_obj = md5()
seed = config.get("CACHE_KEY_PREFIX", "")
md5_obj.update(seed.encode("utf-8"))
kwargs["namespace"] = UUID(md5_obj.hexdigest())
return cls(*args, **kwargs)
def get_key(self, key: str) -> str:
return str(uuid3(self.namespace, key))
@staticmethod
def _prune() -> None:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.delete_expired import (
DeleteExpiredKeyValueCommand,
)
DeleteExpiredKeyValueCommand(resource=RESOURCE).run()
def _get_expiry(self, timeout: Optional[int]) -> Optional[datetime]:
timeout = self._normalize_timeout(timeout)
if timeout is not None and timeout > 0:
return datetime.now() + timedelta(seconds=timeout)
return None
def set(self, key: str, value: Any, timeout: Optional[int] = None) -> bool:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.upsert import UpsertKeyValueCommand
UpsertKeyValueCommand(
resource=RESOURCE,
key_type=KEY_TYPE,
key=self.get_key(key),
value=value,
expires_on=self._get_expiry(timeout),
).run()
return True
def add(self, key: str, value: Any, timeout: Optional[int] = None) -> bool:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.create import CreateKeyValueCommand
try:
CreateKeyValueCommand(
resource=RESOURCE,
value=value,
key_type=KEY_TYPE,
key=self.get_key(key),
expires_on=self._get_expiry(timeout),
).run()
self._prune()
return True
except KeyValueCreateFailedError:
return False
def get(self, key: str) -> Any:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.get import GetKeyValueCommand
return GetKeyValueCommand(
resource=RESOURCE, key_type=KEY_TYPE, key=self.get_key(key),
).run()
def has(self, key: str) -> bool:
entry = self.get(key)
if entry:
return True
return False
def delete(self, key: str) -> Any:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.delete import DeleteKeyValueCommand
return DeleteKeyValueCommand(
resource=RESOURCE, key_type=KEY_TYPE, key=self.get_key(key),
).run()

View File

@ -18,6 +18,7 @@ import logging
import pickle import pickle
from datetime import datetime from datetime import datetime
from typing import Any, Optional from typing import Any, Optional
from uuid import UUID
from flask_appbuilder.security.sqla.models import User from flask_appbuilder.security.sqla.models import User
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
@ -33,18 +34,20 @@ logger = logging.getLogger(__name__)
class CreateKeyValueCommand(BaseCommand): class CreateKeyValueCommand(BaseCommand):
actor: User actor: Optional[User]
resource: str resource: str
value: Any value: Any
key_type: KeyType key_type: KeyType
key: Optional[str]
expires_on: Optional[datetime] expires_on: Optional[datetime]
def __init__( def __init__(
self, self,
actor: User,
resource: str, resource: str,
value: Any, value: Any,
key_type: KeyType, key_type: KeyType = "uuid",
actor: Optional[User] = None,
key: Optional[str] = None,
expires_on: Optional[datetime] = None, expires_on: Optional[datetime] = None,
): ):
""" """
@ -53,6 +56,8 @@ class CreateKeyValueCommand(BaseCommand):
:param resource: the resource (dashboard, chart etc) :param resource: the resource (dashboard, chart etc)
:param value: the value to persist in the key-value store :param value: the value to persist in the key-value store
:param key_type: the type of the key to return :param key_type: the type of the key to return
:param actor: the user performing the command
:param key: id of entry (autogenerated if undefined)
:param expires_on: entry expiration time :param expires_on: entry expiration time
:return: the key associated with the persisted value :return: the key associated with the persisted value
""" """
@ -60,12 +65,14 @@ class CreateKeyValueCommand(BaseCommand):
self.actor = actor self.actor = actor
self.value = value self.value = value
self.key_type = key_type self.key_type = key_type
self.key = key
self.expires_on = expires_on self.expires_on = expires_on
def run(self) -> str: def run(self) -> str:
try: try:
return self.create() return self.create()
except SQLAlchemyError as ex: except SQLAlchemyError as ex:
db.session.rollback()
logger.exception("Error running create command") logger.exception("Error running create command")
raise KeyValueCreateFailedError() from ex raise KeyValueCreateFailedError() from ex
@ -77,9 +84,19 @@ class CreateKeyValueCommand(BaseCommand):
resource=self.resource, resource=self.resource,
value=pickle.dumps(self.value), value=pickle.dumps(self.value),
created_on=datetime.now(), created_on=datetime.now(),
created_by_fk=None if self.actor.is_anonymous else self.actor.id, created_by_fk=None
if self.actor is None or self.actor.is_anonymous
else self.actor.id,
expires_on=self.expires_on, expires_on=self.expires_on,
) )
if self.key is not None:
try:
if self.key_type == "uuid":
entry.uuid = UUID(self.key)
else:
entry.id = int(self.key)
except ValueError as ex:
raise KeyValueCreateFailedError() from ex
db.session.add(entry) db.session.add(entry)
db.session.commit() db.session.commit()
return extract_key(entry, self.key_type) return extract_key(entry, self.key_type)

View File

@ -15,6 +15,7 @@
# specific language governing permissions and limitations # specific language governing permissions and limitations
# under the License. # under the License.
import logging import logging
from typing import Optional
from flask_appbuilder.security.sqla.models import User from flask_appbuilder.security.sqla.models import User
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
@ -30,13 +31,12 @@ logger = logging.getLogger(__name__)
class DeleteKeyValueCommand(BaseCommand): class DeleteKeyValueCommand(BaseCommand):
actor: User
key: str key: str
key_type: KeyType key_type: KeyType
resource: str resource: str
def __init__( def __init__(
self, actor: User, resource: str, key: str, key_type: KeyType = "uuid" self, resource: str, key: str, key_type: KeyType = "uuid",
): ):
""" """
Delete a key-value pair Delete a key-value pair
@ -47,7 +47,6 @@ class DeleteKeyValueCommand(BaseCommand):
:return: was the entry deleted or not :return: was the entry deleted or not
""" """
self.resource = resource self.resource = resource
self.actor = actor
self.key = key self.key = key
self.key_type = key_type self.key_type = key_type
@ -55,6 +54,7 @@ class DeleteKeyValueCommand(BaseCommand):
try: try:
return self.delete() return self.delete()
except SQLAlchemyError as ex: except SQLAlchemyError as ex:
db.session.rollback()
logger.exception("Error running delete command") logger.exception("Error running delete command")
raise KeyValueDeleteFailedError() from ex raise KeyValueDeleteFailedError() from ex

View File

@ -0,0 +1,60 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime
from sqlalchemy.exc import SQLAlchemyError
from superset import db
from superset.commands.base import BaseCommand
from superset.key_value.exceptions import KeyValueDeleteFailedError
from superset.key_value.models import KeyValueEntry
logger = logging.getLogger(__name__)
class DeleteExpiredKeyValueCommand(BaseCommand):
resource: str
def __init__(self, resource: str):
"""
Delete all expired key-value pairs
:param resource: the resource (dashboard, chart etc)
:return: was the entry deleted or not
"""
self.resource = resource
def run(self) -> None:
try:
self.delete_expired()
except SQLAlchemyError as ex:
db.session.rollback()
logger.exception("Error running delete command")
raise KeyValueDeleteFailedError() from ex
def validate(self) -> None:
pass
@staticmethod
def delete_expired() -> None:
(
db.session.query(KeyValueEntry)
.filter(KeyValueEntry.expires_on <= datetime.now())
.delete()
)
db.session.commit()

View File

@ -34,7 +34,7 @@ logger = logging.getLogger(__name__)
class UpdateKeyValueCommand(BaseCommand): class UpdateKeyValueCommand(BaseCommand):
actor: User actor: Optional[User]
resource: str resource: str
value: Any value: Any
key: str key: str
@ -43,10 +43,10 @@ class UpdateKeyValueCommand(BaseCommand):
def __init__( def __init__(
self, self,
actor: User,
resource: str, resource: str,
key: str, key: str,
value: Any, value: Any,
actor: Optional[User] = None,
key_type: KeyType = "uuid", key_type: KeyType = "uuid",
expires_on: Optional[datetime] = None, expires_on: Optional[datetime] = None,
): ):
@ -56,6 +56,7 @@ class UpdateKeyValueCommand(BaseCommand):
:param resource: the resource (dashboard, chart etc) :param resource: the resource (dashboard, chart etc)
:param key: the key to update :param key: the key to update
:param value: the value to persist in the key-value store :param value: the value to persist in the key-value store
:param actor: the user performing the command
:param key_type: the type of the key to update :param key_type: the type of the key to update
:param expires_on: entry expiration time :param expires_on: entry expiration time
:return: the key associated with the updated value :return: the key associated with the updated value
@ -71,6 +72,7 @@ class UpdateKeyValueCommand(BaseCommand):
try: try:
return self.update() return self.update()
except SQLAlchemyError as ex: except SQLAlchemyError as ex:
db.session.rollback()
logger.exception("Error running update command") logger.exception("Error running update command")
raise KeyValueUpdateFailedError() from ex raise KeyValueUpdateFailedError() from ex
@ -89,8 +91,11 @@ class UpdateKeyValueCommand(BaseCommand):
entry.value = pickle.dumps(self.value) entry.value = pickle.dumps(self.value)
entry.expires_on = self.expires_on entry.expires_on = self.expires_on
entry.changed_on = datetime.now() entry.changed_on = datetime.now()
entry.changed_by_fk = None if self.actor.is_anonymous else self.actor.id entry.changed_by_fk = (
None if self.actor is None or self.actor.is_anonymous else self.actor.id
)
db.session.merge(entry) db.session.merge(entry)
db.session.commit() db.session.commit()
return extract_key(entry, self.key_type) return extract_key(entry, self.key_type)
return None return None

View File

@ -0,0 +1,109 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import pickle
from datetime import datetime
from typing import Any, Optional
from flask_appbuilder.security.sqla.models import User
from sqlalchemy.exc import SQLAlchemyError
from superset import db
from superset.commands.base import BaseCommand
from superset.key_value.commands.create import CreateKeyValueCommand
from superset.key_value.exceptions import KeyValueUpdateFailedError
from superset.key_value.models import KeyValueEntry
from superset.key_value.types import KeyType
from superset.key_value.utils import extract_key, get_filter
logger = logging.getLogger(__name__)
class UpsertKeyValueCommand(BaseCommand):
actor: Optional[User]
resource: str
value: Any
key: str
key_type: KeyType
expires_on: Optional[datetime]
def __init__(
self,
resource: str,
key: str,
value: Any,
actor: Optional[User] = None,
key_type: KeyType = "uuid",
expires_on: Optional[datetime] = None,
):
"""
Upsert a key value entry
:param resource: the resource (dashboard, chart etc)
:param key: the key to update
:param value: the value to persist in the key-value store
:param key_type: the type of the key to update
:param actor: the user performing the command
:param expires_on: entry expiration time
:return: the key associated with the updated value
"""
self.actor = actor
self.resource = resource
self.key = key
self.value = value
self.key_type = key_type
self.expires_on = expires_on
def run(self) -> Optional[str]:
try:
return self.upsert()
except SQLAlchemyError as ex:
db.session.rollback()
logger.exception("Error running update command")
raise KeyValueUpdateFailedError() from ex
def validate(self) -> None:
pass
def upsert(self) -> Optional[str]:
filter_ = get_filter(self.resource, self.key, self.key_type)
entry: KeyValueEntry = (
db.session.query(KeyValueEntry)
.filter_by(**filter_)
.autoflush(False)
.first()
)
if entry:
entry.value = pickle.dumps(self.value)
entry.expires_on = self.expires_on
entry.changed_on = datetime.now()
entry.changed_by_fk = (
None if self.actor is None or self.actor.is_anonymous else self.actor.id
)
db.session.merge(entry)
db.session.commit()
return extract_key(entry, self.key_type)
else:
return CreateKeyValueCommand(
resource=self.resource,
value=self.value,
key_type=self.key_type,
actor=self.actor,
key=self.key,
expires_on=self.expires_on,
).run()

View File

@ -15,14 +15,14 @@
# specific language governing permissions and limitations # specific language governing permissions and limitations
# under the License. # under the License.
import logging import logging
import math
from flask import Flask from flask import Flask
from flask_babel import gettext as _
from flask_caching import Cache from flask_caching import Cache
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
CACHE_IMPORT_PATH = "superset.extensions.metastore_cache.SupersetMetastoreCache"
class CacheManager: class CacheManager:
def __init__(self) -> None: def __init__(self) -> None:
@ -40,27 +40,24 @@ class CacheManager:
) -> None: ) -> None:
cache_config = app.config[cache_config_key] cache_config = app.config[cache_config_key]
cache_type = cache_config.get("CACHE_TYPE") cache_type = cache_config.get("CACHE_TYPE")
if app.debug and cache_type is None: if required and cache_type in (None, "SupersetMetastoreCache"):
cache_threshold = cache_config.get("CACHE_THRESHOLD", math.inf) if cache_type is None:
logger.warning(
"Falling back to the built-in cache, that stores data in the "
"metadata database, for the followinng cache: `%s`. "
"It is recommended to use `RedisCache`, `MemcachedCache` or "
"another dedicated caching backend for production deployments",
cache_config_key,
)
cache_key_prefix = cache_config.get("CACHE_KEY_PREFIX", cache_config_key)
cache_config.update( cache_config.update(
{"CACHE_TYPE": "SimpleCache", "CACHE_THRESHOLD": cache_threshold,} {"CACHE_TYPE": CACHE_IMPORT_PATH, "CACHE_KEY_PREFIX": cache_key_prefix}
) )
if "CACHE_DEFAULT_TIMEOUT" not in cache_config: if "CACHE_DEFAULT_TIMEOUT" not in cache_config:
default_timeout = app.config.get("CACHE_DEFAULT_TIMEOUT") default_timeout = app.config.get("CACHE_DEFAULT_TIMEOUT")
cache_config["CACHE_DEFAULT_TIMEOUT"] = default_timeout cache_config["CACHE_DEFAULT_TIMEOUT"] = default_timeout
if required and cache_type in ("null", "NullCache"):
raise Exception(
_(
"The CACHE_TYPE `%(cache_type)s` for `%(cache_config_key)s` is not "
"supported. It is recommended to use `RedisCache`, "
"`MemcachedCache` or another dedicated caching backend for "
"production deployments",
cache_type=cache_config["CACHE_TYPE"],
cache_config_key=cache_config_key,
),
)
cache.init_app(app, cache_config) cache.init_app(app, cache_config)
def init_app(self, app: Flask) -> None: def init_app(self, app: Flask) -> None:

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,76 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from uuid import UUID
import pytest
from flask.ctx import AppContext
from freezegun import freeze_time
if TYPE_CHECKING:
from superset.extensions.metastore_cache import SupersetMetastoreCache
FIRST_KEY = "foo"
FIRST_KEY_INITIAL_VALUE = {"foo": "bar"}
FIRST_KEY_UPDATED_VALUE = "foo"
SECOND_KEY = "baz"
SECOND_VALUE = "qwerty"
@pytest.fixture
def cache() -> SupersetMetastoreCache:
from superset.extensions.metastore_cache import SupersetMetastoreCache
return SupersetMetastoreCache(
namespace=UUID("ee173d1b-ccf3-40aa-941c-985c15224496"), default_timeout=600,
)
def test_caching_flow(app_context: AppContext, cache: SupersetMetastoreCache) -> None:
assert cache.has(FIRST_KEY) is False
assert cache.add(FIRST_KEY, FIRST_KEY_INITIAL_VALUE) is True
assert cache.has(FIRST_KEY) is True
cache.set(SECOND_KEY, SECOND_VALUE)
assert cache.get(FIRST_KEY) == FIRST_KEY_INITIAL_VALUE
assert cache.get(SECOND_KEY) == SECOND_VALUE
assert cache.add(FIRST_KEY, FIRST_KEY_UPDATED_VALUE) is False
assert cache.get(FIRST_KEY) == FIRST_KEY_INITIAL_VALUE
assert cache.set(FIRST_KEY, FIRST_KEY_UPDATED_VALUE) == True
assert cache.get(FIRST_KEY) == FIRST_KEY_UPDATED_VALUE
cache.delete(FIRST_KEY)
assert cache.has(FIRST_KEY) is False
assert cache.get(FIRST_KEY) is None
assert cache.has(SECOND_KEY)
assert cache.get(SECOND_KEY) == SECOND_VALUE
def test_expiry(app_context: AppContext, cache: SupersetMetastoreCache) -> None:
delta = timedelta(days=90)
dttm = datetime(2022, 3, 18, 0, 0, 0)
with freeze_time(dttm):
cache.set(FIRST_KEY, FIRST_KEY_INITIAL_VALUE, int(delta.total_seconds()))
assert cache.get(FIRST_KEY) == FIRST_KEY_INITIAL_VALUE
with freeze_time(dttm + delta - timedelta(seconds=1)):
assert cache.has(FIRST_KEY)
assert cache.get(FIRST_KEY) == FIRST_KEY_INITIAL_VALUE
with freeze_time(dttm + delta + timedelta(seconds=1)):
assert cache.has(FIRST_KEY) is False
assert cache.get(FIRST_KEY) is None

View File

@ -56,9 +56,7 @@ def test_delete_id_entry(
from superset.key_value.models import KeyValueEntry from superset.key_value.models import KeyValueEntry
assert ( assert (
DeleteKeyValueCommand( DeleteKeyValueCommand(resource=RESOURCE, key=ID_KEY, key_type="id",).run()
actor=admin, resource=RESOURCE, key=ID_KEY, key_type="id",
).run()
is True is True
) )
@ -70,9 +68,7 @@ def test_delete_uuid_entry(
from superset.key_value.models import KeyValueEntry from superset.key_value.models import KeyValueEntry
assert ( assert (
DeleteKeyValueCommand( DeleteKeyValueCommand(resource=RESOURCE, key=UUID_KEY, key_type="uuid").run()
actor=admin, resource=RESOURCE, key=UUID_KEY, key_type="uuid",
).run()
is True is True
) )
@ -84,8 +80,6 @@ def test_delete_entry_missing(
from superset.key_value.models import KeyValueEntry from superset.key_value.models import KeyValueEntry
assert ( assert (
DeleteKeyValueCommand( DeleteKeyValueCommand(resource=RESOURCE, key="456", key_type="id").run()
actor=admin, resource=RESOURCE, key="456", key_type="id",
).run()
is False is False
) )

View File

@ -0,0 +1,90 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import pickle
from typing import TYPE_CHECKING
from uuid import UUID
from flask.ctx import AppContext
from flask_appbuilder.security.sqla.models import User
from superset.extensions import db
from tests.integration_tests.key_value.commands.fixtures import (
admin,
ID_KEY,
key_value_entry,
RESOURCE,
UUID_KEY,
)
if TYPE_CHECKING:
from superset.key_value.models import KeyValueEntry
NEW_VALUE = "new value"
def test_upsert_id_entry(
app_context: AppContext, admin: User, key_value_entry: KeyValueEntry,
) -> None:
from superset.key_value.commands.upsert import UpsertKeyValueCommand
from superset.key_value.models import KeyValueEntry
key = UpsertKeyValueCommand(
actor=admin, resource=RESOURCE, key=ID_KEY, value=NEW_VALUE, key_type="id",
).run()
assert key == ID_KEY
entry = (
db.session.query(KeyValueEntry).filter_by(id=int(ID_KEY)).autoflush(False).one()
)
assert pickle.loads(entry.value) == NEW_VALUE
assert entry.changed_by_fk == admin.id
def test_upsert_uuid_entry(
app_context: AppContext, admin: User, key_value_entry: KeyValueEntry,
) -> None:
from superset.key_value.commands.upsert import UpsertKeyValueCommand
from superset.key_value.models import KeyValueEntry
key = UpsertKeyValueCommand(
actor=admin, resource=RESOURCE, key=UUID_KEY, value=NEW_VALUE, key_type="uuid",
).run()
assert key == UUID_KEY
entry = (
db.session.query(KeyValueEntry)
.filter_by(uuid=UUID(UUID_KEY))
.autoflush(False)
.one()
)
assert pickle.loads(entry.value) == NEW_VALUE
assert entry.changed_by_fk == admin.id
def test_upsert_missing_entry(
app_context: AppContext, admin: User, key_value_entry: KeyValueEntry,
) -> None:
from superset.key_value.commands.upsert import UpsertKeyValueCommand
from superset.key_value.models import KeyValueEntry
key = UpsertKeyValueCommand(
actor=admin, resource=RESOURCE, key="456", value=NEW_VALUE, key_type="id",
).run()
assert key == "456"
db.session.query(KeyValueEntry).filter_by(id=456).delete()
db.session.commit()