From 72e81bdcb9aa6e69a1fe093b4ef38b483b2d0a98 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Mar 2026 00:56:21 +0000 Subject: [PATCH 1/3] Initial plan From 5ff0982b9a528b4b7d6a36642b4cd9ddd213e7e0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Mar 2026 01:09:32 +0000 Subject: [PATCH 2/3] Implement list-then-watch pattern for resilient watch restarts (issue: stale resourceVersion after 410) Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com> --- kubernetes/base/watch/watch.py | 40 ++++++++++++++ kubernetes/base/watch/watch_test.py | 85 +++++++++++++++++++++++++++-- 2 files changed, 119 insertions(+), 6 deletions(-) diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index 40e5e75bf..a2262584d 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -187,6 +187,46 @@ def stream(self, func, *args, **kwargs): disable_retries = ('timeout_seconds' in kwargs) retry_after_410 = False deserialize = kwargs.pop('deserialize', True) + + # If no resource_version is specified for a watch (not a log follow), + # perform an initial list call to get the current resourceVersion from + # the list metadata. This ensures that any subsequent watch restart + # after a 410 uses a valid, recent resourceVersion rather than a + # potentially stale one from an individual resource event. + if watch_arg == 'watch' and self.resource_version is None: + list_kwargs = {k: v for k, v in kwargs.items() + if k not in (watch_arg, '_preload_content', + 'allow_watch_bookmarks', + 'timeout_seconds')} + initial_list = func(*args, **list_kwargs) + if (hasattr(initial_list, 'metadata') + and hasattr(initial_list.metadata, 'resource_version') + and isinstance( + initial_list.metadata.resource_version, str) + and initial_list.metadata.resource_version): + self.resource_version = \ + initial_list.metadata.resource_version + kwargs['resource_version'] = self.resource_version + if (hasattr(initial_list, 'items') + and isinstance(initial_list.items, list)): + for item in initial_list.items: + raw_obj = \ + self._api_client.sanitize_for_serialization(item) + if deserialize: + yield { + 'type': 'ADDED', + 'object': item, + 'raw_object': raw_obj, + } + else: + yield { + 'type': 'ADDED', + 'object': raw_obj, + 'raw_object': raw_obj, + } + if self._stop: + return + while True: resp = func(*args, **kwargs) try: diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index d872020b4..721709949 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -15,6 +15,7 @@ import os import time import unittest +from types import SimpleNamespace from unittest.mock import Mock, call from kubernetes import client, config @@ -66,7 +67,10 @@ def test_watch_with_decode(self): # make sure that all three records were consumed by the stream self.assertEqual(4, count) - fake_api.get_namespaces.assert_called_once_with( + # The function is called twice: first for the initial list (no watch + # kwargs), then for the actual watch (with resource_version from list). + self.assertEqual(fake_api.get_namespaces.call_count, 2) + fake_api.get_namespaces.assert_called_with( _preload_content=False, watch=True) fake_resp.stream.assert_called_once_with( amt=None, decode_content=False) @@ -292,6 +296,67 @@ def get_values(*args, **kwargs): # more strict test with worse error message self.assertEqual(fake_api.get_namespaces.mock_calls, calls) + def test_watch_with_initial_list_resource_version(self): + """Verify the list-then-watch pattern. + + When stream() is called without a resource_version, it should: + 1. Perform an initial list call to get the current resourceVersion. + 2. Yield items from that list as ADDED events. + 3. Start the watch from the list's resourceVersion so that + subsequent restarts after a 410 use a valid, recent version. + """ + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + # The watch stream returns one new event after the initial list. + fake_resp.stream = Mock( + return_value=[ + '{"type": "MODIFIED", "object": {"metadata": ' + '{"name": "ns-new", "resourceVersion": "200"}, ' + '"spec": {}, "status": {}}}\n', + ]) + + # Build a real-ish list response with two existing namespaces. + ns1 = client.V1Namespace( + metadata=client.V1ObjectMeta( + name="ns-one", resource_version="100")) + ns2 = client.V1Namespace( + metadata=client.V1ObjectMeta( + name="ns-two", resource_version="150")) + fake_list = client.V1NamespaceList( + metadata=client.V1ListMeta(resource_version="180"), + items=[ns1, ns2]) + + def _list_or_watch(*args, **kwargs): + return fake_resp if kwargs.get('watch') else fake_list + + fake_api = Mock() + fake_api.list_namespaces = Mock(side_effect=_list_or_watch) + fake_api.list_namespaces.__doc__ = ':return: V1NamespaceList' + + w = Watch() + events = [] + for e in w.stream(fake_api.list_namespaces, timeout_seconds=1): + events.append(e) + + # The two existing namespaces must appear first as ADDED events. + self.assertEqual(len(events), 3) + self.assertEqual(events[0]['type'], 'ADDED') + self.assertEqual(events[0]['object'].metadata.name, 'ns-one') + self.assertEqual(events[1]['type'], 'ADDED') + self.assertEqual(events[1]['object'].metadata.name, 'ns-two') + # The new event from the watch stream follows. + self.assertEqual(events[2]['type'], 'MODIFIED') + self.assertEqual(events[2]['object'].metadata.name, 'ns-new') + + # The watch must have started from the list's resourceVersion. + fake_api.list_namespaces.assert_has_calls([ + call(), + call(_preload_content=False, watch=True, + timeout_seconds=1, resource_version="180"), + ]) + self.assertEqual(w.resource_version, "200") + def test_watch_stream_twice(self): w = Watch(float) for step in ['first', 'second']: @@ -312,7 +377,10 @@ def test_watch_stream_twice(self): w.stop() self.assertEqual(count, 3) - fake_api.get_namespaces.assert_called_once_with( + # The function is called twice per stream() invocation: once for + # the initial list call and once for the actual watch call. + self.assertEqual(fake_api.get_namespaces.call_count, 2) + fake_api.get_namespaces.assert_called_with( _preload_content=False, watch=True) fake_resp.stream.assert_called_once_with( amt=None, decode_content=False) @@ -346,7 +414,9 @@ def test_watch_stream_loop(self): w.stop() self.assertEqual(count, 2) - self.assertEqual(fake_api.get_namespaces.call_count, 2) + # Each stream() call makes 2 API calls: initial list + watch. + # Two stream() calls = 4 total API calls. + self.assertEqual(fake_api.get_namespaces.call_count, 4) self.assertEqual(fake_resp.stream.call_count, 2) self.assertEqual(fake_resp.close.call_count, 2) self.assertEqual(fake_resp.release_conn.call_count, 2) @@ -423,8 +493,9 @@ def test_watch_with_exception(self): pass # expected - fake_api.get_thing.assert_called_once_with( + fake_api.get_thing.assert_called_with( _preload_content=False, watch=True) + self.assertEqual(fake_api.get_thing.call_count, 2) fake_resp.stream.assert_called_once_with( amt=None, decode_content=False) fake_resp.close.assert_called_once() @@ -447,8 +518,9 @@ def test_watch_with_error_event(self): # No retry is attempted either, preventing an ApiException assert not list(w.stream(fake_api.get_thing)) - fake_api.get_thing.assert_called_once_with( + fake_api.get_thing.assert_called_with( _preload_content=False, watch=True) + self.assertEqual(fake_api.get_thing.call_count, 2) fake_resp.stream.assert_called_once_with( amt=None, decode_content=False) fake_resp.close.assert_called_once() @@ -500,8 +572,9 @@ def test_watch_with_error_event_and_timeout_param(self): except client.rest.ApiException: pass - fake_api.get_thing.assert_called_once_with( + fake_api.get_thing.assert_called_with( _preload_content=False, watch=True, timeout_seconds=10) + self.assertEqual(fake_api.get_thing.call_count, 2) fake_resp.stream.assert_called_once_with( amt=None, decode_content=False) fake_resp.close.assert_called_once() From 71b0b1e21f226b5bfa36cbead3bdf6f55e58d870 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Mar 2026 01:10:39 +0000 Subject: [PATCH 3/3] Address code review: remove unused SimpleNamespace import, extract excluded_params set Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com> --- kubernetes/base/watch/watch.py | 9 ++++----- kubernetes/base/watch/watch_test.py | 1 - 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index a2262584d..3ebb1e643 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -194,18 +194,17 @@ def stream(self, func, *args, **kwargs): # after a 410 uses a valid, recent resourceVersion rather than a # potentially stale one from an individual resource event. if watch_arg == 'watch' and self.resource_version is None: + _list_excluded = {watch_arg, '_preload_content', + 'allow_watch_bookmarks', 'timeout_seconds'} list_kwargs = {k: v for k, v in kwargs.items() - if k not in (watch_arg, '_preload_content', - 'allow_watch_bookmarks', - 'timeout_seconds')} + if k not in _list_excluded} initial_list = func(*args, **list_kwargs) if (hasattr(initial_list, 'metadata') and hasattr(initial_list.metadata, 'resource_version') and isinstance( initial_list.metadata.resource_version, str) and initial_list.metadata.resource_version): - self.resource_version = \ - initial_list.metadata.resource_version + self.resource_version = initial_list.metadata.resource_version kwargs['resource_version'] = self.resource_version if (hasattr(initial_list, 'items') and isinstance(initial_list.items, list)): diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index 721709949..2a6c20d75 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -15,7 +15,6 @@ import os import time import unittest -from types import SimpleNamespace from unittest.mock import Mock, call from kubernetes import client, config