Skip to content

Commit

Permalink
pipeline adapter works with atomized models now
Browse files Browse the repository at this point in the history
  • Loading branch information
kasyanovse committed Dec 18, 2023
1 parent 7c2e51a commit 0ec6851
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 93 deletions.
22 changes: 12 additions & 10 deletions fedot/core/operations/atomized_model/atomized_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from fedot.core.operations.operation_parameters import OperationParameters
from fedot.core.pipelines.node import PipelineNode
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder
# from fedot.core.pipelines.tuning.tuner_builder import TunerBuilder
from fedot.core.repository.metrics_repository import MetricCallable
from fedot.core.repository.operation_types_repository import OperationMetaInfo, atomized_model_type

Expand Down Expand Up @@ -55,15 +55,17 @@ def fine_tune(self,
iterations: int = 50,
timeout: int = 5) -> 'AtomizedModel':
""" Method for tuning hyperparameters """
tuner = TunerBuilder(input_data.task) \
.with_tuner(SimultaneousTuner) \
.with_metric(metric_function) \
.with_iterations(iterations) \
.with_timeout(timedelta(minutes=timeout)) \
.build(input_data)
tuned_pipeline = tuner.tune(self.pipeline)
tuned_atomized_model = AtomizedModel(tuned_pipeline)
return tuned_atomized_model
# TODO Fix tuner with atomized model
# cannot be made by that way due to problem with circular import
# tuner = TunerBuilder(input_data.task) \
# .with_tuner(SimultaneousTuner) \
# .with_metric(metric_function) \
# .with_iterations(iterations) \
# .with_timeout(timedelta(minutes=timeout)) \
# .build(input_data)
# tuned_pipeline = tuner.tune(self.pipeline)
# tuned_atomized_model = AtomizedModel(tuned_pipeline)
# return tuned_atomized_model

@property
def metadata(self) -> OperationMetaInfo:
Expand Down
27 changes: 19 additions & 8 deletions fedot/core/pipelines/adapters.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from copy import deepcopy
from typing import Any, Optional, Dict

from fedot.core.operations.atomized_model.atomized_model import AtomizedModel
from golem.core.adapter import BaseOptimizationAdapter
from golem.core.dag.graph_utils import map_dag_nodes
from golem.core.optimisers.graph import OptGraph, OptNode
Expand All @@ -17,6 +18,8 @@ class PipelineAdapter(BaseOptimizationAdapter[Pipeline]):
fitted models) that can be used for reconstructing Pipelines.
"""

# TODO add tests for correct convertation of AtomizedModel

def __init__(self, use_input_preprocessing: bool = True):
super().__init__(base_graph_class=Pipeline)

Expand All @@ -25,17 +28,25 @@ def __init__(self, use_input_preprocessing: bool = True):
@staticmethod
def _transform_to_opt_node(node: PipelineNode) -> OptNode:
# Prepare content for nodes, leave only simple data
operation_name = str(node.operation)
content = {'name': operation_name,
'params': node.parameters,
'metadata': node.metadata}
return OptNode(deepcopy(content))
content = dict(name=str(node.operation),
params=deepcopy(node.parameters),
metadata=deepcopy(node.metadata))

# add data about inner graph if it is atomized model
if isinstance(node.operation, AtomizedModel):
content['inner_graph'] = PipelineAdapter()._adapt(node.operation.pipeline)

return OptNode(content)

@staticmethod
def _transform_to_pipeline_node(node: OptNode) -> PipelineNode:
# deepcopy to avoid accidental information sharing between opt graphs & pipelines
content = deepcopy(node.content)
return PipelineNode(operation_type=content['name'], content=content)
if 'inner_graph' in node.content:
atomized_pipeline = PipelineAdapter()._restore(node.content['inner_graph'])
return PipelineNode(AtomizedModel(atomized_pipeline))
else:
# deepcopy to avoid accidental information sharing between opt graphs & pipelines
content = deepcopy(node.content)
return PipelineNode(operation_type=content['name'], content=content)

def _adapt(self, adaptee: Pipeline) -> OptGraph:
adapted_nodes = map_dag_nodes(self._transform_to_opt_node, adaptee.nodes)
Expand Down
158 changes: 83 additions & 75 deletions test/unit/optimizer/gp_operators/test_mutation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from copy import deepcopy
from itertools import chain
from pathlib import Path

import pytest
Expand Down Expand Up @@ -137,72 +138,72 @@ def get_graph_with_two_nested_atomized_models(atomized_model):
return PipelineAdapter().adapt(pipeline_with_atomized)


def test_boosting_mutation_for_linear_graph():
"""
Tests boosting mutation can add correct boosting cascade
"""

graph = PipelineAdapter().restore(get_simple_linear_graph())
boosting_graph = get_simple_linear_boosting_pipeline()
requirements = PipelineComposerRequirements(primary=['logit'],
secondary=['logit'])
pipeline = boosting_mutation(graph,
requirements,
get_pipeline_generation_params(requirements=requirements,
rules_for_constraint=DEFAULT_DAG_RULES,
task=Task(TaskTypesEnum.classification)))
data = file_data()
pipeline.fit(data)
result = pipeline.predict(data)
assert pipeline.descriptive_id == boosting_graph.descriptive_id
assert result is not None


def test_boosting_mutation_for_non_lagged_ts_model():
"""
Tests boosting mutation can add correct boosting cascade for ts forecasting with non-lagged model
"""

graph = PipelineAdapter().restore(get_ts_forecasting_graph())

boosting_graph = get_ts_forecasting_graph_with_boosting()
requirements = PipelineComposerRequirements(primary=['ridge'],
secondary=['ridge'])
pipeline = boosting_mutation(graph,
requirements,
get_pipeline_generation_params(requirements=requirements,
rules_for_constraint=DEFAULT_DAG_RULES,
task=Task(TaskTypesEnum.ts_forecasting)))
data_train, data_test = get_ts_data()
pipeline.fit(data_train)
result = pipeline.predict(data_test)
assert boosting_graph.descriptive_id == pipeline.descriptive_id
assert result is not None


@pytest.mark.parametrize('pipeline, requirements, params',
[(PipelineBuilder().add_node('scaling').add_node('rf').build(),
*get_requirements_and_params_for_task(TaskTypesEnum.classification)),
(PipelineBuilder().add_node('smoothing').add_node('ar').build(),
*get_requirements_and_params_for_task(TaskTypesEnum.ts_forecasting))
])
def test_boosting_mutation_changes_pipeline(pipeline: Pipeline, requirements: PipelineComposerRequirements,
params: GraphGenerationParams):
new_pipeline = deepcopy(pipeline)
new_pipeline = boosting_mutation(new_pipeline, requirements, params)
assert new_pipeline.descriptive_id != pipeline.descriptive_id
assert 'class_decompose' in new_pipeline.descriptive_id or 'decompose' in new_pipeline.descriptive_id


def test_no_opt_or_graph_nodes_after_mutation():
adapter = PipelineAdapter()
graph = get_simple_linear_graph()
mutation = get_mutation_obj()
for mut in mutation.parameters.mutation_types:
graph, _ = mutation._adapt_and_apply_mutation(new_graph=graph, mutation_type=mut)
new_pipeline = adapter.restore(graph)

assert not find_first(new_pipeline, lambda n: type(n) in (GraphNode, OptNode))
# def test_boosting_mutation_for_linear_graph():
# """
# Tests boosting mutation can add correct boosting cascade
# """
#
# graph = PipelineAdapter().restore(get_simple_linear_graph())
# boosting_graph = get_simple_linear_boosting_pipeline()
# requirements = PipelineComposerRequirements(primary=['logit'],
# secondary=['logit'])
# pipeline = boosting_mutation(graph,
# requirements,
# get_pipeline_generation_params(requirements=requirements,
# rules_for_constraint=DEFAULT_DAG_RULES,
# task=Task(TaskTypesEnum.classification)))
# data = file_data()
# pipeline.fit(data)
# result = pipeline.predict(data)
# assert pipeline.descriptive_id == boosting_graph.descriptive_id
# assert result is not None
#
#
# def test_boosting_mutation_for_non_lagged_ts_model():
# """
# Tests boosting mutation can add correct boosting cascade for ts forecasting with non-lagged model
# """
#
# graph = PipelineAdapter().restore(get_ts_forecasting_graph())
#
# boosting_graph = get_ts_forecasting_graph_with_boosting()
# requirements = PipelineComposerRequirements(primary=['ridge'],
# secondary=['ridge'])
# pipeline = boosting_mutation(graph,
# requirements,
# get_pipeline_generation_params(requirements=requirements,
# rules_for_constraint=DEFAULT_DAG_RULES,
# task=Task(TaskTypesEnum.ts_forecasting)))
# data_train, data_test = get_ts_data()
# pipeline.fit(data_train)
# result = pipeline.predict(data_test)
# assert boosting_graph.descriptive_id == pipeline.descriptive_id
# assert result is not None
#
#
# @pytest.mark.parametrize('pipeline, requirements, params',
# [(PipelineBuilder().add_node('scaling').add_node('rf').build(),
# *get_requirements_and_params_for_task(TaskTypesEnum.classification)),
# (PipelineBuilder().add_node('smoothing').add_node('ar').build(),
# *get_requirements_and_params_for_task(TaskTypesEnum.ts_forecasting))
# ])
# def test_boosting_mutation_changes_pipeline(pipeline: Pipeline, requirements: PipelineComposerRequirements,
# params: GraphGenerationParams):
# new_pipeline = deepcopy(pipeline)
# new_pipeline = boosting_mutation(new_pipeline, requirements, params)
# assert new_pipeline.descriptive_id != pipeline.descriptive_id
# assert 'class_decompose' in new_pipeline.descriptive_id or 'decompose' in new_pipeline.descriptive_id
#
#
# def test_no_opt_or_graph_nodes_after_mutation():
# adapter = PipelineAdapter()
# graph = get_simple_linear_graph()
# mutation = get_mutation_obj()
# for mut in mutation.parameters.mutation_types:
# graph, _ = mutation._adapt_and_apply_mutation(new_graph=graph, mutation_type=mut)
# new_pipeline = adapter.restore(graph)
#
# assert not find_first(new_pipeline, lambda n: type(n) in (GraphNode, OptNode))


