Skip to content

Commit

Permalink
Graph edit distance between pipelines (#614)
Browse files Browse the repository at this point in the history
Graph edit distance between pipelines
  • Loading branch information
MorrisNein authored Apr 7, 2022
1 parent 992f30e commit 3fa9b2f
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 30 deletions.
23 changes: 22 additions & 1 deletion fedot/core/dag/graph_operator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from copy import deepcopy
from typing import Any, List, Optional, Union, Tuple
from typing import Any, List, Optional, Union, Tuple, Dict

from networkx import set_node_attributes, graph_edit_distance

from fedot.core.dag.graph_node import GraphNode
from fedot.core.pipelines.convert import graph_structure_as_nx_graph
Expand Down Expand Up @@ -224,3 +226,22 @@ def get_all_edges(self) -> List[Tuple[GraphNode, GraphNode]]:
for parent_node in node.nodes_from:
edges.append((parent_node, node))
return edges

def distance_to(self, other_graph: 'Graph') -> int:
    """ Compute the graph edit distance between the wrapped graph and ``other_graph``.

    Both graphs are converted to networkx graphs and every networkx node gets
    its source node attached under the 'node' attribute, so that nodes can be
    compared by content during the distance computation.

    :param other_graph: graph to measure the distance to
    :return: graph edit distance reported by networkx, cast to int
    """

    def node_match(node_data_1: Dict[str, GraphNode], node_data_2: Dict[str, GraphNode]) -> bool:
        """ Two nodes match when their string forms and their 'params' content are equal """
        first, second = node_data_1.get('node'), node_data_2.get('node')

        # Both comparisons are evaluated before being combined
        same_operation = str(first) == str(second)
        same_params = first.content.get('params') == second.content.get('params')
        return same_operation and same_params

    def as_annotated_nx_graph(graph):
        """ Convert the graph to networkx form and store source nodes in the 'node' attribute """
        nx_graph, nodes = graph_structure_as_nx_graph(graph)
        set_node_attributes(nx_graph, nodes, name='node')
        return nx_graph

    own_nx_graph = as_annotated_nx_graph(self._graph)
    other_nx_graph = as_annotated_nx_graph(other_graph)

    return int(graph_edit_distance(own_nx_graph, other_nx_graph, node_match=node_match))
12 changes: 7 additions & 5 deletions fedot/core/optimisers/adapters.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import abstractmethod
from copy import deepcopy
from typing import Any, Type
from typing import Any, Type, Optional, Dict

from fedot.core.dag.graph_node import GraphNode
from fedot.core.log import default_log
Expand Down Expand Up @@ -108,20 +108,22 @@ def adapt(self, adaptee: Pipeline):
graph = OptGraph(source_pipeline.nodes)
return graph

def restore(self, opt_graph: OptGraph, computation_time=None):
def restore(self, opt_graph: OptGraph, metadata: Optional[Dict[str, Any]] = None) -> 'Pipeline':
""" Convert OptGraph class into Pipeline class """
metadata = metadata or {}
source_graph = deepcopy(opt_graph)

# Inverse transformation since root node
for node in source_graph.nodes:
_transform_node(node=node, primary_class=PrimaryNode, secondary_class=SecondaryNode,
transform_func=self._transform_to_pipeline_node)
pipeline = Pipeline(source_graph.nodes)
pipeline.computation_time = computation_time
pipeline.computation_time = metadata.get('computation_time')
return pipeline

def restore_as_template(self, opt_graph: OptGraph, computation_time=None):
pipeline = self.restore(opt_graph, computation_time)
def restore_as_template(self, opt_graph: OptGraph, metadata: Optional[Dict[str, Any]] = None):
metadata = metadata or {}
pipeline = self.restore(opt_graph, metadata)
tmp = PipelineTemplate(pipeline)
return tmp

Expand Down
4 changes: 2 additions & 2 deletions fedot/core/optimisers/gp_comp/evaluating.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def single_evaluating(reversed_individuals):
individual_context.ind.fitness = calculate_objective(graph, individual_context.objective_function,
individual_context.is_multi_objective,
individual_context.graph_generation_params)
individual_context.ind.computation_time = timeit.default_timer() - start_time
individual_context.metadata = {'computation_time': timeit.default_timer() - start_time}
if individual_context.ind.fitness is not None:
evaluated_individuals.append(individual_context.ind)
num_of_successful_evals += 1
Expand Down Expand Up @@ -76,7 +76,7 @@ def individual_evaluation(individual: Dict) -> Union[Individual, None]:
replace_n_jobs_in_nodes(graph)
individual_.ind.fitness = calculate_objective(graph, individual_.objective_function,
individual_.is_multi_objective, individual_.graph_generation_params)
individual_.ind.computation_time = timeit.default_timer() - start_time
individual_.ind.metadata['computation_time'] = timeit.default_timer() - start_time
return individual_.ind


Expand Down
13 changes: 7 additions & 6 deletions fedot/core/optimisers/gp_comp/individual.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import List, Any, Dict, Optional, Union
from uuid import uuid4

from fedot.core.optimisers.graph import OptGraph
Expand All @@ -8,13 +8,14 @@


class Individual:
def __init__(self, graph: 'OptGraph', fitness: List[float] = None,
parent_operators: List[ParentOperator] = None, computation_time: Optional[int] = None):
def __init__(self, graph: 'OptGraph', fitness: Optional[Union[float, List[float]]] = None,
parent_operators: Optional[List[ParentOperator]] = None,
metadata: Optional[Dict[str, Any]] = None):
self.uid = str(uuid4())
self.parent_operators = parent_operators if parent_operators is not None else []
self.parent_operators = parent_operators or []
self.fitness = fitness
self.computation_time = computation_time
self.graph = graph
self.metadata: Dict[str, Any] = metadata or {}

def __eq__(self, other):
def __eq__(self, other: 'Individual'):
return self.uid == other.uid
13 changes: 7 additions & 6 deletions fedot/core/optimisers/opt_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ def write_composer_history_to_csv(self, file='history.csv'):
fitness = ind.fitness.values
else:
fitness = ind.fitness
ind_pipeline_template = adapter.restore_as_template(ind.graph, ind.computation_time)
ind_pipeline_template = adapter.restore_as_template(ind.graph, ind.metadata)
row = [
idx, gen_num, fitness,
len(ind_pipeline_template.operation_templates), ind_pipeline_template.depth, ind.computation_time
len(ind_pipeline_template.operation_templates), ind_pipeline_template.depth, ind.metadata
]
self._add_history_to_csv(file, row)
idx += 1
Expand All @@ -79,10 +79,11 @@ def _write_header_to_csv(self, f):
metric_str = 'metric'
if self.is_multi_objective:
metric_str += 's'
row = ['index', 'generation', metric_str, 'quantity_of_operations', 'depth', 'computation_time']
row = ['index', 'generation', metric_str, 'quantity_of_operations', 'depth', 'metadata']
writer.writerow(row)

def _add_history_to_csv(self, f: str, row: List[Any]):
@staticmethod
def _add_history_to_csv(f: str, row: List[Any]):
with open(f, 'a', newline='') as file:
writer = csv.writer(file, quoting=csv.QUOTE_ALL)
writer.writerow(row)
Expand All @@ -101,7 +102,7 @@ def save_current_results(self, path: Optional[str] = None):
{'fitness_name': self.short_metrics_names[0],
'fitness_value': self.historical_fitness[last_gen_id][ind_id]}
PipelineAdapter().restore_as_template(
individual.graph, individual.computation_time
individual.graph, individual.metadata
).export_pipeline(path=ind_path, additional_info=additional_info, datetime_in_path=False)
except Exception as ex:
print(ex)
Expand Down Expand Up @@ -183,7 +184,7 @@ def all_historical_quality(self):
def historical_pipelines(self):
adapter = PipelineAdapter()
return [
adapter.restore_as_template(ind.graph, ind.computation_time)
adapter.restore_as_template(ind.graph, ind.metadata)
for ind in list(itertools.chain(*self.individuals))
]

Expand Down
4 changes: 2 additions & 2 deletions fedot/core/pipelines/convert.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Tuple, Dict
from uuid import uuid4

import networkx as nx
Expand All @@ -8,7 +8,7 @@
from fedot.core.pipelines.template import PipelineTemplate


def graph_structure_as_nx_graph(structural_graph: 'Graph'):
def graph_structure_as_nx_graph(structural_graph: 'Graph') -> Tuple[nx.DiGraph, Dict[uuid4, 'Node']]:
""" Convert graph into networkx graph object """
nx_graph = nx.DiGraph()
node_labels = {}
Expand Down
9 changes: 3 additions & 6 deletions fedot/core/pipelines/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections import Counter
from datetime import datetime
from io import BytesIO
from typing import TYPE_CHECKING, Callable, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Callable, List, Optional, Tuple, Union, Dict, Any
from uuid import uuid4

import joblib
Expand Down Expand Up @@ -41,20 +41,17 @@ def __init__(self, pipeline: 'Pipeline' = None, log: Log = None):
self.total_pipeline_operations = Counter()
self.operation_templates: List[OperationTemplate] = []
self.unique_pipeline_id = str(uuid4())
self.metadata: Dict[str, Any] = {}
if pipeline is not None:
self.depth = pipeline.depth
self.metadata['computation_time'] = pipeline.computation_time

# Save preprocessing operations
self.data_preprocessor = pipeline.preprocessor
else:
self.depth = 0
self.data_preprocessor = None

try:
self.computation_time = pipeline.computation_time
except AttributeError:
self.computation_time = None

if not log:
self.log = default_log(__name__)
else:
Expand Down
1 change: 0 additions & 1 deletion test/unit/composer/test_history.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os

from fedot.api.main import Fedot
from fedot.core.constants import FAST_TRAIN_PRESET_NAME
from fedot.core.composer.advisor import PipelineChangeAdvisor
from fedot.core.composer.gp_composer.gp_composer import PipelineComposerRequirements
from fedot.core.dag.validation_rules import DEFAULT_DAG_RULES
Expand Down
36 changes: 35 additions & 1 deletion test/unit/dag/test_graph_operator.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from copy import deepcopy

from fedot.core.dag.graph_operator import GraphOperator
from fedot.core.optimisers.adapters import PipelineAdapter
from fedot.core.optimisers.graph import OptNode
from fedot.core.pipelines.node import PrimaryNode, SecondaryNode
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.pipeline_builder import PipelineBuilder


def get_pipeline():
def get_pipeline() -> Pipeline:
third_level_one = PrimaryNode('lda')

second_level_one = SecondaryNode('qda', nodes_from=[third_level_one])
Expand Down Expand Up @@ -95,6 +97,38 @@ def test_node_children():
assert children[0] is pipeline.nodes[1]


# ------------------------------------------------------------------------------
# Tests for distance_to method

def test_distance_to_same_pipeline_restored():
    """ A pipeline adapted to an OptGraph and restored back must be at zero distance from the source """
    # given
    pipeline_adapter = PipelineAdapter()
    source_pipeline = get_pipeline()
    adapted_graph = pipeline_adapter.adapt(source_pipeline)

    # when
    restored_pipeline = pipeline_adapter.restore(adapted_graph)
    edit_distance = source_pipeline.operator.distance_to(restored_pipeline)

    # then
    assert edit_distance == 0


def test_known_distances():
    """ Check distance_to against hand-computed edit distances for small linear pipelines """
    pipeline_scaling = PipelineBuilder().add_node('scaling').to_pipeline()  # scaling
    pipeline_xgboost = PipelineBuilder().add_node('xgboost').to_pipeline()  # xgboost
    pipeline_knn = PipelineBuilder().add_node('scaling').add_node('knn').to_pipeline()  # scaling -> knn
    pipeline_linear = PipelineBuilder().add_node('scaling').add_node('linear').to_pipeline()  # scaling -> linear
    # scaling -> knn with non-default params; 'metric' is the actual KNN hyperparameter name
    pipeline_knn_alternate_params = (
        PipelineBuilder()
        .add_node('scaling')
        .add_node('knn', params={'metric': 'euclidean'})
        .to_pipeline()
    )

    assert pipeline_knn.operator.distance_to(pipeline_knn) == 0  # the same pipeline
    assert pipeline_knn.operator.distance_to(pipeline_scaling) == 2  # changes: 1 node (operation) + 1 edge
    assert pipeline_knn.operator.distance_to(pipeline_linear) == 1  # changes: 1 node (operation)
    assert pipeline_knn.operator.distance_to(pipeline_knn_alternate_params) == 1  # changes: 1 node (params)
    assert pipeline_knn.operator.distance_to(pipeline_xgboost) == 3  # changes: 2 nodes (operations) + 1 edge
    # operation and params both differ, but it is still a single node substitution
    assert pipeline_linear.operator.distance_to(pipeline_knn_alternate_params) == 1


# ------------------------------------------------------------------------------
# Tests for disconnect_nodes method

Expand Down

0 comments on commit 3fa9b2f

Please sign in to comment.