Skip to content

Commit

Permalink
0.6.16 a58
Browse files Browse the repository at this point in the history
  • Loading branch information
winedarksea committed Oct 5, 2024
1 parent a495a73 commit aca39df
Show file tree
Hide file tree
Showing 5 changed files with 344 additions and 139 deletions.
12 changes: 12 additions & 0 deletions autots/evaluator/auto_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
SeasonalityMotif,
FFT,
BallTreeMultivariateMotif,
BasicLinearModel,
)
from autots.models.statsmodels import (
GLS,
Expand Down Expand Up @@ -710,6 +711,17 @@ def ModelMonster(
n_jobs=n_jobs,
**parameters,
)
elif model == 'BasicLinearModel':
return BasicLinearModel(
frequency=frequency,
prediction_interval=prediction_interval,
holiday_country=holiday_country,
random_seed=random_seed,
verbose=verbose,
forecast_length=forecast_length,
n_jobs=n_jobs,
**parameters,
)
elif model == "":
raise AttributeError(
("Model name is empty. Likely this means AutoTS has not been fit.")
Expand Down
284 changes: 284 additions & 0 deletions autots/models/basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
seasonal_independent_match,
date_part,
base_seasonalities,
create_changepoint_features,
changepoint_fcst_from_last_row,
random_datepart,
)
from autots.tools.probabilistic import Point_to_Probability, historic_quantile
from autots.tools.window_functions import (
Expand Down Expand Up @@ -3320,4 +3323,285 @@ def get_params(self):
"point_method": self.point_method,
"distance_metric": self.distance_metric,
"k": self.k,
"sample_fraction": self.sample_fraction,
}


class BasicLinearModel(ModelObject):
"""Ridge regression of seasonal + trend changepoint + constant + regressor.
Like a minimal version of Prophet or Cassandra.
Args:
name (str): String to identify class
frequency (str): String alias of datetime index frequency or else 'infer'
prediction_interval (float): Confidence interval for probabilistic forecast
regression_type (str): "User" or None. If used, will be as covariate. The ratio of num_series:num_regressor_series will largely determine the impact
window (int): length of forecast history to match on
point_method (int): how to summarize the nearest neighbors to generate the point forecast
"weighted_mean", "mean", "median", "midhinge"
distance_metric (str): all valid values for scipy cdist + "nan_euclidean" from sklearn
include_differenced (bool): True to have the distance metric result be an average of the distance on absolute values as well as differenced values
k (int): number of closest neighbors to consider
stride_size (int): how many obs to skip between each new window. Higher numbers will reduce the number of matching windows and make the model faster.
"""

def __init__(
self,
name: str = "BasicLinearModel",
frequency: str = 'infer',
prediction_interval: float = 0.9,
holiday_country: str = 'US',
random_seed: int = 2024,
verbose: int = 0,
regression_type: str = None,
datepart_method: str = "common_fourier",
changepoint_spacing: int = None,
changepoint_distance_end: int = None,
λ: float = 0.01,
**kwargs,
):
ModelObject.__init__(
self,
name,
frequency,
prediction_interval,
holiday_country=holiday_country,
random_seed=random_seed,
verbose=verbose,
regression_type=regression_type,
)
self.datepart_method = datepart_method
self.changepoint_spacing = changepoint_spacing
self.changepoint_distance_end = changepoint_distance_end
self.λ = λ

self.regressor_columns = []

def base_scaler(self, df):
self.scaler_mean = np.mean(df, axis=0)
self.scaler_std = np.std(df, axis=0).replace(0, 1)
return (df - self.scaler_mean) / self.scaler_std

def scale_data(self, df):
if self.scaling is not None:
if self.scaling == "BaseScaler":
df = self.base_scaler(df)
else:
df = self.scaler.transform(df)
if not isinstance(df, pd.DataFrame):
return pd.DataFrame(df, columns=df.columns, index=df.index)
else:
return df

def to_origin_space(
self, df, trans_method='forecast', components=False, bounds=False
):
"""Take transformed outputs back to original feature space."""
if self.scaling == "BaseScaler":
# return self.preprocesser.inverse_transform(df, trans_method=trans_method) * self.scaler_std + self.scaler_mean
if components:
return self.preprocesser.inverse_transform(
df * self.scaler_std, trans_method=trans_method, bounds=bounds
)
else:
return self.preprocesser.inverse_transform(
df * self.scaler_std + self.scaler_mean,
trans_method=trans_method,
bounds=bounds,
)

def create_x(self, df, future_regressor=None):
x_s = date_part(df.index, method=self.datepart_method)
x_t = create_changepoint_features(df.index, changepoint_spacing=self.changepoint_spacing, changepoint_distance_end=self.changepoint_distance_end)
self.last_row = x_t.iloc[-1]
X = pd.concat([x_s, x_t], axis=1)
if str(self.regression_type).lower() == "user" and future_regressor is not None:
temp = future_regressor.reindex(df.index).rename(columns=lambda x: "regr_" + str(x))
self.regressor_columns = temp.columns
X = pd.concat(X, temp, axis=1)
X["constant"] = 1

self.seasonal_columns = x_s.columns.tolist()
self.trend_columns = x_t.columns.tolist()
return X

def fit(self, df, future_regressor=None):
"""Train algorithm given data supplied.
Args:
df (pandas.DataFrame): Datetime Indexed
"""
df = self.basic_profile(df)
self.df = df
if self.changepoint_spacing is None or self.changepoint_distance_end is None:
half_yr_spacing = int(df.shape[0] / ((df.index.max().year - df.index.min().year + 1) * 2))
if self.changepoint_spacing is None:
self.changepoint_spacing = half_yr_spacing
if self.changepoint_spacing is None:
self.changepoint_distance_end = int(half_yr_spacing / 2)
if str(self.regression_type).lower() == "user":
if future_regressor is None:
raise ValueError(
"regression_type=='User' but no future_regressor supplied"
)
X = self.create_x(df, future_regressor)

# Convert X and df (Y) to NumPy arrays for linear regression
X_values = X.to_numpy().astype(float)
Y_values = df.to_numpy().astype(float)

if self.λ is not None:
I = np.eye(X_values.shape[1])
# Perform Ridge regression using the modified normal equation
self.beta = np.linalg.inv(X_values.T @ X_values + self.λ * I) @ X_values.T @ Y_values
else:
# Perform linear regression using the normal equation: (X.T @ X)^(-1) @ X.T @ Y
self.beta = np.linalg.pinv(X_values.T @ X_values) @ X_values.T @ Y_values

# Calculate predicted values for Y
Y_pred = X_values @ self.beta

# Calculate residuals for each column of Y
residuals = Y_values - Y_pred

# Calculate the sum of squared errors (SSE) and standard error (sigma) for each column of Y
sse = np.sum(residuals ** 2, axis=0) # Sum of squared errors for each column (shape (21,))
n = Y_values.shape[0]
p = X_values.shape[1] # Number of predictors
self.sigma = np.sqrt(sse / (n - p)) # Standard deviation of residuals for each column (shape (21,))

self.fit_runtime = datetime.datetime.now() - self.startTime
return self

def predict(
self, forecast_length: int, future_regressor=None, just_point_forecast=False
):
"""Generates forecast data immediately following dates of index supplied to .fit()
Args:
forecast_length (int): Number of periods of data to forecast ahead
regressor (numpy.Array): additional regressor, not used
just_point_forecast (bool): If True, return a pandas.DataFrame of just point forecasts
Returns:
Either a PredictionObject of forecasts and metadata, or
if just_point_forecast == True, a dataframe of point forecasts
"""
predictStartTime = datetime.datetime.now()
test_index = self.create_forecast_index(forecast_length=forecast_length)

x_s = date_part(test_index, method=self.datepart_method)
x_t = changepoint_fcst_from_last_row(self.last_row, int(forecast_length))
x_t.index = test_index
X = pd.concat([x_s, x_t], axis=1)
if str(self.regression_type).lower() == "user":
X = pd.concat(X, future_regressor.reindex(test_index), axis=1)
X["constant"] = 1
X_values = X.to_numpy().astype(float)
self.X = X

forecast = pd.DataFrame(X_values @ self.beta, columns=self.column_names, index=test_index)

if just_point_forecast:
return forecast
else:
z_value = norm.ppf(1 - (1 - self.prediction_interval) / 2) # z-score for 95% confidence, e.g., 1.96
# Vectorized calculation of leverage for all points: diag(X @ (X^T X)^(-1) @ X^T)
hat_matrix_diag = np.einsum('ij,jk,ik->i', X_values, np.linalg.pinv(X_values.T @ X_values, rcond=5e-16), X_values)
# Broadcast the sigma values (shape (21,)) to match the number of rows (shape (2389,))
# This will give us a matrix of shape (2389, 21)
sigma_expanded = self.sigma[np.newaxis, :]
# Calculate the margin of error for each prediction in each column
margin_of_error = pd.DataFrame(
z_value * sigma_expanded * np.sqrt(1 + hat_matrix_diag[:, np.newaxis]),
columns=self.column_names, index=test_index,
)
upper_forecast = forecast + margin_of_error
lower_forecast = forecast - margin_of_error

predict_runtime = datetime.datetime.now() - predictStartTime
prediction = PredictionObject(
model_name=self.name,
forecast_length=forecast_length,
forecast_index=forecast.index,
forecast_columns=forecast.columns,
# so it's producing float32 but pandas is better with float64
lower_forecast=lower_forecast.astype(float),
forecast=forecast.astype(float),
upper_forecast=upper_forecast.astype(float),
prediction_interval=self.prediction_interval,
predict_runtime=predict_runtime,
fit_runtime=self.fit_runtime,
model_parameters=self.get_params(),
)

return prediction

def return_components(self, df):
# Needs some work
# doens't handle regressor features
# recompiles X which is suboptimal
# could use better naming
X = self.create_x(df)
contribution_seasonality = X[self.seasonal_columns].values @ self.beta[:len(self.seasonal_columns)]
contribution_changepoints = X[self.trend_columns].values @ self.beta[len(self.seasonal_columns):len(self.seasonal_columns) + len(self.trend_columns)]
contribution_constant = X["constant"].values.reshape(-1, 1) @ self.beta[-1:]
return contribution_seasonality, contribution_changepoints, contribution_constant

def coefficient_summary(self, df):
"""Used in profiler."""
contribution_seasonality, contribution_changepoints, contribution_constant = self.return_components(df)
# Total contribution (sum of absolute contributions for each time step)
total_contribution = np.abs(contribution_seasonality) + np.abs(contribution_changepoints) + np.abs(contribution_constant)

# Normalize each contribution by the total contribution
contrib_seasonality_pct = np.abs(contribution_seasonality) / total_contribution
contrib_changepoints_pct = np.abs(contribution_changepoints) / total_contribution
contrib_constant_pct = np.abs(contribution_constant) / total_contribution

# Calculate the average percentage contribution for each group
avg_contrib_seasonality = np.mean(contrib_seasonality_pct, axis=0)
avg_contrib_changepoints = np.mean(contrib_changepoints_pct, axis=0)
avg_contrib_constant = np.mean(contrib_constant_pct, axis=0)

# Create a DataFrame to summarize the percentage contributions
feature_contributions = pd.DataFrame({
"seasonality_contribution": avg_contrib_seasonality,
"changepoint_contribution": avg_contrib_changepoints,
"constant_contribution": avg_contrib_constant
}, index=self.column_names)
"""
feature_contributions['largest_contributor'] = feature_contributions[[
'seasonality_contribution',
'changepoint_contribution',
'constant_contribution'
]].idxmax(axis=1)
"""
feature_contributions["season_trend_percent"] = feature_contributions["seasonality_contribution"] / (feature_contributions["changepoint_contribution"] + feature_contributions["seasonality_contribution"])
return feature_contributions

def get_new_params(self, method: str = 'random'):
"""Returns dict of new parameters for parameter tuning"""
if "regressor" in method:
regression_choice = "User"
else:
regression_choice = random.choices(
[None, 'User'], [0.8, 0.2]
)[0]
return {
"datepart_method": random_datepart(method=method),
"changepoint_spacing": random.choices([6, 28, 60, 90, 180, 360, 5040], [0.05, 0.1, 0.1, 0.1, 0.2, 0.1, 0.2])[0],
"changepoint_distance_end": random.choices([6, 28, 60, 90, 180, 360, 5040], [0.05, 0.1, 0.1, 0.1, 0.2, 0.1, 0.2])[0],
"regression_type": regression_choice,
"λ": random.choices([None, 0.001, 0.01, 0.1, 1, 10, 100, 1000, 10000], [0.6, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1], k=1)[0],
}

def get_params(self):
"""Return dict of current parameters"""
return {
"datepart_method": self.datepart_method,
"changepoint_spacing": self.changepoint_spacing,
"changepoint_distance_end": self.changepoint_distance_end,
"regression_type": self.regression_type,
"λ": self.λ,
}
8 changes: 1 addition & 7 deletions autots/models/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from autots.models.model_list import no_shared
from autots.tools.impute import fill_median
from autots.models.sklearn import retrieve_classifier
from autots.tools.profile import profile_time_series
from autots.tools.profile import profile_time_series, summarize_series


horizontal_aliases = ['horizontal', 'probabilistic', 'horizontal-max', 'horizontal-min']
Expand All @@ -36,12 +36,6 @@
]


def summarize_series(df):
"""Summarize time series data. For now just df.describe()."""
df_sum = df.describe(percentiles=[0.1, 0.25, 0.5, 0.75, 0.9])
return df_sum


def mosaic_or_horizontal(all_series: dict):
"""Take a mosaic or horizontal model and return series or models.
Expand Down
Loading

0 comments on commit aca39df

Please sign in to comment.