diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index 3c5802804..0671fdbe2 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -236,47 +236,66 @@ impl SchedulerState> = tasks.into_values().collect(); // Total number of tasks to be launched for one executor let n_tasks: usize = tasks.iter().map(|stage_tasks| stage_tasks.len()).sum(); - match self - .executor_manager - .get_executor_metadata(&executor_id) - .await - { - Ok(executor) => { - if let Err(e) = self - .task_manager - .launch_multi_task( - &executor, - tasks, - &self.executor_manager, - ) - .await - { - error!("Failed to launch new task: {:?}", e); - for _i in 0..n_tasks { - unassigned_reservations.push( - ExecutorReservation::new_free( - executor_id.clone(), - ), - ); + let task_manager = self.task_manager.clone(); + let executor_manager = self.executor_manager.clone(); + let join_handle = tokio::spawn(async move { + let success = match executor_manager + .get_executor_metadata(&executor_id) + .await + { + Ok(executor) => { + if let Err(e) = task_manager + .launch_multi_task( + &executor, + tasks, + &executor_manager, + ) + .await + { + error!("Failed to launch new task: {:?}", e); + false + } else { + true } } - } - Err(e) => { - error!("Failed to launch new task, could not get executor metadata: {:?}", e); - for _i in 0..n_tasks { - unassigned_reservations.push( - ExecutorReservation::new_free(executor_id.clone()), - ); + Err(e) => { + error!("Failed to launch new task, could not get executor metadata: {:?}", e); + false } + }; + if success { + vec![] + } else { + vec![ + ExecutorReservation::new_free(executor_id.clone(),); + n_tasks + ] } - } + }); + join_handles.push(join_handle); } + + let unassigned_executor_reservations = + futures::future::join_all(join_handles) + .await + .into_iter() + .collect::>, + tokio::task::JoinError, + >>()?; + unassigned_reservations.append( + &mut unassigned_executor_reservations + .into_iter() + .flatten() + .collect::>(), + ); (unassigned_reservations, pending_tasks) } Err(e) => {