diff --git a/docs/PLUGIN_DOC.md b/docs/PLUGIN_DOC.md
index cbc1425..8c355e0 100644
--- a/docs/PLUGIN_DOC.md
+++ b/docs/PLUGIN_DOC.md
@@ -22,7 +22,7 @@
| OsPlugin | sh -c '( lsb_release -ds || (cat /etc/*release | grep PRETTY_NAME) || uname -om ) 2>/dev/null | head -n1'
cat /etc/*release | grep VERSION_ID
wmic os get Version /value
wmic os get Caption /Value | **Analyzer Args:**
- `exp_os`: Union[str, list] — Expected OS name/version string(s) to match (e.g. from lsb_release or /etc/os-release).
- `exact_match`: bool — If True, require exact match for exp_os; otherwise substring match. | - | [OsDataModel](#OsDataModel-Model) | [OsCollector](#Collector-Class-OsCollector) | [OsAnalyzer](#Data-Analyzer-Class-OsAnalyzer) |
| PackagePlugin | dnf list --installed
dpkg-query -W
pacman -Q
cat /etc/*release
wmic product get name,version | **Analyzer Args:**
- `exp_package_ver`: Dict[str, Optional[str]] — Map package name -> expected version (None = any version). Checked against installed packages.
- `regex_match`: bool — If True, match package versions with regex; otherwise exact or prefix match.
- `rocm_regex`: Optional[str] — Optional regex to identify ROCm package version (used when enable_rocm_regex is True).
- `enable_rocm_regex`: bool — If True, use rocm_regex (or default pattern) to extract ROCm version for checks. | - | [PackageDataModel](#PackageDataModel-Model) | [PackageCollector](#Collector-Class-PackageCollector) | [PackageAnalyzer](#Data-Analyzer-Class-PackageAnalyzer) |
| PciePlugin | lspci -d {vendor_id}: -nn
lspci -x
lspci -xxxx
lspci -PP
lspci -PP -d {vendor_id}:{dev_id}
lspci -vvv
lspci -vvvt | **Analyzer Args:**
- `exp_speed`: int — Expected PCIe link speed (generation 1–5).
- `exp_width`: int — Expected PCIe link width in lanes (1–16).
- `exp_sriov_count`: int — Expected SR-IOV virtual function count.
- `exp_gpu_count_override`: Optional[int] — Override expected GPU count for validation.
- `exp_max_payload_size`: Union[Dict[int, int], int, NoneType] — Expected max payload size: int for all devices, or dict keyed by device ID.
- `exp_max_rd_req_size`: Union[Dict[int, int], int, NoneType] — Expected max read request size: int for all devices, or dict keyed by device ID.
- `exp_ten_bit_tag_req_en`: Union[Dict[int, int], int, NoneType] — Expected 10-bit tag request enable: int for all devices, or dict keyed by device ID. | - | [PcieDataModel](#PcieDataModel-Model) | [PcieCollector](#Collector-Class-PcieCollector) | [PcieAnalyzer](#Data-Analyzer-Class-PcieAnalyzer) |
-| ProcessPlugin | top -b -n 1
rocm-smi --showpids
top -b -n 1 -o %CPU | **Analyzer Args:**
- `max_kfd_processes`: int — Maximum allowed number of KFD (Kernel Fusion Driver) processes; 0 disables the check.
- `max_cpu_usage`: float — Maximum allowed CPU usage (percent) for process checks. | **Collection Args:**
- `top_n_process`: int — Number of top processes by CPU usage to collect (e.g. for top -b -n 1 -o %CPU). | [ProcessDataModel](#ProcessDataModel-Model) | [ProcessCollector](#Collector-Class-ProcessCollector) | [ProcessAnalyzer](#Data-Analyzer-Class-ProcessAnalyzer) |
+| ProcessPlugin | cat /proc/stat
shell loop over /proc/[0-9]*/stat (with ``__SAMPLER__`` marker)
batched ``cat /proc/<pid>/comm`` | **Analyzer Args:**
- `max_cpu_usage`: float — Maximum allowed aggregate CPU usage (percent). | **Collection Args:**
- `top_n_process`: int — Max process rows ranked by CPU share over the sample window.
- `sample_interval_seconds`: float — Wall-clock seconds between the two /proc samples (default 1.0). | [ProcessDataModel](#ProcessDataModel-Model) | [ProcessCollector](#Collector-Class-ProcessCollector) | [ProcessAnalyzer](#Data-Analyzer-Class-ProcessAnalyzer) |
| RdmaPlugin | rdma link -j
rdma dev
rdma link
rdma statistic -j | - | - | [RdmaDataModel](#RdmaDataModel-Model) | [RdmaCollector](#Collector-Class-RdmaCollector) | [RdmaAnalyzer](#Data-Analyzer-Class-RdmaAnalyzer) |
| RocmPlugin | {rocm_path}/opencl/bin/*/clinfo
env | grep -Ei 'rocm|hsa|hip|mpi|openmp|ucx|miopen'
ls /sys/class/kfd/kfd/proc/
grep -i -E 'rocm' /etc/ld.so.conf.d/*
{rocm_path}/bin/rocminfo
ls -v -d {rocm_path}*
ls -v -d {rocm_path}-[3-7]* | tail -1
ldconfig -p | grep -i -E 'rocm'
grep . -r {rocm_path}/.info/* | **Analyzer Args:**
- `exp_rocm`: Union[str, list] — Expected ROCm version string(s) to match (e.g. from rocminfo).
- `exp_rocm_latest`: str — Expected 'latest' ROCm path or version string for versioned installs.
- `exp_rocm_sub_versions`: dict[str, Union[str, list]] — Map sub-version name (e.g. version_rocm) to expected string or list of allowed strings. | **Collection Args:**
- `rocm_path`: str — Base path to ROCm installation (e.g. /opt/rocm). Used for rocminfo, clinfo, and version discovery. | [RocmDataModel](#RocmDataModel-Model) | [RocmCollector](#Collector-Class-RocmCollector) | [RocmAnalyzer](#Data-Analyzer-Class-RocmAnalyzer) |
| StoragePlugin | sh -c 'df -lH -B1 | grep -v 'boot''
wmic LogicalDisk Where DriveType="3" Get DeviceId,Size,FreeSpace | - | **Collection Args:**
- `skip_sudo`: bool — If True, do not use sudo when running df and related storage commands. | [StorageDataModel](#StorageDataModel-Model) | [StorageCollector](#Collector-Class-StorageCollector) | [StorageAnalyzer](#Data-Analyzer-Class-StorageAnalyzer) |
@@ -727,7 +727,7 @@ PcieDataModel
### Description
-Collect Process details
+Collect aggregate CPU usage and top processes from Linux ``/proc`` (two samples of ``/proc/stat`` and ``/proc/<pid>/stat``; no ``top`` or ROCm SMI).
**Bases**: ['InBandDataCollector']
@@ -736,9 +736,9 @@ Collect Process details
### Class Variables
- **SUPPORTED_OS_FAMILY**: `{}`
-- **CMD_KFD**: `rocm-smi --showpids`
-- **CMD_CPU_USAGE**: `top -b -n 1`
-- **CMD_PROCESS**: `top -b -n 1 -o %CPU `
+- **CMD_PROC_STAT**: read aggregate CPU counters from ``/proc/stat``
+- **CMD_PROC_PID_STAT_DUMP**: shell loop dumping ``/proc/<pid>/stat`` with ``__SAMPLER__`` marker
+- **CMD_PROC_COMM_BATCH**: batched ``comm`` reads; format with ``{pids}`` (space-separated PID list)
### Provides Data
@@ -746,9 +746,9 @@ ProcessDataModel
### Commands
-- top -b -n 1
-- rocm-smi --showpids
-- top -b -n 1 -o %CPU
+- ``CMD_PROC_STAT`` (``cat /proc/stat``)
+- ``CMD_PROC_PID_STAT_DUMP``
+- ``CMD_PROC_COMM_BATCH.format(pids=...)``
## Collector Class RdmaCollector
@@ -1250,9 +1250,8 @@ class for collection of PCIe data.
### Model annotations and fields
-- **kfd_process**: `Optional[int]`
-- **cpu_usage**: `Optional[float]`
-- **processes**: `Optional[list[tuple[str, str]]]`
+- **cpu_usage**: `Optional[float]` — Aggregate non-idle CPU percent over the sample window.
+- **processes**: `Optional[list[tuple[str, str]]]` — Up to ``top_n_process`` rows: ``(comm, cpu_share_percent_str)``.
## RdmaDataModel Model
@@ -1650,7 +1649,7 @@ Check PCIe Data for errors
### Description
-Check cpu and kfd processes are within allowed maximum cpu and gpu usage
+Check aggregate ``cpu_usage`` against ``max_cpu_usage`` (see [ProcessDataModel](#ProcessDataModel-Model)).
**Bases**: ['DataAnalyzer']
@@ -2004,8 +2003,7 @@ Arguments for PCIe analyzer
### Annotations / fields
-- **max_kfd_processes**: `int` — Maximum allowed number of KFD (Kernel Fusion Driver) processes; 0 disables the check.
-- **max_cpu_usage**: `float` — Maximum allowed CPU usage (percent) for process checks.
+- **max_cpu_usage**: `float` — Maximum allowed aggregate CPU usage (percent) for process checks.
## Analyzer Args Class RocmAnalyzerArgs
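The doc rows above describe the new sampling scheme in terms of two ``/proc/stat`` reads. As a standalone sketch of that arithmetic (helper name ``non_idle_percent`` is mine, not from the patch): non-idle percent is ``100 * (1 - delta_idle / delta_total)`` over the sample window.

```python
# Hypothetical standalone sketch of the /proc/stat delta arithmetic the
# reworked ProcessPlugin relies on; not code from the patch itself.
def non_idle_percent(cpu_line1: str, cpu_line2: str) -> float:
    def totals(line: str) -> tuple:
        nums = [int(x) for x in line.split()[1:]]
        return sum(nums), nums[3] + nums[4]  # total jiffies, idle + iowait

    t1, i1 = totals(cpu_line1)
    t2, i2 = totals(cpu_line2)
    dt = t2 - t1
    if dt <= 0:
        return 0.0  # no elapsed jiffies between samples
    return round(max(0.0, min(100.0, 100.0 * (1.0 - (i2 - i1) / dt))), 6)

print(non_idle_percent(
    "cpu 100 0 0 900 0 0 0 0 0 0",
    "cpu 200 0 0 1800 0 0 0 0 0 0",
))  # 10.0
```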
diff --git a/nodescraper/plugins/inband/process/analyzer_args.py b/nodescraper/plugins/inband/process/analyzer_args.py
index 39375ed..4a88506 100644
--- a/nodescraper/plugins/inband/process/analyzer_args.py
+++ b/nodescraper/plugins/inband/process/analyzer_args.py
@@ -31,23 +31,14 @@
class ProcessAnalyzerArgs(AnalyzerArgs):
- max_kfd_processes: int = Field(
- default=0,
- description="Maximum allowed number of KFD (Kernel Fusion Driver) processes; 0 disables the check.",
- )
max_cpu_usage: float = Field(
default=20.0,
- description="Maximum allowed CPU usage (percent) for process checks.",
+ description="Maximum allowed aggregate CPU usage (percent) for process checks.",
)
@classmethod
def build_from_model(cls, datamodel: ProcessDataModel) -> "ProcessAnalyzerArgs":
- """build analyzer args from data model
-
- Args:
- datamodel (ProcessDataModel): data model for plugin
-
- Returns:
- ProcessAnalyzerArgs: instance of analyzer args class
- """
- return cls(max_kfd_processes=datamodel.kfd_process, max_cpu_usage=datamodel.cpu_usage)
+ """Build analyzer args from collected process data (threshold defaults if cpu_usage unset)."""
+ if datamodel.cpu_usage is not None:
+ return cls(max_cpu_usage=float(datamodel.cpu_usage))
+ return cls()
diff --git a/nodescraper/plugins/inband/process/collector_args.py b/nodescraper/plugins/inband/process/collector_args.py
index 60926f8..4be88fd 100644
--- a/nodescraper/plugins/inband/process/collector_args.py
+++ b/nodescraper/plugins/inband/process/collector_args.py
@@ -31,5 +31,9 @@
class ProcessCollectorArgs(CollectorArgs):
top_n_process: int = Field(
default=10,
- description="Number of top processes by CPU usage to collect (e.g. for top -b -n 1 -o %CPU).",
+ description="Max process rows to return, ranked by CPU share over the sample window (from /proc).",
+ )
+ sample_interval_seconds: float = Field(
+ default=1.0,
+ description="Wall time between two /proc samples for CPU utilization (must be > 0; invalid values use 1.0).",
)
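The ``sample_interval_seconds`` description says invalid values fall back to 1.0; that clamping happens in the collector, not in the ``Field``. A minimal sketch of the intended behavior (function name is mine):

```python
# Sketch of the interval clamping described above: the Field accepts any
# float, and non-positive values are replaced with the 1.0 default at use.
def effective_interval(sample_interval_seconds: float) -> float:
    return sample_interval_seconds if sample_interval_seconds > 0 else 1.0

print(effective_interval(0.25))  # 0.25
print(effective_interval(-5.0))  # 1.0
```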
diff --git a/nodescraper/plugins/inband/process/process_analyzer.py b/nodescraper/plugins/inband/process/process_analyzer.py
index 8700f1c..7bfdfd3 100644
--- a/nodescraper/plugins/inband/process/process_analyzer.py
+++ b/nodescraper/plugins/inband/process/process_analyzer.py
@@ -34,7 +34,7 @@
class ProcessAnalyzer(DataAnalyzer[ProcessDataModel, ProcessAnalyzerArgs]):
- """Check cpu and kfd processes are within allowed maximum cpu and gpu usage"""
+ """Check aggregate ``cpu_usage`` against ``max_cpu_usage``."""
DATA_MODEL = ProcessDataModel
@@ -42,35 +42,19 @@ def analyze_data(
self, data: ProcessDataModel, args: Optional[ProcessAnalyzerArgs] = None
) -> TaskResult:
"""
- Analyze the process data to check if the number of KFD processes and CPU usage
- are within the allowed limits.
+ Analyze process data: compare aggregate CPU usage to the configured limit.
Args:
data (ProcessDataModel): The process data to analyze.
- args (Optional[ProcessAnalyzerArgs], optional): The process analysis arguments. Defaults to None.
+ args (Optional[ProcessAnalyzerArgs], optional): Analysis arguments. Defaults to None.
Returns:
- TaskResult: The result of the analysis, containing any events logged during the process.
+ TaskResult: The result of the analysis, including any logged events.
"""
if not args:
args = ProcessAnalyzerArgs()
- has_errors = False
- if data.kfd_process is not None and data.kfd_process > args.max_kfd_processes:
- has_errors = True
- self._log_event(
- category=EventCategory.OS,
- description=f"Kfd processes {data.kfd_process} exeed max limit {args.max_kfd_processes}",
- data={
- "kfd_process": data.kfd_process,
- "kfd_process_limit": args.max_kfd_processes,
- },
- priority=EventPriority.CRITICAL,
- console_log=True,
- )
-
if data.cpu_usage is not None and data.cpu_usage > args.max_cpu_usage:
- has_errors = True
self._log_event(
category=EventCategory.OS,
description=f"CPU usage {data.cpu_usage} exceeds limit {args.max_cpu_usage}",
@@ -81,8 +65,6 @@ def analyze_data(
priority=EventPriority.CRITICAL,
console_log=True,
)
-
- if has_errors:
self.result.status = ExecutionStatus.ERROR
self.result.message = "Process limits exceeded"
diff --git a/nodescraper/plugins/inband/process/process_collector.py b/nodescraper/plugins/inband/process/process_collector.py
index 9c92080..b613d2e 100644
--- a/nodescraper/plugins/inband/process/process_collector.py
+++ b/nodescraper/plugins/inband/process/process_collector.py
@@ -23,8 +23,8 @@
# SOFTWARE.
#
###############################################################################
-import re
-from typing import Optional
+import time
+from typing import Dict, List, Optional, Set, Tuple
from nodescraper.base import InBandDataCollector
from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus, OSFamily
@@ -34,82 +34,261 @@
from .processdata import ProcessDataModel
+def _parse_aggregate_cpu_from_proc_stat(proc_stat: str) -> Optional[Tuple[int, int]]:
+ """Aggregate ``cpu`` line from ``/proc/stat``: ``(total jiffies, idle+iowait)`` or ``None``."""
+ for line in proc_stat.splitlines():
+ if not line.startswith("cpu "):
+ continue
+ parts = line.split()
+ if len(parts) < 6:
+ return None
+ try:
+ nums = [int(parts[i]) for i in range(1, len(parts))]
+ except ValueError:
+ return None
+        idle = nums[3] + nums[4]  # idle + iowait jiffies
+ total = sum(nums)
+ return total, idle
+ return None
+
+
+def _global_non_idle_percent(total1: int, idle1: int, total2: int, idle2: int) -> float:
+ """Non-idle % over the interval between two aggregate cpu samples."""
+ dt = total2 - total1
+ if dt <= 0:
+ return 0.0
+ di = idle2 - idle1
+ pct = 100.0 * (1.0 - di / dt)
+ pct = max(0.0, min(100.0, pct))
+ return round(pct, 6)
+
+
+def _parse_proc_pid_stat(stat_line: str) -> Optional[Tuple[int, int]]:
+ """Parse ``/proc//stat`` body (without pid prefix). Return (pid, utime+stime)."""
+ stat_line = stat_line.strip()
+ if not stat_line:
+ return None
+ try:
+        paren_end = stat_line.rindex(") ")  # rindex: the comm field may itself contain ") "
+ except ValueError:
+ return None
+ head = stat_line[:paren_end]
+ pid_str = head.split(maxsplit=1)[0]
+ pid = int(pid_str)
+ rest = stat_line[paren_end + 2 :].split()
+ if len(rest) < 13:
+ return None
+ utime = int(rest[11])
+ stime = int(rest[12])
+ return pid, utime + stime
+
+
+def _parse_proc_stat_dump(dump_stdout: str) -> Tuple[Dict[int, int], Set[int]]:
+ """Parse bulk ``pid|statline`` lines; return (pid -> jiffies, sampler_pids to exclude).
+
+ The shell dump prefixes one line ``__SAMPLER__:`` (see ``ProcessCollector.CMD_PROC_PID_STAT_DUMP``) so we
+    can drop that subshell from the rankings; it would otherwise show up as active CPU.
+ """
+ jiffies_by_pid: Dict[int, int] = {}
+ exclude: Set[int] = set()
+ for line in dump_stdout.splitlines():
+ stripped = line.strip()
+ if stripped.startswith("__SAMPLER__:"):
+ try:
+ exclude.add(int(stripped.split(":", 1)[1]))
+ except ValueError:
+ pass
+ continue
+ if "|" not in line:
+ continue
+ pid_str, stat_body = line.split("|", 1)
+ try:
+ prefix_pid = int(pid_str)
+ except ValueError:
+ continue
+ parsed = _parse_proc_pid_stat(stat_body)
+ if parsed is None:
+ continue
+ stat_pid, jf = parsed
+ if stat_pid != prefix_pid:
+ continue
+ jiffies_by_pid[prefix_pid] = jf
+ return jiffies_by_pid, exclude
+
+
+def _parse_comm_dump(dump_stdout: str) -> Dict[int, str]:
+ """Parse ``pid:comm`` lines from ``cat /proc//comm`` batch."""
+ out: Dict[int, str] = {}
+ for line in dump_stdout.splitlines():
+ if ":" not in line:
+ continue
+ pid_str, comm = line.split(":", 1)
+ try:
+ pid = int(pid_str)
+ except ValueError:
+ continue
+ out[pid] = comm.strip()
+ return out
+
+
+def _top_process_cpu_shares(
+ sample1: Dict[int, int],
+ sample2: Dict[int, int],
+ total_delta: int,
+ top_n: int,
+ exclude_pids: Set[int],
+) -> List[Tuple[int, float]]:
+ """Return up to ``top_n`` (pid, cpu_share_pct) sorted by jiffies delta descending."""
+ if top_n <= 0:
+ return []
+ all_pids = set(sample1) | set(sample2)
+ rows: List[Tuple[int, float, int]] = []
+ for pid in all_pids:
+ if pid in exclude_pids:
+ continue
+ j1 = sample1.get(pid, 0)
+ j2 = sample2.get(pid, 0)
+ delta = max(0, j2 - j1)
+ if total_delta > 0:
+ pct = 100.0 * delta / total_delta
+ else:
+ pct = 0.0
+ rows.append((pid, pct, delta))
+ rows.sort(key=lambda x: (x[2], x[1]), reverse=True)
+ return [(pid, pct) for pid, pct, _ in rows[:top_n]]
+
+
class ProcessCollector(InBandDataCollector[ProcessDataModel, ProcessCollectorArgs]):
- """Collect Process details"""
+ """Linux process list and aggregate CPU usage from ``/proc`` (two samples; no ``top`` or ROCm SMI)."""
- SUPPORTED_OS_FAMILY: set[OSFamily] = {OSFamily.LINUX}
+ SUPPORTED_OS_FAMILY: Set[OSFamily] = {OSFamily.LINUX}
DATA_MODEL = ProcessDataModel
- CMD_KFD = "rocm-smi --showpids"
- CMD_CPU_USAGE = "top -b -n 1"
- CMD_PROCESS = "top -b -n 1 -o %CPU "
+
+ CMD_PROC_STAT = "cat /proc/stat"
+ # Lead with ``__SAMPLER__:`` so we exclude that subshell from top-N rankings.
+ CMD_PROC_PID_STAT_DUMP = (
+ 'printf "__SAMPLER__:%s\\n" "$$"; '
+ "for f in /proc/[0-9]*/stat; do "
+ '[ -r "$f" ] || continue; '
+ 'pid="${f#/proc/}"; pid="${pid%/stat}"; '
+ '[ "$pid" = "$$" ] && continue; '
+ 'printf "%s|" "$pid"; cat "$f" 2>/dev/null || true; printf "\\n"; '
+ "done"
+ )
+ CMD_PROC_COMM_BATCH = (
+ "for p in {pids}; do "
+ 'printf "%s:" "$p"; '
+ "cat /proc/$p/comm 2>/dev/null || true; "
+ 'printf "\\n"; '
+ "done"
+ )
+
+ def _collect_procfs_cpu(
+ self,
+ top_n_process: int,
+ sample_interval_seconds: float,
+ ) -> Tuple[Optional[float], List[Tuple[str, str]]]:
+ """Return (cpu_usage, processes) from procfs; (None, []) on failure."""
+ stat1 = self._run_sut_cmd(self.CMD_PROC_STAT)
+ if stat1.exit_code != 0:
+ return None, []
+ dump1 = self._run_sut_cmd(self.CMD_PROC_PID_STAT_DUMP)
+ if dump1.exit_code != 0:
+ return None, []
+
+ time.sleep(sample_interval_seconds)
+
+ stat2 = self._run_sut_cmd(self.CMD_PROC_STAT)
+ if stat2.exit_code != 0:
+ return None, []
+ dump2 = self._run_sut_cmd(self.CMD_PROC_PID_STAT_DUMP)
+ if dump2.exit_code != 0:
+ return None, []
+
+ agg1 = _parse_aggregate_cpu_from_proc_stat(stat1.stdout)
+ agg2 = _parse_aggregate_cpu_from_proc_stat(stat2.stdout)
+ if agg1 is None:
+ self._log_event(
+ category=EventCategory.OS,
+ description="Could not parse aggregate cpu line from /proc/stat (first sample)",
+ data={"proc_stat_preview": stat1.stdout[:200]},
+ priority=EventPriority.ERROR,
+ )
+ return None, []
+ if agg2 is None:
+ self._log_event(
+ category=EventCategory.OS,
+ description="Could not parse aggregate cpu line from /proc/stat (second sample)",
+ data={"proc_stat_preview": stat2.stdout[:200]},
+ priority=EventPriority.ERROR,
+ )
+ return None, []
+
+ total1, idle1 = agg1
+ total2, idle2 = agg2
+ cpu_usage = _global_non_idle_percent(total1, idle1, total2, idle2)
+ dt = total2 - total1
+ t_delta = dt if dt > 0 else 0
+
+ sample1, excl1 = _parse_proc_stat_dump(dump1.stdout)
+ sample2, excl2 = _parse_proc_stat_dump(dump2.stdout)
+ exclude = excl1 | excl2
+
+ top_list = _top_process_cpu_shares(sample1, sample2, t_delta, top_n_process, exclude)
+ pids = [pid for pid, _ in top_list]
+ if not pids:
+ processes: List[Tuple[str, str]] = []
+ else:
+ pid_str = " ".join(str(p) for p in pids)
+ comm_res = self._run_sut_cmd(self.CMD_PROC_COMM_BATCH.format(pids=pid_str))
+ if comm_res.exit_code != 0:
+ comm_map: Dict[int, str] = {}
+ else:
+ comm_map = _parse_comm_dump(comm_res.stdout)
+ processes = []
+ for pid, pct in top_list:
+ name = comm_map.get(pid, f"pid_{pid}")
+ processes.append((name, f"{pct:.1f}"))
+
+ return cpu_usage, processes
def collect_data(
self, args: Optional[ProcessCollectorArgs] = None
- ) -> tuple[TaskResult, Optional[ProcessDataModel]]:
- """Collect process data from the system.
+ ) -> Tuple[TaskResult, Optional[ProcessDataModel]]:
+ """Read aggregate CPU usage and top processes from Linux ``/proc``.
Args:
- args (Optional[ProcessCollectorArgs], optional): process collection arguments. Defaults to None.
+ args (Optional[ProcessCollectorArgs], optional): ``top_n_process`` and
+ ``sample_interval_seconds``. Defaults to ``ProcessCollectorArgs()``.
Returns:
- tuple[TaskResult, Optional[ProcessDataModel]]: tuple containing the task result and the collected process data model or None if no data was collected.
+ tuple containing the task result and ``ProcessDataModel`` or None if collection failed.
"""
if args is None:
args = ProcessCollectorArgs()
+ sample_interval_seconds = args.sample_interval_seconds
+ if sample_interval_seconds <= 0:
+ sample_interval_seconds = 1.0
+
process_data = ProcessDataModel()
- process_data.processes = []
- kfd_process = self._run_sut_cmd(self.CMD_KFD)
- if kfd_process.exit_code == 0:
- if "No KFD PIDs currently running" in kfd_process.stdout:
- process_data.kfd_process = 0
- else:
- kfd_process = re.findall(
- r"^\s*\d+\s+[\w]+\s+\d+\s+\d+\s+\d+\s+\d+",
- kfd_process.stdout,
- re.MULTILINE,
- )
- process_data.kfd_process = len(kfd_process)
-
- cpu_usage = self._run_sut_cmd(self.CMD_CPU_USAGE)
- if cpu_usage.exit_code == 0:
- cpu_idle = (
- [line for line in cpu_usage.stdout.splitlines() if "Cpu(s)" in line][0]
- .split(",")[3]
- .split()[0]
- .replace("%id", "")
- )
- process_data.cpu_usage = 100 - float(cpu_idle)
-
- processes = self._run_sut_cmd(
- f"self.CMD_PROCESS | sed -n '8,{args.top_n_process + 7}p'"
- ) # Remove system header
- if processes.exit_code == 0:
- for line in processes.stdout.splitlines():
- columns = line.split()
- process_cpu_usage = columns[8]
- process_name = columns[11]
- process_data.processes.append((process_name, process_cpu_usage))
-
- process_check = bool(process_data.model_fields_set)
- if process_check:
- self._log_event(
- category="PROCESS_READ",
- description="Process data collected",
- data=process_data.model_dump(),
- priority=EventPriority.INFO,
- )
+ cpu_usage, processes = self._collect_procfs_cpu(args.top_n_process, sample_interval_seconds)
+ if cpu_usage is not None:
+ process_data.cpu_usage = cpu_usage
+ process_data.processes = processes
+
+ if process_data.cpu_usage is not None:
self.result.message = "Process data collected"
self.result.status = ExecutionStatus.OK
return self.result, process_data
- else:
- self._log_event(
- category=EventCategory.OS,
- description="Process data not found",
- priority=EventPriority.ERROR,
- )
- self.result.message = "Process data not found"
- self.result.status = ExecutionStatus.ERROR
- return self.result, None
+
+ self._log_event(
+ category=EventCategory.OS,
+ description="Process data not found",
+ priority=EventPriority.ERROR,
+ )
+ self.result.message = "Process data not found"
+ self.result.status = ExecutionStatus.EXECUTION_FAILURE
+ return self.result, None
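The collector's per-PID parsing leans on the fixed ``/proc/<pid>/stat`` layout: after the parenthesized ``comm`` field, ``utime`` and ``stime`` are fields 14 and 15, i.e. indices 11 and 12 of the remainder. A hypothetical mini-parser (names are mine, not the patch's) isolating just that step:

```python
# Mini-parser sketch of the /proc/<pid>/stat layout the collector depends
# on; not patch code. rindex is used because comm may itself contain ") ".
def pid_and_jiffies(stat_line: str):
    paren_end = stat_line.rindex(") ")
    pid = int(stat_line[:paren_end].split(maxsplit=1)[0])
    rest = stat_line[paren_end + 2:].split()
    return pid, int(rest[11]) + int(rest[12])  # utime + stime jiffies

line = "1000 (ksoftirqd/0) S 0 0 0 0 -1 69238848 0 0 0 0 5000 6000"
print(pid_and_jiffies(line))  # (1000, 11000)
```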
diff --git a/nodescraper/plugins/inband/process/processdata.py b/nodescraper/plugins/inband/process/processdata.py
index 683ad94..aea551c 100644
--- a/nodescraper/plugins/inband/process/processdata.py
+++ b/nodescraper/plugins/inband/process/processdata.py
@@ -23,12 +23,13 @@
# SOFTWARE.
#
###############################################################################
-from typing import Optional
+from typing import List, Optional, Tuple
from nodescraper.models import DataModel
class ProcessDataModel(DataModel):
- kfd_process: Optional[int] = None
+ """Aggregate CPU usage and top processes from ``/proc`` sampling."""
+
cpu_usage: Optional[float] = None
- processes: Optional[list[tuple[str, str]]] = None
+ processes: Optional[List[Tuple[str, str]]] = None
diff --git a/test/functional/fixtures/process_plugin_config.json b/test/functional/fixtures/process_plugin_config.json
index 752da78..ed7009b 100644
--- a/test/functional/fixtures/process_plugin_config.json
+++ b/test/functional/fixtures/process_plugin_config.json
@@ -3,7 +3,6 @@
"plugins": {
"ProcessPlugin": {
"analysis_args": {
- "max_kfd_processes": 0,
"max_cpu_usage": 20.0
}
}
diff --git a/test/unit/plugin/test_analyzer_args_build_from_model.py b/test/unit/plugin/test_analyzer_args_build_from_model.py
index e6eb748..1589fb4 100644
--- a/test/unit/plugin/test_analyzer_args_build_from_model.py
+++ b/test/unit/plugin/test_analyzer_args_build_from_model.py
@@ -185,13 +185,15 @@ def test_sysctl_analyzer_args_build_from_model():
def test_process_analyzer_args_build_from_model():
"""Test ProcessAnalyzerArgs.build_from_model includes all fields"""
- datamodel = ProcessDataModel(kfd_process=5, cpu_usage=15.5)
+ datamodel = ProcessDataModel(cpu_usage=15.5)
args = ProcessAnalyzerArgs.build_from_model(datamodel)
assert isinstance(args, ProcessAnalyzerArgs)
- assert args.max_kfd_processes == 5
assert args.max_cpu_usage == 15.5
+ args_default = ProcessAnalyzerArgs.build_from_model(ProcessDataModel())
+ assert args_default.max_cpu_usage == 20.0
+
def test_kernel_module_analyzer_args_build_from_model():
"""Test KernelModuleAnalyzerArgs.build_from_model includes all fields"""
diff --git a/test/unit/plugin/test_process_analyzer.py b/test/unit/plugin/test_process_analyzer.py
index e7bbb86..46221c0 100644
--- a/test/unit/plugin/test_process_analyzer.py
+++ b/test/unit/plugin/test_process_analyzer.py
@@ -38,21 +38,17 @@
@pytest.fixture
def model_obj():
return ProcessDataModel(
- kfd_process=0,
- cpu_usage=10,
+ cpu_usage=10.0,
processes=[
- ("top", "10.0"),
+ ("ksoftirqd/0", "10.0"),
("systemd", "0.0"),
- ("kthreadd", "0.0"),
- ("rcu_gp", "0.0"),
- ("rcu_par_gp", "0.0"),
],
)
@pytest.fixture
def config():
- return {"max_kfd_processes": 0, "max_cpu_usage": 40}
+ return {"max_cpu_usage": 40.0}
@pytest.fixture
@@ -61,9 +57,7 @@ def analyzer(system_info):
def test_nominal_with_config(analyzer, model_obj, config):
- args = ProcessAnalyzerArgs(
- max_kfd_processes=config["max_kfd_processes"], max_cpu_usage=config["max_cpu_usage"]
- )
+ args = ProcessAnalyzerArgs(max_cpu_usage=config["max_cpu_usage"])
result = analyzer.analyze_data(model_obj, args)
assert result.status == ExecutionStatus.OK
assert len(result.events) == 0
@@ -75,23 +69,9 @@ def test_nominal_no_config(analyzer, model_obj):
assert len(result.events) == 0
-def test_error_kfd_process(analyzer, model_obj, config):
- modified_model_obj = copy.deepcopy(model_obj)
- modified_model_obj.kfd_process = 1
- args = ProcessAnalyzerArgs(
- max_kfd_processes=config["max_kfd_processes"], max_cpu_usage=config["max_cpu_usage"]
- )
- result = analyzer.analyze_data(modified_model_obj, args)
-
- assert result.status == ExecutionStatus.ERROR
- for event in result.events:
- assert event.category == EventCategory.OS.value
- assert event.priority == EventPriority.CRITICAL
-
-
def test_error_cpu_usage(analyzer, model_obj, config):
modified_model_obj = copy.deepcopy(model_obj)
- args = ProcessAnalyzerArgs(max_kfd_processes=config["max_kfd_processes"], max_cpu_usage=5)
+ args = ProcessAnalyzerArgs(max_cpu_usage=5.0)
result = analyzer.analyze_data(modified_model_obj, args)
assert result.status == ExecutionStatus.ERROR
diff --git a/test/unit/plugin/test_process_collector.py b/test/unit/plugin/test_process_collector.py
index cae8d6f..7764078 100644
--- a/test/unit/plugin/test_process_collector.py
+++ b/test/unit/plugin/test_process_collector.py
@@ -23,17 +23,49 @@
# SOFTWARE.
#
###############################################################################
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, patch
import pytest
+from nodescraper.enums.eventcategory import EventCategory
from nodescraper.enums.executionstatus import ExecutionStatus
from nodescraper.enums.systeminteraction import SystemInteractionLevel
from nodescraper.interfaces.task import SystemCompatibilityError
from nodescraper.models.systeminfo import OSFamily
-from nodescraper.plugins.inband.process.process_collector import ProcessCollector
+from nodescraper.plugins.inband.process.process_collector import (
+ ProcessCollector,
+ _global_non_idle_percent,
+ _parse_aggregate_cpu_from_proc_stat,
+ _parse_comm_dump,
+ _parse_proc_pid_stat,
+ _parse_proc_stat_dump,
+ _top_process_cpu_shares,
+)
from nodescraper.plugins.inband.process.processdata import ProcessDataModel
+PROC_STAT_1 = (
+ "cpu 100 0 0 900 0 0 0 0 0 0\n"
+ "cpu0 55 0 0 495 0 0 0 0 0 0\n"
+ "cpu1 45 0 0 405 0 0 0 0 0 0\n"
+)
+PROC_STAT_2 = (
+ "cpu 200 0 0 1800 0 0 0 0 0 0\n"
+ "cpu0 110 0 0 990 0 0 0 0 0 0\n"
+ "cpu1 90 0 0 810 0 0 0 0 0 0\n"
+)
+
+DUMP_1 = (
+ "__SAMPLER__:99999\n"
+ "1000|1000 (ksoftirqd/0) S 0 0 0 0 -1 69238848 0 0 0 0 5000 6000\n"
+ "1|1 (systemd) S 0 1 1 0 -1 4194560 100 200 300 400 5000 6000\n"
+)
+
+DUMP_2 = (
+ "__SAMPLER__:99999\n"
+ "1000|1000 (ksoftirqd/0) S 0 0 0 0 -1 69238848 0 0 0 0 5100 6000\n"
+ "1|1 (systemd) S 0 1 1 0 -1 4194560 100 200 300 400 5000 6000\n"
+)
+
@pytest.fixture
def collector(system_info, conn_mock):
@@ -44,34 +76,43 @@ def collector(system_info, conn_mock):
)
-def test_run_linux(collector, conn_mock):
+def _command_runner():
+ proc_stat_calls = 0
+ dump_calls = 0
+
+ def run_command(command, sudo=False, timeout=300, strip=True):
+ nonlocal proc_stat_calls, dump_calls
+ if command == "cat /proc/stat":
+ proc_stat_calls += 1
+ body = PROC_STAT_1 if proc_stat_calls == 1 else PROC_STAT_2
+ return MagicMock(exit_code=0, stdout=body, stderr="", command=command)
+ if "for f in /proc/" in command and "__SAMPLER__" in command:
+ dump_calls += 1
+ body = DUMP_1 if dump_calls == 1 else DUMP_2
+ return MagicMock(exit_code=0, stdout=body, stderr="", command=command)
+ if "cat /proc/$p/comm" in command:
+ return MagicMock(
+ exit_code=0,
+ stdout="1000:ksoftirqd/0\n1:systemd\n",
+ stderr="",
+ command=command,
+ )
+ raise AssertionError(f"unexpected command: {command!r}")
+
+ return run_command
+
+
+@patch("nodescraper.plugins.inband.process.process_collector.time.sleep")
+def test_run_linux_procfs(mock_sleep, collector, conn_mock):
collector.system_info.os_family = OSFamily.LINUX
- conn_mock.run_command.side_effect = [
- MagicMock(
- exit_code=0,
- stdout="PID PROCESS NAME GPU(s) VRAM USED SDMA USED CU OCCUPANCY\n8246 TransferBench 8 2267283456 0 0",
- stderr="",
- ),
- MagicMock(
- exit_code=0,
- stdout="%Cpu(s): 0.1 us, 0.1 sy, 0.0 ni, 90.0 id",
- stderr="",
- ),
- MagicMock(
- exit_code=0,
- stdout="356817 user 20 0 32112 14196 10556 R 10.0 0.0 0:00.07 top\n"
- "1 root 20 0 166596 11916 8316 S 0.0 0.0 1:32.14 systemd",
- stderr="",
- ),
- ]
+ conn_mock.run_command.side_effect = _command_runner()
result, data = collector.collect_data()
assert result.status == ExecutionStatus.OK
assert data == ProcessDataModel(
- kfd_process=1,
- cpu_usage=10,
+ cpu_usage=10.0,
processes=[
- ("top", "10.0"),
+ ("ksoftirqd/0", "10.0"),
("systemd", "0.0"),
],
)
@@ -87,14 +128,77 @@ def test_unsupported_platform(system_info, conn_mock):
)
-def test_exit_failure(collector, conn_mock):
+@patch("nodescraper.plugins.inband.process.process_collector.time.sleep")
+def test_exit_failure_procfs(mock_sleep, collector, conn_mock):
collector.system_info.os_family = OSFamily.LINUX
conn_mock.run_command.side_effect = [
- MagicMock(exit_code=1, stdout="", stderr=""),
- MagicMock(exit_code=0, stdout="", stderr=""),
- MagicMock(exit_code=0, stdout="", stderr=""),
+ MagicMock(exit_code=1, stdout="", stderr="no proc", command="cat /proc/stat"),
]
result, data = collector.collect_data()
assert result.status == ExecutionStatus.EXECUTION_FAILURE
assert data is None
+
+
+@patch("nodescraper.plugins.inband.process.process_collector.time.sleep")
+def test_invalid_proc_stat_logs_os_event(mock_sleep, collector, conn_mock):
+ collector.system_info.os_family = OSFamily.LINUX
+ conn_mock.run_command.side_effect = [
+ MagicMock(
+ exit_code=0, stdout="not a valid proc_stat\n", stderr="", command="cat /proc/stat"
+ ),
+ MagicMock(exit_code=0, stdout="__SAMPLER__:1\n", stderr="", command="stat-dump-1"),
+ MagicMock(exit_code=0, stdout="still bad\n", stderr="", command="cat /proc/stat"),
+ MagicMock(exit_code=0, stdout="__SAMPLER__:1\n", stderr="", command="stat-dump-2"),
+ ]
+ with patch.object(collector, "_log_event") as mock_log:
+ result, data = collector.collect_data()
+
+ assert result.status == ExecutionStatus.EXECUTION_FAILURE
+ assert data is None
+ os_parse_logs = [
+ c
+ for c in mock_log.call_args_list
+ if c.kwargs.get("category") == EventCategory.OS
+ and "aggregate cpu" in (c.kwargs.get("description") or "")
+ ]
+ assert os_parse_logs, "Expected OS event for unparseable /proc/stat aggregate cpu line"
+
+
+class TestProcessCollectorProcfsParsing:
+ def test_parse_aggregate_cpu_from_proc_stat(self):
+ text = "cpu0 1 2 3 4 5 6 7 8\n" "cpu 100 0 0 900 0 0 0 0 0 0\n"
+ assert _parse_aggregate_cpu_from_proc_stat(text) == (1000, 900)
+
+ def test_parse_aggregate_cpu_from_proc_stat_invalid(self):
+ assert _parse_aggregate_cpu_from_proc_stat("cpu 1 2\n") is None
+ assert _parse_aggregate_cpu_from_proc_stat("no aggregate line\n") is None
+
+ def test_global_non_idle_percent(self):
+ t1, i1 = 1000, 900
+ t2, i2 = 2000, 1800
+ assert _global_non_idle_percent(t1, i1, t2, i2) == 10.0
+
+ def test_parse_proc_pid_stat(self):
+ stat_line = DUMP_1.splitlines()[1].split("|", 1)[1]
+ assert _parse_proc_pid_stat(stat_line) == (1000, 11000)
+
+ def test_parse_proc_stat_dump(self):
+ dump = "__SAMPLER__:12345\n" + "\n".join(DUMP_1.splitlines()[1:])
+ jf, excl = _parse_proc_stat_dump(dump)
+ assert excl == {12345}
+ assert jf[1000] == 11000
+ assert jf[1] == 11000
+
+ def test_top_process_cpu_shares(self):
+ s1 = {1: 100, 2: 200}
+ s2 = {1: 100, 2: 400}
+ top = _top_process_cpu_shares(s1, s2, total_delta=1000, top_n=2, exclude_pids=set())
+ assert top[0][0] == 2
+ assert top[0][1] == 20.0
+ assert top[1][1] == 0.0
+
+ def test_parse_comm_dump(self):
+ m = _parse_comm_dump("1000:ksoftirqd/0\n1:systemd\n")
+ assert m[1000] == "ksoftirqd/0"
+ assert m[1] == "systemd"