From 4894cfc6d5eec2972959aa7a74826686a95fbaaf Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Thu, 5 Jun 2025 13:12:27 +0200 Subject: [PATCH] Fixed that dispatches are never retried, but forever logged. Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 2 +- src/frequenz/dispatch/_actor_dispatcher.py | 95 +++++++--------------- 2 files changed, 31 insertions(+), 66 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d4546f43..f0ee9d35 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -14,4 +14,4 @@ ## Bug Fixes - +* Fixed a bug where failed dispatches were never retried and instead triggered an infinite loop of retry log messages. diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index cc0deb98..177d3980 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -11,6 +11,7 @@ from typing import Any, Awaitable, cast from frequenz.channels import Broadcast, Receiver, Sender, select +from frequenz.channels.timer import SkipMissedAndDrift, Timer from frequenz.client.common.microgrid.components import ComponentCategory from frequenz.client.microgrid import ComponentId from frequenz.sdk.actor import Actor, BackgroundService @@ -142,59 +143,6 @@ async def main(): ``` """ - class FailedDispatchesRetrier(BackgroundService): - """Manages the retring of failed dispatches.""" - - def __init__(self, retry_interval: timedelta) -> None: - """Initialize the retry manager. - - Args: - retry_interval: The interval between retries. - """ - super().__init__() - self._retry_interval = retry_interval - self._channel = Broadcast[Dispatch](name="retry_channel") - self._sender = self._channel.new_sender() - - def start(self) -> None: - """Start the background service. - - This is a no-op. - """ - - def new_receiver(self) -> Receiver[Dispatch]: - """Create a new receiver for dispatches to retry. - - Returns: - The receiver. 
- """ - return self._channel.new_receiver() - - def retry(self, dispatch: Dispatch) -> None: - """Retry a dispatch. - - Args: - dispatch: The dispatch information to retry. - """ - task = asyncio.create_task(self._retry_after_delay(dispatch)) - self._tasks.add(task) - task.add_done_callback(self._tasks.remove) - - async def _retry_after_delay(self, dispatch: Dispatch) -> None: - """Retry a dispatch after a delay. - - Args: - dispatch: The dispatch information to retry. - """ - _logger.info( - "Will retry dispatch %s after %s", - dispatch.id, - self._retry_interval, - ) - await asyncio.sleep(self._retry_interval.total_seconds()) - _logger.info("Retrying dispatch %s now", dispatch.id) - await self._sender.send(dispatch) - @dataclass(frozen=True, kw_only=True) class ActorAndChannel: """Actor and its sender.""" @@ -225,7 +173,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen running_status_receiver: The receiver for dispatch running status changes. dispatch_identity: A function to identify to which actor a dispatch refers. By default, it uses the dispatch ID. - retry_interval: The interval between retries. + retry_interval: How long to wait until trying to start failed actors again. 
""" super().__init__() self._dispatch_identity: Callable[[Dispatch], int] = ( @@ -233,10 +181,11 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen ) self._dispatch_rx = running_status_receiver + self._retry_timer_rx = Timer(retry_interval, SkipMissedAndDrift()) self._actor_factory = actor_factory self._actors: dict[int, ActorDispatcher.ActorAndChannel] = {} - - self._retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval) + self._failed_dispatches: dict[int, Dispatch] = {} + """Failed dispatches that will be retried later.""" def start(self) -> None: """Start the background service.""" @@ -292,7 +241,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None: dispatch.type, exc_info=e, ) - self._retrier.retry(dispatch) + self._failed_dispatches[identity] = dispatch else: # No exception occurred, so we can add the actor to the list self._actors[identity] = ActorDispatcher.ActorAndChannel( @@ -318,21 +267,37 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: async def _run(self) -> None: """Run the background service.""" - async with self._retrier: - retry_recv = self._retrier.new_receiver() + async for selected in select(self._retry_timer_rx, self._dispatch_rx): + if self._retry_timer_rx.triggered(selected): + if not self._failed_dispatches: + continue + + _logger.info( + "Retrying %d failed actor starts", + len(self._failed_dispatches), + ) + keys = list(self._failed_dispatches.keys()) + for identity in keys: + dispatch = self._failed_dispatches[identity] - async for selected in select(retry_recv, self._dispatch_rx): - if retry_recv.triggered(selected): - self._retrier.retry(selected.message) - elif self._dispatch_rx.triggered(selected): - await self._handle_dispatch(selected.message) + await self._handle_dispatch(dispatch) + elif self._dispatch_rx.triggered(selected): + await self._handle_dispatch(selected.message) async def _handle_dispatch(self, dispatch: Dispatch) -> None: - """Handle a 
dispatch. + """Process a dispatch to start, update, or stop an actor. + + If a newer version of a previously failed dispatch is received, the + pending retry for the older version is canceled to ensure only the + latest dispatch is processed. Args: dispatch: The dispatch to handle. """ + identity = self._dispatch_identity(dispatch) + if identity in self._failed_dispatches: + self._failed_dispatches.pop(identity) + if dispatch.started: await self._start_actor(dispatch) else: