"""Unit tests for Superset""" from __future__ import absolute_import from __future__ import division from __future__ import print_function from __future__ import unicode_literals import logging import json import os import unittest from flask_appbuilder.security.sqla import models as ab_models from superset import app, cli, db, models, appbuilder, sm from superset.security import sync_role_definitions os.environ['SUPERSET_CONFIG'] = 'tests.superset_test_config' BASE_DIR = app.config.get("BASE_DIR") class SupersetTestCase(unittest.TestCase): requires_examples = False examples_loaded = False def __init__(self, *args, **kwargs): if ( self.requires_examples and not os.environ.get('SOLO_TEST') and not os.environ.get('examples_loaded') ): logging.info("Loading examples") cli.load_examples(load_test_data=True) logging.info("Done loading examples") sync_role_definitions() os.environ['examples_loaded'] = '1' else: sync_role_definitions() super(SupersetTestCase, self).__init__(*args, **kwargs) self.client = app.test_client() self.maxDiff = None gamma_sqllab = sm.add_role("gamma_sqllab") for perm in sm.find_role('Gamma').permissions: sm.add_permission_role(gamma_sqllab, perm) for perm in sm.find_role('sql_lab').permissions: sm.add_permission_role(gamma_sqllab, perm) admin = appbuilder.sm.find_user('admin') if not admin: appbuilder.sm.add_user( 'admin', 'admin', ' user', 'admin@fab.org', appbuilder.sm.find_role('Admin'), password='general') gamma = appbuilder.sm.find_user('gamma') if not gamma: appbuilder.sm.add_user( 'gamma', 'gamma', 'user', 'gamma@fab.org', appbuilder.sm.find_role('Gamma'), password='general') gamma_sqllab = appbuilder.sm.find_user('gamma_sqllab') if not gamma_sqllab: gamma_sqllab = appbuilder.sm.add_user( 'gamma_sqllab', 'gamma_sqllab', 'user', 'gamma_sqllab@fab.org', appbuilder.sm.find_role('gamma_sqllab'), password='general') alpha = appbuilder.sm.find_user('alpha') if not alpha: appbuilder.sm.add_user( 'alpha', 'alpha', 'user', 'alpha@fab.org', appbuilder.sm.find_role('Alpha'), password='general') # create druid cluster and druid datasources session = db.session cluster = session.query(models.DruidCluster).filter_by( cluster_name="druid_test").first() if not cluster: cluster = models.DruidCluster(cluster_name="druid_test") session.add(cluster) session.commit() druid_datasource1 = models.DruidDatasource( datasource_name='druid_ds_1', cluster_name='druid_test' ) session.add(druid_datasource1) druid_datasource2 = models.DruidDatasource( datasource_name='druid_ds_2', cluster_name='druid_test' ) session.add(druid_datasource2) session.commit() def get_or_create(self, cls, criteria, session): obj = session.query(cls).filter_by(**criteria).first() if not obj: obj = cls(**criteria) return obj def login(self, username='admin', password='general'): resp = self.get_resp( '/login/', data=dict(username=username, password=password)) self.assertIn('Welcome', resp) def get_query_by_sql(self, sql): session = db.create_scoped_session() query = session.query(models.Query).filter_by(sql=sql).first() session.close() return query def get_latest_query(self, sql): session = db.create_scoped_session() query = ( session.query(models.Query) .order_by(models.Query.id.desc()) .first() ) session.close() return query def get_slice(self, slice_name, session): slc = ( session.query(models.Slice) .filter_by(slice_name=slice_name) .one() ) session.expunge_all() return slc def get_table_by_name(self, name): return db.session.query(models.SqlaTable).filter_by( table_name=name).first() def get_druid_ds_by_name(self, name): return db.session.query(models.DruidDatasource).filter_by( datasource_name=name).first() def get_resp( self, url, data=None, follow_redirects=True, raise_on_error=True): """Shortcut to get the parsed results while following redirects""" if data: resp = self.client.post( url, data=data, follow_redirects=follow_redirects) else: resp = self.client.get(url, follow_redirects=follow_redirects) if raise_on_error and resp.status_code > 400: raise Exception( "http request failed with code {}".format(resp.status_code)) return resp.data.decode('utf-8') def get_json_resp( self, url, data=None, follow_redirects=True, raise_on_error=True): """Shortcut to get the parsed results while following redirects""" resp = self.get_resp(url, data, follow_redirects, raise_on_error) return json.loads(resp) def get_main_database(self, session): return ( db.session.query(models.Database) .filter_by(database_name='main') .first() ) def get_access_requests(self, username, ds_type, ds_id): DAR = models.DatasourceAccessRequest return ( db.session.query(DAR) .filter( DAR.created_by == sm.find_user(username=username), DAR.datasource_type == ds_type, DAR.datasource_id == ds_id, ) .first() ) def logout(self): self.client.get('/logout/', follow_redirects=True) def grant_public_access_to_table(self, table): public_role = appbuilder.sm.find_role('Public') perms = db.session.query(ab_models.PermissionView).all() for perm in perms: if (perm.permission.name == 'datasource_access' and perm.view_menu and table.perm in perm.view_menu.name): appbuilder.sm.add_permission_role(public_role, perm) def revoke_public_access_to_table(self, table): public_role = appbuilder.sm.find_role('Public') perms = db.session.query(ab_models.PermissionView).all() for perm in perms: if (perm.permission.name == 'datasource_access' and perm.view_menu and table.perm in perm.view_menu.name): appbuilder.sm.del_permission_role(public_role, perm) def run_sql(self, sql, client_id, user_name=None, raise_on_error=False): if user_name: self.logout() self.login(username=(user_name if user_name else 'admin')) dbid = self.get_main_database(db.session).id resp = self.get_json_resp( '/superset/sql_json/', raise_on_error=False, data=dict(database_id=dbid, sql=sql, select_as_create_as=False, client_id=client_id), ) if raise_on_error and 'error' in resp: raise Exception("run_sql failed") return resp def test_gamma_permissions(self): def assert_can_read(view_menu): self.assertIn(('can_show', view_menu), gamma_perm_set) self.assertIn(('can_list', view_menu), gamma_perm_set) def assert_can_write(view_menu): self.assertIn(('can_add', view_menu), gamma_perm_set) self.assertIn(('can_download', view_menu), gamma_perm_set) self.assertIn(('can_delete', view_menu), gamma_perm_set) self.assertIn(('can_edit', view_menu), gamma_perm_set) def assert_cannot_write(view_menu): self.assertNotIn(('can_add', view_menu), gamma_perm_set) self.assertNotIn(('can_download', view_menu), gamma_perm_set) self.assertNotIn(('can_delete', view_menu), gamma_perm_set) self.assertNotIn(('can_edit', view_menu), gamma_perm_set) self.assertNotIn(('can_save', view_menu), gamma_perm_set) def assert_can_all(view_menu): assert_can_read(view_menu) assert_can_write(view_menu) gamma_perm_set = set() for perm in sm.find_role('Gamma').permissions: gamma_perm_set.add((perm.permission.name, perm.view_menu.name)) # check read only perms assert_can_read('TableModelView') assert_cannot_write('DruidColumnInlineView') # make sure that user can create slices and dashboards assert_can_all('SliceModelView') assert_can_all('DashboardModelView') self.assertIn(('can_add_slices', 'Superset'), gamma_perm_set) self.assertIn(('can_copy_dash', 'Superset'), gamma_perm_set) self.assertIn(('can_activity_per_day', 'Superset'), gamma_perm_set) self.assertIn(('can_created_dashboards', 'Superset'), gamma_perm_set) self.assertIn(('can_created_slices', 'Superset'), gamma_perm_set) self.assertIn(('can_csv', 'Superset'), gamma_perm_set) self.assertIn(('can_dashboard', 'Superset'), gamma_perm_set) self.assertIn(('can_explore', 'Superset'), gamma_perm_set) self.assertIn(('can_explore_json', 'Superset'), gamma_perm_set) self.assertIn(('can_fave_dashboards', 'Superset'), gamma_perm_set) self.assertIn(('can_fave_slices', 'Superset'), gamma_perm_set) self.assertIn(('can_save_dash', 'Superset'), gamma_perm_set) self.assertIn(('can_slice', 'Superset'), gamma_perm_set) self.assertIn(('can_update_explore', 'Superset'), gamma_perm_set)