Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/frequenz/channels/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
"""

from ._anycast import Anycast
from ._broadcast import Broadcast
from ._broadcast import Broadcast, broadcast
from ._exceptions import ChannelClosedError, ChannelError, Error
from ._generic import (
ChannelMessageT,
Expand All @@ -92,6 +92,7 @@
)
from ._latest_value_cache import LatestValueCache
from ._merge import Merger, merge
from ._one_shot import oneshot
from ._receiver import Receiver, ReceiverError, ReceiverStoppedError
from ._select import (
Selected,
Expand All @@ -100,14 +101,23 @@
select,
selected_from,
)
from ._sender import Sender, SenderError
from ._sender import (
ClonableSender,
ClonableSubscribableSender,
Sender,
SenderClosedError,
SenderError,
SubscribableSender,
)

__all__ = [
"Anycast",
"Broadcast",
"ChannelClosedError",
"ChannelError",
"ChannelMessageT",
"ClonableSender",
"ClonableSubscribableSender",
"Error",
"ErroredChannelT_co",
"LatestValueCache",
Expand All @@ -120,11 +130,15 @@
"SelectError",
"Selected",
"Sender",
"SenderClosedError",
"SenderError",
"SenderMessageT_co",
"SenderMessageT_contra",
"SubscribableSender",
"UnhandledSelectedError",
"broadcast",
"merge",
"oneshot",
"select",
"selected_from",
]
19 changes: 18 additions & 1 deletion src/frequenz/channels/_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from ._exceptions import ChannelClosedError
from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderError
from ._sender import Sender, SenderClosedError, SenderError

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -327,6 +327,9 @@ def __init__(self, channel: Anycast[_T], /) -> None:
self._channel: Anycast[_T] = channel
"""The channel that this sender belongs to."""

self._closed: bool = False
"""Whether the sender is closed."""

@override
async def send(self, message: _T, /) -> None:
"""Send a message across the channel.
Expand All @@ -343,7 +346,11 @@ async def send(self, message: _T, /) -> None:
SenderError: If the underlying channel was closed.
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
set as the cause.
SenderClosedError: If this sender was closed.
"""
if self._closed:
raise SenderClosedError(self)

# pylint: disable=protected-access
if self._channel._closed:
raise SenderError("The channel was closed", self) from ChannelClosedError(
Expand All @@ -367,6 +374,16 @@ async def send(self, message: _T, /) -> None:
self._channel._recv_cv.notify(1)
# pylint: enable=protected-access

@override
def close(self) -> None:
"""Close this sender.

After closing, the sender will not be able to send any more messages. Any
attempt to send a message through a closed sender will raise a
[SenderError][frequenz.channels.SenderError].
"""
self._closed = True

def __str__(self) -> str:
"""Return a string representation of this sender."""
return f"{self._channel}:{type(self).__name__}"
Expand Down
104 changes: 99 additions & 5 deletions src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,51 @@
from ._exceptions import ChannelClosedError
from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderError
from ._sender import ClonableSubscribableSender, SenderClosedError, SenderError

_logger = logging.getLogger(__name__)


class Broadcast(Generic[ChannelMessageT]):
def broadcast(
message_type: type[ChannelMessageT], # pylint: disable=unused-argument
/,
*,
name: str,
resend_latest: bool = False,
) -> tuple[ClonableSubscribableSender[ChannelMessageT], Receiver[ChannelMessageT]]:
"""Create a new Broadcast channel and return a sender and a receiver attached to it.

The channel will be automatically closed when all senders or all receivers
are closed.

Args:
message_type: The type of messages that will be sent through this channel. This
is only for type checking purposes, it is not used at runtime.
name: The name of the channel. This is for logging purposes, and it will be
shown in the string representation of the channel.
resend_latest: When True, every time a new receiver is created with
`new_receiver`, the last message seen by the channel will be sent to the
new receiver automatically. This allows new receivers on slow streams to
get the latest message as soon as they are created, without having to
wait for the next message on the channel to arrive. It is safe to be
set in data/reporting channels, but is not recommended for use in
channels that stream control instructions.

