2018-02-25 18:06:11 -05:00
|
|
|
# -*- coding: utf-8 -*-
|
2016-11-10 02:08:22 -05:00
|
|
|
"""Unit tests for Superset Celery worker"""
|
2016-08-30 01:11:36 -04:00
|
|
|
from __future__ import absolute_import
|
|
|
|
from __future__ import division
|
|
|
|
from __future__ import print_function
|
|
|
|
from __future__ import unicode_literals
|
|
|
|
|
|
|
|
import json
|
|
|
|
import os
|
|
|
|
import subprocess
|
|
|
|
import time
|
|
|
|
import unittest
|
|
|
|
|
|
|
|
import pandas as pd
|
2017-11-07 23:23:40 -05:00
|
|
|
from past.builtins import basestring
|
2016-08-30 01:11:36 -04:00
|
|
|
|
2018-03-27 19:46:02 -04:00
|
|
|
from superset import app, cli, dataframe, db, security_manager
|
2017-03-10 12:11:51 -05:00
|
|
|
from superset.models.helpers import QueryStatus
|
2017-04-04 23:15:19 -04:00
|
|
|
from superset.models.sql_lab import Query
|
2017-01-06 15:24:07 -05:00
|
|
|
from superset.sql_parse import SupersetQuery
|
2016-11-10 02:08:22 -05:00
|
|
|
from .base_tests import SupersetTestCase
|
2016-08-30 01:11:36 -04:00
|
|
|
|
2017-11-07 23:23:40 -05:00
|
|
|
|
2016-08-30 01:11:36 -04:00
|
|
|
# Filesystem root of the Superset installation; used below to locate the
# `bin/superset` CLI when spawning the Celery worker subprocess.
BASE_DIR = app.config.get('BASE_DIR')
|
|
|
|
|
|
|
|
|
|
|
|
class CeleryConfig(object):
    """Celery settings for the test worker.

    Both the broker and the result backend point at local sqlite files so
    the tests do not depend on an external message queue.
    """

    # Message broker: SQLAlchemy transport over a local sqlite file.
    BROKER_URL = 'sqla+sqlite:///' + app.config.get('SQL_CELERY_DB_FILE_PATH')
    # Task results land in a second, separate sqlite database.
    CELERY_RESULT_BACKEND = (
        'db+sqlite:///' + app.config.get('SQL_CELERY_RESULTS_DB_FILE_PATH'))
    # Ensure the worker imports (and thereby registers) the SQL Lab tasks.
    CELERY_IMPORTS = ('superset.sql_lab', )
    CELERY_ANNOTATIONS = {'sql_lab.add': {'rate_limit': '10/s'}}
    CONCURRENCY = 1
|
2017-11-10 20:52:34 -05:00
|
|
|
|
|
|
|
|
2016-08-30 01:11:36 -04:00
|
|
|
# Register the sqlite-backed Celery config with the app so that both the
# web process and the worker subprocess below share the same broker/backend.
app.config['CELERY_CONFIG'] = CeleryConfig
|
|
|
|
|
|
|
|
|
2016-11-10 02:08:22 -05:00
|
|
|
class UtilityFunctionTests(SupersetTestCase):
    """Tests for the SQL-parsing helpers used by SQL Lab."""

    # TODO(bkyryliuk): support more cases in CTA function.
    def test_create_table_as(self):
        """as_create_table() wraps a SELECT in CREATE TABLE ... AS."""
        expected = 'CREATE TABLE tmp AS \nSELECT * FROM outer_space'

        # A trailing semicolon is stripped before wrapping.
        query = SupersetQuery('SELECT * FROM outer_space;')
        self.assertEqual(expected, query.as_create_table('tmp'))
        # overwrite=True prepends a DROP TABLE IF EXISTS statement.
        self.assertEqual(
            'DROP TABLE IF EXISTS tmp;\n' + expected,
            query.as_create_table('tmp', overwrite=True))

        # now without a semicolon
        query = SupersetQuery('SELECT * FROM outer_space')
        self.assertEqual(expected, query.as_create_table('tmp'))

        # now a multi-line query
        multi_line_query = (
            'SELECT * FROM planets WHERE\n'
            "Luke_Father = 'Darth Vader'")
        query = SupersetQuery(multi_line_query)
        self.assertEqual(
            'CREATE TABLE tmp AS \n' + multi_line_query,
            query.as_create_table('tmp'),
        )
