Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions tests/enroot/batch_scripts/rccl_tests_sbatch.sh
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#!/bin/bash
#SBATCH --nodes=2
#SBATCH --ntasks=2
#SBATCH --ntasks-per-node=1
#SBATCH --gpus-per-task=8
#SBATCH --ntasks=16
#SBATCH --ntasks-per-node=8
#SBATCH --gpus-per-task=1
#SBATCH --gpu-bind=closest
#SBATCH --cpus-per-task=24
#SBATCH --distribution=block:block
#SBATCH --job-name=rccl_test
#SBATCH --output=logs/rccl_test_%j.out
#SBATCH --error=logs/rccl_test_%j.err
Expand All @@ -11,6 +14,9 @@ set -e

mkdir -p logs

# Configurable number of iterations with default value
NUM_ITERATIONS=${NUM_ITERATIONS:-10}

# Customizable container image with default value
DOCKER_IMAGE_VERSION=${DOCKER_IMAGE_VERSION:-"ubuntu24_rocm-7.0.2_rccl-7.0.2_anp-v1.2.0_ainic-1.117.5-a-56"}

Expand All @@ -19,7 +25,7 @@ CONTAINER_IMAGE="enroot_rccl-$DOCKER_IMAGE_VERSION.sqsh"
echo "Pulling container image for version: $DOCKER_IMAGE_VERSION and saving to $CONTAINER_IMAGE"

