From ac3b646e161bc9c3a7ff94c7832c9b6592e2ce18 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Mon, 27 Jan 2020 15:47:30 +0100 Subject: [PATCH] Move handlers to its own module -- to unload registries --- kopf/engines/probing.py | 3 +- kopf/reactor/activities.py | 5 +- kopf/reactor/handlers.py | 68 +++++++++++ kopf/reactor/handling.py | 48 ++++---- kopf/reactor/lifecycles.py | 6 +- kopf/reactor/registries.py | 113 ++++-------------- kopf/reactor/states.py | 20 ++-- kopf/toolkits/legacy_registries.py | 27 +++-- tests/basic-structs/test_handlers.py | 2 +- .../test_handlers_deprecated_cooldown.py | 2 +- tests/handling/test_activity_triggering.py | 3 +- 11 files changed, 155 insertions(+), 142 deletions(-) create mode 100644 kopf/reactor/handlers.py diff --git a/kopf/engines/probing.py b/kopf/engines/probing.py index a84603c0..65ebfd2c 100644 --- a/kopf/engines/probing.py +++ b/kopf/engines/probing.py @@ -9,6 +9,7 @@ from kopf.reactor import activities from kopf.reactor import callbacks from kopf.reactor import causation +from kopf.reactor import handlers from kopf.reactor import lifecycles from kopf.reactor import registries @@ -33,7 +34,7 @@ async def health_reporter( is cancelled or failed). Once it will stop responding for any reason, Kubernetes will assume the pod is not alive anymore, and will restart it. """ - probing_container: MutableMapping[registries.HandlerId, callbacks.HandlerResult] = {} + probing_container: MutableMapping[handlers.HandlerId, callbacks.HandlerResult] = {} probing_timestamp: Optional[datetime.datetime] = None probing_max_age = datetime.timedelta(seconds=10.0) probing_lock = asyncio.Lock() diff --git a/kopf/reactor/activities.py b/kopf/reactor/activities.py index 5a32900b..24004f62 100644 --- a/kopf/reactor/activities.py +++ b/kopf/reactor/activities.py @@ -21,6 +21,7 @@ from kopf.reactor import callbacks from kopf.reactor import causation +from kopf.reactor import handlers from kopf.reactor import handling from kopf.reactor import lifecycles from kopf.reactor import registries @@ -37,7 +38,7 @@ def __init__( self, msg: str, *, - outcomes: Mapping[registries.HandlerId, states.HandlerOutcome], + outcomes: Mapping[handlers.HandlerId, states.HandlerOutcome], ) -> None: super().__init__(msg) self.outcomes = outcomes @@ -94,7 +95,7 @@ async def run_activity( lifecycle: lifecycles.LifeCycleFn, registry: registries.OperatorRegistry, activity: causation.Activity, -) -> Mapping[registries.HandlerId, callbacks.HandlerResult]: +) -> Mapping[handlers.HandlerId, callbacks.HandlerResult]: logger = logging.getLogger(f'kopf.activities.{activity.value}') # For the activity handlers, we have neither bodies, nor patches, just the state. diff --git a/kopf/reactor/handlers.py b/kopf/reactor/handlers.py new file mode 100644 index 00000000..12f50307 --- /dev/null +++ b/kopf/reactor/handlers.py @@ -0,0 +1,68 @@ +import dataclasses +import warnings +from typing import NewType, Callable, Optional, Any + +from kopf.reactor import callbacks +from kopf.reactor import causation +from kopf.reactor import errors as errors_ +from kopf.structs import bodies +from kopf.structs import dicts + +# Strings are taken from the users, but then tainted as this type for stricter type-checking: +# to prevent usage of some other strings (e.g. operator id) as the handlers ids. +HandlerId = NewType('HandlerId', str) + + +# A registered handler (function + meta info). +# FIXME: Must be frozen, but mypy fails in _call_handler() with a cryptic error: +# FIXME: Argument 1 to "invoke" has incompatible type "Optional[HandlerResult]"; +# FIXME: expected "Union[LifeCycleFn, ActivityHandlerFn, ResourceHandlerFn]" +@dataclasses.dataclass +class BaseHandler: + id: HandlerId + fn: Callable[..., Optional[callbacks.HandlerResult]] + errors: Optional[errors_.ErrorsMode] + timeout: Optional[float] + retries: Optional[int] + backoff: Optional[float] + cooldown: dataclasses.InitVar[Optional[float]] # deprecated, use `backoff` + + def __post_init__(self, cooldown: Optional[float]) -> None: + if self.backoff is not None and cooldown is not None: + raise TypeError("Either backoff or cooldown can be set, not both.") + elif cooldown is not None: + warnings.warn("cooldown=... is deprecated, use backoff=...", DeprecationWarning) + self.backoff = cooldown + + # @property cannot be used due to a data field definition with the same name. + def __getattribute__(self, name: str) -> Any: + if name == 'cooldown': + warnings.warn("handler.cooldown is deprecated, use handler.backoff", DeprecationWarning) + return self.backoff + else: + return super().__getattribute__(name) + + +@dataclasses.dataclass +class ActivityHandler(BaseHandler): + fn: callbacks.ActivityHandlerFn # type clarification + activity: Optional[causation.Activity] = None + _fallback: bool = False # non-public! + + +@dataclasses.dataclass +class ResourceHandler(BaseHandler): + fn: callbacks.ResourceHandlerFn # type clarification + reason: Optional[causation.Reason] + field: Optional[dicts.FieldPath] + initial: Optional[bool] = None + deleted: Optional[bool] = None # used for mixed-in (initial==True) @on.resume handlers only. + labels: Optional[bodies.Labels] = None + annotations: Optional[bodies.Annotations] = None + when: Optional[callbacks.WhenHandlerFn] = None + requires_finalizer: Optional[bool] = None + + @property + def event(self) -> Optional[causation.Reason]: + warnings.warn("`handler.event` is deprecated; use `handler.reason`.", DeprecationWarning) + return self.reason diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 4c06d602..d46c9702 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -18,6 +18,7 @@ from kopf.reactor import callbacks from kopf.reactor import causation from kopf.reactor import errors +from kopf.reactor import handlers as handlers_ from kopf.reactor import invocation from kopf.reactor import lifecycles from kopf.reactor import registries @@ -64,14 +65,14 @@ class HandlerChildrenRetry(TemporaryError): sublifecycle_var: ContextVar[lifecycles.LifeCycleFn] = ContextVar('sublifecycle_var') subregistry_var: ContextVar[registries.ResourceChangingRegistry] = ContextVar('subregistry_var') subexecuted_var: ContextVar[bool] = ContextVar('subexecuted_var') -handler_var: ContextVar[registries.BaseHandler] = ContextVar('handler_var') +handler_var: ContextVar[handlers_.BaseHandler] = ContextVar('handler_var') cause_var: ContextVar[causation.BaseCause] = ContextVar('cause_var') async def execute( *, fns: Optional[Iterable[invocation.Invokable]] = None, - handlers: Optional[Iterable[registries.ResourceHandler]] = None, + handlers: Optional[Iterable[handlers_.ResourceHandler]] = None, registry: Optional[registries.ResourceChangingRegistry] = None, lifecycle: Optional[lifecycles.LifeCycleFn] = None, cause: Optional[causation.BaseCause] = None, @@ -95,33 +96,34 @@ async def execute( # Restore the current context as set in the handler execution cycle. lifecycle = lifecycle if lifecycle is not None else sublifecycle_var.get() cause = cause if cause is not None else cause_var.get() - handler: registries.BaseHandler = handler_var.get() + parent_handler: handlers_.BaseHandler = handler_var.get() + parent_prefix = parent_handler.id if parent_handler is not None else None # Validate the inputs; the function signatures cannot put these kind of restrictions, so we do. if len([v for v in [fns, handlers, registry] if v is not None]) > 1: raise TypeError("Only one of the fns, handlers, registry can be passed. Got more.") elif fns is not None and isinstance(fns, collections.abc.Mapping): - registry = registries.ResourceChangingRegistry(prefix=handler.id if handler else None) + subregistry = registries.ResourceChangingRegistry(prefix=parent_prefix) for id, fn in fns.items(): - registry.register(fn=fn, id=id) + subregistry.register(fn=fn, id=id) elif fns is not None and isinstance(fns, collections.abc.Iterable): - registry = registries.ResourceChangingRegistry(prefix=handler.id if handler else None) + subregistry = registries.ResourceChangingRegistry(prefix=parent_prefix) for fn in fns: - registry.register(fn=fn) + subregistry.register(fn=fn) elif fns is not None: raise ValueError(f"fns must be a mapping or an iterable, got {fns.__class__}.") elif handlers is not None: - registry = registries.ResourceChangingRegistry(prefix=handler.id if handler else None) + subregistry = registries.ResourceChangingRegistry(prefix=parent_prefix) for handler in handlers: - registry.append(handler=handler) + subregistry.append(handler=handler) # Use the registry as is; assume that the caller knows what they do. elif registry is not None: - pass + subregistry = registry # Prevent double implicit execution. elif subexecuted_var.get(): @@ -130,7 +132,7 @@ async def execute( # If no explicit args were passed, implicitly use the accumulated handlers from `@kopf.on.this`. else: subexecuted_var.set(True) - registry = subregistry_var.get() + subregistry = subregistry_var.get() # The sub-handlers are only for upper-level causes, not for lower-level events. if not isinstance(cause, causation.ResourceChangingCause): @@ -138,11 +140,11 @@ async def execute( "no practical use (there are no retries or state tracking).") # Execute the real handlers (all or few or one of them, as per the lifecycle). - handlers = registry.get_handlers(cause=cause) - state = states.State.from_body(body=cause.body, handlers=handlers) + subhandlers = subregistry.get_handlers(cause=cause) + state = states.State.from_body(body=cause.body, handlers=subhandlers) outcomes = await execute_handlers_once( lifecycle=lifecycle, - handlers=handlers, + handlers=subhandlers, cause=cause, state=state, ) @@ -157,10 +159,10 @@ async def execute( async def run_handlers_until_done( cause: causation.BaseCause, - handlers: Collection[registries.BaseHandler], + handlers: Collection[handlers_.BaseHandler], lifecycle: lifecycles.LifeCycleFn, default_errors: errors.ErrorsMode = errors.ErrorsMode.TEMPORARY, -) -> Mapping[registries.HandlerId, states.HandlerOutcome]: +) -> Mapping[handlers_.HandlerId, states.HandlerOutcome]: """ Run the full cycle until all the handlers are done. @@ -173,7 +175,7 @@ async def run_handlers_until_done( # For the activity handlers, we have neither bodies, nor patches, just the state. state = states.State.from_scratch(handlers=handlers) - latest_outcomes: MutableMapping[registries.HandlerId, states.HandlerOutcome] = {} + latest_outcomes: MutableMapping[handlers_.HandlerId, states.HandlerOutcome] = {} while not state.done: outcomes = await execute_handlers_once( lifecycle=lifecycle, @@ -193,11 +195,11 @@ async def run_handlers_until_done( async def execute_handlers_once( lifecycle: lifecycles.LifeCycleFn, - handlers: Collection[registries.BaseHandler], + handlers: Collection[handlers_.BaseHandler], cause: causation.BaseCause, state: states.State, default_errors: errors.ErrorsMode = errors.ErrorsMode.TEMPORARY, -) -> Mapping[registries.HandlerId, states.HandlerOutcome]: +) -> Mapping[handlers_.HandlerId, states.HandlerOutcome]: """ Call the next handler(s) from the chain of the handlers. @@ -213,7 +215,7 @@ async def execute_handlers_once( handlers_plan = await invocation.invoke(lifecycle, handlers_todo, cause=cause, state=state) # Execute all planned (selected) handlers in one event reaction cycle, even if there are few. - outcomes: MutableMapping[registries.HandlerId, states.HandlerOutcome] = {} + outcomes: MutableMapping[handlers_.HandlerId, states.HandlerOutcome] = {} for handler in handlers_plan: outcome = await execute_handler_once( handler=handler, @@ -228,7 +230,7 @@ async def execute_handlers_once( async def execute_handler_once( - handler: registries.BaseHandler, + handler: handlers_.BaseHandler, cause: causation.BaseCause, state: states.HandlerState, lifecycle: lifecycles.LifeCycleFn, @@ -317,7 +319,7 @@ async def execute_handler_once( async def invoke_handler( - handler: registries.BaseHandler, + handler: handlers_.BaseHandler, *args: Any, cause: causation.BaseCause, lifecycle: lifecycles.LifeCycleFn, @@ -336,7 +338,7 @@ async def invoke_handler( # For the field-handlers, the old/new/diff values must match the field, not the whole object. if (True and # for readable indenting isinstance(cause, causation.ResourceChangingCause) and - isinstance(handler, registries.ResourceHandler) and + isinstance(handler, handlers_.ResourceHandler) and handler.field is not None): old = dicts.resolve(cause.old, handler.field, None, assume_empty=True) new = dicts.resolve(cause.new, handler.field, None, assume_empty=True) diff --git a/kopf/reactor/lifecycles.py b/kopf/reactor/lifecycles.py index 2c350799..b00f1783 100644 --- a/kopf/reactor/lifecycles.py +++ b/kopf/reactor/lifecycles.py @@ -14,12 +14,12 @@ from typing_extensions import Protocol -from kopf.reactor import registries +from kopf.reactor import handlers as handlers_ from kopf.reactor import states logger = logging.getLogger(__name__) -Handlers = Sequence[registries.BaseHandler] +Handlers = Sequence[handlers_.BaseHandler] class LifeCycleFn(Protocol): @@ -62,7 +62,7 @@ def shuffled(handlers: Handlers, **kwargs: Any) -> Handlers: def asap(handlers: Handlers, *, state: states.State, **kwargs: Any) -> Handlers: """ Execute one handler at a time, skip on failure, try the next one, retry after the full cycle. """ - def keyfn(handler: registries.BaseHandler) -> int: + def keyfn(handler: handlers_.BaseHandler) -> int: return state[handler.id].retries or 0 return sorted(handlers, key=keyfn)[:1] diff --git a/kopf/reactor/registries.py b/kopf/reactor/registries.py index 5f900b4d..7363deba 100644 --- a/kopf/reactor/registries.py +++ b/kopf/reactor/registries.py @@ -13,85 +13,24 @@ """ import abc import collections -import dataclasses import functools -import warnings from types import FunctionType, MethodType from typing import (Any, MutableMapping, Optional, Sequence, Collection, Iterable, Iterator, - List, Set, FrozenSet, Mapping, NewType, Callable, cast, Generic, TypeVar) + List, Set, FrozenSet, Mapping, Callable, cast, Generic, TypeVar) from kopf.reactor import callbacks from kopf.reactor import causation from kopf.reactor import errors as errors_ +from kopf.reactor import handlers from kopf.reactor import invocation from kopf.structs import bodies from kopf.structs import dicts from kopf.structs import resources as resources_ from kopf.utilities import piggybacking -# Strings are taken from the users, but then tainted as this type for stricter type-checking: -# to prevent usage of some other strings (e.g. operator id) as the handlers ids. -HandlerId = NewType('HandlerId', str) - - -# A registered handler (function + meta info). -# FIXME: Must be frozen, but mypy fails in _call_handler() with a cryptic error: -# FIXME: Argument 1 to "invoke" has incompatible type "Optional[HandlerResult]"; -# FIXME: expected "Union[LifeCycleFn, ActivityHandlerFn, ResourceHandlerFn]" -@dataclasses.dataclass -class BaseHandler: - id: HandlerId - fn: Callable[..., Optional[callbacks.HandlerResult]] - errors: Optional[errors_.ErrorsMode] - timeout: Optional[float] - retries: Optional[int] - backoff: Optional[float] - cooldown: dataclasses.InitVar[Optional[float]] # deprecated, use `backoff` - - def __post_init__(self, cooldown: Optional[float]) -> None: - if self.backoff is not None and cooldown is not None: - raise TypeError("Either backoff or cooldown can be set, not both.") - elif cooldown is not None: - warnings.warn("cooldown=... is deprecated, use backoff=...", DeprecationWarning) - self.backoff = cooldown - - # @property cannot be used due to a data field definition with the same name. - def __getattribute__(self, name: str) -> Any: - if name == 'cooldown': - warnings.warn("handler.cooldown is deprecated, use handler.backoff", DeprecationWarning) - return self.backoff - else: - return super().__getattribute__(name) - - -@dataclasses.dataclass -class ActivityHandler(BaseHandler): - fn: callbacks.ActivityHandlerFn # type clarification - activity: Optional[causation.Activity] = None - _fallback: bool = False # non-public! - - -@dataclasses.dataclass -class ResourceHandler(BaseHandler): - fn: callbacks.ResourceHandlerFn # type clarification - reason: Optional[causation.Reason] - field: Optional[dicts.FieldPath] - initial: Optional[bool] = None - deleted: Optional[bool] = None # used for mixed-in (initial==True) @on.resume handlers only. - labels: Optional[bodies.Labels] = None - annotations: Optional[bodies.Annotations] = None - when: Optional[callbacks.WhenHandlerFn] = None - requires_finalizer: Optional[bool] = None - - @property - def event(self) -> Optional[causation.Reason]: - warnings.warn("`handler.event` is deprecated; use `handler.reason`.", DeprecationWarning) - return self.reason - - # We only type-check for known classes of handlers/callbacks, and ignore any custom subclasses. HandlerFnT = TypeVar('HandlerFnT', callbacks.ActivityHandlerFn, callbacks.ResourceHandlerFn) -HandlerT = TypeVar('HandlerT', ActivityHandler, ResourceHandler) +HandlerT = TypeVar('HandlerT', handlers.ActivityHandler, handlers.ResourceHandler) CauseT = TypeVar('CauseT', bound=causation.BaseCause) @@ -111,7 +50,7 @@ def append(self, handler: HandlerT) -> None: self._handlers.append(handler) -class ActivityRegistry(GenericRegistry[ActivityHandler, callbacks.ActivityHandlerFn]): +class ActivityRegistry(GenericRegistry[handlers.ActivityHandler, callbacks.ActivityHandlerFn]): """ An actual registry of activity handlers. """ def register( @@ -128,7 +67,7 @@ def register( _fallback: bool = False, ) -> callbacks.ActivityHandlerFn: real_id = generate_id(fn=fn, id=id, prefix=self.prefix) - handler = ActivityHandler( + handler = handlers.ActivityHandler( id=real_id, fn=fn, activity=activity, errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown, _fallback=_fallback, @@ -139,13 +78,13 @@ def register( def get_handlers( self, activity: causation.Activity, - ) -> Sequence[ActivityHandler]: + ) -> Sequence[handlers.ActivityHandler]: return list(_deduplicated(self.iter_handlers(activity=activity))) def iter_handlers( self, activity: causation.Activity, - ) -> Iterator[ActivityHandler]: + ) -> Iterator[handlers.ActivityHandler]: found: bool = False # Regular handlers go first. @@ -161,7 +100,7 @@ def iter_handlers( yield handler -class ResourceRegistry(GenericRegistry[ResourceHandler, callbacks.ResourceHandlerFn], Generic[CauseT]): +class ResourceRegistry(GenericRegistry[handlers.ResourceHandler, callbacks.ResourceHandlerFn], Generic[CauseT]): """ An actual registry of resource handlers. """ def register( @@ -189,7 +128,7 @@ def register( real_field = dicts.parse_field(field) or None # to not store tuple() as a no-field case. real_id = generate_id(fn=fn, id=id, prefix=self.prefix, suffix=".".join(real_field or [])) - handler = ResourceHandler( + handler = handlers.ResourceHandler( id=real_id, fn=fn, reason=reason, field=real_field, errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown, initial=initial, deleted=deleted, requires_finalizer=requires_finalizer, @@ -202,14 +141,14 @@ def register( def get_handlers( self, cause: CauseT, - ) -> Sequence[ResourceHandler]: + ) -> Sequence[handlers.ResourceHandler]: return list(_deduplicated(self.iter_handlers(cause=cause))) @abc.abstractmethod def iter_handlers( self, cause: CauseT, - ) -> Iterator[ResourceHandler]: + ) -> Iterator[handlers.ResourceHandler]: raise NotImplementedError def get_extra_fields( @@ -241,7 +180,7 @@ class ResourceWatchingRegistry(ResourceRegistry[causation.ResourceWatchingCause] def iter_handlers( self, cause: causation.ResourceWatchingCause, - ) -> Iterator[ResourceHandler]: + ) -> Iterator[handlers.ResourceHandler]: for handler in self._handlers: if match(handler=handler, cause=cause, ignore_fields=True): yield handler @@ -252,7 +191,7 @@ class ResourceChangingRegistry(ResourceRegistry[causation.ResourceChangingCause] def iter_handlers( self, cause: causation.ResourceChangingCause, - ) -> Iterator[ResourceHandler]: + ) -> Iterator[handlers.ResourceHandler]: changed_fields = frozenset(field for _, field, _, _ in cause.diff or []) for handler in self._handlers: if handler.reason is None or handler.reason == cause.reason: @@ -381,32 +320,32 @@ def get_activity_handlers( self, *, activity: causation.Activity, - ) -> Sequence[ActivityHandler]: + ) -> Sequence[handlers.ActivityHandler]: return list(_deduplicated(self.iter_activity_handlers(activity=activity))) def get_resource_watching_handlers( self, cause: causation.ResourceWatchingCause, - ) -> Sequence[ResourceHandler]: + ) -> Sequence[handlers.ResourceHandler]: return list(_deduplicated(self.iter_resource_watching_handlers(cause=cause))) def get_resource_changing_handlers( self, cause: causation.ResourceChangingCause, - ) -> Sequence[ResourceHandler]: + ) -> Sequence[handlers.ResourceHandler]: return list(_deduplicated(self.iter_resource_changing_handlers(cause=cause))) def iter_activity_handlers( self, *, activity: causation.Activity, - ) -> Iterator[ActivityHandler]: + ) -> Iterator[handlers.ActivityHandler]: yield from self._activity_handlers.iter_handlers(activity=activity) def iter_resource_watching_handlers( self, cause: causation.ResourceWatchingCause, - ) -> Iterator[ResourceHandler]: + ) -> Iterator[handlers.ResourceHandler]: """ Iterate all handlers for the low-level events. """ @@ -416,7 +355,7 @@ def iter_resource_watching_handlers( def iter_resource_changing_handlers( self, cause: causation.ResourceChangingCause, - ) -> Iterator[ResourceHandler]: + ) -> Iterator[handlers.ResourceHandler]: """ Iterate all handlers that match this cause/event, in the order they were registered (even if mixed). """ @@ -485,12 +424,12 @@ def generate_id( id: Optional[str], prefix: Optional[str] = None, suffix: Optional[str] = None, -) -> HandlerId: +) -> handlers.HandlerId: real_id: str real_id = id if id is not None else get_callable_id(fn) real_id = real_id if not suffix else f'{real_id}/{suffix}' real_id = real_id if not prefix else f'{prefix}/{real_id}' - return cast(HandlerId, real_id) + return cast(handlers.HandlerId, real_id) def get_callable_id(c: Callable[..., Any]) -> str: @@ -546,7 +485,7 @@ def fn(**kwargs): pass def match( - handler: ResourceHandler, + handler: handlers.ResourceHandler, cause: causation.ResourceCause, changed_fields: Collection[dicts.FieldPath] = frozenset(), ignore_fields: bool = False, @@ -560,7 +499,7 @@ def match( def _matches_field( - handler: ResourceHandler, + handler: handlers.ResourceHandler, changed_fields: Collection[dicts.FieldPath] = frozenset(), ignore_fields: bool = False, ) -> bool: @@ -570,7 +509,7 @@ def _matches_field( def _matches_labels( - handler: ResourceHandler, + handler: handlers.ResourceHandler, body: bodies.Body, ) -> bool: return (not handler.labels or @@ -579,7 +518,7 @@ def _matches_labels( def _matches_annotations( - handler: ResourceHandler, + handler: handlers.ResourceHandler, body: bodies.Body, ) -> bool: return (not handler.annotations or @@ -603,7 +542,7 @@ def _matches_metadata( def _matches_filter_callback( - handler: ResourceHandler, + handler: handlers.ResourceHandler, cause: causation.ResourceCause, ) -> bool: if not handler.when: diff --git a/kopf/reactor/states.py b/kopf/reactor/states.py index d755d6b8..5e22aa1d 100644 --- a/kopf/reactor/states.py +++ b/kopf/reactor/states.py @@ -56,7 +56,7 @@ from typing import Any, Optional, Mapping, Dict, Collection, Iterator, cast, overload from kopf.reactor import callbacks -from kopf.reactor import registries +from kopf.reactor import handlers from kopf.structs import bodies from kopf.structs import patches @@ -167,7 +167,7 @@ def runtime(self) -> datetime.timedelta: return now - (self.started if self.started else now) -class State(Mapping[registries.HandlerId, HandlerState]): +class State(Mapping[handlers.HandlerId, HandlerState]): """ A state of selected handlers, as persisted in the object's status. @@ -178,11 +178,11 @@ class State(Mapping[registries.HandlerId, HandlerState]): reflect the changes in the object's status. A new state is created every time some changes/outcomes are merged into the current state. """ - _states: Mapping[registries.HandlerId, HandlerState] + _states: Mapping[handlers.HandlerId, HandlerState] def __init__( self, - __src: Mapping[registries.HandlerId, HandlerState], + __src: Mapping[handlers.HandlerId, HandlerState], ): super().__init__() self._states = dict(__src) @@ -191,7 +191,7 @@ def __init__( def from_scratch( cls, *, - handlers: Collection[registries.BaseHandler], + handlers: Collection[handlers.BaseHandler], ) -> "State": return cls.from_body(cast(bodies.Body, {}), handlers=handlers) @@ -200,7 +200,7 @@ def from_body( cls, body: bodies.Body, *, - handlers: Collection[registries.BaseHandler], + handlers: Collection[handlers.BaseHandler], ) -> "State": storage = body.get('status', {}).get('kopf', {}) progress = storage.get('progress', {}) @@ -214,7 +214,7 @@ def from_body( def with_outcomes( self, - outcomes: Mapping[registries.HandlerId, HandlerOutcome], + outcomes: Mapping[handlers.HandlerId, HandlerOutcome], ) -> "State": unknown_ids = [handler_id for handler_id in outcomes if handler_id not in self] if unknown_ids: @@ -251,10 +251,10 @@ def purge(self, patch: patches.Patch, body: bodies.Body) -> None: def __len__(self) -> int: return len(self._states) - def __iter__(self) -> Iterator[registries.HandlerId]: + def __iter__(self) -> Iterator[handlers.HandlerId]: return iter(self._states) - def __getitem__(self, item: registries.HandlerId) -> HandlerState: + def __getitem__(self, item: handlers.HandlerId) -> HandlerState: return self._states[item] @property @@ -277,7 +277,7 @@ def delay(self) -> Optional[float]: def deliver_results( *, - outcomes: Mapping[registries.HandlerId, HandlerOutcome], + outcomes: Mapping[handlers.HandlerId, HandlerOutcome], patch: patches.Patch, ) -> None: """ diff --git a/kopf/toolkits/legacy_registries.py b/kopf/toolkits/legacy_registries.py index e780a9bb..9d520b3f 100644 --- a/kopf/toolkits/legacy_registries.py +++ b/kopf/toolkits/legacy_registries.py @@ -10,6 +10,7 @@ from typing import Any, Union, Sequence, Iterator from kopf.reactor import causation +from kopf.reactor import handlers from kopf.reactor import registries from kopf.structs import bodies from kopf.structs import patches @@ -30,14 +31,14 @@ def get_event_handlers( self, resource: resources_.Resource, event: bodies.Event, - ) -> Sequence[registries.ResourceHandler]: + ) -> Sequence[handlers.ResourceHandler]: raise NotImplementedError @abc.abstractmethod def get_cause_handlers( self, cause: causation.ResourceChangingCause, - ) -> Sequence[registries.ResourceHandler]: + ) -> Sequence[handlers.ResourceHandler]: raise NotImplementedError @abc.abstractmethod @@ -45,14 +46,14 @@ def iter_event_handlers( self, resource: resources_.Resource, event: bodies.Event, - ) -> Iterator[registries.ResourceHandler]: + ) -> Iterator[handlers.ResourceHandler]: raise NotImplementedError @abc.abstractmethod def iter_cause_handlers( self, cause: causation.ResourceChangingCause, - ) -> Iterator[registries.ResourceHandler]: + ) -> Iterator[handlers.ResourceHandler]: raise NotImplementedError @@ -68,14 +69,14 @@ class SimpleRegistry(BaseRegistry, registries.ResourceRegistry[AnyCause]): def iter_handlers( self, cause: AnyCause, - ) -> Iterator[registries.ResourceHandler]: + ) -> Iterator[handlers.ResourceHandler]: yield from self._handlers def get_event_handlers( self, resource: resources_.Resource, event: bodies.Event, - ) -> Sequence[registries.ResourceHandler]: + ) -> Sequence[handlers.ResourceHandler]: warnings.warn("SimpleRegistry.get_event_handlers() is deprecated; use " "ResourceWatchingRegistry.get_handlers().", DeprecationWarning) return list(registries._deduplicated(self.iter_event_handlers( @@ -84,7 +85,7 @@ def get_event_handlers( def get_cause_handlers( self, cause: causation.ResourceChangingCause, - ) -> Sequence[registries.ResourceHandler]: + ) -> Sequence[handlers.ResourceHandler]: warnings.warn("SimpleRegistry.get_cause_handlers() is deprecated; use " "ResourceChangingRegistry.get_handlers().", DeprecationWarning) return list(registries._deduplicated(self.iter_cause_handlers(cause=cause))) @@ -93,7 +94,7 @@ def iter_event_handlers( self, resource: resources_.Resource, event: bodies.Event, - ) -> Iterator[registries.ResourceHandler]: + ) -> Iterator[handlers.ResourceHandler]: warnings.warn("SimpleRegistry.iter_event_handlers() is deprecated; use " "ResourceWatchingRegistry.iter_handlers().", DeprecationWarning) @@ -105,7 +106,7 @@ def iter_event_handlers( def iter_cause_handlers( self, cause: causation.ResourceChangingCause, - ) -> Iterator[registries.ResourceHandler]: + ) -> Iterator[handlers.ResourceHandler]: warnings.warn("SimpleRegistry.iter_cause_handlers() is deprecated; use " "ResourceChangingRegistry.iter_handlers().", DeprecationWarning) @@ -150,7 +151,7 @@ def get_event_handlers( self, resource: resources_.Resource, event: bodies.Event, - ) -> Sequence[registries.ResourceHandler]: + ) -> Sequence[handlers.ResourceHandler]: warnings.warn("GlobalRegistry.get_event_handlers() is deprecated; use " "OperatorRegistry.get_resource_watching_handlers().", DeprecationWarning) cause = _create_watching_cause(resource=resource, event=event) @@ -159,7 +160,7 @@ def get_event_handlers( def get_cause_handlers( self, cause: causation.ResourceChangingCause, - ) -> Sequence[registries.ResourceHandler]: + ) -> Sequence[handlers.ResourceHandler]: warnings.warn("GlobalRegistry.get_cause_handlers() is deprecated; use " "OperatorRegistry.get_resource_changing_handlers().", DeprecationWarning) return self.get_resource_changing_handlers(cause=cause) @@ -168,7 +169,7 @@ def iter_event_handlers( self, resource: resources_.Resource, event: bodies.Event, - ) -> Iterator[registries.ResourceHandler]: + ) -> Iterator[handlers.ResourceHandler]: warnings.warn("GlobalRegistry.iter_event_handlers() is deprecated; use " "OperatorRegistry.iter_resource_watching_handlers().", DeprecationWarning) cause = _create_watching_cause(resource=resource, event=event) @@ -177,7 +178,7 @@ def iter_event_handlers( def iter_cause_handlers( self, cause: causation.ResourceChangingCause, - ) -> Iterator[registries.ResourceHandler]: + ) -> Iterator[handlers.ResourceHandler]: warnings.warn("GlobalRegistry.iter_cause_handlers() is deprecated; use " "OperatorRegistry.iter_resource_changing_handlers().", DeprecationWarning) yield from self.iter_resource_changing_handlers(cause=cause) diff --git a/tests/basic-structs/test_handlers.py b/tests/basic-structs/test_handlers.py index 941bdc7c..2d38772b 100644 --- a/tests/basic-structs/test_handlers.py +++ b/tests/basic-structs/test_handlers.py @@ -1,6 +1,6 @@ import pytest -from kopf.reactor.registries import ActivityHandler, ResourceHandler +from kopf.reactor.handlers import ActivityHandler, ResourceHandler @pytest.mark.parametrize('cls', [ActivityHandler, ResourceHandler]) diff --git a/tests/basic-structs/test_handlers_deprecated_cooldown.py b/tests/basic-structs/test_handlers_deprecated_cooldown.py index 2224465d..3a54a67f 100644 --- a/tests/basic-structs/test_handlers_deprecated_cooldown.py +++ b/tests/basic-structs/test_handlers_deprecated_cooldown.py @@ -1,5 +1,5 @@ # Original test-file: tests/basic-structs/test_handlers.py -from kopf.reactor.registries import ActivityHandler, ResourceHandler +from kopf.reactor.handlers import ActivityHandler, ResourceHandler def test_activity_handler_with_deprecated_cooldown_instead_of_backoff(mocker): diff --git a/tests/handling/test_activity_triggering.py b/tests/handling/test_activity_triggering.py index 4cfb3f00..98d47342 100644 --- a/tests/handling/test_activity_triggering.py +++ b/tests/handling/test_activity_triggering.py @@ -5,9 +5,10 @@ from kopf.reactor.activities import ActivityError, run_activity from kopf.reactor.causation import Activity +from kopf.reactor.handlers import HandlerId from kopf.reactor.handling import PermanentError, TemporaryError from kopf.reactor.lifecycles import all_at_once -from kopf.reactor.registries import HandlerId, OperatorRegistry +from kopf.reactor.registries import OperatorRegistry from kopf.reactor.states import HandlerOutcome