Skip to content

Commit

Permalink
Added failing tests for pangeo-forge#151
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Augspurger committed Oct 4, 2021
1 parent b717031 commit 262b240
Showing 1 changed file with 59 additions and 2 deletions.
61 changes: 59 additions & 2 deletions tests/recipe_tests/test_XarrayZarrRecipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
from dataclasses import replace
from unittest.mock import patch

import dask.core
import pytest
import xarray as xr
import zarr

# need to import this way (rather than use pytest.lazy_fixture) to make it work with dask
from pytest_lazyfixture import lazy_fixture

from pangeo_forge_recipes.patterns import FilePattern
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.recipes.base import BaseRecipe
from pangeo_forge_recipes.recipes.xarray_zarr import XarrayZarrRecipe

Expand Down Expand Up @@ -167,7 +168,7 @@ def test_recipe_caching_copying(recipe, execute_recipe, cache_inputs, copy_input
file_pattern,
**kwargs,
cache_inputs=cache_inputs,
copy_input_to_local_file=copy_input_to_local_file
copy_input_to_local_file=copy_input_to_local_file,
)
execute_recipe(rec)
ds_actual = xr.open_zarr(target.get_mapper()).load()
Expand Down Expand Up @@ -454,3 +455,59 @@ def test_base_recipe():
recipe.to_prefect().run()
assert recipe.finalized
assert recipe.target == {i: i for i in range(4)}


def _make_filename_for_memory_usage_test(time):
    """Return the IMERG half-hourly HDF5 URL for the window starting at ``time``.

    Parameters
    ----------
    time
        Start of the window. Must provide ``strftime``, ``hour`` and ``minute``
        and support adding a ``pandas.Timedelta`` (``test_memory_usage`` feeds
        it the elements of a ``pandas.date_range``).

    Returns
    -------
    str
        The URL, whose file name carries the start stamp (``S<HHMM>00``), the
        end stamp (``E<HHMM>59``) and the start's minute-of-day, zero-padded
        to four digits.
    """
    import pandas as pd

    # Take both the hour and the minute of the end stamp from the same end
    # timestamp. Using the start's hour is only right while the window stays
    # inside one clock hour (starts on :00 or :30).
    end = time + pd.Timedelta("29 min")

    input_url_pattern = (
        "https://arthurhouhttps.pps.eosdis.nasa.gov/gpmdata/{yyyy}/{mm}/{dd}/"
        "imerg/3B-HHR.MS.MRG.3IMERG.{yyyymmdd}-S{sh}{sm}00-E{eh}{em}59.{MMMM}.V06B.HDF5"
    ).format(
        yyyy=time.strftime("%Y"),
        mm=time.strftime("%m"),
        dd=time.strftime("%d"),
        yyyymmdd=time.strftime("%Y%m%d"),
        sh=time.strftime("%H"),
        sm=time.strftime("%M"),
        eh=end.strftime("%H"),
        em=end.strftime("%M"),
        # Minutes elapsed since midnight at the start of the window.
        MMMM=f"{(time.hour * 60 + time.minute):04}",
    )
    return input_url_pattern


def _simple_func(*args, **kwargs):
    """Accept any arguments, do nothing, and return ``None``.

    ``test_memory_usage`` installs this as the callable of every task in the
    recipe's dask graph, so no real work is executed when the graph runs.
    """


@pytest.mark.timeout(90)
@pytest.mark.filterwarnings("ignore:Large object")
def test_memory_usage():
    """Run the dask graph of a large recipe, with no-op tasks, on a memory-limited worker.

    See https://github.com/pangeo-forge/pangeo-forge-recipes/issues/151.
    Requires >4 GiB of memory to run.
    """
    pd = pytest.importorskip("pandas")
    distributed = pytest.importorskip("distributed")

    # One input file per 30-minute step between 2020-05-31 and 2021-05-31.
    half_hourly = pd.date_range("2020-05-31T00:00:00", "2021-05-31T23:59:59", freq="30min")
    file_pattern = FilePattern(
        _make_filename_for_memory_usage_test,
        ConcatDim("time", half_hourly, nitems_per_file=1),
    )
    open_kwargs = {"group": "Grid", "decode_coords": "all"}
    recipe = XarrayZarrRecipe(file_pattern, xarray_open_kwargs=open_kwargs, inputs_per_chunk=1)

    delayed = recipe.to_dask()
    graph = delayed.dask

    # Replace each task's callable with the no-op while keeping its arguments,
    # so executing the graph does none of the recipe's actual work.
    for key, task in graph.items():
        if not dask.core.istask(task):
            continue
        graph[key] = (_simple_func, *task[1:])

    memory_thresholds = {
        "distributed.worker.memory.pause": 0.95,
        "distributed.worker.memory.terminate": 0.9,
    }
    with dask.config.set(**memory_thresholds), distributed.Client(
        n_workers=1, threads_per_worker=1, memory_limit="4G"
    ) as client:
        print("submitting")
        client.get(graph, [delayed.key])

0 comments on commit 262b240

Please sign in to comment.