mirror of https://github.com/apache/superset.git
Removing unneeded results_backends.py (#2717)
This commit is contained in:
parent
d65054e015
commit
cb14640a82
@@ -366,28 +366,74 @@ Upgrading should be as straightforward as running::

SQL Lab
-------
SQL Lab is a powerful SQL IDE that works with all SQLAlchemy compatible
databases. By default, queries are executed in the scope of a web request,
so they may eventually time out as they exceed the maximum duration of a
web request in your environment, whether it be a reverse proxy or the
Superset server itself.

On large analytic databases, it's common to run queries that execute for
minutes or hours. To enable support for long-running queries that execute
beyond the typical web request's timeout (30-60 seconds), it is necessary
to configure an asynchronous backend for Superset, which consists of:

* one or many Superset workers (implemented as Celery workers), which can
  be started with the ``superset worker`` command; run
  ``superset worker --help`` to view the related options
* a Celery broker (message queue), for which we recommend using Redis
  or RabbitMQ
* a results backend that defines where the worker will persist the query
  results

Configuring Celery requires defining a ``CELERY_CONFIG`` in your
``superset_config.py``, as shown below. Both the worker and web server
processes should have the same configuration.

.. code-block:: python

    class CeleryConfig(object):
        BROKER_URL = 'redis://localhost:6379/0'
        CELERY_IMPORTS = ('superset.sql_lab', )
        CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
        CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}

    CELERY_CONFIG = CeleryConfig
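
With this in place, a worker started via the ``superset worker`` command
described above should pick up query tasks from the broker configured in
``BROKER_URL`` (here ``redis://localhost:6379/0``).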

To set up a results backend, you need to pass an instance of a derivative
of ``werkzeug.contrib.cache.BaseCache`` to the ``RESULTS_BACKEND``
configuration key in your ``superset_config.py``. It's possible to use
Memcached, Redis, S3 (https://pypi.python.org/pypi/s3werkzeugcache),
memory or the file system (in a single-server setup or for testing),
or to write your own caching interface. Your ``superset_config.py`` may
look something like:

.. code-block:: python

    # On S3
    from s3cache.s3cache import S3Cache
    S3_CACHE_BUCKET = 'foobar-superset'
    S3_CACHE_KEY_PREFIX = 'sql_lab_result'
    RESULTS_BACKEND = S3Cache(S3_CACHE_BUCKET, S3_CACHE_KEY_PREFIX)

    # On Redis
    from werkzeug.contrib.cache import RedisCache
    RESULTS_BACKEND = RedisCache(
        host='localhost', port=6379, key_prefix='superset_results')
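
For local testing or a single-server setup, a file-system-backed cache can
also work; a minimal sketch, assuming werkzeug's ``FileSystemCache`` and an
arbitrary cache directory:

.. code-block:: python

    # On the local file system (single-server setups or testing only).
    # The cache directory and timeout below are illustrative, not defaults.
    from werkzeug.contrib.cache import FileSystemCache
    RESULTS_BACKEND = FileSystemCache(
        '/tmp/superset_results', default_timeout=60 * 60 * 24)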

Also note that SQL Lab supports Jinja templating in queries, and that it's
possible to overload the default Jinja context in your environment by
defining ``JINJA_CONTEXT_ADDONS`` in your Superset configuration. Objects
referenced in this dictionary are made available for users to use in their
SQL.

.. code-block:: python

    JINJA_CONTEXT_ADDONS = {
        'my_crazy_macro': lambda x: x*2,
    }
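
For instance, with the addon above configured, a SQL Lab query such as
``SELECT {{ my_crazy_macro(21) }}`` should render to ``SELECT 42`` before
execution, since Jinja calls the referenced lambda while templating.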

Making your own build
---------------------

Binary file not shown (added image, 154 KiB).
@@ -1,175 +0,0 @@
"""Results backends are used to store long-running query results

The abstraction is flask-cache, which uses the BaseCache class from werkzeug
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

try:
    import cPickle as pickle
except ImportError:
    import pickle

import io
import logging

import boto3
from werkzeug.contrib.cache import BaseCache

from superset import app

config = app.config


class S3Cache(BaseCache):

    """S3 cache implementation.

    Adapted from examples in
    https://github.com/pallets/werkzeug/blob/master/werkzeug/contrib/cache.py.

    Timeout parameters are ignored as S3 doesn't support key-level expiration.
    To expire keys, set up an expiration policy as described in
    https://aws.amazon.com/blogs/aws/amazon-s3-object-expiration/.
    """

    def __init__(self, default_timeout=300):
        self.default_timeout = default_timeout

        self.s3_client = boto3.client('s3')

        self.bucket = config.get('S3_CACHE_BUCKET')
        self.key_prefix = config.get('S3_CACHE_KEY_PREFIX')

    def get(self, key):
        """Look up key in the cache and return the value for it.

        :param key: the key to be looked up.
        :returns: The value if it exists and is readable, else ``None``.
        """
        if not self._key_exists(key):
            return None
        else:
            # Buffer that receives the pickled payload from S3.
            value_file = io.BytesIO()

            try:
                self.s3_client.download_fileobj(
                    self.bucket,
                    self._full_s3_key(key),
                    value_file
                )
            except Exception as e:
                logging.warn('Error while trying to get key %s', key)
                logging.exception(e)

                return None
            else:
                value_file.seek(0)
                return pickle.load(value_file)

    def delete(self, key):
        """Delete `key` from the cache.

        :param key: the key to delete.
        :returns: Whether the key existed and has been deleted.
        :rtype: boolean
        """
        if not self._key_exists(key):
            return False
        else:
            try:
                self.s3_client.delete_objects(
                    Bucket=self.bucket,
                    Delete={
                        'Objects': [
                            {
                                'Key': self._full_s3_key(key)
                            }
                        ]
                    }
                )
            except Exception as e:
                logging.warn('Error while trying to delete key %s', key)
                logging.exception(e)

                return False
            else:
                return True

    def set(self, key, value, timeout=None):
        """Add a new key/value to the cache.

        If the key already exists, the existing value is overwritten.

        :param key: the key to set
        :param value: the value for the key
        :param timeout: the cache timeout for the key in seconds (if not
                        specified, it uses the default timeout). A timeout of
                        0 indicates that the cache never expires.
        :returns: ``True`` if key has been updated, ``False`` for backend
                  errors. Pickling errors, however, will raise a subclass of
                  ``pickle.PickleError``.
        :rtype: boolean
        """
        value_file = io.BytesIO()
        pickle.dump(value, value_file)

        try:
            # Rewind so the full pickled payload is uploaded.
            value_file.seek(0)
            self.s3_client.upload_fileobj(
                value_file,
                self.bucket,
                self._full_s3_key(key)
            )
        except Exception as e:
            logging.warn('Error while trying to set key %s', key)
            logging.exception(e)

            return False
        else:
            return True

    def add(self, key, value, timeout=None):
        """Works like :meth:`set` but does not overwrite existing values.

        :param key: the key to set
        :param value: the value for the key
        :param timeout: the cache timeout for the key in seconds (if not
                        specified, it uses the default timeout). A timeout of
                        0 indicates that the cache never expires.
        :returns: Same as :meth:`set`, but also ``False`` for already
                  existing keys.
        :rtype: boolean
        """
        if self._key_exists(key):
            return False
        else:
            return self.set(key, value, timeout=timeout)

    def clear(self):
        """Clears the cache.

        Keep in mind that not all caches support completely clearing the
        cache.
        :returns: Whether the cache has been cleared.
        :rtype: boolean
        """
        return False

    def _full_s3_key(self, key):
        """Convert a cache key to a full S3 key, including the key prefix."""
        return '%s%s' % (self.key_prefix, key)

    def _key_exists(self, key):
        """Determine whether the given key exists in the bucket."""
        try:
            self.s3_client.head_object(
                Bucket=self.bucket,
                Key=self._full_s3_key(key)
            )
        except Exception:
            # head_object throws an exception when the object doesn't exist
            return False
        else:
            return True
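
A minimal usage sketch, assuming the module above is importable as
``superset.results_backends`` (the bucket name and prefix here are
hypothetical; the tests below configure them the same way):

    from superset import app, results_backends
    app.config['S3_CACHE_BUCKET'] = 'my-superset-bucket'    # hypothetical bucket
    app.config['S3_CACHE_KEY_PREFIX'] = 'sql_lab_result/'   # hypothetical prefix
    RESULTS_BACKEND = results_backends.S3Cache()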
@@ -1,124 +0,0 @@
try:
    import cPickle as pickle
except ImportError:
    import pickle

import mock

from superset import app, results_backends
from .base_tests import SupersetTestCase

app.config['S3_CACHE_BUCKET'] = 'test-bucket'
app.config['S3_CACHE_KEY_PREFIX'] = 'test-prefix/'


class ResultsBackendsTests(SupersetTestCase):
    requires_examples = False

    @mock.patch('boto3.client')
    def setUp(self, mock_boto3_client):
        self.mock_boto3_client = mock_boto3_client
        self.mock_s3_client = mock.MagicMock()

        self.mock_boto3_client.return_value = self.mock_s3_client

        self.s3_cache = results_backends.S3Cache()
        self.s3_cache._key_exists = ResultsBackendsTests._mock_key_exists

    @staticmethod
    def _mock_download_fileobj(bucket, key, value_file):
        value_file.write(pickle.dumps('%s:%s' % (bucket, key)))

    @staticmethod
    def _mock_key_exists(key):
        return key == 'test-key'

    def test_s3_cache_initialization(self):
        self.mock_boto3_client.assert_called_with('s3')

    def test_s3_cache_set(self):
        result = self.s3_cache.set('test-key', 'test-value')

        self.assertTrue(result)
        self.mock_s3_client.upload_fileobj.assert_called_once()

        call_args = self.mock_s3_client.upload_fileobj.call_args_list[0][0]

        self.assertEquals(pickle.loads(call_args[0].getvalue()), 'test-value')
        self.assertEquals(call_args[1], 'test-bucket')
        self.assertEquals(call_args[2], 'test-prefix/test-key')

    def test_s3_cache_set_exception(self):
        self.mock_s3_client.upload_fileobj.side_effect = Exception(
            'Something bad happened!')
        result = self.s3_cache.set('test-key', 'test-value')

        self.assertFalse(result)
        self.mock_s3_client.upload_fileobj.assert_called_once()

    def test_s3_cache_get_exists(self):
        self.mock_s3_client.download_fileobj.side_effect = (
            ResultsBackendsTests._mock_download_fileobj)
        result = self.s3_cache.get('test-key')

        self.assertEquals(result, 'test-bucket:test-prefix/test-key')
        self.mock_s3_client.download_fileobj.assert_called_once()

    def test_s3_cache_get_does_not_exist(self):
        result = self.s3_cache.get('test-key2')

        self.assertEquals(result, None)
        self.assertFalse(self.mock_s3_client.download_fileobj.called)

    def test_s3_cache_get_exception(self):
        self.mock_s3_client.download_fileobj.side_effect = Exception(
            'Something bad happened')
        result = self.s3_cache.get('test-key')

        self.assertEquals(result, None)
        self.mock_s3_client.download_fileobj.assert_called_once()

    def test_s3_cache_delete_exists(self):
        result = self.s3_cache.delete('test-key')

        self.assertTrue(result)
        self.mock_s3_client.delete_objects.assert_called_once_with(
            Bucket='test-bucket',
            Delete={'Objects': [{'Key': 'test-prefix/test-key'}]}
        )

    def test_s3_cache_delete_does_not_exist(self):
        result = self.s3_cache.delete('test-key2')

        self.assertFalse(result)
        self.assertFalse(self.mock_s3_client.delete_objects.called)

    def test_s3_cache_delete_exception(self):
        self.mock_s3_client.delete_objects.side_effect = Exception(
            'Something bad happened')
        result = self.s3_cache.delete('test-key')

        self.assertFalse(result)
        self.mock_s3_client.delete_objects.assert_called_once()

    def test_s3_cache_add_exists(self):
        result = self.s3_cache.add('test-key', 'test-value')

        self.assertFalse(result)
        self.assertFalse(self.mock_s3_client.upload_fileobj.called)

    def test_s3_cache_add_does_not_exist(self):
        result = self.s3_cache.add('test-key2', 'test-value')

        self.assertTrue(result)
        self.mock_s3_client.upload_fileobj.assert_called_once()

        call_args = self.mock_s3_client.upload_fileobj.call_args_list[0][0]

        self.assertEquals(pickle.loads(call_args[0].getvalue()), 'test-value')
        self.assertEquals(call_args[1], 'test-bucket')
        self.assertEquals(call_args[2], 'test-prefix/test-key2')

    def test_s3_cache_add_exception(self):
        self.mock_s3_client.upload_fileobj.side_effect = Exception(
            'Something bad happened')
        result = self.s3_cache.add('test-key2', 'test-value')

        self.assertFalse(result)
        self.mock_s3_client.upload_fileobj.assert_called_once()