-
Notifications
You must be signed in to change notification settings - Fork 27
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* RLS: v0.3.4 * add new types * python executor working * got prefect working * made dask work with zarr arrays; now have to do dask inputs * removed limitations on executor; still looking for source of dask failures * works! now need to remove comments * cleanup * fix pre-commit * add forgotten file * wip * dask, prefect, python executors refactored * isort * isort * remove debugging * fix type hints * rearrange modules * add dedicated pipeline tests * refactor pre-commit ci * try pre-commit without installing env * remove python 3.9 from CI * found silent prefect bug
- Loading branch information
Showing
22 changed files
with
381 additions
and
224 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,14 +19,20 @@ jobs: | |
run: | | ||
python -m pip install --upgrade pip | ||
pip install .[dev] | ||
- name: Run pre-commit | ||
uses: pre-commit/[email protected] | ||
- name: Test with pytest | ||
run: | | ||
py.test tests -v --cov=rechunker --cov-config .coveragerc --cov-report term-missing | ||
coverage xml | ||
- name: Codecov | ||
uses: codecov/codecov-action@v1 | ||
- name: Check type hints | ||
run: | | ||
mypy rechunker | ||
|
||
lint: | ||
runs-on: ubuntu-latest | ||
steps: | ||
- uses: actions/checkout@v2 | ||
- name: Set up Python 3.8 | ||
uses: actions/setup-python@v2 | ||
with: | ||
python-version: 3.8 | ||
- name: Run pre-commit | ||
uses: pre-commit/[email protected] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
from functools import reduce | ||
import operator | ||
from functools import reduce | ||
from typing import Sequence | ||
|
||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from .dask import DaskPipelineExecutor | ||
from .prefect import PrefectPipelineExecutor | ||
from .python import PythonPipelineExecutor | ||
|
||
__all__ = ["PythonPipelineExecutor", "DaskPipelineExecutor", "PrefectPipelineExecutor"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,116 +1,103 @@ | ||
import uuid | ||
from typing import Iterable, Tuple | ||
from functools import reduce | ||
from typing import Iterable | ||
|
||
import dask | ||
import dask.array | ||
from dask.delayed import Delayed | ||
from dask.optimization import fuse | ||
|
||
from rechunker.types import Array, CopySpec, Executor | ||
from rechunker.types import ( | ||
MultiStagePipeline, | ||
ParallelPipelines, | ||
PipelineExecutor, | ||
Stage, | ||
) | ||
|
||
|
||
class DaskExecutor(Executor[Delayed]): | ||
class DaskPipelineExecutor(PipelineExecutor[Delayed]): | ||
"""An execution engine based on dask. | ||
Supports zarr and dask arrays as inputs. Outputs must be zarr arrays. | ||
Execution plans for DaskExecutors are dask.delayed objects. | ||
""" | ||
|
||
def prepare_plan(self, specs: Iterable[CopySpec]) -> Delayed: | ||
return _copy_all(specs) | ||
def pipelines_to_plan(self, pipelines: ParallelPipelines) -> Delayed: | ||
return _make_pipelines(pipelines) | ||
|
||
def execute_plan(self, plan: Delayed, **kwargs): | ||
return plan.compute(**kwargs) | ||
|
||
|
||
def _direct_array_copy( | ||
source: Array, target: Array, chunks: Tuple[int, ...] | ||
) -> Delayed: | ||
"""Direct copy between arrays.""" | ||
if isinstance(source, dask.array.Array): | ||
source_read = source | ||
def _make_pipelines(pipelines: ParallelPipelines) -> Delayed: | ||
pipelines_delayed = [_make_pipeline(pipeline) for pipeline in pipelines] | ||
return _merge(*pipelines_delayed) | ||
|
||
|
||
def _make_pipeline(pipeline: MultiStagePipeline) -> Delayed: | ||
stages_delayed = [_make_stage(stage) for stage in pipeline] | ||
d = reduce(_add_upstream, stages_delayed) | ||
return d | ||
|
||
|
||
def _make_stage(stage: Stage) -> Delayed: | ||
if stage.map_args is None: | ||
return dask.delayed(stage.func)() | ||
else: | ||
source_read = dask.array.from_zarr(source, chunks=chunks) | ||
target_store_delayed = dask.array.store( | ||
source_read, target, lock=False, compute=False | ||
name = stage.func.__name__ + "-" + dask.base.tokenize(stage.func) | ||
dsk = {(name, i): (stage.func, arg) for i, arg in enumerate(stage.map_args)} | ||
# create a barrier | ||
top_key = "stage-" + dask.base.tokenize(stage.func, stage.map_args) | ||
|
||
def merge_all(*args): | ||
# this function is dependent on its arguments but doesn't actually do anything | ||
return None | ||
|
||
dsk.update({top_key: (merge_all, *list(dsk))}) | ||
return Delayed(top_key, dsk) | ||
|
||
|
||
def _merge_task(*args): | ||
pass | ||
|
||
|
||
def _merge(*args: Iterable[Delayed]) -> Delayed: | ||
name = "merge-" + dask.base.tokenize(*args) | ||
# mypy doesn't like arg.key | ||
keys = [getattr(arg, "key") for arg in args] | ||
new_task = (_merge_task, *keys) | ||
# mypy doesn't like arg.dask | ||
graph = dask.base.merge( | ||
*[dask.utils.ensure_dict(getattr(arg, "dask")) for arg in args] | ||
) | ||
return target_store_delayed | ||
graph[name] = new_task | ||
d = Delayed(name, graph) | ||
return d | ||
|
||
|
||
def _chunked_array_copy(spec: CopySpec) -> Delayed: | ||
"""Chunked copy between arrays.""" | ||
if spec.intermediate.array is None: | ||
target_store_delayed = _direct_array_copy( | ||
spec.read.array, spec.write.array, spec.read.chunks, | ||
) | ||
def _add_upstream(first: Delayed, second: Delayed): | ||
upstream_key = first.key | ||
dsk = second.dask | ||
top_layer = _get_top_layer(dsk) | ||
new_top_layer = {} | ||
|
||
# fuse | ||
target_dsk = dask.utils.ensure_dict(target_store_delayed.dask) | ||
dsk_fused, _ = fuse(target_dsk) | ||
for key, value in top_layer.items(): | ||
new_top_layer[key] = ((lambda a, b: a), value, upstream_key) | ||
|
||
return Delayed(target_store_delayed.key, dsk_fused) | ||
dsk_new = dask.base.merge( | ||
dask.utils.ensure_dict(first.dask), dask.utils.ensure_dict(dsk), new_top_layer | ||
) | ||
|
||
return Delayed(second.key, dsk_new) | ||
|
||
|
||
def _get_top_layer(dsk): | ||
if hasattr(dsk, "layers"): | ||
# this is a HighLevelGraph | ||
top_layer_key = list(dsk.layers)[0] | ||
top_layer = dsk.layers[top_layer_key] | ||
else: | ||
# do intermediate store | ||
int_store_delayed = _direct_array_copy( | ||
spec.read.array, spec.intermediate.array, spec.read.chunks, | ||
) | ||
target_store_delayed = _direct_array_copy( | ||
spec.intermediate.array, spec.write.array, spec.write.chunks, | ||
) | ||
|
||
# now do some hacking to chain these together into a single graph. | ||
# get the two graphs as dicts | ||
int_dsk = dask.utils.ensure_dict(int_store_delayed.dask) | ||
target_dsk = dask.utils.ensure_dict(target_store_delayed.dask) | ||
|
||
# find the root store key representing the read | ||
root_keys = [] | ||
for key in target_dsk: | ||
if isinstance(key, str): | ||
if key.startswith("from-zarr"): | ||
root_keys.append(key) | ||
assert len(root_keys) == 1 | ||
root_key = root_keys[0] | ||
|
||
# now rewrite the graph | ||
target_dsk[root_key] = ( | ||
lambda a, *b: a, | ||
target_dsk[root_key], | ||
*int_dsk[int_store_delayed.key], | ||
) | ||
target_dsk.update(int_dsk) | ||
|
||
# fuse | ||
dsk_fused, _ = fuse(target_dsk) | ||
return Delayed(target_store_delayed.key, dsk_fused) | ||
|
||
|
||
def _barrier(*args): | ||
return None | ||
|
||
|
||
def _copy_all(specs: Iterable[CopySpec],) -> Delayed: | ||
|
||
stores_delayed = [_chunked_array_copy(spec) for spec in specs] | ||
|
||
if len(stores_delayed) == 1: | ||
return stores_delayed[0] | ||
|
||
# This next block makes a task that | ||
# 1. depends on each of the component arrays | ||
# 2. but doesn't require transmitting large dependencies (depend on barrier_name, | ||
# rather than on part.key directly) to compute the result | ||
always_new_token = uuid.uuid1().hex | ||
barrier_name = "barrier-" + always_new_token | ||
dsk2 = { | ||
(barrier_name, i): (_barrier, part.key) for i, part in enumerate(stores_delayed) | ||
} | ||
|
||
name = "rechunked-" + dask.base.tokenize([x.name for x in stores_delayed]) | ||
dsk = dask.base.merge(*[x.dask for x in stores_delayed], dsk2) | ||
dsk[name] = (_barrier,) + tuple( | ||
(barrier_name, i) for i, _ in enumerate(stores_delayed) | ||
) | ||
return Delayed(name, dsk) | ||
# could this go wrong? | ||
first_key = next(iter(dsk)) | ||
first_task = first_key[0].split("-")[0] | ||
top_layer = {k: v for k, v in dsk.items() if k[0].startswith(first_task + "-")} | ||
return top_layer |
Oops, something went wrong.