diff --git a/docs/iris/src/whatsnew/contributions_2.1/newfeature_2018-Jan-18_trajectory-interpolate.txt b/docs/iris/src/whatsnew/contributions_2.1/newfeature_2018-Jan-18_trajectory-interpolate.txt new file mode 100644 index 0000000000..ff6c3076e1 --- /dev/null +++ b/docs/iris/src/whatsnew/contributions_2.1/newfeature_2018-Jan-18_trajectory-interpolate.txt @@ -0,0 +1 @@ +* Added :meth:`iris.analysis.trajectory.interpolate` that allows you interpolate to find values along a trajectory. \ No newline at end of file diff --git a/lib/iris/analysis/trajectory.py b/lib/iris/analysis/trajectory.py index c4a0f9a0e1..c2fa52ee88 100644 --- a/lib/iris/analysis/trajectory.py +++ b/lib/iris/analysis/trajectory.py @@ -1,4 +1,4 @@ -# (C) British Crown Copyright 2010 - 2017, Met Office +# (C) British Crown Copyright 2010 - 2018, Met Office # # This file is part of Iris. # @@ -132,6 +132,73 @@ def __repr__(self): return 'Trajectory(%s, sample_count=%s)' % (self.waypoints, self.sample_count) + def _get_interp_points(self): + """ + Translate `self.sampled_points` to the format expected by the + interpolator. + + Returns: + `self.sampled points` in the format required by + `:func:`~iris.analysis.trajectory.interpolate`. + + """ + points = {k: [point_dict[k] for point_dict in self.sampled_points] + for k in self.sampled_points[0].keys()} + return [(k, v) for k, v in points.items()] + + def _src_cube_anon_dims(self, cube): + """ + A helper method to locate the index of anonymous dimensions on the + interpolation target, ``cube``. + + Returns: + The index of any anonymous dimensions in ``cube``. + + """ + named_dims = [cube.coord_dims(c)[0] for c in cube.dim_coords] + return list(set(range(cube.ndim)) - set(named_dims)) + + def interpolate(self, cube, method=None): + """ + Calls :func:`~iris.analysis.trajectory.interpolate` to interpolate + ``cube`` on the defined trajectory. + + Assumes that the coordinate names supplied in the waypoints + dictionaries match to coordinate names in `cube`, and that points are + supplied in the same coord_system as in `cube`, where appropriate (i.e. + for horizontal coordinate points). + + Args: + + * cube + The source Cube to interpolate. + + Kwargs: + + * method: + The interpolation method to use; "linear" (default) or "nearest". + Only nearest is available when specifying multi-dimensional + coordinates. + + """ + sample_points = self._get_interp_points() + interpolated_cube = interpolate(cube, sample_points, method=method) + # Add an "index" coord to name the anonymous dimension produced by + # the interpolation, if present. + if len(interpolated_cube.dim_coords) < interpolated_cube.ndim: + # Add a new coord `index` to describe the new dimension created by + # interpolating. + index_coord = iris.coords.DimCoord(range(self.sample_count), + long_name='index') + # Make sure anonymous dims in `cube` do not mistakenly get labelled + # as the new `index` dimension created by interpolating. + src_anon_dims = self._src_cube_anon_dims(cube) + interp_anon_dims = self._src_cube_anon_dims(interpolated_cube) + anon_dim_index, = list(set(interp_anon_dims) - set(src_anon_dims)) + # Add the new coord to the interpolated cube. + interpolated_cube.add_dim_coord(index_coord, anon_dim_index) + return interpolated_cube + def interpolate(cube, sample_points, method=None): """ diff --git a/lib/iris/tests/unit/analysis/trajectory/test_Trajectory.py b/lib/iris/tests/unit/analysis/trajectory/test_Trajectory.py index 85ca32e979..e15a8c3d4f 100644 --- a/lib/iris/tests/unit/analysis/trajectory/test_Trajectory.py +++ b/lib/iris/tests/unit/analysis/trajectory/test_Trajectory.py @@ -1,4 +1,4 @@ -# (C) British Crown Copyright 2016, Met Office +# (C) British Crown Copyright 2016 - 2018, Met Office # # This file is part of Iris. # @@ -26,9 +26,12 @@ # importing anything else. import iris.tests as tests +import mock + import numpy as np from iris.analysis.trajectory import Trajectory +from iris.tests.stock import simple_3d, simple_4d_with_hybrid_height class Test___init__(tests.IrisTest): @@ -70,5 +73,114 @@ def test_zigzag(self): {'lat': 0.12499999999999989, 'lon': 3.875}) +class Test__get_interp_points(tests.IrisTest): + def test_basic(self): + dim_names = 'lat' + waypoints = [{dim_names: 0}, {dim_names: 1}] + sample_count = 5 + trajectory = Trajectory(waypoints, sample_count=sample_count) + result = trajectory._get_interp_points() + expected_points = list(np.linspace(0, 1, sample_count)) + + self.assertEqual(len(result), len(waypoints[0])) + self.assertEqual(len(result[0][1]), sample_count) + self.assertEqual(result[0][1], expected_points) + self.assertEqual(result[0][0], dim_names) + + def test_2d(self): + dim_names = ['lat', 'lon'] + waypoints = [{dim_names[0]: 0, dim_names[1]: 0}, + {dim_names[0]: 1, dim_names[1]: 2}] + sample_count = 5 + trajectory = Trajectory(waypoints, sample_count=sample_count) + result = trajectory._get_interp_points() + + self.assertEqual(len(result), len(waypoints[0])) + self.assertEqual(len(result[0][1]), sample_count) + self.assertEqual(len(result[1][1]), sample_count) + self.assertIn(result[0][0], dim_names) + self.assertIn(result[1][0], dim_names) + + def test_3d(self): + dim_names = ['y', 'x', 'z'] + waypoints = [{dim_names[0]: 0, dim_names[1]: 0, dim_names[2]: 2}, + {dim_names[0]: 1, dim_names[1]: 2, dim_names[2]: 10}] + sample_count = 5 + trajectory = Trajectory(waypoints, sample_count=sample_count) + result = trajectory._get_interp_points() + + self.assertEqual(len(result), len(waypoints[0])) + self.assertEqual(len(result[0][1]), sample_count) + self.assertEqual(len(result[1][1]), sample_count) + self.assertEqual(len(result[2][1]), sample_count) + self.assertIn(result[0][0], dim_names) + self.assertIn(result[1][0], dim_names) + self.assertIn(result[2][0], dim_names) + + +class Test_interpolate(tests.IrisTest): + def _result_cube_metadata(self, res_cube): + dim_names = [c.name() for c in res_cube.dim_coords] + named_dims = [res_cube.coord_dims(c)[0] for c in res_cube.dim_coords] + anon_dims = list(set(range(res_cube.ndim)) - set(named_dims)) + anon_dims = None if not len(anon_dims) else anon_dims + return dim_names, named_dims, anon_dims + + def test_cube__simple_3d(self): + # Test that an 'index' coord is added to the resultant cube. + cube = simple_3d() + waypoints = [{'latitude': 40, 'longitude': 40}, + {'latitude': 0, 'longitude': 0}] + sample_count = 3 + new_coord_name = 'index' + trajectory = Trajectory(waypoints, sample_count=sample_count) + result = trajectory.interpolate(cube) + + dim_names, named_dims, anon_dims = self._result_cube_metadata(result) + new_coord = result.coord(new_coord_name) + exp_named_dims = [0, 1] + + self.assertEqual(result.ndim, cube.ndim - 1) + self.assertIn(new_coord_name, dim_names) + self.assertEqual(named_dims, exp_named_dims) + self.assertIsNone(anon_dims) + self.assertEqual(len(new_coord.points), sample_count) + + def test_cube__anon_dim(self): + cube = simple_4d_with_hybrid_height() + cube.remove_coord('model_level_number') # Make cube dim 1 anonymous. + waypoints = [{'grid_latitude': 21, 'grid_longitude': 31}, + {'grid_latitude': 23, 'grid_longitude': 33}] + sample_count = 4 + new_coord_name = 'index' + trajectory = Trajectory(waypoints, sample_count=sample_count) + result = trajectory.interpolate(cube) + + dim_names, named_dims, anon_dims = self._result_cube_metadata(result) + new_coord = result.coord(new_coord_name) + exp_named_dims = [0, 2] + exp_anon_dims = [1] + + self.assertEqual(result.ndim, cube.ndim - 1) + self.assertIn(new_coord_name, dim_names) + self.assertEqual(named_dims, exp_named_dims) + self.assertEqual(anon_dims, exp_anon_dims) + self.assertEqual(len(new_coord.points), sample_count) + + def test_call(self): + # Test that :func:`iris.analysis.trajectory.interpolate` is called by + # `Trajectory.interpolate`. + cube = simple_3d() + to_patch = 'iris.analysis.trajectory.interpolate' + waypoints = [{'latitude': 40, 'longitude': 40}, + {'latitude': 0, 'longitude': 0}] + sample_count = 3 + trajectory = Trajectory(waypoints, sample_count=sample_count) + + with mock.patch(to_patch, return_value=cube) as mock_interpolate: + trajectory.interpolate(cube) + mock_interpolate.assert_called_once() + + if __name__ == "__main__": tests.main()