@@ -196,112 +196,31 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
196
196
& self ,
197
197
reservations : Vec < ExecutorReservation > ,
198
198
) -> Result < ( Vec < ExecutorReservation > , usize ) > {
199
- let ( free_list, pending_tasks) = match self
200
- . task_manager
201
- . fill_reservations ( & reservations)
202
- . await
199
+ let pending_tasks = match self . task_manager . fill_reservations ( & reservations) . await
203
200
{
204
- Ok ( ( assignments, mut unassigned_reservations, pending_tasks) ) => {
205
- // Put tasks to the same executor together
206
- // And put tasks belonging to the same stage together for creating MultiTaskDefinition
207
- let mut executor_stage_assignments: HashMap <
208
- String ,
209
- HashMap < ( String , usize ) , Vec < TaskDescription > > ,
210
- > = HashMap :: new ( ) ;
211
- for ( executor_id, task) in assignments. into_iter ( ) {
212
- let stage_key =
213
- ( task. partition . job_id . clone ( ) , task. partition . stage_id ) ;
214
- if let Some ( tasks) = executor_stage_assignments. get_mut ( & executor_id)
215
- {
216
- if let Some ( executor_stage_tasks) = tasks. get_mut ( & stage_key) {
217
- executor_stage_tasks. push ( task) ;
218
- } else {
219
- tasks. insert ( stage_key, vec ! [ task] ) ;
220
- }
221
- } else {
222
- let mut executor_stage_tasks: HashMap <
223
- ( String , usize ) ,
224
- Vec < TaskDescription > ,
225
- > = HashMap :: new ( ) ;
226
- executor_stage_tasks. insert ( stage_key, vec ! [ task] ) ;
227
- executor_stage_assignments
228
- . insert ( executor_id, executor_stage_tasks) ;
229
- }
230
- }
231
-
232
- let mut join_handles = vec ! [ ] ;
233
- for ( executor_id, tasks) in executor_stage_assignments. into_iter ( ) {
234
- let tasks: Vec < Vec < TaskDescription > > = tasks. into_values ( ) . collect ( ) ;
235
- // Total number of tasks to be launched for one executor
236
- let n_tasks: usize =
237
- tasks. iter ( ) . map ( |stage_tasks| stage_tasks. len ( ) ) . sum ( ) ;
238
-
239
- let task_manager = self . task_manager . clone ( ) ;
240
- let executor_manager = self . executor_manager . clone ( ) ;
241
- let join_handle = tokio:: spawn ( async move {
242
- let success = match executor_manager
243
- . get_executor_metadata ( & executor_id)
244
- . await
245
- {
246
- Ok ( executor) => {
247
- if let Err ( e) = task_manager
248
- . launch_multi_task (
249
- & executor,
250
- tasks,
251
- & executor_manager,
252
- )
253
- . await
254
- {
255
- error ! ( "Failed to launch new task: {:?}" , e) ;
256
- false
257
- } else {
258
- true
259
- }
260
- }
261
- Err ( e) => {
262
- error ! ( "Failed to launch new task, could not get executor metadata: {:?}" , e) ;
263
- false
264
- }
265
- } ;
266
- if success {
267
- vec ! [ ]
268
- } else {
269
- vec ! [
270
- ExecutorReservation :: new_free( executor_id. clone( ) , ) ;
271
- n_tasks
272
- ]
273
- }
274
- } ) ;
275
- join_handles. push ( join_handle) ;
276
- }
201
+ Ok ( ( assignments, unassigned_reservations, pending_tasks) ) => {
202
+ let executor_stage_assignments = Self :: combine_task ( assignments) ;
277
203
278
- let unassigned_executor_reservations =
279
- futures:: future:: join_all ( join_handles)
280
- . await
281
- . into_iter ( )
282
- . collect :: < std:: result:: Result <
283
- Vec < Vec < ExecutorReservation > > ,
284
- tokio:: task:: JoinError ,
285
- > > ( ) ?;
286
- unassigned_reservations. append (
287
- & mut unassigned_executor_reservations
288
- . into_iter ( )
289
- . flatten ( )
290
- . collect :: < Vec < ExecutorReservation > > ( ) ,
204
+ self . spawn_tasks_and_persist_reservations_back (
205
+ executor_stage_assignments,
206
+ unassigned_reservations,
291
207
) ;
292
- ( unassigned_reservations, pending_tasks)
208
+
209
+ pending_tasks
293
210
}
211
+ // If error set all reservations back
294
212
Err ( e) => {
295
213
error ! ( "Error filling reservations: {:?}" , e) ;
296
- ( reservations, 0 )
214
+ self . executor_manager
215
+ . cancel_reservations ( reservations)
216
+ . await ?;
217
+ 0
297
218
}
298
219
} ;
299
220
300
221
let mut new_reservations = vec ! [ ] ;
301
- if !free_list. is_empty ( ) {
302
- // If any reserved slots remain, return them to the pool
303
- self . executor_manager . cancel_reservations ( free_list) . await ?;
304
- } else if pending_tasks > 0 {
222
+
223
+ if pending_tasks > 0 {
305
224
// If there are pending tasks available, try and schedule them
306
225
let pending_reservations = self
307
226
. executor_manager
@@ -313,6 +232,86 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
313
232
Ok ( ( new_reservations, pending_tasks) )
314
233
}
315
234
235
+ fn spawn_tasks_and_persist_reservations_back (
236
+ & self ,
237
+ executor_stage_assignments : HashMap <
238
+ String ,
239
+ HashMap < ( String , usize ) , Vec < TaskDescription > > ,
240
+ > ,
241
+ mut unassigned_reservations : Vec < ExecutorReservation > ,
242
+ ) {
243
+ let task_manager = self . task_manager . clone ( ) ;
244
+ let executor_manager = self . executor_manager . clone ( ) ;
245
+
246
+ tokio:: spawn ( async move {
247
+ for ( executor_id, tasks) in executor_stage_assignments. into_iter ( ) {
248
+ let tasks: Vec < Vec < TaskDescription > > = tasks. into_values ( ) . collect ( ) ;
249
+ // Total number of tasks to be launched for one executor
250
+ let n_tasks: usize =
251
+ tasks. iter ( ) . map ( |stage_tasks| stage_tasks. len ( ) ) . sum ( ) ;
252
+
253
+ match executor_manager. get_executor_metadata ( & executor_id) . await {
254
+ Ok ( executor) => {
255
+ if let Err ( e) = task_manager
256
+ . launch_multi_task ( & executor, tasks, & executor_manager)
257
+ . await
258
+ {
259
+ error ! ( "Failed to launch new task: {:?}" , e) ;
260
+ // set resource back.
261
+ unassigned_reservations. append ( & mut vec ! [
262
+ ExecutorReservation :: new_free(
263
+ executor_id. clone( ) ,
264
+ ) ;
265
+ n_tasks
266
+ ] ) ;
267
+ }
268
+ }
269
+ Err ( e) => {
270
+ error ! ( "Failed to launch new task, could not get executor metadata: {:?}" , e) ;
271
+ // here no need set resource back.
272
+ }
273
+ } ;
274
+ }
275
+ if !unassigned_reservations. is_empty ( ) {
276
+ // If any reserved slots remain, return them to the pool
277
+ executor_manager
278
+ . cancel_reservations ( unassigned_reservations)
279
+ . await
280
+ . expect ( "cancel_reservations fail!" ) ;
281
+ }
282
+ } ) ;
283
+ }
284
+
285
+ // Put tasks to the same executor together
286
+ // And put tasks belonging to the same stage together for creating MultiTaskDefinition
287
+ // return a map of <executor_id, <stage_key, TaskDesc>>.
288
+ fn combine_task (
289
+ assignments : Vec < ( String , TaskDescription ) > ,
290
+ ) -> HashMap < String , HashMap < ( String , usize ) , Vec < TaskDescription > > > {
291
+ let mut executor_stage_assignments: HashMap <
292
+ String ,
293
+ HashMap < ( String , usize ) , Vec < TaskDescription > > ,
294
+ > = HashMap :: new ( ) ;
295
+ for ( executor_id, task) in assignments. into_iter ( ) {
296
+ let stage_key = ( task. partition . job_id . clone ( ) , task. partition . stage_id ) ;
297
+ if let Some ( tasks) = executor_stage_assignments. get_mut ( & executor_id) {
298
+ if let Some ( executor_stage_tasks) = tasks. get_mut ( & stage_key) {
299
+ executor_stage_tasks. push ( task) ;
300
+ } else {
301
+ tasks. insert ( stage_key, vec ! [ task] ) ;
302
+ }
303
+ } else {
304
+ let mut executor_stage_tasks: HashMap <
305
+ ( String , usize ) ,
306
+ Vec < TaskDescription > ,
307
+ > = HashMap :: new ( ) ;
308
+ executor_stage_tasks. insert ( stage_key, vec ! [ task] ) ;
309
+ executor_stage_assignments. insert ( executor_id, executor_stage_tasks) ;
310
+ }
311
+ }
312
+ executor_stage_assignments
313
+ }
314
+
316
315
pub ( crate ) async fn plan_job (
317
316
& self ,
318
317
job_id : & str ,
@@ -454,6 +453,8 @@ mod test {
454
453
assert_eq ! ( assigned, 0 ) ;
455
454
assert ! ( result. is_empty( ) ) ;
456
455
456
+ // Need sleep wait for the spawn task work done.
457
+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) . await ;
457
458
// All reservations should have been cancelled so we should be able to reserve them now
458
459
let reservations = state. executor_manager . reserve_slots ( 4 ) . await ?;
459
460
0 commit comments