Skip to content

Commit

Permalink
Return buffer protocol object from get_range and get_ranges (#39)
Browse files Browse the repository at this point in the history
* Return buffer protocol object from get_range and get_ranges

* Fix docs
  • Loading branch information
kylebarron authored Oct 23, 2024
1 parent f5a5c29 commit b581ce4
Show file tree
Hide file tree
Showing 8 changed files with 532 additions and 35 deletions.
7 changes: 3 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions docs/api/get.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
::: object_store_rs.get_ranges_async
::: object_store_rs.GetOptions
::: object_store_rs.GetResult
::: object_store_rs.Buffer
5 changes: 3 additions & 2 deletions object-store-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ futures = { workspace = true }
http = { workspace = true }
indexmap = { workspace = true }
object_store = { workspace = true }
pyo3 = { workspace = true, features = ["chrono", "abi3-py39"] }
pyo3-arrow = { version = "0.5.1", default-features = false }
pyo3 = { workspace = true, features = ["chrono"] }
# Latest git to get PyArrowBuffer constructor
pyo3-arrow = { git = "https://github.com/kylebarron/arro3", rev = "d771e39c2748313a86bfcdfa36ec68cc24d9804b" }
pyo3-async-runtimes = { workspace = true, features = ["tokio-runtime"] }
pyo3-file = { workspace = true }
pyo3-object_store = { path = "../pyo3-object_store" }
Expand Down
34 changes: 27 additions & 7 deletions object-store-rs/python/object_store_rs/_get.pyi
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import sys
from datetime import datetime
from typing import List, Sequence, TypedDict

from ._list import ObjectMeta
from .store import ObjectStore

if sys.version_info >= (3, 12):
from collections.abc import Buffer as _Buffer
else:
from typing_extensions import Buffer as _Buffer

class GetOptions(TypedDict):
"""Options for a get request, such as range"""

Expand Down Expand Up @@ -160,6 +166,17 @@ class BytesStream:
def __next__(self) -> bytes:
"""Return the next chunk of bytes in the stream."""

class Buffer(_Buffer):
"""
A buffer implementing the Python buffer protocol, allowing zero-copy access to the
underlying memory provided by Rust.
You can pass this to [`memoryview`][] for a zero-copy view into the underlying data.
"""

def as_bytes(self) -> bytes:
"""Copy this buffer into a Python `bytes` object."""

def get(
store: ObjectStore, path: str, *, options: GetOptions | None = None
) -> GetResult:
Expand All @@ -182,7 +199,7 @@ async def get_async(
Refer to the documentation for [get][object_store_rs.get].
"""

def get_range(store: ObjectStore, path: str, offset: int, length: int) -> bytes:
def get_range(store: ObjectStore, path: str, offset: int, length: int) -> Buffer:
"""
Return the bytes that are stored at the specified location in the given byte range.
Expand All @@ -198,22 +215,23 @@ def get_range(store: ObjectStore, path: str, offset: int, length: int) -> bytes:
length: The number of bytes.
Returns:
bytes
A `Buffer` object implementing the Python buffer protocol, allowing
zero-copy access to the underlying memory provided by Rust.
"""

async def get_range_async(
store: ObjectStore, path: str, offset: int, length: int
) -> bytes:
) -> Buffer:
"""Call `get_range` asynchronously.
Refer to the documentation for [get_range][object_store_rs.get_range].
"""

def get_ranges(
store: ObjectStore, path: str, offsets: Sequence[int], lengths: Sequence[int]
) -> List[bytes]:
) -> List[Buffer]:
"""
Return the bytes that are stored at the specified locationin the given byte ranges
Return the bytes that are stored at the specified location in the given byte ranges
To improve performance this will:
Expand All @@ -227,12 +245,14 @@ def get_ranges(
lengths: A sequence of `int` representing the number of bytes within each range.
Returns:
A sequence of `bytes`, one for each range.
A sequence of `Buffer`, one for each range. This `Buffer` object implements the
Python buffer protocol, allowing zero-copy access to the underlying memory
provided by Rust.
"""

async def get_ranges_async(
store: ObjectStore, path: str, offsets: Sequence[int], lengths: Sequence[int]
) -> List[bytes]:
) -> List[Buffer]:
"""Call `get_ranges` asynchronously.
Refer to the documentation for [get_ranges][object_store_rs.get_ranges].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ from ._copy import copy as copy
from ._copy import copy_async as copy_async
from ._delete import delete as delete
from ._delete import delete_async as delete_async
from ._get import Buffer as Buffer
from ._get import GetOptions as GetOptions
from ._get import GetResult as GetResult
from ._get import get as get
Expand Down
21 changes: 15 additions & 6 deletions object-store-rs/src/get.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use arrow::buffer::Buffer;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, Fuse};
Expand All @@ -8,6 +9,7 @@ use object_store::{GetOptions, GetResult, ObjectStore};
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3_arrow::buffer::PyArrowBuffer;
use pyo3_object_store::error::{PyObjectStoreError, PyObjectStoreResult};
use pyo3_object_store::PyObjectStore;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -265,12 +267,12 @@ pub(crate) fn get_range(
path: String,
offset: usize,
length: usize,
) -> PyObjectStoreResult<PyBytesWrapper> {
) -> PyObjectStoreResult<PyArrowBuffer> {
let runtime = get_runtime(py)?;
let range = offset..offset + length;
py.allow_threads(|| {
let out = runtime.block_on(store.as_ref().get_range(&path.into(), range))?;
Ok::<_, PyObjectStoreError>(PyBytesWrapper::new(out))
Ok::<_, PyObjectStoreError>(PyArrowBuffer::new(Buffer::from_bytes(out.into())))
})
}

Expand All @@ -289,7 +291,7 @@ pub(crate) fn get_range_async(
.get_range(&path.into(), range)
.await
.map_err(PyObjectStoreError::ObjectStoreError)?;
Ok(PyBytesWrapper::new(out))
Ok(PyArrowBuffer::new(Buffer::from_bytes(out.into())))
})
}

