Further finetuning of PR #66 (issue #5)
soxofaan committed Sep 16, 2022
1 parent 4ab369c commit e1f0d0c
Showing 4 changed files with 161 additions and 146 deletions.
138 changes: 78 additions & 60 deletions src/openeo_aggregator/backend.py
@@ -1,18 +1,18 @@
import contextlib
import datetime
import functools
import itertools
import logging
import time
from collections import defaultdict
from typing import List, Dict, Union, Tuple, Optional, Iterable, Iterator, Callable, Any

import dateutil.parser
import flask

import openeo_driver.util.view_helpers
from openeo.capabilities import ComparableVersion
from openeo.rest import OpenEoApiError, OpenEoRestError, OpenEoClientException
from openeo.util import dict_no_none, TimingLogger, deep_get
from openeo.util import dict_no_none, TimingLogger, deep_get, rfc3339
from openeo_aggregator.config import AggregatorConfig, STREAM_CHUNK_SIZE_DEFAULT, CACHE_TTL_DEFAULT, \
CONNECTION_TIMEOUT_RESULT, CONNECTION_TIMEOUT_JOB_START
from openeo_aggregator.connection import MultiBackendConnection, BackendConnection, streaming_flask_response
@@ -113,7 +113,8 @@ def _get_all_metadata(self) -> Tuple[List[dict], _InternalCollectionMetadata]:

return collections_metadata, internal_data

def _normalize_metadata(self, metadata: dict) -> dict:
@classmethod
def _normalize_metadata(cls, metadata: dict) -> dict:
cid = metadata.get("id", None)
if cid is None:
raise OpenEOApiException("Missing collection id in metadata")
@@ -137,7 +138,8 @@ def _normalize_metadata(self, metadata: dict) -> dict:
_log.warning("Unable to provide root/parent/self links in collection metadata outside flask app context")
return metadata

def _merge_collection_metadata(self, by_backend: Dict[str, dict]) -> dict:
@classmethod
def _merge_collection_metadata(cls, by_backend: Dict[str, dict], report=_log.warning) -> dict:
"""
Merge collection metadata dicts from multiple backends
"""
@@ -149,23 +151,45 @@ def _merge_collection_metadata(self, by_backend: Dict[str, dict]) -> dict:
cid = ids.pop()
_log.info(f"Merging collection metadata for {cid!r}")

# Start with some initial/required fields
result = {
"id": cid,
"stac_version": max(list(getter.get("stac_version")) + ["0.9.0"]),
"title": getter.first("title", default=cid),
"description": getter.first("description", default=cid),
"type": getter.first("type", default="Collection"),
"links": [l for l in list(getter.concat("links")) if l.get("rel") not in ("self", "parent", "root")],
"summaries": getter.select("summaries").simple_merge()
}
# Note: CRS is required by OGC API: https://docs.opengeospatial.org/is/18-058/18-058.html#_crs_identifier_list
result.update(getter.simple_merge([
"stac_extensions", "keywords", "deprecated", "providers", "assets",
"crs",
"sci:citation", "sci:doi", "sci:publications"
]))

## All keys with special merge handling.

# Generic field merging
# Notes:
# - `crs` is required by OGC API: https://docs.opengeospatial.org/is/18-058/18-058.html#_crs_identifier_list
# - `sci:doi` and related are defined at https://github.com/stac-extensions/scientific
for field in getter.available_keys(["stac_extensions", "keywords", "providers", "sci:publications"]):
result[field] = getter.concat(field, skip_duplicates=True)
for field in getter.available_keys(["deprecated"]):
result[field] = all(getter.get(field))
for field in getter.available_keys(["crs", "sci:citation", "sci:doi"]):
result[field] = getter.first(field)

# Summary merging
result["summaries"] = {}
summaries_getter = getter.select("summaries")
for summary_name in summaries_getter.keys():
if summary_name in [
"constellation", "platform", "instruments",
]:
result["summaries"][summary_name] = summaries_getter.concat(summary_name, skip_duplicates=True)
elif summary_name.startswith("sar:") or summary_name.startswith("sat:"):
result["summaries"][summary_name] = summaries_getter.concat(summary_name, skip_duplicates=True)
else:
report(f"Unhandled merging of summary {summary_name!r}")

# Assets
if getter.has_key("assets"):
result["assets"] = {k: getter.select("assets").first(k) for k in getter.select("assets").keys()}

# All keys with special merge handling.
versions = set(getter.get("version"))
if versions:
# TODO: smarter version maximum? Low priority, versions key is not used in most backends.
@@ -184,63 +208,57 @@ def _merge_collection_metadata(self, by_backend: Dict[str, dict]) -> dict:
},
}

cube_dimensions = getter.first("cube:dimensions")
if cube_dimensions:
result["cube:dimensions"] = cube_dimensions
cube_band_value = getter.select("cube:dimensions").first("bands", None)
# Bands
if cube_band_value is not None:
result["cube:dimensions"]["bands"] = cube_band_value
# TODO: concatenating band values is going to give weird results (because band indices change)
cube_dimension_bands = getter.select("cube:dimensions").select("bands").concat('values', skip_duplicates=True)
result["cube:dimensions"]["bands"]["values"] = cube_dimension_bands
# Temporal dimension
cube_time_value = getter.select("cube:dimensions").first("t", None)
if cube_time_value is not None:
result["cube:dimensions"]["t"] = cube_time_value
t_extents = list(getter.select("cube:dimensions").select("t").get("extent"))
t_extents_as_datetime: [datetime.datetime] = [
dateutil.parser.isoparse(ex) for subex in t_extents for ex in subex if ex is not None
]
t_extent = [
None if any([ex[0] is None for ex in t_extents]) else min(t_extents_as_datetime),
None if any([ex[1] is None for ex in t_extents]) else max(t_extents_as_datetime)
]
t_extent = [ex.strftime('%Y-%m-%dT%H:%M:%SZ') or None for ex in t_extent]
result["cube:dimensions"]["t"]["extent"] = t_extent
if getter.has_key("cube:dimensions"):
cube_dim_getter = getter.select("cube:dimensions")
result["cube:dimensions"] = {}

# Spatial dimensions
for dim in ["x", "y"]:
cube_dim_value = getter.select("cube:dimensions").first(dim, None)
if cube_dim_value is not None:
result["cube:dimensions"][dim] = cube_dim_value
extents = getter.select("cube:dimensions").select(dim).get("extent")
extents_flat = [ex for subex in extents for ex in subex if ex is not None]
spatial_extent = [min(extents_flat), max(extents_flat)] if extents_flat else [None, None]
result["cube:dimensions"][dim]["extent"] = spatial_extent
for dim in cube_dim_getter.available_keys(["x", "y"]):
result["cube:dimensions"][dim] = cube_dim_getter.first(dim)
# TODO: check consistency of step and reference_system?
try:
bounds = cube_dim_getter.select(dim).concat("extent")
result["cube:dimensions"][dim]["extent"] = [min(bounds), max(bounds)]
except Exception as e:
report(f"Failed to merge cube:dimensions.{dim}.extent: {e!r}")
# Temporal dimension
for dim in cube_dim_getter.available_keys(["t"]):
result["cube:dimensions"][dim] = cube_dim_getter.first(dim)
# TODO: check consistency of step?
try:
t_starts = [e[0] for e in cube_dim_getter.select(dim).get("extent") if e[0]]
t_ends = [e[1] for e in cube_dim_getter.select(dim).get("extent") if e[1]]
result["cube:dimensions"][dim]["extent"] = [
min(rfc3339.normalize(t) for t in t_starts) if t_starts else None,
max(rfc3339.normalize(t) for t in t_ends) if t_ends else None
]
except Exception as e:
report(f"Failed to merge cube:dimensions.{dim}.extent: {e!r}")

for dim in cube_dim_getter.available_keys(["bands"]):
result["cube:dimensions"][dim] = cube_dim_getter.first(dim)
try:
# Find common prefix of bands
# TODO: better approach? e.g. keep everything and rewrite process graphs on the fly?
bands_iterator = cube_dim_getter.select(dim).get("values")
prefix = next(bands_iterator)
for bands in bands_iterator:
prefix = [t[0] for t in itertools.takewhile(lambda t: t[0] == t[1], zip(prefix, bands))]
if bands != prefix:
report(f"Trimming bands {bands} to common prefix {prefix}")
result["cube:dimensions"][dim]["values"] = prefix
except Exception as e:
report(f"Failed to merge cube:dimensions.{dim}.extent: {e!r}")

# TODO: use a more robust/user friendly backend pointer than backend id (which is internal implementation detail)
result["summaries"][self.STAC_PROPERTY_PROVIDER_BACKEND] = list(by_backend.keys())
result["summaries"][cls.STAC_PROPERTY_PROVIDER_BACKEND] = list(by_backend.keys())

## Log warnings for improper metadata.
# license => Log warning for collections without license links.
license_links = [l for l in getter.concat("links") if l.get("rel") == "license"]
if result["license"] in ["various", "proprietary"] and not license_links:
_log.warning(f"Missing license links for collection: {cid}")

# cube:dimensions => Assert step size and reference_system equal.
for dim in ['t', 'x', 'y']:
step_sizes = getter.select("cube:dimensions").select(dim).union("step_size")
if len(step_sizes) > 1:
_log.warning(
f"Multiple cube:dimensions.{dim}.step_size values across backends for in collection: {cid}: {step_sizes}"
)
if dim in ["x", "y"]:
reference_systems = getter.select("cube:dimensions").select(dim).union("reference_system")
if len(reference_systems) > 1:
_log.warning(
f"Multiple cube:dimension.{dim}.reference_system values across backends for collection {cid}: {reference_systems}"
)

return result

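For readers unfamiliar with the `MultiDictGetter`-based merging in the diff above, here is a minimal standalone sketch of the generic field-merging rules (list-like fields are concatenated without duplicates, `deprecated` is only kept when all backends agree, scalar-ish fields take the first backend's value) and of the injectable `report` callback. The `merge_metadata` function and the backend data are made up for illustration; the real logic lives in the `_merge_collection_metadata` method shown in the diff.

```python
import logging
from typing import Dict, List

_log = logging.getLogger(__name__)

def merge_metadata(by_backend: Dict[str, dict], report=_log.warning) -> dict:
    """Toy version of the generic field merging in the diff above."""
    dicts = list(by_backend.values())

    def available_keys(keys: List[str]) -> List[str]:
        # Only merge fields that at least one backend actually provides.
        return [k for k in keys if any(k in d for d in dicts)]

    def concat(key: str) -> list:
        # Concatenate list values across backends, skipping duplicates.
        merged = []
        for d in dicts:
            merged.extend(v for v in d.get(key, []) if v not in merged)
        return merged

    result = {}
    for field in available_keys(["keywords", "providers"]):
        result[field] = concat(field)
    for field in available_keys(["deprecated"]):
        # Only mark deprecated when every backend that has the field says so.
        result[field] = all(d[field] for d in dicts if field in d)
    for field in available_keys(["crs", "sci:citation"]):
        # Scalar-ish fields: first backend wins.
        result[field] = next(d[field] for d in dicts if field in d)
    for field in {k for d in dicts for k in d} - result.keys() - {"id"}:
        report(f"Unhandled merging of field {field!r}")
    return result

merged = merge_metadata({
    "vito": {"id": "S2", "keywords": ["sentinel", "msi"], "deprecated": False},
    "eodc": {"id": "S2", "keywords": ["msi", "l2a"], "crs": ["EPSG:4326"]},
})
print(merged)  # {'keywords': ['sentinel', 'msi', 'l2a'], 'deprecated': False, 'crs': ['EPSG:4326']}
```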
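The new `cube:dimensions` handling merges spatial extents to the overall bounding range, and temporal extents to the earliest start / latest end, normalizing timestamps first. A rough self-contained illustration of that behavior, with invented extents; a local `normalize` helper stands in for `openeo.util.rfc3339.normalize` used in the diff:

```python
import datetime

def normalize(t: str) -> str:
    # Stand-in for openeo.util.rfc3339.normalize: re-format as RFC 3339 UTC.
    dt = datetime.datetime.fromisoformat(t.replace("Z", "+00:00"))
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")

# Spatial (x/y): merge to the overall bounding range across backends.
x_extents = [[3.2, 7.9], [5.0, 11.4]]
bounds = [b for extent in x_extents for b in extent]
print([min(bounds), max(bounds)])  # [3.2, 11.4]

# Temporal (t): earliest start, latest end; a bound stays open (None)
# only when no backend provides it at all.
t_extents = [["2015-07-06", "2021-12-31T23:59:59Z"], ["2016-01-01", None]]
t_starts = [e[0] for e in t_extents if e[0]]
t_ends = [e[1] for e in t_extents if e[1]]
print([
    min(normalize(t) for t in t_starts) if t_starts else None,
    max(normalize(t) for t in t_ends) if t_ends else None,
])  # ['2015-07-06T00:00:00Z', '2021-12-31T23:59:59Z']
```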
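Band lists, in turn, are reduced to their longest common prefix, so that band indices stay consistent across backends (see the TODO in the diff about possibly rewriting process graphs instead). The `itertools.takewhile` idiom in isolation, on made-up band names:

```python
import itertools

band_lists = [
    ["B01", "B02", "B03", "B04"],
    ["B01", "B02", "B03"],
    ["B01", "B02", "B8A"],
]
bands_iterator = iter(band_lists)
prefix = next(bands_iterator)
for bands in bands_iterator:
    # zip pairs up corresponding bands; takewhile keeps pairs while they are equal.
    prefix = [t[0] for t in itertools.takewhile(lambda t: t[0] == t[1], zip(prefix, bands))]
print(prefix)  # ['B01', 'B02']
```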
45 changes: 10 additions & 35 deletions src/openeo_aggregator/utils.py
@@ -1,4 +1,5 @@
import datetime
import functools
import logging
import types

@@ -99,6 +100,15 @@ def get(self, key: str) -> Iterator:
if key in d:
yield d[key]

def keys(self) -> set[str]:
return functools.reduce(lambda a, b: a.union(b), (d.keys() for d in self.dictionaries), set())

def has_key(self, key: str) -> bool:
return any(key in d for d in self.dictionaries)

def available_keys(self, keys: List[str]) -> List[str]:
return [k for k in keys if self.has_key(k)]

def concat(self, key: str, skip_duplicates=False) -> list:
"""
Concatenate all lists/tuples at given `key` (optionally skipping duplicate items in the process)
@@ -114,41 +124,6 @@ def concat(self, key: str, skip_duplicates=False) -> list:
_log.warning(f"Skipping unexpected type in MultiDictGetter.concat: {items}")
return result

def union(self, key:str) -> set:
"""Like `concat` but with set-wise union (removing duplicates)."""
return set(self.concat(key=key, skip_duplicates=True))

def simple_merge(self, included_keys=None) -> dict:
"""
All dictionaries are merged following simple rules:
For list or sets: all elements are merged into a single list, without duplicates.
For dictionaries: all keys are added to a single dict, duplicate keys are merged recursively.
For all other types: the first value is returned.
It assumes that all duplicate keys in a dictionary have items of the same type.
Args:
included_keys: If given, only these top level keys are merged into the final dict.
"""
if len(self.dictionaries) == 0:
return {}
if len(self.dictionaries) == 1:
return self.dictionaries[0]

result = {}
for dictionary in self.dictionaries:
for key, item in dictionary.items():
if included_keys is not None and key not in included_keys:
continue
if key in result:
if isinstance(item, list) or isinstance(item, set):
result[key] = self.concat(key, skip_duplicates=True)
elif isinstance(item, dict):
result[key] = self.select(key).simple_merge()
else:
result[key] = item
return result

def first(self, key, default=None):
return next(self.get(key), default)

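The new `keys`/`has_key`/`available_keys` helpers, which replace the removed `union` and `simple_merge`, boil down to straightforward set logic over the wrapped dictionaries. A simplified stand-in class to illustrate their semantics (not the full implementation from the diff):

```python
import functools
from typing import Iterator, List

class MultiDictGetter:
    def __init__(self, dictionaries):
        self.dictionaries = list(dictionaries)

    def get(self, key: str) -> Iterator:
        for d in self.dictionaries:
            if key in d:
                yield d[key]

    def keys(self) -> set:
        # Union of all keys across the wrapped dictionaries.
        return functools.reduce(lambda a, b: a.union(b), (set(d.keys()) for d in self.dictionaries), set())

    def has_key(self, key: str) -> bool:
        return any(key in d for d in self.dictionaries)

    def available_keys(self, keys: List[str]) -> List[str]:
        # Filter a candidate list down to the keys that at least one dict has.
        return [k for k in keys if self.has_key(k)]

getter = MultiDictGetter([{"a": 1, "b": 2}, {"b": 20, "c": 30}])
print(getter.keys())                             # {'a', 'b', 'c'} (order may vary)
print(getter.has_key("c"), getter.has_key("z"))  # True False
print(getter.available_keys(["a", "z", "c"]))    # ['a', 'c']
```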
(Diffs for the remaining two changed files are not shown.)