From ce4a0c2bd0db549c8ff51ad56ad94e4c2da09a0c Mon Sep 17 00:00:00 2001 From: Max Burke Date: Thu, 18 Jun 2026 13:38:19 -0700 Subject: [PATCH 1/2] Do not concatenate batches in the core of the hash join This avoids significant memory pressure on large queries. --- .../physical-plan/src/joins/hash_join/exec.rs | 338 +++++++++++++----- .../src/joins/hash_join/stream.rs | 83 +++-- datafusion/physical-plan/src/joins/utils.rs | 273 +++++++++++++- 3 files changed, 575 insertions(+), 119 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 5ef767ebddb16..cce851e6e6945 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -186,13 +186,23 @@ fn try_create_array_map( /// HashTable and input data for the left (build side) of a join pub(super) struct JoinLeftData { - /// The hash table with indices into `batch` + /// The hash table with flat indices into the logical concatenation of `batches`. /// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown pub(super) map: Arc, - /// The input rows for the build side - batch: RecordBatch, - /// The build side on expressions values - values: Vec, + /// The build-side input rows, kept as the original (un-concatenated) batches. + /// + /// The batches are ordered to match the hash map's flat index space: index `0` + /// is the first row of `batches[0]`, and indices run sequentially through the + /// batches. (This is the reverse of the order in which batches were received, + /// matching the order rows are inserted into the hash map.) Keeping the batches + /// separate avoids copying the entire build side into one contiguous batch. + batches: Vec, + /// Schema of the build-side batches. Retained so typed null/empty arrays can be + /// produced for build-side columns even when `batches` carries no rows. + schema: SchemaRef, + /// The build side on expressions values, laid out per key column then per + /// batch (`values[key_column][batch]`), aligned with `batches`. + values: Vec>, /// Shared bitmap builder for visited left indices visited_indices_bitmap: SharedBitmapBuilder, /// Counter of running probe-threads, potentially @@ -223,13 +233,19 @@ impl JoinLeftData { &self.map } - /// returns a reference to the build side batch - pub(super) fn batch(&self) -> &RecordBatch { - &self.batch + /// returns the build-side batches, ordered to match the hash map flat indices + pub(super) fn batches(&self) -> &[RecordBatch] { + &self.batches + } + + /// returns the schema of the build-side batches + pub(super) fn schema(&self) -> &SchemaRef { + &self.schema } - /// returns a reference to the build side expressions values - pub(super) fn values(&self) -> &[ArrayRef] { + /// returns a reference to the build side expressions values, laid out per key + /// column then per batch (`values()[key_column][batch]`) + pub(super) fn values(&self) -> &[Vec] { &self.values } @@ -595,15 +611,17 @@ impl From<&HashJoinExec> for HashJoinExecBuilder { /// Execution proceeds in 2 stages: /// /// 1. the **build phase** creates a hash table from the tuples of the build side, -/// and single concatenated batch containing data from all fetched record batches. -/// Resulting hash table stores hashed join-key fields for each row as a key, and -/// indices of corresponding rows in concatenated batch. +/// while retaining the fetched record batches as-is (without concatenating them +/// into a single batch). The hash table stores hashed join-key fields for each +/// row as a key, and a flat index into the logical concatenation of the build +/// batches; during the probe phase that flat index is resolved to a +/// `(batch, row)` pair and the rows are gathered with `interleave`. /// /// When using the standard `JoinHashMap`, hash join uses LIFO data structure as a hash table, /// and in order to retain original build-side input order while obtaining data during probe phase, /// hash table is updated by iterating batch sequence in reverse order -- it allows to /// keep rows with smaller indices "on the top" of hash table, and still maintain -/// correct indexing for concatenated build-side data batch. +/// correct indexing into the (reverse-ordered) build-side batches. /// /// Example of build phase for 3 record batches: /// @@ -1906,8 +1924,9 @@ fn should_collect_min_max_for_perfect_hash( /// before updating the filter exactly once. /// /// # Returns -/// `JoinLeftData` containing the hash map, consolidated batch, join key values, -/// visited indices bitmap, and computed bounds (if requested). +/// `JoinLeftData` containing the hash map, the (un-concatenated) build-side +/// batches, join key values, visited indices bitmap, and computed bounds (if +/// requested). #[expect(clippy::too_many_arguments)] async fn collect_left_input( random_state: RandomState, @@ -1982,8 +2001,8 @@ async fn collect_left_input( _ => None, }; - let (join_hash_map, batch, left_values) = - if let Some((array_map, batch, left_value)) = try_create_array_map( + let (join_hash_map, batches, values) = if let Some((array_map, batch, left_value)) = + try_create_array_map( &bounds, &schema, &batches, @@ -1993,71 +2012,94 @@ async fn collect_left_input( config.execution.perfect_hash_join_min_key_density, null_equality, )? { - array_map_created_count.add(1); - metrics.build_mem_used.add(array_map.size()); - - (Map::ArrayMap(array_map), batch, left_value) + array_map_created_count.add(1); + metrics.build_mem_used.add(array_map.size()); + + // The perfect-hash (ArrayMap) path is only taken for small build sides and + // returns a single concatenated batch; store it as a one-element batch list + // so its flat indices map trivially to `(batch 0, row)`. `left_value` holds + // one array per key column, so wrap each in a single-batch vector. + let values = left_value.into_iter().map(|arr| vec![arr]).collect(); + (Map::ArrayMap(array_map), vec![batch], values) + } else { + // Estimation of memory size, required for hashtable, prior to allocation. + // Final result can be verified using `RawTable.allocation_info()` + let fixed_size_u32 = size_of::(); + let fixed_size_u64 = size_of::(); + + // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the + // `u64` indice variant + let mut hashmap: Box = if num_rows > u32::MAX as usize { + let estimated_hashtable_size = + estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?; + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); + Box::new(JoinHashMapU64::with_capacity(num_rows)) } else { - // Estimation of memory size, required for hashtable, prior to allocation. - // Final result can be verified using `RawTable.allocation_info()` - let fixed_size_u32 = size_of::(); - let fixed_size_u64 = size_of::(); - - // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the - // `u64` indice variant - // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown - let mut hashmap: Box = if num_rows > u32::MAX as usize { - let estimated_hashtable_size = - estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?; - reservation.try_grow(estimated_hashtable_size)?; - metrics.build_mem_used.add(estimated_hashtable_size); - Box::new(JoinHashMapU64::with_capacity(num_rows)) - } else { - let estimated_hashtable_size = - estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?; - reservation.try_grow(estimated_hashtable_size)?; - metrics.build_mem_used.add(estimated_hashtable_size); - Box::new(JoinHashMapU32::with_capacity(num_rows)) - }; + let estimated_hashtable_size = + estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?; + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); + Box::new(JoinHashMapU32::with_capacity(num_rows)) + }; - let mut hashes_buffer = Vec::new(); - let mut offset = 0; - - let batches_iter = batches.iter().rev(); - - // Updating hashmap starting from the last batch - for batch in batches_iter.clone() { - hashes_buffer.clear(); - hashes_buffer.resize(batch.num_rows(), 0); - update_hash( - &on_left, - batch, - &mut *hashmap, - offset, - &random_state, - &mut hashes_buffer, - 0, - true, - null_equality, - )?; - offset += batch.num_rows(); - } + // Reverse the collected batches so their order matches the hash map's flat + // index space. The hash map is populated from the last batch to the first + // (so that earlier rows keep the smaller indices); storing the batches in + // that same reversed order lets a flat index resolve to a `(batch, row)` + // pair during the probe phase, without concatenating the build side. + let mut batches = batches; + batches.reverse(); + + // Ensure there is always at least one batch so the build-side column data + // types remain available (e.g. for all-null columns on an empty build side). + if batches.is_empty() { + batches.push(RecordBatch::new_empty(Arc::clone(&schema))); + } - // Merge all batches into a single batch, so we can directly index into the arrays - let batch = concat_batches(&schema, batches_iter.clone())?; + let mut hashes_buffer = Vec::new(); + let mut offset = 0; + // Updating hashmap starting from the (reversed) first batch + for batch in &batches { + hashes_buffer.clear(); + hashes_buffer.resize(batch.num_rows(), 0); + update_hash( + &on_left, + batch, + &mut *hashmap, + offset, + &random_state, + &mut hashes_buffer, + 0, + true, + null_equality, + )?; + offset += batch.num_rows(); + } - let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; + // Evaluate the build-side join keys once per batch, laid out per key column + // then per batch (`values[key_column][batch]`), aligned with `batches`. The + // probe phase gathers matched keys from these arrays by flat index. + let values = on_left + .iter() + .map(|expr| { + batches + .iter() + .map(|batch| expr.evaluate(batch)?.into_array(batch.num_rows())) + .collect::>>() + }) + .collect::>>()?; - (Map::HashMap(hashmap), batch, left_values) - }; + (Map::HashMap(hashmap), batches, values) + }; // Reserve additional memory for visited indices bitmap and create shared builder let visited_indices_bitmap = if with_visited_indices_bitmap { - let bitmap_size = bit_util::ceil(batch.num_rows(), 8); + let bitmap_size = bit_util::ceil(num_rows, 8); reservation.try_grow(bitmap_size)?; metrics.build_mem_used.add(bitmap_size); - let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows()); + let mut bitmap_buffer = BooleanBufferBuilder::new(num_rows); bitmap_buffer.append_n(num_rows, false); bitmap_buffer } else { @@ -2072,12 +2114,13 @@ async fn collect_left_input( // If the build side is small enough we can use IN list pushdown. // If it's too big we fall back to pushing down a reference to the hash table. // See `PushdownStrategy` for more details. - let estimated_size = left_values + let estimated_size = values .iter() + .flatten() .map(|arr| arr.get_array_memory_size()) .sum::(); - if left_values.is_empty() - || left_values[0].is_empty() + if values.is_empty() + || values[0].iter().all(|arr| arr.is_empty()) || estimated_size > config.optimizer.hash_join_inlist_pushdown_max_size || map.num_of_distinct_key() > config @@ -2085,10 +2128,22 @@ async fn collect_left_input( .hash_join_inlist_pushdown_max_distinct_values { PushdownStrategy::Map(Arc::clone(&map)) - } else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? { - PushdownStrategy::InList(in_list_values) } else { - PushdownStrategy::Map(Arc::clone(&map)) + // Building an IN-list needs one contiguous array per key column. This + // branch is only reached for small build sides, so concatenating the + // per-batch key arrays here is cheap. + let join_key_arrays = values + .iter() + .map(|per_batch| { + let refs: Vec<_> = per_batch.iter().map(|a| a.as_ref()).collect(); + Ok(arrow::compute::concat(&refs)?) + }) + .collect::>>()?; + if let Some(in_list_values) = build_struct_inlist_values(&join_key_arrays)? { + PushdownStrategy::InList(in_list_values) + } else { + PushdownStrategy::Map(Arc::clone(&map)) + } } }; @@ -2098,8 +2153,9 @@ async fn collect_left_input( let data = JoinLeftData { map, - batch, - values: left_values, + batches, + schema, + values, visited_indices_bitmap: Mutex::new(visited_indices_bitmap), probe_threads_counter: AtomicUsize::new(probe_threads_count), _reservation: reservation, @@ -3201,6 +3257,74 @@ mod tests { assert_phj_used(&metrics, use_perfect_hash_join_as_possible); } + // Full join where the BUILD (left) side spans multiple record batches, so the + // probe phase must gather build rows across batches (via `interleave`) instead + // of from a single concatenated batch. This exercises the paths unique to a + // multi-batch build side: + // - matched rows: build columns/keys gathered across batches, + // - unmatched probe rows: NULL build indices (interleave placeholder + nullif), + // - unmatched build rows: the final pass gathering real indices across batches. + #[apply(hash_join_exec_configs)] + #[tokio::test] + async fn join_full_multi_batch_build_side( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); + let batch1 = build_table_i32( + ("a1", &vec![1, 2]), + ("b1", &vec![4, 5]), + ("c1", &vec![7, 8]), + ); + // `b1 = 7` has no match on the right. + let batch2 = + build_table_i32(("a1", &vec![3]), ("b1", &vec![7]), ("c1", &vec![9])); + let schema = batch1.schema(); + // Two partitions coalesced into one stream of two batches, so the build side + // is collected as multiple (un-concatenated) batches. + let left = + TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None) + .unwrap(); + let left = Arc::new(CoalescePartitionsExec::new(left)); + + let right = build_table( + ("a2", &vec![10, 20, 30]), + ("b2", &vec![4, 5, 6]), // `b2 = 6` has no match on the left + ("c2", &vec![70, 80, 90]), + ); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b2", &right.schema())?) as _, + )]; + + let (columns, batches, _metrics) = join_collect( + left, + right, + on, + &JoinType::Full, + NullEquality::NullEqualsNothing, + task_ctx, + ) + .await?; + + assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); + + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b2 | c2 | + +----+----+----+----+----+----+ + | | | | 30 | 6 | 90 | + | 1 | 4 | 7 | 10 | 4 | 70 | + | 2 | 5 | 8 | 20 | 5 | 80 | + | 3 | 7 | 9 | | | | + +----+----+----+----+----+----+ + "#); + } + + Ok(()) + } + #[apply(hash_join_exec_configs)] #[tokio::test] async fn join_left_empty_right( @@ -3301,6 +3425,56 @@ mod tests { assert_phj_used(&metrics, use_perfect_hash_join_as_possible); } + // Full join where the BUILD (left) side is empty. The build side stores a single + // empty placeholder batch, so the build-side column data types must come from the + // retained build schema rather than from row data. + #[apply(hash_join_exec_configs)] + #[tokio::test] + async fn join_full_empty_left( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); + let left = build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); + let schema = left.schema(); + let left = TestMemoryExec::try_new_exec(&[vec![left]], schema, None).unwrap(); + let right = build_table( + ("a2", &vec![10, 20, 30]), + ("b2", &vec![4, 5, 6]), + ("c2", &vec![70, 80, 90]), + ); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _, + )]; + let join = join( + left, + right, + on, + &JoinType::Full, + NullEquality::NullEqualsNothing, + ) + .unwrap(); + + let columns = columns(&join.schema()); + assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); + + let stream = join.execute(0, task_ctx).unwrap(); + let batches = common::collect(stream).await.unwrap(); + + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b2 | c2 | + +----+----+----+----+----+----+ + | | | | 10 | 4 | 70 | + | | | | 20 | 5 | 80 | + | | | | 30 | 6 | 90 | + +----+----+----+----+----+----+ + "#); + } + } + #[apply(hash_join_exec_configs)] #[tokio::test] async fn join_left_one( @@ -4644,7 +4818,7 @@ mod tests { let mut build_indices_buffer = Vec::new(); let (l, r, _) = lookup_join_hashmap( &join_hash_map, - &[left_keys_values], + &[vec![left_keys_values]], &[right_keys_values], NullEquality::NullEqualsNothing, &hashes_buffer, @@ -4706,7 +4880,7 @@ mod tests { let mut build_indices_buffer = Vec::new(); let (l, r, _) = lookup_join_hashmap( &join_hash_map, - &[left_keys_values], + &[vec![left_keys_values]], &[right_keys_values], NullEquality::NullEqualsNothing, &hashes_buffer, diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index ed605301ad4a7..734903c4aca4c 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -33,7 +33,8 @@ use crate::joins::hash_join::shared_bounds::{ PartitionBounds, PartitionBuildData, SharedBuildAccumulator, }; use crate::joins::utils::{ - OnceFut, equal_rows_arr, get_final_indices_from_shared_bitmap, matchable_join_keys, + OnceFut, build_batch_offsets, equal_rows_arr_multi, flat_index_to_batch_row, + get_final_indices_from_shared_bitmap, matchable_join_keys, }; use crate::stream::EmptyRecordBatchStream; use crate::{ @@ -41,8 +42,9 @@ use crate::{ hash_utils::create_hashes, joins::utils::{ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType, - StatefulStreamResult, adjust_indices_by_join_type, apply_join_filter_to_indices, - build_batch_empty_build_side, build_batch_from_indices, + StatefulStreamResult, adjust_indices_by_join_type, + apply_join_filter_to_indices_multi, build_batch_empty_build_side, + build_batch_from_indices, build_batch_from_indices_multi, need_produce_result_in_final, }, }; @@ -395,7 +397,7 @@ impl RecordBatchStream for HashJoinStream { #[expect(clippy::too_many_arguments)] pub(super) fn lookup_join_hashmap( build_hashmap: &dyn JoinHashMapType, - build_side_values: &[ArrayRef], + build_side_values: &[Vec], probe_side_values: &[ArrayRef], null_equality: NullEquality, hashes_buffer: &[u64], @@ -421,7 +423,7 @@ pub(super) fn lookup_join_hashmap( // TODO: optimize equal_rows_arr to avoid allocation of intermediate arrays // https://github.com/apache/datafusion/issues/12131 - let (build_indices, probe_indices) = equal_rows_arr( + let (build_indices, probe_indices) = equal_rows_arr_multi( &build_indices_unfiltered, &probe_indices_unfiltered, build_side_values, @@ -523,7 +525,7 @@ impl HashJoinStream { join_type: JoinType, left_data: &JoinLeftData, ) -> HashJoinStreamState { - let build_empty = left_data.batch().num_rows() == 0; + let build_empty = left_data.batches().iter().all(|b| b.num_rows() == 0); // The map can be empty even when the build side has rows: under // `NullEqualsNothing`, build rows with a NULL join key are omitted. For // join types whose every output row requires a build match, that still @@ -784,7 +786,7 @@ impl HashJoinStream { if is_empty { let result = build_batch_empty_build_side( &self.schema, - build_side.left_data.batch(), + build_side.left_data.schema(), &state.batch, &self.column_indices, self.join_type, @@ -841,14 +843,13 @@ impl HashJoinStream { // apply join filter if exists let (left_indices, right_indices) = if let Some(filter) = &self.filter { - apply_join_filter_to_indices( - build_side.left_data.batch(), + apply_join_filter_to_indices_multi( + build_side.left_data.batches(), &state.batch, left_indices, right_indices, filter, JoinSide::Left, - None, self.join_type, )? } else { @@ -903,23 +904,34 @@ impl HashJoinStream { )?; // Build output batch and push to coalescer - let (build_batch, probe_batch, join_side) = - if self.join_type == JoinType::RightMark { - (&state.batch, build_side.left_data.batch(), JoinSide::Right) - } else { - (build_side.left_data.batch(), &state.batch, JoinSide::Left) - }; - - let batch = build_batch_from_indices( - &self.schema, - build_batch, - probe_batch, - &left_indices, - &right_indices, - &self.column_indices, - join_side, - self.join_type, - )?; + let batch = if self.join_type == JoinType::RightMark { + // RightMark output is the probe (right) columns plus a mark column; the + // build-side (left) data is never read here (see `build_join_schema`), + // so a single-batch call with an empty left placeholder is sufficient. + let empty_left = + RecordBatch::new_empty(Arc::clone(build_side.left_data.schema())); + build_batch_from_indices( + &self.schema, + &state.batch, + &empty_left, + &left_indices, + &right_indices, + &self.column_indices, + JoinSide::Right, + self.join_type, + )? + } else { + build_batch_from_indices_multi( + &self.schema, + build_side.left_data.batches(), + &state.batch, + &left_indices, + &right_indices, + &self.column_indices, + JoinSide::Left, + self.join_type, + )? + }; let push_status = self.output_buffer.push_batch(batch)?; @@ -995,15 +1007,20 @@ impl HashJoinStream { .probe_side_non_empty .load(Ordering::Relaxed) { - // Since null_aware validation ensures single column join, we only check the first column - let build_key_column = &build_side.left_data.values()[0]; + // Since null_aware validation ensures single column join, we only check + // the first column. The build-side key values are stored per build batch + // and `left_side` holds flat indices into their logical concatenation, so + // resolve each flat index to its (batch, row) before testing for NULL. + let build_key_batches = &build_side.left_data.values()[0]; + let offsets = build_batch_offsets(build_key_batches.iter().map(|a| a.len())); // Filter out indices where the key is NULL let filtered_indices: Vec = left_side .iter() .filter_map(|idx| { - let idx_usize = idx.unwrap() as usize; - if build_key_column.is_null(idx_usize) { + let (batch, row) = + flat_index_to_batch_row(&offsets, idx.unwrap() as usize); + if build_key_batches[batch].is_null(row) { None // Skip rows with NULL keys } else { Some(idx.unwrap()) @@ -1029,9 +1046,9 @@ impl HashJoinStream { // Push final unmatched indices to output buffer if !left_side.is_empty() { let empty_right_batch = RecordBatch::new_empty(self.right.schema()); - let batch = build_batch_from_indices( + let batch = build_batch_from_indices_multi( &self.schema, - build_side.left_data.batch(), + build_side.left_data.batches(), &empty_right_batch, &left_side, &right_side, diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 7ecace6b0e530..897b732e5a819 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -54,7 +54,7 @@ use arrow::array::{ }; use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::compute::kernels::cmp::eq; -use arrow::compute::{self, FilterBuilder, and, take}; +use arrow::compute::{self, FilterBuilder, and, interleave, take}; use arrow::datatypes::{ ArrowNativeType, Field, Schema, SchemaBuilder, UInt32Type, UInt64Type, }; @@ -1373,13 +1373,218 @@ pub(crate) fn build_batch_from_indices( Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) } +/// Computes prefix-sum offsets for a sequence of build-side batches. +/// +/// `offsets[k]` is the flat row index at which batch `k` begins in the logical +/// concatenation of all batches, and `offsets[n]` is the total row count. This +/// lets a flat build-side index be mapped back to a `(batch, row)` pair without +/// materializing the concatenation. +pub(crate) fn build_batch_offsets( + batch_row_counts: impl Iterator, +) -> Vec { + let mut offsets = vec![0usize]; + let mut acc = 0usize; + for n in batch_row_counts { + acc += n; + offsets.push(acc); + } + offsets +} + +/// Maps a flat build-side row index to a `(batch, row)` pair using `offsets` +/// (see [`build_batch_offsets`]). +#[inline] +pub(crate) fn flat_index_to_batch_row(offsets: &[usize], flat: usize) -> (usize, usize) { + // The largest `k` with `offsets[k] <= flat`. `partition_point` returns the + // count of leading elements satisfying the predicate, i.e. `k + 1`. Using + // `<=` (rather than `<`) naturally skips over empty batches. + let batch = offsets.partition_point(|&o| o <= flat) - 1; + (batch, flat - offsets[batch]) +} + +/// Builds the `(batch, row)` index pairs consumed by [`interleave`] for a set of +/// (possibly null) flat build-side indices. +/// +/// Null slots are assigned a valid placeholder `(batch, row)` so that +/// `interleave` never sees an out-of-bounds index; the value produced for those +/// slots is overwritten with null afterwards by [`take_build_array`]. The +/// placeholder is the location of the first non-null index, so it is only used +/// when at least one index is non-null (callers short-circuit the all-null case). +fn build_interleave_indices( + offsets: &[usize], + indices: &UInt64Array, +) -> Vec<(usize, usize)> { + let placeholder = indices + .iter() + .flatten() + .next() + .map(|flat| flat_index_to_batch_row(offsets, flat as usize)) + .unwrap_or((0, 0)); + + indices + .iter() + .map(|opt| match opt { + Some(flat) => flat_index_to_batch_row(offsets, flat as usize), + None => placeholder, + }) + .collect() +} + +/// Gathers rows from a build-side column physically stored across one or more +/// batches, using flat indices into the logical concatenation of those batches. +/// +/// This is the multi-batch analogue of `take(concat(arrays), indices)`, but it +/// never materializes the concatenation. `pairs` must be `None` when there is a +/// single build batch (the flat index is then the row index directly and the +/// `take` kernel is used) and `Some` precomputed [`interleave`] pairs otherwise. +fn take_build_array( + arrays: &[&dyn Array], + indices: &UInt64Array, + pairs: Option<&[(usize, usize)]>, +) -> Result { + let data_type = arrays[0].data_type(); + // Outer joins can leave every build index null; this also covers an empty + // build side. There is nothing to gather, so produce a typed all-null array. + if indices.null_count() == indices.len() { + return Ok(new_null_array(data_type, indices.len())); + } + match pairs { + // Single build batch: identical to the original `take` path, including + // native propagation of null indices to null output rows. + None => take(arrays[0], indices, None), + Some(pairs) => { + let values = interleave(arrays, pairs)?; + if indices.null_count() == 0 { + Ok(values) + } else { + // Restore nulls in the placeholder slots used for null indices. + let null_mask = compute::is_null(indices)?; + compute::nullif(&values, &null_mask) + } + } + } +} + +/// Multi-batch analogue of [`build_batch_from_indices`]. +/// +/// The build side is provided as a slice of batches (sharing one schema) rather +/// than a single concatenated batch, and `build_indices` are flat indices into +/// the logical concatenation of those batches. Build-side columns are gathered +/// with [`interleave`] (or `take`, for a single batch) instead of `take` over a +/// concatenated batch, avoiding a copy of the entire build side into one +/// contiguous batch. +#[expect(clippy::too_many_arguments)] +pub(crate) fn build_batch_from_indices_multi( + schema: &Schema, + build_batches: &[RecordBatch], + probe_batch: &RecordBatch, + build_indices: &UInt64Array, + probe_indices: &UInt32Array, + column_indices: &[ColumnIndex], + build_side: JoinSide, + join_type: JoinType, +) -> Result { + if schema.fields().is_empty() { + // For RightAnti and RightSemi joins, after `adjust_indices_by_join_type` + // the build_indices were untouched so only probe_indices hold the actual + // row count. + let row_count = match join_type { + JoinType::RightAnti | JoinType::RightSemi => probe_indices.len(), + _ => build_indices.len(), + }; + return new_empty_schema_batch(schema, row_count); + } + + // Precompute the interleave pairs once, shared across all build-side columns. + // Only needed when the build side spans more than one batch. + let build_pairs = (build_batches.len() > 1).then(|| { + let offsets = + build_batch_offsets(build_batches.iter().map(RecordBatch::num_rows)); + build_interleave_indices(&offsets, build_indices) + }); + + let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); + + for column_index in column_indices { + let array = if column_index.side == JoinSide::None { + // For mark joins, the mark column is true when the index is not null. + Arc::new(compute::is_not_null(probe_indices)?) + } else if column_index.side == build_side { + let arrays: Vec<&dyn Array> = build_batches + .iter() + .map(|b| b.column(column_index.index).as_ref()) + .collect(); + take_build_array(&arrays, build_indices, build_pairs.as_deref())? + } else { + let array = probe_batch.column(column_index.index); + if array.is_empty() || probe_indices.null_count() == probe_indices.len() { + assert_eq!(probe_indices.null_count(), probe_indices.len()); + new_null_array(array.data_type(), probe_indices.len()) + } else { + take(array.as_ref(), probe_indices, None)? + } + }; + + columns.push(array); + } + Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) +} + +/// Multi-batch analogue of [`apply_join_filter_to_indices`]. +/// +/// The build side is provided as several batches rather than one concatenated +/// batch; the intermediate batch fed to the filter is assembled with +/// [`build_batch_from_indices_multi`]. The `max_intermediate_size` chunking path +/// of the single-batch variant is not needed by the hash join and is omitted. +pub(crate) fn apply_join_filter_to_indices_multi( + build_batches: &[RecordBatch], + probe_batch: &RecordBatch, + build_indices: UInt64Array, + probe_indices: UInt32Array, + filter: &JoinFilter, + build_side: JoinSide, + join_type: JoinType, +) -> Result<(UInt64Array, UInt32Array)> { + if build_indices.is_empty() && probe_indices.is_empty() { + return Ok((build_indices, probe_indices)); + }; + + let intermediate_batch = build_batch_from_indices_multi( + filter.schema(), + build_batches, + probe_batch, + &build_indices, + &probe_indices, + filter.column_indices(), + build_side, + join_type, + )?; + + let filter_result = filter + .expression() + .evaluate(&intermediate_batch)? + .into_array(intermediate_batch.num_rows())?; + + let mask = as_boolean_array(&filter_result)?; + + let left_filtered = compute::filter(&build_indices, mask)?; + let right_filtered = compute::filter(&probe_indices, mask)?; + Ok(( + downcast_array(left_filtered.as_ref()), + downcast_array(right_filtered.as_ref()), + )) +} + /// Returns a new [RecordBatch] for a probe batch when no probe row can find a /// match: the build-side map is empty, either because the build side has no /// rows or because none of its rows has a matchable (non-NULL) join key. /// The resulting batch has [Schema] `schema`. +/// +/// `build_schema` is the schema of the (empty) build side, used only to determine +/// the data types of the build-side columns that become all-null. pub(crate) fn build_batch_empty_build_side( schema: &Schema, - build_batch: &RecordBatch, + build_schema: &Schema, probe_batch: &RecordBatch, column_indices: &[ColumnIndex], join_type: JoinType, @@ -1400,7 +1605,7 @@ pub(crate) fn build_batch_empty_build_side( .map(|column_index| match column_index.side { // left -> null array JoinSide::Left => new_null_array( - build_batch.column(column_index.index).data_type(), + build_schema.field(column_index.index).data_type(), num_rows, ), // right -> respective right array @@ -2214,6 +2419,66 @@ pub(super) fn equal_rows_arr( )) } +/// Multi-batch analogue of [`equal_rows_arr`]. +/// +/// Build-side join-key columns are laid out per key column then per build batch +/// (`left_arrays[key][batch]`), matching the representation stored in +/// `JoinLeftData`. `indices_left` are flat indices into the logical concatenation +/// of the build batches; they always come straight from the hash table and so are +/// never null. Matched build keys are gathered with [`interleave`] (or `take`, +/// for a single batch) rather than `take` over a concatenated key array. +pub(super) fn equal_rows_arr_multi( + indices_left: &UInt64Array, + indices_right: &UInt32Array, + left_arrays: &[Vec], + right_arrays: &[ArrayRef], + null_equality: NullEquality, +) -> Result<(UInt64Array, UInt32Array)> { + let mut iter = left_arrays.iter().zip(right_arrays.iter()); + + let Some((first_left, first_right)) = iter.next() else { + return Ok((Vec::::new().into(), Vec::::new().into())); + }; + + // Interleave pairs are shared across all key columns; only needed when the + // build side spans more than one batch. + let pairs = (first_left.len() > 1).then(|| { + let offsets = build_batch_offsets(first_left.iter().map(|a| a.len())); + build_interleave_indices(&offsets, indices_left) + }); + + let take_left = |arrays: &[ArrayRef]| -> Result { + let refs: Vec<&dyn Array> = arrays.iter().map(|a| a.as_ref()).collect(); + take_build_array(&refs, indices_left, pairs.as_deref()) + }; + + let arr_left = take_left(first_left)?; + let arr_right = take(first_right.as_ref(), indices_right, None)?; + + let mut equal: BooleanArray = eq_dyn_null(&arr_left, &arr_right, null_equality)?; + + // Use map and try_fold to iterate over the remaining pairs of arrays. + // In each iteration, the matched rows are gathered and their equality is + // determined; the results are folded together with `and`. + equal = iter + .map(|(left, right)| { + let arr_left = take_left(left)?; + let arr_right = take(right.as_ref(), indices_right, None)?; + eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), null_equality) + }) + .try_fold(equal, |acc, equal2| and(&acc, &equal2?))?; + + let filter_builder = FilterBuilder::new(&equal).optimize().build(); + + let left_filtered = filter_builder.filter(indices_left)?; + let right_filtered = filter_builder.filter(indices_right)?; + + Ok(( + downcast_array(left_filtered.as_ref()), + downcast_array(right_filtered.as_ref()), + )) +} + // version of eq_dyn supporting equality on null arrays fn eq_dyn_null( left: &dyn Array, @@ -4308,7 +4573,7 @@ mod tests { let result = build_batch_empty_build_side( &empty_schema, - &build_batch, + build_batch.schema_ref(), &probe_batch, &[], // no column indices with empty projection JoinType::Right, From d52732c68032b9a0359053820ca0649baff4e4f4 Mon Sep 17 00:00:00 2001 From: Max Burke Date: Thu, 18 Jun 2026 14:32:43 -0700 Subject: [PATCH 2/2] Keep record batches unconcatenated to relieve memory pressure of large joins --- .../src/joins/nested_loop_join.rs | 198 ++++++++++++++---- .../piecewise_merge_join/classic_join.rs | 67 +++++- .../src/joins/piecewise_merge_join/exec.rs | 106 +++++++--- 3 files changed, 305 insertions(+), 66 deletions(-) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index d13e172352f6d..38fc4bac62c1d 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -24,8 +24,8 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::Poll; use super::utils::{ - asymmetric_join_output_partitioning, need_produce_result_in_final, - reorder_output_after_swap, swap_join_projection, + asymmetric_join_output_partitioning, build_batch_offsets, flat_index_to_batch_row, + need_produce_result_in_final, reorder_output_after_swap, swap_join_projection, }; use crate::common::can_project; use crate::execution_plan::{EmissionType, boundedness_from_children}; @@ -54,7 +54,8 @@ use arrow::array::{ }; use arrow::buffer::BooleanBuffer; use arrow::compute::{ - BatchCoalescer, concat_batches, filter, filter_record_batch, not, take, + BatchCoalescer, concat_batches, filter, filter_record_batch, interleave_record_batch, + not, take, take_record_batch, }; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; @@ -773,9 +774,19 @@ impl EmbeddedProjection for NestedLoopJoinExec { /// Left (build-side) data pub(crate) struct JoinLeftData { - /// Build-side data collected to single batch - batch: RecordBatch, - /// Shared bitmap builder for visited left indices + /// Build-side data kept as the original (un-concatenated) batches. + /// + /// Rows are addressed by a flat index into the logical concatenation of these + /// batches; [`JoinLeftData::locate`] resolves a flat index to a `(batch, row)` + /// pair. Keeping the batches separate avoids copying the whole build side into + /// one contiguous batch. + batches: Vec, + /// Build-side schema. Retained so it is available even when `batches` is empty. + schema: SchemaRef, + /// Prefix-sum offsets: `offsets[k]` is the first flat row index of `batches[k]`, + /// and `offsets[batches.len()]` is the total row count. + offsets: Vec, + /// Shared bitmap builder for visited left indices (flat-indexed) bitmap: SharedBitmapBuilder, /// Counter of running probe-threads, potentially able to update `bitmap` probe_threads_counter: AtomicUsize, @@ -788,21 +799,63 @@ pub(crate) struct JoinLeftData { impl JoinLeftData { pub(crate) fn new( - batch: RecordBatch, + batches: Vec, + schema: SchemaRef, + offsets: Vec, bitmap: SharedBitmapBuilder, probe_threads_counter: AtomicUsize, reservation: MemoryReservation, ) -> Self { Self { - batch, + batches, + schema, + offsets, bitmap, probe_threads_counter, reservation, } } - pub(crate) fn batch(&self) -> &RecordBatch { - &self.batch + /// Total number of build-side rows across all batches + pub(crate) fn num_rows(&self) -> usize { + *self.offsets.last().unwrap() + } + + /// Build-side schema + pub(crate) fn schema(&self) -> &SchemaRef { + &self.schema + } + + /// The build-side batch at index `batch_idx` + pub(crate) fn batch_at(&self, batch_idx: usize) -> &RecordBatch { + &self.batches[batch_idx] + } + + /// Flat-index offsets (see field docs) + pub(crate) fn offsets(&self) -> &[usize] { + &self.offsets + } + + /// Resolve a flat build-side row index to a `(batch, row)` pair + pub(crate) fn locate(&self, flat: usize) -> (usize, usize) { + flat_index_to_batch_row(&self.offsets, flat) + } + + /// Gather the build-side rows at the given flat indices into a single batch. + /// Single batch: a zero-copy `take`; multiple batches: gathered with + /// `interleave` (the build side is not concatenated). + pub(crate) fn gather(&self, indices: &UInt32Array) -> Result { + if self.batches.len() == 1 { + Ok(take_record_batch(&self.batches[0], indices)?) + } else { + let pairs: Vec<(usize, usize)> = indices + .values() + .iter() + .map(|&i| flat_index_to_batch_row(&self.offsets, i as usize)) + .collect(); + let refs: Vec<&RecordBatch> = self.batches.iter().collect(); + Ok(interleave_record_batch(&refs, &pairs)?) + } } pub(crate) fn bitmap(&self) -> &SharedBitmapBuilder { @@ -845,11 +898,14 @@ async fn collect_left_input( ) .await?; - let merged_batch = concat_batches(&schema, &batches)?; + // Keep the build-side batches un-concatenated; rows are addressed by a flat + // index into their logical concatenation via these prefix-sum offsets. + let offsets = build_batch_offsets(batches.iter().map(RecordBatch::num_rows)); + let num_rows = *offsets.last().unwrap(); // Reserve memory for visited_left_side bitmap if required by join type let visited_left_side = if with_visited_left_side { - let n_rows = merged_batch.num_rows(); + let n_rows = num_rows; let buffer_size = n_rows.div_ceil(8); reservation.try_grow(buffer_size)?; metrics.build_mem_used.add(buffer_size); @@ -862,7 +918,9 @@ async fn collect_left_input( }; Ok(JoinLeftData::new( - merged_batch, + batches, + schema, + offsets, Mutex::new(visited_left_side), AtomicUsize::new(probe_threads_count), reservation, @@ -1699,8 +1757,15 @@ impl NestedLoopJoinStream { // The actual memory tracking is managed by the Active state's reservation. let dummy_reservation = active.reservation.new_empty(); + // This memory-limited path already merges the pending chunk into a single + // batch; store it as a one-element batch list so flat indices map directly. + let left_schema = merged_batch.schema(); + let batches = vec![merged_batch]; + let offsets = build_batch_offsets(batches.iter().map(RecordBatch::num_rows)); let left_data = JoinLeftData::new( - merged_batch, + batches, + left_schema, + offsets, Mutex::new(visited_left_side), // In memory-limited mode, only 1 probe thread per chunk AtomicUsize::new(1), @@ -1800,7 +1865,7 @@ impl NestedLoopJoinStream { if let (Ok(left_data), Some(right_batch)) = (self.get_left_data(), self.current_right_batch.as_ref()) { - let left_rows = left_data.batch().num_rows(); + let left_rows = left_data.num_rows(); let right_rows = right_batch.num_rows(); self.metrics.selectivity.add_total(left_rows * right_rows); } @@ -2109,7 +2174,7 @@ impl NestedLoopJoinStream { .clone(); // stop probing, the caller will go to the next state - if self.left_probe_idx >= left_data.batch().num_rows() { + if self.left_probe_idx >= left_data.num_rows() { return Ok(false); } @@ -2138,7 +2203,7 @@ impl NestedLoopJoinStream { // batch. let l_row_count = std::cmp::min( l_row_cnt_ratio, - left_data.batch().num_rows() - self.left_probe_idx, + left_data.num_rows() - self.left_probe_idx, ); debug_assert!( @@ -2207,6 +2272,12 @@ impl NestedLoopJoinStream { (0..l_row_count).flat_map(|_| 0..right_rows as u32), ); + // The build side is stored as several un-concatenated batches. Gather the + // selected left rows into a single batch once (row `k` is the build-side row + // at flat index `left_indices[k]`) and reuse it for both filter evaluation + // and output construction below. + let left_gathered = left_data.gather(&left_indices)?; + debug_assert!( left_indices.len() == right_indices.len() && right_indices.len() == total_rows, @@ -2228,8 +2299,7 @@ impl NestedLoopJoinStream { Vec::with_capacity(filter.column_indices().len()); for column_index in filter.column_indices() { let array = if column_index.side == JoinSide::Left { - let col = left_data.batch().column(column_index.index); - take(col.as_ref(), &left_indices, None)? + Arc::clone(left_gathered.column(column_index.index)) } else { let col = right_batch.column(column_index.index); take(col.as_ref(), &right_indices, None)? @@ -2349,8 +2419,7 @@ impl NestedLoopJoinStream { Vec::with_capacity(self.output_schema.fields().len()); for column_index in &self.column_indices { let array = if column_index.side == JoinSide::Left { - let col = left_data.batch().column(column_index.index); - take(col.as_ref(), &left_indices, None)? + Arc::clone(left_gathered.column(column_index.index)) } else { let col = right_batch.column(column_index.index); take(col.as_ref(), &right_indices, None)? @@ -2379,13 +2448,13 @@ impl NestedLoopJoinStream { return Ok(None); } + // Resolve the flat left-row index to the batch it lives in and the row + // offset within that batch (the build side is not concatenated). + let (left_batch_idx, left_row_idx) = left_data.locate(l_index); + let left_batch = left_data.batch_at(left_batch_idx); + let cur_right_bitmap = if let Some(filter) = &self.join_filter { - apply_filter_to_row_join_batch( - left_data.batch(), - l_index, - right_batch, - filter, - )? + apply_filter_to_row_join_batch(left_batch, left_row_idx, right_batch, filter)? } else { BooleanArray::from(vec![true; right_row_count]) }; @@ -2413,8 +2482,8 @@ impl NestedLoopJoinStream { // Use the optimized approach similar to build_intermediate_batch_for_single_left_row let join_batch = build_row_join_batch( &self.output_schema, - left_data.batch(), - l_index, + left_batch, + left_row_idx, right_batch, Some(cur_right_bitmap), &self.column_indices, @@ -2429,7 +2498,7 @@ impl NestedLoopJoinStream { /// false -> next state (Done) fn process_left_unmatched(&mut self) -> Result { let left_data = self.get_left_data()?; - let left_batch = left_data.batch(); + let num_rows = left_data.num_rows(); // ======== // Check early return conditions @@ -2438,7 +2507,7 @@ impl NestedLoopJoinStream { // Early return if join type can't have unmatched rows let join_type_no_produce_left = !need_produce_result_in_final(self.join_type); // Stop processing unmatched rows, the caller will go to the next state - let finished = self.left_emit_idx >= left_batch.num_rows(); + let finished = self.left_emit_idx >= num_rows; // `ProbeEnd` already recorded whether this stream emits unmatched-left // rows. Every probe partition passes through this state, but only the @@ -2450,10 +2519,14 @@ impl NestedLoopJoinStream { // ======== // Process unmatched rows and push the result into output_buffer - // Each time, the number to process is up to batch size + // Each time, the number to process is up to batch size. Because the build + // side is stored as separate batches, also clamp the range to the batch + // containing `start_idx` so the slice stays within a single batch. // ======== let start_idx = self.left_emit_idx; - let end_idx = std::cmp::min(start_idx + self.batch_size, left_batch.num_rows()); + let (start_batch_idx, _) = left_data.locate(start_idx); + let batch_end = left_data.offsets()[start_batch_idx + 1]; + let end_idx = std::cmp::min(start_idx + self.batch_size, batch_end); if let Some(batch) = self.process_left_unmatched_range(left_data, start_idx, end_idx)? @@ -2491,9 +2564,13 @@ impl NestedLoopJoinStream { } // Slice both left batch, and bitmap to range [start_idx, end_idx) - // The range is bit index (not byte) - let left_batch = left_data.batch(); - let left_batch_sliced = left_batch.slice(start_idx, end_idx - start_idx); + // The range is bit index (not byte). The caller guarantees the range lies + // within a single build batch, so resolve `start_idx` to that batch and + // slice it directly (no concatenation of the build side). + let (batch_idx, local_start) = left_data.locate(start_idx); + let left_batch_sliced = left_data + .batch_at(batch_idx) + .slice(local_start, end_idx - start_idx); // Can this be more efficient? let mut bitmap_sliced = BooleanBufferBuilder::new(end_idx - start_idx); @@ -2537,7 +2614,7 @@ impl NestedLoopJoinStream { let cur_right_batch = unwrap_or_internal_err!(right_batch); let left_data = self.get_left_data()?; - let left_schema = left_data.batch().schema(); + let left_schema = Arc::clone(left_data.schema()); let res = build_unmatched_batch( &self.output_schema, @@ -3367,6 +3444,53 @@ pub(crate) mod tests { Ok(()) } + // Full join where the BUILD (left) side is split across multiple record + // batches, so the build side is collected without concatenation. Exercises + // resolving flat left-row indices to `(batch, row)` for matched rows and + // slicing unmatched-left ranges within a single batch across batch boundaries. + #[rstest] + #[tokio::test] + async fn join_full_with_filter_multi_batch_left( + #[values(1, 2, 16)] batch_size: usize, + ) -> Result<()> { + let task_ctx = new_task_ctx(batch_size); + // Split the build (left) side into single-row batches. + let left = build_table( + ("a1", &vec![5, 9, 11]), + ("b1", &vec![5, 8, 8]), + ("c1", &vec![50, 90, 110]), + Some(1), + Vec::new(), + ); + let right = build_right_table(); + + let filter = prepare_join_filter(); + let (columns, batches, metrics) = multi_partitioned_join_collect( + left, + right, + &JoinType::Full, + Some(filter), + task_ctx, + ) + .await?; + assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); + allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+-----+----+----+-----+ + | a1 | b1 | c1 | a2 | b2 | c2 | + +----+----+-----+----+----+-----+ + | | | | 10 | 10 | 100 | + | | | | 12 | 10 | 40 | + | 11 | 8 | 110 | | | | + | 5 | 5 | 50 | 2 | 2 | 80 | + | 9 | 8 | 90 | | | | + +----+----+-----+----+----+-----+ + "#)); + + assert_join_metrics!(metrics, 5); + + Ok(()) + } + #[rstest] #[tokio::test] async fn join_left_semi_with_filter( diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs index 36a043cc7d16b..9ccde7bce0ec1 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs @@ -354,8 +354,7 @@ impl ClassicPWMJStream { true, ); - let new_buffered_batch = - take_record_batch(buffered_data.batch(), &buffered_indices)?; + let new_buffered_batch = buffered_data.gather_indices(&buffered_indices)?; let mut buffered_columns = new_buffered_batch.columns().to_vec(); let streamed_columns: Vec = self @@ -609,8 +608,7 @@ fn build_matched_indices_and_set_buffered_bitmap( let new_buffered_batch = buffered_side .buffered_data - .batch() - .slice(buffered_range.0, buffered_range.1); + .gather_range(buffered_range.0, buffered_range.1)?; let mut buffered_columns = new_buffered_batch.columns().to_vec(); let indices = UInt32Array::from_value(streamed_range.0 as u32, streamed_range.1); @@ -662,7 +660,7 @@ mod tests { }; use arrow::array::{Date32Array, Date64Array}; use arrow_schema::{DataType, Field}; - use datafusion_common::test_util::batches_to_string; + use datafusion_common::test_util::{batches_to_sort_string, batches_to_string}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{PhysicalExpr, expressions::Column}; use insta::assert_snapshot; @@ -819,6 +817,65 @@ mod tests { Ok(()) } + // The buffered side may arrive as several record batches. Since it is no longer + // concatenated into one batch, rows are gathered across batches with + // `interleave` (matched suffix ranges and unmatched buffered rows). This test + // runs each join type with the buffered (left) side provided both as a single + // batch (the original `slice`/`take` path) and as the same data split into + // several batches (the `interleave` path), and asserts the results are equal. + #[tokio::test] + async fn join_buffered_multi_batch_equals_single_batch() -> Result<()> { + // Buffered (left) side, globally sorted descending on `b1` (required for `<`). + let a1 = vec![1, 2, 3, 4, 5]; + let b1 = vec![10, 8, 6, 4, 2]; + let c1 = vec![70, 80, 90, 100, 110]; + + // Probe (right) side. + let a2 = vec![10, 20, 30, 40]; + let b2 = vec![9, 7, 5, 3]; + let c2 = vec![700, 800, 900, 1000]; + + for join_type in [ + JoinType::Inner, + JoinType::Left, + JoinType::Right, + JoinType::Full, + ] { + // Single batch buffered side (original slice/take path). + let left_single = build_table(("a1", &a1), ("b1", &b1), ("c1", &c1)); + let right = build_table(("a2", &a2), ("b2", &b2), ("c2", &c2)); + let on = ( + Arc::new(Column::new_with_schema("b1", &left_single.schema())?) as _, + Arc::new(Column::new_with_schema("b2", &right.schema())?) as _, + ); + let (_, single_batches) = + join_collect(left_single, right, on, Operator::Lt, join_type).await?; + + // Same data, buffered side split into three batches (interleave path). + let batch = build_table_i32(("a1", &a1), ("b1", &b1), ("c1", &c1)); + let schema = batch.schema(); + let buffered_batches = + vec![batch.slice(0, 2), batch.slice(2, 2), batch.slice(4, 1)]; + let left_multi = + TestMemoryExec::try_new_exec(&[buffered_batches], schema, None).unwrap(); + let right = build_table(("a2", &a2), ("b2", &b2), ("c2", &c2)); + let on = ( + Arc::new(Column::new_with_schema("b1", &left_multi.schema())?) as _, + Arc::new(Column::new_with_schema("b2", &right.schema())?) as _, + ); + let (_, multi_batches) = + join_collect(left_multi, right, on, Operator::Lt, join_type).await?; + + assert_eq!( + batches_to_sort_string(&single_batches), + batches_to_sort_string(&multi_batches), + "multi-batch buffered result differs from single-batch for {join_type:?}" + ); + } + + Ok(()) + } + #[tokio::test] async fn join_inner_less_than_unsorted() -> Result<()> { // +----+----+----+ diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs index 50e9252a21131..62688913a87be 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs @@ -17,8 +17,8 @@ use arrow::array::Array; use arrow::{ - array::{ArrayRef, BooleanBufferBuilder, RecordBatch}, - compute::concat_batches, + array::{ArrayRef, BooleanBufferBuilder, RecordBatch, UInt64Array}, + compute::{concat, interleave_record_batch, take_record_batch}, util::bit_util, }; use arrow_schema::{SchemaRef, SortOptions}; @@ -49,7 +49,9 @@ use crate::joins::piecewise_merge_join::classic_join::{ use crate::joins::piecewise_merge_join::utils::{ build_visited_indices_map, is_existence_join, is_right_existence_join, }; -use crate::joins::utils::asymmetric_join_output_partitioning; +use crate::joins::utils::{ + asymmetric_join_output_partitioning, build_batch_offsets, flat_index_to_batch_row, +}; use crate::metrics::MetricsSet; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlanProperties, check_if_same_properties, @@ -665,27 +667,40 @@ async fn build_buffered_data( }) .await?; - let single_batch = concat_batches(&schema, batches.iter())?; - - // Evaluate physical expression on the buffered side. - let buffered_values = on_buffered - .evaluate(&single_batch)? - .into_array(single_batch.num_rows())?; - - // We add the single batch size + the memory of the join keys - // size of the size estimation - let size_estimation = get_record_batch_memory_size(&single_batch) - + buffered_values.get_array_memory_size(); + // Keep the buffered batches un-concatenated. Rows are addressed by a flat index + // into their logical concatenation via these prefix-sum offsets. + let mut batches = batches; + if batches.is_empty() { + // Guarantee at least one batch so build-side column types are always + // available (e.g. for gathering rows on an empty buffered side). + batches.push(RecordBatch::new_empty(Arc::clone(&schema))); + } + let offsets = build_batch_offsets(batches.iter().map(RecordBatch::num_rows)); + + // Evaluate the join key on each buffered batch and concatenate just that single + // (sorted) key column. The merge algorithm needs the key as one contiguous, + // globally-sorted array; concatenating one column is far cheaper than copying + // every column of the buffered side into a single batch. + let key_arrays = batches + .iter() + .map(|b| on_buffered.evaluate(b)?.into_array(b.num_rows())) + .collect::>>()?; + let buffered_values = + concat(&key_arrays.iter().map(|a| a.as_ref()).collect::>())?; + + // The buffered batches were already reserved as they were collected above; only + // the concatenated key column is newly allocated here. + let size_estimation = buffered_values.get_array_memory_size(); reservation.try_grow(size_estimation)?; metrics.build_mem_used.add(size_estimation); // Created visited indices bitmap only if the join type requires it let visited_indices_bitmap = if build_map { - let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8); + let bitmap_size = bit_util::ceil(num_rows, 8); reservation.try_grow(bitmap_size)?; metrics.build_mem_used.add(bitmap_size); - let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows()); + let mut bitmap_buffer = BooleanBufferBuilder::new(num_rows); bitmap_buffer.append_n(num_rows, false); bitmap_buffer } else { @@ -693,7 +708,8 @@ async fn build_buffered_data( }; let buffered_data = BufferedSideData::new( - single_batch, + batches, + offsets, buffered_values, Mutex::new(visited_indices_bitmap), remaining_partitions, @@ -704,7 +720,14 @@ async fn build_buffered_data( } pub(super) struct BufferedSideData { - pub(super) batch: RecordBatch, + /// Buffered (build-side) rows, kept as the original un-concatenated batches. + /// Rows are addressed by a flat index into their logical concatenation. + batches: Vec, + /// Prefix-sum offsets: `offsets[k]` is the first flat index of `batches[k]`, + /// and `offsets[batches.len()]` is the total row count. + offsets: Vec, + /// The (globally sorted) join-key column, concatenated across all buffered + /// batches into a single contiguous array for the merge. values: ArrayRef, pub(super) visited_indices_bitmap: SharedBitmapBuilder, pub(super) remaining_partitions: AtomicUsize, @@ -713,14 +736,16 @@ pub(super) struct BufferedSideData { impl BufferedSideData { pub(super) fn new( - batch: RecordBatch, + batches: Vec, + offsets: Vec, values: ArrayRef, visited_indices_bitmap: SharedBitmapBuilder, remaining_partitions: usize, reservation: MemoryReservation, ) -> Self { Self { - batch, + batches, + offsets, values, visited_indices_bitmap, remaining_partitions: AtomicUsize::new(remaining_partitions), @@ -728,13 +753,46 @@ impl BufferedSideData { } } - pub(super) fn batch(&self) -> &RecordBatch { - &self.batch - } - pub(super) fn values(&self) -> &ArrayRef { &self.values } + + /// Gather a contiguous logical row range `[start, start + count)` of the + /// buffered side into a single batch. When the range lies within one batch + /// this is a zero-copy slice; otherwise it is materialized with `interleave`. + pub(super) fn gather_range(&self, start: usize, count: usize) -> Result { + if count == 0 { + return Ok(self.batches[0].slice(0, 0)); + } + let (first_batch, first_row) = flat_index_to_batch_row(&self.offsets, start); + let (last_batch, _) = flat_index_to_batch_row(&self.offsets, start + count - 1); + if first_batch == last_batch { + // Range is within a single batch: slice it directly (zero-copy). + Ok(self.batches[first_batch].slice(first_row, count)) + } else { + let pairs: Vec<(usize, usize)> = (start..start + count) + .map(|i| flat_index_to_batch_row(&self.offsets, i)) + .collect(); + let refs: Vec<&RecordBatch> = self.batches.iter().collect(); + Ok(interleave_record_batch(&refs, &pairs)?) + } + } + + /// Gather arbitrary buffered rows (by flat index) into a single batch. + pub(super) fn gather_indices(&self, indices: &UInt64Array) -> Result { + if self.batches.len() == 1 { + // Single batch: identical to the original `take_record_batch` path. + Ok(take_record_batch(&self.batches[0], indices)?) + } else { + let pairs: Vec<(usize, usize)> = indices + .values() + .iter() + .map(|&i| flat_index_to_batch_row(&self.offsets, i as usize)) + .collect(); + let refs: Vec<&RecordBatch> = self.batches.iter().collect(); + Ok(interleave_record_batch(&refs, &pairs)?) + } + } } pub(super) enum BufferedSide {