Impl convert_to_state for GroupsAccumulatorAdapter (faster median for high cardinality aggregates) #11827

Merged: 13 commits, Sep 15, 2024
@@ -207,8 +207,11 @@ impl GroupsAccumulatorAdapter {
let state = &mut self.states[group_idx];
sizes_pre += state.size();

-let values_to_accumulate =
-    slice_and_maybe_filter(&values, opt_filter.as_ref(), offsets)?;
+let values_to_accumulate = slice_and_maybe_filter(
+    &values,
+    opt_filter.as_ref().map(|f| f.as_boolean()),
+    offsets,
+)?;
(f)(state.accumulator.as_mut(), &values_to_accumulate)?;

// clear out the state so they are empty for next
@@ -290,6 +293,7 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
result
}

// filtered_null_mask(opt_filter, &values);
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
let vec_size_pre = self.states.allocated_size();
let states = emit_to.take_needed(&mut self.states);
@@ -348,6 +352,46 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
fn size(&self) -> usize {
self.allocation_bytes
}

fn convert_to_state(
&self,
values: &[ArrayRef],
opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
let num_rows = values[0].len();

// Each row has its respective group
let mut results = vec![];
for row_idx in 0..num_rows {
// Create the empty accumulator for converting
let mut converted_accumulator = (self.factory)()?;
Contributor:

As a follow-on PR, I wonder if we could improve performance by adding a clear() or reset() type function to each accumulator, to avoid having to create a new accumulator for each group.

Contributor Author:

Yes, I wanted to reuse the converted_accumulator from the start, but it is not guaranteed that the state is reset after calling state(). Adding such a function to do the reset explicitly is a good idea.
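As a rough sketch of that idea (hypothetical only: neither this PR nor DataFusion's Accumulator trait defines such a method, and the trait/type names below are invented for illustration), a reset hook would let the adapter reuse a single accumulator instead of calling the factory for every row:

```rust
// Hypothetical reset() hook on a simplified accumulator trait, so one
// instance could be reused across rows instead of allocating a new one
// each time.
trait ResettableAccumulator {
    fn update(&mut self, values: &[i64]);
    fn state(&self) -> Vec<i64>;
    /// Clear internal buffers so the accumulator behaves like a fresh one.
    fn reset(&mut self);
}

struct SumAccumulator {
    sum: i64,
}

impl ResettableAccumulator for SumAccumulator {
    fn update(&mut self, values: &[i64]) {
        self.sum += values.iter().sum::<i64>();
    }

    fn state(&self) -> Vec<i64> {
        vec![self.sum]
    }

    fn reset(&mut self) {
        self.sum = 0;
    }
}

fn main() {
    let mut acc = SumAccumulator { sum: 0 };
    for row in [[1_i64], [2], [3]] {
        acc.update(&row);
        let _row_state = acc.state(); // emit the per-row state
        acc.reset();                  // reuse the same instance for the next row
    }
}
```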


// Convert row to states
let values_to_accumulate =
slice_and_maybe_filter(values, opt_filter, &[row_idx, row_idx + 1])?;
converted_accumulator.update_batch(&values_to_accumulate)?;
let states = converted_accumulator.state()?;

// Resize results to have enough columns according to the converted states
results.resize_with(states.len(), || Vec::with_capacity(num_rows));

// Add the states to results
for (idx, state_val) in states.into_iter().enumerate() {
results[idx].push(state_val);
}
}

let arrays = results
.into_iter()
.map(ScalarValue::iter_to_array)
.collect::<Result<Vec<_>>>()?;

Ok(arrays)
}

fn supports_convert_to_state(&self) -> bool {
true
}
}
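For readers outside the diff context: convert_to_state turns each input row directly into the intermediate state a single-row group would produce, which is what lets the operator skip partial aggregation for high-cardinality inputs. Below is a minimal, self-contained sketch of that row-at-a-time pattern, with a toy count/sum accumulator and plain Vecs standing in for DataFusion's Accumulator, ArrayRef, and ScalarValue (names are illustrative, not DataFusion APIs):

```rust
// Sketch of the pattern in convert_to_state above: for each input row,
// build a fresh accumulator, feed it just that row, take its state, and
// collect the states column by column.
#[derive(Default)]
struct CountSumAccumulator {
    count: i64,
    sum: i64,
}

impl CountSumAccumulator {
    fn update_batch(&mut self, values: &[i64]) {
        self.count += values.len() as i64;
        self.sum += values.iter().sum::<i64>();
    }

    // The intermediate state of this accumulator has two columns: count, sum.
    fn state(&self) -> Vec<i64> {
        vec![self.count, self.sum]
    }
}

fn convert_to_state(values: &[i64]) -> Vec<Vec<i64>> {
    let num_rows = values.len();
    let mut results: Vec<Vec<i64>> = vec![];
    for row_idx in 0..num_rows {
        // One fresh accumulator per row, fed a single-row slice.
        let mut acc = CountSumAccumulator::default();
        acc.update_batch(&values[row_idx..row_idx + 1]);
        let state = acc.state();

        // Grow `results` to one column per state field on the first pass.
        results.resize_with(state.len(), || Vec::with_capacity(num_rows));
        for (col, v) in state.into_iter().enumerate() {
            results[col].push(v);
        }
    }
    results
}

fn main() {
    // Every row becomes its own one-row state: count = 1, sum = the value.
    assert_eq!(
        convert_to_state(&[10, 20, 30]),
        vec![vec![1, 1, 1], vec![10, 20, 30]]
    );
}
```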

/// Extension trait for [`Vec`] to account for allocations.
@@ -384,7 +428,7 @@ fn get_filter_at_indices(
// Copied from physical-plan
pub(crate) fn slice_and_maybe_filter(
aggr_array: &[ArrayRef],
-filter_opt: Option<&ArrayRef>,
+filter_opt: Option<&BooleanArray>,
offsets: &[usize],
) -> Result<Vec<ArrayRef>> {
let (offset, length) = (offsets[0], offsets[1] - offsets[0]);
@@ -394,13 +438,12 @@ pub(crate) fn slice_and_maybe_filter(
.collect();

if let Some(f) = filter_opt {
-let filter_array = f.slice(offset, length);
-let filter_array = filter_array.as_boolean();
+let filter = f.slice(offset, length);

sliced_arrays
.iter()
.map(|array| {
-compute::filter(array, filter_array).map_err(|e| arrow_datafusion_err!(e))
+compute::filter(&array, &filter).map_err(|e| arrow_datafusion_err!(e))
})
.collect()
} else {
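For reference, here is a small standalone sketch of the slice-then-filter behaviour this helper implements, written directly against arrow-rs rather than DataFusion (it assumes the arrow crate's Array::slice and compute::filter APIs; error handling and names are simplified):

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BooleanArray, Int32Array};
use arrow::compute;
use arrow::error::ArrowError;

/// Slice each column to [offset, offset + length) and, if a filter is given,
/// keep only the rows where the (equally sliced) filter is true.
fn slice_and_maybe_filter(
    columns: &[ArrayRef],
    filter_opt: Option<&BooleanArray>,
    offset: usize,
    length: usize,
) -> Result<Vec<ArrayRef>, ArrowError> {
    let sliced: Vec<ArrayRef> = columns.iter().map(|c| c.slice(offset, length)).collect();

    match filter_opt {
        Some(f) => {
            // Slice the filter the same way, then apply it to every column.
            let filter = f.slice(offset, length);
            sliced
                .iter()
                .map(|array| compute::filter(array, &filter))
                .collect()
        }
        None => Ok(sliced),
    }
}

fn main() -> Result<(), ArrowError> {
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let filter = BooleanArray::from(vec![false, true, true, false, true]);

    // Slice rows 1..4 of the column, then keep rows where the filter is true.
    let out = slice_and_maybe_filter(&[col], Some(&filter), 1, 3)?;
    assert_eq!(out[0].len(), 2); // values 2 and 3 survive the sliced filter
    Ok(())
}
```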
220 changes: 220 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
@@ -133,6 +133,51 @@ GROUP BY 1, 2 ORDER BY 1 LIMIT 5;
-2117946883 d -2117946883 NULL NULL NULL
-2098805236 c -2098805236 NULL NULL NULL

query ITIIII
SELECT c5, c1,
MEDIAN(c5),
MEDIAN(CASE WHEN c1 = 'a' THEN c5 ELSE NULL END),
MEDIAN(c5) FILTER (WHERE c1 = 'b'),
MEDIAN(CASE WHEN c1 = 'a' THEN c5 ELSE NULL END) FILTER (WHERE c1 = 'b')
FROM aggregate_test_100
GROUP BY 1, 2 ORDER BY 1 LIMIT 5;
----
-2141999138 c -2141999138 NULL NULL NULL
-2141451704 a -2141451704 -2141451704 NULL NULL
-2138770630 b -2138770630 NULL -2138770630 NULL
-2117946883 d -2117946883 NULL NULL NULL
-2098805236 c -2098805236 NULL NULL NULL

query ITIIII
SELECT c5, c1,
APPROX_MEDIAN(c5),
APPROX_MEDIAN(CASE WHEN c1 = 'a' THEN c5 ELSE NULL END),
APPROX_MEDIAN(c5) FILTER (WHERE c1 = 'b'),
APPROX_MEDIAN(CASE WHEN c1 = 'a' THEN c5 ELSE NULL END) FILTER (WHERE c1 = 'b')
FROM aggregate_test_100
GROUP BY 1, 2 ORDER BY 1 LIMIT 5;
----
-2141999138 c -2141999138 NULL NULL NULL
-2141451704 a -2141451704 -2141451704 NULL NULL
-2138770630 b -2138770630 NULL -2138770630 NULL
-2117946883 d -2117946883 NULL NULL NULL
-2098805236 c -2098805236 NULL NULL NULL

query ITIIII
SELECT c5, c1,
APPROX_DISTINCT(c5),
APPROX_DISTINCT(CASE WHEN c1 = 'a' THEN c5 ELSE NULL END),
APPROX_DISTINCT(c5) FILTER (WHERE c1 = 'b'),
APPROX_DISTINCT(CASE WHEN c1 = 'a' THEN c5 ELSE NULL END) FILTER (WHERE c1 = 'b')
FROM aggregate_test_100
GROUP BY 1, 2 ORDER BY 1 LIMIT 5;
----
-2141999138 c 1 0 0 0
-2141451704 a 1 1 0 0
-2138770630 b 1 0 1 0
-2117946883 d 1 0 0 0
-2098805236 c 1 0 0 0

# FIXME: add bool_and(v3) column when issue fixed
# ISSUE https://github.com/apache/datafusion/issues/11846
query TBBB rowsort
@@ -222,6 +267,36 @@ SELECT c2, sum(c5), sum(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
4 16155718643 9.531112968922
5 6449337880 7.074412226677

# Test median for int / float
query IIR
SELECT c2, median(c5), median(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
----
1 23971150 0.5922606
2 -562486880 0.43422085
3 240273900 0.40199697
4 762932956 0.48515016
5 604973998 0.49842384

# Test approx_median for int / float
query IIR
SELECT c2, approx_median(c5), approx_median(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
----
1 191655437 0.59926736
2 -587831330 0.43230486
3 240273900 0.40199697
4 762932956 0.48515016
5 593204320 0.5156586

# Test approx_distinct for varchar / int
query III
SELECT c2, approx_distinct(c1), approx_distinct(c5) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
----
1 5 22
2 5 22
3 5 19
4 5 23
5 5 14

# Test count with nullable fields
query III
SELECT c2, count(c3), count(c11) FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
@@ -252,6 +327,36 @@ SELECT c2, sum(c3), sum(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
4 29 9.531112968922
5 -194 7.074412226677

# Test median with nullable fields
query IIR
SELECT c2, median(c3), median(c11) FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
----
1 12 0.6067944
2 1 0.46076488
3 14 0.40154034
4 -17 0.48515016
5 -35 0.5536642

# Test approx_median with nullable fields
query IIR
SELECT c2, approx_median(c3), approx_median(c11) FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
----
1 12 0.6067944
2 1 0.46076488
3 14 0.40154034
4 -7 0.48515016
5 -39 0.5536642

# Test approx_distinct with nullable fields
query II
SELECT c2, approx_distinct(c3) FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
----
1 19
2 16
3 13
4 16
5 12

# Test avg for tinyint / float
query TRR
SELECT
@@ -338,6 +443,48 @@ FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
4 417
5 284

# Test approx_distinct with filter
query III
SELECT
c2,
approx_distinct(c3) FILTER (WHERE c3 > 0),
approx_distinct(c3) FILTER (WHERE c11 > 10)
FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
----
1 13 0
2 12 0
3 11 0
4 13 0
5 5 0

# Test median with filter
query III
SELECT
c2,
median(c3) FILTER (WHERE c3 > 0),
median(c3) FILTER (WHERE c3 < 0)
FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
----
1 57 -56
2 52 -60
3 71 -74
4 65 -69
5 64 -59

# Test approx_median with filter
query III
SELECT
c2,
approx_median(c3) FILTER (WHERE c3 > 0),
approx_median(c3) FILTER (WHERE c3 < 0)
FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
----
1 57 -56
2 52 -60
3 71 -76
4 65 -64
5 64 -59

# Test count with nullable fields and filter
query III
SELECT c2,
@@ -421,6 +568,79 @@ FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
4 -171 56 2.10740506649 1.939846396446
5 -86 -76 1.8741710186 1.600569307804

# Test approx_distinct with nullable fields and filter
query II
SELECT c2,
approx_distinct(c3) FILTER (WHERE c5 > 0)
FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
----
1 11
2 6
3 6
4 11
5 8

# Test approx_distinct with nullable fields and nullable filter
query II
SELECT c2,
approx_distinct(c3) FILTER (WHERE c11 > 0.5)
FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
----
1 10
2 6
3 3
4 3
5 6

# Test median with nullable fields and filter
query IIR
SELECT c2,
median(c3) FILTER (WHERE c5 > 0),
median(c11) FILTER (WHERE c5 < 0)
FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
----
1 -5 0.6623719
2 15 0.52930677
3 13 0.32792538
4 -38 0.49774808
5 -18 0.49842384

# Test median with nullable fields and nullable filter
query II
SELECT c2,
median(c3) FILTER (WHERE c11 > 0.5)
FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
----
1 33
2 -29
3 22
4 -90
5 -22

# Test approx_median with nullable fields and filter
query IIR
SELECT c2,
approx_median(c3) FILTER (WHERE c5 > 0),
approx_median(c11) FILTER (WHERE c5 < 0)
FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
----
1 -5 0.6623719
2 12 0.52930677
3 13 0.32792538
4 -38 0.49774808
5 -21 0.47652745

# Test approx_median with nullable fields and nullable filter
query II
SELECT c2,
approx_median(c3) FILTER (WHERE c11 > 0.5)
FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
----
1 35
2 -29
3 22
4 -90
5 -32

statement ok
DROP TABLE aggregate_test_100_null;