Skip to content

Commit

Permalink
Issue #5 update collection metadata merging
Browse files Browse the repository at this point in the history
  • Loading branch information
JeroenVerstraelen authored and soxofaan committed Sep 16, 2022
1 parent bea49b7 commit 0b44750
Show file tree
Hide file tree
Showing 4 changed files with 362 additions and 313 deletions.
137 changes: 90 additions & 47 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
from collections import defaultdict
from typing import List, Dict, Union, Tuple, Optional, Iterable, Iterator, Callable, Any

import dateutil
import flask

import openeo_driver.util.view_helpers
from flask import url_for
from flask import url_for, has_app_context
from openeo.capabilities import ComparableVersion
from openeo.rest import OpenEoApiError, OpenEoRestError, OpenEoClientException
from openeo.util import dict_no_none, TimingLogger, deep_get
Expand Down Expand Up @@ -107,11 +108,34 @@ def _get_all_metadata(self) -> Tuple[List[dict], _InternalCollectionMetadata]:
else:
_log.info(f"Merging {cid!r} collection metadata from backends {by_backend.keys()}")
metadata = self._merge_collection_metadata(by_backend)
metadata = self._normalize_metadata(metadata)
collections_metadata.append(metadata)
internal_data.set_backends_for_collection(cid, by_backend.keys())

return collections_metadata, internal_data

def _normalize_metadata(self, metadata: dict) -> dict:
    """
    Normalize the metadata dict of a single collection.

    Checks that the metadata has an "id" and, when a Flask application
    context is active, replaces any "root", "parent" and "self" links
    with links built through `url_for` for this application's endpoints.
    Without an application context the metadata is returned untouched.

    Note that the given dict is modified in place (and also returned).

    :param metadata: collection metadata dict (as returned by a backend or a merge).
    :return: the same `metadata` dict, normalized.
    :raises OpenEOApiException: if the metadata has no "id" field.
    """
    cid = metadata.get("id")
    if cid is None:
        raise OpenEOApiException("Missing collection id in metadata")
    if not has_app_context():
        # `url_for` can not be used outside of an application context.
        return metadata

    collections_url = url_for("openeo.collections", _external=True)
    self_url = url_for("openeo.collection_by_id", collection_id=cid, _external=True)

    # Treat a missing "links" field and an explicit `None` value the same way,
    # and drop links that will be replaced below.
    links = [
        link for link in (metadata.get("links") or [])
        if link.get("rel") not in ("self", "parent", "root")
    ]
    links.extend([
        {"href": collections_url, "rel": "root"},
        {"href": collections_url, "rel": "parent"},
        {"href": self_url, "rel": "self"},
    ])
    metadata["links"] = links
    return metadata

