# Generalized Pipeline and Graphpipeline

Contributors: benheid

## Contents

[TOC]


## Overview

### Problem Statement

We would like to introduce a new type of pipeline in sktime.

1. We would like to have one general pipeline instead of separate pipelines for different tasks,
i.e. one pipeline that can contain classifiers but also works with forecasters.
2. Based on that, we would like to introduce or enable the graph pipelining concept as proposed in pyWATTS.

Initially proposed in sktime issues
* https://github.com/sktime/sktime/issues/4281

Relies on existing Step PR:
* https://github.com/sktime/sktime/pull/4341


## Requirements

### Minimal Functional Requirements

* The graphpipeline should cover the functionality of:
    * TransformedTargetForecaster linear pipeline
    * ForecastingPipeline linear pipeline
    * TransformerPipeline
    * FeatureUnion and variable subsetting
* Additionally, a simple “triangle” graphical situation should be realised in the minimal version
    * This requires the management of the execution order. May use the resolution order from pyWATTS.
* The minimal graphpipeline has only one output step, which can be a forecaster.
* The minimal graphpipeline supports only one forecaster.

These requirements cover the following aspects:
* The implementation needs to be polymorphic with regard to transformers and forecasters
* simple, truly graphical situations beyond current sktime functionality are covered (triangle)


#### General requirements


* the new pipeline should be compatible with all sktime estimators
* it should behave like a classifier if a classifier is added to the pipeline, like a forecaster
if a forecaster is added, ...
* Code duplication should be reduced, i.e., we need to figure out if we can share code across the different transform/predict/... methods.


## Proposed solution

### User journey design
The user only has to import the pipeline, regardless of the type of the estimators used.


Taken from [Hackmd](https://hackmd.io/6PMsV6DLRIyCvwBMfKHuhw)
```python
p = Pipeline(stuff)
p.add_step(step1, "step1-name", {"x": "column1"})
p.add_step(step2, "step2-name", {"x": "column1"})
p.add_step(step3, "step3-name", {"x": "step1-name",
                                 "y": "step2-name"})

...
p.add_step(...)
p.fit(data, params)
p.method(more_data)
```
* **Constructor of the Pipeline (`__init__`)**
    * The constructor creates the pipeline object. It accepts the following arguments:
        * A list of steps/step informations that can fully describe a pipeline.
          However, users probably **should not** pass this directly. It is mainly required to be compatible with sklearn,
          since the pipeline steps are pipeline parameters.
        * Additional parameters such as store paths for intermediate results (at least pyWATTS uses such information).
* **add_step** method
    * The `add_step` method adds a transformer/forecaster to the pipeline. Thereby, it mutates the state of the pipeline.
      Under the hood, the graph structure is maintained by having a Step/StepInformation object that contains the added
      transformer/forecaster together with the links to its predecessors.
      It requires the following arguments:
        * The transformer/forecaster that should be added to the pipeline.
        * The name that this step should have within the pipeline.
        * A list of predecessors, identified by their keys. Note that column names of the input dataset can also be used as keys.
        * Further kwargs that can specify the behaviour of the transformer/forecaster in the pipeline.
* **fit** method
    * Fits everything within the pipeline.
* **method** stands for every method that is possible on the pipeline. The possible methods of the pipeline are determined by its last step.

The pipeline object is created, and then the transformers/forecasters/... are added to the pipeline with `add_step`.
Afterwards, `fit` can be called, and then for example `predict` if the last step in the pipeline has a `predict` method.
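
For illustration, a hypothetical end-to-end version of this journey with existing sktime estimators could look as follows. The constructor call, the edge keys, the column name `passengers`, and the fit/predict signatures are assumptions about the proposed API, not a finished interface.

```python
# Hypothetical example, assuming the proposed API above; Detrender and
# NaiveForecaster are existing sktime estimators, everything else is
# illustrative only.
from sktime.datasets import load_airline
from sktime.forecasting.naive import NaiveForecaster
from sktime.transformations.series.detrend import Detrender

y = load_airline().rename("passengers").to_frame()

p = Pipeline([])
p.add_step(Detrender(), "detrender", {"X": "passengers"})
p.add_step(NaiveForecaster(), "forecaster", {"y": "detrender"})

p.fit(y, fh=[1, 2, 3])            # fits the detrender, then the forecaster
y_pred = p.predict(fh=[1, 2, 3])  # allowed because the last step has predict
```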


#### Comparison to existing solutions
##### Compare to make_pipeline

```python
p = make_pipeline(steps)
p.fit(data, params)
p.method(more_data)
```
However, such a structure as in the `add_step` example above is not possible with `make_pipeline`.

##### Compare to dunders

```python
p = Pipeline(stuff)
step1 = step1()(x=p["column1"])
step2 = step2()(x=p["column1"])
step3 = step3()(x=step1, y=step2)
...

p.fit(data, params)
p.method(more_data)
```

This example (as used in pyWATTS) is very similar to the proposed solution.
However, this solution requires the implementation of the `__call__` dunder in the base class.

## Code design

The solution below introduces a new pipeline class that
* inherits from BaseEstimator.
* uses duck typing to behave like a specific class (e.g. forecaster, classifier, ...)
* uses inspection to determine the correct arguments for the call, fit, etc. methods.

A prototype that describes the general idea is provided in the following PR:

#### TODOS
* [ ] Exemplary flowchart of how the pipeline works
    * [ ] Adding steps
    * [ ] Fitting/Transforming/Predicting the pipeline (Resolution Order)

The general pipeline implements each fit/predict/transform method that is available in sktime. See the example above.

### Code Design: Graphpipeline
The proposed solution relies on the PR [sktime #4341](https://github.com/sktime/sktime/pull/4341); more details of the code are also available in that PR. It may look as follows:

```python

class Pipeline(BaseEstimator):

    def __init__(self, steps):
        # Initialise the pipeline with its steps
        ...

    @staticmethod
    def _check_validity(step, method_name, **kwargs):
        # Checks if method_name is allowed to be called on the pipeline,
        # using duck typing.
        # Returns all kwargs that are provided to the pipeline and needed by method_name.
        ...

    def fit(self, **kwargs):
        # Fits the pipeline
        ...

    def transform(self, *args, **kwargs):
        # Implementation of transform; such methods are also required for predict, ...
        ...

    def add_step(self, skobject, name, edges, **kwargs):
        # Adds a step to the pipeline and stores the step information
        ...

```
#### Explanations of the code:
* The `__init__` gets the steps as argument, which are just assigned to the parameter `self.steps`.
    * Note that for the graph pipeline solution, we probably need an additional `add_step` method, which can modify `self.steps`.
* The fit/transform/predict/... methods have a similar design (a sketch of this shared flow is given after this list).
    * To take all possible arguments, they have the `*args, **kwargs` parameters. These parameters are cloned to avoid side effects.
    * All transformers of the pipeline are executed (all steps before the last).
      Thereby, first the validity is checked, i.e., does the current step implement the transform method and are all required parameters available.
    * The transform method is called with the required arguments.
    * Afterwards, it is checked whether all required arguments for the desired method of the last step are available.
    * Finally, the desired method of the last step is called.
* The `_check_validity` method checks for a specific estimator/transformer if the method is available and if all required parameters are in kwargs.
  Afterwards, it returns a dict containing all required parameters.
* The `add_step` method adds a step to the pipeline. Therefore, it
    * creates an internal Step/StepInformation object that contains all information about the predecessors,
    * maintains the list of steps/step informations in the pipeline.
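
A minimal sketch of this shared flow, assuming the class stub above and a hypothetical helper name `_run`, could look like this; it also assumes each entry of `self.steps` exposes a `name` and the wrapped estimator's methods, which is an illustrative simplification.

```python
from copy import deepcopy

def _run(self, method_name, **kwargs):
    # Hypothetical shared backend for fit/transform/predict/... of the
    # Pipeline class above; all names are illustrative, not a final API.
    kwargs = deepcopy(kwargs)  # clone the arguments to avoid side effects

    # execute all transformers of the pipeline (all steps before the last)
    for step in self.steps[:-1]:
        use_kwargs = self._check_validity(step, "transform", **kwargs)
        # make the transformed output available to later steps under the step name
        kwargs[step.name] = step.transform(**use_kwargs)

    # check whether the desired method of the last step can be called ...
    last_step = self.steps[-1]
    use_kwargs = self._check_validity(last_step, method_name, **kwargs)
    # ... and finally call it with exactly the arguments it needs
    return getattr(last_step, method_name)(**use_kwargs)
```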

#### Further Supportive Classes/Methods and their code

* **Step** class
    * Intermediate layer of the graphpipeline, which wraps a forecaster/transformer/...
      and knows, e.g., what its predecessors are.
    * Can realise additional functionality such as printing/plotting intermediate results.
    * Has a get_result method, which
        * calls the get_result method on its predecessors (i.e. fetches the input of the wrapped transformer/...)
        * executes fit if required
        * executes the predict/transform/... call on the wrapped transformer/...
        * stores the result of predict/transform/... in a buffer
        * returns the result of predict/transform/...
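
A minimal sketch of such a Step wrapper, with hypothetical attribute and method names (the real signatures would be defined in the Step PR referenced above), might look like this:

```python
class Step:
    """Wraps a transformer/forecaster together with links to its predecessors.

    A minimal sketch; all names and signatures are illustrative only.
    """

    def __init__(self, skobject, name, predecessors):
        self.skobject = skobject          # the wrapped transformer/forecaster
        self.name = name                  # unique name within the pipeline
        self.predecessors = predecessors  # list of predecessor Step objects
        self.buffer = None                # cached result of the last execution

    def get_result(self, fit, data):
        # fetch the inputs by recursively asking all predecessors;
        # a step without predecessors would instead read its column from `data`
        inputs = [p.get_result(fit, data) for p in self.predecessors]
        # fit the wrapped object first if required
        if fit:
            self.skobject.fit(*inputs)
        # execute transform or predict, whichever the wrapped object provides
        method = "transform" if hasattr(self.skobject, "transform") else "predict"
        self.buffer = getattr(self.skobject, method)(*inputs)
        # store the result in the buffer and return it to the successor
        return self.buffer
```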


* **StepInformation** class
    * Not sure if this is required if we do not use the `__call__` dunder.


#### How is it determined whether a method is allowed to be called on the pipeline, i.e., what is the type of the pipeline?
If a method is called on the pipeline, the pipeline checks if the method is available. If not, it will raise
a NotImplementedError or something similar.
The allowed methods are specified by the type of the last element of the pipeline. I.e., if the last element is a transformer,
`transform` is allowed. If the last element is a forecaster, then `predict`, `predict_quantile`, ... are allowed.

Under the hood, this is determined by using ducktyping as in the following code snippet.

```python
if not hasattr(last_step, method_name):
    raise Exception(f"Method {method_name} does not exist for {type(last_step).__name__}")
method = getattr(last_step, method_name)
```
Hereby, `last_step` is the last element in the pipeline, since it determines the allowed methods.
Furthermore, `method_name` is the name of the method that should be called, e.g. `predict`.
The result of the `getattr` method is the requested method.


To identify the allowed arguments of the method, we use inspection.
More specifically, first, we determine which parameters the method needs via inspection.
Afterwards, we check if these parameter are provided to the method that is called.
The code for this might look as follows:

```python
import inspect

method_signature = inspect.signature(method).parameters

use_kwargs = {}
for name, param in method_signature.items():
    if name == "self":
        continue
    if (
        name not in kwargs
        and param.default is inspect.Parameter.empty
        and param.kind != inspect.Parameter.VAR_KEYWORD
    ):
        raise Exception(f"Necessary parameter {name} of method {method_name} is not provided")
    if name in kwargs:
        use_kwargs[name] = kwargs[name]
```
First, with inspect we get a dict of all parameters that the method has.
Second, we iterate through this dict and make the following checks:
* ignore the `self` argument
* raise an exception if a required argument is not provided.
  An argument is required if its default value is empty
  and if it is not a variable keyword argument (`**kwargs`).
* store the argument, with the parameter name as key, in the dict `use_kwargs` of arguments that should be used.

The dict `use_kwargs` is then passed to the method that should be called using the double star (`**use_kwargs`).
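
Putting the two snippets together, the `_check_validity` helper from the class stub above might look roughly as follows; a sketch only, written as a plain function for readability.

```python
import inspect

def _check_validity(step, method_name, **kwargs):
    # duck typing: does the step provide the requested method at all?
    if not hasattr(step, method_name):
        raise Exception(
            f"Method {method_name} does not exist for {type(step).__name__}"
        )
    method = getattr(step, method_name)

    # inspection: collect exactly the provided kwargs the method needs
    use_kwargs = {}
    for name, param in inspect.signature(method).parameters.items():
        if name == "self":
            continue
        if (
            name not in kwargs
            and param.default is inspect.Parameter.empty
            and param.kind != inspect.Parameter.VAR_KEYWORD
        ):
            raise Exception(
                f"Necessary parameter {name} of method {method_name} is not provided"
            )
        if name in kwargs:
            use_kwargs[name] = kwargs[name]
    return use_kwargs
```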

#### How is the graph represented within the pipeline
The pipeline has a Directed Acyclic Graph (DAG) structure. DAGs can be stored via backward links.
Thus, we have an intermediate layer (class `Step`); such a step is created with each call of `add_step`.
The step stores the transformer/estimator added by `add_step`, together with links to all its predecessors.
To resolve the graph structure, the pipeline itself only needs the information which steps have no successors.

#### Resolution Order
The correct execution order of the steps in the pipeline is determined based on an algorithm for checking if a graph is a DAG,
i.e.
* Call get_result/etc. on the last step, which recursively calls get_result on its predecessors.
* If no predecessors are available, the associated step should be connected to a column; in that case, this column from the provided data is returned.
* Using the return value of each recursive call of `get_result`, the step calls fit/transform/predict/... on the wrapped transformer/...
* Return the result of the transform/... call.
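
On the triangle example from the user journey above, this recursion would unfold roughly as follows (an illustrative trace, not actual output):

```python
# Illustrative trace of the recursive resolution on the triangle example
# (step1 and step2 read "column1" from the input data, step3 depends on both):
#
# pipeline.predict(...)                 # last step is step3
# └─ step3.get_result()
#    ├─ step1.get_result()
#    │  └─ returns data["column1"]      # no predecessors -> input column
#    ├─ step2.get_result()
#    │  └─ returns data["column1"]      # no predecessors -> input column
#    └─ step3 calls predict on the two results, caches and returns the output
```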

#### How to determine if inverse_transform needs to be executed?
**Decision Needed Here**
* [ ] Should we require the user to state explicitly that they want to execute inverse_transform,
and to specify at which position (by adding the step a second time to the pipeline)?

### Code Design: What will be reused
* skBASE
* pyWATTS resolution order

#### Interface Compatibility

The pipeline itself inherits from skbase and is a BaseEstimator.
Consequently, the pipeline should comply with most/all of the interfaces.
Note that, in contrast to sktime, this solution requires duck typing (which is also widely used in sklearn).

#### Testing
* Regression tests via simple examples. The examples should have some assertions.
These assertions would then allow the examples to be used as regression tests.
* For basic functionality, unit tests should directly be implemented.


* [ ] **Do we allow Mocking?**

## Comparison to current linear pipelines

##### TODOs
* [ ] 1:1 comparison of the same example. Perhaps the example from PyData Global.
* [ ] Architectural Differences.

### Current implementation and workflow

### Problems of the current solution
* Currently, a separate pipeline implementation exists for each estimator type.
* Furthermore, graph pipelines cannot be realised.

## Implementation plan
### Logical Units
* Pipeline
    * Stub/Structure
    * Fit/Transform/Predict/...
    * Type Inference
    * Check Validity
    * AddStep
* Examples
* Step
    * Resolution Order


### Testing Strategy, Decision Needed!
* TDD or partial TDD using the simple examples?
* tests of sub-units
* interface compatibility

### Code Dependencies
Items under the same number can be worked on in parallel:
1. First Implementation Steps
    * Pipeline Stub
2. Second Implementation Steps
    * Examples -> Use this for TDD
    * Step Stub
3. Third Implementation Steps
    * Pipeline AddStep
    * Step ResolutionOrder, ...
    * Pipeline TypeInference
    * Pipeline CheckValidity

### Required Decision Regarding Development Process
* Development branch into which we merge? (For merges into the dev branch, no PRs are required.)
* Create Draft PR for documenting the progress
* Create Project/Milestone?

## Examples that are used for partial TDD
* [ ] List of examples