From 84b79ac772e464d0939ff4b8d177fa2be9fedc53 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 26 Mar 2020 09:09:34 +0100 Subject: [PATCH 01/11] Pass a placeholder settings container down the stack where needed --- kopf/__init__.py | 4 ++++ kopf/clients/watching.py | 8 ++++++++ kopf/engines/logging.py | 8 ++++++-- kopf/engines/posting.py | 6 ++++++ kopf/engines/probing.py | 3 +++ kopf/reactor/activities.py | 9 ++++++++- kopf/reactor/causation.py | 2 ++ kopf/reactor/handling.py | 13 ++++++++++++- kopf/reactor/invocation.py | 6 ++++++ kopf/reactor/processing.py | 10 +++++++++- kopf/reactor/queueing.py | 14 ++++++++++++-- kopf/reactor/running.py | 22 ++++++++++++++++++++++ kopf/structs/configuration.py | 33 +++++++++++++++++++++++++++++++++ tests/settings/test_defaults.py | 5 +++++ 14 files changed, 136 insertions(+), 7 deletions(-) create mode 100644 kopf/structs/configuration.py create mode 100644 tests/settings/test_defaults.py diff --git a/kopf/__init__.py b/kopf/__init__.py index 96a3836d..23a3d0b0 100644 --- a/kopf/__init__.py +++ b/kopf/__init__.py @@ -62,6 +62,9 @@ build_object_reference, build_owner_reference, ) +from kopf.structs.configuration import ( + OperatorSettings, +) from kopf.structs.credentials import ( LoginError, ConnectionInfo, @@ -121,4 +124,5 @@ 'get_default_registry', 'set_default_registry', 'PRESENT', 'ABSENT', + 'OperatorSettings', ] diff --git a/kopf/clients/watching.py b/kopf/clients/watching.py index a4bcb713..75512df2 100644 --- a/kopf/clients/watching.py +++ b/kopf/clients/watching.py @@ -31,6 +31,7 @@ from kopf.clients import discovery from kopf.clients import fetching from kopf.structs import bodies +from kopf.structs import configuration from kopf.structs import primitives from kopf.structs import resources @@ -50,6 +51,7 @@ class WatchingError(Exception): async def infinite_watch( *, + settings: configuration.OperatorSettings, resource: resources.Resource, namespace: Optional[str], freeze_mode: Optional[primitives.Toggle] 
= None, @@ -66,6 +68,7 @@ async def infinite_watch( """ while True: stream = streaming_watch( + settings=settings, resource=resource, namespace=namespace, freeze_mode=freeze_mode, @@ -77,6 +80,7 @@ async def infinite_watch( async def streaming_watch( *, + settings: configuration.OperatorSettings, resource: resources.Resource, namespace: Optional[str], freeze_mode: Optional[primitives.Toggle] = None, @@ -102,6 +106,7 @@ async def streaming_watch( try: stream = continuous_watch( + settings=settings, resource=resource, namespace=namespace, freeze_waiter=freeze_waiter, ) @@ -115,6 +120,7 @@ async def streaming_watch( async def continuous_watch( *, + settings: configuration.OperatorSettings, resource: resources.Resource, namespace: Optional[str], freeze_waiter: asyncio_Future, @@ -132,6 +138,7 @@ async def continuous_watch( # Then, watch the resources starting from the list's resource version. stream = watch_objs( + settings=settings, resource=resource, namespace=namespace, timeout=config.WatchersConfig.default_stream_timeout, since=resource_version, @@ -168,6 +175,7 @@ async def continuous_watch( @auth.reauthenticated_stream async def watch_objs( *, + settings: configuration.OperatorSettings, resource: resources.Resource, namespace: Optional[str] = None, timeout: Optional[float] = None, diff --git a/kopf/engines/logging.py b/kopf/engines/logging.py index 8f9aada6..48a7e62e 100644 --- a/kopf/engines/logging.py +++ b/kopf/engines/logging.py @@ -10,11 +10,12 @@ """ import copy import logging -from typing import Tuple, MutableMapping, Any +from typing import Tuple, MutableMapping, Any, Optional from kopf import config from kopf.engines import posting from kopf.structs import bodies +from kopf.structs import configuration class ObjectPrefixingFormatter(logging.Formatter): @@ -42,6 +43,8 @@ def createLock(self) -> None: def filter(self, record: logging.LogRecord) -> bool: # Only those which have a k8s object referred (see: `ObjectLogger`). 
# Otherwise, we have nothing to post, and nothing to do. + settings: Optional[configuration.OperatorSettings] + settings = getattr(record, 'settings', None) level_ok = record.levelno >= config.EventsConfig.events_loglevel has_ref = hasattr(record, 'k8s_ref') skipped = hasattr(record, 'k8s_skip') and getattr(record, 'k8s_skip') @@ -85,8 +88,9 @@ class ObjectLogger(logging.LoggerAdapter): (e.g. in case of background posting via the queue; see `K8sPoster`). """ - def __init__(self, *, body: bodies.Body): + def __init__(self, *, body: bodies.Body, settings: configuration.OperatorSettings) -> None: super().__init__(logger, dict( + settings=settings, k8s_skip=False, k8s_ref=dict( apiVersion=body.get('apiVersion'), diff --git a/kopf/engines/posting.py b/kopf/engines/posting.py index 11c6dbe7..3e1d2cc4 100644 --- a/kopf/engines/posting.py +++ b/kopf/engines/posting.py @@ -22,6 +22,7 @@ from kopf import config from kopf.clients import events from kopf.structs import bodies +from kopf.structs import configuration from kopf.structs import dicts if TYPE_CHECKING: @@ -35,6 +36,11 @@ event_queue_loop_var: ContextVar[asyncio.AbstractEventLoop] = ContextVar('event_queue_loop_var') event_queue_var: ContextVar[K8sEventQueue] = ContextVar('event_queue_var') +# Per-operator container for settings. We only need a log level from there. +# This variable is dedicated to a posting engine, as the call chain is interrupted +# by user-side handlers (no pass-through `settings` arg). 
+settings_var: ContextVar[configuration.OperatorSettings] = ContextVar('settings_var') + class K8sEvent(NamedTuple): """ diff --git a/kopf/engines/probing.py b/kopf/engines/probing.py index 994c9574..97d3843b 100644 --- a/kopf/engines/probing.py +++ b/kopf/engines/probing.py @@ -10,6 +10,7 @@ from kopf.reactor import lifecycles from kopf.reactor import registries from kopf.structs import callbacks +from kopf.structs import configuration from kopf.structs import handlers logger = logging.getLogger(__name__) @@ -24,6 +25,7 @@ async def health_reporter( endpoint: str, *, registry: registries.OperatorRegistry, + settings: configuration.OperatorSettings, ready_flag: Optional[asyncio.Event] = None, # used for testing ) -> None: """ @@ -54,6 +56,7 @@ async def get_health( activity_results = await activities.run_activity( lifecycle=lifecycles.all_at_once, registry=registry, + settings=settings, activity=handlers.Activity.PROBE, ) probing_container.clear() diff --git a/kopf/reactor/activities.py b/kopf/reactor/activities.py index c91ab239..a6acab11 100644 --- a/kopf/reactor/activities.py +++ b/kopf/reactor/activities.py @@ -27,6 +27,7 @@ from kopf.reactor import registries from kopf.reactor import states from kopf.structs import callbacks +from kopf.structs import configuration from kopf.structs import credentials from kopf.structs import handlers as handlers_ @@ -49,6 +50,7 @@ def __init__( async def authenticator( *, registry: registries.OperatorRegistry, + settings: configuration.OperatorSettings, vault: credentials.Vault, ) -> NoReturn: """ Keep the credentials forever up to date. 
""" @@ -56,6 +58,7 @@ async def authenticator( while True: await authenticate( registry=registry, + settings=settings, vault=vault, _activity_title="Re-authentication" if counter else "Initial authentication", ) @@ -65,6 +68,7 @@ async def authenticator( async def authenticate( *, registry: registries.OperatorRegistry, + settings: configuration.OperatorSettings, vault: credentials.Vault, _activity_title: str = "Authentication", ) -> None: @@ -79,6 +83,7 @@ async def authenticate( activity_results = await run_activity( lifecycle=lifecycles.all_at_once, registry=registry, + settings=settings, activity=handlers_.Activity.AUTHENTICATION, ) @@ -96,18 +101,20 @@ async def run_activity( *, lifecycle: lifecycles.LifeCycleFn, registry: registries.OperatorRegistry, + settings: configuration.OperatorSettings, activity: handlers_.Activity, ) -> Mapping[handlers_.HandlerId, callbacks.Result]: logger = logging.getLogger(f'kopf.activities.{activity.value}') # For the activity handlers, we have neither bodies, nor patches, just the state. 
- cause = causation.ActivityCause(logger=logger, activity=activity) + cause = causation.ActivityCause(logger=logger, activity=activity, settings=settings) handlers = registry.activity_handlers.get_handlers(activity=activity) state = states.State.from_scratch(handlers=handlers) outcomes: MutableMapping[handlers_.HandlerId, states.HandlerOutcome] = {} while not state.done: current_outcomes = await handling.execute_handlers_once( lifecycle=lifecycle, + settings=settings, handlers=handlers, cause=cause, state=state, diff --git a/kopf/reactor/causation.py b/kopf/reactor/causation.py index ead0bbac..1d567105 100644 --- a/kopf/reactor/causation.py +++ b/kopf/reactor/causation.py @@ -25,6 +25,7 @@ from typing import Any, Optional, Union, TypeVar from kopf.structs import bodies +from kopf.structs import configuration from kopf.structs import containers from kopf.structs import diffs from kopf.structs import finalizers @@ -42,6 +43,7 @@ class BaseCause: @dataclasses.dataclass class ActivityCause(BaseCause): activity: handlers.Activity + settings: configuration.OperatorSettings @dataclasses.dataclass diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 44c1da08..54e5ef98 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -19,6 +19,7 @@ from kopf.reactor import registries from kopf.reactor import states from kopf.structs import callbacks +from kopf.structs import configuration from kopf.structs import dicts from kopf.structs import diffs from kopf.structs import handlers as handlers_ @@ -61,6 +62,7 @@ class HandlerChildrenRetry(TemporaryError): # Used in `@kopf.on.this` and `kopf.execute()` to add/get the sub-handlers. 
sublifecycle_var: ContextVar[lifecycles.LifeCycleFn] = ContextVar('sublifecycle_var') subregistry_var: ContextVar[registries.ResourceChangingRegistry] = ContextVar('subregistry_var') +subsettings_var: ContextVar[configuration.OperatorSettings] = ContextVar('subsettings_var') subexecuted_var: ContextVar[bool] = ContextVar('subexecuted_var') handler_var: ContextVar[handlers_.BaseHandler] = ContextVar('handler_var') cause_var: ContextVar[causation.BaseCause] = ContextVar('cause_var') @@ -153,10 +155,12 @@ async def execute( "no practical use (there are no retries or state tracking).") # Execute the real handlers (all or few or one of them, as per the lifecycle). + subsettings = subsettings_var.get() subhandlers = subregistry.get_handlers(cause=cause) state = states.State.from_body(body=cause.body, handlers=subhandlers) outcomes = await execute_handlers_once( lifecycle=lifecycle, + settings=subsettings, handlers=subhandlers, cause=cause, state=state, @@ -172,6 +176,7 @@ async def execute( async def execute_handlers_once( lifecycle: lifecycles.LifeCycleFn, + settings: configuration.OperatorSettings, handlers: Collection[handlers_.BaseHandler], cause: causation.BaseCause, state: states.State, @@ -195,6 +200,7 @@ async def execute_handlers_once( outcomes: MutableMapping[handlers_.HandlerId, states.HandlerOutcome] = {} for handler in handlers_plan: outcome = await execute_handler_once( + settings=settings, handler=handler, state=state[handler.id], cause=cause, @@ -207,6 +213,7 @@ async def execute_handlers_once( async def execute_handler_once( + settings: configuration.OperatorSettings, handler: handlers_.BaseHandler, cause: causation.BaseCause, state: states.HandlerState, @@ -229,7 +236,7 @@ async def execute_handler_once( # Prevent successes/failures from posting k8s-events for resource-watching causes. 
logger: Union[logging.Logger, logging.LoggerAdapter] if isinstance(cause, causation.ResourceWatchingCause): - logger = logging_engine.LocalObjectLogger(body=cause.body) + logger = logging_engine.LocalObjectLogger(body=cause.body, settings=settings) else: logger = cause.logger @@ -249,6 +256,7 @@ async def execute_handler_once( retry=state.retries, started=state.started, runtime=state.runtime, + settings=settings, lifecycle=lifecycle, # just a default for the sub-handlers, not used directly. ) @@ -299,6 +307,7 @@ async def invoke_handler( handler: handlers_.BaseHandler, *args: Any, cause: causation.BaseCause, + settings: configuration.OperatorSettings, lifecycle: lifecycles.LifeCycleFn, **kwargs: Any, ) -> Optional[callbacks.Result]: @@ -328,6 +337,7 @@ async def invoke_handler( with invocation.context([ (sublifecycle_var, lifecycle), (subregistry_var, registries.ResourceChangingRegistry()), + (subsettings_var, settings), (subexecuted_var, False), (handler_var, handler), (cause_var, cause), @@ -337,6 +347,7 @@ async def invoke_handler( result = await invocation.invoke( handler.fn, *args, + settings=settings, cause=cause, **kwargs, ) diff --git a/kopf/reactor/invocation.py b/kopf/reactor/invocation.py index 317d04a1..dcff5d03 100644 --- a/kopf/reactor/invocation.py +++ b/kopf/reactor/invocation.py @@ -14,6 +14,7 @@ from kopf import config from kopf.reactor import causation from kopf.structs import callbacks +from kopf.structs import configuration if TYPE_CHECKING: asyncio_Future = asyncio.Future[Any] @@ -59,6 +60,10 @@ def build_kwargs( new_kwargs.update( activity=cause.activity, ) + if isinstance(cause, causation.ActivityCause) and cause.activity == cause.activity.STARTUP: + new_kwargs.update( + settings=cause.settings, + ) if isinstance(cause, causation.ResourceCause): new_kwargs.update( patch=cause.patch, @@ -93,6 +98,7 @@ def build_kwargs( async def invoke( fn: callbacks.BaseFn, *args: Any, + settings: Optional[configuration.OperatorSettings] = None, cause: 
Optional[causation.BaseCause] = None, **kwargs: Any, ) -> Any: diff --git a/kopf/reactor/processing.py b/kopf/reactor/processing.py index 36e59d1f..8cee7f76 100644 --- a/kopf/reactor/processing.py +++ b/kopf/reactor/processing.py @@ -27,6 +27,7 @@ from kopf.reactor import registries from kopf.reactor import states from kopf.structs import bodies +from kopf.structs import configuration from kopf.structs import containers from kopf.structs import finalizers from kopf.structs import handlers as handlers_ @@ -38,6 +39,7 @@ async def process_resource_event( lifecycle: lifecycles.LifeCycleFn, registry: registries.OperatorRegistry, + settings: configuration.OperatorSettings, memories: containers.ResourceMemories, resource: resources.Resource, raw_event: bodies.RawEvent, @@ -64,7 +66,7 @@ async def process_resource_event( delay: Optional[float] = None # Each object has its own prefixed logger, to distinguish parallel handling. - logger = logging_engine.ObjectLogger(body=body) + logger = logging_engine.ObjectLogger(body=body, settings=settings) posting.event_queue_loop_var.set(asyncio.get_running_loop()) posting.event_queue_var.set(event_queue) # till the end of this object's task. 
@@ -87,6 +89,7 @@ async def process_resource_event( await process_resource_watching_cause( lifecycle=lifecycles.all_at_once, registry=registry, + settings=settings, memory=memory, cause=resource_watching_cause, ) @@ -111,6 +114,7 @@ async def process_resource_event( delay = await process_resource_changing_cause( lifecycle=lifecycle, registry=registry, + settings=settings, memory=memory, cause=resource_changing_cause, ) @@ -149,6 +153,7 @@ async def process_resource_event( async def process_resource_watching_cause( lifecycle: lifecycles.LifeCycleFn, registry: registries.OperatorRegistry, + settings: configuration.OperatorSettings, memory: containers.ResourceMemory, cause: causation.ResourceWatchingCause, ) -> None: @@ -165,6 +170,7 @@ async def process_resource_watching_cause( handlers = registry.resource_watching_handlers[cause.resource].get_handlers(cause=cause) outcomes = await handling.execute_handlers_once( lifecycle=lifecycle, + settings=settings, handlers=handlers, cause=cause, state=states.State.from_scratch(handlers=handlers), @@ -178,6 +184,7 @@ async def process_resource_watching_cause( async def process_resource_changing_cause( lifecycle: lifecycles.LifeCycleFn, registry: registries.OperatorRegistry, + settings: configuration.OperatorSettings, memory: containers.ResourceMemory, cause: causation.ResourceChangingCause, ) -> Optional[float]: @@ -217,6 +224,7 @@ async def process_resource_changing_cause( if handlers: outcomes = await handling.execute_handlers_once( lifecycle=lifecycle, + settings=settings, handlers=handlers, cause=cause, state=state, diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index f0c31a89..57ed03ea 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -35,6 +35,7 @@ from kopf import config from kopf.clients import watching from kopf.structs import bodies +from kopf.structs import configuration from kopf.structs import primitives from kopf.structs import resources @@ -76,6 +77,7 @@ class 
Stream(NamedTuple): # TODO: add the label_selector support for the dev-mode? async def watcher( namespace: Union[None, str], + settings: configuration.OperatorSettings, resource: resources.Resource, processor: WatchStreamProcessor, freeze_mode: Optional[primitives.Toggle] = None, @@ -101,6 +103,7 @@ async def watcher( # Either use the existing object's queue, or create a new one together with the per-object job. # "Fire-and-forget": we do not wait for the result; the job destroys itself when it is fully done. stream = watching.infinite_watch( + settings=settings, resource=resource, namespace=namespace, freeze_mode=freeze_mode, ) @@ -113,10 +116,15 @@ async def watcher( streams[key] = Stream(watchevents=asyncio.Queue(), replenished=asyncio.Event()) streams[key].replenished.set() # interrupt current sleeps, if any. await streams[key].watchevents.put(raw_event) - await scheduler.spawn(worker(processor=processor, streams=streams, key=key)) + await scheduler.spawn(worker( + processor=processor, + settings=settings, + streams=streams, + key=key, + )) finally: # Allow the existing workers to finish gracefully before killing them. - await _wait_for_depletion(scheduler=scheduler, streams=streams) + await _wait_for_depletion(scheduler=scheduler, streams=streams, settings=settings) # Forcedly terminate all the fire-and-forget per-object jobs, of they are still running. 
await asyncio.shield(scheduler.close()) @@ -124,6 +132,7 @@ async def watcher( async def worker( processor: WatchStreamProcessor, + settings: configuration.OperatorSettings, streams: Streams, key: ObjectRef, ) -> None: @@ -194,6 +203,7 @@ async def worker( async def _wait_for_depletion( *, scheduler: aiojobs.Scheduler, + settings: configuration.OperatorSettings, streams: Streams, ) -> None: diff --git a/kopf/reactor/running.py b/kopf/reactor/running.py index 7a50ad80..eea57c94 100644 --- a/kopf/reactor/running.py +++ b/kopf/reactor/running.py @@ -16,6 +16,7 @@ from kopf.reactor import processing from kopf.reactor import queueing from kopf.reactor import registries +from kopf.structs import configuration from kopf.structs import containers from kopf.structs import credentials from kopf.structs import handlers @@ -59,9 +60,11 @@ def login( # Perform the initial one-time authentication in presumably the same loop. loop = loop if loop is not None else asyncio.get_event_loop() registry = registries.get_default_registry() + settings = configuration.OperatorSettings() try: loop.run_until_complete(activities.authenticate( registry=registry, + settings=settings, vault=global_vault, )) except asyncio.CancelledError: @@ -80,6 +83,7 @@ def run( loop: Optional[asyncio.AbstractEventLoop] = None, lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.OperatorRegistry] = None, + settings: Optional[configuration.OperatorSettings] = None, memories: Optional[containers.ResourceMemories] = None, standalone: bool = False, priority: int = 0, @@ -100,6 +104,7 @@ def run( loop.run_until_complete(operator( lifecycle=lifecycle, registry=registry, + settings=settings, memories=memories, standalone=standalone, namespace=namespace, @@ -118,6 +123,7 @@ async def operator( *, lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.OperatorRegistry] = None, + settings: Optional[configuration.OperatorSettings] = None, memories: 
Optional[containers.ResourceMemories] = None, standalone: bool = False, priority: int = 0, @@ -140,6 +146,7 @@ async def operator( operator_tasks = await spawn_tasks( lifecycle=lifecycle, registry=registry, + settings=settings, memories=memories, standalone=standalone, namespace=namespace, @@ -157,6 +164,7 @@ async def spawn_tasks( *, lifecycle: Optional[lifecycles.LifeCycleFn] = None, registry: Optional[registries.OperatorRegistry] = None, + settings: Optional[configuration.OperatorSettings] = None, memories: Optional[containers.ResourceMemories] = None, standalone: bool = False, priority: int = 0, @@ -177,6 +185,7 @@ async def spawn_tasks( # The freezer and the registry are scoped to this whole task-set, to sync them all. lifecycle = lifecycle if lifecycle is not None else lifecycles.get_default_lifecycle() registry = registry if registry is not None else registries.get_default_registry() + settings = settings if settings is not None else configuration.OperatorSettings() memories = memories if memories is not None else containers.ResourceMemories() vault = vault if vault is not None else global_vault vault = vault if vault is not None else credentials.Vault() @@ -189,6 +198,10 @@ async def spawn_tasks( # Global credentials store for this operator, also for CRD-reading & peering mode detection. auth.vault_var.set(vault) + # Special case: pass the settings container through the user-side handlers (no explicit args). + # Toolkits have to keep the original operator context somehow, and the only way is contextvars. + posting.settings_var.set(settings) + # Few common background forever-running infrastructural tasks (irregular root tasks). tasks.extend([ loop.create_task(_stop_flag_checker( @@ -199,6 +212,7 @@ async def spawn_tasks( root_tasks=tasks, # used as a "live" view, populated later. ready_flag=ready_flag, registry=registry, + settings=settings, vault=vault, # to purge & finalize the caches in the end. 
)), ]) @@ -209,6 +223,7 @@ async def spawn_tasks( name="credentials retriever", ready_flag=ready_flag, coro=activities.authenticator( registry=registry, + settings=settings, vault=vault))), ]) @@ -228,6 +243,7 @@ async def spawn_tasks( name="health reporter", ready_flag=ready_flag, coro=probing.health_reporter( registry=registry, + settings=settings, endpoint=liveness_endpoint))), ]) @@ -244,6 +260,7 @@ async def spawn_tasks( name="watcher of peering", ready_flag=ready_flag, coro=queueing.watcher( namespace=namespace, + settings=settings, resource=ourselves.resource, processor=functools.partial(peering.process_peering_event, ourselves=ourselves, @@ -257,11 +274,13 @@ async def spawn_tasks( name=f"watcher of {resource.name}", ready_flag=ready_flag, coro=queueing.watcher( namespace=namespace, + settings=settings, resource=resource, freeze_mode=freeze_mode, processor=functools.partial(processing.process_resource_event, lifecycle=lifecycle, registry=registry, + settings=settings, memories=memories, resource=resource, event_queue=event_queue)))), @@ -468,6 +487,7 @@ async def _startup_cleanup_activities( root_tasks: Sequence[asyncio_Task], # mutated externally! 
ready_flag: Optional[primitives.Flag], registry: registries.OperatorRegistry, + settings: configuration.OperatorSettings, vault: credentials.Vault, ) -> None: """ @@ -488,6 +508,7 @@ async def _startup_cleanup_activities( await activities.run_activity( lifecycle=lifecycles.all_at_once, registry=registry, + settings=settings, activity=handlers.Activity.STARTUP, ) except asyncio.CancelledError: @@ -518,6 +539,7 @@ async def _startup_cleanup_activities( await activities.run_activity( lifecycle=lifecycles.all_at_once, registry=registry, + settings=settings, activity=handlers.Activity.CLEANUP, ) await vault.close() diff --git a/kopf/structs/configuration.py b/kopf/structs/configuration.py new file mode 100644 index 00000000..fc8e5446 --- /dev/null +++ b/kopf/structs/configuration.py @@ -0,0 +1,33 @@ +""" +All configuration flags, options, settings to fine-tune an operator. + +All settings are grouped semantically just for convenience +(instead of a flat mega-object with all the values in it). + +The individual groups or settings can eventually be moved or regrouped within +the root object, while keeping the legacy names for backward compatibility. + +.. note:: + + There is a discussion on usage of such words as "configuration", + "preferences", "settings", "options", "properties" in the internet: + + * https://stackoverflow.com/q/2074384/857383 + * https://qr.ae/pNvt40 + * etc. + + In this framework, they are called *"settings"* (plural). + Combined, they form a *"configuration"* (singular). + + Some of the settings are flags, some are scalars, some are optional, + some are not (but all of them have reasonable defaults). + + Regardless of the exact class and module names, all of these terms can be + used interchangeably -- but so that it is understandable what is meant. 
+""" +import dataclasses + + +@dataclasses.dataclass +class OperatorSettings: + pass diff --git a/tests/settings/test_defaults.py b/tests/settings/test_defaults.py new file mode 100644 index 00000000..8c823e09 --- /dev/null +++ b/tests/settings/test_defaults.py @@ -0,0 +1,5 @@ +import kopf + + +async def test_declared_public_interface_and_promised_defaults(): + settings = kopf.OperatorSettings() From 49fef44b46573f4c5e304000393f5b929eda87e0 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 26 Mar 2020 09:24:30 +0100 Subject: [PATCH 02/11] Extend KopfRunner testing toolkit with user-provided settings & registry --- kopf/cli.py | 8 ++++++++ kopf/toolkits/runner.py | 11 ++++++++++- tests/testing/test_runner.py | 15 +++++++++++++++ 3 files changed, 33 insertions(+), 1 deletion(-) diff --git a/kopf/cli.py b/kopf/cli.py index 03377c3a..cdbecb9e 100644 --- a/kopf/cli.py +++ b/kopf/cli.py @@ -7,7 +7,9 @@ from kopf import config from kopf.engines import peering +from kopf.reactor import registries from kopf.reactor import running +from kopf.structs import configuration from kopf.structs import credentials from kopf.structs import primitives from kopf.utilities import loaders @@ -19,6 +21,8 @@ class CLIControls: ready_flag: Optional[primitives.Flag] = None stop_flag: Optional[primitives.Flag] = None vault: Optional[credentials.Vault] = None + registry: Optional[registries.OperatorRegistry] = None + settings: Optional[configuration.OperatorSettings] = None def logging_options(fn: Callable[..., Any]) -> Callable[..., Any]: @@ -64,6 +68,8 @@ def run( liveness_endpoint: Optional[str], ) -> None: """ Start an operator process and handle all the requests. 
""" + if __controls.registry is not None: + registries.set_default_registry(__controls.registry) loaders.preload( paths=paths, modules=modules, @@ -74,6 +80,8 @@ def run( priority=priority, peering_name=peering_name, liveness_endpoint=liveness_endpoint, + registry=__controls.registry, + settings=__controls.settings, stop_flag=__controls.stop_flag, ready_flag=__controls.ready_flag, vault=__controls.vault, diff --git a/kopf/toolkits/runner.py b/kopf/toolkits/runner.py index 06044720..0221701f 100644 --- a/kopf/toolkits/runner.py +++ b/kopf/toolkits/runner.py @@ -9,6 +9,8 @@ from typing_extensions import Literal from kopf import cli +from kopf.reactor import registries +from kopf.structs import configuration _ExcType = BaseException _ExcInfo = Tuple[Type[_ExcType], _ExcType, types.TracebackType] @@ -60,6 +62,8 @@ def __init__( *args: Any, reraise: bool = True, timeout: Optional[float] = None, + registry: Optional[registries.OperatorRegistry] = None, + settings: Optional[configuration.OperatorSettings] = None, **kwargs: Any, ): super().__init__() @@ -67,6 +71,8 @@ def __init__( self.kwargs = kwargs self.reraise = reraise self.timeout = timeout + self.registry = registry + self.settings = settings self._stop = threading.Event() self._ready = threading.Event() # NB: not asyncio.Event! self._thread = threading.Thread(target=self._target) @@ -119,7 +125,10 @@ def _target(self) -> None: # Execute the requested CLI command in the thread & thread's loop. # Remember the result & exception for re-raising in the parent thread. 
try: - ctxobj = cli.CLIControls(stop_flag=self._stop) + ctxobj = cli.CLIControls( + registry=self.registry, + settings=self.settings, + stop_flag=self._stop) runner = click.testing.CliRunner() result = runner.invoke(cli.main, *self.args, **self.kwargs, obj=ctxobj) except BaseException as e: diff --git a/tests/testing/test_runner.py b/tests/testing/test_runner.py index d5a4e652..8e34d469 100644 --- a/tests/testing/test_runner.py +++ b/tests/testing/test_runner.py @@ -1,5 +1,7 @@ import pytest +from kopf.reactor.registries import OperatorRegistry +from kopf.structs.configuration import OperatorSettings from kopf.testing import KopfRunner @@ -22,6 +24,19 @@ def test_command_invocation_works(): # Note: stderr is not captured, it is mixed with stdout. +def test_registry_and_settings_are_propagated(mocker): + operator_mock = mocker.patch('kopf.reactor.running.operator') + registry = OperatorRegistry() + settings = OperatorSettings() + with KopfRunner(['run', '--standalone'], registry=registry, settings=settings) as runner: + pass + assert runner.exit_code == 0 + assert runner.exception is None + assert operator_mock.called + assert operator_mock.call_args[1]['registry'] is registry + assert operator_mock.call_args[1]['settings'] is settings + + def test_exception_from_runner_suppressed_with_no_reraise(): with KopfRunner(['run', 'non-existent.py', '--standalone'], reraise=False) as runner: pass From 1940afafe74f00725de6859d8394212e18ae2de0 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 26 Mar 2020 14:56:51 +0100 Subject: [PATCH 03/11] Switch logging and posting from configs to settings --- kopf/engines/logging.py | 3 +-- kopf/engines/posting.py | 11 +++++++---- kopf/structs/configuration.py | 24 +++++++++++++++++++++++- tests/settings/test_defaults.py | 3 +++ 4 files changed, 34 insertions(+), 7 deletions(-) diff --git a/kopf/engines/logging.py b/kopf/engines/logging.py index 48a7e62e..97e0c8cf 100644 --- a/kopf/engines/logging.py +++ b/kopf/engines/logging.py 
@@ -12,7 +12,6 @@ import logging from typing import Tuple, MutableMapping, Any, Optional -from kopf import config from kopf.engines import posting from kopf.structs import bodies from kopf.structs import configuration @@ -45,7 +44,7 @@ def filter(self, record: logging.LogRecord) -> bool: # Otherwise, we have nothing to post, and nothing to do. settings: Optional[configuration.OperatorSettings] settings = getattr(record, 'settings', None) - level_ok = record.levelno >= config.EventsConfig.events_loglevel + level_ok = settings is not None and record.levelno >= settings.posting.level has_ref = hasattr(record, 'k8s_ref') skipped = hasattr(record, 'k8s_skip') and getattr(record, 'k8s_skip') return level_ok and has_ref and not skipped and super().filter(record) diff --git a/kopf/engines/posting.py b/kopf/engines/posting.py index 3e1d2cc4..6d0abadc 100644 --- a/kopf/engines/posting.py +++ b/kopf/engines/posting.py @@ -15,11 +15,11 @@ This also includes all logging messages posted by the framework itself. 
""" import asyncio +import logging import sys from contextvars import ContextVar from typing import NamedTuple, NoReturn, Optional, Union, Iterator, Iterable, cast, TYPE_CHECKING -from kopf import config from kopf.clients import events from kopf.structs import bodies from kopf.structs import configuration @@ -102,7 +102,8 @@ def info( reason: str, message: str = '', ) -> None: - if config.EventsConfig.events_loglevel <= config.LOGLEVEL_INFO: + settings: configuration.OperatorSettings = settings_var.get() + if settings.posting.level <= logging.INFO: event(obj, type='Normal', reason=reason, message=message) @@ -112,7 +113,8 @@ def warn( reason: str, message: str = '', ) -> None: - if config.EventsConfig.events_loglevel <= config.LOGLEVEL_WARNING: + settings: configuration.OperatorSettings = settings_var.get() + if settings.posting.level <= logging.WARNING: event(obj, type='Warning', reason=reason, message=message) @@ -127,7 +129,8 @@ def exception( _, exc, _ = sys.exc_info() reason = reason if reason else type(exc).__name__ message = f'{message} {exc}' if message and exc else f'{exc}' if exc else f'{message}' - if config.EventsConfig.events_loglevel <= config.LOGLEVEL_ERROR: + settings: configuration.OperatorSettings = settings_var.get() + if settings.posting.level <= logging.ERROR: event(obj, type='Error', reason=reason, message=message) diff --git a/kopf/structs/configuration.py b/kopf/structs/configuration.py index fc8e5446..079361bb 100644 --- a/kopf/structs/configuration.py +++ b/kopf/structs/configuration.py @@ -27,7 +27,29 @@ """ import dataclasses +from kopf import config # for legacy defaults only + @dataclasses.dataclass -class OperatorSettings: +class LoggingSettings: pass + + +@dataclasses.dataclass +class PostingSettings: + + level: int = dataclasses.field( + default_factory=lambda: config.EventsConfig.events_loglevel) + """ + A minimal level of logging events that will be posted as K8s Events. + The default is ``logging.INFO`` (i.e. 
all info, warning, errors are posted). + + This also affects ``kopf.event()`` and similar functions + (``kopf.info()``, ``kopf.warn()``, ``kopf.exception()``). + """ + + +@dataclasses.dataclass +class OperatorSettings: + logging: LoggingSettings = dataclasses.field(default_factory=LoggingSettings) + posting: PostingSettings = dataclasses.field(default_factory=PostingSettings) diff --git a/tests/settings/test_defaults.py b/tests/settings/test_defaults.py index 8c823e09..dc8b7d78 100644 --- a/tests/settings/test_defaults.py +++ b/tests/settings/test_defaults.py @@ -1,5 +1,8 @@ +import logging + import kopf async def test_declared_public_interface_and_promised_defaults(): settings = kopf.OperatorSettings() + assert settings.posting.level == logging.INFO From 773b4ff761bb87e30e577601114f2b82077ee794 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 26 Mar 2020 14:58:08 +0100 Subject: [PATCH 04/11] Switch K8s event-watching from configs to settings --- kopf/clients/watching.py | 7 +++---- kopf/structs/configuration.py | 25 +++++++++++++++++++++++++ tests/settings/test_defaults.py | 3 +++ 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/kopf/clients/watching.py b/kopf/clients/watching.py index 75512df2..f49f322f 100644 --- a/kopf/clients/watching.py +++ b/kopf/clients/watching.py @@ -26,7 +26,6 @@ import aiohttp -from kopf import config from kopf.clients import auth from kopf.clients import discovery from kopf.clients import fetching @@ -75,7 +74,7 @@ async def infinite_watch( ) async for raw_event in stream: yield raw_event - await asyncio.sleep(config.WatchersConfig.watcher_retry_delay) + await asyncio.sleep(settings.watching.retry_delay) async def streaming_watch( @@ -140,7 +139,7 @@ async def continuous_watch( stream = watch_objs( settings=settings, resource=resource, namespace=namespace, - timeout=config.WatchersConfig.default_stream_timeout, + timeout=settings.watching.stream_timeout, since=resource_version, freeze_waiter=freeze_waiter, ) @@ 
-211,7 +210,7 @@ async def watch_objs( # Talk to the API and initiate a streaming response. response = await context.session.get( url=resource.get_url(server=context.server, namespace=namespace, params=params), - timeout=aiohttp.ClientTimeout(total=config.WatchersConfig.session_timeout), + timeout=aiohttp.ClientTimeout(total=settings.watching.session_timeout), ) response.raise_for_status() diff --git a/kopf/structs/configuration.py b/kopf/structs/configuration.py index 079361bb..9b081c34 100644 --- a/kopf/structs/configuration.py +++ b/kopf/structs/configuration.py @@ -26,6 +26,7 @@ used interchangeably -- but so that it is understandable what is meant. """ import dataclasses +from typing import Optional from kopf import config # for legacy defaults only @@ -49,7 +50,31 @@ class PostingSettings: """ +@dataclasses.dataclass +class WatchingSettings: + + session_timeout: Optional[float] = dataclasses.field( + default_factory=lambda: config.WatchersConfig.session_timeout) + """ + An HTTP/HTTPS session timeout to use in watch requests. + """ + + stream_timeout: Optional[float] = dataclasses.field( + default_factory=lambda: config.WatchersConfig.default_stream_timeout) + """ + The maximum duration of one streaming request. Patched in some tests. + If ``None``, then obey the server-side timeouts (they seem to be random). + """ + + retry_delay: float = dataclasses.field( + default_factory=lambda: config.WatchersConfig.watcher_retry_delay) + """ + How long should a pause be between watch requests (to prevent API flooding). 
+ """ + + @dataclasses.dataclass class OperatorSettings: logging: LoggingSettings = dataclasses.field(default_factory=LoggingSettings) posting: PostingSettings = dataclasses.field(default_factory=PostingSettings) + watching: WatchingSettings = dataclasses.field(default_factory=WatchingSettings) diff --git a/tests/settings/test_defaults.py b/tests/settings/test_defaults.py index dc8b7d78..3381db0a 100644 --- a/tests/settings/test_defaults.py +++ b/tests/settings/test_defaults.py @@ -6,3 +6,6 @@ async def test_declared_public_interface_and_promised_defaults(): settings = kopf.OperatorSettings() assert settings.posting.level == logging.INFO + assert settings.watching.retry_delay == 0.1 + assert settings.watching.stream_timeout is None + assert settings.watching.session_timeout is None From 4782497dd9193c5ecc61e70b383201c5be956ae9 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 26 Mar 2020 14:59:16 +0100 Subject: [PATCH 05/11] Switch demultiplexer's event-queue handling and batching to use settings --- kopf/reactor/queueing.py | 11 +++++----- kopf/structs/configuration.py | 36 +++++++++++++++++++++++++++++++++ tests/settings/test_defaults.py | 4 ++++ 3 files changed, 45 insertions(+), 6 deletions(-) diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index 57ed03ea..a690a0bc 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -32,7 +32,6 @@ import aiojobs from typing_extensions import Protocol -from kopf import config from kopf.clients import watching from kopf.structs import bodies from kopf.structs import configuration @@ -97,7 +96,7 @@ async def watcher( # All per-object workers are handled as fire-and-forget jobs via the scheduler, # and communicated via the per-object event queues. 
- scheduler = await aiojobs.create_scheduler(limit=config.WorkersConfig.queue_workers_limit) + scheduler = await aiojobs.create_scheduler(limit=settings.batching.worker_limit) streams: Streams = {} try: # Either use the existing object's queue, or create a new one together with the per-object job. @@ -163,7 +162,7 @@ async def worker( try: raw_event = await asyncio.wait_for( watchevents.get(), - timeout=config.WorkersConfig.worker_idle_timeout) + timeout=settings.batching.idle_timeout) except asyncio.TimeoutError: break else: @@ -172,7 +171,7 @@ async def worker( prev_event = raw_event next_event = await asyncio.wait_for( watchevents.get(), - timeout=config.WorkersConfig.worker_batch_window) + timeout=settings.batching.batch_window) shouldstop = shouldstop or isinstance(next_event, EOS) raw_event = prev_event if isinstance(next_event, EOS) else next_event except asyncio.TimeoutError: @@ -216,8 +215,8 @@ async def _wait_for_depletion( started = time.perf_counter() while streams and \ scheduler.active_count and \ - time.perf_counter() - started < config.WorkersConfig.worker_exit_timeout: - await asyncio.sleep(config.WorkersConfig.worker_exit_timeout / 100.) + time.perf_counter() - started < settings.batching.exit_timeout: + await asyncio.sleep(settings.batching.exit_timeout / 100.) # The last check if the termination is going to be graceful or not. if streams: diff --git a/kopf/structs/configuration.py b/kopf/structs/configuration.py index 9b081c34..5bf16682 100644 --- a/kopf/structs/configuration.py +++ b/kopf/structs/configuration.py @@ -73,8 +73,44 @@ class WatchingSettings: """ +@dataclasses.dataclass +class BatchingSettings: + """ + Settings for how raw events are batched and processed. + """ + + worker_limit: Optional[int] = dataclasses.field( + default_factory=lambda: config.WorkersConfig.queue_workers_limit) + """ + How many workers can be running simultaneously on per-object event queue. 
+ If ``None``, there is no limit to the number of workers (as many as needed). + """ + + idle_timeout: float = dataclasses.field( + default_factory=lambda: config.WorkersConfig.worker_idle_timeout) + """ + How soon an idle worker is exited and garbage-collected if no events arrive. + """ + + batch_window: float = dataclasses.field( + default_factory=lambda: config.WorkersConfig.worker_batch_window) + """ + How fast/slow does a worker deplete the queue when an event is received. + All events arriving within this window will be ignored except the last one. + """ + + exit_timeout: float = dataclasses.field( + default_factory=lambda: config.WorkersConfig.worker_exit_timeout) + """ + How soon a worker is cancelled when the parent watcher is going to exit. + This is the time given to the worker to deplete and process the queue. + """ + + + @dataclasses.dataclass class OperatorSettings: logging: LoggingSettings = dataclasses.field(default_factory=LoggingSettings) posting: PostingSettings = dataclasses.field(default_factory=PostingSettings) watching: WatchingSettings = dataclasses.field(default_factory=WatchingSettings) + batching: BatchingSettings = dataclasses.field(default_factory=BatchingSettings) diff --git a/tests/settings/test_defaults.py b/tests/settings/test_defaults.py index 3381db0a..6ce442e3 100644 --- a/tests/settings/test_defaults.py +++ b/tests/settings/test_defaults.py @@ -9,3 +9,7 @@ async def test_declared_public_interface_and_promised_defaults(): assert settings.watching.retry_delay == 0.1 assert settings.watching.stream_timeout is None assert settings.watching.session_timeout is None + assert settings.batching.worker_limit is None + assert settings.batching.idle_timeout == 5.0 + assert settings.batching.exit_timeout == 2.0 + assert settings.batching.batch_window == 0.1 From 80e8a28f8674e647b703860aa91097b43e70fb9f Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 26 Mar 2020 15:00:25 +0100 Subject: [PATCH 06/11] Switch thread-pool executor for 
sync handlers from configs to settings --- kopf/config.py | 24 +++++------ kopf/reactor/invocation.py | 3 +- kopf/structs/configuration.py | 40 ++++++++++++++++++ tests/settings/test_defaults.py | 2 + tests/settings/test_executor.py | 73 +++++++++++++++++++++++++++++++++ 5 files changed, 127 insertions(+), 15 deletions(-) create mode 100644 tests/settings/test_executor.py diff --git a/kopf/config.py b/kopf/config.py index 91834a0f..1b4724cc 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -1,5 +1,4 @@ import asyncio -import concurrent.futures import logging from typing import Optional @@ -73,8 +72,6 @@ class WorkersConfig: Used as single point of configuration for kopf.reactor. """ - threadpool_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None - queue_workers_limit: Optional[int] = None # if None, there is no limits to workers number """ How many workers can be running simultaneously on per-object event queue. """ @@ -90,14 +87,6 @@ class WorkersConfig: worker_exit_timeout: float = 2.0 """ How long does a worker can work on watcher exit before being cancelled. """ - @staticmethod - def get_syn_executor() -> concurrent.futures.ThreadPoolExecutor: - if not WorkersConfig.threadpool_executor: - WorkersConfig.threadpool_executor = concurrent.futures.ThreadPoolExecutor( - max_workers=WorkersConfig.synchronous_tasks_threadpool_limit - ) - return WorkersConfig.threadpool_executor - @staticmethod def set_synchronous_tasks_threadpool_limit(new_limit: int) -> None: """ @@ -107,8 +96,17 @@ def set_synchronous_tasks_threadpool_limit(new_limit: int) -> None: raise ValueError('Can`t set threadpool limit lower than 1') WorkersConfig.synchronous_tasks_threadpool_limit = new_limit - if WorkersConfig.threadpool_executor: - WorkersConfig.threadpool_executor._max_workers = new_limit # type: ignore + + # Also apply to the current runtime settings, if we are at runtime (not load-time). 
+ try: + # Wherever we can find it; ignore "nice" architecture (this class is deprecated anyway). + from kopf.engines import posting # noqa + from kopf.structs import configuration # noqa # cyclic imports, for type annotations + settings: configuration.OperatorSettings = posting.settings_var.get() + except LookupError: + pass + else: + settings.execution.max_workers = new_limit class WatchersConfig: diff --git a/kopf/reactor/invocation.py b/kopf/reactor/invocation.py index dcff5d03..c1073e3b 100644 --- a/kopf/reactor/invocation.py +++ b/kopf/reactor/invocation.py @@ -11,7 +11,6 @@ import functools from typing import Optional, Any, List, Iterable, Iterator, Tuple, Dict, cast, TYPE_CHECKING -from kopf import config from kopf.reactor import causation from kopf.structs import callbacks from kopf.structs import configuration @@ -137,7 +136,7 @@ async def invoke( # Cancellation is postponed until the thread exits, but it happens anyway (for consistency). # Note: the docs say the result is a future, but typesheds say it is a coroutine => cast()! loop = asyncio.get_event_loop() - executor = config.WorkersConfig.get_syn_executor() + executor = settings.execution.executor if settings is not None else None future = cast(asyncio_Future, loop.run_in_executor(executor, real_fn)) cancellation: Optional[asyncio.CancelledError] = None while not future.done(): diff --git a/kopf/structs/configuration.py b/kopf/structs/configuration.py index 5bf16682..2b844bb0 100644 --- a/kopf/structs/configuration.py +++ b/kopf/structs/configuration.py @@ -25,6 +25,7 @@ Regardless of the exact class and module names, all of these terms can be used interchangeably -- but so that it is understandable what is meant. """ +import concurrent.futures import dataclasses from typing import Optional @@ -107,6 +108,44 @@ class BatchingSettings: """ +@dataclasses.dataclass +class ExecutionSettings: + """ + Settings for synchronous handlers execution (e.g. thread-/process-pools). 
+ """ + + executor: concurrent.futures.Executor = dataclasses.field( + default_factory=concurrent.futures.ThreadPoolExecutor) + """ + The executor to be used for synchronous handler invocation. + + It can be changed at runtime (e.g. to reset the pool size). Already running + handlers (specific invocations) will continue with their original executors. + """ + + _max_workers: Optional[int] = dataclasses.field( + default_factory=lambda: config.WorkersConfig.synchronous_tasks_threadpool_limit) + + @property + def max_workers(self) -> Optional[int]: + """ + How many threads/processes is dedicated to handler execution. + + It can be changed at runtime (the threads/processes are not terminated). + """ + return self._max_workers + + @max_workers.setter + def max_workers(self, value: int) -> None: + if value < 1: + raise ValueError("Can't set thread pool limit lower than 1.") + self._max_workers = value + + if hasattr(self.executor, '_max_workers'): + self.executor._max_workers = value # type: ignore + else: + raise TypeError("Current executor does not support `max_workers`.") + @dataclasses.dataclass class OperatorSettings: @@ -114,3 +153,4 @@ class OperatorSettings: posting: PostingSettings = dataclasses.field(default_factory=PostingSettings) watching: WatchingSettings = dataclasses.field(default_factory=WatchingSettings) batching: BatchingSettings = dataclasses.field(default_factory=BatchingSettings) + execution: ExecutionSettings = dataclasses.field(default_factory=ExecutionSettings) diff --git a/tests/settings/test_defaults.py b/tests/settings/test_defaults.py index 6ce442e3..4fbb181f 100644 --- a/tests/settings/test_defaults.py +++ b/tests/settings/test_defaults.py @@ -13,3 +13,5 @@ async def test_declared_public_interface_and_promised_defaults(): assert settings.batching.idle_timeout == 5.0 assert settings.batching.exit_timeout == 2.0 assert settings.batching.batch_window == 0.1 + assert settings.execution.executor is not None + assert settings.execution.max_workers 
is None diff --git a/tests/settings/test_executor.py b/tests/settings/test_executor.py new file mode 100644 index 00000000..f448ac66 --- /dev/null +++ b/tests/settings/test_executor.py @@ -0,0 +1,73 @@ +import concurrent.futures +import threading +from unittest.mock import MagicMock + +import kopf +from kopf.engines.posting import settings_var +from kopf.reactor.invocation import invoke + + +class CatchyExecutor(concurrent.futures.ThreadPoolExecutor): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.calls = [] + + def submit(self, fn, *args, **kwargs): + self.calls.append(fn) + return super().submit(fn, *args, **kwargs) + + +async def test_synchronous_calls_are_threaded(): + settings = kopf.OperatorSettings() + thread = None + + def fn(): + nonlocal thread + thread = threading.current_thread() + + mock = MagicMock(wraps=fn) + await invoke(fn=mock, settings=settings) + + assert mock.called + assert thread is not None # remembered from inside fn() + assert thread is not threading.current_thread() # not in the main thread + + +async def test_synchronous_calls_use_replaced_executor(): + settings = kopf.OperatorSettings() + executor = CatchyExecutor() + settings.execution.executor = executor + + mock = MagicMock() + await invoke(fn=mock, settings=settings) + + assert mock.called + assert len(executor.calls) == 1 + + +async def test_synchronous_executor_limit_is_applied(): + settings = kopf.OperatorSettings() + assert hasattr(settings.execution.executor, '_max_workers') # prerequisite + + assert settings.execution.max_workers is None # as in "unset by us, assume defaults" + assert settings.execution.executor._max_workers is not None # usually CPU count * N. 
+ + settings.execution.max_workers = 123456 + + assert settings.execution.max_workers == 123456 + assert settings.execution.executor._max_workers == 123456 + + +async def test_synchronous_executor_limit_is_applied_legacy_way(): + settings = kopf.OperatorSettings() + assert hasattr(settings.execution.executor, '_max_workers') # prerequisite + + assert settings.execution.max_workers is None # as in "unset by us, assume defaults" + assert settings.execution.executor._max_workers is not None # usually CPU count * N. + + settings_var.set(settings) # an assumption on the implementation + kopf.config.WorkersConfig.set_synchronous_tasks_threadpool_limit(123456) + + assert settings.execution.max_workers == 123456 + assert settings.execution.executor._max_workers == 123456 From 6b0e321306cee9bb0d8e9de1b2407b5601c08be7 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 26 Mar 2020 15:00:58 +0100 Subject: [PATCH 07/11] Put the deprecation markers for the old configs --- kopf/__init__.py | 12 ++++++------ kopf/config.py | 3 +++ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/kopf/__init__.py b/kopf/__init__.py index 23a3d0b0..ef94aae1 100644 --- a/kopf/__init__.py +++ b/kopf/__init__.py @@ -11,12 +11,12 @@ ) from kopf.config import ( configure, - LOGLEVEL_INFO, - LOGLEVEL_WARNING, - LOGLEVEL_ERROR, - LOGLEVEL_CRITICAL, - EventsConfig, - WorkersConfig, + LOGLEVEL_INFO, # deprecated + LOGLEVEL_WARNING, # deprecated + LOGLEVEL_ERROR, # deprecated + LOGLEVEL_CRITICAL, # deprecated + EventsConfig, # deprecated + WorkersConfig, # deprecated ) from kopf.engines.posting import ( event, diff --git a/kopf/config.py b/kopf/config.py index 1b4724cc..a85d096a 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -58,6 +58,7 @@ def configure( loop.set_debug(bool(debug)) +# DEPRECATED: Used for initial defaults for per-operator settings (see kopf.structs.configuration). class EventsConfig: """ Used to configure events sending behaviour. 
@@ -67,6 +68,7 @@ class EventsConfig: """ What events should be logged. """ +# DEPRECATED: Used for initial defaults for per-operator settings (see kopf.structs.configuration). class WorkersConfig: """ Used as single point of configuration for kopf.reactor. @@ -109,6 +111,7 @@ def set_synchronous_tasks_threadpool_limit(new_limit: int) -> None: settings.execution.max_workers = new_limit +# DEPRECATED: Used for initial defaults for per-operator settings (see kopf.structs.configuration). class WatchersConfig: """ Used to configure the K8s API watchers and streams. From 399ae9b9336dd34697d228bef2a4d4c9f647f090 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 26 Mar 2020 09:30:27 +0100 Subject: [PATCH 08/11] Adjust tests to use new per-test settings instead of configs & mocks --- examples/09-testing/test_example_09.py | 9 +- .../11-filtering-handlers/test_example_11.py | 9 +- tests/authentication/test_authentication.py | 9 +- tests/basic-structs/test_causes.py | 3 + tests/conftest.py | 16 ++++ tests/e2e/test_examples.py | 4 +- tests/handling/test_activity_triggering.py | 15 ++-- tests/handling/test_cause_handling.py | 19 +++-- tests/handling/test_cause_logging.py | 11 ++- tests/handling/test_delays.py | 6 +- tests/handling/test_errors.py | 9 +- tests/handling/test_event_handling.py | 6 +- tests/handling/test_multistep.py | 6 +- tests/handling/test_no_handlers.py | 3 +- tests/handling/test_retrying_limits.py | 6 +- tests/handling/test_timing_consistency.py | 3 +- tests/k8s/test_watching.py | 84 ++++++++++++++----- tests/posting/test_log2k8s.py | 38 +++++---- tests/posting/test_poster.py | 14 ++-- tests/reactor/conftest.py | 3 +- tests/reactor/test_queueing.py | 31 +++---- tests/test_liveness.py | 3 +- 22 files changed, 207 insertions(+), 100 deletions(-) diff --git a/examples/09-testing/test_example_09.py b/examples/09-testing/test_example_09.py index 7dcbf572..f37391db 100644 --- a/examples/09-testing/test_example_09.py +++ 
b/examples/09-testing/test_example_09.py @@ -28,13 +28,16 @@ def obj_absent(): check=False, timeout=10, capture_output=True, shell=True) -def test_resource_lifecycle(mocker): +def test_resource_lifecycle(): # To prevent lengthy threads in the loop executor when the process exits. - mocker.patch('kopf.config.WatchersConfig.default_stream_timeout', 10) + settings = kopf.OperatorSettings() + settings.watching.stream_timeout = 10 # Run an operator and simulate some activity with the operated resource. - with kopf.testing.KopfRunner(['run', '--verbose', '--standalone', example_py], timeout=60) as runner: + with kopf.testing.KopfRunner(['run', '--verbose', '--standalone', example_py], + timeout=60, settings=settings) as runner: + subprocess.run(f"kubectl create -f {obj_yaml}", shell=True, check=True, timeout=10, capture_output=True) time.sleep(5) # give it some time to react diff --git a/examples/11-filtering-handlers/test_example_11.py b/examples/11-filtering-handlers/test_example_11.py index a8eb1521..23f10b06 100644 --- a/examples/11-filtering-handlers/test_example_11.py +++ b/examples/11-filtering-handlers/test_example_11.py @@ -28,13 +28,16 @@ def obj_absent(): check=False, timeout=10, capture_output=True, shell=True) -def test_handler_filtering(mocker): +def test_handler_filtering(): # To prevent lengthy threads in the loop executor when the process exits. - mocker.patch('kopf.config.WatchersConfig.default_stream_timeout', 10) + settings = kopf.OperatorSettings() + settings.watching.stream_timeout = 10 # Run an operator and simulate some activity with the operated resource. 
- with kopf.testing.KopfRunner(['run', '--verbose', '--standalone', example_py]) as runner: + with kopf.testing.KopfRunner(['run', '--verbose', '--standalone', example_py], + settings=settings) as runner: + subprocess.run(f"kubectl create -f {obj_yaml}", shell=True, check=True, timeout=10, capture_output=True) time.sleep(5) # give it some time to react diff --git a/tests/authentication/test_authentication.py b/tests/authentication/test_authentication.py index 80f989be..b5d4e719 100644 --- a/tests/authentication/test_authentication.py +++ b/tests/authentication/test_authentication.py @@ -6,12 +6,13 @@ from kopf.structs.handlers import ActivityHandler, Activity -async def test_empty_registry_produces_no_credentials(): +async def test_empty_registry_produces_no_credentials(settings): vault = Vault() registry = OperatorRegistry() await authenticate( registry=registry, + settings=settings, vault=vault, ) @@ -21,7 +22,7 @@ async def test_empty_registry_produces_no_credentials(): pass -async def test_noreturn_handler_produces_no_credentials(): +async def test_noreturn_handler_produces_no_credentials(settings): vault = Vault() registry = OperatorRegistry() @@ -36,6 +37,7 @@ def login_fn(**_): await authenticate( registry=registry, + settings=settings, vault=vault, ) @@ -45,7 +47,7 @@ def login_fn(**_): pass -async def test_single_credentials_provided_to_vault(): +async def test_single_credentials_provided_to_vault(settings): info = ConnectionInfo(server='https://expected/') vault = Vault() registry = OperatorRegistry() @@ -61,6 +63,7 @@ def login_fn(**_): await authenticate( registry=registry, + settings=settings, vault=vault, ) diff --git a/tests/basic-structs/test_causes.py b/tests/basic-structs/test_causes.py index b8469e07..3519ad25 100644 --- a/tests/basic-structs/test_causes.py +++ b/tests/basic-structs/test_causes.py @@ -12,11 +12,14 @@ def test_cause_with_no_args(cls): def test_activity_cause(mocker): logger = mocker.Mock() activity = mocker.Mock() + settings = 
mocker.Mock() cause = ActivityCause( activity=activity, + settings=settings, logger=logger, ) assert cause.activity is activity + assert cause.settings is settings assert cause.logger is logger diff --git a/tests/conftest.py b/tests/conftest.py index ebb2ace3..091f6cf1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -18,6 +18,8 @@ from kopf.clients.auth import APIContext from kopf.config import configure from kopf.engines.logging import ObjectPrefixingFormatter +from kopf.engines.posting import settings_var +from kopf.structs.configuration import OperatorSettings from kopf.structs.credentials import Vault, VaultKey, ConnectionInfo from kopf.structs.resources import Resource @@ -90,6 +92,20 @@ def resource(): return Resource('zalando.org', 'v1', 'kopfexamples') +@pytest.fixture() +def settings(): + return OperatorSettings() + + +@pytest.fixture() +def settings_via_contextvar(settings): + token = settings_var.set(settings) + try: + yield + finally: + settings_var.reset(token) + + # # Mocks for Kopf's internal but global variables. # diff --git a/tests/e2e/test_examples.py b/tests/e2e/test_examples.py index d067a05c..5fdf9407 100644 --- a/tests/e2e/test_examples.py +++ b/tests/e2e/test_examples.py @@ -9,7 +9,7 @@ from kopf.testing import KopfRunner -def test_all_examples_are_runnable(mocker, with_crd, exampledir, caplog): +def test_all_examples_are_runnable(mocker, settings, with_crd, exampledir, caplog): # If the example has its own opinion on the timing, try to respect it. # See e.g. /examples/99-all-at-once/example.py. @@ -45,7 +45,7 @@ def test_all_examples_are_runnable(mocker, with_crd, exampledir, caplog): mocker.patch('kopf.reactor.handling.DEFAULT_RETRY_DELAY', 1) # To prevent lengthy threads in the loop executor when the process exits. - mocker.patch('kopf.config.WatchersConfig.default_stream_timeout', 10) + settings.watching.stream_timeout = 10 # Run an operator and simulate some activity with the operated resource. 
with KopfRunner(['run', '--standalone', '--verbose', str(example_py)], timeout=60) as runner: diff --git a/tests/handling/test_activity_triggering.py b/tests/handling/test_activity_triggering.py index 039b63e0..191315b4 100644 --- a/tests/handling/test_activity_triggering.py +++ b/tests/handling/test_activity_triggering.py @@ -21,7 +21,7 @@ def test_activity_error_exception(): @pytest.mark.parametrize('activity', list(Activity)) -async def test_results_are_returned_on_success(activity): +async def test_results_are_returned_on_success(settings, activity): def sample_fn1(**_): return 123 @@ -41,6 +41,7 @@ def sample_fn2(**_): results = await run_activity( registry=registry, + settings=settings, activity=activity, lifecycle=all_at_once, ) @@ -51,7 +52,7 @@ def sample_fn2(**_): @pytest.mark.parametrize('activity', list(Activity)) -async def test_errors_are_raised_aggregated(activity): +async def test_errors_are_raised_aggregated(settings, activity): def sample_fn1(**_): raise PermanentError("boo!123") @@ -72,6 +73,7 @@ def sample_fn2(**_): with pytest.raises(ActivityError) as e: await run_activity( registry=registry, + settings=settings, activity=activity, lifecycle=all_at_once, ) @@ -90,7 +92,7 @@ def sample_fn2(**_): @pytest.mark.parametrize('activity', list(Activity)) -async def test_errors_are_cascaded_from_one_of_the_originals(activity): +async def test_errors_are_cascaded_from_one_of_the_originals(settings, activity): def sample_fn(**_): raise PermanentError("boo!") @@ -104,6 +106,7 @@ def sample_fn(**_): with pytest.raises(ActivityError) as e: await run_activity( registry=registry, + settings=settings, activity=activity, lifecycle=all_at_once, ) @@ -114,7 +117,7 @@ def sample_fn(**_): @pytest.mark.parametrize('activity', list(Activity)) -async def test_retries_are_simulated(activity, mocker): +async def test_retries_are_simulated(settings, activity, mocker): mock = mocker.MagicMock() def sample_fn(**_): @@ -130,6 +133,7 @@ def sample_fn(**_): with 
pytest.raises(ActivityError) as e: await run_activity( registry=registry, + settings=settings, activity=activity, lifecycle=all_at_once, ) @@ -139,7 +143,7 @@ def sample_fn(**_): @pytest.mark.parametrize('activity', list(Activity)) -async def test_delays_are_simulated(activity, mocker): +async def test_delays_are_simulated(settings, activity, mocker): def sample_fn(**_): raise TemporaryError('to be retried', delay=123) @@ -161,6 +165,7 @@ async def sleep_or_wait_substitute(*_, **__): with pytest.raises(ActivityError) as e: await run_activity( registry=registry, + settings=settings, activity=activity, lifecycle=all_at_once, ) diff --git a/tests/handling/test_cause_handling.py b/tests/handling/test_cause_handling.py index 97dd15b5..80f0e722 100644 --- a/tests/handling/test_cause_handling.py +++ b/tests/handling/test_cause_handling.py @@ -6,7 +6,6 @@ import kopf from kopf.reactor.processing import process_resource_event from kopf.structs.containers import ResourceMemories -from kopf.structs.finalizers import FINALIZER from kopf.structs.handlers import Reason from kopf.structs.lastseen import LAST_SEEN_ANNOTATION @@ -14,7 +13,7 @@ @pytest.mark.parametrize('event_type', EVENT_TYPES) -async def test_create(registry, handlers, resource, cause_mock, event_type, +async def test_create(registry, settings, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.CREATE @@ -23,6 +22,7 @@ async def test_create(registry, handlers, resource, cause_mock, event_type, await process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': {}}, @@ -55,7 +55,7 @@ async def test_create(registry, handlers, resource, cause_mock, event_type, @pytest.mark.parametrize('event_type', EVENT_TYPES) -async def test_update(registry, handlers, resource, cause_mock, event_type, +async def 
test_update(registry, settings, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.UPDATE @@ -64,6 +64,7 @@ async def test_update(registry, handlers, resource, cause_mock, event_type, await process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': {}}, @@ -96,7 +97,7 @@ async def test_update(registry, handlers, resource, cause_mock, event_type, @pytest.mark.parametrize('event_type', EVENT_TYPES) -async def test_delete(registry, handlers, resource, cause_mock, event_type, +async def test_delete(registry, settings, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.DELETE @@ -105,6 +106,7 @@ async def test_delete(registry, handlers, resource, cause_mock, event_type, await process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': {}}, @@ -139,7 +141,7 @@ async def test_delete(registry, handlers, resource, cause_mock, event_type, # @pytest.mark.parametrize('event_type', EVENT_TYPES) -async def test_gone(registry, handlers, resource, cause_mock, event_type, +async def test_gone(registry, settings, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.GONE @@ -148,6 +150,7 @@ async def test_gone(registry, handlers, resource, cause_mock, event_type, await process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': {}}, @@ -170,7 +173,7 @@ async def test_gone(registry, handlers, resource, cause_mock, event_type, 
@pytest.mark.parametrize('event_type', EVENT_TYPES) -async def test_free(registry, handlers, resource, cause_mock, event_type, +async def test_free(registry, settings, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.FREE @@ -179,6 +182,7 @@ async def test_free(registry, handlers, resource, cause_mock, event_type, await process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': {}}, @@ -201,7 +205,7 @@ async def test_free(registry, handlers, resource, cause_mock, event_type, @pytest.mark.parametrize('event_type', EVENT_TYPES) -async def test_noop(registry, handlers, resource, cause_mock, event_type, +async def test_noop(registry, settings, handlers, resource, cause_mock, event_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.NOOP @@ -210,6 +214,7 @@ async def test_noop(registry, handlers, resource, cause_mock, event_type, await process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': {}}, diff --git a/tests/handling/test_cause_logging.py b/tests/handling/test_cause_logging.py index fe8d6792..704b3cb7 100644 --- a/tests/handling/test_cause_logging.py +++ b/tests/handling/test_cause_logging.py @@ -10,7 +10,7 @@ @pytest.mark.parametrize('cause_type', ALL_REASONS) -async def test_all_logs_are_prefixed(registry, resource, handlers, +async def test_all_logs_are_prefixed(registry, settings, resource, handlers, logstream, cause_type, cause_mock): event_type = None if cause_type == Reason.RESUME else 'irrelevant' event_body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} @@ -19,6 +19,7 @@ async def test_all_logs_are_prefixed(registry, resource, handlers, await 
process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': event_body}, @@ -35,8 +36,8 @@ async def test_all_logs_are_prefixed(registry, resource, handlers, pytest.param((('op', ('field',), 'old', 'new'),), id='realistic-diff'), ]) @pytest.mark.parametrize('cause_type', HANDLER_REASONS) -async def test_diffs_logged_if_present(registry, resource, handlers, cause_type, cause_mock, - caplog, assert_logs, diff): +async def test_diffs_logged_if_present(registry, settings, resource, handlers, + cause_type, cause_mock, caplog, assert_logs, diff): caplog.set_level(logging.DEBUG) event_type = None if cause_type == Reason.RESUME else 'irrelevant' @@ -48,6 +49,7 @@ async def test_diffs_logged_if_present(registry, resource, handlers, cause_type, await process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': {}}, @@ -66,7 +68,7 @@ async def test_diffs_logged_if_present(registry, resource, handlers, cause_type, pytest.param([], id='empty-list-diff'), pytest.param((), id='empty-tuple-diff'), ]) -async def test_diffs_not_logged_if_absent(registry, resource, handlers, cause_type, cause_mock, +async def test_diffs_not_logged_if_absent(registry, settings, resource, handlers, cause_type, cause_mock, caplog, assert_logs, diff): caplog.set_level(logging.DEBUG) @@ -77,6 +79,7 @@ async def test_diffs_not_logged_if_absent(registry, resource, handlers, cause_ty await process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': {}}, diff --git a/tests/handling/test_delays.py b/tests/handling/test_delays.py index 6468c131..c207a835 100644 --- a/tests/handling/test_delays.py +++ 
b/tests/handling/test_delays.py @@ -20,7 +20,7 @@ ['2020-01-01T00:00:00', '2020-01-01T00:04:56.789000', 4 * 60 + 56.789], ], ids=['fast']) async def test_delayed_handlers_progress( - registry, handlers, resource, cause_mock, cause_reason, + registry, settings, handlers, resource, cause_mock, cause_reason, caplog, assert_logs, k8s_mocked, now, delayed_iso, delay): caplog.set_level(logging.DEBUG) @@ -36,6 +36,7 @@ async def test_delayed_handlers_progress( await process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': {}}, @@ -67,7 +68,7 @@ async def test_delayed_handlers_progress( ['2020-01-01T00:00:00', '2099-12-31T23:59:59.000000', WAITING_KEEPALIVE_INTERVAL], ], ids=['fast', 'slow']) async def test_delayed_handlers_sleep( - registry, handlers, resource, cause_mock, cause_reason, + registry, settings, handlers, resource, cause_mock, cause_reason, caplog, assert_logs, k8s_mocked, now, delayed_iso, delay): caplog.set_level(logging.DEBUG) @@ -91,6 +92,7 @@ async def test_delayed_handlers_sleep( await process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': event_body}, diff --git a/tests/handling/test_errors.py b/tests/handling/test_errors.py index 9089f105..42f169fa 100644 --- a/tests/handling/test_errors.py +++ b/tests/handling/test_errors.py @@ -13,7 +13,7 @@ # The extrahandlers are needed to prevent the cycle ending and status purging. 
@pytest.mark.parametrize('cause_type', HANDLER_REASONS) async def test_fatal_error_stops_handler( - registry, handlers, extrahandlers, resource, cause_mock, cause_type, + registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' @@ -28,6 +28,7 @@ async def test_fatal_error_stops_handler( await process_resource_event( lifecycle=kopf.lifecycles.one_by_one, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': {}}, @@ -56,7 +57,7 @@ async def test_fatal_error_stops_handler( # The extrahandlers are needed to prevent the cycle ending and status purging. @pytest.mark.parametrize('cause_type', HANDLER_REASONS) async def test_retry_error_delays_handler( - registry, handlers, extrahandlers, resource, cause_mock, cause_type, + registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' @@ -71,6 +72,7 @@ async def test_retry_error_delays_handler( await process_resource_event( lifecycle=kopf.lifecycles.one_by_one, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': {}}, @@ -100,7 +102,7 @@ async def test_retry_error_delays_handler( # The extrahandlers are needed to prevent the cycle ending and status purging. 
@pytest.mark.parametrize('cause_type', HANDLER_REASONS) async def test_arbitrary_error_delays_handler( - registry, handlers, extrahandlers, resource, cause_mock, cause_type, + registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' @@ -115,6 +117,7 @@ async def test_arbitrary_error_delays_handler( await process_resource_event( lifecycle=kopf.lifecycles.one_by_one, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': {}}, diff --git a/tests/handling/test_event_handling.py b/tests/handling/test_event_handling.py index ddb23c1e..bebbe855 100644 --- a/tests/handling/test_event_handling.py +++ b/tests/handling/test_event_handling.py @@ -11,7 +11,7 @@ @pytest.mark.parametrize('cause_type', ALL_REASONS) async def test_handlers_called_always( - registry, handlers, extrahandlers, resource, cause_mock, cause_type, + registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = cause_type @@ -19,6 +19,7 @@ async def test_handlers_called_always( await process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': 'ev-type', 'object': {'field': 'value'}}, @@ -44,7 +45,7 @@ async def test_handlers_called_always( @pytest.mark.parametrize('cause_type', ALL_REASONS) async def test_errors_are_ignored( - registry, handlers, extrahandlers, resource, cause_mock, cause_type, + registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) cause_mock.reason = cause_type @@ -53,6 +54,7 @@ async def test_errors_are_ignored( await process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + 
settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': 'ev-type', 'object': {}}, diff --git a/tests/handling/test_multistep.py b/tests/handling/test_multistep.py index 6c8d2d41..5fcb74a3 100644 --- a/tests/handling/test_multistep.py +++ b/tests/handling/test_multistep.py @@ -10,7 +10,7 @@ @pytest.mark.parametrize('cause_type', HANDLER_REASONS) async def test_1st_step_stores_progress_by_patching( - registry, handlers, extrahandlers, + registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, k8s_mocked): name1 = f'{cause_type}_fn' name2 = f'{cause_type}_fn2' @@ -21,6 +21,7 @@ async def test_1st_step_stores_progress_by_patching( await process_resource_event( lifecycle=kopf.lifecycles.asap, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': {}}, @@ -51,7 +52,7 @@ async def test_1st_step_stores_progress_by_patching( @pytest.mark.parametrize('cause_type', HANDLER_REASONS) async def test_2nd_step_finishes_the_handlers(caplog, - registry, handlers, extrahandlers, + registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, k8s_mocked): name1 = f'{cause_type}_fn' name2 = f'{cause_type}_fn2' @@ -70,6 +71,7 @@ async def test_2nd_step_finishes_the_handlers(caplog, await process_resource_event( lifecycle=kopf.lifecycles.one_by_one, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': event_body}, diff --git a/tests/handling/test_no_handlers.py b/tests/handling/test_no_handlers.py index 1d1db01a..0b441ec9 100644 --- a/tests/handling/test_no_handlers.py +++ b/tests/handling/test_no_handlers.py @@ -12,7 +12,7 @@ @pytest.mark.parametrize('cause_type', HANDLER_REASONS) async def test_skipped_with_no_handlers( - registry, resource, cause_mock, cause_type, + registry, settings, resource, cause_mock, cause_type, caplog, assert_logs, k8s_mocked): 
caplog.set_level(logging.DEBUG) @@ -32,6 +32,7 @@ async def test_skipped_with_no_handlers( await process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': event_body}, diff --git a/tests/handling/test_retrying_limits.py b/tests/handling/test_retrying_limits.py index bdee3480..4c977f74 100644 --- a/tests/handling/test_retrying_limits.py +++ b/tests/handling/test_retrying_limits.py @@ -17,7 +17,7 @@ ['2099-12-31T23:59:59', '2020-01-01T00:00:00'], ], ids=['slow']) async def test_timed_out_handler_fails( - registry, handlers, extrahandlers, resource, cause_mock, cause_type, + registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, caplog, assert_logs, k8s_mocked, now, ts): caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' @@ -37,6 +37,7 @@ async def test_timed_out_handler_fails( await process_resource_event( lifecycle=kopf.lifecycles.one_by_one, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': event_body}, @@ -66,7 +67,7 @@ async def test_timed_out_handler_fails( # The extrahandlers are needed to prevent the cycle ending and status purging. 
@pytest.mark.parametrize('cause_type', HANDLER_REASONS) async def test_retries_limited_handler_fails( - registry, handlers, extrahandlers, resource, cause_mock, cause_type, + registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, caplog, assert_logs, k8s_mocked): caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' @@ -85,6 +86,7 @@ async def test_retries_limited_handler_fails( await process_resource_event( lifecycle=kopf.lifecycles.one_by_one, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': event_type, 'object': event_body}, diff --git a/tests/handling/test_timing_consistency.py b/tests/handling/test_timing_consistency.py index 6f14e898..29799d2c 100644 --- a/tests/handling/test_timing_consistency.py +++ b/tests/handling/test_timing_consistency.py @@ -8,7 +8,7 @@ from kopf.structs.containers import ResourceMemories -async def test_consistent_awakening(registry, resource, k8s_mocked, mocker): +async def test_consistent_awakening(registry, settings, resource, k8s_mocked, mocker): """ A special case to ensure that "now" is consistent during the handling. 
@@ -58,6 +58,7 @@ def move_to_tsB(*_, **__): await process_resource_event( lifecycle=kopf.lifecycles.all_at_once, registry=registry, + settings=settings, resource=resource, memories=ResourceMemories(), raw_event={'type': 'ADDED', 'object': body}, diff --git a/tests/k8s/test_watching.py b/tests/k8s/test_watching.py index 9334b975..9d390fc7 100644 --- a/tests/k8s/test_watching.py +++ b/tests/k8s/test_watching.py @@ -50,23 +50,30 @@ def namespace(request): return request.param -async def test_empty_stream_yields_nothing(resource, stream, namespace): +async def test_empty_stream_yields_nothing(settings, resource, stream, namespace): + stream.feed([], namespace=namespace) stream.close(namespace=namespace) events = [] - async for event in streaming_watch(resource=resource, namespace=namespace): + async for event in streaming_watch(settings=settings, + resource=resource, + namespace=namespace): events.append(event) assert len(events) == 0 -async def test_event_stream_yields_everything(resource, stream, namespace): +async def test_event_stream_yields_everything( + settings, resource, stream, namespace): + stream.feed(STREAM_WITH_NORMAL_EVENTS, namespace=namespace) stream.close(namespace=namespace) events = [] - async for event in streaming_watch(resource=resource, namespace=namespace): + async for event in streaming_watch(settings=settings, + resource=resource, + namespace=namespace): events.append(event) assert len(events) == 2 @@ -74,13 +81,17 @@ async def test_event_stream_yields_everything(resource, stream, namespace): assert events[1]['object']['spec'] == 'b' -async def test_unknown_event_type_ignored(resource, stream, namespace, caplog): +async def test_unknown_event_type_ignored( + settings, resource, stream, namespace, caplog): + caplog.set_level(logging.DEBUG) stream.feed(STREAM_WITH_UNKNOWN_EVENT, namespace=namespace) stream.close(namespace=namespace) events = [] - async for event in streaming_watch(resource=resource, namespace=namespace): + async for event in 
streaming_watch(settings=settings, + resource=resource, + namespace=namespace): events.append(event) assert len(events) == 2 @@ -90,13 +101,17 @@ async def test_unknown_event_type_ignored(resource, stream, namespace, caplog): assert "UNKNOWN" in caplog.text -async def test_error_410gone_exits_normally(resource, stream, namespace, caplog): +async def test_error_410gone_exits_normally( + settings, resource, stream, namespace, caplog): + caplog.set_level(logging.DEBUG) stream.feed(STREAM_WITH_ERROR_410GONE, namespace=namespace) stream.close(namespace=namespace) events = [] - async for event in streaming_watch(resource=resource, namespace=namespace): + async for event in streaming_watch(settings=settings, + resource=resource, + namespace=namespace): events.append(event) assert len(events) == 1 @@ -104,13 +119,17 @@ async def test_error_410gone_exits_normally(resource, stream, namespace, caplog) assert "Restarting the watch-stream" in caplog.text -async def test_unknown_error_raises_exception(resource, stream, namespace): +async def test_unknown_error_raises_exception( + settings, resource, stream, namespace): + stream.feed(STREAM_WITH_ERROR_CODE, namespace=namespace) stream.close(namespace=namespace) events = [] with pytest.raises(WatchingError) as e: - async for event in streaming_watch(resource=resource, namespace=namespace): + async for event in streaming_watch(settings=settings, + resource=resource, + namespace=namespace): events.append(event) assert len(events) == 1 @@ -118,21 +137,26 @@ async def test_unknown_error_raises_exception(resource, stream, namespace): assert '666' in str(e.value) -async def test_exception_escalates(resource, stream, namespace, enforced_session, mocker): +async def test_exception_escalates( + settings, resource, stream, namespace, enforced_session, mocker): + enforced_session.get = mocker.Mock(side_effect=SampleException()) stream.feed([], namespace=namespace) stream.close(namespace=namespace) events = [] with 
pytest.raises(SampleException): - async for event in streaming_watch(resource=resource, namespace=namespace): + async for event in streaming_watch(settings=settings, + resource=resource, + namespace=namespace): events.append(event) assert len(events) == 0 async def test_freezing_is_ignored_if_turned_off( - resource, stream, namespace, timer, caplog, assert_logs): + settings, resource, stream, namespace, timer, caplog, assert_logs): + stream.feed(STREAM_WITH_NORMAL_EVENTS, namespace=namespace) stream.close(namespace=namespace) @@ -140,7 +164,9 @@ async def test_freezing_is_ignored_if_turned_off( events = [] async def read_stream(): - async for event in streaming_watch(resource=resource, namespace=namespace, + async for event in streaming_watch(settings=settings, + resource=resource, + namespace=namespace, freeze_mode=freeze_mode): events.append(event) @@ -157,7 +183,8 @@ async def read_stream(): async def test_freezing_waits_forever_if_not_resumed( - resource, stream, namespace, timer, caplog, assert_logs): + settings, resource, stream, namespace, timer, caplog, assert_logs): + stream.feed(STREAM_WITH_NORMAL_EVENTS, namespace=namespace) stream.close(namespace=namespace) @@ -165,7 +192,9 @@ async def test_freezing_waits_forever_if_not_resumed( events = [] async def read_stream(): - async for event in streaming_watch(resource=resource, namespace=namespace, + async for event in streaming_watch(settings=settings, + resource=resource, + namespace=namespace, freeze_mode=freeze_mode): events.append(event) @@ -184,7 +213,8 @@ async def read_stream(): async def test_freezing_waits_until_resumed( - resource, stream, namespace, timer, caplog, assert_logs): + settings, resource, stream, namespace, timer, caplog, assert_logs): + stream.feed(STREAM_WITH_NORMAL_EVENTS, namespace=namespace) stream.close(namespace=namespace) @@ -196,7 +226,9 @@ async def delayed_resuming(delay: float): await freeze_mode.turn_off() async def read_stream(): - async for event in 
streaming_watch(resource=resource, namespace=namespace, + async for event in streaming_watch(settings=settings, + resource=resource, + namespace=namespace, freeze_mode=freeze_mode): events.append(event) @@ -214,7 +246,9 @@ async def read_stream(): ]) -async def test_infinite_watch_never_exits_normally(resource, stream, namespace, aresponses): +async def test_infinite_watch_never_exits_normally( + settings, resource, stream, namespace, aresponses): + error = aresponses.Response(status=555, reason='stop-infinite-cycle') stream.feed( STREAM_WITH_ERROR_410GONE, # watching restarted @@ -226,7 +260,9 @@ async def test_infinite_watch_never_exits_normally(resource, stream, namespace, events = [] with pytest.raises(aiohttp.ClientResponseError) as e: - async for event in infinite_watch(resource=resource, namespace=namespace): + async for event in infinite_watch(settings=settings, + resource=resource, + namespace=namespace): events.append(event) assert e.value.status == 555 @@ -239,7 +275,9 @@ async def test_infinite_watch_never_exits_normally(resource, stream, namespace, # See: See: https://github.com/zalando-incubator/kopf/issues/275 -async def test_long_line_parsing(resource, stream, namespace, aresponses): +async def test_long_line_parsing( + settings, resource, stream, namespace, aresponses): + content = [ {'type': 'ADDED', 'object': {'spec': {'field': 'x'}}}, {'type': 'ADDED', 'object': {'spec': {'field': 'y' * (2 * 1024 * 1024)}}}, @@ -249,7 +287,9 @@ async def test_long_line_parsing(resource, stream, namespace, aresponses): stream.close(namespace=namespace) events = [] - async for event in streaming_watch(resource=resource, namespace=namespace): + async for event in streaming_watch(settings=settings, + resource=resource, + namespace=namespace): events.append(event) assert len(events) == 3 diff --git a/tests/posting/test_log2k8s.py b/tests/posting/test_log2k8s.py index 79450456..d9c9b272 100644 --- a/tests/posting/test_log2k8s.py +++ b/tests/posting/test_log2k8s.py @@ 
-2,7 +2,6 @@ import pytest -from kopf.config import EventsConfig from kopf.engines.logging import ObjectLogger, LocalObjectLogger OBJ1 = {'apiVersion': 'group1/version1', 'kind': 'Kind1', @@ -17,8 +16,10 @@ ['error', "Error"], ['critical', "Fatal"], ]) -async def test_posting_normal_levels(caplog, logstream, logfn, event_type, event_queue, event_queue_loop): - logger = ObjectLogger(body=OBJ1) +async def test_posting_normal_levels(settings, caplog, logstream, logfn, event_type, + event_queue, event_queue_loop): + + logger = ObjectLogger(body=OBJ1, settings=settings) logger_fn = getattr(logger, logfn) logger_fn("hello %s", "world") @@ -39,14 +40,14 @@ async def test_posting_normal_levels(caplog, logstream, logfn, event_type, event ['error', "Error", logging.ERROR], ['critical', "Fatal", logging.CRITICAL], ]) -async def test_posting_above_config(caplog, logstream, logfn, event_type, min_levelno, +async def test_posting_above_config(settings, caplog, logstream, logfn, event_type, min_levelno, event_queue, event_queue_loop, mocker): - logger = ObjectLogger(body=OBJ1) + logger = ObjectLogger(body=OBJ1, settings=settings) logger_fn = getattr(logger, logfn) - mocker.patch.object(EventsConfig, 'events_loglevel', min_levelno) + settings.posting.level = min_levelno logger_fn("hello %s", "world") - mocker.patch.object(EventsConfig, 'events_loglevel', min_levelno + 1) + settings.posting.level = min_levelno + 1 logger_fn("must not be posted") assert event_queue.qsize() == 1 @@ -61,8 +62,10 @@ async def test_posting_above_config(caplog, logstream, logfn, event_type, min_le @pytest.mark.parametrize('logfn', [ 'debug', ]) -async def test_skipping_hidden_levels(caplog, logstream, logfn, event_queue, event_queue_loop): - logger = ObjectLogger(body=OBJ1) +async def test_skipping_hidden_levels(settings, caplog, logstream, logfn, + event_queue, event_queue_loop): + + logger = ObjectLogger(body=OBJ1, settings=settings) logger_fn = getattr(logger, logfn) logger_fn("hello %s", "world") @@ 
-79,14 +82,15 @@ async def test_skipping_hidden_levels(caplog, logstream, logfn, event_queue, eve 'error', 'critical', ]) -async def test_skipping_below_config(caplog, logstream, logfn, event_queue, event_queue_loop, - mocker): - logger = ObjectLogger(body=OBJ1) +async def test_skipping_below_config(settings, caplog, logstream, logfn, + event_queue, event_queue_loop, mocker): + + logger = ObjectLogger(body=OBJ1, settings=settings) logger_fn = getattr(logger, logfn) - mocker.patch.object(EventsConfig, 'events_loglevel', 666) + settings.posting.level = 666 logger_fn("hello %s", "world") - mocker.patch.object(EventsConfig, 'events_loglevel', 0) + settings.posting.level = 0 logger.info("must be here") assert event_queue.qsize() == 1 # not 2! @@ -100,8 +104,10 @@ async def test_skipping_below_config(caplog, logstream, logfn, event_queue, even 'error', 'critical', ]) -async def test_skipping_when_local_with_all_levels(caplog, logstream, logfn, event_queue, event_queue_loop): - logger = LocalObjectLogger(body=OBJ1) +async def test_skipping_when_local_with_all_levels(settings, caplog, logstream, logfn, + event_queue, event_queue_loop): + + logger = LocalObjectLogger(body=OBJ1, settings=settings) logger_fn = getattr(logger, logfn) logger_fn("hello %s", "world") diff --git a/tests/posting/test_poster.py b/tests/posting/test_poster.py index 780dc453..86eb2f0f 100644 --- a/tests/posting/test_poster.py +++ b/tests/posting/test_poster.py @@ -5,8 +5,7 @@ from asynctest import call from kopf import event, info, warn, exception -from kopf.config import EventsConfig -from kopf.engines.posting import poster, K8sEvent, event_queue_var, event_queue_loop_var +from kopf.engines.posting import poster, K8sEvent, event_queue_var, event_queue_loop_var, settings_var OBJ1 = {'apiVersion': 'group1/version1', 'kind': 'Kind1', 'metadata': {'uid': 'uid1', 'name': 'name1', 'namespace': 'ns1'}} @@ -18,6 +17,11 @@ 'uid': 'uid2', 'name': 'name2', 'namespace': 'ns2'} +@pytest.fixture(autouse=True) +def 
_settings_via_contextvar(settings_via_contextvar): + pass + + async def test_poster_polls_and_posts(mocker): event1 = K8sEvent(type='type1', reason='reason1', message='message1', ref=REF1) event2 = K8sEvent(type='type2', reason='reason2', message='message2', ref=REF2) @@ -83,13 +87,13 @@ async def test_via_event_function(mocker, event_queue, event_queue_loop): pytest.param(warn, "Warning", logging.WARNING, id='warn'), pytest.param(exception, "Error", logging.ERROR, id='exception'), ]) -async def test_via_shortcut(mocker, event_fn, event_type, min_levelno, +async def test_via_shortcut(settings, mocker, event_fn, event_type, min_levelno, event_queue, event_queue_loop): post_event = mocker.patch('kopf.clients.events.post_event') - mocker.patch.object(EventsConfig, 'events_loglevel', min_levelno) + settings.posting.level = min_levelno event_fn(OBJ1, reason='reason1', message='message1') # posted - mocker.patch.object(EventsConfig, 'events_loglevel', min_levelno + 1) + settings.posting.level = min_levelno + 1 event_fn(OBJ1, reason='reason2', message='message2') # not posted assert not post_event.called diff --git a/tests/reactor/conftest.py b/tests/reactor/conftest.py index d3bdc3d5..5f752f2a 100644 --- a/tests/reactor/conftest.py +++ b/tests/reactor/conftest.py @@ -39,7 +39,7 @@ def watcher_limited(mocker): @pytest.fixture() -def watcher_in_background(resource, event_loop, worker_spy, stream): +def watcher_in_background(settings, resource, event_loop, worker_spy, stream): # Prevent remembering the streaming objects in the mocks. 
async def do_nothing(*args, **kwargs): @@ -52,6 +52,7 @@ async def do_nothing(*args, **kwargs): coro = watcher( namespace=None, resource=resource, + settings=settings, processor=do_nothing, ) task = event_loop.create_task(coro) diff --git a/tests/reactor/test_queueing.py b/tests/reactor/test_queueing.py index 6fc67093..05e5cf92 100644 --- a/tests/reactor/test_queueing.py +++ b/tests/reactor/test_queueing.py @@ -46,7 +46,7 @@ ]) @pytest.mark.usefixtures('watcher_limited') async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor, - stream, events, uids, cnts): + settings, stream, events, uids, cnts): """ Verify that every unique uid goes into its own queue+worker, which are never shared. """ # Inject the events of unique objects - to produce few streams/workers. @@ -58,6 +58,7 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor await watcher( namespace=None, resource=resource, + settings=settings, processor=processor, ) @@ -114,13 +115,14 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor ]) @pytest.mark.usefixtures('watcher_limited') -async def test_watchevent_batching(mocker, resource, processor, timer, stream, events, uids, vals): +async def test_watchevent_batching(settings, resource, processor, timer, + stream, events, uids, vals): """ Verify that only the last event per uid is actually handled. """ # Override the default timeouts to make the tests faster. - mocker.patch('kopf.config.WorkersConfig.worker_idle_timeout', 0.5) - mocker.patch('kopf.config.WorkersConfig.worker_batch_window', 0.1) - mocker.patch('kopf.config.WorkersConfig.worker_exit_timeout', 0.5) + settings.batching.idle_timeout = 0.5 + settings.batching.batch_window = 0.1 + settings.batching.exit_timeout = 0.5 # Inject the events of unique objects - to produce few streams/workers. 
stream.feed(events) @@ -131,13 +133,13 @@ async def test_watchevent_batching(mocker, resource, processor, timer, stream, e await watcher( namespace=None, resource=resource, + settings=settings, processor=processor, ) # Significantly less than the queue getting timeout, but sufficient to run. # 2 <= 1 pull for the event chain + 1 pull for EOS. TODO: 1x must be enough. - from kopf import config - assert timer.seconds < config.WorkersConfig.worker_batch_window + CODE_OVERHEAD + assert timer.seconds < settings.batching.batch_window + CODE_OVERHEAD # Was the processor called at all? Awaited as needed for async fns? assert processor.awaited @@ -169,13 +171,13 @@ async def test_watchevent_batching(mocker, resource, processor, timer, stream, e ]) @pytest.mark.usefixtures('watcher_in_background') -async def test_garbage_collection_of_streams(mocker, stream, events, unique, worker_spy): +async def test_garbage_collection_of_streams(settings, stream, events, unique, worker_spy): # Override the default timeouts to make the tests faster. - mocker.patch('kopf.config.WorkersConfig.worker_idle_timeout', 0.5) - mocker.patch('kopf.config.WorkersConfig.worker_batch_window', 0.1) - mocker.patch('kopf.config.WorkersConfig.worker_exit_timeout', 0.5) - mocker.patch('kopf.config.WatchersConfig.watcher_retry_delay', 1.0) # to prevent src depletion + settings.batching.idle_timeout = 0.5 + settings.batching.batch_window = 0.1 + settings.batching.exit_timeout = 0.5 + settings.watching.retry_delay = 1.0 # to prevent src depletion # Inject the events of unique objects - to produce few streams/workers. stream.feed(events) @@ -197,9 +199,8 @@ async def test_garbage_collection_of_streams(mocker, stream, events, unique, wor # Give the workers some time to finish waiting for the events. # Once the idle timeout, they will exit and gc their individual streams. - from kopf import config - await asyncio.sleep(config.WorkersConfig.worker_batch_window) # depleting the queues. 
- await asyncio.sleep(config.WorkersConfig.worker_idle_timeout) # idling on empty queues. + await asyncio.sleep(settings.batching.batch_window) # depleting the queues. + await asyncio.sleep(settings.batching.idle_timeout) # idling on empty queues. await asyncio.sleep(CODE_OVERHEAD) # The mutable(!) streams dict is now empty, i.e. garbage-collected. diff --git a/tests/test_liveness.py b/tests/test_liveness.py index 7c22ce60..d1c8a0c7 100644 --- a/tests/test_liveness.py +++ b/tests/test_liveness.py @@ -14,7 +14,7 @@ async def liveness_registry(): @pytest.fixture() -async def liveness_url(liveness_registry, aiohttp_unused_port): +async def liveness_url(settings, liveness_registry, aiohttp_unused_port): # The server startup is not instant, so we need a readiness flag. ready_flag = asyncio.Event() @@ -24,6 +24,7 @@ async def liveness_url(liveness_registry, aiohttp_unused_port): health_reporter( endpoint=f'http://:{port}/xyz', registry=liveness_registry, + settings=settings, ready_flag=ready_flag, ) ) From 846c1cd85b3e215ee9b5caa01cab2475f3eee915 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 26 Mar 2020 16:57:54 +0100 Subject: [PATCH 09/11] Add a setting to disable log message posting without level trickery --- kopf/engines/logging.py | 3 ++- kopf/engines/posting.py | 30 +++++++++++++++++++----------- kopf/structs/configuration.py | 9 +++++++++ tests/posting/test_log2k8s.py | 21 +++++++++++++++++++++ tests/posting/test_threadsafety.py | 3 ++- 5 files changed, 53 insertions(+), 13 deletions(-) diff --git a/kopf/engines/logging.py b/kopf/engines/logging.py index 97e0c8cf..f313600a 100644 --- a/kopf/engines/logging.py +++ b/kopf/engines/logging.py @@ -45,9 +45,10 @@ def filter(self, record: logging.LogRecord) -> bool: settings: Optional[configuration.OperatorSettings] settings = getattr(record, 'settings', None) level_ok = settings is not None and record.levelno >= settings.posting.level + enabled = settings is not None and settings.posting.enabled has_ref = 
hasattr(record, 'k8s_ref') skipped = hasattr(record, 'k8s_skip') and getattr(record, 'k8s_skip') - return level_ok and has_ref and not skipped and super().filter(record) + return enabled and level_ok and has_ref and not skipped and super().filter(record) def emit(self, record: logging.LogRecord) -> None: # Same try-except as in e.g. `logging.StreamHandler`. diff --git a/kopf/engines/posting.py b/kopf/engines/posting.py index 6d0abadc..d41c9ec3 100644 --- a/kopf/engines/posting.py +++ b/kopf/engines/posting.py @@ -91,35 +91,41 @@ def event( reason: str, message: str = '', ) -> None: - for obj in cast(Iterator[bodies.Body], dicts.walk(objs)): - ref = bodies.build_object_reference(obj) - enqueue(ref=ref, type=type, reason=reason, message=message) + settings: configuration.OperatorSettings = settings_var.get() + if settings.posting.enabled: + for obj in cast(Iterator[bodies.Body], dicts.walk(objs)): + ref = bodies.build_object_reference(obj) + enqueue(ref=ref, type=type, reason=reason, message=message) def info( - obj: bodies.Body, + objs: Union[bodies.Body, Iterable[bodies.Body]], *, reason: str, message: str = '', ) -> None: settings: configuration.OperatorSettings = settings_var.get() - if settings.posting.level <= logging.INFO: - event(obj, type='Normal', reason=reason, message=message) + if settings.posting.enabled and settings.posting.level <= logging.INFO: + for obj in cast(Iterator[bodies.Body], dicts.walk(objs)): + ref = bodies.build_object_reference(obj) + enqueue(ref=ref, type='Normal', reason=reason, message=message) def warn( - obj: bodies.Body, + objs: Union[bodies.Body, Iterable[bodies.Body]], *, reason: str, message: str = '', ) -> None: settings: configuration.OperatorSettings = settings_var.get() if settings.posting.level <= logging.WARNING: - event(obj, type='Warning', reason=reason, message=message) + for obj in cast(Iterator[bodies.Body], dicts.walk(objs)): + ref = bodies.build_object_reference(obj) + enqueue(ref=ref, type='Warning', reason=reason, 
message=message) def exception( - obj: bodies.Body, + objs: Union[bodies.Body, Iterable[bodies.Body]], *, reason: str = '', message: str = '', @@ -130,8 +136,10 @@ def exception( reason = reason if reason else type(exc).__name__ message = f'{message} {exc}' if message and exc else f'{exc}' if exc else f'{message}' settings: configuration.OperatorSettings = settings_var.get() - if settings.posting.level <= logging.ERROR: - event(obj, type='Error', reason=reason, message=message) + if settings.posting.enabled and settings.posting.level <= logging.ERROR: + for obj in cast(Iterator[bodies.Body], dicts.walk(objs)): + ref = bodies.build_object_reference(obj) + enqueue(ref=ref, type='Error', reason=reason, message=message) async def poster( diff --git a/kopf/structs/configuration.py b/kopf/structs/configuration.py index 2b844bb0..19286983 100644 --- a/kopf/structs/configuration.py +++ b/kopf/structs/configuration.py @@ -40,6 +40,15 @@ class LoggingSettings: @dataclasses.dataclass class PostingSettings: + enabled: bool = True + """ + Should the log messages be sent as Kubernetes Events for an object. + The events can be seen in ``kubectl describe`` output for the object. + + This also affects ``kopf.event()`` and similar functions + (``kopf.info()``, ``kopf.warn()``, ``kopf.exception()``). 
+ """ + level: int = dataclasses.field( default_factory=lambda: config.EventsConfig.events_loglevel) """ diff --git a/tests/posting/test_log2k8s.py b/tests/posting/test_log2k8s.py index d9c9b272..60e363dd 100644 --- a/tests/posting/test_log2k8s.py +++ b/tests/posting/test_log2k8s.py @@ -97,6 +97,27 @@ async def test_skipping_below_config(settings, caplog, logstream, logfn, assert caplog.messages == ["hello world", "must be here"] +@pytest.mark.parametrize('logfn', [ + 'debug', + 'info', + 'warning', + 'error', + 'critical', +]) +async def test_skipping_when_disabled(settings, caplog, logstream, logfn, + event_queue, event_queue_loop): + + logger = LocalObjectLogger(body=OBJ1, settings=settings) + logger_fn = getattr(logger, logfn) + + settings.posting.enabled = False + settings.posting.level = 0 + logger_fn("hello %s", "world") + + assert event_queue.qsize() == 0 + assert caplog.messages == ["hello world"] + + @pytest.mark.parametrize('logfn', [ 'debug', 'info', diff --git a/tests/posting/test_threadsafety.py b/tests/posting/test_threadsafety.py index c2604396..65914ed9 100644 --- a/tests/posting/test_threadsafety.py +++ b/tests/posting/test_threadsafety.py @@ -115,7 +115,8 @@ def thread_fn(): assert 0.2 <= timer.seconds <= 0.4 -async def test_queueing_is_threadsafe(timer, awakener, threader, event_queue, event_queue_loop): +async def test_queueing_is_threadsafe(timer, awakener, threader, event_queue, event_queue_loop, + settings_via_contextvar): def thread_fn(): event(OBJ1, type='type1', reason='reason1', message='message1') From d94070fc8448aac05533e6c6091cb507951dace3 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 26 Mar 2020 09:09:49 +0100 Subject: [PATCH 10/11] Document settings in a new way via settings (instead of old configs) --- docs/configuration.rst | 135 +++++++++++++++++++++++++++++++++++++++++ docs/configuring.rst | 37 ----------- docs/index.rst | 2 +- docs/kwargs.rst | 16 +++++ 4 files changed, 152 insertions(+), 38 deletions(-) create 
mode 100644 docs/configuration.rst
 delete mode 100644 docs/configuring.rst

diff --git a/docs/configuration.rst b/docs/configuration.rst
new file mode 100644
index 00000000..d54f1d84
--- /dev/null
+++ b/docs/configuration.rst
@@ -0,0 +1,135 @@
+=============
+Configuration
+=============
+
+There are tools to configure some of kopf functionality, like asynchronous
+tasks behaviour and logging events.
+
+
+Startup configuration
+=====================
+
+Every operator has its settings (even if there is more than one operator
+in the same process, e.g. due to :doc:`embedding`). The settings affect
+how the framework behaves in detail.
+
+The settings can be modified in the startup handlers (see :doc:`startup`):
+
+.. code-block:: python
+
+    import kopf
+    import logging
+
+    @kopf.on.startup()
+    def configure(settings: kopf.OperatorSettings, **_):
+        settings.posting.level = logging.WARNING
+        settings.watching.session_timeout = 1 * 60
+        settings.watching.stream_timeout = 10 * 60
+
+All the settings have reasonable defaults, so the configuration should be used
+only for fine-tuning when and if necessary.
+
+For more settings, see `kopf.OperatorSettings` and :kwarg:`settings` kwarg.
+
+
+Logging events
+==============
+
+``settings.posting`` allows to control which log messages should be posted as
+Kubernetes events. Use ``logging`` constants or integer values to set the level:
+e.g., ``logging.WARNING``, ``logging.ERROR``, etc.
+The default is ``logging.INFO``.
+
+.. code-block:: python
+
+    import logging
+    import kopf
+
+    @kopf.on.startup()
+    def configure(settings: kopf.OperatorSettings, **_):
+        settings.posting.level = logging.ERROR
+
+The event-posting can be disabled completely (the default is to be enabled):
+
+.. code-block:: python
+
+    import kopf
+
+    @kopf.on.startup()
+    def configure(settings: kopf.OperatorSettings, **_):
+        settings.posting.enabled = False
+
+.. note::
+
+    These settings also affect `kopf.event` and related functions:
+    `kopf.info`, `kopf.warn`, `kopf.exception`, etc --
+    even if they are called explicitly in the code.
+
+    To avoid these settings having an impact on your code, post events
+    directly with an API client library instead of the Kopf-provided toolkit.
+
+
+Synchronous handlers
+====================
+
+``settings.execution`` allows to set the number of synchronous workers used
+and to redefine the asyncio executor:
+
+.. code-block:: python
+
+    import kopf
+
+    @kopf.on.startup()
+    def configure(settings: kopf.OperatorSettings, **_):
+        settings.execution.max_workers = 20
+
+
+It is possible to replace the whole asyncio executor used
+for synchronous handlers (see :doc:`async`).
+
+Please note that the handlers that started in a previous executor will be
+continued and finished with their original executor. This includes the startup
+handler itself. To avoid it, make the on-startup handler asynchronous:
+
+.. code-block:: python
+
+    import concurrent.futures
+    import kopf
+
+    @kopf.on.startup()
+    async def configure(settings: kopf.OperatorSettings, **_):
+        settings.execution.executor = concurrent.futures.ThreadPoolExecutor()
+
+
+API timeouts
+============
+
+A few timeouts can be controlled when communicating with the Kubernetes API:
+
+``settings.watching.session_timeout`` (seconds) is how long the session
+with a watching request will exist before terminating from the **client** side.
+The default is forever (``None``).
+
+``settings.watching.stream_timeout`` (seconds) is how long the session
+with a watching request will exist before terminating from the **server** side.
+The default is to let the server decide (``None``).
+
+It makes no sense to set the client-side timeout shorter than the server-side
+timeout, but it is given to the developers' responsibility to decide.
+
+The server-side timeouts are unpredictable, they can be in 10 seconds or
+in 10 minutes. 
Yet, it feels wrong to assume any "good" values in a framework +(especially since it works without timeouts defined, just produces extra logs). + +``settings.watching.retry_delay`` (seconds) is for how long to sleep between +watching requests -- in order to prevent API flooding in case of errors +or disconnects. The default is 0.1 seconds (nearly instant, but no flooding). + +.. code-block:: python + + import concurrent.futures + import kopf + + @kopf.on.startup() + def configure(settings: kopf.OperatorSettings, **_): + settings.watching.stream_timeout = 10 * 60 diff --git a/docs/configuring.rst b/docs/configuring.rst deleted file mode 100644 index 9b7c9348..00000000 --- a/docs/configuring.rst +++ /dev/null @@ -1,37 +0,0 @@ -================ -Configuration -================ - -There are tools to configure some of kopf functionality, like asynchronous -tasks behaviour and logging events. - - -Configure logging events -======================== - -`kopf.config.EventsConfig` allows to set what types of kopf logs should be -reflected in events. Use `logging` constants or integer values to set the level: -e.g., `logging.WARNING`, `logging.ERROR`, etc. The default is `logging.INFO`. - -.. code-block:: python - - import logging - import kopf - - # Now kopf will send events only when error or critical occasion happens - kopf.EventsConfig.events_loglevel = logging.ERROR - - -Configure Workers -================= - -`kopf.config.WorkersConfig` allows to set numbers of workers, launch periods, -and timeouts for many kinds of tasks. - -.. code-block:: python - - import kopf - - # Let's set how many workers can be running simultaneously on per-object event queue - kopf.WorkersConfig.synchronous_tasks_threadpool_limit = 20 - diff --git a/docs/index.rst b/docs/index.rst index d4ec17d6..4970434b 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -43,7 +43,7 @@ Kopf: Kubernetes Operators Framework shutdown probing authentication - configuring + configuration peering .. 
toctree:: diff --git a/docs/kwargs.rst b/docs/kwargs.rst index bf0b9eda..dd02d3f5 100644 --- a/docs/kwargs.rst +++ b/docs/kwargs.rst @@ -38,6 +38,22 @@ in case of retries & errors -- i.e. of the first attempt. in case of retries & errors -- i.e. since the first attempt. +.. kwarg:: settings + +Operator configuration +====================== + +``settings`` is passed to activity handlers (but not to resource handlers). + +It is an object with predefined nested structure of containers with values, +which defines the operator's behaviour. See also: `kopf.OperatorSettings`. + +It can be modified if needed (usually in the startup handlers). Every operator +(if there are more than one in the same process) has its own config. + +See also: :doc:`configuration`. + + Resource-related kwargs ======================= From a1cf3c20fa408d380761014bfdec873de8812627 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Thu, 26 Mar 2020 18:14:49 +0100 Subject: [PATCH 11/11] Resolve cyclic module imports as detected by LGTM.com --- kopf/__init__.py | 4 +++- kopf/cli.py | 4 ++-- kopf/config.py | 50 ----------------------------------------- kopf/engines/logging.py | 47 ++++++++++++++++++++++++++++++++++++++ tests/conftest.py | 3 +-- 5 files changed, 53 insertions(+), 55 deletions(-) diff --git a/kopf/__init__.py b/kopf/__init__.py index ef94aae1..d42efa4b 100644 --- a/kopf/__init__.py +++ b/kopf/__init__.py @@ -10,7 +10,6 @@ on, # as a separate name on the public namespace ) from kopf.config import ( - configure, LOGLEVEL_INFO, # deprecated LOGLEVEL_WARNING, # deprecated LOGLEVEL_ERROR, # deprecated @@ -18,6 +17,9 @@ EventsConfig, # deprecated WorkersConfig, # deprecated ) +from kopf.engines.logging import ( + configure, +) from kopf.engines.posting import ( event, info, diff --git a/kopf/cli.py b/kopf/cli.py index cdbecb9e..77bf9848 100644 --- a/kopf/cli.py +++ b/kopf/cli.py @@ -5,7 +5,7 @@ import click -from kopf import config +from kopf.engines import logging from kopf.engines import 
peering from kopf.reactor import registries from kopf.reactor import running @@ -32,7 +32,7 @@ def logging_options(fn: Callable[..., Any]) -> Callable[..., Any]: @click.option('-q', '--quiet', is_flag=True) @functools.wraps(fn) # to preserve other opts/args def wrapper(verbose: bool, quiet: bool, debug: bool, *args: Any, **kwargs: Any) -> Any: - config.configure(debug=debug, verbose=verbose, quiet=quiet) + logging.configure(debug=debug, verbose=verbose, quiet=quiet) return fn(*args, **kwargs) return wrapper diff --git a/kopf/config.py b/kopf/config.py index a85d096a..c82c609c 100644 --- a/kopf/config.py +++ b/kopf/config.py @@ -1,12 +1,6 @@ -import asyncio import logging from typing import Optional -from kopf.engines import logging as logging_engine - -format = '[%(asctime)s] %(name)-20.20s [%(levelname)-8.8s] %(message)s' - - # Deprecated: use ``logging.*`` constants instead. Kept here for backward-compatibility. LOGLEVEL_INFO = logging.INFO LOGLEVEL_WARNING = logging.WARNING @@ -14,50 +8,6 @@ LOGLEVEL_CRITICAL = logging.CRITICAL -def configure( - debug: Optional[bool] = None, - verbose: Optional[bool] = None, - quiet: Optional[bool] = None, -) -> None: - log_level = 'DEBUG' if debug or verbose else 'WARNING' if quiet else 'INFO' - - logger = logging.getLogger() - handler = logging.StreamHandler() - formatter = logging_engine.ObjectPrefixingFormatter(format) - handler.setFormatter(formatter) - logger.addHandler(handler) - logger.setLevel(log_level) - - # Configure the Kubernetes client defaults according to our settings. 
- try: - import kubernetes - except ImportError: - pass - else: - config = kubernetes.client.configuration.Configuration() - config.logger_format = format - config.logger_file = None # once again after the constructor to re-apply the formatter - config.debug = debug - kubernetes.client.configuration.Configuration.set_default(config) - - # Kubernetes client is as buggy as hell: it adds its own stream handlers even in non-debug mode, - # does not respect the formatting, and dumps too much of the low-level info. - if not debug: - logger = logging.getLogger("urllib3") - del logger.handlers[1:] # everything except the default NullHandler - - # Prevent the low-level logging unless in the debug verbosity mode. Keep only the operator's messages. - # For no-propagation loggers, add a dummy null handler to prevent printing the messages. - for name in ['urllib3', 'asyncio', 'kubernetes']: - logger = logging.getLogger(name) - logger.propagate = bool(debug) - if not debug: - logger.handlers[:] = [logging.NullHandler()] - - loop = asyncio.get_event_loop() - loop.set_debug(bool(debug)) - - # DEPRECATED: Used for initial defaults for per-operator settings (see kopf.structs.configuration). class EventsConfig: """ diff --git a/kopf/engines/logging.py b/kopf/engines/logging.py index f313600a..82752597 100644 --- a/kopf/engines/logging.py +++ b/kopf/engines/logging.py @@ -8,6 +8,7 @@ the operators' code, and can lead to information loss or mismatch (e.g. when logging call is added, but posting is forgotten). 
""" +import asyncio import copy import logging from typing import Tuple, MutableMapping, Any, Optional @@ -129,3 +130,49 @@ def log(self, *args: Any, **kwargs: Any) -> None: logger = logging.getLogger('kopf.objects') logger.addHandler(K8sPoster()) + +format = '[%(asctime)s] %(name)-20.20s [%(levelname)-8.8s] %(message)s' + + +def configure( + debug: Optional[bool] = None, + verbose: Optional[bool] = None, + quiet: Optional[bool] = None, +) -> None: + log_level = 'DEBUG' if debug or verbose else 'WARNING' if quiet else 'INFO' + + logger = logging.getLogger() + handler = logging.StreamHandler() + formatter = ObjectPrefixingFormatter(format) + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(log_level) + + # Configure the Kubernetes client defaults according to our settings. + try: + import kubernetes + except ImportError: + pass + else: + config = kubernetes.client.configuration.Configuration() + config.logger_format = format + config.logger_file = None # once again after the constructor to re-apply the formatter + config.debug = debug + kubernetes.client.configuration.Configuration.set_default(config) + + # Kubernetes client is as buggy as hell: it adds its own stream handlers even in non-debug mode, + # does not respect the formatting, and dumps too much of the low-level info. + if not debug: + logger = logging.getLogger("urllib3") + del logger.handlers[1:] # everything except the default NullHandler + + # Prevent the low-level logging unless in the debug verbosity mode. Keep only the operator's messages. + # For no-propagation loggers, add a dummy null handler to prevent printing the messages. 
+ for name in ['urllib3', 'asyncio', 'kubernetes']: + logger = logging.getLogger(name) + logger.propagate = bool(debug) + if not debug: + logger.handlers[:] = [logging.NullHandler()] + + loop = asyncio.get_event_loop() + loop.set_debug(bool(debug)) diff --git a/tests/conftest.py b/tests/conftest.py index 091f6cf1..b149fa13 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,8 +16,7 @@ import kopf from kopf.clients.auth import APIContext -from kopf.config import configure -from kopf.engines.logging import ObjectPrefixingFormatter +from kopf.engines.logging import configure, ObjectPrefixingFormatter from kopf.engines.posting import settings_var from kopf.structs.configuration import OperatorSettings from kopf.structs.credentials import Vault, VaultKey, ConnectionInfo