From 9c491962878d503eac8568373151daadbe167047 Mon Sep 17 00:00:00 2001 From: Vince Date: Tue, 4 Jul 2023 10:10:10 +0200 Subject: [PATCH 1/4] Improve median performance. --- .../physical-expr/src/aggregate/median.rs | 39 +++++++++++++++---- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs index 6f79c98a6c3a..f022a753d50a 100644 --- a/datafusion/physical-expr/src/aggregate/median.rs +++ b/datafusion/physical-expr/src/aggregate/median.rs @@ -66,6 +66,7 @@ impl AggregateExpr for Median { fn create_accumulator(&self) -> Result> { Ok(Box::new(MedianAccumulator { data_type: self.data_type.clone(), + batches: vec![], all_values: vec![], })) } @@ -111,13 +112,28 @@ impl PartialEq for Median { /// The intermediate state is represented as a List of those scalars struct MedianAccumulator { data_type: DataType, + batches: Vec, all_values: Vec, } +fn to_scalar_values(arrays: &[ArrayRef]) -> Result> { + let num_values: usize = arrays.iter().map(|a| a.len()).sum(); + let mut all_values = Vec::with_capacity(num_values); + + for array in arrays { + for index in 0..array.len() { + all_values.push(ScalarValue::try_from_array(&array, index)?); + } + } + + Ok(all_values) +} + impl Accumulator for MedianAccumulator { fn state(&self) -> Result> { - let state = - ScalarValue::new_list(Some(self.all_values.clone()), self.data_type.clone()); + let all_values = to_scalar_values(&self.batches)?; + let state = ScalarValue::new_list(Some(all_values), self.data_type.clone()); + Ok(vec![state]) } @@ -126,11 +142,7 @@ impl Accumulator for MedianAccumulator { let array = &values[0]; assert_eq!(array.data_type(), &self.data_type); - self.all_values.reserve(array.len()); - for index in 0..array.len() { - self.all_values - .push(ScalarValue::try_from_array(array, index)?); - } + self.batches.push(array.clone()); Ok(()) } @@ -157,7 +169,14 @@ impl Accumulator for MedianAccumulator { } fn evaluate(&self) -> Result { - if !self.all_values.iter().any(|v| !v.is_null()) { + let batch_values = to_scalar_values(&self.batches)?; + + if !self + .all_values + .iter() + .chain(batch_values.iter()) + .any(|v| !v.is_null()) + { return ScalarValue::try_from(&self.data_type); } @@ -166,6 +185,7 @@ impl Accumulator for MedianAccumulator { let array = ScalarValue::iter_to_array( self.all_values .iter() + .chain(batch_values.iter()) // ignore null values .filter(|v| !v.is_null()) .cloned(), @@ -214,7 +234,10 @@ impl Accumulator for MedianAccumulator { } fn size(&self) -> usize { + let batches_size: usize = self.batches.iter().map(|a| a.len()).sum(); + std::mem::size_of_val(self) + ScalarValue::size_of_vec(&self.all_values) + + batches_size - std::mem::size_of_val(&self.all_values) + self.data_type.size() - std::mem::size_of_val(&self.data_type) From 4f9a54092ca2f26f8378292815270d39ba45ab27 Mon Sep 17 00:00:00 2001 From: Vince Date: Tue, 4 Jul 2023 10:48:48 +0200 Subject: [PATCH 2/4] Fix formatting. --- datafusion/physical-expr/src/aggregate/median.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs index f022a753d50a..1f7fec7c6efa 100644 --- a/datafusion/physical-expr/src/aggregate/median.rs +++ b/datafusion/physical-expr/src/aggregate/median.rs @@ -236,7 +236,8 @@ impl Accumulator for MedianAccumulator { fn size(&self) -> usize { let batches_size: usize = self.batches.iter().map(|a| a.len()).sum(); - std::mem::size_of_val(self) + ScalarValue::size_of_vec(&self.all_values) + std::mem::size_of_val(self) + + ScalarValue::size_of_vec(&self.all_values) + batches_size - std::mem::size_of_val(&self.all_values) + self.data_type.size() From a308ba8431542336ce204e42a402181ef925bdf4 Mon Sep 17 00:00:00 2001 From: Vince Date: Wed, 5 Jul 2023 19:00:46 +0200 Subject: [PATCH 3/4] Review feedback --- .../physical-expr/src/aggregate/median.rs | 44 ++++++++++--------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs index 1f7fec7c6efa..f4a7db47fdf2 100644 --- a/datafusion/physical-expr/src/aggregate/median.rs +++ b/datafusion/physical-expr/src/aggregate/median.rs @@ -66,7 +66,7 @@ impl AggregateExpr for Median { fn create_accumulator(&self) -> Result> { Ok(Box::new(MedianAccumulator { data_type: self.data_type.clone(), - batches: vec![], + arrays: vec![], all_values: vec![], })) } @@ -109,29 +109,19 @@ impl PartialEq for Median { /// The median accumulator accumulates the raw input values /// as `ScalarValue`s /// -/// The intermediate state is represented as a List of those scalars +/// The intermediate state is represented as a List of scalar values updated by +/// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values +/// in the final evaluation step so that we avoid expensive conversions and +/// allocations during `update_batch`. struct MedianAccumulator { data_type: DataType, - batches: Vec, + arrays: Vec, all_values: Vec, } -fn to_scalar_values(arrays: &[ArrayRef]) -> Result> { - let num_values: usize = arrays.iter().map(|a| a.len()).sum(); - let mut all_values = Vec::with_capacity(num_values); - - for array in arrays { - for index in 0..array.len() { - all_values.push(ScalarValue::try_from_array(&array, index)?); - } - } - - Ok(all_values) -} - impl Accumulator for MedianAccumulator { fn state(&self) -> Result> { - let all_values = to_scalar_values(&self.batches)?; + let all_values = to_scalar_values(&self.arrays)?; let state = ScalarValue::new_list(Some(all_values), self.data_type.clone()); Ok(vec![state]) @@ -141,8 +131,9 @@ impl Accumulator for MedianAccumulator { assert_eq!(values.len(), 1); let array = &values[0]; + // Defer conversions to scalar values to final evaluation. assert_eq!(array.data_type(), &self.data_type); - self.batches.push(array.clone()); + self.arrays.push(array.clone()); Ok(()) } @@ -169,7 +160,7 @@ impl Accumulator for MedianAccumulator { } fn evaluate(&self) -> Result { - let batch_values = to_scalar_values(&self.batches)?; + let batch_values = to_scalar_values(&self.arrays)?; if !self .all_values @@ -234,7 +225,7 @@ impl Accumulator for MedianAccumulator { } fn size(&self) -> usize { - let batches_size: usize = self.batches.iter().map(|a| a.len()).sum(); + let batches_size: usize = self.arrays.iter().map(|a| a.len()).sum(); std::mem::size_of_val(self) + ScalarValue::size_of_vec(&self.all_values) @@ -245,6 +236,19 @@ impl Accumulator for MedianAccumulator { } } +fn to_scalar_values(arrays: &[ArrayRef]) -> Result> { + let num_values: usize = arrays.iter().map(|a| a.len()).sum(); + let mut all_values = Vec::with_capacity(num_values); + + for array in arrays { + for index in 0..array.len() { + all_values.push(ScalarValue::try_from_array(&array, index)?); + } + } + + Ok(all_values) +} + /// Given a returns `array[indicies[indicie_index]]` as a `ScalarValue` fn scalar_at_index( array: &dyn Array, From 3fa6cd2fe1f1dd8e3e5936405f33cb398440899f Mon Sep 17 00:00:00 2001 From: Vince Date: Wed, 5 Jul 2023 19:43:41 +0200 Subject: [PATCH 4/4] Renamed arrays size. --- datafusion/physical-expr/src/aggregate/median.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs index f4a7db47fdf2..28f1fc31995a 100644 --- a/datafusion/physical-expr/src/aggregate/median.rs +++ b/datafusion/physical-expr/src/aggregate/median.rs @@ -225,11 +225,11 @@ impl Accumulator for MedianAccumulator { } fn size(&self) -> usize { - let batches_size: usize = self.arrays.iter().map(|a| a.len()).sum(); + let arrays_size: usize = self.arrays.iter().map(|a| a.len()).sum(); std::mem::size_of_val(self) + ScalarValue::size_of_vec(&self.all_values) - + batches_size + + arrays_size - std::mem::size_of_val(&self.all_values) + self.data_type.size() - std::mem::size_of_val(&self.data_type)