Skip to content

Commit

Permalink
refactor(agg): clean up unused fields & refactor (#3339)
Browse files Browse the repository at this point in the history
* refactor(agg): clean up unused fields

* delete file

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
BowenXiao1999 and mergify[bot] authored Jun 21, 2022
1 parent e48dced commit 309ce36
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 354 deletions.
15 changes: 1 addition & 14 deletions src/stream/src/executor/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub use agg_call::*;
pub use agg_state::*;
use dyn_clone::{self, DynClone};
pub use foldable::*;
use itertools::Itertools;
use risingwave_common::array::column::Column;
use risingwave_common::array::stream_chunk::Ops;
use risingwave_common::array::{
Expand Down Expand Up @@ -433,7 +432,6 @@ pub fn generate_state_table<S: StateStore>(
pub async fn generate_managed_agg_state<S: StateStore>(
key: Option<&Row>,
agg_calls: &[AggCall],
keyspace: &[Keyspace<S>],
pk_data_types: PkDataTypes,
epoch: u64,
key_hash_code: Option<HashCode>,
Expand All @@ -445,20 +443,9 @@ pub async fn generate_managed_agg_state<S: StateStore>(
const_assert_eq!(ROW_COUNT_COLUMN, 0);
let mut row_count = None;

for ((idx, agg_call), keyspace) in agg_calls.iter().enumerate().zip_eq(keyspace) {
// TODO: in pure in-memory engine, we should not do this serialization.

// The prefix of the state is `table_id/[group_key]`
let keyspace = if let Some(key) = key {
let bytes = key.serialize().unwrap();
keyspace.append(bytes)
} else {
keyspace.clone()
};

for (idx, agg_call) in agg_calls.iter().enumerate() {
let mut managed_state = ManagedStateImpl::create_managed_state(
agg_call.clone(),
keyspace,
row_count,
pk_data_types.clone(),
idx == ROW_COUNT_COLUMN,
Expand Down
26 changes: 4 additions & 22 deletions src/stream/src/executor/global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ pub struct SimpleAggExecutor<S: StateStore> {
/// Schema from input
input_schema: Schema,

/// The executor operates on this keyspace.
keyspace: Vec<Keyspace<S>>,

/// Aggregation states of the current operator.
/// This is an `Option` and the initial state is built when `Executor::next` is called, since
/// we may not want `Self::new` to be an `async` function.
Expand Down Expand Up @@ -125,7 +122,6 @@ impl<S: StateStore> SimpleAggExecutor<S> {
},
input_pk_indices: input_info.pk_indices,
input_schema: input_info.schema,
keyspace,
states: None,
agg_calls,
key_indices,
Expand All @@ -139,7 +135,6 @@ impl<S: StateStore> SimpleAggExecutor<S> {
input_pk_indices: &[usize],
input_schema: &Schema,
states: &mut Option<AggState<S>>,
keyspace: &[Keyspace<S>],
chunk: StreamChunk,
epoch: u64,
state_tables: &mut [StateTable<S>],
Expand Down Expand Up @@ -169,7 +164,6 @@ impl<S: StateStore> SimpleAggExecutor<S> {
let state = generate_managed_agg_state(
None,
agg_calls,
keyspace,
input_pk_data_types,
epoch,
None,
Expand Down Expand Up @@ -201,12 +195,9 @@ impl<S: StateStore> SimpleAggExecutor<S> {
async fn flush_data(
schema: &Schema,
states: &mut Option<AggState<S>>,
keyspace: &[Keyspace<S>],
epoch: u64,
state_tables: &mut [StateTable<S>],
) -> StreamExecutorResult<Option<StreamChunk>> {
// The state store of each keyspace is the same so just need the first.
let store = keyspace[0].state_store();
// --- Flush states to the state store ---
// Some state will have the correct output only after their internal states have been fully
// flushed.
Expand All @@ -216,15 +207,13 @@ impl<S: StateStore> SimpleAggExecutor<S> {
_ => return Ok(None), // Nothing to flush.
};

let mut write_batch = store.start_write_batch();
for (state, state_table) in states
.managed_states
.iter_mut()
.zip_eq(state_tables.iter_mut())
{
state.flush(&mut write_batch, state_table).await?;
state.flush(state_table).await?;
}
write_batch.ingest(epoch).await?;

// Batch commit state tables.
for state_table in state_tables.iter_mut() {
Expand Down Expand Up @@ -259,7 +248,6 @@ impl<S: StateStore> SimpleAggExecutor<S> {
info,
input_pk_indices,
input_schema,
keyspace,
mut states,
agg_calls,
key_indices: _,
Expand All @@ -281,7 +269,6 @@ impl<S: StateStore> SimpleAggExecutor<S> {
&input_pk_indices,
&input_schema,
&mut states,
&keyspace,
chunk,
epoch,
&mut state_tables,
Expand All @@ -290,14 +277,9 @@ impl<S: StateStore> SimpleAggExecutor<S> {
}
Message::Barrier(barrier) => {
let next_epoch = barrier.epoch.curr;
if let Some(chunk) = Self::flush_data(
&info.schema,
&mut states,
&keyspace,
epoch,
&mut state_tables,
)
.await?
if let Some(chunk) =
Self::flush_data(&info.schema, &mut states, epoch, &mut state_tables)
.await?
{
assert_eq!(epoch, barrier.epoch.prev);
yield Message::Chunk(chunk);
Expand Down
12 changes: 1 addition & 11 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ struct HashAggExecutorExtra<S: StateStore> {
/// Schema from input
input_schema: Schema,

/// The executor operates on this keyspace.
keyspace: Vec<Keyspace<S>>,

/// A [`HashAggExecutor`] may have multiple [`AggCall`]s.
agg_calls: Vec<AggCall>,

Expand Down Expand Up @@ -140,7 +137,6 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
identity: format!("HashAggExecutor-{:X}", executor_id),
input_pk_indices: input_info.pk_indices,
input_schema: input_info.schema,
keyspace,
agg_calls,
key_indices,
state_tables: Arc::new(RwLock::new(state_tables)),
Expand Down Expand Up @@ -206,7 +202,6 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
ref agg_calls,
ref input_pk_indices,
ref input_schema,
ref keyspace,
ref schema,
ref mut state_tables,
..
Expand Down Expand Up @@ -276,7 +271,6 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
.map_err(StreamExecutorError::eval_error)?,
),
agg_calls,
keyspace,
input_pk_data_types.clone(),
epoch,
Some(hash_code),
Expand Down Expand Up @@ -323,21 +317,17 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
async fn flush_data<'a>(
&mut HashAggExecutorExtra::<S> {
ref key_indices,
ref keyspace,
ref schema,
ref mut state_tables,
..
}: &'a mut HashAggExecutorExtra<S>,
state_map: &'a mut EvictableHashMap<K, Option<Box<AggState<S>>>>,
epoch: u64,
) {
// The state store of each keyspace is the same so just need the first.
let store = keyspace[0].state_store();
// --- Flush states to the state store ---
// Some state will have the correct output only after their internal states have been
// fully flushed.
let dirty_cnt = {
let mut write_batch = store.start_write_batch();
let mut dirty_cnt = 0;
let mut state_tables = state_tables.write().await;
for states in state_map.values_mut() {
Expand All @@ -350,7 +340,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
.iter_mut()
.zip_eq(state_tables.iter_mut())
{
state.flush(&mut write_batch, state_table).await?;
state.flush(state_table).await?;
}
}
}
Expand Down
Loading

0 comments on commit 309ce36

Please sign in to comment.