2019-01-15 18:53:27 -05:00
|
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
|
|
# or more contributor license agreements. See the NOTICE file
|
|
|
|
# distributed with this work for additional information
|
|
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
|
|
# to you under the Apache License, Version 2.0 (the
|
|
|
|
# "License"); you may not use this file except in compliance
|
|
|
|
# with the License. You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing,
|
|
|
|
# software distributed under the License is distributed on an
|
|
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
|
# KIND, either express or implied. See the License for the
|
|
|
|
# specific language governing permissions and limitations
|
|
|
|
# under the License.
|
2019-11-20 10:47:06 -05:00
|
|
|
# isort:skip_file
|
2016-11-10 02:08:22 -05:00
|
|
|
"""Unit tests for Superset Celery worker"""
|
2019-08-27 17:23:40 -04:00
|
|
|
import datetime
|
2020-03-03 12:52:20 -05:00
|
|
|
import io
|
2016-08-30 01:11:36 -04:00
|
|
|
import json
|
2020-03-03 12:52:20 -05:00
|
|
|
import logging
|
2016-08-30 01:11:36 -04:00
|
|
|
import subprocess
|
|
|
|
import time
|
|
|
|
import unittest
|
2019-08-27 17:23:40 -04:00
|
|
|
import unittest.mock as mock
|
2016-08-30 01:11:36 -04:00
|
|
|
|
2019-11-27 10:06:06 -05:00
|
|
|
import flask
|
2020-03-03 12:52:20 -05:00
|
|
|
import sqlalchemy
|
|
|
|
from contextlib2 import contextmanager
|
2019-11-27 10:06:06 -05:00
|
|
|
from flask import current_app
|
2020-03-03 12:52:20 -05:00
|
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
from sqlalchemy.pool import NullPool
|
2019-11-27 10:06:06 -05:00
|
|
|
|
|
|
|
from tests.test_app import app
|
2019-11-20 10:47:06 -05:00
|
|
|
from superset import db, sql_lab
|
2020-01-03 11:55:39 -05:00
|
|
|
from superset.result_set import SupersetResultSet
|
2019-08-27 17:23:40 -04:00
|
|
|
from superset.db_engine_specs.base import BaseEngineSpec
|
2019-11-27 10:06:06 -05:00
|
|
|
from superset.extensions import celery_app
|
2017-03-10 12:11:51 -05:00
|
|
|
from superset.models.helpers import QueryStatus
|
2017-04-04 23:15:19 -04:00
|
|
|
from superset.models.sql_lab import Query
|
2018-12-22 13:28:22 -05:00
|
|
|
from superset.sql_parse import ParsedQuery
|
2019-09-08 13:18:09 -04:00
|
|
|
from superset.utils.core import get_example_database
|
2016-08-30 01:11:36 -04:00
|
|
|
|
2019-10-18 17:44:27 -04:00
|
|
|
from .base_tests import SupersetTestCase
|
2017-11-07 23:23:40 -05:00
|
|
|
|
2020-03-03 12:52:20 -05:00
|
|
|
# Seconds to wait for a quick synchronous follow-up query (used by time.sleep).
CELERY_SHORT_SLEEP_TIME = 2
# Seconds to wait for an async Celery query to finish before asserting on it.
CELERY_SLEEP_TIME = 10
# Seconds to wait after dropping a table before the name is reused.
DROP_TABLE_SLEEP_TIME = 10
|
2016-08-30 01:11:36 -04:00
|
|
|
|
|
|
|
|
2016-11-10 02:08:22 -05:00
|
|
|
class UtilityFunctionTests(SupersetTestCase):
    # TODO(bkyryliuk): support more cases in CTA function.
    def test_create_table_as(self):
        """ParsedQuery.as_create_table wraps a SELECT in CREATE TABLE ... AS."""
        query = ParsedQuery("SELECT * FROM outer_space;")
        self.assertEqual(
            "CREATE TABLE tmp AS \nSELECT * FROM outer_space",
            query.as_create_table("tmp"),
        )
        self.assertEqual(
            "DROP TABLE IF EXISTS tmp;\n"
            "CREATE TABLE tmp AS \nSELECT * FROM outer_space",
            query.as_create_table("tmp", overwrite=True),
        )

        # The trailing semicolon is optional.
        query = ParsedQuery("SELECT * FROM outer_space")
        self.assertEqual(
            "CREATE TABLE tmp AS \nSELECT * FROM outer_space",
            query.as_create_table("tmp"),
        )

        # Newlines inside the query body are preserved verbatim.
        query = ParsedQuery(
            "SELECT * FROM planets WHERE\n" "Luke_Father = 'Darth Vader'"
        )
        self.assertEqual(
            "CREATE TABLE tmp AS \nSELECT * FROM planets WHERE\n"
            "Luke_Father = 'Darth Vader'",
            query.as_create_table("tmp"),
        )
