Skip to content

Commit 8a46c67

Browse files
committed
Fix RightSemi joins
1 parent c333d2d commit 8a46c67

4 files changed

Lines changed: 116 additions & 173 deletions

File tree

datafusion/physical-optimizer/src/join_selection.rs

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ use crate::PhysicalOptimizerRule;
2727
use datafusion_common::config::ConfigOptions;
2828
use datafusion_common::error::Result;
2929
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
30-
use datafusion_common::{internal_err, JoinSide, JoinType};
30+
use datafusion_common::{internal_err, DataFusionError, JoinSide, JoinType};
3131
use datafusion_expr_common::sort_properties::SortProperties;
3232
use datafusion_physical_expr::expressions::Column;
3333
use datafusion_physical_expr::LexOrdering;
3434
use datafusion_physical_plan::execution_plan::EmissionType;
35-
use datafusion_physical_plan::joins::utils::ColumnIndex;
35+
use datafusion_physical_plan::joins::utils::{check_join_is_valid, ColumnIndex};
3636
use datafusion_physical_plan::joins::{
3737
CrossJoinExec, GraceHashJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
3838
StreamJoinPartitionMode, SymmetricHashJoinExec,
@@ -173,6 +173,13 @@ pub(crate) fn try_collect_left(
173173
let left = hash_join.left();
174174
let right = hash_join.right();
175175

176+
// Skip collect-left rewrite if the join currently has inconsistent schemas (e.g. required
177+
// columns were projected away temporarily). This mirrors the legacy hash join behavior where
178+
// collect-left is only attempted once the join inputs are fully valid.
179+
if check_join_is_valid(&left.schema(), &right.schema(), hash_join.on()).is_err() {
180+
return Ok(None);
181+
}
182+
176183
let left_can_collect = ignore_threshold
177184
|| supports_collect_by_thresholds(
178185
&**left,
@@ -191,33 +198,23 @@ pub(crate) fn try_collect_left(
191198
if hash_join.join_type().supports_swap()
192199
&& should_swap_join_order(&**left, &**right)?
193200
{
194-
Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
201+
match hash_join.swap_inputs(PartitionMode::CollectLeft) {
202+
Ok(plan) => Ok(Some(plan)),
203+
Err(err) if is_missing_join_columns(&err) => Ok(None),
204+
Err(err) => Err(err),
205+
}
195206
} else {
196-
Ok(Some(Arc::new(HashJoinExec::try_new(
197-
Arc::clone(left),
198-
Arc::clone(right),
199-
hash_join.on().to_vec(),
200-
hash_join.filter().cloned(),
201-
hash_join.join_type(),
202-
hash_join.projection.clone(),
203-
PartitionMode::CollectLeft,
204-
hash_join.null_equality(),
205-
)?)))
207+
build_collect_left_exec(hash_join, left, right)
206208
}
207209
}
208-
(true, false) => Ok(Some(Arc::new(HashJoinExec::try_new(
209-
Arc::clone(left),
210-
Arc::clone(right),
211-
hash_join.on().to_vec(),
212-
hash_join.filter().cloned(),
213-
hash_join.join_type(),
214-
hash_join.projection.clone(),
215-
PartitionMode::CollectLeft,
216-
hash_join.null_equality(),
217-
)?))),
210+
(true, false) => build_collect_left_exec(hash_join, left, right),
218211
(false, true) => {
219212
if hash_join.join_type().supports_swap() {
220-
hash_join.swap_inputs(PartitionMode::CollectLeft).map(Some)
213+
match hash_join.swap_inputs(PartitionMode::CollectLeft) {
214+
Ok(plan) => Ok(Some(plan)),
215+
Err(err) if is_missing_join_columns(&err) => Ok(None),
216+
Err(err) => Err(err),
217+
}
221218
} else {
222219
Ok(None)
223220
}
@@ -226,6 +223,35 @@ pub(crate) fn try_collect_left(
226223
}
227224
}
228225

226+
fn is_missing_join_columns(err: &DataFusionError) -> bool {
227+
matches!(
228+
err,
229+
DataFusionError::Plan(msg)
230+
if msg.contains("The left or right side of the join does not have all columns")
231+
)
232+
}
233+
234+
fn build_collect_left_exec(
235+
hash_join: &HashJoinExec,
236+
left: &Arc<dyn ExecutionPlan>,
237+
right: &Arc<dyn ExecutionPlan>,
238+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
239+
match HashJoinExec::try_new(
240+
Arc::clone(left),
241+
Arc::clone(right),
242+
hash_join.on().to_vec(),
243+
hash_join.filter().cloned(),
244+
hash_join.join_type(),
245+
hash_join.projection.clone(),
246+
PartitionMode::CollectLeft,
247+
hash_join.null_equality(),
248+
) {
249+
Ok(exec) => Ok(Some(Arc::new(exec))),
250+
Err(err) if is_missing_join_columns(&err) => Ok(None),
251+
Err(err) => Err(err),
252+
}
253+
}
254+
229255
/// Creates a partitioned hash join execution plan, swapping inputs if beneficial.
230256
///
231257
/// Checks if the join order should be swapped based on the join type and input statistics.

datafusion/physical-plan/src/joins/grace_hash_join/exec.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,13 @@ use datafusion_common::{
5353
internal_err, plan_err, project_schema, JoinSide, JoinType, NullEquality, Result,
5454
};
5555
use datafusion_execution::TaskContext;
56-
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
5756
use datafusion_physical_expr::equivalence::{
5857
join_equivalence_properties, ProjectionMapping,
5958
};
6059
use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};
6160
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
6261

63-
use crate::joins::grace_hash_join::stream::{
64-
GraceAccumulator, GraceHashJoinStream, SpillFut,
65-
};
62+
use crate::joins::grace_hash_join::stream::{GraceHashJoinStream, SpillFut};
6663
use crate::joins::hash_join::shared_bounds::SharedBoundsAccumulator;
6764
use crate::metrics::SpillMetrics;
6865
use crate::spill::spill_manager::SpillLocation;
@@ -105,7 +102,6 @@ pub struct GraceHashJoinExec {
105102
/// Set when dynamic filter pushdown is detected in handle_child_pushdown_result.
106103
/// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
107104
dynamic_filter: Option<HashJoinExecDynamicFilter>,
108-
accumulator: Arc<GraceAccumulator>,
109105
}
110106

111107
#[derive(Clone)]
@@ -183,9 +179,6 @@ impl GraceHashJoinExec {
183179
&on,
184180
projection.as_ref(),
185181
)?;
186-
let partitions = left.output_partitioning().partition_count();
187-
let accumulator = GraceAccumulator::new(partitions);
188-
189182
let metrics = ExecutionPlanMetricsSet::new();
190183
// Initialize both dynamic filter and bounds accumulator to None
191184
// They will be set later if dynamic filtering is enabled
@@ -203,7 +196,6 @@ impl GraceHashJoinExec {
203196
null_equality,
204197
cache,
205198
dynamic_filter: None,
206-
accumulator,
207199
})
208200
}
209201

