Commit 1182-fix (#1215)
* Added preparatory functions for the fast_train check

* Remove fast_train preset and non-default tag from ransac_non_lin_reg

* Remove non-default tag from ransac_non_lin_reg

* Remove fast_train tag from diff_filter

* Form a test for checking the fast_train preset

* Add secondary performance check for operations with the fast_train preset

* Fix secondary fast_train check list copy

* Speed up naive average

* Update test

* Move the performance-evaluating function above the tests

* Add `resample` and `one_hot_encoding` to the skip list

---------

Co-authored-by: Sergey Kasyanov <[email protected]>
Lopa10ko and kasyanovse authored Dec 12, 2023
1 parent 607b416 commit dd7a75e
Showing 3 changed files with 79 additions and 76 deletions.
@@ -1,5 +1,4 @@
from copy import copy
from typing import Optional

import numpy as np

@@ -104,9 +103,9 @@ def predict(self, input_data: InputData) -> OutputData:
        """ Get desired part of time series for averaging and calculate mean value """
        forecast_length = input_data.task.task_params.forecast_length

        elements_to_take = self._how_many_elements_use_for_averaging(input_data.features)
        window = self._window(input_data.features)
        # Prepare single forecast
        mean_value = np.nanmean(input_data.features[-elements_to_take:])
        mean_value = np.nanmean(input_data.features[-window:])
        forecast = np.array([mean_value] * forecast_length).reshape((1, -1))

        output_data = self._convert_to_output(input_data,
@@ -117,9 +116,13 @@ def predict_for_fit(self, input_data: InputData) -> OutputData:
    def predict_for_fit(self, input_data: InputData) -> OutputData:
        input_data = copy(input_data)
        forecast_length = input_data.task.task_params.forecast_length
        parts = split_rolling_slices(input_data)
        mean_values_for_chunks = self.average_by_axis(parts)
        forecast = np.repeat(mean_values_for_chunks.reshape((-1, 1)), forecast_length, axis=1)
        features = input_data.features
        shape = features.shape[0]

        window = self._window(features)
        mean_values = np.array([np.mean(features[-window - shape + i:i + 1]) for i in range(shape)])

        forecast = np.repeat(mean_values.reshape((-1, 1)), forecast_length, axis=1)

        # Update target
        new_idx, transformed_target = ts_to_table(idx=input_data.idx, time_series=input_data.target,
@@ -133,42 +136,5 @@ def predict_for_fit(self, input_data: InputData) -> OutputData:
                                               data_type=DataTypesEnum.table)
        return output_data

    def average_by_axis(self, parts: np.array):
        """ Perform averaging for each column using last part of it """
        mean_values_for_chunks = np.apply_along_axis(self._average, 1, parts)
        return mean_values_for_chunks

    def _average(self, row: np.array):
        row = row[np.logical_not(np.isnan(row))]
        if len(row) == 1:
            return row

        elements_to_take = self._how_many_elements_use_for_averaging(row)
        return np.mean(row[-elements_to_take:])

    def _how_many_elements_use_for_averaging(self, time_series: np.array):
        elements_to_take = round(len(time_series) * self.part_for_averaging)
        elements_to_take = fix_elements_number(elements_to_take)
        return elements_to_take


def split_rolling_slices(input_data: InputData):
    """ Prepare slices for features series.
    Example of result for time series [0, 1, 2, 3]:
        [[0, nan, nan, nan],
         [0, 1, nan, nan],
         [0, 1, 2, nan],
         [0, 1, 2, 3]]
    """
    nan_mask = np.triu(np.ones_like(input_data.features, dtype=bool), k=1)
    final_matrix = np.tril(input_data.features, k=0)
    final_matrix = np.array(final_matrix, dtype=float)
    final_matrix[nan_mask] = np.nan

    return final_matrix


def fix_elements_number(elements_to_take: int):
    if elements_to_take < 2:
        return 2
    return elements_to_take

    def _window(self, time_series: np.ndarray):
        return max(2, round(time_series.shape[0] * self.part_for_averaging))
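
The new implementation drops the NaN-padded slice matrix and instead leans on Python's slice clamping: for small i the start index -window - shape + i falls below -shape, so the slice features[-window - shape + i:i + 1] is clamped to start at 0 and grows from a single element into a trailing window. Below is a minimal standalone sketch of the scheme; the helper names and the explicit part_for_averaging argument are illustrative, not FEDOT API.

import numpy as np

def window_size(ts: np.ndarray, part_for_averaging: float) -> int:
    # Mirrors the new _window method: at least two elements are averaged.
    return max(2, round(ts.shape[0] * part_for_averaging))

def rolling_means(features: np.ndarray, part_for_averaging: float) -> np.ndarray:
    shape = features.shape[0]
    window = window_size(features, part_for_averaging)
    # For small i the slice start is clamped to 0, so the averaged span
    # expands from features[:1] into a trailing window of recent values.
    return np.array([np.mean(features[-window - shape + i:i + 1]) for i in range(shape)])

print(rolling_means(np.array([0., 1., 2., 3.]), part_for_averaging=1.0))
# -> [0.  0.5 1.  1.5], the row means of the old rolling slices
#    [0], [0, 1], [0, 1, 2], [0, 1, 2, 3] -- without building the n x n matrix.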
6 changes: 3 additions & 3 deletions fedot/core/repository/data/data_operation_repository.json
@@ -219,9 +219,9 @@
  },
  "ransac_non_lin_reg": {
    "meta": "regression_preprocessing",
    "presets": ["fast_train", "*tree"],
    "presets": ["*tree"],
    "tags": ["affects_target", "non_linear", "filtering",
             "correct_params", "non_applicable_for_ts", "non-default"]
             "correct_params", "non_applicable_for_ts"]
  },
  "isolation_forest_reg": {
    "meta": "regression_preprocessing",
@@ -293,7 +293,7 @@
  },
  "diff_filter": {
    "meta": "custom_time_series_transformation",
    "presets": ["fast_train", "ts"],
    "presets": ["ts"],
    "tags": [
      "differential",
      "non_lagged",
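
Preset membership is read straight from this repository file, so a quick way to verify the two removals above is to list every operation that still advertises fast_train. A minimal sketch, reusing only the imports and the private _repo attribute that the updated test below also relies on:

from fedot.core.constants import FAST_TRAIN_PRESET_NAME
from fedot.core.repository.operation_types_repository import OperationTypesRepository

# Every operation that still claims the fast_train preset; after the edits
# above, ransac_non_lin_reg and diff_filter should no longer appear here.
fast_train_ids = sorted(
    operation.id
    for operation in OperationTypesRepository('all')._repo
    if operation.presets and FAST_TRAIN_PRESET_NAME in operation.presets
)
print(fast_train_ids)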
93 changes: 65 additions & 28 deletions test/integration/models/test_model.py
@@ -1,7 +1,8 @@
import pickle
from collections import defaultdict

from copy import deepcopy
from time import perf_counter
from typing import Tuple, Optional

import numpy as np
import pytest
@@ -10,6 +11,7 @@
from sklearn.metrics import mean_absolute_error, mean_squared_error, roc_auc_score as roc_auc
from sklearn.preprocessing import MinMaxScaler

from fedot.core.constants import FAST_TRAIN_PRESET_NAME
from fedot.core.data.data import InputData, OutputData
from fedot.core.data.data_split import train_test_data_setup
from fedot.core.data.supplementary_data import SupplementaryData
@@ -26,7 +28,7 @@
from fedot.core.pipelines.node import PipelineNode
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.repository.operation_types_repository import OperationTypesRepository
from fedot.core.repository.operation_types_repository import OperationMetaInfo, OperationTypesRepository
from fedot.core.repository.tasks import Task, TaskTypesEnum, TsForecastingParams
from test.unit.common_tests import is_predict_ignores_target
from test.unit.data_operations.test_time_series_operations import synthetic_univariate_ts
@@ -135,6 +137,41 @@ def get_pca_incorrect_data():
    return input_data


def get_operation_performance(operation: OperationMetaInfo,
                              data_lengths: Tuple[int, ...],
                              times: int = 1) -> Optional[Tuple[float, ...]]:
    """
    Helper function to measure the performance of only the first valid
    operation pair (task_type, input_type).
    """
    def fit_time_for_operation(operation: OperationMetaInfo,
                               data: InputData):
        nodes_from = []
        if task_type is TaskTypesEnum.ts_forecasting:
            if 'non_lagged' not in operation.tags:
                nodes_from = [PipelineNode('lagged')]
        node = PipelineNode(operation.id, nodes_from=nodes_from)
        pipeline = Pipeline(node)
        start_time = perf_counter()
        pipeline.fit(data)
        return perf_counter() - start_time

    for task_type in operation.task_type:
        for data_type in operation.input_types:
            performance_values = []
            for length in data_lengths:
                data = get_data_for_testing(task_type, data_type,
                                            length=length, features_count=2,
                                            random=True)
                if data is not None:
                    min_evaluated_time = min(fit_time_for_operation(operation, data) for _ in range(times))
                    performance_values.append(min_evaluated_time)
            if performance_values:
                if len(performance_values) != len(data_lengths):
                    raise ValueError('not all measurements were completed')
                return tuple(performance_values)
    raise Exception(f"Fit time for operation ``{operation.id}`` cannot be measured")


@pytest.fixture()
def classification_dataset():
    samples = 1000
@@ -475,31 +512,31 @@ def test_operations_are_serializable():


def test_operations_are_fast():
    # models that raise exception
    to_skip = ['custom', 'decompose', 'class_decompose']
    time_limits = defaultdict(lambda *args: 0.5, {'expensive': 2, 'non-default': 100})
    """
    Test ensures that all operations with the fast_train preset meet the performance expectation.
    The test treats operation complexity as a polynomial function of data size:
    if the complexity function grows too fast, the operation should not have the fast_train preset.
    """

    data_lengths = tuple(map(int, np.logspace(2.2, 4, 6)))
    reference_operations = ['rf', 'rfr']
    to_skip = ['custom', 'decompose', 'class_decompose', 'kmeans',
               'resample', 'one_hot_encoding'] + reference_operations
    reference_time = (float('inf'), ) * len(data_lengths)
    # number of attempts for time measurement
    attempt = 2
    for operation in OperationTypesRepository('all')._repo:
        if operation.id in to_skip:
            continue
        time_limit = [time_limits[tag] for tag in time_limits if tag in operation.tags]
        time_limit = max(time_limit) if time_limit else time_limits.default_factory()
        for task_type in operation.task_type:
            for data_type in operation.input_types:
                data = get_data_for_testing(task_type, data_type,
                                            length=100, features_count=2,
                                            random=True)
                if data is not None:
                    try:
                        nodes_from = []
                        if task_type is TaskTypesEnum.ts_forecasting:
                            if 'non_lagged' not in operation.tags:
                                nodes_from = [PipelineNode('lagged')]
                        node = PipelineNode(operation.id, nodes_from=nodes_from)
                        pipeline = Pipeline(node)
                        start_time = perf_counter()
                        pipeline.fit(data)
                        stop_time = perf_counter() - start_time
                        assert stop_time <= time_limit or True
                    except NotImplementedError:
                        pass
        if operation.id in reference_operations:
            performance_values = get_operation_performance(operation, data_lengths, attempt)
            reference_time = tuple(map(min, zip(performance_values, reference_time)))

    for operation in OperationTypesRepository('all')._repo:
        if (operation.id not in to_skip and operation.presets and FAST_TRAIN_PRESET_NAME in operation.presets):
            for _ in range(attempt):
                performance_values = get_operation_performance(operation, data_lengths)
                # if the attempt is successful, stop retrying
                if all(x >= y for x, y in zip(reference_time, performance_values)):
                    break
            else:
                raise Exception(f"Operation {operation.id} cannot have the ``fast_train`` preset")
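
The acceptance rule behind the rewritten test: rf and rfr are timed on six log-spaced data lengths (np.logspace(2.2, 4, 6), roughly 158 to 10000 rows), their elementwise minimum forms the reference time, and a candidate keeps its fast_train preset only if it is no slower than that reference at every length; the attempt retries absorb timing noise. A small self-contained sketch of the comparison, with hypothetical timings:

from typing import Tuple

def fits_fast_train(reference_time: Tuple[float, ...],
                    performance_values: Tuple[float, ...]) -> bool:
    # Mirrors the test's rule: the candidate must be no slower than the
    # rf/rfr reference at every measured data length.
    return all(ref >= measured for ref, measured in zip(reference_time, performance_values))

# Hypothetical fit times in seconds at six log-spaced data lengths:
reference = (0.05, 0.09, 0.21, 0.55, 1.40, 3.90)
fast_candidate = (0.01, 0.02, 0.06, 0.18, 0.50, 1.20)
slow_candidate = (0.01, 0.02, 0.06, 0.18, 0.50, 4.10)  # too slow at the largest size

assert fits_fast_train(reference, fast_candidate)
assert not fits_fast_train(reference, slow_candidate)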
