Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add array storage helpers #2065

Merged
merged 31 commits into v3 from add-array-storage-helpers
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
31 commits
Select commit. Hold shift + click to select a range.
ebbfbe0
implement store.list_prefix and store._set_dict
d-v-b Aug 3, 2024
da6083e
simplify string handling
d-v-b Aug 3, 2024
dc5fe47
add nchunks_initialized, and necessary additions for it
d-v-b Aug 2, 2024
b694b6e
rename _iter_chunks to _iter_chunk_coords
d-v-b Aug 2, 2024
6a27ca8
fix test name
d-v-b Aug 3, 2024
d15be9a
bring in correct store list_dir implementations
d-v-b Aug 3, 2024
ef34f25
Merge branch 'v3' of https://github.com/zarr-developers/zarr-python i…
d-v-b Aug 12, 2024
962ffed
bump numcodecs to dodge zstd exception
d-v-b Aug 12, 2024
5c98ab4
remove store._set_dict, and add _set_many and get_many instead
d-v-b Aug 12, 2024
9e64fa8
update deprecation warning template
d-v-b Aug 13, 2024
a4b4696
add a type annotation
d-v-b Aug 13, 2024
04b1d6a
refactor chunk iterators. they are not properties any more, just meth…
d-v-b Aug 13, 2024
12b3bc1
Merge branch 'v3' of https://github.com/zarr-developers/zarr-python i…
d-v-b Aug 13, 2024
3e2c656
Merge branch 'v3' of github.com:zarr-developers/zarr-python into add-…
d-v-b Sep 19, 2024
b7c1a56
_get_many returns tuple[str, buffer]
d-v-b Sep 19, 2024
44bed5c
stricter store types
d-v-b Sep 19, 2024
021d41e
Merge branch 'v3' of github.com:zarr-developers/zarr-python into add-…
d-v-b Sep 23, 2024
2db860b
fix types
d-v-b Sep 23, 2024
45f27b1
Merge branch 'v3' of github.com:zarr-developers/zarr-python into add-…
d-v-b Sep 23, 2024
78f22b9
Merge branch 'v3' of github.com:zarr-developers/zarr-python into add-…
d-v-b Sep 24, 2024
b5e08e8
lint
d-v-b Sep 24, 2024
43743e1
remove deprecation warnings
d-v-b Sep 24, 2024
f65a6e8
fix zip list_prefix
d-v-b Sep 24, 2024
df6f9a7
tests for nchunks_initialized, chunks_initialized; add selection_shap…
d-v-b Sep 24, 2024
e60cbe0
add nchunks test
d-v-b Sep 24, 2024
5c54449
fix docstrings
d-v-b Sep 24, 2024
e8598c6
fix docstring
d-v-b Sep 24, 2024
ae216e1
Merge branch 'v3' into add-array-storage-helpers
d-v-b Sep 24, 2024
768ab43
revert unnecessary changes to project config
d-v-b Sep 24, 2024
c953f21
Merge branch 'add-array-storage-helpers' of github.com:d-v-b/zarr-pyt…
d-v-b Sep 24, 2024
f0d61b2
Merge branch 'v3' of https://github.com/zarr-developers/zarr-python i…
d-v-b Sep 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,5 @@ filterwarnings = [
"error:::zarr.*",
"ignore:PY_SSIZE_T_CLEAN will be required.*:DeprecationWarning",
"ignore:The loop argument is deprecated since Python 3.8.*:DeprecationWarning",
"ignore:.*is transitional and will be removed.*:DeprecationWarning",
]
10 changes: 9 additions & 1 deletion src/zarr/abc/store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator
from collections.abc import AsyncGenerator, Mapping
from typing import Any, NamedTuple, Protocol, runtime_checkable

from typing_extensions import Self
Expand Down Expand Up @@ -221,6 +221,14 @@ def close(self) -> None:
self._is_open = False
pass

async def _set_dict(self, dict: Mapping[str, Buffer]) -> None:
    """
    Insert objects into storage as defined by a prefix: value mapping.

    Entries are written one at a time with sequential ``set`` calls;
    there is no batching or concurrency here.
    """
    for prefix, value in dict.items():
        await self.set(prefix, value)


@runtime_checkable
class ByteGetter(Protocol):
Expand Down
140 changes: 134 additions & 6 deletions src/zarr/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
# Questions to consider:
# 1. Was splitting the array into two classes really necessary?
from asyncio import gather
from collections.abc import Iterable
from collections.abc import Iterable, Iterator
from dataclasses import dataclass, field, replace
from typing import Any, Literal, cast

