Skip to content

Commit

Permalink
create new operation type - atomized
Browse files Browse the repository at this point in the history
  • Loading branch information
kasyanovse committed Dec 21, 2023
1 parent 0cb15ec commit 0a5defd
Show file tree
Hide file tree
Showing 15 changed files with 137 additions and 106 deletions.
14 changes: 14 additions & 0 deletions fedot/core/operations/atomized.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from fedot.core.operations.model import Model
from fedot.core.repository.operation_types_repository import OperationTypesRepository


class Atomized(Model):
    """
    Operation class exposing fit/predict methods for the atomized strategy of a task.

    :param operation_type: name of the model
    """

    def __init__(self, operation_type: str):
        super().__init__(operation_type=operation_type)
        # Assigned after the base initializer so that this operation always
        # resolves its implementations against the 'atomized' repository.
        repository_name = 'atomized'
        self.operations_repo = OperationTypesRepository(repository_name)
48 changes: 48 additions & 0 deletions fedot/core/operations/evaluation/atomized.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import warnings
from typing import Optional

from fedot.core.data.data import InputData, OutputData
from fedot.core.operations.evaluation.evaluation_interfaces import EvaluationStrategy, SkLearnEvaluationStrategy
from fedot.core.operations.evaluation.operation_implementations.data_operations.decompose \
import DecomposerRegImplementation
from fedot.core.operations.evaluation.operation_implementations.data_operations.sklearn_filters \
import IsolationForestRegImplementation
from fedot.core.operations.evaluation.operation_implementations. \
data_operations.sklearn_filters import LinearRegRANSACImplementation, NonLinearRegRANSACImplementation
from fedot.core.operations.evaluation.operation_implementations. \
data_operations.sklearn_selectors import LinearRegFSImplementation, NonLinearRegFSImplementation
from fedot.core.operations.evaluation.operation_implementations.models.atomized.atomized_ts_differ import \
AtomizedTimeSeriesDiffer
from fedot.core.operations.evaluation.operation_implementations.models.atomized.atomized_ts_sampler import \
AtomizedTimeSeriesSampler
from fedot.core.operations.evaluation.operation_implementations.models.atomized.atomized_ts_scaler import \
AtomizedTimeSeriesScaler
from fedot.core.operations.evaluation.operation_implementations.models.knn import FedotKnnRegImplementation
from fedot.core.operations.operation_parameters import OperationParameters
from fedot.utilities.random import ImplementationRandomStateHandler

# Installs an interpreter-wide filter that hides every UserWarning once this
# module has been imported (affects code outside this module as well).
warnings.filterwarnings("ignore", category=UserWarning)


class FedotAtomizedStrategy(EvaluationStrategy):
    """
    Evaluation strategy for atomized time series operations.

    The implementation class is obtained from ``_convert_to_operation``
    (defined outside this class; presumably it looks the name up in
    ``_operations_by_types`` - confirm against ``EvaluationStrategy``).

    :param operation_type: name of the atomized operation
    :param params: operation parameters; the ``'pipeline'`` entry, if present,
        is passed to the implementation constructor during ``fit``
    """

    # Operation name -> class implementing that atomized operation.
    _operations_by_types = {
        'atomized_ts_differ': AtomizedTimeSeriesDiffer,
        'atomized_ts_scaler': AtomizedTimeSeriesScaler,
        'atomized_ts_sampler': AtomizedTimeSeriesSampler
    }

    def __init__(self, operation_type: str, params: Optional[OperationParameters] = None):
        # The implementation is resolved before the base initializer is called.
        self.operation_impl = self._convert_to_operation(operation_type)
        super().__init__(operation_type, params)

    def fit(self, train_data: InputData):
        """Build the implementation around the configured inner pipeline and fit it.

        :param train_data: data used for fitting
        :return: whatever the implementation's ``fit`` returns
        """
        implementation_class = self.operation_impl
        inner_pipeline = self.params_for_fit.get('pipeline')
        operation = implementation_class(inner_pipeline)
        return operation.fit(train_data)

    def predict(self, trained_operation, predict_data: InputData) -> OutputData:
        """Delegate prediction to the already fitted operation.

        :param trained_operation: object returned by ``fit``
        :param predict_data: data to predict on
        :return: result of ``trained_operation.predict``
        """
        return trained_operation.predict(predict_data)

    def predict_for_fit(self, trained_operation, predict_data: InputData) -> OutputData:
        """Delegate fit-stage prediction to the already fitted operation.

        :param trained_operation: object returned by ``fit``
        :param predict_data: data to predict on
        :return: result of ``trained_operation.predict_for_fit``
        """
        return trained_operation.predict_for_fit(predict_data)
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@
from fedot.core.repository.tasks import TaskTypesEnum, TsForecastingParams, Task


