chore: log cache keys to the logs (#10678)

* Log cache keys to the logs

* Add tests

* Use separate table for the cache keys

* Add migration for the cache lookup table

Co-authored-by: bogdan kyryliuk <bogdankyryliuk@dropbox.com>
Authored by Bogdan on 2020-09-01 09:41:25 -07:00, committed by GitHub
parent 807bd656c6
commit 4572ebb600
8 changed files with 153 additions and 35 deletions
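
In short: whenever chart data is written to the cache, the cache key is now also recorded in a new cache_keys lookup table (cache_key, cache_timeout, datasource_uid, created_on) via a shared set_and_log_cache helper. A minimal sketch of what that lookup enables, for example evicting every cached entry recorded for one datasource; this helper is illustrative only, not part of the commit, and assumes a Superset application context:

from superset import cache, db
from superset.models.cache import CacheKey


def drop_cached_entries(datasource_uid: str) -> int:
    # Illustrative helper (not in this commit): look up every cache key
    # that was logged for this datasource ...
    entries = (
        db.session.query(CacheKey)
        .filter(CacheKey.datasource_uid == datasource_uid)
        .all()
    )
    # ... and evict the corresponding values from the cache backend.
    for entry in entries:
        cache.delete(entry.cache_key)
    return len(entries)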


@@ -32,6 +32,7 @@ from superset.exceptions import QueryObjectValidationError
from superset.stats_logger import BaseStatsLogger
from superset.utils import core as utils
from superset.utils.core import DTTM_ALIAS
from superset.viz import set_and_log_cache
config = app.config
stats_logger: BaseStatsLogger = config["STATS_LOGGER"]
@@ -272,16 +273,14 @@ class QueryContext:
stacktrace = utils.get_stacktrace()
if is_loaded and cache_key and cache and status != utils.QueryStatus.FAILED:
try:
cache_value = dict(dttm=cached_dttm, df=df, query=query)
stats_logger.incr("set_cache_key")
cache.set(cache_key, cache_value, timeout=self.cache_timeout)
except Exception as ex: # pylint: disable=broad-except
# cache.set call can fail if the backend is down or if
# the key is too large or whatever other reasons
logger.warning("Could not cache key %s", cache_key)
logger.exception(ex)
cache.delete(cache_key)
set_and_log_cache(
cache_key,
df,
query,
cached_dttm,
self.cache_timeout,
self.datasource.uid,
)
return {
"cache_key": cache_key,
"cached_dttm": cache_value["dttm"] if cache_value is not None else None,


@@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Add cache to datasource lookup table.
Revision ID: 175ea3592453
Revises: f80a3b88324b
Create Date: 2020-08-28 17:16:57.379425
"""
# revision identifiers, used by Alembic.
revision = "175ea3592453"
down_revision = "f80a3b88324b"
import sqlalchemy as sa
from alembic import op
def upgrade():
op.create_table(
"cache_keys",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("cache_key", sa.String(256), nullable=False),
sa.Column("cache_timeout", sa.Integer(), nullable=True),
sa.Column("datasource_uid", sa.String(64), nullable=False),
sa.Column("created_on", sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_cache_keys_datasource_uid"),
"cache_keys",
["datasource_uid"],
unique=False,
)
def downgrade():
op.drop_index(op.f("ix_cache_keys_datasource_uid"), table_name="cache_keys")
op.drop_table("cache_keys")

superset/models/cache.py (new executable file, +32 lines)

@@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from flask_appbuilder import Model
from sqlalchemy import Column, DateTime, Integer, String
class CacheKey(Model): # pylint: disable=too-few-public-methods
"""Stores cache key records for the superset visualization."""
__tablename__ = "cache_keys"
id = Column(Integer, primary_key=True)
cache_key = Column(String(256), nullable=False)
cache_timeout = Column(Integer, nullable=True)
datasource_uid = Column(String(64), nullable=False, index=True)
created_on = Column(DateTime, default=datetime.now, nullable=True)
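
The model mirrors the migration above. The new tests further down verify logging by reading the most recent row back; the same lookup as a stand-alone sketch (assumes a Superset application context):

from superset import db
from superset.models.cache import CacheKey

# Fetch the most recently logged cache key, as test_cache_logging does below.
latest = db.session.query(CacheKey).order_by(CacheKey.id.desc()).first()
if latest is not None:
    print(latest.cache_key, latest.datasource_uid, latest.created_on)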


@@ -169,7 +169,6 @@ class DBEventLogger(AbstractEventLogger):
user_id=user_id,
)
logs.append(log)
try:
sesh = current_app.appbuilder.get_session
sesh.bulk_save_objects(logs)


@@ -53,7 +53,7 @@ from flask_babel import lazy_gettext as _
from geopy.point import Point
from pandas.tseries.frequencies import to_offset
from superset import app, cache, security_manager
from superset import app, cache, db, security_manager
from superset.constants import NULL_STRING
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import (
@ -61,6 +61,7 @@ from superset.exceptions import (
QueryObjectValidationError,
SpatialException,
)
from superset.models.cache import CacheKey
from superset.models.helpers import QueryResult
from superset.typing import QueryObjectDict, VizData, VizPayload
from superset.utils import core as utils
@@ -95,6 +96,34 @@ METRIC_KEYS = [
]
def set_and_log_cache(
cache_key: str,
df: pd.DataFrame,
query: str,
cached_dttm: str,
cache_timeout: int,
datasource_uid: Optional[str],
) -> None:
try:
cache_value = dict(dttm=cached_dttm, df=df, query=query)
stats_logger.incr("set_cache_key")
cache.set(cache_key, cache_value, timeout=cache_timeout)
if datasource_uid:
ck = CacheKey(
cache_key=cache_key,
cache_timeout=cache_timeout,
datasource_uid=datasource_uid,
)
db.session.add(ck)
except Exception as ex:
# cache.set call can fail if the backend is down or if
# the key is too large or whatever other reasons
logger.warning("Could not cache key {}".format(cache_key))
logger.exception(ex)
cache.delete(cache_key)
class BaseViz:
"""All visualizations derive this base class"""
@@ -536,16 +565,14 @@ class BaseViz:
and cache
and self.status != utils.QueryStatus.FAILED
):
try:
cache_value = dict(dttm=cached_dttm, df=df, query=self.query)
stats_logger.incr("set_cache_key")
cache.set(cache_key, cache_value, timeout=self.cache_timeout)
except Exception as ex:
# cache.set call can fail if the backend is down or if
# the key is too large or whatever other reasons
logger.warning("Could not cache key {}".format(cache_key))
logger.exception(ex)
cache.delete(cache_key)
set_and_log_cache(
cache_key,
df,
self.query,
cached_dttm,
self.cache_timeout,
self.datasource.uid,
)
return {
"cache_key": self._any_cache_key,
"cached_dttm": self._any_cached_dttm,


@ -43,10 +43,9 @@ from dateutil import relativedelta as rdelta
from flask import request
from flask_babel import lazy_gettext as _
from geopy.point import Point
from markdown import markdown
from pandas.tseries.frequencies import to_offset
from superset import app, cache, get_manifest_files, security_manager
from superset import app, cache, security_manager
from superset.constants import NULL_STRING
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import (
@@ -63,6 +62,7 @@ from superset.utils.core import (
merge_extra_filters,
to_adhoc,
)
from superset.viz import set_and_log_cache
if TYPE_CHECKING:
from superset.connectors.base.models import BaseDatasource
@@ -521,16 +521,14 @@ class BaseViz:
and cache
and self.status != utils.QueryStatus.FAILED
):
try:
cache_value = dict(dttm=cached_dttm, df=df, query=self.query)
stats_logger.incr("set_cache_key")
cache.set(cache_key, cache_value, timeout=self.cache_timeout)
except Exception as ex:
# cache.set call can fail if the backend is down or if
# the key is too large or whatever other reasons
logger.warning("Could not cache key {}".format(cache_key))
logger.exception(ex)
cache.delete(cache_key)
set_and_log_cache(
cache_key,
df,
self.query,
cached_dttm,
self.cache_timeout,
self.datasource.uid,
)
return {
"cache_key": self._any_cache_key,


@@ -35,6 +35,7 @@ from unittest import mock, skipUnless
import pandas as pd
import sqlalchemy as sqla
from superset.models.cache import CacheKey
from tests.test_app import app # isort:skip
import superset.views.utils
from superset import (
@@ -583,6 +584,12 @@ class TestCore(SupersetTestCase):
+ quote(json.dumps([{"col": "name", "op": "in", "val": ["Jennifer"]}]))
) == [{"slice_id": slc.id, "viz_error": None, "viz_status": "success"}]
def test_cache_logging(self):
slc = self.get_slice("Girls", db.session)
self.get_json_resp("/superset/warm_up_cache?slice_id={}".format(slc.id))
ck = db.session.query(CacheKey).order_by(CacheKey.id.desc()).first()
assert ck.datasource_uid == "3__table"
def test_shortner(self):
self.login(username="admin")
data = (


@@ -18,6 +18,7 @@ import tests.test_app
from superset import db
from superset.charts.schemas import ChartDataQueryContextSchema
from superset.connectors.connector_registry import ConnectorRegistry
from superset.models.cache import CacheKey
from superset.utils.core import (
AdhocMetricExpressionType,
ChartDataResultFormat,
@@ -141,7 +142,6 @@ class TestQueryContext(SupersetTestCase):
query_object = query_context.queries[0]
extras = query_object.to_dict()["extras"]
self.assertTrue("time_range_endpoints" in extras)
self.assertEqual(
extras["time_range_endpoints"],
(TimeRangeEndpoint.INCLUSIVE, TimeRangeEndpoint.EXCLUSIVE),
@@ -180,6 +180,9 @@
self.assertIn("name,sum__num\n", data)
self.assertEqual(len(data.split("\n")), 12)
ck = db.session.query(CacheKey).order_by(CacheKey.id.desc()).first()
assert ck.datasource_uid == "3__table"
def test_sql_injection_via_groupby(self):
"""
Ensure that calling invalid columns names in groupby are caught