Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ clap = { version = "4.6.0", features = ["derive", "env"] }
criterion = { workspace = true, features = ["html_reports"] }
datafusion = { workspace = true, default-features = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-common-runtime = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
libmimalloc-sys = { version = "0.1", optional = true }
Expand Down
259 changes: 20 additions & 239 deletions benchmarks/benches/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,13 @@
//! Cargo, for example: `BENCH_NAME=tpch cargo bench --bench sql`.

use clap::Parser;
use criterion::{Criterion, SamplingMode, criterion_group, criterion_main};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion_benchmarks::sql_benchmark::SqlBenchmark;
use datafusion_benchmarks::util::{CommonOpt, print_memory_stats};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion_benchmarks::sql_benchmark_runner::{
BenchmarkFilter, SqlRunConfig, default_sql_benchmark_directory,
run_criterion_benchmarks_impl,
};
use datafusion_benchmarks::util::CommonOpt;
use datafusion_common::instant::Instant;
use log::{debug, info};
use std::collections::BTreeMap;
use std::fs;
use std::sync::LazyLock;
use tokio::runtime::Runtime;

static SQL_BENCHMARK_DIRECTORY: LazyLock<String> = LazyLock::new(|| {
format!(
"{}{}{}",
env!("CARGO_MANIFEST_DIR"),
std::path::MAIN_SEPARATOR,
"sql_benchmarks"
)
});

#[cfg(feature = "snmalloc")]
#[global_allocator]
Expand Down Expand Up @@ -90,234 +77,28 @@ pub fn sql(c: &mut Criterion) {

let start = Instant::now();
let args = EnvParser::parse();
let rt = make_tokio_runtime();
let config = SqlRunConfig {
common: args.options,
filter: BenchmarkFilter {
name: args.name,
subgroup: args.subgroup,
query: args.query,
},
persist_results: args.persist_results,
validate_results: args.validate,
output: None,
};

println!("Loading benchmarks...");

let benchmarks = rt.block_on(async {
let ctx = make_ctx(&args).expect("SessionContext creation failed");

load_benchmarks(&args, &ctx, &SQL_BENCHMARK_DIRECTORY)
.await
.unwrap_or_else(|err| panic!("failed load benchmarks: {err:?}"))
});
run_criterion_benchmarks_impl(&default_sql_benchmark_directory(), &config, c)
.unwrap_or_else(|err| panic!("failed to run SQL benchmarks: {err:?}"));

println!(
"Loaded benchmarks in {} ms ...",
"Completed benchmarks in {} ms ...",
start.elapsed().as_millis()
);

for (group, benchmarks) in benchmarks {
let mut group = c.benchmark_group(group);
group.sample_size(10);
group.sampling_mode(SamplingMode::Flat);

for mut benchmark in benchmarks {
// create a context
let ctx = make_ctx(&args).expect("SessionContext creation failed");

// initialize the benchmark. This parses the benchmark file and does any pre-execution
// work such as loading data into tables
rt.block_on(async {
benchmark
.initialize(&ctx)
.await
.expect("initialization failed");

// run assertions
benchmark.assert(&ctx).await.expect("assertion failed");
});

let mut name = benchmark.name().to_string();
if !benchmark.subgroup().is_empty() {
name.push('_');
name.push_str(benchmark.subgroup());
}

if args.persist_results {
handle_persist(&rt, &ctx, &name, &mut benchmark);
} else if args.validate {
handle_verify(&rt, &ctx, &name, &mut benchmark);
} else {
info!("Running benchmark {name} ...");

let name = name.clone();
group.bench_function(name.clone(), |b| {
b.iter(|| handle_run(&rt, &ctx, &args, &mut benchmark, &name))
});

print_memory_stats();

info!("Benchmark {name} completed");
}

// run cleanup
rt.block_on(async {
benchmark.cleanup(&ctx).await.expect("Cleanup failed");
});
}

group.finish();
}
}

fn handle_run(
rt: &Runtime,
ctx: &SessionContext,
args: &EnvParser,
benchmark: &mut SqlBenchmark,
name: &str,
) {
rt.block_on(async {
benchmark
.run(ctx, args.validate)
.await
.unwrap_or_else(|err| panic!("Failed to run benchmark {name}: {err:?}"))
});
}

fn handle_persist(
rt: &Runtime,
ctx: &SessionContext,
name: &str,
benchmark: &mut SqlBenchmark,
) {
info!("Running benchmark {name} prior to persisting results ...");

rt.block_on(async {
info!("Persisting benchmark {name} ...");

benchmark
.persist(ctx)
.await
.expect("Failed to persist results");
});

info!("Persisted benchmark {name} successfully");
}

fn handle_verify(
rt: &Runtime,
ctx: &SessionContext,
name: &str,
benchmark: &mut SqlBenchmark,
) {
info!("Verifying benchmark {name} results ...");

rt.block_on(async {
benchmark
.run(ctx, true)
.await
.unwrap_or_else(|err| panic!("Failed to run benchmark {name}: {err:?}"));
benchmark
.verify(ctx)
.await
.unwrap_or_else(|err| panic!("Verification failed: {err:?}"));
});

info!("Verified benchmark {name} results successfully");
}

criterion_group!(benches, sql);
criterion_main!(benches);

fn make_tokio_runtime() -> Runtime {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
}

fn make_ctx(args: &EnvParser) -> Result<SessionContext> {
let config = args.options.config()?;
let rt = args.options.build_runtime()?;

Ok(SessionContext::new_with_config_rt(config, rt))
}

/// Recursively walks the directory tree starting at `path` and
/// calls the call back function for every file encountered.
pub fn list_files<F>(path: &str, callback: &mut F)
where
F: FnMut(&str),
{
let mut entries: Vec<fs::DirEntry> =
fs::read_dir(path).unwrap().filter_map(Result::ok).collect();
entries.sort_by_key(|entry| entry.path());

for dir_entry in entries {
let path = dir_entry.path();
if path.is_dir() {
// Recurse into the sub‑directory
list_files(&path.to_string_lossy(), callback);
} else {
// For files, invoke the callback with the full path as a string
let full_str = path.to_string_lossy();
callback(&full_str);
}
}
}

/// Loads all benchmark files in the `sql_benchmarks` directory.
/// For each file ending with `.benchmark` it creates a new
/// `SqlBenchmark` instance.
async fn load_benchmarks(
args: &EnvParser,
ctx: &SessionContext,
path: &str,
) -> Result<BTreeMap<String, Vec<SqlBenchmark>>> {
let mut benches = BTreeMap::new();
let mut paths = Vec::new();

list_files(path, &mut |path: &str| {
if path.ends_with(".benchmark") {
paths.push(path.to_string());
}
});

for path in paths {
debug!("Loading benchmark from {path}");

let benchmark = SqlBenchmark::new(ctx, &path, &*SQL_BENCHMARK_DIRECTORY).await?;
let entries = benches
.entry(benchmark.group().to_string())
.or_insert(vec![]);

entries.push(benchmark);
}

benches = filter_benchmarks(args, benches);
benches.iter_mut().for_each(|(_, benchmarks)| {
benchmarks.sort_by(|b1, b2| b1.name().cmp(b2.name()))
});

Ok(benches)
}

fn filter_benchmarks(
args: &EnvParser,
benchmarks: BTreeMap<String, Vec<SqlBenchmark>>,
) -> BTreeMap<String, Vec<SqlBenchmark>> {
match &args.name {
Some(bench_name) => benchmarks
.into_iter()
.filter(|(key, _val)| key.eq_ignore_ascii_case(bench_name))
.map(|(key, mut val)| {
if let Some(subgroup) = &args.subgroup {
val.retain(|bench| bench.subgroup().eq_ignore_ascii_case(subgroup));
}
if let Some(query) = &args.query {
// Accept `1`, `01`, `6a`, `Q06a`, ... case-insensitively.
// Bench names are canonical, e.g. `Q01`, `Q06a`.
let q = query.trim_start_matches(['Q', 'q']);
let split = q.find(|c: char| !c.is_ascii_digit()).unwrap_or(q.len());
let (num, suffix) = q.split_at(split);
let normalized = format!("Q{num:0>2}{suffix}");
val.retain(|bench| bench.name().eq_ignore_ascii_case(&normalized));
}
(key, val)
})
.collect(),
None => benchmarks,
}
}
74 changes: 74 additions & 0 deletions benchmarks/src/bin/benchmark_runner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! DataFusion SQL benchmark runner.

use datafusion_benchmarks::sql_benchmark_runner;

#[cfg(feature = "snmalloc")]
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

// `cargo clippy --all-features` enables both allocator features, so prefer
// `snmalloc` in that case and fall back to `mimalloc` otherwise.
#[cfg(all(not(feature = "snmalloc"), feature = "mimalloc"))]
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

#[tokio::main]
async fn main() {
env_logger::init();
if let Err(error) = sql_benchmark_runner::run_cli().await {
eprintln!("Error: {error}");
std::process::exit(1);
}
}

#[test]
fn benchmark_runner_prints_errors_without_debug_escaping() {
use std::process::Command;

let output = Command::new("cargo")
.args([
"run",
"--bin",
"benchmark_runner",
"--",
"predicate_eval",
"--query",
"999",
"--iterations",
"1",
])
.env("PRED_ROWS", "1000000")
.output()
.expect("run benchmark_runner");

assert!(!output.status.success());

let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("no SQL benchmark query matched benchmark 'predicate_eval'"),
"{stderr}"
);
assert!(
stderr.contains("Available predicate_eval queries:"),
"{stderr}"
);
assert!(!stderr.contains("Execution(\""), "{stderr}");
assert!(!stderr.contains("\\n"), "{stderr}");
}
1 change: 1 addition & 0 deletions benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod smj;
pub mod sort_pushdown;
pub mod sort_tpch;
pub mod sql_benchmark;
pub mod sql_benchmark_runner;
pub mod tpcds;
pub mod tpch;
pub mod util;
Loading
Loading