Skip to content

Commit a21036a

Browse files
committed
fix conflict 2
1 parent 30232e0 commit a21036a

File tree

4 files changed

+81
-39
lines changed

4 files changed

+81
-39
lines changed

ballista/scheduler/src/scheduler_server/event.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use datafusion::logical_expr::LogicalPlan;
2222

2323
use crate::state::execution_graph::RunningTaskInfo;
2424
use ballista_core::serde::protobuf::TaskStatus;
25+
use datafusion::physical_plan::ExecutionPlan;
2526
use datafusion::prelude::SessionContext;
2627
use std::sync::Arc;
2728

@@ -36,9 +37,12 @@ pub enum QueryStageSchedulerEvent {
3637
},
3738
JobSubmitted {
3839
job_id: String,
40+
job_name: String,
41+
session_id: String,
3942
queued_at: u64,
4043
submitted_at: u64,
4144
resubmit: bool,
45+
plan: Arc<dyn ExecutionPlan>,
4246
},
4347
// For a job which failed during planning
4448
JobPlanningFailed {
@@ -76,8 +80,10 @@ impl Debug for QueryStageSchedulerEvent {
7680
} => {
7781
write!(f, "JobQueued : job_id={job_id}, job_name={job_name}.")
7882
}
79-
QueryStageSchedulerEvent::JobSubmitted { job_id, .. } => {
80-
write!(f, "JobSubmitted : job_id={job_id}.")
83+
QueryStageSchedulerEvent::JobSubmitted {
84+
job_id, resubmit, ..
85+
} => {
86+
write!(f, "JobSubmitted : job_id={job_id}, resubmit={resubmit}.")
8187
}
8288
QueryStageSchedulerEvent::JobPlanningFailed {
8389
job_id,

ballista/scheduler/src/scheduler_server/mod.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -440,13 +440,20 @@ mod test {
440440
.queue_job(job_id, "", timestamp_millis())
441441
.await?;
442442

443-
// Submit job
444-
scheduler
443+
// Plan job
444+
let plan = scheduler
445445
.state
446-
.submit_job(job_id, "", ctx, &plan, 0)
446+
.plan_job(job_id, ctx.clone(), &plan)
447447
.await
448448
.expect("submitting plan");
449449

450+
//Submit job plan
451+
scheduler
452+
.state
453+
.task_manager
454+
.submit_job(job_id, "", &ctx.session_id(), plan, 0)
455+
.await?;
456+
450457
// Refresh the ExecutionGraph
451458
while let Some(graph) = scheduler
452459
.state

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

+59-25
Original file line numberDiff line numberDiff line change
@@ -119,44 +119,59 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
119119

120120
let state = self.state.clone();
121121
tokio::spawn(async move {
122-
let event = if let Err(e) = state
123-
.submit_job(&job_id, &job_name, session_ctx, &plan, queued_at)
124-
.await
125-
{
126-
let fail_message = format!("Error planning job {job_id}: {e:?}");
127-
error!("{}", &fail_message);
128-
QueryStageSchedulerEvent::JobPlanningFailed {
129-
job_id,
130-
fail_message,
131-
queued_at,
132-
failed_at: timestamp_millis(),
133-
}
134-
} else {
135-
QueryStageSchedulerEvent::JobSubmitted {
136-
job_id,
137-
queued_at,
138-
submitted_at: timestamp_millis(),
139-
resubmit: false,
140-
}
141-
};
122+
let event =
123+
match state.plan_job(&job_id, session_ctx.clone(), &plan).await {
124+
Ok(plan) => QueryStageSchedulerEvent::JobSubmitted {
125+
job_id,
126+
job_name,
127+
session_id: session_ctx.session_id(),
128+
queued_at,
129+
submitted_at: timestamp_millis(),
130+
resubmit: false,
131+
plan,
132+
},
133+
Err(error) => {
134+
let fail_message =
135+
format!("Error planning job {job_id}: {error:?}");
136+
error!("{}", &fail_message);
137+
QueryStageSchedulerEvent::JobPlanningFailed {
138+
job_id,
139+
fail_message,
140+
queued_at,
141+
failed_at: timestamp_millis(),
142+
}
143+
}
144+
};
142145
if let Err(e) = tx_event.post_event(event).await {
143146
error!("Fail to send event due to {}", e);
144147
}
145148
});
146149
}
147150
QueryStageSchedulerEvent::JobSubmitted {
148151
job_id,
152+
job_name,
153+
session_id,
149154
queued_at,
150155
submitted_at,
151156
resubmit,
157+
plan,
152158
} => {
153159
if !resubmit {
154160
self.metrics_collector.record_submitted(
155161
&job_id,
156162
queued_at,
157163
submitted_at,
158164
);
159-
165+
self.state
166+
.task_manager
167+
.submit_job(
168+
job_id.as_str(),
169+
job_name.as_str(),
170+
session_id.as_str(),
171+
plan.clone(),
172+
queued_at,
173+
)
174+
.await?;
160175
info!("Job {} submitted", job_id);
161176
} else {
162177
debug!("Job {} resubmitted", job_id);
@@ -192,9 +207,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
192207
if let Err(e) = tx_event
193208
.post_event(QueryStageSchedulerEvent::JobSubmitted {
194209
job_id,
210+
job_name,
211+
session_id,
195212
queued_at,
196213
submitted_at,
197214
resubmit: true,
215+
plan: plan.clone(),
198216
})
199217
.await
200218
{
@@ -387,6 +405,7 @@ mod tests {
387405
use ballista_core::event_loop::EventAction;
388406
use datafusion::arrow::datatypes::{DataType, Field, Schema};
389407
use datafusion::logical_expr::{col, sum, LogicalPlan};
408+
use datafusion::physical_plan::empty::EmptyExec;
390409
use datafusion::test_util::scan_empty_with_partitions;
391410
use std::sync::Arc;
392411
use std::time::Duration;
@@ -418,15 +437,26 @@ mod tests {
418437

419438
let event = QueryStageSchedulerEvent::JobSubmitted {
420439
job_id: "job-id".to_string(),
440+
job_name: "job-name".to_string(),
441+
session_id: "session-id".to_string(),
421442
queued_at: 0,
422443
submitted_at: 0,
423444
resubmit: false,
445+
plan: Arc::new(EmptyExec::new(false, Arc::new(test_schema()))),
424446
};
425447

448+
// Mock the JobQueued work.
449+
query_stage_scheduler
450+
.state
451+
.task_manager
452+
.queue_job("job-id", "job-name", 0)
453+
.await?;
454+
426455
query_stage_scheduler.on_receive(event, &tx, &rx).await?;
427456

428457
let next_event = rx.recv().await.unwrap();
429458

459+
dbg!(next_event.clone());
430460
assert!(matches!(
431461
next_event,
432462
QueryStageSchedulerEvent::JobSubmitted { job_id, resubmit, .. } if job_id == "job-id" && resubmit
@@ -540,10 +570,7 @@ mod tests {
540570
}
541571

542572
fn test_plan(partitions: usize) -> LogicalPlan {
543-
let schema = Schema::new(vec![
544-
Field::new("id", DataType::Utf8, false),
545-
Field::new("gmv", DataType::UInt64, false),
546-
]);
573+
let schema = test_schema();
547574

548575
scan_empty_with_partitions(None, &schema, Some(vec![0, 1]), partitions)
549576
.unwrap()
@@ -552,4 +579,11 @@ mod tests {
552579
.build()
553580
.unwrap()
554581
}
582+
583+
fn test_schema() -> Schema {
584+
Schema::new(vec![
585+
Field::new("id", DataType::Utf8, false),
586+
Field::new("gmv", DataType::UInt64, false),
587+
])
588+
}
555589
}

ballista/scheduler/src/state/mod.rs

+4-9
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use ballista_core::serde::protobuf::TaskStatus;
3838
use ballista_core::serde::BallistaCodec;
3939
use datafusion::logical_expr::LogicalPlan;
4040
use datafusion::physical_plan::display::DisplayableExecutionPlan;
41+
use datafusion::physical_plan::ExecutionPlan;
4142
use datafusion::prelude::SessionContext;
4243
use datafusion_proto::logical_plan::AsLogicalPlan;
4344
use datafusion_proto::physical_plan::AsExecutionPlan;
@@ -311,14 +312,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
311312
executor_stage_assignments
312313
}
313314

314-
pub(crate) async fn submit_job(
315+
pub(crate) async fn plan_job(
315316
&self,
316317
job_id: &str,
317-
job_name: &str,
318318
session_ctx: Arc<SessionContext>,
319319
plan: &LogicalPlan,
320-
queued_at: u64,
321-
) -> Result<()> {
320+
) -> Result<Arc<dyn ExecutionPlan>> {
322321
let start = Instant::now();
323322

324323
if log::max_level() >= log::Level::Debug {
@@ -373,15 +372,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
373372
DisplayableExecutionPlan::new(plan.as_ref()).indent()
374373
);
375374

376-
self.task_manager
377-
.submit_job(job_id, job_name, &session_ctx.session_id(), plan, queued_at)
378-
.await?;
379-
380375
let elapsed = start.elapsed();
381376

382377
info!("Planned job {} in {:?}", job_id, elapsed);
383378

384-
Ok(())
379+
Ok(plan)
385380
}
386381

387382
/// Spawn a delayed future to clean up job data on both Scheduler and Executors

0 commit comments

Comments
 (0)