diff --git a/.github/workflows/fuzz.yml b/.github/workflows/fuzz.yml
index 1805974c03c..6900e6ed2be 100644
--- a/.github/workflows/fuzz.yml
+++ b/.github/workflows/fuzz.yml
@@ -1,15 +1,10 @@
 name: Fuzz
 
-concurrency:
-  # The group causes runs to queue instead of running in parallel.
-  group: fuzz
-  # This ensures each run builds on the previous run's corpus discoveries rather than losing them to
-  # failed compare-and-swap uploads.
-  cancel-in-progress: false
+# No concurrency group — runs are allowed to overlap. We have safe corpus merge
 
 on:
   schedule:
-    - cron: "0 * * * *" # every hour
+    - cron: "0 */6 * * *" # every 6 hours
   workflow_dispatch: { }
 
 jobs:
diff --git a/.github/workflows/minimize_fuzz_corpus.yml b/.github/workflows/minimize_fuzz_corpus.yml
index d4a963a6f62..f87ca048a17 100644
--- a/.github/workflows/minimize_fuzz_corpus.yml
+++ b/.github/workflows/minimize_fuzz_corpus.yml
@@ -1,8 +1,7 @@
 name: Minimize All Fuzz Corpora
 
-concurrency:
-  group: fuzz
-  cancel-in-progress: false
+# No concurrency group — runs are allowed to overlap. We have safe corpus merge
+
 on:
   schedule:
diff --git a/.github/workflows/minimize_fuzz_corpus_workflow.yml b/.github/workflows/minimize_fuzz_corpus_workflow.yml
index e3b2a9c11e6..66f401f1d73 100644
--- a/.github/workflows/minimize_fuzz_corpus_workflow.yml
+++ b/.github/workflows/minimize_fuzz_corpus_workflow.yml
@@ -83,6 +83,12 @@ jobs:
             exit 0
           fi
 
+      - name: Record original corpus files
+        run: |
+          CORPUS_DIR="fuzz/corpus/${{ inputs.fuzz_target }}"
+          ls "$CORPUS_DIR/" | sort > /tmp/original_files.txt
+          echo "Original corpus: $(wc -l < /tmp/original_files.txt) files"
+
       - name: Minimize corpus
         run: |
           FEATURES_FLAG=""
@@ -94,10 +100,11 @@
           mkdir -p "$MINIMIZED_DIR"
           cargo +$NIGHTLY_TOOLCHAIN fuzz cmin $FEATURES_FLAG \
             ${{ inputs.fuzz_target }} "$CORPUS_DIR" -- "$MINIMIZED_DIR"
+          # Replace original with minimized for upload
           rm -rf "$CORPUS_DIR"
           mv "$MINIMIZED_DIR" "$CORPUS_DIR"
 
-      - name: Persist corpus
+      - name: Merge and persist minimized corpus
        shell: bash
         env:
           AWS_ACCESS_KEY_ID: ${{ secrets.R2_FUZZ_ACCESS_KEY_ID }}
@@ -105,7 +112,9 @@
           AWS_REGION: "us-east-1"
           AWS_ENDPOINT_URL: "https://01e9655179bbec953276890b183039bc.r2.cloudflarestorage.com"
         run: |
-          CORPUS_KEY="${{ inputs.fuzz_target }}_corpus.tar.zst"
-          CORPUS_DIR="fuzz/corpus/${{ inputs.fuzz_target }}"
-          tar -acf "$CORPUS_KEY" "$CORPUS_DIR"
-          python3 scripts/s3-upload.py --bucket vortex-fuzz-corpus --key "$CORPUS_KEY" --body "$CORPUS_KEY" --checksum-algorithm CRC32
+          python3 scripts/s3-corpus-merge-upload.py \
+            --bucket vortex-fuzz-corpus \
+            --key "${{ inputs.fuzz_target }}_corpus.tar.zst" \
+            --corpus-dir "fuzz/corpus/${{ inputs.fuzz_target }}" \
+            --original-snapshot /tmp/original_files.txt \
+            --checksum-algorithm CRC32
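To make the `--original-snapshot` semantics concrete: the minimize job records the corpus file list before `cargo fuzz cmin`, so the merge script can tell files deleted by minimization apart from files added remotely by concurrent fuzz runs. A minimal sketch of the set algebra (illustration only; file names are invented):

```python
# Illustration only; not part of the patch. File names are hypothetical.
original = {"a", "b", "c", "d"}     # snapshot taken before `cargo fuzz cmin`
minimized = {"a", "c"}              # cmin kept a subset of the original files
remote = {"a", "b", "c", "d", "e"}  # remote corpus gained "e" from a concurrent run

# Minimize mode: keep the minimized files plus remote files that are genuinely
# new (absent from the snapshot), so concurrent discoveries are not dropped.
merged = minimized | (remote - original)
assert merged == {"a", "c", "e"}

# Fuzz mode (no snapshot) is a plain union of local and remote.
assert minimized | remote == {"a", "b", "c", "d", "e"}
```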
diff --git a/.github/workflows/run-fuzzer.yml b/.github/workflows/run-fuzzer.yml
index 55dc3268d3b..d8592074c68 100644
--- a/.github/workflows/run-fuzzer.yml
+++ b/.github/workflows/run-fuzzer.yml
@@ -16,7 +16,7 @@ on:
         description: "Maximum fuzzing time in seconds"
         required: false
         type: number
-        default: 2700
+        default: 18000
       runner:
         description: "Runner name from .github-private runs-on.yml (e.g., arm64-medium, gpu)"
         required: false
@@ -61,7 +61,7 @@ jobs:
     name: "Run ${{ inputs.fuzz_name || inputs.fuzz_target }}"
     env:
       FUZZ_NAME: ${{ inputs.fuzz_name || inputs.fuzz_target }}
-    timeout-minutes: 240 # 4 hours
+    timeout-minutes: 370 # 6 hours 10 minutes
     runs-on: >-
       ${{ github.repository == 'vortex-data/vortex'
           && format('runs-on={0}/runner={1}/disk=large/tag={2}-fuzz',
                     github.run_id, inputs.runner, inputs.fuzz_name || inputs.fuzz_target)
@@ -113,6 +113,45 @@ jobs:
             mkdir -p "$CORPUS_DIR"
           fi
 
+      - name: Capture GPU diagnostics (pre-run)
+        if: inputs.runner == 'gpu' || contains(inputs.extra_features, 'cuda')
+        shell: bash
+        run: |
+          {
+            echo "===== GPU diagnostics (pre-run) ====="
+            echo "phase=pre-run"
+            echo "timestamp=$(date -u --iso-8601=seconds)"
+            echo "hostname=$(hostname)"
+            echo "uname=$(uname -a)"
+            echo "CUDA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES:-}"
+            echo "NVIDIA_VISIBLE_DEVICES=${NVIDIA_VISIBLE_DEVICES:-}"
+            echo
+            echo "## CUDA/NVIDIA environment"
+            env | sort | grep -E '^(CUDA|NVIDIA|LD_LIBRARY_PATH|LIBRARY_PATH|PATH=|CARGO_TARGET_|RUSTFLAGS|RUSTC_WRAPPER|ASAN_OPTIONS|UBSAN_OPTIONS|LSAN_OPTIONS|TSAN_OPTIONS)=' || true
+            echo
+            echo "## nvcc location"
+            which nvcc || true
+            echo
+            echo "## nvcc --version"
+            nvcc --version || true
+            echo
+            echo "## nvidia-smi"
+            nvidia-smi || true
+            echo
+            echo "## nvidia-smi -L"
+            nvidia-smi -L || true
+            echo
+            echo "## GPU memory details"
+            nvidia-smi -q -d Memory || true
+            echo
+            echo "## GPU summary"
+            nvidia-smi --query-gpu=index,uuid,name,driver_version,memory.total,memory.used,memory.free,utilization.gpu,temperature.gpu --format=csv || true
+            echo
+            echo "## Active compute processes"
+            nvidia-smi --query-compute-apps=gpu_uuid,pid,process_name,used_memory --format=csv,noheader || true
+            echo
+          }
+
       - name: Run fuzzing target
         id: fuzz
         run: |
@@ -129,9 +168,36 @@
             $FEATURES_FLAG \
             ${{ inputs.fuzz_target }} -- \
             $FORK_FLAG -max_total_time=${{ inputs.max_time }} -rss_limit_mb=0 \
-            2>&1 | tee fuzz_output.log
+            2>&1 | tee -a fuzz_output.log
         continue-on-error: true
 
+      - name: Capture GPU diagnostics (post-run)
+        if: always() && (inputs.runner == 'gpu' || contains(inputs.extra_features, 'cuda'))
+        shell: bash
+        run: |
+          {
+            echo "===== GPU diagnostics (post-run) ====="
+            echo "phase=post-run"
+            echo "timestamp=$(date -u --iso-8601=seconds)"
+            echo "hostname=$(hostname)"
+            echo "uname=$(uname -a)"
+            echo "CUDA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES:-}"
+            echo "NVIDIA_VISIBLE_DEVICES=${NVIDIA_VISIBLE_DEVICES:-}"
+            echo
+            echo "## nvidia-smi"
+            nvidia-smi || true
+            echo
+            echo "## GPU memory details"
+            nvidia-smi -q -d Memory || true
+            echo
+            echo "## GPU summary"
+            nvidia-smi --query-gpu=index,uuid,name,driver_version,memory.total,memory.used,memory.free,utilization.gpu,temperature.gpu --format=csv || true
+            echo
+            echo "## Active compute processes"
+            nvidia-smi --query-compute-apps=gpu_uuid,pid,process_name,used_memory --format=csv,noheader || true
+            echo
+          }
+
       - name: Check for crashes
         id: check
         run: |
@@ -168,7 +234,7 @@
             $FEATURES_FLAG \
             ${{ inputs.fuzz_target }} \
             "${{ steps.check.outputs.first_crash }}" \
-            2>&1 | tee fuzz_output.log || true
+            2>&1 | tee -a fuzz_output.log || true
 
       - name: Archive crash artifacts
         id: upload_artifacts
@@ -187,7 +253,8 @@
           path: fuzz_output.log
           retention-days: 90
 
-      - name: Persist corpus
+      - name: Merge and persist corpus
+        if: always()
         shell: bash
         env:
           AWS_ACCESS_KEY_ID: ${{ secrets.R2_FUZZ_ACCESS_KEY_ID }}
@@ -195,12 +262,15 @@
           AWS_REGION: "us-east-1"
           AWS_ENDPOINT_URL: "https://01e9655179bbec953276890b183039bc.r2.cloudflarestorage.com"
         run: |
-          CORPUS_KEY="${FUZZ_NAME}_corpus.tar.zst"
-          CORPUS_DIR="fuzz/corpus/${FUZZ_NAME}"
-
-          tar -acf "$CORPUS_KEY" "$CORPUS_DIR"
-
-          python3 scripts/s3-upload.py --bucket vortex-fuzz-corpus --key "$CORPUS_KEY" --body "$CORPUS_KEY" --checksum-algorithm CRC32 --optimistic-lock
+          if [ "${GITHUB_REF}" != "refs/heads/develop" ]; then
+            echo "Skipping corpus upload for ${GITHUB_REF}; only develop updates the shared fuzz corpus."
+            exit 0
+          fi
+          python3 scripts/s3-corpus-merge-upload.py \
+            --bucket vortex-fuzz-corpus \
+            --key "${FUZZ_NAME}_corpus.tar.zst" \
+            --corpus-dir "fuzz/corpus/${FUZZ_NAME}" \
+            --checksum-algorithm CRC32
 
       - name: Fail job if fuzz run found a bug
         if: steps.check.outputs.crashes_found == 'true'
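The corpus step now runs even after failed fuzz runs (`if: always()`) but only uploads from develop. When an upload does happen, the merge script retries ETag conflicts on a fixed schedule; a small sketch of the delay computation it uses, assuming the default `--timeout 300`:

```python
# Sketch of the retry schedule in scripts/s3-corpus-merge-upload.py:
# exponential backoff for the first three failed attempts, then 10 s polling,
# always capped by the time remaining before the --timeout deadline.
def retry_delay(attempt: int, remaining: float) -> float:
    delay = min(2**attempt, 10) if attempt <= 3 else 10
    return min(delay, remaining)

# Failed attempts 1-3 wait 2 s, 4 s, 8 s; attempt 4 onward polls every 10 s.
assert [retry_delay(n, 300.0) for n in range(1, 6)] == [2, 4, 8, 10, 10]
# Near the deadline, the cap kicks in.
assert retry_delay(2, 1.5) == 1.5
```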
diff --git a/fuzz/fuzz_targets/compress_gpu.rs b/fuzz/fuzz_targets/compress_gpu.rs
index f88df52d2d5..9f917470781 100644
--- a/fuzz/fuzz_targets/compress_gpu.rs
+++ b/fuzz/fuzz_targets/compress_gpu.rs
@@ -4,13 +4,156 @@
 #![no_main]
 #![expect(clippy::unwrap_used)]
 
+use std::env;
+use std::fs;
+use std::process::Command;
+use std::sync::OnceLock;
+
 use libfuzzer_sys::Corpus;
 use libfuzzer_sys::fuzz_target;
 use vortex_error::vortex_panic;
 use vortex_fuzz::FuzzCompressGpu;
 use vortex_fuzz::run_compress_gpu;
 
+static STARTUP_DIAGNOSTICS: OnceLock<()> = OnceLock::new();
+
+fn log_command(label: &str, command: &str, args: &[&str]) {
+    eprintln!("## {label}");
+    match Command::new(command).args(args).output() {
+        Ok(output) => {
+            eprintln!("status: {}", output.status);
+
+            let stdout = String::from_utf8_lossy(&output.stdout);
+            if !stdout.trim().is_empty() {
+                eprintln!("stdout:\n{stdout}");
+            }
+
+            let stderr = String::from_utf8_lossy(&output.stderr);
+            if !stderr.trim().is_empty() {
+                eprintln!("stderr:\n{stderr}");
+            }
+        }
+        Err(err) => eprintln!("failed to run `{command}`: {err}"),
+    }
+}
+
+fn log_relevant_processes(pid: u32) {
+    eprintln!("## relevant processes");
+    match Command::new("ps")
+        .args(["-Ao", "pid,ppid,pgid,comm,args"])
+        .output()
+    {
+        Ok(output) => {
+            eprintln!("status: {}", output.status);
+            let pid = pid.to_string();
+            let stdout = String::from_utf8_lossy(&output.stdout);
+            for line in stdout.lines().filter(|line| {
+                line.contains(&pid)
+                    || line.contains("compress_gpu")
+                    || line.contains("cargo")
+                    || line.contains("libFuzzer")
+            }) {
+                eprintln!("{line}");
+            }
+
+            let stderr = String::from_utf8_lossy(&output.stderr);
+            if !stderr.trim().is_empty() {
+                eprintln!("stderr:\n{stderr}");
+            }
+        }
+        Err(err) => eprintln!("failed to run `ps`: {err}"),
+    }
+}
+
+fn log_process_snapshot() {
+    let pid = std::process::id();
+    eprintln!("pid={pid}");
+    eprintln!("argv={:?}", env::args().collect::<Vec<_>>());
+
+    if let Ok(status) = fs::read_to_string("/proc/self/status") {
+        let interesting = status
+            .lines()
+            .filter(|line| {
+                matches!(
+                    line.split_once(':').map(|(key, _)| key),
+                    Some("Name")
+                        | Some("State")
+                        | Some("Pid")
+                        | Some("PPid")
+                        | Some("Threads")
+                        | Some("VmPeak")
+                        | Some("VmSize")
+                        | Some("VmRSS")
+                        | Some("VmData")
+                        | Some("VmSwap")
+                )
+            })
+            .collect::<Vec<_>>();
+
+        if !interesting.is_empty() {
+            eprintln!("## /proc/self/status");
+            for line in interesting {
+                eprintln!("{line}");
+            }
+        }
+    }
+
+    let pid_arg = pid.to_string();
+    log_command(
+        "current process ps",
+        "ps",
+        &[
+            "-p",
+            pid_arg.as_str(),
+            "-o",
+            "pid,ppid,pgid,rss,vsz,etimes,comm,args",
+        ],
+    );
+    log_relevant_processes(pid);
+}
+
+fn log_cuda_diagnostics(phase: &str) {
+    eprintln!("===== compress_gpu CUDA diagnostics ({phase}) =====");
+    eprintln!("cuda_available()={}", vortex_cuda::cuda_available());
+    log_process_snapshot();
+    eprintln!(
+        "CUDA_VISIBLE_DEVICES={}",
+        env::var("CUDA_VISIBLE_DEVICES").unwrap_or_else(|_| "".to_string())
+    );
+    eprintln!(
+        "NVIDIA_VISIBLE_DEVICES={}",
+        env::var("NVIDIA_VISIBLE_DEVICES").unwrap_or_else(|_| "".to_string())
+    );
+    eprintln!(
+        "LD_LIBRARY_PATH={}",
+        env::var("LD_LIBRARY_PATH").unwrap_or_else(|_| "".to_string())
+    );
+
+    log_command("nvcc --version", "nvcc", &["--version"]);
+    log_command("nvidia-smi", "nvidia-smi", &[]);
+    log_command("nvidia-smi -L", "nvidia-smi", &["-L"]);
+    log_command("nvidia-smi memory", "nvidia-smi", &["-q", "-d", "Memory"]);
+    log_command(
+        "nvidia-smi gpu summary",
+        "nvidia-smi",
+        &[
+            "--query-gpu=index,uuid,name,driver_version,memory.total,memory.used,memory.free,utilization.gpu,temperature.gpu",
+            "--format=csv",
+        ],
+    );
+    log_command(
+        "nvidia-smi compute processes",
+        "nvidia-smi",
+        &[
+            "--query-compute-apps=gpu_uuid,pid,process_name,used_memory",
+            "--format=csv,noheader",
+        ],
+    );
+}
+
 fuzz_target!(|fuzz: FuzzCompressGpu| -> Corpus {
+    STARTUP_DIAGNOSTICS.get_or_init(|| log_cuda_diagnostics("startup"));
+
     // Use tokio runtime to run async GPU fuzzer
     let rt = tokio::runtime::Builder::new_current_thread()
         .enable_all()
@@ -21,6 +164,7 @@ fuzz_target!(|fuzz: FuzzCompressGpu| -> Corpus {
         Ok(true) => Corpus::Keep,
         Ok(false) => Corpus::Reject,
         Err(e) => {
+            log_cuda_diagnostics("error");
             vortex_panic!("{e}");
         }
     }
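Both the workflow steps and the in-process hooks above dump `nvidia-smi --query-gpu=... --format=csv` tables into the logs. A hypothetical helper (not part of the patch) showing how such a summary can be turned into records when triaging those logs, assuming nvidia-smi's stock header-plus-rows CSV layout:

```python
# Hypothetical log-triage helper; assumes nvidia-smi's --format=csv output
# with a header row followed by one row per GPU.
import csv
import io

def parse_gpu_summary(csv_text: str) -> list[dict[str, str]]:
    rows = list(csv.reader(io.StringIO(csv_text.strip())))
    header = [col.strip() for col in rows[0]]
    return [dict(zip(header, (v.strip() for v in row))) for row in rows[1:]]

# Sample output shape (values invented for illustration).
sample = """index, name, memory.total [MiB], memory.used [MiB]
0, NVIDIA A10G, 23028 MiB, 512 MiB"""
gpus = parse_gpu_summary(sample)
assert gpus[0]["name"] == "NVIDIA A10G"
```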
env::var("LD_LIBRARY_PATH").unwrap_or_else(|_| "".to_string()) + ); + + log_command("nvcc --version", "nvcc", &["--version"]); + log_command("nvidia-smi", "nvidia-smi", &[]); + log_command("nvidia-smi -L", "nvidia-smi", &["-L"]); + log_command("nvidia-smi memory", "nvidia-smi", &["-q", "-d", "Memory"]); + log_command( + "nvidia-smi gpu summary", + "nvidia-smi", + &[ + "--query-gpu=index,uuid,name,driver_version,memory.total,memory.used,memory.free,utilization.gpu,temperature.gpu", + "--format=csv", + ], + ); + log_command( + "nvidia-smi compute processes", + "nvidia-smi", + &[ + "--query-compute-apps=gpu_uuid,pid,process_name,used_memory", + "--format=csv,noheader", + ], + ); +} + fuzz_target!(|fuzz: FuzzCompressGpu| -> Corpus { + STARTUP_DIAGNOSTICS.get_or_init(|| log_cuda_diagnostics("startup")); + // Use tokio runtime to run async GPU fuzzer let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -21,6 +164,7 @@ fuzz_target!(|fuzz: FuzzCompressGpu| -> Corpus { Ok(true) => Corpus::Keep, Ok(false) => Corpus::Reject, Err(e) => { + log_cuda_diagnostics("error"); vortex_panic!("{e}"); } } diff --git a/scripts/s3-corpus-merge-upload.py b/scripts/s3-corpus-merge-upload.py new file mode 100644 index 00000000000..bdc810fa581 --- /dev/null +++ b/scripts/s3-corpus-merge-upload.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright the Vortex contributors + +"""Merge a local corpus with the remote corpus in S3 and upload using etag-based CAS. + +In fuzz mode (default), the result is the union of local and remote files. +In minimize mode (--original-snapshot), the result is the local (minimized) files +plus any remote files that were NOT in the original snapshot (i.e. new discoveries +from concurrent fuzz runs). 
+""" + +import argparse +import os +import shutil +import subprocess +import sys +import tempfile +import time + + +def head_etag(bucket: str, key: str) -> str | None: + """Fetch the current ETag for an object, or None if it doesn't exist.""" + result = subprocess.run( + [ + "aws", "s3api", "head-object", + "--bucket", bucket, + "--key", key, + "--query", "ETag", + "--output", "text", + ], + capture_output=True, + text=True, + ) + if result.returncode != 0: + return None + etag = result.stdout.strip() + if not etag or etag == "null": + return None + return etag + + +def get_object(bucket: str, key: str, output: str) -> bool: + """Download an object from S3.""" + result = subprocess.run( + ["aws", "s3api", "get-object", "--bucket", bucket, "--key", key, output], + capture_output=True, + ) + return result.returncode == 0 + + +def put_object( + bucket: str, + key: str, + body: str, + checksum_algorithm: str | None, + if_match: str | None, +) -> bool: + """Upload an object, returning True on success.""" + cmd = [ + "aws", "s3api", "put-object", + "--bucket", bucket, + "--key", key, + "--body", body, + ] + if checksum_algorithm: + cmd.extend(["--checksum-algorithm", checksum_algorithm]) + if if_match: + cmd.extend(["--if-match", if_match]) + result = subprocess.run(cmd, capture_output=True) + return result.returncode == 0 + + +def list_files(directory: str) -> set[str]: + """List regular file names (not paths) in a directory.""" + if not os.path.isdir(directory): + return set() + return {f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))} + + +def main(): + parser = argparse.ArgumentParser( + description="Merge local corpus with remote and upload with etag CAS", + ) + parser.add_argument("--bucket", required=True, help="S3 bucket name") + parser.add_argument("--key", required=True, help="S3 object key for the corpus tar") + parser.add_argument( + "--corpus-dir", required=True, + help="Local corpus directory (also used as the path inside the tar)", + ) + parser.add_argument( + "--original-snapshot", + help="File listing original corpus filenames, one per line (minimize mode)", + ) + parser.add_argument("--checksum-algorithm", help="Checksum algorithm (e.g. CRC32)") + parser.add_argument("--timeout", type=int, default=300, help="Maximum total retry time in seconds") + args = parser.parse_args() + + local_files = list_files(args.corpus_dir) + print(f"Local corpus: {len(local_files)} files") + + original_files: set[str] | None = None + if args.original_snapshot: + with open(args.original_snapshot) as f: + original_files = {line.strip() for line in f if line.strip()} + print(f"Original snapshot: {len(original_files)} files") + + deadline = time.monotonic() + args.timeout + attempt = 0 + while True: + attempt += 1 + ok = _try_merge_upload(args, local_files, original_files, attempt) + if ok: + return + + remaining = deadline - time.monotonic() + if remaining <= 0: + break + + # Exponential backoff for first 3 attempts (2s, 4s, 8s), then 10s polling + delay = min(2**attempt, 10) if attempt <= 3 else 10 + delay = min(delay, remaining) + print( + f"ETag conflict (attempt {attempt}), retrying in {delay:.0f}s... 
" + f"({remaining:.0f}s remaining)", + file=sys.stderr, + ) + time.sleep(delay) + + print(f"Corpus merge-upload failed after {attempt} attempts ({args.timeout}s timeout)", file=sys.stderr) + sys.exit(1) + + +def _try_merge_upload( + args: argparse.Namespace, + local_files: set[str], + original_files: set[str] | None, + attempt: int, +) -> bool: + """Single attempt: download remote, merge, upload with etag CAS. Returns True on success.""" + with tempfile.TemporaryDirectory() as merge_dir: + merge_corpus = os.path.join(merge_dir, args.corpus_dir) + os.makedirs(merge_corpus, exist_ok=True) + + # Start with local files + for f in local_files: + shutil.copy2(os.path.join(args.corpus_dir, f), os.path.join(merge_corpus, f)) + + # Download remote corpus and get its etag + etag = head_etag(args.bucket, args.key) + if etag: + remote_tar = tempfile.mktemp(suffix=".tar.zst") + try: + if get_object(args.bucket, args.key, remote_tar): + remote_extract = tempfile.mkdtemp() + subprocess.run(["tar", "-xf", remote_tar, "-C", remote_extract], check=False) + remote_corpus = os.path.join(remote_extract, args.corpus_dir) + remote_files = list_files(remote_corpus) + + if original_files is not None: + # Minimize mode: only add files that are genuinely new + new_files = remote_files - original_files + print(f"Preserving {len(new_files)} new entries from concurrent runs") + files_to_add = new_files + else: + # Fuzz mode: union merge + files_to_add = remote_files + + for f in files_to_add: + dest = os.path.join(merge_corpus, f) + if not os.path.exists(dest): + shutil.copy2(os.path.join(remote_corpus, f), dest) + + shutil.rmtree(remote_extract, ignore_errors=True) + finally: + if os.path.exists(remote_tar): + os.unlink(remote_tar) + + merged_count = len(list_files(merge_corpus)) + print(f"Merged corpus: {merged_count} files (attempt {attempt})") + + # Tar and upload with etag CAS + merged_tar = tempfile.mktemp(suffix=".tar.zst") + try: + subprocess.run( + ["tar", "-acf", merged_tar, "-C", merge_dir, args.corpus_dir], + check=True, + ) + if put_object(args.bucket, args.key, merged_tar, args.checksum_algorithm, etag): + print("Corpus merged and uploaded successfully.") + return True + finally: + if os.path.exists(merged_tar): + os.unlink(merged_tar) + + return False + + +if __name__ == "__main__": + main()