Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Move handlers to its own module -- to unload registries
Browse files Browse the repository at this point in the history
  • Loading branch information
nolar committed Jan 27, 2020
1 parent 3fe6cf1 commit ac3b646
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 142 deletions.
3 changes: 2 additions & 1 deletion kopf/engines/probing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from kopf.reactor import activities
from kopf.reactor import callbacks
from kopf.reactor import causation
from kopf.reactor import handlers
from kopf.reactor import lifecycles
from kopf.reactor import registries

Expand All @@ -33,7 +34,7 @@ async def health_reporter(
is cancelled or failed). Once it will stop responding for any reason,
Kubernetes will assume the pod is not alive anymore, and will restart it.
"""
probing_container: MutableMapping[registries.HandlerId, callbacks.HandlerResult] = {}
probing_container: MutableMapping[handlers.HandlerId, callbacks.HandlerResult] = {}
probing_timestamp: Optional[datetime.datetime] = None
probing_max_age = datetime.timedelta(seconds=10.0)
probing_lock = asyncio.Lock()
Expand Down
5 changes: 3 additions & 2 deletions kopf/reactor/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from kopf.reactor import callbacks
from kopf.reactor import causation
from kopf.reactor import handlers
from kopf.reactor import handling
from kopf.reactor import lifecycles
from kopf.reactor import registries
Expand All @@ -37,7 +38,7 @@ def __init__(
self,
msg: str,
*,
outcomes: Mapping[registries.HandlerId, states.HandlerOutcome],
outcomes: Mapping[handlers.HandlerId, states.HandlerOutcome],
) -> None:
super().__init__(msg)
self.outcomes = outcomes
Expand Down Expand Up @@ -94,7 +95,7 @@ async def run_activity(
lifecycle: lifecycles.LifeCycleFn,
registry: registries.OperatorRegistry,
activity: causation.Activity,
) -> Mapping[registries.HandlerId, callbacks.HandlerResult]:
) -> Mapping[handlers.HandlerId, callbacks.HandlerResult]:
logger = logging.getLogger(f'kopf.activities.{activity.value}')

# For the activity handlers, we have neither bodies, nor patches, just the state.
Expand Down
68 changes: 68 additions & 0 deletions kopf/reactor/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import dataclasses
import warnings
from typing import NewType, Callable, Optional, Any

from kopf.reactor import callbacks
from kopf.reactor import causation
from kopf.reactor import errors as errors_
from kopf.structs import bodies
from kopf.structs import dicts

# Strings are taken from the users, but then tainted as this type for stricter type-checking:
# to prevent usage of some other strings (e.g. operator id) as the handlers ids.
HandlerId = NewType('HandlerId', str)


# A registered handler (function + meta info).
# FIXME: Must be frozen, but mypy fails in _call_handler() with a cryptic error:
# FIXME: Argument 1 to "invoke" has incompatible type "Optional[HandlerResult]";
# FIXME: expected "Union[LifeCycleFn, ActivityHandlerFn, ResourceHandlerFn]"
@dataclasses.dataclass
class BaseHandler:
id: HandlerId
fn: Callable[..., Optional[callbacks.HandlerResult]]
errors: Optional[errors_.ErrorsMode]
timeout: Optional[float]
retries: Optional[int]
backoff: Optional[float]
cooldown: dataclasses.InitVar[Optional[float]] # deprecated, use `backoff`

def __post_init__(self, cooldown: Optional[float]) -> None:
if self.backoff is not None and cooldown is not None:
raise TypeError("Either backoff or cooldown can be set, not both.")
elif cooldown is not None:
warnings.warn("cooldown=... is deprecated, use backoff=...", DeprecationWarning)
self.backoff = cooldown

# @property cannot be used due to a data field definition with the same name.
def __getattribute__(self, name: str) -> Any:
if name == 'cooldown':
warnings.warn("handler.cooldown is deprecated, use handler.backoff", DeprecationWarning)
return self.backoff
else:
return super().__getattribute__(name)


@dataclasses.dataclass
class ActivityHandler(BaseHandler):
fn: callbacks.ActivityHandlerFn # type clarification
activity: Optional[causation.Activity] = None
_fallback: bool = False # non-public!


@dataclasses.dataclass
class ResourceHandler(BaseHandler):
fn: callbacks.ResourceHandlerFn # type clarification
reason: Optional[causation.Reason]
field: Optional[dicts.FieldPath]
initial: Optional[bool] = None
deleted: Optional[bool] = None # used for mixed-in (initial==True) @on.resume handlers only.
labels: Optional[bodies.Labels] = None
annotations: Optional[bodies.Annotations] = None
when: Optional[callbacks.WhenHandlerFn] = None
requires_finalizer: Optional[bool] = None

@property
def event(self) -> Optional[causation.Reason]:
warnings.warn("`handler.event` is deprecated; use `handler.reason`.", DeprecationWarning)
return self.reason
48 changes: 25 additions & 23 deletions kopf/reactor/handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from kopf.reactor import callbacks
from kopf.reactor import causation
from kopf.reactor import errors
from kopf.reactor import handlers as handlers_
from kopf.reactor import invocation
from kopf.reactor import lifecycles
from kopf.reactor import registries
Expand Down Expand Up @@ -64,14 +65,14 @@ class HandlerChildrenRetry(TemporaryError):
sublifecycle_var: ContextVar[lifecycles.LifeCycleFn] = ContextVar('sublifecycle_var')
subregistry_var: ContextVar[registries.ResourceChangingRegistry] = ContextVar('subregistry_var')
subexecuted_var: ContextVar[bool] = ContextVar('subexecuted_var')
handler_var: ContextVar[registries.BaseHandler] = ContextVar('handler_var')
handler_var: ContextVar[handlers_.BaseHandler] = ContextVar('handler_var')
cause_var: ContextVar[causation.BaseCause] = ContextVar('cause_var')


async def execute(
*,
fns: Optional[Iterable[invocation.Invokable]] = None,
handlers: Optional[Iterable[registries.ResourceHandler]] = None,
handlers: Optional[Iterable[handlers_.ResourceHandler]] = None,
registry: Optional[registries.ResourceChangingRegistry] = None,
lifecycle: Optional[lifecycles.LifeCycleFn] = None,
cause: Optional[causation.BaseCause] = None,
Expand All @@ -95,33 +96,34 @@ async def execute(
# Restore the current context as set in the handler execution cycle.
lifecycle = lifecycle if lifecycle is not None else sublifecycle_var.get()
cause = cause if cause is not None else cause_var.get()
handler: registries.BaseHandler = handler_var.get()
parent_handler: handlers_.BaseHandler = handler_var.get()
parent_prefix = parent_handler.id if parent_handler is not None else None

# Validate the inputs; the function signatures cannot put these kind of restrictions, so we do.
if len([v for v in [fns, handlers, registry] if v is not None]) > 1:
raise TypeError("Only one of the fns, handlers, registry can be passed. Got more.")

elif fns is not None and isinstance(fns, collections.abc.Mapping):
registry = registries.ResourceChangingRegistry(prefix=handler.id if handler else None)
subregistry = registries.ResourceChangingRegistry(prefix=parent_prefix)
for id, fn in fns.items():
registry.register(fn=fn, id=id)
subregistry.register(fn=fn, id=id)

elif fns is not None and isinstance(fns, collections.abc.Iterable):
registry = registries.ResourceChangingRegistry(prefix=handler.id if handler else None)
subregistry = registries.ResourceChangingRegistry(prefix=parent_prefix)
for fn in fns:
registry.register(fn=fn)
subregistry.register(fn=fn)

elif fns is not None:
raise ValueError(f"fns must be a mapping or an iterable, got {fns.__class__}.")

elif handlers is not None:
registry = registries.ResourceChangingRegistry(prefix=handler.id if handler else None)
subregistry = registries.ResourceChangingRegistry(prefix=parent_prefix)
for handler in handlers:
registry.append(handler=handler)
subregistry.append(handler=handler)

# Use the registry as is; assume that the caller knows what they do.
elif registry is not None:
pass
subregistry = registry

# Prevent double implicit execution.
elif subexecuted_var.get():
Expand All @@ -130,19 +132,19 @@ async def execute(
# If no explicit args were passed, implicitly use the accumulated handlers from `@kopf.on.this`.
else:
subexecuted_var.set(True)
registry = subregistry_var.get()
subregistry = subregistry_var.get()

# The sub-handlers are only for upper-level causes, not for lower-level events.
if not isinstance(cause, causation.ResourceChangingCause):
raise RuntimeError("Sub-handlers of event-handlers are not supported and have "
"no practical use (there are no retries or state tracking).")

# Execute the real handlers (all or few or one of them, as per the lifecycle).
handlers = registry.get_handlers(cause=cause)
state = states.State.from_body(body=cause.body, handlers=handlers)
subhandlers = subregistry.get_handlers(cause=cause)
state = states.State.from_body(body=cause.body, handlers=subhandlers)
outcomes = await execute_handlers_once(
lifecycle=lifecycle,
handlers=handlers,
handlers=subhandlers,
cause=cause,
state=state,
)
Expand All @@ -157,10 +159,10 @@ async def execute(

async def run_handlers_until_done(
cause: causation.BaseCause,
handlers: Collection[registries.BaseHandler],
handlers: Collection[handlers_.BaseHandler],
lifecycle: lifecycles.LifeCycleFn,
default_errors: errors.ErrorsMode = errors.ErrorsMode.TEMPORARY,
) -> Mapping[registries.HandlerId, states.HandlerOutcome]:
) -> Mapping[handlers_.HandlerId, states.HandlerOutcome]:
"""
Run the full cycle until all the handlers are done.
Expand All @@ -173,7 +175,7 @@ async def run_handlers_until_done(

# For the activity handlers, we have neither bodies, nor patches, just the state.
state = states.State.from_scratch(handlers=handlers)
latest_outcomes: MutableMapping[registries.HandlerId, states.HandlerOutcome] = {}
latest_outcomes: MutableMapping[handlers_.HandlerId, states.HandlerOutcome] = {}
while not state.done:
outcomes = await execute_handlers_once(
lifecycle=lifecycle,
Expand All @@ -193,11 +195,11 @@ async def run_handlers_until_done(

async def execute_handlers_once(
lifecycle: lifecycles.LifeCycleFn,
handlers: Collection[registries.BaseHandler],
handlers: Collection[handlers_.BaseHandler],
cause: causation.BaseCause,
state: states.State,
default_errors: errors.ErrorsMode = errors.ErrorsMode.TEMPORARY,
) -> Mapping[registries.HandlerId, states.HandlerOutcome]:
) -> Mapping[handlers_.HandlerId, states.HandlerOutcome]:
"""
Call the next handler(s) from the chain of the handlers.
Expand All @@ -213,7 +215,7 @@ async def execute_handlers_once(
handlers_plan = await invocation.invoke(lifecycle, handlers_todo, cause=cause, state=state)

# Execute all planned (selected) handlers in one event reaction cycle, even if there are few.
outcomes: MutableMapping[registries.HandlerId, states.HandlerOutcome] = {}
outcomes: MutableMapping[handlers_.HandlerId, states.HandlerOutcome] = {}
for handler in handlers_plan:
outcome = await execute_handler_once(
handler=handler,
Expand All @@ -228,7 +230,7 @@ async def execute_handlers_once(


async def execute_handler_once(
handler: registries.BaseHandler,
handler: handlers_.BaseHandler,
cause: causation.BaseCause,
state: states.HandlerState,
lifecycle: lifecycles.LifeCycleFn,
Expand Down Expand Up @@ -317,7 +319,7 @@ async def execute_handler_once(


async def invoke_handler(
handler: registries.BaseHandler,
handler: handlers_.BaseHandler,
*args: Any,
cause: causation.BaseCause,
lifecycle: lifecycles.LifeCycleFn,
Expand All @@ -336,7 +338,7 @@ async def invoke_handler(
# For the field-handlers, the old/new/diff values must match the field, not the whole object.
if (True and # for readable indenting
isinstance(cause, causation.ResourceChangingCause) and
isinstance(handler, registries.ResourceHandler) and
isinstance(handler, handlers_.ResourceHandler) and
handler.field is not None):
old = dicts.resolve(cause.old, handler.field, None, assume_empty=True)
new = dicts.resolve(cause.new, handler.field, None, assume_empty=True)
Expand Down
6 changes: 3 additions & 3 deletions kopf/reactor/lifecycles.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

from typing_extensions import Protocol

from kopf.reactor import registries
from kopf.reactor import handlers as handlers_
from kopf.reactor import states

logger = logging.getLogger(__name__)

Handlers = Sequence[registries.BaseHandler]
Handlers = Sequence[handlers_.BaseHandler]


class LifeCycleFn(Protocol):
Expand Down Expand Up @@ -62,7 +62,7 @@ def shuffled(handlers: Handlers, **kwargs: Any) -> Handlers:
def asap(handlers: Handlers, *, state: states.State, **kwargs: Any) -> Handlers:
""" Execute one handler at a time, skip on failure, try the next one, retry after the full cycle. """

def keyfn(handler: registries.BaseHandler) -> int:
def keyfn(handler: handlers_.BaseHandler) -> int:
return state[handler.id].retries or 0

return sorted(handlers, key=keyfn)[:1]
Expand Down
Loading

0 comments on commit ac3b646

Please sign in to comment.