Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 165 additions & 2 deletions datafusion/core/tests/physical_optimizer/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
use std::sync::Arc;

use crate::physical_optimizer::test_utils::{
coalesce_partitions_exec, global_limit_exec, local_limit_exec, sort_exec,
sort_preserving_merge_exec, stream_exec,
coalesce_partitions_exec, global_limit_exec, hash_join_exec, local_limit_exec,
sort_exec, sort_preserving_merge_exec, stream_exec,
};

use arrow::compute::SortOptions;
Expand All @@ -29,6 +29,7 @@ use datafusion_common::error::Result;
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr::expressions::{BinaryExpr, col, lit};
use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::limit_pushdown::LimitPushdown;
Expand Down Expand Up @@ -161,6 +162,168 @@ fn transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_li
Ok(())
}

/// Build a single equi-join key pair `(left, right)` for `hash_join_exec`,
/// referencing column index 0 on each side's input schema.
fn join_on_columns(
    left_col: &str,
    right_col: &str,
) -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
    // Both sides resolve their key against their own schema, so index 0 is
    // used for each.
    let key = |name: &str| -> PhysicalExprRef {
        Arc::new(datafusion_physical_expr::expressions::Column::new(name, 0))
    };
    vec![(key(left_col), key(right_col))]
}

#[test]
fn absorbs_limit_into_hash_join_inner() -> Result<()> {
    // An Inner hash join should absorb the limit itself (via `with_fetch`)
    // rather than having it pushed down to its children.
    let schema = create_schema();
    let join = hash_join_exec(
        empty_exec(Arc::clone(&schema)),
        empty_exec(Arc::clone(&schema)),
        join_on_columns("c1", "c1"),
        None,
        &JoinType::Inner,
    )?;
    let plan = global_limit_exec(join, 0, Some(5));

    insta::assert_snapshot!(
        format_plan(&plan),
        @r"
GlobalLimitExec: skip=0, fetch=5
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)]
EmptyExec
EmptyExec
"
    );

    let optimized = LimitPushdown::new().optimize(plan, &ConfigOptions::new())?;

    // The GlobalLimitExec is gone and the join now carries `fetch=5`.
    insta::assert_snapshot!(
        format_plan(&optimized),
        @r"
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)], fetch=5
EmptyExec
EmptyExec
"
    );

    Ok(())
}

#[test]
fn absorbs_limit_into_hash_join_right() -> Result<()> {
    // A Right hash join should absorb the limit itself (via `with_fetch`).
    let schema = create_schema();
    let join = hash_join_exec(
        empty_exec(Arc::clone(&schema)),
        empty_exec(Arc::clone(&schema)),
        join_on_columns("c1", "c1"),
        None,
        &JoinType::Right,
    )?;
    let plan = global_limit_exec(join, 0, Some(10));

    insta::assert_snapshot!(
        format_plan(&plan),
        @r"
GlobalLimitExec: skip=0, fetch=10
HashJoinExec: mode=Partitioned, join_type=Right, on=[(c1@0, c1@0)]
EmptyExec
EmptyExec
"
    );

    let optimized = LimitPushdown::new().optimize(plan, &ConfigOptions::new())?;

    // The GlobalLimitExec is gone and the join now carries `fetch=10`.
    insta::assert_snapshot!(
        format_plan(&optimized),
        @r"
HashJoinExec: mode=Partitioned, join_type=Right, on=[(c1@0, c1@0)], fetch=10
EmptyExec
EmptyExec
"
    );

    Ok(())
}

