Reunite the watch-streaming-freezing coroutines #609

Merged
merged 3 commits into from Dec 13, 2020
73 changes: 45 additions & 28 deletions kopf/clients/watching.py
@@ -45,6 +45,7 @@ async def infinite_watch(
resource: resources.Resource,
namespace: Optional[str],
freeze_mode: Optional[primitives.Toggle] = None,
_iterations: Optional[int] = None, # used in tests/mocks/fixtures
) -> AsyncIterator[bodies.RawEvent]:
"""
Stream the watch-events infinitely.
@@ -56,54 +57,70 @@ async def infinite_watch(
a new one is recreated, and the stream continues.
It only exits with unrecoverable exceptions.
"""
while True:
stream = streaming_watch(
settings=settings,
resource=resource,
namespace=namespace,
freeze_mode=freeze_mode,
)
async for raw_event in stream:
yield raw_event
while _iterations is None or _iterations > 0: # equivalent to `while True` in non-test mode.
_iterations = None if _iterations is None else _iterations - 1
async with streaming_block(
namespace=namespace,
resource=resource,
freeze_mode=freeze_mode,
) as freeze_waiter:
stream = continuous_watch(
settings=settings,
resource=resource,
namespace=namespace,
freeze_waiter=freeze_waiter,
)
async for raw_event in stream:
yield raw_event
await asyncio.sleep(settings.watching.reconnect_backoff)


async def streaming_watch(
@contextlib.asynccontextmanager
async def streaming_block(
*,
settings: configuration.OperatorSettings,
resource: resources.Resource,
namespace: Optional[str],
freeze_mode: Optional[primitives.Toggle] = None,
) -> AsyncIterator[bodies.RawEvent]:
freeze_mode: Optional[primitives.Toggle],
) -> AsyncIterator[aiotasks.Future]:
"""
Block the execution until the freeze is off; signal when it is on again.

This prevents both watching and listing while the freeze mode is on,
until it is off. Specifically, the watch-stream closes its connection
once the freeze mode is on, so the while-true & for-event-in-stream cycles
exit, and the streaming coroutine is started again by `infinite_stream()`
(the watcher timeout is swallowed by the freeze time).

Returns a future (or a task) that is set when the freeze is turned on again.

# Prevent both watching and listing while the freeze mode is on, until it is off.
# Specifically, the watch-stream closes its connection once the freeze mode is on,
# so the while-true & for-event-in-stream cycles exit, and this coroutine is started
# again by the `infinite_stream()` (the watcher timeout is swallowed by the freeze time).
A stop-future is a client-specific way of terminating the streaming HTTPS
connections when the freeze is turned back on. The low-level streaming API
call attaches its `response.close()` to the future's "done" callback,
so that the stream is closed once the freeze is turned back on.

Note: this routine belongs to watching and does not belong to peering.
The freeze can be managed in any other ways: as an imaginary edge case,
imagine an operator with a UI with a "pause" button that freezes the operator.
"""

# Block until unfrozen before even starting the API communication.
if freeze_mode is not None and freeze_mode.is_on():
logger.debug("Freezing the watch-stream for %r", resource)
await freeze_mode.wait_for_off()
logger.debug("Resuming the watch-stream for %r", resource)

# A stop-feature is a client-specific way of terminating the streaming HTTPS connection
# when a freeze-mode is turned on. The low-level API call attaches its `response.close()`
# to the future's callbacks, and a background task triggers it when the mode is turned on.
# Create the signalling future that the freeze is on again.
freeze_waiter: aiotasks.Future
if freeze_mode is not None:
freeze_waiter = aiotasks.create_task(
freeze_mode.wait_for_on(),
name=f'freeze-waiter for {resource.name} @ {namespace or "cluster-wide"}')
else:
freeze_waiter = asyncio.Future() # a dummy just ot have it
freeze_waiter = asyncio.Future() # a dummy just to have it

# Go for the streaming with the prepared freezing/unfreezing setup.
try:
stream = continuous_watch(
settings=settings,
resource=resource, namespace=namespace,
freeze_waiter=freeze_waiter,
)
async for raw_event in stream:
yield raw_event
yield freeze_waiter
finally:
with contextlib.suppress(asyncio.CancelledError):
freeze_waiter.cancel()
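
For context, the stop-future mechanism described in the `streaming_block` docstring can be sketched roughly as follows. This is a hypothetical minimal example, not the actual kopf client code: the function name `watch_lines`, the aiohttp usage, and the URL handling are all assumptions.

import asyncio
import aiohttp

async def watch_lines(
        session: aiohttp.ClientSession,
        url: str,
        freeze_waiter: asyncio.Future,
):
    """Yield raw watch-stream lines until the freeze is turned on again."""
    response = await session.get(url, timeout=aiohttp.ClientTimeout(total=None))
    # Once the freeze-waiter future is resolved (i.e. the freeze is on),
    # close the response: the async-for loop below then exits, control
    # returns to the outer while-true cycle, and streaming_block() blocks again.
    callback = lambda _: response.close()
    freeze_waiter.add_done_callback(callback)
    try:
        async for line in response.content:
            yield line
    finally:
        freeze_waiter.remove_done_callback(callback)
        response.close()
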
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -271,7 +271,7 @@ def feed(*args, namespace=None):
# TODO: One day, find a better way to terminate a ``while-true`` reconnection cycle.
def close(*, namespace=None):
"""
A way to stop `streaming_watch` from reconnecting: say it the resource version is gone
A way to stop the stream from reconnecting: say it that the resource version is gone
(we know a priori that it stops on this condition, and escalates to `infinite_stream`).
"""
feed([{'type': 'ERROR', 'object': {'code': 410}}], namespace=namespace)
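
A hypothetical test could drive the reconnection cycle with these helpers roughly like this. This is a sketch under assumptions: the `stream` fixture name, an asyncio-enabled test runner, and the exact event payloads are not from this PR.

import pytest

from kopf.clients.watching import infinite_watch

@pytest.mark.asyncio
async def test_watching_stops_after_error_410(stream, settings, resource, namespace):
    # One regular event, then the ERROR-410 terminator produced by close().
    stream.feed([{'type': 'ADDED', 'object': {}}], namespace=namespace)
    stream.close(namespace=namespace)

    # _iterations=1 limits the outer cycle to a single pass, so the
    # async-for below finishes instead of reconnecting forever.
    events = [event async for event in infinite_watch(
        settings=settings, resource=resource, namespace=namespace, _iterations=1)]

    assert any(event['type'] == 'ADDED' for event in events)
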
8 changes: 8 additions & 0 deletions tests/k8s/conftest.py
@@ -4,3 +4,11 @@
@pytest.fixture(autouse=True)
def _autouse_resp_mocker(resp_mocker, version_api):
pass


@pytest.fixture(params=[
pytest.param('something', id='namespace'),
pytest.param(None, id='cluster'),
])
def namespace(request):
return request.param
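
With this fixture, any test under tests/k8s/ that accepts a namespace argument runs twice: once namespaced and once cluster-wide. A hypothetical usage, not part of this PR (`get_url` and the assertions are assumptions):

def test_url_scoping(resource, namespace):
    # Parametrized: namespace='something' in one run, None (cluster-wide) in the other.
    url = resource.get_url(namespace=namespace)
    if namespace is None:
        assert '/namespaces/' not in url
    else:
        assert f'/namespaces/{namespace}/' in url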