Skip to content

Commit

Permalink
feat(stream): bench stream hash agg (#8808)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Mar 29, 2023
1 parent 81b306b commit a4a46fd
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 79 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/common/src/field_generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl FieldGeneratorImpl {
DataType::Float64 => Ok(FieldGeneratorImpl::F64Random(F64RandomField::new(
min, max, seed,
)?)),
_ => unimplemented!(),
_ => unimplemented!("DataType: {}", data_type),
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,10 @@ workspace-hack = { path = "../workspace-hack" }

[dev-dependencies]
assert_matches = "1"
criterion = { version = "0.4", features = ["async_tokio", "async"] }
risingwave_hummock_test = { path = "../storage/hummock_test", features = ["test"] }
tracing-test = "0.2"

[[bench]]
name = "hash_agg"
harness = false
121 changes: 121 additions & 0 deletions src/stream/benches/hash_agg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion};
use futures::executor::block_on;
use futures::StreamExt;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_expr::expr::*;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::StateStore;
use risingwave_stream::executor::aggregation::{AggArgs, AggCall};
use risingwave_stream::executor::test_utils::agg_executor::new_boxed_hash_agg_executor;
use risingwave_stream::executor::test_utils::*;
use risingwave_stream::executor::{BoxedExecutor, PkIndices};
use tokio::runtime::Runtime;

fn bench_hash_agg(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

c.bench_function("benchmark_hash_agg", |b| {
b.to_async(&rt).iter_batched(
|| setup_bench_hash_agg(MemoryStateStore::new()),
|e| execute_executor(e),
BatchSize::SmallInput,
)
});
}

fn setup_bench_hash_agg<S: StateStore>(store: S) -> BoxedExecutor {
// ---- Define hash agg executor parameters ----
let data_types = vec![DataType::Int64; 3];
let schema = Schema {
fields: vec![Field::unnamed(DataType::Int64); 3],
};

let group_key_indices = vec![0];

let append_only = false;

let agg_calls = vec![
AggCall {
kind: AggKind::Count, // as row count, index: 0
args: AggArgs::None,
return_type: DataType::Int64,
column_orders: vec![],
append_only,
filter: None,
distinct: false,
},
AggCall {
kind: AggKind::Sum,
args: AggArgs::Unary(DataType::Int64, 1),
return_type: DataType::Int64,
column_orders: vec![],
append_only,
filter: None,
distinct: false,
},
AggCall {
kind: AggKind::Sum,
args: AggArgs::Unary(DataType::Int64, 2),
return_type: DataType::Int64,
column_orders: vec![],
append_only,
filter: None,
distinct: false,
},
];

// ---- Generate Data ----
let num_of_chunks = 1000;
let chunk_size = 1024;
let chunks = gen_data(num_of_chunks, chunk_size, &data_types);

// ---- Create MockSourceExecutor ----
let (mut tx, source) = MockSource::channel(schema, PkIndices::new());
tx.push_barrier(1, false);
for chunk in chunks {
tx.push_chunk(chunk);
}
tx.push_barrier_with_prev_epoch_for_test(1002, 1, false);

// ---- Create HashAggExecutor to be benchmarked ----
let row_count_index = 0;
let pk_indices = vec![];
let extreme_cache_size = 1024;
let executor_id = 1;

block_on(new_boxed_hash_agg_executor(
store,
Box::new(source),
agg_calls,
row_count_index,
group_key_indices,
pk_indices,
extreme_cache_size,
executor_id,
))
}

pub async fn execute_executor(executor: BoxedExecutor) {
let mut stream = executor.execute();
while let Some(ret) = stream.next().await {
_ = black_box(ret.unwrap());
}
}

criterion_group!(benches, bench_hash_agg);
criterion_main!(benches);
81 changes: 7 additions & 74 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ use super::aggregation::{
OnlyOutputIfHasInput,
};
use super::{
expect_first_barrier, ActorContextRef, Executor, ExecutorInfo, PkIndicesRef,
StreamExecutorResult, Watermark,
expect_first_barrier, ActorContextRef, ExecutorInfo, PkIndicesRef, StreamExecutorResult,
Watermark,
};
use crate::cache::{cache_may_stale, new_with_hasher, ExecutorCache};
use crate::common::table::state_table::StateTable;
use crate::error::StreamResult;
use crate::executor::aggregation::{generate_agg_schema, AggCall, AggGroup as GenericAggGroup};
use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{BoxedMessageStream, Message};
use crate::executor::{BoxedMessageStream, Executor, Message};
use crate::task::AtomicU64Ref;

type AggGroup<S> = GenericAggGroup<S, OnlyOutputIfHasInput>;
Expand Down Expand Up @@ -593,92 +593,25 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
}

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
pub mod tests {

use assert_matches::assert_matches;
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::array::stream_chunk::StreamChunkTestExt;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::hash::SerializedKey;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::{AscentOwnedRow, OwnedRow, Row};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_expr::expr::*;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::StateStore;

use crate::executor::agg_common::{AggExecutorArgs, GroupAggExecutorExtraArgs};
use crate::executor::aggregation::{AggArgs, AggCall};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::test_utils::agg_executor::{
create_agg_state_storage, create_result_table,
};
use crate::executor::test_utils::agg_executor::new_boxed_hash_agg_executor;
use crate::executor::test_utils::*;
use crate::executor::{ActorContext, Executor, HashAggExecutor, Message, PkIndices};

#[allow(clippy::too_many_arguments)]
async fn new_boxed_hash_agg_executor<S: StateStore>(
store: S,
input: Box<dyn Executor>,
agg_calls: Vec<AggCall>,
row_count_index: usize,
group_key_indices: Vec<usize>,
pk_indices: PkIndices,
extreme_cache_size: usize,
executor_id: u64,
) -> Box<dyn Executor> {
let mut storages = Vec::with_capacity(agg_calls.iter().len());
for (idx, agg_call) in agg_calls.iter().enumerate() {
storages.push(
create_agg_state_storage(
store.clone(),
TableId::new(idx as u32),
agg_call,
&group_key_indices,
&pk_indices,
input.as_ref(),
)
.await,
)
}

let result_table = create_result_table(
store,
TableId::new(agg_calls.len() as u32),
&agg_calls,
&group_key_indices,
input.as_ref(),
)
.await;

HashAggExecutor::<SerializedKey, S>::new(AggExecutorArgs {
input,
actor_ctx: ActorContext::create(123),
pk_indices,
executor_id,

extreme_cache_size,

agg_calls,
row_count_index,
storages,
result_table,
distinct_dedup_tables: Default::default(),
watermark_epoch: Arc::new(AtomicU64::new(0)),

extra: GroupAggExecutorExtraArgs {
group_key_indices,
chunk_size: 1024,
metrics: Arc::new(StreamingMetrics::unused()),
},
})
.unwrap()
.boxed()
}
use crate::executor::{Message, PkIndices};

// --- Test HashAgg with in-memory StateStore ---

Expand Down
3 changes: 1 addition & 2 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ mod wrapper;
mod backfill;
#[cfg(test)]
mod integration_tests;
#[cfg(test)]
mod test_utils;
pub mod test_utils;

pub use actor::{Actor, ActorContext, ActorContextRef};
use anyhow::Context;
Expand Down
Loading

0 comments on commit a4a46fd

Please sign in to comment.