Commit: fix ci

waruto210 committed Jan 31, 2023
1 parent 7afcde9 commit 36134f0
Showing 4 changed files with 27 additions and 29 deletions.
src/connector/src/source/datagen/source/generator.rs (10 changes: 6 additions & 4 deletions)
@@ -16,6 +16,7 @@ use std::time::Duration;
 use anyhow::Result;
 use futures_async_stream::try_stream;
 use itertools::Itertools;
+use maplit::hashmap;
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::error::RwError;
 use risingwave_common::field_generator::FieldGeneratorImpl;
@@ -88,13 +89,14 @@ impl DatagenEventGenerator {
             self.offset += 1;
             rows_generated_this_second += 1;
         }
+
         let chunk = StreamChunk::from_rows(&rows, &self.data_types);
-        let state = [(self.split_id.clone(), self.offset.to_string())]
-            .into_iter()
-            .collect();
+        let mapping = hashmap! {
+            self.split_id.clone() => (self.offset - 1).to_string()
+        };
         yield StreamChunkWithState {
             chunk,
-            split_offset_mapping: Some(state),
+            split_offset_mapping: Some(mapping),
         };
     }
 }
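Note: the datagen change is behavioral as well as cosmetic. Besides switching to maplit's hashmap! macro, the reported offset becomes self.offset - 1, the offset of the last row actually emitted, instead of the already-advanced next offset. A minimal standalone sketch of the two constructions (a plain String stands in for the real split-id type; the values are made up):

    // Sketch only, not the committed code.
    use std::collections::HashMap;

    use maplit::hashmap;

    fn main() {
        let split_id = "split-0".to_string();
        let offset: u64 = 42; // already advanced past the last emitted row

        // Pattern removed by this commit: collect a one-element tuple array.
        let state: HashMap<String, String> =
            [(split_id.clone(), offset.to_string())].into_iter().collect();

        // Pattern introduced by this commit: maplit's hashmap! macro, reporting
        // the last emitted row's offset (offset - 1) rather than the next one.
        let mapping: HashMap<String, String> =
            hashmap! { split_id.clone() => (offset - 1).to_string() };

        assert_eq!(state[&split_id], "42");
        assert_eq!(mapping[&split_id], "41");
    }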
src/connector/src/source/nexmark/source/reader.rs (9 changes: 4 additions & 5 deletions)
@@ -19,6 +19,7 @@ use anyhow::Result;
 use async_trait::async_trait;
 use futures::StreamExt;
 use futures_async_stream::try_stream;
+use maplit::hashmap;
 use nexmark::config::NexmarkConfig;
 use nexmark::event::EventType;
 use nexmark::EventGenerator;
@@ -105,13 +106,13 @@ impl SplitReaderV2 for NexmarkSplitReader {
     fn into_stream(self) -> BoxSourceWithStateStream {
         // Will buffer at most 4 event chunks.
         const BUFFER_SIZE: usize = 4;
-        spawn_data_generation_stream(self.into_data_stream_inner(), BUFFER_SIZE).boxed()
+        spawn_data_generation_stream(self.into_chunk_stream(), BUFFER_SIZE).boxed()
     }
 }
 
 impl NexmarkSplitReader {
     #[try_stream(boxed, ok = StreamChunkWithState, error = RwError)]
-    async fn into_data_stream_inner(mut self) {
+    async fn into_chunk_stream(mut self) {
         let start_time = Instant::now();
         let start_offset = self.generator.global_offset();
         let start_ts = self.generator.timestamp();
@@ -147,9 +148,7 @@ impl NexmarkSplitReader {
                 )
                 .await;
             }
-            let mapping = [(self.split_id.clone(), self.generator.offset().to_string())]
-                .into_iter()
-                .collect();
+            let mapping = hashmap! {self.split_id.clone() => self.generator.offset().to_string()};
             let stream_chunk = StreamChunk::from_rows(&rows, &event_dtypes);
             yield StreamChunkWithState {
                 chunk: stream_chunk,
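Note: the rename from into_data_stream_inner to into_chunk_stream better describes what the stream yields (StreamChunkWithState items). For readers unfamiliar with the #[try_stream] attribute from futures_async_stream, here is a rough sketch of the pattern with placeholder types; the crate needs a nightly toolchain with the generator feature enabled, which RisingWave's build uses, and the feature's exact name has varied across toolchain versions:

    // Sketch of the #[try_stream] pattern, not RisingWave code.
    #![feature(generators)]

    use futures_async_stream::try_stream;

    struct Reader {
        remaining: u32,
    }

    impl Reader {
        // The attribute rewrites this async fn into one returning a boxed
        // Stream of Result<u32, std::io::Error>; each `yield` emits one Ok item.
        #[try_stream(boxed, ok = u32, error = std::io::Error)]
        async fn into_chunk_stream(mut self) {
            while self.remaining > 0 {
                self.remaining -= 1;
                yield self.remaining;
            }
        }
    }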
src/frontend/planner_test/tests/testdata/share.yaml (24 changes: 12 additions & 12 deletions)
@@ -30,13 +30,13 @@
 | └─BatchFilter { predicate: (initial_bid = 1:Int32) }
 | └─BatchProject { exprs: [id, initial_bid] }
 | └─BatchFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
-| └─BatchSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "_row_id"], filter: (None, None) }
+| └─BatchSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"], filter: (None, None) }
 └─BatchExchange { order: [], dist: HashShard(id) }
 └─BatchProject { exprs: [id] }
 └─BatchFilter { predicate: (initial_bid = 2:Int32) }
 └─BatchProject { exprs: [id, initial_bid] }
 └─BatchFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
-└─BatchSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "_row_id"], filter: (None, None) }
+└─BatchSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"], filter: (None, None) }
 stream_plan: |
 StreamMaterialize { columns: [cnt], pk_columns: [] }
 └─StreamProject { exprs: [sum0(count)] }
@@ -51,17 +51,17 @@
 | └─StreamShare { id = 519 }
 | └─StreamProject { exprs: [id, initial_bid, _row_id] }
 | └─StreamFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
-| └─StreamRowIdGen { row_id_index: 9 }
-| └─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "_row_id"] }
+| └─StreamRowIdGen { row_id_index: 10 }
+| └─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"] }
 └─StreamExchange { dist: HashShard(id) }
 └─StreamProject { exprs: [id, _row_id] }
 └─StreamFilter { predicate: (initial_bid = 2:Int32) }
 └─StreamProject { exprs: [id, initial_bid, _row_id] }
 └─StreamShare { id = 519 }
 └─StreamProject { exprs: [id, initial_bid, _row_id] }
 └─StreamFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
-└─StreamRowIdGen { row_id_index: 9 }
-└─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "_row_id"] }
+└─StreamRowIdGen { row_id_index: 10 }
+└─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"] }
 - id: nexmark_q5
 before:
 - create_sources
@@ -105,15 +105,15 @@
 | └─BatchHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start] }
 | └─BatchExchange { order: [], dist: HashShard(auction) }
 | └─BatchProject { exprs: [auction, date_time] }
-| └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"], filter: (None, None) }
+| └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
 └─BatchProject { exprs: [max(count), window_start] }
 └─BatchHashAgg { group_key: [window_start], aggs: [max(count)] }
 └─BatchExchange { order: [], dist: HashShard(window_start) }
 └─BatchHashAgg { group_key: [auction, window_start], aggs: [count] }
 └─BatchHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start] }
 └─BatchExchange { order: [], dist: HashShard(auction) }
 └─BatchProject { exprs: [auction, date_time] }
-└─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"], filter: (None, None) }
+└─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
 stream_plan: |
 StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1] }
 └─StreamProject { exprs: [auction, count, window_start, window_start] }
@@ -127,8 +127,8 @@
 | └─StreamProject { exprs: [auction, date_time, _row_id] }
 | └─StreamShare { id = 744 }
 | └─StreamProject { exprs: [auction, date_time, _row_id] }
-| └─StreamRowIdGen { row_id_index: 4 }
-| └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"] }
+| └─StreamRowIdGen { row_id_index: 7 }
+| └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
 └─StreamProject { exprs: [max(count), window_start] }
 └─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] }
 └─StreamExchange { dist: HashShard(window_start) }
@@ -139,5 +139,5 @@
 └─StreamProject { exprs: [auction, date_time, _row_id] }
 └─StreamShare { id = 744 }
 └─StreamProject { exprs: [auction, date_time, _row_id] }
-└─StreamRowIdGen { row_id_index: 4 }
-└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"] }
+└─StreamRowIdGen { row_id_index: 7 }
+└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
src/stream/src/executor/source/source_executor.rs (13 changes: 5 additions & 8 deletions)
@@ -501,10 +501,7 @@ mod tests {
     async fn test_source_executor() {
         let table_id = TableId::default();
         let schema = Schema {
-            fields: vec![
-                Field::unnamed(DataType::Int64),
-                Field::with_name(DataType::Int32, "sequence_int"),
-            ],
+            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
         };
         let row_id_index = Some(0);
         let pk_column_ids = vec![0];
@@ -583,10 +580,10 @@
         assert_eq!(
             msg.into_chunk().unwrap(),
             StreamChunk::from_pretty(
-                " I i
-                + . 11
-                + . 12
-                + . 13"
+                " i
+                + 11
+                + 12
+                + 13"
             )
         );
     }
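Note: with the unnamed Int64 row-id field dropped from the test schema, the expected chunk shrinks to a single Int32 column. As a reading aid for the from_pretty literal (this legend is an assumption based on how risingwave_common's test helpers conventionally render chunks, not verified against the current source): the header row lists column types, lowercase "i" meaning Int32 and uppercase "I" meaning Int64; each data row starts with an op, "+" being an insert; and "." marks a NULL cell, which is why the old two-column literal read "+ . 11".

    use risingwave_common::array::StreamChunk;

    // " i"  header: one Int32 column ("I" would be an Int64 column)
    // "+"   insert op for the row; the number is the Int32 cell value
    fn expected() -> StreamChunk {
        StreamChunk::from_pretty(
            " i
             + 11
             + 12
             + 13",
        )
    }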
