Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Filter labels/annotations by callbacks with arbitrary logic #328

Merged
merged 6 commits into from
Mar 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/filters.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,15 @@ By arbitrary callbacks
when=lambda spec, **_: spec.get('my-field') == 'somevalue')
def my_handler(spec, **_):
pass

* Check on labels/annotations with an arbitrary callback for individual values
(the value comes as the first positional argument, plus usual :doc:`kwargs`)::

def check_value(value, spec, **_):
return value == 'some-value' and spec.get('field') is not None

@kopf.on.create('zalando.org', 'v1', 'kopfexamples',
labels={'some-label': check_value},
annotations={'some-annotation': check_value})
def my_handler(spec, **_):
pass
18 changes: 18 additions & 0 deletions examples/11-filtering-handlers/example.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import kopf


def say_yes(value, spec, **_):
return value == 'somevalue' and spec.get('field') is not None


def say_no(value, spec, **_):
return value == 'somevalue' and spec.get('field') == 'not-this-value-for-sure'


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', labels={'somelabel': 'somevalue'})
def create_with_labels_matching(logger, **kwargs):
logger.info("Label is matching.")
Expand All @@ -16,6 +24,11 @@ def create_with_labels_absent(logger, **kwargs):
logger.info("Label is absent.")


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', labels={'somelabel': say_yes})
def create_with_labels_callback_matching(logger, **kwargs):
logger.info("Label callback matching.")


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', annotations={'someannotation': 'somevalue'})
def create_with_annotations_matching(logger, **kwargs):
logger.info("Annotation is matching.")
Expand All @@ -31,6 +44,11 @@ def create_with_annotations_absent(logger, **kwargs):
logger.info("Annotation is absent.")


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', annotations={'someannotation': say_no})
def create_with_annotations_callback_matching(logger, **kwargs):
logger.info("Annotation callback mismatch.")


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', when=lambda body, **_: True)
def create_with_filter_satisfied(logger, **kwargs):
logger.info("Filter satisfied.")
Expand Down
2 changes: 2 additions & 0 deletions examples/11-filtering-handlers/test_example_11.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ def test_handler_filtering(mocker):
assert '[default/kopf-example-1] Label is matching.' in runner.stdout
assert '[default/kopf-example-1] Label is present.' in runner.stdout
assert '[default/kopf-example-1] Label is absent.' in runner.stdout
assert '[default/kopf-example-1] Label callback matching.' in runner.stdout
assert '[default/kopf-example-1] Annotation is matching.' in runner.stdout
assert '[default/kopf-example-1] Annotation is present.' in runner.stdout
assert '[default/kopf-example-1] Annotation is absent.' in runner.stdout
assert '[default/kopf-example-1] Annotation callback mismatch.' not in runner.stdout
assert '[default/kopf-example-1] Filter satisfied.' in runner.stdout
assert '[default/kopf-example-1] Filter not satisfied.' not in runner.stdout
4 changes: 2 additions & 2 deletions kopf/engines/probing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import aiohttp.web

from kopf.reactor import activities
from kopf.reactor import callbacks
from kopf.reactor import causation
from kopf.reactor import handlers
from kopf.reactor import lifecycles
from kopf.reactor import registries
from kopf.structs import callbacks

logger = logging.getLogger(__name__)

