diff --git a/caravel/models.py b/caravel/models.py index ffc44b287c..573de6352a 100644 --- a/caravel/models.py +++ b/caravel/models.py @@ -4,6 +4,7 @@ from __future__ import division from __future__ import print_function from __future__ import unicode_literals +from collections import OrderedDict import functools import json import logging @@ -1850,11 +1851,10 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable): conf.get('fields', []), conf.get('name', '')) - aggregations = { - m.metric_name: m.json_obj - for m in self.metrics - if m.metric_name in all_metrics - } + aggregations = OrderedDict() + for m in self.metrics: + if m.metric_name in all_metrics: + aggregations[m.metric_name] = m.json_obj rejected_metrics = [ m.metric_name for m in self.metrics @@ -1898,63 +1898,78 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable): client = self.cluster.get_pydruid_client() orig_filters = filters - if timeseries_limit and is_timeseries: - order_by = metrics[0] if metrics else self.metrics[0] - if timeseries_limit_metric: - order_by = timeseries_limit_metric - # Limit on the number of timeseries, doing a two-phases query - pre_qry = deepcopy(qry) - pre_qry['granularity'] = "all" - pre_qry['limit_spec'] = { - "type": "default", - "limit": timeseries_limit, - 'intervals': ( - inner_from_dttm.isoformat() + '/' + - inner_to_dttm.isoformat()), - "columns": [{ - "dimension": order_by, - "direction": "descending", - }], - } - client.groupby(**pre_qry) - query_str += "// Two phase query\n// Phase 1\n" - query_str += json.dumps( - client.query_builder.last_query.query_dict, indent=2) + "\n" - query_str += "//\nPhase 2 (built based on phase one's results)\n" - df = client.export_pandas() - if df is not None and not df.empty: - dims = qry['dimensions'] - filters = [] - for unused, row in df.iterrows(): - fields = [] - for dim in dims: - f = Dimension(dim) == row[dim] - fields.append(f) - if len(fields) > 1: - filt = Filter(type="and", fields=fields) - filters.append(filt) - elif fields: - filters.append(fields[0]) + if len(groupby) == 0: + del qry['dimensions'] + client.timeseries(**qry) + if len(groupby) == 1: + if not timeseries_limit: + timeseries_limit = 10000 + qry['threshold'] = timeseries_limit + qry['dimension'] = qry.get('dimensions')[0] + del qry['dimensions'] + qry['metric'] = list(qry['aggregations'].keys())[0] + client.topn(**qry) + elif len(groupby) > 1: + if timeseries_limit and is_timeseries: + order_by = metrics[0] if metrics else self.metrics[0] + if timeseries_limit_metric: + order_by = timeseries_limit_metric + # Limit on the number of timeseries, doing a two-phases query + pre_qry = deepcopy(qry) + pre_qry['granularity'] = "all" + pre_qry['limit_spec'] = { + "type": "default", + "limit": timeseries_limit, + 'intervals': ( + inner_from_dttm.isoformat() + '/' + + inner_to_dttm.isoformat()), + "columns": [{ + "dimension": order_by, + "direction": "descending", + }], + } + client.groupby(**pre_qry) + query_str += "// Two phase query\n// Phase 1\n" + query_str += json.dumps( + client.query_builder.last_query.query_dict, indent=2) + query_str += "\n" + query_str += ( + "//\nPhase 2 (built based on phase one's results)\n") + df = client.export_pandas() + if df is not None and not df.empty: + dims = qry['dimensions'] + filters = [] + for unused, row in df.iterrows(): + fields = [] + for dim in dims: + f = Dimension(dim) == row[dim] + fields.append(f) + if len(fields) > 1: + filt = Filter(type="and", fields=fields) + filters.append(filt) + elif fields: + filters.append(fields[0]) - if filters: - ff = Filter(type="or", fields=filters) - if not orig_filters: - qry['filter'] = ff - else: - qry['filter'] = Filter(type="and", fields=[ - ff, - orig_filters]) - qry['limit_spec'] = None - if row_limit: - qry['limit_spec'] = { - "type": "default", - "limit": row_limit, - "columns": [{ - "dimension": metrics[0] if metrics else self.metrics[0], - "direction": "descending", - }], - } - client.groupby(**qry) + if filters: + ff = Filter(type="or", fields=filters) + if not orig_filters: + qry['filter'] = ff + else: + qry['filter'] = Filter(type="and", fields=[ + ff, + orig_filters]) + qry['limit_spec'] = None + if row_limit: + qry['limit_spec'] = { + "type": "default", + "limit": row_limit, + "columns": [{ + "dimension": ( + metrics[0] if metrics else self.metrics[0]), + "direction": "descending", + }], + } + client.groupby(**qry) query_str += json.dumps( client.query_builder.last_query.query_dict, indent=2) df = client.export_pandas() diff --git a/caravel/views.py b/caravel/views.py index 4f4d9cc93f..8a8d39eff2 100755 --- a/caravel/views.py +++ b/caravel/views.py @@ -1250,6 +1250,7 @@ class Caravel(BaseCaravelView): datasource_id=datasource_id, args=request.args) except Exception as e: + logging.exception(e) return json_error_response(utils.error_msg_from_exception(e)) if not self.datasource_access(viz_obj.datasource): @@ -1263,6 +1264,7 @@ class Caravel(BaseCaravelView): try: payload = viz_obj.get_json() except Exception as e: + logging.exception(e) return json_error_response(utils.error_msg_from_exception(e)) return Response( diff --git a/tests/druid_tests.py b/tests/druid_tests.py index 0a9dee4ba9..9d85c7c2cc 100644 --- a/tests/druid_tests.py +++ b/tests/druid_tests.py @@ -113,10 +113,11 @@ class DruidTests(CaravelTestCase): instance.query_dict = {} instance.query_builder.last_query.query_dict = {} - resp = self.client.get('/caravel/explore/druid/{}/'.format( + resp = self.get_resp('/caravel/explore/druid/{}/'.format( datasource_id)) - assert "[test_cluster].[test_datasource]" in resp.data.decode('utf-8') + assert "[test_cluster].[test_datasource]" in resp + # One groupby url = ( '/caravel/explore_json/druid/{}/?viz_type=table&granularity=one+day&' 'druid_time_origin=&since=7+days+ago&until=now&row_limit=5000&' @@ -125,8 +126,35 @@ class DruidTests(CaravelTestCase): 'action=&datasource_name=test_datasource&datasource_id={}&' 'datasource_type=druid&previous_viz_type=table&' 'force=true'.format(datasource_id, datasource_id)) - resp = self.get_resp(url) - assert "Canada" in resp + resp = self.get_json_resp(url) + self.assertEqual("Canada", resp['data']['records'][0]['name']) + + # two groupby + url = ( + '/caravel/explore_json/druid/{}/?viz_type=table&granularity=one+day&' + 'druid_time_origin=&since=7+days+ago&until=now&row_limit=5000&' + 'include_search=false&metrics=count&groupby=name&' + 'flt_col_0=dim1&groupby=second&' + 'flt_op_0=in&flt_eq_0=&slice_id=&slice_name=&collapsed_fieldsets=&' + 'action=&datasource_name=test_datasource&datasource_id={}&' + 'datasource_type=druid&previous_viz_type=table&' + 'force=true'.format(datasource_id, datasource_id)) + resp = self.get_json_resp(url) + self.assertEqual("Canada", resp['data']['records'][0]['name']) + + # no groupby + url = ( + '/caravel/explore_json/druid/{}/?viz_type=table&granularity=one+day&' + 'druid_time_origin=&since=7+days+ago&until=now&row_limit=5000&' + 'include_search=false&metrics=count&' + 'flt_col_0=dim1&' + 'flt_op_0=in&flt_eq_0=&slice_id=&slice_name=&collapsed_fieldsets=&' + 'action=&datasource_name=test_datasource&datasource_id={}&' + 'datasource_type=druid&previous_viz_type=table&' + 'force=true'.format(datasource_id, datasource_id)) + resp = self.get_json_resp(url) + self.assertEqual( + "2012-01-01T00:00:00", resp['data']['records'][0]['timestamp']) def test_druid_sync_from_config(self): CLUSTER_NAME = 'new_druid'