From 262b24087732f510a613f6a14fa697ed3ad907c1 Mon Sep 17 00:00:00 2001
From: Tom Augspurger
Date: Mon, 4 Oct 2021 14:47:42 -0500
Subject: [PATCH] Added failing tests for #151

---

Notes:
    What this patch does to tests/recipe_tests/test_XarrayZarrRecipe.py:
      * imports dask.core and ConcatDim;
      * adds a trailing comma to one call in test_recipe_caching_copying;
      * adds _make_filename_for_memory_usage_test(time), which formats a
        download URL from a timestamp (strftime fields, an end minute
        taken 29 minutes later, and a zero-padded minute-of-day);
      * adds _simple_func(*args, **kwargs), which always returns None;
      * adds test_memory_usage, which builds an XarrayZarrRecipe over a
        30-minute date range from 2020-05-31 to 2021-05-31, replaces the
        callable of every task in the recipe's dask graph with
        _simple_func, and runs the graph on a distributed.Client with
        one worker, one thread and memory_limit="4G".

    NOTE(review): the line structure and the indentation of context
    lines were reconstructed from a whitespace-collapsed copy. The hunk
    line counts and the diffstat are consistent with this layout, but
    confirm the context indentation against the target file, or apply
    with `git apply --ignore-whitespace`.
    NOTE(review): the From header has no e-mail address; `git am` may
    reject it, `git apply` does not need it.

 tests/recipe_tests/test_XarrayZarrRecipe.py | 61 ++++++++++++++++++++-
 1 file changed, 59 insertions(+), 2 deletions(-)

diff --git a/tests/recipe_tests/test_XarrayZarrRecipe.py b/tests/recipe_tests/test_XarrayZarrRecipe.py
index 3f056c1c..e279cf26 100644
--- a/tests/recipe_tests/test_XarrayZarrRecipe.py
+++ b/tests/recipe_tests/test_XarrayZarrRecipe.py
@@ -2,6 +2,7 @@
 from dataclasses import replace
 from unittest.mock import patch
 
+import dask.core
 import pytest
 import xarray as xr
 import zarr
@@ -9,7 +10,7 @@
 # need to import this way (rather than use pytest.lazy_fixture) to make it work with dask
 from pytest_lazyfixture import lazy_fixture
 
-from pangeo_forge_recipes.patterns import FilePattern
+from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
 from pangeo_forge_recipes.recipes.base import BaseRecipe
 from pangeo_forge_recipes.recipes.xarray_zarr import XarrayZarrRecipe
 
@@ -167,7 +168,7 @@ def test_recipe_caching_copying(recipe, execute_recipe, cache_inputs, copy_input
         file_pattern,
         **kwargs,
         cache_inputs=cache_inputs,
-        copy_input_to_local_file=copy_input_to_local_file
+        copy_input_to_local_file=copy_input_to_local_file,
     )
     execute_recipe(rec)
     ds_actual = xr.open_zarr(target.get_mapper()).load()
@@ -454,3 +455,59 @@ def test_base_recipe():
     recipe.to_prefect().run()
     assert recipe.finalized
     assert recipe.target == {i: i for i in range(4)}
+
+
+def _make_filename_for_memory_usage_test(time):
+    import pandas as pd
+
+    input_url_pattern = (
+        "https://arthurhouhttps.pps.eosdis.nasa.gov/gpmdata/{yyyy}/{mm}/{dd}/"
+        "imerg/3B-HHR.MS.MRG.3IMERG.{yyyymmdd}-S{sh}{sm}00-E{eh}{em}59.{MMMM}.V06B.HDF5"
+    ).format(
+        yyyy=time.strftime("%Y"),
+        mm=time.strftime("%m"),
+        dd=time.strftime("%d"),
+        yyyymmdd=time.strftime("%Y%m%d"),
+        sh=time.strftime("%H"),
+        sm=time.strftime("%M"),
+        eh=time.strftime("%H"),
+        em=(time + pd.Timedelta("29 min")).strftime("%M"),
+        MMMM=f"{(time.hour*60 + time.minute):04}",
+    )
+    return input_url_pattern
+
+
+def _simple_func(*args, **kwargs):
+    return None
+
+
+@pytest.mark.timeout(90)
+@pytest.mark.filterwarnings("ignore:Large object")
+def test_memory_usage():
+    # https://github.com/pangeo-forge/pangeo-forge-recipes/issues/151
+    # Requires >4 GiB of memory to run.
+    pd = pytest.importorskip("pandas")
+    distributed = pytest.importorskip("distributed")
+
+    dates = pd.date_range("2020-05-31T00:00:00", "2021-05-31T23:59:59", freq="30min")
+    time_concat_dim = ConcatDim("time", dates, nitems_per_file=1)
+    pattern = FilePattern(_make_filename_for_memory_usage_test, time_concat_dim,)
+
+    recipe = XarrayZarrRecipe(
+        pattern, xarray_open_kwargs={"group": "Grid", "decode_coords": "all"}, inputs_per_chunk=1,
+    )
+
+    obj = recipe.to_dask()
+    dsk = obj.dask
+
+    for k, v in dsk.items():
+        if dask.core.istask(v):
+            _, *args = v
+            dsk[k] = (_simple_func,) + tuple(args)
+
+    with dask.config.set(
+        **{"distributed.worker.memory.pause": 0.95, "distributed.worker.memory.terminate": 0.9}
+    ):
+        with distributed.Client(n_workers=1, threads_per_worker=1, memory_limit="4G") as client:
+            print("submitting")
+            client.get(dsk, [obj.key])