-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Add SharedInformer implementation to python-client #2515
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
ab86645
ef3f21c
9c708ad
d21aa1b
63bd3d5
4bf1e06
90c2b02
5a8fef1
b91f278
25d1f12
6e39732
1d10150
6425bab
a858453
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| # Copyright 2024 The Kubernetes Authors. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """Example: use SharedInformer to watch pods in the default namespace. | ||
|
|
||
| The informer runs a background daemon thread that keeps a local cache | ||
| synchronised with the Kubernetes API server. The main thread is free to | ||
| query the cache at any time without worrying about connectivity or retries. | ||
| """ | ||
|
|
||
| import time | ||
|
|
||
| import kubernetes | ||
| from kubernetes import config | ||
| from kubernetes.client import CoreV1Api | ||
| from kubernetes.informer import ADDED, DELETED, MODIFIED, SharedInformer | ||
|
|
||
|
|
||
| def on_pod_added(pod): | ||
| name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"] | ||
| print("[ADDED] ", name) | ||
|
|
||
|
|
||
| def on_pod_modified(pod): | ||
| name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"] | ||
| print("[MODIFIED]", name) | ||
|
|
||
|
|
||
| def on_pod_deleted(pod): | ||
| name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"] | ||
| print("[DELETED] ", name) | ||
|
|
||
|
|
||
| def main(): | ||
| config.load_kube_config() | ||
|
|
||
| v1 = CoreV1Api() | ||
| informer = SharedInformer( | ||
| list_func=v1.list_namespaced_pod, | ||
| namespace="default", | ||
| resync_period=60, | ||
| ) | ||
|
|
||
| informer.add_event_handler(ADDED, on_pod_added) | ||
| informer.add_event_handler(MODIFIED, on_pod_modified) | ||
| informer.add_event_handler(DELETED, on_pod_deleted) | ||
|
|
||
| informer.start() | ||
| print('Informer started. Watching pods in "default" namespace ...') | ||
|
|
||
| try: | ||
| while True: | ||
| cached = informer.cache.list() | ||
| print("Cached pods: {}".format(len(cached))) | ||
| time.sleep(10) | ||
| except KeyboardInterrupt: | ||
| pass | ||
| finally: | ||
| informer.stop() | ||
| print("Informer stopped.") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,3 +23,4 @@ | |
| from . import stream | ||
| from . import utils | ||
| from . import leaderelection | ||
| from . import informer | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,177 @@ | ||
| # Copyright 2024 The Kubernetes Authors. | ||
| # Licensed under the Apache License, Version 2.0 (the "License"). | ||
| # End-to-end tests for kubernetes.informer.SharedInformer. | ||
|
|
||
| import threading | ||
| import time | ||
| import unittest | ||
| import uuid | ||
|
|
||
| from kubernetes.client import api_client | ||
| from kubernetes.client.api import core_v1_api | ||
| from kubernetes.e2e_test import base | ||
| from kubernetes.informer import ADDED, DELETED, MODIFIED, SharedInformer | ||
|
|
||
| _TIMEOUT = 30 | ||
|
|
||
|
|
||
| def _uid(): | ||
| return str(uuid.uuid4())[-12:] | ||
|
|
||
|
|
||
| def _cm(name, payload=None): | ||
| return { | ||
| "apiVersion": "v1", | ||
| "kind": "ConfigMap", | ||
| "metadata": {"name": name, "labels": {"inf-e2e": "1"}}, | ||
| "data": payload or {"k": "v"}, | ||
| } | ||
|
|
||
|
|
||
| def _name_of(obj): | ||
| if hasattr(obj, "metadata"): | ||
| return obj.metadata.name | ||
| return (obj.get("metadata") or {}).get("name") | ||
|
|
||
|
|
||
| class TestSharedInformerE2E(unittest.TestCase): | ||
|
|
||
| @classmethod | ||
| def setUpClass(cls): | ||
| cls.cfg = base.get_e2e_configuration() | ||
| cls.apiclient = api_client.ApiClient(configuration=cls.cfg) | ||
| cls.api = core_v1_api.CoreV1Api(cls.apiclient) | ||
|
|
||
| def _drop(self, cm_name): | ||
| try: | ||
| self.api.delete_namespaced_config_map(name=cm_name, namespace="default") | ||
| except Exception: | ||
| pass | ||
|
|
||
| def _expect(self, ev, label): | ||
| if not ev.wait(timeout=_TIMEOUT): | ||
| self.fail("Timeout waiting for: " + label) | ||
|
|
||
| def _wait_in_cache(self, inf, key): | ||
| stop = time.monotonic() + _TIMEOUT | ||
| while time.monotonic() < stop: | ||
| if inf.cache.get_by_key(key) is not None: | ||
| return | ||
| time.sleep(0.25) | ||
| self.fail("key " + key + " never appeared in cache") | ||
|
|
||
| def _wait_listed(self, inf): | ||
| stop = time.monotonic() + _TIMEOUT | ||
| while inf._resource_version is None and time.monotonic() < stop: | ||
| time.sleep(0.1) | ||
| self.assertIsNotNone(inf._resource_version, "initial list never completed") | ||
|
|
||
| # ------------------------------------------------------- | ||
|
|
||
| def test_cache_populated_after_start(self): | ||
| """Pre-existing ConfigMaps appear in the cache once the informer starts.""" | ||
| name = "inf-pre-" + _uid() | ||
| self.api.create_namespaced_config_map(body=_cm(name), namespace="default") | ||
| self.addCleanup(self._drop, name) | ||
|
|
||
| inf = SharedInformer( | ||
| list_func=self.api.list_namespaced_config_map, | ||
| namespace="default", | ||
| label_selector="inf-e2e=1", | ||
| ) | ||
| inf.start() | ||
| self.addCleanup(inf.stop) | ||
|
|
||
| self._wait_in_cache(inf, "default/" + name) | ||
| self.assertEqual(_name_of(inf.cache.get_by_key("default/" + name)), name) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Other than the name, shall we also verify the data of the stored configmap, to ensure the cache was correctly populated? |
||
|
|
||
| def test_added_event_and_cache_entry(self): | ||
| """Creating a ConfigMap fires ADDED and the object appears in the cache.""" | ||
| name = "inf-add-" + _uid() | ||
| seen = threading.Event() | ||
|
|
||
| inf = SharedInformer( | ||
| list_func=self.api.list_namespaced_config_map, | ||
| namespace="default", | ||
| label_selector="inf-e2e=1", | ||
| ) | ||
| inf.add_event_handler(ADDED, lambda o: seen.set() if _name_of(o) == name else None) | ||
| inf.start() | ||
| self.addCleanup(inf.stop) | ||
| self.addCleanup(self._drop, name) | ||
|
|
||
| self._wait_listed(inf) | ||
| self.api.create_namespaced_config_map(body=_cm(name), namespace="default") | ||
| self._expect(seen, "ADDED/" + name) | ||
| self.assertIsNotNone(inf.cache.get_by_key("default/" + name)) | ||
|
|
||
| def test_modified_event_and_cache_refresh(self): | ||
| """Patching a ConfigMap fires MODIFIED and the cache holds the updated object.""" | ||
| name = "inf-mod-" + _uid() | ||
| seen = threading.Event() | ||
|
|
||
| inf = SharedInformer( | ||
| list_func=self.api.list_namespaced_config_map, | ||
| namespace="default", | ||
| label_selector="inf-e2e=1", | ||
| ) | ||
| inf.add_event_handler(MODIFIED, lambda o: seen.set() if _name_of(o) == name else None) | ||
| inf.start() | ||
| self.addCleanup(inf.stop) | ||
| self.addCleanup(self._drop, name) | ||
|
|
||
| self.api.create_namespaced_config_map(body=_cm(name), namespace="default") | ||
| self._wait_in_cache(inf, "default/" + name) | ||
|
|
||
| self.api.patch_namespaced_config_map( | ||
| name=name, namespace="default", body={"data": {"k": "updated"}} | ||
| ) | ||
| self._expect(seen, "MODIFIED/" + name) | ||
| self.assertIsNotNone(inf.cache.get_by_key("default/" + name)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we also confirm the updated data for this modified configmap? |
||
|
|
||
| def test_deleted_event_removes_from_cache(self): | ||
| """Deleting a ConfigMap fires DELETED and removes it from the cache.""" | ||
| name = "inf-del-" + _uid() | ||
| seen = threading.Event() | ||
|
|
||
| inf = SharedInformer( | ||
| list_func=self.api.list_namespaced_config_map, | ||
| namespace="default", | ||
| label_selector="inf-e2e=1", | ||
| ) | ||
| inf.add_event_handler(DELETED, lambda o: seen.set() if _name_of(o) == name else None) | ||
| inf.start() | ||
| self.addCleanup(inf.stop) | ||
|
|
||
| self.api.create_namespaced_config_map(body=_cm(name), namespace="default") | ||
| self._wait_in_cache(inf, "default/" + name) | ||
|
|
||
| self.api.delete_namespaced_config_map(name=name, namespace="default") | ||
| self._expect(seen, "DELETED/" + name) | ||
| self.assertIsNone(inf.cache.get_by_key("default/" + name)) | ||
|
|
||
| def test_resource_version_advances(self): | ||
| """The stored resourceVersion advances after watch events are received.""" | ||
| name = "inf-rv-" + _uid() | ||
| seen = threading.Event() | ||
|
|
||
| inf = SharedInformer( | ||
| list_func=self.api.list_namespaced_config_map, | ||
| namespace="default", | ||
| label_selector="inf-e2e=1", | ||
| ) | ||
| inf.add_event_handler(ADDED, lambda o: seen.set() if _name_of(o) == name else None) | ||
| inf.start() | ||
| self.addCleanup(inf.stop) | ||
| self.addCleanup(self._drop, name) | ||
|
|
||
| self._wait_listed(inf) | ||
| rv_before = int(inf._resource_version) | ||
|
|
||
| self.api.create_namespaced_config_map(body=_cm(name), namespace="default") | ||
| self._expect(seen, "ADDED/" + name) | ||
| self.assertGreater(int(inf._resource_version), rv_before) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| # Copyright 2024 The Kubernetes Authors. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: wrong copyright year? Does it matter? |
||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from .cache import ObjectCache, _meta_namespace_key | ||
| from .informer import SharedInformer, ADDED, MODIFIED, DELETED, BOOKMARK, ERROR | ||
|
|
||
| __all__ = [ | ||
| "ObjectCache", | ||
| "_meta_namespace_key", | ||
| "SharedInformer", | ||
| "ADDED", | ||
| "MODIFIED", | ||
| "DELETED", | ||
| "BOOKMARK", | ||
| "ERROR", | ||
| ] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| # Copyright 2024 The Kubernetes Authors. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """Thread-safe in-memory store for the Kubernetes informer.""" | ||
|
|
||
| import threading | ||
|
|
||
|
|
||
| def _meta_namespace_key(obj): | ||
| """Build a lookup key from object metadata. | ||
|
|
||
| Supports both dict-based objects and generated model objects. | ||
| Returns namespace/name for namespaced objects, just name otherwise. | ||
| """ | ||
| if isinstance(obj, dict): | ||
| meta = obj.get("metadata") or {} | ||
| ns = meta.get("namespace") or "" | ||
| name = meta.get("name") or "" | ||
| else: | ||
| meta = getattr(obj, "metadata", None) | ||
| if meta is None: | ||
| return "" | ||
| if hasattr(meta, "namespace"): | ||
| ns = getattr(meta, "namespace", None) or "" | ||
| name = getattr(meta, "name", None) or "" | ||
| else: | ||
| ns = meta.get("namespace") or "" | ||
| name = meta.get("name") or "" | ||
| if ns: | ||
| return "{}/{}".format(ns, name) | ||
| return name | ||
|
|
||
|
|
||
| class ObjectCache: | ||
| """Thread-safe in-memory mapping of Kubernetes objects. | ||
|
|
||
| The SharedInformer keeps this store synchronised with the API server. | ||
| Consumers can call list() and get_by_key() from any thread safely. | ||
| """ | ||
|
|
||
| def __init__(self, key_func=None): | ||
| self._key_func = key_func if key_func is not None else _meta_namespace_key | ||
| self._objects = {} | ||
| self._rlock = threading.RLock() | ||
|
|
||
| # --- mutation helpers (called by SharedInformer) --- | ||
|
|
||
| def _put(self, obj): | ||
| key = self._key_func(obj) | ||
| with self._rlock: | ||
| self._objects[key] = obj | ||
|
|
||
| def _remove(self, obj): | ||
| key = self._key_func(obj) | ||
| with self._rlock: | ||
| self._objects.pop(key, None) | ||
|
|
||
| def _replace_all(self, objects): | ||
| rebuilt = {self._key_func(o): o for o in objects} | ||
| with self._rlock: | ||
| self._objects = rebuilt | ||
|
|
||
| # --- public read API --- | ||
|
|
||
| def list(self): | ||
| """Return a snapshot list of all cached objects.""" | ||
| with self._rlock: | ||
| return list(self._objects.values()) | ||
|
|
||
| def list_keys(self): | ||
| """Return a snapshot list of all cache keys.""" | ||
| with self._rlock: | ||
| return list(self._objects.keys()) | ||
|
|
||
| def get(self, obj): | ||
| """Look up the cached copy of obj. Returns None when absent.""" | ||
| key = self._key_func(obj) | ||
| return self.get_by_key(key) | ||
|
|
||
| def get_by_key(self, key): | ||
| """Look up an object by key. Returns None when absent.""" | ||
| with self._rlock: | ||
| return self._objects.get(key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/2024/2026/, please update all files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 1d10150 — all 6 newly created files now have
Copyright 2026.