Expand All @@ -34,7 +34,7 @@ async def health_reporter(
is cancelled or failed). Once it will stop responding for any reason,
Kubernetes will assume the pod is not alive anymore, and will restart it.
"""
probing_container: MutableMapping[handlers.HandlerId, callbacks.HandlerResult] = {}
probing_container: MutableMapping[handlers.HandlerId, callbacks.Result] = {}
probing_timestamp: Optional[datetime.datetime] = None
probing_max_age = datetime.timedelta(seconds=10.0)
probing_lock = asyncio.Lock()
Expand Down
18 changes: 9 additions & 9 deletions kopf/on.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ def creation_handler(**kwargs):

from typing import Optional, Callable

from kopf.reactor import callbacks
from kopf.reactor import causation
from kopf.reactor import errors as errors_
from kopf.reactor import handlers
from kopf.reactor import handling
from kopf.reactor import registries
from kopf.structs import callbacks
from kopf.structs import dicts
from kopf.structs import filters
from kopf.structs import resources
Expand Down Expand Up @@ -136,7 +136,7 @@ def resume( # lgtm[py/similar-function]
deleted: Optional[bool] = None,
labels: Optional[filters.MetaFilter] = None,
annotations: Optional[filters.MetaFilter] = None,
when: Optional[callbacks.WhenHandlerFn] = None,
when: Optional[callbacks.WhenFilterFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.resume()`` handler for the object resuming on operator (re)start. """
def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
Expand Down Expand Up @@ -168,7 +168,7 @@ def create( # lgtm[py/similar-function]
registry: Optional[registries.OperatorRegistry] = None,
labels: Optional[filters.MetaFilter] = None,
annotations: Optional[filters.MetaFilter] = None,
when: Optional[callbacks.WhenHandlerFn] = None,
when: Optional[callbacks.WhenFilterFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.create()`` handler for the object creation. """
def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
Expand Down Expand Up @@ -200,7 +200,7 @@ def update( # lgtm[py/similar-function]
registry: Optional[registries.OperatorRegistry] = None,
labels: Optional[filters.MetaFilter] = None,
annotations: Optional[filters.MetaFilter] = None,
when: Optional[callbacks.WhenHandlerFn] = None,
when: Optional[callbacks.WhenFilterFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.update()`` handler for the object update or change. """
def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
Expand Down Expand Up @@ -233,7 +233,7 @@ def delete( # lgtm[py/similar-function]
optional: Optional[bool] = None,
labels: Optional[filters.MetaFilter] = None,
annotations: Optional[filters.MetaFilter] = None,
when: Optional[callbacks.WhenHandlerFn] = None,
when: Optional[callbacks.WhenFilterFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.delete()`` handler for the object deletion. """
def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
Expand Down Expand Up @@ -266,7 +266,7 @@ def field( # lgtm[py/similar-function]
registry: Optional[registries.OperatorRegistry] = None,
labels: Optional[filters.MetaFilter] = None,
annotations: Optional[filters.MetaFilter] = None,
when: Optional[callbacks.WhenHandlerFn] = None,
when: Optional[callbacks.WhenFilterFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.field()`` handler for the individual field changes. """
def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
Expand Down Expand Up @@ -294,7 +294,7 @@ def event( # lgtm[py/similar-function]
registry: Optional[registries.OperatorRegistry] = None,
labels: Optional[filters.MetaFilter] = None,
annotations: Optional[filters.MetaFilter] = None,
when: Optional[callbacks.WhenHandlerFn] = None,
when: Optional[callbacks.WhenFilterFn] = None,
) -> ResourceHandlerDecorator:
""" ``@kopf.on.event()`` handler for the silent spies on the events. """
def decorator(fn: callbacks.ResourceHandlerFn) -> callbacks.ResourceHandlerFn:
Expand Down Expand Up @@ -327,7 +327,7 @@ def this( # lgtm[py/similar-function]
registry: Optional[registries.ResourceChangingRegistry] = None,
labels: Optional[filters.MetaFilter] = None,
annotations: Optional[filters.MetaFilter] = None,
when: Optional[callbacks.WhenHandlerFn] = None,
when: Optional[callbacks.WhenFilterFn] = None,
) -> ResourceHandlerDecorator:
"""
``@kopf.on.this()`` decorator for the dynamically generated sub-handlers.
Expand Down Expand Up @@ -388,7 +388,7 @@ def register( # lgtm[py/similar-function]
registry: Optional[registries.ResourceChangingRegistry] = None,
labels: Optional[filters.MetaFilter] = None,
annotations: Optional[filters.MetaFilter] = None,
when: Optional[callbacks.WhenHandlerFn] = None,
when: Optional[callbacks.WhenFilterFn] = None,
) -> callbacks.ResourceHandlerFn:
"""
Register a function as a sub-handler of the currently executed handler.
Expand Down
4 changes: 2 additions & 2 deletions kopf/reactor/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import logging
from typing import NoReturn, Mapping

from kopf.reactor import callbacks
from kopf.reactor import causation
from kopf.reactor import handlers
from kopf.reactor import handling
from kopf.reactor import lifecycles
from kopf.reactor import registries
from kopf.reactor import states
from kopf.structs import callbacks
from kopf.structs import credentials

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -95,7 +95,7 @@ async def run_activity(
lifecycle: lifecycles.LifeCycleFn,
registry: registries.OperatorRegistry,
activity: causation.Activity,
) -> Mapping[handlers.HandlerId, callbacks.HandlerResult]:
) -> Mapping[handlers.HandlerId, callbacks.Result]:
logger = logging.getLogger(f'kopf.activities.{activity.value}')

# For the activity handlers, we have neither bodies, nor patches, just the state.
Expand Down
6 changes: 3 additions & 3 deletions kopf/reactor/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import warnings
from typing import NewType, Callable, Optional, Any

from kopf.reactor import callbacks
from kopf.reactor import causation
from kopf.reactor import errors as errors_
from kopf.structs import callbacks
from kopf.structs import dicts
from kopf.structs import filters

Expand All @@ -20,7 +20,7 @@
@dataclasses.dataclass
class BaseHandler:
id: HandlerId
fn: Callable[..., Optional[callbacks.HandlerResult]]
fn: Callable[..., Optional[callbacks.Result]]
errors: Optional[errors_.ErrorsMode]
timeout: Optional[float]
retries: Optional[int]
Expand Down Expand Up @@ -59,7 +59,7 @@ class ResourceHandler(BaseHandler):
deleted: Optional[bool] # used for mixed-in (initial==True) @on.resume handlers only.
labels: Optional[filters.MetaFilter]
annotations: Optional[filters.MetaFilter]
when: Optional[callbacks.WhenHandlerFn]
when: Optional[callbacks.WhenFilterFn]
requires_finalizer: Optional[bool]

@property
Expand Down
6 changes: 3 additions & 3 deletions kopf/reactor/handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@

from kopf.engines import logging as logging_engine
from kopf.engines import sleeping
from kopf.reactor import callbacks
from kopf.reactor import causation
from kopf.reactor import errors
from kopf.reactor import handlers as handlers_
from kopf.reactor import invocation
from kopf.reactor import lifecycles
from kopf.reactor import registries
from kopf.reactor import states
from kopf.structs import callbacks
from kopf.structs import dicts
from kopf.structs import diffs

Expand Down Expand Up @@ -340,7 +340,7 @@ async def invoke_handler(
cause: causation.BaseCause,
lifecycle: lifecycles.LifeCycleFn,
**kwargs: Any,
) -> Optional[callbacks.HandlerResult]:
) -> Optional[callbacks.Result]:
"""
Invoke one handler only, according to the calling conventions.

Expand Down Expand Up @@ -384,4 +384,4 @@ async def invoke_handler(
await execute()

# Since we know that we invoked the handler, we cast "any" result to a handler result.
return callbacks.HandlerResult(result)
return callbacks.Result(result)
2 changes: 1 addition & 1 deletion kopf/reactor/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from typing import Optional, Any, Union, List, Iterable, Iterator, Tuple, Dict, cast, TYPE_CHECKING

from kopf import config
from kopf.reactor import callbacks
from kopf.reactor import causation
from kopf.structs import callbacks

if TYPE_CHECKING:
asyncio_Future = asyncio.Future[Any]
Expand Down
45 changes: 31 additions & 14 deletions kopf/reactor/registries.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
from typing import (Any, MutableMapping, Optional, Sequence, Collection, Iterable, Iterator,
List, Set, FrozenSet, Mapping, Callable, cast, Generic, TypeVar)

from kopf.reactor import callbacks
from kopf.reactor import causation
from kopf.reactor import errors as errors_
from kopf.reactor import handlers
from kopf.reactor import invocation
from kopf.structs import bodies
from kopf.structs import callbacks
from kopf.structs import dicts
from kopf.structs import filters
from kopf.structs import resources as resources_
Expand Down Expand Up @@ -125,7 +124,7 @@ def register(
requires_finalizer: bool = False,
labels: Optional[filters.MetaFilter] = None,
annotations: Optional[filters.MetaFilter] = None,
when: Optional[callbacks.WhenHandlerFn] = None,
when: Optional[callbacks.WhenFilterFn] = None,
) -> callbacks.ResourceHandlerFn:
warnings.warn("registry.register() is deprecated; "
"use @kopf.on... decorators with registry= kwarg.",
Expand Down Expand Up @@ -272,7 +271,7 @@ def register_resource_watching_handler(
id: Optional[str] = None,
labels: Optional[filters.MetaFilter] = None,
annotations: Optional[filters.MetaFilter] = None,
when: Optional[callbacks.WhenHandlerFn] = None,
when: Optional[callbacks.WhenFilterFn] = None,
) -> callbacks.ResourceHandlerFn:
"""
Register an additional handler function for low-level events.
Expand Down Expand Up @@ -306,7 +305,7 @@ def register_resource_changing_handler(
requires_finalizer: bool = False,
labels: Optional[filters.MetaFilter] = None,
annotations: Optional[filters.MetaFilter] = None,
when: Optional[callbacks.WhenHandlerFn] = None,
when: Optional[callbacks.WhenFilterFn] = None,
) -> callbacks.ResourceHandlerFn:
"""
Register an additional handler function for the specific resource and specific reason.
Expand Down Expand Up @@ -546,11 +545,13 @@ def match(
changed_fields: Collection[dicts.FieldPath] = frozenset(),
ignore_fields: bool = False,
) -> bool:
# Kwargs are lazily evaluated on the first _actual_ use, and shared for all filters since then.
kwargs: MutableMapping[str, Any] = {}
return all([
_matches_field(handler, changed_fields or {}, ignore_fields),
_matches_labels(handler, cause.body),
_matches_annotations(handler, cause.body),
_matches_filter_callback(handler, cause),
_matches_labels(handler, cause, kwargs),
_matches_annotations(handler, cause, kwargs),
_matches_filter_callback(handler, cause, kwargs),
])


Expand All @@ -566,26 +567,32 @@ def _matches_field(

def _matches_labels(
handler: handlers.ResourceHandler,
body: bodies.Body,
cause: causation.ResourceCause,
kwargs: MutableMapping[str, Any],
) -> bool:
return (not handler.labels or
_matches_metadata(pattern=handler.labels,
content=body.get('metadata', {}).get('labels', {})))
content=cause.body.get('metadata', {}).get('labels', {}),
kwargs=kwargs, cause=cause))


def _matches_annotations(
handler: handlers.ResourceHandler,
body: bodies.Body,
cause: causation.ResourceCause,
kwargs: MutableMapping[str, Any],
) -> bool:
return (not handler.annotations or
_matches_metadata(pattern=handler.annotations,
content=body.get('metadata', {}).get('annotations', {})))
content=cause.body.get('metadata', {}).get('annotations', {}),
kwargs=kwargs, cause=cause))


def _matches_metadata(
*,
pattern: filters.MetaFilter, # from the handler
content: Mapping[str, str], # from the body
kwargs: MutableMapping[str, Any],
cause: causation.ResourceCause,
) -> bool:
for key, value in pattern.items():
if value is filters.MetaFilterToken.ABSENT and key not in content:
Expand All @@ -594,6 +601,13 @@ def _matches_metadata(
continue
elif value is None and key in content: # deprecated; warned in @kopf.on
continue
elif callable(value):
if not kwargs:
kwargs.update(invocation.build_kwargs(cause=cause))
if value(content.get(key, None), **kwargs):
continue
else:
return False
elif key not in content:
return False
elif value != content[key]:
Expand All @@ -606,10 +620,13 @@ def _matches_metadata(
def _matches_filter_callback(
handler: handlers.ResourceHandler,
cause: causation.ResourceCause,
kwargs: MutableMapping[str, Any],
) -> bool:
if not handler.when:
if handler.when is None:
return True
return handler.when(**invocation.build_kwargs(cause=cause))
if not kwargs:
kwargs.update(invocation.build_kwargs(cause=cause))
return handler.when(**kwargs)


_default_registry: Optional[OperatorRegistry] = None
Expand Down
Loading