-
Notifications
You must be signed in to change notification settings - Fork 9
Automatically closing broadcast channels #494
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v1.x.x
Are you sure you want to change the base?
Changes from all commits
4e45aac
a1b7256
9b1e846
090f5c0
86794f1
1a5b8e0
d98c90a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -16,12 +16,51 @@ | |||||||||
| from ._exceptions import ChannelClosedError | ||||||||||
| from ._generic import ChannelMessageT | ||||||||||
| from ._receiver import Receiver, ReceiverStoppedError | ||||||||||
| from ._sender import Sender, SenderError | ||||||||||
| from ._sender import ClonableSubscribableSender, SenderClosedError, SenderError | ||||||||||
|
|
||||||||||
| _logger = logging.getLogger(__name__) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class Broadcast(Generic[ChannelMessageT]): | ||||||||||
| def broadcast( | ||||||||||
| message_type: type[ChannelMessageT], # pylint: disable=unused-argument | ||||||||||
| /, | ||||||||||
| *, | ||||||||||
| name: str, | ||||||||||
| resend_latest: bool = False, | ||||||||||
| ) -> tuple[ClonableSubscribableSender[ChannelMessageT], Receiver[ChannelMessageT]]: | ||||||||||
| """Create a new Broadcast channel and return a sender and a receiver attached to it. | ||||||||||
|
|
||||||||||
| The channel will be automatically closed when all senders or all receivers | ||||||||||
| are closed. | ||||||||||
|
|
||||||||||
| Args: | ||||||||||
| message_type: The type of messages that will be sent through this channel. This | ||||||||||
| is only for type checking purposes, it is not used at runtime. | ||||||||||
| name: The name of the channel. This is for logging purposes, and it will be | ||||||||||
| shown in the string representation of the channel. | ||||||||||
| resend_latest: When True, every time a new receiver is created with | ||||||||||
| `new_receiver`, the last message seen by the channel will be sent to the | ||||||||||
| new receiver automatically. This allows new receivers on slow streams to | ||||||||||
| get the latest message as soon as they are created, without having to | ||||||||||
| wait for the next message on the channel to arrive. It is safe to be | ||||||||||
| set in data/reporting channels, but is not recommended for use in | ||||||||||
| channels that stream control instructions. | ||||||||||
|
|
||||||||||
| Returns: | ||||||||||
| A tuple of a sender and a receiver attached to the created channel. | ||||||||||
| """ | ||||||||||
| channel = Broadcast[ChannelMessageT]( | ||||||||||
| name=name, resend_latest=resend_latest, auto_close=True | ||||||||||
| ) | ||||||||||
| return channel.new_sender(), channel.new_receiver() | ||||||||||
|
|
||||||||||
|
|
||||||||||
| @deprecated( | ||||||||||
| "Please use the `broadcast` function to create a Broadcast channel instead." | ||||||||||
| ) | ||||||||||
| class Broadcast( # pylint: disable=too-many-instance-attributes | ||||||||||
| Generic[ChannelMessageT] | ||||||||||
| ): | ||||||||||
| """A channel that deliver all messages to all receivers. | ||||||||||
|
|
||||||||||
| # Description | ||||||||||
|
|
@@ -184,7 +223,13 @@ async def main() -> None: | |||||||||
| ``` | ||||||||||
| """ | ||||||||||
|
|
||||||||||
| def __init__(self, *, name: str, resend_latest: bool = False) -> None: | ||||||||||
| def __init__( | ||||||||||
| self, | ||||||||||
| *, | ||||||||||
| name: str, | ||||||||||
| resend_latest: bool = False, | ||||||||||
| auto_close: bool = False, | ||||||||||
| ) -> None: | ||||||||||
| """Initialize this channel. | ||||||||||
|
|
||||||||||
| Args: | ||||||||||
|
|
@@ -197,6 +242,8 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: | |||||||||
| wait for the next message on the channel to arrive. It is safe to be | ||||||||||
| set in data/reporting channels, but is not recommended for use in | ||||||||||
| channels that stream control instructions. | ||||||||||
| auto_close: If True, the channel will be closed when all senders or all | ||||||||||
| receivers are closed. | ||||||||||
| """ | ||||||||||
| self._name: str = name | ||||||||||
| """The name of the broadcast channel. | ||||||||||
|
|
@@ -207,6 +254,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: | |||||||||
| self._recv_cv: Condition = Condition() | ||||||||||
| """The condition to wait for data in the channel's buffer.""" | ||||||||||
|
|
||||||||||
| self._sender_count: int = 0 | ||||||||||
| """The number of senders attached to this channel.""" | ||||||||||
|
|
||||||||||
| self._receivers: dict[ | ||||||||||
| int, weakref.ReferenceType[_Receiver[ChannelMessageT]] | ||||||||||
| ] = {} | ||||||||||
|
|
@@ -218,6 +268,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: | |||||||||
| self._latest: ChannelMessageT | None = None | ||||||||||
| """The latest message sent to the channel.""" | ||||||||||
|
|
||||||||||
| self._auto_close: bool = auto_close | ||||||||||
| """Whether to close the channel when all senders or all receivers are closed.""" | ||||||||||
|
|
||||||||||
| self.resend_latest: bool = resend_latest | ||||||||||
| """Whether to resend the latest message to new receivers. | ||||||||||
|
|
||||||||||
|
|
@@ -269,7 +322,7 @@ async def close(self) -> None: # noqa: D402 | |||||||||
| """Close the channel, deprecated alias for `aclose()`.""" # noqa: D402 | ||||||||||
| return await self.aclose() | ||||||||||
|
|
||||||||||
| def new_sender(self) -> Sender[ChannelMessageT]: | ||||||||||
| def new_sender(self) -> ClonableSubscribableSender[ChannelMessageT]: | ||||||||||
| """Return a new sender attached to this channel.""" | ||||||||||
| return _Sender(self) | ||||||||||
|
|
||||||||||
|
|
@@ -317,7 +370,7 @@ def __repr__(self) -> str: | |||||||||
| _T = TypeVar("_T") | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class _Sender(Sender[_T]): | ||||||||||
| class _Sender(ClonableSubscribableSender[_T]): | ||||||||||
| """A sender to send messages to the broadcast channel. | ||||||||||
|
|
||||||||||
| Should not be created directly, but through the | ||||||||||
|
|
@@ -334,6 +387,11 @@ def __init__(self, channel: Broadcast[_T], /) -> None: | |||||||||
| self._channel: Broadcast[_T] = channel | ||||||||||
| """The broadcast channel this sender belongs to.""" | ||||||||||
|
|
||||||||||
| self._closed: bool = False | ||||||||||
| """Whether this sender is closed.""" | ||||||||||
|
|
||||||||||
| self._channel._sender_count += 1 | ||||||||||
|
|
||||||||||
| @override | ||||||||||
| async def send(self, message: _T, /) -> None: | ||||||||||
| """Send a message to all broadcast receivers. | ||||||||||
|
|
@@ -345,12 +403,22 @@ async def send(self, message: _T, /) -> None: | |||||||||
| SenderError: If the underlying channel was closed. | ||||||||||
| A [ChannelClosedError][frequenz.channels.ChannelClosedError] is | ||||||||||
| set as the cause. | ||||||||||
| SenderClosedError: If this sender was closed. | ||||||||||
| """ | ||||||||||
| # pylint: disable=protected-access | ||||||||||
| if self._channel._closed: | ||||||||||
| raise SenderError("The channel was closed", self) from ChannelClosedError( | ||||||||||
| self._channel | ||||||||||
| ) | ||||||||||
| if self._channel._auto_close and ( | ||||||||||
| self._channel._sender_count == 0 or len(self._channel._receivers) == 0 | ||||||||||
| ): | ||||||||||
| await self._channel.aclose() | ||||||||||
| raise SenderError("The channel was closed", self) from ChannelClosedError( | ||||||||||
| self._channel | ||||||||||
| ) | ||||||||||
| if self._closed: | ||||||||||
| raise SenderClosedError(self) | ||||||||||
| self._channel._latest = message | ||||||||||
| stale_refs = [] | ||||||||||
| for _hash, recv_ref in self._channel._receivers.items(): | ||||||||||
|
|
@@ -365,6 +433,27 @@ async def send(self, message: _T, /) -> None: | |||||||||
| self._channel._recv_cv.notify_all() | ||||||||||
| # pylint: enable=protected-access | ||||||||||
|
|
||||||||||
| @override | ||||||||||
| def close(self) -> None: | ||||||||||
| """Close this sender. | ||||||||||
|
|
||||||||||
| After a sender is closed, it can no longer be used to send messages. Any | ||||||||||
| attempt to send a message through a closed sender will raise a | ||||||||||
| [SenderError][frequenz.channels.SenderError]. | ||||||||||
| """ | ||||||||||
|
||||||||||
| """ | |
| """ | |
| if self._closed: | |
| return |
Copilot
AI
Feb 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the last sender closes, receivers that are currently blocked in ready() will not be woken up because close() doesn't notify _recv_cv or otherwise trigger channel closure. With auto_close=True, this can leave receivers waiting forever even though all senders are closed. Consider scheduling aclose() (or at least notifying _recv_cv) when _sender_count reaches 0.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| # License: MIT | ||
| # Copyright © 2026 Frequenz Energy-as-a-Service GmbH | ||
|
|
||
| """A channel that can send a single message.""" | ||
|
|
||
| import typing | ||
| from asyncio import Condition | ||
|
|
||
| from ._generic import ChannelMessageT | ||
| from ._receiver import Receiver, ReceiverStoppedError | ||
| from ._sender import Sender, SenderClosedError | ||
|
|
||
|
|
||
| def oneshot( | ||
| message_type: type[ChannelMessageT], # pylint: disable=unused-argument | ||
| ) -> tuple[Sender[ChannelMessageT], Receiver[ChannelMessageT]]: | ||
| """Create a one-shot channel. | ||
|
|
||
| A one-shot channel is a channel that can only send one message. After the first | ||
| message is sent, the sender is closed and any further attempts to send a message | ||
| will raise a `SenderClosedError`. | ||
|
|
||
| Args: | ||
| message_type: The type of messages that can be sent through this channel. | ||
|
|
||
| Returns: | ||
| A tuple of a sender and a receiver for this channel. | ||
| """ | ||
| channel = _OneShot[ChannelMessageT]() | ||
| return _OneShotSender(channel), _OneShotReceiver(channel) | ||
|
|
||
|
|
||
| class _Empty: | ||
| pass | ||
|
|
||
|
|
||
| _EMPTY = _Empty() | ||
|
|
||
|
|
||
| class _OneShot(typing.Generic[ChannelMessageT]): | ||
| """A one-shot channel. | ||
|
|
||
| A one-shot channel is a channel that can only send one message. After the first | ||
| message is sent, the sender is closed and any further attempts to send a message | ||
| will raise a `SenderClosedError`. | ||
| """ | ||
|
|
||
| def __init__(self) -> None: | ||
| """Create a new one-shot channel.""" | ||
| self.message: ChannelMessageT | _Empty = _EMPTY | ||
| self.sent = False | ||
| self.drained = False | ||
| self.condition = Condition() | ||
|
|
||
|
|
||
| class _OneShotSender(Sender[ChannelMessageT]): | ||
| def __init__(self, channel: _OneShot[ChannelMessageT]) -> None: | ||
| self._channel = channel | ||
|
|
||
| async def send(self, message: ChannelMessageT, /) -> None: | ||
| if self._channel.sent: | ||
| raise SenderClosedError(self) | ||
| self._channel.message = message | ||
| self._channel.sent = True | ||
| if self._channel.condition.locked(): | ||
| self._channel.condition.notify() | ||
|
|
||
| def close(self) -> None: | ||
| self._channel.sent = True | ||
|
|
||
|
|
||
| class _OneShotReceiver(Receiver[ChannelMessageT]): | ||
| def __init__(self, channel: _OneShot[ChannelMessageT]) -> None: | ||
| self._channel = channel | ||
|
|
||
| async def ready(self) -> bool: | ||
| while not self._channel.sent: | ||
| await self._channel.condition.wait() | ||
| if self._channel.drained: | ||
| return False | ||
| return True | ||
|
|
||
| def consume(self) -> ChannelMessageT: | ||
| if self._channel.drained: | ||
| raise ReceiverStoppedError(self) | ||
| if isinstance(self._channel.message, _Empty): | ||
| raise ReceiverStoppedError(self) | ||
|
|
||
| self._channel.drained = True | ||
| return self._channel.message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In
_Sender.send(), the auto-close branch runs before the_closedcheck. If a user callssend()on a closed sender, this can raise a channel-closedSenderError(and even close the whole channel) instead of the documentedSenderClosedError. Checkself._closedfirst to preserve the expected error semantics and avoid closing the channel due to a misuse of a closed sender.