class AtomizedTimeSeriesDiffer(AtomizedModel):
class AtomizedTimeSeriesDiffer:
""" Get diff of timeseries, train model/forecast, integrate result """

operation_type = 'atomized_ts_differ'

def __init__(self, pipeline: Optional['Pipeline'] = None):
if pipeline is None:
pipeline = Pipeline(PipelineNode('ridge'))
super().__init__(pipeline=pipeline)
self.pipeline = pipeline

def _diff(self, data: InputData, fit_stage: bool):
new_features = np.diff(data.features, axis=1)
Expand All @@ -34,39 +32,35 @@ def _diff(self, data: InputData, fit_stage: bool):
new_target = data.target

supplementary_data = data.supplementary_data
supplementary_data.time_series_bias.append(bias)
# supplementary_data.time_series_bias.append(bias)

new_data = InputData(idx=data.idx,
features=new_features,
target=new_target,
task=data.task,
data_type=data.data_type,
supplementary_data=supplementary_data)
return new_data
return new_data, bias

def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData):
def fit(self, data: InputData):
# TODO decide whether unfit is needed
if data.task.task_type is not TaskTypesEnum.ts_forecasting:
raise ValueError(f"{self.__class__} supports only time series forecasting task")
return super().fit(params, self._diff(data, fit_stage=True))
data, _ = self._diff(data, fit_stage=True)
self.pipeline.fit(data)
return self

def _sample_predict(self,
fitted_operation: 'Pipeline',
data: InputData,
params: Optional[Union[OperationParameters, Dict[str, Any]]] = None,
output_mode: str = 'default') -> OutputData:
new_data = self._diff(data, fit_stage=False)
prediction = super().predict(fitted_operation=fitted_operation,
data=new_data,
params=params,
output_mode=output_mode)
bias = prediction.supplementary_data.time_series_bias.pop()
def predict(self, data: InputData) -> OutputData:
new_data, bias = self._diff(data, fit_stage=False)
prediction = self.pipeline.predict(new_data)
new_predict = np.cumsum(prediction.predict.reshape((bias.shape[0], -1)), axis=1) + bias
new_predict = new_predict.reshape(prediction.predict.shape)
prediction.predict = new_predict
return prediction

def predict(self, *args, **kwargs) -> OutputData:
return self._sample_predict(*args, **kwargs)
prediction.idx = data.idx
if prediction.target is not None:
prediction.predict = np.reshape(prediction.predict, prediction.target.shape)
return prediction

def predict_for_fit(self, *args, **kwargs) -> OutputData:
return self._sample_predict(*args, **kwargs)
def predict_for_fit(self, data: InputData) -> OutputData:
return self.predict(data)
39 changes: 6 additions & 33 deletions fedot/core/operations/factory.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from fedot.core.operations.atomized import Atomized
from fedot.core.operations.automl import AutoML
from fedot.core.operations.data_operation import DataOperation
from fedot.core.operations.model import Model
Expand All @@ -15,7 +16,9 @@ class OperationFactory:

def __init__(self, operation_name):
self.operation_name = operation_name
self.operation_type = self._define_operation_type()
self.operation_type = (OperationTypesRepository('all')
.operation_info_by_id(self.operation_name)
.repository_name)

