@@ -119,44 +119,59 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
119
119
120
120
let state = self . state . clone ( ) ;
121
121
tokio:: spawn ( async move {
122
- let event = if let Err ( e) = state
123
- . submit_job ( & job_id, & job_name, session_ctx, & plan, queued_at)
124
- . await
125
- {
126
- let fail_message = format ! ( "Error planning job {job_id}: {e:?}" ) ;
127
- error ! ( "{}" , & fail_message) ;
128
- QueryStageSchedulerEvent :: JobPlanningFailed {
129
- job_id,
130
- fail_message,
131
- queued_at,
132
- failed_at : timestamp_millis ( ) ,
133
- }
134
- } else {
135
- QueryStageSchedulerEvent :: JobSubmitted {
136
- job_id,
137
- queued_at,
138
- submitted_at : timestamp_millis ( ) ,
139
- resubmit : false ,
140
- }
141
- } ;
122
+ let event =
123
+ match state. plan_job ( & job_id, session_ctx. clone ( ) , & plan) . await {
124
+ Ok ( plan) => QueryStageSchedulerEvent :: JobSubmitted {
125
+ job_id,
126
+ job_name,
127
+ session_id : session_ctx. session_id ( ) ,
128
+ queued_at,
129
+ submitted_at : timestamp_millis ( ) ,
130
+ resubmit : false ,
131
+ plan,
132
+ } ,
133
+ Err ( error) => {
134
+ let fail_message =
135
+ format ! ( "Error planning job {job_id}: {error:?}" ) ;
136
+ error ! ( "{}" , & fail_message) ;
137
+ QueryStageSchedulerEvent :: JobPlanningFailed {
138
+ job_id,
139
+ fail_message,
140
+ queued_at,
141
+ failed_at : timestamp_millis ( ) ,
142
+ }
143
+ }
144
+ } ;
142
145
if let Err ( e) = tx_event. post_event ( event) . await {
143
146
error ! ( "Fail to send event due to {}" , e) ;
144
147
}
145
148
} ) ;
146
149
}
147
150
QueryStageSchedulerEvent :: JobSubmitted {
148
151
job_id,
152
+ job_name,
153
+ session_id,
149
154
queued_at,
150
155
submitted_at,
151
156
resubmit,
157
+ plan,
152
158
} => {
153
159
if !resubmit {
154
160
self . metrics_collector . record_submitted (
155
161
& job_id,
156
162
queued_at,
157
163
submitted_at,
158
164
) ;
159
-
165
+ self . state
166
+ . task_manager
167
+ . submit_job (
168
+ job_id. as_str ( ) ,
169
+ job_name. as_str ( ) ,
170
+ session_id. as_str ( ) ,
171
+ plan. clone ( ) ,
172
+ queued_at,
173
+ )
174
+ . await ?;
160
175
info ! ( "Job {} submitted" , job_id) ;
161
176
} else {
162
177
debug ! ( "Job {} resubmitted" , job_id) ;
@@ -192,9 +207,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
192
207
if let Err ( e) = tx_event
193
208
. post_event ( QueryStageSchedulerEvent :: JobSubmitted {
194
209
job_id,
210
+ job_name,
211
+ session_id,
195
212
queued_at,
196
213
submitted_at,
197
214
resubmit : true ,
215
+ plan : plan. clone ( ) ,
198
216
} )
199
217
. await
200
218
{
@@ -387,6 +405,7 @@ mod tests {
387
405
use ballista_core:: event_loop:: EventAction ;
388
406
use datafusion:: arrow:: datatypes:: { DataType , Field , Schema } ;
389
407
use datafusion:: logical_expr:: { col, sum, LogicalPlan } ;
408
+ use datafusion:: physical_plan:: empty:: EmptyExec ;
390
409
use datafusion:: test_util:: scan_empty_with_partitions;
391
410
use std:: sync:: Arc ;
392
411
use std:: time:: Duration ;
@@ -418,15 +437,26 @@ mod tests {
418
437
419
438
let event = QueryStageSchedulerEvent :: JobSubmitted {
420
439
job_id : "job-id" . to_string ( ) ,
440
+ job_name : "job-name" . to_string ( ) ,
441
+ session_id : "session-id" . to_string ( ) ,
421
442
queued_at : 0 ,
422
443
submitted_at : 0 ,
423
444
resubmit : false ,
445
+ plan : Arc :: new ( EmptyExec :: new ( false , Arc :: new ( test_schema ( ) ) ) ) ,
424
446
} ;
425
447
448
+ // Mock the JobQueued work.
449
+ query_stage_scheduler
450
+ . state
451
+ . task_manager
452
+ . queue_job ( "job-id" , "job-name" , 0 )
453
+ . await ?;
454
+
426
455
query_stage_scheduler. on_receive ( event, & tx, & rx) . await ?;
427
456
428
457
let next_event = rx. recv ( ) . await . unwrap ( ) ;
429
458
459
+ dbg ! ( next_event. clone( ) ) ;
430
460
assert ! ( matches!(
431
461
next_event,
432
462
QueryStageSchedulerEvent :: JobSubmitted { job_id, resubmit, .. } if job_id == "job-id" && resubmit
@@ -540,10 +570,7 @@ mod tests {
540
570
}
541
571
542
572
fn test_plan ( partitions : usize ) -> LogicalPlan {
543
- let schema = Schema :: new ( vec ! [
544
- Field :: new( "id" , DataType :: Utf8 , false ) ,
545
- Field :: new( "gmv" , DataType :: UInt64 , false ) ,
546
- ] ) ;
573
+ let schema = test_schema ( ) ;
547
574
548
575
scan_empty_with_partitions ( None , & schema, Some ( vec ! [ 0 , 1 ] ) , partitions)
549
576
. unwrap ( )
@@ -552,4 +579,11 @@ mod tests {
552
579
. build ( )
553
580
. unwrap ( )
554
581
}
582
+
583
+ fn test_schema ( ) -> Schema {
584
+ Schema :: new ( vec ! [
585
+ Field :: new( "id" , DataType :: Utf8 , false ) ,
586
+ Field :: new( "gmv" , DataType :: UInt64 , false ) ,
587
+ ] )
588
+ }
555
589
}
0 commit comments