From b2ca077603a3e6f0c29aecdaba019941e7ffc62f Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 5 Mar 2026 12:52:01 -0500 Subject: [PATCH] Replace reservation.free()->try_grow() pattern with try_resize() to reduce memory pool interactions. --- datafusion/physical-plan/src/sorts/sort.rs | 50 +++++++++------------- 1 file changed, 20 insertions(+), 30 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index b3ea548d53750..5b64f0b2a6186 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -730,37 +730,27 @@ impl ExternalSorter { // Sort the batch immediately and get all output batches let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?; - // Free the old reservation and grow it to match the actual sorted output size - reservation.free(); + // Resize the reservation to match the actual sorted output size. + // Using try_resize avoids a release-then-reacquire cycle, which + // matters for MemoryPool implementations where grow/shrink have + // non-trivial cost (e.g. JNI calls in Comet). + let total_sorted_size: usize = sorted_batches + .iter() + .map(get_record_batch_memory_size) + .sum(); + reservation + .try_resize(total_sorted_size) + .map_err(Self::err_with_oom_context)?; - Result::<_, DataFusionError>::Ok((schema, sorted_batches, reservation)) - }) - .then({ - move |batches| async move { - match batches { - Ok((schema, sorted_batches, reservation)) => { - // Calculate the total size of sorted batches - let total_sorted_size: usize = sorted_batches - .iter() - .map(get_record_batch_memory_size) - .sum(); - reservation - .try_grow(total_sorted_size) - .map_err(Self::err_with_oom_context)?; - - // Wrap in ReservationStream to hold the reservation - Ok(Box::pin(ReservationStream::new( - Arc::clone(&schema), - Box::pin(RecordBatchStreamAdapter::new( - schema, - futures::stream::iter(sorted_batches.into_iter().map(Ok)), - )), - reservation, - )) as SendableRecordBatchStream) - } - Err(e) => Err(e), - } - } + // Wrap in ReservationStream to hold the reservation + Result::<_, DataFusionError>::Ok(Box::pin(ReservationStream::new( + Arc::clone(&schema), + Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&schema), + futures::stream::iter(sorted_batches.into_iter().map(Ok)), + )), + reservation, + )) as SendableRecordBatchStream) }) .try_flatten() .map(move |batch| match batch {