-
Notifications
You must be signed in to change notification settings - Fork 2.1k
feat: Push limit into hash join #20228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
832a79c
78e572c
5ff7a99
9ce44f3
3b46fd4
5c94567
67638c0
006a1ac
be992f5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -258,6 +258,8 @@ pub struct HashJoinExecBuilder { | |
| partition_mode: PartitionMode, | ||
| null_equality: NullEquality, | ||
| null_aware: bool, | ||
| /// Maximum number of rows to return | ||
| fetch: Option<usize>, | ||
| } | ||
|
|
||
| impl HashJoinExecBuilder { | ||
|
|
@@ -278,6 +280,7 @@ impl HashJoinExecBuilder { | |
| join_type, | ||
| null_equality: NullEquality::NullEqualsNothing, | ||
| null_aware: false, | ||
| fetch: None, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -316,6 +319,12 @@ impl HashJoinExecBuilder { | |
| self | ||
| } | ||
|
|
||
| /// Set the fetch limit, i.e. the maximum number of rows the join should return. | ||
| pub fn with_fetch(mut self, fetch: Option<usize>) -> Self { | ||
| self.fetch = fetch; | ||
| self | ||
| } | ||
|
|
||
| /// Build resulting execution plan. | ||
| pub fn build(self) -> Result<HashJoinExec> { | ||
| let Self { | ||
|
|
@@ -328,6 +337,7 @@ impl HashJoinExecBuilder { | |
| partition_mode, | ||
| null_equality, | ||
| null_aware, | ||
| fetch, | ||
| } = self; | ||
|
|
||
| let left_schema = left.schema(); | ||
|
|
@@ -393,6 +403,7 @@ impl HashJoinExecBuilder { | |
| null_aware, | ||
| cache, | ||
| dynamic_filter: None, | ||
| fetch, | ||
| }) | ||
| } | ||
| } | ||
|
|
@@ -409,6 +420,7 @@ impl From<&HashJoinExec> for HashJoinExecBuilder { | |
| partition_mode: exec.mode, | ||
| null_equality: exec.null_equality, | ||
| null_aware: exec.null_aware, | ||
| fetch: exec.fetch, | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -646,6 +658,8 @@ pub struct HashJoinExec { | |
| /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result. | ||
| /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates. | ||
| dynamic_filter: Option<HashJoinExecDynamicFilter>, | ||
| /// Maximum number of rows to return | ||
| fetch: Option<usize>, | ||
| } | ||
|
|
||
| #[derive(Clone)] | ||
|
|
@@ -760,6 +774,11 @@ impl HashJoinExec { | |
| self.null_equality | ||
| } | ||
|
|
||
| /// Get the fetch limit (maximum number of rows to return) for this join, if one is set | ||
| pub fn fetch(&self) -> Option<usize> { | ||
|
jonathanc-n marked this conversation as resolved.
Outdated
|
||
| self.fetch | ||
| } | ||
|
|
||
| /// Get the dynamic filter expression for testing purposes. | ||
| /// Returns `None` if no dynamic filter has been set. | ||
| /// | ||
|
|
@@ -979,6 +998,9 @@ impl DisplayAs for HashJoinExec { | |
| } else { | ||
| "" | ||
| }; | ||
| let display_fetch = self | ||
| .fetch | ||
| .map_or_else(String::new, |f| format!(", fetch={f}")); | ||
| let on = self | ||
| .on | ||
| .iter() | ||
|
|
@@ -987,13 +1009,14 @@ impl DisplayAs for HashJoinExec { | |
| .join(", "); | ||
| write!( | ||
| f, | ||
| "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}", | ||
| "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}{}", | ||
| self.mode, | ||
| self.join_type, | ||
| on, | ||
| display_filter, | ||
| display_projections, | ||
| display_null_equality, | ||
| display_fetch, | ||
| ) | ||
| } | ||
| DisplayFormatType::TreeRender => { | ||
|
|
@@ -1020,6 +1043,10 @@ impl DisplayAs for HashJoinExec { | |
| writeln!(f, "filter={filter}")?; | ||
| } | ||
|
|
||
| if let Some(fetch) = self.fetch { | ||
| writeln!(f, "fetch={fetch}")?; | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
| } | ||
|
|
@@ -1122,6 +1149,7 @@ impl ExecutionPlan for HashJoinExec { | |
| )?, | ||
| // Keep the dynamic filter, bounds accumulator will be reset | ||
| dynamic_filter: self.dynamic_filter.clone(), | ||
| fetch: self.fetch, | ||
| })) | ||
| } | ||
|
|
||
|
|
@@ -1145,6 +1173,7 @@ impl ExecutionPlan for HashJoinExec { | |
| cache: self.cache.clone(), | ||
| // Reset dynamic filter and bounds accumulator to initial state | ||
| dynamic_filter: None, | ||
| fetch: self.fetch, | ||
| })) | ||
| } | ||
|
|
||
|
|
@@ -1312,6 +1341,7 @@ impl ExecutionPlan for HashJoinExec { | |
| build_accumulator, | ||
| self.mode, | ||
| self.null_aware, | ||
| self.fetch, | ||
| ))) | ||
| } | ||
|
|
||
|
|
@@ -1476,12 +1506,53 @@ impl ExecutionPlan for HashJoinExec { | |
| filter: dynamic_filter, | ||
| build_accumulator: OnceLock::new(), | ||
| }), | ||
| fetch: self.fetch, | ||
| }); | ||
| result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>); | ||
| } | ||
| } | ||
| Ok(result) | ||
| } | ||
|
|
||
| fn supports_limit_pushdown(&self) -> bool { | ||
| // A limit cannot be pushed down through a hash join to its children: | ||
| // the children don't know about the join condition, so they can't | ||
| // determine how many rows they need to produce. | ||
| false | ||
| } | ||
|
|
||
| fn fetch(&self) -> Option<usize> { | ||
| self.fetch | ||
| } | ||
|
|
||
| fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Is this method really needed? Alternatively, it could be implemented as:
HashJoinExecBuilder::from(self)
.with_fetch(limit)
.build()
.ok()
.map(|exec| Arc::new(exec) as _)
This way it won't keep the calculated state.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is needed: the optimizer uses it to pass the limit to the exec node. |
||
| // Null-aware anti join requires seeing ALL probe rows to check for NULLs. | ||
| // If any probe row has NULL, the output must be empty. | ||
| // We can't stop early or we might miss a NULL and return wrong results. | ||
| if self.null_aware { | ||
|
jonathanc-n marked this conversation as resolved.
Outdated
|
||
| return None; | ||
| } | ||
|
|
||
| Some(Arc::new(HashJoinExec { | ||
| left: Arc::clone(&self.left), | ||
| right: Arc::clone(&self.right), | ||
| on: self.on.clone(), | ||
| filter: self.filter.clone(), | ||
| join_type: self.join_type, | ||
| join_schema: Arc::clone(&self.join_schema), | ||
| left_fut: Arc::clone(&self.left_fut), | ||
| random_state: self.random_state.clone(), | ||
| mode: self.mode, | ||
| metrics: ExecutionPlanMetricsSet::new(), | ||
| projection: self.projection.clone(), | ||
| column_indices: self.column_indices.clone(), | ||
| null_equality: self.null_equality, | ||
| null_aware: self.null_aware, | ||
| cache: self.cache.clone(), | ||
| dynamic_filter: self.dynamic_filter.clone(), | ||
| fetch: limit, | ||
| })) | ||
| } | ||
| } | ||
|
|
||
| /// Accumulator for collecting min/max bounds from build-side data during hash join. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.