Skip to content
Merged
4 changes: 3 additions & 1 deletion flixopt/elements.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def __init__(
If the load-profile is just an upper limit, use relative_maximum instead.
previous_flow_rate: previous flow rate of the flow. Used to determine if and how long the
flow is already on / off. If None, the flow is considered to be off for one timestep.
Currently does not support different values in different years or scenarios!
meta_data: used to store more information about the Element. Is not used internally, but saved in the results. Only use python native types.
"""
super().__init__(label, meta_data=meta_data)
Expand Down Expand Up @@ -305,7 +306,8 @@ def _plausibility_checks(self) -> None:
]
):
raise TypeError(
f'previous_flow_rate must be None, a scalar, a list of scalars or a 1D-numpy-array. Got {type(self.previous_flow_rate)}'
f'previous_flow_rate must be None, a scalar, a list of scalars or a 1D-numpy-array. Got {type(self.previous_flow_rate)}.'
f'Different values in different years or scenarios are not yetsupported.'
)

@property
Expand Down
9 changes: 6 additions & 3 deletions flixopt/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,21 +171,22 @@ def _do_modeling(self):
self.parameters.on_hours_total_max if self.parameters.on_hours_total_max is not None else np.inf,
), # TODO: self._model.hours_per_step.sum('time').item() + self._get_previous_on_duration())
short_name='on_hours_total',
coords=self.get_coords(['year', 'scenario']),
coords=['year', 'scenario'],
)

# 4. Switch tracking using existing pattern
if self.parameters.use_switch_on:
self.add_variables(binary=True, short_name='switch|on', coords=self.get_coords())
self.add_variables(binary=True, short_name='switch|off', coords=self.get_coords())

ModelingPrimitives.state_transition_variables(
BoundingPatterns.state_transition_bounds(
self,
state_variable=self.on,
switch_on=self.switch_on,
switch_off=self.switch_off,
name=f'{self.label_of_model}|switch',
previous_state=self._previous_states.isel(time=-1) if self._previous_states is not None else 0,
coord='time',
)

if self.parameters.switch_on_total_max is not None:
Expand Down Expand Up @@ -408,7 +409,9 @@ def _do_modeling(self):
rhs = self.zero_point
elif self._zero_point is True:
self.zero_point = self.add_variables(
coords=self._model.get_coords(), binary=True, short_name='zero_point'
coords=self._model.get_coords(('year', 'scenario') if self._as_time_series else None),
binary=True,
short_name='zero_point',
)
rhs = self.zero_point
else:
Expand Down
24 changes: 11 additions & 13 deletions flixopt/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ def maximum_or_fixed_size(self) -> NonTemporalData:
class OnOffParameters(Interface):
def __init__(
self,
effects_per_switch_on: Optional['NonTemporalEffectsUser'] = None,
effects_per_running_hour: Optional['NonTemporalEffectsUser'] = None,
effects_per_switch_on: Optional['TemporalEffectsUser'] = None,
effects_per_running_hour: Optional['TemporalEffectsUser'] = None,
on_hours_total_min: Optional[int] = None,
on_hours_total_max: Optional[int] = None,
consecutive_on_hours_min: Optional[TemporalDataUser] = None,
Expand Down Expand Up @@ -339,15 +339,13 @@ def use_consecutive_off_hours(self) -> bool:
@property
def use_switch_on(self) -> bool:
"""Determines wether a Variable for SWITCH-ON is needed or not"""
return (
any(
param not in (None, {})
for param in [
self.effects_per_switch_on,
self.switch_on_total_max,
self.on_hours_total_min,
self.on_hours_total_max,
]
)
or self.force_switch_on
if self.force_switch_on:
return True

return any(
param is not None and param != {}
for param in [
self.effects_per_switch_on,
self.switch_on_total_max,
]
)
135 changes: 49 additions & 86 deletions flixopt/modeling.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def expression_tracking_variable(
name: str = None,
short_name: str = None,
bounds: Tuple[TemporalData, TemporalData] = None,
coords: List[str] = None,
coords: Optional[Union[str, List[str]]] = None,
) -> Tuple[linopy.Variable, linopy.Constraint]:
"""
Creates variable that equals a given expression.
Expand All @@ -205,8 +205,6 @@ def expression_tracking_variable(
if not isinstance(model, Submodel):
raise ValueError('ModelingPrimitives.expression_tracking_variable() can only be used with a Submodel')

coords = coords or ['year', 'scenario']

if not bounds:
tracker = model.add_variables(name=name, coords=model.get_coords(coords), short_name=short_name)
else:
Expand All @@ -223,86 +221,6 @@ def expression_tracking_variable(

return tracker, tracking

@staticmethod
def state_transition_variables(
model: Submodel,
state_variable: linopy.Variable,
switch_on: linopy.Variable,
switch_off: linopy.Variable,
name: str,
previous_state=0,
) -> Tuple[linopy.Constraint, linopy.Constraint, linopy.Constraint]:
"""
Creates switch-on/off variables with state transition logic.

Mathematical formulation:
switch_on[t] - switch_off[t] = state[t] - state[t-1] ∀t > 0
switch_on[0] - switch_off[0] = state[0] - previous_state
switch_on[t] + switch_off[t] ≤ 1 ∀t
switch_on[t], switch_off[t] ∈ {0, 1}

Returns:
variables: {'switch_on': binary_var, 'switch_off': binary_var}
constraints: {'transition': constraint, 'initial': constraint, 'mutex': constraint}
"""
if not isinstance(model, Submodel):
raise ValueError('ModelingPrimitives.state_transition_variables() can only be used with a Submodel')

# State transition constraints for t > 0
transition = model.add_constraints(
switch_on.isel(time=slice(1, None)) - switch_off.isel(time=slice(1, None))
== state_variable.isel(time=slice(1, None)) - state_variable.isel(time=slice(None, -1)),
name=f'{name}|transition',
)

# Initial state transition for t = 0
initial = model.add_constraints(
switch_on.isel(time=0) - switch_off.isel(time=0) == state_variable.isel(time=0) - previous_state,
name=f'{name}|initial',
)

# At most one switch per timestep
mutex = model.add_constraints(switch_on + switch_off <= 1, name=f'{name}|mutex')

return transition, initial, mutex

@staticmethod
def sum_up_variable(
model: Submodel,
variable_to_count: linopy.Variable,
name: str = None,
bounds: Tuple[NonTemporalData, NonTemporalData] = None,
factor: TemporalData = 1,
) -> Tuple[linopy.Variable, linopy.Constraint]:
"""
SUms up a variable over time, applying a factor to the variable.

Args:
model: The optimization model instance
variable_to_count: The variable to be summed up
name: The name of the constraint
bounds: The bounds of the constraint
factor: The factor to be applied to the variable
"""
if not isinstance(model, Submodel):
raise ValueError('ModelingPrimitives.sum_up_variable() can only be used with a Submodel')

if bounds is None:
bounds = (0, np.inf)
else:
bounds = (bounds[0] if bounds[0] is not None else 0, bounds[1] if bounds[1] is not None else np.inf)

count = model.add_variables(
lower=bounds[0],
upper=bounds[1],
coords=model.get_coords(['year', 'scenario']),
name=name,
)

count_constraint = model.add_constraints(count == (variable_to_count * factor).sum('time'), name=name)

return count, count_constraint

@staticmethod
def consecutive_duration_tracking(
model: Submodel,
Expand Down Expand Up @@ -346,7 +264,7 @@ def consecutive_duration_tracking(
duration = model.add_variables(
lower=0,
upper=maximum_duration if maximum_duration is not None else mega,
coords=model.get_coords(['time']),
coords=model.get_coords(),
name=name,
short_name=short_name,
)
Expand Down Expand Up @@ -618,10 +536,55 @@ def scaled_bounds_with_state(
)
scaling_upper = model.add_constraints(variable <= scaling_variable * rel_upper, name=f'{name}|ub2')

big_m_upper = scaling_max * rel_upper
big_m_lower = np.maximum(CONFIG.modeling.EPSILON, scaling_min * rel_lower)
big_m_upper = rel_upper * scaling_max
big_m_lower = np.maximum(CONFIG.modeling.EPSILON, rel_lower * scaling_min)

binary_upper = model.add_constraints(variable_state * big_m_upper >= variable, name=f'{name}|ub1')
binary_lower = model.add_constraints(variable_state * big_m_lower <= variable, name=f'{name}|lb1')

return [scaling_lower, scaling_upper, binary_lower, binary_upper]

@staticmethod
def state_transition_bounds(
model: Submodel,
state_variable: linopy.Variable,
switch_on: linopy.Variable,
switch_off: linopy.Variable,
name: str,
previous_state=0,
coord: str = 'time',
) -> Tuple[linopy.Constraint, linopy.Constraint, linopy.Constraint]:
"""
Creates switch-on/off variables with state transition logic.

Mathematical formulation:
switch_on[t] - switch_off[t] = state[t] - state[t-1] ∀t > 0
switch_on[0] - switch_off[0] = state[0] - previous_state
switch_on[t] + switch_off[t] ≤ 1 ∀t
switch_on[t], switch_off[t] ∈ {0, 1}

Returns:
variables: {'switch_on': binary_var, 'switch_off': binary_var}
constraints: {'transition': constraint, 'initial': constraint, 'mutex': constraint}
"""
if not isinstance(model, Submodel):
raise ValueError('ModelingPrimitives.state_transition_variables() can only be used with a Submodel')

# State transition constraints for t > 0
transition = model.add_constraints(
switch_on.isel({coord: slice(1, None)}) - switch_off.isel({coord: slice(1, None)})
== state_variable.isel({coord: slice(1, None)}) - state_variable.isel({coord: slice(None, -1)}),
name=f'{name}|transition',
)

# Initial state transition for t = 0
initial = model.add_constraints(
switch_on.isel({coord: 0}) - switch_off.isel({coord: 0})
== state_variable.isel({coord: 0}) - previous_state,
name=f'{name}|initial',
)

# At most one switch per timestep
mutex = model.add_constraints(switch_on + switch_off <= 1, name=f'{name}|mutex')

return transition, initial, mutex
76 changes: 50 additions & 26 deletions flixopt/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def __init__(
self._flow_rates = None
self._flow_hours = None
self._sizes = None
self._effects_per_component = {'operation': None, 'invest': None, 'total': None}
self._effects_per_component = None

def __getitem__(self, key: str) -> Union['ComponentResults', 'BusResults', 'EffectResults', 'FlowResults']:
if key in self.components:
Expand Down Expand Up @@ -312,20 +312,24 @@ def filter_solution(
startswith=startswith,
)

def effects_per_component(self, mode: Literal['operation', 'invest', 'total'] = 'total') -> xr.Dataset:
"""Returns a dataset containing effect totals for each components (including their flows).

Args:
mode: Which effects to contain. (operation, invest, total)
@property
def effects_per_component(self) -> xr.Dataset:
"""Returns a dataset containing effect results for each mode, aggregated by Component

Returns:
An xarray Dataset with an additional component dimension and effects as variables.
"""
if mode not in ['operation', 'invest', 'total']:
raise ValueError(f'Invalid mode {mode}')
if self._effects_per_component[mode] is None:
self._effects_per_component[mode] = self._create_effects_dataset(mode)
return self._effects_per_component[mode]
if self._effects_per_component is None:
self._effects_per_component = xr.Dataset(
{
mode: self._create_effects_dataset(mode).to_dataarray('effect', name=mode)
for mode in ['operation', 'invest', 'total']
}
)
dim_order = ['time', 'year', 'scenario', 'component', 'effect']
self._effects_per_component = self._effects_per_component.transpose(*dim_order, missing_dims='ignore')

return self._effects_per_component

def flow_rates(
self,
Expand Down Expand Up @@ -580,7 +584,7 @@ def _compute_effect_total(
total = xr.DataArray(np.nan)
return total.rename(f'{element}->{effect}({mode})')

def _create_effects_dataset(self, mode: Literal['operation', 'invest', 'total'] = 'total') -> xr.Dataset:
def _create_effects_dataset(self, mode: Literal['operation', 'invest', 'total']) -> xr.Dataset:
"""Creates a dataset containing effect totals for all components (including their flows).
The dataset does contain the direct as well as the indirect effects of each component.

Expand All @@ -590,24 +594,44 @@ def _create_effects_dataset(self, mode: Literal['operation', 'invest', 'total']
Returns:
An xarray Dataset with components as dimension and effects as variables.
"""
# Create an empty dataset
ds = xr.Dataset()
all_arrays = {}
template = None # Template is needed to determine the dimensions of the arrays. This handles the case of no shares for an effect

components_list = list(self.components)

# Add each effect as a variable to the dataset
# First pass: collect arrays and find template
for effect in self.effects:
# Create a list of DataArrays, one for each component
component_arrays = [
self._compute_effect_total(element=component, effect=effect, mode=mode, include_flows=True).expand_dims(
component=[component]
) # Add component dimension to each array
for component in list(self.components)
]
effect_arrays = []
for component in components_list:
da = self._compute_effect_total(element=component, effect=effect, mode=mode, include_flows=True)
effect_arrays.append(da)

if template is None and (da.dims or not da.isnull().all()):
template = da

all_arrays[effect] = effect_arrays

# Ensure we have a template
if template is None:
raise ValueError(
f"No template with proper dimensions found for mode '{mode}'. "
f'All computed arrays are scalars, which indicates a data issue.'
)

# Second pass: process all effects (guaranteed to include all)
for effect in self.effects:
dataarrays = all_arrays[effect]
component_arrays = []

for component, arr in zip(components_list, dataarrays, strict=False):
# Expand scalar NaN arrays to match template dimensions
if not arr.dims and np.isnan(arr.item()):
arr = xr.full_like(template, np.nan, dtype=float).rename(arr.name)

component_arrays.append(arr.expand_dims(component=[component]))

# Combine all components into one DataArray for this effect
if component_arrays:
effect_array = xr.concat(component_arrays, dim='component', coords='minimal')
# Add this effect as a variable to the dataset
ds[effect] = effect_array
ds[effect] = xr.concat(component_arrays, dim='component', coords='minimal')

# For now include a test to ensure correctness
suffix = {
Expand Down
Loading