Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,7 @@ impl BitwiseSortMergeJoinStream {
self.emit_outer_batch()?;
self.pending_boundary =
Some(PendingBoundary::NoFilter { saved_keys });
self.outer_batch = None;

match ready!(self.poll_next_outer_batch(cx)) {
Err(e) => return Poll::Ready(Err(e)),
Expand Down
99 changes: 99 additions & 0 deletions datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4342,6 +4342,105 @@ async fn no_filter_boundary_pending_loses_outer_rows() -> Result<()> {
Ok(())
}

/// Verifies no-filter semi/anti joins when a matching outer key group spans
/// multiple batches and the next outer batch is temporarily unavailable.
///
/// The outer input has an unmatched prefix row followed by a matching key
/// group that continues in the next batch. Both rows with key=1 should be
/// treated as matched. Returning `Pending` before the second batch forces
/// `poll_join` to return and later resume from its top-level state, rather
/// than continuing the same in-progress boundary loop.
#[tokio::test]
async fn no_filter_boundary_pending_with_unmatched_prefix() -> Result<()> {
let left_schema = Arc::new(Schema::new(vec![
Field::new("a1", DataType::Int32, false),
Field::new("b1", DataType::Int32, false),
Field::new("c1", DataType::Int32, false),
]));
let right_schema = Arc::new(Schema::new(vec![
Field::new("a2", DataType::Int32, false),
Field::new("b1", DataType::Int32, false),
Field::new("c2", DataType::Int32, false),
]));

// Key=0 is unmatched. Key=1 matches inner and spans the batch boundary.
let outer_batch1 = RecordBatch::try_new(
Arc::clone(&left_schema),
vec![
Arc::new(Int32Array::from(vec![0, 1])),
Arc::new(Int32Array::from(vec![0, 1])),
Arc::new(Int32Array::from(vec![0, 10])),
],
)?;
let outer_batch2 = RecordBatch::try_new(
Arc::clone(&left_schema),
vec![
Arc::new(Int32Array::from(vec![2])),
Arc::new(Int32Array::from(vec![1])), // same key
Arc::new(Int32Array::from(vec![20])),
],
)?;

// Key=1 matches two outer rows. Key=2 keeps the inner input non-exhausted.
let inner_batch = RecordBatch::try_new(
Arc::clone(&right_schema),
vec![
Arc::new(Int32Array::from(vec![100, 200])),
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(Int32Array::from(vec![50, 60])),
],
)?;

let on_outer: Vec<PhysicalExprRef> = vec![Arc::new(Column::new("b1", 1))];
let on_inner: Vec<PhysicalExprRef> = vec![Arc::new(Column::new("b1", 1))];

for (join_type, expected_a1) in [(LeftSemi, vec![1, 2]), (LeftAnti, vec![0])] {
let outer: SendableRecordBatchStream = Box::pin(PendingStream::new(
vec![outer_batch1.clone(), outer_batch2.clone()],
vec![false, true], // Pending before 2nd outer batch
));
let inner: SendableRecordBatchStream =
Box::pin(PendingStream::new(vec![inner_batch.clone()], vec![false]));

let metrics = ExecutionPlanMetricsSet::new();
let inner_schema = inner.schema();
let (reservation, spill_manager, runtime_env) =
test_stream_resources(inner_schema, &metrics);
let stream = BitwiseSortMergeJoinStream::try_new(
Arc::clone(&left_schema),
vec![SortOptions::default()],
NullEquality::NullEqualsNothing,
outer,
inner,
on_outer.clone(),
on_inner.clone(),
None, // no filter
join_type,
8192,
0,
&metrics,
reservation,
spill_manager,
runtime_env,
)?;

let batches = collect_stream(stream).await?;
let actual_a1 = batches
.iter()
.flat_map(|batch| {
let values = batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
(0..batch.num_rows()).map(|row| values.value(row))
})
.collect::<Vec<_>>();
assert_eq!(actual_a1, expected_a1, "{join_type:?}");
}
Ok(())
}

/// Tests the filtered boundary Pending re-entry: outer key group spans
/// batches with a filter, and poll_next_outer_batch returns Pending.
///
Expand Down
Loading