From d4122dd24e607461fb305dd7fff73fdef51b234d Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 11 Feb 2021 22:58:34 +0100 Subject: [PATCH 1/2] Pass unused freeze checkers/toggles as None by default Signed-off-by: Sergey Vasilyev --- kopf/clients/watching.py | 4 ++-- kopf/engines/peering.py | 6 ++++-- kopf/reactor/observation.py | 2 -- kopf/reactor/orchestration.py | 1 - kopf/reactor/queueing.py | 2 +- tests/peering/test_freeze_mode.py | 1 - 6 files changed, 7 insertions(+), 9 deletions(-) diff --git a/kopf/clients/watching.py b/kopf/clients/watching.py index 9f17b49f..c83e95c2 100644 --- a/kopf/clients/watching.py +++ b/kopf/clients/watching.py @@ -44,7 +44,7 @@ async def infinite_watch( settings: configuration.OperatorSettings, resource: references.Resource, namespace: references.Namespace, - freeze_checker: Optional[primitives.ToggleSet] = None, + freeze_checker: Optional[primitives.ToggleSet] = None, # None for tests & observation _iterations: Optional[int] = None, # used in tests/mocks/fixtures ) -> AsyncIterator[bodies.RawEvent]: """ @@ -86,7 +86,7 @@ async def streaming_block( *, resource: references.Resource, namespace: references.Namespace, - freeze_checker: Optional[primitives.ToggleSet], + freeze_checker: Optional[primitives.ToggleSet], # None for tests & observation ) -> AsyncIterator[aiotasks.Future]: """ Block the execution until the freeze is off; signal when it is on again. diff --git a/kopf/engines/peering.py b/kopf/engines/peering.py index 9aa29fd3..593dfc44 100644 --- a/kopf/engines/peering.py +++ b/kopf/engines/peering.py @@ -94,7 +94,7 @@ async def process_peering_event( settings: configuration.OperatorSettings, autoclean: bool = True, replenished: asyncio.Event, - freeze_toggle: primitives.Toggle, + freeze_toggle: Optional[primitives.Toggle] = None, # None for tests & observation ) -> None: """ Handle a single update of the peers by us or by other operators. 
@@ -124,7 +124,9 @@ async def process_peering_event(
     if autoclean and dead_peers:
         await clean(peers=dead_peers, settings=settings, resource=resource, namespace=namespace)
 
-    if prio_peers:
+    if freeze_toggle is None:
+        pass
+    elif prio_peers:
         if freeze_toggle.is_off():
             logger.info(f"Freezing operations in favour of {prio_peers}.")
             await freeze_toggle.turn_to(True)
diff --git a/kopf/reactor/observation.py b/kopf/reactor/observation.py
index cf38ff2c..684e33a8 100644
--- a/kopf/reactor/observation.py
+++ b/kopf/reactor/observation.py
@@ -67,7 +67,6 @@ async def namespace_observer(
     if not settings.scanning.disabled and not clusterwide:
         try:
             await queueing.watcher(
-                freeze_checker=None,
                 settings=settings,
                 resource=resource,
                 namespace=None,
@@ -116,7 +115,6 @@ async def resource_observer(
     if not settings.scanning.disabled:
         try:
             await queueing.watcher(
-                freeze_checker=None,
                 settings=settings,
                 resource=resource,
                 namespace=None,
diff --git a/kopf/reactor/orchestration.py b/kopf/reactor/orchestration.py
index 32715813..9a20a31f 100644
--- a/kopf/reactor/orchestration.py
+++ b/kopf/reactor/orchestration.py
@@ -186,7 +186,6 @@ async def spawn_missing_peerings(
             ensemble.peering_tasks[dkey] = aiotasks.create_guarded_task(
                 name=f"peering observer for {what}", logger=logger, cancellable=True,
                 coro=queueing.watcher(
-                    freeze_checker=None,
                     settings=settings,
                     resource=resource,
                     namespace=namespace,
diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py
index ec00b08d..ed4e5bc8 100644
--- a/kopf/reactor/queueing.py
+++ b/kopf/reactor/queueing.py
@@ -129,7 +129,7 @@ async def watcher(
         settings: configuration.OperatorSettings,
         resource: references.Resource,
         processor: WatchStreamProcessor,
-        freeze_checker: Optional[primitives.ToggleSet] = None,
+        freeze_checker: Optional[primitives.ToggleSet] = None,  # None for tests & observation
 ) -> None:
     """
     The watchers watches for the resource events via the API, and spawns the workers for every object.
diff --git a/tests/peering/test_freeze_mode.py b/tests/peering/test_freeze_mode.py
index 8492e4b8..68ecc243 100644
--- a/tests/peering/test_freeze_mode.py
+++ b/tests/peering/test_freeze_mode.py
@@ -40,7 +40,6 @@ async def test_other_peering_objects_are_ignored(
     settings.peering.name = 'our-name'
     await process_peering_event(
         raw_event=event,
-        freeze_toggle=primitives.Toggle(),
         replenished=asyncio.Event(),
         autoclean=False,
         identity='id',

From d6c4d04585cd309cefaed031787015b798d25cc3 Mon Sep 17 00:00:00 2001
From: Sergey Vasilyev
Date: Sat, 13 Feb 2021 13:57:24 +0100
Subject: [PATCH 2/2] Change terminology from "freezing" to "pausing"
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This covers:

* Documentation and examples.
* Code comments and docstrings.
* Variable names.
* Log messages.
* Tests.

But not:

* `kopf freeze` CLI — that would require a public interface change.

As the primary goal, the variables are now named after their main purpose
and meaning (operator_paused, conflicts_found) instead of their type or
role (freeze_checker, freeze_toggle). This makes the code more readable
semantically (as if reading English). It also enables adding more checkers
& toggles with different roles (e.g. for cache pre-population).

As a secondary goal, it is now called "pausing", as there are already
too many things named after "freezing": `freezegun`, `frozen_time`,
task freezing, API requests freezing (for auth), frozen dataclasses,
throttling freezes on errors, etc.
The word is overloaded with meanings, while "pausing" is nearly unused.

**IMPORTANT:** The code and logic are exactly the same! Nothing is changed,
just renamed. The public interface is not affected, only the internal
things are.

Signed-off-by: Sergey Vasilyev
---
 DEVELOPMENT.md                                |  6 +-
 docs/peering.rst                              | 16 ++--
 examples/06-peering/README.md                 | 12 +--
 kopf/cli.py                                   |  4 +-
 kopf/clients/watching.py                      | 84 ++++++++---------
 kopf/engines/peering.py                       | 31 +++----
 kopf/reactor/daemons.py                       | 14 +--
 kopf/reactor/orchestration.py                 | 42 ++++-----
 kopf/reactor/queueing.py                      |  4 +-
 kopf/reactor/running.py                       |  8 +-
 kopf/structs/configuration.py                 |  6 +-
 kopf/structs/primitives.py                    |  4 +-
 tests/handling/daemons/conftest.py            | 10 +-
 .../daemons/test_daemon_termination.py        | 12 +--
 tests/k8s/test_watching_continuously.py       | 14 +--
 tests/k8s/test_watching_with_freezes.py       | 34 +++----
 tests/orchestration/test_task_adjustments.py  | 40 ++++----
 tests/peering/test_freeze_mode.py             | 92 +++++++++----------
 18 files changed, 215 insertions(+), 218 deletions(-)

diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md
index f6387ecf..4e53432e 100644
--- a/DEVELOPMENT.md
+++ b/DEVELOPMENT.md
@@ -109,15 +109,15 @@ and `--namespace` CLI options, but does not prevent the mis-configurations.
 
 To run against the real cluster, use the dev-mode of the framework.
 This will set the operator's priority to 666 (just a high number),
-and will freeze all other running operators (the default priority is 0)
+and will pause all other running operators (the default priority is 0)
 for the runtime, so that they do not collide with each other:
 
 ```bash
 kopf run examples/01-minimal/example.py --verbose --dev
 ```
 
-Alternatively, explicitly freeze/resume all other operators,
-and it will freeze them even if your operator is not running
+Alternatively, explicitly pause/resume all other operators,
+and it will pause them even if your operator is not running
 (e.g., for 2 hours):
 
 ```bash
diff --git a/docs/peering.rst b/docs/peering.rst
index e7bb0172..cf9134d6 100644
--- a/docs/peering.rst
+++ b/docs/peering.rst
@@ -10,12 +10,12 @@ Priorities
 ==========
 
 Each operator has a priority (the default is 0). Whenever the operator
-notices that other operators start with a higher priority, it freezes
+notices that other operators start with a higher priority, it pauses
 its operation until those operators stop working.
 
 This is done to prevent collisions of multiple operators handling
-the same objects. If two operators runs with the same priority all operators
-issue a warning and freeze, so that the cluster becomes not served anymore.
+the same objects. If two operators run with the same priority, all operators
+issue a warning and pause, so that the cluster is not served anymore.
 
 To set the operator's priority, use :option:`--priority`:
 
@@ -107,12 +107,12 @@ Or:
 Depending on :option:`--namespace` or :option:`--all-namespaces`, either
 ``ClusterKopfPeering`` or ``KopfPeering`` will be used automatically.
 
-If the peering object does not exist, the operator will freeze at start.
+If the peering object does not exist, the operator will pause at start.
 Using :option:`--peering` assumes that the peering is mandatory.
 
 Please note that in the startup handler, this is not exactly the same:
 the mandatory mode must be set explicitly. Otherwise, the operator will try
-to auto-detect the presence of the custom peering object, but will not freeze
+to auto-detect the presence of the custom peering object, but will not pause
 if it is absent -- unlike with the ``--peering=`` CLI option.
The operators from different peering objects do not see each other. @@ -139,7 +139,7 @@ Or: def configure(settings: kopf.OperatorSettings, **_): settings.peering.standalone = True -In that case, the operator will not freeze if other operators with +In that case, the operator will not pause if other operators with the higher priority will start handling the objects, which may lead to the conflicting changes and reactions from multiple operators for the same events. @@ -164,7 +164,7 @@ will stop until the operator's pod is restarted (and if restarted at all). To start multiple operator pods, they must be distinctly prioritised. In that case, only one operator will be active --- the one with the highest -priority. All other operators will freeze and wait until this operator dies. +priority. All other operators will pause and wait until this operator exits. Once it dies, the second highest priority operator will come into play. And so on. @@ -200,7 +200,7 @@ Stealth keep-alives Every few seconds (60 by default), the operator will send a keep-alive update to the chosen peering, showing that it is still functioning. Other operators -will notice that and make decisions on their freezing or resuming. +will notice that and make decisions on their pausing or resuming. The operator also logs a keep-alive activity to its own logs. This can be distracting. To disable: diff --git a/examples/06-peering/README.md b/examples/06-peering/README.md index e58cc105..3f5fe8c4 100644 --- a/examples/06-peering/README.md +++ b/examples/06-peering/README.md @@ -6,7 +6,7 @@ their liveness and the priorities, and cooperate to avoid the undesired side-effects (e.g., duplicated children creation, infinite cross-changes). The main use-case for this is the development mode: when a developer starts -an operator on their workstation, all the deployed operators should freeze +an operator on their workstation, all the deployed operators should pause and stop processing of the objects, until the developer's operator exits. In shell A, start an operator: @@ -38,18 +38,18 @@ Now, stop the operator B wtih Ctrl+C (twice), and start it with `--dev` option kopf run example.py --verbose --dev ``` -Observe how the operator A freezes and lets +Observe how the operator A pauses and lets operator B to take control over the objects. ``` -[2019-02-05 20:43:40,360] kopf.peering [INFO ] Freezing operations in favour of [Peer(54e7054f28d948c4985db79410c9ef4a, priority=666, lastseen=2019-02-05 19:43:40.166561, lifetime=0:01:00)]. +[2019-02-05 20:43:40,360] kopf.peering [INFO ] Pausing operations in favour of [Peer(54e7054f28d948c4985db79410c9ef4a, priority=666, lastseen=2019-02-05 19:43:40.166561, lifetime=0:01:00)]. ``` Stop the operator B again with Ctrl+C (twice). The operator A resumes its operations: ``` -[2019-02-05 20:44:54,311] kopf.peering [INFO ] Resuming operations after the freeze. +[2019-02-05 20:44:54,311] kopf.peering [INFO ] Resuming operations after the pause. ``` The same can be achieved with the explicit CLI commands: @@ -60,6 +60,6 @@ kopf resume ``` ``` -[2019-02-05 20:45:34,354] kopf.peering [INFO ] Freezing operations in favour of [Peer(manual, priority=100, lastseen=2019-02-05 19:45:34.226070, lifetime=0:01:00)]. -[2019-02-05 20:45:49,427] kopf.peering [INFO ] Resuming operations after the freeze. +[2019-02-05 20:45:34,354] kopf.peering [INFO ] Pausing operations in favour of [Peer(manual, priority=100, lastseen=2019-02-05 19:45:34.226070, lifetime=0:01:00)]. 
+[2019-02-05 20:45:49,427] kopf.peering [INFO ] Resuming operations after the pause.
 ```
diff --git a/kopf/cli.py b/kopf/cli.py
index d922630a..891226b6 100644
--- a/kopf/cli.py
+++ b/kopf/cli.py
@@ -128,7 +128,7 @@ def freeze(
         peering_name: str,
         priority: int,
 ) -> None:
-    """ Freeze the resource handling in the cluster. """
+    """ Pause the resource handling in the operator(s). """
     identity = peering.Identity(id) if id else peering.detect_own_id(manual=True)
     insights = references.Insights()
     settings = configuration.OperatorSettings()
@@ -159,7 +159,7 @@ def resume(
         clusterwide: bool,
         peering_name: str,
 ) -> None:
-    """ Resume the resource handling in the cluster. """
+    """ Resume the resource handling in the operator(s). """
     identity = peering.Identity(id) if id else peering.detect_own_id(manual=True)
     insights = references.Insights()
     settings = configuration.OperatorSettings()
diff --git a/kopf/clients/watching.py b/kopf/clients/watching.py
index c83e95c2..d7893e69 100644
--- a/kopf/clients/watching.py
+++ b/kopf/clients/watching.py
@@ -44,7 +44,7 @@ async def infinite_watch(
         settings: configuration.OperatorSettings,
         resource: references.Resource,
         namespace: references.Namespace,
-        freeze_checker: Optional[primitives.ToggleSet] = None,  # None for tests & observation
+        operator_paused: Optional[primitives.ToggleSet] = None,  # None for tests & observation
         _iterations: Optional[int] = None,  # used in tests/mocks/fixtures
 ) -> AsyncIterator[bodies.RawEvent]:
     """
@@ -57,7 +57,7 @@ async def infinite_watch(
     a new one is recreated, and the stream continues.
     It only exits with unrecoverable exceptions.
     """
-    how = ' (frozen)' if freeze_checker is not None and freeze_checker.is_on() else ''
+    how = ' (paused)' if operator_paused is not None and operator_paused.is_on() else ''
     where = f'in {namespace!r}' if namespace is not None else 'cluster-wide'
     logger.debug(f"Starting the watch-stream for {resource} {where}{how}.")
     try:
@@ -66,13 +66,13 @@ async def infinite_watch(
             async with streaming_block(
                     namespace=namespace,
                     resource=resource,
-                    freeze_checker=freeze_checker,
-            ) as freeze_waiter:
+                    operator_paused=operator_paused,
+            ) as operator_pause_waiter:
                 stream = continuous_watch(
                     settings=settings,
                     resource=resource,
                     namespace=namespace,
-                    freeze_waiter=freeze_waiter,
+                    operator_pause_waiter=operator_pause_waiter,
                 )
                 async for raw_event in stream:
                     yield raw_event
@@ -86,58 +86,58 @@ async def streaming_block(
         *,
         resource: references.Resource,
         namespace: references.Namespace,
-        freeze_checker: Optional[primitives.ToggleSet],  # None for tests & observation
+        operator_paused: Optional[primitives.ToggleSet] = None,  # None for tests & observation
 ) -> AsyncIterator[aiotasks.Future]:
     """
-    Block the execution until the freeze is off; signal when it is on again.
+    Block the execution until un-paused; signal when paused again.
 
-    This prevents both watching and listing while the freeze mode is on,
+    This prevents both watching and listing while the operator is paused,
     until it is off. Specifically, the watch-stream closes its connection
-    once the freeze mode is on, so the while-true & for-event-in-stream cycles
-    exit, and the streaming coroutine is started again by `infinite_stream()`
-    (the watcher timeout is swallowed by the freeze time).
+    once paused, so the while-true & for-event-in-stream cycles exit,
+    and the streaming coroutine is started again by `infinite_watch()`
+    (the watcher timeout is swallowed by the pause time).
-    Returns a future (or a task) that is set when the freeze is turned on again.
+    Returns a future (or a task) that is set (or finished) when paused again.
 
     A stop-future is a client-specific way of terminating the streaming HTTPS
-    connections when the freeze is turned back on. The low-level streaming API
-    call attaches its `response.close()` to the future's "done" callback,
-    so that the stream is closed once the freeze is turned back on.
+    connections when paused again. The low-level streaming API call attaches
+    its `response.close()` to the future's "done" callback,
+    so that the stream is closed once the operator is paused.
 
     Note: this routine belongs to watching and does not belong to peering.
-    The freeze can be managed in any other ways: as an imaginary edge case,
-    imagine a operator with UI with a "pause" button that freezes the operator.
+    The pause can be managed in other ways too: as an imaginary edge case,
+    imagine an operator with a UI "pause" button that pauses the operator.
     """
     where = f'in {namespace!r}' if namespace is not None else 'cluster-wide'
 
-    # Block until unfrozen before even starting the API communication.
-    if freeze_checker is not None and freeze_checker.is_on():
-        names = {toggle.name for toggle in freeze_checker if toggle.is_on() and toggle.name}
-        freezing_reason = f" (blockers: {', '.join(names)})" if names else ""
-        logger.debug(f"Freezing the watch-stream for {resource} {where}{freezing_reason}.")
+    # Block until unpaused before even starting the API communication.
+    if operator_paused is not None and operator_paused.is_on():
+        names = {toggle.name for toggle in operator_paused if toggle.is_on() and toggle.name}
+        pause_reason = f" (blockers: {', '.join(names)})" if names else ""
+        logger.debug(f"Pausing the watch-stream for {resource} {where}{pause_reason}.")
 
-        await freeze_checker.wait_for(False)
+        await operator_paused.wait_for(False)
 
-        names = {toggle.name for toggle in freeze_checker if toggle.is_on() and toggle.name}
+        names = {toggle.name for toggle in operator_paused if toggle.is_on() and toggle.name}
         resuming_reason = f" (resolved: {', '.join(names)})" if names else ""
         logger.debug(f"Resuming the watch-stream for {resource} {where}{resuming_reason}.")
 
-    # Create the signalling future that the freeze is on again.
-    freeze_waiter: aiotasks.Future
-    if freeze_checker is not None:
-        freeze_waiter = aiotasks.create_task(
-            freeze_checker.wait_for(True),
-            name=f"freeze-waiter for {resource}")
+    # Create the signalling future for when paused again.
+    operator_pause_waiter: aiotasks.Future
+    if operator_paused is not None:
+        operator_pause_waiter = aiotasks.create_task(
+            operator_paused.wait_for(True),
+            name=f"pause-waiter for {resource}")
     else:
-        freeze_waiter = asyncio.Future()  # a dummy just to have it
+        operator_pause_waiter = asyncio.Future()  # a dummy just to have it
 
-    # Go for the streaming with the prepared freezing/unfreezing setup.
+    # Go for the streaming with the prepared pausing/unpausing setup.
try: - yield freeze_waiter + yield operator_pause_waiter finally: with contextlib.suppress(asyncio.CancelledError): - freeze_waiter.cancel() - await freeze_waiter + operator_pause_waiter.cancel() + await operator_pause_waiter async def continuous_watch( @@ -145,7 +145,7 @@ async def continuous_watch( settings: configuration.OperatorSettings, resource: references.Resource, namespace: references.Namespace, - freeze_waiter: aiotasks.Future, + operator_pause_waiter: aiotasks.Future, ) -> AsyncIterator[bodies.RawEvent]: # First, list the resources regularly, and get the list's resource version. @@ -156,7 +156,7 @@ async def continuous_watch( # Repeat through disconnects of the watch as long as the resource version is valid (no errors). # The individual watching API calls are disconnected by timeout even if the stream is fine. - while not freeze_waiter.done(): + while not operator_pause_waiter.done(): # Then, watch the resources starting from the list's resource version. stream = watch_objs( @@ -165,7 +165,7 @@ async def continuous_watch( namespace=namespace, timeout=settings.watching.server_timeout, since=resource_version, - freeze_waiter=freeze_waiter, + operator_pause_waiter=operator_pause_waiter, ) async for raw_input in stream: raw_type = raw_input['type'] @@ -205,7 +205,7 @@ async def watch_objs( timeout: Optional[float] = None, since: Optional[str] = None, context: Optional[auth.APIContext] = None, # injected by the decorator - freeze_waiter: aiotasks.Future, + operator_pause_waiter: aiotasks.Future, ) -> AsyncIterator[bodies.RawInput]: """ Watch objects of a specific resource type. @@ -230,7 +230,7 @@ async def watch_objs( params['timeoutSeconds'] = str(timeout) # Stream the parsed events from the response until it is closed server-side, - # or until it is closed client-side by the freeze-waiting future's callbacks. + # or until it is closed client-side by the pause-waiting future's callbacks. try: response = await context.session.get( url=resource.get_url(server=context.server, namespace=namespace, params=params), @@ -242,14 +242,14 @@ async def watch_objs( await errors.check_response(response) response_close_callback = lambda _: response.close() - freeze_waiter.add_done_callback(response_close_callback) + operator_pause_waiter.add_done_callback(response_close_callback) try: async with response: async for line in _iter_jsonlines(response.content): raw_input = cast(bodies.RawInput, json.loads(line.decode("utf-8"))) yield raw_input finally: - freeze_waiter.remove_done_callback(response_close_callback) + operator_pause_waiter.remove_done_callback(response_close_callback) except (aiohttp.ClientConnectionError, aiohttp.ClientPayloadError, asyncio.TimeoutError): pass diff --git a/kopf/engines/peering.py b/kopf/engines/peering.py index 593dfc44..de4ea310 100644 --- a/kopf/engines/peering.py +++ b/kopf/engines/peering.py @@ -94,17 +94,14 @@ async def process_peering_event( settings: configuration.OperatorSettings, autoclean: bool = True, replenished: asyncio.Event, - freeze_toggle: Optional[primitives.Toggle] = None, # None for tests & observation + conflicts_found: Optional[primitives.Toggle] = None, # None for tests & observation ) -> None: """ Handle a single update of the peers by us or by other operators. - When an operator with a higher priority appears, switch to the freeze-mode. - The these operators disappear or become presumably dead, resume the event handling. 
-
-    The freeze object is passed both to the peers handler to set/clear it,
-    and to all the resource handlers to check its value when the events arrive
-    (see :func:`spawn_tasks`).
+    When an operator with a higher priority appears, pause this operator.
+    When conflicting operators disappear or become presumably dead,
+    resume the event handling in the current operator (un-pause it).
     """
     body: bodies.RawBody = raw_event['object']
     meta: bodies.RawMeta = raw_event['object']['metadata']
@@ -124,21 +121,21 @@ async def process_peering_event(
     if autoclean and dead_peers:
         await clean(peers=dead_peers, settings=settings, resource=resource, namespace=namespace)
 
-    if freeze_toggle is None:
+    if conflicts_found is None:
         pass
     elif prio_peers:
-        if freeze_toggle.is_off():
-            logger.info(f"Freezing operations in favour of {prio_peers}.")
-            await freeze_toggle.turn_to(True)
+        if conflicts_found.is_off():
+            logger.info(f"Pausing operations in favour of {prio_peers}.")
+            await conflicts_found.turn_to(True)
     elif same_peers:
         logger.warning(f"Possibly conflicting operators with the same priority: {same_peers}.")
-        if freeze_toggle.is_off():
-            logger.warning(f"Freezing all operators, including self: {peers}")
-            await freeze_toggle.turn_to(True)
+        if conflicts_found.is_off():
+            logger.warning(f"Pausing all operators, including self: {peers}")
+            await conflicts_found.turn_to(True)
     else:
-        if freeze_toggle.is_on():
-            logger.info(f"Resuming operations after the freeze. Conflicting operators with the same priority are gone.")
-            await freeze_toggle.turn_to(False)
+        if conflicts_found.is_on():
+            logger.info(f"Resuming operations after the pause. Conflicting operators with the same priority are gone.")
+            await conflicts_found.turn_to(False)
 
     # Either wait for external updates (and exit when they arrive), or until the blocking peers
     # are expected to expire, and force the immediate re-evaluation by a certain change of self.
diff --git a/kopf/reactor/daemons.py b/kopf/reactor/daemons.py
index 0c46f4f5..a39a03ac 100644
--- a/kopf/reactor/daemons.py
+++ b/kopf/reactor/daemons.py
@@ -211,7 +211,7 @@ async def daemon_killer(
         *,
         settings: configuration.OperatorSettings,
         memories: containers.ResourceMemories,
-        freeze_checker: primitives.ToggleSet,
+        operator_paused: primitives.ToggleSet,
 ) -> None:
     """
     An operator's root task to kill the daemons on the operator's demand.
@@ -230,7 +230,7 @@ async def daemon_killer(
     There are never 2 instances of the same daemon running in parallel.
 
     In normal cases (enough time is given to stop), this is usually done
-    by the post-freeze re-listing event. In rare cases when the unfreeze
+    by the post-pause re-listing event. In rare cases when the un-pausing
     happens faster than the daemon is stopped (highly unlikely to happen),
     that event can be missed because the daemon is being stopped yet,
     so the respawn can happen with a significant delay.
@@ -244,8 +244,8 @@ async def daemon_killer(
     try:
         while True:
 
-            # Stay here while the operator is running normally, until it is frozen.
-            await freeze_checker.wait_for(True)
+            # Stay here while the operator is running normally, until it is paused.
+            await operator_paused.wait_for(True)
 
             # The stopping tasks are "fire-and-forget" -- we do not get (or care of) the result.
             # The daemons remain resumable, since they exit not on their own accord.
@@ -256,9 +256,9 @@ async def daemon_killer(
                             daemon=daemon,
                             reason=primitives.DaemonStoppingReason.OPERATOR_PAUSING))
 
-            # Stay here while the operator is frozen, until it is resumed.
+            # Stay here while the operator is paused, until it is resumed.
             # The fresh stream of watch-events will spawn new daemons naturally.
-            await freeze_checker.wait_for(False)
+            await operator_paused.wait_for(False)
 
     # Terminate all running daemons when the operator exits (and this task is cancelled).
     finally:
@@ -378,7 +378,7 @@ async def _runner(
     finally:
         # Prevent future re-spawns for those exited on their own, for no reason.
-        # Only the filter-mismatching or peering-frozen daemons can be re-spawned.
+        # Only the filter-mismatching or peering-paused daemons can be re-spawned.
         if stopper.reason == primitives.DaemonStoppingReason.NONE:
             memory.forever_stopped.add(handler.id)
 
diff --git a/kopf/reactor/orchestration.py b/kopf/reactor/orchestration.py
index 9a20a31f..cff532bd 100644
--- a/kopf/reactor/orchestration.py
+++ b/kopf/reactor/orchestration.py
@@ -45,10 +45,10 @@ class EnsembleKey(NamedTuple):
 
 @dataclasses.dataclass
 class Ensemble:
 
-    # Multidimentional freeze: for every namespace, and a few for the whole cluster (for CRDs).
-    freeze_checker: primitives.ToggleSet
-    freeze_blocker: primitives.Toggle
-    freeze_toggles: Dict[EnsembleKey, primitives.Toggle] = dataclasses.field(default_factory=dict)
+    # Multidimensional pausing: for every namespace, and a few for the whole cluster (for CRDs).
+    operator_paused: primitives.ToggleSet
+    peering_missing: primitives.Toggle
+    conflicts_found: Dict[EnsembleKey, primitives.Toggle] = dataclasses.field(default_factory=dict)
 
     # Multidimensional tasks -- one for every combination of relevant dimensions.
     watcher_tasks: Dict[EnsembleKey, aiotasks.Task] = dataclasses.field(default_factory=dict)
@@ -59,15 +59,15 @@ def get_keys(self) -> Collection[EnsembleKey]:
         return (frozenset(self.watcher_tasks) |
                 frozenset(self.peering_tasks) |
                 frozenset(self.pinging_tasks) |
-                frozenset(self.freeze_toggles))
+                frozenset(self.conflicts_found))
 
     def get_tasks(self, keys: Container[EnsembleKey]) -> Collection[aiotasks.Task]:
         return {task
                 for tasks in [self.watcher_tasks, self.peering_tasks, self.pinging_tasks]
                 for key, task in tasks.items()
                 if key in keys}
 
-    def get_toggles(self, keys: Container[EnsembleKey]) -> Collection[primitives.Toggle]:
-        return {toggle for key, toggle in self.freeze_toggles.items() if key in keys}
+    def get_flags(self, keys: Container[EnsembleKey]) -> Collection[primitives.Toggle]:
+        return {toggle for key, toggle in self.conflicts_found.items() if key in keys}
 
     def del_keys(self, keys: Container[EnsembleKey]) -> None:
         d: MutableMapping[EnsembleKey, Any]
@@ -75,7 +75,7 @@ def del_keys(self, keys: Container[EnsembleKey]) -> None:
             for key in set(d):
                 if key in keys:
                     del d[key]
-        for d in [self.freeze_toggles]:  # separated for easier type inferrence
+        for d in [self.conflicts_found]:  # separated for easier type inference
             for key in set(d):
                 if key in keys:
                     del d[key]
@@ -87,10 +87,10 @@ async def ochestrator(
         settings: configuration.OperatorSettings,
         identity: peering.Identity,
         insights: references.Insights,
-        freeze_checker: primitives.ToggleSet,
+        operator_paused: primitives.ToggleSet,
 ) -> None:
-    freeze_blocker = await freeze_checker.make_toggle(name='peering CRD is absent')
-    ensemble = Ensemble(freeze_blocker=freeze_blocker, freeze_checker=freeze_checker)
+    peering_missing = await operator_paused.make_toggle(name='peering CRD is missing')
+    ensemble = Ensemble(peering_missing=peering_missing, operator_paused=operator_paused)
     try:
         async with insights.revised:
             while True:
@@ -122,9 +122,9 @@ async def adjust_tasks(
     peering_resource =
insights.backbone.get(peering_selector) if peering_selector else None peering_resources = {peering_resource} if peering_resource is not None else set() - # Freeze or resume all streams if the peering CRDs are absent but required. - # Ignore the CRD absence in auto-detection mode: freeze only when (and if) the CRDs are added. - await ensemble.freeze_blocker.turn_to(settings.peering.mandatory and not peering_resources) + # Pause or resume all streams if the peering CRDs are absent but required. + # Ignore the CRD absence in auto-detection mode: pause only when (and if) the CRDs are added. + await ensemble.peering_missing.turn_to(settings.peering.mandatory and not peering_resources) # Stop & start the tasks to match the task matrix with the cluster insights. # As a rule of thumb, stop the tasks first, start later -- not vice versa! @@ -155,9 +155,9 @@ async def terminate_redundancies( if key.namespace not in remaining_namespaces or key.resource not in remaining_resources} redundant_tasks = ensemble.get_tasks(redundant_keys) - redundant_toggles = ensemble.get_toggles(redundant_keys) + redundant_flags = ensemble.get_flags(redundant_keys) await aiotasks.stop(redundant_tasks, title="streaming", logger=logger, interval=10, quiet=True) - await ensemble.freeze_checker.drop_toggles(redundant_toggles) + await ensemble.operator_paused.drop_toggles(redundant_flags) ensemble.del_keys(redundant_keys) @@ -173,9 +173,9 @@ async def spawn_missing_peerings( dkey = EnsembleKey(resource=resource, namespace=namespace) if dkey not in ensemble.peering_tasks: what = f"{settings.peering.name}@{namespace}" - is_pre_frozen = settings.peering.mandatory - freeze_toggle = await ensemble.freeze_checker.make_toggle(is_pre_frozen, name=what) - ensemble.freeze_toggles[dkey] = freeze_toggle + is_preactivated = settings.peering.mandatory + conflicts_found = await ensemble.operator_paused.make_toggle(is_preactivated, name=what) + ensemble.conflicts_found[dkey] = conflicts_found ensemble.pinging_tasks[dkey] = aiotasks.create_guarded_task( name=f"peering keep-alive for {what}", logger=logger, cancellable=True, coro=peering.keepalive( @@ -190,7 +190,7 @@ async def spawn_missing_peerings( resource=resource, namespace=namespace, processor=functools.partial(peering.process_peering_event, - freeze_toggle=freeze_toggle, + conflicts_found=conflicts_found, namespace=namespace, resource=resource, settings=settings, @@ -216,7 +216,7 @@ async def spawn_missing_watchers( ensemble.watcher_tasks[dkey] = aiotasks.create_guarded_task( name=f"watcher for {what}", logger=logger, cancellable=True, coro=queueing.watcher( - freeze_checker=ensemble.freeze_checker, + operator_paused=ensemble.operator_paused, settings=settings, resource=resource, namespace=namespace, diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index ed4e5bc8..44e4dd64 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -129,7 +129,7 @@ async def watcher( settings: configuration.OperatorSettings, resource: references.Resource, processor: WatchStreamProcessor, - freeze_checker: Optional[primitives.ToggleSet] = None, # None for tests & observation + operator_paused: Optional[primitives.ToggleSet] = None, # None for tests & observation ) -> None: """ The watchers watches for the resource events via the API, and spawns the workers for every object. 
@@ -172,7 +172,7 @@ def exception_handler(scheduler: aiojobs.Scheduler, context: _aiojobs_Context) - stream = watching.infinite_watch( settings=settings, resource=resource, namespace=namespace, - freeze_checker=freeze_checker, + operator_paused=operator_paused, ) async for raw_event in stream: key: ObjectRef = (resource, get_uid(raw_event)) diff --git a/kopf/reactor/running.py b/kopf/reactor/running.py index 3c0b33f0..2d5cf594 100644 --- a/kopf/reactor/running.py +++ b/kopf/reactor/running.py @@ -165,7 +165,7 @@ async def spawn_tasks( FutureWarning) clusterwide = True - # The freezer and the registry are scoped to this whole task-set, to sync them all. + # All tasks of the operator are synced via these primitives and structures: lifecycle = lifecycle if lifecycle is not None else lifecycles.get_default_lifecycle() registry = registry if registry is not None else registries.get_default_registry() settings = settings if settings is not None else configuration.OperatorSettings() @@ -177,7 +177,7 @@ async def spawn_tasks( event_queue: posting.K8sEventQueue = asyncio.Queue() signal_flag: aiotasks.Future = asyncio.Future() started_flag: asyncio.Event = asyncio.Event() - freeze_checker = primitives.ToggleSet() + operator_paused = primitives.ToggleSet() tasks: MutableSequence[aiotasks.Task] = [] # Map kwargs into the settings object. @@ -225,7 +225,7 @@ async def spawn_tasks( coro=daemons.daemon_killer( settings=settings, memories=memories, - freeze_checker=freeze_checker))) + operator_paused=operator_paused))) # Keeping the credentials fresh and valid via the authentication handlers on demand. tasks.append(aiotasks.create_guarded_task( @@ -283,7 +283,7 @@ async def spawn_tasks( settings=settings, insights=insights, identity=identity, - freeze_checker=freeze_checker, + operator_paused=operator_paused, processor=functools.partial(processing.process_resource_event, lifecycle=lifecycle, registry=registry, diff --git a/kopf/structs/configuration.py b/kopf/structs/configuration.py index 6ac1a7f6..6329a6f7 100644 --- a/kopf/structs/configuration.py +++ b/kopf/structs/configuration.py @@ -99,14 +99,14 @@ class PeeringSettings: Note that some occasions are logged unconditionally: - * those affecting the operator's behaviour, such as freezes/resumes; + * those affecting the operator's behaviour, such as pauses/resumes; * those requiring human intervention, such as absence of a peering object in the auto-detection mode (to make the peering mandatory or standalone). """ priority: int = 0 """ - The operator's priority to use. The operators with lower priority freeze + The operator's priority to use. The operators with lower priority pause when they see operators with higher or the same priority -- to avoid double-processing and double-handling of the resources. """ @@ -115,7 +115,7 @@ class PeeringSettings: """ For how long (in seconds) the operator's record is considered actual by other operators before assuming that the corresponding operator - is not functioning and the freeze mode should be re-evaluated. + is not functioning and the paused state should be re-evaluated. The peered operators will update their records as long as they are running, slightly faster than their records expires (5-10 seconds earlier). diff --git a/kopf/structs/primitives.py b/kopf/structs/primitives.py index 29eba97b..f7e91957 100644 --- a/kopf/structs/primitives.py +++ b/kopf/structs/primitives.py @@ -91,7 +91,7 @@ class Toggle: But these events cannot be awaited until cleared. 
The bi-directional toggles are needed in some places in the code, such as
-    in the population/depletion of a `Vault`, or as an operator's freeze-mode.
+    in the population/depletion of a `Vault`, or as an operator's pause.
 
     The optional name is used only for hinting in reprs. It can be used when
     there are many toggles, and they need to be distinguished somehow.
@@ -153,7 +153,7 @@ class ToggleSet(Collection[Toggle]):
 
     The multi-toggle is used mostly in peering, where every individual peering
     identified by name and namespace has its own individual toggle to manage,
-    but the whole set of toggles of all names & namespaces is used for freezing
+    but the whole set of toggles of all names & namespaces is used for pausing
     the operators as one single logical toggle.
 
     Note: the set can only contain toggles that were produced by the set;
diff --git a/tests/handling/daemons/conftest.py b/tests/handling/daemons/conftest.py
index 22b18127..92b0de21 100644
--- a/tests/handling/daemons/conftest.py
+++ b/tests/handling/daemons/conftest.py
@@ -79,22 +79,22 @@ async def _simulate_cycle(event_object: RawBody):
 
 
 @pytest.fixture()
-async def freeze_checker():
+async def operator_paused():
     return ToggleSet()
 
 
 @pytest.fixture()
-async def freeze_toggle(freeze_checker: ToggleSet):
-    return await freeze_checker.make_toggle(name="freeze_toggle fixture")
+async def conflicts_found(operator_paused: ToggleSet):
+    return await operator_paused.make_toggle(name="conflicts_found fixture")
 
 
 @pytest.fixture()
-async def background_daemon_killer(settings, memories, freeze_checker):
+async def background_daemon_killer(settings, memories, operator_paused):
     """
     Run the daemon killer in the background.
     """
     task = asyncio.create_task(daemon_killer(
-        settings=settings, memories=memories, freeze_checker=freeze_checker))
+        settings=settings, memories=memories, operator_paused=operator_paused))
 
     yield
     task.cancel()
     try:
diff --git a/tests/handling/daemons/test_daemon_termination.py b/tests/handling/daemons/test_daemon_termination.py
index 3ec39dea..22f360a2 100644
--- a/tests/handling/daemons/test_daemon_termination.py
+++ b/tests/handling/daemons/test_daemon_termination.py
@@ -6,7 +6,7 @@
 import kopf
 
 
-async def test_daemon_exits_gracefully_and_instantly_via_stopper(
+async def test_daemon_exits_gracefully_and_instantly_on_termination_request(
         settings, resource, dummy, simulate_cycle,
         caplog, assert_logs, k8s_mocked, frozen_time, mocker, timer):
     caplog.set_level(logging.DEBUG)
@@ -40,8 +40,8 @@ async def fn(**kwargs):
 
 
 @pytest.mark.usefixtures('background_daemon_killer')
-async def test_daemon_exits_gracefully_and_instantly_via_peering_freeze(
-        settings, memories, resource, dummy, simulate_cycle, freeze_toggle,
+async def test_daemon_exits_gracefully_and_instantly_on_operator_pausing(
+        settings, memories, resource, dummy, simulate_cycle, conflicts_found,
         caplog, assert_logs, k8s_mocked, frozen_time, mocker, timer):
     caplog.set_level(logging.DEBUG)
 
@@ -58,9 +58,9 @@ async def fn(**kwargs):
     await simulate_cycle(event_object)
     await dummy.steps['called'].wait()
 
-    # 1st stage: trigger termination due to peering freeze.
+    # 1st stage: trigger termination due to the operator's pause.
     mocker.resetall()
-    await freeze_toggle.turn_to(True)
+    await conflicts_found.turn_to(True)
 
     # Check that the daemon has exited near-instantly, with no delays.
with timer: @@ -68,7 +68,7 @@ async def fn(**kwargs): assert timer.seconds < 0.01 # near-instantly # There is no way to test for re-spawning here: it is done by watch-events, - # which are tested by the peering freezes elsewhere (test_daemon_spawning.py). + # which are tested by the paused operators elsewhere (test_daemon_spawning.py). # We only test that it is capable for respawning (not forever-stopped): memory = await memories.recall(event_object) assert not memory.forever_stopped diff --git a/tests/k8s/test_watching_continuously.py b/tests/k8s/test_watching_continuously.py index 1d6e104d..af607476 100644 --- a/tests/k8s/test_watching_continuously.py +++ b/tests/k8s/test_watching_continuously.py @@ -50,7 +50,7 @@ async def test_empty_stream_yields_nothing( async for event in continuous_watch(settings=settings, resource=resource, namespace=namespace, - freeze_waiter=asyncio.Future()): + operator_pause_waiter=asyncio.Future()): events.append(event) assert len(events) == 0 @@ -66,7 +66,7 @@ async def test_event_stream_yields_everything( async for event in continuous_watch(settings=settings, resource=resource, namespace=namespace, - freeze_waiter=asyncio.Future()): + operator_pause_waiter=asyncio.Future()): events.append(event) assert len(events) == 2 @@ -85,7 +85,7 @@ async def test_unknown_event_type_ignored( async for event in continuous_watch(settings=settings, resource=resource, namespace=namespace, - freeze_waiter=asyncio.Future()): + operator_pause_waiter=asyncio.Future()): events.append(event) assert len(events) == 2 @@ -106,7 +106,7 @@ async def test_error_410gone_exits_normally( async for event in continuous_watch(settings=settings, resource=resource, namespace=namespace, - freeze_waiter=asyncio.Future()): + operator_pause_waiter=asyncio.Future()): events.append(event) assert len(events) == 1 @@ -125,7 +125,7 @@ async def test_unknown_error_raises_exception( async for event in continuous_watch(settings=settings, resource=resource, namespace=namespace, - freeze_waiter=asyncio.Future()): + operator_pause_waiter=asyncio.Future()): events.append(event) assert len(events) == 1 @@ -145,7 +145,7 @@ async def test_exception_escalates( async for event in continuous_watch(settings=settings, resource=resource, namespace=namespace, - freeze_waiter=asyncio.Future()): + operator_pause_waiter=asyncio.Future()): events.append(event) assert len(events) == 0 @@ -167,7 +167,7 @@ async def test_long_line_parsing( async for event in continuous_watch(settings=settings, resource=resource, namespace=namespace, - freeze_waiter=asyncio.Future()): + operator_pause_waiter=asyncio.Future()): events.append(event) assert len(events) == 3 diff --git a/tests/k8s/test_watching_with_freezes.py b/tests/k8s/test_watching_with_freezes.py index dcb4a3e5..007ab35e 100644 --- a/tests/k8s/test_watching_with_freezes.py +++ b/tests/k8s/test_watching_with_freezes.py @@ -5,74 +5,74 @@ import pytest from kopf.clients.watching import streaming_block -from kopf.structs.primitives import Toggle, ToggleSet +from kopf.structs.primitives import ToggleSet -async def test_freezing_is_ignored_if_turned_off( +async def test_pausing_is_ignored_if_turned_off( resource, namespace, timer, caplog, assert_logs): caplog.set_level(logging.DEBUG) - freeze_checker = ToggleSet() - await freeze_checker.make_toggle(False) + operator_paused = ToggleSet() + await operator_paused.make_toggle(False) async with timer, async_timeout.timeout(0.5) as timeout: async with streaming_block( resource=resource, namespace=namespace, - 
freeze_checker=freeze_checker, + operator_paused=operator_paused, ): pass assert not timeout.expired assert timer.seconds < 0.2 # no waits, exits as soon as possible assert_logs([], prohibited=[ - r"Freezing the watch-stream for", + r"Pausing the watch-stream for", r"Resuming the watch-stream for", ]) -async def test_freezing_waits_forever_if_not_resumed( +async def test_pausing_waits_forever_if_not_resumed( resource, namespace, timer, caplog, assert_logs): caplog.set_level(logging.DEBUG) - freeze_checker = ToggleSet() - await freeze_checker.make_toggle(True) + operator_paused = ToggleSet() + await operator_paused.make_toggle(True) with pytest.raises(asyncio.TimeoutError): async with timer, async_timeout.timeout(0.5) as timeout: async with streaming_block( resource=resource, namespace=namespace, - freeze_checker=freeze_checker, + operator_paused=operator_paused, ): pass assert timeout.expired assert timer.seconds >= 0.5 assert_logs([ - r"Freezing the watch-stream for", + r"Pausing the watch-stream for", ], prohibited=[ r"Resuming the watch-stream for", ]) -async def test_freezing_waits_until_resumed( +async def test_pausing_waits_until_resumed( resource, namespace, timer, caplog, assert_logs): caplog.set_level(logging.DEBUG) - freeze_checker = ToggleSet() - freeze_toggle = await freeze_checker.make_toggle(True) + operator_paused = ToggleSet() + conflicts_found = await operator_paused.make_toggle(True) async def delayed_resuming(delay: float): await asyncio.sleep(delay) - await freeze_toggle.turn_to(False) + await conflicts_found.turn_to(False) async with timer, async_timeout.timeout(1.0) as timeout: asyncio.create_task(delayed_resuming(0.2)) async with streaming_block( resource=resource, namespace=namespace, - freeze_checker=freeze_checker, + operator_paused=operator_paused, ): pass @@ -80,6 +80,6 @@ async def delayed_resuming(delay: float): assert timer.seconds >= 0.2 assert timer.seconds <= 0.5 assert_logs([ - r"Freezing the watch-stream for", + r"Pausing the watch-stream for", r"Resuming the watch-stream for", ]) diff --git a/tests/orchestration/test_task_adjustments.py b/tests/orchestration/test_task_adjustments.py index f418c928..a828b77b 100644 --- a/tests/orchestration/test_task_adjustments.py +++ b/tests/orchestration/test_task_adjustments.py @@ -38,9 +38,9 @@ async def insights(settings, peering_resource): @pytest.fixture() async def ensemble(_no_asyncio_pending_tasks): - freeze_checker = primitives.ToggleSet() - freeze_blocker = await freeze_checker.make_toggle() - ensemble = Ensemble(freeze_checker=freeze_checker, freeze_blocker=freeze_blocker) + operator_paused = primitives.ToggleSet() + peering_missing = await operator_paused.make_toggle() + ensemble = Ensemble(operator_paused=operator_paused, peering_missing=peering_missing) try: yield ensemble @@ -62,7 +62,7 @@ async def test_empty_insights_cause_no_adjustments( assert not ensemble.watcher_tasks assert not ensemble.peering_tasks assert not ensemble.pinging_tasks - assert not ensemble.freeze_toggles + assert not ensemble.conflicts_found async def test_new_resources_and_namespaces_spawn_new_tasks( @@ -93,7 +93,7 @@ async def test_new_resources_and_namespaces_spawn_new_tasks( assert set(ensemble.watcher_tasks) == {r1ns1, r1ns2, r2ns1, r2ns2} assert set(ensemble.peering_tasks) == {peer1, peer2} assert set(ensemble.pinging_tasks) == {peer1, peer2} - assert set(ensemble.freeze_toggles) == {peer1, peer2} + assert set(ensemble.conflicts_found) == {peer1, peer2} async def test_gone_resources_and_namespaces_stop_running_tasks( @@ 
-138,7 +138,7 @@ async def test_gone_resources_and_namespaces_stop_running_tasks( assert set(ensemble.watcher_tasks) == {r1ns1} assert set(ensemble.peering_tasks) == {peer1} assert set(ensemble.pinging_tasks) == {peer1} - assert set(ensemble.freeze_toggles) == {peer1} + assert set(ensemble.conflicts_found) == {peer1} assert r1ns2_task.cancelled() assert r2ns1_task.cancelled() assert r2ns2_task.cancelled() @@ -181,7 +181,7 @@ async def test_cluster_tasks_continue_running_on_namespace_deletion( assert set(ensemble.watcher_tasks) == {r1nsN, r2nsN} assert set(ensemble.peering_tasks) == {peerN} assert set(ensemble.pinging_tasks) == {peerN} - assert set(ensemble.freeze_toggles) == {peerN} + assert set(ensemble.conflicts_found) == {peerN} assert not r1nsN_task.cancelled() assert not r2nsN_task.cancelled() assert not r1nsN_task.done() @@ -208,18 +208,18 @@ async def test_no_peering_tasks_with_no_peering_resources( assert ensemble.watcher_tasks assert not ensemble.peering_tasks assert not ensemble.pinging_tasks - assert not ensemble.freeze_toggles + assert not ensemble.conflicts_found -async def test_frozen_with_mandatory_peering_but_absent_peering_resource( +async def test_paused_with_mandatory_peering_but_absent_peering_resource( settings, ensemble: Ensemble): settings.peering.mandatory = True insights = Insights() - await ensemble.freeze_blocker.turn_to(False) # prerequisite - assert ensemble.freeze_blocker.is_off() # prerequisite - assert ensemble.freeze_checker.is_off() # prerequisite + await ensemble.peering_missing.turn_to(False) # prerequisite + assert ensemble.peering_missing.is_off() # prerequisite + assert ensemble.operator_paused.is_off() # prerequisite await adjust_tasks( processor=processor, @@ -229,17 +229,17 @@ async def test_frozen_with_mandatory_peering_but_absent_peering_resource( ensemble=ensemble, ) - assert ensemble.freeze_blocker.is_on() - assert ensemble.freeze_checker.is_on() + assert ensemble.peering_missing.is_on() + assert ensemble.operator_paused.is_on() -async def test_unfrozen_with_mandatory_peering_and_existing_peering_resource( +async def test_unpaused_with_mandatory_peering_and_existing_peering_resource( settings, ensemble: Ensemble, insights: Insights, peering_resource): settings.peering.namespaced = peering_resource.namespaced - await ensemble.freeze_blocker.turn_to(True) # prerequisite - assert ensemble.freeze_blocker.is_on() # prerequisite - assert ensemble.freeze_checker.is_on() # prerequisite + await ensemble.peering_missing.turn_to(True) # prerequisite + assert ensemble.peering_missing.is_on() # prerequisite + assert ensemble.operator_paused.is_on() # prerequisite await adjust_tasks( processor=processor, @@ -249,5 +249,5 @@ async def test_unfrozen_with_mandatory_peering_and_existing_peering_resource( ensemble=ensemble, ) - assert ensemble.freeze_blocker.is_off() - assert ensemble.freeze_checker.is_off() + assert ensemble.peering_missing.is_off() + assert ensemble.operator_paused.is_off() diff --git a/tests/peering/test_freeze_mode.py b/tests/peering/test_freeze_mode.py index 68ecc243..888f6ef9 100644 --- a/tests/peering/test_freeze_mode.py +++ b/tests/peering/test_freeze_mode.py @@ -72,14 +72,14 @@ async def test_toggled_on_for_higher_priority_peer_when_initially_off( settings.peering.name = 'name' settings.peering.priority = 100 - freeze_toggle = primitives.Toggle(False) + conflicts_found = primitives.Toggle(False) k8s_mocked.sleep_or_wait.return_value = 1 # as if interrupted by stream pressure caplog.set_level(0) - assert freeze_toggle.is_off() + assert 
conflicts_found.is_off() await process_peering_event( raw_event=event, - freeze_toggle=freeze_toggle, + conflicts_found=conflicts_found, replenished=asyncio.Event(), autoclean=False, namespace=peering_namespace, @@ -87,14 +87,14 @@ async def test_toggled_on_for_higher_priority_peer_when_initially_off( identity='id', settings=settings, ) - assert freeze_toggle.is_on() + assert conflicts_found.is_on() assert k8s_mocked.sleep_or_wait.call_count == 1 assert 9 < k8s_mocked.sleep_or_wait.call_args[0][0][0] < 10 assert not k8s_mocked.patch_obj.called - assert_logs(["Freezing operations in favour of"], prohibited=[ + assert_logs(["Pausing operations in favour of"], prohibited=[ "Possibly conflicting operators", - "Freezing all operators, including self", - "Resuming operations after the freeze", + "Pausing all operators, including self", + "Resuming operations after the pause", ]) @@ -118,14 +118,14 @@ async def test_ignored_for_higher_priority_peer_when_already_on( settings.peering.name = 'name' settings.peering.priority = 100 - freeze_toggle = primitives.Toggle(True) + conflicts_found = primitives.Toggle(True) k8s_mocked.sleep_or_wait.return_value = 1 # as if interrupted by stream pressure caplog.set_level(0) - assert freeze_toggle.is_on() + assert conflicts_found.is_on() await process_peering_event( raw_event=event, - freeze_toggle=freeze_toggle, + conflicts_found=conflicts_found, replenished=asyncio.Event(), autoclean=False, namespace=peering_namespace, @@ -133,15 +133,15 @@ async def test_ignored_for_higher_priority_peer_when_already_on( identity='id', settings=settings, ) - assert freeze_toggle.is_on() + assert conflicts_found.is_on() assert k8s_mocked.sleep_or_wait.call_count == 1 assert 9 < k8s_mocked.sleep_or_wait.call_args[0][0][0] < 10 assert not k8s_mocked.patch_obj.called assert_logs([], prohibited=[ "Possibly conflicting operators", - "Freezing all operators, including self", - "Freezing operations in favour of", - "Resuming operations after the freeze", + "Pausing all operators, including self", + "Pausing operations in favour of", + "Resuming operations after the pause", ]) @@ -165,14 +165,14 @@ async def test_toggled_off_for_lower_priority_peer_when_initially_on( settings.peering.name = 'name' settings.peering.priority = 100 - freeze_toggle = primitives.Toggle(True) + conflicts_found = primitives.Toggle(True) k8s_mocked.sleep_or_wait.return_value = 1 # as if interrupted by stream pressure caplog.set_level(0) - assert freeze_toggle.is_on() + assert conflicts_found.is_on() await process_peering_event( raw_event=event, - freeze_toggle=freeze_toggle, + conflicts_found=conflicts_found, replenished=asyncio.Event(), autoclean=False, namespace=peering_namespace, @@ -180,14 +180,14 @@ async def test_toggled_off_for_lower_priority_peer_when_initially_on( identity='id', settings=settings, ) - assert freeze_toggle.is_off() + assert conflicts_found.is_off() assert k8s_mocked.sleep_or_wait.call_count == 1 assert k8s_mocked.sleep_or_wait.call_args[0][0] == [] assert not k8s_mocked.patch_obj.called - assert_logs(["Resuming operations after the freeze"], prohibited=[ + assert_logs(["Resuming operations after the pause"], prohibited=[ "Possibly conflicting operators", - "Freezing all operators, including self", - "Freezing operations in favour of", + "Pausing all operators, including self", + "Pausing operations in favour of", ]) @@ -211,14 +211,14 @@ async def test_ignored_for_lower_priority_peer_when_already_off( settings.peering.name = 'name' settings.peering.priority = 100 - freeze_toggle = 
primitives.Toggle(False) + conflicts_found = primitives.Toggle(False) k8s_mocked.sleep_or_wait.return_value = 1 # as if interrupted by stream pressure caplog.set_level(0) - assert freeze_toggle.is_off() + assert conflicts_found.is_off() await process_peering_event( raw_event=event, - freeze_toggle=freeze_toggle, + conflicts_found=conflicts_found, replenished=asyncio.Event(), autoclean=False, namespace=peering_namespace, @@ -226,15 +226,15 @@ async def test_ignored_for_lower_priority_peer_when_already_off( identity='id', settings=settings, ) - assert freeze_toggle.is_off() + assert conflicts_found.is_off() assert k8s_mocked.sleep_or_wait.call_count == 1 assert k8s_mocked.sleep_or_wait.call_args[0][0] == [] assert not k8s_mocked.patch_obj.called assert_logs([], prohibited=[ "Possibly conflicting operators", - "Freezing all operators, including self", - "Freezing operations in favour of", - "Resuming operations after the freeze", + "Pausing all operators, including self", + "Pausing operations in favour of", + "Resuming operations after the pause", ]) @@ -258,14 +258,14 @@ async def test_toggled_on_for_same_priority_peer_when_initially_off( settings.peering.name = 'name' settings.peering.priority = 100 - freeze_toggle = primitives.Toggle(False) + conflicts_found = primitives.Toggle(False) k8s_mocked.sleep_or_wait.return_value = 1 # as if interrupted by stream pressure caplog.set_level(0) - assert freeze_toggle.is_off() + assert conflicts_found.is_off() await process_peering_event( raw_event=event, - freeze_toggle=freeze_toggle, + conflicts_found=conflicts_found, replenished=asyncio.Event(), autoclean=False, namespace=peering_namespace, @@ -273,16 +273,16 @@ async def test_toggled_on_for_same_priority_peer_when_initially_off( identity='id', settings=settings, ) - assert freeze_toggle.is_on() + assert conflicts_found.is_on() assert k8s_mocked.sleep_or_wait.call_count == 1 assert 9 < k8s_mocked.sleep_or_wait.call_args[0][0][0] < 10 assert not k8s_mocked.patch_obj.called assert_logs([ "Possibly conflicting operators", - "Freezing all operators, including self", + "Pausing all operators, including self", ], prohibited=[ - "Freezing operations in favour of", - "Resuming operations after the freeze", + "Pausing operations in favour of", + "Resuming operations after the pause", ]) @@ -306,14 +306,14 @@ async def test_ignored_for_same_priority_peer_when_already_on( settings.peering.name = 'name' settings.peering.priority = 100 - freeze_toggle = primitives.Toggle(True) + conflicts_found = primitives.Toggle(True) k8s_mocked.sleep_or_wait.return_value = 1 # as if interrupted by stream pressure caplog.set_level(0) - assert freeze_toggle.is_on() + assert conflicts_found.is_on() await process_peering_event( raw_event=event, - freeze_toggle=freeze_toggle, + conflicts_found=conflicts_found, replenished=asyncio.Event(), autoclean=False, namespace=peering_namespace, @@ -321,16 +321,16 @@ async def test_ignored_for_same_priority_peer_when_already_on( identity='id', settings=settings, ) - assert freeze_toggle.is_on() + assert conflicts_found.is_on() assert k8s_mocked.sleep_or_wait.call_count == 1 assert 9 < k8s_mocked.sleep_or_wait.call_args[0][0][0] < 10 assert not k8s_mocked.patch_obj.called assert_logs([ "Possibly conflicting operators", ], prohibited=[ - "Freezing all operators, including self", - "Freezing operations in favour of", - "Resuming operations after the freeze", + "Pausing all operators, including self", + "Pausing operations in favour of", + "Resuming operations after the pause", ]) @@ -355,14 
+355,14 @@ async def test_resumes_immediately_on_expiration_of_blocking_peers( settings.peering.name = 'name' settings.peering.priority = 100 - freeze_toggle = primitives.Toggle(True) + conflicts_found = primitives.Toggle(True) k8s_mocked.sleep_or_wait.return_value = None # as if finished sleeping uninterrupted caplog.set_level(0) - assert freeze_toggle.is_on() + assert conflicts_found.is_on() await process_peering_event( raw_event=event, - freeze_toggle=freeze_toggle, + conflicts_found=conflicts_found, replenished=asyncio.Event(), autoclean=False, namespace=peering_namespace, @@ -370,7 +370,7 @@ async def test_resumes_immediately_on_expiration_of_blocking_peers( identity='id', settings=settings, ) - assert freeze_toggle.is_on() + assert conflicts_found.is_on() assert k8s_mocked.sleep_or_wait.call_count == 1 assert 9 < k8s_mocked.sleep_or_wait.call_args[0][0][0] < 10 assert k8s_mocked.patch_obj.called
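
For illustration only (not part of the patches above): the renamed primitives
can be exercised in isolation. The following is a minimal, hypothetical demo
script; it assumes this branch of kopf is importable, and the names `main`,
`watch_stream`, and "demo peering" are invented for the example. It wires a
per-peering `conflicts_found` toggle into an operator-wide `operator_paused`
toggle set, mirroring how `spawn_missing_peerings()` hands a toggle to
`process_peering_event()` and how `streaming_block()` waits on the whole set:

import asyncio
import contextlib

from kopf.structs import primitives


async def main() -> None:
    # The operator-wide pausing state: it is "on" if ANY of its toggles is on.
    operator_paused = primitives.ToggleSet()

    # One toggle per peering, as created by spawn_missing_peerings();
    # it starts in the "off" state, i.e. no conflicts found yet.
    conflicts_found = await operator_paused.make_toggle(name="demo peering")

    async def watch_stream() -> None:
        # A simplified stand-in for streaming_block(): block while paused,
        # pretend to stream watch-events otherwise.
        while True:
            if operator_paused.is_on():
                print("watch-stream paused")
                await operator_paused.wait_for(False)
                print("watch-stream resumed")
            await asyncio.sleep(0.1)  # pretend to consume watch-events

    task = asyncio.create_task(watch_stream())
    await asyncio.sleep(0.3)
    await conflicts_found.turn_to(True)   # as if a higher-priority peer appeared
    await asyncio.sleep(0.3)
    await conflicts_found.turn_to(False)  # as if the conflicting peer is gone
    await asyncio.sleep(0.3)
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task


asyncio.run(main())

Any number of further toggles can join the same set (e.g. `peering_missing`
for a mandatory-but-absent peering CRD), which is the extensibility that the
commit message refers to: the operator stays paused while any of them is on.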