Skip to content

Automatically closing broadcast channels#494

Open
shsms wants to merge 7 commits intofrequenz-floss:v1.x.xfrom
shsms:auto-close-channels
Open

Automatically closing broadcast channels#494
shsms wants to merge 7 commits intofrequenz-floss:v1.x.xfrom
shsms:auto-close-channels

Conversation

@shsms
Copy link
Contributor

@shsms shsms commented Feb 9, 2026

Channels created with the broadcast function will close automatically when either all senders or all receivers to a channel are closed.

shsms added 4 commits February 9, 2026 12:21
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
@shsms shsms requested a review from a team as a code owner February 9, 2026 16:00
@shsms shsms requested review from Marenz and Copilot and removed request for a team February 9, 2026 16:00
@github-actions github-actions bot added part:tests Affects the unit, integration and performance (benchmarks) tests part:channels Affects channels implementation part:core Affects the core types (`Sender`, `Receiver`, exceptions, etc.) part:experimental Affects the experimental package labels Feb 9, 2026
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an auto-closing mode for Broadcast channels via a new broadcast() factory, aiming to close the underlying channel automatically once all senders or all receivers are closed.

Changes:

  • Introduce broadcast() factory that creates a Broadcast with auto_close=True.
  • Add a close() method to the Sender interface and implement “closed sender” behavior (incl. SenderClosedError) in Anycast/Broadcast senders.
  • Add tests asserting Broadcast auto-close behavior for “all receivers closed” and “all senders closed”.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
tests/test_broadcast.py Adds coverage for auto-close semantics using the new broadcast() factory.
src/frequenz/channels/experimental/_relay_sender.py Implements close() by forwarding close to underlying senders.
src/frequenz/channels/_sender.py Extends sender API with close(), adds SenderClosedError, and introduces clone/subscribe sender ABCs.
src/frequenz/channels/_broadcast.py Adds broadcast() factory and implements sender/receiver-driven auto-close logic in Broadcast.
src/frequenz/channels/_anycast.py Adds sender close-state tracking and close() implementation; raises SenderClosedError on use-after-close.
src/frequenz/channels/init.py Exposes broadcast() and new sender-related types/errors from the public package API.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +410 to +417
if self._channel._auto_close and (
self._channel._sender_count == 0 or len(self._channel._receivers) == 0
):
await self._channel.aclose()
raise SenderError("The channel was closed", self) from ChannelClosedError(
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In _Sender.send(), the auto-close branch runs before the _closed check. If a user calls send() on a closed sender, this can raise a channel-closed SenderError (and even close the whole channel) instead of the documented SenderClosedError. Check self._closed first to preserve the expected error semantics and avoid closing the channel due to a misuse of a closed sender.

Copilot uses AI. Check for mistakes.


class SenderClosedError(SenderError[SenderMessageT_co]):
"""An error indicating that a send operation was attempted a closed sender."""
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring grammar: “attempted a closed sender” should be “attempted on a closed sender” (or similar).

Suggested change
"""An error indicating that a send operation was attempted a closed sender."""
"""An error indicating that a send operation was attempted on a closed sender."""

Copilot uses AI. Check for mistakes.
Comment on lines +125 to +128
A new sender that sends messages to the same channel as this sender.
"""


Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SubscribableSender.subscribe() returns a Receiver, but the docstring says it returns “A new sender…”. Update the return docs to say it returns a new receiver attached to the same channel/stream.

Suggested change
A new sender that sends messages to the same channel as this sender.
"""
A new receiver that receives messages from the same channel/stream
as this sender.
"""

Copilot uses AI. Check for mistakes.
Comment on lines +129 to +130
class ClonableSender(Sender[SenderMessageT_contra], ABC):
"""A [Sender][frequenz.channels.Sender] that can be cloned."""
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Public API naming: ClonableSender / ClonableSubscribableSender appear to be misspelled (standard spelling is “Cloneable”). Since these are exported in __init__.py, consider renaming now to avoid locking in a typo in the public API.

Copilot uses AI. Check for mistakes.
After a sender is closed, it can no longer be used to send messages. Any
attempt to send a message through a closed sender will raise a
[SenderError][frequenz.channels.SenderError].
"""
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_Sender.close() is not idempotent: calling it multiple times will decrement _channel._sender_count multiple times, which can make the count negative and incorrectly trigger auto-close while there are still open senders. Make close() a no-op if the sender is already closed (and only decrement the counter once).

Suggested change
"""
"""
if self._closed:
return

Copilot uses AI. Check for mistakes.
Comment on lines +441 to +445
self._closed = True
self._channel._sender_count -= 1
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the last sender closes, receivers that are currently blocked in ready() will not be woken up because close() doesn't notify _recv_cv or otherwise trigger channel closure. With auto_close=True, this can leave receivers waiting forever even though all senders are closed. Consider scheduling aclose() (or at least notifying _recv_cv) when _sender_count reaches 0.

Copilot uses AI. Check for mistakes.
shsms added 3 commits February 9, 2026 18:02
The `broadcast` function would only return a sender and a receiver
from an auto-closing channel.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
@shsms shsms force-pushed the auto-close-channels branch from 7f347bd to d98c90a Compare February 9, 2026 17:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

part:channels Affects channels implementation part:core Affects the core types (`Sender`, `Receiver`, exceptions, etc.) part:experimental Affects the experimental package part:tests Affects the unit, integration and performance (benchmarks) tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant