Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

94 changes: 93 additions & 1 deletion benchmarks/datafusion-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use std::time::Instant;
use clap::Parser;
use clap::value_parser;
use custom_labels::asynchronous::Label;
use datafusion::arrow::array::Array;
use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::util::display::ArrayFormatter;
use datafusion::arrow::util::display::FormatOptions;
use datafusion::common::runtime::set_join_set_tracer;
use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::listing::ListingTable;
Expand Down Expand Up @@ -97,6 +100,14 @@ struct Args {
#[arg(long, default_value_t = false)]
explain: bool,

/// Validate query results against reference files.
#[arg(long, default_value_t = false, conflicts_with_all = &["explain", "generate_reference"])]
validate: bool,

/// Generate reference result files for future validation.
#[arg(long, default_value_t = false, conflicts_with_all = &["explain", "validate"])]
generate_reference: bool,

#[arg(long, value_delimiter = ',', value_parser = value_parser!(Format))]
formats: Vec<Format>,

Expand Down Expand Up @@ -163,9 +174,14 @@ async fn main() -> anyhow::Result<()> {

let mode = if args.explain {
BenchmarkMode::Explain
} else if args.validate {
BenchmarkMode::Validate
} else if args.generate_reference {
BenchmarkMode::GenerateReference
} else {
BenchmarkMode::Run {
iterations: args.iterations,
validate: std::env::var("CI").is_ok(),
}
};

Expand Down Expand Up @@ -215,7 +231,7 @@ async fn main() -> anyhow::Result<()> {
)
.await?;

if !args.explain {
if !args.explain && !args.validate && !args.generate_reference {
// Print metrics if requested
if show_metrics {
let plans = collected_plans.lock();
Expand Down Expand Up @@ -400,6 +416,82 @@ impl BenchmarkQueryResult for DataFusionQueryResult {
.map(|d| d.to_string())
.unwrap_or_else(|e| format!("<error: {e}>"))
}

/// Return normalized `(column_names, rows)` for result validation.
///
/// Delegates to [`normalize_record_batches`] over the record batches
/// collected in `self.0`.
fn normalized_result(&self) -> (Vec<String>, Vec<Vec<String>>) {
normalize_record_batches(&self.0)
}
}

/// Convert Arrow `RecordBatch`es into normalized column names and row values.
///
/// Uses [`vortex_bench::validation`] normalization for floats and strings to
/// match the sqllogictest conventions used by DuckDB's result normalization.
fn normalize_record_batches(batches: &[RecordBatch]) -> (Vec<String>, Vec<Vec<String>>) {
    use datafusion::arrow::datatypes::DataType;
    use vortex::error::VortexExpect;
    use vortex_bench::validation::normalize_f32;
    use vortex_bench::validation::normalize_f64;
    use vortex_bench::validation::normalize_string;

    // Column names come from the first batch's schema; an empty result set
    // produces an empty name list.
    let column_names: Vec<String> = match batches.first() {
        Some(batch) => batch
            .schema()
            .fields()
            .iter()
            .map(|field| field.name().clone())
            .collect(),
        None => Vec::new(),
    };

    let options = FormatOptions::default().with_null("NULL");
    let mut all_rows = Vec::new();

    for batch in batches {
        // Build one formatter per column up front; they are reused for every
        // row of this batch.
        let column_formatters = batch
            .columns()
            .iter()
            .map(|column| ArrayFormatter::try_new(column.as_ref(), &options))
            .collect::<Result<Vec<ArrayFormatter>, _>>()
            .vortex_expect("ArrayFormatter creation should not fail");

        for row in 0..batch.num_rows() {
            let mut cells = Vec::with_capacity(batch.num_columns());
            for (idx, fmt) in column_formatters.iter().enumerate() {
                let column = batch.column(idx);
                let cell = if column.is_null(row) {
                    "NULL".to_string()
                } else {
                    match column.data_type() {
                        // Floats are normalized from the raw value rather than
                        // the formatter output so rounding matches the shared
                        // validation helpers.
                        DataType::Float32 => {
                            let arr = column
                                .as_any()
                                .downcast_ref::<datafusion::arrow::array::Float32Array>()
                                .vortex_expect("Float32 downcast");
                            normalize_f32(arr.value(row))
                        }
                        DataType::Float64 => {
                            let arr = column
                                .as_any()
                                .downcast_ref::<datafusion::arrow::array::Float64Array>()
                                .vortex_expect("Float64 downcast");
                            normalize_f64(arr.value(row))
                        }
                        DataType::Utf8 | DataType::LargeUtf8 => {
                            normalize_string(&fmt.value(row).to_string())
                        }
                        _ => fmt.value(row).to_string(),
                    }
                };
                cells.push(cell);
            }
            all_rows.push(cells);
        }
    }

    (column_names, all_rows)
}

pub async fn execute_query(
Expand Down
113 changes: 102 additions & 11 deletions benchmarks/duckdb-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ use vortex_bench::Format;
use vortex_bench::IdempotentPath;
use vortex_bench::generate_duckdb_registration_sql;
use vortex_bench::runner::BenchmarkQueryResult;
use vortex_bench::validation;
use vortex_duckdb::duckdb::Config;
use vortex_duckdb::duckdb::Connection;
use vortex_duckdb::duckdb::Database;
use vortex_duckdb::duckdb::ExtractedValue;
use vortex_duckdb::duckdb::QueryResult;
use vortex_duckdb::duckdb::Value;

/// DuckDB context for benchmarks.
pub struct DuckClient {
Expand Down Expand Up @@ -199,25 +202,113 @@ impl DuckClient {
let time_instant = Instant::now();
let result = self.connection().query(query)?;
let query_time = time_instant.elapsed();
Ok((Some(query_time), DuckQueryResult(result)))
Ok((Some(query_time), DuckQueryResult::from_query_result(result)))
}
}

/// Wrapper around DuckDB's `QueryResult` implementing `BenchmarkQueryResult`.
pub struct DuckQueryResult(pub QueryResult);
/// Eagerly materialized wrapper around DuckDB query results.
///
/// Materializes the result on construction so that both `row_count()`,
/// `display()`, and `normalized_result()` can be called via shared reference.
pub struct DuckQueryResult {
row_count: usize,
display_string: String,
column_names: Vec<String>,
normalized_rows: Vec<Vec<String>>,
}

impl DuckQueryResult {
    /// Consume a DuckDB `QueryResult` and eagerly materialize its contents
    /// (row count, display string, column names, normalized row values).
    pub fn from_query_result(result: QueryResult) -> Self {
        let row_count = usize::try_from(result.row_count()).unwrap_or(0);
        let col_count = usize::try_from(result.column_count()).unwrap_or(0);

        // Capture column names before iterating: iterating the result below
        // consumes it chunk by chunk.
        let column_names: Vec<String> = (0..col_count)
            .map(|idx| {
                result
                    .column_name(idx)
                    .vortex_expect("column name should be valid")
                    .to_string()
            })
            .collect();

        let mut display_string = String::new();
        let mut normalized_rows = Vec::new();

        for chunk in result {
            // DuckDB's native string rendering of the chunk, kept for `display()`.
            let rendered =
                String::try_from(chunk.deref()).unwrap_or_else(|_| "<error>".to_string());
            display_string.push_str(&rendered);

            for row_idx in 0..chunk.len() {
                let mut cells = Vec::with_capacity(chunk.column_count());
                for col_idx in 0..chunk.column_count() {
                    let vector = chunk.get_vector(col_idx);
                    // A missing value is rendered as the literal string "NULL".
                    cells.push(match vector.get_value(row_idx, chunk.len()) {
                        Some(value) => normalize_duckdb_value(&value),
                        None => "NULL".to_string(),
                    });
                }
                normalized_rows.push(cells);
            }
        }

        Self {
            row_count,
            display_string,
            column_names,
            normalized_rows,
        }
    }
}

impl BenchmarkQueryResult for DuckQueryResult {
fn row_count(&self) -> usize {
usize::try_from(self.0.row_count()).unwrap_or(0)
self.row_count
}

fn display(self) -> String {
let mut output = String::new();
for chunk in self.0 {
let chunk_str =
String::try_from(chunk.deref()).unwrap_or_else(|_| "<error>".to_string());
output.push_str(&chunk_str);
}
output
self.display_string
}

fn normalized_result(&self) -> (Vec<String>, Vec<Vec<String>>) {
(self.column_names.clone(), self.normalized_rows.clone())
}
}

/// Normalize a DuckDB value to a canonical string representation.
///
/// Uses the same normalization as `vortex-sqllogictest`'s `ValueDisplayAdapter`
/// and the shared [`vortex_bench::validation`] helpers so that results are
/// comparable with DataFusion output.
fn normalize_duckdb_value(value: &Value) -> String {
    let extracted = value.extract();
    match extracted {
        ExtractedValue::Null => "NULL".to_string(),
        ExtractedValue::Boolean(v) => v.to_string(),
        // Signed integers: the default Display form is already canonical.
        ExtractedValue::TinyInt(v) => v.to_string(),
        ExtractedValue::SmallInt(v) => v.to_string(),
        ExtractedValue::Integer(v) => v.to_string(),
        ExtractedValue::BigInt(v) => v.to_string(),
        ExtractedValue::HugeInt(v) => v.to_string(),
        // Unsigned integers: likewise canonical via Display.
        ExtractedValue::UTinyInt(v) => v.to_string(),
        ExtractedValue::USmallInt(v) => v.to_string(),
        ExtractedValue::UInteger(v) => v.to_string(),
        ExtractedValue::UBigInt(v) => v.to_string(),
        ExtractedValue::UHugeInt(v) => v.to_string(),
        // Floats, strings, and decimals go through the shared validation
        // helpers so both engines produce byte-identical cells.
        ExtractedValue::Float(v) => validation::normalize_f32(v),
        ExtractedValue::Double(v) => validation::normalize_f64(v),
        ExtractedValue::Varchar(s) => validation::normalize_string(s.as_str()),
        ExtractedValue::Decimal(_, scale, v) => validation::normalize_decimal(v, scale),
        // Delegate to DuckDB's native string representation for other types.
        ExtractedValue::Blob(_)
        | ExtractedValue::Date(_)
        | ExtractedValue::Time(_)
        | ExtractedValue::TimestampNs(_)
        | ExtractedValue::Timestamp(_)
        | ExtractedValue::TimestampMs(_)
        | ExtractedValue::TimestampS(_)
        | ExtractedValue::List(_) => value.to_string(),
    }
}
15 changes: 14 additions & 1 deletion benchmarks/duckdb-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ struct Args {
#[arg(long, default_value_t = false)]
explain: bool,

/// Validate query results against reference files.
#[arg(long, default_value_t = false, conflicts_with_all = &["explain", "generate_reference"])]
validate: bool,

/// Generate reference result files for future validation.
#[arg(long, default_value_t = false, conflicts_with_all = &["explain", "validate"])]
generate_reference: bool,

#[arg(
long,
default_value_t = false,
Expand Down Expand Up @@ -151,9 +159,14 @@ fn main() -> anyhow::Result<()> {

let mode = if args.explain {
BenchmarkMode::Explain
} else if args.validate {
BenchmarkMode::Validate
} else if args.generate_reference {
BenchmarkMode::GenerateReference
} else {
BenchmarkMode::Run {
iterations: args.iterations,
validate: std::env::var("CI").is_ok(),
}
};

Expand Down Expand Up @@ -185,7 +198,7 @@ fn main() -> anyhow::Result<()> {
},
)?;

if !args.explain {
if !args.explain && !args.validate && !args.generate_reference {
let benchmark_id = format!("duckdb-{}", benchmark.dataset_name());
let writer = create_output_writer(&args.display_format, args.output_path, &benchmark_id)?;
runner.export_to(&args.display_format, writer)?;
Expand Down
5 changes: 5 additions & 0 deletions benchmarks/lance-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ async fn main() -> anyhow::Result<()> {
&filtered_queries,
BenchmarkMode::Run {
iterations: args.iterations,
validate: false,
},
|_format| async {
let session = SessionContext::new();
Expand Down Expand Up @@ -165,6 +166,10 @@ impl BenchmarkQueryResult for LanceQueryResult {
.map(|d| d.to_string())
.unwrap_or_else(|e| format!("<error: {e}>"))
}

/// Result validation is not supported for Lance benchmarks; this panics
/// if called.
// NOTE(review): assumes the runner never invokes normalized_result() when the
// mode is Run { validate: false } (as set in main above) — confirm in runner.
fn normalized_result(&self) -> (Vec<String>, Vec<Vec<String>>) {
unimplemented!("Lance benchmarks do not support result validation")
}
}

pub async fn execute_query(
Expand Down
2 changes: 2 additions & 0 deletions vortex-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ arrow-array = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
async-trait = { workspace = true }
bigdecimal = { workspace = true }
bytes = { workspace = true }
bzip2 = { workspace = true }
clap = { workspace = true, features = ["derive"] }
Expand All @@ -41,6 +42,7 @@ regex = { workspace = true }
reqwest = { workspace = true, features = ["stream"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
similar = { workspace = true }
sysinfo = { workspace = true }
tabled = { workspace = true, features = ["std"] }
target-lexicon = { workspace = true }
Expand Down
5 changes: 5 additions & 0 deletions vortex-bench/REUSE.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,8 @@ SPDX-License-Identifier = "Apache-2.0"
path = "**/*.csv"
SPDX-FileCopyrightText = "Copyright the Vortex contributors"
SPDX-License-Identifier = "Apache-2.0"

[[annotations]]
path = "**/*.tsv"
SPDX-FileCopyrightText = "Copyright the Vortex contributors"
SPDX-License-Identifier = "Apache-2.0"
2 changes: 2 additions & 0 deletions vortex-bench/clickbench/results/datafusion/q00.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
count(*)
99997497
2 changes: 2 additions & 0 deletions vortex-bench/clickbench/results/datafusion/q01.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
count(*)
630500
2 changes: 2 additions & 0 deletions vortex-bench/clickbench/results/datafusion/q02.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
sum(hits.AdvEngineID) count(*) avg(hits.ResolutionWidth)
7280088 99997497 1513.487934903011
2 changes: 2 additions & 0 deletions vortex-bench/clickbench/results/datafusion/q03.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
avg(hits.UserID)
2528953029789716000
2 changes: 2 additions & 0 deletions vortex-bench/clickbench/results/datafusion/q04.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
count(DISTINCT hits.UserID)
17630976
2 changes: 2 additions & 0 deletions vortex-bench/clickbench/results/datafusion/q05.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
count(DISTINCT hits.SearchPhrase)
6019103
2 changes: 2 additions & 0 deletions vortex-bench/clickbench/results/datafusion/q06.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
min(hits.EventDate) max(hits.EventDate)
2013-07-02T00:00:00 2013-07-31T00:00:00
Loading
Loading