-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy path_one_shot.py
More file actions
90 lines (65 loc) · 2.65 KB
/
_one_shot.py
File metadata and controls
90 lines (65 loc) · 2.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH
"""A channel that can send a single message."""
import typing
from asyncio import Condition
from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderClosedError
def oneshot(
    message_type: type[ChannelMessageT],  # pylint: disable=unused-argument
) -> tuple[Sender[ChannelMessageT], Receiver[ChannelMessageT]]:
    """Build the two endpoints of a channel that carries a single message.

    Both endpoints share one channel state. Once the first message has been
    sent the sender counts as closed, and every later attempt to send fails
    with a `SenderClosedError`.

    Args:
        message_type: The type of the message carried by the channel. It is not
            inspected by this function; it only fixes the message type in the
            returned endpoints' annotations.

    Returns:
        A `(sender, receiver)` tuple connected to the same one-shot channel.
    """
    shared_state = _OneShot[ChannelMessageT]()
    sender = _OneShotSender(shared_state)
    receiver = _OneShotReceiver(shared_state)
    return sender, receiver
class _Empty:
pass
_EMPTY = _Empty()
class _OneShot(typing.Generic[ChannelMessageT]):
    """Shared state behind a one-shot sender/receiver pair.

    The channel carries at most one message: once it has been sent the sender
    counts as closed, and any further attempt to send raises a
    `SenderClosedError`.
    """

    def __init__(self) -> None:
        """Initialize an empty channel: nothing sent, nothing consumed."""
        # Used by the endpoints to wait for, and signal, a state change.
        self.condition = Condition()
        # Set once the sender is finished (message sent or sender closed).
        self.sent = False
        # Set once the receiver has taken the message.
        self.drained = False
        # The stored message, or the `_EMPTY` sentinel while there is none.
        self.message: ChannelMessageT | _Empty = _EMPTY
class _OneShotSender(Sender[ChannelMessageT]):
    """Sending end of a one-shot channel.

    Accepts exactly one message. After that message was sent, or after
    `close()` was called, every `send()` raises `SenderClosedError`.
    """

    def __init__(self, channel: _OneShot[ChannelMessageT]) -> None:
        """Create the sender.

        Args:
            channel: The shared one-shot channel state this sender writes to.
        """
        self._channel = channel

    async def send(self, message: ChannelMessageT, /) -> None:
        """Send the channel's single message and wake up waiting receivers.

        Args:
            message: The message to deliver.

        Raises:
            SenderClosedError: If a message was already sent or the sender was
                closed.
        """
        if self._channel.sent:
            raise SenderClosedError(self)
        # Publish the new state before the first possible suspension point, so
        # a concurrent `send()` sees `sent` and fails instead of overwriting
        # the message.
        self._channel.message = message
        self._channel.sent = True
        # `Condition.notify_all()` must be called with the condition's lock
        # held. Waiters release that lock while suspended in `wait()`, so merely
        # checking `locked()` (instead of acquiring) would skip the wake-up.
        async with self._channel.condition:
            self._channel.condition.notify_all()

    def close(self) -> None:
        """Close the sender without sending a message.

        Any later `send()` raises `SenderClosedError`.
        """
        # NOTE(review): this method is synchronous, so it cannot acquire the
        # condition's lock to notify. A receiver that is already suspended in
        # `ready()` is therefore not woken up by `close()`; only `ready()`
        # calls made after `close()` observe it.
        self._channel.sent = True


class _OneShotReceiver(Receiver[ChannelMessageT]):
    """Receiving end of a one-shot channel; delivers at most one message."""

    def __init__(self, channel: _OneShot[ChannelMessageT]) -> None:
        """Create the receiver.

        Args:
            channel: The shared one-shot channel state this receiver reads from.
        """
        self._channel = channel

    async def ready(self) -> bool:
        """Wait until the sender has sent its message or has been closed.

        Returns:
            `True` if a message is available for `consume()`; `False` if the
            message was already consumed or the sender was closed without
            sending one.
        """
        channel = self._channel
        # `Condition.wait_for()` raises `RuntimeError` unless the condition's
        # lock is held, so acquire it first. `wait_for()` returns immediately
        # when the predicate already holds and re-checks it after each wake-up.
        async with channel.condition:
            await channel.condition.wait_for(lambda: channel.sent)
        # Mirror the checks in `consume()`: report "ready" only when
        # `consume()` would actually return a message instead of raising.
        return not channel.drained and not isinstance(channel.message, _Empty)

    def consume(self) -> ChannelMessageT:
        """Return the channel's message and mark the channel as drained.

        Returns:
            The message that was sent through the channel.

        Raises:
            ReceiverStoppedError: If the message was already consumed, or if
                no message is stored in the channel.
        """
        message = self._channel.message
        if self._channel.drained or isinstance(message, _Empty):
            raise ReceiverStoppedError(self)
        self._channel.drained = True
        return message