diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 61ee6f2ad..f4c87e7fa 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,7 @@ ## New Features - +Added SteamBoilerPool, which allows users to control a pool of steam boiler components. ## Bug Fixes diff --git a/pyproject.toml b/pyproject.toml index 6b9ccc3fe..9109cbe86 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ dependencies = [ # Make sure to update the mkdocs.yml file when # changing the version # (plugins.mkdocstrings.handlers.python.import) - "frequenz-client-microgrid >= 0.18.1, < 0.19.0", + "frequenz-client-microgrid >= 0.18.3, < 0.19.0", "frequenz-microgrid-component-graph >= 0.4, < 0.5", "frequenz-client-common >= 0.3.6, < 0.4.0", "frequenz-channels >= 1.6.1, < 2.0.0", diff --git a/src/frequenz/sdk/microgrid/__init__.py b/src/frequenz/sdk/microgrid/__init__.py index a9d049bcd..e4ead86f4 100644 --- a/src/frequenz/sdk/microgrid/__init__.py +++ b/src/frequenz/sdk/microgrid/__init__.py @@ -428,6 +428,7 @@ async def initialize( "new_battery_pool", "new_ev_charger_pool", "new_pv_pool", + "new_steam_boiler_pool", "producer", "voltage_per_phase", ] diff --git a/src/frequenz/sdk/microgrid/_data_pipeline.py b/src/frequenz/sdk/microgrid/_data_pipeline.py index a2ced76db..6f774fe5d 100644 --- a/src/frequenz/sdk/microgrid/_data_pipeline.py +++ b/src/frequenz/sdk/microgrid/_data_pipeline.py @@ -18,12 +18,21 @@ from frequenz.channels import Broadcast, Sender from frequenz.client.common.microgrid.components import ComponentId -from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter +from frequenz.client.microgrid.component import ( + Battery, + EvCharger, + SolarInverter, + SteamBoiler, +) from .._internal._channels import ChannelRegistry from ..actor._actor import Actor from ..timeseries import ResamplerConfig from ..timeseries._voltage_streamer import VoltageStreamer +from ..timeseries.steam_boiler_pool import SteamBoilerPool +from ..timeseries.steam_boiler_pool._steam_boiler_pool_reference_store import ( + SteamBoilerPoolReferenceStore, +) from ._data_sourcing import ComponentMetricRequest, DataSourcingActor from ._power_managing._base_classes import DefaultPower, PowerManagerAlgorithm from ._power_wrapper import PowerWrapper @@ -127,6 +136,13 @@ def __init__( # https://github.com/frequenz-floss/frequenz-sdk-python/issues/1285 component_class=SolarInverter, ) + self._steam_boiler_power_wrapper = PowerWrapper( + self._channel_registry, + api_power_request_timeout=api_power_request_timeout, + power_manager_algorithm=PowerManagerAlgorithm.MATRYOSHKA, + default_power=DefaultPower.MAX, + component_class=SteamBoiler, + ) self._logical_meter: LogicalMeter | None = None self._consumer: Consumer | None = None @@ -141,6 +157,9 @@ def __init__( self._pv_pool_reference_stores: dict[ frozenset[ComponentId], PVPoolReferenceStore ] = {} + self._steam_boiler_pool_reference_stores: dict[ + frozenset[ComponentId], SteamBoilerPoolReferenceStore + ] = {} self._frequency_instance: GridFrequency | None = None self._voltage_instance: VoltageStreamer | None = None @@ -447,6 +466,82 @@ def new_battery_pool( priority=priority, ) + def new_steam_boiler_pool( + self, + *, + priority: int, + component_ids: abc.Set[ComponentId] | None = None, + name: str | None = None, + ) -> SteamBoilerPool: + """Return the corresponding SteamBoilerPool instance for the given ids. + + If a SteamBoilerPool instance for the given ids doesn't exist, a new one is + created and returned. + + Args: + priority: The priority of the actor making the call. + component_ids: Optional set of IDs of steam boilers to be managed by the + SteamBoilerPool. + name: An optional name used to identify this instance of the pool or a + corresponding actor in the logs. + + Returns: + A SteamBoilerPool instance. + """ + from ..timeseries.steam_boiler_pool import SteamBoilerPool + from ..timeseries.steam_boiler_pool._steam_boiler_pool_reference_store import ( + SteamBoilerPoolReferenceStore, + ) + + if not self._steam_boiler_power_wrapper.started: + self._steam_boiler_power_wrapper.start() + + # We use frozenset to make a hashable key from the input set. + ref_store_key: frozenset[ComponentId] = frozenset() + if component_ids is not None: + ref_store_key = frozenset(component_ids) + + pool_key = f"{ref_store_key}-{priority}" + if pool_key in self._known_pool_keys: + _logger.warning( + "A SteamBoilerPool instance was already created for steam_boiler_ids=%s " + "and priority=%s using `microgrid.steam_boiler_pool(...)`." + "\n Hint: If the multiple instances are created from the same actor, " + "consider reusing the same instance." + "\n Hint: If the instances are created from different actors, " + "consider using different priorities to distinguish them.", + component_ids, + priority, + ) + else: + self._known_pool_keys.add(pool_key) + + if ref_store_key not in self._steam_boiler_pool_reference_stores: + self._steam_boiler_pool_reference_stores[ref_store_key] = ( + SteamBoilerPoolReferenceStore( + channel_registry=self._channel_registry, + resampler_subscription_sender=self._resampling_request_sender(), + status_receiver=self._steam_boiler_power_wrapper.status_channel.new_receiver( + limit=1 + ), + power_manager_requests_sender=( + self._steam_boiler_power_wrapper.proposal_channel.new_sender() + ), + power_manager_bounds_subs_sender=( + self._steam_boiler_power_wrapper.bounds_subscription_channel.new_sender() + ), + power_distribution_results_fetcher=( + self._steam_boiler_power_wrapper.distribution_results_fetcher() + ), + component_ids=component_ids, + ) + ) + return SteamBoilerPool( + pool_ref_store=self._steam_boiler_pool_reference_stores[ref_store_key], + name=name, + priority=priority, + ) + def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]: """Return a Sender for sending requests to the data sourcing actor. @@ -503,12 +598,15 @@ async def _stop(self) -> None: await self._battery_power_wrapper.stop() await self._ev_power_wrapper.stop() await self._pv_power_wrapper.stop() + await self._steam_boiler_power_wrapper.stop() for pool in self._battery_pool_reference_stores.values(): await pool.stop() for evpool in self._ev_charger_pool_reference_stores.values(): await evpool.stop() for pvpool in self._pv_pool_reference_stores.values(): await pvpool.stop() + for steam_boiler_pool in self._steam_boiler_pool_reference_stores.values(): + await steam_boiler_pool.stop() _DATA_PIPELINE: _DataPipeline | None = None @@ -682,6 +780,45 @@ def new_pv_pool( return _get().new_pv_pool(priority=priority, component_ids=component_ids, name=name) +def new_steam_boiler_pool( + *, + priority: int, + component_ids: abc.Set[ComponentId] | None = None, + name: str | None = None, +) -> SteamBoilerPool: + """Return a new `SteamBoilerPool` instance for the given parameters. + + The priority value is used to resolve conflicts when multiple actors are trying to + propose different power values for the same set of steam boilers. + + !!! note + When specifying priority, bigger values indicate higher priority. + + It is recommended to reuse the same instance of the `SteamBoilerPool` within the same + actor, unless they are managing different sets of steam boilers. + + In deployments with multiple actors managing the same set of steam boilers, it is + recommended to use different priorities to distinguish between them. If not, + a random prioritization will be imposed on them to resolve conflicts, which may + lead to unexpected behavior like longer duration to converge on the desired + power. + + Args: + priority: The priority of the actor making the call. + component_ids: Optional set of IDs of steam boilers to be managed by the + `SteamBoilerPool`. If not specified, all steam boilers available in the component + graph are used. + name: An optional name used to identify this instance of the pool or a + corresponding actor in the logs. + + Returns: + A `SteamBoilerPool` instance. + """ + return _get().new_steam_boiler_pool( + priority=priority, component_ids=component_ids, name=name + ) + + def grid() -> Grid: """Return the grid measuring point.""" return _get().grid() diff --git a/src/frequenz/sdk/microgrid/_power_wrapper.py b/src/frequenz/sdk/microgrid/_power_wrapper.py index 70e2e4e55..5c3ac3cbf 100644 --- a/src/frequenz/sdk/microgrid/_power_wrapper.py +++ b/src/frequenz/sdk/microgrid/_power_wrapper.py @@ -9,7 +9,12 @@ from datetime import timedelta from frequenz.channels import Broadcast -from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter +from frequenz.client.microgrid.component import ( + Battery, + EvCharger, + SolarInverter, + SteamBoiler, +) from .._internal._channels import ChannelRegistry, ReceiverFetcher @@ -40,7 +45,7 @@ def __init__( # pylint: disable=too-many-arguments api_power_request_timeout: timedelta, power_manager_algorithm: PowerManagerAlgorithm, default_power: DefaultPower, - component_class: type[Battery | EvCharger | SolarInverter], + component_class: type[Battery | EvCharger | SolarInverter, SteamBoiler], ): """Initialize the power control. diff --git a/src/frequenz/sdk/timeseries/steam_boiler_pool/__init__.py b/src/frequenz/sdk/timeseries/steam_boiler_pool/__init__.py new file mode 100644 index 000000000..1be6ef82a --- /dev/null +++ b/src/frequenz/sdk/timeseries/steam_boiler_pool/__init__.py @@ -0,0 +1,13 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Interactions with pools of steam boilers.""" + +from ._result_types import SteamBoilerPoolReport +from ._steam_boiler_pool import SteamBoilerPool, SteamBoilerPoolError + +__all__ = [ + "SteamBoilerPool", + "SteamBoilerPoolError", + "SteamBoilerPoolReport", +] diff --git a/src/frequenz/sdk/timeseries/steam_boiler_pool/_result_types.py b/src/frequenz/sdk/timeseries/steam_boiler_pool/_result_types.py new file mode 100644 index 000000000..0c1e9ade2 --- /dev/null +++ b/src/frequenz/sdk/timeseries/steam_boiler_pool/_result_types.py @@ -0,0 +1,27 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Types for exposing steam boiler pool reports.""" + +import typing + +from frequenz.quantities import Power + +from .. import Bounds +from ..component_pool._component_pool_report import ComponentPoolReport + + +class SteamBoilerPoolReport(ComponentPoolReport, typing.Protocol): + """A status report for a steam boiler pool.""" + + @property + def target_power(self) -> Power | None: + """The currently set power for the steam boilers.""" + + @property + def bounds(self) -> Bounds[Power] | None: + """The usable bounds for the steam boilers. + + These bounds are adjusted to any restrictions placed by actors with higher + priorities. + """ diff --git a/src/frequenz/sdk/timeseries/steam_boiler_pool/_steam_boiler_pool.py b/src/frequenz/sdk/timeseries/steam_boiler_pool/_steam_boiler_pool.py new file mode 100644 index 000000000..8f0035b28 --- /dev/null +++ b/src/frequenz/sdk/timeseries/steam_boiler_pool/_steam_boiler_pool.py @@ -0,0 +1,46 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Interactions with pools of steam boilers.""" + +from frequenz.quantities import Power +from typing_extensions import override + +from ...microgrid import connection_manager +from ..component_pool import ComponentPool +from ..formulas import Formula +from ._result_types import SteamBoilerPoolReport +from ._steam_boiler_pool_reference_store import SteamBoilerPoolReferenceStore + + +class SteamBoilerPoolError(Exception): + """An error that occurred in any of the SteamBoilerPool methods.""" + + +class SteamBoilerPool( + ComponentPool[SteamBoilerPoolReferenceStore, SteamBoilerPoolReport] +): + """An interface for interaction with pools of steam boilers.""" + + @property + @override + def power(self) -> Formula[Power]: + """Fetch the total power for the steam boilers in the pool. + + This formula produces values that are in the Passive Sign Convention (PSC). + + If a formula to calculate steam boiler power is not already running, it + will be started. + + A receiver from the formula can be created using the `new_receiver` + method. + + Returns: + A Formula that will calculate and stream the total power of all steam boilers. + """ + return self._pool_ref_store.formula_pool.from_power_formula( + "steam_boiler_power", + connection_manager.get().component_graph.steam_boiler_formula( + self._pool_ref_store.component_ids + ), + ) diff --git a/src/frequenz/sdk/timeseries/steam_boiler_pool/_steam_boiler_pool_reference_store.py b/src/frequenz/sdk/timeseries/steam_boiler_pool/_steam_boiler_pool_reference_store.py new file mode 100644 index 000000000..954584f4a --- /dev/null +++ b/src/frequenz/sdk/timeseries/steam_boiler_pool/_steam_boiler_pool_reference_store.py @@ -0,0 +1,50 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Manages shared state/tasks for a set of steam boilers.""" + +import uuid +from typing import Type + +from frequenz.client.microgrid.component import Component, SteamBoiler +from typing_extensions import override + +from ..component_pool._component_pool_reference_store import ComponentPoolReferenceStore + + +class SteamBoilerPoolReferenceStore(ComponentPoolReferenceStore): + """A class for maintaining the shared state/tasks for a set of pools of steam boilers. + + This includes ownership of + - the formula pool and metric calculators. + - the tasks for calculating system bounds for the steam boilers. + + These are independent of the priority of the actors and can be shared between + multiple users of the same set of steam boilers. + + They are exposed through the SteamBoilerPool class. + """ + + @staticmethod + def get_component_class() -> Type[Component]: + """Class of the component type.""" + return SteamBoiler + + @staticmethod + def get_pool_type_name() -> str: + """Name of the pool type, for display purposes.""" + return "SteamBoilerPool" + + @staticmethod + def get_component_type_name_plural() -> str: + """Name of the component type, for display purposes.""" + return "steam boilers" + + @override + def get_namespace(self) -> str: + """Namespace to use with the data pipeline.""" + return f"steam-boiler-pool-{uuid.uuid4()}" + + @override + def create_bounds_tracker(self) -> None: + """Create the bounds tracker for the pool.""" diff --git a/tests/timeseries/_steam_boiler_pool/__init__.py b/tests/timeseries/_steam_boiler_pool/__init__.py new file mode 100644 index 000000000..3fbbdbed7 --- /dev/null +++ b/tests/timeseries/_steam_boiler_pool/__init__.py @@ -0,0 +1,4 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Test the steam boiler pool control methods.""" diff --git a/tests/timeseries/_steam_boiler_pool/test_steam_boiler_pool.py b/tests/timeseries/_steam_boiler_pool/test_steam_boiler_pool.py new file mode 100644 index 000000000..2b4af2426 --- /dev/null +++ b/tests/timeseries/_steam_boiler_pool/test_steam_boiler_pool.py @@ -0,0 +1,104 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Tests for the `SteamBoilerPool`.""" + +import asyncio +from unittest.mock import MagicMock + +from frequenz.channels import Broadcast +from frequenz.client.common.microgrid.components import ComponentId +from frequenz.quantities import Power +from pytest_mock import MockerFixture + +from frequenz.sdk import microgrid, timeseries +from frequenz.sdk._internal._channels import ChannelRegistry +from frequenz.sdk.microgrid._power_managing import ReportRequest, _Report +from frequenz.sdk.timeseries.steam_boiler_pool import SteamBoilerPool +from frequenz.sdk.timeseries.steam_boiler_pool._steam_boiler_pool_reference_store import ( + SteamBoilerPoolReferenceStore, +) +from tests.timeseries.mock_microgrid import MockMicrogrid + + +def _new_power_status_report(target_power_watts: float) -> _Report: + """Create a distinct report for power status assertions.""" + target_power = Power.from_watts(target_power_watts) + return _Report( + target_power=target_power, + _inclusion_bounds=timeseries.Bounds(target_power, target_power), + _exclusion_bounds=None, + ) + + +class TestSteamBoilerPool: + """Tests for the `SteamBoilerPool`.""" + + async def test_power( # pylint: disable=too-many-locals + self, + mocker: MockerFixture, + ) -> None: + """Test the power formula.""" + mockgrid = MockMicrogrid(grid_meter=True, mocker=mocker) + mockgrid.add_steam_boilers(3) + + async with mockgrid: + steam_boiler_pool = microgrid.new_steam_boiler_pool(priority=5) + power_receiver = steam_boiler_pool.power.new_receiver() + + await mockgrid.mock_resampler.send_meter_power([16.0]) + assert (await power_receiver.receive()).value == Power.from_watts(16.0) + + +async def test_power_status_same_instance_subscriptions_work( + mocker: MockerFixture, +) -> None: + """Ensure same-instance power_status subscriptions share the same channel.""" + mock_cm = MagicMock() + mock_graph = MagicMock() + mock_graph.components.return_value = [ + MagicMock(id=ComponentId(12)), + MagicMock(id=ComponentId(22)), + ] + mock_cm.component_graph = mock_graph + mocker.patch( + "frequenz.sdk.microgrid.connection_manager._CONNECTION_MANAGER", + mock_cm, + ) + mocker.patch("frequenz.sdk.microgrid.connection_manager.get", return_value=mock_cm) + + registry = ChannelRegistry(name="steam_boiler-pool-test") + requests_channel = Broadcast[ReportRequest](name="steam_boiler-pool-requests") + requests_rx = requests_channel.new_receiver() + component_ids = frozenset({ComponentId(12), ComponentId(22)}) + pool = SteamBoilerPool( + pool_ref_store=SteamBoilerPoolReferenceStore( + channel_registry=registry, + resampler_subscription_sender=MagicMock(), + status_receiver=MagicMock(), + power_manager_requests_sender=MagicMock(), + power_manager_bounds_subs_sender=requests_channel.new_sender(), + power_distribution_results_fetcher=MagicMock(), + component_ids=component_ids, + ), + name="steam_boiler-pool", + priority=5, + ) + + first_status_rx = pool.power_status.new_receiver() + second_status_rx = pool.power_status.new_receiver() + + await asyncio.sleep(0) + + first_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0) + second_request = await asyncio.wait_for(requests_rx.receive(), timeout=1.0) + assert second_request.get_channel_name() == first_request.get_channel_name() + + await registry.get_or_create( + _Report, first_request.get_channel_name() + ).new_sender().send(_new_power_status_report(123.0)) + + first_report = await asyncio.wait_for(first_status_rx.receive(), timeout=1.0) + second_report = await asyncio.wait_for(second_status_rx.receive(), timeout=1.0) + assert first_report.target_power == Power.from_watts(123.0) + assert second_report.target_power == Power.from_watts(123.0) diff --git a/tests/timeseries/_steam_boiler_pool/test_steam_boiler_pool_control_methods.py b/tests/timeseries/_steam_boiler_pool/test_steam_boiler_pool_control_methods.py new file mode 100644 index 000000000..8c2a90d81 --- /dev/null +++ b/tests/timeseries/_steam_boiler_pool/test_steam_boiler_pool_control_methods.py @@ -0,0 +1,259 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Test the EV charger pool control methods.""" + +import asyncio +from collections.abc import AsyncIterator, Callable +from datetime import datetime, timedelta, timezone +from typing import cast +from unittest.mock import AsyncMock, MagicMock + +import async_solipsism +import pytest +import time_machine +from frequenz.channels import Receiver +from frequenz.client.microgrid.component import ComponentStateCode +from frequenz.quantities import Power, Voltage +from pytest_mock import MockerFixture + +from frequenz.sdk import microgrid +from frequenz.sdk.microgrid import _power_distributing +from frequenz.sdk.microgrid._data_pipeline import _DataPipeline +from frequenz.sdk.microgrid._power_distributing import ComponentPoolStatus +from frequenz.sdk.microgrid._power_distributing._component_pool_status_tracker import ( + ComponentPoolStatusTracker, +) +from frequenz.sdk.timeseries import ResamplerConfig2, Sample3Phase +from frequenz.sdk.timeseries.ev_charger_pool import EVChargerPoolReport + +from ...microgrid.fixtures import _Mocks +from ...utils.component_data_streamer import MockComponentDataStreamer +from ...utils.component_data_wrapper import EvChargerDataWrapper, MeterDataWrapper +from ..mock_microgrid import MockMicrogrid + +# pylint: disable=protected-access + + +@pytest.fixture +def event_loop_policy() -> async_solipsism.EventLoopPolicy: + """Event loop policy.""" + return async_solipsism.EventLoopPolicy() + + +@pytest.fixture +async def mocks(mocker: MockerFixture) -> AsyncIterator[_Mocks]: + """Create the mocks.""" + mockgrid = MockMicrogrid(grid_meter=True) + mockgrid.add_ev_chargers(4) + await mockgrid.start(mocker) + + # pylint: disable=protected-access + if microgrid._data_pipeline._DATA_PIPELINE is not None: + microgrid._data_pipeline._DATA_PIPELINE = None + await microgrid._data_pipeline.initialize( + ResamplerConfig2(resampling_period=timedelta(seconds=0.1)) + ) + streamer = MockComponentDataStreamer(mockgrid.mock_client) + + dp = cast(_DataPipeline, microgrid._data_pipeline._DATA_PIPELINE) + + _mocks = _Mocks( + mockgrid, + streamer, + dp._ev_power_wrapper.status_channel.new_sender(), + ) + try: + yield _mocks + finally: + await _mocks.stop() + + +class TestEVChargerPoolControl: + """Test the EV charger pool control methods.""" + + async def _patch_ev_pool_status( + self, + mocks: _Mocks, + mocker: MockerFixture, + component_ids: list[int] | None = None, + ) -> None: + """Patch the EV charger pool status. + + If `component_ids` is not None, the mock will always return `component_ids`. + Otherwise, it will return the requested components. + """ + if component_ids: + mock = MagicMock(spec=ComponentPoolStatusTracker) + mock.get_working_components.return_value = component_ids + mocker.patch( + "frequenz.sdk.microgrid._power_distributing._component_managers" + "._ev_charger_manager._ev_charger_manager.ComponentPoolStatusTracker", + return_value=mock, + ) + else: + mock = MagicMock(spec=ComponentPoolStatusTracker) + mock.get_working_components.side_effect = set + mocker.patch( + "frequenz.sdk.microgrid._power_distributing._component_managers" + "._ev_charger_manager._ev_charger_manager.ComponentPoolStatusTracker", + return_value=mock, + ) + await mocks.component_status_sender.send( + ComponentPoolStatus(working=set(mocks.microgrid.evc_ids), uncertain=set()) + ) + + async def _patch_power_distributing_actor( + self, + mocker: MockerFixture, + ) -> None: + mocker.patch( + "frequenz.sdk.microgrid._data_pipeline._DATA_PIPELINE._ev_power_wrapper" + "._power_distributing_actor._component_manager._voltage_cache.get", + return_value=Sample3Phase( + timestamp=datetime.now(tz=timezone.utc), + value_p1=Voltage.from_volts(220.0), + value_p2=Voltage.from_volts(220.0), + value_p3=Voltage.from_volts(220.0), + ), + ) + + async def _init_ev_chargers(self, mocks: _Mocks) -> None: + now = datetime.now(tz=timezone.utc) + for evc_id in mocks.microgrid.evc_ids: + mocks.streamer.start_streaming( + EvChargerDataWrapper( + evc_id, + now, + states={ + ComponentStateCode.READY, + ComponentStateCode.EV_CHARGING_CABLE_PLUGGED_AT_EV, + ComponentStateCode.EV_CHARGING_CABLE_PLUGGED_AT_STATION, + }, + active_power=0.0, + active_power_inclusion_lower_bound=0.0, + active_power_inclusion_upper_bound=16.0 * 230.0 * 3, + voltage_per_phase=(230.0, 230.0, 230.0), + ), + 0.05, + ) + + for meter_id in mocks.microgrid.meter_ids: + mocks.streamer.start_streaming( + MeterDataWrapper( + meter_id, + now, + voltage_per_phase=(230.0, 230.0, 230.0), + ), + 0.05, + ) + + async def _recv_reports_until( + self, + bounds_rx: Receiver[EVChargerPoolReport], + check: Callable[[EVChargerPoolReport], bool], + ) -> EVChargerPoolReport | None: + """Receive reports until the given condition is met.""" + max_reports = 10 + ctr = 0 + while ctr < max_reports: + ctr += 1 + async with asyncio.timeout(10.0): + report = await bounds_rx.receive() + if check(report): + return report + return None + + def _assert_report( # pylint: disable=too-many-arguments + self, + report: EVChargerPoolReport | None, + *, + power: float | None, + lower: float, + upper: float, + dist_result: _power_distributing.Result | None = None, + expected_result_pred: ( + Callable[[_power_distributing.Result], bool] | None + ) = None, + ) -> None: + assert report is not None + assert report.target_power == ( + Power.from_watts(power) if power is not None else None + ) + assert report.bounds is not None + assert report.bounds.lower == Power.from_watts(lower) + assert report.bounds.upper == Power.from_watts(upper) + if expected_result_pred is not None: + assert dist_result is not None + assert expected_result_pred(dist_result) + + async def test_setting_power( + self, + mocks: _Mocks, + mocker: MockerFixture, + ) -> None: + """Test setting power.""" + traveller = time_machine.travel(datetime(2012, 12, 12, tzinfo=timezone.utc)) + mock_time = traveller.start() + + set_power = cast( + AsyncMock, + microgrid.connection_manager.get().api_client.set_component_power_active, + ) + await self._init_ev_chargers(mocks) + ev_charger_pool = microgrid.new_ev_charger_pool(priority=5) + await self._patch_ev_pool_status(mocks, mocker) + await self._patch_power_distributing_actor(mocker) + + bounds_rx = ev_charger_pool.power_status.new_receiver() + # Receive reports until all chargers are initialized + latest_report = await self._recv_reports_until( + bounds_rx, + lambda x: x.bounds is not None and x.bounds.upper.as_watts() == 44160.0, + ) + + self._assert_report(latest_report, power=None, lower=0.0, upper=44160.0) + + # Check that chargers are initialized to Power.zero() + assert set_power.call_count == 4 + assert all(x.args[1] == 0.0 for x in set_power.call_args_list) + + set_power.reset_mock() + await ev_charger_pool.propose_power(Power.from_watts(40000.0)) + # ignore one report because it is not always immediately updated. + latest_report = await self._recv_reports_until( + bounds_rx, + lambda r: r.target_power == Power.from_watts(40000.0), + ) + self._assert_report(latest_report, power=40000.0, lower=0.0, upper=44160.0) + mock_time.shift(timedelta(seconds=60)) + await asyncio.sleep(0.15) + + # Components are set initial power + assert set_power.call_count == 4 + assert all(x.args[1] == 6600.0 for x in set_power.call_args_list) + + # All available power is allocated. 3 chargers are set to 11040.0 + # and the last one is set to 6880.0 + set_power.reset_mock() + mock_time.shift(timedelta(seconds=60)) + await asyncio.sleep(0.15) + assert set_power.call_count == 4 + + evs_11040 = [x.args for x in set_power.call_args_list if x.args[1] == 11040.0] + assert 3 == len(evs_11040) + evs_6680 = [x.args for x in set_power.call_args_list if x.args[1] == 6880.0] + assert 1 == len(evs_6680) + + # Throttle the power + set_power.reset_mock() + await ev_charger_pool.propose_power(Power.from_watts(32000.0)) + await bounds_rx.receive() # Receive the next report and discard it. + await asyncio.sleep(0.02) + assert set_power.call_count == 1 + + stopped_evs = [x.args for x in set_power.call_args_list if x.args[1] == 0.0] + assert 1 == len(stopped_evs) + assert stopped_evs[0][0] in [evc[0] for evc in evs_11040] + + traveller.stop() diff --git a/tests/timeseries/mock_microgrid.py b/tests/timeseries/mock_microgrid.py index d58c6e201..18f0c140f 100644 --- a/tests/timeseries/mock_microgrid.py +++ b/tests/timeseries/mock_microgrid.py @@ -27,6 +27,7 @@ LiIonBattery, Meter, SolarInverter, + SteamBoiler, ) from frequenz.microgrid_component_graph import ComponentGraph from pytest_mock import MockerFixture @@ -55,6 +56,7 @@ class MockMicrogrid: # pylint: disable=too-many-instance-attributes grid_id = ComponentId(1) _grid_meter_id = ComponentId(4) + steam_boiler_id_suffix = 10 chp_id_suffix = 5 evc_id_suffix = 6 meter_id_suffix = 7 @@ -144,6 +146,7 @@ def inverters(component_type: type[Inverter]) -> list[ComponentId]: self.battery_inverter_ids: list[ComponentId] = inverters(BatteryInverter) self.pv_inverter_ids: list[ComponentId] = inverters(SolarInverter) + self.steam_boiler_ids: list[ComponentId] = filter_comp(SteamBoiler) self.bat_inv_map: dict[ComponentId, ComponentId] = ( {} @@ -350,6 +353,11 @@ def _start_ev_charger_streaming(self, evc_id: ComponentId) -> None: ) ) + def _start_steam_boiler_streaming(self, steam_boiler_id: ComponentId) -> None: + if not self._api_client_streaming: + return + # TODO + def add_consumer_meters(self, count: int = 1) -> None: """Add consumer meters to the mock microgrid. @@ -495,6 +503,27 @@ def add_ev_chargers(self, count: int) -> None: ComponentConnection(source=self._connect_to, destination=evc_id) ) + def add_steam_boilers(self, count: int) -> None: + """Add steam boilers to the microgrid. + + Args: + count: Number of steam boilers to add to the microgrid. + """ + for _ in range(count): + component_id = ComponentId( + self._id_increment * 10 + self.steam_boiler_id_suffix + ) + self._id_increment += 1 + self.steam_boiler_ids.append(component_id) + + self._components.add( + SteamBoiler(id=component_id, microgrid_id=_MICROGRID_ID) + ) + self._start_steam_boiler_streaming(component_id) + self._connections.add( + ComponentConnection(source=self._connect_to, destination=component_id) + ) + async def send_meter_data(self, values: list[float]) -> None: """Send raw meter data from the mock microgrid. diff --git a/tests/utils/graph_generator.py b/tests/utils/graph_generator.py index 584f56347..ed7edadb2 100644 --- a/tests/utils/graph_generator.py +++ b/tests/utils/graph_generator.py @@ -25,6 +25,7 @@ LiIonBattery, Meter, SolarInverter, + SteamBoiler, UnspecifiedInverter, ) from frequenz.microgrid_component_graph import ComponentGraph @@ -36,6 +37,7 @@ class GraphGenerator: """Utilities to generate graphs from component data structures.""" SUFFIXES: dict[ComponentCategory, int] = { + ComponentCategory.STEAM_BOILER: 4, ComponentCategory.CHP: 5, ComponentCategory.EV_CHARGER: 6, ComponentCategory.METER: 7, @@ -204,6 +206,11 @@ def component( id=self.new_id()[other], microgrid_id=_MICROGRID_ID, ) + case ComponentCategory.STEAM_BOILER: + return SteamBoiler( + id=self.new_id()[other], + microgrid_id=_MICROGRID_ID, + ) case _: assert False, "Unsupported ComponentCategory"