From 421e24902d7d96f9b9df90318b8b6f341ad7d6fe Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Fri, 19 Jun 2026 10:38:09 -0400 Subject: [PATCH] . --- .../joins/sort_merge_join/bitwise_stream.rs | 1 + .../src/joins/sort_merge_join/tests.rs | 99 +++++++++++++++++++ 2 files changed, 100 insertions(+) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs index 99aef6ed82a36..a26793f405a5a 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs @@ -1177,6 +1177,7 @@ impl BitwiseSortMergeJoinStream { self.emit_outer_batch()?; self.pending_boundary = Some(PendingBoundary::NoFilter { saved_keys }); + self.outer_batch = None; match ready!(self.poll_next_outer_batch(cx)) { Err(e) => return Poll::Ready(Err(e)), diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs index 338c5111d223d..bc8b15472a63e 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs @@ -4342,6 +4342,105 @@ async fn no_filter_boundary_pending_loses_outer_rows() -> Result<()> { Ok(()) } +/// Verifies no-filter semi/anti joins when a matching outer key group spans +/// multiple batches and the next outer batch is temporarily unavailable. +/// +/// The outer input has an unmatched prefix row followed by a matching key +/// group that continues in the next batch. Both rows with key=1 should be +/// treated as matched. Returning `Pending` before the second batch forces +/// `poll_join` to return and later resume from its top-level state, rather +/// than continuing the same in-progress boundary loop. +#[tokio::test] +async fn no_filter_boundary_pending_with_unmatched_prefix() -> Result<()> { + let left_schema = Arc::new(Schema::new(vec![ + Field::new("a1", DataType::Int32, false), + Field::new("b1", DataType::Int32, false), + Field::new("c1", DataType::Int32, false), + ])); + let right_schema = Arc::new(Schema::new(vec![ + Field::new("a2", DataType::Int32, false), + Field::new("b1", DataType::Int32, false), + Field::new("c2", DataType::Int32, false), + ])); + + // Key=0 is unmatched. Key=1 matches inner and spans the batch boundary. + let outer_batch1 = RecordBatch::try_new( + Arc::clone(&left_schema), + vec![ + Arc::new(Int32Array::from(vec![0, 1])), + Arc::new(Int32Array::from(vec![0, 1])), + Arc::new(Int32Array::from(vec![0, 10])), + ], + )?; + let outer_batch2 = RecordBatch::try_new( + Arc::clone(&left_schema), + vec![ + Arc::new(Int32Array::from(vec![2])), + Arc::new(Int32Array::from(vec![1])), // same key + Arc::new(Int32Array::from(vec![20])), + ], + )?; + + // Key=1 matches two outer rows. Key=2 keeps the inner input non-exhausted. + let inner_batch = RecordBatch::try_new( + Arc::clone(&right_schema), + vec![ + Arc::new(Int32Array::from(vec![100, 200])), + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(Int32Array::from(vec![50, 60])), + ], + )?; + + let on_outer: Vec = vec![Arc::new(Column::new("b1", 1))]; + let on_inner: Vec = vec![Arc::new(Column::new("b1", 1))]; + + for (join_type, expected_a1) in [(LeftSemi, vec![1, 2]), (LeftAnti, vec![0])] { + let outer: SendableRecordBatchStream = Box::pin(PendingStream::new( + vec![outer_batch1.clone(), outer_batch2.clone()], + vec![false, true], // Pending before 2nd outer batch + )); + let inner: SendableRecordBatchStream = + Box::pin(PendingStream::new(vec![inner_batch.clone()], vec![false])); + + let metrics = ExecutionPlanMetricsSet::new(); + let inner_schema = inner.schema(); + let (reservation, spill_manager, runtime_env) = + test_stream_resources(inner_schema, &metrics); + let stream = BitwiseSortMergeJoinStream::try_new( + Arc::clone(&left_schema), + vec![SortOptions::default()], + NullEquality::NullEqualsNothing, + outer, + inner, + on_outer.clone(), + on_inner.clone(), + None, // no filter + join_type, + 8192, + 0, + &metrics, + reservation, + spill_manager, + runtime_env, + )?; + + let batches = collect_stream(stream).await?; + let actual_a1 = batches + .iter() + .flat_map(|batch| { + let values = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + (0..batch.num_rows()).map(|row| values.value(row)) + }) + .collect::>(); + assert_eq!(actual_a1, expected_a1, "{join_type:?}"); + } + Ok(()) +} + /// Tests the filtered boundary Pending re-entry: outer key group spans /// batches with a filter, and poll_next_outer_batch returns Pending. ///