Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 236 additions & 3 deletions datafusion/core/benches/sql_planner_extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use arrow::array::{ArrayRef, RecordBatch};
use arrow_schema::DataType;
use arrow_schema::TimeUnit::Nanosecond;
use criterion::{Criterion, criterion_group, criterion_main};
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use datafusion::prelude::{DataFrame, SessionContext};
use datafusion_catalog::MemTable;
use datafusion_common::ScalarValue;
Expand All @@ -27,6 +27,7 @@ use datafusion_expr::{cast, col, lit, not, try_cast, when};
use datafusion_functions::expr_fn::{
btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
};
use std::fmt::Write;
use std::hint::black_box;
use std::ops::Rem;
use std::sync::Arc;
Expand Down Expand Up @@ -212,21 +213,253 @@ fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
})
}

fn criterion_benchmark(c: &mut Criterion) {
/// Build a CASE-heavy dataframe over a non-inner join to stress
/// planner-time filter pushdown and nullability/type inference.
fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
register_string_table(ctx, 100, 1000);
let query = build_case_heavy_left_join_query(30, 1);
rt.block_on(async { ctx.sql(&query).await.unwrap() })
}

/// Generate a deterministic `LEFT JOIN` query whose `WHERE` clause is a
/// conjunction of `predicate_count` predicates, each wrapped in
/// `case_depth` nested CASE expressions that mix left- and right-side
/// columns. The CASE nesting forces the planner's nullability/type
/// inference codepath for filters on non-inner joins.
fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -> String {
    let mut query = String::from(
        "SELECT l.c0, r.c0 AS rc0 FROM t l LEFT JOIN t r ON l.c0 = r.c0 WHERE ",
    );

    if predicate_count == 0 {
        // Degenerate sweep point: keep the statement valid with a no-op filter.
        query.push_str("TRUE");
        return query;
    }

    // Keep this deterministic so comparisons between profiles are stable.
    let predicates: Vec<String> = (0..predicate_count)
        .map(|i| {
            // Innermost expression cycles through the 20 left-side columns.
            let seed = format!("length(l.c{})", i % 20);
            // Each nesting level wraps the previous expression in a CASE whose
            // branches reference both join sides.
            let nested = (0..case_depth).fold(seed, |inner, depth| {
                let guard_col = (i + depth + 1) % 20;
                let fallback_col = (i + depth + 2) % 20;
                format!(
                    "CASE WHEN l.c{guard_col} IS NOT NULL THEN {inner} ELSE length(r.c{fallback_col}) END"
                )
            });
            format!("{nested} > 2")
        })
        .collect();

    query.push_str(&predicates.join(" AND "));
    query
}

/// Build the CASE-heavy left-join dataframe at a given sweep point,
/// optionally removing the `push_down_filter` optimizer rule so the
/// benchmark can A/B the rule's planning cost.
fn build_case_heavy_left_join_df_with_push_down_filter(
    rt: &Runtime,
    predicate_count: usize,
    case_depth: usize,
    push_down_filter_enabled: bool,
) -> DataFrame {
    // Fresh context per sweep point so rule removal never leaks between runs.
    let ctx = SessionContext::new();
    register_string_table(&ctx, 100, 1000);
    if !push_down_filter_enabled {
        // `remove_optimizer_rule` returns whether the rule was found; it must
        // exist in the default optimizer for the A/B comparison to be valid.
        assert!(
            ctx.remove_optimizer_rule("push_down_filter"),
            "push_down_filter rule should be present in the default optimizer"
        );
    }

    let sql = build_case_heavy_left_join_query(predicate_count, case_depth);
    rt.block_on(async { ctx.sql(&sql).await.unwrap() })
}

/// Generate a deterministic `LEFT JOIN` query with simple (non-CASE)
/// predicates: each of `predicate_count` conjuncts wraps a left-side
/// column in `nesting_depth` levels of `coalesce` before a `length`
/// comparison. Serves as the control for the CASE-heavy benchmark.
fn build_non_case_left_join_query(
    predicate_count: usize,
    nesting_depth: usize,
) -> String {
    let mut query = String::from(
        "SELECT l.c0, r.c0 AS rc0 FROM t l LEFT JOIN t r ON l.c0 = r.c0 WHERE ",
    );

    if predicate_count == 0 {
        // Degenerate sweep point: keep the statement valid with a no-op filter.
        query.push_str("TRUE");
        return query;
    }

    // Keep this deterministic so comparisons between profiles are stable.
    for i in 0..predicate_count {
        // Build the coalesce chain inside-out: start from a left column and
        // wrap one right-side fallback per nesting level.
        let coalesced = (0..nesting_depth).fold(format!("l.c{}", i % 20), |inner, depth| {
            format!("coalesce({inner}, r.c{})", (i + depth + 1) % 20)
        });
        let separator = if i > 0 { " AND " } else { "" };
        let _ = write!(&mut query, "{separator}length({coalesced}) > 2");
    }

    query
}

