diff --git a/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs index 564ddedb255f9..8846868b0ebc6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs @@ -120,7 +120,14 @@ impl StreamNode for StreamGlobalSimpleAgg { ), distinct_dedup_tables: distinct_dedup_tables .into_iter() - .map(|(key_idx, table)| (key_idx as u32, table.to_internal_table_prost())) + .map(|(key_idx, table)| { + ( + key_idx as u32, + table + .with_id(state.gen_table_id_wrapped()) + .to_internal_table_prost(), + ) + }) .collect(), }) } diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs index 0f0a3ed897246..b8184bb627428 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs @@ -129,7 +129,14 @@ impl StreamNode for StreamHashAgg { ), distinct_dedup_tables: distinct_dedup_tables .into_iter() - .map(|(key_idx, table)| (key_idx as u32, table.to_internal_table_prost())) + .map(|(key_idx, table)| { + ( + key_idx as u32, + table + .with_id(state.gen_table_id_wrapped()) + .to_internal_table_prost(), + ) + }) .collect(), }) } diff --git a/src/meta/src/stream/stream_graph/visit.rs b/src/meta/src/stream/stream_graph/visit.rs index 2bbaffbf6712e..d9f399f5f7006 100644 --- a/src/meta/src/stream/stream_graph/visit.rs +++ b/src/meta/src/stream/stream_graph/visit.rs @@ -39,7 +39,7 @@ where /// Visit the internal tables of a [`StreamFragment`]. pub(super) fn visit_internal_tables(fragment: &mut StreamFragment, mut f: F) where - F: FnMut(&mut Table, &'static str), + F: FnMut(&mut Table, &str), { macro_rules! always { ($table:expr, $name:expr) => {{ @@ -93,6 +93,9 @@ where always!(s.table, "HashAgg"); } } + for (distinct_col, dedup_table) in &mut node.distinct_dedup_tables { + f(dedup_table, &format!("HashAggDedupForCol{}", distinct_col)); + } } NodeBody::GlobalSimpleAgg(node) => { assert_eq!(node.agg_call_states.len(), node.agg_calls.len()); @@ -104,6 +107,12 @@ where always!(s.table, "GlobalSimpleAgg"); } } + for (distinct_col, dedup_table) in &mut node.distinct_dedup_tables { + f( + dedup_table, + &format!("GlobalSimpleAggDedupForCol{}", distinct_col), + ); + } } // Top-N