-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli.py
More file actions
702 lines (610 loc) · 25.4 KB
/
cli.py
File metadata and controls
702 lines (610 loc) · 25.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
"""Command-line interface for NetGraph."""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
from ngraph.explorer import NetworkExplorer
from ngraph.logging import get_logger, set_global_log_level
from ngraph.profiling import PerformanceProfiler, PerformanceReporter
from ngraph.report import ReportGenerator
from ngraph.scenario import Scenario
logger = get_logger(__name__)
def _format_table(headers: List[str], rows: List[List[str]], min_width: int = 8) -> str:
"""Format data as a simple ASCII table.
Args:
headers: Column headers
rows: Data rows
min_width: Minimum column width
Returns:
Formatted table string
"""
if not rows:
return ""
# Calculate column widths
all_data = [headers] + rows
col_widths = []
for col_idx in range(len(headers)):
max_width = max(len(str(row[col_idx])) for row in all_data)
col_widths.append(max(max_width, min_width))
# Format rows
def format_row(row_data: List[str]) -> str:
return " " + " | ".join(
f"{str(item):<{col_widths[i]}}" for i, item in enumerate(row_data)
)
# Build table
lines = []
lines.append(format_row(headers))
lines.append(" " + "-+-".join("-" * width for width in col_widths))
for row in rows:
lines.append(format_row(row))
return "\n".join(lines)
def _inspect_scenario(path: Path, detail: bool = False) -> None:
"""Inspect a scenario file, validate it, and show key characteristics.
Args:
path: Scenario YAML file.
detail: Whether to show detailed information including sample node names.
"""
logger.info(f"Inspecting scenario from: {path}")
try:
# Load and validate scenario
yaml_text = path.read_text()
logger.info("✓ YAML file loaded successfully")
scenario = Scenario.from_yaml(yaml_text)
logger.info("✓ Scenario validated and loaded successfully")
# Show scenario metadata
print("\n" + "=" * 60)
print("NETGRAPH SCENARIO INSPECTION")
print("=" * 60)
print("\n1. SCENARIO METADATA")
print("-" * 30)
if scenario.seed is not None:
print(f" Seed: {scenario.seed} (deterministic)")
print(
" All workflow step seeds are derived deterministically from scenario seed"
)
else:
print(" Seed: None (non-deterministic)")
print(" Workflow step seeds will be random on each run")
# Network Analysis
print("\n2. NETWORK STRUCTURE")
print("-" * 30)
network = scenario.network
nodes = network.nodes
links = network.links
print(f" Total Nodes: {len(nodes):,}")
print(f" Total Links: {len(links):,}")
# Show enabled/disabled breakdown
enabled_nodes = [n for n in nodes.values() if not n.disabled]
disabled_nodes = [n for n in nodes.values() if n.disabled]
enabled_links = [link for link in links.values() if not link.disabled]
disabled_links = [link for link in links.values() if link.disabled]
print(f" Enabled Nodes: {len(enabled_nodes):,}")
if disabled_nodes:
print(f" Disabled Nodes: {len(disabled_nodes):,}")
print(f" Enabled Links: {len(enabled_links):,}")
if disabled_links:
print(f" Disabled Links: {len(disabled_links):,}")
# Network hierarchy analysis
if nodes:
# Suppress the "Analyzing..." log message during inspect for cleaner output
original_level = logger.level
logger.setLevel(logging.WARNING)
try:
explorer = NetworkExplorer.explore_network(
network, scenario.components_library
)
print("\n Network Hierarchy:")
explorer.print_tree(
max_depth=3 if not detail else None,
skip_leaves=not detail,
detailed=detail,
)
except Exception as e:
print(f" Network Hierarchy: Unable to analyze ({e})")
finally:
logger.setLevel(original_level)
# Show complete node and link tables in detail mode
if detail:
# Nodes table
if nodes:
print("\n Nodes:")
node_rows = []
for node_name in sorted(nodes.keys()):
node = nodes[node_name]
status = "disabled" if node.disabled else "enabled"
# Calculate total capacity and link count for this node
node_capacity = 0
node_link_count = 0
for link in links.values():
if link.source == node_name or link.target == node_name:
if not link.disabled:
node_capacity += link.capacity
node_link_count += 1
capacity_str = f"{node_capacity:,.0f}" if node_capacity > 0 else "0"
node_rows.append(
[node_name, status, capacity_str, str(node_link_count)]
)
node_table = _format_table(
["Node", "Status", "Tot. Capacity", "Links"], node_rows
)
print(node_table)
# Links table
if links:
print("\n Links:")
link_rows = []
for _link_id, link in links.items():
status = "disabled" if link.disabled else "enabled"
capacity = f"{link.capacity:,.0f}"
# Get cost if available
cost = ""
if hasattr(link, "cost") and link.cost:
cost = f"{link.cost:,.0f}"
elif hasattr(link, "attrs") and link.attrs and "cost" in link.attrs:
cost = f"{link.attrs['cost']:,.0f}"
link_rows.append([link.source, link.target, status, capacity, cost])
link_table = _format_table(
["Source", "Target", "Status", "Capacity", "Cost"], link_rows
)
print(link_table)
# Link capacity analysis as table
if links:
link_caps = [link.capacity for link in enabled_links]
if link_caps:
print("\n Link Capacity Statistics:")
cap_table = _format_table(
["Metric", "Value"],
[
["Min", f"{min(link_caps):,.1f}"],
["Max", f"{max(link_caps):,.1f}"],
["Mean", f"{sum(link_caps) / len(link_caps):,.1f}"],
["Total", f"{sum(link_caps):,.1f}"],
],
)
print(cap_table)
# Node capacity analysis
if nodes and links:
print("\n Node Capacity Statistics:")
node_capacities = []
for node_name in nodes.keys():
node_capacity = 0
for link in enabled_links:
if link.source == node_name or link.target == node_name:
node_capacity += link.capacity
if node_capacity > 0: # Only include nodes with links
node_capacities.append(node_capacity)
if node_capacities:
node_cap_table = _format_table(
["Metric", "Value"],
[
["Min", f"{min(node_capacities):,.1f}"],
["Max", f"{max(node_capacities):,.1f}"],
["Mean", f"{sum(node_capacities) / len(node_capacities):,.1f}"],
["Total", f"{sum(node_capacities):,.1f}"],
],
)
print(node_cap_table)
# Risk Groups Analysis
print("\n3. RISK GROUPS")
print("-" * 30)
if network.risk_groups:
print(f" Total: {len(network.risk_groups)}")
if detail:
# Show all risk groups in detail mode
for rg_name, rg in network.risk_groups.items():
status = "disabled" if rg.disabled else "enabled"
print(f" {rg_name} ({status})")
else:
# Show first 5 risk groups, then summary
risk_items = list(network.risk_groups.items())[:5]
for rg_name, rg in risk_items:
status = "disabled" if rg.disabled else "enabled"
print(f" {rg_name} ({status})")
if len(network.risk_groups) > 5:
remaining = len(network.risk_groups) - 5
print(f" ... and {remaining} more")
else:
print(" Total: 0")
# Components Library
print("\n4. COMPONENTS LIBRARY")
print("-" * 30)
comp_count = len(scenario.components_library.components)
print(f" Total: {comp_count}")
if scenario.components_library.components:
if detail:
# Show all components in detail mode
for comp_name in scenario.components_library.components.keys():
print(f" {comp_name}")
else:
# Show first 5 components, then summary
comp_items = list(scenario.components_library.components.keys())[:5]
for comp_name in comp_items:
print(f" {comp_name}")
if comp_count > 5:
remaining = comp_count - 5
print(f" ... and {remaining} more")
# Failure Policies Analysis
print("\n5. FAILURE POLICIES")
print("-" * 30)
policy_count = len(scenario.failure_policy_set.policies)
print(f" Total: {policy_count}")
if scenario.failure_policy_set.policies:
policy_items = list(scenario.failure_policy_set.policies.items())[:5]
for policy_name, policy in policy_items:
rules_count = len(policy.rules)
print(
f" {policy_name}: {rules_count} rule{'s' if rules_count != 1 else ''}"
)
if detail and rules_count > 0:
for i, rule in enumerate(policy.rules[:3]): # Show first 3 rules
print(f" {i + 1}. {rule.entity_scope} {rule.rule_type}")
if rules_count > 3:
print(f" ... and {rules_count - 3} more rules")
if policy_count > 5:
remaining = policy_count - 5
print(f" ... and {remaining} more")
# Traffic Matrices Analysis
print("\n6. TRAFFIC MATRICES")
print("-" * 30)
matrix_count = len(scenario.traffic_matrix_set.matrices)
print(f" Total: {matrix_count}")
if scenario.traffic_matrix_set.matrices:
matrix_items = list(scenario.traffic_matrix_set.matrices.items())[:5]
for matrix_name, demands in matrix_items:
demand_count = len(demands)
print(
f" {matrix_name}: {demand_count} demand{'s' if demand_count != 1 else ''}"
)
if detail and demands:
for i, demand in enumerate(demands[:3]): # Show first 3 demands
print(
f" {i + 1}. {demand.source_path} → {demand.sink_path} ({demand.demand})"
)
if demand_count > 3:
print(f" ... and {demand_count - 3} more demands")
if matrix_count > 5:
remaining = matrix_count - 5
print(f" ... and {remaining} more")
# Workflow Analysis as table
print("\n7. WORKFLOW STEPS")
print("-" * 30)
step_count = len(scenario.workflow)
print(f" Total: {step_count}")
if scenario.workflow:
if not detail:
# Simple table format for basic view
workflow_rows = []
for i, step in enumerate(scenario.workflow):
step_name = step.name or f"step_{i + 1}"
step_type = step.__class__.__name__
determinism = (
"deterministic" if scenario.seed is not None else "random"
)
workflow_rows.append(
[str(i + 1), step_name, step_type, determinism]
)
workflow_table = _format_table(
["#", "Name", "Type", "Seed"], workflow_rows
)
print(workflow_table)
else:
# Detailed view with parameters
for i, step in enumerate(scenario.workflow):
step_name = step.name or f"step_{i + 1}"
step_type = step.__class__.__name__
determinism = (
"deterministic" if scenario.seed is not None else "random"
)
seed_info = (
f" (seed: {step.seed}, {determinism})"
if step.seed is not None
else f" ({determinism})"
)
print(f" {i + 1}. {step_name} ({step_type}){seed_info}")
# Show step-specific parameters if detail mode
step_dict = step.__dict__
param_rows = []
for key, value in step_dict.items():
if key not in ["name", "seed"] and not key.startswith("_"):
param_rows.append([key, str(value)])
if param_rows:
param_table = _format_table(["Parameter", "Value"], param_rows)
# Indent the table
indented_table = "\n".join(
f" {line}" for line in param_table.split("\n")
)
print(indented_table)
print("\n" + "=" * 60)
print("INSPECTION COMPLETE")
print("=" * 60)
if scenario.workflow:
print(
f"\nReady to run with {len(scenario.workflow)} workflow step{'s' if len(scenario.workflow) != 1 else ''}"
)
print(f"Usage: python -m ngraph run {path}")
else:
print("\nNo workflow steps defined")
print(
"This scenario can be used for network analysis but has no automated workflow"
)
logger.info("Scenario inspection completed successfully")
except FileNotFoundError:
print(f"❌ ERROR: Scenario file not found: {path}")
sys.exit(1)
except Exception as e:
logger.error(f"Failed to inspect scenario: {e}")
print("❌ ERROR: Failed to inspect scenario")
print(f" {type(e).__name__}: {e}")
sys.exit(1)
def _run_scenario(
path: Path,
output: Path,
no_results: bool,
stdout: bool,
keys: Optional[list[str]] = None,
profile: bool = False,
) -> None:
"""Run a scenario file and export results as JSON by default.
Args:
path: Scenario YAML file.
output: Path where JSON results should be written.
no_results: Whether to disable results file generation.
stdout: Whether to also print results to stdout.
keys: Optional list of workflow step names to include. When ``None`` all steps are
exported.
profile: Whether to enable performance profiling with CPU analysis.
"""
logger.info(f"Loading scenario from: {path}")
try:
yaml_text = path.read_text()
scenario = Scenario.from_yaml(yaml_text)
if profile:
logger.info("Performance profiling enabled")
# Initialize detailed profiler
profiler = PerformanceProfiler()
# Start scenario-level profiling
profiler.start_scenario()
logger.info("Starting scenario execution with profiling")
# Enable child-process profiling for parallel workflows
child_profile_dir = Path("worker_profiles")
child_profile_dir.mkdir(exist_ok=True)
os.environ["NGRAPH_PROFILE_DIR"] = str(child_profile_dir.resolve())
logger.info(f"Worker profiles will be saved to: {child_profile_dir}")
# Manual execution of workflow steps with profiling
for step in scenario.workflow:
step_name = step.name or step.__class__.__name__
step_type = step.__class__.__name__
with profiler.profile_step(step_name, step_type):
step.execute(scenario)
# Merge any worker profiles generated by this step
if child_profile_dir.exists():
profiler.merge_child_profiles(child_profile_dir, step_name)
logger.info("Scenario execution completed successfully")
# End scenario profiling and analyze results
profiler.end_scenario()
profiler.analyze_performance()
# Clean up any remaining worker profile files
if child_profile_dir.exists():
remaining_files = list(child_profile_dir.glob("*.pstats"))
if remaining_files:
logger.debug(
f"Cleaning up {len(remaining_files)} remaining profile files"
)
for f in remaining_files:
try:
f.unlink()
except Exception:
pass
try:
child_profile_dir.rmdir() # Remove dir if empty
except Exception:
pass
# Generate and display performance report
reporter = PerformanceReporter(profiler.results)
performance_report = reporter.generate_report()
print("\n" + performance_report)
else:
logger.info("Starting scenario execution")
scenario.run()
logger.info("Scenario execution completed successfully")
print("✅ Scenario execution completed")
# Export JSON results by default unless disabled
if not no_results:
logger.info("Serializing results to JSON")
results_dict: Dict[str, Dict[str, Any]] = scenario.results.to_dict()
if keys:
filtered: Dict[str, Dict[str, Any]] = {}
for step, data in results_dict.items():
if step in keys:
filtered[step] = data
results_dict = filtered
json_str = json.dumps(results_dict, indent=2, default=str)
logger.info(f"Writing results to: {output}")
output.write_text(json_str)
logger.info("Results written successfully")
print(f"✅ Results written to: {output}")
if stdout:
print(json_str)
elif stdout:
# Print to stdout even without file export
results_dict: Dict[str, Dict[str, Any]] = scenario.results.to_dict()
if keys:
filtered: Dict[str, Dict[str, Any]] = {}
for step, data in results_dict.items():
if step in keys:
filtered[step] = data
results_dict = filtered
json_str = json.dumps(results_dict, indent=2, default=str)
print(json_str)
except FileNotFoundError:
logger.error(f"Scenario file not found: {path}")
print(f"❌ ERROR: Scenario file not found: {path}")
sys.exit(1)
except Exception as e:
logger.error(f"Failed to run scenario: {type(e).__name__}: {e}")
print(f"❌ ERROR: Failed to run scenario: {type(e).__name__}: {e}")
sys.exit(1)
def main(argv: Optional[List[str]] = None) -> None:
"""Entry point for the ``ngraph`` command.
Args:
argv: Optional list of command-line arguments. If ``None``, ``sys.argv``
is used.
"""
parser = argparse.ArgumentParser(prog="ngraph")
# Global options
parser.add_argument(
"--verbose", "-v", action="store_true", help="Enable verbose (DEBUG) logging"
)
parser.add_argument(
"--quiet", "-q", action="store_true", help="Enable quiet mode (WARNING+ only)"
)
subparsers = parser.add_subparsers(dest="command", required=True)
# Run command
run_parser = subparsers.add_parser("run", help="Run a scenario")
run_parser.add_argument("scenario", type=Path, help="Path to scenario YAML")
run_parser.add_argument(
"--results",
"-r",
type=Path,
default=Path("results.json"),
help="Export results to JSON file (default: results.json)",
)
run_parser.add_argument(
"--no-results",
action="store_true",
help="Disable results file generation",
)
run_parser.add_argument(
"--stdout",
action="store_true",
help="Print results to stdout",
)
run_parser.add_argument(
"--keys",
"-k",
nargs="+",
help="Filter output to these workflow step names",
)
run_parser.add_argument(
"--profile",
action="store_true",
help="Enable performance profiling with CPU analysis and bottleneck detection",
)
# Inspect command
inspect_parser = subparsers.add_parser(
"inspect", help="Inspect and validate a scenario"
)
inspect_parser.add_argument("scenario", type=Path, help="Path to scenario YAML")
inspect_parser.add_argument(
"--detail",
"-d",
action="store_true",
help="Show detailed information including complete node/link tables and step parameters",
)
# Report command
report_parser = subparsers.add_parser(
"report", help="Generate analysis reports from results file"
)
report_parser.add_argument(
"results",
type=Path,
nargs="?",
default=Path("results.json"),
help="Path to results JSON file (default: results.json)",
)
report_parser.add_argument(
"--notebook",
"-n",
type=Path,
help="Output path for Jupyter notebook (default: analysis.ipynb)",
)
report_parser.add_argument(
"--html",
type=Path,
nargs="?",
const=Path("analysis.html"),
help="Generate HTML report (default: analysis.html if no path specified)",
)
report_parser.add_argument(
"--include-code",
action="store_true",
help="Include code cells in HTML output (default: report without code)",
)
args = parser.parse_args(argv)
# Configure logging based on arguments
if args.verbose:
set_global_log_level(logging.DEBUG)
logger.debug("Debug logging enabled")
elif args.quiet:
set_global_log_level(logging.WARNING)
else:
set_global_log_level(logging.INFO)
if args.command == "run":
_run_scenario(
args.scenario,
args.results,
args.no_results,
args.stdout,
args.keys,
args.profile,
)
elif args.command == "inspect":
_inspect_scenario(args.scenario, args.detail)
elif args.command == "report":
_generate_report(
args.results,
args.notebook,
args.html,
args.include_code,
)
def _generate_report(
results_path: Path,
notebook_path: Optional[Path],
html_path: Optional[Path],
include_code: bool,
) -> None:
"""Generate analysis reports from results file.
Args:
results_path: Path to results.json file.
notebook_path: Output path for notebook (default: analysis.ipynb).
html_path: Output path for HTML report (None = no HTML).
include_code: Whether to include code cells in HTML output.
"""
logger.info(f"Generating report from: {results_path}")
try:
# Initialize report generator
generator = ReportGenerator(results_path)
generator.load_results()
# Generate notebook
notebook_output = notebook_path or Path("analysis.ipynb")
generated_notebook = generator.generate_notebook(notebook_output)
print(f"✅ Notebook generated: {generated_notebook}")
# Generate HTML if requested
if html_path:
generated_html = generator.generate_html_report(
notebook_path=generated_notebook,
html_path=html_path,
include_code=include_code,
)
print(f"✅ HTML report generated: {generated_html}")
except FileNotFoundError as e:
logger.error(f"Results file not found: {e}")
print(f"❌ ERROR: Results file not found: {e}")
sys.exit(1)
except ValueError as e:
logger.error(f"Invalid results data: {e}")
print(f"❌ ERROR: Invalid results data: {e}")
sys.exit(1)
except RuntimeError as e:
logger.error(f"Report generation failed: {e}")
print(f"❌ ERROR: Report generation failed: {e}")
sys.exit(1)
except Exception as e:
logger.error(f"Unexpected error: {e}")
print(f"❌ ERROR: Unexpected error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()