|
2016-08-30 01:11:36 -04:00
|
|
|
|
|
|
|
|
2019-11-27 10:06:06 -05:00
|
|
|
class AppContextTests(SupersetTestCase):
    def test_in_app_context(self):
        """Celery tasks should always run with a Flask app context active."""

        @celery_app.task()
        def my_task():
            # current_app is only truthy when an app context is active.
            self.assertTrue(current_app)

        # Make sure we can call tasks with an app already setup
        my_task()

        # Make sure the app gets pushed onto the stack properly.
        # Pop OUTSIDE the try: previously, if pop() raised, the finally
        # block referenced the unbound name popped_app (NameError masking
        # the original failure).
        # NOTE(review): flask._app_ctx_stack is a private Flask API —
        # confirm it still exists on the Flask version in use.
        popped_app = flask._app_ctx_stack.pop()
        try:
            my_task()
        finally:
            flask._app_ctx_stack.push(popped_app)
|
|
|
|
|
|
|
|
|
2020-03-03 12:52:20 -05:00
|
|
|
# Schema name returned by the mocked get_cta_schema_name in the CTA config tests.
CTAS_SCHEMA_NAME = "sqllab_test_db"
|
|
|
|
|
|
|
|
|
2016-11-10 02:08:22 -05:00
|
|
|
class CeleryTestCase(SupersetTestCase):
    def get_query_by_name(self, sql):
        """Return the first Query record whose SQL text matches exactly."""
        session = db.session
        try:
            return session.query(Query).filter_by(sql=sql).first()
        finally:
            session.close()
|
|
|
|
|
|
|
|
def get_query_by_id(self, id):
|
|
|
|
session = db.session
|
2017-04-04 23:15:19 -04:00
|
|
|
query = session.query(Query).filter_by(id=id).first()
|
2016-08-30 01:11:36 -04:00
|
|
|
session.close()
|
|
|
|
return query
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def setUpClass(cls):
|
2019-11-20 10:47:06 -05:00
|
|
|
with app.app_context():
|
|
|
|
db.session.query(Query).delete()
|
|
|
|
db.session.commit()
|
2018-09-19 16:30:25 -04:00
|
|
|
|
2019-06-25 16:34:48 -04:00
|
|
|
def run_sql(
|
2019-09-23 12:09:12 -04:00
|
|
|
self, db_id, sql, client_id=None, cta=False, tmp_table="tmp", async_=False
|
2019-06-25 16:34:48 -04:00
|
|
|
):
|
2016-08-30 01:11:36 -04:00
|
|
|
self.login()
|
|
|
|
resp = self.client.post(
|
2019-06-25 16:34:48 -04:00
|
|
|
"/superset/sql_json/",
|
2019-09-23 12:09:12 -04:00
|
|
|
json=dict(
|
2016-10-21 19:55:37 -04:00
|
|
|
database_id=db_id,
|
2016-08-30 01:11:36 -04:00
|
|
|
sql=sql,
|
2018-08-28 20:40:45 -04:00
|
|
|
runAsync=async_,
|
2016-08-30 01:11:36 -04:00
|
|
|
select_as_cta=cta,
|
|
|
|
tmp_table_name=tmp_table,
|
2016-09-28 16:55:06 -04:00
|
|
|
client_id=client_id,
|
2016-08-30 01:11:36 -04:00
|
|
|
),
|
|
|
|
)
|
|
|
|
self.logout()
|
2019-06-01 12:21:35 -04:00
|
|
|
return json.loads(resp.data)
|
2016-08-30 01:11:36 -04:00
|
|
|
|
2017-03-06 19:20:55 -05:00
|
|
|
def test_run_sync_query_dont_exist(self):
|
2019-09-08 13:18:09 -04:00
|
|
|
main_db = get_example_database()
|
2016-10-21 19:55:37 -04:00
|
|
|
db_id = main_db.id
|
2019-06-25 16:34:48 -04:00
|
|
|
sql_dont_exist = "SELECT name FROM table_dont_exist"
|
2019-09-23 12:09:12 -04:00
|
|
|
result1 = self.run_sql(db_id, sql_dont_exist, "1", cta=True)
|
2019-06-25 16:34:48 -04:00
|
|
|
self.assertTrue("error" in result1)
|
2016-08-30 01:11:36 -04:00
|
|
|
|
2017-03-06 19:20:55 -05:00
|
|
|
def test_run_sync_query_cta(self):
|
2019-09-08 13:18:09 -04:00
|
|
|
main_db = get_example_database()
|
2017-03-06 19:20:55 -05:00
|
|
|
db_id = main_db.id
|
2019-06-25 16:34:48 -04:00
|
|
|
tmp_table_name = "tmp_async_22"
|
2018-09-19 16:30:25 -04:00
|
|
|
self.drop_table_if_exists(tmp_table_name, main_db)
|
2019-09-08 13:18:09 -04:00
|
|
|
name = "James"
|
|
|
|
sql_where = f"SELECT name FROM birth_names WHERE name='{name}' LIMIT 1"
|
2019-09-23 12:09:12 -04:00
|
|
|
result = self.run_sql(db_id, sql_where, "2", tmp_table=tmp_table_name, cta=True)
|
2019-06-25 16:34:48 -04:00
|
|
|
self.assertEqual(QueryStatus.SUCCESS, result["query"]["state"])
|
|
|
|
self.assertEqual([], result["data"])
|
|
|
|
self.assertEqual([], result["columns"])
|
|
|
|
query2 = self.get_query_by_id(result["query"]["serverId"])
|
2016-08-30 01:11:36 -04:00
|
|
|
|
|
|
|
# Check the data in the tmp table.
|
2020-03-03 12:52:20 -05:00
|
|
|
results = self.run_sql(db_id, query2.select_sql, "sdf2134")
|
|
|
|
self.assertEqual(results["status"], "success")
|
|
|
|
self.assertGreater(len(results["data"]), 0)
|
|
|
|
|
|
|
|
# cleanup tmp table
|
|
|
|
self.drop_table_if_exists(tmp_table_name, get_example_database())
|
2016-08-30 01:11:36 -04:00
|
|
|
|
2017-03-06 19:20:55 -05:00
|
|
|
def test_run_sync_query_cta_no_data(self):
|
2019-09-08 13:18:09 -04:00
|
|
|
main_db = get_example_database()
|
2017-03-06 19:20:55 -05:00
|
|
|
db_id = main_db.id
|
2019-09-08 13:18:09 -04:00
|
|
|
sql_empty_result = "SELECT * FROM birth_names WHERE name='random'"
|
2019-06-25 16:34:48 -04:00
|
|
|
result3 = self.run_sql(db_id, sql_empty_result, "3")
|
|
|
|
self.assertEqual(QueryStatus.SUCCESS, result3["query"]["state"])
|
|
|
|
self.assertEqual([], result3["data"])
|
|
|
|
self.assertEqual([], result3["columns"])
|
2016-08-30 01:11:36 -04:00
|
|
|
|
2019-06-25 16:34:48 -04:00
|
|
|
query3 = self.get_query_by_id(result3["query"]["serverId"])
|
2016-08-30 01:11:36 -04:00
|
|
|
self.assertEqual(QueryStatus.SUCCESS, query3.status)
|
|
|
|
|
2018-09-19 16:30:25 -04:00
|
|
|
def drop_table_if_exists(self, table_name, database=None):
|
|
|
|
"""Drop table if it exists, works on any DB"""
|
2019-06-25 16:34:48 -04:00
|
|
|
sql = "DROP TABLE {}".format(table_name)
|
2018-09-19 16:30:25 -04:00
|
|
|
db_id = database.id
|
|
|
|
if database:
|
|
|
|
database.allow_dml = True
|
|
|
|
db.session.flush()
|
|
|
|
return self.run_sql(db_id, sql)
|
|
|
|
|
2020-03-03 12:52:20 -05:00
|
|
|
@mock.patch(
|
|
|
|
"superset.views.core.get_cta_schema_name", lambda d, u, s, sql: CTAS_SCHEMA_NAME
|
|
|
|
)
|
|
|
|
def test_run_sync_query_cta_config(self):
|
|
|
|
main_db = get_example_database()
|
|
|
|
db_id = main_db.id
|
|
|
|
if main_db.backend == "sqlite":
|
|
|
|
# sqlite doesn't support schemas
|
|
|
|
return
|
|
|
|
tmp_table_name = "tmp_async_22"
|
|
|
|
expected_full_table_name = f"{CTAS_SCHEMA_NAME}.{tmp_table_name}"
|
|
|
|
self.drop_table_if_exists(expected_full_table_name, main_db)
|
|
|
|
name = "James"
|
|
|
|
sql_where = f"SELECT name FROM birth_names WHERE name='{name}'"
|
|
|
|
result = self.run_sql(
|
|
|
|
db_id, sql_where, "cid2", tmp_table=tmp_table_name, cta=True
|
|
|
|
)
|
|
|
|
|
|
|
|
self.assertEqual(QueryStatus.SUCCESS, result["query"]["state"])
|
|
|
|
self.assertEqual([], result["data"])
|
|
|
|
self.assertEqual([], result["columns"])
|
|
|
|
query = self.get_query_by_id(result["query"]["serverId"])
|
|
|
|
self.assertEqual(
|
|
|
|
f"CREATE TABLE {expected_full_table_name} AS \n"
|
|
|
|
"SELECT name FROM birth_names "
|
|
|
|
"WHERE name='James'",
|
|
|
|
query.executed_sql,
|
|
|
|
)
|
|
|
|
self.assertEqual(
|
|
|
|
"SELECT *\n" f"FROM {expected_full_table_name}", query.select_sql
|
|
|
|
)
|
|
|
|
time.sleep(CELERY_SHORT_SLEEP_TIME)
|
|
|
|
results = self.run_sql(db_id, query.select_sql)
|
|
|
|
self.assertEqual(results["status"], "success")
|
|
|
|
self.drop_table_if_exists(expected_full_table_name, get_example_database())
|
|
|
|
|
|
|
|
    @mock.patch(
        "superset.views.core.get_cta_schema_name", lambda d, u, s, sql: CTAS_SCHEMA_NAME
    )
    def test_run_async_query_cta_config(self):
        """With a CTA schema configured, async CTA lands in that schema."""
        main_db = get_example_database()
        db_id = main_db.id
        if main_db.backend == "sqlite":
            # sqlite doesn't support schemas
            return
        tmp_table_name = "sqllab_test_table_async_1"
        expected_full_table_name = f"{CTAS_SCHEMA_NAME}.{tmp_table_name}"
        self.drop_table_if_exists(expected_full_table_name, main_db)
        sql_where = "SELECT name FROM birth_names WHERE name='James' LIMIT 10"
        result = self.run_sql(
            db_id,
            sql_where,
            "cid3",
            async_=True,
            tmp_table="sqllab_test_table_async_1",
            cta=True,
        )
        # Close the session so the worker's writes are not shadowed by a
        # stale identity map, then give Celery time to finish the query.
        db.session.close()
        time.sleep(CELERY_SLEEP_TIME)

        query = self.get_query_by_id(result["query"]["serverId"])
        self.assertEqual(QueryStatus.SUCCESS, query.status)
        # Both the follow-up SELECT and the executed CTA must use the
        # schema-qualified table name.
        self.assertTrue(f"FROM {expected_full_table_name}" in query.select_sql)
        self.assertEqual(
            f"CREATE TABLE {expected_full_table_name} AS \n"
            "SELECT name FROM birth_names "
            "WHERE name='James' "
            "LIMIT 10",
            query.executed_sql,
        )
        self.drop_table_if_exists(expected_full_table_name, get_example_database())
