Skip to content

Commit

Permalink
Merge pull request #258 from Jc2k/98_filter_by_callback
Browse files Browse the repository at this point in the history
Filter by arbitrary callback function
  • Loading branch information
nolar authored Jan 17, 2020
2 parents 5e29dc0 + a8c15e2 commit 473afdf
Show file tree
Hide file tree
Showing 13 changed files with 270 additions and 70 deletions.
6 changes: 6 additions & 0 deletions docs/handlers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,12 @@ The following filters are available for all event, cause, and field handlers:
def my_handler(spec, **_):
pass

* Check on any field on the body with a when callback. The filter callback takes the same args as a handler::

@kopf.on.create('zalando.org', 'v1', 'kopfexamples', when=lambda body, **_: body.get('spec', {}).get('myfield', '') == 'somevalue')
def my_handler(spec, **_):
pass


Startup handlers
================
Expand Down
10 changes: 10 additions & 0 deletions examples/11-filtering-handlers/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,13 @@ def create_with_annotations_exist(logger, **kwargs):
@kopf.on.create('zalando.org', 'v1', 'kopfexamples', annotations={'someannotation': 'othervalue'})
def create_with_annotations_not_satisfied(logger, **kwargs):
logger.info("Annotation not satisfied.")


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', when=lambda body, **_: True)
def create_with_filter_satisfied(logger, **kwargs):
logger.info("Filter satisfied.")


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', when=lambda body, **_: False)
def create_with_filter_not_satisfied(logger, **kwargs):
logger.info("Filter not satisfied.")
2 changes: 2 additions & 0 deletions examples/11-filtering-handlers/test_example_11.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,5 @@ def test_handler_filtering(mocker):
assert '[default/kopf-example-1] Annotation satisfied.' in runner.stdout
assert '[default/kopf-example-1] Annotation exists.' in runner.stdout
assert '[default/kopf-example-1] Annotation not satisfied.' not in runner.stdout
assert '[default/kopf-example-1] Filter satisfied.' in runner.stdout
assert '[default/kopf-example-1] Filter not satisfied.' not in runner.stdout
18 changes: 12 additions & 6 deletions kopf/on.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def resume(
deleted: Optional[bool] = None,
labels: Optional[bodies.Labels] = None,
annotations: Optional[bodies.Annotations] = None,
when: Optional[registries.WhenHandlerFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.resume()`` handler for the object resuming on operator (re)start. """
actual_registry = registry if registry is not None else registries.get_default_registry()
Expand All @@ -126,7 +127,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
group=group, version=version, plural=plural,
reason=None, initial=True, deleted=deleted, id=id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
fn=fn, labels=labels, annotations=annotations,
fn=fn, labels=labels, annotations=annotations, when=when,
)
return decorator

Expand All @@ -143,6 +144,7 @@ def create(
registry: Optional[registries.OperatorRegistry] = None,
labels: Optional[bodies.Labels] = None,
annotations: Optional[bodies.Annotations] = None,
when: Optional[registries.WhenHandlerFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.create()`` handler for the object creation. """
actual_registry = registry if registry is not None else registries.get_default_registry()
Expand All @@ -151,7 +153,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
group=group, version=version, plural=plural,
reason=causation.Reason.CREATE, id=id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
fn=fn, labels=labels, annotations=annotations,
fn=fn, labels=labels, annotations=annotations, when=when,
)
return decorator

Expand All @@ -168,6 +170,7 @@ def update(
registry: Optional[registries.OperatorRegistry] = None,
labels: Optional[bodies.Labels] = None,
annotations: Optional[bodies.Annotations] = None,
when: Optional[registries.WhenHandlerFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.update()`` handler for the object update or change. """
actual_registry = registry if registry is not None else registries.get_default_registry()
Expand All @@ -176,7 +179,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
group=group, version=version, plural=plural,
reason=causation.Reason.UPDATE, id=id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
fn=fn, labels=labels, annotations=annotations,
fn=fn, labels=labels, annotations=annotations, when=when,
)
return decorator

Expand All @@ -194,6 +197,7 @@ def delete(
optional: Optional[bool] = None,
labels: Optional[bodies.Labels] = None,
annotations: Optional[bodies.Annotations] = None,
when: Optional[registries.WhenHandlerFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.delete()`` handler for the object deletion. """
actual_registry = registry if registry is not None else registries.get_default_registry()
Expand All @@ -203,7 +207,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
reason=causation.Reason.DELETE, id=id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
fn=fn, requires_finalizer=bool(not optional),
labels=labels, annotations=annotations,
labels=labels, annotations=annotations, when=when,
)
return decorator

Expand All @@ -221,6 +225,7 @@ def field(
registry: Optional[registries.OperatorRegistry] = None,
labels: Optional[bodies.Labels] = None,
annotations: Optional[bodies.Annotations] = None,
when: Optional[registries.WhenHandlerFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.field()`` handler for the individual field changes. """
actual_registry = registry if registry is not None else registries.get_default_registry()
Expand All @@ -229,7 +234,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
group=group, version=version, plural=plural,
reason=None, field=field, id=id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
fn=fn, labels=labels, annotations=annotations,
fn=fn, labels=labels, annotations=annotations, when=when,
)
return decorator

Expand All @@ -241,13 +246,14 @@ def event(
registry: Optional[registries.OperatorRegistry] = None,
labels: Optional[bodies.Labels] = None,
annotations: Optional[bodies.Annotations] = None,
when: Optional[registries.WhenHandlerFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.event()`` handler for the silent spies on the events. """
actual_registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
return actual_registry.register_resource_watching_handler(
group=group, version=version, plural=plural,
id=id, fn=fn, labels=labels, annotations=annotations,
id=id, fn=fn, labels=labels, annotations=annotations, when=when,
)
return decorator

Expand Down
2 changes: 1 addition & 1 deletion kopf/reactor/handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ async def handle_resource_changing_cause(
done = None
skip = None

requires_finalizer = registry.requires_finalizer(resource=cause.resource, body=cause.body)
requires_finalizer = registry.requires_finalizer(resource=cause.resource, cause=cause)
has_finalizer = finalizers.has_finalizers(body=cause.body)

if requires_finalizer and not has_finalizer:
Expand Down
63 changes: 39 additions & 24 deletions kopf/reactor/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import contextlib
import contextvars
import functools
from typing import Optional, Any, Union, List, Iterable, Iterator, Tuple
from typing import Optional, Any, Union, List, Iterable, Iterator, Tuple, Dict

from kopf import config
from kopf.reactor import causation
Expand Down Expand Up @@ -41,39 +41,29 @@ def context(
for var, token in reversed(tokens):
var.reset(token)


async def invoke(
fn: Invokable,
*args: Any,
cause: Optional[causation.BaseCause] = None,
**kwargs: Any,
) -> Any:
def get_invoke_arguments(
*args: Any,
cause: Optional[causation.BaseCause] = None,
**kwargs: Any
) -> Dict[str, Any]:
"""
Invoke a single function, but safely for the main asyncio process.
Used both for the handler functions and for the lifecycle callbacks.
A full set of the arguments is provided, expanding the cause to some easily
usable aliases. The function is expected to accept ``**kwargs`` for the args
that it does not use -- for forward compatibility with the new features.
The synchronous methods are executed in the executor (threads or processes),
thus making it non-blocking for the main event loop of the operator.
See: https://pymotw.com/3/asyncio/executors.html
Expand kwargs dict with fields from the causation.
"""
new_kwargs = {}
new_kwargs.update(kwargs)

# Add aliases for the kwargs, directly linked to the body, or to the assumed defaults.
if isinstance(cause, causation.BaseCause):
kwargs.update(
new_kwargs.update(
cause=cause,
logger=cause.logger,
)
if isinstance(cause, causation.ActivityCause):
kwargs.update(
new_kwargs.update(
activity=cause.activity,
)
if isinstance(cause, causation.ResourceCause):
kwargs.update(
new_kwargs.update(
patch=cause.patch,
memo=cause.memo,
body=cause.body,
Expand All @@ -85,19 +75,44 @@ async def invoke(
namespace=cause.body.get('metadata', {}).get('namespace'),
)
if isinstance(cause, causation.ResourceWatchingCause):
kwargs.update(
new_kwargs.update(
event=cause.raw,
type=cause.type,
)
if isinstance(cause, causation.ResourceChangingCause):
kwargs.update(
new_kwargs.update(
event=cause.reason, # deprecated; kept for backward-compatibility
reason=cause.reason,
diff=cause.diff,
old=cause.old,
new=cause.new,
)

return new_kwargs


async def invoke(
fn: Invokable,
*args: Any,
cause: Optional[causation.BaseCause] = None,
**kwargs: Any,
) -> Any:
"""
Invoke a single function, but safely for the main asyncio process.
Used both for the handler functions and for the lifecycle callbacks.
A full set of the arguments is provided, expanding the cause to some easily
usable aliases. The function is expected to accept ``**kwargs`` for the args
that it does not use -- for forward compatibility with the new features.
The synchronous methods are executed in the executor (threads or processes),
thus making it non-blocking for the main event loop of the operator.
See: https://pymotw.com/3/asyncio/executors.html
"""

kwargs = get_invoke_arguments(*args, cause=cause, **kwargs)

if is_async_fn(fn):
result = await fn(*args, **kwargs) # type: ignore
else:
Expand Down
Loading

0 comments on commit 473afdf

Please sign in to comment.