Skip to content

Commit

Permalink
always spawn a task to get data
Browse files Browse the repository at this point in the history
  • Loading branch information
BowenXiao1999 committed Feb 2, 2023
1 parent e32e123 commit 444389e
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions src/batch/src/rpc/service/task_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use tonic::{Request, Response, Status};

use crate::rpc::service::exchange::GrpcExchangeWriter;
use crate::task::{
self, BatchEnvironment, BatchManager, BatchTaskExecution, ComputeNodeContext, TaskId,
BatchEnvironment, BatchManager, BatchTaskExecution, ComputeNodeContext, StateReporter, TaskId,
TASK_STATUS_BUFFER_SIZE,
};

const LOCAL_EXECUTE_BUFFER_SIZE: usize = 64;
Expand Down Expand Up @@ -60,6 +61,8 @@ impl TaskService for BatchServiceImpl {
epoch,
} = request.into_inner();

let (state_tx, state_rx) = tokio::sync::mpsc::channel(TASK_STATUS_BUFFER_SIZE);
let state_reporter = StateReporter::new_with_dist_sender(state_tx);
let res = self
.mgr
.fire_task(
Expand All @@ -70,6 +73,7 @@ impl TaskService for BatchServiceImpl {
self.env.clone(),
TaskId::from(task_id.as_ref().expect("no task id found")),
),
state_reporter,
)
.await;
match res {
Expand All @@ -79,8 +83,9 @@ impl TaskService for BatchServiceImpl {
// Will be used for receive task status update.
// Note: we introduce this hack cuz `.execute()` do not produce a status stream,
// but still share `.async_execute()` and `.try_execute()`.
self.mgr
.get_task_receiver(&task::TaskId::from(&task_id.unwrap())),
// self.mgr
// .get_task_receiver(&task::TaskId::from(&task_id.unwrap())),
state_rx,
))),
Err(e) => {
error!("failed to fire task {}", e);
Expand Down Expand Up @@ -122,8 +127,8 @@ impl TaskService for BatchServiceImpl {
let task = BatchTaskExecution::new(&task_id, plan, context, epoch, self.mgr.runtime())?;
let task = Arc::new(task);
let (tx, rx) = tokio::sync::mpsc::channel(LOCAL_EXECUTE_BUFFER_SIZE);

if let Err(e) = task.clone().async_execute(Some(tx.clone())).await {
let state_reporter = StateReporter::new_with_local_sender(tx.clone());
if let Err(e) = task.clone().async_execute(state_reporter).await {
error!(
"failed to build executors and trigger execution of Task {:?}: {}",
task_id, e
Expand All @@ -133,7 +138,7 @@ impl TaskService for BatchServiceImpl {

let pb_task_output_id = TaskOutputId {
task_id: Some(task_id.clone()),
// Since this is local execution path, the exchange would follow single distribution,
// Since this is local execution path, the exchange would follo single distribution,
// therefore we would only have one data output.
output_id: 0,
};
Expand All @@ -145,17 +150,13 @@ impl TaskService for BatchServiceImpl {
e
})?;
let mut writer = GrpcExchangeWriter::new(tx.clone());
let finish = output
.take_data_with_num(&mut writer, tx.capacity())
.await?;
if !finish {
self.mgr.runtime().spawn(async move {
match output.take_data(&mut writer).await {
Ok(_) => Ok(()),
Err(e) => tx.send(Err(e.into())).await,
}
});
}
// Always spawn a task and do not block current function.
self.mgr.runtime().spawn(async move {
match output.take_data(&mut writer).await {
Ok(_) => Ok(()),
Err(e) => tx.send(Err(e.into())).await,
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}

0 comments on commit 444389e

Please sign in to comment.