diff --git a/e2e_test/batch/issue_7324.slt b/e2e_test/batch/issue_7324.slt
new file mode 100644
index 0000000000000..e40e67064f742
--- /dev/null
+++ b/e2e_test/batch/issue_7324.slt
@@ -0,0 +1,31 @@
+# This tests error propagation in local mode. If the error report is not handled correctly, the query hangs (#7324).
+
+statement ok
+SET RW_IMPLICIT_FLUSH TO true;
+
+statement ok
+SET CREATE_COMPACTION_GROUP_FOR_MV TO true;
+
+statement ok
+CREATE TABLE INT2_TBL(f1 int2);
+
+statement ok
+INSERT INTO INT2_TBL(f1) VALUES ('0 ');
+
+statement ok
+INSERT INTO INT2_TBL(f1) VALUES (' 1234 ');
+
+statement ok
+INSERT INTO INT2_TBL(f1) VALUES (' -1234');
+
+statement ok
+INSERT INTO INT2_TBL(f1) VALUES ('32767');
+
+statement ok
+INSERT INTO INT2_TBL(f1) VALUES ('-32767');
+
+statement error
+SELECT i.f1, i.f1 * smallint '2' AS x FROM INT2_TBL i;
+
+statement ok
+drop table INT2_TBL;
\ No newline at end of file
diff --git a/src/batch/src/execution/grpc_exchange.rs b/src/batch/src/execution/grpc_exchange.rs
index d27320a5c1b0c..9ed1c8b659a95 100644
--- a/src/batch/src/execution/grpc_exchange.rs
+++ b/src/batch/src/execution/grpc_exchange.rs
@@ -77,7 +77,9 @@ impl ExchangeSource for GrpcExchangeSource {
     fn take_data(&mut self) -> Self::TakeDataFuture<'_> {
         async {
             let res = match self.stream.next().await {
-                None => return Ok(None),
+                None => {
+                    return Ok(None);
+                }
                 Some(r) => r,
             };
             let task_data = res?;
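To see the failure mode the new slt file guards against, here is a minimal, hypothetical tokio sketch (none of these names come from RisingWave): the frontend-side consumer only wakes up when the producer either delivers a message or closes the channel, so an execution error that is swallowed instead of forwarded leaves the consumer waiting forever, which is the hang described in the test comment.

```rust
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};

#[tokio::main]
async fn main() {
    // The channel that carries query results back to the "frontend".
    let (tx, mut rx) = mpsc::channel::<Result<String, String>>(4);

    // "Compute" side: the query fails. Forwarding the failure into the data
    // channel is what the patch ensures; without this send (and with `tx` kept
    // alive by a long-running task), the consumer below would never wake up.
    tokio::spawn(async move {
        let result: Result<String, String> = Err("simulated execution failure".into());
        let _ = tx.send(result).await;
        tokio::time::sleep(Duration::from_secs(3600)).await; // keep `tx` alive
    });

    // "Frontend" side: give up after a second so this sketch itself never hangs.
    match timeout(Duration::from_secs(1), rx.recv()).await {
        Ok(Some(Err(e))) => println!("query failed as expected: {e}"),
        Ok(other) => println!("unexpected message: {other:?}"),
        Err(_) => println!("timed out waiting for a result: this is the hang"),
    }
}
```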
diff --git a/src/batch/src/rpc/service/task_service.rs b/src/batch/src/rpc/service/task_service.rs
index a4a8e28b0e305..63e336a1afe64 100644
--- a/src/batch/src/rpc/service/task_service.rs
+++ b/src/batch/src/rpc/service/task_service.rs
@@ -26,7 +26,8 @@ use tonic::{Request, Response, Status};
 
 use crate::rpc::service::exchange::GrpcExchangeWriter;
 use crate::task::{
-    self, BatchEnvironment, BatchManager, BatchTaskExecution, ComputeNodeContext, TaskId,
+    BatchEnvironment, BatchManager, BatchTaskExecution, ComputeNodeContext, StateReporter, TaskId,
+    TASK_STATUS_BUFFER_SIZE,
 };
 
 const LOCAL_EXECUTE_BUFFER_SIZE: usize = 64;
@@ -43,6 +44,7 @@ impl BatchServiceImpl {
     }
 }
 
 pub(crate) type TaskInfoResponseResult = std::result::Result<TaskInfoResponse, Status>;
+pub(crate) type GetDataResponseResult = std::result::Result<GetDataResponse, Status>;
 
 #[async_trait::async_trait]
 impl TaskService for BatchServiceImpl {
     type CreateTaskStream = ReceiverStream<TaskInfoResponseResult>;
@@ -59,6 +61,8 @@ impl TaskService for BatchServiceImpl {
             epoch,
         } = request.into_inner();
 
+        let (state_tx, state_rx) = tokio::sync::mpsc::channel(TASK_STATUS_BUFFER_SIZE);
+        let state_reporter = StateReporter::new_with_dist_sender(state_tx);
         let res = self
             .mgr
             .fire_task(
@@ -69,6 +73,7 @@ impl TaskService for BatchServiceImpl {
                     self.env.clone(),
                     TaskId::from(task_id.as_ref().expect("no task id found")),
                 ),
+                state_reporter,
             )
             .await;
         match res {
@@ -78,8 +83,7 @@ impl TaskService for BatchServiceImpl {
                 // Will be used for receive task status update.
                 // Note: we introduce this hack cuz `.execute()` do not produce a status stream,
                 // but still share `.async_execute()` and `.try_execute()`.
-                self.mgr
-                    .get_task_receiver(&task::TaskId::from(&task_id.unwrap())),
+                state_rx,
             ))),
             Err(e) => {
                 error!("failed to fire task {}", e);
@@ -120,8 +124,9 @@ impl TaskService for BatchServiceImpl {
         );
         let task = BatchTaskExecution::new(&task_id, plan, context, epoch, self.mgr.runtime())?;
         let task = Arc::new(task);
-
-        if let Err(e) = task.clone().async_execute().await {
+        let (tx, rx) = tokio::sync::mpsc::channel(LOCAL_EXECUTE_BUFFER_SIZE);
+        let state_reporter = StateReporter::new_with_local_sender(tx.clone());
+        if let Err(e) = task.clone().async_execute(state_reporter).await {
             error!(
                 "failed to build executors and trigger execution of Task {:?}: {}",
                 task_id, e
@@ -142,20 +147,14 @@ impl TaskService for BatchServiceImpl {
                 );
                 e
             })?;
-        let (tx, rx) = tokio::sync::mpsc::channel(LOCAL_EXECUTE_BUFFER_SIZE);
         let mut writer = GrpcExchangeWriter::new(tx.clone());
-        let finish = output
-            .take_data_with_num(&mut writer, tx.capacity())
-            .await?;
-        if !finish {
-            self.mgr.runtime().spawn(async move {
-                match output.take_data(&mut writer).await {
-                    Ok(_) => Ok(()),
-                    Err(e) => tx.send(Err(e.into())).await,
-                }
-            });
-        }
+        // Always spawn a task so that we do not block the current function.
+        self.mgr.runtime().spawn(async move {
+            match output.take_data(&mut writer).await {
+                Ok(_) => Ok(()),
+                Err(e) => tx.send(Err(e.into())).await,
+            }
+        });
         Ok(Response::new(ReceiverStream::new(rx)))
     }
 }
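The `local_execute` change above replaces "pull the first batch inline and only spawn if more remains" with "always spawn the producer and return the stream right away", so any failure is pushed into the same channel the frontend reads. Below is a self-contained sketch of that shape using plain tokio and tokio-stream; `Item`, `produce_chunks`, and `start` are illustrative names, not RisingWave APIs.

```rust
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;

type Item = Result<Vec<u8>, String>;

// Stand-in for the batch task writing chunks through an exchange writer.
async fn produce_chunks(tx: mpsc::Sender<Item>) -> Result<(), String> {
    for i in 0..3u8 {
        tx.send(Ok(vec![i])).await.map_err(|_| "receiver dropped".to_string())?;
    }
    Err("simulated execution failure".to_string())
}

// Stand-in for the tail of `local_execute`: never block the RPC handler.
fn start(buffer: usize) -> ReceiverStream<Item> {
    let (tx, rx) = mpsc::channel(buffer);
    tokio::spawn(async move {
        // On failure, report through the same channel the consumer reads,
        // so the stream terminates with an error instead of going silent.
        if let Err(e) = produce_chunks(tx.clone()).await {
            let _ = tx.send(Err(e)).await;
        }
    });
    ReceiverStream::new(rx)
}

#[tokio::main]
async fn main() {
    let mut stream = start(4);
    while let Some(item) = stream.next().await {
        match item {
            Ok(chunk) => println!("got a chunk of {} bytes", chunk.len()),
            Err(e) => {
                println!("stream ended with an error: {e}");
                break;
            }
        }
    }
}
```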
diff --git a/src/batch/src/task/broadcast_channel.rs b/src/batch/src/task/broadcast_channel.rs
index 0ad93e8a2d0f0..b8307c1bc33a9 100644
--- a/src/batch/src/task/broadcast_channel.rs
+++ b/src/batch/src/task/broadcast_channel.rs
@@ -28,6 +28,7 @@ use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSende
 use crate::task::data_chunk_in_channel::DataChunkInChannel;
 
 /// `BroadcastSender` sends the same chunk to a number of `BroadcastReceiver`s.
+#[derive(Clone)]
 pub struct BroadcastSender {
     senders: Vec>>,
     broadcast_info: BroadcastInfo,
diff --git a/src/batch/src/task/channel.rs b/src/batch/src/task/channel.rs
index d8f14ff3b85bd..06332f2067a6e 100644
--- a/src/batch/src/task/channel.rs
+++ b/src/batch/src/task/channel.rs
@@ -40,7 +40,7 @@ pub(super) trait ChanSender: Send {
     fn send(&mut self, chunk: Option) -> Self::SendFuture<'_>;
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum ChanSenderImpl {
     HashShuffle(HashShuffleSender),
     ConsistentHashShuffle(ConsistentHashShuffleSender),
diff --git a/src/batch/src/task/consistent_hash_shuffle_channel.rs b/src/batch/src/task/consistent_hash_shuffle_channel.rs
index 57dfd60415c2d..5ac15257cb074 100644
--- a/src/batch/src/task/consistent_hash_shuffle_channel.rs
+++ b/src/batch/src/task/consistent_hash_shuffle_channel.rs
@@ -32,6 +32,7 @@ use crate::error::Result as BatchResult;
 use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
 use crate::task::data_chunk_in_channel::DataChunkInChannel;
 
+#[derive(Clone)]
 pub struct ConsistentHashShuffleSender {
     senders: Vec>>,
     consistent_hash_info: ConsistentHashInfo,
diff --git a/src/batch/src/task/fifo_channel.rs b/src/batch/src/task/fifo_channel.rs
index faafa035ba3c3..3d6ae1fe8a6da 100644
--- a/src/batch/src/task/fifo_channel.rs
+++ b/src/batch/src/task/fifo_channel.rs
@@ -24,6 +24,7 @@ use crate::error::BatchError::SenderError;
 use crate::error::Result as BatchResult;
 use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
 use crate::task::data_chunk_in_channel::DataChunkInChannel;
+#[derive(Clone)]
 pub struct FifoSender {
     sender: mpsc::Sender>,
 }
diff --git a/src/batch/src/task/hash_shuffle_channel.rs b/src/batch/src/task/hash_shuffle_channel.rs
index 2f2d99b86369c..cb4edbfa3f618 100644
--- a/src/batch/src/task/hash_shuffle_channel.rs
+++ b/src/batch/src/task/hash_shuffle_channel.rs
@@ -30,7 +30,7 @@ use crate::error::BatchError::SenderError;
 use crate::error::Result as BatchResult;
 use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
 use crate::task::data_chunk_in_channel::DataChunkInChannel;
-
+#[derive(Clone)]
 pub struct HashShuffleSender {
     senders: Vec>>,
     hash_info: HashInfo,
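These `Clone` derives exist because `BatchTaskExecution` (next file) now builds its output channel once in `new()`, stores the `ChanSenderImpl`, and clones it inside `async_execute`. A hedged sketch of why deriving `Clone` on thin wrappers around `tokio::sync::mpsc::Sender` is enough; `FifoLikeSender` and `SenderImpl` are made-up stand-ins, not the real channel types.

```rust
use tokio::sync::mpsc;

// `tokio::sync::mpsc::Sender` is a cheaply cloneable handle, so a wrapper
// struct or enum around it can simply derive `Clone`.
#[derive(Clone)]
struct FifoLikeSender {
    sender: mpsc::Sender<Option<Vec<u8>>>,
}

#[derive(Clone)]
enum SenderImpl {
    Fifo(FifoLikeSender),
}

impl SenderImpl {
    async fn send(&self, chunk: Option<Vec<u8>>) {
        match self {
            SenderImpl::Fifo(inner) => {
                let _ = inner.sender.send(chunk).await;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(4);
    // The task stores one copy of the sender...
    let stored = SenderImpl::Fifo(FifoLikeSender { sender: tx });
    // ...and hands an independent clone to the executing future.
    let for_execution = stored.clone();

    for_execution.send(Some(vec![1, 2, 3])).await;
    for_execution.send(None).await; // `None` marks the end of the data stream.

    assert_eq!(rx.recv().await, Some(Some(vec![1, 2, 3])));
    assert_eq!(rx.recv().await, Some(None));
    println!("cloned sender delivered both messages");
}
```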
diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs
index 04ea1a6e88d03..7819f34627aee 100644
--- a/src/batch/src/task/task_execution.rs
+++ b/src/batch/src/task/task_execution.rs
@@ -39,12 +39,12 @@ use crate::error::BatchError::SenderError;
 use crate::error::{BatchError, Result as BatchResult};
 use crate::executor::{BoxedExecutor, ExecutorBuilder};
 use crate::rpc::service::exchange::ExchangeWriter;
-use crate::rpc::service::task_service::TaskInfoResponseResult;
+use crate::rpc::service::task_service::{GetDataResponseResult, TaskInfoResponseResult};
 use crate::task::channel::{create_output_channel, ChanReceiverImpl, ChanSenderImpl};
 use crate::task::BatchTaskContext;
 
 // Now we will only at most have 2 status for each status channel. Running -> Failed or Finished.
-const TASK_STATUS_BUFFER_SIZE: usize = 2;
+pub const TASK_STATUS_BUFFER_SIZE: usize = 2;
 
 /// A special version for batch allocation stat, passed in another task `context` C to report task
 /// mem usage 0 bytes at the end.
@@ -86,6 +86,48 @@ where
     .await
 }
 
+/// Sends batch task status updates (in local or distributed mode) to the frontend.
+///
+/// Local mode uses `StateReporter::Local` and distributed mode uses `StateReporter::Distributed`
+/// to send status (Failed/Finished) updates. `StateReporter::Mock` is only used in tests and has
+/// no effect. The local sender only reports Failed updates, while the distributed sender also
+/// reports Finished/Pending/Starting/Aborted, etc.
+pub enum StateReporter {
+    Local(tokio::sync::mpsc::Sender<GetDataResponseResult>),
+    Distributed(tokio::sync::mpsc::Sender<TaskInfoResponseResult>),
+    Mock(),
+}
+
+impl StateReporter {
+    pub async fn send(&mut self, val: TaskInfoResponseResult) -> BatchResult<()> {
+        match self {
+            Self::Local(s) => {
+                if let Err(e) = val {
+                    s.send(Err(e)).await.map_err(|_| SenderError)
+                } else {
+                    // Do nothing and just return.
+                    Ok(())
+                }
+            }
+            Self::Distributed(s) => s.send(val).await.map_err(|_| SenderError),
+            Self::Mock() => Ok(()),
+        }
+    }
+
+    pub fn new_with_local_sender(s: tokio::sync::mpsc::Sender<GetDataResponseResult>) -> Self {
+        Self::Local(s)
+    }
+
+    pub fn new_with_dist_sender(s: tokio::sync::mpsc::Sender<TaskInfoResponseResult>) -> Self {
+        Self::Distributed(s)
+    }
+
+    pub fn new_with_test() -> Self {
+        Self::Mock()
+    }
+}
+
 #[derive(PartialEq, Eq, Hash, Clone, Debug, Default)]
 pub struct TaskId {
     pub task_id: u32,
@@ -247,6 +289,9 @@ pub struct BatchTaskExecution {
     /// Receivers data of the task.
     receivers: Mutex>>,
 
+    /// Sender for sending chunks between different executors.
+    sender: ChanSenderImpl,
+
     /// Context for task execution
     context: C,
 
@@ -275,17 +320,27 @@ impl BatchTaskExecution {
         runtime: &'static Runtime,
     ) -> Result {
         let task_id = TaskId::from(prost_tid);
+
+        let (sender, receivers) = create_output_channel(
+            plan.get_exchange_info()?,
+            context.get_config().developer.batch_output_channel_size,
+        )?;
+
+        let mut rts = Vec::new();
+        rts.extend(receivers.into_iter().map(Some));
+
         Ok(Self {
             task_id,
             plan,
             state: Mutex::new(TaskStatus::Pending),
-            receivers: Mutex::new(Vec::new()),
+            receivers: Mutex::new(rts),
             failure: Arc::new(Mutex::new(None)),
             epoch,
             shutdown_tx: Mutex::new(None),
             state_rx: Mutex::new(None),
             context,
             runtime,
+            sender,
         })
    }
 
@@ -299,7 +354,8 @@ impl BatchTaskExecution {
     /// hash partitioned across multiple channels.
     /// To obtain the result, one must pick one of the channels to consume via [`TaskOutputId`]. As
     /// such, parallel consumers are able to consume the result independently.
-    pub async fn async_execute(self: Arc<Self>) -> Result<()> {
+    pub async fn async_execute(self: Arc<Self>, state_tx: StateReporter) -> Result<()> {
+        let mut state_tx = state_tx;
         trace!(
             "Prepare executing plan [{:?}]: {}",
             self.task_id,
@@ -316,26 +372,15 @@ impl BatchTaskExecution {
         .await?;
 
         // Init shutdown channel and data receivers.
-        let (sender, receivers) = create_output_channel(
-            self.plan.get_exchange_info()?,
-            self.context
-                .get_config()
-                .developer
-                .batch_output_channel_size,
-        )?;
+        let sender = self.sender.clone();
         let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::();
         *self.shutdown_tx.lock() = Some(shutdown_tx);
-        self.receivers
-            .lock()
-            .extend(receivers.into_iter().map(Some));
         let failure = self.failure.clone();
         let task_id = self.task_id.clone();
 
         // After we init the output receivers, it's must safe to schedule next stage -- able to send
         // TaskStatus::Running here.
-        let (mut state_tx, state_rx) = tokio::sync::mpsc::channel(TASK_STATUS_BUFFER_SIZE);
         // Init the state receivers. Swap out later.
-        *self.state_rx.lock() = Some(state_rx);
         self.change_state_notify(TaskStatus::Running, &mut state_tx, None)
             .await?;
@@ -373,6 +418,11 @@ impl BatchTaskExecution {
                 // It's possible to send fail. Same reason in `.try_execute`.
                 warn!("send task execution error message fail!");
             }
+
+            // There will be no more chunks, so send None.
+            if let Err(_e) = sender.send(None).await {
+                warn!("failed to send None to annotate end");
+            }
         }
     };
@@ -437,15 +487,12 @@ impl BatchTaskExecution {
     pub async fn change_state_notify(
         &self,
         task_status: TaskStatus,
-        state_tx: &mut tokio::sync::mpsc::Sender,
+        state_tx: &mut StateReporter,
         err_str: Option,
     ) -> BatchResult<()> {
         self.change_state(task_status);
         if let Some(err_str) = err_str {
-            state_tx
-                .send(Err(Status::internal(err_str)))
-                .await
-                .map_err(|_| SenderError)
+            state_tx.send(Err(Status::internal(err_str))).await
         } else {
             // Notify frontend the task status.
             state_tx
                 .send(Ok(TaskInfoResponse {
                     ..Default::default()
                 }))
                 .await
-                .map_err(|_| SenderError)
         }
     }
@@ -471,7 +517,7 @@ impl BatchTaskExecution {
         root: BoxedExecutor,
         sender: &mut ChanSenderImpl,
         mut shutdown_rx: Receiver,
-        state_tx: &mut tokio::sync::mpsc::Sender,
+        state_tx: &mut StateReporter,
     ) -> Result<()> {
         let mut data_chunk_stream = root.execute();
         let mut state = TaskStatus::Unspecified;
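A self-contained illustration of the routing described in the `StateReporter` doc comment above: the local variant forwards only failures (its channel doubles as the data stream), the distributed variant forwards every update, and the mock swallows everything. `Reporter`, `Update`, and the string error type are stand-ins, not the actual `StateReporter` or `tonic::Status` types.

```rust
use tokio::sync::mpsc;

// `Update` stands in for `TaskInfoResponseResult`; only the routing is mirrored.
type Update = Result<&'static str, String>;

enum Reporter {
    Local(mpsc::Sender<Result<(), String>>),
    Distributed(mpsc::Sender<Update>),
    Mock,
}

impl Reporter {
    async fn send(&mut self, update: Update) {
        match self {
            // Local mode: the channel also carries query data, so only
            // failures are forwarded; Ok statuses are silently dropped.
            Reporter::Local(tx) => {
                if let Err(e) = update {
                    let _ = tx.send(Err(e)).await;
                }
            }
            // Distributed mode: every status update goes to the frontend.
            Reporter::Distributed(tx) => {
                let _ = tx.send(update).await;
            }
            // Test-only reporter: swallow everything.
            Reporter::Mock => {}
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(2);
    let mut local = Reporter::Local(tx);

    local.send(Ok("RUNNING")).await; // dropped in local mode
    local.send(Err("executor failed".to_string())).await; // forwarded
    drop(local); // close the channel so the receiver terminates

    assert!(matches!(rx.recv().await, Some(Err(_))));
    assert!(rx.recv().await.is_none());
    println!("local reporter forwarded only the failure");
}
```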
diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs
index cfc25f2d35693..4a025b36bfa37 100644
--- a/src/batch/src/task/task_manager.rs
+++ b/src/batch/src/task/task_manager.rs
@@ -32,7 +32,9 @@ use tonic::Status;
 
 use crate::rpc::service::exchange::GrpcExchangeWriter;
 use crate::rpc::service::task_service::TaskInfoResponseResult;
-use crate::task::{BatchTaskExecution, ComputeNodeContext, TaskId, TaskOutput, TaskOutputId};
+use crate::task::{
+    BatchTaskExecution, ComputeNodeContext, StateReporter, TaskId, TaskOutput, TaskOutputId,
+};
 
 /// `BatchManager` is responsible for managing all batch tasks.
 #[derive(Clone)]
@@ -82,6 +84,7 @@ impl BatchManager {
         plan: PlanFragment,
         epoch: BatchQueryEpoch,
         context: ComputeNodeContext,
+        state_reporter: StateReporter,
     ) -> Result<()> {
         trace!("Received task id: {:?}, plan: {:?}", tid, plan);
         let task = BatchTaskExecution::new(tid, plan, context, epoch, self.runtime)?;
@@ -100,7 +103,7 @@ impl BatchManager {
             ))
             .into())
         };
-        task.clone().async_execute().await?;
+        task.clone().async_execute(state_reporter).await?;
         ret
     }
 
@@ -272,7 +275,7 @@ mod tests {
     use risingwave_pb::expr::TableFunction;
     use tonic::Code;
 
-    use crate::task::{BatchManager, ComputeNodeContext, TaskId};
+    use crate::task::{BatchManager, ComputeNodeContext, StateReporter, TaskId};
 
     #[test]
     fn test_task_not_found() {
@@ -332,11 +335,18 @@ mod tests {
                 plan.clone(),
                 to_committed_batch_query_epoch(0),
                 context.clone(),
+                StateReporter::new_with_test(),
             )
             .await
             .unwrap();
         let err = manager
-            .fire_task(&task_id, plan, to_committed_batch_query_epoch(0), context)
+            .fire_task(
+                &task_id,
+                plan,
+                to_committed_batch_query_epoch(0),
+                context,
+                StateReporter::new_with_test(),
+            )
             .await
             .unwrap_err();
         assert!(err
@@ -382,6 +392,7 @@ mod tests {
                 plan.clone(),
                 to_committed_batch_query_epoch(0),
                 context.clone(),
+                StateReporter::new_with_test(),
             )
             .await
             .unwrap();
diff --git a/src/tests/sqlsmith/src/validation.rs b/src/tests/sqlsmith/src/validation.rs
index 5bf22ae4fe3fe..c093de39e0331 100644
--- a/src/tests/sqlsmith/src/validation.rs
+++ b/src/tests/sqlsmith/src/validation.rs
@@ -52,12 +52,6 @@ fn is_nested_loop_join_error(db_error: &str) -> bool {
     db_error.contains("Not supported: streaming nested-loop join")
 }
 
-// FIXME:
-// This error should not occur, remove once issue is fixed.
-fn is_hash_shuffle_error(db_error: &str) -> bool {
-    db_error.contains("broken hash_shuffle_channel")
-}
-
 fn is_subquery_unnesting_error(db_error: &str) -> bool {
     db_error.contains("Subquery can not be unnested")
 }
@@ -81,7 +75,6 @@ pub fn is_permissible_error(db_error: &str) -> bool {
         || is_unimplemented_error(db_error)
        || not_unique_error(db_error)
         || is_window_error(db_error)
-        || is_hash_shuffle_error(db_error)
         || is_nested_loop_join_error(db_error)
         || is_subquery_unnesting_error(db_error)
         || is_numeric_overflow_error(db_error)
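Since the hash-shuffle error is no longer expected, sqlsmith stops treating it as permissible. A small, hypothetical illustration of what that means for the fuzzer's allow-list check; the predicate below only mirrors the shape of `is_permissible_error`, it is not the real list.

```rust
fn is_permissible(db_error: &str) -> bool {
    // A trimmed-down allow-list in the spirit of `is_permissible_error`.
    db_error.contains("Not supported: streaming nested-loop join")
        || db_error.contains("Subquery can not be unnested")
}

fn main() {
    // Still tolerated by the fuzzer:
    assert!(is_permissible("Not supported: streaming nested-loop join"));
    // Previously whitelisted via `is_hash_shuffle_error`, now a real failure:
    assert!(!is_permissible("broken hash_shuffle_channel"));
    println!("hash-shuffle channel breakage now fails the sqlsmith run");
}
```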