diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 67d0be572..1c5e8bcf7 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -437,6 +437,8 @@ message ExecutionGraph {
   uint32 task_id_gen = 8;
   repeated StageAttempts failed_attempts = 9;
   string job_name = 10;
+  uint64 start_time = 11;
+  uint64 end_time = 12;
 }
 
 message StageAttempts {
diff --git a/ballista/rust/scheduler/src/api/handlers.rs b/ballista/rust/scheduler/src/api/handlers.rs
index 48d770a31..71cb9ea92 100644
--- a/ballista/rust/scheduler/src/api/handlers.rs
+++ b/ballista/rust/scheduler/src/api/handlers.rs
@@ -137,8 +137,9 @@ pub(crate) async fn get_jobs(
                     "partitions"
                 };
                 format!(
-                    "Completed. Produced {} {} containing {} {}.",
+                    "Completed. Produced {} {} containing {} {}. Elapsed time: {} ms.",
                     num_partitions, num_partitions_term, num_rows, num_rows_term,
+                    job.end_time - job.start_time
                 )
             }
             _ => "Invalid State".to_string(),
diff --git a/ballista/rust/scheduler/src/state/execution_graph.rs b/ballista/rust/scheduler/src/state/execution_graph.rs
index 1524b64c6..5702bed16 100644
--- a/ballista/rust/scheduler/src/state/execution_graph.rs
+++ b/ballista/rust/scheduler/src/state/execution_graph.rs
@@ -110,6 +110,10 @@ pub struct ExecutionGraph {
     session_id: String,
     /// Status of this job
     status: JobStatus,
+    /// Job start time
+    start_time: u64,
+    /// Job end time
+    end_time: u64,
     /// Map from Stage ID -> ExecutionStage
     stages: HashMap<usize, ExecutionStage>,
     /// Total number of output partitions
@@ -157,6 +161,11 @@
             status: JobStatus {
                 status: Some(job_status::Status::Queued(QueuedJob {})),
             },
+            start_time: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_millis() as u64,
+            end_time: 0,
             stages,
             output_partitions,
             output_locations: vec![],
@@ -181,6 +190,14 @@
         self.status.clone()
     }
 
+    pub fn start_time(&self) -> u64 {
+        self.start_time
+    }
+
+    pub fn end_time(&self) -> u64 {
+        self.end_time
+    }
+
     pub fn stage_count(&self) -> usize {
         self.stages.len()
     }
@@ -1212,6 +1229,10 @@
                 partition_location,
             })),
         };
+        self.end_time = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_millis() as u64;
 
         Ok(())
     }
@@ -1291,6 +1312,8 @@
                     "Invalid Execution Graph: missing job status".to_owned(),
                 )
             })?,
+            start_time: proto.start_time,
+            end_time: proto.end_time,
             stages,
             output_partitions: proto.output_partitions as usize,
             output_locations,
@@ -1363,6 +1386,8 @@
             job_name: graph.job_name,
             session_id: graph.session_id,
             status: Some(graph.status),
+            start_time: graph.start_time,
+            end_time: graph.end_time,
             stages,
             output_partitions: graph.output_partitions as u64,
             output_locations,
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs
index ddd5ba9c9..d005e269a 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -134,6 +134,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         let mut jobs = vec![];
         for job_id in &job_ids {
             let graph = self.get_execution_graph(job_id).await?;
+
             let mut completed_stages = 0;
             for stage in graph.stages().values() {
                 if let ExecutionStage::Successful(_) = stage {
@@ -144,6 +145,8 @@
                 job_id: job_id.clone(),
                 job_name: graph.job_name().to_string(),
                 status: graph.status(),
+                start_time: graph.start_time(),
+                end_time: graph.end_time(),
                 num_stages: graph.stage_count(),
                 completed_stages,
             });
@@ -669,6 +672,8 @@ pub struct JobOverview {
     pub job_id: String,
     pub job_name: String,
     pub status: JobStatus,
+    pub start_time: u64,
+    pub end_time: u64,
     pub num_stages: usize,
     pub completed_stages: usize,
 }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index e70c70765..ece296fb5 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -20,12 +20,14 @@
 use ballista::context::BallistaContext;
 use ballista::prelude::{
     BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
+    BALLISTA_JOB_NAME,
 };
 use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
 use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::context::SessionState;
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::parquet::basic::Compression;
 use datafusion::parquet::file::properties::WriterProperties;
@@ -272,6 +274,7 @@ async fn main() -> Result<()> {
     }
 }
 
+#[allow(clippy::await_holding_lock)]
 async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
     println!("Running benchmarks with the following options: {:?}", opt);
     let mut benchmark_run = BenchmarkRun::new(opt.query);
@@ -282,12 +285,17 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
[...]
@@ -338,6 +346,10 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
         .set(
             BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
             &format!("{}", opt.partitions),
         )
+        .set(
+            BALLISTA_JOB_NAME,
+            &format!("Query derived from TPC-H q{}", opt.query),
+        )
         .set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
         .build()
         .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
@@ -375,6 +387,10 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
             .await
             .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
             .unwrap();
+        let plan = df.to_logical_plan()?;
+        if opt.debug {
+            println!("=== Optimized logical plan ===\n{:?}\n", plan);
+        }
         batches = df
             .collect()
             .await
@@ -718,7 +734,8 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
     Ok(())
 }
 
-fn get_table(
+async fn get_table(
+    ctx: &mut SessionState,
     path: &str,
     table: &str,
     table_format: &str,
@@ -765,9 +782,13 @@ fn get_table(
     };
 
     let url = ListingTableUrl::parse(path)?;
-    let config = ListingTableConfig::new(url)
-        .with_listing_options(options)
-        .with_schema(schema);
+    let config = ListingTableConfig::new(url).with_listing_options(options);
+
+    let config = if table_format == "parquet" {
+        config.infer_schema(ctx).await?
+    } else {
+        config.with_schema(schema)
+    };
 
     Ok(Arc::new(ListingTable::try_new(config)?))
 }
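As a reference for the timestamp handling above: the patch stores `start_time` and `end_time` as milliseconds since the Unix epoch, duplicating the same inline `SystemTime` expression at both sites in `execution_graph.rs`, and the `get_jobs` handler prints `end_time - start_time` for completed jobs. Below is a minimal standalone sketch of that arithmetic; the `timestamp_millis` helper and the `main` driver are illustrative only, not part of the patch.

use std::time::{SystemTime, UNIX_EPOCH};

/// Wall-clock time in milliseconds since the Unix epoch, matching the inline
/// expression the patch uses for both `start_time` and `end_time`. Like the
/// patch, this panics if the system clock is set before the epoch.
fn timestamp_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

fn main() {
    let start_time = timestamp_millis(); // captured when the ExecutionGraph is created
    let end_time = timestamp_millis(); // captured when the job reaches Completed
    // Mirrors the "Elapsed time: {} ms." message added in handlers.rs.
    println!("Elapsed time: {} ms.", end_time - start_time);
}

Note that the handler only evaluates the subtraction in the `Completed` match arm: a queued or running job still carries the default `end_time` of 0, and the unsigned subtraction would underflow there.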