Skip to content

Commit 5dfdfea

Browse files
Remove ExecutorReservation and change the task assignment philosophy from executor first to task first (#823)
* Remove ExecutorReservation and change the task assignment philosophy from executor first to task first * Add PENDING_JOBS_METRIC and RUNNING_JOBS_METRIC to replace INFLIGHT_TASKS_METRIC_NAME --------- Co-authored-by: yangzhong <[email protected]>
1 parent 28f64ea commit 5dfdfea

File tree

16 files changed

+1412
-2437
lines changed

16 files changed

+1412
-2437
lines changed

ballista/scheduler/src/cluster/kv.rs

+67-204
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717

1818
use crate::cluster::storage::{KeyValueStore, Keyspace, Lock, Operation, WatchEvent};
1919
use crate::cluster::{
20-
reserve_slots_bias, reserve_slots_round_robin, ClusterState, ExecutorHeartbeatStream,
21-
JobState, JobStateEvent, JobStateEventStream, JobStatus, TaskDistribution,
20+
bind_task_bias, bind_task_round_robin, BoundTask, ClusterState,
21+
ExecutorHeartbeatStream, ExecutorSlot, JobState, JobStateEvent, JobStateEventStream,
22+
JobStatus, TaskDistribution,
2223
};
2324
use crate::scheduler_server::{timestamp_secs, SessionBuilder};
2425
use crate::state::execution_graph::ExecutionGraph;
25-
use crate::state::executor_manager::ExecutorReservation;
2626
use crate::state::session_manager::create_datafusion_context;
27+
use crate::state::task_manager::JobInfoCache;
2728
use crate::state::{decode_into, decode_protobuf};
2829
use async_trait::async_trait;
2930
use ballista_core::config::BallistaConfig;
@@ -174,12 +175,12 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
174175
Ok(())
175176
}
176177

177-
async fn reserve_slots(
178+
async fn bind_schedulable_tasks(
178179
&self,
179-
num_slots: u32,
180180
distribution: TaskDistribution,
181+
active_jobs: Arc<HashMap<String, JobInfoCache>>,
181182
executors: Option<HashSet<String>>,
182-
) -> Result<Vec<ExecutorReservation>> {
183+
) -> Result<Vec<BoundTask>> {
183184
let lock = self.store.lock(Keyspace::Slots, "global").await?;
184185

185186
with_lock(lock, async {
@@ -192,7 +193,7 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
192193
))
193194
})?;
194195

195-
let mut available_slots: Vec<&mut AvailableTaskSlots> = slots
196+
let available_slots: Vec<&mut AvailableTaskSlots> = slots
196197
.task_slots
197198
.iter_mut()
198199
.filter_map(|data| {
@@ -205,82 +206,33 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
205206
})
206207
.collect();
207208

208-
available_slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
209-
210-
let reservations = match distribution {
211-
TaskDistribution::Bias => reserve_slots_bias(available_slots, num_slots),
209+
let bound_tasks = match distribution {
210+
TaskDistribution::Bias => {
211+
bind_task_bias(available_slots, active_jobs, |_| false).await
212+
}
212213
TaskDistribution::RoundRobin => {
213-
reserve_slots_round_robin(available_slots, num_slots)
214+
bind_task_round_robin(available_slots, active_jobs, |_| false).await
214215
}
215216
};
216217

217-
if !reservations.is_empty() {
218+
if !bound_tasks.is_empty() {
218219
self.store
219220
.put(Keyspace::Slots, "all".to_owned(), slots.encode_to_vec())
220221
.await?
221222
}
222223

223-
Ok(reservations)
224+
Ok(bound_tasks)
224225
})
225226
.await
226227
}
227228

