Skip to content

Commit

Permalink
Issue #95 UseAbstractJobSplitter API for cross-backend splitting
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Mar 17, 2023
1 parent 4d00221 commit be933f6
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 69 deletions.
21 changes: 17 additions & 4 deletions src/openeo_aggregator/partitionedjobs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import NamedTuple, List, Dict
from typing import NamedTuple, List, Dict, Sequence, Union, Any

from openeo_aggregator.utils import FlatPG, PGWithMetadata
from openeo_driver.errors import OpenEOApiException
Expand All @@ -22,9 +22,22 @@ class PartitionedJob(NamedTuple):
process: PGWithMetadata
metadata: dict
job_options: dict
# List of sub-jobs
subjobs: List[SubJob]
# TODO: how to express depdendencies between subjobs?
subjobs: Dict[str, SubJob]
# Optional dependencies between sub-jobs (maps subjob id to list of subjob ids it depdends on)
dependencies: Dict[str, Sequence[str]] = {}

@staticmethod
def to_subjobs_dict(
subjobs: Union[Sequence[SubJob], Dict[Any, SubJob]]
) -> Dict[str, SubJob]:
"""Helper to convert a collection of SubJobs to a dictionary"""
# TODO: hide this logic in a setter or __init__ (e.g. when outgrowing the constraints of typing.NamedTuple)
if isinstance(subjobs, Sequence):
return {f"{i:04d}": j for i, j in enumerate(subjobs)}
elif isinstance(subjobs, dict):
return {str(k): v for k, v in subjobs.items()}
else:
raise ValueError(subjobs)


# Statuses of partitioned jobs and subjobs
Expand Down
139 changes: 86 additions & 53 deletions src/openeo_aggregator/partitionedjobs/crossbackendprocessing/poc.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,88 @@
import collections
import logging
from pprint import pprint
from typing import Callable, Dict, Tuple
from typing import Callable, Dict, List

from openeo_aggregator.partitionedjobs import SubJob
from openeo_aggregator.partitionedjobs import SubJob, PartitionedJob
from openeo_aggregator.partitionedjobs.splitting import AbstractJobSplitter
from openeo_aggregator.utils import PGWithMetadata

_log = logging.getLogger(__name__)


def cross_backend_split(
process_graph: dict,
backend_for_collection: Callable[[str], str],
) -> Tuple[SubJob, Dict[str, SubJob]]:
class CrossBackendSplitter(AbstractJobSplitter):
"""
Split a process graph for cross-back-end processing
:param process_graph: flat dict representation of a process graph
:param backend_for_collection: function that determines backend for a given collection id
:return:
Split a process graph, to be executed across multiple back-ends,
based on availability of collections
"""
# Extract necessary back-ends from `load_collection` usage
backend_usage = collections.Counter(
backend_for_collection(node["arguments"]["id"])
for node in process_graph.values()
if node["process_id"] == "load_collection"
)
_log.info(f"Extracted backend usage from `load_collection` nodes: {backend_usage}")

primary_backend = backend_usage.most_common(1)[0][0]
secondary_backends = {b for b in backend_usage if b != primary_backend}
_log.info(f"Backend split: {primary_backend=} {secondary_backends=}")

primary_pg = SubJob(process_graph={}, backend_id=primary_backend)
secondary_pgs: Dict[str, SubJob] = {}
for node_id, node in process_graph.items():
if node["process_id"] == "load_collection":
bid = backend_for_collection(node["arguments"]["id"])
if bid == primary_backend:
primary_pg.process_graph[node_id] = node
def __init__(self, backend_for_collection: Callable[[str], str]):
# TODO: just handle this `backend_for_collection` callback with a regular method?
self.backend_for_collection = backend_for_collection

def split(
self, process: PGWithMetadata, metadata: dict = None, job_options: dict = None
) -> PartitionedJob:
process_graph = process["process_graph"]

# Extract necessary back-ends from `load_collection` usage
backend_usage = collections.Counter(
self.backend_for_collection(node["arguments"]["id"])
for node in process_graph.values()
if node["process_id"] == "load_collection"
)
_log.info(
f"Extracted backend usage from `load_collection` nodes: {backend_usage}"
)

