Narrow JSON type, ensure that to_dict always returns a dict, and v2 filter / compressor parsing #2179

Merged 6 commits on Sep 17, 2024
18 changes: 4 additions & 14 deletions src/zarr/abc/codec.py
@@ -17,7 +17,6 @@
 from zarr.abc.store import ByteGetter, ByteSetter
 from zarr.core.array_spec import ArraySpec
 from zarr.core.chunk_grids import ChunkGrid
-from zarr.core.common import JSON
 from zarr.core.indexing import SelectorTuple
 
 __all__ = [
@@ -242,7 +241,7 @@ async def encode_partial(
 )
 
 
-class CodecPipeline(Metadata):
+class CodecPipeline:
     """Base class for implementing CodecPipeline.
     A CodecPipeline implements the read and write paths for chunk data.
     On the read path, it is responsible for fetching chunks from a store (via ByteGetter),
@@ -266,12 +265,12 @@ def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
 
     @classmethod
     @abstractmethod
-    def from_list(cls, codecs: Iterable[Codec]) -> Self:
-        """Creates a codec pipeline from a list of codecs.
+    def from_codecs(cls, codecs: Iterable[Codec]) -> Self:
+        """Creates a codec pipeline from an iterable of codecs.
 
         Parameters
         ----------
-        codecs : list[Codec]
+        codecs : Iterable[Codec]
 
         Returns
         -------
@@ -402,15 +401,6 @@ async def write(
         """
         ...
 
-    @classmethod
-    def from_dict(cls, data: Iterable[JSON | Codec]) -> Self:
-        """
-        Create an instance of the model from a dictionary
-        """
-        ...
-
-        return cls(**data)
-
 
 async def _batching_helper(
     func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
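Note: with `from_dict` gone and `CodecPipeline` no longer a `Metadata` subclass, pipelines are built only from instantiated codecs. A minimal sketch of the calling side, assuming the default pipeline is registered and these codec import paths (the codec choices are arbitrary):

```python
from zarr.codecs import BytesCodec, TransposeCodec
from zarr.registry import get_pipeline_class

# from_codecs accepts any iterable of Codec instances, including a
# generator; no JSON dicts are parsed at this layer anymore.
pipeline = get_pipeline_class().from_codecs(
    codec for codec in (TransposeCodec(order=(1, 0)), BytesCodec())
)
```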
2 changes: 1 addition & 1 deletion src/zarr/abc/metadata.py
@@ -15,7 +15,7 @@
 
 @dataclass(frozen=True)
 class Metadata:
-    def to_dict(self) -> JSON:
+    def to_dict(self) -> dict[str, JSON]:
         """
         Recursively serialize this model to a dictionary.
         This method inspects the fields of self and calls `x.to_dict()` for any fields that
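Narrowing the return type from `JSON` to `dict[str, JSON]` means callers can index the serialized metadata without casts; a small illustration (codec choice arbitrary):

```python
from zarr.codecs import BytesCodec

d = BytesCodec().to_dict()
# Under the old `-> JSON` annotation this indexing needed a cast or an
# isinstance check to satisfy a type checker; now it checks directly.
name = d["name"]
```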
11 changes: 5 additions & 6 deletions src/zarr/codecs/_v2.py
@@ -67,7 +67,7 @@ def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec)
 
 @dataclass(frozen=True)
 class V2Filters(ArrayArrayCodec):
-    filters: list[dict[str, JSON]]
+    filters: tuple[numcodecs.abc.Codec, ...] | None
 
     is_fixed_size = False
 
@@ -79,8 +79,7 @@ async def _decode_single(
         chunk_ndarray = chunk_array.as_ndarray_like()
         # apply filters in reverse order
         if self.filters is not None:
-            for filter_metadata in self.filters[::-1]:
-                filter = numcodecs.get_codec(filter_metadata)
+            for filter in self.filters[::-1]:
                 chunk_ndarray = await to_thread(filter.decode, chunk_ndarray)
 
         # ensure correct chunk shape
@@ -99,9 +98,9 @@ async def _encode_single(
     ) -> NDBuffer | None:
         chunk_ndarray = chunk_array.as_ndarray_like().ravel(order=chunk_spec.order)
 
-        for filter_metadata in self.filters:
-            filter = numcodecs.get_codec(filter_metadata)
-            chunk_ndarray = await to_thread(filter.encode, chunk_ndarray)
+        if self.filters is not None:
+            for filter in self.filters:
+                chunk_ndarray = await to_thread(filter.encode, chunk_ndarray)
 
         return get_ndbuffer_class().from_ndarray_like(chunk_ndarray)
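`V2Filters` now carries instantiated `numcodecs` codecs rather than raw config dicts, so `numcodecs.get_codec` is no longer re-run on every chunk (and the encode path gains the `None` check the decode path already had). A sketch of construction under the new field type:

```python
import numcodecs

from zarr.codecs._v2 import V2Filters

# Previously: V2Filters([{"id": "delta", "dtype": "<i4"}]), with each
# encode/decode call re-parsing the dicts via numcodecs.get_codec.
# Now the codecs are parsed once, up front:
filters = V2Filters(filters=(numcodecs.Delta(dtype="<i4"),))
```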
4 changes: 2 additions & 2 deletions src/zarr/codecs/blosc.py
@@ -127,9 +127,9 @@ def to_dict(self) -> dict[str, JSON]:
             "name": "blosc",
             "configuration": {
                 "typesize": self.typesize,
-                "cname": self.cname,
+                "cname": self.cname.value,
                 "clevel": self.clevel,
-                "shuffle": self.shuffle,
+                "shuffle": self.shuffle.value,
                 "blocksize": self.blocksize,
             },
         }
2 changes: 1 addition & 1 deletion src/zarr/codecs/bytes.py
@@ -53,7 +53,7 @@ def to_dict(self) -> dict[str, JSON]:
         if self.endian is None:
             return {"name": "bytes"}
         else:
-            return {"name": "bytes", "configuration": {"endian": self.endian}}
+            return {"name": "bytes", "configuration": {"endian": self.endian.value}}
 
     def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
         if array_spec.dtype.itemsize == 0:
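The blosc and bytes fixes address the same bug: `to_dict` was leaking raw `Enum` members into the metadata dict, which no longer satisfies the narrowed `JSON` type. A minimal illustration, assuming `BytesCodec` accepts a plain string for `endian` (it is parsed through the same enum machinery):

```python
from zarr.codecs import BytesCodec

d = BytesCodec(endian="little").to_dict()
# The Enum member Endian.little is not itself JSON-serializable;
# .value emits the plain string.
assert d == {"name": "bytes", "configuration": {"endian": "little"}}
```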
30 changes: 6 additions & 24 deletions src/zarr/codecs/pipeline.py
@@ -1,6 +1,5 @@
 from __future__ import annotations
 
-from collections.abc import Iterable, Iterator
 from dataclasses import dataclass
 from itertools import islice, pairwise
 from typing import TYPE_CHECKING, Any, TypeVar
@@ -15,12 +14,14 @@
     Codec,
     CodecPipeline,
 )
-from zarr.core.common import JSON, ChunkCoords, concurrent_map, parse_named_configuration
+from zarr.core.common import ChunkCoords, concurrent_map
 from zarr.core.config import config
 from zarr.core.indexing import SelectorTuple, is_scalar, is_total_slice
-from zarr.registry import get_codec_class, register_pipeline
+from zarr.registry import register_pipeline
 
 if TYPE_CHECKING:
+    from collections.abc import Iterable, Iterator
+
     import numpy as np
     from typing_extensions import Self
 
@@ -68,30 +69,11 @@ class BatchedCodecPipeline(CodecPipeline):
     bytes_bytes_codecs: tuple[BytesBytesCodec, ...]
     batch_size: int
 
-    @classmethod
-    def from_dict(cls, data: Iterable[JSON | Codec], *, batch_size: int | None = None) -> Self:
-        out: list[Codec] = []
-        if not isinstance(data, Iterable):
-            raise TypeError(f"Expected iterable, got {type(data)}")
-
-        for c in data:
-            if isinstance(
-                c, ArrayArrayCodec | ArrayBytesCodec | BytesBytesCodec
-            ):  # Can't use Codec here because of mypy limitation
-                out.append(c)
-            else:
-                name_parsed, _ = parse_named_configuration(c, require_configuration=False)
-                out.append(get_codec_class(name_parsed).from_dict(c))  # type: ignore[arg-type]
-        return cls.from_list(out, batch_size=batch_size)
-
-    def to_dict(self) -> JSON:
-        return [c.to_dict() for c in self]
-
     def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
-        return type(self).from_list([c.evolve_from_array_spec(array_spec=array_spec) for c in self])
+        return type(self).from_codecs(c.evolve_from_array_spec(array_spec=array_spec) for c in self)
 
     @classmethod
-    def from_list(cls, codecs: Iterable[Codec], *, batch_size: int | None = None) -> Self:
+    def from_codecs(cls, codecs: Iterable[Codec], *, batch_size: int | None = None) -> Self:
         array_array_codecs, array_bytes_codec, bytes_bytes_codecs = codecs_from_list(codecs)
 
         return cls(
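`from_codecs` (the renamed `from_list`) still partitions the input into array-to-array, array-to-bytes, and bytes-to-bytes stages via `codecs_from_list`; a usage sketch, with `batch_size=None` assumed to fall back to the global config:

```python
from zarr.codecs import BytesCodec
from zarr.codecs.pipeline import BatchedCodecPipeline

# Build a pipeline directly from codec instances; batch_size bounds how
# many chunks each encode/decode batch processes at once.
pipeline = BatchedCodecPipeline.from_codecs([BytesCodec()], batch_size=4)
```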
20 changes: 10 additions & 10 deletions src/zarr/codecs/sharding.py
@@ -68,7 +68,7 @@ class ShardingCodecIndexLocation(Enum):
     end = "end"
 
 
-def parse_index_location(data: JSON) -> ShardingCodecIndexLocation:
+def parse_index_location(data: object) -> ShardingCodecIndexLocation:
     return parse_enum(data, ShardingCodecIndexLocation)
 
 
@@ -333,7 +333,7 @@ def __init__(
         chunk_shape: ChunkCoordsLike,
         codecs: Iterable[Codec | dict[str, JSON]] = (BytesCodec(),),
         index_codecs: Iterable[Codec | dict[str, JSON]] = (BytesCodec(), Crc32cCodec()),
-        index_location: ShardingCodecIndexLocation = ShardingCodecIndexLocation.end,
+        index_location: ShardingCodecIndexLocation | str = ShardingCodecIndexLocation.end,
     ) -> None:
         chunk_shape_parsed = parse_shapelike(chunk_shape)
         codecs_parsed = parse_codecs(codecs)
@@ -373,16 +373,16 @@ def from_dict(cls, data: dict[str, JSON]) -> Self:
 
     @property
     def codec_pipeline(self) -> CodecPipeline:
-        return get_pipeline_class().from_list(self.codecs)
+        return get_pipeline_class().from_codecs(self.codecs)
 
     def to_dict(self) -> dict[str, JSON]:
         return {
             "name": "sharding_indexed",
             "configuration": {
-                "chunk_shape": list(self.chunk_shape),
-                "codecs": [s.to_dict() for s in self.codecs],
-                "index_codecs": [s.to_dict() for s in self.index_codecs],
-                "index_location": self.index_location,
+                "chunk_shape": self.chunk_shape,
+                "codecs": tuple([s.to_dict() for s in self.codecs]),
+                "index_codecs": tuple([s.to_dict() for s in self.index_codecs]),
+                "index_location": self.index_location.value,
             },
         }
 
@@ -620,7 +620,7 @@ async def _decode_shard_index(
         index_array = next(
             iter(
                 await get_pipeline_class()
-                .from_list(self.index_codecs)
+                .from_codecs(self.index_codecs)
                 .decode(
                     [(index_bytes, self._get_index_chunk_spec(chunks_per_shard))],
                 )
@@ -633,7 +633,7 @@ async def _encode_shard_index(self, index: _ShardIndex) -> Buffer:
         index_bytes = next(
            iter(
                 await get_pipeline_class()
-                .from_list(self.index_codecs)
+                .from_codecs(self.index_codecs)
                 .encode(
                     [
                         (
@@ -651,7 +651,7 @@ async def _encode_shard_index(self, index: _ShardIndex) -> Buffer:
     def _shard_index_size(self, chunks_per_shard: ChunkCoords) -> int:
         return (
             get_pipeline_class()
-            .from_list(self.index_codecs)
+            .from_codecs(self.index_codecs)
             .compute_encoded_size(
                 16 * product(chunks_per_shard), self._get_index_chunk_spec(chunks_per_shard)
             )
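Widening `index_location` to `ShardingCodecIndexLocation | str` matches what the constructor already handles, since `parse_index_location` routes through `parse_enum`; for example:

```python
from zarr.codecs import ShardingCodec

# A plain string normalizes to ShardingCodecIndexLocation, and to_dict
# now serializes the enum back out as its .value string.
codec = ShardingCodec(chunk_shape=(32, 32), index_location="start")
assert codec.to_dict()["configuration"]["index_location"] == "start"
```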
2 changes: 1 addition & 1 deletion src/zarr/codecs/transpose.py
@@ -45,7 +45,7 @@ def from_dict(cls, data: dict[str, JSON]) -> Self:
         return cls(**configuration_parsed)  # type: ignore[arg-type]
 
     def to_dict(self) -> dict[str, JSON]:
-        return {"name": "transpose", "configuration": {"order": list(self.order)}}
+        return {"name": "transpose", "configuration": {"order": tuple(self.order)}}
 
     def validate(self, shape: tuple[int, ...], dtype: np.dtype[Any], chunk_grid: ChunkGrid) -> None:
         if len(self.order) != len(shape):
20 changes: 6 additions & 14 deletions src/zarr/core/array.py
@@ -87,10 +87,10 @@ def parse_array_metadata(data: Any) -> ArrayV2Metadata | ArrayV3Metadata:
 
 def create_codec_pipeline(metadata: ArrayV2Metadata | ArrayV3Metadata) -> CodecPipeline:
     if isinstance(metadata, ArrayV3Metadata):
-        return get_pipeline_class().from_list(metadata.codecs)
+        return get_pipeline_class().from_codecs(metadata.codecs)
     elif isinstance(metadata, ArrayV2Metadata):
-        return get_pipeline_class().from_list(
-            [V2Filters(metadata.filters or []), V2Compressor(metadata.compressor)]
+        return get_pipeline_class().from_codecs(
+            [V2Filters(metadata.filters), V2Compressor(metadata.compressor)]
         )
     else:
         raise TypeError
@@ -299,8 +299,6 @@ async def _create_v2(
         attributes: dict[str, JSON] | None = None,
         exists_ok: bool = False,
     ) -> AsyncArray:
-        import numcodecs
-
         if not exists_ok:
             await ensure_no_existing_node(store_path, zarr_format=2)
         if order is None:
@@ -315,15 +313,9 @@
             chunks=chunks,
             order=order,
             dimension_separator=dimension_separator,
-            fill_value=0 if fill_value is None else fill_value,
-            compressor=(
-                numcodecs.get_codec(compressor).get_config() if compressor is not None else None
-            ),
-            filters=(
-                [numcodecs.get_codec(filter).get_config() for filter in filters]
-                if filters is not None
-                else None
-            ),
+            fill_value=fill_value,
+            compressor=compressor,
+            filters=filters,
             attributes=attributes,
         )
         array = cls(metadata=metadata, store_path=store_path)

Review comments on the `fill_value` line:

Contributor: Restoring the `0 if fill_value is None else fill_value` does fix the failure in tests/v3/test_array.py::test_serializable_sync_array. But that said, I like the change here... Setting a default fill value here seems wrong.

Contributor: Oh, but this is just for v2 metadata, which is allowed to be None. Maybe this isn't the wrong spot to do that.
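With the inline `numcodecs.get_codec` calls removed, `_create_v2` now passes `compressor` and `filters` straight through, so normalizing them presumably happens when `ArrayV2Metadata` is constructed. The helper below is a hypothetical sketch of that normalization, not code from this PR:

```python
from collections.abc import Iterable

import numcodecs
import numcodecs.abc

def _parse_filters(
    filters: Iterable[numcodecs.abc.Codec | dict[str, object]] | None,
) -> tuple[numcodecs.abc.Codec, ...] | None:
    # Hypothetical: accept codec instances or v2-style config dicts and
    # normalize once, at metadata-construction time.
    if filters is None:
        return None
    return tuple(
        f if isinstance(f, numcodecs.abc.Codec) else numcodecs.get_codec(f)
        for f in filters
    )

# Both spellings normalize to the same tuple of codec instances:
assert _parse_filters([{"id": "zlib", "level": 1}]) == _parse_filters([numcodecs.Zlib(level=1)])
```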
6 changes: 3 additions & 3 deletions src/zarr/core/common.py
@@ -4,7 +4,7 @@
 import contextvars
 import functools
 import operator
-from collections.abc import Iterable
+from collections.abc import Iterable, Mapping
 from enum import Enum
 from typing import (
     TYPE_CHECKING,
@@ -32,7 +32,7 @@
 ChunkCoords = tuple[int, ...]
 ChunkCoordsLike = Iterable[int]
 ZarrFormat = Literal[2, 3]
-JSON = None | str | int | float | Enum | dict[str, "JSON"] | list["JSON"] | tuple["JSON", ...]
+JSON = None | str | int | float | Mapping[str, "JSON"] | tuple["JSON", ...]
 MemoryOrder = Literal["C", "F"]
 AccessModeLiteral = Literal["r", "r+", "a", "w", "w-"]
 
@@ -80,7 +80,7 @@ def enum_names(enum: type[E]) -> Iterator[str]:
         yield item.name
 
 
-def parse_enum(data: JSON, cls: type[E]) -> E:
+def parse_enum(data: object, cls: type[E]) -> E:
     if isinstance(data, cls):
         return data
     if not isinstance(data, str):
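The narrowed alias drops `Enum` (members must now be serialized via `.value`, as in the blosc, bytes, and sharding changes above) and swaps mutable `dict`/`list` for read-only `Mapping` and `tuple`. A short sketch of the consequences; the `Endian` import location is an assumption:

```python
from zarr.codecs.bytes import Endian  # assumed location of the Endian enum
from zarr.core.common import JSON, parse_enum

good: JSON = {"order": (2, 0, 1)}  # Mapping plus tuple: still JSON
bad: JSON = [2, 0, 1]  # mypy error: list was dropped from the union

# parse_enum now takes `object`, so callers need not pretend arbitrary
# input is already JSON before validating it:
assert parse_enum("little", Endian) is Endian.little
```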