Expand All @@ -300,7 +302,7 @@ pub(crate) fn get_ranges(
path: String,
offsets: Vec<usize>,
lengths: Vec<usize>,
) -> PyObjectStoreResult<Vec<PyBytesWrapper>> {
) -> PyObjectStoreResult<Vec<PyArrowBuffer>> {
let runtime = get_runtime(py)?;
let ranges = offsets
.into_iter()
Expand All @@ -309,7 +311,11 @@ pub(crate) fn get_ranges(
.collect::<Vec<_>>();
py.allow_threads(|| {
let out = runtime.block_on(store.as_ref().get_ranges(&path.into(), &ranges))?;
Ok::<_, PyObjectStoreError>(out.into_iter().map(PyBytesWrapper::new).collect())
Ok::<_, PyObjectStoreError>(
out.into_iter()
.map(|buf| PyArrowBuffer::new(Buffer::from_bytes(buf.into())))
.collect(),
)
})
}

Expand All @@ -332,6 +338,9 @@ pub(crate) fn get_ranges_async(
.get_ranges(&path.into(), &ranges)
.await
.map_err(PyObjectStoreError::ObjectStoreError)?;
Ok(out.into_iter().map(PyBytesWrapper::new).collect::<Vec<_>>())
Ok(out
.into_iter()
.map(|buf| PyArrowBuffer::new(Buffer::from_bytes(buf.into())))
.collect::<Vec<_>>())
})
}
27 changes: 27 additions & 0 deletions tests/test_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,30 @@ async def test_stream_async():
pos += size

assert pos == len(data)


def test_get_range():
store = MemoryStore()

data = b"the quick brown fox jumps over the lazy dog," * 100
path = "big-data.txt"

obs.put(store, path, data)
buffer = obs.get_range(store, path, 5, 10)
view = memoryview(buffer)
assert view == data[5:15]


def test_get_ranges():
store = MemoryStore()

data = b"the quick brown fox jumps over the lazy dog," * 100
path = "big-data.txt"

obs.put(store, path, data)
offsets = [5, 10, 15, 20]
lengths = [10, 10, 10, 10]
buffers = obs.get_ranges(store, path, offsets, lengths)

for offset, length, buffer in zip(offsets, lengths, buffers):
assert memoryview(buffer) == data[offset : offset + length]
Loading

0 comments on commit b581ce4

Please sign in to comment.