Skip to content

Commit c3d1130

Browse files
committed
Improve launch task efficiency by calling LaunchMultiTask
1 parent 8c49b92 commit c3d1130

File tree

2 files changed

+164
-8
lines changed

2 files changed

+164
-8
lines changed

ballista/scheduler/src/state/mod.rs

+51-6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use std::any::type_name;
19+
use std::collections::HashMap;
1920
use std::future::Future;
2021
use std::sync::Arc;
2122
use std::time::Instant;
@@ -27,6 +28,7 @@ use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
2728
use crate::state::session_manager::SessionManager;
2829
use crate::state::task_manager::TaskManager;
2930

31+
use crate::state::execution_graph::TaskDescription;
3032
use ballista_core::error::{BallistaError, Result};
3133
use ballista_core::serde::protobuf::TaskStatus;
3234
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
@@ -196,7 +198,39 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
196198
.await
197199
{
198200
Ok((assignments, mut unassigned_reservations, pending_tasks)) => {
201+
// Put tasks to the same executor together
202+
// And put tasks belonging to the same stage together for creating MultiTaskDefinition
203+
let mut executor_stage_assignments: HashMap<
204+
String,
205+
HashMap<(String, usize), Vec<TaskDescription>>,
206+
> = HashMap::new();
199207
for (executor_id, task) in assignments.into_iter() {
208+
let stage_key =
209+
(task.partition.job_id.clone(), task.partition.stage_id);
210+
if let Some(tasks) = executor_stage_assignments.get_mut(&executor_id)
211+
{
212+
if let Some(executor_stage_tasks) = tasks.get_mut(&stage_key) {
213+
executor_stage_tasks.push(task);
214+
} else {
215+
tasks.insert(stage_key, vec![task]);
216+
}
217+
} else {
218+
let mut executor_stage_tasks: HashMap<
219+
(String, usize),
220+
Vec<TaskDescription>,
221+
> = HashMap::new();
222+
executor_stage_tasks.insert(stage_key, vec![task]);
223+
executor_stage_assignments
224+
.insert(executor_id, executor_stage_tasks);
225+
}
226+
}
227+
228+
for (executor_id, tasks) in executor_stage_assignments.into_iter() {
229+
let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
230+
// Total number of tasks to be launched for one executor
231+
let n_tasks: usize =
232+
tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
233+
200234
match self
201235
.executor_manager
202236
.get_executor_metadata(&executor_id)
@@ -205,19 +239,30 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
205239
Ok(executor) => {
206240
if let Err(e) = self
207241
.task_manager
208-
.launch_task(&executor, task, &self.executor_manager)
242+
.launch_multi_task(
243+
&executor,
244+
tasks,
245+
&self.executor_manager,
246+
)
209247
.await
210248
{
211249
error!("Failed to launch new task: {:?}", e);
212-
unassigned_reservations.push(
213-
ExecutorReservation::new_free(executor_id.clone()),
214-
);
250+
for _i in 0..n_tasks {
251+
unassigned_reservations.push(
252+
ExecutorReservation::new_free(
253+
executor_id.clone(),
254+
),
255+
);
256+
}
215257
}
216258
}
217259
Err(e) => {
218260
error!("Failed to launch new task, could not get executor metadata: {:?}", e);
219-
unassigned_reservations
220-
.push(ExecutorReservation::new_free(executor_id.clone()));
261+
for _i in 0..n_tasks {
262+
unassigned_reservations.push(
263+
ExecutorReservation::new_free(executor_id.clone()),
264+
);
265+
}
221266
}
222267
}
223268
}

ballista/scheduler/src/state/task_manager.rs

+113-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ use ballista_core::error::Result;
2929

3030
use crate::state::session_manager::create_datafusion_context;
3131
use ballista_core::serde::protobuf::{
32-
self, job_status, FailedJob, JobStatus, TaskDefinition, TaskStatus,
32+
self, job_status, FailedJob, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId,
33+
TaskStatus,
3334
};
3435
use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
3536
use ballista_core::serde::scheduler::ExecutorMetadata;
@@ -518,6 +519,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
518519
.await
519520
}
520521

522+
#[allow(dead_code)]
521523
#[cfg(not(test))]
522524
/// Launch the given task on the specified executor
523525
pub(crate) async fn launch_task(
@@ -544,8 +546,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
544546
Ok(())
545547
}
546548

