
Commit 7abee6c

simple scenario and more logging for analysis workflow
1 parent: 08318e8

3 files changed: 321 additions & 6 deletions

docs/reference/api-full.md

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ For a curated, example-driven API guide, see **[api.md](api.md)**.
 > - **[CLI Reference](cli.md)** - Command-line interface
 > - **[DSL Reference](dsl.md)** - YAML syntax guide

-**Generated from source code on:** June 17, 2025 at 01:32 UTC
+**Generated from source code on:** June 17, 2025 at 01:50 UTC

 **Modules auto-discovered:** 42

ngraph/workflow/capacity_envelope_analysis.py

Lines changed: 161 additions & 5 deletions
@@ -12,6 +12,7 @@
 from typing import TYPE_CHECKING, Any

 from ngraph.lib.algorithms.base import FlowPlacement
+from ngraph.logging import get_logger
 from ngraph.results_artifacts import CapacityEnvelope
 from ngraph.workflow.base import WorkflowStep, register_workflow_step

@@ -20,6 +21,8 @@
     from ngraph.network import Network
     from ngraph.scenario import Scenario

+logger = get_logger(__name__)
+

 def _worker(args: tuple[Any, ...]) -> list[tuple[str, str, float]]:
     """Worker function for parallel capacity envelope analysis.
@@ -31,6 +34,9 @@ def _worker(args: tuple[Any, ...]) -> list[tuple[str, str, float]]:
     Returns:
         List of (src_label, dst_label, flow_value) tuples from max_flow results.
     """
+    # Set up worker-specific logger
+    worker_logger = get_logger(f"{__name__}.worker")
+
     (
         base_network,
         base_policy,
@@ -42,25 +48,40 @@ def _worker(args: tuple[Any, ...]) -> list[tuple[str, str, float]]:
         seed_offset,
     ) = args

+    worker_pid = os.getpid()
+    worker_logger.debug(f"Worker {worker_pid} started with seed_offset={seed_offset}")
+
     # Set up unique random seed for this worker iteration
     if seed_offset is not None:
         random.seed(seed_offset)
+        worker_logger.debug(
+            f"Worker {worker_pid} using provided seed offset: {seed_offset}"
+        )
     else:
         # Use pid ^ time_ns for statistical independence when no seed provided
-        random.seed(os.getpid() ^ time.time_ns())
+        actual_seed = worker_pid ^ time.time_ns()
+        random.seed(actual_seed)
+        worker_logger.debug(f"Worker {worker_pid} generated seed: {actual_seed}")

     # Work on deep copies to avoid modifying shared data
+    worker_logger.debug(
+        f"Worker {worker_pid} creating deep copies of network and policy"
+    )
     net = copy.deepcopy(base_network)
     pol = copy.deepcopy(base_policy) if base_policy else None

     if pol:
         pol.use_cache = False  # Local run, no benefit to caching
+        worker_logger.debug(f"Worker {worker_pid} applying failure policy")

         # Apply failures to the network
         node_map = {n_name: n.attrs for n_name, n in net.nodes.items()}
         link_map = {link_name: link.attrs for link_name, link in net.links.items()}

         failed_ids = pol.apply_failures(node_map, link_map, net.risk_groups)
+        worker_logger.debug(
+            f"Worker {worker_pid} applied failures: {len(failed_ids)} entities failed"
+        )

         # Disable the failed entities
         for f_id in failed_ids:
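Note on the fallback seeding added above: when no seed_offset is provided, each worker derives its own seed as os.getpid() ^ time.time_ns(), so processes started in the same instant still end up with distinct, effectively independent seeds. A standalone sketch of the same pattern (the helper name is illustrative, not part of this commit):

```python
# Illustrative sketch of the fallback seeding strategy used in _worker (not part of
# the diff): XOR the process id with a nanosecond timestamp so that concurrently
# started worker processes do not share a random seed.
import os
import random
import time


def derive_worker_seed(seed_offset: int | None = None) -> int:
    """Return the caller-provided seed offset, or derive a per-process seed."""
    if seed_offset is not None:
        return seed_offset  # deterministic, reproducible runs
    return os.getpid() ^ time.time_ns()  # statistically independent per process


random.seed(derive_worker_seed())
```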
@@ -71,7 +92,15 @@ def _worker(args: tuple[Any, ...]) -> list[tuple[str, str, float]]:
             elif f_id in net.risk_groups:
                 net.disable_risk_group(f_id, recursive=True)

+        if failed_ids:
+            worker_logger.debug(
+                f"Worker {worker_pid} disabled failed entities: {failed_ids}"
+            )
+
     # Compute max flow using the configured parameters
+    worker_logger.debug(
+        f"Worker {worker_pid} computing max flow: source={source_regex}, sink={sink_regex}, mode={mode}"
+    )
     flows = net.max_flow(
         source_regex,
         sink_regex,
@@ -81,7 +110,15 @@ def _worker(args: tuple[Any, ...]) -> list[tuple[str, str, float]]:
     )

     # Flatten to a pickle-friendly list
-    return [(src, dst, val) for (src, dst), val in flows.items()]
+    result = [(src, dst, val) for (src, dst), val in flows.items()]
+    worker_logger.debug(f"Worker {worker_pid} computed {len(result)} flow results")
+
+    # Log summary of results for debugging
+    if result:
+        total_flow = sum(val for _, _, val in result)
+        worker_logger.debug(f"Worker {worker_pid} total flow: {total_flow:.2f}")
+
+    return result


 def _run_single_iteration(
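The worker now binds the flattened result to a local name so it can be counted and summarized before returning; the parent process then re-keys the flat (src, dst, value) tuples into per-pair sample lists. A minimal standalone sketch of that round trip (the values are made up for illustration):

```python
# Illustrative round trip (not part of the diff): flatten a {(src, dst): value}
# mapping into a list of tuples, as _worker returns it, then re-key the tuples into
# per-pair sample lists, as the parent process does when accumulating samples.
from collections import defaultdict

flows = {("dc1", "dc2"): 120.0, ("dc1", "dc3"): 80.0}  # made-up max_flow output
flat = [(src, dst, val) for (src, dst), val in flows.items()]  # worker return value

samples: dict[tuple[str, str], list[float]] = defaultdict(list)
for src, dst, val in flat:
    samples[(src, dst)].append(val)  # parent-side accumulation across iterations
```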
@@ -108,6 +145,7 @@ def _run_single_iteration(
        samples: Dictionary to accumulate results into
        seed_offset: Optional seed offset for deterministic results
    """
+    logger.debug(f"Running single iteration with seed_offset={seed_offset}")
    res = _worker(
        (
            base_network,
@@ -120,6 +158,7 @@ def _run_single_iteration(
            seed_offset,
        )
    )
+    logger.debug(f"Single iteration produced {len(res)} flow results")
    for src, dst, val in res:
        if (src, dst) not in samples:
            samples[(src, dst)] = []
@@ -197,23 +236,39 @@ def run(self, scenario: "Scenario") -> None:
         Args:
             scenario: The scenario containing network, failure policies, and results.
         """
+        # Log analysis parameters (base class handles start/end timing)
+        logger.debug(
+            f"Analysis parameters: source_path={self.source_path}, sink_path={self.sink_path}, "
+            f"mode={self.mode}, iterations={self.iterations}, parallelism={self.parallelism}, "
+            f"failure_policy={self.failure_policy}"
+        )
+
         # Get the failure policy to use
         base_policy = self._get_failure_policy(scenario)
+        if base_policy:
+            logger.debug(
+                f"Using failure policy: {self.failure_policy} with {len(base_policy.rules)} rules"
+            )
+        else:
+            logger.debug("No failure policy specified - running baseline analysis only")

         # Validate iterations parameter based on failure policy
         self._validate_iterations_parameter(base_policy)

         # Determine actual number of iterations to run
         mc_iters = self._get_monte_carlo_iterations(base_policy)
+        logger.info(f"Running {mc_iters} Monte-Carlo iterations")

         # Run analysis (serial or parallel)
         samples = self._run_capacity_analysis(scenario.network, base_policy, mc_iters)

         # Build capacity envelopes from samples
         envelopes = self._build_capacity_envelopes(samples)
+        logger.info(f"Generated {len(envelopes)} capacity envelopes")

         # Store results in scenario
         scenario.results.put(self.name, "capacity_envelopes", envelopes)
+        logger.info(f"Capacity envelope analysis completed: {self.name}")

     def _get_failure_policy(self, scenario: "Scenario") -> "FailurePolicy | None":
         """Get the failure policy to use for this analysis.
@@ -284,10 +339,15 @@ def _run_capacity_analysis(
         use_parallel = self.parallelism > 1 and mc_iters > 1

         if use_parallel:
+            logger.info(
+                f"Running capacity analysis in parallel with {self.parallelism} workers"
+            )
             self._run_parallel_analysis(network, policy, mc_iters, samples)
         else:
+            logger.info("Running capacity analysis serially")
             self._run_serial_analysis(network, policy, mc_iters, samples)

+        logger.debug(f"Collected samples for {len(samples)} flow pairs")
         return samples

     def _run_parallel_analysis(
@@ -307,6 +367,9 @@ def _run_parallel_analysis(
         """
         # Limit workers to available iterations
         workers = min(self.parallelism, mc_iters)
+        logger.info(
+            f"Starting parallel analysis with {workers} workers for {mc_iters} iterations"
+        )

         # Build worker arguments
         worker_args = []
@@ -328,11 +391,56 @@ def _run_parallel_analysis(
                 )
             )

+        logger.debug(f"Created {len(worker_args)} worker argument sets")
+
         # Execute in parallel
+        start_time = time.time()
+        completed_tasks = 0
+
+        logger.debug(f"Submitting {len(worker_args)} tasks to process pool")
+        logger.debug(
+            f"Network size: {len(network.nodes)} nodes, {len(network.links)} links"
+        )
+
         with ProcessPoolExecutor(max_workers=workers) as pool:
-            for result in pool.map(_worker, worker_args, chunksize=1):
-                for src, dst, val in result:
-                    samples[(src, dst)].append(val)
+            logger.debug(f"ProcessPoolExecutor created with {workers} workers")
+            logger.info(f"Starting parallel execution of {mc_iters} iterations")
+
+            try:
+                for result in pool.map(_worker, worker_args, chunksize=1):
+                    completed_tasks += 1
+
+                    # Add results to samples
+                    result_count = len(result)
+                    for src, dst, val in result:
+                        samples[(src, dst)].append(val)
+
+                    # Progress logging
+                    if (
+                        completed_tasks % max(1, mc_iters // 10) == 0
+                    ):  # Log every 10% completion
+                        logger.info(
+                            f"Parallel analysis progress: {completed_tasks}/{mc_iters} tasks completed"
+                        )
+                        logger.debug(
+                            f"Latest task produced {result_count} flow results"
+                        )
+
+            except Exception as e:
+                logger.error(
+                    f"Error during parallel execution: {type(e).__name__}: {e}"
+                )
+                logger.debug(f"Failed after {completed_tasks} completed tasks")
+                raise
+
+        elapsed_time = time.time() - start_time
+        logger.info(f"Parallel analysis completed in {elapsed_time:.2f} seconds")
+        logger.debug(
+            f"Average time per iteration: {elapsed_time / mc_iters:.3f} seconds"
+        )
+        logger.debug(
+            f"Total samples collected: {sum(len(vals) for vals in samples.values())}"
+        )

     def _run_serial_analysis(
         self,
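The progress check `completed_tasks % max(1, mc_iters // 10) == 0` fires roughly every 10% of the iteration count, and the `max(1, ...)` guard keeps the step at 1 for small runs so every completed task still produces a progress line. A quick standalone illustration (not part of this commit):

```python
# Illustrative helper showing which task counts trigger the "every ~10%" progress
# log used in _run_parallel_analysis and _run_serial_analysis.
def progress_points(mc_iters: int) -> list[int]:
    step = max(1, mc_iters // 10)  # at least 1, so small runs log every task
    return [i for i in range(1, mc_iters + 1) if i % step == 0]


print(progress_points(4))    # [1, 2, 3, 4]
print(progress_points(100))  # [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
```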
@@ -349,10 +457,19 @@ def _run_serial_analysis(
            mc_iters: Number of Monte-Carlo iterations
            samples: Dictionary to accumulate results into
        """
+        logger.debug("Starting serial analysis")
+        start_time = time.time()
+
        for i in range(mc_iters):
+            iter_start = time.time()
            seed_offset = None
            if self.seed is not None:
                seed_offset = self.seed + i
+                logger.debug(
+                    f"Serial iteration {i + 1}/{mc_iters} with seed offset {seed_offset}"
+                )
+            else:
+                logger.debug(f"Serial iteration {i + 1}/{mc_iters}")

            _run_single_iteration(
                network,
@@ -366,6 +483,28 @@ def _run_serial_analysis(
                seed_offset,
            )

+            iter_time = time.time() - iter_start
+            if mc_iters <= 10:  # Log individual iteration times for small runs
+                logger.debug(
+                    f"Serial iteration {i + 1} completed in {iter_time:.3f} seconds"
+                )
+
+            if (
+                mc_iters > 1 and (i + 1) % max(1, mc_iters // 10) == 0
+            ):  # Log every 10% completion
+                logger.info(
+                    f"Serial analysis progress: {i + 1}/{mc_iters} iterations completed"
+                )
+                avg_time = (time.time() - start_time) / (i + 1)
+                logger.debug(f"Average iteration time so far: {avg_time:.3f} seconds")
+
+        elapsed_time = time.time() - start_time
+        logger.info(f"Serial analysis completed in {elapsed_time:.2f} seconds")
+        if mc_iters > 1:
+            logger.debug(
+                f"Average time per iteration: {elapsed_time / mc_iters:.3f} seconds"
+            )
+
    def _build_capacity_envelopes(
        self, samples: dict[tuple[str, str], list[float]]
    ) -> dict[str, dict[str, Any]]:
@@ -377,9 +516,16 @@ def _build_capacity_envelopes(
        Returns:
            Dictionary mapping flow keys to serialized CapacityEnvelope data.
        """
+        logger.debug(f"Building capacity envelopes from {len(samples)} flow pairs")
        envelopes = {}

        for (src_label, dst_label), capacity_values in samples.items():
+            if not capacity_values:
+                logger.warning(
+                    f"No capacity values found for flow {src_label}->{dst_label}"
+                )
+                continue
+
            # Create capacity envelope
            envelope = CapacityEnvelope(
                source_pattern=self.source_path,
@@ -392,6 +538,16 @@ def _build_capacity_envelopes(
            flow_key = f"{src_label}->{dst_label}"
            envelopes[flow_key] = envelope.to_dict()

+            # Enhanced logging with statistics
+            min_val = min(capacity_values)
+            max_val = max(capacity_values)
+            mean_val = sum(capacity_values) / len(capacity_values)
+            logger.debug(
+                f"Created envelope for {flow_key}: {len(capacity_values)} samples, "
+                f"min={min_val:.2f}, max={max_val:.2f}, mean={mean_val:.2f}"
+            )
+
+        logger.debug(f"Successfully created {len(envelopes)} capacity envelopes")
        return envelopes


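Note: the new messages go through ngraph.logging.get_logger, and most of the detail is logged at DEBUG level. Assuming get_logger returns standard library logging.Logger instances (suggested by the module-level usage in this diff, but not confirmed here), one way to surface the DEBUG output while running an analysis is:

```python
# Minimal sketch, assuming ngraph.logging.get_logger wraps the standard logging
# module: configure a root handler and raise the level on the "ngraph" logger so the
# new DEBUG messages from the capacity envelope analysis become visible.
import logging

logging.basicConfig(
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    level=logging.INFO,
)
logging.getLogger("ngraph").setLevel(logging.DEBUG)
```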