diff --git a/contrib/connectors/pandas/models.py b/contrib/connectors/pandas/models.py index b7645d1d4d568..fc0509b7dbbb4 100644 --- a/contrib/connectors/pandas/models.py +++ b/contrib/connectors/pandas/models.py @@ -552,41 +552,63 @@ def process_dataframe( df = df.groupby(groupby + timestamp_exprs, sort=False) query_str += '.groupby({}, sort=False)'.format(groupby + timestamp_exprs) - for sources, expr, func in apply_functions: - if sources: - dfs.append(df[sources].apply(func)) - query_strs.append(query_str + - '[{}].apply({})'.format(sources, expr)) - else: - dfs.append(df.apply(func)) - query_strs.append(query_str + '.apply({})'.format(expr)) - else: - # Multi-column aggregates need to be passed the DataFrame, - # whereas if we call DataFrame.apply() without a groupby - # the func is called on each column individually - for sources, expr, func in apply_functions: - if sources: - dfs.append(pd.Series(func(df[sources]))) - query_strs.append('pd.Series({expr}({df}[{sources}]))'.format( - expr=expr, - df=query_str, - sources=sources)) - else: - dfs.append(pd.Series(func(df))) - query_strs.append('pd.Series({expr}({df}))'.format( - expr=expr, - df=query_str)) + for sources, expr, func in apply_functions: + apply_df = df + apply_str = query_str + + if sources: + apply_df = df[sources] + apply_str = query_str + '[{}]'.format(sources) + + if groupby or timestamp_exprs: + apply_df = df.apply(func) + apply_str += '.apply({})'.format(expr) + else: + # If we call DataFrame.apply() without a groupby then the func is + # called on each column individually. Therefore, if we have a + # summary with multi-column aggregates we need to pass the whole + # DataFrame to the function. + apply_df = pd.Series(func(apply_df)).to_frame() + apply_str = 'pd.Series({expr}({df})).to_frame()'.format( + expr=expr, + df=apply_str) + + # Superset expects a DataFrame with single Row and + # the metrics as columns, rather than with the metrics as the + # index, so if we have a summary then we need to pivot it. + apply_df = apply_df.unstack().to_frame().T + apply_str += '.unstack().to_frame().T' + + dfs.append(apply_df) + query_strs.append(apply_str) if aggregates: - dfs.append(df.aggregate(aggregates)) - query_strs.append(query_str + - '.aggregate({})'.format(aggregates)) + if groupby or timestamp_exprs: + dfs.append(df.aggregate(aggregates)) + query_strs.append(query_str + + '.aggregate({})'.format(aggregates)) + else: + # For a summary table we need to preserve the metric order + # so we can set the correct column names, so process aggregates + # for each dataframe column in turn + for col, col_agg in aggregates.items(): + agg_df = df.aggregate({col: col_agg}) + agg_str = query_str + '.aggregate({}: {})'.format(col, col_agg) + + # Superset expects a DataFrame with single Row and + # the metrics as columns, rather than with the metrics as the + # index, so if we have a summary then we need to pivot it. + agg_df = agg_df.unstack().to_frame().T + agg_str += '.unstack().to_frame().T' + + dfs.append(agg_df) + query_strs.append(agg_str) # If there is more than one DataFrame in the list then # concatenate them along the index if len(dfs) > 1: - df = pd.concat(dfs, axis=0) - query_str = 'pd.concat([{}])'.format(', '.join(query_strs)) + df = pd.concat(dfs, axis=1) + query_str = 'pd.concat([{}], axis=1)'.format(', '.join(query_strs)) else: df = dfs[0] query_str = query_strs[0] @@ -594,13 +616,6 @@ def process_dataframe( if groupby or timestamp_exprs: df = df.reset_index() query_str += '.reset_index()' - else: - # Note that Superset expects a DataFrame with single Row and - # the metrics as columns, rather than with the metrics - # as the index, so if we have a summary then we need to - # reindex it - df = df.bfill(axis=1).T.reset_index(drop=True).head(n=1) - query_str += '.bfill(axis=1).T.reset_index(drop=True).head(n=1)' # Set the correct columns names and then reorder the columns # to match the requested order diff --git a/contrib/tests/connector_tests.py b/contrib/tests/connector_tests.py index 631f0c11da5db..b64e377e31b09 100644 --- a/contrib/tests/connector_tests.py +++ b/contrib/tests/connector_tests.py @@ -162,7 +162,7 @@ def test_summary_single_metric(self): def test_summary_multiple_metrics(self): parameters = { 'groupby': [], - 'metrics': ['sum__value', 'avg__value', 'value_percentage'], + 'metrics': ['sum__value', 'avg__value', 'value_percentage', 'ratio'], 'granularity': 'received', 'from_dttm': datetime.datetime(2001, 1, 1), 'to_dttm': datetime.datetime(2001, 12, 31), @@ -175,6 +175,7 @@ def test_summary_multiple_metrics(self): 'time_grain_sqla': None, }, } + self.df['ratio'] = self.df['value'] / self.df['value2'] result = self.datasource.query(parameters) self.assertIsInstance(result, QueryResult) self.assertEqual(result.error_message, None) @@ -184,6 +185,7 @@ def test_summary_multiple_metrics(self): ('avg__value', [self.df['value'].mean()]), ('value_percentage', [sum(self.df['value']) / sum(self.df['value'] + self.df['value2'])]), + ('ratio', [self.df['ratio'].mean()]), ])) self.assertEqual(result.df, expected_df) @@ -581,7 +583,7 @@ def test_groupby_single_metric(self): def test_groupby_multiple_metrics(self): parameters = { 'groupby': ['project', 'region'], - 'metrics': ['sum__value', 'avg__value'], + 'metrics': ['sum__value', 'avg__value', 'value_percentage', 'ratio'], 'granularity': 'received', 'from_dttm': datetime.datetime(2001, 1, 1), 'to_dttm': datetime.datetime(2001, 12, 31), @@ -594,15 +596,22 @@ def test_groupby_multiple_metrics(self): 'time_grain_sqla': None, }, } + self.df['ratio'] = self.df['value'] / self.df['value2'] result = self.datasource.query(parameters) self.assertIsInstance(result, QueryResult) self.assertEqual(result.error_message, None) self.assertEqual(result.status, QueryStatus.SUCCESS) expected_df = (self.df.groupby(parameters['groupby']) - .aggregate({'value': ['sum', 'mean']}) - .reset_index()) - expected_df.columns = parameters['groupby'] + parameters['metrics'] - expected_df = expected_df.sort_values(['sum__value'], ascending=False) + .aggregate(OrderedDict([('value', ['sum', 'mean']), + ('ratio', ['mean'])]))) + expected_df['value_percentage'] = (self.df.groupby(parameters['groupby']) + .apply(lambda x: sum(x['value']) / + sum(x['value'] + x['value2']))) + expected_df = expected_df.reset_index() + expected_df.columns = (parameters['groupby'] + + ['sum__value', 'avg__value', 'ratio', 'value_percentage']) + expected_df = (expected_df[parameters['groupby'] + parameters['metrics']] + .sort_values(['sum__value'], ascending=False)) self.assertEqual(result.df, expected_df) def test_groupby_ratio_metric(self):