547-
/// In unit tests, we do not have actual executors running, so it simplifies things to just noop.
549+
#[allow(dead_code)]
548550
#[cfg(test)]
551+
/// In unit tests, we do not have actual executors running, so it simplifies things to just noop.
549552
pub(crate) async fn launch_task(
550553
&self,
551554
_executor: &ExecutorMetadata,
@@ -623,6 +626,114 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
623626
}
624627
}
625628

629+
#[cfg(not(test))]
630+
/// Launch the given tasks on the specified executor
631+
pub(crate) async fn launch_multi_task(
632+
&self,
633+
executor: &ExecutorMetadata,
634+
tasks: Vec<Vec<TaskDescription>>,
635+
executor_manager: &ExecutorManager,
636+
) -> Result<()> {
637+
info!("Launching multi task on executor {:?}", executor.id);
638+
let multi_tasks: Result<Vec<MultiTaskDefinition>> = tasks
639+
.into_iter()
640+
.map(|stage_tasks| self.prepare_multi_task_definition(stage_tasks))
641+
.collect();
642+
let multi_tasks = multi_tasks?;
643+
let mut client = executor_manager.get_client(&executor.id).await?;
644+
client
645+
.launch_multi_task(protobuf::LaunchMultiTaskParams {
646+
multi_tasks,
647+
scheduler_id: self.scheduler_id.clone(),
648+
})
649+
.await
650+
.map_err(|e| {
651+
BallistaError::Internal(format!(
652+
"Failed to connect to executor {}: {:?}",
653+
executor.id, e
654+
))
655+
})?;
656+
Ok(())
657+
}
658+
659+
#[cfg(test)]
660+
/// Launch the given tasks on the specified executor
661+
pub(crate) async fn launch_multi_task(
662+
&self,
663+
_executor: &ExecutorMetadata,
664+
_tasks: Vec<Vec<TaskDescription>>,
665+
_executor_manager: &ExecutorManager,
666+
) -> Result<()> {
667+
Ok(())
668+
}
669+
670+
#[allow(dead_code)]
671+
pub fn prepare_multi_task_definition(
672+
&self,
673+
tasks: Vec<TaskDescription>,
674+
) -> Result<MultiTaskDefinition> {
675+
debug!("Preparing multi task definition for {:?}", tasks);
676+
if let Some(task) = tasks.get(0) {
677+
let session_id = task.session_id.clone();
678+
let job_id = task.partition.job_id.clone();
679+
let stage_id = task.partition.stage_id;
680+
let stage_attempt_num = task.stage_attempt_num;
681+
682+
if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) {
683+
let plan = if let Some(plan) = job_info.encoded_stage_plans.get(&stage_id)
684+
{
685+
plan.clone()
686+
} else {
687+
let mut plan_buf: Vec<u8> = vec![];
688+
let plan_proto = U::try_from_physical_plan(
689+
task.plan.clone(),
690+
self.codec.physical_extension_codec(),
691+
)?;
692+
plan_proto.try_encode(&mut plan_buf)?;
693+
694+
job_info
695+
.encoded_stage_plans
696+
.insert(stage_id, plan_buf.clone());
697+
698+
plan_buf
699+
};
700+
let output_partitioning =
701+
hash_partitioning_to_proto(task.output_partitioning.as_ref())?;
702+
703+
let task_ids = tasks
704+
.iter()
705+
.map(|task| TaskId {
706+
task_id: task.task_id as u32,
707+
task_attempt_num: task.task_attempt as u32,
708+
partition_id: task.partition.partition_id as u32,
709+
})
710+
.collect();
711+
712+
let multi_task_definition = MultiTaskDefinition {
713+
task_ids,
714+
job_id,
715+
stage_id: stage_id as u32,
716+
stage_attempt_num: stage_attempt_num as u32,
717+
plan,
718+
output_partitioning,
719+
session_id,
720+
launch_time: SystemTime::now()
721+
.duration_since(UNIX_EPOCH)
722+
.unwrap()
723+
.as_millis() as u64,
724+
props: vec![],
725+
};
726+
Ok(multi_task_definition)
727+
} else {
728+
Err(BallistaError::General(format!("Cannot prepare multi task definition for job {} which is not in active cache", job_id)))
729+
}
730+
} else {
731+
Err(BallistaError::General(
732+
"Cannot prepare multi task definition for an empty vec".to_string(),
733+
))
734+
}
735+
}
736+
626737
/// Get the `ExecutionGraph` for the given job ID from cache
627738
pub(crate) async fn get_active_execution_graph(
628739
&self,

0 commit comments

Comments
 (0)