Skip to content
18 changes: 16 additions & 2 deletions src/madengine/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,28 @@
import os
import json
import logging
import sys


# Utility function for optional verbose logging of configuration
def _log_config_info(message: str, force_print: bool = False):
"""Log configuration information either to logger or print if specified."""
# Keep --version/--help output clean even if MAD_VERBOSE_CONFIG=true.
if any(arg in {"--version", "-V", "--help", "-h"} for arg in sys.argv[1:]):
logging.debug(message)
return
if force_print or os.environ.get("MAD_VERBOSE_CONFIG", "").lower() == "true":
print(message)
else:
logging.debug(message)


def _is_lightweight_cli_invocation() -> bool:
"""Return True for metadata/help invocations that should avoid side effects."""
lightweight_flags = {"--version", "-V", "--help", "-h"}
return any(arg in lightweight_flags for arg in sys.argv[1:])


# third-party modules
from madengine.core.console import Console

Expand Down Expand Up @@ -65,9 +76,12 @@ def _setup_model_dir():
_log_config_info(f"Model dir: {MODEL_DIR} copied to current dir: {cwd_abs}")


# Only setup model directory if explicitly requested (when not just importing for constants)
# Only setup model directory if explicitly requested and invocation is not metadata-only.
if os.environ.get("MAD_SETUP_MODEL_DIR", "").lower() == "true":
_setup_model_dir()
if _is_lightweight_cli_invocation():
_log_config_info("Skipping MODEL_DIR setup for lightweight CLI invocation (--version/--help).")
else:
_setup_model_dir()

# madengine credentials configuration
CRED_FILE = "credential.json"
Expand Down
11 changes: 4 additions & 7 deletions src/madengine/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,12 +758,6 @@ def get_gpu_renderD_nodes(self) -> typing.Optional[typing.List[int]]:
print(f"Warning: Failed to parse unique_id from line '{item}': {e}")
continue

if kfd_renderDs is None:
raise RuntimeError(
"KFD topology not accessible and required for ROCm < 6.4.1 GPU mapping. "
"Check permissions on /sys/devices/virtual/kfd/kfd/topology/nodes"
)

if len(kfd_unique_ids) != len(kfd_renderDs):
raise RuntimeError(
f"Mismatch between unique_ids count ({len(kfd_unique_ids)}) "
Expand Down Expand Up @@ -816,7 +810,10 @@ def get_gpu_renderD_nodes(self) -> typing.Optional[typing.List[int]]:
json_output = output

try:
data = json.loads(json_output)
# Use raw_decode so we tolerate any trailing non-JSON
# text (deprecation banners, double-emitted blocks under
# concurrent slurm tasks, stray newlines, etc.).
data, _end = json.JSONDecoder().raw_decode(json_output.lstrip())
except json.JSONDecodeError as e:
raise ValueError(f"Failed to parse amd-smi JSON output: {e}. Output was: {output[:200]}")

Expand Down
73 changes: 66 additions & 7 deletions src/madengine/deployment/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -1621,6 +1621,60 @@ def _build_common_info_dict(
flatten_tags(result)
return result

def _select_best_multiple_results_csv(self, candidates: List[Path]) -> Optional[Path]:
"""Pick the CSV with the most non-empty performance entries.

In multi-node SLURM runs every node copies its local multi-results CSV
into job_dir/node_<rank>/. Only some nodes observe the final throughput
and populate the performance column; others have the file but with empty
values (e.g. master perf empty while a worker has the real numbers).
Ranking candidates by the count of non-empty performance rows lets
downstream aggregation use the richest data instead of depending on
node-0 winning the race or being non-empty. Header/row keys are stripped
so a leading space in the CSV header (some Primus configs) still matches.
Ties break on total row count; candidates[0] is the ultimate fallback.
"""
if not candidates:
return None
if len(candidates) == 1:
return candidates[0]
import csv as _csv
best_candidate: Optional[Path] = None
best_score = -1
best_rows = -1
for candidate in candidates:
non_empty_perf = 0
total_rows = 0
has_perf_column = False
try:
with open(candidate, "r", encoding="utf-8", errors="ignore") as f:
reader = _csv.DictReader(f)
fieldnames = reader.fieldnames or []
stripped_fields = [fn.strip() for fn in fieldnames]
has_perf_column = "performance" in stripped_fields
for row in reader:
total_rows += 1
if has_perf_column:
normalized_row = {(k.strip() if isinstance(k, str) else k): v for k, v in row.items()}
value = (normalized_row.get("performance") or "").strip()
if value:
non_empty_perf += 1
except Exception:
continue
score = non_empty_perf if has_perf_column else 0
if score > best_score or (score == best_score and total_rows > best_rows):
best_score = score
best_rows = total_rows
best_candidate = candidate
if best_candidate is None:
return candidates[0]
if best_score > 0:
self.console.print(
f"[dim] Selected multiple_results CSV with {best_score} non-empty performance rows: {best_candidate}[/dim]"
)
return best_candidate


def collect_results(self, deployment_id: str) -> Dict[str, Any]:
"""Collect performance results from SLURM output files.

Expand Down Expand Up @@ -1764,14 +1818,19 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]:
mult_res = model_info_for_entry.get("multiple_results")
if mult_res:
resolved_csv: Optional[Path] = None
# Multi-node: gather all node CSVs and pick the one with the most
# non-empty performance rows (master CSV may be empty while a worker
# holds the real numbers) instead of taking the first node that has
# the file.
candidates: List[Path] = []
if (job_dir / mult_res).is_file():
resolved_csv = job_dir / mult_res
else:
for i in range(self.nodes):
candidate = job_dir / f"node_{i}" / mult_res
if candidate.is_file():
resolved_csv = candidate
break
candidates.append(job_dir / mult_res)
for i in range(self.nodes):
per_node_candidate = job_dir / f"node_{i}" / mult_res
if per_node_candidate.is_file():
candidates.append(per_node_candidate)
if candidates:
resolved_csv = self._select_best_multiple_results_csv(candidates)
if not resolved_csv and Path(mult_res).is_file():
resolved_csv = Path(mult_res)
if not resolved_csv and Path("run_directory", mult_res).is_file():
Expand Down
Loading