Refactor XarrayZarrRecipe to be serialization-friendly #160

Merged · 23 commits · Jun 25, 2021
3 changes: 3 additions & 0 deletions .gitignore
@@ -129,3 +129,6 @@ dmypy.json
# Pyre type checker
.pyre/
_version.py

# tutorials
*.nc
59 changes: 33 additions & 26 deletions docs/execution.md
@@ -39,6 +39,7 @@ recipe.prepare_target()

For example, for Zarr targets, this sets up the Zarr group with the necessary
arrays and metadata.
+This is the most complex step, and the most likely place to get an error.

### Stage 3: Store Chunks

@@ -57,43 +58,49 @@ If there is any cleanup or consolidation to be done, it happens here.
recipe.finalize_target()
```

-For example, consolidating Zarr metadta happens in the finalize step.
+For example, consolidating Zarr metadata happens in the finalize step.
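
Putting the four stages together, a complete serial execution looks like the
sketch below; it mirrors what the compiled function described in the next
section does.

```{code-block} python
if getattr(recipe, "cache_inputs", False):  # only some recipes cache inputs
    for input_key in recipe.iter_inputs():
        recipe.cache_input(input_key)
recipe.prepare_target()
for chunk_key in recipe.iter_chunks():
    recipe.store_chunk(chunk_key)
recipe.finalize_target()
```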

-## Execution by Executors
+## Compiled Recipes

Very large recipes cannot feasibly be executed this way.
-To support distributed parallel execution, Pangeo Forge borrows the
-[Executors framework from Rechunker](https://rechunker.readthedocs.io/en/latest/executors.html).
+Instead, recipes can be _compiled_ to executable objects.
+We currently support three types of compilation.

-There are currently three executors implemented.
-- {class}`pangeo_forge_recipes.executors.PythonPipelineExecutor`: a reference executor
-  using simple python
-- {class}`pangeo_forge_recipes.executors.DaskPipelineExecutor`: distributed executor using Dask
-- {class}`pangeo_forge_recipes.executors.PrefectPipelineExecutor`: distributed executor using Prefect
+### Python Function

-To use an executor, the recipe must first be transformed into a `Pipeline` object.
-The full process looks like this:
+To convert a recipe to a single Python function, use the method `.to_function()`.
+For example:

```{code-block} python
-pipeline = recipe.to_pipelines()
-executor = PrefectPipelineExecutor()
-plan = executor.pipelines_to_plan(pipeline)
-executor.execute_plan(plan)  # actually runs the recipe
+recipe_func = recipe.to_function()
+recipe_func()  # actually execute the recipe
```

-## Executors
+Note that the Python function approach does not support parallel or distributed execution.
+It is mostly a convenience utility.
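
Because this refactor makes recipes serialization-friendly, the compiled
function should also survive a round trip through a serializer such as
`cloudpickle` (a sketch; plain `pickle` cannot serialize the closure, and
`cloudpickle` is assumed to be installed):

```{code-block} python
import cloudpickle

recipe_func = recipe.to_function()
payload = cloudpickle.dumps(recipe_func)  # serialize the compiled recipe
cloudpickle.loads(payload)()  # e.g. deserialize and run on another worker
```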

-```{eval-rst}
-.. autoclass:: pangeo_forge_recipes.executors.PythonPipelineExecutor
-    :members:
-```
-
-```{eval-rst}
-.. autoclass:: pangeo_forge_recipes.executors.DaskPipelineExecutor
-    :members:
-```
+### Dask Delayed

+You can convert your recipe to a [Dask Delayed](https://docs.dask.org/en/latest/delayed.html)
+object using the `.to_dask()` method. For example:

+```{code-block} python
+delayed = recipe.to_dask()
+delayed.compute()
+```

-```{eval-rst}
-.. autoclass:: pangeo_forge_recipes.executors.PrefectPipelineExecutor
-    :members:
-```
+The `delayed` object can be executed by any of Dask's schedulers, including
+cloud and HPC distributed schedulers.
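
For example, to run on a Dask cluster (a sketch: `Client()` here starts a
local cluster, but any `dask.distributed` scheduler address would work):

```{code-block} python
from dask.distributed import Client

client = Client()  # or Client("tcp://scheduler-address:8786")
delayed = recipe.to_dask()
delayed.compute()  # executes on the cluster's workers
```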

+### Prefect Flow

+You can convert your recipe to a [Prefect Flow](https://docs.prefect.io/core/concepts/flows.html) using
+the `.to_prefect()` method. For example:

+```{code-block} python
+flow = recipe.to_prefect()
+flow.run()
+```

+By default the flow is run using Prefect's [LocalExecutor](https://docs.prefect.io/orchestration/flow_config/executors.html#localexecutor). See [executors](https://docs.prefect.io/orchestration/flow_config/executors.html) for more.
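
To parallelize the flow, you can pass a different executor to `flow.run()`
(a sketch assuming Prefect >= 0.14, where `DaskExecutor` lives in
`prefect.executors`):

```{code-block} python
from prefect.executors import DaskExecutor

flow = recipe.to_prefect()
flow.run(executor=DaskExecutor())  # mapped tasks now run in parallel on Dask
```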
8 changes: 3 additions & 5 deletions docs/tutorials/multi_variable_recipe.ipynb
@@ -2355,10 +2355,8 @@
}
],
"source": [
"from pangeo_forge_recipes.executors import PrefectPipelineExecutor\n",
"executor = PrefectPipelineExecutor()\n",
"flow = executor.pipelines_to_plan(recipe.to_pipelines())\n",
"flow"
"flow = recipe.to_prefect()\n",
"flow.run()"
]
},
{
@@ -3986,7 +3984,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.8"
"version": "3.8.6"
}
},
"nbformat": 4,
9 changes: 3 additions & 6 deletions docs/tutorials/terraclimate.ipynb
@@ -950,11 +950,8 @@
}
],
"source": [
"from pangeo_forge_recipes.executors import PrefectPipelineExecutor\n",
"pipelines = recipe.to_pipelines()\n",
"executor = PrefectPipelineExecutor()\n",
"plan = executor.pipelines_to_plan(pipelines)\n",
"executor.execute_plan(plan)"
"flow = recipe.to_prefect()\n",
"flow.run()"
]
},
{
@@ -3144,7 +3141,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.8"
"version": "3.8.6"
}
},
"nbformat": 4,
86 changes: 85 additions & 1 deletion pangeo_forge_recipes/recipes/base.py
@@ -1,3 +1,4 @@
import warnings
from abc import ABC, abstractmethod
from functools import partial
from typing import Callable, Hashable, Iterable
@@ -78,7 +79,11 @@ def finalize_target(self) -> Callable[[], None]:
    def to_pipelines(self) -> ParallelPipelines:
        """Translate recipe to pipeline for execution.
        """
        warnings.warn(
            "'to_pipelines' is deprecated. Use one of 'to_function', 'to_dask', or "
            "'to_prefect' directly instead.",
            FutureWarning,
        )
        pipeline = []  # type: MultiStagePipeline
        if getattr(self, "cache_inputs", False):  # TODO: formalize this contract
            pipeline.append(Stage(self.cache_input, list(self.iter_inputs())))
@@ -89,6 +94,77 @@ def to_pipelines(self) -> ParallelPipelines:
        pipelines.append(pipeline)
        return pipelines

    def to_function(self) -> Callable[[], None]:
        """
        Translate the recipe to a Python function for execution.
        """

        def pipeline():
            # TODO: formalize this contract
            if getattr(self, "cache_inputs", False):
                for input_key in self.iter_inputs():
                    self.cache_input(input_key)
            self.prepare_target()
            for chunk_key in self.iter_chunks():
                self.store_chunk(chunk_key)
            self.finalize_target()

        return pipeline

def to_dask(self):
"""
Translate the recipe to a dask.Delayed object for parallel execution.
"""
import dask

tasks = []
if getattr(self, "cache_inputs", False):
f = dask.delayed(self.cache_input)
for input_key in self.iter_inputs():
tasks.append(f(input_key))

b0 = dask.delayed(_barrier)(*tasks)
b1 = dask.delayed(_wait_and_call)(self.prepare_target, b0)
tasks = []
for chunk_key in self.iter_chunks():
tasks.append(dask.delayed(_wait_and_call)(self.store_chunk, b1, chunk_key))

b2 = dask.delayed(_barrier)(*tasks)
b3 = dask.delayed(_wait_and_call)(self.finalize_target, b2)
return b3

def to_prefect(self):
"""Compile the recipe to a Prefect.Flow object."""
from prefect import Flow, task, unmapped

has_cache_inputs = getattr(self, "cache_inputs", False)
if has_cache_inputs:
cache_input_task = task(self.cache_input, name="cache_input")
prepare_target_task = task(self.prepare_target, name="prepare_target")
store_chunk_task = task(self.store_chunk, name="store_chunk")
finalize_target_task = task(self.finalize_target, name="finalize_target")

with Flow("pangeo-forge-recipe") as flow:
if has_cache_inputs:
cache_task = cache_input_task.map(input_key=list(self.iter_inputs()))
upstream_tasks = [cache_task]
else:
upstream_tasks = []
prepare_task = prepare_target_task(upstream_tasks=upstream_tasks)
store_task = store_chunk_task.map(
chunk_key=list(self.iter_chunks()), upstream_tasks=[unmapped(prepare_task)],
)
_ = finalize_target_task(upstream_tasks=[store_task])

return flow

    def __iter__(self):
        # Iterating a recipe yields (callable, iterable_of_args) pairs,
        # one per stage, in execution order.
        if hasattr(self, "cache_inputs"):
            yield self.cache_input, self.iter_inputs()
        yield self.prepare_target, []
        yield self.store_chunk, self.iter_chunks()
        yield self.finalize_target, []
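        # A minimal consumer of this protocol might look like the sketch
        # below (illustrative only, and assuming that a stage yielded with
        # an empty iterable should be called once with no arguments):
        #
        #     for func, args in recipe:
        #         if args:
        #             for arg in args:
        #                 func(arg)
        #         else:
        #             func()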

    # https://stackoverflow.com/questions/59986413/achieving-multiple-inheritance-using-python-dataclasses
    def __post_init__(self):
        # just intercept the __post_init__ calls so they
@@ -111,3 +187,11 @@ def wrapped(*args, **kwargs):
        return new_func

    return wrapped


def _barrier(*args):
    # No-op whose only purpose is to force all of its inputs to complete
    # before anything downstream of it runs.
    pass


def _wait_and_call(func, b, *args):
    # The barrier token ``b`` is ignored; it exists solely to express a
    # dependency on the previous stage.
    return func(*args)