Commit d5d584a

Revert "Merge pull request #19 from coralogix/sc-5792"
This reverts commit 08140ef, reversing changes made to a7f1384.
1 parent f722f7d commit d5d584a

3 files changed: +41 -103 lines

ballista/rust/scheduler/src/scheduler_server/external_scaler.rs (+1, -2)

@@ -57,8 +57,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExternalScaler
         Ok(Response::new(GetMetricsResponse {
             metric_values: vec![MetricValue {
                 metric_name: INFLIGHT_TASKS_METRIC_NAME.to_string(),
-                metric_value: self.state.task_manager.get_pending_task_queue_size()
-                    as i64,
+                metric_value: 10000000, // A very high number to saturate the HPA
             }],
         }))
     }
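Note on this hunk: the GetMetricsResponse returned here feeds the scheduler's external-scaler endpoint, so the value reported for INFLIGHT_TASKS_METRIC_NAME is what drives autoscaling. The reverted change reported the actual pending-task count; the revert restores a large constant that keeps the autoscaler saturated, as the restored comment notes. With the atomic counter gone, a comparable figure could still be computed on demand from the active job cache. Below is a minimal sketch under that assumption, reusing only names that appear elsewhere in this diff (active_job_cache, available_tasks); the helper pending_tasks_snapshot itself is hypothetical and not part of this commit.

// Hypothetical helper, not part of this commit: derive the pending-task
// count on demand instead of maintaining a separate AtomicUsize counter.
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> {
    pub async fn pending_tasks_snapshot(&self) -> usize {
        // Read-lock the cache of active execution graphs kept by this scheduler.
        let job_cache = self.active_job_cache.read().await;
        let mut pending = 0usize;
        for (_job_id, graph) in job_cache.iter() {
            // available_tasks() is the same accessor the scheduler uses when
            // filling reservations (see the task_manager.rs hunks below).
            pending += graph.read().await.available_tasks();
        }
        pending
    }
}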

ballista/rust/scheduler/src/state/mod.rs (-24)

@@ -351,37 +351,24 @@ mod test {
 
         let plan = test_graph(session_ctx.clone()).await;
 
-        assert_eq!(state.task_manager.get_pending_task_queue_size(), 0);
-
         // Create 4 jobs so we have four pending tasks
         state
             .task_manager
             .submit_job("job-1", session_ctx.session_id().as_str(), plan.clone(), 0)
             .await?;
-
-        assert_eq!(state.task_manager.get_pending_task_queue_size(), 1);
-
         state
             .task_manager
             .submit_job("job-2", session_ctx.session_id().as_str(), plan.clone(), 0)
             .await?;
-
-        assert_eq!(state.task_manager.get_pending_task_queue_size(), 2);
-
         state
             .task_manager
             .submit_job("job-3", session_ctx.session_id().as_str(), plan.clone(), 0)
             .await?;
-
-        assert_eq!(state.task_manager.get_pending_task_queue_size(), 3);
-
         state
             .task_manager
             .submit_job("job-4", session_ctx.session_id().as_str(), plan.clone(), 0)
             .await?;
 
-        assert_eq!(state.task_manager.get_pending_task_queue_size(), 4);
-
         let executors = test_executors(1, 4);
 
         let (executor_metadata, executor_data) = executors[0].clone();

@@ -394,7 +381,6 @@ mod test {
         let result = state.offer_reservation(reservations).await?;
 
         assert!(result.is_empty());
-        assert_eq!(state.task_manager.get_pending_task_queue_size(), 0);
 
         // All task slots should be assigned so we should not be able to reserve more tasks
         let reservations = state.executor_manager.reserve_slots(4).await?;

@@ -422,16 +408,12 @@ mod test {
 
         let plan = test_graph(session_ctx.clone()).await;
 
-        assert_eq!(state.task_manager.get_pending_task_queue_size(), 0);
-
         // Create a job
         state
             .task_manager
             .submit_job("job-1", session_ctx.session_id().as_str(), plan.clone(), 0)
             .await?;
 
-        assert_eq!(state.task_manager.get_pending_task_queue_size(), 1);
-
         let executors = test_executors(1, 4);
 
         let (executor_metadata, executor_data) = executors[0].clone();

@@ -468,8 +450,6 @@ mod test {
         )
         .await?;
 
-        assert_eq!(state.task_manager.get_pending_task_queue_size(), 1);
-
         state
             .executor_manager
             .register_executor(executor_metadata, executor_data, false)

@@ -479,16 +459,12 @@ mod test {
 
         assert_eq!(reservations.len(), 1);
 
-        assert_eq!(state.task_manager.get_pending_task_queue_size(), 1);
-
         // Offer the reservation. It should be filled with one of the 4 pending tasks. The other 3 should
         // be reserved for the other 3 tasks, emitting another offer event
         let reservations = state.offer_reservation(reservations).await?;
 
         assert_eq!(reservations.len(), 3);
 
-        assert_eq!(state.task_manager.get_pending_task_queue_size(), 0);
-
         // Remaining 3 task slots should be reserved for pending tasks
         let reservations = state.executor_manager.reserve_slots(4).await?;
 
ballista/rust/scheduler/src/state/task_manager.rs (+40, -77)

@@ -22,6 +22,7 @@ use crate::state::execution_graph::{ExecutionGraph, Task};
 use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
 use crate::state::{decode_protobuf, encode_protobuf, with_lock};
 use ballista_core::config::BallistaConfig;
+#[cfg(not(test))]
 use ballista_core::error::BallistaError;
 use ballista_core::error::Result;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;

@@ -41,7 +42,6 @@ use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};
 use std::collections::HashMap;
 use std::default::Default;
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::sync::RwLock;

@@ -60,7 +60,6 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
     scheduler_id: String,
     // Cache for active execution graphs curated by this scheduler
     active_job_cache: ExecutionGraphCache,
-    pending_task_queue_size: Arc<AtomicUsize>,
 }
 
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> {

@@ -77,7 +76,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
             codec,
             scheduler_id,
             active_job_cache: Arc::new(RwLock::new(HashMap::new())),
-            pending_task_queue_size: Arc::new(AtomicUsize::new(0)),
         }
     }
 

@@ -103,12 +101,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
             .await?;
 
         graph.revive();
-        let available_tasks = graph.available_tasks();
 
         let mut active_graph_cache = self.active_job_cache.write().await;
         active_graph_cache.insert(job_id.to_owned(), Arc::new(RwLock::new(graph)));
 
-        self.increase_pending_queue_size(available_tasks)
+        Ok(())
     }
 
     /// Get the status of of a job. First look in the active cache.

@@ -202,19 +199,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         let mut pending_tasks = 0usize;
         let mut assign_tasks = 0usize;
         let job_cache = self.active_job_cache.read().await;
-
         for (_job_id, graph) in job_cache.iter() {
             let mut graph = graph.write().await;
             for reservation in free_reservations.iter().skip(assign_tasks) {
                 if let Some(task) = graph.pop_next_task(&reservation.executor_id)? {
                     assignments.push((reservation.executor_id.clone(), task));
                     assign_tasks += 1;
-                    self.decrease_pending_queue_size(1)?;
                 } else {
                     break;
                 }
             }
-
             if assign_tasks >= free_reservations.len() {
                 pending_tasks = graph.available_tasks();
                 break;
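Aside: the hunk above is the reservation-filling loop that the per-task counter decrement was removed from. Its control flow is: walk the cached execution graphs, pop one task per free reservation, stop early when a graph has no more runnable tasks or when every reservation is filled. The standalone sketch below illustrates that flow with plain vectors standing in for ExecutionGraph and ExecutorReservation; all names are illustrative, not the real Ballista types.

// Minimal sketch of the reservation-filling loop: assign at most one task
// per free reservation, draining graphs in order.
fn fill_reservations(
    graphs: &mut [Vec<&'static str>],
    free_reservations: &[&str],
) -> Vec<(String, &'static str)> {
    let mut assignments = Vec::new();
    let mut assign_tasks = 0usize;
    for graph in graphs.iter_mut() {
        for reservation in free_reservations.iter().skip(assign_tasks) {
            // `pop()` stands in for ExecutionGraph::pop_next_task.
            if let Some(task) = graph.pop() {
                assignments.push((reservation.to_string(), task));
                assign_tasks += 1;
            } else {
                break; // this graph has no more runnable tasks
            }
        }
        if assign_tasks >= free_reservations.len() {
            break; // every reservation is filled
        }
    }
    assignments
}

fn main() {
    let mut graphs = vec![vec!["job1-task1", "job1-task2"], vec!["job2-task1"]];
    let reservations = ["executor-1", "executor-1", "executor-2", "executor-2"];
    let assignments = fill_reservations(&mut graphs, &reservations);
    // Only 3 tasks exist, so one reservation stays free.
    assert_eq!(assignments.len(), 3);
    println!("{assignments:?}");
}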
@@ -259,6 +253,18 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
     ) -> Result<()> {
         let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
 
+        let running_tasks = self
+            .get_execution_graph(job_id)
+            .await
+            .map(|graph| graph.running_tasks())
+            .unwrap_or_else(|_| vec![]);
+
+        info!(
+            "Cancelling {} running tasks for job {}",
+            running_tasks.len(),
+            job_id
+        );
+
         let failed_at = SystemTime::now()
             .duration_since(UNIX_EPOCH)
             .expect("Time went backwards")
@@ -267,47 +273,39 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         self.fail_job_inner(lock, job_id, "Cancelled".to_owned(), failed_at)
             .await?;
 
-        if let Ok(graph) = self.get_execution_graph(job_id).await {
-            let running_tasks = graph.running_tasks();
-            let mut tasks: HashMap<&str, Vec<protobuf::PartitionId>> = Default::default();
-
-            info!(
-                "Cancelling {} running tasks for job {}",
-                running_tasks.len(),
-                job_id
-            );
-            for (partition, executor_id) in &running_tasks {
-                if let Some(parts) = tasks.get_mut(executor_id.as_str()) {
-                    parts.push(protobuf::PartitionId {
+        let mut tasks: HashMap<&str, Vec<protobuf::PartitionId>> = Default::default();
+
+        for (partition, executor_id) in &running_tasks {
+            if let Some(parts) = tasks.get_mut(executor_id.as_str()) {
+                parts.push(protobuf::PartitionId {
+                    job_id: job_id.to_owned(),
+                    stage_id: partition.stage_id as u32,
+                    partition_id: partition.partition_id as u32,
+                })
+            } else {
+                tasks.insert(
+                    executor_id.as_str(),
+                    vec![protobuf::PartitionId {
                         job_id: job_id.to_owned(),
                         stage_id: partition.stage_id as u32,
                         partition_id: partition.partition_id as u32,
-                    })
-                } else {
-                    tasks.insert(
-                        executor_id.as_str(),
-                        vec![protobuf::PartitionId {
-                            job_id: job_id.to_owned(),
-                            stage_id: partition.stage_id as u32,
-                            partition_id: partition.partition_id as u32,
-                        }],
-                    );
-                }
+                    }],
+                );
             }
+        }
 
-            for (executor_id, partitions) in tasks {
-                if let Ok(mut client) = executor_manager.get_client(executor_id).await {
-                    client
-                        .cancel_tasks(CancelTasksParams {
-                            partition_id: partitions,
-                        })
-                        .await?;
-                } else {
-                    error!("Failed to get client for executor ID {}", executor_id)
-                }
+        for (executor_id, partitions) in tasks {
+            if let Ok(mut client) = executor_manager.get_client(executor_id).await {
+                client
+                    .cancel_tasks(CancelTasksParams {
+                        partition_id: partitions,
+                    })
+                    .await?;
+            } else {
+                error!("Failed to get client for executor ID {}", executor_id)
             }
-            self.decrease_pending_queue_size(graph.available_tasks())?;
         }
+
         Ok(())
     }
 
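Aside: the restored cancel_job body above groups the running partitions by executor before issuing cancel_tasks calls, using a get_mut-or-insert branch on the HashMap. The same grouping step can be expressed with the standard entry API; the sketch below is a self-contained illustration of that pattern only, with a stand-in PartitionId struct rather than the protobuf type, and is not code from this commit.

use std::collections::HashMap;

// Stand-in for protobuf::PartitionId, for illustration only.
#[derive(Debug)]
struct PartitionId {
    job_id: String,
    stage_id: u32,
    partition_id: u32,
}

// Group running (stage, partition) pairs by the executor running them,
// mirroring the loop restored in cancel_job but via the entry API.
fn group_by_executor<'a>(
    job_id: &str,
    running_tasks: &'a [((usize, usize), String)],
) -> HashMap<&'a str, Vec<PartitionId>> {
    let mut tasks: HashMap<&'a str, Vec<PartitionId>> = HashMap::new();
    for ((stage_id, partition_id), executor_id) in running_tasks {
        tasks
            .entry(executor_id.as_str())
            .or_default()
            .push(PartitionId {
                job_id: job_id.to_owned(),
                stage_id: *stage_id as u32,
                partition_id: *partition_id as u32,
            });
    }
    tasks
}

fn main() {
    let running = vec![
        ((1usize, 0usize), "executor-1".to_string()),
        ((1, 1), "executor-1".to_string()),
        ((2, 0), "executor-2".to_string()),
    ];
    let grouped = group_by_executor("job-1", &running);
    assert_eq!(grouped["executor-1"].len(), 2);
    assert_eq!(grouped["executor-2"].len(), 1);
    println!("{grouped:?}");
}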
@@ -366,7 +364,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
     pub async fn fail_running_job(&self, job_id: &str) -> Result<()> {
         if let Some(graph) = self.get_active_execution_graph(job_id).await {
             let graph = graph.read().await.clone();
-            let available_tasks = graph.available_tasks();
             let value = self.encode_execution_graph(graph)?;
 
             debug!("Moving job {} from Active to Failed", job_id);

@@ -375,7 +372,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
             self.state
                 .put(Keyspace::FailedJobs, job_id.to_owned(), value)
                 .await?;
-            self.decrease_pending_queue_size(available_tasks)?
         } else {
             warn!("Fail to find job {} in the cache", job_id);
         }

@@ -389,13 +385,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
             let mut graph = graph.write().await;
             graph.revive();
             let graph = graph.clone();
-            let available_tasks = graph.available_tasks();
             let value = self.encode_execution_graph(graph)?;
-
             self.state
                 .put(Keyspace::ActiveJobs, job_id.to_owned(), value)
                 .await?;
-            self.increase_pending_queue_size(available_tasks)?;
         } else {
             warn!("Fail to find job {} in the cache", job_id);
         }

@@ -581,34 +574,4 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
             .take(7)
             .collect()
     }
-
-    pub fn increase_pending_queue_size(&self, num: usize) -> Result<()> {
-        match self.pending_task_queue_size.fetch_update(
-            Ordering::Relaxed,
-            Ordering::Relaxed,
-            |s| Some(s + num),
-        ) {
-            Ok(_) => Ok(()),
-            Err(_) => Err(BallistaError::Internal(
-                "Unable to update pending task counter".to_owned(),
-            )),
-        }
-    }
-
-    pub fn decrease_pending_queue_size(&self, num: usize) -> Result<()> {
-        match self.pending_task_queue_size.fetch_update(
-            Ordering::Relaxed,
-            Ordering::Relaxed,
-            |s| Some(s - num),
-        ) {
-            Ok(_) => Ok(()),
-            Err(_) => Err(BallistaError::Internal(
-                "Unable to update pending task counter".to_owned(),
-            )),
-        }
-    }
-
-    pub fn get_pending_task_queue_size(&self) -> usize {
-        self.pending_task_queue_size.load(Ordering::SeqCst)
-    }
 }
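Aside: the removed helpers above kept the pending-task gauge in an AtomicUsize updated through fetch_update. One hazard of the removed decrease_pending_queue_size is that its closure |s| Some(s - num) underflows if decrements ever outrun increments (a panic in debug builds, wrap-around in release). A checked_sub closure turns that case into an Err from fetch_update instead. The standalone sketch below illustrates the pattern; the checked_sub variant is an assumption, not part of the original code.

use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    // Standalone illustration of the fetch_update pattern used by the removed
    // increase/decrease_pending_queue_size helpers.
    let pending = AtomicUsize::new(0);

    // Increment by 3: the closure always returns Some, so this cannot fail.
    pending
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |s| Some(s + 3))
        .expect("closure always returns Some");

    // Decrement by 5 with checked_sub: the update is rejected instead of
    // underflowing, and fetch_update reports the current value as an Err.
    let result =
        pending.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |s| s.checked_sub(5));
    assert!(result.is_err());
    assert_eq!(pending.load(Ordering::SeqCst), 3);
}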