def get_operation(self) -> Operation:
"""
Expand All @@ -30,39 +33,9 @@ def get_operation(self) -> Operation:
operation = DataOperation(operation_type=self.operation_name)
elif self.operation_type == 'automl':
operation = AutoML(operation_type=self.operation_name)
elif self.operation_type == 'atomized':
operation = Atomized(operation_type=self.operation_name)
else:
raise ValueError(f'Operation type {self.operation_type} is not supported')

return operation

@property
def operation_type_name(self):
return self.operation_type

def _define_operation_type(self) -> str:
"""
The method determines what type of operations is set for this node
:return : operations type 'model', 'automl' or 'data_operation'
"""

# Get available models from model_repository.json file
operations_repo = OperationTypesRepository('data_operation')
operations = operations_repo.operations
if 'automl' in OperationTypesRepository.get_available_repositories():
automl_repo = OperationTypesRepository('automl')
models_automl = automl_repo.operations
else:
models_automl = []

operation_name = get_operation_type_from_id(self.operation_name)

# If there is a such model in the list
if any(operation_name == model.id for model in operations):
operation_type = 'data_operation'
elif any(operation_name == model.id for model in models_automl):
operation_type = 'automl'
# Otherwise - it is model
else:
operation_type = 'model'
return operation_type
56 changes: 21 additions & 35 deletions fedot/core/optimisers/genetic_operators/mutation.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,23 @@
from copy import deepcopy
from functools import partial, wraps, WRAPPER_ASSIGNMENTS
from itertools import chain
from random import choice, randint, random, sample
from typing import TYPE_CHECKING, Optional, Dict, Callable, Union

from fedot.core.operations.atomized_model.atomized_model import AtomizedModel
from fedot.core.operations.atomized_model.atomized_ts_differ import AtomizedTimeSeriesDiffer
from fedot.core.operations.atomized_model.atomized_ts_sampler import AtomizedTimeSeriesSampler
from fedot.core.operations.atomized_model.atomized_ts_scaler import AtomizedTimeSeriesScaler
from functools import WRAPPER_ASSIGNMENTS
from random import choice
from typing import Dict, Callable, Union

from fedot.core.pipelines.node import PipelineNode
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.repository.operation_types_repository import get_operations_for_task, OperationTypesRepository
from fedot.core.repository.operation_types_repository import OperationTypesRepository
from golem.core.adapter import register_native
from golem.core.dag.graph import ReconnectType
from golem.core.dag.graph_node import GraphNode
from golem.core.dag.graph_utils import distance_to_root_level, distance_to_primary_level, graph_has_cycle
from golem.core.optimisers.advisor import RemoveType
from golem.core.optimisers.genetic.operators.base_mutations import \
add_as_child, add_separate_parent_node, add_intermediate_node, single_edge_mutation, single_add_mutation, \
single_edge_mutation, single_add_mutation, \
single_change_mutation, single_drop_mutation
from golem.core.optimisers.graph import OptGraph, OptNode
from golem.core.optimisers.opt_node_factory import OptNodeFactory
from golem.core.optimisers.graph import OptGraph
from golem.core.optimisers.optimization_parameters import GraphRequirements
from golem.core.optimisers.optimizer import GraphGenerationParams, AlgorithmParameters
from golem.utilities.data_structures import ComparableEnum as Enum
from golem.core.optimisers.optimizer import GraphGenerationParams
from golem.core.optimisers.genetic.gp_params import GPAlgorithmParameters



OperationTypesRepository.init_repository('atomized')
ATOMIZED_OPERATION_REPOSITORY = OperationTypesRepository('atomized')
# TODO add ability to construct PipelineNodes as PipelineNode(name_of_atomized_operation)
ATOMIZED_OPERATION_MAP = {'atomized_ts_differ': AtomizedTimeSeriesDiffer,
'atomized_ts_scaler': AtomizedTimeSeriesScaler,
'atomized_ts_sampler': AtomizedTimeSeriesSampler}


def _extract_graphs(graph: OptGraph) -> Dict[str, OptGraph]:
Expand All @@ -42,8 +26,8 @@ def _extract_graphs(graph: OptGraph) -> Dict[str, OptGraph]:
and values as graphs """
graphs = {'': graph}
for node in graph.nodes:
if 'inner_graph' in node.content:
extracted_graphs = _extract_graphs(node.content['inner_graph'])
if 'pipeline' in node.parameters:
extracted_graphs = _extract_graphs(node.parameters['pipeline'])
for k, v in extracted_graphs.items():
graphs[k or node.uid] = v
return graphs
Expand All @@ -61,13 +45,13 @@ def _insert_graphs(full_graph: OptGraph, node_uid: str, graph: OptGraph) -> OptG
node = nodes.pop()
if node.uid == node_uid:
break
if 'inner_graph' in node.content:
nodes.extend(node.content['inner_graph'].nodes)
if 'pipeline' in node.content['params']:
nodes.extend(node.content['params']['pipeline'].nodes)
else:
raise ValueError(f"Unknown node uid: {node_uid}")
if 'inner_graph' not in node.content:
raise ValueError(f"Cannot insert graph to non AtomizedModel")
node.content['inner_graph'] = graph
if 'pipeline' not in node.content['params']:
raise ValueError(f"Cannot insert graph to non atomized model")
node.content['params']['pipeline'] = graph
return full_graph


