Skip to content

Commit

Permalink
perf: Specialize first/last agg for simple types in new-streaming eng…
Browse files Browse the repository at this point in the history
…ine (#20728)
  • Loading branch information
orlp authored Jan 15, 2025
1 parent 958d00f commit e8d10a9
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 36 deletions.
209 changes: 200 additions & 9 deletions crates/polars-expr/src/reduce/first_last.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::marker::PhantomData;

use polars_core::frame::row::AnyValueBufferTrusted;
use polars_core::with_match_physical_numeric_polars_type;

use super::*;

Expand All @@ -13,10 +14,27 @@ pub fn new_last_reduction(dtype: DataType) -> Box<dyn GroupedReduction> {
}

fn new_reduction_with_policy<P: Policy + 'static>(dtype: DataType) -> Box<dyn GroupedReduction> {
Box::new(GenericFirstLastGroupedReduction::<P>::new(dtype))
use DataType::*;
use VecGroupedReduction as VGR;
match dtype {
Boolean => Box::new(VecGroupedReduction::new(
dtype,
BoolFirstLastReducer::<P>(PhantomData),
)),
_ if dtype.is_primitive_numeric() || dtype.is_temporal() => {
with_match_physical_numeric_polars_type!(dtype.to_physical(), |$T| {
Box::new(VGR::new(dtype, NumFirstLastReducer::<P, $T>(PhantomData)))
})
},
String | Binary => Box::new(VecGroupedReduction::new(
dtype,
BinaryFirstLastReducer::<P>(PhantomData),
)),
_ => Box::new(GenericFirstLastGroupedReduction::<P>::new(dtype)),
}
}

trait Policy {
trait Policy: Send + Sync + 'static {
fn index(len: usize) -> usize;
fn should_replace(new: u64, old: u64) -> bool;
}
Expand All @@ -41,7 +59,7 @@ impl Policy for Last {
}

fn should_replace(new: u64, old: u64) -> bool {
new > old
new >= old
}
}

Expand All @@ -57,17 +75,190 @@ impl Policy for Arbitrary {
}
}

struct NumFirstLastReducer<P, T>(PhantomData<(P, T)>);

impl<P, T> Clone for NumFirstLastReducer<P, T> {
fn clone(&self) -> Self {
Self(PhantomData)
}
}

impl<P, T> Reducer for NumFirstLastReducer<P, T>
where
P: Policy,
T: PolarsNumericType,
ChunkedArray<T>: IntoSeries,
{
type Dtype = T;
type Value = (Option<T::Native>, u64);

fn init(&self) -> Self::Value {
(None, 0)
}

fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
s.to_physical_repr()
}

fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
if P::should_replace(b.1, a.1) {
*a = *b;
}
}

fn reduce_one(&self, a: &mut Self::Value, b: Option<T::Native>, seq_id: u64) {
if P::should_replace(seq_id, a.1) {
*a = (b, seq_id);
}
}

fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {
if !ca.is_empty() && P::should_replace(seq_id, v.1) {
let val = ca.get(P::index(ca.len()));
*v = (val, seq_id);
}
}

fn finish(
&self,
v: Vec<Self::Value>,
m: Option<Bitmap>,
dtype: &DataType,
) -> PolarsResult<Series> {
assert!(m.is_none()); // This should only be used with VecGroupedReduction.
let ca: ChunkedArray<T> = v.into_iter().map(|(x, _s)| x).collect_ca(PlSmallStr::EMPTY);
ca.into_series().cast(dtype)
}
}

struct BinaryFirstLastReducer<P>(PhantomData<P>);

impl<P> Clone for BinaryFirstLastReducer<P> {
fn clone(&self) -> Self {
Self(PhantomData)
}
}

fn replace_opt_bytes(l: &mut Option<Vec<u8>>, r: Option<&[u8]>) {
match (l, r) {
(Some(l), Some(r)) => {
l.clear();
l.extend_from_slice(r);
},
(l, r) => *l = r.map(|s| s.to_owned()),
}
}

impl<P> Reducer for BinaryFirstLastReducer<P>
where
P: Policy,
{
type Dtype = BinaryType;
type Value = (Option<Vec<u8>>, u64);

fn init(&self) -> Self::Value {
(None, 0)
}

fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
Cow::Owned(s.cast(&DataType::Binary).unwrap())
}

fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
if P::should_replace(b.1, a.1) {
a.0.clone_from(&b.0);
a.1 = b.1;
}
}

fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>, seq_id: u64) {
if P::should_replace(seq_id, a.1) {
replace_opt_bytes(&mut a.0, b);
a.1 = seq_id;
}
}

fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {
if !ca.is_empty() && P::should_replace(seq_id, v.1) {
replace_opt_bytes(&mut v.0, ca.get(P::index(ca.len())));
v.1 = seq_id;
}
}

fn finish(
&self,
v: Vec<Self::Value>,
m: Option<Bitmap>,
dtype: &DataType,
) -> PolarsResult<Series> {
assert!(m.is_none()); // This should only be used with VecGroupedReduction.
let ca: BinaryChunked = v.into_iter().map(|(x, _s)| x).collect_ca(PlSmallStr::EMPTY);
ca.into_series().cast(dtype)
}
}

struct BoolFirstLastReducer<P>(PhantomData<P>);

impl<P> Clone for BoolFirstLastReducer<P> {
fn clone(&self) -> Self {
Self(PhantomData)
}
}

impl<P> Reducer for BoolFirstLastReducer<P>
where
P: Policy,
{
type Dtype = BooleanType;
type Value = (Option<bool>, u64);

fn init(&self) -> Self::Value {
(None, 0)
}

fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
if P::should_replace(b.1, a.1) {
*a = *b;
}
}

