Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added failing tests for #151 #220

Merged
merged 3 commits into from
Oct 13, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions tests/recipe_tests/test_XarrayZarrRecipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
from dataclasses import replace
from unittest.mock import patch

import dask.core
import pytest
import xarray as xr
import zarr

# need to import this way (rather than use pytest.lazy_fixture) to make it work with dask
from pytest_lazyfixture import lazy_fixture

from pangeo_forge_recipes.patterns import FilePattern
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.recipes.base import BaseRecipe
from pangeo_forge_recipes.recipes.xarray_zarr import XarrayZarrRecipe

Expand Down Expand Up @@ -167,7 +168,7 @@ def test_recipe_caching_copying(recipe, execute_recipe, cache_inputs, copy_input
file_pattern,
**kwargs,
cache_inputs=cache_inputs,
copy_input_to_local_file=copy_input_to_local_file
copy_input_to_local_file=copy_input_to_local_file,
)
execute_recipe(rec)
ds_actual = xr.open_zarr(target.get_mapper()).load()
Expand Down Expand Up @@ -454,3 +455,59 @@ def test_base_recipe():
recipe.to_prefect().run()
assert recipe.finalized
assert recipe.target == {i: i for i in range(4)}


def _make_filename_for_memory_usage_test(time):
    """Return the input-file URL for the granule that starts at ``time``.

    Used as the format function of the ``FilePattern`` built in
    ``test_memory_usage``.

    Parameters
    ----------
    time
        Start timestamp of the granule. It must support ``strftime``,
        ``.hour``, ``.minute`` and addition of a ``pandas.Timedelta``
        (e.g. a ``pandas.Timestamp``).

    Returns
    -------
    str
        The fully formatted URL. The start and end second fields are fixed
        to ``00`` and ``59`` by the template; the trailing four-digit field
        is the start time expressed as zero-padded minutes since midnight.
    """
    # Imported lazily so that merely collecting this module does not require pandas.
    import pandas as pd

    # Take BOTH end-of-granule fields from one timestamp. Deriving the end hour
    # from the start time is only right when start and end share an hour, i.e.
    # when the start minute is <= 30.
    end = time + pd.Timedelta("29 min")

    return (
        "https://arthurhouhttps.pps.eosdis.nasa.gov/gpmdata/{yyyy}/{mm}/{dd}/"
        "imerg/3B-HHR.MS.MRG.3IMERG.{yyyymmdd}-S{sh}{sm}00-E{eh}{em}59.{MMMM}.V06B.HDF5"
    ).format(
        yyyy=time.strftime("%Y"),
        mm=time.strftime("%m"),
        dd=time.strftime("%d"),
        yyyymmdd=time.strftime("%Y%m%d"),
        sh=time.strftime("%H"),
        sm=time.strftime("%M"),
        eh=end.strftime("%H"),
        em=end.strftime("%M"),
        MMMM=f"{(time.hour*60 + time.minute):04}",
    )


def _simple_func(*args, **kwargs):
    """Accept any arguments, do nothing, and return ``None``.

    ``test_memory_usage`` substitutes this for the callable of every task in
    the recipe's dask graph.
    """


@pytest.mark.timeout(90)
@pytest.mark.filterwarnings("ignore:Large object")
def test_memory_usage():
    """Submit a large recipe graph, with no-op tasks, to a memory-capped worker.

    See https://github.com/pangeo-forge/pangeo-forge-recipes/issues/151
    Requires >4 GiB of memory to run.

    A recipe is built with one input per 30-minute step over a full year. The
    callable of every task in its dask graph is replaced by ``_simple_func``,
    and the resulting graph is executed on a single-worker, single-thread
    ``distributed`` cluster limited to 4G of memory. The test passes if the
    computation completes within the timeout without raising.
    """
    pd = pytest.importorskip("pandas")
    distributed = pytest.importorskip("distributed")

    dates = pd.date_range("2020-05-31T00:00:00", "2021-05-31T23:59:59", freq="30min")
    time_concat_dim = ConcatDim("time", dates, nitems_per_file=1)
    pattern = FilePattern(_make_filename_for_memory_usage_test, time_concat_dim)

    recipe = XarrayZarrRecipe(
        pattern,
        xarray_open_kwargs={"group": "Grid", "decode_coords": "all"},
        inputs_per_chunk=1,
    )

    obj = recipe.to_dask()

    # Build a NEW graph instead of assigning into ``obj.dask`` while iterating
    # over it: this leaves the Delayed object's own graph untouched and does not
    # depend on that graph being a mutable mapping. Each task keeps its original
    # arguments (and therefore its dependencies); only the callable is replaced.
    dsk = {
        key: (_simple_func, *task[1:]) if dask.core.istask(task) else task
        for key, task in obj.dask.items()
    }

    # Passed as a dict, the documented form for dotted configuration keys.
    worker_memory_settings = {
        "distributed.worker.memory.pause": 0.95,
        "distributed.worker.memory.terminate": 0.9,
    }
    with dask.config.set(worker_memory_settings):
        with distributed.Client(n_workers=1, threads_per_worker=1, memory_limit="4G") as client:
            print("submitting")
            client.get(dsk, [obj.key])