|
| 1 | +"""Sensitivity workflow step. |
| 2 | +
|
| 3 | +Monte Carlo sensitivity analysis of network bottlenecks between node groups |
| 4 | +using FailureManager. Identifies critical edges and quantifies their impact |
| 5 | +on flow capacity across failure scenarios. |
| 6 | +
|
| 7 | +Baseline (no failures) is always run first as a separate reference. The |
| 8 | +``iterations`` parameter specifies how many failure scenarios to run. |
| 9 | +Per-iteration results include per-edge flow-reduction deltas. Aggregated |
| 10 | +``component_scores`` summarize mean/max/min impact across all iterations. |
| 11 | +
|
| 12 | +YAML Configuration Example: |
| 13 | +
|
| 14 | + workflow: |
| 15 | + - type: Sensitivity |
| 16 | + name: "bottleneck_analysis" |
| 17 | + source: "^datacenter/.*" |
| 18 | + target: "^edge/.*" |
| 19 | + mode: "combine" |
| 20 | + failure_policy: "random_failures" |
| 21 | + iterations: 100 |
| 22 | + parallelism: auto |
| 23 | + shortest_path: false |
| 24 | + flow_placement: "PROPORTIONAL" |
| 25 | + seed: 42 |
| 26 | + store_failure_patterns: false |
| 27 | +""" |
| 28 | + |
| 29 | +from __future__ import annotations |
| 30 | + |
| 31 | +import time |
| 32 | +from dataclasses import dataclass |
| 33 | +from typing import TYPE_CHECKING, Any, Dict, Union |
| 34 | + |
| 35 | +from ngraph.analysis.failure_manager import FailureManager |
| 36 | +from ngraph.logging import get_logger |
| 37 | +from ngraph.results.flow import FlowIterationResult |
| 38 | +from ngraph.types.base import FlowPlacement |
| 39 | +from ngraph.workflow.base import ( |
| 40 | + WorkflowStep, |
| 41 | + register_workflow_step, |
| 42 | + resolve_parallelism, |
| 43 | +) |
| 44 | + |
if TYPE_CHECKING:
    # Imported only for static type checking; never executed at runtime.
    from ngraph.scenario import Scenario

