Merge branch 'issue95-cross-backend-processing'
soxofaan committed Mar 23, 2023
2 parents 90cd41e + 019879e commit 3a1ee85
Showing 15 changed files with 821 additions and 32 deletions.
1 change: 0 additions & 1 deletion .github/workflows/pre-commit.yml
@@ -4,7 +4,6 @@ name: Pre-commit

on:
  push:
-    branches: [ "main", "master" ]
  pull_request:

jobs:
14 changes: 13 additions & 1 deletion CHANGELOG.md
@@ -4,7 +4,19 @@ All notable changes to this project will be documented in this file.

The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

-## [Current: 0.7.x]
+## [Current: 0.8.x]
+
+### Added
+
+- Initial, experimental (client-side) proof of concept for cross-backend processing
+  ([#95](https://github.com/Open-EO/openeo-aggregator/issues/95))
+
+### Changed
+
+### Fixed
+
+
+## [0.7.x]

### Added

78 changes: 78 additions & 0 deletions scripts/crossbackend-processing-poc.py
@@ -0,0 +1,78 @@
import functools
import logging

import openeo
from openeo.util import TimingLogger

from openeo_aggregator.metadata import STAC_PROPERTY_FEDERATION_BACKENDS
from openeo_aggregator.partitionedjobs import PartitionedJob
from openeo_aggregator.partitionedjobs.crossbackend import (
CrossBackendSplitter,
run_partitioned_job,
)

_log = logging.getLogger("crossbackend-poc")


def main():
logging.basicConfig(level=logging.INFO)

temporal_extent = ["2022-09-01", "2022-09-10"]
spatial_extent = {"west": 3, "south": 51, "east": 3.1, "north": 51.1}
process_graph = {
"lc1": {
"process_id": "load_collection",
"arguments": {
"id": "TERRASCOPE_S2_TOC_V2",
"temporal_extent": temporal_extent,
"spatial_extent": spatial_extent,
"bands": ["B02", "B03"],
},
},
"lc2": {
"process_id": "load_collection",
"arguments": {
"id": "TERRASCOPE_S2_TOC_V2",
"temporal_extent": temporal_extent,
"spatial_extent": spatial_extent,
"bands": ["B04"],
},
},
"mc1": {
"process_id": "merge_cubes",
"arguments": {
"cube1": {"from_node": "lc1"},
"cube2": {"from_node": "lc2"},
},
},
"sr1": {
"process_id": "save_result",
"arguments": {"data": {"from_node": "mc1"}, "format": "NetCDF"},
"result": True,
},
}

backend_url = "openeocloud-dev.vito.be"
# backend_url = "openeo.cloud"

with TimingLogger(title=f"Connecting to {backend_url}", logger=_log):
connection = openeo.connect(url=backend_url).authenticate_oidc()

@functools.lru_cache(maxsize=100)
def backend_for_collection(collection_id) -> str:
metadata = connection.describe_collection(collection_id)
return metadata["summaries"][STAC_PROPERTY_FEDERATION_BACKENDS][0]

splitter = CrossBackendSplitter(
backend_for_collection=backend_for_collection, always_split=True
)
pjob: PartitionedJob = splitter.split({"process_graph": process_graph})
_log.info(f"Partitioned job: {pjob!r}")

with TimingLogger(title="Running partitioned job", logger=_log):
run_info = run_partitioned_job(pjob=pjob, connection=connection)
print(f"Run info of subjobs: {run_info}")


if __name__ == "__main__":
main()
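
For intuition, here is a hedged sketch of what the splitter presumably produces for the graph above: one sub-job per cross-backend dependency, with the cross-backend edge in the main graph replaced by a node that loads the dependency sub-job's result. The sub-job ids, backend ids and the "load_result" placeholder glue are assumptions for illustration, not taken from this diff.

# Hypothetical illustration only: node names, backend ids and the "load_result"
# placeholder are assumptions; the actual split logic lives in CrossBackendSplitter.
from openeo_aggregator.partitionedjobs import PartitionedJob, SubJob

dep = SubJob(
    process_graph={
        "lc2": {"process_id": "load_collection", "arguments": {"id": "TERRASCOPE_S2_TOC_V2", "bands": ["B04"]}},
        "sr1": {"process_id": "save_result", "arguments": {"data": {"from_node": "lc2"}, "format": "GTiff"}, "result": True},
    },
    backend_id="vito",
)
main = SubJob(
    process_graph={
        "lc1": {"process_id": "load_collection", "arguments": {"id": "TERRASCOPE_S2_TOC_V2", "bands": ["B02", "B03"]}},
        # Cross-backend edge: assumed to be replaced by loading the result of sub-job "lc2"
        "lc2": {"process_id": "load_result", "arguments": {"id": "_placeholder:lc2"}},
        "mc1": {"process_id": "merge_cubes", "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}}},
        "sr1": {"process_id": "save_result", "arguments": {"data": {"from_node": "mc1"}, "format": "NetCDF"}, "result": True},
    },
    backend_id="vito",
)
pjob = PartitionedJob(
    process={"process_graph": {}},  # original process graph, omitted here
    metadata={},
    job_options={},
    subjobs=PartitionedJob.to_subjobs_dict({"main": main, "lc2": dep}),
    dependencies={"main": ["lc2"]},
)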
2 changes: 1 addition & 1 deletion setup.py
@@ -27,7 +27,7 @@
    install_requires=[
        "requests",
        "attrs",
-        "openeo>=0.9.3.a2.dev",
+        "openeo>=0.16.0.a3.dev",
        "openeo_driver>=0.38.1.dev",
        "flask~=2.0",
        "gunicorn~=20.0",
2 changes: 1 addition & 1 deletion src/openeo_aggregator/about.py
@@ -1 +1 @@
__version__ = "0.7.4a1"
__version__ = "0.8.0a1"
17 changes: 14 additions & 3 deletions src/openeo_aggregator/backend.py
@@ -78,7 +78,13 @@
    PartitionedJobConnection,
    PartitionedJobTracker,
)
-from openeo_aggregator.utils import dict_merge, normalize_issuer_url, subdict
+from openeo_aggregator.utils import (
+    FlatPG,
+    PGWithMetadata,
+    dict_merge,
+    normalize_issuer_url,
+    subdict,
+)

_log = logging.getLogger(__name__)

@@ -450,7 +456,7 @@ def evaluate(self, process_graph: dict, env: EvalEnv = None):

        return streaming_flask_response(backend_response, chunk_size=self._stream_chunk_size)

-    def preprocess_process_graph(self, process_graph: dict, backend_id: str) -> dict:
+    def preprocess_process_graph(self, process_graph: FlatPG, backend_id: str) -> dict:
        def preprocess(node: Any) -> Any:
            if isinstance(node, dict):
                if "process_id" in node and "arguments" in node:
@@ -621,7 +627,12 @@ def _create_job_standard(
)

    def _create_partitioned_job(
-        self, user_id: str, process: dict, api_version: str, metadata: dict, job_options: dict = None
+        self,
+        user_id: str,
+        process: PGWithMetadata,
+        api_version: str,
+        metadata: dict,
+        job_options: dict = None,
    ) -> BatchJobMetadata:
"""
Advanced/handled batch job creation:
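Note: FlatPG and PGWithMetadata are type aliases imported from openeo_aggregator.utils, which is not part of this diff; presumably they are simple annotation aliases along these lines:

# Assumed definitions (openeo_aggregator.utils is not shown in this diff):
# "flat process graph": mapping of node id to node dict, e.g.
#     {"lc1": {"process_id": "load_collection", "arguments": {...}}}
FlatPG = dict
# process graph bundled with metadata: {"process_graph": FlatPG, "title": ..., ...}
PGWithMetadata = dict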
30 changes: 23 additions & 7 deletions src/openeo_aggregator/partitionedjobs/__init__.py
@@ -1,7 +1,9 @@
-from typing import List, NamedTuple
+from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Union

from openeo_driver.errors import OpenEOApiException

+from openeo_aggregator.utils import FlatPG, PGWithMetadata


class PartitionedJobFailure(OpenEOApiException):
    code = "PartitionedJobFailure"
@@ -10,19 +12,33 @@ class PartitionedJobFailure(OpenEOApiException):
class SubJob(NamedTuple):
    """A part of a partitioned job, targeted at a particular, single back-end."""
    # Process graph of the subjob (derived in some way from original parent process graph)
-    process_graph: dict
-    # Id of target backend
-    backend_id: str
+    process_graph: FlatPG
+    # Id of target backend (or None if there is no dedicated backend)
+    backend_id: Optional[str]


class PartitionedJob(NamedTuple):
    """A large or multi-back-end job that is split in several sub jobs"""
    # Original process graph
-    process: dict
+    process: PGWithMetadata
    metadata: dict
    job_options: dict
    # List of sub-jobs
-    subjobs: List[SubJob]
+    subjobs: Dict[str, SubJob]
+    # Optional dependencies between sub-jobs (maps subjob id to list of subjob ids it depends on)
+    dependencies: Dict[str, Sequence[str]] = {}

+    @staticmethod
+    def to_subjobs_dict(
+        subjobs: Union[Sequence[SubJob], Dict[Any, SubJob]]
+    ) -> Dict[str, SubJob]:
+        """Helper to convert a collection of SubJobs to a dictionary"""
+        # TODO: hide this logic in a setter or __init__ (e.g. when outgrowing the constraints of typing.NamedTuple)
+        if isinstance(subjobs, Sequence):
+            return {f"{i:04d}": j for i, j in enumerate(subjobs)}
+        elif isinstance(subjobs, dict):
+            return {str(k): v for k, v in subjobs.items()}
+        else:
+            raise ValueError(subjobs)


# Statuses of partitioned jobs and subjobs
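
A quick, hypothetical illustration of PartitionedJob.to_subjobs_dict as defined above (the SubJob instances here are made up):

from openeo_aggregator.partitionedjobs import PartitionedJob, SubJob

a = SubJob(process_graph={"n1": {"process_id": "load_collection", "arguments": {"id": "S2"}, "result": True}}, backend_id="b1")
b = SubJob(process_graph={"n2": {"process_id": "load_collection", "arguments": {"id": "S1"}, "result": True}}, backend_id=None)

# A sequence gets zero-padded positional keys:
assert PartitionedJob.to_subjobs_dict([a, b]) == {"0000": a, "0001": b}
# A dict keeps its keys, coerced to str:
assert PartitionedJob.to_subjobs_dict({1: a, 2: b}) == {"1": a, "2": b}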
