FIX #59: move arguments of MlflowNodeHook to mlflow.yml
Galileo-Galilei authored and kaemo committed Sep 10, 2020
1 parent 404da28 commit e0afaf4
Showing 14 changed files with 80 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@
### Changed

- Remove `conda_env` and `model_name` arguments from `MlflowPipelineHook` and add them to `PipelineML` and `pipeline_ml`. This is necessary for the incoming hook auto-discovery in a future release and it enables having multiple `PipelineML` in the same project. [#58](https://github.com/Galileo-Galilei/kedro-mlflow/pull/58)
- The `flatten_dict_params`, `recursive` and `sep` arguments of the `MlflowNodeHook` are moved to the `mlflow.yml` config file to prepare plugin auto-registration. This also modifies the `run.py` template (to remove the arguments) and adds a `hooks` entry to the `mlflow.yml` keys. ([#59](https://github.com/Galileo-Galilei/kedro-mlflow/pull/59))

## [0.2.1] - 2018-08-06

2 changes: 1 addition & 1 deletion docs/source/02_hello_world_example/02_first_steps.md
@@ -79,7 +79,7 @@ Click now on the last run executed, you will land on this page:
### Parameters versioning
Note that the parameters have been recorded *automagically*. Here, two parameter formats are used:
1. The parameter ``example_test_data_ratio``, which is called in the ``pipeline.py`` file with the ``params:`` prefix
2. the dictionnary of all parameters in ``parameters.yml`` which is a reserved key word in ``Kedro``. Note that **this is bad practice** because you cannot know which parameters are really used inside the function called. Another problem is that it can generate too long parameters names and lead to mlflow errors.
2. the dictionary of all parameters in ``parameters.yml``, which is a reserved keyword in ``Kedro``. Note that **this is bad practice** because you cannot know which parameters are really used inside the called function. Another problem is that it can generate overly long parameter names and lead to mlflow errors.

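To make the distinction concrete, here is a short, hypothetical sketch of how the two formats appear in a ``pipeline.py`` (dataset and parameter names are borrowed from the Kedro iris starter for illustration only):

```python
from kedro.pipeline import Pipeline, node


def split_data(data, example_test_data_ratio):
    # placeholder body: a real node would split `data` using the ratio
    return data


# Hypothetical pipeline definition: the `params:` prefix injects the single value
# from parameters.yml, while the reserved input name "parameters" would inject
# the whole parameters dictionary at once (the discouraged pattern above).
pipeline = Pipeline(
    [
        node(
            func=split_data,
            inputs=["example_iris_data", "params:example_test_data_ratio"],
            outputs="example_train_test_split",
        )
    ]
)
```

With the first form only ``example_test_data_ratio`` is logged as a parameter; with the reserved ``parameters`` input the whole dictionary is logged at once.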
You can see that these are effectively the registered parameters in the pipeline with the ``kedro-viz`` plugin:

2 changes: 1 addition & 1 deletion docs/source/03_tutorial/05_version_datasets.md
@@ -35,7 +35,7 @@ and this dataset will be automatically versioned in each pipeline execution.
## Frequently asked questions
### Can I pass extra parameters to the ``MlflowDataSet`` for finer control?
The ``MlflowDataSet`` takes a ``data_set`` argument which is a python dictionnary passed to the ``__init__`` method of the dataset declared in ``type``. It means that you can pass any arguments accepted by the underlying dataset in this dictionary. If you want to pass ``load_args`` and ``save_args`` in the previous example, add them in the ``data_set`` argument:
The ``MlflowDataSet`` takes a ``data_set`` argument which is a python dictionary passed to the ``__init__`` method of the dataset declared in ``type``. It means that you can pass any arguments accepted by the underlying dataset in this dictionary. If you want to pass ``load_args`` and ``save_args`` in the previous example, add them in the ``data_set`` argument:
```yaml
my_dataset_to_version:
39 changes: 29 additions & 10 deletions kedro_mlflow/framework/context/config.py
@@ -17,13 +17,16 @@ class KedroMlflowConfig:

UI_OPTS = {"port": None, "host": None}

NODE_HOOK_OPTS = {"flatten_dict_params": False, "recursive": True, "sep": "."}

def __init__(
self,
project_path: Union[str, Path],
mlflow_tracking_uri: str = "mlruns",
experiment_opts: Union[Dict[str, Any], None] = None,
run_opts: Union[Dict[str, Any], None] = None,
ui_opts: Union[Dict[str, Any], None] = None,
node_hook_opts: Union[Dict[str, Any], None] = None,
):

# declare attributes in __init__.py to avoid pylint complaining
@@ -39,6 +42,7 @@ def __init__(
self.experiment_opts = None
self.run_opts = None
self.ui_opts = None
self.node_hook_opts = None
self.mlflow_client = None # the client to interact with the mlflow database
self.experiment = (
None # the mlflow experiment object to interact directly with it
@@ -52,6 +56,7 @@
experiment=experiment_opts,
run=run_opts,
ui=ui_opts,
hooks=dict(node=node_hook_opts),
)
self.from_dict(configuration)

@@ -65,20 +70,29 @@ def from_dict(self, configuration: Dict[str, str]):
configuration {Dict[str, str]} -- A dict with the following format :
{
mlflow_tracking_uri: a valid string for mlflow tracking storage,
experiments_opts:
experiments:
{
name {str}: the name of the experiment
create {bool} : should the experiment be created if it does not exists?
},
run_opts:
run:
{
nested {bool}: should we allow nested run within the context?
},
ui_opts:
ui:
{
port {int} : the port where the ui must be served
host {str} : the host for the ui
}
hooks:
{
node:
{
flatten_dict_params {bool}: When the parameter is a dict, should we create several parameters, one for each entry of the dict? This may be necessary because mlflow has a size limit for parameters. Defaults to False.
recursive {bool}: If we flatten dict parameters, should we apply the strategy recursively in case of nested dicts? Defaults to True.
sep {str}: The separator in case of nested dict flattening; {level1: {p1: 1, p2: 2}} will be logged as level1.p1, level1.p2. Defaults to "."
}
}
}
"""
@@ -87,13 +101,17 @@ def from_dict(self, configuration: Dict[str, str]):
experiment_opts = configuration.get("experiment")
run_opts = configuration.get("run")
ui_opts = configuration.get("ui")
node_hook_opts = configuration.get("hooks", {}).get("node")

self.mlflow_tracking_uri = self._validate_uri(uri=mlflow_tracking_uri)
self.experiment_opts = _validate_opts(
opts=experiment_opts, default=self.EXPERIMENT_OPTS
)
self.run_opts = _validate_opts(opts=run_opts, default=self.RUN_OPTS)
self.ui_opts = _validate_opts(opts=ui_opts, default=self.UI_OPTS)
self.node_hook_opts = _validate_opts(
opts=node_hook_opts, default=self.NODE_HOOK_OPTS
)

# instantiate mlflow objects to interact with the database
# the client must not be created before carefully checking the uri,
@@ -126,6 +144,7 @@ def to_dict(self):
"experiments": self.experiment_opts,
"run": self.run_opts,
"ui": self.ui_opts,
"hooks": {"node": self.node_hook_opts},
}
return info

@@ -191,22 +210,22 @@ def _validate_uri(self, uri: Union[str, None]) -> str:


def _validate_opts(opts: Dict[str, Any], default: Dict[str, Any]) -> Dict:
"""This functions creates a valid dictionnary containing options
"""This functions creates a valid dictionary containing options
for different mlflow setup.
Only the keys provided in the default dictionnary are allowed.
If provided, value of the opts dictionnary are kept, otherwise
Only the keys provided in the default dictionary are allowed.
If provided, value of the opts dictionary are kept, otherwise
default values are retrieved.
Arguments:
opts {Dict[str, Any]} -- A dictionnary containing the configuration.
default_opts {Dict[str, Any]} -- A default dictionnary that enforces valid keys and provides default values
opts {Dict[str, Any]} -- A dictionary containing the configuration.
default_opts {Dict[str, Any]} -- A default dictionary that enforces valid keys and provides default values
Raises:
KedroMlflowConfigError: If a key in the opt dictionnary does not exist in the default, it is not a valid key.
KedroMlflowConfigError: If a key in the opt dictionary does not exist in the default, it is not a valid key.
Returns:
Dict -- A dictionnary with all the keys of 'default' and the values of 'opts' if provided.
Dict -- A dictionary with all the keys of 'default' and the values of 'opts' if provided.
"""
# makes a deepcopy to avoid modifying class constants
default_copy = default.copy()
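As a side note for readers of this diff, the option merging that the docstring above describes can be summarised with a minimal, hypothetical sketch (not the plugin's actual implementation): provided values override the defaults, and unknown keys raise a configuration error.

```python
from copy import deepcopy
from typing import Any, Dict


class KedroMlflowConfigError(Exception):
    """Raised when mlflow.yml contains an invalid key (illustrative stand-in)."""


def validate_opts_sketch(opts: Dict[str, Any], default: Dict[str, Any]) -> Dict[str, Any]:
    merged = deepcopy(default)  # never mutate the class-level defaults
    for key, value in (opts or {}).items():
        if key not in default:
            raise KedroMlflowConfigError(f"'{key}' is not a valid option")
        merged[key] = value  # user-provided value overrides the default
    return merged


NODE_HOOK_OPTS = {"flatten_dict_params": False, "recursive": True, "sep": "."}
print(validate_opts_sketch({"sep": "-"}, NODE_HOOK_OPTS))
# {'flatten_dict_params': False, 'recursive': True, 'sep': '-'}
```

This is why the new `hooks: node:` block in `mlflow.yml` can list only the keys you want to change; the remaining keys fall back to `NODE_HOOK_OPTS`.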
11 changes: 7 additions & 4 deletions kedro_mlflow/framework/hooks/node_hook.py
@@ -5,14 +5,17 @@
from kedro.io import DataCatalog
from kedro.pipeline.node import Node

from kedro_mlflow.framework.context import get_mlflow_config


class MlflowNodeHook:
def __init__(
self, flatten_dict_params: bool = False, recursive: bool = True, sep: str = "."
):
self.flatten = flatten_dict_params
self.recursive = recursive
self.sep = sep
config = get_mlflow_config()
self.flatten = config.node_hook_opts["flatten_dict_params"]
self.recursive = config.node_hook_opts["recursive"]
self.sep = config.node_hook_opts["sep"]

@hook_impl
def before_node_run(
@@ -41,7 +44,7 @@ def before_node_run(
elif k == "parameters":
params_inputs[k] = v

# dictionnary parameters may be flattened for readibility
# dictionary parameters may be flattened for readability
if self.flatten:
params_inputs = flatten_dict(
d=params_inputs, recursive=self.recursive, sep=self.sep
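The logging behaviour configured above ultimately depends on `flatten_dict`. Below is a self-contained, hypothetical re-implementation, written only to illustrate what the `flatten_dict_params`, `recursive` and `sep` options control (the real helper lives in `kedro_mlflow.framework.hooks.node_hook`):

```python
from typing import Any, Dict


def flatten_dict(d: Dict[str, Any], recursive: bool = True, sep: str = ".") -> Dict[str, Any]:
    """Flatten nested dicts into 'parent<sep>child' keys (illustrative only)."""
    flat: Dict[str, Any] = {}
    for key, value in d.items():
        if isinstance(value, dict):
            # recurse into nested dicts, or flatten only one level when recursive=False
            nested = flatten_dict(value, recursive=recursive, sep=sep) if recursive else value
            for sub_key, sub_value in nested.items():
                flat[f"{key}{sep}{sub_key}"] = sub_value
        else:
            flat[key] = value
    return flat


params = {"param1": "1", "parameters": {"param1": 1, "param2": 2}}
print(flatten_dict(params, sep="-"))
# {'param1': '1', 'parameters-param1': 1, 'parameters-param2': 2}
```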
2 changes: 1 addition & 1 deletion kedro_mlflow/framework/hooks/pipeline_hook.py
@@ -220,7 +220,7 @@ def _format_conda_env(
Defaults to None.
Returns:
Dict[str, Any] -- A dictionnary which contains all informations to dump it to a conda environment.yml file.
Dict[str, Any] -- A dictionary which contains all the information needed to dump it to a conda environment.yml file.
"""
python_version = ".".join(
[
7 changes: 7 additions & 0 deletions kedro_mlflow/template/project/mlflow.yml
@@ -25,6 +25,13 @@ run:
name: null # if `name` is None, pipeline name will be used for the run name
nested: True  # if `nested` is False, you won't be able to launch sub-runs inside your nodes

hooks:
node:
flatten_dict_params: False  # if True, parameters which are dictionaries will be split into multiple parameters when logged in mlflow, one for each key.
recursive: True  # Should the dictionary flattening be applied recursively (i.e. for nested dictionaries)? Not used if `flatten_dict_params` is False.
sep: "."  # In case of recursive flattening, what separator should be used between the keys? E.g. {hyperparam1: {p1: 1, p2: 2}} will be logged as hyperparam1.p1 and hyperparam1.p2 in mlflow.


# UI-RELATED PARAMETERS -----------------

ui:
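A hedged sketch of how these yaml keys end up on the config object that `MlflowNodeHook` now reads at instantiation time (constructor usage mirrors `tests/framework/hooks/test_node_hook.py`; assumes the plugin is installed and `project_path` points to a real Kedro project):

```python
from kedro_mlflow.framework.context.config import KedroMlflowConfig

# node_hook_opts mirrors the `hooks: node:` block of mlflow.yml
config = KedroMlflowConfig(
    project_path=".",  # assumed to be the root of a Kedro project
    node_hook_opts={"flatten_dict_params": True, "recursive": True, "sep": "."},
)

# MlflowNodeHook() reads these values through get_mlflow_config() in its __init__
assert config.node_hook_opts["flatten_dict_params"] is True
assert config.node_hook_opts["sep"] == "."
```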
6 changes: 2 additions & 4 deletions kedro_mlflow/template/project/run.py
@@ -48,10 +48,8 @@ class ProjectContext(KedroContext):
project_version = "{{ cookiecutter.kedro_version }}"
package_name = "{{ cookiecutter.python_package }}"
hooks = (
MlflowNodeHook(flatten_dict_params=False),
MlflowPipelineHook(
model_name="{{ cookiecutter.python_package }}", conda_env="src/requirements.txt",
),
MlflowNodeHook(),
MlflowPipelineHook(),
)

def _get_pipelines(self) -> Dict[str, Pipeline]:
2 changes: 1 addition & 1 deletion requirements/requirements.txt
@@ -1,2 +1,2 @@
mlflow>=1.0.0, <2.0.0
kedro>=0.16.0, <0.17.0
kedro>=0.16.0, <=0.16.4 # 0.16.5 breaks pipeline_ml, template and hooks test
1 change: 1 addition & 0 deletions tests/framework/context/test_config.py
@@ -55,6 +55,7 @@ def test_kedro_mlflow_config_init(tmp_path):
experiments=KedroMlflowConfig.EXPERIMENT_OPTS,
run=KedroMlflowConfig.RUN_OPTS,
ui=KedroMlflowConfig.UI_OPTS,
hooks=dict(node=KedroMlflowConfig.NODE_HOOK_OPTS),
)


4 changes: 4 additions & 0 deletions tests/framework/context/test_mlflow_context.py
@@ -23,12 +23,16 @@ def _write_yaml(filepath, config):
experiment=dict(name="fake_package", create=True),
run=dict(id="123456789", name="my_run", nested=True),
ui=dict(port="5151", host="localhost"),
hooks=dict(node=dict(flatten_dict_params=True, recursive=False, sep="-")),
),
)
expected = {
"mlflow_tracking_uri": (tmp_path / "mlruns").as_uri(),
"experiments": {"name": "fake_package", "create": True},
"run": {"id": "123456789", "name": "my_run", "nested": True},
"ui": {"port": "5151", "host": "localhost"},
"hooks": {
"node": {"flatten_dict_params": True, "recursive": False, "sep": "-"}
},
}
assert get_mlflow_config(project_path=tmp_path, env="local").to_dict() == expected
30 changes: 23 additions & 7 deletions tests/framework/hooks/test_node_hook.py
@@ -1,8 +1,10 @@
import mlflow
import pytest
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import node
from mlflow.tracking import MlflowClient

from kedro_mlflow.framework.context.config import KedroMlflowConfig
from kedro_mlflow.framework.hooks import MlflowNodeHook
from kedro_mlflow.framework.hooks.node_hook import flatten_dict

@@ -36,8 +38,26 @@ def test_flatten_dict_nested_2_levels():
}


def test_node_hook(tmp_path):
mlflow_node_hook = MlflowNodeHook(flatten_dict_params=True, recursive=True, sep="-")
@pytest.mark.parametrize(
"flatten_dict_params,expected",
[
(True, {"param1": "1", "parameters-param1": "1", "parameters-param2": "2"}),
(False, {"param1": "1", "parameters": "{'param1': 1, 'param2': 2}"}),
],
)
def test_node_hook_logging(tmp_path, mocker, flatten_dict_params, expected):

mocker.patch("kedro_mlflow.utils._is_kedro_project", return_value=True)
config = KedroMlflowConfig(
project_path=tmp_path,
node_hook_opts={"flatten_dict_params": flatten_dict_params, "sep": "-"},
)
# the function is imported inside the other file and this is the file to patch
# see https://stackoverflow.com/questions/30987973/python-mock-patch-doesnt-work-as-expected-for-public-method
mocker.patch(
"kedro_mlflow.framework.hooks.node_hook.get_mlflow_config", return_value=config
)
mlflow_node_hook = MlflowNodeHook()

def fake_fun(arg1, arg2, arg3):
return None
@@ -71,8 +91,4 @@ def fake_fun(arg1, arg2, arg3):

mlflow_client = MlflowClient(mlflow_tracking_uri)
current_run = mlflow_client.get_run(run_id)
assert current_run.data.params == {
"param1": "1",
"parameters-param1": "1",
"parameters-param2": "2",
}
assert current_run.data.params == expected
1 change: 1 addition & 0 deletions tests/template/project/test_mlflow_yml.py
@@ -32,6 +32,7 @@ def test_mlflow_yml_rendering(template_mlflowyml):
ui=KedroMlflowConfig.UI_OPTS,
run=KedroMlflowConfig.RUN_OPTS,
experiment=KedroMlflowConfig.EXPERIMENT_OPTS,
hooks=dict(node=KedroMlflowConfig.NODE_HOOK_OPTS),
)
expected_config["experiment"]["name"] = "fake_project" # check for proper rendering
assert mlflow_config == expected_config
2 changes: 1 addition & 1 deletion tests/template/project/test_template_pyfiles.py
@@ -94,7 +94,7 @@ def test_runpy_template_is_consistent_with_kedro():
"",
)
kedro_mlflow_runpy = kedro_mlflow_runpy.replace(
' hooks = (\n MlflowNodeHook(flatten_dict_params=False),\n MlflowPipelineHook(\n model_name="fake_project", conda_env="src/requirements.txt",\n ),\n )\n',
" hooks = (\n MlflowNodeHook(),\n MlflowPipelineHook(),\n )\n",
"",
)

