
Write with Zarr #86

Merged: 20 commits, Apr 2, 2021

Conversation

rabernat (Contributor)

In debugging some real-world use cases (e.g. pangeo-forge/staged-recipes#23), I realized that our current way of writing suffers from some performance bottlenecks. Specifically, when calling xr.to_zarr from each store_chunk task, the entire dataset has to be read AND dimension coordinates have to be loaded. This translates into 1000s of gcsfs calls. This is particularly bad for the time dimension, which is a dimension coordinate but is chunked in time.

The solution, proposed by @TomAugspurger in https://discourse.pangeo.io/t/netcdf-to-zarr-best-practices/1119, is to bypass xarray when writing individual chunks and write them directly with zarr. (I still use xarray to set up the dataset.) This gives us the best of both worlds.
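
In rough terms, the chunk-write path then looks like this (a minimal sketch, not the actual recipe code; target_mapper, ds_chunk, and zarr_region are illustrative names):

import numpy as np
import zarr

# the target store is assumed to have been initialized already (metadata and
# full-size arrays), so each store_chunk task only pushes bytes into existing
# arrays instead of going through xr.to_zarr
zgroup = zarr.open_group(target_mapper)
for vname, var in ds_chunk.variables.items():
    zarr_array = zgroup[vname]
    # zarr_region: the tuple of slices this chunk occupies along each of the
    # variable's dimensions in the target store
    zarr_array[zarr_region] = np.asarray(var.data)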

rabernat requested a review from TomAugspurger on March 22, 2021 at 19:14
@rabernat (Contributor, Author)

Note that this requires pydata/xarray#5065

        var.data
    )  # TODO: can we buffer large data rather than loading it all?
    zarr_array = zgroup[vname]
    with lock_for_conflicts((vname, conflicts)):
Contributor:

Just confirming, it's safe to lock here, rather than up on L324 where you read the zarr_array = zgroup[vname]? Some other worker thread writing to zgroup[vname] won't change anything we care about?

rabernat (Contributor, Author):

What I want to avoid is having "surprise" tasks get sent to the scheduler by accidentally operating on chunked arrays. In the recipe class, we use dask arrays for laziness, but we don't actually want any parallel computation happening at the dask.array level, because parallelism is managed at a higher level in Pangeo Forge.

The scheduler="single-threaded" setting above is there to make sure we don't accidentally read from the array using multiple threads and oversubscribe the workers. It's not related to locking, and may not strictly be necessary. At some point we should do a careful audit of this throughout the code base.
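
For reference, the locking pattern is roughly the following (a sketch only, built directly on dask.distributed.Lock; lock_for_conflicts in this PR is assumed to do something equivalent, and the names in the usage example are illustrative):

from contextlib import contextmanager

from dask.distributed import Lock

@contextmanager
def lock_for_conflicts(lock_names):
    # acquire one named, cluster-wide lock per conflicting chunk;
    # requires a running distributed Client or worker context
    locks = [Lock(name) for name in lock_names]
    for lock in locks:
        lock.acquire()
    try:
        yield
    finally:
        for lock in locks:
            lock.release()

# usage inside a store_chunk task (illustrative):
# with lock_for_conflicts([f"{vname}-{c}" for c in conflicts]):
#     zarr_array[zarr_region] = data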

@rabernat (Contributor, Author)

The tests are failing with the error

>                   ds.to_zarr(target_mapper, mode="a", compute=False, safe_chunks=False)
E                   TypeError: to_zarr() got an unexpected keyword argument 'safe_chunks'

This argument is added in pydata/xarray#5065, which is why I am installing xarray from my dev branch. It shows up as installed:

xarray                    0.5.2.dev2502+gda5d4cfd          pypi_0    pypi

(I don't know where the weird version number is coming from, but I get the same thing locally, where the tests pass.)

@rabernat (Contributor, Author) commented Mar 31, 2021

This test failed

_ test_chunks[dask-netCDFtoZarr_sequential_multi_variable_recipe-target_chunks5-False-chunk_expectation5-2-D] _

recipe_fixture = (<class 'pangeo_forge.recipe.NetCDFtoZarrMultiVarSequentialRecipe'>, {'input_cache': CacheFSSpecTarget(fs=<fsspec.impl...implementations.local.LocalFileSystem object at 0x7f82dc0c5790>, root_path='/tmp/pytest-of-runner/pytest-0/target278'))
execute_recipe = <function execute_recipe.<locals>.execute at 0x7f82c1d93cb0>
inputs_per_chunk = 2, target_chunks = {'lon': 12, 'time': 3}
chunk_expectation = <contextlib.nullcontext object at 0x7f82ee2ac710>
specify_nitems_per_input = False

...

>       xr.testing.assert_identical(ds_actual, ds_expected)
E       AssertionError: Left and right Dataset objects are not identical
E       
E       
E       Differing data variables:
E       L   bar      (time, lat, lon) int64 9 4 3 2 2 8 0 0 4 8 ... 9 8 4 4 6 0 3 3 9 5
E           long_name: Beautiful Bar
E       R   bar      (time, lat, lon) int64 9 4 3 2 2 8 0 0 4 8 ... 9 8 4 4 6 0 3 3 9 5
E           long_name: Beautiful Bar

This case has overlapping chunk writes and uses dask.distributed locking to avoid write conflicts. I've never seen it fail locally, but this suggests there could be an intermittent problem here. (The same test passed on py 3.8). I'm going to retrigger the tests and see if it comes up again.

https://github.com/pangeo-forge/pangeo-forge/pull/86/checks?check_run_id=2239771509#step:8:459

@rabernat (Contributor, Author)

Wow, tests are green!

f"Storing variable {vname} chunk {chunk_key} "
f"to Zarr region {zarr_region}"
)
zarr_array[zarr_region] = data
rabernat (Contributor, Author):

It would be great if someone who understands Xarray variable encoding could have a look at this block. Perhaps @jhamman or even @shoyer would be willing to spend 5 minutes on this?

Reply:

this seems OK to me?

@shoyer commented Mar 31, 2021

This is particularly bad for the time dimension, which is a dimension coordinate but is chunked in time.

Is there a good reason for why you need multiple chunks for the time coordinate? I would guess this would slow down all loads of the dataset...

Comment on lines +330 to +332
# get encoding for variable from zarr attributes
# could this backfire some way?
var_coded.encoding.update(zarr_array.attrs)
Reply:

This looks fine to me, as long as it's consistent with how xarray loads encoding for Zarr.

rabernat (Contributor, Author):

Yes, I think it is.

f"Storing variable {vname} chunk {chunk_key} "
f"to Zarr region {zarr_region}"
)
zarr_array[zarr_region] = data
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems OK to me?

@rabernat (Contributor, Author)

Thanks a lot for your comments Stephan!

Is there a good reason for why you need multiple chunks for the time coordinate? I would guess this would slow down all loads of the dataset...

The way Pangeo Forge works right now is that the dataset is built up incrementally (in a distributed fashion) from each "chunk", spread over the "sequence dimension" (usually time). In contrast to the xarray open_mfdataset -> to_zarr approach, we don't know the time coordinate a priori before we start writing chunks. In order to avoid conflicts between chunks, each variable with time in it, including dimension coordinates, needs to be chunked.

There are two possible workarounds:

  • Cache the time coordinate data when we cache each input file and write it all as one chunk in prepare_target
  • Consolidate the time variable into a single chunk during finalize_target

Both of these would be performance optimizations, since things already "work" now, so I would leave them to a future PR.
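
For concreteness, the second workaround could look roughly like this (a sketch only, not part of this PR; target_mapper is an illustrative name for the target store mapper):

import zarr

# finalize_target (sketch): rewrite the chunked time coordinate as a single
# chunk once all chunk writes have finished
zgroup = zarr.open_group(target_mapper)
time_array = zgroup["time"]
time_values = time_array[:]        # read the coordinate once
attrs = dict(time_array.attrs)     # keep _ARRAY_DIMENSIONS, units, calendar, ...
del zgroup["time"]
new_time = zgroup.array("time", time_values, chunks=len(time_values))
new_time.attrs.update(attrs)
zarr.consolidate_metadata(target_mapper)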

@shoyer commented Mar 31, 2021

The way Pangeo Forge works right now is that the dataset is built up incrementally (in a distributed fashion) from each "chunk", spread over the "sequence dimension" (usually time). In contrast to the xarray open_mfdataset -> to_zarr approach, we don't know the time coordinate a priori before we start writing chunks. In order to avoid conflicts between chunks, each variable with time in it, including dimension coordinates, needs to be chunked.

If you look at the approach I used in #82, I don't write out coordinate variables in a chunked fashion, but rather write them ahead of time as part of initializing the Zarr store.

This works well as long as you can figure out the coordinates ahead of time.

I guess this is basically your first workaround, "write it all as one chunk in prepare_target".
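
As a rough illustration of that approach (a sketch with made-up names and sizes, assuming the full time values can be assembled up front, e.g. from cached per-input metadata):

import dask.array as dsa
import pandas as pd
import xarray as xr

n_time, n_lat, n_lon = 10, 18, 36                 # hypothetical sizes
full_time = pd.date_range("2010-01-01", periods=n_time, freq="D")

# lazy placeholder data: only the schema and the in-memory coordinates are
# written at this stage
schema = xr.Dataset(
    {
        "foo": (
            ("time", "lat", "lon"),
            dsa.zeros((n_time, n_lat, n_lon), chunks=(2, n_lat, n_lon)),
        )
    },
    coords={"time": full_time},
)
# compute=False writes metadata plus the time coordinate, but no chunk data
schema.to_zarr(target_mapper, mode="w", compute=False)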

@rabernat (Contributor, Author) commented Apr 1, 2021

  • The "time" coordinate is prescribed by construction -- but it looks like you often do the same in Pangeo forge, e.g.

That only works for certain types of recipes, e.g. where there is a fixed number of timesteps per file. For others, you have to open every file and peek inside. We already do that in fact:

https://github.com/pangeo-forge/pangeo-forge/blob/3314162568f7d8fe3d6f06627b936cee55d99c9c/pangeo_forge/recipe.py#L294-L295

https://github.com/pangeo-forge/pangeo-forge/blob/3314162568f7d8fe3d6f06627b936cee55d99c9c/pangeo_forge/recipe.py#L369-L374

So I agree this should be possible in theory. I'll open a new issue for it.

@rabernat (Contributor, Author) commented Apr 1, 2021

Again we got corrupted data on a dask write. This strongly suggests that my implementation of locking is broken. I need to dig into this deeper.

=================================== FAILURES ===================================
_ test_chunks[dask-netCDFtoZarr_sequential_multi_variable_recipe-target_chunks5-False-chunk_expectation5-2-2D] _

recipe_fixture = (<class 'pangeo_forge.recipe.NetCDFtoZarrMultiVarSequentialRecipe'>, {'input_cache': CacheFSSpecTarget(fs=<fsspec.impl...implementations.local.LocalFileSystem object at 0x7fef02c4cc70>, root_path='/tmp/pytest-of-runner/pytest-0/target383'))
execute_recipe = <function execute_recipe.<locals>.execute at 0x7feecde29af0>
inputs_per_chunk = 2, target_chunks = {'lon': 12, 'time': 3}
chunk_expectation = <contextlib.nullcontext object at 0x7fef02ebf250>
specify_nitems_per_input = False

    @pytest.mark.parametrize("inputs_per_chunk", [1, 2])
    @pytest.mark.parametrize(
        "target_chunks,specify_nitems_per_input,chunk_expectation",
        [
            ({}, True, does_not_raise()),
            ({"lon": 12}, True, does_not_raise()),
            ({"lon": 12, "time": 1}, True, does_not_raise()),
            ({"lon": 12, "time": 3}, True, does_not_raise()),
            ({"lon": 12, "time": 1}, False, does_not_raise()),
            ({"lon": 12, "time": 3}, False, does_not_raise()),
            # can't determine target chunks for the next two because 'time' missing from target_chunks
            ({}, False, pytest.raises(ValueError)),
            ({"lon": 12}, False, pytest.raises(ValueError)),
        ],
    )
    @pytest.mark.parametrize("recipe_fixture", all_recipes)
    def test_chunks(
        recipe_fixture,
        execute_recipe,
        inputs_per_chunk,
        target_chunks,
        chunk_expectation,
        specify_nitems_per_input,
    ):
        """Check that chunking of datasets works as expected."""
    
        RecipeClass, kwargs, ds_expected, target = recipe_fixture
    
        kwargs["target_chunks"] = target_chunks
        kwargs["inputs_per_chunk"] = inputs_per_chunk
        if specify_nitems_per_input:
            kwargs["nitems_per_input"] = kwargs["nitems_per_input"]  # it's already there in kwargs
            kwargs["metadata_cache"] = None
        else:
            # file will be scanned and metadata cached
            kwargs["nitems_per_input"] = None
            kwargs["metadata_cache"] = kwargs["input_cache"]
    
        with chunk_expectation as excinfo:
            rec = RecipeClass(**kwargs)
        if excinfo:
            # don't continue if we got an exception
            return
    
        execute_recipe(rec)
    
        # chunk validation
        ds_actual = xr.open_zarr(target.get_mapper(), consolidated=True)
        sequence_chunks = ds_actual.chunks["time"]
        seq_chunk_len = target_chunks.get("time", None) or (
            kwargs["nitems_per_input"] * inputs_per_chunk
        )
        # we expect all chunks but the last to have the expected size
        assert all([item == seq_chunk_len for item in sequence_chunks[:-1]])
        for other_dim, chunk_len in target_chunks.items():
            if other_dim == "time":
                continue
            assert all([item == chunk_len for item in ds_actual.chunks[other_dim][:-1]])
    
        ds_actual.load()
>       xr.testing.assert_identical(ds_actual, ds_expected)
E       AssertionError: Left and right Dataset objects are not identical
E       
E       
E       Differing data variables:
E       L   foo      (time, lat, lon) float64 0.417 0.7203 0.0001144 ... 0.1179 0.3748
E           long_name: Fantastic Foo
E       R   foo      (time, lat, lon) float64 0.417 0.7203 0.0001144 ... 0.1179 0.3748
E           long_name: Fantastic Foo

https://github.com/pangeo-forge/pangeo-forge/pull/86/checks?check_run_id=2246593216#step:8:457

def pytest_addoption(parser):
    parser.addoption(
        "--redirect-dask-worker-logs-to-stdout", action="store", default="NOTSET",
    )
rabernat (Contributor, Author):

This was a fun solution to #84 (comment).

del mapper[key]
assert list(mapper) == []

dask.compute([do_stuff(n) for n in range(n_tasks)])
rabernat (Contributor, Author):

This PR revealed that the locking I thought I had implemented was actually broken. Here is a test that really confirms it works.
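
The shape of that test is roughly the following (a self-contained sketch of the idea, not the exact test added here): every task writes into the same store while holding the lock and asserts that it saw no concurrent writes, so broken locking trips the asserts.

import tempfile

import dask
import fsspec
from dask.distributed import Client, Lock

@dask.delayed
def do_stuff(n, root):
    mapper = fsspec.get_mapper(root)
    with Lock("shared-target"):
        key = f"key-{n}"
        mapper[key] = b"data"
        # while we hold the lock, no other task should have written anything
        assert list(mapper) == [key]
        del mapper[key]
        assert list(mapper) == []

if __name__ == "__main__":
    client = Client()          # distributed locks need a client / workers
    root = tempfile.mkdtemp()
    n_tasks = 20
    dask.compute([do_stuff(n, root) for n in range(n_tasks)])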

@shoyer commented Apr 1, 2021

  • The "time" coordinate is prescribed by construction -- but it looks like you often do the same in Pangeo forge, e.g.

That only works for certain types of recipes, e.g. where there is a fixed number of timesteps per file. For others, you have to open every file and peek inside. We already do that in fact:

This makes sense.

For what it's worth, if I were writing this from scratch, I would probably solve this by aggregating "side outputs" after looking at each file (maybe in a pre-processing pass), rather than "re-writing" chunks in the Zarr file.

I guess the downside of this is that it would require executors to support aggregating results, rather than simply mapping over all inputs. This would be easy in Beam and Dask, but maybe not in more ad-hoc task schedulers.

@rabernat (Contributor, Author) commented Apr 1, 2021

I really appreciate the design discussion. Thanks for taking the time! 😊

I would probably solve this by aggregating "side outputs" after looking at each file (maybe in a pre-processing pass), rather than "re-writing" chunks in the Zarr file.

In the end, aren't these kind of the same?

If we want to aggregate the time coordinate from each chunk, we need to either

  • serialize the data and send it between tasks in a gather operation
  • serialize the data and write it to persistent storage

In either case, we have to deal with encoding. (Imagine that units: days since X varies between files).

If we consolidate chunks in a finalize step, we are basically just using the zarr array as a temporary cache for data from each chunk. And we don't have to deal with a new, distinct serialization mechanism.
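
A tiny illustration of the encoding point (hypothetical values): identical raw time values can mean different dates in different files, so they only make sense after decoding.

import numpy as np
import xarray as xr

# two "files" with identical raw values but different reference dates
f1 = xr.Dataset({"time": ("time", np.array([0, 1]), {"units": "days since 2000-01-01"})})
f2 = xr.Dataset({"time": ("time", np.array([0, 1]), {"units": "days since 2000-01-03"})})

combined = xr.concat([xr.decode_cf(f1), xr.decode_cf(f2)], dim="time")
print(combined.time.values)  # 2000-01-01, 01-02, 01-03, 01-04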

@rabernat (Contributor, Author) commented Apr 1, 2021

I'm going to merge this later tonight if there are no further comments.

rabernat merged commit 721dea4 into pangeo-forge:master on Apr 2, 2021
@shoyer commented Apr 2, 2021

If we consolidate chunks in a finalize step, we are basically just using the zarr array as a temporary cache for data from each chunk. And we don't have to deal with a new, distinct serialization mechanism.

This is true, except you also have to write everything to disk.

In practice, this is typically fine, but it's definitely slower than keeping things entirely in memory. With large enough datasets, it can make a difference (this is one reason why Spark was preferred to Hadoop).

@rabernat (Contributor, Author) commented Apr 2, 2021

I have really been liking the simplicity of zero explicit communication between tasks, because you can just manually run each task in the pipeline as a function. Makes things so much easier to develop and debug.

One way to mitigate the disk speed problem would be to use a Redis db for the metadata_cache.

@shoyer commented Apr 2, 2021

I have really been liking the simplicity of zero explicit communication between tasks, because you can just manually run each task in the pipeline as a function. Makes things so much easier to develop and debug.

Based on my prototype in #82, manual execution for collecting metadata might look something like:

metadata = []
for example in recipe.iter_metadata():
    metadata.append(recipe.extract_metadata(example))
recipe.prepare_target(metadata)

From an API perspective, the main difference is that an executor would need the ability to pass around Python objects. I guess this would be straightforward for most executors, but some might need an additional explicit cache.

@shoyer commented May 3, 2021

Hilariously, I just stumbled on this PR via git-blame, only to discover that I had already been here for a long discussion, without absorbing at all why you did this! 🤣

I just stumbled on these issues myself, as the bottleneck in my pipeline! pydata/xarray#5252 should fix the situation in upstream Xarray as well, by using consolidated metadata for incremental writes with region, and by being careful not to ever read anything other than metadata values.
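
For reference, the pattern that PR improves is the incremental region write, roughly (a sketch with a hypothetical local store path and sizes):

import dask.array as dsa
import numpy as np
import xarray as xr

# initialize the store with lazy placeholder data: metadata only, no chunks
ds = xr.Dataset({"foo": (("time",), dsa.zeros(10, chunks=5))})
ds.to_zarr("target.zarr", mode="w", compute=False)

# each incremental write then only needs to touch its own region
piece = xr.Dataset({"foo": (("time",), np.arange(5.0))})
piece.to_zarr("target.zarr", region={"time": slice(0, 5)})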
