Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Configure reactions to errors in the handlers, per-handler #222

Merged
merged 8 commits into from
Nov 12, 2019
62 changes: 57 additions & 5 deletions docs/errors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,30 @@ is no need to retry over time, as it will not become better::
raise kopf.PermanentError("The object is not valid anymore.")



Regular errors
==============

Any other exceptions behave as either the retriable (default) or fatal,
depending on the settings.
Kopf assumes that any arbitrary errors
(i.e. not `TemporaryError` and not `PermanentError`)
are environment issues and can self-resolve after some time.

As such, as a default behaviour,
Kopf retries the handlers with arbitrary errors
infinitely until the handlers either succeed or fail permanently.

The reaction to the arbitrary errors can be configured::

import kopf

@kopf.on.create('zalando.org', 'v1', 'kopfexamples', errors=kopf.ErrorsMode.PERMANENT)
def create_fn(spec, **_):
raise Exception()

Possible values of ``errors`` are:

.. todo::
An example of an unexpected HTTP error?
* `kopf.ErrorsMode.TEMPORARY` (the default).
* `kopf.ErrorsMode.PERMANENT` (prevent retries).
* `kopf.ErrorsMode.IGNORED` (same as in the resource watching handlers).


Timeouts
Expand All @@ -90,3 +105,40 @@ an `asyncio.TimeoutError` is raised;
there is no equivalent way of terminating the synchronous functions by force.

By default, there is no timeout, so the retries continue forever.


Retries
=======

The number of retries can be limited too::

import kopf

@kopf.on.create('zalando.org', 'v1', 'kopfexamples', retries=3)
def create_fn(spec, **_):
raise Exception()

Once the number of retries is reached, the handler fails permanently.

By default, there is no limit, so the retries continue forever.


Cool-down on errors
===================

The interval between retries on arbitrary errors, when an external environment
is supposed to recover and be able to succeed the handler execution,
can be configured::

import kopf

@kopf.on.create('zalando.org', 'v1', 'kopfexamples', cooldown=30)
def create_fn(spec, **_):
raise Exception()

The default is 60 seconds.

.. note::

This only affects the arbitrary errors. When `TemporaryError`
is explicitly used, the delay should be configured with ``delay=...``.
34 changes: 21 additions & 13 deletions examples/03-exceptions/example.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
import time

import kopf

E2E_TRACEBACKS = True
E2E_CREATION_STOP_WORDS = ['Third failure, the final one']
E2E_SUCCESS_COUNTS = {}
E2E_FAILURE_COUNTS = {'create_fn': 1}
E2E_CREATION_STOP_WORDS = ['Something has changed,']
E2E_SUCCESS_COUNTS = {'eventual_success_with_few_messages': 1}
E2E_FAILURE_COUNTS = {'eventual_failure_with_tracebacks': 1, 'instant_failure_with_traceback': 1, 'instant_failure_with_only_a_message': 1}


class MyException(Exception):
pass


@kopf.on.create('zalando.org', 'v1', 'kopfexamples')
def create_fn(retry, **kwargs):
time.sleep(0.1) # for different timestamps of the events
if not retry:
raise kopf.TemporaryError("First failure.", delay=1)
elif retry == 1:
raise MyException("Second failure.")
else:
raise kopf.PermanentError("Third failure, the final one.")
def instant_failure_with_only_a_message(**kwargs):
raise kopf.PermanentError("Fail once and for all.")


