From a0358ad56ed0940ce1a7aa69ca64f07e7e1ec109 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 14 Jan 2025 13:50:08 +0100 Subject: [PATCH 1/4] Update example to new code Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_managing_actor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frequenz/dispatch/_managing_actor.py b/src/frequenz/dispatch/_managing_actor.py index 6ed4e57..df86f87 100644 --- a/src/frequenz/dispatch/_managing_actor.py +++ b/src/frequenz/dispatch/_managing_actor.py @@ -91,7 +91,7 @@ async def run(): # Start actor and give it an dispatch updates channel receiver my_actor = MyActor(dispatch_updates_channel.new_receiver()) - status_receiver = dispatcher.running_status_change.new_receiver() + status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE") managing_actor = DispatchManagingActor( actor=my_actor, From cc6e85ed2b39203c289a8121d84a57865f3f0d3a Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 20 Jan 2025 19:00:52 +0100 Subject: [PATCH 2/4] Update component selector example fixes #68 Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_managing_actor.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/frequenz/dispatch/_managing_actor.py b/src/frequenz/dispatch/_managing_actor.py index df86f87..57da898 100644 --- a/src/frequenz/dispatch/_managing_actor.py +++ b/src/frequenz/dispatch/_managing_actor.py @@ -62,13 +62,15 @@ async def _run(self) -> None: def set_components(self, components: TargetComponents) -> None: match components: - case [int(), *_] as component_ids: - print("Dispatch: Setting components to %s", components) + case []: + print("Dispatch: Using all components") + case list() as ids if isinstance(ids[0], int): + component_ids = ids case [ComponentCategory.BATTERY, *_]: - print("Dispatch: Using all battery components") + component_category = ComponentCategory.BATTERY case unsupported: print( - "Dispatch: Requested an unsupported target component %r, " + "Dispatch: Requested an unsupported selector %r, " "but only component IDs or category BATTERY are supported.", unsupported, ) From 9d0ad74a6fac5dda6ba58bc1b9edaa3a96b28125 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 20 Jan 2025 19:06:39 +0100 Subject: [PATCH 3/4] Remove now unnessecary check & parameter Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_managing_actor.py | 8 -------- tests/test_mananging_actor.py | 1 - 2 files changed, 9 deletions(-) diff --git a/src/frequenz/dispatch/_managing_actor.py b/src/frequenz/dispatch/_managing_actor.py index 57da898..d158b12 100644 --- a/src/frequenz/dispatch/_managing_actor.py +++ b/src/frequenz/dispatch/_managing_actor.py @@ -97,7 +97,6 @@ async def run(): managing_actor = DispatchManagingActor( actor=my_actor, - dispatch_type="EXAMPLE", running_status_receiver=status_receiver, updates_sender=dispatch_updates_channel.new_sender(), ) @@ -109,7 +108,6 @@ async def run(): def __init__( self, actor: Actor | Set[Actor], - dispatch_type: str, running_status_receiver: Receiver[Dispatch], updates_sender: Sender[DispatchUpdate] | None = None, ) -> None: @@ -117,7 +115,6 @@ def __init__( Args: actor: A set of actors or a single actor to manage. - dispatch_type: The type of dispatches to handle. running_status_receiver: The receiver for dispatch running status changes. updates_sender: The sender for dispatch events """ @@ -126,7 +123,6 @@ def __init__( self._actors: frozenset[Actor] = frozenset( [actor] if isinstance(actor, Actor) else actor ) - self._dispatch_type = dispatch_type self._updates_sender = updates_sender def _start_actors(self) -> None: @@ -160,10 +156,6 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None: Args: dispatch: The dispatch to handle. """ - if dispatch.type != self._dispatch_type: - _logger.debug("Ignoring dispatch %s", dispatch.id) - return - if dispatch.started: if self._updates_sender is not None: _logger.info("Updated by dispatch %s", dispatch.id) diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index 07fda2f..662f4ff 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -72,7 +72,6 @@ async def test_env() -> AsyncIterator[TestEnv]: runner_actor = DispatchManagingActor( actor=actor, - dispatch_type="UNIT_TEST", running_status_receiver=channel.new_receiver(), updates_sender=updates_channel.new_sender(), ) From 2b7f7fd31eab6420b572ace980ad296831e8887f Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 20 Jan 2025 19:07:02 +0100 Subject: [PATCH 4/4] Make dispatch receiving less chatty/verbose Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 1 + src/frequenz/dispatch/_bg_service.py | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 75e865b..c5dd530 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -9,6 +9,7 @@ * Two properties have been replaced by methods that require a type as parameter. * `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`. * `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, unify_running_intervals: bool)`. +* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function. ## New Features diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index a546850..9e466a6 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -259,16 +259,18 @@ async def _fetch(self) -> None: self._dispatches[dispatch.id] = Dispatch(client_dispatch) old_dispatch = old_dispatches.pop(dispatch.id, None) if not old_dispatch: - _logger.info("New dispatch: %s", dispatch) + _logger.debug("New dispatch: %s", dispatch) await self._update_dispatch_schedule_and_notify(dispatch, None) await self._lifecycle_events_tx.send(Created(dispatch=dispatch)) elif dispatch.update_time != old_dispatch.update_time: - _logger.info("Updated dispatch: %s", dispatch) + _logger.debug("Updated dispatch: %s", dispatch) await self._update_dispatch_schedule_and_notify( dispatch, old_dispatch ) await self._lifecycle_events_tx.send(Updated(dispatch=dispatch)) + _logger.info("Received %s dispatches", len(self._dispatches)) + except grpc.aio.AioRpcError as error: _logger.error("Error fetching dispatches: %s", error) self._dispatches = old_dispatches