diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 96a0240b..7e1258ff 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,7 +6,8 @@ ## Upgrading - +- `LatestValueCache` now closes the receiver when it is stopped. +- Fetching values from stopped `LatestValueCache` instances is now disallowed. ## New Features diff --git a/src/frequenz/channels/_latest_value_cache.py b/src/frequenz/channels/_latest_value_cache.py index a9af9436..4a3dff87 100644 --- a/src/frequenz/channels/_latest_value_cache.py +++ b/src/frequenz/channels/_latest_value_cache.py @@ -53,6 +53,9 @@ class LatestValueCache(typing.Generic[T_co]): It provides a way to look up the latest value in a stream without any delay, as long as there has been one value received. + + Takes ownership of the receiver. When the cache is stopped, the receiver + will be closed. """ def __init__( @@ -66,12 +69,13 @@ def __init__( provided, a unique identifier will be generated from the object's [`id()`][id]. It is used mostly for debugging purposes. """ - self._receiver = receiver + self._receiver: Receiver[T_co] = receiver self._unique_id: str = hex(id(self)) if unique_id is None else unique_id self._latest_value: T_co | _Sentinel = _Sentinel() - self._task = asyncio.create_task( + self._task: asyncio.Task[None] = asyncio.create_task( self._run(), name=f"LatestValueCache«{self._unique_id}»" ) + self._stopped: bool = False @property def unique_id(self) -> str: @@ -93,6 +97,8 @@ def get(self) -> T_co: """ if isinstance(self._latest_value, _Sentinel): raise ValueError("No value has been received yet.") + if self._stopped: + raise ValueError("Cache has been stopped.") return self._latest_value def has_value(self) -> bool: @@ -108,7 +114,9 @@ async def _run(self) -> None: self._latest_value = value async def stop(self) -> None: - """Stop the cache.""" + """Stop the cache and close the owned receiver.""" + self._receiver.close() + self._stopped = True if not self._task.done(): self._task.cancel() try: