diff --git a/flixopt/features.py b/flixopt/features.py index e34cb9a40..c2a62adb1 100644 --- a/flixopt/features.py +++ b/flixopt/features.py @@ -4,7 +4,7 @@ """ import logging -from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union import linopy import numpy as np @@ -12,7 +12,7 @@ from . import utils from .config import CONFIG from .core import NumericData, Scalar, TimeSeries -from .interface import InvestParameters, OnOffParameters, Piece, Piecewise, PiecewiseConversion, PiecewiseEffects +from .interface import InvestParameters, OnOffParameters, Piecewise from .structure import Model, SystemModel logger = logging.getLogger('flixopt') @@ -155,7 +155,7 @@ def _create_bounds_for_defining_variable(self): ) if self._on_variable is not None: raise ValueError( - f'Flow {self.label} has a fixed relative flow rate and an on_variable.' + f'Flow {self.label_full} has a fixed relative flow rate and an on_variable.' f'This combination is currently not supported.' ) return @@ -193,53 +193,52 @@ def _create_bounds_for_defining_variable(self): # anmerkung: Glg bei Spezialfall relative_minimum = 0 redundant zu OnOff ?? -class OnOffModel(Model): +class StateModel(Model): """ - Class for modeling the on and off state of a variable - If defining_bounds are given, creates sufficient lower bounds + Handles basic on/off binary states for defining variables """ def __init__( self, model: SystemModel, - on_off_parameters: OnOffParameters, label_of_element: str, defining_variables: List[linopy.Variable], defining_bounds: List[Tuple[NumericData, NumericData]], - previous_values: List[Optional[NumericData]], + previous_values: List[Optional[NumericData]] = None, + use_off: bool = True, + on_hours_total_min: Optional[NumericData] = 0, + on_hours_total_max: Optional[NumericData] = None, + effects_per_running_hour: Dict[str, NumericData] = None, label: Optional[str] = None, ): """ - Constructor for OnOffModel + Models binary state variables based on a continous variable. Args: - model: Reference to the SystemModel - on_off_parameters: Parameters for the OnOffModel - label_of_element: Label of the Parent - defining_variables: List of Variables that are used to define the OnOffModel + model: The SystemModel that is used to create the model. + label_of_element: The label of the parent (Element). Used to construct the full label of the model. + defining_variables: List of Variables that are used to define the state defining_bounds: List of Tuples, defining the absolute bounds of each defining variable previous_values: List of previous values of the defining variables + use_off: Whether to use the off state or not + on_hours_total_min: min. overall sum of operating hours. + on_hours_total_max: max. overall sum of operating hours. + effects_per_running_hour: Costs per operating hours label: Label of the OnOffModel """ super().__init__(model, label_of_element, label) assert len(defining_variables) == len(defining_bounds), 'Every defining Variable needs bounds to Model OnOff' - self.parameters = on_off_parameters self._defining_variables = defining_variables - # Ensure that no lower bound is below a certain threshold - self._defining_bounds = [(np.maximum(lb, CONFIG.modeling.EPSILON), ub) for lb, ub in defining_bounds] - self._previous_values = previous_values - - self.on: Optional[linopy.Variable] = None + self._defining_bounds = defining_bounds + self._previous_values = previous_values or [] + self._on_hours_total_min = on_hours_total_min if on_hours_total_min is not None else 0 + self._on_hours_total_max = on_hours_total_max if on_hours_total_max is not None else np.inf + self._use_off = use_off + self._effects_per_running_hour = effects_per_running_hour or {} + + self.on = None self.total_on_hours: Optional[linopy.Variable] = None - - self.consecutive_on_hours: Optional[linopy.Variable] = None - self.consecutive_off_hours: Optional[linopy.Variable] = None - - self.off: Optional[linopy.Variable] = None - - self.switch_on: Optional[linopy.Variable] = None - self.switch_off: Optional[linopy.Variable] = None - self.switch_on_nr: Optional[linopy.Variable] = None + self.off = None def do_modeling(self): self.on = self.add( @@ -253,8 +252,9 @@ def do_modeling(self): self.total_on_hours = self.add( self._model.add_variables( - lower=self.parameters.on_hours_total_min if self.parameters.on_hours_total_min is not None else 0, - upper=self.parameters.on_hours_total_max if self.parameters.on_hours_total_max is not None else np.inf, + lower=self._on_hours_total_min, + upper=self._on_hours_total_max, + coords=None, name=f'{self.label_full}|on_hours_total', ), 'on_hours_total', @@ -268,9 +268,10 @@ def do_modeling(self): 'on_hours_total', ) - self._add_on_constraints() + # Add defining constraints for each variable + self._add_defining_constraints() - if self.parameters.use_off: + if self._use_off: self.off = self.add( self._model.add_variables( name=f'{self.label_full}|off', @@ -280,69 +281,21 @@ def do_modeling(self): 'off', ) - # eq: var_on(t) + var_off(t) = 1 + # Constraint: on + off = 1 self.add(self._model.add_constraints(self.on + self.off == 1, name=f'{self.label_full}|off'), 'off') - if self.parameters.use_consecutive_on_hours: - self.consecutive_on_hours = self._get_duration_in_hours( - 'consecutive_on_hours', - self.on, - self.previous_consecutive_on_hours, - self.parameters.consecutive_on_hours_min, - self.parameters.consecutive_on_hours_max, - ) - - if self.parameters.use_consecutive_off_hours: - self.consecutive_off_hours = self._get_duration_in_hours( - 'consecutive_off_hours', - self.off, - self.previous_consecutive_off_hours, - self.parameters.consecutive_off_hours_min, - self.parameters.consecutive_off_hours_max, - ) - - if self.parameters.use_switch_on: - self.switch_on = self.add( - self._model.add_variables(binary=True, name=f'{self.label_full}|switch_on', coords=self._model.coords), - 'switch_on', - ) - - self.switch_off = self.add( - self._model.add_variables(binary=True, name=f'{self.label_full}|switch_off', coords=self._model.coords), - 'switch_off', - ) - - self.switch_on_nr = self.add( - self._model.add_variables( - lower=0, - upper=self.parameters.switch_on_total_max - if self.parameters.switch_on_total_max is not None - else np.inf, - name=f'{self.label_full}|switch_on_nr', - ), - 'switch_on_nr', - ) - - self._add_switch_constraints() - - self._create_shares() - - def _add_on_constraints(self): - assert self.on is not None, f'On variable of {self.label_full} must be defined to add constraints' - # % Bedingungen 1) und 2) müssen erfüllt sein: - - # % Anmerkung: Falls "abschnittsweise linear" gewählt, dann ist eigentlich nur Bedingung 1) noch notwendig - # % (und dann auch nur wenn erstes Piece bei Q_th=0 beginnt. Dann soll bei Q_th=0 (d.h. die Maschine ist Aus) On = 0 und segment1.onSeg = 0):) - # % Fazit: Wenn kein Performance-Verlust durch mehr Gleichungen, dann egal! + return self + def _add_defining_constraints(self): + """Add constraints that link defining variables to the on state""" nr_of_def_vars = len(self._defining_variables) - assert nr_of_def_vars > 0, 'Achtung: mindestens 1 Flow notwendig' if nr_of_def_vars == 1: + # Case for a single defining variable def_var = self._defining_variables[0] lb, ub = self._defining_bounds[0] - # eq: On(t) * max(epsilon, lower_bound) <= Q_th(t) + # Constraint: on * lower_bound <= def_var self.add( self._model.add_constraints( self.on * np.maximum(CONFIG.modeling.EPSILON, lb) <= def_var, name=f'{self.label_full}|on_con1' @@ -350,20 +303,16 @@ def _add_on_constraints(self): 'on_con1', ) - # eq: Q_th(t) <= Q_th_max * On(t) + # Constraint: on * upper_bound >= def_var self.add( - self._model.add_constraints( - self.on * np.maximum(CONFIG.modeling.EPSILON, ub) >= def_var, name=f'{self.label_full}|on_con2' - ), - 'on_con2', + self._model.add_constraints(self.on * ub >= def_var, name=f'{self.label_full}|on_con2'), 'on_con2' ) - - else: # Bei mehreren Leistungsvariablen: + else: + # Case for multiple defining variables ub = sum(bound[1] for bound in self._defining_bounds) / nr_of_def_vars lb = CONFIG.modeling.EPSILON #TODO: Can this be a bigger value? (maybe the smallest bound?) - # When all defining variables are 0, On is 0 - # eq: On(t) * Epsilon <= sum(alle Leistungen(t)) + # Constraint: on * epsilon <= sum(all_defining_variables) self.add( self._model.add_constraints( self.on * lb <= sum(self._defining_variables), name=f'{self.label_full}|on_con1' @@ -371,10 +320,8 @@ def _add_on_constraints(self): 'on_con1', ) - ## sum(alle Leistung) >0 -> On = 1|On=0 -> sum(Leistung)=0 - # eq: sum( Leistung(t,i)) - sum(Leistung_max(i)) * On(t) <= 0 - # --> damit Gleichungswerte nicht zu groß werden, noch durch nr_of_flows geteilt: - # eq: sum( Leistung(t,i) / nr_of_flows ) - sum(Leistung_max(i)) / nr_of_flows * On(t) <= 0 + # Constraint to ensure all variables are zero when off. + # Divide by nr_of_def_vars to improve numerical stability (smaller factors) self.add( self._model.add_constraints( self.on * ub >= sum([def_var / nr_of_def_vars for def_var in self._defining_variables]), @@ -383,285 +330,261 @@ def _add_on_constraints(self): 'on_con2', ) - if np.max(ub) > CONFIG.modeling.BIG_BINARY_BOUND: - logger.warning( - f'In "{self.label_full}", a binary definition was created with a big upper bound ' - f'({np.max(ub)}). This can lead to wrong results regarding the on and off variables. ' - f'Avoid this warning by reducing the size of {self.label_full} ' - f'(or the maximum_size of the corresponding InvestParameters). ' - f'If its a Component, you might need to adjust the sizes of all of its flows.' - ) + @property + def previous_states(self) -> np.ndarray: + """Computes the previous states {0, 1} of defining variables as a binary array from their previous values.""" + return StateModel.compute_previous_states(self._previous_values, epsilon=CONFIG.modeling.EPSILON) - def _get_duration_in_hours( - self, - variable_name: str, - binary_variable: linopy.Variable, - previous_duration: Scalar, - minimum_duration: Optional[TimeSeries], - maximum_duration: Optional[TimeSeries], - ) -> linopy.Variable: - """ - creates duration variable and adds constraints to a time-series variable to enforce duration limits based on - binary activity. - The minimum duration in the last time step is not restricted. - Previous values before t=0 are not recognised! + @property + def previous_on_states(self) -> np.ndarray: + return self.previous_states - Args: - variable_name: Label for the duration variable to be created. - binary_variable: Time-series binary variable (e.g., [0, 0, 1, 1, 1, 0, ...]) representing activity states. - minimum_duration: Minimum duration the activity must remain active once started. - If None, no minimum duration constraint is applied. - maximum_duration: Maximum duration the activity can remain active. - If None, the maximum duration is set to the total available time. + @property + def previous_off_states(self): + return 1 - self.previous_states - Returns: - The created duration variable representing consecutive active durations. + @staticmethod + def compute_previous_states(previous_values: List[NumericData], epsilon: float = 1e-5) -> np.ndarray: + """Computes the previous states {0, 1} of defining variables as a binary array from their previous values.""" + if not previous_values or all([val is None for val in previous_values]): + return np.array([0]) - Example: - binary_variable: [0, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0, ...] - duration_in_hours: [0, 0, 1, 2, 3, 4, 0, 1, 2, 3, 0, ...] (only if dt_in_hours=1) + # Convert to 2D-array and compute binary on/off states + previous_values = np.array([values for values in previous_values if values is not None]) # Filter out None + if previous_values.ndim > 1: + return np.any(~np.isclose(previous_values, 0, atol=epsilon), axis=0).astype(int) - Here, duration_in_hours increments while binary_variable is 1. Minimum and maximum durations - can be enforced to constrain how long the activity remains active. + return (~np.isclose(previous_values, 0, atol=epsilon)).astype(int) - Notes: - - To count consecutive zeros instead of ones, use a transformed binary variable - (e.g., `1 - binary_variable`). - - Constraints ensure the duration variable properly resets or increments based on activity. - Raises: - AssertionError: If the binary_variable is None, indicating the duration constraints cannot be applied. +class SwitchStateModel(Model): + """ + Handles switch on/off transitions + """ - """ - assert binary_variable is not None, f'Duration Variable of {self.label_full} must be defined to add constraints' + def __init__( + self, + model: SystemModel, + label_of_element: str, + state_variable: linopy.Variable, + previous_state=0, + switch_on_max: Optional[Scalar] = None, + label: Optional[str] = None, + ): + super().__init__(model, label_of_element, label) + self._state_variable = state_variable + self.previous_state = previous_state + self._switch_on_max = switch_on_max if switch_on_max is not None else np.inf - mega = self._model.hours_per_step.sum() + previous_duration + self.switch_on = None + self.switch_off = None + self.switch_on_nr = None - if maximum_duration is not None: - first_step_max: Scalar = maximum_duration.isel(time=0) + def do_modeling(self): + """Create switch variables and constraints""" - if previous_duration + self._model.hours_per_step[0] > first_step_max: - logger.warning( - f'The maximum duration of "{variable_name}" is set to {maximum_duration.active_data}h, ' - f'but the consecutive_duration previous to this model is {previous_duration}h. ' - f'This forces "{binary_variable.name} = 0" in the first time step ' - f'(dt={self._model.hours_per_step[0]}h)!' - ) + # Create switch variables + self.switch_on = self.add( + self._model.add_variables(binary=True, name=f'{self.label_full}|switch_on', coords=self._model.coords), + 'switch_on', + ) + + self.switch_off = self.add( + self._model.add_variables(binary=True, name=f'{self.label_full}|switch_off', coords=self._model.coords), + 'switch_off', + ) - duration_in_hours = self.add( + # Create count variable for number of switches + self.switch_on_nr = self.add( self._model.add_variables( + upper=self._switch_on_max, lower=0, - upper=maximum_duration.active_data if maximum_duration is not None else mega, - coords=self._model.coords, - name=f'{self.label_full}|{variable_name}', + name=f'{self.label_full}|switch_on_nr', ), - variable_name, + 'switch_on_nr', ) - # 1) eq: duration(t) - On(t) * BIG <= 0 + # Add switch constraints for all entries after the first timestep self.add( self._model.add_constraints( - duration_in_hours <= binary_variable * mega, name=f'{self.label_full}|{variable_name}_con1' + self.switch_on.isel(time=slice(1, None)) - self.switch_off.isel(time=slice(1, None)) + == self._state_variable.isel(time=slice(1, None)) - self._state_variable.isel(time=slice(None, -1)), + name=f'{self.label_full}|switch_con', ), - f'{variable_name}_con1', + 'switch_con', ) - # 2a) eq: duration(t) - duration(t-1) <= dt(t) - # on(t)=1 -> duration(t) - duration(t-1) <= dt(t) - # on(t)=0 -> duration(t-1) >= negat. value + # Initial switch constraint self.add( self._model.add_constraints( - duration_in_hours.isel(time=slice(1, None)) - <= duration_in_hours.isel(time=slice(None, -1)) + self._model.hours_per_step.isel(time=slice(None, -1)), - name=f'{self.label_full}|{variable_name}_con2a', + self.switch_on.isel(time=0) - self.switch_off.isel(time=0) + == self._state_variable.isel(time=0) - self.previous_state, + name=f'{self.label_full}|initial_switch_con', ), - f'{variable_name}_con2a', + 'initial_switch_con', ) - # 2b) eq: dt(t) - BIG * ( 1-On(t) ) <= duration(t) - duration(t-1) - # eq: -duration(t) + duration(t-1) + On(t) * BIG <= -dt(t) + BIG - # with BIG = dt_in_hours_total. - # on(t)=1 -> duration(t)- duration(t-1) >= dt(t) - # on(t)=0 -> duration(t)- duration(t-1) >= negat. value + # Mutual exclusivity constraint + self.add( + self._model.add_constraints(self.switch_on + self.switch_off <= 1.1, name=f'{self.label_full}|switch_on_or_off'), + 'switch_on_or_off', + ) + # Total switch-on count constraint self.add( self._model.add_constraints( - duration_in_hours.isel(time=slice(1, None)) - >= duration_in_hours.isel(time=slice(None, -1)) - + self._model.hours_per_step.isel(time=slice(None, -1)) - + (binary_variable.isel(time=slice(1, None)) - 1) * mega, - name=f'{self.label_full}|{variable_name}_con2b', + self.switch_on_nr == self.switch_on.sum('time'), name=f'{self.label_full}|switch_on_nr' ), - f'{variable_name}_con2b', + 'switch_on_nr', ) - # 3) check minimum_duration before switchOff-step + return self - if minimum_duration is not None: - # Note: switchOff-step is when: On(t) - On(t+1) == 1 - # Note: (last on-time period (with last timestep of period t=n) is not checked and can be shorter) - # Note: (previous values before t=1 are not recognised!) - # eq: duration(t) >= minimum_duration(t) * [On(t) - On(t+1)] for t=1..(n-1) - # eq: -duration(t) + minimum_duration(t) * On(t) - minimum_duration(t) * On(t+1) <= 0 - self.add( - self._model.add_constraints( - duration_in_hours - >= (binary_variable.isel(time=slice(None, -1)) - binary_variable.isel(time=slice(1, None))) - * minimum_duration.isel(time=slice(None, -1)), - name=f'{self.label_full}|{variable_name}_minimum_duration', - ), - f'{variable_name}_minimum_duration', - ) - if 0 < previous_duration < minimum_duration.isel(time=0): - # Force the first step to be = 1, if the minimum_duration is not reached in previous_values - # Note: Only if the previous consecutive_duration is smaller than the minimum duration - # and the previous_duration is greater 0! - # eq: On(t=0) = 1 - self.add( - self._model.add_constraints( - binary_variable.isel(time=0) == 1, name=f'{self.label_full}|{variable_name}_minimum_inital' - ), - f'{variable_name}_minimum_inital', - ) +class ConsecutiveStateModel(Model): + """ + Handles tracking consecutive durations in a state + """ - # 4) first index: - # eq: duration(t=0)= dt(0) * On(0) - self.add( - self._model.add_constraints( - duration_in_hours.isel(time=0) - == self._model.hours_per_step.isel(time=0) * binary_variable.isel(time=0), - name=f'{self.label_full}|{variable_name}_initial', - ), - f'{variable_name}_initial', - ) + def __init__( + self, + model: SystemModel, + label_of_element: str, + state_variable: linopy.Variable, + minimum_duration: Optional[NumericData] = None, + maximum_duration: Optional[NumericData] = None, + previous_states: Optional[NumericData] = None, + label: Optional[str] = None, + ): + """ + Model and constraint the consecutive duration of a state variable. - return duration_in_hours + Args: + model: The SystemModel that is used to create the model. + label_of_element: The label of the parent (Element). Used to construct the full label of the model. + state_variable: The state variable that is used to model the duration. state = {0, 1} + minimum_duration: The minimum duration of the state variable. + maximum_duration: The maximum duration of the state variable. + previous_states: The previous states of the state variable. + label: The label of the model. Used to construct the full label of the model. + """ + super().__init__(model, label_of_element, label) + self._state_variable = state_variable + self._previous_states = previous_states + self._minimum_duration = minimum_duration + self._maximum_duration = maximum_duration - def _add_switch_constraints(self): - assert self.switch_on is not None, f'Switch On Variable of {self.label_full} must be defined to add constraints' - assert self.switch_off is not None, ( - f'Switch Off Variable of {self.label_full} must be defined to add constraints' - ) - assert self.switch_on_nr is not None, ( - f'Nr of Switch On Variable of {self.label_full} must be defined to add constraints' - ) - assert self.on is not None, f'On Variable of {self.label_full} must be defined to add constraints' - # % Schaltänderung aus On-Variable - # % SwitchOn(t)-SwitchOff(t) = On(t)-On(t-1) - self.add( - self._model.add_constraints( - self.switch_on.isel(time=slice(1, None)) - self.switch_off.isel(time=slice(1, None)) - == self.on.isel(time=slice(1, None)) - self.on.isel(time=slice(None, -1)), - name=f'{self.label_full}|switch_con', + if isinstance(self._minimum_duration, TimeSeries): + self._minimum_duration = self._minimum_duration.active_data + if isinstance(self._maximum_duration, TimeSeries): + self._maximum_duration = self._maximum_duration.active_data + + self.duration = None + + def do_modeling(self): + """Create consecutive duration variables and constraints""" + # Get the hours per step + hours_per_step = self._model.hours_per_step + mega = hours_per_step.sum('time') + self.previous_duration + + # Create the duration variable + self.duration = self.add( + self._model.add_variables( + lower=0, + upper=self._maximum_duration if self._maximum_duration is not None else mega, + coords=self._model.coords, + name=f'{self.label_full}|hours', ), - 'switch_con', + 'hours', ) - # Initital switch on - # eq: SwitchOn(t=0)-SwitchOff(t=0) = On(t=0) - On(t=-1) + + # Add constraints + + # Upper bound constraint self.add( self._model.add_constraints( - self.switch_on.isel(time=0) - self.switch_off.isel(time=0) - == self.on.isel(time=0) - self.previous_on_values[-1], - name=f'{self.label_full}|initial_switch_con', + self.duration <= self._state_variable * mega, name=f'{self.label_full}|con1' ), - 'initial_switch_con', + 'con1', ) - ## Entweder SwitchOff oder SwitchOn - # eq: SwitchOn(t) + SwitchOff(t) <= 1.1 + + # Forward constraint self.add( self._model.add_constraints( - self.switch_on + self.switch_off <= 1.1, name=f'{self.label_full}|switch_on_or_off' + self.duration.isel(time=slice(1, None)) + <= self.duration.isel(time=slice(None, -1)) + hours_per_step.isel(time=slice(None, -1)), + name=f'{self.label_full}|con2a', ), - 'switch_on_or_off', + 'con2a', ) - ## Anzahl Starts: - # eq: nrSwitchOn = sum(SwitchOn(t)) + # Backward constraint self.add( self._model.add_constraints( - self.switch_on_nr == self.switch_on.sum(), name=f'{self.label_full}|switch_on_nr' + self.duration.isel(time=slice(1, None)) + >= self.duration.isel(time=slice(None, -1)) + + hours_per_step.isel(time=slice(None, -1)) + + (self._state_variable.isel(time=slice(1, None)) - 1) * mega, + name=f'{self.label_full}|con2b', ), - 'switch_on_nr', + 'con2b', ) - def _create_shares(self): - # Anfahrkosten: - effects_per_switch_on = self.parameters.effects_per_switch_on - if effects_per_switch_on != {}: - self._model.effects.add_share_to_effects( - name=self.label_of_element, - expressions={effect: self.switch_on * factor for effect, factor in effects_per_switch_on.items()}, - target='operation', - ) - - # Betriebskosten: - effects_per_running_hour = self.parameters.effects_per_running_hour - if effects_per_running_hour != {}: - self._model.effects.add_share_to_effects( - name=self.label_of_element, - expressions={ - effect: self.on * factor * self._model.hours_per_step - for effect, factor in effects_per_running_hour.items() - }, - target='operation', + # Add minimum duration constraints if specified + if self._minimum_duration is not None: + self.add( + self._model.add_constraints( + self.duration + >= ( + self._state_variable.isel(time=slice(None, -1)) - self._state_variable.isel(time=slice(1, None)) + ) + * self._minimum_duration.isel(time=slice(None, -1)), + name=f'{self.label_full}|minimum', + ), + 'minimum', ) - @property - def previous_on_values(self) -> np.ndarray: - return self.compute_previous_on_states(self._previous_values) + # Handle initial condition + if 0 < self.previous_duration < self._minimum_duration.isel(time=0): + self.add( + self._model.add_constraints( + self._state_variable.isel(time=0) == 1, name=f'{self.label_full}|initial_minimum' + ), + 'initial_minimum', + ) - @property - def previous_off_values(self) -> np.ndarray: - return 1 - self.previous_on_values + # Set initial value + self.add( + self._model.add_constraints( + self.duration.isel(time=0) == + (hours_per_step.isel(time=0) + self.previous_duration) * self._state_variable.isel(time=0), + name=f'{self.label_full}|initial', + ), + 'initial', + ) - @property - def previous_consecutive_on_hours(self) -> Scalar: - return self.compute_consecutive_duration(self.previous_on_values, self._model.hours_per_step) + return self @property - def previous_consecutive_off_hours(self) -> Scalar: - return self.compute_consecutive_duration(self.previous_off_values, self._model.hours_per_step) - - @staticmethod - def compute_previous_on_states(previous_values: List[Optional[NumericData]], epsilon: float = 1e-5) -> np.ndarray: - """ - Computes the previous 'on' states {0, 1} of defining variables as a binary array from their previous values. - - Args: - previous_values: List of previous values of the defining variables. In Range [0, inf] or None (ignored) - epsilon: Tolerance for equality to determine "off" state, default is 1e-5. - - Returns: - A binary array (0 and 1) indicating the previous on/off states of the variables. - Returns `array([0])` if no previous values are available. - """ - for arr in previous_values: - if isinstance(arr, np.ndarray) and arr.ndim > 1: - raise ValueError('Only 1D arrays or None values are supported for previous_values') - - if not previous_values or all([val is None for val in previous_values]): - return np.array([0]) - else: # Convert to 2D-array and compute binary on/off states - previous_values = np.array([values for values in previous_values if values is not None]) # Filter out None - if previous_values.ndim > 1: - return np.any(~np.isclose(previous_values, 0, atol=epsilon), axis=0).astype(int) - else: - return (~np.isclose(previous_values, 0, atol=epsilon)).astype(int) + def previous_duration(self) -> Scalar: + """Computes the previous duration of the state variable""" + #TODO: Allow for other/dynamic timestep resolutions + return ConsecutiveStateModel.compute_consecutive_hours_in_state( + self._previous_states, self._model.hours_per_step.isel(time=0).item() + ) @staticmethod - def compute_consecutive_duration( + def compute_consecutive_hours_in_state( binary_values: NumericData, hours_per_timestep: Union[int, float, np.ndarray] ) -> Scalar: """ - Computes the final consecutive duration in State 'on' (=1) in hours, from a binary. - - hours_per_timestep is handled in a way, that maximizes compatability. - Its length must only be as long as the last consecutive duration in binary_values. + Computes the final consecutive duration in state 'on' (=1) in hours, from a binary array. Args: binary_values: An int or 1D binary array containing only `0`s and `1`s. hours_per_timestep: The duration of each timestep in hours. + If a scalar is provided, it is used for all timesteps. + If an array is provided, it must be as long as the last consecutive duration in binary_values. Returns: The duration of the binary variable in hours. @@ -699,6 +622,155 @@ def compute_consecutive_duration( return np.sum(binary_values[-nr_of_indexes_with_consecutive_ones:] * hours_per_timestep[-nr_of_indexes_with_consecutive_ones:]) +class OnOffModel(Model): + """ + Class for modeling the on and off state of a variable + Uses component models to create a modular implementation + """ + + def __init__( + self, + model: SystemModel, + on_off_parameters: OnOffParameters, + label_of_element: str, + defining_variables: List[linopy.Variable], + defining_bounds: List[Tuple[NumericData, NumericData]], + previous_values: List[Optional[NumericData]], + label: Optional[str] = None, + ): + """ + Constructor for OnOffModel + + Args: + model: Reference to the SystemModel + on_off_parameters: Parameters for the OnOffModel + label_of_element: Label of the Parent + defining_variables: List of Variables that are used to define the OnOffModel + defining_bounds: List of Tuples, defining the absolute bounds of each defining variable + previous_values: List of previous values of the defining variables + label: Label of the OnOffModel + """ + super().__init__(model, label_of_element, label) + self.parameters = on_off_parameters + self._defining_variables = defining_variables + self._defining_bounds = defining_bounds + self._previous_values = previous_values + + self.state_model = None + self.switch_state_model = None + self.consecutive_on_model = None + self.consecutive_off_model = None + + def do_modeling(self): + """Create all variables and constraints for the OnOffModel""" + + # Create binary state component + self.state_model = StateModel( + model=self._model, + label_of_element=self.label_of_element, + defining_variables=self._defining_variables, + defining_bounds=self._defining_bounds, + previous_values=self._previous_values, + use_off=self.parameters.use_off, + on_hours_total_min=self.parameters.on_hours_total_min, + on_hours_total_max=self.parameters.on_hours_total_max, + effects_per_running_hour=self.parameters.effects_per_running_hour, + ) + self.add(self.state_model) + self.state_model.do_modeling() + + # Create switch component if needed + if self.parameters.use_switch_on: + self.switch_state_model = SwitchStateModel( + model=self._model, + label_of_element=self.label_of_element, + state_variable=self.state_model.on, + previous_state=self.state_model.previous_on_states[-1], + switch_on_max=self.parameters.switch_on_total_max, + ) + self.add(self.switch_state_model) + self.switch_state_model.do_modeling() + + # Create consecutive on hours component if needed + if self.parameters.use_consecutive_on_hours: + self.consecutive_on_model = ConsecutiveStateModel( + model=self._model, + label_of_element=self.label_of_element, + state_variable=self.state_model.on, + minimum_duration=self.parameters.consecutive_on_hours_min, + maximum_duration=self.parameters.consecutive_on_hours_max, + previous_states=self.state_model.previous_on_states, + label='ConsecutiveOn', + ) + self.add(self.consecutive_on_model) + self.consecutive_on_model.do_modeling() + + # Create consecutive off hours component if needed + if self.parameters.use_consecutive_off_hours: + self.consecutive_off_model = ConsecutiveStateModel( + model=self._model, + label_of_element=self.label_of_element, + state_variable=self.state_model.off, + minimum_duration=self.parameters.consecutive_off_hours_min, + maximum_duration=self.parameters.consecutive_off_hours_max, + previous_states=self.state_model.previous_off_states, + label='ConsecutiveOff', + ) + self.add(self.consecutive_off_model) + self.consecutive_off_model.do_modeling() + + self._create_shares() + + def _create_shares(self): + if self.parameters.effects_per_running_hour: + self._model.effects.add_share_to_effects( + name=self.label_of_element, + expressions={ + effect: self.state_model.on * factor * self._model.hours_per_step + for effect, factor in self.parameters.effects_per_running_hour.items() + }, + target='operation', + ) + + if self.parameters.effects_per_switch_on: + self._model.effects.add_share_to_effects( + name=self.label_of_element, + expressions={ + effect: self.switch_state_model.switch_on * factor + for effect, factor in self.parameters.effects_per_switch_on.items() + }, + target='operation', + ) + + @property + def on(self): + return self.state_model.on + + @property + def off(self): + return self.state_model.off + + @property + def switch_on(self): + return self.switch_state_model.switch_on + + @property + def switch_off(self): + return self.switch_state_model.switch_off + + @property + def switch_on_nr(self): + return self.switch_state_model.switch_on_nr + + @property + def consecutive_on_hours(self): + return self.consecutive_on_model.duration + + @property + def consecutive_off_hours(self): + return self.consecutive_off_model.duration + + class PieceModel(Model): """Class for modeling a linear piece of one or more variables in parallel""" diff --git a/tests/test_flow.py b/tests/test_flow.py index 5026c6120..10266b1f3 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -568,52 +568,51 @@ def test_consecutive_on_hours(self, basic_flow_system_linopy): flow_system.add_elements( fx.Sink('Sink', sink=flow)) model = create_linopy_model(flow_system) - assert {'Sink(Wärme)|consecutive_on_hours', 'Sink(Wärme)|on'}.issubset(set(flow.model.variables)) + assert {'Sink(Wärme)|ConsecutiveOn|hours', 'Sink(Wärme)|on'}.issubset(set(flow.model.variables)) - assert { - 'Sink(Wärme)|consecutive_on_hours_con1', - 'Sink(Wärme)|consecutive_on_hours_con2a', - 'Sink(Wärme)|consecutive_on_hours_con2b', - 'Sink(Wärme)|consecutive_on_hours_initial', - 'Sink(Wärme)|consecutive_on_hours_minimum_duration' + assert {'Sink(Wärme)|ConsecutiveOn|con1', + 'Sink(Wärme)|ConsecutiveOn|con2a', + 'Sink(Wärme)|ConsecutiveOn|con2b', + 'Sink(Wärme)|ConsecutiveOn|initial', + 'Sink(Wärme)|ConsecutiveOn|minimum', }.issubset(set(flow.model.constraints)) assert_var_equal( - model.variables['Sink(Wärme)|consecutive_on_hours'], + model.variables['Sink(Wärme)|ConsecutiveOn|hours'], model.add_variables(lower=0, upper=8, coords=(timesteps,)) ) mega = model.hours_per_step.sum('time') assert_conequal( - model.constraints['Sink(Wärme)|consecutive_on_hours_con1'], - model.variables['Sink(Wärme)|consecutive_on_hours'] <= model.variables['Sink(Wärme)|on'] * mega + model.constraints['Sink(Wärme)|ConsecutiveOn|con1'], + model.variables['Sink(Wärme)|ConsecutiveOn|hours'] <= model.variables['Sink(Wärme)|on'] * mega ) assert_conequal( - model.constraints['Sink(Wärme)|consecutive_on_hours_con2a'], - model.variables['Sink(Wärme)|consecutive_on_hours'].isel(time=slice(1, None)) - <= model.variables['Sink(Wärme)|consecutive_on_hours'].isel(time=slice(None, -1)) + model.hours_per_step.isel(time=slice(None, -1)) + model.constraints['Sink(Wärme)|ConsecutiveOn|con2a'], + model.variables['Sink(Wärme)|ConsecutiveOn|hours'].isel(time=slice(1, None)) + <= model.variables['Sink(Wärme)|ConsecutiveOn|hours'].isel(time=slice(None, -1)) + model.hours_per_step.isel(time=slice(None, -1)) ) # eq: duration(t) >= duration(t - 1) + dt(t) + (On(t) - 1) * BIG assert_conequal( - model.constraints['Sink(Wärme)|consecutive_on_hours_con2b'], - model.variables['Sink(Wärme)|consecutive_on_hours'].isel(time=slice(1, None)) - >= model.variables['Sink(Wärme)|consecutive_on_hours'].isel(time=slice(None, -1)) + model.constraints['Sink(Wärme)|ConsecutiveOn|con2b'], + model.variables['Sink(Wärme)|ConsecutiveOn|hours'].isel(time=slice(1, None)) + >= model.variables['Sink(Wärme)|ConsecutiveOn|hours'].isel(time=slice(None, -1)) + model.hours_per_step.isel(time=slice(None, -1)) + (model.variables['Sink(Wärme)|on'].isel(time=slice(1, None)) - 1) * mega ) assert_conequal( - model.constraints['Sink(Wärme)|consecutive_on_hours_initial'], - model.variables['Sink(Wärme)|consecutive_on_hours'].isel(time=0) + model.constraints['Sink(Wärme)|ConsecutiveOn|initial'], + model.variables['Sink(Wärme)|ConsecutiveOn|hours'].isel(time=0) == model.variables['Sink(Wärme)|on'].isel(time=0) * model.hours_per_step.isel(time=0), ) assert_conequal( - model.constraints['Sink(Wärme)|consecutive_on_hours_minimum_duration'], - model.variables['Sink(Wärme)|consecutive_on_hours'] + model.constraints['Sink(Wärme)|ConsecutiveOn|minimum'], + model.variables['Sink(Wärme)|ConsecutiveOn|hours'] >= (model.variables['Sink(Wärme)|on'].isel(time=slice(None, -1)) - model.variables['Sink(Wärme)|on'].isel(time=slice(1, None))) * 2 ) @@ -635,52 +634,52 @@ def test_consecutive_off_hours(self, basic_flow_system_linopy): flow_system.add_elements( fx.Sink('Sink', sink=flow)) model = create_linopy_model(flow_system) - assert {'Sink(Wärme)|consecutive_off_hours', 'Sink(Wärme)|off'}.issubset(set(flow.model.variables)) + assert {'Sink(Wärme)|ConsecutiveOff|hours', 'Sink(Wärme)|off'}.issubset(set(flow.model.variables)) assert { - 'Sink(Wärme)|consecutive_off_hours_con1', - 'Sink(Wärme)|consecutive_off_hours_con2a', - 'Sink(Wärme)|consecutive_off_hours_con2b', - 'Sink(Wärme)|consecutive_off_hours_initial', - 'Sink(Wärme)|consecutive_off_hours_minimum_duration' + 'Sink(Wärme)|ConsecutiveOff|con1', + 'Sink(Wärme)|ConsecutiveOff|con2a', + 'Sink(Wärme)|ConsecutiveOff|con2b', + 'Sink(Wärme)|ConsecutiveOff|initial', + 'Sink(Wärme)|ConsecutiveOff|minimum' }.issubset(set(flow.model.constraints)) assert_var_equal( - model.variables['Sink(Wärme)|consecutive_off_hours'], + model.variables['Sink(Wärme)|ConsecutiveOff|hours'], model.add_variables(lower=0, upper=12, coords=(timesteps,)) ) mega = model.hours_per_step.sum('time') + 1 # previously off for 1h assert_conequal( - model.constraints['Sink(Wärme)|consecutive_off_hours_con1'], - model.variables['Sink(Wärme)|consecutive_off_hours'] <= model.variables['Sink(Wärme)|off'] * mega + model.constraints['Sink(Wärme)|ConsecutiveOff|con1'], + model.variables['Sink(Wärme)|ConsecutiveOff|hours'] <= model.variables['Sink(Wärme)|off'] * mega ) assert_conequal( - model.constraints['Sink(Wärme)|consecutive_off_hours_con2a'], - model.variables['Sink(Wärme)|consecutive_off_hours'].isel(time=slice(1, None)) - <= model.variables['Sink(Wärme)|consecutive_off_hours'].isel(time=slice(None, -1)) + model.hours_per_step.isel(time=slice(None, -1)) + model.constraints['Sink(Wärme)|ConsecutiveOff|con2a'], + model.variables['Sink(Wärme)|ConsecutiveOff|hours'].isel(time=slice(1, None)) + <= model.variables['Sink(Wärme)|ConsecutiveOff|hours'].isel(time=slice(None, -1)) + model.hours_per_step.isel(time=slice(None, -1)) ) # eq: duration(t) >= duration(t - 1) + dt(t) + (On(t) - 1) * BIG assert_conequal( - model.constraints['Sink(Wärme)|consecutive_off_hours_con2b'], - model.variables['Sink(Wärme)|consecutive_off_hours'].isel(time=slice(1, None)) - >= model.variables['Sink(Wärme)|consecutive_off_hours'].isel(time=slice(None, -1)) + model.constraints['Sink(Wärme)|ConsecutiveOff|con2b'], + model.variables['Sink(Wärme)|ConsecutiveOff|hours'].isel(time=slice(1, None)) + >= model.variables['Sink(Wärme)|ConsecutiveOff|hours'].isel(time=slice(None, -1)) + model.hours_per_step.isel(time=slice(None, -1)) + (model.variables['Sink(Wärme)|off'].isel(time=slice(1, None)) - 1) * mega ) assert_conequal( - model.constraints['Sink(Wärme)|consecutive_off_hours_initial'], - model.variables['Sink(Wärme)|consecutive_off_hours'].isel(time=0) - == model.variables['Sink(Wärme)|off'].isel(time=0) * model.hours_per_step.isel(time=0), + model.constraints['Sink(Wärme)|ConsecutiveOff|initial'], + model.variables['Sink(Wärme)|ConsecutiveOff|hours'].isel(time=0) + == model.variables['Sink(Wärme)|off'].isel(time=0) * (model.hours_per_step.isel(time=0)+1), ) assert_conequal( - model.constraints['Sink(Wärme)|consecutive_off_hours_minimum_duration'], - model.variables['Sink(Wärme)|consecutive_off_hours'] + model.constraints['Sink(Wärme)|ConsecutiveOff|minimum'], + model.variables['Sink(Wärme)|ConsecutiveOff|hours'] >= (model.variables['Sink(Wärme)|off'].isel(time=slice(None, -1)) - model.variables['Sink(Wärme)|off'].isel(time=slice(1, None))) * 4 ) diff --git a/tests/test_on_hours_computation.py b/tests/test_on_hours_computation.py index 5608155c0..a873bbd12 100644 --- a/tests/test_on_hours_computation.py +++ b/tests/test_on_hours_computation.py @@ -1,7 +1,7 @@ import numpy as np import pytest -from flixopt.features import OnOffModel +from flixopt.features import ConsecutiveStateModel, StateModel class TestComputeConsecutiveDuration: @@ -31,7 +31,7 @@ class TestComputeConsecutiveDuration: ]) def test_compute_duration(self, binary_values, hours_per_timestep, expected): """Test compute_consecutive_duration with various inputs.""" - result = OnOffModel.compute_consecutive_duration(binary_values, hours_per_timestep) + result = ConsecutiveStateModel.compute_consecutive_hours_in_state(binary_values, hours_per_timestep) assert np.isclose(result, expected) @pytest.mark.parametrize("binary_values, hours_per_timestep", [ @@ -41,7 +41,7 @@ def test_compute_duration(self, binary_values, hours_per_timestep, expected): def test_compute_duration_raises_error(self, binary_values, hours_per_timestep): """Test error conditions.""" with pytest.raises(TypeError): - OnOffModel.compute_consecutive_duration(binary_values, hours_per_timestep) + ConsecutiveStateModel.compute_consecutive_hours_in_state(binary_values, hours_per_timestep) class TestComputePreviousOnStates: @@ -76,7 +76,7 @@ class TestComputePreviousOnStates: ) def test_compute_previous_on_states(self, previous_values, expected): """Test compute_previous_on_states with various inputs.""" - result = OnOffModel.compute_previous_on_states(previous_values) + result = StateModel.compute_previous_states(previous_values) np.testing.assert_array_equal(result, expected) @pytest.mark.parametrize("previous_values, epsilon, expected", [ @@ -90,7 +90,7 @@ def test_compute_previous_on_states(self, previous_values, expected): ]) def test_compute_previous_on_states_with_epsilon(self, previous_values, epsilon, expected): """Test compute_previous_on_states with custom epsilon values.""" - result = OnOffModel.compute_previous_on_states(previous_values, epsilon) + result = StateModel.compute_previous_states(previous_values, epsilon) np.testing.assert_array_equal(result, expected) @pytest.mark.parametrize("previous_values, expected_shape", [ @@ -101,5 +101,5 @@ def test_compute_previous_on_states_with_epsilon(self, previous_values, epsilon, ]) def test_output_shapes(self, previous_values, expected_shape): """Test that output array has the correct shape.""" - result = OnOffModel.compute_previous_on_states(previous_values) + result = StateModel.compute_previous_states(previous_values) assert result.shape == expected_shape