-
Notifications
You must be signed in to change notification settings - Fork 86
/
Copy pathnode.py
405 lines (314 loc) · 14.4 KB
/
node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple, Union
import numpy as np
from golem.core.dag.linked_graph_node import LinkedGraphNode
from golem.core.log import default_log
from golem.core.optimisers.timer import Timer
from golem.serializers.serializer import register_serializable
from fedot.core.data.data import InputData, OutputData
from fedot.core.data.merge.data_merger import DataMerger
from fedot.core.operations.factory import OperationFactory
from fedot.core.operations.operation import Operation
from fedot.core.operations.operation_parameters import OperationParameters
from fedot.core.repository.operation_types_repository import OperationTypesRepository
from fedot.core.utils import DEFAULT_PARAMS_STUB, NESTED_PARAMS_LABEL
@register_serializable
@dataclass
class NodeMetadata:
    """Serializable dataclass holding the metadata of a :class:`PipelineNode`

    Args:
        metric: quality score (``None`` until a value is assigned)
    """

    metric: Optional[float] = None
class PipelineNode(LinkedGraphNode):
    """The class defines the interface of nodes modifying the data flow in the :class:`Pipeline`

    Args:
        operation_type: operation defined in the operation repository
            (its name or an already built operation object)
        nodes_from: parent nodes where data comes from
        node_data: ``dict`` with :class:`InputData` for fit and predict stage
        kwargs: optional arguments; only ``content`` is read here - a ``dict`` with the ``'name'`` item
            and optional ``'params'`` and ``'metadata'`` items
    """

    def __init__(self, operation_type: Optional[Union[str, Operation]] = None,
                 nodes_from: Optional[List[PipelineNode]] = None,
                 node_data: Optional[dict] = None,
                 **kwargs):
        """Builds the node either from ``kwargs['content']`` (if it is non-empty) or from ``operation_type``

        Raises:
            ValueError: if neither ``content`` nor ``operation_type`` is provided
        """
        if node_data is None:
            self._node_data = {}
            self.direct_set = False
        else:
            self._node_data = node_data
            # Was the data passed directly to the node or not
            self.direct_set = True

        passed_content = kwargs.get('content')
        if passed_content:
            # Define operation, based on content dictionary
            operation = self._process_content_init(passed_content)
            params = passed_content.get('params', {})
            self.metadata = passed_content.get('metadata', NodeMetadata())
        else:
            # There is no content for node
            operation = self._process_direct_init(operation_type)
            # Define operation with default parameters
            params = {}
            self.metadata = NodeMetadata()

        self.fit_time_in_seconds = 0
        self.inference_time_in_seconds = 0

        super().__init__(content={'name': operation}, nodes_from=nodes_from)
        # use parameters.setter to process input parameters correctly
        self.parameters = params

        self.log = default_log(self)
        self._fitted_operation = None
        self.rating = None

    @property
    def is_primary(self) -> bool:
        """Tells whether the node has no parent nodes

        Returns:
            bool: ``True`` if :attr:`nodes_from` is empty or ``None`` and ``False`` otherwise
        """
        # Always answer with a real bool: the former implicit ``None`` for nodes with parents
        # was falsy as well, so truthiness checks in callers behave exactly as before
        return not self.nodes_from

    def _process_content_init(self, passed_content: dict) -> Operation:
        """Defines operation based on the ``content`` dictionary and stores the dictionary in the node

        Args:
            passed_content: ``dict`` whose ``'name'`` item is an operation name or an operation object

        Returns:
            Operation: operation class object

        Note:
            ``passed_content`` is updated in place when its ``'name'`` item is a string
        """
        if isinstance(passed_content['name'], str):
            # Need to convert name of operation into operation class object
            operation_factory = OperationFactory(operation_name=passed_content['name'])
            operation = operation_factory.get_operation()
            passed_content.update({'name': operation})
        else:
            operation = passed_content['name']
        self.content = passed_content
        return operation

    @staticmethod
    def _process_direct_init(operation_type: Optional[Union[str, Operation]]) -> Operation:
        """Define operation based on the direct ``operation_type`` without defining content in the node

        Args:
            operation_type: node type representation

        Returns:
            Operation: operation class object

        Raises:
            ValueError: if ``operation_type`` is empty or ``None``
        """
        if not operation_type:
            raise ValueError('Operation is not defined in the node')

        if not isinstance(operation_type, str):
            # AtomizedModel
            operation = operation_type
        else:
            # Define appropriate operation or data operation
            operation_factory = OperationFactory(operation_name=operation_type)
            operation = operation_factory.get_operation()
        return operation

    def update_params(self):
        """Updates :attr:`parameters` with the parameters changed by the fitted operation"""
        new_params = self.fitted_operation.get_params()
        changed_parameters = new_params.changed_parameters
        # Changed values take precedence over the ones currently stored in the node
        updated_parameters = {**self.parameters, **changed_parameters}
        self.parameters = updated_parameters

    @property
    def name(self) -> str:
        """ Returns str name of operation """
        return self.operation.operation_type

    @property
    def operation(self) -> Operation:
        """Returns node operation object

        Returns:
            Operation: operation object
        """
        return self.content['name']

    @operation.setter
    def operation(self, value: Operation):
        """Updates ``operation`` property with the provided ``value``

        Args:
            value: new operation object
        """
        self.content.update({'name': value})

    @property
    def fitted_operation(self) -> Optional[Any]:
        """Returns already fitted operation if exists or ``None`` instead

        Returns:
            node fitted operation or ``None``
        """
        # The attribute may be absent: the setter deletes it when ``None`` is assigned
        return getattr(self, '_fitted_operation', None)

    @fitted_operation.setter
    def fitted_operation(self, value: Any):
        """Sets node fitted operation with the provided ``value``

        Args:
            value: any model from the ``list`` of acceptable nodes for the chosen task and problem;
                ``None`` removes the stored fitted operation
        """
        if value is None:
            if hasattr(self, '_fitted_operation'):
                del self._fitted_operation
        else:
            self._fitted_operation = value

    def unfit(self):
        """Sets ``node_data`` (if exists) and ``fitted_operation`` to ``None``
        """
        self.fitted_operation = None
        if hasattr(self, 'node_data'):
            self.node_data = None

    def fit(self, input_data: InputData) -> OutputData:
        """Runs training process in the node

        The operation is fitted only if the node has no fitted operation yet; otherwise the stored
        fitted operation is reused to obtain the prediction for the fit stage.

        Args:
            input_data: data used for operation training

        Returns:
            OutputData: values predicted on the provided ``input_data``
        """
        self.log.debug(f'Trying to fit pipeline node with operation: {self.operation}')
        input_data = self._get_input_data(input_data=input_data, parent_operation='fit')

        if self.fitted_operation is None:
            with Timer() as t:
                self.fitted_operation, operation_predict = self.operation.fit(params=self._parameters,
                                                                              data=input_data)
                self.fit_time_in_seconds = round(t.seconds_from_start, 3)
        else:
            operation_predict = self.operation.predict_for_fit(fitted_operation=self.fitted_operation,
                                                               data=input_data,
                                                               params=self._parameters)

        # Update parameters after operation fitting (they can be corrected)
        not_atomized_operation = 'atomized' not in self.operation.operation_type

        if not_atomized_operation and 'correct_params' in self.operation.metadata.tags:
            self.update_params()
        return operation_predict

    def predict(self, input_data: InputData, output_mode: str = 'default') -> OutputData:
        """Runs prediction process in the node

        Args:
            input_data: data used for prediction
            output_mode: desired output for operations (e.g. ``'labels'``, ``'probs'``, ``'full_probs'``)

        Returns:
            OutputData: values predicted on the provided ``input_data``
        """
        self.log.debug(f'Obtain prediction in pipeline node by operation: {self.operation}')
        input_data = self._get_input_data(input_data=input_data, parent_operation='predict')

        with Timer() as t:
            operation_predict = self.operation.predict(fitted_operation=self.fitted_operation,
                                                       params=self._parameters,
                                                       data=input_data,
                                                       output_mode=output_mode)
            self.inference_time_in_seconds = round(t.seconds_from_start, 3)
        return operation_predict

    def get_data_from_node(self) -> dict:
        """Returns data if it was set to the nodes directly

        Returns:
            dict: ``dict`` with :class:`InputData` for fit and predict stage
        """
        return self.node_data

    @property
    def node_data(self) -> dict:
        """Returns directly set :attr:`node_data`

        Returns:
            dict: ``dict`` with :class:`InputData` for fit and predict stage
            (an empty ``dict`` if nothing is stored)
        """
        # The attribute may be absent: the setter deletes it when ``None`` is assigned
        return getattr(self, '_node_data', {})

    @node_data.setter
    def node_data(self, value: dict):
        """Sets :attr:`node_data`

        Args:
            value: ``dict`` with :class:`InputData` for fit and predict stage;
                ``None`` removes the stored data
        """
        if value is None:
            if hasattr(self, '_node_data'):
                del self._node_data
        else:
            self._node_data = value

    def _get_input_data(self, input_data: InputData, parent_operation: str):
        """Chooses the data the node operation will be applied to

        Args:
            input_data: input data from pipeline abstraction (source input data)
            parent_operation: name of parent operation (``'fit'`` or ``'predict'``)

        Returns:
            merged output of the parent nodes if there are any, the directly set :attr:`node_data`
            if the data was passed to the node on creation, and ``input_data`` itself otherwise
        """
        if self.nodes_from:
            input_data = self._input_from_parents(input_data=input_data, parent_operation=parent_operation)
        elif self.direct_set:
            input_data = self.node_data
        else:
            # Remember the data the node without parents has been fed with
            self.node_data = input_data
        return input_data

    def _input_from_parents(self, input_data: InputData, parent_operation: str) -> InputData:
        """Processes all the parent nodes via the current operation using ``input_data``

        Args:
            input_data: input data from pipeline abstraction (source input data)
            parent_operation: name of parent operation (``'fit'`` or ``'predict'``)

        Returns:
            InputData: predictions from the secondary nodes

        Raises:
            ValueError: if the node has no parent nodes
        """
        if not self.nodes_from:
            raise ValueError('No parent nodes found')

        self.log.debug(f'Fit all parent nodes in secondary node with operation: {self.operation}')

        parent_nodes = self._nodes_from_with_fixed_order()

        parent_results, _ = _combine_parents(parent_nodes, input_data,
                                             parent_operation)
        secondary_input = DataMerger.get(parent_results).merge()
        # Update info about visited nodes
        parent_operations = [node.operation.operation_type for node in parent_nodes]
        secondary_input.supplementary_data.previous_operations = parent_operations
        return secondary_input

    def _nodes_from_with_fixed_order(self):
        """Sorts :attr:`nodes_from` by the nodes unique id

        Returns:
            new ``list`` with :attr:`nodes_from` sorted by :obj:`GraphNode.descriptive_id`
        """
        return sorted(self.nodes_from, key=lambda node: node.descriptive_id)

    @property
    def parameters(self) -> dict:
        """Returns node custom parameters

        Returns:
            dict: of custom parameters
        """
        return self.content.get('params')

    @parameters.setter
    def parameters(self, params: dict):
        """Sets custom parameters of the node or set default

        Passing ``None`` leaves the current parameters untouched.

        Args:
            params: new parameters to be placed instead of existing
        """
        if params is not None:
            # The check for "default_params" is needed for backward compatibility.
            if params == DEFAULT_PARAMS_STUB:
                params = {}
            # take nested params if they appeared (mostly used for tuning)
            if NESTED_PARAMS_LABEL in params:
                params = params[NESTED_PARAMS_LABEL]
            self._parameters = OperationParameters.from_operation_type(self.operation.operation_type, **params)
            # Keep the plain-dict view in the content in sync with the parameters object
            self.content['params'] = self._parameters.to_dict()

    def __str__(self) -> str:
        """Returns ``str`` representation of the node

        Returns:
            str: string field node operation type
        """
        return str(self.operation.operation_type)

    @property
    def tags(self) -> Optional[List[str]]:
        """Returns tags of operation in the node or empty list

        Returns:
            Optional[List[str]]: ``empty list`` if node is of atomized type, ``list of tags`` if the
            operation is found in the repository and ``None`` otherwise
        """
        if 'atomized' in self.operation.operation_type:
            # There are no tags for atomized operation
            return []

        info = OperationTypesRepository(operation_type='all').operation_info_by_id(self.operation.operation_type)
        if info is not None:
            return info.tags
        return None
