
Commit

Fix ts wrong links (#994)
Fix for #979
valer1435 authored Feb 27, 2023
1 parent 9dac6c0 commit 3a9f98a
Showing 11 changed files with 105 additions and 59 deletions.
6 changes: 3 additions & 3 deletions examples/simple/time_series_forecasting/api_forecasting.py
@@ -1,5 +1,4 @@
import logging
-from typing import Optional

import pandas as pd

@@ -19,8 +18,9 @@
'stackoverflow': f'{fedot_project_root()}/examples/data/ts/stackoverflow.csv'}


-def get_ts_data(dataset: str = 'beer', horizon: int = 10, validation_blocks: Optional[int] = None):
+def get_ts_data(dataset='australia', horizon: int = 30, validation_blocks=None):
    time_series = pd.read_csv(datasets[dataset])
+
task = Task(TaskTypesEnum.ts_forecasting,
TsForecastingParams(forecast_length=horizon))
if dataset not in ['australia']:
@@ -40,7 +40,7 @@ def get_ts_data(dataset: str = 'beer', horizon: int = 10, validation_blocks: Opt

def run_ts_forecasting_example(dataset='australia', horizon: int = 30, validation_blocks=2, timeout: float = None,
visualization=False, with_tuning=True):
-    train_data, test_data = get_ts_data(dataset, horizon, 2)
+    train_data, test_data = get_ts_data(dataset, horizon, validation_blocks)
# init model for the time series forecasting
model = Fedot(problem='ts_forecasting',
task_params=Task(TaskTypesEnum.ts_forecasting,
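
For context, a minimal usage sketch of the fixed example (assuming FEDOT is installed and the repository's examples package is importable; dataset names come from the datasets dict shown in the diff):

from examples.simple.time_series_forecasting.api_forecasting import (
    get_ts_data, run_ts_forecasting_example)

# With the fix, validation_blocks is forwarded to get_ts_data instead of
# the hard-coded 2, so a caller-supplied value actually takes effect:
train_data, test_data = get_ts_data(dataset='australia', horizon=30,
                                    validation_blocks=3)
run_ts_forecasting_example(dataset='australia', horizon=30,
                           validation_blocks=3, timeout=1.0)
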
@@ -1,4 +1,4 @@
-from copy import copy
+from copy import copy, deepcopy
from typing import Optional, Union

import numpy as np
@@ -678,8 +678,9 @@ def transform(self, input_data: InputData) -> OutputData:
Returns:
output data with cutted time series
"""

old_target = deepcopy(input_data.target)
input_data = self._cut_input_data(input_data)
+        input_data.target = old_target

output_data = self._convert_to_output(input_data,
input_data.features,
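
The hunk above guards the target against the cut: _cut_input_data shortens the series, and without a snapshot the target would be truncated along with the features. A minimal sketch of the idea, using a plain dict in place of InputData (names below are illustrative only):

from copy import deepcopy

import numpy as np

def cut_with_preserved_target(data: dict, cut_from: int = 10) -> dict:
    # Snapshot the target before cutting, as the diff does with deepcopy,
    # then restore it so downstream consumers still see the full target.
    old_target = deepcopy(data['target'])
    data['features'] = data['features'][cut_from:]  # the "cut"
    data['target'] = data['target'][cut_from:]      # cutting truncates target too
    data['target'] = old_target                     # restore the full target
    return data

data = {'features': np.arange(100.0), 'target': np.arange(100.0)}
assert len(cut_with_preserved_target(data)['target']) == 100
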
@@ -100,6 +100,7 @@ def fit(self, input_data):
pass

def predict(self, input_data: InputData) -> OutputData:
+        input_data = copy(input_data)
""" Get desired part of time series for averaging and calculate mean value """
forecast_length = input_data.task.task_params.forecast_length

@@ -114,20 +115,18 @@ def predict(self, input_data: InputData) -> OutputData:
return output_data

def predict_for_fit(self, input_data: InputData) -> OutputData:
+        input_data = copy(input_data)
forecast_length = input_data.task.task_params.forecast_length
parts = split_rolling_slices(input_data)
mean_values_for_chunks = self.average_by_axis(parts)
forecast = np.repeat(mean_values_for_chunks.reshape((-1, 1)), forecast_length, axis=1)
-        forecast = forecast[:-forecast_length, :]

# Update target
-        _, transformed_target = ts_to_table(idx=input_data.idx, time_series=input_data.target,
-                                            window_size=forecast_length, is_lag=True)
-        input_data.target = transformed_target[1:, :]
+        new_idx, transformed_target = ts_to_table(idx=input_data.idx, time_series=input_data.target,
+                                                  window_size=forecast_length)

-        # Update indices - there is no forecast for first element and skip last out of boundaries predictions
-        last_threshold = forecast_length - 1
-        new_idx = input_data.idx[1: -last_threshold]
+        input_data.target = transformed_target
+        forecast = forecast[new_idx]
+        input_data.idx = new_idx
output_data = self._convert_to_output(input_data,
predict=forecast,
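
The reworked predict_for_fit aligns forecast rows, target rows, and indices through a single windowing call. As a rough illustration of what such a ts-to-table transform produces (a self-contained stand-in, not FEDOT's actual ts_to_table):

import numpy as np

def ts_to_table_sketch(idx, time_series, window_size):
    # Each row holds the next `window_size` values of the series; rows that
    # would run past the end are dropped, and idx is trimmed to match.
    n_rows = len(time_series) - window_size + 1
    table = np.array([time_series[i:i + window_size] for i in range(n_rows)])
    return idx[:n_rows], table

idx = np.arange(14)
series = np.arange(14.0)
new_idx, target_table = ts_to_table_sketch(idx, series, window_size=4)
print(target_table.shape)  # (11, 4): one row per admissible window start
print(new_idx[-1])         # 10: matches the updated test expectations below
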
@@ -196,15 +196,16 @@ def predict_for_fit(self, input_data: InputData) -> OutputData:
# adding nan to target as in predicted
nan_mask = np.isnan(predicted)
target = target.astype(float)
-        target[nan_mask] = np.nan
-        _, predict = ts_to_table(idx=idx,
-                                 time_series=predicted,
-                                 window_size=forecast_length)
+        target = target[~nan_mask]
+        idx = idx[~nan_mask]
+        predicted = predicted[~nan_mask]
+        new_idx, predict = ts_to_table(idx=idx,
+                                       time_series=predicted,
+                                       window_size=forecast_length)
_, target_columns = ts_to_table(idx=idx,
-                                        time_series=target,
-                                        window_size=forecast_length)
-
-        input_data.idx = input_data.idx[~nan_mask]
+                                        time_series=target,
+                                        window_size=forecast_length)
+        input_data.idx = new_idx
input_data.target = target_columns
output_data = self._convert_to_output(input_data,
predict=predict,
@@ -260,7 +261,7 @@ def fit(self, input_data):
error=self.params.get("error"),
trend=self.params.get("trend"),
seasonal=self.params.get("seasonal"),
-            damped_trend=self.params.get("damped_trend"),
+            damped_trend=self.params.get("damped_trend") if self.params.get("trend") else None,
seasonal_periods=self.seasonal_periods
)
self.model = self.model.fit(disp=False)
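
The damped_trend change above appears to work around statsmodels' validation: to the best of my knowledge, ETSModel rejects a damped trend when no trend component is configured. A hedged sketch of the guarded construction (import path and behaviour assumed from recent statsmodels versions; parameter values are illustrative):

import numpy as np
from statsmodels.tsa.exponential_smoothing.ets import ETSModel

params = {'error': 'add', 'trend': None, 'damped_trend': True}
ts = np.random.rand(60) + 1.0

# Only pass damped_trend when a trend component is set, mirroring the diff;
# otherwise fall back to None so the combination is not rejected.
damped = params.get('damped_trend') if params.get('trend') else None
model = ETSModel(ts, error=params['error'], trend=params['trend'],
                 damped_trend=damped).fit(disp=False)
print(model.forecast(5))
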
1 change: 0 additions & 1 deletion fedot/core/pipelines/node.py
@@ -392,7 +392,6 @@ def _combine_parents(parent_nodes: List[PipelineNode],
parent_results.append(prediction)
else:
            raise NotImplementedError()
-
if input_data is None:
# InputData was set to primary nodes
target = prediction.target
1 change: 1 addition & 0 deletions fedot/core/pipelines/pipeline.py
@@ -240,6 +240,7 @@ def try_load_from_cache(self, cache: Optional[OperationsCache], preprocessing_ca
Returns:
bool: indicating if at least one node was loaded
"""

if cache is not None:
cache.try_load_into_pipeline(self, fold_id)
if preprocessing_cache is not None:
2 changes: 1 addition & 1 deletion fedot/core/pipelines/pipeline_advisor.py
@@ -79,6 +79,6 @@ def propose_parent(self, node: OptNode, possible_operations: List[str]) -> List[

def check_for_specific_operations(operation_id: str):
if ('data_source' in operation_id or
-            'exog_ts' == operation_id or 'custom' in operation_id):
+            'exog_ts' in operation_id or 'custom' in operation_id):
return True
return False
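
The one-character change above widens the exog check from exact equality to substring matching. A quick illustration of the difference (the suffixed id below is hypothetical, just to show the failure mode the fix covers):

def check_for_specific_operations(operation_id: str):
    if ('data_source' in operation_id or
            'exog_ts' in operation_id or 'custom' in operation_id):
        return True
    return False

assert check_for_specific_operations('exog_ts')           # matched before and after
assert check_for_specific_operations('exog_ts/series_1')  # matched only after the fix
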
102 changes: 73 additions & 29 deletions fedot/core/pipelines/verification_rules.py
@@ -106,7 +106,9 @@ def is_pipeline_contains_ts_operations(pipeline: Pipeline):
def has_no_data_flow_conflicts_in_ts_pipeline(pipeline: Pipeline):
""" Function checks the correctness of connection between nodes """
task = Task(TaskTypesEnum.ts_forecasting)
-    models = get_operations_for_task(task=task, mode='model')
+    ts_models = get_operations_for_task(task=task, mode='model', tags=["non_lagged"])
+    non_ts_models = list(set(get_operations_for_task(task=task, mode='model')).difference(set(ts_models)))
+
# Preprocessing not only for time series
non_ts_data_operations = get_operations_for_task(task=task,
mode='data_operation',
@@ -119,40 +121,23 @@ def has_no_data_flow_conflicts_in_ts_pipeline(pipeline: Pipeline):
ts_data_operations.remove('sparse_lagged')
ts_data_operations.remove('exog_ts')

+    ts_to_table_operations = ['lagged', 'sparse_lagged', 'exog_ts']

# Dictionary as {'current operation in the node': 'parent operations list'}
-    # TODO refactor
-    wrong_connections = {'lagged': models + non_ts_data_operations + ['lagged', 'sparse_lagged', 'exog_ts'],
-                         'sparse_lagged': models + non_ts_data_operations + ['lagged', 'sparse_lagged', 'exog_ts'],
-                         'ar': models + non_ts_data_operations + ['lagged', 'sparse_lagged', 'exog_ts'],
-                         'arima': models + non_ts_data_operations + ['lagged', 'sparse_lagged', 'exog_ts'],
-                         'ridge': ts_data_operations, 'linear': ts_data_operations,
-                         'lasso': ts_data_operations, 'dtreg': ts_data_operations,
-                         'knnreg': ts_data_operations, 'scaling': ts_data_operations,
-                         'xgbreg': ts_data_operations, 'adareg': ts_data_operations,
-                         'gbr': ts_data_operations, 'treg': ts_data_operations,
-                         'rfr': ts_data_operations, 'svr': ts_data_operations,
-                         'sgdr': ts_data_operations, 'normalization': ts_data_operations,
-                         'kernel_pca': ts_data_operations, 'poly_features': ts_data_operations,
-                         'ransac_lin_reg': ts_data_operations, 'ransac_non_lin_reg': ts_data_operations,
-                         'rfe_lin_reg': ts_data_operations, 'rfe_non_lin_reg': ts_data_operations,
-                         'pca': ts_data_operations,
-                         'gaussian_filter': ['lagged', 'sparse_lagged', 'exog_ts'],
-                         'diff_filter': ['lagged', 'sparse_lagged', 'exog_ts'],
-                         'smoothing': ['lagged', 'sparse_lagged', 'exog_ts'],
-                         'cut': ['lagged', 'sparse_lagged', 'exog_ts'],
-                         'ts_naive_average': ['lagged', 'sparse_lagged', 'exog_ts'],
-                         'locf': ['lagged', 'sparse_lagged', 'exog_ts'],
-                         'ets': ['lagged', 'sparse_lagged', 'exog_ts'],
-                         'polyfit': ['lagged', 'sparse_lagged', 'exog_ts'],
-                         'glm': ['lagged', 'sparse_lagged', 'exog_ts'],
-                         'stl_arima': ['lagged', 'sparse_lagged', 'exog_ts'],
-                         }
+    wrong_connections = get_wrong_links(ts_to_table_operations, ts_data_operations, non_ts_data_operations,
+                                        ts_models, non_ts_models)
+
+    limit_parents_count, need_to_have_parent = get_parent_limits(ts_to_table_operations, ts_data_operations,
+                                                                 non_ts_data_operations,
+                                                                 ts_models)

for node in pipeline.nodes:
# Operation name in the current node
current_operation = node.operation.operation_type
parent_nodes = node.nodes_from

+        if current_operation in limit_parents_count:
+            if limit_parents_count[current_operation] < len(parent_nodes):
+                raise ValueError(f'{ERROR_PREFIX} Pipeline has incorrect subgraph with wrong parent nodes combination')
if parent_nodes is not None:
# There are several parents for current node or at least 1
for parent in parent_nodes:
@@ -161,9 +146,68 @@ def has_no_data_flow_conflicts_in_ts_pipeline(pipeline: Pipeline):
forbidden_parents = wrong_connections.get(current_operation)
if forbidden_parents is not None:
__check_connection(parent_operation, forbidden_parents)
+        else:
+            if current_operation in need_to_have_parent:
+                raise ValueError(f'{ERROR_PREFIX} Pipeline has incorrect subgraph with wrong parent nodes combination')
return True


+def get_wrong_links(ts_to_table_operations: list, ts_data_operations: list, non_ts_data_operations: list,
+                    ts_models: list, non_ts_models: list) -> dict:
+    """
+    Function that return wrong ts connections like op_A : [op_B, op_C] that means op_B and op_C
+    can't be a parent for op_A.
+    :param ts_to_table_operations: list of ts_to_table operations
+    :param ts_data_operations: list of ts data operations
+    :param non_ts_data_operations: list of non ts data operations
+    :param ts_models: list of ts models
+    :param non_ts_models: list of non ts models
+    :return: dict with wrong connections
+    """
+    limit_lagged_parents = {lagged_op: ts_models + non_ts_models + non_ts_data_operations + ts_to_table_operations
+                            for lagged_op in ts_to_table_operations}
+
+    limit_ts_models_parents = {ts_model: ts_models + non_ts_models + non_ts_data_operations + ts_to_table_operations
+                               for ts_model in ts_models}
+    limit_non_ts_models_parents = {non_ts_model: ts_data_operations
+                                   for non_ts_model in non_ts_models}
+
+    limit_ts_data_operations_parents = {
+        ts_data_op: ts_models + non_ts_models + non_ts_data_operations + ts_to_table_operations
+        for ts_data_op in ts_data_operations}
+    limit_non_ts_data_operations_parents = {non_ts_data_op: ts_data_operations
+                                            for non_ts_data_op in non_ts_data_operations}
+
+    wrong_connections = {**limit_non_ts_data_operations_parents,
+                         **limit_ts_data_operations_parents,
+                         **limit_non_ts_models_parents,
+                         **limit_ts_models_parents,
+                         **limit_lagged_parents}
+    return wrong_connections
+
+
+def get_parent_limits(ts_to_table_operations: list, ts_data_operations: list, non_ts_data_operations: list,
+                      ts_models: list) -> (dict, list):
+    """
+    Function that return some constraints on number of parents for time series forecasting graphs
+    :param ts_to_table_operations: list of ts_to_table operations
+    :param ts_data_operations: list of ts data operations
+    :param non_ts_data_operations: list of non ts data operations
+    :param ts_models: list of ts models
+    :return: dict with parent limits and list with operations that must have a parent
+    """
+    limit_ts_model_data_op_parents_count = {ts_model_op: 1
+                                            for ts_model_op in ts_models + ts_data_operations + ts_to_table_operations}
+
+    limit_decompose_parents_count = {'decompose': 1}
+
+    limit_parents_count = {**limit_ts_model_data_op_parents_count, **limit_decompose_parents_count}
+    need_to_have_parent = [op for op in non_ts_data_operations]
+    return limit_parents_count, need_to_have_parent
+
+
def only_non_lagged_operations_are_primary(pipeline: Pipeline):
""" Only time series specific operations could be placed in primary nodes """

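
To make the new verification flow concrete, here is a toy version of the forbidden-parents lookup that get_wrong_links builds and the loop above consults (operation names are a small hypothetical subset):

ts_models = ['ar']
non_ts_models = ['ridge']
ts_data_operations = ['smoothing']
non_ts_data_operations = ['scaling']
ts_to_table_operations = ['lagged']

# Non-ts models may not follow raw-ts operations; ts models may only
# follow raw time series (same shape as the merged dicts above):
wrong_connections = {
    **{m: ts_data_operations for m in non_ts_models},
    **{m: ts_models + non_ts_models + non_ts_data_operations + ts_to_table_operations
       for m in ts_models},
}

def connection_allowed(parent: str, child: str) -> bool:
    return parent not in wrong_connections.get(child, [])

assert connection_allowed('lagged', 'ridge')         # lagged -> ridge is fine
assert not connection_allowed('smoothing', 'ridge')  # smoothing -> ridge rejected
assert not connection_allowed('lagged', 'ar')        # ar accepts only raw ts input
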
10 changes: 5 additions & 5 deletions fedot/core/repository/data/data_operation_repository.json
@@ -158,12 +158,12 @@
"scaling": {
"meta": "custom_preprocessing",
"presets": ["fast_train", "ts", "*tree"],
"tags": ["simple", "feature_scaling"]
"tags": ["simple", "feature_scaling", "non_applicable_for_ts"]
},
"normalization": {
"meta": "custom_preprocessing",
"presets": ["fast_train", "ts", "*tree"],
"tags": ["simple", "feature_scaling"]
"tags": ["simple", "feature_scaling", "non_applicable_for_ts"]
},
"simple_imputation": {
"meta": "custom_preprocessing",
@@ -224,19 +224,19 @@
},
"isolation_forest_reg": {
"meta": "regression_preprocessing",
"tags": ["non_linear", "filtering"]
"tags": ["non_linear", "filtering", "non_applicable_for_ts"]
},
"isolation_forest_class": {
"meta": "classification_preprocessing",
"tags": ["non_linear", "filtering"]
},
"rfe_lin_reg": {
"meta": "regression_preprocessing",
"tags": ["linear", "feature_selection", "non_applicable_for_ts", "non-default"]
"tags": ["linear", "feature_selection", "non_applicable_for_ts", "non-default", "non_applicable_for_ts"]
},
"rfe_non_lin_reg": {
"meta": "regression_preprocessing",
"tags": ["non_linear", "feature_selection", "non_applicable_for_ts", "non-default"]
"tags": ["non_linear", "feature_selection", "non_applicable_for_ts", "non-default", "non_applicable_for_ts"]
},
"rfe_lin_class": {
"meta": "classification_preprocessing",
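
The new non_applicable_for_ts tags make these operations filterable when candidates for ts pipelines are assembled; presumably get_operations_for_task consults them. A minimal sketch of tag-based filtering over entries shaped like the JSON above (the helper itself is illustrative, not FEDOT's API):

repo = {
    'scaling': {'tags': ['simple', 'feature_scaling', 'non_applicable_for_ts']},
    'lagged': {'tags': ['ts_specific']},  # hypothetical entry for contrast
}

def operations_without_tag(repository: dict, tag: str) -> list:
    # Keep only operations whose tag list does not carry the excluded tag.
    return [name for name, info in repository.items()
            if tag not in info.get('tags', [])]

print(operations_without_tag(repo, 'non_applicable_for_ts'))  # ['lagged']
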
1 change: 1 addition & 0 deletions fedot/utilities/debug.py
@@ -15,6 +15,7 @@ def is_test_session():
    return 'PYTEST_CURRENT_TEST' in os.environ


+
def is_recording_mode():
return 'FEDOT_RECORDING_MODE' in os.environ

4 changes: 2 additions & 2 deletions test/unit/models/test_model.py
@@ -337,8 +337,8 @@ def test_ts_naive_average_forecast_correctly():
predict_forecast = model.predict(predict_input)

# Check correctness during pipeline fit stage
-    assert (10, 4) == fit_forecast.target.shape
-    assert np.array_equal(fit_forecast.idx, np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
+    assert (11, 4) == fit_forecast.target.shape
+    assert np.array_equal(fit_forecast.idx, np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
assert np.isclose(fit_forecast.predict[0, 0], 0)

# Pipeline predict stage
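
The updated assertions follow from the new indexing in predict_for_fit: a full lagged target table over a series of length n with forecast_length w has n - w + 1 rows, starting at index 0 rather than 1. With the test's apparent n = 14 and w = 4 (inferred from the new expected shape; the test data itself is not shown in this diff):

import numpy as np

n, w = 14, 4  # series length inferred from the assertions; forecast_length 4
target = np.arange(float(n))
rows = np.array([target[i:i + w] for i in range(n - w + 1)])
idx = np.arange(n - w + 1)

assert rows.shape == (11, 4)           # was (10, 4) with the old 1-based trim
assert idx[0] == 0 and idx[-1] == 10   # was 1..10 before the fix
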
