diff --git a/watch/watch.py b/watch/watch.py index fb4c1abf..bdf24f1a 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -124,6 +124,8 @@ def stream(self, func, *args, **kwargs): return_type = self.get_return_type(func) kwargs['watch'] = True kwargs['_preload_content'] = False + if 'resource_version' in kwargs: + self.resource_version = kwargs['resource_version'] timeouts = ('timeout_seconds' in kwargs) while True: diff --git a/watch/watch_test.py b/watch/watch_test.py index f2804f4a..08eb36c2 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -16,12 +16,15 @@ import unittest -from mock import Mock +from mock import Mock, call from .watch import Watch class WatchTests(unittest.TestCase): + def setUp(self): + # counter for a test that needs test global state + self.callcount = 0 def test_watch_with_decode(self): fake_resp = Mock() @@ -64,6 +67,74 @@ def test_watch_with_decode(self): fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() + def test_watch_resource_version_set(self): + # https://github.com/kubernetes-client/python/issues/700 + # ensure watching from a resource version does reset to resource + # version 0 after k8s resets the watch connection + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + values = [ + '{"type": "ADDED", "object": {"metadata": {"name": "test1",' + '"resourceVersion": "1"}, "spec": {}, "status": {}}}\n', + '{"type": "ADDED", "object": {"metadata": {"name": "test2",' + '"resourceVersion": "2"}, "spec": {}, "sta', + 'tus": {}}}\n' + '{"type": "ADDED", "object": {"metadata": {"name": "test3",' + '"resourceVersion": "3"}, "spec": {}, "status": {}}}\n' + ] + # return nothing on the first call and values on the second + # this emulates a watch from a rv that returns nothing in the first k8s + # watch reset and values later + + def get_values(*args, **kwargs): + self.callcount += 1 + if self.callcount == 1: + return [] + else: + return values + + fake_resp.read_chunked = Mock( + side_effect=get_values) + + fake_api = Mock() + fake_api.get_namespaces = Mock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + w = Watch() + # ensure we keep our requested resource version or the version latest + # returned version when the existing versions are older than the + # requested version + # needed for the list existing objects, then watch from there use case + calls = [] + + iterations = 2 + # first two calls must use the passed rv, the first call is a + # "reset" and does not actually return anything + # the second call must use the same rv but will return values + # (with a wrong rv but a real cluster would behave correctly) + # calls following that will use the rv from those returned values + calls.append(call(_preload_content=False, watch=True, + resource_version="5")) + calls.append(call(_preload_content=False, watch=True, + resource_version="5")) + for i in range(iterations): + # ideally we want 5 here but as rv must be treated as an + # opaque value we cannot interpret it and order it so rely + # on k8s returning the events completely and in order + calls.append(call(_preload_content=False, watch=True, + resource_version="3")) + + for c, e in enumerate(w.stream(fake_api.get_namespaces, + resource_version="5")): + if c == len(values) * iterations: + w.stop() + + # check calls are in the list, gives good error output + fake_api.get_namespaces.assert_has_calls(calls) + # more strict test with worse error message + self.assertEqual(fake_api.get_namespaces.mock_calls, calls) + def test_watch_stream_twice(self): w = Watch(float) for step in ['first', 'second']: