diff --git a/crates/polars-expr/src/reduce/first_last.rs b/crates/polars-expr/src/reduce/first_last.rs index 5ad086755645..ffde18729bb5 100644 --- a/crates/polars-expr/src/reduce/first_last.rs +++ b/crates/polars-expr/src/reduce/first_last.rs @@ -1,6 +1,7 @@ use std::marker::PhantomData; use polars_core::frame::row::AnyValueBufferTrusted; +use polars_core::with_match_physical_numeric_polars_type; use super::*; @@ -13,10 +14,27 @@ pub fn new_last_reduction(dtype: DataType) -> Box<dyn GroupedReduction> { } fn new_reduction_with_policy<P: Policy>(dtype: DataType) -> Box<dyn GroupedReduction> { - Box::new(GenericFirstLastGroupedReduction::
<P>
::new(dtype)) + use DataType::*; + use VecGroupedReduction as VGR; + match dtype { + Boolean => Box::new(VecGroupedReduction::new( + dtype, + BoolFirstLastReducer::
<P>
(PhantomData), + )), + _ if dtype.is_primitive_numeric() || dtype.is_temporal() => { + with_match_physical_numeric_polars_type!(dtype.to_physical(), |$T| { + Box::new(VGR::new(dtype, NumFirstLastReducer::<P, $T>(PhantomData))) + }) + }, + String | Binary => Box::new(VecGroupedReduction::new( + dtype, + BinaryFirstLastReducer::
<P>
(PhantomData), + )), + _ => Box::new(GenericFirstLastGroupedReduction::
<P>
::new(dtype)), + } } -trait Policy { +trait Policy: Send + Sync + 'static { fn index(len: usize) -> usize; fn should_replace(new: u64, old: u64) -> bool; } @@ -41,7 +59,7 @@ impl Policy for Last { } fn should_replace(new: u64, old: u64) -> bool { - new > old + new >= old } } @@ -57,17 +75,190 @@ impl Policy for Arbitrary { } } +struct NumFirstLastReducer<P, T>(PhantomData<(P, T)>); + +impl<P, T> Clone for NumFirstLastReducer<P, T> { + fn clone(&self) -> Self { + Self(PhantomData) + } +} + +impl<P, T> Reducer for NumFirstLastReducer<P, T> +where + P: Policy, + T: PolarsNumericType, + ChunkedArray<T>: IntoSeries, +{ + type Dtype = T; + type Value = (Option<T::Native>, u64); + + fn init(&self) -> Self::Value { + (None, 0) + } + + fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> { + s.to_physical_repr() + } + + fn combine(&self, a: &mut Self::Value, b: &Self::Value) { + if P::should_replace(b.1, a.1) { + *a = *b; + } + } + + fn reduce_one(&self, a: &mut Self::Value, b: Option<T::Native>, seq_id: u64) { + if P::should_replace(seq_id, a.1) { + *a = (b, seq_id); + } + } + + fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<T>, seq_id: u64) { + if !ca.is_empty() && P::should_replace(seq_id, v.1) { + let val = ca.get(P::index(ca.len())); + *v = (val, seq_id); + } + } + + fn finish( + &self, + v: Vec<Self::Value>, + m: Option<Bitmap>, + dtype: &DataType, + ) -> PolarsResult<Series> { + assert!(m.is_none()); // This should only be used with VecGroupedReduction. + let ca: ChunkedArray<T> = v.into_iter().map(|(x, _s)| x).collect_ca(PlSmallStr::EMPTY); + ca.into_series().cast(dtype) + } +} + +struct BinaryFirstLastReducer
<P>
(PhantomData
<P>
); + +impl
<P>
Clone for BinaryFirstLastReducer
<P>
{ + fn clone(&self) -> Self { + Self(PhantomData) + } +} + +fn replace_opt_bytes(l: &mut Option<Vec<u8>>, r: Option<&[u8]>) { + match (l, r) { + (Some(l), Some(r)) => { + l.clear(); + l.extend_from_slice(r); + }, + (l, r) => *l = r.map(|s| s.to_owned()), + } +} + +impl
<P>
Reducer for BinaryFirstLastReducer
<P>
+where + P: Policy, +{ + type Dtype = BinaryType; + type Value = (Option<Vec<u8>>, u64); + + fn init(&self) -> Self::Value { + (None, 0) + } + + fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> { + Cow::Owned(s.cast(&DataType::Binary).unwrap()) + } + + fn combine(&self, a: &mut Self::Value, b: &Self::Value) { + if P::should_replace(b.1, a.1) { + a.0.clone_from(&b.0); + a.1 = b.1; + } + } + + fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>, seq_id: u64) { + if P::should_replace(seq_id, a.1) { + replace_opt_bytes(&mut a.0, b); + a.1 = seq_id; + } + } + + fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) { + if !ca.is_empty() && P::should_replace(seq_id, v.1) { + replace_opt_bytes(&mut v.0, ca.get(P::index(ca.len()))); + v.1 = seq_id; + } + } + + fn finish( + &self, + v: Vec<Self::Value>, + m: Option<Bitmap>, + dtype: &DataType, + ) -> PolarsResult<Series> { + assert!(m.is_none()); // This should only be used with VecGroupedReduction. + let ca: BinaryChunked = v.into_iter().map(|(x, _s)| x).collect_ca(PlSmallStr::EMPTY); + ca.into_series().cast(dtype) + } +} + +struct BoolFirstLastReducer
<P>
(PhantomData
<P>
); + +impl
<P>
Clone for BoolFirstLastReducer
<P>
{ + fn clone(&self) -> Self { + Self(PhantomData) + } +} + +impl
<P>
Reducer for BoolFirstLastReducer
<P>
+where + P: Policy, +{ + type Dtype = BooleanType; + type Value = (Option<bool>, u64); + + fn init(&self) -> Self::Value { + (None, 0) + } + + fn combine(&self, a: &mut Self::Value, b: &Self::Value) { + if P::should_replace(b.1, a.1) { + *a = *b; + } + } + + fn reduce_one(&self, a: &mut Self::Value, b: Option<bool>, seq_id: u64) { + if P::should_replace(seq_id, a.1) { + a.0 = b; + a.1 = seq_id; + } + } + + fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) { + if !ca.is_empty() && P::should_replace(seq_id, v.1) { + v.0 = ca.get(P::index(ca.len())); + v.1 = seq_id; + } + } + + fn finish( + &self, + v: Vec<Self::Value>, + m: Option<Bitmap>, + _dtype: &DataType, + ) -> PolarsResult<Series> { + assert!(m.is_none()); // This should only be used with VecGroupedReduction. + let ca: BooleanChunked = v.into_iter().map(|(x, _s)| x).collect_ca(PlSmallStr::EMPTY); + Ok(ca.into_series()) + } +} + pub struct GenericFirstLastGroupedReduction
<P>
{ - dtype: DataType, + in_dtype: DataType, values: Vec<AnyValue<'static>>, seqs: Vec<u64>, policy: PhantomData<fn() -> P>, } impl
<P>
GenericFirstLastGroupedReduction
<P>
{ - fn new(dtype: DataType) -> Self { + fn new(in_dtype: DataType) -> Self { Self { - dtype, + in_dtype, values: Vec::new(), seqs: Vec::new(), policy: PhantomData, @@ -78,7 +269,7 @@ impl
<P>
GenericFirstLastGroupedReduction
<P>
{ impl<P: Policy + Send + Sync + 'static> GroupedReduction for GenericFirstLastGroupedReduction
<P>
{ fn new_empty(&self) -> Box<dyn GroupedReduction> { Box::new(Self { - dtype: self.dtype.clone(), + in_dtype: self.in_dtype.clone(), values: Vec::new(), seqs: Vec::new(), policy: PhantomData, @@ -176,7 +367,7 @@ impl<P: Policy> GroupedReduction for GenericFirstLastGroupedReduction< std::iter::zip(values, seqs) .map(|(values, seqs)| { Box::new(Self { - dtype: self.dtype.clone(), + in_dtype: self.in_dtype.clone(), values, seqs, policy: PhantomData, @@ -188,7 +379,7 @@ impl<P: Policy> GroupedReduction for GenericFirstLastGroupedReduction< fn finalize(&mut self) -> PolarsResult<Series> { self.seqs.clear(); unsafe { - let mut buf = AnyValueBufferTrusted::new(&self.dtype, self.values.len()); + let mut buf = AnyValueBufferTrusted::new(&self.in_dtype, self.values.len()); for v in core::mem::take(&mut self.values) { buf.add_unchecked_owned_physical(&v); } diff --git a/crates/polars-expr/src/reduce/mean.rs b/crates/polars-expr/src/reduce/mean.rs index 51d48f54790d..a5c666b9dd7a 100644 --- a/crates/polars-expr/src/reduce/mean.rs +++ b/crates/polars-expr/src/reduce/mean.rs @@ -101,12 +101,12 @@ where } #[inline(always)] - fn reduce_one(&self, a: &mut Self::Value, b: Option<T::Native>) { + fn reduce_one(&self, a: &mut Self::Value, b: Option<T::Native>, _seq_id: u64) { a.0 += b.unwrap_or(T::Native::zero()).as_(); a.1 += b.is_some() as usize; } - fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<T>) { + fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<T>, _seq_id: u64) { v.0 += ChunkAgg::_sum_as_f64(ca); v.1 += ca.len() - ca.null_count(); } @@ -141,12 +141,12 @@ impl Reducer for BoolMeanReducer { } #[inline(always)] - fn reduce_one(&self, a: &mut Self::Value, b: Option<bool>) { + fn reduce_one(&self, a: &mut Self::Value, b: Option<bool>, _seq_id: u64) { a.0 += b.unwrap_or(false) as usize; a.1 += b.is_some() as usize; } - fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>) { + fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) { v.0 += ca.sum().unwrap_or(0) as usize; v.1 += ca.len() - ca.null_count(); } diff --git
a/crates/polars-expr/src/reduce/min_max.rs b/crates/polars-expr/src/reduce/min_max.rs index 8f172d25f4e5..571c213c8923 100644 --- a/crates/polars-expr/src/reduce/min_max.rs +++ b/crates/polars-expr/src/reduce/min_max.rs @@ -190,10 +190,10 @@ impl Reducer for BinaryMinReducer { } fn combine(&self, a: &mut Self::Value, b: &Self::Value) { - self.reduce_one(a, b.as_deref()) + self.reduce_one(a, b.as_deref(), 0) } - fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>) { + fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>, _seq_id: u64) { match (a, b) { (_, None) => {}, (l @ None, Some(r)) => *l = Some(r.to_owned()), @@ -206,8 +206,8 @@ impl Reducer for BinaryMinReducer { } } - fn reduce_ca(&self, v: &mut Self::Value, ca: &BinaryChunked) { - self.reduce_one(v, ca.min_binary()) + fn reduce_ca(&self, v: &mut Self::Value, ca: &BinaryChunked, _seq_id: u64) { + self.reduce_one(v, ca.min_binary(), 0) } fn finish( @@ -238,11 +238,11 @@ impl Reducer for BinaryMaxReducer { #[inline(always)] fn combine(&self, a: &mut Self::Value, b: &Self::Value) { - self.reduce_one(a, b.as_deref()) + self.reduce_one(a, b.as_deref(), 0) } #[inline(always)] - fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>) { + fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>, _seq_id: u64) { match (a, b) { (_, None) => {}, (l @ None, Some(r)) => *l = Some(r.to_owned()), @@ -256,8 +256,8 @@ impl Reducer for BinaryMaxReducer { } #[inline(always)] - fn reduce_ca(&self, v: &mut Self::Value, ca: &BinaryChunked) { - self.reduce_one(v, ca.max_binary()) + fn reduce_ca(&self, v: &mut Self::Value, ca: &BinaryChunked, _seq_id: u64) { + self.reduce_one(v, ca.max_binary(), 0) } #[inline(always)] diff --git a/crates/polars-expr/src/reduce/mod.rs b/crates/polars-expr/src/reduce/mod.rs index f63b5790e26c..e180275615a1 100644 --- a/crates/polars-expr/src/reduce/mod.rs +++ b/crates/polars-expr/src/reduce/mod.rs @@ -121,8 +121,9 @@ pub trait Reducer: Send + Sync + Clone + 'static { &self, a: &mut 
Self::Value, b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>, + seq_id: u64, ); - fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>); + fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64); fn finish( &self, v: Vec<Self::Value>, m: Option<Bitmap>, @@ -179,6 +180,7 @@ impl<R: NumericReduction> Reducer for NumReducer<R> { &self, a: &mut Self::Value, b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>, + _seq_id: u64, ) { if let Some(b) = b { *a = <R as NumericReduction>::combine(*a, b); @@ -186,7 +188,7 @@ impl<R: NumericReduction> Reducer for NumReducer<R> { } #[inline(always)] - fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>) { + fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) { if let Some(r) = <R as NumericReduction>::reduce_ca(ca) { *v = <R as NumericReduction>::combine(*v, r); } @@ -243,13 +245,14 @@ where &mut self, values: &Series, group_idx: IdxSize, - _seq_id: u64, + seq_id: u64, ) -> PolarsResult<()> { assert!(values.dtype() == &self.in_dtype); + let seq_id = seq_id + 1; // So we can use 0 for 'none yet'. let values = self.reducer.cast_series(values); let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref(); self.reducer - .reduce_ca(&mut self.values[group_idx as usize], ca); + .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id); Ok(()) } @@ -257,10 +260,11 @@ where &mut self, values: &Series, group_idxs: &[IdxSize], - _seq_id: u64, + seq_id: u64, ) -> PolarsResult<()> { assert!(values.dtype() == &self.in_dtype); assert!(values.len() == group_idxs.len()); + let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
let values = self.reducer.cast_series(values); let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref(); unsafe { @@ -268,7 +272,7 @@ where if values.has_nulls() { for (g, ov) in group_idxs.iter().zip(ca.iter()) { let grp = self.values.get_unchecked_mut(*g as usize); - self.reducer.reduce_one(grp, ov); + self.reducer.reduce_one(grp, ov, seq_id); } } else { let mut offset = 0; @@ -276,7 +280,7 @@ where let subgroup = &group_idxs[offset..offset + arr.len()]; for (g, v) in subgroup.iter().zip(arr.values_iter()) { let grp = self.values.get_unchecked_mut(*g as usize); - self.reducer.reduce_one(grp, Some(v)); + self.reducer.reduce_one(grp, Some(v), seq_id); } offset += arr.len(); } @@ -395,15 +399,16 @@ where &mut self, values: &Series, group_idx: IdxSize, - _seq_id: u64, + seq_id: u64, ) -> PolarsResult<()> { // TODO: we should really implement a sum-as-other-type operation instead // of doing this materialized cast. assert!(values.dtype() == &self.in_dtype); + let seq_id = seq_id + 1; // So we can use 0 for 'none yet'. let values = values.to_physical_repr(); let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref(); self.reducer - .reduce_ca(&mut self.values[group_idx as usize], ca); + .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id); if ca.len() != ca.null_count() { self.mask.set(group_idx as usize, true); } @@ -414,12 +419,13 @@ where &mut self, values: &Series, group_idxs: &[IdxSize], - _seq_id: u64, + seq_id: u64, ) -> PolarsResult<()> { // TODO: we should really implement a sum-as-other-type operation instead // of doing this materialized cast. assert!(values.dtype() == &self.in_dtype); assert!(values.len() == group_idxs.len()); + let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
let values = values.to_physical_repr(); let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref(); unsafe { @@ -427,7 +433,7 @@ where for (g, ov) in group_idxs.iter().zip(ca.iter()) { if let Some(v) = ov { let grp = self.values.get_unchecked_mut(*g as usize); - self.reducer.reduce_one(grp, Some(v)); + self.reducer.reduce_one(grp, Some(v), seq_id); self.mask.set_unchecked(*g as usize, true); } } diff --git a/crates/polars-expr/src/reduce/var_std.rs b/crates/polars-expr/src/reduce/var_std.rs index af60db051935..824a37282853 100644 --- a/crates/polars-expr/src/reduce/var_std.rs +++ b/crates/polars-expr/src/reduce/var_std.rs @@ -75,13 +75,13 @@ impl<T> Reducer for VarStdReducer<T> { } #[inline(always)] - fn reduce_one(&self, a: &mut Self::Value, b: Option<T::Native>) { + fn reduce_one(&self, a: &mut Self::Value, b: Option<T::Native>, _seq_id: u64) { if let Some(x) = b { a.add_one(x.as_()); } } - fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<T>) { + fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<T>, _seq_id: u64) { for arr in ca.downcast_iter() { v.combine(&polars_compute::var_cov::var(arr)) } @@ -129,12 +129,12 @@ impl Reducer for BoolVarStdReducer { } #[inline(always)] - fn reduce_one(&self, a: &mut Self::Value, b: Option<bool>) { + fn reduce_one(&self, a: &mut Self::Value, b: Option<bool>, _seq_id: u64) { a.0 += b.unwrap_or(false) as usize; a.1 += b.is_some() as usize; } - fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>) { + fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) { v.0 += ca.sum().unwrap_or(0) as usize; v.1 += ca.len() - ca.null_count(); }