Skip to content

Commit

Permalink
Better tests for summary metrics - see apache#3302
Browse files — browse the repository at this point in the history
  • Loading branch information
rhunwicks committed Oct 3, 2017
1 parent dd769ed commit 1df6237
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 42 deletions.
87 changes: 51 additions & 36 deletions contrib/connectors/pandas/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,55 +552,70 @@ def process_dataframe(
df = df.groupby(groupby + timestamp_exprs, sort=False)
query_str += '.groupby({}, sort=False)'.format(groupby + timestamp_exprs)

for sources, expr, func in apply_functions:
if sources:
dfs.append(df[sources].apply(func))
query_strs.append(query_str +
'[{}].apply({})'.format(sources, expr))
else:
dfs.append(df.apply(func))
query_strs.append(query_str + '.apply({})'.format(expr))
else:
# Multi-column aggregates need to be passed the DataFrame,
# whereas if we call DataFrame.apply() without a groupby
# the func is called on each column individually
for sources, expr, func in apply_functions:
if sources:
dfs.append(pd.Series(func(df[sources])))
query_strs.append('pd.Series({expr}({df}[{sources}]))'.format(
expr=expr,
df=query_str,
sources=sources))
else:
dfs.append(pd.Series(func(df)))
query_strs.append('pd.Series({expr}({df}))'.format(
expr=expr,
df=query_str))
for sources, expr, func in apply_functions:
apply_df = df
apply_str = query_str

if sources:
apply_df = df[sources]
apply_str = query_str + '[{}]'.format(sources)

if groupby or timestamp_exprs:
apply_df = df.apply(func)
apply_str += '.apply({})'.format(expr)
else:
# If we call DataFrame.apply() without a groupby then the func is
# called on each column individually. Therefore, if we have a
# summary with multi-column aggregates we need to pass the whole
# DataFrame to the function.
apply_df = pd.Series(func(apply_df)).to_frame()
apply_str = 'pd.Series({expr}({df})).to_frame()'.format(
expr=expr,
df=apply_str)

# Superset expects a DataFrame with single Row and
# the metrics as columns, rather than with the metrics as the
# index, so if we have a summary then we need to pivot it.
apply_df = apply_df.unstack().to_frame().T
apply_str += '.unstack().to_frame().T'

dfs.append(apply_df)
query_strs.append(apply_str)

if aggregates:
dfs.append(df.aggregate(aggregates))
query_strs.append(query_str +
'.aggregate({})'.format(aggregates))
if groupby or timestamp_exprs:
dfs.append(df.aggregate(aggregates))
query_strs.append(query_str +
'.aggregate({})'.format(aggregates))
else:
# For a summary table we need to preserve the metric order
# so we can set the correct column names, so process aggregates
# for each dataframe column in turn
for col, col_agg in aggregates.items():
agg_df = df.aggregate({col: col_agg})
agg_str = query_str + '.aggregate({}: {})'.format(col, col_agg)

# Superset expects a DataFrame with single Row and
# the metrics as columns, rather than with the metrics as the
# index, so if we have a summary then we need to pivot it.
agg_df = agg_df.unstack().to_frame().T
agg_str += '.unstack().to_frame().T'

dfs.append(agg_df)
query_strs.append(agg_str)

# If there is more than one DataFrame in the list then
# concatenate them along the index
if len(dfs) > 1:
df = pd.concat(dfs, axis=0)
query_str = 'pd.concat([{}])'.format(', '.join(query_strs))
df = pd.concat(dfs, axis=1)
query_str = 'pd.concat([{}], axis=1)'.format(', '.join(query_strs))
else:
df = dfs[0]
query_str = query_strs[0]

if groupby or timestamp_exprs:
df = df.reset_index()
query_str += '.reset_index()'
else:
# Note that Superset expects a DataFrame with single Row and
# the metrics as columns, rather than with the metrics
# as the index, so if we have a summary then we need to
# reindex it
df = df.bfill(axis=1).T.reset_index(drop=True).head(n=1)
query_str += '.bfill(axis=1).T.reset_index(drop=True).head(n=1)'

# Set the correct columns names and then reorder the columns
# to match the requested order
Expand Down
21 changes: 15 additions & 6 deletions contrib/tests/connector_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def test_summary_single_metric(self):
def test_summary_multiple_metrics(self):
parameters = {
'groupby': [],
'metrics': ['sum__value', 'avg__value', 'value_percentage'],
'metrics': ['sum__value', 'avg__value', 'value_percentage', 'ratio'],
'granularity': 'received',
'from_dttm': datetime.datetime(2001, 1, 1),
'to_dttm': datetime.datetime(2001, 12, 31),
Expand All @@ -175,6 +175,7 @@ def test_summary_multiple_metrics(self):
'time_grain_sqla': None,
},
}
self.df['ratio'] = self.df['value'] / self.df['value2']
result = self.datasource.query(parameters)
self.assertIsInstance(result, QueryResult)
self.assertEqual(result.error_message, None)
Expand All @@ -184,6 +185,7 @@ def test_summary_multiple_metrics(self):
('avg__value', [self.df['value'].mean()]),
('value_percentage', [sum(self.df['value']) /
sum(self.df['value'] + self.df['value2'])]),
('ratio', [self.df['ratio'].mean()]),
]))
self.assertEqual(result.df, expected_df)

Expand Down Expand Up @@ -581,7 +583,7 @@ def test_groupby_single_metric(self):
def test_groupby_multiple_metrics(self):
parameters = {
'groupby': ['project', 'region'],
'metrics': ['sum__value', 'avg__value'],
'metrics': ['sum__value', 'avg__value', 'value_percentage', 'ratio'],
'granularity': 'received',
'from_dttm': datetime.datetime(2001, 1, 1),
'to_dttm': datetime.datetime(2001, 12, 31),
Expand All @@ -594,15 +596,22 @@ def test_groupby_multiple_metrics(self):
'time_grain_sqla': None,
},
}
self.df['ratio'] = self.df['value'] / self.df['value2']
result = self.datasource.query(parameters)
self.assertIsInstance(result, QueryResult)
self.assertEqual(result.error_message, None)
self.assertEqual(result.status, QueryStatus.SUCCESS)
expected_df = (self.df.groupby(parameters['groupby'])
.aggregate({'value': ['sum', 'mean']})
.reset_index())
expected_df.columns = parameters['groupby'] + parameters['metrics']
expected_df = expected_df.sort_values(['sum__value'], ascending=False)
.aggregate(OrderedDict([('value', ['sum', 'mean']),
('ratio', ['mean'])])))
expected_df['value_percentage'] = (self.df.groupby(parameters['groupby'])
.apply(lambda x: sum(x['value']) /
sum(x['value'] + x['value2'])))
expected_df = expected_df.reset_index()
expected_df.columns = (parameters['groupby'] +
['sum__value', 'avg__value', 'ratio', 'value_percentage'])
expected_df = (expected_df[parameters['groupby'] + parameters['metrics']]
.sort_values(['sum__value'], ascending=False))
self.assertEqual(result.df, expected_df)

def test_groupby_ratio_metric(self):
Expand Down

0 comments on commit 1df6237

Please sign in to comment.