fn reduce_one(&self, a: &mut Self::Value, b: Option<bool>, seq_id: u64) {
if P::should_replace(seq_id, a.1) {
a.0 = b;
a.1 = seq_id;
}
}

fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {
if !ca.is_empty() && P::should_replace(seq_id, v.1) {
v.0 = ca.get(P::index(ca.len()));
v.1 = seq_id;
}
}

fn finish(
&self,
v: Vec<Self::Value>,
m: Option<Bitmap>,
_dtype: &DataType,
) -> PolarsResult<Series> {
assert!(m.is_none()); // This should only be used with VecGroupedReduction.
let ca: BooleanChunked = v.into_iter().map(|(x, _s)| x).collect_ca(PlSmallStr::EMPTY);
Ok(ca.into_series())
}
}

pub struct GenericFirstLastGroupedReduction<P> {
dtype: DataType,
in_dtype: DataType,
values: Vec<AnyValue<'static>>,
seqs: Vec<u64>,
policy: PhantomData<fn() -> P>,
}

impl<P> GenericFirstLastGroupedReduction<P> {
fn new(dtype: DataType) -> Self {
fn new(in_dtype: DataType) -> Self {
Self {
dtype,
in_dtype,
values: Vec::new(),
seqs: Vec::new(),
policy: PhantomData,
Expand All @@ -78,7 +269,7 @@ impl<P> GenericFirstLastGroupedReduction<P> {
impl<P: Policy + 'static> GroupedReduction for GenericFirstLastGroupedReduction<P> {
fn new_empty(&self) -> Box<dyn GroupedReduction> {
Box::new(Self {
dtype: self.dtype.clone(),
in_dtype: self.in_dtype.clone(),
values: Vec::new(),
seqs: Vec::new(),
policy: PhantomData,
Expand Down Expand Up @@ -176,7 +367,7 @@ impl<P: Policy + 'static> GroupedReduction for GenericFirstLastGroupedReduction<
std::iter::zip(values, seqs)
.map(|(values, seqs)| {
Box::new(Self {
dtype: self.dtype.clone(),
in_dtype: self.in_dtype.clone(),
values,
seqs,
policy: PhantomData,
Expand All @@ -188,7 +379,7 @@ impl<P: Policy + 'static> GroupedReduction for GenericFirstLastGroupedReduction<
fn finalize(&mut self) -> PolarsResult<Series> {
self.seqs.clear();
unsafe {
let mut buf = AnyValueBufferTrusted::new(&self.dtype, self.values.len());
let mut buf = AnyValueBufferTrusted::new(&self.in_dtype, self.values.len());
for v in core::mem::take(&mut self.values) {
buf.add_unchecked_owned_physical(&v);
}
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-expr/src/reduce/mean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ where
}

#[inline(always)]
fn reduce_one(&self, a: &mut Self::Value, b: Option<T::Native>) {
fn reduce_one(&self, a: &mut Self::Value, b: Option<T::Native>, _seq_id: u64) {
a.0 += b.unwrap_or(T::Native::zero()).as_();
a.1 += b.is_some() as usize;
}

fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>) {
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) {
v.0 += ChunkAgg::_sum_as_f64(ca);
v.1 += ca.len() - ca.null_count();
}
Expand Down Expand Up @@ -141,12 +141,12 @@ impl Reducer for BoolMeanReducer {
}

#[inline(always)]
fn reduce_one(&self, a: &mut Self::Value, b: Option<bool>) {
fn reduce_one(&self, a: &mut Self::Value, b: Option<bool>, _seq_id: u64) {
a.0 += b.unwrap_or(false) as usize;
a.1 += b.is_some() as usize;
}

fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>) {
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) {
v.0 += ca.sum().unwrap_or(0) as usize;
v.1 += ca.len() - ca.null_count();
}
Expand Down
16 changes: 8 additions & 8 deletions crates/polars-expr/src/reduce/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,10 @@ impl Reducer for BinaryMinReducer {
}

fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
self.reduce_one(a, b.as_deref())
self.reduce_one(a, b.as_deref(), 0)
}

fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>) {
fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>, _seq_id: u64) {
match (a, b) {
(_, None) => {},
(l @ None, Some(r)) => *l = Some(r.to_owned()),
Expand All @@ -206,8 +206,8 @@ impl Reducer for BinaryMinReducer {
}
}

fn reduce_ca(&self, v: &mut Self::Value, ca: &BinaryChunked) {
self.reduce_one(v, ca.min_binary())
fn reduce_ca(&self, v: &mut Self::Value, ca: &BinaryChunked, _seq_id: u64) {
self.reduce_one(v, ca.min_binary(), 0)
}

fn finish(
Expand Down Expand Up @@ -238,11 +238,11 @@ impl Reducer for BinaryMaxReducer {

#[inline(always)]
fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
self.reduce_one(a, b.as_deref())
self.reduce_one(a, b.as_deref(), 0)
}

#[inline(always)]
fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>) {
fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>, _seq_id: u64) {
match (a, b) {
(_, None) => {},
(l @ None, Some(r)) => *l = Some(r.to_owned()),
Expand All @@ -256,8 +256,8 @@ impl Reducer for BinaryMaxReducer {
}

#[inline(always)]
fn reduce_ca(&self, v: &mut Self::Value, ca: &BinaryChunked) {
self.reduce_one(v, ca.max_binary())
fn reduce_ca(&self, v: &mut Self::Value, ca: &BinaryChunked, _seq_id: u64) {
self.reduce_one(v, ca.max_binary(), 0)
}

#[inline(always)]
Expand Down
Loading

0 comments on commit e8d10a9

Please sign in to comment.