feat: make DeltaStorageHandler pickle serializable (delta-io#1016)
# Description

Integrating with polars requires the `DeltaStorageHandler` to be serializable with pickle. This PR implements the required dunder methods to make that possible.
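
Roughly, the intended round trip looks like the sketch below (illustrative only — the path is a placeholder and must point at an existing local table directory; the real coverage is the `test_pickle_roundtrip` test added in this PR):

```python
import pickle

from deltalake.fs import DeltaStorageHandler

# On unpickling, the handler is rebuilt from (table_uri, options), which
# __getnewargs__ hands back to the native constructor.
handler = DeltaStorageHandler("/path/to/delta-table")
restored = pickle.loads(pickle.dumps(handler))
assert restored.get_type_name() == handler.get_type_name()
```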

Unfortunately, we lose the ability to instantiate the `DeltaStorageHandler` with an existing object store; however, I believe this is not a critical loss.

cc @chitralverma @ritchie46

# Related Issue(s)

closes delta-io#1015

# Documentation

<!---
Share links to useful documentation
--->
roeap authored and chitralverma committed Mar 17, 2023
1 parent 94423ba commit 246bfc7
Showing 6 changed files with 69 additions and 146 deletions.
1 change: 1 addition & 0 deletions python/Cargo.toml
@@ -23,6 +23,7 @@ env_logger = "0"
futures = "0.3"
lazy_static = "1"
regex = "1"
serde = "1"
serde_json = "1"
tokio = { version = "1", features = ["rt-multi-thread"] }

165 changes: 22 additions & 143 deletions python/deltalake/fs.py
@@ -1,158 +1,38 @@
from typing import Any, Dict, List, Optional
from typing import Dict, List, Optional

import pyarrow as pa
from pyarrow.fs import FileInfo, FileSelector, FileSystemHandler

from ._internal import DeltaFileSystemHandler


class DeltaStorageHandler(FileSystemHandler):
# NOTE we need to inherit from FileSystemHandler to pass pyarrow's internal type checks.
class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
"""
DeltaStorageHander is a concrete implementations of a PyArrow FileSystemHandler.
DeltaStorageHandler is a concrete implementation of a PyArrow FileSystemHandler.
"""

def __init__(
self,
table_uri: str,
options: Optional[Dict[str, str]] = None,
backend: Optional[Any] = None,
) -> None:
self._storage = backend or DeltaFileSystemHandler(table_uri, options)

def __eq__(self, other: Any) -> bool:
return NotImplemented

def __ne__(self, other: Any) -> bool:
return NotImplemented

def get_type_name(self) -> str:
"""
The filesystem’s type name.
:return: The filesystem’s type name.
"""
return self._storage.get_type_name()

def normalize_path(self, path: str) -> str:
"""
Normalize filesystem path.
:param path: the path to normalize
:return: the normalized path
def open_input_file(self, path: str) -> pa.PythonFile:
"""
return self._storage.normalize_path(path)

def get_file_info(self, paths: List[str]) -> List[FileInfo]:
"""
Get info for the given files.
:param paths: List of file paths
:return: list of file info objects
"""
return self._storage.get_file_info(paths)

def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]:
"""
Get info for the files defined by FileSelector.
:param selector: FileSelector object
:return: list of file info objects
"""
return self._storage.get_file_info_selector(
selector.base_dir, selector.allow_not_found, selector.recursive
)

def create_dir(self, path: str, recursive: bool = True) -> None:
"""
Create a directory and subdirectories.
This function succeeds if the directory already exists.
:param path: The path of the new directory.
:param recursive: Create nested directories as well.
"""
self._storage.create_dir(path, recursive)

def delete_dir(self, path: str) -> None:
"""
Delete a directory and its contents, recursively.
:param path: The path of the directory to be deleted.
"""
self._storage.delete_dir(path)

def delete_dir_contents(self, path: str) -> None:
"""
Delete a directory’s contents, recursively.
Like delete_dir, but doesn’t delete the directory itself.
:param path: The path of the directory to be deleted.
"""
self._storage.delete_dir_contents(path)

def delete_root_dir_contents(self) -> None:
"""
Delete a directory's contents, recursively.
Like delete_dir_contents, but for the root directory (path is empty or “/”)
"""
self._storage.delete_root_dir_contents()

def delete_file(self, path: str) -> None:
"""
Delete a file.
:param path: The path of the file to be deleted.
"""
self._storage.delete_file(path)

def move(self, src: str, dest: str) -> None:
"""
Move / rename a file or directory.
If the destination exists: - if it is a non-empty directory, an error is returned - otherwise,
if it has the same type as the source, it is replaced - otherwise,
behavior is unspecified (implementation-dependent).
:param src: The path of the file or the directory to be moved.
:param dest: The destination path where the file or directory is moved to.
"""
self._storage.move_file(src, dest)

def copy_file(self, src: str, dest: str) -> None:
"""
Copy a file.
If the destination exists and is a directory, an error is returned.
Otherwise, it is replaced.
:param src: The path of the file to be copied from.
:param dest: The destination path where the file is copied to.
"""
self._storage.copy_file(src, dest)

def open_input_stream(self, path: str) -> pa.NativeFile:
"""
Open an input stream for sequential reading.
Open an input file for random access reading.
:param source: The source to open for reading.
:return: NativeFile
"""
return pa.PythonFile(self._storage.open_input_file(path))
return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path))

def open_input_file(self, path: str) -> pa.NativeFile:
def open_input_stream(self, path: str) -> pa.PythonFile:
"""
Open an input file for random access reading.
Open an input stream for sequential reading.
:param source: The source to open for reading.
:return: NativeFile
"""
return pa.PythonFile(self._storage.open_input_file(path))
return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path))

def open_output_stream(
self, path: str, metadata: Optional[Dict[str, Any]] = None
) -> pa.NativeFile:
self, path: str, metadata: Optional[Dict[str, str]] = None
) -> pa.PythonFile:
"""
Open an output stream for sequential writing.
@@ -162,18 +42,17 @@ def open_output_stream(
:param metadata: If not None, a mapping of string keys to string values.
:return: NativeFile
"""
return pa.PythonFile(self._storage.open_output_stream(path, metadata))
return pa.PythonFile(
DeltaFileSystemHandler.open_output_stream(self, path, metadata)
)

def open_append_stream(
self, path: str, metadata: Optional[Dict[str, Any]] = None
) -> pa.NativeFile:
def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]: # type: ignore
"""
DEPRECATED: Open an output stream for appending.
If the target doesn’t exist, a new empty file is created.
Get info for the files defined by FileSelector.
:param path: The source to open for writing.
:param metadata: If not None, a mapping of string keys to string values.
:return: NativeFile
:param selector: FileSelector object
:return: list of file info objects
"""
raise NotImplementedError
return DeltaFileSystemHandler.get_file_info_selector(
self, selector.base_dir, selector.allow_not_found, selector.recursive
)
1 change: 0 additions & 1 deletion python/deltalake/table.py
@@ -354,7 +354,6 @@ def to_pyarrow_dataset(
DeltaStorageHandler(
self._table.table_uri(),
self._storage_options,
self._table.get_py_storage_backend(),
)
)

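For context, `to_pyarrow_dataset` consumes the handler by wrapping it in a pyarrow `PyFileSystem`; a rough sketch of that usage (the table path is a placeholder, and storage options remain optional):

```python
from pyarrow.fs import PyFileSystem

from deltalake.fs import DeltaStorageHandler

# pyarrow drives the handler through PyFileSystem; the handler is now built
# from just the table URI and optional storage options, not a backend object.
filesystem = PyFileSystem(DeltaStorageHandler("/path/to/delta-table"))
```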
21 changes: 20 additions & 1 deletion python/src/filesystem.rs
@@ -9,14 +9,22 @@ use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyBytes};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::runtime::Runtime;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct FsConfig {
pub(crate) root_url: String,
pub(crate) options: HashMap<String, String>,
}

#[pyclass(subclass)]
#[derive(Debug, Clone)]
pub struct DeltaFileSystemHandler {
pub(crate) inner: Arc<DynObjectStore>,
pub(crate) rt: Arc<Runtime>,
pub(crate) config: FsConfig,
}

#[pymethods]
@@ -25,12 +33,16 @@ impl DeltaFileSystemHandler {
#[args(options = "None")]
fn new(table_uri: &str, options: Option<HashMap<String, String>>) -> PyResult<Self> {
let storage = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(options.unwrap_or_default())
.with_storage_options(options.clone().unwrap_or_default())
.build_storage()
.map_err(PyDeltaTableError::from_raw)?;
Ok(Self {
inner: storage,
rt: Arc::new(rt()?),
config: FsConfig {
root_url: table_uri.into(),
options: options.unwrap_or_default(),
},
})
}

@@ -242,6 +254,13 @@ impl DeltaFileSystemHandler {
.map_err(PyDeltaTableError::from_object_store)?;
Ok(file)
}

pub fn __getnewargs__(&self) -> PyResult<(String, Option<HashMap<String, String>>)> {
Ok((
self.config.root_url.clone(),
Some(self.config.options.clone()),
))
}
}

// TODO the C++ implementation track an internal lock on all random access files, DO we need this here?
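The `__getnewargs__` added above is what makes pickling work: at dump time pickle records the returned tuple, and at load time it replays that tuple into the class's constructor, so none of the runtime state (object store, tokio runtime) is ever serialized. A rough pure-Python analogue of the same protocol (illustrative only — the real handler does this on the Rust side via pyo3's `#[new]`):

```python
import pickle
from typing import Dict, Optional


class Handler:
    # Construction happens in __new__, mirroring pyo3's #[new]; pickle calls it
    # again at load time with the tuple returned by __getnewargs__.
    def __new__(cls, root_url: str, options: Optional[Dict[str, str]] = None):
        obj = super().__new__(cls)
        obj.root_url = root_url
        obj.options = options or {}
        return obj

    def __getnewargs__(self):
        return (self.root_url, self.options)


restored = pickle.loads(pickle.dumps(Handler("memory://", {"key": "value"})))
assert restored.root_url == "memory://"
```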
13 changes: 12 additions & 1 deletion python/src/lib.rs
@@ -31,6 +31,7 @@ use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use crate::filesystem::FsConfig;
use crate::schema::schema_to_pyobject;

create_exception!(deltalake, PyDeltaTableError, PyException);
@@ -83,6 +84,8 @@ enum PartitionFilterValue<'a> {
#[pyclass]
struct RawDeltaTable {
_table: deltalake::DeltaTable,
// storing the config additionally on the table helps us make pickling work.
_config: FsConfig,
}

#[pyclass]
@@ -111,6 +114,7 @@ impl RawDeltaTable {
without_files: bool,
) -> PyResult<Self> {
let mut builder = deltalake::DeltaTableBuilder::from_uri(table_uri);
let options = storage_options.clone().unwrap_or_default();
if let Some(storage_options) = storage_options {
builder = builder.with_storage_options(storage_options)
}
@@ -125,7 +129,13 @@
let table = rt()?
.block_on(builder.load())
.map_err(PyDeltaTableError::from_raw)?;
Ok(RawDeltaTable { _table: table })
Ok(RawDeltaTable {
_table: table,
_config: FsConfig {
root_url: table_uri.into(),
options,
},
})
}

#[classmethod]
@@ -426,6 +436,7 @@ impl RawDeltaTable {
Ok(filesystem::DeltaFileSystemHandler {
inner: self._table.object_store(),
rt: Arc::new(rt()?),
config: self._config.clone(),
})
}
}
14 changes: 14 additions & 0 deletions python/tests/test_fs.py
@@ -1,3 +1,4 @@
import pickle
import urllib

import pyarrow as pa
@@ -233,3 +234,16 @@ def test_roundtrip_azure_decoded_sas(azurite_sas_creds, sample_data: pa.Table):
table = dt.to_pyarrow_table()
assert table == sample_data
assert dt.version() == 0


def test_pickle_roundtrip(tmp_path):
store = DeltaStorageHandler(str(tmp_path.absolute()))

with (tmp_path / "asd.pkl").open("wb") as handle:
pickle.dump(store, handle)

with (tmp_path / "asd.pkl").open("rb") as handle:
store_pkl = pickle.load(handle)

infos = store_pkl.get_file_info(["asd.pkl"])
assert infos[0].size > 0
