diff --git a/src/batch/src/rpc/service/task_service.rs b/src/batch/src/rpc/service/task_service.rs index 0d24fcb5ebd07..63e336a1afe64 100644 --- a/src/batch/src/rpc/service/task_service.rs +++ b/src/batch/src/rpc/service/task_service.rs @@ -83,8 +83,6 @@ impl TaskService for BatchServiceImpl { // Will be used for receive task status update. // Note: we introduce this hack cuz `.execute()` do not produce a status stream, // but still share `.async_execute()` and `.try_execute()`. - // self.mgr - // .get_task_receiver(&task::TaskId::from(&task_id.unwrap())), state_rx, ))), Err(e) => { @@ -138,7 +136,7 @@ impl TaskService for BatchServiceImpl { let pb_task_output_id = TaskOutputId { task_id: Some(task_id.clone()), - // Since this is local execution path, the exchange would follo single distribution, + // Since this is local execution path, the exchange would follow single distribution, // therefore we would only have one data output. output_id: 0, }; diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs index 3570efb8d2863..4619e5cc393cb 100644 --- a/src/batch/src/task/task_execution.rs +++ b/src/batch/src/task/task_execution.rs @@ -44,7 +44,7 @@ use crate::task::channel::{create_output_channel, ChanReceiverImpl, ChanSenderIm use crate::task::BatchTaskContext; // Now we will only at most have 2 status for each status channel. Running -> Failed or Finished. -const TASK_STATUS_BUFFER_SIZE: usize = 2; +pub const TASK_STATUS_BUFFER_SIZE: usize = 2; /// A special version for batch allocation stat, passed in another task `context` C to report task /// mem usage 0 bytes at the end. @@ -86,6 +86,48 @@ where .await } +/// Send batch task status (local/distributed) to frontend. +/// +/// +/// Local mode uses `StateReporter::Local`, Distributed mode uses `StateReporter::Distributed` to send +/// status (Failed/Finished) updates. `StateReporter::Mock` is only used in tests and does not take any +/// effect. 
Local sender only reports Failed updates; Distributed sender will also report +/// Finished/Pending/Starting/Aborted etc. +pub enum StateReporter { + Local(tokio::sync::mpsc::Sender), + Distributed(tokio::sync::mpsc::Sender), + Mock(), +} + +impl StateReporter { + pub async fn send(&mut self, val: TaskInfoResponseResult) -> BatchResult<()> { + match self { + Self::Local(s) => { + if let Err(e) = val { + s.send(Err(e)).await.map_err(|_| SenderError) + } else { + // Local mode only forwards failures; other status updates are dropped. + Ok(()) + } + } + Self::Distributed(s) => s.send(val).await.map_err(|_| SenderError), + Self::Mock() => Ok(()), + } + } + + pub fn new_with_local_sender(s: tokio::sync::mpsc::Sender) -> Self { + Self::Local(s) + } + + pub fn new_with_dist_sender(s: tokio::sync::mpsc::Sender) -> Self { + Self::Distributed(s) + } + + pub fn new_with_test() -> Self { + Self::Mock() + } +} + #[derive(PartialEq, Eq, Hash, Clone, Debug, Default)] pub struct TaskId { pub task_id: u32, @@ -247,6 +289,7 @@ pub struct BatchTaskExecution { /// Receivers data of the task. receivers: Mutex>>, + /// Sender for sending chunks between different executors. sender: ChanSenderImpl, /// Context for task execution @@ -311,11 +354,8 @@ impl BatchTaskExecution { /// hash partitioned across multiple channels. /// To obtain the result, one must pick one of the channels to consume via [`TaskOutputId`]. As /// such, parallel consumers are able to consume the result independently. - pub async fn async_execute( - self: Arc, - local_execution_failure_sender: Option>, - ) -> Result<()> { - let mut local_execution_failure_sender = local_execution_failure_sender; + pub async fn async_execute(self: Arc, state_tx: StateReporter) -> Result<()> { + let mut state_tx = state_tx; trace!( "Prepare executing plan [{:?}]: {}", self.task_id, @@ -340,10 +380,8 @@ impl BatchTaskExecution { // After we init the output receivers, it's must safe to schedule next stage -- able to send // TaskStatus::Running here. 
- let (mut state_tx, state_rx) = tokio::sync::mpsc::channel(TASK_STATUS_BUFFER_SIZE); // Init the state receivers. Swap out later. - *self.state_rx.lock() = Some(state_rx); - self.change_state_notify(TaskStatus::Running, &mut state_tx, None, None) + self.change_state_notify(TaskStatus::Running, &mut state_tx, None) .await?; // Clone `self` to make compiler happy because of the move block. @@ -378,12 +416,7 @@ impl BatchTaskExecution { warn!("failed to send None to annotate end"); } if let Err(_e) = t_1 - .change_state_notify( - TaskStatus::Failed, - &mut state_tx, - Some(err_str), - local_execution_failure_sender.as_mut(), - ) + .change_state_notify(TaskStatus::Failed, &mut state_tx, Some(err_str)) .await { // It's possible to send fail. Same reason in `.try_execute`. @@ -453,22 +486,12 @@ impl BatchTaskExecution { pub async fn change_state_notify( &self, task_status: TaskStatus, - state_tx: &mut tokio::sync::mpsc::Sender, + state_tx: &mut StateReporter, err_str: Option, - local_exec_failur_tx: Option<&mut tokio::sync::mpsc::Sender>, ) -> BatchResult<()> { self.change_state(task_status); if let Some(err_str) = err_str { - // This is a hack for local execution mode failure propagate - if let Some(l) = local_exec_failur_tx { - l.send(Err(Status::internal(err_str.clone()))) - .await - .map_err(|_| SenderError)?; - } - state_tx - .send(Err(Status::internal(err_str))) - .await - .map_err(|_| SenderError) + state_tx.send(Err(Status::internal(err_str))).await } else { // Notify frontend the task status. 
state_tx @@ -481,7 +504,6 @@ impl BatchTaskExecution { ..Default::default() })) .await - .map_err(|_| SenderError) } } @@ -494,7 +516,7 @@ impl BatchTaskExecution { root: BoxedExecutor, sender: &mut ChanSenderImpl, mut shutdown_rx: Receiver, - state_tx: &mut tokio::sync::mpsc::Sender, + state_tx: &mut StateReporter, ) -> Result<()> { let mut data_chunk_stream = root.execute(); let mut state = TaskStatus::Unspecified; @@ -545,7 +567,7 @@ impl BatchTaskExecution { } } - if let Err(e) = self.change_state_notify(state, state_tx, None, None).await { + if let Err(e) = self.change_state_notify(state, state_tx, None).await { warn!( "The status receiver in FE has closed so the status push is failed {:}", e diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs index 3a39502e7d204..4a025b36bfa37 100644 --- a/src/batch/src/task/task_manager.rs +++ b/src/batch/src/task/task_manager.rs @@ -32,7 +32,9 @@ use tonic::Status; use crate::rpc::service::exchange::GrpcExchangeWriter; use crate::rpc::service::task_service::TaskInfoResponseResult; -use crate::task::{BatchTaskExecution, ComputeNodeContext, TaskId, TaskOutput, TaskOutputId}; +use crate::task::{ + BatchTaskExecution, ComputeNodeContext, StateReporter, TaskId, TaskOutput, TaskOutputId, +}; /// `BatchManager` is responsible for managing all batch tasks. 
#[derive(Clone)] @@ -82,6 +84,7 @@ impl BatchManager { plan: PlanFragment, epoch: BatchQueryEpoch, context: ComputeNodeContext, + state_reporter: StateReporter, ) -> Result<()> { trace!("Received task id: {:?}, plan: {:?}", tid, plan); let task = BatchTaskExecution::new(tid, plan, context, epoch, self.runtime)?; @@ -100,7 +103,7 @@ impl BatchManager { )) .into()) }; - task.clone().async_execute(None).await?; + task.clone().async_execute(state_reporter).await?; ret } @@ -272,7 +275,7 @@ mod tests { use risingwave_pb::expr::TableFunction; use tonic::Code; - use crate::task::{BatchManager, ComputeNodeContext, TaskId}; + use crate::task::{BatchManager, ComputeNodeContext, StateReporter, TaskId}; #[test] fn test_task_not_found() { @@ -332,11 +335,18 @@ mod tests { plan.clone(), to_committed_batch_query_epoch(0), context.clone(), + StateReporter::new_with_test(), ) .await .unwrap(); let err = manager - .fire_task(&task_id, plan, to_committed_batch_query_epoch(0), context) + .fire_task( + &task_id, + plan, + to_committed_batch_query_epoch(0), + context, + StateReporter::new_with_test(), + ) .await .unwrap_err(); assert!(err @@ -382,6 +392,7 @@ mod tests { plan.clone(), to_committed_batch_query_epoch(0), context.clone(), + StateReporter::new_with_test(), ) .await .unwrap();