#[test]
fn absorbs_limit_into_hash_join_left() -> Result<()> {
    // A Left hash join emits matched rows during probing and unmatched
    // build-side rows at the end; it can stop as soon as the limit is
    // reached, so the limit is absorbed into the join.
    let schema = create_schema();
    let join = hash_join_exec(
        empty_exec(Arc::clone(&schema)),
        empty_exec(Arc::clone(&schema)),
        join_on_columns("c1", "c1"),
        None,
        &JoinType::Left,
    )?;
    let plan = global_limit_exec(join, 0, Some(5));

    insta::assert_snapshot!(
        format_plan(&plan),
        @r"
GlobalLimitExec: skip=0, fetch=5
HashJoinExec: mode=Partitioned, join_type=Left, on=[(c1@0, c1@0)]
EmptyExec
EmptyExec
"
    );

    let optimized = LimitPushdown::new().optimize(plan, &ConfigOptions::new())?;

    // The GlobalLimitExec is gone and the join now carries `fetch=5`.
    insta::assert_snapshot!(
        format_plan(&optimized),
        @r"
HashJoinExec: mode=Partitioned, join_type=Left, on=[(c1@0, c1@0)], fetch=5
EmptyExec
EmptyExec
"
    );

    Ok(())
}

#[test]
fn absorbs_limit_with_skip_into_hash_join() -> Result<()> {
    // With a non-zero skip the GlobalLimitExec must remain (the join cannot
    // skip rows), but the join absorbs skip + fetch = 8 as its own fetch.
    let schema = create_schema();
    let join = hash_join_exec(
        empty_exec(Arc::clone(&schema)),
        empty_exec(Arc::clone(&schema)),
        join_on_columns("c1", "c1"),
        None,
        &JoinType::Inner,
    )?;
    let plan = global_limit_exec(join, 3, Some(5));

    insta::assert_snapshot!(
        format_plan(&plan),
        @r"
GlobalLimitExec: skip=3, fetch=5
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)]
EmptyExec
EmptyExec
"
    );

    let optimized = LimitPushdown::new().optimize(plan, &ConfigOptions::new())?;

    insta::assert_snapshot!(
        format_plan(&optimized),
        @r"
GlobalLimitExec: skip=3, fetch=5
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)], fetch=8
EmptyExec
EmptyExec
"
    );

    Ok(())
}

#[test]
fn pushes_global_limit_exec_through_projection_exec() -> Result<()> {
let schema = create_schema();
Expand Down
73 changes: 72 additions & 1 deletion datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ pub struct HashJoinExecBuilder {
partition_mode: PartitionMode,
null_equality: NullEquality,
null_aware: bool,
/// Maximum number of rows to return
Comment thread
jonathanc-n marked this conversation as resolved.
fetch: Option<usize>,
}

impl HashJoinExecBuilder {
Expand All @@ -278,6 +280,7 @@ impl HashJoinExecBuilder {
join_type,
null_equality: NullEquality::NullEqualsNothing,
null_aware: false,
fetch: None,
}
}

Expand Down Expand Up @@ -316,6 +319,12 @@ impl HashJoinExecBuilder {
self
}

    /// Set the optional fetch limit: the maximum number of joined rows the
    /// built `HashJoinExec` will produce. `None` (the default) means
    /// unlimited.
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

/// Build resulting execution plan.
pub fn build(self) -> Result<HashJoinExec> {
let Self {
Expand All @@ -328,6 +337,7 @@ impl HashJoinExecBuilder {
partition_mode,
null_equality,
null_aware,
fetch,
} = self;

let left_schema = left.schema();
Expand Down Expand Up @@ -393,6 +403,7 @@ impl HashJoinExecBuilder {
null_aware,
cache,
dynamic_filter: None,
fetch,
})
}
}
Expand All @@ -409,6 +420,7 @@ impl From<&HashJoinExec> for HashJoinExecBuilder {
partition_mode: exec.mode,
null_equality: exec.null_equality,
null_aware: exec.null_aware,
fetch: exec.fetch,
}
}
}
Expand Down Expand Up @@ -646,6 +658,8 @@ pub struct HashJoinExec {
/// Set when dynamic filter pushdown is detected in handle_child_pushdown_result.
/// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
dynamic_filter: Option<HashJoinExecDynamicFilter>,
/// Maximum number of rows to return
fetch: Option<usize>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -760,6 +774,11 @@ impl HashJoinExec {
self.null_equality
}

    /// Get the optional fetch limit for this join: the maximum number of
    /// output rows to produce, or `None` for unlimited.
    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

/// Get the dynamic filter expression for testing purposes.
/// Returns `None` if no dynamic filter has been set.
///
Expand Down Expand Up @@ -979,6 +998,9 @@ impl DisplayAs for HashJoinExec {
} else {
""
};
let display_fetch = self
.fetch
.map_or_else(String::new, |f| format!(", fetch={f}"));
let on = self
.on
.iter()
Expand All @@ -987,13 +1009,14 @@ impl DisplayAs for HashJoinExec {
.join(", ");
write!(
f,
"HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}",
"HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}{}",
self.mode,
self.join_type,
on,
display_filter,
display_projections,
display_null_equality,
display_fetch,
)
}
DisplayFormatType::TreeRender => {
Expand All @@ -1020,6 +1043,10 @@ impl DisplayAs for HashJoinExec {
writeln!(f, "filter={filter}")?;
}

if let Some(fetch) = self.fetch {
writeln!(f, "fetch={fetch}")?;
}

Ok(())
}
}
Expand Down Expand Up @@ -1122,6 +1149,7 @@ impl ExecutionPlan for HashJoinExec {
)?,
// Keep the dynamic filter, bounds accumulator will be reset
dynamic_filter: self.dynamic_filter.clone(),
fetch: self.fetch,
}))
}

