Skip to content

Commit aae3f3c

Browse files
authored
REST API to get query stages (#305)
1 parent 78ed511 commit aae3f3c

File tree

2 files changed

+111
-15
lines changed

2 files changed

+111
-15
lines changed

ballista/rust/scheduler/src/api/handlers.rs

+107-12
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,17 @@
1111
// limitations under the License.
1212

1313
use crate::scheduler_server::SchedulerServer;
14+
use crate::state::execution_graph::ExecutionStage;
1415
use crate::state::execution_graph_dot::ExecutionGraphDot;
1516
use ballista_core::serde::protobuf::job_status::Status;
1617
use ballista_core::serde::AsExecutionPlan;
1718
use ballista_core::BALLISTA_VERSION;
19+
use datafusion::physical_plan::metrics::{MetricValue, MetricsSet, Time};
1820
use datafusion_proto::logical_plan::AsLogicalPlan;
1921
use graphviz_rust::cmd::{CommandArg, Format};
2022
use graphviz_rust::exec;
2123
use graphviz_rust::printer::PrinterContext;
24+
use std::time::Duration;
2225
use warp::Rejection;
2326

2427
#[derive(Debug, serde::Serialize)]
@@ -50,6 +53,15 @@ pub struct JobResponse {
5053
pub percent_complete: u8,
5154
}
5255

56+
#[derive(Debug, serde::Serialize)]
57+
pub struct QueryStageSummary {
58+
pub stage_id: String,
59+
pub stage_status: String,
60+
pub input_rows: usize,
61+
pub output_rows: usize,
62+
pub elapsed_compute: String,
63+
}
64+
5365
/// Return current scheduler state
5466
pub(crate) async fn get_scheduler_state<T: AsLogicalPlan, U: AsExecutionPlan>(
5567
data_server: SchedulerServer<T, U>,
@@ -146,33 +158,116 @@ pub(crate) async fn get_jobs<T: AsLogicalPlan, U: AsExecutionPlan>(
146158
}
147159

148160
#[derive(Debug, serde::Serialize)]
149-
pub struct JobSummaryResponse {
150-
/// Just show debug output for now but what we really want here is a list of stages with
151-
/// plans and metrics and the relationship between them
152-
pub summary: String,
161+
pub struct QueryStagesResponse {
162+
pub stages: Vec<QueryStageSummary>,
153163
}
154164

155165
/// Get the execution graph for the specified job id
156-
pub(crate) async fn get_job_summary<T: AsLogicalPlan, U: AsExecutionPlan>(
166+
pub(crate) async fn get_query_stages<T: AsLogicalPlan, U: AsExecutionPlan>(
157167
data_server: SchedulerServer<T, U>,
158168
job_id: String,
159169
) -> Result<impl warp::Reply, Rejection> {
160-
let graph = data_server
170+
let maybe_graph = data_server
161171
.state
162172
.task_manager
163173
.get_job_execution_graph(&job_id)
164174
.await
165175
.map_err(|_| warp::reject())?;
166176

167-
match graph {
168-
Some(x) => Ok(warp::reply::json(&JobSummaryResponse {
169-
summary: format!("{:?}", x),
170-
})),
171-
_ => Ok(warp::reply::json(&JobSummaryResponse {
172-
summary: "Not Found".to_string(),
177+
match maybe_graph {
178+
Some(graph) => Ok(warp::reply::json(&QueryStagesResponse {
179+
stages: graph
180+
.stages()
181+
.iter()
182+
.map(|(id, stage)| {
183+
let mut summary = QueryStageSummary {
184+
stage_id: id.to_string(),
185+
stage_status: "".to_string(),
186+
input_rows: 0,
187+
output_rows: 0,
188+
elapsed_compute: "".to_string(),
189+
};
190+
match stage {
191+
ExecutionStage::UnResolved(_) => {
192+
summary.stage_status = "Unresolved".to_string();
193+
}
194+
ExecutionStage::Resolved(_) => {
195+
summary.stage_status = "Resolved".to_string();
196+
}
197+
ExecutionStage::Running(running_stage) => {
198+
summary.stage_status = "Running".to_string();
199+
summary.input_rows = running_stage
200+
.stage_metrics
201+
.as_ref()
202+
.map(|m| get_combined_count(m.as_slice(), "input_rows"))
203+
.unwrap_or(0);
204+
summary.output_rows = running_stage
205+
.stage_metrics
206+
.as_ref()
207+
.map(|m| get_combined_count(m.as_slice(), "output_rows"))
208+
.unwrap_or(0);
209+
summary.elapsed_compute = running_stage
210+
.stage_metrics
211+
.as_ref()
212+
.map(|m| get_elapsed_compute_nanos(m.as_slice()))
213+
.unwrap_or_default();
214+
}
215+
ExecutionStage::Successful(completed_stage) => {
216+
summary.stage_status = "Completed".to_string();
217+
summary.input_rows = get_combined_count(
218+
&completed_stage.stage_metrics,
219+
"input_rows",
220+
);
221+
summary.output_rows = get_combined_count(
222+
&completed_stage.stage_metrics,
223+
"output_rows",
224+
);
225+
summary.elapsed_compute =
226+
get_elapsed_compute_nanos(&completed_stage.stage_metrics);
227+
}
228+
ExecutionStage::Failed(_) => {
229+
summary.stage_status = "Failed".to_string();
230+
}
231+
}
232+
summary
233+
})
234+
.collect(),
173235
})),
236+
_ => Ok(warp::reply::json(&QueryStagesResponse { stages: vec![] })),
174237
}
175238
}
239+
240+
fn get_elapsed_compute_nanos(metrics: &[MetricsSet]) -> String {
241+
let nanos: usize = metrics
242+
.iter()
243+
.flat_map(|vec| {
244+
vec.iter().map(|metric| match metric.as_ref().value() {
245+
MetricValue::ElapsedCompute(time) => time.value(),
246+
_ => 0,
247+
})
248+
})
249+
.sum();
250+
let t = Time::new();
251+
t.add_duration(Duration::from_nanos(nanos as u64));
252+
t.to_string()
253+
}
254+
255+
fn get_combined_count(metrics: &[MetricsSet], name: &str) -> usize {
256+
metrics
257+
.iter()
258+
.flat_map(|vec| {
259+
vec.iter().map(|metric| {
260+
let metric_value = metric.value();
261+
if metric_value.name() == name {
262+
metric_value.as_usize()
263+
} else {
264+
0
265+
}
266+
})
267+
})
268+
.sum()
269+
}
270+
176271
/// Generate a dot graph for the specified job id and return as plain text
177272
pub(crate) async fn get_job_dot_graph<T: AsLogicalPlan, U: AsExecutionPlan>(
178273
data_server: SchedulerServer<T, U>,

ballista/rust/scheduler/src/api/mod.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -97,21 +97,22 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static + AsExecutionPlan>(
9797
.and(with_data_server(scheduler_server.clone()))
9898
.and_then(|data_server| handlers::get_jobs(data_server));
9999

100-
let route_job_summary = warp::path!("api" / "job" / String)
100+
let route_query_stages = warp::path!("api" / "job" / String / "stages")
101101
.and(with_data_server(scheduler_server.clone()))
102-
.and_then(|job_id, data_server| handlers::get_job_summary(data_server, job_id));
102+
.and_then(|job_id, data_server| handlers::get_query_stages(data_server, job_id));
103103

104104
let route_job_dot = warp::path!("api" / "job" / String / "dot")
105105
.and(with_data_server(scheduler_server.clone()))
106106
.and_then(|job_id, data_server| handlers::get_job_dot_graph(data_server, job_id));
107+
107108
let route_job_dot_svg = warp::path!("api" / "job" / String / "dot_svg")
108109
.and(with_data_server(scheduler_server))
109110
.and_then(|job_id, data_server| handlers::get_job_svg_graph(data_server, job_id));
110111

111112
let routes = route_scheduler_state
112113
.or(route_executors)
113114
.or(route_jobs)
114-
.or(route_job_summary)
115+
.or(route_query_stages)
115116
.or(route_job_dot)
116117
.or(route_job_dot_svg);
117118
routes.boxed()

0 commit comments

Comments
 (0)