From baf81b42b33bd4e6ffab94722a021d87b79931d7 Mon Sep 17 00:00:00 2001
From: Lily Wang <31115101+lilyminium@users.noreply.github.com>
Date: Fri, 19 Apr 2019 00:34:28 +1000
Subject: [PATCH] Manually specify chunks in open_zarr (#2530)

* added manual chunks for open_zarr

* updated whats-new

* fixed pep8 issues

* removed whitespace

* added deprecation warning

* fixed pep8 issues

* added warning for bad chunks

* fixed lingering rebase conflicts

* fixed pep8 issues

* added stacklevel

* fixed pep8 issues

* Various fixes for explicit Dataset.indexes (#2858)

  Fixes GH2856. I've added internal consistency checks to the uses of
  ``assert_equal`` in our test suite, so this shouldn't happen again.

* Fix indexes in Dataset.interp

* 0.12.1 release

* revert to 0.12.2 dev

* update links to https (#2872)

* Fix mypy typing error in cftime_offsets.py (#2878)

* decreased pytest verbosity (#2881)

* disallow unicode again
---
 doc/whats-new.rst             |   5 ++
 xarray/backends/zarr.py       | 110 +++++++++++++++++++++++++++-------
 xarray/tests/test_backends.py |  90 ++++++++++++++++++++++++++--
 3 files changed, 178 insertions(+), 27 deletions(-)

diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index e9bdc710029..059b6e8b544 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -140,6 +140,11 @@ Other enhancements
   By `Keisuke Fujii <https://github.com/fujiisoup>`_.
 - Added :py:meth:`~xarray.Dataset.drop_dims` (:issue:`1949`).
   By `Kevin Squire <https://github.com/kmsquire>`_.
+- ``xr.open_zarr`` now accepts manually specified chunks with the ``chunks=``
+  parameter. ``auto_chunk=True`` is equivalent to ``chunks='auto'`` for
+  backwards compatibility. The new ``overwrite_encoded_chunks`` parameter
+  drops the original zarr chunk encoding.
+  By `Lily Wang <https://github.com/lilyminium>`_.
 
 Bug fixes
 ~~~~~~~~~

diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py
index ee77e0833c4..f5364314af8 100644
--- a/xarray/backends/zarr.py
+++ b/xarray/backends/zarr.py
@@ -1,3 +1,4 @@
+import warnings
 from collections import OrderedDict
 from distutils.version import LooseVersion
 
@@ -352,10 +353,11 @@ def close(self):
         zarr.consolidate_metadata(self.ds.store)
 
 
-def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
+def open_zarr(store, group=None, synchronizer=None, chunks='auto',
               decode_cf=True, mask_and_scale=True, decode_times=True,
               concat_characters=True, decode_coords=True,
-              drop_variables=None, consolidated=False):
+              drop_variables=None, consolidated=False,
+              overwrite_encoded_chunks=False, **kwargs):
     """Load and decode a dataset from a Zarr store.
 
     .. note:: Experimental
@@ -375,10 +377,15 @@ def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
         Array synchronizer provided to zarr
     group : str, optional
         Group path. (a.k.a. `path` in zarr terminology.)
-    auto_chunk : bool, optional
-        Whether to automatically create dask chunks corresponding to each
-        variable's zarr chunks. If False, zarr array data will lazily convert
-        to numpy arrays upon access.
+    chunks : int or dict or tuple or {None, 'auto'}, optional
+        Chunk sizes along each dimension, e.g., ``5`` or
+        ``{'x': 5, 'y': 5}``. If ``chunks='auto'``, dask chunks are created
+        based on the variable's zarr chunks. If ``chunks=None``, zarr array
+        data will lazily convert to numpy arrays upon access. Any chunk
+        specification that Dask accepts may be used here.
+    overwrite_encoded_chunks : bool, optional
+        Whether to drop the zarr chunks encoded for each variable when a
+        dataset is loaded with specified chunk sizes (default: False).
     decode_cf : bool, optional
         Whether to decode these variables, assuming they were saved according
         to CF conventions.
@@ -422,6 +429,24 @@
     ----------
     http://zarr.readthedocs.io/
     """
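+    # Backwards compatibility: the deprecated auto_chunk keyword arrives via
+    # **kwargs and is mapped onto chunks (True -> 'auto', False -> None).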
+    if 'auto_chunk' in kwargs:
+        auto_chunk = kwargs.pop('auto_chunk')
+        if auto_chunk:
+            chunks = 'auto'  # maintain backwards compatibility
+        else:
+            chunks = None
+
+        warnings.warn("auto_chunk is deprecated. Use chunks='auto' instead.",
+                      FutureWarning, stacklevel=2)
+
+    if kwargs:
+        raise TypeError("open_zarr() got unexpected keyword arguments " +
+                        ",".join(kwargs.keys()))
+
+    if not isinstance(chunks, (int, dict, tuple)):
+        if chunks != 'auto' and chunks is not None:
+            raise ValueError("chunks must be an int, dict, tuple, 'auto', or "
+                             "None. Instead found %s. " % chunks)
 
     if not decode_cf:
         mask_and_scale = False
@@ -449,21 +474,60 @@ def maybe_decode_store(store, lock=False):
 
     # auto chunking needs to be here and not in ZarrStore because variable
     # chunks do not survive decode_cf
-    if auto_chunk:
-        # adapted from Dataset.Chunk()
-        def maybe_chunk(name, var):
-            from dask.base import tokenize
-            chunks = var.encoding.get('chunks')
-            if (var.ndim > 0) and (chunks is not None):
-                # does this cause any data to be read?
-                token2 = tokenize(name, var._data)
-                name2 = 'zarr-%s' % token2
-                return var.chunk(chunks, name=name2, lock=None)
-            else:
-                return var
-
-        variables = OrderedDict([(k, maybe_chunk(k, v))
-                                 for k, v in ds.variables.items()])
-        return ds._replace_vars_and_dims(variables)
-    else:
+    # return the trivial case: no chunking was requested
+    if not chunks:
         return ds
+
+    # adapted from Dataset.chunk()
+    if isinstance(chunks, int):
+        chunks = dict.fromkeys(ds.dims, chunks)
+
+    if isinstance(chunks, tuple) and len(chunks) == len(ds.dims):
+        chunks = dict(zip(ds.dims, chunks))
+
+    def get_chunk(name, var, chunks):
+        chunk_spec = dict(zip(var.dims, var.encoding.get('chunks')))
+
+        # Coordinate labels aren't chunked
+        if var.ndim == 1 and var.dims[0] == name:
+            return chunk_spec
+
+        if chunks == 'auto':
+            return chunk_spec
+
+        for dim in var.dims:
+            if dim in chunks:
+                spec = chunks[dim]
+                if isinstance(spec, int):
+                    spec = (spec,)
+                if isinstance(spec, (tuple, list)) and chunk_spec[dim]:
+                    if any(s % chunk_spec[dim] for s in spec):
+                        warnings.warn("Specified Dask chunks %r would "
+                                      "separate Zarr chunk shape %r for "
+                                      "dimension %r. This significantly "
+                                      "degrades performance. Consider "
+                                      "rechunking after loading instead."
+                                      % (chunks[dim], chunk_spec[dim], dim),
+                                      stacklevel=2)
+                chunk_spec[dim] = chunks[dim]
+        return chunk_spec
+
+    def maybe_chunk(name, var, chunks):
+        from dask.base import tokenize
+
+        chunk_spec = get_chunk(name, var, chunks)
+
+        if (var.ndim > 0) and (chunk_spec is not None):
+            # does this cause any data to be read?
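+            # tokenize() hashes the variable name and its lazy backend array
+            # to give the dask array a deterministic, collision-free name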
+            token2 = tokenize(name, var._data)
+            name2 = 'zarr-%s' % token2
+            var = var.chunk(chunk_spec, name=name2, lock=None)
+            if overwrite_encoded_chunks and var.chunks is not None:
+                var.encoding['chunks'] = tuple(x[0] for x in var.chunks)
+            return var
+        else:
+            return var
+
+    variables = OrderedDict([(k, maybe_chunk(k, v, chunks))
+                             for k, v in ds.variables.items()])
+    return ds._replace_vars_and_dims(variables)

diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py
index f76d1927210..5e28ff46665 100644
--- a/xarray/tests/test_backends.py
+++ b/xarray/tests/test_backends.py
@@ -1391,7 +1391,7 @@ def test_auto_chunk(self):
         original = create_test_data().chunk()
 
         with self.roundtrip(
-                original, open_kwargs={'auto_chunk': False}) as actual:
+                original, open_kwargs={'chunks': None}) as actual:
             for k, v in actual.variables.items():
                 # only index variables should be in memory
                 assert v._in_memory == (k in actual.dims)
@@ -1399,19 +1399,101 @@ def test_auto_chunk(self):
                 assert v.chunks is None
 
         with self.roundtrip(
-                original, open_kwargs={'auto_chunk': True}) as actual:
+                original, open_kwargs={'chunks': 'auto'}) as actual:
             for k, v in actual.variables.items():
                 # only index variables should be in memory
                 assert v._in_memory == (k in actual.dims)
                 # chunk size should be the same as original
                 assert v.chunks == original[k].chunks
 
+    def test_manual_chunk(self):
+        original = create_test_data().chunk({'dim1': 3, 'dim2': 4, 'dim3': 3})
+
+        # All of these should return non-chunked arrays
+        NO_CHUNKS = (None, 0, {})
+        for no_chunk in NO_CHUNKS:
+            open_kwargs = {'chunks': no_chunk}
+            with self.roundtrip(original, open_kwargs=open_kwargs) as actual:
+                for k, v in actual.variables.items():
+                    # only index variables should be in memory
+                    assert v._in_memory == (k in actual.dims)
+                    # there should be no chunks
+                    assert v.chunks is None
+
+        # uniform arrays
+        for i in range(2, 6):
+            rechunked = original.chunk(chunks=i)
+            open_kwargs = {'chunks': i}
+            with self.roundtrip(original, open_kwargs=open_kwargs) as actual:
+                for k, v in actual.variables.items():
+                    # only index variables should be in memory
+                    assert v._in_memory == (k in actual.dims)
+                    # chunk size should be the same as rechunked
+                    assert v.chunks == rechunked[k].chunks
+
+        chunks = {'dim1': 2, 'dim2': 3, 'dim3': 5}
+        rechunked = original.chunk(chunks=chunks)
+
+        open_kwargs = {'chunks': chunks, 'overwrite_encoded_chunks': True}
+        with self.roundtrip(original, open_kwargs=open_kwargs) as actual:
+            for k, v in actual.variables.items():
+                assert v.chunks == rechunked[k].chunks
+
+            with self.roundtrip(actual) as auto:
+                # encoding should have changed
+                for k, v in actual.variables.items():
+                    assert v.chunks == rechunked[k].chunks
+
+                assert_identical(actual, auto)
+                assert_identical(actual.load(), auto.load())
+
+    def test_warning_on_bad_chunks(self):
+        original = create_test_data().chunk({'dim1': 4, 'dim2': 3, 'dim3': 5})
+
+        bad_chunks = (2, {'dim2': (3, 3, 2, 1)})
+        for chunks in bad_chunks:
+            kwargs = {'chunks': chunks}
+            with pytest.warns(UserWarning):
+                with self.roundtrip(original, open_kwargs=kwargs) as actual:
+                    for k, v in actual.variables.items():
+                        # only index variables should be in memory
+                        assert v._in_memory == (k in actual.dims)
+
+        good_chunks = ({'dim2': 3}, {'dim3': 10})
+        for chunks in good_chunks:
+            kwargs = {'chunks': chunks}
+            with pytest.warns(None) as record:
+                with self.roundtrip(original, open_kwargs=kwargs) as actual:
+                    for k, v in actual.variables.items():
+                        # only index variables should be in memory
+                        assert v._in_memory == (k in actual.dims)
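+            # an empty record means well-aligned chunks raised no warning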
+            assert len(record) == 0
+
+    def test_deprecate_auto_chunk(self):
+        original = create_test_data().chunk()
+        with pytest.warns(FutureWarning):
+            with self.roundtrip(
+                    original, open_kwargs={'auto_chunk': True}) as actual:
+                for k, v in actual.variables.items():
+                    # only index variables should be in memory
+                    assert v._in_memory == (k in actual.dims)
+                    # chunk size should be the same as original
+                    assert v.chunks == original[k].chunks
+
+        with pytest.warns(FutureWarning):
+            with self.roundtrip(
+                    original, open_kwargs={'auto_chunk': False}) as actual:
+                for k, v in actual.variables.items():
+                    # only index variables should be in memory
+                    assert v._in_memory == (k in actual.dims)
+                    # there should be no chunks
+                    assert v.chunks is None
+
     def test_write_uneven_dask_chunks(self):
         # regression for GH#2225
         original = create_test_data().chunk({'dim1': 3, 'dim2': 4, 'dim3': 3})
-
         with self.roundtrip(
-                original, open_kwargs={'auto_chunk': True}) as actual:
+                original, open_kwargs={'chunks': 'auto'}) as actual:
             for k, v in actual.data_vars.items():
                 print(k)
                 assert v.chunks == actual[k].chunks
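Below is a minimal usage sketch of the behaviour this patch adds. The store
path ``example.zarr`` and the dimension name ``time`` are hypothetical
placeholders; only the ``chunks`` and ``overwrite_encoded_chunks`` keywords
come from the patch itself.

    import xarray as xr

    # dask chunks mirror the encoded zarr chunks (replaces auto_chunk=True)
    ds = xr.open_zarr('example.zarr', chunks='auto')

    # no dask: arrays convert lazily to numpy on access (auto_chunk=False)
    ds = xr.open_zarr('example.zarr', chunks=None)

    # manual chunking; sizes that are not a multiple of the encoded zarr
    # chunks trigger the performance warning added by this patch
    ds = xr.open_zarr('example.zarr', chunks={'time': 10})

    # additionally drop the stored chunk encoding, so a later to_zarr()
    # writes with the new chunk sizes instead of the original ones
    ds = xr.open_zarr('example.zarr', chunks={'time': 10},
                      overwrite_encoded_chunks=True)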