/// Build the non-CASE (coalesce-based) left-join dataframe at a given
/// sweep point, optionally removing the `push_down_filter` optimizer
/// rule so the benchmark can A/B the rule's planning cost.
fn build_non_case_left_join_df_with_push_down_filter(
    rt: &Runtime,
    predicate_count: usize,
    nesting_depth: usize,
    push_down_filter_enabled: bool,
) -> DataFrame {
    // Fresh context per sweep point so rule removal never leaks between runs.
    let ctx = SessionContext::new();
    register_string_table(&ctx, 100, 1000);
    if !push_down_filter_enabled {
        // `remove_optimizer_rule` returns whether the rule was found; it must
        // exist in the default optimizer for the A/B comparison to be valid.
        assert!(
            ctx.remove_optimizer_rule("push_down_filter"),
            "push_down_filter rule should be present in the default optimizer"
        );
    }

    let sql = build_non_case_left_join_query(predicate_count, nesting_depth);
    rt.block_on(async { ctx.sql(&sql).await.unwrap() })
}

/// Criterion entry point: planner-time (logical optimization) benchmarks.
///
/// Covers the original wide-projection workload plus two A/B sweep groups
/// that isolate the cost of the `push_down_filter` rule on non-inner joins
/// (CASE-heavy hotspot vs. coalesce-based control).
fn criterion_benchmark(c: &mut Criterion) {
    let baseline_ctx = SessionContext::new();
    let case_heavy_ctx = SessionContext::new();
    let rt = Runtime::new().unwrap();

    // validate logical plan optimize performance
    // https://github.com/apache/datafusion/issues/17261
    // NOTE: a stale pre-diff line (`build_test_data_frame(&ctx, ...)`) that
    // referenced a nonexistent `ctx` binding has been removed here.
    let df = build_test_data_frame(&baseline_ctx, &rt);
    let case_heavy_left_join_df = build_case_heavy_left_join_df(&case_heavy_ctx, &rt);

    c.bench_function("logical_plan_optimize", |b| {
        b.iter(|| {
            // Clone per iteration: into_optimized_plan consumes the DataFrame.
            let df_clone = df.clone();
            black_box(rt.block_on(async { df_clone.into_optimized_plan().unwrap() }));
        })
    });

    c.bench_function("logical_plan_optimize_hotspot_case_heavy_left_join", |b| {
        b.iter(|| {
            let df_clone = case_heavy_left_join_df.clone();
            black_box(rt.block_on(async { df_clone.into_optimized_plan().unwrap() }));
        })
    });

    // Sweep axes shared by the hotspot and control groups below.
    let predicate_sweep = [10, 20, 30, 40, 60];
    let case_depth_sweep = [1, 2, 3];

    let mut hotspot_group =
        c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab");
    for case_depth in case_depth_sweep {
        for predicate_count in predicate_sweep {
            let with_push_down_filter =
                build_case_heavy_left_join_df_with_push_down_filter(
                    &rt,
                    predicate_count,
                    case_depth,
                    true,
                );
            let without_push_down_filter =
                build_case_heavy_left_join_df_with_push_down_filter(
                    &rt,
                    predicate_count,
                    case_depth,
                    false,
                );

            let input_label =
                format!("predicates={predicate_count},case_depth={case_depth}");
            // A/B interpretation:
            // - with_push_down_filter: default optimizer path (rule enabled)
            // - without_push_down_filter: control path with the rule removed
            // Compare both IDs at the same sweep point to isolate rule impact.
            hotspot_group.bench_with_input(
                BenchmarkId::new("with_push_down_filter", &input_label),
                &with_push_down_filter,
                |b, df| {
                    b.iter(|| {
                        let df_clone = df.clone();
                        black_box(
                            rt.block_on(async {
                                df_clone.into_optimized_plan().unwrap()
                            }),
                        );
                    })
                },
            );
            hotspot_group.bench_with_input(
                BenchmarkId::new("without_push_down_filter", &input_label),
                &without_push_down_filter,
                |b, df| {
                    b.iter(|| {
                        let df_clone = df.clone();
                        black_box(
                            rt.block_on(async {
                                df_clone.into_optimized_plan().unwrap()
                            }),
                        );
                    })
                },
            );
        }
    }
    hotspot_group.finish();

    let mut control_group =
        c.benchmark_group("push_down_filter_control_non_case_left_join_ab");
    for nesting_depth in case_depth_sweep {
        for predicate_count in predicate_sweep {
            let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter(
                &rt,
                predicate_count,
                nesting_depth,
                true,
            );
            let without_push_down_filter =
                build_non_case_left_join_df_with_push_down_filter(
                    &rt,
                    predicate_count,
                    nesting_depth,
                    false,
                );

            let input_label =
                format!("predicates={predicate_count},nesting_depth={nesting_depth}");
            control_group.bench_with_input(
                BenchmarkId::new("with_push_down_filter", &input_label),
                &with_push_down_filter,
                |b, df| {
                    b.iter(|| {
                        let df_clone = df.clone();
                        black_box(
                            rt.block_on(async {
                                df_clone.into_optimized_plan().unwrap()
                            }),
                        );
                    })
                },
            );
            control_group.bench_with_input(
                BenchmarkId::new("without_push_down_filter", &input_label),
                &without_push_down_filter,
                |b, df| {
                    b.iter(|| {
                        let df_clone = df.clone();
                        black_box(
                            rt.block_on(async {
                                df_clone.into_optimized_plan().unwrap()
                            }),
                        );
                    })
                },
            );
        }
    }
    control_group.finish();
}

