diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 7fe574d3..6b163d4c 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -225,18 +225,27 @@ async def resource_handler( # Sleep strictly after patching, never before -- to keep the status proper. # The patching above, if done, interrupts the sleep instantly, so we skip it at all. + # Note: a zero-second or negative sleep is still a sleep, it will trigger a dummy patch. if delay and patch: logger.debug(f"Sleeping was skipped because of the patch, {delay} seconds left.") - elif delay: - logger.debug(f"Sleeping for {delay} seconds for the delayed handlers.") - unslept = await sleeping.sleep_or_wait(min(delay, WAITING_KEEPALIVE_INTERVAL), replenished) - if unslept is not None: - logger.debug(f"Sleeping was interrupted by new changes, {unslept} seconds left.") + elif delay is None and not patch: + logger.debug(f"Handling cycle is finished, waiting for new changes since now.") + elif delay is not None: + if delay > 0: + logger.debug(f"Sleeping for {delay} seconds for the delayed handlers.") + limited_delay = min(delay, WAITING_KEEPALIVE_INTERVAL) + unslept_delay = await sleeping.sleep_or_wait(limited_delay, replenished) else: - now = datetime.datetime.utcnow() - dummy = patches.Patch({'status': {'kopf': {'dummy': now.isoformat()}}}) - logger.debug("Provoking reaction with: %r", dummy) - await patching.patch_obj(resource=resource, patch=dummy, body=body) + unslept_delay = None # no need to sleep? means: slept in full. + + if unslept_delay is not None: + logger.debug(f"Sleeping was interrupted by new changes, {unslept_delay} seconds left.") + else: + # Any unique always-changing value will work; not necessary a timestamp. + dummy_value = datetime.datetime.utcnow().isoformat() + dummy_patch = patches.Patch({'status': {'kopf': {'dummy': dummy_value}}}) + logger.debug("Provoking reaction with: %r", dummy_patch) + await patching.patch_obj(resource=resource, patch=dummy_patch, body=body) async def handle_resource_watching_cause( diff --git a/kopf/reactor/states.py b/kopf/reactor/states.py index d2135a8b..f4462522 100644 --- a/kopf/reactor/states.py +++ b/kopf/reactor/states.py @@ -262,16 +262,16 @@ def done(self) -> bool: return all(handler_state.finished for handler_state in self._states.values()) @property - def delay(self) -> float: + def delay(self) -> Optional[float]: now = datetime.datetime.utcnow() state_times = [handler_state.delayed for handler_state in self._states.values()] clean_times = [t for t in state_times if t is not None] if clean_times: until = min(clean_times) # the soonest awake datetime. delay = (until - now).total_seconds() + return max(0, delay) else: - delay = 0 - return max(0, delay) + return None def deliver_results( diff --git a/tests/handling/test_timing_consistency.py b/tests/handling/test_timing_consistency.py new file mode 100644 index 00000000..b43516fe --- /dev/null +++ b/tests/handling/test_timing_consistency.py @@ -0,0 +1,75 @@ +import asyncio +import datetime + +import freezegun + +import kopf +from kopf.reactor.handling import resource_handler +from kopf.structs.containers import ResourceMemories + + +async def test_consistent_awakening(registry, resource, k8s_mocked, mocker): + """ + A special case to ensure that "now" is consistent during the handling. + + Previously, "now" of `handler.awakened` and "now" of `state.delay` were + different (maybe for less than 1 ms). If the scheduled awakening time was + unlucky to be between these two points in time, the operator stopped + reacting on this object until any other events or changes arrive. + + Implementation-wise, the operator neither selected the handlers (because + it was "1ms too early", as per `handler.awakened`), + nor did it sleep (because it was "1ms too late", as per `state.delay`), + nor did it produce even a dummy patch (because zero-sleep meant "no sleep"). + + After the fix, zero-sleep produces a dummy patch to trigger the reaction + cycle after the sleep is over (as if it was an actual zero-time sleep). + + In the test, the time granularity is intentionally that low -- 1 µs. + The time is anyway frozen and does not progress unless explicitly ticked. + + See also: #284 + """ + + # Simulate that the object is scheduled to be awakened between the watch-event and sleep. + ts0 = datetime.datetime(2019, 12, 30, 10, 56, 43) + tsA_triggered = "2019-12-30T10:56:42.999999" + ts0_scheduled = "2019-12-30T10:56:43.000000" + tsB_delivered = "2019-12-30T10:56:43.000001" + + # A dummy handler: it will not be selected for execution anyway, we just need to have it. + @kopf.on.create(resource.group, resource.version, resource.plural, id='some-id') + def handler_fn(**_): + pass + + # Simulate the ticking of time, so that it goes beyond the scheduled awakening time. + # Any hook point between handler selection and delay calculation is fine, + # but State.store() also prevents other status-fields from being added and the patch populated. + def move_to_tsB(*_, **__): + frozen_dt.move_to(tsB_delivered) + + state_store = mocker.patch('kopf.reactor.states.State.store', side_effect=move_to_tsB) + body = {'status': {'kopf': {'progress': {'some-id': {'delayed': ts0_scheduled}}}}} + + # Simulate the call as if the event has just arrived on the watch-stream. + # Another way (same effect): handle_resource_changing_cause() and its result. + with freezegun.freeze_time(tsA_triggered) as frozen_dt: + assert datetime.datetime.utcnow() < ts0 # extra precaution + await resource_handler( + lifecycle=kopf.lifecycles.all_at_once, + registry=registry, + resource=resource, + memories=ResourceMemories(), + event={'type': 'ADDED', 'object': body}, + replenished=asyncio.Event(), + event_queue=asyncio.Queue(), + ) + assert datetime.datetime.utcnow() > ts0 # extra precaution + + assert state_store.called + + # Without "now"-time consistency, neither sleep() would be called, nor a patch applied. + # Verify that the patch was actually applied, so that the reaction cycle continues. + assert not k8s_mocked.asyncio_sleep.called + assert k8s_mocked.patch_obj.called + assert 'dummy' in k8s_mocked.patch_obj.call_args_list[-1][1]['patch']['status']['kopf']