diff --git a/Cargo.lock b/Cargo.lock
index 2a551ec0e4718..8d83cb3fa3397 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4030,6 +4030,7 @@ dependencies = [
  "madsim",
  "madsim-tokio",
  "madsim-tonic",
+ "maplit",
  "memcomparable",
  "num-traits",
  "paste",
@@ -4039,6 +4040,7 @@ dependencies = [
  "rdkafka",
  "risingwave_batch",
  "risingwave_common",
+ "risingwave_connector",
  "risingwave_pb",
  "risingwave_rpc_client",
  "risingwave_source",
@@ -4591,6 +4593,7 @@ dependencies = [
  "madsim",
  "madsim-tokio",
  "madsim-tonic",
+ "maplit",
  "memcomparable",
  "num-traits",
  "parking_lot",
diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml
index d72a6ec6de039..94aa0161dce32 100644
--- a/src/compute/Cargo.toml
+++ b/src/compute/Cargo.toml
@@ -21,6 +21,7 @@ itertools = "0.10"
 lazy_static = "1"
 log = "0.4"
 madsim = "=0.2.0-alpha.3"
+maplit = "1.0.2"
 memcomparable = { path = "../utils/memcomparable" }
 num-traits = "0.2"
 paste = "1"
@@ -29,6 +30,7 @@ prost = "0.10"
 rdkafka = { version = "0.28", features = ["cmake-build"] }
 risingwave_batch = { path = "../batch" }
 risingwave_common = { path = "../common" }
+risingwave_connector = { path = "../connector" }
 risingwave_pb = { path = "../prost" }
 risingwave_rpc_client = { path = "../rpc_client" }
 risingwave_source = { path = "../source" }
diff --git a/src/connector/src/datagen/source/reader.rs b/src/connector/src/datagen/source/reader.rs
index 58003bea34fa6..88c5515db71ff 100644
--- a/src/connector/src/datagen/source/reader.rs
+++ b/src/connector/src/datagen/source/reader.rs
@@ -104,10 +104,20 @@ impl SplitReader for DatagenSplitReader {
                 .get(&random_seed_key)
                 .map(|s| s.to_string())
             {
-                Some(seed) => seed.parse::<u64>().unwrap_or(split_index),
+                Some(seed) => {
+                    match seed.parse::<u64>() {
+                        // we use given seed xor split_index to make sure every split has different
+                        // seed
+                        Ok(seed) => seed ^ split_index,
+                        Err(e) => {
+                            log::warn!("cannot parse {:?} to u64 due to {:?}, will use {:?} as random seed", seed, e, split_index);
+                            split_index
+                        }
+                    }
+                }
                 None => split_index,
             };
-            match column.data_type{
+            match column.data_type {
                 DataType::Timestamp => {
                     let max_past_key = format!("fields.{}.max_past", name);
                     let max_past_value =
diff --git a/src/connector/src/datagen/split.rs b/src/connector/src/datagen/split.rs
index 16e2e6412f741..aea0e3939132c 100644
--- a/src/connector/src/datagen/split.rs
+++ b/src/connector/src/datagen/split.rs
@@ -20,9 +20,9 @@ use crate::base::SplitMetaData;
 
 #[derive(Clone, Serialize, Deserialize, Debug, Default, PartialEq, Hash)]
 pub struct DatagenSplit {
-    pub(crate) split_index: i32,
-    pub(crate) split_num: i32,
-    pub(crate) start_offset: Option<u64>,
+    pub split_index: i32,
+    pub split_num: i32,
+    pub start_offset: Option<u64>,
 }
 
 impl SplitMetaData for DatagenSplit {
diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs
index 8a64073cabc9a..c28ff81ae4e6e 100644
--- a/src/connector/src/lib.rs
+++ b/src/connector/src/lib.rs
@@ -37,9 +37,9 @@
 extern crate core;
 
 pub mod base;
-mod datagen;
+pub mod datagen;
 mod filesystem;
-mod kafka;
+pub mod kafka;
 pub mod kinesis;
 mod nexmark;
 mod pulsar;
diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml
index a659705ce5b9a..fe06142c9b7c4 100644
--- a/src/stream/Cargo.toml
+++ b/src/stream/Cargo.toml
@@ -24,6 +24,7 @@ itertools = "0.10"
 lazy_static = "1"
 log = "0.4"
 madsim = "=0.2.0-alpha.3"
+maplit = "1.0.2"
 memcomparable = { path = "../utils/memcomparable" }
 num-traits = "0.2"
 parking_lot = "0.12"
diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs
index 90341a6aa1107..8b099fd78286e 100644
--- a/src/stream/src/executor/mod.rs
+++ b/src/stream/src/executor/mod.rs
@@ -167,8 +167,8 @@ pub trait ExprFn = Fn(&DataChunk) -> Result<Bitmap> + Send + Sync + 'static;
 
 #[derive(Debug, PartialEq, Clone, Default)]
 pub struct AddOutput {
-    map: HashMap<(ActorId, DispatcherId), Vec<ActorInfo>>,
-    splits: HashMap<ActorId, Vec<SplitImpl>>,
+    pub map: HashMap<(ActorId, DispatcherId), Vec<ActorInfo>>,
+    pub splits: HashMap<ActorId, Vec<SplitImpl>>,
 }
 
 #[derive(Debug, Clone, PartialEq)]
diff --git a/src/stream/src/executor/source.rs b/src/stream/src/executor/source.rs
index e0023eeae9f5c..71b7393b9f544 100644
--- a/src/stream/src/executor/source.rs
+++ b/src/stream/src/executor/source.rs
@@ -27,8 +27,8 @@ use risingwave_connector::state::SourceStateHandler;
 use risingwave_connector::{ConnectorState, SplitImpl, SplitMetaData};
 use risingwave_source::*;
 use risingwave_storage::{Keyspace, StateStore};
-use tokio::sync::mpsc::{channel, Receiver, UnboundedReceiver};
-use tokio::sync::{oneshot, Notify};
+use tokio::sync::mpsc::UnboundedReceiver;
+use tokio::sync::{Mutex, Notify};
 use tokio::time::Instant;
 
 use super::error::StreamExecutorError;
@@ -134,7 +134,7 @@ impl SourceExecutor {
 
 struct SourceReader {
     /// The reader for stream source.
-    stream_reader: Box<SourceStreamReaderImpl>,
+    stream_reader: Arc<Mutex<Box<SourceStreamReaderImpl>>>,
     /// The reader for barrier.
     barrier_receiver: UnboundedReceiver<Barrier>,
     /// Expected barrier latency in ms. If there are no barrier within the expected barrier
@@ -145,49 +145,34 @@ struct SourceReader {
 impl SourceReader {
     #[try_stream(ok = StreamChunkWithState, error = RwError)]
     async fn stream_reader(
-        mut stream_reader: Box<SourceStreamReaderImpl>,
+        stream_reader: Arc<Mutex<Box<SourceStreamReaderImpl>>>,
         notifier: Arc<Notify>,
         expected_barrier_latency_ms: u64,
-        mut inject_source_rx: Receiver<(Box<SourceStreamReaderImpl>, oneshot::Sender<()>)>,
     ) {
-        let (msg_tx, mut msg_rx) = channel::<Result<StreamChunkWithState>>(1);
-        let handler = tokio::task::spawn(async move {
-            loop {
-                let now = Instant::now();
-
-                // We allow data to flow for `expected_barrier_latency_ms` milliseconds.
-                while now.elapsed().as_millis() < expected_barrier_latency_ms as u128 {
-                    tokio::select! {
-                        biased;
-                        reader = inject_source_rx.recv() => {
-                            if let Some((new_reader, tx)) = reader {
-                                stream_reader = new_reader;
-                                tx.send(()).unwrap();
-                            }
-                        }
-                        chunk = stream_reader.next() => { msg_tx.send(chunk).await.unwrap(); }
+        'outer: loop {
+            let now = Instant::now();
+
+            // We allow data to flow for `expected_barrier_latency_ms` milliseconds.
+            while now.elapsed().as_millis() < expected_barrier_latency_ms as u128 {
+                let mut reader_guard = stream_reader.lock().await;
+                let chunk_result = reader_guard.next().await;
+                drop(reader_guard);
+                match chunk_result {
+                    Ok(chunk) => yield chunk,
+                    Err(e) => {
+                        error!("hang up stream reader due to polling error: {}", e);
+                        break 'outer;
                     }
-                }
+                };
+            }
 
-                // Here we consider two cases:
-                //
-                // 1. Barrier arrived before waiting for notified. In this case, this await will
-                //    complete instantly, and we will continue to produce new data.
-                // 2. Barrier arrived after waiting for notified. Then source will be stalled.
+            // Here we consider two cases:
+            //
+            // 1. Barrier arrived before waiting for notified. In this case, this await will
+            //    complete instantly, and we will continue to produce new data.
+            // 2. Barrier arrived after waiting for notified. Then source will be stalled.
 
-                notifier.notified().await;
-            }
-        });
-        'outer: loop {
-            match msg_rx.recv().await.unwrap() {
-                Ok(chunk) => yield chunk,
-                Err(e) => {
-                    error!("hang up stream reader due to polling error: {}", e);
-                    handler.abort();
-                    // Then hang up this stream by breaking the loop.
-                    break 'outer;
-                }
-            }
+            notifier.notified().await;
         }
 
         futures::future::pending().await
@@ -206,7 +191,6 @@ impl SourceReader {
 
     fn into_stream(
         self,
-        inject_source: Receiver<(Box<SourceStreamReaderImpl>, oneshot::Sender<()>)>,
     ) -> impl Stream<Item = Either<Result<Message>, Result<StreamChunkWithState>>> {
         let notifier = Arc::new(Notify::new());
 
@@ -215,7 +199,6 @@ impl SourceReader {
             self.stream_reader,
             notifier,
             self.expected_barrier_latency_ms,
-            inject_source,
         );
         select_with_strategy(
             barrier_receiver.map(Either::Left),
@@ -326,22 +309,21 @@ impl SourceExecutor {
         };
 
         // todo: use epoch from msg to restore state from state store
-        let stream_reader = self
-            .build_stream_source_reader(recover_state)
-            .await
-            .map_err(StreamExecutorError::source_error)?;
+        let stream_reader = Arc::new(Mutex::new(
+            self.build_stream_source_reader(recover_state)
+                .await
+                .map_err(StreamExecutorError::source_error)?,
+        ));
 
         let reader = SourceReader {
-            stream_reader,
+            stream_reader: stream_reader.clone(),
            barrier_receiver,
             expected_barrier_latency_ms: self.expected_barrier_latency_ms,
         };
         yield Message::Barrier(barrier);
 
-        let (inject_source_tx, inject_source_rx) =
-            channel::<(Box<SourceStreamReaderImpl>, oneshot::Sender<()>)>(1);
         #[for_await]
-        for msg in reader.into_stream(inject_source_rx) {
+        for msg in reader.into_stream() {
             match msg {
                 // This branch will be preferred.
                 Either::Left(barrier) => {
@@ -370,20 +352,7 @@
                                     ))
                                     .await
                                     .map_err(StreamExecutorError::source_error)?;
-
-                                let (tx, rx) = oneshot::channel();
-                                inject_source_tx
-                                    .send((reader, tx))
-                                    .await
-                                    .to_rw_result()
-                                    .map_err(|e| {
-                                        StreamExecutorError::channel_closed(
-                                            e.to_string(),
-                                        )
-                                    })?;
-
-                                // force sync
-                                rx.await.unwrap();
+                                *stream_reader.lock().await = reader;
                                 self.stream_source_splits = target_state;
                             }
                         }
@@ -473,10 +442,20 @@ mod tests {
     use std::sync::Arc;
 
     use futures::StreamExt;
+    use maplit::hashmap;
     use risingwave_common::array::stream_chunk::StreamChunkTestExt;
     use risingwave_common::array::StreamChunk;
     use risingwave_common::catalog::{ColumnDesc, Field, Schema};
     use risingwave_common::types::DataType;
+    use risingwave_common::util::sort_util::{OrderPair, OrderType};
+    use risingwave_connector::datagen::DatagenSplit;
+    use risingwave_pb::catalog::StreamSourceInfo;
+    use risingwave_pb::data::data_type::TypeName;
+    use risingwave_pb::data::DataType as ProstDataType;
+    use risingwave_pb::plan_common::{
+        ColumnCatalog as ProstColumnCatalog, ColumnDesc as ProstColumnDesc,
+        RowFormatType as ProstRowFormatType,
+    };
     use risingwave_source::*;
     use risingwave_storage::memory::MemoryStateStore;
     use tokio::sync::mpsc::unbounded_channel;
@@ -706,4 +685,191 @@
 
         Ok(())
     }
+
+    fn mock_stream_source_info() -> StreamSourceInfo {
+        let properties: HashMap<String, String> = hashmap! {
{ + "connector".to_string() => "datagen".to_string(), + "fields.v1.min".to_string() => "1".to_string(), + "fields.v1.max".to_string() => "1000".to_string(), + "fields.v1.seed".to_string() => "12345".to_string(), + }; + + let columns = vec![ + ProstColumnCatalog { + column_desc: Some(ProstColumnDesc { + column_type: Some(ProstDataType { + type_name: TypeName::Int64 as i32, + ..Default::default() + }), + column_id: 0, + ..Default::default() + }), + is_hidden: false, + }, + ProstColumnCatalog { + column_desc: Some(ProstColumnDesc { + column_type: Some(ProstDataType { + type_name: TypeName::Int32 as i32, + ..Default::default() + }), + column_id: 1, + name: "v1".to_string(), + ..Default::default() + }), + is_hidden: false, + }, + ]; + + StreamSourceInfo { + properties, + row_format: ProstRowFormatType::Json as i32, + row_schema_location: "".to_string(), + row_id_index: 0, + columns, + pk_column_ids: vec![0], + } + } + + fn drop_row_id(chunk: StreamChunk) -> StreamChunk { + let (ops, mut columns, bitmap) = chunk.into_inner(); + columns.remove(0); + // columns.pop(); + StreamChunk::new(ops, columns, bitmap) + } + + #[tokio::test] + async fn test_split_change_mutation() -> Result<()> { + let stream_source_info = mock_stream_source_info(); + let source_table_id = TableId::default(); + let source_manager = Arc::new(MemSourceManager::default()); + + source_manager + .create_source(&source_table_id, stream_source_info) + .await?; + + let get_schema = |column_ids: &[ColumnId], source_desc: &SourceDesc| { + let mut fields = Vec::with_capacity(column_ids.len()); + for &column_id in column_ids { + let column_desc = source_desc + .columns + .iter() + .find(|c| c.column_id == column_id) + .unwrap(); + fields.push(Field::unnamed(column_desc.data_type.clone())); + } + Schema::new(fields) + }; + + let actor_id = ActorId::default(); + let source_desc = source_manager.get_source(&source_table_id)?; + let keyspace = Keyspace::table_root(MemoryStateStore::new(), &TableId::from(0x2333)); + let column_ids = vec![ColumnId::from(0), ColumnId::from(1)]; + let schema = get_schema(&column_ids, &source_desc); + let pk_indices = vec![0_usize]; + let (barrier_tx, barrier_rx) = unbounded_channel::(); + + let source_exec = SourceExecutor::new( + actor_id, + source_table_id, + source_desc, + keyspace.clone(), + column_ids.clone(), + schema, + pk_indices, + barrier_rx, + 1, + 1, + "SourceExecutor".to_string(), + Arc::new(StreamingMetrics::unused()), + u64::MAX, + )?; + + let mut materialize = MaterializeExecutor::new( + Box::new(source_exec), + keyspace.clone(), + vec![OrderPair::new(0, OrderType::Ascending)], + column_ids.clone(), + 2, + vec![0usize], + ) + .boxed() + .execute(); + + let curr_epoch = 1919; + let init_barrier = + Barrier::new_test_barrier(curr_epoch).with_mutation(Mutation::AddOutput(AddOutput { + map: HashMap::new(), + splits: hashmap! { + ActorId::default() => vec![ + SplitImpl::Datagen( + DatagenSplit { + split_index: 0, + split_num: 3, + start_offset: None, + }), + ], + }, + })); + barrier_tx.send(init_barrier).unwrap(); + + let _ = materialize.next().await.unwrap(); // barrier + + let chunk_1 = materialize.next().await.unwrap().unwrap().into_chunk(); + + let chunk_1_truth = StreamChunk::from_pretty( + " I i + + 0 533 + + 0 833 + + 0 738 + + 0 344", + ); + + assert_eq!(drop_row_id(chunk_1.unwrap()), drop_row_id(chunk_1_truth)); + + let change_split_mutation = Barrier::new_test_barrier(curr_epoch + 1).with_mutation( + Mutation::SourceChangeSplit(hashmap! 
+                ActorId::default() => Some(vec![
+                    SplitImpl::Datagen(
+                        DatagenSplit {
+                            split_index: 0,
+                            split_num: 3,
+                            start_offset: None,
+                        }
+                    ), SplitImpl::Datagen(
+                        DatagenSplit {
+                            split_index: 1,
+                            split_num: 3,
+                            start_offset: None,
+                        }
+                    ),
+                ])
+            }),
+        );
+        barrier_tx.send(change_split_mutation).unwrap();
+
+        let _ = materialize.next().await.unwrap(); // barrier
+
+        let chunk_2 = materialize.next().await.unwrap().unwrap().into_chunk();
+
+        let chunk_2_truth = StreamChunk::from_pretty(
+            " I i
+            + 0 525
+            + 0 425
+            + 0 29
+            + 0 201",
+        );
+        assert_eq!(drop_row_id(chunk_2.unwrap()), drop_row_id(chunk_2_truth));
+
+        let chunk_3 = materialize.next().await.unwrap().unwrap().into_chunk();
+
+        let chunk_3_truth = StreamChunk::from_pretty(
+            " I i
+            + 0 833
+            + 0 533
+            + 0 344",
+        );
+        assert_eq!(drop_row_id(chunk_3.unwrap()), drop_row_id(chunk_3_truth));
+
+        Ok(())
+    }
 }