diff --git a/examples/battery_pool.py b/examples/battery_pool.py index 5df72a5c2..971256c37 100644 --- a/examples/battery_pool.py +++ b/examples/battery_pool.py @@ -30,8 +30,7 @@ async def main() -> None: receivers = [ battery_pool.soc.new_receiver(limit=1), battery_pool.capacity.new_receiver(limit=1), - # pylint: disable-next=protected-access - battery_pool._system_power_bounds.new_receiver(limit=1), + battery_pool.system_power_bounds.new_receiver(limit=1), ] async for metric in merge(*receivers): diff --git a/src/frequenz/sdk/microgrid/_data_pipeline.py b/src/frequenz/sdk/microgrid/_data_pipeline.py index aca47e4a9..a2ced76db 100644 --- a/src/frequenz/sdk/microgrid/_data_pipeline.py +++ b/src/frequenz/sdk/microgrid/_data_pipeline.py @@ -437,7 +437,7 @@ def new_battery_pool( self._battery_power_wrapper.distribution_results_fetcher() ), min_update_interval=self._resampler_config.resampling_period, - batteries_id=component_ids, + component_ids=component_ids, ) ) diff --git a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py index 0dead0c29..d200bd0ff 100644 --- a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py @@ -149,20 +149,17 @@ def _add_system_bounds_tracker(self, component_ids: frozenset[ComponentId]) -> N battery_pool = _data_pipeline.new_battery_pool( priority=-sys.maxsize - 1, component_ids=component_ids ) - # pylint: disable-next=protected-access - bounds_receiver = battery_pool._system_power_bounds.new_receiver() + bounds_receiver = battery_pool.system_power_bounds.new_receiver() elif issubclass(self._component_class, EvCharger): ev_charger_pool = _data_pipeline.new_ev_charger_pool( priority=-sys.maxsize - 1, component_ids=component_ids ) - # pylint: disable-next=protected-access - bounds_receiver = ev_charger_pool._system_power_bounds.new_receiver() + bounds_receiver = ev_charger_pool.system_power_bounds.new_receiver() elif issubclass(self._component_class, SolarInverter): pv_pool = _data_pipeline.new_pv_pool( priority=-sys.maxsize - 1, component_ids=component_ids ) - # pylint: disable-next=protected-access - bounds_receiver = pv_pool._system_power_bounds.new_receiver() + bounds_receiver = pv_pool.system_power_bounds.new_receiver() else: _logger.error( "PowerManagingActor: Unsupported component class: %s", diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index 3be021a85..89b027129 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -9,18 +9,17 @@ """ import asyncio -import uuid -from collections import abc -from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Energy, Percentage, Power, Temperature +from typing_extensions import override from ... import timeseries -from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher -from ...microgrid import _power_distributing, _power_managing, connection_manager +from ..._internal._channels import ReceiverFetcher +from ...microgrid import _power_managing, connection_manager from ...timeseries import Sample from .._base_types import SystemBounds -from ..formulas._formula import Formula +from ..component_pool import ComponentPool +from ..formulas import Formula from ._battery_pool_reference_store import BatteryPoolReferenceStore from ._methods import SendOnUpdate from ._metric_calculator import ( @@ -34,7 +33,7 @@ # pylint: disable=protected-access -class BatteryPool: +class BatteryPool(ComponentPool[BatteryPoolReferenceStore, BatteryPoolReport]): """An interface for interaction with pools of batteries. Provides: @@ -50,65 +49,6 @@ class BatteryPool: [propose_discharge][frequenz.sdk.timeseries.battery_pool.BatteryPool.propose_discharge]. """ - def __init__( - self, - *, - pool_ref_store: BatteryPoolReferenceStore, - name: str | None, - priority: int, - ): - """Create a BatteryPool instance. - - !!! note - `BatteryPool` instances are not meant to be created directly by users. Use - the [`microgrid.new_battery_pool`][frequenz.sdk.microgrid.new_battery_pool] - method for creating `BatteryPool` instances. - - Args: - pool_ref_store: The battery pool reference store instance. - name: An optional name used to identify this instance of the pool or a - corresponding actor in the logs. - priority: The priority of the actor using this wrapper. - """ - self._pool_ref_store = pool_ref_store - unique_id = str(uuid.uuid4()) - self._source_id = unique_id if name is None else f"{name}-{unique_id}" - self._priority = priority - - async def propose_power( - self, - power: Power | None, - *, - bounds: timeseries.Bounds[Power | None] = timeseries.Bounds(None, None), - ) -> None: - """Send a proposal to the power manager for the pool's set of batteries. - - Power values need to follow the Passive Sign Convention (PSC). That is, positive - values indicate charge power and negative values indicate discharge power. - - Details on how the power manager handles proposals can be found in the - [Microgrid][frequenz.sdk.microgrid--setting-power] documentation. - - Args: - power: The power to propose for the batteries in the pool. If `None`, this - proposal will not have any effect on the target power, unless bounds are - specified. When specified without bounds, bounds for lower priority - actors will be shifted by this power. If both are `None`, it is - equivalent to not having a proposal or withdrawing a previous one. - bounds: The power bounds for the proposal. When specified, this will limit - the bounds for lower priority actors. - """ - await self._pool_ref_store._power_manager_requests_sender.send( - _power_managing.Proposal( - source_id=self._source_id, - preferred_power=power, - bounds=bounds, - component_ids=self._pool_ref_store._batteries, - priority=self._priority, - creation_time=asyncio.get_running_loop().time(), - ) - ) - async def propose_charge(self, power: Power | None) -> None: """Set the given charge power for the batteries in the pool. @@ -133,12 +73,12 @@ async def propose_charge(self, power: Power | None) -> None: """ if power and power < Power.zero(): raise ValueError("Charge power must be positive.") - await self._pool_ref_store._power_manager_requests_sender.send( + await self._pool_ref_store.power_manager_requests_sender.send( _power_managing.Proposal( source_id=self._source_id, preferred_power=power, bounds=timeseries.Bounds(None, None), - component_ids=self._pool_ref_store._batteries, + component_ids=self._pool_ref_store.component_ids, priority=self._priority, creation_time=asyncio.get_running_loop().time(), ) @@ -170,27 +110,19 @@ async def propose_discharge(self, power: Power | None) -> None: if power < Power.zero(): raise ValueError("Discharge power must be positive.") power = -power - await self._pool_ref_store._power_manager_requests_sender.send( + await self._pool_ref_store.power_manager_requests_sender.send( _power_managing.Proposal( source_id=self._source_id, preferred_power=power, bounds=timeseries.Bounds(None, None), - component_ids=self._pool_ref_store._batteries, + component_ids=self._pool_ref_store.component_ids, priority=self._priority, creation_time=asyncio.get_running_loop().time(), ) ) @property - def component_ids(self) -> abc.Set[ComponentId]: - """Return ids of the batteries in the pool. - - Returns: - Ids of the batteries in the pool - """ - return self._pool_ref_store._batteries - - @property + @override def power(self) -> Formula[Power]: """Fetch the total power of the batteries in the pool. @@ -206,10 +138,10 @@ def power(self) -> Formula[Power]: A Formula that will calculate and stream the total power of all batteries in the pool. """ - return self._pool_ref_store._formula_pool.from_power_formula( + return self._pool_ref_store.formula_pool.from_power_formula( "battery_pool_power", connection_manager.get().component_graph.battery_formula( - self._pool_ref_store._batteries + self._pool_ref_store.component_ids ), ) @@ -248,10 +180,12 @@ def soc(self) -> ReceiverFetcher[Sample[Percentage]]: batteries in the pool, considering only working batteries with operational inverters. """ + assert isinstance(self._pool_ref_store, BatteryPoolReferenceStore) + method_name = SendOnUpdate.name() + "_" + SoCCalculator.name() if method_name not in self._pool_ref_store._active_methods: - calculator = SoCCalculator(self._pool_ref_store._batteries) + calculator = SoCCalculator(self._pool_ref_store.component_ids) self._pool_ref_store._active_methods[method_name] = SendOnUpdate( metric_calculator=calculator, working_batteries=self._pool_ref_store._working_batteries, @@ -268,9 +202,12 @@ def temperature(self) -> ReceiverFetcher[Sample[Temperature]]: A MetricAggregator that will calculate and stream the average temperature of all batteries in the pool. """ + assert isinstance(self._pool_ref_store, BatteryPoolReferenceStore) + method_name = SendOnUpdate.name() + "_" + TemperatureCalculator.name() + if method_name not in self._pool_ref_store._active_methods: - calculator = TemperatureCalculator(self._pool_ref_store._batteries) + calculator = TemperatureCalculator(self._pool_ref_store.component_ids) self._pool_ref_store._active_methods[method_name] = SendOnUpdate( metric_calculator=calculator, working_batteries=self._pool_ref_store._working_batteries, @@ -305,10 +242,12 @@ def capacity(self) -> ReceiverFetcher[Sample[Energy]]: batteries in the pool, considering only working batteries with operational inverters. """ + assert isinstance(self._pool_ref_store, BatteryPoolReferenceStore) + method_name = SendOnUpdate.name() + "_" + CapacityCalculator.name() if method_name not in self._pool_ref_store._active_methods: - calculator = CapacityCalculator(self._pool_ref_store._batteries) + calculator = CapacityCalculator(self._pool_ref_store.component_ids) self._pool_ref_store._active_methods[method_name] = SendOnUpdate( metric_calculator=calculator, working_batteries=self._pool_ref_store._working_batteries, @@ -317,59 +256,16 @@ def capacity(self) -> ReceiverFetcher[Sample[Energy]]: return self._pool_ref_store._active_methods[method_name] + @override @property - def power_status(self) -> ReceiverFetcher[BatteryPoolReport]: - """Get a receiver to receive new power status reports when they change. - - These include - - the current inclusion/exclusion bounds available for the pool's priority, - - the current target power for the pool's set of batteries, - - the result of the last distribution request for the pool's set of batteries. - - Returns: - A receiver that will stream power status reports for the pool's priority. - """ - sub = _power_managing.ReportRequest( - source_id=self._source_id, - priority=self._priority, - component_ids=self._pool_ref_store._batteries, - ) - self._pool_ref_store._power_bounds_subs[sub.get_channel_name()] = ( - asyncio.create_task( - self._pool_ref_store._power_manager_bounds_subscription_sender.send(sub) - ) - ) - channel = self._pool_ref_store._channel_registry.get_or_create( - _power_managing._Report, sub.get_channel_name() - ) - channel.resend_latest = True - - return channel - - @property - def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: - """Get a receiver to receive power distribution results. - - Returns: - A receiver that will stream power distribution results for the pool's set of - batteries. - """ - return MappingReceiverFetcher( - self._pool_ref_store._power_dist_results_fetcher, - lambda recv: recv.filter( - lambda x: x.request.component_ids == self._pool_ref_store._batteries - ), - ) - - @property - def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]: + def system_power_bounds(self) -> ReceiverFetcher[SystemBounds]: """Get receiver to receive new power bounds when they change. Power bounds refer to the min and max power that a battery can discharge or charge at and is also denoted as SoP. Power bounds formulas are described in the receiver return type. - None will be send if there is no component to calculate metrics. + None will be sent if there is no component to calculate metrics. A receiver from the MetricAggregator can be obtained by calling the `new_receiver` method. @@ -378,10 +274,12 @@ def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]: A MetricAggregator that will calculate and stream the power bounds of all batteries in the pool. """ + assert isinstance(self._pool_ref_store, BatteryPoolReferenceStore) + method_name = SendOnUpdate.name() + "_" + PowerBoundsCalculator.name() if method_name not in self._pool_ref_store._active_methods: - calculator = PowerBoundsCalculator(self._pool_ref_store._batteries) + calculator = PowerBoundsCalculator(self._pool_ref_store.component_ids) self._pool_ref_store._active_methods[method_name] = SendOnUpdate( metric_calculator=calculator, working_batteries=self._pool_ref_store._working_batteries, @@ -389,12 +287,3 @@ def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]: ) return self._pool_ref_store._active_methods[method_name] - - async def stop(self) -> None: - """Stop all tasks and channels owned by the BatteryPool.""" - # This was closing the pool_ref_store, which is not correct, because those are - # shared. - # - # This method will do until we have a mechanism to track the resources created - # through it. It can also eventually cleanup the pool_ref_store, when it is - # holding the last reference to it. diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py index 25ea9f3a3..76c024f51 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py @@ -1,5 +1,5 @@ # License: MIT -# Copyright © 2023 Frequenz Energy-as-a-Service GmbH +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH """User interface for requesting aggregated battery-inverter data.""" @@ -7,24 +7,24 @@ import uuid from collections.abc import Awaitable, Set from datetime import timedelta -from typing import Any +from typing import Any, Type from frequenz.channels import Receiver, Sender from frequenz.client.common.microgrid.components import ComponentId -from frequenz.client.microgrid.component import Battery +from frequenz.client.microgrid.component import Battery, Component +from typing_extensions import override from ..._internal._asyncio import cancel_and_await from ..._internal._channels import ChannelRegistry, ReceiverFetcher -from ...microgrid import connection_manager from ...microgrid._data_sourcing import ComponentMetricRequest from ...microgrid._power_distributing import Result from ...microgrid._power_distributing._component_status import ComponentPoolStatus from ...microgrid._power_managing._base_classes import Proposal, ReportRequest -from ..formulas._formula_pool import FormulaPool +from ..component_pool._component_pool_reference_store import ComponentPoolReferenceStore from ._methods import MetricAggregator -class BatteryPoolReferenceStore: # pylint: disable=too-many-instance-attributes +class BatteryPoolReferenceStore(ComponentPoolReferenceStore): """A class for maintaining the shared state/tasks for a set of pool of batteries. This includes ownership of @@ -47,7 +47,7 @@ def __init__( # pylint: disable=too-many-arguments power_manager_bounds_subscription_sender: Sender[ReportRequest], power_distribution_results_fetcher: ReceiverFetcher[Result], min_update_interval: timedelta, - batteries_id: Set[ComponentId] | None = None, + component_ids: Set[ComponentId] | None = None, ) -> None: """Create the class instance. @@ -77,84 +77,66 @@ def __init__( # pylint: disable=too-many-arguments the timestamp of the last received component data. It is currently impossible to use resampling actor for these metrics, because we can't specify resampling function for them. - batteries_id: Subset of the batteries that should be included in the - battery pool. If None or empty, then all batteries from the microgrid - will be used. - - Raises: - ValueError: If any of the specified batteries is not present in the - microgrid. + component_ids: An optional list of component_ids belonging to this pool. If + not specified, IDs of all components of the components type of this pool + in the microgrid will be fetched from the component graph. """ - self._batteries: frozenset[ComponentId] - all_batteries = self._get_all_batteries() - if batteries_id: - self._batteries = frozenset(batteries_id) - if not self._batteries.issubset(all_batteries): - unknown_ids = self._batteries - all_batteries - raise ValueError( - "Unable to create a BatteryPool. These component IDs are either " - + "not batteries or are unknown: " - + f"{unknown_ids}" - ) - else: - self._batteries = all_batteries + super().__init__( + channel_registry=channel_registry, + resampler_subscription_sender=resampler_subscription_sender, + status_receiver=batteries_status_receiver, + power_manager_requests_sender=power_manager_requests_sender, + power_manager_bounds_subs_sender=power_manager_bounds_subscription_sender, + power_distribution_results_fetcher=power_distribution_results_fetcher, + component_ids=component_ids, + ) + self._batteries = self.component_ids self._working_batteries: set[ComponentId] = set() - self._update_battery_status_task: asyncio.Task[None] | None = None - self._batteries_status_receiver: Receiver[ComponentPoolStatus] = ( - batteries_status_receiver - ) + if self._batteries: self._update_battery_status_task = asyncio.create_task( - self._update_battery_status(self._batteries_status_receiver) + self._update_battery_status(self.status_receiver) ) - self._min_update_interval: timedelta = min_update_interval + self._active_methods: dict[str, MetricAggregator[Any]] = {} + self._power_distributing_namespace: str = f"power-distributor-{self.namespace}" - self._power_manager_requests_sender: Sender[Proposal] = ( - power_manager_requests_sender - ) + @staticmethod + def get_component_class() -> Type[Component]: + """Class of the component type.""" + return Battery - self._power_manager_bounds_subscription_sender: Sender[ReportRequest] = ( - power_manager_bounds_subscription_sender - ) + @staticmethod + def get_pool_type_name() -> str: + """Name of the pool type, for display purposes.""" + return "BatteryPool" - self._active_methods: dict[str, MetricAggregator[Any]] = {} - self._power_bounds_subs: dict[str, asyncio.Task[None]] = {} - self._namespace: str = f"battery-pool-{self._batteries}-{uuid.uuid4()}" - self._power_distributing_namespace: str = f"power-distributor-{self._namespace}" - self._channel_registry: ChannelRegistry = channel_registry - self._power_dist_results_fetcher: ReceiverFetcher[Result] = ( - power_distribution_results_fetcher - ) - self._formula_pool: FormulaPool = FormulaPool( - self._namespace, - self._channel_registry, - resampler_subscription_sender, - ) + @staticmethod + def get_component_type_name_plural() -> str: + """Name of the component type, for display purposes.""" + return "batteries" + + @override + def get_namespace(self) -> str: + """Namespace to use with the data pipeline.""" + return f"battery-pool-{self.component_ids}-{uuid.uuid4()}" + + @override + def create_bounds_tracker(self) -> None: + """Create the bounds tracker for the pool.""" async def stop(self) -> None: - """Stop all pending async tasks.""" + """Stop all tasks and channels.""" + await super().stop() + tasks_to_stop: list[Awaitable[Any]] = [ method.stop() for method in self._active_methods.values() ] - tasks_to_stop.append(self._formula_pool.stop()) if self._update_battery_status_task: tasks_to_stop.append(cancel_and_await(self._update_battery_status_task)) await asyncio.gather(*tasks_to_stop) - self._batteries_status_receiver.close() - - def _get_all_batteries(self) -> frozenset[ComponentId]: - """Get all batteries from the microgrid. - - Returns: - All batteries in the microgrid. - """ - graph = connection_manager.get().component_graph - return frozenset( - battery.id for battery in graph.components(matching_types=Battery) - ) async def _update_battery_status( self, receiver: Receiver[ComponentPoolStatus] diff --git a/src/frequenz/sdk/timeseries/battery_pool/messages.py b/src/frequenz/sdk/timeseries/battery_pool/messages.py index 653fee6ff..7641ac6a0 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/messages.py +++ b/src/frequenz/sdk/timeseries/battery_pool/messages.py @@ -16,11 +16,12 @@ Success, ) from .._base_types import Bounds +from ..component_pool._component_pool_report import ComponentPoolReport # This class is used to expose the generic reports from the PowerManager with specific # documentation for the battery pool. -class BatteryPoolReport(typing.Protocol): +class BatteryPoolReport(ComponentPoolReport, typing.Protocol): """A status report for a battery pool.""" @property diff --git a/src/frequenz/sdk/timeseries/component_pool/__init__.py b/src/frequenz/sdk/timeseries/component_pool/__init__.py new file mode 100644 index 000000000..4da69a4b5 --- /dev/null +++ b/src/frequenz/sdk/timeseries/component_pool/__init__.py @@ -0,0 +1,10 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Manage a pool of components.""" + +from ._component_pool import ComponentPool + +__all__ = [ + "ComponentPool", +] diff --git a/src/frequenz/sdk/timeseries/component_pool/_component_pool.py b/src/frequenz/sdk/timeseries/component_pool/_component_pool.py new file mode 100644 index 000000000..41e33621a --- /dev/null +++ b/src/frequenz/sdk/timeseries/component_pool/_component_pool.py @@ -0,0 +1,160 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Abstract base class for component pools.""" + +import asyncio +import uuid +from abc import ABC, abstractmethod +from collections import abc +from typing import Generic, TypeVar, cast + +from frequenz.client.common.microgrid.components import ComponentId +from frequenz.quantities import Power + +from frequenz.sdk._internal._channels import MappingReceiverFetcher, ReceiverFetcher +from frequenz.sdk.microgrid import _power_distributing, _power_managing +from frequenz.sdk.timeseries import Bounds +from frequenz.sdk.timeseries._base_types import SystemBounds +from frequenz.sdk.timeseries.formulas import Formula + +from ._component_pool_reference_store import ComponentPoolReferenceStore +from ._component_pool_report import ComponentPoolReport + +RefStoreT = TypeVar("RefStoreT", bound=ComponentPoolReferenceStore) +ReportT = TypeVar("ReportT", bound=ComponentPoolReport) + + +class ComponentPool(ABC, Generic[RefStoreT, ReportT]): + """Abstract base class for component pools.""" + + def __init__( # pylint: disable=too-many-arguments + self, + *, + pool_ref_store: RefStoreT, + name: str | None, + priority: int, + ) -> None: + """Create an `AbstractPool` instance. + + Args: + pool_ref_store: The pool reference store instance. + name: An optional name used to identify this instance of the pool or a + corresponding actor in the logs. + priority: The priority of the actor using this wrapper. + """ + self._pool_ref_store = pool_ref_store + unique_id = str(uuid.uuid4()) + self._source_id = unique_id if name is None else f"{name}-{unique_id}" + self._priority = priority + + @property + def component_ids(self) -> abc.Set[ComponentId]: + """Return component IDs of all component IDs managed by this pool. + + Returns: + Set of managed component IDs. + """ + return self._pool_ref_store.component_ids + + async def propose_power( + self, + power: Power | None, + bounds: Bounds[Power | None] = Bounds(None, None), + ) -> None: + """Send a proposal to the power manager for the pool's underlying components. + + This proposal is for the maximum power that can be set for the components in + the pool. The actual production or consumption might be lower. + + Details on how the power manager handles proposals can be found in the + [Microgrid][frequenz.sdk.microgrid--setting-power] documentation. + + Args: + power: The power to propose. If `None`, + this proposal will not have any effect on the target power, unless + bounds are specified. When specified without bounds, bounds for lower + priority actors will be shifted by this power. If both are `None`, it + is equivalent to not having a proposal or withdrawing a previous one. + bounds: The power bounds for the proposal. When specified, these bounds will + limit the bounds for lower priority actors. + """ + await self._pool_ref_store.power_manager_requests_sender.send( + _power_managing.Proposal( + source_id=self._source_id, + preferred_power=power, + bounds=bounds, + component_ids=self._pool_ref_store.component_ids, + priority=self._priority, + creation_time=asyncio.get_running_loop().time(), + ) + ) + + @property + @abstractmethod + def power(self) -> Formula[Power]: + """Fetch the total power for the components in the pool. + + Returns: + A Formula that will calculate and stream the total power of all + components in the pool. + """ + + @property + def power_status(self) -> ReceiverFetcher[ReportT]: + """Get a receiver to receive new power status reports when they change. + + These include + - the current inclusion/exclusion bounds available for the pool's priority, + - the current target power for the pool's set of components, + - the result of the last distribution request for the pool's set of components,. + + Returns: + A receiver that will stream power status reports for the pool's priority. + """ + sub = _power_managing.ReportRequest( + source_id=self._source_id, + priority=self._priority, + component_ids=self._pool_ref_store.component_ids, + ) + self._pool_ref_store.power_bounds_subs[sub.get_channel_name()] = ( + asyncio.create_task( + self._pool_ref_store.power_manager_bounds_subs_sender.send(sub) + ) + ) + channel = self._pool_ref_store.channel_registry.get_or_create( + _power_managing._Report, # pylint: disable=protected-access + sub.get_channel_name(), + ) + channel.resend_latest = True + + return cast(ReceiverFetcher[ReportT], channel) + + @property + def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: + """Get a receiver to receive power distribution results. + + Returns: + A receiver that will stream power distribution results for the pool's set of + components. + """ + return MappingReceiverFetcher( + self._pool_ref_store.power_distribution_results_fetcher, + lambda recv: recv.filter( + lambda x: x.request.component_ids == self._pool_ref_store.component_ids + ), + ) + + @property + def system_power_bounds(self) -> ReceiverFetcher[SystemBounds]: + """Return a receiver fetcher for the system power bounds.""" + return self._pool_ref_store.bounds_channel + + async def stop(self) -> None: + """Stop all tasks and channels owned by the pool.""" + # This was closing the pool_ref_store, which is not correct, because those are + # shared. + # + # This method will do until we have a mechanism to track the resources created + # through it. It can also eventually cleanup the pool_ref_store, when it is + # holding the last reference to it. diff --git a/src/frequenz/sdk/timeseries/component_pool/_component_pool_reference_store.py b/src/frequenz/sdk/timeseries/component_pool/_component_pool_reference_store.py new file mode 100644 index 000000000..df9f2859a --- /dev/null +++ b/src/frequenz/sdk/timeseries/component_pool/_component_pool_reference_store.py @@ -0,0 +1,133 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Abstract base class for pool reference stores.""" + +import asyncio +from abc import ABC, abstractmethod +from collections import abc +from typing import Type + +from frequenz.channels import Broadcast, Receiver, Sender +from frequenz.client.common.microgrid.components import ComponentId +from frequenz.client.microgrid.component import Component + +from frequenz.sdk._internal._channels import ChannelRegistry, ReceiverFetcher +from frequenz.sdk.actor import BackgroundService +from frequenz.sdk.microgrid import connection_manager +from frequenz.sdk.microgrid._data_sourcing import ComponentMetricRequest +from frequenz.sdk.microgrid._power_distributing import ComponentPoolStatus, Result +from frequenz.sdk.microgrid._power_managing import Proposal, ReportRequest +from frequenz.sdk.timeseries._base_types import SystemBounds +from frequenz.sdk.timeseries.formulas._formula_pool import FormulaPool + + +class ComponentPoolReferenceStore(ABC): + """Abstract base class for pool reference stores.""" + + def __init__( # pylint: disable=too-many-arguments + self, + *, + channel_registry: ChannelRegistry, + resampler_subscription_sender: Sender[ComponentMetricRequest], + status_receiver: Receiver[ComponentPoolStatus], + power_manager_requests_sender: Sender[Proposal], + power_manager_bounds_subs_sender: Sender[ReportRequest], + power_distribution_results_fetcher: ReceiverFetcher[Result], + component_ids: abc.Set[ComponentId] | None = None, + ): + """Initialize this instance. + + Args: + channel_registry: A channel registry instance shared with the resampling + actor. + resampler_subscription_sender: A sender for sending metric requests to the + resampling actor. + status_receiver: A receiver that streams the status of the components in + the pool. + power_manager_requests_sender: A Channel sender for sending power + requests to the power managing actor. + power_manager_bounds_subs_sender: A Channel sender for sending power bounds + subscription requests to the power managing actor. + power_distribution_results_fetcher: A ReceiverFetcher for the results from + the power distributing actor. + component_ids: An optional list of component_ids belonging to this pool. If + not specified, IDs of all components of the components type of this pool + in the microgrid will be fetched from the component graph. + + Raises: + ValueError: If any of the provided component_ids are not of correct type or + are unknown to the component graph. + """ + self.channel_registry = channel_registry + self.resampler_subscription_sender = resampler_subscription_sender + self.status_receiver = status_receiver + self.power_manager_requests_sender = power_manager_requests_sender + self.power_manager_bounds_subs_sender = power_manager_bounds_subs_sender + self.power_distribution_results_fetcher = power_distribution_results_fetcher + + graph = connection_manager.get().component_graph + all_components = frozenset( + { + inv.id + for inv in graph.components(matching_types=self.get_component_class()) + } + ) + + if component_ids is not None: + self.component_ids: frozenset[ComponentId] = frozenset(component_ids) + if not self.component_ids.issubset(all_components): + unknown_ids = self.component_ids - all_components + raise ValueError( + f"Unable to create {self.get_pool_type_name()}. These component IDs " + + f"are either not {self.get_component_type_name_plural()} or are unknown: " + + f"{unknown_ids}" + ) + else: + self.component_ids = all_components + + self.power_bounds_subs: dict[str, asyncio.Task[None]] = {} + + self.namespace = self.get_namespace() + self.formula_pool = FormulaPool( + self.namespace, + self.channel_registry, + self.resampler_subscription_sender, + ) + self.bounds_channel: Broadcast[SystemBounds] = Broadcast( + name=f"System Bounds for {self.get_component_type_name_plural()}: {self.component_ids}", + resend_latest=True, + ) + + self.bounds_tracker: BackgroundService | None = None + self.create_bounds_tracker() + + @staticmethod + @abstractmethod + def get_component_class() -> Type[Component]: + """Class of the component type.""" + + @staticmethod + @abstractmethod + def get_pool_type_name() -> str: + """Name of the pool type, for display purposes.""" + + @staticmethod + @abstractmethod + def get_component_type_name_plural() -> str: + """Name of the component type, for display purposes.""" + + @abstractmethod + def get_namespace(self) -> str: + """Namespace to use with the data pipeline.""" + + @abstractmethod + def create_bounds_tracker(self) -> None: + """Create the bounds tracker for the pool.""" + + async def stop(self) -> None: + """Stop all tasks and channels.""" + await self.formula_pool.stop() + if self.bounds_tracker is not None: + await self.bounds_tracker.stop() + self.status_receiver.close() diff --git a/src/frequenz/sdk/timeseries/component_pool/_component_pool_report.py b/src/frequenz/sdk/timeseries/component_pool/_component_pool_report.py new file mode 100644 index 000000000..12f17a4ac --- /dev/null +++ b/src/frequenz/sdk/timeseries/component_pool/_component_pool_report.py @@ -0,0 +1,26 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Types for exposing component pool reports.""" + +import typing + +from frequenz.quantities import Power + +from .._base_types import Bounds + + +class ComponentPoolReport(typing.Protocol): + """A status report for a component pool.""" + + @property + def target_power(self) -> Power | None: + """The currently set power for the components.""" + + @property + def bounds(self) -> Bounds[Power] | None: + """The usable bounds for the components. + + These bounds are adjusted to any restrictions placed by actors with higher + priorities. + """ diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py index 46aa22ee8..c8d7046ab 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py @@ -3,19 +3,13 @@ """Interactions with pools of EV Chargers.""" -import asyncio -import uuid -from collections import abc - -from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Current, Power +from typing_extensions import override -from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher -from ...microgrid import _power_distributing, _power_managing, connection_manager +from ...microgrid import connection_manager from ...timeseries import Bounds -from .._base_types import SystemBounds -from ..formulas._formula import Formula -from ..formulas._formula_3_phase import Formula3Phase +from ..component_pool import ComponentPool +from ..formulas import Formula, Formula3Phase from ._ev_charger_pool_reference_store import EVChargerPoolReferenceStore from ._result_types import EVChargerPoolReport @@ -24,7 +18,7 @@ class EVChargerPoolError(Exception): """An error that occurred in any of the EVChargerPool methods.""" -class EVChargerPool: +class EVChargerPool(ComponentPool[EVChargerPoolReferenceStore, EVChargerPoolReport]): """An interface for interaction with pools of EV Chargers. Provides: @@ -34,37 +28,10 @@ class EVChargerPool: measurements of the EV Chargers in the pool. """ - def __init__( # pylint: disable=too-many-arguments - self, - *, - pool_ref_store: EVChargerPoolReferenceStore, - name: str | None, - priority: int, - ) -> None: - """Create an `EVChargerPool` instance. - - !!! note - - `EVChargerPool` instances are not meant to be created directly by users. Use - the - [`microgrid.new_ev_charger_pool`][frequenz.sdk.microgrid.new_ev_charger_pool] - method for creating `EVChargerPool` instances. - - Args: - pool_ref_store: The EV charger pool reference store instance. - name: An optional name used to identify this instance of the pool or a - corresponding actor in the logs. - priority: The priority of the actor using this wrapper. - """ - self._pool_ref_store = pool_ref_store - unique_id = str(uuid.uuid4()) - self._source_id = unique_id if name is None else f"{name}-{unique_id}" - self._priority = priority - + @override async def propose_power( self, power: Power | None, - *, bounds: Bounds[Power | None] = Bounds(None, None), ) -> None: """Send a proposal to the power manager for the pool's set of EV chargers. @@ -92,25 +59,7 @@ async def propose_power( raise EVChargerPoolError( "Discharging from EV chargers is currently not supported." ) - await self._pool_ref_store.power_manager_requests_sender.send( - _power_managing.Proposal( - source_id=self._source_id, - preferred_power=power, - bounds=bounds, - component_ids=self._pool_ref_store.component_ids, - priority=self._priority, - creation_time=asyncio.get_running_loop().time(), - ) - ) - - @property - def component_ids(self) -> abc.Set[ComponentId]: - """Return component IDs of all EV Chargers managed by this EVChargerPool. - - Returns: - Set of managed component IDs. - """ - return self._pool_ref_store.component_ids + await super().propose_power(power, bounds=bounds) @property def current_per_phase(self) -> Formula3Phase[Current]: @@ -136,6 +85,7 @@ def current_per_phase(self) -> Formula3Phase[Current]: ) @property + @override def power(self) -> Formula[Power]: """Fetch the total power for the EV Chargers in the pool. @@ -157,62 +107,3 @@ def power(self) -> Formula[Power]: self._pool_ref_store.component_ids ), ) - - @property - def power_status(self) -> ReceiverFetcher[EVChargerPoolReport]: - """Get a receiver to receive new power status reports when they change. - - These include - - the current inclusion/exclusion bounds available for the pool's priority, - - the current target power for the pool's set of batteries, - - the result of the last distribution request for the pool's set of batteries. - - Returns: - A receiver that will stream power status reports for the pool's priority. - """ - sub = _power_managing.ReportRequest( - source_id=self._source_id, - priority=self._priority, - component_ids=self._pool_ref_store.component_ids, - ) - self._pool_ref_store.power_bounds_subs[sub.get_channel_name()] = ( - asyncio.create_task( - self._pool_ref_store.power_manager_bounds_subs_sender.send(sub) - ) - ) - channel = self._pool_ref_store.channel_registry.get_or_create( - _power_managing._Report, # pylint: disable=protected-access - sub.get_channel_name(), - ) - channel.resend_latest = True - - return channel - - @property - def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: - """Get a receiver to receive power distribution results. - - Returns: - A receiver that will stream power distribution results for the pool's set of - EV chargers. - """ - return MappingReceiverFetcher( - self._pool_ref_store.power_distribution_results_fetcher, - lambda recv: recv.filter( - lambda x: x.request.component_ids == self._pool_ref_store.component_ids - ), - ) - - async def stop(self) -> None: - """Stop all tasks and channels owned by the EVChargerPool.""" - # This was closing the pool_ref_store, which is not correct, because those are - # shared. - # - # This method will do until we have a mechanism to track the resources created - # through it. It can also eventually cleanup the pool_ref_store, when it is - # holding the last reference to it. - - @property - def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]: - """Return a receiver fetcher for the system power bounds.""" - return self._pool_ref_store.bounds_channel diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py index 7e4d4c27b..eb9461675 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py @@ -3,25 +3,17 @@ """Manages shared state/tasks for a set of EV chargers.""" -import asyncio import uuid -from collections import abc +from typing import Type -from frequenz.channels import Broadcast, Receiver, Sender -from frequenz.client.common.microgrid.components import ComponentId -from frequenz.client.microgrid.component import EvCharger +from frequenz.client.microgrid.component import Component, EvCharger +from typing_extensions import override -from ..._internal._channels import ChannelRegistry, ReceiverFetcher -from ...microgrid import connection_manager -from ...microgrid._data_sourcing import ComponentMetricRequest -from ...microgrid._power_distributing import ComponentPoolStatus, Result -from ...microgrid._power_managing._base_classes import Proposal, ReportRequest -from .._base_types import SystemBounds -from ..formulas._formula_pool import FormulaPool +from ..component_pool._component_pool_reference_store import ComponentPoolReferenceStore from ._system_bounds_tracker import EVCSystemBoundsTracker -class EVChargerPoolReferenceStore: +class EVChargerPoolReferenceStore(ComponentPoolReferenceStore): """A class for maintaining the shared state/tasks for a set of pool of EV chargers. This includes ownership of @@ -34,85 +26,32 @@ class EVChargerPoolReferenceStore: They are exposed through the EVChargerPool class. """ - def __init__( # pylint: disable=too-many-arguments - self, - *, - channel_registry: ChannelRegistry, - resampler_subscription_sender: Sender[ComponentMetricRequest], - status_receiver: Receiver[ComponentPoolStatus], - power_manager_requests_sender: Sender[Proposal], - power_manager_bounds_subs_sender: Sender[ReportRequest], - power_distribution_results_fetcher: ReceiverFetcher[Result], - component_ids: abc.Set[ComponentId] | None = None, - ): - """Create an instance of the class. - - Args: - channel_registry: A channel registry instance shared with the resampling - actor. - resampler_subscription_sender: A sender for sending metric requests to the - resampling actor. - status_receiver: A receiver that streams the status of the EV Chargers in - the pool. - power_manager_requests_sender: A Channel sender for sending power - requests to the power managing actor. - power_manager_bounds_subs_sender: A Channel sender for sending power bounds - subscription requests to the power managing actor. - power_distribution_results_fetcher: A ReceiverFetcher for the results from - the power distributing actor. - component_ids: An optional list of component_ids belonging to this pool. If - not specified, IDs of all EV Chargers in the microgrid will be fetched - from the component graph. - - Raises: - ValueError: If any of the specified component_ids are not EV chargers - or are unknown to the component graph. - """ - self.channel_registry = channel_registry - self.resampler_subscription_sender = resampler_subscription_sender - self.status_receiver = status_receiver - self.power_manager_requests_sender = power_manager_requests_sender - self.power_manager_bounds_subs_sender = power_manager_bounds_subs_sender - self.power_distribution_results_fetcher = power_distribution_results_fetcher - - graph = connection_manager.get().component_graph - all_ev_chargers = frozenset( - {evc.id for evc in graph.components(matching_types=EvCharger)} - ) - - if component_ids is not None: - self.component_ids: frozenset[ComponentId] = frozenset(component_ids) - if not self.component_ids.issubset(all_ev_chargers): - unknown_ids = self.component_ids - all_ev_chargers - raise ValueError( - "Unable to create an EVChargerPool. These component IDs are either " - + f"not EV chargers or are unknown: {unknown_ids}" - ) - else: - self.component_ids = all_ev_chargers - - self.power_bounds_subs: dict[str, asyncio.Task[None]] = {} - - self.namespace: str = f"ev-charger-pool-{uuid.uuid4()}" - self.formula_pool = FormulaPool( - self.namespace, - self.channel_registry, - self.resampler_subscription_sender, - ) - - self.bounds_channel: Broadcast[SystemBounds] = Broadcast( - name=f"System Bounds for EV Chargers: {component_ids}", - resend_latest=True, - ) - self.bounds_tracker: EVCSystemBoundsTracker = EVCSystemBoundsTracker( + @staticmethod + def get_component_class() -> Type[Component]: + """Class of the component type.""" + return EvCharger + + @staticmethod + def get_pool_type_name() -> str: + """Name of the pool type, for display purposes.""" + return "EVChargerPool" + + @staticmethod + def get_component_type_name_plural() -> str: + """Name of the component type, for display purposes.""" + return "EV chargers" + + @override + def get_namespace(self) -> str: + """Namespace to use with the data pipeline.""" + return f"ev-charger-pool-{uuid.uuid4()}" + + @override + def create_bounds_tracker(self) -> None: + """Create the bounds tracker for the pool.""" + self.bounds_tracker = EVCSystemBoundsTracker( self.component_ids, self.status_receiver, self.bounds_channel.new_sender(), ) self.bounds_tracker.start() - - async def stop(self) -> None: - """Stop all tasks and channels owned by the EVChargerPool.""" - await self.formula_pool.stop() - await self.bounds_tracker.stop() - self.status_receiver.close() diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_result_types.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_result_types.py index 32d1c90f1..4e88ddddc 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_result_types.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_result_types.py @@ -7,10 +7,11 @@ from frequenz.quantities import Power -from .._base_types import Bounds +from .. import Bounds +from ..component_pool._component_pool_report import ComponentPoolReport -class EVChargerPoolReport(typing.Protocol): +class EVChargerPoolReport(ComponentPoolReport, typing.Protocol): """A status report for an EV chargers pool.""" @property diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py index 6bbb91e15..ba4237a34 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py @@ -3,20 +3,14 @@ """Interactions with pools of PV inverters.""" -import asyncio -import uuid -from collections import abc - -from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Power +from typing_extensions import override from frequenz.sdk.microgrid import connection_manager -from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher -from ...microgrid import _power_distributing, _power_managing from ...timeseries import Bounds -from .._base_types import SystemBounds -from ..formulas._formula import Formula +from ..component_pool import ComponentPool +from ..formulas import Formula from ._pv_pool_reference_store import PVPoolReferenceStore from ._result_types import PVPoolReport @@ -25,7 +19,7 @@ class PVPoolError(Exception): """An error that occurred in any of the PVPool methods.""" -class PVPool: +class PVPool(ComponentPool[PVPoolReferenceStore, PVPoolReport]): """An interface for interaction with pools of PV inverters. Provides: @@ -33,30 +27,7 @@ class PVPool: measurements of the PV inverters in the pool. """ - def __init__( # pylint: disable=too-many-arguments - self, - *, - pool_ref_store: PVPoolReferenceStore, - name: str | None, - priority: int, - ) -> None: - """Initialize the instance. - - !!! note - `PVPool` instances are not meant to be created directly by users. Use the - [`microgrid.new_pv_pool`][frequenz.sdk.microgrid.new_pv_pool] method for - creating `PVPool` instances. - - Args: - pool_ref_store: The reference store for the PV pool. - name: The name of the PV pool. - priority: The priority of the PV pool. - """ - self._pool_ref_store = pool_ref_store - unique_id = uuid.uuid4() - self._source_id = str(unique_id) if name is None else f"{name}-{unique_id}" - self._priority = priority - + @override async def propose_power( self, power: Power | None, @@ -77,7 +48,7 @@ async def propose_power( Args: power: The power to propose for the PV inverters in the pool. If `None`, this proposal will not have any effect on the target power, unless - bounds are specified. When speficied without bounds, bounds for lower + bounds are specified. When specified without bounds, bounds for lower priority actors will be shifted by this power. If both are `None`, it is equivalent to not having a proposal or withdrawing a previous one. bounds: The power bounds for the proposal. When specified, this will limit @@ -88,27 +59,10 @@ async def propose_power( """ if power is not None and power > Power.zero(): raise PVPoolError("Charge powers for PV inverters is not supported.") - await self._pool_ref_store.power_manager_requests_sender.send( - _power_managing.Proposal( - source_id=self._source_id, - preferred_power=power, - bounds=bounds, - component_ids=self._pool_ref_store.component_ids, - priority=self._priority, - creation_time=asyncio.get_running_loop().time(), - ) - ) - - @property - def component_ids(self) -> abc.Set[ComponentId]: - """Return component IDs of all PV inverters managed by this PVPool. - - Returns: - Set of managed component IDs. - """ - return self._pool_ref_store.component_ids + await super().propose_power(power, bounds=bounds) @property + @override def power(self) -> Formula[Power]: """Fetch the total power for the PV Inverters in the pool. @@ -130,62 +84,3 @@ def power(self) -> Formula[Power]: self._pool_ref_store.component_ids ), ) - - @property - def power_status(self) -> ReceiverFetcher[PVPoolReport]: - """Get a receiver to receive new power status reports when they change. - - These include - - the current inclusion/exclusion bounds available for the pool's priority, - - the current target power for the pool's set of batteries, - - the result of the last distribution request for the pool's set of batteries. - - Returns: - A receiver that will stream power status reports for the pool's priority. - """ - sub = _power_managing.ReportRequest( - source_id=self._source_id, - priority=self._priority, - component_ids=self._pool_ref_store.component_ids, - ) - self._pool_ref_store.power_bounds_subs[sub.get_channel_name()] = ( - asyncio.create_task( - self._pool_ref_store.power_manager_bounds_subs_sender.send(sub) - ) - ) - channel = self._pool_ref_store.channel_registry.get_or_create( - _power_managing._Report, # pylint: disable=protected-access - sub.get_channel_name(), - ) - channel.resend_latest = True - - return channel - - @property - def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: - """Get a receiver to receive power distribution results. - - Returns: - A receiver that will stream power distribution results for the pool's set of - PV inverters. - """ - return MappingReceiverFetcher( - self._pool_ref_store.power_distribution_results_fetcher, - lambda recv: recv.filter( - lambda x: x.request.component_ids == self._pool_ref_store.component_ids - ), - ) - - async def stop(self) -> None: - """Stop all tasks and channels owned by the PVPool.""" - # This was closing the pool_ref_store, which is not correct, because those are - # shared. - # - # This method will do until we have a mechanism to track the resources created - # through it. It can also eventually cleanup the pool_ref_store, when it is - # holding the last reference to it. - - @property - def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]: - """Return a receiver fetcher for the system power bounds.""" - return self._pool_ref_store.bounds_channel diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py index 8d6b920d3..58f11c6df 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py @@ -3,25 +3,17 @@ """Manages shared state/tasks for a set of PV inverters.""" -import asyncio import uuid -from collections import abc +from typing import Type -from frequenz.channels import Broadcast, Receiver, Sender -from frequenz.client.common.microgrid.components import ComponentId -from frequenz.client.microgrid.component import SolarInverter +from frequenz.client.microgrid.component import Component, SolarInverter +from typing_extensions import override -from ..._internal._channels import ChannelRegistry, ReceiverFetcher -from ...microgrid import connection_manager -from ...microgrid._data_sourcing import ComponentMetricRequest -from ...microgrid._power_distributing import ComponentPoolStatus, Result -from ...microgrid._power_managing._base_classes import Proposal, ReportRequest -from .._base_types import SystemBounds -from ..formulas._formula_pool import FormulaPool +from ..component_pool._component_pool_reference_store import ComponentPoolReferenceStore from ._system_bounds_tracker import PVSystemBoundsTracker -class PVPoolReferenceStore: +class PVPoolReferenceStore(ComponentPoolReferenceStore): """A class for maintaining the shared state/tasks for a set of pool of PV inverters. This includes ownership of @@ -34,78 +26,29 @@ class PVPoolReferenceStore: They are exposed through the PVPool class. """ - def __init__( # pylint: disable=too-many-arguments - self, - *, - channel_registry: ChannelRegistry, - resampler_subscription_sender: Sender[ComponentMetricRequest], - status_receiver: Receiver[ComponentPoolStatus], - power_manager_requests_sender: Sender[Proposal], - power_manager_bounds_subs_sender: Sender[ReportRequest], - power_distribution_results_fetcher: ReceiverFetcher[Result], - component_ids: abc.Set[ComponentId] | None = None, - ): - """Initialize this instance. - - Args: - channel_registry: A channel registry instance shared with the resampling - actor. - resampler_subscription_sender: A sender for sending metric requests to the - resampling actor. - status_receiver: A receiver that streams the status of the PV inverters in - the pool. - power_manager_requests_sender: A Channel sender for sending power - requests to the power managing actor. - power_manager_bounds_subs_sender: A Channel sender for sending power bounds - subscription requests to the power managing actor. - power_distribution_results_fetcher: A ReceiverFetcher for the results from - the power distributing actor. - component_ids: An optional list of component_ids belonging to this pool. If - not specified, IDs of all PV inverters in the microgrid will be fetched - from the component graph. - - Raises: - ValueError: If any of the provided component_ids are not PV inverters or - are unknown to the component graph. - """ - self.channel_registry = channel_registry - self.resampler_subscription_sender = resampler_subscription_sender - self.status_receiver = status_receiver - self.power_manager_requests_sender = power_manager_requests_sender - self.power_manager_bounds_subs_sender = power_manager_bounds_subs_sender - self.power_distribution_results_fetcher = power_distribution_results_fetcher - - graph = connection_manager.get().component_graph - all_solar_inverters = frozenset( - {inv.id for inv in graph.components(matching_types=SolarInverter)} - ) - - if component_ids is not None: - self.component_ids: frozenset[ComponentId] = frozenset(component_ids) - if not self.component_ids.issubset(all_solar_inverters): - unknown_ids = self.component_ids - all_solar_inverters - raise ValueError( - "Unable to create a PVPool. These component IDs are either " - + "not PV inverters or are unknown: " - + f"{unknown_ids}" - ) - else: - self.component_ids = all_solar_inverters - - self.power_bounds_subs: dict[str, asyncio.Task[None]] = {} - - self.namespace: str = f"pv-pool-{uuid.uuid4()}" - self.formula_pool = FormulaPool( - self.namespace, - self.channel_registry, - self.resampler_subscription_sender, - ) - self.bounds_channel: Broadcast[SystemBounds] = Broadcast( - name=f"System Bounds for PV inverters: {component_ids}", - resend_latest=True, - ) - - self.bounds_tracker: PVSystemBoundsTracker | None = None + @staticmethod + def get_component_class() -> Type[Component]: + """Class of the component type.""" + return SolarInverter + + @staticmethod + def get_pool_type_name() -> str: + """Name of the pool type, for display purposes.""" + return "PVPool" + + @staticmethod + def get_component_type_name_plural() -> str: + """Name of the component type, for display purposes.""" + return "PV inverters" + + @override + def get_namespace(self) -> str: + """Namespace to use with the data pipeline.""" + return f"pv-pool-{uuid.uuid4()}" + + @override + def create_bounds_tracker(self) -> None: + """Create the bounds tracker for the pool.""" # In locations without PV inverters, the bounds tracker will not be started. if self.component_ids: self.bounds_tracker = PVSystemBoundsTracker( @@ -114,10 +57,3 @@ def __init__( # pylint: disable=too-many-arguments self.bounds_channel.new_sender(), ) self.bounds_tracker.start() - - async def stop(self) -> None: - """Stop all tasks and channels owned by the PVInverterPool.""" - await self.formula_pool.stop() - if self.bounds_tracker is not None: - await self.bounds_tracker.stop() - self.status_receiver.close() diff --git a/src/frequenz/sdk/timeseries/pv_pool/_result_types.py b/src/frequenz/sdk/timeseries/pv_pool/_result_types.py index da8ddb2f4..bfad0158f 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_result_types.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_result_types.py @@ -8,9 +8,10 @@ from frequenz.quantities import Power from .._base_types import Bounds +from ..component_pool._component_pool_report import ComponentPoolReport -class PVPoolReport(typing.Protocol): +class PVPoolReport(ComponentPoolReport, typing.Protocol): """A status report for a PV pool.""" @property diff --git a/tests/microgrid/test_datapipeline.py b/tests/microgrid/test_datapipeline.py index 8142a0281..343b8a7ba 100644 --- a/tests/microgrid/test_datapipeline.py +++ b/tests/microgrid/test_datapipeline.py @@ -99,7 +99,7 @@ async def test_actors_started( with pytest.raises( ValueError, match=re.escape( - "Unable to create a BatteryPool. These component IDs are either not " + "Unable to create BatteryPool. These component IDs are either not " + "batteries or are unknown: frozenset({ComponentId(4)})" ), ): @@ -108,7 +108,7 @@ async def test_actors_started( with pytest.raises( ValueError, match=re.escape( - "Unable to create a PVPool. These component IDs are either not PV " + "Unable to create PVPool. These component IDs are either not PV " + "inverters or are unknown: frozenset({ComponentId(1)})" ), ): @@ -117,7 +117,7 @@ async def test_actors_started( with pytest.raises( ValueError, match=re.escape( - "Unable to create an EVChargerPool. These component IDs are either " + "Unable to create EVChargerPool. These component IDs are either " + "not EV chargers or are unknown: frozenset({ComponentId(4)})" ), ): diff --git a/tests/timeseries/_battery_pool/test_battery_pool.py b/tests/timeseries/_battery_pool/test_battery_pool.py index 489cb6746..39b4a09c3 100644 --- a/tests/timeseries/_battery_pool/test_battery_pool.py +++ b/tests/timeseries/_battery_pool/test_battery_pool.py @@ -902,9 +902,7 @@ async def run_power_bounds_test( # pylint: disable=too-many-locals sampling_rate=0.1, ) - # pylint: disable=protected-access - receiver = battery_pool._system_power_bounds.new_receiver(limit=50) - # pylint: enable=protected-access + receiver = battery_pool.system_power_bounds.new_receiver(limit=50) # First metrics delivers slower because of the startup delay in the pool. msg = await asyncio.wait_for( @@ -1248,7 +1246,7 @@ async def test_power_status_same_instance_subscriptions_work( power_manager_bounds_subscription_sender=requests_channel.new_sender(), power_distribution_results_fetcher=MagicMock(), min_update_interval=timedelta(seconds=1), - batteries_id=component_ids, + component_ids=component_ids, ), name="battery-pool", priority=5,