Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions kubernetes/base/watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,45 @@ def stream(self, func, *args, **kwargs):
disable_retries = ('timeout_seconds' in kwargs)
retry_after_410 = False
deserialize = kwargs.pop('deserialize', True)

# If no resource_version is specified for a watch (not a log follow),
# perform an initial list call to get the current resourceVersion from
# the list metadata. This ensures that any subsequent watch restart
# after a 410 uses a valid, recent resourceVersion rather than a
# potentially stale one from an individual resource event.
if watch_arg == 'watch' and self.resource_version is None:
_list_excluded = {watch_arg, '_preload_content',
'allow_watch_bookmarks', 'timeout_seconds'}
list_kwargs = {k: v for k, v in kwargs.items()
if k not in _list_excluded}
initial_list = func(*args, **list_kwargs)
if (hasattr(initial_list, 'metadata')
and hasattr(initial_list.metadata, 'resource_version')
and isinstance(
initial_list.metadata.resource_version, str)
and initial_list.metadata.resource_version):
self.resource_version = initial_list.metadata.resource_version
kwargs['resource_version'] = self.resource_version
if (hasattr(initial_list, 'items')
and isinstance(initial_list.items, list)):
for item in initial_list.items:
raw_obj = \
self._api_client.sanitize_for_serialization(item)
if deserialize:
yield {
'type': 'ADDED',
'object': item,
'raw_object': raw_obj,
}
else:
yield {
'type': 'ADDED',
'object': raw_obj,
'raw_object': raw_obj,
}
if self._stop:
return

while True:
resp = func(*args, **kwargs)
try:
Expand Down
84 changes: 78 additions & 6 deletions kubernetes/base/watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ def test_watch_with_decode(self):
# make sure that all three records were consumed by the stream
self.assertEqual(4, count)

fake_api.get_namespaces.assert_called_once_with(
# The function is called twice: first for the initial list (no watch
# kwargs), then for the actual watch (with resource_version from list).
self.assertEqual(fake_api.get_namespaces.call_count, 2)
fake_api.get_namespaces.assert_called_with(
_preload_content=False, watch=True)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
Expand Down Expand Up @@ -292,6 +295,67 @@ def get_values(*args, **kwargs):
# more strict test with worse error message
self.assertEqual(fake_api.get_namespaces.mock_calls, calls)

def test_watch_with_initial_list_resource_version(self):
"""Verify the list-then-watch pattern.

When stream() is called without a resource_version, it should:
1. Perform an initial list call to get the current resourceVersion.
2. Yield items from that list as ADDED events.
3. Start the watch from the list's resourceVersion so that
subsequent restarts after a 410 use a valid, recent version.
"""
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
# The watch stream returns one new event after the initial list.
fake_resp.stream = Mock(
return_value=[
'{"type": "MODIFIED", "object": {"metadata": '
'{"name": "ns-new", "resourceVersion": "200"}, '
'"spec": {}, "status": {}}}\n',
])

# Build a real-ish list response with two existing namespaces.
ns1 = client.V1Namespace(
metadata=client.V1ObjectMeta(
name="ns-one", resource_version="100"))
ns2 = client.V1Namespace(
metadata=client.V1ObjectMeta(
name="ns-two", resource_version="150"))
fake_list = client.V1NamespaceList(
metadata=client.V1ListMeta(resource_version="180"),
items=[ns1, ns2])

def _list_or_watch(*args, **kwargs):
return fake_resp if kwargs.get('watch') else fake_list

fake_api = Mock()
fake_api.list_namespaces = Mock(side_effect=_list_or_watch)
fake_api.list_namespaces.__doc__ = ':return: V1NamespaceList'

w = Watch()
events = []
for e in w.stream(fake_api.list_namespaces, timeout_seconds=1):
events.append(e)

# The two existing namespaces must appear first as ADDED events.
self.assertEqual(len(events), 3)
self.assertEqual(events[0]['type'], 'ADDED')
self.assertEqual(events[0]['object'].metadata.name, 'ns-one')
self.assertEqual(events[1]['type'], 'ADDED')
self.assertEqual(events[1]['object'].metadata.name, 'ns-two')
# The new event from the watch stream follows.
self.assertEqual(events[2]['type'], 'MODIFIED')
self.assertEqual(events[2]['object'].metadata.name, 'ns-new')

# The watch must have started from the list's resourceVersion.
fake_api.list_namespaces.assert_has_calls([
call(),
call(_preload_content=False, watch=True,
timeout_seconds=1, resource_version="180"),
])
self.assertEqual(w.resource_version, "200")

def test_watch_stream_twice(self):
w = Watch(float)
for step in ['first', 'second']:
Expand All @@ -312,7 +376,10 @@ def test_watch_stream_twice(self):
w.stop()

self.assertEqual(count, 3)
fake_api.get_namespaces.assert_called_once_with(
# The function is called twice per stream() invocation: once for
# the initial list call and once for the actual watch call.
self.assertEqual(fake_api.get_namespaces.call_count, 2)
fake_api.get_namespaces.assert_called_with(
_preload_content=False, watch=True)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
Expand Down Expand Up @@ -346,7 +413,9 @@ def test_watch_stream_loop(self):
w.stop()

self.assertEqual(count, 2)
self.assertEqual(fake_api.get_namespaces.call_count, 2)
# Each stream() call makes 2 API calls: initial list + watch.
# Two stream() calls = 4 total API calls.
self.assertEqual(fake_api.get_namespaces.call_count, 4)
self.assertEqual(fake_resp.stream.call_count, 2)
self.assertEqual(fake_resp.close.call_count, 2)
self.assertEqual(fake_resp.release_conn.call_count, 2)
Expand Down Expand Up @@ -423,8 +492,9 @@ def test_watch_with_exception(self):
pass
# expected

fake_api.get_thing.assert_called_once_with(
fake_api.get_thing.assert_called_with(
_preload_content=False, watch=True)
self.assertEqual(fake_api.get_thing.call_count, 2)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
Expand All @@ -447,8 +517,9 @@ def test_watch_with_error_event(self):
# No retry is attempted either, preventing an ApiException
assert not list(w.stream(fake_api.get_thing))

fake_api.get_thing.assert_called_once_with(
fake_api.get_thing.assert_called_with(
_preload_content=False, watch=True)
self.assertEqual(fake_api.get_thing.call_count, 2)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
Expand Down Expand Up @@ -500,8 +571,9 @@ def test_watch_with_error_event_and_timeout_param(self):
except client.rest.ApiException:
pass

fake_api.get_thing.assert_called_once_with(
fake_api.get_thing.assert_called_with(
_preload_content=False, watch=True, timeout_seconds=10)
self.assertEqual(fake_api.get_thing.call_count, 2)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
Expand Down