|
|
|
|
|
|
|
|
    def test_run_async_cta_query(self):
        """Async CTA: table is created, Query row records the CTA metadata."""
        main_db = get_example_database()
        db_id = main_db.id

        table_name = "tmp_async_4"
        self.drop_table_if_exists(table_name, main_db)
        # Give the (async) DROP time to land before reusing the name.
        time.sleep(DROP_TABLE_SLEEP_TIME)

        sql_where = "SELECT name FROM birth_names WHERE name='James' LIMIT 10"
        result = self.run_sql(
            db_id, sql_where, "cid4", async_=True, tmp_table="tmp_async_4", cta=True
        )
        db.session.close()
        # The async submission may be in any non-failed state at this point.
        assert result["query"]["state"] in (
            QueryStatus.PENDING,
            QueryStatus.RUNNING,
            QueryStatus.SUCCESS,
        )

        # Wait for the Celery worker to finish the query.
        time.sleep(CELERY_SLEEP_TIME)

        query = self.get_query_by_id(result["query"]["serverId"])
        self.assertEqual(QueryStatus.SUCCESS, query.status)

        self.assertTrue(f"FROM {table_name}" in query.select_sql)

        self.assertEqual(
            f"CREATE TABLE {table_name} AS \n"
            "SELECT name FROM birth_names "
            "WHERE name='James' "
            "LIMIT 10",
            query.executed_sql,
        )
        self.assertEqual(sql_where, query.sql)
        # CTA responses return no rows of their own.
        self.assertEqual(0, query.rows)
        self.assertEqual(True, query.select_as_cta)
        self.assertEqual(True, query.select_as_cta_used)
