[KED-1105] Make modular pipelines externally configurable (#286)
Anton Kirilenko authored Oct 22, 2019
1 parent ad5f749 commit 3c0f097
Showing 6 changed files with 511 additions and 49 deletions.
1 change: 1 addition & 0 deletions RELEASE.md
@@ -3,6 +3,7 @@
## Major features and improvements
* `kedro jupyter` now gives the default kernel a sensible name.
* `Pipeline.name` has been deprecated in favour of `Pipeline.tags`.
* `Pipeline.transform` has been added, which allows renaming and prefixing dataset and node names.
* Added a Jupyter notebook line magic (`%run_viz`) to run `kedro viz` in a notebook cell.

## Bug fixes and other changes
68 changes: 68 additions & 0 deletions docs/source/04_user_guide/06_pipelines.md
@@ -277,6 +277,74 @@ It is important to keep in mind that Kedro resolves node execution order based o

As a modular pipeline developer, you may not know how your pipeline will be integrated in the downstream projects and what data catalog configuration they may have. Therefore, it is crucial to make it clear in the pipeline documentation what datasets (names and types) are required as inputs by your modular pipeline and what datasets it produces as outputs.

## Connecting existing pipelines

When two existing pipelines need to work together, they should be connected through shared datasets.
However, the dataset names may differ, which would normally require manual changes to the pipeline definitions themselves.
An alternative solution is to `transform` an existing pipeline. Consider this example:

```python
cook_pipeline = Pipeline([
    node(defrost, "frozen_meat", "meat"),
    node(grill, "meat", "grilled_meat"),
])

lunch_pipeline = Pipeline([
    node(eat, "food", None),
])
```

A simple `cook_pipeline + lunch_pipeline` does not work out of the box, because the `food` input still needs to be mapped to the `grilled_meat` output.
Here is how it can be done; all three of the resulting pipelines do the job equally well:

```python
final_pipeline1 = cook_pipeline.transform(datasets={"grilled_meat": "food"}) + lunch_pipeline
final_pipeline2 = cook_pipeline + lunch_pipeline.transform(datasets={"food": "grilled_meat"})
final_pipeline3 = cook_pipeline.transform(datasets={"grilled_meat": "new_name"}) + \
    lunch_pipeline.transform(datasets={"food": "new_name"})
```
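
As a quick sanity check (a sketch, assuming the pipelines defined above; `Pipeline.inputs()` returns the free inputs of a pipeline), you can verify that `food` is now produced internally rather than expected from the catalog:

```python
# "frozen_meat" is still a free input, but "food" is now produced inside
# the combined pipeline by the renamed cook_pipeline output.
assert "frozen_meat" in final_pipeline1.inputs()
assert "food" not in final_pipeline1.inputs()
```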

## Using a modular pipeline twice
Consider the following example:

```python
cook_pipeline = Pipeline([
    node(defrost, "frozen_meat", "meat", name="defrost_node"),
    node(grill, "meat", "grilled_meat"),
])

breakfast_pipeline = Pipeline([
    node(eat_breakfast, "breakfast_food", None),
])

lunch_pipeline = Pipeline([
    node(eat_lunch, "lunch_food", None),
])
```
Now we need to "defrost" two different types of food and feed them to different pipelines.
However, we cannot simply use `cook_pipeline` twice: its internal dataset names would conflict.
Renaming all the datasets via `transform` is not enough either,
because the explicitly set node name (`name="defrost_node"`) would still clash.

The right solution is to combine `transform`'s dataset mapping with a `prefix`:
```python
pipeline = (
    cook_pipeline.transform(
        datasets={"grilled_meat": "breakfast_food"}, prefix="breakfast"
    )
    + breakfast_pipeline
    + cook_pipeline.transform(
        datasets={"grilled_meat": "lunch_food"}, prefix="lunch"
    )
    + lunch_pipeline
)
```
`prefix="lunch"` renames all datasets and nodes by prefixing them with `"lunch."`,
except the datasets that we rename explicitly (`grilled_meat`).

The resulting pipeline now has two separate nodes, `breakfast.defrost_node` and
`lunch.defrost_node`. Likewise, the two separate datasets `breakfast.meat` and `lunch.meat`
connect the nodes inside each pipeline, so there is no confusion between them.
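
A minimal way to confirm the renaming (a sketch, assuming the `pipeline` object built above; `Pipeline.nodes` lists the resolved nodes):

```python
# Print the node names to verify that the prefixes were applied.
for node in pipeline.nodes:
    print(node.name)
# Expect "breakfast.defrost_node" and "lunch.defrost_node" among them.
```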

## Bad pipelines

As you may have noticed, pipelines can usually resolve their dependencies automatically. In some cases, however, resolution is not possible, and such pipelines are not well-formed.
33 changes: 17 additions & 16 deletions kedro/pipeline/node.py
@@ -124,6 +124,21 @@ def __init__(
    self._validate_unique_outputs()
    self._validate_inputs_dif_than_outputs()

def _copy(self, **overwrite_params):
    """Helper function to copy the node, replacing some values."""
    params = {
        "func": self._func,
        "inputs": self._inputs,
        "outputs": self._outputs,
        "name": self._name,
        "tags": self._tags,
        "decorators": self._decorators,
    }
    params.update(overwrite_params)
    return Node(**params)

@property
def _logger(self):
    return logging.getLogger(__name__)
@@ -208,14 +223,7 @@ def tag(self, tags: Iterable[str]) -> "Node":
        A copy of the current ``Node`` object with the tags added.
    """
-    return Node(
-        self._func,
-        self._inputs,
-        self._outputs,
-        name=self._name,
-        tags=set(self._tags) | set(tags),
-        decorators=self._decorators,
-    )
+    return self._copy(tags=set(self._tags) | set(tags))

@property
def name(self) -> str:
@@ -339,14 +347,7 @@ def decorate(self, *decorators: Callable) -> "Node":
    >>> assert "output" in result
    >>> assert result['output'] == "f(g(fg(h(1))))"
    """
-    return Node(
-        self._func,
-        self._inputs,
-        self._outputs,
-        name=self._name,
-        tags=self.tags,
-        decorators=self._decorators + list(reversed(decorators)),
-    )
+    return self._copy(decorators=self._decorators + list(reversed(decorators)))

def run(self, inputs: Dict[str, Any] = None) -> Dict[str, Any]:
    """Run this node using the provided inputs and return its results
159 changes: 134 additions & 25 deletions kedro/pipeline/pipeline.py
@@ -35,7 +35,7 @@
import warnings
from collections import Counter, defaultdict
from itertools import chain
-from typing import Callable, Dict, Iterable, List, Optional, Set, Union
+from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple, Union

from toposort import CircularDependencyError as ToposortCircleError
from toposort import toposort
@@ -46,8 +46,9 @@
TRANSCODING_SEPARATOR = "@"


-def _get_transcode_compatible_name(element: str) -> str:
-    """Strip out the transcoding separator and anything that follows.
+def _transcode_split(element: str) -> Tuple[str, str]:
+    """Split the name by the transcoding separator.
+    If the transcoding part is missing, an empty string is used in its place.

    Returns:
        Node input/output name before the transcoding separator, if present.
@@ -56,13 +57,47 @@ def _get_transcode_compatible_name(element: str) -> str:
        is present in the name.
    """
    split_name = element.split(TRANSCODING_SEPARATOR)

    if len(split_name) > 2:
        raise ValueError(
            "Expected maximum 1 transcoding separator, found {} instead: '{}'.".format(
                len(split_name) - 1, element
            )
        )
-    return split_name[0]
+    if len(split_name) == 1:
+        split_name.append("")
+
+    return tuple(split_name)  # type: ignore


def _transcode_join(parts: Tuple[str, str]) -> str:
    """Join the name parts using the transcoding separator.
    If the transcoding part is missing, the resulting name will not have it either.

    Raises:
        ValueError: wrong number of parts have been provided.

    Returns:
        Node input/output name.
    """
    if not parts or len(parts) > 2:
        raise ValueError("1 or 2 parts are expected: {}".format(parts))
    if len(parts) == 1 or not parts[1]:
        return parts[0]

    return TRANSCODING_SEPARATOR.join(parts)


def _get_transcode_compatible_name(element: str) -> str:
    """Strip out the transcoding separator and anything that follows.

    Returns:
        Node input/output name before the transcoding separator, if present.

    Raises:
        ValueError: Raised if more than one transcoding separator
        is present in the name.
    """
    return _transcode_split(element)[0]
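
# Illustrative behaviour of the helpers above (a sketch, not part of this
# commit), given TRANSCODING_SEPARATOR == "@":
#
#     _transcode_split("cars@spark")      # -> ("cars", "spark")
#     _transcode_split("cars")            # -> ("cars", "")
#     _transcode_join(("cars", "spark"))  # -> "cars@spark"
#     _transcode_join(("cars", ""))       # -> "cars"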


class OutputNotUniqueError(Exception):
@@ -95,7 +130,7 @@ def __init__(
            provide pipelines among the list of nodes, those pipelines will
            be expanded and all their nodes will become part of this
            new pipeline.
-        name: (DEPRECATED, use `tags` instead) The name of the pipeline.
+        name: (DEPRECATED, use the `tag` method instead) The name of the pipeline.
            If specified, this name will be used to tag all of the nodes
            in the pipeline.
        tags: Optional set of tags to be applied to all the pipeline nodes.
@@ -142,18 +177,17 @@ def __init__(
    )
    _validate_duplicate_nodes(nodes)
    _validate_transcoded_inputs_outputs(nodes)

-    self._tags = set(tags or [])
+    _tags = set(tags or [])

    if name:
        warnings.warn(
            "`name` parameter is deprecated for the `Pipeline`"
-            " constructor, use `tags` instead",
+            " constructor, use `Pipeline.tag` method instead",
            DeprecationWarning,
        )
-        self._tags.add(name)
+        _tags.add(name)

-    nodes = [n.tag(self._tags) for n in nodes]
+    nodes = [n.tag(_tags) for n in nodes]

    self._name = name
    self._nodes_by_name = {node.name: node for node in nodes}
@@ -184,9 +218,7 @@ def __repr__(self):  # pragma: no cover
    nodes_reprs_str = (
        "[\n{}\n]".format(",\n".join(nodes_reprs)) if nodes_reprs else "[]"
    )
-    name = ",\nname='{}'".format(self.name) if self.name else ""
-
-    constructor_repr = "({}{})".format(nodes_reprs_str, name)
+    constructor_repr = "({})".format(nodes_reprs_str)
    return "{}{}".format(self.__class__.__name__, constructor_repr)

def __add__(self, other):
@@ -331,28 +363,18 @@ def set_to_string(set_of_strings):

@property
def name(self) -> Optional[str]:
-    """(DEPRECATED, use `tags` instead) Get the pipeline name.
+    """(DEPRECATED, use `Pipeline.tag` method instead) Get the pipeline name.

    Returns:
        The name of the pipeline as provided in the constructor.
    """
    warnings.warn(
-        "`Pipeline.name` is deprecated, use `Pipeline.tags` instead.",
+        "`Pipeline.name` is deprecated, use `Pipeline.tag` method instead.",
        DeprecationWarning,
    )
    return self._name

-@property
-def tags(self) -> Iterable[str]:
-    """Get the pipeline tags.
-
-    Returns:
-        The list of the pipeline tags as provided in the constructor.
-    """
-    return self._tags

@property
def node_dependencies(self) -> Dict[Node, Set[Node]]:
    """All dependencies of nodes where the first Node has a direct dependency on
@@ -705,6 +727,15 @@ def decorate(self, *decorators: Callable) -> "Pipeline":
    nodes = [node.decorate(*decorators) for node in self.nodes]
    return Pipeline(nodes)

def tag(self, tags: Iterable[str]) -> "Pipeline":
    """Return a copy of the pipeline, with each node tagged accordingly.

    Args:
        tags: The tags to be added to the nodes.

    Returns:
        New ``Pipeline`` object.
    """
    nodes = [n.tag(tags) for n in self.nodes]
    return Pipeline(nodes)
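
# Illustrative usage (a sketch, not part of this commit): tag every node of a
# pipeline at once, e.g. to mark a whole sub-pipeline for selective execution.
#
#     tagged = cook_pipeline.tag(["baseline"])
#     # every node in `tagged` now carries the "baseline" tag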

def to_json(self):
    """Return a json representation of the pipeline."""
    transformed = [
@@ -723,6 +754,84 @@

    return json.dumps(pipeline_versioned)

def transform(
    self, datasets: Dict[str, str] = None, prefix: str = None
) -> "Pipeline":
    """Create a copy of the pipeline and its nodes, with some dataset
    names modified.

    Args:
        datasets: A map of the existing dataset name to the new one.
            Both input and output datasets can be replaced this way.
        prefix: A prefix to give to all dataset names,
            except those explicitly named with the `datasets` parameter.

    Raises:
        ValueError: invalid dataset names are given.

    Returns:
        A new ``Pipeline`` object with the new nodes, modified as requested.
    """
    # pylint: disable=protected-access
    datasets = datasets or {}
    new_nodes = []
    used_dataset_names = set()

    def _prefix(name):
        return "{}.{}".format(prefix, name) if prefix else name

    def _map_and_prefix(name):
        if name in datasets:
            used_dataset_names.add(name)
            return datasets[name]

        base_name, transcode_name = _transcode_split(name)

        if base_name in datasets:
            used_dataset_names.add(base_name)
            base_name = datasets[base_name]
            return _transcode_join((base_name, transcode_name))

        return _prefix(name)

    def _process_dataset_names(names: Union[None, str, List[str], Dict[str, str]]):
        if names is None:
            return None
        if isinstance(names, str):
            return _map_and_prefix(names)  # type: ignore
        if isinstance(names, list):
            return [
                _map_and_prefix(name) for name in names  # type: ignore
            ]
        if isinstance(names, dict):
            return {
                key: _map_and_prefix(value)  # type: ignore
                for key, value in names.items()
            }

        raise ValueError(  # pragma: no cover
            "Unexpected input {} of type {}".format(names, type(names))
        )

    for node in self.nodes:
        new_nodes.append(
            node._copy(
                inputs=_process_dataset_names(node._inputs),
                outputs=_process_dataset_names(node._outputs),
                name=_prefix(node._name) if node._name else None,
            )
        )

    unused_dataset_names = set(datasets) - used_dataset_names

    if unused_dataset_names:
        raise ValueError(
            "Failed to map datasets: {}".format(sorted(unused_dataset_names))
        )

    return Pipeline(new_nodes)
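
# Illustrative usage (a sketch, not part of this commit), mirroring the docs
# example earlier in this diff:
#
#     renamed = cook_pipeline.transform(
#         datasets={"grilled_meat": "food"}, prefix="dinner"
#     )
#     # "frozen_meat" -> "dinner.frozen_meat", "meat" -> "dinner.meat",
#     # "grilled_meat" -> "food" (an explicit rename wins over the prefix)
#
#     cook_pipeline.transform(datasets={"no_such_dataset": "x"})
#     # raises ValueError: Failed to map datasets: ['no_such_dataset']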


def _validate_no_node_list(nodes: Iterable[Union[Node, Pipeline]]):
    if nodes is None:
