Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zarr-Python 3 compatibility #11388

Merged
merged 20 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2343749
switch to using zarr.open_array instead of using the Array constructor
jhamman Sep 14, 2024
e37704b
further changes for zarr3 compat
jhamman Sep 14, 2024
4380993
switch to working branch for upstream test
jhamman Sep 14, 2024
916561c
Update dask/array/core.py
jhamman Sep 16, 2024
104bc6f
update imports
jhamman Sep 17, 2024
c3df8d6
Merge branch 'main' of github.com:dask/dask into fix/zarr-array-const…
jhamman Sep 29, 2024
e3aab83
fixup test
jhamman Sep 29, 2024
228deb9
fixup distributed test
jhamman Sep 29, 2024
c710118
Merge branch 'fix/zarr-array-construction' of github.com:jhamman/dask…
jhamman Oct 1, 2024
1134808
Update continuous_integration/scripts/install.sh
jhamman Oct 1, 2024
3d8f445
Merge branch 'main' into fix/zarr-array-construction-2
jhamman Oct 1, 2024
a044730
fixup order / xarray tests
jhamman Oct 7, 2024
e99bbdd
Merge branch 'fix/zarr-array-construction-2' of github.com:jhamman/da…
jhamman Oct 7, 2024
aeaf871
Merge branch 'main' of github.com:dask/dask into fix/zarr-array-const…
jhamman Oct 7, 2024
c0f671e
Merge branch 'main' into fix/zarr-array-construction-2
jhamman Oct 7, 2024
9721f02
cleanup resources in fixture
jhamman Oct 10, 2024
b658930
Merge branch 'fix/zarr-array-construction-2' of github.com:jhamman/da…
jhamman Oct 10, 2024
eadc5f5
point at working branch
jhamman Oct 10, 2024
0094664
tidy
jhamman Oct 10, 2024
c1c7f5a
Merge branch 'main' into fix/zarr-array-construction-2
jhamman Oct 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions continuous_integration/scripts/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ if [[ ${UPSTREAM_DEV} ]]; then
git+https://github.com/dask/distributed \
git+https://github.com/dask/dask-expr \
git+https://github.com/dask/fastparquet \
git+https://github.com/zarr-developers/zarr-python.git@main
# Zarr's default branch (`v3`) is still under development.
# Explicitly specify `main` until their default branch is ready.
# https://github.com/zarr-developers/zarr-python/issues/1922
git+https://github.com/zarr-developers/zarr-python
mamba uninstall --force numpy pandas scipy numexpr numba sparse scikit-image h5py
python -m pip install --no-deps --pre --retries 10 \
-i https://pypi.anaconda.org/scientific-python-nightly-wheels/simple \
Expand Down
36 changes: 30 additions & 6 deletions dask/array/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from bisect import bisect
from collections import defaultdict
from collections.abc import Collection, Iterable, Iterator, Mapping, Sequence
from functools import partial, reduce, wraps
from functools import lru_cache, partial, reduce, wraps
from itertools import product, zip_longest
from numbers import Integral, Number
from operator import add, mul
Expand All @@ -22,6 +22,7 @@

import numpy as np
from numpy.typing import ArrayLike
from packaging.version import Version
from tlz import accumulate, concat, first, frequencies, groupby, partition
from tlz.curried import pluck

