Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Benchmark looks for path with and without extension #354

Merged
merged 1 commit into from
Oct 15, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 52 additions & 13 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
let path = opt.path.to_str().unwrap();
let file_format = opt.file_format.as_str();

register_tables(path, file_format, &ctx).await;
register_tables(path, file_format, &ctx, opt.debug).await?;

let mut millis = vec![];

Expand Down Expand Up @@ -474,7 +474,7 @@ async fn loadtest_ballista(opt: BallistaLoadtestOpt) -> Result<()> {
let sql_path = opt.sql_path.to_str().unwrap().to_string();

for ctx in &clients {
register_tables(path, file_format, ctx).await;
register_tables(path, file_format, ctx, opt.debug).await?;
}

let request_per_thread = request_amount.div(concurrency);
Expand Down Expand Up @@ -552,44 +552,83 @@ fn get_query_sql_by_path(query: usize, mut sql_path: String) -> Result<String> {
}
}

/// Registers every table listed in `TABLES` with the given Ballista context,
/// choosing the reader from `file_format` ("tbl", "csv" or "parquet").
///
/// NOTE(review): this span looks like a rendered diff whose +/- markers were
/// lost, so the previous and the new version of several lines appear back to
/// back (two signatures, paired `.unwrap();` / `?;` lines, and so on). The
/// comments below describe both variants where they differ -- confirm against
/// the real file before relying on them.
///
/// New variant: takes an extra `debug` flag, resolves each table location via
/// `find_path`, and returns `Result<()>` so failures reach the caller.
/// Old variant: built the location with `format!`, returned nothing, and
/// panicked via `.unwrap()` / `unimplemented!` on failure.
async fn register_tables(path: &str, file_format: &str, ctx: &BallistaContext) {
async fn register_tables(
path: &str,
file_format: &str,
ctx: &BallistaContext,
debug: bool,
) -> Result<()> {
for table in TABLES {
match file_format {
// dbgen creates .tbl ('|' delimited) files without header
"tbl" => {
// Old variant: always "<path>/<table>.tbl". New variant: whichever of the
// candidates probed by `find_path` exists.
let path = format!("{}/{}.tbl", path, table);
let path = find_path(path, table, "tbl")?;
let schema = get_schema(table);
// No header row in .tbl files, so the schema is supplied explicitly.
let options = CsvReadOptions::new()
.schema(&schema)
.delimiter(b'|')
.has_header(false)
.file_extension(".tbl");
// Only printed when the caller passes `debug = true`.
if debug {
println!(
"Registering table '{}' using TBL files at path {}",
table, path
);
}
ctx.register_csv(table, &path, options)
.await
// Any registration error is re-wrapped as a `DataFusionError::Plan`
// carrying the Debug rendering of the original error.
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
// Old variant: panic on error.
.unwrap();
// New variant: propagate the error with `?`.
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
}
"csv" => {
// Old variant: always "<path>/<table>". New variant: resolved by `find_path`.
let path = format!("{}/{}", path, table);
let path = find_path(path, table, "csv")?;
let schema = get_schema(table);
// CSV inputs are read with a header row, unlike the "tbl" branch.
let options = CsvReadOptions::new().schema(&schema).has_header(true);
if debug {
println!(
"Registering table '{}' using CSV files at path {}",
table, path
);
}
ctx.register_csv(table, &path, options)
.await
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
// Old variant: panic on error.
.unwrap();
// New variant: propagate the error with `?`.
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
}
"parquet" => {
// Old variant: always "<path>/<table>". New variant: resolved by `find_path`.
let path = format!("{}/{}", path, table);
let path = find_path(path, table, "parquet")?;
if debug {
println!(
"Registering table '{}' using Parquet files at path {}",
table, path
);
}
// Parquet is registered with default read options; no explicit schema is passed.
ctx.register_parquet(table, &path, ParquetReadOptions::default())
.await
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
// Old variant: panic on error.
.unwrap();
// New variant: propagate the error with `?`.
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))?;
}
other => {
// Old variant: panic through `unimplemented!`.
unimplemented!("Invalid file format '{}'", other);
// New variant: report the unknown format as a `DataFusionError::Plan`.
return Err(DataFusionError::Plan(format!(
"Invalid file format '{}'",
other
)))
}
}
}
// Reached only when every table in `TABLES` was registered without error.
Ok(())
}

/// Locates the on-disk data for `table` underneath the directory `path`.
///
/// Two candidate locations are probed, in this order:
/// 1. `<path>/<table>.<ext>`
/// 2. `<path>/<table>`
///
/// The first candidate that exists is returned as a `String`.
///
/// # Errors
///
/// Returns `DataFusionError::Plan` naming both candidates when neither exists.
fn find_path(path: &str, table: &str, ext: &str) -> Result<String> {
    // Preferred location: the table name with the format's extension appended.
    let with_ext = format!("{}/{}.{}", path, table, ext);
    if Path::new(&with_ext).exists() {
        return Ok(with_ext);
    }

    // Fallback location: the bare table name, without any extension.
    let bare = format!("{}/{}", path, table);
    if Path::new(&bare).exists() {
        return Ok(bare);
    }

    Err(DataFusionError::Plan(format!(
        "Could not find {} files at {} or {}",
        ext, with_ext, bare
    )))
}

/// Get the SQL statements from the specified query file
Expand Down