@@ -22,7 +22,6 @@ use crate::state::execution_graph::{ExecutionGraph, Task};
22
22
use crate :: state:: executor_manager:: { ExecutorManager , ExecutorReservation } ;
23
23
use crate :: state:: { decode_protobuf, encode_protobuf, with_lock} ;
24
24
use ballista_core:: config:: BallistaConfig ;
25
- #[ cfg( not( test) ) ]
26
25
use ballista_core:: error:: BallistaError ;
27
26
use ballista_core:: error:: Result ;
28
27
use ballista_core:: serde:: protobuf:: executor_grpc_client:: ExecutorGrpcClient ;
@@ -42,6 +41,7 @@ use rand::distributions::Alphanumeric;
42
41
use rand:: { thread_rng, Rng } ;
43
42
use std:: collections:: HashMap ;
44
43
use std:: default:: Default ;
44
+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
45
45
use std:: sync:: Arc ;
46
46
use std:: time:: { SystemTime , UNIX_EPOCH } ;
47
47
use tokio:: sync:: RwLock ;
@@ -60,6 +60,7 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
60
60
scheduler_id : String ,
61
61
// Cache for active execution graphs curated by this scheduler
62
62
active_job_cache : ExecutionGraphCache ,
63
+ pending_task_queue_size : Arc < AtomicUsize > ,
63
64
}
64
65
65
66
impl < T : ' static + AsLogicalPlan , U : ' static + AsExecutionPlan > TaskManager < T , U > {
@@ -76,6 +77,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
76
77
codec,
77
78
scheduler_id,
78
79
active_job_cache : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ,
80
+ pending_task_queue_size : Arc :: new ( AtomicUsize :: new ( 0 ) ) ,
79
81
}
80
82
}
81
83
@@ -101,11 +103,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
101
103
. await ?;
102
104
103
105
graph. revive ( ) ;
106
+ let available_tasks = graph. available_tasks ( ) ;
104
107
105
108
let mut active_graph_cache = self . active_job_cache . write ( ) . await ;
106
109
active_graph_cache. insert ( job_id. to_owned ( ) , Arc :: new ( RwLock :: new ( graph) ) ) ;
107
110
108
- Ok ( ( ) )
111
+ self . increase_pending_queue_size ( available_tasks )
109
112
}
110
113
111
114
/// Get the status of a job. First look in the active cache.
@@ -199,16 +202,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
199
202
let mut pending_tasks = 0usize ;
200
203
let mut assign_tasks = 0usize ;
201
204
let job_cache = self . active_job_cache . read ( ) . await ;
205
+
202
206
for ( _job_id, graph) in job_cache. iter ( ) {
203
207
let mut graph = graph. write ( ) . await ;
204
208
for reservation in free_reservations. iter ( ) . skip ( assign_tasks) {
205
209
if let Some ( task) = graph. pop_next_task ( & reservation. executor_id ) ? {
206
210
assignments. push ( ( reservation. executor_id . clone ( ) , task) ) ;
207
211
assign_tasks += 1 ;
212
+ self . decrease_pending_queue_size ( 1 ) ?;
208
213
} else {
209
214
break ;
210
215
}
211
216
}
217
+
212
218
if assign_tasks >= free_reservations. len ( ) {
213
219
pending_tasks = graph. available_tasks ( ) ;
214
220
break ;
@@ -253,18 +259,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
253
259
) -> Result < ( ) > {
254
260
let lock = self . state . lock ( Keyspace :: ActiveJobs , "" ) . await ?;
255
261
256
- let running_tasks = self
257
- . get_execution_graph ( job_id)
258
- . await
259
- . map ( |graph| graph. running_tasks ( ) )
260
- . unwrap_or_else ( |_| vec ! [ ] ) ;
261
-
262
- info ! (
263
- "Cancelling {} running tasks for job {}" ,
264
- running_tasks. len( ) ,
265
- job_id
266
- ) ;
267
-
268
262
let failed_at = SystemTime :: now ( )
269
263
. duration_since ( UNIX_EPOCH )
270
264
. expect ( "Time went backwards" )
@@ -273,39 +267,47 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
273
267
self . fail_job_inner ( lock, job_id, "Cancelled" . to_owned ( ) , failed_at)
274
268
. await ?;
275
269
276
- let mut tasks: HashMap < & str , Vec < protobuf:: PartitionId > > = Default :: default ( ) ;
277
-
278
- for ( partition, executor_id) in & running_tasks {
279
- if let Some ( parts) = tasks. get_mut ( executor_id. as_str ( ) ) {
280
- parts. push ( protobuf:: PartitionId {
281
- job_id : job_id. to_owned ( ) ,
282
- stage_id : partition. stage_id as u32 ,
283
- partition_id : partition. partition_id as u32 ,
284
- } )
285
- } else {
286
- tasks. insert (
287
- executor_id. as_str ( ) ,
288
- vec ! [ protobuf:: PartitionId {
270
+ if let Ok ( graph) = self . get_execution_graph ( job_id) . await {
271
+ let running_tasks = graph. running_tasks ( ) ;
272
+ let mut tasks: HashMap < & str , Vec < protobuf:: PartitionId > > = Default :: default ( ) ;
273
+
274
+ info ! (
275
+ "Cancelling {} running tasks for job {}" ,
276
+ running_tasks. len( ) ,
277
+ job_id
278
+ ) ;
279
+ for ( partition, executor_id) in & running_tasks {
280
+ if let Some ( parts) = tasks. get_mut ( executor_id. as_str ( ) ) {
281
+ parts. push ( protobuf:: PartitionId {
289
282
job_id : job_id. to_owned ( ) ,
290
283
stage_id : partition. stage_id as u32 ,
291
284
partition_id : partition. partition_id as u32 ,
292
- } ] ,
293
- ) ;
285
+ } )
286
+ } else {
287
+ tasks. insert (
288
+ executor_id. as_str ( ) ,
289
+ vec ! [ protobuf:: PartitionId {
290
+ job_id: job_id. to_owned( ) ,
291
+ stage_id: partition. stage_id as u32 ,
292
+ partition_id: partition. partition_id as u32 ,
293
+ } ] ,
294
+ ) ;
295
+ }
294
296
}
295
- }
296
297
297
- for ( executor_id, partitions) in tasks {
298
- if let Ok ( mut client) = executor_manager. get_client ( executor_id) . await {
299
- client
300
- . cancel_tasks ( CancelTasksParams {
301
- partition_id : partitions,
302
- } )
303
- . await ?;
304
- } else {
305
- error ! ( "Failed to get client for executor ID {}" , executor_id)
298
+ for ( executor_id, partitions) in tasks {
299
+ if let Ok ( mut client) = executor_manager. get_client ( executor_id) . await {
300
+ client
301
+ . cancel_tasks ( CancelTasksParams {
302
+ partition_id : partitions,
303
+ } )
304
+ . await ?;
305
+ } else {
306
+ error ! ( "Failed to get client for executor ID {}" , executor_id)
307
+ }
306
308
}
309
+ self . decrease_pending_queue_size ( graph. available_tasks ( ) ) ?;
307
310
}
308
-
309
311
Ok ( ( ) )
310
312
}
311
313
@@ -364,6 +366,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
364
366
pub async fn fail_running_job ( & self , job_id : & str ) -> Result < ( ) > {
365
367
if let Some ( graph) = self . get_active_execution_graph ( job_id) . await {
366
368
let graph = graph. read ( ) . await . clone ( ) ;
369
+ let available_tasks = graph. available_tasks ( ) ;
367
370
let value = self . encode_execution_graph ( graph) ?;
368
371
369
372
debug ! ( "Moving job {} from Active to Failed" , job_id) ;
@@ -372,6 +375,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
372
375
self . state
373
376
. put ( Keyspace :: FailedJobs , job_id. to_owned ( ) , value)
374
377
. await ?;
378
+ self . decrease_pending_queue_size ( available_tasks) ?
375
379
} else {
376
380
warn ! ( "Fail to find job {} in the cache" , job_id) ;
377
381
}
@@ -385,10 +389,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
385
389
let mut graph = graph. write ( ) . await ;
386
390
graph. revive ( ) ;
387
391
let graph = graph. clone ( ) ;
392
+ let available_tasks = graph. available_tasks ( ) ;
388
393
let value = self . encode_execution_graph ( graph) ?;
394
+
389
395
self . state
390
396
. put ( Keyspace :: ActiveJobs , job_id. to_owned ( ) , value)
391
397
. await ?;
398
+ self . increase_pending_queue_size ( available_tasks) ?;
392
399
} else {
393
400
warn ! ( "Fail to find job {} in the cache" , job_id) ;
394
401
}
@@ -574,4 +581,34 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
574
581
. take ( 7 )
575
582
. collect ( )
576
583
}
584
+
585
+ pub fn increase_pending_queue_size ( & self , num : usize ) -> Result < ( ) > {
586
+ match self . pending_task_queue_size . fetch_update (
587
+ Ordering :: Relaxed ,
588
+ Ordering :: Relaxed ,
589
+ |s| Some ( s + num) ,
590
+ ) {
591
+ Ok ( _) => Ok ( ( ) ) ,
592
+ Err ( _) => Err ( BallistaError :: Internal (
593
+ "Unable to update pending task counter" . to_owned ( ) ,
594
+ ) ) ,
595
+ }
596
+ }
597
+
598
+ pub fn decrease_pending_queue_size ( & self , num : usize ) -> Result < ( ) > {
599
+ match self . pending_task_queue_size . fetch_update (
600
+ Ordering :: Relaxed ,
601
+ Ordering :: Relaxed ,
602
+ |s| Some ( s - num) ,
603
+ ) {
604
+ Ok ( _) => Ok ( ( ) ) ,
605
+ Err ( _) => Err ( BallistaError :: Internal (
606
+ "Unable to update pending task counter" . to_owned ( ) ,
607
+ ) ) ,
608
+ }
609
+ }
610
+
611
+ pub fn get_pending_task_queue_size ( & self ) -> usize {
612
+ self . pending_task_queue_size . load ( Ordering :: SeqCst )
613
+ }
577
614
}
0 commit comments