✨ decouple stride + support setpoints #74

Merged (5 commits) on Aug 6, 2022
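
This PR decouples the stride from the FeatureDescriptor and adds support for setpoints: as the updated tests below show, the stride may now be omitted from a descriptor and passed to FeatureCollection.calculate instead, or calculate can be given explicit setpoints at which the windows are placed. A minimal usage sketch, assuming the tsflex API exactly as exercised in these tests; the eda series, its 1 Hz sampling, and the exact output column name are illustrative assumptions, not taken from this PR:

import numpy as np
import pandas as pd
from tsflex.features import FeatureCollection, FeatureDescriptor

# Hypothetical 1 Hz "EDA" series with a datetime index (illustrative data only).
eda = pd.Series(
    np.random.default_rng(0).normal(size=600),
    index=pd.date_range("2022-08-06", periods=600, freq="1s"),
    name="EDA",
)

# The stride is no longer required on the FeatureDescriptor ...
fc = FeatureCollection(FeatureDescriptor(np.sum, series_name="EDA", window="10s"))

# ... it can be supplied at calculation time instead,
res_strided = fc.calculate(eda, stride="5s", return_df=True)

# or the windows can be placed at explicit setpoints
# (with window_idx="begin", each setpoint is the start of a window).
setpoints = eda.index[[0, 5, 7, 10]].values
res_setpoints = fc.calculate(eda, setpoints=setpoints, window_idx="begin", return_df=True)

print(res_strided.columns[0])  # e.g. "EDA__sum__w=10s" -> the stride is no longer part of the name
print(len(res_setpoints))      # 4: one output row per setpoint

Passing both a stride and setpoints to calculate is rejected, as the last test added to the first file checks.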
166 changes: 129 additions & 37 deletions tests/test_features_feature_collection.py
@@ -51,6 +51,48 @@ def test_single_series_feature_collection(dummy_data):
assert all(res_df.index[1:] - res_df.index[:-1] == pd.to_timedelta(5, unit="s"))


def test_single_series_feature_collection_strides(dummy_data):
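    # a stride may be set on the FeatureDescriptor or passed to calculate(); both should yield identical results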
stride = "5s"
fd1 = FeatureDescriptor(np.sum, series_name="EDA", window="10s")
fd2 = FeatureDescriptor(np.sum, series_name="EDA", window="10s", stride='20s')
fd3 = FeatureDescriptor(np.sum, series_name="EDA", window="10s", stride=stride)
fc1 = FeatureCollection(feature_descriptors=fd1)
fc2 = FeatureCollection(feature_descriptors=fd2)
fc3 = FeatureCollection(feature_descriptors=fd3)

assert fc1.get_required_series() == fc2.get_required_series()
assert fc1.get_required_series() == fc3.get_required_series()

res1 = fc1.calculate(dummy_data, stride=stride, return_df=False, n_jobs=1)
res2 = fc2.calculate(dummy_data, stride=stride, return_df=False, n_jobs=1)
res3 = fc3.calculate(dummy_data, return_df=False, n_jobs=1)

assert (len(res1) == 1) & (len(res2) == 1) & (len(res3) == 1)

assert_frame_equal(res1[0], res2[0])
assert_frame_equal(res1[0], res3[0])


def test_single_series_feature_collection_setpoints(dummy_data):
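    # setpoints are explicit window positions (window starts here, as window_idx="begin");
    # a stride set on the FeatureDescriptor is ignored when setpoints are passed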
setpoints = [0, 5, 7, 10]
setpoints = dummy_data.index[setpoints].values
fd1 = FeatureDescriptor(np.sum, series_name="EDA", window="10s")
fd2 = FeatureDescriptor(np.sum, series_name="EDA", window="10s", stride='20s')
fc1 = FeatureCollection(feature_descriptors=fd1)
fc2 = FeatureCollection(feature_descriptors=fd2)

assert fc1.get_required_series() == fc2.get_required_series()

res1 = fc1.calculate(dummy_data, setpoints=setpoints, window_idx="begin")
res2 = fc2.calculate(dummy_data, setpoints=setpoints, window_idx="begin")

assert (len(res1) == 1) & (len(res2) == 1)
assert (len(res1[0]) == 4) & (len(res2[0]) == 4)

assert_frame_equal(res1[0], res2[0])
assert all(res1[0].index.values == setpoints)


def test_uneven_sampled_series_feature_collection(dummy_data):
fd = FeatureDescriptor(
function=np.sum,
@@ -145,15 +187,15 @@ def corr(s1, s2):
fc_str: str = fc.__repr__()
assert "EDA|TMP" in fc_str
assert (
fc_str
== "EDA|TMP: (\n\twin: 30s , stride: 30s: [\n\t\tFeatureDescriptor - func: FuncWrapper(corr, ['corrcoef'], {}),\n\t]\n)\n"
fc_str
== "EDA|TMP: (\n\twin: 30s : [\n\t\tFeatureDescriptor - func: FuncWrapper(corr, ['corrcoef'], {}) stride: ['30s'],\n\t]\n)\n"
)

out = fc.calculate(dummy_data, n_jobs=1, return_df=True)
assert out.columns[0] == "EDA|TMP__corrcoef__w=30s_s=30s"
assert out.columns[0] == "EDA|TMP__corrcoef__w=30s"

out = fc.calculate(dummy_data, n_jobs=None, return_df=True)
assert out.columns[0] == "EDA|TMP__corrcoef__w=30s_s=30s"
assert out.columns[0] == "EDA|TMP__corrcoef__w=30s"


def test_window_idx_single_series_feature_collection(dummy_data):
@@ -228,12 +270,12 @@ def sum_func(sig: np.ndarray) -> float:
assert set(res_list_names) == set(res_df.columns)
expected_output_names = [
[
f"{sig}__sum_func__w=5s_s=2.5s",
f"{sig}__sum_func__w=7.5s_s=2.5s",
f"{sig}__amax__w=5s_s=2.5s",
f"{sig}__amax__w=7.5s_s=2.5s",
f"{sig}__amin__w=5s_s=2.5s",
f"{sig}__amin__w=7.5s_s=2.5s",
f"{sig}__sum_func__w=5s",
f"{sig}__sum_func__w=7.5s",
f"{sig}__amax__w=5s",
f"{sig}__amax__w=7.5s",
f"{sig}__amin__w=5s",
f"{sig}__amin__w=7.5s",
]
for sig in ["EDA", "TMP"]
]
@@ -269,6 +311,26 @@ def sum_func(sig: np.ndarray) -> float:
assert len(res_df) == expected_length


def test_multiplefeaturedescriptors_feature_collection_strides(dummy_data):
stride = "2.5s"
mfd1 = MultipleFeatureDescriptors([np.max, np.min], ["EDA", "TMP"], ["5s", "7.5s"])
mfd2 = MultipleFeatureDescriptors([np.max, np.min], ["EDA", "TMP"], ["5s", "7.5s"], strides=["5s"])  # TODO: also support a list of strides, e.g. ["5s", "10s"]
mfd3 = MultipleFeatureDescriptors([np.max, np.min], ["EDA", "TMP"], ["5s", "7.5s"], strides=stride)
fc1 = FeatureCollection(mfd1)
fc2 = FeatureCollection(mfd2)
fc3 = FeatureCollection(mfd3)

assert fc1.get_required_series() == fc2.get_required_series()
assert fc1.get_required_series() == fc3.get_required_series()

res1 = fc1.calculate(dummy_data, stride=stride, return_df=True, n_jobs=0)
res2 = fc2.calculate(dummy_data, stride=stride, return_df=True, n_jobs=0)
res3 = fc3.calculate(dummy_data, return_df=True, n_jobs=0)

assert_frame_equal(res1, res2)
assert_frame_equal(res1, res3)


def test_featurecollection_feature_collection(dummy_data):
fd = FeatureDescriptor(
function=np.sum,
@@ -493,9 +555,9 @@ def quantiles(sig: pd.Series) -> Tuple[float, float, float]:
(int(len(dummy_data) / (1 / freq)) - window_s) / stride_s)

expected_output_names = [
"EDA__q_0.1__w=5s_s=2.5s",
"EDA__q_0.5__w=5s_s=2.5s",
"EDA__q_0.9__w=5s_s=2.5s",
"EDA__q_0.1__w=5s",
"EDA__q_0.5__w=5s",
"EDA__q_0.9__w=5s",
]
assert set(res_df.columns.values) == set(expected_output_names)
assert (res_df[expected_output_names[0]] != res_df[expected_output_names[1]]).any()
@@ -522,7 +584,7 @@ def abs_mean_diff(sig1: pd.Series, sig2: pd.Series) -> float:
assert len(res_df) == math.ceil(
(int(len(dummy_data) / (1 / freq)) - window_s) / stride_s)

expected_output_name = "EDA|TMP__abs_mean_diff__w=5s_s=2.5s"
expected_output_name = "EDA|TMP__abs_mean_diff__w=5s"
assert res_df.columns.values[0] == expected_output_name


@@ -552,9 +614,9 @@ def quantiles_abs_diff(
(int(len(dummy_data) / (1 / freq)) - window_s) / stride_s)

expected_output_names = [
"EDA|TMP__q_0.1_abs_diff__w=5s_s=13.5s",
"EDA|TMP__q_0.5_abs_diff__w=5s_s=13.5s",
"EDA|TMP__q_0.9_abs_diff__w=5s_s=13.5s",
"EDA|TMP__q_0.1_abs_diff__w=5s",
"EDA|TMP__q_0.5_abs_diff__w=5s",
"EDA|TMP__q_0.9_abs_diff__w=5s",
]
assert set(res_df.columns.values) == set(expected_output_names)
assert (res_df[expected_output_names[0]] != res_df[expected_output_names[1]]).any()
@@ -680,18 +742,13 @@ def linear_trend_timewise(x):
(int(len(dummy_data) / downscale_factor / (1 / freq)) - window_s) / stride_s
)

expected_output_names = [
"EDA|TMP__q_0.1_abs_diff__w=5s_s=2.5s",
"EDA|TMP__q_0.5_abs_diff__w=5s_s=2.5s",
"EDA|TMP__q_0.9_abs_diff__w=5s_s=2.5s",
]
assert "EDA__min_time_diff__w=5s_s=2.5s" in res_df.columns
assert "EDA__amax__w=5s_s=2.5s" in res_df.columns
assert "EDA__min_time_diff__w=5s" in res_df.columns
assert "EDA__amax__w=5s" in res_df.columns
assert all(
res_df["EDA__min_time_diff__w=5s_s=2.5s"]
== res_df["EDA__max_time_diff__w=5s_s=2.5s"]
res_df["EDA__min_time_diff__w=5s"]
== res_df["EDA__max_time_diff__w=5s"]
)
assert all(res_df["EDA__min_time_diff__w=5s_s=2.5s"] == 0.25 * 3)
assert all(res_df["EDA__min_time_diff__w=5s"] == 0.25 * 3)


def test_categorical_funcs():
@@ -730,7 +787,7 @@ def count_categories(arr, categories):
data=categorical_data, approve_sparsity=True, n_jobs=n_jobs, return_df=True
)
for c in categories:
assert f"cat__count-{str(c)}__w=1D_s=12h" in out.columns
assert f"cat__count-{str(c)}__w=1D" in out.columns


def test_time_based_features():
@@ -770,12 +827,12 @@ def std_hour(time_arr):
out = fc.calculate(
data=time_value_series, approve_sparsity=True, n_jobs=1, return_df=True
)
assert out.columns[0] == "time__std_hour__w=6h_s=4h"
assert out.columns[0] == "time__std_hour__w=6h"

out = fc.calculate(
data=time_value_series, approve_sparsity=True, n_jobs=None, return_df=True
)
assert out.columns[0] == "time__std_hour__w=6h_s=4h"
assert out.columns[0] == "time__std_hour__w=6h"


def test_pass_by_value(dummy_data):
@@ -939,10 +996,9 @@ def sum_mean(x, axis):
res = fc.calculate(dummy_data, return_df=True)

assert res.shape[1] == 4
s = "EDA__";
p = "__w=1000_s=300"
assert np.all(res[s + "sum" + p].values == res[s + "sum_vect" + p].values)
assert np.all(res[s + "mean" + p].values == res[s + "mean_vect" + p].values)
s = "EDA__"; p = "__w=1000"
assert np.all(res[s+"sum"+p].values == res[s+"sum_vect"+p].values)
assert np.all(res[s+"mean"+p].values == res[s+"mean_vect"+p].values)


def test_multiple_inputs_vectorized_features(dummy_data):
@@ -964,9 +1020,9 @@ def windowed_diff(x1, x2):

assert res.shape[1] == 3
assert res.shape[0] > 1
p = "__w=5m_s=2m30s"
manual_diff = res["EDA__sum" + p].values - res["TMP__sum" + p].values
assert np.all(res["EDA|TMP__windowed_diff" + p].values == manual_diff)
p = "__w=5m"
manual_diff = res["EDA__sum"+p].values - res["TMP__sum"+p].values
assert np.all(res["EDA|TMP__windowed_diff"+p].values == manual_diff)


### Test feature extraction length
@@ -1104,6 +1160,15 @@ def test_type_error_add_feature_collection(dummy_data):
fc.add(np.sum)


def test_error_add_feature_collection_same_func_window(dummy_data):
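    # adding a second descriptor with the same function and window (only the stride differs) should raise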
fd = FeatureDescriptor(np.sum, "EDA", window="5s", stride="2.5s")
fc = FeatureCollection(feature_descriptors=fd)
fd2 = FeatureDescriptor(np.sum, "EDA", window="5s", stride="8s")

with pytest.raises(Exception):
fc.add(fd2)


def test_one_to_many_error_feature_collection(dummy_data):
def quantiles(sig: pd.Series) -> Tuple[float, float, float]:
return np.quantile(sig, q=[0.1, 0.5, 0.9])
Expand Down Expand Up @@ -1357,3 +1422,30 @@ def test_vectorized_irregularly_sampled_data(dummy_data):
# -> is a strict requirement to apply a vectorized feature function
with pytest.raises(Exception):
fc.calculate(df_eda)


def test_vectorized_multiple_asynchronous_strides(dummy_data):
fc = FeatureCollection(
feature_descriptors=FeatureDescriptor(
FuncWrapper(np.std, vectorized=True, axis=1),
"EDA", window="5min", stride=["3s", "5s"]
)
)

df_eda = dummy_data["EDA"].dropna()

# Fails because of multiple asynchronous strides (resulting in different step sizes between the windows)
# -> is a strict requirement to apply a vectorized feature function
with pytest.raises(Exception):
fc.calculate(df_eda)


def test_error_pass_stride_and_setpoints_calculate(dummy_data):
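    # stride and setpoints cannot both be passed to calculate(); this should raise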
setpoints = [0, 5, 7, 10]
setpoints = dummy_data.index[setpoints].values
fc = FeatureCollection(
FeatureDescriptor(np.min, "EDA", window="5min", stride="3min")
)

with pytest.raises(Exception):
fc.calculate(dummy_data["EDA"], stride="3min", setpoints=setpoints)
45 changes: 40 additions & 5 deletions tests/test_features_feature_descriptor.py
@@ -8,6 +8,7 @@

from tsflex.features import FuncWrapper
from tsflex.features import FeatureDescriptor, MultipleFeatureDescriptors
from tsflex.utils.data import flatten

### FeatureDescriptor

@@ -24,7 +25,7 @@ def sum_func(sig: np.ndarray) -> float:

assert fd.series_name == tuple(["EDA"])
assert fd.window == pd.Timedelta(5, unit='seconds')
assert fd.stride == pd.Timedelta(2.5, unit='seconds')
assert fd.stride == [pd.Timedelta(2.5, unit='seconds')]
assert fd.get_required_series() == ["EDA"]
assert isinstance(fd.function, FuncWrapper)

@@ -39,11 +40,45 @@ def test_simple_raw_np_func_feature_descriptor():

assert fd.series_name == tuple(["EDA"])
assert fd.window == pd.Timedelta(5, unit='seconds')
assert fd.stride == pd.Timedelta(2.5, unit='seconds')
assert fd.stride == [pd.Timedelta(2.5, unit='seconds')]
assert fd.get_required_series() == ["EDA"]
assert isinstance(fd.function, FuncWrapper)


def test_simple_feature_descriptor_optional_stride():
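    # stride is now optional on the FeatureDescriptor (it defaults to None)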
def sum_func(sig: np.ndarray) -> float:
return sum(sig)

fd = FeatureDescriptor(
function=sum_func,
series_name="EDA",
window='5s',
)

assert fd.series_name == tuple(["EDA"])
assert fd.window == pd.Timedelta(5, unit='seconds')
assert fd.stride == None
assert fd.get_required_series() == ["EDA"]
assert isinstance(fd.function, FuncWrapper)


def test_simple_feature_descriptor_multiple_strides():
def sum_func(sig: np.ndarray) -> float:
return sum(sig)

fd = FeatureDescriptor(
function=sum_func,
series_name="EDA",
window='5s',
stride=['3s', '5s'],
)

assert fd.series_name == tuple(["EDA"])
assert fd.window == pd.Timedelta(5, unit='seconds')
assert fd.stride == [pd.Timedelta(3, unit='seconds'), pd.Timedelta(5, unit='seconds')]
assert fd.get_required_series() == ["EDA"]
assert isinstance(fd.function, FuncWrapper)

# TODO -> add a new test in which floats represent the (index-based) position

def test_simple_feature_descriptor_str_str_seconds():
@@ -59,7 +94,7 @@ def sum_func(sig: np.ndarray) -> float:

assert fd.series_name == tuple(["EDA"])
assert fd.window == pd.Timedelta(5, unit='seconds')
assert fd.stride == pd.Timedelta(3, unit='seconds')
assert fd.stride == [pd.Timedelta(3, unit='seconds')]
assert fd.get_required_series() == ["EDA"]
assert isinstance(fd.function, FuncWrapper)

@@ -79,7 +114,7 @@ def sum_func(sig: np.ndarray) -> float:

assert fd.series_name == tuple(["EDA"])
assert fd.window == pd.Timedelta(5, unit='seconds')
assert fd.stride == pd.Timedelta(2.5, unit='seconds')
assert fd.stride == [pd.Timedelta(2.5, unit='seconds')]
assert fd.get_required_series() == ["EDA"]
assert isinstance(fd.function, FuncWrapper)

@@ -145,7 +180,7 @@ def sum_func(sig: np.ndarray) -> float:
assert sum([el == pd.Timedelta(seconds=5) for el in windows]) == 3*2
assert sum([el == pd.Timedelta(seconds=7.5) for el in windows]) == 3*2

strides = [fd.stride for fd in mfd.feature_descriptions]
strides = flatten([fd.stride for fd in mfd.feature_descriptions])
assert (set(strides) == set([pd.Timedelta(seconds=2.5)]))

functions = [fd.function for fd in mfd.feature_descriptions]