From 5b3604497c26ce35327c79303a04f3d4e660389c Mon Sep 17 00:00:00 2001
From: yangzhong
Date: Wed, 19 Oct 2022 16:33:13 +0800
Subject: [PATCH] Add round-robin executor slots reservation policy for the
 scheduler to evenly assign tasks to executors

---
 ballista/scheduler/scheduler_config_spec.toml    |   6 +
 ballista/scheduler/src/config.rs                 |  44 +++++
 ballista/scheduler/src/lib.rs                    |   1 +
 ballista/scheduler/src/main.rs                   |  23 ++-
 .../scheduler/src/scheduler_server/mod.rs        |  15 +-
 .../scheduler/src/state/executor_manager.rs      | 186 +++++++++++++++---
 ballista/scheduler/src/state/mod.rs              |   5 +-
 7 files changed, 242 insertions(+), 38 deletions(-)
 create mode 100644 ballista/scheduler/src/config.rs

diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml
index 6ec78103c..625e23b9c 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -72,6 +72,12 @@ type = "ballista_core::config::TaskSchedulingPolicy"
 doc = "The scheduling policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged"
 default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"
 
+[[param]]
+name = "executor_slots_policy"
+type = "ballista_scheduler::config::SlotsPolicy"
+doc = "The executor slots policy for the scheduler, possible values: bias, round-robin. Default: bias"
+default = "ballista_scheduler::config::SlotsPolicy::Bias"
+
 [[param]]
 name = "plugin_dir"
 type = "String"
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
new file mode 100644
index 000000000..4f1f5c243
--- /dev/null
+++ b/ballista/scheduler/src/config.rs
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+//! Ballista scheduler specific configuration
+
+use clap::ArgEnum;
+use std::fmt;
+
+// An enum used to configure the executor slots policy.
+// It needs to be visible to the code generated by configure_me.
+#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
+pub enum SlotsPolicy {
+    Bias,
+    RoundRobin,
+}
+
+impl std::str::FromStr for SlotsPolicy {
+    type Err = String;
+
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        ArgEnum::from_str(s, true)
+    }
+}
+
+impl parse_arg::ParseArgFromStr for SlotsPolicy {
+    fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+        write!(writer, "The executor slots policy for the scheduler")
+    }
+}
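Note: the "bias" / "round-robin" spellings documented above come from clap's
ArgEnum derive, which kebab-cases the variant names, while ArgEnum::from_str(s,
true) matches them case-insensitively. A dependency-free sketch of that mapping
(this SlotsPolicy is a stand-in for the type above, not the real derive output):

    #[derive(Clone, Copy, Debug, PartialEq)]
    pub enum SlotsPolicy {
        Bias,
        RoundRobin,
    }

    impl std::str::FromStr for SlotsPolicy {
        type Err = String;

        // Mirrors the case-insensitive, kebab-case matching expected from ArgEnum.
        fn from_str(s: &str) -> Result<Self, Self::Err> {
            match s.to_ascii_lowercase().as_str() {
                "bias" => Ok(SlotsPolicy::Bias),
                "round-robin" => Ok(SlotsPolicy::RoundRobin),
                other => Err(format!("invalid slots policy: {}", other)),
            }
        }
    }

    fn main() {
        // The spellings from scheduler_config_spec.toml parse to the two variants.
        assert_eq!("bias".parse::<SlotsPolicy>(), Ok(SlotsPolicy::Bias));
        assert_eq!("Round-Robin".parse::<SlotsPolicy>(), Ok(SlotsPolicy::RoundRobin));
    }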
("../README.md")] pub mod api; +pub mod config; pub mod display; pub mod planner; pub mod scheduler_server; diff --git a/ballista/scheduler/src/main.rs b/ballista/scheduler/src/main.rs index 9b09db6af..fafdfa7a0 100644 --- a/ballista/scheduler/src/main.rs +++ b/ballista/scheduler/src/main.rs @@ -62,6 +62,7 @@ mod config { } use ballista_core::utils::create_grpc_server; +use ballista_scheduler::config::SlotsPolicy; #[cfg(feature = "flight-sql")] use ballista_scheduler::flight_sql::FlightSqlServiceImpl; use config::prelude::*; @@ -71,7 +72,8 @@ async fn start_server( scheduler_name: String, config_backend: Arc, addr: SocketAddr, - policy: TaskSchedulingPolicy, + scheduling_policy: TaskSchedulingPolicy, + slots_policy: SlotsPolicy, ) -> Result<()> { info!( "Ballista v{} Scheduler listening on {:?}", @@ -80,14 +82,15 @@ async fn start_server( // Should only call SchedulerServer::new() once in the process info!( "Starting Scheduler grpc server with task scheduling policy of {:?}", - policy + scheduling_policy ); let mut scheduler_server: SchedulerServer = - match policy { + match scheduling_policy { TaskSchedulingPolicy::PushStaged => SchedulerServer::new_with_policy( scheduler_name, config_backend.clone(), - policy, + scheduling_policy, + slots_policy, BallistaCodec::default(), default_session_builder, ), @@ -239,7 +242,15 @@ async fn main() -> Result<()> { } }; - let policy: TaskSchedulingPolicy = opt.scheduler_policy; - start_server(scheduler_name, client, addr, policy).await?; + let scheduling_policy: TaskSchedulingPolicy = opt.scheduler_policy; + let slots_policy: SlotsPolicy = opt.executor_slots_policy; + start_server( + scheduler_name, + client, + addr, + scheduling_policy, + slots_policy, + ) + .await?; Ok(()) } diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs index 176b85c89..883c6d067 100644 --- a/ballista/scheduler/src/scheduler_server/mod.rs +++ b/ballista/scheduler/src/scheduler_server/mod.rs @@ -30,6 +30,7 @@ use datafusion::logical_plan::LogicalPlan; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_proto::logical_plan::AsLogicalPlan; +use crate::config::SlotsPolicy; use log::{error, warn}; use crate::scheduler_server::event::QueryStageSchedulerEvent; @@ -72,6 +73,7 @@ impl SchedulerServer SchedulerServer SchedulerServer, - policy: TaskSchedulingPolicy, + scheduling_policy: TaskSchedulingPolicy, + slots_policy: SlotsPolicy, codec: BallistaCodec, session_builder: SessionBuilder, ) -> Self { @@ -104,9 +108,10 @@ impl SchedulerServer Result> { let state_storage = Arc::new(StandaloneClient::try_new_temporary()?); let mut scheduler: SchedulerServer = SchedulerServer::new_with_policy( "localhost:50050".to_owned(), state_storage.clone(), - policy, + scheduling_policy, + SlotsPolicy::Bias, BallistaCodec::default(), default_session_builder, ); diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs index 70fc7a0a0..2da288fec 100644 --- a/ballista/scheduler/src/state/executor_manager.rs +++ b/ballista/scheduler/src/state/executor_manager.rs @@ -23,6 +23,7 @@ use crate::state::{decode_into, decode_protobuf, encode_protobuf, with_lock}; use ballista_core::error::{BallistaError, Result}; use ballista_core::serde::protobuf; +use crate::config::SlotsPolicy; use crate::state::execution_graph::RunningTaskInfo; use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient; use ballista_core::serde::protobuf::{ @@ -84,6 +85,8 @@ pub const 
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index 70fc7a0a0..2da288fec 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -23,6 +23,7 @@ use crate::state::{decode_into, decode_protobuf, encode_protobuf, with_lock};
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf;
 
+use crate::config::SlotsPolicy;
 use crate::state::execution_graph::RunningTaskInfo;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
 use ballista_core::serde::protobuf::{
@@ -84,6 +85,8 @@ pub const DEFAULT_EXECUTOR_TIMEOUT_SECONDS: u64 = 180;
 
 #[derive(Clone)]
 pub(crate) struct ExecutorManager {
+    // executor slots policy
+    slots_policy: SlotsPolicy,
     state: Arc<dyn StateBackendClient>,
     // executor_id -> ExecutorMetadata map
     executor_metadata: Arc<DashMap<String, ExecutorMetadata>>,
@@ -95,8 +98,12 @@ pub(crate) struct ExecutorManager {
 }
 
 impl ExecutorManager {
-    pub(crate) fn new(state: Arc<dyn StateBackendClient>) -> Self {
+    pub(crate) fn new(
+        state: Arc<dyn StateBackendClient>,
+        slots_policy: SlotsPolicy,
+    ) -> Self {
         Self {
+            slots_policy,
             state,
             executor_metadata: Arc::new(DashMap::new()),
             executors_heartbeat: Arc::new(DashMap::new()),
@@ -121,38 +128,28 @@ impl ExecutorManager {
     /// for scheduling.
     /// This operation is atomic, so if this method returns an Err, no slots have been reserved.
     pub async fn reserve_slots(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
+        self.reserve_slots_global(n).await
+    }
+
+    /// Reserve up to n executor task slots, taking the global resource snapshot into account
+    async fn reserve_slots_global(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
         let lock = self.state.lock(Keyspace::Slots, "global").await?;
 
         with_lock(lock, async {
             debug!("Attempting to reserve {} executor slots", n);
             let start = Instant::now();
-            let mut reservations: Vec<ExecutorReservation> = vec![];
-            let mut desired: u32 = n;
 
             let alive_executors = self.get_alive_executors_within_one_minute();
 
-            let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
-
-            for executor_id in alive_executors {
-                let value = self.state.get(Keyspace::Slots, &executor_id).await?;
-                let mut data =
-                    decode_into::<protobuf::ExecutorData, ExecutorData>(&value)?;
-                let take = std::cmp::min(data.available_task_slots, desired);
-
-                for _ in 0..take {
-                    reservations.push(ExecutorReservation::new_free(executor_id.clone()));
-                    data.available_task_slots -= 1;
-                    desired -= 1;
+            let (reservations, txn_ops) = match self.slots_policy {
+                SlotsPolicy::Bias => {
+                    self.reserve_slots_global_bias(n, alive_executors).await?
                 }
-
-                let proto: protobuf::ExecutorData = data.into();
-                let new_data = encode_protobuf(&proto)?;
-                txn_ops.push((Operation::Put(new_data), Keyspace::Slots, executor_id));
-
-                if desired == 0 {
-                    break;
+                SlotsPolicy::RoundRobin => {
+                    self.reserve_slots_global_round_robin(n, alive_executors)
+                        .await?
                 }
-            }
+            };
 
             self.state.apply_txn(txn_ops).await?;
@@ -168,6 +165,112 @@ impl ExecutorManager {
         .await
     }
 
+    /// Take as many ExecutorReservations from one executor as possible before
+    /// moving on to the next, which reduces how often ExecutorData has to be
+    /// decoded and encoded. However, it may leave the whole cluster unbalanced:
+    /// some executors may be very busy while others sit idle.
+    async fn reserve_slots_global_bias(
+        &self,
+        mut n: u32,
+        alive_executors: HashSet<String>,
+    ) -> Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
+        let mut reservations: Vec<ExecutorReservation> = vec![];
+        let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
+
+        for executor_id in alive_executors {
+            if n == 0 {
+                break;
+            }
+
+            let value = self.state.get(Keyspace::Slots, &executor_id).await?;
+            let mut data = decode_into::<protobuf::ExecutorData, ExecutorData>(&value)?;
+            let take = std::cmp::min(data.available_task_slots, n);
+
+            for _ in 0..take {
+                reservations.push(ExecutorReservation::new_free(executor_id.clone()));
+                data.available_task_slots -= 1;
+                n -= 1;
+            }
+
+            let proto: protobuf::ExecutorData = data.into();
+            let new_data = encode_protobuf(&proto)?;
+            txn_ops.push((Operation::Put(new_data), Keyspace::Slots, executor_id));
+        }
+
+        Ok((reservations, txn_ops))
+    }
+
+    /// Create ExecutorReservations in a round-robin fashion to evenly assign tasks to executors
+    async fn reserve_slots_global_round_robin(
+        &self,
+        mut n: u32,
+        alive_executors: HashSet<String>,
+    ) -> Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
+        let mut reservations: Vec<ExecutorReservation> = vec![];
+        let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
+
+        let all_executor_data = self
+            .state
+            .scan(Keyspace::Slots, None)
+            .await?
+            .into_iter()
+            .map(|(_, data)| decode_into::<protobuf::ExecutorData, ExecutorData>(&data))
+            .collect::<Result<Vec<ExecutorData>>>()?;
+
+        let mut available_executor_data: Vec<ExecutorData> = all_executor_data
+            .into_iter()
+            .filter_map(|data| {
+                (data.available_task_slots > 0
+                    && alive_executors.contains(&data.executor_id))
+                .then_some(data)
+            })
+            .collect();
+        available_executor_data
+            .sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots));
+
+        // Exclusive upper bound of the executors whose slot counts have been changed
+        let mut last_updated_idx = 0usize;
+        loop {
+            let n_before = n;
+            for (idx, data) in available_executor_data.iter_mut().enumerate() {
+                if n == 0 {
+                    break;
+                }
+
+                // Since the vector is sorted in descending order, if this executor
+                // has no slots left, the executors after it have none either
+                if data.available_task_slots == 0 {
+                    break;
+                }
+
+                reservations
+                    .push(ExecutorReservation::new_free(data.executor_id.clone()));
+                data.available_task_slots -= 1;
+                n -= 1;
+
+                if idx >= last_updated_idx {
+                    last_updated_idx = idx + 1;
+                }
+            }
+
+            if n_before == n {
+                break;
+            }
+        }
+
+        for (idx, data) in available_executor_data.into_iter().enumerate() {
+            if idx >= last_updated_idx {
+                break;
+            }
+            let executor_id = data.executor_id.clone();
+            let proto: protobuf::ExecutorData = data.into();
+            let new_data = encode_protobuf(&proto)?;
+            txn_ops.push((Operation::Put(new_data), Keyspace::Slots, executor_id));
+        }
+
+        Ok((reservations, txn_ops))
+    }
+
     /// Return reserved task slots to the pool of available slots. This operation is atomic,
     /// so either the entire set of reserved task slots is returned or none is.
     pub async fn cancel_reservations(
@@ -638,6 +741,7 @@ impl ExecutorHeartbeatListener {
 
 #[cfg(test)]
 mod test {
+    use crate::config::SlotsPolicy;
     use crate::state::backend::standalone::StandaloneClient;
     use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
     use ballista_core::error::Result;
@@ -648,9 +752,16 @@ mod test {
 
     #[tokio::test]
     async fn test_reserve_and_cancel() -> Result<()> {
+        test_reserve_and_cancel_inner(SlotsPolicy::Bias).await?;
+        test_reserve_and_cancel_inner(SlotsPolicy::RoundRobin).await?;
+
+        Ok(())
+    }
+
+    async fn test_reserve_and_cancel_inner(slots_policy: SlotsPolicy) -> Result<()> {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
 
-        let executor_manager = ExecutorManager::new(state_storage);
+        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
 
         let executors = test_executors(10, 4);
@@ -678,9 +789,16 @@ mod test {
 
     #[tokio::test]
     async fn test_reserve_partial() -> Result<()> {
+        test_reserve_partial_inner(SlotsPolicy::Bias).await?;
+        test_reserve_partial_inner(SlotsPolicy::RoundRobin).await?;
+
+        Ok(())
+    }
+
+    async fn test_reserve_partial_inner(slots_policy: SlotsPolicy) -> Result<()> {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
 
-        let executor_manager = ExecutorManager::new(state_storage);
+        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
 
         let executors = test_executors(10, 4);
@@ -720,6 +838,13 @@ mod test {
 
     #[tokio::test]
     async fn test_reserve_concurrent() -> Result<()> {
+        test_reserve_concurrent_inner(SlotsPolicy::Bias).await?;
+        test_reserve_concurrent_inner(SlotsPolicy::RoundRobin).await?;
+
+        Ok(())
+    }
+
+    async fn test_reserve_concurrent_inner(slots_policy: SlotsPolicy) -> Result<()> {
         let (sender, mut receiver) =
             tokio::sync::mpsc::channel::<Result<Vec<ExecutorReservation>>>(1000);
 
         let executors = test_executors(10, 4);
 
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
 
-        let executor_manager = ExecutorManager::new(state_storage);
+        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
 
         for (executor_metadata, executor_data) in executors {
             executor_manager
@@ -762,9 +887,16 @@ mod test {
 
     #[tokio::test]
     async fn test_register_reserve() -> Result<()> {
+        test_register_reserve_inner(SlotsPolicy::Bias).await?;
+        test_register_reserve_inner(SlotsPolicy::RoundRobin).await?;
+
+        Ok(())
+    }
+
+    async fn test_register_reserve_inner(slots_policy: SlotsPolicy) -> Result<()> {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
 
-        let executor_manager = ExecutorManager::new(state_storage);
+        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
 
         let executors = test_executors(10, 4);
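Note: the behavioural difference between the two policies is easiest to see on
a fixed slot snapshot. Below is a self-contained sketch of just the slot
bookkeeping (no state store, no protobuf encoding); the executor ids and counts
are made up, and unlike reserve_slots_global_bias, which walks a HashSet in
arbitrary order, the bias variant here visits executors in vector order so the
output is deterministic:

    fn reserve_bias(slots: &mut Vec<(String, u32)>, mut n: u32) -> Vec<String> {
        let mut reservations = Vec::new();
        for (id, available) in slots.iter_mut() {
            if n == 0 {
                break;
            }
            // Drain this executor as far as possible before moving on.
            let take = (*available).min(n);
            for _ in 0..take {
                reservations.push(id.clone());
                *available -= 1;
                n -= 1;
            }
        }
        reservations
    }

    fn reserve_round_robin(slots: &mut Vec<(String, u32)>, mut n: u32) -> Vec<String> {
        // Sort by available slots, descending, like reserve_slots_global_round_robin.
        slots.sort_by(|a, b| b.1.cmp(&a.1));
        let mut reservations = Vec::new();
        loop {
            let n_before = n;
            for (id, available) in slots.iter_mut() {
                // Sorted descending: once one executor is empty, the rest are too.
                if n == 0 || *available == 0 {
                    break;
                }
                reservations.push(id.clone());
                *available -= 1;
                n -= 1;
            }
            // No progress in a full pass: slots exhausted or n reached zero.
            if n_before == n {
                break;
            }
        }
        reservations
    }

    fn main() {
        let snapshot = vec![
            ("a".to_string(), 4u32),
            ("b".to_string(), 2),
            ("c".to_string(), 2),
        ];

        // Bias drains "a" before touching "b": ["a", "a", "a", "a", "b"]
        println!("bias:        {:?}", reserve_bias(&mut snapshot.clone(), 5));

        // Round-robin cycles through executors: ["a", "b", "c", "a", "b"]
        println!("round-robin: {:?}", reserve_round_robin(&mut snapshot.clone(), 5));
    }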
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 3de58a0be..6943cbd56 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -28,6 +28,7 @@ use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
 use crate::state::session_manager::SessionManager;
 use crate::state::task_manager::TaskManager;
 
+use crate::config::SlotsPolicy;
 use crate::state::execution_graph::TaskDescription;
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf::TaskStatus;
@@ -101,6 +102,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T, U>
     pub fn new(
         config_client: Arc<dyn StateBackendClient>,
         session_builder: SessionBuilder,
         codec: BallistaCodec<T, U>,
         scheduler_name: String,
+        slots_policy: SlotsPolicy,
     ) -> Self {
         Self {
-            executor_manager: ExecutorManager::new(config_client.clone()),
+            executor_manager: ExecutorManager::new(config_client.clone(), slots_policy),
             task_manager: TaskManager::new(
                 config_client.clone(),
                 session_builder,
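Note: once built with this patch, the policy should be selectable at startup.
configure_me conventionally turns the executor_slots_policy spec entry into a
kebab-case command line flag, so the invocation would presumably be:

    ballista-scheduler --scheduler-policy push-staged --executor-slots-policy round-robin

Leaving the flag unset keeps the bias default, so existing deployments see no
change in behaviour.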