diff --git a/src/stream/src/executor/dml.rs b/src/stream/src/executor/dml.rs index 9a16b75fbc1d3..181dc247eef57 100644 --- a/src/stream/src/executor/dml.rs +++ b/src/stream/src/executor/dml.rs @@ -12,18 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::future::Either; -use futures::stream::select; +use either::Either; use futures::StreamExt; use futures_async_stream::try_stream; use risingwave_common::catalog::{ColumnDesc, Schema, TableId, TableVersionId}; -use risingwave_connector::source::StreamChunkWithState; use risingwave_source::dml_manager::DmlManagerRef; use super::error::StreamExecutorError; +use super::stream_reader::StreamReaderWithPause; use super::{ - expect_first_barrier, BoxedExecutor, BoxedMessageStream, Executor, Message, PkIndices, - PkIndicesRef, + expect_first_barrier, BoxedExecutor, BoxedMessageStream, Executor, Message, Mutation, + PkIndices, PkIndicesRef, }; /// [`DmlExecutor`] accepts both stream data and batch data for data manipulation on a specific @@ -92,31 +91,40 @@ impl DmlExecutor { .dml_manager .register_reader(self.table_id, self.table_version_id, &self.column_descs) .map_err(StreamExecutorError::connector_error)?; - let batch_reader = batch_reader - .stream_reader() - .into_stream() - .map(Either::Right); + let batch_reader = batch_reader.stream_reader().into_stream(); - yield Message::Barrier(barrier); + // Merge the two streams using `StreamReaderWithPause` because when we receive a pause + // barrier, we should stop receiving the data from DML. We poll data from the two streams in + // a round robin way. + let mut stream = StreamReaderWithPause::::new(upstream, batch_reader); - // Stream data from the upstream executor. - let upstream = upstream.map(Either::Left); + // If the first barrier is configuration change, then the DML executor must be newly + // created, and we should start with the paused state. + if barrier.is_update() { + stream.pause_stream(); + } - // Merge the two streams. - let stream = select(upstream, batch_reader); + yield Message::Barrier(barrier); - #[for_await] - for input_msg in stream { - match input_msg { + while let Some(input_msg) = stream.next().await { + match input_msg? { Either::Left(msg) => { - // Stream data. - let msg: Message = msg?; + // Stream messages. + if let Message::Barrier(barrier) = &msg { + // We should handle barrier messages here to pause or resume the data from + // DML. + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause => stream.pause_stream(), + Mutation::Resume => stream.resume_stream(), + _ => {} + } + } + } yield msg; } Either::Right(chunk) => { // Batch data. - let chunk: StreamChunkWithState = - chunk.map_err(StreamExecutorError::connector_error)?; yield Message::Chunk(chunk.chunk); } } diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index ac7f897a391dc..6331b4a1b3825 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -85,6 +85,7 @@ mod sink; mod sort; mod sort_buffer; pub mod source; +mod stream_reader; pub mod subtask; mod top_n; mod union; diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index cc8900630ebfc..89320e8ac52a9 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -33,7 +33,7 @@ use super::executor_core::StreamSourceCore; use crate::error::StreamResult; use crate::executor::error::StreamExecutorError; use crate::executor::monitor::StreamingMetrics; -use crate::executor::source::reader::SourceReaderStream; +use crate::executor::stream_reader::StreamReaderWithPause; use crate::executor::*; /// [`FsSourceExecutor`] is a streaming source, fir external file systems /// such as s3. @@ -110,10 +110,10 @@ impl FsSourceExecutor { Ok(steam_reader.into_stream()) } - async fn apply_split_change( + async fn apply_split_change( &mut self, source_desc: &FsSourceDesc, - stream: &mut SourceReaderStream, + stream: &mut StreamReaderWithPause, mapping: &HashMap>, ) -> StreamExecutorResult<()> { if let Some(target_splits) = mapping.get(&self.ctx.id).cloned() { @@ -176,10 +176,10 @@ impl FsSourceExecutor { Ok((!no_change_flag).then_some(target_state)) } - async fn replace_stream_reader_with_target_state( + async fn replace_stream_reader_with_target_state( &mut self, source_desc: &FsSourceDesc, - stream: &mut SourceReaderStream, + stream: &mut StreamReaderWithPause, target_state: Vec, ) -> StreamExecutorResult<()> { tracing::info!( @@ -192,7 +192,7 @@ impl FsSourceExecutor { let reader = self .build_stream_source_reader(source_desc, Some(target_state.clone())) .await?; - stream.replace_source_stream(reader); + stream.replace_data_stream(reader); self.stream_source_core.stream_source_splits = target_state .into_iter() @@ -332,10 +332,12 @@ impl FsSourceExecutor { .stack_trace("fs_source_start_reader") .await?; - // Merge the chunks from source and the barriers into a single stream. - let mut stream = SourceReaderStream::new(barrier_receiver, source_chunk_reader); + // Merge the chunks from source and the barriers into a single stream. We prioritize + // barriers over source data chunks here. + let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); + let mut stream = StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); if start_with_paused { - stream.pause_source(); + stream.pause_stream(); } yield Message::Barrier(barrier); @@ -349,42 +351,53 @@ impl FsSourceExecutor { while let Some(msg) = stream.next().await { match msg? { // This branch will be preferred. - Either::Left(barrier) => { - last_barrier_time = Instant::now(); - if self_paused { - stream.resume_source(); - self_paused = false; - } - let epoch = barrier.epoch; - - if let Some(ref mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::SourceChangeSplit(actor_splits) => { - self.apply_split_change(&source_desc, &mut stream, actor_splits) - .await? - } - Mutation::Pause => stream.pause_source(), - Mutation::Resume => stream.resume_source(), - Mutation::Update { actor_splits, .. } => { - self.apply_split_change(&source_desc, &mut stream, actor_splits) + Either::Left(msg) => match &msg { + Message::Barrier(barrier) => { + last_barrier_time = Instant::now(); + if self_paused { + stream.resume_stream(); + self_paused = false; + } + let epoch = barrier.epoch; + + if let Some(ref mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::SourceChangeSplit(actor_splits) => { + self.apply_split_change(&source_desc, &mut stream, actor_splits) + .await? + } + Mutation::Pause => stream.pause_stream(), + Mutation::Resume => stream.resume_stream(), + Mutation::Update { actor_splits, .. } => { + self.apply_split_change( + &source_desc, + &mut stream, + actor_splits, + ) .await?; + } + _ => {} } - _ => {} } + self.take_snapshot_and_clear_cache(epoch).await?; + + self.metrics + .source_row_per_barrier + .with_label_values(&[ + self.ctx.id.to_string().as_str(), + self.stream_source_core.source_identify.as_ref(), + ]) + .inc_by(metric_row_per_barrier); + metric_row_per_barrier = 0; + + yield msg; } - self.take_snapshot_and_clear_cache(epoch).await?; - - self.metrics - .source_row_per_barrier - .with_label_values(&[ - self.ctx.id.to_string().as_str(), - self.stream_source_core.source_identify.as_ref(), - ]) - .inc_by(metric_row_per_barrier); - metric_row_per_barrier = 0; - - yield Message::Barrier(barrier); - } + _ => { + // For the source executor, the message we receive from this arm should + // always be barrier message. + unreachable!(); + } + }, Either::Right(StreamChunkWithState { chunk, @@ -395,7 +408,7 @@ impl FsSourceExecutor { // we can guarantee the source is not paused since it received stream // chunks. self_paused = true; - stream.pause_source(); + stream.pause_stream(); } // update split offset if let Some(mapping) = split_offset_mapping { diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs index 1ca31c79696da..7c13505e58545 100644 --- a/src/stream/src/executor/source/mod.rs +++ b/src/stream/src/executor/source/mod.rs @@ -13,13 +13,28 @@ // limitations under the License. pub mod executor_core; +use async_stack_trace::StackTrace; pub use executor_core::StreamSourceCore; mod fs_source_executor; pub use fs_source_executor::*; +use risingwave_common::bail; +pub use state_table_handler::*; pub mod source_executor; -mod reader; pub mod state_table_handler; -pub use state_table_handler::*; +use futures_async_stream::try_stream; +use tokio::sync::mpsc::UnboundedReceiver; + +use crate::executor::error::StreamExecutorError; +use crate::executor::{Barrier, Message}; + +/// Receive barriers from barrier manager with the channel, error on channel close. +#[try_stream(ok = Message, error = StreamExecutorError)] +pub async fn barrier_to_message_stream(mut rx: UnboundedReceiver) { + while let Some(barrier) = rx.recv().stack_trace("receive_barrier").await { + yield Message::Barrier(barrier); + } + bail!("barrier reader closed unexpectedly"); +} diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 606af4cfdfe64..1ac2ace79f6cd 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -28,7 +28,7 @@ use tokio::time::Instant; use super::executor_core::StreamSourceCore; use crate::executor::monitor::StreamingMetrics; -use crate::executor::source::reader::SourceReaderStream; +use crate::executor::stream_reader::StreamReaderWithPause; use crate::executor::*; /// A constant to multiply when calculating the maximum time to wait for a barrier. This is due to @@ -107,10 +107,10 @@ impl SourceExecutor { .map_err(StreamExecutorError::connector_error) } - async fn apply_split_change( + async fn apply_split_change( &mut self, source_desc: &SourceDesc, - stream: &mut SourceReaderStream, + stream: &mut StreamReaderWithPause, mapping: &HashMap>, ) -> StreamExecutorResult<()> { if let Some(target_splits) = mapping.get(&self.ctx.id).cloned() { @@ -164,10 +164,10 @@ impl SourceExecutor { Ok((!no_change_flag).then_some(target_state)) } - async fn replace_stream_reader_with_target_state( + async fn replace_stream_reader_with_target_state( &mut self, source_desc: &SourceDesc, - stream: &mut SourceReaderStream, + stream: &mut StreamReaderWithPause, target_state: Vec, ) -> StreamExecutorResult<()> { tracing::info!( @@ -180,7 +180,7 @@ impl SourceExecutor { let reader = self .build_stream_source_reader(source_desc, Some(target_state.clone())) .await?; - stream.replace_source_stream(reader); + stream.replace_data_stream(reader); self.stream_source_core .as_mut() @@ -290,13 +290,15 @@ impl SourceExecutor { .stack_trace("source_build_reader") .await?; - // Merge the chunks from source and the barriers into a single stream. - let mut stream = SourceReaderStream::new(barrier_receiver, source_chunk_reader); + // Merge the chunks from source and the barriers into a single stream. We prioritize + // barriers over source data chunks here. + let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); + let mut stream = StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); // If the first barrier is configuration change, then the source executor must be newly // created, and we should start with the paused state. if barrier.is_update() { - stream.pause_source(); + stream.pause_stream(); } yield Message::Barrier(barrier); @@ -311,47 +313,58 @@ impl SourceExecutor { while let Some(msg) = stream.next().await { match msg? { // This branch will be preferred. - Either::Left(barrier) => { - last_barrier_time = Instant::now(); - if self_paused { - stream.resume_source(); - self_paused = false; - } - let epoch = barrier.epoch; - - if let Some(ref mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::SourceChangeSplit(actor_splits) => { - self.apply_split_change(&source_desc, &mut stream, actor_splits) - .await? - } - Mutation::Pause => stream.pause_source(), - Mutation::Resume => stream.resume_source(), - Mutation::Update { actor_splits, .. } => { - self.apply_split_change(&source_desc, &mut stream, actor_splits) + Either::Left(msg) => match &msg { + Message::Barrier(barrier) => { + last_barrier_time = Instant::now(); + if self_paused { + stream.resume_stream(); + self_paused = false; + } + let epoch = barrier.epoch; + + if let Some(ref mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::SourceChangeSplit(actor_splits) => { + self.apply_split_change(&source_desc, &mut stream, actor_splits) + .await? + } + Mutation::Pause => stream.pause_stream(), + Mutation::Resume => stream.resume_stream(), + Mutation::Update { actor_splits, .. } => { + self.apply_split_change( + &source_desc, + &mut stream, + actor_splits, + ) .await?; + } + _ => {} } - _ => {} } - } - self.take_snapshot_and_clear_cache(epoch).await?; + self.take_snapshot_and_clear_cache(epoch).await?; - self.metrics - .source_row_per_barrier - .with_label_values(&[ - self.ctx.id.to_string().as_str(), - self.stream_source_core - .as_ref() - .unwrap() - .source_identify - .as_ref(), - ]) - .inc_by(metric_row_per_barrier); - metric_row_per_barrier = 0; + self.metrics + .source_row_per_barrier + .with_label_values(&[ + self.ctx.id.to_string().as_str(), + self.stream_source_core + .as_ref() + .unwrap() + .source_identify + .as_ref(), + ]) + .inc_by(metric_row_per_barrier); + metric_row_per_barrier = 0; - yield Message::Barrier(barrier); - } + yield msg; + } + _ => { + // For the source executor, the message we receive from this arm should + // always be barrier message. + unreachable!(); + } + }, Either::Right(StreamChunkWithState { chunk, @@ -362,7 +375,7 @@ impl SourceExecutor { // we can guarantee the source is not paused since it received stream // chunks. self_paused = true; - stream.pause_source(); + stream.pause_stream(); } if let Some(mapping) = split_offset_mapping { let state: HashMap<_, _> = mapping diff --git a/src/stream/src/executor/source/reader.rs b/src/stream/src/executor/stream_reader.rs similarity index 56% rename from src/stream/src/executor/source/reader.rs rename to src/stream/src/executor/stream_reader.rs index b801b853eb0eb..aef2870a67e12 100644 --- a/src/stream/src/executor/source/reader.rs +++ b/src/stream/src/executor/stream_reader.rs @@ -20,37 +20,38 @@ use either::Either; use futures::stream::{select_with_strategy, BoxStream, PollNext, SelectWithStrategy}; use futures::{Stream, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; -use risingwave_common::bail; use risingwave_connector::source::{BoxSourceWithStateStream, StreamChunkWithState}; -use tokio::sync::mpsc::UnboundedReceiver; use crate::executor::error::{StreamExecutorError, StreamExecutorResult}; -use crate::executor::Barrier; - -type SourceReaderMessage = StreamExecutorResult>; -type SourceReaderArm = BoxStream<'static, SourceReaderMessage>; -type SourceReaderStreamInner = - SelectWithStrategy PollNext, ()>; - -pub(super) struct SourceReaderStream { - inner: SourceReaderStreamInner, +use crate::executor::Message; + +type ExecutorMessageStream = BoxStream<'static, StreamExecutorResult>; +type StreamReaderData = StreamExecutorResult>; +type ReaderArm = BoxStream<'static, StreamReaderData>; +type StreamReaderWithPauseInner = + SelectWithStrategy PollNext, PollNext>; + +/// [`StreamReaderWithPause`] merges two streams, with one receiving barriers (and maybe other types +/// of messages) and the other receiving data only (no barrier). The merged stream can be paused +/// (`StreamReaderWithPause::pause_stream`) and resumed (`StreamReaderWithPause::resume_stream`). +/// A paused stream will not receive any data from either original stream until a barrier arrives +/// and the stream is resumed. +/// +/// ## Priority +/// +/// If `BIASED` is `true`, the left-hand stream (the one receiving barriers) will get a higher +/// priority over the right-hand one. Otherwise, the two streams will be polled in a round robin +/// fashion. +pub(super) struct StreamReaderWithPause { + inner: StreamReaderWithPauseInner, /// Whether the source stream is paused. paused: bool, } -impl SourceReaderStream { - /// Receive barriers from barrier manager with the channel, error on channel close. - #[try_stream(ok = Barrier, error = StreamExecutorError)] - async fn barrier_receiver(mut rx: UnboundedReceiver) { - while let Some(barrier) = rx.recv().stack_trace("source_recv_barrier").await { - yield barrier; - } - bail!("barrier reader closed unexpectedly"); - } - - /// Receive chunks and states from the source reader, hang up on error. +impl StreamReaderWithPause { + /// Receive chunks and states from the reader. Hang up on error. #[try_stream(ok = StreamChunkWithState, error = StreamExecutorError)] - async fn source_stream(stream: BoxSourceWithStateStream) { + async fn data_stream(stream: BoxSourceWithStateStream) { // TODO: support stack trace for Stream #[for_await] for chunk in stream { @@ -64,38 +65,33 @@ impl SourceReaderStream { } } - /// Convert this reader to a stream. + /// Construct a `StreamReaderWithPause` with one stream receiving barrier messages (and maybe + /// other types of messages) and the other receiving data only (no barrier). pub fn new( - barrier_receiver: UnboundedReceiver, - source_stream: BoxSourceWithStateStream, + message_stream: ExecutorMessageStream, + data_stream: BoxSourceWithStateStream, ) -> Self { + let message_stream_arm = message_stream.map_ok(Either::Left).boxed(); + let data_stream_arm = Self::data_stream(data_stream).map_ok(Either::Right).boxed(); + let inner = Self::new_inner(message_stream_arm, data_stream_arm); Self { - inner: Self::new_inner( - Self::barrier_receiver(barrier_receiver) - .map_ok(Either::Left) - .boxed(), - Self::source_stream(source_stream) - .map_ok(Either::Right) - .boxed(), - ), + inner, paused: false, } } - fn new_inner( - barrier_receiver_arm: SourceReaderArm, - source_stream_arm: SourceReaderArm, - ) -> SourceReaderStreamInner { - select_with_strategy( - barrier_receiver_arm, - source_stream_arm, - // We prefer barrier on the left hand side over source chunks. - |_: &mut ()| PollNext::Left, - ) + fn new_inner(message_stream: ReaderArm, data_stream: ReaderArm) -> StreamReaderWithPauseInner { + let strategy = if BIASED { + |_: &mut PollNext| PollNext::Left + } else { + // The poll strategy is not biased: we poll the two streams in a round robin way. + |last: &mut PollNext| last.toggle() + }; + select_with_strategy(message_stream, data_stream, strategy) } - /// Replace the source stream with a new one for given `stream`. Used for split change. - pub fn replace_source_stream(&mut self, source_stream: BoxSourceWithStateStream) { + /// Replace the data stream with a new one for given `stream`. Used for split change. + pub fn replace_data_stream(&mut self, data_stream: BoxSourceWithStateStream) { // Take the barrier receiver arm. let barrier_receiver_arm = std::mem::replace( self.inner.get_mut().0, @@ -106,35 +102,38 @@ impl SourceReaderStream { // to ensure the internal state of the `SelectWithStrategy` is reset. (#6300) self.inner = Self::new_inner( barrier_receiver_arm, - Self::source_stream(source_stream) - .map_ok(Either::Right) - .boxed(), + Self::data_stream(data_stream).map_ok(Either::Right).boxed(), ); } - /// Pause the source stream. - pub fn pause_source(&mut self) { + /// Pause the data stream. + pub fn pause_stream(&mut self) { assert!(!self.paused, "already paused"); self.paused = true; } - /// Resume the source stream, panic if the source is not paused before. - pub fn resume_source(&mut self) { + /// Resume the data stream. Panic if the data stream is not paused. + pub fn resume_stream(&mut self) { assert!(self.paused, "not paused"); self.paused = false; } } -impl Stream for SourceReaderStream { - type Item = SourceReaderMessage; +impl Stream for StreamReaderWithPause { + type Item = StreamReaderData; fn poll_next( mut self: Pin<&mut Self>, ctx: &mut std::task::Context<'_>, ) -> Poll> { if self.paused { + // Note: It is safe here to poll the left arm even if it contains streaming messages + // other than barriers: after the upstream executor sends a `Mutation::Pause`, there + // should be no more message until a `Mutation::Update` and a 'Mutation::Resume`. self.inner.get_mut().0.poll_next_unpin(ctx) } else { + // TODO: We may need to prioritize the data stream (right-hand stream) after resuming + // from the paused state. self.inner.poll_next_unpin(ctx) } } @@ -149,6 +148,7 @@ mod tests { use tokio::sync::mpsc; use super::*; + use crate::executor::{barrier_to_message_stream, Barrier}; #[tokio::test] async fn test_pause_and_resume() { @@ -157,7 +157,8 @@ mod tests { let table_dml_handle = TableDmlHandle::new(vec![]); let source_stream = table_dml_handle.stream_reader().into_stream(); - let stream = SourceReaderStream::new(barrier_rx, source_stream); + let barrier_stream = barrier_to_message_stream(barrier_rx).boxed(); + let stream = StreamReaderWithPause::::new(barrier_stream, source_stream); pin_mut!(stream); macro_rules! next { @@ -181,7 +182,7 @@ mod tests { assert_matches!(next!().unwrap(), Either::Left(_)); // Pause the stream. - stream.pause_source(); + stream.pause_stream(); // Write a barrier. barrier_tx.send(Barrier::new_test_barrier(2)).unwrap(); @@ -197,7 +198,7 @@ mod tests { assert!(next!().is_none()); // Resume the stream. - stream.resume_source(); + stream.resume_stream(); // Then we can receive the chunk sent when the stream is paused. assert_matches!(next!().unwrap(), Either::Right(_)); }