diff --git a/CHANGELOG.md b/CHANGELOG.md index f78503cf2..f004372b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,6 +81,34 @@ If upgrading from v2.x, see the [v3.0.0 release notes](https://github.com/flixOp Until here --> +## [3.5.0] - 2025-11-06 + +**Summary**: Improve representations and resampling + +If upgrading from v2.x, see the [v3.0.0 release notes](https://github.com/flixOpt/flixOpt/releases/tag/v3.0.0) and [Migration Guide](https://flixopt.github.io/flixopt/latest/user-guide/migration-guide-v3/). + +### ✨ Added +- Added options to resample and select subsets of flowsystems without converting to and from Dataset each time. Use the new methods `FlowSystem._dataset_resample()`, `FlowSystem._dataset_sel()` and `FlowSystem._dataset_isel()`. All of them expect and return a dataset. + +### 💥 Breaking Changes + +### ♻️ Changed +- Truncate repr of FlowSystem and CalculationResults to only show the first 10 items of each category +- Greatly sped up the resampling of a FlowSystem again + +--- + +## [3.4.1] - 2025-11-04 + +**Summary**: Speed up resampling by 20-40 times. + +If upgrading from v2.x, see the [v3.0.0 release notes](https://github.com/flixOpt/flixOpt/releases/tag/v3.0.0) and [Migration Guide](https://flixopt.github.io/flixopt/latest/user-guide/migration-guide-v3/). + +### ♻️ Changed +- Greatly sped up the resampling of a FlowSystem (x20 - x40) by converting to dataarray internally + +--- + ## [3.4.0] - 2025-11-01 **Summary**: Enhanced solver configuration with new CONFIG.Solving section for centralized solver parameter management. 
diff --git a/flixopt/__init__.py b/flixopt/__init__.py index 3633d86a1..b40855905 100644 --- a/flixopt/__init__.py +++ b/flixopt/__init__.py @@ -41,6 +41,20 @@ solvers, ) +# Type system for dimension-aware type hints +from .types import ( + Bool_PS, + Bool_S, + Bool_TPS, + Effect_PS, + Effect_S, + Effect_TPS, + Numeric_PS, + Numeric_S, + Numeric_TPS, + Scalar, +) + # === Runtime warning suppression for third-party libraries === # These warnings are from dependencies and cannot be fixed by end users. # They are suppressed at runtime to provide a cleaner user experience. diff --git a/flixopt/calculation.py b/flixopt/calculation.py index 5de2c8870..1125da401 100644 --- a/flixopt/calculation.py +++ b/flixopt/calculation.py @@ -26,7 +26,7 @@ from .aggregation import Aggregation, AggregationModel, AggregationParameters from .components import Storage from .config import CONFIG -from .core import DataConverter, Scalar, TimeSeriesData, drop_constant_arrays +from .core import DataConverter, TimeSeriesData, drop_constant_arrays from .features import InvestmentModel from .flow_system import FlowSystem from .results import CalculationResults, SegmentedCalculationResults @@ -103,7 +103,7 @@ def __init__( self._modeled = False @property - def main_results(self) -> dict[str, Scalar | dict]: + def main_results(self) -> dict[str, int | float | dict]: from flixopt.features import InvestmentModel main_results = { diff --git a/flixopt/components.py b/flixopt/components.py index e4209c8ac..6a5abfc4e 100644 --- a/flixopt/components.py +++ b/flixopt/components.py @@ -12,7 +12,7 @@ import xarray as xr from . 
import io as fx_io -from .core import PeriodicDataUser, PlausibilityError, TemporalData, TemporalDataUser +from .core import PlausibilityError from .elements import Component, ComponentModel, Flow from .features import InvestmentModel, PiecewiseModel from .interface import InvestParameters, OnOffParameters, PiecewiseConversion @@ -23,6 +23,7 @@ import linopy from .flow_system import FlowSystem + from .types import Numeric_PS, Numeric_TPS logger = logging.getLogger('flixopt') @@ -169,7 +170,7 @@ def __init__( inputs: list[Flow], outputs: list[Flow], on_off_parameters: OnOffParameters | None = None, - conversion_factors: list[dict[str, TemporalDataUser]] | None = None, + conversion_factors: list[dict[str, Numeric_TPS]] | None = None, piecewise_conversion: PiecewiseConversion | None = None, meta_data: dict | None = None, ): @@ -386,17 +387,17 @@ def __init__( label: str, charging: Flow, discharging: Flow, - capacity_in_flow_hours: PeriodicDataUser | InvestParameters, - relative_minimum_charge_state: TemporalDataUser = 0, - relative_maximum_charge_state: TemporalDataUser = 1, - initial_charge_state: PeriodicDataUser | Literal['lastValueOfSim'] = 0, - minimal_final_charge_state: PeriodicDataUser | None = None, - maximal_final_charge_state: PeriodicDataUser | None = None, - relative_minimum_final_charge_state: PeriodicDataUser | None = None, - relative_maximum_final_charge_state: PeriodicDataUser | None = None, - eta_charge: TemporalDataUser = 1, - eta_discharge: TemporalDataUser = 1, - relative_loss_per_hour: TemporalDataUser = 0, + capacity_in_flow_hours: Numeric_PS | InvestParameters, + relative_minimum_charge_state: Numeric_TPS = 0, + relative_maximum_charge_state: Numeric_TPS = 1, + initial_charge_state: Numeric_PS | Literal['lastValueOfSim'] = 0, + minimal_final_charge_state: Numeric_PS | None = None, + maximal_final_charge_state: Numeric_PS | None = None, + relative_minimum_final_charge_state: Numeric_PS | None = None, + relative_maximum_final_charge_state: 
Numeric_PS | None = None, + eta_charge: Numeric_TPS = 1, + eta_discharge: Numeric_TPS = 1, + relative_loss_per_hour: Numeric_TPS = 0, prevent_simultaneous_charge_and_discharge: bool = True, balanced: bool = False, meta_data: dict | None = None, @@ -413,8 +414,8 @@ def __init__( self.charging = charging self.discharging = discharging self.capacity_in_flow_hours = capacity_in_flow_hours - self.relative_minimum_charge_state: TemporalDataUser = relative_minimum_charge_state - self.relative_maximum_charge_state: TemporalDataUser = relative_maximum_charge_state + self.relative_minimum_charge_state: Numeric_TPS = relative_minimum_charge_state + self.relative_maximum_charge_state: Numeric_TPS = relative_maximum_charge_state self.relative_minimum_final_charge_state = relative_minimum_final_charge_state self.relative_maximum_final_charge_state = relative_maximum_final_charge_state @@ -423,9 +424,9 @@ def __init__( self.minimal_final_charge_state = minimal_final_charge_state self.maximal_final_charge_state = maximal_final_charge_state - self.eta_charge: TemporalDataUser = eta_charge - self.eta_discharge: TemporalDataUser = eta_discharge - self.relative_loss_per_hour: TemporalDataUser = relative_loss_per_hour + self.eta_charge: Numeric_TPS = eta_charge + self.eta_discharge: Numeric_TPS = eta_discharge + self.relative_loss_per_hour: Numeric_TPS = relative_loss_per_hour self.prevent_simultaneous_charge_and_discharge = prevent_simultaneous_charge_and_discharge self.balanced = balanced @@ -663,8 +664,8 @@ def __init__( out1: Flow, in2: Flow | None = None, out2: Flow | None = None, - relative_losses: TemporalDataUser | None = None, - absolute_losses: TemporalDataUser | None = None, + relative_losses: Numeric_TPS | None = None, + absolute_losses: Numeric_TPS | None = None, on_off_parameters: OnOffParameters = None, prevent_simultaneous_flows_in_both_directions: bool = True, balanced: bool = False, @@ -916,7 +917,7 @@ def _initial_and_final_charge_state(self): ) @property - def 
_absolute_charge_state_bounds(self) -> tuple[TemporalData, TemporalData]: + def _absolute_charge_state_bounds(self) -> tuple[xr.DataArray, xr.DataArray]: relative_lower_bound, relative_upper_bound = self._relative_charge_state_bounds if not isinstance(self.element.capacity_in_flow_hours, InvestParameters): return ( diff --git a/flixopt/core.py b/flixopt/core.py index 917ee2984..0d70e255b 100644 --- a/flixopt/core.py +++ b/flixopt/core.py @@ -12,16 +12,9 @@ import pandas as pd import xarray as xr -logger = logging.getLogger('flixopt') - -Scalar = int | float -"""A single number, either integer or float.""" - -PeriodicDataUser = int | float | np.integer | np.floating | np.ndarray | pd.Series | pd.DataFrame | xr.DataArray -"""User data which has no time dimension. Internally converted to a Scalar or an xr.DataArray without a time dimension.""" +from .types import NumericOrBool -PeriodicData = xr.DataArray -"""Internally used datatypes for periodic data.""" +logger = logging.getLogger('flixopt') FlowSystemDimensions = Literal['time', 'period', 'scenario'] """Possible dimensions of a FlowSystem.""" @@ -150,15 +143,6 @@ def agg_weight(self): return self.aggregation_weight -TemporalDataUser = ( - int | float | np.integer | np.floating | np.ndarray | pd.Series | pd.DataFrame | xr.DataArray | TimeSeriesData -) -"""User data which might have a time dimension. Internally converted to an xr.DataArray with time dimension.""" - -TemporalData = xr.DataArray | TimeSeriesData -"""Internally used datatypes for temporal data (data with a time dimension).""" - - class DataConverter: """ Converts various data types into xarray.DataArray with specified target coordinates. 
@@ -405,16 +389,7 @@ def _broadcast_dataarray_to_target_specification( @classmethod def to_dataarray( cls, - data: int - | float - | bool - | np.integer - | np.floating - | np.bool_ - | np.ndarray - | pd.Series - | pd.DataFrame - | xr.DataArray, + data: NumericOrBool, coords: dict[str, pd.Index] | None = None, ) -> xr.DataArray: """ @@ -637,9 +612,3 @@ def drop_constant_arrays(ds: xr.Dataset, dim: str = 'time', drop_arrays_without_ ) return ds.drop_vars(drop_vars) - - -# Backward compatibility aliases -# TODO: Needed? -NonTemporalDataUser = PeriodicDataUser -NonTemporalData = PeriodicData diff --git a/flixopt/effects.py b/flixopt/effects.py index 8d8efbf4c..d428cccd2 100644 --- a/flixopt/effects.py +++ b/flixopt/effects.py @@ -16,8 +16,6 @@ import numpy as np import xarray as xr -from . import io as fx_io -from .core import PeriodicDataUser, Scalar, TemporalData, TemporalDataUser from .features import ShareAllocationModel from .structure import Element, ElementContainer, ElementModel, FlowSystemModel, Submodel, register_class_for_io @@ -25,6 +23,7 @@ from collections.abc import Iterator from .flow_system import FlowSystem + from .types import Effect_PS, Effect_TPS, Numeric_PS, Numeric_TPS, Scalar logger = logging.getLogger('flixopt') @@ -52,17 +51,27 @@ class Effect(Element): is_objective: If True, this effect serves as the optimization objective function. Only one effect can be marked as objective per optimization. share_from_temporal: Temporal cross-effect contributions. - Maps temporal contributions from other effects to this effect + Maps temporal contributions from other effects to this effect. + Type: `Effect_TPS` (single value or dict with dimensions [Time, Period, Scenario]) share_from_periodic: Periodic cross-effect contributions. Maps periodic contributions from other effects to this effect. + Type: `Effect_PS` (single value or dict with dimensions [Period, Scenario]) minimum_temporal: Minimum allowed total contribution across all timesteps. 
+ Type: `Numeric_PS` (sum over time, can vary by period/scenario) maximum_temporal: Maximum allowed total contribution across all timesteps. + Type: `Numeric_PS` (sum over time, can vary by period/scenario) minimum_per_hour: Minimum allowed contribution per hour. + Type: `Numeric_TPS` (per-timestep constraint, can vary by period) maximum_per_hour: Maximum allowed contribution per hour. + Type: `Numeric_TPS` (per-timestep constraint, can vary by period) minimum_periodic: Minimum allowed total periodic contribution. + Type: `Numeric_PS` (periodic constraint) maximum_periodic: Maximum allowed total periodic contribution. + Type: `Numeric_PS` (periodic constraint) minimum_total: Minimum allowed total effect (temporal + periodic combined). + Type: `Numeric_PS` (total constraint per period) maximum_total: Maximum allowed total effect (temporal + periodic combined). + Type: `Numeric_PS` (total constraint per period) meta_data: Used to store additional information. Not used internally but saved in results. Only use Python native types. 
@@ -170,16 +179,16 @@ def __init__( meta_data: dict | None = None, is_standard: bool = False, is_objective: bool = False, - share_from_temporal: TemporalEffectsUser | None = None, - share_from_periodic: PeriodicEffectsUser | None = None, - minimum_temporal: PeriodicEffectsUser | None = None, - maximum_temporal: PeriodicEffectsUser | None = None, - minimum_periodic: PeriodicEffectsUser | None = None, - maximum_periodic: PeriodicEffectsUser | None = None, - minimum_per_hour: TemporalDataUser | None = None, - maximum_per_hour: TemporalDataUser | None = None, - minimum_total: Scalar | None = None, - maximum_total: Scalar | None = None, + share_from_temporal: Effect_TPS | None = None, + share_from_periodic: Effect_PS | None = None, + minimum_temporal: Numeric_PS | None = None, + maximum_temporal: Numeric_PS | None = None, + minimum_periodic: Numeric_PS | None = None, + maximum_periodic: Numeric_PS | None = None, + minimum_per_hour: Numeric_TPS | None = None, + maximum_per_hour: Numeric_TPS | None = None, + minimum_total: Numeric_PS | None = None, + maximum_total: Numeric_PS | None = None, **kwargs, ): super().__init__(label, meta_data=meta_data) @@ -187,8 +196,8 @@ def __init__( self.description = description self.is_standard = is_standard self.is_objective = is_objective - self.share_from_temporal: TemporalEffectsUser = share_from_temporal if share_from_temporal is not None else {} - self.share_from_periodic: PeriodicEffectsUser = share_from_periodic if share_from_periodic is not None else {} + self.share_from_temporal: Effect_TPS = share_from_temporal if share_from_temporal is not None else {} + self.share_from_periodic: Effect_PS = share_from_periodic if share_from_periodic is not None else {} # Handle backwards compatibility for deprecated parameters using centralized helper minimum_temporal = self._handle_deprecated_kwarg( @@ -436,18 +445,6 @@ def _do_modeling(self): ) -TemporalEffectsUser = TemporalDataUser | dict[str, TemporalDataUser] # User-specified Shares to 
Effects -""" This datatype is used to define a temporal share to an effect by a certain attribute. """ - -PeriodicEffectsUser = PeriodicDataUser | dict[str, PeriodicDataUser] # User-specified Shares to Effects -""" This datatype is used to define a scalar share to an effect by a certain attribute. """ - -TemporalEffects = dict[str, TemporalData] # User-specified Shares to Effects -""" This datatype is used internally to handle temporal shares to an effect. """ - -PeriodicEffects = dict[str, Scalar] -""" This datatype is used internally to handle scalar shares to an effect. """ - EffectExpr = dict[str, linopy.LinearExpression] # Used to create Shares @@ -458,8 +455,15 @@ class EffectCollection(ElementContainer[Effect]): submodel: EffectCollectionModel | None - def __init__(self, *effects: Effect): - super().__init__(element_type_name='effects') + def __init__(self, *effects: Effect, truncate_repr: int | None = None): + """ + Initialize the EffectCollection. + + Args: + *effects: Effects to register in the collection. + truncate_repr: Maximum number of items to show in repr. If None, show all items. Default: None + """ + super().__init__(element_type_name='effects', truncate_repr=truncate_repr) self._standard_effect: Effect | None = None self._objective_effect: Effect | None = None @@ -482,9 +486,7 @@ def add_effects(self, *effects: Effect) -> None: self.add(effect) # Use the inherited add() method from ElementContainer logger.info(f'Registered new Effect: {effect.label}') - def create_effect_values_dict( - self, effect_values_user: PeriodicEffectsUser | TemporalEffectsUser - ) -> dict[str, Scalar | TemporalDataUser] | None: + def create_effect_values_dict(self, effect_values_user: Numeric_TPS | Effect_TPS | None) -> Effect_TPS | None: """Converts effect values into a dictionary. If a scalar is provided, it is associated with a default effect type. 
Examples: @@ -844,8 +846,3 @@ def tuples_to_adjacency_list(edges: list[tuple[str, str]]) -> dict[str, list[str graph[target] = [] return graph - - -# Backward compatibility aliases -NonTemporalEffectsUser = PeriodicEffectsUser -NonTemporalEffects = PeriodicEffects diff --git a/flixopt/elements.py b/flixopt/elements.py index 337f34fce..224cc0f9c 100644 --- a/flixopt/elements.py +++ b/flixopt/elements.py @@ -13,7 +13,7 @@ from . import io as fx_io from .config import CONFIG -from .core import PlausibilityError, Scalar, TemporalData, TemporalDataUser +from .core import PlausibilityError from .features import InvestmentModel, OnOffModel from .interface import InvestParameters, OnOffParameters from .modeling import BoundingPatterns, ModelingPrimitives, ModelingUtilitiesAbstract @@ -22,8 +22,19 @@ if TYPE_CHECKING: import linopy - from .effects import TemporalEffectsUser from .flow_system import FlowSystem + from .types import ( + Bool_PS, + Bool_S, + Bool_TPS, + Effect_PS, + Effect_S, + Effect_TPS, + Numeric_PS, + Numeric_S, + Numeric_TPS, + Scalar, + ) logger = logging.getLogger('flixopt') @@ -228,7 +239,7 @@ class Bus(Element): def __init__( self, label: str, - excess_penalty_per_flow_hour: TemporalDataUser | None = 1e5, + excess_penalty_per_flow_hour: Numeric_TPS | None = 1e5, meta_data: dict | None = None, ): super().__init__(label, meta_data=meta_data) @@ -419,16 +430,16 @@ def __init__( self, label: str, bus: str, - size: Scalar | InvestParameters = None, - fixed_relative_profile: TemporalDataUser | None = None, - relative_minimum: TemporalDataUser = 0, - relative_maximum: TemporalDataUser = 1, - effects_per_flow_hour: TemporalEffectsUser | None = None, + size: Numeric_PS | InvestParameters = None, + fixed_relative_profile: Numeric_TPS | None = None, + relative_minimum: Numeric_TPS = 0, + relative_maximum: Numeric_TPS = 1, + effects_per_flow_hour: Effect_TPS | Numeric_TPS | None = None, on_off_parameters: OnOffParameters | None = None, - flow_hours_total_max: 
Scalar | None = None, - flow_hours_total_min: Scalar | None = None, - load_factor_min: Scalar | None = None, - load_factor_max: Scalar | None = None, + flow_hours_total_max: Numeric_PS | None = None, + flow_hours_total_min: Numeric_PS | None = None, + load_factor_min: Numeric_PS | None = None, + load_factor_max: Numeric_PS | None = None, previous_flow_rate: Scalar | list[Scalar] | None = None, meta_data: dict | None = None, ): @@ -716,13 +727,13 @@ def _create_bounds_for_load_factor(self): ) @property - def relative_flow_rate_bounds(self) -> tuple[TemporalData, TemporalData]: + def relative_flow_rate_bounds(self) -> tuple[xr.DataArray, xr.DataArray]: if self.element.fixed_relative_profile is not None: return self.element.fixed_relative_profile, self.element.fixed_relative_profile return self.element.relative_minimum, self.element.relative_maximum @property - def absolute_flow_rate_bounds(self) -> tuple[TemporalData, TemporalData]: + def absolute_flow_rate_bounds(self) -> tuple[xr.DataArray, xr.DataArray]: """ Returns the absolute bounds the flow_rate can reach. Further constraining might be needed @@ -765,7 +776,7 @@ def investment(self) -> InvestmentModel | None: return self.submodels['investment'] @property - def previous_states(self) -> TemporalData | None: + def previous_states(self) -> xr.DataArray | None: """Previous states of the flow rate""" # TODO: This would be nicer to handle in the Flow itself, and allow DataArrays as well. 
previous_flow_rate = self.element.previous_flow_rate diff --git a/flixopt/features.py b/flixopt/features.py index 0d1fc7784..b00ccc547 100644 --- a/flixopt/features.py +++ b/flixopt/features.py @@ -15,8 +15,9 @@ from .structure import FlowSystemModel, Submodel if TYPE_CHECKING: - from .core import FlowSystemDimensions, Scalar, TemporalData + from .core import FlowSystemDimensions from .interface import InvestParameters, OnOffParameters, Piecewise + from .types import Numeric_PS, Numeric_TPS logger = logging.getLogger('flixopt') @@ -153,7 +154,7 @@ def __init__( label_of_element: str, parameters: OnOffParameters, on_variable: linopy.Variable, - previous_states: TemporalData | None, + previous_states: Numeric_TPS | None, label_of_model: str | None = None, ): """ @@ -517,10 +518,10 @@ def __init__( dims: list[FlowSystemDimensions], label_of_element: str | None = None, label_of_model: str | None = None, - total_max: Scalar | None = None, - total_min: Scalar | None = None, - max_per_hour: TemporalData | None = None, - min_per_hour: TemporalData | None = None, + total_max: Numeric_PS | None = None, + total_min: Numeric_PS | None = None, + max_per_hour: Numeric_TPS | None = None, + min_per_hour: Numeric_TPS | None = None, ): if 'time' not in dims and (max_per_hour is not None or min_per_hour is not None): raise ValueError('Both max_per_hour and min_per_hour cannot be used when has_time_dim is False') diff --git a/flixopt/flow_system.py b/flixopt/flow_system.py index 9bc7f7f99..081359076 100644 --- a/flixopt/flow_system.py +++ b/flixopt/flow_system.py @@ -6,6 +6,7 @@ import logging import warnings +from collections import defaultdict from itertools import chain from typing import TYPE_CHECKING, Any, Literal, Optional @@ -19,20 +20,9 @@ ConversionError, DataConverter, FlowSystemDimensions, - PeriodicData, - PeriodicDataUser, - TemporalData, - TemporalDataUser, TimeSeriesData, ) -from .effects import ( - Effect, - EffectCollection, - PeriodicEffects, - PeriodicEffectsUser, - 
TemporalEffects, - TemporalEffectsUser, -) +from .effects import Effect, EffectCollection from .elements import Bus, Component, Flow from .structure import CompositeContainerMixin, Element, ElementContainer, FlowSystemModel, Interface @@ -42,6 +32,8 @@ import pyvis + from .types import Bool_TPS, Effect_TPS, Numeric_PS, Numeric_TPS, NumericOrBool + logger = logging.getLogger('flixopt') @@ -57,11 +49,10 @@ class FlowSystem(Interface, CompositeContainerMixin[Element]): timesteps: The timesteps of the model. periods: The periods of the model. scenarios: The scenarios of the model. - hours_of_last_timestep: The duration of the last time step. Uses the last time interval if not specified - hours_of_previous_timesteps: The duration of previous timesteps. - If None, the first time increment of time_series is used. - This is needed to calculate previous durations (for example consecutive_on_hours). - If you use an array, take care that its long enough to cover all previous values! + hours_of_last_timestep: Duration of the last timestep. If None, computed from the last time interval. + hours_of_previous_timesteps: Duration of previous timesteps. If None, computed from the first time interval. + Can be a scalar (all previous timesteps have same duration) or array (different durations). + Used to calculate previous values (e.g., consecutive_on_hours). weights: The weights of each period and scenario. If None, all scenarios have the same weight (normalized to 1). Its recommended to normalize the weights to sum up to 1. scenario_independent_sizes: Controls whether investment sizes are equalized across scenarios. @@ -120,6 +111,22 @@ class FlowSystem(Interface, CompositeContainerMixin[Element]): ... 
print(f'{bus.label}') >>> >>> # Flows are automatically collected from all components + + Power user pattern - Efficient chaining without conversion overhead: + + >>> # Instead of chaining (causes multiple conversions): + >>> result = flow_system.sel(time='2020-01').resample('2h') # Slow + >>> + >>> # Use dataset methods directly (single conversion): + >>> ds = flow_system.to_dataset() + >>> ds = FlowSystem._dataset_sel(ds, time='2020-01') + >>> ds = flow_system._dataset_resample(ds, freq='2h', method='mean') + >>> result = FlowSystem.from_dataset(ds) # Fast! + >>> + >>> # Available dataset methods: + >>> # - FlowSystem._dataset_sel(dataset, time=..., period=..., scenario=...) + >>> # - FlowSystem._dataset_isel(dataset, time=..., period=..., scenario=...) + >>> # - flow_system._dataset_resample(dataset, freq=..., method=..., **kwargs) >>> for flow in flow_system.flows.values(): ... print(f'{flow.label_full}: {flow.size}') >>> @@ -152,31 +159,33 @@ def __init__( scenarios: pd.Index | None = None, hours_of_last_timestep: int | float | None = None, hours_of_previous_timesteps: int | float | np.ndarray | None = None, - weights: PeriodicDataUser | None = None, + weights: Numeric_PS | None = None, scenario_independent_sizes: bool | list[str] = True, scenario_independent_flow_rates: bool | list[str] = False, ): self.timesteps = self._validate_timesteps(timesteps) - self.timesteps_extra = self._create_timesteps_with_extra(self.timesteps, hours_of_last_timestep) - self.hours_of_previous_timesteps = self._calculate_hours_of_previous_timesteps( - self.timesteps, hours_of_previous_timesteps - ) + + # Compute all time-related metadata using shared helper + ( + self.timesteps_extra, + self.hours_of_last_timestep, + self.hours_of_previous_timesteps, + hours_per_timestep, + ) = self._compute_time_metadata(self.timesteps, hours_of_last_timestep, hours_of_previous_timesteps) self.periods = None if periods is None else self._validate_periods(periods) self.scenarios = None if 
scenarios is None else self._validate_scenarios(scenarios) self.weights = weights - hours_per_timestep = self.calculate_hours_per_timestep(self.timesteps_extra) - - self.hours_of_last_timestep = hours_per_timestep[-1].item() - self.hours_per_timestep = self.fit_to_model_coords('hours_per_timestep', hours_per_timestep) # Element collections - self.components: ElementContainer[Component] = ElementContainer(element_type_name='components') - self.buses: ElementContainer[Bus] = ElementContainer(element_type_name='buses') - self.effects: EffectCollection = EffectCollection() + self.components: ElementContainer[Component] = ElementContainer( + element_type_name='components', truncate_repr=10 + ) + self.buses: ElementContainer[Bus] = ElementContainer(element_type_name='buses', truncate_repr=10) + self.effects: EffectCollection = EffectCollection(truncate_repr=10) self.model: FlowSystemModel | None = None self._connected_and_transformed = False @@ -271,6 +280,87 @@ def _calculate_hours_of_previous_timesteps( first_interval = timesteps[1] - timesteps[0] return first_interval.total_seconds() / 3600 # Convert to hours + @classmethod + def _compute_time_metadata( + cls, + timesteps: pd.DatetimeIndex, + hours_of_last_timestep: int | float | None = None, + hours_of_previous_timesteps: int | float | np.ndarray | None = None, + ) -> tuple[pd.DatetimeIndex, float, float | np.ndarray, xr.DataArray]: + """ + Compute all time-related metadata from timesteps. + + This is the single source of truth for time metadata computation, used by both + __init__ and dataset operations (sel/isel/resample) to ensure consistency. + + Args: + timesteps: The time index to compute metadata from + hours_of_last_timestep: Duration of the last timestep. If None, computed from the time index. + hours_of_previous_timesteps: Duration of previous timesteps. If None, computed from the time index. + Can be a scalar or array. 
+ + Returns: + Tuple of (timesteps_extra, hours_of_last_timestep, hours_of_previous_timesteps, hours_per_timestep) + """ + # Create timesteps with extra step at the end + timesteps_extra = cls._create_timesteps_with_extra(timesteps, hours_of_last_timestep) + + # Calculate hours per timestep + hours_per_timestep = cls.calculate_hours_per_timestep(timesteps_extra) + + # Extract hours_of_last_timestep if not provided + if hours_of_last_timestep is None: + hours_of_last_timestep = hours_per_timestep.isel(time=-1).item() + + # Compute hours_of_previous_timesteps (handles both None and provided cases) + hours_of_previous_timesteps = cls._calculate_hours_of_previous_timesteps(timesteps, hours_of_previous_timesteps) + + return timesteps_extra, hours_of_last_timestep, hours_of_previous_timesteps, hours_per_timestep + + @classmethod + def _update_time_metadata( + cls, + dataset: xr.Dataset, + hours_of_last_timestep: int | float | None = None, + hours_of_previous_timesteps: int | float | np.ndarray | None = None, + ) -> xr.Dataset: + """ + Update time-related attributes and data variables in dataset based on its time index. + + Recomputes hours_of_last_timestep, hours_of_previous_timesteps, and hours_per_timestep + from the dataset's time index when these parameters are None. This ensures time metadata + stays synchronized with the actual timesteps after operations like resampling or selection. + + Args: + dataset: Dataset to update (will be modified in place) + hours_of_last_timestep: Duration of the last timestep. If None, computed from the time index. + hours_of_previous_timesteps: Duration of previous timesteps. If None, computed from the time index. + Can be a scalar or array. 
+ + Returns: + The same dataset with updated time-related attributes and data variables + """ + new_time_index = dataset.indexes.get('time') + if new_time_index is not None and len(new_time_index) >= 2: + # Use shared helper to compute all time metadata + _, hours_of_last_timestep, hours_of_previous_timesteps, hours_per_timestep = cls._compute_time_metadata( + new_time_index, hours_of_last_timestep, hours_of_previous_timesteps + ) + + # Update hours_per_timestep DataArray if it exists in the dataset + # This prevents stale data after resampling operations + if 'hours_per_timestep' in dataset.data_vars: + dataset['hours_per_timestep'] = hours_per_timestep + + # Update time-related attributes only when new values are provided/computed + # This preserves existing metadata instead of overwriting with None + if hours_of_last_timestep is not None: + dataset.attrs['hours_of_last_timestep'] = hours_of_last_timestep + if hours_of_previous_timesteps is not None: + dataset.attrs['hours_of_previous_timesteps'] = hours_of_previous_timesteps + + return dataset + def _create_reference_structure(self) -> tuple[dict, dict[str, xr.DataArray]]: """ Override Interface method to handle FlowSystem-specific serialization. @@ -433,15 +523,15 @@ def to_json(self, path: str | pathlib.Path): def fit_to_model_coords( self, name: str, - data: TemporalDataUser | PeriodicDataUser | None, + data: NumericOrBool | None, dims: Collection[FlowSystemDimensions] | None = None, - ) -> TemporalData | PeriodicData | None: + ) -> xr.DataArray | None: """ Fit data to model coordinate system (currently time, but extensible). Args: name: Name of the data - data: Data to fit to model coordinates + data: Data to fit to model coordinates (accepts any dimensionality including scalars) dims: Collection of dimension names to use for fitting. If None, all dimensions are used. 
Returns: @@ -473,11 +563,11 @@ def fit_to_model_coords( def fit_effects_to_model_coords( self, label_prefix: str | None, - effect_values: TemporalEffectsUser | PeriodicEffectsUser | None, + effect_values: Effect_TPS | Numeric_TPS | None, label_suffix: str | None = None, dims: Collection[FlowSystemDimensions] | None = None, delimiter: str = '|', - ) -> TemporalEffects | PeriodicEffects | None: + ) -> Effect_TPS | None: """ Transform EffectValues from the user to Internal Datatypes aligned with model coordinates. """ @@ -798,7 +888,7 @@ def flows(self) -> ElementContainer[Flow]: flows = [f for c in self.components.values() for f in c.inputs + c.outputs] # Deduplicate by id and sort for reproducibility flows = sorted({id(f): f for f in flows}.values(), key=lambda f: f.label_full.lower()) - self._flows_cache = ElementContainer(flows, element_type_name='flows') + self._flows_cache = ElementContainer(flows, element_type_name='flows', truncate_repr=10) return self._flows_cache @property @@ -908,6 +998,63 @@ def scenario_independent_flow_rates(self, value: bool | list[str]) -> None: self._validate_scenario_parameter(value, 'scenario_independent_flow_rates', 'Flow.label_full') self._scenario_independent_flow_rates = value + @classmethod + def _dataset_sel( + cls, + dataset: xr.Dataset, + time: str | slice | list[str] | pd.Timestamp | pd.DatetimeIndex | None = None, + period: int | slice | list[int] | pd.Index | None = None, + scenario: str | slice | list[str] | pd.Index | None = None, + hours_of_last_timestep: int | float | None = None, + hours_of_previous_timesteps: int | float | np.ndarray | None = None, + ) -> xr.Dataset: + """ + Select subset of dataset by label (for power users to avoid conversion overhead). 
+ + This method operates directly on xarray Datasets, allowing power users to chain + operations efficiently without repeated FlowSystem conversions: + + Example: + # Power user pattern (single conversion): + >>> ds = flow_system.to_dataset() + >>> ds = FlowSystem._dataset_sel(ds, time='2020-01') + >>> ds = FlowSystem._dataset_resample(ds, freq='2h', method='mean') + >>> result = FlowSystem.from_dataset(ds) + + # vs. simple pattern (multiple conversions): + >>> result = flow_system.sel(time='2020-01').resample('2h') + + Args: + dataset: xarray Dataset from FlowSystem.to_dataset() + time: Time selection (e.g., '2020-01', slice('2020-01-01', '2020-06-30')) + period: Period selection (e.g., 2020, slice(2020, 2022)) + scenario: Scenario selection (e.g., 'Base Case', ['Base Case', 'High Demand']) + hours_of_last_timestep: Duration of the last timestep. If None, computed from the selected time index. + hours_of_previous_timesteps: Duration of previous timesteps. If None, computed from the selected time index. + Can be a scalar or array. + + Returns: + xr.Dataset: Selected dataset + """ + indexers = {} + if time is not None: + indexers['time'] = time + if period is not None: + indexers['period'] = period + if scenario is not None: + indexers['scenario'] = scenario + + if not indexers: + return dataset + + result = dataset.sel(**indexers) + + # Update time-related attributes if time was selected + if 'time' in indexers: + result = cls._update_time_metadata(result, hours_of_last_timestep, hours_of_previous_timesteps) + + return result + def sel( self, time: str | slice | list[str] | pd.Timestamp | pd.DatetimeIndex | None = None, @@ -915,22 +1062,56 @@ def sel( scenario: str | slice | list[str] | pd.Index | None = None, ) -> FlowSystem: """ - Select a subset of the flowsystem by the time coordinate. + Select a subset of the flowsystem by label. + + For power users: Use FlowSystem._dataset_sel() to chain operations on datasets + without conversion overhead. 
See _dataset_sel() documentation. Args: - time: Time selection (e.g., slice('2023-01-01', '2023-12-31'), '2023-06-15', or list of times) + time: Time selection (e.g., slice('2023-01-01', '2023-12-31'), '2023-06-15') period: Period selection (e.g., slice(2023, 2024), or list of periods) - scenario: Scenario selection (e.g., slice('scenario1', 'scenario2'), or list of scenarios) + scenario: Scenario selection (e.g., 'scenario1', or list of scenarios) Returns: FlowSystem: New FlowSystem with selected data """ + if time is None and period is None and scenario is None: + return self.copy() + if not self.connected_and_transformed: self.connect_and_transform() ds = self.to_dataset() + ds = self._dataset_sel(ds, time=time, period=period, scenario=scenario) + return self.__class__.from_dataset(ds) - # Build indexers dict from non-None parameters + @classmethod + def _dataset_isel( + cls, + dataset: xr.Dataset, + time: int | slice | list[int] | None = None, + period: int | slice | list[int] | None = None, + scenario: int | slice | list[int] | None = None, + hours_of_last_timestep: int | float | None = None, + hours_of_previous_timesteps: int | float | np.ndarray | None = None, + ) -> xr.Dataset: + """ + Select subset of dataset by integer index (for power users to avoid conversion overhead). + + See _dataset_sel() for usage pattern. + + Args: + dataset: xarray Dataset from FlowSystem.to_dataset() + time: Time selection by index (e.g., slice(0, 100), [0, 5, 10]) + period: Period selection by index + scenario: Scenario selection by index + hours_of_last_timestep: Duration of the last timestep. If None, computed from the selected time index. + hours_of_previous_timesteps: Duration of previous timesteps. If None, computed from the selected time index. + Can be a scalar or array. 
+ + Returns: + xr.Dataset: Selected dataset + """ indexers = {} if time is not None: indexers['time'] = time @@ -940,10 +1121,15 @@ def sel( indexers['scenario'] = scenario if not indexers: - return self.copy() # Return a copy when no selection + return dataset + + result = dataset.isel(**indexers) - selected_dataset = ds.sel(**indexers) - return self.__class__.from_dataset(selected_dataset) + # Update time-related attributes if time was selected + if 'time' in indexers: + result = cls._update_time_metadata(result, hours_of_last_timestep, hours_of_previous_timesteps) + + return result def isel( self, @@ -954,6 +1140,9 @@ def isel( """ Select a subset of the flowsystem by integer indices. + For power users: Use FlowSystem._dataset_isel() to chain operations on datasets + without conversion overhead. See _dataset_sel() documentation. + Args: time: Time selection by integer index (e.g., slice(0, 100), 50, or [0, 5, 10]) period: Period selection by integer index (e.g., slice(0, 100), 50, or [0, 5, 10]) @@ -962,25 +1151,158 @@ def isel( Returns: FlowSystem: New FlowSystem with selected data """ + if time is None and period is None and scenario is None: + return self.copy() + if not self.connected_and_transformed: self.connect_and_transform() ds = self.to_dataset() + ds = self._dataset_isel(ds, time=time, period=period, scenario=scenario) + return self.__class__.from_dataset(ds) - # Build indexers dict from non-None parameters - indexers = {} - if time is not None: - indexers['time'] = time - if period is not None: - indexers['period'] = period - if scenario is not None: - indexers['scenario'] = scenario + @classmethod + def _resample_by_dimension_groups( + cls, + time_dataset: xr.Dataset, + time: str, + method: str, + **kwargs: Any, + ) -> xr.Dataset: + """ + Resample variables grouped by their dimension structure to avoid broadcasting. 
- if not indexers: - return self.copy() # Return a copy when no selection + This method groups variables by their non-time dimensions before resampling, + which provides two key benefits: + + 1. **Performance**: Resampling many variables with the same dimensions together + is significantly faster than resampling each variable individually. + + 2. **Safety**: Prevents xarray from broadcasting variables with different + dimensions into a larger dimensional space filled with NaNs, which would + cause memory bloat and computational inefficiency. + + Example: + Without grouping (problematic): + var1: (time, location, tech) shape (8000, 10, 2) + var2: (time, region) shape (8000, 5) + concat → (variable, time, location, tech, region) ← Unwanted broadcasting! + + With grouping (safe and fast): + Group 1: [var1, var3, ...] with dims (time, location, tech) + Group 2: [var2, var4, ...] with dims (time, region) + Each group resampled separately → No broadcasting, optimal performance! + + Args: + time_dataset: Dataset containing only variables with time dimension + time: Resampling frequency (e.g., '2h', '1D', '1M') + method: Resampling method name (e.g., 'mean', 'sum', 'first') + **kwargs: Additional arguments passed to xarray.resample() - selected_dataset = ds.isel(**indexers) - return self.__class__.from_dataset(selected_dataset) + Returns: + Resampled dataset with original dimension structure preserved + """ + # Group variables by dimensions (excluding time) + dim_groups = defaultdict(list) + for var_name, var in time_dataset.data_vars.items(): + dims_key = tuple(sorted(d for d in var.dims if d != 'time')) + dim_groups[dims_key].append(var_name) + + # Handle empty case: no time-dependent variables + if not dim_groups: + return getattr(time_dataset.resample(time=time, **kwargs), method)() + + # Resample each group separately using DataArray concat (faster) + resampled_groups = [] + for var_names in dim_groups.values(): + # Skip empty groups + if not var_names: + continue + + 
# Concat variables into a single DataArray with 'variable' dimension + # Use combine_attrs='drop_conflicts' to handle attribute conflicts + stacked = xr.concat( + [time_dataset[name] for name in var_names], + dim=pd.Index(var_names, name='variable'), + combine_attrs='drop_conflicts', + ) + + # Resample the DataArray (faster than resampling Dataset) + resampled = getattr(stacked.resample(time=time, **kwargs), method)() + + # Convert back to Dataset using the 'variable' dimension + resampled_dataset = resampled.to_dataset(dim='variable') + resampled_groups.append(resampled_dataset) + + # Merge all resampled groups, handling empty list case + if not resampled_groups: + return time_dataset # Return empty dataset as-is + + if len(resampled_groups) == 1: + return resampled_groups[0] + + # Merge multiple groups with combine_attrs to avoid conflicts + return xr.merge(resampled_groups, combine_attrs='drop_conflicts') + + @classmethod + def _dataset_resample( + cls, + dataset: xr.Dataset, + freq: str, + method: Literal['mean', 'sum', 'max', 'min', 'first', 'last', 'std', 'var', 'median', 'count'] = 'mean', + hours_of_last_timestep: int | float | None = None, + hours_of_previous_timesteps: int | float | np.ndarray | None = None, + **kwargs: Any, + ) -> xr.Dataset: + """ + Resample dataset along time dimension (for power users to avoid conversion overhead). + Preserves only the attrs of the Dataset. + + Uses optimized _resample_by_dimension_groups() to avoid broadcasting issues. + See _dataset_sel() for usage pattern. + + Args: + dataset: xarray Dataset from FlowSystem.to_dataset() + freq: Resampling frequency (e.g., '2h', '1D', '1M') + method: Resampling method (e.g., 'mean', 'sum', 'first') + hours_of_last_timestep: Duration of the last timestep after resampling. If None, computed from the last time interval. + hours_of_previous_timesteps: Duration of previous timesteps after resampling. If None, computed from the first time interval. + Can be a scalar or array. 
+ **kwargs: Additional arguments passed to xarray.resample() + + Returns: + xr.Dataset: Resampled dataset + """ + # Validate method + available_methods = ['mean', 'sum', 'max', 'min', 'first', 'last', 'std', 'var', 'median', 'count'] + if method not in available_methods: + raise ValueError(f'Unsupported resampling method: {method}. Available: {available_methods}') + + # Preserve original dataset attributes (especially the reference structure) + original_attrs = dict(dataset.attrs) + + # Separate time and non-time variables + time_var_names = [v for v in dataset.data_vars if 'time' in dataset[v].dims] + non_time_var_names = [v for v in dataset.data_vars if v not in time_var_names] + + # Only resample variables that have time dimension + time_dataset = dataset[time_var_names] + + # Resample with dimension grouping to avoid broadcasting + resampled_time_dataset = cls._resample_by_dimension_groups(time_dataset, freq, method, **kwargs) + + # Combine resampled time variables with non-time variables + if non_time_var_names: + non_time_dataset = dataset[non_time_var_names] + result = xr.merge([resampled_time_dataset, non_time_dataset]) + else: + result = resampled_time_dataset + + # Restore original attributes (xr.merge can drop them) + result.attrs.update(original_attrs) + + # Update time-related attributes based on new time index + return cls._update_time_metadata(result, hours_of_last_timestep, hours_of_previous_timesteps) def resample( self, @@ -994,11 +1316,15 @@ def resample( Create a resampled FlowSystem by resampling data along the time dimension (like xr.Dataset.resample()). Only resamples data variables that have a time dimension. + For power users: Use FlowSystem._dataset_resample() to chain operations on datasets + without conversion overhead. See _dataset_sel() documentation. + Args: time: Resampling frequency (e.g., '3h', '2D', '1M') method: Resampling method. 
Recommended: 'mean', 'first', 'last', 'max', 'min' - hours_of_last_timestep: New duration of the last time step. Defaults to the last time interval of the new timesteps - hours_of_previous_timesteps: New duration of the previous timestep. Defaults to the first time increment of the new timesteps + hours_of_last_timestep: Duration of the last timestep after resampling. If None, computed from the last time interval. + hours_of_previous_timesteps: Duration of previous timesteps after resampling. If None, computed from the first time interval. + Can be a scalar or array. **kwargs: Additional arguments passed to xarray.resample() Returns: @@ -1007,40 +1333,16 @@ def resample( if not self.connected_and_transformed: self.connect_and_transform() - dataset = self.to_dataset() - - # Separate variables with and without time dimension - time_vars = {} - non_time_vars = {} - - for var_name, var in dataset.data_vars.items(): - if 'time' in var.dims: - time_vars[var_name] = var - else: - non_time_vars[var_name] = var - - # Only resample variables that have time dimension - time_dataset = dataset[list(time_vars.keys())] - resampler = time_dataset.resample(time=time, **kwargs) - - if hasattr(resampler, method): - resampled_time_data = getattr(resampler, method)() - else: - available_methods = ['mean', 'sum', 'max', 'min', 'first', 'last', 'std', 'var', 'median', 'count'] - raise ValueError(f'Unsupported resampling method: {method}. 
Available: {available_methods}') - - # Combine resampled time variables with non-time variables - if non_time_vars: - non_time_dataset = dataset[list(non_time_vars.keys())] - resampled_dataset = xr.merge([resampled_time_data, non_time_dataset]) - else: - resampled_dataset = resampled_time_data - - # Let FlowSystem recalculate or use explicitly set value - resampled_dataset.attrs['hours_of_last_timestep'] = hours_of_last_timestep - resampled_dataset.attrs['hours_of_previous_timesteps'] = hours_of_previous_timesteps - - return self.__class__.from_dataset(resampled_dataset) + ds = self.to_dataset() + ds = self._dataset_resample( + ds, + freq=time, + method=method, + hours_of_last_timestep=hours_of_last_timestep, + hours_of_previous_timesteps=hours_of_previous_timesteps, + **kwargs, + ) + return self.__class__.from_dataset(ds) @property def connected_and_transformed(self) -> bool: diff --git a/flixopt/interface.py b/flixopt/interface.py index 21cbc82b9..bc7adabbc 100644 --- a/flixopt/interface.py +++ b/flixopt/interface.py @@ -19,9 +19,8 @@ if TYPE_CHECKING: # for type checking and preventing circular imports from collections.abc import Iterator - from .core import PeriodicData, PeriodicDataUser, Scalar, TemporalDataUser - from .effects import PeriodicEffectsUser, TemporalEffectsUser from .flow_system import FlowSystem + from .types import Effect_PS, Effect_TPS, Numeric_PS, Numeric_TPS logger = logging.getLogger('flixopt') @@ -73,7 +72,7 @@ class Piece(Interface): """ - def __init__(self, start: TemporalDataUser, end: TemporalDataUser): + def __init__(self, start: Numeric_TPS, end: Numeric_TPS): self.start = start self.end = end self.has_time_dim = False @@ -874,15 +873,15 @@ class InvestParameters(Interface): def __init__( self, - fixed_size: PeriodicDataUser | None = None, - minimum_size: PeriodicDataUser | None = None, - maximum_size: PeriodicDataUser | None = None, + fixed_size: Numeric_PS | None = None, + minimum_size: Numeric_PS | None = None, + maximum_size: 
Numeric_PS | None = None, mandatory: bool = False, - effects_of_investment: PeriodicEffectsUser | None = None, - effects_of_investment_per_size: PeriodicEffectsUser | None = None, - effects_of_retirement: PeriodicEffectsUser | None = None, + effects_of_investment: Effect_PS | Numeric_PS | None = None, + effects_of_investment_per_size: Effect_PS | Numeric_PS | None = None, + effects_of_retirement: Effect_PS | Numeric_PS | None = None, piecewise_effects_of_investment: PiecewiseEffects | None = None, - linked_periods: PeriodicDataUser | tuple[int, int] | None = None, + linked_periods: Numeric_PS | tuple[int, int] | None = None, **kwargs, ): # Handle deprecated parameters using centralized helper @@ -912,15 +911,11 @@ def __init__( # Validate any remaining unexpected kwargs self._validate_kwargs(kwargs) - self.effects_of_investment: PeriodicEffectsUser = ( - effects_of_investment if effects_of_investment is not None else {} - ) - self.effects_of_retirement: PeriodicEffectsUser = ( - effects_of_retirement if effects_of_retirement is not None else {} - ) + self.effects_of_investment = effects_of_investment if effects_of_investment is not None else {} + self.effects_of_retirement = effects_of_retirement if effects_of_retirement is not None else {} self.fixed_size = fixed_size self.mandatory = mandatory - self.effects_of_investment_per_size: PeriodicEffectsUser = ( + self.effects_of_investment_per_size = ( effects_of_investment_per_size if effects_of_investment_per_size is not None else {} ) self.piecewise_effects_of_investment = piecewise_effects_of_investment @@ -1004,7 +999,7 @@ def optional(self, value: bool): self.mandatory = not value @property - def fix_effects(self) -> PeriodicEffectsUser: + def fix_effects(self) -> Effect_PS | Numeric_PS: """Deprecated property. Use effects_of_investment instead.""" warnings.warn( 'The fix_effects property is deprecated. 
Use effects_of_investment instead.', @@ -1014,7 +1009,7 @@ def fix_effects(self) -> PeriodicEffectsUser: return self.effects_of_investment @property - def specific_effects(self) -> PeriodicEffectsUser: + def specific_effects(self) -> Effect_PS | Numeric_PS: """Deprecated property. Use effects_of_investment_per_size instead.""" warnings.warn( 'The specific_effects property is deprecated. Use effects_of_investment_per_size instead.', @@ -1024,7 +1019,7 @@ def specific_effects(self) -> PeriodicEffectsUser: return self.effects_of_investment_per_size @property - def divest_effects(self) -> PeriodicEffectsUser: + def divest_effects(self) -> Effect_PS | Numeric_PS: """Deprecated property. Use effects_of_retirement instead.""" warnings.warn( 'The divest_effects property is deprecated. Use effects_of_retirement instead.', @@ -1044,11 +1039,11 @@ def piecewise_effects(self) -> PiecewiseEffects | None: return self.piecewise_effects_of_investment @property - def minimum_or_fixed_size(self) -> PeriodicData: + def minimum_or_fixed_size(self) -> Numeric_PS: return self.fixed_size if self.fixed_size is not None else self.minimum_size @property - def maximum_or_fixed_size(self) -> PeriodicData: + def maximum_or_fixed_size(self) -> Numeric_PS: return self.fixed_size if self.fixed_size is not None else self.maximum_size def format_for_repr(self) -> str: @@ -1268,30 +1263,26 @@ class OnOffParameters(Interface): def __init__( self, - effects_per_switch_on: TemporalEffectsUser | None = None, - effects_per_running_hour: TemporalEffectsUser | None = None, - on_hours_total_min: int | None = None, - on_hours_total_max: int | None = None, - consecutive_on_hours_min: TemporalDataUser | None = None, - consecutive_on_hours_max: TemporalDataUser | None = None, - consecutive_off_hours_min: TemporalDataUser | None = None, - consecutive_off_hours_max: TemporalDataUser | None = None, - switch_on_total_max: int | None = None, + effects_per_switch_on: Effect_TPS | Numeric_TPS | None = None, + 
effects_per_running_hour: Effect_TPS | Numeric_TPS | None = None, + on_hours_total_min: Numeric_PS | None = None, + on_hours_total_max: Numeric_PS | None = None, + consecutive_on_hours_min: Numeric_TPS | None = None, + consecutive_on_hours_max: Numeric_TPS | None = None, + consecutive_off_hours_min: Numeric_TPS | None = None, + consecutive_off_hours_max: Numeric_TPS | None = None, + switch_on_total_max: Numeric_PS | None = None, force_switch_on: bool = False, ): - self.effects_per_switch_on: TemporalEffectsUser = ( - effects_per_switch_on if effects_per_switch_on is not None else {} - ) - self.effects_per_running_hour: TemporalEffectsUser = ( - effects_per_running_hour if effects_per_running_hour is not None else {} - ) - self.on_hours_total_min: Scalar = on_hours_total_min - self.on_hours_total_max: Scalar = on_hours_total_max - self.consecutive_on_hours_min: TemporalDataUser = consecutive_on_hours_min - self.consecutive_on_hours_max: TemporalDataUser = consecutive_on_hours_max - self.consecutive_off_hours_min: TemporalDataUser = consecutive_off_hours_min - self.consecutive_off_hours_max: TemporalDataUser = consecutive_off_hours_max - self.switch_on_total_max: Scalar = switch_on_total_max + self.effects_per_switch_on = effects_per_switch_on if effects_per_switch_on is not None else {} + self.effects_per_running_hour = effects_per_running_hour if effects_per_running_hour is not None else {} + self.on_hours_total_min: Numeric_PS = on_hours_total_min + self.on_hours_total_max: Numeric_PS = on_hours_total_max + self.consecutive_on_hours_min: Numeric_TPS = consecutive_on_hours_min + self.consecutive_on_hours_max: Numeric_TPS = consecutive_on_hours_max + self.consecutive_off_hours_min: Numeric_TPS = consecutive_off_hours_min + self.consecutive_off_hours_max: Numeric_TPS = consecutive_off_hours_max + self.switch_on_total_max: Numeric_PS = switch_on_total_max self.force_switch_on: bool = force_switch_on def transform_data(self, flow_system: FlowSystem, name_prefix: str = 
'') -> None: diff --git a/flixopt/io.py b/flixopt/io.py index 3c53c4170..e83738d89 100644 --- a/flixopt/io.py +++ b/flixopt/io.py @@ -19,6 +19,8 @@ if TYPE_CHECKING: import linopy + from .types import Numeric_TPS + logger = logging.getLogger('flixopt') @@ -651,7 +653,7 @@ def update(self, new_name: str | None = None, new_folder: pathlib.Path | None = def numeric_to_str_for_repr( - value: int | float | np.integer | np.floating | np.ndarray | pd.Series | pd.DataFrame | xr.DataArray, + value: Numeric_TPS, precision: int = 1, atol: float = 1e-10, ) -> str: diff --git a/flixopt/linear_converters.py b/flixopt/linear_converters.py index 47c545506..76aa25a47 100644 --- a/flixopt/linear_converters.py +++ b/flixopt/linear_converters.py @@ -10,12 +10,13 @@ import numpy as np from .components import LinearConverter -from .core import TemporalDataUser, TimeSeriesData +from .core import TimeSeriesData from .structure import register_class_for_io if TYPE_CHECKING: from .elements import Flow from .interface import OnOffParameters + from .types import Numeric_TPS logger = logging.getLogger('flixopt') @@ -33,11 +34,13 @@ class Boiler(LinearConverter): label: The label of the Element. Used to identify it in the FlowSystem. eta: Thermal efficiency factor (0-1 range). Defines the ratio of thermal output to fuel input energy content. - Q_fu: Fuel input-flow representing fuel consumption. - Q_th: Thermal output-flow representing heat generation. + fuel_flow: Fuel input-flow representing fuel consumption. + thermal_flow: Thermal output-flow representing heat generation. on_off_parameters: Parameters defining binary operation constraints and costs. meta_data: Used to store additional information. Not used internally but saved in results. Only use Python native types. + Q_fu: *Deprecated*. Use `fuel_flow` instead. + Q_th: *Deprecated*. Use `thermal_flow` instead. 
Examples: Natural gas boiler: @@ -46,8 +49,8 @@ class Boiler(LinearConverter): gas_boiler = Boiler( label='natural_gas_boiler', eta=0.85, # 85% thermal efficiency - Q_fu=natural_gas_flow, - Q_th=hot_water_flow, + fuel_flow=natural_gas_flow, + thermal_flow=hot_water_flow, ) ``` @@ -57,8 +60,8 @@ class Boiler(LinearConverter): biomass_boiler = Boiler( label='wood_chip_boiler', eta=seasonal_efficiency_profile, # Time-varying efficiency - Q_fu=biomass_flow, - Q_th=district_heat_flow, + fuel_flow=biomass_flow, + thermal_flow=district_heat_flow, on_off_parameters=OnOffParameters( consecutive_on_hours_min=4, # Minimum 4-hour operation effects_per_switch_on={'startup_fuel': 50}, # Startup fuel penalty @@ -67,7 +70,7 @@ class Boiler(LinearConverter): ``` Note: - The conversion relationship is: Q_th = Q_fu × eta + The conversion relationship is: thermal_flow = fuel_flow × eta Efficiency should be between 0 and 1, where 1 represents perfect conversion (100% of fuel energy converted to useful thermal output). 
@@ -76,31 +79,37 @@ class Boiler(LinearConverter): def __init__( self, label: str, - eta: TemporalDataUser, - Q_fu: Flow, - Q_th: Flow, + eta: Numeric_TPS, + fuel_flow: Flow | None = None, + thermal_flow: Flow | None = None, on_off_parameters: OnOffParameters | None = None, meta_data: dict | None = None, + **kwargs, ): + # Handle deprecated parameters + fuel_flow = self._handle_deprecated_kwarg(kwargs, 'Q_fu', 'fuel_flow', fuel_flow) + thermal_flow = self._handle_deprecated_kwarg(kwargs, 'Q_th', 'thermal_flow', thermal_flow) + self._validate_kwargs(kwargs) + super().__init__( label, - inputs=[Q_fu], - outputs=[Q_th], - conversion_factors=[{Q_fu.label: eta, Q_th.label: 1}], + inputs=[fuel_flow], + outputs=[thermal_flow], on_off_parameters=on_off_parameters, meta_data=meta_data, ) - self.Q_fu = Q_fu - self.Q_th = Q_th + self.fuel_flow = fuel_flow + self.thermal_flow = thermal_flow + self.eta = eta # Uses setter @property def eta(self): - return self.conversion_factors[0][self.Q_fu.label] + return self.conversion_factors[0][self.fuel_flow.label] @eta.setter def eta(self, value): check_bounds(value, 'eta', self.label_full, 0, 1) - self.conversion_factors[0][self.Q_fu.label] = value + self.conversion_factors = [{self.fuel_flow.label: value, self.thermal_flow.label: 1}] @register_class_for_io @@ -118,11 +127,13 @@ class Power2Heat(LinearConverter): eta: Thermal efficiency factor (0-1 range). For resistance heating this is typically close to 1.0 (nearly 100% efficiency), but may be lower for electrode boilers or systems with distribution losses. - P_el: Electrical input-flow representing electricity consumption. - Q_th: Thermal output-flow representing heat generation. + power_flow: Electrical input-flow representing electricity consumption. + thermal_flow: Thermal output-flow representing heat generation. on_off_parameters: Parameters defining binary operation constraints and costs. meta_data: Used to store additional information. 
Not used internally but saved in results. Only use Python native types. + P_el: *Deprecated*. Use `power_flow` instead. + Q_th: *Deprecated*. Use `thermal_flow` instead. Examples: Electric resistance heater: @@ -131,8 +142,8 @@ class Power2Heat(LinearConverter): electric_heater = Power2Heat( label='resistance_heater', eta=0.98, # 98% efficiency (small losses) - P_el=electricity_flow, - Q_th=space_heating_flow, + power_flow=electricity_flow, + thermal_flow=space_heating_flow, ) ``` @@ -142,8 +153,8 @@ class Power2Heat(LinearConverter): electrode_boiler = Power2Heat( label='electrode_steam_boiler', eta=0.95, # 95% efficiency including boiler losses - P_el=industrial_electricity, - Q_th=process_steam_flow, + power_flow=industrial_electricity, + thermal_flow=process_steam_flow, on_off_parameters=OnOffParameters( consecutive_on_hours_min=1, # Minimum 1-hour operation effects_per_switch_on={'startup_cost': 100}, @@ -152,7 +163,7 @@ class Power2Heat(LinearConverter): ``` Note: - The conversion relationship is: Q_th = P_el × eta + The conversion relationship is: thermal_flow = power_flow × eta Unlike heat pumps, Power2Heat systems cannot exceed 100% efficiency (eta ≤ 1.0) as they only convert electrical energy without extracting additional energy @@ -163,32 +174,38 @@ class Power2Heat(LinearConverter): def __init__( self, label: str, - eta: TemporalDataUser, - P_el: Flow, - Q_th: Flow, + eta: Numeric_TPS, + power_flow: Flow | None = None, + thermal_flow: Flow | None = None, on_off_parameters: OnOffParameters | None = None, meta_data: dict | None = None, + **kwargs, ): + # Handle deprecated parameters + power_flow = self._handle_deprecated_kwarg(kwargs, 'P_el', 'power_flow', power_flow) + thermal_flow = self._handle_deprecated_kwarg(kwargs, 'Q_th', 'thermal_flow', thermal_flow) + self._validate_kwargs(kwargs) + super().__init__( label, - inputs=[P_el], - outputs=[Q_th], - conversion_factors=[{P_el.label: eta, Q_th.label: 1}], + inputs=[power_flow], + outputs=[thermal_flow], 
on_off_parameters=on_off_parameters, meta_data=meta_data, ) - self.P_el = P_el - self.Q_th = Q_th + self.power_flow = power_flow + self.thermal_flow = thermal_flow + self.eta = eta # Uses setter @property def eta(self): - return self.conversion_factors[0][self.P_el.label] + return self.conversion_factors[0][self.power_flow.label] @eta.setter def eta(self, value): check_bounds(value, 'eta', self.label_full, 0, 1) - self.conversion_factors[0][self.P_el.label] = value + self.conversion_factors = [{self.power_flow.label: value, self.thermal_flow.label: 1}] @register_class_for_io @@ -203,14 +220,17 @@ class HeatPump(LinearConverter): Args: label: The label of the Element. Used to identify it in the FlowSystem. - COP: Coefficient of Performance (typically 1-20 range). Defines the ratio of + cop: Coefficient of Performance (typically 1-20 range). Defines the ratio of thermal output to electrical input. COP > 1 indicates the heat pump extracts additional energy from the environment. - P_el: Electrical input-flow representing electricity consumption. - Q_th: Thermal output-flow representing heat generation. + power_flow: Electrical input-flow representing electricity consumption. + thermal_flow: Thermal output-flow representing heat generation. on_off_parameters: Parameters defining binary operation constraints and costs. meta_data: Used to store additional information. Not used internally but saved in results. Only use Python native types. + COP: *Deprecated*. Use `cop` instead. + P_el: *Deprecated*. Use `power_flow` instead. + Q_th: *Deprecated*. Use `thermal_flow` instead. 
Examples: Air-source heat pump with constant COP: @@ -218,9 +238,9 @@ class HeatPump(LinearConverter): ```python air_hp = HeatPump( label='air_source_heat_pump', - COP=3.5, # COP of 3.5 (350% efficiency) - P_el=electricity_flow, - Q_th=heating_flow, + cop=3.5, # COP of 3.5 (350% efficiency) + power_flow=electricity_flow, + thermal_flow=heating_flow, ) ``` @@ -229,9 +249,9 @@ class HeatPump(LinearConverter): ```python ground_hp = HeatPump( label='geothermal_heat_pump', - COP=temperature_dependent_cop, # Time-varying COP based on ground temp - P_el=electricity_flow, - Q_th=radiant_heating_flow, + cop=temperature_dependent_cop, # Time-varying COP based on ground temp + power_flow=electricity_flow, + thermal_flow=radiant_heating_flow, on_off_parameters=OnOffParameters( consecutive_on_hours_min=2, # Avoid frequent cycling effects_per_running_hour={'maintenance': 0.5}, @@ -240,7 +260,7 @@ class HeatPump(LinearConverter): ``` Note: - The conversion relationship is: Q_th = P_el × COP + The conversion relationship is: thermal_flow = power_flow × COP COP should be greater than 1 for realistic heat pump operation, with typical values ranging from 2-6 depending on technology and operating conditions. 
@@ -250,32 +270,39 @@ class HeatPump(LinearConverter): def __init__( self, label: str, - COP: TemporalDataUser, - P_el: Flow, - Q_th: Flow, + cop: Numeric_TPS, + power_flow: Flow | None = None, + thermal_flow: Flow | None = None, on_off_parameters: OnOffParameters | None = None, meta_data: dict | None = None, + **kwargs, ): + # Handle deprecated parameters + power_flow = self._handle_deprecated_kwarg(kwargs, 'P_el', 'power_flow', power_flow) + thermal_flow = self._handle_deprecated_kwarg(kwargs, 'Q_th', 'thermal_flow', thermal_flow) + cop = self._handle_deprecated_kwarg(kwargs, 'COP', 'cop', cop) + self._validate_kwargs(kwargs) + super().__init__( label, - inputs=[P_el], - outputs=[Q_th], - conversion_factors=[{P_el.label: COP, Q_th.label: 1}], + inputs=[power_flow], + outputs=[thermal_flow], + conversion_factors=[], on_off_parameters=on_off_parameters, meta_data=meta_data, ) - self.P_el = P_el - self.Q_th = Q_th - self.COP = COP + self.power_flow = power_flow + self.thermal_flow = thermal_flow + self.cop = cop # Uses setter @property - def COP(self): # noqa: N802 - return self.conversion_factors[0][self.P_el.label] + def cop(self): + return self.conversion_factors[0][self.power_flow.label] - @COP.setter - def COP(self, value): # noqa: N802 - check_bounds(value, 'COP', self.label_full, 1, 20) - self.conversion_factors[0][self.P_el.label] = value + @cop.setter + def cop(self, value): + check_bounds(value, 'cop', self.label_full, 1, 20) + self.conversion_factors = [{self.power_flow.label: value, self.thermal_flow.label: 1}] @register_class_for_io @@ -293,11 +320,13 @@ class CoolingTower(LinearConverter): specific_electricity_demand: Auxiliary electricity demand per unit of cooling power (dimensionless, typically 0.01-0.05 range). Represents the fraction of thermal power that must be supplied as electricity for fans and pumps. - P_el: Electrical input-flow representing electricity consumption for fans/pumps. 
- Q_th: Thermal input-flow representing waste heat to be rejected to environment. + power_flow: Electrical input-flow representing electricity consumption for fans/pumps. + thermal_flow: Thermal input-flow representing waste heat to be rejected to environment. on_off_parameters: Parameters defining binary operation constraints and costs. meta_data: Used to store additional information. Not used internally but saved in results. Only use Python native types. + P_el: *Deprecated*. Use `power_flow` instead. + Q_th: *Deprecated*. Use `thermal_flow` instead. Examples: Industrial cooling tower: @@ -306,8 +335,8 @@ class CoolingTower(LinearConverter): cooling_tower = CoolingTower( label='process_cooling_tower', specific_electricity_demand=0.025, # 2.5% auxiliary power - P_el=cooling_electricity, - Q_th=waste_heat_flow, + power_flow=cooling_electricity, + thermal_flow=waste_heat_flow, ) ``` @@ -317,8 +346,8 @@ class CoolingTower(LinearConverter): condenser_cooling = CoolingTower( label='power_plant_cooling', specific_electricity_demand=0.015, # 1.5% auxiliary power - P_el=auxiliary_electricity, - Q_th=condenser_waste_heat, + power_flow=auxiliary_electricity, + thermal_flow=condenser_waste_heat, on_off_parameters=OnOffParameters( consecutive_on_hours_min=4, # Minimum operation time effects_per_running_hour={'water_consumption': 2.5}, # m³/h @@ -327,7 +356,7 @@ class CoolingTower(LinearConverter): ``` Note: - The conversion relationship is: P_el = Q_th × specific_electricity_demand + The conversion relationship is: power_flow = thermal_flow × specific_electricity_demand The cooling tower consumes electrical power proportional to the thermal load. No thermal energy is produced - all thermal input is rejected to the environment. 
@@ -339,34 +368,38 @@ class CoolingTower(LinearConverter): def __init__( self, label: str, - specific_electricity_demand: TemporalDataUser, - P_el: Flow, - Q_th: Flow, + specific_electricity_demand: Numeric_TPS, + power_flow: Flow | None = None, + thermal_flow: Flow | None = None, on_off_parameters: OnOffParameters | None = None, meta_data: dict | None = None, + **kwargs, ): + # Handle deprecated parameters + power_flow = self._handle_deprecated_kwarg(kwargs, 'P_el', 'power_flow', power_flow) + thermal_flow = self._handle_deprecated_kwarg(kwargs, 'Q_th', 'thermal_flow', thermal_flow) + self._validate_kwargs(kwargs) + super().__init__( label, - inputs=[P_el, Q_th], + inputs=[power_flow, thermal_flow], outputs=[], - conversion_factors=[{P_el.label: -1, Q_th.label: specific_electricity_demand}], on_off_parameters=on_off_parameters, meta_data=meta_data, ) - self.P_el = P_el - self.Q_th = Q_th - - check_bounds(specific_electricity_demand, 'specific_electricity_demand', self.label_full, 0, 1) + self.power_flow = power_flow + self.thermal_flow = thermal_flow + self.specific_electricity_demand = specific_electricity_demand # Uses setter @property def specific_electricity_demand(self): - return self.conversion_factors[0][self.Q_th.label] + return self.conversion_factors[0][self.thermal_flow.label] @specific_electricity_demand.setter def specific_electricity_demand(self, value): check_bounds(value, 'specific_electricity_demand', self.label_full, 0, 1) - self.conversion_factors[0][self.Q_th.label] = value + self.conversion_factors = [{self.power_flow.label: -1, self.thermal_flow.label: value}] @register_class_for_io @@ -385,12 +418,15 @@ class CHP(LinearConverter): energy converted to useful thermal output. eta_el: Electrical efficiency factor (0-1 range). Defines the fraction of fuel energy converted to electrical output. - Q_fu: Fuel input-flow representing fuel consumption. - P_el: Electrical output-flow representing electricity generation. 
- Q_th: Thermal output-flow representing heat generation. + fuel_flow: Fuel input-flow representing fuel consumption. + power_flow: Electrical output-flow representing electricity generation. + thermal_flow: Thermal output-flow representing heat generation. on_off_parameters: Parameters defining binary operation constraints and costs. meta_data: Used to store additional information. Not used internally but saved in results. Only use Python native types. + Q_fu: *Deprecated*. Use `fuel_flow` instead. + P_el: *Deprecated*. Use `power_flow` instead. + Q_th: *Deprecated*. Use `thermal_flow` instead. Examples: Natural gas CHP unit: @@ -400,9 +436,9 @@ class CHP(LinearConverter): label='natural_gas_chp', eta_th=0.45, # 45% thermal efficiency eta_el=0.35, # 35% electrical efficiency (80% total) - Q_fu=natural_gas_flow, - P_el=electricity_flow, - Q_th=district_heat_flow, + fuel_flow=natural_gas_flow, + power_flow=electricity_flow, + thermal_flow=district_heat_flow, ) ``` @@ -413,9 +449,9 @@ class CHP(LinearConverter): label='industrial_chp', eta_th=0.40, eta_el=0.38, - Q_fu=fuel_gas_flow, - P_el=plant_electricity, - Q_th=process_steam, + fuel_flow=fuel_gas_flow, + power_flow=plant_electricity, + thermal_flow=process_steam, on_off_parameters=OnOffParameters( consecutive_on_hours_min=8, # Minimum 8-hour operation effects_per_switch_on={'startup_cost': 5000}, @@ -426,8 +462,8 @@ class CHP(LinearConverter): Note: The conversion relationships are: - - Q_th = Q_fu × eta_th (thermal output) - - P_el = Q_fu × eta_el (electrical output) + - thermal_flow = fuel_flow × eta_th (thermal output) + - power_flow = fuel_flow × eta_el (electrical output) Total efficiency (eta_th + eta_el) should be ≤ 1.0, with typical combined efficiencies of 80-90% for modern CHP units. 
This provides significant @@ -437,49 +473,63 @@ class CHP(LinearConverter): def __init__( self, label: str, - eta_th: TemporalDataUser, - eta_el: TemporalDataUser, - Q_fu: Flow, - P_el: Flow, - Q_th: Flow, + eta_th: Numeric_TPS, + eta_el: Numeric_TPS, + fuel_flow: Flow | None = None, + power_flow: Flow | None = None, + thermal_flow: Flow | None = None, on_off_parameters: OnOffParameters | None = None, meta_data: dict | None = None, + **kwargs, ): - heat = {Q_fu.label: eta_th, Q_th.label: 1} - electricity = {Q_fu.label: eta_el, P_el.label: 1} + # Handle deprecated parameters + fuel_flow = self._handle_deprecated_kwarg(kwargs, 'Q_fu', 'fuel_flow', fuel_flow) + power_flow = self._handle_deprecated_kwarg(kwargs, 'P_el', 'power_flow', power_flow) + thermal_flow = self._handle_deprecated_kwarg(kwargs, 'Q_th', 'thermal_flow', thermal_flow) + self._validate_kwargs(kwargs) super().__init__( label, - inputs=[Q_fu], - outputs=[Q_th, P_el], - conversion_factors=[heat, electricity], + inputs=[fuel_flow], + outputs=[thermal_flow, power_flow], + conversion_factors=[], on_off_parameters=on_off_parameters, meta_data=meta_data, ) - self.Q_fu = Q_fu - self.P_el = P_el - self.Q_th = Q_th + self.fuel_flow = fuel_flow + self.power_flow = power_flow + self.thermal_flow = thermal_flow + self.eta_th = eta_th # Uses setter + self.eta_el = eta_el # Uses setter check_bounds(eta_el + eta_th, 'eta_th+eta_el', self.label_full, 0, 1) @property def eta_th(self): - return self.conversion_factors[0][self.Q_fu.label] + return self.conversion_factors[0][self.fuel_flow.label] @eta_th.setter def eta_th(self, value): check_bounds(value, 'eta_th', self.label_full, 0, 1) - self.conversion_factors[0][self.Q_fu.label] = value + if len(self.conversion_factors) < 2: + # Initialize structure if not yet set + self.conversion_factors = [{self.fuel_flow.label: value, self.thermal_flow.label: 1}, {}] + else: + self.conversion_factors[0] = {self.fuel_flow.label: value, self.thermal_flow.label: 1} @property def 
eta_el(self): - return self.conversion_factors[1][self.Q_fu.label] + return self.conversion_factors[1][self.fuel_flow.label] @eta_el.setter def eta_el(self, value): check_bounds(value, 'eta_el', self.label_full, 0, 1) - self.conversion_factors[1][self.Q_fu.label] = value + if len(self.conversion_factors) < 2: + # Initialize structure if not yet set + self.conversion_factors = [{}, {self.fuel_flow.label: value, self.power_flow.label: 1}] + else: + self.conversion_factors[1] = {self.fuel_flow.label: value, self.power_flow.label: 1} @register_class_for_io @@ -494,16 +544,20 @@ class HeatPumpWithSource(LinearConverter): Args: label: The label of the Element. Used to identify it in the FlowSystem. - COP: Coefficient of Performance (typically 1-20 range). Defines the ratio of + cop: Coefficient of Performance (typically 1-20 range). Defines the ratio of thermal output to electrical input. The heat source extraction is automatically - calculated as Q_ab = Q_th × (COP-1)/COP. - P_el: Electrical input-flow representing electricity consumption for compressor. - Q_ab: Heat source input-flow representing thermal energy extracted from environment + calculated as heat_source_flow = thermal_flow × (COP-1)/COP. + power_flow: Electrical input-flow representing electricity consumption for compressor. + heat_source_flow: Heat source input-flow representing thermal energy extracted from environment (ground, air, water source). - Q_th: Thermal output-flow representing useful heat delivered to the application. + thermal_flow: Thermal output-flow representing useful heat delivered to the application. on_off_parameters: Parameters defining binary operation constraints and costs. meta_data: Used to store additional information. Not used internally but saved in results. Only use Python native types. + COP: *Deprecated*. Use `cop` instead. + P_el: *Deprecated*. Use `power_flow` instead. + Q_ab: *Deprecated*. Use `heat_source_flow` instead. + Q_th: *Deprecated*. Use `thermal_flow` instead. 
Examples: Ground-source heat pump with explicit ground coupling: @@ -511,10 +565,10 @@ class HeatPumpWithSource(LinearConverter): ```python ground_source_hp = HeatPumpWithSource( label='geothermal_heat_pump', - COP=4.5, # High COP due to stable ground temperature - P_el=electricity_flow, - Q_ab=ground_heat_extraction, # Heat extracted from ground loop - Q_th=building_heating_flow, + cop=4.5, # High COP due to stable ground temperature + power_flow=electricity_flow, + heat_source_flow=ground_heat_extraction, # Heat extracted from ground loop + thermal_flow=building_heating_flow, ) ``` @@ -523,10 +577,10 @@ class HeatPumpWithSource(LinearConverter): ```python waste_heat_pump = HeatPumpWithSource( label='waste_heat_pump', - COP=temperature_dependent_cop, # Varies with temperature of heat source - P_el=electricity_consumption, - Q_ab=industrial_heat_extraction, # Heat extracted from a industrial process or waste water - Q_th=heat_supply, + cop=temperature_dependent_cop, # Varies with temperature of heat source + power_flow=electricity_consumption, + heat_source_flow=industrial_heat_extraction, # Heat extracted from a industrial process or waste water + thermal_flow=heat_supply, on_off_parameters=OnOffParameters( consecutive_on_hours_min=0.5, # 30-minute minimum runtime effects_per_switch_on={'costs': 1000}, @@ -536,9 +590,9 @@ class HeatPumpWithSource(LinearConverter): Note: The conversion relationships are: - - Q_th = P_el × COP (thermal output from electrical input) - - Q_ab = Q_th × (COP-1)/COP (heat source extraction) - - Energy balance: Q_th = P_el + Q_ab + - thermal_flow = power_flow × COP (thermal output from electrical input) + - heat_source_flow = thermal_flow × (COP-1)/COP (heat source extraction) + - Energy balance: thermal_flow = power_flow + heat_source_flow This formulation explicitly tracks the heat source, which is important for systems where the source capacity or temperature is limited, @@ -551,49 +605,55 @@ class HeatPumpWithSource(LinearConverter): 
def __init__( self, label: str, - COP: TemporalDataUser, - P_el: Flow, - Q_ab: Flow, - Q_th: Flow, + cop: Numeric_TPS, + power_flow: Flow | None = None, + heat_source_flow: Flow | None = None, + thermal_flow: Flow | None = None, on_off_parameters: OnOffParameters | None = None, meta_data: dict | None = None, + **kwargs, ): + # Handle deprecated parameters + power_flow = self._handle_deprecated_kwarg(kwargs, 'P_el', 'power_flow', power_flow) + heat_source_flow = self._handle_deprecated_kwarg(kwargs, 'Q_ab', 'heat_source_flow', heat_source_flow) + thermal_flow = self._handle_deprecated_kwarg(kwargs, 'Q_th', 'thermal_flow', thermal_flow) + cop = self._handle_deprecated_kwarg(kwargs, 'COP', 'cop', cop) + self._validate_kwargs(kwargs) + super().__init__( label, - inputs=[P_el, Q_ab], - outputs=[Q_th], - conversion_factors=[{P_el.label: COP, Q_th.label: 1}, {Q_ab.label: COP / (COP - 1), Q_th.label: 1}], + inputs=[power_flow, heat_source_flow], + outputs=[thermal_flow], + conversion_factors=[], on_off_parameters=on_off_parameters, meta_data=meta_data, ) - self.P_el = P_el - self.Q_ab = Q_ab - self.Q_th = Q_th - - if np.any(np.asarray(self.COP) <= 1): - raise ValueError(f'{self.label_full}.COP must be strictly > 1 for HeatPumpWithSource.') + self.power_flow = power_flow + self.heat_source_flow = heat_source_flow + self.thermal_flow = thermal_flow + self.cop = cop # Uses setter @property - def COP(self): # noqa: N802 - return self.conversion_factors[0][self.P_el.label] + def cop(self): # noqa: N802 + return self.conversion_factors[0][self.power_flow.label] - @COP.setter - def COP(self, value): # noqa: N802 - check_bounds(value, 'COP', self.label_full, 1, 20) + @cop.setter + def cop(self, value): # noqa: N802 + check_bounds(value, 'cop', self.label_full, 1, 20) if np.any(np.asarray(value) <= 1): - raise ValueError(f'{self.label_full}.COP must be strictly > 1 for HeatPumpWithSource.') + raise ValueError(f'{self.label_full}.cop must be strictly > 1 for HeatPumpWithSource.') 
self.conversion_factors = [ - {self.P_el.label: value, self.Q_th.label: 1}, - {self.Q_ab.label: value / (value - 1), self.Q_th.label: 1}, + {self.power_flow.label: value, self.thermal_flow.label: 1}, + {self.heat_source_flow.label: value / (value - 1), self.thermal_flow.label: 1}, ] def check_bounds( - value: TemporalDataUser, + value: Numeric_TPS, parameter_label: str, element_label: str, - lower_bound: TemporalDataUser, - upper_bound: TemporalDataUser, + lower_bound: Numeric_TPS, + upper_bound: Numeric_TPS, ) -> None: """ Check if the value is within the bounds. The bounds are exclusive. diff --git a/flixopt/modeling.py b/flixopt/modeling.py index c7f0bf314..13b4c0e3e 100644 --- a/flixopt/modeling.py +++ b/flixopt/modeling.py @@ -5,7 +5,6 @@ import xarray as xr from .config import CONFIG -from .core import TemporalData from .structure import Submodel logger = logging.getLogger('flixopt') @@ -119,7 +118,7 @@ def count_consecutive_states( class ModelingUtilities: @staticmethod def compute_consecutive_hours_in_state( - binary_values: TemporalData, + binary_values: xr.DataArray, hours_per_timestep: int | float, epsilon: float = None, ) -> float: @@ -203,7 +202,7 @@ def expression_tracking_variable( tracked_expression, name: str = None, short_name: str = None, - bounds: tuple[TemporalData, TemporalData] = None, + bounds: tuple[xr.DataArray, xr.DataArray] = None, coords: str | list[str] | None = None, ) -> tuple[linopy.Variable, linopy.Constraint]: """ @@ -242,11 +241,11 @@ def consecutive_duration_tracking( state_variable: linopy.Variable, name: str = None, short_name: str = None, - minimum_duration: TemporalData | None = None, - maximum_duration: TemporalData | None = None, + minimum_duration: xr.DataArray | None = None, + maximum_duration: xr.DataArray | None = None, duration_dim: str = 'time', - duration_per_step: int | float | TemporalData = None, - previous_duration: TemporalData = 0, + duration_per_step: int | float | xr.DataArray = None, + previous_duration: 
xr.DataArray = 0, ) -> tuple[linopy.Variable, tuple[linopy.Constraint, linopy.Constraint, linopy.Constraint]]: """ Creates consecutive duration tracking for a binary state variable. @@ -394,7 +393,7 @@ class BoundingPatterns: def basic_bounds( model: Submodel, variable: linopy.Variable, - bounds: tuple[TemporalData, TemporalData], + bounds: tuple[xr.DataArray, xr.DataArray], name: str = None, ) -> list[linopy.constraints.Constraint]: """Create simple bounds. @@ -426,7 +425,7 @@ def basic_bounds( def bounds_with_state( model: Submodel, variable: linopy.Variable, - bounds: tuple[TemporalData, TemporalData], + bounds: tuple[xr.DataArray, xr.DataArray], variable_state: linopy.Variable, name: str = None, ) -> list[linopy.Constraint]: @@ -473,7 +472,7 @@ def scaled_bounds( model: Submodel, variable: linopy.Variable, scaling_variable: linopy.Variable, - relative_bounds: tuple[TemporalData, TemporalData], + relative_bounds: tuple[xr.DataArray, xr.DataArray], name: str = None, ) -> list[linopy.Constraint]: """Constraint a variable by scaling bounds, dependent on another variable. 
@@ -516,8 +515,8 @@ def scaled_bounds_with_state( model: Submodel, variable: linopy.Variable, scaling_variable: linopy.Variable, - relative_bounds: tuple[TemporalData, TemporalData], - scaling_bounds: tuple[TemporalData, TemporalData], + relative_bounds: tuple[xr.DataArray, xr.DataArray], + scaling_bounds: tuple[xr.DataArray, xr.DataArray], variable_state: linopy.Variable, name: str = None, ) -> list[linopy.Constraint]: diff --git a/flixopt/results.py b/flixopt/results.py index 954af6669..3d9aedf62 100644 --- a/flixopt/results.py +++ b/flixopt/results.py @@ -246,13 +246,15 @@ def __init__( components_dict = { label: ComponentResults(self, **infos) for label, infos in self.solution.attrs['Components'].items() } - self.components = ResultsContainer(elements=components_dict, element_type_name='component results') + self.components = ResultsContainer( + elements=components_dict, element_type_name='component results', truncate_repr=10 + ) buses_dict = {label: BusResults(self, **infos) for label, infos in self.solution.attrs['Buses'].items()} - self.buses = ResultsContainer(elements=buses_dict, element_type_name='bus results') + self.buses = ResultsContainer(elements=buses_dict, element_type_name='bus results', truncate_repr=10) effects_dict = {label: EffectResults(self, **infos) for label, infos in self.solution.attrs['Effects'].items()} - self.effects = ResultsContainer(elements=effects_dict, element_type_name='effect results') + self.effects = ResultsContainer(elements=effects_dict, element_type_name='effect results', truncate_repr=10) if 'Flows' not in self.solution.attrs: warnings.warn( @@ -267,7 +269,7 @@ def __init__( label: FlowResults(self, **infos) for label, infos in self.solution.attrs.get('Flows', {}).items() } self._has_flow_data = True - self.flows = ResultsContainer(elements=flows_dict, element_type_name='flow results') + self.flows = ResultsContainer(elements=flows_dict, element_type_name='flow results', truncate_repr=10) self.timesteps_extra = 
self.solution.indexes['time'] self.hours_per_timestep = FlowSystem.calculate_hours_per_timestep(self.timesteps_extra) diff --git a/flixopt/structure.py b/flixopt/structure.py index e2aa6ee87..2bce6aa52 100644 --- a/flixopt/structure.py +++ b/flixopt/structure.py @@ -921,14 +921,17 @@ def __init__( self, elements: list[T] | dict[str, T] | None = None, element_type_name: str = 'elements', + truncate_repr: int | None = None, ): """ Args: elements: Initial elements to add (list or dict) element_type_name: Name for display (e.g., 'components', 'buses') + truncate_repr: Maximum number of items to show in repr. If None, show all items. Default: None """ super().__init__() self._element_type_name = element_type_name + self._truncate_repr = truncate_repr if elements is not None: if isinstance(elements, dict): @@ -999,8 +1002,20 @@ def __getitem__(self, label: str) -> T: error_msg += f' Available: {", ".join(available[:5])} ... (+{len(available) - 5} more)' raise KeyError(error_msg) from None - def __repr__(self) -> str: - """Return a string representation similar to linopy.model.Variables.""" + def _get_repr(self, max_items: int | None = None) -> str: + """ + Get string representation with optional truncation. + + Args: + max_items: Maximum number of items to show. If None, uses instance default (self._truncate_repr). + If still None, shows all items. 
+ + Returns: + Formatted string representation + """ + # Use provided max_items, or fall back to instance default + limit = max_items if max_items is not None else self._truncate_repr + count = len(self) title = f'{self._element_type_name.capitalize()} ({count} item{"s" if count != 1 else ""})' @@ -1009,11 +1024,24 @@ def __repr__(self) -> str: r += '\n' else: r = fx_io.format_title_with_underline(title) - for name in sorted(self.keys(), key=_natural_sort_key): - r += f' * {name}\n' + sorted_names = sorted(self.keys(), key=_natural_sort_key) + + if limit is not None and limit > 0 and len(sorted_names) > limit: + # Show truncated list + for name in sorted_names[:limit]: + r += f' * {name}\n' + r += f' ... (+{len(sorted_names) - limit} more)\n' + else: + # Show all items + for name in sorted_names: + r += f' * {name}\n' return r + def __repr__(self) -> str: + """Return a string representation using the instance's truncate_repr setting.""" + return self._get_repr() + class ElementContainer(ContainerMixin[T]): """ @@ -1215,6 +1243,7 @@ def _format_grouped_containers(self, title: str | None = None) -> str: if container: # Only show non-empty groups if parts: # Add spacing between sections parts.append('') + # Use container's __repr__ which respects its truncate_repr setting parts.append(repr(container).rstrip('\n')) return '\n'.join(parts) diff --git a/flixopt/types.py b/flixopt/types.py new file mode 100644 index 000000000..f53d308c4 --- /dev/null +++ b/flixopt/types.py @@ -0,0 +1,112 @@ +"""Type system for dimension-aware data in flixopt. + +Type aliases use suffix notation to indicate maximum dimensions. Data can have any +subset of these dimensions (including scalars, which are broadcast to all dimensions). 
+ +| Suffix | Dimensions | Use Case | +|--------|------------|----------| +| `_TPS` | Time, Period, Scenario | Time-varying data across all dimensions | +| `_PS` | Period, Scenario | Investment parameters (no time variation) | +| `_S` | Scenario | Scenario-specific parameters | +| (none) | Scalar only | Single numeric values | + +All dimensioned types accept: scalars (`int`, `float`), arrays (`ndarray`), +Series (`pd.Series`), DataFrames (`pd.DataFrame`), or DataArrays (`xr.DataArray`). + +Example: + ```python + from flixopt.types import Numeric_TPS, Numeric_PS, Scalar + + + def create_flow( + size: Numeric_PS = None, # Scalar, array, Series, DataFrame, or DataArray + profile: Numeric_TPS = 1.0, # Time-varying data + efficiency: Scalar = 0.95, # Scalars only + ): ... + + + # All valid: + create_flow(size=100) # Scalar broadcast + create_flow(size=np.array([100, 150])) # Period-varying + create_flow(profile=pd.DataFrame(...)) # Time + scenario + ``` + +Important: + Data can have **any subset** of specified dimensions, but **cannot have more + dimensions than the FlowSystem**. If the FlowSystem has only time dimension, + you cannot pass period or scenario data. The type hints indicate the maximum + dimensions that could be used if they exist in the FlowSystem. +""" + +from typing import TypeAlias + +import numpy as np +import pandas as pd +import xarray as xr + +# Internal base types - not exported +_Numeric: TypeAlias = int | float | np.integer | np.floating | np.ndarray | pd.Series | pd.DataFrame | xr.DataArray +_Bool: TypeAlias = bool | np.bool_ | np.ndarray | pd.Series | pd.DataFrame | xr.DataArray +_Effect: TypeAlias = _Numeric | dict[str, _Numeric] + +# Combined type for numeric or boolean data (no dimension information) +NumericOrBool: TypeAlias = ( + int | float | bool | np.integer | np.floating | np.bool_ | np.ndarray | pd.Series | pd.DataFrame | xr.DataArray +) +"""Numeric or boolean data without dimension metadata. 
For internal utilities.""" + + +# Numeric data types +Numeric_TPS: TypeAlias = _Numeric +"""Time, Period, Scenario dimensions. For time-varying data across all dimensions.""" + +Numeric_PS: TypeAlias = _Numeric +"""Period, Scenario dimensions. For investment parameters (e.g., size, costs).""" + +Numeric_S: TypeAlias = _Numeric +"""Scenario dimension. For scenario-specific parameters (e.g., discount rates).""" + + +# Boolean data types +Bool_TPS: TypeAlias = _Bool +"""Time, Period, Scenario dimensions. For time-varying binary flags/constraints.""" + +Bool_PS: TypeAlias = _Bool +"""Period, Scenario dimensions. For period-specific binary decisions.""" + +Bool_S: TypeAlias = _Bool +"""Scenario dimension. For scenario-specific binary flags.""" + + +# Effect data types +Effect_TPS: TypeAlias = _Effect +"""Time, Period, Scenario dimensions. For time-varying effects (costs, emissions). +Can be single numeric value or dict mapping effect names to values.""" + +Effect_PS: TypeAlias = _Effect +"""Period, Scenario dimensions. For period-specific effects (investment costs). +Can be single numeric value or dict mapping effect names to values.""" + +Effect_S: TypeAlias = _Effect +"""Scenario dimension. For scenario-specific effects (carbon prices). +Can be single numeric value or dict mapping effect names to values.""" + + +# Scalar type (no dimensions) +Scalar: TypeAlias = int | float | np.integer | np.floating +"""Scalar numeric values only. 
Not converted to DataArray (unlike dimensioned types).""" + +# Export public API +__all__ = [ + 'Numeric_TPS', + 'Numeric_PS', + 'Numeric_S', + 'Bool_TPS', + 'Bool_PS', + 'Bool_S', + 'Effect_TPS', + 'Effect_PS', + 'Effect_S', + 'Scalar', + 'NumericOrBool', +] diff --git a/pyproject.toml b/pyproject.toml index 764dbea1d..eb1fea0f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -98,7 +98,7 @@ dev = [ # Documentation building docs = [ "mkdocs==1.6.1", - "mkdocs-material==9.6.22", + "mkdocs-material==9.6.23", "mkdocstrings-python==1.18.2", "mkdocs-table-reader-plugin==3.1.0", "mkdocs-gen-files==0.5.0", diff --git a/tests/test_flow_system_resample.py b/tests/test_flow_system_resample.py new file mode 100644 index 000000000..d28872a0f --- /dev/null +++ b/tests/test_flow_system_resample.py @@ -0,0 +1,293 @@ +"""Integration tests for FlowSystem.resample() - verifies correct data resampling and structure preservation.""" + +import numpy as np +import pandas as pd +import pytest +from numpy.testing import assert_allclose + +import flixopt as fx + + +@pytest.fixture +def simple_fs(): + """Simple FlowSystem with basic components.""" + timesteps = pd.date_range('2023-01-01', periods=24, freq='h') + fs = fx.FlowSystem(timesteps) + fs.add_elements( + fx.Bus('heat'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True) + ) + fs.add_elements( + fx.Sink( + label='demand', + inputs=[fx.Flow(label='in', bus='heat', fixed_relative_profile=np.linspace(10, 20, 24), size=1)], + ), + fx.Source( + label='source', outputs=[fx.Flow(label='out', bus='heat', size=50, effects_per_flow_hour={'costs': 0.05})] + ), + ) + return fs + + +@pytest.fixture +def complex_fs(): + """FlowSystem with complex elements (storage, piecewise, invest).""" + timesteps = pd.date_range('2023-01-01', periods=48, freq='h') + fs = fx.FlowSystem(timesteps) + + fs.add_elements( + fx.Bus('heat'), + fx.Bus('elec'), + fx.Effect('costs', unit='€', description='costs', is_objective=True, 
is_standard=True), + ) + + # Storage + fs.add_elements( + fx.Storage( + label='battery', + charging=fx.Flow('charge', bus='elec', size=10), + discharging=fx.Flow('discharge', bus='elec', size=10), + capacity_in_flow_hours=fx.InvestParameters(fixed_size=100), + ) + ) + + # Piecewise converter + converter = fx.linear_converters.Boiler( + 'boiler', eta=0.9, Q_fu=fx.Flow('gas', bus='elec'), Q_th=fx.Flow('heat', bus='heat') + ) + converter.Q_th.size = 100 + fs.add_elements(converter) + + # Component with investment + fs.add_elements( + fx.Source( + label='pv', + outputs=[ + fx.Flow( + 'gen', + bus='elec', + size=fx.InvestParameters(maximum_size=1000, effects_of_investment_per_size={'costs': 100}), + ) + ], + ) + ) + + return fs + + +# === Basic Functionality === + + +@pytest.mark.parametrize('freq,method', [('2h', 'mean'), ('4h', 'sum'), ('6h', 'first')]) +def test_basic_resample(simple_fs, freq, method): + """Test basic resampling preserves structure.""" + fs_r = simple_fs.resample(freq, method=method) + assert len(fs_r.components) == len(simple_fs.components) + assert len(fs_r.buses) == len(simple_fs.buses) + assert len(fs_r.timesteps) < len(simple_fs.timesteps) + + +@pytest.mark.parametrize( + 'method,expected', + [ + ('mean', [15.0, 35.0]), + ('sum', [30.0, 70.0]), + ('first', [10.0, 30.0]), + ('last', [20.0, 40.0]), + ], +) +def test_resample_methods(method, expected): + """Test different resampling methods.""" + ts = pd.date_range('2023-01-01', periods=4, freq='h') + fs = fx.FlowSystem(ts) + fs.add_elements(fx.Bus('b'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True)) + fs.add_elements( + fx.Sink( + label='s', + inputs=[fx.Flow(label='in', bus='b', fixed_relative_profile=np.array([10.0, 20.0, 30.0, 40.0]), size=1)], + ) + ) + + fs_r = fs.resample('2h', method=method) + assert_allclose(fs_r.flows['s(in)'].fixed_relative_profile.values, expected, rtol=1e-10) + + +def test_structure_preserved(simple_fs): + """Test all structural 
elements preserved.""" + fs_r = simple_fs.resample('2h', method='mean') + assert set(simple_fs.components.keys()) == set(fs_r.components.keys()) + assert set(simple_fs.buses.keys()) == set(fs_r.buses.keys()) + assert set(simple_fs.effects.keys()) == set(fs_r.effects.keys()) + + # Flow connections preserved + for label in simple_fs.flows.keys(): + assert simple_fs.flows[label].bus == fs_r.flows[label].bus + assert simple_fs.flows[label].component == fs_r.flows[label].component + + +def test_time_metadata_updated(simple_fs): + """Test time metadata correctly updated.""" + fs_r = simple_fs.resample('3h', method='mean') + assert len(fs_r.timesteps) == 8 + assert_allclose(fs_r.hours_per_timestep.values, 3.0) + assert fs_r.hours_of_last_timestep == 3.0 + + +# === Advanced Dimensions === + + +@pytest.mark.parametrize( + 'dim_name,dim_value', + [ + ('periods', pd.Index([2023, 2024], name='period')), + ('scenarios', pd.Index(['base', 'high'], name='scenario')), + ], +) +def test_with_dimensions(simple_fs, dim_name, dim_value): + """Test resampling preserves period/scenario dimensions.""" + fs = fx.FlowSystem(simple_fs.timesteps, **{dim_name: dim_value}) + fs.add_elements(fx.Bus('h'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True)) + fs.add_elements( + fx.Sink(label='d', inputs=[fx.Flow(label='in', bus='h', fixed_relative_profile=np.ones(24), size=1)]) + ) + + fs_r = fs.resample('2h', method='mean') + assert getattr(fs_r, dim_name) is not None + pd.testing.assert_index_equal(getattr(fs_r, dim_name), dim_value) + + +# === Complex Elements === + + +def test_storage_resample(complex_fs): + """Test storage component resampling.""" + fs_r = complex_fs.resample('4h', method='mean') + assert 'battery' in fs_r.components + storage = fs_r.components['battery'] + assert storage.charging.label == 'charge' + assert storage.discharging.label == 'discharge' + + +def test_converter_resample(complex_fs): + """Test converter component resampling.""" + 
fs_r = complex_fs.resample('4h', method='mean') + assert 'boiler' in fs_r.components + boiler = fs_r.components['boiler'] + assert hasattr(boiler, 'eta') + + +def test_invest_resample(complex_fs): + """Test investment parameters preserved.""" + fs_r = complex_fs.resample('4h', method='mean') + pv_flow = fs_r.flows['pv(gen)'] + assert isinstance(pv_flow.size, fx.InvestParameters) + assert pv_flow.size.maximum_size == 1000 + + +# === Modeling Integration === + + +@pytest.mark.parametrize('with_dim', [None, 'periods', 'scenarios']) +def test_modeling(with_dim): + """Test resampled FlowSystem can be modeled.""" + ts = pd.date_range('2023-01-01', periods=48, freq='h') + kwargs = {} + if with_dim == 'periods': + kwargs['periods'] = pd.Index([2023, 2024], name='period') + elif with_dim == 'scenarios': + kwargs['scenarios'] = pd.Index(['base', 'high'], name='scenario') + + fs = fx.FlowSystem(ts, **kwargs) + fs.add_elements(fx.Bus('h'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True)) + fs.add_elements( + fx.Sink( + label='d', inputs=[fx.Flow(label='in', bus='h', fixed_relative_profile=np.linspace(10, 30, 48), size=1)] + ), + fx.Source(label='s', outputs=[fx.Flow(label='out', bus='h', size=100, effects_per_flow_hour={'costs': 0.05})]), + ) + + fs_r = fs.resample('4h', method='mean') + calc = fx.FullCalculation('test', fs_r) + calc.do_modeling() + + assert calc.model is not None + assert len(calc.model.variables) > 0 + + +def test_model_structure_preserved(): + """Test model structure (var/constraint types) preserved.""" + ts = pd.date_range('2023-01-01', periods=48, freq='h') + fs = fx.FlowSystem(ts) + fs.add_elements(fx.Bus('h'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True)) + fs.add_elements( + fx.Sink( + label='d', inputs=[fx.Flow(label='in', bus='h', fixed_relative_profile=np.linspace(10, 30, 48), size=1)] + ), + fx.Source(label='s', outputs=[fx.Flow(label='out', bus='h', size=100, 
effects_per_flow_hour={'costs': 0.05})]), + ) + + calc_orig = fx.FullCalculation('orig', fs) + calc_orig.do_modeling() + + fs_r = fs.resample('4h', method='mean') + calc_r = fx.FullCalculation('resamp', fs_r) + calc_r.do_modeling() + + # Same number of variable/constraint types + assert len(calc_orig.model.variables) == len(calc_r.model.variables) + assert len(calc_orig.model.constraints) == len(calc_r.model.constraints) + + # Same names + assert set(calc_orig.model.variables.labels.data_vars.keys()) == set(calc_r.model.variables.labels.data_vars.keys()) + assert set(calc_orig.model.constraints.labels.data_vars.keys()) == set( + calc_r.model.constraints.labels.data_vars.keys() + ) + + +# === Advanced Features === + + +def test_dataset_roundtrip(simple_fs): + """Test dataset serialization.""" + fs_r = simple_fs.resample('2h', method='mean') + assert fx.FlowSystem.from_dataset(fs_r.to_dataset()) == fs_r + + +def test_dataset_chaining(simple_fs): + """Test power user pattern.""" + ds = simple_fs.to_dataset() + ds = fx.FlowSystem._dataset_sel(ds, time='2023-01-01') + ds = fx.FlowSystem._dataset_resample(ds, freq='2h', method='mean') + fs_result = fx.FlowSystem.from_dataset(ds) + + fs_simple = simple_fs.sel(time='2023-01-01').resample('2h', method='mean') + assert fs_result == fs_simple + + +@pytest.mark.parametrize('freq,exp_len', [('2h', 84), ('6h', 28), ('1D', 7)]) +def test_frequencies(freq, exp_len): + """Test various frequencies.""" + ts = pd.date_range('2023-01-01', periods=168, freq='h') + fs = fx.FlowSystem(ts) + fs.add_elements(fx.Bus('b'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True)) + fs.add_elements( + fx.Sink(label='s', inputs=[fx.Flow(label='in', bus='b', fixed_relative_profile=np.ones(168), size=1)]) + ) + + assert len(fs.resample(freq, method='mean').timesteps) == exp_len + + +def test_irregular_timesteps(): + """Test irregular timesteps.""" + ts = pd.DatetimeIndex(['2023-01-01 00:00', '2023-01-01 01:00', 
'2023-01-01 03:00'], name='time') + fs = fx.FlowSystem(ts) + fs.add_elements(fx.Bus('b'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True)) + fs.add_elements( + fx.Sink(label='s', inputs=[fx.Flow(label='in', bus='b', fixed_relative_profile=np.ones(3), size=1)]) + ) + + fs_r = fs.resample('1h', method='mean') + assert len(fs_r.timesteps) > 0 + + +if __name__ == '__main__': + pytest.main(['-v', __file__]) diff --git a/tests/test_resample_equivalence.py b/tests/test_resample_equivalence.py new file mode 100644 index 000000000..19144b6a1 --- /dev/null +++ b/tests/test_resample_equivalence.py @@ -0,0 +1,310 @@ +""" +Tests to ensure the dimension grouping optimization in _resample_by_dimension_groups +is equivalent to naive Dataset resampling. + +These tests verify that the optimization (grouping variables by dimensions before +resampling) produces identical results to simply calling Dataset.resample() directly. +""" + +import numpy as np +import pandas as pd +import pytest +import xarray as xr + +import flixopt as fx + + +def naive_dataset_resample(dataset: xr.Dataset, freq: str, method: str) -> xr.Dataset: + """ + Naive resampling: simply call Dataset.resample().method() directly. + + This is the straightforward approach without dimension grouping optimization. + """ + return getattr(dataset.resample(time=freq), method)() + + +def create_dataset_with_mixed_dimensions(n_timesteps=48, seed=42): + """ + Create a dataset with variables having different dimension structures. 
+ + This mimics realistic data with: + - Variables with only time dimension + - Variables with time + one other dimension + - Variables with time + multiple dimensions + """ + np.random.seed(seed) + timesteps = pd.date_range('2020-01-01', periods=n_timesteps, freq='h') + + ds = xr.Dataset( + coords={ + 'time': timesteps, + 'component': ['comp1', 'comp2'], + 'bus': ['bus1', 'bus2'], + 'scenario': ['base', 'alt'], + } + ) + + # Variable with only time dimension + ds['total_demand'] = xr.DataArray( + np.random.randn(n_timesteps), + dims=['time'], + coords={'time': ds.time}, + ) + + # Variable with time + component + ds['component_flow'] = xr.DataArray( + np.random.randn(n_timesteps, 2), + dims=['time', 'component'], + coords={'time': ds.time, 'component': ds.component}, + ) + + # Variable with time + bus + ds['bus_balance'] = xr.DataArray( + np.random.randn(n_timesteps, 2), + dims=['time', 'bus'], + coords={'time': ds.time, 'bus': ds.bus}, + ) + + # Variable with time + component + bus + ds['flow_on_bus'] = xr.DataArray( + np.random.randn(n_timesteps, 2, 2), + dims=['time', 'component', 'bus'], + coords={'time': ds.time, 'component': ds.component, 'bus': ds.bus}, + ) + + # Variable with time + scenario + ds['scenario_demand'] = xr.DataArray( + np.random.randn(n_timesteps, 2), + dims=['time', 'scenario'], + coords={'time': ds.time, 'scenario': ds.scenario}, + ) + + # Variable with time + component + scenario + ds['component_scenario_flow'] = xr.DataArray( + np.random.randn(n_timesteps, 2, 2), + dims=['time', 'component', 'scenario'], + coords={'time': ds.time, 'component': ds.component, 'scenario': ds.scenario}, + ) + + return ds + + +@pytest.mark.parametrize('method', ['mean', 'sum', 'max', 'min', 'first', 'last']) +@pytest.mark.parametrize('freq', ['2h', '4h', '1D']) +def test_resample_equivalence_mixed_dimensions(method, freq): + """ + Test that _resample_by_dimension_groups produces same results as naive resampling. 
+ + Uses a dataset with variables having different dimension structures. + """ + ds = create_dataset_with_mixed_dimensions(n_timesteps=100) + + # Method 1: Optimized approach (with dimension grouping) + result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, freq, method) + + # Method 2: Naive approach (direct Dataset resampling) + result_naive = naive_dataset_resample(ds, freq, method) + + # Compare results + xr.testing.assert_allclose(result_optimized, result_naive) + + +@pytest.mark.parametrize('method', ['mean', 'sum', 'max', 'min', 'first', 'last', 'std', 'var', 'median']) +def test_resample_equivalence_single_dimension(method): + """ + Test with variables having only time dimension. + """ + timesteps = pd.date_range('2020-01-01', periods=48, freq='h') + + ds = xr.Dataset(coords={'time': timesteps}) + ds['var1'] = xr.DataArray(np.random.randn(48), dims=['time'], coords={'time': ds.time}) + ds['var2'] = xr.DataArray(np.random.randn(48) * 10, dims=['time'], coords={'time': ds.time}) + ds['var3'] = xr.DataArray(np.random.randn(48) / 5, dims=['time'], coords={'time': ds.time}) + + # Optimized approach + result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', method) + + # Naive approach + result_naive = naive_dataset_resample(ds, '2h', method) + + # Compare results + xr.testing.assert_allclose(result_optimized, result_naive) + + +def test_resample_equivalence_empty_dataset(): + """ + Test with an empty dataset (edge case). + """ + timesteps = pd.date_range('2020-01-01', periods=48, freq='h') + ds = xr.Dataset(coords={'time': timesteps}) + + # Both should handle empty dataset gracefully + result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', 'mean') + result_naive = naive_dataset_resample(ds, '2h', 'mean') + + xr.testing.assert_allclose(result_optimized, result_naive) + + +def test_resample_equivalence_single_variable(): + """ + Test with a single variable. 
+ """ + timesteps = pd.date_range('2020-01-01', periods=48, freq='h') + ds = xr.Dataset(coords={'time': timesteps}) + ds['single_var'] = xr.DataArray(np.random.randn(48), dims=['time'], coords={'time': ds.time}) + + # Test multiple methods + for method in ['mean', 'sum', 'max', 'min']: + result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '3h', method) + result_naive = naive_dataset_resample(ds, '3h', method) + + xr.testing.assert_allclose(result_optimized, result_naive) + + +def test_resample_equivalence_with_nans(): + """ + Test with NaN values to ensure they're handled consistently. + """ + timesteps = pd.date_range('2020-01-01', periods=48, freq='h') + + ds = xr.Dataset(coords={'time': timesteps, 'component': ['a', 'b']}) + + # Create variable with some NaN values + data = np.random.randn(48, 2) + data[5:10, 0] = np.nan + data[20:25, 1] = np.nan + + ds['var_with_nans'] = xr.DataArray( + data, dims=['time', 'component'], coords={'time': ds.time, 'component': ds.component} + ) + + # Test with methods that handle NaNs + for method in ['mean', 'sum', 'max', 'min', 'first', 'last']: + result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', method) + result_naive = naive_dataset_resample(ds, '2h', method) + + xr.testing.assert_allclose(result_optimized, result_naive) + + +def test_resample_equivalence_different_dimension_orders(): + """ + Test that dimension order doesn't affect the equivalence. 
+ """ + timesteps = pd.date_range('2020-01-01', periods=48, freq='h') + + ds = xr.Dataset( + coords={ + 'time': timesteps, + 'x': ['x1', 'x2'], + 'y': ['y1', 'y2'], + } + ) + + # Variable with time first + ds['var_time_first'] = xr.DataArray( + np.random.randn(48, 2, 2), + dims=['time', 'x', 'y'], + coords={'time': ds.time, 'x': ds.x, 'y': ds.y}, + ) + + # Variable with time in middle + ds['var_time_middle'] = xr.DataArray( + np.random.randn(2, 48, 2), + dims=['x', 'time', 'y'], + coords={'x': ds.x, 'time': ds.time, 'y': ds.y}, + ) + + # Variable with time last + ds['var_time_last'] = xr.DataArray( + np.random.randn(2, 2, 48), + dims=['x', 'y', 'time'], + coords={'x': ds.x, 'y': ds.y, 'time': ds.time}, + ) + + for method in ['mean', 'sum', 'max', 'min']: + result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', method) + result_naive = naive_dataset_resample(ds, '2h', method) + + xr.testing.assert_allclose(result_optimized, result_naive) + + +def test_resample_equivalence_multiple_variables_same_dims(): + """ + Test with multiple variables sharing the same dimensions. + + This is the key optimization case - variables with same dims should be + grouped and resampled together. + """ + timesteps = pd.date_range('2020-01-01', periods=48, freq='h') + + ds = xr.Dataset(coords={'time': timesteps, 'location': ['A', 'B', 'C']}) + + # Multiple variables with same dimensions (time, location) + for i in range(3): + ds[f'var_{i}'] = xr.DataArray( + np.random.randn(48, 3), + dims=['time', 'location'], + coords={'time': ds.time, 'location': ds.location}, + ) + + for method in ['mean', 'sum', 'max', 'min']: + result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', method) + result_naive = naive_dataset_resample(ds, '2h', method) + + xr.testing.assert_allclose(result_optimized, result_naive) + + +def test_resample_equivalence_large_dataset(): + """ + Test with a larger, more realistic dataset. 
+ """ + timesteps = pd.date_range('2020-01-01', periods=168, freq='h') # One week + + ds = xr.Dataset( + coords={ + 'time': timesteps, + 'component': [f'comp_{i}' for i in range(5)], + 'bus': [f'bus_{i}' for i in range(3)], + } + ) + + # Various variable types + ds['simple_var'] = xr.DataArray(np.random.randn(168), dims=['time'], coords={'time': ds.time}) + ds['component_var'] = xr.DataArray( + np.random.randn(168, 5), dims=['time', 'component'], coords={'time': ds.time, 'component': ds.component} + ) + ds['bus_var'] = xr.DataArray(np.random.randn(168, 3), dims=['time', 'bus'], coords={'time': ds.time, 'bus': ds.bus}) + ds['complex_var'] = xr.DataArray( + np.random.randn(168, 5, 3), + dims=['time', 'component', 'bus'], + coords={'time': ds.time, 'component': ds.component, 'bus': ds.bus}, + ) + + # Test with a subset of methods (to keep test time reasonable) + for method in ['mean', 'sum', 'first']: + result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '1D', method) + result_naive = naive_dataset_resample(ds, '1D', method) + + xr.testing.assert_allclose(result_optimized, result_naive) + + +def test_resample_equivalence_with_kwargs(): + """ + Test that kwargs are properly forwarded to resample(). + + Verifies that additional arguments like label and closed are correctly + passed through the optimization path. + """ + timesteps = pd.date_range('2020-01-01', periods=48, freq='h') + ds = xr.Dataset(coords={'time': timesteps}) + ds['var'] = xr.DataArray(np.random.randn(48), dims=['time'], coords={'time': ds.time}) + + kwargs = {'label': 'right', 'closed': 'right'} + result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', 'mean', **kwargs) + result_naive = ds.resample(time='2h', **kwargs).mean() + + xr.testing.assert_allclose(result_optimized, result_naive) + + +if __name__ == '__main__': + pytest.main(['-v', __file__])