Serialize file patterns #117

Merged
merged 3 commits on May 24, 2021

Conversation

rabernat
Contributor

@rabernat commented May 3, 2021

This PR potentially closes #116 by making the FilePattern object smaller in serialized form.

@cisaacstern
Member

In reviewing the copy_pruned() PR, I realized that I do not really get what pickling is, why it's valuable, and when we need to use it: #139 (comment)

Re-posting here as an invitation to further discussion.

@rabernat
Contributor Author

Pickling is a form of serialization. It takes a python object (in this case, the FilePattern object) and turns it into a set of bytes. These bytes can be sent over the network and then reconstituted on a remote computer.
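
As a toy illustration (not code from this PR), the round trip looks like this:

import pickle

payload = pickle.dumps({"variable": "sst", "time": 42})  # object -> bytes
obj = pickle.loads(payload)                              # bytes -> object
print(obj)  # {'variable': 'sst', 'time': 42}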

When Dask runs our recipe, it converts all the operations into a dask task graph. The tasks contain functions that themselves contain FilePattern objects. In order to send the tasks to the dask scheduler / workers, these functions have to be serialized.

Before this PR, the FilePattern objects contained a rather large xarray dataset. To avoid complex dependencies between tasks, each task contains a complete copy of the recipe object, which holds the entire FilePattern and its big xarray dataset.
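
To see why that matters for serialization size, here is a toy sketch (the names big and make_task are hypothetical; cloudpickle is what distributed uses to serialize functions): every task whose function closes over the big object carries its own serialized copy of it.

import cloudpickle

big = list(range(100_000))  # stand-in for a large xarray dataset

def make_task(i):
    def task():
        # the closure captures `big`, so pickling `task` pickles `big` too
        return big[i]
    return task

tasks = [make_task(i) for i in range(3)]
print([len(cloudpickle.dumps(t)) for t in tasks])  # each payload is large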

The goal of this PR is to use pickle's __getstate__ / __setstate__ mechanism to avoid storing the large xarray dataset in the serialized FilePattern.
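
As a sketch of that general mechanism (hypothetical class and attribute names, not the actual FilePattern code), an object can drop a heavy but re-derivable attribute from its pickled state and rebuild it when unpickled:

import pickle

class ExamplePattern:
    def __init__(self, urls):
        self.urls = urls
        self._index = self._build_index()  # imagine this is the big dataset

    def _build_index(self):
        return {i: url for i, url in enumerate(self.urls)}

    def __getstate__(self):
        # serialize only the lightweight inputs, not the derived bulk
        state = self.__dict__.copy()
        state.pop("_index", None)
        return state

    def __setstate__(self, state):
        # restore the lightweight state and rebuild the heavy attribute
        self.__dict__.update(state)
        self._index = self._build_index()

pattern = ExamplePattern([f"file_{i}.nc" for i in range(3)])
restored = pickle.loads(pickle.dumps(pattern))
assert restored._index == pattern._index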

I am curious to hear from @TomAugspurger whether this actually improved anything in the diagnostics he reported in #116 (comment).

@rabernat requested a review from TomAugspurger on May 20, 2021
@rabernat
Contributor Author

This may not help, but I don't think it harms. I will merge this at the end of the day today if there are no further comments.

@TomAugspurger
Contributor

I think this does help. Using the test below with 1000 items, we went from ~2s to ~1s.

import pandas as pd
import json
import fsspec
from timeit import default_timer as tic

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.recipes import XarrayZarrRecipe
from pangeo_forge_recipes.storage import FSSpecTarget, CacheFSSpecTarget

import dask
from distributed import Client
from dask.highlevelgraph import HighLevelGraph

def main():
    feedstock_name = "abc"
    # set up recipe

    input_url_pattern = (
        "https://www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation"
        "/v2.1/access/avhrr/{yyyymm}/oisst-avhrr-v02r01.{yyyymmdd}.nc"
    )

    dates = pd.date_range("1981-09-01", "2020-01-05", freq="D")
    input_urls = [
        input_url_pattern.format(
            yyyymm=day.strftime("%Y%m"), yyyymmdd=day.strftime("%Y%m%d")
        )
        for day in dates
    ]
    input_urls = input_urls[:1000]

    print(len(input_urls))
    pattern = pattern_from_file_sequence(input_urls, "time", nitems_per_file=1)
    recipe = XarrayZarrRecipe(pattern, inputs_per_chunk=5)

    # set up storage

    fmt = 'zarr'
    recipe_key = 'oisst_avhrr_v02r01'
    recipe_name = f'{feedstock_name}/{recipe_key}'

    feedstock_name = 'noaa_oisst'

    cache_base = f'gs://pangeo-forge-us-central1/pangeo-forge-cache'
    metadata_base = f'gs://pangeo-forge-us-central1/pangeo-forge-metadata'
    endpoint_url = 'https://ncsa.osn.xsede.org'
    fs_gcs = fsspec.get_filesystem_class("memory")("gcs")
    fs_osn = fsspec.get_filesystem_class("memory")("osn")

    target_base = f's3://Pangeo/pangeo-forge'
    recipe.input_cache = CacheFSSpecTarget(fs_gcs, f"{cache_base}/{recipe_name}")
    recipe.metadata_cache = FSSpecTarget(fs_gcs, f"{metadata_base}/{recipe_name}")
    recipe.target = FSSpecTarget(fs_osn, f"{target_base}/{recipe_name}.{fmt}") 

    from pangeo_forge_recipes.executors import DaskPipelineExecutor
    executor_dask = DaskPipelineExecutor()
    pl = recipe.to_pipelines()
    plan = executor_dask.pipelines_to_plan(pl)  # dask.Delayed obj
    dsk2 = HighLevelGraph.from_collections("x", plan.dask)
    keys = list(plan.dask)

    with Client() as client:
        t0 = tic()
        x = dsk2.__dask_distributed_pack__(client, keys)
        t1 = tic()

        print(t1 - t0)

if __name__ == "__main__":
    main()

But for larger numbers of inputs, there's still work to be done. I'll look at it for a bit.

@cisaacstern
Member

@TomAugspurger, in order to define filesystems with

    fs_gcs = fsspec.get_filesystem_class("memory")("gcs")
    fs_osn = fsspec.get_filesystem_class("memory")("osn")

do you first register credentialed gcsfs and s3fs classes with fsspec.registry.register_implementation?

Since it looks like you run main() from the command line, how do you make these fsspec.registry additions discoverable in the runtime namespace?

Currently I'm manually writing recipes to OSN from a local notebook (with inline creds), but I would much prefer to run something like what you have above.

@martindurant
Contributor

martindurant commented May 21, 2021

This line

fsspec.get_filesystem_class("memory")("gcs")

is the same as

fsspec.filesystem("memory",  "gcs")

but the memory filesystem doesn't actually take any arguments. This will ensure that the two instances are not the same, but be warned: they will share the same underlying dict storage (because this is global at the class level).
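
To make that concrete, here is a small sketch (assuming current fsspec behavior, where MemoryFileSystem ignores the extra positional argument but the instance cache keys on it, so the two objects are distinct):

import fsspec

fs_gcs = fsspec.get_filesystem_class("memory")("gcs")
fs_osn = fsspec.get_filesystem_class("memory")("osn")
assert fs_gcs is not fs_osn  # distinct instances...

fs_gcs.pipe_file("/demo.txt", b"written via fs_gcs")

# ...but the in-memory store is a class-level dict shared by every
# MemoryFileSystem instance, so the file is visible through both.
print(fs_osn.cat("/demo.txt"))  # b'written via fs_gcs'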

@TomAugspurger
Contributor

TomAugspurger commented May 22, 2021

Currently I'm manually writing recipes to OSN from a local notebook (with inline creds), but I would much prefer to run something like what you have above.

Just an FYI, my filesystems in that snippet aren't actually capable of writing anywhere (other than local memory). The names fs_gcs and fs_osn are just leftovers from Ryan's original script.


I think the best path forward here is to construct a custom HighLevelGraph (or perhaps a Layer; I haven't kept up to date, but Layer seems like the more appropriate class these days), focusing on the cache_input stage. Right now that stage of the graph grows linearly with the number of inputs. I'll need to check the general case, but at least for the test case the layer looks something like:

import dask
from dask.highlevelgraph import Layer

class CacheInputLayer(Layer):
    """Lazy layer: keys are (token, i) and each task calls func(i)."""

    def __init__(self, annotations=None, func=None, n=None):
        self.token = dask.base.tokenize(func)
        self.func = func
        self.n = n
        super().__init__(annotations=annotations)

    def __getitem__(self, index):
        token, n = index
        if token == self.token and isinstance(n, int) and (0 <= n < self.n):
            return (self.func, n)
        raise KeyError(index)

    def __len__(self):
        return self.n

    def __iter__(self):
        # yield the same keys that __getitem__ accepts
        for i in range(self.n):
            yield (self.token, i)

    def is_materialized(self):
        return False


cache_input = pl[0][0]
layer = CacheInputLayer(func=cache_input.func, n=len(cache_input.map_args))

The idea is to replace all the cache_input-<token> layers that end up in the Dask graph (produced by the rechunker-based plan = executor_dask.pipelines_to_plan(pl) line) with a mapping that gives the same result for those keys.

A single instance of that class will replace the n = 14,006 inputs in the full example. (Note that the class doesn't implement any kind of error checking, etc.) Then it doesn't really matter how large or slow the FilePattern is to serialize, since we only serialize one copy of it.
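
A rough sketch of that splice (assuming, for illustration, a single cache_input-<token> layer that can be found by its name prefix, and that CacheInputLayer produces the same keys the downstream layers expect):

from dask.highlevelgraph import HighLevelGraph

hlg = plan.dask  # graph produced by executor_dask.pipelines_to_plan(pl)
layers = dict(hlg.layers)
dependencies = dict(hlg.dependencies)

# swap the materialized cache_input layer for the compact CacheInputLayer,
# keeping the layer name and dependency structure intact
old_name = next(name for name in layers if name.startswith("cache_input-"))
layers[old_name] = layer

new_hlg = HighLevelGraph(layers, dependencies)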

I don't know when I'll have time to get to this, so if anyone wants to dig into it, feel free. Otherwise I'll try to get to it over the next couple of weeks.

Successfully merging this pull request may close these issues.

Improve serializability of Recipe and related classes