Skip to content

Optimize TSDataset.describe and TSDataset.info by vectorization #1344

Merged
merged 5 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Speed up metrics computation by optimizing segment validation, forbid NaNs during metrics computation ([#1338](https://github.com/tinkoff-ai/etna/pull/1338))
- Unify errors, warnings and checks in models ([#1312](https://github.com/tinkoff-ai/etna/pull/1312))
- Remove upper limitation on version of numba ([#1321](https://github.com/tinkoff-ai/etna/pull/1321))
- Optimize `TSDataset.describe` and `TSDataset.info` by vectorization ([#1344](https://github.com/tinkoff-ai/etna/pull/1344))

### Fixed
- Pipeline ensembles fail in `etna forecast` CLI ([#1331](https://github.com/tinkoff-ai/etna/pull/1331))
Expand Down
52 changes: 32 additions & 20 deletions etna/datasets/tsdataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1320,25 +1320,37 @@ def _gather_common_data(self) -> Dict[str, Any]:

return common_dict

def _gather_segments_data(self, segments: Optional[Sequence[str]]) -> Dict[str, pd.Series]:
    """Gather information about each segment.

    The statistics are computed for the ``"target"`` column of every requested
    segment in a vectorized way, without iterating over segments.

    Parameters
    ----------
    segments:
        Segments to gather information about; if ``None``, ``self.segments`` is used.

    Returns
    -------
    :
        Dictionary with series indexed by segment:

        * ``"start_timestamp"``: timestamp of the first non-NaN target value;
        * ``"end_timestamp"``: timestamp of the last non-NaN target value;
        * ``"length"``: number of timestamps between start and end (inclusive);
        * ``"num_missing"``: number of NaN target values between start and end.

        For a segment whose target is entirely NaN all four values are missing
        (``pd.NaT`` for timestamps, ``pd.NA`` for counts).
    """
    segments_index: Union[slice, Sequence[str]]
    if segments is None:
        # select every segment with a plain slice and label the result with self.segments
        segments_index = slice(None)
        segments = self.segments
    else:
        segments_index = segments

    df = self.df.loc[:, (segments_index, "target")]

    num_timestamps = df.shape[0]
    not_na = ~np.isnan(df.values)
    # argmax over a boolean array returns the position of the first True in each column
    min_idx = np.argmax(not_na, axis=0)
    # the same trick on the row-reversed array gives the position of the last True
    max_idx = num_timestamps - np.argmax(not_na[::-1, :], axis=0) - 1

    segments_dict: Dict[str, pd.Series] = {}
    segments_dict["start_timestamp"] = df.index[min_idx].to_series(index=segments)
    segments_dict["end_timestamp"] = df.index[max_idx].to_series(index=segments)
    segments_dict["length"] = pd.Series(max_idx - min_idx + 1, dtype="Int64", index=segments)
    segments_dict["num_missing"] = pd.Series(
        segments_dict["length"] - np.sum(not_na, axis=0), dtype="Int64", index=segments
    )

    # handle all-nans series: argmax yields 0 for a column without any True,
    # so the values computed above are meaningless for such segments
    all_nans_mask = np.all(~not_na, axis=0)
    segments_dict["start_timestamp"][all_nans_mask] = pd.NaT
    segments_dict["end_timestamp"][all_nans_mask] = pd.NaT
    segments_dict["length"][all_nans_mask] = pd.NA
    segments_dict["num_missing"][all_nans_mask] = pd.NA

    return segments_dict

Expand Down Expand Up @@ -1400,15 +1412,15 @@ def describe(self, segments: Optional[Sequence[str]] = None) -> pd.DataFrame:
segment_0 2021-06-01 2021-06-30 30 0 2 1 1 1 D
segment_1 2021-06-01 2021-06-30 30 0 2 1 1 1 D
"""
if segments is None:
segments = self.segments

# gather common information
common_dict = self._gather_common_data()

# gather segment information
segments_dict = self._gather_segments_data(segments)

if segments is None:
segments = self.segments

# combine information
segments_dict["num_segments"] = [common_dict["num_segments"]] * len(segments)
segments_dict["num_exogs"] = [common_dict["num_exogs"]] * len(segments)
Expand Down
69 changes: 43 additions & 26 deletions tests/test_datasets/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,31 @@ def df_and_regressors() -> Tuple[pd.DataFrame, pd.DataFrame, List[str]]:
return df, df_exog, ["regressor_1", "regressor_2"]


@pytest.fixture
def ts_info() -> TSDataset:
    """Create a dataset for testing ``info``/``describe`` related methods.

    The dataset has three segments: "1" covers the whole range, "2" starts five
    days later and "3" has a target consisting only of NaNs. Two regressors,
    both known in the future, are attached as exogenous data.
    """
    timestamp = pd.date_range("2021-01-01", "2021-02-01")
    df_1 = pd.DataFrame({"timestamp": timestamp, "target": 11, "segment": "1"})
    df_2 = pd.DataFrame({"timestamp": timestamp[5:], "target": 12, "segment": "2"})
    # np.nan is used instead of the np.NaN alias, which was removed in NumPy 2.0
    df_3 = pd.DataFrame({"timestamp": timestamp, "target": np.nan, "segment": "3"})
    df = pd.concat([df_1, df_2, df_3], ignore_index=True)
    df = TSDataset.to_dataset(df)

    timestamp = pd.date_range("2020-12-01", "2021-02-11")
    df_1 = pd.DataFrame({"timestamp": timestamp, "regressor_1": 1, "regressor_2": 2, "segment": "1"})
    df_2 = pd.DataFrame({"timestamp": timestamp[5:], "regressor_1": 3, "regressor_2": 4, "segment": "2"})
    df_3 = pd.DataFrame({"timestamp": timestamp, "regressor_1": 5, "regressor_2": 6, "segment": "3"})
    df_exog = pd.concat([df_1, df_2, df_3], ignore_index=True)
    df_exog = TSDataset.to_dataset(df_exog)

    # add NaN in the middle
    df.iloc[-5, 0] = np.nan
    # add NaNs at the end
    df.iloc[-3:, 1] = np.nan

    ts = TSDataset(df=df, df_exog=df_exog, freq="D", known_future=["regressor_1", "regressor_2"])
    return ts


@pytest.fixture
def df_update_add_column() -> pd.DataFrame:
timestamp = pd.date_range("2021-01-01", "2021-02-12")
Expand Down Expand Up @@ -848,61 +873,53 @@ def test_fit_transform_raise_warning_on_diff_endings(ts_diff_endings):
ts_diff_endings.fit_transform([])


def test_gather_common_data(ts_info):
    """Check that TSDataset._gather_common_data correctly finds common data for info/describe methods."""
    common_data = ts_info._gather_common_data()
    # the ts_info fixture is built from three segments and two known-future regressors
    assert common_data["num_segments"] == 3
    assert common_data["num_exogs"] == 2
    assert common_data["num_regressors"] == 2
    assert common_data["num_known_future"] == 2
    assert common_data["freq"] == "D"


def test_gather_segments_data(ts_info):
    """Check that TSDataset._gather_segments_data correctly finds segment data for info/describe methods."""
    segments_dict = ts_info._gather_segments_data(ts_info.segments)
    segment_df = pd.DataFrame(segments_dict, index=ts_info.segments)

    # segment "3" has an all-NaN target, so every statistic for it is expected to be missing
    assert segment_df.loc["1", "start_timestamp"] == pd.Timestamp("2021-01-01")
    assert segment_df.loc["2", "start_timestamp"] == pd.Timestamp("2021-01-06")
    assert segment_df.loc["3", "start_timestamp"] is pd.NaT
    assert segment_df.loc["1", "end_timestamp"] == pd.Timestamp("2021-02-01")
    assert segment_df.loc["2", "end_timestamp"] == pd.Timestamp("2021-01-29")
    assert segment_df.loc["3", "end_timestamp"] is pd.NaT
    assert segment_df.loc["1", "length"] == 32
    assert segment_df.loc["2", "length"] == 24
    assert segment_df.loc["3", "length"] is pd.NA
    assert segment_df.loc["1", "num_missing"] == 1
    assert segment_df.loc["2", "num_missing"] == 0
    assert segment_df.loc["3", "num_missing"] is pd.NA


def test_describe(df_and_regressors):
def test_describe(ts_info):
"""Check that TSDataset.describe works correctly."""
df, df_exog, known_future = df_and_regressors
# add NaN in the middle
df.iloc[-5, 0] = np.NaN
# add NaNs at the end
df.iloc[-3:, 1] = np.NaN
ts = TSDataset(df=df, df_exog=df_exog, freq="D", known_future=known_future)
description = ts.describe()
description = ts_info.describe()

assert np.all(description.index == ts.segments)
assert np.all(description.index == ts_info.segments)
assert description.loc["1", "start_timestamp"] == pd.Timestamp("2021-01-01")
assert description.loc["2", "start_timestamp"] == pd.Timestamp("2021-01-06")
assert description.loc["3", "start_timestamp"] is pd.NaT
assert description.loc["1", "end_timestamp"] == pd.Timestamp("2021-02-01")
assert description.loc["2", "end_timestamp"] == pd.Timestamp("2021-01-29")
assert description.loc["3", "end_timestamp"] is pd.NaT
assert description.loc["1", "length"] == 32
assert description.loc["2", "length"] == 24
assert description.loc["3", "length"] is pd.NA
assert description.loc["1", "num_missing"] == 1
assert description.loc["2", "num_missing"] == 0
assert np.all(description["num_segments"] == 2)
assert description.loc["3", "num_missing"] is pd.NA
assert np.all(description["num_segments"] == 3)
assert np.all(description["num_exogs"] == 2)
assert np.all(description["num_regressors"] == 2)
assert np.all(description["num_known_future"] == 2)
Expand Down