Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add utilities for vector magnitude and normalisation #243

Merged
merged 13 commits into from
Aug 16, 2024
34 changes: 13 additions & 21 deletions examples/compute_kinematics.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@
# Imports
# -------

import numpy as np

# For interactive plots: install ipympl with `pip install ipympl` and uncomment
# the following line in your notebook
# %matplotlib widget
from matplotlib import pyplot as plt

from movement import sample_data
from movement.utils.vector import compute_norm

# %%
# Load sample dataset
Expand Down Expand Up @@ -255,13 +254,12 @@
# mouse along its trajectory.

# length of each displacement vector
displacement_vectors_lengths = np.linalg.norm(
displacement.sel(individuals=mouse_name, space=["x", "y"]).squeeze(),
axis=1,
displacement_vectors_lengths = compute_norm(
displacement.sel(individuals=mouse_name)
)

# sum of all displacement vectors
total_displacement = np.sum(displacement_vectors_lengths, axis=0) # in pixels
# sum the lengths of all displacement vectors (in pixels)
total_displacement = displacement_vectors_lengths.sum(dim="time").values[0]

print(
f"The mouse {mouse_name}'s trajectory is {total_displacement:.2f} "
Expand Down Expand Up @@ -299,14 +297,12 @@
# uses second order central differences.

# %%
# We can also visualise the speed, as the norm of the velocity vector:
# We can also visualise the speed, as the magnitude (norm)
# of the velocity vector:
fig, axes = plt.subplots(3, 1, sharex=True, sharey=True)
for mouse_name, ax in zip(velocity.individuals.values, axes, strict=False):
# compute the norm of the velocity vector for one mouse
speed_one_mouse = np.linalg.norm(
velocity.sel(individuals=mouse_name, space=["x", "y"]).squeeze(),
axis=1,
)
# compute the magnitude of the velocity vector for one mouse
speed_one_mouse = compute_norm(velocity.sel(individuals=mouse_name))
# plot speed against time
ax.plot(speed_one_mouse)
ax.set_title(mouse_name)
Expand Down Expand Up @@ -379,16 +375,12 @@
fig.tight_layout()

# %%
# The norm of the acceleration vector is the magnitude of the
# acceleration.
# We can also represent this for each individual.
# The can also represent the magnitude (norm) of the acceleration vector
# for each individual:
fig, axes = plt.subplots(3, 1, sharex=True, sharey=True)
for mouse_name, ax in zip(accel.individuals.values, axes, strict=False):
# compute norm of the acceleration vector for one mouse
accel_one_mouse = np.linalg.norm(
accel.sel(individuals=mouse_name, space=["x", "y"]).squeeze(),
axis=1,
)
# compute magnitude of the acceleration vector for one mouse
accel_one_mouse = compute_norm(accel.sel(individuals=mouse_name))

# plot acceleration against time
ax.plot(accel_one_mouse)
Expand Down
102 changes: 96 additions & 6 deletions movement/utils/vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,93 @@
from movement.utils.logging import log_error


def compute_norm(data: xr.DataArray) -> xr.DataArray:
"""Compute the norm of the vectors along the spatial dimension.

The norm of a vector is its magnitude, also called Euclidean norm, 2-norm
or Euclidean length. Note that if the input data is expressed in polar
coordinates, the magnitude of a vector is the same as its radial coordinate
``rho``.

Parameters
----------
data : xarray.DataArray
The input data array containing either ``space`` or ``space_pol``
as a dimension.

Returns
-------
xarray.DataArray
A data array holding the norm of the input vectors.
Note that this output array has no spatial dimension but preserves
all other dimensions of the input data array (see Notes).

Notes
-----
If the input data array is a ``position`` array, this function will compute
the magnitude of the position vectors, for every individual and keypoint,
at every timestep. If the input data array is a ``shape`` array of a
bounding boxes dataset, it will compute the magnitude of the shape
vectors (i.e., the diagonal of the bounding box),
for every individual and at every timestep.


"""
if "space" in data.dims:
_validate_dimension_coordinates(data, {"space": ["x", "y"]})
return xr.apply_ufunc(
np.linalg.norm,
data,
input_core_dims=[["space"]],
kwargs={"axis": -1},
)
elif "space_pol" in data.dims:
_validate_dimension_coordinates(data, {"space_pol": ["rho", "phi"]})
return data.sel(space_pol="rho", drop=True)
else:
_raise_error_for_missing_spatial_dim()


def convert_to_unit(data: xr.DataArray) -> xr.DataArray:
"""Convert the vectors along the spatial dimension into unit vectors.

A unit vector is a vector pointing in the same direction as the original
vector but with norm = 1.

Parameters
----------
data : xarray.DataArray
The input data array containing either ``space`` or ``space_pol``
as a dimension.

Returns
-------
xarray.DataArray
A data array holding the unit vectors of the input data array
(all input dimensions are preserved).

Notes
-----
Note that the unit vector for the null vector is undefined, since the null
vector has 0 norm and no direction associated with it.

"""
if "space" in data.dims:
_validate_dimension_coordinates(data, {"space": ["x", "y"]})
return data / compute_norm(data)
elif "space_pol" in data.dims:
_validate_dimension_coordinates(data, {"space_pol": ["rho", "phi"]})
# Set both rho and phi values to NaN at null vectors (where rho = 0)
new_data = xr.where(data.sel(space_pol="rho") == 0, np.nan, data)
# Set the rho values to 1 for non-null vectors (phi is preserved)
new_data.loc[{"space_pol": "rho"}] = xr.where(
new_data.sel(space_pol="rho").isnull(), np.nan, 1
)
return new_data
else:
_raise_error_for_missing_spatial_dim()


def cart2pol(data: xr.DataArray) -> xr.DataArray:
"""Transform Cartesian coordinates to polar.

Expand All @@ -25,12 +112,7 @@ def cart2pol(data: xr.DataArray) -> xr.DataArray:

"""
_validate_dimension_coordinates(data, {"space": ["x", "y"]})
rho = xr.apply_ufunc(
np.linalg.norm,
data,
input_core_dims=[["space"]],
kwargs={"axis": -1},
)
rho = compute_norm(data)
phi = xr.apply_ufunc(
np.arctan2,
data.sel(space="y"),
Expand Down Expand Up @@ -122,3 +204,11 @@ def _validate_dimension_coordinates(
)
if error_message:
raise log_error(ValueError, error_message)


def _raise_error_for_missing_spatial_dim() -> None:
raise log_error(
ValueError,
"Input data array must contain either 'space' or 'space_pol' "
"as dimensions.",
)
67 changes: 67 additions & 0 deletions tests/test_unit/test_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,70 @@ def test_pol2cart(self, ds, expected_exception, request):
with expected_exception:
result = vector.pol2cart(ds.pol)
xr.testing.assert_allclose(result, ds.cart)

@pytest.mark.parametrize(
"ds, expected_exception",
[
("cart_pol_dataset", does_not_raise()),
("cart_pol_dataset_with_nan", does_not_raise()),
("cart_pol_dataset_missing_cart_dim", pytest.raises(ValueError)),
(
"cart_pol_dataset_missing_cart_coords",
pytest.raises(ValueError),
),
],
)
def test_compute_norm(self, ds, expected_exception, request):
"""Test vector norm computation with known values."""
ds = request.getfixturevalue(ds)
with expected_exception:
# validate the norm computation
result = vector.compute_norm(ds.cart)
expected = np.sqrt(
ds.cart.sel(space="x") ** 2 + ds.cart.sel(space="y") ** 2
)
xr.testing.assert_allclose(result, expected)

# result should be the same from Cartesian and polar coordinates
xr.testing.assert_allclose(result, vector.compute_norm(ds.pol))

# The result should only contain the time dimension.
assert result.dims == ("time",)

@pytest.mark.parametrize(
"ds, expected_exception",
[
("cart_pol_dataset", does_not_raise()),
("cart_pol_dataset_with_nan", does_not_raise()),
("cart_pol_dataset_missing_cart_dim", pytest.raises(ValueError)),
],
)
def test_convert_to_unit(self, ds, expected_exception, request):
"""Test conversion to unit vectors (normalisation)."""
ds = request.getfixturevalue(ds)
with expected_exception:
# normalise both the Cartesian and the polar data to unit vectors
unit_cart = vector.convert_to_unit(ds.cart)
unit_pol = vector.convert_to_unit(ds.pol)
# they should yield the same result, just in different coordinates
xr.testing.assert_allclose(unit_cart, vector.pol2cart(unit_pol))
xr.testing.assert_allclose(unit_pol, vector.cart2pol(unit_cart))

# since we established that polar vs Cartesian unit vectors are
# equivalent, it's enough to do other assertions on either one

# the normalised data should have the same dimensions as the input
assert unit_cart.dims == ds.cart.dims

# unit vector should be NaN if the input vector was null or NaN
is_null_vec = (ds.cart == 0).all("space") # null vec: x=0, y=0
is_nan_vec = ds.cart.isnull().any("space") # any NaN in x or y
expected_nan_idxs = is_null_vec | is_nan_vec
assert unit_cart.where(expected_nan_idxs).isnull().all()

# For non-NaN unit vectors in polar coordinates, the rho values
# should be 1 and the phi values should be the same as the input
expected_unit_pol = ds.pol.copy()
expected_unit_pol.loc[{"space_pol": "rho"}] = 1
expected_unit_pol = expected_unit_pol.where(~expected_nan_idxs)
xr.testing.assert_allclose(unit_pol, expected_unit_pol)
Loading