-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy path_latest_value_cache.py
More file actions
136 lines (104 loc) · 4.37 KB
/
_latest_value_cache.py
File metadata and controls
136 lines (104 loc) · 4.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
"""The LatestValueCache caches the latest value in a receiver.
It provides a way to look up the latest value in a stream whenever required, as
long as there has been one value received.
[LatestValueCache][frequenz.channels.LatestValueCache] takes a
[Receiver][frequenz.channels.Receiver] as an argument and stores the latest
value received by that receiver. As soon as a value is received, its
[`has_value`][frequenz.channels.LatestValueCache.has_value] method returns
`True`, and its [`get`][frequenz.channels.LatestValueCache.get] method returns
the latest value received. The `get` method will raise a `ValueError` if called
before any value has been received from the receiver, or after the cache has
been stopped.
Example:
```python
from frequenz.channels import Broadcast, LatestValueCache
channel = Broadcast[int](name="lvc_test")
cache = LatestValueCache(channel.new_receiver())
sender = channel.new_sender()
assert not cache.has_value()
await sender.send(5)
assert cache.has_value()
assert cache.get() == 5
```
"""
import asyncio
import typing
from ._receiver import Receiver
# Covariant type variable for the type of the values held by the cache; it
# parametrizes both `LatestValueCache` and the `Receiver` it wraps.
T_co = typing.TypeVar("T_co", covariant=True)
class _Sentinel:
"""A sentinel to denote that no value has been received yet."""
def __str__(self) -> str:
"""Return a string representation of this sentinel."""
return "<no value received yet>"
class LatestValueCache(typing.Generic[T_co]):
    """Keep hold of the most recent value produced by a receiver.

    A background task consumes the receiver and remembers the last value it
    yielded, so that value can be looked up at any time without waiting, as
    long as at least one value has already been received.

    The cache owns the receiver it is given: stopping the cache also closes
    the receiver.
    """

    def __init__(
        self, receiver: Receiver[T_co], *, unique_id: str | None = None
    ) -> None:
        """Initialize the cache and start consuming the receiver.

        Args:
            receiver: The receiver whose latest value is cached.
            unique_id: A string identifying this instance, used mostly for
                debugging. When omitted, an identifier is derived from the
                object's [`id()`][id].
        """
        if unique_id is None:
            unique_id = hex(id(self))

        self._receiver: Receiver[T_co] = receiver
        self._unique_id: str = unique_id
        # Placeholder until the first value arrives from the receiver.
        self._latest_value: T_co | _Sentinel = _Sentinel()
        # Background task that keeps `_latest_value` up to date.
        self._task: asyncio.Task[None] = asyncio.create_task(
            self._run(), name=f"LatestValueCache«{self._unique_id}»"
        )
        self._stopped: bool = False

    @property
    def unique_id(self) -> str:
        """The identifier given to (or generated for) this instance."""
        return self._unique_id

    def get(self) -> T_co:
        """Return the most recently received value.

        `has_value` tells whether a value has been received yet. Note that a
        stopped cache refuses to hand out its value even if one was received
        before it was stopped.

        Returns:
            The latest value that has been received.

        Raises:
            ValueError: If no value has been received yet, or if the cache has
                been stopped.
        """
        latest = self._latest_value
        # The "nothing received" check intentionally comes first, so it takes
        # precedence over the "stopped" check when both conditions hold.
        if isinstance(latest, _Sentinel):
            raise ValueError("No value has been received yet.")
        if self._stopped:
            raise ValueError("Cache has been stopped.")
        return latest

    def has_value(self) -> bool:
        """Tell whether at least one value has been received.

        Returns:
            `True` if a value has been received, `False` otherwise.
        """
        received = not isinstance(self._latest_value, _Sentinel)
        return received

    async def _run(self) -> None:
        """Consume the receiver, remembering each value as the latest one."""
        async for message in self._receiver:
            self._latest_value = message

    async def stop(self) -> None:
        """Stop the cache and close the receiver it owns.

        The receiver is closed and the cache is marked as stopped first; then,
        if the background task is still running, it is cancelled and awaited.
        """
        self._receiver.close()
        self._stopped = True

        task = self._task
        if task.done():
            return

        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of cancelling the background task.
            pass

    def __repr__(self) -> str:
        """Return a debugging representation of this cache."""
        description = (
            f"<LatestValueCache latest_value={self._latest_value!r}, "
            f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
        )
        return description

    def __str__(self) -> str:
        """Return the string form of the last value seen by this cache."""
        latest = self._latest_value
        return str(latest)