feat: 2061 create external table ddl table partition cols #2099

Merged
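This PR (for issue #2061) carries the PARTITIONED BY columns of CREATE EXTERNAL TABLE through the Ballista protobuf plan and into the listing options of each file format, and it threads a new ParquetReadOptions argument through read_parquet and register_parquet. A minimal sketch of the DDL path it enables is below; the table name, location, and partition column are placeholders, and a Hive-style layout such as /data/sales/year=2021/... is assumed.

// Sketch only: placeholder names and paths, assuming the PARTITIONED BY clause
// sits between STORED AS and LOCATION in DataFusion's external-table DDL.
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn create_partitioned_table() -> Result<()> {
    let ctx = SessionContext::new();

    // The PARTITIONED BY columns end up in CreateExternalTable::table_partition_cols
    // and from there in ListingOptions::table_partition_cols.
    ctx.sql(
        "CREATE EXTERNAL TABLE sales \
         STORED AS PARQUET \
         PARTITIONED BY (year) \
         LOCATION '/data/sales'",
    )
    .await?;

    let df = ctx
        .sql("SELECT year, count(*) FROM sales GROUP BY year")
        .await?;
    df.show().await?;
    Ok(())
}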
4 changes: 2 additions & 2 deletions ballista-examples/src/bin/ballista-dataframe.rs
@@ -16,7 +16,7 @@
// under the License.

use ballista::prelude::*;
use datafusion::prelude::{col, lit};
use datafusion::prelude::{col, lit, ParquetReadOptions};

/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
/// fetching results, using the DataFrame trait
@@ -33,7 +33,7 @@ async fn main() -> Result<()> {

// define the query using the DataFrame trait
let df = ctx
.read_parquet(filename)
.read_parquet(filename, ParquetReadOptions::default())
.await?
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;
46 changes: 36 additions & 10 deletions ballista/rust/client/src/context.rs
@@ -36,7 +36,7 @@ use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan};
use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, SessionConfig, SessionContext,
AvroReadOptions, CsvReadOptions, ParquetReadOptions, SessionConfig, SessionContext,
};
use datafusion::sql::parser::{DFParser, FileType, Statement as DFStatement};

@@ -221,13 +221,17 @@ impl BallistaContext {

/// Create a DataFrame representing a Parquet table scan
/// TODO fetch schema from scheduler instead of resolving locally
pub async fn read_parquet(&self, path: &str) -> Result<Arc<DataFrame>> {
pub async fn read_parquet(
&self,
path: &str,
options: ParquetReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
// convert to absolute path because the executor likely has a different working directory
let path = PathBuf::from(path);
let path = fs::canonicalize(&path)?;

let ctx = self.context.clone();
let df = ctx.read_parquet(path.to_str().unwrap()).await?;
let df = ctx.read_parquet(path.to_str().unwrap(), options).await?;
Ok(df)
}

@@ -272,8 +276,13 @@ impl BallistaContext {
}
}

pub async fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
match self.read_parquet(path).await?.to_logical_plan() {
pub async fn register_parquet(
&self,
name: &str,
path: &str,
options: ParquetReadOptions<'_>,
) -> Result<()> {
match self.read_parquet(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source)
}
@@ -366,25 +375,38 @@ impl BallistaContext {
ref location,
ref file_type,
ref has_header,
ref table_partition_cols,
}) => match file_type {
FileType::CSV => {
self.register_csv(
name,
location,
CsvReadOptions::new()
.schema(&schema.as_ref().to_owned().into())
.has_header(*has_header),
.has_header(*has_header)
.table_partition_cols(table_partition_cols.to_vec()),
)
.await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
}
FileType::Parquet => {
self.register_parquet(name, location).await?;
self.register_parquet(
name,
location,
ParquetReadOptions::default()
.table_partition_cols(table_partition_cols.to_vec()),
)
.await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
}
FileType::Avro => {
self.register_avro(name, location, AvroReadOptions::default())
.await?;
self.register_avro(
name,
location,
AvroReadOptions::default()
.table_partition_cols(table_partition_cols.to_vec()),
)
.await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
}
_ => Err(DataFusionError::NotImplemented(format!(
@@ -525,7 +547,11 @@ mod tests {

let testdata = datafusion::test_util::parquet_test_data();
context
.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata))
.register_parquet(
"single_nan",
&format!("{}/single_nan.parquet", testdata),
ParquetReadOptions::default(),
)
.await
.unwrap();

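A usage sketch of the updated client API above; the table name and path are placeholders, and a Hive-style layout such as /data/sales/year=2021/... is assumed.

use ballista::prelude::BallistaContext;
use datafusion::prelude::ParquetReadOptions;

// Partition columns are declared on ParquetReadOptions and forwarded by the client.
async fn register_partitioned_parquet(
    ctx: &BallistaContext,
) -> datafusion::error::Result<()> {
    ctx.register_parquet(
        "sales",
        "/data/sales",
        ParquetReadOptions::default().table_partition_cols(vec!["year".to_string()]),
    )
    .await?;
    Ok(())
}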
1 change: 1 addition & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -146,6 +146,7 @@ message CreateExternalTableNode {
FileType file_type = 3;
bool has_header = 4;
datafusion.DfSchema schema = 5;
repeated string table_partition_cols = 6;
}

message CreateCatalogSchemaNode {
6 changes: 6 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -325,6 +325,9 @@ impl AsLogicalPlan for LogicalPlanNode {
location: create_extern_table.location.clone(),
file_type: pb_file_type.into(),
has_header: create_extern_table.has_header,
table_partition_cols: create_extern_table
.table_partition_cols
.clone(),
}))
}
LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
@@ -771,6 +774,7 @@ impl AsLogicalPlan for LogicalPlanNode {
file_type,
has_header,
schema: df_schema,
table_partition_cols,
}) => {
use datafusion::sql::parser::FileType;

@@ -789,6 +793,7 @@
file_type: pb_file_type as i32,
has_header: *has_header,
schema: Some(df_schema.into()),
table_partition_cols: table_partition_cols.clone(),
},
)),
})
@@ -1123,6 +1128,7 @@ mod roundtrip_tests {
location: String::from("employee.csv"),
file_type: *file,
has_header: true,
table_partition_cols: vec![],
});

roundtrip_test!(create_table_node);
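The roundtrip test above keeps table_partition_cols empty; a companion case with a non-empty list would exercise the new proto field end to end. The sketch below is illustrative only and assumes the surrounding test module's imports, the file-type loop binding, and the roundtrip_test! macro shown above.

// Illustrative sketch: same shape as create_table_node above, but with a partition column.
let schema = Schema::new(vec![Field::new("id", DataType::Utf8, false)]);
let partitioned_table_node = LogicalPlan::CreateExternalTable(CreateExternalTable {
    schema: Arc::new(DFSchema::try_from(schema).unwrap()),
    name: String::from("employee.csv"),
    location: String::from("employee.csv"),
    file_type: *file,
    has_header: true,
    table_partition_cols: vec![String::from("id")],
});

roundtrip_test!(partitioned_table_node);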
7 changes: 5 additions & 2 deletions benchmarks/src/bin/nyctaxi.rs
@@ -29,7 +29,7 @@ use datafusion::error::Result;
use datafusion::execution::context::{SessionConfig, SessionContext};

use datafusion::physical_plan::collect;
use datafusion::prelude::CsvReadOptions;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
use structopt::StructOpt;

#[cfg(feature = "snmalloc")]
@@ -82,7 +82,10 @@ async fn main() -> Result<()> {
let options = CsvReadOptions::new().schema(&schema).has_header(true);
ctx.register_csv("tripdata", path, options).await?
}
"parquet" => ctx.register_parquet("tripdata", path).await?,
"parquet" => {
ctx.register_parquet("tripdata", path, ParquetReadOptions::default())
.await?
}
other => {
println!("Invalid file format '{}'", other);
process::exit(-1);
2 changes: 1 addition & 1 deletion benchmarks/src/bin/tpch.rs
@@ -559,7 +559,7 @@ async fn register_tables(path: &str, file_format: &str, ctx: &BallistaContext) {
}
"parquet" => {
let path = format!("{}/{}", path, table);
ctx.register_parquet(table, &path)
ctx.register_parquet(table, &path, ParquetReadOptions::default())
.await
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
.unwrap();
2 changes: 1 addition & 1 deletion datafusion-examples/examples/dataframe.rs
@@ -31,7 +31,7 @@ async fn main() -> Result<()> {

// define the query using the DataFrame trait
let df = ctx
.read_parquet(filename)
.read_parquet(filename, ParquetReadOptions::default())
.await?
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;
1 change: 1 addition & 0 deletions datafusion-examples/examples/flight_server.rs
@@ -98,6 +98,7 @@ impl FlightService for FlightServiceImpl {
ctx.register_parquet(
"alltypes_plain",
&format!("{}/alltypes_plain.parquet", testdata),
ParquetReadOptions::default(),
)
.await
.map_err(to_tonic_err)?;
1 change: 1 addition & 0 deletions datafusion-examples/examples/parquet_sql.rs
@@ -31,6 +31,7 @@ async fn main() -> Result<()> {
ctx.register_parquet(
"alltypes_plain",
&format!("{}/alltypes_plain.parquet", testdata),
ParquetReadOptions::default(),
)
.await?;

10 changes: 7 additions & 3 deletions datafusion/core/benches/parquet_query_sql.rs
@@ -24,7 +24,7 @@ use arrow::datatypes::{
};
use arrow::record_batch::RecordBatch;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::prelude::SessionContext;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{WriterProperties, WriterVersion};
use rand::distributions::uniform::SampleUniform;
@@ -196,8 +196,12 @@ fn criterion_benchmark(c: &mut Criterion) {
let context = SessionContext::new();

let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
rt.block_on(context.register_parquet("t", file_path.as_str()))
.unwrap();
rt.block_on(context.register_parquet(
"t",
file_path.as_str(),
ParquetReadOptions::default(),
))
.unwrap();

// We read the queries from a file so they can be changed without recompiling the benchmark
let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
51 changes: 32 additions & 19 deletions datafusion/core/src/execution/context.rs
@@ -92,7 +92,9 @@ use chrono::{DateTime, Utc};
use parquet::file::properties::WriterProperties;
use uuid::Uuid;

use super::options::{AvroReadOptions, CsvReadOptions, NdJsonReadOptions};
use super::options::{
AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
};

/// The default catalog name - this impacts what SQL queries use if not specified
const DEFAULT_CATALOG: &str = "datafusion";
@@ -215,6 +217,7 @@ impl SessionContext {
ref location,
ref file_type,
ref has_header,
ref table_partition_cols,
}) => {
let (file_format, file_extension) = match file_type {
FileType::CSV => (
@@ -241,7 +244,7 @@
collect_stat: false,
file_extension: file_extension.to_owned(),
target_partitions: self.copied_config().target_partitions,
table_partition_cols: vec![],
table_partition_cols: table_partition_cols.clone(),
};

// TODO make schema in CreateExternalTable optional instead of empty
Expand Down Expand Up @@ -462,14 +465,23 @@ impl SessionContext {
}

/// Creates a DataFrame for reading a Parquet data source.
pub async fn read_parquet(&self, uri: impl Into<String>) -> Result<Arc<DataFrame>> {
pub async fn read_parquet(
&self,
uri: impl Into<String>,
options: ParquetReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
let uri: String = uri.into();
let (object_store, path) = self.runtime_env().object_store(&uri)?;
let target_partitions = self.copied_config().target_partitions;
let logical_plan =
LogicalPlanBuilder::scan_parquet(object_store, path, None, target_partitions)
.await?
.build()?;
let logical_plan = LogicalPlanBuilder::scan_parquet(
object_store,
path,
options,
None,
target_partitions,
)
.await?
.build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &logical_plan)))
}

@@ -548,20 +560,19 @@ impl SessionContext {

/// Registers a Parquet data source so that it can be referenced from SQL statements
/// executed against this context.
pub async fn register_parquet(&self, name: &str, uri: &str) -> Result<()> {
let (target_partitions, enable_pruning) = {
pub async fn register_parquet(
&self,
name: &str,
uri: &str,
options: ParquetReadOptions<'_>,
) -> Result<()> {
let (target_partitions, parquet_pruning) = {
let conf = self.copied_config();
(conf.target_partitions, conf.parquet_pruning)
};
let file_format = ParquetFormat::default().with_enable_pruning(enable_pruning);

let listing_options = ListingOptions {
format: Arc::new(file_format),
collect_stat: true,
file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
target_partitions,
table_partition_cols: vec![],
};
let listing_options = options
.parquet_pruning(parquet_pruning)
.to_listing_options(target_partitions);

self.register_listing_table(name, uri, listing_options, None)
.await?;
@@ -3510,7 +3521,9 @@ mod tests {

async fn call_read_parquet(&self) -> Arc<DataFrame> {
let ctx = SessionContext::new();
ctx.read_parquet("dummy").await.unwrap()
ctx.read_parquet("dummy", ParquetReadOptions::default())
.await
.unwrap()
}
}
}
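On the DataFusion side, the same options flow through the programmatic API. A minimal sketch with placeholder paths and a placeholder partition column:

use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

async fn register_and_query() -> Result<()> {
    let ctx = SessionContext::new();

    // register_parquet now derives its ListingOptions from ParquetReadOptions,
    // so pruning and partition columns are configured in one place.
    ctx.register_parquet(
        "events",
        "/data/events",
        ParquetReadOptions::default()
            .parquet_pruning(true)
            .table_partition_cols(vec!["day".to_string()]),
    )
    .await?;

    // Ad-hoc reads take the same options struct.
    let df = ctx
        .read_parquet("/data/events", ParquetReadOptions::default())
        .await?;
    df.show_limit(10).await?;
    Ok(())
}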