From f4df3a83ab0d8b3fb33fe711b08d39eb3cb1a33e Mon Sep 17 00:00:00 2001 From: Eero Lihavainen Date: Fri, 15 Sep 2023 15:41:43 +0300 Subject: [PATCH] feat: accept file size to skip head request --- python/deltalake/_internal.pyi | 2 +- python/deltalake/fs.py | 32 +++++++++++++++++++++--- python/deltalake/table.py | 7 ++++-- python/src/filesystem.rs | 16 +++++++++--- python/tests/test_file_system_handler.py | 23 +++++++++++++++++ python/tests/test_table_read.py | 19 ++++++++++++++ 6 files changed, 88 insertions(+), 11 deletions(-) diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 48da1d47df..e61b26b58f 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -254,7 +254,7 @@ class DeltaFileSystemHandler: """ def normalize_path(self, path: str) -> str: """Normalize filesystem path.""" - def open_input_file(self, path: str) -> ObjectInputFile: + def open_input_file(self, path: str, size: int | None = None) -> ObjectInputFile: """Open an input file for random access reading.""" def open_output_stream( self, path: str, metadata: dict[str, str] | None = None diff --git a/python/deltalake/fs.py b/python/deltalake/fs.py index efb92fe239..52b0b7de0f 100644 --- a/python/deltalake/fs.py +++ b/python/deltalake/fs.py @@ -12,23 +12,47 @@ class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler): DeltaStorageHandler is a concrete implementations of a PyArrow FileSystemHandler. """ - def open_input_file(self, path: str) -> pa.PythonFile: + known_sizes: Dict[str, int] = {} + + def __new__( # type:ignore + cls, + table_uri: str, + storage_options: Optional[Dict[str, str]] = None, + known_sizes: Optional[Dict[str, int]] = None, + ): + return super().__new__( + cls, table_uri=table_uri, options=storage_options # type:ignore + ) + + def __init__( + self, + table_uri: str, + storage_options: Optional[Dict[str, str]] = None, + known_sizes: Optional[Dict[str, int]] = None, + ): + if known_sizes: + self.known_sizes = known_sizes + return + + def open_input_file(self, path: str, size: Optional[int] = None) -> pa.PythonFile: """ Open an input file for random access reading. :param source: The source to open for reading. :return: NativeFile """ - return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path)) + size = self.known_sizes.get(path) + return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path, size)) - def open_input_stream(self, path: str) -> pa.PythonFile: + def open_input_stream(self, path: str, size: Optional[int] = None) -> pa.PythonFile: """ Open an input stream for sequential reading. :param source: The source to open for reading. :return: NativeFile """ - return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path)) + size = self.known_sizes.get(path) + return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path, size)) def open_output_stream( self, path: str, metadata: Optional[Dict[str, str]] = None diff --git a/python/deltalake/table.py b/python/deltalake/table.py index add8342555..cf7d844e11 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -522,10 +522,13 @@ def to_pyarrow_dataset( ) if not filesystem: + file_sizes = self.get_add_actions().to_pydict() + file_sizes = { + x: y for x, y in zip(file_sizes["path"], file_sizes["size_bytes"]) + } filesystem = pa_fs.PyFileSystem( DeltaStorageHandler( - self._table.table_uri(), - self._storage_options, + self._table.table_uri(), self._storage_options, file_sizes ) ) diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index d6a2d1dcb2..993c5fc580 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -236,7 +236,7 @@ impl DeltaFileSystemHandler { Ok(()) } - fn open_input_file(&self, path: String) -> PyResult { + fn open_input_file(&self, path: String, size: Option) -> PyResult { let path = Self::parse_path(&path); let file = self .rt @@ -244,6 +244,7 @@ impl DeltaFileSystemHandler { Arc::clone(&self.rt), self.inner.clone(), path, + size, )) .map_err(PythonError::from)?; Ok(file) @@ -296,11 +297,18 @@ impl ObjectInputFile { rt: Arc, store: Arc, path: Path, + size: Option, ) -> Result { - // Issue a HEAD Object to get the content-length and ensure any + // If file size is not given, issue a HEAD Object to get the content-length and ensure any // errors (e.g. file not found) don't wait until the first read() call. - let meta = store.head(&path).await?; - let content_length = meta.size as i64; + let content_length = match size { + Some(s) => s, + None => { + let meta = store.head(&path).await?; + meta.size as i64 + } + }; + // TODO make sure content length is valid // https://github.com/apache/arrow/blob/f184255cbb9bf911ea2a04910f711e1a924b12b8/cpp/src/arrow/filesystem/s3fs.cc#L1083 Ok(Self { diff --git a/python/tests/test_file_system_handler.py b/python/tests/test_file_system_handler.py index 95936dfecc..488a880e36 100644 --- a/python/tests/test_file_system_handler.py +++ b/python/tests/test_file_system_handler.py @@ -99,6 +99,29 @@ def test_open_input_file(file_systems, table_data): assert file.read_at(10, 0) == arrow_file.read_at(10, 0) +def test_open_input_file_with_size(tmp_path, table_data): + file_path = "table.parquet" + input_size = 12345 # incorrect file size for testing purposes + + # test that injected file size gets stored correctly + store1 = DeltaStorageHandler( + str(tmp_path.absolute()), known_sizes={file_path: input_size} + ) + wrapped_fs = fs.PyFileSystem(store1) + arrow_fs = fs.SubTreeFileSystem(str(tmp_path.absolute()), fs.LocalFileSystem()) + pq.write_table(table_data, file_path, filesystem=arrow_fs) + file = wrapped_fs.open_input_file(file_path) + assert file.size() == input_size + + # confirm that true size is different + store2 = DeltaStorageHandler(str(tmp_path.absolute())) + wrapped_fs = fs.PyFileSystem(store2) + arrow_fs = fs.SubTreeFileSystem(str(tmp_path.absolute()), fs.LocalFileSystem()) + pq.write_table(table_data, file_path, filesystem=arrow_fs) + file = wrapped_fs.open_input_file(file_path) + assert file.size() != input_size + + def test_read_table(file_systems, table_data): store, arrow_fs = file_systems file_path = "table.parquet" diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index 36b144cf94..23fcef7e27 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -2,6 +2,7 @@ from datetime import datetime from pathlib import Path from threading import Barrier, Thread +from types import SimpleNamespace from unittest.mock import Mock from packaging import version @@ -104,6 +105,24 @@ def test_read_simple_table_update_incremental(): assert dt.to_pyarrow_dataset().to_table().to_pydict() == {"id": [5, 7, 9]} +def test_read_simple_table_file_sizes_failure(): + table_path = "../rust/tests/data/simple_table" + dt = DeltaTable(table_path) + add_actions = dt.get_add_actions().to_pydict() + + # set all sizes to -1, the idea is to break the reading + add_actions_modified = { + x: [-1 for item in x] if x == "size_bytes" else y + for x, y in add_actions.items() + } + dt.get_add_actions = lambda: SimpleNamespace( + to_pydict=lambda: add_actions_modified + ) # type:ignore + + with pytest.raises(OSError, match="Cannot seek past end of file."): + dt.to_pyarrow_dataset().to_table().to_pydict() + + def test_read_partitioned_table_to_dict(): table_path = "../rust/tests/data/delta-0.8.0-partitioned" dt = DeltaTable(table_path)