primary_backend = backend_usage.most_common(1)[0][0]
secondary_backends = {b for b in backend_usage if b != primary_backend}
_log.info(f"Backend split: {primary_backend=} {secondary_backends=}")

primary_id = "primary"
primary_pg = SubJob(process_graph={}, backend_id=primary_backend)

subjobs: Dict[str, SubJob] = {primary_id: primary_pg}
dependencies: Dict[str, List[str]] = {primary_id: []}

for node_id, node in process_graph.items():
if node["process_id"] == "load_collection":
bid = self.backend_for_collection(node["arguments"]["id"])
if bid == primary_backend:
primary_pg.process_graph[node_id] = node
else:
# New secondary pg
pg = {
node_id: node,
"sr1": {
# TODO: other/better choices for save_result format (e.g. based on backend support)?
# TODO: particular format options?
"process_id": "save_result",
"arguments": {"format": "NetCDF"},
},
}
dependency_id = f"{bid}:{node_id}"
subjobs[dependency_id] = SubJob(process_graph=pg, backend_id=bid)
dependencies[primary_id].append(dependency_id)
# Link to primary pg with load_result
primary_pg.process_graph[node_id] = {
# TODO: encapsulate this placeholder process/id better?
"process_id": "load_result",
"arguments": {"id": f"placeholder:{dependency_id}"},
}
else:
# New secondary pg
pg = {
node_id: node,
"sr1": {
# TODO: other/better choices for save_result format (e.g. based on backend support)?
"process_id": "save_result",
"arguments": {"format": "NetCDF"},
},
}
dependency_id = node_id
secondary_pgs[dependency_id] = SubJob(process_graph=pg, backend_id=bid)
# Link to primary pg with load_result
primary_pg.process_graph[node_id] = {
# TODO: encapsulate this placeholder process/id better?
"process_id": "load_result",
"arguments": {"id": f"placeholder:{dependency_id}"},
}
else:
primary_pg.process_graph[node_id] = node
primary_pg.process_graph[node_id] = node

return primary_pg, secondary_pgs
return PartitionedJob(
process=process,
metadata=metadata,
job_options=job_options,
subjobs=PartitionedJob.to_subjobs_dict(subjobs),
dependencies=dependencies,
)


