Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Merge pull request #230 from nolar/no-resumes-repeated
Browse files Browse the repository at this point in the history
Prevent repeated resumes of a resource by remembering its resumed flag
  • Loading branch information
nolar authored Nov 13, 2019
2 parents bcf6ad7 + e086138 commit 223f53d
Show file tree
Hide file tree
Showing 16 changed files with 274 additions and 47 deletions.
10 changes: 10 additions & 0 deletions examples/05-handlers/example.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import kopf


@kopf.on.resume('zalando.org', 'v1', 'kopfexamples')
def resume_fn_1(cause, **kwargs):
    """Example resume handler: invoked when the operator picks up an existing object."""
    # The f-prefix was extraneous here: the string contains no placeholders.
    print('RESUMED 1st')


@kopf.on.create('zalando.org', 'v1', 'kopfexamples')
def create_fn_1(**kwargs):
    """Example creation handler: just reports that it was called."""
    message = 'CREATED 1st'
    print(message)


@kopf.on.resume('zalando.org', 'v1', 'kopfexamples')
def resume_fn_2(cause, **kwargs):
    """Example resume handler: invoked when the operator picks up an existing object."""
    # The f-prefix was extraneous here: the string contains no placeholders.
    print('RESUMED 2nd')


@kopf.on.create('zalando.org', 'v1', 'kopfexamples')
def create_fn_2(**kwargs):
    """Example creation handler: just reports that it was called."""
    message = 'CREATED 2nd'
    print(message)
