Skip to content

Commit

Permalink
Issue #95 split POC script from reusable logic
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Mar 22, 2023
1 parent 24c7c57 commit 7c56373
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 194 deletions.
76 changes: 76 additions & 0 deletions scripts/crossbackend-processing-poc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import functools
import logging

import openeo
from openeo.util import TimingLogger
from openeo_aggregator.metadata import STAC_PROPERTY_FEDERATION_BACKENDS
from openeo_aggregator.partitionedjobs import (
PartitionedJob,
)
from openeo_aggregator.partitionedjobs.crossbackend import (
CrossBackendSplitter,
run_partitioned_job,
)

_log = logging.getLogger("crossbackend-poc")


def main():
    """
    POC entry point: build a process graph that merges two `load_collection`
    results, split it per federation back-end with `CrossBackendSplitter`,
    and execute the resulting partitioned job.
    """
    logging.basicConfig(level=logging.INFO)

    temporal_extent = ["2022-09-01", "2022-09-10"]
    spatial_extent = {"west": 3, "south": 51, "east": 3.1, "north": 51.1}

    def load_collection(collection_id, bands):
        # Build a `load_collection` node reusing the shared extents above.
        return {
            "process_id": "load_collection",
            "arguments": {
                "id": collection_id,
                "temporal_extent": temporal_extent,
                "spatial_extent": spatial_extent,
                "bands": bands,
            },
        }

    process_graph = {
        "lc1": load_collection("TERRASCOPE_S2_TOC_V2", ["B02", "B03"]),
        "lc2": load_collection("TERRASCOPE_S2_TOC_V2", ["B04"]),
        "mc1": {
            "process_id": "merge_cubes",
            "arguments": {
                "cube1": {"from_node": "lc1"},
                "cube2": {"from_node": "lc2"},
            },
        },
        "sr1": {
            "process_id": "save_result",
            "arguments": {"data": {"from_node": "mc1"}, "format": "NetCDF"},
            "result": True,
        },
    }

    with TimingLogger(title="Connecting", logger=_log):
        connection = openeo.connect("openeocloud-dev.vito.be").authenticate_oidc()

    @functools.lru_cache(maxsize=100)
    def backend_for_collection(collection_id) -> str:
        # Resolve the hosting back-end: first entry of the collection's
        # federation back-ends summary (cached per collection id).
        metadata = connection.describe_collection(collection_id)
        return metadata["summaries"][STAC_PROPERTY_FEDERATION_BACKENDS][0]

    splitter = CrossBackendSplitter(
        backend_for_collection=backend_for_collection, always_split=True
    )
    pjob: PartitionedJob = splitter.split({"process_graph": process_graph})
    _log.info(f"Partitioned job: {pjob!r}")

    with TimingLogger(title="Running partitioned job", logger=_log):
        run_info = run_partitioned_job(pjob=pjob, connection=connection)
        print(f"Run info of subjobs: {run_info}")


if __name__ == "__main__":
    main()
Loading

0 comments on commit 7c56373

Please sign in to comment.