diff --git a/docs/PLUGIN_DOC.md b/docs/PLUGIN_DOC.md index cbc1425..8c355e0 100644 --- a/docs/PLUGIN_DOC.md +++ b/docs/PLUGIN_DOC.md @@ -22,7 +22,7 @@ | OsPlugin | sh -c '( lsb_release -ds || (cat /etc/*release | grep PRETTY_NAME) || uname -om ) 2>/dev/null | head -n1'
cat /etc/*release | grep VERSION_ID
wmic os get Version /value
wmic os get Caption /Value | **Analyzer Args:**
- `exp_os`: Union[str, list] — Expected OS name/version string(s) to match (e.g. from lsb_release or /etc/os-release).
- `exact_match`: bool — If True, require exact match for exp_os; otherwise substring match. | - | [OsDataModel](#OsDataModel-Model) | [OsCollector](#Collector-Class-OsCollector) | [OsAnalyzer](#Data-Analyzer-Class-OsAnalyzer) | | PackagePlugin | dnf list --installed
dpkg-query -W
pacman -Q
cat /etc/*release
wmic product get name,version | **Analyzer Args:**
- `exp_package_ver`: Dict[str, Optional[str]] — Map package name -> expected version (None = any version). Checked against installed packages.
- `regex_match`: bool — If True, match package versions with regex; otherwise exact or prefix match.
- `rocm_regex`: Optional[str] — Optional regex to identify ROCm package version (used when enable_rocm_regex is True).
- `enable_rocm_regex`: bool — If True, use rocm_regex (or default pattern) to extract ROCm version for checks. | - | [PackageDataModel](#PackageDataModel-Model) | [PackageCollector](#Collector-Class-PackageCollector) | [PackageAnalyzer](#Data-Analyzer-Class-PackageAnalyzer) | | PciePlugin | lspci -d {vendor_id}: -nn
lspci -x
lspci -xxxx
lspci -PP
lspci -PP -d {vendor_id}:{dev_id}
lspci -vvv
lspci -vvvt | **Analyzer Args:**
- `exp_speed`: int — Expected PCIe link speed (generation 1–5).
- `exp_width`: int — Expected PCIe link width in lanes (1–16).
- `exp_sriov_count`: int — Expected SR-IOV virtual function count.
- `exp_gpu_count_override`: Optional[int] — Override expected GPU count for validation.
- `exp_max_payload_size`: Union[Dict[int, int], int, NoneType] — Expected max payload size: int for all devices, or dict keyed by device ID.
- `exp_max_rd_req_size`: Union[Dict[int, int], int, NoneType] — Expected max read request size: int for all devices, or dict keyed by device ID.
- `exp_ten_bit_tag_req_en`: Union[Dict[int, int], int, NoneType] — Expected 10-bit tag request enable: int for all devices, or dict keyed by device ID. | - | [PcieDataModel](#PcieDataModel-Model) | [PcieCollector](#Collector-Class-PcieCollector) | [PcieAnalyzer](#Data-Analyzer-Class-PcieAnalyzer) | -| ProcessPlugin | top -b -n 1
rocm-smi --showpids
top -b -n 1 -o %CPU | **Analyzer Args:**
- `max_kfd_processes`: int — Maximum allowed number of KFD (Kernel Fusion Driver) processes; 0 disables the check.
- `max_cpu_usage`: float — Maximum allowed CPU usage (percent) for process checks. | **Collection Args:**
- `top_n_process`: int — Number of top processes by CPU usage to collect (e.g. for top -b -n 1 -o %CPU). | [ProcessDataModel](#ProcessDataModel-Model) | [ProcessCollector](#Collector-Class-ProcessCollector) | [ProcessAnalyzer](#Data-Analyzer-Class-ProcessAnalyzer) | +| ProcessPlugin | cat /proc/stat
shell loop over /proc/*/stat (with ``__SAMPLER__`` marker)
batched ``cat /proc/<pid>/comm`` | **Analyzer Args:**
- `max_cpu_usage`: float — Maximum allowed aggregate CPU usage (percent). | **Collection Args:**
- `top_n_process`: int — Max process rows ranked by CPU share over the sample window.
- `sample_interval_seconds`: float — Wall seconds between two /proc samples (default 1.0). | [ProcessDataModel](#ProcessDataModel-Model) | [ProcessCollector](#Collector-Class-ProcessCollector) | [ProcessAnalyzer](#Data-Analyzer-Class-ProcessAnalyzer) | | RdmaPlugin | rdma link -j
rdma dev
rdma link
rdma statistic -j | - | - | [RdmaDataModel](#RdmaDataModel-Model) | [RdmaCollector](#Collector-Class-RdmaCollector) | [RdmaAnalyzer](#Data-Analyzer-Class-RdmaAnalyzer) | | RocmPlugin | {rocm_path}/opencl/bin/*/clinfo
env | grep -Ei 'rocm|hsa|hip|mpi|openmp|ucx|miopen'
ls /sys/class/kfd/kfd/proc/
grep -i -E 'rocm' /etc/ld.so.conf.d/*
{rocm_path}/bin/rocminfo
ls -v -d {rocm_path}*
ls -v -d {rocm_path}-[3-7]* | tail -1
ldconfig -p | grep -i -E 'rocm'
grep . -r {rocm_path}/.info/* | **Analyzer Args:**
- `exp_rocm`: Union[str, list] — Expected ROCm version string(s) to match (e.g. from rocminfo).
- `exp_rocm_latest`: str — Expected 'latest' ROCm path or version string for versioned installs.
- `exp_rocm_sub_versions`: dict[str, Union[str, list]] — Map sub-version name (e.g. version_rocm) to expected string or list of allowed strings. | **Collection Args:**
- `rocm_path`: str — Base path to ROCm installation (e.g. /opt/rocm). Used for rocminfo, clinfo, and version discovery. | [RocmDataModel](#RocmDataModel-Model) | [RocmCollector](#Collector-Class-RocmCollector) | [RocmAnalyzer](#Data-Analyzer-Class-RocmAnalyzer) | | StoragePlugin | sh -c 'df -lH -B1 | grep -v 'boot''
wmic LogicalDisk Where DriveType="3" Get DeviceId,Size,FreeSpace | - | **Collection Args:**
- `skip_sudo`: bool — If True, do not use sudo when running df and related storage commands. | [StorageDataModel](#StorageDataModel-Model) | [StorageCollector](#Collector-Class-StorageCollector) | [StorageAnalyzer](#Data-Analyzer-Class-StorageAnalyzer) | @@ -727,7 +727,7 @@ PcieDataModel ### Description -Collect Process details +Collect aggregate CPU usage and top processes from Linux ``/proc`` (two samples of ``/proc/stat`` and ``/proc/<pid>/stat``; no ``top`` or ROCm SMI). **Bases**: ['InBandDataCollector'] @@ -736,9 +736,9 @@ Collect Process details ### Class Variables - **SUPPORTED_OS_FAMILY**: `{}` -- **CMD_KFD**: `rocm-smi --showpids` -- **CMD_CPU_USAGE**: `top -b -n 1` -- **CMD_PROCESS**: `top -b -n 1 -o %CPU ` +- **CMD_PROC_STAT**: read aggregate CPU counters from ``/proc/stat`` +- **CMD_PROC_PID_STAT_DUMP**: shell loop dumping ``/proc/<pid>/stat`` with ``__SAMPLER__`` marker +- **CMD_PROC_COMM_BATCH**: batched ``comm`` reads; format with ``{pids}`` (space-separated PID list) ### Provides Data @@ -746,9 +746,9 @@ ProcessDataModel ### Commands -- top -b -n 1 -- rocm-smi --showpids -- top -b -n 1 -o %CPU +- ``CMD_PROC_STAT`` (`cat /proc/stat`) +- ``CMD_PROC_PID_STAT_DUMP`` +- ``CMD_PROC_COMM_BATCH.format(pids=...)`` ## Collector Class RdmaCollector @@ -1250,9 +1250,8 @@ class for collection of PCIe data. ### Model annotations and fields -- **kfd_process**: `Optional[int]` -- **cpu_usage**: `Optional[float]` -- **processes**: `Optional[list[tuple[str, str]]]` +- **cpu_usage**: `Optional[float]` — Aggregate non-idle CPU percent over the sample window. +- **processes**: `Optional[list[tuple[str, str]]]` — Up to ``top_n_process`` rows: ``(comm, cpu_share_percent_str)``. ## RdmaDataModel Model @@ -1650,7 +1649,7 @@ Check PCIe Data for errors ### Description -Check cpu and kfd processes are within allowed maximum cpu and gpu usage +Check aggregate ``cpu_usage`` against ``max_cpu_usage`` (see [ProcessDataModel](#ProcessDataModel-Model)). 
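The two-sample ``/proc/stat`` computation described in the collector docs above can be sketched as follows. This is an illustrative standalone sketch, not the plugin's actual helpers; the function names here are hypothetical.

```python
# Illustrative sketch of the two-sample /proc/stat CPU computation.
# Function names are hypothetical, not the plugin's API.

def parse_cpu_line(proc_stat: str) -> tuple[int, int]:
    """Return (total_jiffies, idle_jiffies) from the aggregate 'cpu ' line."""
    for line in proc_stat.splitlines():
        if line.startswith("cpu "):
            nums = [int(n) for n in line.split()[1:]]
            # field order: user nice system idle iowait irq softirq ...
            return sum(nums), nums[3] + nums[4]
    raise ValueError("no aggregate cpu line")

def non_idle_percent(sample1: str, sample2: str) -> float:
    """Non-idle CPU percent over the interval between two samples."""
    t1, i1 = parse_cpu_line(sample1)
    t2, i2 = parse_cpu_line(sample2)
    dt = t2 - t1
    if dt <= 0:
        return 0.0
    return round(100.0 * (1.0 - (i2 - i1) / dt), 6)

s1 = "cpu 100 0 0 900 0 0 0 0 0 0\n"
s2 = "cpu 200 0 0 1800 0 0 0 0 0 0\n"
print(non_idle_percent(s1, s2))  # 10.0
```

Sampling twice and differencing the jiffy counters is what lets the collector report utilization over a window rather than a single instant, which a one-shot `top -b -n 1` cannot do reliably.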
**Bases**: ['DataAnalyzer'] @@ -2004,8 +2003,7 @@ Arguments for PCIe analyzer ### Annotations / fields -- **max_kfd_processes**: `int` — Maximum allowed number of KFD (Kernel Fusion Driver) processes; 0 disables the check. -- **max_cpu_usage**: `float` — Maximum allowed CPU usage (percent) for process checks. +- **max_cpu_usage**: `float` — Maximum allowed aggregate CPU usage (percent) for process checks. ## Analyzer Args Class RocmAnalyzerArgs diff --git a/nodescraper/plugins/inband/process/analyzer_args.py b/nodescraper/plugins/inband/process/analyzer_args.py index 39375ed..4a88506 100644 --- a/nodescraper/plugins/inband/process/analyzer_args.py +++ b/nodescraper/plugins/inband/process/analyzer_args.py @@ -31,23 +31,14 @@ class ProcessAnalyzerArgs(AnalyzerArgs): - max_kfd_processes: int = Field( - default=0, - description="Maximum allowed number of KFD (Kernel Fusion Driver) processes; 0 disables the check.", - ) max_cpu_usage: float = Field( default=20.0, - description="Maximum allowed CPU usage (percent) for process checks.", + description="Maximum allowed aggregate CPU usage (percent) for process checks.", ) @classmethod def build_from_model(cls, datamodel: ProcessDataModel) -> "ProcessAnalyzerArgs": - """build analyzer args from data model - - Args: - datamodel (ProcessDataModel): data model for plugin - - Returns: - ProcessAnalyzerArgs: instance of analyzer args class - """ - return cls(max_kfd_processes=datamodel.kfd_process, max_cpu_usage=datamodel.cpu_usage) + """Build analyzer args from collected process data (threshold defaults if cpu_usage unset).""" + if datamodel.cpu_usage is not None: + return cls(max_cpu_usage=float(datamodel.cpu_usage)) + return cls() diff --git a/nodescraper/plugins/inband/process/collector_args.py b/nodescraper/plugins/inband/process/collector_args.py index 60926f8..4be88fd 100644 --- a/nodescraper/plugins/inband/process/collector_args.py +++ b/nodescraper/plugins/inband/process/collector_args.py @@ -31,5 +31,9 @@ class 
ProcessCollectorArgs(CollectorArgs): top_n_process: int = Field( default=10, - description="Number of top processes by CPU usage to collect (e.g. for top -b -n 1 -o %CPU).", + description="Max process rows to return, ranked by CPU share over the sample window (from /proc).", + ) + sample_interval_seconds: float = Field( + default=1.0, + description="Wall time between two /proc samples for CPU utilization (must be > 0; invalid values use 1.0).", ) diff --git a/nodescraper/plugins/inband/process/process_analyzer.py b/nodescraper/plugins/inband/process/process_analyzer.py index 8700f1c..7bfdfd3 100644 --- a/nodescraper/plugins/inband/process/process_analyzer.py +++ b/nodescraper/plugins/inband/process/process_analyzer.py @@ -34,7 +34,7 @@ class ProcessAnalyzer(DataAnalyzer[ProcessDataModel, ProcessAnalyzerArgs]): - """Check cpu and kfd processes are within allowed maximum cpu and gpu usage""" + """Check aggregate ``cpu_usage`` against ``max_cpu_usage``.""" DATA_MODEL = ProcessDataModel @@ -42,35 +42,19 @@ def analyze_data( self, data: ProcessDataModel, args: Optional[ProcessAnalyzerArgs] = None ) -> TaskResult: """ - Analyze the process data to check if the number of KFD processes and CPU usage - are within the allowed limits. + Analyze process data: compare aggregate CPU usage to the configured limit. Args: data (ProcessDataModel): The process data to analyze. - args (Optional[ProcessAnalyzerArgs], optional): The process analysis arguments. Defaults to None. + args (Optional[ProcessAnalyzerArgs], optional): Analysis arguments. Defaults to None. Returns: - TaskResult: The result of the analysis, containing any events logged during the process. + TaskResult: The result of the analysis, including any logged events. 
""" if not args: args = ProcessAnalyzerArgs() - has_errors = False - if data.kfd_process is not None and data.kfd_process > args.max_kfd_processes: - has_errors = True - self._log_event( - category=EventCategory.OS, - description=f"Kfd processes {data.kfd_process} exeed max limit {args.max_kfd_processes}", - data={ - "kfd_process": data.kfd_process, - "kfd_process_limit": args.max_kfd_processes, - }, - priority=EventPriority.CRITICAL, - console_log=True, - ) - if data.cpu_usage is not None and data.cpu_usage > args.max_cpu_usage: - has_errors = True self._log_event( category=EventCategory.OS, description=f"CPU usage {data.cpu_usage} exceeds limit {args.max_cpu_usage}", @@ -81,8 +65,6 @@ def analyze_data( priority=EventPriority.CRITICAL, console_log=True, ) - - if has_errors: self.result.status = ExecutionStatus.ERROR self.result.message = "Process limits exceeded" diff --git a/nodescraper/plugins/inband/process/process_collector.py b/nodescraper/plugins/inband/process/process_collector.py index 9c92080..b613d2e 100644 --- a/nodescraper/plugins/inband/process/process_collector.py +++ b/nodescraper/plugins/inband/process/process_collector.py @@ -23,8 +23,8 @@ # SOFTWARE. 
# ############################################################################### -import re -from typing import Optional +import time +from typing import Dict, List, Optional, Set, Tuple from nodescraper.base import InBandDataCollector from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus, OSFamily @@ -34,82 +34,261 @@ from .processdata import ProcessDataModel +def _parse_aggregate_cpu_from_proc_stat(proc_stat: str) -> Optional[Tuple[int, int]]: + """Aggregate ``cpu`` line from ``/proc/stat``: ``(total jiffies, idle+iowait)`` or ``None``.""" + for line in proc_stat.splitlines(): + if not line.startswith("cpu "): + continue + parts = line.split() + if len(parts) < 6: + return None + try: + nums = [int(parts[i]) for i in range(1, len(parts))] + except ValueError: + return None + idle = nums[3] + nums[4] + total = sum(nums) + return total, idle + return None + + +def _global_non_idle_percent(total1: int, idle1: int, total2: int, idle2: int) -> float: + """Non-idle % over the interval between two aggregate cpu samples.""" + dt = total2 - total1 + if dt <= 0: + return 0.0 + di = idle2 - idle1 + pct = 100.0 * (1.0 - di / dt) + pct = max(0.0, min(100.0, pct)) + return round(pct, 6) + + +def _parse_proc_pid_stat(stat_line: str) -> Optional[Tuple[int, int]]: + """Parse one ``/proc/<pid>/stat`` line (the text after the ``pid|`` dump prefix). Return (pid, utime+stime).""" + stat_line = stat_line.strip() + if not stat_line: + return None + try: + # comm (field 2) may itself contain ") ", so anchor on the last occurrence + paren_end = stat_line.rindex(") ") + except ValueError: + return None + head = stat_line[:paren_end] + pid_str = head.split(maxsplit=1)[0] + pid = int(pid_str) + rest = stat_line[paren_end + 2 :].split() + if len(rest) < 13: + return None + utime = int(rest[11]) + stime = int(rest[12]) + return pid, utime + stime + + +def _parse_proc_stat_dump(dump_stdout: str) -> Tuple[Dict[int, int], Set[int]]: + """Parse bulk ``pid|statline`` lines; return (pid -> jiffies, sampler_pids to exclude). 
+ + The shell dump prefixes one line ``__SAMPLER__:`` (see ``ProcessCollector.CMD_PROC_PID_STAT_DUMP``) so we + can drop that subshell from rankings—it would otherwise show up as active CPU. + """ + jiffies_by_pid: Dict[int, int] = {} + exclude: Set[int] = set() + for line in dump_stdout.splitlines(): + stripped = line.strip() + if stripped.startswith("__SAMPLER__:"): + try: + exclude.add(int(stripped.split(":", 1)[1])) + except ValueError: + pass + continue + if "|" not in line: + continue + pid_str, stat_body = line.split("|", 1) + try: + prefix_pid = int(pid_str) + except ValueError: + continue + parsed = _parse_proc_pid_stat(stat_body) + if parsed is None: + continue + stat_pid, jf = parsed + if stat_pid != prefix_pid: + continue + jiffies_by_pid[prefix_pid] = jf + return jiffies_by_pid, exclude + + +def _parse_comm_dump(dump_stdout: str) -> Dict[int, str]: + """Parse ``pid:comm`` lines from ``cat /proc//comm`` batch.""" + out: Dict[int, str] = {} + for line in dump_stdout.splitlines(): + if ":" not in line: + continue + pid_str, comm = line.split(":", 1) + try: + pid = int(pid_str) + except ValueError: + continue + out[pid] = comm.strip() + return out + + +def _top_process_cpu_shares( + sample1: Dict[int, int], + sample2: Dict[int, int], + total_delta: int, + top_n: int, + exclude_pids: Set[int], +) -> List[Tuple[int, float]]: + """Return up to ``top_n`` (pid, cpu_share_pct) sorted by jiffies delta descending.""" + if top_n <= 0: + return [] + all_pids = set(sample1) | set(sample2) + rows: List[Tuple[int, float, int]] = [] + for pid in all_pids: + if pid in exclude_pids: + continue + j1 = sample1.get(pid, 0) + j2 = sample2.get(pid, 0) + delta = max(0, j2 - j1) + if total_delta > 0: + pct = 100.0 * delta / total_delta + else: + pct = 0.0 + rows.append((pid, pct, delta)) + rows.sort(key=lambda x: (x[2], x[1]), reverse=True) + return [(pid, pct) for pid, pct, _ in rows[:top_n]] + + class ProcessCollector(InBandDataCollector[ProcessDataModel, 
ProcessCollectorArgs]): - """Collect Process details""" + """Linux process list and aggregate CPU usage from ``/proc`` (two samples; no ``top`` or ROCm SMI).""" - SUPPORTED_OS_FAMILY: set[OSFamily] = {OSFamily.LINUX} + SUPPORTED_OS_FAMILY: Set[OSFamily] = {OSFamily.LINUX} DATA_MODEL = ProcessDataModel - CMD_KFD = "rocm-smi --showpids" - CMD_CPU_USAGE = "top -b -n 1" - CMD_PROCESS = "top -b -n 1 -o %CPU " + + CMD_PROC_STAT = "cat /proc/stat" + # Lead with ``__SAMPLER__:`` so we exclude that subshell from top-N rankings. + CMD_PROC_PID_STAT_DUMP = ( + 'printf "__SAMPLER__:%s\\n" "$$"; ' + "for f in /proc/[0-9]*/stat; do " + '[ -r "$f" ] || continue; ' + 'pid="${f#/proc/}"; pid="${pid%/stat}"; ' + '[ "$pid" = "$$" ] && continue; ' + 'printf "%s|" "$pid"; cat "$f" 2>/dev/null || true; printf "\\n"; ' + "done" + ) + CMD_PROC_COMM_BATCH = ( + "for p in {pids}; do " + 'printf "%s:" "$p"; ' + "cat /proc/$p/comm 2>/dev/null || true; " + 'printf "\\n"; ' + "done" + ) + + def _collect_procfs_cpu( + self, + top_n_process: int, + sample_interval_seconds: float, + ) -> Tuple[Optional[float], List[Tuple[str, str]]]: + """Return (cpu_usage, processes) from procfs; (None, []) on failure.""" + stat1 = self._run_sut_cmd(self.CMD_PROC_STAT) + if stat1.exit_code != 0: + return None, [] + dump1 = self._run_sut_cmd(self.CMD_PROC_PID_STAT_DUMP) + if dump1.exit_code != 0: + return None, [] + + time.sleep(sample_interval_seconds) + + stat2 = self._run_sut_cmd(self.CMD_PROC_STAT) + if stat2.exit_code != 0: + return None, [] + dump2 = self._run_sut_cmd(self.CMD_PROC_PID_STAT_DUMP) + if dump2.exit_code != 0: + return None, [] + + agg1 = _parse_aggregate_cpu_from_proc_stat(stat1.stdout) + agg2 = _parse_aggregate_cpu_from_proc_stat(stat2.stdout) + if agg1 is None: + self._log_event( + category=EventCategory.OS, + description="Could not parse aggregate cpu line from /proc/stat (first sample)", + data={"proc_stat_preview": stat1.stdout[:200]}, + priority=EventPriority.ERROR, + ) + return None, [] 
+ if agg2 is None: + self._log_event( + category=EventCategory.OS, + description="Could not parse aggregate cpu line from /proc/stat (second sample)", + data={"proc_stat_preview": stat2.stdout[:200]}, + priority=EventPriority.ERROR, + ) + return None, [] + + total1, idle1 = agg1 + total2, idle2 = agg2 + cpu_usage = _global_non_idle_percent(total1, idle1, total2, idle2) + dt = total2 - total1 + t_delta = dt if dt > 0 else 0 + + sample1, excl1 = _parse_proc_stat_dump(dump1.stdout) + sample2, excl2 = _parse_proc_stat_dump(dump2.stdout) + exclude = excl1 | excl2 + + top_list = _top_process_cpu_shares(sample1, sample2, t_delta, top_n_process, exclude) + pids = [pid for pid, _ in top_list] + if not pids: + processes: List[Tuple[str, str]] = [] + else: + pid_str = " ".join(str(p) for p in pids) + comm_res = self._run_sut_cmd(self.CMD_PROC_COMM_BATCH.format(pids=pid_str)) + if comm_res.exit_code != 0: + comm_map: Dict[int, str] = {} + else: + comm_map = _parse_comm_dump(comm_res.stdout) + processes = [] + for pid, pct in top_list: + name = comm_map.get(pid, f"pid_{pid}") + processes.append((name, f"{pct:.1f}")) + + return cpu_usage, processes def collect_data( self, args: Optional[ProcessCollectorArgs] = None - ) -> tuple[TaskResult, Optional[ProcessDataModel]]: - """Collect process data from the system. + ) -> Tuple[TaskResult, Optional[ProcessDataModel]]: + """Read aggregate CPU usage and top processes from Linux ``/proc``. Args: - args (Optional[ProcessCollectorArgs], optional): process collection arguments. Defaults to None. + args (Optional[ProcessCollectorArgs], optional): ``top_n_process`` and + ``sample_interval_seconds``. Defaults to ``ProcessCollectorArgs()``. Returns: - tuple[TaskResult, Optional[ProcessDataModel]]: tuple containing the task result and the collected process data model or None if no data was collected. + tuple containing the task result and ``ProcessDataModel`` or None if collection failed. 
""" if args is None: args = ProcessCollectorArgs() + sample_interval_seconds = args.sample_interval_seconds + if sample_interval_seconds <= 0: + sample_interval_seconds = 1.0 + process_data = ProcessDataModel() - process_data.processes = [] - kfd_process = self._run_sut_cmd(self.CMD_KFD) - if kfd_process.exit_code == 0: - if "No KFD PIDs currently running" in kfd_process.stdout: - process_data.kfd_process = 0 - else: - kfd_process = re.findall( - r"^\s*\d+\s+[\w]+\s+\d+\s+\d+\s+\d+\s+\d+", - kfd_process.stdout, - re.MULTILINE, - ) - process_data.kfd_process = len(kfd_process) - - cpu_usage = self._run_sut_cmd(self.CMD_CPU_USAGE) - if cpu_usage.exit_code == 0: - cpu_idle = ( - [line for line in cpu_usage.stdout.splitlines() if "Cpu(s)" in line][0] - .split(",")[3] - .split()[0] - .replace("%id", "") - ) - process_data.cpu_usage = 100 - float(cpu_idle) - - processes = self._run_sut_cmd( - f"self.CMD_PROCESS | sed -n '8,{args.top_n_process + 7}p'" - ) # Remove system header - if processes.exit_code == 0: - for line in processes.stdout.splitlines(): - columns = line.split() - process_cpu_usage = columns[8] - process_name = columns[11] - process_data.processes.append((process_name, process_cpu_usage)) - - process_check = bool(process_data.model_fields_set) - if process_check: - self._log_event( - category="PROCESS_READ", - description="Process data collected", - data=process_data.model_dump(), - priority=EventPriority.INFO, - ) + cpu_usage, processes = self._collect_procfs_cpu(args.top_n_process, sample_interval_seconds) + if cpu_usage is not None: + process_data.cpu_usage = cpu_usage + process_data.processes = processes + + if process_data.cpu_usage is not None: self.result.message = "Process data collected" self.result.status = ExecutionStatus.OK return self.result, process_data - else: - self._log_event( - category=EventCategory.OS, - description="Process data not found", - priority=EventPriority.ERROR, - ) - self.result.message = "Process data not found" - 
self.result.status = ExecutionStatus.ERROR - return self.result, None + + self._log_event( + category=EventCategory.OS, + description="Process data not found", + priority=EventPriority.ERROR, + ) + self.result.message = "Process data not found" + self.result.status = ExecutionStatus.EXECUTION_FAILURE + return self.result, None diff --git a/nodescraper/plugins/inband/process/processdata.py b/nodescraper/plugins/inband/process/processdata.py index 683ad94..aea551c 100644 --- a/nodescraper/plugins/inband/process/processdata.py +++ b/nodescraper/plugins/inband/process/processdata.py @@ -23,12 +23,13 @@ # SOFTWARE. # ############################################################################### -from typing import Optional +from typing import List, Optional, Tuple from nodescraper.models import DataModel class ProcessDataModel(DataModel): - kfd_process: Optional[int] = None + """Aggregate CPU usage and top processes from ``/proc`` sampling.""" + cpu_usage: Optional[float] = None - processes: Optional[list[tuple[str, str]]] = None + processes: Optional[List[Tuple[str, str]]] = None diff --git a/test/functional/fixtures/process_plugin_config.json b/test/functional/fixtures/process_plugin_config.json index 752da78..ed7009b 100644 --- a/test/functional/fixtures/process_plugin_config.json +++ b/test/functional/fixtures/process_plugin_config.json @@ -3,7 +3,6 @@ "plugins": { "ProcessPlugin": { "analysis_args": { - "max_kfd_processes": 0, "max_cpu_usage": 20.0 } } diff --git a/test/unit/plugin/test_analyzer_args_build_from_model.py b/test/unit/plugin/test_analyzer_args_build_from_model.py index e6eb748..1589fb4 100644 --- a/test/unit/plugin/test_analyzer_args_build_from_model.py +++ b/test/unit/plugin/test_analyzer_args_build_from_model.py @@ -185,13 +185,15 @@ def test_sysctl_analyzer_args_build_from_model(): def test_process_analyzer_args_build_from_model(): """Test ProcessAnalyzerArgs.build_from_model includes all fields""" - datamodel = ProcessDataModel(kfd_process=5, 
cpu_usage=15.5) + datamodel = ProcessDataModel(cpu_usage=15.5) args = ProcessAnalyzerArgs.build_from_model(datamodel) assert isinstance(args, ProcessAnalyzerArgs) - assert args.max_kfd_processes == 5 assert args.max_cpu_usage == 15.5 + args_default = ProcessAnalyzerArgs.build_from_model(ProcessDataModel()) + assert args_default.max_cpu_usage == 20.0 + def test_kernel_module_analyzer_args_build_from_model(): """Test KernelModuleAnalyzerArgs.build_from_model includes all fields""" diff --git a/test/unit/plugin/test_process_analyzer.py b/test/unit/plugin/test_process_analyzer.py index e7bbb86..46221c0 100644 --- a/test/unit/plugin/test_process_analyzer.py +++ b/test/unit/plugin/test_process_analyzer.py @@ -38,21 +38,17 @@ @pytest.fixture def model_obj(): return ProcessDataModel( - kfd_process=0, - cpu_usage=10, + cpu_usage=10.0, processes=[ - ("top", "10.0"), + ("ksoftirqd/0", "10.0"), ("systemd", "0.0"), - ("kthreadd", "0.0"), - ("rcu_gp", "0.0"), - ("rcu_par_gp", "0.0"), ], ) @pytest.fixture def config(): - return {"max_kfd_processes": 0, "max_cpu_usage": 40} + return {"max_cpu_usage": 40.0} @pytest.fixture @@ -61,9 +57,7 @@ def analyzer(system_info): def test_nominal_with_config(analyzer, model_obj, config): - args = ProcessAnalyzerArgs( - max_kfd_processes=config["max_kfd_processes"], max_cpu_usage=config["max_cpu_usage"] - ) + args = ProcessAnalyzerArgs(max_cpu_usage=config["max_cpu_usage"]) result = analyzer.analyze_data(model_obj, args) assert result.status == ExecutionStatus.OK assert len(result.events) == 0 @@ -75,23 +69,9 @@ def test_nominal_no_config(analyzer, model_obj): assert len(result.events) == 0 -def test_error_kfd_process(analyzer, model_obj, config): - modified_model_obj = copy.deepcopy(model_obj) - modified_model_obj.kfd_process = 1 - args = ProcessAnalyzerArgs( - max_kfd_processes=config["max_kfd_processes"], max_cpu_usage=config["max_cpu_usage"] - ) - result = analyzer.analyze_data(modified_model_obj, args) - - assert result.status == 
ExecutionStatus.ERROR - for event in result.events: - assert event.category == EventCategory.OS.value - assert event.priority == EventPriority.CRITICAL - - def test_error_cpu_usage(analyzer, model_obj, config): modified_model_obj = copy.deepcopy(model_obj) - args = ProcessAnalyzerArgs(max_kfd_processes=config["max_kfd_processes"], max_cpu_usage=5) + args = ProcessAnalyzerArgs(max_cpu_usage=5.0) result = analyzer.analyze_data(modified_model_obj, args) assert result.status == ExecutionStatus.ERROR diff --git a/test/unit/plugin/test_process_collector.py b/test/unit/plugin/test_process_collector.py index cae8d6f..7764078 100644 --- a/test/unit/plugin/test_process_collector.py +++ b/test/unit/plugin/test_process_collector.py @@ -23,17 +23,49 @@ # SOFTWARE. # ############################################################################### -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest +from nodescraper.enums.eventcategory import EventCategory from nodescraper.enums.executionstatus import ExecutionStatus from nodescraper.enums.systeminteraction import SystemInteractionLevel from nodescraper.interfaces.task import SystemCompatibilityError from nodescraper.models.systeminfo import OSFamily -from nodescraper.plugins.inband.process.process_collector import ProcessCollector +from nodescraper.plugins.inband.process.process_collector import ( + ProcessCollector, + _global_non_idle_percent, + _parse_aggregate_cpu_from_proc_stat, + _parse_comm_dump, + _parse_proc_pid_stat, + _parse_proc_stat_dump, + _top_process_cpu_shares, +) from nodescraper.plugins.inband.process.processdata import ProcessDataModel +PROC_STAT_1 = ( + "cpu 100 0 0 900 0 0 0 0 0 0\n" + "cpu0 55 0 0 495 0 0 0 0 0 0\n" + "cpu1 45 0 0 405 0 0 0 0 0 0\n" +) +PROC_STAT_2 = ( + "cpu 200 0 0 1800 0 0 0 0 0 0\n" + "cpu0 110 0 0 990 0 0 0 0 0 0\n" + "cpu1 90 0 0 810 0 0 0 0 0 0\n" +) + +DUMP_1 = ( + "__SAMPLER__:99999\n" + "1000|1000 (ksoftirqd/0) S 0 0 0 0 -1 69238848 0 0 0 
0 5000 6000\n" + "1|1 (systemd) S 0 1 1 0 -1 4194560 100 200 300 400 5000 6000\n" +) + +DUMP_2 = ( + "__SAMPLER__:99999\n" + "1000|1000 (ksoftirqd/0) S 0 0 0 0 -1 69238848 0 0 0 0 5100 6000\n" + "1|1 (systemd) S 0 1 1 0 -1 4194560 100 200 300 400 5000 6000\n" +) + @pytest.fixture def collector(system_info, conn_mock): @@ -44,34 +76,43 @@ def collector(system_info, conn_mock): ) -def test_run_linux(collector, conn_mock): +def _command_runner(): + proc_stat_calls = 0 + dump_calls = 0 + + def run_command(command, sudo=False, timeout=300, strip=True): + nonlocal proc_stat_calls, dump_calls + if command == "cat /proc/stat": + proc_stat_calls += 1 + body = PROC_STAT_1 if proc_stat_calls == 1 else PROC_STAT_2 + return MagicMock(exit_code=0, stdout=body, stderr="", command=command) + if "for f in /proc/" in command and "__SAMPLER__" in command: + dump_calls += 1 + body = DUMP_1 if dump_calls == 1 else DUMP_2 + return MagicMock(exit_code=0, stdout=body, stderr="", command=command) + if "cat /proc/$p/comm" in command: + return MagicMock( + exit_code=0, + stdout="1000:ksoftirqd/0\n1:systemd\n", + stderr="", + command=command, + ) + raise AssertionError(f"unexpected command: {command!r}") + + return run_command + + +@patch("nodescraper.plugins.inband.process.process_collector.time.sleep") +def test_run_linux_procfs(mock_sleep, collector, conn_mock): collector.system_info.os_family = OSFamily.LINUX - conn_mock.run_command.side_effect = [ - MagicMock( - exit_code=0, - stdout="PID PROCESS NAME GPU(s) VRAM USED SDMA USED CU OCCUPANCY\n8246 TransferBench 8 2267283456 0 0", - stderr="", - ), - MagicMock( - exit_code=0, - stdout="%Cpu(s): 0.1 us, 0.1 sy, 0.0 ni, 90.0 id", - stderr="", - ), - MagicMock( - exit_code=0, - stdout="356817 user 20 0 32112 14196 10556 R 10.0 0.0 0:00.07 top\n" - "1 root 20 0 166596 11916 8316 S 0.0 0.0 1:32.14 systemd", - stderr="", - ), - ] + conn_mock.run_command.side_effect = _command_runner() result, data = collector.collect_data() assert result.status 
== ExecutionStatus.OK assert data == ProcessDataModel( - kfd_process=1, - cpu_usage=10, + cpu_usage=10.0, processes=[ - ("top", "10.0"), + ("ksoftirqd/0", "10.0"), ("systemd", "0.0"), ], ) @@ -87,14 +128,77 @@ def test_unsupported_platform(system_info, conn_mock): ) -def test_exit_failure(collector, conn_mock): +@patch("nodescraper.plugins.inband.process.process_collector.time.sleep") +def test_exit_failure_procfs(mock_sleep, collector, conn_mock): collector.system_info.os_family = OSFamily.LINUX conn_mock.run_command.side_effect = [ - MagicMock(exit_code=1, stdout="", stderr=""), - MagicMock(exit_code=0, stdout="", stderr=""), - MagicMock(exit_code=0, stdout="", stderr=""), + MagicMock(exit_code=1, stdout="", stderr="no proc", command="cat /proc/stat"), ] result, data = collector.collect_data() assert result.status == ExecutionStatus.EXECUTION_FAILURE assert data is None + + +@patch("nodescraper.plugins.inband.process.process_collector.time.sleep") +def test_invalid_proc_stat_logs_os_event(mock_sleep, collector, conn_mock): + collector.system_info.os_family = OSFamily.LINUX + conn_mock.run_command.side_effect = [ + MagicMock( + exit_code=0, stdout="not a valid proc_stat\n", stderr="", command="cat /proc/stat" + ), + MagicMock(exit_code=0, stdout="__SAMPLER__:1\n", stderr="", command="stat-dump-1"), + MagicMock(exit_code=0, stdout="still bad\n", stderr="", command="cat /proc/stat"), + MagicMock(exit_code=0, stdout="__SAMPLER__:1\n", stderr="", command="stat-dump-2"), + ] + with patch.object(collector, "_log_event") as mock_log: + result, data = collector.collect_data() + + assert result.status == ExecutionStatus.EXECUTION_FAILURE + assert data is None + os_parse_logs = [ + c + for c in mock_log.call_args_list + if c.kwargs.get("category") == EventCategory.OS + and "aggregate cpu" in (c.kwargs.get("description") or "") + ] + assert os_parse_logs, "Expected OS event for unparseable /proc/stat aggregate cpu line" + + +class TestProcessCollectorProcfsParsing: + def 
test_parse_aggregate_cpu_from_proc_stat(self): + text = "cpu0 1 2 3 4 5 6 7 8\n" "cpu 100 0 0 900 0 0 0 0 0 0\n" + assert _parse_aggregate_cpu_from_proc_stat(text) == (1000, 900) + + def test_parse_aggregate_cpu_from_proc_stat_invalid(self): + assert _parse_aggregate_cpu_from_proc_stat("cpu 1 2\n") is None + assert _parse_aggregate_cpu_from_proc_stat("no aggregate line\n") is None + + def test_global_non_idle_percent(self): + t1, i1 = 1000, 900 + t2, i2 = 2000, 1800 + assert _global_non_idle_percent(t1, i1, t2, i2) == 10.0 + + def test_parse_proc_pid_stat(self): + stat_line = DUMP_1.splitlines()[1].split("|", 1)[1] + assert _parse_proc_pid_stat(stat_line) == (1000, 11000) + + def test_parse_proc_stat_dump(self): + dump = "__SAMPLER__:12345\n" + "\n".join(DUMP_1.splitlines()[1:]) + jf, excl = _parse_proc_stat_dump(dump) + assert excl == {12345} + assert jf[1000] == 11000 + assert jf[1] == 11000 + + def test_top_process_cpu_shares(self): + s1 = {1: 100, 2: 200} + s2 = {1: 100, 2: 400} + top = _top_process_cpu_shares(s1, s2, total_delta=1000, top_n=2, exclude_pids=set()) + assert top[0][0] == 2 + assert top[0][1] == 20.0 + assert top[1][1] == 0.0 + + def test_parse_comm_dump(self): + m = _parse_comm_dump("1000:ksoftirqd/0\n1:systemd\n") + assert m[1000] == "ksoftirqd/0" + assert m[1] == "systemd"
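As a standalone illustration of the ``/proc/<pid>/stat`` parsing exercised by the tests above: the comm field (field 2, in parentheses) may itself contain spaces and parentheses, so a robust parser anchors on the last ``)`` rather than the first. This sketch is hypothetical, not the plugin's helper.

```python
# Why /proc/<pid>/stat parsing anchors on the LAST ')': the comm field
# may contain spaces and parentheses. Hypothetical standalone parser.

def utime_stime(stat_line: str) -> tuple[int, int]:
    """Return (pid, utime + stime jiffies) from one /proc/<pid>/stat line."""
    end = stat_line.rindex(")")           # last ')' closes the comm field
    pid = int(stat_line[:end].split("(", 1)[0])
    fields = stat_line[end + 1:].split()  # state, ppid, ..., utime, stime, ...
    return pid, int(fields[11]) + int(fields[12])

# comm "a) b (c" contains both spaces and parentheses
line = "42 (a) b (c) S 0 0 0 0 -1 0 0 0 0 0 5000 6000 0 0 20 0 1 0"
print(utime_stime(line))  # (42, 11000)
```

A first-match split (`index(")")`) would misread such a line, attributing state and jiffy fields to the wrong offsets; anchoring on the last parenthesis keeps the fixed numeric fields aligned regardless of comm content.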