Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Merge pull request #285 from nolar/consistent-now-when-awakening
Browse files Browse the repository at this point in the history
Distinguish "no handlers at all" and "no handers for now" cases
  • Loading branch information
nolar authored Jan 8, 2020
2 parents 741942f + 6bd30a5 commit 1719184
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 12 deletions.
27 changes: 18 additions & 9 deletions kopf/reactor/handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,18 +225,27 @@ async def resource_handler(

# Sleep strictly after patching, never before -- to keep the status proper.
# The patching above, if done, interrupts the sleep instantly, so we skip it at all.
# Note: a zero-second or negative sleep is still a sleep, it will trigger a dummy patch.
if delay and patch:
logger.debug(f"Sleeping was skipped because of the patch, {delay} seconds left.")
elif delay:
logger.debug(f"Sleeping for {delay} seconds for the delayed handlers.")
unslept = await sleeping.sleep_or_wait(min(delay, WAITING_KEEPALIVE_INTERVAL), replenished)
if unslept is not None:
logger.debug(f"Sleeping was interrupted by new changes, {unslept} seconds left.")
elif delay is None and not patch:
logger.debug(f"Handling cycle is finished, waiting for new changes since now.")
elif delay is not None:
if delay > 0:
logger.debug(f"Sleeping for {delay} seconds for the delayed handlers.")
limited_delay = min(delay, WAITING_KEEPALIVE_INTERVAL)
unslept_delay = await sleeping.sleep_or_wait(limited_delay, replenished)
else:
now = datetime.datetime.utcnow()
dummy = patches.Patch({'status': {'kopf': {'dummy': now.isoformat()}}})
logger.debug("Provoking reaction with: %r", dummy)
await patching.patch_obj(resource=resource, patch=dummy, body=body)
unslept_delay = None # no need to sleep? means: slept in full.

if unslept_delay is not None:
logger.debug(f"Sleeping was interrupted by new changes, {unslept_delay} seconds left.")
else:
# Any unique always-changing value will work; not necessary a timestamp.
dummy_value = datetime.datetime.utcnow().isoformat()
dummy_patch = patches.Patch({'status': {'kopf': {'dummy': dummy_value}}})
logger.debug("Provoking reaction with: %r", dummy_patch)
await patching.patch_obj(resource=resource, patch=dummy_patch, body=body)


async def handle_resource_watching_cause(
Expand Down
6 changes: 3 additions & 3 deletions kopf/reactor/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,16 +262,16 @@ def done(self) -> bool:
return all(handler_state.finished for handler_state in self._states.values())

@property
def delay(self) -> float:
def delay(self) -> Optional[float]:
now = datetime.datetime.utcnow()
state_times = [handler_state.delayed for handler_state in self._states.values()]
clean_times = [t for t in state_times if t is not None]
if clean_times:
until = min(clean_times) # the soonest awake datetime.
delay = (until - now).total_seconds()
return max(0, delay)
else:
delay = 0
return max(0, delay)
return None


def deliver_results(
Expand Down
75 changes: 75 additions & 0 deletions tests/handling/test_timing_consistency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import asyncio
import datetime

import freezegun

import kopf
from kopf.reactor.handling import resource_handler
from kopf.structs.containers import ResourceMemories


async def test_consistent_awakening(registry, resource, k8s_mocked, mocker):
"""
A special case to ensure that "now" is consistent during the handling.
Previously, "now" of `handler.awakened` and "now" of `state.delay` were
different (maybe for less than 1 ms). If the scheduled awakening time was
unlucky to be between these two points in time, the operator stopped
reacting on this object until any other events or changes arrive.
Implementation-wise, the operator neither selected the handlers (because
it was "1ms too early", as per `handler.awakened`),
nor did it sleep (because it was "1ms too late", as per `state.delay`),
nor did it produce even a dummy patch (because zero-sleep meant "no sleep").
After the fix, zero-sleep produces a dummy patch to trigger the reaction
cycle after the sleep is over (as if it was an actual zero-time sleep).
In the test, the time granularity is intentionally that low -- 1 µs.
The time is anyway frozen and does not progress unless explicitly ticked.
See also: #284
"""

# Simulate that the object is scheduled to be awakened between the watch-event and sleep.
ts0 = datetime.datetime(2019, 12, 30, 10, 56, 43)
tsA_triggered = "2019-12-30T10:56:42.999999"
ts0_scheduled = "2019-12-30T10:56:43.000000"
tsB_delivered = "2019-12-30T10:56:43.000001"

# A dummy handler: it will not be selected for execution anyway, we just need to have it.
@kopf.on.create(resource.group, resource.version, resource.plural, id='some-id')
def handler_fn(**_):
pass

# Simulate the ticking of time, so that it goes beyond the scheduled awakening time.
# Any hook point between handler selection and delay calculation is fine,
# but State.store() also prevents other status-fields from being added and the patch populated.
def move_to_tsB(*_, **__):
frozen_dt.move_to(tsB_delivered)

state_store = mocker.patch('kopf.reactor.states.State.store', side_effect=move_to_tsB)
body = {'status': {'kopf': {'progress': {'some-id': {'delayed': ts0_scheduled}}}}}

# Simulate the call as if the event has just arrived on the watch-stream.
# Another way (same effect): handle_resource_changing_cause() and its result.
with freezegun.freeze_time(tsA_triggered) as frozen_dt:
assert datetime.datetime.utcnow() < ts0 # extra precaution
await resource_handler(
lifecycle=kopf.lifecycles.all_at_once,
registry=registry,
resource=resource,
memories=ResourceMemories(),
event={'type': 'ADDED', 'object': body},
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
assert datetime.datetime.utcnow() > ts0 # extra precaution

assert state_store.called

# Without "now"-time consistency, neither sleep() would be called, nor a patch applied.
# Verify that the patch was actually applied, so that the reaction cycle continues.
assert not k8s_mocked.asyncio_sleep.called
assert k8s_mocked.patch_obj.called
assert 'dummy' in k8s_mocked.patch_obj.call_args_list[-1][1]['patch']['status']['kopf']

0 comments on commit 1719184

Please sign in to comment.