From f8cabaa6ddc7642d4c10a2c6ef3fd3c9cc7b2e58 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Wed, 17 Nov 2021 12:33:13 +0100 Subject: [PATCH 01/12] initial sharding prototype --- zarr/_storage/sharded_store.py | 102 +++++++++++++++++++++++++++++++++ zarr/_storage/store.py | 1 + zarr/core.py | 37 ++++++++++-- zarr/creation.py | 6 +- zarr/meta.py | 8 ++- zarr/storage.py | 10 +++- zarr/util.py | 33 +++++++++++ 7 files changed, 186 insertions(+), 11 deletions(-) create mode 100644 zarr/_storage/sharded_store.py diff --git a/zarr/_storage/sharded_store.py b/zarr/_storage/sharded_store.py new file mode 100644 index 0000000000..440ec20a2c --- /dev/null +++ b/zarr/_storage/sharded_store.py @@ -0,0 +1,102 @@ +from functools import reduce +from itertools import product +from typing import Any, Iterable, Iterator, Optional, Tuple + +from zarr._storage.store import BaseStore, Store +from zarr.storage import StoreLike, array_meta_key, attrs_key, group_meta_key + + +def _cum_prod(x: Iterable[int]) -> Iterable[int]: + prod = 1 + yield prod + for i in x[:-1]: + prod *= i + yield prod + + +class ShardedStore(Store): + """This class should not be used directly, + but is added to an Array as a wrapper when needed automatically.""" + + def __init__( + self, store: + StoreLike, + shards: Tuple[int, ...], + dimension_separator: str, + chunk_has_constant_size: bool, + fill_value: bytes, + value_len: Optional[int], + ) -> None: + self._store: BaseStore = BaseStore._ensure_store(store) + self._shards = shards + # This defines C/F-order + self._shards_cumprod = tuple(_cum_prod(shards)) + self._num_chunks_per_shard = reduce(lambda x, y: x*y, shards, 1) + self._dimension_separator = dimension_separator + # TODO: add jumptable for compressed data + assert not chunk_has_constant_size, "Currently only uncompressed data can be used." 
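For illustration only (not part of the patch): `_cum_prod` yields the running product of the preceding shard dimensions, which the store uses as strides to flatten a chunk's position inside a shard. A minimal standalone sketch of that logic:

```python
from itertools import product

def cum_prod(x):
    # running product of the preceding entries: 1, x[0], x[0]*x[1], ...
    prod = 1
    yield prod
    for i in x[:-1]:
        prod *= i
        yield prod

shards = (2, 3)
strides = tuple(cum_prod(shards))  # (1, 2): the first dimension varies fastest
# every in-shard offset maps to a unique flat index in range(2 * 3)
for offset in product(*(range(s) for s in shards)):
    print(offset, "->", sum(i * j for i, j in zip(offset, strides)))
```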
+ self._chunk_has_constant_size = chunk_has_constant_size + if not chunk_has_constant_size: + assert value_len is not None + self._fill_chunk = fill_value * value_len + else: + self._fill_chunk = None + + # TODO: add warnings for ineffective reads/writes: + # * warn if partial reads are not available + # * optionally warn on unaligned writes if no partial writes are available + + def __key_to_sharded__(self, key: str) -> Tuple[str, int]: + # TODO: allow to be in a group (aka only use last parts for dimensions) + subkeys = map(int, key.split(self._dimension_separator)) + + shard_tuple, index_tuple = zip(*((subkey // shard_i, subkey % shard_i) for subkey, shard_i in zip(subkeys, self._shards))) + shard_key = self._dimension_separator.join(map(str, shard_tuple)) + index = sum(i * j for i, j in zip(index_tuple, self._shards_cumprod)) + return shard_key, index + + def __get_chunk_slice__(self, shard_key: str, shard_index: int) -> Tuple[int, int]: + # TODO: here we would use the jumptable for compression + start = shard_index * len(self._fill_chunk) + return slice(start, start + len(self._fill_chunk)) + + def __getitem__(self, key: str) -> bytes: + shard_key, shard_index = self.__key_to_sharded__(key) + chunk_slice = self.__get_chunk_slice__(shard_key, shard_index) + # TODO use partial reads if available + full_shard_value = self._store[shard_key] + return full_shard_value[chunk_slice] + + def __setitem__(self, key: str, value: bytes) -> None: + shard_key, shard_index = self.__key_to_sharded__(key) + if shard_key in self._store: + full_shard_value = bytearray(self._store[shard_key]) + else: + full_shard_value = bytearray(self._fill_chunk * self._num_chunks_per_shard) + chunk_slice = self.__get_chunk_slice__(shard_key, shard_index) + # TODO use partial writes if available + full_shard_value[chunk_slice] = value + self._store[shard_key] = full_shard_value + + def __delitem__(self, key) -> None: + # TODO not implemented yet + # For uncompressed chunks, deleting the "last" chunk might need to be detected. + raise NotImplementedError("Deletion is not yet implemented") + + def __iter__(self) -> Iterator[str]: + for shard_key in self._store.__iter__(): + if any(shard_key.endswith(i) for i in (array_meta_key, group_meta_key, attrs_key)): + yield shard_key + else: + # TODO: allow to be in a group (aka only use last parts for dimensions) + subkeys = tuple(map(int, shard_key.split(self._dimension_separator))) + for offset in product(*(range(i) for i in self._shards)): + original_key = (subkeys_i * shards_i + offset_i for subkeys_i, offset_i, shards_i in zip(subkeys, offset, self._shards)) + yield self._dimension_separator.join(map(str, original_key)) + + def __len__(self) -> int: + return sum(1 for _ in self.keys()) + + # TODO: For efficient reads and writes, we need to implement + # getitems, setitems & delitems + # and combine writes/reads/deletions to the same shard. diff --git a/zarr/_storage/store.py b/zarr/_storage/store.py index 6f5bf78e28..6714e729f7 100644 --- a/zarr/_storage/store.py +++ b/zarr/_storage/store.py @@ -110,6 +110,7 @@ def _ensure_store(store: Any): class Store(BaseStore): + # TODO: document methods which allow optimizations, e.g. delitems, setitems, getitems, listdir, … """Abstract store class used by implementations following the Zarr v2 spec. Adds public `listdir`, `rename`, and `rmdir` methods on top of BaseStore. 
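To make the key translation above concrete, here is a self-contained re-implementation of `__key_to_sharded__` with a worked example (assumes `.` as the dimension separator and the strides produced by `_cum_prod`; illustrative only):

```python
def key_to_sharded(key, shards, strides, sep="."):
    # split the chunk key into per-dimension indices, then divide each
    # dimension by the shard size: quotient -> shard, remainder -> in-shard offset
    subkeys = map(int, key.split(sep))
    shard_tuple, index_tuple = zip(
        *((subkey // shard_i, subkey % shard_i) for subkey, shard_i in zip(subkeys, shards))
    )
    shard_key = sep.join(map(str, shard_tuple))
    index = sum(i * j for i, j in zip(index_tuple, strides))
    return shard_key, index

# chunk (5, 3) with 2x2 shards lands in shard (2, 1)
# at in-shard offset (1, 1), i.e. flat index 1*1 + 1*2 = 3
assert key_to_sharded("5.3", (2, 2), (1, 2)) == ("2.1", 3)
```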
diff --git a/zarr/core.py b/zarr/core.py
index d366139423..562e756077 100644
--- a/zarr/core.py
+++ b/zarr/core.py
@@ -10,6 +10,7 @@
 from numcodecs.compat import ensure_bytes, ensure_ndarray
 from collections.abc import MutableMapping
 
+from zarr._storage.sharded_store import ShardedStore
 from zarr.attrs import Attributes
 from zarr.codecs import AsType, get_codec
 
@@ -213,6 +214,7 @@ def _load_metadata_nosync(self):
         self._meta = meta
         self._shape = meta['shape']
         self._chunks = meta['chunks']
+        self._shards = meta.get('shards')
         self._dtype = meta['dtype']
         self._fill_value = meta['fill_value']
         self._order = meta['order']
@@ -264,7 +266,9 @@ def _flush_metadata_nosync(self):
             filters_config = None
         meta = dict(shape=self._shape, chunks=self._chunks, dtype=self._dtype,
                     compressor=compressor_config, fill_value=self._fill_value,
-                    order=self._order, filters=filters_config)
+                    order=self._order, filters=filters_config, shards=self._shards)
+        if self._shards is not None:
+            meta['shards'] = self._shards
         mkey = self._key_prefix + array_meta_key
         self._store[mkey] = self._store._metadata_class.encode_array_metadata(meta)
 
@@ -307,11 +311,26 @@ def read_only(self, value):
 
     @property
     def chunk_store(self):
-        """A MutableMapping providing the underlying storage for array chunks."""
         if self._chunk_store is None:
-            return self._store
+            chunk_store = self._store
+        else:
+            chunk_store = self._chunk_store
+        """A MutableMapping providing the underlying storage for array chunks."""
+        if self._shards is None:
+            return chunk_store
         else:
-            return self._chunk_store
+            try:
+                return self._cached_sharded_store
+            except AttributeError:
+                self._cached_sharded_store = BaseStore._ensure_store(ShardedStore(
+                    chunk_store,
+                    shards=self._shards,
+                    dimension_separator=self._dimension_separator,
+                    chunk_has_constant_size = self._compressor is not None,  # TODO add exceptions, e.g. dtype==object
+                    fill_value = np.full(1, fill_value=self._fill_value or 0, dtype=self._dtype).tobytes(),
+                    value_len = reduce(operator.mul, self._chunks, 1),
+                ))
+                return self._cached_sharded_store
 
     @property
     def shape(self):
@@ -332,6 +351,12 @@ def chunks(self):
         chunk of the array."""
         return self._chunks
 
+    @property
+    def shards(self):
+        """A tuple of integers describing the number of chunks in each shard
+        of the array."""
+        return self._shards
+
     @property
     def dtype(self):
         """The NumPy data type."""
@@ -1899,7 +1924,7 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
             and hasattr(self._compressor, "decode_partial")
             and not fields
             and self.dtype != object
-            and hasattr(self.chunk_store, "getitems")
+            and hasattr(self.chunk_store, "getitems")  # TODO: this should rather check for read_block or similar
         ):
             partial_read_decode = True
             cdatas = {
@@ -2236,6 +2261,7 @@ def digest(self, hashname="sha1"):
 
         h = hashlib.new(hashname)
 
+        # TODO: operate on shards here if available:
         for i in itertools.product(*[range(s) for s in self.cdata_shape]):
             h.update(self.chunk_store.get(self._chunk_key(i), b""))
 
@@ -2362,6 +2388,7 @@ def _resize_nosync(self, *args):
                 except KeyError:
                     # chunk not initialized
                     pass
+        # TODO: collect all chunks to delete and use _chunk_delitems
 
     def append(self, data, axis=0):
         """Append `data` to `axis`.
diff --git a/zarr/creation.py b/zarr/creation.py index 244a9b080c..b669c4241b 100644 --- a/zarr/creation.py +++ b/zarr/creation.py @@ -1,3 +1,4 @@ +from typing import Optional, Tuple, Union from warnings import warn import numpy as np @@ -19,7 +20,8 @@ def create(shape, chunks=True, dtype=None, compressor='default', fill_value=0, order='C', store=None, synchronizer=None, overwrite=False, path=None, chunk_store=None, filters=None, cache_metadata=True, cache_attrs=True, read_only=False, - object_codec=None, dimension_separator=None, write_empty_chunks=True, **kwargs): + object_codec=None, dimension_separator=None, write_empty_chunks=True, + shards: Union[int, Tuple[int, ...], None]=None, **kwargs): """Create an array. Parameters @@ -145,7 +147,7 @@ def create(shape, chunks=True, dtype=None, compressor='default', init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor, fill_value=fill_value, order=order, overwrite=overwrite, path=path, chunk_store=chunk_store, filters=filters, object_codec=object_codec, - dimension_separator=dimension_separator) + dimension_separator=dimension_separator, shards=shards) # instantiate array z = Array(store, path=path, chunk_store=chunk_store, synchronizer=synchronizer, diff --git a/zarr/meta.py b/zarr/meta.py index c292b09a14..964858de43 100644 --- a/zarr/meta.py +++ b/zarr/meta.py @@ -51,6 +51,7 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, A object_codec = None dimension_separator = meta.get("dimension_separator", None) + shards = meta.get("shards", None) fill_value = cls.decode_fill_value(meta['fill_value'], dtype, object_codec) meta = dict( zarr_format=meta["zarr_format"], @@ -64,6 +65,8 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, A ) if dimension_separator: meta['dimension_separator'] = dimension_separator + if shards: + meta['shards'] = tuple(shards) except Exception as e: raise MetadataError("error decoding metadata") from e else: @@ -77,6 +80,7 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes: dtype, sdshape = dtype.subdtype dimension_separator = meta.get("dimension_separator") + shards = meta.get("shards") if dtype.hasobject: import numcodecs object_codec = numcodecs.get_codec(meta['filters'][0]) @@ -96,8 +100,8 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes: if dimension_separator: meta['dimension_separator'] = dimension_separator - if dimension_separator: - meta["dimension_separator"] = dimension_separator + if shards: + meta['shards'] = shards return json_dumps(meta) diff --git a/zarr/storage.py b/zarr/storage.py index 7170eeaf23..e5e07cc235 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -54,7 +54,7 @@ from zarr.util import (buffer_size, json_loads, nolock, normalize_chunks, normalize_dimension_separator, normalize_dtype, normalize_fill_value, normalize_order, - normalize_shape, normalize_storage_path, retry_call) + normalize_shape, normalize_shards, normalize_storage_path, retry_call) from zarr._storage.absstore import ABSStore # noqa: F401 from zarr._storage.store import (_listdir_from_keys, @@ -236,6 +236,7 @@ def init_array( filters=None, object_codec=None, dimension_separator=None, + shards: Union[int, Tuple[int, ...], None]=None, ): """Initialize an array store with the given configuration. Note that this is a low-level function and there should be no need to call this directly from user code. 
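With the plumbing above, the new `shards` argument flows from `create` through `init_array` into the array metadata. A small usage sketch (path and values are illustrative; the test script added in the next patch does the same via `zarr.zeros`):

```python
import zarr

# group 2x2 chunks into one stored object per shard
z = zarr.create(
    shape=(20, 3), chunks=(3, 3), shards=(2, 2),
    store=zarr.DirectoryStore("data/example.zarr"),
    compressor=None, overwrite=True,
)
print(z.shards)  # (2, 2), read back from the "shards" key in .zarray
```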
@@ -353,7 +354,8 @@ def init_array( order=order, overwrite=overwrite, path=path, chunk_store=chunk_store, filters=filters, object_codec=object_codec, - dimension_separator=dimension_separator) + dimension_separator=dimension_separator, + shards=shards) def _init_array_metadata( @@ -370,6 +372,7 @@ def _init_array_metadata( filters=None, object_codec=None, dimension_separator=None, + shards:Union[int, Tuple[int, ...], None] = None, ): # guard conditions @@ -388,6 +391,7 @@ def _init_array_metadata( shape = normalize_shape(shape) + dtype.shape dtype = dtype.base chunks = normalize_chunks(chunks, shape, dtype.itemsize) + shards = normalize_shards(shards, shape) order = normalize_order(order) fill_value = normalize_fill_value(fill_value, dtype) @@ -445,6 +449,8 @@ def _init_array_metadata( compressor=compressor_config, fill_value=fill_value, order=order, filters=filters_config, dimension_separator=dimension_separator) + if shards is not None: + meta["shards"] = shards key = _path_to_prefix(path) + array_meta_key if hasattr(store, '_metadata_class'): store[key] = store._metadata_class.encode_array_metadata(meta) # type: ignore diff --git a/zarr/util.py b/zarr/util.py index d092ffe0de..738e4ae4eb 100644 --- a/zarr/util.py +++ b/zarr/util.py @@ -149,6 +149,38 @@ def normalize_chunks( return tuple(chunks) +def normalize_shards( + shards: Optional[Tuple[int, ...]], shape: Tuple[int, ...], +) -> Tuple[int, ...]: + """Convenience function to normalize the `shards` argument for an array + with the given `shape`.""" + + # N.B., expect shape already normalized + + if shards is None: + return None + + # handle 1D convenience form + if isinstance(shards, numbers.Integral): + shards = tuple(int(shards) for _ in shape) + + # handle bad dimensionality + if len(shards) > len(shape): + raise ValueError('too many dimensions in shards') + + # handle underspecified shards + if len(shards) < len(shape): + # assume single shards across remaining dimensions + shards += (1, ) * len(shape) - len(shards) + + # handle None or -1 in shards + if -1 in shards or None in shards: + shards = tuple(s if c == -1 or c is None else int(c) + for s, c in zip(shape, shards)) + + return tuple(shards) + + def normalize_dtype(dtype: Union[str, np.dtype], object_codec) -> Tuple[np.dtype, Any]: # convenience API for object arrays @@ -560,6 +592,7 @@ def __init__(self, store_key, chunk_store): # is it fsstore or an actual fsspec map object assert hasattr(self.chunk_store, "map") self.map = self.chunk_store.map + # maybe use partial_read here also self.fs = self.chunk_store.fs self.store_key = store_key self.buff = None From 8290f1e9f0525555dd0b8fef3094c858ad4b6a6e Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Wed, 17 Nov 2021 12:33:34 +0100 Subject: [PATCH 02/12] add small script to test chunking --- chunking_test.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 chunking_test.py diff --git a/chunking_test.py b/chunking_test.py new file mode 100644 index 0000000000..1cb9ea6eef --- /dev/null +++ b/chunking_test.py @@ -0,0 +1,24 @@ +import json +import os + +import zarr + +store = zarr.DirectoryStore("data/chunking_test.zarr") +z = zarr.zeros((20, 3), chunks=(3, 3), shards=(2, 2), store=store, overwrite=True, compressor=None) +z[...] 
= 42 +z[15, 1] = 389 +z[19, 2] = 1 +z[0, 1] = -4.2 + +print("ONDISK", sorted(os.listdir("data/chunking_test.zarr"))) +assert json.loads(store[".zarray"].decode()) ["shards"] == [2, 2] + +print("STORE", list(store)) +print("CHUNKSTORE (SHARDED)", list(z.chunk_store)) + +z_reopened = zarr.open("data/chunking_test.zarr") +assert z_reopened.shards == (2, 2) +assert z_reopened[15, 1] == 389 +assert z_reopened[19, 2] == 1 +assert z_reopened[0, 1] == -4.2 +assert z_reopened[0, 0] == 42 From a44b2e5802d4fe85859b55effa3d6f1b14431de8 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Wed, 17 Nov 2021 14:12:57 +0100 Subject: [PATCH 03/12] Update util.py --- zarr/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zarr/util.py b/zarr/util.py index 738e4ae4eb..b603edd1ef 100644 --- a/zarr/util.py +++ b/zarr/util.py @@ -592,7 +592,7 @@ def __init__(self, store_key, chunk_store): # is it fsstore or an actual fsspec map object assert hasattr(self.chunk_store, "map") self.map = self.chunk_store.map - # maybe use partial_read here also + # TODO maybe use partial_read here also self.fs = self.chunk_store.fs self.store_key = store_key self.buff = None From 97a9368ddb021cf01be5b7b1676c8801bdde5663 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Thu, 18 Nov 2021 14:24:44 +0100 Subject: [PATCH 04/12] implement feedback --- zarr/_storage/sharded_store.py | 31 +++++++++++++++++++------------ zarr/core.py | 31 +++++++++++++++++-------------- zarr/util.py | 4 ++-- 3 files changed, 38 insertions(+), 28 deletions(-) diff --git a/zarr/_storage/sharded_store.py b/zarr/_storage/sharded_store.py index 440ec20a2c..87206755b0 100644 --- a/zarr/_storage/sharded_store.py +++ b/zarr/_storage/sharded_store.py @@ -2,6 +2,8 @@ from itertools import product from typing import Any, Iterable, Iterator, Optional, Tuple +import numpy as np + from zarr._storage.store import BaseStore, Store from zarr.storage import StoreLike, array_meta_key, attrs_key, group_meta_key @@ -19,26 +21,28 @@ class ShardedStore(Store): but is added to an Array as a wrapper when needed automatically.""" def __init__( - self, store: - StoreLike, + self, + store: StoreLike, shards: Tuple[int, ...], dimension_separator: str, - chunk_has_constant_size: bool, - fill_value: bytes, - value_len: Optional[int], + are_chunks_compressed: bool, + dtype: np.dtype, + fill_value: Any, + chunk_size: int, ) -> None: self._store: BaseStore = BaseStore._ensure_store(store) self._shards = shards # This defines C/F-order - self._shards_cumprod = tuple(_cum_prod(shards)) + self._shard_strides = tuple(_cum_prod(shards)) self._num_chunks_per_shard = reduce(lambda x, y: x*y, shards, 1) self._dimension_separator = dimension_separator # TODO: add jumptable for compressed data - assert not chunk_has_constant_size, "Currently only uncompressed data can be used." + chunk_has_constant_size = not are_chunks_compressed and not dtype == object + assert chunk_has_constant_size, "Currently only uncompressed, fixed-length data can be used." 
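A back-of-envelope check for the test script above: with `shape=(20, 3)`, `chunks=(3, 3)` and `shards=(2, 2)`, the chunk grid is 7x1 and the shard grid 4x1, so exactly four shard objects should appear next to `.zarray` (matching the `ONDISK` listing recorded in a later commit):

```python
import math

shape, chunks, shards = (20, 3), (3, 3), (2, 2)
chunk_grid = tuple(math.ceil(s / c) for s, c in zip(shape, chunks))       # (7, 1)
shard_grid = tuple(math.ceil(g / s) for g, s in zip(chunk_grid, shards))  # (4, 1)
print([f"{i}.{j}" for i in range(shard_grid[0]) for j in range(shard_grid[1])])
# ['0.0', '1.0', '2.0', '3.0']
```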
self._chunk_has_constant_size = chunk_has_constant_size
-        if not chunk_has_constant_size:
-            assert value_len is not None
-            self._fill_chunk = fill_value * value_len
+        if chunk_has_constant_size:
+            binary_fill_value = np.full(1, fill_value=fill_value or 0, dtype=dtype).tobytes()
+            self._fill_chunk = binary_fill_value * chunk_size
         else:
             self._fill_chunk = None
 
@@ -52,11 +56,11 @@ def __key_to_sharded__(self, key: str) -> Tuple[str, int]:
 
         shard_tuple, index_tuple = zip(*((subkey // shard_i, subkey % shard_i) for subkey, shard_i in zip(subkeys, self._shards)))
         shard_key = self._dimension_separator.join(map(str, shard_tuple))
-        index = sum(i * j for i, j in zip(index_tuple, self._shards_cumprod))
+        index = sum(i * j for i, j in zip(index_tuple, self._shard_strides))
         return shard_key, index
 
     def __get_chunk_slice__(self, shard_key: str, shard_index: int) -> Tuple[int, int]:
-        # TODO: here we would use the jumptable for compression
+        # TODO: here we would use the jumptable for compression, which uses shard_key
         start = shard_index * len(self._fill_chunk)
         return slice(start, start + len(self._fill_chunk))
 
@@ -86,8 +90,11 @@ def __delitem__(self, key) -> None:
 
     def __iter__(self) -> Iterator[str]:
         for shard_key in self._store.__iter__():
             if any(shard_key.endswith(i) for i in (array_meta_key, group_meta_key, attrs_key)):
+                # Special keys such as ".zarray" are passed on as-is
                 yield shard_key
             else:
+                # For each shard key in the wrapped store, all corresponding chunks are yielded.
+                # TODO: For compressed chunks we might yield only the actually contained chunks by reading the jumptables.
                 # TODO: allow to be in a group (aka only use last parts for dimensions)
                 subkeys = tuple(map(int, shard_key.split(self._dimension_separator)))
                 for offset in product(*(range(i) for i in self._shards)):
diff --git a/zarr/core.py b/zarr/core.py
index 562e756077..01ba6a058e 100644
--- a/zarr/core.py
+++ b/zarr/core.py
@@ -5,6 +5,7 @@
 import operator
 import re
 from functools import reduce
+from typing import Optional, Tuple
 
 import numpy as np
 from numcodecs.compat import ensure_bytes, ensure_ndarray
@@ -192,6 +193,9 @@ def __init__(
         self._oindex = OIndex(self)
         self._vindex = VIndex(self)
 
+        # the sharded store is only initialized when needed
+        self._cached_sharded_store = None
+
     def _load_metadata(self):
         """(Re)load metadata from store."""
         if self._synchronizer is None:
@@ -264,11 +268,11 @@ def _flush_metadata_nosync(self):
             filters_config = [f.get_config() for f in self._filters]
         else:
             filters_config = None
+        # Possible (unrelated) bug:
+        # should the dimension_separator also be included in this dict?
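The lazily cached wrapper gives one array two key spaces: `z.store` holds one object per shard, while `z.chunk_store` (the `ShardedStore`) translates per-chunk keys. A hedged illustration, assuming this prototype branch is installed (the listings match what `chunking_test.py` prints in later commits):

```python
import zarr

store = zarr.DirectoryStore("data/views_test.zarr")  # illustrative path
z = zarr.zeros((20, 3), chunks=(3, 2), shards=(2, 2), store=store,
               overwrite=True, compressor=None)
z[...] = 42
print("STORE", sorted(store))               # '.zarray' plus shard keys '0.0' .. '3.0'
print("CHUNKSTORE", sorted(z.chunk_store))  # '.zarray' plus chunk keys '0.0' .. '6.1'
```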
meta = dict(shape=self._shape, chunks=self._chunks, dtype=self._dtype, compressor=compressor_config, fill_value=self._fill_value, order=self._order, filters=filters_config, shards=self._shards) - if self._shards is not None: - meta['shards'] = self._shards mkey = self._key_prefix + array_meta_key self._store[mkey] = self._store._metadata_class.encode_array_metadata(meta) @@ -311,26 +315,25 @@ def read_only(self, value): @property def chunk_store(self): + """A MutableMapping providing the underlying storage for array chunks.""" if self._chunk_store is None: chunk_store = self._store else: chunk_store = self._chunk_store - """A MutableMapping providing the underlying storage for array chunks.""" if self._shards is None: return chunk_store else: - try: - return self._cached_sharded_store - except AttributeError: - self._cached_sharded_store = BaseStore._ensure_store(ShardedStore( + if self._cached_sharded_store is None: + self._cached_sharded_store = ShardedStore( chunk_store, shards=self._shards, dimension_separator=self._dimension_separator, - chunk_has_constant_size = self._compressor is not None, # TODO add exceptions, e.g. dtype==object - fill_value = np.full(1, fill_value=self._fill_value or 0, dtype=self._dtype).tobytes(), - value_len = reduce(operator.mul, self._chunks, 1), - )) - return self._cached_sharded_store + are_chunks_compressed=self._compressor is not None, + dtype=self._dtype, + fill_value=self._fill_value or 0, + chunk_size=reduce(operator.mul, self._chunks, 1), + ) + return self._cached_sharded_store @property def shape(self): @@ -346,9 +349,9 @@ def shape(self, value): self.resize(value) @property - def chunks(self): + def chunks(self) -> Optional[Tuple[int, ...]]: """A tuple of integers describing the length of each dimension of a - chunk of the array.""" + chunk of the array, or None.""" return self._chunks @property diff --git a/zarr/util.py b/zarr/util.py index b603edd1ef..220e49cbd7 100644 --- a/zarr/util.py +++ b/zarr/util.py @@ -150,8 +150,8 @@ def normalize_chunks( def normalize_shards( - shards: Optional[Tuple[int, ...]], shape: Tuple[int, ...], -) -> Tuple[int, ...]: + shards: Optional[Tuple[Optional[int], ...]], shape: Tuple[int, ...], +) -> Optional[Tuple[int, ...]]: """Convenience function to normalize the `shards` argument for an array with the given `shape`.""" From 7e2768ac8f44a1cac32707a26ecbb49a1171a2aa Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Fri, 26 Nov 2021 13:44:43 +0100 Subject: [PATCH 05/12] make shard_format configurable, add bitmask for uncompressed chunks --- chunking_test.py | 9 +-- zarr/_storage/sharded_store.py | 119 ++++++++++++++++++++++----------- zarr/core.py | 14 ++-- zarr/creation.py | 4 +- zarr/meta.py | 7 +- zarr/storage.py | 6 +- 6 files changed, 107 insertions(+), 52 deletions(-) diff --git a/chunking_test.py b/chunking_test.py index 1cb9ea6eef..43491f677c 100644 --- a/chunking_test.py +++ b/chunking_test.py @@ -4,17 +4,18 @@ import zarr store = zarr.DirectoryStore("data/chunking_test.zarr") -z = zarr.zeros((20, 3), chunks=(3, 3), shards=(2, 2), store=store, overwrite=True, compressor=None) -z[...] 
= 42 +z = zarr.zeros((20, 3), chunks=(3, 2), shards=(2, 2), store=store, overwrite=True, compressor=None) +z[:10, :] = 42 z[15, 1] = 389 z[19, 2] = 1 z[0, 1] = -4.2 +print(store[".zarray"].decode()) print("ONDISK", sorted(os.listdir("data/chunking_test.zarr"))) assert json.loads(store[".zarray"].decode()) ["shards"] == [2, 2] -print("STORE", list(store)) -print("CHUNKSTORE (SHARDED)", list(z.chunk_store)) +print("STORE", sorted(store)) +print("CHUNKSTORE (SHARDED)", sorted(z.chunk_store)) z_reopened = zarr.open("data/chunking_test.zarr") assert z_reopened.shards == (2, 2) diff --git a/zarr/_storage/sharded_store.py b/zarr/_storage/sharded_store.py index 87206755b0..2857e738c7 100644 --- a/zarr/_storage/sharded_store.py +++ b/zarr/_storage/sharded_store.py @@ -1,6 +1,7 @@ +from collections import defaultdict from functools import reduce -from itertools import product -from typing import Any, Iterable, Iterator, Optional, Tuple +import math +from typing import Any, Dict, Iterable, Iterator, List, Tuple, Union import numpy as np @@ -16,7 +17,7 @@ def _cum_prod(x: Iterable[int]) -> Iterable[int]: yield prod -class ShardedStore(Store): +class MortonOrderShardedStore(Store): """This class should not be used directly, but is added to an Array as a wrapper when needed automatically.""" @@ -32,59 +33,97 @@ def __init__( ) -> None: self._store: BaseStore = BaseStore._ensure_store(store) self._shards = shards - # This defines C/F-order - self._shard_strides = tuple(_cum_prod(shards)) self._num_chunks_per_shard = reduce(lambda x, y: x*y, shards, 1) self._dimension_separator = dimension_separator - # TODO: add jumptable for compressed data + chunk_has_constant_size = not are_chunks_compressed and not dtype == object assert chunk_has_constant_size, "Currently only uncompressed, fixed-length data can be used." 
self._chunk_has_constant_size = chunk_has_constant_size if chunk_has_constant_size: binary_fill_value = np.full(1, fill_value=fill_value or 0, dtype=dtype).tobytes() self._fill_chunk = binary_fill_value * chunk_size - else: - self._fill_chunk = None + self._emtpy_meta = b"\x00" * math.ceil(self._num_chunks_per_shard / 8) + + # unused when using Morton order + self._shard_strides = tuple(_cum_prod(shards)) # TODO: add warnings for ineffective reads/writes: # * warn if partial reads are not available # * optionally warn on unaligned writes if no partial writes are available - - def __key_to_sharded__(self, key: str) -> Tuple[str, int]: + + def __get_meta__(self, shard_content: Union[bytes, bytearray]) -> int: + return int.from_bytes(shard_content[-len(self._emtpy_meta):], byteorder="big") + + def __set_meta__(self, shard_content: bytearray, meta: int) -> None: + shard_content[-len(self._emtpy_meta):] = meta.to_bytes(len(self._emtpy_meta), byteorder="big") + + # The following two methods define the order of the chunks in a shard + # TODO use morton order + def __chunk_key_to_shard_key_and_index__(self, chunk_key: str) -> Tuple[str, int]: # TODO: allow to be in a group (aka only use last parts for dimensions) - subkeys = map(int, key.split(self._dimension_separator)) + chunk_subkeys = map(int, chunk_key.split(self._dimension_separator)) - shard_tuple, index_tuple = zip(*((subkey // shard_i, subkey % shard_i) for subkey, shard_i in zip(subkeys, self._shards))) + shard_tuple, index_tuple = zip(*((subkey // shard_i, subkey % shard_i) for subkey, shard_i in zip(chunk_subkeys, self._shards))) shard_key = self._dimension_separator.join(map(str, shard_tuple)) index = sum(i * j for i, j in zip(index_tuple, self._shard_strides)) return shard_key, index - def __get_chunk_slice__(self, shard_key: str, shard_index: int) -> Tuple[int, int]: - # TODO: here we would use the jumptable for compression, which uses shard_key + def __shard_key_and_index_to_chunk_key__(self, shard_key_tuple: Tuple[int, ...], shard_index: int) -> str: + offset = tuple(shard_index % s2 // s1 for s1, s2 in zip(self._shard_strides, self._shard_strides[1:] + (self._num_chunks_per_shard,))) + original_key = (shard_key_i * shards_i + offset_i for shard_key_i, offset_i, shards_i in zip(shard_key_tuple, offset, self._shards)) + return self._dimension_separator.join(map(str, original_key)) + + def __keys_to_shard_groups__(self, keys: Iterable[str]) -> Dict[str, List[Tuple[str, str]]]: + shard_indices_per_shard_key = defaultdict(list) + for chunk_key in keys: + shard_key, shard_index = self.__chunk_key_to_shard_key_and_index__(chunk_key) + shard_indices_per_shard_key[shard_key].append((shard_index, chunk_key)) + return shard_indices_per_shard_key + + def __get_chunk_slice__(self, shard_index: int) -> Tuple[int, int]: start = shard_index * len(self._fill_chunk) return slice(start, start + len(self._fill_chunk)) def __getitem__(self, key: str) -> bytes: - shard_key, shard_index = self.__key_to_sharded__(key) - chunk_slice = self.__get_chunk_slice__(shard_key, shard_index) - # TODO use partial reads if available - full_shard_value = self._store[shard_key] - return full_shard_value[chunk_slice] + return self.getitems([key])[key] + + def getitems(self, keys: Iterable[str], **kwargs) -> Dict[str, bytes]: + result = {} + for shard_key, chunks_in_shard in self.__keys_to_shard_groups__(keys).items(): + # TODO use partial reads if available + full_shard_value = self._store[shard_key] + # TODO omit items if they don't exist + for shard_index, chunk_key 
in chunks_in_shard:
+                result[chunk_key] = full_shard_value[self.__get_chunk_slice__(shard_index)]
+        return result
 
     def __setitem__(self, key: str, value: bytes) -> None:
-        shard_key, shard_index = self.__key_to_sharded__(key)
-        if shard_key in self._store:
-            full_shard_value = bytearray(self._store[shard_key])
-        else:
-            full_shard_value = bytearray(self._fill_chunk * self._num_chunks_per_shard)
-        chunk_slice = self.__get_chunk_slice__(shard_key, shard_index)
-        # TODO use partial writes if available
-        full_shard_value[chunk_slice] = value
-        self._store[shard_key] = full_shard_value
+        self.setitems({key: value})
+
+    def setitems(self, values: Dict[str, bytes]) -> None:
+        for shard_key, chunks_in_shard in self.__keys_to_shard_groups__(values.keys()).items():
+            if len(chunks_in_shard) == self._num_chunks_per_shard:
+                # TODO shards at a non-dataset-size aligned surface are not captured here yet
+                full_shard_value = b"".join(
+                    values[chunk_key] for _, chunk_key in sorted(chunks_in_shard)
+                ) + b"\xff" * len(self._emtpy_meta)
+                self._store[shard_key] = full_shard_value
+            else:
+                # TODO use partial writes if available
+                try:
+                    full_shard_value = bytearray(self._store[shard_key])
+                except KeyError:
+                    full_shard_value = bytearray(self._fill_chunk * self._num_chunks_per_shard + self._emtpy_meta)
+                chunk_mask = self.__get_meta__(full_shard_value)
+                for shard_index, chunk_key in chunks_in_shard:
+                    chunk_mask |= 1 << shard_index
+                    full_shard_value[self.__get_chunk_slice__(shard_index)] = values[chunk_key]
+                self.__set_meta__(full_shard_value, chunk_mask)
+                self._store[shard_key] = full_shard_value
 
     def __delitem__(self, key) -> None:
-        # TODO not implemented yet
-        # For uncompressed chunks, deleting the "last" chunk might need to be detected.
+        # TODO not implemented yet, also delitems
+        # Deleting the "last" chunk in a shard needs to remove the whole shard
         raise NotImplementedError("Deletion is not yet implemented")
 
     def __iter__(self) -> Iterator[str]:
@@ -94,16 +133,20 @@ def __iter__(self) -> Iterator[str]:
             yield shard_key
         else:
             # For each shard key in the wrapped store, all corresponding chunks are yielded.
-            # TODO: For compressed chunks we might yield only the actually contained chunks by reading the jumptables.
             # TODO: allow to be in a group (aka only use last parts for dimensions)
-            subkeys = tuple(map(int, shard_key.split(self._dimension_separator)))
-            for offset in product(*(range(i) for i in self._shards)):
-                original_key = (subkeys_i * shards_i + offset_i for subkeys_i, offset_i, shards_i in zip(subkeys, offset, self._shards))
-                yield self._dimension_separator.join(map(str, original_key))
+            shard_key_tuple = tuple(map(int, shard_key.split(self._dimension_separator)))
+            mask = self.__get_meta__(self._store[shard_key])
+            for i in range(self._num_chunks_per_shard):
+                if mask == 0:
+                    break
+                if mask & 1:
+                    yield self.__shard_key_and_index_to_chunk_key__(shard_key_tuple, i)
+                mask >>= 1
 
     def __len__(self) -> int:
         return sum(1 for _ in self.keys())
 
-    # TODO: For efficient reads and writes, we need to implement
-    # getitems, setitems & delitems
-    # and combine writes/reads/deletions to the same shard.
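The trailing bitmask added here spends one bit per chunk, padded to whole bytes and stored big-endian at the end of the shard. A worked example of the encoding (illustrative only):

```python
import math

num_chunks_per_shard = 4
meta_len = math.ceil(num_chunks_per_shard / 8)  # 1 byte of bitmask
chunk_mask = 0
for shard_index in (0, 2):  # chunks 0 and 2 of the shard are written
    chunk_mask |= 1 << shard_index
# this is what __set_meta__ appends; __iter__ walks it bit by bit
assert chunk_mask.to_bytes(meta_len, byteorder="big") == b"\x05"
```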
+ +SHARDED_STORES = { + "morton_order": MortonOrderShardedStore, +} diff --git a/zarr/core.py b/zarr/core.py index 01ba6a058e..2c5505079d 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -11,7 +11,7 @@ from numcodecs.compat import ensure_bytes, ensure_ndarray from collections.abc import MutableMapping -from zarr._storage.sharded_store import ShardedStore +from zarr._storage.sharded_store import SHARDED_STORES from zarr.attrs import Attributes from zarr.codecs import AsType, get_codec @@ -219,6 +219,7 @@ def _load_metadata_nosync(self): self._shape = meta['shape'] self._chunks = meta['chunks'] self._shards = meta.get('shards') + self._shard_format = meta.get('shard_format') self._dtype = meta['dtype'] self._fill_value = meta['fill_value'] self._order = meta['order'] @@ -272,7 +273,8 @@ def _flush_metadata_nosync(self): # should the dimension_separator also be included in this dict? meta = dict(shape=self._shape, chunks=self._chunks, dtype=self._dtype, compressor=compressor_config, fill_value=self._fill_value, - order=self._order, filters=filters_config, shards=self._shards) + order=self._order, filters=filters_config, + shards=self._shards, shard_format=self._shard_format) mkey = self._key_prefix + array_meta_key self._store[mkey] = self._store._metadata_class.encode_array_metadata(meta) @@ -324,7 +326,7 @@ def chunk_store(self): return chunk_store else: if self._cached_sharded_store is None: - self._cached_sharded_store = ShardedStore( + self._cached_sharded_store = SHARDED_STORES[self._shard_format]( chunk_store, shards=self._shards, dimension_separator=self._dimension_separator, @@ -1731,7 +1733,7 @@ def _set_selection(self, indexer, value, fields=None): check_array_shape('value', value, sel_shape) # iterate over chunks in range - if not hasattr(self.store, "setitems") or self._synchronizer is not None \ + if not hasattr(self.chunk_store, "setitems") or self._synchronizer is not None \ or any(map(lambda x: x == 0, self.shape)): # iterative approach for chunk_coords, chunk_selection, out_selection in indexer: @@ -1974,8 +1976,8 @@ def _chunk_setitems(self, lchunk_coords, lchunk_selection, values, fields=None): self.chunk_store.setitems(to_store) def _chunk_delitems(self, ckeys): - if hasattr(self.store, "delitems"): - self.store.delitems(ckeys) + if hasattr(self.chunk_store, "delitems"): + self.chunk_store.delitems(ckeys) else: # pragma: no cover # exempting this branch from coverage as there are no extant stores # that will trigger this condition, but it's possible that they diff --git a/zarr/creation.py b/zarr/creation.py index b669c4241b..d31860164a 100644 --- a/zarr/creation.py +++ b/zarr/creation.py @@ -21,7 +21,7 @@ def create(shape, chunks=True, dtype=None, compressor='default', overwrite=False, path=None, chunk_store=None, filters=None, cache_metadata=True, cache_attrs=True, read_only=False, object_codec=None, dimension_separator=None, write_empty_chunks=True, - shards: Union[int, Tuple[int, ...], None]=None, **kwargs): + shards: Union[int, Tuple[int, ...], None]=None, shard_format: str="morton_order", **kwargs): """Create an array. 
Parameters @@ -147,7 +147,7 @@ def create(shape, chunks=True, dtype=None, compressor='default', init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor, fill_value=fill_value, order=order, overwrite=overwrite, path=path, chunk_store=chunk_store, filters=filters, object_codec=object_codec, - dimension_separator=dimension_separator, shards=shards) + dimension_separator=dimension_separator, shards=shards, shard_format=shard_format) # instantiate array z = Array(store, path=path, chunk_store=chunk_store, synchronizer=synchronizer, diff --git a/zarr/meta.py b/zarr/meta.py index 964858de43..d63be624d3 100644 --- a/zarr/meta.py +++ b/zarr/meta.py @@ -52,6 +52,7 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, A dimension_separator = meta.get("dimension_separator", None) shards = meta.get("shards", None) + shard_format = meta.get("shard_format", None) fill_value = cls.decode_fill_value(meta['fill_value'], dtype, object_codec) meta = dict( zarr_format=meta["zarr_format"], @@ -67,6 +68,8 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, A meta['dimension_separator'] = dimension_separator if shards: meta['shards'] = tuple(shards) + assert shard_format is not None + meta['shard_format'] = shard_format except Exception as e: raise MetadataError("error decoding metadata") from e else: @@ -81,6 +84,7 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes: dimension_separator = meta.get("dimension_separator") shards = meta.get("shards") + shard_format = meta.get("shard_format") if dtype.hasobject: import numcodecs object_codec = numcodecs.get_codec(meta['filters'][0]) @@ -99,9 +103,10 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes: ) if dimension_separator: meta['dimension_separator'] = dimension_separator - if shards: meta['shards'] = shards + assert shard_format is not None + meta['shard_format'] = shard_format return json_dumps(meta) diff --git a/zarr/storage.py b/zarr/storage.py index e5e07cc235..19709cc115 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -237,6 +237,7 @@ def init_array( object_codec=None, dimension_separator=None, shards: Union[int, Tuple[int, ...], None]=None, + shard_format: Optional[str]=None, ): """Initialize an array store with the given configuration. Note that this is a low-level function and there should be no need to call this directly from user code. 
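After these changes, a sharded array carries two extra keys in its `.zarray` document. A hedged sketch, assuming this prototype branch is installed:

```python
import json
import zarr

store = zarr.DirectoryStore("data/meta_test.zarr")  # illustrative path
zarr.zeros((20, 3), chunks=(3, 2), shards=(2, 2), store=store,
           overwrite=True, compressor=None)
meta = json.loads(store[".zarray"].decode())
print(meta["shards"])        # [2, 2]
print(meta["shard_format"])  # "morton_order" is the default at this point
```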
@@ -355,7 +356,7 @@ def init_array( chunk_store=chunk_store, filters=filters, object_codec=object_codec, dimension_separator=dimension_separator, - shards=shards) + shards=shards, shard_format=shard_format) def _init_array_metadata( @@ -373,6 +374,7 @@ def _init_array_metadata( object_codec=None, dimension_separator=None, shards:Union[int, Tuple[int, ...], None] = None, + shard_format: Optional[str]=None, ): # guard conditions @@ -392,6 +394,7 @@ def _init_array_metadata( dtype = dtype.base chunks = normalize_chunks(chunks, shape, dtype.itemsize) shards = normalize_shards(shards, shape) + shard_format = shard_format or "morton_order" order = normalize_order(order) fill_value = normalize_fill_value(fill_value, dtype) @@ -451,6 +454,7 @@ def _init_array_metadata( dimension_separator=dimension_separator) if shards is not None: meta["shards"] = shards + meta["shard_format"] = shard_format key = _path_to_prefix(path) + array_meta_key if hasattr(store, '_metadata_class'): store[key] = store._metadata_class.encode_array_metadata(meta) # type: ignore From 80712680ab699fdb7fa5e96977f2a709c505a8a7 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Fri, 26 Nov 2021 13:55:44 +0100 Subject: [PATCH 06/12] add chunking_test.py output to itself --- chunking_test.py | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/chunking_test.py b/chunking_test.py index 43491f677c..7a5c873846 100644 --- a/chunking_test.py +++ b/chunking_test.py @@ -11,12 +11,38 @@ z[0, 1] = -4.2 print(store[".zarray"].decode()) -print("ONDISK", sorted(os.listdir("data/chunking_test.zarr"))) +# { +# "chunks": [ +# 3, +# 2 +# ], +# "compressor": null, +# "dtype": " Date: Tue, 7 Dec 2021 10:28:22 +0100 Subject: [PATCH 07/12] implement indexed sharded format --- chunking_test.py | 6 +- zarr/_storage/sharded_store.py | 181 ++++++++++++++++++--------------- zarr/core.py | 4 - zarr/creation.py | 2 +- 4 files changed, 105 insertions(+), 88 deletions(-) diff --git a/chunking_test.py b/chunking_test.py index 7a5c873846..b58ed334d7 100644 --- a/chunking_test.py +++ b/chunking_test.py @@ -25,7 +25,7 @@ # 20, # 3 # ], -# "shard_format": "morton_order", +# "shard_format": "indexed", # "shards": [ # 2, # 2 @@ -43,6 +43,10 @@ # STORE ['.zarray', '0.0', '1.0', '2.0', '3.0'] # CHUNKSTORE (SHARDED) ['.zarray', '0.0', '0.1', '1.0', '1.1', '2.0', '2.1', '3.0', '3.1', '5.0', '6.1'] +index_bytes = z.store["0.0"][-2*2*16:] +print("INDEX 0.0", [int.from_bytes(index_bytes[i:i+8], byteorder="big") for i in range(0, len(index_bytes), 8)]) +# INDEX 0.0 [0, 48, 48, 48, 96, 48, 144, 48] + z_reopened = zarr.open("data/chunking_test.zarr") assert z_reopened.shards == (2, 2) assert z_reopened[15, 1] == 389 diff --git a/zarr/_storage/sharded_store.py b/zarr/_storage/sharded_store.py index 2857e738c7..fe920ee408 100644 --- a/zarr/_storage/sharded_store.py +++ b/zarr/_storage/sharded_store.py @@ -1,7 +1,7 @@ from collections import defaultdict from functools import reduce -import math -from typing import Any, Dict, Iterable, Iterator, List, Tuple, Union +from itertools import product +from typing import Dict, Iterable, Iterator, List, NamedTuple, Optional, Tuple, Union import numpy as np @@ -9,15 +9,45 @@ from zarr.storage import StoreLike, array_meta_key, attrs_key, group_meta_key -def _cum_prod(x: Iterable[int]) -> Iterable[int]: - prod = 1 - yield prod - for i in x[:-1]: - prod *= i - yield prod +class _ShardIndex(NamedTuple): + store: "IndexedShardedStore" + offsets_and_lengths: np.ndarray # dtype uint64, shape 
(shards_0, _shards_1, ..., 2) + def __localize_chunk__(self, chunk: Tuple[int, ...]) -> Tuple[int, ...]: + return tuple(chunk_i % shard_i for chunk_i, shard_i in zip(chunk, self.store._shards)) -class MortonOrderShardedStore(Store): + def get_chunk_slice(self, chunk: Tuple[int, ...]) -> Optional[slice]: + localized_chunk = self.__localize_chunk__(chunk) + chunk_start, chunk_len = self.offsets_and_lengths[localized_chunk] + if chunk_len == 0: + return None + else: + return slice(chunk_start, chunk_start + chunk_len) + + def set_chunk_slice(self, chunk: Tuple[int, ...], chunk_slice: Optional[slice]) -> None: + localized_chunk = self.__localize_chunk__(chunk) + if chunk_slice is None: + self.offsets_and_lengths[localized_chunk] = (0, 0) + else: + self.offsets_and_lengths[localized_chunk] = (chunk_slice.start, chunk_slice.stop - chunk_slice.start) + + def to_bytes(self) -> bytes: + return self.offsets_and_lengths.tobytes(order='C') + + @classmethod + def from_bytes(cls, buffer: Union[bytes, bytearray], store: "IndexedShardedStore") -> "_ShardIndex": + return cls( + store=store, + offsets_and_lengths=np.frombuffer(bytearray(buffer), dtype=">u8").reshape(*store._shards, 2, order="C") + ) + + @classmethod + def create_empty(cls, store: "IndexedShardedStore"): + # reserving 2*64bit per chunk for offset and length: + return cls.from_bytes(b"\x00" * (16 * store._num_chunks_per_shard), store=store) + + +class IndexedShardedStore(Store): """This class should not be used directly, but is added to an Array as a wrapper when needed automatically.""" @@ -26,63 +56,39 @@ def __init__( store: StoreLike, shards: Tuple[int, ...], dimension_separator: str, - are_chunks_compressed: bool, - dtype: np.dtype, - fill_value: Any, - chunk_size: int, ) -> None: self._store: BaseStore = BaseStore._ensure_store(store) self._shards = shards self._num_chunks_per_shard = reduce(lambda x, y: x*y, shards, 1) self._dimension_separator = dimension_separator - chunk_has_constant_size = not are_chunks_compressed and not dtype == object - assert chunk_has_constant_size, "Currently only uncompressed, fixed-length data can be used." 
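Each shard now ends with a fixed-size index: one `(offset, length)` pair of uint64 per chunk, big-endian at this stage (PATCH 08 switches it to little-endian). A decoding sketch using the index values that `chunking_test.py` prints for shard `0.0`:

```python
import numpy as np

shards = (2, 2)
# four chunks of 48 bytes each, tightly packed: offsets 0, 48, 96, 144
raw_index = b"".join(
    int(v).to_bytes(8, "big") for v in (0, 48, 48, 48, 96, 48, 144, 48)
)
offsets_and_lengths = np.frombuffer(raw_index, dtype=">u8").reshape(*shards, 2)
offset, length = offsets_and_lengths[1, 0]
assert (offset, length) == (96, 48)  # third chunk starts at byte 96
```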
- self._chunk_has_constant_size = chunk_has_constant_size - if chunk_has_constant_size: - binary_fill_value = np.full(1, fill_value=fill_value or 0, dtype=dtype).tobytes() - self._fill_chunk = binary_fill_value * chunk_size - self._emtpy_meta = b"\x00" * math.ceil(self._num_chunks_per_shard / 8) - - # unused when using Morton order - self._shard_strides = tuple(_cum_prod(shards)) - # TODO: add warnings for ineffective reads/writes: # * warn if partial reads are not available # * optionally warn on unaligned writes if no partial writes are available - def __get_meta__(self, shard_content: Union[bytes, bytearray]) -> int: - return int.from_bytes(shard_content[-len(self._emtpy_meta):], byteorder="big") - - def __set_meta__(self, shard_content: bytearray, meta: int) -> None: - shard_content[-len(self._emtpy_meta):] = meta.to_bytes(len(self._emtpy_meta), byteorder="big") - - # The following two methods define the order of the chunks in a shard - # TODO use morton order - def __chunk_key_to_shard_key_and_index__(self, chunk_key: str) -> Tuple[str, int]: - # TODO: allow to be in a group (aka only use last parts for dimensions) - chunk_subkeys = map(int, chunk_key.split(self._dimension_separator)) - - shard_tuple, index_tuple = zip(*((subkey // shard_i, subkey % shard_i) for subkey, shard_i in zip(chunk_subkeys, self._shards))) - shard_key = self._dimension_separator.join(map(str, shard_tuple)) - index = sum(i * j for i, j in zip(index_tuple, self._shard_strides)) - return shard_key, index - - def __shard_key_and_index_to_chunk_key__(self, shard_key_tuple: Tuple[int, ...], shard_index: int) -> str: - offset = tuple(shard_index % s2 // s1 for s1, s2 in zip(self._shard_strides, self._shard_strides[1:] + (self._num_chunks_per_shard,))) - original_key = (shard_key_i * shards_i + offset_i for shard_key_i, offset_i, shards_i in zip(shard_key_tuple, offset, self._shards)) - return self._dimension_separator.join(map(str, original_key)) - def __keys_to_shard_groups__(self, keys: Iterable[str]) -> Dict[str, List[Tuple[str, str]]]: shard_indices_per_shard_key = defaultdict(list) for chunk_key in keys: - shard_key, shard_index = self.__chunk_key_to_shard_key_and_index__(chunk_key) - shard_indices_per_shard_key[shard_key].append((shard_index, chunk_key)) + # TODO: allow to be in a group (aka only use last parts for dimensions) + chunk_subkeys = tuple(map(int, chunk_key.split(self._dimension_separator))) + shard_key_tuple = (subkey // shard_i for subkey, shard_i in zip(chunk_subkeys, self._shards)) + shard_key = self._dimension_separator.join(map(str, shard_key_tuple)) + shard_indices_per_shard_key[shard_key].append((chunk_key, chunk_subkeys)) return shard_indices_per_shard_key - def __get_chunk_slice__(self, shard_index: int) -> Tuple[int, int]: - start = shard_index * len(self._fill_chunk) - return slice(start, start + len(self._fill_chunk)) + def __get_index__(self, buffer: Union[bytes, bytearray]) -> _ShardIndex: + # At the end of each shard 2*64bit per chunk for offset and length define the index: + return _ShardIndex.from_bytes(buffer[-16 * self._num_chunks_per_shard:], self) + + def __get_chunks_in_shard(self, shard_key: str) -> Iterator[Tuple[int, ...]]: + # TODO: allow to be in a group (aka only use last parts for dimensions) + shard_key_tuple = tuple(map(int, shard_key.split(self._dimension_separator))) + for chunk_offset in product(*(range(i) for i in self._shards)): + yield tuple( + shard_key_i * shards_i + offset_i + for shard_key_i, offset_i, shards_i + in zip(shard_key_tuple, chunk_offset, 
self._shards) + ) def __getitem__(self, key: str) -> bytes: return self.getitems([key])[key] @@ -90,11 +96,13 @@ def __getitem__(self, key: str) -> bytes: def getitems(self, keys: Iterable[str], **kwargs) -> Dict[str, bytes]: result = {} for shard_key, chunks_in_shard in self.__keys_to_shard_groups__(keys).items(): - # TODO use partial reads if available + # TODO use partial read if available full_shard_value = self._store[shard_key] - # TODO omit items if they don't exist - for shard_index, chunk_key in chunks_in_shard: - result[chunk_key] = full_shard_value[self.__get_chunk_slice__(shard_index)] + index = self.__get_index__(full_shard_value) + for chunk_key, chunk_subkeys in chunks_in_shard: + chunk_slice = index.get_chunk_slice(chunk_subkeys) + if chunk_slice is not None: + result[chunk_key] = full_shard_value[chunk_slice] return result def __setitem__(self, key: str, value: bytes) -> None: @@ -102,24 +110,36 @@ def __setitem__(self, key: str, value: bytes) -> None: def setitems(self, values: Dict[str, bytes]) -> None: for shard_key, chunks_in_shard in self.__keys_to_shard_groups__(values.keys()).items(): - if len(chunks_in_shard) == self._num_chunks_per_shard: - # TODO shards at a non-dataset-size aligned surface are not captured here yet - full_shard_value = b"".join( - values[chunk_key] for _, chunk_key in sorted(chunks_in_shard) - ) + b"\xff" * len(self._emtpy_meta) - self._store[shard_key] = full_shard_value + all_chunks = set(self.__get_chunks_in_shard(shard_key)) + chunks_to_set = set(chunk_subkeys for _chunk_key, chunk_subkeys in chunks_in_shard) + chunks_to_read = all_chunks - chunks_to_set + new_content = {chunk_subkeys: values[chunk_key] for chunk_key, chunk_subkeys in chunks_in_shard} + try: + # TODO use partial read if available + full_shard_value = self._store[shard_key] + except KeyError: + index = _ShardIndex.create_empty(self) + for chunk_to_read in chunks_to_read: + new_content[chunk_to_read] = b"" else: - # TODO use partial writes if available - try: - full_shard_value = bytearray(self._store[shard_key]) - except KeyError: - full_shard_value = bytearray(self._fill_chunk * self._num_chunks_per_shard + self._emtpy_meta) - chunk_mask = self.__get_meta__(full_shard_value) - for shard_index, chunk_key in chunks_in_shard: - chunk_mask |= 1 << shard_index - full_shard_value[self.__get_chunk_slice__(shard_index)] = values[chunk_key] - self.__set_meta__(full_shard_value, chunk_mask) - self._store[shard_key] = full_shard_value + index = self.__get_index__(full_shard_value) + for chunk_to_read in chunks_to_read: + chunk_slice = index.get_chunk_slice(chunk_to_read) + if chunk_slice is None: + new_content[chunk_to_read] = b"" + else: + new_content[chunk_to_read] = full_shard_value[chunk_slice] + + # TODO use partial write if available and possible (e.g. at the end) + shard_content = b"" + # TODO: order the chunks in the shard: + for chunk_subkeys, chunk_content in new_content.items(): + chunk_slice = slice(len(shard_content), len(shard_content) + len(chunk_content)) + index.set_chunk_slice(chunk_subkeys, chunk_slice) + shard_content += chunk_content + # Appending the index at the end of the shard: + shard_content += index.to_bytes() + self._store[shard_key] = shard_content def __delitem__(self, key) -> None: # TODO not implemented yet, also delitems @@ -133,20 +153,17 @@ def __iter__(self) -> Iterator[str]: yield shard_key else: # For each shard key in the wrapped store, all corresponding chunks are yielded. 
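`setitems` rebuilds a shard by concatenating the chunk payloads and appending the index. A simplified sketch of that assembly (illustrative; the real code above also re-reads untouched chunks from the existing shard value):

```python
import numpy as np

shards = (2, 2)
index = np.zeros((*shards, 2), dtype=">u8")  # length 0 marks a missing chunk here
shard_content = b""
for chunk, payload in {(0, 0): b"a" * 48, (1, 1): b"b" * 48}.items():
    index[chunk] = (len(shard_content), len(payload))
    shard_content += payload
shard_content += index.tobytes(order="C")  # the index sits at the end of the shard
assert len(shard_content) == 2 * 48 + 4 * 16
```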
- # TODO: allow to be in a group (aka only use last parts for dimensions) - shard_key_tuple = tuple(map(int, shard_key.split(self._dimension_separator))) - mask = self.__get_meta__(self._store[shard_key]) - for i in range(self._num_chunks_per_shard): - if mask == 0: - break - if mask & 1: - yield self.__shard_key_and_index_to_chunk_key__(shard_key_tuple, i) - mask >>= 1 + # TODO: use partial read if available: + index = self.__get_index__(self._store[shard_key]) + for chunk_tuple in self.__get_chunks_in_shard(shard_key): + if index.get_chunk_slice(chunk_tuple) is not None: + # TODO: if shard is in a group, prepend group-prefix to chunk + yield self._dimension_separator.join(map(str, chunk_tuple)) def __len__(self) -> int: return sum(1 for _ in self.keys()) SHARDED_STORES = { - "morton_order": MortonOrderShardedStore, + "indexed": IndexedShardedStore, } diff --git a/zarr/core.py b/zarr/core.py index 2c5505079d..364b4c55c7 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -330,10 +330,6 @@ def chunk_store(self): chunk_store, shards=self._shards, dimension_separator=self._dimension_separator, - are_chunks_compressed=self._compressor is not None, - dtype=self._dtype, - fill_value=self._fill_value or 0, - chunk_size=reduce(operator.mul, self._chunks, 1), ) return self._cached_sharded_store diff --git a/zarr/creation.py b/zarr/creation.py index d31860164a..d8c14029d4 100644 --- a/zarr/creation.py +++ b/zarr/creation.py @@ -21,7 +21,7 @@ def create(shape, chunks=True, dtype=None, compressor='default', overwrite=False, path=None, chunk_store=None, filters=None, cache_metadata=True, cache_attrs=True, read_only=False, object_codec=None, dimension_separator=None, write_empty_chunks=True, - shards: Union[int, Tuple[int, ...], None]=None, shard_format: str="morton_order", **kwargs): + shards: Union[int, Tuple[int, ...], None]=None, shard_format: str="indexed", **kwargs): """Create an array. 
Parameters From d0434a600c085d9654b450ee216b4ff66b36b67c Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Wed, 8 Dec 2021 17:56:38 +0100 Subject: [PATCH 08/12] index: use little endian, note empty chunks with pair of max uint64 --- chunking_test.py | 2 +- zarr/_storage/sharded_store.py | 17 ++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/chunking_test.py b/chunking_test.py index b58ed334d7..0598a590ff 100644 --- a/chunking_test.py +++ b/chunking_test.py @@ -44,7 +44,7 @@ # CHUNKSTORE (SHARDED) ['.zarray', '0.0', '0.1', '1.0', '1.1', '2.0', '2.1', '3.0', '3.1', '5.0', '6.1'] index_bytes = z.store["0.0"][-2*2*16:] -print("INDEX 0.0", [int.from_bytes(index_bytes[i:i+8], byteorder="big") for i in range(0, len(index_bytes), 8)]) +print("INDEX 0.0", [int.from_bytes(index_bytes[i:i+8], byteorder="little") for i in range(0, len(index_bytes), 8)]) # INDEX 0.0 [0, 48, 48, 48, 96, 48, 144, 48] z_reopened = zarr.open("data/chunking_test.zarr") diff --git a/zarr/_storage/sharded_store.py b/zarr/_storage/sharded_store.py index fe920ee408..94fb6a0a31 100644 --- a/zarr/_storage/sharded_store.py +++ b/zarr/_storage/sharded_store.py @@ -9,6 +9,9 @@ from zarr.storage import StoreLike, array_meta_key, attrs_key, group_meta_key +MAX_UINT_64 = 2 ** 64 - 1 + + class _ShardIndex(NamedTuple): store: "IndexedShardedStore" offsets_and_lengths: np.ndarray # dtype uint64, shape (shards_0, _shards_1, ..., 2) @@ -19,7 +22,7 @@ def __localize_chunk__(self, chunk: Tuple[int, ...]) -> Tuple[int, ...]: def get_chunk_slice(self, chunk: Tuple[int, ...]) -> Optional[slice]: localized_chunk = self.__localize_chunk__(chunk) chunk_start, chunk_len = self.offsets_and_lengths[localized_chunk] - if chunk_len == 0: + if (chunk_start, chunk_len) == (MAX_UINT_64, MAX_UINT_64): return None else: return slice(chunk_start, chunk_start + chunk_len) @@ -27,7 +30,7 @@ def get_chunk_slice(self, chunk: Tuple[int, ...]) -> Optional[slice]: def set_chunk_slice(self, chunk: Tuple[int, ...], chunk_slice: Optional[slice]) -> None: localized_chunk = self.__localize_chunk__(chunk) if chunk_slice is None: - self.offsets_and_lengths[localized_chunk] = (0, 0) + self.offsets_and_lengths[localized_chunk] = (MAX_UINT_64, MAX_UINT_64) else: self.offsets_and_lengths[localized_chunk] = (chunk_slice.start, chunk_slice.stop - chunk_slice.start) @@ -38,13 +41,13 @@ def to_bytes(self) -> bytes: def from_bytes(cls, buffer: Union[bytes, bytearray], store: "IndexedShardedStore") -> "_ShardIndex": return cls( store=store, - offsets_and_lengths=np.frombuffer(bytearray(buffer), dtype=">u8").reshape(*store._shards, 2, order="C") + offsets_and_lengths=np.frombuffer(bytearray(buffer), dtype=" None: full_shard_value = self._store[shard_key] except KeyError: index = _ShardIndex.create_empty(self) - for chunk_to_read in chunks_to_read: - new_content[chunk_to_read] = b"" else: index = self.__get_index__(full_shard_value) for chunk_to_read in chunks_to_read: chunk_slice = index.get_chunk_slice(chunk_to_read) - if chunk_slice is None: - new_content[chunk_to_read] = b"" - else: + if chunk_slice is not None: new_content[chunk_to_read] = full_shard_value[chunk_slice] # TODO use partial write if available and possible (e.g. 
at the end) From 4859e3120278bd7d8c2aef7e1512030fb0a92443 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Wed, 22 Dec 2021 12:30:54 +0100 Subject: [PATCH 09/12] fix linting & typing --- zarr/_storage/sharded_store.py | 30 +++++++++++++++++++++++------- zarr/_storage/store.py | 3 ++- zarr/core.py | 3 ++- zarr/creation.py | 5 +++-- zarr/storage.py | 8 ++++---- zarr/util.py | 8 +++++--- 6 files changed, 39 insertions(+), 18 deletions(-) diff --git a/zarr/_storage/sharded_store.py b/zarr/_storage/sharded_store.py index 94fb6a0a31..f783a0de96 100644 --- a/zarr/_storage/sharded_store.py +++ b/zarr/_storage/sharded_store.py @@ -32,22 +32,32 @@ def set_chunk_slice(self, chunk: Tuple[int, ...], chunk_slice: Optional[slice]) if chunk_slice is None: self.offsets_and_lengths[localized_chunk] = (MAX_UINT_64, MAX_UINT_64) else: - self.offsets_and_lengths[localized_chunk] = (chunk_slice.start, chunk_slice.stop - chunk_slice.start) + self.offsets_and_lengths[localized_chunk] = ( + chunk_slice.start, + chunk_slice.stop - chunk_slice.start + ) def to_bytes(self) -> bytes: return self.offsets_and_lengths.tobytes(order='C') @classmethod - def from_bytes(cls, buffer: Union[bytes, bytearray], store: "IndexedShardedStore") -> "_ShardIndex": + def from_bytes( + cls, buffer: Union[bytes, bytearray], store: "IndexedShardedStore" + ) -> "_ShardIndex": return cls( store=store, - offsets_and_lengths=np.frombuffer(bytearray(buffer), dtype=" Dict[str, List[Tuple[str, str]]]: + def __keys_to_shard_groups__( + self, keys: Iterable[str] + ) -> Dict[str, List[Tuple[str, Tuple[int, ...]]]]: shard_indices_per_shard_key = defaultdict(list) for chunk_key in keys: # TODO: allow to be in a group (aka only use last parts for dimensions) chunk_subkeys = tuple(map(int, chunk_key.split(self._dimension_separator))) - shard_key_tuple = (subkey // shard_i for subkey, shard_i in zip(chunk_subkeys, self._shards)) + shard_key_tuple = ( + subkey // shard_i for subkey, shard_i in zip(chunk_subkeys, self._shards) + ) shard_key = self._dimension_separator.join(map(str, shard_key_tuple)) shard_indices_per_shard_key[shard_key].append((chunk_key, chunk_subkeys)) return shard_indices_per_shard_key @@ -116,7 +130,9 @@ def setitems(self, values: Dict[str, bytes]) -> None: all_chunks = set(self.__get_chunks_in_shard(shard_key)) chunks_to_set = set(chunk_subkeys for _chunk_key, chunk_subkeys in chunks_in_shard) chunks_to_read = all_chunks - chunks_to_set - new_content = {chunk_subkeys: values[chunk_key] for chunk_key, chunk_subkeys in chunks_in_shard} + new_content = { + chunk_subkeys: values[chunk_key] for chunk_key, chunk_subkeys in chunks_in_shard + } try: # TODO use partial read if available full_shard_value = self._store[shard_key] diff --git a/zarr/_storage/store.py b/zarr/_storage/store.py index 6714e729f7..5347eeb425 100644 --- a/zarr/_storage/store.py +++ b/zarr/_storage/store.py @@ -110,7 +110,8 @@ def _ensure_store(store: Any): class Store(BaseStore): - # TODO: document methods which allow optimizations, e.g. delitems, setitems, getitems, listdir, … + # TODO: document methods which allow optimizations, + # e.g. delitems, setitems, getitems, listdir, … """Abstract store class used by implementations following the Zarr v2 spec. Adds public `listdir`, `rename`, and `rmdir` methods on top of BaseStore. 
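For reference, the sentinel introduced in the previous commit: a missing chunk is now encoded as an all-ones `(offset, length)` pair instead of a zero length, presumably so that genuinely zero-length values stay distinguishable from absent chunks. A sketch of the lookup logic:

```python
MAX_UINT_64 = 2 ** 64 - 1

def chunk_slice(chunk_start, chunk_len):
    # mirrors _ShardIndex.get_chunk_slice after PATCH 08
    if (chunk_start, chunk_len) == (MAX_UINT_64, MAX_UINT_64):
        return None  # chunk not present in this shard
    return slice(chunk_start, chunk_start + chunk_len)

assert chunk_slice(MAX_UINT_64, MAX_UINT_64) is None
assert chunk_slice(96, 48) == slice(96, 144)
```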
diff --git a/zarr/core.py b/zarr/core.py
index 4985ddb6cf..c45fe1b2de 100644
--- a/zarr/core.py
+++ b/zarr/core.py
@@ -1930,7 +1930,8 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
                 and hasattr(self._compressor, "decode_partial")
                 and not fields
                 and self.dtype != object
-                and hasattr(self.chunk_store, "getitems")  # TODO: this should rather check for read_block or similar
+                # TODO: this should rather check for read_block or similar
+                and hasattr(self.chunk_store, "getitems")
             ):
                 partial_read_decode = True
                 cdatas = {
diff --git a/zarr/creation.py b/zarr/creation.py
index 22173b5c07..fc87363ba5 100644
--- a/zarr/creation.py
+++ b/zarr/creation.py
@@ -1,4 +1,4 @@
-from typing import Optional, Tuple, Union
+from typing import Tuple, Union
 from warnings import warn
 
 import numpy as np
@@ -21,7 +21,8 @@ def create(shape, chunks=True, dtype=None, compressor='default',
            overwrite=False, path=None, chunk_store=None, filters=None,
            cache_metadata=True, cache_attrs=True, read_only=False,
            object_codec=None, dimension_separator=None, write_empty_chunks=True,
-           shards: Union[int, Tuple[int, ...], None]=None, shard_format: str="indexed", **kwargs):
+           shards: Union[int, Tuple[int, ...], None] = None,
+           shard_format: str = "indexed", **kwargs):
     """Create an array.
 
     Parameters
diff --git a/zarr/storage.py b/zarr/storage.py
index cae664fa49..45fcced3d7 100644
--- a/zarr/storage.py
+++ b/zarr/storage.py
@@ -236,8 +236,8 @@ def init_array(
     filters=None,
     object_codec=None,
     dimension_separator=None,
-    shards: Union[int, Tuple[int, ...], None]=None,
-    shard_format: Optional[str]=None,
+    shards: Union[int, Tuple[int, ...], None] = None,
+    shard_format: Optional[str] = None,
 ):
     """Initialize an array store with the given configuration. Note that this is a low-level
     function and there should be no need to call this directly from user code.
@@ -373,8 +373,8 @@ def _init_array_metadata(
     filters=None,
     object_codec=None,
     dimension_separator=None,
-    shards:Union[int, Tuple[int, ...], None] = None,
-    shard_format: Optional[str]=None,
+    shards: Union[int, Tuple[int, ...], None] = None,
+    shard_format: Optional[str] = None,
 ):
 
     # guard conditions
diff --git a/zarr/util.py b/zarr/util.py
index 0bd2673aa0..3b017b98ca 100644
--- a/zarr/util.py
+++ b/zarr/util.py
@@ -14,7 +14,7 @@
 from numcodecs.registry import codec_registry
 from numcodecs.blosc import cbuffer_sizes, cbuffer_metainfo
 
-from typing import Any, Callable, Dict, Optional, Tuple, Union
+from typing import Any, Callable, Dict, Optional, Tuple, Union, cast
 
 
 def flatten(arg: Iterable) -> Iterable:
@@ -150,7 +150,7 @@ def normalize_chunks(
 
 
 def normalize_shards(
-    shards: Optional[Tuple[Optional[int], ...]], shape: Tuple[int, ...],
+    shards: Union[int, Tuple[Optional[int], ...], None], shape: Tuple[int, ...],
 ) -> Optional[Tuple[int, ...]]:
     """Convenience function to normalize the `shards` argument for an array
     with the given `shape`."""
@@ -163,6 +163,7 @@ def normalize_shards(
     # handle 1D convenience form
     if isinstance(shards, numbers.Integral):
         shards = tuple(int(shards) for _ in shape)
+    shards = cast(Tuple[Optional[int]], shards)
 
     # handle bad dimensionality
     if len(shards) > len(shape):
@@ -171,13 +172,14 @@ def normalize_shards(
     # handle underspecified shards
     if len(shards) < len(shape):
         # assume single shards across remaining dimensions
-        shards += (1, ) * len(shape) - len(shards)
+        shards += (1, ) * (len(shape) - len(shards))
 
     # handle None or -1 in shards
     if -1 in shards or None in shards:
         shards = tuple(s if c == -1 or c is None else int(c) for s, c in zip(shape, shards))
 
+    shards = cast(Tuple[int], shards)
     return tuple(shards)
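Based on the branches visible in the normalize_shards hunk above, the function should behave roughly as follows (expected values inferred from the diff, not verified output):

from zarr.util import normalize_shards

normalize_shards(2, (100, 100))          # (2, 2): 1D convenience form is broadcast
normalize_shards((2,), (100, 100, 100))  # (2, 1, 1): missing dims get single shards
normalize_shards((None, 4), (10, 20))    # (10, 4): None or -1 spans the whole axis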
From f330cf9554cbe69e8c8bb7b8faf7be2a17c0060e Mon Sep 17 00:00:00 2001
From: Jonathan Striebel
Date: Tue, 11 Jan 2022 10:27:05 +0100
Subject: [PATCH 10/12] prototype sharding in the Array class

---
 zarr/_storage/sharded_store.py | 184 ---------------------------------
 zarr/core.py                   | 175 ++++++++++++++++++++++++++-----
 2 files changed, 152 insertions(+), 207 deletions(-)
 delete mode 100644 zarr/_storage/sharded_store.py

diff --git a/zarr/_storage/sharded_store.py b/zarr/_storage/sharded_store.py
deleted file mode 100644
index f783a0de96..0000000000
--- a/zarr/_storage/sharded_store.py
+++ /dev/null
@@ -1,184 +0,0 @@
-from collections import defaultdict
-from functools import reduce
-from itertools import product
-from typing import Dict, Iterable, Iterator, List, NamedTuple, Optional, Tuple, Union
-
-import numpy as np
-
-from zarr._storage.store import BaseStore, Store
-from zarr.storage import StoreLike, array_meta_key, attrs_key, group_meta_key
-
-
-MAX_UINT_64 = 2 ** 64 - 1
-
-
-class _ShardIndex(NamedTuple):
-    store: "IndexedShardedStore"
-    offsets_and_lengths: np.ndarray  # dtype uint64, shape (shards_0, shards_1, ..., 2)
-
-    def __localize_chunk__(self, chunk: Tuple[int, ...]) -> Tuple[int, ...]:
-        return tuple(chunk_i % shard_i for chunk_i, shard_i in zip(chunk, self.store._shards))
-
-    def get_chunk_slice(self, chunk: Tuple[int, ...]) -> Optional[slice]:
-        localized_chunk = self.__localize_chunk__(chunk)
-        chunk_start, chunk_len = self.offsets_and_lengths[localized_chunk]
-        if (chunk_start, chunk_len) == (MAX_UINT_64, MAX_UINT_64):
-            return None
-        else:
-            return slice(chunk_start, chunk_start + chunk_len)
-
-    def set_chunk_slice(self, chunk: Tuple[int, ...], chunk_slice: Optional[slice]) -> None:
-        localized_chunk = self.__localize_chunk__(chunk)
-        if chunk_slice is None:
-            self.offsets_and_lengths[localized_chunk] = (MAX_UINT_64, MAX_UINT_64)
-        else:
-            self.offsets_and_lengths[localized_chunk] = (
-                chunk_slice.start,
-                chunk_slice.stop - chunk_slice.start
-            )
-
-    def to_bytes(self) -> bytes:
-        return self.offsets_and_lengths.tobytes(order='C')
-
-    @classmethod
-    def from_bytes(
-        cls, buffer: Union[bytes, bytearray], store: "IndexedShardedStore"
-    ) -> "_ShardIndex":
-        return cls(
-            store=store,
-            offsets_and_lengths=np.frombuffer(
-                bytearray(buffer), dtype="<u8"
-            ).reshape(*store._shards, 2, order="C")
-        )
-
-    @classmethod
-    def create_empty(cls, store: "IndexedShardedStore"):
-        ...
-
-
-class IndexedShardedStore(Store):
-    """This class should not be used directly,
-    but is added to an Array as a wrapper when needed automatically."""
-
-    def __init__(
-        self,
-        store: StoreLike,
-        shards: Tuple[int, ...],
-        dimension_separator: str,
-    ) -> None:
-        self._store: BaseStore = BaseStore._ensure_store(store)
-        self._shards = shards
-        self._num_chunks_per_shard = reduce(lambda x, y: x*y, shards, 1)
-        self._dimension_separator = dimension_separator
-
-        # TODO: add warnings for ineffective reads/writes:
-        # * warn if partial reads are not available
-        # * optionally warn on unaligned writes if no partial writes are available
-
-    def __keys_to_shard_groups__(
-        self, keys: Iterable[str]
-    ) -> Dict[str, List[Tuple[str, Tuple[int, ...]]]]:
-        shard_indices_per_shard_key = defaultdict(list)
-        for chunk_key in keys:
-            # TODO: allow to be in a group (aka only use last parts for dimensions)
-            chunk_subkeys = tuple(map(int, chunk_key.split(self._dimension_separator)))
-            shard_key_tuple = (
-                subkey // shard_i for subkey, shard_i in zip(chunk_subkeys, self._shards)
-            )
-            shard_key = self._dimension_separator.join(map(str, shard_key_tuple))
-            shard_indices_per_shard_key[shard_key].append((chunk_key, chunk_subkeys))
-        return shard_indices_per_shard_key
-
-    def __get_index__(self, buffer: Union[bytes, bytearray]) -> _ShardIndex:
-        # At the end of each shard 2*64bit per chunk for offset and length define the index:
-        return _ShardIndex.from_bytes(buffer[-16 * self._num_chunks_per_shard:], self)
-
-    def __get_chunks_in_shard(self, shard_key: str) -> Iterator[Tuple[int, ...]]:
-        # TODO: allow to be in a group (aka only use last parts for dimensions)
-        shard_key_tuple = tuple(map(int, shard_key.split(self._dimension_separator)))
-        for chunk_offset in product(*(range(i) for i in self._shards)):
-            yield tuple(
-                shard_key_i * shards_i + offset_i
-                for shard_key_i, offset_i, shards_i
-                in zip(shard_key_tuple, chunk_offset, self._shards)
-            )
-
-    def __getitem__(self, key: str) -> bytes:
-        return self.getitems([key])[key]
-
-    def getitems(self, keys: Iterable[str], **kwargs) -> Dict[str, bytes]:
-        result = {}
-        for shard_key, chunks_in_shard in self.__keys_to_shard_groups__(keys).items():
-            # TODO use partial read if available
-            full_shard_value = self._store[shard_key]
-            index = self.__get_index__(full_shard_value)
-            for chunk_key, chunk_subkeys in chunks_in_shard:
-                chunk_slice = index.get_chunk_slice(chunk_subkeys)
-                if chunk_slice is not None:
-                    result[chunk_key] = full_shard_value[chunk_slice]
-        return result
-
-    def __setitem__(self, key: str, value: bytes) -> None:
-        self.setitems({key: value})
-
-    def setitems(self, values: Dict[str, bytes]) -> None:
-        for shard_key, chunks_in_shard in self.__keys_to_shard_groups__(values.keys()).items():
-            all_chunks = set(self.__get_chunks_in_shard(shard_key))
-            chunks_to_set = set(chunk_subkeys for _chunk_key, chunk_subkeys in chunks_in_shard)
-            chunks_to_read = all_chunks - chunks_to_set
-            new_content = {
-                chunk_subkeys: values[chunk_key] for chunk_key, chunk_subkeys in chunks_in_shard
-            }
-            try:
-                # TODO use partial read if available
-                full_shard_value = self._store[shard_key]
-            except KeyError:
-                index = _ShardIndex.create_empty(self)
-            else:
-                index = self.__get_index__(full_shard_value)
-                for chunk_to_read in chunks_to_read:
-                    chunk_slice = index.get_chunk_slice(chunk_to_read)
-                    if chunk_slice is not None:
-                        new_content[chunk_to_read] = full_shard_value[chunk_slice]
-
-            # TODO use partial write if available and possible (e.g. at the end)
-            shard_content = b""
-            # TODO: order the chunks in the shard:
-            for chunk_subkeys, chunk_content in new_content.items():
-                chunk_slice = slice(len(shard_content), len(shard_content) + len(chunk_content))
-                index.set_chunk_slice(chunk_subkeys, chunk_slice)
-                shard_content += chunk_content
-            # Appending the index at the end of the shard:
-            shard_content += index.to_bytes()
-            self._store[shard_key] = shard_content
-
-    def __delitem__(self, key) -> None:
-        # TODO not implemented yet, also delitems
-        # Deleting the "last" chunk in a shard needs to remove the whole shard
-        raise NotImplementedError("Deletion is not yet implemented")
-
-    def __iter__(self) -> Iterator[str]:
-        for shard_key in self._store.__iter__():
-            if any(shard_key.endswith(i) for i in (array_meta_key, group_meta_key, attrs_key)):
-                # Special keys such as ".zarray" are passed on as-is
-                yield shard_key
-            else:
-                # For each shard key in the wrapped store, all corresponding chunks are yielded.
-                # TODO: use partial read if available:
-                index = self.__get_index__(self._store[shard_key])
-                for chunk_tuple in self.__get_chunks_in_shard(shard_key):
-                    if index.get_chunk_slice(chunk_tuple) is not None:
-                        # TODO: if shard is in a group, prepend group-prefix to chunk
-                        yield self._dimension_separator.join(map(str, chunk_tuple))
-
-    def __len__(self) -> int:
-        return sum(1 for _ in self.keys())
-
-
-SHARDED_STORES = {
-    "indexed": IndexedShardedStore,
-}
diff --git a/zarr/core.py b/zarr/core.py
index c45fe1b2de..afeb736d65 100644
--- a/zarr/core.py
+++ b/zarr/core.py
@@ -1,17 +1,17 @@
 import binascii
+from collections import defaultdict
 import hashlib
 import itertools
 import math
 import operator
 import re
 from functools import reduce
-from typing import Optional, Tuple
+from typing import Dict, Iterable, Iterator, List, Optional, Tuple, NamedTuple, Union
 
 import numpy as np
 from numcodecs.compat import ensure_bytes, ensure_ndarray
 from collections.abc import MutableMapping
 
-from zarr._storage.sharded_store import SHARDED_STORES
 from zarr.attrs import Attributes
 from zarr.codecs import AsType, get_codec
@@ -49,6 +49,55 @@
 )
 
 
+MAX_UINT_64 = 2 ** 64 - 1
+
+
+class _ShardIndex(NamedTuple):
+    array: "Array"
+    offsets_and_lengths: np.ndarray  # dtype uint64, shape (shards_0, shards_1, ..., 2)
+
+    def __localize_chunk__(self, chunk: Tuple[int, ...]) -> Tuple[int, ...]:
+        return tuple(chunk_i % shard_i for chunk_i, shard_i in zip(chunk, self.array._shards))
+
+    def get_chunk_slice(self, chunk: Tuple[int, ...]) -> Optional[slice]:
+        localized_chunk = self.__localize_chunk__(chunk)
+        chunk_start, chunk_len = self.offsets_and_lengths[localized_chunk]
+        if (chunk_start, chunk_len) == (MAX_UINT_64, MAX_UINT_64):
+            return None
+        else:
+            return slice(chunk_start, chunk_start + chunk_len)
+
+    def set_chunk_slice(self, chunk: Tuple[int, ...], chunk_slice: Optional[slice]) -> None:
+        localized_chunk = self.__localize_chunk__(chunk)
+        if chunk_slice is None:
+            self.offsets_and_lengths[localized_chunk] = (MAX_UINT_64, MAX_UINT_64)
+        else:
+            self.offsets_and_lengths[localized_chunk] = (
+                chunk_slice.start,
+                chunk_slice.stop - chunk_slice.start
+            )
+
+    def to_bytes(self) -> bytes:
+        return self.offsets_and_lengths.tobytes(order='C')
+
+    @classmethod
+    def from_bytes(
+        cls, buffer: Union[bytes, bytearray], array: "Array"
+    ) -> "_ShardIndex":
+        return cls(
+            array=array,
+            offsets_and_lengths=np.frombuffer(
+                bytearray(buffer), dtype="<u8"
+            ).reshape(*array._shards, 2, order="C")
+        )
+
+    @classmethod
+    def create_empty(cls, array: "Array"):
+        ...
@@ ... @@ def write_empty_chunks(self) -> bool:
         """
         return self._write_empty_chunks
 
+    @property
+    def _num_chunks_per_shard(self) -> int:
+        return reduce(lambda x, y: x*y, self._shards, 1)
+
+    def __keys_to_shard_groups__(
+        self, keys: Iterable[str]
+    ) -> Dict[str, List[Tuple[str, Tuple[int, ...]]]]:
+        shard_indices_per_shard_key = defaultdict(list)
+        for chunk_key in keys:
+            # TODO: allow to be in a group (aka only use last parts for dimensions)
+            chunk_subkeys = tuple(map(int, chunk_key.split(self._dimension_separator)))
+            shard_key_tuple = (
+                subkey // shard_i for subkey, shard_i in zip(chunk_subkeys, self._shards)
+            )
+            shard_key = self._dimension_separator.join(map(str, shard_key_tuple))
+            shard_indices_per_shard_key[shard_key].append((chunk_key, chunk_subkeys))
+        return shard_indices_per_shard_key
+
+    def __get_index__(self, buffer: Union[bytes, bytearray]) -> _ShardIndex:
+        # At the end of each shard 2*64bit per chunk for offset and length define the index:
+        return _ShardIndex.from_bytes(buffer[-16 * self._num_chunks_per_shard:], self)
+
+    def __get_chunks_in_shard(self, shard_key: str) -> Iterator[Tuple[int, ...]]:
+        # TODO: allow to be in a group (aka only use last parts for dimensions)
+        shard_key_tuple = tuple(map(int, shard_key.split(self._dimension_separator)))
+        for chunk_offset in itertools.product(*(range(i) for i in self._shards)):
+            yield tuple(
+                shard_key_i * shards_i + offset_i
+                for shard_key_i, offset_i, shards_i
+                in zip(shard_key_tuple, chunk_offset, self._shards)
+            )
+
     def __eq__(self, other):
         return (
             isinstance(other, Array) and
@@ -883,7 +954,7 @@ def _get_basic_selection_zd(self, selection, out=None, fields=None):
         try:
             # obtain encoded data for chunk
             ckey = self._chunk_key((0,))
-            cdata = self.chunk_store[ckey]
+            cdata = self._read_single_possibly_sharded(ckey)
 
         except KeyError:
             # chunk not initialized
@@ -1196,8 +1267,10 @@ def _get_selection(self, indexer, out=None, fields=None):
             check_array_shape('out', out, out_shape)
 
         # iterate over chunks
-        if not hasattr(self.chunk_store, "getitems") or \
-                any(map(lambda x: x == 0, self.shape)):
+        if self._shards is None and (
+            not hasattr(self.chunk_store, "getitems")
+            or any(map(lambda x: x == 0, self.shape))
+        ):
 
             # sequentially get one key at a time from storage
             for chunk_coords, chunk_selection, out_selection in indexer:
@@ -1666,7 +1739,7 @@ def _set_basic_selection_zd(self, selection, value, fields=None):
         # setup chunk
         try:
             # obtain compressed data for chunk
-            cdata = self.chunk_store[ckey]
+            cdata = self._read_single_possibly_sharded(ckey)
 
         except KeyError:
             # chunk not initialized
@@ -1734,8 +1807,11 @@ def _set_selection(self, indexer, value, fields=None):
             check_array_shape('value', value, sel_shape)
 
         # iterate over chunks in range
-        if not hasattr(self.chunk_store, "setitems") or self._synchronizer is not None \
-                or any(map(lambda x: x == 0, self.shape)):
+        if self._shards is None and (
+            not hasattr(self.chunk_store, "setitems")
+            or self._synchronizer is not None
+            or any(map(lambda x: x == 0, self.shape))
+        ):
 
             # iterative approach
             for chunk_coords, chunk_selection, out_selection in indexer:
@@ -1909,6 +1985,21 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
             self._process_chunk(out, cdata, chunk_selection, drop_axes,
                                 out_is_ndarray, fields, out_selection)
 
+    def _read_single_possibly_sharded(self, ckey):
+        if self._shards is None:
+            return self.chunk_store[ckey]
+        else:
+            shard_key, chunks_in_shard = next(iter(self.__keys_to_shard_groups__([ckey]).items()))
+            # TODO use partial read if available
+            full_shard_value = self.chunk_store[shard_key]
+            index = self.__get_index__(full_shard_value)
+            for _chunk_key, chunk_subkeys in chunks_in_shard:
+                chunk_slice = index.get_chunk_slice(chunk_subkeys)
+                if chunk_slice is None:
+                    raise KeyError(ckey)
+                else:
+                    return full_shard_value[chunk_slice]
+
     def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
                         drop_axes=None, fields=None):
         """As _chunk_getitem, but for lists of chunks
@@ -1941,7 +2032,16 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
             }
         else:
             partial_read_decode = False
-            cdatas = self.chunk_store.getitems(ckeys, on_error="omit")
+            cdatas = {}
+            for shard_key, chunks_in_shard in self.__keys_to_shard_groups__(ckeys).items():
+                # TODO use partial read if available
+                full_shard_value = self.chunk_store[shard_key]
+                index = self.__get_index__(full_shard_value)
+                for chunk_key, chunk_subkeys in chunks_in_shard:
+                    chunk_slice = index.get_chunk_slice(chunk_subkeys)
+                    if chunk_slice is not None:
+                        cdatas[chunk_key] = full_shard_value[chunk_slice]
+
         for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection):
             if ckey in cdatas:
                 self._process_chunk(
@@ -1975,7 +2075,36 @@ def _chunk_setitems(self, lchunk_coords, lchunk_selection, values, fields=None):
             to_store = {k: self._encode_chunk(cdatas[k]) for k in nonempty_keys}
         else:
             to_store = {k: self._encode_chunk(v) for k, v in cdatas.items()}
-        self.chunk_store.setitems(to_store)
+
+        for shard_key, chunks_in_shard in self.__keys_to_shard_groups__(to_store.keys()).items():
+            all_chunks = set(self.__get_chunks_in_shard(shard_key))
+            chunks_to_set = set(chunk_subkeys for _chunk_key, chunk_subkeys in chunks_in_shard)
+            chunks_to_read = all_chunks - chunks_to_set
+            new_content = {
+                chunk_subkeys: to_store[chunk_key] for chunk_key, chunk_subkeys in chunks_in_shard
+            }
+            try:
+                # TODO use partial read if available
+                full_shard_value = self.chunk_store[shard_key]
+            except KeyError:
+                index = _ShardIndex.create_empty(self)
+            else:
+                index = self.__get_index__(full_shard_value)
+                for chunk_to_read in chunks_to_read:
+                    chunk_slice = index.get_chunk_slice(chunk_to_read)
+                    if chunk_slice is not None:
+                        new_content[chunk_to_read] = full_shard_value[chunk_slice]
+
+            # TODO use partial write if available and possible (e.g. at the end)
+            shard_content = b""
+            # TODO: order the chunks in the shard:
+            for chunk_subkeys, chunk_content in new_content.items():
+                chunk_slice = slice(len(shard_content), len(shard_content) + len(chunk_content))
+                index.set_chunk_slice(chunk_subkeys, chunk_slice)
+                shard_content += chunk_content
+            # Appending the index at the end of the shard:
+            shard_content += index.to_bytes()
+            self.chunk_store[shard_key] = shard_content
 
     def _chunk_delitems(self, ckeys):
         if hasattr(self.chunk_store, "delitems"):
@@ -2055,7 +2184,7 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):
 
         try:
             # obtain compressed data for chunk
-            cdata = self.chunk_store[ckey]
+            cdata = self._read_single_possibly_sharded(ckey)
 
         except KeyError:
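The read-modify-write cycle that _chunk_setitems now performs per shard can be summarized in a standalone sketch (simplified to a flat chunk index instead of the n-dimensional one; the function and parameter names are illustrative, not the Array method itself):

import numpy as np

MAX_UINT_64 = 2 ** 64 - 1

def write_chunk_to_shard(store, shard_key, chunk_nr, chunk_bytes, chunks_per_shard):
    # Recover the chunks already present via the footer index,
    # or start from an all-empty index if the shard does not exist yet.
    try:
        old = store[shard_key]
        index = np.frombuffer(
            bytearray(old[-16 * chunks_per_shard:]), dtype="<u8"
        ).reshape(chunks_per_shard, 2).copy()
    except KeyError:
        old = b""
        index = np.full((chunks_per_shard, 2), MAX_UINT_64, dtype="<u8")
    content = {}
    for nr in range(chunks_per_shard):
        offset, length = index[nr]
        if (offset, length) != (MAX_UINT_64, MAX_UINT_64):
            content[nr] = old[int(offset):int(offset) + int(length)]
    content[chunk_nr] = chunk_bytes
    # Re-assemble the shard: concatenate the chunks, then append the updated index.
    payload = b""
    for nr, data in content.items():
        index[nr] = (len(payload), len(data))
        payload += data
    store[shard_key] = payload + index.tobytes(order="C")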
From 7d042d2167bb758a951cb463b2cfefd394adbc84 Mon Sep 17 00:00:00 2001
From: Jonathan Striebel
Date: Tue, 11 Jan 2022 10:36:07 +0100
Subject: [PATCH 11/12] fix output in chunking_test

---
 chunking_test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/chunking_test.py b/chunking_test.py
index 0598a590ff..8e0e7ff18b 100644
--- a/chunking_test.py
+++ b/chunking_test.py
@@ -41,7 +41,7 @@
 
 # ONDISK ['.zarray', '0.0', '1.0', '2.0', '3.0']
 # STORE ['.zarray', '0.0', '1.0', '2.0', '3.0']
-# CHUNKSTORE (SHARDED) ['.zarray', '0.0', '0.1', '1.0', '1.1', '2.0', '2.1', '3.0', '3.1', '5.0', '6.1']
+# CHUNKSTORE (SHARDED) ['.zarray', '0.0', '1.0', '2.0', '3.0']
 
 index_bytes = z.store["0.0"][-2*2*16:]
 print("INDEX 0.0", [int.from_bytes(index_bytes[i:i+8], byteorder="little") for i in range(0, len(index_bytes), 8)])

From 8b2fec295cc8a3e5b6caabccbacd04aaa6cdf7ec Mon Sep 17 00:00:00 2001
From: Jonathan Striebel
Date: Tue, 11 Jan 2022 16:45:21 +0100
Subject: [PATCH 12/12] rename test-script

---
 chunking_test.py => sharding_test.py | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename chunking_test.py => sharding_test.py (100%)

diff --git a/chunking_test.py b/sharding_test.py
similarity index 100%
rename from chunking_test.py
rename to sharding_test.py
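End to end, the series is meant to be exercised along the lines of the renamed sharding_test.py; a hypothetical session (the shard configuration and the store layout in the comments are assumptions based on the test snippets above):

import zarr

z = zarr.open("data/sharding_test.zarr")  # array assumed created with shards=(2, 2)
# Each store entry such as "0.0" is one shard holding up to 2*2 chunks followed by
# a footer of 16 bytes per chunk (offset + length as little-endian uint64).
index_bytes = z.store["0.0"][-2 * 2 * 16:]
print([int.from_bytes(index_bytes[i:i + 8], byteorder="little")
       for i in range(0, len(index_bytes), 8)])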