REF: remove libreduction.apply_frame_axis0 #42992

Merged · 3 commits · Aug 12, 2021
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.4.0.rst
@@ -178,6 +178,7 @@ Performance improvements
- Performance improvement in constructing :class:`DataFrame` objects (:issue:`42631`)
- Performance improvement in :meth:`GroupBy.shift` when ``fill_value`` argument is provided (:issue:`26615`)
- Performance improvement in :meth:`DataFrame.corr` for ``method=pearson`` on data without missing values (:issue:`40956`)
- Performance improvement in some :meth:`GroupBy.apply` operations (:issue:`42992`)
-

.. ---------------------------------------------------------------------------
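For context, the whatsnew entry above covers user-defined functions passed to ``GroupBy.apply``. A minimal sketch of the kind of call affected (sizes and column names are illustrative, not taken from this PR):

```python
import numpy as np
import pandas as pd

# Many small groups plus a Python-level UDF: the workload whose apply
# dispatch this PR simplifies by deleting the Cython fast path.
N = 100_000
df = pd.DataFrame(
    {
        "key": np.random.randint(0, 2000, size=N),
        "value": np.random.randn(N),
    }
)

# The UDF runs once per group; after this PR every such call goes
# through the single Python-level loop in BaseGrouper.apply.
result = df.groupby("key").apply(lambda g: g["value"].sum())
```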
153 changes: 1 addition & 152 deletions pandas/_libs/reduction.pyx
@@ -15,10 +15,7 @@ from numpy cimport (

cnp.import_array()

from pandas._libs.util cimport (
is_array,
set_array_not_contiguous,
)
from pandas._libs.util cimport is_array

from pandas._libs.lib import is_scalar

@@ -350,151 +347,3 @@ cdef class Slider:
cdef reset(self):
self.buf.data = self.orig_data
self.buf.shape[0] = 0


def apply_frame_axis0(object frame, object f, object names,
const int64_t[:] starts, const int64_t[:] ends):
cdef:
BlockSlider slider
Py_ssize_t i, n = len(starts)
list results
object piece
dict item_cache

# We have already checked that we don't have a MultiIndex before calling
assert frame.index.nlevels == 1

results = []

slider = BlockSlider(frame)

mutated = False
item_cache = slider.dummy._item_cache
try:
for i in range(n):
slider.move(starts[i], ends[i])

item_cache.clear() # ugh
chunk = slider.dummy
object.__setattr__(chunk, 'name', names[i])

piece = f(chunk)

# Need to infer if low level index slider will cause segfaults
require_slow_apply = i == 0 and piece is chunk
try:
if piece.index is not chunk.index:
mutated = True
except AttributeError:
# `piece` might not have an index, could be e.g. an int
pass

if not is_scalar(piece):
# Need to copy data to avoid appending references
try:
piece = piece.copy(deep="all")
except (TypeError, AttributeError):
pass

results.append(piece)

# If the data was modified inplace we need to
# take the slow path to not risk segfaults
# we have already computed the first piece
if require_slow_apply:
break
finally:
slider.reset()

return results, mutated


cdef class BlockSlider:
"""
Only capable of sliding on axis=0
"""
cdef:
object frame, dummy, index, block
list blocks, blk_values
ndarray orig_blklocs, orig_blknos
ndarray values
Slider idx_slider
char **base_ptrs
int nblocks
Py_ssize_t i

def __init__(self, object frame):
self.frame = frame
self.dummy = frame[:0]
self.index = self.dummy.index

# GH#35417 attributes we need to restore at each step in case
# the function modified them.
mgr = self.dummy._mgr
self.orig_blklocs = mgr.blklocs
self.orig_blknos = mgr.blknos
self.blocks = [x for x in self.dummy._mgr.blocks]

self.blk_values = [block.values for block in self.dummy._mgr.blocks]

for values in self.blk_values:
set_array_not_contiguous(values)

self.nblocks = len(self.blk_values)
# See the comment in indexes/base.py about _index_data.
# We need this for EA-backed indexes that have a reference to a 1-d
# ndarray like datetime / timedelta / period.
self.idx_slider = Slider(
self.frame.index._index_data, self.dummy.index._index_data)

self.base_ptrs = <char**>malloc(sizeof(char*) * self.nblocks)
for i, block in enumerate(self.blk_values):
self.base_ptrs[i] = (<ndarray>block).data

def __dealloc__(self):
free(self.base_ptrs)

cdef move(self, int start, int end):
cdef:
ndarray arr
Py_ssize_t i

self._restore_blocks()

# move blocks
for i in range(self.nblocks):
arr = self.blk_values[i]

# axis=1 is the frame's axis=0
arr.data = self.base_ptrs[i] + arr.strides[1] * start
arr.shape[1] = end - start

# move and set the index
self.idx_slider.move(start, end)

object.__setattr__(self.index, '_index_data', self.idx_slider.buf)
self.index._engine.clear_mapping()
self.index._cache.clear() # e.g. inferred_freq must go

cdef reset(self):
cdef:
ndarray arr
Py_ssize_t i

self._restore_blocks()

for i in range(self.nblocks):
arr = self.blk_values[i]

# axis=1 is the frame's axis=0
arr.data = self.base_ptrs[i]
arr.shape[1] = 0

cdef _restore_blocks(self):
"""
Ensure that we have the original blocks, blknos, and blklocs.
"""
mgr = self.dummy._mgr
mgr.blocks = tuple(self.blocks)
mgr._blklocs = self.orig_blklocs
mgr._blknos = self.orig_blknos
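Note: the deleted ``BlockSlider`` above worked by pointing each block's raw ``data`` pointer at successive ``[start, end)`` windows of the parent frame's buffers, so every group was a zero-copy view. A safe pure-Python sketch of the same windowing idea (the helper name is ours, not pandas API):

```python
import numpy as np

def iter_group_windows(values: np.ndarray, starts, ends):
    """Yield one zero-copy row window per group.

    Plain ndarray slicing returns views, which is the Python-level
    analogue of BlockSlider advancing the underlying block pointer;
    no data is copied. (Sketch only; not part of pandas.)
    """
    for start, end in zip(starts, ends):
        yield values[start:end]

# Toy usage: three groups over a contiguous array.
arr = np.arange(10.0)
windows = list(iter_group_windows(arr, [0, 4, 7], [4, 7, 10]))
assert windows[1].base is arr  # a view into the original buffer
```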
63 changes: 2 additions & 61 deletions pandas/core/groupby/ops.py
@@ -85,17 +85,13 @@
import pandas.core.common as com
from pandas.core.frame import DataFrame
from pandas.core.generic import NDFrame
from pandas.core.groupby import (
base,
grouper,
)
from pandas.core.groupby import grouper
from pandas.core.indexes.api import (
CategoricalIndex,
Index,
MultiIndex,
ensure_index,
)
from pandas.core.internals import ArrayManager
from pandas.core.series import Series
from pandas.core.sorting import (
compress_group_index,
@@ -718,60 +714,10 @@ def apply(self, f: F, data: FrameOrSeries, axis: int = 0):
mutated = self.mutated
splitter = self._get_splitter(data, axis=axis)
group_keys = self._get_group_keys()
result_values = None

if data.ndim == 2 and any(
isinstance(x, ExtensionArray) for x in data._iter_column_arrays()
):
# calling splitter.fast_apply will raise TypeError via apply_frame_axis0
# if we pass EA instead of ndarray
# TODO: can we have a workaround for EAs backed by ndarray?
pass

elif isinstance(data._mgr, ArrayManager):
# TODO(ArrayManager) don't use fast_apply / libreduction.apply_frame_axis0
# for now -> relies on BlockManager internals
pass
elif (
com.get_callable_name(f) not in base.plotting_methods
and isinstance(splitter, FrameSplitter)
and axis == 0
# fast_apply/libreduction doesn't allow non-numpy backed indexes
and not data.index._has_complex_internals
):
try:
sdata = splitter.sorted_data
result_values, mutated = splitter.fast_apply(f, sdata, group_keys)

except IndexError:
# This is a rare case in which re-running in python-space may
# make a difference, see test_apply_mutate.test_mutate_groups
pass

else:
# If the fast apply path could be used we can return here.
# Otherwise we need to fall back to the slow implementation.
if len(result_values) == len(group_keys):
return group_keys, result_values, mutated

if result_values is None:
# result_values is None if fast apply path wasn't taken
# or fast apply aborted with an unexpected exception.
# In either case, initialize the result list and perform
# the slow iteration.
result_values = []
skip_first = False
else:
# If result_values is not None we're in the case that the
# fast apply loop was broken prematurely but we have
# already the result for the first group which we can reuse.
skip_first = True
result_values = []

# This calls DataSplitter.__iter__
zipped = zip(group_keys, splitter)
if skip_first:
# pop the first item from the front of the iterator
next(zipped)

for key, group in zipped:
object.__setattr__(group, "name", key)
@@ -1290,11 +1236,6 @@ def _chop(self, sdata: Series, slice_obj: slice) -> Series:


class FrameSplitter(DataSplitter):
def fast_apply(self, f: F, sdata: FrameOrSeries, names):
# must return keys::list, values::list, mutated::bool
starts, ends = lib.generate_slices(self.slabels, self.ngroups)
return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)

def _chop(self, sdata: DataFrame, slice_obj: slice) -> DataFrame:
# Fastpath equivalent to:
# if self.axis == 0:
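Note: with ``fast_apply`` removed, ``BaseGrouper.apply`` always takes the one remaining path shown above: iterate the splitter, pin the group key as ``name`` on each chunk, and call ``f``. A self-contained sketch of that loop shape (helper and variable names are ours, simplified from pandas/core/groupby/ops.py):

```python
import pandas as pd

def apply_groupwise(f, keys, chunks):
    # Mirrors the surviving loop in BaseGrouper.apply: the group key is
    # attached via object.__setattr__, bypassing NDFrame's attribute
    # machinery, before the user function sees the chunk.
    results = []
    for key, chunk in zip(keys, chunks):
        object.__setattr__(chunk, "name", key)
        results.append(f(chunk))
    return results

# Toy usage: two pre-split chunks standing in for DataSplitter output.
df = pd.DataFrame({"v": [1, 2, 3, 4]})
out = apply_groupwise(lambda g: g["v"].sum(), ["a", "b"], [df.iloc[:2], df.iloc[2:]])
assert out == [3, 7]
```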
45 changes: 2 additions & 43 deletions pandas/tests/groupby/test_apply.py
@@ -7,8 +7,6 @@
import numpy as np
import pytest

import pandas.util._test_decorators as td

import pandas as pd
from pandas import (
DataFrame,
@@ -86,40 +84,6 @@ def test_apply_trivial_fail():
tm.assert_frame_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) fast_apply not used
def test_fast_apply():
# make sure that fast apply is correctly called
# rather than raising any kind of error
# otherwise the python path will be called
# which slows things down
N = 1000
labels = np.random.randint(0, 2000, size=N)
labels2 = np.random.randint(0, 3, size=N)
df = DataFrame(
{
"key": labels,
"key2": labels2,
"value1": np.random.randn(N),
"value2": ["foo", "bar", "baz", "qux"] * (N // 4),
}
)

def f(g):
return 1

g = df.groupby(["key", "key2"])

grouper = g.grouper

splitter = grouper._get_splitter(g._selected_obj, axis=g.axis)
group_keys = grouper._get_group_keys()
sdata = splitter.sorted_data

values, mutated = splitter.fast_apply(f, sdata, group_keys)

assert not mutated


@pytest.mark.parametrize(
"df, group_names",
[
@@ -216,8 +180,6 @@ def test_group_apply_once_per_group2(capsys):
assert result == expected


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) fast_apply not used
@pytest.mark.xfail(reason="GH-34998")
def test_apply_fast_slow_identical():
# GH 31613

@@ -237,16 +199,13 @@ def fast(group):
tm.assert_frame_equal(fast_df, slow_df)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) fast_apply not used
@pytest.mark.parametrize(
"func",
[
lambda x: x,
pytest.param(lambda x: x[:], marks=pytest.mark.xfail(reason="GH-34998")),
lambda x: x[:],
lambda x: x.copy(deep=False),
pytest.param(
lambda x: x.copy(deep=True), marks=pytest.mark.xfail(reason="GH-34998")
),
lambda x: x.copy(deep=True),
],
)
def test_groupby_apply_identity_maybecopy_index_identical(func):
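Note: the xfail marks dropped above were for GH-34998, where the fast and slow apply paths could disagree for identity-like functions. With the fast path gone there is only one path, so all four parametrized functions are expected to round-trip the frame unchanged. A usage sketch of the behavior the updated test asserts, as of pandas at the time of this PR (data values are illustrative):

```python
import pandas as pd

df = pd.DataFrame({"g": [1, 2, 2], "a": [1, 2, 3], "b": [4, 5, 6]})

# Each identity-like function should return its input with an identical
# index; previously some of these were xfailed because the Cython fast
# path handled them differently from the Python loop.
for func in (
    lambda x: x,
    lambda x: x[:],
    lambda x: x.copy(deep=False),
    lambda x: x.copy(deep=True),
):
    result = df.groupby("g").apply(func)
    pd.testing.assert_frame_equal(result, df)
```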