Expand Down
10 changes: 7 additions & 3 deletions kopf/reactor/causation.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def detect_resource_changing_cause(
event: bodies.Event,
diff: Optional[diffs.Diff] = None,
requires_finalizer: bool = True,
initial: bool = False,
**kwargs: Any,
) -> ResourceChangingCause:
"""
Expand All @@ -159,7 +160,6 @@ def detect_resource_changing_cause(

# Put them back to the pass-through kwargs (to avoid code duplication).
body = event['object']
initial = event['type'] is None # special value simulated by us in kopf.reactor.watching.
kwargs.update(body=body, initial=initial)
if diff is not None:
kwargs.update(diff=diff)
Expand Down Expand Up @@ -190,11 +190,15 @@ def detect_resource_changing_cause(

# For an object seen for the first time (i.e. just-created), call the creation handlers,
# then mark the state as if it was seen when the creation has finished.
# Creation never mixes with resuming, even if an object is detected on startup (first listing).
if not lastseen.has_essence_stored(body):
kwargs['initial'] = False
return ResourceChangingCause(reason=Reason.CREATE, **kwargs)

# Cases with no state changes are usually ignored (NOOP). But for the "None" events,
# as simulated for the initial listing, we call the resuming handlers (e.g. threads/tasks).
# Cases with no essence changes are usually ignored (NOOP). But for the not-yet-resumed objects,
# we simulate a fake cause to invoke the resuming handlers. For cases with the essence changes,
# the resuming handlers will be mixed-in to the regular cause handling ("cuckoo-style")
# due to the ``initial=True`` flag on the cause, regardless of the reason.
if not diff and initial:
return ResourceChangingCause(reason=Reason.RESUME, **kwargs)

Expand Down
19 changes: 18 additions & 1 deletion kopf/reactor/handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from kopf.reactor import registries
from kopf.reactor import states
from kopf.structs import bodies
from kopf.structs import containers
from kopf.structs import dicts
from kopf.structs import diffs
from kopf.structs import finalizers
Expand Down Expand Up @@ -145,6 +146,7 @@ async def activity_trigger(
async def resource_handler(
lifecycle: lifecycles.LifeCycleFn,
registry: registries.OperatorRegistry,
memories: containers.ResourceMemories,
resource: resources.Resource,
event: bodies.Event,
freeze: asyncio.Event,
Expand Down Expand Up @@ -174,6 +176,12 @@ async def resource_handler(
logger.debug("Ignoring the events due to freeze.")
return

# Recall what is stored about that object. Share it in little portions with the consumers.
# And immediately forget it if the object is deleted from the cluster (but keep in memory).
memory = await memories.recall(body, noticed_by_listing=event['type'] is None)
if event['type'] == 'DELETED':
await memories.forget(body)

# Invoke all silent spies. No causation, no progress storage is performed.
if registry.has_resource_watching_handlers(resource=resource):
resource_watching_cause = causation.detect_resource_watching_cause(
Expand All @@ -185,6 +193,7 @@ async def resource_handler(
await handle_resource_watching_cause(
lifecycle=lifecycles.all_at_once,
registry=registry,
memory=memory,
cause=resource_watching_cause,
)

Expand All @@ -201,11 +210,13 @@ async def resource_handler(
old=old,
new=new,
diff=diff,
initial=memory.noticed_by_listing and not memory.fully_handled_once,
requires_finalizer=registry.requires_finalizer(resource=resource, body=body),
)
delay = await handle_resource_changing_cause(
lifecycle=lifecycle,
registry=registry,
memory=memory,
cause=resource_changing_cause,
)

Expand Down Expand Up @@ -234,6 +245,7 @@ async def resource_handler(
async def handle_resource_watching_cause(
lifecycle: lifecycles.LifeCycleFn,
registry: registries.OperatorRegistry,
memory: containers.ResourceMemory,
cause: causation.ResourceWatchingCause,
) -> None:
"""
Expand Down Expand Up @@ -262,6 +274,7 @@ async def handle_resource_watching_cause(
async def handle_resource_changing_cause(
lifecycle: lifecycles.LifeCycleFn,
registry: registries.OperatorRegistry,
memory: containers.ResourceMemory,
cause: causation.ResourceChangingCause,
) -> Optional[float]:
"""
Expand Down Expand Up @@ -311,6 +324,10 @@ async def handle_resource_changing_cause(
logger.debug("Removing the finalizer, thus allowing the actual deletion.")
finalizers.remove_finalizers(body=body, patch=patch)

# Once all handlers have succeeded at least once for any reason, or if there were none,
# prevent further resume-handlers (which otherwise happens on each watch-stream re-listing).
memory.fully_handled_once = True

# Informational causes just print the log lines.
if cause.reason == causation.Reason.GONE:
logger.debug("Deleted, really deleted, and we are notified.")
Expand All @@ -319,7 +336,7 @@ async def handle_resource_changing_cause(
logger.debug("Deletion event, but we are done with it, and we do not care.")

if cause.reason == causation.Reason.NOOP:
logger.debug("Something has changed, but we are not interested (state is the same).")
logger.debug("Something has changed, but we are not interested (the essence is the same).")

# For the case of a newly created object, or one that doesn't have the correct
# finalizers, lock it to this operator. Not all newly created objects will
Expand Down
11 changes: 11 additions & 0 deletions kopf/reactor/running.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from kopf.reactor import lifecycles
from kopf.reactor import queueing
from kopf.reactor import registries
from kopf.structs import containers
from kopf.structs import credentials

if TYPE_CHECKING:
Expand Down Expand Up @@ -76,9 +77,11 @@ def login(


def run(
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
lifecycle: Optional[lifecycles.LifeCycleFn] = None,
registry: Optional[registries.OperatorRegistry] = None,
memories: Optional[containers.ResourceMemories] = None,
standalone: bool = False,
priority: int = 0,
peering_name: Optional[str] = None,
Expand All @@ -97,6 +100,7 @@ def run(
loop.run_until_complete(operator(
lifecycle=lifecycle,
registry=registry,
memories=memories,
standalone=standalone,
namespace=namespace,
priority=priority,
Expand All @@ -110,8 +114,10 @@ def run(


async def operator(
*,
lifecycle: Optional[lifecycles.LifeCycleFn] = None,
registry: Optional[registries.OperatorRegistry] = None,
memories: Optional[containers.ResourceMemories] = None,
standalone: bool = False,
priority: int = 0,
peering_name: Optional[str] = None,
Expand All @@ -132,6 +138,7 @@ async def operator(
operator_tasks = await spawn_tasks(
lifecycle=lifecycle,
registry=registry,
memories=memories,
standalone=standalone,
namespace=namespace,
priority=priority,
Expand All @@ -144,8 +151,10 @@ async def operator(


async def spawn_tasks(
*,
lifecycle: Optional[lifecycles.LifeCycleFn] = None,
registry: Optional[registries.OperatorRegistry] = None,
memories: Optional[containers.ResourceMemories] = None,
standalone: bool = False,
priority: int = 0,
peering_name: Optional[str] = None,
Expand All @@ -164,6 +173,7 @@ async def spawn_tasks(
# The freezer and the registry are scoped to this whole task-set, to sync them all.
lifecycle = lifecycle if lifecycle is not None else lifecycles.get_default_lifecycle()
registry = registry if registry is not None else registries.get_default_registry()
memories = memories if memories is not None else containers.ResourceMemories()
vault = vault if vault is not None else global_vault
vault = vault if vault is not None else credentials.Vault()
event_queue: posting.K8sEventQueue = asyncio.Queue(loop=loop)
Expand Down Expand Up @@ -237,6 +247,7 @@ async def spawn_tasks(
handler=functools.partial(handling.resource_handler,
lifecycle=lifecycle,
registry=registry,
memories=memories,
resource=resource,
event_queue=event_queue,
freeze=freeze_flag)))), # freeze is only checked
Expand Down
83 changes: 83 additions & 0 deletions kopf/structs/containers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
An in-memory storage of arbitrary information per resource/object.
The information is stored strictly in-memory and is not persistent.
On the operator restart, all the memories are lost.
It is used internally to track allocated system resources for each Kubernetes
object, even if that object does not show up in the event streams for a long time.
"""
import dataclasses
from typing import MutableMapping

from kopf.structs import bodies


@dataclasses.dataclass(frozen=False)
class ResourceMemory:
    """ A memo about a single resource/object. Usually stored in `ResourceMemories`. """
    # True if the object was first seen via the initial listing (a simulated
    # "None"-type event), not via a live watch-event.
    noticed_by_listing: bool = False
    # Set once all handlers have succeeded at least once (or there were none),
    # to prevent repeated resume-handling on every watch-stream re-listing.
    fully_handled_once: bool = False


class ResourceMemories:
    """
    A container of all memos about every existing resource in a single operator.

    Distinct operator tasks have their own memory containers, which
    do not overlap. This solves the problem of storing the per-resource
    entries in the global or context variables.

    The memos can store anything the resource handlers need to persist within
    a single process/operator lifetime, but not persisted on the resource.
    For example, the runtime system resources: flags, threads, tasks, etc.
    Or the scalar values, which have meaning only for this operator process.

    The container is relatively async-safe: one individual resource is always
    handled sequentially, never in parallel with itself (different resources
    are handled in parallel though), so the same key will not be added/deleted
    in the background during the operation, so the locking is not needed.
    """
    _items: MutableMapping[str, ResourceMemory]

    def __init__(self) -> None:
        super().__init__()
        self._items = {}

    async def recall(
            self,
            body: bodies.Body,
            *,
            noticed_by_listing: bool = False,
    ) -> ResourceMemory:
        """
        Either find a resource's memory, or create and remember a new one.

        ``noticed_by_listing`` is only used when the memory is first created;
        it does not update an already existing memory.
        """
        key = self._build_key(body)
        try:
            # EAFP: a single lookup on the common (already-remembered) path.
            return self._items[key]
        except KeyError:
            memory = ResourceMemory(noticed_by_listing=noticed_by_listing)
            self._items[key] = memory
            return memory

    async def forget(self, body: bodies.Body) -> None:
        """
        Forget the resource's memory if it exists; or ignore if it does not.
        """
        # `pop` with a default is a no-op for unknown keys -- no pre-check needed.
        self._items.pop(self._build_key(body), None)

    def _build_key(
            self,
            body: bodies.Body,
    ) -> str:
        """
        Construct an immutable persistent key of a resource.

        Generally, a uid is sufficient, as it is unique within the cluster.
        But it can be e.g. plural/namespace/name triplet, or anything else,
        even of different types (as long as it satisfies the type checkers).
        But it must be consistent within a single process lifetime.
        """
        return body.get('metadata', {}).get('uid') or ''
40 changes: 40 additions & 0 deletions tests/basic-structs/test_containers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from kopf.structs.bodies import Body
from kopf.structs.containers import ResourceMemory, ResourceMemories

# A minimal body fixture: only `metadata.uid` matters here, since the memory
# container keys its entries by the object's uid.
BODY: Body = {
    'metadata': {
        'uid': 'uid1',
    }
}


def test_creation_with_defaults():
    """A memory is constructible with no arguments, and both flags default to off."""
    # The original test only smoke-tested construction; also pin the defaults.
    memory = ResourceMemory()
    assert memory.noticed_by_listing is False
    assert memory.fully_handled_once is False


async def test_recalling_creates_when_absent():
    """Recalling a never-seen body creates a fresh memory on the fly."""
    memories = ResourceMemories()
    recalled = await memories.recall(BODY)
    assert isinstance(recalled, ResourceMemory)


async def test_recalling_reuses_when_present():
    """Repeated recalls of the same body return the very same memory object."""
    memories = ResourceMemories()
    first = await memories.recall(BODY)
    second = await memories.recall(BODY)
    assert first is second


async def test_forgetting_deletes_when_present():
    """After forgetting, the next recall yields a brand-new memory object."""
    memories = ResourceMemories()
    before = await memories.recall(BODY)
    await memories.forget(BODY)

    # Check by recalling -- it should be a new one.
    after = await memories.recall(BODY)
    assert before is not after


async def test_forgetting_ignores_when_absent():
    """Forgetting an unknown body is a silent no-op and must not raise."""
    memories = ResourceMemories()
    await memories.forget(BODY)
Loading

0 comments on commit 223f53d

Please sign in to comment.