From 20bdb72d5ff015d736fb582918cd551563139273 Mon Sep 17 00:00:00 2001
From: Bowen <36908971+BowenXiao1999@users.noreply.github.com>
Date: Fri, 3 Feb 2023 19:20:31 +0800
Subject: [PATCH] fix: report local execution mode error (#7454)

1. Enable local mode error propagation. Now when a local mode task (on the
   compute node) hits an error, the error is reported to users.
2. Store the sender in `TaskExecution` to avoid an early drop (otherwise the
   task execution error could surface as a hash shuffle error instead).

This PR reverts a previous workaround (the TODO in sqlsmith) by storing the
sender in the task execution.

Approved-By: liurenjie1024

Co-Authored-By: BowenXiao1999 <931759898@qq.com>
Co-Authored-By: Bowen <36908971+bowenxiao1999@users.noreply.github.com>
---
 e2e_test/batch/issue_7324.slt                  | 31 +++++++
 src/batch/src/execution/grpc_exchange.rs       |  4 +-
 src/batch/src/rpc/service/task_service.rs      | 35 ++++---
 src/batch/src/task/broadcast_channel.rs        |  1 +
 src/batch/src/task/channel.rs                  |  2 +-
 .../task/consistent_hash_shuffle_channel.rs    |  1 +
 src/batch/src/task/fifo_channel.rs             |  1 +
 src/batch/src/task/hash_shuffle_channel.rs     |  2 +-
 src/batch/src/task/task_execution.rs           | 92 ++++++++++++++-----
 src/batch/src/task/task_manager.rs             | 19 +++-
 src/tests/sqlsmith/src/validation.rs           |  7 --
 11 files changed, 140 insertions(+), 55 deletions(-)
 create mode 100644 e2e_test/batch/issue_7324.slt

diff --git a/e2e_test/batch/issue_7324.slt b/e2e_test/batch/issue_7324.slt
new file mode 100644
index 0000000000000..e40e67064f742
--- /dev/null
+++ b/e2e_test/batch/issue_7324.slt
@@ -0,0 +1,31 @@
+# This is a test on error propagation in local mode. If the error report is not handled correctly, the query will hang (#7324).
+
+statement ok
+SET RW_IMPLICIT_FLUSH TO true;
+
+statement ok
+SET CREATE_COMPACTION_GROUP_FOR_MV TO true;
+
+statement ok
+CREATE TABLE INT2_TBL(f1 int2);
+
+statement ok
+INSERT INTO INT2_TBL(f1) VALUES ('0 ');
+
+statement ok
+INSERT INTO INT2_TBL(f1) VALUES (' 1234 ');
+
+statement ok
+INSERT INTO INT2_TBL(f1) VALUES (' -1234');
+
+statement ok
+INSERT INTO INT2_TBL(f1) VALUES ('32767');
+
+statement ok
+INSERT INTO INT2_TBL(f1) VALUES ('-32767');
+
+statement error
+SELECT i.f1, i.f1 * smallint '2' AS x FROM INT2_TBL i;
+
+statement ok
+drop table INT2_TBL;
\ No newline at end of file
diff --git a/src/batch/src/execution/grpc_exchange.rs b/src/batch/src/execution/grpc_exchange.rs
index d27320a5c1b0c..9ed1c8b659a95 100644
--- a/src/batch/src/execution/grpc_exchange.rs
+++ b/src/batch/src/execution/grpc_exchange.rs
@@ -77,7 +77,9 @@ impl ExchangeSource for GrpcExchangeSource {
     fn take_data(&mut self) -> Self::TakeDataFuture<'_> {
         async {
             let res = match self.stream.next().await {
-                None => return Ok(None),
+                None => {
+                    return Ok(None);
+                }
                 Some(r) => r,
             };
             let task_data = res?;
diff --git a/src/batch/src/rpc/service/task_service.rs b/src/batch/src/rpc/service/task_service.rs
index a4a8e28b0e305..63e336a1afe64 100644
--- a/src/batch/src/rpc/service/task_service.rs
+++ b/src/batch/src/rpc/service/task_service.rs
@@ -26,7 +26,8 @@ use tonic::{Request, Response, Status};
 
 use crate::rpc::service::exchange::GrpcExchangeWriter;
 use crate::task::{
-    self, BatchEnvironment, BatchManager, BatchTaskExecution, ComputeNodeContext, TaskId,
+    BatchEnvironment, BatchManager, BatchTaskExecution, ComputeNodeContext, StateReporter, TaskId,
+    TASK_STATUS_BUFFER_SIZE,
 };
 
 const LOCAL_EXECUTE_BUFFER_SIZE: usize = 64;
@@ -43,6 +44,7 @@ impl BatchServiceImpl {
     }
 }
 pub(crate) type TaskInfoResponseResult = std::result::Result<TaskInfoResponse, Status>;
+pub(crate) type GetDataResponseResult = std::result::Result<GetDataResponse, Status>;
 #[async_trait::async_trait]
 impl TaskService for BatchServiceImpl {
     type CreateTaskStream = ReceiverStream<TaskInfoResponseResult>;
@@ -59,6 +61,8 @@ impl TaskService for BatchServiceImpl {
             epoch,
         } = request.into_inner();
 
+        let (state_tx, state_rx) = tokio::sync::mpsc::channel(TASK_STATUS_BUFFER_SIZE);
+        let state_reporter = StateReporter::new_with_dist_sender(state_tx);
         let res = self
             .mgr
             .fire_task(
@@ -69,6 +73,7 @@ impl TaskService for BatchServiceImpl {
                     self.env.clone(),
                     TaskId::from(task_id.as_ref().expect("no task id found")),
                 ),
+                state_reporter,
             )
             .await;
         match res {
@@ -78,8 +83,7 @@ impl TaskService for BatchServiceImpl {
                 // Will be used for receive task status update.
                 // Note: we introduce this hack cuz `.execute()` do not produce a status stream,
                 // but still share `.async_execute()` and `.try_execute()`.
-                self.mgr
-                    .get_task_receiver(&task::TaskId::from(&task_id.unwrap())),
+                state_rx,
             ))),
             Err(e) => {
                 error!("failed to fire task {}", e);
@@ -120,8 +124,9 @@ impl TaskService for BatchServiceImpl {
         );
         let task = BatchTaskExecution::new(&task_id, plan, context, epoch, self.mgr.runtime())?;
         let task = Arc::new(task);
-
-        if let Err(e) = task.clone().async_execute().await {
+        let (tx, rx) = tokio::sync::mpsc::channel(LOCAL_EXECUTE_BUFFER_SIZE);
+        let state_reporter = StateReporter::new_with_local_sender(tx.clone());
+        if let Err(e) = task.clone().async_execute(state_reporter).await {
             error!(
                 "failed to build executors and trigger execution of Task {:?}: {}",
                 task_id, e
@@ -142,20 +147,14 @@ impl TaskService for BatchServiceImpl {
             );
             e
         })?;
-        let (tx, rx) = tokio::sync::mpsc::channel(LOCAL_EXECUTE_BUFFER_SIZE);
         let mut writer = GrpcExchangeWriter::new(tx.clone());
-        let finish = output
-            .take_data_with_num(&mut writer, tx.capacity())
-            .await?;
-        if !finish {
-            self.mgr.runtime().spawn(async move {
-                match output.take_data(&mut writer).await {
-                    Ok(_) => Ok(()),
-                    Err(e) => tx.send(Err(e.into())).await,
-                }
-            });
-        }
+        // Always spawn a task and do not block the current function.
+        self.mgr.runtime().spawn(async move {
+            match output.take_data(&mut writer).await {
+                Ok(_) => Ok(()),
+                Err(e) => tx.send(Err(e.into())).await,
+            }
+        });
         Ok(Response::new(ReceiverStream::new(rx)))
     }
 }
diff --git a/src/batch/src/task/broadcast_channel.rs b/src/batch/src/task/broadcast_channel.rs
index 0ad93e8a2d0f0..b8307c1bc33a9 100644
--- a/src/batch/src/task/broadcast_channel.rs
+++ b/src/batch/src/task/broadcast_channel.rs
@@ -28,6 +28,7 @@ use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
 use crate::task::data_chunk_in_channel::DataChunkInChannel;
 
 /// `BroadcastSender` sends the same chunk to a number of `BroadcastReceiver`s.
+#[derive(Clone)]
 pub struct BroadcastSender {
     senders: Vec<mpsc::Sender<Option<DataChunkInChannel>>>,
     broadcast_info: BroadcastInfo,
diff --git a/src/batch/src/task/channel.rs b/src/batch/src/task/channel.rs
index d8f14ff3b85bd..06332f2067a6e 100644
--- a/src/batch/src/task/channel.rs
+++ b/src/batch/src/task/channel.rs
@@ -40,7 +40,7 @@ pub(super) trait ChanSender: Send {
     fn send(&mut self, chunk: Option<DataChunkInChannel>) -> Self::SendFuture<'_>;
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum ChanSenderImpl {
     HashShuffle(HashShuffleSender),
     ConsistentHashShuffle(ConsistentHashShuffleSender),
diff --git a/src/batch/src/task/consistent_hash_shuffle_channel.rs b/src/batch/src/task/consistent_hash_shuffle_channel.rs
index 57dfd60415c2d..5ac15257cb074 100644
--- a/src/batch/src/task/consistent_hash_shuffle_channel.rs
+++ b/src/batch/src/task/consistent_hash_shuffle_channel.rs
@@ -32,6 +32,7 @@ use crate::error::Result as BatchResult;
 use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
 use crate::task::data_chunk_in_channel::DataChunkInChannel;
 
+#[derive(Clone)]
 pub struct ConsistentHashShuffleSender {
     senders: Vec<mpsc::Sender<Option<DataChunkInChannel>>>,
     consistent_hash_info: ConsistentHashInfo,
diff --git a/src/batch/src/task/fifo_channel.rs b/src/batch/src/task/fifo_channel.rs
index faafa035ba3c3..3d6ae1fe8a6da 100644
--- a/src/batch/src/task/fifo_channel.rs
+++ b/src/batch/src/task/fifo_channel.rs
@@ -24,6 +24,7 @@ use crate::error::BatchError::SenderError;
 use crate::error::Result as BatchResult;
 use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
 use crate::task::data_chunk_in_channel::DataChunkInChannel;
+#[derive(Clone)]
 pub struct FifoSender {
     sender: mpsc::Sender<Option<DataChunkInChannel>>,
 }
diff --git a/src/batch/src/task/hash_shuffle_channel.rs b/src/batch/src/task/hash_shuffle_channel.rs
index 2f2d99b86369c..cb4edbfa3f618 100644
--- a/src/batch/src/task/hash_shuffle_channel.rs
+++ b/src/batch/src/task/hash_shuffle_channel.rs
@@ -30,7 +30,7 @@ use crate::error::BatchError::SenderError;
 use crate::error::Result as BatchResult;
 use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
 use crate::task::data_chunk_in_channel::DataChunkInChannel;
-
+#[derive(Clone)]
 pub struct HashShuffleSender {
     senders: Vec<mpsc::Sender<Option<DataChunkInChannel>>>,
     hash_info: HashInfo,
diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs
index 04ea1a6e88d03..7819f34627aee 100644
--- a/src/batch/src/task/task_execution.rs
+++ b/src/batch/src/task/task_execution.rs
@@ -39,12 +39,12 @@ use crate::error::BatchError::SenderError;
 use crate::error::{BatchError, Result as BatchResult};
 use crate::executor::{BoxedExecutor, ExecutorBuilder};
 use crate::rpc::service::exchange::ExchangeWriter;
-use crate::rpc::service::task_service::TaskInfoResponseResult;
+use crate::rpc::service::task_service::{GetDataResponseResult, TaskInfoResponseResult};
 use crate::task::channel::{create_output_channel, ChanReceiverImpl, ChanSenderImpl};
 use crate::task::BatchTaskContext;
 
 // Now we will only at most have 2 status for each status channel. Running -> Failed or Finished.
-const TASK_STATUS_BUFFER_SIZE: usize = 2;
+pub const TASK_STATUS_BUFFER_SIZE: usize = 2;
 
 /// A special version for batch allocation stat, passed in another task `context` C to report task
 /// mem usage 0 bytes at the end.
@@ -86,6 +86,48 @@ where
         .await
 }
 
+/// Send batch task status (local/distributed) to the frontend.
+///
+///
+/// Local mode uses `StateReporter::Local` and distributed mode uses `StateReporter::Distributed` to
+/// send status (Failed/Finished) updates.
+/// `StateReporter::Mock` is only used in tests and has no effect. The local sender only reports
+/// Failed updates, while the distributed sender also reports Finished/Pending/Starting/Aborted, etc.
+pub enum StateReporter {
+    Local(tokio::sync::mpsc::Sender<GetDataResponseResult>),
+    Distributed(tokio::sync::mpsc::Sender<TaskInfoResponseResult>),
+    Mock(),
+}
+
+impl StateReporter {
+    pub async fn send(&mut self, val: TaskInfoResponseResult) -> BatchResult<()> {
+        match self {
+            Self::Local(s) => {
+                if let Err(e) = val {
+                    s.send(Err(e)).await.map_err(|_| SenderError)
+                } else {
+                    // Do nothing and just return.
+                    Ok(())
+                }
+            }
+            Self::Distributed(s) => s.send(val).await.map_err(|_| SenderError),
+            Self::Mock() => Ok(()),
+        }
+    }
+
+    pub fn new_with_local_sender(s: tokio::sync::mpsc::Sender<GetDataResponseResult>) -> Self {
+        Self::Local(s)
+    }
+
+    pub fn new_with_dist_sender(s: tokio::sync::mpsc::Sender<TaskInfoResponseResult>) -> Self {
+        Self::Distributed(s)
+    }
+
+    pub fn new_with_test() -> Self {
+        Self::Mock()
+    }
+}
+
 #[derive(PartialEq, Eq, Hash, Clone, Debug, Default)]
 pub struct TaskId {
     pub task_id: u32,
@@ -247,6 +289,9 @@ pub struct BatchTaskExecution<C> {
     /// Receivers data of the task.
     receivers: Mutex<Vec<Option<ChanReceiverImpl>>>,
 
+    /// Sender for sending chunks between different executors.
+    sender: ChanSenderImpl,
+
     /// Context for task execution
     context: C,
 
@@ -275,17 +320,27 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
         runtime: &'static Runtime,
     ) -> Result<Self> {
         let task_id = TaskId::from(prost_tid);
+
+        let (sender, receivers) = create_output_channel(
+            plan.get_exchange_info()?,
+            context.get_config().developer.batch_output_channel_size,
+        )?;
+
+        let mut rts = Vec::new();
+        rts.extend(receivers.into_iter().map(Some));
+
         Ok(Self {
             task_id,
             plan,
             state: Mutex::new(TaskStatus::Pending),
-            receivers: Mutex::new(Vec::new()),
+            receivers: Mutex::new(rts),
             failure: Arc::new(Mutex::new(None)),
             epoch,
             shutdown_tx: Mutex::new(None),
             state_rx: Mutex::new(None),
             context,
             runtime,
+            sender,
         })
     }
@@ -299,7 +354,8 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
     /// hash partitioned across multiple channels.
     /// To obtain the result, one must pick one of the channels to consume via [`TaskOutputId`]. As
     /// such, parallel consumers are able to consume the result independently.
-    pub async fn async_execute(self: Arc<Self>) -> Result<()> {
+    pub async fn async_execute(self: Arc<Self>, state_tx: StateReporter) -> Result<()> {
+        let mut state_tx = state_tx;
         trace!(
             "Prepare executing plan [{:?}]: {}",
             self.task_id,
@@ -316,26 +372,15 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
         .await?;
 
         // Init shutdown channel and data receivers.
-        let (sender, receivers) = create_output_channel(
-            self.plan.get_exchange_info()?,
-            self.context
-                .get_config()
-                .developer
-                .batch_output_channel_size,
-        )?;
+        let sender = self.sender.clone();
         let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<u64>();
         *self.shutdown_tx.lock() = Some(shutdown_tx);
-        self.receivers
-            .lock()
-            .extend(receivers.into_iter().map(Some));
         let failure = self.failure.clone();
         let task_id = self.task_id.clone();
 
         // After we init the output receivers, it's must safe to schedule next stage -- able to send
         // TaskStatus::Running here.
-        let (mut state_tx, state_rx) = tokio::sync::mpsc::channel(TASK_STATUS_BUFFER_SIZE);
         // Init the state receivers. Swap out later.
-        *self.state_rx.lock() = Some(state_rx);
         self.change_state_notify(TaskStatus::Running, &mut state_tx, None)
             .await?;
@@ -373,6 +418,11 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
                     // It's possible to send fail. Same reason in `.try_execute`.
                     warn!("send task execution error message fail!");
                 }
+
+                // There will be no more chunks, so send None.
+                if let Err(_e) = sender.send(None).await {
+                    warn!("failed to send None to annotate end");
+                }
             }
         };
@@ -437,15 +487,12 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
     pub async fn change_state_notify(
         &self,
         task_status: TaskStatus,
-        state_tx: &mut tokio::sync::mpsc::Sender<TaskInfoResponseResult>,
+        state_tx: &mut StateReporter,
         err_str: Option<String>,
     ) -> BatchResult<()> {
         self.change_state(task_status);
         if let Some(err_str) = err_str {
-            state_tx
-                .send(Err(Status::internal(err_str)))
-                .await
-                .map_err(|_| SenderError)
+            state_tx.send(Err(Status::internal(err_str))).await
         } else {
             // Notify frontend the task status.
             state_tx
@@ -458,7 +505,6 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
                     ..Default::default()
                 }))
                 .await
-                .map_err(|_| SenderError)
         }
     }
 
@@ -471,7 +517,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
         root: BoxedExecutor,
         sender: &mut ChanSenderImpl,
         mut shutdown_rx: Receiver<u64>,
-        state_tx: &mut tokio::sync::mpsc::Sender<TaskInfoResponseResult>,
+        state_tx: &mut StateReporter,
     ) -> Result<()> {
         let mut data_chunk_stream = root.execute();
         let mut state = TaskStatus::Unspecified;
diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs
index cfc25f2d35693..4a025b36bfa37 100644
--- a/src/batch/src/task/task_manager.rs
+++ b/src/batch/src/task/task_manager.rs
@@ -32,7 +32,9 @@ use tonic::Status;
 
 use crate::rpc::service::exchange::GrpcExchangeWriter;
 use crate::rpc::service::task_service::TaskInfoResponseResult;
-use crate::task::{BatchTaskExecution, ComputeNodeContext, TaskId, TaskOutput, TaskOutputId};
+use crate::task::{
+    BatchTaskExecution, ComputeNodeContext, StateReporter, TaskId, TaskOutput, TaskOutputId,
+};
 
 /// `BatchManager` is responsible for managing all batch tasks.
 #[derive(Clone)]
@@ -82,6 +84,7 @@ impl BatchManager {
         plan: PlanFragment,
         epoch: BatchQueryEpoch,
         context: ComputeNodeContext,
+        state_reporter: StateReporter,
     ) -> Result<()> {
         trace!("Received task id: {:?}, plan: {:?}", tid, plan);
         let task = BatchTaskExecution::new(tid, plan, context, epoch, self.runtime)?;
@@ -100,7 +103,7 @@ impl BatchManager {
             ))
             .into())
         };
-        task.clone().async_execute().await?;
+        task.clone().async_execute(state_reporter).await?;
         ret
     }
 
@@ -272,7 +275,7 @@ mod tests {
     use risingwave_pb::expr::TableFunction;
     use tonic::Code;
 
-    use crate::task::{BatchManager, ComputeNodeContext, TaskId};
+    use crate::task::{BatchManager, ComputeNodeContext, StateReporter, TaskId};
 
     #[test]
     fn test_task_not_found() {
@@ -332,11 +335,18 @@ mod tests {
                 plan.clone(),
                 to_committed_batch_query_epoch(0),
                 context.clone(),
+                StateReporter::new_with_test(),
             )
             .await
             .unwrap();
         let err = manager
-            .fire_task(&task_id, plan, to_committed_batch_query_epoch(0), context)
+            .fire_task(
+                &task_id,
+                plan,
+                to_committed_batch_query_epoch(0),
+                context,
+                StateReporter::new_with_test(),
+            )
             .await
             .unwrap_err();
         assert!(err
@@ -382,6 +392,7 @@ mod tests {
                 plan.clone(),
                 to_committed_batch_query_epoch(0),
                 context.clone(),
+                StateReporter::new_with_test(),
             )
             .await
             .unwrap();
diff --git a/src/tests/sqlsmith/src/validation.rs b/src/tests/sqlsmith/src/validation.rs
index 5bf22ae4fe3fe..c093de39e0331 100644
--- a/src/tests/sqlsmith/src/validation.rs
+++ b/src/tests/sqlsmith/src/validation.rs
@@ -52,12 +52,6 @@ fn is_nested_loop_join_error(db_error: &str) -> bool {
     db_error.contains("Not supported: streaming nested-loop join")
 }
 
-// FIXME:
-// This error should not occur, remove once issue is fixed.
-fn is_hash_shuffle_error(db_error: &str) -> bool {
-    db_error.contains("broken hash_shuffle_channel")
-}
-
 fn is_subquery_unnesting_error(db_error: &str) -> bool {
     db_error.contains("Subquery can not be unnested")
 }
@@ -81,7 +75,6 @@ pub fn is_permissible_error(db_error: &str) -> bool {
         || is_unimplemented_error(db_error)
         || not_unique_error(db_error)
         || is_window_error(db_error)
-        || is_hash_shuffle_error(db_error)
         || is_nested_loop_join_error(db_error)
         || is_subquery_unnesting_error(db_error)
         || is_numeric_overflow_error(db_error)
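
Usage sketch (illustrative only, not part of the patch): the snippet below shows roughly how the distributed-mode `StateReporter` path added above is wired up by a caller, mirroring `create_task` in `task_service.rs`. The helper name `start_distributed_task` is hypothetical; `StateReporter`, `BatchTaskExecution`, `BatchTaskContext`, `TaskInfoResponseResult`, and `TASK_STATUS_BUFFER_SIZE` are the items introduced or touched by this diff.

```rust
use std::sync::Arc;

use tokio::sync::mpsc;

// Hypothetical helper for illustration; all referenced batch types come from
// the crate as modified by this patch.
async fn start_distributed_task<C: BatchTaskContext>(
    task: Arc<BatchTaskExecution<C>>,
) -> mpsc::Receiver<TaskInfoResponseResult> {
    // Distributed mode: status updates (Running / Finished / Failed) flow back
    // to the frontend on a dedicated channel, which becomes the CreateTask stream.
    let (state_tx, state_rx) = mpsc::channel(TASK_STATUS_BUFFER_SIZE);
    task.async_execute(StateReporter::new_with_dist_sender(state_tx))
        .await
        .expect("failed to start batch task");
    state_rx
}
```

In local mode, `execute` instead builds the reporter with `StateReporter::new_with_local_sender(tx.clone())`, so a failure is injected into the same channel that carries data chunks back to the frontend rather than being silently dropped.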