import numpy as np
import numpy.typing as npt
from typing_extensions import deprecated

from zarr.abc.codec import Codec, CodecPipeline
from zarr.abc.store import set_or_delete
Expand Down Expand Up @@ -52,11 +53,13 @@
OrthogonalSelection,
Selection,
VIndex,
ceildiv,
check_fields,
check_no_multi_fields,
is_pure_fancy_indexing,
is_pure_orthogonal_indexing,
is_scalar,
iter_grid,
pop_fields,
)
from zarr.metadata import ArrayMetadata, ArrayV2Metadata, ArrayV3Metadata
Expand All @@ -65,7 +68,7 @@
from zarr.store.core import (
ensure_no_existing_node,
)
from zarr.sync import sync
from zarr.sync import collect_aiterator, sync


def parse_array_metadata(data: Any) -> ArrayV2Metadata | ArrayV3Metadata:
Expand Down Expand Up @@ -393,10 +396,12 @@ def shape(self) -> ChunkCoords:
def chunks(self) -> ChunkCoords:
if isinstance(self.metadata.chunk_grid, RegularChunkGrid):
return self.metadata.chunk_grid.chunk_shape
else:
raise ValueError(
f"chunk attribute is only available for RegularChunkGrid, this array has a {self.metadata.chunk_grid}"
)

msg = (
f"The `chunks` attribute is only defined for arrays using `RegularChunkGrid`."
f"This array has a {self.metadata.chunk_grid} instead."
)
raise NotImplementedError(msg)

@property
def size(self) -> int:
Expand Down Expand Up @@ -437,6 +442,59 @@ def basename(self) -> str | None:
return self.name.split("/")[-1]
return None

@property
@deprecated(
    "cdata_shape is transitional and will be removed in an early zarr-python v3 release."
)
def cdata_shape(self) -> ChunkCoords:
    """
    The shape of the chunk grid for this array.

    Returns
    -------
    ChunkCoords
        One entry per dimension: the number of chunks along that dimension,
        computed as ``ceildiv(shape, chunk_shape)``.
    """
    # strict=False tolerates a length mismatch between shape and chunks;
    # NOTE(review): presumably both always have one entry per dimension — confirm.
    return tuple(ceildiv(s, c) for s, c in zip(self.shape, self.chunks, strict=False))

@property
@deprecated("nchunks is transitional and will be removed in an early zarr-python v3 release.")
def nchunks(self) -> int:
    """
    The number of chunks in the stored representation of this array.

    This is the product of the chunk-grid shape (``cdata_shape``), i.e. the
    number of addressable chunks — not the number of chunks actually written
    to storage (see ``nchunks_initialized`` for that).
    """
    return product(self.cdata_shape)

@property
def _iter_chunk_coords(self) -> Iterator[ChunkCoords]:
    """
    Produce an iterator over the coordinates of each chunk, in chunk grid space.

    Coordinates are yielded in lexicographic order over the grid defined by
    ``cdata_shape`` (see ``iter_grid``).
    """
    # NOTE(review): a property that returns a fresh iterator on every access
    # is surprising to callers; a plain method would make that clearer.
    return iter_grid(self.cdata_shape)

@property
def _iter_chunk_keys(self) -> Iterator[str]:
    """
    Iterate over the storage key of every chunk of this array, in the same
    order as ``_iter_chunk_coords``.
    """
    yield from map(self.metadata.encode_chunk_key, self._iter_chunk_coords)

@property
def _iter_chunk_regions(self) -> Iterator[tuple[slice, ...]]:
    """
    Iterate over the regions of the array spanned by each chunk: one tuple of
    per-dimension slices (step 1) per chunk grid coordinate.
    """
    for grid_coord in self._iter_chunk_coords:
        yield tuple(
            slice(idx * extent, idx * extent + extent, 1)
            for idx, extent in zip(grid_coord, self.chunks, strict=False)
        )

@property
def nbytes(self) -> int:
    """
    The number of bytes that can be stored in this array.

    Returns
    -------
    int
        Total element count of the array times the per-element size of its
        dtype.
    """
    # Bug fix: the previous ``self.nchunks * self.dtype.itemsize`` counted one
    # element per *chunk*; capacity is element count times item size.
    return self.size * self.dtype.itemsize

