Skip to content

Commit

Permalink
Add BoundingBox for better encapsulation and reuse of bbox related logic
Browse files Browse the repository at this point in the history
Triggered from debugging load_result in openeo-geopyspark-driver for Open-EO/openeo-aggregator#95
  • Loading branch information
soxofaan committed Mar 23, 2023
1 parent c00030a commit 8fd9f55
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 4 deletions.
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.38.1a1"
__version__ = "0.39.0a1"
165 changes: 162 additions & 3 deletions openeo_driver/util/geometry.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import dataclasses
import json
import logging
import re
from pathlib import Path
from typing import Union, Tuple, Optional, Sequence, List
from typing import Union, Tuple, Optional, List, Mapping, Sequence

import pyproj
import shapely.geometry
import shapely.ops
from shapely.geometry import MultiPolygon, Point, Polygon
from shapely.geometry import MultiPolygon, Polygon
from shapely.geometry.base import BaseGeometry

from openeo_driver.errors import OpenEOApiException
from openeo_driver.util.utm import auto_utm_epsg
from openeo_driver.util.utm import auto_utm_epsg, auto_utm_epsg_for_geometry

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -332,3 +334,160 @@ def as_geojson_feature_collection(
"type": "FeatureCollection",
"features": [as_geojson_feature(f) for f in features],
}


class BoundingBoxException(ValueError):
pass


class CrsRequired(BoundingBoxException):
pass


@dataclasses.dataclass(frozen=True)
class BoundingBox:
"""
Bounding box with west, south, east, north coordinates
optionally (geo)referenced.
"""

west: float
south: float
east: float
north: float
crs: Optional[str] = dataclasses.field()

def __init__(
self,
west: float,
south: float,
east: float,
north: float,
*,
crs: Optional[Union[str, int]] = None,
):
missing = [
k
for k, v in zip(
("west", "south", "east", "north"), (west, south, east, north)
)
if v is None
]
if missing:
raise BoundingBoxException(f"Missing bounds: {missing}.")
# __setattr__ workaround to initialize read-only attributes
super().__setattr__("west", west)
super().__setattr__("south", south)
super().__setattr__("east", east)
super().__setattr__("north", north)
super().__setattr__("crs", self.normalize_crs(crs) if crs is not None else None)

@staticmethod
def normalize_crs(crs: Union[str, int]) -> str:
if isinstance(crs, int):
return f"EPSG:{crs}"
elif isinstance(crs, str):
# TODO: support other CRS'es too?
if not re.match("^epsg:\d+$", crs, flags=re.IGNORECASE):
raise BoundingBoxException(f"Invalid CRS {crs!r}")
return crs.upper()
raise BoundingBoxException(f"Invalid CRS {crs!r}")

@classmethod
def from_dict(
cls, d: Mapping, *, default_crs: Optional[Union[str, int]] = None
) -> "BoundingBox":
"""
Extract bounding box from given mapping/dict (required fields "west", "south", "east", "north"),
with optional CRS (field "crs").
:param d: dictionary with at least fields "west", "south", "east", "north", and optionally "crs"
:param default_crs: fallback CRS to use if not present in dictionary
:return:
"""
bounds = {k: d.get(k) for k in ["west", "south", "east", "north"]}
return cls(**bounds, crs=d.get("crs", default_crs))

@classmethod
def from_dict_or_none(
cls, d: Mapping, *, default_crs: Optional[Union[str, int]] = None
) -> Union["BoundingBox", None]:
"""
Like `from_dict`, but returns `None`
when no valid bounding box could be loaded from dict
"""
try:
return cls.from_dict(d=d, default_crs=default_crs)
except BoundingBoxException:
# TODO: option to log something?
return None

@classmethod
def from_wsen_tuple(
cls, wsen: Sequence[float], crs: Optional[Union[str, int]] = None
):
"""Build bounding box from tuple of west, south, east and north bounds (and optional crs"""
assert len(wsen) == 4
return cls(*wsen, crs=crs)

