
Commit 9bb46b1

Benchmark & UI improvements (#343)
* fix labeler
* show logical plan in debug mode
* set job name
* infer schema parquet
* show query time in UI
* clippy
* clippy
1 parent 492395a commit 9bb46b1

File tree

5 files changed (+65, −11 lines):

ballista/rust/core/proto/ballista.proto
ballista/rust/scheduler/src/api/handlers.rs
ballista/rust/scheduler/src/state/execution_graph.rs
ballista/rust/scheduler/src/state/task_manager.rs
benchmarks/src/bin/tpch.rs


ballista/rust/core/proto/ballista.proto (+2)

@@ -437,6 +437,8 @@ message ExecutionGraph {
   uint32 task_id_gen = 8;
   repeated StageAttempts failed_attempts = 9;
   string job_name = 10;
+  uint64 start_time = 11;
+  uint64 end_time = 12;
 }

 message StageAttempts {
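
The two new fields are plain epoch-millisecond timestamps. For orientation, prost maps proto uint64 fields to u64 struct fields, so the generated Rust type gains roughly the following fragment (a sketch, not the actual generated code; derive and field attributes are omitted):

    // Rough sketch of the prost-generated fragment for the new fields
    // (attributes and the existing fields are elided).
    pub struct ExecutionGraph {
        // ...existing fields (job_id, job_name, status, ...)...
        pub start_time: u64, // milliseconds since the Unix epoch
        pub end_time: u64,   // 0 until the job completes
    }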

ballista/rust/scheduler/src/api/handlers.rs (+2, −1)

@@ -137,8 +137,9 @@ pub(crate) async fn get_jobs<T: AsLogicalPlan, U: AsExecutionPlan>(
                 "partitions"
             };
             format!(
-                "Completed. Produced {} {} containing {} {}.",
+                "Completed. Produced {} {} containing {} {}. Elapsed time: {} ms.",
                 num_partitions, num_partitions_term, num_rows, num_rows_term,
+                job.end_time - job.start_time
             )
         }
         _ => "Invalid State".to_string(),
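
The elapsed time shown in the UI is simply end_time minus start_time, in milliseconds. A minimal sketch of the same calculation with a guard for jobs that have not finished yet (the helper and the guard are illustrative, not part of this commit):

    // Illustrative helper: elapsed wall-clock time for a completed job, in ms.
    // Returns None while the job is still running (end_time is initialized to 0).
    fn elapsed_ms(start_time: u64, end_time: u64) -> Option<u64> {
        if end_time == 0 || end_time < start_time {
            None
        } else {
            Some(end_time - start_time)
        }
    }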

ballista/rust/scheduler/src/state/execution_graph.rs (+25)

@@ -110,6 +110,10 @@ pub struct ExecutionGraph {
     session_id: String,
     /// Status of this job
     status: JobStatus,
+    /// Job start time
+    start_time: u64,
+    /// Job end time
+    end_time: u64,
     /// Map from Stage ID -> ExecutionStage
     stages: HashMap<usize, ExecutionStage>,
     /// Total number fo output partitions
@@ -157,6 +161,11 @@ impl ExecutionGraph {
             status: JobStatus {
                 status: Some(job_status::Status::Queued(QueuedJob {})),
             },
+            start_time: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_millis() as u64,
+            end_time: 0,
             stages,
             output_partitions,
             output_locations: vec![],
@@ -181,6 +190,14 @@ impl ExecutionGraph {
         self.status.clone()
     }

+    pub fn start_time(&self) -> u64 {
+        self.start_time
+    }
+
+    pub fn end_time(&self) -> u64 {
+        self.end_time
+    }
+
     pub fn stage_count(&self) -> usize {
         self.stages.len()
     }
@@ -1212,6 +1229,10 @@ impl ExecutionGraph {
                 partition_location,
             })),
         };
+        self.end_time = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_millis() as u64;

         Ok(())
     }
@@ -1291,6 +1312,8 @@ impl ExecutionGraph {
                     "Invalid Execution Graph: missing job status".to_owned(),
                 )
             })?,
+            start_time: proto.start_time,
+            end_time: proto.end_time,
             stages,
             output_partitions: proto.output_partitions as usize,
             output_locations,
@@ -1363,6 +1386,8 @@ impl ExecutionGraph {
             job_name: graph.job_name,
             session_id: graph.session_id,
             status: Some(graph.status),
+            start_time: graph.start_time,
+            end_time: graph.end_time,
             stages,
             output_partitions: graph.output_partitions as u64,
             output_locations,
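
Both timestamps use the same epoch-millisecond expression; factored into a standalone function it would look roughly like this (the helper name is illustrative, the commit inlines the expression at both call sites):

    use std::time::{SystemTime, UNIX_EPOCH};

    // Current wall-clock time as milliseconds since the Unix epoch, the same
    // expression the commit inlines for start_time and end_time.
    fn now_epoch_millis() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock set before the Unix epoch")
            .as_millis() as u64
    }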

ballista/rust/scheduler/src/state/task_manager.rs (+5)

@@ -134,6 +134,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         let mut jobs = vec![];
         for job_id in &job_ids {
             let graph = self.get_execution_graph(job_id).await?;
+
             let mut completed_stages = 0;
             for stage in graph.stages().values() {
                 if let ExecutionStage::Successful(_) = stage {
@@ -144,6 +145,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
                 job_id: job_id.clone(),
                 job_name: graph.job_name().to_string(),
                 status: graph.status(),
+                start_time: graph.start_time(),
+                end_time: graph.end_time(),
                 num_stages: graph.stage_count(),
                 completed_stages,
             });
@@ -669,6 +672,8 @@ pub struct JobOverview {
     pub job_id: String,
     pub job_name: String,
     pub status: JobStatus,
+    pub start_time: u64,
+    pub end_time: u64,
     pub num_stages: usize,
     pub completed_stages: usize,
 }
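
With the two extra fields, any consumer of JobOverview can derive the query duration directly; a hedged sketch (the field names come from this commit, the function itself is illustrative):

    // Illustrative only: build a one-line summary from the new JobOverview fields.
    fn summarize(job: &JobOverview) -> String {
        format!(
            "{} ({}): {}/{} stages complete, ran for {} ms",
            job.job_name,
            job.job_id,
            job.completed_stages,
            job.num_stages,
            job.end_time.saturating_sub(job.start_time),
        )
    }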

benchmarks/src/bin/tpch.rs (+31, −10)

@@ -20,12 +20,14 @@
 use ballista::context::BallistaContext;
 use ballista::prelude::{
     BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
+    BALLISTA_JOB_NAME,
 };
 use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
 use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::context::SessionState;
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::parquet::basic::Compression;
 use datafusion::parquet::file::properties::WriterProperties;
@@ -272,6 +274,7 @@ async fn main() -> Result<()> {
     }
 }

+#[allow(clippy::await_holding_lock)]
 async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
     println!("Running benchmarks with the following options: {:?}", opt);
     let mut benchmark_run = BenchmarkRun::new(opt.query);
@@ -282,12 +285,17 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB

     // register tables
     for table in TABLES {
-        let table_provider = get_table(
-            opt.path.to_str().unwrap(),
-            table,
-            opt.file_format.as_str(),
-            opt.partitions,
-        )?;
+        let table_provider = {
+            let mut session_state = ctx.state.write();
+            get_table(
+                &mut session_state,
+                opt.path.to_str().unwrap(),
+                table,
+                opt.file_format.as_str(),
+                opt.partitions,
+            )
+            .await?
+        };
         if opt.mem_table {
             println!("Loading table '{}' into memory", table);
             let start = Instant::now();
@@ -343,6 +351,10 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
             BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
             &format!("{}", opt.partitions),
         )
+        .set(
+            BALLISTA_JOB_NAME,
+            &format!("Query derived from TPC-H q{}", opt.query),
+        )
         .set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
         .build()
         .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
@@ -375,6 +387,10 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
             .await
             .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
             .unwrap();
+        let plan = df.to_logical_plan()?;
+        if opt.debug {
+            println!("=== Optimized logical plan ===\n{:?}\n", plan);
+        }
         batches = df
             .collect()
             .await
@@ -718,7 +734,8 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
     Ok(())
 }

-fn get_table(
+async fn get_table(
+    ctx: &mut SessionState,
     path: &str,
     table: &str,
     table_format: &str,
@@ -765,9 +782,13 @@ fn get_table(
     };

     let url = ListingTableUrl::parse(path)?;
-    let config = ListingTableConfig::new(url)
-        .with_listing_options(options)
-        .with_schema(schema);
+    let config = ListingTableConfig::new(url).with_listing_options(options);
+
+    let config = if table_format == "parquet" {
+        config.infer_schema(ctx).await?
+    } else {
+        config.with_schema(schema)
+    };

     Ok(Arc::new(ListingTable::try_new(config)?))
 }
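
Outside the benchmark binary, attaching a job name works the same way through the BallistaConfig builder; a minimal sketch based on the calls shown above (the name string and the function are examples, not part of this commit):

    use ballista::prelude::{BallistaConfig, BALLISTA_JOB_NAME};
    use datafusion::error::{DataFusionError, Result};

    // Example sketch: give the job a human-readable name so it shows up in the
    // scheduler UI, mirroring the builder calls used by the benchmark above.
    fn example_config() -> Result<BallistaConfig> {
        BallistaConfig::builder()
            .set(BALLISTA_JOB_NAME, "Example ad-hoc query")
            .build()
            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))
    }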
