Skip to content

Commit

Permalink
Merge pull request #660 from norlandrhagen/return_combined_ref
Browse files Browse the repository at this point in the history
  • Loading branch information
cisaacstern authored Dec 8, 2023
2 parents 7ba80bd + 6b45c03 commit 6e40279
Show file tree
Hide file tree
Showing 13 changed files with 168 additions and 75 deletions.
21 changes: 14 additions & 7 deletions .github/workflows/test-integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ jobs:
# integration testing for 3.10 and 3.11 (for runner versions that follow that PR).
python-version: ["3.9"] # , "3.10", "3.11"]
runner-version: [
"pangeo-forge-runner==0.8.0",
"pangeo-forge-runner==0.9.0",
"pangeo-forge-runner==0.9.1",
"pangeo-forge-runner==0.9.2",
]
steps:
- uses: actions/checkout@v4
Expand All @@ -44,6 +44,18 @@ jobs:
cache: pip
cache-dependency-path: pyproject.toml


- name: Install pangeo-forge recipes and runner
shell: bash -l {0}
run: |
python -m pip install ${{ matrix.runner-version }}
python -m pip install -e ".[test,minio]"
- name: Install optional grib deps
shell: bash -l {0}
run: |
python -m pip install ecmwflibs eccodes cfgrib
- name: 'Setup minio'
run: |
wget --quiet https://dl.min.io/server/minio/release/linux-amd64/minio
Expand All @@ -54,11 +66,6 @@ jobs:
- name: 🎯 Check cache hit
run: echo '${{ steps.setup-python.outputs.cache-hit }}'
- name: 🌈 Install pangeo-forge-recipes & pangeo-forge-runner
shell: bash -l {0}
run: |
python -m pip install ${{ matrix.runner-version }}
python -m pip install -e ".[test,minio]"

# order reversed to fix https://github.com/pangeo-forge/pangeo-forge-recipes/pull/595#issuecomment-1811630921
# this should however be fixed in the runner itself
Expand Down
6 changes: 4 additions & 2 deletions examples/feedstock/hrrr_kerchunk_concat_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:

ds = xr.open_dataset(store, engine="zarr", chunks={})
ds = ds.set_coords(("latitude", "longitude"))
assert ds.attrs["centre"] == "kwbc"
assert len(ds["step"]) == 4
ds = ds.expand_dims(dim="time")
assert ds.attrs["GRIB_centre"] == "kwbc"
assert len(ds["step"]) == 2
assert len(ds["time"]) == 1
assert "t" in ds.data_vars
for coord in ["time", "surface", "latitude", "longitude"]:
Expand All @@ -52,6 +53,7 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
concat_dims=pattern.concat_dims,
identical_dims=identical_dims,
precombine_inputs=True,
remote_protocol=remote_protocol,
)
| "Test dataset" >> beam.Map(test_ds)
)
2 changes: 2 additions & 0 deletions examples/feedstock/hrrr_kerchunk_concat_valid_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
store_name="hrrr-concat-valid-time",
concat_dims=concat_dims,
identical_dims=identical_dims,
# fails due to: _pickle.PicklingError: Can't pickle <function drop_unknown
# at 0x290e46a70>: attribute lookup drop_unknown on __main__ failed
mzz_kwargs=dict(preprocess=drop_unknown),
precombine_inputs=True,
)
Expand Down
2 changes: 2 additions & 0 deletions examples/feedstock/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ recipes:
object: "noaa_oisst:recipe"
- id: "terraclimate"
object: "terraclimate:recipe"
- id: "hrrr-kerchunk-concat-step"
object: "hrrr_kerchunk_concat_step:recipe"
23 changes: 21 additions & 2 deletions pangeo_forge_recipes/combiners.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import operator
from dataclasses import dataclass, field
from functools import reduce
from typing import List, Sequence, Tuple
from typing import Dict, List, Optional, Sequence, Tuple

import apache_beam as beam
import fsspec
from kerchunk.combine import MultiZarrToZarr

