From 2ff71b3b42fe3cd44f8de21e2b8a3d4359f02800 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Sep 2024 16:42:08 -0600 Subject: [PATCH 1/6] agg bench --- native/core/benches/aggregate.rs | 111 +++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 native/core/benches/aggregate.rs diff --git a/native/core/benches/aggregate.rs b/native/core/benches/aggregate.rs new file mode 100644 index 000000000..10ad5a9e7 --- /dev/null +++ b/native/core/benches/aggregate.rs @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, RecordBatch, StringBuilder}; + +use arrow::compute::filter_record_batch; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow_array::builder::{BooleanBuilder, Int32Builder, StringBuilder}; +use arrow_array::{ArrayRef, RecordBatch}; +use comet::execution::operators::comet_filter_record_batch; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use std::sync::Arc; +use std::time::Duration; + +fn criterion_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("filter"); + + let num_rows = 8192; + let num_int_cols = 4; + let num_string_cols = 4; + + let batch = create_record_batch(num_rows, num_int_cols, num_string_cols); + + // create some different predicates + let mut predicate_select_few = BooleanBuilder::with_capacity(num_rows); + let mut predicate_select_many = BooleanBuilder::with_capacity(num_rows); + let mut predicate_select_all = BooleanBuilder::with_capacity(num_rows); + for i in 0..num_rows { + predicate_select_few.append_value(i % 10 == 0); + predicate_select_many.append_value(i % 10 > 0); + predicate_select_all.append_value(true); + } + let predicate_select_few = predicate_select_few.finish(); + let predicate_select_many = predicate_select_many.finish(); + let predicate_select_all = predicate_select_all.finish(); + + // baseline uses Arrow's filter_record_batch method + group.bench_function("arrow_filter_record_batch - few rows selected", |b| { + b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_few))) + }); + group.bench_function("arrow_filter_record_batch - many rows selected", |b| { + b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_many))) + }); + group.bench_function("arrow_filter_record_batch - all rows selected", |b| { + b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_all))) + }); + + group.bench_function("comet_filter_record_batch - few rows selected", |b| { + b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_few))) + }); + group.bench_function("comet_filter_record_batch - many rows selected", |b| { + b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_many))) + }); + group.bench_function("comet_filter_record_batch - all rows selected", |b| { + b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_all))) + }); + + group.finish(); +} + +fn create_record_batch(num_rows: usize, num_int_cols: i32, num_string_cols: i32) -> RecordBatch { + let mut int32_builder = Int32Builder::with_capacity(num_rows); + let mut string_builder = StringBuilder::with_capacity(num_rows, num_rows * 32); + for i in 0..num_rows { + int32_builder.append_value(i as i32); + string_builder.append_value(format!("this is string #{i}")); + } + let int32_array = Arc::new(int32_builder.finish()); + let string_array = Arc::new(string_builder.finish()); + + let mut fields = vec![]; + let mut columns: Vec = vec![]; + let mut i = 0; + for _ in 0..num_int_cols { + fields.push(Field::new(format!("c{i}"), DataType::Int32, false)); + columns.push(int32_array.clone()); // note this is just copying a reference to the array + i += 1; + } + for _ in 0..num_string_cols { + fields.push(Field::new(format!("c{i}"), DataType::Utf8, false)); + columns.push(string_array.clone()); // note this is just copying a reference to the array + i += 1; + } + let schema = Schema::new(fields); + RecordBatch::try_new(Arc::new(schema), columns).unwrap() +} + +fn config() -> Criterion { + Criterion::default() + .measurement_time(Duration::from_millis(500)) + .warm_up_time(Duration::from_millis(500)) +} + +criterion_group! { + name = benches; + config = config(); + targets = criterion_benchmark +} +criterion_main!(benches); From b24794ed44fbd45fd273914db308bf3b590e3425 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Sep 2024 16:53:52 -0600 Subject: [PATCH 2/6] fix --- native/Cargo.lock | 2 + native/core/Cargo.toml | 6 +- native/core/benches/aggregate.rs | 109 +++++++++++++++++-------------- 3 files changed, 66 insertions(+), 51 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 3692f0488..601da3e3f 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -707,6 +707,7 @@ dependencies = [ "ciborium", "clap", "criterion-plot", + "futures", "is-terminal", "itertools 0.10.5", "num-traits", @@ -719,6 +720,7 @@ dependencies = [ "serde_derive", "serde_json", "tinytemplate", + "tokio", "walkdir", ] diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 58fe00e75..68c470be6 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -80,7 +80,7 @@ datafusion-comet-proto = { workspace = true } [dev-dependencies] pprof = { version = "0.13.0", features = ["flamegraph"] } -criterion = "0.5.1" +criterion = { version = "0.5.1", features = ["async_tokio"] } jni = { version = "0.21", features = ["invocation"] } lazy_static = "1.4" assertables = "7" @@ -122,3 +122,7 @@ harness = false [[bench]] name = "filter" harness = false + +[[bench]] +name = "aggregate" +harness = false \ No newline at end of file diff --git a/native/core/benches/aggregate.rs b/native/core/benches/aggregate.rs index 10ad5a9e7..bf91fdc28 100644 --- a/native/core/benches/aggregate.rs +++ b/native/core/benches/aggregate.rs @@ -15,62 +15,73 @@ // specific language governing permissions and limitations // under the License.use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, RecordBatch, StringBuilder}; -use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Field, Schema}; -use arrow_array::builder::{BooleanBuilder, Int32Builder, StringBuilder}; +use arrow_array::builder::{Int32Builder, StringBuilder}; use arrow_array::{ArrayRef, RecordBatch}; -use comet::execution::operators::comet_filter_record_batch; -use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::functions_aggregate::sum::sum_udaf; +use datafusion::physical_plan::aggregates::{AggregateMode, PhysicalGroupBy}; +use datafusion::physical_plan::memory::MemoryExec; +use datafusion::physical_plan::ExecutionPlan; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::aggregate::AggregateExprBuilder; +use datafusion_physical_expr::expressions::Column; use std::sync::Arc; use std::time::Duration; +use tokio::runtime::Runtime; +use futures::StreamExt; fn criterion_benchmark(c: &mut Criterion) { - let mut group = c.benchmark_group("filter"); - + let mut group = c.benchmark_group("aggregate"); let num_rows = 8192; - let num_int_cols = 4; - let num_string_cols = 4; + let batch = create_record_batch(num_rows); + let mut batches = Vec::new(); + for _ in 0..10 { + batches.push(batch.clone()); + } + let partitions = &[batches]; + let scan : Arc = Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()); + let c0 = Arc::new(Column::new("c0", 0)); + let c1 = Arc::new(Column::new("c1", 1)); - let batch = create_record_batch(num_rows, num_int_cols, num_string_cols); + let schema = scan.schema(); - // create some different predicates - let mut predicate_select_few = BooleanBuilder::with_capacity(num_rows); - let mut predicate_select_many = BooleanBuilder::with_capacity(num_rows); - let mut predicate_select_all = BooleanBuilder::with_capacity(num_rows); - for i in 0..num_rows { - predicate_select_few.append_value(i % 10 == 0); - predicate_select_many.append_value(i % 10 > 0); - predicate_select_all.append_value(true); - } - let predicate_select_few = predicate_select_few.finish(); - let predicate_select_many = predicate_select_many.finish(); - let predicate_select_all = predicate_select_all.finish(); + let aggr_expr = AggregateExprBuilder::new(sum_udaf(), vec![c1]) + .schema(schema.clone()) + .alias("sum") + .with_ignore_nulls(false) + .with_distinct(false) + .build() + .unwrap(); - // baseline uses Arrow's filter_record_batch method - group.bench_function("arrow_filter_record_batch - few rows selected", |b| { - b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_few))) - }); - group.bench_function("arrow_filter_record_batch - many rows selected", |b| { - b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_many))) - }); - group.bench_function("arrow_filter_record_batch - all rows selected", |b| { - b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_all))) - }); + let aggregate = Arc::new( + datafusion::physical_plan::aggregates::AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::new_single(vec![(c0, "c0".to_string())]), + vec![aggr_expr], + vec![None], // no filter expressions + scan, + Arc::clone(&schema), + ).unwrap() + ); - group.bench_function("comet_filter_record_batch - few rows selected", |b| { - b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_few))) - }); - group.bench_function("comet_filter_record_batch - many rows selected", |b| { - b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_many))) - }); - group.bench_function("comet_filter_record_batch - all rows selected", |b| { - b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_all))) + let rt = Runtime::new().unwrap(); + + group.bench_function("aggregate - sum int", |b| { + b.to_async(&rt).iter(|| async { + let mut x = aggregate.execute(0, Arc::new(TaskContext::default())).unwrap(); + while let Some(batch) = x.next().await { + let _batch = batch.unwrap(); + } + }) }); group.finish(); } -fn create_record_batch(num_rows: usize, num_int_cols: i32, num_string_cols: i32) -> RecordBatch { + + +fn create_record_batch(num_rows: usize) -> RecordBatch { let mut int32_builder = Int32Builder::with_capacity(num_rows); let mut string_builder = StringBuilder::with_capacity(num_rows, num_rows * 32); for i in 0..num_rows { @@ -83,16 +94,14 @@ fn create_record_batch(num_rows: usize, num_int_cols: i32, num_string_cols: i32) let mut fields = vec![]; let mut columns: Vec = vec![]; let mut i = 0; - for _ in 0..num_int_cols { - fields.push(Field::new(format!("c{i}"), DataType::Int32, false)); - columns.push(int32_array.clone()); // note this is just copying a reference to the array - i += 1; - } - for _ in 0..num_string_cols { - fields.push(Field::new(format!("c{i}"), DataType::Utf8, false)); - columns.push(string_array.clone()); // note this is just copying a reference to the array - i += 1; - } + // string column + fields.push(Field::new(format!("c{i}"), DataType::Utf8, false)); + columns.push(string_array.clone()); // note this is just copying a reference to the array + i += 1; + // int column + fields.push(Field::new(format!("c{i}"), DataType::Int32, false)); + columns.push(int32_array.clone()); // note this is just copying a reference to the array + // i += 1; let schema = Schema::new(fields); RecordBatch::try_new(Arc::new(schema), columns).unwrap() } From af9fc152b5fd16762c21cd716e7a436a3e9246ed Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Sep 2024 17:15:43 -0600 Subject: [PATCH 3/6] fix --- native/core/benches/aggregate.rs | 128 ++++++++++++++++++++++--------- 1 file changed, 91 insertions(+), 37 deletions(-) diff --git a/native/core/benches/aggregate.rs b/native/core/benches/aggregate.rs index bf91fdc28..da3c54c21 100644 --- a/native/core/benches/aggregate.rs +++ b/native/core/benches/aggregate.rs @@ -16,20 +16,24 @@ // under the License.use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, RecordBatch, StringBuilder}; use arrow::datatypes::{DataType, Field, Schema}; -use arrow_array::builder::{Int32Builder, StringBuilder}; +use arrow_array::builder::{Decimal128Builder, StringBuilder}; use arrow_array::{ArrayRef, RecordBatch}; +use arrow_schema::SchemaRef; +use comet::execution::datafusion::expressions::sum_decimal::SumDecimal; use criterion::{criterion_group, criterion_main, Criterion}; use datafusion::functions_aggregate::sum::sum_udaf; -use datafusion::physical_plan::aggregates::{AggregateMode, PhysicalGroupBy}; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::ExecutionPlan; use datafusion_execution::TaskContext; +use datafusion_expr::AggregateUDF; use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::Column; +use futures::StreamExt; use std::sync::Arc; use std::time::Duration; use tokio::runtime::Runtime; -use futures::StreamExt; fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("aggregate"); @@ -40,13 +44,72 @@ fn criterion_benchmark(c: &mut Criterion) { batches.push(batch.clone()); } let partitions = &[batches]; - let scan : Arc = Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()); - let c0 = Arc::new(Column::new("c0", 0)); - let c1 = Arc::new(Column::new("c1", 1)); + let scan: Arc = + Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()); + let schema = scan.schema().clone(); - let schema = scan.schema(); + let c0: Arc = Arc::new(Column::new("c0", 0)); + let c1: Arc = Arc::new(Column::new("c1", 1)); - let aggr_expr = AggregateExprBuilder::new(sum_udaf(), vec![c1]) + let rt = Runtime::new().unwrap(); + + let datafusion_sum_decimal = sum_udaf(); + group.bench_function("aggregate - sum decimal (DataFusion)", |b| { + b.to_async(&rt).iter(|| async { + let scan: Arc = + Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()); + let aggregate = create_aggregate( + scan, + c0.clone(), + c1.clone(), + &schema, + datafusion_sum_decimal.clone(), + ); + let mut x = aggregate + .execute(0, Arc::new(TaskContext::default())) + .unwrap(); + while let Some(batch) = x.next().await { + let _batch = batch.unwrap(); + } + }) + }); + + let comet_sum_decimal = Arc::new(AggregateUDF::new_from_impl(SumDecimal::new( + "sum", + Arc::clone(&c1), + DataType::Decimal128(7, 2), + ))); + group.bench_function("aggregate - sum decimal (Comet)", |b| { + b.to_async(&rt).iter(|| async { + let scan: Arc = + Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()); + let aggregate = create_aggregate( + scan, + c0.clone(), + c1.clone(), + &schema, + comet_sum_decimal.clone(), + ); + let mut x = aggregate + .execute(0, Arc::new(TaskContext::default())) + .unwrap(); + while let Some(batch) = x.next().await { + let _batch = batch.unwrap(); + } + }) + }); + + group.finish(); +} + +fn create_aggregate( + scan: Arc, + c0: Arc, + c1: Arc, + schema: &SchemaRef, + aggregate_udf: Arc, +) -> Arc { + let aggr_expr = AggregateExprBuilder::new(aggregate_udf, vec![c1]) .schema(schema.clone()) .alias("sum") .with_ignore_nulls(false) @@ -55,53 +118,44 @@ fn criterion_benchmark(c: &mut Criterion) { .unwrap(); let aggregate = Arc::new( - datafusion::physical_plan::aggregates::AggregateExec::try_new( + AggregateExec::try_new( AggregateMode::Partial, PhysicalGroupBy::new_single(vec![(c0, "c0".to_string())]), vec![aggr_expr], vec![None], // no filter expressions scan, Arc::clone(&schema), - ).unwrap() + ) + .unwrap(), ); - - let rt = Runtime::new().unwrap(); - - group.bench_function("aggregate - sum int", |b| { - b.to_async(&rt).iter(|| async { - let mut x = aggregate.execute(0, Arc::new(TaskContext::default())).unwrap(); - while let Some(batch) = x.next().await { - let _batch = batch.unwrap(); - } - }) - }); - - group.finish(); + aggregate } - - fn create_record_batch(num_rows: usize) -> RecordBatch { - let mut int32_builder = Int32Builder::with_capacity(num_rows); + let mut decimal_builder = Decimal128Builder::with_capacity(num_rows); let mut string_builder = StringBuilder::with_capacity(num_rows, num_rows * 32); for i in 0..num_rows { - int32_builder.append_value(i as i32); - string_builder.append_value(format!("this is string #{i}")); + decimal_builder.append_value(i as i128); + string_builder.append_value(format!("this is string #{}", i % 1024)); } - let int32_array = Arc::new(int32_builder.finish()); + let decimal_array = Arc::new(decimal_builder.finish()); let string_array = Arc::new(string_builder.finish()); let mut fields = vec![]; let mut columns: Vec = vec![]; - let mut i = 0; + // string column - fields.push(Field::new(format!("c{i}"), DataType::Utf8, false)); - columns.push(string_array.clone()); // note this is just copying a reference to the array - i += 1; - // int column - fields.push(Field::new(format!("c{i}"), DataType::Int32, false)); - columns.push(int32_array.clone()); // note this is just copying a reference to the array - // i += 1; + fields.push(Field::new(format!("c0"), DataType::Utf8, false)); + columns.push(Arc::clone(&string_array)); + + // decimal column + fields.push(Field::new( + format!("c1"), + DataType::Decimal128(38, 10), + false, + )); + columns.push(Arc::clone(&decimal_array)); + let schema = Schema::new(fields); RecordBatch::try_new(Arc::new(schema), columns).unwrap() } From 36260ad2ccb3b005208a663389ece02f32d8a62f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Sep 2024 18:25:26 -0600 Subject: [PATCH 4/6] refactor --- native/core/benches/aggregate.rs | 72 +++++++++++++++----------------- 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/native/core/benches/aggregate.rs b/native/core/benches/aggregate.rs index da3c54c21..5b31748d3 100644 --- a/native/core/benches/aggregate.rs +++ b/native/core/benches/aggregate.rs @@ -44,64 +44,60 @@ fn criterion_benchmark(c: &mut Criterion) { batches.push(batch.clone()); } let partitions = &[batches]; - let scan: Arc = - Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()); - let schema = scan.schema().clone(); - let c0: Arc = Arc::new(Column::new("c0", 0)); let c1: Arc = Arc::new(Column::new("c1", 1)); let rt = Runtime::new().unwrap(); - let datafusion_sum_decimal = sum_udaf(); group.bench_function("aggregate - sum decimal (DataFusion)", |b| { - b.to_async(&rt).iter(|| async { - let scan: Arc = - Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()); - let aggregate = create_aggregate( - scan, + let datafusion_sum_decimal = sum_udaf(); + b.to_async(&rt).iter(|| { + agg_test( + partitions, c0.clone(), c1.clone(), - &schema, datafusion_sum_decimal.clone(), - ); - let mut x = aggregate - .execute(0, Arc::new(TaskContext::default())) - .unwrap(); - while let Some(batch) = x.next().await { - let _batch = batch.unwrap(); - } + ) }) }); - let comet_sum_decimal = Arc::new(AggregateUDF::new_from_impl(SumDecimal::new( - "sum", - Arc::clone(&c1), - DataType::Decimal128(7, 2), - ))); group.bench_function("aggregate - sum decimal (Comet)", |b| { - b.to_async(&rt).iter(|| async { - let scan: Arc = - Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()); - let aggregate = create_aggregate( - scan, + let comet_sum_decimal = Arc::new(AggregateUDF::new_from_impl(SumDecimal::new( + "sum", + Arc::clone(&c1), + DataType::Decimal128(7, 2), + ))); + b.to_async(&rt).iter(|| { + agg_test( + partitions, c0.clone(), c1.clone(), - &schema, comet_sum_decimal.clone(), - ); - let mut x = aggregate - .execute(0, Arc::new(TaskContext::default())) - .unwrap(); - while let Some(batch) = x.next().await { - let _batch = batch.unwrap(); - } + ) }) }); group.finish(); } +async fn agg_test( + partitions: &[Vec], + c0: Arc, + c1: Arc, + aggregate_udf: Arc, +) { + let schema = &partitions[0][0].schema(); + let scan: Arc = + Arc::new(MemoryExec::try_new(partitions, Arc::clone(schema), None).unwrap()); + let aggregate = create_aggregate(scan, c0.clone(), c1.clone(), &schema, aggregate_udf); + let mut stream = aggregate + .execute(0, Arc::new(TaskContext::default())) + .unwrap(); + while let Some(batch) = stream.next().await { + let _batch = batch.unwrap(); + } +} + fn create_aggregate( scan: Arc, c0: Arc, @@ -146,7 +142,7 @@ fn create_record_batch(num_rows: usize) -> RecordBatch { // string column fields.push(Field::new(format!("c0"), DataType::Utf8, false)); - columns.push(Arc::clone(&string_array)); + columns.push(string_array); // decimal column fields.push(Field::new( @@ -154,7 +150,7 @@ fn create_record_batch(num_rows: usize) -> RecordBatch { DataType::Decimal128(38, 10), false, )); - columns.push(Arc::clone(&decimal_array)); + columns.push(decimal_array); let schema = Schema::new(fields); RecordBatch::try_new(Arc::new(schema), columns).unwrap() From e0d3a58a8f0887c242a9cd3ca94c2b9b2c6979f3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Sep 2024 07:32:53 -0600 Subject: [PATCH 5/6] avg --- native/core/benches/aggregate.rs | 62 ++++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 15 deletions(-) diff --git a/native/core/benches/aggregate.rs b/native/core/benches/aggregate.rs index 5b31748d3..605e4cb00 100644 --- a/native/core/benches/aggregate.rs +++ b/native/core/benches/aggregate.rs @@ -19,8 +19,10 @@ use arrow::datatypes::{DataType, Field, Schema}; use arrow_array::builder::{Decimal128Builder, StringBuilder}; use arrow_array::{ArrayRef, RecordBatch}; use arrow_schema::SchemaRef; +use comet::execution::datafusion::expressions::avg_decimal::AvgDecimal; use comet::execution::datafusion::expressions::sum_decimal::SumDecimal; use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::functions_aggregate::average::avg_udaf; use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::physical_expr::PhysicalExpr; use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; @@ -49,7 +51,38 @@ fn criterion_benchmark(c: &mut Criterion) { let rt = Runtime::new().unwrap(); - group.bench_function("aggregate - sum decimal (DataFusion)", |b| { + group.bench_function("avg_decimal_datafusion", |b| { + let datafusion_sum_decimal = avg_udaf(); + b.to_async(&rt).iter(|| { + agg_test( + partitions, + c0.clone(), + c1.clone(), + datafusion_sum_decimal.clone(), + "avg", + ) + }) + }); + + group.bench_function("avg_decimal_comet", |b| { + let comet_avg_decimal = Arc::new(AggregateUDF::new_from_impl(AvgDecimal::new( + Arc::clone(&c1), + "avg", + DataType::Decimal128(38, 10), + DataType::Decimal128(38, 10), + ))); + b.to_async(&rt).iter(|| { + agg_test( + partitions, + c0.clone(), + c1.clone(), + comet_avg_decimal.clone(), + "avg", + ) + }) + }); + + group.bench_function("sum_decimal_datafusion", |b| { let datafusion_sum_decimal = sum_udaf(); b.to_async(&rt).iter(|| { agg_test( @@ -57,15 +90,16 @@ fn criterion_benchmark(c: &mut Criterion) { c0.clone(), c1.clone(), datafusion_sum_decimal.clone(), + "sum", ) }) }); - group.bench_function("aggregate - sum decimal (Comet)", |b| { + group.bench_function("sum_decimal_comet", |b| { let comet_sum_decimal = Arc::new(AggregateUDF::new_from_impl(SumDecimal::new( "sum", Arc::clone(&c1), - DataType::Decimal128(7, 2), + DataType::Decimal128(38, 10), ))); b.to_async(&rt).iter(|| { agg_test( @@ -73,6 +107,7 @@ fn criterion_benchmark(c: &mut Criterion) { c0.clone(), c1.clone(), comet_sum_decimal.clone(), + "sum", ) }) }); @@ -85,11 +120,12 @@ async fn agg_test( c0: Arc, c1: Arc, aggregate_udf: Arc, + alias: &str, ) { let schema = &partitions[0][0].schema(); let scan: Arc = Arc::new(MemoryExec::try_new(partitions, Arc::clone(schema), None).unwrap()); - let aggregate = create_aggregate(scan, c0.clone(), c1.clone(), &schema, aggregate_udf); + let aggregate = create_aggregate(scan, c0.clone(), c1.clone(), schema, aggregate_udf, alias); let mut stream = aggregate .execute(0, Arc::new(TaskContext::default())) .unwrap(); @@ -104,27 +140,27 @@ fn create_aggregate( c1: Arc, schema: &SchemaRef, aggregate_udf: Arc, + alias: &str, ) -> Arc { let aggr_expr = AggregateExprBuilder::new(aggregate_udf, vec![c1]) .schema(schema.clone()) - .alias("sum") + .alias(alias) .with_ignore_nulls(false) .with_distinct(false) .build() .unwrap(); - let aggregate = Arc::new( + Arc::new( AggregateExec::try_new( AggregateMode::Partial, PhysicalGroupBy::new_single(vec![(c0, "c0".to_string())]), vec![aggr_expr], vec![None], // no filter expressions scan, - Arc::clone(&schema), + Arc::clone(schema), ) .unwrap(), - ); - aggregate + ) } fn create_record_batch(num_rows: usize) -> RecordBatch { @@ -141,15 +177,11 @@ fn create_record_batch(num_rows: usize) -> RecordBatch { let mut columns: Vec = vec![]; // string column - fields.push(Field::new(format!("c0"), DataType::Utf8, false)); + fields.push(Field::new("c0", DataType::Utf8, false)); columns.push(string_array); // decimal column - fields.push(Field::new( - format!("c1"), - DataType::Decimal128(38, 10), - false, - )); + fields.push(Field::new("c1", DataType::Decimal128(38, 10), false)); columns.push(decimal_array); let schema = Schema::new(fields); From 13312aad3b24922f228bdeea2878be54c2e695df Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Sep 2024 13:22:03 -0600 Subject: [PATCH 6/6] address feedback --- native/core/Cargo.toml | 2 +- native/core/benches/aggregate.rs | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 68c470be6..13f6b135f 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -125,4 +125,4 @@ harness = false [[bench]] name = "aggregate" -harness = false \ No newline at end of file +harness = false diff --git a/native/core/benches/aggregate.rs b/native/core/benches/aggregate.rs index 605e4cb00..e6b3e3155 100644 --- a/native/core/benches/aggregate.rs +++ b/native/core/benches/aggregate.rs @@ -21,7 +21,7 @@ use arrow_array::{ArrayRef, RecordBatch}; use arrow_schema::SchemaRef; use comet::execution::datafusion::expressions::avg_decimal::AvgDecimal; use comet::execution::datafusion::expressions::sum_decimal::SumDecimal; -use criterion::{criterion_group, criterion_main, Criterion}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; use datafusion::functions_aggregate::average::avg_udaf; use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::physical_expr::PhysicalExpr; @@ -54,13 +54,13 @@ fn criterion_benchmark(c: &mut Criterion) { group.bench_function("avg_decimal_datafusion", |b| { let datafusion_sum_decimal = avg_udaf(); b.to_async(&rt).iter(|| { - agg_test( + black_box(agg_test( partitions, c0.clone(), c1.clone(), datafusion_sum_decimal.clone(), "avg", - ) + )) }) }); @@ -72,26 +72,26 @@ fn criterion_benchmark(c: &mut Criterion) { DataType::Decimal128(38, 10), ))); b.to_async(&rt).iter(|| { - agg_test( + black_box(agg_test( partitions, c0.clone(), c1.clone(), comet_avg_decimal.clone(), "avg", - ) + )) }) }); group.bench_function("sum_decimal_datafusion", |b| { let datafusion_sum_decimal = sum_udaf(); b.to_async(&rt).iter(|| { - agg_test( + black_box(agg_test( partitions, c0.clone(), c1.clone(), datafusion_sum_decimal.clone(), "sum", - ) + )) }) }); @@ -102,13 +102,13 @@ fn criterion_benchmark(c: &mut Criterion) { DataType::Decimal128(38, 10), ))); b.to_async(&rt).iter(|| { - agg_test( + black_box(agg_test( partitions, c0.clone(), c1.clone(), comet_sum_decimal.clone(), "sum", - ) + )) }) });