Skip to content

Commit

Permalink
add comment
Browse files Browse the repository at this point in the history
  • Loading branch information
BowenXiao1999 committed Feb 2, 2023
1 parent 444389e commit d51835d
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 37 deletions.
4 changes: 1 addition & 3 deletions src/batch/src/rpc/service/task_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ impl TaskService for BatchServiceImpl {
// Will be used to receive task status updates.
// Note: we introduce this hack because `.execute()` does not produce a status stream,
// but still shares `.async_execute()` and `.try_execute()`.
// self.mgr
// .get_task_receiver(&task::TaskId::from(&task_id.unwrap())),
state_rx,
))),
Err(e) => {
Expand Down Expand Up @@ -138,7 +136,7 @@ impl TaskService for BatchServiceImpl {

let pb_task_output_id = TaskOutputId {
task_id: Some(task_id.clone()),
// Since this is local execution path, the exchange would follo single distribution,
// Since this is local execution path, the exchange would follow single distribution,
// therefore we would only have one data output.
output_id: 0,
};
Expand Down
82 changes: 52 additions & 30 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::task::channel::{create_output_channel, ChanReceiverImpl, ChanSenderIm
use crate::task::BatchTaskContext;

// Each status channel currently carries at most 2 statuses: Running -> Failed or Finished.
const TASK_STATUS_BUFFER_SIZE: usize = 2;
pub const TASK_STATUS_BUFFER_SIZE: usize = 2;

/// A special version for batch allocation stat, passed in another task `context` C to report task
/// mem usage 0 bytes at the end.
Expand Down Expand Up @@ -86,6 +86,48 @@ where
.await
}

/// Sends batch task status updates (local or distributed mode) to the frontend.
///
/// Local mode uses `StateReporter::Local` and distributed mode uses
/// `StateReporter::Distributed` to send status updates. `StateReporter::Mock` is only used in
/// tests and has no effect.
///
/// The local sender only reports failures (`Err` values); the distributed sender forwards every
/// update, including Finished/Pending/Starting/Aborted etc.
pub enum StateReporter {
/// Local execution: only errors are forwarded, over a `GetDataResponseResult` channel.
Local(tokio::sync::mpsc::Sender<GetDataResponseResult>),
/// Distributed execution: every `TaskInfoResponseResult` is forwarded.
Distributed(tokio::sync::mpsc::Sender<TaskInfoResponseResult>),
/// Test-only variant: sending succeeds without delivering anything.
Mock(),
}

impl StateReporter {
pub async fn send(&mut self, val: TaskInfoResponseResult) -> BatchResult<()> {
match self {
Self::Local(s) => {
if let Err(e) = val {
s.send(Err(e)).await.map_err(|_| SenderError)
} else {
// do nothing and just return.
Ok(())
}
}
Self::Distributed(s) => s.send(val).await.map_err(|_| SenderError),
Self::Mock() => Ok(()),
}
}

pub fn new_with_local_sender(s: tokio::sync::mpsc::Sender<GetDataResponseResult>) -> Self {
Self::Local(s)
}

pub fn new_with_dist_sender(s: tokio::sync::mpsc::Sender<TaskInfoResponseResult>) -> Self {
Self::Distributed(s)
}

pub fn new_with_test() -> Self {
Self::Mock()
}
}

#[derive(PartialEq, Eq, Hash, Clone, Debug, Default)]
pub struct TaskId {
pub task_id: u32,
Expand Down Expand Up @@ -247,6 +289,7 @@ pub struct BatchTaskExecution<C> {
/// Receivers data of the task.
receivers: Mutex<Vec<Option<ChanReceiverImpl>>>,

/// Sender for sending chunks between different executors.
sender: ChanSenderImpl,

/// Context for task execution
Expand Down Expand Up @@ -311,11 +354,8 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
/// hash partitioned across multiple channels.
/// To obtain the result, one must pick one of the channels to consume via [`TaskOutputId`]. As
/// such, parallel consumers are able to consume the result independently.
pub async fn async_execute(
self: Arc<Self>,
local_execution_failure_sender: Option<tokio::sync::mpsc::Sender<GetDataResponseResult>>,
) -> Result<()> {
let mut local_execution_failure_sender = local_execution_failure_sender;
pub async fn async_execute(self: Arc<Self>, state_tx: StateReporter) -> Result<()> {
let mut state_tx = state_tx;
trace!(
"Prepare executing plan [{:?}]: {}",
self.task_id,
Expand All @@ -340,10 +380,8 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {

// After we init the output receivers, it must be safe to schedule the next stage, so we are
// able to send TaskStatus::Running here.
let (mut state_tx, state_rx) = tokio::sync::mpsc::channel(TASK_STATUS_BUFFER_SIZE);
// Init the state receivers. Swap out later.
*self.state_rx.lock() = Some(state_rx);
self.change_state_notify(TaskStatus::Running, &mut state_tx, None, None)
self.change_state_notify(TaskStatus::Running, &mut state_tx, None)
.await?;

// Clone `self` to make compiler happy because of the move block.
Expand Down Expand Up @@ -378,12 +416,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
warn!("failed to send None to annotate end");
}
if let Err(_e) = t_1
.change_state_notify(
TaskStatus::Failed,
&mut state_tx,
Some(err_str),
local_execution_failure_sender.as_mut(),
)
.change_state_notify(TaskStatus::Failed, &mut state_tx, Some(err_str))
.await
{
// The send may fail here, for the same reason as in `.try_execute`.
Expand Down Expand Up @@ -453,22 +486,12 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
pub async fn change_state_notify(
&self,
task_status: TaskStatus,
state_tx: &mut tokio::sync::mpsc::Sender<TaskInfoResponseResult>,
state_tx: &mut StateReporter,
err_str: Option<String>,
local_exec_failur_tx: Option<&mut tokio::sync::mpsc::Sender<GetDataResponseResult>>,
) -> BatchResult<()> {
self.change_state(task_status);
if let Some(err_str) = err_str {
// This is a hack for local execution mode failure propagate
if let Some(l) = local_exec_failur_tx {
l.send(Err(Status::internal(err_str.clone())))
.await
.map_err(|_| SenderError)?;
}
state_tx
.send(Err(Status::internal(err_str)))
.await
.map_err(|_| SenderError)
state_tx.send(Err(Status::internal(err_str))).await
} else {
// Notify frontend the task status.
state_tx
Expand All @@ -481,7 +504,6 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
..Default::default()
}))
.await
.map_err(|_| SenderError)
}
}

Expand All @@ -494,7 +516,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
root: BoxedExecutor,
sender: &mut ChanSenderImpl,
mut shutdown_rx: Receiver<u64>,
state_tx: &mut tokio::sync::mpsc::Sender<TaskInfoResponseResult>,
state_tx: &mut StateReporter,
) -> Result<()> {
let mut data_chunk_stream = root.execute();
let mut state = TaskStatus::Unspecified;
Expand Down Expand Up @@ -545,7 +567,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
}
}

if let Err(e) = self.change_state_notify(state, state_tx, None, None).await {
if let Err(e) = self.change_state_notify(state, state_tx, None).await {
warn!(
"The status receiver in FE has closed so the status push is failed {:}",
e
Expand Down
19 changes: 15 additions & 4 deletions src/batch/src/task/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use tonic::Status;

use crate::rpc::service::exchange::GrpcExchangeWriter;
use crate::rpc::service::task_service::TaskInfoResponseResult;
use crate::task::{BatchTaskExecution, ComputeNodeContext, TaskId, TaskOutput, TaskOutputId};
use crate::task::{
BatchTaskExecution, ComputeNodeContext, StateReporter, TaskId, TaskOutput, TaskOutputId,
};

/// `BatchManager` is responsible for managing all batch tasks.
#[derive(Clone)]
Expand Down Expand Up @@ -82,6 +84,7 @@ impl BatchManager {
plan: PlanFragment,
epoch: BatchQueryEpoch,
context: ComputeNodeContext,
state_reporter: StateReporter,
) -> Result<()> {
trace!("Received task id: {:?}, plan: {:?}", tid, plan);
let task = BatchTaskExecution::new(tid, plan, context, epoch, self.runtime)?;
Expand All @@ -100,7 +103,7 @@ impl BatchManager {
))
.into())
};
task.clone().async_execute(None).await?;
task.clone().async_execute(state_reporter).await?;
ret
}

Expand Down Expand Up @@ -272,7 +275,7 @@ mod tests {
use risingwave_pb::expr::TableFunction;
use tonic::Code;

use crate::task::{BatchManager, ComputeNodeContext, TaskId};
use crate::task::{BatchManager, ComputeNodeContext, StateReporter, TaskId};

#[test]
fn test_task_not_found() {
Expand Down Expand Up @@ -332,11 +335,18 @@ mod tests {
plan.clone(),
to_committed_batch_query_epoch(0),
context.clone(),
StateReporter::new_with_test(),
)
.await
.unwrap();
let err = manager
.fire_task(&task_id, plan, to_committed_batch_query_epoch(0), context)
.fire_task(
&task_id,
plan,
to_committed_batch_query_epoch(0),
context,
StateReporter::new_with_test(),
)
.await
.unwrap_err();
assert!(err
Expand Down Expand Up @@ -382,6 +392,7 @@ mod tests {
plan.clone(),
to_committed_batch_query_epoch(0),
context.clone(),
StateReporter::new_with_test(),
)
.await
.unwrap();
Expand Down

0 comments on commit d51835d

Please sign in to comment.