From 86cab50e6e33cef2361e25559a83614548d19728 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 7 Nov 2019 00:15:17 +0100 Subject: [PATCH 1/5] Correct the logging sentence for the NO-OP reason --- kopf/reactor/handling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index c5c34915..873e8296 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -319,7 +319,7 @@ async def handle_resource_changing_cause( logger.debug("Deletion event, but we are done with it, and we do not care.") if cause.reason == causation.Reason.NOOP: - logger.debug("Something has changed, but we are not interested (state is the same).") + logger.debug("Something has changed, but we are not interested (the essence is the same).") # For the case of a newly created object, or one that doesn't have the correct # finalizers, lock it to this operator. Not all newly created objects will From 1dd4c247dd3c85d9adb88b1d9b31d9f0db7703d2 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Wed, 6 Nov 2019 23:56:33 +0100 Subject: [PATCH 2/5] Make the operator invocation stricter: kwargs only, no positional args --- kopf/reactor/running.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kopf/reactor/running.py b/kopf/reactor/running.py index 1e05eb52..11d2e503 100644 --- a/kopf/reactor/running.py +++ b/kopf/reactor/running.py @@ -76,6 +76,7 @@ def login( def run( + *, loop: Optional[asyncio.AbstractEventLoop] = None, lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.OperatorRegistry] = None, @@ -110,6 +111,7 @@ def run( async def operator( + *, lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.OperatorRegistry] = None, standalone: bool = False, @@ -144,6 +146,7 @@ async def operator( async def spawn_tasks( + *, lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.OperatorRegistry] = None, standalone: bool = False, From 10d9f9195771c5a394742f627485360617e487fb Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 7 Nov 2019 00:09:39 +0100 Subject: [PATCH 3/5] Store the per-object in-memory entries for the handling routines --- kopf/reactor/handling.py | 12 ++++ kopf/reactor/running.py | 8 +++ kopf/structs/containers.py | 80 ++++++++++++++++++++++++++ tests/basic-structs/test_containers.py | 40 +++++++++++++ tests/handling/test_cause_handling.py | 9 +++ tests/handling/test_cause_logging.py | 4 ++ tests/handling/test_delays.py | 3 + tests/handling/test_errors.py | 4 ++ tests/handling/test_event_handling.py | 3 + tests/handling/test_freezing.py | 2 + tests/handling/test_multistep.py | 3 + tests/handling/test_no_handlers.py | 2 + tests/handling/test_retrying_limits.py | 4 +- 13 files changed, 173 insertions(+), 1 deletion(-) create mode 100644 kopf/structs/containers.py create mode 100644 tests/basic-structs/test_containers.py diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 873e8296..4097d061 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -30,6 +30,7 @@ from kopf.reactor import registries from kopf.reactor import states from kopf.structs import bodies +from kopf.structs import containers from kopf.structs import dicts from kopf.structs import diffs from kopf.structs import finalizers @@ -145,6 +146,7 @@ async def activity_trigger( async def resource_handler( lifecycle: lifecycles.LifeCycleFn, registry: registries.OperatorRegistry, + memories: containers.ResourceMemories, resource: resources.Resource, event: bodies.Event, freeze: asyncio.Event, @@ -174,6 +176,12 @@ async def resource_handler( logger.debug("Ignoring the events due to freeze.") return + # Recall what is stored about that object. Share it in little portions with the consumers. + # And immediately forget it if the object is deleted from the cluster (but keep in memory). + memory = await memories.recall(body) + if event['type'] == 'DELETED': + await memories.forget(body) + # Invoke all silent spies. No causation, no progress storage is performed. if registry.has_resource_watching_handlers(resource=resource): resource_watching_cause = causation.detect_resource_watching_cause( @@ -185,6 +193,7 @@ async def resource_handler( await handle_resource_watching_cause( lifecycle=lifecycles.all_at_once, registry=registry, + memory=memory, cause=resource_watching_cause, ) @@ -206,6 +215,7 @@ async def resource_handler( delay = await handle_resource_changing_cause( lifecycle=lifecycle, registry=registry, + memory=memory, cause=resource_changing_cause, ) @@ -234,6 +244,7 @@ async def resource_handler( async def handle_resource_watching_cause( lifecycle: lifecycles.LifeCycleFn, registry: registries.OperatorRegistry, + memory: containers.ResourceMemory, cause: causation.ResourceWatchingCause, ) -> None: """ @@ -262,6 +273,7 @@ async def handle_resource_watching_cause( async def handle_resource_changing_cause( lifecycle: lifecycles.LifeCycleFn, registry: registries.OperatorRegistry, + memory: containers.ResourceMemory, cause: causation.ResourceChangingCause, ) -> Optional[float]: """ diff --git a/kopf/reactor/running.py b/kopf/reactor/running.py index 11d2e503..6cd37bf7 100644 --- a/kopf/reactor/running.py +++ b/kopf/reactor/running.py @@ -17,6 +17,7 @@ from kopf.reactor import lifecycles from kopf.reactor import queueing from kopf.reactor import registries +from kopf.structs import containers from kopf.structs import credentials if TYPE_CHECKING: @@ -80,6 +81,7 @@ def run( loop: Optional[asyncio.AbstractEventLoop] = None, lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.OperatorRegistry] = None, + memories: Optional[containers.ResourceMemories] = None, standalone: bool = False, priority: int = 0, peering_name: Optional[str] = None, @@ -98,6 +100,7 @@ def run( loop.run_until_complete(operator( lifecycle=lifecycle, registry=registry, + memories=memories, standalone=standalone, namespace=namespace, priority=priority, @@ -114,6 +117,7 @@ async def operator( *, lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.OperatorRegistry] = None, + memories: Optional[containers.ResourceMemories] = None, standalone: bool = False, priority: int = 0, peering_name: Optional[str] = None, @@ -134,6 +138,7 @@ async def operator( operator_tasks = await spawn_tasks( lifecycle=lifecycle, registry=registry, + memories=memories, standalone=standalone, namespace=namespace, priority=priority, @@ -149,6 +154,7 @@ async def spawn_tasks( *, lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.OperatorRegistry] = None, + memories: Optional[containers.ResourceMemories] = None, standalone: bool = False, priority: int = 0, peering_name: Optional[str] = None, @@ -167,6 +173,7 @@ async def spawn_tasks( # The freezer and the registry are scoped to this whole task-set, to sync them all. lifecycle = lifecycle if lifecycle is not None else lifecycles.get_default_lifecycle() registry = registry if registry is not None else registries.get_default_registry() + memories = memories if memories is not None else containers.ResourceMemories() vault = vault if vault is not None else global_vault vault = vault if vault is not None else credentials.Vault() event_queue: posting.K8sEventQueue = asyncio.Queue(loop=loop) @@ -240,6 +247,7 @@ async def spawn_tasks( handler=functools.partial(handling.resource_handler, lifecycle=lifecycle, registry=registry, + memories=memories, resource=resource, event_queue=event_queue, freeze=freeze_flag)))), # freeze is only checked diff --git a/kopf/structs/containers.py b/kopf/structs/containers.py new file mode 100644 index 00000000..48cb3b56 --- /dev/null +++ b/kopf/structs/containers.py @@ -0,0 +1,80 @@ +""" +A in-memory storage of arbitrary information per resource/object. + +The information is stored strictly in-memory and is not persistent. +On the operator restart, all the memories are lost. + +It is used internally to track allocated system resources for each Kubernetes +object, even if that object does not show up in the event streams for long time. +""" +import dataclasses +from typing import MutableMapping + +from kopf.structs import bodies + + +@dataclasses.dataclass(frozen=False) +class ResourceMemory: + """ A memo about a single resource/object. Usually stored in `Memories`. """ + pass + + +class ResourceMemories: + """ + A container of all memos about every existing resource in a single operator. + + Distinct operator tasks have their own memory containers, which + do not overlap. This solves the problem if storing the per-resource + entries in the global or context variables. + + The memos can store anything the resource handlers need to persist within + a single process/operator lifetime, but not persisted on the resource. + For example, the runtime system resources: flags, threads, tasks, etc. + Or the scalar values, which have meaning only for this operator process. + + The container is relatively async-safe: one individual resource is always + handled sequentially, never in parallel with itself (different resources + are handled in parallel through), so the same key will not be added/deleted + in the background during the operation, so the locking is not needed. + """ + _items: MutableMapping[str, ResourceMemory] + + def __init__(self) -> None: + super().__init__() + self._items = {} + + async def recall( + self, + body: bodies.Body, + ) -> ResourceMemory: + """ + Either find a resource's memory, or create and remember a new one. + """ + key = self._build_key(body) + if key not in self._items: + memory = ResourceMemory() + self._items[key] = memory + return self._items[key] + + async def forget(self, body: bodies.Body) -> None: + """ + Forget the resource's memory if it exists; or ignore if it does not. + """ + key = self._build_key(body) + if key in self._items: + del self._items[key] + + def _build_key( + self, + body: bodies.Body, + ) -> str: + """ + Construct an immutable persistent key of a resource. + + Generally, a uid is sufficient, as it is unique within the cluster. + But it can be e.g. plural/namespace/name triplet, or anything else, + even of different types (as long as it satisfies the type checkers). + + But it must be consistent within a single process lifetime. + """ + return body.get('metadata', {}).get('uid') or '' diff --git a/tests/basic-structs/test_containers.py b/tests/basic-structs/test_containers.py new file mode 100644 index 00000000..b36e752d --- /dev/null +++ b/tests/basic-structs/test_containers.py @@ -0,0 +1,40 @@ +from kopf.structs.bodies import Body +from kopf.structs.containers import ResourceMemory, ResourceMemories + +BODY: Body = { + 'metadata': { + 'uid': 'uid1', + } +} + + +def test_creation_with_defaults(): + ResourceMemory() + + +async def test_recalling_creates_when_absent(): + memories = ResourceMemories() + memory = await memories.recall(BODY) + assert isinstance(memory, ResourceMemory) + + +async def test_recalling_reuses_when_present(): + memories = ResourceMemories() + memory1 = await memories.recall(BODY) + memory2 = await memories.recall(BODY) + assert memory1 is memory2 + + +async def test_forgetting_deletes_when_present(): + memories = ResourceMemories() + memory1 = await memories.recall(BODY) + await memories.forget(BODY) + + # Check by recalling -- it should be a new one. + memory2 = await memories.recall(BODY) + assert memory1 is not memory2 + + +async def test_forgetting_ignores_when_absent(): + memories = ResourceMemories() + await memories.forget(BODY) diff --git a/tests/handling/test_cause_handling.py b/tests/handling/test_cause_handling.py index 7d43c73d..7215f9a9 100644 --- a/tests/handling/test_cause_handling.py +++ b/tests/handling/test_cause_handling.py @@ -4,6 +4,7 @@ import kopf from kopf.reactor.causation import Reason from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories from kopf.structs.finalizers import FINALIZER from kopf.structs.lastseen import LAST_SEEN_ANNOTATION @@ -18,6 +19,7 @@ async def test_acquire(registry, handlers, resource, cause_mock, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -54,6 +56,7 @@ async def test_create(registry, handlers, resource, cause_mock, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -97,6 +100,7 @@ async def test_update(registry, handlers, resource, cause_mock, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -140,6 +144,7 @@ async def test_delete(registry, handlers, resource, cause_mock, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -191,6 +196,7 @@ async def test_release(registry, resource, handlers, cause_mock, caplog, k8s_moc lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -231,6 +237,7 @@ async def test_gone(registry, handlers, resource, cause_mock, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -261,6 +268,7 @@ async def test_free(registry, handlers, resource, cause_mock, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -291,6 +299,7 @@ async def test_noop(registry, handlers, resource, cause_mock, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), diff --git a/tests/handling/test_cause_logging.py b/tests/handling/test_cause_logging.py index 0ca4547d..77d64b74 100644 --- a/tests/handling/test_cause_logging.py +++ b/tests/handling/test_cause_logging.py @@ -6,6 +6,7 @@ import kopf from kopf.reactor.causation import ALL_REASONS, HANDLER_REASONS from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories @pytest.mark.parametrize('cause_type', ALL_REASONS) @@ -17,6 +18,7 @@ async def test_all_logs_are_prefixed(registry, resource, handlers, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -46,6 +48,7 @@ async def test_diffs_logged_if_present(registry, resource, handlers, cause_type, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -68,6 +71,7 @@ async def test_diffs_not_logged_if_absent(registry, resource, handlers, cause_ty lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), diff --git a/tests/handling/test_delays.py b/tests/handling/test_delays.py index bff14eaf..c501ef0b 100644 --- a/tests/handling/test_delays.py +++ b/tests/handling/test_delays.py @@ -11,6 +11,7 @@ from kopf.reactor.handling import WAITING_KEEPALIVE_INTERVAL from kopf.reactor.handling import resource_handler from kopf.reactor.states import HandlerState +from kopf.structs.containers import ResourceMemories from kopf.structs.finalizers import FINALIZER @@ -35,6 +36,7 @@ async def test_delayed_handlers_progress( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -89,6 +91,7 @@ async def test_delayed_handlers_sleep( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), diff --git a/tests/handling/test_errors.py b/tests/handling/test_errors.py index 127dd655..7ae93697 100644 --- a/tests/handling/test_errors.py +++ b/tests/handling/test_errors.py @@ -7,6 +7,7 @@ from kopf.reactor.causation import Reason, HANDLER_REASONS from kopf.reactor.handling import PermanentError, TemporaryError from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories # The extrahandlers are needed to prevent the cycle ending and status purging. @@ -27,6 +28,7 @@ async def test_fatal_error_stops_handler( lifecycle=kopf.lifecycles.one_by_one, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -69,6 +71,7 @@ async def test_retry_error_delays_handler( lifecycle=kopf.lifecycles.one_by_one, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -112,6 +115,7 @@ async def test_arbitrary_error_delays_handler( lifecycle=kopf.lifecycles.one_by_one, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), diff --git a/tests/handling/test_event_handling.py b/tests/handling/test_event_handling.py index de6a724f..7516e58b 100644 --- a/tests/handling/test_event_handling.py +++ b/tests/handling/test_event_handling.py @@ -6,6 +6,7 @@ import kopf from kopf.reactor.causation import ALL_REASONS from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories @pytest.mark.parametrize('cause_type', ALL_REASONS) @@ -19,6 +20,7 @@ async def test_handlers_called_always( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'ev-type', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -54,6 +56,7 @@ async def test_errors_are_ignored( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'ev-type', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), diff --git a/tests/handling/test_freezing.py b/tests/handling/test_freezing.py index 1fe3299f..66d76ddf 100644 --- a/tests/handling/test_freezing.py +++ b/tests/handling/test_freezing.py @@ -4,6 +4,7 @@ import kopf from kopf.reactor.handling import resource_handler from kopf.reactor.registries import OperatorRegistry +from kopf.structs.containers import ResourceMemories async def test_nothing_is_called_when_freeze_is_set(mocker, resource, caplog, assert_logs): @@ -27,6 +28,7 @@ async def test_nothing_is_called_when_freeze_is_set(mocker, resource, caplog, as lifecycle=lifecycle, registry=registry, resource=resource, + memories=ResourceMemories(), event=event, freeze=freeze, replenished=asyncio.Event(), diff --git a/tests/handling/test_multistep.py b/tests/handling/test_multistep.py index e8271ee8..862de96a 100644 --- a/tests/handling/test_multistep.py +++ b/tests/handling/test_multistep.py @@ -5,6 +5,7 @@ import kopf from kopf.reactor.causation import Reason, HANDLER_REASONS from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories @pytest.mark.parametrize('cause_type', HANDLER_REASONS) @@ -20,6 +21,7 @@ async def test_1st_step_stores_progress_by_patching( lifecycle=kopf.lifecycles.asap, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -66,6 +68,7 @@ async def test_2nd_step_finishes_the_handlers( lifecycle=kopf.lifecycles.one_by_one, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), diff --git a/tests/handling/test_no_handlers.py b/tests/handling/test_no_handlers.py index 3dfd60f9..e26cad9e 100644 --- a/tests/handling/test_no_handlers.py +++ b/tests/handling/test_no_handlers.py @@ -6,6 +6,7 @@ import kopf from kopf.reactor.causation import HANDLER_REASONS from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories from kopf.structs.lastseen import LAST_SEEN_ANNOTATION @@ -30,6 +31,7 @@ async def test_skipped_with_no_handlers( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), diff --git a/tests/handling/test_retrying_limits.py b/tests/handling/test_retrying_limits.py index 4b271b39..34da4004 100644 --- a/tests/handling/test_retrying_limits.py +++ b/tests/handling/test_retrying_limits.py @@ -1,5 +1,4 @@ import asyncio -import datetime import logging import freezegun @@ -8,6 +7,7 @@ import kopf from kopf.reactor.causation import HANDLER_REASONS from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories # The timeout is hard-coded in conftest.py:handlers(). @@ -37,6 +37,7 @@ async def test_timed_out_handler_fails( lifecycle=kopf.lifecycles.one_by_one, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -84,6 +85,7 @@ async def test_retries_limited_handler_fails( lifecycle=kopf.lifecycles.one_by_one, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'irrelevant', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), From cf6eb98b7ff509040bbf491f0c5651becaeb11fa Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 7 Nov 2019 00:13:42 +0100 Subject: [PATCH 4/5] =?UTF-8?q?Keep=20the=20"initial"=20cause=E2=80=99s=20?= =?UTF-8?q?flag=20throughout=20few=20handling=20cycles?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kopf/reactor/causation.py | 10 ++++-- kopf/reactor/handling.py | 7 +++- kopf/structs/containers.py | 7 ++-- tests/handling/conftest.py | 20 +++++------ tests/handling/test_cause_handling.py | 46 +++++++++++++++++--------- tests/handling/test_cause_logging.py | 13 +++++--- tests/handling/test_delays.py | 6 ++-- tests/handling/test_errors.py | 9 +++-- tests/handling/test_multistep.py | 10 ++++-- tests/handling/test_no_handlers.py | 2 +- tests/handling/test_retrying_limits.py | 8 +++-- 11 files changed, 90 insertions(+), 48 deletions(-) diff --git a/kopf/reactor/causation.py b/kopf/reactor/causation.py index 19ec5d37..bc67b84c 100644 --- a/kopf/reactor/causation.py +++ b/kopf/reactor/causation.py @@ -146,6 +146,7 @@ def detect_resource_changing_cause( event: bodies.Event, diff: Optional[diffs.Diff] = None, requires_finalizer: bool = True, + initial: bool = False, **kwargs: Any, ) -> ResourceChangingCause: """ @@ -159,7 +160,6 @@ def detect_resource_changing_cause( # Put them back to the pass-through kwargs (to avoid code duplication). body = event['object'] - initial = event['type'] is None # special value simulated by us in kopf.reactor.watching. kwargs.update(body=body, initial=initial) if diff is not None: kwargs.update(diff=diff) @@ -190,11 +190,15 @@ def detect_resource_changing_cause( # For an object seen for the first time (i.e. just-created), call the creation handlers, # then mark the state as if it was seen when the creation has finished. + # Creation never mixes with resuming, even if an object is detected on startup (first listing). if not lastseen.has_essence_stored(body): + kwargs['initial'] = False return ResourceChangingCause(reason=Reason.CREATE, **kwargs) - # Cases with no state changes are usually ignored (NOOP). But for the "None" events, - # as simulated for the initial listing, we call the resuming handlers (e.g. threads/tasks). + # Cases with no essence changes are usually ignored (NOOP). But for the not-yet-resumed objects, + # we simulate a fake cause to invoke the resuming handlers. For cases with the essence changes, + # the resuming handlers will be mixed-in to the regular cause handling ("cuckoo-style") + # due to the ``initial=True`` flag on the cause, regardless of the reason. if not diff and initial: return ResourceChangingCause(reason=Reason.RESUME, **kwargs) diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 4097d061..e5b7027d 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -178,7 +178,7 @@ async def resource_handler( # Recall what is stored about that object. Share it in little portions with the consumers. # And immediately forget it if the object is deleted from the cluster (but keep in memory). - memory = await memories.recall(body) + memory = await memories.recall(body, noticed_by_listing=event['type'] is None) if event['type'] == 'DELETED': await memories.forget(body) @@ -210,6 +210,7 @@ async def resource_handler( old=old, new=new, diff=diff, + initial=memory.noticed_by_listing and not memory.fully_handled_once, requires_finalizer=registry.requires_finalizer(resource=resource, body=body), ) delay = await handle_resource_changing_cause( @@ -323,6 +324,10 @@ async def handle_resource_changing_cause( logger.debug("Removing the finalizer, thus allowing the actual deletion.") finalizers.remove_finalizers(body=body, patch=patch) + # Once all handlers have succeeded at least once for any reason, or if there were none, + # prevent further resume-handlers (which otherwise happens on each watch-stream re-listing). + memory.fully_handled_once = True + # Informational causes just print the log lines. if cause.reason == causation.Reason.GONE: logger.debug("Deleted, really deleted, and we are notified.") diff --git a/kopf/structs/containers.py b/kopf/structs/containers.py index 48cb3b56..072f3d87 100644 --- a/kopf/structs/containers.py +++ b/kopf/structs/containers.py @@ -16,7 +16,8 @@ @dataclasses.dataclass(frozen=False) class ResourceMemory: """ A memo about a single resource/object. Usually stored in `Memories`. """ - pass + noticed_by_listing: bool = False + fully_handled_once: bool = False class ResourceMemories: @@ -46,13 +47,15 @@ def __init__(self) -> None: async def recall( self, body: bodies.Body, + *, + noticed_by_listing: bool = False, ) -> ResourceMemory: """ Either find a resource's memory, or create and remember a new one. """ key = self._build_key(body) if key not in self._items: - memory = ResourceMemory() + memory = ResourceMemory(noticed_by_listing=noticed_by_listing) self._items[key] = memory return self._items[key] diff --git a/tests/handling/conftest.py b/tests/handling/conftest.py index 12b59835..f1542c62 100644 --- a/tests/handling/conftest.py +++ b/tests/handling/conftest.py @@ -94,6 +94,11 @@ def handlers(clear_default_registry): async def event_fn(**kwargs): return event_mock(**kwargs) + # Keep on-resume on top, to catch any issues with the test design (where it could be skipped). + @kopf.on.resume('zalando.org', 'v1', 'kopfexamples', id='resume_fn', timeout=600, retries=100) + async def resume_fn(**kwargs): + return resume_mock(**kwargs) + @kopf.on.create('zalando.org', 'v1', 'kopfexamples', id='create_fn', timeout=600, retries=100) async def create_fn(**kwargs): return create_mock(**kwargs) @@ -106,10 +111,6 @@ async def update_fn(**kwargs): async def delete_fn(**kwargs): return delete_mock(**kwargs) - @kopf.on.resume('zalando.org', 'v1', 'kopfexamples', id='resume_fn', timeout=600, retries=100) - async def resume_fn(**kwargs): - return resume_mock(**kwargs) - return HandlersContainer( event_mock=event_mock, create_mock=create_mock, @@ -136,6 +137,11 @@ def extrahandlers(clear_default_registry, handlers): async def event_fn2(**kwargs): return event_mock(**kwargs) + # Keep on-resume on top, to catch any issues with the test design (where it could be skipped). + @kopf.on.resume('zalando.org', 'v1', 'kopfexamples', id='resume_fn2') + async def resume_fn2(**kwargs): + return resume_mock(**kwargs) + @kopf.on.create('zalando.org', 'v1', 'kopfexamples', id='create_fn2') async def create_fn2(**kwargs): return create_mock(**kwargs) @@ -148,10 +154,6 @@ async def update_fn2(**kwargs): async def delete_fn2(**kwargs): return delete_mock(**kwargs) - @kopf.on.resume('zalando.org', 'v1', 'kopfexamples', id='resume_fn2') - async def resume_fn2(**kwargs): - return resume_mock(**kwargs) - return HandlersContainer( event_mock=event_mock, create_mock=create_mock, @@ -181,7 +183,6 @@ def new_detect_fn(**kwargs): original_new = kwargs.pop('new', None) original_old = kwargs.pop('old', None) reason = mock.reason if mock.reason is not None else original_reason - initial = bool(reason == Reason.RESUME) body = copy.deepcopy(mock.body) if mock.body is not None else original_body diff = copy.deepcopy(mock.diff) if mock.diff is not None else original_diff new = copy.deepcopy(mock.new) if mock.new is not None else original_new @@ -194,7 +195,6 @@ def new_detect_fn(**kwargs): # I.e. everything except what we mock: reason & body. cause = ResourceChangingCause( reason=reason, - initial=initial, body=body, diff=diff, new=new, diff --git a/tests/handling/test_cause_handling.py b/tests/handling/test_cause_handling.py index 7215f9a9..33e67d9b 100644 --- a/tests/handling/test_cause_handling.py +++ b/tests/handling/test_cause_handling.py @@ -1,6 +1,8 @@ import asyncio import logging +import pytest + import kopf from kopf.reactor.causation import Reason from kopf.reactor.handling import resource_handler @@ -8,10 +10,14 @@ from kopf.structs.finalizers import FINALIZER from kopf.structs.lastseen import LAST_SEEN_ANNOTATION +EVENT_TYPES = [None, 'ADDED', 'MODIFIED', 'DELETED'] + -async def test_acquire(registry, handlers, resource, cause_mock, +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_acquire(registry, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) + cause_mock.reason = Reason.ACQUIRE event_queue = asyncio.Queue() @@ -20,7 +26,7 @@ async def test_acquire(registry, handlers, resource, cause_mock, registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, @@ -46,7 +52,8 @@ async def test_acquire(registry, handlers, resource, cause_mock, ]) -async def test_create(registry, handlers, resource, cause_mock, +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_create(registry, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.CREATE @@ -57,7 +64,7 @@ async def test_create(registry, handlers, resource, cause_mock, registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, @@ -90,7 +97,8 @@ async def test_create(registry, handlers, resource, cause_mock, ]) -async def test_update(registry, handlers, resource, cause_mock, +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_update(registry, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.UPDATE @@ -101,7 +109,7 @@ async def test_update(registry, handlers, resource, cause_mock, registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, @@ -134,7 +142,8 @@ async def test_update(registry, handlers, resource, cause_mock, ]) -async def test_delete(registry, handlers, resource, cause_mock, +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_delete(registry, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.DELETE @@ -145,7 +154,7 @@ async def test_delete(registry, handlers, resource, cause_mock, registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, @@ -176,7 +185,9 @@ async def test_delete(registry, handlers, resource, cause_mock, ]) -async def test_release(registry, resource, handlers, cause_mock, caplog, k8s_mocked, assert_logs): +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_release(registry, resource, handlers, cause_mock, event_type, + caplog, k8s_mocked, assert_logs): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.RELEASE cause_mock.body.setdefault('metadata', {})['finalizers'] = [FINALIZER] @@ -197,7 +208,7 @@ async def test_release(registry, resource, handlers, cause_mock, caplog, k8s_moc registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, @@ -227,7 +238,8 @@ async def test_release(registry, resource, handlers, cause_mock, caplog, k8s_moc # Informational causes: just log, and do nothing else. # -async def test_gone(registry, handlers, resource, cause_mock, +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_gone(registry, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.GONE @@ -238,7 +250,7 @@ async def test_gone(registry, handlers, resource, cause_mock, registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, @@ -258,7 +270,8 @@ async def test_gone(registry, handlers, resource, cause_mock, ]) -async def test_free(registry, handlers, resource, cause_mock, +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_free(registry, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.FREE @@ -269,7 +282,7 @@ async def test_free(registry, handlers, resource, cause_mock, registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, @@ -289,7 +302,8 @@ async def test_free(registry, handlers, resource, cause_mock, ]) -async def test_noop(registry, handlers, resource, cause_mock, +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_noop(registry, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.NOOP @@ -300,7 +314,7 @@ async def test_noop(registry, handlers, resource, cause_mock, registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, diff --git a/tests/handling/test_cause_logging.py b/tests/handling/test_cause_logging.py index 77d64b74..22458ef5 100644 --- a/tests/handling/test_cause_logging.py +++ b/tests/handling/test_cause_logging.py @@ -4,7 +4,7 @@ import pytest import kopf -from kopf.reactor.causation import ALL_REASONS, HANDLER_REASONS +from kopf.reactor.causation import ALL_REASONS, HANDLER_REASONS, Reason from kopf.reactor.handling import resource_handler from kopf.structs.containers import ResourceMemories @@ -12,6 +12,7 @@ @pytest.mark.parametrize('cause_type', ALL_REASONS) async def test_all_logs_are_prefixed(registry, resource, handlers, logstream, cause_type, cause_mock): + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type await resource_handler( @@ -19,7 +20,7 @@ async def test_all_logs_are_prefixed(registry, resource, handlers, registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), @@ -39,6 +40,8 @@ async def test_all_logs_are_prefixed(registry, resource, handlers, async def test_diffs_logged_if_present(registry, resource, handlers, cause_type, cause_mock, caplog, assert_logs, diff): caplog.set_level(logging.DEBUG) + + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type cause_mock.diff = diff cause_mock.new = object() # checked for `not None` @@ -49,7 +52,7 @@ async def test_diffs_logged_if_present(registry, resource, handlers, cause_type, registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), @@ -64,6 +67,8 @@ async def test_diffs_logged_if_present(registry, resource, handlers, cause_type, async def test_diffs_not_logged_if_absent(registry, resource, handlers, cause_type, cause_mock, caplog, assert_logs): caplog.set_level(logging.DEBUG) + + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type cause_mock.diff = None # same as the default, but for clarity @@ -72,7 +77,7 @@ async def test_diffs_not_logged_if_absent(registry, resource, handlers, cause_ty registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), diff --git a/tests/handling/test_delays.py b/tests/handling/test_delays.py index c501ef0b..a5d3089d 100644 --- a/tests/handling/test_delays.py +++ b/tests/handling/test_delays.py @@ -29,6 +29,7 @@ async def test_delayed_handlers_progress( handlers.delete_mock.side_effect = TemporaryError("oops", delay=delay) handlers.resume_mock.side_effect = TemporaryError("oops", delay=delay) + event_type = None if cause_reason == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_reason with freezegun.freeze_time(now): @@ -37,7 +38,7 @@ async def test_delayed_handlers_progress( registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), @@ -74,6 +75,7 @@ async def test_delayed_handlers_sleep( # Simulate the original persisted state of the resource. started_dt = datetime.datetime.fromisoformat('2000-01-01T00:00:00') # long time ago is fine. delayed_dt = datetime.datetime.fromisoformat(delayed_iso) + event_type = None if cause_reason == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_reason cause_mock.body.update({ 'status': {'kopf': {'progress': { @@ -92,7 +94,7 @@ async def test_delayed_handlers_sleep( registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), diff --git a/tests/handling/test_errors.py b/tests/handling/test_errors.py index 7ae93697..bd63e795 100644 --- a/tests/handling/test_errors.py +++ b/tests/handling/test_errors.py @@ -18,6 +18,7 @@ async def test_fatal_error_stops_handler( caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type handlers.create_mock.side_effect = PermanentError("oops") handlers.update_mock.side_effect = PermanentError("oops") @@ -29,7 +30,7 @@ async def test_fatal_error_stops_handler( registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), @@ -61,6 +62,7 @@ async def test_retry_error_delays_handler( caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type handlers.create_mock.side_effect = TemporaryError("oops") handlers.update_mock.side_effect = TemporaryError("oops") @@ -72,7 +74,7 @@ async def test_retry_error_delays_handler( registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), @@ -105,6 +107,7 @@ async def test_arbitrary_error_delays_handler( caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type handlers.create_mock.side_effect = Exception("oops") handlers.update_mock.side_effect = Exception("oops") @@ -116,7 +119,7 @@ async def test_arbitrary_error_delays_handler( registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), diff --git a/tests/handling/test_multistep.py b/tests/handling/test_multistep.py index 862de96a..9edf90e0 100644 --- a/tests/handling/test_multistep.py +++ b/tests/handling/test_multistep.py @@ -15,6 +15,7 @@ async def test_1st_step_stores_progress_by_patching( name1 = f'{cause_type}_fn' name2 = f'{cause_type}_fn2' + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type await resource_handler( @@ -22,7 +23,7 @@ async def test_1st_step_stores_progress_by_patching( registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), @@ -50,15 +51,18 @@ async def test_1st_step_stores_progress_by_patching( @pytest.mark.parametrize('cause_type', HANDLER_REASONS) -async def test_2nd_step_finishes_the_handlers( +async def test_2nd_step_finishes_the_handlers(caplog, registry, handlers, extrahandlers, resource, cause_mock, cause_type, k8s_mocked): name1 = f'{cause_type}_fn' name2 = f'{cause_type}_fn2' + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type cause_mock.body.update({ 'status': {'kopf': {'progress': { + 'resume_fn': {'started': '1979-01-01T00:00:00', 'success': True}, + 'resume_fn2': {'started': '1979-01-01T00:00:00', 'success': True}, name1: {'started': '1979-01-01T00:00:00', 'success': True}, name2: {'started': '1979-01-01T00:00:00'}, }}} @@ -69,7 +73,7 @@ async def test_2nd_step_finishes_the_handlers( registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), diff --git a/tests/handling/test_no_handlers.py b/tests/handling/test_no_handlers.py index e26cad9e..92cbf7cb 100644 --- a/tests/handling/test_no_handlers.py +++ b/tests/handling/test_no_handlers.py @@ -32,7 +32,7 @@ async def test_skipped_with_no_handlers( registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': None, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), diff --git a/tests/handling/test_retrying_limits.py b/tests/handling/test_retrying_limits.py index 34da4004..1e448910 100644 --- a/tests/handling/test_retrying_limits.py +++ b/tests/handling/test_retrying_limits.py @@ -5,7 +5,7 @@ import pytest import kopf -from kopf.reactor.causation import HANDLER_REASONS +from kopf.reactor.causation import HANDLER_REASONS, Reason from kopf.reactor.handling import resource_handler from kopf.structs.containers import ResourceMemories @@ -22,6 +22,7 @@ async def test_timed_out_handler_fails( caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type cause_mock.body.update({ 'status': {'kopf': {'progress': { @@ -38,7 +39,7 @@ async def test_timed_out_handler_fails( registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), @@ -71,6 +72,7 @@ async def test_retries_limited_handler_fails( caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type cause_mock.body.update({ 'status': {'kopf': {'progress': { @@ -86,7 +88,7 @@ async def test_retries_limited_handler_fails( registry=registry, resource=resource, memories=ResourceMemories(), - event={'type': 'irrelevant', 'object': cause_mock.body}, + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), From e086138e7aff5b3c4723c907d2d517c3d9c132f7 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 7 Nov 2019 00:14:54 +0100 Subject: [PATCH 5/5] Add on-resume handlers to the demo of available handlers --- examples/05-handlers/example.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/examples/05-handlers/example.py b/examples/05-handlers/example.py index a3f87933..e33a3d93 100644 --- a/examples/05-handlers/example.py +++ b/examples/05-handlers/example.py @@ -1,11 +1,21 @@ import kopf +@kopf.on.resume('zalando.org', 'v1', 'kopfexamples') +def resume_fn_1(cause, **kwargs): + print(f'RESUMED 1st') + + @kopf.on.create('zalando.org', 'v1', 'kopfexamples') def create_fn_1(**kwargs): print('CREATED 1st') +@kopf.on.resume('zalando.org', 'v1', 'kopfexamples') +def resume_fn_2(cause, **kwargs): + print(f'RESUMED 2nd') + + @kopf.on.create('zalando.org', 'v1', 'kopfexamples') def create_fn_2(**kwargs): print('CREATED 2nd')