|
2016-08-30 01:11:36 -04:00
|
|
|
|
|
|
|
|
2016-11-10 02:08:22 -05:00
|
|
|
class CeleryTestCase(SupersetTestCase):
    """Integration tests that run SQL Lab queries through a real Celery worker.

    setUpClass spawns a worker subprocess against sqlite-backed broker and
    result databases; tearDownClass kills it by process-name pattern.
    """

    def __init__(self, *args, **kwargs):
        super(CeleryTestCase, self).__init__(*args, **kwargs)
        # Flask test client used by run_sql() to hit /superset/sql_json/.
        self.client = app.test_client()

    def get_query_by_name(self, sql):
        """Return the first Query row whose SQL text equals *sql* (or None)."""
        session = db.session
        query = session.query(Query).filter_by(sql=sql).first()
        session.close()
        return query

    def get_query_by_id(self, id):
        """Return the Query row with primary key *id* (or None)."""
        session = db.session
        query = session.query(Query).filter_by(id=id).first()
        session.close()
        return query

    @classmethod
    def setUpClass(cls):
        """Reset the Celery sqlite files, start a worker, and load fixtures."""
        # Remove stale broker/result files from earlier runs; a missing file
        # just logs a warning and is not an error.
        try:
            os.remove(app.config.get('SQL_CELERY_DB_FILE_PATH'))
        except OSError as e:
            app.logger.warn(str(e))
        try:
            os.remove(app.config.get('SQL_CELERY_RESULTS_DB_FILE_PATH'))
        except OSError as e:
            app.logger.warn(str(e))

        security_manager.sync_role_definitions()

        # Launch the worker in the background; it is reaped in tearDownClass.
        worker_command = BASE_DIR + '/bin/superset worker'
        subprocess.Popen(
            worker_command, shell=True, stdout=subprocess.PIPE)

        # Ensure an admin user exists for the login performed by run_sql().
        admin = security_manager.find_user('admin')
        if not admin:
            security_manager.add_user(
                'admin', 'admin', ' user', 'admin@fab.org',
                security_manager.find_role('Admin'),
                password='general')
        cli.load_examples(load_test_data=True)

    @classmethod
    def tearDownClass(cls):
        """Best-effort kill of worker processes matched by name pattern.

        NOTE(review): this kills ANY process whose command line matches
        'celeryd' or 'superset worker', not only the one spawned above.
        """
        subprocess.call(
            "ps auxww | grep 'celeryd' | awk '{print $2}' | xargs kill -9",
            shell=True,
        )
        subprocess.call(
            "ps auxww | grep 'superset worker' | awk '{print $2}' | xargs kill -9",
            shell=True,
        )

    def run_sql(self, db_id, sql, client_id, cta='false', tmp_table='tmp',
                async='false'):
        """POST *sql* to /superset/sql_json/ and return the decoded JSON body.

        The flag-like arguments (cta, async) are the string 'true'/'false'
        values the endpoint expects, not Python booleans.

        NOTE(review): `async` is a reserved word from Python 3.7 onward, so
        this parameter name restricts the module to older interpreters.
        """
        self.login()
        resp = self.client.post(
            '/superset/sql_json/',
            data=dict(
                database_id=db_id,
                sql=sql,
                async=async,
                select_as_cta=cta,
                tmp_table_name=tmp_table,
                client_id=client_id,
            ),
        )
        self.logout()
        return json.loads(resp.data.decode('utf-8'))

    def test_run_sync_query_dont_exist(self):
        """A query against a missing table reports an error in the payload."""
        main_db = self.get_main_database(db.session)
        db_id = main_db.id
        sql_dont_exist = 'SELECT name FROM table_dont_exist'
        result1 = self.run_sql(db_id, sql_dont_exist, '1', cta='true')
        self.assertTrue('error' in result1)

    def test_run_sync_query_cta(self):
        """Synchronous CTA: rows go to the tmp table, not into the payload."""
        main_db = self.get_main_database(db.session)
        db_id = main_db.id
        eng = main_db.get_sqla_engine()
        perm_name = 'can_sql_json'
        sql_where = (
            "SELECT name FROM ab_permission WHERE name='{}'".format(perm_name))
        result2 = self.run_sql(
            db_id, sql_where, '2', tmp_table='tmp_table_2', cta='true')
        self.assertEqual(QueryStatus.SUCCESS, result2['query']['state'])
        # CTA responses carry no inline data/columns.
        self.assertEqual([], result2['data'])
        self.assertEqual([], result2['columns'])
        query2 = self.get_query_by_id(result2['query']['serverId'])

        # Check the data in the tmp table.
        df2 = pd.read_sql_query(sql=query2.select_sql, con=eng)
        data2 = df2.to_dict(orient='records')
        self.assertEqual([{'name': perm_name}], data2)

    def test_run_sync_query_cta_no_data(self):
        """CTA over an empty result set still succeeds."""
        main_db = self.get_main_database(db.session)
        db_id = main_db.id
        # id=666 is assumed not to exist, so the SELECT returns no rows.
        sql_empty_result = 'SELECT * FROM ab_user WHERE id=666'
        result3 = self.run_sql(
            db_id, sql_empty_result, '3', tmp_table='tmp_table_3', cta='true')
        self.assertEqual(QueryStatus.SUCCESS, result3['query']['state'])
        self.assertEqual([], result3['data'])
        self.assertEqual([], result3['columns'])

        query3 = self.get_query_by_id(result3['query']['serverId'])
        self.assertEqual(QueryStatus.SUCCESS, query3.status)

    def test_run_async_query(self):
        """Async CTA: the worker executes the query; poll the Query row after.

        NOTE(review): the fixed time.sleep(1) is a race — on a slow machine
        the worker may not have finished; confirm acceptable flakiness.
        """
        main_db = self.get_main_database(db.session)
        eng = main_db.get_sqla_engine()
        sql_where = "SELECT name FROM ab_role WHERE name='Admin'"
        result = self.run_sql(
            main_db.id, sql_where, '4', async='true', tmp_table='tmp_async_1',
            cta='true')
        # Immediately after submission the query may be in any of these states.
        assert result['query']['state'] in (
            QueryStatus.PENDING, QueryStatus.RUNNING, QueryStatus.SUCCESS)

        # Give the worker time to pick up and finish the task.
        time.sleep(1)

        query = self.get_query_by_id(result['query']['serverId'])
        df = pd.read_sql_query(query.select_sql, con=eng)
        self.assertEqual(QueryStatus.SUCCESS, query.status)
        self.assertEqual([{'name': 'Admin'}], df.to_dict(orient='records'))
        self.assertEqual(QueryStatus.SUCCESS, query.status)
        # The generated select reads back from the CTA table with a limit.
        self.assertTrue('FROM tmp_async_1' in query.select_sql)
        self.assertTrue('LIMIT 666' in query.select_sql)
        self.assertEqual(
            'CREATE TABLE tmp_async_1 AS \nSELECT name FROM ab_role '
            "WHERE name='Admin'", query.executed_sql)
        self.assertEqual(sql_where, query.sql)
        self.assertEqual(0, query.rows)
        self.assertEqual(666, query.limit)
        self.assertEqual(False, query.limit_used)
        self.assertEqual(True, query.select_as_cta)
        self.assertEqual(True, query.select_as_cta_used)

    @staticmethod
    def de_unicode_dict(d):
        """Return a copy of *d* with basestring keys/values coerced to str."""
        def str_if_basestring(o):
            if isinstance(o, basestring):
                return str(o)
            return o
        return {str_if_basestring(k): str_if_basestring(d[k]) for k in d}

    @classmethod
    def dictify_list_of_dicts(cls, l, k):
        """Index a list of dicts by the value of key *k*, de-unicoding each."""
        return {str(o[k]): cls.de_unicode_dict(o) for o in l}

    def test_get_columns(self):
        """Column metadata inferred by SupersetDataFrame matches expectations."""
        main_db = self.get_main_database(db.session)
        df = main_db.get_df('SELECT * FROM multiformat_time_series', None)
        cdf = dataframe.SupersetDataFrame(df)

        # Index by name so the comparison is insensitive to column ordering.
        cols = self.dictify_list_of_dicts(cdf.columns, 'name')

        # sqlite reports different column types than other backends.
        if main_db.sqlalchemy_uri.startswith('sqlite'):
            self.assertEqual(self.dictify_list_of_dicts([
                {'is_date': True, 'type': 'STRING', 'name': 'ds',
                 'is_dim': False},
                {'is_date': True, 'type': 'STRING', 'name': 'ds2',
                 'is_dim': False},
                {'agg': 'sum', 'is_date': False, 'type': 'INT',
                 'name': 'epoch_ms', 'is_dim': False},
                {'agg': 'sum', 'is_date': False, 'type': 'INT',
                 'name': 'epoch_s', 'is_dim': False},
                {'is_date': True, 'type': 'STRING', 'name': 'string0',
                 'is_dim': False},
                {'is_date': False, 'type': 'STRING',
                 'name': 'string1', 'is_dim': True},
                {'is_date': True, 'type': 'STRING', 'name': 'string2',
                 'is_dim': False},
                {'is_date': False, 'type': 'STRING',
                 'name': 'string3', 'is_dim': True}], 'name'),
                cols,
            )
        else:
            self.assertEqual(self.dictify_list_of_dicts([
                {'is_date': True, 'type': 'DATETIME', 'name': 'ds',
                 'is_dim': False},
                {'is_date': True, 'type': 'DATETIME',
                 'name': 'ds2', 'is_dim': False},
                {'agg': 'sum', 'is_date': False, 'type': 'INT',
                 'name': 'epoch_ms', 'is_dim': False},
                {'agg': 'sum', 'is_date': False, 'type': 'INT',
                 'name': 'epoch_s', 'is_dim': False},
                {'is_date': True, 'type': 'STRING', 'name': 'string0',
                 'is_dim': False},
                {'is_date': False, 'type': 'STRING',
                 'name': 'string1', 'is_dim': True},
                {'is_date': True, 'type': 'STRING', 'name': 'string2',
                 'is_dim': False},
                {'is_date': False, 'type': 'STRING',
                 'name': 'string3', 'is_dim': True}], 'name'),
                cols,
            )
|
|
|
|
|
2016-08-30 01:11:36 -04:00
|
|
|
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()
|