228-
async fn reserve_slots_exact(
229-
&self,
230-
num_slots: u32,
231-
distribution: TaskDistribution,
232-
executors: Option<HashSet<String>>,
233-
) -> Result<Vec<ExecutorReservation>> {
234-
let lock = self.store.lock(Keyspace::Slots, "global").await?;
235-
236-
with_lock(lock, async {
237-
let resources = self.store.get(Keyspace::Slots, "all").await?;
238-
239-
let mut slots =
240-
ExecutorTaskSlots::decode(resources.as_slice()).map_err(|err| {
241-
BallistaError::Internal(format!(
242-
"Unexpected value in executor slots state: {err:?}"
243-
))
244-
})?;
245-
246-
let mut available_slots: Vec<&mut AvailableTaskSlots> = slots
247-
.task_slots
248-
.iter_mut()
249-
.filter_map(|data| {
250-
(data.slots > 0
251-
&& executors
252-
.as_ref()
253-
.map(|executors| executors.contains(&data.executor_id))
254-
.unwrap_or(true))
255-
.then_some(data)
256-
})
257-
.collect();
258-
259-
available_slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
260-
261-
let reservations = match distribution {
262-
TaskDistribution::Bias => reserve_slots_bias(available_slots, num_slots),
263-
TaskDistribution::RoundRobin => {
264-
reserve_slots_round_robin(available_slots, num_slots)
265-
}
266-
};
267-
268-
if reservations.len() == num_slots as usize {
269-
self.store
270-
.put(Keyspace::Slots, "all".to_owned(), slots.encode_to_vec())
271-
.await?;
272-
Ok(reservations)
273-
} else {
274-
Ok(vec![])
275-
}
276-
})
277-
.await
278-
}
229+
async fn unbind_tasks(&self, executor_slots: Vec<ExecutorSlot>) -> Result<()> {
230+
let mut increments = HashMap::new();
231+
for (executor_id, num_slots) in executor_slots {
232+
let v = increments.entry(executor_id).or_insert_with(|| 0);
233+
*v += num_slots;
234+
}
279235

280-
async fn cancel_reservations(
281-
&self,
282-
reservations: Vec<ExecutorReservation>,
283-
) -> Result<()> {
284236
let lock = self.store.lock(Keyspace::Slots, "all").await?;
285237

286238
with_lock(lock, async {
@@ -293,18 +245,9 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
293245
))
294246
})?;
295247

296-
let mut increments = HashMap::new();
297-
for ExecutorReservation { executor_id, .. } in reservations {
298-
if let Some(inc) = increments.get_mut(&executor_id) {
299-
*inc += 1;
300-
} else {
301-
increments.insert(executor_id, 1usize);
302-
}
303-
}
304-
305248
for executor_slots in slots.task_slots.iter_mut() {
306249
if let Some(slots) = increments.get(&executor_slots.executor_id) {
307-
executor_slots.slots += *slots as u32;
250+
executor_slots.slots += *slots;
308251
}
309252
}
310253

@@ -319,8 +262,7 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
319262
&self,
320263
metadata: ExecutorMetadata,
321264
spec: ExecutorData,
322-
reserve: bool,
323-
) -> Result<Vec<ExecutorReservation>> {
265+
) -> Result<()> {
324266
let executor_id = metadata.id.clone();
325267

326268
//TODO this should be in a transaction
@@ -338,83 +280,40 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
338280
})
339281
.await?;
340282

341-
if !reserve {
342-
let available_slots = AvailableTaskSlots {
343-
executor_id,
344-
slots: spec.available_task_slots,
345-
};
346-
347-
let lock = self.store.lock(Keyspace::Slots, "all").await?;
283+
let available_slots = AvailableTaskSlots {
284+
executor_id,
285+
slots: spec.available_task_slots,
286+
};
348287

349-
with_lock(lock, async {
350-
let current_slots = self.store.get(Keyspace::Slots, "all").await?;
288+
let lock = self.store.lock(Keyspace::Slots, "all").await?;
351289

352-
let mut current_slots: ExecutorTaskSlots =
353-
decode_protobuf(current_slots.as_slice())?;
290+
with_lock(lock, async {
291+
let current_slots = self.store.get(Keyspace::Slots, "all").await?;
354292

355-
if let Some((idx, _)) =
356-
current_slots.task_slots.iter().find_position(|slots| {
357-
slots.executor_id == available_slots.executor_id
358-
})
359-
{
360-
current_slots.task_slots[idx] = available_slots;
361-
} else {
362-
current_slots.task_slots.push(available_slots);
363-
}
293+
let mut current_slots: ExecutorTaskSlots =
294+
decode_protobuf(current_slots.as_slice())?;
364295

365-
self.store
366-
.put(
367-
Keyspace::Slots,
368-
"all".to_string(),
369-
current_slots.encode_to_vec(),
370-
)
371-
.await
372-
})
373-
.await?;
374-
375-
Ok(vec![])
376-
} else {
377-
let num_slots = spec.available_task_slots as usize;
378-
let mut reservations: Vec<ExecutorReservation> = vec![];
379-
for _ in 0..num_slots {
380-
reservations.push(ExecutorReservation::new_free(executor_id.clone()));
296+
if let Some((idx, _)) = current_slots
297+
.task_slots
298+
.iter()
299+
.find_position(|slots| slots.executor_id == available_slots.executor_id)
300+
{
301+
current_slots.task_slots[idx] = available_slots;
302+
} else {
303+
current_slots.task_slots.push(available_slots);
381304
}
382305

383-
let available_slots = AvailableTaskSlots {
384-
executor_id,
385-
slots: 0,
386-
};
387-
388-
let lock = self.store.lock(Keyspace::Slots, "all").await?;
389-
390-
with_lock(lock, async {
391-
let current_slots = self.store.get(Keyspace::Slots, "all").await?;
392-
393-
let mut current_slots: ExecutorTaskSlots =
394-
decode_protobuf(current_slots.as_slice())?;
395-
396-
if let Some((idx, _)) =
397-
current_slots.task_slots.iter().find_position(|slots| {
398-
slots.executor_id == available_slots.executor_id
399-
})
400-
{
401-
current_slots.task_slots[idx] = available_slots;
402-
} else {
403-
current_slots.task_slots.push(available_slots);
404-
}
405-
406-
self.store
407-
.put(
408-
Keyspace::Slots,
409-
"all".to_string(),
410-
current_slots.encode_to_vec(),
411-
)
412-
.await
413-
})
414-
.await?;
306+
self.store
307+
.put(
308+
Keyspace::Slots,
309+
"all".to_string(),
310+
current_slots.encode_to_vec(),
311+
)
312+
.await
313+
})
314+
.await?;
415315

416-
Ok(reservations)
417-
}
316+
Ok(())
418317
}
419318

