From 2361192ddd129905f9066bac1d63029b0486f2e6 Mon Sep 17 00:00:00 2001 From: Jeff Niu Date: Fri, 1 Dec 2017 15:05:03 -0800 Subject: [PATCH] add_filters_from_pre_query ignores extraction functions --- superset/connectors/druid/models.py | 38 ++++++++++++++++++----------- superset/connectors/druid/views.py | 2 ++ tests/druid_func_tests.py | 16 ++++++------ 3 files changed, 35 insertions(+), 21 deletions(-) diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py index bffc3cb6be3da..74c663e5cdb30 100644 --- a/superset/connectors/druid/models.py +++ b/superset/connectors/druid/models.py @@ -874,14 +874,7 @@ def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter): for unused, row in df.iterrows(): fields = [] for dim in dimensions: - dim_value = dim - # Handle dimension specs by creating a filters - # based on its `outputName` - if isinstance(dim, dict): - dim_value = dim.get('outputName') - if not dim_value: - continue - f = Dimension(dim_value) == row[dim_value] + f = Dimension(dim) == row[dim] fields.append(f) if len(fields) > 1: term = Filter(type='and', fields=fields) @@ -1002,11 +995,10 @@ def run_query( # noqa / druid elif ( not having_filters and len(groupby) == 1 and - order_desc and - not isinstance(list(qry.get('dimensions'))[0], dict) + order_desc ): dim = list(qry.get('dimensions'))[0] - logging.info('Running topn query for dimension [{}]'.format(dim)) + logging.info('Running two-phase topn query for dimension [{}]'.format(dim)) if timeseries_limit_metric: order_by = timeseries_limit_metric else: @@ -1017,9 +1009,14 @@ def run_query( # noqa / druid pre_qry['threshold'] = min(row_limit, timeseries_limit or row_limit) pre_qry['metric'] = order_by - pre_qry['dimension'] = dim + if isinstance(dim, dict): + if 'dimension' in dim: + pre_qry['dimension'] = dim['dimension'] + else: + pre_qry['dimension'] = dim del pre_qry['dimensions'] client.topn(**pre_qry) + logging.info('Phase 1 Complete') query_str += '// Two phase query\n// Phase 1\n' query_str += json.dumps( client.query_builder.last_query.query_dict, indent=2) @@ -1031,7 +1028,7 @@ def run_query( # noqa / druid df = client.export_pandas() qry['filter'] = self._add_filter_from_pre_query_data( df, - qry['dimensions'], + [pre_qry['dimension']], filters) qry['threshold'] = timeseries_limit or 1000 if row_limit and granularity == 'all': @@ -1040,6 +1037,7 @@ def run_query( # noqa / druid del qry['dimensions'] qry['metric'] = list(qry['aggregations'].keys())[0] client.topn(**qry) + logging.info('Phase 2 Complete') elif len(groupby) > 0: # If grouping on multiple fields or using a having filter # we have to force a groupby query @@ -1063,7 +1061,18 @@ def run_query( # noqa / druid 'direction': order_direction, }], } + pre_qry_dims = [] + # Replace dimensions specs with their `dimension` + # values, and ignore those without + for dim in qry['dimensions']: + if isinstance(dim, dict): + if 'dimension' in dim: + pre_qry_dims.append(dim['dimension']) + else: + pre_qry_dims.append(dim) + pre_qry['dimensions'] = list(set(pre_qry_dims)) client.groupby(**pre_qry) + logging.info('Phase 1 Complete') query_str += '// Two phase query\n// Phase 1\n' query_str += json.dumps( client.query_builder.last_query.query_dict, indent=2) @@ -1075,7 +1084,7 @@ def run_query( # noqa / druid df = client.export_pandas() qry['filter'] = self._add_filter_from_pre_query_data( df, - qry['dimensions'], + pre_qry['dimensions'], filters, ) qry['limit_spec'] = None @@ -1090,6 +1099,7 @@ def run_query( # noqa / druid }], } client.groupby(**qry) + logging.info('Query Complete') query_str += json.dumps( client.query_builder.last_query.query_dict, indent=2) return query_str diff --git a/superset/connectors/druid/views.py b/superset/connectors/druid/views.py index 252a6f0ba2023..0f8e79a7dd4df 100644 --- a/superset/connectors/druid/views.py +++ b/superset/connectors/druid/views.py @@ -74,6 +74,8 @@ def pre_update(self, col): raise ValueError('Dimension Spec must be a JSON object') if 'outputName' not in dimension_spec: raise ValueError('Dimension Spec does not contain `outputName`') + if 'dimension' not in dimension_spec: + raise ValueError('Dimension Spec is missing `dimension`') # `outputName` should be the same as the `column_name` if dimension_spec['outputName'] != col.column_name: raise ValueError( diff --git a/tests/druid_func_tests.py b/tests/druid_func_tests.py index ba1f497936e70..f8d799b59fc82 100644 --- a/tests/druid_func_tests.py +++ b/tests/druid_func_tests.py @@ -212,7 +212,8 @@ def test_run_query_single_groupby(self): self.assertIn('dimensions', client.groupby.call_args_list[0][1]) self.assertEqual(['col1'], client.groupby.call_args_list[0][1]['dimensions']) # order_desc but timeseries and dimension spec - spec = {'spec': 1} + # calls topn with single dimension spec 'dimension' + spec = {'outputName': 'hello', 'dimension': 'matcho'} spec_json = json.dumps(spec) col3 = DruidColumn(column_name='col3', dimension_spec_json=spec_json) ds.columns.append(col3) @@ -224,13 +225,14 @@ def test_run_query_single_groupby(self): client=client, order_desc=True, timeseries_limit=5, filter=[], row_limit=100, ) - self.assertEqual(0, len(client.topn.call_args_list)) - self.assertEqual(2, len(client.groupby.call_args_list)) + self.assertEqual(2, len(client.topn.call_args_list)) + self.assertEqual(0, len(client.groupby.call_args_list)) self.assertEqual(0, len(client.timeseries.call_args_list)) - self.assertIn('dimensions', client.groupby.call_args_list[0][1]) - self.assertIn('dimensions', client.groupby.call_args_list[1][1]) - self.assertEqual([spec], client.groupby.call_args_list[0][1]['dimensions']) - self.assertEqual([spec], client.groupby.call_args_list[1][1]['dimensions']) + self.assertIn('dimension', client.topn.call_args_list[0][1]) + self.assertIn('dimension', client.topn.call_args_list[1][1]) + # uses dimension for pre query and full spec for final query + self.assertEqual('matcho', client.topn.call_args_list[0][1]['dimension']) + self.assertEqual(spec, client.topn.call_args_list[1][1]['dimension']) def test_run_query_multiple_groupby(self): client = Mock()