From b88f1034bf4fe107e72fb1e4b528f0edc6688ab9 Mon Sep 17 00:00:00 2001 From: yahoNanJing <90197956+yahoNanJing@users.noreply.github.com> Date: Thu, 17 Mar 2022 14:57:04 +0800 Subject: [PATCH] Introduce StageManager for managing tasks stage by stage (#1983) * Introduce StageManager * Add unit test for both of the event-pull-based task scheduling and event-push-based task scheduling * Fix clippy * Fix for PR review * Fix for PR review * Fix clippy caused by #[cfg(test)] and #[cfg(not(test))] Co-authored-by: yangzhong --- ballista/rust/scheduler/Cargo.toml | 1 + ballista/rust/scheduler/src/planner.rs | 20 + .../scheduler/src/scheduler_server/event.rs | 33 + .../src/scheduler_server/event_loop.rs | 30 +- .../src/scheduler_server/external_scaler.rs | 10 +- .../scheduler/src/scheduler_server/grpc.rs | 242 +++-- .../scheduler/src/scheduler_server/mod.rs | 370 +++++++- .../scheduler_server/query_stage_scheduler.rs | 465 ++++++++-- ...in_memory_state.rs => executor_manager.rs} | 91 +- ballista/rust/scheduler/src/state/mod.rs | 827 +----------------- .../scheduler/src/state/persistent_state.rs | 3 +- .../rust/scheduler/src/state/stage_manager.rs | 782 +++++++++++++++++ .../scheduler/src/state/task_scheduler.rs | 160 ++-- 13 files changed, 1844 insertions(+), 1190 deletions(-) create mode 100644 ballista/rust/scheduler/src/scheduler_server/event.rs rename ballista/rust/scheduler/src/state/{in_memory_state.rs => executor_manager.rs} (68%) create mode 100644 ballista/rust/scheduler/src/state/stage_manager.rs diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml index 2ff0073756a1..0d6df9012e3e 100644 --- a/ballista/rust/scheduler/Cargo.toml +++ b/ballista/rust/scheduler/Cargo.toml @@ -55,6 +55,7 @@ tower = { version = "0.4" } warp = "0.3" parking_lot = "0.12" async-trait = "0.1.41" +async-recursion = "1.0.0" [dev-dependencies] ballista-core = { path = "../core", version = "0.6.0" } diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index b18b213499ea..8b5751a9ebd6 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -176,6 +176,26 @@ impl DistributedPlanner { } } +/// Returns the unresolved shuffles in the execution plan +pub fn find_unresolved_shuffles( + plan: &Arc, +) -> Result> { + if let Some(unresolved_shuffle) = + plan.as_any().downcast_ref::() + { + Ok(vec![unresolved_shuffle.clone()]) + } else { + Ok(plan + .children() + .iter() + .map(find_unresolved_shuffles) + .collect::>>()? + .into_iter() + .flatten() + .collect()) + } +} + pub fn remove_unresolved_shuffles( stage: &dyn ExecutionPlan, partition_locations: &HashMap>>, diff --git a/ballista/rust/scheduler/src/scheduler_server/event.rs b/ballista/rust/scheduler/src/scheduler_server/event.rs new file mode 100644 index 000000000000..9252453e6fe3 --- /dev/null +++ b/ballista/rust/scheduler/src/scheduler_server/event.rs @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::physical_plan::ExecutionPlan; +use std::sync::Arc; + +#[derive(Clone)] +pub(crate) enum SchedulerServerEvent { + // number of offer rounds + ReviveOffers(u32), +} + +#[derive(Clone)] +pub enum QueryStageSchedulerEvent { + JobSubmitted(String, Arc), + StageFinished(String, u32), + JobFinished(String), + JobFailed(String, u32, String), +} diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs index a7d656c94618..c343743ca35f 100644 --- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs +++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs @@ -21,6 +21,7 @@ use std::time::Duration; use async_trait::async_trait; use log::{debug, warn}; +use crate::scheduler_server::event::SchedulerServerEvent; use ballista_core::error::{BallistaError, Result}; use ballista_core::event_loop::EventAction; use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition}; @@ -31,11 +32,6 @@ use crate::scheduler_server::ExecutorsClient; use crate::state::task_scheduler::TaskScheduler; use crate::state::SchedulerState; -#[derive(Clone)] -pub(crate) enum SchedulerServerEvent { - JobSubmitted(String), -} - pub(crate) struct SchedulerServerEventAction< T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan, @@ -57,17 +53,16 @@ impl } } - async fn offer_resources( - &self, - job_id: String, - ) -> Result> { - let mut available_executors = self.state.get_available_executors_data(); + #[allow(unused_variables)] + async fn offer_resources(&self, n: u32) -> Result> { + let mut available_executors = + self.state.executor_manager.get_available_executors_data(); // In case of there's no enough resources, reschedule the tasks of the job if available_executors.is_empty() { // TODO Maybe it's better to use an exclusive runtime for this kind task scheduling warn!("Not enough available executors for task running"); tokio::time::sleep(Duration::from_millis(100)).await; - return Ok(Some(SchedulerServerEvent::JobSubmitted(job_id))); + return Ok(Some(SchedulerServerEvent::ReviveOffers(1))); } let mut executors_data_change: Vec = available_executors @@ -80,7 +75,7 @@ impl let (tasks_assigment, num_tasks) = self .state - .fetch_tasks(&mut available_executors, &job_id) + .fetch_schedulable_tasks(&mut available_executors, n) .await?; for (data_change, data) in executors_data_change .iter_mut() @@ -90,6 +85,7 @@ impl data.available_task_slots as i32 - data_change.task_slots; } + #[cfg(not(test))] if num_tasks > 0 { self.launch_tasks(&executors_data_change, tasks_assigment) .await?; @@ -98,6 +94,7 @@ impl Ok(None) } + #[allow(dead_code)] async fn launch_tasks( &self, executors: &[ExecutorDataChange], @@ -132,10 +129,11 @@ impl .unwrap() .clone() }; - // Update the resources first - self.state.update_executor_data(executor_data_change); // TODO check whether launching task is successful or not client.launch_task(LaunchTaskParams { task: tasks }).await?; + self.state + .executor_manager + .update_executor_data(executor_data_change); } else { // Since the task assignment policy is round robin, // if find 
tasks for one executor is empty, just break fast @@ -162,9 +160,7 @@ impl event: SchedulerServerEvent, ) -> Result> { match event { - SchedulerServerEvent::JobSubmitted(job_id) => { - self.offer_resources(job_id).await - } + SchedulerServerEvent::ReviveOffers(n) => self.offer_resources(n).await, } } diff --git a/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs b/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs index 13fb7b592d96..4b3966df2701 100644 --- a/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs +++ b/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs @@ -20,7 +20,6 @@ use crate::scheduler_server::externalscaler::{ GetMetricsResponse, IsActiveResponse, MetricSpec, MetricValue, ScaledObjectRef, }; use crate::scheduler_server::SchedulerServer; -use ballista_core::serde::protobuf::task_status; use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; use log::debug; use tonic::{Request, Response}; @@ -35,14 +34,7 @@ impl ExternalScaler &self, _request: Request, ) -> Result, tonic::Status> { - let tasks = self.state.get_all_tasks(); - let result = tasks.iter().any(|task| { - !matches!( - task.status, - Some(task_status::Status::Completed(_)) - | Some(task_status::Status::Failed(_)) - ) - }); + let result = self.state.stage_manager.has_running_tasks(); debug!("Are there active tasks? {}", result); Ok(Response::new(IsActiveResponse { result })) } diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs index aaa98fb7207c..d6f95fb2adb6 100644 --- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs @@ -17,7 +17,7 @@ use anyhow::Context; use ballista_core::config::TaskSchedulingPolicy; -use ballista_core::execution_plans::ShuffleWriterExec; +use ballista_core::error::BallistaError; use ballista_core::serde::protobuf::execute_query_params::Query; use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient; use ballista_core::serde::protobuf::executor_registration::OptionalHost; @@ -27,9 +27,8 @@ use ballista_core::serde::protobuf::{ FileType, GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, HeartBeatParams, HeartBeatResult, JobStatus, PollWorkParams, PollWorkResult, QueuedJob, RegisterExecutorParams, RegisterExecutorResult, - TaskDefinition, UpdateTaskStatusParams, UpdateTaskStatusResult, + UpdateTaskStatusParams, UpdateTaskStatusResult, }; -use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; use ballista_core::serde::scheduler::{ ExecutorData, ExecutorDataChange, ExecutorMetadata, }; @@ -40,15 +39,15 @@ use datafusion::datasource::object_store::{local::LocalFileSystem, ObjectStore}; use futures::StreamExt; use log::{debug, error, info, trace, warn}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; -use std::collections::HashSet; use std::convert::TryInto; use std::sync::Arc; +use std::time::Instant; use std::time::{SystemTime, UNIX_EPOCH}; use tonic::{Request, Response, Status}; -use crate::scheduler_server::event_loop::SchedulerServerEvent; -use crate::scheduler_server::query_stage_scheduler::QueryStageSchedulerEvent; +use crate::scheduler_server::event::QueryStageSchedulerEvent; use crate::scheduler_server::SchedulerServer; +use crate::state::task_scheduler::TaskScheduler; #[tonic::async_trait] impl SchedulerGrpc @@ -93,8 +92,7 @@ impl SchedulerGrpc state: None, }; // In case that it's the first time to poll work, do 
registration - if let Some(_executor_meta) = self.state.get_executor_metadata(&metadata.id) { - } else { + if self.state.get_executor_metadata(&metadata.id).is_none() { self.state .save_executor_metadata(metadata.clone()) .await @@ -104,72 +102,40 @@ impl SchedulerGrpc tonic::Status::internal(msg) })?; } - self.state.save_executor_heartbeat(executor_heartbeat); - for task_status in task_status { - self.state - .save_task_status(&task_status) - .await - .map_err(|e| { - let msg = format!("Could not save task status: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })?; - } + self.state + .executor_manager + .save_executor_heartbeat(executor_heartbeat); + self.update_task_status(task_status).await.map_err(|e| { + let msg = format!( + "Fail to update tasks status from executor {:?} due to {:?}", + &metadata.id, e + ); + error!("{}", msg); + tonic::Status::internal(msg) + })?; let task: Result, Status> = if can_accept_task { - let plan = self + let mut executors_data = vec![ExecutorData { + executor_id: metadata.id.clone(), + total_task_slots: 1, + available_task_slots: 1, + }]; + let (mut tasks, num_tasks) = self .state - .assign_next_schedulable_task(&metadata.id) + .fetch_schedulable_tasks(&mut executors_data, 1) .await .map_err(|e| { let msg = format!("Error finding next assignable task: {}", e); error!("{}", msg); tonic::Status::internal(msg) })?; - if let Some((task, _plan)) = &plan { - let task_id = task.task_id.as_ref().unwrap(); - info!( - "Sending new task to {}: {}/{}/{}", - metadata.id, - task_id.job_id, - task_id.stage_id, - task_id.partition_id - ); - } - match plan { - Some((status, plan)) => { - let plan_clone = plan.clone(); - let output_partitioning = if let Some(shuffle_writer) = - plan_clone.as_any().downcast_ref::() - { - shuffle_writer.shuffle_output_partitioning() - } else { - return Err(Status::invalid_argument(format!( - "Task root plan was not a ShuffleWriterExec: {:?}", - plan_clone - ))); - }; - let mut buf: Vec = vec![]; - U::try_from_physical_plan( - plan, - self.codec.physical_extension_codec(), - ) - .and_then(|m| m.try_encode(&mut buf)) - .map_err(|e| { - Status::internal(format!( - "error serializing execution plan: {:?}", - e - )) - })?; - Ok(Some(TaskDefinition { - plan: buf, - task_id: status.task_id, - output_partitioning: hash_partitioning_to_proto( - output_partitioning, - ) - .map_err(|_| Status::internal("TBD".to_string()))?, - })) - } - None => Ok(None), + if num_tasks == 0 { + Ok(None) + } else { + assert_eq!(tasks.len(), 1); + let mut task = tasks.pop().unwrap(); + assert_eq!(task.len(), 1); + let task = task.pop().unwrap(); + Ok(Some(task)) } } else { Ok(None) @@ -232,7 +198,9 @@ impl SchedulerGrpc total_task_slots: metadata.specification.task_slots, available_task_slots: metadata.specification.task_slots, }; - self.state.save_executor_data(executor_data); + self.state + .executor_manager + .save_executor_data(executor_data); Ok(Response::new(RegisterExecutorResult { success: true })) } else { warn!("Received invalid register executor request"); @@ -258,7 +226,9 @@ impl SchedulerGrpc .as_secs(), state, }; - self.state.save_executor_heartbeat(executor_heartbeat); + self.state + .executor_manager + .save_executor_heartbeat(executor_heartbeat); Ok(Response::new(HeartBeatResult { reregister: false })) } @@ -275,50 +245,29 @@ impl SchedulerGrpc "Received task status update request for executor {:?}", executor_id ); - trace!("Related task status is {:?}", task_status); - let mut jobs = HashSet::new(); + let num_tasks = task_status.len(); + 
if let Some(executor_data) = + self.state.executor_manager.get_executor_data(&executor_id) { - let num_tasks = task_status.len(); - for task_status in task_status { - self.state - .save_task_status(&task_status) - .await - .map_err(|e| { - let msg = format!("Could not save task status: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })?; - if let Some(task_id) = task_status.task_id { - jobs.insert(task_id.job_id.clone()); - } - } - - if let Some(executor_data) = self.state.get_executor_data(&executor_id) { - self.state.update_executor_data(&ExecutorDataChange { + self.state + .executor_manager + .update_executor_data(&ExecutorDataChange { executor_id: executor_data.executor_id, task_slots: num_tasks as i32, }); - } else { - error!("Fail to get executor data for {:?}", &executor_id); - } - } - if let Some(event_loop) = self.event_loop.as_ref() { - for job_id in jobs { - event_loop - .get_sender() - .map_err(|e| tonic::Status::internal(format!("{}", e)))? - .post_event(SchedulerServerEvent::JobSubmitted(job_id.clone())) - .await - .map_err(|e| { - let msg = format!( - "Could not send job {} to the channel due to {:?}", - &job_id, e - ); - error!("{}", msg); - tonic::Status::internal(msg) - })?; - } + } else { + error!("Fail to get executor data for {:?}", &executor_id); } + + self.update_task_status(task_status).await.map_err(|e| { + let msg = format!( + "Fail to update tasks status from executor {:?} due to {:?}", + &executor_id, e + ); + error!("{}", msg); + tonic::Status::internal(msg) + })?; + Ok(Response::new(UpdateTaskStatusResult { success: true })) } @@ -409,9 +358,17 @@ impl SchedulerGrpc // Generate job id. // TODO Maybe the format will be changed in the future let job_id = generate_job_id(); + let state = self.state.clone(); + let query_stage_event_sender = + self.query_stage_event_loop.get_sender().map_err(|e| { + tonic::Status::internal(format!( + "Could not get query stage event sender due to: {}", + e + )) + })?; // Save placeholder job metadata - self.state + state .save_job_metadata( &job_id, &JobStatus { @@ -423,19 +380,55 @@ impl SchedulerGrpc tonic::Status::internal(format!("Could not save job metadata: {}", e)) })?; - match self - .post_event(QueryStageSchedulerEvent::JobSubmitted( - job_id.clone(), - Box::new(plan), - )) + let job_id_spawn = job_id.clone(); + let ctx = self.ctx.read().await.clone(); + tokio::spawn(async move { + if let Err(e) = async { + // create physical plan + let start = Instant::now(); + let plan = async { + let optimized_plan = ctx.optimize(&plan).map_err(|e| { + let msg = + format!("Could not create optimized logical plan: {}", e); + error!("{}", msg); + + BallistaError::General(msg) + })?; + + debug!("Calculated optimized plan: {:?}", optimized_plan); + + ctx.create_physical_plan(&optimized_plan) + .await + .map_err(|e| { + let msg = + format!("Could not create physical plan: {}", e); + error!("{}", msg); + + BallistaError::General(msg) + }) + } + .await?; + info!( + "DataFusion created physical plan in {} milliseconds", + start.elapsed().as_millis() + ); + + query_stage_event_sender + .post_event(QueryStageSchedulerEvent::JobSubmitted( + job_id_spawn.clone(), + plan, + )) + .await?; + + Ok::<(), BallistaError>(()) + } .await - { - Err(error) => { - let msg = format!("Job {} failed due to {}", job_id, error); + { + let msg = format!("Job {} failed due to {}", job_id_spawn, e); warn!("{}", msg); - self.state + state .save_job_metadata( - &job_id, + &job_id_spawn, &JobStatus { status: Some(job_status::Status::Failed(FailedJob { 
error: msg.to_string(), @@ -443,11 +436,16 @@ impl SchedulerGrpc }, ) .await - .unwrap(); - return Err(tonic::Status::internal(msg)); + .unwrap_or_else(|_| { + panic!( + "Fail to update job status to failed for {}", + job_id_spawn + ) + }); } - Ok(_) => Ok(Response::new(ExecuteQueryResult { job_id })), - } + }); + + Ok(Response::new(ExecuteQueryResult { job_id })) } else { Err(tonic::Status::internal("Error parsing request")) } diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs index 51f6fe4180b4..6386d6b0c7c8 100644 --- a/ballista/rust/scheduler/src/scheduler_server/mod.rs +++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs @@ -26,16 +26,13 @@ use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy}; use ballista_core::error::Result; use ballista_core::event_loop::EventLoop; use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient; - +use ballista_core::serde::protobuf::TaskStatus; use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec}; use datafusion::prelude::{SessionConfig, SessionContext}; -use crate::scheduler_server::event_loop::{ - SchedulerServerEvent, SchedulerServerEventAction, -}; -use crate::scheduler_server::query_stage_scheduler::{ - QueryStageScheduler, QueryStageSchedulerEvent, -}; +use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent}; +use crate::scheduler_server::event_loop::SchedulerServerEventAction; +use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler; use crate::state::backend::StateBackendClient; use crate::state::SchedulerState; @@ -45,6 +42,7 @@ pub mod externalscaler { include!(concat!(env!("OUT_DIR"), "/externalscaler.rs")); } +pub mod event; mod event_loop; mod external_scaler; mod grpc; @@ -104,7 +102,7 @@ impl SchedulerServer SchedulerServer SchedulerServer Result<()> { + pub(crate) async fn update_task_status( + &self, + tasks_status: Vec, + ) -> Result<()> { + let num_tasks_status = tasks_status.len() as u32; + let stage_events = self.state.stage_manager.update_tasks_status(tasks_status); + if stage_events.is_empty() { + if let Some(event_loop) = self.event_loop.as_ref() { + event_loop + .get_sender()? + .post_event(SchedulerServerEvent::ReviveOffers(num_tasks_status)) + .await?; + } + } else { + for stage_event in stage_events { + self.post_stage_event(stage_event).await?; + } + } + + Ok(()) + } + + async fn post_stage_event(&self, event: QueryStageSchedulerEvent) -> Result<()> { self.query_stage_event_loop .get_sender()? 
.post_event(event) @@ -166,3 +185,334 @@ pub fn create_datafusion_context(config: &BallistaConfig) -> SessionContext { SessionConfig::new().with_target_partitions(config.default_shuffle_partitions()); SessionContext::with_config(config) } + +#[cfg(all(test, feature = "sled"))] +mod test { + use std::sync::Arc; + use std::time::{Duration, Instant}; + + use tokio::sync::RwLock; + + use ballista_core::config::TaskSchedulingPolicy; + use ballista_core::error::{BallistaError, Result}; + use ballista_core::execution_plans::ShuffleWriterExec; + use ballista_core::serde::protobuf::{ + job_status, task_status, CompletedTask, LogicalPlanNode, PartitionId, + PhysicalPlanNode, TaskStatus, + }; + use ballista_core::serde::scheduler::ExecutorData; + use ballista_core::serde::BallistaCodec; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::logical_plan::{col, sum, LogicalPlan, LogicalPlanBuilder}; + use datafusion::prelude::{SessionConfig, SessionContext}; + + use crate::scheduler_server::event::QueryStageSchedulerEvent; + use crate::scheduler_server::SchedulerServer; + use crate::state::backend::standalone::StandaloneClient; + use crate::state::task_scheduler::TaskScheduler; + + #[tokio::test] + async fn test_pull_based_task_scheduling() -> Result<()> { + let now = Instant::now(); + test_task_scheduling(TaskSchedulingPolicy::PullStaged, test_plan(), 4).await?; + println!( + "pull-based task scheduling cost {}ms", + now.elapsed().as_millis() + ); + + Ok(()) + } + + #[tokio::test] + async fn test_push_based_task_scheduling() -> Result<()> { + let now = Instant::now(); + test_task_scheduling(TaskSchedulingPolicy::PushStaged, test_plan(), 4).await?; + println!( + "push-based task scheduling cost {}ms", + now.elapsed().as_millis() + ); + + Ok(()) + } + + async fn test_task_scheduling( + policy: TaskSchedulingPolicy, + plan_of_linear_stages: LogicalPlan, + total_available_task_slots: usize, + ) -> Result<()> { + let config = + SessionConfig::new().with_target_partitions(total_available_task_slots); + let scheduler = test_scheduler(policy, config).await?; + if matches!(policy, TaskSchedulingPolicy::PushStaged) { + let executors = test_executors(total_available_task_slots); + for executor_data in executors { + scheduler + .state + .executor_manager + .save_executor_data(executor_data); + } + } + + let plan = async { + let ctx = scheduler.ctx.read().await.clone(); + let optimized_plan = ctx.optimize(&plan_of_linear_stages).map_err(|e| { + BallistaError::General(format!( + "Could not create optimized logical plan: {}", + e + )) + })?; + + ctx.create_physical_plan(&optimized_plan) + .await + .map_err(|e| { + BallistaError::General(format!( + "Could not create physical plan: {}", + e + )) + }) + } + .await?; + + let job_id = "job"; + + { + // verify job submit + scheduler + .post_stage_event(QueryStageSchedulerEvent::JobSubmitted( + job_id.to_owned(), + plan, + )) + .await?; + + let waiting_time_ms = + test_waiting_async(|| scheduler.state.get_job_metadata(job_id).is_some()) + .await; + let job_status = scheduler.state.get_job_metadata(job_id); + assert!( + job_status.is_some(), + "Fail to receive JobSubmitted event within {}ms", + waiting_time_ms + ); + } + + let stage_task_num = test_get_job_stage_task_num(&scheduler, job_id); + let first_stage_id = 1u32; + let final_stage_id = stage_task_num.len() as u32 - 1; + assert!(scheduler + .state + .stage_manager + .is_final_stage(job_id, final_stage_id)); + + if matches!(policy, TaskSchedulingPolicy::PullStaged) { + 
assert!(!scheduler.state.stage_manager.has_running_tasks()); + assert!(scheduler + .state + .stage_manager + .is_running_stage(job_id, first_stage_id)); + if first_stage_id != final_stage_id { + assert!(scheduler + .state + .stage_manager + .is_pending_stage(job_id, final_stage_id)); + } + } + + // complete stage one by one + for stage_id in first_stage_id..final_stage_id { + let next_stage_id = stage_id + 1; + let num_tasks = stage_task_num[stage_id as usize] as usize; + if matches!(policy, TaskSchedulingPolicy::PullStaged) { + let mut executors = test_executors(total_available_task_slots); + let _fet_tasks = scheduler + .state + .fetch_schedulable_tasks(&mut executors, 1) + .await?; + } + assert!(scheduler.state.stage_manager.has_running_tasks()); + assert!(scheduler + .state + .stage_manager + .is_running_stage(job_id, stage_id)); + assert!(scheduler + .state + .stage_manager + .is_pending_stage(job_id, next_stage_id)); + + test_complete_stage(&scheduler, job_id, 1, num_tasks).await?; + assert!(!scheduler.state.stage_manager.has_running_tasks()); + assert!(!scheduler + .state + .stage_manager + .is_running_stage(job_id, stage_id)); + assert!(scheduler + .state + .stage_manager + .is_completed_stage(job_id, stage_id)); + let waiting_time_ms = test_waiting_async(|| { + !scheduler + .state + .stage_manager + .is_pending_stage(job_id, next_stage_id) + }) + .await; + assert!( + !scheduler + .state + .stage_manager + .is_pending_stage(job_id, next_stage_id), + "Fail to update stage state machine within {}ms", + waiting_time_ms + ); + assert!(scheduler + .state + .stage_manager + .is_running_stage(job_id, next_stage_id)); + } + + // complete the final stage + { + let num_tasks = stage_task_num[final_stage_id as usize] as usize; + if matches!(policy, TaskSchedulingPolicy::PullStaged) { + let mut executors = test_executors(total_available_task_slots); + let _fet_tasks = scheduler + .state + .fetch_schedulable_tasks(&mut executors, 1) + .await?; + } + assert!(scheduler.state.stage_manager.has_running_tasks()); + + test_complete_stage(&scheduler, job_id, final_stage_id, num_tasks).await?; + assert!(!scheduler.state.stage_manager.has_running_tasks()); + assert!(!scheduler + .state + .stage_manager + .is_running_stage(job_id, final_stage_id)); + assert!(scheduler + .state + .stage_manager + .is_completed_stage(job_id, final_stage_id)); + let waiting_time_ms = test_waiting_async(|| { + let job_status = scheduler.state.get_job_metadata(job_id).unwrap(); + matches!(job_status.status, Some(job_status::Status::Completed(_))) + }) + .await; + + let job_status = scheduler.state.get_job_metadata(job_id).unwrap(); + assert!( + matches!(job_status.status, Some(job_status::Status::Completed(_))), + "Fail to update job state machine within {}ms", + waiting_time_ms + ); + } + + Ok(()) + } + + async fn test_waiting_async(cond: F) -> u64 + where + F: Fn() -> bool, + { + let round_waiting_time = 10; + let num_round = 5; + for _i in 0..num_round { + if cond() { + break; + } + tokio::time::sleep(Duration::from_millis(round_waiting_time)).await; + } + + round_waiting_time * num_round + } + + async fn test_complete_stage( + scheduler: &SchedulerServer, + job_id: &str, + stage_id: u32, + num_tasks: usize, + ) -> Result<()> { + let tasks_status: Vec = (0..num_tasks as u32) + .into_iter() + .map(|task_id| TaskStatus { + status: Some(task_status::Status::Completed(CompletedTask { + executor_id: "localhost".to_owned(), + partitions: Vec::new(), + })), + task_id: Some(PartitionId { + job_id: job_id.to_owned(), + stage_id, + 
partition_id: task_id, + }), + }) + .collect(); + scheduler.update_task_status(tasks_status).await + } + + async fn test_scheduler( + policy: TaskSchedulingPolicy, + config: SessionConfig, + ) -> Result> { + let state_storage = Arc::new(StandaloneClient::try_new_temporary()?); + let mut scheduler: SchedulerServer = + SchedulerServer::new_with_policy( + state_storage.clone(), + "default".to_owned(), + policy, + Arc::new(RwLock::new(SessionContext::with_config(config))), + BallistaCodec::default(), + ); + scheduler.init().await?; + + Ok(scheduler) + } + + fn test_executors(num_partitions: usize) -> Vec { + let task_slots = (num_partitions as u32 + 1) / 2; + + vec![ + ExecutorData { + executor_id: "localhost1".to_owned(), + total_task_slots: task_slots, + available_task_slots: task_slots, + }, + ExecutorData { + executor_id: "localhost2".to_owned(), + total_task_slots: num_partitions as u32 - task_slots, + available_task_slots: num_partitions as u32 - task_slots, + }, + ] + } + + fn test_get_job_stage_task_num( + scheduler: &SchedulerServer, + job_id: &str, + ) -> Vec { + let mut ret = vec![0, 1]; + let mut stage_id = 1; + while let Some(stage_plan) = scheduler.state.get_stage_plan(job_id, stage_id) { + if let Some(shuffle_writer) = + stage_plan.as_any().downcast_ref::() + { + if let Some(partitions) = shuffle_writer.shuffle_output_partitioning() { + ret.push(partitions.partition_count() as u32) + } + } + stage_id += 1; + } + + ret + } + + fn test_plan() -> LogicalPlan { + let schema = Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("gmv", DataType::UInt64, false), + ]); + + LogicalPlanBuilder::scan_empty(None, &schema, Some(vec![0, 1])) + .unwrap() + .aggregate(vec![col("id")], vec![sum(col("gmv"))]) + .unwrap() + .build() + .unwrap() + } +} diff --git a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs index 52af5484c8be..c29d7be55fba 100644 --- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs +++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs @@ -15,135 +15,302 @@ // specific language governing permissions and limitations // under the License. 
+use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use std::time::Instant; +use async_recursion::async_recursion; use async_trait::async_trait; use log::{debug, error, info, warn}; -use tokio::sync::RwLock; use ballista_core::error::{BallistaError, Result}; + use ballista_core::event_loop::{EventAction, EventSender}; +use ballista_core::execution_plans::UnresolvedShuffleExec; use ballista_core::serde::protobuf::{ - job_status, JobStatus, PartitionId, RunningJob, TaskStatus, + job_status, task_status, CompletedJob, CompletedTask, FailedJob, FailedTask, + JobStatus, RunningJob, TaskStatus, }; -use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; -use datafusion::logical_plan::LogicalPlan; +use ballista_core::serde::scheduler::{ExecutorMetadata, PartitionStats}; +use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan}; use datafusion::physical_plan::ExecutionPlan; -use datafusion::prelude::SessionContext; -use crate::planner::DistributedPlanner; -use crate::scheduler_server::event_loop::SchedulerServerEvent; +use crate::planner::{ + find_unresolved_shuffles, remove_unresolved_shuffles, DistributedPlanner, +}; +use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent}; use crate::state::SchedulerState; -#[derive(Clone)] -pub enum QueryStageSchedulerEvent { - JobSubmitted(String, Box), -} - pub(crate) struct QueryStageScheduler< T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan, > { - ctx: Arc>, state: Arc>, event_sender: Option>, } impl QueryStageScheduler { pub(crate) fn new( - ctx: Arc>, state: Arc>, event_sender: Option>, ) -> Self { Self { - ctx, state, event_sender, } } - async fn create_physical_plan( - &self, - plan: Box, - ) -> Result> { - let start = Instant::now(); - - let ctx = self.ctx.read().await.clone(); - let optimized_plan = ctx.optimize(plan.as_ref()).map_err(|e| { - let msg = format!("Could not create optimized logical plan: {}", e); - error!("{}", msg); - BallistaError::General(msg) - })?; - - debug!("Calculated optimized plan: {:?}", optimized_plan); - - let plan = ctx - .create_physical_plan(&optimized_plan) - .await - .map_err(|e| { - let msg = format!("Could not create physical plan: {}", e); - error!("{}", msg); - BallistaError::General(msg) - }); - - info!( - "DataFusion created physical plan in {} milliseconds", - start.elapsed().as_millis() - ); - - plan - } - async fn generate_stages( &self, job_id: &str, plan: Arc, ) -> Result<()> { let mut planner = DistributedPlanner::new(); + // The last one is the final stage let stages = planner.plan_query_stages(job_id, plan).await.map_err(|e| { let msg = format!("Could not plan query stages: {}", e); error!("{}", msg); BallistaError::General(msg) })?; + let mut stages_dependency: HashMap> = HashMap::new(); // save stages into state - for shuffle_writer in stages { + for shuffle_writer in stages.iter() { + let stage_id = shuffle_writer.stage_id(); + let stage_plan: Arc = shuffle_writer.clone(); self.state - .save_stage_plan( - job_id, - shuffle_writer.stage_id(), - shuffle_writer.clone(), - ) + .save_stage_plan(job_id, stage_id, stage_plan.clone()) .await .map_err(|e| { let msg = format!("Could not save stage plan: {}", e); error!("{}", msg); BallistaError::General(msg) })?; - let num_partitions = shuffle_writer.output_partitioning().partition_count(); - for partition_id in 0..num_partitions { - let pending_status = TaskStatus { - task_id: Some(PartitionId { - job_id: job_id.to_owned(), - stage_id: shuffle_writer.stage_id() as u32, - partition_id: 
partition_id as u32, - }), - status: None, - }; - self.state - .save_task_status(&pending_status) - .await - .map_err(|e| { - let msg = format!("Could not save task status: {}", e); - error!("{}", msg); - BallistaError::General(msg) - })?; + + for child in find_unresolved_shuffles(&stage_plan)? { + stages_dependency + .entry(child.stage_id as u32) + .or_insert_with(HashSet::new) + .insert(stage_id as u32); + } + } + + self.state + .stage_manager + .add_stages_dependency(job_id, stages_dependency); + + let final_stage_id = stages.last().as_ref().unwrap().stage_id(); + self.state + .stage_manager + .add_final_stage(job_id, final_stage_id as u32); + self.submit_stage(job_id, final_stage_id).await?; + + Ok(()) + } + + async fn submit_pending_stages(&self, job_id: &str, stage_id: usize) -> Result<()> { + if let Some(parent_stages) = self + .state + .stage_manager + .get_parent_stages(job_id, stage_id as u32) + { + self.state + .stage_manager + .remove_pending_stage(job_id, &parent_stages); + for parent_stage in parent_stages { + self.submit_stage(job_id, parent_stage as usize).await?; } } Ok(()) } + + #[async_recursion] + async fn submit_stage(&self, job_id: &str, stage_id: usize) -> Result<()> { + { + if self + .state + .stage_manager + .is_running_stage(job_id, stage_id as u32) + { + debug!("stage {}/{} has already been submitted", job_id, stage_id); + return Ok(()); + } + if self + .state + .stage_manager + .is_pending_stage(job_id, stage_id as u32) + { + debug!( + "stage {}/{} has already been added to the pending list", + job_id, stage_id + ); + return Ok(()); + } + } + if let Some(stage_plan) = self.state.get_stage_plan(job_id, stage_id) { + if let Some(incomplete_unresolved_shuffles) = self + .try_resolve_stage(job_id, stage_id, stage_plan.clone()) + .await? + { + assert!( + !incomplete_unresolved_shuffles.is_empty(), + "there are no incomplete unresolved shuffles" + ); + for incomplete_unresolved_shuffle in incomplete_unresolved_shuffles { + self.submit_stage(job_id, incomplete_unresolved_shuffle.stage_id) + .await?; + } + self.state + .stage_manager + .add_pending_stage(job_id, stage_id as u32); + } else { + self.state.stage_manager.add_running_stage( + job_id, + stage_id as u32, + stage_plan.output_partitioning().partition_count() as u32, + ); + } + } else { + return Err(BallistaError::General(format!( + "Fail to find stage plan for {}/{}", + job_id, stage_id + ))); + } + Ok(()) + } + + /// Try to resolve a stage if all of the unresolved shuffles are completed. 
+ /// Return the unresolved shuffles which are incomplete + async fn try_resolve_stage( + &self, + job_id: &str, + stage_id: usize, + stage_plan: Arc, + ) -> Result>> { + // Find all of the unresolved shuffles + let unresolved_shuffles = find_unresolved_shuffles(&stage_plan)?; + + // If no dependent shuffles + if unresolved_shuffles.is_empty() { + return Ok(None); + } + + // Find all of the incomplete unresolved shuffles + let (incomplete_unresolved_shuffles, unresolved_shuffles): ( + Vec, + Vec, + ) = unresolved_shuffles.into_iter().partition(|s| { + !self + .state + .stage_manager + .is_completed_stage(job_id, s.stage_id as u32) + }); + + if !incomplete_unresolved_shuffles.is_empty() { + return Ok(Some(incomplete_unresolved_shuffles)); + } + + // All of the unresolved shuffles are completed, update the stage plan + { + let mut partition_locations: HashMap< + usize, // input stage id + HashMap< + usize, // task id of this stage + Vec, // shuffle partitions + >, + > = HashMap::new(); + for unresolved_shuffle in unresolved_shuffles.iter() { + let input_stage_id = unresolved_shuffle.stage_id; + let stage_shuffle_partition_locations = partition_locations + .entry(input_stage_id) + .or_insert_with(HashMap::new); + if let Some(input_stage_tasks) = self + .state + .stage_manager + .get_stage_tasks(job_id, input_stage_id as u32) + { + // each input partition can produce multiple output partitions + for (shuffle_input_partition_id, task_status) in + input_stage_tasks.iter().enumerate() + { + match &task_status.status { + Some(task_status::Status::Completed(CompletedTask { + executor_id, + partitions, + })) => { + debug!( + "Task for unresolved shuffle input partition {} completed and produced these shuffle partitions:\n\t{}", + shuffle_input_partition_id, + partitions.iter().map(|p| format!("{}={}", p.partition_id, &p.path)).collect::>().join("\n\t") + ); + + for shuffle_write_partition in partitions { + let temp = stage_shuffle_partition_locations + .entry( + shuffle_write_partition.partition_id as usize, + ) + .or_insert(Vec::new()); + let executor_meta = self + .state + .get_executor_metadata(executor_id) + .ok_or_else(|| { + BallistaError::General(format!( + "Fail to find executor metadata for {}", + &executor_id + )) + })?; + let partition_location = + ballista_core::serde::scheduler::PartitionLocation { + partition_id: + ballista_core::serde::scheduler::PartitionId { + job_id: job_id.to_owned(), + stage_id: unresolved_shuffle.stage_id, + partition_id: shuffle_write_partition + .partition_id + as usize, + }, + executor_meta, + partition_stats: PartitionStats::new( + Some(shuffle_write_partition.num_rows), + Some(shuffle_write_partition.num_batches), + Some(shuffle_write_partition.num_bytes), + ), + path: shuffle_write_partition.path.clone(), + }; + debug!( + "Scheduler storing stage {} output partition {} path: {}", + unresolved_shuffle.stage_id, + partition_location.partition_id.partition_id, + partition_location.path + ); + temp.push(partition_location); + } + } + _ => { + debug!( + "Stage {} input partition {} has not completed yet", + unresolved_shuffle.stage_id, + shuffle_input_partition_id + ); + // TODO task error handling + } + } + } + } else { + return Err(BallistaError::General(format!( + "Fail to find completed stage for {}/{}", + job_id, stage_id + ))); + } + } + + let plan = + remove_unresolved_shuffles(stage_plan.as_ref(), &partition_locations)?; + self.state.save_stage_plan(job_id, stage_id, plan).await?; + } + + Ok(None) + } } #[async_trait] @@ -163,32 +330,148 @@ impl match 
event { QueryStageSchedulerEvent::JobSubmitted(job_id, plan) => { info!("Job {} submitted", job_id); - let plan = self.create_physical_plan(plan).await?; - if let Err(e) = self - .state - .save_job_metadata( - &job_id, - &JobStatus { - status: Some(job_status::Status::Running(RunningJob {})), - }, - ) - .await - { - warn!("Could not update job {} status to running: {}", job_id, e); + match self.generate_stages(&job_id, plan).await { + Err(e) => { + let msg = format!("Job {} failed due to {}", job_id, e); + warn!("{}", msg); + self.state + .save_job_metadata( + &job_id, + &JobStatus { + status: Some(job_status::Status::Failed(FailedJob { + error: msg.to_string(), + })), + }, + ) + .await?; + } + Ok(()) => { + if let Err(e) = self + .state + .save_job_metadata( + &job_id, + &JobStatus { + status: Some(job_status::Status::Running( + RunningJob {}, + )), + }, + ) + .await + { + warn!( + "Could not update job {} status to running: {}", + job_id, e + ); + } + } } - self.generate_stages(&job_id, plan).await?; - - if let Some(event_sender) = self.event_sender.as_ref() { - // Send job_id to the scheduler channel - event_sender - .post_event(SchedulerServerEvent::JobSubmitted(job_id)) - .await?; + } + QueryStageSchedulerEvent::StageFinished(job_id, stage_id) => { + info!("Job stage {}/{} finished", job_id, stage_id); + self.submit_pending_stages(&job_id, stage_id as usize) + .await?; + } + QueryStageSchedulerEvent::JobFinished(job_id) => { + info!("Job {} finished", job_id); + let tasks_for_complete_final_stage = self + .state + .stage_manager + .get_tasks_for_complete_final_stage(&job_id)?; + let executors: HashMap = self + .state + .get_executors_metadata() + .await? + .into_iter() + .map(|(meta, _)| (meta.id.to_string(), meta)) + .collect(); + let job_status = get_job_status_from_tasks( + &tasks_for_complete_final_stage, + &executors, + ); + self.state.save_job_metadata(&job_id, &job_status).await?; + } + QueryStageSchedulerEvent::JobFailed(job_id, stage_id, fail_message) => { + error!( + "Job stage {}/{} failed due to {}", + &job_id, stage_id, fail_message + ); + let job_status = JobStatus { + status: Some(job_status::Status::Failed(FailedJob { + error: fail_message, + })), }; + self.state.save_job_metadata(&job_id, &job_status).await?; } } + + if let Some(event_sender) = self.event_sender.as_ref() { + // The stage event must triggerred with releasing some resources. 
Therefore, revive offers for the scheduler + event_sender + .post_event(SchedulerServerEvent::ReviveOffers(1)) + .await?; + }; Ok(None) } // TODO fn on_error(&self, _error: BallistaError) {} } + +fn get_job_status_from_tasks( + tasks: &[Arc], + executors: &HashMap, +) -> JobStatus { + let mut job_status = tasks + .iter() + .map(|task| match &task.status { + Some(task_status::Status::Completed(CompletedTask { + executor_id, + partitions, + })) => Ok((task, executor_id, partitions)), + _ => Err(BallistaError::General("Task not completed".to_string())), + }) + .collect::>>() + .ok() + .map(|info| { + let mut partition_location = vec![]; + for (status, executor_id, partitions) in info { + let input_partition_id = status.task_id.as_ref().unwrap(); //TODO unwrap + let executor_meta = executors.get(executor_id).map(|e| e.clone().into()); + for shuffle_write_partition in partitions { + let shuffle_input_partition_id = Some(protobuf::PartitionId { + job_id: input_partition_id.job_id.clone(), + stage_id: input_partition_id.stage_id, + partition_id: input_partition_id.partition_id, + }); + partition_location.push(protobuf::PartitionLocation { + partition_id: shuffle_input_partition_id.clone(), + executor_meta: executor_meta.clone(), + partition_stats: Some(protobuf::PartitionStats { + num_batches: shuffle_write_partition.num_batches as i64, + num_rows: shuffle_write_partition.num_rows as i64, + num_bytes: shuffle_write_partition.num_bytes as i64, + column_stats: vec![], + }), + path: shuffle_write_partition.path.clone(), + }); + } + } + job_status::Status::Completed(CompletedJob { partition_location }) + }); + + if job_status.is_none() { + // Update other statuses + for task in tasks.iter() { + if let Some(task_status::Status::Failed(FailedTask { error })) = &task.status + { + let error = error.clone(); + job_status = Some(job_status::Status::Failed(FailedJob { error })); + break; + } + } + } + + JobStatus { + status: Some(job_status.unwrap_or(job_status::Status::Running(RunningJob {}))), + } +} diff --git a/ballista/rust/scheduler/src/state/in_memory_state.rs b/ballista/rust/scheduler/src/state/executor_manager.rs similarity index 68% rename from ballista/rust/scheduler/src/state/in_memory_state.rs rename to ballista/rust/scheduler/src/state/executor_manager.rs index 666d2294011f..40821beabb51 100644 --- a/ballista/rust/scheduler/src/state/in_memory_state.rs +++ b/ballista/rust/scheduler/src/state/executor_manager.rs @@ -15,32 +15,26 @@ // specific language governing permissions and limitations // under the License. 
-use ballista_core::serde::protobuf::{ExecutorHeartbeat, TaskStatus}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use ballista_core::serde::protobuf::ExecutorHeartbeat; use ballista_core::serde::scheduler::{ExecutorData, ExecutorDataChange}; use log::{error, info, warn}; use parking_lot::RwLock; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -type JobTasks = HashMap>; #[derive(Clone)] -pub(crate) struct InMemorySchedulerState { +pub(crate) struct ExecutorManager { executors_heartbeat: Arc>>, executors_data: Arc>>, - - // job -> stage -> partition - tasks: Arc>>, } -/// For in-memory state, we don't use async to provide related services -impl InMemorySchedulerState { +impl ExecutorManager { pub(crate) fn new() -> Self { Self { executors_heartbeat: Arc::new(RwLock::new(HashMap::new())), executors_data: Arc::new(RwLock::new(HashMap::new())), - tasks: Arc::new(RwLock::new(HashMap::new())), } } @@ -71,6 +65,7 @@ impl InMemorySchedulerState { .collect() } + #[allow(dead_code)] fn get_alive_executors_within_one_minute(&self) -> HashSet { let now_epoch_ts = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -121,6 +116,8 @@ impl InMemorySchedulerState { /// There are two checks: /// 1. firstly alive /// 2. secondly available task slots > 0 + #[cfg(not(test))] + #[allow(dead_code)] pub(crate) fn get_available_executors_data(&self) -> Vec { let mut res = { let alive_executors = self.get_alive_executors_within_one_minute(); @@ -137,72 +134,12 @@ impl InMemorySchedulerState { res } - pub(crate) fn save_task_status(&self, status: &TaskStatus) { - let task_id = status.task_id.as_ref().unwrap(); - let mut tasks = self.tasks.write(); - let job_tasks = tasks - .entry(task_id.job_id.clone()) - .or_insert_with(HashMap::new); - let stage_tasks = job_tasks - .entry(task_id.stage_id) - .or_insert_with(HashMap::new); - stage_tasks.insert(task_id.partition_id, status.clone()); - } - - pub(crate) fn _get_task( - &self, - job_id: &str, - stage_id: usize, - partition_id: usize, - ) -> Option { - let tasks = self.tasks.read(); - let job_tasks = tasks.get(job_id); - if let Some(job_tasks) = job_tasks { - let stage_id = stage_id as u32; - let stage_tasks = job_tasks.get(&stage_id); - if let Some(stage_tasks) = stage_tasks { - let partition_id = partition_id as u32; - stage_tasks.get(&partition_id).cloned() - } else { - None - } - } else { - None - } - } - - pub(crate) fn get_job_tasks(&self, job_id: &str) -> Option> { - let tasks = self.tasks.read(); - let job_tasks = tasks.get(job_id); - - if let Some(job_tasks) = job_tasks { - let mut res = vec![]; - fill_job_tasks(&mut res, job_tasks); - Some(res) - } else { - None - } - } - - pub(crate) fn get_tasks(&self) -> Vec { - let mut res = vec![]; - - let tasks = self.tasks.read(); - for (_job_id, job_tasks) in tasks.iter() { - fill_job_tasks(&mut res, job_tasks); - } - + #[cfg(test)] + #[allow(dead_code)] + pub(crate) fn get_available_executors_data(&self) -> Vec { + let mut res: Vec = + self.executors_data.read().values().cloned().collect(); + res.sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots)); res } } - -fn fill_job_tasks( - res: &mut Vec, - job_tasks: &HashMap>, -) { - for stage_tasks in job_tasks.values() { - for task_status in stage_tasks.values() { - res.push(task_status.clone()); - } - } -} diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index b673f7a9ea8a..4d8d4a04f7b4 100644 --- 
a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -21,94 +21,30 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use datafusion::physical_plan::ExecutionPlan; -use log::{debug, error, info, warn}; -use tokio::sync::mpsc; +use ballista_core::error::Result; -use ballista_core::error::{BallistaError, Result}; -use ballista_core::execution_plans::UnresolvedShuffleExec; - -use ballista_core::serde::protobuf::{ - job_status, task_status, CompletedJob, CompletedTask, ExecutorHeartbeat, FailedJob, - FailedTask, JobStatus, RunningJob, RunningTask, TaskStatus, -}; -use ballista_core::serde::scheduler::{ - ExecutorData, ExecutorDataChange, ExecutorMetadata, PartitionId, PartitionStats, -}; -use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec}; +use ballista_core::serde::protobuf::{ExecutorHeartbeat, JobStatus}; +use ballista_core::serde::scheduler::ExecutorMetadata; +use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec}; use datafusion::prelude::SessionContext; -use super::planner::remove_unresolved_shuffles; - use crate::state::backend::StateBackendClient; -use crate::state::in_memory_state::InMemorySchedulerState; +use crate::state::executor_manager::ExecutorManager; use crate::state::persistent_state::PersistentSchedulerState; +use crate::state::stage_manager::StageManager; pub mod backend; -mod in_memory_state; +mod executor_manager; mod persistent_state; +mod stage_manager; pub mod task_scheduler; -#[derive(Clone)] -struct SchedulerStateWatcher { - tx_task: mpsc::Sender, -} - -impl SchedulerStateWatcher { - async fn watch(&self, task_status: TaskStatus) -> Result<()> { - self.tx_task.send(task_status).await.map_err(|e| { - BallistaError::Internal(format!( - "Fail to send task status event to channel due to {:?}", - e - )) - })?; - - Ok(()) - } - - fn synchronize_job_status_loop< - T: 'static + AsLogicalPlan, - U: 'static + AsExecutionPlan, - >( - &self, - scheduler_state: SchedulerState, - mut rx_task: mpsc::Receiver, - ) { - tokio::spawn(async move { - info!("Starting the scheduler state watcher"); - loop { - if let Some(task_status) = rx_task.recv().await { - debug!("Watch on task status {:?}", task_status); - if let Some(task_id) = task_status.task_id { - scheduler_state - .synchronize_job_status(&task_id.job_id) - .await - .unwrap_or_else(|e| { - error!( - "Fail to synchronize the status for job {:?} due to {:?}", - task_id.job_id, e - ); - }); - } else { - warn!( - "There's no PartitionId in the task status {:?}", - task_status - ); - } - } else { - info!("Channel is closed and will exit the loop"); - return; - }; - } - }); - } -} - #[derive(Clone)] pub(super) struct SchedulerState { persistent_state: PersistentSchedulerState, - in_memory_state: InMemorySchedulerState, - listener: SchedulerStateWatcher, + pub executor_manager: ExecutorManager, + pub stage_manager: StageManager, } impl SchedulerState { @@ -117,21 +53,15 @@ impl SchedulerState, ) -> Self { - // TODO Make the buffer size configurable - let (tx_task, rx_task) = mpsc::channel::(1000); - let ret = Self { + Self { persistent_state: PersistentSchedulerState::new( config_client, namespace, codec, ), - in_memory_state: InMemorySchedulerState::new(), - listener: SchedulerStateWatcher { tx_task }, - }; - ret.listener - .synchronize_job_status_loop(ret.clone(), rx_task); - - ret + executor_manager: ExecutorManager::new(), + stage_manager: StageManager::new(), + } } pub async fn init(&self, ctx: &SessionContext) -> Result<()> { @@ -150,7 
+80,7 @@ impl SchedulerState SchedulerState Result> { - let mut result = vec![]; - - let now_epoch_ts = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time went backwards"); - let last_seen_ts_threshold = now_epoch_ts - .checked_sub(last_seen_threshold) - .unwrap_or_else(|| Duration::from_secs(0)); - let alive_executors = self - .in_memory_state - .get_alive_executors(last_seen_ts_threshold.as_secs()); - for executor_id in alive_executors { - let meta = self.get_executor_metadata(&executor_id); - if meta.is_none() { - return Err(BallistaError::General(format!( - "No executor metadata found for {}", - executor_id - ))); - } - result.push(meta.unwrap()); - } - - Ok(result) - } - pub fn get_executor_metadata(&self, executor_id: &str) -> Option { self.persistent_state.get_executor_metadata(executor_id) } @@ -219,27 +120,6 @@ impl SchedulerState Vec { - self.in_memory_state.get_available_executors_data() - } - - pub fn get_executor_data(&self, executor_id: &str) -> Option { - self.in_memory_state.get_executor_data(executor_id) - } - pub async fn save_job_metadata( &self, job_id: &str, @@ -272,375 +152,6 @@ impl SchedulerState Option> { self.persistent_state.get_stage_plan(job_id, stage_id) } - - pub async fn save_task_status(&self, status: &TaskStatus) -> Result<()> { - self.in_memory_state.save_task_status(status); - self.listener.watch(status.clone()).await?; - - Ok(()) - } - - pub fn _get_task_status( - &self, - job_id: &str, - stage_id: usize, - partition_id: usize, - ) -> Option { - self.in_memory_state - ._get_task(job_id, stage_id, partition_id) - } - - pub fn get_job_tasks(&self, job_id: &str) -> Option> { - self.in_memory_state.get_job_tasks(job_id) - } - - pub fn get_all_tasks(&self) -> Vec { - self.in_memory_state.get_tasks() - } - - /// This function ensures that the task wasn't assigned to an executor that died. - /// If that is the case, then the task is re-scheduled. - /// Returns true if the task was dead, false otherwise. - async fn reschedule_dead_task( - &self, - task_status: &TaskStatus, - executors: &[ExecutorMetadata], - ) -> Result { - let executor_id: &str = match &task_status.status { - Some(task_status::Status::Completed(CompletedTask { - executor_id, .. - })) => executor_id, - Some(task_status::Status::Running(RunningTask { executor_id })) => { - executor_id - } - _ => return Ok(false), - }; - let executor_meta = executors.iter().find(|exec| exec.id == executor_id); - let task_is_dead = executor_meta.is_none(); - if task_is_dead { - info!( - "Executor {} isn't alive. 
Rescheduling task {:?}", - executor_id, - task_status.task_id.as_ref().unwrap() - ); - // Task was handled in an executor that isn't alive anymore, so we can't resolve it - // We mark the task as pending again and continue - let mut task_status = task_status.clone(); - task_status.status = None; - self.save_task_status(&task_status).await?; - } - Ok(task_is_dead) - } - - pub async fn assign_next_schedulable_task( - &self, - executor_id: &str, - ) -> Result)>> { - let tasks = self.get_all_tasks(); - self.assign_next_schedulable_task_inner(executor_id, tasks) - .await - } - - pub async fn assign_next_schedulable_job_task( - &self, - executor_id: &str, - job_id: &str, - ) -> Result)>> { - let job_tasks = self.get_job_tasks(job_id); - if job_tasks.is_some() { - self.assign_next_schedulable_task_inner(executor_id, job_tasks.unwrap()) - .await - } else { - Ok(None) - } - } - - async fn assign_next_schedulable_task_inner( - &self, - executor_id: &str, - tasks: Vec, - ) -> Result)>> { - match self.get_next_schedulable_task(tasks).await? { - Some((status, plan)) => { - let mut status = status.clone(); - status.status = Some(task_status::Status::Running(RunningTask { - executor_id: executor_id.to_owned(), - })); - self.save_task_status(&status).await?; - Ok(Some((status, plan))) - } - _ => Ok(None), - } - } - - async fn get_next_schedulable_task( - &self, - tasks: Vec, - ) -> Result)>> { - let tasks = tasks - .into_iter() - .map(|task| { - let task_id = task.task_id.as_ref().unwrap(); - ( - PartitionId::new( - &task_id.job_id, - task_id.stage_id as usize, - task_id.partition_id as usize, - ), - task, - ) - }) - .collect::>(); - // TODO: Make the duration a configurable parameter - let executors = self - .get_alive_executors_metadata(Duration::from_secs(60)) - .await?; - 'tasks: for (_key, status) in tasks.iter() { - if status.status.is_none() { - let task_id = status.task_id.as_ref().unwrap(); - let plan = self - .get_stage_plan(&task_id.job_id, task_id.stage_id as usize) - .unwrap(); - - // Let's try to resolve any unresolved shuffles we find - let unresolved_shuffles = find_unresolved_shuffles(&plan)?; - let mut partition_locations: HashMap< - usize, // stage id - HashMap< - usize, // shuffle output partition id - Vec, // shuffle partitions - >, - > = HashMap::new(); - for unresolved_shuffle in unresolved_shuffles { - // we schedule one task per *input* partition and each input partition - // can produce multiple output partitions - for shuffle_input_partition_id in - 0..unresolved_shuffle.input_partition_count - { - let partition_id = PartitionId { - job_id: task_id.job_id.clone(), - stage_id: unresolved_shuffle.stage_id, - partition_id: shuffle_input_partition_id, - }; - let referenced_task = tasks.get(&partition_id).unwrap(); - let task_is_dead = self - .reschedule_dead_task(referenced_task, &executors) - .await?; - if task_is_dead { - continue 'tasks; - } - match &referenced_task.status { - Some(task_status::Status::Completed(CompletedTask { - executor_id, - partitions, - })) => { - debug!("Task for unresolved shuffle input partition {} completed and produced these shuffle partitions:\n\t{}", - shuffle_input_partition_id, - partitions.iter().map(|p| format!("{}={}", p.partition_id, &p.path)).collect::>().join("\n\t") - ); - let stage_shuffle_partition_locations = - partition_locations - .entry(unresolved_shuffle.stage_id) - .or_insert_with(HashMap::new); - let executor_meta = executors - .iter() - .find(|exec| exec.id == *executor_id) - .unwrap() - .clone(); - - for shuffle_write_partition in 
partitions { - let temp = stage_shuffle_partition_locations - .entry( - shuffle_write_partition.partition_id as usize, - ) - .or_insert_with(Vec::new); - let executor_meta = executor_meta.clone(); - let partition_location = - ballista_core::serde::scheduler::PartitionLocation { - partition_id: - ballista_core::serde::scheduler::PartitionId { - job_id: task_id.job_id.clone(), - stage_id: unresolved_shuffle.stage_id, - partition_id: shuffle_write_partition - .partition_id - as usize, - }, - executor_meta, - partition_stats: PartitionStats::new( - Some(shuffle_write_partition.num_rows), - Some(shuffle_write_partition.num_batches), - Some(shuffle_write_partition.num_bytes), - ), - path: shuffle_write_partition.path.clone(), - }; - debug!( - "Scheduler storing stage {} output partition {} path: {}", - unresolved_shuffle.stage_id, - partition_location.partition_id.partition_id, - partition_location.path - ); - temp.push(partition_location); - } - } - Some(task_status::Status::Failed(FailedTask { error })) => { - // A task should fail when its referenced_task fails - let mut status = status.clone(); - let err_msg = error.to_string(); - status.status = - Some(task_status::Status::Failed(FailedTask { - error: err_msg, - })); - self.save_task_status(&status).await?; - continue 'tasks; - } - _ => { - debug!( - "Stage {} input partition {} has not completed yet", - unresolved_shuffle.stage_id, - shuffle_input_partition_id, - ); - continue 'tasks; - } - }; - } - } - - let plan = - remove_unresolved_shuffles(plan.as_ref(), &partition_locations)?; - - // If we get here, there are no more unresolved shuffled and the task can be run - return Ok(Some((status.clone(), plan))); - } - } - Ok(None) - } - - async fn synchronize_job_status(&self, job_id: &str) -> Result<()> { - let executors: HashMap = self - .get_executors_metadata() - .await? 
- .into_iter() - .map(|(meta, _)| (meta.id.to_string(), meta)) - .collect(); - let status: JobStatus = self.persistent_state.get_job_metadata(job_id).unwrap(); - let new_status = self.get_job_status_from_tasks(job_id, &executors).await?; - if let Some(new_status) = new_status { - if status != new_status { - info!( - "Changing status for job {} to {:?}", - job_id, new_status.status - ); - debug!("Old status: {:?}", status); - debug!("New status: {:?}", new_status); - self.save_job_metadata(job_id, &new_status).await?; - } - } - Ok(()) - } - - async fn get_job_status_from_tasks( - &self, - job_id: &str, - executors: &HashMap, - ) -> Result> { - let statuses = self.in_memory_state.get_job_tasks(job_id); - if statuses.is_none() { - return Ok(None); - } - let statuses = statuses.unwrap(); - if statuses.is_empty() { - return Ok(None); - } - - // Check for job completion - let last_stage = statuses - .iter() - .map(|task| task.task_id.as_ref().unwrap().stage_id) - .max() - .unwrap(); - let statuses: Vec<_> = statuses - .into_iter() - .filter(|task| task.task_id.as_ref().unwrap().stage_id == last_stage) - .collect(); - let mut job_status = statuses - .iter() - .map(|status| match &status.status { - Some(task_status::Status::Completed(CompletedTask { - executor_id, - partitions, - })) => Ok((status, executor_id, partitions)), - _ => Err(BallistaError::General("Task not completed".to_string())), - }) - .collect::>>() - .ok() - .map(|info| { - let mut partition_location = vec![]; - for (status, executor_id, partitions) in info { - let input_partition_id = status.task_id.as_ref().unwrap(); //TODO unwrap - let executor_meta = - executors.get(executor_id).map(|e| e.clone().into()); - for shuffle_write_partition in partitions { - let shuffle_input_partition_id = Some(protobuf::PartitionId { - job_id: input_partition_id.job_id.clone(), - stage_id: input_partition_id.stage_id, - partition_id: input_partition_id.partition_id, - }); - partition_location.push(protobuf::PartitionLocation { - partition_id: shuffle_input_partition_id.clone(), - executor_meta: executor_meta.clone(), - partition_stats: Some(protobuf::PartitionStats { - num_batches: shuffle_write_partition.num_batches as i64, - num_rows: shuffle_write_partition.num_rows as i64, - num_bytes: shuffle_write_partition.num_bytes as i64, - column_stats: vec![], - }), - path: shuffle_write_partition.path.clone(), - }); - } - } - job_status::Status::Completed(CompletedJob { partition_location }) - }); - - if job_status.is_none() { - // Update other statuses - for status in statuses { - match status.status { - Some(task_status::Status::Failed(FailedTask { error })) => { - job_status = - Some(job_status::Status::Failed(FailedJob { error })); - break; - } - Some(task_status::Status::Running(_)) if job_status == None => { - job_status = Some(job_status::Status::Running(RunningJob {})); - } - _ => (), - } - } - } - Ok(job_status.map(|status| JobStatus { - status: Some(status), - })) - } -} - -/// Returns the unresolved shuffles in the execution plan -fn find_unresolved_shuffles( - plan: &Arc, -) -> Result> { - if let Some(unresolved_shuffle) = - plan.as_any().downcast_ref::() - { - Ok(vec![unresolved_shuffle.clone()]) - } else { - Ok(plan - .children() - .iter() - .map(find_unresolved_shuffles) - .collect::>>()? 
- .into_iter() - .flatten() - .collect()) - } } #[cfg(all(test, feature = "sled"))] @@ -649,8 +160,7 @@ mod test { use ballista_core::error::BallistaError; use ballista_core::serde::protobuf::{ - job_status, task_status, CompletedTask, FailedTask, JobStatus, LogicalPlanNode, - PartitionId, PhysicalPlanNode, QueuedJob, RunningJob, RunningTask, TaskStatus, + job_status, JobStatus, LogicalPlanNode, PhysicalPlanNode, QueuedJob, }; use ballista_core::serde::scheduler::{ExecutorMetadata, ExecutorSpecification}; use ballista_core::serde::BallistaCodec; @@ -720,309 +230,4 @@ mod test { assert!(result.is_none()); Ok(()) } - - #[tokio::test] - async fn task_status() -> Result<(), BallistaError> { - let state: SchedulerState = - SchedulerState::new( - Arc::new(StandaloneClient::try_new_temporary()?), - "test".to_string(), - BallistaCodec::default(), - ); - let meta = TaskStatus { - status: Some(task_status::Status::Failed(FailedTask { - error: "error".to_owned(), - })), - task_id: Some(PartitionId { - job_id: "job".to_owned(), - stage_id: 1, - partition_id: 2, - }), - }; - state.save_task_status(&meta).await?; - let result = state._get_task_status("job", 1, 2); - assert!(result.is_some()); - match result.unwrap().status.unwrap() { - task_status::Status::Failed(_) => (), - _ => panic!("Unexpected status"), - } - Ok(()) - } - - #[tokio::test] - async fn task_status_non_existant() -> Result<(), BallistaError> { - let state: SchedulerState = - SchedulerState::new( - Arc::new(StandaloneClient::try_new_temporary()?), - "test".to_string(), - BallistaCodec::default(), - ); - let meta = TaskStatus { - status: Some(task_status::Status::Failed(FailedTask { - error: "error".to_owned(), - })), - task_id: Some(PartitionId { - job_id: "job".to_owned(), - stage_id: 1, - partition_id: 2, - }), - }; - state.save_task_status(&meta).await?; - let result = state._get_task_status("job", 25, 2); - assert!(result.is_none()); - Ok(()) - } - - #[tokio::test] - async fn task_synchronize_job_status_queued() -> Result<(), BallistaError> { - let state: SchedulerState = - SchedulerState::new( - Arc::new(StandaloneClient::try_new_temporary()?), - "test".to_string(), - BallistaCodec::default(), - ); - let job_id = "job"; - let job_status = JobStatus { - status: Some(job_status::Status::Queued(QueuedJob {})), - }; - state.save_job_metadata(job_id, &job_status).await?; - // Call it explicitly to achieve fast synchronization - state.synchronize_job_status(job_id).await?; - let result = state.get_job_metadata(job_id).unwrap(); - assert_eq!(result, job_status); - Ok(()) - } - - #[tokio::test] - async fn task_synchronize_job_status_running() -> Result<(), BallistaError> { - let state: SchedulerState = - SchedulerState::new( - Arc::new(StandaloneClient::try_new_temporary()?), - "test".to_string(), - BallistaCodec::default(), - ); - let job_id = "job"; - let job_status = JobStatus { - status: Some(job_status::Status::Running(RunningJob {})), - }; - state.save_job_metadata(job_id, &job_status).await?; - let meta = TaskStatus { - status: Some(task_status::Status::Completed(CompletedTask { - executor_id: "".to_owned(), - partitions: vec![], - })), - task_id: Some(PartitionId { - job_id: job_id.to_owned(), - stage_id: 0, - partition_id: 0, - }), - }; - state.save_task_status(&meta).await?; - let meta = TaskStatus { - status: Some(task_status::Status::Running(RunningTask { - executor_id: "".to_owned(), - })), - task_id: Some(PartitionId { - job_id: job_id.to_owned(), - stage_id: 0, - partition_id: 1, - }), - }; - 
state.save_task_status(&meta).await?; - // Call it explicitly to achieve fast synchronization - state.synchronize_job_status(job_id).await?; - let result = state.get_job_metadata(job_id).unwrap(); - assert_eq!(result, job_status); - Ok(()) - } - - #[tokio::test] - async fn task_synchronize_job_status_running2() -> Result<(), BallistaError> { - let state: SchedulerState = - SchedulerState::new( - Arc::new(StandaloneClient::try_new_temporary()?), - "test".to_string(), - BallistaCodec::default(), - ); - let job_id = "job"; - let job_status = JobStatus { - status: Some(job_status::Status::Running(RunningJob {})), - }; - state.save_job_metadata(job_id, &job_status).await?; - let meta = TaskStatus { - status: Some(task_status::Status::Completed(CompletedTask { - executor_id: "".to_owned(), - partitions: vec![], - })), - task_id: Some(PartitionId { - job_id: job_id.to_owned(), - stage_id: 0, - partition_id: 0, - }), - }; - state.save_task_status(&meta).await?; - let meta = TaskStatus { - status: None, - task_id: Some(PartitionId { - job_id: job_id.to_owned(), - stage_id: 0, - partition_id: 1, - }), - }; - state.save_task_status(&meta).await?; - // Call it explicitly to achieve fast synchronization - state.synchronize_job_status(job_id).await?; - let result = state.get_job_metadata(job_id).unwrap(); - assert_eq!(result, job_status); - Ok(()) - } - - #[tokio::test] - async fn task_synchronize_job_status_completed() -> Result<(), BallistaError> { - let state: SchedulerState = - SchedulerState::new( - Arc::new(StandaloneClient::try_new_temporary()?), - "test".to_string(), - BallistaCodec::default(), - ); - let job_id = "job"; - let job_status = JobStatus { - status: Some(job_status::Status::Running(RunningJob {})), - }; - state.save_job_metadata(job_id, &job_status).await?; - let meta = TaskStatus { - status: Some(task_status::Status::Completed(CompletedTask { - executor_id: "".to_owned(), - partitions: vec![], - })), - task_id: Some(PartitionId { - job_id: job_id.to_owned(), - stage_id: 0, - partition_id: 0, - }), - }; - state.save_task_status(&meta).await?; - let meta = TaskStatus { - status: Some(task_status::Status::Completed(CompletedTask { - executor_id: "".to_owned(), - partitions: vec![], - })), - task_id: Some(PartitionId { - job_id: job_id.to_owned(), - stage_id: 0, - partition_id: 1, - }), - }; - state.save_task_status(&meta).await?; - // Call it explicitly to achieve fast synchronization - state.synchronize_job_status(job_id).await?; - let result = state.get_job_metadata(job_id).unwrap(); - match result.status.unwrap() { - job_status::Status::Completed(_) => (), - status => panic!("Received status: {:?}", status), - } - Ok(()) - } - - #[tokio::test] - async fn task_synchronize_job_status_completed2() -> Result<(), BallistaError> { - let state: SchedulerState = - SchedulerState::new( - Arc::new(StandaloneClient::try_new_temporary()?), - "test".to_string(), - BallistaCodec::default(), - ); - let job_id = "job"; - let job_status = JobStatus { - status: Some(job_status::Status::Queued(QueuedJob {})), - }; - state.save_job_metadata(job_id, &job_status).await?; - let meta = TaskStatus { - status: Some(task_status::Status::Completed(CompletedTask { - executor_id: "".to_owned(), - partitions: vec![], - })), - task_id: Some(PartitionId { - job_id: job_id.to_owned(), - stage_id: 0, - partition_id: 0, - }), - }; - state.save_task_status(&meta).await?; - let meta = TaskStatus { - status: Some(task_status::Status::Completed(CompletedTask { - executor_id: "".to_owned(), - partitions: vec![], - })), - 
task_id: Some(PartitionId { - job_id: job_id.to_owned(), - stage_id: 0, - partition_id: 1, - }), - }; - state.save_task_status(&meta).await?; - // Call it explicitly to achieve fast synchronization - state.synchronize_job_status(job_id).await?; - let result = state.get_job_metadata(job_id).unwrap(); - match result.status.unwrap() { - job_status::Status::Completed(_) => (), - status => panic!("Received status: {:?}", status), - } - Ok(()) - } - - #[tokio::test] - async fn task_synchronize_job_status_failed() -> Result<(), BallistaError> { - let state: SchedulerState = - SchedulerState::new( - Arc::new(StandaloneClient::try_new_temporary()?), - "test".to_string(), - BallistaCodec::default(), - ); - let job_id = "job"; - let job_status = JobStatus { - status: Some(job_status::Status::Running(RunningJob {})), - }; - state.save_job_metadata(job_id, &job_status).await?; - let meta = TaskStatus { - status: Some(task_status::Status::Completed(CompletedTask { - executor_id: "".to_owned(), - partitions: vec![], - })), - task_id: Some(PartitionId { - job_id: job_id.to_owned(), - stage_id: 0, - partition_id: 0, - }), - }; - state.save_task_status(&meta).await?; - let meta = TaskStatus { - status: Some(task_status::Status::Failed(FailedTask { - error: "".to_owned(), - })), - task_id: Some(PartitionId { - job_id: job_id.to_owned(), - stage_id: 0, - partition_id: 1, - }), - }; - state.save_task_status(&meta).await?; - let meta = TaskStatus { - status: None, - task_id: Some(PartitionId { - job_id: job_id.to_owned(), - stage_id: 0, - partition_id: 2, - }), - }; - state.save_task_status(&meta).await?; - // Call it explicitly to achieve fast synchronization - state.synchronize_job_status(job_id).await?; - let result = state.get_job_metadata(job_id).unwrap(); - match result.status.unwrap() { - job_status::Status::Failed(_) => (), - status => panic!("Received status: {:?}", status), - } - Ok(()) - } } diff --git a/ballista/rust/scheduler/src/state/persistent_state.rs b/ballista/rust/scheduler/src/state/persistent_state.rs index ac4ef2f55930..3ef07e964327 100644 --- a/ballista/rust/scheduler/src/state/persistent_state.rs +++ b/ballista/rust/scheduler/src/state/persistent_state.rs @@ -27,13 +27,12 @@ use ballista_core::error::{BallistaError, Result}; use ballista_core::serde::protobuf::JobStatus; use crate::state::backend::StateBackendClient; +use crate::state::stage_manager::StageKey; use ballista_core::serde::scheduler::ExecutorMetadata; use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec}; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; -type StageKey = (String, u32); - #[derive(Clone)] pub(crate) struct PersistentSchedulerState< T: 'static + AsLogicalPlan, diff --git a/ballista/rust/scheduler/src/state/stage_manager.rs b/ballista/rust/scheduler/src/state/stage_manager.rs new file mode 100644 index 000000000000..f83b5fda9cae --- /dev/null +++ b/ballista/rust/scheduler/src/state/stage_manager.rs @@ -0,0 +1,782 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use log::{error, info, warn}; +use parking_lot::RwLock; +use rand::Rng; + +use crate::scheduler_server::event::QueryStageSchedulerEvent; +use crate::state::task_scheduler::StageScheduler; +use ballista_core::error::{BallistaError, Result}; +use ballista_core::serde::protobuf; +use ballista_core::serde::protobuf::{task_status, FailedTask, TaskStatus}; + +pub type StageKey = (String, u32); + +#[derive(Clone)] +pub struct StageManager { + stage_distribution: Arc>, + + // The final stage id for jobs + final_stages: Arc>>, + + // (job_id, stage_id) -> stage set in which each one depends on (job_id, stage_id) + stages_dependency: Arc>>>, + + // job_id -> pending stages + pending_stages: Arc>>>, +} + +impl StageManager { + pub fn new() -> Self { + Self { + stage_distribution: Arc::new(RwLock::new(StageDistribution::new())), + final_stages: Arc::new(RwLock::new(HashMap::new())), + stages_dependency: Arc::new(RwLock::new(HashMap::new())), + pending_stages: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub fn add_final_stage(&self, job_id: &str, stage_id: u32) { + let mut final_stages = self.final_stages.write(); + final_stages.insert(job_id.to_owned(), stage_id); + } + + pub fn is_final_stage(&self, job_id: &str, stage_id: u32) -> bool { + self.get_final_stage_id(job_id) + .map(|final_stage_id| final_stage_id == stage_id) + .unwrap_or(false) + } + + fn get_final_stage_id(&self, job_id: &str) -> Option { + let final_stages = self.final_stages.read(); + final_stages.get(job_id).cloned() + } + + pub fn get_tasks_for_complete_final_stage( + &self, + job_id: &str, + ) -> Result>> { + let final_stage_id = self.get_final_stage_id(job_id).ok_or_else(|| { + BallistaError::General(format!( + "Fail to find final stage id for job {}", + job_id + )) + })?; + + let stage_key = (job_id.to_owned(), final_stage_id); + let stage_distribution = self.stage_distribution.read(); + + if let Some(stage) = stage_distribution.stages_completed.get(&stage_key) { + Ok(stage.tasks.clone()) + } else { + Err(BallistaError::General(format!( + "The final stage id {} has not been completed yet", + final_stage_id + ))) + } + } + + pub fn add_pending_stage(&self, job_id: &str, stage_id: u32) { + let mut pending_stages = self.pending_stages.write(); + pending_stages + .entry(job_id.to_owned()) + .or_insert_with(HashSet::new) + .insert(stage_id); + } + + pub fn is_pending_stage(&self, job_id: &str, stage_id: u32) -> bool { + let pending_stages = self.pending_stages.read(); + if let Some(pending_stages) = pending_stages.get(job_id) { + pending_stages.contains(&stage_id) + } else { + false + } + } + + pub fn remove_pending_stage( + &self, + job_id: &str, + stages_remove: &HashSet, + ) -> bool { + let mut pending_stages = self.pending_stages.write(); + let mut is_stages_empty = false; + let ret = if let Some(stages) = pending_stages.get_mut(job_id) { + let len_before_remove = stages.len(); + for stage_id in stages_remove { + stages.remove(stage_id); + } + is_stages_empty = stages.is_empty(); + stages.len() != len_before_remove + } else { + false + }; 
+ + if is_stages_empty { + pending_stages.remove(job_id); + } + + ret + } + + pub fn add_stages_dependency( + &self, + job_id: &str, + dependencies: HashMap>, + ) { + let mut stages_dependency = self.stages_dependency.write(); + for (stage_id, parent_stages) in dependencies.into_iter() { + stages_dependency.insert((job_id.to_owned(), stage_id), parent_stages); + } + } + + pub fn get_parent_stages(&self, job_id: &str, stage_id: u32) -> Option> { + let stage_key = (job_id.to_owned(), stage_id); + let stages_dependency = self.stages_dependency.read(); + stages_dependency.get(&stage_key).cloned() + } + + pub fn add_running_stage(&self, job_id: &str, stage_id: u32, num_partitions: u32) { + let stage = Stage::new(job_id, stage_id, num_partitions); + + let mut stage_distribution = self.stage_distribution.write(); + stage_distribution + .stages_running + .insert((job_id.to_string(), stage_id), stage); + } + + pub fn is_running_stage(&self, job_id: &str, stage_id: u32) -> bool { + let stage_key = (job_id.to_owned(), stage_id); + let stage_distribution = self.stage_distribution.read(); + stage_distribution.stages_running.get(&stage_key).is_some() + } + + pub fn is_completed_stage(&self, job_id: &str, stage_id: u32) -> bool { + let stage_key = (job_id.to_owned(), stage_id); + let stage_distribution = self.stage_distribution.read(); + stage_distribution + .stages_completed + .get(&stage_key) + .is_some() + } + + pub(crate) fn get_stage_tasks( + &self, + job_id: &str, + stage_id: u32, + ) -> Option>> { + let stage_key = (job_id.to_owned(), stage_id); + let stage_distribution = self.stage_distribution.read(); + if let Some(stage) = stage_distribution.stages_running.get(&stage_key) { + Some(stage.tasks.clone()) + } else { + stage_distribution + .stages_completed + .get(&stage_key) + .map(|task| task.tasks.clone()) + } + } + + pub(crate) fn update_tasks_status( + &self, + tasks_status: Vec, + ) -> Vec { + let mut all_tasks_status: HashMap> = HashMap::new(); + for task_status in tasks_status { + if let Some(task_id) = task_status.task_id.as_ref() { + let stage_tasks_status = all_tasks_status + .entry((task_id.job_id.clone(), task_id.stage_id)) + .or_insert_with(Vec::new); + stage_tasks_status.push(task_status); + } else { + error!("There's no task id when updating status"); + } + } + + let mut ret = vec![]; + let mut stage_distribution = self.stage_distribution.write(); + for (stage_key, stage_tasks_status) in all_tasks_status.into_iter() { + if let Some(stage) = stage_distribution.stages_running.get_mut(&stage_key) { + for task_status in &stage_tasks_status { + stage.update_task_status(task_status); + } + if let Some(fail_message) = stage.get_fail_message() { + ret.push(QueryStageSchedulerEvent::JobFailed( + stage_key.0.clone(), + stage_key.1, + fail_message, + )); + } else if stage.is_completed() { + stage_distribution.complete_stage(stage_key.clone()); + if self.is_final_stage(&stage_key.0, stage_key.1) { + ret.push(QueryStageSchedulerEvent::JobFinished( + stage_key.0.clone(), + )); + } else { + ret.push(QueryStageSchedulerEvent::StageFinished( + stage_key.0.clone(), + stage_key.1, + )); + } + } + } else { + error!("Fail to find stage for {:?}/{}", &stage_key.0, stage_key.1); + } + } + + ret + } + + pub fn fetch_pending_tasks( + &self, + max_num: usize, + cond: F, + ) -> Option<(String, u32, Vec)> + where + F: Fn(&StageKey) -> bool, + { + if let Some(next_stage) = self.fetch_schedulable_stage(cond) { + if let Some(next_tasks) = + self.find_stage_pending_tasks(&next_stage.0, next_stage.1, max_num) + { + 
Some((next_stage.0.to_owned(), next_stage.1, next_tasks)) + } else { + warn!( + "Fail to find pending tasks for stage {}/{}", + next_stage.0, next_stage.1 + ); + None + } + } else { + None + } + } + + fn find_stage_pending_tasks( + &self, + job_id: &str, + stage_id: u32, + max_num: usize, + ) -> Option> { + let stage_key = (job_id.to_owned(), stage_id); + let stage_distribution = self.stage_distribution.read(); + stage_distribution + .stages_running + .get(&stage_key) + .map(|stage| stage.find_pending_tasks(max_num)) + } + + pub fn has_running_tasks(&self) -> bool { + let stage_distribution = self.stage_distribution.read(); + for stage in stage_distribution.stages_running.values() { + if !stage.get_running_tasks().is_empty() { + return true; + } + } + + false + } +} + +// TODO Currently, it will randomly choose a stage. In the future, we can add more sophisticated stage choose algorithm here, like priority, etc. +impl StageScheduler for StageManager { + fn fetch_schedulable_stage(&self, cond: F) -> Option + where + F: Fn(&StageKey) -> bool, + { + let mut rng = rand::thread_rng(); + let stage_distribution = self.stage_distribution.read(); + let stages_running = &stage_distribution.stages_running; + if stages_running.is_empty() { + info!("There's no running stages"); + return None; + } + let stages = stages_running + .iter() + .filter(|entry| entry.1.is_schedulable() && cond(entry.0)) + .map(|entry| entry.0) + .collect::>(); + if stages.is_empty() { + None + } else { + let n_th = rng.gen_range(0..stages.len()); + Some(stages[n_th].clone()) + } + } +} + +struct StageDistribution { + // The key is (job_id, stage_id) + stages_running: HashMap, + stages_completed: HashMap, +} + +impl StageDistribution { + fn new() -> Self { + Self { + stages_running: HashMap::new(), + stages_completed: HashMap::new(), + } + } + + fn complete_stage(&mut self, stage_key: StageKey) { + if let Some(stage) = self.stages_running.remove(&stage_key) { + assert!( + stage.is_completed(), + "Stage {}/{} is not completed", + stage_key.0, + stage_key.1 + ); + self.stages_completed.insert(stage_key, stage); + } else { + warn!( + "Fail to find running stage {:?}/{}", + stage_key.0, stage_key.1 + ); + } + } +} + +pub struct Stage { + pub stage_id: u32, + tasks: Vec>, + + tasks_distribution: TaskStatusDistribution, +} + +impl Stage { + fn new(job_id: &str, stage_id: u32, num_partitions: u32) -> Self { + let mut tasks = vec![]; + for partition_id in 0..num_partitions { + let pending_status = Arc::new(TaskStatus { + task_id: Some(protobuf::PartitionId { + job_id: job_id.to_owned(), + stage_id, + partition_id, + }), + status: None, + }); + + tasks.push(pending_status); + } + + Stage { + stage_id, + tasks, + tasks_distribution: TaskStatusDistribution::new(num_partitions as usize), + } + } + + // If error happens for updating some task status, just quietly print the error message + fn update_task_status(&mut self, task: &TaskStatus) { + if let Some(task_id) = &task.task_id { + let task_idx = task_id.partition_id as usize; + if task_idx < self.tasks.len() { + let existing_task_status = self.tasks[task_idx].clone(); + if self.tasks_distribution.update( + task_idx, + &existing_task_status.status, + &task.status, + ) { + self.tasks[task_idx] = Arc::new(task.clone()); + } else { + error!( + "Fail to update status from {:?} to {:?} for task: {:?}/{:?}/{:?}", &existing_task_status.status, &task.status, + &task_id.job_id, &task_id.stage_id, task_idx + ) + } + } else { + error!( + "Fail to find existing task: {:?}/{:?}/{:?}", + 
&task_id.job_id, &task_id.stage_id, task_idx + ) + } + } else { + error!("Fail to update task status due to no task id"); + } + } + + fn is_schedulable(&self) -> bool { + self.tasks_distribution.is_schedulable() + } + + fn is_completed(&self) -> bool { + self.tasks_distribution.is_completed() + } + + // If return None, means no failed tasks + fn get_fail_message(&self) -> Option { + if self.tasks_distribution.is_failed() { + let task_idx = self.tasks_distribution.sample_failed_index(); + if let Some(task) = self.tasks.get(task_idx) { + if let Some(task_status::Status::Failed(FailedTask { error })) = + &task.status + { + Some(error.clone()) + } else { + warn!("task {:?} is not failed", task); + None + } + } else { + warn!("Could not find error tasks"); + None + } + } else { + None + } + } + + pub fn find_pending_tasks(&self, max_num: usize) -> Vec { + self.tasks_distribution.find_pending_indicators(max_num) + } + + fn get_running_tasks(&self) -> Vec> { + self.tasks_distribution + .running_indicator + .indicator + .iter() + .enumerate() + .filter(|(_i, is_running)| **is_running) + .map(|(i, _is_running)| self.tasks[i].clone()) + .collect() + } +} + +#[derive(Clone)] +struct TaskStatusDistribution { + len: usize, + pending_indicator: TaskStatusIndicator, + running_indicator: TaskStatusIndicator, + failed_indicator: TaskStatusIndicator, + completed_indicator: TaskStatusIndicator, +} + +impl TaskStatusDistribution { + fn new(len: usize) -> Self { + Self { + len, + pending_indicator: TaskStatusIndicator { + indicator: (0..len).map(|_| true).collect::>(), + n_of_true: len, + }, + running_indicator: TaskStatusIndicator { + indicator: (0..len).map(|_| false).collect::>(), + n_of_true: 0, + }, + failed_indicator: TaskStatusIndicator { + indicator: (0..len).map(|_| false).collect::>(), + n_of_true: 0, + }, + completed_indicator: TaskStatusIndicator { + indicator: (0..len).map(|_| false).collect::>(), + n_of_true: 0, + }, + } + } + + fn is_schedulable(&self) -> bool { + self.pending_indicator.n_of_true != 0 + } + + fn is_completed(&self) -> bool { + self.completed_indicator.n_of_true == self.len + } + + fn is_failed(&self) -> bool { + self.failed_indicator.n_of_true != 0 + } + + fn sample_failed_index(&self) -> usize { + for i in 0..self.len { + if self.failed_indicator.indicator[i] { + return i; + } + } + + self.len + } + + fn find_pending_indicators(&self, max_num: usize) -> Vec { + let mut ret = vec![]; + if max_num < 1 { + return ret; + } + + let len = std::cmp::min(max_num, self.len); + for idx in 0..self.len { + if self.pending_indicator.indicator[idx] { + ret.push(idx as u32); + if ret.len() >= len { + break; + } + } + } + + ret + } + + fn update( + &mut self, + idx: usize, + from: &Option, + to: &Option, + ) -> bool { + assert!( + idx < self.len, + "task index {} should be smaller than {}", + idx, + self.len + ); + + match (from, to) { + (Some(from), Some(to)) => match (from, to) { + (task_status::Status::Running(_), task_status::Status::Failed(_)) => { + self.running_indicator.set_false(idx); + self.failed_indicator.set_true(idx); + } + (task_status::Status::Running(_), task_status::Status::Completed(_)) => { + self.running_indicator.set_false(idx); + self.completed_indicator.set_true(idx); + } + _ => { + return false; + } + }, + (None, Some(task_status::Status::Running(_))) => { + self.pending_indicator.set_false(idx); + self.running_indicator.set_true(idx); + } + (Some(from), None) => match from { + task_status::Status::Failed(_) => { + self.failed_indicator.set_false(idx); + 
self.pending_indicator.set_true(idx); + } + task_status::Status::Completed(_) => { + self.completed_indicator.set_false(idx); + self.pending_indicator.set_true(idx); + } + _ => { + return false; + } + }, + _ => { + return false; + } + } + + true + } +} + +#[derive(Clone)] +struct TaskStatusIndicator { + indicator: Vec, + n_of_true: usize, +} + +impl TaskStatusIndicator { + fn set_false(&mut self, idx: usize) { + self.indicator[idx] = false; + self.n_of_true -= 1; + } + + fn set_true(&mut self, idx: usize) { + self.indicator[idx] = true; + self.n_of_true += 1; + } +} + +#[cfg(test)] +mod test { + use crate::state::stage_manager::StageManager; + use ballista_core::error::Result; + use ballista_core::serde::protobuf::{ + task_status, CompletedTask, FailedTask, PartitionId, RunningTask, TaskStatus, + }; + + #[tokio::test] + async fn test_task_status_state_machine_failed() -> Result<()> { + let stage_manager = StageManager::new(); + + let num_partitions = 3; + let job_id = "job"; + let stage_id = 1u32; + + stage_manager.add_running_stage(job_id, stage_id, num_partitions); + + let task_id = PartitionId { + job_id: job_id.to_owned(), + stage_id, + partition_id: 2, + }; + + { + // Invalid transformation from Pending to Failed + stage_manager.update_tasks_status(vec![TaskStatus { + status: Some(task_status::Status::Failed(FailedTask { + error: "error".to_owned(), + })), + task_id: Some(task_id.clone()), + }]); + let ret = stage_manager.get_stage_tasks(job_id, stage_id); + assert!(ret.is_some()); + assert!(ret + .unwrap() + .get(task_id.partition_id as usize) + .unwrap() + .status + .is_none()); + } + + { + // Valid transformation from Pending to Running to Failed + stage_manager.update_tasks_status(vec![TaskStatus { + status: Some(task_status::Status::Running(RunningTask { + executor_id: "localhost".to_owned(), + })), + task_id: Some(task_id.clone()), + }]); + stage_manager.update_tasks_status(vec![TaskStatus { + status: Some(task_status::Status::Failed(FailedTask { + error: "error".to_owned(), + })), + task_id: Some(task_id.clone()), + }]); + let ret = stage_manager.get_stage_tasks(job_id, stage_id); + assert!(ret.is_some()); + match ret + .unwrap() + .get(task_id.partition_id as usize) + .unwrap() + .status + .as_ref() + .unwrap() + { + task_status::Status::Failed(_) => (), + _ => panic!("Unexpected status"), + } + } + + Ok(()) + } + + #[tokio::test] + async fn test_task_status_state_machine_completed() -> Result<()> { + let stage_manager = StageManager::new(); + + let num_partitions = 3; + let job_id = "job"; + let stage_id = 1u32; + + stage_manager.add_running_stage(job_id, stage_id, num_partitions); + + let task_id = PartitionId { + job_id: job_id.to_owned(), + stage_id, + partition_id: 2, + }; + + // Valid transformation from Pending to Running to Completed to Pending + task_from_pending_to_completed(&stage_manager, &task_id); + let ret = stage_manager.get_stage_tasks(job_id, stage_id); + assert!(ret.is_some()); + match ret + .unwrap() + .get(task_id.partition_id as usize) + .unwrap() + .status + .as_ref() + .unwrap() + { + task_status::Status::Completed(_) => (), + _ => panic!("Unexpected status"), + } + stage_manager.update_tasks_status(vec![TaskStatus { + status: None, + task_id: Some(task_id.clone()), + }]); + let ret = stage_manager.get_stage_tasks(job_id, stage_id); + assert!(ret.is_some()); + assert!(ret + .unwrap() + .get(task_id.partition_id as usize) + .unwrap() + .status + .is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_stage_state_machine_completed() -> Result<()> { 
+ let stage_manager = StageManager::new(); + + let num_partitions = 3; + let job_id = "job"; + let stage_id = 1u32; + + // Valid transformation from Running to Completed + stage_manager.add_running_stage(job_id, stage_id, num_partitions); + assert!(stage_manager.is_running_stage(job_id, stage_id)); + for partition_id in 0..num_partitions { + task_from_pending_to_completed( + &stage_manager, + &PartitionId { + job_id: job_id.to_owned(), + stage_id, + partition_id, + }, + ); + } + assert!(stage_manager.is_completed_stage(job_id, stage_id)); + + // Valid transformation from Completed to Running + stage_manager.update_tasks_status(vec![TaskStatus { + status: None, + task_id: Some(PartitionId { + job_id: job_id.to_owned(), + stage_id, + partition_id: 0, + }), + }]); + assert!(!stage_manager.is_running_stage(job_id, stage_id)); + + Ok(()) + } + + fn task_from_pending_to_completed( + stage_manager: &StageManager, + task_id: &PartitionId, + ) { + stage_manager.update_tasks_status(vec![TaskStatus { + status: Some(task_status::Status::Running(RunningTask { + executor_id: "localhost".to_owned(), + })), + task_id: Some(task_id.clone()), + }]); + stage_manager.update_tasks_status(vec![TaskStatus { + status: Some(task_status::Status::Completed(CompletedTask { + executor_id: "localhost".to_owned(), + partitions: Vec::new(), + })), + task_id: Some(task_id.clone()), + }]); + } +} diff --git a/ballista/rust/scheduler/src/state/task_scheduler.rs b/ballista/rust/scheduler/src/state/task_scheduler.rs index 3cbf783beb61..3ebd033d4995 100644 --- a/ballista/rust/scheduler/src/state/task_scheduler.rs +++ b/ballista/rust/scheduler/src/state/task_scheduler.rs @@ -15,68 +15,119 @@ // specific language governing permissions and limitations // under the License. +use crate::state::stage_manager::StageKey; use crate::state::SchedulerState; use async_trait::async_trait; use ballista_core::error::BallistaError; use ballista_core::execution_plans::ShuffleWriterExec; -use ballista_core::serde::protobuf::TaskDefinition; +use ballista_core::serde::protobuf::{ + job_status, task_status, FailedJob, RunningTask, TaskDefinition, TaskStatus, +}; use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; -use ballista_core::serde::scheduler::ExecutorData; +use ballista_core::serde::scheduler::{ExecutorData, PartitionId}; use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; -use log::{error, info}; -use tonic::Status; +use log::{debug, info}; #[async_trait] pub trait TaskScheduler { - async fn fetch_tasks( + // For each round, it will fetch tasks from one stage + async fn fetch_schedulable_tasks( &self, available_executors: &mut [ExecutorData], - job_id: &str, + n_round: u32, ) -> Result<(Vec>, usize), BallistaError>; } +pub trait StageScheduler { + fn fetch_schedulable_stage(&self, cond: F) -> Option + where + F: Fn(&StageKey) -> bool; +} + #[async_trait] impl TaskScheduler for SchedulerState { - async fn fetch_tasks( + async fn fetch_schedulable_tasks( &self, available_executors: &mut [ExecutorData], - job_id: &str, + n_round: u32, ) -> Result<(Vec>, usize), BallistaError> { let mut ret: Vec> = Vec::with_capacity(available_executors.len()); - for _idx in 0..available_executors.len() { + let mut max_task_num = 0u32; + for executor in available_executors.iter() { ret.push(Vec::new()); + max_task_num += executor.available_task_slots; } - let mut num_tasks = 0; - loop { - info!("Go inside fetching task loop"); - let mut has_tasks = true; - for (idx, executor) in available_executors.iter_mut().enumerate() { - if 
executor.available_task_slots == 0 { - break; - } - let plan = self - .assign_next_schedulable_job_task(&executor.executor_id, job_id) - .await - .map_err(|e| { - let msg = format!("Error finding next assignable task: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })?; - if let Some((task, _plan)) = &plan { - let task_id = task.task_id.as_ref().unwrap(); - info!( - "Sending new task to {}: {}/{}/{}", - executor.executor_id, - task_id.job_id, - task_id.stage_id, - task_id.partition_id - ); - } - match plan { - Some((status, plan)) => { + + let mut tasks_status = vec![]; + let mut has_resources = true; + for i in 0..n_round { + if !has_resources { + break; + } + let mut num_tasks = 0; + // For each round, it will fetch tasks from one stage + if let Some((job_id, stage_id, tasks)) = + self.stage_manager.fetch_pending_tasks( + max_task_num as usize - tasks_status.len(), + |stage_key| { + // Don't scheduler stages for jobs with error status + if let Some(job_meta) = self.get_job_metadata(&stage_key.0) { + if !matches!( + &job_meta.status, + Some(job_status::Status::Failed(FailedJob { error: _ })) + ) { + true + } else { + info!("Stage {}/{} not to be scheduled due to its job failed", stage_key.0, stage_key.1); + false + } + } else { + false + } + }, + ) + { + let plan = + self.get_stage_plan(&job_id, stage_id as usize) + .ok_or_else(|| { + BallistaError::General(format!( + "Fail to find execution plan for stage {}/{}", + job_id, stage_id + )) + })?; + loop { + debug!("Go inside fetching task loop for stage {}/{}", job_id, stage_id); + + let mut has_tasks = true; + for (idx, executor) in available_executors.iter_mut().enumerate() { + if executor.available_task_slots == 0 { + has_resources = false; + break; + } + + if num_tasks >= tasks.len() { + has_tasks = false; + break; + } + + let task_id = PartitionId { + job_id: job_id.clone(), + stage_id: stage_id as usize, + partition_id: tasks[num_tasks] as usize, + }; + + let task_id = Some(task_id.into()); + let running_task = TaskStatus { + task_id: task_id.clone(), + status: Some(task_status::Status::Running(RunningTask { + executor_id: executor.executor_id.to_owned(), + })), + }; + tasks_status.push(running_task); + let plan_clone = plan.clone(); let output_partitioning = if let Some(shuffle_writer) = plan_clone.as_any().downcast_ref::() @@ -91,12 +142,12 @@ impl TaskScheduler let mut buf: Vec = vec![]; U::try_from_physical_plan( - plan, + plan.clone(), self.get_codec().physical_extension_codec(), ) .and_then(|m| m.try_encode(&mut buf)) .map_err(|e| { - Status::internal(format!( + tonic::Status::internal(format!( "error serializing execution plan: {:?}", e )) @@ -104,31 +155,38 @@ impl TaskScheduler ret[idx].push(TaskDefinition { plan: buf, - task_id: status.task_id, + task_id, output_partitioning: hash_partitioning_to_proto( output_partitioning, ) - .map_err(|_| Status::internal("TBD".to_string()))?, + .map_err(|_| tonic::Status::internal("TBD".to_string()))?, }); executor.available_task_slots -= 1; num_tasks += 1; } - _ => { - // Indicate there's no more tasks to be scheduled - has_tasks = false; + if !has_tasks { + break; + } + if !has_resources { break; } } } - if !has_tasks { - break; - } - let has_executors = - available_executors.get(0).unwrap().available_task_slots > 0; - if !has_executors { + if !has_resources { + info!( + "Not enough resource for task running. 
Stopped at round {}",
+                    i
+                );
                 break;
             }
         }
-        Ok((ret, num_tasks))
+
+        let total_task_num = tasks_status.len();
+        info!("{} tasks to be scheduled", total_task_num);
+
+        // No need to emit a stage event here, since these tasks only change status from pending to running
+        self.stage_manager.update_tasks_status(tasks_status);
+
+        Ok((ret, total_task_num))
     }
 }
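
Reviewer note (illustrative, not part of the diff): the call sequence the scheduler is expected to drive on the new StageManager mirrors the unit tests above. The sketch below assumes it lives inside the scheduler crate (so the pub(crate) methods are visible); the job id, stage id, partition count and executor id are placeholder values.

    use crate::state::stage_manager::StageManager;
    use ballista_core::serde::protobuf::{
        task_status, CompletedTask, PartitionId, RunningTask, TaskStatus,
    };

    fn drive_stage_to_completion() {
        let stage_manager = StageManager::new();
        let (job_id, stage_id, num_partitions) = ("example-job", 1u32, 2u32);

        // Register the stage as running; all of its tasks start out pending.
        stage_manager.add_running_stage(job_id, stage_id, num_partitions);

        // A scheduling round asks one schedulable stage for up to `max_num` pending tasks.
        if let Some((job, stage, partitions)) =
            stage_manager.fetch_pending_tasks(num_partitions as usize, |_stage_key| true)
        {
            for partition_id in partitions {
                let task_id = Some(PartitionId {
                    job_id: job.clone(),
                    stage_id: stage,
                    partition_id,
                });
                // Tasks move pending -> running -> completed; other transitions are
                // rejected by TaskStatusDistribution::update and only logged.
                stage_manager.update_tasks_status(vec![TaskStatus {
                    task_id: task_id.clone(),
                    status: Some(task_status::Status::Running(RunningTask {
                        executor_id: "executor-1".to_owned(),
                    })),
                }]);
                stage_manager.update_tasks_status(vec![TaskStatus {
                    task_id,
                    status: Some(task_status::Status::Completed(CompletedTask {
                        executor_id: "executor-1".to_owned(),
                        partitions: Vec::new(),
                    })),
                }]);
            }
        }

        // Once every task has completed, the stage is moved to the completed set
        // (update_tasks_status also returns StageFinished / JobFinished events for the caller).
        assert!(stage_manager.is_completed_stage(job_id, stage_id));
    }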