From a6cdc517531a8fa685b735fb0ddd0c78c2597da7 Mon Sep 17 00:00:00 2001
From: yangjiang
Date: Wed, 26 Apr 2023 10:51:37 +0800
Subject: [PATCH] [Improve] Refactor offer_reservation to avoid waiting on task
 launch results

---
 ballista/scheduler/src/state/mod.rs | 193 ++++++++++++++--------------
 1 file changed, 97 insertions(+), 96 deletions(-)

diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 31675cbe0..483828ccc 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -195,112 +195,31 @@ impl SchedulerState
     pub(crate) async fn offer_reservation(
         &self,
         reservations: Vec<ExecutorReservation>,
     ) -> Result<(Vec<ExecutorReservation>, usize)> {
-        let (free_list, pending_tasks) = match self
-            .task_manager
-            .fill_reservations(&reservations)
-            .await
+        let pending_tasks = match self.task_manager.fill_reservations(&reservations).await
         {
-            Ok((assignments, mut unassigned_reservations, pending_tasks)) => {
-                // Put tasks to the same executor together
-                // And put tasks belonging to the same stage together for creating MultiTaskDefinition
-                let mut executor_stage_assignments: HashMap<
-                    String,
-                    HashMap<(String, usize), Vec<TaskDescription>>,
-                > = HashMap::new();
-                for (executor_id, task) in assignments.into_iter() {
-                    let stage_key =
-                        (task.partition.job_id.clone(), task.partition.stage_id);
-                    if let Some(tasks) = executor_stage_assignments.get_mut(&executor_id)
-                    {
-                        if let Some(executor_stage_tasks) = tasks.get_mut(&stage_key) {
-                            executor_stage_tasks.push(task);
-                        } else {
-                            tasks.insert(stage_key, vec![task]);
-                        }
-                    } else {
-                        let mut executor_stage_tasks: HashMap<
-                            (String, usize),
-                            Vec<TaskDescription>,
-                        > = HashMap::new();
-                        executor_stage_tasks.insert(stage_key, vec![task]);
-                        executor_stage_assignments
-                            .insert(executor_id, executor_stage_tasks);
-                    }
-                }
-
-                let mut join_handles = vec![];
-                for (executor_id, tasks) in executor_stage_assignments.into_iter() {
-                    let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
-                    // Total number of tasks to be launched for one executor
-                    let n_tasks: usize =
-                        tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
-
-                    let task_manager = self.task_manager.clone();
-                    let executor_manager = self.executor_manager.clone();
-                    let join_handle = tokio::spawn(async move {
-                        let success = match executor_manager
-                            .get_executor_metadata(&executor_id)
-                            .await
-                        {
-                            Ok(executor) => {
-                                if let Err(e) = task_manager
-                                    .launch_multi_task(
-                                        &executor,
-                                        tasks,
-                                        &executor_manager,
-                                    )
-                                    .await
-                                {
-                                    error!("Failed to launch new task: {:?}", e);
-                                    false
-                                } else {
-                                    true
-                                }
-                            }
-                            Err(e) => {
-                                error!("Failed to launch new task, could not get executor metadata: {:?}", e);
-                                false
-                            }
-                        };
-                        if success {
-                            vec![]
-                        } else {
-                            vec![
-                                ExecutorReservation::new_free(executor_id.clone(),);
-                                n_tasks
-                            ]
-                        }
-                    });
-                    join_handles.push(join_handle);
-                }
+            Ok((assignments, unassigned_reservations, pending_tasks)) => {
+                let executor_stage_assignments = Self::combine_task(assignments);
 
-                let unassigned_executor_reservations =
-                    futures::future::join_all(join_handles)
-                        .await
-                        .into_iter()
-                        .collect::<Result<
-                            Vec<Vec<ExecutorReservation>>,
-                            tokio::task::JoinError,
-                        >>()?;
-                unassigned_reservations.append(
-                    &mut unassigned_executor_reservations
-                        .into_iter()
-                        .flatten()
-                        .collect::<Vec<ExecutorReservation>>(),
+                self.spawn_tasks_and_persist_reservations_back(
+                    executor_stage_assignments,
+                    unassigned_reservations,
                 );
-                (unassigned_reservations, pending_tasks)
+
+                pending_tasks
             }
+            // On error, return all the reservations to the pool.
             Err(e) => {
                 error!("Error filling reservations: {:?}", e);
-                (reservations, 0)
+                self.executor_manager
+                    .cancel_reservations(reservations)
+                    .await?;
+                0
             }
         };
 
         let mut new_reservations = vec![];
-        if !free_list.is_empty() {
-            // If any reserved slots remain, return them to the pool
-            self.executor_manager.cancel_reservations(free_list).await?;
-        } else if pending_tasks > 0 {
+
+        if pending_tasks > 0 {
             // If there are pending tasks available, try and schedule them
             let pending_reservations = self
                 .executor_manager
@@ -312,6 +231,86 @@ impl SchedulerState
+    fn spawn_tasks_and_persist_reservations_back(
+        &self,
+        executor_stage_assignments: HashMap<
+            String,
+            HashMap<(String, usize), Vec<TaskDescription>>,
+        >,
+        mut unassigned_reservations: Vec<ExecutorReservation>,
+    ) {
+        let task_manager = self.task_manager.clone();
+        let executor_manager = self.executor_manager.clone();
+
+        tokio::spawn(async move {
+            for (executor_id, tasks) in executor_stage_assignments.into_iter() {
+                let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
+                // Total number of tasks to be launched for one executor
+                let n_tasks: usize =
+                    tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
+
+                match executor_manager.get_executor_metadata(&executor_id).await {
+                    Ok(executor) => {
+                        if let Err(e) = task_manager
+                            .launch_multi_task(&executor, tasks, &executor_manager)
+                            .await
+                        {
+                            error!("Failed to launch new task: {:?}", e);
+                            // The launch failed, so give the slots back.
+                            unassigned_reservations.append(&mut vec![
+                                ExecutorReservation::new_free(
+                                    executor_id.clone(),
+                                );
+                                n_tasks
+                            ]);
+                        }
+                    }
+                    Err(e) => {
+                        error!("Failed to launch new task, could not get executor metadata: {:?}", e);
+                        // No need to give the slots back here.
+                    }
+                };
+            }
+            if !unassigned_reservations.is_empty() {
+                // If any reserved slots remain, return them to the pool
+                executor_manager
+                    .cancel_reservations(unassigned_reservations)
+                    .await
+                    .expect("cancel_reservations failed");
+            }
+        });
+    }
+
+    // Put tasks for the same executor together, and group tasks belonging to the
+    // same stage so they can be launched as a single MultiTaskDefinition.
+    // Returns a map of executor_id -> ((job_id, stage_id) -> tasks).
+    fn combine_task(
+        assignments: Vec<(String, TaskDescription)>,
+    ) -> HashMap<String, HashMap<(String, usize), Vec<TaskDescription>>> {
+        let mut executor_stage_assignments: HashMap<
+            String,
+            HashMap<(String, usize), Vec<TaskDescription>>,
+        > = HashMap::new();
+        for (executor_id, task) in assignments.into_iter() {
+            let stage_key = (task.partition.job_id.clone(), task.partition.stage_id);
+            if let Some(tasks) = executor_stage_assignments.get_mut(&executor_id) {
+                if let Some(executor_stage_tasks) = tasks.get_mut(&stage_key) {
+                    executor_stage_tasks.push(task);
+                } else {
+                    tasks.insert(stage_key, vec![task]);
+                }
+            } else {
+                let mut executor_stage_tasks: HashMap<
+                    (String, usize),
+                    Vec<TaskDescription>,
+                > = HashMap::new();
+                executor_stage_tasks.insert(stage_key, vec![task]);
+                executor_stage_assignments.insert(executor_id, executor_stage_tasks);
+            }
+        }
+        executor_stage_assignments
+    }
+
     pub(crate) async fn submit_job(
         &self,
         job_id: &str,
@@ -459,6 +458,8 @@ mod test {
         assert_eq!(assigned, 0);
         assert!(result.is_empty());
 
+        // Sleep briefly so the spawned background task has time to finish.
+        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
         // All reservations should have been cancelled so we should be able to reserve them now
         let reservations = state.executor_manager.reserve_slots(4).await?;
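
A note on the grouping logic: combine_task builds the executor_id -> (job_id, stage_id) -> tasks map with hand-rolled get_mut/insert branches. The same grouping can be written more compactly with the standard library's entry API. The sketch below is illustrative only and is not part of the patch; Task and group_by_executor_and_stage are simplified stand-ins for Ballista's TaskDescription and combine_task.

    use std::collections::HashMap;

    // Simplified stand-in for Ballista's TaskDescription; only the fields
    // needed for grouping are modelled.
    struct Task {
        job_id: String,
        stage_id: usize,
    }

    // Group (executor_id, task) pairs first by executor and then by
    // (job_id, stage_id), mirroring combine_task but using the entry API
    // instead of nested if-let/else branches.
    fn group_by_executor_and_stage(
        assignments: Vec<(String, Task)>,
    ) -> HashMap<String, HashMap<(String, usize), Vec<Task>>> {
        let mut grouped: HashMap<String, HashMap<(String, usize), Vec<Task>>> =
            HashMap::new();
        for (executor_id, task) in assignments {
            let stage_key = (task.job_id.clone(), task.stage_id);
            grouped
                .entry(executor_id)
                .or_default()
                .entry(stage_key)
                .or_default()
                .push(task);
        }
        grouped
    }

    fn main() {
        let assignments = vec![
            ("exec-1".to_string(), Task { job_id: "job-a".to_string(), stage_id: 1 }),
            ("exec-1".to_string(), Task { job_id: "job-a".to_string(), stage_id: 1 }),
            ("exec-2".to_string(), Task { job_id: "job-a".to_string(), stage_id: 2 }),
        ];
        let grouped = group_by_executor_and_stage(assignments);
        // exec-1 gets two tasks for (job-a, stage 1); exec-2 gets one for stage 2.
        assert_eq!(grouped["exec-1"][&("job-a".to_string(), 1)].len(), 2);
        assert_eq!(grouped["exec-2"][&("job-a".to_string(), 2)].len(), 1);
    }

The behavioural change itself lives in spawn_tasks_and_persist_reservations_back: task launch and reservation clean-up now run in a detached tokio::spawn task, so offer_reservation returns without waiting on the launch results. That is also why the test above sleeps for a second before it tries to re-reserve the slots.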