[druid] optimize Druid queries where possible (#1517)

* [druid] optimize Druid queries where possible

Trying to use timeseries and topn where possible, falling back on the
two-phase groupby only where needed (sketched below the header)

* Fixing py3 bug
Maxime Beauchemin 2016-11-02 11:25:33 -07:00 committed by GitHub
parent cdf4dd0302
commit 1b124bfb87
3 changed files with 110 additions and 65 deletions
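
In short, the change dispatches on the number of groupby dimensions: zero dimensions becomes a Druid timeseries query, one dimension becomes a topn, and only two or more dimensions fall back on the (optionally two-phase) groupby. A minimal sketch of that dispatch, assuming a pydruid client and a pre-built `qry` dict of Druid query arguments; the `run_druid_query` helper, its signature, and the broker URL below are illustrative, not the actual Caravel code:

# Minimal sketch of the dispatch idea; names here are illustrative.
from pydruid.client import PyDruid

def run_druid_query(client, qry, groupby, timeseries_limit=None):
    """Pick the cheapest Druid query type for the requested grouping."""
    qry = dict(qry)
    if len(groupby) == 0:
        # No dimensions: a plain timeseries query is enough.
        qry.pop('dimensions', None)
        return client.timeseries(**qry)
    if len(groupby) == 1:
        # One dimension: topn applies the limit natively, no second phase.
        qry['threshold'] = timeseries_limit or 10000
        qry['dimension'] = qry.pop('dimensions')[0]
        qry['metric'] = list(qry['aggregations'].keys())[0]
        return client.topn(**qry)
    # Two or more dimensions: fall back on groupby (two-phase when limited).
    return client.groupby(**qry)

# Example usage (hypothetical broker URL; qry/groupby built elsewhere):
# client = PyDruid('http://localhost:8082', 'druid/v2')
# run_druid_query(client, qry, groupby=['country'])
# df = client.export_pandas()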


@@ -4,6 +4,7 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
+from collections import OrderedDict
 import functools
 import json
 import logging
@@ -1850,11 +1851,10 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
                conf.get('fields', []),
                conf.get('name', ''))
-        aggregations = {
-            m.metric_name: m.json_obj
-            for m in self.metrics
-            if m.metric_name in all_metrics
-        }
+        aggregations = OrderedDict()
+        for m in self.metrics:
+            if m.metric_name in all_metrics:
+                aggregations[m.metric_name] = m.json_obj
         rejected_metrics = [
             m.metric_name for m in self.metrics
@@ -1898,63 +1898,78 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
         client = self.cluster.get_pydruid_client()
         orig_filters = filters
-        if timeseries_limit and is_timeseries:
-            order_by = metrics[0] if metrics else self.metrics[0]
-            if timeseries_limit_metric:
-                order_by = timeseries_limit_metric
-            # Limit on the number of timeseries, doing a two-phases query
-            pre_qry = deepcopy(qry)
-            pre_qry['granularity'] = "all"
-            pre_qry['limit_spec'] = {
-                "type": "default",
-                "limit": timeseries_limit,
-                'intervals': (
-                    inner_from_dttm.isoformat() + '/' +
-                    inner_to_dttm.isoformat()),
-                "columns": [{
-                    "dimension": order_by,
-                    "direction": "descending",
-                }],
-            }
-            client.groupby(**pre_qry)
-            query_str += "// Two phase query\n// Phase 1\n"
-            query_str += json.dumps(
-                client.query_builder.last_query.query_dict, indent=2) + "\n"
-            query_str += "//\nPhase 2 (built based on phase one's results)\n"
-            df = client.export_pandas()
-            if df is not None and not df.empty:
-                dims = qry['dimensions']
-                filters = []
-                for unused, row in df.iterrows():
-                    fields = []
-                    for dim in dims:
-                        f = Dimension(dim) == row[dim]
-                        fields.append(f)
-                    if len(fields) > 1:
-                        filt = Filter(type="and", fields=fields)
-                        filters.append(filt)
-                    elif fields:
-                        filters.append(fields[0])
+        if len(groupby) == 0:
+            del qry['dimensions']
+            client.timeseries(**qry)
+        if len(groupby) == 1:
+            if not timeseries_limit:
+                timeseries_limit = 10000
+            qry['threshold'] = timeseries_limit
+            qry['dimension'] = qry.get('dimensions')[0]
+            del qry['dimensions']
+            qry['metric'] = list(qry['aggregations'].keys())[0]
+            client.topn(**qry)
+        elif len(groupby) > 1:
+            if timeseries_limit and is_timeseries:
+                order_by = metrics[0] if metrics else self.metrics[0]
+                if timeseries_limit_metric:
+                    order_by = timeseries_limit_metric
+                # Limit on the number of timeseries, doing a two-phases query
+                pre_qry = deepcopy(qry)
+                pre_qry['granularity'] = "all"
+                pre_qry['limit_spec'] = {
+                    "type": "default",
+                    "limit": timeseries_limit,
+                    'intervals': (
+                        inner_from_dttm.isoformat() + '/' +
+                        inner_to_dttm.isoformat()),
+                    "columns": [{
+                        "dimension": order_by,
+                        "direction": "descending",
+                    }],
+                }
+                client.groupby(**pre_qry)
+                query_str += "// Two phase query\n// Phase 1\n"
+                query_str += json.dumps(
+                    client.query_builder.last_query.query_dict, indent=2)
+                query_str += "\n"
+                query_str += (
+                    "//\nPhase 2 (built based on phase one's results)\n")
+                df = client.export_pandas()
+                if df is not None and not df.empty:
+                    dims = qry['dimensions']
+                    filters = []
+                    for unused, row in df.iterrows():
+                        fields = []
+                        for dim in dims:
+                            f = Dimension(dim) == row[dim]
+                            fields.append(f)
+                        if len(fields) > 1:
+                            filt = Filter(type="and", fields=fields)
+                            filters.append(filt)
+                        elif fields:
+                            filters.append(fields[0])
 
                 if filters:
                     ff = Filter(type="or", fields=filters)
                     if not orig_filters:
                         qry['filter'] = ff
                     else:
                         qry['filter'] = Filter(type="and", fields=[
                             ff,
                             orig_filters])
                 qry['limit_spec'] = None
-        if row_limit:
-            qry['limit_spec'] = {
-                "type": "default",
-                "limit": row_limit,
-                "columns": [{
-                    "dimension": metrics[0] if metrics else self.metrics[0],
-                    "direction": "descending",
-                }],
-            }
-        client.groupby(**qry)
+            if row_limit:
+                qry['limit_spec'] = {
+                    "type": "default",
+                    "limit": row_limit,
+                    "columns": [{
+                        "dimension": (
+                            metrics[0] if metrics else self.metrics[0]),
+                        "direction": "descending",
+                    }],
+                }
+            client.groupby(**qry)
         query_str += json.dumps(
             client.query_builder.last_query.query_dict, indent=2)
         df = client.export_pandas()


@@ -1250,6 +1250,7 @@ class Caravel(BaseCaravelView):
                 datasource_id=datasource_id,
                 args=request.args)
         except Exception as e:
+            logging.exception(e)
             return json_error_response(utils.error_msg_from_exception(e))
 
         if not self.datasource_access(viz_obj.datasource):
@@ -1263,6 +1264,7 @@ class Caravel(BaseCaravelView):
         try:
             payload = viz_obj.get_json()
         except Exception as e:
+            logging.exception(e)
             return json_error_response(utils.error_msg_from_exception(e))
 
         return Response(


@@ -113,10 +113,11 @@ class DruidTests(CaravelTestCase):
         instance.query_dict = {}
         instance.query_builder.last_query.query_dict = {}
 
-        resp = self.client.get('/caravel/explore/druid/{}/'.format(
+        resp = self.get_resp('/caravel/explore/druid/{}/'.format(
             datasource_id))
-        assert "[test_cluster].[test_datasource]" in resp.data.decode('utf-8')
+        assert "[test_cluster].[test_datasource]" in resp
 
+        # One groupby
         url = (
             '/caravel/explore_json/druid/{}/?viz_type=table&granularity=one+day&'
             'druid_time_origin=&since=7+days+ago&until=now&row_limit=5000&'
@@ -125,8 +126,35 @@ class DruidTests(CaravelTestCase):
             'action=&datasource_name=test_datasource&datasource_id={}&'
             'datasource_type=druid&previous_viz_type=table&'
             'force=true'.format(datasource_id, datasource_id))
-        resp = self.get_resp(url)
-        assert "Canada" in resp
+        resp = self.get_json_resp(url)
+        self.assertEqual("Canada", resp['data']['records'][0]['name'])
+
+        # two groupby
+        url = (
+            '/caravel/explore_json/druid/{}/?viz_type=table&granularity=one+day&'
+            'druid_time_origin=&since=7+days+ago&until=now&row_limit=5000&'
+            'include_search=false&metrics=count&groupby=name&'
+            'flt_col_0=dim1&groupby=second&'
+            'flt_op_0=in&flt_eq_0=&slice_id=&slice_name=&collapsed_fieldsets=&'
+            'action=&datasource_name=test_datasource&datasource_id={}&'
+            'datasource_type=druid&previous_viz_type=table&'
+            'force=true'.format(datasource_id, datasource_id))
+        resp = self.get_json_resp(url)
+        self.assertEqual("Canada", resp['data']['records'][0]['name'])
+
+        # no groupby
+        url = (
+            '/caravel/explore_json/druid/{}/?viz_type=table&granularity=one+day&'
+            'druid_time_origin=&since=7+days+ago&until=now&row_limit=5000&'
+            'include_search=false&metrics=count&'
+            'flt_col_0=dim1&'
+            'flt_op_0=in&flt_eq_0=&slice_id=&slice_name=&collapsed_fieldsets=&'
+            'action=&datasource_name=test_datasource&datasource_id={}&'
+            'datasource_type=druid&previous_viz_type=table&'
+            'force=true'.format(datasource_id, datasource_id))
+        resp = self.get_json_resp(url)
+        self.assertEqual(
+            "2012-01-01T00:00:00", resp['data']['records'][0]['timestamp'])
 
     def test_druid_sync_from_config(self):
         CLUSTER_NAME = 'new_druid'