async def _get_selection(
self,
indexer: Indexer,
Expand Down Expand Up @@ -735,6 +793,52 @@ def read_only(self) -> bool:
def fill_value(self) -> Any:
return self.metadata.fill_value

@property
@deprecated(
    "cdata_shape is transitional and will be removed in an early zarr-python v3 release."
)
def cdata_shape(self) -> ChunkCoords:
    """
    The shape of the chunk grid for this array.

    Returns
    -------
    ChunkCoords
        One entry per dimension: the number of chunks along that dimension,
        computed as ``ceildiv(shape, chunk_shape)``.
    """
    # strict=False tolerates a length mismatch between shape and chunks;
    # NOTE(review): presumably both always have one entry per dimension — confirm.
    return tuple(ceildiv(s, c) for s, c in zip(self.shape, self.chunks, strict=False))

@property
@deprecated("nchunks is transitional and will be removed in an early zarr-python v3 release.")
def nchunks(self) -> int:
    """
    The number of chunks in the stored representation of this array.

    Delegates to the wrapped async array's ``nchunks``.
    """
    return self._async_array.nchunks

@property
def _iter_chunks(self) -> Iterator[ChunkCoords]:
    """
    Produce an iterator over the coordinates of each chunk, in chunk grid space.

    Delegates to the wrapped async array's ``_iter_chunk_coords``.
    """
    # NOTE(review): the async counterpart is named ``_iter_chunk_coords``;
    # consider aligning these names in a follow-up for consistency.
    yield from self._async_array._iter_chunk_coords

@property
def nbytes(self) -> int:
    """
    The number of bytes that can be stored in this array.

    Delegates to the wrapped async array.
    """
    backing = self._async_array
    return backing.nbytes

@property
def _iter_chunk_keys(self) -> Iterator[str]:
    """
    Iterate over the storage key of each chunk, delegating to the wrapped
    async array.
    """
    for key in self._async_array._iter_chunk_keys:
        yield key

@property
def _iter_chunk_regions(self) -> Iterator[tuple[slice, ...]]:
    """
    Iterate over the regions of the array spanned by each chunk, delegating
    to the wrapped async array.
    """
    for region in self._async_array._iter_chunk_regions:
        yield region

def __array__(
self, dtype: npt.DTypeLike | None = None, copy: bool | None = None
) -> NDArrayLike:
Expand Down Expand Up @@ -2056,3 +2160,27 @@ def info(self) -> None:
return sync(
self._async_array.info(),
)


@deprecated(
    "nchunks_initialized is transitional and will be removed in an early zarr-python v3 release."
)
def nchunks_initialized(array: Array) -> int:
    """
    Count the chunks of ``array`` that have been written to storage.

    Returns
    -------
    int
        The length of ``chunks_initialized(array)``.
    """
    return len(chunks_initialized(array))


def chunks_initialized(array: Array) -> tuple[str, ...]:
    """
    Return the keys of all the chunks that exist in storage.

    Parameters
    ----------
    array : Array
        The array whose store is inspected.

    Returns
    -------
    tuple[str, ...]
        The chunk keys of ``array`` present in its store, in chunk grid
        iteration order.
    """
    # todo: make this compose with the underlying async iterator
    # Collect the store listing into a set so each membership test below is
    # O(1) instead of a linear scan. collect_aiterator already returns a
    # tuple, so no extra list() wrapper is needed.
    store_contents = set(
        collect_aiterator(array.store_path.store.list_prefix(prefix=array.store_path.path))
    )
    return tuple(key for key in array._iter_chunk_keys if key in store_contents)
28 changes: 26 additions & 2 deletions src/zarr/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
import math
import numbers
import operator
from collections.abc import Iterator, Sequence
from collections.abc import Iterable, Iterator, Sequence
from dataclasses import dataclass
from enum import Enum
from functools import reduce
from types import EllipsisType
from typing import (
TYPE_CHECKING,
Any,
NamedTuple,
Protocol,
TypeGuard,
Expand All @@ -27,6 +26,8 @@
from zarr.common import ChunkCoords, product

if TYPE_CHECKING:
from typing import Any

from zarr.array import Array
from zarr.chunk_grids import ChunkGrid

Expand Down Expand Up @@ -86,6 +87,29 @@ def ceildiv(a: float, b: float) -> int:
return math.ceil(a / b)


def iter_grid(shape: Iterable[int]) -> Iterator[ChunkCoords]:
    """
    Iterate over the elements of a grid.

    Given a grid shape as an iterable of ints, yield every coordinate tuple
    bounded by that shape, in lexicographic order.

    Parameters
    ----------
    shape: Iterable[int]
        The shape of the grid to iterate over.

    Examples
    --------
    >>> tuple(iter_grid((1,)))
    ((0,),)

    >>> tuple(iter_grid((2,3)))
    ((0, 0), (0, 1), (0, 2), (1, 0), (1, 1), (1, 2))
    """
    axis_ranges = [range(extent) for extent in shape]
    yield from itertools.product(*axis_ranges)


def is_integer(x: Any) -> TypeGuard[int]:
"""True if x is an integer (both pure Python or NumPy)."""
return isinstance(x, numbers.Integral) and not is_bool(x)
Expand Down
10 changes: 10 additions & 0 deletions src/zarr/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ def get_chunk_spec(
def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str:
pass

@abstractmethod
def decode_chunk_key(self, key: str) -> ChunkCoords:
    """
    Inverse of ``encode_chunk_key``: map a chunk's storage key back to its
    coordinates in chunk grid space.
    """
    pass

@abstractmethod
def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]:
pass
Expand Down Expand Up @@ -252,6 +256,9 @@ def get_chunk_spec(
def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str:
return self.chunk_key_encoding.encode_chunk_key(chunk_coords)

def decode_chunk_key(self, key: str) -> ChunkCoords:
    """
    Decode a chunk storage key into chunk grid coordinates, delegating to
    this metadata's ``chunk_key_encoding``.
    """
    return self.chunk_key_encoding.decode_chunk_key(key)

def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]:
def _json_convert(o: Any) -> Any:
if isinstance(o, np.dtype):
Expand Down Expand Up @@ -445,6 +452,9 @@ def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str:
chunk_identifier = self.dimension_separator.join(map(str, chunk_coords))
return "0" if chunk_identifier == "" else chunk_identifier

def decode_chunk_key(self, key: str) -> ChunkCoords:
    """
    Decode a v2 chunk key into integer chunk grid coordinates by splitting
    the key on ``dimension_separator``.

    NOTE(review): ``encode_chunk_key`` maps the empty coordinate tuple to
    "0", but decoding "0" yields ``(0,)`` — the round trip is not exact for
    zero-dimensional arrays; confirm whether that case can reach here.
    """
    return tuple(map(int, key.split(self.dimension_separator)))

def update_shape(self, shape: ChunkCoords) -> Self:
return replace(self, shape=shape)

Expand Down
6 changes: 1 addition & 5 deletions src/zarr/store/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,10 @@ async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
-------
AsyncGenerator[str, None]
"""
for p in (self.root / prefix).rglob("*"):
if p.is_file():
yield str(p)

to_strip = str(self.root) + "/"
for p in (self.root / prefix).rglob("*"):
if p.is_file():
yield str(p).replace(to_strip, "")
yield str(p).removeprefix(to_strip)

async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
"""
Expand Down
2 changes: 1 addition & 1 deletion src/zarr/store/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async def list(self) -> AsyncGenerator[str, None]:
async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
    """
    Yield the suffix, relative to ``prefix``, of every key in the store that
    starts with ``prefix``.

    Parameters
    ----------
    prefix : str
        The key prefix to filter on.
    """
    # Each matching key is yielded exactly once, with the prefix stripped —
    # consistent with the local and remote store implementations.
    for key in self._store_dict:
        if key.startswith(prefix):
            yield key.removeprefix(prefix)

async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
if prefix.endswith("/"):
Expand Down
5 changes: 3 additions & 2 deletions src/zarr/store/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,5 +205,6 @@ async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
yield onefile

async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
    """
    Recursively list all keys under ``prefix``, yielding each key relative to
    ``self.path`` joined with ``prefix``.

    Parameters
    ----------
    prefix : str
        The key prefix to list under, relative to this store's root path.
    """
    # _find recurses through the remote filesystem; strip the absolute
    # root-plus-prefix so yielded keys are store-relative, matching the
    # other store implementations.
    find_str = "/".join([self.path, prefix])
    for onefile in await self._fs._find(find_str):
        yield onefile.removeprefix(find_str)
17 changes: 17 additions & 0 deletions src/zarr/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,23 @@ def _get_loop() -> asyncio.AbstractEventLoop:
return loop[0]


async def _collect_aiterator(data: AsyncIterator[T]) -> tuple[T, ...]:
    """
    Drain an async iterator, returning every yielded item as a tuple.
    """
    return tuple([item async for item in data])


def collect_aiterator(data: AsyncIterator[T]) -> tuple[T, ...]:
    """
    Synchronously collect an entire async iterator into a tuple.

    Blocks the calling thread via ``sync`` until the iterator is exhausted.
    """
    return sync(_collect_aiterator(data))


class SyncMixin:
def _sync(self, coroutine: Coroutine[Any, Any, T]) -> T:
# TODO: refactor this to to take *args and **kwargs and pass those to the method
Expand Down
Loading