
Improve serializability of Recipe and related classes #116

Closed
rabernat opened this issue Apr 30, 2021 · 5 comments · Fixed by #117 or #160

@rabernat
Contributor

After the refactor in #101, our dask graphs have gotten a lot bigger.

I did the following, based on the NOAA OISST recipe from the docs:

import pandas as pd
import s3fs
import gcsfs
import json

from pangeo_forge.patterns import pattern_from_file_sequence
from pangeo_forge.recipes import XarrayZarrRecipe
from pangeo_forge.storage import FSSpecTarget, CacheFSSpecTarget

# set up recipe

input_url_pattern = (
    "https://www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation"
    "/v2.1/access/avhrr/{yyyymm}/oisst-avhrr-v02r01.{yyyymmdd}.nc"
)

dates = pd.date_range("1981-09-01", "2021-01-05", freq="D")
input_urls = [
    input_url_pattern.format(
        yyyymm=day.strftime("%Y%m"), yyyymmdd=day.strftime("%Y%m%d")
    )
    for day in dates
]

pattern = pattern_from_file_sequence(input_urls, "time", nitems_per_file=1)
recipe = XarrayZarrRecipe(pattern, inputs_per_chunk=5)

# set up storage

fmt = 'zarr'
feedstock_name = 'noaa_oisst'
recipe_key = 'oisst_avhrr_v02r01'
recipe_name = f'{feedstock_name}/{recipe_key}'

cache_base = 'gs://pangeo-forge-us-central1/pangeo-forge-cache'
metadata_base = 'gs://pangeo-forge-us-central1/pangeo-forge-metadata'

with open('pangeo-181919-cc01a4fbb1ef.json') as fp:
    token = json.load(fp)
fs_gcs = gcsfs.GCSFileSystem(token=token, cache_timeout=-1, cache_type="none")

endpoint_url = 'https://ncsa.osn.xsede.org'
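# osn_key and osn_secret are OSN credentials defined elsewhere (not shown in this snippet)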
fs_osn = s3fs.S3FileSystem(
    key=osn_key,
    secret=osn_secret,
    client_kwargs={'endpoint_url': endpoint_url},
    default_cache_type='none',
    default_fill_cache=False,
    listings_expiry_time=0.01,
)

target_base = 's3://Pangeo/pangeo-forge'
recipe.input_cache = CacheFSSpecTarget(fs_gcs, f"{cache_base}/{recipe_name}")
recipe.metadata_cache = FSSpecTarget(fs_gcs, f"{metadata_base}/{recipe_name}")
recipe.target = FSSpecTarget(fs_osn, f"{target_base}/{recipe_name}.{fmt}") 

Then I converted it to a dask delayed object to execute on a Dask Gateway cluster:

import dask
from pangeo_forge.executors import DaskPipelineExecutor

executor_dask = DaskPipelineExecutor()
pl = recipe.to_pipelines()
plan = executor_dask.pipelines_to_plan(pl)  # dask.Delayed obj

The graph is about 10 MB, not huge, right?

import cloudpickle
serialized = cloudpickle.dumps(plan.dask)
len(serialized)
# -> 10620079

But when I call

dask.persist(plan, retries=5)

I quickly run out of memory in my notebook pod (16 GB) before the persist call finishes. I also tried with optimize_graph=False.

One culprit could be the new FilePattern class, which stores a big xarray dataset internally
https://github.com/pangeo-forge/pangeo-forge/blob/1c5f3520d6fdca718cb372d60161d4fd8617dcbb/pangeo_forge/patterns.py#L81

I feel like we could optimize this using __getstate__ / __setstate__.
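
For illustration, here is a minimal sketch of that approach. The attribute name _ds and the _build_dataset helper are hypothetical placeholders, not the actual FilePattern internals:

# Hypothetical sketch: exclude the large internal xarray dataset from the
# pickled state and rebuild it on unpickling. `_ds` and `_build_dataset`
# are assumed names, for illustration only.
class FilePattern:
    def __init__(self, format_function, *combine_dims):
        self.format_function = format_function
        self.combine_dims = combine_dims
        self._ds = self._build_dataset()  # potentially large xarray object

    def _build_dataset(self):
        ...  # construct the internal dataset from format_function / combine_dims

    def __getstate__(self):
        # drop the large dataset so only the cheap attributes get pickled
        state = self.__dict__.copy()
        state.pop("_ds", None)
        return state

    def __setstate__(self, state):
        # restore the cheap attributes and rebuild the dataset locally
        self.__dict__.update(state)
        self._ds = self._build_dataset()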

@martindurant it would be great if you could look at this.

@rabernat
Contributor Author

rabernat commented May 3, 2021

I tried to mitigate this issue via #117. I also tried defining the recipe via

from pangeo_forge.patterns import ConcatDim, FilePattern
from pangeo_forge.recipes import XarrayZarrRecipe
import pandas as pd


def make_url(time):
    input_url_pattern = (
        "https://www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation"
        "/v2.1/access/avhrr/{yyyymm}/oisst-avhrr-v02r01.{yyyymmdd}.nc"
    )
    return input_url_pattern.format(
        yyyymm=time.strftime("%Y%m"), yyyymmdd=time.strftime("%Y%m%d")
    )


time_dim = ConcatDim(
    "time",
    pd.date_range("1981-09-01", "2021-01-05", freq="D"),
    nitems_per_file=1
)

pattern = FilePattern(make_url, time_dim)
recipe = XarrayZarrRecipe(pattern, inputs_per_chunk=5)

completely skipping any storage assignments. However, the execution step

import dask
from pangeo_forge.executors import DaskPipelineExecutor

executor_dask = DaskPipelineExecutor()
pl = recipe.to_pipelines()
plan = executor_dask.pipelines_to_plan(pl) 
dask.persist(plan, retries=5)

...still blows up my 16 GB of memory.

What tools can I use to debug the serialization of this graph to understand why it's so memory hungry?

@TomAugspurger
Contributor

TomAugspurger commented May 12, 2021

Looking into this:

import cloudpickle
import dask
from time import perf_counter

stats = []

for k, v in plan.dask.items():
    t0 = perf_counter()
    out = cloudpickle.dumps(v)
    t1 = perf_counter()
    stats.append((k, t1 - t0, len(out)))

import pandas as pd

df = pd.DataFrame(stats, columns=['key', 'time', 'size'])

df['hl_key'] = [dask.utils.key_split(x) for x in df['key'].array]
df.groupby("hl_key")[['time', 'size']].sum()
hl_key             time    size
cache_input        1.28    1.03953e+09
finalize_target    0       849997
merge              0       112
prepare_target     0       851614
stage              0       11173
store_chunk        0.64    2.08589e+08

This is on a subset of the data, with ~1500 tasks. We need to improve the serializability of the various objects in the recipe. I'll take a look at that next.

@rabernat
Contributor Author

Thanks Tom! Can you try with #117? That's my attempt to improve serializability of the FilePattern objects. But it didn't seem to help.

@TomAugspurger
Contributor

TomAugspurger commented May 24, 2021

(copied from #117 (comment)). We can pursue this if serialization is still an issue.

I think the best path forward here is to construct a custom HighLevelGraph (or perhaps a Layer; I haven't kept up to date, but Layer seems like the more appropriate class these days), focusing on the cache_input stage. Right now that stage of the graph grows linearly with the number of inputs. I'll need to check the general case, but at least for the test case the layer looks something like:

import dask
from dask.highlevelgraph import Layer


class CacheInputLayer(Layer):
    """A lazy graph layer that generates cache_input tasks on demand."""

    def __init__(self, annotations=None, func=None, n=None):
        self.token = dask.base.tokenize(func)
        self.func = func
        self.n = n
        super().__init__(annotations=annotations)

    def __getitem__(self, index):
        # keys look like (token, i); materialize a single task on demand
        token, n = index
        if token == self.token and isinstance(n, int) and (0 <= n < self.n):
            return (self.func, n)
        raise KeyError(index)

    def __len__(self):
        return self.n

    def __iter__(self):
        # yield the keys, one per input
        for i in range(self.n):
            yield (self.token, i)

    def is_materialized(self):
        return False


cache_input = pl[0][0]
layer = CacheInputLayer(func=cache_input.func, n=len(cache_input.map_args))

The idea is to replace the cache_input-<token> layers that end up in the Dask graph produced by the rechunker plan = executor_dask.pipelines_to_plan(pl) line with a mapping that returns the same result for those keys.

A single instance of that class will replace all n inputs (14,006 in the full example). (Note that the class doesn't implement any kind of error checking, etc.) Then it doesn't really matter how large or slow the FilePattern is to serialize, since we only have one.
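
To make that concrete, here is a hedged sketch of how the custom layer might be spliced into the graph behind plan. The layer-name match and the key layout are assumptions and aren't verified against what downstream layers expect:

# Sketch only: swap every materialized cache_input layer in the
# HighLevelGraph for the lazy CacheInputLayer defined above. Whether the
# replacement layer's keys line up with downstream expectations is not
# checked here.
import dask
from dask.delayed import Delayed
from dask.highlevelgraph import HighLevelGraph

hlg = plan.__dask_graph__()  # graph produced by pipelines_to_plan
layers = dict(hlg.layers)
dependencies = dict(hlg.dependencies)

for name in list(layers):
    if dask.utils.key_split(name) == "cache_input":
        layers[name] = CacheInputLayer(func=cache_input.func, n=len(cache_input.map_args))

new_plan = Delayed(plan.key, HighLevelGraph(layers, dependencies))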

I don't know when I'll have time to get to this, so if anyone wants to dig into it, feel free. Otherwise I'll try to get to it over the next couple of weeks.

@rabernat
Contributor Author

rabernat commented Jun 1, 2021

We are still struggling with memory issues for large recipes. The issue is coming up not just with the Dask executor but also with Prefect (see #151). So some deep debugging / creative thinking is still needed.
