diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index bff7999cdb16..63eb9127ece8 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -402,7 +402,6 @@ async fn get_table( unimplemented!("Invalid file format '{}'", other); } }; - let schema = Arc::new(get_tpch_table_schema(table)); let options = ListingOptions::new(format) .with_file_extension(extension) @@ -412,10 +411,11 @@ async fn get_table( let table_path = ListingTableUrl::parse(path)?; let config = ListingTableConfig::new(table_path).with_listing_options(options); - let config = if table_format == "parquet" { - config.infer_schema(&state).await? - } else { - config.with_schema(schema) + let config = match table_format { + "parquet" => config.infer_schema(&state).await?, + "tbl" => config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))), + "csv" => config.with_schema(Arc::new(get_tpch_table_schema(table))), + _ => unreachable!(), }; Ok(Arc::new(ListingTable::try_new(config)?)) @@ -827,6 +827,7 @@ mod tests { #[cfg(feature = "ci")] mod ci { use super::*; + use arrow::datatypes::{DataType, Field}; use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes}; async fn serde_round_trip(query: usize) -> Result<()> { @@ -1086,7 +1087,6 @@ mod ci { /// * the correct number of rows are returned /// * the content of the rows is correct async fn verify_query(n: usize) -> Result<()> { - use datafusion::arrow::datatypes::{DataType, Field}; use datafusion::common::ScalarValue; use datafusion::logical_expr::expr::Cast; @@ -1214,7 +1214,8 @@ mod ci { } fn get_tpch_data_path() -> Result { - let path = std::env::var("TPCH_DATA").unwrap_or("benchmarks/data".to_string()); + let path = + std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string()); if !Path::new(&path).exists() { return Err(DataFusionError::Execution(format!( "Benchmark data not found (set TPCH_DATA env var to override): {}", diff --git a/benchmarks/src/tpch.rs b/benchmarks/src/tpch.rs index deaecdd93db1..47a69f62d198 100644 --- a/benchmarks/src/tpch.rs +++ b/benchmarks/src/tpch.rs @@ -43,6 +43,15 @@ pub const TPCH_TABLES: &[&str] = &[ "part", "supplier", "partsupp", "customer", "orders", "lineitem", "nation", "region", ]; +/// The `.tbl` file contains a trailing column +pub fn get_tbl_tpch_table_schema(table: &str) -> Schema { + let mut schema = get_tpch_table_schema(table); + schema + .fields + .push(Field::new("__placeholder", DataType::Utf8, false)); + schema +} + /// Get the schema for the benchmarks derived from TPC-H pub fn get_tpch_table_schema(table: &str) -> Schema { // note that the schema intentionally uses signed integers so that any generated Parquet @@ -331,7 +340,7 @@ pub async fn convert_tbl( let output_root_path = Path::new(output_path); for table in TPCH_TABLES { let start = Instant::now(); - let schema = get_tpch_table_schema(table); + let schema = get_tbl_tpch_table_schema(table); let input_path = format!("{input_path}/{table}.tbl"); let options = CsvReadOptions::new() @@ -346,6 +355,16 @@ pub async fn convert_tbl( // build plan to read the TBL file let mut csv = ctx.read_csv(&input_path, options).await?; + // Select all apart from the padding column + let selection = csv + .schema() + .fields() + .iter() + .take(schema.fields.len() - 1) + .map(|d| Expr::Column(d.qualified_column())) + .collect(); + + csv = csv.select(selection)?; // optionally, repartition the file if partitions > 1 { csv = csv.repartition(Partitioning::RoundRobinBatch(partitions))?