diff --git a/native/Cargo.lock b/native/Cargo.lock index 3692f0488..601da3e3f 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -707,6 +707,7 @@ dependencies = [ "ciborium", "clap", "criterion-plot", + "futures", "is-terminal", "itertools 0.10.5", "num-traits", @@ -719,6 +720,7 @@ dependencies = [ "serde_derive", "serde_json", "tinytemplate", + "tokio", "walkdir", ] diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 58fe00e75..13f6b135f 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -80,7 +80,7 @@ datafusion-comet-proto = { workspace = true } [dev-dependencies] pprof = { version = "0.13.0", features = ["flamegraph"] } -criterion = "0.5.1" +criterion = { version = "0.5.1", features = ["async_tokio"] } jni = { version = "0.21", features = ["invocation"] } lazy_static = "1.4" assertables = "7" @@ -122,3 +122,7 @@ harness = false [[bench]] name = "filter" harness = false + +[[bench]] +name = "aggregate" +harness = false diff --git a/native/core/benches/aggregate.rs b/native/core/benches/aggregate.rs new file mode 100644 index 000000000..e6b3e3155 --- /dev/null +++ b/native/core/benches/aggregate.rs @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, RecordBatch, StringBuilder}; + +use arrow::datatypes::{DataType, Field, Schema}; +use arrow_array::builder::{Decimal128Builder, StringBuilder}; +use arrow_array::{ArrayRef, RecordBatch}; +use arrow_schema::SchemaRef; +use comet::execution::datafusion::expressions::avg_decimal::AvgDecimal; +use comet::execution::datafusion::expressions::sum_decimal::SumDecimal; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion::functions_aggregate::average::avg_udaf; +use datafusion::functions_aggregate::sum::sum_udaf; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; +use datafusion::physical_plan::memory::MemoryExec; +use datafusion::physical_plan::ExecutionPlan; +use datafusion_execution::TaskContext; +use datafusion_expr::AggregateUDF; +use datafusion_physical_expr::aggregate::AggregateExprBuilder; +use datafusion_physical_expr::expressions::Column; +use futures::StreamExt; +use std::sync::Arc; +use std::time::Duration; +use tokio::runtime::Runtime; + +fn criterion_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("aggregate"); + let num_rows = 8192; + let batch = create_record_batch(num_rows); + let mut batches = Vec::new(); + for _ in 0..10 { + batches.push(batch.clone()); + } + let partitions = &[batches]; + let c0: Arc = Arc::new(Column::new("c0", 0)); + let c1: Arc = Arc::new(Column::new("c1", 1)); + + let rt = Runtime::new().unwrap(); + + group.bench_function("avg_decimal_datafusion", |b| { + let datafusion_sum_decimal = avg_udaf(); + b.to_async(&rt).iter(|| { + black_box(agg_test( + partitions, + c0.clone(), + c1.clone(), + datafusion_sum_decimal.clone(), + "avg", + )) + }) + }); + + group.bench_function("avg_decimal_comet", |b| { + let comet_avg_decimal = Arc::new(AggregateUDF::new_from_impl(AvgDecimal::new( + Arc::clone(&c1), + "avg", + DataType::Decimal128(38, 10), + DataType::Decimal128(38, 10), + ))); + b.to_async(&rt).iter(|| { + black_box(agg_test( + partitions, + c0.clone(), + c1.clone(), + comet_avg_decimal.clone(), + "avg", + )) + }) + }); + + group.bench_function("sum_decimal_datafusion", |b| { + let datafusion_sum_decimal = sum_udaf(); + b.to_async(&rt).iter(|| { + black_box(agg_test( + partitions, + c0.clone(), + c1.clone(), + datafusion_sum_decimal.clone(), + "sum", + )) + }) + }); + + group.bench_function("sum_decimal_comet", |b| { + let comet_sum_decimal = Arc::new(AggregateUDF::new_from_impl(SumDecimal::new( + "sum", + Arc::clone(&c1), + DataType::Decimal128(38, 10), + ))); + b.to_async(&rt).iter(|| { + black_box(agg_test( + partitions, + c0.clone(), + c1.clone(), + comet_sum_decimal.clone(), + "sum", + )) + }) + }); + + group.finish(); +} + +async fn agg_test( + partitions: &[Vec], + c0: Arc, + c1: Arc, + aggregate_udf: Arc, + alias: &str, +) { + let schema = &partitions[0][0].schema(); + let scan: Arc = + Arc::new(MemoryExec::try_new(partitions, Arc::clone(schema), None).unwrap()); + let aggregate = create_aggregate(scan, c0.clone(), c1.clone(), schema, aggregate_udf, alias); + let mut stream = aggregate + .execute(0, Arc::new(TaskContext::default())) + .unwrap(); + while let Some(batch) = stream.next().await { + let _batch = batch.unwrap(); + } +} + +fn create_aggregate( + scan: Arc, + c0: Arc, + c1: Arc, + schema: &SchemaRef, + aggregate_udf: Arc, + alias: &str, +) -> Arc { + let aggr_expr = AggregateExprBuilder::new(aggregate_udf, vec![c1]) + .schema(schema.clone()) + .alias(alias) + .with_ignore_nulls(false) + .with_distinct(false) + .build() + .unwrap(); + + Arc::new( + AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::new_single(vec![(c0, "c0".to_string())]), + vec![aggr_expr], + vec![None], // no filter expressions + scan, + Arc::clone(schema), + ) + .unwrap(), + ) +} + +fn create_record_batch(num_rows: usize) -> RecordBatch { + let mut decimal_builder = Decimal128Builder::with_capacity(num_rows); + let mut string_builder = StringBuilder::with_capacity(num_rows, num_rows * 32); + for i in 0..num_rows { + decimal_builder.append_value(i as i128); + string_builder.append_value(format!("this is string #{}", i % 1024)); + } + let decimal_array = Arc::new(decimal_builder.finish()); + let string_array = Arc::new(string_builder.finish()); + + let mut fields = vec![]; + let mut columns: Vec = vec![]; + + // string column + fields.push(Field::new("c0", DataType::Utf8, false)); + columns.push(string_array); + + // decimal column + fields.push(Field::new("c1", DataType::Decimal128(38, 10), false)); + columns.push(decimal_array); + + let schema = Schema::new(fields); + RecordBatch::try_new(Arc::new(schema), columns).unwrap() +} + +fn config() -> Criterion { + Criterion::default() + .measurement_time(Duration::from_millis(500)) + .warm_up_time(Duration::from_millis(500)) +} + +criterion_group! { + name = benches; + config = config(); + targets = criterion_benchmark +} +criterion_main!(benches);