|
2016-08-30 01:11:36 -04:00
|
|
|
|
2020-03-03 12:52:20 -05:00
|
|
|
    def test_run_async_cta_query_with_lower_limit(self):
        """Async CTA with an explicit LIMIT in the SQL: no server limit is set."""
        main_db = get_example_database()
        db_id = main_db.id
        tmp_table = "tmp_async_2"
        self.drop_table_if_exists(tmp_table, main_db)

        sql_where = "SELECT name FROM birth_names LIMIT 1"
        result = self.run_sql(
            db_id, sql_where, "id1", async_=True, tmp_table=tmp_table, cta=True
        )
        db.session.close()
        # The async submission may be in any non-failed state at this point.
        assert result["query"]["state"] in (
            QueryStatus.PENDING,
            QueryStatus.RUNNING,
            QueryStatus.SUCCESS,
        )

        # Wait for the Celery worker to finish the query.
        time.sleep(CELERY_SLEEP_TIME)

        query = self.get_query_by_id(result["query"]["serverId"])
        self.assertEqual(QueryStatus.SUCCESS, query.status)

        self.assertIn(f"FROM {tmp_table}", query.select_sql)
        self.assertEqual(
            f"CREATE TABLE {tmp_table} AS \n" "SELECT name FROM birth_names LIMIT 1",
            query.executed_sql,
        )
        self.assertEqual(sql_where, query.sql)
        # CTA responses return no rows, and the user-provided LIMIT means no
        # server-side limit is recorded on the Query row.
        self.assertEqual(0, query.rows)
        self.assertEqual(None, query.limit)
        self.assertEqual(True, query.select_as_cta)
        self.assertEqual(True, query.select_as_cta_used)
