Skip to content

Commit

Permalink
perf: Add criterion benchmark for aggregate expressions (#948)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored Sep 18, 2024
1 parent c6efe00 commit d5116a1
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 1 deletion.
2 changes: 2 additions & 0 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ datafusion-comet-proto = { workspace = true }

[dev-dependencies]
pprof = { version = "0.13.0", features = ["flamegraph"] }
criterion = "0.5.1"
criterion = { version = "0.5.1", features = ["async_tokio"] }
jni = { version = "0.21", features = ["invocation"] }
lazy_static = "1.4"
assertables = "7"
Expand Down Expand Up @@ -122,3 +122,7 @@ harness = false
[[bench]]
name = "filter"
harness = false

[[bench]]
name = "aggregate"
harness = false
202 changes: 202 additions & 0 deletions native/core/benches/aggregate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, RecordBatch, StringBuilder};

use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::builder::{Decimal128Builder, StringBuilder};
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::SchemaRef;
use comet::execution::datafusion::expressions::avg_decimal::AvgDecimal;
use comet::execution::datafusion::expressions::sum_decimal::SumDecimal;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_execution::TaskContext;
use datafusion_expr::AggregateUDF;
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
use datafusion_physical_expr::expressions::Column;
use futures::StreamExt;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Runtime;

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("aggregate");
let num_rows = 8192;
let batch = create_record_batch(num_rows);
let mut batches = Vec::new();
for _ in 0..10 {
batches.push(batch.clone());
}
let partitions = &[batches];
let c0: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c0", 0));
let c1: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c1", 1));

let rt = Runtime::new().unwrap();

group.bench_function("avg_decimal_datafusion", |b| {
let datafusion_sum_decimal = avg_udaf();
b.to_async(&rt).iter(|| {
black_box(agg_test(
partitions,
c0.clone(),
c1.clone(),
datafusion_sum_decimal.clone(),
"avg",
))
})
});

group.bench_function("avg_decimal_comet", |b| {
let comet_avg_decimal = Arc::new(AggregateUDF::new_from_impl(AvgDecimal::new(
Arc::clone(&c1),
"avg",
DataType::Decimal128(38, 10),
DataType::Decimal128(38, 10),
)));
b.to_async(&rt).iter(|| {
black_box(agg_test(
partitions,
c0.clone(),
c1.clone(),
comet_avg_decimal.clone(),
"avg",
))
})
});

group.bench_function("sum_decimal_datafusion", |b| {
let datafusion_sum_decimal = sum_udaf();
b.to_async(&rt).iter(|| {
black_box(agg_test(
partitions,
c0.clone(),
c1.clone(),
datafusion_sum_decimal.clone(),
"sum",
))
})
});

group.bench_function("sum_decimal_comet", |b| {
let comet_sum_decimal = Arc::new(AggregateUDF::new_from_impl(SumDecimal::new(
"sum",
Arc::clone(&c1),
DataType::Decimal128(38, 10),
)));
b.to_async(&rt).iter(|| {
black_box(agg_test(
partitions,
c0.clone(),
c1.clone(),
comet_sum_decimal.clone(),
"sum",
))
})
});

group.finish();
}

async fn agg_test(
partitions: &[Vec<RecordBatch>],
c0: Arc<dyn PhysicalExpr>,
c1: Arc<dyn PhysicalExpr>,
aggregate_udf: Arc<AggregateUDF>,
alias: &str,
) {
let schema = &partitions[0][0].schema();
let scan: Arc<dyn ExecutionPlan> =
Arc::new(MemoryExec::try_new(partitions, Arc::clone(schema), None).unwrap());
let aggregate = create_aggregate(scan, c0.clone(), c1.clone(), schema, aggregate_udf, alias);
let mut stream = aggregate
.execute(0, Arc::new(TaskContext::default()))
.unwrap();
while let Some(batch) = stream.next().await {
let _batch = batch.unwrap();
}
}

fn create_aggregate(
scan: Arc<dyn ExecutionPlan>,
c0: Arc<dyn PhysicalExpr>,
c1: Arc<dyn PhysicalExpr>,
schema: &SchemaRef,
aggregate_udf: Arc<AggregateUDF>,
alias: &str,
) -> Arc<AggregateExec> {
let aggr_expr = AggregateExprBuilder::new(aggregate_udf, vec![c1])
.schema(schema.clone())
.alias(alias)
.with_ignore_nulls(false)
.with_distinct(false)
.build()
.unwrap();

Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::new_single(vec![(c0, "c0".to_string())]),
vec![aggr_expr],
vec![None], // no filter expressions
scan,
Arc::clone(schema),
)
.unwrap(),
)
}

fn create_record_batch(num_rows: usize) -> RecordBatch {
let mut decimal_builder = Decimal128Builder::with_capacity(num_rows);
let mut string_builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
for i in 0..num_rows {
decimal_builder.append_value(i as i128);
string_builder.append_value(format!("this is string #{}", i % 1024));
}
let decimal_array = Arc::new(decimal_builder.finish());
let string_array = Arc::new(string_builder.finish());

let mut fields = vec![];
let mut columns: Vec<ArrayRef> = vec![];

// string column
fields.push(Field::new("c0", DataType::Utf8, false));
columns.push(string_array);

// decimal column
fields.push(Field::new("c1", DataType::Decimal128(38, 10), false));
columns.push(decimal_array);

let schema = Schema::new(fields);
RecordBatch::try_new(Arc::new(schema), columns).unwrap()
}

fn config() -> Criterion {
Criterion::default()
.measurement_time(Duration::from_millis(500))
.warm_up_time(Duration::from_millis(500))
}

criterion_group! {
name = benches;
config = config();
targets = criterion_benchmark
}
criterion_main!(benches);

0 comments on commit d5116a1

Please sign in to comment.