Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unpin zarr & xarray #555

Merged
merged 12 commits into from
Jan 9, 2025
4 changes: 2 additions & 2 deletions icechunk-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ classifiers = [
license = { text = "Apache-2.0" }
dynamic = ["version"]

dependencies = ["zarr==3.0.0rc2"]
dependencies = ["zarr>=3"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the right way to specify it? should it be 3 but not for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh you want "zarr>=3<4"?

cc @jhamman for input


[tool.poetry]
name = "icechunk"
Expand All @@ -39,7 +39,7 @@ test = [
"ruff",
"dask>=2024.11.0",
"distributed>=2024.11.0",
"xarray@git+https://github.com/pydata/xarray.git@main",
"xarray>=2025.01.1",
"hypothesis",
"pandas-stubs",
"boto3-stubs[s3]",
Expand Down
44 changes: 38 additions & 6 deletions icechunk-python/python/icechunk/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
from typing import TYPE_CHECKING, Any

from icechunk._icechunk_python import PyStore
from zarr.abc.store import ByteRangeRequest, Store
from zarr.abc.store import (
ByteRequest,
OffsetByteRequest,
RangeByteRequest,
Store,
SuffixByteRequest,
)
from zarr.core.buffer import Buffer, BufferPrototype
from zarr.core.common import BytesLike
from zarr.core.sync import SyncMixin
Expand All @@ -12,6 +18,20 @@
from icechunk import Session


def _byte_request_to_tuple(
byte_request: ByteRequest | None,
) -> tuple[int | None, int | None]:
if byte_request is None:
return (None, None)
if isinstance(byte_request, RangeByteRequest):
return (byte_request.start, byte_request.end)
if isinstance(byte_request, OffsetByteRequest):
return (byte_request.offset, None)
if isinstance(byte_request, SuffixByteRequest):
return (None, byte_request.suffix)
raise ValueError("Invalid byte request type")


class IcechunkStore(Store, SyncMixin):
_store: PyStore

Expand Down Expand Up @@ -93,22 +113,33 @@ async def get(
self,
key: str,
prototype: BufferPrototype,
byte_range: tuple[int | None, int | None] | None = None,
byte_range: ByteRequest | None = None,
) -> Buffer | None:
"""Retrieve the value associated with a given key.

Parameters
----------
key : str
byte_range : tuple[int, Optional[int]], optional
byte_range : ByteRequest, optional

ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

- RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
- OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
- SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

Returns
-------
Buffer
"""

try:
result = await self._store.get(key, byte_range)
if byte_range is None:
result = await self._store.get(
key,
)
else:
result = await self._store.get(key, _byte_request_to_tuple(byte_range))
except KeyError as _e:
# Zarr python expects None to be returned if the key does not exist
# but an IcechunkStore returns an error if the key does not exist
Expand All @@ -119,7 +150,7 @@ async def get(
async def get_partial_values(
self,
prototype: BufferPrototype,
key_ranges: Iterable[tuple[str, ByteRangeRequest]],
key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
"""Retrieve possibly partial values from given key_ranges.

Expand All @@ -134,7 +165,8 @@ async def get_partial_values(
"""
# NOTE: pyo3 has not implicit conversion from an Iterable to a rust iterable. So we convert it
# to a list here first. Possible opportunity for optimization.
result = await self._store.get_partial_values(list(key_ranges))
ranges = [(k[0], _byte_request_to_tuple(k[1])) for k in key_ranges]
result = await self._store.get_partial_values(list(ranges))
return [prototype.buffer.from_bytes(r) for r in result]

async def exists(self, key: str) -> bool:
Expand Down
3 changes: 2 additions & 1 deletion icechunk-python/tests/test_zarr/test_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
from zarr.core.group import GroupMetadata
from zarr.core.sync import sync
from zarr.errors import ContainsArrayError, ContainsGroupError
from zarr.storage import StorePath, make_store_path
from zarr.storage import StorePath
from zarr.storage._common import make_store_path


@pytest.fixture(params=["memory"])
Expand Down
2 changes: 1 addition & 1 deletion icechunk-python/tests/test_zarr/test_store/test_core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from icechunk import IcechunkStore
from tests.conftest import parse_repo
from zarr.storage import make_store_path
from zarr.storage._common import make_store_path


async def test_make_store_path() -> None:
Expand Down
15 changes: 13 additions & 2 deletions icechunk-python/tests/test_zarr/test_store/test_icechunk_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from icechunk import IcechunkStore, local_filesystem_storage
from icechunk.repository import Repository
from zarr.abc.store import OffsetByteRequest, RangeByteRequest, SuffixByteRequest
from zarr.core.buffer import Buffer, cpu, default_buffer_prototype
from zarr.core.sync import collect_aiterator
from zarr.testing.store import StoreTests
Expand Down Expand Up @@ -226,15 +227,25 @@ async def test_get_partial_values(
values = await store.get_partial_values(
default_buffer_prototype(),
[
("zarr.json", (0, 5)),
("zarr.json", RangeByteRequest(0, 5)),
("zarr.json", SuffixByteRequest(5)),
("zarr.json", OffsetByteRequest(10)),
],
)

assert len(values) == 1
assert len(values) == 3
data = values[0].to_bytes()
assert len(data) == 5
assert data == DEFAULT_GROUP_METADATA[:5]

data = values[1].to_bytes()
assert len(data) == len(DEFAULT_GROUP_METADATA) - 5
assert data == DEFAULT_GROUP_METADATA[:-5]

data = values[2].to_bytes()
assert len(data) == len(DEFAULT_GROUP_METADATA) - 10
assert data == DEFAULT_GROUP_METADATA[10:]

async def test_set(self, store: IcechunkStore) -> None:
await store.set("zarr.json", self.buffer_cls.from_bytes(DEFAULT_GROUP_METADATA))
assert await store.exists("zarr.json")
Expand Down
6 changes: 3 additions & 3 deletions icechunk/src/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub enum ByteRange {
Bounded(Range<ChunkOffset>),
/// All bytes from the given offset (included) to the end of the object
From(ChunkOffset),
/// Last n bytes in the object
/// All bytes up to the last n bytes in the object
Last(ChunkLength),
}

Expand Down Expand Up @@ -185,7 +185,7 @@ impl ByteRange {
bytes.slice(range.start as usize..range.end as usize)
}
ByteRange::From(from) => bytes.slice(*from as usize..),
ByteRange::Last(n) => bytes.slice(bytes.len() - *n as usize..bytes.len()),
ByteRange::Last(n) => bytes.slice(0usize..bytes.len() - *n as usize),
}
}
}
Expand All @@ -195,7 +195,7 @@ impl From<(Option<ChunkOffset>, Option<ChunkOffset>)> for ByteRange {
match (start, end) {
(Some(start), Some(end)) => Self::Bounded(start..end),
(Some(start), None) => Self::From(start),
(None, Some(end)) => Self::Bounded(0..end),
(None, Some(end)) => Self::Last(end),
(None, None) => Self::ALL,
}
}
Expand Down
4 changes: 2 additions & 2 deletions icechunk/src/refs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,10 @@ pub async fn list_branches(
Ok(branches)
}

async fn branch_history<'a, 'b>(
async fn branch_history<'a>(
storage: &'a (dyn Storage + Send + Sync),
storage_settings: &storage::Settings,
branch: &'b str,
branch: &str,
) -> RefResult<impl Stream<Item = RefResult<BranchVersion>> + 'a> {
let key = branch_root(branch)?;
let all = storage.ref_versions(storage_settings, key.as_str()).await?;
Expand Down
8 changes: 4 additions & 4 deletions icechunk/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1012,9 +1012,9 @@ async fn updated_nodes<'a>(
.chain(change_set.new_nodes_iterator(manifest_id)))
}

async fn get_node<'a>(
async fn get_node(
asset_manager: &AssetManager,
change_set: &'a ChangeSet,
change_set: &ChangeSet,
snapshot_id: &SnapshotId,
path: &Path,
) -> SessionResult<NodeSnapshot> {
Expand All @@ -1035,9 +1035,9 @@ async fn get_node<'a>(
}
}

async fn get_existing_node<'a>(
async fn get_existing_node(
asset_manager: &AssetManager,
change_set: &'a ChangeSet,
change_set: &ChangeSet,
snapshot_id: &SnapshotId,
path: &Path,
) -> SessionResult<NodeSnapshot> {
Expand Down
Loading