Skip to content

Commit

Permalink
add_filters_from_pre_query ignores extraction functions
Browse files — browse the repository at this point in the history
  • Loading branch information
Mogball committed Dec 2, 2017
1 parent 51e51f6 commit 2361192
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 21 deletions.
38 changes: 24 additions & 14 deletions superset/connectors/druid/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,14 +874,7 @@ def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter):
for unused, row in df.iterrows():
fields = []
for dim in dimensions:
dim_value = dim
# Handle dimension specs by creating a filters
# based on its `outputName`
if isinstance(dim, dict):
dim_value = dim.get('outputName')
if not dim_value:
continue
f = Dimension(dim_value) == row[dim_value]
f = Dimension(dim) == row[dim]
fields.append(f)
if len(fields) > 1:
term = Filter(type='and', fields=fields)
Expand Down Expand Up @@ -1002,11 +995,10 @@ def run_query( # noqa / druid
elif (
not having_filters and
len(groupby) == 1 and
order_desc and
not isinstance(list(qry.get('dimensions'))[0], dict)
order_desc
):
dim = list(qry.get('dimensions'))[0]
logging.info('Running topn query for dimension [{}]'.format(dim))
logging.info('Running two-phase topn query for dimension [{}]'.format(dim))
if timeseries_limit_metric:
order_by = timeseries_limit_metric
else:
Expand All @@ -1017,9 +1009,14 @@ def run_query( # noqa / druid
pre_qry['threshold'] = min(row_limit,
timeseries_limit or row_limit)
pre_qry['metric'] = order_by
pre_qry['dimension'] = dim
if isinstance(dim, dict):
if 'dimension' in dim:
pre_qry['dimension'] = dim['dimension']
else:
pre_qry['dimension'] = dim
del pre_qry['dimensions']
client.topn(**pre_qry)
logging.info('Phase 1 Complete')
query_str += '// Two phase query\n// Phase 1\n'
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2)
Expand All @@ -1031,7 +1028,7 @@ def run_query( # noqa / druid
df = client.export_pandas()
qry['filter'] = self._add_filter_from_pre_query_data(
df,
qry['dimensions'],
[pre_qry['dimension']],
filters)
qry['threshold'] = timeseries_limit or 1000
if row_limit and granularity == 'all':
Expand All @@ -1040,6 +1037,7 @@ def run_query( # noqa / druid
del qry['dimensions']
qry['metric'] = list(qry['aggregations'].keys())[0]
client.topn(**qry)
logging.info('Phase 2 Complete')
elif len(groupby) > 0:
# If grouping on multiple fields or using a having filter
# we have to force a groupby query
Expand All @@ -1063,7 +1061,18 @@ def run_query( # noqa / druid
'direction': order_direction,
}],
}
pre_qry_dims = []
# Replace dimensions specs with their `dimension`
# values, and ignore those without
for dim in qry['dimensions']:
if isinstance(dim, dict):
if 'dimension' in dim:
pre_qry_dims.append(dim['dimension'])
else:
pre_qry_dims.append(dim)
pre_qry['dimensions'] = list(set(pre_qry_dims))
client.groupby(**pre_qry)
logging.info('Phase 1 Complete')
query_str += '// Two phase query\n// Phase 1\n'
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2)
Expand All @@ -1075,7 +1084,7 @@ def run_query( # noqa / druid
df = client.export_pandas()
qry['filter'] = self._add_filter_from_pre_query_data(
df,
qry['dimensions'],
pre_qry['dimensions'],
filters,
)
qry['limit_spec'] = None
Expand All @@ -1090,6 +1099,7 @@ def run_query( # noqa / druid
}],
}
client.groupby(**qry)
logging.info('Query Complete')
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2)
return query_str
Expand Down
2 changes: 2 additions & 0 deletions superset/connectors/druid/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ def pre_update(self, col):
raise ValueError('Dimension Spec must be a JSON object')
if 'outputName' not in dimension_spec:
raise ValueError('Dimension Spec does not contain `outputName`')
if 'dimension' not in dimension_spec:
raise ValueError('Dimension Spec is missing `dimension`')
# `outputName` should be the same as the `column_name`
if dimension_spec['outputName'] != col.column_name:
raise ValueError(
Expand Down
16 changes: 9 additions & 7 deletions tests/druid_func_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ def test_run_query_single_groupby(self):
self.assertIn('dimensions', client.groupby.call_args_list[0][1])
self.assertEqual(['col1'], client.groupby.call_args_list[0][1]['dimensions'])
# order_desc but timeseries and dimension spec
spec = {'spec': 1}
# calls topn with single dimension spec 'dimension'
spec = {'outputName': 'hello', 'dimension': 'matcho'}
spec_json = json.dumps(spec)
col3 = DruidColumn(column_name='col3', dimension_spec_json=spec_json)
ds.columns.append(col3)
Expand All @@ -224,13 +225,14 @@ def test_run_query_single_groupby(self):
client=client, order_desc=True, timeseries_limit=5,
filter=[], row_limit=100,
)
self.assertEqual(0, len(client.topn.call_args_list))
self.assertEqual(2, len(client.groupby.call_args_list))
self.assertEqual(2, len(client.topn.call_args_list))
self.assertEqual(0, len(client.groupby.call_args_list))
self.assertEqual(0, len(client.timeseries.call_args_list))
self.assertIn('dimensions', client.groupby.call_args_list[0][1])
self.assertIn('dimensions', client.groupby.call_args_list[1][1])
self.assertEqual([spec], client.groupby.call_args_list[0][1]['dimensions'])
self.assertEqual([spec], client.groupby.call_args_list[1][1]['dimensions'])
self.assertIn('dimension', client.topn.call_args_list[0][1])
self.assertIn('dimension', client.topn.call_args_list[1][1])
# uses dimension for pre query and full spec for final query
self.assertEqual('matcho', client.topn.call_args_list[0][1]['dimension'])
self.assertEqual(spec, client.topn.call_args_list[1][1]['dimension'])

def test_run_query_multiple_groupby(self):
client = Mock()
Expand Down

0 comments on commit 2361192

Please sign in to comment.