# Pull the image on every allocated node
srun --ntasks-per-node=1 bash -c "
srun --ntasks=2 --ntasks-per-node=1 bash -c "
if [ ! -f \"$PWD/$CONTAINER_IMAGE\" ]; then
echo \"Node \$(hostname): Pulling container image and saving to $CONTAINER_IMAGE\"
if ! enroot import -o \"$PWD/$CONTAINER_IMAGE\" \"docker://rocm/roce-workload:$DOCKER_IMAGE_VERSION\"; then
Expand Down Expand Up @@ -47,12 +53,30 @@ export NCCL_IB_TC=96
export NCCL_IB_FIFO_TC=184
export NCCL_IGNORE_CPU_AFFINITY=1
export NCCL_IB_USE_INLINE=1
export NCCL_NET_OPTIONAL_RECV_COMPLETION=1
export NCCL_NET_OPTIONAL_RECV_COMPLETION=0
export NCCL_TOPO_DUMP_FILE=/tmp/topo_all.txt
export RCCL_GDR_FLUSH_GPU_MEM_NO_RELAXED_ORDERING=0
export OMPI_MCA_btl=^openib
export UCX_UNIFIED_MODE=y
export RCCL_AINIC_ROCE=1
export RCCL_LL128_FORCE_ENABLE=1
export NCCL_IB_PCI_RELAXED_ORDERING=1
export NCCL_NET_PLUGIN=/root/amd-anp/build/librccl-anp.so
export NCCL_DEBUG=VERSION

# Run the test multiple times
echo "Running all_reduce_perf test $NUM_ITERATIONS times"
for i in $(seq 1 $NUM_ITERATIONS); do
echo "=========================================="
echo "Starting iteration $i of $NUM_ITERATIONS"
echo "=========================================="

# Run the command
srun --mpi=pmix \
srun --mpi=pmix \
--container-image="$PWD/$CONTAINER_IMAGE" \
/root/rccl-tests/build/all_reduce_perf -b 16 -e 8G -f 2 -g 8
/root/rccl-tests/build/all_reduce_perf -b 1K -e 16G -f 2 -g 1 -n 20 -w 5 -c 1

echo "Completed iteration $i of $NUM_ITERATIONS"
echo ""
done

echo "All $NUM_ITERATIONS iterations completed successfully"
114 changes: 102 additions & 12 deletions tests/enroot/testsuites/test_enroot.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,7 @@ def test_multi_node_distributed_pytorch():

log.info("\n VALIDATION PASSED (REMOTE COUNTERS)")


def test_multi_node_rccl():
def test_multi_node_rccl():
"""
Use sbatch to run rccl test on multiple nodes

Expand All @@ -506,18 +505,26 @@ def test_multi_node_rccl():
Validation:
1. Verify if sbatch test is completed
2. Verify if the output file - logs/rccl_test_%j.out is created and print that output
2. Verify and print the results
3. Verify and print the results
4. Validate number of bandwidth measurements matches expected iterations
5. Validate average bus bandwidth against threshold (within 5% tolerance)

Raises:
AssertionError: Above validation points are failed
"""

amd_host = pytest.testdata.amd_host[0]
copy_file_list =[]
copy_file_list = []

# Define performance threshold (GB/s)
BANDWIDTH_THRESHOLD = 130.0
TOLERANCE_PERCENT = 5.0

# Create batch script
local_script = batch_scripts_folder / "rccl_tests_sbatch.sh"
remote_script = str(local_script.name)
log.info(f"Creating {local_script.name} on {amd_host.host_ip}...")
exit_code = create_batch_script(amd_host,local_script)
exit_code = create_batch_script(amd_host, local_script)
if exit_code:
assert False, f"{local_script.name} on {amd_host.host_ip} couldnt be created!!"
log.info(f"Creating {local_script.name} on {amd_host.host_ip} - Successfull !!")
Expand All @@ -529,7 +536,7 @@ def test_multi_node_rccl():
log.info(f"sbatch job - {job_id} submitted !!")

# Wait for job completion
job_state, sacct_output = wait_for_job_completion(amd_host,job_id)
job_state, sacct_output = wait_for_job_completion(amd_host, job_id)
log.info(f"Job state of {job_id} : {job_state}")
log.info(f"sacct output : {sacct_output}")
err_file = f"logs/rccl_test_{job_id}.err"
Expand All @@ -544,30 +551,113 @@ def test_multi_node_rccl():
assert False, "RCCL test case failed.. !! "

# Check for output file and print the results
parent_dir="logs"
parent_dir = "logs"
log.info(f"Checking {parent_dir}/ ...")
exit_code, output = amd_host.execute_command(f"cat {output_file} ")
assert not exit_code, f" Error retrieving the file {output_file}!, {output['stderr']}"
log.info(f"Output : ")
log.info(output['stdout'].encode().decode('unicode_escape'))

# Performance validation: Extract iterations count and bandwidth values
output_content = output['stdout']

# Extract expected number of iterations
expected_iterations = extract_num_iterations(output_content)
log.info(f"Expected number of iterations: {expected_iterations}")

# Extract bandwidth values
bandwidth_values = extract_bandwidth_values(output_content)
actual_measurements = len(bandwidth_values)

log.info(f"Iteration Count Validation:")
log.info(f" Expected iterations: {expected_iterations}")
log.info(f" Actual bandwidth measurements found: {actual_measurements}")

# Validate iteration count matches bandwidth measurements
assert expected_iterations is not None, \
"Could not find 'Running all_reduce_perf test' iteration count in output"

assert actual_measurements == expected_iterations, \
f"Iteration count mismatch! Expected {expected_iterations} bandwidth measurements " \
f"but found {actual_measurements}. This indicates incomplete test execution."

log.info(f" Iteration count validation passed!")

# Validate bandwidth values exist
if not bandwidth_values:
assert False, "Performance validation failed: No '# Avg bus bandwidth' entries found in output"

# Calculate average bandwidth
avg_bandwidth = sum(bandwidth_values) / len(bandwidth_values)
min_acceptable_bandwidth = BANDWIDTH_THRESHOLD * (1 - TOLERANCE_PERCENT / 100)

log.info(f"Bandwidth Performance Analysis:")
log.info(f" Bandwidth measurements: {bandwidth_values}")
log.info(f" Average bandwidth: {avg_bandwidth:.3f} GB/s")
log.info(f" Threshold: {BANDWIDTH_THRESHOLD:.3f} GB/s")
log.info(f" Minimum acceptable (threshold - {TOLERANCE_PERCENT}%): {min_acceptable_bandwidth:.3f} GB/s")

assert avg_bandwidth >= min_acceptable_bandwidth, \
f"Performance check failed! Average bandwidth ({avg_bandwidth:.3f} GB/s) is more than {TOLERANCE_PERCENT}% " \
f"below threshold ({BANDWIDTH_THRESHOLD:.3f} GB/s). Minimum acceptable: {min_acceptable_bandwidth:.3f} GB/s"

log.info(f" Performance check passed! Bandwidth is within acceptable range.")

# Copy back results and delete the directory and files
log.info(f"Copying all the results to {str(pytest.testdata.results_dir)}...")

for file in copy_file_list:
local_file = pytest.testdata.results_dir / Path(file).name
exit_code = amd_host.copy_from_host(file,local_file)
exit_code = amd_host.copy_from_host(file, local_file)
assert not exit_code, f" Error copying the file {file} !"
exit_code, output = amd_host.execute_command(f"sudo rm -rf {file}")
assert not exit_code , f" Error deleting the file {file} !, {output['stderr']}"
assert not exit_code, f" Error deleting the file {file} !, {output['stderr']}"

# Remove the parent directory
exit_code, output = amd_host.execute_command(f"sudo rm -rf {parent_dir}")
assert not exit_code, f" Error deleting the folder {parent_dir} !, {output['stderr']}"
assert not exit_code, f" Error deleting the folder {parent_dir} !, {output['stderr']}"

# Delete the batch script on the remote host
exit_code, output = amd_host.execute_command(f"sudo rm -rf {remote_script}")
assert not exit_code , f" Error deleting the script {remote_script}!, {output['stderr']}"
assert not exit_code, f" Error deleting the script {remote_script}!, {output['stderr']}"

def extract_bandwidth_values(output_content):
"""
Extract all bandwidth values from RCCL test output.

Args:
output_content (str): The content of the RCCL test output file

Returns:
list: List of bandwidth values (floats) found in the output
"""
bandwidth_pattern = r'#\s*Avg bus bandwidth\s*:\s*([\d.]+)'
matches = re.findall(bandwidth_pattern, output_content)

# Convert string matches to float values
bandwidth_values = [float(match) for match in matches]

return bandwidth_values

def extract_num_iterations(output_content):
"""
Extract the number of iterations from RCCL test output.
Looks for pattern: "Running all_reduce_perf test $NUM_ITERATIONS times"

Args:
output_content (str): The content of the RCCL test output file

Returns:
int: Number of expected iterations, or None if not found
"""
# Pattern to match "Running all_reduce_perf test X times" where X is a number
iteration_pattern = r'Running all_reduce_perf test\s+(\d+)\s+times'
match = re. search(iteration_pattern, output_content)

if match:
return int(match. group(1))
else:
return None

def teardown_test():
"""
Expand Down
Loading