Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
* Fixed that dispatches are never retried on failure, but instead an infinite loop of retry logs is triggered.
95 changes: 30 additions & 65 deletions src/frequenz/dispatch/_actor_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Any, Awaitable, cast

from frequenz.channels import Broadcast, Receiver, Sender, select
from frequenz.channels.timer import SkipMissedAndDrift, Timer
from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.client.microgrid import ComponentId
from frequenz.sdk.actor import Actor, BackgroundService
Expand Down Expand Up @@ -142,59 +143,6 @@ async def main():
```
"""

class FailedDispatchesRetrier(BackgroundService):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So satisfying to see this class go away... 🤣

"""Manages the retring of failed dispatches."""

def __init__(self, retry_interval: timedelta) -> None:
"""Initialize the retry manager.

Args:
retry_interval: The interval between retries.
"""
super().__init__()
self._retry_interval = retry_interval
self._channel = Broadcast[Dispatch](name="retry_channel")
self._sender = self._channel.new_sender()

def start(self) -> None:
"""Start the background service.

This is a no-op.
"""

def new_receiver(self) -> Receiver[Dispatch]:
"""Create a new receiver for dispatches to retry.

Returns:
The receiver.
"""
return self._channel.new_receiver()

def retry(self, dispatch: Dispatch) -> None:
"""Retry a dispatch.

Args:
dispatch: The dispatch information to retry.
"""
task = asyncio.create_task(self._retry_after_delay(dispatch))
self._tasks.add(task)
task.add_done_callback(self._tasks.remove)

async def _retry_after_delay(self, dispatch: Dispatch) -> None:
"""Retry a dispatch after a delay.

Args:
dispatch: The dispatch information to retry.
"""
_logger.info(
"Will retry dispatch %s after %s",
dispatch.id,
self._retry_interval,
)
await asyncio.sleep(self._retry_interval.total_seconds())
_logger.info("Retrying dispatch %s now", dispatch.id)
await self._sender.send(dispatch)

@dataclass(frozen=True, kw_only=True)
class ActorAndChannel:
"""Actor and its sender."""
Expand Down Expand Up @@ -225,18 +173,19 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
running_status_receiver: The receiver for dispatch running status changes.
dispatch_identity: A function to identify to which actor a dispatch refers.
By default, it uses the dispatch ID.
retry_interval: The interval between retries.
retry_interval: How long to wait until trying to start failed actors again.
"""
super().__init__()
self._dispatch_identity: Callable[[Dispatch], int] = (
dispatch_identity if dispatch_identity else lambda d: d.id
)

self._dispatch_rx = running_status_receiver
self._retry_timer_rx = Timer(retry_interval, SkipMissedAndDrift())
self._actor_factory = actor_factory
self._actors: dict[int, ActorDispatcher.ActorAndChannel] = {}

self._retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval)
self._failed_dispatches: dict[int, Dispatch] = {}
"""Failed dispatches that will be retried later."""

def start(self) -> None:
"""Start the background service."""
Expand Down Expand Up @@ -292,7 +241,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
dispatch.type,
exc_info=e,
)
self._retrier.retry(dispatch)
self._failed_dispatches[identity] = dispatch
else:
# No exception occurred, so we can add the actor to the list
self._actors[identity] = ActorDispatcher.ActorAndChannel(
Expand All @@ -318,21 +267,37 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:

async def _run(self) -> None:
"""Run the background service."""
async with self._retrier:
retry_recv = self._retrier.new_receiver()
async for selected in select(self._retry_timer_rx, self._dispatch_rx):
if self._retry_timer_rx.triggered(selected):
if not self._failed_dispatches:
continue

_logger.info(
"Retrying %d failed actor starts",
len(self._failed_dispatches),
)
keys = list(self._failed_dispatches.keys())
for identity in keys:
dispatch = self._failed_dispatches[identity]

async for selected in select(retry_recv, self._dispatch_rx):
if retry_recv.triggered(selected):
self._retrier.retry(selected.message)
elif self._dispatch_rx.triggered(selected):
await self._handle_dispatch(selected.message)
await self._handle_dispatch(dispatch)
elif self._dispatch_rx.triggered(selected):
await self._handle_dispatch(selected.message)

async def _handle_dispatch(self, dispatch: Dispatch) -> None:
"""Handle a dispatch.
"""Process a dispatch to start, update, or stop an actor.

If a newer version of a previously failed dispatch is received, the
pending retry for the older version is canceled to ensure only the
latest dispatch is processed.

Args:
dispatch: The dispatch to handle.
"""
identity = self._dispatch_identity(dispatch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it would be good to put a comment here about what you replied to Ela (or in the docstring), I agree at first sight it might look not super obvious wht removing dispatches from the pending list here.

if identity in self._failed_dispatches:
self._failed_dispatches.pop(identity)

if dispatch.started:
await self._start_actor(dispatch)
else:
Expand Down
Loading