Skip to content

Commit 42ce30d

Browse files
committed
CostPower workflow step
1 parent 3bcd3de commit 42ce30d

6 files changed

Lines changed: 428 additions & 534 deletions

File tree

ngraph/workflow/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from .base import WorkflowStep, register_workflow_step
44
from .build_graph import BuildGraph
5-
from .cost_power_efficiency import CostPowerEfficiency
5+
from .cost_power import CostPower
66
from .max_flow_step import MaxFlow
77
from .maximum_supported_demand_step import MaximumSupportedDemand
88
from .network_stats import NetworkStats
@@ -16,5 +16,5 @@
1616
"NetworkStats",
1717
"TrafficMatrixPlacement",
1818
"MaximumSupportedDemand",
19-
"CostPowerEfficiency",
19+
"CostPower",
2020
]

ngraph/workflow/cost_power.py

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
"""CostPower workflow step: collect capex and power by hierarchy level.
2+
3+
This step aggregates capex and power from the network hardware inventory without
4+
performing any normalization or reporting. It separates contributions into two
5+
categories:
6+
7+
- platform_*: node hardware (e.g., chassis, linecards) resolved from node attrs
8+
- optics_*: per-end link hardware (e.g., optics) resolved from link attrs
9+
10+
Aggregation is computed at hierarchy levels 0..N where level 0 is the global
11+
root (path ""), and higher levels correspond to prefixes of node names split by
12+
"/". For example, for node "dc1/plane1/leaf/leaf-1":
13+
- level 1 path is "dc1"
14+
- level 2 path is "dc1/plane1"
15+
- etc.
16+
17+
Disabled handling:
18+
- When include_disabled is False, only enabled nodes and links are considered.
19+
- Optics are counted only when the endpoint node has platform hardware.
20+
21+
YAML Configuration Example:
22+
```yaml
23+
workflow:
24+
- step_type: CostPower
25+
name: "cost_power" # Optional custom name
26+
include_disabled: false # Default: only enabled nodes/links
27+
aggregation_level: 2 # Produce levels: 0, 1, 2
28+
```
29+
30+
Results stored in `scenario.results` under this step namespace:
31+
data:
32+
context:
33+
include_disabled: bool
34+
aggregation_level: int
35+
levels:
36+
"0":
37+
- path: ""
38+
platform_capex: float
39+
platform_power_watts: float
40+
optics_capex: float
41+
optics_power_watts: float
42+
capex_total: float
43+
power_total_watts: float
44+
"1": [ ... ]
45+
"2": [ ... ]
46+
"""
47+
48+
from __future__ import annotations
49+
50+
import time
51+
from dataclasses import dataclass
52+
from typing import Any, Dict, List
53+
54+
from ngraph.components import (
55+
ComponentsLibrary,
56+
resolve_link_end_components,
57+
resolve_node_hardware,
58+
totals_with_multiplier,
59+
)
60+
from ngraph.explorer import NetworkExplorer
61+
from ngraph.logging import get_logger
62+
from ngraph.workflow.base import WorkflowStep, register_workflow_step
63+
64+
logger = get_logger(__name__)
65+
66+
67+
@dataclass
68+
class CostPower(WorkflowStep):
69+
"""Collect platform and optics capex/power by aggregation level.
70+
71+
Attributes:
72+
include_disabled: If True, include disabled nodes and links.
73+
aggregation_level: Inclusive depth for aggregation. 0=root only.
74+
"""
75+
76+
include_disabled: bool = False
77+
aggregation_level: int = 2
78+
79+
def __post_init__(self) -> None:
80+
try:
81+
self.aggregation_level = int(self.aggregation_level)
82+
except Exception as exc:
83+
raise ValueError(f"aggregation_level must be int: {exc}") from exc
84+
if self.aggregation_level < 0:
85+
raise ValueError("aggregation_level must be >= 0")
86+
87+
def run(self, scenario: Any) -> None:
88+
"""Aggregate capex and power by hierarchy levels 0..N.
89+
90+
Args:
91+
scenario: Scenario with network, components_library, and results store.
92+
"""
93+
t0 = time.perf_counter()
94+
logger.info(
95+
"Starting CostPower: name=%s include_disabled=%s levels=0..%d",
96+
self.name or self.__class__.__name__,
97+
str(self.include_disabled),
98+
int(self.aggregation_level),
99+
)
100+
101+
network = scenario.network
102+
library: ComponentsLibrary = scenario.components_library
103+
104+
explorer = NetworkExplorer.explore_network(network, components_library=library)
105+
106+
# Helper: enabled checks honor both flags and attrs for consistency
107+
def node_enabled(nd: Any) -> bool:
108+
return not (
109+
bool(getattr(nd, "disabled", False)) or bool(nd.attrs.get("disabled"))
110+
)
111+
112+
def link_enabled(lk: Any) -> bool:
113+
return not (
114+
bool(getattr(lk, "disabled", False)) or bool(lk.attrs.get("disabled"))
115+
)
116+
117+
# Precompute endpoint eligibility for optics (node must have platform HW)
118+
node_has_hw: Dict[str, bool] = {}
119+
for nd in network.nodes.values():
120+
comp, _ = resolve_node_hardware(nd.attrs, library)
121+
node_has_hw[nd.name] = comp is not None
122+
123+
# Aggregation maps: level -> path -> [platform_capex, platform_power, optics_capex, optics_power]
124+
levels: Dict[int, Dict[str, List[float]]] = {
125+
lvl: {} for lvl in range(0, self.aggregation_level + 1)
126+
}
127+
128+
def path_prefix(full_path: str, level: int) -> str:
129+
if level <= 0:
130+
return ""
131+
parts = [p for p in full_path.split("/") if p]
132+
return "/".join(parts[:level])
133+
134+
def add_values(
135+
path: str,
136+
platform_capex: float,
137+
platform_power: float,
138+
optics_capex: float,
139+
optics_power: float,
140+
) -> None:
141+
for lvl in range(0, self.aggregation_level + 1):
142+
key = path_prefix(path, lvl)
143+
bucket = levels[lvl].setdefault(key, [0.0, 0.0, 0.0, 0.0])
144+
bucket[0] += platform_capex
145+
bucket[1] += platform_power
146+
bucket[2] += optics_capex
147+
bucket[3] += optics_power
148+
149+
# --- Platform aggregation (nodes) ---
150+
for nd in network.nodes.values():
151+
if not self.include_disabled and not node_enabled(nd):
152+
continue
153+
comp, count = resolve_node_hardware(nd.attrs, library)
154+
if comp is None:
155+
continue
156+
capex, power, _ = totals_with_multiplier(comp, count)
157+
tree_node = explorer._node_map.get(nd.name)
158+
if tree_node is None:
159+
continue
160+
full_path = explorer._compute_full_path(tree_node)
161+
add_values(full_path, float(capex), float(power), 0.0, 0.0)
162+
163+
# --- Optics aggregation (per-end link hardware) ---
164+
for lk in network.links.values():
165+
if not self.include_disabled:
166+
if not link_enabled(lk):
167+
continue
168+
# Both endpoints must be enabled when aggregating active view
169+
if not node_enabled(network.nodes[lk.source]):
170+
continue
171+
if not node_enabled(network.nodes[lk.target]):
172+
continue
173+
174+
(src_end, dst_end, per_end) = resolve_link_end_components(lk.attrs, library)
175+
if not per_end:
176+
continue
177+
178+
# Source endpoint
179+
src_comp, src_cnt, _src_excl = src_end
180+
if src_comp is not None and node_has_hw.get(lk.source, False):
181+
capex, power, _ = totals_with_multiplier(src_comp, src_cnt)
182+
src_tree = explorer._node_map.get(lk.source)
183+
if src_tree is not None:
184+
src_path = explorer._compute_full_path(src_tree)
185+
add_values(src_path, 0.0, 0.0, float(capex), float(power))
186+
187+
# Destination endpoint
188+
dst_comp, dst_cnt, _dst_excl = dst_end
189+
if dst_comp is not None and node_has_hw.get(lk.target, False):
190+
capex, power, _ = totals_with_multiplier(dst_comp, dst_cnt)
191+
dst_tree = explorer._node_map.get(lk.target)
192+
if dst_tree is not None:
193+
dst_path = explorer._compute_full_path(dst_tree)
194+
add_values(dst_path, 0.0, 0.0, float(capex), float(power))
195+
196+
# Build payload
197+
levels_payload: Dict[int, List[Dict[str, Any]]] = {}
198+
for lvl, mapping in levels.items():
199+
out_list: List[Dict[str, Any]] = []
200+
for path, vals in sorted(mapping.items(), key=lambda kv: kv[0]):
201+
platform_capex, platform_power, optics_capex, optics_power = vals
202+
out_list.append(
203+
{
204+
"path": path,
205+
"platform_capex": float(platform_capex),
206+
"platform_power_watts": float(platform_power),
207+
"optics_capex": float(optics_capex),
208+
"optics_power_watts": float(optics_power),
209+
"capex_total": float(platform_capex + optics_capex),
210+
"power_total_watts": float(platform_power + optics_power),
211+
}
212+
)
213+
levels_payload[lvl] = out_list
214+
215+
# Store results
216+
scenario.results.put("metadata", {})
217+
scenario.results.put(
218+
"data",
219+
{
220+
"context": {
221+
"include_disabled": bool(self.include_disabled),
222+
"aggregation_level": int(self.aggregation_level),
223+
},
224+
"levels": levels_payload,
225+
},
226+
)
227+
228+
# Log root summary
229+
try:
230+
root_items = levels_payload.get(0, [])
231+
root = root_items[0] if root_items else {}
232+
logger.info(
233+
"CostPower complete: name=%s capex=%.3f power=%.3f platform_capex=%.3f optics_capex=%.3f duration=%.3fs",
234+
self.name or self.__class__.__name__,
235+
float(root.get("capex_total", 0.0)),
236+
float(root.get("power_total_watts", 0.0)),
237+
float(root.get("platform_capex", 0.0)),
238+
float(root.get("optics_capex", 0.0)),
239+
time.perf_counter() - t0,
240+
)
241+
except Exception:
242+
pass
243+
244+
245+
register_workflow_step("CostPower")(CostPower)

0 commit comments

Comments
 (0)