Skip to content

Commit 08140ef

Browse files
authored
Merge pull request #19 from coralogix/sc-5792
sc-5792: pending queue variable
2 parents a7f1384 + 53cdc2b commit 08140ef

File tree

3 files changed

+103
-41
lines changed

3 files changed

+103
-41
lines changed

ballista/rust/scheduler/src/scheduler_server/external_scaler.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExternalScaler
5757
Ok(Response::new(GetMetricsResponse {
5858
metric_values: vec![MetricValue {
5959
metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
60-
metric_value: 10000000, // A very high number to saturate the HPA
60+
metric_value: self.state.task_manager.get_pending_task_queue_size()
61+
as i64,
6162
}],
6263
}))
6364
}

ballista/rust/scheduler/src/state/mod.rs

+24
Original file line numberDiff line numberDiff line change
@@ -351,24 +351,37 @@ mod test {
351351

352352
let plan = test_graph(session_ctx.clone()).await;
353353

354+
assert_eq!(state.task_manager.get_pending_task_queue_size(), 0);
355+
354356
// Create 4 jobs so we have four pending tasks
355357
state
356358
.task_manager
357359
.submit_job("job-1", session_ctx.session_id().as_str(), plan.clone(), 0)
358360
.await?;
361+
362+
assert_eq!(state.task_manager.get_pending_task_queue_size(), 1);
363+
359364
state
360365
.task_manager
361366
.submit_job("job-2", session_ctx.session_id().as_str(), plan.clone(), 0)
362367
.await?;
368+
369+
assert_eq!(state.task_manager.get_pending_task_queue_size(), 2);
370+
363371
state
364372
.task_manager
365373
.submit_job("job-3", session_ctx.session_id().as_str(), plan.clone(), 0)
366374
.await?;
375+
376+
assert_eq!(state.task_manager.get_pending_task_queue_size(), 3);
377+
367378
state
368379
.task_manager
369380
.submit_job("job-4", session_ctx.session_id().as_str(), plan.clone(), 0)
370381
.await?;
371382

383+
assert_eq!(state.task_manager.get_pending_task_queue_size(), 4);
384+
372385
let executors = test_executors(1, 4);
373386

374387
let (executor_metadata, executor_data) = executors[0].clone();
@@ -381,6 +394,7 @@ mod test {
381394
let result = state.offer_reservation(reservations).await?;
382395

383396
assert!(result.is_empty());
397+
assert_eq!(state.task_manager.get_pending_task_queue_size(), 0);
384398

385399
// All task slots should be assigned so we should not be able to reserve more tasks
386400
let reservations = state.executor_manager.reserve_slots(4).await?;
@@ -408,12 +422,16 @@ mod test {
408422

409423
let plan = test_graph(session_ctx.clone()).await;
410424

425+
assert_eq!(state.task_manager.get_pending_task_queue_size(), 0);
426+
411427
// Create a job
412428
state
413429
.task_manager
414430
.submit_job("job-1", session_ctx.session_id().as_str(), plan.clone(), 0)
415431
.await?;
416432

433+
assert_eq!(state.task_manager.get_pending_task_queue_size(), 1);
434+
417435
let executors = test_executors(1, 4);
418436

419437
let (executor_metadata, executor_data) = executors[0].clone();
@@ -450,6 +468,8 @@ mod test {
450468
)
451469
.await?;
452470

471+
assert_eq!(state.task_manager.get_pending_task_queue_size(), 1);
472+
453473
state
454474
.executor_manager
455475
.register_executor(executor_metadata, executor_data, false)
@@ -459,12 +479,16 @@ mod test {
459479

460480
assert_eq!(reservations.len(), 1);
461481

482+
assert_eq!(state.task_manager.get_pending_task_queue_size(), 1);
483+
462484
// Offer the reservation. It should be filled with one of the 4 pending tasks. The other 3 should
463485
// be reserved for the other 3 tasks, emitting another offer event
464486
let reservations = state.offer_reservation(reservations).await?;
465487

466488
assert_eq!(reservations.len(), 3);
467489

490+
assert_eq!(state.task_manager.get_pending_task_queue_size(), 0);
491+
468492
// Remaining 3 task slots should be reserved for pending tasks
469493
let reservations = state.executor_manager.reserve_slots(4).await?;
470494

ballista/rust/scheduler/src/state/task_manager.rs

+77-40
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use crate::state::execution_graph::{ExecutionGraph, Task};
2222
use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
2323
use crate::state::{decode_protobuf, encode_protobuf, with_lock};
2424
use ballista_core::config::BallistaConfig;
25-
#[cfg(not(test))]
2625
use ballista_core::error::BallistaError;
2726
use ballista_core::error::Result;
2827
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
@@ -42,6 +41,7 @@ use rand::distributions::Alphanumeric;
4241
use rand::{thread_rng, Rng};
4342
use std::collections::HashMap;
4443
use std::default::Default;
44+
use std::sync::atomic::{AtomicUsize, Ordering};
4545
use std::sync::Arc;
4646
use std::time::{SystemTime, UNIX_EPOCH};
4747
use tokio::sync::RwLock;
@@ -60,6 +60,7 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
6060
scheduler_id: String,
6161
// Cache for active execution graphs curated by this scheduler
6262
active_job_cache: ExecutionGraphCache,
63+
pending_task_queue_size: Arc<AtomicUsize>,
6364
}
6465

6566
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> {
@@ -76,6 +77,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
7677
codec,
7778
scheduler_id,
7879
active_job_cache: Arc::new(RwLock::new(HashMap::new())),
80+
pending_task_queue_size: Arc::new(AtomicUsize::new(0)),
7981
}
8082
}
8183

@@ -101,11 +103,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
101103
.await?;
102104

103105
graph.revive();
106+
let available_tasks = graph.available_tasks();
104107

105108
let mut active_graph_cache = self.active_job_cache.write().await;
106109
active_graph_cache.insert(job_id.to_owned(), Arc::new(RwLock::new(graph)));
107110

108-
Ok(())
111+
self.increase_pending_queue_size(available_tasks)
109112
}
110113

111114
/// Get the status of a job. First look in the active cache.
@@ -199,16 +202,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
199202
let mut pending_tasks = 0usize;
200203
let mut assign_tasks = 0usize;
201204
let job_cache = self.active_job_cache.read().await;
205+
202206
for (_job_id, graph) in job_cache.iter() {
203207
let mut graph = graph.write().await;
204208
for reservation in free_reservations.iter().skip(assign_tasks) {
205209
if let Some(task) = graph.pop_next_task(&reservation.executor_id)? {
206210
assignments.push((reservation.executor_id.clone(), task));
207211
assign_tasks += 1;
212+
self.decrease_pending_queue_size(1)?;
208213
} else {
209214
break;
210215
}
211216
}
217+
212218
if assign_tasks >= free_reservations.len() {
213219
pending_tasks = graph.available_tasks();
214220
break;
@@ -253,18 +259,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
253259
) -> Result<()> {
254260
let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
255261

256-
let running_tasks = self
257-
.get_execution_graph(job_id)
258-
.await
259-
.map(|graph| graph.running_tasks())
260-
.unwrap_or_else(|_| vec![]);
261-
262-
info!(
263-
"Cancelling {} running tasks for job {}",
264-
running_tasks.len(),
265-
job_id
266-
);
267-
268262
let failed_at = SystemTime::now()
269263
.duration_since(UNIX_EPOCH)
270264
.expect("Time went backwards")
@@ -273,39 +267,47 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
273267
self.fail_job_inner(lock, job_id, "Cancelled".to_owned(), failed_at)
274268
.await?;
275269

276-
let mut tasks: HashMap<&str, Vec<protobuf::PartitionId>> = Default::default();
277-
278-
for (partition, executor_id) in &running_tasks {
279-
if let Some(parts) = tasks.get_mut(executor_id.as_str()) {
280-
parts.push(protobuf::PartitionId {
281-
job_id: job_id.to_owned(),
282-
stage_id: partition.stage_id as u32,
283-
partition_id: partition.partition_id as u32,
284-
})
285-
} else {
286-
tasks.insert(
287-
executor_id.as_str(),
288-
vec![protobuf::PartitionId {
270+
if let Ok(graph) = self.get_execution_graph(job_id).await {
271+
let running_tasks = graph.running_tasks();
272+
let mut tasks: HashMap<&str, Vec<protobuf::PartitionId>> = Default::default();
273+
274+
info!(
275+
"Cancelling {} running tasks for job {}",
276+
running_tasks.len(),
277+
job_id
278+
);
279+
for (partition, executor_id) in &running_tasks {
280+
if let Some(parts) = tasks.get_mut(executor_id.as_str()) {
281+
parts.push(protobuf::PartitionId {
289282
job_id: job_id.to_owned(),
290283
stage_id: partition.stage_id as u32,
291284
partition_id: partition.partition_id as u32,
292-
}],
293-
);
285+
})
286+
} else {
287+
tasks.insert(
288+
executor_id.as_str(),
289+
vec![protobuf::PartitionId {
290+
job_id: job_id.to_owned(),
291+
stage_id: partition.stage_id as u32,
292+
partition_id: partition.partition_id as u32,
293+
}],
294+
);
295+
}
294296
}
295-
}
296297

297-
for (executor_id, partitions) in tasks {
298-
if let Ok(mut client) = executor_manager.get_client(executor_id).await {
299-
client
300-
.cancel_tasks(CancelTasksParams {
301-
partition_id: partitions,
302-
})
303-
.await?;
304-
} else {
305-
error!("Failed to get client for executor ID {}", executor_id)
298+
for (executor_id, partitions) in tasks {
299+
if let Ok(mut client) = executor_manager.get_client(executor_id).await {
300+
client
301+
.cancel_tasks(CancelTasksParams {
302+
partition_id: partitions,
303+
})
304+
.await?;
305+
} else {
306+
error!("Failed to get client for executor ID {}", executor_id)
307+
}
306308
}
309+
self.decrease_pending_queue_size(graph.available_tasks())?;
307310
}
308-
309311
Ok(())
310312
}
311313

@@ -364,6 +366,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
364366
pub async fn fail_running_job(&self, job_id: &str) -> Result<()> {
365367
if let Some(graph) = self.get_active_execution_graph(job_id).await {
366368
let graph = graph.read().await.clone();
369+
let available_tasks = graph.available_tasks();
367370
let value = self.encode_execution_graph(graph)?;
368371

369372
debug!("Moving job {} from Active to Failed", job_id);
@@ -372,6 +375,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
372375
self.state
373376
.put(Keyspace::FailedJobs, job_id.to_owned(), value)
374377
.await?;
378+
self.decrease_pending_queue_size(available_tasks)?
375379
} else {
376380
warn!("Fail to find job {} in the cache", job_id);
377381
}
@@ -385,10 +389,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
385389
let mut graph = graph.write().await;
386390
graph.revive();
387391
let graph = graph.clone();
392+
let available_tasks = graph.available_tasks();
388393
let value = self.encode_execution_graph(graph)?;
394+
389395
self.state
390396
.put(Keyspace::ActiveJobs, job_id.to_owned(), value)
391397
.await?;
398+
self.increase_pending_queue_size(available_tasks)?;
392399
} else {
393400
warn!("Fail to find job {} in the cache", job_id);
394401
}
@@ -574,4 +581,34 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
574581
.take(7)
575582
.collect()
576583
}
584+
585+
pub fn increase_pending_queue_size(&self, num: usize) -> Result<()> {
586+
match self.pending_task_queue_size.fetch_update(
587+
Ordering::Relaxed,
588+
Ordering::Relaxed,
589+
|s| Some(s + num),
590+
) {
591+
Ok(_) => Ok(()),
592+
Err(_) => Err(BallistaError::Internal(
593+
"Unable to update pending task counter".to_owned(),
594+
)),
595+
}
596+
}
597+
598+
pub fn decrease_pending_queue_size(&self, num: usize) -> Result<()> {
599+
match self.pending_task_queue_size.fetch_update(
600+
Ordering::Relaxed,
601+
Ordering::Relaxed,
602+
|s| Some(s - num),
603+
) {
604+
Ok(_) => Ok(()),
605+
Err(_) => Err(BallistaError::Internal(
606+
"Unable to update pending task counter".to_owned(),
607+
)),
608+
}
609+
}
610+
611+
pub fn get_pending_task_queue_size(&self) -> usize {
612+
self.pending_task_queue_size.load(Ordering::SeqCst)
613+
}
577614
}

0 commit comments

Comments
 (0)