Skip to content

Commit 7c7a997

Browse files
Make it concurrently to launch tasks to executors (#557)
* Make it concurrently to launch tasks to executors * Refine for comments Co-authored-by: yangzhong <[email protected]>
1 parent a7c527f commit 7c7a997

File tree

1 file changed

+49
-30
lines changed
  • ballista/scheduler/src/state

1 file changed

+49
-30
lines changed

ballista/scheduler/src/state/mod.rs

+49-30
Original file line numberDiff line numberDiff line change
@@ -236,47 +236,66 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
236236
}
237237
}
238238

239+
let mut join_handles = vec![];
239240
for (executor_id, tasks) in executor_stage_assignments.into_iter() {
240241
let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
241242
// Total number of tasks to be launched for one executor
242243
let n_tasks: usize =
243244
tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
244245

245-
match self
246-
.executor_manager
247-
.get_executor_metadata(&executor_id)
248-
.await
249-
{
250-
Ok(executor) => {
251-
if let Err(e) = self
252-
.task_manager
253-
.launch_multi_task(
254-
&executor,
255-
tasks,
256-
&self.executor_manager,
257-
)
258-
.await
259-
{
260-
error!("Failed to launch new task: {:?}", e);
261-
for _i in 0..n_tasks {
262-
unassigned_reservations.push(
263-
ExecutorReservation::new_free(
264-
executor_id.clone(),
265-
),
266-
);
246+
let task_manager = self.task_manager.clone();
247+
let executor_manager = self.executor_manager.clone();
248+
let join_handle = tokio::spawn(async move {
249+
let success = match executor_manager
250+
.get_executor_metadata(&executor_id)
251+
.await
252+
{
253+
Ok(executor) => {
254+
if let Err(e) = task_manager
255+
.launch_multi_task(
256+
&executor,
257+
tasks,
258+
&executor_manager,
259+
)
260+
.await
261+
{
262+
error!("Failed to launch new task: {:?}", e);
263+
false
264+
} else {
265+
true
267266
}
268267
}
269-
}
270-
Err(e) => {
271-
error!("Failed to launch new task, could not get executor metadata: {:?}", e);
272-
for _i in 0..n_tasks {
273-
unassigned_reservations.push(
274-
ExecutorReservation::new_free(executor_id.clone()),
275-
);
268+
Err(e) => {
269+
error!("Failed to launch new task, could not get executor metadata: {:?}", e);
270+
false
276271
}
272+
};
273+
if success {
274+
vec![]
275+
} else {
276+
vec![
277+
ExecutorReservation::new_free(executor_id.clone(),);
278+
n_tasks
279+
]
277280
}
278-
}
281+
});
282+
join_handles.push(join_handle);
279283
}
284+
285+
let unassigned_executor_reservations =
286+
futures::future::join_all(join_handles)
287+
.await
288+
.into_iter()
289+
.collect::<std::result::Result<
290+
Vec<Vec<ExecutorReservation>>,
291+
tokio::task::JoinError,
292+
>>()?;
293+
unassigned_reservations.append(
294+
&mut unassigned_executor_reservations
295+
.into_iter()
296+
.flatten()
297+
.collect::<Vec<ExecutorReservation>>(),
298+
);
280299
(unassigned_reservations, pending_tasks)
281300
}
282301
Err(e) => {

0 commit comments

Comments
 (0)