Update PandasConnector for order_desc - see apache#3302
rhunwicks committed Sep 20, 2017
1 parent 1520c9f commit 5960f49
Showing 2 changed files with 97 additions and 15 deletions.
21 changes: 12 additions & 9 deletions contrib/connectors/pandas/models.py
@@ -352,13 +352,15 @@ def process_dataframe(
             orderby=None,
             extras=None,
             columns=None,
-            form_data=None):
+            form_data=None,
+            order_desc=True):
         """Querying any dataframe table from this common interface"""
         if orderby:
             orderby, ascending = map(list, zip(*orderby))
         else:
             orderby = []
             ascending = []
+        metric_order_asc = not order_desc
         filter = filter or []
         query_str = 'df'

@@ -415,19 +417,21 @@ def process_dataframe(
             df = (df[df.set_index(groupby).index.isin(
                 df.groupby(groupby)
                 .aggregate(aggregates)
-                .sort_values(metric.source, ascending=False)
+                .sort_values(metric.source, ascending=metric_order_asc)
                 .iloc[:timeseries_limit].index)])

             query_str += ('[df.set_index({groupby}).index.isin('
                           'df.groupby({groupby})'
                           '.aggregate({aggregates})'
-                          ".sort_values('{metric.source}', ascending=False)"
+                          ".sort_values('{metric.source}', "
+                          'ascending={metric_order_asc})'
                           '.iloc[:{timeseries_limit}].index)]').format(
                               groupby=groupby,
                               timeseries_limit_metric=timeseries_limit_metric,
                               timeseries_limit=timeseries_limit,
                               aggregates=aggregates,
-                              metric=metric)
+                              metric=metric,
+                              metric_order_asc=metric_order_asc)

             # Additional filtering of rows prior to aggregation
             if filter:
@@ -502,10 +506,9 @@ def process_dataframe(
                 filter_str=filter_str)

             # Order by the first metric descending by default,
-            # or within the existing orderby, if we have a groupby
-            if groupby:
-                orderby.append((metrics + filtered_metrics)[0])
-                ascending.append(False)
+            # or within the existing orderby
+            orderby.append((metrics + filtered_metrics)[0])
+            ascending.append(metric_order_asc)

             # Use the groupby and __timestamp by as a tie breaker
             orderby = orderby + groupby + timestamp_cols
@@ -529,7 +532,7 @@ def process_dataframe(
             # order by the size descending by default, or within the
             # existing orderby
             orderby.append(0)
-            ascending.append(False)
+            ascending.append(not order_desc)
             # Use the group by as a tie breaker
             orderby = orderby + groupby
             ascending = ascending + ([True] * len(groupby))
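For reference, the heart of the models.py change is a single inversion: Superset's `order_desc` flag becomes pandas' `ascending` keyword via `metric_order_asc = not order_desc`, and that value drives both the series-limit sort and the final metric sort. A minimal sketch of the mapping follows; the DataFrame and column names are illustrative, not taken from the codebase.

    import pandas as pd

    # Toy data standing in for an aggregated metric column.
    df = pd.DataFrame({'project': ['A', 'B', 'C'],
                       'sum__value': [30, 10, 20]})

    order_desc = False                  # caller asks for ascending results
    metric_order_asc = not order_desc   # same inversion as in process_dataframe()

    # Sort the metric in the requested direction: with order_desc=False
    # this yields rows B (10), C (20), A (30).
    print(df.sort_values('sum__value', ascending=metric_order_asc))
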
91 changes: 85 additions & 6 deletions contrib/tests/connectors.py
@@ -60,6 +60,9 @@ def setUpClass(cls):
         | Region 1 | District A | Project A | 2001-03-31 11:00:00 | 85 |
         | Region 1 | District B | Project B | 2001-03-31 12:00:00 | 5 |
         | Region 2 | District C | Project C | 2001-03-31 14:00:00 | 35 |
+        | Region 1 | District A | Project A | 2001-04-30 10:00:00 | 15 |
+        | Region 1 | District A | Project A | 2001-04-30 12:00:00 | 15 |
+        | Region 2 | District C | Project C | 2001-04-30 13:00:00 | 15 |
         """

     def assertFrameEqual(self, frame1, frame2, msg=None):
@@ -600,6 +603,34 @@ def test_groupby_multiple_metrics(self):
         expected_df = expected_df.sort_values(['sum__value'], ascending=False)
         self.assertEqual(result.df, expected_df)

+    def test_groupby_ascending_order(self):
+        parameters = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value', 'avg__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+            'order_desc': False,
+        }
+        result = self.datasource.query(parameters)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df.groupby(parameters['groupby'])
+                       .aggregate({'value': ['sum', 'mean']})
+                       .reset_index())
+        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=True)
+        self.assertEqual(result.df, expected_df)
+
     def test_timeseries_single_metric(self):
         parameters = {
             'groupby': [],
@@ -617,6 +648,7 @@ def test_timeseries_single_metric(self):
                 # See https://github.com/apache/incubator-superset/issues/617
                 'time_grain_sqla': 'day',
             },
+            'order_desc': True,
         }
         result = self.datasource.query(parameters)
         self.assertIsInstance(result, QueryResult)
@@ -632,7 +664,8 @@ def test_timeseries_single_metric(self):
                                ['__timestamp'] +
                                parameters['metrics'])
         expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
-        expected_df = (expected_df.sort_values(['__timestamp'], ascending=True)
+        expected_df = (expected_df.sort_values(parameters['metrics'][0],
+                                               ascending=(not parameters['order_desc']))
                        .reset_index(drop=True))
         self.assertEqual(result.df, expected_df)

@@ -653,6 +686,7 @@ def test_timeseries_multiple_metrics(self):
                 # See https://github.com/apache/incubator-superset/issues/617
                 'time_grain_sqla': 'day',
             },
+            'order_desc': True,
         }
         result = self.datasource.query(parameters)
         self.assertIsInstance(result, QueryResult)
@@ -668,9 +702,9 @@ def test_timeseries_multiple_metrics(self):
                                ['__timestamp'] +
                                parameters['metrics'])
         expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
-        expected_df = (expected_df.sort_values(['__timestamp'], ascending=True)
-                       .reset_index(drop=True))
-        self.assertEqual(result.df.reset_index(drop=True), expected_df)
+        expected_df = (expected_df.sort_values(parameters['metrics'][0],
+                                               ascending=(not parameters['order_desc'])))
+        self.assertEqual(result.df, expected_df)

     def test_timeseries_groupby(self):
         parameters = {
@@ -725,6 +759,7 @@ def test_timeseries_limit(self):
                 # See https://github.com/apache/incubator-superset/issues/617
                 'time_grain_sqla': 'day',
             },
+            'order_desc': True,
         }
         result = self.datasource.query(parameters)
         self.assertIsInstance(result, QueryResult)
@@ -733,7 +768,7 @@
         time_grain = PandasDatasource.GRAINS[parameters['extras']['time_grain_sqla']]
         limit_df = (self.df.groupby(parameters['groupby'])
                     .aggregate({'value': 'mean'})
-                    .sort_values('value', ascending=False)
+                    .sort_values('value', ascending=(not parameters['order_desc']))
                     .iloc[:parameters['timeseries_limit']])
         source_df = self.df.set_index(parameters['groupby'])
         expected_df = (source_df[source_df.index.isin(limit_df.index)]
@@ -745,7 +780,51 @@ def test_timeseries_limit(self):
                                ['__timestamp'] +
                                parameters['metrics'])
         expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
-        expected_df = (expected_df.sort_values(['sum__value'], ascending=False)
+        expected_df = (expected_df.sort_values(['sum__value'],
+                                               ascending=(not parameters['order_desc']))
                        .reset_index(drop=True))
         self.assertEqual(result.df, expected_df)

+    def test_timeseries_limit_ascending_order(self):
+        parameters = {
+            'groupby': ['project', 'district'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': True,
+            'timeseries_limit': 2,
+            'timeseries_limit_metric': 'avg__value',
+            'row_limit': 5000,
+            'extras': {
+                # Note that week and month don't work on SQLite
+                # See https://github.com/apache/incubator-superset/issues/617
+                'time_grain_sqla': 'day',
+            },
+            'order_desc': False,
+        }
+        result = self.datasource.query(parameters)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        time_grain = PandasDatasource.GRAINS[parameters['extras']['time_grain_sqla']]
+        limit_df = (self.df.groupby(parameters['groupby'])
+                    .aggregate({'value': 'mean'})
+                    .sort_values('value', ascending=(not parameters['order_desc']))
+                    .iloc[:parameters['timeseries_limit']])
+        source_df = self.df.set_index(parameters['groupby'])
+        expected_df = (source_df[source_df.index.isin(limit_df.index)]
+                       .groupby(parameters['groupby'] + [pd.Grouper(key=parameters['granularity'],
+                                                                    freq=time_grain)])
+                       .aggregate({'value': ['sum']})
+                       .reset_index())
+        expected_df.columns = (parameters['groupby'] +
+                               ['__timestamp'] +
+                               parameters['metrics'])
+        expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
+        expected_df = (expected_df.sort_values(['sum__value'],
+                                               ascending=(not parameters['order_desc']))
+                       .reset_index(drop=True))
+        self.assertEqual(result.df, expected_df)
+
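Taken together with the models.py hunks, the ordering these tests encode is: the first metric sorts in the requested direction (`ascending = not order_desc`), while the group-by and `__timestamp` columns act as ascending tie breakers, all passed as parallel lists to `sort_values`. A small sketch of that pattern, using illustrative data and column names rather than anything from the codebase:

    import pandas as pd

    df = pd.DataFrame({'project': ['A', 'B', 'A', 'B'],
                       '__timestamp': ['2001-03-31', '2001-03-31',
                                       '2001-04-30', '2001-04-30'],
                       'sum__value': [85, 5, 30, 15]})

    order_desc = True
    orderby = ['sum__value']                # first metric, per the diff
    ascending = [not order_desc]            # metric follows order_desc
    orderby += ['project', '__timestamp']   # tie breakers...
    ascending += [True, True]               # ...always sort ascending

    # With order_desc=True this orders sum__value 85, 30, 15, 5;
    # project and __timestamp break any ties in ascending order.
    print(df.sort_values(orderby, ascending=ascending))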