from .aggregation import XarrayCombineAccumulator, XarraySchema
Expand Down Expand Up @@ -51,6 +52,9 @@ class CombineMultiZarrToZarr(beam.CombineFn):
:param concat_dims: Dimensions along which to concatenate inputs.
:param identical_dims: Dimensions shared among all inputs.
:param remote_options: Storage options for opening remote files
:param remote_protocol: If files are accessed over the network, provide the remote protocol
over which they are accessed. e.g.: "s3", "gcp", "https", etc.
:mzz_kwargs: Additional kwargs to pass to ``kerchunk.combine.MultiZarrToZarr``.
:precombine_inputs: If ``True``, precombine each input with itself, using
``kerchunk.combine.MultiZarrToZarr``, before adding it to the accumulator.
Expand All @@ -61,10 +65,15 @@ class CombineMultiZarrToZarr(beam.CombineFn):
along a dimension that does not exist in the individual inputs. In this latter
case, precombining adds the additional dimension to the input so that its
dimensionality will match that of the accumulator.
:param storage_options: Storage options dict to pass to the MultiZarrToZarr
"""

concat_dims: List[str]
identical_dims: List[str]
target_options: Optional[Dict] = field(default_factory=lambda: {"anon": True})
remote_options: Optional[Dict] = field(default_factory=lambda: {"anon": True})
remote_protocol: Optional[str] = None
mzz_kwargs: dict = field(default_factory=dict)
precombine_inputs: bool = False

Expand All @@ -73,6 +82,9 @@ def to_mzz(self, references):
references,
concat_dims=self.concat_dims,
identical_dims=self.identical_dims,
target_options=self.target_options,
remote_options=self.remote_options,
remote_protocol=self.remote_protocol,
**self.mzz_kwargs,
)

Expand All @@ -92,4 +104,11 @@ def merge_accumulators(self, accumulators: Sequence[MultiZarrToZarr]) -> MultiZa
return self.to_mzz(references)

def extract_output(self, accumulator: MultiZarrToZarr) -> MultiZarrToZarr:
return accumulator
return fsspec.filesystem(
"reference",
fo=accumulator.translate(),
storage_options={
"remote_protocol": self.remote_protocol,
"skip_instance_cache": True,
},
).get_mapper()
3 changes: 3 additions & 0 deletions pangeo_forge_recipes/injections.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ def get_injection_specs():
"StoreToZarr": {
"target_root": "TARGET_STORAGE",
},
"WriteReference": {
"target_root": "TARGET_STORAGE",
},
"WriteCombinedReference": {
"target_root": "TARGET_STORAGE",
},
Expand Down
94 changes: 80 additions & 14 deletions pangeo_forge_recipes/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class OpenWithKerchunk(beam.PTransform):
# passed directly to `open_with_kerchunk`
file_type: FileType = FileType.unknown
inline_threshold: Optional[int] = 300
storage_options: Optional[Dict] = None
storage_options: Optional[Dict] = field(default_factory=dict)
remote_protocol: Optional[str] = None
kerchunk_open_kwargs: Optional[dict] = field(default_factory=dict)

Expand Down Expand Up @@ -418,6 +418,10 @@ class CombineReferences(beam.PTransform):
:param concat_dims: Dimensions along which to concatenate inputs.
:param identical_dims: Dimensions shared among all inputs.
:param target_options: Storage options for opening target files
:param remote_options: Storage options for opening remote files
:param remote_protocol: If files are accessed over the network, provide the remote protocol
over which they are accessed. e.g.: "s3", "gcp", "https", etc.
:mzz_kwargs: Additional kwargs to pass to ``kerchunk.combine.MultiZarrToZarr``.
:precombine_inputs: If ``True``, precombine each input with itself, using
``kerchunk.combine.MultiZarrToZarr``, before adding it to the accumulator.
Expand All @@ -432,27 +436,78 @@ class CombineReferences(beam.PTransform):

concat_dims: List[str]
identical_dims: List[str]
target_options: Optional[Dict] = field(default_factory=lambda: {"anon": True})
remote_options: Optional[Dict] = field(default_factory=lambda: {"anon": True})
remote_protocol: Optional[str] = None
mzz_kwargs: dict = field(default_factory=dict)
precombine_inputs: bool = False

def expand(self, references: beam.PCollection) -> beam.PCollection:
    """Globally combine per-input kerchunk references into one reference set.

    Builds a ``CombineMultiZarrToZarr`` combiner from this transform's
    configuration and applies it across the whole input PCollection.
    """
    combiner = CombineMultiZarrToZarr(
        concat_dims=self.concat_dims,
        identical_dims=self.identical_dims,
        target_options=self.target_options,
        remote_options=self.remote_options,
        remote_protocol=self.remote_protocol,
        mzz_kwargs=self.mzz_kwargs,
        precombine_inputs=self.precombine_inputs,
    )
    return references | beam.CombineGlobally(combiner)


@dataclass
class WriteReference(beam.PTransform, ZarrWriterMixin):
    """Write a singleton PCollection of combined kerchunk references to file.

    The input element is the combined reference mapping produced by
    ``CombineReferences`` (``write_combined_reference`` accepts a
    ``MutableMapping``), not a raw ``kerchunk.combine.MultiZarrToZarr`` object.

    :param store_name: Zarr store will be created with this name under ``target_root``.
    :param concat_dims: Dimensions along which to concatenate inputs.
    :param target_root: Root path the Zarr store will be created inside; ``store_name``
      will be appended to this prefix to create a full path.
    :param output_file_name: Name to give the output references file
      (``.json`` or ``.parquet`` suffix selects the output format).
    :param target_options: Storage options for opening target files
    :param remote_options: Storage options for opening remote files
    :param remote_protocol: If files are accessed over the network, provide the remote protocol
      over which they are accessed. e.g.: "s3", "gcp", "https", etc.
    :param mzz_kwargs: Additional kwargs passed through to
      ``kerchunk.combine.MultiZarrToZarr`` when writing parquet output.
    """

    store_name: str
    concat_dims: List[str]
    target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field(
        default_factory=RequiredAtRuntimeDefault
    )
    output_file_name: str = "reference.json"
    target_options: Optional[Dict] = field(default_factory=lambda: {"anon": True})
    remote_options: Optional[Dict] = field(default_factory=lambda: {"anon": True})
    remote_protocol: Optional[str] = None
    mzz_kwargs: dict = field(default_factory=dict)

    def expand(self, references: beam.PCollection) -> beam.PCollection:
        return references | beam.Map(
            write_combined_reference,
            full_target=self.get_full_target(),
            concat_dims=self.concat_dims,
            output_file_name=self.output_file_name,
            target_options=self.target_options,
            remote_options=self.remote_options,
            remote_protocol=self.remote_protocol,
            mzz_kwargs=self.mzz_kwargs,
        )


@dataclass
class WriteCombinedReference(beam.PTransform, ZarrWriterMixin):
"""Store a singleton PCollection consisting of a ``kerchunk.combine.MultiZarrToZarr`` object.
:param store_name: Zarr store will be created with this name under ``target_root``.
:param concat_dims: Dimensions along which to concatenate inputs.
:param identical_dims: Dimensions shared among all inputs.
:param target_options: Storage options for opening target files
:param remote_options: Storage options for opening remote files
:param remote_protocol: If files are accessed over the network, provide the remote protocol
over which they are accessed. e.g.: "s3", "gcp", "https", etc.
:param mzz_kwargs: Additional kwargs to pass to ``kerchunk.combine.MultiZarrToZarr``.
:param precombine_inputs: If ``True``, precombine each input with itself, using
``kerchunk.combine.MultiZarrToZarr``, before adding it to the accumulator.
Expand All @@ -472,26 +527,37 @@ class WriteCombinedReference(beam.PTransform, ZarrWriterMixin):
store_name: str
concat_dims: List[str]
identical_dims: List[str]
target_options: Optional[Dict] = field(default_factory=lambda: {"anon": True})
remote_options: Optional[Dict] = field(default_factory=lambda: {"anon": True})
remote_protocol: Optional[str] = None
mzz_kwargs: dict = field(default_factory=dict)
precombine_inputs: bool = False
target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field(
default_factory=RequiredAtRuntimeDefault
)
output_file_name: str = "reference.json"