criterion_group!(benches, criterion_benchmark);
Expand Down
43 changes: 40 additions & 3 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use std::sync::Arc;
use arrow::datatypes::DataType;
use indexmap::IndexSet;
use itertools::Itertools;
use log::{Level, debug, log_enabled};

use datafusion_common::instant::Instant;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
Expand Down Expand Up @@ -543,8 +545,19 @@ fn push_down_join(
.map_or_else(Vec::new, |filter| split_conjunction_owned(filter.clone()));

// Are there any new join predicates that can be inferred from the filter expressions?
let inferred_join_predicates =
infer_join_predicates(&join, &predicates, &on_filters)?;
let inferred_join_predicates = with_debug_timing("infer_join_predicates", || {
infer_join_predicates(&join, &predicates, &on_filters)
})?;

if log_enabled!(Level::Debug) {
debug!(
"push_down_filter: join_type={:?}, parent_predicates={}, on_filters={}, inferred_join_predicates={}",
join.join_type,
predicates.len(),
on_filters.len(),
inferred_join_predicates.len()
);
}

if on_filters.is_empty()
&& predicates.is_empty()
Expand Down Expand Up @@ -783,7 +796,15 @@ impl OptimizerRule for PushDownFilter {

let predicate = split_conjunction_owned(filter.predicate.clone());
let old_predicate_len = predicate.len();
let new_predicates = simplify_predicates(predicate)?;
let new_predicates =
with_debug_timing("simplify_predicates", || simplify_predicates(predicate))?;
if log_enabled!(Level::Debug) {
debug!(
"push_down_filter: simplify_predicates old_count={}, new_count={}",
old_predicate_len,
new_predicates.len()
);
}
if old_predicate_len != new_predicates.len() {
let Some(new_predicate) = conjunction(new_predicates) else {
// new_predicates is empty - remove the filter entirely
Expand Down Expand Up @@ -1395,6 +1416,22 @@ impl PushDownFilter {
}
}

/// Run `f` and, when debug logging is enabled, emit how long it took
/// (microseconds) under the `push_down_filter_timing` label. The elapsed
/// time is logged whether `f` succeeds or fails; the result is returned
/// unchanged either way.
fn with_debug_timing<T, F>(label: &'static str, f: F) -> Result<T>
where
    F: FnOnce() -> Result<T>,
{
    if log_enabled!(Level::Debug) {
        let start = Instant::now();
        let outcome = f();
        debug!(
            "push_down_filter_timing: section={label}, elapsed_us={}",
            start.elapsed().as_micros()
        );
        outcome
    } else {
        // Fast path: no timing overhead when debug logging is off.
        f()
    }
}

/// replaces columns by its name on the projection.
pub fn replace_cols_by_name(
e: Expr,
Expand Down
Loading