diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 57fdc3c0b..3de58a0be 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::any::type_name;
+use std::collections::HashMap;
 use std::future::Future;
 use std::sync::Arc;
 use std::time::Instant;
@@ -27,6 +28,7 @@
 use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
 use crate::state::session_manager::SessionManager;
 use crate::state::task_manager::TaskManager;
+use crate::state::execution_graph::TaskDescription;
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf::TaskStatus;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
@@ -196,7 +198,39 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T, U> {
+        // Put tasks to the same executor together
+        // And put tasks belonging to the same stage together for creating MultiTaskDefinition
+        let mut executor_stage_assignments: HashMap<
+            String,
+            HashMap<(String, usize), Vec<TaskDescription>>,
+        > = HashMap::new();
         for (executor_id, task) in assignments.into_iter() {
+            let stage_key =
+                (task.partition.job_id.clone(), task.partition.stage_id);
+            if let Some(tasks) = executor_stage_assignments.get_mut(&executor_id)
+            {
+                if let Some(executor_stage_tasks) = tasks.get_mut(&stage_key) {
+                    executor_stage_tasks.push(task);
+                } else {
+                    tasks.insert(stage_key, vec![task]);
+                }
+            } else {
+                let mut executor_stage_tasks: HashMap<
+                    (String, usize),
+                    Vec<TaskDescription>,
+                > = HashMap::new();
+                executor_stage_tasks.insert(stage_key, vec![task]);
+                executor_stage_assignments
+                    .insert(executor_id, executor_stage_tasks);
+            }
+        }
+
+        for (executor_id, tasks) in executor_stage_assignments.into_iter() {
+            let tasks: Vec<Vec<TaskDescription>> = tasks.into_values().collect();
+            // Total number of tasks to be launched for one executor
+            let n_tasks: usize =
+                tasks.iter().map(|stage_tasks| stage_tasks.len()).sum();
+
             match self
                 .executor_manager
                 .get_executor_metadata(&executor_id)
@@ -205,19 +239,30 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T, U> {
                 Ok(executor) => {
                     if let Err(e) = self
                         .task_manager
-                        .launch_task(&executor, task, &self.executor_manager)
+                        .launch_multi_task(
+                            &executor,
+                            tasks,
+                            &self.executor_manager,
+                        )
                         .await
                     {
                         error!("Failed to launch new task: {:?}", e);
-                        unassigned_reservations.push(
-                            ExecutorReservation::new_free(executor_id.clone()),
-                        );
+                        for _i in 0..n_tasks {
+                            unassigned_reservations.push(
+                                ExecutorReservation::new_free(
+                                    executor_id.clone(),
+                                ),
+                            );
+                        }
                     }
                 }
                 Err(e) => {
                     error!("Failed to launch new task, could not get executor metadata: {:?}", e);
-                    unassigned_reservations
-                        .push(ExecutorReservation::new_free(executor_id.clone()));
+                    for _i in 0..n_tasks {
+                        unassigned_reservations.push(
+                            ExecutorReservation::new_free(executor_id.clone()),
+                        );
+                    }
                 }
             }
         }
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index 841c54214..fed7cce3c 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -29,7 +29,8 @@
 use ballista_core::error::Result;
 use crate::state::session_manager::create_datafusion_context;
 use ballista_core::serde::protobuf::{
-    self, job_status, FailedJob, JobStatus, TaskDefinition, TaskStatus,
+    self, job_status, FailedJob, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId,
+    TaskStatus,
 };
 use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
 use ballista_core::serde::scheduler::ExecutorMetadata;
@@ -518,6 +519,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> {
             .await
     }
 
+    #[allow(dead_code)]
     #[cfg(not(test))]
     /// Launch the given task on the specified executor
     pub(crate) async fn launch_task(
@@ -544,8 +546,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> {
         Ok(())
     }
 
-    /// In unit tests, we do not have actual executors running, so it simplifies things to just noop.
+    #[allow(dead_code)]
     #[cfg(test)]
+    /// In unit tests, we do not have actual executors running, so it simplifies things to just noop.
     pub(crate) async fn launch_task(
         &self,
         _executor: &ExecutorMetadata,
@@ -623,6 +626,114 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> {
         }
     }
 
+    #[cfg(not(test))]
+    /// Launch the given tasks on the specified executor
+    pub(crate) async fn launch_multi_task(
+        &self,
+        executor: &ExecutorMetadata,
+        tasks: Vec<Vec<TaskDescription>>,
+        executor_manager: &ExecutorManager,
+    ) -> Result<()> {
+        info!("Launching multi task on executor {:?}", executor.id);
+        let multi_tasks: Result<Vec<MultiTaskDefinition>> = tasks
+            .into_iter()
+            .map(|stage_tasks| self.prepare_multi_task_definition(stage_tasks))
+            .collect();
+        let multi_tasks = multi_tasks?;
+        let mut client = executor_manager.get_client(&executor.id).await?;
+        client
+            .launch_multi_task(protobuf::LaunchMultiTaskParams {
+                multi_tasks,
+                scheduler_id: self.scheduler_id.clone(),
+            })
+            .await
+            .map_err(|e| {
+                BallistaError::Internal(format!(
+                    "Failed to connect to executor {}: {:?}",
+                    executor.id, e
+                ))
+            })?;
+        Ok(())
+    }
+
+    #[cfg(test)]
+    /// Launch the given tasks on the specified executor
+    pub(crate) async fn launch_multi_task(
+        &self,
+        _executor: &ExecutorMetadata,
+        _tasks: Vec<Vec<TaskDescription>>,
+        _executor_manager: &ExecutorManager,
+    ) -> Result<()> {
+        Ok(())
+    }
+
+    #[allow(dead_code)]
+    pub fn prepare_multi_task_definition(
+        &self,
+        tasks: Vec<TaskDescription>,
+    ) -> Result<MultiTaskDefinition> {
+        debug!("Preparing multi task definition for {:?}", tasks);
+        if let Some(task) = tasks.get(0) {
+            let session_id = task.session_id.clone();
+            let job_id = task.partition.job_id.clone();
+            let stage_id = task.partition.stage_id;
+            let stage_attempt_num = task.stage_attempt_num;
+
+            if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) {
+                let plan = if let Some(plan) = job_info.encoded_stage_plans.get(&stage_id)
+                {
+                    plan.clone()
+                } else {
+                    let mut plan_buf: Vec<u8> = vec![];
+                    let plan_proto = U::try_from_physical_plan(
+                        task.plan.clone(),
+                        self.codec.physical_extension_codec(),
+                    )?;
+                    plan_proto.try_encode(&mut plan_buf)?;
+
+                    job_info
+                        .encoded_stage_plans
+                        .insert(stage_id, plan_buf.clone());
+
+                    plan_buf
+                };
+                let output_partitioning =
+                    hash_partitioning_to_proto(task.output_partitioning.as_ref())?;
+
+                let task_ids = tasks
+                    .iter()
+                    .map(|task| TaskId {
+                        task_id: task.task_id as u32,
+                        task_attempt_num: task.task_attempt as u32,
+                        partition_id: task.partition.partition_id as u32,
+                    })
+                    .collect();
+
+                let multi_task_definition = MultiTaskDefinition {
+                    task_ids,
+                    job_id,
+                    stage_id: stage_id as u32,
+                    stage_attempt_num: stage_attempt_num as u32,
+                    plan,
+                    output_partitioning,
+                    session_id,
+                    launch_time: SystemTime::now()
+                        .duration_since(UNIX_EPOCH)
+                        .unwrap()
+                        .as_millis() as u64,
+                    props: vec![],
+                };
+                Ok(multi_task_definition)
+            } else {
+                Err(BallistaError::General(format!("Cannot prepare multi task definition for job {} which is not in active cache", job_id)))
+            }
+        } else {
+            Err(BallistaError::General(
+                "Cannot prepare multi task definition for an empty vec".to_string(),
+            ))
+        }
+    }
+
     /// Get the `ExecutionGraph` for the given job ID from cache
     pub(crate) async fn get_active_execution_graph(
         &self,
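
Illustrative note (not part of the patch): the grouping step added in state/mod.rs builds a nested map keyed by executor and by (job_id, stage_id). A minimal standalone sketch of the same idea is below, written with the HashMap entry API instead of the explicit get_mut/insert branches; Task here is a simplified stand-in for Ballista's TaskDescription with only the fields the grouping needs.

// Standalone sketch of the executor/stage grouping; `Task` is a simplified
// stand-in for TaskDescription (only job_id and stage_id are modeled).
use std::collections::HashMap;

#[derive(Debug)]
struct Task {
    job_id: String,
    stage_id: usize,
}

fn group_by_executor_and_stage(
    assignments: Vec<(String, Task)>,
) -> HashMap<String, HashMap<(String, usize), Vec<Task>>> {
    let mut grouped: HashMap<String, HashMap<(String, usize), Vec<Task>>> = HashMap::new();
    for (executor_id, task) in assignments {
        let stage_key = (task.job_id.clone(), task.stage_id);
        // entry().or_default() replaces the separate get_mut / insert branches.
        grouped
            .entry(executor_id)
            .or_default()
            .entry(stage_key)
            .or_default()
            .push(task);
    }
    grouped
}

fn main() {
    let assignments = vec![
        ("exec-1".to_string(), Task { job_id: "job-a".into(), stage_id: 1 }),
        ("exec-1".to_string(), Task { job_id: "job-a".into(), stage_id: 1 }),
        ("exec-2".to_string(), Task { job_id: "job-a".into(), stage_id: 2 }),
    ];
    let grouped = group_by_executor_and_stage(assignments);
    for (executor, stages) in &grouped {
        // n_tasks mirrors the patch: the total task count per executor, used to
        // return one free reservation per task if the launch fails.
        let n_tasks: usize = stages.values().map(|t| t.len()).sum();
        println!("{executor}: {} stage group(s), {} task(s)", stages.len(), n_tasks);
    }
}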
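
Illustrative note (not part of the patch): prepare_multi_task_definition collapses all tasks of one (job, stage) group into a single message whose stage plan is encoded once, with one TaskId entry per task. The sketch below shows only that shape; TaskIdLite and MultiTaskLite are hypothetical stand-ins for the generated protobuf types and omit fields such as attempt numbers, the encoded plan, output partitioning, session id, launch time, and props.

// Simplified stand-ins for the protobuf TaskId / MultiTaskDefinition messages.
#[derive(Debug)]
struct TaskIdLite {
    task_id: u32,
    partition_id: u32,
}

#[derive(Debug)]
struct MultiTaskLite {
    job_id: String,
    stage_id: u32,
    // In the real message the serialized stage plan sits alongside this list
    // and is shared by every task in it.
    task_ids: Vec<TaskIdLite>,
}

// Collapse the tasks of one (job, stage) group into a single batched definition.
fn batch_stage_tasks(job_id: &str, stage_id: u32, partitions: &[u32]) -> MultiTaskLite {
    MultiTaskLite {
        job_id: job_id.to_string(),
        stage_id,
        task_ids: partitions
            .iter()
            .enumerate()
            .map(|(task_id, partition_id)| TaskIdLite {
                task_id: task_id as u32,
                partition_id: *partition_id,
            })
            .collect(),
    }
}

fn main() {
    // Four tasks of the same stage travel as one message instead of four
    // TaskDefinition messages that each repeat the serialized stage plan.
    let batched = batch_stage_tasks("job-a", 2, &[0, 1, 2, 3]);
    println!(
        "{} tasks batched for job {} stage {}",
        batched.task_ids.len(),
        batched.job_id,
        batched.stage_id
    );
}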