Commit e9d3036 (1 parent: b4a27cf). Showing 14 changed files with 419 additions and 124 deletions.
20 changes: 20 additions & 0 deletions
webviz_subsurface/_providers/ensemble_surface_provider/_surface_to_float32_array.py
@@ -0,0 +1,20 @@
import io

import numpy as np
import xtgeo


def surface_to_float32_array(surface: xtgeo.RegularSurface) -> io.BytesIO:
    values = surface.values.astype(np.float32)
    values.fill_value = np.NaN
    values = np.ma.filled(values)

    # Rotate 90 deg left.
    # This will cause the width to run along the X axis
    # and the height along the Y axis (starting from the bottom).
    values = np.rot90(values)

    byte_io = io.BytesIO()
    byte_io.write(values.tobytes())
    byte_io.seek(0)
    return byte_io
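For reference (not part of this commit), a consumer of the returned stream could rebuild the 2D array roughly as sketched below; the helper name and the assumption that the caller already knows the surface dimensions are illustrative only.

import io

import numpy as np


def float32_array_to_values(byte_io: io.BytesIO, ncol: int, nrow: int) -> np.ndarray:
    # Hypothetical reader: after np.rot90 the serialized array has shape
    # (nrow, ncol); masked cells were filled with NaN before serialization.
    flat = np.frombuffer(byte_io.getvalue(), dtype=np.float32)
    return flat.reshape(nrow, ncol)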
17 changes: 17 additions & 0 deletions
webviz_subsurface/_providers/ensemble_surface_provider/_types.py
@@ -0,0 +1,17 @@
from dataclasses import dataclass

from .ensemble_surface_provider import SurfaceAddress


@dataclass(frozen=True)
class QualifiedSurfaceAddress:
    provider_id: str
    address: SurfaceAddress


@dataclass(frozen=True)
class QualifiedDiffSurfaceAddress:
    provider_id_a: str
    address_a: SurfaceAddress
    provider_id_b: str
    address_b: SurfaceAddress
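Since both dataclasses are frozen (immutable and hashable), they are suitable as lookup keys. A minimal construction (illustrative only, not part of this commit), assuming a SurfaceAddress instance `address` has already been obtained from an EnsembleSurfaceProvider:

# Illustrative only: `address` is an existing SurfaceAddress, ids are invented.
qualified = QualifiedSurfaceAddress(provider_id="provider-0", address=address)

diff_qualified = QualifiedDiffSurfaceAddress(
    provider_id_a="provider-0",
    address_a=address,
    provider_id_b="provider-1",
    address_b=address,
)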
290 changes: 290 additions & 0 deletions
webviz_subsurface/_providers/ensemble_surface_provider/surface_array_server.py
@@ -0,0 +1,290 @@
import io
import hashlib
import json
import logging
import math
from dataclasses import asdict, dataclass
from typing import List, Optional, Tuple, Union
from urllib.parse import quote
from uuid import uuid4

import flask
import flask_caching
import xtgeo
from dash import Dash
from webviz_config.webviz_instance_info import WEBVIZ_INSTANCE_INFO

from webviz_subsurface._utils.perf_timer import PerfTimer

from ._surface_to_float32_array import surface_to_float32_array
from .ensemble_surface_provider import (
    ObservedSurfaceAddress,
    SimulatedSurfaceAddress,
    StatisticalSurfaceAddress,
    SurfaceAddress,
)
from ._types import QualifiedSurfaceAddress, QualifiedDiffSurfaceAddress

LOGGER = logging.getLogger(__name__)

_ROOT_URL_PATH = "/SurfaceArrayServer"

_SURFACE_SERVER_INSTANCE: Optional["SurfaceArrayServer"] = None


@dataclass(frozen=True)
class SurfaceArrayMeta:
    x_min: float
    x_max: float
    y_min: float
    y_max: float
    x_ori: float
    y_ori: float
    val_min: float
    val_max: float
    rot_deg: float
    x_count: int
    y_count: int
    x_inc: float
    y_inc: float

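# Serves published surfaces as raw float32 arrays over HTTP. Arrays and their
# metadata live in a per-instance flask_caching FileSystemCache (unique
# directory via uuid4) and are looked up by a string key derived from the
# qualified surface address.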
class SurfaceArrayServer:
    def __init__(self, app: Dash) -> None:
        cache_dir = (
            WEBVIZ_INSTANCE_INFO.storage_folder
            / f"SurfaceArrayServer_filecache_{uuid4()}"
        )
        LOGGER.debug(f"Setting up file cache in: {cache_dir}")
        self._array_cache = flask_caching.Cache(
            config={
                "CACHE_TYPE": "FileSystemCache",
                "CACHE_DIR": cache_dir,
                "CACHE_DEFAULT_TIMEOUT": 0,
            }
        )
        self._array_cache.init_app(app.server)

        self._setup_url_rule(app)

    @staticmethod
    def instance(app: Dash) -> "SurfaceArrayServer":
        # pylint: disable=global-statement
        global _SURFACE_SERVER_INSTANCE
        if not _SURFACE_SERVER_INSTANCE:
            LOGGER.debug("Initializing SurfaceArrayServer instance")
            _SURFACE_SERVER_INSTANCE = SurfaceArrayServer(app)

        return _SURFACE_SERVER_INSTANCE

    def publish_surface(
        self,
        qualified_address: Union[QualifiedSurfaceAddress, QualifiedDiffSurfaceAddress],
        surface: xtgeo.RegularSurface,
    ) -> None:
        timer = PerfTimer()

        if isinstance(qualified_address, QualifiedSurfaceAddress):
            base_cache_key = _address_to_str(
                qualified_address.provider_id, qualified_address.address
            )
        else:
            base_cache_key = _diff_address_to_str(
                qualified_address.provider_id_a,
                qualified_address.address_a,
                qualified_address.provider_id_b,
                qualified_address.address_b,
            )

        LOGGER.debug(
            f"Publishing surface (dim={surface.dimensions}, #cells={surface.ncol*surface.nrow}), "
            f"[base_cache_key={base_cache_key}]"
        )

        self._create_and_store_array_in_cache(base_cache_key, surface)

        LOGGER.debug(f"Surface published in: {timer.elapsed_s():.2f}s")

    def get_surface_metadata(
        self,
        qualified_address: Union[QualifiedSurfaceAddress, QualifiedDiffSurfaceAddress],
    ) -> Optional[SurfaceArrayMeta]:

        if isinstance(qualified_address, QualifiedSurfaceAddress):
            base_cache_key = _address_to_str(
                qualified_address.provider_id, qualified_address.address
            )
        else:
            base_cache_key = _diff_address_to_str(
                qualified_address.provider_id_a,
                qualified_address.address_a,
                qualified_address.provider_id_b,
                qualified_address.address_b,
            )

        meta_cache_key = "META:" + base_cache_key
        meta: Optional[SurfaceArrayMeta] = self._array_cache.get(meta_cache_key)
        if not meta:
            return None

        if not isinstance(meta, SurfaceArrayMeta):
            LOGGER.error("Error loading SurfaceArrayMeta from cache")
            return None

        return meta

    @staticmethod
    def encode_partial_url(
        qualified_address: Union[QualifiedSurfaceAddress, QualifiedDiffSurfaceAddress],
    ) -> str:

        if isinstance(qualified_address, QualifiedSurfaceAddress):
            address_str = _address_to_str(
                qualified_address.provider_id, qualified_address.address
            )
        else:
            address_str = _diff_address_to_str(
                qualified_address.provider_id_a,
                qualified_address.address_a,
                qualified_address.provider_id_b,
                qualified_address.address_b,
            )

        url_path: str = f"{_ROOT_URL_PATH}/{quote(address_str)}"
        return url_path

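    # Registers the Flask route that streams a cached float32 array back to the
    # client; the <full_surf_address_str> path component matches the string
    # produced by encode_partial_url().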
    def _setup_url_rule(self, app: Dash) -> None:
        @app.server.route(_ROOT_URL_PATH + "/<full_surf_address_str>")
        def _handle_surface_request(full_surf_address_str: str) -> flask.Response:
            LOGGER.debug(
                f"Handling surface_request: "
                f"full_surf_address_str={full_surf_address_str} "
            )

            timer = PerfTimer()

            array_cache_key = "ARRAY:" + full_surf_address_str
            LOGGER.debug(f"Looking for array in cache (key={array_cache_key})")

            cached_array_bytes = self._array_cache.get(array_cache_key)
            if not cached_array_bytes:
                LOGGER.error(
                    f"Error getting array for address: {full_surf_address_str}"
                )
                flask.abort(404)

            response = flask.send_file(
                cached_array_bytes, mimetype="application/octet-stream"
            )
            LOGGER.debug(
                f"Request handled from array cache in: {timer.elapsed_s():.2f}s"
            )
            return response

    def _create_and_store_array_in_cache(
        self,
        base_cache_key: str,
        surface: xtgeo.RegularSurface,
    ) -> None:

        timer = PerfTimer()
        LOGGER.debug("Converting surface to float32 array...")
        array_bytes: io.BytesIO = surface_to_float32_array(surface)

        et_to_array_s = timer.lap_s()

        array_cache_key = "ARRAY:" + base_cache_key
        meta_cache_key = "META:" + base_cache_key

        self._array_cache.add(array_cache_key, array_bytes)

        meta = SurfaceArrayMeta(
            x_min=surface.xmin,
            x_max=surface.xmax,
            y_min=surface.ymin,
            y_max=surface.ymax,
            x_ori=surface.xori,
            y_ori=surface.yori,
            x_count=surface.ncol,
            y_count=surface.nrow,
            val_min=surface.values.min(),
            val_max=surface.values.max(),
            rot_deg=surface.rotation,
            x_inc=surface.xinc,
            y_inc=surface.yinc,
        )
        self._array_cache.add(meta_cache_key, meta)
        et_write_cache_s = timer.lap_s()

        LOGGER.debug(
            f"Created surface array and wrote to cache in: {timer.elapsed_s():.2f}s ("
            f"to_array={et_to_array_s:.2f}s, write_cache={et_write_cache_s:.2f}s), "
            f"[base_cache_key={base_cache_key}]"
        )


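# Cache key scheme: a single address is flattened to
# "<provider_id>___<sta|sim|obs>___<name>___<attribute>___<md5-of-address>";
# diff addresses become "diff~~~<a>~~~<b>" built from two such strings. The
# readable prefix aids debugging while the hash keeps keys unique.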
def _address_to_str(
    provider_id: str,
    address: SurfaceAddress,
) -> str:
    if isinstance(address, StatisticalSurfaceAddress):
        addr_type_str = "sta"
    elif isinstance(address, SimulatedSurfaceAddress):
        addr_type_str = "sim"
    elif isinstance(address, ObservedSurfaceAddress):
        addr_type_str = "obs"

    addr_hash = hashlib.md5(  # nosec
        json.dumps(asdict(address), sort_keys=True).encode()
    ).hexdigest()

    return f"{provider_id}___{addr_type_str}___{address.name}___{address.attribute}___{addr_hash}"


def _diff_address_to_str(
    provider_id_a: str,
    address_a: SurfaceAddress,
    provider_id_b: str,
    address_b: SurfaceAddress,
) -> str:
    return (
        "diff~~~"
        + _address_to_str(provider_id_a, address_a)
        + "~~~"
        + _address_to_str(provider_id_b, address_b)
    )


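# Rotates the four map corners by -rotation about corner index 2 and returns
# the axis-aligned [min_x, min_y, max_x, max_y] bounds of that footprint
# together with the surface rotation in degrees.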
def _calc_map_component_bounds_and_rot(
    surface: xtgeo.RegularSurface,
) -> Tuple[List[float], float]:
    surf_corners = surface.get_map_xycorners()
    rptx = surf_corners[2][0]
    rpty = surf_corners[2][1]
    min_x = math.inf
    max_x = -math.inf
    min_y = math.inf
    max_y = -math.inf
    angle = -surface.rotation * math.pi / 180
    for coord in surf_corners:
        xpos = coord[0]
        ypos = coord[1]
        x_rotated = (
            rptx + ((xpos - rptx) * math.cos(angle)) - ((ypos - rpty) * math.sin(angle))
        )
        y_rotated = (
            rpty + ((xpos - rptx) * math.sin(angle)) + ((ypos - rpty) * math.cos(angle))
        )
        min_x = min(min_x, x_rotated)
        max_x = max(max_x, x_rotated)
        min_y = min(min_y, y_rotated)
        max_y = max(max_y, y_rotated)

    bounds = [
        min_x,
        min_y,
        max_x,
        max_y,
    ]

    return bounds, surface.rotation
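Putting the pieces together (illustrative only, not part of this diff): a plugin would typically publish a surface once and hand the resulting URL and metadata to a front-end map component. Here `app` is the Dash application and `surface` an xtgeo.RegularSurface obtained elsewhere; the provider id and `address` are invented.

surf_server = SurfaceArrayServer.instance(app)

# Hypothetical provider id and previously obtained SurfaceAddress.
qualified_address = QualifiedSurfaceAddress(provider_id="provider-0", address=address)

surf_server.publish_surface(qualified_address, surface)

meta = surf_server.get_surface_metadata(qualified_address)
url = SurfaceArrayServer.encode_partial_url(qualified_address)
# A client can now GET `url` to receive the raw float32 array and use `meta`
# (origin, increments, rotation) to place it in map coordinates.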