@pytest.mark.parametrize('atomized_model',
Expand All @@ -211,22 +212,29 @@ def test_no_opt_or_graph_nodes_after_mutation():
(fedot_single_edge_mutation, ))
def test_fedot_mutation_with_atomized_models(atomized_model: Type[AtomizedModel],
mutation_type: Callable[[OptGraph], OptGraph]):

def extract_all_graphs(graph: OptGraph):
""" get all graphs from graph with atomized nodes as plane list"""
atomized_nodes = [node for node in graph.nodes if 'atomized' in node.name.lower()]
atomized_graphs = list(chain(*[extract_all_graphs(node.content['inner_graph']) for node in atomized_nodes]))
return [graph] + atomized_graphs

mutation = get_mutation_obj(mutation_types=[mutation_type])
# check that mutation_type has been set correctly
assert len(mutation.parameters.mutation_types) == 1
assert mutation.parameters.mutation_types[0] is mutation_type

# make mutation some times
mut = mutation.parameters.mutation_types[0]
origin_graph = get_graph_with_two_nested_atomized_models(atomized_model)
origin_descriptive_id = origin_graph.descriptive_id

atomized_1_mutation_count = 0
atomized_2_mutation_count = 0

origin_graphs = extract_all_graphs(get_graph_with_two_nested_atomized_models(atomized_model))
all_mutations = [0, 0, 0]
for _ in range(20):
graph, _ = mutation._adapt_and_apply_mutation(new_graph=deepcopy(origin_graph), mutation_type=mut)
graph, _ = mutation._adapt_and_apply_mutation(new_graph=deepcopy(origin_graphs[0]), mutation_type=mut)
graphs = extract_all_graphs(graph)

# check that there was the only one mutation in any graph
assert sum(x != y for x, y in zip(origin_graphs, graphs)) == 1

# check that mutation was made
assert graph.descriptive_id != origin_descriptive_id
all_mutations = [x + (y != z) for x, y, z in zip(all_mutations, origin_graphs, graphs)]
print(1)

0 comments on commit 0ec6851

Please sign in to comment.