From 3df0245e526eebe8393cb1827ab3ca584e6a3f24 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 8 Jan 2025 13:41:03 +0100 Subject: [PATCH 1/4] Re-create actors on dispatch updates This commit modifies the `DispatchManagingActor` to re-create the actor when a new dispatch is received and the actor should start running instead of just start/stop the actor. To create the actor a factory function is used, which passes the initial dispatch information to the actor, so it can be properly initialized instead of having the initialization in 2 steps, the creation and the receiving of the dispatch update. Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_managing_actor.py | 170 ++++++++++++++--------- tests/test_mananging_actor.py | 73 +++++++--- 2 files changed, 156 insertions(+), 87 deletions(-) diff --git a/src/frequenz/dispatch/_managing_actor.py b/src/frequenz/dispatch/_managing_actor.py index d158b12..d0e4a98 100644 --- a/src/frequenz/dispatch/_managing_actor.py +++ b/src/frequenz/dispatch/_managing_actor.py @@ -3,13 +3,15 @@ """Helper class to manage actors based on dispatches.""" +import asyncio import logging +from collections.abc import Callable from dataclasses import dataclass -from typing import Any, Set +from typing import Any -from frequenz.channels import Receiver, Sender +from frequenz.channels import Broadcast, Receiver from frequenz.client.dispatch.types import TargetComponents -from frequenz.sdk.actor import Actor +from frequenz.sdk.actor import Actor, BackgroundService from ._dispatch import Dispatch @@ -30,7 +32,7 @@ class DispatchUpdate: """Additional options.""" -class DispatchManagingActor(Actor): +class DispatchManagingActor(BackgroundService): """Helper class to manage actors based on dispatches. Example usage: @@ -38,30 +40,63 @@ class DispatchManagingActor(Actor): ```python import os import asyncio + from typing import override from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate from frequenz.client.dispatch.types import TargetComponents from frequenz.client.common.microgrid.components import ComponentCategory - - from frequenz.channels import Receiver, Broadcast + from frequenz.channels import Receiver, Broadcast, select, selected_from + from frequenz.sdk.actor import Actor, run class MyActor(Actor): - def __init__(self, updates_channel: Receiver[DispatchUpdate]): - super().__init__() - self._updates_channel = updates_channel - self._dry_run: bool - self._options : dict[str, Any] - + def __init__( + self, + *, + name: str | None = None, + ) -> None: + super().__init__(name=name) + self._dispatch_updates_receiver: Receiver[DispatchUpdate] | None = None + self._dry_run: bool = False + self._options: dict[str, Any] = {} + + @classmethod + def new_with_dispatch( + cls, + initial_dispatch: DispatchUpdate, + dispatch_updates_receiver: Receiver[DispatchUpdate], + *, + name: str | None = None, + ) -> "Self": + self = cls(name=name) + self._dispatch_updates_receiver = dispatch_updates_receiver + self._update_dispatch_information(initial_dispatch) + return self + + @override async def _run(self) -> None: - while True: - update = await self._updates_channel.receive() - print("Received update:", update) - - self.set_components(update.components) - self._dry_run = update.dry_run - self._options = update.options + other_recv: Receiver[Any] = ... - def set_components(self, components: TargetComponents) -> None: - match components: + if self._dispatch_updates_receiver is None: + async for msg in other_recv: + # do stuff + ... + else: + await self._run_with_dispatch(other_recv) + + async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None: + async for selected in select(self._dispatch_updates_receiver, other_recv): + if selected_from(selected, self._dispatch_updates_receiver): + self._update_dispatch_information(selected.message) + elif selected_from(selected, other_recv): + # do stuff + ... + else: + assert False, f"Unexpected selected receiver: {selected}" + + def _update_dispatch_information(self, dispatch_update: DispatchUpdate) -> None: + print("Received update:", dispatch_update) + self._dry_run = dispatch_update.dry_run + self._options = dispatch_update.options + match dispatch_update.components: case []: print("Dispatch: Using all components") case list() as ids if isinstance(ids[0], int): @@ -75,7 +110,7 @@ def set_components(self, components: TargetComponents) -> None: unsupported, ) - async def run(): + async def main(): url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051") key = os.getenv("DISPATCH_API_KEY", "some-key") @@ -86,64 +121,83 @@ async def run(): server_url=url, key=key ) - - # Create update channel to receive dispatch update events pre-start and mid-run - dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel") - - # Start actor and give it an dispatch updates channel receiver - my_actor = MyActor(dispatch_updates_channel.new_receiver()) + dispatcher.start() status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE") managing_actor = DispatchManagingActor( - actor=my_actor, + actor_factory=MyActor.new_with_dispatch, running_status_receiver=status_receiver, - updates_sender=dispatch_updates_channel.new_sender(), ) - await asyncio.gather(dispatcher.start(), managing_actor.start()) + await run(managing_actor) ``` """ def __init__( self, - actor: Actor | Set[Actor], + actor_factory: Callable[[DispatchUpdate, Receiver[DispatchUpdate]], Actor], running_status_receiver: Receiver[Dispatch], - updates_sender: Sender[DispatchUpdate] | None = None, ) -> None: """Initialize the dispatch handler. Args: - actor: A set of actors or a single actor to manage. + actor_factory: A callable that creates an actor with some initial dispatch + information. running_status_receiver: The receiver for dispatch running status changes. - updates_sender: The sender for dispatch events """ super().__init__() self._dispatch_rx = running_status_receiver - self._actors: frozenset[Actor] = frozenset( - [actor] if isinstance(actor, Actor) else actor + self._actor_factory = actor_factory + self._actor: Actor | None = None + self._updates_channel = Broadcast[DispatchUpdate]( + name="dispatch_updates_channel", resend_latest=True ) - self._updates_sender = updates_sender + self._updates_sender = self._updates_channel.new_sender() + + def start(self) -> None: + """Start the background service.""" + self._tasks.add(asyncio.create_task(self._run())) - def _start_actors(self) -> None: + async def _start_actor(self, dispatch: Dispatch) -> None: """Start all actors.""" - for actor in self._actors: - if actor.is_running: - _logger.warning("Actor %s is already running", actor.name) - else: - actor.start() + dispatch_update = DispatchUpdate( + components=dispatch.target, + dry_run=dispatch.dry_run, + options=dispatch.payload, + ) - async def _stop_actors(self, msg: str) -> None: + if self._actor: + sent_str = "" + if self._updates_sender is not None: + sent_str = ", sent a dispatch update instead of creating a new actor" + await self._updates_sender.send(dispatch_update) + _logger.warning( + "Actor for dispatch type %r is already running%s", + dispatch.type, + sent_str, + ) + else: + _logger.info("Starting actor for dispatch type %r", dispatch.type) + self._actor = self._actor_factory( + dispatch_update, self._updates_channel.new_receiver() + ) + self._actor.start() + + async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: """Stop all actors. Args: + stopping_dispatch: The dispatch that is stopping the actor. msg: The message to be passed to the actors being stopped. """ - for actor in self._actors: - if actor.is_running: - await actor.stop(msg) - else: - _logger.warning("Actor %s is not running", actor.name) + if self._actor is None: + _logger.warning( + "Actor for dispatch type %r is not running", stopping_dispatch.type + ) + else: + await self._actor.stop(msg) + self._actor = None async def _run(self) -> None: """Wait for dispatches and handle them.""" @@ -157,18 +211,6 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None: dispatch: The dispatch to handle. """ if dispatch.started: - if self._updates_sender is not None: - _logger.info("Updated by dispatch %s", dispatch.id) - await self._updates_sender.send( - DispatchUpdate( - components=dispatch.target, - dry_run=dispatch.dry_run, - options=dispatch.payload, - ) - ) - - _logger.info("Started by dispatch %s", dispatch.id) - self._start_actors() + await self._start_actor(dispatch) else: - _logger.info("Stopped by dispatch %s", dispatch.id) - await self._stop_actors("Dispatch stopped") + await self._stop_actor(dispatch, "Dispatch stopped") diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index 662f4ff..af3af69 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -5,9 +5,10 @@ import asyncio import heapq +import logging from dataclasses import dataclass, replace from datetime import datetime, timedelta, timezone -from typing import AsyncIterator, Iterator +from typing import AsyncIterator, Iterator, cast import async_solipsism import time_machine @@ -46,6 +47,15 @@ def _now() -> datetime: class MockActor(Actor): """Mock actor for testing.""" + def __init__( + self, initial_dispatch: DispatchUpdate, receiver: Receiver[DispatchUpdate] + ) -> None: + """Initialize the actor.""" + super().__init__(name="MockActor") + logging.info("MockActor created") + self.initial_dispatch = initial_dispatch + self.receiver = receiver + async def _run(self) -> None: while True: await asyncio.sleep(1) @@ -55,39 +65,45 @@ async def _run(self) -> None: class TestEnv: """Test environment.""" - actor: Actor - runner_actor: DispatchManagingActor + actors_service: DispatchManagingActor running_status_sender: Sender[Dispatch] - updates_receiver: Receiver[DispatchUpdate] generator: DispatchGenerator = DispatchGenerator() + @property + def actor(self) -> MockActor | None: + """Return the actor.""" + # pylint: disable=protected-access + if self.actors_service._actor is None: + return None + return cast(MockActor, self.actors_service._actor) + # pylint: enable=protected-access + + @property + def updates_receiver(self) -> Receiver[DispatchUpdate]: + """Return the updates receiver.""" + assert self.actor is not None + return self.actor.receiver + @fixture async def test_env() -> AsyncIterator[TestEnv]: """Create a test environment.""" channel = Broadcast[Dispatch](name="dispatch ready test channel") - updates_channel = Broadcast[DispatchUpdate](name="dispatch update test channel") - actor = MockActor() - - runner_actor = DispatchManagingActor( - actor=actor, + actors_service = DispatchManagingActor( + actor_factory=MockActor, running_status_receiver=channel.new_receiver(), - updates_sender=updates_channel.new_sender(), ) - # pylint: disable=protected-access - runner_actor._restart_limit = 0 - runner_actor.start() + actors_service.start() + await asyncio.sleep(1) yield TestEnv( - actor=actor, - runner_actor=runner_actor, + actors_service=actors_service, running_status_sender=channel.new_sender(), - updates_receiver=updates_channel.new_receiver(), ) - await runner_actor.stop() + await actors_service.stop() async def test_simple_start_stop( @@ -112,23 +128,31 @@ async def test_simple_start_stop( ), ) + # Send status update to start actor, expect no DispatchUpdate for the start await test_env.running_status_sender.send(Dispatch(dispatch)) fake_time.shift(timedelta(seconds=1)) + await asyncio.sleep(1) + await asyncio.sleep(1) + logging.info("Sent dispatch") - event = await test_env.updates_receiver.receive() + assert test_env.actor is not None + event = test_env.actor.initial_dispatch assert event.options == {"test": True} assert event.components == dispatch.target assert event.dry_run is False + logging.info("Received dispatch") + + assert test_env.actor is not None assert test_env.actor.is_running is True fake_time.shift(duration) await test_env.running_status_sender.send(Dispatch(dispatch)) # Give await actor.stop a chance to run in DispatchManagingActor - await asyncio.sleep(0.1) + await asyncio.sleep(1) - assert test_env.actor.is_running is False + assert test_env.actor is None def test_heapq_dispatch_compare(test_env: TestEnv) -> None: @@ -198,12 +222,15 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) - await test_env.running_status_sender.send(Dispatch(dispatch)) fake_time.shift(timedelta(seconds=1)) + await asyncio.sleep(1) - event = await test_env.updates_receiver.receive() + assert test_env.actor is not None + event = test_env.actor.initial_dispatch assert event.dry_run is dispatch.dry_run assert event.components == dispatch.target assert event.options == dispatch.payload + assert test_env.actor is not None assert test_env.actor.is_running is True assert dispatch.duration is not None @@ -211,6 +238,6 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) - await test_env.running_status_sender.send(Dispatch(dispatch)) # Give await actor.stop a chance to run in DispatchManagingActor - await asyncio.sleep(0.1) + await asyncio.sleep(1) - assert test_env.actor.is_running is False + assert test_env.actor is None From db533d1c396ec8927ddc222d38b2036db8f71c43 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 27 Jan 2025 18:39:47 +0100 Subject: [PATCH 2/4] Rename DispatchManagingActor to ActorDispatcher Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/__init__.py | 8 ++++---- .../{_managing_actor.py => _actor_dispatcher.py} | 2 +- tests/test_mananging_actor.py | 10 +++++----- 3 files changed, 10 insertions(+), 10 deletions(-) rename src/frequenz/dispatch/{_managing_actor.py => _actor_dispatcher.py} (99%) diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index b2d25d4..db44974 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -7,18 +7,18 @@ * [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API. * [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality. -* [DispatchManagingActor][frequenz.dispatch.DispatchManagingActor]: An actor to - manage other actors based on incoming dispatches. +* [ActorDispatcher][frequenz.dispatch.ActorDispatcher]: A service to manage other actors based on + incoming dispatches. * [Created][frequenz.dispatch.Created], [Updated][frequenz.dispatch.Updated], [Deleted][frequenz.dispatch.Deleted]: Dispatch event types. """ +from ._actor_dispatcher import ActorDispatcher, DispatchUpdate from ._dispatch import Dispatch from ._dispatcher import Dispatcher from ._event import Created, Deleted, DispatchEvent, Updated -from ._managing_actor import DispatchManagingActor, DispatchUpdate __all__ = [ "Created", @@ -27,6 +27,6 @@ "Dispatcher", "Updated", "Dispatch", - "DispatchManagingActor", + "ActorDispatcher", "DispatchUpdate", ] diff --git a/src/frequenz/dispatch/_managing_actor.py b/src/frequenz/dispatch/_actor_dispatcher.py similarity index 99% rename from src/frequenz/dispatch/_managing_actor.py rename to src/frequenz/dispatch/_actor_dispatcher.py index d0e4a98..fb62d41 100644 --- a/src/frequenz/dispatch/_managing_actor.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -32,7 +32,7 @@ class DispatchUpdate: """Additional options.""" -class DispatchManagingActor(BackgroundService): +class ActorDispatcher(BackgroundService): """Helper class to manage actors based on dispatches. Example usage: diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index af3af69..17a183e 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -18,7 +18,7 @@ from frequenz.sdk.actor import Actor from pytest import fixture -from frequenz.dispatch import Dispatch, DispatchManagingActor, DispatchUpdate +from frequenz.dispatch import ActorDispatcher, Dispatch, DispatchUpdate from frequenz.dispatch._bg_service import DispatchScheduler @@ -65,7 +65,7 @@ async def _run(self) -> None: class TestEnv: """Test environment.""" - actors_service: DispatchManagingActor + actors_service: ActorDispatcher running_status_sender: Sender[Dispatch] generator: DispatchGenerator = DispatchGenerator() @@ -90,7 +90,7 @@ async def test_env() -> AsyncIterator[TestEnv]: """Create a test environment.""" channel = Broadcast[Dispatch](name="dispatch ready test channel") - actors_service = DispatchManagingActor( + actors_service = ActorDispatcher( actor_factory=MockActor, running_status_receiver=channel.new_receiver(), ) @@ -149,7 +149,7 @@ async def test_simple_start_stop( fake_time.shift(duration) await test_env.running_status_sender.send(Dispatch(dispatch)) - # Give await actor.stop a chance to run in DispatchManagingActor + # Give await actor.stop a chance to run await asyncio.sleep(1) assert test_env.actor is None @@ -237,7 +237,7 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) - fake_time.shift(dispatch.duration) await test_env.running_status_sender.send(Dispatch(dispatch)) - # Give await actor.stop a chance to run in DispatchManagingActor + # Give await actor.stop a chance to run await asyncio.sleep(1) assert test_env.actor is None From c03c594a391dd2b37796f4ccd1b9408305d873f9 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 27 Jan 2025 18:51:53 +0100 Subject: [PATCH 3/4] Update release notes Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index bf30540..6058c97 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -2,7 +2,7 @@ ## Summary - +This release introduces a more flexible and powerful mechanism for managing dispatch events with new strategies for merging intervals, enhanced customization options, and better overall alignment with evolving SDK dependencies. It also simplifies actor initialization while maintaining robust support for diverse dispatch scenarios. ## Upgrading @@ -10,13 +10,12 @@ * `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`. * `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, unify_running_intervals: bool)`. * The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function. +* The `DispatchManagingActor` class has been renamed to `DispatchActorsService`. + * It's interface has been simplified and now only requires an actor factory and a running status receiver. + * It only supports a single actor at a time now. + * Refer to the updated [usage example](https://frequenz-floss.github.io/frequenz-dispatch-python/latest/reference/frequenz/dispatch/#frequenz.dispatch.DispatchActorsService) for more information. ## New Features -* A new feature "unify running intervals" has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00. - * The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600. -## Bug Fixes - - From d8d3c5af0b472f0047fd84f5561148a6a2389cb9 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 4 Feb 2025 12:19:52 +0100 Subject: [PATCH 4/4] Rename `DispatchUpdate` to `DispatchInfo` Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 1 + src/frequenz/dispatch/__init__.py | 4 ++-- src/frequenz/dispatch/_actor_dispatcher.py | 18 +++++++++--------- tests/test_mananging_actor.py | 8 ++++---- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 6058c97..dd74f73 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -14,6 +14,7 @@ This release introduces a more flexible and powerful mechanism for managing disp * It's interface has been simplified and now only requires an actor factory and a running status receiver. * It only supports a single actor at a time now. * Refer to the updated [usage example](https://frequenz-floss.github.io/frequenz-dispatch-python/latest/reference/frequenz/dispatch/#frequenz.dispatch.DispatchActorsService) for more information. +* `DispatchUpdate` was renamed to `DispatchInfo`. ## New Features diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index db44974..fe6a504 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -15,7 +15,7 @@ """ -from ._actor_dispatcher import ActorDispatcher, DispatchUpdate +from ._actor_dispatcher import ActorDispatcher, DispatchInfo from ._dispatch import Dispatch from ._dispatcher import Dispatcher from ._event import Created, Deleted, DispatchEvent, Updated @@ -28,5 +28,5 @@ "Updated", "Dispatch", "ActorDispatcher", - "DispatchUpdate", + "DispatchInfo", ] diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index fb62d41..6262f3a 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -19,7 +19,7 @@ @dataclass(frozen=True, kw_only=True) -class DispatchUpdate: +class DispatchInfo: """Event emitted when the dispatch changes.""" components: TargetComponents @@ -41,7 +41,7 @@ class ActorDispatcher(BackgroundService): import os import asyncio from typing import override - from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate + from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchInfo from frequenz.client.dispatch.types import TargetComponents from frequenz.client.common.microgrid.components import ComponentCategory from frequenz.channels import Receiver, Broadcast, select, selected_from @@ -54,15 +54,15 @@ def __init__( name: str | None = None, ) -> None: super().__init__(name=name) - self._dispatch_updates_receiver: Receiver[DispatchUpdate] | None = None + self._dispatch_updates_receiver: Receiver[DispatchInfo] | None = None self._dry_run: bool = False self._options: dict[str, Any] = {} @classmethod def new_with_dispatch( cls, - initial_dispatch: DispatchUpdate, - dispatch_updates_receiver: Receiver[DispatchUpdate], + initial_dispatch: DispatchInfo, + dispatch_updates_receiver: Receiver[DispatchInfo], *, name: str | None = None, ) -> "Self": @@ -92,7 +92,7 @@ async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None: else: assert False, f"Unexpected selected receiver: {selected}" - def _update_dispatch_information(self, dispatch_update: DispatchUpdate) -> None: + def _update_dispatch_information(self, dispatch_update: DispatchInfo) -> None: print("Received update:", dispatch_update) self._dry_run = dispatch_update.dry_run self._options = dispatch_update.options @@ -136,7 +136,7 @@ async def main(): def __init__( self, - actor_factory: Callable[[DispatchUpdate, Receiver[DispatchUpdate]], Actor], + actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor], running_status_receiver: Receiver[Dispatch], ) -> None: """Initialize the dispatch handler. @@ -150,7 +150,7 @@ def __init__( self._dispatch_rx = running_status_receiver self._actor_factory = actor_factory self._actor: Actor | None = None - self._updates_channel = Broadcast[DispatchUpdate]( + self._updates_channel = Broadcast[DispatchInfo]( name="dispatch_updates_channel", resend_latest=True ) self._updates_sender = self._updates_channel.new_sender() @@ -161,7 +161,7 @@ def start(self) -> None: async def _start_actor(self, dispatch: Dispatch) -> None: """Start all actors.""" - dispatch_update = DispatchUpdate( + dispatch_update = DispatchInfo( components=dispatch.target, dry_run=dispatch.dry_run, options=dispatch.payload, diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index 17a183e..6463558 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -18,7 +18,7 @@ from frequenz.sdk.actor import Actor from pytest import fixture -from frequenz.dispatch import ActorDispatcher, Dispatch, DispatchUpdate +from frequenz.dispatch import ActorDispatcher, Dispatch, DispatchInfo from frequenz.dispatch._bg_service import DispatchScheduler @@ -48,7 +48,7 @@ class MockActor(Actor): """Mock actor for testing.""" def __init__( - self, initial_dispatch: DispatchUpdate, receiver: Receiver[DispatchUpdate] + self, initial_dispatch: DispatchInfo, receiver: Receiver[DispatchInfo] ) -> None: """Initialize the actor.""" super().__init__(name="MockActor") @@ -79,7 +79,7 @@ def actor(self) -> MockActor | None: # pylint: enable=protected-access @property - def updates_receiver(self) -> Receiver[DispatchUpdate]: + def updates_receiver(self) -> Receiver[DispatchInfo]: """Return the updates receiver.""" assert self.actor is not None return self.actor.receiver @@ -128,7 +128,7 @@ async def test_simple_start_stop( ), ) - # Send status update to start actor, expect no DispatchUpdate for the start + # Send status update to start actor, expect no DispatchInfo for the start await test_env.running_status_sender.send(Dispatch(dispatch)) fake_time.shift(timedelta(seconds=1)) await asyncio.sleep(1)