Skip to content

Commit 6e39732

Browse files
Address 6 code review comments: _fire docstring, resync fix, test assertions, e2e data checks
Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com>
1 parent 25d1f12 commit 6e39732

3 files changed

Lines changed: 80 additions & 21 deletions

File tree

kubernetes/e2e_test/test_informer.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,11 @@ def test_cache_populated_after_start(self):
8383
self.addCleanup(inf.stop)
8484

8585
self._wait_in_cache(inf, "default/" + name)
86-
self.assertEqual(_name_of(inf.cache.get_by_key("default/" + name)), name)
86+
cached = inf.cache.get_by_key("default/" + name)
87+
self.assertEqual(_name_of(cached), name)
88+
# Verify the cached object actually contains the expected data payload.
89+
data = cached.data if hasattr(cached, "data") else (cached.get("data") or {})
90+
self.assertEqual(data.get("k"), "v")
8791

8892
def test_added_event_and_cache_entry(self):
8993
"""Creating a ConfigMap fires ADDED and the object appears in the cache."""
@@ -127,7 +131,11 @@ def test_modified_event_and_cache_refresh(self):
127131
name=name, namespace="default", body={"data": {"k": "updated"}}
128132
)
129133
self._expect(seen, "MODIFIED/" + name)
130-
self.assertIsNotNone(inf.cache.get_by_key("default/" + name))
134+
# Verify that the cache now holds the updated data.
135+
cached = inf.cache.get_by_key("default/" + name)
136+
self.assertIsNotNone(cached)
137+
data = cached.data if hasattr(cached, "data") else (cached.get("data") or {})
138+
self.assertEqual(data.get("k"), "updated")
131139

132140
def test_deleted_event_removes_from_cache(self):
133141
"""Deleting a ConfigMap fires DELETED and removes it from the cache."""

kubernetes/informer/informer.py

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ def _build_kwargs(self):
173173
return kw
174174

175175
def _fire(self, event_type, obj):
176+
"""Execute all registered callbacks for *event_type*, passing *obj*.
177+
178+
Callbacks are invoked sequentially on the informer's background thread.
179+
Any exception raised by an individual handler is logged and swallowed so
180+
that remaining handlers still run.
181+
"""
176182
with self._handler_lock:
177183
handlers = list(self._handlers.get(event_type, []))
178184
for fn in handlers:
@@ -254,6 +260,13 @@ def _run_loop(self):
254260
self._watch = Watch()
255261
kw = self._build_kwargs()
256262
kw["resource_version"] = self._resource_version
263+
# When a resync period is configured, set a matching server-side
264+
# watch timeout so that the stream exits after resync_period seconds
265+
# even if no events arrive. Without this, a quiet period longer
266+
# than resync_period would never trigger a resync because the check
267+
# below only runs when the generator yields an event.
268+
if self._resync_period > 0:
269+
kw["timeout_seconds"] = max(1, int(self._resync_period))
257270
try:
258271
for event in self._watch.stream(self._list_func, **kw):
259272
if self._stop_event.is_set():
@@ -282,19 +295,6 @@ def _run_loop(self):
282295
self._fire(BOOKMARK, event.get("raw_object", obj))
283296
elif evt_type == ERROR:
284297
self._fire(ERROR, obj)
285-
# Periodic resync: full re-list from the API server, firing
286-
# ADDED/MODIFIED/DELETED for any changes since the last list.
287-
if (
288-
self._resync_period > 0
289-
and (time.monotonic() - last_resync) >= self._resync_period
290-
):
291-
logger.debug("Informer resync triggered")
292-
try:
293-
self._initial_list()
294-
except Exception as exc:
295-
logger.exception("Error during resync list; continuing")
296-
self._fire(ERROR, exc)
297-
last_resync = time.monotonic()
298298
except ApiException as exc:
299299
if exc.status == 410:
300300
# The stored resource version is too old; force a full re-list.
@@ -323,3 +323,20 @@ def _run_loop(self):
323323
):
324324
self._resource_version = self._watch.resource_version
325325
self._watch = None
326+
327+
# Periodic resync: after the watch stream exits (whether due to the
328+
# server-side timeout_seconds, a stop request, or an error) check if
329+
# a resync is due. This path is what actually fires the resync when
330+
# the cluster is quiet and no events arrive for resync_period seconds.
331+
if (
332+
not self._stop_event.is_set()
333+
and self._resource_version is not None # 410 already schedules a re-list
334+
and self._resync_period > 0
335+
and (time.monotonic() - last_resync) >= self._resync_period
336+
):
337+
logger.debug("Informer resync triggered")
338+
try:
339+
self._initial_list()
340+
except Exception as exc:
341+
logger.exception("Error during resync list; continuing")
342+
self._fire(ERROR, exc)

kubernetes/test/test_informer.py

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,14 @@ def worker(n):
114114
for t in threads:
115115
t.join()
116116
self.assertEqual(errors, [])
117+
# Verify that the cache actually holds the objects that were put into it.
118+
for n in range(5):
119+
for i in range(50):
120+
key = "default/pod-{}-{}".format(n, i)
121+
self.assertIsNotNone(
122+
self.cache.get_by_key(key),
123+
"expected key {} in cache".format(key),
124+
)
117125

118126

119127
class TestSharedInformerHandlers(unittest.TestCase):
@@ -360,7 +368,13 @@ def fake_stream(func, **kw):
360368
self.assertIs(cached[0], pod)
361369

362370
def test_resync_period_triggers_full_list(self):
363-
"""A full List call must be made to the API server on every resync_period."""
371+
"""A full List call must be made to the API server on every resync_period.
372+
373+
With the new implementation the watch stream is given a server-side
374+
timeout equal to resync_period (via timeout_seconds). When the stream
375+
exits, the elapsed-time check fires the resync even if no events
376+
arrived – this is exactly the scenario this test exercises.
377+
"""
364378
pod = _make_pod("default", "resync-pod")
365379

366380
list_func = MagicMock()
@@ -371,20 +385,31 @@ def test_resync_period_triggers_full_list(self):
371385

372386
informer = SharedInformer(list_func=list_func, resync_period=60)
373387

388+
stream_calls = {"n": 0}
389+
374390
with patch("kubernetes.informer.informer.Watch") as MockWatch, \
375391
patch("kubernetes.informer.informer.time") as mock_time:
376-
# Sequence of time.monotonic() calls:
377-
# 1. last_resync = time.monotonic() → 0.0
378-
# 2. (time.monotonic() - last_resync) check → 61.0 (triggers resync)
379-
# 3. last_resync = time.monotonic() → 61.0 (reset after resync)
392+
# Sequence of time.monotonic() calls inside _run_loop:
393+
# 1. last_resync = time.monotonic() → 0.0 (watch-loop start)
394+
# 2. post-stream: time.monotonic() → 61.0 (≥60 → resync fires)
395+
# 3. last_resync = time.monotonic() → 61.0 (second watch-loop start)
396+
# The stop_event is set during the second stream, so the
397+
# post-stream check is short-circuited and no further calls occur.
380398
mock_time.monotonic.side_effect = [0.0, 61.0, 61.0]
399+
mock_time.sleep = time.sleep # keep real sleep/wait working
381400

382401
mock_w = MagicMock()
383402
mock_w.resource_version = "5"
384403

385404
def fake_stream(func, **kw):
386-
yield {"type": "ADDED", "object": pod}
405+
stream_calls["n"] += 1
406+
if stream_calls["n"] == 1:
407+
# Simulate the stream timing out (timeout_seconds expired)
408+
# with no events – the resync should fire after this returns.
409+
return iter([])
410+
# Second iteration: stop the informer.
387411
informer._stop_event.set()
412+
return iter([])
388413

389414
mock_w.stream.side_effect = fake_stream
390415
MockWatch.return_value = mock_w
@@ -955,8 +980,12 @@ def list_func(**kw):
955980
return resp
956981

957982
modified = []
983+
added = []
984+
deleted = []
958985
informer = SharedInformer(list_func=list_func)
959986
informer.add_event_handler(MODIFIED, modified.append)
987+
informer.add_event_handler(ADDED, added.append)
988+
informer.add_event_handler(DELETED, deleted.append)
960989

961990
stream_calls = {"n": 0}
962991

@@ -980,6 +1009,11 @@ def fake_stream(func, **kw):
9801009
# pod was in both the initial list (call 1) and the re-list (call 2).
9811010
# On the re-list it should fire MODIFIED (not ADDED again).
9821011
self.assertIn(pod, modified)
1012+
# ADDED fires exactly once: for the initial list. The re-list must
1013+
# NOT fire a second ADDED for an already-cached item.
1014+
self.assertEqual(len(added), 1, "ADDED should fire once (initial list) but not again on re-list")
1015+
# DELETED must not fire for an item present in both lists.
1016+
self.assertEqual(deleted, [], "DELETED should not fire for an item present in both lists")
9831017
# Still in cache.
9841018
self.assertIsNotNone(informer.cache.get_by_key("default/stable-pod"))
9851019

0 commit comments

Comments (0)