diff --git a/etna/models/nn/deepar.py b/etna/models/nn/deepar.py
new file mode 100644
index 000000000..c76afb4c2
--- /dev/null
+++ b/etna/models/nn/deepar.py
@@ -0,0 +1,140 @@
+from typing import List
+from typing import Optional
+from typing import Union
+
+import pandas as pd
+import pytorch_lightning as pl
+from pytorch_forecasting.data import TimeSeriesDataSet
+from pytorch_forecasting.models import DeepAR
+
+from etna.datasets.tsdataset import TSDataset
+from etna.models.base import Model
+
+
+class DeepARModel(Model):
+    """Wrapper for DeepAR from the PyTorch Forecasting library.
+    Notes
+    -----
+    We save the TimeSeriesDataSet in the transform instance and reuse it in the model.
+    This is not the intended pattern for using Transforms and TSDataset.
+    """
+
+    def __init__(
+        self,
+        batch_size: int = 64,
+        context_length: Optional[int] = None,
+        max_epochs: int = 10,
+        gpus: Union[int, List[int]] = 0,
+        gradient_clip_val: float = 0.1,
+        learning_rate: List[float] = [0.001],
+        cell_type: str = "LSTM",
+        hidden_size: int = 10,
+        rnn_layers: int = 2,
+        dropout: float = 0.1,
+    ):
+        """
+        Initialize DeepAR wrapper.
+
+        Parameters
+        ----------
+        batch_size:
+            Batch size.
+        context_length:
+            Max encoder length; if None, the max encoder length equals two horizons.
+        max_epochs:
+            Max number of training epochs.
+        gpus:
+            0 means CPU, or [n_i] to choose GPU number n_i from the cluster.
+        gradient_clip_val:
+            Gradient clipping by norm; set to 0 to disable clipping.
+        learning_rate:
+            Learning rate.
+        cell_type:
+            One of 'LSTM', 'GRU'.
+        hidden_size:
+            Hidden size of the network; typically ranges from 8 to 512.
+        rnn_layers:
+            Number of RNN layers.
+        dropout:
+            Dropout rate.
+        """
+        self.max_epochs = max_epochs
+        self.gpus = gpus
+        self.gradient_clip_val = gradient_clip_val
+        self.learning_rate = learning_rate
+        self.batch_size = batch_size
+        self.context_length = context_length
+        self.cell_type = cell_type
+        self.hidden_size = hidden_size
+        self.rnn_layers = rnn_layers
+        self.dropout = dropout
+
+    def _from_dataset(self, ts_dataset: TimeSeriesDataSet) -> DeepAR:
+        """
+        Construct DeepAR.
+
+        Returns
+        -------
+        DeepAR
+            Class instance.
+        """
+        return DeepAR.from_dataset(
+            ts_dataset,
+            learning_rate=self.learning_rate,
+            cell_type=self.cell_type,
+            hidden_size=self.hidden_size,
+            rnn_layers=self.rnn_layers,
+            dropout=self.dropout,
+        )
+
+    def fit(self, ts: TSDataset) -> "DeepARModel":
+        """
+        Fit model.
+
+        Parameters
+        ----------
+        ts:
+            TSDataset to fit.
+
+        Returns
+        -------
+        DeepARModel
+        """
+        self.model = self._from_dataset(ts.transforms[-1].pf_dataset_train)
+
+        self.trainer = pl.Trainer(
+            logger=False,
+            max_epochs=self.max_epochs,
+            gpus=self.gpus,
+            checkpoint_callback=False,
+            gradient_clip_val=self.gradient_clip_val,
+        )
+
+        train_dataloader = ts.transforms[-1].pf_dataset_train.to_dataloader(train=True, batch_size=self.batch_size)
+
+        self.trainer.fit(self.model, train_dataloader)
+
+        return self
+
+    def forecast(self, ts: TSDataset) -> TSDataset:
+        """
+        Predict future.
+
+        Parameters
+        ----------
+        ts:
+            TSDataset to forecast.
+
+        Returns
+        -------
+        TSDataset
+            TSDataset with predictions.
+        """
+        prediction_dataloader = ts.transforms[-1].pf_dataset_predict.to_dataloader(
+            train=False, batch_size=self.batch_size * 2
+        )
+
+        predicts = self.model.predict(prediction_dataloader).numpy()  # shape (segments, decoder_length)
+
+        ts.loc[:, pd.IndexSlice[:, "target"]] = predicts.T[-len(ts.df) :]
+        return ts
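
A minimal usage sketch for the wrapper above (illustrative, not part of the patch: the toy dataframe, horizon value, and transform settings are assumptions; PytorchForecastingTransform is added later in this diff and must be the last transform, since the model reads pf_dataset_train from ts.transforms[-1]):

    import pandas as pd

    from etna.datasets.tsdataset import TSDataset
    from etna.models.nn.deepar import DeepARModel
    from etna.transforms.pytorch_forecasting import PytorchForecastingTransform

    # Toy daily dataset with a single segment.
    df = pd.DataFrame(
        {
            "timestamp": pd.date_range("2021-01-01", periods=100, freq="1D"),
            "target": [float(i % 7) for i in range(100)],
            "segment": ["segment_1"] * 100,
        }
    )
    ts = TSDataset(TSDataset.to_dataset(df), "1d")

    horizon = 7
    pft = PytorchForecastingTransform(
        max_encoder_length=21,
        max_prediction_length=horizon,
        time_varying_known_reals=["time_idx"],
        time_varying_unknown_reals=["target"],
    )
    ts.fit_transform([pft])  # the transform builds and stores the TimeSeriesDataSet

    model = DeepARModel(max_epochs=5)
    future = ts.make_future(horizon)  # triggers the transform's prediction branch
    model.fit(ts)
    forecast = model.forecast(future)
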
+ """ + prediction_dataloader = ts.transforms[-1].pf_dataset_predict.to_dataloader( + train=False, batch_size=self.batch_size * 2 + ) + + predicts = self.model.predict(prediction_dataloader).numpy() # shape (segments, encoder_lenght) + + ts.loc[:, pd.IndexSlice[:, "target"]] = predicts.T[-len(ts.df) :] + return ts diff --git a/etna/models/nn/tft.py b/etna/models/nn/tft.py new file mode 100644 index 000000000..8c294f33b --- /dev/null +++ b/etna/models/nn/tft.py @@ -0,0 +1,148 @@ +from typing import List +from typing import Optional +from typing import Union + +import pandas as pd +import pytorch_lightning as pl +from pytorch_forecasting.data import TimeSeriesDataSet +from pytorch_forecasting.models import TemporalFusionTransformer + +from etna.datasets.tsdataset import TSDataset +from etna.models.base import Model + + +class TFTModel(Model): + """Wrapper for TemporalFusionTransformer from Pytorch Forecasting library. + Notes + ----- + We save TimeSeriesDataSet in instance to use it in the model. + It`s not right pattern of using Transforms and TSDataset. + """ + + def __init__( + self, + max_epochs: int = 10, + gpus: Union[int, List[int]] = 0, + gradient_clip_val: float = 0.1, + learning_rate: List[float] = [0.001], + batch_size: int = 64, + context_length: Optional[int] = None, + hidden_size: int = 16, + lstm_layers: int = 1, + attention_head_size: int = 4, + dropout: float = 0.1, + hidden_continuous_size: int = 8, + *args, + **kwargs, + ): + """ + Initialize TFT wrapper. + + Parameters + ---------- + batch_size: + Batch size. + context_length: + Max encoder length, if None max encoder length is equal to 2 horizons. + max_epochs: + Max epochs. + gpus: + 0 - is CPU, or [n_{i}] - to choose n_{i} GPU from cluster. + gradient_clip_val: + Cliping by norm is using, choose 0 to not clip. + learning_rate: + Learning rate. + hidden_size: + Hidden size of network which can range from 8 to 512. + lstm_layers: + Number of LSTM layers. + attention_head_size: + Number of attention heads. + dropout: + Dropout rate. + hidden_continuous_size: + Hidden size for processing continous variables. + """ + self.max_epochs = max_epochs + self.gpus = gpus + self.gradient_clip_val = gradient_clip_val + self.learning_rate = learning_rate + self.horizon = None + self.batch_size = batch_size + self.context_length = context_length + self.hidden_size = hidden_size + self.lstm_layers = lstm_layers + self.attention_head_size = attention_head_size + self.dropout = dropout + self.hidden_continuous_size = hidden_continuous_size + + def _from_dataset(self, ts_dataset: TimeSeriesDataSet) -> TemporalFusionTransformer: + """ + Construct TemporalFusionTransformer. + + Returns + ------- + TemporalFusionTransformer + Class instance. + """ + return TemporalFusionTransformer.from_dataset( + ts_dataset, + learning_rate=self.learning_rate, + hidden_size=self.hidden_size, + lstm_layers=self.lstm_layers, + attention_head_size=self.attention_head_size, + dropout=self.dropout, + hidden_continuous_size=self.hidden_continuous_size, + ) + + def fit(self, ts: TSDataset) -> "TFTModel": + """ + Fit model. + + Parameters + ---------- + ts: + TSDataset to fit. 
diff --git a/etna/transforms/pytorch_forecasting.py b/etna/transforms/pytorch_forecasting.py
new file mode 100644
index 000000000..5cab6c31d
--- /dev/null
+++ b/etna/transforms/pytorch_forecasting.py
@@ -0,0 +1,175 @@
+import inspect
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import pandas as pd
+from pytorch_forecasting.data import TimeSeriesDataSet
+from pytorch_forecasting.data.encoders import EncoderNormalizer
+from pytorch_forecasting.data.encoders import NaNLabelEncoder
+from pytorch_forecasting.data.encoders import TorchNormalizer
+from sklearn.preprocessing import RobustScaler
+from sklearn.preprocessing import StandardScaler
+
+from etna.datasets.tsdataset import TSDataset
+from etna.transforms.base import Transform
+
+NORMALIZER = Union[TorchNormalizer, NaNLabelEncoder, EncoderNormalizer]
+
+
+class PytorchForecastingTransform(Transform):
+    """Transform for models from the PyTorch Forecasting library."""
+
+    def __init__(
+        self,
+        max_encoder_length: int = 30,
+        min_encoder_length: Optional[int] = None,
+        min_prediction_idx: Optional[int] = None,
+        min_prediction_length: Optional[int] = None,
+        max_prediction_length: int = 1,
+        static_categoricals: List[str] = [],
+        static_reals: List[str] = [],
+        time_varying_known_categoricals: List[str] = [],
+        time_varying_known_reals: List[str] = [],
+        time_varying_unknown_categoricals: List[str] = [],
+        time_varying_unknown_reals: List[str] = [],
+        variable_groups: Dict[str, List[int]] = {},
+        dropout_categoricals: List[str] = [],
+        constant_fill_strategy: Dict[str, Union[str, float, int, bool]] = {},
+        allow_missings: bool = True,
+        lags: Dict[str, List[int]] = {},
+        add_relative_time_idx: bool = True,
+        add_target_scales: bool = True,
+        add_encoder_length: Union[bool, str] = True,
+        target_normalizer: Union[NORMALIZER, str, List[NORMALIZER], Tuple[NORMALIZER]] = "auto",
+        categorical_encoders: Optional[Dict[str, NaNLabelEncoder]] = None,
+        scalers: Dict[str, Union[StandardScaler, RobustScaler, TorchNormalizer, EncoderNormalizer]] = {},
+    ):
+        """Parameters for the TimeSeriesDataSet object.
+
+        References
+        ----------
+        https://github.com/jdb78/pytorch-forecasting/blob/v0.8.5/pytorch_forecasting/data/timeseries.py#L117
+        """
+        super().__init__()
+        self.max_encoder_length = max_encoder_length
+        self.min_encoder_length = min_encoder_length
+        self.min_prediction_idx = min_prediction_idx
+        self.min_prediction_length = min_prediction_length
+        self.max_prediction_length = max_prediction_length
+        self.static_categoricals = static_categoricals
+        self.static_reals = static_reals
+        self.time_varying_known_categoricals = time_varying_known_categoricals
+        self.time_varying_known_reals = time_varying_known_reals
+        self.time_varying_unknown_categoricals = time_varying_unknown_categoricals
+        self.time_varying_unknown_reals = time_varying_unknown_reals
+        self.variable_groups = variable_groups
+        self.add_relative_time_idx = add_relative_time_idx
+        self.add_target_scales = add_target_scales
+        self.add_encoder_length = add_encoder_length
+        self.allow_missings = allow_missings
+        self.target_normalizer = target_normalizer
+        self.categorical_encoders = categorical_encoders
+        self.dropout_categoricals = dropout_categoricals
+        self.constant_fill_strategy = constant_fill_strategy
+        self.lags = lags
+        self.scalers = scalers
+
+    def fit(self, df: pd.DataFrame) -> "PytorchForecastingTransform":
+        """
+        Fit TimeSeriesDataSet.
+
+        Parameters
+        ----------
+        df:
+            Data to be fitted.
+
+        Returns
+        -------
+        PytorchForecastingTransform
+        """
+        ts = TSDataset(df, "1d")
+        self.freq = ts.freq
+        ts = ts.to_pandas(flatten=True)
+        ts = ts.dropna()
+        self.min_timestamp = ts.timestamp.min()
+
+        if self.time_varying_known_categoricals:
+            for feature_name in self.time_varying_known_categoricals:
+                ts[feature_name] = ts[feature_name].astype(str)
+
+        ts["time_idx"] = ts["timestamp"] - self.min_timestamp
+        ts["time_idx"] = ts["time_idx"].apply(lambda x: x / self.freq)
+        ts["time_idx"] = ts["time_idx"].astype(int)
+
+        pf_dataset = TimeSeriesDataSet(
+            ts,
+            time_idx="time_idx",
+            target="target",
+            group_ids=["segment"],
+            time_varying_known_reals=self.time_varying_known_reals,
+            time_varying_known_categoricals=self.time_varying_known_categoricals,
+            time_varying_unknown_reals=self.time_varying_unknown_reals,
+            max_encoder_length=self.max_encoder_length,
+            max_prediction_length=self.max_prediction_length,
+            min_encoder_length=self.min_encoder_length,
+            min_prediction_length=self.min_prediction_length,
+            add_relative_time_idx=self.add_relative_time_idx,
+            add_target_scales=self.add_target_scales,
+            add_encoder_length=self.add_encoder_length,
+            allow_missings=self.allow_missings,
+            target_normalizer=self.target_normalizer,
+            static_categoricals=self.static_categoricals,
+            min_prediction_idx=self.min_prediction_idx,
+            variable_groups=self.variable_groups,
+            dropout_categoricals=self.dropout_categoricals,
+            constant_fill_strategy=self.constant_fill_strategy,
+            lags=self.lags,
+            scalers=self.scalers,
+        )
+
+        self.pf_dataset_params = pf_dataset.get_parameters()
+
+        return self
+
+    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
+        """
+        Transform raw df to TimeSeriesDataSet.
+
+        Parameters
+        ----------
+        df:
+            Data to be transformed.
+
+        Returns
+        -------
+        DataFrame
+
+        Notes
+        -----
+        We save the TimeSeriesDataSet in the transform instance and reuse it in the model.
+        This is not the intended pattern for using Transforms and TSDataset.
+        """
+        ts = TSDataset(df, "1d")
+        ts = ts.to_pandas(flatten=True)
+        ts = ts[ts.timestamp >= self.min_timestamp]
+        ts = ts.fillna(0)
+
+        ts["time_idx"] = ts["timestamp"] - self.min_timestamp
+        ts["time_idx"] = ts["time_idx"].apply(lambda x: x / self.freq)
+        ts["time_idx"] = ts["time_idx"].astype(int)
+        if self.time_varying_known_categoricals:
+            for feature_name in self.time_varying_known_categoricals:
+                ts[feature_name] = ts[feature_name].astype(str)
+
+        if inspect.stack()[1].function == "make_future":  # called from TSDataset.make_future
+            pf_dataset_predict = TimeSeriesDataSet.from_parameters(
+                self.pf_dataset_params, ts, predict=True, stop_randomization=True
+            )
+            self.pf_dataset_predict = pf_dataset_predict
+        else:
+            pf_dataset_train = TimeSeriesDataSet.from_parameters(self.pf_dataset_params, ts)
+            self.pf_dataset_train = pf_dataset_train
+        return df
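
To illustrate the time_idx computation used in both fit and transform above: timestamps are mapped to consecutive integer steps relative to the earliest training timestamp, which is the representation TimeSeriesDataSet expects. A standalone sketch of the same arithmetic (the dates and freq are illustrative; pd.Timedelta is used here for clarity, while the transform divides by the freq string directly):

    import pandas as pd

    freq = "1d"
    min_timestamp = pd.Timestamp("2020-01-03")
    timestamps = pd.Series(pd.date_range(min_timestamp, periods=5, freq=freq))

    # Elapsed time divided by the step size gives the integer position.
    time_idx = ((timestamps - min_timestamp) / pd.Timedelta(freq)).astype(int)
    print(time_idx.tolist())  # [0, 1, 2, 3, 4]
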
diff --git a/tests/test_models/nn/conftest.py b/tests/test_models/nn/conftest.py
new file mode 100644
index 000000000..2ef2f7ee7
--- /dev/null
+++ b/tests/test_models/nn/conftest.py
@@ -0,0 +1,17 @@
+import pandas as pd
+import pytest
+
+
+@pytest.fixture
+def weekly_period_df(n_repeats=15):
+    segment_1 = [7.0, 7.0, 7.0, 7.0, 7.0, 3.0, 1.0]
+    segment_2 = [7.0, 7.0, 7.0, 4.0, 1.0, 7.0, 7.0]
+    ts_range = list(pd.date_range("2020-01-03", freq="1D", periods=n_repeats * len(segment_1)))
+    df = pd.DataFrame(
+        {
+            "timestamp": ts_range * 2,
+            "target": segment_1 * n_repeats + segment_2 * n_repeats,
+            "segment": ["segment_1"] * n_repeats * len(segment_1) + ["segment_2"] * n_repeats * len(segment_2),
+        }
+    )
+    return df
+ """ + ts = TSDataset(df, "1d") + ts = ts.to_pandas(flatten=True) + ts = ts[ts.timestamp >= self.min_timestamp] + ts = ts.fillna(0) + + ts["time_idx"] = ts["timestamp"] - self.min_timestamp + ts["time_idx"] = ts["time_idx"].apply(lambda x: x / self.freq) + ts["time_idx"] = ts["time_idx"].astype(int) + if self.time_varying_known_categoricals: + for feature_name in self.time_varying_known_categoricals: + ts[feature_name] = ts[feature_name].astype(str) + + if inspect.stack()[1].function == "make_future": + pf_dataset_predict = TimeSeriesDataSet.from_parameters( + self.pf_dataset_params, ts, predict=True, stop_randomization=True + ) + self.pf_dataset_predict = pf_dataset_predict + else: + pf_dataset_train = TimeSeriesDataSet.from_parameters(self.pf_dataset_params, ts) + self.pf_dataset_train = pf_dataset_train + return df diff --git a/tests/test_models/nn/conftest.py b/tests/test_models/nn/conftest.py new file mode 100644 index 000000000..2ef2f7ee7 --- /dev/null +++ b/tests/test_models/nn/conftest.py @@ -0,0 +1,17 @@ +import pandas as pd +import pytest + + +@pytest.fixture +def weekly_period_df(n_repeats=15): + segment_1 = [7.0, 7.0, 7.0, 7.0, 7.0, 3.0, 1.0] + segment_2 = [7.0, 7.0, 7.0, 4.0, 1.0, 7.0, 7.0] + ts_range = list(pd.date_range("2020-01-03", freq="1D", periods=n_repeats * len(segment_1))) + df = pd.DataFrame( + { + "timestamp": ts_range * 2, + "target": segment_1 * n_repeats + segment_2 * n_repeats, + "segment": ["segment_1"] * n_repeats * len(segment_1) + ["segment_2"] * n_repeats * len(segment_2), + } + ) + return df diff --git a/tests/test_models/nn/test_deepar.py b/tests/test_models/nn/test_deepar.py new file mode 100644 index 000000000..31ee3c6b7 --- /dev/null +++ b/tests/test_models/nn/test_deepar.py @@ -0,0 +1,55 @@ +import random + +import numpy as np +import pytest +import torch +from pytorch_forecasting.data import GroupNormalizer + +from etna.datasets.tsdataset import TSDataset +from etna.metrics import MAE +from etna.models.nn.deepar import DeepARModel +from etna.transforms.datetime_flags import DateFlagsTransform +from etna.transforms.pytorch_forecasting import PytorchForecastingTransform + + +@pytest.mark.long +@pytest.mark.parametrize("horizon", [8, 21]) +def test_deepar_model_run_weekly_overfit(weekly_period_df, horizon): + """ + Given: I have dataframe with 2 segments with weekly seasonality with known future + When: + Then: I get {horizon} periods per dataset as a forecast and they "the same" as past + """ + SEED = 121 # noqa: N806 + torch.manual_seed(SEED) + random.seed(SEED) + np.random.seed(SEED) + + ts_start = sorted(set(weekly_period_df.timestamp))[-horizon] + train, test = ( + weekly_period_df[lambda x: x.timestamp < ts_start], + weekly_period_df[lambda x: x.timestamp >= ts_start], + ) + + ts_train = TSDataset(TSDataset.to_dataset(train), "1d") + ts_test = TSDataset(TSDataset.to_dataset(test), "1d") + dft = DateFlagsTransform(day_number_in_week=True, day_number_in_month=False) + pft = PytorchForecastingTransform( + max_encoder_length=21, + max_prediction_length=horizon, + time_varying_known_reals=["time_idx"], + time_varying_known_categoricals=["day_number_in_week"], + time_varying_unknown_reals=["target"], + target_normalizer=GroupNormalizer(groups=["segment"]), + ) + + ts_train.fit_transform([dft, pft]) + + tftmodel = DeepARModel(max_epochs=300, learning_rate=[0.1]) + ts_pred = ts_train.make_future(horizon) + tftmodel.fit(ts_train) + ts_pred = tftmodel.forecast(ts_pred) + + mae = MAE("macro") + + assert mae(ts_test, ts_pred) < 0.2207 diff --git 
diff --git a/tests/test_models/nn/test_tft.py b/tests/test_models/nn/test_tft.py
new file mode 100644
index 000000000..8a31947bc
--- /dev/null
+++ b/tests/test_models/nn/test_tft.py
@@ -0,0 +1,56 @@
+import random
+
+import numpy as np
+import pytest
+import torch
+
+from etna.datasets.tsdataset import TSDataset
+from etna.metrics import MAE
+from etna.models.nn.tft import TFTModel
+from etna.transforms.datetime_flags import DateFlagsTransform
+from etna.transforms.pytorch_forecasting import PytorchForecastingTransform
+
+
+@pytest.mark.long
+@pytest.mark.parametrize("horizon", [8, 21])
+def test_tft_model_run_weekly_overfit(weekly_period_df, horizon):
+    """
+    Given: a dataframe with 2 segments with weekly seasonality and known future
+    When: the model is fit on train and used to forecast
+    Then: I get {horizon} periods per segment as a forecast and they are "the same" as the past
+    """
+
+    SEED = 121  # noqa: N806
+    torch.manual_seed(SEED)
+    random.seed(SEED)
+    np.random.seed(SEED)
+
+    ts_start = sorted(set(weekly_period_df.timestamp))[-horizon]
+    train, test = (
+        weekly_period_df[lambda x: x.timestamp < ts_start],
+        weekly_period_df[lambda x: x.timestamp >= ts_start],
+    )
+
+    ts_train = TSDataset(TSDataset.to_dataset(train), "1d")
+    ts_test = TSDataset(TSDataset.to_dataset(test), "1d")
+    dft = DateFlagsTransform(day_number_in_week=True, day_number_in_month=False)
+    pft = PytorchForecastingTransform(
+        max_encoder_length=21,
+        min_encoder_length=21,
+        max_prediction_length=horizon,
+        time_varying_known_reals=["time_idx"],
+        time_varying_known_categoricals=["day_number_in_week"],
+        time_varying_unknown_reals=["target"],
+        static_categoricals=["segment"],
+        target_normalizer=None,
+    )
+
+    ts_train.fit_transform([dft, pft])
+
+    tftmodel = TFTModel(max_epochs=300, learning_rate=[0.1])
+    ts_pred = ts_train.make_future(horizon)
+    tftmodel.fit(ts_train)
+    ts_pred = tftmodel.forecast(ts_pred)
+
+    mae = MAE("macro")
+    assert mae(ts_test, ts_pred) < 0.23