From 31f6935cbf6e5d2d42b2890c6773dcb8aa26f54d Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 5 May 2025 08:38:55 +0000 Subject: [PATCH] Disconnect dispatcher at cleanup Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 1 + src/frequenz/dispatch/_dispatcher.py | 16 +++++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 027b9a83..30b97895 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -15,3 +15,4 @@ ## Bug Fixes * Fixes reconnecting after connection loss for streams +* Fixed an issue in the `Dispatcher` class where the client connection was not properly disconnected during cleanup, potentially causing unclosed socket errors. diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index f8d1cef3..c18fd6a7 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -220,12 +220,13 @@ def __init__( ) self._actor_dispatchers: dict[str, ActorDispatcher] = {} self._empty_event = Event() - self._empty_event.set() + self._disconnecting_future: asyncio.Future[None] | None = None @override def start(self) -> None: """Start the local dispatch service.""" self._bg_service.start() + self._empty_event.set() @property @override @@ -235,19 +236,23 @@ def is_running(self) -> bool: @override async def wait(self) -> None: - """Wait until all actor dispatches are stopped.""" - await asyncio.gather(self._bg_service.wait(), self._empty_event.wait()) + """Wait until all actor dispatches are stopped and client is disconnected.""" + if self._disconnecting_future is not None: + await self._disconnecting_future + await asyncio.gather(self._bg_service.wait(), self._empty_event.wait()) self._actor_dispatchers.clear() - @override def cancel(self, msg: str | None = None) -> None: - """Stop the local dispatch service.""" + """Stop the local dispatch service and initiate client disconnection.""" self._bg_service.cancel(msg) for instance in self._actor_dispatchers.values(): instance.cancel() + # Initiate client disconnection asynchronously + self._disconnecting_future = asyncio.ensure_future(self._client.disconnect()) + async def wait_for_initialization(self) -> None: """Wait until the background service is initialized.""" await self._bg_service.wait_for_initialization() @@ -358,6 +363,7 @@ async def __aenter__(self) -> Self: This background service. """ await super().__aenter__() + await self._client.__aenter__() await self.wait_for_initialization() return self