From 8d18a31418f14fc9d6174e12f6f21863320c1ada Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Mon, 19 Oct 2020 23:04:02 +0200 Subject: [PATCH 1/5] Drop deprecated "legacy peering mode" (KopfPeering, but cluster-wide) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The legacy peering mode is now 1.5 years since it was deprecated in the early stages. There is no need to support it anymore, as barely anyone still use it in their clusters (taking into account that peering per se it not used often at all). But the legacy mode complicates the coming features with dynamic cluster scanning. So, the time has come to drop it — without releasing the major version of the framework. --- docs/peering.rst | 24 +++---------- kopf/clients/fetching.py | 24 ------------- kopf/engines/peering.py | 40 +++------------------ tests/k8s/test_read_crd.py | 53 ---------------------------- tests/peering/test_peer_detection.py | 4 --- tests/peering/test_peers.py | 17 ++------- 6 files changed, 13 insertions(+), 149 deletions(-) delete mode 100644 tests/k8s/test_read_crd.py diff --git a/docs/peering.rst b/docs/peering.rst index abc47b4f..a132a4d4 100644 --- a/docs/peering.rst +++ b/docs/peering.rst @@ -59,25 +59,11 @@ Create the peering objects as needed with one of: .. note:: - Previously, ``KopfPeering`` was the only CRD, and it was cluster-scoped. - Now, it is namespaced. For the new users, it all will be fine and working. - - If the old ``KopfPeering`` CRD is already deployed to your cluster, - it will also continue to work as before without re-configuration: - though there will be no namespace isolation as documented here --- - it will be cluster peering regardless of :option:`--namespace` - (as it was before the changes). - - When possible (but strictly after the Kopf's version upgrade), - please delete the old CRD, and re-create from scratch: - - .. 
code-block:: bash - - kubectl delete crd kopfpeerings.zalando.org - # give it 1-2 minutes to cleanup, or repeat until succeeded: - kubectl create -f peering.yaml - - Then re-deploy your custom peering objects of your apps. + In ``kopf<0.11`` (until May'2019), ``KopfPeering`` was the only CRD, + and it was cluster-scoped. In ``kopf>=0.11,<0.29`` (until Oct'2020), + this mode was deprecated but supported if the old CRD existed. + Since ``kopf>=0.29`` (Nov'2020), it is not supported anymore. + To upgrade, delete and re-create the peering CRDs to the new ones. Custom peering diff --git a/kopf/clients/fetching.py b/kopf/clients/fetching.py index 8e5bd30a..297d4c7d 100644 --- a/kopf/clients/fetching.py +++ b/kopf/clients/fetching.py @@ -15,30 +15,6 @@ class _UNSET(enum.Enum): token = enum.auto() -@auth.reauthenticated_request -async def read_crd( - *, - resource: resources.Resource, - default: Union[_T, _UNSET] = _UNSET.token, - context: Optional[auth.APIContext] = None, # injected by the decorator -) -> Union[bodies.RawBody, _T]: - if context is None: - raise RuntimeError("API instance is not injected by the decorator.") - - try: - response = await context.session.get( - url=CRD_CRD.get_url(server=context.server, name=resource.name), - ) - await errors.check_response(response) - respdata = await response.json() - return cast(bodies.RawBody, respdata) - - except aiohttp.ClientResponseError as e: - if e.status in [403, 404] and not isinstance(default, _UNSET): - return default - raise - - @auth.reauthenticated_request async def read_obj( *, diff --git a/kopf/engines/peering.py b/kopf/engines/peering.py index 2d372a31..09e9675f 100644 --- a/kopf/engines/peering.py +++ b/kopf/engines/peering.py @@ -48,7 +48,6 @@ # The CRD info on the special sync-object. 
CLUSTER_PEERING_RESOURCE = resources.Resource('zalando.org', 'v1', 'clusterkopfpeerings') NAMESPACED_PEERING_RESOURCE = resources.Resource('zalando.org', 'v1', 'kopfpeerings') -LEGACY_PEERING_RESOURCE = resources.Resource('zalando.org', 'v1', 'kopfpeerings') PEERING_DEFAULT_NAME = 'default' @@ -65,7 +64,6 @@ def __init__( lastseen: Optional[Union[str, datetime.datetime]] = None, lifetime: Union[int, datetime.timedelta] = 60, namespace: Optional[str] = None, - legacy: bool = False, **_: Any, # for the forward-compatibility with the new fields ): super().__init__() @@ -81,14 +79,13 @@ def __init__( self.lastseen = self.lastseen.replace(tzinfo=None) # only the naive utc -- for comparison self.deadline = self.lastseen + self.lifetime self.is_dead = self.deadline <= datetime.datetime.utcnow() - self.legacy = legacy def __repr__(self) -> str: return f"{self.__class__.__name__}({self.id}, namespace={self.namespace}, priority={self.priority}, lastseen={self.lastseen}, lifetime={self.lifetime})" @property def resource(self) -> resources.Resource: - return LEGACY_PEERING_RESOURCE if self.legacy else CLUSTER_PEERING_RESOURCE if self.namespace is None else NAMESPACED_PEERING_RESOURCE + return CLUSTER_PEERING_RESOURCE if self.namespace is None else NAMESPACED_PEERING_RESOURCE @classmethod async def detect( @@ -105,15 +102,11 @@ async def detect( if name: if await Peer._is_peering_exist(name, namespace=namespace): return cls(name=name, namespace=namespace, **kwargs) - elif await Peer._is_peering_legacy(name, namespace=namespace): - return cls(name=name, namespace=namespace, legacy=True, **kwargs) else: raise Exception(f"The peering {name!r} was not found") if await Peer._is_peering_exist(name=PEERING_DEFAULT_NAME, namespace=namespace): return cls(name=PEERING_DEFAULT_NAME, namespace=namespace, **kwargs) - elif await Peer._is_peering_legacy(name=PEERING_DEFAULT_NAME, namespace=namespace): - return cls(name=PEERING_DEFAULT_NAME, namespace=namespace, legacy=True, **kwargs) 
logger.warning(f"Default peering object not found, falling back to the standalone mode.") return None @@ -140,14 +133,14 @@ async def keepalive(self) -> None: Add a peer to the peers, and update its alive status. """ self.touch() - await apply_peers([self], name=self.name, namespace=self.namespace, legacy=self.legacy) + await apply_peers([self], name=self.name, namespace=self.namespace) async def disappear(self) -> None: """ Remove a peer from the peers (gracefully). """ self.touch(lifetime=0) - await apply_peers([self], name=self.name, namespace=self.namespace, legacy=self.legacy) + await apply_peers([self], name=self.name, namespace=self.namespace) @staticmethod async def _is_peering_exist(name: str, namespace: Optional[str]) -> bool: @@ -155,32 +148,11 @@ async def _is_peering_exist(name: str, namespace: Optional[str]) -> bool: obj = await fetching.read_obj(resource=resource, namespace=namespace, name=name, default=None) return obj is not None - @staticmethod - async def _is_peering_legacy(name: str, namespace: Optional[str]) -> bool: - """ - Legacy mode for the peering: cluster-scoped KopfPeering (new mode: namespaced). - - .. deprecated:: 1.0 - - This logic will be removed since 1.0. - Deploy ``ClusterKopfPeering`` as per documentation, and use it normally. - """ - crd = await fetching.read_crd(resource=LEGACY_PEERING_RESOURCE, default=None) - if crd is None: - return False - - if str(crd.get('spec', {}).get('scope', '')).lower() != 'cluster': - return False # no legacy mode detected - - obj = await fetching.read_obj(resource=LEGACY_PEERING_RESOURCE, name=name, default=None) - return obj is not None - async def apply_peers( peers: Iterable[Peer], name: str, namespace: Union[None, str], - legacy: bool = False, ) -> None: """ Apply the changes in the peers to the sync-object. 
@@ -190,9 +162,7 @@ async def apply_peers( """ patch = patches.Patch() patch.update({'status': {peer.id: None if peer.is_dead else peer.as_dict() for peer in peers}}) - resource = (LEGACY_PEERING_RESOURCE if legacy else - CLUSTER_PEERING_RESOURCE if namespace is None else - NAMESPACED_PEERING_RESOURCE) + resource = CLUSTER_PEERING_RESOURCE if namespace is None else NAMESPACED_PEERING_RESOURCE await patching.patch_obj(resource=resource, namespace=namespace, name=name, patch=patch) @@ -231,7 +201,7 @@ async def process_peering_event( if autoclean and dead_peers: # NB: sync and blocking, but this is fine. - await apply_peers(dead_peers, name=ourselves.name, namespace=ourselves.namespace, legacy=ourselves.legacy) + await apply_peers(dead_peers, name=ourselves.name, namespace=ourselves.namespace) if prio_peers: if freeze_mode.is_off(): diff --git a/tests/k8s/test_read_crd.py b/tests/k8s/test_read_crd.py deleted file mode 100644 index 1041dfd8..00000000 --- a/tests/k8s/test_read_crd.py +++ /dev/null @@ -1,53 +0,0 @@ -import aiohttp.web -import pytest - -from kopf.clients.fetching import CRD_CRD, read_crd - - -async def test_when_present( - resp_mocker, aresponses, hostname, resource): - - get_mock = resp_mocker(return_value=aiohttp.web.json_response({'a': 'b'})) - aresponses.add(hostname, CRD_CRD.get_url(name=resource.name), 'get', get_mock) - - crd = await read_crd(resource=resource) - assert crd == {'a': 'b'} - - assert get_mock.called - assert get_mock.call_count == 1 - - -@pytest.mark.parametrize('status', [403, 404]) -async def test_when_absent_with_no_default( - resp_mocker, aresponses, hostname, resource, status): - - get_mock = resp_mocker(return_value=aresponses.Response(status=status, reason="boo!")) - aresponses.add(hostname, CRD_CRD.get_url(name=resource.name), 'get', get_mock) - - with pytest.raises(aiohttp.ClientResponseError) as e: - await read_crd(resource=resource) - assert e.value.status == status - - -@pytest.mark.parametrize('default', [None, 
object()], ids=['none', 'object']) -@pytest.mark.parametrize('status', [403, 404]) -async def test_when_absent_with_default( - resp_mocker, aresponses, hostname, resource, default, status): - - get_mock = resp_mocker(return_value=aresponses.Response(status=status, reason="boo!")) - aresponses.add(hostname, CRD_CRD.get_url(name=resource.name), 'get', get_mock) - - crd = await read_crd(resource=resource, default=default) - assert crd is default - - -@pytest.mark.parametrize('status', [400, 401, 500, 666]) -async def test_raises_api_error_despite_default( - resp_mocker, aresponses, hostname, resource, status): - - get_mock = resp_mocker(return_value=aresponses.Response(status=status, reason="boo!")) - aresponses.add(hostname, CRD_CRD.get_url(name=resource.name), 'get', get_mock) - - with pytest.raises(aiohttp.ClientResponseError) as e: - await read_crd(resource=resource, default=object()) - assert e.value.status == status diff --git a/tests/peering/test_peer_detection.py b/tests/peering/test_peer_detection.py index be9fd74b..da3b86c5 100644 --- a/tests/peering/test_peer_detection.py +++ b/tests/peering/test_peer_detection.py @@ -5,10 +5,6 @@ from kopf.engines.peering import CLUSTER_PEERING_RESOURCE, NAMESPACED_PEERING_RESOURCE, \ PEERING_DEFAULT_NAME, Peer -# Note: the legacy peering is intentionally not tested: it was long time before -# these tests were written, so it does not make sense to keep it stable. -# The legacy peering is going to be removed in version 1.0 when it happens. 
- @pytest.fixture() def with_cluster_default(hostname, aresponses): diff --git a/tests/peering/test_peers.py b/tests/peering/test_peers.py index 7935b0ed..52110360 100644 --- a/tests/peering/test_peers.py +++ b/tests/peering/test_peers.py @@ -3,8 +3,7 @@ import freezegun import pytest -from kopf.engines.peering import CLUSTER_PEERING_RESOURCE, LEGACY_PEERING_RESOURCE, \ - NAMESPACED_PEERING_RESOURCE, Peer +from kopf.engines.peering import CLUSTER_PEERING_RESOURCE, NAMESPACED_PEERING_RESOURCE, Peer @freezegun.freeze_time('2020-12-31T23:59:59.123456') @@ -13,7 +12,6 @@ def test_defaults(): assert peer.id == 'id' assert peer.name == 'name' assert peer.namespace is None - assert peer.legacy is False assert peer.lifetime == datetime.timedelta(seconds=60) assert peer.lastseen == datetime.datetime(2020, 12, 31, 23, 59, 59, 123456) @@ -49,23 +47,14 @@ def test_priority_unspecified(): assert peer.priority == 0 -@pytest.mark.parametrize('namespace', [None, 'namespaced']) -def test_resource_for_legacy_peering(namespace): - peer = Peer(id='id', name='name', legacy=True, namespace=namespace) - assert peer.legacy is True - assert peer.resource == LEGACY_PEERING_RESOURCE - - def test_resource_for_cluster_peering(): - peer = Peer(id='id', name='name', legacy=False, namespace=None) - assert peer.legacy is False + peer = Peer(id='id', name='name', namespace=None) assert peer.resource == CLUSTER_PEERING_RESOURCE assert peer.namespace is None def test_resource_for_namespaced_peering(): - peer = Peer(id='id', name='name', legacy=False, namespace='namespaced') - assert peer.legacy is False + peer = Peer(id='id', name='name', namespace='namespaced') assert peer.resource == NAMESPACED_PEERING_RESOURCE assert peer.namespace == 'namespaced' From c9ab9cb40b54373ab52658a00ba8497f2d928c0e Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Mon, 26 Oct 2020 21:50:26 +0100 Subject: [PATCH 2/5] Configure peering via settings instead of "ourselves" MIME-Version: 1.0 Content-Type: text/plain; 
charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, the peering setup was stored in a pseudo-peer, so called "ourselves" — as detected at startup. This also included the "legacy peering mode" (which is now removed), and the whole peering configuration — namespace, peering object name, priority, our own identity, etc. Now, this is split: the peering settings are moved to settings; the dynamic runtime identity is used directly; so as the namespace (with potentially multiple namespaces for multiple peering tasks in mind). The `Peer` class is only left for the actual peers as read from the peering object, and solely for the purpose of doing the math with lifetime & deadlines. --- docs/peering.rst | 49 +++- kopf/cli.py | 31 +-- kopf/engines/peering.py | 205 ++++++++------- kopf/reactor/running.py | 28 ++- kopf/structs/configuration.py | 43 ++++ tests/peering/test_freeze_mode.py | 77 ++++-- tests/peering/test_keepalive.py | 14 +- tests/peering/test_peer_detection.py | 234 +++++------------- ...t_apply_peers.py => test_peer_patching.py} | 65 ++--- tests/peering/test_peers.py | 84 ++----- tests/peering/test_resource_guessing.py | 13 + tests/settings/test_defaults.py | 5 + 12 files changed, 404 insertions(+), 444 deletions(-) rename tests/peering/{test_apply_peers.py => test_peer_patching.py} (58%) create mode 100644 tests/peering/test_resource_guessing.py diff --git a/docs/peering.rst b/docs/peering.rst index a132a4d4..521c8aae 100644 --- a/docs/peering.rst +++ b/docs/peering.rst @@ -23,6 +23,16 @@ To set the operator's priority, use :option:`--priority`: kopf run --priority=100 ... +Or: + +.. code-block:: python + + import kopf + + @kopf.on.startup() + def configure(settings: kopf.OperatorSettings, **_): + settings.peering.priority = 100 + As a shortcut, there is a :option:`--dev` option, which sets the priority to ``666``, and is intended for the development mode. 
@@ -74,11 +84,27 @@ The operator can be instructed to use alternative peering objects:: kopf run --peering=example ... kopf run --peering=example --namespace=some-ns ... +Or: + +.. code-block:: python + + import kopf + + @kopf.on.startup() + def configure(settings: kopf.OperatorSettings, **_): + settings.peering.name = "example" + settings.peering.mandatory = True + Depending on :option:`--namespace`, either ``ClusterKopfPeering`` or ``KopfPeering`` will be used (in the operator's namespace). If the peering object does not exist, the operator will fail to start. -Using :option:`--peering` assumes that the peering is required. +Using :option:`--peering` assumes that the peering is mandatory. + +Please note that in the startup handler, this is not exactly the same: +the mandatory mode must be set explicitly. Otherwise, the operator will try +to auto-detect the presence of the custom peering object, but will not fail +if it is absent -- unlike with the ``--peering=`` CLI option. The operators from different peering objects do not see each other. @@ -94,6 +120,16 @@ the standalone mode can be enabled:: kopf run --standalone ... +Or: + +.. code-block:: python + + import kopf + + @kopf.on.startup() + def configure(settings: kopf.OperatorSettings, **_): + settings.peering.standalone = True + In that case, the operator will not freeze if other operators with the higher priority will start handling the objects, which may lead to the conflicting changes and reactions from multiple operators @@ -131,6 +167,17 @@ operator in the deployment or replicaset: kopf run --priority=$RANDOM ... +Or: + +.. code-block:: python + + import random + import kopf + + @kopf.on.startup() + def configure(settings: kopf.OperatorSettings, **_): + settings.peering.priority = random.randint(0, 32767) + ``$RANDOM`` is a feature of bash (if you use another shell, see its man page for an equivalent). It returns a random integer in the range 0..32767. 
diff --git a/kopf/cli.py b/kopf/cli.py index 12200d9a..ab0313c6 100644 --- a/kopf/cli.py +++ b/kopf/cli.py @@ -121,21 +121,22 @@ def freeze( priority: int, ) -> None: """ Freeze the resource handling in the cluster. """ - ourserlves = peering.Peer( - id=id or peering.detect_own_id(manual=True), - name=peering_name, - namespace=namespace, - priority=priority, - lifetime=lifetime, - ) + identity = peering.Identity(id) if id else peering.detect_own_id(manual=True) registry = registries.SmartOperatorRegistry() settings = configuration.OperatorSettings() + settings.peering.name = peering_name + settings.peering.priority = priority vault = credentials.Vault() auth.vault_var.set(vault) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait({ activities.authenticate(registry=registry, settings=settings, vault=vault), - ourserlves.keepalive(), + peering.touch( + identity=identity, + settings=settings, + namespace=namespace, + lifetime=lifetime, + ), })) @@ -150,17 +151,19 @@ def resume( peering_name: str, ) -> None: """ Resume the resource handling in the cluster. 
""" - ourselves = peering.Peer( - id=id or peering.detect_own_id(manual=True), - name=peering_name, - namespace=namespace, - ) + identity = peering.Identity(id) if id else peering.detect_own_id(manual=True) registry = registries.SmartOperatorRegistry() settings = configuration.OperatorSettings() + settings.peering.name = peering_name vault = credentials.Vault() auth.vault_var.set(vault) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait({ activities.authenticate(registry=registry, settings=settings, vault=vault), - ourselves.disappear(), + peering.touch( + identity=identity, + settings=settings, + namespace=namespace, + lifetime=0, + ), })) diff --git a/kopf/engines/peering.py b/kopf/engines/peering.py index 09e9675f..4c5d4a5f 100644 --- a/kopf/engines/peering.py +++ b/kopf/engines/peering.py @@ -35,12 +35,12 @@ import logging import os import random -from typing import Any, Dict, Iterable, Mapping, NoReturn, Optional, Union, cast +from typing import Any, Dict, Iterable, Mapping, NewType, NoReturn, Optional, Union, cast import iso8601 from kopf.clients import fetching, patching -from kopf.structs import bodies, patches, primitives, resources +from kopf.structs import bodies, configuration, patches, primitives, resources from kopf.utilities import hostnames logger = logging.getLogger(__name__) @@ -48,7 +48,8 @@ # The CRD info on the special sync-object. CLUSTER_PEERING_RESOURCE = resources.Resource('zalando.org', 'v1', 'clusterkopfpeerings') NAMESPACED_PEERING_RESOURCE = resources.Resource('zalando.org', 'v1', 'kopfpeerings') -PEERING_DEFAULT_NAME = 'default' + +Identity = NewType('Identity', str) # The class used to represent a peer in the parsed peers list (for convenience). 
@@ -57,19 +58,15 @@ class Peer: def __init__( self, - id: str, *, - name: str, + identity: Identity, priority: int = 0, lastseen: Optional[Union[str, datetime.datetime]] = None, lifetime: Union[int, datetime.timedelta] = 60, - namespace: Optional[str] = None, **_: Any, # for the forward-compatibility with the new fields ): super().__init__() - self.id = id - self.name = name - self.namespace = namespace + self.identity = identity self.priority = priority self.lifetime = (lifetime if isinstance(lifetime, datetime.timedelta) else datetime.timedelta(seconds=int(lifetime))) @@ -81,96 +78,24 @@ def __init__( self.is_dead = self.deadline <= datetime.datetime.utcnow() def __repr__(self) -> str: - return f"{self.__class__.__name__}({self.id}, namespace={self.namespace}, priority={self.priority}, lastseen={self.lastseen}, lifetime={self.lifetime})" - - @property - def resource(self) -> resources.Resource: - return CLUSTER_PEERING_RESOURCE if self.namespace is None else NAMESPACED_PEERING_RESOURCE - - @classmethod - async def detect( - cls, - standalone: bool, - namespace: Optional[str], - name: Optional[str], - **kwargs: Any, - ) -> Optional["Peer"]: - - if standalone: - return None - - if name: - if await Peer._is_peering_exist(name, namespace=namespace): - return cls(name=name, namespace=namespace, **kwargs) - else: - raise Exception(f"The peering {name!r} was not found") - - if await Peer._is_peering_exist(name=PEERING_DEFAULT_NAME, namespace=namespace): - return cls(name=PEERING_DEFAULT_NAME, namespace=namespace, **kwargs) - - logger.warning(f"Default peering object not found, falling back to the standalone mode.") - return None + return f"{self.__class__.__name__}(identity={self.identity}, priority={self.priority}, lastseen={self.lastseen}, lifetime={self.lifetime})" def as_dict(self) -> Dict[str, Any]: # Only the non-calculated and non-identifying fields. 
return { - 'namespace': self.namespace, 'priority': self.priority, 'lastseen': self.lastseen.isoformat(), 'lifetime': self.lifetime.total_seconds(), } - def touch(self, *, lifetime: Optional[int] = None) -> None: - self.lastseen = datetime.datetime.utcnow() - self.lifetime = (self.lifetime if lifetime is None else - lifetime if isinstance(lifetime, datetime.timedelta) else - datetime.timedelta(seconds=int(lifetime))) - self.deadline = self.lastseen + self.lifetime - self.is_dead = self.deadline <= datetime.datetime.utcnow() - - async def keepalive(self) -> None: - """ - Add a peer to the peers, and update its alive status. - """ - self.touch() - await apply_peers([self], name=self.name, namespace=self.namespace) - - async def disappear(self) -> None: - """ - Remove a peer from the peers (gracefully). - """ - self.touch(lifetime=0) - await apply_peers([self], name=self.name, namespace=self.namespace) - - @staticmethod - async def _is_peering_exist(name: str, namespace: Optional[str]) -> bool: - resource = CLUSTER_PEERING_RESOURCE if namespace is None else NAMESPACED_PEERING_RESOURCE - obj = await fetching.read_obj(resource=resource, namespace=namespace, name=name, default=None) - return obj is not None - - -async def apply_peers( - peers: Iterable[Peer], - name: str, - namespace: Union[None, str], -) -> None: - """ - Apply the changes in the peers to the sync-object. - - The dead peers are removed, the new or alive peers are stored. - Note: this does NOT change their `lastseen` field, so do it explicitly with ``touch()``. 
- """ - patch = patches.Patch() - patch.update({'status': {peer.id: None if peer.is_dead else peer.as_dict() for peer in peers}}) - resource = CLUSTER_PEERING_RESOURCE if namespace is None else NAMESPACED_PEERING_RESOURCE - await patching.patch_obj(resource=resource, namespace=namespace, name=name, patch=patch) - async def process_peering_event( *, raw_event: bodies.RawEvent, freeze_mode: primitives.Toggle, - ourselves: Peer, + namespace: Optional[str], + identity: Identity, + settings: configuration.OperatorSettings, autoclean: bool = True, replenished: asyncio.Event, ) -> None: @@ -184,24 +109,23 @@ async def process_peering_event( and to all the resource handlers to check its value when the events arrive (see `create_tasks` and `run` functions). """ + body: bodies.RawBody = raw_event['object'] + meta: bodies.RawMeta = raw_event['object']['metadata'] # Silently ignore the peering objects which are not ours to worry. - body = raw_event['object'] - name = body.get('metadata', {}).get('name', None) - namespace = body.get('metadata', {}).get('namespace', None) - if namespace != ourselves.namespace or name != ourselves.name or name is None: + if meta.get('namespace') != namespace or meta.get('name') != settings.peering.name: return # Find if we are still the highest priority operator. 
pairs = cast(Mapping[str, Mapping[str, object]], body.get('status', {})) - peers = [Peer(id=opid, name=name, **opinfo) for opid, opinfo in pairs.items()] + peers = [Peer(identity=Identity(opid), **opinfo) for opid, opinfo in pairs.items()] dead_peers = [peer for peer in peers if peer.is_dead] - prio_peers = [peer for peer in peers if not peer.is_dead and peer.priority > ourselves.priority] - same_peers = [peer for peer in peers if not peer.is_dead and peer.priority == ourselves.priority and peer.id != ourselves.id] + live_peers = [peer for peer in peers if not peer.is_dead and peer.identity != identity] + prio_peers = [peer for peer in live_peers if peer.priority > settings.peering.priority] + same_peers = [peer for peer in live_peers if peer.priority == settings.peering.priority] if autoclean and dead_peers: - # NB: sync and blocking, but this is fine. - await apply_peers(dead_peers, name=ourselves.name, namespace=ourselves.namespace) + await clean(peers=dead_peers, settings=settings, namespace=namespace) if prio_peers: if freeze_mode.is_off(): @@ -218,32 +142,101 @@ async def process_peering_event( await freeze_mode.turn_off() -async def peers_keepalive( +async def keepalive( *, - ourselves: Peer, + namespace: Optional[str], + identity: Identity, + settings: configuration.OperatorSettings, ) -> NoReturn: """ An ever-running coroutine to regularly send our own keep-alive status for the peers. """ try: while True: - logger.debug(f"Peering keep-alive update for {ourselves.id} (priority {ourselves.priority})") - await ourselves.keepalive() + await touch( + identity=identity, + settings=settings, + namespace=namespace, + ) # How often do we update. Keep limited to avoid k8s api flooding. # Should be slightly less than the lifetime, enough for a patch request to finish. 
- await asyncio.sleep(max(1, int(ourselves.lifetime.total_seconds() - 10))) + await asyncio.sleep(max(1, int(settings.peering.lifetime - 10))) finally: try: - await asyncio.shield(ourselves.disappear()) + await asyncio.shield(touch( + identity=identity, + settings=settings, + namespace=namespace, + lifetime=0, + )) except asyncio.CancelledError: - # It is the cancellation of `keepalive()`, not of the shielded `disappear()`. pass except Exception: logger.exception(f"Couldn't remove self from the peering. Ignoring.") -def detect_own_id(*, manual: bool) -> str: +async def touch( + *, + identity: Identity, + settings: configuration.OperatorSettings, + namespace: Optional[str], + lifetime: Optional[int] = None, +) -> None: + name = settings.peering.name + resource = guess_resource(namespace=namespace) + + peer = Peer( + identity=identity, + priority=settings.peering.priority, + lifetime=settings.peering.lifetime if lifetime is None else lifetime, + ) + + patch = patches.Patch() + patch.update({'status': {identity: None if peer.is_dead else peer.as_dict()}}) + rsp = await patching.patch_obj(resource=resource, namespace=namespace, name=name, patch=patch) + + where = f'in {namespace!r}' if namespace else 'cluster-wide' + result = "not found" if rsp is None else "ok" + logger.debug(f"Keep-alive in {name!r} {where}: {result}.") + + +async def clean( + *, + peers: Iterable[Peer], + settings: configuration.OperatorSettings, + namespace: Optional[str], +) -> None: + name = settings.peering.name + resource = guess_resource(namespace=namespace) + + patch = patches.Patch() + patch.update({'status': {peer.identity: None for peer in peers}}) + await patching.patch_obj(resource=resource, namespace=namespace, name=name, patch=patch) + + +async def detect_presence( + *, + settings: configuration.OperatorSettings, + namespace: Optional[str], +) -> Optional[bool]: + + if settings.peering.standalone: + return None + + resource = guess_resource(namespace=namespace) + name = 
settings.peering.name + obj = await fetching.read_obj(resource=resource, namespace=namespace, name=name, default=None) + if settings.peering.mandatory and obj is None: + raise Exception(f"The mandatory peering {name!r} was not found.") + elif obj is None: + logger.warning(f"Default peering object not found, falling back to the standalone mode.") + return False + else: + return True + + +def detect_own_id(*, manual: bool) -> Identity: """ Detect or generate the id for ourselves, i.e. the execute operator. @@ -265,10 +258,14 @@ def detect_own_id(*, manual: bool) -> str: pod = os.environ.get('POD_ID', None) if pod is not None: - return pod + return Identity(pod) user = getpass.getuser() host = hostnames.get_descriptive_hostname() now = datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S") rnd = ''.join(random.choices('abcdefhijklmnopqrstuvwxyz0123456789', k=3)) - return f'{user}@{host}' if manual else f'{user}@{host}/{now}/{rnd}' + return Identity(f'{user}@{host}' if manual else f'{user}@{host}/{now}/{rnd}') + + +def guess_resource(namespace: Optional[str]) -> resources.Resource: + return CLUSTER_PEERING_RESOURCE if namespace is None else NAMESPACED_PEERING_RESOURCE diff --git a/kopf/reactor/running.py b/kopf/reactor/running.py index 383192ec..a25763f7 100644 --- a/kopf/reactor/running.py +++ b/kopf/reactor/running.py @@ -186,6 +186,15 @@ async def spawn_tasks( ready_flag = ready_flag if ready_flag is not None else asyncio.Event() tasks: MutableSequence[asyncio_Task] = [] + # Map kwargs into the settings object. + if peering_name is not None: + settings.peering.mandatory = True + settings.peering.name = peering_name + if standalone is not None: + settings.peering.standalone = standalone + if priority is not None: + settings.peering.priority = priority + # Global credentials store for this operator, also for CRD-reading & peering mode detection. 
auth.vault_var.set(vault) @@ -245,23 +254,24 @@ async def spawn_tasks( endpoint=liveness_endpoint))) # Monitor the peers, unless explicitly disabled. - ourselves: Optional[peering.Peer] = await peering.Peer.detect( - id=peering.detect_own_id(manual=False), priority=priority, - standalone=standalone, namespace=namespace, name=peering_name, - ) - if ourselves: + if await peering.detect_presence(namespace=namespace, settings=settings): + identity = peering.detect_own_id(manual=False) tasks.append(_create_checked_root_task( name="peering keepalive", ready_flag=ready_flag, - coro=peering.peers_keepalive( - ourselves=ourselves))) + coro=peering.keepalive( + namespace=namespace, + settings=settings, + identity=identity))) tasks.append(_create_checked_root_task( name="watcher of peering", ready_flag=ready_flag, coro=queueing.watcher( namespace=namespace, settings=settings, - resource=ourselves.resource, + resource=peering.guess_resource(namespace=namespace), processor=functools.partial(peering.process_peering_event, - ourselves=ourselves, + namespace=namespace, + settings=settings, + identity=identity, freeze_mode=freeze_mode)))) # Resource event handling, only once for every known resource (de-duplicated). diff --git a/kopf/structs/configuration.py b/kopf/structs/configuration.py index 982e8978..7f933079 100644 --- a/kopf/structs/configuration.py +++ b/kopf/structs/configuration.py @@ -81,6 +81,48 @@ class PostingSettings: """ +@dataclasses.dataclass +class PeeringSettings: + + name: str = 'default' + """ + The name of the peering object to use. + Distinct peering objects are isolated peering neighbourhoods, + i.e. operators in one of them are not visible to operators in others. + """ + + priority: int = 0 + """ + The operator's priority to use. The operators with lower priority freeze + when they see operators with higher or the same priority -- + to avoid double-processing and double-handling of the resources. 
+ """ + + lifetime: int = 60 + """ + For how long (in seconds) the operator's record is considered actual + by other operators before assuming that the corresponding operator + is not functioning and the freeze mode should be re-evaluated. + The peered operators will update their records as long as they are running, + slightly faster than their records expires (5-10 seconds earlier). + """ + + mandatory: bool = False + """ + Is peering mandatory for this operator, or optional? If it is mandatory, + the operator will fail to run in the absence of the peering CRD or object. + If optional, it will continue in the standalone mode (i.e. without peering). + """ + + standalone: bool = False + """ + Should the operator be forced to run without peering even if it exists? + Normally, operator automatically detect the peering objects named "default", + and run with peering enabled if they exist, or standalone if absent. + But they can be forced to either mode (standalone or mandatory peering). + """ + + @dataclasses.dataclass class WatchingSettings: @@ -285,6 +327,7 @@ class BackgroundSettings: class OperatorSettings: process: ProcessSettings = dataclasses.field(default_factory=ProcessSettings) posting: PostingSettings = dataclasses.field(default_factory=PostingSettings) + peering: PeeringSettings = dataclasses.field(default_factory=PeeringSettings) watching: WatchingSettings = dataclasses.field(default_factory=WatchingSettings) batching: BatchingSettings = dataclasses.field(default_factory=BatchingSettings) execution: ExecutionSettings = dataclasses.field(default_factory=ExecutionSettings) diff --git a/tests/peering/test_freeze_mode.py b/tests/peering/test_freeze_mode.py index 2e0b28a2..c5c608f4 100644 --- a/tests/peering/test_freeze_mode.py +++ b/tests/peering/test_freeze_mode.py @@ -3,7 +3,7 @@ import freezegun import pytest -from kopf.engines.peering import Peer, process_peering_event +from kopf.engines.peering import process_peering_event from kopf.structs import bodies, 
primitives @@ -20,9 +20,8 @@ ['our-name', None, 'their-name', 'our-namespace'], ]) async def test_other_peering_objects_are_ignored( - mocker, our_name, our_namespace, their_name, their_namespace): + mocker, settings, our_name, our_namespace, their_name, their_namespace): - ourselves = Peer(id='id', name=our_name, namespace=our_namespace) status = mocker.Mock() status.items.side_effect = Exception("This should not be called.") event = bodies.RawEvent( @@ -31,18 +30,24 @@ async def test_other_peering_objects_are_ignored( 'metadata': {'name': their_name, 'namespace': their_namespace}, 'status': status, }) + + settings.peering.name = our_name await process_peering_event( raw_event=event, - ourselves=ourselves, freeze_mode=primitives.Toggle(), replenished=asyncio.Event(), autoclean=False, + identity='id', + settings=settings, + namespace=our_namespace, ) assert not status.items.called @freezegun.freeze_time('2020-12-31T23:59:59.123456') -async def test_toggled_on_for_higher_priority_peer_when_initially_off(caplog, assert_logs): +async def test_toggled_on_for_higher_priority_peer_when_initially_off( + caplog, assert_logs, settings): + event = bodies.RawEvent( type='ADDED', # irrelevant object={ @@ -55,19 +60,22 @@ async def test_toggled_on_for_higher_priority_peer_when_initially_off(caplog, as }, }, }) + settings.peering.name = 'name' + settings.peering.priority = 100 replenished = asyncio.Event() freeze_mode = primitives.Toggle(False) - ourselves = Peer(id='id', name='name', namespace='namespace', priority=100) caplog.set_level(0) assert freeze_mode.is_off() await process_peering_event( raw_event=event, freeze_mode=freeze_mode, - ourselves=ourselves, replenished=replenished, autoclean=False, + namespace='namespace', + identity='id', + settings=settings, ) assert freeze_mode.is_on() assert_logs(["Freezing operations in favour of"], prohibited=[ @@ -78,7 +86,9 @@ async def test_toggled_on_for_higher_priority_peer_when_initially_off(caplog, as 
@freezegun.freeze_time('2020-12-31T23:59:59.123456') -async def test_ignored_for_higher_priority_peer_when_already_on(caplog, assert_logs): +async def test_ignored_for_higher_priority_peer_when_already_on( + caplog, assert_logs, settings): + event = bodies.RawEvent( type='ADDED', # irrelevant object={ @@ -91,19 +101,22 @@ async def test_ignored_for_higher_priority_peer_when_already_on(caplog, assert_l }, }, }) + settings.peering.name = 'name' + settings.peering.priority = 100 replenished = asyncio.Event() freeze_mode = primitives.Toggle(True) - ourselves = Peer(id='id', name='name', namespace='namespace', priority=100) caplog.set_level(0) assert freeze_mode.is_on() await process_peering_event( raw_event=event, freeze_mode=freeze_mode, - ourselves=ourselves, replenished=replenished, autoclean=False, + namespace='namespace', + identity='id', + settings=settings, ) assert freeze_mode.is_on() assert_logs([], prohibited=[ @@ -115,7 +128,9 @@ async def test_ignored_for_higher_priority_peer_when_already_on(caplog, assert_l @freezegun.freeze_time('2020-12-31T23:59:59.123456') -async def test_toggled_off_for_lower_priority_peer_when_initially_on(caplog, assert_logs): +async def test_toggled_off_for_lower_priority_peer_when_initially_on( + caplog, assert_logs, settings): + event = bodies.RawEvent( type='ADDED', # irrelevant object={ @@ -128,19 +143,22 @@ async def test_toggled_off_for_lower_priority_peer_when_initially_on(caplog, ass }, }, }) + settings.peering.name = 'name' + settings.peering.priority = 100 replenished = asyncio.Event() freeze_mode = primitives.Toggle(True) - ourselves = Peer(id='id', name='name', namespace='namespace', priority=100) caplog.set_level(0) assert freeze_mode.is_on() await process_peering_event( raw_event=event, freeze_mode=freeze_mode, - ourselves=ourselves, replenished=replenished, autoclean=False, + namespace='namespace', + identity='id', + settings=settings, ) assert freeze_mode.is_off() assert_logs(["Resuming operations after the freeze"], 
prohibited=[ @@ -151,7 +169,9 @@ async def test_toggled_off_for_lower_priority_peer_when_initially_on(caplog, ass @freezegun.freeze_time('2020-12-31T23:59:59.123456') -async def test_ignored_for_lower_priority_peer_when_already_off(caplog, assert_logs): +async def test_ignored_for_lower_priority_peer_when_already_off( + caplog, assert_logs, settings): + event = bodies.RawEvent( type='ADDED', # irrelevant object={ @@ -164,19 +184,22 @@ async def test_ignored_for_lower_priority_peer_when_already_off(caplog, assert_l }, }, }) + settings.peering.name = 'name' + settings.peering.priority = 100 replenished = asyncio.Event() freeze_mode = primitives.Toggle(False) - ourselves = Peer(id='id', name='name', namespace='namespace', priority=100) caplog.set_level(0) assert freeze_mode.is_off() await process_peering_event( raw_event=event, freeze_mode=freeze_mode, - ourselves=ourselves, replenished=replenished, autoclean=False, + namespace='namespace', + identity='id', + settings=settings, ) assert freeze_mode.is_off() assert_logs([], prohibited=[ @@ -188,7 +211,9 @@ async def test_ignored_for_lower_priority_peer_when_already_off(caplog, assert_l @freezegun.freeze_time('2020-12-31T23:59:59.123456') -async def test_toggled_on_for_same_priority_peer_when_initially_off(caplog, assert_logs): +async def test_toggled_on_for_same_priority_peer_when_initially_off( + caplog, assert_logs, settings): + event = bodies.RawEvent( type='ADDED', # irrelevant object={ @@ -201,19 +226,22 @@ async def test_toggled_on_for_same_priority_peer_when_initially_off(caplog, asse }, }, }) + settings.peering.name = 'name' + settings.peering.priority = 100 replenished = asyncio.Event() freeze_mode = primitives.Toggle(False) - ourselves = Peer(id='id', name='name', namespace='namespace', priority=100) caplog.set_level(0) assert freeze_mode.is_off() await process_peering_event( raw_event=event, freeze_mode=freeze_mode, - ourselves=ourselves, replenished=replenished, autoclean=False, + namespace='namespace', + 
identity='id', + settings=settings, ) assert freeze_mode.is_on() assert_logs([ @@ -226,7 +254,9 @@ async def test_toggled_on_for_same_priority_peer_when_initially_off(caplog, asse @freezegun.freeze_time('2020-12-31T23:59:59.123456') -async def test_ignored_for_same_priority_peer_when_already_on(caplog, assert_logs): +async def test_ignored_for_same_priority_peer_when_already_on( + caplog, assert_logs, settings): + event = bodies.RawEvent( type='ADDED', # irrelevant object={ @@ -239,19 +269,22 @@ async def test_ignored_for_same_priority_peer_when_already_on(caplog, assert_log }, }, }) + settings.peering.name = 'name' + settings.peering.priority = 100 replenished = asyncio.Event() freeze_mode = primitives.Toggle(True) - ourselves = Peer(id='id', name='name', namespace='namespace', priority=100) caplog.set_level(0) assert freeze_mode.is_on() await process_peering_event( raw_event=event, freeze_mode=freeze_mode, - ourselves=ourselves, replenished=replenished, autoclean=False, + namespace='namespace', + identity='id', + settings=settings, ) assert freeze_mode.is_on() assert_logs([ diff --git a/tests/peering/test_keepalive.py b/tests/peering/test_keepalive.py index a27c6f41..40594dff 100644 --- a/tests/peering/test_keepalive.py +++ b/tests/peering/test_keepalive.py @@ -1,27 +1,25 @@ import pytest -from kopf.engines.peering import Peer, peers_keepalive +from kopf.engines.peering import Peer, keepalive class StopInfiniteCycleException(Exception): pass -async def test_background_task_runs(mocker): - ourselves = Peer(id='id', name='name', namespace='namespace', lifetime=33) - keepalive_mock = mocker.patch.object(ourselves, 'keepalive') - disappear_mock = mocker.patch.object(ourselves, 'disappear') +async def test_background_task_runs(mocker, settings): + touch_mock = mocker.patch('kopf.engines.peering.touch') sleep_mock = mocker.patch('asyncio.sleep') sleep_mock.side_effect = [None, None, StopInfiniteCycleException] + settings.peering.lifetime = 33 with 
pytest.raises(StopInfiniteCycleException): - await peers_keepalive(ourselves=ourselves) + await keepalive(settings=settings, identity='id', namespace='namespace') assert sleep_mock.call_count == 3 assert sleep_mock.call_args_list[0][0][0] == 33 - 10 assert sleep_mock.call_args_list[1][0][0] == 33 - 10 assert sleep_mock.call_args_list[2][0][0] == 33 - 10 - assert keepalive_mock.call_count == 3 - assert disappear_mock.call_count == 1 + assert touch_mock.call_count == 4 # 3 updates + 1 clean-up diff --git a/tests/peering/test_peer_detection.py b/tests/peering/test_peer_detection.py index da3b86c5..5555c40b 100644 --- a/tests/peering/test_peer_detection.py +++ b/tests/peering/test_peer_detection.py @@ -2,199 +2,93 @@ import pytest -from kopf.engines.peering import CLUSTER_PEERING_RESOURCE, NAMESPACED_PEERING_RESOURCE, \ - PEERING_DEFAULT_NAME, Peer +from kopf.engines.peering import CLUSTER_PEERING_RESOURCE, \ + NAMESPACED_PEERING_RESOURCE, detect_presence @pytest.fixture() -def with_cluster_default(hostname, aresponses): - url = CLUSTER_PEERING_RESOURCE.get_url(name=PEERING_DEFAULT_NAME) +def with_cluster_cr(hostname, aresponses): + url = CLUSTER_PEERING_RESOURCE.get_url(namespace=None, name='existent') aresponses.add(hostname, url, 'get', {'spec': {}}) @pytest.fixture() -def with_cluster_specific(hostname, aresponses): - url = CLUSTER_PEERING_RESOURCE.get_url(name='peering-name') +def with_namespaced_cr(hostname, aresponses): + url = NAMESPACED_PEERING_RESOURCE.get_url(namespace='namespace', name='existent') aresponses.add(hostname, url, 'get', {'spec': {}}) -@pytest.fixture() -def with_namespaced_default(hostname, aresponses): - url = NAMESPACED_PEERING_RESOURCE.get_url(namespace='namespace', name=PEERING_DEFAULT_NAME) - aresponses.add(hostname, url, 'get', {'spec': {}}) - - -@pytest.fixture() -def with_namespaced_specific(hostname, aresponses): - url = NAMESPACED_PEERING_RESOURCE.get_url(namespace='namespace', name='peering-name') - aresponses.add(hostname, url, 
'get', {'spec': {}}) - - -# -# Parametrization via fixtures (it does not work from tests). -# -@pytest.fixture(params=[ - pytest.param(None, id='no-crds'), - pytest.param('with_both_crds', id='both-crds'), - pytest.param('with_cluster_crd', id='only-cluster-crd'), - pytest.param('with_namespaced_crd', id='only-namespaced-crd'), -]) -def all_crd_modes(request): - return request.getfixturevalue(request.param) if request.param else None - - -@pytest.fixture(params=[ - pytest.param('with_both_crds', id='both-crds'), - pytest.param('with_cluster_crd', id='only-cluster-crd'), -]) -def all_crd_modes_with_cluster_scoped_crd(request): - return request.getfixturevalue(request.param) if request.param else None - - -@pytest.fixture(params=[ - pytest.param('with_both_crds', id='both-crds'), - pytest.param('with_namespaced_crd', id='only-namespaced-crd'), -]) -def all_crd_modes_with_namespace_scoped_crd(request): - return request.getfixturevalue(request.param) if request.param else None - - -@pytest.fixture(params=[ - pytest.param(True, id='with-cluster-default'), - pytest.param(False, id='without-cluster-default') -]) -def both_cluster_default_modes(request): - return request.getfixturevalue('with_cluster_default') if request.param else None - - -@pytest.fixture(params=[ - pytest.param(True, id='with-cluster-specific'), - pytest.param(False, id='without-cluster-specific') -]) -def both_cluster_specific_modes(request): - return request.getfixturevalue('with_cluster_specific') if request.param else None - - -@pytest.fixture(params=[ - pytest.param(True, id='with-namespaced-default'), - pytest.param(False, id='without-namespaced-default') -]) -def both_namespaced_default_modes(request): - return request.getfixturevalue('with_namespaced_default') if request.param else None - - -@pytest.fixture(params=[ - pytest.param(True, id='with-namespaced-specific'), - pytest.param(False, id='without-namespaced-specific') -]) -def both_namespaced_specific_modes(request): - return 
request.getfixturevalue('with_namespaced_specific') if request.param else None - - -# -# Actual tests: only the action & assertions. -# -@pytest.mark.usefixtures('both_namespaced_specific_modes') -@pytest.mark.usefixtures('both_namespaced_default_modes') -@pytest.mark.usefixtures('both_cluster_specific_modes') -@pytest.mark.usefixtures('both_cluster_default_modes') -@pytest.mark.usefixtures('all_crd_modes') -@pytest.mark.parametrize('name', [None, 'name']) -@pytest.mark.parametrize('namespace', [None, 'namespaced']) -async def test_standalone(namespace, name): - peer = await Peer.detect(standalone=True, namespace=namespace, name=name) - assert peer is None - - -@pytest.mark.usefixtures('both_namespaced_specific_modes') -@pytest.mark.usefixtures('both_namespaced_default_modes') -@pytest.mark.usefixtures('both_cluster_specific_modes') -@pytest.mark.usefixtures('with_cluster_default') -@pytest.mark.usefixtures('all_crd_modes_with_cluster_scoped_crd') -async def test_cluster_scoped_with_default_name(): - peer = await Peer.detect(id='id', standalone=False, namespace=None, name=None) - assert peer.name == PEERING_DEFAULT_NAME - assert peer.namespace is None - - -@pytest.mark.usefixtures('both_namespaced_specific_modes') -@pytest.mark.usefixtures('with_namespaced_default') -@pytest.mark.usefixtures('both_cluster_specific_modes') -@pytest.mark.usefixtures('both_cluster_default_modes') -@pytest.mark.usefixtures('all_crd_modes_with_namespace_scoped_crd') -async def test_namespace_scoped_with_default_name(): - peer = await Peer.detect(id='id', standalone=False, namespace='namespace', name=None) - assert peer.name == PEERING_DEFAULT_NAME - assert peer.namespace == 'namespace' - - -@pytest.mark.usefixtures('both_namespaced_specific_modes') -@pytest.mark.usefixtures('both_namespaced_default_modes') -@pytest.mark.usefixtures('with_cluster_specific') -@pytest.mark.usefixtures('both_cluster_default_modes') -@pytest.mark.usefixtures('all_crd_modes_with_cluster_scoped_crd') -async 
def test_cluster_scoped_with_specific_name(): - peer = await Peer.detect(id='id', standalone=False, namespace=None, name='peering-name') - assert peer.name == 'peering-name' - assert peer.namespace is None - - -@pytest.mark.usefixtures('with_namespaced_specific') -@pytest.mark.usefixtures('both_namespaced_default_modes') -@pytest.mark.usefixtures('both_cluster_specific_modes') -@pytest.mark.usefixtures('both_cluster_default_modes') -@pytest.mark.usefixtures('all_crd_modes_with_namespace_scoped_crd') -async def test_namespace_scoped_with_specific_name(): - peer = await Peer.detect(id='id', standalone=False, namespace='namespace', name='peering-name') - assert peer.name == 'peering-name' - assert peer.namespace == 'namespace' - - -@pytest.mark.usefixtures('both_namespaced_specific_modes') -@pytest.mark.usefixtures('both_namespaced_default_modes') -@pytest.mark.usefixtures('both_cluster_specific_modes') -@pytest.mark.usefixtures('both_cluster_default_modes') -@pytest.mark.usefixtures('all_crd_modes_with_cluster_scoped_crd') -async def test_cluster_scoped_with_absent_name(hostname, aresponses): +@pytest.mark.usefixtures('with_namespaced_cr') +@pytest.mark.usefixtures('with_cluster_cr') +@pytest.mark.usefixtures('with_both_crds') +@pytest.mark.parametrize('name', ['existent', 'absent']) +@pytest.mark.parametrize('namespace', [None, 'namespace'], ids=['cluster', 'namespaced']) +@pytest.mark.parametrize('mandatory', [False, True], ids=['optional', 'mandatory']) +async def test_standalone(mandatory, namespace, name, settings): + settings.peering.standalone = True + settings.peering.mandatory = mandatory + settings.peering.name = name + peering = await detect_presence(settings=settings, namespace=namespace) + assert peering is None + + +@pytest.mark.usefixtures('with_cluster_cr') +@pytest.mark.usefixtures('with_cluster_crd') +@pytest.mark.parametrize('mandatory', [False, True], ids=['optional', 'mandatory']) +async def test_cluster_scoped_when_existent(mandatory, settings): 
+ settings.peering.mandatory = mandatory + settings.peering.name = 'existent' + peering = await detect_presence(settings=settings, namespace=None) + assert peering is True + + +@pytest.mark.usefixtures('with_namespaced_cr') +@pytest.mark.usefixtures('with_namespaced_crd') +@pytest.mark.parametrize('mandatory', [False, True], ids=['optional', 'mandatory']) +async def test_namespace_scoped_when_existent(mandatory, settings): + settings.peering.mandatory = mandatory + settings.peering.name = 'existent' + peering = await detect_presence(settings=settings, namespace='namespace') + assert peering is True + + +@pytest.mark.usefixtures('with_cluster_crd') +async def test_cluster_scoped_when_absent(hostname, aresponses, settings): + settings.peering.mandatory = True + settings.peering.name = 'absent' aresponses.add(hostname, re.compile(r'.*'), 'get', aresponses.Response(status=404), repeat=999) - with pytest.raises(Exception, match=r"The peering 'absent-name' was not found") as e: - await Peer.detect(id='id', standalone=False, namespace=None, name='absent-name') + with pytest.raises(Exception, match=r"The mandatory peering 'absent' was not found") as e: + await detect_presence(settings=settings, namespace=None) -@pytest.mark.usefixtures('both_namespaced_specific_modes') -@pytest.mark.usefixtures('both_namespaced_default_modes') -@pytest.mark.usefixtures('both_cluster_specific_modes') -@pytest.mark.usefixtures('both_cluster_default_modes') -@pytest.mark.usefixtures('all_crd_modes_with_namespace_scoped_crd') -async def test_namespace_scoped_with_absent_name(hostname, aresponses): +@pytest.mark.usefixtures('with_namespaced_crd') +async def test_namespace_scoped_when_absent(hostname, aresponses, settings): + settings.peering.mandatory = True + settings.peering.name = 'absent' aresponses.add(hostname, re.compile(r'.*'), 'get', aresponses.Response(status=404), repeat=999) - with pytest.raises(Exception, match=r"The peering 'absent-name' was not found") as e: - await 
Peer.detect(id='id', standalone=False, namespace='namespace', name='absent-name') + with pytest.raises(Exception, match=r"The mandatory peering 'absent' was not found") as e: + await detect_presence(settings=settings, namespace='namespace') -# NB: without cluster-default peering. -@pytest.mark.usefixtures('both_namespaced_specific_modes') -@pytest.mark.usefixtures('both_namespaced_default_modes') -@pytest.mark.usefixtures('both_cluster_specific_modes') -@pytest.mark.usefixtures('all_crd_modes_with_cluster_scoped_crd') -async def test_cluster_scoped_with_warning(hostname, aresponses, assert_logs, caplog): +@pytest.mark.usefixtures('with_cluster_crd') +async def test_fallback_with_cluster_scoped(hostname, aresponses, assert_logs, caplog, settings): + settings.peering.mandatory = False + settings.peering.name = 'absent' aresponses.add(hostname, re.compile(r'.*'), 'get', aresponses.Response(status=404), repeat=999) - peer = await Peer.detect(id='id', standalone=False, namespace=None, name=None) - assert peer is None + peering = await detect_presence(settings=settings, namespace=None) + assert peering is False assert_logs([ "Default peering object not found, falling back to the standalone mode." ]) -# NB: without namespaced-default peering. 
-@pytest.mark.usefixtures('both_namespaced_specific_modes') -@pytest.mark.usefixtures('both_cluster_specific_modes') -@pytest.mark.usefixtures('both_cluster_default_modes') -@pytest.mark.usefixtures('all_crd_modes_with_namespace_scoped_crd') -async def test_namespace_scoped_with_warning(hostname, aresponses, assert_logs, caplog): +@pytest.mark.usefixtures('with_namespaced_crd') +async def test_fallback_with_namespace_scoped(hostname, aresponses, assert_logs, caplog, settings): + settings.peering.mandatory = False + settings.peering.name = 'absent' aresponses.add(hostname, re.compile(r'.*'), 'get', aresponses.Response(status=404), repeat=999) - peer = await Peer.detect(id='id', standalone=False, namespace='namespace', name=None) - assert peer is None + peering = await detect_presence(settings=settings, namespace='namespace') + assert peering is False assert_logs([ "Default peering object not found, falling back to the standalone mode." ]) diff --git a/tests/peering/test_apply_peers.py b/tests/peering/test_peer_patching.py similarity index 58% rename from tests/peering/test_apply_peers.py rename to tests/peering/test_peer_patching.py index 32fdb3dc..bee57f61 100644 --- a/tests/peering/test_apply_peers.py +++ b/tests/peering/test_peer_patching.py @@ -3,7 +3,7 @@ import pytest from kopf.engines.peering import CLUSTER_PEERING_RESOURCE, \ - NAMESPACED_PEERING_RESOURCE, Peer, apply_peers + NAMESPACED_PEERING_RESOURCE, Peer, clean, touch @pytest.mark.usefixtures('with_both_crds') @@ -11,46 +11,26 @@ pytest.param('ns', NAMESPACED_PEERING_RESOURCE, id='namespace-scoped'), pytest.param(None, CLUSTER_PEERING_RESOURCE, id='cluster-scoped'), ]) -@freezegun.freeze_time('2020-12-31T23:59:59.123456') -async def test_applying_a_dead_peer_purges_it( - hostname, aresponses, resp_mocker, namespace, peering_resource): - - patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) - url = peering_resource.get_url(name='name0', namespace=namespace) - aresponses.add(hostname, 
url, 'patch', patch_mock) - - peer = Peer(id='id1', name='...', namespace='ns1', lastseen='2020-01-01T00:00:00') - await apply_peers(peers=[peer], name='name0', namespace=namespace) - - assert patch_mock.called - patch = await patch_mock.call_args_list[0][0][0].json() - assert set(patch['status']) == {'id1'} - assert patch['status']['id1'] is None - - -@pytest.mark.usefixtures('with_both_crds') -@pytest.mark.parametrize('namespace, peering_resource', [ - pytest.param('ns', NAMESPACED_PEERING_RESOURCE, id='namespace-scoped'), - pytest.param(None, CLUSTER_PEERING_RESOURCE, id='cluster-scoped'), +@pytest.mark.parametrize('lastseen', [ + pytest.param('2020-01-01T00:00:00', id='when-dead'), + pytest.param('2020-12-31T23:59:59', id='when-alive'), ]) @freezegun.freeze_time('2020-12-31T23:59:59.123456') -async def test_applying_an_alive_peer_stores_it( - hostname, aresponses, resp_mocker, namespace, peering_resource): +async def test_cleaning_peers_purges_them( + hostname, aresponses, resp_mocker, namespace, peering_resource, settings, lastseen): + settings.peering.name = 'name0' patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) url = peering_resource.get_url(name='name0', namespace=namespace) aresponses.add(hostname, url, 'patch', patch_mock) - peer = Peer(id='id1', name='...', namespace='ns1', lastseen='2020-12-31T23:59:59') - await apply_peers(peers=[peer], name='name0', namespace=namespace) + peer = Peer(identity='id1', lastseen=lastseen) + await clean(peers=[peer], settings=settings, namespace=namespace) assert patch_mock.called patch = await patch_mock.call_args_list[0][0][0].json() assert set(patch['status']) == {'id1'} - assert patch['status']['id1']['namespace'] == 'ns1' - assert patch['status']['id1']['priority'] == 0 - assert patch['status']['id1']['lastseen'] == '2020-12-31T23:59:59' - assert patch['status']['id1']['lifetime'] == 60 + assert patch['status']['id1'] is None @pytest.mark.usefixtures('with_both_crds') @@ -58,25 +38,20 @@ async 
def test_applying_an_alive_peer_stores_it( pytest.param('ns', NAMESPACED_PEERING_RESOURCE, id='namespace-scoped'), pytest.param(None, CLUSTER_PEERING_RESOURCE, id='cluster-scoped'), ]) -@pytest.mark.parametrize('lastseen', [ - pytest.param('2020-01-01T00:00:00', id='when-dead'), - pytest.param('2020-12-31T23:59:59', id='when-alive'), -]) @freezegun.freeze_time('2020-12-31T23:59:59.123456') -async def test_keepalive( - hostname, aresponses, resp_mocker, namespace, peering_resource, lastseen): +async def test_touching_a_peer_stores_it( + hostname, aresponses, resp_mocker, namespace, peering_resource, settings): + settings.peering.name = 'name0' patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) url = peering_resource.get_url(name='name0', namespace=namespace) aresponses.add(hostname, url, 'patch', patch_mock) - peer = Peer(id='id1', name='name0', namespace=namespace, lastseen=lastseen) - await peer.keepalive() + await touch(identity='id1', settings=settings, namespace=namespace) assert patch_mock.called patch = await patch_mock.call_args_list[0][0][0].json() assert set(patch['status']) == {'id1'} - assert patch['status']['id1']['namespace'] == namespace assert patch['status']['id1']['priority'] == 0 assert patch['status']['id1']['lastseen'] == '2020-12-31T23:59:59.123456' assert patch['status']['id1']['lifetime'] == 60 @@ -87,20 +62,16 @@ async def test_keepalive( pytest.param('ns', NAMESPACED_PEERING_RESOURCE, id='namespace-scoped'), pytest.param(None, CLUSTER_PEERING_RESOURCE, id='cluster-scoped'), ]) -@pytest.mark.parametrize('lastseen', [ - pytest.param('2020-01-01T00:00:00', id='when-dead'), - pytest.param('2020-12-31T23:59:59', id='when-alive'), -]) @freezegun.freeze_time('2020-12-31T23:59:59.123456') -async def test_disappear( - hostname, aresponses, resp_mocker, namespace, peering_resource, lastseen): +async def test_expiring_a_peer_purges_it( + hostname, aresponses, resp_mocker, namespace, peering_resource, settings): + 
settings.peering.name = 'name0' patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) url = peering_resource.get_url(name='name0', namespace=namespace) aresponses.add(hostname, url, 'patch', patch_mock) - peer = Peer(id='id1', name='name0', namespace=namespace, lastseen=lastseen) - await peer.disappear() + await touch(identity='id1', settings=settings, namespace=namespace, lifetime=0) assert patch_mock.called patch = await patch_mock.call_args_list[0][0][0].json() diff --git a/tests/peering/test_peers.py b/tests/peering/test_peers.py index 52110360..4694f8a5 100644 --- a/tests/peering/test_peers.py +++ b/tests/peering/test_peers.py @@ -1,105 +1,82 @@ import datetime import freezegun -import pytest -from kopf.engines.peering import CLUSTER_PEERING_RESOURCE, NAMESPACED_PEERING_RESOURCE, Peer +from kopf.engines.peering import Peer @freezegun.freeze_time('2020-12-31T23:59:59.123456') def test_defaults(): - peer = Peer(id='id', name='name') - assert peer.id == 'id' - assert peer.name == 'name' - assert peer.namespace is None + peer = Peer(identity='id') + assert peer.identity == 'id' assert peer.lifetime == datetime.timedelta(seconds=60) assert peer.lastseen == datetime.datetime(2020, 12, 31, 23, 59, 59, 123456) @freezegun.freeze_time('2020-12-31T23:59:59.123456') def test_repr(): - peer = Peer(id='some-id', name='some-name', namespace='some-namespace') + peer = Peer(identity='some-id') text = repr(peer) assert text.startswith('Peer(') assert text.endswith(')') - assert '(some-id, ' in text + assert 'identity=some-id' in text assert 'priority=0' in text assert 'lastseen=' in text assert 'lifetime=' in text - # The peering object's name is of no interest, the peer's id is. - assert 'name=' not in text - - # The namespace of the operator can affect the conflict detection. - # It is not always the same as the peering object's namespace. 
- assert 'namespace=some-namespace' in text - @freezegun.freeze_time('2020-12-31T23:59:59.123456') def test_priority_specified(): - peer = Peer(id='id', name='name', priority=123) + peer = Peer(identity='id', priority=123) assert peer.priority == 123 @freezegun.freeze_time('2020-12-31T23:59:59.123456') def test_priority_unspecified(): - peer = Peer(id='id', name='name') + peer = Peer(identity='id') assert peer.priority == 0 -def test_resource_for_cluster_peering(): - peer = Peer(id='id', name='name', namespace=None) - assert peer.resource == CLUSTER_PEERING_RESOURCE - assert peer.namespace is None - - -def test_resource_for_namespaced_peering(): - peer = Peer(id='id', name='name', namespace='namespaced') - assert peer.resource == NAMESPACED_PEERING_RESOURCE - assert peer.namespace == 'namespaced' - - @freezegun.freeze_time('2020-12-31T23:59:59.123456') def test_creation_with_lifetime_as_timedelta(): - peer = Peer(id='id', name='name', lifetime=datetime.timedelta(seconds=123)) + peer = Peer(identity='id', lifetime=datetime.timedelta(seconds=123)) assert peer.lifetime == datetime.timedelta(seconds=123) @freezegun.freeze_time('2020-12-31T23:59:59.123456') def test_creation_with_lifetime_as_number(): - peer = Peer(id='id', name='name', lifetime=123) + peer = Peer(identity='id', lifetime=123) assert peer.lifetime == datetime.timedelta(seconds=123) @freezegun.freeze_time('2020-12-31T23:59:59.123456') def test_creation_with_lifetime_unspecified(): - peer = Peer(id='id', name='name') + peer = Peer(identity='id') assert peer.lifetime == datetime.timedelta(seconds=60) @freezegun.freeze_time('2020-12-31T23:59:59.123456') def test_creation_with_lastseen_as_datetime(): - peer = Peer(id='id', name='name', lastseen=datetime.datetime(2020, 1, 1, 12, 34, 56, 789123)) + peer = Peer(identity='id', lastseen=datetime.datetime(2020, 1, 1, 12, 34, 56, 789123)) assert peer.lastseen == datetime.datetime(2020, 1, 1, 12, 34, 56, 789123) @freezegun.freeze_time('2020-12-31T23:59:59.123456') 
def test_creation_with_lastseen_as_string(): - peer = Peer(id='id', name='name', lastseen='2020-01-01T12:34:56.789123') + peer = Peer(identity='id', lastseen='2020-01-01T12:34:56.789123') assert peer.lastseen == datetime.datetime(2020, 1, 1, 12, 34, 56, 789123) @freezegun.freeze_time('2020-12-31T23:59:59.123456') def test_creation_with_lastseen_unspecified(): - peer = Peer(id='id', name='name') + peer = Peer(identity='id') assert peer.lastseen == datetime.datetime(2020, 12, 31, 23, 59, 59, 123456) @freezegun.freeze_time('2020-12-31T23:59:59.123456') def test_creation_as_alive(): peer = Peer( - id='id', - name='name', + identity='id', lifetime=10, lastseen='2020-12-31T23:59:50.123456', # less than 10 seconds before "now" ) @@ -112,8 +89,7 @@ def test_creation_as_alive(): @freezegun.freeze_time('2020-12-31T23:59:59.123456') def test_creation_as_dead(): peer = Peer( - id='id', - name='name', + identity='id', lifetime=10, lastseen='2020-12-31T23:59:49.123456', # 10 seconds before "now" sharp ) @@ -121,33 +97,3 @@ def test_creation_as_dead(): assert peer.lastseen == datetime.datetime(2020, 12, 31, 23, 59, 49, 123456) assert peer.deadline == datetime.datetime(2020, 12, 31, 23, 59, 59, 123456) assert peer.is_dead is True - - -def test_touching_when_alive(): - with freezegun.freeze_time('2020-01-01T10:20:30'): - peer = Peer(id='id1', name='name1', lifetime=123) - - assert not peer.is_dead - - with freezegun.freeze_time('2020-02-02T11:22:33'): - peer.touch() - - assert peer.lifetime == datetime.timedelta(seconds=123) - assert peer.lastseen == datetime.datetime(2020, 2, 2, 11, 22, 33) - assert peer.deadline == datetime.datetime(2020, 2, 2, 11, 24, 36) - assert not peer.is_dead - - -def test_touching_when_dead(): - with freezegun.freeze_time('2020-01-01T10:20:30'): - peer = Peer(id='id1', name='name1', lifetime=123, lastseen='2019-01-01T00:00:00') - - assert peer.is_dead - - with freezegun.freeze_time('2020-02-02T11:22:33'): - peer.touch() - - assert peer.lifetime == 
datetime.timedelta(seconds=123)
-    assert peer.lastseen == datetime.datetime(2020, 2, 2, 11, 22, 33)
-    assert peer.deadline == datetime.datetime(2020, 2, 2, 11, 24, 36)
-    assert not peer.is_dead
diff --git a/tests/peering/test_resource_guessing.py b/tests/peering/test_resource_guessing.py
new file mode 100644
index 00000000..113c14fb
--- /dev/null
+++ b/tests/peering/test_resource_guessing.py
@@ -0,0 +1,13 @@
+import pytest
+
+from kopf.engines.peering import CLUSTER_PEERING_RESOURCE, \
+    NAMESPACED_PEERING_RESOURCE, guess_resource
+
+
+@pytest.mark.parametrize('namespace, expected_resource', [
+    (None, CLUSTER_PEERING_RESOURCE),
+    ('ns', NAMESPACED_PEERING_RESOURCE),
+])
+def test_resource(namespace, expected_resource):
+    resource = guess_resource(namespace=namespace)
+    assert resource == expected_resource
diff --git a/tests/settings/test_defaults.py b/tests/settings/test_defaults.py
index f2110c9f..2583ea7f 100644
--- a/tests/settings/test_defaults.py
+++ b/tests/settings/test_defaults.py
@@ -6,6 +6,11 @@ async def test_declared_public_interface_and_promised_defaults():
     settings = kopf.OperatorSettings()
     assert settings.posting.level == logging.INFO
+    assert settings.peering.name == "default"
+    assert settings.peering.priority == 0
+    assert settings.peering.lifetime == 60
+    assert settings.peering.mandatory == False
+    assert settings.peering.standalone == False
     assert settings.watching.reconnect_backoff == 0.1
     assert settings.watching.connect_timeout is None
     assert settings.watching.server_timeout is None

From 9ee4db9cda35b985dd4982d88f0fa1d8eedea498 Mon Sep 17 00:00:00 2001
From: Sergey Vasilyev
Date: Sat, 31 Oct 2020 16:42:53 +0100
Subject: [PATCH 3/5] Make the peering CLI options defaultless

With the default values, they override the default values defined
in the settings object. This is not a big deal as long as they match,
but it can become an issue if the configuration defaults are changed later.
It is better to have one and only one source of truth for the default values. A downside: the defaults will not be seen in `--help`. But these values can be misleading anyway, as the operator can override the values in the startup handlers. --- kopf/cli.py | 14 +++++++------- kopf/reactor/running.py | 12 ++++++------ tests/cli/test_options.py | 4 ++-- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/kopf/cli.py b/kopf/cli.py index ab0313c6..a20bf877 100644 --- a/kopf/cli.py +++ b/kopf/cli.py @@ -64,11 +64,11 @@ def main() -> None: @main.command() @logging_options @click.option('-n', '--namespace', default=None) -@click.option('--standalone', is_flag=True, default=False) +@click.option('--standalone', is_flag=True, default=None) @click.option('--dev', 'priority', type=int, is_flag=True, flag_value=666) @click.option('-L', '--liveness', 'liveness_endpoint', type=str) -@click.option('-P', '--peering', 'peering_name', type=str, default=None, envvar='KOPF_RUN_PEERING') -@click.option('-p', '--priority', type=int, default=0) +@click.option('-P', '--peering', 'peering_name', type=str, envvar='KOPF_RUN_PEERING') +@click.option('-p', '--priority', type=int) @click.option('-m', '--module', 'modules', multiple=True) @click.argument('paths', nargs=-1) @click.make_pass_decorator(CLIControls, ensure=True) @@ -77,8 +77,8 @@ def run( paths: List[str], modules: List[str], peering_name: Optional[str], - priority: int, - standalone: bool, + priority: Optional[int], + standalone: Optional[bool], namespace: Optional[str], liveness_endpoint: Optional[str], ) -> None: @@ -108,7 +108,7 @@ def run( @click.option('-n', '--namespace', default=None) @click.option('-i', '--id', type=str, default=None) @click.option('--dev', 'priority', flag_value=666) -@click.option('-P', '--peering', 'peering_name', type=str, required=True, envvar='KOPF_FREEZE_PEERING') +@click.option('-P', '--peering', 'peering_name', required=True, envvar='KOPF_FREEZE_PEERING') @click.option('-p', 
'--priority', type=int, default=100, required=True) @click.option('-t', '--lifetime', type=int, required=True) @click.option('-m', '--message', type=str) @@ -144,7 +144,7 @@ def freeze( @logging_options @click.option('-n', '--namespace', default=None) @click.option('-i', '--id', type=str, default=None) -@click.option('-P', '--peering', 'peering_name', type=str, required=True, envvar='KOPF_RESUME_PEERING') +@click.option('-P', '--peering', 'peering_name', required=True, envvar='KOPF_RESUME_PEERING') def resume( id: Optional[str], namespace: Optional[str], diff --git a/kopf/reactor/running.py b/kopf/reactor/running.py index a25763f7..b866964e 100644 --- a/kopf/reactor/running.py +++ b/kopf/reactor/running.py @@ -76,8 +76,8 @@ def run( registry: Optional[registries.OperatorRegistry] = None, settings: Optional[configuration.OperatorSettings] = None, memories: Optional[containers.ResourceMemories] = None, - standalone: bool = False, - priority: int = 0, + standalone: Optional[bool] = None, + priority: Optional[int] = None, peering_name: Optional[str] = None, liveness_endpoint: Optional[str] = None, namespace: Optional[str] = None, @@ -116,8 +116,8 @@ async def operator( registry: Optional[registries.OperatorRegistry] = None, settings: Optional[configuration.OperatorSettings] = None, memories: Optional[containers.ResourceMemories] = None, - standalone: bool = False, - priority: int = 0, + standalone: Optional[bool] = None, + priority: Optional[int] = None, peering_name: Optional[str] = None, liveness_endpoint: Optional[str] = None, namespace: Optional[str] = None, @@ -157,8 +157,8 @@ async def spawn_tasks( registry: Optional[registries.OperatorRegistry] = None, settings: Optional[configuration.OperatorSettings] = None, memories: Optional[containers.ResourceMemories] = None, - standalone: bool = False, - priority: int = 0, + standalone: Optional[bool] = None, + priority: Optional[int] = None, peering_name: Optional[str] = None, liveness_endpoint: Optional[str] = None, 
namespace: Optional[str] = None, diff --git a/tests/cli/test_options.py b/tests/cli/test_options.py index 3c1a4787..6d9bc411 100644 --- a/tests/cli/test_options.py +++ b/tests/cli/test_options.py @@ -22,7 +22,7 @@ def test_options_passed_to_preload(invoke, options, envvars, kwarg, value, prelo @pytest.mark.parametrize('kwarg, value, options, envvars', [ - ('standalone', False, [], {}), + ('standalone', None, [], {}), ('standalone', True, ['--standalone'], {}), ('standalone', True, [], {'KOPF_RUN_STANDALONE': 'true'}), @@ -36,7 +36,7 @@ def test_options_passed_to_preload(invoke, options, envvars, kwarg, value, prelo ('peering_name', 'peer', ['--peering=peer'], {}), ('peering_name', 'peer', [], {'KOPF_RUN_PEERING': 'peer'}), - ('priority', 0, [], {}), + ('priority', None, [], {}), ('priority', 123, ['-p', '123'], {}), ('priority', 123, ['--priority=123'], {}), ('priority', 123, [], {'KOPF_RUN_PRIORITY': '123'}), From f694793a322065e592f6f0209eec21135785cfac Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Tue, 27 Oct 2020 19:18:07 +0100 Subject: [PATCH 4/5] Hide logged keep-alives in optional stealth mode --- docs/peering.rst | 24 ++++++++++ kopf/engines/peering.py | 7 +-- kopf/structs/configuration.py | 14 ++++++ tests/peering/test_peer_patching.py | 71 +++++++++++++++++++++++++++++ tests/settings/test_defaults.py | 1 + 5 files changed, 114 insertions(+), 3 deletions(-) diff --git a/docs/peering.rst b/docs/peering.rst index 521c8aae..7e589df7 100644 --- a/docs/peering.rst +++ b/docs/peering.rst @@ -185,3 +185,27 @@ With high probability, 2-3 pods will get their unique priorities. You can also use the pod's IP address in its numeric form as the priority, or any other source of integers. + + +Stealth keep-alives +=================== + +Every few seconds (60 by default), the operator will send a keep-alive update +to the chosen peering, showing that it is still functioning. Other operators +will notice that and make decisions on their freezing or resuming. 
+
+The operator also logs its keep-alive activity to its own logs. This can be
+distracting. To disable:
+
+.. code-block:: python
+
+    import kopf
+
+
+    @kopf.on.startup()
+    def configure(settings: kopf.OperatorSettings, **_):
+        settings.peering.stealth = True
+
+There is no equivalent CLI option for that.
+
+Please note that it only affects logging. The keep-alives are sent anyway.
diff --git a/kopf/engines/peering.py b/kopf/engines/peering.py
index 4c5d4a5f..98d680dd 100644
--- a/kopf/engines/peering.py
+++ b/kopf/engines/peering.py
@@ -196,9 +196,10 @@ async def touch(
     patch.update({'status': {identity: None if peer.is_dead else peer.as_dict()}})
     rsp = await patching.patch_obj(resource=resource, namespace=namespace, name=name, patch=patch)
 
-    where = f'in {namespace!r}' if namespace else 'cluster-wide'
-    result = "not found" if rsp is None else "ok"
-    logger.debug(f"Keep-alive in {name!r} {where}: {result}.")
+    if not settings.peering.stealth or rsp is None:
+        where = f"in {namespace!r}" if namespace else "cluster-wide"
+        result = "not found" if rsp is None else "ok"
+        logger.debug(f"Keep-alive in {name!r} {where}: {result}.")
 
 
 async def clean(
diff --git a/kopf/structs/configuration.py b/kopf/structs/configuration.py
index 7f933079..21cd243f 100644
--- a/kopf/structs/configuration.py
+++ b/kopf/structs/configuration.py
@@ -91,6 +91,20 @@ class PeeringSettings:
     i.e. operators in one of them are not visible to operators in others.
     """
 
+    stealth: bool = False
+    """
+    Should this operator hide its keep-alive updates from the logs?
+
+    In some cases, it might be undesired to log regular keep-alives while
+    they actually happen (to keep the logs clean and readable).
+
+    Note that some events are logged unconditionally:
+
+    * those affecting the operator's behaviour, such as freezes/resumes;
+    * those requiring human intervention, such as absence of a peering object
+ """ + priority: int = 0 """ The operator's priority to use. The operators with lower priority freeze diff --git a/tests/peering/test_peer_patching.py b/tests/peering/test_peer_patching.py index bee57f61..eb7ac019 100644 --- a/tests/peering/test_peer_patching.py +++ b/tests/peering/test_peer_patching.py @@ -77,3 +77,74 @@ async def test_expiring_a_peer_purges_it( patch = await patch_mock.call_args_list[0][0][0].json() assert set(patch['status']) == {'id1'} assert patch['status']['id1'] is None + + +@pytest.mark.usefixtures('with_both_crds') +@pytest.mark.parametrize('namespace, peering_resource', [ + pytest.param('ns', NAMESPACED_PEERING_RESOURCE, id='namespace-scoped'), + pytest.param(None, CLUSTER_PEERING_RESOURCE, id='cluster-scoped'), +]) +@freezegun.freeze_time('2020-12-31T23:59:59.123456') +async def test_logs_are_skipped_in_stealth_mode( + hostname, aresponses, resp_mocker, namespace, peering_resource, settings, + assert_logs, caplog): + + caplog.set_level(0) + settings.peering.stealth = True + settings.peering.name = 'name0' + patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) + url = peering_resource.get_url(name='name0', namespace=namespace) + aresponses.add(hostname, url, 'patch', patch_mock) + + await touch(identity='id1', settings=settings, namespace=namespace) + + assert_logs([], prohibited=[ + "Keep-alive in", + ]) + + +@pytest.mark.usefixtures('with_both_crds') +@pytest.mark.parametrize('namespace, peering_resource', [ + pytest.param('ns', NAMESPACED_PEERING_RESOURCE, id='namespace-scoped'), + pytest.param(None, CLUSTER_PEERING_RESOURCE, id='cluster-scoped'), +]) +async def test_logs_are_logged_in_exposed_mode( + hostname, aresponses, resp_mocker, namespace, peering_resource, settings, + assert_logs, caplog): + + caplog.set_level(0) + settings.peering.stealth = False + settings.peering.name = 'name0' + patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) + url = peering_resource.get_url(name='name0', 
namespace=namespace) + aresponses.add(hostname, url, 'patch', patch_mock) + + await touch(identity='id1', settings=settings, namespace=namespace) + + assert_logs([ + r"Keep-alive in 'name0' (in 'ns'|cluster-wide): ok", + ]) + + +@pytest.mark.usefixtures('with_both_crds') +@pytest.mark.parametrize('stealth', [True, False], ids=['stealth', 'exposed']) +@pytest.mark.parametrize('namespace, peering_resource', [ + pytest.param('ns', NAMESPACED_PEERING_RESOURCE, id='namespace-scoped'), + pytest.param(None, CLUSTER_PEERING_RESOURCE, id='cluster-scoped'), +]) +async def test_logs_are_logged_when_absent( + hostname, aresponses, resp_mocker, namespace, peering_resource, stealth, settings, + assert_logs, caplog): + + caplog.set_level(0) + settings.peering.stealth = stealth + settings.peering.name = 'name0' + patch_mock = resp_mocker(return_value=aresponses.Response(status=404)) + url = peering_resource.get_url(name='name0', namespace=namespace) + aresponses.add(hostname, url, 'patch', patch_mock) + + await touch(identity='id1', settings=settings, namespace=namespace) + + assert_logs([ + r"Keep-alive in 'name0' (in 'ns'|cluster-wide): not found", + ]) diff --git a/tests/settings/test_defaults.py b/tests/settings/test_defaults.py index 2583ea7f..47e34ca2 100644 --- a/tests/settings/test_defaults.py +++ b/tests/settings/test_defaults.py @@ -7,6 +7,7 @@ async def test_declared_public_interface_and_promised_defaults(): settings = kopf.OperatorSettings() assert settings.posting.level == logging.INFO assert settings.peering.name == "default" + assert settings.peering.stealth == False assert settings.peering.priority == 0 assert settings.peering.lifetime == 60 assert settings.peering.mandatory == False From abed46223ec54ad50c66a10edb0f9b4e3b526f5d Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Sat, 31 Oct 2020 16:46:26 +0100 Subject: [PATCH 5/5] Fix a grammar mistake in the default peering log message --- docs/walkthrough/starting.rst | 2 +- examples/12-embedded/README.md | 2 
+- kopf/engines/peering.py | 2 +- tests/peering/test_peer_detection.py | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/walkthrough/starting.rst b/docs/walkthrough/starting.rst index 277fae7c..19f23993 100644 --- a/docs/walkthrough/starting.rst +++ b/docs/walkthrough/starting.rst @@ -38,7 +38,7 @@ The output looks like this: .. code-block:: none [2019-05-31 10:42:11,870] kopf.config [DEBUG ] configured via kubeconfig file - [2019-05-31 10:42:11,913] kopf.reactor.peering [WARNING ] Default peering object not found, falling back to the standalone mode. + [2019-05-31 10:42:11,913] kopf.reactor.peering [WARNING ] Default peering object is not found, falling back to the standalone mode. [2019-05-31 10:42:12,037] kopf.reactor.handlin [DEBUG ] [default/my-claim] First appearance: {'apiVersion': 'zalando.org/v1', 'kind': 'EphemeralVolumeClaim', 'metadata': {'annotations': {'kubectl.kubernetes.io/last-applied-configuration': '{"apiVersion":"zalando.org/v1","kind":"EphemeralVolumeClaim","metadata":{"annotations":{},"name":"my-claim","namespace":"default"}}\n'}, 'creationTimestamp': '2019-05-29T00:41:57Z', 'generation': 1, 'name': 'my-claim', 'namespace': 'default', 'resourceVersion': '47720', 'selfLink': '/apis/zalando.org/v1/namespaces/default/ephemeralvolumeclaims/my-claim', 'uid': '904c2b9b-81aa-11e9-a202-a6e6b278a294'}} [2019-05-31 10:42:12,038] kopf.reactor.handlin [DEBUG ] [default/my-claim] Adding the finalizer, thus preventing the actual deletion. [2019-05-31 10:42:12,038] kopf.reactor.handlin [DEBUG ] [default/my-claim] Patching with: {'metadata': {'finalizers': ['KopfFinalizerMarker']}} diff --git a/examples/12-embedded/README.md b/examples/12-embedded/README.md index 15471f88..8b097dd0 100644 --- a/examples/12-embedded/README.md +++ b/examples/12-embedded/README.md @@ -21,7 +21,7 @@ Starting the main app. [DEBUG ] Pykube is configured via kubeconfig file. [DEBUG ] Client is configured via kubeconfig file. 
-[WARNING ] Default peering object not found, falling back to the standalone mode. +[WARNING ] Default peering object is not found, falling back to the standalone mode. [WARNING ] OS signals are ignored: running not in the main thread. Do the main app activity here. Step 1/3. diff --git a/kopf/engines/peering.py b/kopf/engines/peering.py index 98d680dd..46c942ea 100644 --- a/kopf/engines/peering.py +++ b/kopf/engines/peering.py @@ -231,7 +231,7 @@ async def detect_presence( if settings.peering.mandatory and obj is None: raise Exception(f"The mandatory peering {name!r} was not found.") elif obj is None: - logger.warning(f"Default peering object not found, falling back to the standalone mode.") + logger.warning(f"Default peering object is not found, falling back to the standalone mode.") return False else: return True diff --git a/tests/peering/test_peer_detection.py b/tests/peering/test_peer_detection.py index 5555c40b..0a3a4d21 100644 --- a/tests/peering/test_peer_detection.py +++ b/tests/peering/test_peer_detection.py @@ -78,7 +78,7 @@ async def test_fallback_with_cluster_scoped(hostname, aresponses, assert_logs, c peering = await detect_presence(settings=settings, namespace=None) assert peering is False assert_logs([ - "Default peering object not found, falling back to the standalone mode." + "Default peering object is not found, falling back to the standalone mode." ]) @@ -90,5 +90,5 @@ async def test_fallback_with_namespace_scoped(hostname, aresponses, assert_logs, peering = await detect_presence(settings=settings, namespace='namespace') assert peering is False assert_logs([ - "Default peering object not found, falling back to the standalone mode." + "Default peering object is not found, falling back to the standalone mode." ])