Skip to content

Commit 0e6f046

Browse files
authored
Add REST API to generate DOT graph for individual query stage (#310)
1 parent aae3f3c commit 0e6f046

File tree

12 files changed

+183
-157
lines changed

12 files changed

+183
-157
lines changed

ballista/rust/scheduler/src/api/handlers.rs

+35-25
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ pub(crate) async fn get_jobs<T: AsLogicalPlan, U: AsExecutionPlan>(
145145
((job.completed_stages as f32 / job.num_stages as f32) * 100_f32) as u8;
146146
JobResponse {
147147
job_id: job.job_id.to_string(),
148-
job_name: job.job_name.to_owned().unwrap_or_default(),
148+
job_name: job.job_name.to_string(),
149149
job_status,
150150
num_stages: job.num_stages,
151151
completed_stages: job.completed_stages,
@@ -167,35 +167,27 @@ pub(crate) async fn get_query_stages<T: AsLogicalPlan, U: AsExecutionPlan>(
167167
data_server: SchedulerServer<T, U>,
168168
job_id: String,
169169
) -> Result<impl warp::Reply, Rejection> {
170-
let maybe_graph = data_server
170+
if let Some(graph) = data_server
171171
.state
172172
.task_manager
173173
.get_job_execution_graph(&job_id)
174174
.await
175-
.map_err(|_| warp::reject())?;
176-
177-
match maybe_graph {
178-
Some(graph) => Ok(warp::reply::json(&QueryStagesResponse {
175+
.map_err(|_| warp::reject())?
176+
{
177+
Ok(warp::reply::json(&QueryStagesResponse {
179178
stages: graph
180179
.stages()
181180
.iter()
182181
.map(|(id, stage)| {
183182
let mut summary = QueryStageSummary {
184183
stage_id: id.to_string(),
185-
stage_status: "".to_string(),
184+
stage_status: stage.variant_name().to_string(),
186185
input_rows: 0,
187186
output_rows: 0,
188187
elapsed_compute: "".to_string(),
189188
};
190189
match stage {
191-
ExecutionStage::UnResolved(_) => {
192-
summary.stage_status = "Unresolved".to_string();
193-
}
194-
ExecutionStage::Resolved(_) => {
195-
summary.stage_status = "Resolved".to_string();
196-
}
197190
ExecutionStage::Running(running_stage) => {
198-
summary.stage_status = "Running".to_string();
199191
summary.input_rows = running_stage
200192
.stage_metrics
201193
.as_ref()
@@ -213,7 +205,6 @@ pub(crate) async fn get_query_stages<T: AsLogicalPlan, U: AsExecutionPlan>(
213205
.unwrap_or_default();
214206
}
215207
ExecutionStage::Successful(completed_stage) => {
216-
summary.stage_status = "Completed".to_string();
217208
summary.input_rows = get_combined_count(
218209
&completed_stage.stage_metrics,
219210
"input_rows",
@@ -225,15 +216,14 @@ pub(crate) async fn get_query_stages<T: AsLogicalPlan, U: AsExecutionPlan>(
225216
summary.elapsed_compute =
226217
get_elapsed_compute_nanos(&completed_stage.stage_metrics);
227218
}
228-
ExecutionStage::Failed(_) => {
229-
summary.stage_status = "Failed".to_string();
230-
}
219+
_ => {}
231220
}
232221
summary
233222
})
234223
.collect(),
235-
})),
236-
_ => Ok(warp::reply::json(&QueryStagesResponse { stages: vec![] })),
224+
}))
225+
} else {
226+
Ok(warp::reply::json(&QueryStagesResponse { stages: vec![] }))
237227
}
238228
}
239229

@@ -273,16 +263,36 @@ pub(crate) async fn get_job_dot_graph<T: AsLogicalPlan, U: AsExecutionPlan>(
273263
data_server: SchedulerServer<T, U>,
274264
job_id: String,
275265
) -> Result<String, Rejection> {
276-
let graph = data_server
266+
if let Some(graph) = data_server
277267
.state
278268
.task_manager
279269
.get_job_execution_graph(&job_id)
280270
.await
281-
.map_err(|_| warp::reject())?;
271+
.map_err(|_| warp::reject())?
272+
{
273+
ExecutionGraphDot::generate(graph).map_err(|_| warp::reject())
274+
} else {
275+
Ok("Not Found".to_string())
276+
}
277+
}
282278

283-
match graph {
284-
Some(x) => ExecutionGraphDot::generate(x).map_err(|_| warp::reject()),
285-
_ => Ok("Not Found".to_string()),
279+
/// Generate a dot graph for the specified job id and query stage and return as plain text
280+
pub(crate) async fn get_query_stage_dot_graph<T: AsLogicalPlan, U: AsExecutionPlan>(
281+
data_server: SchedulerServer<T, U>,
282+
job_id: String,
283+
stage_id: usize,
284+
) -> Result<String, Rejection> {
285+
if let Some(graph) = data_server
286+
.state
287+
.task_manager
288+
.get_job_execution_graph(&job_id)
289+
.await
290+
.map_err(|_| warp::reject())?
291+
{
292+
ExecutionGraphDot::generate_for_query_stage(graph, stage_id)
293+
.map_err(|_| warp::reject())
294+
} else {
295+
Ok("Not Found".to_string())
286296
}
287297
}
288298

ballista/rust/scheduler/src/api/mod.rs

+8
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static + AsExecutionPlan>(
105105
.and(with_data_server(scheduler_server.clone()))
106106
.and_then(|job_id, data_server| handlers::get_job_dot_graph(data_server, job_id));
107107

108+
let route_query_stage_dot =
109+
warp::path!("api" / "job" / String / "stage" / usize / "dot")
110+
.and(with_data_server(scheduler_server.clone()))
111+
.and_then(|job_id, stage_id, data_server| {
112+
handlers::get_query_stage_dot_graph(data_server, job_id, stage_id)
113+
});
114+
108115
let route_job_dot_svg = warp::path!("api" / "job" / String / "dot_svg")
109116
.and(with_data_server(scheduler_server))
110117
.and_then(|job_id, data_server| handlers::get_job_svg_graph(data_server, job_id));
@@ -114,6 +121,7 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static + AsExecutionPlan>(
114121
.or(route_jobs)
115122
.or(route_query_stages)
116123
.or(route_job_dot)
124+
.or(route_query_stage_dot)
117125
.or(route_job_dot_svg);
118126
routes.boxed()
119127
}

ballista/rust/scheduler/src/flight_sql.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,9 @@ impl FlightSqlServiceImpl {
295295
plan: &LogicalPlan,
296296
) -> Result<String, Status> {
297297
let job_id = self.server.state.task_manager.generate_job_id();
298+
let job_name = format!("Flight SQL job {}", job_id);
298299
self.server
299-
.submit_job(&job_id, None, ctx, plan)
300+
.submit_job(&job_id, &job_name, ctx, plan)
300301
.await
301302
.map_err(|e| {
302303
let msg =

ballista/rust/scheduler/src/scheduler_server/event.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::sync::Arc;
2828
pub enum QueryStageSchedulerEvent {
2929
JobQueued {
3030
job_id: String,
31-
job_name: Option<String>,
31+
job_name: String,
3232
session_ctx: Arc<SessionContext>,
3333
plan: Box<LogicalPlan>,
3434
},

ballista/rust/scheduler/src/scheduler_server/grpc.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -424,9 +424,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
424424
debug!("Received plan for execution: {:?}", plan);
425425

426426
let job_id = self.state.task_manager.generate_job_id();
427-
let job_name = config.settings().get(BALLISTA_JOB_NAME);
427+
let job_name = config
428+
.settings()
429+
.get(BALLISTA_JOB_NAME)
430+
.cloned()
431+
.unwrap_or_default();
428432

429-
self.submit_job(&job_id, job_name.cloned(), session_ctx, &plan)
433+
self.submit_job(&job_id, &job_name, session_ctx, &plan)
430434
.await
431435
.map_err(|e| {
432436
let msg =

ballista/rust/scheduler/src/scheduler_server/mod.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -141,15 +141,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
141141
pub(crate) async fn submit_job(
142142
&self,
143143
job_id: &str,
144-
job_name: Option<String>,
144+
job_name: &str,
145145
ctx: Arc<SessionContext>,
146146
plan: &LogicalPlan,
147147
) -> Result<()> {
148148
self.query_stage_event_loop
149149
.get_sender()?
150150
.post_event(QueryStageSchedulerEvent::JobQueued {
151151
job_id: job_id.to_owned(),
152-
job_name,
152+
job_name: job_name.to_owned(),
153153
session_ctx: ctx,
154154
plan: Box::new(plan.clone()),
155155
})
@@ -339,7 +339,7 @@ mod test {
339339
// Submit job
340340
scheduler
341341
.state
342-
.submit_job(job_id, None, ctx, &plan)
342+
.submit_job(job_id, "", ctx, &plan)
343343
.await
344344
.expect("submitting plan");
345345

@@ -444,7 +444,7 @@ mod test {
444444

445445
let job_id = "job";
446446

447-
scheduler.state.submit_job(job_id, None, ctx, &plan).await?;
447+
scheduler.state.submit_job(job_id, "", ctx, &plan).await?;
448448

449449
// Complete tasks that are offered through scheduler events
450450
loop {
@@ -593,7 +593,7 @@ mod test {
593593

594594
let job_id = "job";
595595

596-
scheduler.state.submit_job(job_id, None, ctx, &plan).await?;
596+
scheduler.state.submit_job(job_id, "", ctx, &plan).await?;
597597

598598
let available_tasks = scheduler
599599
.state
@@ -729,7 +729,7 @@ mod test {
729729
let job_id = "job";
730730

731731
// This should fail when we try and create the physical plan
732-
scheduler.submit_job(job_id, None, ctx, &plan).await?;
732+
scheduler.submit_job(job_id, "", ctx, &plan).await?;
733733

734734
let scheduler = scheduler.clone();
735735

ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
8080
let state = self.state.clone();
8181
tokio::spawn(async move {
8282
let event = if let Err(e) = state
83-
.submit_job(&job_id, job_name, session_ctx, &plan)
83+
.submit_job(&job_id, &job_name, session_ctx, &plan)
8484
.await
8585
{
8686
let msg = format!("Error planning job {}: {:?}", job_id, e);

ballista/rust/scheduler/src/state/execution_graph.rs

+14-18
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ pub struct ExecutionGraph {
104104
scheduler_id: String,
105105
/// ID for this job
106106
job_id: String,
107-
/// Optional job name
108-
job_name: Option<String>,
107+
/// Job name, can be empty string
108+
job_name: String,
109109
/// Session ID for this job
110110
session_id: String,
111111
/// Status of this job
@@ -136,7 +136,7 @@ impl ExecutionGraph {
136136
pub fn new(
137137
scheduler_id: &str,
138138
job_id: &str,
139-
job_name: Option<String>,
139+
job_name: &str,
140140
session_id: &str,
141141
plan: Arc<dyn ExecutionPlan>,
142142
) -> Result<Self> {
@@ -152,7 +152,7 @@ impl ExecutionGraph {
152152
Ok(Self {
153153
scheduler_id: scheduler_id.to_string(),
154154
job_id: job_id.to_string(),
155-
job_name,
155+
job_name: job_name.to_string(),
156156
session_id: session_id.to_string(),
157157
status: JobStatus {
158158
status: Some(job_status::Status::Queued(QueuedJob {})),
@@ -169,8 +169,8 @@ impl ExecutionGraph {
169169
self.job_id.as_str()
170170
}
171171

172-
pub fn job_name(&self) -> Option<&String> {
173-
self.job_name.as_ref()
172+
pub fn job_name(&self) -> &str {
173+
self.job_name.as_str()
174174
}
175175

176176
pub fn session_id(&self) -> &str {
@@ -1284,11 +1284,7 @@ impl ExecutionGraph {
12841284
Ok(ExecutionGraph {
12851285
scheduler_id: proto.scheduler_id,
12861286
job_id: proto.job_id,
1287-
job_name: if proto.job_name.is_empty() {
1288-
None
1289-
} else {
1290-
Some(proto.job_name)
1291-
},
1287+
job_name: proto.job_name,
12921288
session_id: proto.session_id,
12931289
status: proto.status.ok_or_else(|| {
12941290
BallistaError::Internal(
@@ -1364,7 +1360,7 @@ impl ExecutionGraph {
13641360

13651361
Ok(protobuf::ExecutionGraph {
13661362
job_id: graph.job_id,
1367-
job_name: graph.job_name.unwrap_or_default(),
1363+
job_name: graph.job_name,
13681364
session_id: graph.session_id,
13691365
status: Some(graph.status),
13701366
stages,
@@ -2748,7 +2744,7 @@ mod test {
27482744

27492745
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
27502746

2751-
ExecutionGraph::new("localhost:50050", "job", None, "session", plan).unwrap()
2747+
ExecutionGraph::new("localhost:50050", "job", "", "session", plan).unwrap()
27522748
}
27532749

27542750
async fn test_two_aggregations_plan(partition: usize) -> ExecutionGraph {
@@ -2776,7 +2772,7 @@ mod test {
27762772

27772773
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
27782774

2779-
ExecutionGraph::new("localhost:50050", "job", None, "session", plan).unwrap()
2775+
ExecutionGraph::new("localhost:50050", "job", "", "session", plan).unwrap()
27802776
}
27812777

27822778
async fn test_coalesce_plan(partition: usize) -> ExecutionGraph {
@@ -2799,7 +2795,7 @@ mod test {
27992795

28002796
let plan = ctx.create_physical_plan(&optimized_plan).await.unwrap();
28012797

2802-
ExecutionGraph::new("localhost:50050", "job", None, "session", plan).unwrap()
2798+
ExecutionGraph::new("localhost:50050", "job", "", "session", plan).unwrap()
28032799
}
28042800

28052801
async fn test_join_plan(partition: usize) -> ExecutionGraph {
@@ -2841,7 +2837,7 @@ mod test {
28412837
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
28422838

28432839
let graph =
2844-
ExecutionGraph::new("localhost:50050", "job", None, "session", plan).unwrap();
2840+
ExecutionGraph::new("localhost:50050", "job", "", "session", plan).unwrap();
28452841

28462842
println!("{:?}", graph);
28472843

@@ -2866,7 +2862,7 @@ mod test {
28662862
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
28672863

28682864
let graph =
2869-
ExecutionGraph::new("localhost:50050", "job", None, "session", plan).unwrap();
2865+
ExecutionGraph::new("localhost:50050", "job", "", "session", plan).unwrap();
28702866

28712867
println!("{:?}", graph);
28722868

@@ -2891,7 +2887,7 @@ mod test {
28912887
println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
28922888

28932889
let graph =
2894-
ExecutionGraph::new("localhost:50050", "job", None, "session", plan).unwrap();
2890+
ExecutionGraph::new("localhost:50050", "job", "", "session", plan).unwrap();
28952891

28962892
println!("{:?}", graph);
28972893

ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs

+24
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,30 @@ impl Debug for ExecutionStage {
7373
}
7474
}
7575

76+
impl ExecutionStage {
77+
/// Get the name of the variant
78+
pub(crate) fn variant_name(&self) -> &str {
79+
match self {
80+
ExecutionStage::UnResolved(_) => "Unresolved",
81+
ExecutionStage::Resolved(_) => "Resolved",
82+
ExecutionStage::Running(_) => "Running",
83+
ExecutionStage::Successful(_) => "Successful",
84+
ExecutionStage::Failed(_) => "Failed",
85+
}
86+
}
87+
88+
/// Get the query plan for this query stage
89+
pub(crate) fn plan(&self) -> &dyn ExecutionPlan {
90+
match self {
91+
ExecutionStage::UnResolved(stage) => stage.plan.as_ref(),
92+
ExecutionStage::Resolved(stage) => stage.plan.as_ref(),
93+
ExecutionStage::Running(stage) => stage.plan.as_ref(),
94+
ExecutionStage::Successful(stage) => stage.plan.as_ref(),
95+
ExecutionStage::Failed(stage) => stage.plan.as_ref(),
96+
}
97+
}
98+
}
99+
76100
/// For a stage whose input stages are not all completed, we say it's a unresolved stage
77101
#[derive(Clone)]
78102
pub(crate) struct UnresolvedStage {

0 commit comments

Comments
 (0)