From 2ac5bad5ccb282a82ce9cc5cafd73272af59be35 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 11 Jan 2023 17:34:19 +0100 Subject: [PATCH 1/2] fix: stream group topn used wrong types for hash key --- e2e_test/streaming/group_top_n.slt | 28 ++++++++++++++++++++++++ src/common/src/hash/dispatcher.rs | 4 +++- src/stream/src/from_proto/group_top_n.rs | 5 ++++- 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/e2e_test/streaming/group_top_n.slt b/e2e_test/streaming/group_top_n.slt index 1dee1f895ba79..50095bbdbc7cb 100644 --- a/e2e_test/streaming/group_top_n.slt +++ b/e2e_test/streaming/group_top_n.slt @@ -198,3 +198,31 @@ drop materialized view mv; statement ok drop table bid; + +# https://github.com/risingwavelabs/risingwave/issues/7276 +statement ok +CREATE TABLE t ( + x int, + ts TIMESTAMP +); + +statement ok +create materialized view mv as SELECT + 1 as x +FROM ( + SELECT + row_number() over (partition by ts order by x) as rank + FROM + t +) +WHERE rank <=1; + +statement ok +INSERT INTO t VALUES +(1, '2015-07-15 00:00:01'); + +statement ok +drop materialized view mv; + +statement ok +drop table t; diff --git a/src/common/src/hash/dispatcher.rs b/src/common/src/hash/dispatcher.rs index 9fd61a7ca96f6..8f33d461bb28f 100644 --- a/src/common/src/hash/dispatcher.rs +++ b/src/common/src/hash/dispatcher.rs @@ -64,7 +64,9 @@ pub trait HashKeyDispatcher: Sized { fn data_types(&self) -> &[DataType]; fn dispatch(self) -> Self::Output { - match calc_hash_key_kind(self.data_types()) { + let k = calc_hash_key_kind(self.data_types()); + tracing::debug!("dispatch {:?} to {:?}", self.data_types(), k); + match k { HashKeyKind::Key8 => self.dispatch_impl::(), HashKeyKind::Key16 => self.dispatch_impl::(), HashKeyKind::Key32 => self.dispatch_impl::(), diff --git a/src/stream/src/from_proto/group_top_n.rs b/src/stream/src/from_proto/group_top_n.rs index e062c9f516353..b2449697326cb 100644 --- a/src/stream/src/from_proto/group_top_n.rs +++ b/src/stream/src/from_proto/group_top_n.rs @@ -46,7 +46,10 @@ impl ExecutorBuilder for GroupTopNExecutorBuilder { let state_table = StateTable::from_table_catalog(table, store, vnodes).await; let storage_key = table.get_pk().iter().map(OrderPair::from_prost).collect(); let [input]: [_; 1] = params.input.try_into().unwrap(); - let group_key_types = input.schema().data_types()[..group_by.len()].to_vec(); + let group_key_types = group_by + .iter() + .map(|i| input.schema()[*i].data_type()) + .collect(); let order_by = node.order_by.iter().map(OrderPair::from_prost).collect(); assert_eq!(¶ms.pk_indices, input.pk_indices()); From c04e4e4c3c5041c82b5a1c6e93653f31787e1658 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Thu, 12 Jan 2023 14:19:06 +0800 Subject: [PATCH 2/2] remove debug --- src/common/src/hash/dispatcher.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/common/src/hash/dispatcher.rs b/src/common/src/hash/dispatcher.rs index 8f33d461bb28f..9fd61a7ca96f6 100644 --- a/src/common/src/hash/dispatcher.rs +++ b/src/common/src/hash/dispatcher.rs @@ -64,9 +64,7 @@ pub trait HashKeyDispatcher: Sized { fn data_types(&self) -> &[DataType]; fn dispatch(self) -> Self::Output { - let k = calc_hash_key_kind(self.data_types()); - tracing::debug!("dispatch {:?} to {:?}", self.data_types(), k); - match k { + match calc_hash_key_kind(self.data_types()) { HashKeyKind::Key8 => self.dispatch_impl::(), HashKeyKind::Key16 => self.dispatch_impl::(), HashKeyKind::Key32 => self.dispatch_impl::(),