
Commit 472db73

Cache Dask arrays created from NetCDFDataProxys to speed up loading files with multiple variables (#6252)

* Cache Dask arrays to speed up loading files with multiple variables

* Add benchmark for files with many cubes

* Add whatsnew

* Add test

* Add license header

* Use a global to set the cache size

* Update whatsnew
bouweandela authored Feb 14, 2025
1 parent 8f5cb02 commit 472db73
Showing 4 changed files with 80 additions and 7 deletions.
4 changes: 2 additions & 2 deletions docs/src/whatsnew/latest.rst
@@ -60,8 +60,8 @@ This document explains the changes made to Iris for this release
 🚀 Performance Enhancements
 ===========================
 
-#. `@bouweandela`_ made loading :class:`~iris.cube.Cube`s from small NetCDF
-   files faster. (:pull:`6229`)
+#. `@bouweandela`_ made loading :class:`~iris.cube.Cube`s from NetCDF files
+   faster. (:pull:`6229` and :pull:`6252`)
 
 #. `@fnattino`_ enabled lazy cube interpolation using the linear and
    nearest-neighbour interpolators (:class:`iris.analysis.Linear` and
56 changes: 52 additions & 4 deletions lib/iris/_lazy_data.py
@@ -19,6 +19,9 @@
 import numpy as np
 import numpy.ma as ma
 
+MAX_CACHE_SIZE = 100
+"""Maximum number of Dask arrays to cache."""
+
 
 def non_lazy(func):
     """Turn a lazy function into a function that returns a result immediately."""
@@ -202,6 +205,7 @@ def _optimum_chunksize_internals(
         dim = working[0]
         working = working[1:]
         result.append(dim)
+    result = tuple(result)
 
     return result

@@ -227,6 +231,33 @@ def _optimum_chunksize(
     )
 
 
+class LRUCache:
+    def __init__(self, maxsize: int) -> None:
+        self._cache: dict = {}
+        self.maxsize = maxsize
+
+    def __getitem__(self, key):
+        value = self._cache.pop(key)
+        self._cache[key] = value
+        return value
+
+    def __setitem__(self, key, value):
+        self._cache[key] = value
+        if len(self._cache) > self.maxsize:
+            self._cache.pop(next(iter(self._cache)))
+
+    def __contains__(self, key):
+        return key in self._cache
+
+    def __repr__(self):
+        return (
+            f"<{self.__class__.__name__} maxsize={self.maxsize} cache={self._cache!r} >"
+        )
+
+
+CACHE = LRUCache(MAX_CACHE_SIZE)
+
+
 def as_lazy_data(data, chunks=None, asarray=False, meta=None, dims_fixed=None):
     """Convert the input array `data` to a :class:`dask.array.Array`.
@@ -264,6 +295,8 @@ def as_lazy_data(data, chunks=None, asarray=False, meta=None, dims_fixed=None):
     but reduced by a factor if that exceeds the dask default chunksize.
     """
+    from iris.fileformats.netcdf._thread_safe_nc import NetCDFDataProxy
+
     if isinstance(data, ma.core.MaskedConstant):
         data = ma.masked_array(data.data, mask=data.mask)

@@ -277,7 +310,7 @@ def as_lazy_data(data, chunks=None, asarray=False, meta=None, dims_fixed=None):
     if chunks is None:
         # No existing chunks : Make a chunk the shape of the entire input array
         # (but we will subdivide it if too big).
-        chunks = list(data.shape)
+        chunks = tuple(data.shape)
 
     # Adjust chunk size for better dask performance,
     # NOTE: but only if no shape dimension is zero, so that we can handle the
@@ -291,9 +324,24 @@ def as_lazy_data(data, chunks=None, asarray=False, meta=None, dims_fixed=None):
             dims_fixed=dims_fixed,
         )
 
-    if not is_lazy_data(data):
-        data = da.from_array(data, chunks=chunks, asarray=asarray, meta=meta)
-    return data
+    # Define a cache key for caching arrays created from NetCDFDataProxy objects.
+    # Creating new Dask arrays is relatively slow, therefore caching is beneficial
+    # if many cubes in the same file share coordinate arrays.
+    if isinstance(data, NetCDFDataProxy):
+        key = (repr(data), chunks, asarray, meta.dtype, type(meta))
+    else:
+        key = None
+
+    if is_lazy_data(data):
+        result = data
+    elif key in CACHE:
+        result = CACHE[key].copy()
+    else:
+        result = da.from_array(data, chunks=chunks, asarray=asarray, meta=meta)
+        if key is not None:
+            CACHE[key] = result.copy()
+
+    return result
 
 
 def _co_realise_lazy_arrays(arrays):
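The LRUCache above gets least-recently-used behaviour from plain dict insertion order: __getitem__ pops an entry and reinserts it, moving it to the end of the dict (most recently used), while __setitem__ evicts the first key in iteration order (the oldest) once maxsize is exceeded. A minimal standalone sketch of the same pattern, with illustrative keys and helper names that are not part of the commit:

# Sketch of the dict-ordering LRU pattern (illustrative only).
cache: dict = {}
maxsize = 2

def get(key):
    value = cache.pop(key)  # remove the entry ...
    cache[key] = value      # ... and reinsert it at the end: most recently used
    return value

def put(key, value):
    cache[key] = value
    if len(cache) > maxsize:
        cache.pop(next(iter(cache)))  # first key == least recently used

put("x", 1)
put("y", 2)
get("x")     # touching "x" makes "y" the oldest entry
put("z", 3)  # exceeds maxsize, so "y" is evicted
assert list(cache) == ["x", "z"]

Note also that as_lazy_data stores and returns copies of the cached array (CACHE[key] = result.copy() and CACHE[key].copy()). Copying a Dask array is cheap, since only the graph wrapper is duplicated, and it likely keeps later modifications of a returned array from leaking into the cached entry.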
2 changes: 1 addition & 1 deletion lib/iris/tests/unit/lazy_data/test_as_lazy_data.py
@@ -161,7 +161,7 @@ def test_default_chunks_limiting(self, mocker):
         as_lazy_data(data)
         assert limitcall_patch.call_args_list == [
             mock.call(
-                list(test_shape),
+                tuple(test_shape),
                 shape=test_shape,
                 dtype=np.dtype("f4"),
                 dims_fixed=None,
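The list → tuple changes in this commit are not incidental: the chunks value now ends up inside the cache key, and dict keys must be hashable, which a tuple is and a list is not. A quick illustration (the key shown is hypothetical, merely in the style of the one built in as_lazy_data):

# Why chunks must be a tuple: only hashable values can appear in a dict key.
chunks = (100, 200)
key = ("<proxy repr>", chunks, False)  # hypothetical cache key
cache = {key: "cached value"}          # fine

try:
    hash([100, 200])
except TypeError as err:
    print(err)  # unhashable type: 'list'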
25 changes: 25 additions & 0 deletions lib/iris/tests/unit/lazy_data/test_lrucache.py
@@ -0,0 +1,25 @@
+# Copyright Iris contributors
+#
+# This file is part of Iris and is released under the BSD license.
+# See LICENSE in the root of the repository for full licensing details.
+"""Test function :func:`iris._lazy_data.LRUCache`."""
+
+from iris._lazy_data import LRUCache
+
+
+def test_lrucache():
+    cache = LRUCache(2)
+
+    cache["a"] = 1
+
+    assert "a" in cache
+    assert cache["a"] == 1
+
+    cache["b"] = 2
+    cache["c"] = 3
+
+    assert "a" not in cache
+    assert "b" in cache
+    assert "c" in cache
+
+    assert str(cache) == "<LRUCache maxsize=2 cache={'b': 2, 'c': 3} >"
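Because the cache key starts with repr(data), two NetCDFDataProxy instances describing the same variable in the same file produce the same key, so every cube loaded from that file after the first reuses the cached coordinate array instead of rebuilding it. A rough sketch with a stand-in class (FakeProxy is hypothetical; the real NetCDFDataProxy lives in iris.fileformats.netcdf._thread_safe_nc and may differ):

import numpy as np

class FakeProxy:
    # Stand-in with a stable repr, mimicking how equal proxies yield equal keys.
    def __init__(self, path, variable_name, shape, dtype):
        self.path, self.variable_name = path, variable_name
        self.shape, self.dtype = shape, dtype

    def __repr__(self):
        return (
            f"FakeProxy({self.path!r}, {self.variable_name!r}, "
            f"{self.shape}, {self.dtype})"
        )

meta = np.empty((), dtype="f8")
p1 = FakeProxy("file.nc", "latitude", (1000,), np.dtype("f8"))
p2 = FakeProxy("file.nc", "latitude", (1000,), np.dtype("f8"))
key1 = (repr(p1), (1000,), False, meta.dtype, type(meta))
key2 = (repr(p2), (1000,), False, meta.dtype, type(meta))
assert key1 == key2  # distinct proxy objects, one cache entry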
