Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fill a usability gap in iris.analysis.Trajectory #2770

Merged
merged 5 commits into from
May 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Added :meth:`iris.analysis.trajectory.interpolate` that allows you interpolate to find values along a trajectory.
69 changes: 68 additions & 1 deletion lib/iris/analysis/trajectory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2010 - 2017, Met Office
# (C) British Crown Copyright 2010 - 2018, Met Office
#
# This file is part of Iris.
#
Expand Down Expand Up @@ -132,6 +132,73 @@ def __repr__(self):
return 'Trajectory(%s, sample_count=%s)' % (self.waypoints,
self.sample_count)

def _get_interp_points(self):
"""
Translate `self.sampled_points` to the format expected by the
interpolator.

Returns:
`self.sampled points` in the format required by
`:func:`~iris.analysis.trajectory.interpolate`.

"""
points = {k: [point_dict[k] for point_dict in self.sampled_points]
for k in self.sampled_points[0].keys()}
return [(k, v) for k, v in points.items()]

def _src_cube_anon_dims(self, cube):
"""
A helper method to locate the index of anonymous dimensions on the
interpolation target, ``cube``.

Returns:
The index of any anonymous dimensions in ``cube``.

"""
named_dims = [cube.coord_dims(c)[0] for c in cube.dim_coords]
return list(set(range(cube.ndim)) - set(named_dims))

def interpolate(self, cube, method=None):
"""
Calls :func:`~iris.analysis.trajectory.interpolate` to interpolate
``cube`` on the defined trajectory.

Assumes that the coordinate names supplied in the waypoints
dictionaries match to coordinate names in `cube`, and that points are
supplied in the same coord_system as in `cube`, where appropriate (i.e.
for horizontal coordinate points).

Args:

* cube
The source Cube to interpolate.

Kwargs:

* method:
The interpolation method to use; "linear" (default) or "nearest".
Only nearest is available when specifying multi-dimensional
coordinates.

"""
sample_points = self._get_interp_points()
interpolated_cube = interpolate(cube, sample_points, method=method)
# Add an "index" coord to name the anonymous dimension produced by
# the interpolation, if present.
if len(interpolated_cube.dim_coords) < interpolated_cube.ndim:
# Add a new coord `index` to describe the new dimension created by
# interpolating.
index_coord = iris.coords.DimCoord(range(self.sample_count),
long_name='index')
# Make sure anonymous dims in `cube` do not mistakenly get labelled
# as the new `index` dimension created by interpolating.
src_anon_dims = self._src_cube_anon_dims(cube)
interp_anon_dims = self._src_cube_anon_dims(interpolated_cube)
anon_dim_index, = list(set(interp_anon_dims) - set(src_anon_dims))
# Add the new coord to the interpolated cube.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice.

interpolated_cube.add_dim_coord(index_coord, anon_dim_index)
return interpolated_cube


