From 42433f03cac2480e479cc7149eb5cec6fa1feba0 Mon Sep 17 00:00:00 2001
From: Andy Grove <andygrove73@gmail.com>
Date: Mon, 10 Oct 2022 07:11:35 -0600
Subject: [PATCH 1/7] fix labeler

---
 .github/workflows/dev_pr.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/dev_pr.yml b/.github/workflows/dev_pr.yml
index 0011cafa6..6806a0f40 100644
--- a/.github/workflows/dev_pr.yml
+++ b/.github/workflows/dev_pr.yml
@@ -36,7 +36,7 @@ jobs:
           github.event_name == 'pull_request_target' &&
             (github.event.action == 'opened' ||
              github.event.action == 'synchronize')
-        uses: actions/labeler@4.0.2
+        uses: actions/labeler@v4.0.2
         with:
           repo-token: ${{ secrets.GITHUB_TOKEN }}
           configuration-path: .github/workflows/dev_pr/labeler.yml

From f8d728b370ff27ef899f7fa8c39263ef16aadcf6 Mon Sep 17 00:00:00 2001
From: Andy Grove <andygrove73@gmail.com>
Date: Mon, 10 Oct 2022 08:23:56 -0600
Subject: [PATCH 2/7] show logical plan in debug mode

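For context, a minimal sketch of the pattern this change applies, shown with a
plain DataFusion SessionContext rather than the BallistaContext the benchmark
uses (illustrative only; explain_if_debug is a hypothetical helper):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    /// Print the optimized logical plan for a query when `debug` is set.
    async fn explain_if_debug(ctx: &SessionContext, sql: &str, debug: bool) -> Result<()> {
        let df = ctx.sql(sql).await?;
        // to_logical_plan() returns the plan after optimization has run.
        let plan = df.to_logical_plan()?;
        if debug {
            println!("=== Optimized logical plan ===\n{:?}\n", plan);
        }
        Ok(())
    }

The benchmark below does the same thing inline after planning the query,
gated on opt.debug.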
---
 benchmarks/src/bin/tpch.rs | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 00da201c6..726f396e9 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -375,6 +375,10 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
                 .await
                 .map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
                 .unwrap();
+            let plan = df.to_logical_plan()?;
+            if opt.debug {
+                println!("=== Optimized logical plan ===\n{:?}\n", plan);
+            }
             batches = df
                 .collect()
                 .await

From cd1edd6915b17a24e5994d2b232d3d463bcf6527 Mon Sep 17 00:00:00 2001
From: Andy Grove <andygrove73@gmail.com>
Date: Wed, 12 Oct 2022 17:02:34 -0600
Subject: [PATCH 3/7] set job name

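The job name is just another key/value pair on the BallistaConfig builder. A
hedged sketch of where it is set (config_with_job_name is a hypothetical
helper; the benchmark chains the call onto its existing builder next to the
batch-size and shuffle-partition settings):

    use ballista::prelude::{BallistaConfig, BALLISTA_JOB_NAME};
    use datafusion::error::{DataFusionError, Result};

    /// Build a BallistaConfig carrying a human-readable job name so the
    /// scheduler UI can display it.
    fn config_with_job_name(query: usize) -> Result<BallistaConfig> {
        BallistaConfig::builder()
            .set(
                BALLISTA_JOB_NAME,
                &format!("Query derived from TPC-H q{}", query),
            )
            .build()
            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))
    }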
---
 benchmarks/src/bin/tpch.rs | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 726f396e9..4d2975b82 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -20,6 +20,7 @@
 use ballista::context::BallistaContext;
 use ballista::prelude::{
     BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
+    BALLISTA_JOB_NAME,
 };
 use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
@@ -343,6 +344,10 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
             BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
             &format!("{}", opt.partitions),
         )
+        .set(
+            BALLISTA_JOB_NAME,
+            &format!("Query derived from TPC-H q{}", opt.query),
+        )
         .set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
         .build()
         .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

From 5d5bb6d49d3993ff6e92c41c82576e270619326b Mon Sep 17 00:00:00 2001
From: Andy Grove <andygrove73@gmail.com>
Date: Wed, 12 Oct 2022 18:08:48 -0600
Subject: [PATCH 4/7] infer schema for Parquet tables

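The gist, condensed into a hedged sketch (listing_table is a hypothetical
helper; the real change lives in get_table, which becomes async because
schema inference performs I/O): for Parquet the schema can be inferred from
the file metadata via ListingTableConfig::infer_schema, which needs the
SessionState, while tbl/csv data keeps supplying an explicit schema.

    use std::sync::Arc;

    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::datasource::listing::{
        ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
    };
    use datafusion::datasource::TableProvider;
    use datafusion::error::Result;
    use datafusion::execution::context::SessionState;

    /// Build a ListingTable, inferring the schema from Parquet metadata when
    /// possible and falling back to an explicit schema otherwise.
    async fn listing_table(
        state: &SessionState,
        path: &str,
        table_format: &str,
        options: ListingOptions,
        schema: SchemaRef,
    ) -> Result<Arc<dyn TableProvider>> {
        let url = ListingTableUrl::parse(path)?;
        let config = ListingTableConfig::new(url).with_listing_options(options);
        let config = if table_format == "parquet" {
            config.infer_schema(state).await? // schema comes from the files
        } else {
            config.with_schema(schema) // schema must be provided up front
        };
        Ok(Arc::new(ListingTable::try_new(config)?))
    }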
---
 benchmarks/src/bin/tpch.rs | 31 +++++++++++++++++++++----------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index a16c13aa8..a743851af 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -57,6 +57,7 @@ use std::{
     sync::Arc,
     time::{Instant, SystemTime},
 };
+use datafusion::execution::context::SessionState;
 use structopt::StructOpt;
 
 #[cfg(feature = "snmalloc")]
@@ -283,12 +284,16 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
 
     // register tables
     for table in TABLES {
-        let table_provider = get_table(
-            opt.path.to_str().unwrap(),
-            table,
-            opt.file_format.as_str(),
-            opt.partitions,
-        )?;
+        let mut session_state = ctx.state.write();
+        let table_provider = {
+            get_table(
+                &mut session_state,
+                opt.path.to_str().unwrap(),
+                table,
+                opt.file_format.as_str(),
+                opt.partitions,
+            ).await?
+        };
         if opt.mem_table {
             println!("Loading table '{}' into memory", table);
             let start = Instant::now();
@@ -727,7 +732,8 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
     Ok(())
 }
 
-fn get_table(
+async fn get_table(
+    ctx: &mut SessionState,
     path: &str,
     table: &str,
     table_format: &str,
@@ -774,9 +780,14 @@ fn get_table(
     };
 
     let url = ListingTableUrl::parse(path)?;
-    let config = ListingTableConfig::new(url)
-        .with_listing_options(options)
-        .with_schema(schema);
+    let config = ListingTableConfig::new(url).with_listing_options(options);
+
+    let config = if table_format == "parquet" {
+        config.infer_schema(ctx).await?
+    } else {
+        config.with_schema(schema)
+    };
+
 
     Ok(Arc::new(ListingTable::try_new(config)?))
 }

From 588bac8445e32a02600581242aac1897effb67d2 Mon Sep 17 00:00:00 2001
From: Andy Grove <andygrove73@gmail.com>
Date: Wed, 12 Oct 2022 18:58:40 -0600
Subject: [PATCH 5/7] show query time in UI

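A small sketch of the timing scheme (now_millis and elapsed_ms are
hypothetical helpers): start and end times are stored as epoch milliseconds
in the ExecutionGraph and its protobuf representation, and the jobs endpoint
reports their difference for completed jobs.

    use std::time::{SystemTime, UNIX_EPOCH};

    /// Current wall-clock time as epoch milliseconds, the representation
    /// used for the new start_time/end_time fields.
    fn now_millis() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    }

    /// Elapsed time shown in the UI for a completed job.
    fn elapsed_ms(start_time: u64, end_time: u64) -> u64 {
        end_time - start_time
    }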
---
 ballista/rust/core/proto/ballista.proto       |  2 ++
 ballista/rust/scheduler/src/api/handlers.rs   |  3 ++-
 .../scheduler/src/state/execution_graph.rs    | 25 +++++++++++++++++++
 .../rust/scheduler/src/state/task_manager.rs  |  5 ++++
 benchmarks/src/bin/tpch.rs                    |  6 ++---
 5 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 67d0be572..1c5e8bcf7 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -437,6 +437,8 @@ message ExecutionGraph {
   uint32 task_id_gen = 8;
   repeated StageAttempts failed_attempts = 9;
   string job_name = 10;
+  uint64 start_time = 11;
+  uint64 end_time = 12;
 }
 
 message StageAttempts {
diff --git a/ballista/rust/scheduler/src/api/handlers.rs b/ballista/rust/scheduler/src/api/handlers.rs
index 48d770a31..71cb9ea92 100644
--- a/ballista/rust/scheduler/src/api/handlers.rs
+++ b/ballista/rust/scheduler/src/api/handlers.rs
@@ -137,8 +137,9 @@ pub(crate) async fn get_jobs<T: AsLogicalPlan, U: AsExecutionPlan>(
                         "partitions"
                     };
                     format!(
-                        "Completed. Produced {} {} containing {} {}.",
+                        "Completed. Produced {} {} containing {} {}. Elapsed time: {} ms.",
                         num_partitions, num_partitions_term, num_rows, num_rows_term,
+                        job.end_time - job.start_time
                     )
                 }
                 _ => "Invalid State".to_string(),
diff --git a/ballista/rust/scheduler/src/state/execution_graph.rs b/ballista/rust/scheduler/src/state/execution_graph.rs
index 1524b64c6..5702bed16 100644
--- a/ballista/rust/scheduler/src/state/execution_graph.rs
+++ b/ballista/rust/scheduler/src/state/execution_graph.rs
@@ -110,6 +110,10 @@ pub struct ExecutionGraph {
     session_id: String,
     /// Status of this job
     status: JobStatus,
+    /// Job start time
+    start_time: u64,
+    /// Job end time
+    end_time: u64,
     /// Map from Stage ID -> ExecutionStage
     stages: HashMap<usize, ExecutionStage>,
     /// Total number fo output partitions
@@ -157,6 +161,11 @@ impl ExecutionGraph {
             status: JobStatus {
                 status: Some(job_status::Status::Queued(QueuedJob {})),
             },
+            start_time: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_millis() as u64,
+            end_time: 0,
             stages,
             output_partitions,
             output_locations: vec![],
@@ -181,6 +190,14 @@ impl ExecutionGraph {
         self.status.clone()
     }
 
+    pub fn start_time(&self) -> u64 {
+        self.start_time
+    }
+
+    pub fn end_time(&self) -> u64 {
+        self.end_time
+    }
+
     pub fn stage_count(&self) -> usize {
         self.stages.len()
     }
@@ -1212,6 +1229,10 @@ impl ExecutionGraph {
                 partition_location,
             })),
         };
+        self.end_time = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_millis() as u64;
 
         Ok(())
     }
@@ -1291,6 +1312,8 @@ impl ExecutionGraph {
                     "Invalid Execution Graph: missing job status".to_owned(),
                 )
             })?,
+            start_time: proto.start_time,
+            end_time: proto.end_time,
             stages,
             output_partitions: proto.output_partitions as usize,
             output_locations,
@@ -1363,6 +1386,8 @@ impl ExecutionGraph {
             job_name: graph.job_name,
             session_id: graph.session_id,
             status: Some(graph.status),
+            start_time: graph.start_time,
+            end_time: graph.end_time,
             stages,
             output_partitions: graph.output_partitions as u64,
             output_locations,
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs
index ddd5ba9c9..d005e269a 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -134,6 +134,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         let mut jobs = vec![];
         for job_id in &job_ids {
             let graph = self.get_execution_graph(job_id).await?;
+
             let mut completed_stages = 0;
             for stage in graph.stages().values() {
                 if let ExecutionStage::Successful(_) = stage {
@@ -144,6 +145,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
                 job_id: job_id.clone(),
                 job_name: graph.job_name().to_string(),
                 status: graph.status(),
+                start_time: graph.start_time(),
+                end_time: graph.end_time(),
                 num_stages: graph.stage_count(),
                 completed_stages,
             });
@@ -669,6 +672,8 @@ pub struct JobOverview {
     pub job_id: String,
     pub job_name: String,
     pub status: JobStatus,
+    pub start_time: u64,
+    pub end_time: u64,
     pub num_stages: usize,
     pub completed_stages: usize,
 }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index a743851af..14f520b2f 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -27,6 +27,7 @@ use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
 use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::context::SessionState;
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::parquet::basic::Compression;
 use datafusion::parquet::file::properties::WriterProperties;
@@ -57,7 +58,6 @@ use std::{
     sync::Arc,
     time::{Instant, SystemTime},
 };
-use datafusion::execution::context::SessionState;
 use structopt::StructOpt;
 
 #[cfg(feature = "snmalloc")]
@@ -292,7 +292,8 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
                 table,
                 opt.file_format.as_str(),
                 opt.partitions,
-            ).await?
+            )
+            .await?
         };
         if opt.mem_table {
             println!("Loading table '{}' into memory", table);
@@ -788,7 +789,6 @@ async fn get_table(
         config.with_schema(schema)
     };
 
-
     Ok(Arc::new(ListingTable::try_new(config)?))
 }
 

From 1eaa84addf0c16f291d4d2e3060203f5938cfe83 Mon Sep 17 00:00:00 2001
From: Andy Grove <andygrove73@gmail.com>
Date: Wed, 12 Oct 2022 20:36:16 -0600
Subject: [PATCH 6/7] fix clippy warning

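Context for the new attribute, as a standalone sketch (do_io and update are
hypothetical): clippy::await_holding_lock fires when a synchronous lock
guard stays alive across an .await point, which is what happens in the
benchmark because the session state write lock is held while
get_table(...).await runs.

    use std::sync::Mutex;

    async fn do_io() {}

    // The guard returned by lock() is still alive at the .await point, so
    // clippy would flag this; the attribute acknowledges it is deliberate.
    #[allow(clippy::await_holding_lock)]
    async fn update(shared: &Mutex<Vec<u64>>) {
        let mut guard = shared.lock().unwrap();
        do_io().await; // guard held across the await
        guard.push(1);
    }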
---
 benchmarks/src/bin/tpch.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 14f520b2f..6c00d9c71 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -286,6 +286,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
     for table in TABLES {
         let mut session_state = ctx.state.write();
         let table_provider = {
+            #[allow(clippy::await_holding_lock)]
             get_table(
                 &mut session_state,
                 opt.path.to_str().unwrap(),

From 3bfe0d2f914b8c73308d675e680a1a2172acbbbf Mon Sep 17 00:00:00 2001
From: Andy Grove <andygrove73@gmail.com>
Date: Thu, 13 Oct 2022 08:18:47 -0600
Subject: [PATCH 7/7] move clippy allow to function level

---
 benchmarks/src/bin/tpch.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 6c00d9c71..ece296fb5 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -274,6 +274,7 @@ async fn main() -> Result<()> {
     }
 }
 
+#[allow(clippy::await_holding_lock)]
 async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
     println!("Running benchmarks with the following options: {:?}", opt);
     let mut benchmark_run = BenchmarkRun::new(opt.query);
@@ -284,9 +285,8 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
 
     // register tables
     for table in TABLES {
-        let mut session_state = ctx.state.write();
         let table_provider = {
-            #[allow(clippy::await_holding_lock)]
+            let mut session_state = ctx.state.write();
             get_table(
                 &mut session_state,
                 opt.path.to_str().unwrap(),