Change terminology from "freezing" to "pausing" #680

Merged (2 commits) on Feb 13, 2021.
6 changes: 3 additions & 3 deletions DEVELOPMENT.md
@@ -109,15 +109,15 @@ and `--namespace` CLI options, but does not prevent the mis-configurations.

To run against the real cluster, use the dev-mode of the framework.
This will set the operator's priority to 666 (just a high number),
and will freeze all other running operators (the default priority is 0)
and will pause all other running operators (the default priority is 0)
for the runtime, so that they do not collide with each other:

```bash
kopf run examples/01-minimal/example.py --verbose --dev
```

Alternatively, explicitly freeze/resume all other operators,
and it will freeze them even if your operator is not running
Alternatively, explicitly pause/resume all other operators,
and it will pause them even if your operator is not running
(e.g., for 2 hours):

```bash
```
16 changes: 8 additions & 8 deletions docs/peering.rst
@@ -10,12 +10,12 @@ Priorities
==========

Each operator has a priority (the default is 0). Whenever the operator
notices that other operators start with a higher priority, it freezes
notices that other operators start with a higher priority, it pauses
its operation until those operators stop working.

This is done to prevent collisions of multiple operators handling
the same objects. If two operators runs with the same priority all operators
issue a warning and freeze, so that the cluster becomes not served anymore.
the same objects. If two operators run with the same priority, all operators
issue a warning and pause, so that the cluster is no longer served.

To set the operator's priority, use :option:`--priority`:

@@ -107,12 +107,12 @@ Or:
Depending on :option:`--namespace` or :option:`--all-namespaces`,
either ``ClusterKopfPeering`` or ``KopfPeering`` will be used automatically.

If the peering object does not exist, the operator will freeze at start.
If the peering object does not exist, the operator will pause at start.
Using :option:`--peering` assumes that the peering is mandatory.

Please note that in the startup handler, this is not exactly the same:
the mandatory mode must be set explicitly. Otherwise, the operator will try
to auto-detect the presence of the custom peering object, but will not freeze
to auto-detect the presence of the custom peering object, but will not pause
if it is absent -- unlike with the ``--peering=`` CLI option.

The operators from different peering objects do not see each other.
@@ -139,7 +139,7 @@ Or:
def configure(settings: kopf.OperatorSettings, **_):
settings.peering.standalone = True

In that case, the operator will not freeze if other operators with
In that case, the operator will not pause if other operators with
the higher priority will start handling the objects, which may lead
to the conflicting changes and reactions from multiple operators
for the same events.
@@ -164,7 +164,7 @@ will stop until the operator's pod is restarted (and if restarted at all).

To start multiple operator pods, they must be distinctly prioritised.
In that case, only one operator will be active --- the one with the highest
priority. All other operators will freeze and wait until this operator dies.
priority. All other operators will pause and wait until this operator exits.
Once it dies, the second highest priority operator will come into play.
And so on.
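
Assuming distinct priorities, the failover order described above amounts to always activating the highest-priority live peer. A hypothetical helper (not a kopf API) makes this concrete:

```python
from typing import Dict, Optional

# Hypothetical sketch of the failover order: only the live peer with the
# highest priority is active; all others pause and wait. Not kopf's API.

def active_peer(live_peers: Dict[str, int]) -> Optional[str]:
    """Return the peer that should be active, or None if there are none."""
    if not live_peers:
        return None
    return max(live_peers, key=lambda name: live_peers[name])

peers = {"pod-a": 100, "pod-b": 90, "pod-c": 80}
print(active_peer(peers))   # pod-a runs; pod-b and pod-c pause
del peers["pod-a"]          # pod-a dies...
print(active_peer(peers))   # ...and pod-b takes over
```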

@@ -200,7 +200,7 @@ Stealth keep-alives

Every few seconds (60 by default), the operator will send a keep-alive update
to the chosen peering, showing that it is still functioning. Other operators
will notice that and make decisions on their freezing or resuming.
will notice that and make decisions on their pausing or resuming.
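
The effect of these keep-alives can be illustrated with a simple expiry check. This is a simplified model: the field names mirror the `lastseen`/`lifetime` values shown in the peering logs, but it is not kopf's implementation:

```python
import datetime

# Simplified liveness model: a peer is presumed dead once its last
# keep-alive plus its advertised lifetime lies in the past.
def is_presumably_dead(lastseen: datetime.datetime,
                       lifetime: datetime.timedelta,
                       now: datetime.datetime) -> bool:
    return lastseen + lifetime < now

now = datetime.datetime(2019, 2, 5, 20, 0, 0)
seen = datetime.datetime(2019, 2, 5, 19, 58, 30)
print(is_presumably_dead(seen, datetime.timedelta(seconds=60), now))   # True
print(is_presumably_dead(seen, datetime.timedelta(seconds=120), now))  # False
```

Dead peers no longer count as blockers, which is what lets a paused operator resume once a higher-priority peer stops sending keep-alives.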

The operator also logs a keep-alive activity to its own logs. This can be
distracting. To disable:
12 changes: 6 additions & 6 deletions examples/06-peering/README.md
@@ -6,7 +6,7 @@ their liveness and the priorities, and cooperate to avoid the undesired
side-effects (e.g., duplicated children creation, infinite cross-changes).

The main use-case for this is the development mode: when a developer starts
an operator on their workstation, all the deployed operators should freeze
an operator on their workstation, all the deployed operators should pause
and stop processing of the objects, until the developer's operator exits.

In shell A, start an operator:
@@ -38,18 +38,18 @@ Now, stop the operator B with Ctrl+C (twice), and start it with `--dev` option
kopf run example.py --verbose --dev
```

Observe how the operator A freezes and lets
Observe how the operator A pauses and lets
operator B take control over the objects.

```
[2019-02-05 20:43:40,360] kopf.peering [INFO ] Freezing operations in favour of [Peer(54e7054f28d948c4985db79410c9ef4a, priority=666, lastseen=2019-02-05 19:43:40.166561, lifetime=0:01:00)].
[2019-02-05 20:43:40,360] kopf.peering [INFO ] Pausing operations in favour of [Peer(54e7054f28d948c4985db79410c9ef4a, priority=666, lastseen=2019-02-05 19:43:40.166561, lifetime=0:01:00)].
```

Stop the operator B again with Ctrl+C (twice).
The operator A resumes its operations:

```
[2019-02-05 20:44:54,311] kopf.peering [INFO ] Resuming operations after the freeze.
[2019-02-05 20:44:54,311] kopf.peering [INFO ] Resuming operations after the pause.
```

The same can be achieved with the explicit CLI commands:
@@ -60,6 +60,6 @@ kopf resume
```

```
[2019-02-05 20:45:34,354] kopf.peering [INFO ] Freezing operations in favour of [Peer(manual, priority=100, lastseen=2019-02-05 19:45:34.226070, lifetime=0:01:00)].
[2019-02-05 20:45:49,427] kopf.peering [INFO ] Resuming operations after the freeze.
[2019-02-05 20:45:34,354] kopf.peering [INFO ] Pausing operations in favour of [Peer(manual, priority=100, lastseen=2019-02-05 19:45:34.226070, lifetime=0:01:00)].
[2019-02-05 20:45:49,427] kopf.peering [INFO ] Resuming operations after the pause.
```
4 changes: 2 additions & 2 deletions kopf/cli.py
@@ -128,7 +128,7 @@ def freeze(
peering_name: str,
priority: int,
) -> None:
""" Freeze the resource handling in the cluster. """
""" Pause the resource handling in the operator(s). """
identity = peering.Identity(id) if id else peering.detect_own_id(manual=True)
insights = references.Insights()
settings = configuration.OperatorSettings()
@@ -159,7 +159,7 @@ def resume(
clusterwide: bool,
peering_name: str,
) -> None:
""" Resume the resource handling in the cluster. """
""" Resume the resource handling in the operator(s). """
identity = peering.Identity(id) if id else peering.detect_own_id(manual=True)
insights = references.Insights()
settings = configuration.OperatorSettings()
84 changes: 42 additions & 42 deletions kopf/clients/watching.py
@@ -44,7 +44,7 @@ async def infinite_watch(
settings: configuration.OperatorSettings,
resource: references.Resource,
namespace: references.Namespace,
freeze_checker: Optional[primitives.ToggleSet] = None,
operator_paused: Optional[primitives.ToggleSet] = None, # None for tests & observation
_iterations: Optional[int] = None, # used in tests/mocks/fixtures
) -> AsyncIterator[bodies.RawEvent]:
"""
@@ -57,7 +57,7 @@ async def infinite_watch(
a new one is recreated, and the stream continues.
It only exits with unrecoverable exceptions.
"""
how = ' (frozen)' if freeze_checker is not None and freeze_checker.is_on() else ''
how = ' (paused)' if operator_paused is not None and operator_paused.is_on() else ''
where = f'in {namespace!r}' if namespace is not None else 'cluster-wide'
logger.debug(f"Starting the watch-stream for {resource} {where}{how}.")
try:
@@ -66,13 +66,13 @@ async with streaming_block(
async with streaming_block(
namespace=namespace,
resource=resource,
freeze_checker=freeze_checker,
) as freeze_waiter:
operator_paused=operator_paused,
) as operator_pause_waiter:
stream = continuous_watch(
settings=settings,
resource=resource,
namespace=namespace,
freeze_waiter=freeze_waiter,
operator_pause_waiter=operator_pause_waiter,
)
async for raw_event in stream:
yield raw_event
@@ -86,66 +86,66 @@ async def streaming_block(
*,
resource: references.Resource,
namespace: references.Namespace,
freeze_checker: Optional[primitives.ToggleSet],
operator_paused: Optional[primitives.ToggleSet] = None, # None for tests & observation
) -> AsyncIterator[aiotasks.Future]:
"""
Block the execution until the freeze is off; signal when it is on again.
Block the execution until un-paused; signal when the pause is on again.

This prevents both watching and listing while the freeze mode is on,
This prevents both watching and listing while the operator is paused,
until it is off. Specifically, the watch-stream closes its connection
once the freeze mode is on, so the while-true & for-event-in-stream cycles
exit, and the streaming coroutine is started again by `infinite_stream()`
(the watcher timeout is swallowed by the freeze time).
once paused, so the while-true & for-event-in-stream cycles exit,
and the streaming coroutine is started again by `infinite_stream()`
(the watcher timeout is swallowed by the pause time).

Returns a future (or a task) that is set when the freeze is turned on again.
Returns a future (or a task) that is set (or finished) when paused again.

A stop-future is a client-specific way of terminating the streaming HTTPS
connections when the freeze is turned back on. The low-level streaming API
call attaches its `response.close()` to the future's "done" callback,
so that the stream is closed once the freeze is turned back on.
connections when paused again. The low-level streaming API call attaches
its `response.close()` to the future's "done" callback,
so that the stream is closed once the operator is paused.

Note: this routine belongs to watching and does not belong to peering.
The freeze can be managed in any other ways: as an imaginary edge case,
imagine a operator with UI with a "pause" button that freezes the operator.
The pause can be managed in any other ways: as an imaginary edge case,
imagine an operator with a UI "pause" button that pauses the operator.
"""
where = f'in {namespace!r}' if namespace is not None else 'cluster-wide'

# Block until unfrozen before even starting the API communication.
if freeze_checker is not None and freeze_checker.is_on():
names = {toggle.name for toggle in freeze_checker if toggle.is_on() and toggle.name}
freezing_reason = f" (blockers: {', '.join(names)})" if names else ""
logger.debug(f"Freezing the watch-stream for {resource} {where}{freezing_reason}.")
# Block until unpaused before even starting the API communication.
if operator_paused is not None and operator_paused.is_on():
names = {toggle.name for toggle in operator_paused if toggle.is_on() and toggle.name}
pause_reason = f" (blockers: {', '.join(names)})" if names else ""
logger.debug(f"Pausing the watch-stream for {resource} {where}{pause_reason}.")

await freeze_checker.wait_for(False)
await operator_paused.wait_for(False)

names = {toggle.name for toggle in freeze_checker if toggle.is_on() and toggle.name}
names = {toggle.name for toggle in operator_paused if toggle.is_on() and toggle.name}
resuming_reason = f" (resolved: {', '.join(names)})" if names else ""
logger.debug(f"Resuming the watch-stream for {resource} {where}{resuming_reason}.")

# Create the signalling future that the freeze is on again.
freeze_waiter: aiotasks.Future
if freeze_checker is not None:
freeze_waiter = aiotasks.create_task(
freeze_checker.wait_for(True),
name=f"freeze-waiter for {resource}")
# Create the signalling future for when paused again.
operator_pause_waiter: aiotasks.Future
if operator_paused is not None:
operator_pause_waiter = aiotasks.create_task(
operator_paused.wait_for(True),
name=f"pause-waiter for {resource}")
else:
freeze_waiter = asyncio.Future() # a dummy just to have it
operator_pause_waiter = asyncio.Future() # a dummy just to have it

# Go for the streaming with the prepared freezing/unfreezing setup.
# Go for the streaming with the prepared pausing/unpausing setup.
try:
yield freeze_waiter
yield operator_pause_waiter
finally:
with contextlib.suppress(asyncio.CancelledError):
freeze_waiter.cancel()
await freeze_waiter
operator_pause_waiter.cancel()
await operator_pause_waiter
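
The block-until-unpaused pattern implemented by `streaming_block()` above can be reduced to a small stdlib-only sketch. `PauseToggle` here is a hypothetical stand-in for kopf's toggle primitives, not the real code:

```python
import asyncio

# Stdlib-only sketch of the pattern in streaming_block(): wait while the
# operator is paused, then obtain a waiter that completes when it is
# paused again. PauseToggle is a hypothetical stand-in, not kopf's code.
class PauseToggle:
    def __init__(self) -> None:
        self._paused = False
        self._changed = asyncio.Event()

    def is_on(self) -> bool:
        return self._paused

    async def turn_to(self, value: bool) -> None:
        self._paused = value
        self._changed.set()      # wake anyone waiting for a change...
        self._changed.clear()    # ...and re-arm for the next change

    async def wait_for(self, value: bool) -> None:
        while self._paused != value:
            await self._changed.wait()

async def main() -> None:
    toggle = PauseToggle()
    await toggle.turn_to(True)            # start paused

    async def unpause_soon() -> None:
        await asyncio.sleep(0.01)
        await toggle.turn_to(False)       # another operator exits

    asyncio.create_task(unpause_soon())
    await toggle.wait_for(False)          # block until un-paused
    pause_waiter = asyncio.create_task(toggle.wait_for(True))
    await toggle.turn_to(True)            # paused again...
    await pause_waiter                    # ...the waiter fires
    print("stream would be closed here")

asyncio.run(main())
```

The real implementation attaches `response.close()` to the waiter's "done" callback, so completing the waiter is what tears down the streaming HTTP connection.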


async def continuous_watch(
*,
settings: configuration.OperatorSettings,
resource: references.Resource,
namespace: references.Namespace,
freeze_waiter: aiotasks.Future,
operator_pause_waiter: aiotasks.Future,
) -> AsyncIterator[bodies.RawEvent]:

# First, list the resources regularly, and get the list's resource version.
@@ -156,7 +156,7 @@ async def continuous_watch(

# Repeat through disconnects of the watch as long as the resource version is valid (no errors).
# The individual watching API calls are disconnected by timeout even if the stream is fine.
while not freeze_waiter.done():
while not operator_pause_waiter.done():

# Then, watch the resources starting from the list's resource version.
stream = watch_objs(
@@ -165,7 +165,7 @@ namespace=namespace,
namespace=namespace,
timeout=settings.watching.server_timeout,
since=resource_version,
freeze_waiter=freeze_waiter,
operator_pause_waiter=operator_pause_waiter,
)
async for raw_input in stream:
raw_type = raw_input['type']
@@ -205,7 +205,7 @@ async def watch_objs(
timeout: Optional[float] = None,
since: Optional[str] = None,
context: Optional[auth.APIContext] = None, # injected by the decorator
freeze_waiter: aiotasks.Future,
operator_pause_waiter: aiotasks.Future,
) -> AsyncIterator[bodies.RawInput]:
"""
Watch objects of a specific resource type.
@@ -230,7 +230,7 @@ params['timeoutSeconds'] = str(timeout)
params['timeoutSeconds'] = str(timeout)

# Stream the parsed events from the response until it is closed server-side,
# or until it is closed client-side by the freeze-waiting future's callbacks.
# or until it is closed client-side by the pause-waiting future's callbacks.
try:
response = await context.session.get(
url=resource.get_url(server=context.server, namespace=namespace, params=params),
@@ -242,14 +242,14 @@ await errors.check_response(response)
await errors.check_response(response)

response_close_callback = lambda _: response.close()
freeze_waiter.add_done_callback(response_close_callback)
operator_pause_waiter.add_done_callback(response_close_callback)
try:
async with response:
async for line in _iter_jsonlines(response.content):
raw_input = cast(bodies.RawInput, json.loads(line.decode("utf-8")))
yield raw_input
finally:
freeze_waiter.remove_done_callback(response_close_callback)
operator_pause_waiter.remove_done_callback(response_close_callback)

except (aiohttp.ClientConnectionError, aiohttp.ClientPayloadError, asyncio.TimeoutError):
pass
33 changes: 16 additions & 17 deletions kopf/engines/peering.py
@@ -94,17 +94,14 @@ async def process_peering_event(
settings: configuration.OperatorSettings,
autoclean: bool = True,
replenished: asyncio.Event,
freeze_toggle: primitives.Toggle,
conflicts_found: Optional[primitives.Toggle] = None, # None for tests & observation
) -> None:
"""
Handle a single update of the peers by us or by other operators.

When an operator with a higher priority appears, switch to the freeze-mode.
The these operators disappear or become presumably dead, resume the event handling.

The freeze object is passed both to the peers handler to set/clear it,
and to all the resource handlers to check its value when the events arrive
(see :func:`spawn_tasks`).
When an operator with a higher priority appears, pause this operator.
When conflicting operators disappear or become presumably dead,
resume the event handling in the current operator (un-pause it).
"""
body: bodies.RawBody = raw_event['object']
meta: bodies.RawMeta = raw_event['object']['metadata']
@@ -124,19 +124,21 @@ if autoclean and dead_peers:
if autoclean and dead_peers:
await clean(peers=dead_peers, settings=settings, resource=resource, namespace=namespace)

if prio_peers:
if freeze_toggle.is_off():
logger.info(f"Freezing operations in favour of {prio_peers}.")
await freeze_toggle.turn_to(True)
if conflicts_found is None:
pass
elif prio_peers:
if conflicts_found.is_off():
logger.info(f"Pausing operations in favour of {prio_peers}.")
await conflicts_found.turn_to(True)
elif same_peers:
logger.warning(f"Possibly conflicting operators with the same priority: {same_peers}.")
if freeze_toggle.is_off():
logger.warning(f"Freezing all operators, including self: {peers}")
await freeze_toggle.turn_to(True)
if conflicts_found.is_off():
logger.warning(f"Pausing all operators, including self: {peers}")
await conflicts_found.turn_to(True)
else:
if freeze_toggle.is_on():
logger.info(f"Resuming operations after the freeze. Conflicting operators with the same priority are gone.")
await freeze_toggle.turn_to(False)
if conflicts_found.is_on():
logger.info(f"Resuming operations after the pause. Conflicting operators with the same priority are gone.")
await conflicts_found.turn_to(False)

# Either wait for external updates (and exit when they arrive), or until the blocking peers
# are expected to expire, and force the immediate re-evaluation by a certain change of self.