Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Benchmark & UI improvements #343

Merged
merged 8 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ message ExecutionGraph {
uint32 task_id_gen = 8;
repeated StageAttempts failed_attempts = 9;
string job_name = 10;
uint64 start_time = 11;
uint64 end_time = 12;
}

message StageAttempts {
Expand Down
3 changes: 2 additions & 1 deletion ballista/rust/scheduler/src/api/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ pub(crate) async fn get_jobs<T: AsLogicalPlan, U: AsExecutionPlan>(
"partitions"
};
format!(
"Completed. Produced {} {} containing {} {}.",
"Completed. Produced {} {} containing {} {}. Elapsed time: {} ms.",
num_partitions, num_partitions_term, num_rows, num_rows_term,
job.end_time - job.start_time
)
}
_ => "Invalid State".to_string(),
Expand Down
25 changes: 25 additions & 0 deletions ballista/rust/scheduler/src/state/execution_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ pub struct ExecutionGraph {
session_id: String,
/// Status of this job
status: JobStatus,
/// Job start time
start_time: u64,
/// Job end time
end_time: u64,
/// Map from Stage ID -> ExecutionStage
stages: HashMap<usize, ExecutionStage>,
/// Total number of output partitions
Expand Down Expand Up @@ -157,6 +161,11 @@ impl ExecutionGraph {
status: JobStatus {
status: Some(job_status::Status::Queued(QueuedJob {})),
},
start_time: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
end_time: 0,
stages,
output_partitions,
output_locations: vec![],
Expand All @@ -181,6 +190,14 @@ impl ExecutionGraph {
self.status.clone()
}

/// Returns the job start time stored on this graph.
///
/// The value is returned by copy as a `u64`. Where this type initializes the
/// field from the system clock, it is expressed as milliseconds since
/// `UNIX_EPOCH`.
pub fn start_time(&self) -> u64 {
self.start_time
}

/// Returns the job end time stored on this graph.
///
/// A newly constructed graph holds `0` here; the field is later overwritten
/// with the current wall-clock time in milliseconds since `UNIX_EPOCH`.
/// NOTE(review): that write appears to happen when the job is marked as
/// completed, so `0` presumably means "not finished yet" — confirm.
pub fn end_time(&self) -> u64 {
self.end_time
}

/// Returns the number of entries in the `stages` map
/// (stage ID -> `ExecutionStage`), regardless of each stage's state.
pub fn stage_count(&self) -> usize {
self.stages.len()
}
Expand Down Expand Up @@ -1212,6 +1229,10 @@ impl ExecutionGraph {
partition_location,
})),
};
self.end_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;

Ok(())
}
Expand Down Expand Up @@ -1291,6 +1312,8 @@ impl ExecutionGraph {
"Invalid Execution Graph: missing job status".to_owned(),
)
})?,
start_time: proto.start_time,
end_time: proto.end_time,
stages,
output_partitions: proto.output_partitions as usize,
output_locations,
Expand Down Expand Up @@ -1363,6 +1386,8 @@ impl ExecutionGraph {
job_name: graph.job_name,
session_id: graph.session_id,
status: Some(graph.status),
start_time: graph.start_time,
end_time: graph.end_time,
stages,
output_partitions: graph.output_partitions as u64,
output_locations,
Expand Down
5 changes: 5 additions & 0 deletions ballista/rust/scheduler/src/state/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
let mut jobs = vec![];
for job_id in &job_ids {
let graph = self.get_execution_graph(job_id).await?;

let mut completed_stages = 0;
for stage in graph.stages().values() {
if let ExecutionStage::Successful(_) = stage {
Expand All @@ -144,6 +145,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
job_id: job_id.clone(),
job_name: graph.job_name().to_string(),
status: graph.status(),
start_time: graph.start_time(),
end_time: graph.end_time(),
num_stages: graph.stage_count(),
completed_stages,
});
Expand Down Expand Up @@ -669,6 +672,8 @@ pub struct JobOverview {
pub job_id: String,
pub job_name: String,
pub status: JobStatus,
pub start_time: u64,
pub end_time: u64,
pub num_stages: usize,
pub completed_stages: usize,
}
41 changes: 31 additions & 10 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
use ballista::context::BallistaContext;
use ballista::prelude::{
BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
BALLISTA_JOB_NAME,
};
use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;
use datafusion::logical_plan::LogicalPlan;
use datafusion::parquet::basic::Compression;
use datafusion::parquet::file::properties::WriterProperties;
Expand Down Expand Up @@ -272,6 +274,7 @@ async fn main() -> Result<()> {
}
}

#[allow(clippy::await_holding_lock)]
async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
println!("Running benchmarks with the following options: {:?}", opt);
let mut benchmark_run = BenchmarkRun::new(opt.query);
Expand All @@ -282,12 +285,17 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB

// register tables
for table in TABLES {
let table_provider = get_table(
opt.path.to_str().unwrap(),
table,
opt.file_format.as_str(),
opt.partitions,
)?;
let table_provider = {
let mut session_state = ctx.state.write();
get_table(
&mut session_state,
opt.path.to_str().unwrap(),
table,
opt.file_format.as_str(),
opt.partitions,
)
.await?
};
if opt.mem_table {
println!("Loading table '{}' into memory", table);
let start = Instant::now();
Expand Down Expand Up @@ -343,6 +351,10 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
&format!("{}", opt.partitions),
)
.set(
BALLISTA_JOB_NAME,
&format!("Query derived from TPC-H q{}", opt.query),
)
.set(BALLISTA_DEFAULT_BATCH_SIZE, &format!("{}", opt.batch_size))
.build()
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
Expand Down Expand Up @@ -375,6 +387,10 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
.await
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
.unwrap();
let plan = df.to_logical_plan()?;
if opt.debug {
println!("=== Optimized logical plan ===\n{:?}\n", plan);
}
batches = df
.collect()
.await
Expand Down Expand Up @@ -718,7 +734,8 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
Ok(())
}

fn get_table(
async fn get_table(
ctx: &mut SessionState,
path: &str,
table: &str,
table_format: &str,
Expand Down Expand Up @@ -765,9 +782,13 @@ fn get_table(
};

let url = ListingTableUrl::parse(path)?;
let config = ListingTableConfig::new(url)
.with_listing_options(options)
.with_schema(schema);
let config = ListingTableConfig::new(url).with_listing_options(options);

let config = if table_format == "parquet" {
config.infer_schema(ctx).await?
} else {
config.with_schema(schema)
};

Ok(Arc::new(ListingTable::try_new(config)?))
}
Expand Down