Expand All @@ -1145,6 +1173,7 @@ impl ExecutionPlan for HashJoinExec {
cache: self.cache.clone(),
// Reset dynamic filter and bounds accumulator to initial state
dynamic_filter: None,
fetch: self.fetch,
}))
}

Expand Down Expand Up @@ -1312,6 +1341,7 @@ impl ExecutionPlan for HashJoinExec {
build_accumulator,
self.mode,
self.null_aware,
self.fetch,
)))
}

Expand Down Expand Up @@ -1476,12 +1506,53 @@ impl ExecutionPlan for HashJoinExec {
filter: dynamic_filter,
build_accumulator: OnceLock::new(),
}),
fetch: self.fetch,
});
result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
}
}
Ok(result)
}

    fn supports_limit_pushdown(&self) -> bool {
        // A limit cannot be pushed below the join: the children do not see
        // the join condition, so they cannot know how many of their rows are
        // needed to produce `fetch` output rows. The join absorbs the limit
        // itself instead (see `with_fetch`).
        false
    }

    /// The fetch limit this join has absorbed, if any.
    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method really needed ?
If it is not then I'd suggest to remove it.
If the HashJoinExec is already executed then setting a new limit will be confusing/inconsistent unless it is re-executed again.

Alternatively it could be implemented as:

HashJoinExecBuilder::from(self)
    .with_fetch(limit)
    .build()
    .ok()
    .map(|exec| Arc::new(exec) as _)

This way it won't keep the calculated state.

Copy link
Copy Markdown
Contributor Author

@jonathanc-n jonathanc-n Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is needed, the optimizer uses it to pass the limit to the exec node. with_fetch() will only be called during compile time for limit pushdown so the current solution is fine. I will change it to use HashJoinExec builder

// Null-aware anti join requires seeing ALL probe rows to check for NULLs.
// If any probe row has NULL, the output must be empty.
// We can't stop early or we might miss a NULL and return wrong results.
if self.null_aware {
Comment thread
jonathanc-n marked this conversation as resolved.
Outdated
return None;
}

Some(Arc::new(HashJoinExec {
left: Arc::clone(&self.left),
right: Arc::clone(&self.right),
on: self.on.clone(),
filter: self.filter.clone(),
join_type: self.join_type,
join_schema: Arc::clone(&self.join_schema),
left_fut: Arc::clone(&self.left_fut),
random_state: self.random_state.clone(),
mode: self.mode,
metrics: ExecutionPlanMetricsSet::new(),
projection: self.projection.clone(),
column_indices: self.column_indices.clone(),
null_equality: self.null_equality,
null_aware: self.null_aware,
cache: self.cache.clone(),
dynamic_filter: self.dynamic_filter.clone(),
fetch: limit,
}))
}
}

/// Accumulator for collecting min/max bounds from build-side data during hash join.
Expand Down
Loading