420319
async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
@@ -502,18 +401,17 @@ impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
502401
impl<S: KeyValueStore, T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> JobState
503402
for KeyValueState<S, T, U>
504403
{
505-
async fn accept_job(
506-
&self,
507-
job_id: &str,
508-
job_name: &str,
509-
queued_at: u64,
510-
) -> Result<()> {
404+
fn accept_job(&self, job_id: &str, job_name: &str, queued_at: u64) -> Result<()> {
511405
self.queued_jobs
512406
.insert(job_id.to_string(), (job_name.to_string(), queued_at));
513407

514408
Ok(())
515409
}
516410

411+
fn pending_job_number(&self) -> usize {
412+
self.queued_jobs.len()
413+
}
414+
517415
async fn submit_job(&self, job_id: String, graph: &ExecutionGraph) -> Result<()> {
518416
if self.queued_jobs.get(&job_id).is_some() {
519417
let status = graph.status();
@@ -774,62 +672,17 @@ async fn with_lock<Out, F: Future<Output = Out>>(mut lock: Box<dyn Lock>, op: F)
774672

775673
#[cfg(test)]
776674
mod test {
675+
777676
use crate::cluster::kv::KeyValueState;
778677
use crate::cluster::storage::sled::SledClient;
779-
use crate::cluster::test::{
780-
test_executor_registration, test_fuzz_reservations, test_job_lifecycle,
781-
test_job_planning_failure, test_reservation,
782-
};
783-
use crate::cluster::TaskDistribution;
678+
use crate::cluster::test_util::{test_job_lifecycle, test_job_planning_failure};
784679
use crate::test_utils::{
785680
test_aggregation_plan, test_join_plan, test_two_aggregations_plan,
786681
};
787682
use ballista_core::error::Result;
788683
use ballista_core::serde::BallistaCodec;
789684
use ballista_core::utils::default_session_builder;
790685

791-
#[cfg(feature = "sled")]
792-
fn make_sled_state() -> Result<KeyValueState<SledClient>> {
793-
Ok(KeyValueState::new(
794-
"",
795-
SledClient::try_new_temporary()?,
796-
BallistaCodec::default(),
797-
default_session_builder,
798-
))
799-
}
800-
801-
#[cfg(feature = "sled")]
802-
#[tokio::test]
803-
async fn test_sled_executor_reservation() -> Result<()> {
804-
test_executor_registration(make_sled_state()?).await
805-
}
806-
807-
#[cfg(feature = "sled")]
808-
#[tokio::test]
809-
async fn test_sled_reserve() -> Result<()> {
810-
test_reservation(make_sled_state()?, TaskDistribution::Bias).await?;
811-
test_reservation(make_sled_state()?, TaskDistribution::RoundRobin).await?;
812-
813-
Ok(())
814-
}
815-
816-
#[cfg(feature = "sled")]
817-
#[tokio::test]
818-
async fn test_sled_fuzz_reserve() -> Result<()> {
819-
test_fuzz_reservations(make_sled_state()?, 10, TaskDistribution::Bias, 10, 10)
820-
.await?;
821-
test_fuzz_reservations(
822-
make_sled_state()?,
823-
10,
824-
TaskDistribution::RoundRobin,
825-
10,
826-
10,
827-
)
828-
.await?;
829-
830-
Ok(())
831-
}
832-
833686
#[cfg(feature = "sled")]
834687
#[tokio::test]
835688
async fn test_sled_job_lifecycle() -> Result<()> {
@@ -854,4 +707,14 @@ mod test {
854707

855708
Ok(())
856709
}
710+
711+
#[cfg(feature = "sled")]
712+
fn make_sled_state() -> Result<KeyValueState<SledClient>> {
713+
Ok(KeyValueState::new(
714+
"",
715+
SledClient::try_new_temporary()?,
716+
BallistaCodec::default(),
717+
default_session_builder,
718+
))
719+
}
857720
}

0 commit comments

Comments
 (0)