diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py index e5a075f0..009bc1f9 100644 --- a/src/openeo_aggregator/backend.py +++ b/src/openeo_aggregator/backend.py @@ -1,18 +1,18 @@ import contextlib import datetime import functools +import itertools import logging import time from collections import defaultdict from typing import List, Dict, Union, Tuple, Optional, Iterable, Iterator, Callable, Any -import dateutil.parser import flask import openeo_driver.util.view_helpers from openeo.capabilities import ComparableVersion from openeo.rest import OpenEoApiError, OpenEoRestError, OpenEoClientException -from openeo.util import dict_no_none, TimingLogger, deep_get +from openeo.util import dict_no_none, TimingLogger, deep_get, rfc3339 from openeo_aggregator.config import AggregatorConfig, STREAM_CHUNK_SIZE_DEFAULT, CACHE_TTL_DEFAULT, \ CONNECTION_TIMEOUT_RESULT, CONNECTION_TIMEOUT_JOB_START from openeo_aggregator.connection import MultiBackendConnection, BackendConnection, streaming_flask_response @@ -113,7 +113,8 @@ def _get_all_metadata(self) -> Tuple[List[dict], _InternalCollectionMetadata]: return collections_metadata, internal_data - def _normalize_metadata(self, metadata: dict) -> dict: + @classmethod + def _normalize_metadata(cls, metadata: dict) -> dict: cid = metadata.get("id", None) if cid is None: raise OpenEOApiException("Missing collection id in metadata") @@ -137,7 +138,8 @@ def _normalize_metadata(self, metadata: dict) -> dict: _log.warning("Unable to provide root/parent/self links in collection metadata outside flask app context") return metadata - def _merge_collection_metadata(self, by_backend: Dict[str, dict]) -> dict: + @classmethod + def _merge_collection_metadata(cls, by_backend: Dict[str, dict], report=_log.warning) -> dict: """ Merge collection metadata dicts from multiple backends """ @@ -149,6 +151,7 @@ def _merge_collection_metadata(self, by_backend: Dict[str, dict]) -> dict: cid = ids.pop() _log.info(f"Merging collection metadata for {cid!r}") + # Start with some initial/required fields result = { "id": cid, "stac_version": max(list(getter.get("stac_version")) + ["0.9.0"]), @@ -156,16 +159,37 @@ def _merge_collection_metadata(self, by_backend: Dict[str, dict]) -> dict: "description": getter.first("description", default=cid), "type": getter.first("type", default="Collection"), "links": [l for l in list(getter.concat("links")) if l.get("rel") not in ("self", "parent", "root")], - "summaries": getter.select("summaries").simple_merge() } - # Note: CRS is required by OGC API: https://docs.opengeospatial.org/is/18-058/18-058.html#_crs_identifier_list - result.update(getter.simple_merge([ - "stac_extensions", "keywords", "deprecated", "providers", "assets", - "crs", - "sci:citation", "sci:doi", "sci:publications" - ])) - - ## All keys with special merge handling. + + # Generic field merging + # Notes: + # - `crs` is required by OGC API: https://docs.opengeospatial.org/is/18-058/18-058.html#_crs_identifier_list + # - `sci:doi` and related are defined at https://github.com/stac-extensions/scientific + for field in getter.available_keys(["stac_extensions", "keywords", "providers", "sci:publications"]): + result[field] = getter.concat(field, skip_duplicates=True) + for field in getter.available_keys(["deprecated"]): + result[field] = all(getter.get(field)) + for field in getter.available_keys(["crs", "sci:citation", "sci:doi"]): + result[field] = getter.first(field) + + # Summary merging + result["summaries"] = {} + summaries_getter = getter.select("summaries") + for summary_name in summaries_getter.keys(): + if summary_name in [ + "constellation", "platform", "instruments", + ]: + result["summaries"][summary_name] = summaries_getter.concat(summary_name, skip_duplicates=True) + elif summary_name.startswith("sar:") or summary_name.startswith("sat:"): + result["summaries"][summary_name] = summaries_getter.concat(summary_name, skip_duplicates=True) + else: + report(f"Unhandled merging of summary {summary_name!r}") + + # Assets + if getter.has_key("assets"): + result["assets"] = {k: getter.select("assets").first(k) for k in getter.select("assets").keys()} + + # All keys with special merge handling. versions = set(getter.get("version")) if versions: # TODO: smarter version maximum? Low priority, versions key is not used in most backends. @@ -184,42 +208,50 @@ def _merge_collection_metadata(self, by_backend: Dict[str, dict]) -> dict: }, } - cube_dimensions = getter.first("cube:dimensions") - if cube_dimensions: - result["cube:dimensions"] = cube_dimensions - cube_band_value = getter.select("cube:dimensions").first("bands", None) - # Bands - if cube_band_value is not None: - result["cube:dimensions"]["bands"] = cube_band_value - # TODO: concatenating band values is going to give weird results (because band indices change) - cube_dimension_bands = getter.select("cube:dimensions").select("bands").concat('values', skip_duplicates=True) - result["cube:dimensions"]["bands"]["values"] = cube_dimension_bands - # Temporal dimension - cube_time_value = getter.select("cube:dimensions").first("t", None) - if cube_time_value is not None: - result["cube:dimensions"]["t"] = cube_time_value - t_extents = list(getter.select("cube:dimensions").select("t").get("extent")) - t_extents_as_datetime: [datetime.datetime] = [ - dateutil.parser.isoparse(ex) for subex in t_extents for ex in subex if ex is not None - ] - t_extent = [ - None if any([ex[0] is None for ex in t_extents]) else min(t_extents_as_datetime), - None if any([ex[1] is None for ex in t_extents]) else max(t_extents_as_datetime) - ] - t_extent = [ex.strftime('%Y-%m-%dT%H:%M:%SZ') or None for ex in t_extent] - result["cube:dimensions"]["t"]["extent"] = t_extent + if getter.has_key("cube:dimensions"): + cube_dim_getter = getter.select("cube:dimensions") + result["cube:dimensions"] = {} + # Spatial dimensions - for dim in ["x", "y"]: - cube_dim_value = getter.select("cube:dimensions").first(dim, None) - if cube_dim_value is not None: - result["cube:dimensions"][dim] = cube_dim_value - extents = getter.select("cube:dimensions").select(dim).get("extent") - extents_flat = [ex for subex in extents for ex in subex if ex is not None] - spatial_extent = [min(extents_flat), max(extents_flat)] if extents_flat else [None, None] - result["cube:dimensions"][dim]["extent"] = spatial_extent + for dim in cube_dim_getter.available_keys(["x", "y"]): + result["cube:dimensions"][dim] = cube_dim_getter.first(dim) + # TODO: check consistency of step and reference_system? + try: + bounds = cube_dim_getter.select(dim).concat("extent") + result["cube:dimensions"][dim]["extent"] = [min(bounds), max(bounds)] + except Exception as e: + report(f"Failed to merge cube:dimensions.{dim}.extent: {e!r}") + # Temporal dimension + for dim in cube_dim_getter.available_keys(["t"]): + result["cube:dimensions"][dim] = cube_dim_getter.first(dim) + # TODO: check consistency of step? + try: + t_starts = [e[0] for e in cube_dim_getter.select(dim).get("extent") if e[0]] + t_ends = [e[1] for e in cube_dim_getter.select(dim).get("extent") if e[1]] + result["cube:dimensions"][dim]["extent"] = [ + min(rfc3339.normalize(t) for t in t_starts) if t_starts else None, + max(rfc3339.normalize(t) for t in t_ends) if t_ends else None + ] + except Exception as e: + report(f"Failed to merge cube:dimensions.{dim}.extent: {e!r}") + + for dim in cube_dim_getter.available_keys(["bands"]): + result["cube:dimensions"][dim] = cube_dim_getter.first(dim) + try: + # Find common prefix of bands + # TODO: better approach? e.g. keep everything and rewrite process graphs on the fly? + bands_iterator = cube_dim_getter.select(dim).get("values") + prefix = next(bands_iterator) + for bands in bands_iterator: + prefix = [t[0] for t in itertools.takewhile(lambda t: t[0] == t[1], zip(prefix, bands))] + if bands != prefix: + report(f"Trimming bands {bands} to common prefix {prefix}") + result["cube:dimensions"][dim]["values"] = prefix + except Exception as e: + report(f"Failed to merge cube:dimensions.{dim}.extent: {e!r}") # TODO: use a more robust/user friendly backend pointer than backend id (which is internal implementation detail) - result["summaries"][self.STAC_PROPERTY_PROVIDER_BACKEND] = list(by_backend.keys()) + result["summaries"][cls.STAC_PROPERTY_PROVIDER_BACKEND] = list(by_backend.keys()) ## Log warnings for improper metadata. # license => Log warning for collections without license links. @@ -227,20 +259,6 @@ def _merge_collection_metadata(self, by_backend: Dict[str, dict]) -> dict: if result["license"] in ["various", "proprietary"] and not license_links: _log.warning(f"Missing license links for collection: {cid}") - # cube:dimensions => Assert step size and reference_system equal. - for dim in ['t', 'x', 'y']: - step_sizes = getter.select("cube:dimensions").select(dim).union("step_size") - if len(step_sizes) > 1: - _log.warning( - f"Multiple cube:dimensions.{dim}.step_size values across backends for in collection: {cid}: {step_sizes}" - ) - if dim in ["x", "y"]: - reference_systems = getter.select("cube:dimensions").select(dim).union("reference_system") - if len(reference_systems) > 1: - _log.warning( - f"Multiple cube:dimension.{dim}.reference_system values across backends for collection {cid}: {reference_systems}" - ) - return result @staticmethod diff --git a/src/openeo_aggregator/utils.py b/src/openeo_aggregator/utils.py index 1158fef8..23534c9a 100644 --- a/src/openeo_aggregator/utils.py +++ b/src/openeo_aggregator/utils.py @@ -1,4 +1,5 @@ import datetime +import functools import logging import types @@ -99,6 +100,15 @@ def get(self, key: str) -> Iterator: if key in d: yield d[key] + def keys(self) -> set[str]: + return functools.reduce(lambda a, b: a.union(b), (d.keys() for d in self.dictionaries), set()) + + def has_key(self, key: str) -> bool: + return any(key in d for d in self.dictionaries) + + def available_keys(self, keys: List[str]) -> List[str]: + return [k for k in keys if self.has_key(k)] + def concat(self, key: str, skip_duplicates=False) -> list: """ Concatenate all lists/tuples at given `key (optionally skipping duplicate items in the process) @@ -114,41 +124,6 @@ def concat(self, key: str, skip_duplicates=False) -> list: _log.warning(f"Skipping unexpected type in MultiDictGetter.concat: {items}") return result - def union(self, key:str) -> set: - """Like `concat` but with set-wise union (removing duplicates).""" - return set(self.concat(key=key, skip_duplicates=True)) - - def simple_merge(self, included_keys=None) -> dict: - """ - All dictionaries are merged following simple rules: - For list or sets: all elements are merged into a single list, without duplicates. - For dictionaries: all keys are added to a single dict, duplicate keys are merged recursively. - For all other types: the first value is returned. - - It assumes that all duplicate keys in a dictionary have items of the same type. - - Args: - included_keys: If given, only these top level keys are merged into the final dict. - """ - if len(self.dictionaries) == 0: - return {} - if len(self.dictionaries) == 1: - return self.dictionaries[0] - - result = {} - for dictionary in self.dictionaries: - for key, item in dictionary.items(): - if included_keys is not None and key not in included_keys: - continue - if key in result: - if isinstance(item, list) or isinstance(item, set): - result[key] = self.concat(key, skip_duplicates=True) - elif isinstance(item, dict): - result[key] = self.select(key).simple_merge() - else: - result[key] = item - return result - def first(self, key, default=None): return next(self.get(key), default) diff --git a/tests/test_backend.py b/tests/test_backend.py index e22fe351..71528988 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -188,7 +188,7 @@ def test_get_all_metadata_common_collections_merging( }, "cube:dimensions": { "bands": {"type": "bands", "values": ["B01", "B02"]}, - "x": {"type": "spatial", "axis": "x"} + "x": {"type": "spatial", "axis": "x", "extent": [3, 4]}, }, "links": [ {"rel": "license", "href": "https://spdx.org/licenses/MIT.html"}, @@ -230,8 +230,8 @@ def test_get_all_metadata_common_collections_merging( ["2012-02-02T00:00:00Z", "2019-01-01T00:00:00Z"]]}}, "cube:dimensions": { "bands": {"type": "bands", "values": ["B01", "B02"]}, - "x": {"type": "spatial", "extent": [None, None], "axis": "x"}, - "y": {"type": "spatial", "extent": [None, None], "axis": "y"} + "x": {"type": "spatial", "extent": [3, 4], "axis": "x"}, + "y": {"type": "spatial", "axis": "y"} }, "license": "various", "providers": [{"name": "ESA", "roles": ["producer"]}, {"name": "ESA", "roles": ["licensor"]}], @@ -367,34 +367,43 @@ def test_get_collection_metadata_merging(self, multi_backend_connection, backend } def test_get_collection_metadata_merging_summaries( - self, multi_backend_connection, backend1, backend2, requests_mock, flask_app + self, multi_backend_connection, backend1, backend2, requests_mock, flask_app ): requests_mock.get(backend1 + "/collections", json={"collections": [{"id": "S2"}]}) requests_mock.get(backend1 + "/collections/S2", json={ - "id": "S2", "summaries": { - "constellation": ["sentinel-1"], "instruments": ["c-sar"], "platform": ["sentinel-1a", "sentinel-1b"], - "raster:bands": [{ - "description": "Single co-polarization, vertical transmit/vertical receive", "name": "VV", - "openeo:gsd": { - "unit": "°", "value": [[0.00009259259, 0.00009259259], [0.00023148147, 0.00023148147], - [0.00037037037, 0.00037037037]] - } - }], - "sar:center_frequency": [5.405], "sar:frequency_band": ["C"], + "id": "S2", + "summaries": { + "constellation": ["sentinel-1"], + "instruments": ["c-sar"], + "platform": ["sentinel-1a", "sentinel-1b"], + "sar:center_frequency": [5.405], + "sar:frequency_band": ["C"], "sar:instrument_mode": ["SM", "IW", "EW", "WV"], - "sar:polarizations": ["SH", "SV", "DH", "DV", "HH", "HV", "VV", "VH"], "sar:product_type": ["GRD"], - "sar:resolution": [10, 25, 40], "sat:orbit_state": ["ascending", "descending"] + "sar:polarizations": ["SH", "SV", "DH", "DV", "HH", "HV", "VV", "VH"], + "sar:product_type": ["GRD"], + "sar:resolution": [10, 25, 40], + "sat:orbit_state": ["ascending", "descending"], }, }) requests_mock.get(backend2 + "/collections", json={"collections": [{"id": "S2"}]}) requests_mock.get(backend2 + "/collections/S2", json={ - "id": "S2", "summaries": { - "constellation": ["sentinel-1"], "instruments": ["c-sar"], "platform": ["sentinel-1"], - "sar:center_frequency": [5.405], "sar:frequency_band": ["C"], "sar:instrument_mode": ["IW"], - "sar:looks_azimuth": [1], "sar:looks_equivalent_number": [4.4], "sar:looks_range": [5], - "sar:pixel_spacing_azimuth": [10], "sar:pixel_spacing_range": [10], - "sar:polarizations": ["HH", "VV", "VV+VH", "HH+HV"], "sar:product_type": ["GRD"], - "sar:resolution_azimuth": [22], "sar:resolution_range": [20] + "id": "S2", + "summaries": { + "constellation": ["sentinel-1"], + "instruments": ["c-sar"], + "platform": ["sentinel-1"], + "sar:center_frequency": [5.405], + "sar:frequency_band": ["C"], + "sar:instrument_mode": ["IW"], + "sar:looks_azimuth": [1], + "sar:looks_equivalent_number": [4.4], + "sar:looks_range": [5], + "sar:pixel_spacing_azimuth": [10], + "sar:pixel_spacing_range": [10], + "sar:polarizations": ["HH", "VV", "VV+VH", "HH+HV"], + "sar:product_type": ["GRD"], + "sar:resolution_azimuth": [22], + "sar:resolution_range": [20], } }) catalog = AggregatorCollectionCatalog(backends=multi_backend_connection) @@ -403,28 +412,35 @@ def test_get_collection_metadata_merging_summaries( 'id': 'S2', 'stac_version': '0.9.0', 'title': 'S2', 'description': 'S2', 'type': 'Collection', 'license': 'proprietary', 'extent': {'spatial': {'bbox': [[-180, -90, 180, 90]]}, 'temporal': {'interval': [[None, None]]}}, - 'links': [{'href': 'http://oeoa.test/openeo/1.1.0/collections', 'rel': 'root'}, - {'href': 'http://oeoa.test/openeo/1.1.0/collections', 'rel': 'parent'}, - {'href': 'http://oeoa.test/openeo/1.1.0/collections/S2', 'rel': 'self'}], + 'links': [ + {'href': 'http://oeoa.test/openeo/1.1.0/collections', 'rel': 'root'}, + {'href': 'http://oeoa.test/openeo/1.1.0/collections', 'rel': 'parent'}, + {'href': 'http://oeoa.test/openeo/1.1.0/collections/S2', 'rel': 'self'} + ], 'summaries': { - 'provider:backend': ['b1', 'b2'], "constellation": ["sentinel-1"], "instruments": ["c-sar"], - "platform": ["sentinel-1a", "sentinel-1b", "sentinel-1"], "raster:bands": [{ - "description": "Single co-polarization, vertical transmit/vertical receive", "name": "VV", - "openeo:gsd": { - "unit": "°", "value": [[0.00009259259, 0.00009259259], [0.00023148147, 0.00023148147], - [0.00037037037, 0.00037037037]] - } - }], "sar:center_frequency": [5.405], "sar:frequency_band": ["C"], + 'provider:backend': ['b1', 'b2'], + "constellation": ["sentinel-1"], + "instruments": ["c-sar"], + "platform": ["sentinel-1a", "sentinel-1b", "sentinel-1"], + "sar:center_frequency": [5.405], + "sar:frequency_band": ["C"], "sar:instrument_mode": ["SM", "IW", "EW", "WV"], - "sar:polarizations": ["SH", "SV", "DH", "DV", "HH", "HV", "VV", "VH", "VV+VH", "HH+HV"], "sar:product_type": ["GRD"], - "sar:resolution": [10, 25, 40], "sat:orbit_state": ["ascending", "descending"], - 'sar:looks_azimuth': [1], 'sar:looks_equivalent_number': [4.4], 'sar:looks_range': [5], - 'sar:pixel_spacing_azimuth': [10], 'sar:pixel_spacing_range': [10], 'sar:resolution_azimuth': [22], + "sar:polarizations": ["SH", "SV", "DH", "DV", "HH", "HV", "VV", "VH", "VV+VH", "HH+HV"], + "sar:product_type": ["GRD"], + "sar:resolution": [10, 25, 40], + "sat:orbit_state": ["ascending", "descending"], + 'sar:looks_azimuth': [1], + 'sar:looks_equivalent_number': [4.4], + 'sar:looks_range': [5], + 'sar:pixel_spacing_azimuth': [10], + 'sar:pixel_spacing_range': [10], + 'sar:resolution_azimuth': [22], "sar:resolution_range": [20] } } - def test_get_collection_metadata_merging_extent(self, multi_backend_connection, backend1, backend2, requests_mock, flask_app): + def test_get_collection_metadata_merging_extent(self, multi_backend_connection, backend1, backend2, requests_mock, + flask_app): requests_mock.get(backend1 + "/collections", json={"collections": [{"id": "S2"}]}) requests_mock.get(backend1 + "/collections/S2", json={ "id": "S2", "extent": { @@ -518,7 +534,7 @@ def test_get_collection_metadata_merging_cubedimensions( requests_mock.get(backend2 + "/collections/S2", json={ "id": "S2", "cube:dimensions": { "bands": { - "type": "bands", "values": ["HH", "VV", "HH+HV", "VV+VH"] + "type": "bands", "values": ["VV", "VH", "HH", "HH+HV", "VV+VH", "HV"] }, "t": { "extent": ["2013-04-03T00:00:00Z", "2019-04-03T00:00:00Z"], "step": 1, "type": "temporal" }, "x": { @@ -538,7 +554,7 @@ def test_get_collection_metadata_merging_cubedimensions( 'description': 'S2', 'type': 'Collection', 'license': 'proprietary', "cube:dimensions": { "bands": { - "type": "bands", "values": ["VV", "VH", "HV", "HH", "HH+HV", "VV+VH"], + "type": "bands", "values": ["VV", "VH"], }, "t": { "extent": ["2013-04-03T00:00:00Z", "2020-04-03T00:00:00Z"], "step": 1, "type": "temporal" diff --git a/tests/test_utils.py b/tests/test_utils.py index ea5e7ac8..cca783da 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -111,6 +111,22 @@ def test_basic(self): assert list(getter.get("c")) == [333] assert list(getter.get("d")) == [] + def test_keys(self): + assert MultiDictGetter([]).keys() == set() + assert MultiDictGetter([{"a": 1, "b": 2}, {"b": 222, "c": 333}]).keys() == {"a", "b", "c"} + assert MultiDictGetter([{"a": 1}, {"bb": 2}, {"ccc": 222}]).keys() == {"a", "bb", "ccc"} + + def test_has_key(self): + getter = MultiDictGetter([{"a": 1, "b": 2}, {"b": 222, "c": 333}]) + assert getter.has_key("a") + assert getter.has_key("b") + assert getter.has_key("c") + assert not getter.has_key("d") + + def test_available_keys(self): + getter = MultiDictGetter([{"a": 1, "b": 2}, {"b": 222, "c": 333}]) + assert getter.available_keys(["a", "c", "d"]) == ["a", "c"] + def test_concat(self): getter = MultiDictGetter([ {"a": [1, 11], "b": [2, 22], "c": [33]}, @@ -145,16 +161,6 @@ def test_concat_type_handling(self, data, expected, expect_warning, caplog): else: assert not caplog.text - def test_union(self): - getter = MultiDictGetter([ - {"a": [1, 11], "b": [2, 22], "c": [33]}, - {"b": [222, 2222], "c": (33, 3333)} - ]) - assert getter.union("a") == {1, 11} - assert getter.union("b") == {2, 22, 222, 2222} - assert getter.union("c") == {33, 3333} - assert getter.union("d") == set() - def test_first(self): getter = MultiDictGetter([{"a": 1, "b": 2}, {"b": 222, "c": 333}]) assert getter.first("a") == 1