Returns:
A tuple of a sender and a receiver attached to the created channel.
"""
channel = Broadcast[ChannelMessageT](
name=name, resend_latest=resend_latest, auto_close=True
)
return channel.new_sender(), channel.new_receiver()


@deprecated(
"Please use the `broadcast` function to create a Broadcast channel instead."
)
class Broadcast( # pylint: disable=too-many-instance-attributes
Generic[ChannelMessageT]
):
"""A channel that deliver all messages to all receivers.

# Description
Expand Down Expand Up @@ -184,7 +223,13 @@ async def main() -> None:
```
"""

def __init__(self, *, name: str, resend_latest: bool = False) -> None:
def __init__(
self,
*,
name: str,
resend_latest: bool = False,
auto_close: bool = False,
) -> None:
"""Initialize this channel.

Args:
Expand All @@ -197,6 +242,8 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None:
wait for the next message on the channel to arrive. It is safe to be
set in data/reporting channels, but is not recommended for use in
channels that stream control instructions.
auto_close: If True, the channel will be closed when all senders or all
receivers are closed.
"""
self._name: str = name
"""The name of the broadcast channel.
Expand All @@ -207,6 +254,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None:
self._recv_cv: Condition = Condition()
"""The condition to wait for data in the channel's buffer."""

self._sender_count: int = 0
"""The number of senders attached to this channel."""

self._receivers: dict[
int, weakref.ReferenceType[_Receiver[ChannelMessageT]]
] = {}
Expand All @@ -218,6 +268,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None:
self._latest: ChannelMessageT | None = None
"""The latest message sent to the channel."""

self._auto_close: bool = auto_close
"""Whether to close the channel when all senders or all receivers are closed."""

self.resend_latest: bool = resend_latest
"""Whether to resend the latest message to new receivers.

Expand Down Expand Up @@ -269,7 +322,7 @@ async def close(self) -> None: # noqa: D402
"""Close the channel, deprecated alias for `aclose()`.""" # noqa: D402
return await self.aclose()

def new_sender(self) -> Sender[ChannelMessageT]:
def new_sender(self) -> ClonableSubscribableSender[ChannelMessageT]:
"""Return a new sender attached to this channel."""
return _Sender(self)

Expand Down Expand Up @@ -317,7 +370,7 @@ def __repr__(self) -> str:
_T = TypeVar("_T")


class _Sender(Sender[_T]):
class _Sender(ClonableSubscribableSender[_T]):
"""A sender to send messages to the broadcast channel.

Should not be created directly, but through the
Expand All @@ -334,6 +387,11 @@ def __init__(self, channel: Broadcast[_T], /) -> None:
self._channel: Broadcast[_T] = channel
"""The broadcast channel this sender belongs to."""

self._closed: bool = False
"""Whether this sender is closed."""

self._channel._sender_count += 1

@override
async def send(self, message: _T, /) -> None:
"""Send a message to all broadcast receivers.
Expand All @@ -345,12 +403,22 @@ async def send(self, message: _T, /) -> None:
SenderError: If the underlying channel was closed.
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
set as the cause.
SenderClosedError: If this sender was closed.
"""
# pylint: disable=protected-access
if self._channel._closed:
raise SenderError("The channel was closed", self) from ChannelClosedError(
self._channel
)
if self._channel._auto_close and (
self._channel._sender_count == 0 or len(self._channel._receivers) == 0
):
await self._channel.aclose()
raise SenderError("The channel was closed", self) from ChannelClosedError(
Comment on lines +413 to +417
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In _Sender.send(), the auto-close branch runs before the _closed check. If a user calls send() on a closed sender, this can raise a channel-closed SenderError (and even close the whole channel) instead of the documented SenderClosedError. Check self._closed first to preserve the expected error semantics and avoid closing the channel due to a misuse of a closed sender.

Copilot uses AI. Check for mistakes.
self._channel
)
if self._closed:
raise SenderClosedError(self)
self._channel._latest = message
stale_refs = []
for _hash, recv_ref in self._channel._receivers.items():
Expand All @@ -365,6 +433,27 @@ async def send(self, message: _T, /) -> None:
self._channel._recv_cv.notify_all()
# pylint: enable=protected-access

@override
def close(self) -> None:
"""Close this sender.

After a sender is closed, it can no longer be used to send messages. Any
attempt to send a message through a closed sender will raise a
[SenderError][frequenz.channels.SenderError].
"""
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_Sender.close() is not idempotent: calling it multiple times will decrement _channel._sender_count multiple times, which can make the count negative and incorrectly trigger auto-close while there are still open senders. Make close() a no-op if the sender is already closed (and only decrement the counter once).

Suggested change
"""
"""
if self._closed:
return

Copilot uses AI. Check for mistakes.
self._closed = True
self._channel._sender_count -= 1
Comment on lines +444 to +445
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the last sender closes, receivers that are currently blocked in ready() will not be woken up because close() doesn't notify _recv_cv or otherwise trigger channel closure. With auto_close=True, this can leave receivers waiting forever even though all senders are closed. Consider scheduling aclose() (or at least notifying _recv_cv) when _sender_count reaches 0.

Copilot uses AI. Check for mistakes.

@override
def clone(self) -> _Sender[_T]:
"""Return a clone of this sender."""
return _Sender(self._channel)

@override
def subscribe(self) -> Receiver[_T]:
"""Return a new receiver attached to this sender's channel."""
return self._channel.new_receiver()

def __str__(self) -> str:
"""Return a string representation of this sender."""
return f"{self._channel}:{type(self).__name__}"
Expand Down Expand Up @@ -476,6 +565,11 @@ async def ready(self) -> bool:
while len(self._q) == 0:
if self._channel._closed or self._closed:
return False
if self._channel._auto_close and (
self._channel._sender_count == 0 or len(self._channel._receivers) == 0
):
await self._channel.aclose()
return False
async with self._channel._recv_cv:
await self._channel._recv_cv.wait()
return True
Expand Down
90 changes: 90 additions & 0 deletions src/frequenz/channels/_one_shot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH

"""A channel that can send a single message."""

import typing
from asyncio import Condition

from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderClosedError


def oneshot(
message_type: type[ChannelMessageT], # pylint: disable=unused-argument
) -> tuple[Sender[ChannelMessageT], Receiver[ChannelMessageT]]:
"""Create a one-shot channel.

A one-shot channel is a channel that can only send one message. After the first
message is sent, the sender is closed and any further attempts to send a message
will raise a `SenderClosedError`.

Args:
message_type: The type of messages that can be sent through this channel.

Returns:
A tuple of a sender and a receiver for this channel.
"""
channel = _OneShot[ChannelMessageT]()
return _OneShotSender(channel), _OneShotReceiver(channel)


class _Empty:
pass


_EMPTY = _Empty()


class _OneShot(typing.Generic[ChannelMessageT]):
"""A one-shot channel.

A one-shot channel is a channel that can only send one message. After the first
message is sent, the sender is closed and any further attempts to send a message
will raise a `SenderClosedError`.
"""

def __init__(self) -> None:
"""Create a new one-shot channel."""
self.message: ChannelMessageT | _Empty = _EMPTY
self.sent = False
self.drained = False
self.condition = Condition()


class _OneShotSender(Sender[ChannelMessageT]):
def __init__(self, channel: _OneShot[ChannelMessageT]) -> None:
self._channel = channel

async def send(self, message: ChannelMessageT, /) -> None:
if self._channel.sent:
raise SenderClosedError(self)
self._channel.message = message
self._channel.sent = True
if self._channel.condition.locked():
self._channel.condition.notify()

def close(self) -> None:
self._channel.sent = True


class _OneShotReceiver(Receiver[ChannelMessageT]):
def __init__(self, channel: _OneShot[ChannelMessageT]) -> None:
self._channel = channel

async def ready(self) -> bool:
while not self._channel.sent:
await self._channel.condition.wait()
if self._channel.drained:
return False
return True

def consume(self) -> ChannelMessageT:
if self._channel.drained:
raise ReceiverStoppedError(self)
if isinstance(self._channel.message, _Empty):
raise ReceiverStoppedError(self)

self._channel.drained = True
return self._channel.message
Loading
Loading