Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(kernelcpd): exhaustive test for kernelcpd #108

Merged
merged 9 commits into from
Feb 1, 2021
3 changes: 3 additions & 0 deletions src/ruptures/detection/binseg.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from ruptures.base import BaseCost, BaseEstimator
from ruptures.costs import cost_factory
from ruptures.utils import pairwise
import numpy as np


class Binseg(BaseEstimator):
Expand Down Expand Up @@ -84,6 +85,8 @@ def _seg(self, n_bkps=None, pen=None, epsilon=None):
def _single_bkp(self, start, end):
"""Return the optimal breakpoint of [start:end] (if it exists)."""
segment_cost = self.cost.error(start, end)
if np.isinf(segment_cost) and segment_cost < 0: # if constant on segment
return None, 0
gain_list = list()
for bkp in range(start, end, self.jump):
if bkp - start > self.min_size and end - bkp > self.min_size:
Expand Down
8 changes: 1 addition & 7 deletions src/ruptures/detection/kernelcpd.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from ruptures.costs import cost_factory
from ruptures.utils import from_path_matrix_to_bkps_list, sanity_check

# from ruptures.detection._detection.ekcpd import (ekcpd_cosine, ekcpd_Gaussian,
from ._detection.ekcpd import (
ekcpd_cosine,
ekcpd_Gaussian,
Expand All @@ -14,11 +13,6 @@
ekcpd_pelt_L2,
)

# from ..utils._utils.convert_path_matrix import from_path_matrix_to_bkps_list


# from ruptures.utils._utils.convert_path_matrix import from_path_matrix_to_bkps_list


class KernelCPD(BaseEstimator):

Expand All @@ -43,7 +37,7 @@ def __init__(self, kernel="linear", min_size=2, jump=1, params=None):
- `cosine`: $k(x,y)= (x^T y)/(\|x\|\|y\|)$.

Args:
kernel (str, optional): name of the kernel, ["linear", "rbf"]
kernel (str, optional): name of the kernel, ["linear", "rbf", "cosine"]
min_size (int, optional): minimum segment length.
jump (int, optional): not considered, set to 1.
params (dict, optional): a dictionary of parameters for the kernel instance
Expand Down
6 changes: 5 additions & 1 deletion src/ruptures/detection/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def _seg(self, n_bkps=None, pen=None, epsilon=None):
# peak search
# forcing order to be above one in case jump is too large (issue #16)
order = max(max(self.width, 2 * self.min_size) // (2 * self.jump), 1)
(peak_inds_shifted,) = argrelmax(self.score, order=order, mode="wrap")
peak_inds_shifted = argrelmax(self.score, order=order, mode="wrap")[0]

if peak_inds_shifted.size == 0: # no peaks if the score is constant
return bkps
Expand Down Expand Up @@ -129,6 +129,10 @@ def fit(self, signal) -> "Window":
for k in self.inds:
start, end = k - self.width // 2, k + self.width // 2
gain = self.cost.error(start, end)
if np.isinf(gain) and gain < 0:
# segment is constant, so no improvement is possible on start .. end
score.append(0)
continue
gain -= self.cost.error(start, k) + self.cost.error(k, end)
score.append(gain)
self.score = np.array(score)
Expand Down
3 changes: 3 additions & 0 deletions src/ruptures/utils/bnode.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Binary node."""
import functools
import numpy as np


@functools.total_ordering
Expand All @@ -25,6 +26,8 @@ def gain(self):
return 0
elif abs(self.val) < 1e-8:
return 0
elif np.isinf(self.val) and self.val < 0:
return 0
return self.val - (self.left.val + self.right.val)

def __lt__(self, other):
Expand Down
124 changes: 97 additions & 27 deletions tests/test_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def signal_bkps_1D_constant():

@pytest.mark.parametrize("algo", [Binseg, BottomUp, Dynp, Pelt, Window])
def test_empty(signal_bkps_1D, algo):
signal, bkps = signal_bkps_1D
signal, _ = signal_bkps_1D
algo().fit(signal).predict(1)
algo().fit_predict(signal, 1)

Expand All @@ -50,18 +50,24 @@ def test_empty(signal_bkps_1D, algo):
product([Binseg, BottomUp, Window], ["l1", "l2", "ar", "normal", "rbf", "rank"]),
)
def test_model_1D(signal_bkps_1D, algo, model):
signal, bkps = signal_bkps_1D
algo().fit_predict(signal, pen=1)
algo().fit_predict(signal, n_bkps=1)
algo().fit_predict(signal, epsilon=10)
signal, _ = signal_bkps_1D
algo(model=model).fit_predict(signal, pen=1)
ret = algo(model=model).fit_predict(signal, n_bkps=1)
assert len(ret) == 2
assert ret[-1] == signal.shape[0]
algo(model=model).fit_predict(signal, epsilon=10)


@pytest.mark.parametrize(
"algo, model", product([Dynp, Pelt], ["l1", "l2", "ar", "normal", "rbf", "rank"])
)
def test_model_1D_bis(signal_bkps_1D, algo, model):
signal, bkps = signal_bkps_1D
algo().fit_predict(signal, 1)
algo_t = algo(model=model)
ret = algo_t.fit_predict(signal, 1)
if isinstance(algo_t, Dynp):
assert len(ret) == 2
assert ret[-1] == signal.shape[0]


@pytest.mark.parametrize(
Expand All @@ -73,7 +79,28 @@ def test_model_1D_bis(signal_bkps_1D, algo, model):
)
def test_model_1D_constant(signal_bkps_1D_constant, algo, model):
signal, bkps = signal_bkps_1D_constant
algo().fit_predict(signal, 1)
algo_t = algo(model=model)
ret = algo_t.fit_predict(signal, 1)
if isinstance(algo_t, Dynp) or isinstance(algo_t, BottomUp):
# With a constant signal, these search methods
# return one additional breakpoint alongside signal.shape[0]
assert len(ret) == 2
if isinstance(algo_t, Binseg):
if model == "normal":
# With a constant signal, this search method with the normal cost
# returns signal.shape[0] as its only breakpoint
assert len(ret) == 1
else:
# With a constant signal, this search method with any other cost
# returns one additional breakpoint alongside signal.shape[0]
assert len(ret) == 2
if isinstance(algo_t, Window):
# With a constant signal, this search method
# returns signal.shape[0] as its only breakpoint
assert len(ret) == 1
if isinstance(algo_t, Pelt):
assert len(ret) <= 2
assert ret[-1] == signal.shape[0]


@pytest.mark.parametrize(
Expand All @@ -83,32 +110,46 @@ def test_model_1D_constant(signal_bkps_1D_constant, algo, model):
),
)
def test_model_5D(signal_bkps_5D, algo, model):
signal, bkps = signal_bkps_5D
algo().fit_predict(signal, pen=1)
algo().fit_predict(signal, n_bkps=1)
algo().fit_predict(signal, epsilon=10)
signal, _ = signal_bkps_5D
algo(model=model).fit_predict(signal, pen=1)
ret = algo(model=model).fit_predict(signal, n_bkps=1)
assert len(ret) == 2
algo(model=model).fit_predict(signal, epsilon=10)


@pytest.mark.parametrize(
"algo, model",
product([Dynp, Pelt], ["l1", "l2", "linear", "normal", "rbf", "rank"]),
)
def test_model_5D_bis(signal_bkps_5D, algo, model):
signal, bkps = signal_bkps_5D
algo().fit_predict(signal, 1)
signal, _ = signal_bkps_5D
algo_t = algo(model=model)
ret = algo_t.fit_predict(signal, 1)
if isinstance(algo_t, Dynp):
assert len(ret) == 2


@pytest.mark.parametrize("algo", [Binseg, BottomUp, Window, Dynp, Pelt])
def test_custom_cost(signal_bkps_1D, algo):
signal, bkps = signal_bkps_1D
signal, _ = signal_bkps_1D
c = CostAR(order=10)
algo(custom_cost=c).fit_predict(signal, 1)
algo_t = algo(custom_cost=c)
ret = algo_t.fit_predict(signal, 1)
if isinstance(algo_t, Pelt):
assert len(ret) >= 2
else:
assert len(ret) == 2


@pytest.mark.parametrize("algo", [Binseg, BottomUp, Window, Dynp, Pelt])
def test_pass_param_to_cost(signal_bkps_1D, algo):
signal, bkps = signal_bkps_1D
algo(model="ar", params={"order": 10}).fit_predict(signal, 1)
signal, _ = signal_bkps_1D
algo_t = algo(model="ar", params={"order": 10})
ret = algo_t.fit_predict(signal, 1)
if isinstance(algo_t, Pelt):
assert len(ret) >= 2
else:
assert len(ret) == 2


@pytest.mark.parametrize(
Expand All @@ -117,9 +158,12 @@ def test_pass_param_to_cost(signal_bkps_1D, algo):
)
def test_cython_dynp_1D_linear(signal_bkps_1D, algo, kernel, min_size):
signal, bkps = signal_bkps_1D
algo(kernel=kernel, min_size=min_size, jump=1).fit(signal).predict(
n_bkps=len(bkps) - 1
ret = (
algo(kernel=kernel, min_size=min_size, jump=1)
.fit(signal)
.predict(n_bkps=len(bkps) - 1)
)
assert len(ret) == len(bkps)


@pytest.mark.parametrize(
Expand All @@ -128,9 +172,12 @@ def test_cython_dynp_1D_linear(signal_bkps_1D, algo, kernel, min_size):
)
def test_cython_dynp_5D_linear(signal_bkps_5D, algo, kernel, min_size):
signal, bkps = signal_bkps_5D
algo(kernel=kernel, min_size=min_size, jump=1).fit(signal).predict(
n_bkps=len(bkps) - 1
ret = (
algo(kernel=kernel, min_size=min_size, jump=1)
.fit(signal)
.predict(n_bkps=len(bkps) - 1)
)
assert len(ret) == len(bkps)


@pytest.mark.parametrize(
Expand All @@ -139,9 +186,12 @@ def test_cython_dynp_5D_linear(signal_bkps_5D, algo, kernel, min_size):
)
def test_cython_dynp_1D_rbf(signal_bkps_1D, algo, kernel, min_size):
signal, bkps = signal_bkps_1D
algo(kernel=kernel, min_size=min_size, jump=1, params={"gamma": 1.5}).fit(
signal
).predict(n_bkps=len(bkps) - 1)
ret = (
algo(kernel=kernel, min_size=min_size, jump=1, params={"gamma": 1.5})
.fit(signal)
.predict(n_bkps=len(bkps) - 1)
)
assert len(ret) == len(bkps)


@pytest.mark.parametrize(
Expand All @@ -150,9 +200,12 @@ def test_cython_dynp_1D_rbf(signal_bkps_1D, algo, kernel, min_size):
)
def test_cython_dynp_5D_rbf(signal_bkps_5D, algo, kernel, min_size):
signal, bkps = signal_bkps_5D
algo(kernel=kernel, min_size=min_size, jump=1, params={"gamma": 1.5}).fit(
signal
).predict(n_bkps=len(bkps) - 1)
ret = (
algo(kernel=kernel, min_size=min_size, jump=1, params={"gamma": 1.5})
.fit(signal)
.predict(n_bkps=len(bkps) - 1)
)
assert len(ret) == len(bkps)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -213,3 +266,20 @@ def test_cython_dynp_5D_no_noise_rbf(signal_bkps_5D_no_noise, algo, kernel, min_
.predict(n_bkps=len(bkps) - 1)
)
assert res == bkps


# Exhaustive test of KernelCPD
@pytest.mark.parametrize(
    "algo, kernel",
    product([KernelCPD], ["linear", "rbf", "cosine"]),
)
def test_kernelcpd(signal_bkps_5D, algo, kernel):
    """Exercise each KernelCPD prediction path for every listed kernel.

    Covers three call patterns: `fit(...).predict(n_bkps=...)` followed by a
    second `predict` on the same fitted instance, `fit(...).predict(pen=...)`,
    and `fit_predict(..., pen=...)`.

    For the fixed-`n_bkps` calls, the length of the returned list is checked,
    in line with the other KernelCPD tests in this module (the returned list
    is expected to hold `n_bkps` change points plus the final index).
    """
    signal, bkps = signal_bkps_5D
    n_bkps = len(bkps) - 1

    # Test we do not recompute if intermediary results exist: the second
    # `predict` call reuses the already-fitted instance with a smaller n_bkps.
    # NOTE(review): the reuse itself is not observable from this test; only
    # the shape of the returned segmentations is asserted.
    algo_temp = algo(kernel=kernel)
    ret = algo_temp.fit(signal).predict(n_bkps=n_bkps)
    assert len(ret) == len(bkps)
    ret = algo_temp.predict(n_bkps=1)
    assert len(ret) == 2

    # Test penalized version (number of change points is data-dependent,
    # so only successful execution is checked).
    algo(kernel=kernel).fit(signal).predict(pen=0.2)

    # Test fit_predict
    algo(kernel=kernel).fit_predict(signal, pen=0.2)