Some iris netcdf saves are fetching all the data. #5753

Closed · pp-mo opened this issue Feb 16, 2024 · 7 comments

pp-mo commented Feb 16, 2024

From a support issue: loading a large dataset from a GRIB file and saving it to netcdf.
The example data was about 16Gb, and the save was running out of memory.

It seems that at least some netcdf saves are not performed in a correctly streamed, chunk-by-chunk manner.

Simple example to reproduce

import dask.array as da
import iris
from iris.cube import Cube
import tracemalloc

# Reasonably sized usage example, big enough to show problem
nt, ny, nx = 50, 700, 1400
# The data must be a stack of lazy arrays.
# A single random array with these chunks does *not* show the problem.
chunk_arrays = [
    da.random.uniform(0., 1, size=(ny, nx))
    for _ in range(nt)
]
data = da.stack(chunk_arrays)
cube = Cube(data)
print(f"Cube to save : {cube.summary(shorten=True)}")
data_size_mb = data.size * data.dtype.itemsize * 1.0 / 1024 ** 2
print(f"Cube data shape = {data.shape}, size = {data_size_mb:.1f} Mb")

tracemalloc.start()
iris.save(cube, 'tmp.nc')
_, peak_mem = tracemalloc.get_traced_memory()

memory_mb = peak_mem * 1.0 / 1024 ** 2
print(f"Peak memory used during save : {memory_mb:.1f}Mb")

from which, a sample output :

Cube to save : unknown / (unknown)                 (-- : 50; -- : 700; -- : 1400)
Cube data shape = (50, 700, 1400), size = 373.8 Mb
Peak memory used during save : 377.9Mb

For comparison, it is OK in xarray (!! sorry 😬 !!) :

( using same data array )

import xarray as xr
xrda = xr.DataArray(data)

tracemalloc.stop()
tracemalloc.start()
xrda.to_netcdf('tmp.nc')
_, peak_mem = tracemalloc.get_traced_memory()

memory_mb = peak_mem * 1.0 / 1024 ** 2
print(f"Peak memory used during Xarray save : {memory_mb:.1f}Mb")

from which..

Peak memory used during Xarray save : 30.2Mb

Expected result

The total memory required was expected to be only around N (or maybe N+1) times the chunk size,
where N is the number of Dask workers
-- which here was 4 (threads or "cpus").
So in this case, with chunks of approx 8Mb, the expectation was ~32-40Mb, as seen in the Xarray case.
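
For reference, those numbers can be sanity-checked from the example above. This is just a sketch, reusing the data array from the reproducer; the worker count of 4 is an assumption about the default threaded scheduler on that machine, not something queried from Dask.

import numpy as np

chunk_mb = np.prod(data.chunksize) * data.dtype.itemsize / 1024 ** 2
n_workers = 4  # assumed: threads used by the default Dask scheduler here
print(f"Chunk ~ {chunk_mb:.1f} Mb, "
      f"expected peak ~ {n_workers * chunk_mb:.0f}-{(n_workers + 1) * chunk_mb:.0f} Mb")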

pp-mo commented Feb 16, 2024

Further notes

I already investigated this in some depth, and it looks like the problem is the delayed-write mechanism introduced with #5191.
In that mechanism, the da.store operation is lazy, and it is then combined in a delayed function with the results of the fill-value checking.
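
As a standalone illustration of that pattern (made-up names; a numpy array stands in for the netcdf variable -- this is not the actual saver code, just its rough shape) :

import dask
import dask.array as da
import numpy as np

# Same kind of stacked lazy data as in the reproducer above.
data = da.stack([da.random.uniform(0.0, 1.0, size=(700, 1400)) for _ in range(50)])
target = np.empty(data.shape)  # stands in for the output netcdf variable

# A lazy store, plus a fill-value check computed from the *same* lazy data ...
delayed_store = da.store(data, target, compute=False, lock=False)
fill_collision = da.any(data == 1e20)

# ... combined into a single delayed result, roughly as the saver does.
@dask.delayed
def store_and_check(store_result, collision):
    return collision

store_and_check(delayed_store, fill_collision).compute()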

So I made some experiments to replicate this problem, using synthetic data (as in the above example) and a variety of storage mechanisms, like this :

# Assumed setup (as in the reproducer above): lazydata_all is the stacked lazy
# array, store_target is the object being written to, store_op selects one of the
# saving modes below, and tracemalloc.start() has already been called.
import dask
import dask.array as da
import numpy as np
import tracemalloc

if store_op == "all-direct":
    delayed = da.store(lazydata_all, store_target)

elif store_op == "lazy-save":
    delayed = da.store(lazydata_all, store_target, compute=False, lock=False)
    delayed.compute()

elif store_op == "lazy-via-delayed":

    @dask.delayed
    def mysummary(stores, data):
        return data

    delayed_save = da.store(lazydata_all, store_target, compute=False, lock=False)

    result = mysummary(delayed_save, 0)

    result.compute()

elif store_op == "lazy-combined":
    @dask.delayed
    def mysummary(stores, data):
        return data

    delayed_save = da.store(lazydata_all, store_target, compute=False, lock=False)

    alt_value = lazydata_all[0, 0, 0]
    result = mysummary(delayed_save, alt_value)

    result.compute()

elif store_op == "lazy-scanandstore":

    def my_storing_arrayop(data, store, check_min, check_max):
        store = data
        result = np.count_nonzero((check_min <= data) & (data < check_max))
        return np.array([result])

    lazy_results = da.map_blocks(
        my_storing_arrayop,
        lazydata_all,
        store_target,
        0.0, 0.1,  # min+max check range ==> count values in [0.0, 0.1)
        chunks=(1,),  # indicate that results are scalars
        dtype=bool,
        drop_axis=[1, 2]  # indicates that results are scalars
    ).sum()
    result = lazy_results.compute()

    datasize = lazydata_all.size
    percent = 100.0 * result / datasize
    print(
        f"Lazy scan-and-store counted values < 0.1 :  {result}"
        f"\n  ~{percent:0.2f}% of {datasize}'"
    )

_, peak_mem = tracemalloc.get_traced_memory()
memory_mb = peak_mem * 1.0 / 1024 ** 2
print("\nDone.")
print(f"Consumed memory ~ {memory_mb:6.2f} Mb.")

Results ..

With similar data to the original case, the various above "store_op" options showed these memory usages :

Saving mode ("store_op=")    Consumed memory / Mb
'all-direct'                 33.71
'lazy-save'                  33.70
'lazy-via-delayed'           33.70
'lazy-combined'              432.50
'lazy-scanandstore'          55.63

.. which means

wrapping the "lazy store operation" in a delayed is not in itself a problem (option 'lazy-via-delayed'), even with the additional "data" argument to the delayed function (in this case not a lazy object)

BUT passing an extra delayed argument which is a calculation on the same data does cause the problem (the 'lazy-combined' case)
-- it seems that dask in this case cannot "see" that both operations can be performed on one chunk at a time.
This 'lazy-combined' version is specifically designed to mimic what the netcdf saver is doing here
-- (in that context, the purpose of the 'additional' calculation is the fill-value check).

HOWEVER, the 'lazy-scanandstore' case, I think, shows a possible way out.
The map_blocks operation scans each section of data and stores it, so the chunks are treated separately, as they should be. So I think this is a model for how to fix the problem.
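
To flesh that out, here is a rough sketch of a one-pass "store and check" per chunk, using the block_info argument of map_blocks to locate each block in the target. The names are illustrative, the target is a plain numpy array for demonstration (a real netcdf variable would also need write locking/synchronisation), and it assumes the data is chunked only along the first axis, as in the example above -- so this is just the idea, not necessarily what an actual fix would look like.

import dask.array as da
import numpy as np

def _store_and_check_block(block, target=None, fill_value=None, block_info=None):
    # block_info[0]["array-location"] gives this block's (start, stop) per axis.
    loc = tuple(slice(lo, hi) for lo, hi in block_info[0]["array-location"])
    target[loc] = block                     # write just this chunk
    collides = np.any(block == fill_value)  # fill-value check on the same chunk
    return np.array([collides])

def store_and_check(lazy_data, target, fill_value):
    per_chunk = da.map_blocks(
        _store_and_check_block,
        lazy_data,
        target=target,          # keyword args are passed through un-chunked
        fill_value=fill_value,
        chunks=(1,),            # one flag value per input chunk
        drop_axis=list(range(1, lazy_data.ndim)),
        dtype=bool,
    )
    # One pass over the data: every chunk is stored, and the combined
    # fill-value collision flag comes back from the same computation.
    return bool(per_chunk.any().compute())

Unlike the quick 'lazy-scanandstore' experiment above, this uses block_info so that each block is written to its correct location in the target, while still returning the check result from the same single pass.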

@trexfeathers (Contributor)

@bouweandela @fnattino have you seen any examples of this?

@bouweandela (Member)

Have you seen dask/dask#8380 @pp-mo? I think it may be relevant here, but I'm not entirely sure yet.

bouweandela commented Mar 6, 2024

I suspect this issue is caused by some variation of the issue described here. Adding

    if arraylib is da:
        from dask.graph_manipulation import clone
        data = clone(data, assume_layers=True)

before this code

is_masked = arraylib.any(arraylib.ma.getmaskarray(data))

seems to avoid the high memory use by decoupling the store graph from the fill value check graph (at the cost of generating the source data twice).
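
As a standalone illustration of that idea (made-up names again, with a numpy array standing in for the netcdf variable): cloning the array used for the check gives the check its own independent graph, so combining it with the delayed store no longer ties both consumers to the same source chunks.

import dask
import dask.array as da
import numpy as np
from dask.graph_manipulation import clone

data = da.stack([da.random.uniform(0.0, 1.0, size=(700, 1400)) for _ in range(50)])
target = np.empty(data.shape)

delayed_store = da.store(data, target, compute=False, lock=False)

# The mask check is built on a clone of the lazy data, decoupled from the store
# graph, so its chunks are recomputed independently (i.e. the source is generated
# twice) rather than being held in memory to serve both consumers.
check_data = clone(data, assume_layers=True)
is_masked = da.any(da.ma.getmaskarray(check_data))

@dask.delayed
def combine(store_result, masked):
    return masked

combine(delayed_store, is_masked).compute()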

pp-mo commented Mar 8, 2024

> I suspect this issue is caused by some variation of the issue described here.
> ... at the cost of generating the source data twice

Well, interesting. But TBH I don't really understand what this is actually doing.

FWIW I think we are very keen not to fetch data twice if it can at all be avoided -- and if we accepted that cost, we could simply do the fill-value checking and the storage in separate steps.

Do you think this approach could possibly be modified to deliver the desired one-pass data store-and-check, @bouweandela ?

pp-mo commented Mar 11, 2024

Relevant issue raised on the Dask repo:
dask/dask#10991

@trexfeathers (Contributor)

@pp-mo happy to close this given #5833 ?

pp-mo closed this as completed Apr 3, 2024