Automatically closing broadcast channels#494
Automatically closing broadcast channels#494shsms wants to merge 7 commits intofrequenz-floss:v1.x.xfrom
Conversation
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
There was a problem hiding this comment.
Pull request overview
Adds an auto-closing mode for Broadcast channels via a new broadcast() factory, aiming to close the underlying channel automatically once all senders or all receivers are closed.
Changes:
- Introduce
broadcast()factory that creates a Broadcast withauto_close=True. - Add a
close()method to theSenderinterface and implement “closed sender” behavior (incl.SenderClosedError) in Anycast/Broadcast senders. - Add tests asserting Broadcast auto-close behavior for “all receivers closed” and “all senders closed”.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_broadcast.py | Adds coverage for auto-close semantics using the new broadcast() factory. |
| src/frequenz/channels/experimental/_relay_sender.py | Implements close() by forwarding close to underlying senders. |
| src/frequenz/channels/_sender.py | Extends sender API with close(), adds SenderClosedError, and introduces clone/subscribe sender ABCs. |
| src/frequenz/channels/_broadcast.py | Adds broadcast() factory and implements sender/receiver-driven auto-close logic in Broadcast. |
| src/frequenz/channels/_anycast.py | Adds sender close-state tracking and close() implementation; raises SenderClosedError on use-after-close. |
| src/frequenz/channels/init.py | Exposes broadcast() and new sender-related types/errors from the public package API. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if self._channel._auto_close and ( | ||
| self._channel._sender_count == 0 or len(self._channel._receivers) == 0 | ||
| ): | ||
| await self._channel.aclose() | ||
| raise SenderError("The channel was closed", self) from ChannelClosedError( |
There was a problem hiding this comment.
In _Sender.send(), the auto-close branch runs before the _closed check. If a user calls send() on a closed sender, this can raise a channel-closed SenderError (and even close the whole channel) instead of the documented SenderClosedError. Check self._closed first to preserve the expected error semantics and avoid closing the channel due to a misuse of a closed sender.
|
|
||
|
|
||
| class SenderClosedError(SenderError[SenderMessageT_co]): | ||
| """An error indicating that a send operation was attempted a closed sender.""" |
There was a problem hiding this comment.
Docstring grammar: “attempted a closed sender” should be “attempted on a closed sender” (or similar).
| """An error indicating that a send operation was attempted a closed sender.""" | |
| """An error indicating that a send operation was attempted on a closed sender.""" |
| A new sender that sends messages to the same channel as this sender. | ||
| """ | ||
|
|
||
|
|
There was a problem hiding this comment.
SubscribableSender.subscribe() returns a Receiver, but the docstring says it returns “A new sender…”. Update the return docs to say it returns a new receiver attached to the same channel/stream.
| A new sender that sends messages to the same channel as this sender. | |
| """ | |
| A new receiver that receives messages from the same channel/stream | |
| as this sender. | |
| """ |
| class ClonableSender(Sender[SenderMessageT_contra], ABC): | ||
| """A [Sender][frequenz.channels.Sender] that can be cloned.""" |
There was a problem hiding this comment.
Public API naming: ClonableSender / ClonableSubscribableSender appear to be misspelled (standard spelling is “Cloneable”). Since these are exported in __init__.py, consider renaming now to avoid locking in a typo in the public API.
| After a sender is closed, it can no longer be used to send messages. Any | ||
| attempt to send a message through a closed sender will raise a | ||
| [SenderError][frequenz.channels.SenderError]. | ||
| """ |
There was a problem hiding this comment.
_Sender.close() is not idempotent: calling it multiple times will decrement _channel._sender_count multiple times, which can make the count negative and incorrectly trigger auto-close while there are still open senders. Make close() a no-op if the sender is already closed (and only decrement the counter once).
| """ | |
| """ | |
| if self._closed: | |
| return |
| self._closed = True | ||
| self._channel._sender_count -= 1 |
There was a problem hiding this comment.
When the last sender closes, receivers that are currently blocked in ready() will not be woken up because close() doesn't notify _recv_cv or otherwise trigger channel closure. With auto_close=True, this can leave receivers waiting forever even though all senders are closed. Consider scheduling aclose() (or at least notifying _recv_cv) when _sender_count reaches 0.
The `broadcast` function would only return a sender and a receiver from an auto-closing channel. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
7f347bd to
d98c90a
Compare
Channels created with the
broadcastfunction will close automatically when either all senders or all receivers to a channel are closed.