Skip to content

Commit

Permalink
[KED-2004] Manage hook_manager lifecycle in session (#1153)
Browse files Browse the repository at this point in the history
  • Loading branch information
merelcht authored Feb 4, 2022
1 parent 2d7a41c commit 93d01c8
Show file tree
Hide file tree
Showing 30 changed files with 431 additions and 409 deletions.
3 changes: 3 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* The default `kedro` environment names can now be set in `settings.py` with the help of the `CONFIG_LOADER_ARGS` variable. The relevant keys to be supplied are `base_env` and `default_run_env`. These values are set to `base` and `local` respectively as a default.
* Added `kedro.config.abstract_config.AbstractConfigLoader` as an abstract base class for all `ConfigLoader` implementations. `ConfigLoader` and `TemplatedConfigLoader` now inherit directly from this base class.
* Streamlined the `ConfigLoader.get` and `TemplatedConfigLoader.get` API and delegated the actual `get` method functional implementation to the `kedro.config.common` module.
* The `hook_manager` is no longer a global singleton. The `hook_manager` lifecycle is now managed by the `KedroSession`, and a new `hook_manager` will be created every time a `session` is instantiated.
* Added the following new datasets:

| Type | Description | Location |
Expand Down Expand Up @@ -66,6 +67,8 @@
* Changed the behaviour of `kedro build-reqs` to compile requirements from `requirements.txt` instead of `requirements.in` and save them to `requirements.lock` instead of `requirements.txt`.
* Removed `ProjectHooks.register_catalog` `hook_spec` in favour of loading `DATA_CATALOG_CLASS` directly from `settings.py`. The default option for `DATA_CATALOG_CLASS` is now set to `kedro.io.DataCatalog`.
* Removed `RegistrationSpecs` and all registration hooks that belonged to it. Going forward users can register custom library components through `settings.py`.
* Added the `PluginManager` `hook_manager` argument to `KedroContext` and the `Runner.run()` method, which will be provided by the `KedroSession`.
* Removed the public function `get_hook_manager()` and replaced its functionality with `_create_hook_manager()`.

## Thanks for supporting contributions

Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
"integer -- return number of occurrences of value",
"integer -- return first index of value.",
"kedro.extras.datasets.pandas.json_dataset.JSONDataSet",
"pluggy._manager.PluginManager",
),
"py:data": (
"typing.Any",
Expand Down
9 changes: 8 additions & 1 deletion features/load_context.feature
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,20 @@ Feature: Custom Kedro project
And I execute the kedro command "run"
Then I should get a successful exit code

Scenario: Hooks from installed plugins are automatically registered
Scenario: Hooks from installed plugins are automatically registered and work with the default runner
Given I have installed the test plugin
When I execute the kedro command "run"
Then I should get a successful exit code
And I should get a message including "Registered hooks from 1 installed plugin(s): test-plugin-0.1"
And I should get a message including "Reached after_catalog_created hook"

Scenario: Hooks from installed plugins are automatically registered and work with the parallel runner
Given I have installed the test plugin
When I execute the kedro command "run --runner=ParallelRunner"
Then I should get a successful exit code
And I should get a message including "Registered hooks from 1 installed plugin(s): test-plugin-0.1"
And I should get a message including "Reached after_catalog_created hook"

Scenario: Disable automatically registered plugin hooks
Given I have installed the test plugin
And I have disabled hooks for "test-plugin" plugin via config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pytest
from kedro.config import ConfigLoader
from kedro.framework.context import KedroContext
from kedro.framework.hooks import _create_hook_manager


@pytest.fixture
Expand All @@ -26,6 +27,7 @@ def project_context(config_loader):
package_name="{{ cookiecutter.python_package }}",
project_path=Path.cwd(),
config_loader=config_loader,
hook_manager=_create_hook_manager(),
)


Expand Down
11 changes: 0 additions & 11 deletions kedro/extras/extensions/ipython.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,6 @@ def _remove_cached_modules(package_name):
del sys.modules[module] # pragma: no cover


def _clear_hook_manager():
from kedro.framework.hooks import get_hook_manager

hook_manager = get_hook_manager()
name_plugin_pairs = hook_manager.list_name_plugin()
for name, plugin in name_plugin_pairs:
hook_manager.unregister(name=name, plugin=plugin) # pragma: no cover


