fix: fix DataFrameGroupBy.agg() issue with as_index=False #273

Merged: 11 commits, Dec 19, 2023
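
For context, pandas' DataFrameGroupBy.agg() keeps the grouping keys as the index when as_index=True (the default) and returns them as ordinary columns with a fresh integer index when as_index=False. Below is a minimal pandas sketch of the semantics this fix aligns BigFrames with (the example data is illustrative):

import pandas as pd

df = pd.DataFrame({"key": ["a", "a", "b"], "val": [1, 2, 3]})

# as_index=True (the default): the grouping key becomes the result's index.
print(df.groupby("key").agg({"val": "sum"}))

# as_index=False: the grouping key stays a regular column and the result
# gets a fresh 0..n-1 integer index.
print(df.groupby("key", as_index=False).agg({"val": "sum"}))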
1 change: 0 additions & 1 deletion bigframes/core/block_transforms.py
@@ -332,7 +332,6 @@ def value_counts(
by_column_ids=columns,
aggregations=[(dummy, agg_ops.count_op)],
dropna=dropna,
as_index=True,
)
count_id = agg_ids[0]
if normalize:
51 changes: 15 additions & 36 deletions bigframes/core/blocks.py
@@ -926,7 +926,6 @@ def aggregate(
by_column_ids: typing.Sequence[str] = (),
aggregations: typing.Sequence[typing.Tuple[str, agg_ops.AggregateOp]] = (),
*,
as_index: bool = True,
dropna: bool = True,
) -> typing.Tuple[Block, typing.Sequence[str]]:
"""
@@ -947,40 +946,21 @@
aggregate_labels = self._get_labels_for_columns(
[agg[0] for agg in aggregations]
)
if as_index:
names: typing.List[Label] = []
for by_col_id in by_column_ids:
if by_col_id in self.value_columns:
names.append(self.col_id_to_label[by_col_id])
else:
names.append(self.col_id_to_index_name[by_col_id])
return (
Block(
result_expr,
index_columns=by_column_ids,
column_labels=aggregate_labels,
index_labels=names,
),
output_col_ids,
)
else: # as_index = False
# If as_index=False, drop grouping levels, but keep grouping value columns
by_value_columns = [
col for col in by_column_ids if col in self.value_columns
]
by_column_labels = self._get_labels_for_columns(by_value_columns)
labels = (*by_column_labels, *aggregate_labels)
offsets_id = guid.generate_guid()
result_expr_pruned = result_expr.select_columns(
[*by_value_columns, *output_col_ids]
).promote_offsets(offsets_id)

return (
Block(
result_expr_pruned, index_columns=[offsets_id], column_labels=labels
),
output_col_ids,
)
names: typing.List[Label] = []
for by_col_id in by_column_ids:
if by_col_id in self.value_columns:
names.append(self.col_id_to_label[by_col_id])
else:
names.append(self.col_id_to_index_name[by_col_id])
return (
Block(
result_expr,
index_columns=by_column_ids,
column_labels=aggregate_labels,
index_labels=names,
),
output_col_ids,
)

def get_stat(self, column_id: str, stat: agg_ops.AggregateOp):
"""Gets aggregates immediately, and caches it"""
@@ -1309,7 +1289,6 @@ def pivot(
result_block, _ = block.aggregate(
by_column_ids=self.index_columns,
aggregations=aggregations,
as_index=True,
dropna=True,
)

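With the as_index parameter removed, Block.aggregate() always returns a result indexed by the grouping columns, and any caller that needs as_index=False resets the index afterwards. A rough pandas analogy of that caller-side contract follows (the function and argument names here are illustrative, not BigFrames API):

import pandas as pd

def grouped_agg(df: pd.DataFrame, by, aggregations, as_index: bool = True) -> pd.DataFrame:
    # Always aggregate with the group keys as the index, mirroring the
    # simplified Block.aggregate() contract.
    result = df.groupby(by).agg(aggregations)
    if not as_index:
        # The as_index=False handling now lives with the caller, which
        # resets the index instead of the block layer special-casing it.
        result = result.reset_index()
    return result

print(grouped_agg(
    pd.DataFrame({"k": ["x", "x", "y"], "v": [1, 2, 3]}),
    by="k",
    aggregations={"v": "max"},
    as_index=False,
))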
15 changes: 10 additions & 5 deletions bigframes/core/groupby/__init__.py
@@ -263,9 +263,10 @@ def _agg_string(self, func: str) -> df.DataFrame:
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
if not self._as_index:
agg_block = agg_block.reset_index()
return df.DataFrame(agg_block)

def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
@@ -285,7 +286,6 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
if want_aggfunc_level:
@@ -297,6 +297,8 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
)
else:
agg_block = agg_block.with_column_labels(pd.Index(column_labels))
if not self._as_index:
agg_block = agg_block.reset_index()
return df.DataFrame(agg_block)

def _agg_list(self, func: typing.Sequence) -> df.DataFrame:
@@ -311,14 +313,15 @@
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
agg_block = agg_block.with_column_labels(
pd.MultiIndex.from_tuples(
column_labels, names=[*self._block.column_labels.names, None]
)
)
if not self._as_index:
agg_block = agg_block.reset_index()
return df.DataFrame(agg_block)

def _agg_named(self, **kwargs) -> df.DataFrame:
@@ -339,10 +342,11 @@
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
agg_block = agg_block.with_column_labels(column_labels)
if not self._as_index:
agg_block = agg_block.reset_index()
return df.DataFrame(agg_block)

aggregate = agg
@@ -379,9 +383,10 @@ def _aggregate_all(
result_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
if not self._as_index:
result_block = result_block.reset_index()
return df.DataFrame(result_block)

def _apply_window_op(
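Each aggregation path above (_agg_string, _agg_dict, _agg_list, _agg_named, _aggregate_all) now applies the same post-step: reset the index when as_index=False. A hedged sketch of the user-facing effect follows (it assumes a configured BigQuery session; the data and column names are illustrative):

import bigframes.pandas as bpd

bf_df = bpd.DataFrame({"string_col": ["a", "a", "b"], "int64_too": [1, 2, 3]})

# With this fix, as_index=False yields the grouping key as a regular column
# plus a fresh integer index, matching pandas behavior.
result = bf_df.groupby("string_col", as_index=False).agg({"int64_too": "max"})
print(result.to_pandas())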
16 changes: 8 additions & 8 deletions bigframes/series.py
@@ -816,7 +816,6 @@ def mode(self) -> Series:
block, agg_ids = block.aggregate(
by_column_ids=[self._value_column],
aggregations=((self._value_column, agg_ops.count_op),),
as_index=False,
)
value_count_col_id = agg_ids[0]
block, max_value_count_col_id = block.apply_window_op(
@@ -830,14 +829,15 @@
ops.eq_op,
)
block = block.filter(is_mode_col_id)
mode_values_series = Series(
block.select_column(self._value_column).assign_label(
self._value_column, self.name
)
)
return typing.cast(
Series, mode_values_series.sort_values().reset_index(drop=True)
# use temporary name for reset_index to avoid collision, restore after dropping extra columns
block = (
block.with_index_labels(["mode_temp_internal"])
.order_by([OrderingColumnReference(self._value_column)])
.reset_index(drop=False)
)
block = block.select_column(self._value_column).with_column_labels([self.name])
mode_values_series = Series(block.select_column(self._value_column))
return typing.cast(Series, mode_values_series)

def mean(self) -> float:
return typing.cast(float, self._apply_aggregation(agg_ops.mean_op))
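The rewritten mode() keeps the pandas contract for Series.mode(): every most-frequent value, sorted ascending, under a fresh default index, with the original series name preserved. A minimal pandas reference for that contract:

import pandas as pd

s = pd.Series([3, 1, 2, 2, 3], name="vals")

# mode() returns all tied most-frequent values, sorted ascending,
# re-indexed 0..n-1.
print(s.mode())
# 0    2
# 1    3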
17 changes: 13 additions & 4 deletions tests/system/small/test_groupby.py
@@ -122,23 +122,32 @@ def test_dataframe_groupby_agg_list(scalars_df_index, scalars_pandas_df_index):
pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False)


@pytest.mark.parametrize(
("as_index"),
[
(True),
(False),
],
)
def test_dataframe_groupby_agg_dict_with_list(
scalars_df_index, scalars_pandas_df_index
scalars_df_index, scalars_pandas_df_index, as_index
):
col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"]
bf_result = (
scalars_df_index[col_names]
.groupby("string_col")
.groupby("string_col", as_index=as_index)
.agg({"int64_too": ["mean", "max"], "string_col": "count"})
)
pd_result = (
scalars_pandas_df_index[col_names]
.groupby("string_col")
.groupby("string_col", as_index=as_index)
.agg({"int64_too": ["mean", "max"], "string_col": "count"})
)
bf_result_computed = bf_result.to_pandas()

pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False)
pd.testing.assert_frame_equal(
pd_result, bf_result_computed, check_dtype=False, check_index_type=False
)


def test_dataframe_groupby_agg_dict_no_lists(scalars_df_index, scalars_pandas_df_index):
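The relaxed check_index_type=False in the assertion above is needed because resetting the index produces a nullable Int64 index on the BigFrames side versus plain int64 in pandas (the same point noted in the test_multiindex.py comment below). A small pandas sketch of why the strict comparison would otherwise fail:

import pandas as pd

bf_like = pd.DataFrame({"a": [1]}, index=pd.Index([0], dtype="Int64"))
pd_like = pd.DataFrame({"a": [1]}, index=pd.Index([0], dtype="int64"))

# The default check flags the index dtype mismatch (Int64 vs. int64)...
try:
    pd.testing.assert_frame_equal(bf_like, pd_like)
except AssertionError as exc:
    print("strict check fails:", exc)

# ...while skipping the index type check lets the equal values pass.
pd.testing.assert_frame_equal(bf_like, pd_like, check_index_type=False)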
4 changes: 4 additions & 0 deletions tests/system/small/test_multiindex.py
@@ -367,6 +367,10 @@ def test_multi_index_dataframe_groupby_level_aggregate(
.groupby(level=level, as_index=as_index)
.mean(numeric_only=True)
)
print("pandas")
print(pd_result.to_string())
print("bigframes")
print(bf_result.to_string())

# Pandas will have int64 index, while bigquery will have Int64 when resetting
pandas.testing.assert_frame_equal(bf_result, pd_result, check_index_type=False)