diff --git a/docs/home/quick-start.md b/docs/home/quick-start.md index 4b80a7066..d95e53c90 100644 --- a/docs/home/quick-start.md +++ b/docs/home/quick-start.md @@ -88,21 +88,31 @@ battery = fx.Storage( flow_system.add_elements(solar, demand, battery, electricity_bus) ``` -### 5. Run Optimization +### 5. Visualize and Run Optimization ```python -# Run optimization directly on the flow system +# Optional: visualize your system structure +flow_system.topology.plot(path='system.html') + +# Run optimization flow_system.optimize(fx.solvers.HighsSolver()) ``` -### 6. Access Results +### 6. Access and Visualize Results ```python -# Access results directly from the flow system +# Access raw solution data print(flow_system.solution) -# Or access component-specific results +# Use statistics for aggregated data +print(flow_system.statistics.flow_hours) + +# Access component-specific results print(flow_system.components['battery'].solution) + +# Visualize results +flow_system.statistics.plot.balance('electricity') +flow_system.statistics.plot.storage('battery') ``` ### 7. Save Results (Optional) @@ -132,8 +142,10 @@ Most flixOpt projects follow this pattern: 2. **Create flow system** - Initialize with time series and effects 3. **Add buses** - Define connection points 4. **Add components** - Create generators, storage, converters, loads -5. **Run optimization** - Call `flow_system.optimize(solver)` -6. **Access Results** - Via `flow_system.solution` or component `.solution` attributes +5. **Verify structure** - Use `flow_system.topology.plot()` to visualize +6. **Run optimization** - Call `flow_system.optimize(solver)` +7. **Analyze results** - Via `flow_system.statistics` and `.solution` +8. 
**Visualize** - Use `flow_system.statistics.plot.*` methods ## Tips diff --git a/docs/user-guide/core-concepts.md b/docs/user-guide/core-concepts.md index 3bccb554c..401b34705 100644 --- a/docs/user-guide/core-concepts.md +++ b/docs/user-guide/core-concepts.md @@ -127,23 +127,29 @@ Define your system structure, parameters, and time series data. ### 2. Run the Optimization -Create an [`Optimization`][flixopt.optimization.Optimization] and solve it: +Optimize your FlowSystem with a solver: ```python -optimization = fx.Optimization('my_model', flow_system) -results = optimization.solve(fx.solvers.HighsSolver()) +flow_system.optimize(fx.solvers.HighsSolver()) ``` ### 3. Analyze Results -The [`Results`][flixopt.results.Results] object contains all solution data: +Access solution data directly from the FlowSystem: ```python -# Access component results -boiler_output = results['Boiler'].node_balance() +# Access component solutions +boiler = flow_system.components['Boiler'] +print(boiler.solution) # Get total costs -total_costs = results.solution['Costs'] +total_costs = flow_system.solution['costs|total'] + +# Use statistics for aggregated data +print(flow_system.statistics.flow_hours) + +# Plot results +flow_system.statistics.plot.balance('HeatBus') ```
@@ -185,12 +191,17 @@ While our example used a heating system, flixOpt works for any flow-based optimi flixOpt is built on [linopy](https://github.com/PyPSA/linopy). You can access and extend the underlying optimization model for custom constraints: ```python -# Access the linopy model after building -optimization.do_modeling() -model = optimization.model +# Build the model (without solving) +flow_system.build_model() + +# Access the linopy model +model = flow_system.model # Add custom constraints using linopy API model.add_constraints(...) + +# Then solve +flow_system.solve(fx.solvers.HighsSolver()) ``` This allows advanced users to add domain-specific constraints while keeping flixOpt's convenience for standard modeling. diff --git a/docs/user-guide/migration-guide-v6.md b/docs/user-guide/migration-guide-v6.md index 796310522..8b50312ee 100644 --- a/docs/user-guide/migration-guide-v6.md +++ b/docs/user-guide/migration-guide-v6.md @@ -296,6 +296,31 @@ The new API also applies to advanced optimization modes: --- +## Statistics Accessor + +The new `statistics` accessor provides convenient aggregated data: + +```python +stats = flow_system.statistics + +# Flow data (clean labels, no |flow_rate suffix) +stats.flow_rates['Boiler(Q_th)'] # Not 'Boiler(Q_th)|flow_rate' +stats.flow_hours['Boiler(Q_th)'] +stats.sizes['Boiler(Q_th)'] +stats.charge_states['Battery'] + +# Effect breakdown by contributor (replaces effects_per_component) +stats.temporal_effects['costs'] # Per timestep, per contributor +stats.periodic_effects['costs'] # Investment costs per contributor +stats.total_effects['costs'] # Total per contributor + +# Group by component or component type +stats.total_effects['costs'].groupby('component').sum() +stats.total_effects['costs'].groupby('component_type').sum() +``` + +--- + ## 🔧 Quick Reference ### Common Conversions diff --git a/docs/user-guide/optimization/index.md b/docs/user-guide/optimization/index.md index 0762d505a..07c96454c 100644 --- 
a/docs/user-guide/optimization/index.md +++ b/docs/user-guide/optimization/index.md @@ -2,6 +2,21 @@ This section covers how to run optimizations in flixOpt, including different optimization modes and solver configuration. +## Verifying Your Model + +Before running an optimization, it's helpful to visualize your system structure: + +```python +# Generate an interactive network diagram +flow_system.topology.plot(path='my_system.html') + +# Or get structure info programmatically +nodes, edges = flow_system.topology.infos() +print(f"Components: {[n for n, d in nodes.items() if d['class'] == 'Component']}") +print(f"Buses: {[n for n, d in nodes.items() if d['class'] == 'Bus']}") +print(f"Flows: {list(edges.keys())}") +``` + ## Standard Optimization The recommended way to run an optimization is directly on the `FlowSystem`: @@ -78,6 +93,107 @@ print(clustered_fs.solution) | Standard | Small-Medium | Slow | Optimal | | Clustered | Very Large | Fast | Approximate | +## Custom Constraints + +flixOpt is built on [linopy](https://github.com/PyPSA/linopy), allowing you to add custom constraints beyond what's available through the standard API. 
+ +### Adding Custom Constraints + +To add custom constraints, build the model first, then access the underlying linopy model: + +```python +# Build the model (without solving) +flow_system.build_model() + +# Access the linopy model +model = flow_system.model + +# Access variables from the model +# Variables are named: "ElementLabel|variable_name" +boiler_flow = model.variables['Boiler(Q_th)|flow_rate'] +chp_flow = model.variables['CHP(Q_th)|flow_rate'] + +# Add a custom constraint: Boiler must produce at least as much as CHP +model.add_constraints( + boiler_flow >= chp_flow, + name='boiler_min_chp' +) + +# Solve with the custom constraint +flow_system.solve(fx.solvers.HighsSolver()) +``` + +### Common Use Cases + +**Minimum runtime constraint:** +```python +# Require component to run at least 100 hours total +on_var = model.variables['CHP|on'] # Binary on/off variable +hours = flow_system.hours_per_timestep +model.add_constraints( + (on_var * hours).sum() >= 100, + name='chp_min_runtime' +) +``` + +**Linking flows across components:** +```python +# Heat pump and boiler combined must meet minimum base load +hp_flow = model.variables['HeatPump(Q_th)|flow_rate'] +boiler_flow = model.variables['Boiler(Q_th)|flow_rate'] +model.add_constraints( + hp_flow + boiler_flow >= 50, # At least 50 kW combined + name='min_heat_supply' +) +``` + +**Seasonal constraints:** +```python +import pandas as pd + +# Different constraints for summer vs winter +summer_mask = flow_system.timesteps.month.isin([6, 7, 8]) +winter_mask = flow_system.timesteps.month.isin([12, 1, 2]) + +flow_var = model.variables['Boiler(Q_th)|flow_rate'] + +# Lower capacity in summer +model.add_constraints( + flow_var.sel(time=flow_system.timesteps[summer_mask]) <= 100, + name='summer_limit' +) +``` + +### Inspecting the Model + +Before adding constraints, inspect available variables and existing constraints: + +```python +flow_system.build_model() +model = flow_system.model + +# List all variables 
+print(model.variables) + +# List all constraints +print(model.constraints) + +# Get details about a specific variable +print(model.variables['Boiler(Q_th)|flow_rate']) +``` + +### Variable Naming Convention + +Variables follow this naming pattern: + +| Element Type | Pattern | Example | +|--------------|---------|---------| +| Flow rate | `Component(FlowLabel)\|flow_rate` | `Boiler(Q_th)\|flow_rate` | +| Flow size | `Component(FlowLabel)\|size` | `Boiler(Q_th)\|size` | +| On/off status | `Component\|on` | `CHP\|on` | +| Charge state | `Storage\|charge_state` | `Battery\|charge_state` | +| Effect totals | `effect_name\|total` | `costs\|total` | + ## Solver Configuration ### Available Solvers diff --git a/docs/user-guide/results-plotting.md b/docs/user-guide/results-plotting.md new file mode 100644 index 000000000..4f1932e53 --- /dev/null +++ b/docs/user-guide/results-plotting.md @@ -0,0 +1,413 @@ +# Plotting Results + +After solving an optimization, flixOpt provides a powerful plotting API to visualize and analyze your results. The API is designed to be intuitive and chainable, giving you quick access to common plots while still allowing deep customization. 
+ +## The Plot Accessor + +All plotting is accessed through the `statistics.plot` accessor on your FlowSystem: + +```python +# Run optimization +flow_system.optimize(fx.solvers.HighsSolver()) + +# Access plotting via statistics +flow_system.statistics.plot.balance('ElectricityBus') +flow_system.statistics.plot.sankey() +flow_system.statistics.plot.heatmap('Boiler(Q_th)|flow_rate') +``` + +## PlotResult: Data + Figure + +Every plot method returns a [`PlotResult`][flixopt.plot_accessors.PlotResult] object containing both: + +- **`data`**: An xarray Dataset with the prepared data +- **`figure`**: A Plotly Figure object + +This gives you full access to export data, customize the figure, or use the data for your own visualizations: + +```python +result = flow_system.statistics.plot.balance('Bus') + +# Access the xarray data +print(result.data) +result.data.to_dataframe() # Convert to pandas DataFrame +result.data.to_netcdf('balance_data.nc') # Export as netCDF + +# Access and modify the figure +result.figure.update_layout(title='Custom Title') +result.figure.show() +``` + +### Method Chaining + +All `PlotResult` methods return `self`, enabling fluent chaining: + +```python +flow_system.statistics.plot.balance('Bus') \ + .update(title='Custom Title', height=600) \ + .update_traces(opacity=0.8) \ + .to_csv('data.csv') \ + .to_html('plot.html') \ + .show() +``` + +Available methods: + +| Method | Description | +|--------|-------------| +| `.show()` | Display the figure | +| `.update(**kwargs)` | Update figure layout (passes to `fig.update_layout()`) | +| `.update_traces(**kwargs)` | Update traces (passes to `fig.update_traces()`) | +| `.to_html(path)` | Save as interactive HTML | +| `.to_image(path)` | Save as static image (png, svg, pdf) | +| `.to_csv(path)` | Export data to CSV (converts xarray to DataFrame) | +| `.to_netcdf(path)` | Export data to netCDF (native xarray format) | + +## Available Plot Methods + +### Balance Plot + +Plot the energy/material balance at a 
node (Bus or Component), showing inputs and outputs: + +```python +flow_system.statistics.plot.balance('ElectricityBus') +flow_system.statistics.plot.balance('Boiler', mode='area') +``` + +**Key parameters:** + +| Parameter | Type | Description | +|-----------|------|-------------| +| `node` | str | Label of the Bus or Component | +| `mode` | `'bar'`, `'line'`, `'area'` | Visual style (default: `'bar'`) | +| `unit` | `'flow_rate'`, `'flow_hours'` | Power (kW) or energy (kWh) | +| `include` | str or list | Only include flows containing these substrings | +| `exclude` | str or list | Exclude flows containing these substrings | +| `aggregate` | `'sum'`, `'mean'`, `'max'`, `'min'` | Aggregate over time | +| `select` | dict | xarray-style data selection | + +### Storage Plot + +Visualize storage components with charge state and flow balance: + +```python +flow_system.statistics.plot.storage('Battery') +flow_system.statistics.plot.storage('ThermalStorage', mode='line') +``` + +**Key parameters:** + +| Parameter | Type | Description | +|-----------|------|-------------| +| `component` | str | Storage component label | +| `mode` | `'bar'`, `'line'`, `'area'` | Visual style | + +### Heatmap + +Create heatmaps of time series data, with automatic time reshaping: + +```python +flow_system.statistics.plot.heatmap('Boiler(Q_th)|flow_rate') +flow_system.statistics.plot.heatmap(['CHP|on', 'Boiler|on'], facet_col='variable') +``` + +**Key parameters:** + +| Parameter | Type | Description | +|-----------|------|-------------| +| `variables` | str or list | Variable name(s) to plot | +| `reshape` | tuple | Time reshaping pattern, e.g., `('D', 'h')` for days × hours | +| `colorscale` | str | Plotly colorscale name | + +Common reshape patterns: + +- `('D', 'h')`: Days × Hours (default) +- `('W', 'D')`: Weeks × Days +- `('MS', 'D')`: Months × Days + +### Flows Plot + +Plot flow rates filtered by nodes or components: + +```python +flow_system.statistics.plot.flows(component='Boiler') 
+flow_system.statistics.plot.flows(start='ElectricityBus') +flow_system.statistics.plot.flows(unit='flow_hours', aggregate='sum') +``` + +**Key parameters:** + +| Parameter | Type | Description | +|-----------|------|-------------| +| `start` | str or list | Filter by source node(s) | +| `end` | str or list | Filter by destination node(s) | +| `component` | str or list | Filter by parent component(s) | +| `unit` | `'flow_rate'`, `'flow_hours'` | Power or energy | +| `aggregate` | str | Time aggregation | + +### Compare Plot + +Compare multiple elements side-by-side: + +```python +flow_system.statistics.plot.compare(['Boiler', 'CHP', 'HeatPump'], variable='flow_rate') +flow_system.statistics.plot.compare(['Battery1', 'Battery2'], variable='charge_state') +``` + +**Key parameters:** + +| Parameter | Type | Description | +|-----------|------|-------------| +| `elements` | list | Element labels to compare | +| `variable` | str | Variable suffix to compare | +| `mode` | `'overlay'`, `'facet'` | Same axes or subplots | + +### Sankey Diagram + +Visualize energy/material flows as a Sankey diagram: + +```python +flow_system.statistics.plot.sankey() +flow_system.statistics.plot.sankey(timestep=100) +flow_system.statistics.plot.sankey(aggregate='mean') +``` + +**Key parameters:** + +| Parameter | Type | Description | +|-----------|------|-------------| +| `timestep` | int or str | Specific timestep, or None for aggregation | +| `aggregate` | `'sum'`, `'mean'` | Aggregation method when timestep is None | + +### Effects Plot + +Plot cost, emissions, or other effect breakdowns. Effects can be grouped by component, individual contributor (flows), or time. 
+ +```python +flow_system.statistics.plot.effects() # Total of all effects by component +flow_system.statistics.plot.effects(effect='costs') # Just costs +flow_system.statistics.plot.effects(by='contributor') # By individual flows/components +flow_system.statistics.plot.effects(aspect='temporal', by='time') # Over time +``` + +**Key parameters:** + +| Parameter | Type | Description | +|-----------|------|-------------| +| `aspect` | `'total'`, `'temporal'`, `'periodic'` | Which aspect to plot (default: `'total'`) | +| `effect` | str or None | Specific effect to plot (e.g., `'costs'`, `'CO2'`). If None, plots all. | +| `by` | `'component'`, `'contributor'`, `'time'` | Grouping dimension (default: `'component'`) | +| `select` | dict | xarray-style data selection | +| `colors` | dict | Color overrides for categories | +| `facet_col` | str | Dimension for column facets (default: `'scenario'`) | +| `facet_row` | str | Dimension for row facets (default: `'period'`) | + +**Grouping options:** + +- **`by='component'`**: Groups effects by parent component (e.g., all flows from a Boiler are summed together) +- **`by='contributor'`**: Shows individual contributors - flows and components that directly contribute to effects +- **`by='time'`**: Shows effects over time (only valid for `aspect='temporal'`) + +!!! note "Contributors vs Components" + Contributors include not just flows, but also components that directly contribute to effects (e.g., via `effects_per_active_hour`). The system automatically detects all contributors from the optimization solution. 
+ +### Variable Plot + +Plot the same variable type across multiple elements for comparison: + +```python +flow_system.statistics.plot.variable('on') # All binary operation states +flow_system.statistics.plot.variable('flow_rate', include='Boiler') +flow_system.statistics.plot.variable('charge_state') # All storage charge states +``` + +**Key parameters:** + +| Parameter | Type | Description | +|-----------|------|-------------| +| `pattern` | str | Variable suffix to match (e.g., `'on'`, `'flow_rate'`) | +| `include` | str or list | Only include elements containing these substrings | +| `exclude` | str or list | Exclude elements containing these substrings | +| `aggregate` | str | Time aggregation method | +| `mode` | `'line'`, `'bar'`, `'area'` | Visual style | + +### Duration Curve + +Plot load duration curves (sorted time series) to understand utilization patterns: + +```python +flow_system.statistics.plot.duration_curve('Boiler(Q_th)') +flow_system.statistics.plot.duration_curve(['CHP(Q_th)', 'HeatPump(Q_th)']) +flow_system.statistics.plot.duration_curve('Demand(in)', normalize=True) +``` + +**Key parameters:** + +| Parameter | Type | Description | +|-----------|------|-------------| +| `variables` | str or list | Variable name(s) to plot | +| `normalize` | bool | Normalize x-axis to 0-100% (default: False) | +| `mode` | `'line'`, `'area'` | Visual style | + +## Common Parameters + +Most plot methods share these parameters: + +### Data Selection + +Use xarray-style selection to filter data before plotting: + +```python +# Single value +flow_system.statistics.plot.balance('Bus', select={'scenario': 'base'}) + +# Multiple values +flow_system.statistics.plot.balance('Bus', select={'scenario': ['base', 'high_demand']}) + +# Time slices +flow_system.statistics.plot.balance('Bus', select={'time': slice('2024-01', '2024-06')}) + +# Combined +flow_system.statistics.plot.balance('Bus', select={ + 'scenario': 'base', + 'time': slice('2024-01-01', '2024-01-07') +}) +``` 
+ +### Faceting and Animation + +Control how multi-dimensional data is displayed: + +```python +# Facet by scenario +flow_system.statistics.plot.balance('Bus', facet_col='scenario') + +# Animate by period +flow_system.statistics.plot.balance('Bus', animate_by='period') + +# Both +flow_system.statistics.plot.balance('Bus', facet_col='scenario', animate_by='period') +``` + +!!! note + Facet and animation dimensions are automatically ignored if not present in the data. Defaults are `facet_col='scenario'` and `animate_by='period'` for balance plots. + +### Include/Exclude Filtering + +Filter flows using simple substring matching: + +```python +# Only show flows containing 'Q_th' +flow_system.statistics.plot.balance('Bus', include='Q_th') + +# Exclude flows containing 'Gas' or 'Grid' +flow_system.statistics.plot.balance('Bus', exclude=['Gas', 'Grid']) + +# Combine include and exclude +flow_system.statistics.plot.balance('Bus', include='Boiler', exclude='auxiliary') +``` + +### Colors + +Override colors using a dictionary: + +```python +flow_system.statistics.plot.balance('Bus', colors={ + 'Boiler(Q_th)': '#ff6b6b', + 'CHP(Q_th)': '#4ecdc4', +}) +``` + +### Display Control + +Control whether plots are shown automatically: + +```python +# Don't show (useful in scripts) +result = flow_system.statistics.plot.balance('Bus', show=False) + +# Show later +result.show() +``` + +The default behavior is controlled by `CONFIG.Plotting.default_show`. 
+ +## Complete Examples + +### Analyzing a Bus Balance + +```python +# Quick overview +flow_system.statistics.plot.balance('ElectricityBus') + +# Detailed analysis with exports +result = flow_system.statistics.plot.balance( + 'ElectricityBus', + mode='area', + unit='flow_hours', + select={'time': slice('2024-06-01', '2024-06-07')}, + show=False +) + +# Access xarray data for further analysis +print(result.data) # xarray Dataset +df = result.data.to_dataframe() # Convert to pandas + +# Export data +result.to_netcdf('electricity_balance.nc') # Native xarray format +result.to_csv('electricity_balance.csv') # As CSV + +# Customize and display +result.update( + title='Electricity Balance - First Week of June', + yaxis_title='Energy [kWh]' +).show() +``` + +### Comparing Storage Units + +```python +# Compare charge states +flow_system.statistics.plot.compare( + ['Battery1', 'Battery2', 'ThermalStorage'], + variable='charge_state', + mode='overlay' +).update(title='Storage Comparison') +``` + +### Creating a Report + +```python +# Generate multiple plots for a report +plots = { + 'balance': flow_system.statistics.plot.balance('HeatBus', show=False), + 'storage': flow_system.statistics.plot.storage('ThermalStorage', show=False), + 'sankey': flow_system.statistics.plot.sankey(show=False), + 'costs': flow_system.statistics.plot.effects(effect='costs', show=False), +} + +# Export all +for name, plot in plots.items(): + plot.to_html(f'report_{name}.html') + plot.to_netcdf(f'report_{name}.nc') # xarray native format +``` + +### Working with xarray Data + +The `.data` attribute returns xarray objects, giving you full access to xarray's powerful data manipulation capabilities: + +```python +result = flow_system.statistics.plot.balance('Bus', show=False) + +# Access the xarray Dataset +ds = result.data + +# Use xarray operations +ds.mean(dim='time') # Average over time +ds.sel(time='2024-06') # Select specific time +ds.to_dataframe() # Convert to pandas + +# Export options 
+ds.to_netcdf('data.nc') # Native xarray format +ds.to_zarr('data.zarr') # Zarr format for large datasets +``` diff --git a/docs/user-guide/results/index.md b/docs/user-guide/results/index.md index c0a9464ed..5f103dd39 100644 --- a/docs/user-guide/results/index.md +++ b/docs/user-guide/results/index.md @@ -1,18 +1,12 @@ # Analyzing Results -!!! note "Under Development" - This section is being expanded with detailed tutorials. +After running an optimization, flixOpt provides powerful tools to access, analyze, and visualize your results. -Learn how to work with optimization results: +## Accessing Solution Data -- Accessing solution data -- Plotting flows and states -- Exporting to various formats -- Comparing scenarios and periods +### Raw Solution -## Accessing Results - -After running an optimization, access results directly from the FlowSystem: +The `solution` property contains all optimization variables as an xarray Dataset: ```python # Run optimization @@ -20,15 +14,199 @@ flow_system.optimize(fx.solvers.HighsSolver()) # Access the full solution dataset solution = flow_system.solution +print(solution) + +# Access specific variables print(solution['Boiler(Q_th)|flow_rate']) +print(solution['Battery|charge_state']) +``` -# Access component-specific solutions +### Element-Specific Solutions + +Access solution data for individual elements: + +```python +# Component solutions boiler = flow_system.components['Boiler'] -print(boiler.solution) +print(boiler.solution) # All variables for this component -# Access flow solutions +# Flow solutions flow = flow_system.flows['Boiler(Q_th)'] print(flow.solution) + +# Bus solutions (if imbalance is allowed) +bus = flow_system.buses['Heat'] +print(bus.solution) +``` + +## Statistics Accessor + +The `statistics` accessor provides pre-computed aggregations for common analysis tasks: + +```python +# Access via the statistics property +stats = flow_system.statistics +``` + +### Available Data Properties + +| Property | Description | 
+|----------|-------------| +| `flow_rates` | All flow rate variables as xarray Dataset | +| `flow_hours` | Flow hours (flow_rate × hours_per_timestep) | +| `sizes` | All size variables (fixed and optimized) | +| `charge_states` | Storage charge state variables | +| `temporal_effects` | Temporal effects per contributor per timestep | +| `periodic_effects` | Periodic (investment) effects per contributor | +| `total_effects` | Total effects (temporal + periodic) per contributor | +| `effect_share_factors` | Conversion factors between effects | + +### Examples + +```python +# Get all flow rates +flow_rates = flow_system.statistics.flow_rates +print(flow_rates) + +# Get flow hours (energy) +flow_hours = flow_system.statistics.flow_hours +total_heat = flow_hours['Boiler(Q_th)'].sum() + +# Get sizes (capacities) +sizes = flow_system.statistics.sizes +print(f"Boiler size: {sizes['Boiler(Q_th)'].values}") + +# Get storage charge states +charge_states = flow_system.statistics.charge_states + +# Get effect breakdown by contributor +temporal = flow_system.statistics.temporal_effects +print(temporal['costs']) # Costs per contributor per timestep + +# Group by component +temporal['costs'].groupby('component').sum() +``` + +### Effect Analysis + +Analyze how effects (costs, emissions, etc.) are distributed: + +```python +# Access effects via the new properties +stats = flow_system.statistics + +# Temporal effects per timestep (costs, CO2, etc. per contributor) +stats.temporal_effects['costs'] # DataArray with dims [time, contributor] +stats.temporal_effects['costs'].sum('contributor') # Total per timestep + +# Periodic effects (investment costs, etc.) 
+stats.periodic_effects['costs'] # DataArray with dim [contributor] + +# Total effects (temporal + periodic combined) +stats.total_effects['costs'].sum('contributor') # Grand total + +# Group by component or component type +stats.total_effects['costs'].groupby('component').sum() +stats.total_effects['costs'].groupby('component_type').sum() +``` + +!!! tip "Contributors" + Contributors are automatically detected from the optimization solution and include: + + - **Flows**: Individual flows with `effects_per_flow_hour` + - **Components**: Components with `effects_per_active_hour` or similar direct effects + + Each contributor has associated metadata (`component` and `component_type` coordinates) for flexible groupby operations. + +## Plotting Results + +The `statistics.plot` accessor provides visualization methods: + +```python +# Balance plots +flow_system.statistics.plot.balance('HeatBus') +flow_system.statistics.plot.balance('Boiler') + +# Heatmaps +flow_system.statistics.plot.heatmap('Boiler(Q_th)|flow_rate') + +# Duration curves +flow_system.statistics.plot.duration_curve('Boiler(Q_th)') + +# Sankey diagrams +flow_system.statistics.plot.sankey() + +# Effects breakdown +flow_system.statistics.plot.effects() # Total costs by component +flow_system.statistics.plot.effects(effect='costs', by='contributor') # By individual flows +flow_system.statistics.plot.effects(aspect='temporal', by='time') # Over time +``` + +See [Plotting Results](../results-plotting.md) for comprehensive plotting documentation. 
+ +## Network Visualization + +The `topology` accessor lets you visualize and inspect your system structure: + +### Static HTML Visualization + +Generate an interactive network diagram using PyVis: + +```python +# Default: saves to 'flow_system.html' and opens in browser +flow_system.topology.plot() + +# Custom options +flow_system.topology.plot( + path='output/my_network.html', + controls=['nodes', 'layout', 'physics'], + show=True +) +``` + +**Parameters:** + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `path` | str, Path, or False | `'flow_system.html'` | Where to save the HTML file | +| `controls` | bool or list | `True` | UI controls to show | +| `show` | bool | `None` | Whether to open in browser | + +### Interactive App + +Launch a Dash/Cytoscape application for exploring the network: + +```python +# Start the visualization server +flow_system.topology.start_app() + +# ... interact with the visualization in your browser ... + +# Stop when done +flow_system.topology.stop_app() +``` + +!!! 
note "Optional Dependencies" + The interactive app requires additional packages: + ```bash + pip install flixopt[network_viz] + ``` + +### Network Structure Info + +Get node and edge information programmatically: + +```python +nodes, edges = flow_system.topology.infos() + +# nodes: dict mapping labels to properties +# {'Boiler': {'label': 'Boiler', 'class': 'Component', 'infos': '...'}, ...} + +# edges: dict mapping flow labels to properties +# {'Boiler(Q_th)': {'label': 'Q_th', 'start': 'Boiler', 'end': 'Heat', ...}, ...} + +print(f"Components and buses: {list(nodes.keys())}") +print(f"Flows: {list(edges.keys())}") ``` ## Saving and Loading @@ -36,16 +214,70 @@ print(flow.solution) Save the FlowSystem (including solution) for later analysis: ```python -# Save to NetCDF +# Save to NetCDF (recommended for large datasets) flow_system.to_netcdf('results/my_system.nc') # Load later loaded_fs = fx.FlowSystem.from_netcdf('results/my_system.nc') print(loaded_fs.solution) + +# Save to JSON (human-readable, smaller datasets) +flow_system.to_json('results/my_system.json') +loaded_fs = fx.FlowSystem.from_json('results/my_system.json') ``` -## Getting Started +## Working with xarray + +All result data uses [xarray](https://docs.xarray.dev/), giving you powerful data manipulation: + +```python +solution = flow_system.solution + +# Select specific times +summer = solution.sel(time=slice('2024-06-01', '2024-08-31')) + +# Aggregate over dimensions +daily_avg = solution.resample(time='D').mean() + +# Convert to pandas +df = solution['Boiler(Q_th)|flow_rate'].to_dataframe() + +# Export to various formats +solution.to_netcdf('full_solution.nc') +df.to_csv('boiler_flow.csv') +``` + +## Complete Example + +```python +import flixopt as fx +import pandas as pd + +# Build and optimize +timesteps = pd.date_range('2024-01-01', periods=168, freq='h') +flow_system = fx.FlowSystem(timesteps) +# ... add elements ... 
+flow_system.optimize(fx.solvers.HighsSolver()) + +# Visualize network structure +flow_system.topology.plot(path='system_network.html') + +# Analyze results +print("=== Flow Statistics ===") +print(flow_system.statistics.flow_hours) + +print("\n=== Effect Breakdown ===") +print(flow_system.statistics.total_effects) + +# Create plots +flow_system.statistics.plot.balance('HeatBus') +flow_system.statistics.plot.heatmap('Boiler(Q_th)|flow_rate') + +# Save for later +flow_system.to_netcdf('results/optimized_system.nc') +``` -For now, see: +## Next Steps -- **[Examples](../../examples/index.md)** - Result analysis patterns in working code +- [Plotting Results](../results-plotting.md) - Detailed plotting documentation +- [Examples](../../examples/index.md) - Working code examples diff --git a/examples/00_Minmal/minimal_example.py b/examples/00_Minmal/minimal_example.py index 7a94b2222..207faa9a9 100644 --- a/examples/00_Minmal/minimal_example.py +++ b/examples/00_Minmal/minimal_example.py @@ -32,5 +32,5 @@ ), ) - optimization = fx.Optimization('Simulation1', flow_system).solve(fx.solvers.HighsSolver(0.01, 60)) - optimization.results['Heat'].plot_node_balance() + flow_system.optimize(fx.solvers.HighsSolver(0.01, 60)) + flow_system.statistics.plot.balance('Heat') diff --git a/examples/01_Simple/simple_example.py b/examples/01_Simple/simple_example.py index c2d6d88e1..13781c973 100644 --- a/examples/01_Simple/simple_example.py +++ b/examples/01_Simple/simple_example.py @@ -100,28 +100,22 @@ flow_system.add_elements(costs, CO2, boiler, storage, chp, heat_sink, gas_source, power_sink) # Visualize the flow system for validation purposes - flow_system.plot_network() + flow_system.topology.plot() - # --- Define and Run Calculation --- - # Create a calculation object to model the Flow System - optimization = fx.Optimization(name='Sim1', flow_system=flow_system) - optimization.do_modeling() # Translate the model to a solvable form, creating equations and Variables - - # --- Solve 
the Calculation and Save Results --- - optimization.solve(fx.solvers.HighsSolver(mip_gap=0, time_limit_seconds=30)) + # --- Define and Solve Optimization --- + flow_system.optimize(fx.solvers.HighsSolver(mip_gap=0, time_limit_seconds=30)) # --- Analyze Results --- - # Colors are automatically assigned using default colormap - # Optional: Configure custom colors with - optimization.results.setup_colors() - optimization.results['Fernwärme'].plot_node_balance_pie() - optimization.results['Fernwärme'].plot_node_balance() - optimization.results['Storage'].plot_charge_state() - optimization.results.plot_heatmap('CHP(Q_th)|flow_rate') - - # Convert the results for the storage component to a dataframe and display - df = optimization.results['Storage'].node_balance_with_charge_state() - print(df) - - # Save results to file for later usage - optimization.results.to_file() + # Plotting through statistics accessor - returns PlotResult with .data and .figure + flow_system.statistics.plot.balance('Fernwärme') + flow_system.statistics.plot.balance('Storage') + flow_system.statistics.plot.heatmap('CHP(Q_th)|flow_rate') + flow_system.statistics.plot.heatmap('Storage|charge_state') + + # Access data as xarray Datasets + print(flow_system.statistics.flow_rates) + print(flow_system.statistics.charge_states) + + # Duration curve and effects analysis + flow_system.statistics.plot.duration_curve('Boiler(Q_th)|flow_rate') + print(flow_system.statistics.temporal_effects) diff --git a/examples/02_Complex/complex_example.py b/examples/02_Complex/complex_example.py index 3806fde40..f1b524a2b 100644 --- a/examples/02_Complex/complex_example.py +++ b/examples/02_Complex/complex_example.py @@ -15,7 +15,6 @@ check_penalty = False imbalance_penalty = 1e5 use_chp_with_piecewise_conversion = True - time_indices = None # Define specific time steps for custom optimizations, or use the entire series # --- Define Demand and Price Profiles --- # Input data for electricity and heat demands, as well as 
electricity price @@ -189,22 +188,19 @@ print(flow_system) # Get a string representation of the FlowSystem try: - flow_system.start_network_app() # Start the network app + flow_system.topology.start_app() # Start the network app except ImportError as e: print(f'Network app requires extra dependencies: {e}') # --- Solve FlowSystem --- - optimization = fx.Optimization('complex example', flow_system, time_indices) - optimization.do_modeling() - - optimization.solve(fx.solvers.HighsSolver(0.01, 60)) + flow_system.optimize(fx.solvers.HighsSolver(0.01, 60)) # --- Results --- - # You can analyze results directly or save them to file and reload them later. - optimization.results.to_file() - - # But let's plot some results anyway - optimization.results.plot_heatmap('BHKW2(Q_th)|flow_rate') - optimization.results['BHKW2'].plot_node_balance() - optimization.results['Speicher'].plot_charge_state() - optimization.results['Fernwärme'].plot_node_balance_pie() + # Save the flow system with solution to file for later analysis + flow_system.to_netcdf('results/complex_example.nc') + + # Plot results using the statistics accessor + flow_system.statistics.plot.heatmap('BHKW2(Q_th)|flow_rate') + flow_system.statistics.plot.balance('BHKW2') + flow_system.statistics.plot.heatmap('Speicher|charge_state') + flow_system.statistics.plot.balance('Fernwärme') diff --git a/examples/02_Complex/complex_example_results.py b/examples/02_Complex/complex_example_results.py index c4e9bb4f2..6978caff1 100644 --- a/examples/02_Complex/complex_example_results.py +++ b/examples/02_Complex/complex_example_results.py @@ -1,5 +1,5 @@ """ -This script shows how load results of a prior calcualtion and how to analyze them. +This script shows how to load results of a prior optimization and how to analyze them. 
""" import flixopt as fx @@ -7,31 +7,32 @@ if __name__ == '__main__': fx.CONFIG.exploring() - # --- Load Results --- + # --- Load FlowSystem with Solution --- try: - results = fx.results.Results.from_file('results', 'complex example') + flow_system = fx.FlowSystem.from_netcdf('results/complex_example.nc') except FileNotFoundError as e: raise FileNotFoundError( - f"Results file not found in the specified directory ('results'). " + f"Results file not found ('results/complex_example.nc'). " f"Please ensure that the file is generated by running 'complex_example.py'. " f'Original error: {e}' ) from e # --- Basic overview --- - results.plot_network() - results['Fernwärme'].plot_node_balance() + flow_system.topology.plot() + flow_system.statistics.plot.balance('Fernwärme') # --- Detailed Plots --- - # In depth plot for individual flow rates ('__' is used as the delimiter between Component and Flow - results.plot_heatmap('Wärmelast(Q_th_Last)|flow_rate') - for bus in results.buses.values(): - bus.plot_node_balance_pie(show=False, save=f'results/{bus.label}--pie.html') - bus.plot_node_balance(show=False, save=f'results/{bus.label}--balance.html') + # In-depth plot for individual flow rates + flow_system.statistics.plot.heatmap('Wärmelast(Q_th_Last)|flow_rate') + + # Plot balances for all buses + for bus in flow_system.buses.values(): + flow_system.statistics.plot.balance(bus.label).to_html(f'results/{bus.label}--balance.html') # --- Plotting internal variables manually --- - results.plot_heatmap('BHKW2(Q_th)|status') - results.plot_heatmap('Kessel(Q_th)|status') + flow_system.statistics.plot.heatmap('BHKW2(Q_th)|status') + flow_system.statistics.plot.heatmap('Kessel(Q_th)|status') - # Dataframes from results: - fw_bus = results['Fernwärme'].node_balance().to_dataframe() - all = results.solution.to_dataframe() + # Access data as DataFrames: + print(flow_system.statistics.flow_rates.to_dataframe()) + print(flow_system.solution.to_dataframe()) diff --git 
a/examples/03_Optimization_modes/example_optimization_modes.py b/examples/03_Optimization_modes/example_optimization_modes.py index 8f26d84b4..3dcd8bd1c 100644 --- a/examples/03_Optimization_modes/example_optimization_modes.py +++ b/examples/03_Optimization_modes/example_optimization_modes.py @@ -16,9 +16,11 @@ def get_solutions(optimizations: list, variable: str) -> xr.Dataset: dataarrays = [] for optimization in optimizations: if optimization.name == 'Segmented': + # SegmentedOptimization requires special handling to remove overlaps dataarrays.append(optimization.results.solution_without_overlap(variable).rename(optimization.name)) else: - dataarrays.append(optimization.results.solution[variable].rename(optimization.name)) + # For Full and Clustered, access solution from the flow_system + dataarrays.append(optimization.flow_system.solution[variable].rename(optimization.name)) return xr.merge(dataarrays, join='outer') @@ -176,7 +178,7 @@ def get_solutions(optimizations: list, variable: str) -> xr.Dataset: a_kwk, a_speicher, ) - flow_system.plot_network() + flow_system.topology.plot() # Optimizations optimizations: list[fx.Optimization | fx.ClusteredOptimization | fx.SegmentedOptimization] = [] diff --git a/examples/04_Scenarios/scenario_example.py b/examples/04_Scenarios/scenario_example.py index 672df5c7f..e3c6f5fd3 100644 --- a/examples/04_Scenarios/scenario_example.py +++ b/examples/04_Scenarios/scenario_example.py @@ -120,7 +120,7 @@ thermal_flow=fx.Flow( label='Q_th', bus='Fernwärme', - size=50, + size=100, relative_minimum=0.1, relative_maximum=1, status_parameters=fx.StatusParameters(), @@ -135,7 +135,7 @@ thermal_efficiency=0.48, # Realistic thermal efficiency (48%) electrical_efficiency=0.40, # Realistic electrical efficiency (40%) electrical_flow=fx.Flow( - 'P_el', bus='Strom', size=60, relative_minimum=5 / 60, status_parameters=fx.StatusParameters() + 'P_el', bus='Strom', size=80, relative_minimum=5 / 80, status_parameters=fx.StatusParameters() ), 
thermal_flow=fx.Flow('Q_th', bus='Fernwärme'), fuel_flow=fx.Flow('Q_fu', bus='Gas'), @@ -192,35 +192,18 @@ flow_system.add_elements(costs, CO2, boiler, storage, chp, heat_sink, gas_source, power_sink) # Visualize the flow system for validation purposes - flow_system.plot_network() - - # --- Define and Run Calculation --- - # Create a calculation object to model the Flow System - optimization = fx.Optimization(name='Sim1', flow_system=flow_system) - optimization.do_modeling() # Translate the model to a solvable form, creating equations and Variables - - # --- Solve the Calculation and Save Results --- - optimization.solve(fx.solvers.HighsSolver(mip_gap=0, time_limit_seconds=30)) - - optimization.results.setup_colors( - { - 'CHP': 'red', - 'Greys': ['Gastarif', 'Einspeisung', 'Heat Demand'], - 'Storage': 'blue', - 'Boiler': 'orange', - } - ) + flow_system.topology.plot() - optimization.results.plot_heatmap('CHP(Q_th)|flow_rate') + # --- Define and Solve Optimization --- + flow_system.optimize(fx.solvers.HighsSolver(mip_gap=0, time_limit_seconds=30)) # --- Analyze Results --- - optimization.results['Fernwärme'].plot_node_balance(mode='stacked_bar') - optimization.results.plot_heatmap('CHP(Q_th)|flow_rate') - optimization.results['Storage'].plot_charge_state() - optimization.results['Fernwärme'].plot_node_balance_pie(select={'period': 2020, 'scenario': 'Base Case'}) - - # Convert the results for the storage component to a dataframe and display - df = optimization.results['Storage'].node_balance_with_charge_state() - - # Save results to file for later usage - optimization.results.to_file() + # Plotting through statistics accessor - returns PlotResult with .data and .figure + flow_system.statistics.plot.heatmap('CHP(Q_th)|flow_rate') + flow_system.statistics.plot.balance('Fernwärme') + flow_system.statistics.plot.balance('Storage') + flow_system.statistics.plot.heatmap('Storage|charge_state') + + # Access data as xarray Datasets + print(flow_system.statistics.flow_rates) 
+ print(flow_system.statistics.charge_states) diff --git a/examples/05_Two-stage-optimization/two_stage_optimization.py b/examples/05_Two-stage-optimization/two_stage_optimization.py index 9e102c44f..8dea1713b 100644 --- a/examples/05_Two-stage-optimization/two_stage_optimization.py +++ b/examples/05_Two-stage-optimization/two_stage_optimization.py @@ -53,7 +53,7 @@ label='Q_fu', bus='Gas', size=fx.InvestParameters( - effects_of_investment_per_size={'costs': 1_000}, minimum_size=10, maximum_size=500 + effects_of_investment_per_size={'costs': 1_000}, minimum_size=10, maximum_size=600 ), relative_minimum=0.2, previous_flow_rate=20, @@ -87,8 +87,8 @@ eta_discharge=1, relative_loss_per_hour=0.001, prevent_simultaneous_charge_and_discharge=True, - charging=fx.Flow('Q_th_load', size=137, bus='Fernwärme'), - discharging=fx.Flow('Q_th_unload', size=158, bus='Fernwärme'), + charging=fx.Flow('Q_th_load', size=200, bus='Fernwärme'), + discharging=fx.Flow('Q_th_unload', size=200, bus='Fernwärme'), ), fx.Sink( 'Wärmelast', inputs=[fx.Flow('Q_th_Last', bus='Fernwärme', size=1, fixed_relative_profile=heat_demand)] @@ -122,34 +122,39 @@ ) # Separate optimization of flow sizes and dispatch + # Stage 1: Optimize sizes using downsampled (2h) data start = timeit.default_timer() calculation_sizing = fx.Optimization('Sizing', flow_system.resample('2h')) calculation_sizing.do_modeling() calculation_sizing.solve(fx.solvers.HighsSolver(0.1 / 100, 60)) timer_sizing = timeit.default_timer() - start + # Stage 2: Optimize dispatch with fixed sizes from Stage 1 start = timeit.default_timer() calculation_dispatch = fx.Optimization('Dispatch', flow_system) calculation_dispatch.do_modeling() - calculation_dispatch.fix_sizes(calculation_sizing.results.solution) + calculation_dispatch.fix_sizes(calculation_sizing.flow_system.solution) calculation_dispatch.solve(fx.solvers.HighsSolver(0.1 / 100, 60)) timer_dispatch = timeit.default_timer() - start - if (calculation_dispatch.results.sizes().round(5) 
== calculation_sizing.results.sizes().round(5)).all().item(): + # Verify sizes were correctly fixed + dispatch_sizes = calculation_dispatch.flow_system.statistics.sizes + sizing_sizes = calculation_sizing.flow_system.statistics.sizes + if (dispatch_sizes.round(5).to_dataarray() == sizing_sizes.round(5).to_dataarray()).all().item(): logger.info('Sizes were correctly equalized') else: raise RuntimeError('Sizes were not correctly equalized') - # Optimization of both flow sizes and dispatch together + # Combined optimization: optimize both sizes and dispatch together start = timeit.default_timer() calculation_combined = fx.Optimization('Combined', flow_system) calculation_combined.do_modeling() calculation_combined.solve(fx.solvers.HighsSolver(0.1 / 100, 600)) timer_combined = timeit.default_timer() - start - # Comparison of results + # Comparison of results - access solutions from flow_system comparison = xr.concat( - [calculation_combined.results.solution, calculation_dispatch.results.solution], dim='mode' + [calculation_combined.flow_system.solution, calculation_dispatch.flow_system.solution], dim='mode' ).assign_coords(mode=['Combined', 'Two-stage']) comparison['Duration [s]'] = xr.DataArray([timer_combined, timer_sizing + timer_dispatch], dims='mode') diff --git a/flixopt/clustering.py b/flixopt/clustering.py index 1c6f7511b..1595ace5d 100644 --- a/flixopt/clustering.py +++ b/flixopt/clustering.py @@ -7,7 +7,6 @@ import copy import logging -import pathlib import timeit from typing import TYPE_CHECKING @@ -29,6 +28,8 @@ ) if TYPE_CHECKING: + import pathlib + import linopy import pandas as pd import plotly.graph_objects as go @@ -145,7 +146,7 @@ def use_extreme_periods(self): return self.time_series_for_high_peaks or self.time_series_for_low_peaks def plot(self, colormap: str | None = None, show: bool = True, save: pathlib.Path | None = None) -> go.Figure: - from . 
import plotting + import plotly.express as px df_org = self.original_data.copy().rename( columns={col: f'Original - {col}' for col in self.original_data.columns} @@ -156,10 +157,17 @@ def plot(self, colormap: str | None = None, show: bool = True, save: pathlib.Pat colors = list( process_colors(colormap or CONFIG.Plotting.default_qualitative_colorscale, list(df_org.columns)).values() ) - fig = plotting.with_plotly(df_org.to_xarray(), 'line', colors=colors, xlabel='Time in h') + + # Create line plot for original data (dashed) + index_name = df_org.index.name or 'index' + df_org_long = df_org.reset_index().melt(id_vars=index_name, var_name='variable', value_name='value') + fig = px.line(df_org_long, x=index_name, y='value', color='variable', color_discrete_sequence=colors) for trace in fig.data: - trace.update(dict(line=dict(dash='dash'))) - fig2 = plotting.with_plotly(df_agg.to_xarray(), 'line', colors=colors, xlabel='Time in h') + trace.update(line=dict(dash='dash')) + + # Add aggregated data (solid lines) + df_agg_long = df_agg.reset_index().melt(id_vars=index_name, var_name='variable', value_name='value') + fig2 = px.line(df_agg_long, x=index_name, y='value', color='variable', color_discrete_sequence=colors) for trace in fig2.data: fig.add_trace(trace) @@ -169,14 +177,10 @@ def plot(self, colormap: str | None = None, show: bool = True, save: pathlib.Pat yaxis_title='Value', ) - plotting.export_figure( - figure_like=fig, - default_path=pathlib.Path('aggregated data.html'), - default_filetype='.html', - user_path=save, - show=show, - save=save is not None, - ) + if save is not None: + fig.write_html(str(save)) + if show: + fig.show() return fig diff --git a/flixopt/color_processing.py b/flixopt/color_processing.py index 2959acc82..f6e9a3b9f 100644 --- a/flixopt/color_processing.py +++ b/flixopt/color_processing.py @@ -15,6 +15,57 @@ logger = logging.getLogger('flixopt') +# Type alias for flexible color input +ColorType = str | list[str] | dict[str, str] +"""Flexible 
color specification type supporting multiple input formats for visualization. + +Color specifications can take several forms to accommodate different use cases: + +**Named colorscales** (str): + - Standard colorscales: 'turbo', 'plasma', 'cividis', 'tab10', 'Set1' + - Energy-focused: 'portland' (custom flixopt colorscale for energy systems) + - Backend-specific maps available in Plotly and Matplotlib + +**Color Lists** (list[str]): + - Explicit color sequences: ['red', 'blue', 'green', 'orange'] + - HEX codes: ['#FF0000', '#0000FF', '#00FF00', '#FFA500'] + - Mixed formats: ['red', '#0000FF', 'green', 'orange'] + +**Label-to-Color Mapping** (dict[str, str]): + - Explicit associations: {'Wind': 'skyblue', 'Solar': 'gold', 'Gas': 'brown'} + - Ensures consistent colors across different plots and datasets + - Ideal for energy system components with semantic meaning + +Examples: + ```python + # Named colorscale + colors = 'turbo' # Automatic color generation + + # Explicit color list + colors = ['red', 'blue', 'green', '#FFD700'] + + # Component-specific mapping + colors = { + 'Wind_Turbine': 'skyblue', + 'Solar_Panel': 'gold', + 'Natural_Gas': 'brown', + 'Battery': 'green', + 'Electric_Load': 'darkred' + } + ``` + +Color Format Support: + - **Named Colors**: 'red', 'blue', 'forestgreen', 'darkorange' + - **HEX Codes**: '#FF0000', '#0000FF', '#228B22', '#FF8C00' + - **RGB Tuples**: (255, 0, 0), (0, 0, 255) [Matplotlib only] + - **RGBA**: 'rgba(255,0,0,0.8)' [Plotly only] + +References: + - HTML Color Names: https://htmlcolorcodes.com/color-names/ + - Matplotlib colorscales: https://matplotlib.org/stable/tutorials/colors/colorscales.html + - Plotly Built-in Colorscales: https://plotly.com/python/builtin-colorscales/ +""" + def _rgb_string_to_hex(color: str) -> str: """Convert Plotly RGB/RGBA string format to hex. 
diff --git a/flixopt/flow_system.py b/flixopt/flow_system.py index 9906fd27a..5fda024f7 100644 --- a/flixopt/flow_system.py +++ b/flixopt/flow_system.py @@ -5,6 +5,7 @@ from __future__ import annotations import logging +import pathlib import warnings from collections import defaultdict from itertools import chain @@ -15,7 +16,7 @@ import xarray as xr from . import io as fx_io -from .config import CONFIG +from .config import CONFIG, DEPRECATION_REMOVAL_VERSION from .core import ( ConversionError, DataConverter, @@ -25,11 +26,12 @@ from .effects import Effect, EffectCollection from .elements import Bus, Component, Flow from .optimize_accessor import OptimizeAccessor +from .statistics_accessor import StatisticsAccessor from .structure import CompositeContainerMixin, Element, ElementContainer, FlowSystemModel, Interface +from .topology_accessor import TopologyAccessor from .transform_accessor import TransformAccessor if TYPE_CHECKING: - import pathlib from collections.abc import Collection import pyvis @@ -166,6 +168,7 @@ def __init__( scenario_weights: Numeric_S | None = None, scenario_independent_sizes: bool | list[str] = True, scenario_independent_flow_rates: bool | list[str] = False, + name: str | None = None, ): self.timesteps = self._validate_timesteps(timesteps) @@ -206,15 +209,21 @@ def __init__( self._flows_cache: ElementContainer[Flow] | None = None # Solution dataset - populated after optimization or loaded from file - self.solution: xr.Dataset | None = None + self._solution: xr.Dataset | None = None # Clustering info - populated by transform.cluster() self._clustering_info: dict | None = None + # Statistics accessor cache - lazily initialized, invalidated on new solution + self._statistics: StatisticsAccessor | None = None + # Use properties to validate and store scenario dimension settings self.scenario_independent_sizes = scenario_independent_sizes self.scenario_independent_flow_rates = scenario_independent_flow_rates + # Optional name for identification 
(derived from filename on load) + self.name = name + @staticmethod def _validate_timesteps(timesteps: pd.DatetimeIndex) -> pd.DatetimeIndex: """Validate timesteps format and rename if needed.""" @@ -654,22 +663,53 @@ def from_dataset(cls, ds: xr.Dataset) -> FlowSystem: return flow_system - def to_netcdf(self, path: str | pathlib.Path, compression: int = 0): + def to_netcdf(self, path: str | pathlib.Path, compression: int = 0, overwrite: bool = True): """ Save the FlowSystem to a NetCDF file. Ensures FlowSystem is connected before saving. + The FlowSystem's name is automatically set from the filename + (without extension) when saving. + Args: - path: The path to the netCDF file. - compression: The compression level to use when saving the file. + path: The path to the netCDF file. Parent directories are created if they don't exist. + compression: The compression level to use when saving the file (0-9). + overwrite: If True (default), overwrite existing file. If False, raise error if file exists. + + Raises: + FileExistsError: If overwrite=False and file already exists. """ if not self.connected_and_transformed: logger.warning('FlowSystem is not connected. Calling connect_and_transform() now.') self.connect_and_transform() - super().to_netcdf(path, compression) + path = pathlib.Path(path) + # Set name from filename (without extension) + self.name = path.stem + + super().to_netcdf(path, compression, overwrite) logger.info(f'Saved FlowSystem to {path}') + @classmethod + def from_netcdf(cls, path: str | pathlib.Path) -> FlowSystem: + """ + Load a FlowSystem from a NetCDF file. + + The FlowSystem's name is automatically derived from the filename + (without extension), overriding any name that may have been stored. 
+ + Args: + path: Path to the NetCDF file + + Returns: + FlowSystem instance with name set from filename + """ + path = pathlib.Path(path) + flow_system = super().from_netcdf(path) + # Derive name from filename (without extension) + flow_system.name = path.stem + return flow_system + def get_structure(self, clean: bool = False, stats: bool = False) -> dict: """ Get FlowSystem structure. @@ -913,7 +953,7 @@ def solve(self, solver: _Solver) -> FlowSystem: **solver.options, ) - if self.model.termination_condition == 'infeasible': + if 'infeasible' in self.model.termination_condition: if CONFIG.Solving.compute_infeasibilities: import io from contextlib import redirect_stdout @@ -935,6 +975,17 @@ def solve(self, solver: _Solver) -> FlowSystem: return self + @property + def solution(self) -> xr.Dataset | None: + """Get the solution dataset.""" + return self._solution + + @solution.setter + def solution(self, value: xr.Dataset | None) -> None: + """Set the solution dataset and invalidate statistics cache.""" + self._solution = value + self._statistics = None # Invalidate cached statistics + @property def optimize(self) -> OptimizeAccessor: """ @@ -986,6 +1037,62 @@ def transform(self) -> TransformAccessor: """ return TransformAccessor(self) + @property + def statistics(self) -> StatisticsAccessor: + """ + Access statistics and plotting methods for optimization results. + + This property returns a StatisticsAccessor that provides methods to analyze + and visualize optimization results stored in this FlowSystem's solution. + + Note: + The FlowSystem must have a solution (from optimize() or solve()) before + most statistics methods can be used. + + Returns: + A cached StatisticsAccessor instance. 
+ + Examples: + After optimization: + + >>> flow_system.optimize(solver) + >>> flow_system.statistics.plot.balance('ElectricityBus') + >>> flow_system.statistics.plot.heatmap('Boiler|on') + >>> ds = flow_system.statistics.flow_rates # Get data for analysis + """ + if self._statistics is None: + self._statistics = StatisticsAccessor(self) + return self._statistics + + @property + def topology(self) -> TopologyAccessor: + """ + Access network topology inspection and visualization methods. + + This property returns a TopologyAccessor that provides methods to inspect + the network structure and visualize it. + + Returns: + A TopologyAccessor instance. + + Examples: + Visualize the network: + + >>> flow_system.topology.plot() + >>> flow_system.topology.plot(path='my_network.html', show=True) + + Interactive visualization: + + >>> flow_system.topology.start_app() + >>> # ... interact with the visualization ... + >>> flow_system.topology.stop_app() + + Get network structure info: + + >>> nodes, edges = flow_system.topology.infos() + """ + return TopologyAccessor(self) + def plot_network( self, path: bool | str | pathlib.Path = 'flow_system.html', @@ -996,114 +1103,59 @@ def plot_network( show: bool | None = None, ) -> pyvis.network.Network | None: """ - Visualizes the network structure of a FlowSystem using PyVis, saving it as an interactive HTML file. - - Args: - path: Path to save the HTML visualization. - - `False`: Visualization is created but not saved. - - `str` or `Path`: Specifies file path (default: 'flow_system.html'). - controls: UI controls to add to the visualization. - - `True`: Enables all available controls. - - `List`: Specify controls, e.g., ['nodes', 'layout']. - - Options: 'nodes', 'edges', 'layout', 'interaction', 'manipulation', 'physics', 'selection', 'renderer'. - show: Whether to open the visualization in the web browser. 
- - Returns: - - 'pyvis.network.Network' | None: The `Network` instance representing the visualization, or `None` if `pyvis` is not installed. - - Examples: - >>> flow_system.plot_network() - >>> flow_system.plot_network(show=False) - >>> flow_system.plot_network(path='output/custom_network.html', controls=['nodes', 'layout']) + Deprecated: Use `flow_system.topology.plot()` instead. - Notes: - - This function requires `pyvis`. If not installed, the function prints a warning and returns `None`. - - Nodes are styled based on type (e.g., circles for buses, boxes for components) and annotated with node information. + Visualizes the network structure of a FlowSystem using PyVis. """ - from . import plotting - - node_infos, edge_infos = self.network_infos() - return plotting.plot_network( - node_infos, edge_infos, path, controls, show if show is not None else CONFIG.Plotting.default_show + warnings.warn( + f'plot_network() is deprecated and will be removed in v{DEPRECATION_REMOVAL_VERSION}. ' + 'Use flow_system.topology.plot() instead.', + DeprecationWarning, + stacklevel=2, ) + return self.topology.plot(path=path, controls=controls, show=show) - def start_network_app(self): - """Visualizes the network structure of a FlowSystem using Dash, Cytoscape, and networkx. - Requires optional dependencies: dash, dash-cytoscape, dash-daq, networkx, flask, werkzeug. + def start_network_app(self) -> None: """ - from .network_app import DASH_CYTOSCAPE_AVAILABLE, VISUALIZATION_ERROR, flow_graph, shownetwork + Deprecated: Use `flow_system.topology.start_app()` instead. + Visualizes the network structure using Dash and Cytoscape. + """ warnings.warn( - 'The network visualization is still experimental and might change in the future.', + f'start_network_app() is deprecated and will be removed in v{DEPRECATION_REMOVAL_VERSION}. 
' + 'Use flow_system.topology.start_app() instead.', + DeprecationWarning, stacklevel=2, - category=UserWarning, ) + self.topology.start_app() - if not DASH_CYTOSCAPE_AVAILABLE: - raise ImportError( - f'Network visualization requires optional dependencies. ' - f'Install with: `pip install flixopt[network_viz]`, `pip install flixopt[full]` ' - f'or: `pip install dash dash-cytoscape dash-daq networkx werkzeug`. ' - f'Original error: {VISUALIZATION_ERROR}' - ) - - if not self._connected_and_transformed: - self._connect_network() - - if self._network_app is not None: - logger.warning('The network app is already running. Restarting it.') - self.stop_network_app() - - self._network_app = shownetwork(flow_graph(self)) - - def stop_network_app(self): - """Stop the network visualization server.""" - from .network_app import DASH_CYTOSCAPE_AVAILABLE, VISUALIZATION_ERROR - - if not DASH_CYTOSCAPE_AVAILABLE: - raise ImportError( - f'Network visualization requires optional dependencies. ' - f'Install with: `pip install flixopt[network_viz]`, `pip install flixopt[full]` ' - f'or: `pip install dash dash-cytoscape dash-daq networkx werkzeug`. ' - f'Original error: {VISUALIZATION_ERROR}' - ) - - if self._network_app is None: - logger.warning("No network app is currently running. Can't stop it") - return + def stop_network_app(self) -> None: + """ + Deprecated: Use `flow_system.topology.stop_app()` instead. - try: - logger.info('Stopping network visualization server...') - self._network_app.server_instance.shutdown() - logger.info('Network visualization stopped.') - except Exception as e: - logger.error(f'Failed to stop the network visualization app: {e}') - finally: - self._network_app = None + Stop the network visualization server. + """ + warnings.warn( + f'stop_network_app() is deprecated and will be removed in v{DEPRECATION_REMOVAL_VERSION}. 
' + 'Use flow_system.topology.stop_app() instead.', + DeprecationWarning, + stacklevel=2, + ) + self.topology.stop_app() def network_infos(self) -> tuple[dict[str, dict[str, str]], dict[str, dict[str, str]]]: - if not self.connected_and_transformed: - self.connect_and_transform() - nodes = { - node.label_full: { - 'label': node.label, - 'class': 'Bus' if isinstance(node, Bus) else 'Component', - 'infos': node.__str__(), - } - for node in chain(self.components.values(), self.buses.values()) - } - - edges = { - flow.label_full: { - 'label': flow.label, - 'start': flow.bus if flow.is_input_in_component else flow.component, - 'end': flow.component if flow.is_input_in_component else flow.bus, - 'infos': flow.__str__(), - } - for flow in self.flows.values() - } + """ + Deprecated: Use `flow_system.topology.infos()` instead. - return nodes, edges + Get network topology information as dictionaries. + """ + warnings.warn( + f'network_infos() is deprecated and will be removed in v{DEPRECATION_REMOVAL_VERSION}. ' + 'Use flow_system.topology.infos() instead.', + DeprecationWarning, + stacklevel=2, + ) + return self.topology.infos() def _check_if_element_is_unique(self, element: Element) -> None: """ diff --git a/flixopt/plotting.py b/flixopt/plotting.py index 0a8dfbc9b..db5a3eb5c 100644 --- a/flixopt/plotting.py +++ b/flixopt/plotting.py @@ -39,7 +39,7 @@ import plotly.offline import xarray as xr -from .color_processing import process_colors +from .color_processing import ColorType, process_colors from .config import CONFIG if TYPE_CHECKING: @@ -66,56 +66,6 @@ plt.register_cmap(name='portland', cmap=mcolors.LinearSegmentedColormap.from_list('portland', _portland_colors)) -ColorType = str | list[str] | dict[str, str] -"""Flexible color specification type supporting multiple input formats for visualization. 
- -Color specifications can take several forms to accommodate different use cases: - -**Named colorscales** (str): - - Standard colorscales: 'turbo', 'plasma', 'cividis', 'tab10', 'Set1' - - Energy-focused: 'portland' (custom flixopt colorscale for energy systems) - - Backend-specific maps available in Plotly and Matplotlib - -**Color Lists** (list[str]): - - Explicit color sequences: ['red', 'blue', 'green', 'orange'] - - HEX codes: ['#FF0000', '#0000FF', '#00FF00', '#FFA500'] - - Mixed formats: ['red', '#0000FF', 'green', 'orange'] - -**Label-to-Color Mapping** (dict[str, str]): - - Explicit associations: {'Wind': 'skyblue', 'Solar': 'gold', 'Gas': 'brown'} - - Ensures consistent colors across different plots and datasets - - Ideal for energy system components with semantic meaning - -Examples: - ```python - # Named colorscale - colors = 'turbo' # Automatic color generation - - # Explicit color list - colors = ['red', 'blue', 'green', '#FFD700'] - - # Component-specific mapping - colors = { - 'Wind_Turbine': 'skyblue', - 'Solar_Panel': 'gold', - 'Natural_Gas': 'brown', - 'Battery': 'green', - 'Electric_Load': 'darkred' - } - ``` - -Color Format Support: - - **Named Colors**: 'red', 'blue', 'forestgreen', 'darkorange' - - **HEX Codes**: '#FF0000', '#0000FF', '#228B22', '#FF8C00' - - **RGB Tuples**: (255, 0, 0), (0, 0, 255) [Matplotlib only] - - **RGBA**: 'rgba(255,0,0,0.8)' [Plotly only] - -References: - - HTML Color Names: https://htmlcolorcodes.com/color-names/ - - Matplotlib colorscales: https://matplotlib.org/stable/tutorials/colors/colorscales.html - - Plotly Built-in Colorscales: https://plotly.com/python/builtin-colorscales/ -""" - PlottingEngine = Literal['plotly', 'matplotlib'] """Identifier for the plotting engine to use.""" @@ -1192,6 +1142,57 @@ def draw_pie(ax, labels, values, subtitle): return fig, axes +def heatmap_with_plotly_v2( + data: xr.DataArray, + colors: ColorType | None = None, + title: str = '', + facet_col: str | None = None, + 
animation_frame: str | None = None, + facet_col_wrap: int | None = None, + **imshow_kwargs: Any, +) -> go.Figure: + """ + Plot a heatmap using Plotly's imshow. + + Data should be prepared with dims in order: (y_axis, x_axis, [facet_col], [animation_frame]). + Use reshape_data_for_heatmap() to prepare time-series data before calling this. + + Args: + data: DataArray with 2-4 dimensions. First two are heatmap axes. + colors: Colorscale name ('viridis', 'plasma', etc.). + title: Plot title. + facet_col: Dimension name for subplot columns (3rd dim). + animation_frame: Dimension name for animation (4th dim). + facet_col_wrap: Max columns before wrapping (only if < n_facets). + **imshow_kwargs: Additional args for px.imshow. + + Returns: + Plotly Figure object. + """ + if data.size == 0: + return go.Figure() + + colors = colors or CONFIG.Plotting.default_sequential_colorscale + facet_col_wrap = facet_col_wrap or CONFIG.Plotting.default_facet_cols + + imshow_args: dict[str, Any] = { + 'img': data, + 'color_continuous_scale': colors, + 'title': title, + **imshow_kwargs, + } + + if facet_col and facet_col in data.dims: + imshow_args['facet_col'] = facet_col + if facet_col_wrap < data.sizes[facet_col]: + imshow_args['facet_col_wrap'] = facet_col_wrap + + if animation_frame and animation_frame in data.dims: + imshow_args['animation_frame'] = animation_frame + + return px.imshow(**imshow_args) + + def heatmap_with_plotly( data: xr.DataArray, colors: ColorType | None = None, diff --git a/flixopt/results.py b/flixopt/results.py index 99944b98c..edcbb7a87 100644 --- a/flixopt/results.py +++ b/flixopt/results.py @@ -394,7 +394,7 @@ def setup_colors( def get_all_variable_names(comp: str) -> list[str]: """Collect all variables from the component, including flows and flow_hours.""" comp_object = self.components[comp] - var_names = [comp] + list(comp_object._variable_names) + var_names = [comp] + list(comp_object.variable_names) for flow in comp_object.flows: var_names.extend([flow, 
f'{flow}|flow_hours']) return var_names @@ -549,21 +549,40 @@ def flow_rates( ) -> xr.DataArray: """Returns a DataArray containing the flow rates of each Flow. - Args: - start: Optional source node(s) to filter by. Can be a single node name or a list of names. - end: Optional destination node(s) to filter by. Can be a single node name or a list of names. - component: Optional component(s) to filter by. Can be a single component name or a list of names. + .. deprecated:: + Use `results.plot.all_flow_rates` (Dataset) or + `results.flows['FlowLabel'].flow_rate` (DataArray) instead. - Further usage: - Convert the dataarray to a dataframe: - >>>results.flow_rates().to_pandas() - Get the max or min over time: - >>>results.flow_rates().max('time') - Sum up the flow rates of flows with the same start and end: - >>>results.flow_rates(end='Fernwärme').groupby('start').sum(dim='flow') - To recombine filtered dataarrays, use `xr.concat` with dim 'flow': - >>>xr.concat([results.flow_rates(start='Fernwärme'), results.flow_rates(end='Fernwärme')], dim='flow') + **Note**: The new API differs from this method: + + - Returns ``xr.Dataset`` (not ``DataArray``) with flow labels as variable names + - No ``'flow'`` dimension - each flow is a separate variable + - No filtering parameters - filter using these alternatives:: + + # Select specific flows by label + ds = results.plot.all_flow_rates + ds[['Boiler(Q_th)', 'CHP(Q_th)']] + + # Filter by substring in label + ds[[v for v in ds.data_vars if 'Boiler' in v]] + + # Filter by bus (start/end) - get flows connected to a bus + results['Fernwärme'].inputs # list of input flow labels + results['Fernwärme'].outputs # list of output flow labels + ds[results['Fernwärme'].inputs] # Dataset with only inputs to bus + + # Filter by component - get flows of a component + results['Boiler'].inputs # list of input flow labels + results['Boiler'].outputs # list of output flow labels """ + warnings.warn( + 'results.flow_rates() is deprecated. 
' + 'Use results.plot.all_flow_rates instead (returns Dataset, not DataArray). ' + 'Note: The new API has no filtering parameters and uses flow labels as variable names. ' + f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', + DeprecationWarning, + stacklevel=2, + ) if not self._has_flow_data: raise ValueError('Flow data is not available in this results object (pre-v2.2.0).') if self._flow_rates is None: @@ -584,6 +603,32 @@ def flow_hours( ) -> xr.DataArray: """Returns a DataArray containing the flow hours of each Flow. + .. deprecated:: + Use `results.plot.all_flow_hours` (Dataset) or + `results.flows['FlowLabel'].flow_rate * results.hours_per_timestep` instead. + + **Note**: The new API differs from this method: + + - Returns ``xr.Dataset`` (not ``DataArray``) with flow labels as variable names + - No ``'flow'`` dimension - each flow is a separate variable + - No filtering parameters - filter using these alternatives:: + + # Select specific flows by label + ds = results.plot.all_flow_hours + ds[['Boiler(Q_th)', 'CHP(Q_th)']] + + # Filter by substring in label + ds[[v for v in ds.data_vars if 'Boiler' in v]] + + # Filter by bus (start/end) - get flows connected to a bus + results['Fernwärme'].inputs # list of input flow labels + results['Fernwärme'].outputs # list of output flow labels + ds[results['Fernwärme'].inputs] # Dataset with only inputs to bus + + # Filter by component - get flows of a component + results['Boiler'].inputs # list of input flow labels + results['Boiler'].outputs # list of output flow labels + Flow hours represent the total energy/material transferred over time, calculated by multiplying flow rates by the duration of each timestep. @@ -603,6 +648,14 @@ def flow_hours( >>>xr.concat([results.flow_hours(start='Fernwärme'), results.flow_hours(end='Fernwärme')], dim='flow') """ + warnings.warn( + 'results.flow_hours() is deprecated. ' + 'Use results.plot.all_flow_hours instead (returns Dataset, not DataArray). 
' + 'Note: The new API has no filtering parameters and uses flow labels as variable names. ' + f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', + DeprecationWarning, + stacklevel=2, + ) if self._flow_hours is None: self._flow_hours = (self.flow_rates() * self.hours_per_timestep).rename('flow_hours') filters = {k: v for k, v in {'start': start, 'end': end, 'component': component}.items() if v is not None} @@ -615,18 +668,41 @@ def sizes( component: str | list[str] | None = None, ) -> xr.DataArray: """Returns a dataset with the sizes of the Flows. - Args: - start: Optional source node(s) to filter by. Can be a single node name or a list of names. - end: Optional destination node(s) to filter by. Can be a single node name or a list of names. - component: Optional component(s) to filter by. Can be a single component name or a list of names. - Further usage: - Convert the dataarray to a dataframe: - >>>results.sizes().to_pandas() - To recombine filtered dataarrays, use `xr.concat` with dim 'flow': - >>>xr.concat([results.sizes(start='Fernwärme'), results.sizes(end='Fernwärme')], dim='flow') + .. deprecated:: + Use `results.plot.all_sizes` (Dataset) or + `results.flows['FlowLabel'].size` (DataArray) instead. 
+ + **Note**: The new API differs from this method: + + - Returns ``xr.Dataset`` (not ``DataArray``) with flow labels as variable names + - No ``'flow'`` dimension - each flow is a separate variable + - No filtering parameters - filter using these alternatives:: + + # Select specific flows by label + ds = results.plot.all_sizes + ds[['Boiler(Q_th)', 'CHP(Q_th)']] + + # Filter by substring in label + ds[[v for v in ds.data_vars if 'Boiler' in v]] + # Filter by bus (start/end) - get flows connected to a bus + results['Fernwärme'].inputs # list of input flow labels + results['Fernwärme'].outputs # list of output flow labels + ds[results['Fernwärme'].inputs] # Dataset with only inputs to bus + + # Filter by component - get flows of a component + results['Boiler'].inputs # list of input flow labels + results['Boiler'].outputs # list of output flow labels """ + warnings.warn( + 'results.sizes() is deprecated. ' + 'Use results.plot.all_sizes instead (returns Dataset, not DataArray). ' + 'Note: The new API has no filtering parameters and uses flow labels as variable names. 
' + f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', + DeprecationWarning, + stacklevel=2, + ) if not self._has_flow_data: raise ValueError('Flow data is not available in this results object (pre-v2.2.0).') if self._sizes is None: @@ -1102,10 +1178,10 @@ class _ElementResults: def __init__(self, results: Results, label: str, variables: list[str], constraints: list[str]): self._results = results self.label = label - self._variable_names = variables + self.variable_names = variables self._constraint_names = constraints - self.solution = self._results.solution[self._variable_names] + self.solution = self._results.solution[self.variable_names] @property def variables(self) -> linopy.Variables: @@ -1116,7 +1192,7 @@ def variables(self) -> linopy.Variables: """ if self._results.model is None: raise ValueError('The linopy model is not available.') - return self._results.model.variables[self._variable_names] + return self._results.model.variables[self.variable_names] @property def constraints(self) -> linopy.Constraints: @@ -1581,7 +1657,7 @@ class ComponentResults(_NodeResults): @property def is_storage(self) -> bool: - return self._charge_state in self._variable_names + return self._charge_state in self.variable_names @property def _charge_state(self) -> str: @@ -1842,7 +1918,7 @@ def get_shares_from(self, element: str) -> xr.Dataset: Returns: xr.Dataset: Element shares to this effect. """ - return self.solution[[name for name in self._variable_names if name.startswith(f'{element}->')]] + return self.solution[[name for name in self.variable_names if name.startswith(f'{element}->')]] class FlowResults(_ElementResults): diff --git a/flixopt/statistics_accessor.py b/flixopt/statistics_accessor.py new file mode 100644 index 000000000..9f6bb01be --- /dev/null +++ b/flixopt/statistics_accessor.py @@ -0,0 +1,1406 @@ +"""Statistics accessor for FlowSystem. + +This module provides a user-friendly API for analyzing optimization results +directly from a FlowSystem. 
+ +Structure: + - `.statistics` - Data/metrics access (cached xarray Datasets) + - `.statistics.plot` - Plotting methods using the statistics data + +Example: + >>> flow_system.optimize(solver) + >>> # Data access + >>> flow_system.statistics.flow_rates + >>> flow_system.statistics.flow_hours + >>> # Plotting + >>> flow_system.statistics.plot.balance('ElectricityBus') + >>> flow_system.statistics.plot.heatmap('Boiler|on') +""" + +from __future__ import annotations + +import logging +import re +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Literal + +import numpy as np +import pandas as pd +import plotly.express as px +import plotly.graph_objects as go +import xarray as xr + +from .color_processing import ColorType, process_colors +from .config import CONFIG + +if TYPE_CHECKING: + from pathlib import Path + + from .flow_system import FlowSystem + +logger = logging.getLogger('flixopt') + +# Type aliases +SelectType = dict[str, Any] +"""xarray-style selection dict: {'time': slice(...), 'scenario': 'base'}""" + +FilterType = str | list[str] +"""For include/exclude filtering: 'Boiler' or ['Boiler', 'CHP']""" + + +def _reshape_time_for_heatmap( + data: xr.DataArray, + reshape: tuple[str, str], + fill: Literal['ffill', 'bfill'] | None = 'ffill', +) -> xr.DataArray: + """Reshape time dimension into 2D (timeframe × timestep) for heatmap display. + + Args: + data: DataArray with 'time' dimension. + reshape: Tuple of (outer_freq, inner_freq), e.g. ('D', 'h') for days × hours. + fill: Method to fill missing values after resampling. + + Returns: + DataArray with 'time' replaced by 'timestep' and 'timeframe' dimensions. 
+ """ + if 'time' not in data.dims: + return data + + timeframes, timesteps_per_frame = reshape + + # Define formats for different combinations + formats = { + ('YS', 'W'): ('%Y', '%W'), + ('YS', 'D'): ('%Y', '%j'), + ('YS', 'h'): ('%Y', '%j %H:00'), + ('MS', 'D'): ('%Y-%m', '%d'), + ('MS', 'h'): ('%Y-%m', '%d %H:00'), + ('W', 'D'): ('%Y-w%W', '%w_%A'), + ('W', 'h'): ('%Y-w%W', '%w_%A %H:00'), + ('D', 'h'): ('%Y-%m-%d', '%H:00'), + ('D', '15min'): ('%Y-%m-%d', '%H:%M'), + ('h', '15min'): ('%Y-%m-%d %H:00', '%M'), + ('h', 'min'): ('%Y-%m-%d %H:00', '%M'), + } + + format_pair = (timeframes, timesteps_per_frame) + if format_pair not in formats: + raise ValueError(f'{format_pair} is not a valid format. Choose from {list(formats.keys())}') + period_format, step_format = formats[format_pair] + + # Resample along time dimension + resampled = data.resample(time=timesteps_per_frame).mean() + + # Apply fill if specified + if fill == 'ffill': + resampled = resampled.ffill(dim='time') + elif fill == 'bfill': + resampled = resampled.bfill(dim='time') + + # Create period and step labels + time_values = pd.to_datetime(resampled.coords['time'].values) + period_labels = time_values.strftime(period_format) + step_labels = time_values.strftime(step_format) + + # Handle special case for weekly day format + if '%w_%A' in step_format: + step_labels = pd.Series(step_labels).replace('0_Sunday', '7_Sunday').values + + # Add period and step as coordinates + resampled = resampled.assign_coords({'timeframe': ('time', period_labels), 'timestep': ('time', step_labels)}) + + # Convert to multi-index and unstack + resampled = resampled.set_index(time=['timeframe', 'timestep']) + result = resampled.unstack('time') + + # Reorder: timestep, timeframe, then other dimensions + other_dims = [d for d in result.dims if d not in ['timestep', 'timeframe']] + return result.transpose('timestep', 'timeframe', *other_dims) + + +def _heatmap_figure( + data: xr.DataArray, + colors: str | list[str] | None = None, 
+ title: str = '', + facet_col: str | None = None, + animation_frame: str | None = None, + facet_col_wrap: int | None = None, + **imshow_kwargs: Any, +) -> go.Figure: + """Create heatmap figure using px.imshow. + + Args: + data: DataArray with 2-4 dimensions. First two are heatmap axes. + colors: Colorscale name (str) or list of colors. Dicts are not supported + for heatmaps as color_continuous_scale requires a colorscale specification. + title: Plot title. + facet_col: Dimension for subplot columns. + animation_frame: Dimension for animation slider. + facet_col_wrap: Max columns before wrapping. + **imshow_kwargs: Additional args for px.imshow. + + Returns: + Plotly Figure. + """ + if data.size == 0: + return go.Figure() + + colors = colors or CONFIG.Plotting.default_sequential_colorscale + facet_col_wrap = facet_col_wrap or CONFIG.Plotting.default_facet_cols + + imshow_args: dict[str, Any] = { + 'img': data, + 'color_continuous_scale': colors, + 'title': title, + **imshow_kwargs, + } + + if facet_col and facet_col in data.dims: + imshow_args['facet_col'] = facet_col + if facet_col_wrap < data.sizes[facet_col]: + imshow_args['facet_col_wrap'] = facet_col_wrap + + if animation_frame and animation_frame in data.dims: + imshow_args['animation_frame'] = animation_frame + + return px.imshow(**imshow_args) + + +@dataclass +class PlotResult: + """Container returned by all plot methods. Holds both data and figure. + + Attributes: + data: Prepared xarray Dataset used for the plot. + figure: Plotly figure object. + """ + + data: xr.Dataset + figure: go.Figure + + def show(self) -> PlotResult: + """Display the figure. Returns self for chaining.""" + self.figure.show() + return self + + def update(self, **layout_kwargs: Any) -> PlotResult: + """Update figure layout. Returns self for chaining.""" + self.figure.update_layout(**layout_kwargs) + return self + + def update_traces(self, **trace_kwargs: Any) -> PlotResult: + """Update figure traces. 
Returns self for chaining.""" + self.figure.update_traces(**trace_kwargs) + return self + + def to_html(self, path: str | Path) -> PlotResult: + """Save figure as interactive HTML. Returns self for chaining.""" + self.figure.write_html(str(path)) + return self + + def to_image(self, path: str | Path, **kwargs: Any) -> PlotResult: + """Save figure as static image. Returns self for chaining.""" + self.figure.write_image(str(path), **kwargs) + return self + + def to_csv(self, path: str | Path, **kwargs: Any) -> PlotResult: + """Export the underlying data to CSV. Returns self for chaining.""" + self.data.to_dataframe().to_csv(path, **kwargs) + return self + + def to_netcdf(self, path: str | Path, **kwargs: Any) -> PlotResult: + """Export the underlying data to netCDF. Returns self for chaining.""" + self.data.to_netcdf(path, **kwargs) + return self + + +# --- Helper functions --- + + +def _filter_by_pattern( + names: list[str], + include: FilterType | None, + exclude: FilterType | None, +) -> list[str]: + """Filter names using substring matching.""" + result = names.copy() + if include is not None: + patterns = [include] if isinstance(include, str) else include + result = [n for n in result if any(p in n for p in patterns)] + if exclude is not None: + patterns = [exclude] if isinstance(exclude, str) else exclude + result = [n for n in result if not any(p in n for p in patterns)] + return result + + +def _apply_selection(ds: xr.Dataset, select: SelectType | None) -> xr.Dataset: + """Apply xarray-style selection to dataset.""" + if select is None: + return ds + valid_select = {k: v for k, v in select.items() if k in ds.dims or k in ds.coords} + if valid_select: + ds = ds.sel(valid_select) + return ds + + +def _resolve_facets( + ds: xr.Dataset, + facet_col: str | None, + facet_row: str | None, +) -> tuple[str | None, str | None]: + """Resolve facet dimensions, returning None if not present in data.""" + actual_facet_col = facet_col if facet_col and facet_col in ds.dims 
else None + actual_facet_row = facet_row if facet_row and facet_row in ds.dims else None + return actual_facet_col, actual_facet_row + + +def _dataset_to_long_df(ds: xr.Dataset, value_name: str = 'value', var_name: str = 'variable') -> pd.DataFrame: + """Convert xarray Dataset to long-form DataFrame for plotly express.""" + if not ds.data_vars: + return pd.DataFrame() + if all(ds[var].ndim == 0 for var in ds.data_vars): + rows = [{var_name: var, value_name: float(ds[var].values)} for var in ds.data_vars] + return pd.DataFrame(rows) + df = ds.to_dataframe().reset_index() + # Only use coordinates that are actually present as columns after reset_index + coord_cols = [c for c in ds.coords.keys() if c in df.columns] + return df.melt(id_vars=coord_cols, var_name=var_name, value_name=value_name) + + +def _create_stacked_bar( + ds: xr.Dataset, + colors: ColorType, + title: str, + facet_col: str | None, + facet_row: str | None, + **plotly_kwargs: Any, +) -> go.Figure: + """Create a stacked bar chart from xarray Dataset.""" + df = _dataset_to_long_df(ds) + if df.empty: + return go.Figure() + x_col = 'time' if 'time' in df.columns else df.columns[0] + variables = df['variable'].unique().tolist() + color_map = process_colors(colors, variables, default_colorscale=CONFIG.Plotting.default_qualitative_colorscale) + fig = px.bar( + df, + x=x_col, + y='value', + color='variable', + facet_col=facet_col, + facet_row=facet_row, + color_discrete_map=color_map, + title=title, + **plotly_kwargs, + ) + fig.update_layout(barmode='relative', bargap=0, bargroupgap=0) + fig.update_traces(marker_line_width=0) + return fig + + +def _create_line( + ds: xr.Dataset, + colors: ColorType, + title: str, + facet_col: str | None, + facet_row: str | None, + **plotly_kwargs: Any, +) -> go.Figure: + """Create a line chart from xarray Dataset.""" + df = _dataset_to_long_df(ds) + if df.empty: + return go.Figure() + x_col = 'time' if 'time' in df.columns else df.columns[0] + variables = 
df['variable'].unique().tolist() + color_map = process_colors(colors, variables, default_colorscale=CONFIG.Plotting.default_qualitative_colorscale) + return px.line( + df, + x=x_col, + y='value', + color='variable', + facet_col=facet_col, + facet_row=facet_row, + color_discrete_map=color_map, + title=title, + **plotly_kwargs, + ) + + +# --- Statistics Accessor (data only) --- + + +class StatisticsAccessor: + """Statistics accessor for FlowSystem. Access via ``flow_system.statistics``. + + This accessor provides cached data properties for optimization results. + Use ``.plot`` for visualization methods. + + Data Properties: + ``flow_rates`` : xr.Dataset + Flow rates for all flows. + ``flow_hours`` : xr.Dataset + Flow hours (energy) for all flows. + ``sizes`` : xr.Dataset + Sizes for all flows. + ``charge_states`` : xr.Dataset + Charge states for all storage components. + ``temporal_effects`` : xr.Dataset + Temporal effects per contributor per timestep. + ``periodic_effects`` : xr.Dataset + Periodic (investment) effects per contributor. + ``total_effects`` : xr.Dataset + Total effects (temporal + periodic) per contributor. + ``effect_share_factors`` : dict + Conversion factors between effects. 
+ + Examples: + >>> flow_system.optimize(solver) + >>> flow_system.statistics.flow_rates # Get data + >>> flow_system.statistics.plot.balance('Bus') # Plot + """ + + def __init__(self, flow_system: FlowSystem) -> None: + self._fs = flow_system + # Cached data + self._flow_rates: xr.Dataset | None = None + self._flow_hours: xr.Dataset | None = None + self._sizes: xr.Dataset | None = None + self._charge_states: xr.Dataset | None = None + self._effect_share_factors: dict[str, dict] | None = None + self._temporal_effects: xr.Dataset | None = None + self._periodic_effects: xr.Dataset | None = None + self._total_effects: xr.Dataset | None = None + # Plotting accessor (lazy) + self._plot: StatisticsPlotAccessor | None = None + + def _require_solution(self) -> xr.Dataset: + """Get solution, raising if not available.""" + if self._fs.solution is None: + raise RuntimeError('FlowSystem has no solution. Run optimize() or solve() first.') + return self._fs.solution + + @property + def plot(self) -> StatisticsPlotAccessor: + """Access plotting methods for statistics. + + Returns: + A StatisticsPlotAccessor instance. 
+ + Examples: + >>> flow_system.statistics.plot.balance('ElectricityBus') + >>> flow_system.statistics.plot.heatmap('Boiler|on') + """ + if self._plot is None: + self._plot = StatisticsPlotAccessor(self) + return self._plot + + @property + def flow_rates(self) -> xr.Dataset: + """All flow rates as a Dataset with flow labels as variable names.""" + self._require_solution() + if self._flow_rates is None: + flow_rate_vars = [v for v in self._fs.solution.data_vars if v.endswith('|flow_rate')] + self._flow_rates = xr.Dataset({v.replace('|flow_rate', ''): self._fs.solution[v] for v in flow_rate_vars}) + return self._flow_rates + + @property + def flow_hours(self) -> xr.Dataset: + """All flow hours (energy) as a Dataset with flow labels as variable names.""" + self._require_solution() + if self._flow_hours is None: + hours = self._fs.hours_per_timestep + self._flow_hours = self.flow_rates * hours + return self._flow_hours + + @property + def sizes(self) -> xr.Dataset: + """All flow sizes as a Dataset with flow labels as variable names.""" + self._require_solution() + if self._sizes is None: + size_vars = [v for v in self._fs.solution.data_vars if v.endswith('|size')] + self._sizes = xr.Dataset({v.replace('|size', ''): self._fs.solution[v] for v in size_vars}) + return self._sizes + + @property + def charge_states(self) -> xr.Dataset: + """All storage charge states as a Dataset with storage labels as variable names.""" + self._require_solution() + if self._charge_states is None: + charge_vars = [v for v in self._fs.solution.data_vars if v.endswith('|charge_state')] + self._charge_states = xr.Dataset( + {v.replace('|charge_state', ''): self._fs.solution[v] for v in charge_vars} + ) + return self._charge_states + + @property + def effect_share_factors(self) -> dict[str, dict]: + """Effect share factors for temporal and periodic modes. + + Returns: + Dict with 'temporal' and 'periodic' keys, each containing + conversion factors between effects. 
+ """ + self._require_solution() + if self._effect_share_factors is None: + factors = self._fs.effects.calculate_effect_share_factors() + self._effect_share_factors = {'temporal': factors[0], 'periodic': factors[1]} + return self._effect_share_factors + + @property + def temporal_effects(self) -> xr.Dataset: + """Temporal effects per contributor per timestep. + + Returns a Dataset where each effect is a data variable with dimensions + [time, contributor] (plus period/scenario if present). + + Coordinates: + - contributor: Individual contributor labels + - component: Parent component label for groupby operations + - component_type: Component type (e.g., 'Boiler', 'Source', 'Sink') + + Examples: + >>> # Get costs per contributor per timestep + >>> statistics.temporal_effects['costs'] + >>> # Sum over all contributors to get total costs per timestep + >>> statistics.temporal_effects['costs'].sum('contributor') + >>> # Group by component + >>> statistics.temporal_effects['costs'].groupby('component').sum() + + Returns: + xr.Dataset with effects as variables and contributor dimension. + """ + self._require_solution() + if self._temporal_effects is None: + ds = self._create_effects_dataset('temporal') + dim_order = ['time', 'period', 'scenario', 'contributor'] + self._temporal_effects = ds.transpose(*dim_order, missing_dims='ignore') + return self._temporal_effects + + @property + def periodic_effects(self) -> xr.Dataset: + """Periodic (investment) effects per contributor. + + Returns a Dataset where each effect is a data variable with dimensions + [contributor] (plus period/scenario if present). 
+ + Coordinates: + - contributor: Individual contributor labels + - component: Parent component label for groupby operations + - component_type: Component type (e.g., 'Boiler', 'Source', 'Sink') + + Examples: + >>> # Get investment costs per contributor + >>> statistics.periodic_effects['costs'] + >>> # Sum over all contributors to get total investment costs + >>> statistics.periodic_effects['costs'].sum('contributor') + >>> # Group by component + >>> statistics.periodic_effects['costs'].groupby('component').sum() + + Returns: + xr.Dataset with effects as variables and contributor dimension. + """ + self._require_solution() + if self._periodic_effects is None: + ds = self._create_effects_dataset('periodic') + dim_order = ['period', 'scenario', 'contributor'] + self._periodic_effects = ds.transpose(*dim_order, missing_dims='ignore') + return self._periodic_effects + + @property + def total_effects(self) -> xr.Dataset: + """Total effects (temporal + periodic) per contributor. + + Returns a Dataset where each effect is a data variable with dimensions + [contributor] (plus period/scenario if present). + + Coordinates: + - contributor: Individual contributor labels + - component: Parent component label for groupby operations + - component_type: Component type (e.g., 'Boiler', 'Source', 'Sink') + + Examples: + >>> # Get total costs per contributor + >>> statistics.total_effects['costs'] + >>> # Sum over all contributors to get total system costs + >>> statistics.total_effects['costs'].sum('contributor') + >>> # Group by component + >>> statistics.total_effects['costs'].groupby('component').sum() + >>> # Group by component type + >>> statistics.total_effects['costs'].groupby('component_type').sum() + + Returns: + xr.Dataset with effects as variables and contributor dimension. 
+ """ + self._require_solution() + if self._total_effects is None: + ds = self._create_effects_dataset('total') + dim_order = ['period', 'scenario', 'contributor'] + self._total_effects = ds.transpose(*dim_order, missing_dims='ignore') + return self._total_effects + + def get_effect_shares( + self, + element: str, + effect: str, + mode: Literal['temporal', 'periodic'] | None = None, + include_flows: bool = False, + ) -> xr.Dataset: + """Retrieve individual effect shares for a specific element and effect. + + Args: + element: The element identifier (component or flow label). + effect: The effect identifier. + mode: 'temporal', 'periodic', or None for both. + include_flows: Whether to include effects from flows connected to this element. + + Returns: + xr.Dataset containing the requested effect shares. + + Raises: + ValueError: If the effect is not available or mode is invalid. + """ + self._require_solution() + + if effect not in self._fs.effects: + raise ValueError(f'Effect {effect} is not available.') + + if mode is None: + return xr.merge( + [ + self.get_effect_shares( + element=element, effect=effect, mode='temporal', include_flows=include_flows + ), + self.get_effect_shares( + element=element, effect=effect, mode='periodic', include_flows=include_flows + ), + ] + ) + + if mode not in ['temporal', 'periodic']: + raise ValueError(f'Mode {mode} is not available. Choose between "temporal" and "periodic".') + + ds = xr.Dataset() + label = f'{element}->{effect}({mode})' + if label in self._fs.solution: + ds = xr.Dataset({label: self._fs.solution[label]}) + + if include_flows: + if element not in self._fs.components: + raise ValueError(f'Only use Components when retrieving Effects including flows. 
Got {element}') + comp = self._fs.components[element] + flows = [f.label_full.split('|')[0] for f in comp.inputs + comp.outputs] + return xr.merge( + [ds] + + [ + self.get_effect_shares(element=flow, effect=effect, mode=mode, include_flows=False) + for flow in flows + ] + ) + + return ds + + def _create_template_for_mode(self, mode: Literal['temporal', 'periodic', 'total']) -> xr.DataArray: + """Create a template DataArray with the correct dimensions for a given mode.""" + coords = {} + if mode == 'temporal': + coords['time'] = self._fs.timesteps + if self._fs.periods is not None: + coords['period'] = self._fs.periods + if self._fs.scenarios is not None: + coords['scenario'] = self._fs.scenarios + + if coords: + shape = tuple(len(coords[dim]) for dim in coords) + return xr.DataArray(np.full(shape, np.nan, dtype=float), coords=coords, dims=list(coords.keys())) + else: + return xr.DataArray(np.nan) + + def _create_effects_dataset(self, mode: Literal['temporal', 'periodic', 'total']) -> xr.Dataset: + """Create dataset containing effect totals for all contributors. + + Detects contributors (flows, components, etc.) from solution data variables. + Excludes effect-to-effect shares which are intermediate conversions. + Provides component and component_type coordinates for flexible groupby operations. 
+ """ + solution = self._fs.solution + template = self._create_template_for_mode(mode) + + # Detect contributors from solution data variables + # Pattern: {contributor}->{effect}(temporal) or {contributor}->{effect}(periodic) + contributor_pattern = re.compile(r'^(.+)->(.+)\((temporal|periodic)\)$') + effect_labels = set(self._fs.effects.keys()) + + detected_contributors: set[str] = set() + for var in solution.data_vars: + match = contributor_pattern.match(str(var)) + if match: + contributor = match.group(1) + # Exclude effect-to-effect shares (e.g., costs(temporal) -> Effect1(temporal)) + base_name = contributor.split('(')[0] if '(' in contributor else contributor + if base_name not in effect_labels: + detected_contributors.add(contributor) + + contributors = sorted(detected_contributors) + + # Build metadata for each contributor + def get_parent_component(contributor: str) -> str: + if contributor in self._fs.flows: + return self._fs.flows[contributor].component + elif contributor in self._fs.components: + return contributor + return contributor + + def get_contributor_type(contributor: str) -> str: + if contributor in self._fs.flows: + parent = self._fs.flows[contributor].component + return type(self._fs.components[parent]).__name__ + elif contributor in self._fs.components: + return type(self._fs.components[contributor]).__name__ + elif contributor in self._fs.buses: + return type(self._fs.buses[contributor]).__name__ + return 'Unknown' + + parents = [get_parent_component(c) for c in contributors] + contributor_types = [get_contributor_type(c) for c in contributors] + + # Determine modes to process + modes_to_process = ['temporal', 'periodic'] if mode == 'total' else [mode] + + ds = xr.Dataset() + + for effect in self._fs.effects: + contributor_arrays = [] + + for contributor in contributors: + share_total: xr.DataArray | None = None + + for current_mode in modes_to_process: + # Get conversion factors: which source effects contribute to this target effect + 
conversion_factors = { + key[0]: value + for key, value in self.effect_share_factors[current_mode].items() + if key[1] == effect + } + conversion_factors[effect] = 1 # Direct contribution + + for source_effect, factor in conversion_factors.items(): + label = f'{contributor}->{source_effect}({current_mode})' + if label in solution: + da = solution[label] * factor + # For total mode, sum temporal over time + if mode == 'total' and current_mode == 'temporal' and 'time' in da.dims: + da = da.sum('time') + if share_total is None: + share_total = da + else: + share_total = share_total + da + + # If no share found, use NaN template + if share_total is None: + share_total = xr.full_like(template, np.nan, dtype=float) + + contributor_arrays.append(share_total.expand_dims(contributor=[contributor])) + + # Concatenate all contributors for this effect + ds[effect] = xr.concat(contributor_arrays, dim='contributor', coords='minimal', join='outer').rename(effect) + + # Add groupby coordinates for contributor dimension + ds = ds.assign_coords( + component=('contributor', parents), + component_type=('contributor', contributor_types), + ) + + # Validation: check totals match solution + suffix_map = {'temporal': '(temporal)|per_timestep', 'periodic': '(periodic)', 'total': ''} + for effect in self._fs.effects: + label = f'{effect}{suffix_map[mode]}' + if label in solution: + computed = ds[effect].sum('contributor') + found = solution[label] + if not np.allclose(computed.fillna(0).values, found.fillna(0).values, equal_nan=True): + logger.critical( + f'Results for {effect}({mode}) in effects_dataset doesnt match {label}\n{computed=}\n, {found=}' + ) + + return ds + + +# --- Statistics Plot Accessor --- + + +class StatisticsPlotAccessor: + """Plot accessor for statistics. Access via ``flow_system.statistics.plot``. + + All methods return PlotResult with both data and figure. 
+ """ + + def __init__(self, statistics: StatisticsAccessor) -> None: + self._stats = statistics + self._fs = statistics._fs + + def balance( + self, + node: str, + *, + select: SelectType | None = None, + include: FilterType | None = None, + exclude: FilterType | None = None, + unit: Literal['flow_rate', 'flow_hours'] = 'flow_rate', + colors: ColorType | None = None, + facet_col: str | None = 'period', + facet_row: str | None = 'scenario', + show: bool | None = None, + **plotly_kwargs: Any, + ) -> PlotResult: + """Plot node balance (inputs vs outputs) for a Bus or Component. + + Args: + node: Label of the Bus or Component to plot. + select: xarray-style selection dict. + include: Only include flows containing these substrings. + exclude: Exclude flows containing these substrings. + unit: 'flow_rate' (power) or 'flow_hours' (energy). + colors: Color specification (colorscale name, color list, or label-to-color dict). + facet_col: Dimension for column facets. + facet_row: Dimension for row facets. + show: Whether to display the plot. + + Returns: + PlotResult with .data and .figure. 
+ """ + self._stats._require_solution() + + # Get the element + if node in self._fs.buses: + element = self._fs.buses[node] + elif node in self._fs.components: + element = self._fs.components[node] + else: + raise KeyError(f"'{node}' not found in buses or components") + + input_labels = [f.label_full for f in element.inputs] + output_labels = [f.label_full for f in element.outputs] + all_labels = input_labels + output_labels + + filtered_labels = _filter_by_pattern(all_labels, include, exclude) + if not filtered_labels: + logger.warning(f'No flows remaining after filtering for node {node}') + return PlotResult(data=xr.Dataset(), figure=go.Figure()) + + # Get data from statistics + if unit == 'flow_rate': + ds = self._stats.flow_rates[[lbl for lbl in filtered_labels if lbl in self._stats.flow_rates]] + else: + ds = self._stats.flow_hours[[lbl for lbl in filtered_labels if lbl in self._stats.flow_hours]] + + # Negate inputs + for label in input_labels: + if label in ds: + ds[label] = -ds[label] + + ds = _apply_selection(ds, select) + actual_facet_col, actual_facet_row = _resolve_facets(ds, facet_col, facet_row) + + fig = _create_stacked_bar( + ds, + colors=colors, + title=f'{node} ({unit})', + facet_col=actual_facet_col, + facet_row=actual_facet_row, + **plotly_kwargs, + ) + + if show is None: + show = CONFIG.Plotting.default_show + if show: + fig.show() + + return PlotResult(data=ds, figure=fig) + + def heatmap( + self, + variables: str | list[str], + *, + select: SelectType | None = None, + reshape: tuple[str, str] | None = ('D', 'h'), + colors: str | list[str] | None = None, + facet_col: str | None = 'period', + animation_frame: str | None = 'scenario', + show: bool | None = None, + **plotly_kwargs: Any, + ) -> PlotResult: + """Plot heatmap of time series data. + + Time is reshaped into 2D (e.g., days × hours) when possible. Multiple variables + are shown as facets. 
If too many dimensions exist to display without data loss, + reshaping is skipped and variables are shown on the y-axis with time on x-axis. + + Args: + variables: Variable name(s) from solution. + select: xarray-style selection, e.g. {'scenario': 'Base Case'}. + reshape: Time reshape frequencies as (outer, inner), e.g. ('D', 'h') for + days × hours. Set to None to disable reshaping. + colors: Colorscale name (str) or list of colors for heatmap coloring. + Dicts are not supported for heatmaps (use str or list[str]). + facet_col: Dimension for subplot columns (default: 'period'). + With multiple variables, 'variable' is used instead. + animation_frame: Dimension for animation slider (default: 'scenario'). + show: Whether to display the figure. + **plotly_kwargs: Additional arguments passed to px.imshow. + + Returns: + PlotResult with processed data and figure. + """ + solution = self._stats._require_solution() + + if isinstance(variables, str): + variables = [variables] + + ds = solution[variables] + ds = _apply_selection(ds, select) + + # Stack variables into single DataArray + variable_names = list(ds.data_vars) + dataarrays = [ds[var] for var in variable_names] + da = xr.concat(dataarrays, dim=pd.Index(variable_names, name='variable')) + + # Determine facet and animation from available dims + has_multiple_vars = 'variable' in da.dims and da.sizes['variable'] > 1 + + if has_multiple_vars: + actual_facet = 'variable' + actual_animation = ( + animation_frame + if animation_frame in da.dims + else (facet_col if facet_col in da.dims and da.sizes.get(facet_col, 1) > 1 else None) + ) + else: + actual_facet = facet_col if facet_col in da.dims and da.sizes.get(facet_col, 0) > 1 else None + actual_animation = ( + animation_frame if animation_frame in da.dims and da.sizes.get(animation_frame, 0) > 1 else None + ) + + # Count non-time dims with size > 1 (these need facet/animation slots) + extra_dims = [d for d in da.dims if d != 'time' and da.sizes[d] > 1] + used_slots = 
len([d for d in [actual_facet, actual_animation] if d]) + would_drop = len(extra_dims) > used_slots + + # Reshape time only if we wouldn't lose data (all extra dims fit in facet + animation) + if reshape and 'time' in da.dims and not would_drop: + da = _reshape_time_for_heatmap(da, reshape) + heatmap_dims = ['timestep', 'timeframe'] + elif has_multiple_vars: + # Can't reshape but have multiple vars: use variable + time as heatmap axes + heatmap_dims = ['variable', 'time'] + # variable is now a heatmap dim, use period/scenario for facet/animation + actual_facet = facet_col if facet_col in da.dims and da.sizes.get(facet_col, 0) > 1 else None + actual_animation = ( + animation_frame if animation_frame in da.dims and da.sizes.get(animation_frame, 0) > 1 else None + ) + else: + heatmap_dims = ['time'] if 'time' in da.dims else list(da.dims)[:1] + + # Keep only dims we need + keep_dims = set(heatmap_dims) | {actual_facet, actual_animation} - {None} + for dim in [d for d in da.dims if d not in keep_dims]: + da = da.isel({dim: 0}, drop=True) if da.sizes[dim] > 1 else da.squeeze(dim, drop=True) + + # Transpose to expected order + dim_order = heatmap_dims + [d for d in [actual_facet, actual_animation] if d] + da = da.transpose(*dim_order) + + # Clear name for multiple variables (colorbar would show first var's name) + if has_multiple_vars: + da = da.rename('') + + fig = _heatmap_figure( + da, + colors=colors, + facet_col=actual_facet, + animation_frame=actual_animation, + **plotly_kwargs, + ) + + if show is None: + show = CONFIG.Plotting.default_show + if show: + fig.show() + + reshaped_ds = da.to_dataset(name='value') if isinstance(da, xr.DataArray) else da + return PlotResult(data=reshaped_ds, figure=fig) + + def flows( + self, + *, + start: str | list[str] | None = None, + end: str | list[str] | None = None, + component: str | list[str] | None = None, + select: SelectType | None = None, + unit: Literal['flow_rate', 'flow_hours'] = 'flow_rate', + colors: ColorType | None = 
None, + facet_col: str | None = 'period', + facet_row: str | None = 'scenario', + show: bool | None = None, + **plotly_kwargs: Any, + ) -> PlotResult: + """Plot flow rates filtered by start/end nodes or component. + + Args: + start: Filter by source node(s). + end: Filter by destination node(s). + component: Filter by parent component(s). + select: xarray-style selection. + unit: 'flow_rate' or 'flow_hours'. + colors: Color specification (colorscale name, color list, or label-to-color dict). + facet_col: Dimension for column facets. + facet_row: Dimension for row facets. + show: Whether to display. + + Returns: + PlotResult with flow data. + """ + self._stats._require_solution() + + ds = self._stats.flow_rates if unit == 'flow_rate' else self._stats.flow_hours + + # Filter by connection + if start is not None or end is not None or component is not None: + matching_labels = [] + starts = [start] if isinstance(start, str) else (start or []) + ends = [end] if isinstance(end, str) else (end or []) + components = [component] if isinstance(component, str) else (component or []) + + for flow in self._fs.flows.values(): + # Get bus label (could be string or Bus object) + bus_label = flow.bus if isinstance(flow.bus, str) else flow.bus.label + comp_label = flow.component.label if hasattr(flow.component, 'label') else str(flow.component) + + # start/end filtering based on flow direction + if flow.is_input_in_component: + # Flow goes: bus -> component, so start=bus, end=component + if starts and bus_label not in starts: + continue + if ends and comp_label not in ends: + continue + else: + # Flow goes: component -> bus, so start=component, end=bus + if starts and comp_label not in starts: + continue + if ends and bus_label not in ends: + continue + + if components and comp_label not in components: + continue + matching_labels.append(flow.label_full) + + ds = ds[[lbl for lbl in matching_labels if lbl in ds]] + + ds = _apply_selection(ds, select) + actual_facet_col, 
actual_facet_row = _resolve_facets(ds, facet_col, facet_row) + + fig = _create_line( + ds, + colors=colors, + title=f'Flows ({unit})', + facet_col=actual_facet_col, + facet_row=actual_facet_row, + **plotly_kwargs, + ) + + if show is None: + show = CONFIG.Plotting.default_show + if show: + fig.show() + + return PlotResult(data=ds, figure=fig) + + def sankey( + self, + *, + timestep: int | str | None = None, + aggregate: Literal['sum', 'mean'] = 'sum', + select: SelectType | None = None, + colors: ColorType | None = None, + show: bool | None = None, + **plotly_kwargs: Any, + ) -> PlotResult: + """Plot Sankey diagram of energy/material flow hours. + + Args: + timestep: Specific timestep to show, or None for aggregation. + aggregate: How to aggregate if timestep is None. + select: xarray-style selection. + colors: Color specification for nodes (colorscale name, color list, or label-to-color dict). + show: Whether to display. + + Returns: + PlotResult with Sankey flow data. + """ + self._stats._require_solution() + + ds = self._stats.flow_hours.copy() + + # Apply weights + if 'period' in ds.dims and self._fs.period_weights is not None: + ds = ds * self._fs.period_weights + if 'scenario' in ds.dims and self._fs.scenario_weights is not None: + weights = self._fs.scenario_weights / self._fs.scenario_weights.sum() + ds = ds * weights + + ds = _apply_selection(ds, select) + + if timestep is not None: + if isinstance(timestep, int): + ds = ds.isel(time=timestep) + else: + ds = ds.sel(time=timestep) + elif 'time' in ds.dims: + ds = getattr(ds, aggregate)(dim='time') + + for dim in ['period', 'scenario']: + if dim in ds.dims: + ds = ds.sum(dim=dim) + + # Build Sankey + nodes = set() + links = {'source': [], 'target': [], 'value': [], 'label': []} + + for flow in self._fs.flows.values(): + label = flow.label_full + if label not in ds: + continue + value = float(ds[label].values) + if abs(value) < 1e-6: + continue + + # Determine source/target based on flow direction + # 
is_input_in_component: True means bus -> component, False means component -> bus + bus_label = flow.bus if isinstance(flow.bus, str) else flow.bus.label + comp_label = flow.component.label if hasattr(flow.component, 'label') else str(flow.component) + + if flow.is_input_in_component: + source = bus_label + target = comp_label + else: + source = comp_label + target = bus_label + + nodes.add(source) + nodes.add(target) + links['source'].append(source) + links['target'].append(target) + links['value'].append(abs(value)) + links['label'].append(label) + + node_list = list(nodes) + node_indices = {n: i for i, n in enumerate(node_list)} + + color_map = process_colors(colors, node_list) + node_colors = [color_map[node] for node in node_list] + + fig = go.Figure( + data=[ + go.Sankey( + node=dict( + pad=15, thickness=20, line=dict(color='black', width=0.5), label=node_list, color=node_colors + ), + link=dict( + source=[node_indices[s] for s in links['source']], + target=[node_indices[t] for t in links['target']], + value=links['value'], + label=links['label'], + ), + ) + ] + ) + fig.update_layout(title='Energy Flow Sankey', **plotly_kwargs) + + sankey_ds = xr.Dataset( + {'value': ('link', links['value'])}, + coords={'link': links['label'], 'source': ('link', links['source']), 'target': ('link', links['target'])}, + ) + + if show is None: + show = CONFIG.Plotting.default_show + if show: + fig.show() + + return PlotResult(data=sankey_ds, figure=fig) + + def sizes( + self, + *, + max_size: float | None = 1e6, + select: SelectType | None = None, + colors: ColorType | None = None, + facet_col: str | None = 'period', + facet_row: str | None = 'scenario', + show: bool | None = None, + **plotly_kwargs: Any, + ) -> PlotResult: + """Plot investment sizes (capacities) of flows. + + Args: + max_size: Maximum size to include (filters defaults). + select: xarray-style selection. + colors: Color specification (colorscale name, color list, or label-to-color dict). 
+ facet_col: Dimension for column facets. + facet_row: Dimension for row facets. + show: Whether to display. + + Returns: + PlotResult with size data. + """ + self._stats._require_solution() + ds = self._stats.sizes + + ds = _apply_selection(ds, select) + + if max_size is not None and ds.data_vars: + valid_labels = [lbl for lbl in ds.data_vars if float(ds[lbl].max()) < max_size] + ds = ds[valid_labels] + + actual_facet_col, actual_facet_row = _resolve_facets(ds, facet_col, facet_row) + + df = _dataset_to_long_df(ds) + if df.empty: + fig = go.Figure() + else: + variables = df['variable'].unique().tolist() + color_map = process_colors(colors, variables) + fig = px.bar( + df, + x='variable', + y='value', + color='variable', + facet_col=actual_facet_col, + facet_row=actual_facet_row, + color_discrete_map=color_map, + title='Investment Sizes', + labels={'variable': 'Flow', 'value': 'Size'}, + **plotly_kwargs, + ) + + if show is None: + show = CONFIG.Plotting.default_show + if show: + fig.show() + + return PlotResult(data=ds, figure=fig) + + def duration_curve( + self, + variables: str | list[str], + *, + select: SelectType | None = None, + normalize: bool = False, + colors: ColorType | None = None, + facet_col: str | None = 'period', + facet_row: str | None = 'scenario', + show: bool | None = None, + **plotly_kwargs: Any, + ) -> PlotResult: + """Plot load duration curves (sorted time series). + + Args: + variables: Flow label(s) to plot (e.g., 'Boiler(Q_th)'). + Uses flow_rates from statistics. + select: xarray-style selection. + normalize: If True, normalize x-axis to 0-100%. + colors: Color specification (colorscale name, color list, or label-to-color dict). + facet_col: Dimension for column facets. + facet_row: Dimension for row facets. + show: Whether to display. + + Returns: + PlotResult with sorted duration curve data. 
+ """ + self._stats._require_solution() + + if isinstance(variables, str): + variables = [variables] + + # Use flow_rates from statistics (already has clean labels without |flow_rate suffix) + ds = self._stats.flow_rates[variables] + ds = _apply_selection(ds, select) + + if 'time' not in ds.dims: + raise ValueError('Duration curve requires time dimension') + + def sort_descending(arr: np.ndarray) -> np.ndarray: + return np.sort(arr)[::-1] + + result_ds = xr.apply_ufunc( + sort_descending, + ds, + input_core_dims=[['time']], + output_core_dims=[['time']], + vectorize=True, + ) + + duration_name = 'duration_pct' if normalize else 'duration' + result_ds = result_ds.rename({'time': duration_name}) + + n_timesteps = result_ds.sizes[duration_name] + duration_coord = np.linspace(0, 100, n_timesteps) if normalize else np.arange(n_timesteps) + result_ds = result_ds.assign_coords({duration_name: duration_coord}) + + actual_facet_col, actual_facet_row = _resolve_facets(result_ds, facet_col, facet_row) + + fig = _create_line( + result_ds, + colors=colors, + title='Duration Curve', + facet_col=actual_facet_col, + facet_row=actual_facet_row, + **plotly_kwargs, + ) + + x_label = 'Duration [%]' if normalize else 'Timesteps' + fig.update_xaxes(title_text=x_label) + + if show is None: + show = CONFIG.Plotting.default_show + if show: + fig.show() + + return PlotResult(data=result_ds, figure=fig) + + def effects( + self, + aspect: Literal['total', 'temporal', 'periodic'] = 'total', + *, + effect: str | None = None, + by: Literal['component', 'contributor', 'time'] = 'component', + select: SelectType | None = None, + colors: ColorType | None = None, + facet_col: str | None = 'period', + facet_row: str | None = 'scenario', + show: bool | None = None, + **plotly_kwargs: Any, + ) -> PlotResult: + """Plot effect (cost, emissions, etc.) breakdown. + + Args: + aspect: Which aspect to plot - 'total', 'temporal', or 'periodic'. + effect: Specific effect name to plot (e.g., 'costs', 'CO2'). 
+ If None, plots all effects. + by: Group by 'component', 'contributor' (individual flows), or 'time'. + select: xarray-style selection. + colors: Color specification (colorscale name, color list, or label-to-color dict). + facet_col: Dimension for column facets (ignored if not in data). + facet_row: Dimension for row facets (ignored if not in data). + show: Whether to display. + + Returns: + PlotResult with effect breakdown data. + + Examples: + >>> flow_system.statistics.plot.effects() # Total of all effects by component + >>> flow_system.statistics.plot.effects(effect='costs') # Just costs + >>> flow_system.statistics.plot.effects(by='contributor') # By individual flows + >>> flow_system.statistics.plot.effects(aspect='temporal', by='time') # Over time + """ + self._stats._require_solution() + + # Get the appropriate effects dataset based on aspect + if aspect == 'total': + effects_ds = self._stats.total_effects + elif aspect == 'temporal': + effects_ds = self._stats.temporal_effects + elif aspect == 'periodic': + effects_ds = self._stats.periodic_effects + else: + raise ValueError(f"Aspect '{aspect}' not valid. Choose from 'total', 'temporal', 'periodic'.") + + # Get available effects (data variables in the dataset) + available_effects = list(effects_ds.data_vars) + + # Filter to specific effect if requested + if effect is not None: + if effect not in available_effects: + raise ValueError(f"Effect '{effect}' not found. 
Available: {available_effects}") + effects_to_plot = [effect] + else: + effects_to_plot = available_effects + + # Build a combined DataArray with effect dimension + effect_arrays = [] + for eff in effects_to_plot: + da = effects_ds[eff] + if by == 'contributor': + # Keep individual contributors (flows) - no groupby + effect_arrays.append(da.expand_dims(effect=[eff])) + else: + # Group by component (sum over contributor within each component) + da_grouped = da.groupby('component').sum() + effect_arrays.append(da_grouped.expand_dims(effect=[eff])) + + combined = xr.concat(effect_arrays, dim='effect') + + # Apply selection + combined = _apply_selection(combined.to_dataset(name='value'), select)['value'] + + # Group by the specified dimension + if by == 'component': + # Sum over time if present + if 'time' in combined.dims: + combined = combined.sum(dim='time') + x_col = 'component' + color_col = 'effect' if len(effects_to_plot) > 1 else 'component' + elif by == 'contributor': + # Sum over time if present + if 'time' in combined.dims: + combined = combined.sum(dim='time') + x_col = 'contributor' + color_col = 'effect' if len(effects_to_plot) > 1 else 'contributor' + elif by == 'time': + if 'time' not in combined.dims: + raise ValueError(f"Cannot plot by 'time' for aspect '{aspect}' - no time dimension.") + # Sum over components or contributors + if 'component' in combined.dims: + combined = combined.sum(dim='component') + if 'contributor' in combined.dims: + combined = combined.sum(dim='contributor') + x_col = 'time' + color_col = 'effect' if len(effects_to_plot) > 1 else None + else: + raise ValueError(f"'by' must be one of 'component', 'contributor', 'time', got {by!r}") + + # Resolve facets + actual_facet_col, actual_facet_row = _resolve_facets(combined.to_dataset(name='value'), facet_col, facet_row) + + # Convert to DataFrame for plotly express + df = combined.to_dataframe(name='value').reset_index() + + # Build color map + if color_col and color_col in df.columns: 
+ color_items = df[color_col].unique().tolist() + color_map = process_colors(colors, color_items) + else: + color_map = None + + # Build title + effect_label = effect if effect else 'Effects' + title = f'{effect_label} ({aspect}) by {by}' + + fig = px.bar( + df, + x=x_col, + y='value', + color=color_col, + color_discrete_map=color_map, + facet_col=actual_facet_col, + facet_row=actual_facet_row, + title=title, + **plotly_kwargs, + ) + fig.update_layout(bargap=0, bargroupgap=0) + fig.update_traces(marker_line_width=0) + + if show is None: + show = CONFIG.Plotting.default_show + if show: + fig.show() + + return PlotResult(data=combined.to_dataset(name=aspect), figure=fig) diff --git a/flixopt/structure.py b/flixopt/structure.py index 732dcfeae..8bec197bc 100644 --- a/flixopt/structure.py +++ b/flixopt/structure.py @@ -7,6 +7,7 @@ import inspect import logging +import pathlib import re from dataclasses import dataclass from difflib import get_close_matches @@ -28,7 +29,6 @@ from .core import FlowSystemDimensions, TimeSeriesData, get_dataarray_stats if TYPE_CHECKING: # for type checking and preventing circular imports - import pathlib from collections.abc import Collection, ItemsView, Iterator from .effects import EffectCollectionModel @@ -838,18 +838,29 @@ def to_dataset(self) -> xr.Dataset: f'Original Error: {e}' ) from e - def to_netcdf(self, path: str | pathlib.Path, compression: int = 0): + def to_netcdf(self, path: str | pathlib.Path, compression: int = 0, overwrite: bool = True): """ Save the object to a NetCDF file. Args: - path: Path to save the NetCDF file + path: Path to save the NetCDF file. Parent directories are created if they don't exist. compression: Compression level (0-9) + overwrite: If True (default), overwrite existing file. If False, raise error if file exists. Raises: + FileExistsError: If overwrite=False and file already exists. 
ValueError: If serialization fails IOError: If file cannot be written """ + path = pathlib.Path(path) + + # Check if file exists (unless overwrite is True) + if not overwrite and path.exists(): + raise FileExistsError(f'File already exists: {path}. Use overwrite=True to overwrite existing file.') + + # Create parent directories if they don't exist + path.parent.mkdir(parents=True, exist_ok=True) + try: ds = self.to_dataset() fx_io.save_dataset_to_netcdf(ds, path, compression=compression) diff --git a/flixopt/topology_accessor.py b/flixopt/topology_accessor.py new file mode 100644 index 000000000..de4f83685 --- /dev/null +++ b/flixopt/topology_accessor.py @@ -0,0 +1,302 @@ +""" +Topology accessor for FlowSystem. + +This module provides the TopologyAccessor class that enables the +`flow_system.topology` pattern for network structure inspection and visualization. +""" + +from __future__ import annotations + +import logging +import pathlib +import warnings +from itertools import chain +from typing import TYPE_CHECKING, Literal + +if TYPE_CHECKING: + import pyvis + + from .flow_system import FlowSystem + +logger = logging.getLogger('flixopt') + + +def _plot_network( + node_infos: dict, + edge_infos: dict, + path: str | pathlib.Path | None = None, + controls: bool + | list[ + Literal['nodes', 'edges', 'layout', 'interaction', 'manipulation', 'physics', 'selection', 'renderer'] + ] = True, + show: bool = False, +) -> pyvis.network.Network | None: + """Visualize network structure using PyVis. + + Args: + node_infos: Dictionary of node information. + edge_infos: Dictionary of edge information. + path: Path to save HTML visualization. + controls: UI controls to add. True for all, or list of specific controls. + show: Whether to open in browser. + + Returns: + Network instance, or None if pyvis not installed. + """ + try: + from pyvis.network import Network + except ImportError: + logger.critical("Plotting the flow system network was not possible. 
Please install pyvis: 'pip install pyvis'") + return None + + net = Network(directed=True, height='100%' if controls is False else '800px', font_color='white') + + for node_id, node in node_infos.items(): + net.add_node( + node_id, + label=node['label'], + shape={'Bus': 'circle', 'Component': 'box'}[node['class']], + color={'Bus': '#393E46', 'Component': '#00ADB5'}[node['class']], + title=node['infos'].replace(')', '\n)'), + font={'size': 14}, + ) + + for edge in edge_infos.values(): + net.add_edge( + edge['start'], + edge['end'], + label=edge['label'], + title=edge['infos'].replace(')', '\n)'), + font={'color': '#4D4D4D', 'size': 14}, + color='#222831', + ) + + net.barnes_hut(central_gravity=0.8, spring_length=50, spring_strength=0.05, gravity=-10000) + + if controls: + net.show_buttons(filter_=controls) + if not show and not path: + return net + elif path: + path = pathlib.Path(path) if isinstance(path, str) else path + net.write_html(path.as_posix()) + elif show: + path = pathlib.Path('network.html') + net.write_html(path.as_posix()) + + if show: + try: + import webbrowser + + worked = webbrowser.open(f'file://{path.resolve()}', 2) + if not worked: + logger.error(f'Showing the network in the Browser went wrong. Open it manually. Its saved under {path}') + except Exception as e: + logger.error( + f'Showing the network in the Browser went wrong. Open it manually. Its saved under {path}: {e}' + ) + + return net + + +class TopologyAccessor: + """ + Accessor for network topology inspection and visualization on FlowSystem. + + This class provides the topology API for FlowSystem, accessible via + `flow_system.topology`. It offers methods to inspect the network structure + and visualize it. + + Examples: + Visualize the network: + + >>> flow_system.topology.plot() + >>> flow_system.topology.plot(path='my_network.html', show=True) + + Interactive visualization: + + >>> flow_system.topology.start_app() + >>> # ... interact with the visualization ... 
+ >>> flow_system.topology.stop_app() + + Get network structure info: + + >>> nodes, edges = flow_system.topology.infos() + """ + + def __init__(self, flow_system: FlowSystem) -> None: + """ + Initialize the accessor with a reference to the FlowSystem. + + Args: + flow_system: The FlowSystem to inspect. + """ + self._fs = flow_system + + def infos(self) -> tuple[dict[str, dict[str, str]], dict[str, dict[str, str]]]: + """ + Get network topology information as dictionaries. + + Returns node and edge information suitable for visualization or analysis. + + Returns: + Tuple of (nodes_dict, edges_dict) where: + - nodes_dict maps node labels to their properties (label, class, infos) + - edges_dict maps edge labels to their properties (label, start, end, infos) + + Examples: + >>> nodes, edges = flow_system.topology.infos() + >>> print(nodes.keys()) # All component and bus labels + >>> print(edges.keys()) # All flow labels + """ + from .elements import Bus + + if not self._fs.connected_and_transformed: + self._fs.connect_and_transform() + + nodes = { + node.label_full: { + 'label': node.label, + 'class': 'Bus' if isinstance(node, Bus) else 'Component', + 'infos': node.__str__(), + } + for node in chain(self._fs.components.values(), self._fs.buses.values()) + } + + edges = { + flow.label_full: { + 'label': flow.label, + 'start': flow.bus if flow.is_input_in_component else flow.component, + 'end': flow.component if flow.is_input_in_component else flow.bus, + 'infos': flow.__str__(), + } + for flow in self._fs.flows.values() + } + + return nodes, edges + + def plot( + self, + path: bool | str | pathlib.Path = 'flow_system.html', + controls: bool + | list[ + Literal['nodes', 'edges', 'layout', 'interaction', 'manipulation', 'physics', 'selection', 'renderer'] + ] = True, + show: bool | None = None, + ) -> pyvis.network.Network | None: + """ + Visualize the network structure using PyVis, saving it as an interactive HTML file. 
+ + Args: + path: Path to save the HTML visualization. + - `False`: Visualization is created but not saved. + - `str` or `Path`: Specifies file path (default: 'flow_system.html'). + controls: UI controls to add to the visualization. + - `True`: Enables all available controls. + - `List`: Specify controls, e.g., ['nodes', 'layout']. + - Options: 'nodes', 'edges', 'layout', 'interaction', 'manipulation', + 'physics', 'selection', 'renderer'. + show: Whether to open the visualization in the web browser. + + Returns: + The `pyvis.network.Network` instance representing the visualization, + or `None` if `pyvis` is not installed. + + Examples: + >>> flow_system.topology.plot() + >>> flow_system.topology.plot(show=False) + >>> flow_system.topology.plot(path='output/network.html', controls=['nodes', 'layout']) + + Notes: + This function requires `pyvis`. If not installed, the function prints + a warning and returns `None`. + Nodes are styled based on type (circles for buses, boxes for components) + and annotated with node information. + """ + from .config import CONFIG + + node_infos, edge_infos = self.infos() + # Normalize path=False to None for _plot_network compatibility + normalized_path = None if path is False else path + return _plot_network( + node_infos, + edge_infos, + normalized_path, + controls, + show if show is not None else CONFIG.Plotting.default_show, + ) + + def start_app(self) -> None: + """ + Start an interactive network visualization using Dash and Cytoscape. + + Launches a web-based interactive visualization server that allows + exploring the network structure dynamically. + + Raises: + ImportError: If required dependencies are not installed. + + Examples: + >>> flow_system.topology.start_app() + >>> # ... interact with the visualization in browser ... + >>> flow_system.topology.stop_app() + + Notes: + Requires optional dependencies: dash, dash-cytoscape, dash-daq, + networkx, flask, werkzeug. 
+ Install with: `pip install flixopt[network_viz]` or `pip install flixopt[full]` + """ + from .network_app import DASH_CYTOSCAPE_AVAILABLE, VISUALIZATION_ERROR, flow_graph, shownetwork + + warnings.warn( + 'The network visualization is still experimental and might change in the future.', + stacklevel=2, + category=UserWarning, + ) + + if not DASH_CYTOSCAPE_AVAILABLE: + raise ImportError( + f'Network visualization requires optional dependencies. ' + f'Install with: `pip install flixopt[network_viz]`, `pip install flixopt[full]` ' + f'or: `pip install dash dash-cytoscape dash-daq networkx werkzeug`. ' + f'Original error: {VISUALIZATION_ERROR}' + ) + + if not self._fs._connected_and_transformed: + self._fs._connect_network() + + if self._fs._network_app is not None: + logger.warning('The network app is already running. Restarting it.') + self.stop_app() + + self._fs._network_app = shownetwork(flow_graph(self._fs)) + + def stop_app(self) -> None: + """ + Stop the interactive network visualization server. + + Examples: + >>> flow_system.topology.stop_app() + """ + from .network_app import DASH_CYTOSCAPE_AVAILABLE, VISUALIZATION_ERROR + + if not DASH_CYTOSCAPE_AVAILABLE: + raise ImportError( + f'Network visualization requires optional dependencies. ' + f'Install with: `pip install flixopt[network_viz]`, `pip install flixopt[full]` ' + f'or: `pip install dash dash-cytoscape dash-daq networkx werkzeug`. ' + f'Original error: {VISUALIZATION_ERROR}' + ) + + if self._fs._network_app is None: + logger.warning("No network app is currently running. 
Can't stop it") + return + + try: + logger.info('Stopping network visualization server...') + self._fs._network_app.server_instance.shutdown() + logger.info('Network visualization stopped.') + except Exception as e: + logger.error(f'Failed to stop the network visualization app: {e}') + finally: + self._fs._network_app = None diff --git a/mkdocs.yml b/mkdocs.yml index 58abb684f..f966e76f7 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -26,6 +26,7 @@ nav: - Building Models: user-guide/building-models/index.md - Running Optimizations: user-guide/optimization/index.md - Analyzing Results: user-guide/results/index.md + - Plotting Results: user-guide/results-plotting.md - Mathematical Notation: - Overview: user-guide/mathematical-notation/index.md - Bus: user-guide/mathematical-notation/elements/Bus.md diff --git a/tests/test_effect.py b/tests/test_effect.py index 1876761ee..7dcac9e1c 100644 --- a/tests/test_effect.py +++ b/tests/test_effect.py @@ -9,7 +9,6 @@ assert_sets_equal, assert_var_equal, create_linopy_model, - create_optimization_and_solve, ) @@ -225,10 +224,7 @@ def test_shares(self, basic_flow_system_linopy_coords, coords_config): class TestEffectResults: - @pytest.mark.deprecated_api - @pytest.mark.filterwarnings('ignore:Results is deprecated:DeprecationWarning:flixopt') - @pytest.mark.filterwarnings('ignore:Optimization is deprecated:DeprecationWarning:flixopt') - def test_shares(self, basic_flow_system_linopy_coords, coords_config): + def test_shares(self, basic_flow_system_linopy_coords, coords_config, highs_solver): flow_system = basic_flow_system_linopy_coords effect1 = fx.Effect('Effect1', '€', 'Testing Effect', share_from_temporal={'costs': 0.5}) effect2 = fx.Effect( @@ -261,7 +257,10 @@ def test_shares(self, basic_flow_system_linopy_coords, coords_config): ), ) - results = create_optimization_and_solve(flow_system, fx.solvers.HighsSolver(0.01, 60), 'Sim1').results + flow_system.optimize(highs_solver) + + # Use the new statistics accessor + statistics = 
flow_system.statistics effect_share_factors = { 'temporal': { @@ -278,71 +277,72 @@ def test_shares(self, basic_flow_system_linopy_coords, coords_config): }, } for key, value in effect_share_factors['temporal'].items(): - np.testing.assert_allclose(results.effect_share_factors['temporal'][key].values, value) + np.testing.assert_allclose(statistics.effect_share_factors['temporal'][key].values, value) for key, value in effect_share_factors['periodic'].items(): - np.testing.assert_allclose(results.effect_share_factors['periodic'][key].values, value) + np.testing.assert_allclose(statistics.effect_share_factors['periodic'][key].values, value) + # Temporal effects checks using new API xr.testing.assert_allclose( - results.effects_per_component['temporal'].sum('component').sel(effect='costs', drop=True), - results.solution['costs(temporal)|per_timestep'].fillna(0), + statistics.temporal_effects['costs'].sum('contributor'), + flow_system.solution['costs(temporal)|per_timestep'].fillna(0), ) xr.testing.assert_allclose( - results.effects_per_component['temporal'].sum('component').sel(effect='Effect1', drop=True), - results.solution['Effect1(temporal)|per_timestep'].fillna(0), + statistics.temporal_effects['Effect1'].sum('contributor'), + flow_system.solution['Effect1(temporal)|per_timestep'].fillna(0), ) xr.testing.assert_allclose( - results.effects_per_component['temporal'].sum('component').sel(effect='Effect2', drop=True), - results.solution['Effect2(temporal)|per_timestep'].fillna(0), + statistics.temporal_effects['Effect2'].sum('contributor'), + flow_system.solution['Effect2(temporal)|per_timestep'].fillna(0), ) xr.testing.assert_allclose( - results.effects_per_component['temporal'].sum('component').sel(effect='Effect3', drop=True), - results.solution['Effect3(temporal)|per_timestep'].fillna(0), + statistics.temporal_effects['Effect3'].sum('contributor'), + flow_system.solution['Effect3(temporal)|per_timestep'].fillna(0), ) - # periodic mode checks + # Periodic effects 
checks using new API xr.testing.assert_allclose( - results.effects_per_component['periodic'].sum('component').sel(effect='costs', drop=True), - results.solution['costs(periodic)'], + statistics.periodic_effects['costs'].sum('contributor'), + flow_system.solution['costs(periodic)'], ) xr.testing.assert_allclose( - results.effects_per_component['periodic'].sum('component').sel(effect='Effect1', drop=True), - results.solution['Effect1(periodic)'], + statistics.periodic_effects['Effect1'].sum('contributor'), + flow_system.solution['Effect1(periodic)'], ) xr.testing.assert_allclose( - results.effects_per_component['periodic'].sum('component').sel(effect='Effect2', drop=True), - results.solution['Effect2(periodic)'], + statistics.periodic_effects['Effect2'].sum('contributor'), + flow_system.solution['Effect2(periodic)'], ) xr.testing.assert_allclose( - results.effects_per_component['periodic'].sum('component').sel(effect='Effect3', drop=True), - results.solution['Effect3(periodic)'], + statistics.periodic_effects['Effect3'].sum('contributor'), + flow_system.solution['Effect3(periodic)'], ) - # Total mode checks + # Total effects checks using new API xr.testing.assert_allclose( - results.effects_per_component['total'].sum('component').sel(effect='costs', drop=True), - results.solution['costs'], + statistics.total_effects['costs'].sum('contributor'), + flow_system.solution['costs'], ) xr.testing.assert_allclose( - results.effects_per_component['total'].sum('component').sel(effect='Effect1', drop=True), - results.solution['Effect1'], + statistics.total_effects['Effect1'].sum('contributor'), + flow_system.solution['Effect1'], ) xr.testing.assert_allclose( - results.effects_per_component['total'].sum('component').sel(effect='Effect2', drop=True), - results.solution['Effect2'], + statistics.total_effects['Effect2'].sum('contributor'), + flow_system.solution['Effect2'], ) xr.testing.assert_allclose( - results.effects_per_component['total'].sum('component').sel(effect='Effect3', 
drop=True), - results.solution['Effect3'], + statistics.total_effects['Effect3'].sum('contributor'), + flow_system.solution['Effect3'], ) @@ -351,7 +351,6 @@ class TestPenaltyAsObjective: def test_penalty_cannot_be_created_as_objective(self): """Test that creating a Penalty effect with is_objective=True raises ValueError.""" - import pytest with pytest.raises(ValueError, match='Penalty.*cannot be set as the objective'): fx.Effect('Penalty', '€', 'Test Penalty', is_objective=True) @@ -359,7 +358,6 @@ def test_penalty_cannot_be_created_as_objective(self): def test_penalty_cannot_be_set_as_objective_via_setter(self): """Test that setting Penalty as objective via setter raises ValueError.""" import pandas as pd - import pytest # Create a fresh flow system without pre-existing objective flow_system = fx.FlowSystem(timesteps=pd.date_range('2020-01-01', periods=10, freq='h')) diff --git a/tests/test_flow_system_resample.py b/tests/test_flow_system_resample.py index f25949c98..3da206646 100644 --- a/tests/test_flow_system_resample.py +++ b/tests/test_flow_system_resample.py @@ -186,8 +186,6 @@ def test_invest_resample(complex_fs): # === Modeling Integration === -@pytest.mark.deprecated_api -@pytest.mark.filterwarnings('ignore:Optimization is deprecated:DeprecationWarning:flixopt') @pytest.mark.parametrize('with_dim', [None, 'periods', 'scenarios']) def test_modeling(with_dim): """Test resampled FlowSystem can be modeled.""" @@ -208,15 +206,12 @@ def test_modeling(with_dim): ) fs_r = fs.resample('4h', method='mean') - calc = fx.Optimization('test', fs_r) - calc.do_modeling() + fs_r.build_model() - assert calc.model is not None - assert len(calc.model.variables) > 0 + assert fs_r.model is not None + assert len(fs_r.model.variables) > 0 -@pytest.mark.deprecated_api -@pytest.mark.filterwarnings('ignore:Optimization is deprecated:DeprecationWarning:flixopt') def test_model_structure_preserved(): """Test model structure (var/constraint types) preserved.""" ts = 
pd.date_range('2023-01-01', periods=48, freq='h') @@ -229,22 +224,18 @@ def test_model_structure_preserved(): fx.Source(label='s', outputs=[fx.Flow(label='out', bus='h', size=100, effects_per_flow_hour={'costs': 0.05})]), ) - calc_orig = fx.Optimization('orig', fs) - calc_orig.do_modeling() + fs.build_model() fs_r = fs.resample('4h', method='mean') - calc_r = fx.Optimization('resamp', fs_r) - calc_r.do_modeling() + fs_r.build_model() # Same number of variable/constraint types - assert len(calc_orig.model.variables) == len(calc_r.model.variables) - assert len(calc_orig.model.constraints) == len(calc_r.model.constraints) + assert len(fs.model.variables) == len(fs_r.model.variables) + assert len(fs.model.constraints) == len(fs_r.model.constraints) # Same names - assert set(calc_orig.model.variables.labels.data_vars.keys()) == set(calc_r.model.variables.labels.data_vars.keys()) - assert set(calc_orig.model.constraints.labels.data_vars.keys()) == set( - calc_r.model.constraints.labels.data_vars.keys() - ) + assert set(fs.model.variables.labels.data_vars.keys()) == set(fs_r.model.variables.labels.data_vars.keys()) + assert set(fs.model.constraints.labels.data_vars.keys()) == set(fs_r.model.constraints.labels.data_vars.keys()) # === Advanced Features === diff --git a/tests/test_solution_and_plotting.py b/tests/test_solution_and_plotting.py index d80169773..e5c96da33 100644 --- a/tests/test_solution_and_plotting.py +++ b/tests/test_solution_and_plotting.py @@ -347,12 +347,12 @@ def test_reshape_none_preserves_data(self, long_time_data): assert 'time' in reshaped.dims xr.testing.assert_equal(reshaped, long_time_data) - def test_heatmap_with_plotly(self, long_time_data): + def test_heatmap_with_plotly_v2(self, long_time_data): """Test heatmap plotting with Plotly.""" - # Convert to Dataset for plotting - data = long_time_data.to_dataset(name='power') + # Reshape data first (heatmap_with_plotly_v2 requires pre-reshaped data) + reshaped = 
plotting.reshape_data_for_heatmap(long_time_data, reshape_time=('D', 'h')) - fig = plotting.heatmap_with_plotly(data['power'], reshape_time=('D', 'h')) + fig = plotting.heatmap_with_plotly_v2(reshaped) assert fig is not None def test_heatmap_with_matplotlib(self, long_time_data): diff --git a/tests/test_topology_accessor.py b/tests/test_topology_accessor.py new file mode 100644 index 000000000..b1e3fdf31 --- /dev/null +++ b/tests/test_topology_accessor.py @@ -0,0 +1,126 @@ +"""Tests for the TopologyAccessor class.""" + +import tempfile +from pathlib import Path + +import pytest + +import flixopt as fx + + +@pytest.fixture +def flow_system(simple_flow_system): + """Get a simple flow system for testing.""" + if isinstance(simple_flow_system, fx.FlowSystem): + return simple_flow_system + return simple_flow_system[0] + + +class TestTopologyInfos: + """Tests for topology.infos() method.""" + + def test_infos_returns_tuple(self, flow_system): + """Test that infos() returns a tuple of two dicts.""" + result = flow_system.topology.infos() + assert isinstance(result, tuple) + assert len(result) == 2 + nodes, edges = result + assert isinstance(nodes, dict) + assert isinstance(edges, dict) + + def test_infos_nodes_have_correct_structure(self, flow_system): + """Test that nodes have label, class, and infos keys.""" + nodes, _ = flow_system.topology.infos() + for node_data in nodes.values(): + assert 'label' in node_data + assert 'class' in node_data + assert 'infos' in node_data + assert node_data['class'] in ('Bus', 'Component') + + def test_infos_edges_have_correct_structure(self, flow_system): + """Test that edges have label, start, end, and infos keys.""" + _, edges = flow_system.topology.infos() + for edge_data in edges.values(): + assert 'label' in edge_data + assert 'start' in edge_data + assert 'end' in edge_data + assert 'infos' in edge_data + + def test_infos_contains_all_elements(self, flow_system): + """Test that infos contains all components, buses, and flows.""" 
+ nodes, edges = flow_system.topology.infos() + + # Check components + for comp in flow_system.components.values(): + assert comp.label in nodes + + # Check buses + for bus in flow_system.buses.values(): + assert bus.label in nodes + + # Check flows + for flow in flow_system.flows.values(): + assert flow.label_full in edges + + +class TestTopologyPlot: + """Tests for topology.plot() method.""" + + def test_plot_returns_network_or_none(self, flow_system): + """Test that plot() returns a pyvis Network or None.""" + try: + import pyvis + + result = flow_system.topology.plot(path=False, show=False) + assert result is None or isinstance(result, pyvis.network.Network) + except ImportError: + # pyvis not installed, should return None + result = flow_system.topology.plot(path=False, show=False) + assert result is None + + def test_plot_creates_html_file(self, flow_system): + """Test that plot() creates an HTML file when path is specified.""" + pytest.importorskip('pyvis') + + with tempfile.TemporaryDirectory() as tmpdir: + html_path = Path(tmpdir) / 'network.html' + flow_system.topology.plot(path=str(html_path), show=False) + assert html_path.exists() + content = html_path.read_text() + assert '' in content.lower() or '