From 827072d783f2c5fe42c3cb232b1109ce819b4d45 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Thu, 26 Aug 2021 23:06:43 +0800 Subject: [PATCH 01/12] Object Store API to read from remote storage systems --- .../src/serde/physical_plan/from_proto.rs | 2 + datafusion/src/datasource/mod.rs | 1 + .../src/datasource/object_store/local.rs | 128 ++++++++++++++++++ datafusion/src/datasource/object_store/mod.rs | 127 +++++++++++++++++ datafusion/src/execution/context.rs | 28 ++++ 5 files changed, 286 insertions(+) create mode 100644 datafusion/src/datasource/object_store/local.rs create mode 100644 datafusion/src/datasource/object_store/mod.rs diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index d37940e71a49..4a2f4a2fbb76 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -34,6 +34,7 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion::catalog::catalog::{ CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider, }; +use datafusion::datasource::object_store::ObjectStoreRegistry; use datafusion::execution::context::{ ExecutionConfig, ExecutionContextState, ExecutionProps, }; @@ -627,6 +628,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc { aggregate_functions: Default::default(), config: ExecutionConfig::new(), execution_props: ExecutionProps::new(), + object_store_registry: Arc::new(ObjectStoreRegistry::new()), }; let fun_expr = functions::create_physical_fun( diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs index 9699a997caa1..4e28f04385e5 100644 --- a/datafusion/src/datasource/mod.rs +++ b/datafusion/src/datasource/mod.rs @@ -22,6 +22,7 @@ pub mod datasource; pub mod empty; pub mod json; pub mod memory; +pub mod object_store; pub mod parquet; pub use self::csv::{CsvFile, CsvReadOptions}; diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs new file mode 100644 index 000000000000..53c551c0c156 --- /dev/null +++ b/datafusion/src/datasource/object_store/local.rs @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Object store that represents the Local File System. + +use async_trait::async_trait; +use futures::{stream, AsyncRead, StreamExt}; +use std::sync::Arc; + +use crate::datasource::object_store::{ + FileMeta, FileMetaStream, ObjectReader, ObjectStore, +}; +use crate::error::DataFusionError; +use crate::error::Result; +use std::fs::Metadata; + +#[derive(Debug)] +/// Local File System as Object Store. +pub struct LocalFileSystem; + +#[async_trait] +impl ObjectStore for LocalFileSystem { + async fn list(&self, prefix: &str) -> Result { + list_all(prefix.to_owned()).await + } + + async fn get_reader(&self, file: FileMeta) -> Result> { + Ok(Arc::new(LocalFileReader::new(file)?)) + } +} + +struct LocalFileReader { + file: FileMeta, +} + +impl LocalFileReader { + fn new(file: FileMeta) -> Result { + Ok(Self { file }) + } +} + +#[async_trait] +impl ObjectReader for LocalFileReader { + async fn get_reader( + &self, + _start: u64, + _length: usize, + ) -> Result> { + todo!() + } + + async fn length(&self) -> Result { + match self.file.size { + Some(size) => Ok(size), + None => Ok(0u64), + } + } +} + +async fn list_all(prefix: String) -> Result { + fn get_meta(path: String, metadata: Metadata) -> FileMeta { + FileMeta { + path, + last_modified: metadata.modified().map(chrono::DateTime::from).ok(), + size: Some(metadata.len()), + } + } + + async fn find_files_in_dir( + path: String, + to_visit: &mut Vec, + ) -> Result> { + let mut dir = tokio::fs::read_dir(path).await?; + let mut files = Vec::new(); + + while let Some(child) = dir.next_entry().await? { + if let Some(child_path) = child.path().to_str() { + let metadata = child.metadata().await?; + if metadata.is_dir() { + to_visit.push(child_path.to_string()); + } else { + files.push(get_meta(child_path.to_owned(), metadata)) + } + } else { + return Err(DataFusionError::Plan("Invalid path".to_string())); + } + } + Ok(files) + } + + let prefix_meta = tokio::fs::metadata(&prefix).await?; + let prefix = prefix.to_owned(); + if prefix_meta.is_file() { + Ok(Box::pin(stream::once(async move { + Ok(get_meta(prefix, prefix_meta)) + }))) + } else { + let result = stream::unfold(vec![prefix], move |mut to_visit| async move { + match to_visit.pop() { + None => None, + Some(path) => { + let file_stream = match find_files_in_dir(path, &mut to_visit).await { + Ok(files) => stream::iter(files).map(Ok).left_stream(), + Err(e) => stream::once(async { Err(e) }).right_stream(), + }; + + Some((file_stream, to_visit)) + } + } + }) + .flatten(); + Ok(Box::pin(result)) + } +} diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs new file mode 100644 index 000000000000..925537a05a02 --- /dev/null +++ b/datafusion/src/datasource/object_store/mod.rs @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Object Store abstracts access to an underlying file/object storage. + +pub mod local; + +use std::collections::HashMap; +use std::fmt::Debug; +use std::pin::Pin; +use std::sync::{Arc, RwLock}; + +use async_trait::async_trait; +use futures::{AsyncRead, Stream}; + +use local::LocalFileSystem; + +use crate::error::{DataFusionError, Result}; +use chrono::Utc; + +/// Object Reader for one file in a object store +#[async_trait] +pub trait ObjectReader { + /// Get reader for a part [start, start + length] in the file asynchronously + async fn get_reader(&self, start: u64, length: usize) -> Result>; + + /// Get length for the file asynchronously + async fn length(&self) -> Result; +} + +/// File meta we got from object store +pub struct FileMeta { + /// Path of the file + pub path: String, + /// Last time the file was modified in UTC + pub last_modified: Option>, + /// File size in total + pub size: Option, +} + +/// Stream of files get listed from object store +pub type FileMetaStream = + Pin> + Send + Sync + 'static>>; + +/// A ObjectStore abstracts access to an underlying file/object storage. +/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes +#[async_trait] +pub trait ObjectStore: Sync + Send + Debug { + /// Returns all the files in path `prefix` asynchronously + async fn list(&self, prefix: &str) -> Result; + + /// Get object reader for one file + async fn get_reader(&self, file: FileMeta) -> Result>; +} + +static LOCAL_SCHEME: &str = "file"; + +/// A Registry holds all the object stores at runtime with a scheme for each store. +/// This allows the user to extend DataFusion with different storage systems such as S3 or HDFS +/// and query data inside these systems. +pub struct ObjectStoreRegistry { + /// A map from scheme to object store that serve list / read operations for the store + pub object_stores: RwLock>>, +} + +impl ObjectStoreRegistry { + /// Create the registry that object stores can registered into. + /// ['LocalFileSystem'] store is registered in by default to support read local files natively. + pub fn new() -> Self { + let mut map: HashMap> = HashMap::new(); + map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem)); + + Self { + object_stores: RwLock::new(map), + } + } + + /// Adds a new store to this registry. + /// If a store of the same prefix existed before, it is replaced in the registry and returned. + pub fn register_store( + &self, + scheme: String, + store: Arc, + ) -> Option> { + let mut stores = self.object_stores.write().unwrap(); + stores.insert(scheme, store) + } + + /// Get the store registered for scheme + pub fn get(&self, scheme: &str) -> Option> { + let stores = self.object_stores.read().unwrap(); + stores.get(scheme).cloned() + } + + /// Get a suitable store for the URI based on it's scheme. For example: + /// URI with scheme file or no schema will return the default LocalFS store, + /// URI with scheme s3 will return the S3 store if it's registered. + pub fn get_by_uri(&self, uri: &str) -> Result> { + if let Some((scheme, _)) = uri.split_once(':') { + let stores = self.object_stores.read().unwrap(); + if let Some(store) = stores.get(&*scheme.to_lowercase()) { + Ok(store.clone()) + } else { + Err(DataFusionError::Internal(format!( + "No suitable object store found for {}", + scheme + ))) + } + } else { + Ok(Arc::new(LocalFileSystem)) + } + } +} diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 2e6a7a4f7012..54c7ce8be280 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -49,6 +49,7 @@ use crate::catalog::{ ResolvedTableReference, TableReference, }; use crate::datasource::csv::CsvFile; +use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry}; use crate::datasource::parquet::ParquetTable; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; @@ -164,6 +165,7 @@ impl ExecutionContext { aggregate_functions: HashMap::new(), config, execution_props: ExecutionProps::new(), + object_store_registry: Arc::new(ObjectStoreRegistry::new()), })), } } @@ -363,6 +365,29 @@ impl ExecutionContext { self.state.lock().unwrap().catalog_list.catalog(name) } + /// Registers a object store with scheme using a custom `ObjectStore` so that + /// an external file system or object storage system could be used against this context. + /// + /// Returns the `ObjectStore` previously registered for this scheme, if any + pub fn register_object_store( + &self, + scheme: impl Into, + object_store: Arc, + ) -> Option> { + let scheme = scheme.into(); + + self.state + .lock() + .unwrap() + .object_store_registry + .register_store(scheme, object_store) + } + + /// Retrieves a `ObjectStore` instance by scheme + pub fn object_store(&self, scheme: &str) -> Option> { + self.state.lock().unwrap().object_store_registry.get(scheme) + } + /// Registers a table using a custom `TableProvider` so that /// it can be referenced from SQL statements executed against this /// context. @@ -849,6 +874,8 @@ pub struct ExecutionContextState { pub config: ExecutionConfig, /// Execution properties pub execution_props: ExecutionProps, + /// Object Store that are registered with the context + pub object_store_registry: Arc, } impl ExecutionProps { @@ -876,6 +903,7 @@ impl ExecutionContextState { aggregate_functions: HashMap::new(), config: ExecutionConfig::new(), execution_props: ExecutionProps::new(), + object_store_registry: Arc::new(ObjectStoreRegistry::new()), } } From 9275bdf65825e294c755c42ec9ee146bf2813087 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Thu, 26 Aug 2021 23:36:34 +0800 Subject: [PATCH 02/12] add tokio fs --- datafusion/Cargo.toml | 2 +- datafusion/src/datasource/object_store/local.rs | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 86eb64cf873e..23a6d72e5713 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -58,7 +58,7 @@ chrono = "0.4" async-trait = "0.1.41" futures = "0.3" pin-project-lite= "^0.2.0" -tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } +tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"] } tokio-stream = "0.1" log = "^0.4" md-5 = { version = "^0.9.1", optional = true } diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index 53c551c0c156..865112d88634 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -17,16 +17,17 @@ //! Object store that represents the Local File System. +use std::fs::Metadata; +use std::sync::Arc; + use async_trait::async_trait; use futures::{stream, AsyncRead, StreamExt}; -use std::sync::Arc; use crate::datasource::object_store::{ FileMeta, FileMetaStream, ObjectReader, ObjectStore, }; use crate::error::DataFusionError; use crate::error::Result; -use std::fs::Metadata; #[derive(Debug)] /// Local File System as Object Store. From 81eab1ab0908b57be6896d9e05de0a4eda0386eb Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Fri, 27 Aug 2021 23:06:24 +0800 Subject: [PATCH 03/12] resolve comments --- .../src/datasource/object_store/local.rs | 15 +++++----- datafusion/src/datasource/object_store/mod.rs | 29 ++++++++++--------- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index 865112d88634..bfc601513ef3 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -28,6 +28,7 @@ use crate::datasource::object_store::{ }; use crate::error::DataFusionError; use crate::error::Result; +use crate::logical_plan::Expr; #[derive(Debug)] /// Local File System as Object Store. @@ -35,11 +36,12 @@ pub struct LocalFileSystem; #[async_trait] impl ObjectStore for LocalFileSystem { - async fn list(&self, prefix: &str) -> Result { + async fn list(&self, prefix: &str, _filters: &[Expr]) -> Result { + // TODO handle pushed down filters list_all(prefix.to_owned()).await } - async fn get_reader(&self, file: FileMeta) -> Result> { + fn get_reader(&self, file: FileMeta) -> Result> { Ok(Arc::new(LocalFileReader::new(file)?)) } } @@ -64,11 +66,8 @@ impl ObjectReader for LocalFileReader { todo!() } - async fn length(&self) -> Result { - match self.file.size { - Some(size) => Ok(size), - None => Ok(0u64), - } + fn length(&self) -> Result { + Ok(self.file.size) } } @@ -77,7 +76,7 @@ async fn list_all(prefix: String) -> Result { FileMeta { path, last_modified: metadata.modified().map(chrono::DateTime::from).ok(), - size: Some(metadata.len()), + size: metadata.len(), } } diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index 925537a05a02..c42427336e47 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -30,6 +30,7 @@ use futures::{AsyncRead, Stream}; use local::LocalFileSystem; use crate::error::{DataFusionError, Result}; +use crate::logical_plan::Expr; use chrono::Utc; /// Object Reader for one file in a object store @@ -39,7 +40,7 @@ pub trait ObjectReader { async fn get_reader(&self, start: u64, length: usize) -> Result>; /// Get length for the file asynchronously - async fn length(&self) -> Result; + fn length(&self) -> Result; } /// File meta we got from object store @@ -49,7 +50,7 @@ pub struct FileMeta { /// Last time the file was modified in UTC pub last_modified: Option>, /// File size in total - pub size: Option, + pub size: u64, } /// Stream of files get listed from object store @@ -60,11 +61,12 @@ pub type FileMetaStream = /// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes #[async_trait] pub trait ObjectStore: Sync + Send + Debug { - /// Returns all the files in path `prefix` asynchronously - async fn list(&self, prefix: &str) -> Result; + /// Returns all the files in path `prefix` asynchronously, implementations could + /// choose to use the pushed down filters for list filtering. + async fn list(&self, prefix: &str, filters: &[Expr]) -> Result; /// Get object reader for one file - async fn get_reader(&self, file: FileMeta) -> Result>; + fn get_reader(&self, file: FileMeta) -> Result>; } static LOCAL_SCHEME: &str = "file"; @@ -112,14 +114,15 @@ impl ObjectStoreRegistry { pub fn get_by_uri(&self, uri: &str) -> Result> { if let Some((scheme, _)) = uri.split_once(':') { let stores = self.object_stores.read().unwrap(); - if let Some(store) = stores.get(&*scheme.to_lowercase()) { - Ok(store.clone()) - } else { - Err(DataFusionError::Internal(format!( - "No suitable object store found for {}", - scheme - ))) - } + stores + .get(&*scheme.to_lowercase()) + .map(Clone::clone) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "No suitable object store found for {}", + scheme + )) + }) } else { Ok(Arc::new(LocalFileSystem)) } From f4ed233397210dd74b84a10721460edd4979987c Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Fri, 27 Aug 2021 23:15:27 +0800 Subject: [PATCH 04/12] fmt --- datafusion/src/datasource/object_store/local.rs | 5 ++--- datafusion/src/datasource/object_store/mod.rs | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index bfc601513ef3..c983bc39ca16 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -37,7 +37,6 @@ pub struct LocalFileSystem; #[async_trait] impl ObjectStore for LocalFileSystem { async fn list(&self, prefix: &str, _filters: &[Expr]) -> Result { - // TODO handle pushed down filters list_all(prefix.to_owned()).await } @@ -66,8 +65,8 @@ impl ObjectReader for LocalFileReader { todo!() } - fn length(&self) -> Result { - Ok(self.file.size) + fn length(&self) -> u64 { + self.file.size } } diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index c42427336e47..1ca4a58f5593 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -39,8 +39,8 @@ pub trait ObjectReader { /// Get reader for a part [start, start + length] in the file asynchronously async fn get_reader(&self, start: u64, length: usize) -> Result>; - /// Get length for the file asynchronously - fn length(&self) -> Result; + /// Get length for the file + fn length(&self) -> u64; } /// File meta we got from object store From eb6627877f4a0003a5ba05c3d173a1c8187e449e Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 28 Aug 2021 19:43:56 +0800 Subject: [PATCH 05/12] resolve comments --- .../src/datasource/object_store/local.rs | 45 +++++++++++++++++-- datafusion/src/datasource/object_store/mod.rs | 12 ++--- 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index c983bc39ca16..cc63f1f21403 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -28,7 +28,6 @@ use crate::datasource::object_store::{ }; use crate::error::DataFusionError; use crate::error::Result; -use crate::logical_plan::Expr; #[derive(Debug)] /// Local File System as Object Store. @@ -36,11 +35,11 @@ pub struct LocalFileSystem; #[async_trait] impl ObjectStore for LocalFileSystem { - async fn list(&self, prefix: &str, _filters: &[Expr]) -> Result { + async fn list(&self, prefix: &str) -> Result { list_all(prefix.to_owned()).await } - fn get_reader(&self, file: FileMeta) -> Result> { + fn file_reader(&self, file: FileMeta) -> Result> { Ok(Arc::new(LocalFileReader::new(file)?)) } } @@ -57,7 +56,7 @@ impl LocalFileReader { #[async_trait] impl ObjectReader for LocalFileReader { - async fn get_reader( + async fn chunk_reader( &self, _start: u64, _length: usize, @@ -125,3 +124,41 @@ async fn list_all(prefix: String) -> Result { Ok(Box::pin(result)) } } + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + use std::collections::HashSet; + use std::fs::create_dir; + use std::fs::File; + use tempfile::tempdir; + + #[tokio::test] + async fn test_recursive_listing() -> Result<()> { + let tmp = tempdir()?; + create_dir(tmp.path().join("x"))?; + create_dir(tmp.path().join("y"))?; + let a_path = tmp.path().join("a.txt"); + let b_path = tmp.path().join("x/b.txt"); + let c_path = tmp.path().join("y/c.txt"); + File::create(&a_path)?; + File::create(&b_path)?; + File::create(&c_path)?; + + let mut all_files = HashSet::new(); + let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?; + while let Some(file) = files.next().await { + let file = file?; + assert_eq!(file.size, 0); + all_files.insert(file.path); + } + + assert_eq!(all_files.len(), 3); + assert!(all_files.contains(a_path.to_str().unwrap())); + assert!(all_files.contains(b_path.to_str().unwrap())); + assert!(all_files.contains(c_path.to_str().unwrap())); + + Ok(()) + } +} diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index 1ca4a58f5593..5a2186a307c4 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -30,20 +30,21 @@ use futures::{AsyncRead, Stream}; use local::LocalFileSystem; use crate::error::{DataFusionError, Result}; -use crate::logical_plan::Expr; use chrono::Utc; /// Object Reader for one file in a object store #[async_trait] pub trait ObjectReader { /// Get reader for a part [start, start + length] in the file asynchronously - async fn get_reader(&self, start: u64, length: usize) -> Result>; + async fn chunk_reader(&self, start: u64, length: usize) + -> Result>; /// Get length for the file fn length(&self) -> u64; } /// File meta we got from object store +#[derive(Debug)] pub struct FileMeta { /// Path of the file pub path: String, @@ -61,12 +62,11 @@ pub type FileMetaStream = /// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes #[async_trait] pub trait ObjectStore: Sync + Send + Debug { - /// Returns all the files in path `prefix` asynchronously, implementations could - /// choose to use the pushed down filters for list filtering. - async fn list(&self, prefix: &str, filters: &[Expr]) -> Result; + /// Returns all the files in path `prefix` asynchronously. + async fn list(&self, prefix: &str) -> Result; /// Get object reader for one file - fn get_reader(&self, file: FileMeta) -> Result>; + fn file_reader(&self, file: FileMeta) -> Result>; } static LOCAL_SCHEME: &str = "file"; From e61b1359cfe4bad2adcb1e953c5aee86ac08a339 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 28 Aug 2021 20:36:46 +0800 Subject: [PATCH 06/12] fix test --- datafusion/src/datasource/object_store/local.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index cc63f1f21403..d7b6c78e3ca5 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -136,12 +136,18 @@ mod tests { #[tokio::test] async fn test_recursive_listing() -> Result<()> { + // tmp/a.txt + // tmp/x/b.txt + // tmp/y/c.txt let tmp = tempdir()?; - create_dir(tmp.path().join("x"))?; - create_dir(tmp.path().join("y"))?; + let x_path = tmp.path().join("x"); + let y_path = tmp.path().join("y"); + create_dir(&x_path)?; + create_dir(&y_path)?; + let a_path = tmp.path().join("a.txt"); - let b_path = tmp.path().join("x/b.txt"); - let c_path = tmp.path().join("y/c.txt"); + let b_path = x_path.join("b.txt"); + let c_path = y_path.join("c.txt"); File::create(&a_path)?; File::create(&b_path)?; File::create(&c_path)?; From 9f1c23daff6e72161a61a6b92b56128f3ccda727 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 28 Aug 2021 21:37:08 +0800 Subject: [PATCH 07/12] rerun --- datafusion/src/datasource/object_store/local.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index d7b6c78e3ca5..744733897d29 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -142,12 +142,11 @@ mod tests { let tmp = tempdir()?; let x_path = tmp.path().join("x"); let y_path = tmp.path().join("y"); - create_dir(&x_path)?; - create_dir(&y_path)?; - let a_path = tmp.path().join("a.txt"); let b_path = x_path.join("b.txt"); let c_path = y_path.join("c.txt"); + create_dir(&x_path)?; + create_dir(&y_path)?; File::create(&a_path)?; File::create(&b_path)?; File::create(&c_path)?; From 40ae475945d5f20f218b12fccba15d89af0b0f9a Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sun, 29 Aug 2021 18:28:11 +0800 Subject: [PATCH 08/12] file_reader async --- datafusion/src/datasource/object_store/local.rs | 2 +- datafusion/src/datasource/object_store/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index 744733897d29..19a12ec4df6d 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -39,7 +39,7 @@ impl ObjectStore for LocalFileSystem { list_all(prefix.to_owned()).await } - fn file_reader(&self, file: FileMeta) -> Result> { + async fn file_reader(&self, file: FileMeta) -> Result> { Ok(Arc::new(LocalFileReader::new(file)?)) } } diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index 5a2186a307c4..fe64cda51aef 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -66,7 +66,7 @@ pub trait ObjectStore: Sync + Send + Debug { async fn list(&self, prefix: &str) -> Result; /// Get object reader for one file - fn file_reader(&self, file: FileMeta) -> Result>; + async fn file_reader(&self, file: FileMeta) -> Result>; } static LOCAL_SCHEME: &str = "file"; From ed840f4b228a5dc0dc083898ae5b5b5287953d9a Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 30 Aug 2021 01:28:55 +0800 Subject: [PATCH 09/12] add delimiter option in list --- datafusion/src/datasource/object_store/local.rs | 6 +++++- datafusion/src/datasource/object_store/mod.rs | 9 +++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index 19a12ec4df6d..8a0182f0043f 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -35,7 +35,11 @@ pub struct LocalFileSystem; #[async_trait] impl ObjectStore for LocalFileSystem { - async fn list(&self, prefix: &str) -> Result { + async fn list( + &self, + prefix: &str, + _delimiter: Option, + ) -> Result { list_all(prefix.to_owned()).await } diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index fe64cda51aef..bbcdfe50eb1b 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -62,8 +62,13 @@ pub type FileMetaStream = /// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes #[async_trait] pub trait ObjectStore: Sync + Send + Debug { - /// Returns all the files in path `prefix` asynchronously. - async fn list(&self, prefix: &str) -> Result; + /// Returns all the files in path `prefix`, or all paths between + /// the `prefix` and the first occurrence of the delimiter if it is provided. + async fn list( + &self, + prefix: &str, + delimiter: Option, + ) -> Result; /// Get object reader for one file async fn file_reader(&self, file: FileMeta) -> Result>; From c1aabf44eab36984fadad1b3b0c840be2f7278a3 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 30 Aug 2021 21:29:57 +0800 Subject: [PATCH 10/12] fix fmt --- ballista/rust/core/src/serde/physical_plan/from_proto.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 33372883587b..6aa0fa111921 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -34,8 +34,8 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion::catalog::catalog::{ CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider, }; -use datafusion::datasource::object_store::ObjectStoreRegistry; use datafusion::datasource::datasource::Statistics; +use datafusion::datasource::object_store::ObjectStoreRegistry; use datafusion::datasource::FilePartition; use datafusion::execution::context::{ ExecutionConfig, ExecutionContextState, ExecutionProps, From 46f54be3f5cf9f544affc68c3aef9d142006a1b8 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 30 Aug 2021 22:08:03 +0800 Subject: [PATCH 11/12] file_reader to sync --- datafusion/src/datasource/object_store/local.rs | 2 +- datafusion/src/datasource/object_store/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index 8a0182f0043f..9791832d9fac 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -43,7 +43,7 @@ impl ObjectStore for LocalFileSystem { list_all(prefix.to_owned()).await } - async fn file_reader(&self, file: FileMeta) -> Result> { + fn file_reader(&self, file: FileMeta) -> Result> { Ok(Arc::new(LocalFileReader::new(file)?)) } } diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index bbcdfe50eb1b..cca80329ef46 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -71,7 +71,7 @@ pub trait ObjectStore: Sync + Send + Debug { ) -> Result; /// Get object reader for one file - async fn file_reader(&self, file: FileMeta) -> Result>; + fn file_reader(&self, file: FileMeta) -> Result>; } static LOCAL_SCHEME: &str = "file"; From 694ae9cb1707e661bd34cd3b68177a9ee6bb7674 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 31 Aug 2021 12:02:45 +0800 Subject: [PATCH 12/12] An optional list_dir api --- .../src/datasource/object_store/local.rs | 14 +++++++---- datafusion/src/datasource/object_store/mod.rs | 24 +++++++++++++++---- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index 9791832d9fac..2b27f6c8f993 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -24,7 +24,7 @@ use async_trait::async_trait; use futures::{stream, AsyncRead, StreamExt}; use crate::datasource::object_store::{ - FileMeta, FileMetaStream, ObjectReader, ObjectStore, + FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, }; use crate::error::DataFusionError; use crate::error::Result; @@ -35,12 +35,16 @@ pub struct LocalFileSystem; #[async_trait] impl ObjectStore for LocalFileSystem { - async fn list( + async fn list_file(&self, prefix: &str) -> Result { + list_all(prefix.to_owned()).await + } + + async fn list_dir( &self, - prefix: &str, + _prefix: &str, _delimiter: Option, - ) -> Result { - list_all(prefix.to_owned()).await + ) -> Result { + todo!() } fn file_reader(&self, file: FileMeta) -> Result> { diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index cca80329ef46..fd25fd43a2e7 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -43,6 +43,15 @@ pub trait ObjectReader { fn length(&self) -> u64; } +/// Represents a file or a prefix that may require further resolution +#[derive(Debug)] +pub enum ListEntry { + /// File metadata + FileMeta(FileMeta), + /// Prefix to be further resolved during partition discovery + Prefix(String), +} + /// File meta we got from object store #[derive(Debug)] pub struct FileMeta { @@ -58,17 +67,24 @@ pub struct FileMeta { pub type FileMetaStream = Pin> + Send + Sync + 'static>>; +/// Stream of list entries get from object store +pub type ListEntryStream = + Pin> + Send + Sync + 'static>>; + /// A ObjectStore abstracts access to an underlying file/object storage. /// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes #[async_trait] pub trait ObjectStore: Sync + Send + Debug { - /// Returns all the files in path `prefix`, or all paths between - /// the `prefix` and the first occurrence of the delimiter if it is provided. - async fn list( + /// Returns all the files in path `prefix` + async fn list_file(&self, prefix: &str) -> Result; + + /// Returns all the files in `prefix` if the `prefix` is already a leaf dir, + /// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided. + async fn list_dir( &self, prefix: &str, delimiter: Option, - ) -> Result; + ) -> Result; /// Get object reader for one file fn file_reader(&self, file: FileMeta) -> Result>;