diff --git a/bin/wfbench b/bin/wfbench
index 4bc03cba..a196175f 100755
--- a/bin/wfbench
+++ b/bin/wfbench
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 #
 # Copyright (c) 2021-2025 The WfCommons Team.
@@ -13,18 +13,21 @@ import pathlib
 import subprocess
 import time
 import sys
-import signal
-import queue
 import argparse
 import re
 import json
 import logging
 import pandas as pd
+import psutil
 from io import StringIO
 from filelock import FileLock
 from pathos.helpers import mp as multiprocessing
-from typing import List, Optional
+
+from abc import ABC, abstractmethod
+from typing import List, Optional, IO
+
+int32_max = 2147483647
+this_dir = pathlib.Path(__file__).resolve().parent
 
 
 # Configure logging
@@ -35,10 +38,6 @@ logging.basicConfig(
     handlers=[logging.StreamHandler()]
 )
 
-
-this_dir = pathlib.Path(__file__).resolve().parent
-
-
 def log_info(msg: str):
     """
     Log an info message to stderr
@@ -66,6 +65,227 @@ def log_error(msg: str):
     """
     logging.error(msg)
 
+# Utility process class
+#######################
+
+class ProcessHandle:
+    def __init__(self, proc: multiprocessing.Process | subprocess.Popen):
+        self._proc = proc
+
+    @property
+    def pid(self):
+        return self._proc.pid
+
+    def terminate(self):
+        self._proc.terminate()
+
+    def terminate_along_with_children(self):
+        # If it's a multiprocessing.Process, just kill the parent and return
+        if isinstance(self._proc, multiprocessing.Process):
+            self._proc.terminate()
+            return
+        # If it's a Popen, then do the brute-force thing
+        try:
+            parent = psutil.Process(self._proc.pid)
+            children = parent.children(recursive=True)
+            for child in children:
+                try:
+                    child.kill()
+                except psutil.NoSuchProcess:
+                    pass  # Process is already dead
+            try:
+                parent.kill()
+            except psutil.NoSuchProcess:
+                pass  # Nevermind
+        except psutil.NoSuchProcess:
+            # The parent is already dead; as a fallback, make sure no
+            # stress-ng instances were left behind
+            log_debug("Process already gone; force-killing any stragglers.")
+            subprocess.Popen(["pkill", "-f", "stress-ng"]).wait()
+
+    def wait(self):
+        if isinstance(self._proc, multiprocessing.Process):
+            self._proc.join()
+        else:
+            self._proc.wait()
+
+    def is_alive(self):
+        if isinstance(self._proc, multiprocessing.Process):
+            return self._proc.is_alive()
+        else:
+            return self._proc.poll() is None
+
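+# Illustrative usage sketch (not part of the benchmark itself): ProcessHandle
+# gives multiprocessing.Process and subprocess.Popen a single interface, e.g.:
+#
+#     handle = ProcessHandle(subprocess.Popen(["sleep", "60"]))
+#     if handle.is_alive():
+#         handle.terminate_along_with_children()  # kill the whole process tree
+#     handle.wait()
+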
+# Benchmark classes
+###################
+
+class Benchmark(ABC):
+    @abstractmethod
+    def run(self) -> ProcessHandle | list[ProcessHandle | None] | None:
+        pass
+
+class IOReadBenchmark(Benchmark):
+    def __init__(self):
+        self.to_read: dict[str, tuple[IO, int]] = {}
+
+    def add_read_operation(self, filepath: str, opened_file: IO, num_bytes: int):
+        self.to_read[filepath] = (opened_file, num_bytes)
+
+    def run(self) -> ProcessHandle | None:
+        if not self.to_read:
+            return None
+        p = multiprocessing.Process(target=self.benchmark_function, args=())
+        p.start()
+        return ProcessHandle(p)
+
+    def benchmark_function(self):
+        for filepath, (opened_file, bytes_to_read) in self.to_read.items():
+            log_debug(f"Reading {bytes_to_read} bytes from {filepath}...")
+            opened_file.read(bytes_to_read)
+
+
+class IOWriteBenchmark(Benchmark):
+    def __init__(self):
+        self.to_write: dict[str, tuple[IO, int]] = {}
+
+    def add_write_operation(self, filepath: str, opened_file: IO, num_bytes: int):
+        self.to_write[filepath] = (opened_file, num_bytes)
+
+    def run(self) -> ProcessHandle | None:
+        if not self.to_write:
+            return None
+        p = multiprocessing.Process(target=self.benchmark_function, args=())
+        p.start()
+        return ProcessHandle(p)
+
+    def benchmark_function(self):
+        for filepath, (opened_file, bytes_to_write) in self.to_write.items():
+            log_debug(f"Writing {bytes_to_write} bytes to {filepath}...")
+            opened_file.write(os.urandom(int(bytes_to_write)))
+            opened_file.flush()
+
+
+class CPUBenchmark(Benchmark):
+    def __init__(self, cpu_threads: Optional[int] = 5,
+                 mem_threads: Optional[int] = 5,
+                 core: Optional[int] = None,
+                 total_mem: Optional[int] = None):
+        self.cpu_threads = cpu_threads
+        self.mem_threads = mem_threads
+        self.core = core
+        self.total_mem = total_mem
+        self.work = None
+
+    def set_work(self, work: int):
+        self.work = work
+
+    def set_infinite_work(self):
+        self.work = int32_max  # "infinite"
+
+    def run(self) -> list[ProcessHandle | None]:
+        if self.work is None or self.work <= 0:
+            return [None, None]
+
+        total_mem = f"{self.total_mem}B" if self.total_mem else f"{100.0 / os.cpu_count()}%"
+        cpu_work_per_thread = int(1000000 * self.work / (16384 * self.cpu_threads)) if self.cpu_threads != 0 else int32_max ** 2
+        cpu_samples = min(cpu_work_per_thread, int32_max)
+        cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max
+        if cpu_ops > int32_max:
+            log_info("Exceeded maximum allowed value of CPU work.")
+            cpu_ops = int32_max
+
+        # Start CPU benchmark, if need be
+        cpu_proc_handle = None
+        if self.cpu_threads > 0:
+            log_debug(f"Running CPU benchmark with {self.cpu_threads} threads for {self.work if self.work < int32_max else 'infinite'} units of work...")
+            cpu_prog = ["stress-ng", "--monte-carlo", f"{self.cpu_threads}",
+                        "--monte-carlo-method", "pi",
+                        "--monte-carlo-rand", "lcg",
+                        "--monte-carlo-samples", f"{cpu_samples}",
+                        "--monte-carlo-ops", f"{cpu_ops}",
+                        "--quiet"]
+            cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid)
+            cpu_proc_handle = ProcessHandle(cpu_proc)
+
+            # NOTE: might be a good idea to use psutil to set the affinity (works across platforms)
+            if self.core is not None:
+                os.sched_setaffinity(cpu_proc.pid, {self.core})
+
+        # Start Memory benchmark, if need be
+        mem_proc_handle = None
+        if self.mem_threads > 0:
+            # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows
+            log_debug(f"Running memory benchmark with {self.mem_threads} threads...")
+            mem_prog = ["stress-ng", "--vm", f"{self.mem_threads}",
+                        "--vm-bytes", f"{total_mem}", "--vm-keep", "--quiet"]
+            mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid)
+            mem_proc_handle = ProcessHandle(mem_proc)
+            if self.core is not None:
+                os.sched_setaffinity(mem_proc.pid, {self.core})
+
+        return [cpu_proc_handle, mem_proc_handle]
+
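+# Illustrative numbers for the work-to-stress-ng mapping above (scaling factors
+# as defined in run()): work=100 with cpu_threads=5 gives cpu_work_per_thread =
+# int(1000000 * 100 / (16384 * 5)) = 1220, hence cpu_samples = 1220 and
+# cpu_ops = 1, i.e., one bounded pi-estimation pass of 1220 Monte Carlo
+# samples per stressor thread.
+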
+class GPUBenchmark(Benchmark):
+
+    @staticmethod
+    def get_available_gpus():
+        proc = subprocess.Popen(["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv"], stdout=subprocess.PIPE,
+                                stderr=subprocess.PIPE)
+        stdout, _ = proc.communicate()
+        df = pd.read_csv(StringIO(stdout.decode("utf-8")), sep=" ")
+        return df[df["utilization.gpu"] <= 5].index.to_list()
+
+    def __init__(self):
+        self.work = None
+        self.duration = None
+        self.device = None
+
+    def set_device(self):
+        available_gpus = self.get_available_gpus()  # checking for available GPUs
+        if not available_gpus:
+            log_error("No GPU available")
+            sys.exit(1)
+        self.device = available_gpus[0]
+        log_debug(f"GPU benchmark instantiated for device {self.device}")
+
+    def set_work(self, work: int):
+        self.work = work
+
+    def set_time(self, duration: float):
+        self.duration = duration
+
+    def run(self) -> ProcessHandle | None:
+        if self.work is None and self.duration is None:
+            return None
+
+        if self.duration is not None:
+            log_debug(f"Running GPU benchmark for {self.duration} seconds")
+            gpu_prog = [
+                f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={self.device} {this_dir.joinpath('./gpu_benchmark')} {self.work} {self.duration}"]
+        else:
+            log_debug(f"Running GPU benchmark for {self.work} units of work")
+            gpu_prog = [
+                f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={self.device} {this_dir.joinpath('./gpu_benchmark')} {self.work}"]
+
+        p = subprocess.Popen(gpu_prog, shell=True)
+        return ProcessHandle(p)
+
+
+# Utility code functions
+########################
 
 
 def lock_core(path_locked: pathlib.Path,
               path_cores: pathlib.Path) -> int:
@@ -128,186 +348,185 @@ def unlock_core(path_locked: pathlib.Path,
     finally:
         lock.release()
 
-def monitor_progress(proc, cpu_queue):
-    """Monitor progress from the CPU benchmark process."""
-    for line in iter(proc.stdout.readline, ""):  # No decode needed
-        line = line.strip()
-        if line.startswith("Progress:"):
-            try:
-                progress = float(line.split()[1].strip('%'))
-                cpu_queue.put(progress)
-            except (ValueError, IndexError):
-                pass
-
-def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue,
-                      cpu_threads: Optional[int] = 5,
-                      mem_threads: Optional[int] = 5,
-                      cpu_work: Optional[int] = 100,
-                      core: Optional[int] = None,
-                      total_mem: Optional[int] = None) -> List:
-    """
-    Run CPU and memory benchmark.
-
-    :param cpu_queue: Queue to push CPU benchmark progress as a float.
-    :type cpu_queue: multiprocessing.Queue
-    :param cpu_threads: Number of threads for CPU benchmark.
-    :type cpu_threads: Optional[int]
-    :param mem_threads: Number of threads for memory benchmark.
-    :type mem_threads: Optional[int]
-    :param cpu_work: Total work units for CPU benchmark.
-    :type cpu_work: Optional[int]
-    :param core: Core to pin the benchmark processes to.
-    :type core: Optional[int]
-    :param total_mem: Total memory to use for memory benchmark.
-    :type total_mem: Optional[float]
-
-    :return: Lists of CPU and memory subprocesses.
- :rtype: List - """ - total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%" - cpu_work_per_thread = int(cpu_work / cpu_threads) - - cpu_procs = [] - mem_procs = [] - cpu_prog = [f"{this_dir.joinpath('cpu-benchmark')}", f"{cpu_work_per_thread}"] - mem_prog = ["stress-ng", "--vm", f"{mem_threads}", - "--vm-bytes", f"{total_mem}", "--vm-keep"] - - for i in range(cpu_threads): - cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - - # NOTE: might be a good idea to use psutil to set the affinity (works across platforms) - if core: - os.sched_setaffinity(cpu_proc.pid, {core}) - cpu_procs.append(cpu_proc) - - # Start a thread to monitor the progress of each CPU benchmark process - monitor_thread = multiprocessing.Process(target=monitor_progress, args=(cpu_proc, cpu_queue)) - monitor_thread.start() - - if mem_threads > 0: - # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows - mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) - if core: - os.sched_setaffinity(mem_proc.pid, {core}) - mem_procs.append(mem_proc) - - return cpu_procs, mem_procs - - -def io_read_benchmark_user_input_data_size(inputs, - rundir=None, - memory_limit=None): - if memory_limit is None: - memory_limit = -1 - memory_limit = int(memory_limit) - log_debug("Starting IO Read Benchmark...") - for file, size in inputs.items(): - with open(rundir.joinpath(file), "rb") as fp: - log_debug(f"Reading '{file}'") - chunk_size = min(size, memory_limit) - while fp.read(chunk_size): - pass - log_debug("Completed IO Read Benchmark!") - - -def io_write_benchmark_user_input_data_size(outputs, - rundir=None, - memory_limit=None): - if memory_limit is None: - memory_limit = sys.maxsize - memory_limit = int(memory_limit) - for file_name, file_size in outputs.items(): - log_debug(f"Writing output file '{file_name}'") - file_size_todo = file_size - while file_size_todo > 0: - with open(rundir.joinpath(file_name), "ab") as fp: - chunk_size = min(file_size_todo, memory_limit) - file_size_todo -= fp.write(os.urandom(int(chunk_size))) - - -def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit=None, rundir=None, event=None): - """Alternate between reading and writing to a file, ensuring read only happens after write.""" - - if memory_limit is None: - memory_limit = 10 * 1024 * 1024 # sys.maxsize - memory_limit = int(memory_limit) - - # queue will have messages in the form (cpu_percent_completed) - # Get the last message and trash the rest - - # Create empty files - for name in outputs: - open(rundir.joinpath(name), "wb").close() - - io_completed = 0 - bytes_read = { - name: 0 - for name in inputs - } - bytes_written = { - name: 0 - for name in outputs - } - - # get size of inputs - inputs = { - name: os.path.getsize(rundir.joinpath(name)) - for name in inputs - } - - while io_completed < 100: - cpu_percent = max(io_completed, cpu_queue.get()) - while True: # Get the last message - try: - cpu_percent = max(io_completed, cpu_queue.get_nowait()) - except queue.Empty: - break - - log_debug(f"CPU Percent: {cpu_percent}") - if cpu_percent: - bytes_to_read = { - name: int(size * (cpu_percent / 100) - bytes_read[name]) - for name, size in inputs.items() - } - bytes_to_write = { - name: int(size * (cpu_percent / 100) - bytes_written[name]) - for name, size in outputs.items() - } - io_read_benchmark_user_input_data_size(bytes_to_read, rundir, memory_limit=memory_limit) - io_write_benchmark_user_input_data_size(bytes_to_write, 
rundir, memory_limit=memory_limit) - - bytes_read = { - name: bytes_read[name] + bytes_to_read[name] - for name in bytes_to_read - } - bytes_written = { - name: bytes_written[name] + bytes_to_write[name] - for name in bytes_to_write - } - - log_debug(f"Bytes Read: {bytes_read}") - log_debug(f"Bytes Written: {bytes_written}") - - io_completed = cpu_percent - - if io_completed >= 100: - break - -def get_available_gpus(): - proc = subprocess.Popen(["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, _ = proc.communicate() - df = pd.read_csv(StringIO(stdout.decode("utf-8")), sep=" ") - return df[df["utilization.gpu"] <= 5].index.to_list() +# def monitor_progress(proc, cpu_queue): +# """Monitor progress from the CPU benchmark process.""" +# for line in iter(proc.stdout.readline, ""): # No decode needed +# line = line.strip() +# if line.startswith("Progress:"): +# try: +# progress = float(line.split()[1].strip('%')) +# cpu_queue.put(progress) +# except (ValueError, IndexError): +# pass +# +# def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, +# cpu_threads: Optional[int] = 5, +# mem_threads: Optional[int] = 5, +# cpu_work: Optional[int] = 100, +# core: Optional[int] = None, +# total_mem: Optional[int] = None) -> List: +# """ +# Run CPU and memory benchmark. +# +# :param cpu_queue: Queue to push CPU benchmark progress as a float. +# :type cpu_queue: multiprocessing.Queue +# :param cpu_threads: Number of threads for CPU benchmark. +# :type cpu_threads: Optional[int] +# :param mem_threads: Number of threads for memory benchmark. +# :type mem_threads: Optional[int] +# :param cpu_work: Total work units for CPU benchmark. +# :type cpu_work: Optional[int] +# :param core: Core to pin the benchmark processes to. +# :type core: Optional[int] +# :param total_mem: Total memory to use for memory benchmark. +# :type total_mem: Optional[float] +# +# :return: Lists of CPU and memory subprocesses. 
+# :rtype: List +# """ +# total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%" +# cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads)) if cpu_threads != 0 else int32_max**2 +# cpu_samples = min(cpu_work_per_thread, int32_max) +# cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max +# if cpu_ops > int32_max: +# log_info("Exceeded maximum allowed value of cpu work.") +# cpu_ops = int32_max +# +# cpu_proc = None +# mem_proc = None +# +# cpu_prog = ["stress-ng", "--monte-carlo", f"{cpu_threads}", +# "--monte-carlo-method", "pi", +# "--monte-carlo-rand", "lcg", +# "--monte-carlo-samples", f"{cpu_samples}", +# "--monte-carlo-ops", f"{cpu_ops}", +# "--quiet"] +# mem_prog = ["stress-ng", "--vm", f"{mem_threads}", +# "--vm-bytes", f"{total_mem}", "--vm-keep", "--quiet"] +# +# if cpu_threads > 0: +# cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid) +# +# # NOTE: might be a good idea to use psutil to set the affinity (works across platforms) +# if core: +# os.sched_setaffinity(cpu_proc.pid, {core}) +# +# if mem_threads > 0: +# # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows +# mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) +# if core: +# os.sched_setaffinity(mem_proc.pid, {core}) +# +# return [cpu_proc, mem_proc] +# +# +# def io_read_benchmark_user_input_data_size(inputs, +# rundir=None, +# memory_limit=None): +# if memory_limit is None: +# memory_limit = -1 +# memory_limit = int(memory_limit) +# log_debug("Starting IO Read Benchmark...") +# for file, size in inputs.items(): +# with open(rundir.joinpath(file), "rb") as fp: +# log_debug(f"Reading '{file}'") +# chunk_size = min(size, memory_limit) +# while fp.read(chunk_size): +# pass +# log_debug("Completed IO Read Benchmark!") +# +# +# def io_write_benchmark_user_input_data_size(outputs, +# rundir=None, +# memory_limit=None): +# if memory_limit is None: +# memory_limit = sys.maxsize +# memory_limit = int(memory_limit) +# for file_name, file_size in outputs.items(): +# log_debug(f"Writing output file '{file_name}'") +# file_size_todo = file_size +# while file_size_todo > 0: +# with open(rundir.joinpath(file_name), "ab") as fp: +# chunk_size = min(file_size_todo, memory_limit) +# file_size_todo -= fp.write(os.urandom(int(chunk_size))) +# +# +# def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit=None, rundir=None, event=None): +# """Alternate between reading and writing to a file, ensuring read only happens after write.""" +# +# if memory_limit is None: +# memory_limit = 10 * 1024 * 1024 # sys.maxsize +# memory_limit = int(memory_limit) +# +# # queue will have messages in the form (cpu_percent_completed) +# # Get the last message and trash the rest +# +# # Create empty files +# for name in outputs: +# open(rundir.joinpath(name), "wb").close() +# +# io_completed = 1 +# bytes_read = { +# name: 0 +# for name in inputs +# } +# bytes_written = { +# name: 0 +# for name in outputs +# } +# +# # get size of inputs +# inputs = { +# name: os.path.getsize(rundir.joinpath(name)) +# for name in inputs +# } +# +# while io_completed < 100: +# #cpu_percent = max(io_completed, cpu_queue.get()) +# #while True: # Get the last message +# # try: +# # cpu_percent = max(io_completed, cpu_queue.get_nowait()) +# # except queue.Empty: +# # break +# +# log_debug(f"IO Percent: {io_completed}") +# if True: #cpu_percent: +# bytes_to_read = { +# name: int(size * (io_completed / 100) - bytes_read[name]) +# for name, size in inputs.items() +# } +# 
bytes_to_write = {
+#                name: int(size * (io_completed / 100) - bytes_written[name])
+#                for name, size in outputs.items()
+#            }
+#            io_read_benchmark_user_input_data_size(bytes_to_read, rundir, memory_limit=memory_limit)
+#            io_write_benchmark_user_input_data_size(bytes_to_write, rundir, memory_limit=memory_limit)
+#
+#            bytes_read = {
+#                name: bytes_read[name] + bytes_to_read[name]
+#                for name in bytes_to_read
+#            }
+#            bytes_written = {
+#                name: bytes_written[name] + bytes_to_write[name]
+#                for name in bytes_to_write
+#            }
+#
+#            log_debug(f"Bytes Read: {bytes_read}")
+#            log_debug(f"Bytes Written: {bytes_written}")
+#
+#        io_completed = io_completed + 1
+#
+#        if io_completed >= 100:
+#            break
 
-def gpu_benchmark(time: int = 100,
-                  work: int = 100,
-                  device: int = 0): #work, device
-
-    gpu_prog = [f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={device} {this_dir.joinpath('./gpu_benchmark')} {work} {time}"]
-    log_debug(f"Running GPU Benchmark: {gpu_prog}")
-    subprocess.Popen(gpu_prog, shell=True)
+# def gpu_benchmark(time: int = 100,
+#                   work: int = 100,
+#                   device: int = 0): #work, device
+#
+#     gpu_prog = [f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={device} {this_dir.joinpath('./gpu_benchmark')} {work} {time}"]
+#     log_debug(f"Running GPU Benchmark: {gpu_prog}")
+#     subprocess.Popen(gpu_prog, shell=True)
 
 
 def get_parser() -> argparse.ArgumentParser:
@@ -319,22 +538,26 @@ def get_parser() -> argparse.ArgumentParser:
     parser.add_argument("--path-lock", default=None, help="Path to lock file.")
     parser.add_argument("--path-cores", default=None, help="Path to cores file.")
     parser.add_argument("--cpu-work", default=None, help="Amount of CPU work.")
+    parser.add_argument("--num-chunks", default=10, help="Number of chunks used for pipelining I/O and "
+                                                         "computation throughout the execution (fewer chunks may be used "
+                                                         "if amounts of work and/or input/output file sizes are too small).")
     parser.add_argument("--gpu-work", default=None, help="Amount of GPU work.")
-    parser.add_argument("--time", default=None, help="Time limit (in seconds) to complete the task (overrides CPU and GPU works).")
-    parser.add_argument("--mem", type=float, default=None, help="Max amount (in MB) of memory consumption.")
-    parser.add_argument("--output-files", help="output file names with sizes in bytes as a JSON dictionary "
+    parser.add_argument("--time", default=None, help="Time limit (in seconds) to complete "
+                                                     "the computational portion of the benchmark (overrides CPU and GPU work).")
+    parser.add_argument("--mem", type=float, default=None, help="Maximum memory consumption (in MB).")
+    parser.add_argument("--output-files", help="Output file names with sizes in bytes as a JSON dictionary "
                         "(e.g., --output-files {\\\"file1\\\": 1024, \\\"file2\\\": 2048}).")
-    parser.add_argument("--input-files", help="input files names as a JSON array "
+    parser.add_argument("--input-files", help="Input file names as a JSON array "
                         "(e.g., --input-files [\\\"file3\\\", \\\"file4\\\"]).")
     parser.add_argument("--debug", action="store_true", help="Enable debug messages.")
     parser.add_argument("--with-flowcept", action="store_true", default=False, help="Enable Flowcept monitoring.")
     parser.add_argument("--workflow_id", default=None, help="Id to group tasks in a workflow.")
     return parser
-
+
 def begin_flowcept(args):
-    log_info("Running with Flowcept.")
+    log_debug("Running with Flowcept.")
     from flowcept import Flowcept, FlowceptTask
     # TODO: parametrize to allow storing individual tasks
     f = Flowcept(workflow_id=args.workflow_id,
@@ -350,6 +573,35 @@ def end_flowcept(flowcept, flowcept_task):
     flowcept.stop()
 
 
+def compute_num_chunks(args):
+    # Compute the (feasible) number of chunks
+    min_chunk_size_time = 1.0  # At least 1 second per chunk, if we're doing time-based
+    # TODO: Pick reasonable factors below so that a chunk takes about min_chunk_size_time sec on a reasonable machine
+    min_chunk_size_cpu_work = 3000000 * min_chunk_size_time  # roughly 1s of CPU work on a recent laptop
+    min_chunk_size_gpu_work = 30000000 * min_chunk_size_time  # unknown; needs calibration
+
+    if args.time:
+        num_chunks = min(int(args.num_chunks), int(float(args.time) / min_chunk_size_time))
+    else:
+        # An absent resource imposes no constraint on the number of chunks
+        if args.cpu_work:
+            num_chunks_cpu = min(int(args.num_chunks), int(float(args.cpu_work) / min_chunk_size_cpu_work))
+        else:
+            num_chunks_cpu = int(args.num_chunks)
+        if args.gpu_work:
+            num_chunks_gpu = min(int(args.num_chunks), int(float(args.gpu_work) / min_chunk_size_gpu_work))
+        else:
+            num_chunks_gpu = int(args.num_chunks)
+        num_chunks = min(num_chunks_cpu, num_chunks_gpu)
+
+    num_chunks = max(num_chunks, 1)  # The above computations may say "zero"
+    return num_chunks
+
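+# Illustrative example (values assumed): with the default --num-chunks of 10,
+# "--cpu-work 15000000" and no --gpu-work gives num_chunks_cpu = min(10, 5) = 5
+# while num_chunks_gpu is unconstrained (10), so 5 chunks are used; with
+# "--time 30", num_chunks = min(10, int(30 / 1.0)) = 10.
+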
+def kill_current_handles(handles: list[ProcessHandle]):
+    for handle in handles:
+        if handle is not None and handle.is_alive():
+            handle.terminate_along_with_children()
+
+
 def main():
     """Main program."""
     parser = get_parser()
@@ -372,132 +624,275 @@ def main():
         path_cores = pathlib.Path(args.path_cores)
         core = lock_core(path_locked, path_cores)
 
-    log_info(f"Starting {args.name} Benchmark")
-
-    mem_bytes = args.mem * 1024 * 1024 if args.mem else None
-
-    procs = []
-    io_proc = None
-    outputs_dict = {}
-
-    cpu_queue = multiprocessing.Queue()
-
-    log_debug(f"Working directory: {os.getcwd()}")
-
-    # Deal with input/output files if any
+    if args.time and (not args.cpu_work and not args.gpu_work):
+        log_error("If --time is provided, at least one of --cpu-work and --gpu-work must also be provided.")
+        sys.exit(1)
+
+    # Compute the (feasible) number of chunks based on the arguments
+    num_chunks = compute_num_chunks(args)
+    log_debug(f"Executing benchmark with {num_chunks} chunks.")
+
+    # At this point we know the number of chunks, and we can just iterate as follows (N = num_chunks + 2)
+    #   step 0   step 1        step 2        ...   step N-3      step N-2      step N-1
+    #   READ     READ          READ          ...   READ          -             -
+    #   -        COMPUTE_CPU   COMPUTE_CPU   ...   COMPUTE_CPU   COMPUTE_CPU   -
+    #   -        COMPUTE_GPU   COMPUTE_GPU   ...   COMPUTE_GPU   COMPUTE_GPU   -
+    #   -        -             WRITE         ...   WRITE         WRITE         WRITE
+    #   (Intermediate READ and WRITE steps may do nothing for some files if there is too little data)
+
+    # Construct a list of benchmark steps, where each step holds an I/O read benchmark, an I/O write
+    # benchmark, a CPU benchmark, and a GPU benchmark. Initially these are all "do nothing" benchmarks
+    N = num_chunks + 2
+    steps = [{"io_read_benchmark": IOReadBenchmark(),
+              "io_write_benchmark": IOWriteBenchmark(),
+              "cpu_benchmark": CPUBenchmark(cpu_threads=int(10 * args.percent_cpu),
+                                            mem_threads=int(10 - 10 * args.percent_cpu),
+                                            core=core,
+                                            total_mem=args.mem * 1000 * 1000 if args.mem else None),
+              "gpu_benchmark": GPUBenchmark()} for _ in range(N)]
+
+    min_chunk_size_data = 1000  # 1KB per chunk at a minimum for each input/output file; otherwise the
+                                # file is read/written all at once at the beginning/end
+
+    # Augment I/O read benchmarks for each input file
     cleaned_input = "{}" if args.input_files is None else re.sub(r'\\+', '', args.input_files)
+    try:
+        input_files = json.loads(cleaned_input)
+    except json.JSONDecodeError as e:
+        log_error(f"Failed to decode --input-files JSON string argument: {e}")
+        sys.exit(1)
+
+    for file_path in input_files:
+        file_size = os.path.getsize(rundir / file_path)
+        # If file is zero-size, do nothing
+        if file_size == 0:
+            continue
+        opened_file = open(rundir / file_path, "rb")
+        # If file is "small" only read it at the beginning
+        if file_size < num_chunks * min_chunk_size_data:
+            steps[0]["io_read_benchmark"].add_read_operation(file_path, opened_file, file_size)
+            continue
+        # Otherwise, read it in chunks
+        for step in range(0, N-2):
+            num_bytes = file_size // num_chunks + (file_size % num_chunks > step)
+            steps[step]["io_read_benchmark"].add_read_operation(file_path, opened_file, num_bytes)
+
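+    # Illustrative example (sizes assumed): a 10,003-byte input with
+    # num_chunks = 4 is read in per-step chunks of 2501, 2501, 2501 and 2500
+    # bytes (10003 // 4 = 2500, with the remainder of 3 spread over the first 3 steps).
+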
+    # Augment I/O write benchmarks for each output file
     cleaned_output = "{}" if args.output_files is None else re.sub(r'\\+', '', args.output_files)
-    # print("CLEANED INPUT", cleaned_input)
-    # print("CLEANED OUTPUT", cleaned_output)
-
-    if cleaned_input or cleaned_output:
-        log_debug("Starting IO benchmark...")
-
-        # Attempt to parse the cleaned string
-        try:
-            outputs_dict = json.loads(cleaned_output)
-        except json.JSONDecodeError as e:
-            log_error(f"Failed to decode --output-files JSON string argument: {e}")
-            sys.exit(1)
-
-        try:
-            inputs_array = json.loads(cleaned_input)
-        except json.JSONDecodeError as e:
-            log_error(f"Failed to decode --input-files JSON string argument: {e}")
-            sys.exit(1)
-
-        # print("OUTPUT", outputs_dict)
-        # print("INPUTS", inputs_array)
-
-        # Create a multiprocessing event that in the first run is set to True
-        write_done_event = multiprocessing.Event()
-        # Set this to True to allow the first read to happen
-        write_done_event.set()
-        # Print the value of the event
-        # print("Event Value:", write_done_event.is_set())
-
-        io_proc = multiprocessing.Process(
-            target=io_alternate,
-            args=(inputs_array, outputs_dict, cpu_queue, mem_bytes, rundir, write_done_event)
-        )
-        io_proc.start()
-        procs.append(io_proc)
-
-    if args.gpu_work:
-        log_info(f"Starting GPU Benchmark for {args.name}...")
-        available_gpus = get_available_gpus() #checking for available GPUs
-
-        if not available_gpus:
-            log_error("No GPU available")
-            sys.exit(1)
-        else:
-            device = available_gpus[0]
-            log_debug(f"Running on GPU {device}")
-
-        if args.time:
-            log_debug(f" Time:{args.time}, Work:{args.gpu_work}, Device:{device}")
-            gpu_benchmark(time=int(args.time), work=int(args.gpu_work), device=device)
-        else:
-            gpu_benchmark(work=int(args.gpu_work), device=device)
-
+    try:
+        output_files = json.loads(cleaned_output)
+    except json.JSONDecodeError as e:
+        log_error(f"Failed to decode --output-files JSON string argument: {e}")
+        sys.exit(1)
+
+    for file_path, file_size in output_files.items():
+        # Open the file for writing no matter what (it should be created)
+        opened_file = open(rundir / file_path, "wb")
+        # If file is zero-size, do nothing
+        if file_size == 0:
+            continue
+        # If file is "small" only write it at the end
+        if file_size < num_chunks * min_chunk_size_data:
+            steps[N-1]["io_write_benchmark"].add_write_operation(file_path, opened_file, file_size)
+            continue
+        # Otherwise, write it in chunks
+        for step in range(2, N):
+            num_bytes = file_size // num_chunks + (file_size % num_chunks > (step - 2))
+            steps[step]["io_write_benchmark"].add_write_operation(file_path, opened_file, num_bytes)
+
+    # Augment CPU benchmark with computation (if need be)
     if args.cpu_work:
-        log_info(f"Starting CPU and Memory Benchmarks for {args.name}...")
-        if core:
-            log_debug(f"{args.name} acquired core {core}")
-
-        mem_threads=int(10 - 10 * args.percent_cpu)
-        cpu_procs, mem_procs = cpu_mem_benchmark(cpu_queue=cpu_queue,
-                                                 cpu_threads=int(10 * args.percent_cpu),
-                                                 mem_threads=mem_threads,
-                                                 cpu_work=sys.maxsize if args.time else int(args.cpu_work),
-                                                 core=core,
-                                                 total_mem=mem_bytes)
-
-        procs.extend(cpu_procs)
         if args.time:
-            time.sleep(int(args.time))
-            for proc in procs:
-                if isinstance(proc, multiprocessing.Process):
-                    if proc.is_alive():
-                        proc.terminate()
-                elif isinstance(proc, subprocess.Popen):
-                    proc.terminate()
+            for step in range(1, N-1):
+                steps[step]["cpu_benchmark"].set_infinite_work()
         else:
-            for proc in procs:
-                if isinstance(proc, subprocess.Popen):
-                    proc.wait()
-            if io_proc is not None and io_proc.is_alive():
-                # io_proc.terminate()
-                io_proc.join()
-
-            for mem_proc in mem_procs:
-                try:
-                    os.kill(mem_proc.pid, signal.SIGKILL)  # Force kill if SIGTERM fails
-                except subprocess.TimeoutExpired:
-                    log_debug("Memory process did not terminate; force-killing.")
-                    # As a fallback, use pkill if any remaining instances are stuck
-                    subprocess.Popen(["pkill", "-f", "stress-ng"]).wait()
-
-        log_debug("Completed CPU and Memory Benchmarks!")
-
-    # NOTE: If you would like to run only IO add time.sleep(2)
-    # Check if all procs are done, if not, kill them
-    log_debug("Checking if all processes are done...")
-    for proc in procs:
-        if isinstance(proc, multiprocessing.Process):
-            if proc.is_alive():
-                proc.terminate()
-                proc.join()
-        if isinstance(proc, subprocess.Popen):
-            proc.wait()
+            for step in range(1, N-1):
+                chunk_work = int(args.cpu_work) // num_chunks + (int(args.cpu_work) % num_chunks > (step - 1))
+                steps[step]["cpu_benchmark"].set_work(chunk_work)
 
+    # Augment GPU benchmark with computation (if need be)
+    if args.gpu_work:
+        if args.time:
+            for step in range(1, N - 1):
+                steps[step]["gpu_benchmark"].set_device()
+                steps[step]["gpu_benchmark"].set_work(int(args.gpu_work))
+                steps[step]["gpu_benchmark"].set_time(float(args.time))
+        else:
+            for step in range(1, N - 1):
+                chunk_work = int(args.gpu_work) // num_chunks + (int(args.gpu_work) % num_chunks > (step - 1))
+                steps[step]["gpu_benchmark"].set_device()
+                steps[step]["gpu_benchmark"].set_work(chunk_work)
+
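+    # Illustrative example (values assumed): "--cpu-work 1000" with
+    # num_chunks = 3 assigns chunk_work = 334, 333, 333 to compute steps 1..3
+    # (1000 // 3 = 333, with the remainder of 1 going to the first compute step).
+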
+    # All benchmarks have been specified, we can just go through the steps blindly
+    # log_info(f"Starting {args.name} Benchmark")
+
+    current_proc_handles = []
+    try:
+        for step_index, step in enumerate(steps):
+            log_debug(f"**** STEP {step_index} ****")
+            io_read_process = step["io_read_benchmark"].run()
+            io_write_process = step["io_write_benchmark"].run()
+            [cpu_benchmark_process, memory_benchmark_process] = step["cpu_benchmark"].run()
+            gpu_benchmark_process = step["gpu_benchmark"].run()
+            # Track every handle (including the writer) so cleanup can kill them all
+            current_proc_handles[:] = [io_read_process, io_write_process, cpu_benchmark_process,
+                                       memory_benchmark_process, gpu_benchmark_process]
+
+            # If time based, sleep the required amount of time and kill the process
+            if args.time:
+                time.sleep(float(args.time) / num_chunks)
+                if cpu_benchmark_process is not None:
+                    cpu_benchmark_process.terminate_along_with_children()
+                if gpu_benchmark_process is not None:
+                    gpu_benchmark_process.terminate()
+
+            # Wait for the I/O processes to be done
+            if io_read_process is not None:
+                io_read_process.wait()
+            if io_write_process is not None:
+                io_write_process.wait()
+
+            # Wait for the CPU process to be done
+            if cpu_benchmark_process is not None:
+                cpu_benchmark_process.wait()
+
+            # Kill the Memory process
+            if memory_benchmark_process is not None:
+                memory_benchmark_process.terminate_along_with_children()
+                memory_benchmark_process.wait()
+
+            # Wait for the GPU Process to be done
+            if gpu_benchmark_process is not None:
+                gpu_benchmark_process.wait()
+    except KeyboardInterrupt:
+        log_debug("Detected Keyboard interrupt: cleaning up processes...")
+        kill_current_handles(current_proc_handles)
+    finally:
+        log_debug("Cleaning up any remaining processes...")
+        kill_current_handles(current_proc_handles)
+
+    # Cleanups
     if core:
         unlock_core(path_locked, path_cores, core)
 
     if args.with_flowcept:
         end_flowcept(flowcept, flowcept_task)
 
-    log_info(f"Benchmark {args.name} completed!")
+    log_debug(f"{args.name} Benchmark Completed")
+
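+# Example invocation (illustrative only; file names and sizes are hypothetical,
+# and the task name / --percent-cpu arguments follow the existing CLI):
+#
+#     wfbench task_1 --percent-cpu 0.6 --cpu-work 100 --num-chunks 5 \
+#         --input-files '[\"in.dat\"]' --output-files '{\"out.dat\": 1048576}'
+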
 
 if __name__ == "__main__":
     main()
diff --git a/pyproject.toml b/pyproject.toml
index b3ceed01..caa985f0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -29,11 +29,11 @@ dependencies = [
     "networkx",
     "numpy",
     "pandas",
+    "psutil",
    "python-dateutil",
     "requests",
     "scipy>=1.16.1",
     "pyyaml",
-    "pandas",
     "shortuuid",
     "stringcase",
     "filelock",
diff --git a/tests/test_helpers.py b/tests/test_helpers.py
index 3f45290c..966b7379 100644
--- a/tests/test_helpers.py
+++ b/tests/test_helpers.py
@@ -90,7 +90,8 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=
         working_dir=working_dir,
         user="wfcommons",
         tty=True,
-        detach=True
+        detach=True,
+        init=True  # Use an init process so zombie processes get reaped
     )
 
     # Installing WfCommons on container
@@ -165,4 +166,4 @@ def _compare_workflows(workflow_1: Workflow, workflow_2: Workflow):
         # sys.stderr.write(f"WORKFLOW2 OUTPUT FILE: {output_file.file_id} {output_file.size}\n")
         workflow2_output_bytes += output_file.size
     assert (workflow1_input_bytes == workflow2_input_bytes)
-    assert (workflow1_output_bytes == workflow2_output_bytes)
\ No newline at end of file
+    assert (workflow1_output_bytes == workflow2_output_bytes)