-
Notifications
You must be signed in to change notification settings - Fork 150
parquet bench + cuda changes to scan clickbench #6739
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
f08430d
15b87c4
0a7058d
6744166
96d37fb
36bbe76
9676670
8bbe843
e474389
c237031
eb12dd9
c6e294f
0780c87
4be081e
cc00661
3983bf0
ad38513
64f4e03
2dfeeaf
00b6510
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| [package] | ||
| name = "gpu-scan-bench" | ||
| authors = { workspace = true } | ||
| description = "CUDA GPU scan benchmarks for S3/NVMe" | ||
| edition = { workspace = true } | ||
| homepage = { workspace = true } | ||
| include = { workspace = true } | ||
| keywords = { workspace = true } | ||
| license = { workspace = true } | ||
| publish = false | ||
| repository = { workspace = true } | ||
| rust-version = { workspace = true } | ||
| version = { workspace = true } | ||
|
|
||
| [lints] | ||
| workspace = true | ||
|
|
||
| [dependencies] | ||
| clap = { workspace = true, features = ["derive"] } | ||
| futures = { workspace = true, features = ["executor"] } | ||
| object_store = { workspace = true, features = ["aws", "fs"] } | ||
| tokio = { workspace = true, features = ["macros", "full"] } | ||
| tracing = { workspace = true, features = ["std", "attributes"] } | ||
| tracing-perfetto = { workspace = true } | ||
| tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } | ||
| url = { workspace = true } | ||
| vortex = { workspace = true, features = ["tokio", "zstd"] } | ||
| vortex-cuda = { workspace = true, features = ["_test-harness", "unstable_encodings"] } | ||
| vortex-cuda-macros = { workspace = true } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| #!/usr/bin/env -S uv run --script | ||
| # /// script | ||
| # requires-python = ">=3.12" | ||
| # dependencies = [ | ||
| # "cudf-cu12", | ||
| # ] | ||
| # | ||
| # [tool.uv] | ||
| # extra-index-url = ["https://pypi.nvidia.com"] | ||
| # /// | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| # SPDX-FileCopyrightText: Copyright the Vortex contributors | ||
| # | ||
| # Benchmark reading a Parquet file into GPU memory using cuDF. | ||
| # This serves as the baseline for comparing against Vortex GPU scans. | ||
| # | ||
| # Usage: | ||
| # uv run bench_parquet.py dataset.parquet --iterations 5 | ||
|
|
||
| import argparse | ||
| import json | ||
| import os | ||
| import sys | ||
| import time | ||
|
|
||
|
|
||
| def main(): | ||
| parser = argparse.ArgumentParser( | ||
| description="Benchmark cuDF GPU parquet reads", | ||
| ) | ||
| parser.add_argument("source", help="Path to parquet file") | ||
| parser.add_argument("--iterations", type=int, default=1, help="Number of scan iterations") | ||
| args = parser.parse_args() | ||
|
|
||
| import cudf | ||
| import fsspec | ||
|
|
||
| source = args.source | ||
| fs, fs_path = fsspec.core.url_to_fs(source) | ||
| file_size = fs.size(fs_path) | ||
| file_size_mb = file_size / (1024 * 1024) | ||
|
|
||
| iteration_secs = [] | ||
| for i in range(args.iterations): | ||
| start = time.perf_counter() | ||
| df = cudf.read_parquet(source) | ||
| elapsed = time.perf_counter() - start | ||
| iteration_secs.append(elapsed) | ||
| print( | ||
| f"Iteration {i + 1}/{args.iterations}: {elapsed:.3f}s", | ||
| file=sys.stderr, | ||
| ) | ||
| del df | ||
|
|
||
| avg_secs = sum(iteration_secs) / len(iteration_secs) | ||
| throughput_mbs = file_size_mb / avg_secs | ||
|
|
||
| print(file=sys.stderr) | ||
| print("=== Benchmark Results ===", file=sys.stderr) | ||
| print(f"Source: {source}", file=sys.stderr) | ||
| print(f"Iterations: {args.iterations}", file=sys.stderr) | ||
| print(f"Avg time: {avg_secs:.3f}s", file=sys.stderr) | ||
| print(f"File size: {file_size_mb:.2f} MB", file=sys.stderr) | ||
| print(f"Throughput: {throughput_mbs:.2f} MB/s", file=sys.stderr) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,216 @@ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // SPDX-FileCopyrightText: Copyright the Vortex contributors | ||
|
|
||
| #![allow(unused_imports)] | ||
|
|
||
| use std::fs::File; | ||
| use std::path::PathBuf; | ||
| use std::sync::Arc; | ||
| use std::time::Instant; | ||
|
|
||
| use clap::Parser; | ||
| use futures::TryStreamExt; | ||
| use futures::stream::StreamExt; | ||
| use object_store::aws::AmazonS3Builder; | ||
| use object_store::path::Path as ObjectPath; | ||
| use tracing::Instrument; | ||
| use tracing_perfetto::PerfettoLayer; | ||
| use tracing_subscriber::EnvFilter; | ||
| use tracing_subscriber::Layer; | ||
| use tracing_subscriber::fmt::format::FmtSpan; | ||
| use tracing_subscriber::layer::SubscriberExt; | ||
| use tracing_subscriber::util::SubscriberInitExt; | ||
| use url::Url; | ||
| use vortex::VortexSessionDefault; | ||
| use vortex::error::VortexResult; | ||
| use vortex::file::OpenOptionsSessionExt; | ||
| use vortex::io::session::RuntimeSessionExt; | ||
| use vortex::session::VortexSession; | ||
| use vortex_cuda::CudaSession; | ||
| use vortex_cuda::CudaSessionExt; | ||
| use vortex_cuda::PinnedByteBufferPool; | ||
| use vortex_cuda::PooledFileReadAt; | ||
| use vortex_cuda::PooledObjectStoreReadAt; | ||
| use vortex_cuda::VortexCudaStreamPool; | ||
| use vortex_cuda::executor::CudaArrayExt; | ||
| use vortex_cuda::layout::register_cuda_layout; | ||
| use vortex_cuda_macros::cuda_available; | ||
| use vortex_cuda_macros::cuda_not_available; | ||
|
|
||
| #[derive(Parser)] | ||
| #[command( | ||
| name = "gpu-scan-bench", | ||
| about = "Benchmark GPU scans of CUDA-compatible Vortex files from S3 or local storage" | ||
| )] | ||
| struct Cli { | ||
| /// S3 URI (s3://bucket/path) or local path to a CUDA-compatible .vortex file. | ||
| source: String, | ||
|
|
||
| /// Number of scan iterations. | ||
| #[arg(long, default_value_t = 1)] | ||
| iterations: usize, | ||
|
|
||
| /// Path to write Perfetto trace output. If omitted, no trace file is written. | ||
| #[arg(long)] | ||
| perfetto: Option<PathBuf>, | ||
|
|
||
| /// Number of batches to process concurrently (each on its own CUDA stream). | ||
| #[arg(long, default_value_t = 1)] | ||
| concurrency: usize, | ||
|
|
||
| /// Output logs as JSON. | ||
| #[arg(long)] | ||
| json: bool, | ||
| } | ||
|
|
||
| #[cuda_not_available] | ||
| fn main() {} | ||
|
|
||
| #[cuda_available] | ||
| #[tokio::main] | ||
| async fn main() -> VortexResult<()> { | ||
| let cli = Cli::parse(); | ||
|
|
||
| // Setup tracing | ||
| let perfetto_guard = if let Some(ref perfetto_path) = cli.perfetto { | ||
| let perfetto_file = File::create(perfetto_path)?; | ||
| Some(PerfettoLayer::new(perfetto_file).with_debug_annotations(true)) | ||
| } else { | ||
| None | ||
| }; | ||
|
|
||
| if cli.json { | ||
| let log_layer = tracing_subscriber::fmt::layer() | ||
| .json() | ||
| .with_span_events(FmtSpan::NONE) | ||
| .with_ansi(false); | ||
|
|
||
| let registry = tracing_subscriber::registry() | ||
| .with(log_layer.with_filter(EnvFilter::from_default_env())); | ||
|
|
||
| if let Some(perfetto) = perfetto_guard { | ||
| registry.with(perfetto).init(); | ||
| } else { | ||
| registry.init(); | ||
| } | ||
| } else { | ||
| let log_layer = tracing_subscriber::fmt::layer() | ||
| .pretty() | ||
| .with_span_events(FmtSpan::NONE) | ||
| .with_ansi(false) | ||
| .event_format(tracing_subscriber::fmt::format().with_target(true)); | ||
|
|
||
| let registry = tracing_subscriber::registry() | ||
| .with(log_layer.with_filter(EnvFilter::from_default_env())); | ||
|
|
||
| if let Some(perfetto) = perfetto_guard { | ||
| registry.with(perfetto).init(); | ||
| } else { | ||
| registry.init(); | ||
| } | ||
| } | ||
|
|
||
| let session = VortexSession::default().with_tokio(); | ||
| register_cuda_layout(&session); | ||
|
|
||
| let cuda_context = session.cuda_session().context().clone(); | ||
|
|
||
| let pool = Arc::new(PinnedByteBufferPool::new(Arc::clone(&cuda_context))); | ||
| let cuda_stream = VortexCudaStreamPool::new(Arc::clone(&cuda_context), 1).get_stream()?; | ||
| let handle = session.handle(); | ||
|
|
||
| // Parse source and create reader | ||
| let reader: Arc<dyn vortex::io::VortexReadAt> = if cli.source.starts_with("s3://") { | ||
| let url = Url::parse(&cli.source) | ||
| .map_err(|e| vortex::error::vortex_err!("invalid S3 URL: {e}"))?; | ||
| let bucket = url | ||
| .host_str() | ||
| .ok_or_else(|| vortex::error::vortex_err!("S3 URL missing bucket name"))?; | ||
| let path = ObjectPath::from(url.path()); | ||
| let store: Arc<dyn object_store::ObjectStore> = Arc::new( | ||
| AmazonS3Builder::from_env() | ||
| .with_bucket_name(bucket) | ||
| .build()?, | ||
| ); | ||
| Arc::new(PooledObjectStoreReadAt::new( | ||
| store, | ||
| path, | ||
| handle, | ||
| Arc::clone(&pool), | ||
| cuda_stream, | ||
| )) | ||
| } else { | ||
| let path = PathBuf::from(&cli.source); | ||
| Arc::new(PooledFileReadAt::open( | ||
| &path, | ||
| handle, | ||
| Arc::clone(&pool), | ||
| cuda_stream, | ||
| )?) | ||
| }; | ||
|
|
||
| // Run benchmark iterations | ||
| let mut iteration_times = Vec::with_capacity(cli.iterations); | ||
| let concurrency = cli.concurrency; | ||
|
|
||
| for iteration in 0..cli.iterations { | ||
| let start = Instant::now(); | ||
|
|
||
| let gpu_file = session.open_options().open(Arc::clone(&reader)).await?; | ||
|
|
||
| let batches = gpu_file.scan()?.into_array_stream()?; | ||
|
|
||
| batches | ||
| .enumerate() | ||
| .map(|(chunk, batch)| { | ||
| let session = &session; | ||
| async move { | ||
| let batch = batch?; | ||
| let len = batch.len(); | ||
| let span = tracing::info_span!( | ||
| "batch execution", | ||
| iteration = iteration, | ||
| chunk = chunk, | ||
| len = len, | ||
| ); | ||
|
|
||
| async { | ||
| let mut cuda_ctx = CudaSession::create_execution_ctx(session)?; | ||
| batch.execute_cuda(&mut cuda_ctx).await?; | ||
| VortexResult::Ok(()) | ||
| } | ||
| .instrument(span) | ||
| .await | ||
| } | ||
| }) | ||
| .buffered(concurrency) | ||
| .try_collect::<Vec<_>>() | ||
| .await?; | ||
|
|
||
| let elapsed = start.elapsed(); | ||
| iteration_times.push(elapsed); | ||
| tracing::info!( | ||
| "Iteration {}/{}: {:.3}s", | ||
| iteration + 1, | ||
| cli.iterations, | ||
| elapsed.as_secs_f64() | ||
| ); | ||
| } | ||
|
|
||
| // Compute summary stats | ||
| let total: std::time::Duration = iteration_times.iter().sum(); | ||
| let avg = total / iteration_times.len() as u32; | ||
| let file_size = reader.size().await?; | ||
| let file_size_mb = file_size as f64 / (1024.0 * 1024.0); | ||
| let throughput_mbs = file_size_mb / avg.as_secs_f64(); | ||
| // Always print human-readable to stderr | ||
| eprintln!(); | ||
| eprintln!("=== Benchmark Results ==="); | ||
| eprintln!("Source: {}", cli.source); | ||
| eprintln!("Iterations: {}", cli.iterations); | ||
| eprintln!("Avg time: {:.3}s", avg.as_secs_f64()); | ||
| eprintln!("File size: {file_size_mb:.2} MB"); | ||
| eprintln!("Throughput: {throughput_mbs:.2} MB/s"); | ||
|
|
||
| Ok(()) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,9 +10,11 @@ use cudarc::driver::PushKernelArg; | |
| use tracing::instrument; | ||
| use vortex::array::ArrayRef; | ||
| use vortex::array::Canonical; | ||
| use vortex::array::IntoArray; | ||
| use vortex::array::arrays::ConstantArray; | ||
| use vortex::array::arrays::ConstantVTable; | ||
| use vortex::array::arrays::DecimalArray; | ||
| use vortex::array::arrays::ExtensionArray; | ||
| use vortex::array::arrays::PrimitiveArray; | ||
| use vortex::array::buffer::BufferHandle; | ||
| use vortex::array::match_each_decimal_value_type; | ||
|
|
@@ -76,6 +78,16 @@ impl CudaExecute for ConstantNumericExecutor { | |
| materialize_constant_decimal::<D>(array, decimal_dtype, validity, ctx).await | ||
| }) | ||
| } | ||
| DType::Extension(ext_dtype) => { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I needed this to solve a panic on datetimeparts |
||
| let ext_dtype = ext_dtype.clone(); | ||
| let storage_scalar = array.scalar().as_extension().to_storage_scalar(); | ||
| let storage_constant = ConstantArray::new(storage_scalar, array.len()).into_array(); | ||
| let storage_canonical = self.execute(storage_constant, ctx).await?; | ||
| Ok(Canonical::Extension(ExtensionArray::new( | ||
| ext_dtype, | ||
| storage_canonical.into_array(), | ||
| ))) | ||
| } | ||
| dt => vortex_bail!( | ||
| "CUDA constant array only supports numeric types, got {:?}", | ||
| dt | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -106,6 +106,13 @@ impl CudaExecute for DateTimePartsExecutor { | |
| let seconds_prim = seconds_canonical.into_primitive(); | ||
| let subseconds_prim = subseconds_canonical.into_primitive(); | ||
|
|
||
| // Components may decompress as unsigned (e.g. from BitPacked). Reinterpret | ||
| // as signed since the CUDA kernel only has signed variants and casts | ||
| // everything to int64_t anyway — the bit pattern is identical. | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't know why I was getting a uint here on datetimeparts, so hacked around this |
||
| let days_prim = days_prim.reinterpret_cast(days_prim.ptype().to_signed()); | ||
| let seconds_prim = seconds_prim.reinterpret_cast(seconds_prim.ptype().to_signed()); | ||
| let subseconds_prim = subseconds_prim.reinterpret_cast(subseconds_prim.ptype().to_signed()); | ||
|
|
||
| let days_ptype = days_prim.ptype(); | ||
| let seconds_ptype = seconds_prim.ptype(); | ||
| let subseconds_ptype = subseconds_prim.ptype(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a standalone uv script to do the same scan we do in
`gpu-scan-bench` above, but for parquet instead of vortex