def main():
# Simple proof of concept for cross-backend splitting
process_graph = {
"lc1": {"process_id": "load_collection", "arguments": {"id": "VITO_1"}},
"lc2": {"process_id": "load_collection", "arguments": {"id": "SH_1"}},
Expand All @@ -78,11 +98,24 @@ def main():
print("Original PG:")
pprint(process_graph)

split = cross_backend_split(
process_graph, backend_for_collection=lambda cid: cid.split("_")[0]
splitter = CrossBackendSplitter(
backend_for_collection=lambda cid: cid.split("_")[0]
)

pjob = splitter.split({"process_graph": process_graph})

def namedtuples_to_dict(x):
"""Walk data structure and convert namedtuples to dictionaries"""
if hasattr(x, "_asdict"):
return namedtuples_to_dict(x._asdict())
elif isinstance(x, list):
return [namedtuples_to_dict(i) for i in x]
elif isinstance(x, dict):
return {k: namedtuples_to_dict(v) for k, v in x.items()}
else:
return x
print("Cross-backend split:")
pprint(split)
pprint(namedtuples_to_dict(pjob), width=120)


if __name__ == "__main__":
Expand Down
15 changes: 12 additions & 3 deletions src/openeo_aggregator/partitionedjobs/splitting.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class AbstractJobSplitter(metaclass=abc.ABCMeta):
def split(
self, process: PGWithMetadata, metadata: dict = None, job_options: dict = None
) -> PartitionedJob:
# TODO: how to express dependencies? give SubJobs an id for referencing?
# TODO: how to express combination/aggregation of multiple subjob results as a final result?
...

Expand All @@ -65,7 +64,12 @@ def split(
process_graph = self.processing.preprocess_process_graph(process_graph, backend_id=backend_id)

subjob = SubJob(process_graph=process_graph, backend_id=backend_id)
return PartitionedJob(process=process, metadata=metadata, job_options=job_options, subjobs=[subjob])
return PartitionedJob(
process=process,
metadata=metadata,
job_options=job_options,
subjobs=PartitionedJob.to_subjobs_dict([subjob]),
)


class TileGrid(typing.NamedTuple):
Expand Down Expand Up @@ -176,7 +180,12 @@ def split(
"tiles": [t.as_dict() for t in tiles]
}

return PartitionedJob(process=process, metadata=metadata, job_options=job_options, subjobs=subjobs)
return PartitionedJob(
process=process,
metadata=metadata,
job_options=job_options,
subjobs=PartitionedJob.to_subjobs_dict(subjobs),
)

def _extract_global_spatial_extent(self, process: PGWithMetadata) -> BoundingBox:
"""Extract global spatial extent from given process graph"""
Expand Down
1 change: 1 addition & 0 deletions src/openeo_aggregator/partitionedjobs/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def start_sjobs(self, user_id: str, pjob_id: str, flask_request: flask.Request):
start_stats = collections.Counter()
with TimingLogger(title=f"Starting partitioned job {pjob_id!r} with {len(sjobs)} sub-jobs", logger=_log.info):
# TODO: only start a subset of sub-jobs? #37
# TODO: only start sub-jobs where all dependencies are available
for sjob_id, sjob_metadata in sjobs.items():
sjob_status = self._db.get_sjob_status(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id)["status"]
_log.info(f"To Start: {pjob_id!r}:{sjob_id!r} (status {sjob_status})")
Expand Down
3 changes: 1 addition & 2 deletions src/openeo_aggregator/partitionedjobs/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ def insert(self, user_id: str, pjob: PartitionedJob) -> str:
)

# Insert subjobs
for i, subjob in enumerate(pjob.subjobs):
sjob_id = f"{i:04d}"
for i, (sjob_id, subjob) in enumerate(pjob.subjobs.items()):
self._client.create(
path=self._path(user_id, pjob_id, "sjobs", sjob_id),
value=self.serialize(
Expand Down
10 changes: 6 additions & 4 deletions tests/partitionedjobs/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ def pjob():
process=P35,
metadata={},
job_options={},
subjobs=[
SubJob(process_graph=PG12, backend_id="b1"),
SubJob(process_graph=PG23, backend_id="b2"),
]
subjobs=PartitionedJob.to_subjobs_dict(
[
SubJob(process_graph=PG12, backend_id="b1"),
SubJob(process_graph=PG23, backend_id="b2"),
]
),
)


Expand Down
9 changes: 6 additions & 3 deletions tests/partitionedjobs/test_splitting.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ def test_flimsy_splitter(multi_backend_connection, catalog, config):
process = {"process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}}
pjob = splitter.split(process)
assert len(pjob.subjobs) == 1
assert pjob.subjobs[0].process_graph == {
((sjob_id, sjob),) = pjob.subjobs.items()
assert sjob_id == "0000"
assert sjob.process_graph == {
"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True},
}

Expand Down Expand Up @@ -93,7 +95,8 @@ def test_simple_small_coverage(self, aggregator_processing, tile_grid, west, sou

pjob = splitter.split(process=process, job_options={"tile_grid": tile_grid})
assert len(pjob.subjobs) == 1
new_process_graph = pjob.subjobs[0].process_graph
((sjob_id, sjob),) = pjob.subjobs.items()
new_process_graph = sjob.process_graph
assert len(new_process_graph) == 3
new_node_id, = set(new_process_graph.keys()).difference(process["process_graph"].keys())
assert new_process_graph["lc"] == process["process_graph"]["lc"]
Expand Down Expand Up @@ -133,7 +136,7 @@ def test_basic(self, aggregator_processing, spatial_extent, tile_grid, expected_

filter_bbox_extents = []

for subjob in pjob.subjobs:
for subjob_id, subjob in pjob.subjobs.items():
new_process_graph = subjob.process_graph
assert len(new_process_graph) == 3
new_node_id, = set(new_process_graph.keys()).difference(process["process_graph"].keys())
Expand Down

0 comments on commit be933f6

Please sign in to comment.