diff --git a/darts/models/forecasting/arima.py b/darts/models/forecasting/arima.py index 3d6aa4e338..8dd6cefd7b 100644 --- a/darts/models/forecasting/arima.py +++ b/darts/models/forecasting/arima.py @@ -16,13 +16,15 @@ from statsmodels.tsa.arima.model import ARIMA as staARIMA from darts.logging import get_logger -from darts.models.forecasting.forecasting_model import DualCovariatesForecastingModel +from darts.models.forecasting.forecasting_model import ( + TransferableDualCovariatesForecastingModel, +) from darts.timeseries import TimeSeries logger = get_logger(__name__) -class ARIMA(DualCovariatesForecastingModel): +class ARIMA(TransferableDualCovariatesForecastingModel): def __init__( self, p: int = 12, @@ -66,11 +68,14 @@ def __str__(self): return f"SARIMA{self.order}x{self.seasonal_order}" def _fit(self, series: TimeSeries, future_covariates: Optional[TimeSeries] = None): - super()._fit(series, future_covariates) + + # storing to restore the statsmodels model results object + self.training_historic_future_covariates = future_covariates + m = staARIMA( - self.training_series.values(), - exog=future_covariates.values() if future_covariates else None, + series.values(copy=False), + exog=future_covariates.values(copy=False) if future_covariates else None, order=self.order, seasonal_order=self.seasonal_order, trend=self.trend, @@ -82,6 +87,8 @@ def _fit(self, series: TimeSeries, future_covariates: Optional[TimeSeries] = Non def _predict( self, n: int, + series: Optional[TimeSeries] = None, + historic_future_covariates: Optional[TimeSeries] = None, future_covariates: Optional[TimeSeries] = None, num_samples: int = 1, ) -> TimeSeries: @@ -93,18 +100,43 @@ def _predict( "your model." ) - super()._predict(n, future_covariates, num_samples) + super()._predict( + n, series, historic_future_covariates, future_covariates, num_samples + ) + + # updating statsmodels results object state with the new ts and covariates + if series is not None: + self.model = self.model.apply( + series.values(copy=False), + exog=historic_future_covariates.values(copy=False) + if historic_future_covariates + else None, + ) if num_samples == 1: forecast = self.model.forecast( - steps=n, exog=future_covariates.values() if future_covariates else None + steps=n, + exog=future_covariates.values(copy=False) + if future_covariates + else None, ) else: forecast = self.model.simulate( nsimulations=n, repetitions=num_samples, initial_state=self.model.states.predicted[-1, :], - exog=future_covariates.values() if future_covariates else None, + exog=future_covariates.values(copy=False) + if future_covariates + else None, + ) + + # restoring statsmodels results object state + if series is not None: + self.model = self.model.apply( + self._orig_training_series.values(copy=False), + exog=self.training_historic_future_covariates.values(copy=False) + if self.training_historic_future_covariates + else None, ) return self._build_forecast_series(forecast) diff --git a/darts/models/forecasting/forecasting_model.py b/darts/models/forecasting/forecasting_model.py index 6b0732f292..fa903de005 100644 --- a/darts/models/forecasting/forecasting_model.py +++ b/darts/models/forecasting/forecasting_model.py @@ -1083,7 +1083,7 @@ class DualCovariatesForecastingModel(ForecastingModel, ABC): Among other things, it lets Darts forecasting models wrap around statsmodels models having a `future_covariates` parameter, which corresponds to future-known covariates. - All implementations have to implement the `fit()` and `predict()` methods defined below. + All implementations have to implement the `_fit()` and `_predict()` methods defined below. """ _expect_covariate = False @@ -1137,6 +1137,7 @@ def predict( n: int, future_covariates: Optional[TimeSeries] = None, num_samples: int = 1, + **kwargs, ) -> TimeSeries: """Forecasts values for `n` time steps after the end of the training series. @@ -1159,8 +1160,7 @@ def predict( TimeSeries, a single time series containing the `n` next points after then end of the training series. """ - if future_covariates is None: - super().predict(n, num_samples) + super().predict(n, num_samples) if self._expect_covariate and future_covariates is None: raise_log( @@ -1170,6 +1170,12 @@ def predict( ) ) + raise_if( + not self._expect_covariate and future_covariates is not None, + "The model has been trained without `future_covariates` variable, but the " + "`future_covariates` parameter provided to `predict()` is not None.", + ) + if future_covariates is not None: start = self.training_series.end_time() + self.training_series.freq @@ -1194,13 +1200,13 @@ def predict( ] raise_if_not( - len(future_covariates) == n and self._expect_covariate, + len(future_covariates) == n, invalid_time_span_error, logger, ) return self._predict( - n, future_covariates=future_covariates, num_samples=num_samples + n, future_covariates=future_covariates, num_samples=num_samples, **kwargs ) @abstractmethod @@ -1234,3 +1240,132 @@ def _predict_wrapper( return self.predict( n, future_covariates=future_covariates, num_samples=num_samples ) + + +class TransferableDualCovariatesForecastingModel(DualCovariatesForecastingModel, ABC): + """The base class for the forecasting models that are not global, but support future covariates, and can + additionally be applied to new data unrelated to the original series used for fitting the model. Currently, + all the derived classes wrap statsmodels models. + + All implementations have to implement the `_fit()`, `_predict()` methods. + """ + + def predict( + self, + n: int, + series: Optional[TimeSeries] = None, + future_covariates: Optional[TimeSeries] = None, + num_samples: int = 1, + **kwargs, + ) -> TimeSeries: + """If the `series` parameter is not set, forecasts values for `n` time steps after the end of the training + series. If some future covariates were specified during the training, they must also be specified here. + + If the `series` parameter is set, forecasts values for `n` time steps after the end of the new target + series. If some future covariates were specified during the training, they must also be specified here. + + Parameters + ---------- + n + Forecast horizon - the number of time steps after the end of the series for which to produce predictions. + series + Optionally, a new target series whose future values will be predicted. Defaults to `None`, meaning that the + model will forecast the future value of the training series. + future_covariates + The time series of future-known covariates which can be fed as input to the model. It must correspond to + the covariate time series that has been used with the :func:`fit()` method for training. + + If `series` is not set, it must contain at least the next `n` time steps/indices after the end of the + training target series. If `series` is set, it must contain at least the time steps/indices corresponding + to the new target series (historic future covariates), plus the next `n` time steps/indices after the end. + num_samples + Number of times a prediction is sampled from a probabilistic model. Should be left set to 1 + for deterministic models. + + Returns + ------- + TimeSeries, a single time series containing the `n` next points after then end of the training series. + """ + + if self._expect_covariate and future_covariates is None: + raise_log( + ValueError( + "The model has been trained with `future_covariates` variable. Some matching " + "`future_covariates` variables have to be provided to `predict()`." + ) + ) + + historic_future_covariates = None + + if series is not None and future_covariates: + raise_if_not( + future_covariates.start_time() <= series.start_time() + and future_covariates.end_time() >= series.end_time() + n * series.freq, + "The provided `future_covariates` related to the new target series must contain at least the same time" + "steps/indices as the target `series` + `n`.", + logger, + ) + # splitting the future covariates + ( + historic_future_covariates, + future_covariates, + ) = future_covariates.split_after(series.end_time()) + + # in case future covariate have more values on the left end side that we don't need + if not series.has_same_time_as(historic_future_covariates): + historic_future_covariates = historic_future_covariates.slice_intersect( + series + ) + + # DualCovariatesForecastingModel performs some checks on self.training_series. We temporary replace that with + # the new ts + if series is not None: + self._orig_training_series = self.training_series + self.training_series = series + + result = super().predict( + n=n, + series=series, + historic_future_covariates=historic_future_covariates, + future_covariates=future_covariates, + num_samples=num_samples, + **kwargs, + ) + + # restoring the original training ts + if series is not None: + self.training_series = self._orig_training_series + + return result + + @abstractmethod + def _predict( + self, + n: int, + series: Optional[TimeSeries] = None, + historic_future_covariates: Optional[TimeSeries] = None, + future_covariates: Optional[TimeSeries] = None, + num_samples: int = 1, + ) -> TimeSeries: + """Forecasts values for a certain number of time steps after the end of the series. + TransferableDualCovariatesForecastingModel must implement the predict logic in this method. + """ + pass + + def _predict_wrapper( + self, + n: int, + series: TimeSeries, + past_covariates: Optional[TimeSeries], + future_covariates: Optional[TimeSeries], + num_samples: int, + ) -> TimeSeries: + return self.predict( + n=n, + series=series, + future_covariates=future_covariates, + num_samples=num_samples, + ) + + def _supports_non_retrainable_historical_forecasts(self) -> bool: + return True diff --git a/darts/models/forecasting/varima.py b/darts/models/forecasting/varima.py index cb6df1a85f..5ff59ac7fb 100644 --- a/darts/models/forecasting/varima.py +++ b/darts/models/forecasting/varima.py @@ -16,13 +16,15 @@ from statsmodels.tsa.api import VARMAX as staVARMA from darts.logging import get_logger, raise_if -from darts.models.forecasting.forecasting_model import DualCovariatesForecastingModel +from darts.models.forecasting.forecasting_model import ( + TransferableDualCovariatesForecastingModel, +) from darts.timeseries import TimeSeries logger = get_logger(__name__) -class VARIMA(DualCovariatesForecastingModel): +class VARIMA(TransferableDualCovariatesForecastingModel): def __init__(self, p: int = 1, d: int = 0, q: int = 0, trend: Optional[str] = None): """VARIMA @@ -57,17 +59,24 @@ def __str__(self): return f"VARMA({self.p},{self.q})" return f"VARIMA({self.p},{self.d},{self.q})" - def fit(self, series: TimeSeries, future_covariates: Optional[TimeSeries] = None): - # for VARIMA we need to process target `series` before calling DualForecastingModels' fit() method - self._last_values = ( - series.last_values() - ) # needed for back-transformation when d=1 + def _differentiate_series(self, series: TimeSeries) -> TimeSeries: + """Differentiate the series self.d times""" for _ in range(self.d): series = TimeSeries.from_dataframe( df=series.pd_dataframe(copy=False).diff().dropna(), static_covariates=series.static_covariates, hierarchy=series.hierarchy, ) + return series + + def fit(self, series: TimeSeries, future_covariates: Optional[TimeSeries] = None): + # for VARIMA we need to process target `series` before calling TransferableDualCovariatesForecastingModel' + # fit() method + self._last_values = ( + series.last_values() + ) # needed for back-transformation when d=1 + + series = self._differentiate_series(series) super().fit(series, future_covariates) @@ -77,12 +86,13 @@ def _fit( self, series: TimeSeries, future_covariates: Optional[TimeSeries] = None ) -> None: super()._fit(series, future_covariates) - series = self.training_series - future_covariates = future_covariates.values() if future_covariates else None + + # storing to restore the statsmodels model results object + self.training_historic_future_covariates = future_covariates m = staVARMA( - endog=series.pd_dataframe(copy=False), - exog=future_covariates, + endog=series.values(copy=False), + exog=future_covariates.values(copy=False) if future_covariates else None, order=(self.p, self.q), trend=self.trend, ) @@ -92,27 +102,100 @@ def _fit( def _predict( self, n: int, + series: Optional[TimeSeries] = None, + historic_future_covariates: Optional[TimeSeries] = None, future_covariates: Optional[TimeSeries] = None, num_samples: int = 1, ) -> TimeSeries: - super()._predict(n, future_covariates, num_samples) - forecast = self.model.forecast( - steps=n, exog=future_covariates.values() if future_covariates else None + if num_samples > 1 and self.trend: + logger.warn( + "Trends are not well supported yet for getting probabilistic forecasts with ARIMA." + "If you run into issues, try calling fit() with num_samples=1 or removing the trend from" + "your model." + ) + + self._last_num_samples = num_samples + + super()._predict( + n, series, historic_future_covariates, future_covariates, num_samples ) + + if series is not None: + self._training_last_values = self._last_values + # store new _last_values of the new target series + self._last_values = ( + series.last_values() + ) # needed for back-transformation when d=1 + + series = self._differentiate_series(series) + + # if the series is differentiated, the new len will be = len - 1, we have to adjust the future covariates + if historic_future_covariates and self.d > 0: + historic_future_covariates = historic_future_covariates.slice_intersect( + series + ) + + # updating statsmodels results object state + + self.model = self.model.apply( + series.values(copy=False), + exog=historic_future_covariates.values(copy=False) + if historic_future_covariates + else None, + ) + + # forecast before restoring the training state + if num_samples == 1: + forecast = self.model.forecast( + steps=n, + exog=future_covariates.values(copy=False) + if future_covariates + else None, + ) + else: + forecast = self.model.simulate( + nsimulations=n, + repetitions=num_samples, + initial_state=self.model.states.predicted[-1, :], + exog=future_covariates.values(copy=False) + if future_covariates + else None, + ) + forecast = self._invert_transformation(forecast) + + # restoring statsmodels results object state and last values + if series is not None: + self.model = self.model.apply( + self._orig_training_series.values(copy=False), + exog=self.training_historic_future_covariates.values(copy=False) + if self.training_historic_future_covariates + else None, + ) + + self._last_values = self._training_last_values + return self._build_forecast_series(np.array(forecast)) def _invert_transformation(self, series_df: pd.DataFrame): if self.d == 0: return series_df - series_df = self._last_values + series_df.cumsum(axis=0) + if self._last_num_samples > 1: + series_df = np.tile( + self._last_values, (self._last_num_samples, 1) + ).T + series_df.cumsum(axis=0) + else: + series_df = self._last_values + series_df.cumsum(axis=0) return series_df @property def min_train_series_length(self) -> int: return 30 + def _is_probabilistic(self) -> bool: + return True + def _supports_range_index(self) -> bool: raise_if( self.trend and self.trend != "c", diff --git a/darts/tests/models/forecasting/test_local_forecasting_models.py b/darts/tests/models/forecasting/test_local_forecasting_models.py index 7992f70fae..8157369a4e 100644 --- a/darts/tests/models/forecasting/test_local_forecasting_models.py +++ b/darts/tests/models/forecasting/test_local_forecasting_models.py @@ -20,6 +20,9 @@ StatsForecastAutoARIMA, Theta, ) +from darts.models.forecasting.forecasting_model import ( + TransferableDualCovariatesForecastingModel, +) from darts.tests.base_test_class import DartsBaseTestClass from darts.timeseries import TimeSeries from darts.utils import timeseries_generation as tg @@ -224,3 +227,112 @@ def test_dummy_series(self): autoarima = AutoARIMA(trend="t") with self.assertRaises(ValueError): autoarima.fit(series=ts) + + def test_statsmodels_dual_models(self): + + # same tests, but VARIMA requires to work on a multivariate target series + UNIVARIATE = "univariate" + MULTIVARIATE = "multivariate" + + params = [ + (ARIMA, {}, UNIVARIATE), + (VARIMA, {"d": 0}, MULTIVARIATE), + (VARIMA, {"d": 1}, MULTIVARIATE), + ] + + for model_cls, kwargs, model_type in params: + pred_len = 5 + if model_type == MULTIVARIATE: + series1 = self.ts_ice_heater_train + series2 = self.ts_ice_heater_val + else: + series1 = self.ts_pass_train + series2 = self.ts_pass_val + + # creating covariates from series + noise + noise1 = tg.gaussian_timeseries(length=len(series1)) + noise2 = tg.gaussian_timeseries(length=len(series2)) + + for _ in range(1, series1.n_components): + noise1 = noise1.stack(tg.gaussian_timeseries(length=len(series1))) + noise2 = noise2.stack(tg.gaussian_timeseries(length=len(series2))) + + exog1 = series1 + noise1 + exog2 = series2 + noise2 + + exog1_longer = exog1.concatenate(exog1, ignore_time_axis=True) + exog2_longer = exog2.concatenate(exog2, ignore_time_axis=True) + + # shortening of pred_len so that exog are enough for the training series prediction + series1 = series1[:-pred_len] + series2 = series2[:-pred_len] + + # check runnability with different time series + model = model_cls(**kwargs) + model.fit(series1) + pred1 = model.predict(n=pred_len) + pred2 = model.predict(n=pred_len, series=series2) + + # check probabilistic forecast + n_samples = 3 + pred1 = model.predict(n=pred_len, num_samples=n_samples) + pred2 = model.predict(n=pred_len, series=series2, num_samples=n_samples) + + # check that the results with a second custom ts are different from the results given with the training ts + self.assertFalse(np.array_equal(pred1.values, pred2.values())) + + # check runnability with exogeneous variables + model = model_cls(**kwargs) + model.fit(series1, future_covariates=exog1) + pred1 = model.predict(n=pred_len, future_covariates=exog1) + pred2 = model.predict(n=pred_len, series=series2, future_covariates=exog2) + + self.assertFalse(np.array_equal(pred1.values(), pred2.values())) + + # check runnability with future covariates with extra time steps in the past compared to the target series + model = model_cls(**kwargs) + model.fit(series1, future_covariates=exog1_longer) + pred1 = model.predict(n=pred_len, future_covariates=exog1_longer) + pred2 = model.predict( + n=pred_len, series=series2, future_covariates=exog2_longer + ) + + # check error is raised if model expects covariates but those are not passed when predicting with new data + with self.assertRaises(ValueError): + model = model_cls(**kwargs) + model.fit(series1, future_covariates=exog1) + model.predict(n=pred_len, series=series2) + + # check error is raised if new future covariates are not wide enough for prediction (on the original series) + with self.assertRaises(ValueError): + model = model_cls(**kwargs) + model.fit(series1, future_covariates=exog1) + model.predict(n=pred_len, future_covariates=exog1[:-pred_len]) + + # check error is raised if new future covariates are not wide enough for prediction (on a new series) + with self.assertRaises(ValueError): + model = model_cls(**kwargs) + model.fit(series1, future_covariates=exog1) + model.predict( + n=pred_len, series=series2, future_covariates=exog2[:-pred_len] + ) + # and checking the case with unsufficient historic future covariates + with self.assertRaises(ValueError): + model = model_cls(**kwargs) + model.fit(series1, future_covariates=exog1) + model.predict( + n=pred_len, series=series2, future_covariates=exog2[pred_len:] + ) + + # verify that we can still forecast the original training series after predicting a new target series + model = model_cls(**kwargs) + model.fit(series1, future_covariates=exog1) + pred1 = model.predict(n=pred_len, future_covariates=exog1) + model.predict(n=pred_len, series=series2, future_covariates=exog2) + pred3 = model.predict(n=pred_len, future_covariates=exog1) + + self.assertTrue(np.array_equal(pred1.values(), pred3.values())) + + # check backtesting with retrain=False + model: TransferableDualCovariatesForecastingModel = model_cls(**kwargs) + model.backtest(series1, future_covariates=exog1, retrain=False)