diff --git a/tests/enroot/batch_scripts/rccl_tests_sbatch.sh b/tests/enroot/batch_scripts/rccl_tests_sbatch.sh index c52dca6..e046521 100644 --- a/tests/enroot/batch_scripts/rccl_tests_sbatch.sh +++ b/tests/enroot/batch_scripts/rccl_tests_sbatch.sh @@ -1,8 +1,11 @@ #!/bin/bash #SBATCH --nodes=2 -#SBATCH --ntasks=2 -#SBATCH --ntasks-per-node=1 -#SBATCH --gpus-per-task=8 +#SBATCH --ntasks=16 +#SBATCH --ntasks-per-node=8 +#SBATCH --gpus-per-task=1 +#SBATCH --gpu-bind=closest +#SBATCH --cpus-per-task=24 +#SBATCH --distribution=block:block #SBATCH --job-name=rccl_test #SBATCH --output=logs/rccl_test_%j.out #SBATCH --error=logs/rccl_test_%j.err @@ -11,6 +14,9 @@ set -e mkdir -p logs +# Configurable number of iterations with default value +NUM_ITERATIONS=${NUM_ITERATIONS:-10} + # Customizable container image with default value DOCKER_IMAGE_VERSION=${DOCKER_IMAGE_VERSION:-"ubuntu24_rocm-7.0.2_rccl-7.0.2_anp-v1.2.0_ainic-1.117.5-a-56"} @@ -19,7 +25,7 @@ CONTAINER_IMAGE="enroot_rccl-$DOCKER_IMAGE_VERSION.sqsh" echo "Pulling container image for version: $DOCKER_IMAGE_VERSION and saving to $CONTAINER_IMAGE" # Pull the image on every allocated node -srun --ntasks-per-node=1 bash -c " +srun --ntasks=2 --ntasks-per-node=1 bash -c " if [ ! -f \"$PWD/$CONTAINER_IMAGE\" ]; then echo \"Node \$(hostname): Pulling container image and saving to $CONTAINER_IMAGE\" if ! 
enroot import -o \"$PWD/$CONTAINER_IMAGE\" \"docker://rocm/roce-workload:$DOCKER_IMAGE_VERSION\"; then @@ -47,12 +53,30 @@ export NCCL_IB_TC=96 export NCCL_IB_FIFO_TC=184 export NCCL_IGNORE_CPU_AFFINITY=1 export NCCL_IB_USE_INLINE=1 -export NCCL_NET_OPTIONAL_RECV_COMPLETION=1 +export NCCL_NET_OPTIONAL_RECV_COMPLETION=0 export NCCL_TOPO_DUMP_FILE=/tmp/topo_all.txt export RCCL_GDR_FLUSH_GPU_MEM_NO_RELAXED_ORDERING=0 export OMPI_MCA_btl=^openib +export UCX_UNIFIED_MODE=y +export RCCL_AINIC_ROCE=1 +export RCCL_LL128_FORCE_ENABLE=1 +export NCCL_IB_PCI_RELAXED_ORDERING=1 +export NCCL_NET_PLUGIN=/root/amd-anp/build/librccl-anp.so +export NCCL_DEBUG=VERSION + +# Run the test multiple times +echo "Running all_reduce_perf test $NUM_ITERATIONS times" +for i in $(seq 1 $NUM_ITERATIONS); do + echo "==========================================" + echo "Starting iteration $i of $NUM_ITERATIONS" + echo "==========================================" -# Run the command -srun --mpi=pmix \ + srun --mpi=pmix \ --container-image="$PWD/$CONTAINER_IMAGE" \ - /root/rccl-tests/build/all_reduce_perf -b 16 -e 8G -f 2 -g 8 + /root/rccl-tests/build/all_reduce_perf -b 1K -e 16G -f 2 -g 1 -n 20 -w 5 -c 1 + + echo "Completed iteration $i of $NUM_ITERATIONS" + echo "" +done + +echo "All $NUM_ITERATIONS iterations completed successfully" diff --git a/tests/enroot/testsuites/test_enroot.py b/tests/enroot/testsuites/test_enroot.py index 9c816e4..3eb338c 100644 --- a/tests/enroot/testsuites/test_enroot.py +++ b/tests/enroot/testsuites/test_enroot.py @@ -492,8 +492,7 @@ def test_multi_node_distributed_pytorch(): log.info("\n VALIDATION PASSED (REMOTE COUNTERS)") - -def test_multi_node_rccl(): +def test_multi_node_rccl(): """ Use sbatch to run rccl test on multiple nodes @@ -506,18 +505,26 @@ def test_multi_node_rccl(): Validation: 1. Verify if sbatch test is completed 2. Verify if the output file - logs/rccl_test_%j.out is created and print that output - 2. 
Verify and print the results + 3. Verify and print the results + 4. Validate number of bandwidth measurements matches expected iterations + 5. Validate average bus bandwidth against threshold (within 5% tolerance) + Raises: AssertionError: Above validation points are failed """ amd_host = pytest.testdata.amd_host[0] - copy_file_list =[] + copy_file_list = [] + + # Define performance threshold (GB/s) + BANDWIDTH_THRESHOLD = 130.0 + TOLERANCE_PERCENT = 5.0 + # Create batch script local_script = batch_scripts_folder / "rccl_tests_sbatch.sh" remote_script = str(local_script.name) log.info(f"Creating {local_script.name} on {amd_host.host_ip}...") - exit_code = create_batch_script(amd_host,local_script) + exit_code = create_batch_script(amd_host, local_script) if exit_code: assert False, f"{local_script.name} on {amd_host.host_ip} couldnt be created!!" log.info(f"Creating {local_script.name} on {amd_host.host_ip} - Successfull !!") @@ -529,7 +536,7 @@ def test_multi_node_rccl(): log.info(f"sbatch job - {job_id} submitted !!") # Wait for job completion - job_state, sacct_output = wait_for_job_completion(amd_host,job_id) + job_state, sacct_output = wait_for_job_completion(amd_host, job_id) log.info(f"Job state of {job_id} : {job_state}") log.info(f"sacct output : {sacct_output}") err_file = f"logs/rccl_test_{job_id}.err" @@ -544,30 +551,113 @@ def test_multi_node_rccl(): assert False, "RCCL test case failed.. !! 
" # Check for output file and print the results - parent_dir="logs" + parent_dir = "logs" log.info(f"Checking {parent_dir}/ ...") exit_code, output = amd_host.execute_command(f"cat {output_file} ") assert not exit_code, f" Error retrieving the file {output_file}!, {output['stderr']}" log.info(f"Output : ") log.info(output['stdout'].encode().decode('unicode_escape')) + + # Performance validation: Extract iterations count and bandwidth values + output_content = output['stdout'] + + # Extract expected number of iterations + expected_iterations = extract_num_iterations(output_content) + log.info(f"Expected number of iterations: {expected_iterations}") + + # Extract bandwidth values + bandwidth_values = extract_bandwidth_values(output_content) + actual_measurements = len(bandwidth_values) + + log.info(f"Iteration Count Validation:") + log.info(f" Expected iterations: {expected_iterations}") + log.info(f" Actual bandwidth measurements found: {actual_measurements}") + + # Validate iteration count matches bandwidth measurements + assert expected_iterations is not None, \ + "Could not find 'Running all_reduce_perf test' iteration count in output" + + assert actual_measurements == expected_iterations, \ + f"Iteration count mismatch! Expected {expected_iterations} bandwidth measurements " \ + f"but found {actual_measurements}. This indicates incomplete test execution." 
+ + log.info(f" Iteration count validation passed!") + + # Validate bandwidth values exist + if not bandwidth_values: + assert False, "Performance validation failed: No '# Avg bus bandwidth' entries found in output" + + # Calculate average bandwidth + avg_bandwidth = sum(bandwidth_values) / len(bandwidth_values) + min_acceptable_bandwidth = BANDWIDTH_THRESHOLD * (1 - TOLERANCE_PERCENT / 100) + + log.info(f"Bandwidth Performance Analysis:") + log.info(f" Bandwidth measurements: {bandwidth_values}") + log.info(f" Average bandwidth: {avg_bandwidth:.3f} GB/s") + log.info(f" Threshold: {BANDWIDTH_THRESHOLD:.3f} GB/s") + log.info(f" Minimum acceptable (threshold - {TOLERANCE_PERCENT}%): {min_acceptable_bandwidth:.3f} GB/s") + + assert avg_bandwidth >= min_acceptable_bandwidth, \ + f"Performance check failed! Average bandwidth ({avg_bandwidth:.3f} GB/s) is more than {TOLERANCE_PERCENT}% " \ + f"below threshold ({BANDWIDTH_THRESHOLD:.3f} GB/s). Minimum acceptable: {min_acceptable_bandwidth:.3f} GB/s" + + log.info(f" Performance check passed! Bandwidth is within acceptable range.") # Copy back results and delete the directory and files log.info(f"Copying all the results to {str(pytest.testdata.results_dir)}...") - + for file in copy_file_list: local_file = pytest.testdata.results_dir / Path(file).name - exit_code = amd_host.copy_from_host(file,local_file) + exit_code = amd_host.copy_from_host(file, local_file) assert not exit_code, f" Error copying the file {file} !" 
exit_code, output = amd_host.execute_command(f"sudo rm -rf {file}") - assert not exit_code , f" Error deleting the file {file} !, {output['stderr']}" + assert not exit_code, f" Error deleting the file {file} !, {output['stderr']}" # Remove the parent directory exit_code, output = amd_host.execute_command(f"sudo rm -rf {parent_dir}") - assert not exit_code, f" Error deleting the folder {parent_dir} !, {output['stderr']}" + assert not exit_code, f" Error deleting the folder {parent_dir} !, {output['stderr']}" # Delete the batch script on the remote host exit_code, output = amd_host.execute_command(f"sudo rm -rf {remote_script}") - assert not exit_code , f" Error deleting the script {remote_script}!, {output['stderr']}" + assert not exit_code, f" Error deleting the script {remote_script}!, {output['stderr']}" + +def extract_bandwidth_values(output_content): + """ + Extract all bandwidth values from RCCL test output. + + Args: + output_content (str): The content of the RCCL test output file + + Returns: + list: List of bandwidth values (floats) found in the output + """ + bandwidth_pattern = r'#\s*Avg bus bandwidth\s*:\s*([\d.]+)' + matches = re.findall(bandwidth_pattern, output_content) + + # Convert string matches to float values + bandwidth_values = [float(match) for match in matches] + + return bandwidth_values + +def extract_num_iterations(output_content): + """ + Extract the number of iterations from RCCL test output. + Looks for pattern: "Running all_reduce_perf test $NUM_ITERATIONS times" + + Args: + output_content (str): The content of the RCCL test output file + + Returns: + int: Number of expected iterations, or None if not found + """ + # Pattern to match "Running all_reduce_perf test X times" where X is a number + iteration_pattern = r'Running all_reduce_perf test\s+(\d+)\s+times' + match = re.search(iteration_pattern, output_content) + + if match: + return int(match.group(1)) + else: + return None def teardown_test(): """