|
|
|
|
|
2019-08-27 17:23:40 -04:00
|
|
|
def test_default_data_serialization(self):
|
|
|
|
data = [("a", 4, 4.0, datetime.datetime(2019, 8, 18, 16, 39, 16, 660000))]
|
|
|
|
cursor_descr = (
|
|
|
|
("a", "string"),
|
|
|
|
("b", "int"),
|
|
|
|
("c", "float"),
|
|
|
|
("d", "datetime"),
|
|
|
|
)
|
|
|
|
db_engine_spec = BaseEngineSpec()
|
2020-01-03 11:55:39 -05:00
|
|
|
results = SupersetResultSet(data, cursor_descr, db_engine_spec)
|
2019-08-27 17:23:40 -04:00
|
|
|
|
|
|
|
with mock.patch.object(
|
|
|
|
db_engine_spec, "expand_data", wraps=db_engine_spec.expand_data
|
|
|
|
) as expand_data:
|
2020-03-03 12:52:20 -05:00
|
|
|
(
|
|
|
|
data,
|
|
|
|
selected_columns,
|
|
|
|
all_columns,
|
|
|
|
expanded_columns,
|
|
|
|
) = sql_lab._serialize_and_expand_data(results, db_engine_spec, False, True)
|
2019-08-27 17:23:40 -04:00
|
|
|
expand_data.assert_called_once()
|
|
|
|
|
|
|
|
self.assertIsInstance(data, list)
|
|
|
|
|
|
|
|
def test_new_data_serialization(self):
|
|
|
|
data = [("a", 4, 4.0, datetime.datetime(2019, 8, 18, 16, 39, 16, 660000))]
|
|
|
|
cursor_descr = (
|
|
|
|
("a", "string"),
|
|
|
|
("b", "int"),
|
|
|
|
("c", "float"),
|
|
|
|
("d", "datetime"),
|
|
|
|
)
|
|
|
|
db_engine_spec = BaseEngineSpec()
|
2020-01-03 11:55:39 -05:00
|
|
|
results = SupersetResultSet(data, cursor_descr, db_engine_spec)
|
2019-08-27 17:23:40 -04:00
|
|
|
|
|
|
|
with mock.patch.object(
|
|
|
|
db_engine_spec, "expand_data", wraps=db_engine_spec.expand_data
|
|
|
|
) as expand_data:
|
2020-03-03 12:52:20 -05:00
|
|
|
(
|
|
|
|
data,
|
|
|
|
selected_columns,
|
|
|
|
all_columns,
|
|
|
|
expanded_columns,
|
|
|
|
) = sql_lab._serialize_and_expand_data(results, db_engine_spec, True)
|
2019-08-27 17:23:40 -04:00
|
|
|
expand_data.assert_not_called()
|
|
|
|
|
|
|
|
self.assertIsInstance(data, bytes)
|
|
|
|
|
|
|
|
def test_default_payload_serialization(self):
|
|
|
|
use_new_deserialization = False
|
|
|
|
data = [("a", 4, 4.0, datetime.datetime(2019, 8, 18, 16, 39, 16, 660000))]
|
|
|
|
cursor_descr = (
|
|
|
|
("a", "string"),
|
|
|
|
("b", "int"),
|
|
|
|
("c", "float"),
|
|
|
|
("d", "datetime"),
|
|
|
|
)
|
|
|
|
db_engine_spec = BaseEngineSpec()
|
2020-01-03 11:55:39 -05:00
|
|
|
results = SupersetResultSet(data, cursor_descr, db_engine_spec)
|
2019-08-27 17:23:40 -04:00
|
|
|
query = {
|
|
|
|
"database_id": 1,
|
|
|
|
"sql": "SELECT * FROM birth_names LIMIT 100",
|
|
|
|
"status": QueryStatus.PENDING,
|
|
|
|
}
|
2020-03-03 12:52:20 -05:00
|
|
|
(
|
|
|
|
serialized_data,
|
|
|
|
selected_columns,
|
|
|
|
all_columns,
|
|
|
|
expanded_columns,
|
|
|
|
) = sql_lab._serialize_and_expand_data(
|
2020-01-03 11:55:39 -05:00
|
|
|
results, db_engine_spec, use_new_deserialization
|
2019-08-27 17:23:40 -04:00
|
|
|
)
|
|
|
|
payload = {
|
|
|
|
"query_id": 1,
|
|
|
|
"status": QueryStatus.SUCCESS,
|
|
|
|
"state": QueryStatus.SUCCESS,
|
|
|
|
"data": serialized_data,
|
|
|
|
"columns": all_columns,
|
|
|
|
"selected_columns": selected_columns,
|
|
|
|
"expanded_columns": expanded_columns,
|
|
|
|
"query": query,
|
|
|
|
}
|
|
|
|
|
|
|
|
serialized = sql_lab._serialize_payload(payload, use_new_deserialization)
|
|
|
|
self.assertIsInstance(serialized, str)
|
|
|
|
|
|
|
|
    def test_msgpack_payload_serialization(self):
        """With the new deserialization the full payload serializes to bytes."""
        use_new_deserialization = True
        data = [("a", 4, 4.0, datetime.datetime(2019, 8, 18, 16, 39, 16, 660000))]
        cursor_descr = (
            ("a", "string"),
            ("b", "int"),
            ("c", "float"),
            ("d", "datetime"),
        )
        db_engine_spec = BaseEngineSpec()
        results = SupersetResultSet(data, cursor_descr, db_engine_spec)
        query = {
            "database_id": 1,
            "sql": "SELECT * FROM birth_names LIMIT 100",
            "status": QueryStatus.PENDING,
        }
        (
            serialized_data,
            selected_columns,
            all_columns,
            expanded_columns,
        ) = sql_lab._serialize_and_expand_data(
            results, db_engine_spec, use_new_deserialization
        )
        payload = {
            "query_id": 1,
            "status": QueryStatus.SUCCESS,
            "state": QueryStatus.SUCCESS,
            "data": serialized_data,
            "columns": all_columns,
            "selected_columns": selected_columns,
            "expanded_columns": expanded_columns,
            "query": query,
        }

        # The new serialization path produces a binary payload, not JSON text.
        serialized = sql_lab._serialize_payload(payload, use_new_deserialization)
        self.assertIsInstance(serialized, bytes)
|
|
|
|
|
2017-03-24 12:23:51 -04:00
|
|
|
@staticmethod
|
|
|
|
def de_unicode_dict(d):
|
|
|
|
def str_if_basestring(o):
|
2019-01-19 17:27:18 -05:00
|
|
|
if isinstance(o, str):
|
2017-03-24 12:23:51 -04:00
|
|
|
return str(o)
|
|
|
|
return o
|
2019-06-25 16:34:48 -04:00
|
|
|
|
2017-03-24 12:23:51 -04:00
|
|
|
return {str_if_basestring(k): str_if_basestring(d[k]) for k in d}
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def dictify_list_of_dicts(cls, l, k):
|
|
|
|
return {str(o[k]): cls.de_unicode_dict(o) for o in l}
|
|
|
|
|
2016-08-30 01:11:36 -04:00
|
|
|
|
2019-06-25 16:34:48 -04:00
|
|
|
# Allow running this test module directly (outside the pytest runner).
if __name__ == "__main__":
    unittest.main()
|