def is_georeferenced(self) -> bool:
return self.crs is not None

def assert_crs(self):
if self.crs is None:
raise CrsRequired(f"A CRS is required, but not available in {self}.")

def as_dict(self) -> dict:
return {
"west": self.west,
"south": self.south,
"east": self.east,
"north": self.north,
"crs": self.crs,
}

def as_tuple(self) -> Tuple[float, float, float, float, Union[str, None]]:
return (self.west, self.south, self.east, self.north, self.crs)

def as_wsen_tuple(self) -> Tuple[float, float, float, float]:
return (self.west, self.south, self.east, self.north)

def as_polygon(self) -> shapely.geometry.Polygon:
"""Get bounding box as a shapely Polygon"""
return shapely.geometry.box(
minx=self.west, miny=self.south, maxx=self.east, maxy=self.north
)

def contains(self, x: float, y: float) -> bool:
"""Check if given point is inside the bounding box"""
return (self.west <= x <= self.east) and (self.south <= y <= self.north)

def reproject(self, crs) -> "BoundingBox":
"""
Reproject bounding box to given CRS to a new bounding box.
Note that bounding box of the reprojected geometry
typically has a larger spatial coverage than the
original bounding box.
"""
self.assert_crs()
crs = self.normalize_crs(crs)
if crs == self.crs:
return self
transform = pyproj.Transformer.from_crs(
crs_from=self.crs, crs_to=crs, always_xy=True
).transform
reprojected = shapely.ops.transform(transform, self.as_polygon())
return BoundingBox(*reprojected.bounds, crs=crs)

def best_utm(self) -> int:
"""
Determine the best UTM zone for this bbox
:return: EPSG code of UTM zone, e.g. 32631 for Belgian bounding boxes
"""
self.assert_crs()
return auto_utm_epsg_for_geometry(self.as_polygon(), crs=self.crs)

def reproject_to_best_utm(self):
return self.reproject(crs=self.best_utm())
140 changes: 140 additions & 0 deletions tests/util/test_geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
as_geojson_feature,
as_geojson_feature_collection,
reproject_geometry,
BoundingBox,
BoundingBoxException,
CrsRequired,
)


Expand Down Expand Up @@ -606,3 +609,140 @@ def test_as_geojson_feature_collection_mix(self):
},
],
}


class TestBoundingBox:
def test_basic(self):
bbox = BoundingBox(1, 2, 3, 4)
assert bbox.west == 1
assert bbox.south == 2
assert bbox.east == 3
assert bbox.north == 4
assert bbox.crs is None

@pytest.mark.parametrize("crs", [4326, "EPSG:4326", "epsg:4326"])
def test_basic_with_crs(self, crs):
bbox = BoundingBox(1, 2, 3, 4, crs=crs)
assert bbox.west == 1
assert bbox.south == 2
assert bbox.east == 3
assert bbox.north == 4
assert bbox.crs == "EPSG:4326"

def test_immutable(self):
bbox = BoundingBox(1, 2, 3, 4, crs=4326)
assert bbox.west == 1
with pytest.raises(AttributeError):
bbox.west = 100
with pytest.raises(AttributeError):
bbox.crs = "EPSG:32631"
assert bbox.west == 1
assert bbox.crs == "EPSG:4326"

def test_repr(self):
bbox = BoundingBox(1, 2, 3, 4, crs=4326)
expected = "BoundingBox(west=1, south=2, east=3, north=4, crs='EPSG:4326')"
assert repr(bbox) == expected

def test_str(self):
bbox = BoundingBox(1, 2, 3, 4, crs=4326)
expected = "BoundingBox(west=1, south=2, east=3, north=4, crs='EPSG:4326')"
assert str(bbox) == expected

def test_missing_bounds(self):
with pytest.raises(BoundingBoxException, match=r"Missing bounds: \['south'\]"):
_ = BoundingBox(1, None, 3, 4)

def test_missing_invalid_crs(self):
with pytest.raises(BoundingBoxException):
_ = BoundingBox(1, 2, 3, 4, crs="foobar:42")