def _combine_parents(parent_nodes: List[PipelineNode],
input_data: Optional[InputData], parent_operation: str) -> Tuple[List[OutputData], np.array]:
""" Combines predictions from the ``parent_nodes``
Args:
parent_nodes: list of parent nodes, from which predictions will be combined
input_data: input data from pipeline abstraction (source input data)
parent_operation: name of parent operation (``'fit'`` or ``'predict'``)
Returns:
Tuple[List[OutputData], np.array]: :obj:`output data list from parent nodes`,
:obj:`target for final pipeline prediction`
"""
if input_data is not None:
# InputData was set to pipeline
target = input_data.target
parent_results = []
for parent in parent_nodes:
if parent_operation == 'predict':
prediction = parent.predict(input_data=input_data)
parent_results.append(prediction)
elif parent_operation == 'fit':
prediction = parent.fit(input_data=input_data)
parent_results.append(prediction)
else:
raise ValueError("Value parent_operation should be 'fit' or 'predict'")
if input_data is None:
# InputData was set to primary nodes
target = prediction.target
return parent_results, target
# TODO: these two aliases are kept for backwards compatibility only.
#  They should be removed and replaced by a script converting old-style pipelines
#  (with PrimaryNode and SecondaryNode) to new-style ones (only with PipelineNode).
PrimaryNode = SecondaryNode = PipelineNode