[Improve] Refactor offer_reservation to avoid waiting on launch results #760

Merged (1 commit) on May 6, 2023.
ballista/scheduler/src/state/mod.rs: 193 changes (97 additions & 96 deletions)
@@ -195,112 +195,31 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         &self,
         reservations: Vec<ExecutorReservation>,
     ) -> Result<(Vec<ExecutorReservation>, usize)> {
-        let (free_list, pending_tasks) = match self
-            .task_manager
-            .fill_reservations(&reservations)
-            .await
+        let pending_tasks = match self.task_manager.fill_reservations(&reservations).await
         {
-            Ok((assignments, mut unassigned_reservations, pending_tasks)) => {
-                // Put tasks to the same executor together
-                // And put tasks belonging to the same stage together for creating MultiTaskDefinition
-                let mut executor_stage_assignments: HashMap<
-                    String,
-                    HashMap<(String, usize), Vec<TaskDescription>>,
-                > = HashMap::new();
-                for (executor_id, task) in assignments.into_iter() {
-                    let stage_key =
-                        (task.partition.job_id.clone(), task.partition.stage_id);
-                    if let Some(tasks) = executor_stage_assignments.get_mut(&executor_id)
-                    {
-                        if let Some(executor_stage_tasks) = tasks.get_mut(&stage_key) {
-                            executor_stage_tasks.push(task);
-                        } else {
-                            tasks.insert(stage_key, vec![task]);
-                        }
-                    } else {
-                        let mut executor_stage_tasks: HashMap<
-                            (String, usize),
-                            Vec<TaskDescription>,
-                        > = HashMap::new();
-                        executor_stage_tasks.insert(stage_key, vec![task]);
-                        executor_stage_assignments
-                            .insert(executor_id, executor_stage_tasks);
-                    }
-                }
-
-                let mut join_handles = vec![];
-                for (executor_id, tasks) in executor_stage_assignments.into_iter() {
-                    let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
-                    // Total number of tasks to be launched for one executor
-                    let n_tasks: usize =
-                        tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
-
-                    let task_manager = self.task_manager.clone();
-                    let executor_manager = self.executor_manager.clone();
-                    let join_handle = tokio::spawn(async move {
-                        let success = match executor_manager
-                            .get_executor_metadata(&executor_id)
-                            .await
-                        {
-                            Ok(executor) => {
-                                if let Err(e) = task_manager
-                                    .launch_multi_task(
-                                        &executor,
-                                        tasks,
-                                        &executor_manager,
-                                    )
-                                    .await
-                                {
-                                    error!("Failed to launch new task: {:?}", e);
-                                    false
-                                } else {
-                                    true
-                                }
-                            }
-                            Err(e) => {
-                                error!("Failed to launch new task, could not get executor metadata: {:?}", e);
-                                false
-                            }
-                        };
-                        if success {
-                            vec![]
-                        } else {
-                            vec![
-                                ExecutorReservation::new_free(executor_id.clone());
-                                n_tasks
-                            ]
-                        }
-                    });
-                    join_handles.push(join_handle);
-                }
+            Ok((assignments, unassigned_reservations, pending_tasks)) => {
+                let executor_stage_assignments = Self::combine_task(assignments);

-                let unassigned_executor_reservations =
-                    futures::future::join_all(join_handles)
-                        .await
-                        .into_iter()
-                        .collect::<std::result::Result<
-                            Vec<Vec<ExecutorReservation>>,
-                            tokio::task::JoinError,
-                        >>()?;
-                unassigned_reservations.append(
-                    &mut unassigned_executor_reservations
-                        .into_iter()
-                        .flatten()
-                        .collect::<Vec<ExecutorReservation>>(),
+                self.spawn_tasks_and_persist_reservations_back(
+                    executor_stage_assignments,
+                    unassigned_reservations,
                 );
-                (unassigned_reservations, pending_tasks)
+                pending_tasks
             }
             // If error, set all reservations back
             Err(e) => {
                 error!("Error filling reservations: {:?}", e);
-                (reservations, 0)
+                self.executor_manager
+                    .cancel_reservations(reservations)
+                    .await?;
+                0
             }
         };

         let mut new_reservations = vec![];
-        if !free_list.is_empty() {
-            // If any reserved slots remain, return them to the pool
-            self.executor_manager.cancel_reservations(free_list).await?;
-        } else if pending_tasks > 0 {
+        if pending_tasks > 0 {
             // If there are pending tasks available, try and schedule them
             let pending_reservations = self
                 .executor_manager
@@ -312,6 +231,86 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         Ok((new_reservations, pending_tasks))
     }

+    fn spawn_tasks_and_persist_reservations_back(
+        &self,
+        executor_stage_assignments: HashMap<
+            String,
+            HashMap<(String, usize), Vec<TaskDescription>>,
+        >,
+        mut unassigned_reservations: Vec<ExecutorReservation>,
+    ) {
+        let task_manager = self.task_manager.clone();
+        let executor_manager = self.executor_manager.clone();
+
+        tokio::spawn(async move {
+            for (executor_id, tasks) in executor_stage_assignments.into_iter() {
+                let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
+                // Total number of tasks to be launched for one executor
+                let n_tasks: usize =
+                    tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
+
+                match executor_manager.get_executor_metadata(&executor_id).await {
+                    Ok(executor) => {
+                        if let Err(e) = task_manager
+                            .launch_multi_task(&executor, tasks, &executor_manager)
+                            .await
+                        {
+                            error!("Failed to launch new task: {:?}", e);
+                            // Set the resources back.
+                            unassigned_reservations.append(&mut vec![
+                                ExecutorReservation::new_free(
+                                    executor_id.clone(),
+                                );
+                                n_tasks
+                            ]);
+                        }
+                    }
+                    Err(e) => {
+                        error!("Failed to launch new task, could not get executor metadata: {:?}", e);
+                        // No need to set the resources back here.
+                    }
+                };
+            }
+            if !unassigned_reservations.is_empty() {
+                // If any reserved slots remain, return them to the pool
+                executor_manager
+                    .cancel_reservations(unassigned_reservations)
+                    .await
+                    .expect("cancel_reservations fail!");
+            }
+        });
+    }
+
+    // Put tasks for the same executor together,
+    // and tasks belonging to the same stage together, for creating MultiTaskDefinitions.
+    // Returns a map of <executor_id, <stage_key, Vec<TaskDescription>>>.
+    fn combine_task(
+        assignments: Vec<(String, TaskDescription)>,
+    ) -> HashMap<String, HashMap<(String, usize), Vec<TaskDescription>>> {
+        let mut executor_stage_assignments: HashMap<
+            String,
+            HashMap<(String, usize), Vec<TaskDescription>>,
+        > = HashMap::new();
+        for (executor_id, task) in assignments.into_iter() {
+            let stage_key = (task.partition.job_id.clone(), task.partition.stage_id);
+            if let Some(tasks) = executor_stage_assignments.get_mut(&executor_id) {
+                if let Some(executor_stage_tasks) = tasks.get_mut(&stage_key) {
+                    executor_stage_tasks.push(task);
+                } else {
+                    tasks.insert(stage_key, vec![task]);
+                }
+            } else {
+                let mut executor_stage_tasks: HashMap<
+                    (String, usize),
+                    Vec<TaskDescription>,
+                > = HashMap::new();
+                executor_stage_tasks.insert(stage_key, vec![task]);
+                executor_stage_assignments.insert(executor_id, executor_stage_tasks);
+            }
+        }
+        executor_stage_assignments
+    }

     pub(crate) async fn submit_job(
         &self,
         job_id: &str,
@@ -459,6 +458,8 @@ mod test {
         assert_eq!(assigned, 0);
         assert!(result.is_empty());

+        // Sleep to wait for the spawned launch task to finish its work.
+        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
         // All reservations should have been cancelled so we should be able to reserve them now
         let reservations = state.executor_manager.reserve_slots(4).await?;

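The heart of the change: offer_reservation no longer awaits each launch via futures::future::join_all. The grouped assignments are handed to a single detached tokio::spawn, and the slots for any failed launches are returned to the pool from inside that background task. Below is a minimal, self-contained sketch of this grouping-plus-fire-and-forget pattern; the Task type, group_by_executor_and_stage, and launch are illustrative stand-ins for this sketch, not the Ballista API.

use std::collections::HashMap;

// Illustrative stand-in for Ballista's TaskDescription.
#[derive(Debug, Clone)]
struct Task {
    job_id: String,
    stage_id: usize,
}

// Group tasks by executor, then by (job_id, stage_id), mirroring combine_task.
fn group_by_executor_and_stage(
    assignments: Vec<(String, Task)>,
) -> HashMap<String, HashMap<(String, usize), Vec<Task>>> {
    let mut grouped: HashMap<String, HashMap<(String, usize), Vec<Task>>> = HashMap::new();
    for (executor_id, task) in assignments {
        grouped
            .entry(executor_id)
            .or_default()
            .entry((task.job_id.clone(), task.stage_id))
            .or_default()
            .push(task);
    }
    grouped
}

// Stand-in for the real launch_multi_task call.
async fn launch(_executor_id: &str, _stages: Vec<Vec<Task>>) -> Result<(), String> {
    Ok(())
}

#[tokio::main]
async fn main() {
    let assignments = vec![
        ("exec-1".to_string(), Task { job_id: "job-a".into(), stage_id: 1 }),
        ("exec-1".to_string(), Task { job_id: "job-a".into(), stage_id: 1 }),
        ("exec-2".to_string(), Task { job_id: "job-a".into(), stage_id: 2 }),
    ];
    let grouped = group_by_executor_and_stage(assignments);

    // Fire and forget: the caller returns immediately instead of join_all-ing
    // the launch futures; a failed launch hands its slots back inside the task.
    let handle = tokio::spawn(async move {
        for (executor_id, stages) in grouped {
            let n_tasks: usize = stages.values().map(Vec::len).sum();
            let stages: Vec<Vec<Task>> = stages.into_values().collect();
            if let Err(e) = launch(&executor_id, stages).await {
                eprintln!("launch on {executor_id} failed ({e}): return {n_tasks} slots");
            }
        }
    });

    // Awaited here only so the demo finishes before main exits;
    // the scheduler itself deliberately does not await this handle.
    handle.await.unwrap();
}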
Expand Down
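A consequence of detaching the launch work shows up in the test change: reservations freed on failure only reappear once the background task has run, hence the one-second sleep before re-reserving. A fixed sleep works but is timing-sensitive; a bounded polling loop is one common alternative. The wait_until helper below is hypothetical, not part of Ballista:

use std::future::Future;
use std::time::Duration;

// Hypothetical test helper: poll an async condition until it holds,
// or give up after `attempts` tries spaced 50ms apart.
async fn wait_until<F, Fut>(mut condition: F, attempts: u32) -> bool
where
    F: FnMut() -> Fut,
    Fut: Future<Output = bool>,
{
    for _ in 0..attempts {
        if condition().await {
            return true;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    false
}

In the test above, such a helper could replace the fixed sleep by retrying the reservation check until the cancelled slots become visible, failing fast when they come back early.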