Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Merge pull request #337 from nolar/settings-naming-followup
Browse files Browse the repository at this point in the history
Better naming for timeouts + connection timeout
  • Loading branch information
nolar authored Mar 29, 2020
2 parents 37edf2d + 6f8ca99 commit 842a907
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 34 deletions.
44 changes: 27 additions & 17 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
Configuration
=============

There are tools to configure some of kopf functionality, like asynchronous
tasks behaviour and logging events.
It is possible to fine-tune some aspects of Kopf-based operators,
like timeouts, synchronous handler pool sizes, automatic Kubernetes Event
creation from object-related log messages, etc.


Startup configuration
=====================

Every operator has its settings (even if there are more than one operator
Every operator has its settings (even if there is more than one operator
in the same processes, e.g. due to :doc:`embedding`). The settings affect
how the framework behaves in details.

Expand All @@ -23,8 +24,8 @@ The settings can be modified in the startup handlers (see :doc:`startup`):
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.posting.level = logging.WARNING
settings.watching.session_timeout = 1 * 60
settings.watching.stream_timeout = 10 * 60
settings.watching.connect_timeout = 1 * 60
settings.watching.server_timeout = 10 * 60
All the settings have reasonable defaults, so the configuration should be used
only for fine-tuning when and if necessary.
Expand Down Expand Up @@ -72,8 +73,9 @@ The event-posting can be disabled completely (the default is to be enabled):
Synchronous handlers
====================

``settings.execution`` allows to set a number of synchronous workers used
and redefined the asyncio executor:
``settings.execution`` allows to set the number of synchronous workers used
by the operator for synchronous handlers, or replace the asyncio executor
with another one:

.. code-block:: python
Expand Down Expand Up @@ -106,24 +108,32 @@ API timeouts

Few timeouts can be controlled when communicating with Kubernetes API:

``settings.watching.session_timeout`` (seconds) is how long the session
with a watching request will exist before terminating from the **client** side.
``settings.watching.server_timeout`` (seconds) is how long the session
with a watching request will exist before closing it from the **server** side.
This value is passed to the server side in a query string, and the server
decides on how to follow it. The watch-stream is then gracefully closed.
The default is to use the server setup (``None``).

``settings.watching.client_timeout`` (seconds) is how long the session
with a watching request will exist before closing it from the **client** side.
This includes the connection establishing and event streaming.
The default is forever (``None``).

``settings.watching.stream_timeout`` (seconds) is how long the session
with a watching request will exist before terminating from the **server** side.
The default is to let the server decide (``None``).
``settings.watching.connect_timeout`` (seconds) is how long a connection
can be established before failing. (With current aiohttp-based implementation,
this corresponds to ``sock_connect=`` timeout, not to ``connect=`` timeout,
which would also include the time for getting a connection from the pool.)

It makes no sense to set the client-side timeout shorter than the server side
timeout, but it is given to the developers' responsibility to decide.

The server-side timeouts are unpredictable, they can be in 10 seconds or
in 10 minutes. Yet, it feels wrong to assume any "good" values in a framework
The server-side timeouts are unpredictable, they can be 10 seconds or
10 minutes. Yet, it feels wrong to assume any "good" values in a framework
(especially since it works without timeouts defined, just produces extra logs).

``settings.watching.retry_delay`` (seconds) is for how long to sleep between
``settings.watching.reconnect_backoff`` (seconds) is a backoff interval between
watching requests -- in order to prevent API flooding in case of errors
or disconnects. The default is 0.1 seconds (nearly instant, but no flooding).
or disconnects. The default is 0.1 seconds (nearly instant, but not flooding).

.. code-block:: python
Expand All @@ -132,4 +142,4 @@ or disconnects. The default is 0.1 seconds (nearly instant, but no flooding).
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.watching.stream_timeout = 10 * 60
settings.watching.server_timeout = 10 * 60
2 changes: 1 addition & 1 deletion examples/09-testing/test_example_09.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_resource_lifecycle():

# To prevent lengthy threads in the loop executor when the process exits.
settings = kopf.OperatorSettings()
settings.watching.stream_timeout = 10
settings.watching.server_timeout = 10

# Run an operator and simulate some activity with the operated resource.
with kopf.testing.KopfRunner(['run', '--verbose', '--standalone', example_py],
Expand Down
2 changes: 1 addition & 1 deletion examples/11-filtering-handlers/test_example_11.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_handler_filtering():

# To prevent lengthy threads in the loop executor when the process exits.
settings = kopf.OperatorSettings()
settings.watching.stream_timeout = 10
settings.watching.server_timeout = 10

# Run an operator and simulate some activity with the operated resource.
with kopf.testing.KopfRunner(['run', '--verbose', '--standalone', example_py],
Expand Down
9 changes: 6 additions & 3 deletions kopf/clients/watching.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async def infinite_watch(
)
async for raw_event in stream:
yield raw_event
await asyncio.sleep(settings.watching.retry_delay)
await asyncio.sleep(settings.watching.reconnect_backoff)


async def streaming_watch(
Expand Down Expand Up @@ -139,7 +139,7 @@ async def continuous_watch(
stream = watch_objs(
settings=settings,
resource=resource, namespace=namespace,
timeout=settings.watching.stream_timeout,
timeout=settings.watching.server_timeout,
since=resource_version,
freeze_waiter=freeze_waiter,
)
Expand Down Expand Up @@ -210,7 +210,10 @@ async def watch_objs(
# Talk to the API and initiate a streaming response.
response = await context.session.get(
url=resource.get_url(server=context.server, namespace=namespace, params=params),
timeout=aiohttp.ClientTimeout(total=settings.watching.session_timeout),
timeout=aiohttp.ClientTimeout(
total=settings.watching.client_timeout,
sock_connect=settings.watching.connect_timeout,
),
)
response.raise_for_status()

Expand Down
19 changes: 12 additions & 7 deletions kopf/structs/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,25 @@ class PostingSettings:
@dataclasses.dataclass
class WatchingSettings:

session_timeout: Optional[float] = dataclasses.field(
server_timeout: Optional[float] = dataclasses.field(
default_factory=lambda: config.WatchersConfig.default_stream_timeout)
"""
The maximum duration of one streaming request. Patched in some tests.
If ``None``, then obey the server-side timeouts (they seem to be random).
"""

client_timeout: Optional[float] = dataclasses.field(
default_factory=lambda: config.WatchersConfig.session_timeout)
"""
An HTTP/HTTPS session timeout to use in watch requests.
"""

stream_timeout: Optional[float] = dataclasses.field(
default_factory=lambda: config.WatchersConfig.default_stream_timeout)
connect_timeout: Optional[float] = None
"""
The maximum duration of one streaming request. Patched in some tests.
If ``None``, then obey the server-side timeouts (they seem to be random).
An HTTP/HTTPS connection timeout to use in watch requests.
"""

retry_delay: float = dataclasses.field(
reconnect_backoff: float = dataclasses.field(
default_factory=lambda: config.WatchersConfig.watcher_retry_delay)
"""
How long should a pause be between watch requests (to prevent API flooding).
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def test_all_examples_are_runnable(mocker, settings, with_crd, exampledir, caplo
mocker.patch('kopf.reactor.handling.DEFAULT_RETRY_DELAY', 1)

# To prevent lengthy threads in the loop executor when the process exits.
settings.watching.stream_timeout = 10
settings.watching.server_timeout = 10

# Run an operator and simulate some activity with the operated resource.
with KopfRunner(['run', '--standalone', '--verbose', str(example_py)], timeout=60) as runner:
Expand Down
2 changes: 1 addition & 1 deletion tests/reactor/test_queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ async def test_garbage_collection_of_streams(settings, stream, events, unique, w
settings.batching.idle_timeout = 0.5
settings.batching.batch_window = 0.1
settings.batching.exit_timeout = 0.5
settings.watching.retry_delay = 1.0 # to prevent src depletion
settings.watching.reconnect_backoff = 1.0 # to prevent src depletion

# Inject the events of unique objects - to produce few streams/workers.
stream.feed(events)
Expand Down
7 changes: 4 additions & 3 deletions tests/settings/test_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
async def test_declared_public_interface_and_promised_defaults():
settings = kopf.OperatorSettings()
assert settings.posting.level == logging.INFO
assert settings.watching.retry_delay == 0.1
assert settings.watching.stream_timeout is None
assert settings.watching.session_timeout is None
assert settings.watching.reconnect_backoff == 0.1
assert settings.watching.connect_timeout is None
assert settings.watching.server_timeout is None
assert settings.watching.client_timeout is None
assert settings.batching.worker_limit is None
assert settings.batching.idle_timeout == 5.0
assert settings.batching.exit_timeout == 2.0
Expand Down

0 comments on commit 842a907

Please sign in to comment.