# Module-level logger, named after this module per the project convention.
logger = get_logger(__name__)
| 49 | + |
| 50 | + |
@dataclass
class Sensitivity(WorkflowStep):
    """Monte Carlo sensitivity analysis workflow step.

    Identifies critical network edges by measuring the flow-capacity reduction
    caused by removing each one, across Monte Carlo failure scenarios. Results
    include per-iteration sensitivity maps and aggregated component scores.

    Baseline (no failures) is always run first as a separate reference. The
    flow_results list contains unique failure patterns (deduplicated); each
    result has occurrence_count indicating how many iterations matched that
    pattern.

    Attributes:
        source: Source node selector (string path or selector dict).
        target: Target node selector (string path or selector dict).
        mode: Flow analysis mode ("combine" or "pairwise").
        failure_policy: Name of failure policy in scenario.failure_policy_set.
        iterations: Number of failure iterations to run.
        parallelism: Number of parallel worker threads, or "auto".
        shortest_path: Whether to use shortest paths only.
        flow_placement: Flow placement strategy.
        seed: Optional seed for reproducible results.
        store_failure_patterns: Whether to store failure patterns in results.
    """

    source: Union[str, Dict[str, Any]] = ""
    target: Union[str, Dict[str, Any]] = ""
    mode: str = "combine"
    failure_policy: str | None = None
    iterations: int = 1
    parallelism: int | str = "auto"
    shortest_path: bool = False
    flow_placement: FlowPlacement | str = FlowPlacement.PROPORTIONAL
    seed: int | None = None
    store_failure_patterns: bool = False

    def __post_init__(self) -> None:
        """Validate configuration and normalize ``flow_placement`` to an enum.

        Raises:
            ValueError: If ``iterations``, ``parallelism``, or ``mode`` is
                outside its accepted range.
        """
        if self.iterations < 0:
            raise ValueError("iterations must be >= 0")
        if isinstance(self.parallelism, str):
            if self.parallelism != "auto":
                raise ValueError("parallelism must be an integer or 'auto'")
        elif self.parallelism < 1:
            raise ValueError("parallelism must be >= 1")
        if self.mode not in {"combine", "pairwise"}:
            raise ValueError("mode must be 'combine' or 'pairwise'")
        if isinstance(self.flow_placement, str):
            # YAML configs pass the placement as a string; normalize once here.
            self.flow_placement = FlowPlacement.from_string(self.flow_placement)

    @staticmethod
    def _as_dict(obj: Any) -> Any:
        """Return ``obj.to_dict()`` when available; otherwise ``obj`` unchanged.

        FailureManager may yield FlowIterationResult objects (which expose
        ``to_dict``) or already-serialized plain dicts; both forms are
        normalized to dicts for storage.
        """
        to_dict = getattr(obj, "to_dict", None)
        return to_dict() if callable(to_dict) else obj

    def run(self, scenario: "Scenario") -> None:
        """Execute the sensitivity analysis and store results on the scenario.

        Runs the FailureManager Monte Carlo analysis, normalizes baseline and
        per-iteration results to plain dicts, and writes "metadata" and "data"
        entries into ``scenario.results``.

        Args:
            scenario: Scenario providing the network, failure policy set, and
                results store.
        """
        t0 = time.perf_counter()
        logger.info("Starting Sensitivity: name=%s", self.name)
        logger.debug(
            "Sensitivity params: source=%s target=%s mode=%s failure_iters=%d "
            "parallelism=%s failure_policy=%s shortest_path=%s",
            self.source,
            self.target,
            self.mode,
            self.iterations,
            self.parallelism,
            self.failure_policy,
            self.shortest_path,
        )

        fm = FailureManager(
            network=scenario.network,
            failure_policy_set=scenario.failure_policy_set,
            policy_name=self.failure_policy,
        )
        effective_parallelism = resolve_parallelism(self.parallelism)
        raw = fm.run_sensitivity_monte_carlo(
            source=self.source,
            target=self.target,
            mode=self.mode,
            iterations=self.iterations,
            parallelism=effective_parallelism,
            shortest_path=self.shortest_path,
            flow_placement=self.flow_placement,
            seed=self.seed,
            store_failure_patterns=self.store_failure_patterns,
        )

        # Fetch metadata once; it is both stored verbatim and reused below for
        # the completion log line.
        metadata = raw.get("metadata", {})
        scenario.results.put("metadata", metadata)

        # Baseline (no failures) is kept separate from the failure results.
        baseline_result = raw.get("baseline")
        baseline_dict = (
            self._as_dict(baseline_result) if baseline_result is not None else None
        )

        # Normalize each failure result (FlowIterationResult or plain dict).
        flow_results: list[dict] = [
            self._as_dict(item) for item in raw.get("results", [])
        ]

        # Component scores: aggregated per-component sensitivity statistics.
        component_scores = raw.get("component_scores", {})

        context = {
            "source": self.source,
            "target": self.target,
            "mode": self.mode,
            "shortest_path": bool(self.shortest_path),
            "flow_placement": getattr(
                self.flow_placement, "name", str(self.flow_placement)
            ),
        }
        scenario.results.put(
            "data",
            {
                "baseline": baseline_dict,
                "flow_results": flow_results,
                "component_scores": component_scores,
                "context": context,
            },
        )

        logger.info(
            "Sensitivity completed: name=%s failure_iters=%d unique_patterns=%d "
            "workers=%d duration=%.3fs",
            self.name,
            metadata.get("iterations", self.iterations),
            metadata.get("unique_patterns", 0),
            metadata.get("parallelism", effective_parallelism),
            time.perf_counter() - t0,
        )
| 188 | + |
| 189 | + |
| 190 | +register_workflow_step("Sensitivity")(Sensitivity) |
0 commit comments