diff --git a/examples/05-handlers/example.py b/examples/05-handlers/example.py index a3f87933..e33a3d93 100644 --- a/examples/05-handlers/example.py +++ b/examples/05-handlers/example.py @@ -1,11 +1,21 @@ import kopf +@kopf.on.resume('zalando.org', 'v1', 'kopfexamples') +def resume_fn_1(cause, **kwargs): + print(f'RESUMED 1st') + + @kopf.on.create('zalando.org', 'v1', 'kopfexamples') def create_fn_1(**kwargs): print('CREATED 1st') +@kopf.on.resume('zalando.org', 'v1', 'kopfexamples') +def resume_fn_2(cause, **kwargs): + print(f'RESUMED 2nd') + + @kopf.on.create('zalando.org', 'v1', 'kopfexamples') def create_fn_2(**kwargs): print('CREATED 2nd') diff --git a/kopf/reactor/causation.py b/kopf/reactor/causation.py index 19ec5d37..bc67b84c 100644 --- a/kopf/reactor/causation.py +++ b/kopf/reactor/causation.py @@ -146,6 +146,7 @@ def detect_resource_changing_cause( event: bodies.Event, diff: Optional[diffs.Diff] = None, requires_finalizer: bool = True, + initial: bool = False, **kwargs: Any, ) -> ResourceChangingCause: """ @@ -159,7 +160,6 @@ def detect_resource_changing_cause( # Put them back to the pass-through kwargs (to avoid code duplication). body = event['object'] - initial = event['type'] is None # special value simulated by us in kopf.reactor.watching. kwargs.update(body=body, initial=initial) if diff is not None: kwargs.update(diff=diff) @@ -190,11 +190,15 @@ def detect_resource_changing_cause( # For an object seen for the first time (i.e. just-created), call the creation handlers, # then mark the state as if it was seen when the creation has finished. + # Creation never mixes with resuming, even if an object is detected on startup (first listing). if not lastseen.has_essence_stored(body): + kwargs['initial'] = False return ResourceChangingCause(reason=Reason.CREATE, **kwargs) - # Cases with no state changes are usually ignored (NOOP). But for the "None" events, - # as simulated for the initial listing, we call the resuming handlers (e.g. threads/tasks). + # Cases with no essence changes are usually ignored (NOOP). But for the not-yet-resumed objects, + # we simulate a fake cause to invoke the resuming handlers. For cases with the essence changes, + # the resuming handlers will be mixed-in to the regular cause handling ("cuckoo-style") + # due to the ``initial=True`` flag on the cause, regardless of the reason. if not diff and initial: return ResourceChangingCause(reason=Reason.RESUME, **kwargs) diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index c5c34915..e5b7027d 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -30,6 +30,7 @@ from kopf.reactor import registries from kopf.reactor import states from kopf.structs import bodies +from kopf.structs import containers from kopf.structs import dicts from kopf.structs import diffs from kopf.structs import finalizers @@ -145,6 +146,7 @@ async def activity_trigger( async def resource_handler( lifecycle: lifecycles.LifeCycleFn, registry: registries.OperatorRegistry, + memories: containers.ResourceMemories, resource: resources.Resource, event: bodies.Event, freeze: asyncio.Event, @@ -174,6 +176,12 @@ async def resource_handler( logger.debug("Ignoring the events due to freeze.") return + # Recall what is stored about that object. Share it in little portions with the consumers. + # And immediately forget it if the object is deleted from the cluster (but keep in memory). + memory = await memories.recall(body, noticed_by_listing=event['type'] is None) + if event['type'] == 'DELETED': + await memories.forget(body) + # Invoke all silent spies. No causation, no progress storage is performed. if registry.has_resource_watching_handlers(resource=resource): resource_watching_cause = causation.detect_resource_watching_cause( @@ -185,6 +193,7 @@ async def resource_handler( await handle_resource_watching_cause( lifecycle=lifecycles.all_at_once, registry=registry, + memory=memory, cause=resource_watching_cause, ) @@ -201,11 +210,13 @@ async def resource_handler( old=old, new=new, diff=diff, + initial=memory.noticed_by_listing and not memory.fully_handled_once, requires_finalizer=registry.requires_finalizer(resource=resource, body=body), ) delay = await handle_resource_changing_cause( lifecycle=lifecycle, registry=registry, + memory=memory, cause=resource_changing_cause, ) @@ -234,6 +245,7 @@ async def resource_handler( async def handle_resource_watching_cause( lifecycle: lifecycles.LifeCycleFn, registry: registries.OperatorRegistry, + memory: containers.ResourceMemory, cause: causation.ResourceWatchingCause, ) -> None: """ @@ -262,6 +274,7 @@ async def handle_resource_watching_cause( async def handle_resource_changing_cause( lifecycle: lifecycles.LifeCycleFn, registry: registries.OperatorRegistry, + memory: containers.ResourceMemory, cause: causation.ResourceChangingCause, ) -> Optional[float]: """ @@ -311,6 +324,10 @@ async def handle_resource_changing_cause( logger.debug("Removing the finalizer, thus allowing the actual deletion.") finalizers.remove_finalizers(body=body, patch=patch) + # Once all handlers have succeeded at least once for any reason, or if there were none, + # prevent further resume-handlers (which otherwise happens on each watch-stream re-listing). + memory.fully_handled_once = True + # Informational causes just print the log lines. if cause.reason == causation.Reason.GONE: logger.debug("Deleted, really deleted, and we are notified.") @@ -319,7 +336,7 @@ async def handle_resource_changing_cause( logger.debug("Deletion event, but we are done with it, and we do not care.") if cause.reason == causation.Reason.NOOP: - logger.debug("Something has changed, but we are not interested (state is the same).") + logger.debug("Something has changed, but we are not interested (the essence is the same).") # For the case of a newly created object, or one that doesn't have the correct # finalizers, lock it to this operator. Not all newly created objects will diff --git a/kopf/reactor/running.py b/kopf/reactor/running.py index 1e05eb52..6cd37bf7 100644 --- a/kopf/reactor/running.py +++ b/kopf/reactor/running.py @@ -17,6 +17,7 @@ from kopf.reactor import lifecycles from kopf.reactor import queueing from kopf.reactor import registries +from kopf.structs import containers from kopf.structs import credentials if TYPE_CHECKING: @@ -76,9 +77,11 @@ def login( def run( + *, loop: Optional[asyncio.AbstractEventLoop] = None, lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.OperatorRegistry] = None, + memories: Optional[containers.ResourceMemories] = None, standalone: bool = False, priority: int = 0, peering_name: Optional[str] = None, @@ -97,6 +100,7 @@ def run( loop.run_until_complete(operator( lifecycle=lifecycle, registry=registry, + memories=memories, standalone=standalone, namespace=namespace, priority=priority, @@ -110,8 +114,10 @@ def run( async def operator( + *, lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.OperatorRegistry] = None, + memories: Optional[containers.ResourceMemories] = None, standalone: bool = False, priority: int = 0, peering_name: Optional[str] = None, @@ -132,6 +138,7 @@ async def operator( operator_tasks = await spawn_tasks( lifecycle=lifecycle, registry=registry, + memories=memories, standalone=standalone, namespace=namespace, priority=priority, @@ -144,8 +151,10 @@ async def operator( async def spawn_tasks( + *, lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.OperatorRegistry] = None, + memories: Optional[containers.ResourceMemories] = None, standalone: bool = False, priority: int = 0, peering_name: Optional[str] = None, @@ -164,6 +173,7 @@ async def spawn_tasks( # The freezer and the registry are scoped to this whole task-set, to sync them all. lifecycle = lifecycle if lifecycle is not None else lifecycles.get_default_lifecycle() registry = registry if registry is not None else registries.get_default_registry() + memories = memories if memories is not None else containers.ResourceMemories() vault = vault if vault is not None else global_vault vault = vault if vault is not None else credentials.Vault() event_queue: posting.K8sEventQueue = asyncio.Queue(loop=loop) @@ -237,6 +247,7 @@ async def spawn_tasks( handler=functools.partial(handling.resource_handler, lifecycle=lifecycle, registry=registry, + memories=memories, resource=resource, event_queue=event_queue, freeze=freeze_flag)))), # freeze is only checked diff --git a/kopf/structs/containers.py b/kopf/structs/containers.py new file mode 100644 index 00000000..072f3d87 --- /dev/null +++ b/kopf/structs/containers.py @@ -0,0 +1,83 @@ +""" +A in-memory storage of arbitrary information per resource/object. + +The information is stored strictly in-memory and is not persistent. +On the operator restart, all the memories are lost. + +It is used internally to track allocated system resources for each Kubernetes +object, even if that object does not show up in the event streams for long time. +""" +import dataclasses +from typing import MutableMapping + +from kopf.structs import bodies + + +@dataclasses.dataclass(frozen=False) +class ResourceMemory: + """ A memo about a single resource/object. Usually stored in `Memories`. """ + noticed_by_listing: bool = False + fully_handled_once: bool = False + + +class ResourceMemories: + """ + A container of all memos about every existing resource in a single operator. + + Distinct operator tasks have their own memory containers, which + do not overlap. This solves the problem if storing the per-resource + entries in the global or context variables. + + The memos can store anything the resource handlers need to persist within + a single process/operator lifetime, but not persisted on the resource. + For example, the runtime system resources: flags, threads, tasks, etc. + Or the scalar values, which have meaning only for this operator process. + + The container is relatively async-safe: one individual resource is always + handled sequentially, never in parallel with itself (different resources + are handled in parallel through), so the same key will not be added/deleted + in the background during the operation, so the locking is not needed. + """ + _items: MutableMapping[str, ResourceMemory] + + def __init__(self) -> None: + super().__init__() + self._items = {} + + async def recall( + self, + body: bodies.Body, + *, + noticed_by_listing: bool = False, + ) -> ResourceMemory: + """ + Either find a resource's memory, or create and remember a new one. + """ + key = self._build_key(body) + if key not in self._items: + memory = ResourceMemory(noticed_by_listing=noticed_by_listing) + self._items[key] = memory + return self._items[key] + + async def forget(self, body: bodies.Body) -> None: + """ + Forget the resource's memory if it exists; or ignore if it does not. + """ + key = self._build_key(body) + if key in self._items: + del self._items[key] + + def _build_key( + self, + body: bodies.Body, + ) -> str: + """ + Construct an immutable persistent key of a resource. + + Generally, a uid is sufficient, as it is unique within the cluster. + But it can be e.g. plural/namespace/name triplet, or anything else, + even of different types (as long as it satisfies the type checkers). + + But it must be consistent within a single process lifetime. + """ + return body.get('metadata', {}).get('uid') or '' diff --git a/tests/basic-structs/test_containers.py b/tests/basic-structs/test_containers.py new file mode 100644 index 00000000..b36e752d --- /dev/null +++ b/tests/basic-structs/test_containers.py @@ -0,0 +1,40 @@ +from kopf.structs.bodies import Body +from kopf.structs.containers import ResourceMemory, ResourceMemories + +BODY: Body = { + 'metadata': { + 'uid': 'uid1', + } +} + + +def test_creation_with_defaults(): + ResourceMemory() + + +async def test_recalling_creates_when_absent(): + memories = ResourceMemories() + memory = await memories.recall(BODY) + assert isinstance(memory, ResourceMemory) + + +async def test_recalling_reuses_when_present(): + memories = ResourceMemories() + memory1 = await memories.recall(BODY) + memory2 = await memories.recall(BODY) + assert memory1 is memory2 + + +async def test_forgetting_deletes_when_present(): + memories = ResourceMemories() + memory1 = await memories.recall(BODY) + await memories.forget(BODY) + + # Check by recalling -- it should be a new one. + memory2 = await memories.recall(BODY) + assert memory1 is not memory2 + + +async def test_forgetting_ignores_when_absent(): + memories = ResourceMemories() + await memories.forget(BODY) diff --git a/tests/handling/conftest.py b/tests/handling/conftest.py index 12b59835..f1542c62 100644 --- a/tests/handling/conftest.py +++ b/tests/handling/conftest.py @@ -94,6 +94,11 @@ def handlers(clear_default_registry): async def event_fn(**kwargs): return event_mock(**kwargs) + # Keep on-resume on top, to catch any issues with the test design (where it could be skipped). + @kopf.on.resume('zalando.org', 'v1', 'kopfexamples', id='resume_fn', timeout=600, retries=100) + async def resume_fn(**kwargs): + return resume_mock(**kwargs) + @kopf.on.create('zalando.org', 'v1', 'kopfexamples', id='create_fn', timeout=600, retries=100) async def create_fn(**kwargs): return create_mock(**kwargs) @@ -106,10 +111,6 @@ async def update_fn(**kwargs): async def delete_fn(**kwargs): return delete_mock(**kwargs) - @kopf.on.resume('zalando.org', 'v1', 'kopfexamples', id='resume_fn', timeout=600, retries=100) - async def resume_fn(**kwargs): - return resume_mock(**kwargs) - return HandlersContainer( event_mock=event_mock, create_mock=create_mock, @@ -136,6 +137,11 @@ def extrahandlers(clear_default_registry, handlers): async def event_fn2(**kwargs): return event_mock(**kwargs) + # Keep on-resume on top, to catch any issues with the test design (where it could be skipped). + @kopf.on.resume('zalando.org', 'v1', 'kopfexamples', id='resume_fn2') + async def resume_fn2(**kwargs): + return resume_mock(**kwargs) + @kopf.on.create('zalando.org', 'v1', 'kopfexamples', id='create_fn2') async def create_fn2(**kwargs): return create_mock(**kwargs) @@ -148,10 +154,6 @@ async def update_fn2(**kwargs): async def delete_fn2(**kwargs): return delete_mock(**kwargs) - @kopf.on.resume('zalando.org', 'v1', 'kopfexamples', id='resume_fn2') - async def resume_fn2(**kwargs): - return resume_mock(**kwargs) - return HandlersContainer( event_mock=event_mock, create_mock=create_mock, @@ -181,7 +183,6 @@ def new_detect_fn(**kwargs): original_new = kwargs.pop('new', None) original_old = kwargs.pop('old', None) reason = mock.reason if mock.reason is not None else original_reason - initial = bool(reason == Reason.RESUME) body = copy.deepcopy(mock.body) if mock.body is not None else original_body diff = copy.deepcopy(mock.diff) if mock.diff is not None else original_diff new = copy.deepcopy(mock.new) if mock.new is not None else original_new @@ -194,7 +195,6 @@ def new_detect_fn(**kwargs): # I.e. everything except what we mock: reason & body. cause = ResourceChangingCause( reason=reason, - initial=initial, body=body, diff=diff, new=new, diff --git a/tests/handling/test_cause_handling.py b/tests/handling/test_cause_handling.py index 7d43c73d..33e67d9b 100644 --- a/tests/handling/test_cause_handling.py +++ b/tests/handling/test_cause_handling.py @@ -1,16 +1,23 @@ import asyncio import logging +import pytest + import kopf from kopf.reactor.causation import Reason from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories from kopf.structs.finalizers import FINALIZER from kopf.structs.lastseen import LAST_SEEN_ANNOTATION +EVENT_TYPES = [None, 'ADDED', 'MODIFIED', 'DELETED'] + -async def test_acquire(registry, handlers, resource, cause_mock, +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_acquire(registry, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) + cause_mock.reason = Reason.ACQUIRE event_queue = asyncio.Queue() @@ -18,7 +25,8 @@ async def test_acquire(registry, handlers, resource, cause_mock, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, @@ -44,7 +52,8 @@ async def test_acquire(registry, handlers, resource, cause_mock, ]) -async def test_create(registry, handlers, resource, cause_mock, +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_create(registry, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.CREATE @@ -54,7 +63,8 @@ async def test_create(registry, handlers, resource, cause_mock, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, @@ -87,7 +97,8 @@ async def test_create(registry, handlers, resource, cause_mock, ]) -async def test_update(registry, handlers, resource, cause_mock, +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_update(registry, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.UPDATE @@ -97,7 +108,8 @@ async def test_update(registry, handlers, resource, cause_mock, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, @@ -130,7 +142,8 @@ async def test_update(registry, handlers, resource, cause_mock, ]) -async def test_delete(registry, handlers, resource, cause_mock, +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_delete(registry, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.DELETE @@ -140,7 +153,8 @@ async def test_delete(registry, handlers, resource, cause_mock, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, @@ -171,7 +185,9 @@ async def test_delete(registry, handlers, resource, cause_mock, ]) -async def test_release(registry, resource, handlers, cause_mock, caplog, k8s_mocked, assert_logs): +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_release(registry, resource, handlers, cause_mock, event_type, + caplog, k8s_mocked, assert_logs): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.RELEASE cause_mock.body.setdefault('metadata', {})['finalizers'] = [FINALIZER] @@ -191,7 +207,8 @@ async def test_release(registry, resource, handlers, cause_mock, caplog, k8s_moc lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, @@ -221,7 +238,8 @@ async def test_release(registry, resource, handlers, cause_mock, caplog, k8s_moc # Informational causes: just log, and do nothing else. # -async def test_gone(registry, handlers, resource, cause_mock, +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_gone(registry, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.GONE @@ -231,7 +249,8 @@ async def test_gone(registry, handlers, resource, cause_mock, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, @@ -251,7 +270,8 @@ async def test_gone(registry, handlers, resource, cause_mock, ]) -async def test_free(registry, handlers, resource, cause_mock, +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_free(registry, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.FREE @@ -261,7 +281,8 @@ async def test_free(registry, handlers, resource, cause_mock, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, @@ -281,7 +302,8 @@ async def test_free(registry, handlers, resource, cause_mock, ]) -async def test_noop(registry, handlers, resource, cause_mock, +@pytest.mark.parametrize('event_type', EVENT_TYPES) +async def test_noop(registry, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.NOOP @@ -291,7 +313,8 @@ async def test_noop(registry, handlers, resource, cause_mock, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=event_queue, diff --git a/tests/handling/test_cause_logging.py b/tests/handling/test_cause_logging.py index 0ca4547d..22458ef5 100644 --- a/tests/handling/test_cause_logging.py +++ b/tests/handling/test_cause_logging.py @@ -4,20 +4,23 @@ import pytest import kopf -from kopf.reactor.causation import ALL_REASONS, HANDLER_REASONS +from kopf.reactor.causation import ALL_REASONS, HANDLER_REASONS, Reason from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories @pytest.mark.parametrize('cause_type', ALL_REASONS) async def test_all_logs_are_prefixed(registry, resource, handlers, logstream, cause_type, cause_mock): + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type await resource_handler( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), @@ -37,6 +40,8 @@ async def test_all_logs_are_prefixed(registry, resource, handlers, async def test_diffs_logged_if_present(registry, resource, handlers, cause_type, cause_mock, caplog, assert_logs, diff): caplog.set_level(logging.DEBUG) + + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type cause_mock.diff = diff cause_mock.new = object() # checked for `not None` @@ -46,7 +51,8 @@ async def test_diffs_logged_if_present(registry, resource, handlers, cause_type, lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), @@ -61,6 +67,8 @@ async def test_diffs_logged_if_present(registry, resource, handlers, cause_type, async def test_diffs_not_logged_if_absent(registry, resource, handlers, cause_type, cause_mock, caplog, assert_logs): caplog.set_level(logging.DEBUG) + + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type cause_mock.diff = None # same as the default, but for clarity @@ -68,7 +76,8 @@ async def test_diffs_not_logged_if_absent(registry, resource, handlers, cause_ty lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), diff --git a/tests/handling/test_delays.py b/tests/handling/test_delays.py index bff14eaf..a5d3089d 100644 --- a/tests/handling/test_delays.py +++ b/tests/handling/test_delays.py @@ -11,6 +11,7 @@ from kopf.reactor.handling import WAITING_KEEPALIVE_INTERVAL from kopf.reactor.handling import resource_handler from kopf.reactor.states import HandlerState +from kopf.structs.containers import ResourceMemories from kopf.structs.finalizers import FINALIZER @@ -28,6 +29,7 @@ async def test_delayed_handlers_progress( handlers.delete_mock.side_effect = TemporaryError("oops", delay=delay) handlers.resume_mock.side_effect = TemporaryError("oops", delay=delay) + event_type = None if cause_reason == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_reason with freezegun.freeze_time(now): @@ -35,7 +37,8 @@ async def test_delayed_handlers_progress( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), @@ -72,6 +75,7 @@ async def test_delayed_handlers_sleep( # Simulate the original persisted state of the resource. started_dt = datetime.datetime.fromisoformat('2000-01-01T00:00:00') # long time ago is fine. delayed_dt = datetime.datetime.fromisoformat(delayed_iso) + event_type = None if cause_reason == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_reason cause_mock.body.update({ 'status': {'kopf': {'progress': { @@ -89,7 +93,8 @@ async def test_delayed_handlers_sleep( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), diff --git a/tests/handling/test_errors.py b/tests/handling/test_errors.py index 127dd655..bd63e795 100644 --- a/tests/handling/test_errors.py +++ b/tests/handling/test_errors.py @@ -7,6 +7,7 @@ from kopf.reactor.causation import Reason, HANDLER_REASONS from kopf.reactor.handling import PermanentError, TemporaryError from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories # The extrahandlers are needed to prevent the cycle ending and status purging. @@ -17,6 +18,7 @@ async def test_fatal_error_stops_handler( caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type handlers.create_mock.side_effect = PermanentError("oops") handlers.update_mock.side_effect = PermanentError("oops") @@ -27,7 +29,8 @@ async def test_fatal_error_stops_handler( lifecycle=kopf.lifecycles.one_by_one, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), @@ -59,6 +62,7 @@ async def test_retry_error_delays_handler( caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type handlers.create_mock.side_effect = TemporaryError("oops") handlers.update_mock.side_effect = TemporaryError("oops") @@ -69,7 +73,8 @@ async def test_retry_error_delays_handler( lifecycle=kopf.lifecycles.one_by_one, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), @@ -102,6 +107,7 @@ async def test_arbitrary_error_delays_handler( caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type handlers.create_mock.side_effect = Exception("oops") handlers.update_mock.side_effect = Exception("oops") @@ -112,7 +118,8 @@ async def test_arbitrary_error_delays_handler( lifecycle=kopf.lifecycles.one_by_one, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), diff --git a/tests/handling/test_event_handling.py b/tests/handling/test_event_handling.py index de6a724f..7516e58b 100644 --- a/tests/handling/test_event_handling.py +++ b/tests/handling/test_event_handling.py @@ -6,6 +6,7 @@ import kopf from kopf.reactor.causation import ALL_REASONS from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories @pytest.mark.parametrize('cause_type', ALL_REASONS) @@ -19,6 +20,7 @@ async def test_handlers_called_always( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'ev-type', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), @@ -54,6 +56,7 @@ async def test_errors_are_ignored( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, + memories=ResourceMemories(), event={'type': 'ev-type', 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), diff --git a/tests/handling/test_freezing.py b/tests/handling/test_freezing.py index 1fe3299f..66d76ddf 100644 --- a/tests/handling/test_freezing.py +++ b/tests/handling/test_freezing.py @@ -4,6 +4,7 @@ import kopf from kopf.reactor.handling import resource_handler from kopf.reactor.registries import OperatorRegistry +from kopf.structs.containers import ResourceMemories async def test_nothing_is_called_when_freeze_is_set(mocker, resource, caplog, assert_logs): @@ -27,6 +28,7 @@ async def test_nothing_is_called_when_freeze_is_set(mocker, resource, caplog, as lifecycle=lifecycle, registry=registry, resource=resource, + memories=ResourceMemories(), event=event, freeze=freeze, replenished=asyncio.Event(), diff --git a/tests/handling/test_multistep.py b/tests/handling/test_multistep.py index e8271ee8..9edf90e0 100644 --- a/tests/handling/test_multistep.py +++ b/tests/handling/test_multistep.py @@ -5,6 +5,7 @@ import kopf from kopf.reactor.causation import Reason, HANDLER_REASONS from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories @pytest.mark.parametrize('cause_type', HANDLER_REASONS) @@ -14,13 +15,15 @@ async def test_1st_step_stores_progress_by_patching( name1 = f'{cause_type}_fn' name2 = f'{cause_type}_fn2' + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type await resource_handler( lifecycle=kopf.lifecycles.asap, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), @@ -48,15 +51,18 @@ async def test_1st_step_stores_progress_by_patching( @pytest.mark.parametrize('cause_type', HANDLER_REASONS) -async def test_2nd_step_finishes_the_handlers( +async def test_2nd_step_finishes_the_handlers(caplog, registry, handlers, extrahandlers, resource, cause_mock, cause_type, k8s_mocked): name1 = f'{cause_type}_fn' name2 = f'{cause_type}_fn2' + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type cause_mock.body.update({ 'status': {'kopf': {'progress': { + 'resume_fn': {'started': '1979-01-01T00:00:00', 'success': True}, + 'resume_fn2': {'started': '1979-01-01T00:00:00', 'success': True}, name1: {'started': '1979-01-01T00:00:00', 'success': True}, name2: {'started': '1979-01-01T00:00:00'}, }}} @@ -66,7 +72,8 @@ async def test_2nd_step_finishes_the_handlers( lifecycle=kopf.lifecycles.one_by_one, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), diff --git a/tests/handling/test_no_handlers.py b/tests/handling/test_no_handlers.py index 3dfd60f9..92cbf7cb 100644 --- a/tests/handling/test_no_handlers.py +++ b/tests/handling/test_no_handlers.py @@ -6,6 +6,7 @@ import kopf from kopf.reactor.causation import HANDLER_REASONS from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories from kopf.structs.lastseen import LAST_SEEN_ANNOTATION @@ -30,7 +31,8 @@ async def test_skipped_with_no_handlers( lifecycle=kopf.lifecycles.all_at_once, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': None, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), diff --git a/tests/handling/test_retrying_limits.py b/tests/handling/test_retrying_limits.py index 4b271b39..1e448910 100644 --- a/tests/handling/test_retrying_limits.py +++ b/tests/handling/test_retrying_limits.py @@ -1,13 +1,13 @@ import asyncio -import datetime import logging import freezegun import pytest import kopf -from kopf.reactor.causation import HANDLER_REASONS +from kopf.reactor.causation import HANDLER_REASONS, Reason from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories # The timeout is hard-coded in conftest.py:handlers(). @@ -22,6 +22,7 @@ async def test_timed_out_handler_fails( caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type cause_mock.body.update({ 'status': {'kopf': {'progress': { @@ -37,7 +38,8 @@ async def test_timed_out_handler_fails( lifecycle=kopf.lifecycles.one_by_one, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(), @@ -70,6 +72,7 @@ async def test_retries_limited_handler_fails( caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' + event_type = None if cause_type == Reason.RESUME else 'irrelevant' cause_mock.reason = cause_type cause_mock.body.update({ 'status': {'kopf': {'progress': { @@ -84,7 +87,8 @@ async def test_retries_limited_handler_fails( lifecycle=kopf.lifecycles.one_by_one, registry=registry, resource=resource, - event={'type': 'irrelevant', 'object': cause_mock.body}, + memories=ResourceMemories(), + event={'type': event_type, 'object': cause_mock.body}, freeze=asyncio.Event(), replenished=asyncio.Event(), event_queue=asyncio.Queue(),