
Commit a1473d0

Add RoundRobinLocal slots policy for caching executor data to avoid sled persistency (#396)

* Add RoundRobinLocal slots policy for caching executor data to avoid sled persistency
* Rename clean_up_executors_data to clean_up_job_data in executor manager

Co-authored-by: yangzhong <[email protected]>
1 parent 0050ece commit a1473d0

4 files changed: +158 -6 lines


ballista/scheduler/scheduler_config_spec.toml (+1 -1)

@@ -75,7 +75,7 @@ default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"
 [[param]]
 name = "executor_slots_policy"
 type = "ballista_scheduler::config::SlotsPolicy"
-doc = "The executor slots policy for the scheduler, possible values: bias, round-robin. Default: bias"
+doc = "The executor slots policy for the scheduler, possible values: bias, round-robin, round-robin-local. Default: bias"
 default = "ballista_scheduler::config::SlotsPolicy::Bias"
 
 [[param]]

ballista/scheduler/src/config.rs (+7)

@@ -27,6 +27,13 @@ use std::fmt;
 pub enum SlotsPolicy {
     Bias,
     RoundRobin,
+    RoundRobinLocal,
+}
+
+impl SlotsPolicy {
+    pub fn is_local(&self) -> bool {
+        matches!(self, SlotsPolicy::RoundRobinLocal)
+    }
 }
 
 impl std::str::FromStr for SlotsPolicy {
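The new variant also has to round-trip through the `FromStr` impl referenced above, whose body is outside this diff. The following is a minimal sketch, assuming it maps the kebab-case strings documented in scheduler_config_spec.toml ("bias", "round-robin", "round-robin-local") onto the variants; the error type and case handling here are assumptions for illustration:

```rust
// Hypothetical parsing sketch: the real FromStr body is not shown in this
// commit. It is assumed here to map the kebab-case config strings onto
// the enum variants.
use std::str::FromStr;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SlotsPolicy {
    Bias,
    RoundRobin,
    RoundRobinLocal,
}

impl FromStr for SlotsPolicy {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "bias" => Ok(SlotsPolicy::Bias),
            "round-robin" => Ok(SlotsPolicy::RoundRobin),
            "round-robin-local" => Ok(SlotsPolicy::RoundRobinLocal),
            other => Err(format!("unknown slots policy: {other}")),
        }
    }
}

fn main() {
    let policy: SlotsPolicy = "round-robin-local".parse().unwrap();
    assert_eq!(policy, SlotsPolicy::RoundRobinLocal);
}
```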

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs (+3 -3)

@@ -155,7 +155,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                         CLEANUP_FINISHED_JOB_DELAY_SECS,
                     ))
                     .await;
-                    executor_manager.clean_up_executors_data(job_id).await;
+                    executor_manager.clean_up_job_data(job_id).await;
                 });
             }
             QueryStageSchedulerEvent::JobRunningFailed(job_id, failure_reason) => {
@@ -176,7 +176,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                         CLEANUP_FINISHED_JOB_DELAY_SECS,
                     ))
                     .await;
-                    executor_manager.clean_up_executors_data(job_id).await;
+                    executor_manager.clean_up_job_data(job_id).await;
                 });
             }
             QueryStageSchedulerEvent::JobUpdated(job_id) => {
@@ -194,7 +194,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                         CLEANUP_FINISHED_JOB_DELAY_SECS,
                     ))
                     .await;
-                    executor_manager.clean_up_executors_data(job_id).await;
+                    executor_manager.clean_up_job_data(job_id).await;
                 });
             }
             QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status) => {
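All three call sites share one pattern: spawn a background task that sleeps for CLEANUP_FINISHED_JOB_DELAY_SECS and then invokes the renamed cleanup method. A minimal, self-contained sketch of that pattern, assuming tokio with the macros and time features; the function name here mirrors the patch, but the one-second delay and job id are illustrative:

```rust
// Sketch of the delayed-cleanup pattern used at the call sites above.
use std::time::Duration;

async fn clean_up_job_data(job_id: String) {
    println!("cleaning up data for job {job_id}");
}

#[tokio::main]
async fn main() {
    let job_id = "job-42".to_string();
    let handle = tokio::spawn(async move {
        // Wait out the configured grace period before cleaning up.
        tokio::time::sleep(Duration::from_secs(1)).await;
        clean_up_job_data(job_id).await;
    });
    handle.await.unwrap();
}
```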

ballista/scheduler/src/state/executor_manager.rs (+147 -2)

@@ -35,6 +35,7 @@ use ballista_core::utils::create_grpc_client_connection;
 use dashmap::{DashMap, DashSet};
 use futures::StreamExt;
 use log::{debug, error, info, warn};
+use parking_lot::Mutex;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use tonic::transport::Channel;
@@ -92,6 +93,8 @@ pub(crate) struct ExecutorManager {
     executor_metadata: Arc<DashMap<String, ExecutorMetadata>>,
     // executor_id -> ExecutorHeartbeat map
     executors_heartbeat: Arc<DashMap<String, protobuf::ExecutorHeartbeat>>,
+    // executor_id -> ExecutorData map, only used when the slots policy is local
+    executor_data: Arc<Mutex<HashMap<String, ExecutorData>>>,
     // dead executor sets:
     dead_executors: Arc<DashSet<String>>,
     clients: ExecutorClients,
@@ -107,6 +110,7 @@ impl ExecutorManager {
             state,
             executor_metadata: Arc::new(DashMap::new()),
             executors_heartbeat: Arc::new(DashMap::new()),
+            executor_data: Arc::new(Mutex::new(HashMap::new())),
             dead_executors: Arc::new(DashSet::new()),
             clients: Default::default(),
         }
@@ -128,7 +132,82 @@ impl ExecutorManager {
     /// for scheduling.
     /// This operation is atomic, so if this method returns an Err, no slots have been reserved.
     pub async fn reserve_slots(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
-        self.reserve_slots_global(n).await
+        if self.slots_policy.is_local() {
+            self.reserve_slots_local(n).await
+        } else {
+            self.reserve_slots_global(n).await
+        }
+    }
+
+    async fn reserve_slots_local(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
+        debug!("Attempting to reserve {} executor slots", n);
+
+        let alive_executors = self.get_alive_executors_within_one_minute();
+
+        match self.slots_policy {
+            SlotsPolicy::RoundRobinLocal => {
+                self.reserve_slots_local_round_robin(n, alive_executors)
+                    .await
+            }
+            _ => Err(BallistaError::General(format!(
+                "Reservation policy {:?} is not supported",
+                self.slots_policy
+            ))),
+        }
+    }
+
+    /// Create ExecutorReservations in a round-robin way to evenly assign tasks to executors
+    async fn reserve_slots_local_round_robin(
+        &self,
+        mut n: u32,
+        alive_executors: HashSet<String>,
+    ) -> Result<Vec<ExecutorReservation>> {
+        let mut executor_data = self.executor_data.lock();
+
+        let mut available_executor_data: Vec<&mut ExecutorData> = executor_data
+            .values_mut()
+            .filter_map(|data| {
+                (data.available_task_slots > 0
+                    && alive_executors.contains(&data.executor_id))
+                .then_some(data)
+            })
+            .collect();
+        available_executor_data
+            .sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots));
+
+        let mut reservations: Vec<ExecutorReservation> = vec![];
+
+        // Exclusive end index of the executors updated in the current pass
+        let mut last_updated_idx = 0usize;
+        loop {
+            let n_before = n;
+            for (idx, data) in available_executor_data.iter_mut().enumerate() {
+                if n == 0 {
+                    break;
+                }
+
+                // Since the vector is sorted in descending order, once an executor
+                // has no available slots, the executors after it will have none either
+                if data.available_task_slots == 0 {
+                    break;
+                }
+
+                reservations
+                    .push(ExecutorReservation::new_free(data.executor_id.clone()));
+                data.available_task_slots -= 1;
+                n -= 1;
+
+                if idx >= last_updated_idx {
+                    last_updated_idx = idx + 1;
+                }
+            }
+
+            if n_before == n {
+                break;
+            }
+        }
+
+        Ok(reservations)
     }
 
     /// Reserve up to n executor task slots, considering the global resource snapshot
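To make the reservation order concrete, here is a self-contained trace of the loop above, using a simplified stand-in type (`Slots` below is not Ballista's ExecutorData, and the `last_updated_idx` bookkeeping is omitted since it does not affect the order). With executor A holding 3 free slots, B holding 1, and n = 4, each pass takes at most one slot per executor, so tasks spread evenly: A, B, A, A.

```rust
// Simplified stand-ins for ExecutorData/ExecutorReservation, only to show
// the pass order of the round-robin loop in the patch.
struct Slots {
    id: &'static str,
    free: u32,
}

fn reserve_round_robin(executors: &mut [Slots], mut n: u32) -> Vec<&'static str> {
    // Sort by available slots, descending, exactly as the patch does.
    executors.sort_by(|a, b| b.free.cmp(&a.free));
    let mut reservations = Vec::new();
    loop {
        let n_before = n;
        for e in executors.iter_mut() {
            if n == 0 || e.free == 0 {
                break; // done, or everything after this point is empty too
            }
            reservations.push(e.id);
            e.free -= 1;
            n -= 1;
        }
        if n_before == n {
            break; // no progress: fewer free slots than requested
        }
    }
    reservations
}

fn main() {
    let mut executors = [Slots { id: "B", free: 1 }, Slots { id: "A", free: 3 }];
    assert_eq!(reserve_round_robin(&mut executors, 4), ["A", "B", "A", "A"]);
}
```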
@@ -149,6 +228,12 @@ impl ExecutorManager {
                 self.reserve_slots_global_round_robin(n, alive_executors)
                     .await?
             }
+            _ => {
+                return Err(BallistaError::General(format!(
+                    "Reservation policy {:?} is not supported",
+                    self.slots_policy
+                )))
+            }
         };
 
         self.state.apply_txn(txn_ops).await?;
@@ -276,6 +361,42 @@ impl ExecutorManager {
     pub async fn cancel_reservations(
         &self,
         reservations: Vec<ExecutorReservation>,
+    ) -> Result<()> {
+        if self.slots_policy.is_local() {
+            self.cancel_reservations_local(reservations).await
+        } else {
+            self.cancel_reservations_global(reservations).await
+        }
+    }
+
+    async fn cancel_reservations_local(
+        &self,
+        reservations: Vec<ExecutorReservation>,
+    ) -> Result<()> {
+        // Count how many slots should be released back to each executor
+        let mut executor_slots: HashMap<String, u32> = HashMap::new();
+        for reservation in reservations {
+            if let Some(slots) = executor_slots.get_mut(&reservation.executor_id) {
+                *slots += 1;
+            } else {
+                executor_slots.insert(reservation.executor_id, 1);
+            }
+        }
+
+        let mut executor_data = self.executor_data.lock();
+        for (id, released_slots) in executor_slots.into_iter() {
+            if let Some(slots) = executor_data.get_mut(&id) {
+                slots.available_task_slots += released_slots;
+            } else {
+                warn!("ExecutorData for {} is not cached in memory", id);
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn cancel_reservations_global(
+        &self,
+        reservations: Vec<ExecutorReservation>,
     ) -> Result<()> {
         let lock = self.state.lock(Keyspace::Slots, "global").await?;
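The first loop in cancel_reservations_local is an occurrence count. For illustration only, here is the same bookkeeping written as a standalone function using the standard HashMap entry API, an idiomatic equivalent of the get_mut/insert pair in the patch:

```rust
use std::collections::HashMap;

// Count cancelled reservations per executor; each count is the number of
// slots to hand back to that executor's in-memory ExecutorData.
fn count_released_slots(executor_ids: &[&str]) -> HashMap<String, u32> {
    let mut counts: HashMap<String, u32> = HashMap::new();
    for id in executor_ids {
        *counts.entry((*id).to_string()).or_insert(0) += 1;
    }
    counts
}

fn main() {
    let released = count_released_slots(&["exec-1", "exec-2", "exec-1", "exec-1"]);
    assert_eq!(released["exec-1"], 3); // release three slots on exec-1
    assert_eq!(released["exec-2"], 1); // and one on exec-2
}
```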

@@ -363,7 +484,7 @@ impl ExecutorManager {
     }
 
     /// Send rpc to Executors to clean up the job data
-    pub async fn clean_up_executors_data(&self, job_id: String) {
+    pub async fn clean_up_job_data(&self, job_id: String) {
         let alive_executors = self.get_alive_executors_within_one_minute();
         for executor in alive_executors {
             let job_id_clone = job_id.to_owned();
@@ -506,6 +627,12 @@ impl ExecutorManager {
             .await?;
 
         if !reserve {
+            if self.slots_policy.is_local() {
+                let mut executor_data = self.executor_data.lock();
+                executor_data
+                    .insert(specification.executor_id.clone(), specification.clone());
+            }
+
             let proto: protobuf::ExecutorData = specification.into();
             let value = encode_protobuf(&proto)?;
             self.state.put(Keyspace::Slots, executor_id, value).await?;
@@ -519,6 +646,13 @@ impl ExecutorManager {
         }
 
         specification.available_task_slots = 0;
+
+        if self.slots_policy.is_local() {
+            let mut executor_data = self.executor_data.lock();
+            executor_data
+                .insert(specification.executor_id.clone(), specification.clone());
+        }
+
         let proto: protobuf::ExecutorData = specification.into();
         let value = encode_protobuf(&proto)?;
         self.state.put(Keyspace::Slots, executor_id, value).await?;
@@ -611,6 +745,13 @@ impl ExecutorManager {
 
         self.executors_heartbeat
             .remove(&heartbeat.executor_id.clone());
+
+        // Remove executor data cache for dead executors
+        {
+            let mut executor_data = self.executor_data.lock();
+            executor_data.remove(&executor_id);
+        }
+
         self.dead_executors.insert(executor_id);
         Ok(())
     }
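A design note on the new parking_lot::Mutex: it is a synchronous lock used inside async fns, which is safe in this patch because no guard is ever held across an .await (the scoped block in the hunk above makes that explicit). A minimal sketch of the pattern with illustrative names; it assumes the parking_lot and futures crates, both of which this file already depends on:

```rust
// Illustrative sketch: a synchronous parking_lot::Mutex inside an async fn
// is fine as long as the guard never lives across an .await point.
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;

async fn remove_dead_executor(cache: Arc<Mutex<HashMap<String, u32>>>, id: &str) {
    {
        let mut guard = cache.lock();
        guard.remove(id);
    } // guard dropped here, before any await
    // ...any .await that follows runs without holding the lock
}

fn main() {
    let cache = Arc::new(Mutex::new(HashMap::from([("exec-1".to_string(), 4u32)])));
    futures::executor::block_on(remove_dead_executor(cache.clone(), "exec-1"));
    assert!(cache.lock().is_empty());
}
```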
@@ -754,6 +895,7 @@ mod test {
     async fn test_reserve_and_cancel() -> Result<()> {
         test_reserve_and_cancel_inner(SlotsPolicy::Bias).await?;
         test_reserve_and_cancel_inner(SlotsPolicy::RoundRobin).await?;
+        test_reserve_and_cancel_inner(SlotsPolicy::RoundRobinLocal).await?;
 
         Ok(())
     }
@@ -791,6 +933,7 @@ mod test {
     async fn test_reserve_partial() -> Result<()> {
         test_reserve_partial_inner(SlotsPolicy::Bias).await?;
         test_reserve_partial_inner(SlotsPolicy::RoundRobin).await?;
+        test_reserve_partial_inner(SlotsPolicy::RoundRobinLocal).await?;
 
         Ok(())
     }
@@ -840,6 +983,7 @@ mod test {
     async fn test_reserve_concurrent() -> Result<()> {
         test_reserve_concurrent_inner(SlotsPolicy::Bias).await?;
         test_reserve_concurrent_inner(SlotsPolicy::RoundRobin).await?;
+        test_reserve_concurrent_inner(SlotsPolicy::RoundRobinLocal).await?;
 
         Ok(())
     }
@@ -889,6 +1033,7 @@ mod test {
     async fn test_register_reserve() -> Result<()> {
         test_register_reserve_inner(SlotsPolicy::Bias).await?;
         test_register_reserve_inner(SlotsPolicy::RoundRobin).await?;
+        test_register_reserve_inner(SlotsPolicy::RoundRobinLocal).await?;
 
         Ok(())
     }
