From 9401d6d80bfdd4e02869f7722efdd02a733de272 Mon Sep 17 00:00:00 2001
From: yahoNanJing <90197956+yahoNanJing@users.noreply.github.com>
Date: Thu, 14 Jul 2022 20:51:46 +0800
Subject: [PATCH] Remove datafusion-data-access crate (#2904)

Co-authored-by: yangzhong
---
 Cargo.toml                                    |   1 -
 datafusion/data-access/Cargo.toml             |  41 --
 datafusion/data-access/README.md              |  28 --
 datafusion/data-access/src/lib.rs             |  77 ----
 .../data-access/src/object_store/local.rs     | 368 ------------------
 .../data-access/src/object_store/mod.rs       |  87 -----
 6 files changed, 602 deletions(-)
 delete mode 100644 datafusion/data-access/Cargo.toml
 delete mode 100644 datafusion/data-access/README.md
 delete mode 100644 datafusion/data-access/src/lib.rs
 delete mode 100644 datafusion/data-access/src/object_store/local.rs
 delete mode 100644 datafusion/data-access/src/object_store/mod.rs

diff --git a/Cargo.toml b/Cargo.toml
index 7a349735b64c..ab3f427e49be 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,7 +19,6 @@ members = [
     "datafusion/common",
     "datafusion/core",
-    "datafusion/data-access",
     "datafusion/expr",
     "datafusion/jit",
     "datafusion/optimizer",
diff --git a/datafusion/data-access/Cargo.toml b/datafusion/data-access/Cargo.toml
deleted file mode 100644
index a0ffcb7434df..000000000000
--- a/datafusion/data-access/Cargo.toml
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[package]
-name = "datafusion-data-access"
-description = "General data access layer currently mainly based on the object store interfaces"
-version = "10.0.0"
-homepage = "https://github.com/apache/arrow-datafusion"
-repository = "https://github.com/apache/arrow-datafusion"
-readme = "README.md"
-authors = ["Apache Arrow <dev@arrow.apache.org>"]
-license = "Apache-2.0"
-keywords = ["arrow", "query", "sql"]
-edition = "2021"
-rust-version = "1.59"
-
-[lib]
-name = "datafusion_data_access"
-path = "src/lib.rs"
-
-[dependencies]
-async-trait = "0.1.41"
-chrono = { version = "0.4", default-features = false, features = ["std"] }
-futures = "0.3"
-parking_lot = "0.12"
-tempfile = "3"
-tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
diff --git a/datafusion/data-access/README.md b/datafusion/data-access/README.md
deleted file mode 100644
index 526603f69c07..000000000000
--- a/datafusion/data-access/README.md
+++ /dev/null
@@ -1,28 +0,0 @@
-<!---
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements. See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership. The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License. You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied. See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-
-# DataFusion Data Access Layer
-
-[DataFusion](df) is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.
-
-This crate is a submodule of DataFusion that provides an `async` API for accessing data, either remotely or locally.
-Currently, it is based on the object store interfaces. In the future, this module may include interfaces for accessing
-databases, or streaming data.
-
-[df]: https://crates.io/crates/datafusion
diff --git a/datafusion/data-access/src/lib.rs b/datafusion/data-access/src/lib.rs
deleted file mode 100644
index 6e47be8aefb5..000000000000
--- a/datafusion/data-access/src/lib.rs
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod object_store;
-
-use chrono::{DateTime, Utc};
-use std::{io, result};
-
-/// Result type for operations that could result in an io error
-pub type Result<T> = result::Result<T, io::Error>;
-
-/// Represents a specific file or a prefix (folder) that may
-/// require further resolution
-#[derive(Debug)]
-pub enum ListEntry {
-    /// Specific file with metadata
-    FileMeta(FileMeta),
-    /// Prefix to be further resolved during partition discovery
-    Prefix(String),
-}
-
-/// The path and size of the file.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct SizedFile {
-    /// Path of the file. It is relative to the current object
-    /// store (it does not specify the `xx://` scheme).
-    pub path: String,
-    /// File size in total
-    pub size: u64,
-}
-
-/// Description of a file as returned by the listing command of a
-/// given object store. The resulting path is relative to the
-/// object store that generated it.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct FileMeta {
-    /// The path and size of the file.
-    pub sized_file: SizedFile,
-    /// The last modification time of the file according to the
-    /// object store metadata. This information might be used by
-    /// catalog systems like Delta Lake for time travel (see
-    /// <https://github.com/delta-io/delta/issues/192>)
-    pub last_modified: Option<DateTime<Utc>>,
-}
-
-impl FileMeta {
-    /// The path that describes this file. It is relative to the
-    /// associated object store.
-    pub fn path(&self) -> &str {
-        &self.sized_file.path
-    }
-
-    /// The size of the file.
-    pub fn size(&self) -> u64 {
-        self.sized_file.size
-    }
-}
-
-impl std::fmt::Display for FileMeta {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(f, "{} (size: {})", self.path(), self.size())
-    }
-}
diff --git a/datafusion/data-access/src/object_store/local.rs b/datafusion/data-access/src/object_store/local.rs
deleted file mode 100644
index 4134aa81ffc0..000000000000
--- a/datafusion/data-access/src/object_store/local.rs
+++ /dev/null
@@ -1,368 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Object store that represents the Local File System.
-
-use std::fs::{self, File, Metadata};
-use std::io;
-use std::io::{BufReader, Read, Seek, SeekFrom};
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use futures::{stream, AsyncRead, StreamExt, TryStreamExt};
-
-use crate::{FileMeta, ListEntry, Result, SizedFile};
-
-use super::{
-    FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderStream, ObjectStore,
-};
-
-pub static LOCAL_SCHEME: &str = "file";
-
-#[derive(Debug)]
-/// Local File System as Object Store.
-pub struct LocalFileSystem;
-
-#[async_trait]
-impl ObjectStore for LocalFileSystem {
-    async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
-        let prefix = if let Some((_scheme, path)) = prefix.split_once("://") {
-            path
-        } else {
-            prefix
-        };
-        list_all(prefix.to_owned()).await
-    }
-
-    async fn list_dir(
-        &self,
-        prefix: &str,
-        delimiter: Option<String>,
-    ) -> Result<ListEntryStream> {
-        if let Some(d) = delimiter {
-            if d != "/" && d != "\\" {
-                return Err(std::io::Error::new(
-                    std::io::ErrorKind::InvalidInput,
-                    format!("delimiter not supported on local filesystem: {}", d),
-                ));
-            }
-            let mut entry_stream = tokio::fs::read_dir(prefix).await?;
-
-            let list_entries = stream::poll_fn(move |cx| {
-                entry_stream.poll_next_entry(cx).map(|res| match res {
-                    Ok(Some(x)) => Some(Ok(x)),
-                    Ok(None) => None,
-                    Err(err) => Some(Err(err)),
-                })
-            })
-            .then(|entry| async {
-                let entry = entry?;
-                let entry = if entry.file_type().await?.is_dir() {
-                    ListEntry::Prefix(path_as_str(&entry.path())?.to_string())
-                } else {
-                    ListEntry::FileMeta(get_meta(
-                        path_as_str(&entry.path())?.to_string(),
-                        entry.metadata().await?,
-                    ))
-                };
-                Ok(entry)
-            });
-
-            Ok(Box::pin(list_entries))
-        } else {
-            Ok(Box::pin(
-                self.list_file(prefix).await?.map_ok(ListEntry::FileMeta),
-            ))
-        }
-    }
-
-    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
-        Ok(Arc::new(LocalFileReader::new(file)?))
-    }
-}
-
-/// Try to convert a PathBuf reference into a &str
-pub fn path_as_str(path: &std::path::Path) -> Result<&str> {
-    path.to_str().ok_or_else(|| {
-        io::Error::new(
-            io::ErrorKind::InvalidInput,
-            format!("Invalid path '{}'", path.display()),
-        )
-    })
-}
-
-struct LocalFileReader {
-    file: SizedFile,
-}
-
-impl LocalFileReader {
-    fn new(file: SizedFile) -> Result<Self> {
-        Ok(Self { file })
-    }
-}
-
-#[async_trait]
-impl ObjectReader for LocalFileReader {
-    async fn chunk_reader(
-        &self,
-        _start: u64,
-        _length: usize,
-    ) -> Result<Box<dyn AsyncRead>> {
-        todo!(
-            "implement once async file readers are available (arrow-rs#78, arrow-rs#111)"
-        )
-    }
-
-    fn sync_chunk_reader(
-        &self,
-        start: u64,
-        length: usize,
-    ) -> Result<Box<dyn Read + Send + Sync>> {
-        // A new file descriptor is opened for each chunk reader.
-        // This okay because chunks are usually fairly large.
-        let mut file = File::open(&self.file.path)?;
-        file.seek(SeekFrom::Start(start))?;
-
-        let file = BufReader::new(file.take(length as u64));
-
-        Ok(Box::new(file))
-    }
-
-    fn length(&self) -> u64 {
-        self.file.size
-    }
-}
-
-fn get_meta(path: String, metadata: Metadata) -> FileMeta {
-    FileMeta {
-        sized_file: SizedFile {
-            path,
-            size: metadata.len(),
-        },
-        last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
-    }
-}
-
-async fn list_all(prefix: String) -> Result<FileMetaStream> {
-    async fn find_files_in_dir(
-        path: String,
-        to_visit: &mut Vec<String>,
-    ) -> Result<Vec<FileMeta>> {
-        let mut dir = tokio::fs::read_dir(path).await?;
-        let mut files = Vec::new();
-
-        while let Some(child) = dir.next_entry().await? {
-            let child_path = path_as_str(&child.path())?.to_string();
-            let metadata = child.metadata().await?;
-            if metadata.is_dir() {
-                to_visit.push(child_path.to_string());
-            } else {
-                files.push(get_meta(child_path.to_owned(), metadata))
-            }
-        }
-        files.sort_by(|a, b| a.path().cmp(b.path()));
-        Ok(files)
-    }
-
-    let prefix_meta = tokio::fs::metadata(&prefix).await?;
-    let prefix = prefix.to_owned();
-    if prefix_meta.is_file() {
-        Ok(Box::pin(stream::once(async move {
-            Ok(get_meta(prefix, prefix_meta))
-        })))
-    } else {
-        let result = stream::unfold(vec![prefix], move |mut to_visit| async move {
-            match to_visit.pop() {
-                None => None,
-                Some(path) => {
-                    let file_stream = match find_files_in_dir(path, &mut to_visit).await {
-                        Ok(files) => stream::iter(files).map(Ok).left_stream(),
-                        Err(e) => stream::once(async { Err(e) }).right_stream(),
-                    };
-
-                    Some((file_stream, to_visit))
-                }
-            }
-        })
-        .flatten();
-        Ok(Box::pin(result))
-    }
-}
-
-/// Create a stream of `ObjectReader` by converting each file in the `files` vector
-/// into instances of `LocalFileReader`
-pub fn local_object_reader_stream(files: Vec<String>) -> ObjectReaderStream {
-    Box::pin(futures::stream::iter(files).map(|f| Ok(local_object_reader(f))))
-}
-
-/// Helper method to convert a file location to a `LocalFileReader`
-pub fn local_object_reader(file: String) -> Arc<dyn ObjectReader> {
-    LocalFileSystem
-        .file_reader(local_unpartitioned_file(file).sized_file)
-        .expect("File not found")
-}
-
-/// Helper method to fetch the file size and date at given path and create a `FileMeta`
-pub fn local_unpartitioned_file(file: String) -> FileMeta {
-    let metadata = fs::metadata(&file).expect("Local file metadata");
-    FileMeta {
-        sized_file: SizedFile {
-            size: metadata.len(),
-            path: file,
-        },
-        last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use crate::ListEntry;
-
-    use super::*;
-    use futures::StreamExt;
-    use std::collections::HashSet;
-    use std::fs::create_dir;
-    use std::fs::File;
-    use tempfile::tempdir;
-
-    #[tokio::test]
-    async fn test_recursive_listing() -> Result<()> {
-        // tmp/a.txt
-        // tmp/x/b.txt
-        // tmp/y/c.txt
-        let tmp = tempdir()?;
-        let x_path = tmp.path().join("x");
-        let y_path = tmp.path().join("y");
-        let a_path = tmp.path().join("a.txt");
-        let b_path = x_path.join("b.txt");
-        let c_path = y_path.join("c.txt");
-        create_dir(&x_path)?;
-        create_dir(&y_path)?;
-        File::create(&a_path)?;
-        File::create(&b_path)?;
-        File::create(&c_path)?;
-
-        let mut all_files = HashSet::new();
-        let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?;
-        while let Some(file) = files.next().await {
-            let file = file?;
-            assert_eq!(file.size(), 0);
-            all_files.insert(file.path().to_owned());
-        }
-
-        assert_eq!(all_files.len(), 3);
-        assert!(all_files.contains(a_path.to_str().unwrap()));
-        assert!(all_files.contains(b_path.to_str().unwrap()));
-        assert!(all_files.contains(c_path.to_str().unwrap()));
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_list_dir() -> Result<()> {
-        // tmp/a.txt
-        // tmp/x/b.txt
-        let tmp = tempdir()?;
-        let x_path = tmp.path().join("x");
-        let a_path = tmp.path().join("a.txt");
-        let b_path = x_path.join("b.txt");
-        create_dir(&x_path)?;
-        File::create(&a_path)?;
-        File::create(&b_path)?;
-
-        fn get_path(entry: ListEntry) -> String {
-            match entry {
-                ListEntry::FileMeta(f) => f.sized_file.path,
-                ListEntry::Prefix(path) => path,
-            }
-        }
-
-        async fn assert_equal_paths(
-            expected: Vec<&std::path::PathBuf>,
-            actual: ListEntryStream,
-        ) -> Result<()> {
-            let expected: HashSet<String> = expected
-                .iter()
-                .map(|x| x.to_str().unwrap().to_string())
-                .collect();
-            let actual: HashSet<String> = actual.map_ok(get_path).try_collect().await?;
-            assert_eq!(expected, actual);
-            Ok(())
-        }
-
-        // Providing no delimiter means recursive file listing
-        let files = LocalFileSystem
-            .list_dir(tmp.path().to_str().unwrap(), None)
-            .await?;
-        assert_equal_paths(vec![&a_path, &b_path], files).await?;
-
-        // Providing slash as delimiter means list immediate files and directories
-        let files = LocalFileSystem
-            .list_dir(tmp.path().to_str().unwrap(), Some("/".to_string()))
-            .await?;
-        assert_equal_paths(vec![&a_path, &x_path], files).await?;
-
-        Ok(())
-    }
-    #[tokio::test]
-    async fn test_list_all_sort_by_filename() -> Result<()> {
-        // tmp/file_23590.parquet
-        // tmp/file_13690.parquet
-        // tmp/file_12590.parquet
-        // tmp/file_03590.parquet
-        let tmp = tempdir()?;
-        let a_path = tmp.path().join("file_23590.parquet");
-        let b_path = tmp.path().join("file_13690.parquet");
-        let c_path = tmp.path().join("file_12590.parquet");
-        let d_path = tmp.path().join("file_03590.parquet");
-        File::create(&a_path)?;
-        File::create(&b_path)?;
-        File::create(&c_path)?;
-        File::create(&d_path)?;
-
-        let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?;
-        let mut list_files_name = Vec::new();
-        while let Some(file) = files.next().await {
-            let file = file?;
-            list_files_name.push(file.path().to_owned());
-        }
-        let sort_files_name = [
-            tmp.path()
-                .join("file_03590.parquet")
-                .to_str()
-                .unwrap()
-                .to_string(),
-            tmp.path()
-                .join("file_12590.parquet")
-                .to_str()
-                .unwrap()
-                .to_string(),
-            tmp.path()
-                .join("file_13690.parquet")
-                .to_str()
-                .unwrap()
-                .to_string(),
-            tmp.path()
-                .join("file_23590.parquet")
-                .to_str()
-                .unwrap()
-                .to_string(),
-        ];
-        assert_eq!(list_files_name, sort_files_name);
-        Ok(())
-    }
-}
diff --git a/datafusion/data-access/src/object_store/mod.rs b/datafusion/data-access/src/object_store/mod.rs
deleted file mode 100644
index 496a5494fa01..000000000000
--- a/datafusion/data-access/src/object_store/mod.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Object Store abstracts access to an underlying file/object storage.
-
-pub mod local;
-
-use std::fmt::Debug;
-use std::io::Read;
-use std::pin::Pin;
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use futures::{AsyncRead, Stream};
-
-use crate::{FileMeta, ListEntry, Result, SizedFile};
-
-/// Stream of files listed from object store
-pub type FileMetaStream =
-    Pin<Box<dyn Stream<Item = Result<FileMeta>> + Send + Sync + 'static>>;
-
-/// Stream of list entries obtained from object store
-pub type ListEntryStream =
-    Pin<Box<dyn Stream<Item = Result<ListEntry>> + Send + Sync + 'static>>;
-
-/// Stream readers opened on a given object store
-pub type ObjectReaderStream =
-    Pin<Box<dyn Stream<Item = Result<Arc<dyn ObjectReader>>> + Send + Sync>>;
-
-/// Object Reader for one file in an object store.
-///
-/// Note that the dynamic dispatch on the reader might
-/// have some performance impacts.
-#[async_trait]
-pub trait ObjectReader: Send + Sync {
-    /// Get reader for a part [start, start + length] in the file asynchronously
-    async fn chunk_reader(&self, start: u64, length: usize)
-        -> Result<Box<dyn AsyncRead>>;
-
-    /// Get reader for a part [start, start + length] in the file
-    fn sync_chunk_reader(
-        &self,
-        start: u64,
-        length: usize,
-    ) -> Result<Box<dyn Read + Send + Sync>>;
-
-    /// Get reader for the entire file
-    fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
-        self.sync_chunk_reader(0, self.length() as usize)
-    }
-
-    /// Get the size of the file
-    fn length(&self) -> u64;
-}
-
-/// A ObjectStore abstracts access to an underlying file/object storage.
-/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
-#[async_trait]
-pub trait ObjectStore: Sync + Send + Debug {
-    /// Returns all the files in path `prefix`
-    async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>;
-
-    /// Returns all the files in `prefix` if the `prefix` is already a leaf dir,
-    /// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided.
-    async fn list_dir(
-        &self,
-        prefix: &str,
-        delimiter: Option<String>,
-    ) -> Result<ListEntryStream>;
-
-    /// Get object reader for one file
-    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
-}
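
For anyone migrating call sites away from this crate, the sketch below shows roughly how the deleted API was typically driven. It is an illustrative reconstruction pieced together from the removed ObjectStore/ObjectReader traits and the LocalFileSystem implementation above, not code taken from the repository: the "/tmp/data" prefix and the main() scaffolding are assumptions, and it only builds against a checkout from before this patch, with datafusion-data-access, futures, and tokio still available as dependencies.

    // Hypothetical usage sketch of the removed API; only valid before this patch.
    use std::io::Read;

    use datafusion_data_access::object_store::local::LocalFileSystem;
    use datafusion_data_access::object_store::ObjectStore;
    use futures::TryStreamExt;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let store = LocalFileSystem;

        // Recursively list everything under an assumed local prefix; each item
        // is a FileMeta carrying the path, size, and optional modification time.
        let files: Vec<_> = store.list_file("/tmp/data").await?.try_collect().await?;

        for meta in files {
            // Open a synchronous reader over the whole file via its SizedFile handle.
            let reader = store.file_reader(meta.sized_file.clone())?;
            let mut bytes = Vec::new();
            reader.sync_reader()?.read_to_end(&mut bytes)?;
            println!("{} -> read {} bytes", meta, bytes.len());
        }
        Ok(())
    }

The listing half of this pattern is what the tests above exercise through list_all and list_dir; the reader half corresponds to the local_object_reader helpers.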