def _find_kedro_project(current_dir): # pragma: no cover
from kedro.framework.startup import _is_project

Expand All @@ -53,8 +44,6 @@ def reload_kedro(path, env: str = None, extra_params: Dict[str, Any] = None):
from kedro.framework.session.session import _activate_session
from kedro.framework.startup import bootstrap_project

_clear_hook_manager()

path = path or project_path
metadata = bootstrap_project(path)

Expand Down
9 changes: 6 additions & 3 deletions kedro/framework/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
from urllib.parse import urlparse
from warnings import warn

from pluggy import PluginManager

from kedro.config import ConfigLoader, MissingConfigException
from kedro.framework.hooks import get_hook_manager
from kedro.framework.project import settings
from kedro.io import DataCatalog
from kedro.pipeline.pipeline import _transcode_split
Expand Down Expand Up @@ -168,6 +169,7 @@ def __init__(
package_name: str,
project_path: Union[Path, str],
config_loader: ConfigLoader,
hook_manager: PluginManager,
env: str = None,
extra_params: Dict[str, Any] = None,
): # pylint: disable=too-many-arguments
Expand All @@ -183,6 +185,7 @@ def __init__(
package_name: Package name for the Kedro project the context is
created for.
project_path: Project path to define the context for.
hook_manager: The ``PluginManager`` to activate hooks, supplied by the session.
env: Optional argument for configuration default environment to be used
for running the pipeline. If not specified, it defaults to "local".
extra_params: Optional dictionary containing extra project parameters.
Expand All @@ -194,6 +197,7 @@ def __init__(
self._config_loader = config_loader
self._env = env
self._extra_params = deepcopy(extra_params)
self._hook_manager = hook_manager

@property # type: ignore
def env(self) -> Optional[str]:
Expand Down Expand Up @@ -279,8 +283,7 @@ def _get_catalog(
catalog.add_feed_dict(feed_dict)
if catalog.layers:
_validate_layers_for_transcoding(catalog)
hook_manager = get_hook_manager()
hook_manager.hook.after_catalog_created( # pylint: disable=no-member
self._hook_manager.hook.after_catalog_created(
catalog=catalog,
conf_catalog=conf_catalog,
conf_creds=conf_creds,
Expand Down
4 changes: 2 additions & 2 deletions kedro/framework/hooks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""``kedro.framework.hooks`` provides primitives to use hooks to extend KedroContext's behaviour"""
from .manager import get_hook_manager
from .manager import _create_hook_manager
from .markers import hook_impl

__all__ = ["get_hook_manager", "hook_impl"]
__all__ = ["_create_hook_manager", "hook_impl"]
11 changes: 0 additions & 11 deletions kedro/framework/hooks/manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""This module provides an utility function to retrieve the global hook_manager singleton
in Kedro's execution process.
"""
# pylint: disable=global-statement,invalid-name
import logging
from typing import Any, Iterable

Expand All @@ -10,8 +9,6 @@
from .markers import HOOK_NAMESPACE
from .specs import DataCatalogSpecs, DatasetSpecs, NodeSpecs, PipelineSpecs

_hook_manager = None

_PLUGIN_HOOKS = "kedro.hooks" # entry-point to load hooks from for installed plugins

logger = logging.getLogger(__name__)
Expand All @@ -27,14 +24,6 @@ def _create_hook_manager() -> PluginManager:
return manager


def get_hook_manager():
"""Create or return the global _hook_manager singleton instance."""
global _hook_manager
if _hook_manager is None:
_hook_manager = _create_hook_manager()
return _hook_manager


def _register_hooks(hook_manager: PluginManager, hooks: Iterable[Any]) -> None:
"""Register all hooks as specified in ``hooks`` with the global ``hook_manager``.
Expand Down
7 changes: 0 additions & 7 deletions kedro/framework/project/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
from dynaconf import LazySettings
from dynaconf.validator import ValidationError, Validator

from kedro.framework.hooks import get_hook_manager
from kedro.framework.hooks.manager import _register_hooks, _register_hooks_setuptools
from kedro.pipeline import Pipeline


Expand Down Expand Up @@ -181,11 +179,6 @@ def configure_project(package_name: str):
settings_module = f"{package_name}.settings"
settings.configure(settings_module)

# set up all hooks so we can discover all pipelines
hook_manager = get_hook_manager()
_register_hooks(hook_manager, settings.HOOKS)
_register_hooks_setuptools(hook_manager, settings.DISABLE_HOOKS_FOR_PLUGINS)

pipelines_module = f"{package_name}.pipeline_registry"
pipelines.configure(pipelines_module)

Expand Down
13 changes: 10 additions & 3 deletions kedro/framework/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
from kedro.config import ConfigLoader
from kedro.framework.context import KedroContext
from kedro.framework.context.context import _convert_paths_to_absolute_posix
from kedro.framework.hooks import get_hook_manager
from kedro.framework.hooks import _create_hook_manager
from kedro.framework.hooks.manager import _register_hooks, _register_hooks_setuptools
from kedro.framework.project import (
configure_logging,
configure_project,
Expand Down Expand Up @@ -107,6 +108,11 @@ def __init__(
self._package_name = package_name
self._store = self._init_store()

hook_manager = _create_hook_manager()
_register_hooks(hook_manager, settings.HOOKS)
_register_hooks_setuptools(hook_manager, settings.DISABLE_HOOKS_FOR_PLUGINS)
self._hook_manager = hook_manager

@classmethod
def create( # pylint: disable=too-many-arguments
cls,
Expand Down Expand Up @@ -244,6 +250,7 @@ def load_context(self) -> KedroContext:
config_loader=config_loader,
env=env,
extra_params=extra_params,
hook_manager=self._hook_manager,
)
return context

Expand Down Expand Up @@ -374,13 +381,13 @@ def run( # pylint: disable=too-many-arguments,too-many-locals

# Run the runner
runner = runner or SequentialRunner()
hook_manager = get_hook_manager()
hook_manager = self._hook_manager
hook_manager.hook.before_pipeline_run( # pylint: disable=no-member
run_params=record_data, pipeline=filtered_pipeline, catalog=catalog
)

try:
run_result = runner.run(filtered_pipeline, catalog, run_id)
run_result = runner.run(filtered_pipeline, catalog, hook_manager, run_id)
except Exception as error:
hook_manager.hook.on_pipeline_error(
error=error,
Expand Down
27 changes: 20 additions & 7 deletions kedro/runner/parallel_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
from pickle import PicklingError
from typing import Any, Dict, Iterable, Set

from pluggy import PluginManager

from kedro.framework.hooks.manager import (
_create_hook_manager,
_register_hooks,
_register_hooks_setuptools,
)
from kedro.framework.project import settings
from kedro.io import DataCatalog, DataSetError, MemoryDataSet
from kedro.pipeline import Pipeline
from kedro.pipeline.node import Node
Expand Down Expand Up @@ -89,11 +97,8 @@ def _run_node_synchronization( # pylint: disable=too-many-arguments
conf_logging: Dict[str, Any] = None,
) -> Node:
"""Run a single `Node` with inputs from and outputs to the `catalog`.
`KedroSession` instance is activated in every subprocess because of Windows
(and latest OSX with Python 3.8) limitation.
Windows has no "fork", so every subprocess is a brand new process
created via "spawn", hence the need to a) setup the logging, b) register
the hooks, and c) activate `KedroSession` in every subprocess.
A `PluginManager` `hook_manager` instance is created in every subprocess because
the `PluginManager` can't be serialised.
Args:
node: The ``Node`` to run.
Expand All @@ -112,7 +117,11 @@ def _run_node_synchronization( # pylint: disable=too-many-arguments
conf_logging = conf_logging or {}
_bootstrap_subprocess(package_name, conf_logging)

return run_node(node, catalog, is_async, run_id)
hook_manager = _create_hook_manager()
_register_hooks(hook_manager, settings.HOOKS)
_register_hooks_setuptools(hook_manager, settings.DISABLE_HOOKS_FOR_PLUGINS)

return run_node(node, catalog, hook_manager, is_async, run_id)


class ParallelRunner(AbstractRunner):
Expand Down Expand Up @@ -252,7 +261,11 @@ def _get_required_workers_count(self, pipeline: Pipeline):
return min(required_processes, self._max_workers)

def _run( # pylint: disable=too-many-locals,useless-suppression
self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None
self,
pipeline: Pipeline,
catalog: DataCatalog,
hook_manager: PluginManager,
run_id: str = None,
) -> None:
"""The abstract interface for running pipelines.
Expand Down
Loading

0 comments on commit 93d01c8

Please sign in to comment.