"""Unit tests for Sql Lab""" from __future__ import absolute_import from __future__ import division from __future__ import print_function from __future__ import unicode_literals from datetime import datetime, timedelta import json import unittest from flask_appbuilder.security.sqla import models as ab_models from superset import db, models, utils, appbuilder, security, sm from .base_tests import SupersetTestCase class SqlLabTests(SupersetTestCase): """Testings for Sql Lab""" def __init__(self, *args, **kwargs): super(SqlLabTests, self).__init__(*args, **kwargs) gamma_sqllab = appbuilder.sm.find_role('gamma_sqllab') security.merge_perm(sm, 'database_access', self.get_main_database(db.session).perm) def run_some_queries(self): self.logout() db.session.query(models.Query).delete() db.session.commit() self.run_sql( "SELECT * FROM ab_user", client_id='client_id_1', user_name='admin') self.run_sql( "SELECT * FROM NO_TABLE", client_id='client_id_3', user_name='admin') self.run_sql( "SELECT * FROM ab_permission", client_id='client_id_2', user_name='gamma_sqllab') self.logout() def tearDown(self): db.session.query(models.Query).delete() db.session.commit() self.logout() def test_sql_json(self): self.login('admin') data = self.run_sql('SELECT * FROM ab_user', "1") self.assertLess(0, len(data['data'])) data = self.run_sql('SELECT * FROM unexistant_table', "2") self.assertLess(0, len(data['error'])) def test_sql_json_has_access(self): main_db = self.get_main_database(db.session) sm.add_permission_view_menu('database_access', main_db.perm) db.session.commit() main_db_permission_view = ( db.session.query(ab_models.PermissionView) .join(ab_models.ViewMenu) .filter(ab_models.ViewMenu.name == '[main].(id:1)') .first() ) astronaut = sm.add_role("Astronaut") sm.add_permission_role(astronaut, main_db_permission_view) # Astronaut role is Gamma + sqllab + main db permissions for perm in sm.find_role('Gamma').permissions: sm.add_permission_role(astronaut, perm) for perm in sm.find_role('sql_lab').permissions: sm.add_permission_role(astronaut, perm) gagarin = appbuilder.sm.find_user('gagarin') if not gagarin: appbuilder.sm.add_user( 'gagarin', 'Iurii', 'Gagarin', 'gagarin@cosmos.ussr', astronaut, password='general') data = self.run_sql('SELECT * FROM ab_user', "3", user_name='gagarin') db.session.query(models.Query).delete() db.session.commit() self.assertLess(0, len(data['data'])) def test_queries_endpoint(self): self.run_some_queries() # Not logged in, should error out resp = self.client.get('/superset/queries/0') self.assertEquals(403, resp.status_code) # Admin sees queries self.login('admin') data = self.get_json_resp('/superset/queries/0') self.assertEquals(2, len(data)) # Run 2 more queries self.run_sql("SELECT * FROM ab_user LIMIT 1", client_id='client_id_4') self.run_sql("SELECT * FROM ab_user LIMIT 2", client_id='client_id_5') self.login('admin') data = self.get_json_resp('/superset/queries/0') self.assertEquals(4, len(data)) now = datetime.now() + timedelta(days=1) query = db.session.query(models.Query).filter_by( sql='SELECT * FROM ab_user LIMIT 1').first() query.changed_on = now db.session.commit() data = self.get_json_resp( '/superset/queries/{}'.format( int(utils.datetime_to_epoch(now))-1000)) self.assertEquals(1, len(data)) self.logout() resp = self.client.get('/superset/queries/0') self.assertEquals(403, resp.status_code) def test_search_query_on_db_id(self): self.run_some_queries() self.login('admin') # Test search queries on database Id data = self.get_json_resp('/superset/search_queries?database_id=1') self.assertEquals(3, len(data)) db_ids = [k['dbId'] for k in data] self.assertEquals([1, 1, 1], db_ids) resp = self.get_resp('/superset/search_queries?database_id=-1') data = json.loads(resp) self.assertEquals(0, len(data)) def test_search_query_on_user(self): self.run_some_queries() self.login('admin') # Test search queries on user Id user = appbuilder.sm.find_user('admin') data = self.get_json_resp( '/superset/search_queries?user_id={}'.format(user.id)) self.assertEquals(2, len(data)) user_ids = {k['userId'] for k in data} self.assertEquals(set([user.id]), user_ids) user = appbuilder.sm.find_user('gamma_sqllab') resp = self.get_resp( '/superset/search_queries?user_id={}'.format(user.id)) data = json.loads(resp) self.assertEquals(1, len(data)) self.assertEquals(data[0]['userId'] , user.id) def test_search_query_on_status(self): self.run_some_queries() self.login('admin') # Test search queries on status resp = self.get_resp('/superset/search_queries?status=success') data = json.loads(resp) self.assertEquals(2, len(data)) states = [k['state'] for k in data] self.assertEquals(['success', 'success'], states) resp = self.get_resp('/superset/search_queries?status=failed') data = json.loads(resp) self.assertEquals(1, len(data)) self.assertEquals(data[0]['state'], 'failed') def test_search_query_on_text(self): self.run_some_queries() self.login('admin') url = '/superset/search_queries?search_text=permission' data = self.get_json_resp(url) self.assertEquals(1, len(data)) self.assertIn('permission', data[0]['sql']) def test_search_query_on_time(self): self.run_some_queries() self.login('admin') first_query_time = ( db.session.query(models.Query) .filter_by(sql='SELECT * FROM ab_user').one() ).start_time second_query_time = ( db.session.query(models.Query) .filter_by(sql='SELECT * FROM ab_permission').one() ).start_time # Test search queries on time filter from_time = 'from={}'.format(int(first_query_time)) to_time = 'to={}'.format(int(second_query_time)) params = [from_time, to_time] resp = self.get_resp('/superset/search_queries?'+'&'.join(params)) data = json.loads(resp) self.assertEquals(2, len(data)) for k in data: self.assertLess(int(first_query_time), k['startDttm']) self.assertLess(k['startDttm'], int(second_query_time)) def test_alias_duplicate(self): self.run_sql( "SELECT username as col, id as col, username FROM ab_user", client_id='2e2df3', user_name='admin', raise_on_error=True) if __name__ == '__main__': unittest.main()