Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
Added SteamBoilerPool, which allows users to control a pool of steam boiler components.

## Bug Fixes

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dependencies = [
# Make sure to update the mkdocs.yml file when
# changing the version
# (plugins.mkdocstrings.handlers.python.import)
"frequenz-client-microgrid >= 0.18.1, < 0.19.0",
"frequenz-client-microgrid >= 0.18.3, < 0.19.0",
"frequenz-microgrid-component-graph >= 0.4, < 0.5",
"frequenz-client-common >= 0.3.6, < 0.4.0",
"frequenz-channels >= 1.6.1, < 2.0.0",
Expand Down
1 change: 1 addition & 0 deletions src/frequenz/sdk/microgrid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ async def initialize(
"new_battery_pool",
"new_ev_charger_pool",
"new_pv_pool",
"new_steam_boiler_pool",
"producer",
"voltage_per_phase",
]
139 changes: 138 additions & 1 deletion src/frequenz/sdk/microgrid/_data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,21 @@

from frequenz.channels import Broadcast, Sender
from frequenz.client.common.microgrid.components import ComponentId
from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter
from frequenz.client.microgrid.component import (
Battery,
EvCharger,
SolarInverter,
SteamBoiler,
)

from .._internal._channels import ChannelRegistry
from ..actor._actor import Actor
from ..timeseries import ResamplerConfig
from ..timeseries._voltage_streamer import VoltageStreamer
from ..timeseries.steam_boiler_pool import SteamBoilerPool
from ..timeseries.steam_boiler_pool._steam_boiler_pool_reference_store import (
SteamBoilerPoolReferenceStore,
)
from ._data_sourcing import ComponentMetricRequest, DataSourcingActor
from ._power_managing._base_classes import DefaultPower, PowerManagerAlgorithm
from ._power_wrapper import PowerWrapper
Expand Down Expand Up @@ -127,6 +136,13 @@ def __init__(
# https://github.com/frequenz-floss/frequenz-sdk-python/issues/1285
component_class=SolarInverter,
)
self._steam_boiler_power_wrapper = PowerWrapper(
self._channel_registry,
api_power_request_timeout=api_power_request_timeout,
power_manager_algorithm=PowerManagerAlgorithm.MATRYOSHKA,
default_power=DefaultPower.MAX,
component_class=SteamBoiler,
)

self._logical_meter: LogicalMeter | None = None
self._consumer: Consumer | None = None
Expand All @@ -141,6 +157,9 @@ def __init__(
self._pv_pool_reference_stores: dict[
frozenset[ComponentId], PVPoolReferenceStore
] = {}
self._steam_boiler_pool_reference_stores: dict[
frozenset[ComponentId], SteamBoilerPoolReferenceStore
] = {}
self._frequency_instance: GridFrequency | None = None
self._voltage_instance: VoltageStreamer | None = None

Expand Down Expand Up @@ -447,6 +466,82 @@ def new_battery_pool(
priority=priority,
)

def new_steam_boiler_pool(
self,
*,
priority: int,
component_ids: abc.Set[ComponentId] | None = None,
name: str | None = None,
) -> SteamBoilerPool:
"""Return the corresponding SteamBoilerPool instance for the given ids.

If a SteamBoilerPool instance for the given ids doesn't exist, a new one is
created and returned.

Args:
priority: The priority of the actor making the call.
component_ids: Optional set of IDs of steam boilers to be managed by the
SteamBoilerPool.
name: An optional name used to identify this instance of the pool or a
corresponding actor in the logs.

Returns:
A SteamBoilerPool instance.
"""
from ..timeseries.steam_boiler_pool import SteamBoilerPool
from ..timeseries.steam_boiler_pool._steam_boiler_pool_reference_store import (
SteamBoilerPoolReferenceStore,
)

if not self._steam_boiler_power_wrapper.started:
self._steam_boiler_power_wrapper.start()

# We use frozenset to make a hashable key from the input set.
ref_store_key: frozenset[ComponentId] = frozenset()
if component_ids is not None:
ref_store_key = frozenset(component_ids)

pool_key = f"{ref_store_key}-{priority}"
if pool_key in self._known_pool_keys:
_logger.warning(
"A SteamBoilerPool instance was already created for steam_boiler_ids=%s "
"and priority=%s using `microgrid.steam_boiler_pool(...)`."
"\n Hint: If the multiple instances are created from the same actor, "
"consider reusing the same instance."
"\n Hint: If the instances are created from different actors, "
"consider using different priorities to distinguish them.",
component_ids,
priority,
)
else:
self._known_pool_keys.add(pool_key)

if ref_store_key not in self._steam_boiler_pool_reference_stores:
self._steam_boiler_pool_reference_stores[ref_store_key] = (
SteamBoilerPoolReferenceStore(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
status_receiver=self._steam_boiler_power_wrapper.status_channel.new_receiver(
limit=1
),
power_manager_requests_sender=(
self._steam_boiler_power_wrapper.proposal_channel.new_sender()
),
power_manager_bounds_subs_sender=(
self._steam_boiler_power_wrapper.bounds_subscription_channel.new_sender()
),
power_distribution_results_fetcher=(
self._steam_boiler_power_wrapper.distribution_results_fetcher()
),
component_ids=component_ids,
)
)
return SteamBoilerPool(
pool_ref_store=self._steam_boiler_pool_reference_stores[ref_store_key],
name=name,
priority=priority,
)

def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]:
"""Return a Sender for sending requests to the data sourcing actor.

Expand Down Expand Up @@ -503,12 +598,15 @@ async def _stop(self) -> None:
await self._battery_power_wrapper.stop()
await self._ev_power_wrapper.stop()
await self._pv_power_wrapper.stop()
await self._steam_boiler_power_wrapper.stop()
for pool in self._battery_pool_reference_stores.values():
await pool.stop()
for evpool in self._ev_charger_pool_reference_stores.values():
await evpool.stop()
for pvpool in self._pv_pool_reference_stores.values():
await pvpool.stop()
for steam_boiler_pool in self._steam_boiler_pool_reference_stores.values():
await steam_boiler_pool.stop()


_DATA_PIPELINE: _DataPipeline | None = None
Expand Down Expand Up @@ -682,6 +780,45 @@ def new_pv_pool(
return _get().new_pv_pool(priority=priority, component_ids=component_ids, name=name)


def new_steam_boiler_pool(
*,
priority: int,
component_ids: abc.Set[ComponentId] | None = None,
name: str | None = None,
) -> SteamBoilerPool:
"""Return a new `SteamBoilerPool` instance for the given parameters.

The priority value is used to resolve conflicts when multiple actors are trying to
propose different power values for the same set of steam boilers.

!!! note
When specifying priority, bigger values indicate higher priority.

It is recommended to reuse the same instance of the `SteamBoilerPool` within the same
actor, unless they are managing different sets of steam boilers.

In deployments with multiple actors managing the same set of steam boilers, it is
recommended to use different priorities to distinguish between them. If not,
a random prioritization will be imposed on them to resolve conflicts, which may
lead to unexpected behavior like longer duration to converge on the desired
power.

Args:
priority: The priority of the actor making the call.
component_ids: Optional set of IDs of steam boilers to be managed by the
`SteamBoilerPool`. If not specified, all steam boilers available in the component
graph are used.
name: An optional name used to identify this instance of the pool or a
corresponding actor in the logs.

Returns:
A `SteamBoilerPool` instance.
"""
return _get().new_steam_boiler_pool(
priority=priority, component_ids=component_ids, name=name
)


def grid() -> Grid:
"""Return the grid measuring point."""
return _get().grid()
Expand Down
9 changes: 7 additions & 2 deletions src/frequenz/sdk/microgrid/_power_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
from datetime import timedelta

from frequenz.channels import Broadcast
from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter
from frequenz.client.microgrid.component import (
Battery,
EvCharger,
SolarInverter,
SteamBoiler,
)

from .._internal._channels import ChannelRegistry, ReceiverFetcher

Expand Down Expand Up @@ -40,7 +45,7 @@ def __init__( # pylint: disable=too-many-arguments
api_power_request_timeout: timedelta,
power_manager_algorithm: PowerManagerAlgorithm,
default_power: DefaultPower,
component_class: type[Battery | EvCharger | SolarInverter],
component_class: type[Battery | EvCharger | SolarInverter, SteamBoiler],
):
"""Initialize the power control.

Expand Down
13 changes: 13 additions & 0 deletions src/frequenz/sdk/timeseries/steam_boiler_pool/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH

"""Interactions with pools of steam boilers."""

from ._result_types import SteamBoilerPoolReport
from ._steam_boiler_pool import SteamBoilerPool, SteamBoilerPoolError

__all__ = [
"SteamBoilerPool",
"SteamBoilerPoolError",
"SteamBoilerPoolReport",
]
27 changes: 27 additions & 0 deletions src/frequenz/sdk/timeseries/steam_boiler_pool/_result_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH

"""Types for exposing steam boiler pool reports."""

import typing

from frequenz.quantities import Power

from .. import Bounds
from ..component_pool._component_pool_report import ComponentPoolReport


class SteamBoilerPoolReport(ComponentPoolReport, typing.Protocol):
"""A status report for a steam boiler pool."""

@property
def target_power(self) -> Power | None:
"""The currently set power for the steam boilers."""

@property
def bounds(self) -> Bounds[Power] | None:
"""The usable bounds for the steam boilers.

These bounds are adjusted to any restrictions placed by actors with higher
priorities.
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH

"""Interactions with pools of steam boilers."""

from frequenz.quantities import Power
from typing_extensions import override

from ...microgrid import connection_manager
from ..component_pool import ComponentPool
from ..formulas import Formula
from ._result_types import SteamBoilerPoolReport
from ._steam_boiler_pool_reference_store import SteamBoilerPoolReferenceStore


class SteamBoilerPoolError(Exception):
"""An error that occurred in any of the SteamBoilerPool methods."""


class SteamBoilerPool(
ComponentPool[SteamBoilerPoolReferenceStore, SteamBoilerPoolReport]
):
"""An interface for interaction with pools of steam boilers."""

@property
@override
def power(self) -> Formula[Power]:
"""Fetch the total power for the steam boilers in the pool.

This formula produces values that are in the Passive Sign Convention (PSC).

If a formula to calculate steam boiler power is not already running, it
will be started.

A receiver from the formula can be created using the `new_receiver`
method.

Returns:
A Formula that will calculate and stream the total power of all steam boilers.
"""
return self._pool_ref_store.formula_pool.from_power_formula(
"steam_boiler_power",
connection_manager.get().component_graph.steam_boiler_formula(
self._pool_ref_store.component_ids
),
)
Comment thread
simonvoelcker marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH

"""Manages shared state/tasks for a set of steam boilers."""

import uuid
from typing import Type

from frequenz.client.microgrid.component import Component, SteamBoiler
from typing_extensions import override

from ..component_pool._component_pool_reference_store import ComponentPoolReferenceStore


class SteamBoilerPoolReferenceStore(ComponentPoolReferenceStore):
"""A class for maintaining the shared state/tasks for a set of pools of steam boilers.

This includes ownership of
- the formula pool and metric calculators.
- the tasks for calculating system bounds for the steam boilers.

These are independent of the priority of the actors and can be shared between
multiple users of the same set of steam boilers.

They are exposed through the SteamBoilerPool class.
"""

@staticmethod
def get_component_class() -> Type[Component]:
"""Class of the component type."""
return SteamBoiler

@staticmethod
def get_pool_type_name() -> str:
"""Name of the pool type, for display purposes."""
return "SteamBoilerPool"

@staticmethod
def get_component_type_name_plural() -> str:
"""Name of the component type, for display purposes."""
return "steam boilers"

@override
def get_namespace(self) -> str:
"""Namespace to use with the data pipeline."""
return f"steam-boiler-pool-{uuid.uuid4()}"

@override
def create_bounds_tracker(self) -> None:
"""Create the bounds tracker for the pool."""
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@llucax @shsms do we need a bounds tracker for steam boilers? Asking because I don't know what bounds trackers are used for at the moment, and also because the existing ones use dataclasses from this module that openly states it should be removed in the near future. Can either of you shine some light on this?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shsms knows better.

4 changes: 4 additions & 0 deletions tests/timeseries/_steam_boiler_pool/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH

"""Test the steam boiler pool control methods."""
Loading
Loading