Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ members = [
"vortex-duckdb",
"vortex-cuda",
"vortex-cuda/cub",
"vortex-cuda/gpu-scan-bench",
"vortex-cuda/gpu-scan-cli",
"vortex-cuda/macros",
"vortex-cuda/nvcomp",
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/datafusion-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub fn format_to_df_format(format: Format) -> Arc<dyn FileFormat> {
Format::Csv => Arc::new(CsvFormat::default()) as _,
Format::Arrow => Arc::new(ArrowFormat),
Format::Parquet => Arc::new(ParquetFormat::new()),
Format::OnDiskVortex | Format::VortexCompact => {
Format::OnDiskVortex | Format::VortexCompact | Format::VortexCuda => {
Arc::new(VortexFormat::new(SESSION.clone()))
}
Format::OnDiskDuckDB | Format::Lance => {
Expand Down
14 changes: 13 additions & 1 deletion benchmarks/datafusion-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ async fn main() -> anyhow::Result<()> {
convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Compact)
.await?;
}
Format::VortexCuda => {
convert_parquet_directory_to_vortex(
&base_path,
CompactionStrategy::CudaCompatible,
)
.await?;
}
_ => {}
}
}
Expand Down Expand Up @@ -233,7 +240,12 @@ async fn register_benchmark_tables<B: Benchmark + ?Sized>(
) -> anyhow::Result<()> {
match format {
Format::Arrow => register_arrow_tables(session, benchmark).await,
_ if use_scan_api() && matches!(format, Format::OnDiskVortex | Format::VortexCompact) => {
_ if use_scan_api()
&& matches!(
format,
Format::OnDiskVortex | Format::VortexCompact | Format::VortexCuda
) =>
{
register_v2_tables(session, benchmark, format).await
}
_ => {
Expand Down
9 changes: 9 additions & 0 deletions vortex-bench/src/bin/data-gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ async fn main() -> anyhow::Result<()> {
convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Compact).await?;
}

if args
.formats
.iter()
.any(|f| matches!(f, Format::VortexCuda))
{
convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::CudaCompatible)
.await?;
}

if args
.formats
.iter()
Expand Down
1 change: 1 addition & 0 deletions vortex-bench/src/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ pub async fn convert_parquet_directory_to_vortex(
) -> anyhow::Result<()> {
let (format, dir_name) = match compaction {
CompactionStrategy::Compact => (Format::VortexCompact, Format::VortexCompact.name()),
CompactionStrategy::CudaCompatible => (Format::VortexCuda, Format::VortexCuda.name()),
CompactionStrategy::Default => (Format::OnDiskVortex, Format::OnDiskVortex.name()),
};

Expand Down
13 changes: 13 additions & 0 deletions vortex-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ pub enum Format {
#[clap(name = "vortex-compact")]
#[serde(rename = "vortex-compact")]
VortexCompact,
#[clap(name = "vortex-cuda")]
#[serde(rename = "vortex-cuda")]
VortexCuda,
#[clap(name = "duckdb")]
#[serde(rename = "duckdb")]
OnDiskDuckDB,
Expand Down Expand Up @@ -176,6 +179,7 @@ impl Format {
Format::Parquet => "parquet",
Format::OnDiskVortex => "vortex-file-compressed",
Format::VortexCompact => "vortex-compact",
Format::VortexCuda => "vortex-cuda",
Format::OnDiskDuckDB => "duckdb",
Format::Lance => "lance",
}
Expand All @@ -188,6 +192,7 @@ impl Format {
Format::Parquet => "parquet",
Format::OnDiskVortex => "vortex",
Format::VortexCompact => "vortex",
Format::VortexCuda => "vortex",
Format::OnDiskDuckDB => "duckdb",
Format::Lance => "lance",
}
Expand Down Expand Up @@ -222,18 +227,26 @@ impl Display for Engine {
/// How parquet-derived Vortex files are laid out when converted for benchmarking.
#[derive(Debug, Clone, Copy, Default)]
pub enum CompactionStrategy {
/// Write with compact encodings (see `apply_options`).
Compact,
/// Write with CUDA-compatible encodings and a larger coalescing block size,
/// so the resulting files can be scanned on the GPU (see `apply_options`).
CudaCompatible,
/// Leave the writer's options untouched.
#[default]
Default,
}

impl CompactionStrategy {
/// Applies this compaction strategy to the given write options.
///
/// `Default` returns the options unchanged; the other variants install a
/// write strategy configured for their respective layout.
pub fn apply_options(&self, options: VortexWriteOptions) -> VortexWriteOptions {
    // Coalescing target used for CUDA-compatible files: 128 MiB.
    const CUDA_COALESCING_TARGET_BYTES: u64 = 128 * 1024 * 1024;

    let strategy = match self {
        // No strategy override: hand the options back untouched.
        CompactionStrategy::Default => return options,
        CompactionStrategy::Compact => WriteStrategyBuilder::default()
            .with_compact_encodings()
            .build(),
        CompactionStrategy::CudaCompatible => WriteStrategyBuilder::default()
            .with_cuda_compatible_encodings()
            .with_coalescing_block_size(CUDA_COALESCING_TARGET_BYTES)
            .build(),
    };
    options.with_strategy(strategy)
}
Expand Down
43 changes: 43 additions & 0 deletions vortex-cuda/gpu-scan-bench/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
[package]
name = "gpu-scan-bench"
authors = { workspace = true }
description = "CUDA GPU scan benchmarks for S3/NVMe"
edition = { workspace = true }
homepage = { workspace = true }
include = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
publish = false
repository = { workspace = true }
rust-version = { workspace = true }
version = { workspace = true }

[lints]
workspace = true

[[bin]]
name = "gpu-scan-bench"
path = "src/main.rs"

[[bin]]
name = "resegment"
path = "src/resegment.rs"

[[bin]]
name = "io-bisect"
path = "src/io_bisect.rs"

[dependencies]
anyhow = { workspace = true }
cudarc = { workspace = true }
clap = { workspace = true, features = ["derive"] }
futures = { workspace = true, features = ["executor"] }
object_store = { workspace = true, features = ["aws", "fs"] }
tokio = { workspace = true, features = ["macros", "full"] }
tracing = { workspace = true, features = ["std", "attributes"] }
tracing-perfetto = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
url = { workspace = true }
vortex = { workspace = true, features = ["tokio", "zstd"] }
vortex-cuda = { workspace = true, features = ["_test-harness", "unstable_encodings"] }
vortex-cuda-macros = { workspace = true }
111 changes: 111 additions & 0 deletions vortex-cuda/gpu-scan-bench/bench_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "cudf-cu12",
# "s3fs",
# ]
#
# [tool.uv]
# extra-index-url = ["https://pypi.nvidia.com"]
# ///
#
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright the Vortex contributors
#
# Benchmark reading a Parquet file into GPU memory using cuDF.
# This serves as the baseline for comparing against Vortex GPU scans.
#
# Usage:
# uv run bench_parquet.py dataset.parquet --iterations 5
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a standalone uv script that performs the same scan as gpu-scan-bench above, but against Parquet files instead of Vortex, to serve as the cuDF baseline.


import argparse
import sys
import time


def main():
    """Benchmark cuDF GPU parquet reads and print throughput stats to stderr.

    Reads the parquet source either row-group-by-row-group (default, keeps GPU
    memory bounded to one batch at a time) or in a single full-file call
    (``--full-file-read``, can OOM), timing each iteration and reporting
    input/output sizes and throughput.
    """
    parser = argparse.ArgumentParser(
        description="Benchmark cuDF GPU parquet reads",
    )
    parser.add_argument("source", help="Path to parquet file")
    parser.add_argument("--iterations", type=int, default=1, help="Number of scan iterations")
    parser.add_argument(
        "--row-group-batch-size",
        type=int,
        default=1,
        help="Number of parquet row groups to read per cuDF call when streaming",
    )
    parser.add_argument(
        "--full-file-read",
        action="store_true",
        help="Read the full parquet file in one call (old behavior, can OOM)",
    )
    args = parser.parse_args()

    # Imported lazily so e.g. `--help` works without a CUDA-capable environment.
    import cudf
    import fsspec
    import pyarrow.parquet as pq

    source = args.source
    if args.row_group_batch_size < 1:
        raise ValueError("--row-group-batch-size must be >= 1")
    if args.iterations < 1:
        # Validate up front: zero iterations would otherwise surface later as
        # an opaque ZeroDivisionError when computing the average.
        raise ValueError("--iterations must be >= 1")

    fs, fs_path = fsspec.core.url_to_fs(source)
    file_size = fs.size(fs_path)
    file_size_mb = file_size / (1024 * 1024)

    num_row_groups = None
    if not args.full_file_read:
        # Only the parquet footer metadata is needed to plan the batches.
        with fs.open(fs_path, "rb") as parquet_file:
            num_row_groups = pq.ParquetFile(parquet_file).metadata.num_row_groups
        print(
            f"Streaming parquet by row groups: {num_row_groups} total, "
            f"batch size={args.row_group_batch_size}",
            file=sys.stderr,
        )

    iteration_secs = []
    output_bytes = 0
    for i in range(args.iterations):
        start = time.perf_counter()
        iter_bytes = 0
        if args.full_file_read:
            df = cudf.read_parquet(source)
            iter_bytes = df.memory_usage(deep=True).sum()
            del df  # release GPU memory before the next iteration
        else:
            for rg_start in range(0, num_row_groups, args.row_group_batch_size):
                row_groups = list(
                    range(rg_start, min(rg_start + args.row_group_batch_size, num_row_groups))
                )
                df = cudf.read_parquet(source, row_groups=row_groups)
                iter_bytes += df.memory_usage(deep=True).sum()
                del df  # cap GPU memory at one batch's worth
        elapsed = time.perf_counter() - start
        iteration_secs.append(elapsed)
        if i == 0:
            # Output size is the same every iteration; record it once.
            output_bytes = iter_bytes
        print(
            f"Iteration {i + 1}/{args.iterations}: {elapsed:.3f}s",
            file=sys.stderr,
        )

    avg_secs = sum(iteration_secs) / len(iteration_secs)
    output_size_mb = output_bytes / (1024 * 1024)
    input_throughput_mbs = file_size_mb / avg_secs
    output_throughput_mbs = output_size_mb / avg_secs

    print(file=sys.stderr)
    print("=== Benchmark Results ===", file=sys.stderr)
    print(f"Source: {source}", file=sys.stderr)
    print(f"Iterations: {args.iterations}", file=sys.stderr)
    print(f"Avg time: {avg_secs:.3f}s", file=sys.stderr)
    print(f"Input size: {file_size_mb:.2f} MB", file=sys.stderr)
    print(f"Output size: {output_size_mb:.2f} MB", file=sys.stderr)
    print(f"Input throughput: {input_throughput_mbs:.2f} MB/s", file=sys.stderr)
    print(f"Output throughput: {output_throughput_mbs:.2f} MB/s", file=sys.stderr)


# Run the benchmark only when executed as a script (not on import).
if __name__ == "__main__":
main()
Loading