From f31c3e2d9c30242dbc074c671921153099200aee Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Sat, 2 Nov 2019 21:16:53 +0100 Subject: [PATCH 1/8] Convert errors mode from two booleans to enum --- kopf/reactor/handling.py | 20 +++++++++----------- kopf/reactor/registries.py | 8 ++++++++ 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 91333b5b..210b4ade 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -13,7 +13,6 @@ But these internal changes are filtered out from the cause detection and therefore do not trigger the user-defined handlers. """ - import asyncio import collections.abc import datetime @@ -187,7 +186,7 @@ async def handle_resource_watching_cause( handlers=handlers, cause=cause, state=states.State.from_scratch(handlers=handlers), - ignore_errors=True, # affects the log messages + default_errors=registries.ErrorsMode.IGNORED, ) # Store the results, but not the handlers' progress. @@ -367,8 +366,7 @@ async def _execute_handlers( handlers: Collection[registries.ResourceHandler], cause: causation.BaseCause, state: states.State, - ignore_errors: bool = False, - retry_on_errors: bool = True, + default_errors: registries.ErrorsMode = registries.ErrorsMode.TEMPORARY, ) -> Mapping[registries.HandlerId, states.HandlerOutcome]: """ Call the next handler(s) from the chain of the handlers. @@ -392,8 +390,7 @@ async def _execute_handlers( state=state[handler.id], cause=cause, lifecycle=lifecycle, # just a default for the sub-handlers, not used directly. - ignore_errors=ignore_errors, - retry_on_errors=retry_on_errors, + default_errors=default_errors, ) outcomes[handler.id] = outcome @@ -405,8 +402,7 @@ async def _execute_handler( cause: causation.BaseCause, state: states.HandlerState, lifecycle: lifecycles.LifeCycleFn, - ignore_errors: bool = False, - retry_on_errors: bool = True, + default_errors: registries.ErrorsMode = registries.ErrorsMode.TEMPORARY, ) -> states.HandlerOutcome: """ Execute one and only one handler. @@ -466,16 +462,18 @@ async def _execute_handler( # Regular errors behave as either temporary or permanent depending on the error strictness. except Exception as e: - if ignore_errors: + if default_errors == registries.ErrorsMode.IGNORED: logger.exception(f"Handler {handler.id!r} failed with an exception. Will ignore.") return states.HandlerOutcome(final=True, exception=e) - elif retry_on_errors: + elif default_errors == registries.ErrorsMode.TEMPORARY: logger.exception(f"Handler {handler.id!r} failed with an exception. Will retry.") return states.HandlerOutcome(final=False, exception=e, delay=DEFAULT_RETRY_DELAY) - else: + elif default_errors == registries.ErrorsMode.PERMANENT: logger.exception(f"Handler {handler.id!r} failed with an exception. Will stop.") return states.HandlerOutcome(final=True, exception=e) # TODO: report the handling failure somehow (beside logs/events). persistent status? + else: + raise RuntimeError(f"Unknown mode for errors: {default_errors!r}") # No errors means the handler should be excluded from future runs in this reaction cycle. else: diff --git a/kopf/reactor/registries.py b/kopf/reactor/registries.py index a8e5cba3..2d91b0fe 100644 --- a/kopf/reactor/registries.py +++ b/kopf/reactor/registries.py @@ -13,6 +13,7 @@ """ import abc import collections +import enum import functools import logging import warnings @@ -39,6 +40,13 @@ HandlerResult = NewType('HandlerResult', object) +class ErrorsMode(enum.Enum): + """ How arbitrary (non-temporary/non-permanent) exceptions are treated. """ + IGNORED = enum.auto() + TEMPORARY = enum.auto() + PERMANENT = enum.auto() + + class ResourceHandlerFn(Protocol): def __call__( self, From 683c2f3ecb6aa1f18662fc59ab1a5a2bc71d2e0e Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Sat, 2 Nov 2019 21:25:25 +0100 Subject: [PATCH 2/8] Configure error mode individually per handler --- kopf/__init__.py | 2 ++ kopf/on.py | 51 +++++++++++++++++++++-------- kopf/reactor/handling.py | 9 ++--- kopf/reactor/registries.py | 9 +++-- tests/basic-structs/test_handler.py | 3 ++ tests/registries/test_decorators.py | 19 ++++++++--- 6 files changed, 69 insertions(+), 24 deletions(-) diff --git a/kopf/__init__.py b/kopf/__init__.py index 60406caa..5d65bb44 100644 --- a/kopf/__init__.py +++ b/kopf/__init__.py @@ -45,6 +45,7 @@ set_default_lifecycle, ) from kopf.reactor.registries import ( + ErrorsMode, ResourceRegistry, ResourceWatchingRegistry, ResourceChangingRegistry, @@ -90,6 +91,7 @@ 'get_default_lifecycle', 'set_default_lifecycle', 'build_object_reference', 'build_owner_reference', 'append_owner_reference', 'remove_owner_reference', + 'ErrorsMode', 'PermanentError', 'HandlerFatalError', 'TemporaryError', 'HandlerRetryError', 'HandlerTimeoutError', diff --git a/kopf/on.py b/kopf/on.py index 397b52af..484b6295 100644 --- a/kopf/on.py +++ b/kopf/on.py @@ -26,6 +26,7 @@ def resume( group: str, version: str, plural: str, *, id: Optional[str] = None, + errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, registry: Optional[registries.OperatorRegistry] = None, labels: Optional[bodies.Labels] = None, @@ -36,8 +37,10 @@ def resume( def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, - reason=None, initial=True, id=id, timeout=timeout, - fn=fn, labels=labels, annotations=annotations) + reason=None, initial=True, id=id, + errors=errors, timeout=timeout, + fn=fn, labels=labels, annotations=annotations, + ) return decorator @@ -45,6 +48,7 @@ def create( group: str, version: str, plural: str, *, id: Optional[str] = None, + errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, registry: Optional[registries.OperatorRegistry] = None, labels: Optional[bodies.Labels] = None, @@ -55,8 +59,10 @@ def create( def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, - reason=causation.Reason.CREATE, id=id, timeout=timeout, - fn=fn, labels=labels, annotations=annotations) + reason=causation.Reason.CREATE, id=id, + errors=errors, timeout=timeout, + fn=fn, labels=labels, annotations=annotations, + ) return decorator @@ -64,6 +70,7 @@ def update( group: str, version: str, plural: str, *, id: Optional[str] = None, + errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, registry: Optional[registries.OperatorRegistry] = None, labels: Optional[bodies.Labels] = None, @@ -74,8 +81,10 @@ def update( def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, - reason=causation.Reason.UPDATE, id=id, timeout=timeout, - fn=fn, labels=labels, annotations=annotations) + reason=causation.Reason.UPDATE, id=id, + errors=errors, timeout=timeout, + fn=fn, labels=labels, annotations=annotations, + ) return decorator @@ -83,6 +92,7 @@ def delete( group: str, version: str, plural: str, *, id: Optional[str] = None, + errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, registry: Optional[registries.OperatorRegistry] = None, optional: Optional[bool] = None, @@ -94,9 +104,11 @@ def delete( def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, - reason=causation.Reason.DELETE, id=id, timeout=timeout, + reason=causation.Reason.DELETE, id=id, + errors=errors, timeout=timeout, fn=fn, requires_finalizer=bool(not optional), - labels=labels, annotations=annotations) + labels=labels, annotations=annotations, + ) return decorator @@ -105,6 +117,7 @@ def field( field: Union[str, List[str], Tuple[str, ...]], *, id: Optional[str] = None, + errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, registry: Optional[registries.OperatorRegistry] = None, labels: Optional[bodies.Labels] = None, @@ -115,8 +128,10 @@ def field( def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, - reason=None, field=field, id=id, timeout=timeout, - fn=fn, labels=labels, annotations=annotations) + reason=None, field=field, id=id, + errors=errors, timeout=timeout, + fn=fn, labels=labels, annotations=annotations, + ) return decorator @@ -133,7 +148,8 @@ def event( def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_watching_handler( group=group, version=version, plural=plural, - id=id, fn=fn, labels=labels, annotations=annotations) + id=id, fn=fn, labels=labels, annotations=annotations, + ) return decorator @@ -142,6 +158,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: def this( *, id: Optional[str] = None, + errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, registry: Optional[registries.ResourceChangingRegistry] = None, ) -> ResourceHandlerDecorator: @@ -176,7 +193,10 @@ def create_task(*, spec, task=task, **kwargs): """ actual_registry = registry if registry is not None else handling.subregistry_var.get() def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: - return actual_registry.register(id=id, fn=fn, timeout=timeout) + return actual_registry.register( + id=id, fn=fn, + errors=errors, timeout=timeout, + ) return decorator @@ -184,6 +204,7 @@ def register( fn: registries.ResourceHandlerFn, *, id: Optional[str] = None, + errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, registry: Optional[registries.ResourceChangingRegistry] = None, ) -> registries.ResourceHandlerFn: @@ -211,4 +232,8 @@ def create_it(spec, **kwargs): def create_single_task(task=task, **_): pass """ - return this(id=id, timeout=timeout, registry=registry)(fn) + decorator = this( + id=id, registry=registry, + errors=errors, timeout=timeout, + ) + return decorator(fn) diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 210b4ade..2c2b51a5 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -414,6 +414,7 @@ async def _execute_handler( This method is not supposed to raise any exceptions from the handlers: exceptions mean the failure of execution itself. """ + errors = handler.errors if handler.errors is not None else default_errors # Prevent successes/failures from posting k8s-events for resource-watching causes. logger: Union[logging.Logger, logging.LoggerAdapter] @@ -462,18 +463,18 @@ async def _execute_handler( # Regular errors behave as either temporary or permanent depending on the error strictness. except Exception as e: - if default_errors == registries.ErrorsMode.IGNORED: + if errors == registries.ErrorsMode.IGNORED: logger.exception(f"Handler {handler.id!r} failed with an exception. Will ignore.") return states.HandlerOutcome(final=True, exception=e) - elif default_errors == registries.ErrorsMode.TEMPORARY: + elif errors == registries.ErrorsMode.TEMPORARY: logger.exception(f"Handler {handler.id!r} failed with an exception. Will retry.") return states.HandlerOutcome(final=False, exception=e, delay=DEFAULT_RETRY_DELAY) - elif default_errors == registries.ErrorsMode.PERMANENT: + elif errors == registries.ErrorsMode.PERMANENT: logger.exception(f"Handler {handler.id!r} failed with an exception. Will stop.") return states.HandlerOutcome(final=True, exception=e) # TODO: report the handling failure somehow (beside logs/events). persistent status? else: - raise RuntimeError(f"Unknown mode for errors: {default_errors!r}") + raise RuntimeError(f"Unknown mode for errors: {errors!r}") # No errors means the handler should be excluded from future runs in this reaction cycle. else: diff --git a/kopf/reactor/registries.py b/kopf/reactor/registries.py index 2d91b0fe..aa3b8622 100644 --- a/kopf/reactor/registries.py +++ b/kopf/reactor/registries.py @@ -75,6 +75,7 @@ class ResourceHandler(NamedTuple): id: HandlerId reason: Optional[causation.Reason] field: Optional[dicts.FieldPath] + errors: Optional[ErrorsMode] = None timeout: Optional[float] = None initial: Optional[bool] = None labels: Optional[bodies.Labels] = None @@ -114,6 +115,7 @@ def register( reason: Optional[causation.Reason] = None, event: Optional[str] = None, # deprecated, use `reason` field: Optional[dicts.FieldSpec] = None, + errors: Optional[ErrorsMode] = None, timeout: Optional[float] = None, initial: Optional[bool] = None, requires_finalizer: bool = False, @@ -126,7 +128,8 @@ def register( real_field = dicts.parse_field(field) or None # to not store tuple() as a no-field case. real_id = generate_id(fn=fn, id=id, prefix=self.prefix, suffix=".".join(real_field or [])) handler = ResourceHandler( - id=real_id, fn=fn, reason=reason, field=real_field, timeout=timeout, + id=real_id, fn=fn, reason=reason, field=real_field, + errors=errors, timeout=timeout, initial=initial, requires_finalizer=requires_finalizer, labels=labels, annotations=annotations, ) @@ -244,6 +247,7 @@ def register_resource_changing_handler( reason: Optional[causation.Reason] = None, event: Optional[str] = None, # deprecated, use `reason` field: Optional[dicts.FieldSpec] = None, + errors: Optional[ErrorsMode] = None, timeout: Optional[float] = None, initial: Optional[bool] = None, requires_finalizer: bool = False, @@ -255,7 +259,8 @@ def register_resource_changing_handler( """ resource = resources_.Resource(group, version, plural) return self._resource_changing_handlers[resource].register( - reason=reason, event=event, field=field, fn=fn, id=id, timeout=timeout, + reason=reason, event=event, field=field, fn=fn, id=id, + errors=errors, timeout=timeout, initial=initial, requires_finalizer=requires_finalizer, labels=labels, annotations=annotations, ) diff --git a/tests/basic-structs/test_handler.py b/tests/basic-structs/test_handler.py index 9604d99d..65a55728 100644 --- a/tests/basic-structs/test_handler.py +++ b/tests/basic-structs/test_handler.py @@ -13,6 +13,7 @@ def test_all_args(mocker): id = mocker.Mock() reason = mocker.Mock() field = mocker.Mock() + errors = mocker.Mock() timeout = mocker.Mock() initial = mocker.Mock() labels = mocker.Mock() @@ -23,6 +24,7 @@ def test_all_args(mocker): id=id, reason=reason, field=field, + errors=errors, timeout=timeout, initial=initial, labels=labels, @@ -34,6 +36,7 @@ def test_all_args(mocker): assert handler.reason is reason assert handler.event is reason # deprecated assert handler.field is field + assert handler.errors is errors assert handler.timeout is timeout assert handler.initial is initial assert handler.labels is labels diff --git a/tests/registries/test_decorators.py b/tests/registries/test_decorators.py index edd62f9b..d19631bb 100644 --- a/tests/registries/test_decorators.py +++ b/tests/registries/test_decorators.py @@ -3,7 +3,7 @@ import kopf from kopf.reactor.causation import Reason from kopf.reactor.handling import subregistry_var -from kopf.reactor.registries import ResourceRegistry, OperatorRegistry, ResourceChangingRegistry +from kopf.reactor.registries import ErrorsMode, OperatorRegistry, ResourceChangingRegistry from kopf.structs.resources import Resource @@ -21,6 +21,7 @@ def fn(**_): assert handlers[0].fn is fn assert handlers[0].reason == Reason.CREATE assert handlers[0].field is None + assert handlers[0].errors is None assert handlers[0].timeout is None assert handlers[0].labels is None assert handlers[0].annotations is None @@ -40,6 +41,7 @@ def fn(**_): assert handlers[0].fn is fn assert handlers[0].reason == Reason.UPDATE assert handlers[0].field is None + assert handlers[0].errors is None assert handlers[0].timeout is None assert handlers[0].labels is None assert handlers[0].annotations is None @@ -59,6 +61,7 @@ def fn(**_): assert handlers[0].fn is fn assert handlers[0].reason == Reason.DELETE assert handlers[0].field is None + assert handlers[0].errors is None assert handlers[0].timeout is None assert handlers[0].labels is None assert handlers[0].annotations is None @@ -79,6 +82,7 @@ def fn(**_): assert handlers[0].fn is fn assert handlers[0].reason is None assert handlers[0].field == ('field', 'subfield') + assert handlers[0].errors is None assert handlers[0].timeout is None assert handlers[0].labels is None assert handlers[0].annotations is None @@ -98,7 +102,7 @@ def test_on_create_with_all_kwargs(mocker): mocker.patch('kopf.reactor.registries.match', return_value=True) @kopf.on.create('group', 'version', 'plural', - id='id', timeout=123, registry=registry, + id='id', errors=ErrorsMode.PERMANENT, timeout=123, registry=registry, labels={'somelabel': 'somevalue'}, annotations={'someanno': 'somevalue'}) def fn(**_): @@ -110,6 +114,7 @@ def fn(**_): assert handlers[0].reason == Reason.CREATE assert handlers[0].field is None assert handlers[0].id == 'id' + assert handlers[0].errors == ErrorsMode.PERMANENT assert handlers[0].timeout == 123 assert handlers[0].labels == {'somelabel': 'somevalue'} assert handlers[0].annotations == {'someanno': 'somevalue'} @@ -122,7 +127,7 @@ def test_on_update_with_all_kwargs(mocker): mocker.patch('kopf.reactor.registries.match', return_value=True) @kopf.on.update('group', 'version', 'plural', - id='id', timeout=123, registry=registry, + id='id', errors=ErrorsMode.PERMANENT, timeout=123, registry=registry, labels={'somelabel': 'somevalue'}, annotations={'someanno': 'somevalue'}) def fn(**_): @@ -134,6 +139,7 @@ def fn(**_): assert handlers[0].reason == Reason.UPDATE assert handlers[0].field is None assert handlers[0].id == 'id' + assert handlers[0].errors == ErrorsMode.PERMANENT assert handlers[0].timeout == 123 assert handlers[0].labels == {'somelabel': 'somevalue'} assert handlers[0].annotations == {'someanno': 'somevalue'} @@ -150,7 +156,8 @@ def test_on_delete_with_all_kwargs(mocker, optional): mocker.patch('kopf.reactor.registries.match', return_value=True) @kopf.on.delete('group', 'version', 'plural', - id='id', timeout=123, registry=registry, optional=optional, + id='id', errors=ErrorsMode.PERMANENT, timeout=123, registry=registry, + optional=optional, labels={'somelabel': 'somevalue'}, annotations={'someanno': 'somevalue'}) def fn(**_): @@ -162,6 +169,7 @@ def fn(**_): assert handlers[0].reason == Reason.DELETE assert handlers[0].field is None assert handlers[0].id == 'id' + assert handlers[0].errors == ErrorsMode.PERMANENT assert handlers[0].timeout == 123 assert handlers[0].labels == {'somelabel': 'somevalue'} assert handlers[0].annotations == {'someanno': 'somevalue'} @@ -175,7 +183,7 @@ def test_on_field_with_all_kwargs(mocker): mocker.patch('kopf.reactor.registries.match', return_value=True) @kopf.on.field('group', 'version', 'plural', 'field.subfield', - id='id', timeout=123, registry=registry, + id='id', errors=ErrorsMode.PERMANENT, timeout=123, registry=registry, labels={'somelabel': 'somevalue'}, annotations={'someanno': 'somevalue'}) def fn(**_): @@ -187,6 +195,7 @@ def fn(**_): assert handlers[0].reason is None assert handlers[0].field ==('field', 'subfield') assert handlers[0].id == 'id/field.subfield' + assert handlers[0].errors == ErrorsMode.PERMANENT assert handlers[0].timeout == 123 assert handlers[0].labels == {'somelabel': 'somevalue'} assert handlers[0].annotations == {'someanno': 'somevalue'} From 5bd03a246f4898f36bfe91b5fdcd09beadd413d6 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Sat, 2 Nov 2019 21:30:33 +0100 Subject: [PATCH 3/8] Configure number of retries individually per handler --- kopf/__init__.py | 2 + kopf/on.py | 21 +++++--- kopf/reactor/handling.py | 7 +++ kopf/reactor/registries.py | 7 ++- tests/basic-structs/test_handler.py | 3 ++ tests/handling/conftest.py | 8 ++-- ...st_timeouts.py => test_retrying_limits.py} | 48 +++++++++++++++++++ tests/registries/test_decorators.py | 20 ++++++-- 8 files changed, 99 insertions(+), 17 deletions(-) rename tests/handling/{test_timeouts.py => test_retrying_limits.py} (54%) diff --git a/kopf/__init__.py b/kopf/__init__.py index 5d65bb44..83ae4ad5 100644 --- a/kopf/__init__.py +++ b/kopf/__init__.py @@ -38,6 +38,7 @@ TemporaryError, PermanentError, HandlerTimeoutError, + HandlerRetriesError, execute, ) from kopf.reactor.lifecycles import ( @@ -95,6 +96,7 @@ 'PermanentError', 'HandlerFatalError', 'TemporaryError', 'HandlerRetryError', 'HandlerTimeoutError', + 'HandlerRetriesError', 'BaseRegistry', # deprecated 'SimpleRegistry', # deprecated 'GlobalRegistry', # deprecated diff --git a/kopf/on.py b/kopf/on.py index 484b6295..7b1da193 100644 --- a/kopf/on.py +++ b/kopf/on.py @@ -28,6 +28,7 @@ def resume( id: Optional[str] = None, errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, + retries: Optional[int] = None, registry: Optional[registries.OperatorRegistry] = None, labels: Optional[bodies.Labels] = None, annotations: Optional[bodies.Annotations] = None, @@ -38,7 +39,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, reason=None, initial=True, id=id, - errors=errors, timeout=timeout, + errors=errors, timeout=timeout, retries=retries, fn=fn, labels=labels, annotations=annotations, ) return decorator @@ -50,6 +51,7 @@ def create( id: Optional[str] = None, errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, + retries: Optional[int] = None, registry: Optional[registries.OperatorRegistry] = None, labels: Optional[bodies.Labels] = None, annotations: Optional[bodies.Annotations] = None, @@ -60,7 +62,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, reason=causation.Reason.CREATE, id=id, - errors=errors, timeout=timeout, + errors=errors, timeout=timeout, retries=retries, fn=fn, labels=labels, annotations=annotations, ) return decorator @@ -72,6 +74,7 @@ def update( id: Optional[str] = None, errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, + retries: Optional[int] = None, registry: Optional[registries.OperatorRegistry] = None, labels: Optional[bodies.Labels] = None, annotations: Optional[bodies.Annotations] = None, @@ -82,7 +85,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, reason=causation.Reason.UPDATE, id=id, - errors=errors, timeout=timeout, + errors=errors, timeout=timeout, retries=retries, fn=fn, labels=labels, annotations=annotations, ) return decorator @@ -94,6 +97,7 @@ def delete( id: Optional[str] = None, errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, + retries: Optional[int] = None, registry: Optional[registries.OperatorRegistry] = None, optional: Optional[bool] = None, labels: Optional[bodies.Labels] = None, @@ -105,7 +109,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, reason=causation.Reason.DELETE, id=id, - errors=errors, timeout=timeout, + errors=errors, timeout=timeout, retries=retries, fn=fn, requires_finalizer=bool(not optional), labels=labels, annotations=annotations, ) @@ -119,6 +123,7 @@ def field( id: Optional[str] = None, errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, + retries: Optional[int] = None, registry: Optional[registries.OperatorRegistry] = None, labels: Optional[bodies.Labels] = None, annotations: Optional[bodies.Annotations] = None, @@ -129,7 +134,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, reason=None, field=field, id=id, - errors=errors, timeout=timeout, + errors=errors, timeout=timeout, retries=retries, fn=fn, labels=labels, annotations=annotations, ) return decorator @@ -160,6 +165,7 @@ def this( id: Optional[str] = None, errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, + retries: Optional[int] = None, registry: Optional[registries.ResourceChangingRegistry] = None, ) -> ResourceHandlerDecorator: """ @@ -195,7 +201,7 @@ def create_task(*, spec, task=task, **kwargs): def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register( id=id, fn=fn, - errors=errors, timeout=timeout, + errors=errors, timeout=timeout, retries=retries, ) return decorator @@ -206,6 +212,7 @@ def register( id: Optional[str] = None, errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, + retries: Optional[int] = None, registry: Optional[registries.ResourceChangingRegistry] = None, ) -> registries.ResourceHandlerFn: """ @@ -234,6 +241,6 @@ def create_single_task(task=task, **_): """ decorator = this( id=id, registry=registry, - errors=errors, timeout=timeout, + errors=errors, timeout=timeout, retries=retries, ) return decorator(fn) diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 2c2b51a5..03c70686 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -63,6 +63,10 @@ class HandlerTimeoutError(PermanentError): """ An error for the handler's timeout (if set). """ +class HandlerRetriesError(PermanentError): + """ An error for the handler's retries exceeded (if set). """ + + class HandlerChildrenRetry(TemporaryError): """ An internal pseudo-error to retry for the next sub-handlers attempt. """ @@ -430,6 +434,9 @@ async def _execute_handler( if handler.timeout is not None and state.runtime.total_seconds() > handler.timeout: raise HandlerTimeoutError(f"Handler {handler.id!r} has timed out after {state.runtime}.") + if handler.retries is not None and state.retries >= handler.retries: + raise HandlerRetriesError(f"Handler {handler.id!r} has exceeded {state.retries} retries.") + result = await _call_handler( handler, cause=cause, diff --git a/kopf/reactor/registries.py b/kopf/reactor/registries.py index aa3b8622..9eb3a09a 100644 --- a/kopf/reactor/registries.py +++ b/kopf/reactor/registries.py @@ -77,6 +77,7 @@ class ResourceHandler(NamedTuple): field: Optional[dicts.FieldPath] errors: Optional[ErrorsMode] = None timeout: Optional[float] = None + retries: Optional[int] = None initial: Optional[bool] = None labels: Optional[bodies.Labels] = None annotations: Optional[bodies.Annotations] = None @@ -117,6 +118,7 @@ def register( field: Optional[dicts.FieldSpec] = None, errors: Optional[ErrorsMode] = None, timeout: Optional[float] = None, + retries: Optional[int] = None, initial: Optional[bool] = None, requires_finalizer: bool = False, labels: Optional[bodies.Labels] = None, @@ -129,7 +131,7 @@ def register( real_id = generate_id(fn=fn, id=id, prefix=self.prefix, suffix=".".join(real_field or [])) handler = ResourceHandler( id=real_id, fn=fn, reason=reason, field=real_field, - errors=errors, timeout=timeout, + errors=errors, timeout=timeout, retries=retries, initial=initial, requires_finalizer=requires_finalizer, labels=labels, annotations=annotations, ) @@ -249,6 +251,7 @@ def register_resource_changing_handler( field: Optional[dicts.FieldSpec] = None, errors: Optional[ErrorsMode] = None, timeout: Optional[float] = None, + retries: Optional[int] = None, initial: Optional[bool] = None, requires_finalizer: bool = False, labels: Optional[bodies.Labels] = None, @@ -260,7 +263,7 @@ def register_resource_changing_handler( resource = resources_.Resource(group, version, plural) return self._resource_changing_handlers[resource].register( reason=reason, event=event, field=field, fn=fn, id=id, - errors=errors, timeout=timeout, + errors=errors, timeout=timeout, retries=retries, initial=initial, requires_finalizer=requires_finalizer, labels=labels, annotations=annotations, ) diff --git a/tests/basic-structs/test_handler.py b/tests/basic-structs/test_handler.py index 65a55728..d9b7821b 100644 --- a/tests/basic-structs/test_handler.py +++ b/tests/basic-structs/test_handler.py @@ -15,6 +15,7 @@ def test_all_args(mocker): field = mocker.Mock() errors = mocker.Mock() timeout = mocker.Mock() + retries = mocker.Mock() initial = mocker.Mock() labels = mocker.Mock() annotations = mocker.Mock() @@ -26,6 +27,7 @@ def test_all_args(mocker): field=field, errors=errors, timeout=timeout, + retries=retries, initial=initial, labels=labels, annotations=annotations, @@ -38,6 +40,7 @@ def test_all_args(mocker): assert handler.field is field assert handler.errors is errors assert handler.timeout is timeout + assert handler.retries is retries assert handler.initial is initial assert handler.labels is labels assert handler.annotations is annotations diff --git a/tests/handling/conftest.py b/tests/handling/conftest.py index 08051a68..f0e861bf 100644 --- a/tests/handling/conftest.py +++ b/tests/handling/conftest.py @@ -94,19 +94,19 @@ def handlers(clear_default_registry): async def event_fn(**kwargs): return event_mock(**kwargs) - @kopf.on.create('zalando.org', 'v1', 'kopfexamples', id='create_fn', timeout=600) + @kopf.on.create('zalando.org', 'v1', 'kopfexamples', id='create_fn', timeout=600, retries=100) async def create_fn(**kwargs): return create_mock(**kwargs) - @kopf.on.update('zalando.org', 'v1', 'kopfexamples', id='update_fn', timeout=600) + @kopf.on.update('zalando.org', 'v1', 'kopfexamples', id='update_fn', timeout=600, retries=100) async def update_fn(**kwargs): return update_mock(**kwargs) - @kopf.on.delete('zalando.org', 'v1', 'kopfexamples', id='delete_fn', timeout=600) + @kopf.on.delete('zalando.org', 'v1', 'kopfexamples', id='delete_fn', timeout=600, retries=100) async def delete_fn(**kwargs): return delete_mock(**kwargs) - @kopf.on.resume('zalando.org', 'v1', 'kopfexamples', id='resume_fn', timeout=600) + @kopf.on.resume('zalando.org', 'v1', 'kopfexamples', id='resume_fn', timeout=600, retries=100) async def resume_fn(**kwargs): return resume_mock(**kwargs) diff --git a/tests/handling/test_timeouts.py b/tests/handling/test_retrying_limits.py similarity index 54% rename from tests/handling/test_timeouts.py rename to tests/handling/test_retrying_limits.py index 5a17109d..4b271b39 100644 --- a/tests/handling/test_timeouts.py +++ b/tests/handling/test_retrying_limits.py @@ -1,4 +1,5 @@ import asyncio +import datetime import logging import freezegun @@ -58,3 +59,50 @@ async def test_timed_out_handler_fails( assert_logs([ "Handler .+ has timed out after", ]) + + +# The limits are hard-coded in conftest.py:handlers(). +# The extrahandlers are needed to prevent the cycle ending and status purging. +@pytest.mark.parametrize('cause_type', HANDLER_REASONS) +async def test_retries_limited_handler_fails( + registry, handlers, extrahandlers, resource, cause_mock, cause_type, + caplog, assert_logs, k8s_mocked): + caplog.set_level(logging.DEBUG) + name1 = f'{cause_type}_fn' + + cause_mock.reason = cause_type + cause_mock.body.update({ + 'status': {'kopf': {'progress': { + 'create_fn': {'retries': 100}, + 'update_fn': {'retries': 100}, + 'delete_fn': {'retries': 100}, + 'resume_fn': {'retries': 100}, + }}} + }) + + await resource_handler( + lifecycle=kopf.lifecycles.one_by_one, + registry=registry, + resource=resource, + event={'type': 'irrelevant', 'object': cause_mock.body}, + freeze=asyncio.Event(), + replenished=asyncio.Event(), + event_queue=asyncio.Queue(), + ) + + assert not handlers.create_mock.called + assert not handlers.update_mock.called + assert not handlers.delete_mock.called + assert not handlers.resume_mock.called + + # Progress is reset, as the handler is not going to retry. + assert not k8s_mocked.sleep_or_wait.called + assert k8s_mocked.patch_obj.called + + patch = k8s_mocked.patch_obj.call_args_list[0][1]['patch'] + assert patch['status']['kopf']['progress'] is not None + assert patch['status']['kopf']['progress'][name1]['failure'] is True + + assert_logs([ + r"Handler .+ has exceeded \d+ retries", + ]) diff --git a/tests/registries/test_decorators.py b/tests/registries/test_decorators.py index d19631bb..7c47cd15 100644 --- a/tests/registries/test_decorators.py +++ b/tests/registries/test_decorators.py @@ -23,6 +23,7 @@ def fn(**_): assert handlers[0].field is None assert handlers[0].errors is None assert handlers[0].timeout is None + assert handlers[0].retries is None assert handlers[0].labels is None assert handlers[0].annotations is None @@ -43,6 +44,7 @@ def fn(**_): assert handlers[0].field is None assert handlers[0].errors is None assert handlers[0].timeout is None + assert handlers[0].retries is None assert handlers[0].labels is None assert handlers[0].annotations is None @@ -63,6 +65,7 @@ def fn(**_): assert handlers[0].field is None assert handlers[0].errors is None assert handlers[0].timeout is None + assert handlers[0].retries is None assert handlers[0].labels is None assert handlers[0].annotations is None @@ -84,6 +87,7 @@ def fn(**_): assert handlers[0].field == ('field', 'subfield') assert handlers[0].errors is None assert handlers[0].timeout is None + assert handlers[0].retries is None assert handlers[0].labels is None assert handlers[0].annotations is None @@ -102,7 +106,8 @@ def test_on_create_with_all_kwargs(mocker): mocker.patch('kopf.reactor.registries.match', return_value=True) @kopf.on.create('group', 'version', 'plural', - id='id', errors=ErrorsMode.PERMANENT, timeout=123, registry=registry, + id='id', registry=registry, + errors=ErrorsMode.PERMANENT, timeout=123, retries=456, labels={'somelabel': 'somevalue'}, annotations={'someanno': 'somevalue'}) def fn(**_): @@ -116,6 +121,7 @@ def fn(**_): assert handlers[0].id == 'id' assert handlers[0].errors == ErrorsMode.PERMANENT assert handlers[0].timeout == 123 + assert handlers[0].retries == 456 assert handlers[0].labels == {'somelabel': 'somevalue'} assert handlers[0].annotations == {'someanno': 'somevalue'} @@ -127,7 +133,8 @@ def test_on_update_with_all_kwargs(mocker): mocker.patch('kopf.reactor.registries.match', return_value=True) @kopf.on.update('group', 'version', 'plural', - id='id', errors=ErrorsMode.PERMANENT, timeout=123, registry=registry, + id='id', registry=registry, + errors=ErrorsMode.PERMANENT, timeout=123, retries=456, labels={'somelabel': 'somevalue'}, annotations={'someanno': 'somevalue'}) def fn(**_): @@ -141,6 +148,7 @@ def fn(**_): assert handlers[0].id == 'id' assert handlers[0].errors == ErrorsMode.PERMANENT assert handlers[0].timeout == 123 + assert handlers[0].retries == 456 assert handlers[0].labels == {'somelabel': 'somevalue'} assert handlers[0].annotations == {'someanno': 'somevalue'} @@ -156,7 +164,8 @@ def test_on_delete_with_all_kwargs(mocker, optional): mocker.patch('kopf.reactor.registries.match', return_value=True) @kopf.on.delete('group', 'version', 'plural', - id='id', errors=ErrorsMode.PERMANENT, timeout=123, registry=registry, + id='id', registry=registry, + errors=ErrorsMode.PERMANENT, timeout=123, retries=456, optional=optional, labels={'somelabel': 'somevalue'}, annotations={'someanno': 'somevalue'}) @@ -171,6 +180,7 @@ def fn(**_): assert handlers[0].id == 'id' assert handlers[0].errors == ErrorsMode.PERMANENT assert handlers[0].timeout == 123 + assert handlers[0].retries == 456 assert handlers[0].labels == {'somelabel': 'somevalue'} assert handlers[0].annotations == {'someanno': 'somevalue'} @@ -183,7 +193,8 @@ def test_on_field_with_all_kwargs(mocker): mocker.patch('kopf.reactor.registries.match', return_value=True) @kopf.on.field('group', 'version', 'plural', 'field.subfield', - id='id', errors=ErrorsMode.PERMANENT, timeout=123, registry=registry, + id='id', registry=registry, + errors=ErrorsMode.PERMANENT, timeout=123, retries=456, labels={'somelabel': 'somevalue'}, annotations={'someanno': 'somevalue'}) def fn(**_): @@ -197,6 +208,7 @@ def fn(**_): assert handlers[0].id == 'id/field.subfield' assert handlers[0].errors == ErrorsMode.PERMANENT assert handlers[0].timeout == 123 + assert handlers[0].retries == 456 assert handlers[0].labels == {'somelabel': 'somevalue'} assert handlers[0].annotations == {'someanno': 'somevalue'} From 923343fc2125b1f9937d410eb9a52baedc834ffa Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Sat, 2 Nov 2019 21:34:23 +0100 Subject: [PATCH 4/8] Check the handler timeouts by strict comparison (just in case) --- kopf/reactor/handling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 03c70686..538ad923 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -431,7 +431,7 @@ async def _execute_handler( try: logger.debug(f"Invoking handler {handler.id!r}.") - if handler.timeout is not None and state.runtime.total_seconds() > handler.timeout: + if handler.timeout is not None and state.runtime.total_seconds() >= handler.timeout: raise HandlerTimeoutError(f"Handler {handler.id!r} has timed out after {state.runtime}.") if handler.retries is not None and state.retries >= handler.retries: From 42e59c7dd249181f39f426a8c328ec170e031d8c Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Sat, 2 Nov 2019 23:12:47 +0100 Subject: [PATCH 5/8] Configure the cool-down period for arbitrary errors per handler --- kopf/on.py | 21 ++++++++++++++------- kopf/reactor/handling.py | 3 ++- kopf/reactor/registries.py | 7 +++++-- tests/basic-structs/test_handler.py | 3 +++ tests/registries/test_decorators.py | 16 ++++++++++++---- 5 files changed, 36 insertions(+), 14 deletions(-) diff --git a/kopf/on.py b/kopf/on.py index 7b1da193..085ba803 100644 --- a/kopf/on.py +++ b/kopf/on.py @@ -29,6 +29,7 @@ def resume( errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, retries: Optional[int] = None, + cooldown: Optional[float] = None, registry: Optional[registries.OperatorRegistry] = None, labels: Optional[bodies.Labels] = None, annotations: Optional[bodies.Annotations] = None, @@ -39,7 +40,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, reason=None, initial=True, id=id, - errors=errors, timeout=timeout, retries=retries, + errors=errors, timeout=timeout, retries=retries, cooldown=cooldown, fn=fn, labels=labels, annotations=annotations, ) return decorator @@ -52,6 +53,7 @@ def create( errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, retries: Optional[int] = None, + cooldown: Optional[float] = None, registry: Optional[registries.OperatorRegistry] = None, labels: Optional[bodies.Labels] = None, annotations: Optional[bodies.Annotations] = None, @@ -62,7 +64,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, reason=causation.Reason.CREATE, id=id, - errors=errors, timeout=timeout, retries=retries, + errors=errors, timeout=timeout, retries=retries, cooldown=cooldown, fn=fn, labels=labels, annotations=annotations, ) return decorator @@ -75,6 +77,7 @@ def update( errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, retries: Optional[int] = None, + cooldown: Optional[float] = None, registry: Optional[registries.OperatorRegistry] = None, labels: Optional[bodies.Labels] = None, annotations: Optional[bodies.Annotations] = None, @@ -85,7 +88,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, reason=causation.Reason.UPDATE, id=id, - errors=errors, timeout=timeout, retries=retries, + errors=errors, timeout=timeout, retries=retries, cooldown=cooldown, fn=fn, labels=labels, annotations=annotations, ) return decorator @@ -98,6 +101,7 @@ def delete( errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, retries: Optional[int] = None, + cooldown: Optional[float] = None, registry: Optional[registries.OperatorRegistry] = None, optional: Optional[bool] = None, labels: Optional[bodies.Labels] = None, @@ -109,7 +113,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, reason=causation.Reason.DELETE, id=id, - errors=errors, timeout=timeout, retries=retries, + errors=errors, timeout=timeout, retries=retries, cooldown=cooldown, fn=fn, requires_finalizer=bool(not optional), labels=labels, annotations=annotations, ) @@ -124,6 +128,7 @@ def field( errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, retries: Optional[int] = None, + cooldown: Optional[float] = None, registry: Optional[registries.OperatorRegistry] = None, labels: Optional[bodies.Labels] = None, annotations: Optional[bodies.Annotations] = None, @@ -134,7 +139,7 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register_resource_changing_handler( group=group, version=version, plural=plural, reason=None, field=field, id=id, - errors=errors, timeout=timeout, retries=retries, + errors=errors, timeout=timeout, retries=retries, cooldown=cooldown, fn=fn, labels=labels, annotations=annotations, ) return decorator @@ -166,6 +171,7 @@ def this( errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, retries: Optional[int] = None, + cooldown: Optional[float] = None, registry: Optional[registries.ResourceChangingRegistry] = None, ) -> ResourceHandlerDecorator: """ @@ -201,7 +207,7 @@ def create_task(*, spec, task=task, **kwargs): def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn: return actual_registry.register( id=id, fn=fn, - errors=errors, timeout=timeout, retries=retries, + errors=errors, timeout=timeout, retries=retries, cooldown=cooldown, ) return decorator @@ -213,6 +219,7 @@ def register( errors: Optional[registries.ErrorsMode] = None, timeout: Optional[float] = None, retries: Optional[int] = None, + cooldown: Optional[float] = None, registry: Optional[registries.ResourceChangingRegistry] = None, ) -> registries.ResourceHandlerFn: """ @@ -241,6 +248,6 @@ def create_single_task(task=task, **_): """ decorator = this( id=id, registry=registry, - errors=errors, timeout=timeout, retries=retries, + errors=errors, timeout=timeout, retries=retries, cooldown=cooldown, ) return decorator(fn) diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index 538ad923..2a88cfb3 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -419,6 +419,7 @@ async def _execute_handler( exceptions mean the failure of execution itself. """ errors = handler.errors if handler.errors is not None else default_errors + cooldown = handler.cooldown if handler.cooldown is not None else DEFAULT_RETRY_DELAY # Prevent successes/failures from posting k8s-events for resource-watching causes. logger: Union[logging.Logger, logging.LoggerAdapter] @@ -475,7 +476,7 @@ async def _execute_handler( return states.HandlerOutcome(final=True, exception=e) elif errors == registries.ErrorsMode.TEMPORARY: logger.exception(f"Handler {handler.id!r} failed with an exception. Will retry.") - return states.HandlerOutcome(final=False, exception=e, delay=DEFAULT_RETRY_DELAY) + return states.HandlerOutcome(final=False, exception=e, delay=cooldown) elif errors == registries.ErrorsMode.PERMANENT: logger.exception(f"Handler {handler.id!r} failed with an exception. Will stop.") return states.HandlerOutcome(final=True, exception=e) diff --git a/kopf/reactor/registries.py b/kopf/reactor/registries.py index 9eb3a09a..1528ceba 100644 --- a/kopf/reactor/registries.py +++ b/kopf/reactor/registries.py @@ -78,6 +78,7 @@ class ResourceHandler(NamedTuple): errors: Optional[ErrorsMode] = None timeout: Optional[float] = None retries: Optional[int] = None + cooldown: Optional[float] = None initial: Optional[bool] = None labels: Optional[bodies.Labels] = None annotations: Optional[bodies.Annotations] = None @@ -119,6 +120,7 @@ def register( errors: Optional[ErrorsMode] = None, timeout: Optional[float] = None, retries: Optional[int] = None, + cooldown: Optional[float] = None, initial: Optional[bool] = None, requires_finalizer: bool = False, labels: Optional[bodies.Labels] = None, @@ -131,7 +133,7 @@ def register( real_id = generate_id(fn=fn, id=id, prefix=self.prefix, suffix=".".join(real_field or [])) handler = ResourceHandler( id=real_id, fn=fn, reason=reason, field=real_field, - errors=errors, timeout=timeout, retries=retries, + errors=errors, timeout=timeout, retries=retries, cooldown=cooldown, initial=initial, requires_finalizer=requires_finalizer, labels=labels, annotations=annotations, ) @@ -252,6 +254,7 @@ def register_resource_changing_handler( errors: Optional[ErrorsMode] = None, timeout: Optional[float] = None, retries: Optional[int] = None, + cooldown: Optional[float] = None, initial: Optional[bool] = None, requires_finalizer: bool = False, labels: Optional[bodies.Labels] = None, @@ -263,7 +266,7 @@ def register_resource_changing_handler( resource = resources_.Resource(group, version, plural) return self._resource_changing_handlers[resource].register( reason=reason, event=event, field=field, fn=fn, id=id, - errors=errors, timeout=timeout, retries=retries, + errors=errors, timeout=timeout, retries=retries, cooldown=cooldown, initial=initial, requires_finalizer=requires_finalizer, labels=labels, annotations=annotations, ) diff --git a/tests/basic-structs/test_handler.py b/tests/basic-structs/test_handler.py index d9b7821b..b0160832 100644 --- a/tests/basic-structs/test_handler.py +++ b/tests/basic-structs/test_handler.py @@ -16,6 +16,7 @@ def test_all_args(mocker): errors = mocker.Mock() timeout = mocker.Mock() retries = mocker.Mock() + cooldown = mocker.Mock() initial = mocker.Mock() labels = mocker.Mock() annotations = mocker.Mock() @@ -28,6 +29,7 @@ def test_all_args(mocker): errors=errors, timeout=timeout, retries=retries, + cooldown=cooldown, initial=initial, labels=labels, annotations=annotations, @@ -41,6 +43,7 @@ def test_all_args(mocker): assert handler.errors is errors assert handler.timeout is timeout assert handler.retries is retries + assert handler.cooldown is cooldown assert handler.initial is initial assert handler.labels is labels assert handler.annotations is annotations diff --git a/tests/registries/test_decorators.py b/tests/registries/test_decorators.py index 7c47cd15..0d7f1396 100644 --- a/tests/registries/test_decorators.py +++ b/tests/registries/test_decorators.py @@ -24,6 +24,7 @@ def fn(**_): assert handlers[0].errors is None assert handlers[0].timeout is None assert handlers[0].retries is None + assert handlers[0].cooldown is None assert handlers[0].labels is None assert handlers[0].annotations is None @@ -45,6 +46,7 @@ def fn(**_): assert handlers[0].errors is None assert handlers[0].timeout is None assert handlers[0].retries is None + assert handlers[0].cooldown is None assert handlers[0].labels is None assert handlers[0].annotations is None @@ -66,6 +68,7 @@ def fn(**_): assert handlers[0].errors is None assert handlers[0].timeout is None assert handlers[0].retries is None + assert handlers[0].cooldown is None assert handlers[0].labels is None assert handlers[0].annotations is None @@ -88,6 +91,7 @@ def fn(**_): assert handlers[0].errors is None assert handlers[0].timeout is None assert handlers[0].retries is None + assert handlers[0].cooldown is None assert handlers[0].labels is None assert handlers[0].annotations is None @@ -107,7 +111,7 @@ def test_on_create_with_all_kwargs(mocker): @kopf.on.create('group', 'version', 'plural', id='id', registry=registry, - errors=ErrorsMode.PERMANENT, timeout=123, retries=456, + errors=ErrorsMode.PERMANENT, timeout=123, retries=456, cooldown=78, labels={'somelabel': 'somevalue'}, annotations={'someanno': 'somevalue'}) def fn(**_): @@ -122,6 +126,7 @@ def fn(**_): assert handlers[0].errors == ErrorsMode.PERMANENT assert handlers[0].timeout == 123 assert handlers[0].retries == 456 + assert handlers[0].cooldown == 78 assert handlers[0].labels == {'somelabel': 'somevalue'} assert handlers[0].annotations == {'someanno': 'somevalue'} @@ -134,7 +139,7 @@ def test_on_update_with_all_kwargs(mocker): @kopf.on.update('group', 'version', 'plural', id='id', registry=registry, - errors=ErrorsMode.PERMANENT, timeout=123, retries=456, + errors=ErrorsMode.PERMANENT, timeout=123, retries=456, cooldown=78, labels={'somelabel': 'somevalue'}, annotations={'someanno': 'somevalue'}) def fn(**_): @@ -149,6 +154,7 @@ def fn(**_): assert handlers[0].errors == ErrorsMode.PERMANENT assert handlers[0].timeout == 123 assert handlers[0].retries == 456 + assert handlers[0].cooldown == 78 assert handlers[0].labels == {'somelabel': 'somevalue'} assert handlers[0].annotations == {'someanno': 'somevalue'} @@ -165,7 +171,7 @@ def test_on_delete_with_all_kwargs(mocker, optional): @kopf.on.delete('group', 'version', 'plural', id='id', registry=registry, - errors=ErrorsMode.PERMANENT, timeout=123, retries=456, + errors=ErrorsMode.PERMANENT, timeout=123, retries=456, cooldown=78, optional=optional, labels={'somelabel': 'somevalue'}, annotations={'someanno': 'somevalue'}) @@ -181,6 +187,7 @@ def fn(**_): assert handlers[0].errors == ErrorsMode.PERMANENT assert handlers[0].timeout == 123 assert handlers[0].retries == 456 + assert handlers[0].cooldown == 78 assert handlers[0].labels == {'somelabel': 'somevalue'} assert handlers[0].annotations == {'someanno': 'somevalue'} @@ -194,7 +201,7 @@ def test_on_field_with_all_kwargs(mocker): @kopf.on.field('group', 'version', 'plural', 'field.subfield', id='id', registry=registry, - errors=ErrorsMode.PERMANENT, timeout=123, retries=456, + errors=ErrorsMode.PERMANENT, timeout=123, retries=456, cooldown=78, labels={'somelabel': 'somevalue'}, annotations={'someanno': 'somevalue'}) def fn(**_): @@ -209,6 +216,7 @@ def fn(**_): assert handlers[0].errors == ErrorsMode.PERMANENT assert handlers[0].timeout == 123 assert handlers[0].retries == 456 + assert handlers[0].cooldown == 78 assert handlers[0].labels == {'somelabel': 'somevalue'} assert handlers[0].annotations == {'someanno': 'somevalue'} From eabafb968be3e408381075290acf5290e214148e Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Sat, 2 Nov 2019 21:46:17 +0100 Subject: [PATCH 6/8] Document the error modes and retries limits --- docs/errors.rst | 62 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 57 insertions(+), 5 deletions(-) diff --git a/docs/errors.rst b/docs/errors.rst index 488d5023..4c992ae5 100644 --- a/docs/errors.rst +++ b/docs/errors.rst @@ -60,15 +60,30 @@ is no need to retry over time, as it will not become better:: raise kopf.PermanentError("The object is not valid anymore.") - Regular errors ============== -Any other exceptions behave as either the retriable (default) or fatal, -depending on the settings. +Kopf assumes that any arbitrary errors +(i.e. not `TemporaryError` and not `PermanentError`) +are the environment issues and can self-resolve after some time. + +As such, as a default behaviour, +Kopf retries the handlers with arbitrary errors +infinitely until they either succeed or fail permanently. + +The reaction to the arbitrary errors can be configured:: + + import kopf + + @kopf.on.create('zalando.org', 'v1', 'kopfexamples', errors=kopf.ErrorsMode.PERMANENT) + def create_fn(spec, **_): + raise Exception() + +Possible values of ``errors`` are: -.. todo:: - An example of an unexpected HTTP error? +* `kopf.ErrorsMode.TEMPORARY` (the default). +* `kopf.ErrorsMode.PERMANENT` (prevent retries). +* `kopf.ErrorsMode.IGNORED` (same as in the resource watching handlers). Timeouts @@ -90,3 +105,40 @@ an `asyncio.TimeoutError` is raised; there is no equivalent way of terminating the synchronous functions by force. By default, there is no timeout, so the retries continue forever. + + +Retries +======= + +The number of retries can be limited too:: + + import kopf + + @kopf.on.create('zalando.org', 'v1', 'kopfexamples', retries=3) + def create_fn(spec, **_): + raise Exception() + +Once the number of retries is reached, the handler fails permanently. + +By default, there is no limit, so the retries continue forever. + + +Cool-down on errors +=================== + +The interval between retries on arbitrary errors, when an external environment +is supposed to recover and be able to succeed the handler execution, +can be configured:: + + import kopf + + @kopf.on.create('zalando.org', 'v1', 'kopfexamples', cooldown=30) + def create_fn(spec, **_): + raise Exception() + +The default is 60 seconds. + +.. note:: + + This only affects the arbitrary errors. When `TemporaryError` + is explicitly used, the delay should be configured with ``delay=...``. From afce96873455cdd226307da38d712e596b2e83d7 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 5 Nov 2019 17:18:46 +0100 Subject: [PATCH 7/8] Improve the example of exceptions with the new features --- examples/03-exceptions/example.py | 34 +++++++++++++++++++------------ tests/e2e/test_examples.py | 2 +- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/examples/03-exceptions/example.py b/examples/03-exceptions/example.py index d4666516..bf56d0f9 100644 --- a/examples/03-exceptions/example.py +++ b/examples/03-exceptions/example.py @@ -1,11 +1,9 @@ -import time - import kopf E2E_TRACEBACKS = True -E2E_CREATION_STOP_WORDS = ['Third failure, the final one'] -E2E_SUCCESS_COUNTS = {} -E2E_FAILURE_COUNTS = {'create_fn': 1} +E2E_CREATION_STOP_WORDS = ['Something has changed,'] +E2E_SUCCESS_COUNTS = {'eventual_success_with_few_messages': 1} +E2E_FAILURE_COUNTS = {'eventual_failure_with_tracebacks': 1, 'instant_failure_with_traceback': 1, 'instant_failure_with_only_a_message': 1} class MyException(Exception): @@ -13,11 +11,21 @@ class MyException(Exception): @kopf.on.create('zalando.org', 'v1', 'kopfexamples') -def create_fn(retry, **kwargs): - time.sleep(0.1) # for different timestamps of the events - if not retry: - raise kopf.TemporaryError("First failure.", delay=1) - elif retry == 1: - raise MyException("Second failure.") - else: - raise kopf.PermanentError("Third failure, the final one.") +def instant_failure_with_only_a_message(**kwargs): + raise kopf.PermanentError("Fail once and for all.") + + +@kopf.on.create('zalando.org', 'v1', 'kopfexamples') +def eventual_success_with_few_messages(retry, **kwargs): + if retry < 3: # 0, 1, 2, 3 + raise kopf.TemporaryError("Expected recoverable error.", delay=1.0) + + +@kopf.on.create('zalando.org', 'v1', 'kopfexamples', retries=3, cooldown=1.0) +def eventual_failure_with_tracebacks(**kwargs): + raise MyException("An error that is supposed to be recoverable.") + + +@kopf.on.create('zalando.org', 'v1', 'kopfexamples', errors=kopf.ErrorsMode.PERMANENT, cooldown=1.0) +def instant_failure_with_traceback(**kwargs): + raise MyException("An error that is supposed to be recoverable.") diff --git a/tests/e2e/test_examples.py b/tests/e2e/test_examples.py index 339f1b74..f2977db5 100644 --- a/tests/e2e/test_examples.py +++ b/tests/e2e/test_examples.py @@ -86,7 +86,7 @@ def test_all_examples_are_runnable(mocker, with_crd, exampledir, caplog): assert set(name_counts.values()) == {1} # Verify that once a handler fails, it is never re-executed again. - handler_names = re.findall(r"Handler '(.+?)' failed permanently", runner.stdout) + handler_names = re.findall(r"Handler '(.+?)' failed (?:permanently|with an exception. Will stop.)", runner.stdout) if e2e_failure_counts is not None: checked_names = [name for name in handler_names if name in e2e_failure_counts] name_counts = collections.Counter(checked_names) From 98e015dbb5ca2e3057269efdbad9c171b1bea930 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 12 Nov 2019 11:32:04 +0100 Subject: [PATCH 8/8] Fix the docs (as per PR) --- docs/errors.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/errors.rst b/docs/errors.rst index 4c992ae5..f3ba59b8 100644 --- a/docs/errors.rst +++ b/docs/errors.rst @@ -65,11 +65,11 @@ Regular errors Kopf assumes that any arbitrary errors (i.e. not `TemporaryError` and not `PermanentError`) -are the environment issues and can self-resolve after some time. +are environment issues and can self-resolve after some time. As such, as a default behaviour, Kopf retries the handlers with arbitrary errors -infinitely until they either succeed or fail permanently. +infinitely until the handlers either succeed or fail permanently. The reaction to the arbitrary errors can be configured::