@pytest.mark.parametrize(
["data", "default_crs", "expected"],
[
({"west": 1, "south": 2, "east": 3, "north": 4}, None, (1, 2, 3, 4, None)),
(
{"west": 1, "south": 2, "east": 3, "north": 4},
4326,
(1, 2, 3, 4, "EPSG:4326"),
),
(
{"west": 1, "south": 2, "east": 3, "north": 4, "crs": 4326},
None,
(1, 2, 3, 4, "EPSG:4326"),
),
(
{"west": 1, "south": 2, "east": 3, "north": 4, "crs": "EPSG:4326"},
None,
(1, 2, 3, 4, "EPSG:4326"),
),
(
{"west": 1, "south": 2, "east": 3, "north": 4, "crs": "EPSG:4326"},
32631,
(1, 2, 3, 4, "EPSG:4326"),
),
],
)
def test_from_dict(self, data, default_crs, expected):
bbox = BoundingBox.from_dict(data, default_crs=default_crs)
assert (bbox.west, bbox.south, bbox.east, bbox.north, bbox.crs) == expected

def test_from_dict_invalid(self):
data = {"west": 1, "south": 2, "east": None, "northhhhhh": 4}
with pytest.raises(
BoundingBoxException, match=r"Missing bounds: \['east', 'north'\]"
):
_ = BoundingBox.from_dict(data)

def test_from_dict_or_none(self):
data = {"west": 1, "south": 2, "east": None, "northhhhhh": 4}
bbox = BoundingBox.from_dict_or_none(data)
assert bbox is None

def test_from_wsen_tuple(self):
bbox = BoundingBox.from_wsen_tuple((4, 3, 2, 1))
expected = (4, 3, 2, 1, None)
assert (bbox.west, bbox.south, bbox.east, bbox.north, bbox.crs) == expected

def test_from_wsen_tuple_with_crs(self):
bbox = BoundingBox.from_wsen_tuple((4, 3, 2, 1), crs=32631)
expected = (4, 3, 2, 1, "EPSG:32631")
assert (bbox.west, bbox.south, bbox.east, bbox.north, bbox.crs) == expected

def test_is_georeferenced(self):
assert BoundingBox(1, 2, 3, 4).is_georeferenced() is False
assert BoundingBox(1, 2, 3, 4, crs=4326).is_georeferenced() is True

def test_as_tuple(self):
bbox = BoundingBox(1, 2, 3, 4)
assert bbox.as_tuple() == (1, 2, 3, 4, None)
bbox = BoundingBox(1, 2, 3, 4, crs="epsg:4326")
assert bbox.as_tuple() == (1, 2, 3, 4, "EPSG:4326")

def test_as_wsen_tuple(self):
assert BoundingBox(1, 2, 3, 4).as_wsen_tuple() == (1, 2, 3, 4)
assert BoundingBox(1, 2, 3, 4, crs="epsg:4326").as_wsen_tuple() == (1, 2, 3, 4)

def test_reproject(self):
bbox = BoundingBox(3, 51, 3.1, 51.1, crs="epsg:4326")
reprojected = bbox.reproject(32631)
assert isinstance(reprojected, BoundingBox)
assert reprojected.as_tuple() == (
pytest.approx(500000, abs=10),
pytest.approx(5649824, abs=10),
pytest.approx(507016, abs=10),
pytest.approx(5660950, abs=10),
"EPSG:32631",
)

def test_best_utm_no_crs(self):
bbox = BoundingBox(1, 2, 3, 4)
with pytest.raises(CrsRequired):
_ = bbox.best_utm()

def test_best_utm(self):
bbox = BoundingBox(4, 51, 4.1, 51.1, crs="EPSG:4326")
assert bbox.best_utm() == 32631

bbox = BoundingBox(-72, -13, -71, -12, crs="EPSG:4326")
assert bbox.best_utm() == 32719

0 comments on commit 8fd9f55

Please sign in to comment.