def expand(self, references: beam.PCollection) -> beam.PCollection:
reference = references | CombineReferences(
concat_dims=self.concat_dims,
identical_dims=self.identical_dims,
mzz_kwargs=self.mzz_kwargs,
precombine_inputs=self.precombine_inputs,
)

return reference | beam.Map(
write_combined_reference,
full_target=self.get_full_target(),
concat_dims=self.concat_dims,
output_file_name=self.output_file_name,
def expand(self, references: beam.PCollection) -> beam.PCollection[zarr.storage.FSStore]:
return (
references
| CombineReferences(
concat_dims=self.concat_dims,
identical_dims=self.identical_dims,
target_options=self.target_options,
remote_options=self.remote_options,
remote_protocol=self.remote_protocol,
mzz_kwargs=self.mzz_kwargs,
precombine_inputs=self.precombine_inputs,
)
| WriteReference(
store_name=self.store_name,
concat_dims=self.concat_dims,
target_root=self.target_root,
output_file_name=self.output_file_name,
target_options=self.target_options,
remote_options=self.remote_options,
remote_protocol=self.remote_protocol,
)
)


Expand Down
52 changes: 24 additions & 28 deletions pangeo_forge_recipes/writers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import os
from typing import List, Protocol, Tuple, Union
from typing import Dict, List, MutableMapping, Optional, Protocol, Tuple, Union

import fsspec
import numpy as np
import xarray as xr
import zarr
from fsspec.implementations.reference import LazyReferenceMapper
from fsspec.implementations.reference import LazyReferenceMapper, ReferenceFileSystem
from kerchunk.combine import MultiZarrToZarr

from .patterns import CombineOp, Index
Expand Down Expand Up @@ -93,62 +94,57 @@ def store_dataset_fragment(
return target_store


def _select_single_protocol(full_target: FSSpecTarget) -> str:
# Grabs first protocol if there are multiple options: Based off of logic in fsspec:
# https://github.com/fsspec/filesystem_spec/blob/b8aeb13361e89f22f323bbc93c8308ff2ffede19/fsspec/spec.py#L1410-L1414
return (
full_target.fs.protocol[0]
if isinstance(full_target.fs.protocol, (tuple, list))
else full_target.fs.protocol
)


def write_combined_reference(
reference: MultiZarrToZarr,
reference: MutableMapping,
full_target: FSSpecTarget,
concat_dims: List[str],
output_file_name: str,
target_options: Optional[Dict] = {"anon": True},
remote_options: Optional[Dict] = {"anon": True},
remote_protocol: Optional[str] = None,
refs_per_component: int = 1000,
) -> FSSpecTarget:
mzz_kwargs: Optional[Dict] = None,
) -> zarr.storage.FSStore:
"""Write a kerchunk combined references object to file."""

import ujson # type: ignore

file_ext = os.path.splitext(output_file_name)[-1]

outpath = full_target._full_path(output_file_name)

if file_ext == ".json":
multi_kerchunk = reference.translate()
with full_target.fs.open(outpath, "wb") as f:
f.write(ujson.dumps(multi_kerchunk).encode())
# If reference is a ReferenceFileSystem, write to json
if isinstance(reference, fsspec.FSMap) and isinstance(reference.fs, ReferenceFileSystem):
reference.fs.save_json(outpath, **remote_options)

elif file_ext == ".parquet":

# Creates empty parquet store to be written to
if full_target.exists(output_file_name):
full_target.rm(output_file_name, recursive=True)
full_target.makedir(output_file_name)

remote_protocol = _select_single_protocol(full_target)

out = LazyReferenceMapper.create(refs_per_component, outpath, full_target.fs)

# Calls MultiZarrToZarr on a MultiZarrToZarr object and adds kwargs to write to parquet.

MultiZarrToZarr(
[reference.translate()],
[reference],
concat_dims=concat_dims,
target_options=target_options,
remote_options=remote_options,
remote_protocol=remote_protocol,
out=out,
**mzz_kwargs,
).translate()

# call to write reference to empty parquet store
out.flush()

else:
raise NotImplementedError(f"{file_ext = } not supported.")

return full_target
return ReferenceFileSystem(
outpath,
target_options=target_options,
remote_options=remote_options,
remote_protocol=remote_protocol,
lazy=True,
).get_mapper()


class ZarrWriterProtocol(Protocol):
Expand Down
Loading

0 comments on commit 6e40279

Please sign in to comment.