
Commit 7a4280d

FBumann and claude authored
perf: Optimize clustering and I/O (4.4x faster segmented clustering) (#579)
* perf: Use ds.variables to avoid _construct_dataarray overhead

  Optimize several functions by using ds.variables instead of iterating over
  data_vars.items() or accessing ds[name], which triggers slow
  _construct_dataarray calls.

  Changes:
  - io.py: save_dataset_to_netcdf, load_dataset_from_netcdf, _reduce_constant_arrays
  - structure.py: from_dataset (use coord_cache pattern)
  - core.py: drop_constant_arrays (use numpy operations)

* perf: Optimize clustering serialization with ds.variables

  Use ds.variables for faster access in clustering/base.py:
  - _create_reference_structure: original_data and metrics iteration
  - compare plot: duration_curve generation with direct numpy indexing

* perf: Use batch assignment for clustering arrays (24x speedup)

  _add_clustering_to_dataset was slow due to 210 individual ds[name] = arr
  assignments. Each triggers xarray's expensive dataset_update_method.
  Changed to batch assignment with ds.assign(dict):
  - Before: ~2600ms for to_dataset with clustering
  - After: ~109ms for to_dataset with clustering

* perf: Use ds.variables in _build_reduced_dataset (12% faster)

  Avoided _construct_dataarray overhead by:
  - Using ds.variables instead of ds.data_vars.items()
  - Using numpy slicing instead of .isel()
  - Passing attrs dict directly instead of DataArray

  cluster() benchmark:
  - Before: ~10.1s
  - After: ~8.9s

* perf: Use numpy reshape in _build_typical_das (4.4x faster)

  Eliminated 451,856 slow pandas .loc calls by using numpy reshape for
  segmented clustering data instead of iterating per-cluster.

  cluster() with segments benchmark (50 clusters, 4 segments):
  - Before: ~93.7s
  - After: ~21.1s
  - Speedup: 4.4x

* fix: Multiple clustering and IO bug fixes

  - benchmark_io_performance.py: Add Gurobi → HiGHS solver fallback
  - components.py: Fix storage decay to use sum (not mean) for hours per cluster
  - flow_system.py: Add RangeIndex validation requiring explicit timestep_duration
  - io.py: Include auxiliary coordinates in _fast_get_dataarray
  - transform_accessor.py: Add empty dataset guard after drop_constant_arrays
  - transform_accessor.py: Fix timestep_mapping indexing for segmented clustering

* perf: Use ds.variables pattern in expand() (2.2x faster)

  Replace data_vars.items() iteration with the ds.variables pattern to avoid
  slow _construct_dataarray calls (5502 calls × ~1.5ms each).
  - Before: 3.73s
  - After: 1.72s

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 79d0e5e commit 7a4280d

8 files changed

Lines changed: 194 additions & 77 deletions
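
The recurring optimization across these commits is replacing ds[name] and ds.data_vars.items() access with ds.variables. A minimal sketch on a toy dataset (names and sizes are illustrative, not flixopt data) shows the two patterns side by side: ds[name] goes through xarray's _construct_dataarray and builds a full DataArray with coordinates attached, while ds.variables[name] returns the underlying Variable directly.

import numpy as np
import xarray as xr

# Toy dataset with many variables, roughly the kind of workload these commits target
ds = xr.Dataset(
    {f'var_{i}': ('time', np.random.rand(1_000)) for i in range(200)},
    coords={'time': np.arange(1_000)},
)

# Slow pattern: every ds[name] access goes through xarray's _construct_dataarray
maxima_slow = {name: float(ds[name].max()) for name in ds.data_vars}

# Fast pattern: ds.variables hands back the raw Variable objects without coordinate wiring
variables = ds.variables
maxima_fast = {name: float(variables[name].values.max()) for name in ds.data_vars}

assert maxima_slow == maxima_fast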


benchmarks/benchmark_io_performance.py

Lines changed: 12 additions & 1 deletion
@@ -142,7 +142,18 @@ def run_io_benchmarks(
 
     print('\n2. Clustering and solving...')
     fs_clustered = fs.transform.cluster(n_clusters=n_clusters, cluster_duration='1D')
-    fs_clustered.optimize(fx.solvers.GurobiSolver())
+
+    # Try Gurobi first, fall back to HiGHS if not available
+    try:
+        solver = fx.solvers.GurobiSolver()
+        fs_clustered.optimize(solver)
+    except Exception as e:
+        if 'gurobi' in str(e).lower() or 'license' in str(e).lower():
+            print(f' Gurobi not available ({e}), falling back to HiGHS...')
+            solver = fx.solvers.HighsSolver()
+            fs_clustered.optimize(solver)
+        else:
+            raise
 
     print('\n3. Expanding...')
     fs_expanded = fs_clustered.transform.expand()

flixopt/clustering/base.py

Lines changed: 21 additions & 8 deletions
@@ -1113,12 +1113,17 @@ def _create_reference_structure(self, include_original_data: bool = True) -> tup
         original_data_refs = None
         if include_original_data and self.original_data is not None:
             original_data_refs = []
-            for name, da in self.original_data.data_vars.items():
+            # Use variables for faster access (avoids _construct_dataarray overhead)
+            variables = self.original_data.variables
+            for name in self.original_data.data_vars:
+                var = variables[name]
                 ref_name = f'original_data|{name}'
                 # Rename time dim to avoid xarray alignment issues
-                if 'time' in da.dims:
-                    da = da.rename({'time': 'original_time'})
-                arrays[ref_name] = da
+                if 'time' in var.dims:
+                    new_dims = tuple('original_time' if d == 'time' else d for d in var.dims)
+                    arrays[ref_name] = xr.Variable(new_dims, var.values, attrs=var.attrs)
+                else:
+                    arrays[ref_name] = var
                 original_data_refs.append(f':::{ref_name}')
 
         # NOTE: aggregated_data is NOT serialized - it's identical to the FlowSystem's
@@ -1129,9 +1134,11 @@ def _create_reference_structure(self, include_original_data: bool = True) -> tup
         metrics_refs = None
         if self._metrics is not None:
             metrics_refs = []
-            for name, da in self._metrics.data_vars.items():
+            # Use variables for faster access (avoids _construct_dataarray overhead)
+            metrics_vars = self._metrics.variables
+            for name in self._metrics.data_vars:
                 ref_name = f'metrics|{name}'
-                arrays[ref_name] = da
+                arrays[ref_name] = metrics_vars[name]
                 metrics_refs.append(f':::{ref_name}')
 
         reference = {
@@ -1415,9 +1422,15 @@ def compare(
 
         if kind == 'duration_curve':
             sorted_vars = {}
+            # Use variables for faster access (avoids _construct_dataarray overhead)
+            variables = ds.variables
+            rep_values = ds.coords['representation'].values
+            rep_idx = {rep: i for i, rep in enumerate(rep_values)}
             for var in ds.data_vars:
-                for rep in ds.coords['representation'].values:
-                    values = np.sort(ds[var].sel(representation=rep).values.flatten())[::-1]
+                data = variables[var].values
+                for rep in rep_values:
+                    # Direct numpy indexing instead of .sel()
+                    values = np.sort(data[rep_idx[rep]].flatten())[::-1]
                     sorted_vars[(var, rep)] = values
             # Get length from first sorted array
             n = len(next(iter(sorted_vars.values())))
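
A self-contained sketch of the duration-curve pattern above, using a made-up variable and toy data; it assumes, as the direct indexing data[rep_idx[rep]] in the diff does, that 'representation' is the leading dimension.

import numpy as np
import xarray as xr

ds = xr.Dataset(
    {'heat_demand': (('representation', 'time'), np.random.rand(2, 8760))},
    coords={'representation': ['original', 'clustered']},
)

# .sel() would construct a DataArray per (variable, representation) pair;
# indexing the raw numpy array once per variable avoids that overhead.
rep_values = ds.coords['representation'].values
rep_idx = {rep: i for i, rep in enumerate(rep_values)}

sorted_vars = {}
for var in ds.data_vars:
    data = ds.variables[var].values
    for rep in rep_values:
        sorted_vars[(var, rep)] = np.sort(data[rep_idx[rep]].flatten())[::-1]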

flixopt/components.py

Lines changed: 3 additions & 3 deletions
@@ -1505,11 +1505,11 @@ def _add_linking_constraints(
 
         # Apply self-discharge decay factor (1-loss)^hours to soc_before per Eq. 5
         # relative_loss_per_hour is per-hour, so we need total hours per cluster
-        # Use sum over time to handle both regular and segmented systems
+        # Use sum over time to get total duration (handles both regular and segmented systems)
         # Keep as DataArray to respect per-period/scenario values
         rel_loss = _scalar_safe_reduce(self.element.relative_loss_per_hour, 'time', 'mean')
-        hours_per_cluster = _scalar_safe_reduce(self._model.timestep_duration, 'time', 'mean')
-        decay_n = (1 - rel_loss) ** hours_per_cluster
+        total_hours_per_cluster = _scalar_safe_reduce(self._model.timestep_duration, 'time', 'sum')
+        decay_n = (1 - rel_loss) ** total_hours_per_cluster
 
         lhs = soc_after - soc_before * decay_n - delta_soc_ordered
         self.add_constraints(lhs == 0, short_name='link')
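
A short numeric sketch (values are illustrative) of why the decay exponent needs the total cluster duration via 'sum' rather than the 'mean' timestep duration: at hourly resolution the mean is one hour no matter how long the cluster is, so the old code barely decayed the state of charge.

relative_loss_per_hour = 0.01          # 1 % self-discharge per hour (made-up value)
timestep_duration_hours = [1.0] * 24   # one representative day at hourly resolution

mean_hours = sum(timestep_duration_hours) / len(timestep_duration_hours)  # 1.0 h
total_hours = sum(timestep_duration_hours)                                # 24.0 h

decay_mean = (1 - relative_loss_per_hour) ** mean_hours    # ~0.990, far too little decay
decay_total = (1 - relative_loss_per_hour) ** total_hours  # ~0.786, full-day decay

# The linking constraint spans an entire cluster, so the state of charge must
# decay over all 24 hours between linked points, not over a single timestep.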

flixopt/core.py

Lines changed: 12 additions & 5 deletions
@@ -629,17 +629,24 @@ def drop_constant_arrays(
         Dataset with constant variables removed.
     """
     drop_vars = []
+    # Use ds.variables for faster access (avoids _construct_dataarray overhead)
+    variables = ds.variables
 
-    for name, da in ds.data_vars.items():
+    for name in ds.data_vars:
+        var = variables[name]
         # Skip variables without the dimension
-        if dim not in da.dims:
+        if dim not in var.dims:
             if drop_arrays_without_dim:
                 drop_vars.append(name)
             continue
 
-        # Check if variable is constant along the dimension (ptp < atol)
-        ptp = da.max(dim, skipna=True) - da.min(dim, skipna=True)
-        if (ptp < atol).all().item():
+        # Check if variable is constant along the dimension using numpy (ptp < atol)
+        axis = var.dims.index(dim)
+        data = var.values
+        # Use numpy operations directly for speed
+        with np.errstate(invalid='ignore'):  # Ignore NaN warnings
+            ptp = np.nanmax(data, axis=axis) - np.nanmin(data, axis=axis)
+        if np.all(ptp < atol):
             drop_vars.append(name)
 
     if drop_vars:
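
A minimal sketch of the numpy-based constancy test on a toy dataset (variable names are hypothetical): the peak-to-peak range along the chosen dimension is compared against atol, mirroring the logic above.

import numpy as np
import xarray as xr

ds = xr.Dataset(
    {
        'efficiency': (('time', 'scenario'), np.full((8760, 3), 0.9)),  # constant along time
        'demand': (('time', 'scenario'), np.random.rand(8760, 3)),      # varies along time
    }
)

dim, atol = 'time', 1e-12
for name in ds.data_vars:
    var = ds.variables[name]
    axis = var.dims.index(dim)
    with np.errstate(invalid='ignore'):  # as in the diff: silence NaN-related warnings
        ptp = np.nanmax(var.values, axis=axis) - np.nanmin(var.values, axis=axis)
    print(name, 'constant along', dim, '->', bool(np.all(ptp < atol)))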

flixopt/flow_system.py

Lines changed: 6 additions & 0 deletions
@@ -214,6 +214,12 @@ def __init__(
         elif computed_timestep_duration is not None:
             self.timestep_duration = self.fit_to_model_coords('timestep_duration', computed_timestep_duration)
         else:
+            # RangeIndex (segmented systems) requires explicit timestep_duration
+            if isinstance(self.timesteps, pd.RangeIndex):
+                raise ValueError(
+                    'timestep_duration is required when using RangeIndex timesteps (segmented systems). '
+                    'Provide timestep_duration explicitly or use DatetimeIndex timesteps.'
+                )
             self.timestep_duration = None
 
         # Cluster weight for cluster() optimization (default 1.0)
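
The new check exists because a pd.RangeIndex carries no time unit, so per-timestep durations cannot be derived from it, whereas a DatetimeIndex can yield them from consecutive timestamps. A pandas-only sketch of that difference, independent of the FlowSystem constructor:

import numpy as np
import pandas as pd

datetime_steps = pd.date_range('2024-01-01', periods=4, freq='h')
# Durations (in hours) are derivable from consecutive timestamps
durations = np.diff(datetime_steps.values).astype('timedelta64[s]').astype(float) / 3600
print(durations)  # [1. 1. 1.]

range_steps = pd.RangeIndex(4)  # segmented systems use plain integer positions
# np.diff(range_steps) gives [1, 1, 1], but that "1" has no time unit attached,
# so a timestep_duration must be passed explicitly when timesteps are a RangeIndex.
print(np.diff(range_steps))  # [1 1 1]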

flixopt/io.py

Lines changed: 53 additions & 28 deletions
@@ -561,14 +561,18 @@ def save_dataset_to_netcdf(
     ds.attrs = {'attrs': json.dumps(ds.attrs)}
 
     # Convert all DataArray attrs to JSON strings
-    for var_name, data_var in ds.data_vars.items():
-        if data_var.attrs:  # Only if there are attrs
-            ds[var_name].attrs = {'attrs': json.dumps(data_var.attrs)}
+    # Use ds.variables to avoid slow _construct_dataarray calls
+    variables = ds.variables
+    for var_name in ds.data_vars:
+        var = variables[var_name]
+        if var.attrs:  # Only if there are attrs
+            var.attrs = {'attrs': json.dumps(var.attrs)}
 
     # Also handle coordinate attrs if they exist
-    for coord_name, coord_var in ds.coords.items():
-        if hasattr(coord_var, 'attrs') and coord_var.attrs:
-            ds[coord_name].attrs = {'attrs': json.dumps(coord_var.attrs)}
+    for coord_name in ds.coords:
+        var = variables[coord_name]
+        if var.attrs:
+            var.attrs = {'attrs': json.dumps(var.attrs)}
 
     # Suppress numpy binary compatibility warnings from netCDF4 (numpy 1->2 transition)
     with warnings.catch_warnings():
@@ -602,25 +606,38 @@ def _reduce_constant_arrays(ds: xr.Dataset) -> xr.Dataset:
         Dataset with constant dimensions reduced.
     """
     new_data_vars = {}
+    variables = ds.variables
+
+    for name in ds.data_vars:
+        var = variables[name]
+        dims = var.dims
+        data = var.values
 
-    for name, da in ds.data_vars.items():
-        if not da.dims or da.size == 0:
-            new_data_vars[name] = da
+        if not dims or data.size == 0:
+            new_data_vars[name] = var
             continue
 
-        # Try to reduce each dimension
-        reduced = da
-        for dim in list(da.dims):
-            if dim not in reduced.dims:
+        # Try to reduce each dimension using numpy operations
+        reduced_data = data
+        reduced_dims = list(dims)
+
+        for _axis, dim in enumerate(dims):
+            if dim not in reduced_dims:
                 continue  # Already removed
-            # Check if constant along this dimension
-            first_slice = reduced.isel({dim: 0})
-            is_constant = (reduced == first_slice).all()
+
+            current_axis = reduced_dims.index(dim)
+            # Check if constant along this axis using numpy
+            first_slice = np.take(reduced_data, 0, axis=current_axis)
+            # Broadcast first_slice to compare
+            expanded = np.expand_dims(first_slice, axis=current_axis)
+            is_constant = np.allclose(reduced_data, expanded, equal_nan=True)
+
             if is_constant:
                 # Remove this dimension by taking first slice
-                reduced = first_slice
+                reduced_data = first_slice
+                reduced_dims.pop(current_axis)
 
-        new_data_vars[name] = reduced
+        new_data_vars[name] = xr.Variable(tuple(reduced_dims), reduced_data, attrs=var.attrs)
 
     return xr.Dataset(new_data_vars, coords=ds.coords, attrs=ds.attrs)
 
@@ -754,14 +771,18 @@ def load_dataset_from_netcdf(path: str | pathlib.Path) -> xr.Dataset:
     ds.attrs = json.loads(ds.attrs['attrs'])
 
     # Restore DataArray attrs (before unstacking, as stacked vars have no individual attrs)
-    for var_name, data_var in ds.data_vars.items():
-        if 'attrs' in data_var.attrs:
-            ds[var_name].attrs = json.loads(data_var.attrs['attrs'])
+    # Use ds.variables to avoid slow _construct_dataarray calls
+    variables = ds.variables
+    for var_name in ds.data_vars:
+        var = variables[var_name]
+        if 'attrs' in var.attrs:
+            var.attrs = json.loads(var.attrs['attrs'])
 
     # Restore coordinate attrs
-    for coord_name, coord_var in ds.coords.items():
-        if hasattr(coord_var, 'attrs') and 'attrs' in coord_var.attrs:
-            ds[coord_name].attrs = json.loads(coord_var.attrs['attrs'])
+    for coord_name in ds.coords:
+        var = variables[coord_name]
+        if 'attrs' in var.attrs:
+            var.attrs = json.loads(var.attrs['attrs'])
 
     # Unstack variables if they were stacked during saving
     # Detection: check if any dataset dimension starts with '__stacked__'
@@ -1577,7 +1598,10 @@ def _fast_get_dataarray(ds: xr.Dataset, name: str, coord_cache: dict[str, xr.Dat
             Constructed DataArray
         """
         variable = ds.variables[name]
-        coords = {k: coord_cache[k] for k in variable.dims if k in coord_cache}
+        var_dims = set(variable.dims)
+        # Include coordinates whose dims are a subset of the variable's dims
+        # This preserves both dimension coordinates and auxiliary coordinates
+        coords = {k: v for k, v in coord_cache.items() if set(v.dims).issubset(var_dims)}
         return xr.DataArray(variable, coords=coords, name=name)
 
     @staticmethod
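
The subset test above matters when a coordinate spans only part of a variable's dimensions, i.e. an auxiliary coordinate. A sketch with a hypothetical auxiliary coordinate shows what a dims-only filter would drop:

import numpy as np
import xarray as xr

ds = xr.Dataset(
    {'demand': (('time', 'scenario'), np.random.rand(48, 2))},
    coords={
        'time': np.arange(48),
        'scenario': ['low', 'high'],
        'cluster_label': ('time', np.repeat([0, 1], 24)),  # auxiliary coordinate on 'time'
    },
)

coord_cache = {k: ds.coords[k] for k in ds.coords}
variable = ds.variables['demand']
var_dims = set(variable.dims)

# A dims-only filter would keep 'time' and 'scenario' but silently drop 'cluster_label';
# the subset test keeps every coordinate whose dims fit inside the variable's dims.
coords = {k: v for k, v in coord_cache.items() if set(v.dims).issubset(var_dims)}
da = xr.DataArray(variable, coords=coords, name='demand')
print(sorted(da.coords))  # ['cluster_label', 'scenario', 'time']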
@@ -1865,9 +1889,10 @@ def _add_clustering_to_dataset(
             clustering_ref, clustering_arrays = clustering._create_reference_structure(
                 include_original_data=include_original_data
             )
-            # Add clustering arrays with prefix
-            for name, arr in clustering_arrays.items():
-                ds[f'{cls.CLUSTERING_PREFIX}{name}'] = arr
+            # Add clustering arrays with prefix using batch assignment
+            # (individual ds[name] = arr assignments are slow)
+            prefixed_arrays = {f'{cls.CLUSTERING_PREFIX}{name}': arr for name, arr in clustering_arrays.items()}
+            ds = ds.assign(prefixed_arrays)
             ds.attrs['clustering'] = json.dumps(clustering_ref)
 
         return ds
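
A toy-scale sketch of the batch-assignment pattern (array count and names are illustrative): a single assign() call merges all new variables in one dataset update instead of re-running xarray's merge machinery per item.

import numpy as np
import xarray as xr

new_arrays = {
    f'clustering|array_{i}': xr.DataArray(np.random.rand(24), dims='time')
    for i in range(210)
}

ds = xr.Dataset()

# Slow: every item assignment re-runs xarray's dataset update/merge logic
# for name, arr in new_arrays.items():
#     ds[name] = arr

# Fast: one assign() call, one merge
ds = ds.assign(new_arrays)
print(len(ds.data_vars))  # 210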

flixopt/structure.py

Lines changed: 11 additions & 1 deletion
@@ -1116,7 +1116,17 @@ def from_dataset(cls, ds: xr.Dataset) -> Interface:
         reference_structure.pop('__class__', None)
 
         # Create arrays dictionary from dataset variables
-        arrays_dict = {name: array for name, array in ds.data_vars.items()}
+        # Use ds.variables with coord_cache for faster DataArray construction
+        variables = ds.variables
+        coord_cache = {k: ds.coords[k] for k in ds.coords}
+        arrays_dict = {
+            name: xr.DataArray(
+                variables[name],
+                coords={k: coord_cache[k] for k in variables[name].dims if k in coord_cache},
+                name=name,
+            )
+            for name in ds.data_vars
+        }
 
         # Resolve all references using the centralized method
         resolved_params = cls._resolve_reference_structure(reference_structure, arrays_dict)
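
A self-contained sketch of the coord_cache pattern on a toy dataset: fetch the coordinates once, then build each DataArray from its raw Variable rather than through ds[name]; the result matches what ds[name] would return, minus the construction overhead.

import numpy as np
import pandas as pd
import xarray as xr

ds = xr.Dataset(
    {f'param_{i}': ('time', np.random.rand(48)) for i in range(100)},
    coords={'time': pd.date_range('2024-01-01', periods=48, freq='h')},
)

variables = ds.variables
coord_cache = {k: ds.coords[k] for k in ds.coords}

arrays_dict = {
    name: xr.DataArray(
        variables[name],
        coords={k: coord_cache[k] for k in variables[name].dims if k in coord_cache},
        name=name,
    )
    for name in ds.data_vars
}

assert arrays_dict['param_0'].identical(ds['param_0'])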
