From 20124627ca22eac9a4317ceba7c3bc4b73505d9a Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Mon, 13 Mar 2023 13:09:18 +0100 Subject: [PATCH] Issue #95 UseAbstractJobSplitter API for cross-backend splitting --- .../partitionedjobs/__init__.py | 21 ++- .../crossbackendprocessing/poc.py | 139 +++++++++++------- .../partitionedjobs/splitting.py | 15 +- .../partitionedjobs/tracking.py | 1 + .../partitionedjobs/zookeeper.py | 3 +- tests/partitionedjobs/conftest.py | 10 +- tests/partitionedjobs/test_splitting.py | 9 +- 7 files changed, 129 insertions(+), 69 deletions(-) diff --git a/src/openeo_aggregator/partitionedjobs/__init__.py b/src/openeo_aggregator/partitionedjobs/__init__.py index 3d963a75..9e2af8f8 100644 --- a/src/openeo_aggregator/partitionedjobs/__init__.py +++ b/src/openeo_aggregator/partitionedjobs/__init__.py @@ -1,4 +1,4 @@ -from typing import Dict, List, NamedTuple +from typing import Any, Dict, List, NamedTuple, Sequence, Union from openeo_driver.errors import OpenEOApiException @@ -23,9 +23,22 @@ class PartitionedJob(NamedTuple): process: PGWithMetadata metadata: dict job_options: dict - # List of sub-jobs - subjobs: List[SubJob] - # TODO: how to express depdendencies between subjobs? + subjobs: Dict[str, SubJob] + # Optional dependencies between sub-jobs (maps subjob id to list of subjob ids it depdends on) + dependencies: Dict[str, Sequence[str]] = {} + + @staticmethod + def to_subjobs_dict( + subjobs: Union[Sequence[SubJob], Dict[Any, SubJob]] + ) -> Dict[str, SubJob]: + """Helper to convert a collection of SubJobs to a dictionary""" + # TODO: hide this logic in a setter or __init__ (e.g. when outgrowing the constraints of typing.NamedTuple) + if isinstance(subjobs, Sequence): + return {f"{i:04d}": j for i, j in enumerate(subjobs)} + elif isinstance(subjobs, dict): + return {str(k): v for k, v in subjobs.items()} + else: + raise ValueError(subjobs) # Statuses of partitioned jobs and subjobs diff --git a/src/openeo_aggregator/partitionedjobs/crossbackendprocessing/poc.py b/src/openeo_aggregator/partitionedjobs/crossbackendprocessing/poc.py index f7a31114..abce2f00 100644 --- a/src/openeo_aggregator/partitionedjobs/crossbackendprocessing/poc.py +++ b/src/openeo_aggregator/partitionedjobs/crossbackendprocessing/poc.py @@ -1,68 +1,88 @@ import collections import logging from pprint import pprint -from typing import Callable, Dict, Tuple +from typing import Callable, Dict, List -from openeo_aggregator.partitionedjobs import SubJob +from openeo_aggregator.partitionedjobs import PartitionedJob, SubJob +from openeo_aggregator.partitionedjobs.splitting import AbstractJobSplitter +from openeo_aggregator.utils import PGWithMetadata _log = logging.getLogger(__name__) -def cross_backend_split( - process_graph: dict, - backend_for_collection: Callable[[str], str], -) -> Tuple[SubJob, Dict[str, SubJob]]: +class CrossBackendSplitter(AbstractJobSplitter): """ - Split a process graph for cross-back-end processing - - :param process_graph: flat dict representation of a process graph - :param backend_for_collection: function that determines backend for a given collection id - :return: + Split a process graph, to be executed across multiple back-ends, + based on availability of collections """ - # Extract necessary back-ends from `load_collection` usage - backend_usage = collections.Counter( - backend_for_collection(node["arguments"]["id"]) - for node in process_graph.values() - if node["process_id"] == "load_collection" - ) - _log.info(f"Extracted backend usage from `load_collection` 
nodes: {backend_usage}") - - primary_backend = backend_usage.most_common(1)[0][0] - secondary_backends = {b for b in backend_usage if b != primary_backend} - _log.info(f"Backend split: {primary_backend=} {secondary_backends=}") - - primary_pg = SubJob(process_graph={}, backend_id=primary_backend) - secondary_pgs: Dict[str, SubJob] = {} - for node_id, node in process_graph.items(): - if node["process_id"] == "load_collection": - bid = backend_for_collection(node["arguments"]["id"]) - if bid == primary_backend: - primary_pg.process_graph[node_id] = node + def __init__(self, backend_for_collection: Callable[[str], str]): + # TODO: just handle this `backend_for_collection` callback with a regular method? + self.backend_for_collection = backend_for_collection + + def split( + self, process: PGWithMetadata, metadata: dict = None, job_options: dict = None + ) -> PartitionedJob: + process_graph = process["process_graph"] + + # Extract necessary back-ends from `load_collection` usage + backend_usage = collections.Counter( + self.backend_for_collection(node["arguments"]["id"]) + for node in process_graph.values() + if node["process_id"] == "load_collection" + ) + _log.info( + f"Extracted backend usage from `load_collection` nodes: {backend_usage}" + ) + + primary_backend = backend_usage.most_common(1)[0][0] + secondary_backends = {b for b in backend_usage if b != primary_backend} + _log.info(f"Backend split: {primary_backend=} {secondary_backends=}") + + primary_id = "primary" + primary_pg = SubJob(process_graph={}, backend_id=primary_backend) + + subjobs: Dict[str, SubJob] = {primary_id: primary_pg} + dependencies: Dict[str, List[str]] = {primary_id: []} + + for node_id, node in process_graph.items(): + if node["process_id"] == "load_collection": + bid = self.backend_for_collection(node["arguments"]["id"]) + if bid == primary_backend: + primary_pg.process_graph[node_id] = node + else: + # New secondary pg + pg = { + node_id: node, + "sr1": { + # TODO: other/better choices for save_result format (e.g. based on backend support)? + # TODO: particular format options? + "process_id": "save_result", + "arguments": {"format": "NetCDF"}, + }, + } + dependency_id = f"{bid}:{node_id}" + subjobs[dependency_id] = SubJob(process_graph=pg, backend_id=bid) + dependencies[primary_id].append(dependency_id) + # Link to primary pg with load_result + primary_pg.process_graph[node_id] = { + # TODO: encapsulate this placeholder process/id better? + "process_id": "load_result", + "arguments": {"id": f"placeholder:{dependency_id}"}, + } else: - # New secondary pg - pg = { - node_id: node, - "sr1": { - # TODO: other/better choices for save_result format (e.g. based on backend support)? - "process_id": "save_result", - "arguments": {"format": "NetCDF"}, - }, - } - dependency_id = node_id - secondary_pgs[dependency_id] = SubJob(process_graph=pg, backend_id=bid) - # Link to primary pg with load_result - primary_pg.process_graph[node_id] = { - # TODO: encapsulate this placeholder process/id better? 
- "process_id": "load_result", - "arguments": {"id": f"placeholder:{dependency_id}"}, - } - else: - primary_pg.process_graph[node_id] = node + primary_pg.process_graph[node_id] = node - return primary_pg, secondary_pgs + return PartitionedJob( + process=process, + metadata=metadata, + job_options=job_options, + subjobs=PartitionedJob.to_subjobs_dict(subjobs), + dependencies=dependencies, + ) def main(): + # Simple proof of concept for cross-backend splitting process_graph = { "lc1": {"process_id": "load_collection", "arguments": {"id": "VITO_1"}}, "lc2": {"process_id": "load_collection", "arguments": {"id": "SH_1"}}, @@ -78,11 +98,24 @@ def main(): print("Original PG:") pprint(process_graph) - split = cross_backend_split( - process_graph, backend_for_collection=lambda cid: cid.split("_")[0] + splitter = CrossBackendSplitter( + backend_for_collection=lambda cid: cid.split("_")[0] ) + + pjob = splitter.split({"process_graph": process_graph}) + + def namedtuples_to_dict(x): + """Walk data structure and convert namedtuples to dictionaries""" + if hasattr(x, "_asdict"): + return namedtuples_to_dict(x._asdict()) + elif isinstance(x, list): + return [namedtuples_to_dict(i) for i in x] + elif isinstance(x, dict): + return {k: namedtuples_to_dict(v) for k, v in x.items()} + else: + return x print("Cross-backend split:") - pprint(split) + pprint(namedtuples_to_dict(pjob), width=120) if __name__ == "__main__": diff --git a/src/openeo_aggregator/partitionedjobs/splitting.py b/src/openeo_aggregator/partitionedjobs/splitting.py index 8fc36fab..10a2f3b4 100644 --- a/src/openeo_aggregator/partitionedjobs/splitting.py +++ b/src/openeo_aggregator/partitionedjobs/splitting.py @@ -51,7 +51,6 @@ class AbstractJobSplitter(metaclass=abc.ABCMeta): def split( self, process: PGWithMetadata, metadata: dict = None, job_options: dict = None ) -> PartitionedJob: - # TODO: how to express dependencies? give SubJobs an id for referencing? # TODO: how to express combination/aggregation of multiple subjob results as a final result? ... @@ -74,7 +73,12 @@ def split( process_graph = self.processing.preprocess_process_graph(process_graph, backend_id=backend_id) subjob = SubJob(process_graph=process_graph, backend_id=backend_id) - return PartitionedJob(process=process, metadata=metadata, job_options=job_options, subjobs=[subjob]) + return PartitionedJob( + process=process, + metadata=metadata, + job_options=job_options, + subjobs=PartitionedJob.to_subjobs_dict([subjob]), + ) class TileGrid(typing.NamedTuple): @@ -185,7 +189,12 @@ def split( "tiles": [t.as_dict() for t in tiles] } - return PartitionedJob(process=process, metadata=metadata, job_options=job_options, subjobs=subjobs) + return PartitionedJob( + process=process, + metadata=metadata, + job_options=job_options, + subjobs=PartitionedJob.to_subjobs_dict(subjobs), + ) def _extract_global_spatial_extent(self, process: PGWithMetadata) -> BoundingBox: """Extract global spatial extent from given process graph""" diff --git a/src/openeo_aggregator/partitionedjobs/tracking.py b/src/openeo_aggregator/partitionedjobs/tracking.py index 02521e6f..259d1803 100644 --- a/src/openeo_aggregator/partitionedjobs/tracking.py +++ b/src/openeo_aggregator/partitionedjobs/tracking.py @@ -122,6 +122,7 @@ def start_sjobs(self, user_id: str, pjob_id: str, flask_request: flask.Request): start_stats = collections.Counter() with TimingLogger(title=f"Starting partitioned job {pjob_id!r} with {len(sjobs)} sub-jobs", logger=_log.info): # TODO: only start a subset of sub-jobs? 
#37 + # TODO: only start sub-jobs where all dependencies are available for sjob_id, sjob_metadata in sjobs.items(): sjob_status = self._db.get_sjob_status(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id)["status"] _log.info(f"To Start: {pjob_id!r}:{sjob_id!r} (status {sjob_status})") diff --git a/src/openeo_aggregator/partitionedjobs/zookeeper.py b/src/openeo_aggregator/partitionedjobs/zookeeper.py index 62afb4d4..0cbb0c7c 100644 --- a/src/openeo_aggregator/partitionedjobs/zookeeper.py +++ b/src/openeo_aggregator/partitionedjobs/zookeeper.py @@ -126,8 +126,7 @@ def insert(self, user_id: str, pjob: PartitionedJob) -> str: ) # Insert subjobs - for i, subjob in enumerate(pjob.subjobs): - sjob_id = f"{i:04d}" + for i, (sjob_id, subjob) in enumerate(pjob.subjobs.items()): self._client.create( path=self._path(user_id, pjob_id, "sjobs", sjob_id), value=self.serialize( diff --git a/tests/partitionedjobs/conftest.py b/tests/partitionedjobs/conftest.py index d758994a..c986ab25 100644 --- a/tests/partitionedjobs/conftest.py +++ b/tests/partitionedjobs/conftest.py @@ -42,10 +42,12 @@ def pjob(): process=P35, metadata={}, job_options={}, - subjobs=[ - SubJob(process_graph=PG12, backend_id="b1"), - SubJob(process_graph=PG23, backend_id="b2"), - ] + subjobs=PartitionedJob.to_subjobs_dict( + [ + SubJob(process_graph=PG12, backend_id="b1"), + SubJob(process_graph=PG23, backend_id="b2"), + ] + ), ) diff --git a/tests/partitionedjobs/test_splitting.py b/tests/partitionedjobs/test_splitting.py index a7d6e202..88e12d1e 100644 --- a/tests/partitionedjobs/test_splitting.py +++ b/tests/partitionedjobs/test_splitting.py @@ -29,7 +29,9 @@ def test_flimsy_splitter(multi_backend_connection, catalog, config): process = {"process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}} pjob = splitter.split(process) assert len(pjob.subjobs) == 1 - assert pjob.subjobs[0].process_graph == { + ((sjob_id, sjob),) = pjob.subjobs.items() + assert sjob_id == "0000" + assert sjob.process_graph == { "add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}, } @@ -97,7 +99,8 @@ def test_simple_small_coverage(self, aggregator_processing, tile_grid, west, sou pjob = splitter.split(process=process, job_options={"tile_grid": tile_grid}) assert len(pjob.subjobs) == 1 - new_process_graph = pjob.subjobs[0].process_graph + ((sjob_id, sjob),) = pjob.subjobs.items() + new_process_graph = sjob.process_graph assert len(new_process_graph) == 3 new_node_id, = set(new_process_graph.keys()).difference(process["process_graph"].keys()) assert new_process_graph["lc"] == process["process_graph"]["lc"] @@ -137,7 +140,7 @@ def test_basic(self, aggregator_processing, spatial_extent, tile_grid, expected_ filter_bbox_extents = [] - for subjob in pjob.subjobs: + for subjob_id, subjob in pjob.subjobs.items(): new_process_graph = subjob.process_graph assert len(new_process_graph) == 3 new_node_id, = set(new_process_graph.keys()).difference(process["process_graph"].keys())
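Usage sketch for the new CrossBackendSplitter API introduced above, mirroring the main() proof of concept in poc.py. This is an illustrative sketch, not part of the diff: the process graph, the "VITO_1"/"SH_1" collection ids and the backend_for_collection lambda are assumptions for demonstration only.

from openeo_aggregator.partitionedjobs.crossbackendprocessing.poc import CrossBackendSplitter

# Toy process graph mixing collections hosted on two different back-ends
process_graph = {
    "lc1": {"process_id": "load_collection", "arguments": {"id": "VITO_1"}},
    "lc2": {"process_id": "load_collection", "arguments": {"id": "SH_1"}},
    "mc1": {
        "process_id": "merge_cubes",
        "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
    },
    "sr1": {
        "process_id": "save_result",
        "arguments": {"data": {"from_node": "mc1"}, "format": "NetCDF"},
        "result": True,
    },
}

# Derive the back-end id from the collection id prefix, e.g. "VITO_1" -> "VITO"
splitter = CrossBackendSplitter(backend_for_collection=lambda cid: cid.split("_")[0])
pjob = splitter.split({"process_graph": process_graph})

# pjob.subjobs is a Dict[str, SubJob]:
# - "primary": runs on "VITO" (the most used back-end, first seen on a tie) and holds
#   "lc1", "mc1", "sr1" plus a "load_result" placeholder node in place of "lc2"
# - "SH:lc2": runs on "SH" and holds "lc2" plus an injected NetCDF "save_result"
# pjob.dependencies records that "primary" depends on ["SH:lc2"].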
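A similar quick sketch of the PartitionedJob.to_subjobs_dict helper added in __init__.py (the SubJob contents here are placeholders); this is the normalization the splitters and the updated tests now rely on:

from openeo_aggregator.partitionedjobs import PartitionedJob, SubJob

s1 = SubJob(process_graph={}, backend_id="b1")
s2 = SubJob(process_graph={}, backend_id="b2")

# From a sequence: keys are generated as zero-padded indices ("0000", "0001", ...)
assert PartitionedJob.to_subjobs_dict([s1, s2]) == {"0000": s1, "0001": s2}

# From a mapping: keys are preserved, but coerced to strings
assert PartitionedJob.to_subjobs_dict({"primary": s1, 123: s2}) == {"primary": s1, "123": s2}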