@@ -547,7 +539,6 @@ impl ExecutionPlan for GraceHashJoinExec {
547539
self: Arc<Self>,
548540
children: Vec<Arc<dyn ExecutionPlan>>,
549541
) -> Result<Arc<dyn ExecutionPlan>> {
550-
let new_partition_count = children[0].output_partitioning().partition_count();
551542
Ok(Arc::new(GraceHashJoinExec {
552543
left: Arc::clone(&children[0]),
553544
right: Arc::clone(&children[1]),
@@ -570,12 +561,10 @@ impl ExecutionPlan for GraceHashJoinExec {
570561
)?,
571562
// Keep the dynamic filter, bounds accumulator will be reset
572563
dynamic_filter: self.dynamic_filter.clone(),
573-
accumulator: GraceAccumulator::new(new_partition_count),
574564
}))
575565
}
576566

577567
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
578-
let partition_count = self.left.output_partitioning().partition_count();
579568
Ok(Arc::new(GraceHashJoinExec {
580569
left: Arc::clone(&self.left),
581570
right: Arc::clone(&self.right),
@@ -591,7 +580,6 @@ impl ExecutionPlan for GraceHashJoinExec {
591580
cache: self.cache.clone(),
592581
// Reset dynamic filter and bounds accumulator to initial state
593582
dynamic_filter: None,
594-
accumulator: GraceAccumulator::new(partition_count),
595583
}))
596584
}
597585

@@ -654,7 +642,6 @@ impl ExecutionPlan for GraceHashJoinExec {
654642
let on = self.on.clone();
655643
let spill_left_clone = Arc::clone(&spill_left);
656644
let spill_right_clone = Arc::clone(&spill_right);
657-
let accumulator_clone = Arc::clone(&self.accumulator);
658645
let join_metrics_clone = Arc::clone(&join_metrics);
659646
let spill_fut = OnceFut::new(async move {
660647
let (left_idx, right_idx) = partition_and_spill(
@@ -670,14 +657,16 @@ impl ExecutionPlan for GraceHashJoinExec {
670657
partition,
671658
)
672659
.await?;
673-
accumulator_clone
674-
.report_partition(partition, left_idx.clone(), right_idx.clone())
675-
.await;
676660
Ok(SpillFut::new(partition, left_idx, right_idx))
677661
});
678662

663+
let left_input_schema = self.left.schema();
664+
let right_input_schema = self.right.schema();
665+
679666
Ok(Box::pin(GraceHashJoinStream::new(
680667
self.schema(),
668+
left_input_schema,
669+
right_input_schema,
681670
spill_fut,
682671
spill_left,
683672
spill_right,
@@ -689,7 +678,6 @@ impl ExecutionPlan for GraceHashJoinExec {
689678
column_indices_after_projection,
690679
join_metrics,
691680
context,
692-
Arc::clone(&self.accumulator),
693681
)))
694682
}
695683

@@ -845,7 +833,6 @@ impl ExecutionPlan for GraceHashJoinExec {
845833
filter: dynamic_filter,
846834
bounds_accumulator: OnceLock::new(),
847835
}),
848-
accumulator: Arc::clone(&self.accumulator),
849836
});
850837
result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
851838
}

0 commit comments

Comments
 (0)