diff --git a/python/Cargo.toml b/python/Cargo.toml index 8a7d9944bf..240cd35d72 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -23,6 +23,7 @@ env_logger = "0" futures = "0.3" lazy_static = "1" regex = "1" +serde = "1" serde_json = "1" tokio = { version = "1", features = ["rt-multi-thread"] } diff --git a/python/deltalake/fs.py b/python/deltalake/fs.py index 1935c1b21b..efb92fe239 100644 --- a/python/deltalake/fs.py +++ b/python/deltalake/fs.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Optional +from typing import Dict, List, Optional import pyarrow as pa from pyarrow.fs import FileInfo, FileSelector, FileSystemHandler @@ -6,153 +6,33 @@ from ._internal import DeltaFileSystemHandler -class DeltaStorageHandler(FileSystemHandler): +# NOTE we need to inherit form FileSystemHandler to pass pyarrow's internal type checks. +class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler): """ - DeltaStorageHander is a concrete implementations of a PyArrow FileSystemHandler. + DeltaStorageHandler is a concrete implementations of a PyArrow FileSystemHandler. """ - def __init__( - self, - table_uri: str, - options: Optional[Dict[str, str]] = None, - backend: Optional[Any] = None, - ) -> None: - self._storage = backend or DeltaFileSystemHandler(table_uri, options) - - def __eq__(self, other: Any) -> bool: - return NotImplemented - - def __ne__(self, other: Any) -> bool: - return NotImplemented - - def get_type_name(self) -> str: - """ - The filesystem’s type name. - - :return: The filesystem’s type name. - """ - return self._storage.get_type_name() - - def normalize_path(self, path: str) -> str: - """ - Normalize filesystem path. - - :param path: the path to normalize - :return: the normalized path + def open_input_file(self, path: str) -> pa.PythonFile: """ - return self._storage.normalize_path(path) - - def get_file_info(self, paths: List[str]) -> List[FileInfo]: - """ - Get info for the given files. 
- - :param paths: List of file paths - :return: list of file info objects - """ - return self._storage.get_file_info(paths) - - def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]: - """ - Get info for the files defined by FileSelector. - - :param selector: FileSelector object - :return: list of file info objects - """ - return self._storage.get_file_info_selector( - selector.base_dir, selector.allow_not_found, selector.recursive - ) - - def create_dir(self, path: str, recursive: bool = True) -> None: - """ - Create a directory and subdirectories. - - This function succeeds if the directory already exists. - - :param path: The path of the new directory. - :param recursive: Create nested directories as well. - """ - self._storage.create_dir(path, recursive) - - def delete_dir(self, path: str) -> None: - """ - Delete a directory and its contents, recursively. - - :param path: The path of the directory to be deleted. - """ - self._storage.delete_dir(path) - - def delete_dir_contents(self, path: str) -> None: - """ - Delete a directory’s contents, recursively. - - Like delete_dir, but doesn’t delete the directory itself. - - :param path: The path of the directory to be deleted. - """ - self._storage.delete_dir_contents(path) - - def delete_root_dir_contents(self) -> None: - """ - Delete a directory's contents, recursively. - - Like delete_dir_contents, but for the root directory (path is empty or “/”) - """ - self._storage.delete_root_dir_contents() - - def delete_file(self, path: str) -> None: - """ - Delete a file. - - :param path: The path of the file to be deleted. - """ - self._storage.delete_file(path) - - def move(self, src: str, dest: str) -> None: - """ - Move / rename a file or directory. - - If the destination exists: - if it is a non-empty directory, an error is returned - otherwise, - if it has the same type as the source, it is replaced - otherwise, - behavior is unspecified (implementation-dependent). 
- - :param src: The path of the file or the directory to be moved. - :param dest: The destination path where the file or directory is moved to. - """ - self._storage.move_file(src, dest) - - def copy_file(self, src: str, dest: str) -> None: - """ - Copy a file. - - If the destination exists and is a directory, an error is returned. - Otherwise, it is replaced. - - :param src: The path of the file to be copied from. - :param dest: The destination path where the file is copied to. - """ - self._storage.copy_file(src, dest) - - def open_input_stream(self, path: str) -> pa.NativeFile: - """ - Open an input stream for sequential reading. + Open an input file for random access reading. :param source: The source to open for reading. :return: NativeFile """ - return pa.PythonFile(self._storage.open_input_file(path)) + return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path)) - def open_input_file(self, path: str) -> pa.NativeFile: + def open_input_stream(self, path: str) -> pa.PythonFile: """ - Open an input file for random access reading. + Open an input stream for sequential reading. :param source: The source to open for reading. :return: NativeFile """ - return pa.PythonFile(self._storage.open_input_file(path)) + return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path)) def open_output_stream( - self, path: str, metadata: Optional[Dict[str, Any]] = None - ) -> pa.NativeFile: + self, path: str, metadata: Optional[Dict[str, str]] = None + ) -> pa.PythonFile: """ Open an output stream for sequential writing. @@ -162,18 +42,17 @@ def open_output_stream( :param metadata: If not None, a mapping of string keys to string values. 
:return: NativeFile """ - return pa.PythonFile(self._storage.open_output_stream(path, metadata)) + return pa.PythonFile( + DeltaFileSystemHandler.open_output_stream(self, path, metadata) + ) - def open_append_stream( - self, path: str, metadata: Optional[Dict[str, Any]] = None - ) -> pa.NativeFile: + def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]: # type: ignore """ - DEPRECATED: Open an output stream for appending. - - If the target doesn’t exist, a new empty file is created. + Get info for the files defined by FileSelector. - :param path: The source to open for writing. - :param metadata: If not None, a mapping of string keys to string values. - :return: NativeFile + :param selector: FileSelector object + :return: list of file info objects """ - raise NotImplementedError + return DeltaFileSystemHandler.get_file_info_selector( + self, selector.base_dir, selector.allow_not_found, selector.recursive + ) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index eedf3f0825..3de557a9e6 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -354,7 +354,6 @@ def to_pyarrow_dataset( DeltaStorageHandler( self._table.table_uri(), self._storage_options, - self._table.get_py_storage_backend(), ) ) diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 259b7a98d1..f9eb817586 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -9,14 +9,22 @@ use deltalake::DeltaTableBuilder; use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError}; use pyo3::prelude::*; use pyo3::types::{IntoPyDict, PyBytes}; +use serde::{Deserialize, Serialize}; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::runtime::Runtime; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct FsConfig { + pub(crate) root_url: String, + pub(crate) options: HashMap<String, String>, +} + #[pyclass(subclass)] #[derive(Debug, Clone)] pub struct DeltaFileSystemHandler { pub(crate) inner: Arc<DeltaObjectStore>, pub(crate) rt: Arc<Runtime>, + 
pub(crate) config: FsConfig, } #[pymethods] @@ -25,12 +33,16 @@ impl DeltaFileSystemHandler { #[new] #[args(options = "None")] fn new(table_uri: &str, options: Option<HashMap<String, String>>) -> PyResult<Self> { let storage = DeltaTableBuilder::from_uri(table_uri) - .with_storage_options(options.unwrap_or_default()) + .with_storage_options(options.clone().unwrap_or_default()) .build_storage() .map_err(PyDeltaTableError::from_raw)?; Ok(Self { inner: storage, rt: Arc::new(rt()?), + config: FsConfig { + root_url: table_uri.into(), + options: options.unwrap_or_default(), + }, }) } @@ -242,6 +254,13 @@ impl DeltaFileSystemHandler { .map_err(PyDeltaTableError::from_object_store)?; Ok(file) } + + pub fn __getnewargs__(&self) -> PyResult<(String, Option<HashMap<String, String>>)> { + Ok(( + self.config.root_url.clone(), + Some(self.config.options.clone()), + )) + } } // TODO the C++ implementation track an internal lock on all random access files, DO we need this here? diff --git a/python/src/lib.rs b/python/src/lib.rs index 2cff068c9e..a6d7b0b993 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -31,6 +31,7 @@ use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; +use crate::filesystem::FsConfig; use crate::schema::schema_to_pyobject; create_exception!(deltalake, PyDeltaTableError, PyException); @@ -83,6 +84,8 @@ enum PartitionFilterValue<'a> { #[pyclass] struct RawDeltaTable { _table: deltalake::DeltaTable, + // storing the config additionally on the table helps us make pickling work. + _config: FsConfig, } #[pyclass] @@ -111,6 +114,7 @@ impl RawDeltaTable { without_files: bool, ) -> PyResult<Self> { let mut builder = deltalake::DeltaTableBuilder::from_uri(table_uri); + let options = storage_options.clone().unwrap_or_default(); if let Some(storage_options) = storage_options { builder = builder.with_storage_options(storage_options) } @@ -125,7 +129,13 @@ impl RawDeltaTable { let table = rt()?
.block_on(builder.load()) .map_err(PyDeltaTableError::from_raw)?; - Ok(RawDeltaTable { _table: table }) + Ok(RawDeltaTable { + _table: table, + _config: FsConfig { + root_url: table_uri.into(), + options, + }, + }) } #[classmethod] @@ -426,6 +436,7 @@ impl RawDeltaTable { Ok(filesystem::DeltaFileSystemHandler { inner: self._table.object_store(), rt: Arc::new(rt()?), + config: self._config.clone(), }) } } diff --git a/python/tests/test_fs.py b/python/tests/test_fs.py index 9418ff3862..550dcc701c 100644 --- a/python/tests/test_fs.py +++ b/python/tests/test_fs.py @@ -1,3 +1,4 @@ +import pickle import urllib import pyarrow as pa @@ -233,3 +234,16 @@ def test_roundtrip_azure_decoded_sas(azurite_sas_creds, sample_data: pa.Table): table = dt.to_pyarrow_table() assert table == sample_data assert dt.version() == 0 + + +def test_pickle_roundtrip(tmp_path): + store = DeltaStorageHandler(str(tmp_path.absolute())) + + with (tmp_path / "asd.pkl").open("wb") as handle: + pickle.dump(store, handle) + + with (tmp_path / "asd.pkl").open("rb") as handle: + store_pkl = pickle.load(handle) + + infos = store_pkl.get_file_info(["asd.pkl"]) + assert infos[0].size > 0