diff --git a/src/connector/src/source/datagen/source/generator.rs b/src/connector/src/source/datagen/source/generator.rs
index 04b9409325561..abaac7a46a031 100644
--- a/src/connector/src/source/datagen/source/generator.rs
+++ b/src/connector/src/source/datagen/source/generator.rs
@@ -16,6 +16,7 @@ use std::time::Duration;
 use anyhow::Result;
 use futures_async_stream::try_stream;
 use itertools::Itertools;
+use maplit::hashmap;
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::error::RwError;
 use risingwave_common::field_generator::FieldGeneratorImpl;
@@ -88,13 +89,14 @@ impl DatagenEventGenerator {
                 self.offset += 1;
                 rows_generated_this_second += 1;
             }
 
+            let chunk = StreamChunk::from_rows(&rows, &self.data_types);
-            let state = [(self.split_id.clone(), self.offset.to_string())]
-                .into_iter()
-                .collect();
+            let mapping = hashmap! {
+                self.split_id.clone() => (self.offset - 1).to_string()
+            };
             yield StreamChunkWithState {
                 chunk,
-                split_offset_mapping: Some(state),
+                split_offset_mapping: Some(mapping),
             };
         }
     }
diff --git a/src/connector/src/source/nexmark/source/reader.rs b/src/connector/src/source/nexmark/source/reader.rs
index d708930364711..e77d4572eae0e 100644
--- a/src/connector/src/source/nexmark/source/reader.rs
+++ b/src/connector/src/source/nexmark/source/reader.rs
@@ -19,6 +19,7 @@ use anyhow::Result;
 use async_trait::async_trait;
 use futures::StreamExt;
 use futures_async_stream::try_stream;
+use maplit::hashmap;
 use nexmark::config::NexmarkConfig;
 use nexmark::event::EventType;
 use nexmark::EventGenerator;
@@ -105,13 +106,13 @@ impl SplitReaderV2 for NexmarkSplitReader {
     fn into_stream(self) -> BoxSourceWithStateStream {
         // Will buffer at most 4 event chunks.
         const BUFFER_SIZE: usize = 4;
-        spawn_data_generation_stream(self.into_data_stream_inner(), BUFFER_SIZE).boxed()
+        spawn_data_generation_stream(self.into_chunk_stream(), BUFFER_SIZE).boxed()
     }
 }
 
 impl NexmarkSplitReader {
     #[try_stream(boxed, ok = StreamChunkWithState, error = RwError)]
-    async fn into_data_stream_inner(mut self) {
+    async fn into_chunk_stream(mut self) {
         let start_time = Instant::now();
         let start_offset = self.generator.global_offset();
         let start_ts = self.generator.timestamp();
@@ -147,9 +148,7 @@ impl NexmarkSplitReader {
                 )
                 .await;
             }
-            let mapping = [(self.split_id.clone(), self.generator.offset().to_string())]
-                .into_iter()
-                .collect();
+            let mapping = hashmap! {self.split_id.clone() => self.generator.offset().to_string()};
             let stream_chunk = StreamChunk::from_rows(&rows, &event_dtypes);
             yield StreamChunkWithState {
                 chunk: stream_chunk,
diff --git a/src/frontend/planner_test/tests/testdata/share.yaml b/src/frontend/planner_test/tests/testdata/share.yaml
index dfca75175b229..83dbad15b71c2 100644
--- a/src/frontend/planner_test/tests/testdata/share.yaml
+++ b/src/frontend/planner_test/tests/testdata/share.yaml
@@ -30,13 +30,13 @@
     |   └─BatchFilter { predicate: (initial_bid = 1:Int32) }
     |     └─BatchProject { exprs: [id, initial_bid] }
     |       └─BatchFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
-    |         └─BatchSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "_row_id"], filter: (None, None) }
+    |         └─BatchSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"], filter: (None, None) }
     └─BatchExchange { order: [], dist: HashShard(id) }
       └─BatchProject { exprs: [id] }
         └─BatchFilter { predicate: (initial_bid = 2:Int32) }
           └─BatchProject { exprs: [id, initial_bid] }
             └─BatchFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
-              └─BatchSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "_row_id"], filter: (None, None) }
+              └─BatchSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"], filter: (None, None) }
   stream_plan: |
     StreamMaterialize { columns: [cnt], pk_columns: [] }
     └─StreamProject { exprs: [sum0(count)] }
@@ -51,8 +51,8 @@
     | └─StreamShare { id = 519 }
     |   └─StreamProject { exprs: [id, initial_bid, _row_id] }
     |     └─StreamFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
-    |       └─StreamRowIdGen { row_id_index: 9 }
-    |         └─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "_row_id"] }
+    |       └─StreamRowIdGen { row_id_index: 10 }
+    |         └─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"] }
     └─StreamExchange { dist: HashShard(id) }
       └─StreamProject { exprs: [id, _row_id] }
         └─StreamFilter { predicate: (initial_bid = 2:Int32) }
@@ -60,8 +60,8 @@
           └─StreamShare { id = 519 }
             └─StreamProject { exprs: [id, initial_bid, _row_id] }
               └─StreamFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
-                └─StreamRowIdGen { row_id_index: 9 }
-                  └─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "_row_id"] }
+                └─StreamRowIdGen { row_id_index: 10 }
+                  └─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"] }
 - id: nexmark_q5
   before:
   - create_sources
@@ -105,7 +105,7 @@
     |     └─BatchHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start] }
     |       └─BatchExchange { order: [], dist: HashShard(auction) }
     |         └─BatchProject { exprs: [auction, date_time] }
-    |           └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"], filter: (None, None) }
+    |           └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
     └─BatchProject { exprs: [max(count), window_start] }
       └─BatchHashAgg { group_key: [window_start], aggs: [max(count)] }
         └─BatchExchange { order: [], dist: HashShard(window_start) }
@@ -113,7 +113,7 @@
           └─BatchHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start] }
             └─BatchExchange { order: [], dist: HashShard(auction) }
               └─BatchProject { exprs: [auction, date_time] }
-                └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"], filter: (None, None) }
+                └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
   stream_plan: |
     StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1] }
     └─StreamProject { exprs: [auction, count, window_start, window_start] }
@@ -127,8 +127,8 @@
     |     └─StreamProject { exprs: [auction, date_time, _row_id] }
     |       └─StreamShare { id = 744 }
     |         └─StreamProject { exprs: [auction, date_time, _row_id] }
-    |           └─StreamRowIdGen { row_id_index: 4 }
-    |             └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"] }
+    |           └─StreamRowIdGen { row_id_index: 7 }
+    |             └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
     └─StreamProject { exprs: [max(count), window_start] }
       └─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] }
         └─StreamExchange { dist: HashShard(window_start) }
@@ -139,5 +139,5 @@
           └─StreamProject { exprs: [auction, date_time, _row_id] }
             └─StreamShare { id = 744 }
               └─StreamProject { exprs: [auction, date_time, _row_id] }
-                └─StreamRowIdGen { row_id_index: 4 }
-                  └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"] }
+                └─StreamRowIdGen { row_id_index: 7 }
+                  └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 5db1cfbcbf4e3..ece356878c8e6 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -501,10 +501,7 @@ mod tests {
     async fn test_source_executor() {
        let table_id = TableId::default();
        let schema = Schema {
-            fields: vec![
-                Field::unnamed(DataType::Int64),
-                Field::with_name(DataType::Int32, "sequence_int"),
-            ],
+            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = Some(0);
        let pk_column_ids = vec![0];
@@ -583,10 +580,10 @@ mod tests {
        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
-                " I i
-                + . 11
-                + . 12
-                + . 13"
+                " i
+                + 11
+                + 12
+                + 13"
            )
        );
    }
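As a footnote to the connector changes above, here is a minimal, self-contained sketch (not part of the patch) of what the `maplit::hashmap!` rewrite buys and why the datagen generator now records `self.offset - 1`. It uses a plain `String` split id and a local counter as stand-ins for the connector's `SplitId` and generator state, and assumes the `maplit` crate is available, as the new `use maplit::hashmap;` imports suggest:

```rust
use std::collections::HashMap;

use maplit::hashmap;

fn main() {
    let split_id = "0".to_string();
    let mut offset: u64 = 0;

    // Emit three rows; as in `DatagenEventGenerator`, `offset` ends up
    // pointing one past the last row placed in the chunk.
    for _ in 0..3 {
        offset += 1;
    }

    // Old construction: collect a one-element array into a HashMap.
    let state: HashMap<String, String> =
        [(split_id.clone(), offset.to_string())].into_iter().collect();

    // New construction: `hashmap!` states the intent directly, and
    // `offset - 1` is the offset of the last row actually emitted,
    // which is presumably the value the split state should record.
    let mapping = hashmap! { split_id.clone() => (offset - 1).to_string() };

    assert_eq!(state[&split_id], "3"); // one past the last emitted row
    assert_eq!(mapping[&split_id], "2"); // the last emitted row
}
```

The planner-test churn in `share.yaml` follows mechanically from the wider Nexmark schemas: `auction` gains one column (`extra`), so its `_row_id` shifts from index 9 to 10, while `bid` gains three (`channel`, `url`, `extra`), shifting its `_row_id` from 4 to 7.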