def _merge_collection_metadata(self, by_backend: Dict[str, dict]) -> dict:
"""
Merge collection metadata dicts from multiple backends
Expand All @@ -130,15 +154,15 @@ def _merge_collection_metadata(self, by_backend: Dict[str, dict]) -> dict:
# stac_version
result["stac_version"] = max(list(getter.get("stac_version")) + ["0.9.0"])
# stac_extensions
stac_extensions = sorted(getter.union("stac_extensions", skip_duplicates=True))
stac_extensions = sorted(getter.merge_arrays("stac_extensions", skip_duplicates=True))
if stac_extensions:
result["stac_extensions"] = stac_extensions
# title
result["title"] = getter.first("title", default=result["id"])
# description
result["description"] = getter.first("description", default=result["id"])

result["title"] = getter.first("title", default=cid)
result["description"] = getter.first("description", default=cid)

# keywords
keywords = getter.union("keywords", skip_duplicates=True)
keywords = getter.merge_arrays("keywords", skip_duplicates=True)
if keywords:
result["keywords"] = keywords
# version
Expand All @@ -150,80 +174,99 @@ def _merge_collection_metadata(self, by_backend: Dict[str, dict]) -> dict:
deprecateds = list(getter.get("deprecated"))
if deprecateds:
result["deprecated"] = all(deprecateds)
# type (TODO Test)

result["type"] = getter.first("type", default="Collection")
# license
licenses = set(getter.get("license"))
# Assume the license links are available.
licenses = set(getter.get("license"))
result["license"] = licenses.pop() if len(licenses) == 1 else ("various" if licenses else "proprietary")
# TODO: Log warning if proprietary or various and links are missing.

# providers
providers = getter.union("providers", skip_duplicates=True)
providers = getter.merge_arrays("providers", skip_duplicates=True)
if providers:
result["providers"] = list(providers)
# extent
result["extent"] = {
"spatial": {
"bbox": getter.select("extent").select("spatial").union("bbox", skip_duplicates=True) \
"bbox": getter.select("extent").select("spatial").merge_arrays("bbox", skip_duplicates=True) \
or [[-180, -90, 180, 90]],
},
"temporal": {
"interval": getter.select("extent").select("temporal").union("interval", skip_duplicates=True) \
"interval": getter.select("extent").select("temporal").merge_arrays("interval", skip_duplicates=True) \
or [[None, None]],
},
}
# links
result["links"] = [l for l in list(getter.union("links")) if l["rel"] not in ("self", "parent", "root")]
result["links"].append({
"href": url_for("openeo.collections", _external=True),
"rel": "root"
})
result["links"].append({
"href": url_for("openeo.collections", _external=True),
"rel": "parent"
})
result["links"].append({
"href": url_for("openeo.collection_by_id", collection_id=result["id"], _external=True),
"rel": "self"
})
result["links"] = [l for l in list(getter.merge_arrays("links")) if l.get("rel") not in ("self","parent","root")]

# cube_dimensions
# TODO: combine cube:dimensions smarter?
cube_dimensions = getter.first("cube:dimensions")
if cube_dimensions:
cube_dimension_bands = list(getter.select("cube:dimensions").select("bands").merge_arrays('values', skip_duplicates=True))
t_extents = list(getter.select("cube:dimensions").select("t").get("extent"))
t_extents_flat_without_none: [datetime.datetime] = [dateutil.parser.isoparse(ex) for subex in t_extents for ex in subex if ex is not None]
t_extent = [None, None]
if t_extents_flat_without_none:
t_extent[0] = None if [ex for ex in t_extents if ex[0] is None] else min(t_extents_flat_without_none).strftime('%Y-%m-%dT%H:%M:%SZ')
t_extent[1] = None if [ex for ex in t_extents if ex[1] is None] else max(t_extents_flat_without_none).strftime('%Y-%m-%dT%H:%M:%SZ')
x_extents = getter.select("cube:dimensions").select("x").get("extent")
x_extents_flat = [ex for subex in x_extents for ex in subex if ex is not None]
x_extent = [min(x_extents_flat), max(x_extents_flat)] if x_extents_flat else [None, None]
y_extents = getter.select("cube:dimensions").select("y").get("extent")
y_extents_flat = [ex for subex in y_extents for ex in subex if ex is not None]
y_extent = [min(y_extents_flat), max(y_extents_flat)] if y_extents_flat else [None, None]

# Assert step size and reference_system equal.
for dim in ['t', 'x', 'y']:
if len(getter.select("cube:dimensions").select(dim).merge_arrays("step_size", skip_duplicates=True)) > 1:
_log.warning(f"Step sizes are not equal among all backends "
f"for cube:dimensions.{dim} dimension in collection: {cid}")
if dim == 't': continue
if len(getter.select("cube:dimensions").select(dim).merge_arrays("reference_system", skip_duplicates=True)) > 1:
_log.warning(f"Reference systems are not equal among all backends "
f"for cube:dimensions.{dim} dimension in collection: {cid}")

result["cube:dimensions"] = cube_dimensions
cube_band_value = getter.select("cube:dimensions").first("bands", None)
if cube_band_value is not None:
result["cube:dimensions"]["bands"] = cube_band_value
result["cube:dimensions"]["bands"]["values"] = cube_dimension_bands
for dim, extent in zip(['t', 'x', 'y'], [t_extent, x_extent, y_extent]):
cube_dim_value = getter.select("cube:dimensions").first(dim, None)
if cube_dim_value is not None:
result["cube:dimensions"][dim] = cube_dim_value
result["cube:dimensions"][dim]["extent"] = extent

# summaries
# TODO: Better merging for summaries?
for summary in getter.get("summaries"):
print(summary)
result["summaries"] = {}
result["summaries"] = getter.select("summaries").simple_merge()
# TODO: use a more robust/user friendly backend pointer than backend id (which is internal implementation detail)
result["summaries"][self.STAC_PROPERTY_PROVIDER_BACKEND] = list(by_backend.keys())
# assets
# TODO: assets ? For now, take the union.
result["assets"] = list(getter.union("assets"))
result["assets"] = list(getter.merge_arrays("assets"))

# crs
crs_list = getter.union("crs", skip_duplicates=True)
# Required by OGC API - Features: https://docs.opengeospatial.org/is/18-058/18-058.html#_crs_identifier_list
crs_list = getter.merge_arrays("crs", skip_duplicates=True)
if crs_list:
result["crs"] = list(crs_list)
# datasource_type
data_source_type = getter.first("datasource_type", default=None)
if data_source_type:
result["datasource_type"] = data_source_type

# Scientific extension.
# sci:citation
citation_list = getter.first("sci:citations", default=None)
citation_list = getter.first("sci:citation", default=None)
if citation_list:
result["sci:citations"] = citation_list
result["sci:citation"] = citation_list
# sci:doi
doi_list = getter.first("sci:dois", default=None)
doi_list = getter.first("sci:doi", default=None)
if doi_list:
result["sci:dois"] = doi_list
result["sci:doi"] = doi_list
# sci:publications
publications_list = getter.union("sci:publications", skip_duplicates=True)
publications_list = getter.merge_arrays("sci:publications", skip_duplicates=True)
if publications_list:
result["sci:publications"] = list(publications_list)

# Log warning for collections without license links.
license_links = [l for l in list(getter.merge_arrays("links")) if l.get("rel")=="license"]
if result["license"] in ["various", "proprietary"] and not license_links:
_log.warning(f"Missing license links for collection: {cid}")
return result

@staticmethod
Expand Down Expand Up @@ -303,11 +346,11 @@ def _get_collection_metadata(self, collection_id: str) -> dict:
if len(by_backend) == 0:
raise CollectionNotFoundException(collection_id=collection_id)
elif len(by_backend) == 1:
# TODO: also go through _merge_collection_metadata procedure (for clean up/normalization)?
return by_backend.popitem()[1]
metadata = by_backend.popitem()[1]
else:
_log.info(f"Merging metadata for collection {collection_id}.")
return self._merge_collection_metadata(by_backend=by_backend)
metadata = self._merge_collection_metadata(by_backend=by_backend)
return self._normalize_metadata(metadata)

def load_collection(self, collection_id: str, load_params: LoadParameters, env: EvalEnv) -> DriverDataCube:
raise RuntimeError("openeo-aggregator does not implement concrete collection loading")
Expand Down
6 changes: 3 additions & 3 deletions src/openeo_aggregator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ def get(self, key: str) -> Iterator:
if key in d:
yield d[key]

def union(self, key: str, skip_duplicates=False) -> list:
def merge_arrays(self, key: str,skip_duplicates=False) -> list:
"""
Simple list based union of the items
(each of which must be an iterable itself, such as list or set) at given key.
This method assumes that for all dicts, dict[key] is of type iterable (list, set, ...).
It merges all iterables into a single list, with or without duplicates.
"""
result = []
for items in self.get(key):
Expand Down
Loading

0 comments on commit 0b44750

Please sign in to comment.