fix: report local execution mode error #7454

Merged
merged 10 commits into from Feb 3, 2023
31 changes: 31 additions & 0 deletions e2e_test/batch/issue_7324.slt
@@ -0,0 +1,31 @@
# This test exercises error propagation in local execution mode. If the error report is not handled correctly, the query will hang (#7324).

statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
SET CREATE_COMPACTION_GROUP_FOR_MV TO true;

statement ok
CREATE TABLE INT2_TBL(f1 int2);

statement ok
INSERT INTO INT2_TBL(f1) VALUES ('0 ');

statement ok
INSERT INTO INT2_TBL(f1) VALUES (' 1234 ');

statement ok
INSERT INTO INT2_TBL(f1) VALUES (' -1234');

statement ok
INSERT INTO INT2_TBL(f1) VALUES ('32767');

statement ok
INSERT INTO INT2_TBL(f1) VALUES ('-32767');

statement error
SELECT i.f1, i.f1 * smallint '2' AS x FROM INT2_TBL i;

statement ok
drop table INT2_TBL;
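A note on why the final `SELECT` is expected to fail: `f1` is an `int2` (16-bit) column, so `f1 * smallint '2'` overflows for the rows holding 32767 and -32767, and local mode must surface that error instead of hanging. Below is a minimal, hypothetical Rust sketch of the kind of checked arithmetic that produces such an error; it is illustrative only and not RisingWave's actual expression-evaluation code.

```rust
/// Hypothetical helper: multiply two `int2` (i16) values with overflow checking,
/// mirroring why `32767 * 2` makes the last query in the test error out.
fn mul_int2(lhs: i16, rhs: i16) -> Result<i16, String> {
    lhs.checked_mul(rhs)
        .ok_or_else(|| format!("smallint out of range: {} * {}", lhs, rhs))
}

fn main() {
    assert_eq!(mul_int2(1234, 2), Ok(2468));
    // 32767 is i16::MAX, so doubling it overflows; the resulting error is what
    // the local execution path must propagate back to the client.
    assert!(mul_int2(32767, 2).is_err());
}
```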
4 changes: 3 additions & 1 deletion src/batch/src/execution/grpc_exchange.rs
@@ -77,7 +77,9 @@ impl ExchangeSource for GrpcExchangeSource {
fn take_data(&mut self) -> Self::TakeDataFuture<'_> {
async {
let res = match self.stream.next().await {
None => return Ok(None),
None => {
return Ok(None);
}
Some(r) => r,
};
let task_data = res?;
35 changes: 17 additions & 18 deletions src/batch/src/rpc/service/task_service.rs
@@ -26,7 +26,8 @@ use tonic::{Request, Response, Status};

use crate::rpc::service::exchange::GrpcExchangeWriter;
use crate::task::{
self, BatchEnvironment, BatchManager, BatchTaskExecution, ComputeNodeContext, TaskId,
BatchEnvironment, BatchManager, BatchTaskExecution, ComputeNodeContext, StateReporter, TaskId,
TASK_STATUS_BUFFER_SIZE,
};

const LOCAL_EXECUTE_BUFFER_SIZE: usize = 64;
@@ -43,6 +44,7 @@ impl BatchServiceImpl {
}
}
pub(crate) type TaskInfoResponseResult = std::result::Result<TaskInfoResponse, Status>;
pub(crate) type GetDataResponseResult = std::result::Result<GetDataResponse, Status>;
#[async_trait::async_trait]
impl TaskService for BatchServiceImpl {
type CreateTaskStream = ReceiverStream<TaskInfoResponseResult>;
@@ -59,6 +61,8 @@ impl TaskService for BatchServiceImpl {
epoch,
} = request.into_inner();

let (state_tx, state_rx) = tokio::sync::mpsc::channel(TASK_STATUS_BUFFER_SIZE);
let state_reporter = StateReporter::new_with_dist_sender(state_tx);
let res = self
.mgr
.fire_task(
@@ -69,6 +73,7 @@
self.env.clone(),
TaskId::from(task_id.as_ref().expect("no task id found")),
),
state_reporter,
)
.await;
match res {
@@ -78,8 +83,7 @@
// Will be used to receive task status updates.
// Note: this is a workaround because `.execute()` does not produce a status stream,
// yet it still shares `.async_execute()` and `.try_execute()`.
self.mgr
.get_task_receiver(&task::TaskId::from(&task_id.unwrap())),
state_rx,
))),
Err(e) => {
error!("failed to fire task {}", e);
@@ -120,8 +124,9 @@
);
let task = BatchTaskExecution::new(&task_id, plan, context, epoch, self.mgr.runtime())?;
let task = Arc::new(task);

if let Err(e) = task.clone().async_execute().await {
let (tx, rx) = tokio::sync::mpsc::channel(LOCAL_EXECUTE_BUFFER_SIZE);
let state_reporter = StateReporter::new_with_local_sender(tx.clone());
if let Err(e) = task.clone().async_execute(state_reporter).await {
error!(
"failed to build executors and trigger execution of Task {:?}: {}",
task_id, e
@@ -142,20 +147,14 @@
);
e
})?;
let (tx, rx) = tokio::sync::mpsc::channel(LOCAL_EXECUTE_BUFFER_SIZE);

let mut writer = GrpcExchangeWriter::new(tx.clone());
let finish = output
.take_data_with_num(&mut writer, tx.capacity())
.await?;
if !finish {
self.mgr.runtime().spawn(async move {
match output.take_data(&mut writer).await {
Ok(_) => Ok(()),
Err(e) => tx.send(Err(e.into())).await,
}
});
}
// Always spawn a task and do not block the current function.
self.mgr.runtime().spawn(async move {
match output.take_data(&mut writer).await {
Ok(_) => Ok(()),
Err(e) => tx.send(Err(e.into())).await,
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
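Taking stock of the `execute` path above (local mode): a single bounded channel (`LOCAL_EXECUTE_BUFFER_SIZE = 64`) now carries both data and errors, `tx.clone()` feeds the `GrpcExchangeWriter` while the original `tx` backs the error path via `StateReporter::new_with_local_sender`, and the producer is always spawned so the RPC handler can return the `ReceiverStream` immediately. The following is a self-contained sketch of that pattern under simplified assumptions; `Data`, `Response`, and `serve` are stand-ins invented here, not the real tonic/prost types or the real handler.

```rust
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;

// Simplified stand-ins for the real response and error types.
type Data = Vec<u8>;
type Response = Result<Data, String>;

/// Sketch of the "always spawn, never block" pattern: the producer runs on the
/// runtime, and a failure is pushed into the same channel the client reads from.
fn serve<F>(produce: F) -> ReceiverStream<Response>
where
    F: FnOnce() -> Result<Vec<Data>, String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel::<Response>(64); // like LOCAL_EXECUTE_BUFFER_SIZE
    let err_tx = tx.clone(); // one handle for data, one for the error path
    tokio::spawn(async move {
        match produce() {
            Ok(chunks) => {
                for c in chunks {
                    let _ = tx.send(Ok(c)).await;
                }
            }
            // Forward the failure to the client instead of dropping it, so the
            // response stream terminates with an error rather than hanging.
            Err(e) => {
                let _ = err_tx.send(Err(e)).await;
            }
        }
    });
    ReceiverStream::new(rx)
}

#[tokio::main]
async fn main() {
    let mut stream = serve(|| Err("executor failed".to_string()));
    while let Some(item) = stream.next().await {
        println!("{:?}", item); // prints the error, then the stream ends cleanly
    }
}
```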
1 change: 1 addition & 0 deletions src/batch/src/task/broadcast_channel.rs
@@ -28,6 +28,7 @@ use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
use crate::task::data_chunk_in_channel::DataChunkInChannel;

/// `BroadcastSender` sends the same chunk to a number of `BroadcastReceiver`s.
#[derive(Clone)]
pub struct BroadcastSender {
senders: Vec<mpsc::Sender<Option<DataChunkInChannel>>>,
broadcast_info: BroadcastInfo,
2 changes: 1 addition & 1 deletion src/batch/src/task/channel.rs
@@ -40,7 +40,7 @@ pub(super) trait ChanSender: Send {
fn send(&mut self, chunk: Option<DataChunk>) -> Self::SendFuture<'_>;
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum ChanSenderImpl {
HashShuffle(HashShuffleSender),
ConsistentHashShuffle(ConsistentHashShuffleSender),
1 change: 1 addition & 0 deletions src/batch/src/task/consistent_hash_shuffle_channel.rs
@@ -32,6 +32,7 @@ use crate::error::Result as BatchResult;
use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
use crate::task::data_chunk_in_channel::DataChunkInChannel;

#[derive(Clone)]
pub struct ConsistentHashShuffleSender {
senders: Vec<mpsc::Sender<Option<DataChunkInChannel>>>,
consistent_hash_info: ConsistentHashInfo,
1 change: 1 addition & 0 deletions src/batch/src/task/fifo_channel.rs
@@ -24,6 +24,7 @@ use crate::error::BatchError::SenderError;
use crate::error::Result as BatchResult;
use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
use crate::task::data_chunk_in_channel::DataChunkInChannel;
#[derive(Clone)]
pub struct FifoSender {
sender: mpsc::Sender<Option<DataChunkInChannel>>,
}
2 changes: 1 addition & 1 deletion src/batch/src/task/hash_shuffle_channel.rs
@@ -30,7 +30,7 @@ use crate::error::BatchError::SenderError;
use crate::error::Result as BatchResult;
use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
use crate::task::data_chunk_in_channel::DataChunkInChannel;

#[derive(Clone)]
pub struct HashShuffleSender {
senders: Vec<mpsc::Sender<Option<DataChunkInChannel>>>,
hash_info: HashInfo,
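The `#[derive(Clone)]` additions to `BroadcastSender`, `ConsistentHashShuffleSender`, `FifoSender`, `HashShuffleSender`, and `ChanSenderImpl` are what allow the output channel to be created once in `BatchTaskExecution::new` and cloned into the spawned execution future later (see `task_execution.rs` below). They are cheap because every variant ultimately wraps `tokio::sync::mpsc::Sender`, which is itself `Clone`. Here is a small illustrative sketch; `ShuffleSender` and `Chunk` are made-up stand-ins for the real types.

```rust
use tokio::sync::mpsc;

// Illustrative stand-in for `DataChunkInChannel`.
type Chunk = Vec<u8>;

/// `mpsc::Sender` is `Clone`, so a wrapper that only holds senders can simply
/// derive `Clone`; cloning just adds another handle to the same channels.
#[derive(Clone)]
struct ShuffleSender {
    senders: Vec<mpsc::Sender<Option<Chunk>>>,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Option<Chunk>>(4);
    let sender = ShuffleSender { senders: vec![tx] };

    // Keep one handle on the task struct and move a clone into the worker,
    // mirroring how `async_execute` clones `self.sender`.
    let cloned = sender.clone();
    tokio::spawn(async move {
        let _ = cloned.senders[0].send(Some(vec![1, 2, 3])).await;
    });

    if let Some(Some(chunk)) = rx.recv().await {
        println!("received a chunk of {} bytes", chunk.len());
    }
}
```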
92 changes: 69 additions & 23 deletions src/batch/src/task/task_execution.rs
@@ -39,12 +39,12 @@ use crate::error::BatchError::SenderError;
use crate::error::{BatchError, Result as BatchResult};
use crate::executor::{BoxedExecutor, ExecutorBuilder};
use crate::rpc::service::exchange::ExchangeWriter;
use crate::rpc::service::task_service::TaskInfoResponseResult;
use crate::rpc::service::task_service::{GetDataResponseResult, TaskInfoResponseResult};
use crate::task::channel::{create_output_channel, ChanReceiverImpl, ChanSenderImpl};
use crate::task::BatchTaskContext;

// Each status channel will carry at most 2 statuses: Running -> Failed or Finished.
const TASK_STATUS_BUFFER_SIZE: usize = 2;
pub const TASK_STATUS_BUFFER_SIZE: usize = 2;

/// A special version for the batch allocation stat: another task `context` C is passed in to report
/// the task's memory usage as 0 bytes at the end.
@@ -86,6 +86,48 @@
.await
}

/// Sends batch task status updates (local/distributed) to the frontend.
///
/// Local mode uses `StateReporter::Local` and distributed mode uses `StateReporter::Distributed` to
/// send status (Failed/Finished) updates. `StateReporter::Mock` is only used in tests and has no
/// effect. The local sender only reports Failed updates, while the distributed sender also reports
/// Finished/Pending/Starting/Aborted, etc.
pub enum StateReporter {
Local(tokio::sync::mpsc::Sender<GetDataResponseResult>),
Distributed(tokio::sync::mpsc::Sender<TaskInfoResponseResult>),
Mock(),
}

impl StateReporter {
pub async fn send(&mut self, val: TaskInfoResponseResult) -> BatchResult<()> {
match self {
Self::Local(s) => {
if let Err(e) = val {
s.send(Err(e)).await.map_err(|_| SenderError)
} else {
// do nothing and just return.
Ok(())
}
}
Self::Distributed(s) => s.send(val).await.map_err(|_| SenderError),
Self::Mock() => Ok(()),
}
}

pub fn new_with_local_sender(s: tokio::sync::mpsc::Sender<GetDataResponseResult>) -> Self {
Self::Local(s)
}

pub fn new_with_dist_sender(s: tokio::sync::mpsc::Sender<TaskInfoResponseResult>) -> Self {
Self::Distributed(s)
}

pub fn new_with_test() -> Self {
Self::Mock()
}
}

#[derive(PartialEq, Eq, Hash, Clone, Debug, Default)]
pub struct TaskId {
pub task_id: u32,
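The `StateReporter` above is the piece that unifies the two reporting paths: the distributed variant forwards every status update on its own channel, while the local variant only forwards failures onto the data stream. The constant `TASK_STATUS_BUFFER_SIZE = 2`, now `pub`, matches the comment earlier in this file: a status channel carries at most `Running` followed by `Finished` or a failure. Below is a self-contained sketch of that two-message status stream; `TaskStatus` and the `TaskInfoResponseResult` alias here are simplified stand-ins for the tonic-generated types, not the real ones.

```rust
use tokio::sync::mpsc;

// Simplified stand-ins for the generated message types used by the status stream.
#[derive(Debug)]
enum TaskStatus {
    Running,
    Finished,
}
type TaskInfoResponseResult = Result<TaskStatus, String>;

const TASK_STATUS_BUFFER_SIZE: usize = 2;

#[tokio::main]
async fn main() {
    // Each status channel carries at most two messages (Running, then Finished
    // or a failure), so a buffer of 2 never blocks the reporting task.
    let (tx, mut rx) = mpsc::channel::<TaskInfoResponseResult>(TASK_STATUS_BUFFER_SIZE);

    tx.send(Ok(TaskStatus::Running)).await.unwrap();
    tx.send(Ok(TaskStatus::Finished)).await.unwrap();
    drop(tx); // closing the channel ends the stream on the receiving side

    while let Some(update) = rx.recv().await {
        println!("status update: {:?}", update);
    }
}
```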
@@ -247,6 +289,9 @@ pub struct BatchTaskExecution<C> {
/// Receivers data of the task.
receivers: Mutex<Vec<Option<ChanReceiverImpl>>>,

/// Sender for sending chunks between different executors.
sender: ChanSenderImpl,

/// Context for task execution
context: C,

@@ -275,17 +320,27 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
runtime: &'static Runtime,
) -> Result<Self> {
let task_id = TaskId::from(prost_tid);

let (sender, receivers) = create_output_channel(
plan.get_exchange_info()?,
context.get_config().developer.batch_output_channel_size,
)?;

let mut rts = Vec::new();
rts.extend(receivers.into_iter().map(Some));

Ok(Self {
task_id,
plan,
state: Mutex::new(TaskStatus::Pending),
receivers: Mutex::new(Vec::new()),
receivers: Mutex::new(rts),
failure: Arc::new(Mutex::new(None)),
epoch,
shutdown_tx: Mutex::new(None),
state_rx: Mutex::new(None),
context,
runtime,
sender,
})
}

@@ -299,7 +354,8 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
/// hash partitioned across multiple channels.
/// To obtain the result, one must pick one of the channels to consume via [`TaskOutputId`]. As
/// such, parallel consumers are able to consume the result independently.
pub async fn async_execute(self: Arc<Self>) -> Result<()> {
pub async fn async_execute(self: Arc<Self>, state_tx: StateReporter) -> Result<()> {
let mut state_tx = state_tx;
trace!(
"Prepare executing plan [{:?}]: {}",
self.task_id,
@@ -316,26 +372,15 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
.await?;

// Init shutdown channel and data receivers.
let (sender, receivers) = create_output_channel(
self.plan.get_exchange_info()?,
self.context
.get_config()
.developer
.batch_output_channel_size,
)?;
let sender = self.sender.clone();
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<u64>();
*self.shutdown_tx.lock() = Some(shutdown_tx);
self.receivers
.lock()
.extend(receivers.into_iter().map(Some));
let failure = self.failure.clone();
let task_id = self.task_id.clone();

// After the output receivers are initialized, it must be safe to schedule the next stage -- we are
// able to send TaskStatus::Running here.
let (mut state_tx, state_rx) = tokio::sync::mpsc::channel(TASK_STATUS_BUFFER_SIZE);
// Init the state receivers. Swap out later.
*self.state_rx.lock() = Some(state_rx);
self.change_state_notify(TaskStatus::Running, &mut state_tx, None)
.await?;

@@ -373,6 +418,11 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
// The send may fail, for the same reason as in `.try_execute`.
warn!("send task execution error message fail!");
}

// There will be no more chunks, so send None.
if let Err(_e) = sender.send(None).await {
warn!("failed to send None to annotate end");
}
}
};

@@ -437,15 +487,12 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
pub async fn change_state_notify(
&self,
task_status: TaskStatus,
state_tx: &mut tokio::sync::mpsc::Sender<TaskInfoResponseResult>,
state_tx: &mut StateReporter,
err_str: Option<String>,
) -> BatchResult<()> {
self.change_state(task_status);
if let Some(err_str) = err_str {
state_tx
.send(Err(Status::internal(err_str)))
.await
.map_err(|_| SenderError)
state_tx.send(Err(Status::internal(err_str))).await
} else {
// Notify the frontend of the task status.
state_tx
Expand All @@ -458,7 +505,6 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
..Default::default()
}))
.await
.map_err(|_| SenderError)
}
}

@@ -471,7 +517,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
root: BoxedExecutor,
sender: &mut ChanSenderImpl,
mut shutdown_rx: Receiver<u64>,
state_tx: &mut tokio::sync::mpsc::Sender<TaskInfoResponseResult>,
state_tx: &mut StateReporter,
) -> Result<()> {
let mut data_chunk_stream = root.execute();
let mut state = TaskStatus::Unspecified;