Skip to content

Commit

Permalink
Merge pull request #257 from nolar/freeze-resets
Browse files Browse the repository at this point in the history
Reset watch-streams for the freeze mode, relist on unfreezing
  • Loading branch information
nolar authored Dec 19, 2019
2 parents 60038b5 + 222d5f3 commit d74a063
Show file tree
Hide file tree
Showing 14 changed files with 166 additions and 97 deletions.
85 changes: 71 additions & 14 deletions kopf/clients/watching.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
"""

import asyncio
import collections
import contextlib
import json
import logging
from typing import Optional, Dict, AsyncIterator, cast
from typing import Optional, Dict, AsyncIterator, Any, cast, TYPE_CHECKING

import aiohttp

Expand All @@ -31,12 +31,15 @@
from kopf.clients import discovery
from kopf.clients import fetching
from kopf.structs import bodies
from kopf.structs import primitives
from kopf.structs import resources

logger = logging.getLogger(__name__)

# Pykube declares it inside of a function, not importable from the package/module.
PykubeWatchEvent = collections.namedtuple("WatchEvent", "type object")
if TYPE_CHECKING:
asyncio_Future = asyncio.Future[Any]
else:
asyncio_Future = asyncio.Future


class WatchingError(Exception):
Expand All @@ -49,6 +52,7 @@ async def infinite_watch(
*,
resource: resources.Resource,
namespace: Optional[str],
freeze_mode: Optional[primitives.Toggle] = None,
) -> AsyncIterator[bodies.Event]:
"""
Stream the watch-events infinitely.
Expand All @@ -61,7 +65,12 @@ async def infinite_watch(
It only exits with unrecoverable exceptions.
"""
while True:
async for event in streaming_watch(resource=resource, namespace=namespace):
stream = streaming_watch(
resource=resource,
namespace=namespace,
freeze_mode=freeze_mode,
)
async for event in stream:
yield event
await asyncio.sleep(config.WatchersConfig.watcher_retry_delay)

Expand All @@ -70,10 +79,46 @@ async def streaming_watch(
*,
resource: resources.Resource,
namespace: Optional[str],
freeze_mode: Optional[primitives.Toggle] = None,
) -> AsyncIterator[bodies.Event]:

# Prevent both watching and listing while the freeze mode is on, until it is off.
# Specifically, the watch-stream closes its connection once the freeze mode is on,
# so the while-true & for-event-in-stream cycles exit, and this coroutine is started
# again by `infinite_watch()` (the watcher timeout is swallowed by the freeze time).
if freeze_mode is not None and freeze_mode.is_on():
logger.debug("Freezing the watch-stream for %r", resource)
await freeze_mode.wait_for_off()
logger.debug("Resuming the watch-stream for %r", resource)

# A stop-future is a client-specific way of terminating the streaming HTTPS connection
# when a freeze-mode is turned on. The low-level API call attaches its `response.close()`
# to the future's callbacks, and a background task triggers it when the mode is turned on.
freeze_waiter: asyncio_Future
if freeze_mode is not None:
freeze_waiter = asyncio.create_task(freeze_mode.wait_for_on())
else:
freeze_waiter = asyncio.Future() # a dummy just to have it

try:
stream = continuous_watch(
resource=resource, namespace=namespace,
freeze_waiter=freeze_waiter,
)
async for event in stream:
yield event
finally:
with contextlib.suppress(asyncio.CancelledError):
freeze_waiter.cancel()
await freeze_waiter


async def continuous_watch(
*,
resource: resources.Resource,
namespace: Optional[str],
freeze_waiter: asyncio_Future,
) -> AsyncIterator[bodies.Event]:
"""
Stream the watch-events from one single API watch-call.
"""

# First, list the resources regularly, and get the list's resource version.
# Simulate the events with the event type "None" - used in the detection of causes.
Expand All @@ -83,13 +128,14 @@ async def streaming_watch(

# Repeat through disconnects of the watch as long as the resource version is valid (no errors).
# The individual watching API calls are disconnected by timeout even if the stream is fine.
while True:
while not freeze_waiter.done():

# Then, watch the resources starting from the list's resource version.
stream = watch_objs(
resource=resource, namespace=namespace,
timeout=config.WatchersConfig.default_stream_timeout,
since=resource_version,
freeze_waiter=freeze_waiter,
)
async for event in stream:

Expand All @@ -98,7 +144,7 @@ async def streaming_watch(
# The error occurs when there is nothing happening for a few minutes. This is normal.
if event['type'] == 'ERROR' and cast(bodies.Error, event['object'])['code'] == 410:
logger.debug("Restarting the watch-stream for %r", resource)
return # out of regular stream, to the infinite stream.
return # out of the regular stream, to the infinite stream.

# Other watch errors should be fatal for the operator.
if event['type'] == 'ERROR':
Expand All @@ -125,6 +171,7 @@ async def watch_objs(
timeout: Optional[float] = None,
since: Optional[str] = None,
session: Optional[auth.APISession] = None, # injected by the decorator
freeze_waiter: asyncio_Future,
) -> AsyncIterator[bodies.RawEvent]:
"""
Watch objects of a specific resource type.
Expand All @@ -151,16 +198,26 @@ async def watch_objs(
if timeout is not None:
params['timeoutSeconds'] = str(timeout)

# Talk to the API and initiate a streaming response.
response = await session.get(
url=resource.get_url(server=session.server, namespace=namespace, params=params),
timeout=aiohttp.ClientTimeout(total=None),
)
response.raise_for_status()

async with response:
async for line in _iter_jsonlines(response.content):
event = cast(bodies.RawEvent, json.loads(line.decode("utf-8")))
yield event
# Stream the parsed events from the response until it is closed server-side,
# or until it is closed client-side by the freeze-waiting future's callbacks.
response_close_callback = lambda _: response.close()
freeze_waiter.add_done_callback(response_close_callback)
try:
async with response:
async for line in _iter_jsonlines(response.content):
event = cast(bodies.RawEvent, json.loads(line.decode("utf-8")))
yield event
except (aiohttp.ClientConnectionError, aiohttp.ClientPayloadError):
pass
finally:
freeze_waiter.remove_done_callback(response_close_callback)


async def _iter_jsonlines(
Expand Down
6 changes: 0 additions & 6 deletions kopf/reactor/handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ async def resource_handler(
memories: containers.ResourceMemories,
resource: resources.Resource,
event: bodies.Event,
freeze_mode: primitives.Toggle,
replenished: asyncio.Event,
event_queue: posting.K8sEventQueue,
) -> None:
Expand All @@ -172,11 +171,6 @@ async def resource_handler(
posting.event_queue_loop_var.set(asyncio.get_running_loop())
posting.event_queue_var.set(event_queue) # till the end of this object's task.

# If the global freeze is set for the processing (i.e. other operator overrides), do nothing.
if freeze_mode.is_on():
logger.debug("Ignoring the events due to freeze.")
return

# Recall what is stored about that object. Share it in little portions with the consumers.
# And immediately forget it if the object is deleted from the cluster (but keep in memory).
memory = await memories.recall(body, noticed_by_listing=event['type'] is None)
Expand Down
10 changes: 8 additions & 2 deletions kopf/reactor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
import enum
import logging
import time
from typing import Tuple, Union, MutableMapping, NewType, NamedTuple, TYPE_CHECKING, cast
from typing import Tuple, Union, MutableMapping, NewType, NamedTuple, TYPE_CHECKING, cast, Optional

import aiojobs
from typing_extensions import Protocol

from kopf import config
from kopf.clients import watching
from kopf.structs import bodies
from kopf.structs import primitives
from kopf.structs import resources

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -77,6 +78,7 @@ async def watcher(
namespace: Union[None, str],
resource: resources.Resource,
handler: WatcherCallback,
freeze_mode: Optional[primitives.Toggle] = None,
) -> None:
"""
The watcher watches for the resource events via the API, and spawns the handlers for every object.
Expand All @@ -98,7 +100,11 @@ async def watcher(
try:
# Either use the existing object's queue, or create a new one together with the per-object job.
# "Fire-and-forget": we do not wait for the result; the job destroys itself when it is fully done.
async for event in watching.infinite_watch(resource=resource, namespace=namespace):
stream = watching.infinite_watch(
resource=resource, namespace=namespace,
freeze_mode=freeze_mode,
)
async for event in stream:
key = cast(ObjectRef, (resource, event['object']['metadata']['uid']))
try:
streams[key].replenished.set() # interrupt current sleeps, if any.
Expand Down
4 changes: 2 additions & 2 deletions kopf/reactor/running.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,13 @@ async def spawn_tasks(
coro=queueing.watcher(
namespace=namespace,
resource=resource,
freeze_mode=freeze_mode,
handler=functools.partial(handling.resource_handler,
lifecycle=lifecycle,
registry=registry,
memories=memories,
resource=resource,
event_queue=event_queue,
freeze_mode=freeze_mode)))),
event_queue=event_queue)))),
])

# On Ctrl+C or pod termination, cancel all tasks gracefully.
Expand Down
9 changes: 0 additions & 9 deletions tests/handling/test_cause_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from kopf.structs.containers import ResourceMemories
from kopf.structs.finalizers import FINALIZER
from kopf.structs.lastseen import LAST_SEEN_ANNOTATION
from kopf.structs.primitives import Toggle

EVENT_TYPES = [None, 'ADDED', 'MODIFIED', 'DELETED']

Expand All @@ -28,7 +27,6 @@ async def test_acquire(registry, handlers, resource, cause_mock, event_type,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
freeze_mode=Toggle(),
replenished=asyncio.Event(),
event_queue=event_queue,
)
Expand Down Expand Up @@ -66,7 +64,6 @@ async def test_create(registry, handlers, resource, cause_mock, event_type,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
freeze_mode=Toggle(),
replenished=asyncio.Event(),
event_queue=event_queue,
)
Expand Down Expand Up @@ -108,7 +105,6 @@ async def test_update(registry, handlers, resource, cause_mock, event_type,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
freeze_mode=Toggle(),
replenished=asyncio.Event(),
event_queue=event_queue,
)
Expand Down Expand Up @@ -150,7 +146,6 @@ async def test_delete(registry, handlers, resource, cause_mock, event_type,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
freeze_mode=Toggle(),
replenished=asyncio.Event(),
event_queue=event_queue,
)
Expand Down Expand Up @@ -201,7 +196,6 @@ async def test_release(registry, resource, handlers, cause_mock, event_type,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
freeze_mode=Toggle(),
replenished=asyncio.Event(),
event_queue=event_queue,
)
Expand Down Expand Up @@ -243,7 +237,6 @@ async def test_gone(registry, handlers, resource, cause_mock, event_type,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
freeze_mode=Toggle(),
replenished=asyncio.Event(),
event_queue=event_queue,
)
Expand Down Expand Up @@ -275,7 +268,6 @@ async def test_free(registry, handlers, resource, cause_mock, event_type,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
freeze_mode=Toggle(),
replenished=asyncio.Event(),
event_queue=event_queue,
)
Expand Down Expand Up @@ -307,7 +299,6 @@ async def test_noop(registry, handlers, resource, cause_mock, event_type,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
freeze_mode=Toggle(),
replenished=asyncio.Event(),
event_queue=event_queue,
)
Expand Down
4 changes: 0 additions & 4 deletions tests/handling/test_cause_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from kopf.reactor.causation import ALL_REASONS, HANDLER_REASONS, Reason
from kopf.reactor.handling import resource_handler
from kopf.structs.containers import ResourceMemories
from kopf.structs.primitives import Toggle


@pytest.mark.parametrize('cause_type', ALL_REASONS)
Expand All @@ -22,7 +21,6 @@ async def test_all_logs_are_prefixed(registry, resource, handlers,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
freeze_mode=Toggle(),
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
Expand Down Expand Up @@ -52,7 +50,6 @@ async def test_diffs_logged_if_present(registry, resource, handlers, cause_type,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
freeze_mode=Toggle(),
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
Expand Down Expand Up @@ -82,7 +79,6 @@ async def test_diffs_not_logged_if_absent(registry, resource, handlers, cause_ty
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
freeze_mode=Toggle(),
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
Expand Down
3 changes: 0 additions & 3 deletions tests/handling/test_delays.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from kopf.reactor.states import HandlerState
from kopf.structs.containers import ResourceMemories
from kopf.structs.finalizers import FINALIZER
from kopf.structs.primitives import Toggle


@pytest.mark.parametrize('cause_reason', HANDLER_REASONS)
Expand All @@ -40,7 +39,6 @@ async def test_delayed_handlers_progress(
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
freeze_mode=Toggle(),
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
Expand Down Expand Up @@ -96,7 +94,6 @@ async def test_delayed_handlers_sleep(
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
freeze_mode=Toggle(),
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
Expand Down
Loading

0 comments on commit d74a063

Please sign in to comment.