def interpolate(cube, sample_points, method=None):
"""
Expand Down
114 changes: 113 additions & 1 deletion lib/iris/tests/unit/analysis/trajectory/test_Trajectory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2016, Met Office
# (C) British Crown Copyright 2016 - 2018, Met Office
#
# This file is part of Iris.
#
Expand Down Expand Up @@ -26,9 +26,12 @@
# importing anything else.
import iris.tests as tests

import mock

import numpy as np

from iris.analysis.trajectory import Trajectory
from iris.tests.stock import simple_3d, simple_4d_with_hybrid_height


class Test___init__(tests.IrisTest):
Expand Down Expand Up @@ -70,5 +73,114 @@ def test_zigzag(self):
{'lat': 0.12499999999999989, 'lon': 3.875})


class Test__get_interp_points(tests.IrisTest):
def test_basic(self):
dim_names = 'lat'
waypoints = [{dim_names: 0}, {dim_names: 1}]
sample_count = 5
trajectory = Trajectory(waypoints, sample_count=sample_count)
result = trajectory._get_interp_points()
expected_points = list(np.linspace(0, 1, sample_count))

self.assertEqual(len(result), len(waypoints[0]))
self.assertEqual(len(result[0][1]), sample_count)
self.assertEqual(result[0][1], expected_points)
self.assertEqual(result[0][0], dim_names)

def test_2d(self):
dim_names = ['lat', 'lon']
waypoints = [{dim_names[0]: 0, dim_names[1]: 0},
{dim_names[0]: 1, dim_names[1]: 2}]
sample_count = 5
trajectory = Trajectory(waypoints, sample_count=sample_count)
result = trajectory._get_interp_points()

self.assertEqual(len(result), len(waypoints[0]))
self.assertEqual(len(result[0][1]), sample_count)
self.assertEqual(len(result[1][1]), sample_count)
self.assertIn(result[0][0], dim_names)
self.assertIn(result[1][0], dim_names)

def test_3d(self):
dim_names = ['y', 'x', 'z']
waypoints = [{dim_names[0]: 0, dim_names[1]: 0, dim_names[2]: 2},
{dim_names[0]: 1, dim_names[1]: 2, dim_names[2]: 10}]
sample_count = 5
trajectory = Trajectory(waypoints, sample_count=sample_count)
result = trajectory._get_interp_points()

self.assertEqual(len(result), len(waypoints[0]))
self.assertEqual(len(result[0][1]), sample_count)
self.assertEqual(len(result[1][1]), sample_count)
self.assertEqual(len(result[2][1]), sample_count)
self.assertIn(result[0][0], dim_names)
self.assertIn(result[1][0], dim_names)
self.assertIn(result[2][0], dim_names)


class Test_interpolate(tests.IrisTest):
def _result_cube_metadata(self, res_cube):
dim_names = [c.name() for c in res_cube.dim_coords]
named_dims = [res_cube.coord_dims(c)[0] for c in res_cube.dim_coords]
anon_dims = list(set(range(res_cube.ndim)) - set(named_dims))
anon_dims = None if not len(anon_dims) else anon_dims
return dim_names, named_dims, anon_dims

def test_cube__simple_3d(self):
# Test that an 'index' coord is added to the resultant cube.
cube = simple_3d()
waypoints = [{'latitude': 40, 'longitude': 40},
{'latitude': 0, 'longitude': 0}]
sample_count = 3
new_coord_name = 'index'
trajectory = Trajectory(waypoints, sample_count=sample_count)
result = trajectory.interpolate(cube)

dim_names, named_dims, anon_dims = self._result_cube_metadata(result)
new_coord = result.coord(new_coord_name)
exp_named_dims = [0, 1]

self.assertEqual(result.ndim, cube.ndim - 1)
self.assertIn(new_coord_name, dim_names)
self.assertEqual(named_dims, exp_named_dims)
self.assertIsNone(anon_dims)
self.assertEqual(len(new_coord.points), sample_count)

def test_cube__anon_dim(self):
cube = simple_4d_with_hybrid_height()
cube.remove_coord('model_level_number') # Make cube dim 1 anonymous.
waypoints = [{'grid_latitude': 21, 'grid_longitude': 31},
{'grid_latitude': 23, 'grid_longitude': 33}]
sample_count = 4
new_coord_name = 'index'
trajectory = Trajectory(waypoints, sample_count=sample_count)
result = trajectory.interpolate(cube)

dim_names, named_dims, anon_dims = self._result_cube_metadata(result)
new_coord = result.coord(new_coord_name)
exp_named_dims = [0, 2]
exp_anon_dims = [1]

self.assertEqual(result.ndim, cube.ndim - 1)
self.assertIn(new_coord_name, dim_names)
self.assertEqual(named_dims, exp_named_dims)
self.assertEqual(anon_dims, exp_anon_dims)
self.assertEqual(len(new_coord.points), sample_count)

def test_call(self):
# Test that :func:`iris.analysis.trajectory.interpolate` is called by
# `Trajectory.interpolate`.
cube = simple_3d()
to_patch = 'iris.analysis.trajectory.interpolate'
waypoints = [{'latitude': 40, 'longitude': 40},
{'latitude': 0, 'longitude': 0}]
sample_count = 3
trajectory = Trajectory(waypoints, sample_count=sample_count)

with mock.patch(to_patch, return_value=cube) as mock_interpolate:
trajectory.interpolate(cube)
mock_interpolate.assert_called_once()


if __name__ == "__main__":
tests.main()