Expand Down Expand Up @@ -3582,6 +3583,16 @@ def from_array(
return Array(dsk, name, chunks, meta=meta, dtype=getattr(x, "dtype", None))


@lru_cache
def _zarr_v3() -> bool:
try:
import zarr
except ImportError:
return False
else:
return Version(zarr.__version__).major >= 3


def from_zarr(
url,
component=None,
Expand Down Expand Up @@ -3632,12 +3643,15 @@ def from_zarr(
if isinstance(url, os.PathLike):
url = os.fspath(url)
if storage_options:
store = zarr.storage.FSStore(url, **storage_options)
if _zarr_v3():
store = zarr.store.RemoteStore(url, **storage_options)
else:
store = zarr.storage.FSStore(url, **storage_options)
else:
store = url
z = zarr.open_array(store, read_only=True, path=component, **kwargs)
z = zarr.open_array(store=store, read_only=True, path=component, **kwargs)
else:
z = zarr.open_array(url, read_only=True, path=component, **kwargs)
z = zarr.open_array(store=url, read_only=True, path=component, **kwargs)
chunks = chunks if chunks is not None else z.chunks
if name is None:
name = "from-zarr-" + tokenize(z, component, storage_options, chunks, **kwargs)
Expand Down Expand Up @@ -3706,9 +3720,14 @@ def to_zarr(
"currently supported by Zarr.%s" % unknown_chunk_message
)

if _zarr_v3():
zarr_mem_store_types = (zarr.storage.MemoryStore,)
else:
zarr_mem_store_types = (dict, zarr.storage.MemoryStore, zarr.storage.KVStore)

if isinstance(url, zarr.Array):
z = url
if isinstance(z.store, (dict, zarr.storage.MemoryStore, zarr.storage.KVStore)):
if isinstance(z.store, zarr_mem_store_types):
try:
from distributed import default_client

Expand Down Expand Up @@ -3751,7 +3770,12 @@ def to_zarr(
storage_options = storage_options or {}

if storage_options:
store = zarr.storage.FSStore(url, **storage_options)
if _zarr_v3():
store = zarr.storage.RemoteStore(
url, mode=kwargs.pop("mode", "a"), **storage_options
)
else:
store = zarr.storage.FSStore(url, **storage_options)
else:
store = url

Expand Down
20 changes: 12 additions & 8 deletions dask/array/tests/test_array_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from operator import add, sub
from threading import Lock

from packaging.version import Version
from tlz import concat, merge
from tlz.curried import identity

Expand Down Expand Up @@ -4598,15 +4599,17 @@ def test_read_zarr_chunks():
assert arr.chunks == ((5, 4),)


def test_zarr_pass_mapper():
pytest.importorskip("zarr")
import zarr.storage
def test_zarr_pass_store():
zarr = pytest.importorskip("zarr")

with tmpdir() as d:
mapper = zarr.storage.DirectoryStore(d)
if Version(zarr.__version__) < Version("3.0.0.a0"):
store = zarr.storage.DirectoryStore(d)
else:
store = zarr.storage.LocalStore(d, mode="w")
a = da.zeros((3, 3), chunks=(1, 1))
a.to_zarr(mapper)
a2 = da.from_zarr(mapper)
a.to_zarr(store)
a2 = da.from_zarr(store)
assert_eq(a, a2)
assert a2.chunks == a.chunks

Expand All @@ -4623,8 +4626,9 @@ def test_zarr_group():
# second time is fine, group exists
a.to_zarr(d, component="test2", overwrite=False)
a.to_zarr(d, component="nested/test", overwrite=False)
group = zarr.open_group(d, mode="r")
assert list(group) == ["nested", "test", "test2"]

group = zarr.open_group(store=d, mode="r")
assert set(group) == {"nested", "test", "test2"}
assert "test" in group["nested"]

a2 = da.from_zarr(d, component="test")
Expand Down
52 changes: 40 additions & 12 deletions dask/tests/test_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from functools import partial
from operator import add

from packaging.version import Version

from distributed import Client, SchedulerPlugin, WorkerPlugin, futures_of, wait
from distributed.utils_test import cleanup # noqa F401
from distributed.utils_test import client as c # noqa F401
Expand Down Expand Up @@ -340,10 +342,33 @@ def test_futures_in_graph(c):
assert xxyy3.compute(scheduler="dask.distributed") == ((1 + 1) + (2 + 2)) + 10


def test_zarr_distributed_roundtrip(c):
@pytest.fixture(scope="function")
def zarr(c):
zarr_lib = pytest.importorskip("zarr")
# Zarr-Python 3 lazily allocates a dedicated thread/IO loop
# for to execute async tasks. To avoid having this thread
# be picked up as a "leaked thread", we manually trigger it's
# creation before using zarr
try:
_ = zarr_lib.core.sync._get_loop()
_ = zarr_lib.core.sync._get_executor()
yield zarr_lib
except AttributeError:
yield zarr_lib
finally:
# Zarr-Python 3 lazily allocates a IO thread, a thread pool executor, and
# an IO loop. Here we clean up these resources to avoid leaking threads
# In normal operations, this is done as by an atexit handler when Zarr
# is shutting down.
try:
zarr_lib.core.sync.cleanup_resources()
except AttributeError:
pass


def test_zarr_distributed_roundtrip(c, zarr):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
pytest.importorskip("zarr")

with tmpdir() as d:
a = da.zeros((3, 3), chunks=(1, 1))
Expand All @@ -353,16 +378,18 @@ def test_zarr_distributed_roundtrip(c):
assert a2.chunks == a.chunks


def test_zarr_distributed_with_explicit_directory_store(c):
def test_zarr_distributed_with_explicit_directory_store(c, zarr):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
zarr = pytest.importorskip("zarr")

with tmpdir() as d:
chunks = (1, 1)
a = da.zeros((3, 3), chunks=chunks)
s = zarr.storage.DirectoryStore(d)
z = zarr.creation.open_array(
if Version(zarr.__version__) < Version("3.0.0.a0"):
s = zarr.storage.DirectoryStore(d)
else:
s = zarr.storage.LocalStore(d, mode="a")
z = zarr.open_array(
shape=a.shape,
chunks=chunks,
dtype=a.dtype,
Expand All @@ -375,15 +402,17 @@ def test_zarr_distributed_with_explicit_directory_store(c):
assert a2.chunks == a.chunks


def test_zarr_distributed_with_explicit_memory_store(c):
def test_zarr_distributed_with_explicit_memory_store(c, zarr):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
zarr = pytest.importorskip("zarr")

chunks = (1, 1)
a = da.zeros((3, 3), chunks=chunks)
s = zarr.storage.MemoryStore()
z = zarr.creation.open_array(
if Version(zarr.__version__) < Version("3.0.0.a0"):
s = zarr.storage.MemoryStore()
else:
s = zarr.storage.MemoryStore(mode="a")
z = zarr.open_array(
shape=a.shape,
chunks=chunks,
dtype=a.dtype,
Expand All @@ -394,10 +423,9 @@ def test_zarr_distributed_with_explicit_memory_store(c):
a.to_zarr(z)


def test_zarr_in_memory_distributed_err(c):
def test_zarr_in_memory_distributed_err(c, zarr):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
zarr = pytest.importorskip("zarr")

chunks = (1, 1)
a = da.ones((3, 3), chunks=chunks)
Expand Down
7 changes: 6 additions & 1 deletion dask/tests/test_order.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from collections import defaultdict

import pytest
from packaging.version import Version

import dask
from dask import delayed
Expand Down Expand Up @@ -891,7 +892,11 @@ def test_array_store_final_order(tmpdir):
arrays = [da.ones((110, 4), chunks=(100, 2)) for i in range(4)]
x = da.concatenate(arrays, axis=0).rechunk((100, 2))

store = zarr.DirectoryStore(tmpdir)
if Version(zarr.__version__) < Version("3.0.0.a0"):
store = zarr.storage.DirectoryStore(tmpdir)
else:
store = zarr.storage.LocalStore(str(tmpdir), mode="w")

root = zarr.group(store, overwrite=True)
dest = root.empty_like(name="dest", data=x, chunks=x.chunksize, overwrite=True)
d = x.store(dest, lock=False, compute=False)
Expand Down
8 changes: 8 additions & 0 deletions dask/tests/test_sizeof.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from array import array

import pytest
from packaging.version import Version

from dask.multiprocessing import get_context
from dask.sizeof import sizeof
Expand Down Expand Up @@ -276,12 +277,19 @@ def test_xarray():

def test_xarray_not_in_memory():
xr = pytest.importorskip("xarray")
zarr = pytest.importorskip("zarr")
np = pytest.importorskip("numpy")
pytest.importorskip("zarr")
Comment on lines +280 to 282
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PS, the second zarr import can be removed.


ind = np.arange(-66, 67, 1).astype(float)
arr = np.random.random((len(ind),))

# TODO: remove this conditional after consolidated metadata lands in v3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this referring to landing in dask or in Zarr? Did it land? There's no issue or PR reference to check.

if Version(zarr.__version__) > Version("3.0.0.a0") and Version(
zarr.__version__
) < Version("3.0.0"):
pytest.xfail("consolidated metadata and xarray support is not complete")

with tmpdir() as path:
xr.DataArray(
arr,
Expand Down
Loading