@kopf.on.create('zalando.org', 'v1', 'kopfexamples')
def eventual_success_with_few_messages(retry, **kwargs):
if retry < 3: # 0, 1, 2, 3
raise kopf.TemporaryError("Expected recoverable error.", delay=1.0)


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', retries=3, cooldown=1.0)
def eventual_failure_with_tracebacks(**kwargs):
raise MyException("An error that is supposed to be recoverable.")


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', errors=kopf.ErrorsMode.PERMANENT, cooldown=1.0)
def instant_failure_with_traceback(**kwargs):
raise MyException("An error that is supposed to be recoverable.")
4 changes: 4 additions & 0 deletions kopf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@
TemporaryError,
PermanentError,
HandlerTimeoutError,
HandlerRetriesError,
execute,
)
from kopf.reactor.lifecycles import (
get_default_lifecycle,
set_default_lifecycle,
)
from kopf.reactor.registries import (
ErrorsMode,
ResourceRegistry,
ResourceWatchingRegistry,
ResourceChangingRegistry,
Expand Down Expand Up @@ -90,9 +92,11 @@
'get_default_lifecycle', 'set_default_lifecycle',
'build_object_reference', 'build_owner_reference',
'append_owner_reference', 'remove_owner_reference',
'ErrorsMode',
'PermanentError', 'HandlerFatalError',
'TemporaryError', 'HandlerRetryError',
'HandlerTimeoutError',
'HandlerRetriesError',
'BaseRegistry', # deprecated
'SimpleRegistry', # deprecated
'GlobalRegistry', # deprecated
Expand Down
65 changes: 52 additions & 13 deletions kopf/on.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ def resume(
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
errors: Optional[registries.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
cooldown: Optional[float] = None,
registry: Optional[registries.OperatorRegistry] = None,
labels: Optional[bodies.Labels] = None,
annotations: Optional[bodies.Annotations] = None,
Expand All @@ -36,16 +39,21 @@ def resume(
def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
return actual_registry.register_resource_changing_handler(
group=group, version=version, plural=plural,
reason=None, initial=True, id=id, timeout=timeout,
fn=fn, labels=labels, annotations=annotations)
reason=None, initial=True, id=id,
errors=errors, timeout=timeout, retries=retries, cooldown=cooldown,
fn=fn, labels=labels, annotations=annotations,
)
return decorator


def create(
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
errors: Optional[registries.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
cooldown: Optional[float] = None,
registry: Optional[registries.OperatorRegistry] = None,
labels: Optional[bodies.Labels] = None,
annotations: Optional[bodies.Annotations] = None,
Expand All @@ -55,16 +63,21 @@ def create(
def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
return actual_registry.register_resource_changing_handler(
group=group, version=version, plural=plural,
reason=causation.Reason.CREATE, id=id, timeout=timeout,
fn=fn, labels=labels, annotations=annotations)
reason=causation.Reason.CREATE, id=id,
errors=errors, timeout=timeout, retries=retries, cooldown=cooldown,
fn=fn, labels=labels, annotations=annotations,
)
return decorator


def update(
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
errors: Optional[registries.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
cooldown: Optional[float] = None,
registry: Optional[registries.OperatorRegistry] = None,
labels: Optional[bodies.Labels] = None,
annotations: Optional[bodies.Annotations] = None,
Expand All @@ -74,16 +87,21 @@ def update(
def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
return actual_registry.register_resource_changing_handler(
group=group, version=version, plural=plural,
reason=causation.Reason.UPDATE, id=id, timeout=timeout,
fn=fn, labels=labels, annotations=annotations)
reason=causation.Reason.UPDATE, id=id,
errors=errors, timeout=timeout, retries=retries, cooldown=cooldown,
fn=fn, labels=labels, annotations=annotations,
)
return decorator


def delete(
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
errors: Optional[registries.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
cooldown: Optional[float] = None,
registry: Optional[registries.OperatorRegistry] = None,
optional: Optional[bool] = None,
labels: Optional[bodies.Labels] = None,
Expand All @@ -94,9 +112,11 @@ def delete(
def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
return actual_registry.register_resource_changing_handler(
group=group, version=version, plural=plural,
reason=causation.Reason.DELETE, id=id, timeout=timeout,
reason=causation.Reason.DELETE, id=id,
errors=errors, timeout=timeout, retries=retries, cooldown=cooldown,
fn=fn, requires_finalizer=bool(not optional),
labels=labels, annotations=annotations)
labels=labels, annotations=annotations,
)
return decorator


Expand All @@ -105,7 +125,10 @@ def field(
field: Union[str, List[str], Tuple[str, ...]],
*,
id: Optional[str] = None,
errors: Optional[registries.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
cooldown: Optional[float] = None,
registry: Optional[registries.OperatorRegistry] = None,
labels: Optional[bodies.Labels] = None,
annotations: Optional[bodies.Annotations] = None,
Expand All @@ -115,8 +138,10 @@ def field(
def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
return actual_registry.register_resource_changing_handler(
group=group, version=version, plural=plural,
reason=None, field=field, id=id, timeout=timeout,
fn=fn, labels=labels, annotations=annotations)
reason=None, field=field, id=id,
errors=errors, timeout=timeout, retries=retries, cooldown=cooldown,
fn=fn, labels=labels, annotations=annotations,
)
return decorator


Expand All @@ -133,7 +158,8 @@ def event(
def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
return actual_registry.register_resource_watching_handler(
group=group, version=version, plural=plural,
id=id, fn=fn, labels=labels, annotations=annotations)
id=id, fn=fn, labels=labels, annotations=annotations,
)
return decorator


Expand All @@ -142,7 +168,10 @@ def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
def this(
*,
id: Optional[str] = None,
errors: Optional[registries.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
cooldown: Optional[float] = None,
registry: Optional[registries.ResourceChangingRegistry] = None,
) -> ResourceHandlerDecorator:
"""
Expand Down Expand Up @@ -176,15 +205,21 @@ def create_task(*, spec, task=task, **kwargs):
"""
actual_registry = registry if registry is not None else handling.subregistry_var.get()
def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
return actual_registry.register(id=id, fn=fn, timeout=timeout)
return actual_registry.register(
id=id, fn=fn,
errors=errors, timeout=timeout, retries=retries, cooldown=cooldown,
)
return decorator


def register(
fn: registries.ResourceHandlerFn,
*,
id: Optional[str] = None,
errors: Optional[registries.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
cooldown: Optional[float] = None,
registry: Optional[registries.ResourceChangingRegistry] = None,
) -> registries.ResourceHandlerFn:
"""
Expand All @@ -211,4 +246,8 @@ def create_it(spec, **kwargs):
def create_single_task(task=task, **_):
pass
"""
return this(id=id, timeout=timeout, registry=registry)(fn)
decorator = this(
id=id, registry=registry,
errors=errors, timeout=timeout, retries=retries, cooldown=cooldown,
)
return decorator(fn)
Loading