Expand Down Expand Up @@ -107,8 +91,8 @@ def mutation_for_atomized_graph(graph: Union[OptGraph, Pipeline],
fedot_single_drop_mutation = register_native(atomized_mutation(single_drop_mutation))


# TODO make insert_ts_atomized_operation atomized mutation
# @atomized_mutation

@atomized_mutation
def insert_atomized_operation(pipeline: Pipeline,
requirements: GraphRequirements,
graph_gen_params: GraphGenerationParams,
Expand All @@ -117,8 +101,9 @@ def insert_atomized_operation(pipeline: Pipeline,
""" Wrap part of pipeline to atomized operation
"""
task_type = graph_gen_params.advisor.task.task_type
atomized_operations = ATOMIZED_OPERATION_REPOSITORY.suitable_operation(task_type=task_type)
atomized_operations = ATOMIZED_OPERATION_REPOSITORY.suitable_operation(task_type=task_type, tags=['non-default'])
atomized_operation = choice(atomized_operations)
atomized_operation = 'atomized_ts_differ'
info = ATOMIZED_OPERATION_REPOSITORY.operation_info_by_id(atomized_operation)
it, ot = set(info.input_types), set(info.output_types)

Expand All @@ -130,7 +115,8 @@ def insert_atomized_operation(pipeline: Pipeline,

if nodes:
node = choice(nodes)
new_node = PipelineNode(ATOMIZED_OPERATION_MAP[atomized_operation](Pipeline(node)))
inner_pipeline = Pipeline(PipelineNode(content=node.content))
new_node = PipelineNode(content={'name': atomized_operation, 'params': {'pipeline': inner_pipeline}})
pipeline.update_node(node, new_node)
return pipeline

Expand Down
2 changes: 0 additions & 2 deletions fedot/core/optimisers/objective/data_objective_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import numpy as np

from fedot.core.pipelines.adapters import PipelineAdapter
from golem.core.log import default_log, is_test_session
from golem.core.optimisers.fitness import Fitness
from golem.core.optimisers.objective.objective import Objective, to_fitness
Expand Down Expand Up @@ -67,7 +66,6 @@ def evaluate(self, graph: Pipeline) -> Fitness:
try:
prepared_pipeline = self.prepare_graph(graph, train_data, fold_id, self._eval_n_jobs)
except Exception as ex:
ppl = PipelineAdapter()._restore(graph)
self._log.warning(f'Unsuccessful pipeline fit during fitness evaluation. '
f'Skipping the pipeline. Exception <{ex}> on {graph_id}')
if is_test_session() and not isinstance(ex, TimeoutError):
Expand Down
9 changes: 7 additions & 2 deletions fedot/core/pipelines/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class PipelineAdapter(BaseOptimizationAdapter[Pipeline]):
fitted models) that can be used for reconstructing Pipelines.
"""

# TODO add tests for correct convertation of AtomizedModel
# TODO add tests for correct conversion of Atomized

def __init__(self, use_input_preprocessing: bool = True):
super().__init__(base_graph_class=Pipeline)
Expand All @@ -34,9 +34,12 @@ def _transform_to_opt_node(node: PipelineNode) -> OptNode:

# add data about inner graph if it is atomized model
if isinstance(node.operation, AtomizedModel):
content['atomized_class'] = node.operation.__class__
content['inner_graph'] = PipelineAdapter()._adapt(node.operation.pipeline)

# add data about inner graph if it is atomized
if 'pipeline' in content['params']:
content['params']['pipeline'] = PipelineAdapter()._adapt(content['params']['pipeline'])

return OptNode(content)

@staticmethod
Expand All @@ -47,6 +50,8 @@ def _transform_to_pipeline_node(node: OptNode) -> PipelineNode:
else:
# deepcopy to avoid accidental information sharing between opt graphs & pipelines
content = deepcopy(node.content)
if 'params' in content and 'pipeline' in content['params']:
content['params']['pipeline'] = PipelineAdapter()._restore(content['params']['pipeline'])
return PipelineNode(operation_type=content['name'], content=content)

def _adapt(self, adaptee: Pipeline) -> OptGraph:
Expand Down
3 changes: 3 additions & 0 deletions fedot/core/pipelines/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def description(self) -> str:
if 'inner_graph' in self.content:
root_nodes = self.content['inner_graph'].root_nodes()
node_label = f"{node_label}(INNER{''.join(node.descriptive_id for node in root_nodes)}INNER)"
if 'params' in self.content and 'pipeline' in self.content['params']:
root_nodes = self.content['params']['pipeline'].root_nodes()
node_label = f"{node_label}(INNER{''.join(node.descriptive_id for node in root_nodes)}INNER)"
return node_label


Expand Down
3 changes: 3 additions & 0 deletions fedot/core/pipelines/verification_rules.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Optional

from fedot.core.operations.atomized import Atomized
from fedot.core.operations.atomized_model.atomized_model import AtomizedModel
from fedot.core.operations.model import Model
from fedot.core.pipelines.node import PipelineNode
Expand Down Expand Up @@ -29,6 +30,8 @@ def has_final_operation_as_model(pipeline: Pipeline):
root_node = pipeline.root_node
if root_node.operation.operation_type == atomized_model_type():
has_final_operation_as_model(root_node.operation.pipeline)
elif isinstance(root_node.operation, Atomized):
has_final_operation_as_model(root_node.content['params']['pipeline'])
elif type(root_node.operation) is not Model:
raise ValueError(f'{ERROR_PREFIX} Root operation is not a model')
return True
Expand Down
Loading

0 comments on commit 0a5defd

Please sign in to comment.