diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 69ae971fd434..9b81c617ec61 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -17,6 +17,7 @@
 //! Distributed execution context.
 
+use datafusion::execution::options::ArrowReadOptions;
 use parking_lot::Mutex;
 use sqlparser::ast::Statement;
 use std::collections::HashMap;
@@ -186,6 +187,30 @@ impl BallistaContext {
         Ok(df)
     }
 
+    /// Create a DataFrame representing an Arrow table scan
+    /// TODO fetch schema from scheduler instead of resolving locally
+    pub async fn read_arrow(
+        &self,
+        path: &str,
+        options: ArrowReadOptions<'_>,
+    ) -> Result<Arc<dyn DataFrame>> {
+        // convert to absolute path because the executor likely has a different working directory
+        let path = PathBuf::from(path);
+        let path = fs::canonicalize(&path)?;
+
+        // use local DataFusion context for now but later this might call the scheduler
+        let mut ctx = {
+            let guard = self.state.lock();
+            create_df_ctx_with_ballista_query_planner::(
+                &guard.scheduler_host,
+                guard.scheduler_port,
+                guard.config(),
+            )
+        };
+        let df = ctx.read_arrow(path.to_str().unwrap(), options).await?;
+        Ok(df)
+    }
+
     /// Create a DataFrame representing a CSV table scan
     /// TODO fetch schema from scheduler instead of resolving locally
     pub async fn read_csv(
@@ -244,6 +269,20 @@ impl BallistaContext {
         }
     }
 
+    pub async fn register_arrow(
+        &self,
+        name: &str,
+        path: &str,
+        options: ArrowReadOptions<'_>,
+    ) -> Result<()> {
+        match self.read_arrow(path, options).await?.to_logical_plan() {
+            LogicalPlan::TableScan(TableScan { source, .. }) => {
+                self.register_table(name, source)
+            }
+            _ => Err(DataFusionError::Internal("Expected table scan".to_owned())),
+        }
+    }
+
     pub async fn register_avro(
         &self,
         name: &str,
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index a0925b4443bb..117da2e3720f 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -376,6 +376,7 @@ enum FileType {
   Parquet = 1;
   CSV = 2;
   Avro = 3;
+  Arrow = 4;
 }
 
 message AnalyzeNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index d4a28cf10490..4fb29f4c8fea 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -1006,6 +1006,7 @@ impl Into<FileType> for protobuf::FileType {
             protobuf::FileType::Parquet => FileType::Parquet,
             protobuf::FileType::Csv => FileType::CSV,
             protobuf::FileType::Avro => FileType::Avro,
+            protobuf::FileType::Arrow => FileType::Arrow,
         }
     }
 }
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 3166a48d6f36..91a53ea37933 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -739,6 +739,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     FileType::Parquet => protobuf::FileType::Parquet,
                     FileType::CSV => protobuf::FileType::Csv,
                     FileType::Avro => protobuf::FileType::Avro,
+                    FileType::Arrow => protobuf::FileType::Arrow,
                 };
 
                 Ok(protobuf::LogicalPlanNode {
diff --git a/datafusion/src/datasource/file_format/arrow_file.rs b/datafusion/src/datasource/file_format/arrow_file.rs
new file mode 100644
index 000000000000..fced624ddc23
--- /dev/null
+++ b/datafusion/src/datasource/file_format/arrow_file.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor
license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Apache Arrow format abstractions + +use std::any::Any; +use std::io::{Read, Seek}; +use std::sync::Arc; + +use arrow::datatypes::Schema; +use arrow::ipc::reader::FileReader; +use arrow::{self, datatypes::SchemaRef}; +use async_trait::async_trait; +use futures::StreamExt; + +use super::FileFormat; +use crate::datasource::object_store::{ObjectReader, ObjectReaderStream}; +use crate::error::Result; +use crate::logical_plan::Expr; +use crate::physical_plan::file_format::{ArrowExec, FileScanConfig}; +use crate::physical_plan::ExecutionPlan; +use crate::physical_plan::Statistics; + +/// The default file extension of arrow files +pub const DEFAULT_ARROW_EXTENSION: &str = ".arrow"; +/// Arrow `FileFormat` implementation. +#[derive(Default, Debug)] +pub struct ArrowFormat; + +#[async_trait] +impl FileFormat for ArrowFormat { + fn as_any(&self) -> &dyn Any { + self + } + + async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result { + let mut schemas = vec![]; + while let Some(obj_reader) = readers.next().await { + let mut reader = obj_reader?.sync_reader()?; + let schema = read_arrow_schema_from_reader(&mut reader)?; + schemas.push(schema.as_ref().clone()); + } + let merged_schema = Schema::try_merge(schemas)?; + Ok(Arc::new(merged_schema)) + } + + async fn infer_stats(&self, _reader: Arc) -> Result { + Ok(Statistics::default()) + } + + async fn create_physical_plan( + &self, + conf: FileScanConfig, + _filters: &[Expr], + ) -> Result> { + let exec = ArrowExec::new(conf); + Ok(Arc::new(exec)) + } +} + +fn read_arrow_schema_from_reader(reader: R) -> Result { + let reader = FileReader::try_new(reader)?; + Ok(reader.schema()) +} + +#[cfg(test)] +mod tests { + use arrow::datatypes::DataType; + + use crate::datasource::{ + file_format::FileFormat, object_store::local::local_object_reader_stream, + }; + + use super::ArrowFormat; + + #[tokio::test] + async fn test_schema() { + let filename = "tests/example.arrow"; + let format = ArrowFormat {}; + let file_schema = format + .infer_schema(local_object_reader_stream(vec![filename.to_owned()])) + .await + .expect("Schema inference"); + assert_eq!( + vec!["f0", "f1", "f2"], + file_schema + .fields() + .iter() + .map(|x| x.name().clone()) + .collect::>() + ); + + assert_eq!( + vec![DataType::Int64, DataType::Utf8, DataType::Boolean], + file_schema + .fields() + .iter() + .map(|x| x.data_type().clone()) + .collect::>() + ); + } +} diff --git a/datafusion/src/datasource/file_format/mod.rs b/datafusion/src/datasource/file_format/mod.rs index 21da2e1e6a27..94226e2c3b94 100644 --- a/datafusion/src/datasource/file_format/mod.rs +++ b/datafusion/src/datasource/file_format/mod.rs @@ -16,7 +16,7 @@ // under the License. //! 
Module containing helper methods for the various file formats - +pub mod arrow_file; pub mod avro; pub mod csv; pub mod json; diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs index 4afb2f54c3ab..c3f427efa19d 100644 --- a/datafusion/src/datasource/file_format/parquet.rs +++ b/datafusion/src/datasource/file_format/parquet.rs @@ -18,7 +18,6 @@ //! Parquet format abstractions use std::any::Any; -use std::io::Read; use std::sync::Arc; use arrow::datatypes::Schema; @@ -40,6 +39,7 @@ use crate::arrow::array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, }; use crate::arrow::datatypes::{DataType, Field}; +use crate::datasource::object_store::ReadSeek; use crate::datasource::object_store::{ObjectReader, ObjectReaderStream}; use crate::datasource::{create_max_min_accs, get_col_stats}; use crate::error::DataFusionError; @@ -346,7 +346,7 @@ impl Length for ChunkObjectReader { } impl ChunkReader for ChunkObjectReader { - type T = Box; + type T = Box; fn get_read(&self, start: u64, length: usize) -> ParquetResult { self.0 diff --git a/datafusion/src/datasource/listing/table.rs b/datafusion/src/datasource/listing/table.rs index 3fbd6c12397d..b6cf8967d009 100644 --- a/datafusion/src/datasource/listing/table.rs +++ b/datafusion/src/datasource/listing/table.rs @@ -27,7 +27,7 @@ use crate::{ datasource::file_format::avro::AvroFormat, datasource::file_format::csv::CsvFormat, datasource::file_format::json::JsonFormat, - datasource::file_format::parquet::ParquetFormat, + datasource::file_format::{arrow_file::ArrowFormat, parquet::ParquetFormat}, error::{DataFusionError, Result}, logical_plan::Expr, physical_plan::{ @@ -95,6 +95,7 @@ impl ListingTableConfig { "csv" => Ok(Arc::new(CsvFormat::default())), "json" => Ok(Arc::new(JsonFormat::default())), "parquet" => Ok(Arc::new(ParquetFormat::default())), + "arrow" => Ok(Arc::new(ArrowFormat::default())), _ => Err(DataFusionError::Internal(format!( "Unable to infer file type from suffix {}", suffix diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index edfe5e2cecd6..9ff1635a3d12 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -18,7 +18,7 @@ //! Object store that represents the Local File System. use std::fs::{self, File, Metadata}; -use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::io::{BufReader, Seek, SeekFrom}; use std::sync::Arc; use async_trait::async_trait; @@ -30,7 +30,7 @@ use crate::datasource::object_store::{ use crate::datasource::PartitionedFile; use crate::error::{DataFusionError, Result}; -use super::{ObjectReaderStream, SizedFile}; +use super::{ObjectReaderStream, ReadSeek, SizedFile}; #[derive(Debug)] /// Local File System as Object Store. @@ -85,15 +85,14 @@ impl ObjectReader for LocalFileReader { fn sync_chunk_reader( &self, start: u64, - length: usize, - ) -> Result> { + _length: usize, + ) -> Result> { // A new file descriptor is opened for each chunk reader. // This okay because chunks are usually fairly large. 
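+        // Note: the requested `length` is no longer enforced here (hence `_length`):
+        // the returned reader must also implement `Seek`, which `Read::take` does not provide.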
let mut file = File::open(&self.file.path)?; file.seek(SeekFrom::Start(start))?; - let file = BufReader::new(file.take(length as u64)); - + let file = BufReader::new(file); Ok(Box::new(file)) } diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index aad70e70a308..e4156ee9daea 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -22,7 +22,7 @@ pub mod local; use parking_lot::RwLock; use std::collections::HashMap; use std::fmt::{self, Debug}; -use std::io::Read; +use std::io::{BufReader, Read, Seek}; use std::pin::Pin; use std::sync::Arc; @@ -34,6 +34,10 @@ use local::LocalFileSystem; use crate::error::{DataFusionError, Result}; +/// Trait Combining Read and Seek +pub trait ReadSeek: Read + Seek {} +impl ReadSeek for BufReader {} + /// Object Reader for one file in an object store. /// /// Note that the dynamic dispatch on the reader might @@ -49,10 +53,10 @@ pub trait ObjectReader: Send + Sync { &self, start: u64, length: usize, - ) -> Result>; + ) -> Result>; /// Get reader for the entire file - fn sync_reader(&self) -> Result> { + fn sync_reader(&self) -> Result> { self.sync_chunk_reader(0, self.length() as usize) } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 5a913e91b0f8..168f2a85ddf7 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -21,7 +21,10 @@ use crate::{ catalog::{CatalogList, MemoryCatalogList}, information_schema::CatalogWithInformationSchema, }, - datasource::listing::{ListingOptions, ListingTable}, + datasource::{ + file_format::arrow_file::{ArrowFormat, DEFAULT_ARROW_EXTENSION}, + listing::{ListingOptions, ListingTable}, + }, datasource::{ file_format::{ avro::{AvroFormat, DEFAULT_AVRO_EXTENSION}, @@ -98,7 +101,7 @@ use parquet::file::properties::WriterProperties; use super::{ disk_manager::DiskManagerConfig, memory_manager::MemoryManagerConfig, - options::{AvroReadOptions, CsvReadOptions}, + options::{ArrowReadOptions, AvroReadOptions, CsvReadOptions}, DiskManager, MemoryManager, }; @@ -233,6 +236,10 @@ impl ExecutionContext { Arc::new(AvroFormat::default()) as Arc, DEFAULT_AVRO_EXTENSION, )), + FileType::Arrow => Ok(( + Arc::new(ArrowFormat::default()) as Arc, + DEFAULT_ARROW_EXTENSION, + )), _ => Err(DataFusionError::NotImplemented(format!( "Unsupported file type {:?}.", file_type @@ -354,7 +361,6 @@ impl ExecutionContext { } /// Creates a DataFrame for reading an Avro data source. - pub async fn read_avro( &mut self, uri: impl Into, @@ -377,6 +383,29 @@ impl ExecutionContext { ))) } + /// Creates a DataFrame for reading an Arrow data source. + pub async fn read_arrow( + &mut self, + uri: impl Into, + options: ArrowReadOptions<'_>, + ) -> Result> { + let uri: String = uri.into(); + let (object_store, path) = self.object_store(&uri)?; + let target_partitions = self.state.lock().config.target_partitions; + Ok(Arc::new(DataFrameImpl::new( + self.state.clone(), + &LogicalPlanBuilder::scan_arrow( + object_store, + path, + options, + None, + target_partitions, + ) + .await? + .build()?, + ))) + } + /// Creates an empty DataFrame. pub fn read_empty(&self) -> Result> { Ok(Arc::new(DataFrameImpl::new( @@ -508,6 +537,22 @@ impl ExecutionContext { Ok(()) } + /// Registers an Arrow data source so that it can be referenced from SQL statements + /// executed against this context. 
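+    ///
+    /// Example (illustrative only; the table name and file path are placeholders):
+    ///
+    /// ```no_run
+    /// # use datafusion::error::Result;
+    /// # use datafusion::execution::options::ArrowReadOptions;
+    /// # use datafusion::prelude::ExecutionContext;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let mut ctx = ExecutionContext::new();
+    /// ctx.register_arrow("my_table", "data/example.arrow", ArrowReadOptions::default())
+    ///     .await?;
+    /// let df = ctx.sql("SELECT * FROM my_table").await?;
+    /// df.show().await?;
+    /// # Ok(())
+    /// # }
+    /// ```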
+ pub async fn register_arrow( + &mut self, + name: &str, + uri: &str, + options: ArrowReadOptions<'_>, + ) -> Result<()> { + let listing_options = + options.to_listing_options(self.state.lock().config.target_partitions); + + self.register_listing_table(name, uri, listing_options, None) + .await?; + Ok(()) + } + /// Registers an Avro data source so that it can be referenced from SQL statements /// executed against this context. pub async fn register_avro( diff --git a/datafusion/src/execution/options.rs b/datafusion/src/execution/options.rs index 79b07536acb3..8bef1745bb59 100644 --- a/datafusion/src/execution/options.rs +++ b/datafusion/src/execution/options.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use arrow::datatypes::{Schema, SchemaRef}; +use crate::datasource::file_format::arrow_file::ArrowFormat; use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION; use crate::datasource::{ file_format::{avro::AvroFormat, csv::CsvFormat}, @@ -155,6 +156,41 @@ impl<'a> AvroReadOptions<'a> { } } +/// Arrow read options +#[derive(Clone)] +pub struct ArrowReadOptions<'a> { + /// The data source schema. + pub schema: Option, + + /// File extension; only files with this extension are selected for data input. + /// Defaults to ".arrow". + pub file_extension: &'a str, +} + +impl<'a> Default for ArrowReadOptions<'a> { + fn default() -> Self { + Self { + schema: None, + file_extension: ".arrow", + } + } +} + +impl<'a> ArrowReadOptions<'a> { + /// Helper to convert these user facing options to `ListingTable` options + pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions { + let file_format = ArrowFormat::default(); + + ListingOptions { + format: Arc::new(file_format), + collect_stat: false, + file_extension: self.file_extension.to_owned(), + target_partitions, + table_partition_cols: vec![], + } + } +} + /// Line-delimited JSON read options #[derive(Clone)] pub struct NdJsonReadOptions<'a> { diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 0144b75166ab..eb1d46175c9f 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -17,13 +17,6 @@ //! 
This module provides a builder for creating LogicalPlans -use crate::datasource::{ - empty::EmptyTable, - file_format::parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION}, - listing::{ListingOptions, ListingTable, ListingTableConfig}, - object_store::ObjectStore, - MemTable, TableProvider, -}; use crate::error::{DataFusionError, Result}; use crate::logical_plan::expr_schema::ExprSchemable; use crate::logical_plan::plan::{ @@ -33,6 +26,16 @@ use crate::logical_plan::plan::{ use crate::optimizer::utils; use crate::prelude::*; use crate::scalar::ScalarValue; +use crate::{ + datasource::{ + empty::EmptyTable, + file_format::parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION}, + listing::{ListingOptions, ListingTable, ListingTableConfig}, + object_store::ObjectStore, + MemTable, TableProvider, + }, + execution::options::ArrowReadOptions, +}; use arrow::{ datatypes::{DataType, Schema, SchemaRef}, record_batch::RecordBatch, @@ -352,6 +355,55 @@ impl LogicalPlanBuilder { Self::scan(table_name, Arc::new(provider), projection) } + /// Scan an Arrow data source + pub async fn scan_arrow( + object_store: Arc, + path: impl Into, + options: ArrowReadOptions<'_>, + projection: Option>, + target_partitions: usize, + ) -> Result { + let path = path.into(); + Self::scan_arrow_with_name( + object_store, + path.clone(), + options, + projection, + path, + target_partitions, + ) + .await + } + + /// Scan an Arrow data source and register it with a given table name + pub async fn scan_arrow_with_name( + object_store: Arc, + path: impl Into, + options: ArrowReadOptions<'_>, + projection: Option>, + table_name: impl Into, + target_partitions: usize, + ) -> Result { + let listing_options = options.to_listing_options(target_partitions); + + let path: String = path.into(); + + let resolved_schema = match options.schema { + Some(s) => s, + None => { + listing_options + .infer_schema(Arc::clone(&object_store), &path) + .await? + } + }; + let config = ListingTableConfig::new(object_store, path) + .with_listing_options(listing_options) + .with_schema(resolved_schema); + let provider = ListingTable::try_new(config)?; + + Self::scan(table_name, Arc::new(provider), projection) + } + /// Scan an empty data source, mainly used in tests pub fn scan_empty( name: Option<&str>, diff --git a/datafusion/src/physical_plan/file_format/arrow_file.rs b/datafusion/src/physical_plan/file_format/arrow_file.rs new file mode 100644 index 000000000000..2684fcbe5f7a --- /dev/null +++ b/datafusion/src/physical_plan/file_format/arrow_file.rs @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Execution plan for reading Arrow files + +use std::any::Any; +use std::fmt; +use std::sync::Arc; + +use crate::{ + error::{DataFusionError, Result}, + physical_plan::{ + file_format::FileScanConfig, + metrics::{ExecutionPlanMetricsSet, MetricsSet}, + DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, + Statistics, + }, +}; + +use arrow::{datatypes::SchemaRef, error::ArrowError}; +use datafusion_physical_expr::PhysicalSortExpr; +use log::debug; + +use fmt::Debug; + +use crate::execution::runtime_env::RuntimeEnv; +use async_trait::async_trait; + +use super::file_stream::{BatchIter, FileStream}; + +/// Execution plan for scanning one or more Arrow partitions +#[derive(Debug, Clone)] +pub struct ArrowExec { + base_config: FileScanConfig, + projected_statistics: Statistics, + projected_schema: SchemaRef, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, +} + +/// Stores metrics about the arrow execution for a particular arrow file +#[derive(Debug, Clone)] +struct ArrowFileMetrics {} + +impl ArrowExec { + /// Create a new Arrow reader execution plan provided file list and schema. + /// Even if `limit` is set, Arrow rounds up the number of records to the next `batch_size`. + pub fn new(base_config: FileScanConfig) -> Self { + debug!( + "Creating ArrowExec, files: {:?}, projection {:?}, limit: {:?}", + base_config.file_groups, base_config.projection, base_config.limit + ); + + let metrics = ExecutionPlanMetricsSet::new(); + + let (projected_schema, projected_statistics) = base_config.project(); + + Self { + base_config, + projected_schema, + projected_statistics, + metrics, + } + } + + /// Ref to the base configs + pub fn base_config(&self) -> &FileScanConfig { + &self.base_config + } +} + +#[async_trait] +impl ExecutionPlan for ArrowExec { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.projected_schema) + } + + fn children(&self) -> Vec> { + // this is a leaf node and has no children + vec![] + } + + /// Get the output partitioning of this plan + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn relies_on_input_order(&self) -> bool { + false + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result> { + if children.is_empty() { + Ok(Arc::new(self.clone())) + } else { + Err(DataFusionError::Internal(format!( + "Children cannot be replaced in {:?}", + self + ))) + } + } + + async fn execute( + &self, + partition_index: usize, + _runtime: Arc, + ) -> Result { + let fun = move |file, _remaining: &Option| { + let arrow_reader = arrow::ipc::reader::FileReader::try_new(file); + + match arrow_reader { + Ok(r) => Box::new(r) as BatchIter, + Err(e) => Box::new( + vec![Err(ArrowError::ExternalError(Box::new(e)))].into_iter(), + ), + } + }; + + Ok(Box::pin(FileStream::new( + Arc::clone(&self.base_config.object_store), + self.base_config.file_groups[partition_index].clone(), + fun, + Arc::clone(&self.projected_schema), + self.base_config.limit, + self.base_config.table_partition_cols.clone(), + ))) + } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!( + f, + "ArrowExec: limit={:?}, partitions={}", + self.base_config.limit, + super::FileGroupsDisplay(&self.base_config.file_groups) + ) + } 
+ } + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> Statistics { + self.projected_statistics.clone() + } +} + +#[cfg(test)] +mod tests {} diff --git a/datafusion/src/physical_plan/file_format/file_stream.rs b/datafusion/src/physical_plan/file_format/file_stream.rs index 958b1721bb39..0db64141c4d0 100644 --- a/datafusion/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/src/physical_plan/file_format/file_stream.rs @@ -22,7 +22,10 @@ //! compliant with the `SendableRecordBatchStream` trait. use crate::{ - datasource::{object_store::ObjectStore, PartitionedFile}, + datasource::{ + object_store::{ObjectStore, ReadSeek}, + PartitionedFile, + }, physical_plan::RecordBatchStream, scalar::ScalarValue, }; @@ -33,7 +36,6 @@ use arrow::{ }; use futures::Stream; use std::{ - io::Read, iter, pin::Pin, sync::Arc, @@ -48,12 +50,15 @@ pub type BatchIter = Box> + Send + /// A closure that creates a file format reader (iterator over `RecordBatch`) from a `Read` object /// and an optional number of required records. pub trait FormatReaderOpener: - FnMut(Box, &Option) -> BatchIter + Send + Unpin + 'static + FnMut(Box, &Option) -> BatchIter + + Send + + Unpin + + 'static { } impl FormatReaderOpener for T where - T: FnMut(Box, &Option) -> BatchIter + T: FnMut(Box, &Option) -> BatchIter + Send + Unpin + 'static diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs index 7658addd3561..b3c5644b2835 100644 --- a/datafusion/src/physical_plan/file_format/mod.rs +++ b/datafusion/src/physical_plan/file_format/mod.rs @@ -17,6 +17,7 @@ //! Execution plans that read file formats +mod arrow_file; mod avro; mod csv; mod file_stream; @@ -31,6 +32,7 @@ use arrow::{ error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; +pub use arrow_file::ArrowExec; pub use avro::AvroExec; pub use csv::CsvExec; pub use json::NdJsonExec; diff --git a/datafusion/src/sql/parser.rs b/datafusion/src/sql/parser.rs index 335257764442..805c6d69ae46 100644 --- a/datafusion/src/sql/parser.rs +++ b/datafusion/src/sql/parser.rs @@ -45,6 +45,8 @@ pub enum FileType { CSV, /// Avro binary records Avro, + /// Arrow data + Arrow, } impl FromStr for FileType { @@ -56,8 +58,9 @@ impl FromStr for FileType { "NDJSON" => Ok(Self::NdJson), "CSV" => Ok(Self::CSV), "AVRO" => Ok(Self::Avro), + "ARROW" => Ok(Self::Arrow), other => Err(ParserError::ParserError(format!( - "expect one of PARQUET, AVRO, NDJSON, or CSV, found: {}", + "expect one of PARQUET, AVRO, NDJSON, CSV or ARROW, found: {}", other ))), } @@ -430,7 +433,7 @@ mod tests { // Error cases: Invalid type let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS UNKNOWN_TYPE LOCATION 'foo.csv'"; - expect_parse_error(sql, "expect one of PARQUET, AVRO, NDJSON, or CSV"); + expect_parse_error(sql, "expect one of PARQUET, AVRO, NDJSON, CSV or ARROW"); Ok(()) } diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 8b59ccdca644..ca56eac8adac 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -325,6 +325,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } FileType::NdJson => {} FileType::Avro => {} + FileType::Arrow => {} }; let schema = self.build_schema(columns)?; diff --git a/datafusion/src/test/object_store.rs b/datafusion/src/test/object_store.rs index e93b4cd2d410..65730d6a568a 100644 --- a/datafusion/src/test/object_store.rs +++ b/datafusion/src/test/object_store.rs @@ -16,15 +16,12 @@ // under the License. //! 
Object store implem used for testing -use std::{ - io, - io::{Cursor, Read}, - sync::Arc, -}; +use std::{io, io::Cursor, sync::Arc}; use crate::{ datasource::object_store::{ - FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile, + FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, ReadSeek, + SizedFile, }, error::{DataFusionError, Result}, }; @@ -94,6 +91,7 @@ impl ObjectStore for TestObjectStore { } } } +impl> ReadSeek for Cursor {} struct EmptyObjectReader(u64); @@ -111,7 +109,7 @@ impl ObjectReader for EmptyObjectReader { &self, _start: u64, _length: usize, - ) -> Result> { + ) -> Result> { Ok(Box::new(Cursor::new(vec![0; self.0 as usize]))) } diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs index 8ee0298f72ce..5a1cddec3eac 100644 --- a/datafusion/src/test_util.rs +++ b/datafusion/src/test_util.rs @@ -21,7 +21,6 @@ use std::collections::BTreeMap; use std::{env, error::Error, path::PathBuf, sync::Arc}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - /// Compares formatted output of a record batch with an expected /// vector of strings, with the result of pretty formatting record /// batches. This is a macro so errors appear on the correct line @@ -38,7 +37,7 @@ macro_rules! assert_batches_eq { let expected_lines: Vec = $EXPECTED_LINES.iter().map(|&s| s.into()).collect(); - let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS) + let formatted = ::arrow::util::pretty::pretty_format_batches($CHUNKS) .unwrap() .to_string(); diff --git a/datafusion/tests/example.arrow b/datafusion/tests/example.arrow new file mode 100644 index 000000000000..f91b501631a0 Binary files /dev/null and b/datafusion/tests/example.arrow differ diff --git a/datafusion/tests/sql/arrow_files.rs b/datafusion/tests/sql/arrow_files.rs new file mode 100644 index 000000000000..99f4d699e20d --- /dev/null +++ b/datafusion/tests/sql/arrow_files.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
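+//! End-to-end SQL tests for querying Arrow IPC files registered with `register_arrow`.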
+ +use datafusion::execution::options::ArrowReadOptions; + +use super::*; + +async fn register_arrow(ctx: &mut ExecutionContext) { + ctx.register_arrow( + "arrow_simple", + "tests/example.arrow", + ArrowReadOptions::default(), + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn arrow_query() { + let mut ctx = ExecutionContext::new(); + register_arrow(&mut ctx).await; + let sql = "SELECT * FROM arrow_simple"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----+-----+-------+", + "| f0 | f1 | f2 |", + "+----+-----+-------+", + "| 1 | foo | true |", + "| 2 | bar | |", + "| 3 | baz | false |", + "| 4 | | true |", + "+----+-----+-------+", + ]; + + assert_batches_eq!(expected, &actual); +} + +#[tokio::test] +async fn arrow_explain() { + let mut ctx = ExecutionContext::new(); + register_arrow(&mut ctx).await; + let sql = "EXPLAIN SELECT * FROM arrow_simple"; + let actual = execute(&mut ctx, sql).await; + let actual = normalize_vec_for_explain(actual); + let expected = vec![ + vec![ + "logical_plan", + "Projection: #arrow_simple.f0, #arrow_simple.f1, #arrow_simple.f2\ + \n TableScan: arrow_simple projection=Some([0, 1, 2])", + ], + vec![ + "physical_plan", + "ProjectionExec: expr=[f0@0 as f0, f1@1 as f1, f2@2 as f2]\ + \n ArrowExec: limit=None, partitions=[tests/example.arrow]\n", + ], + ]; + + assert_eq!(expected, actual); +} diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs index a548d619d635..8732c4fa460f 100644 --- a/datafusion/tests/sql/mod.rs +++ b/datafusion/tests/sql/mod.rs @@ -74,6 +74,7 @@ macro_rules! test_expression { } pub mod aggregates; +pub mod arrow_files; #[cfg(feature = "avro")] pub mod avro; pub mod create_drop;
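A minimal sketch of driving the new DataFrame-level API directly (rather than through SQL), assuming the `tests/example.arrow` fixture added above and default `ArrowReadOptions`; illustrative only:

use datafusion::error::Result;
use datafusion::execution::options::ArrowReadOptions;
use datafusion::prelude::ExecutionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    // The schema is inferred from the Arrow IPC file footer by ArrowFormat::infer_schema.
    let df = ctx
        .read_arrow("tests/example.arrow", ArrowReadOptions::default())
        .await?;
    // Collect the scanned batches and print a small summary.
    let batches = df.collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}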