@@ -45,7 +45,7 @@ use crate::{
4545} ;
4646
4747use arrow:: array:: { PrimitiveArray , RecordBatch , RecordBatchOptions } ;
48- use arrow:: compute:: take_arrays;
48+ use arrow:: compute:: { take_arrays, BatchCoalescer } ;
4949use arrow:: datatypes:: { SchemaRef , UInt32Type } ;
5050use datafusion_common:: config:: ConfigOptions ;
5151use datafusion_common:: stats:: Precision ;
@@ -388,6 +388,7 @@ impl RepartitionExecState {
388388 . map ( |( partition, channel) | ( * partition, channel. sender . clone ( ) ) )
389389 . collect ( ) ;
390390
391+ let target_batch_size = context. session_config ( ) . batch_size ( ) ;
391392 let input_task = SpawnedTask :: spawn ( RepartitionExec :: pull_from_input (
392393 stream,
393394 txs,
@@ -396,6 +397,7 @@ impl RepartitionExecState {
396397 // preserve_order depends on partition index to start from 0
397398 if preserve_order { 0 } else { i } ,
398399 num_input_partitions,
400+ target_batch_size,
399401 ) ) ;
400402
401403 // In a separate task, wait for each input to be done
@@ -643,6 +645,68 @@ impl BatchPartitioner {
643645 Ok ( it)
644646 }
645647
648+ /// Compute hash partition indices without materializing sub-batches.
649+ ///
650+ /// Returns an iterator of `(partition_index, UInt32Array)` pairs where each
651+ /// `UInt32Array` contains the row indices from the input batch that belong
652+ /// to that partition. Only non-empty partitions are returned.
653+ ///
654+ /// This is used with `BatchCoalescer::push_batch_with_indices` to avoid
655+ /// creating intermediate sub-batches during hash repartitioning.
656+ fn hash_partition_indices (
657+ & mut self ,
658+ batch : & RecordBatch ,
659+ ) -> Result < Vec < ( usize , PrimitiveArray < UInt32Type > ) > > {
660+ match & mut self . state {
661+ BatchPartitionerState :: Hash {
662+ exprs,
663+ num_partitions : partitions,
664+ hash_buffer,
665+ indices,
666+ } => {
667+ let timer = self . timer . timer ( ) ;
668+
669+ let arrays =
670+ evaluate_expressions_to_arrays ( exprs. as_slice ( ) , batch) ?;
671+
672+ hash_buffer. clear ( ) ;
673+ hash_buffer. resize ( batch. num_rows ( ) , 0 ) ;
674+
675+ create_hashes (
676+ & arrays,
677+ REPARTITION_RANDOM_STATE . random_state ( ) ,
678+ hash_buffer,
679+ ) ?;
680+
681+ indices. iter_mut ( ) . for_each ( |v| v. clear ( ) ) ;
682+
683+ for ( index, hash) in hash_buffer. iter ( ) . enumerate ( ) {
684+ indices[ ( * hash % * partitions as u64 ) as usize ]
685+ . push ( index as u32 ) ;
686+ }
687+
688+ timer. done ( ) ;
689+
690+ let mut result = Vec :: new ( ) ;
691+ for ( partition, p_indices) in indices. iter_mut ( ) . enumerate ( ) {
692+ if !p_indices. is_empty ( ) {
693+ let taken_indices = std:: mem:: take ( p_indices) ;
694+ let indices_array: PrimitiveArray < UInt32Type > =
695+ taken_indices. into ( ) ;
696+ result. push ( ( partition, indices_array) ) ;
697+ }
698+ }
699+
700+ Ok ( result)
701+ }
702+ _ => {
703+ internal_err ! (
704+ "hash_partition_indices called on non-hash partitioner"
705+ )
706+ }
707+ }
708+ }
709+
646710 // return the number of output partitions
647711 fn num_partitions ( & self ) -> usize {
648712 match self . state {
@@ -1337,6 +1401,55 @@ impl RepartitionExec {
13371401 }
13381402 }
13391403
1404+ /// Send a completed batch through the output channel, handling memory
1405+ /// reservation and spilling.
1406+ ///
1407+ /// Returns `true` if the channel is still open, `false` if the receiver
1408+ /// has hung up (e.g. LIMIT).
1409+ async fn send_batch (
1410+ batch : RecordBatch ,
1411+ partition : usize ,
1412+ output_channels : & mut HashMap < usize , OutputChannel > ,
1413+ metrics : & RepartitionMetrics ,
1414+ ) -> bool {
1415+ let Some ( channel) = output_channels. get_mut ( & partition) else {
1416+ return false ;
1417+ } ;
1418+
1419+ let size = batch. get_array_memory_size ( ) ;
1420+ let timer = metrics. send_time [ partition] . timer ( ) ;
1421+
1422+ let can_grow = channel. reservation . lock ( ) . try_grow ( size) . is_ok ( ) ;
1423+ let ( batch_to_send, is_memory_batch) = if can_grow {
1424+ ( RepartitionBatch :: Memory ( batch) , true )
1425+ } else {
1426+ if let Err ( e) = channel. spill_writer . push_batch ( & batch) {
1427+ let _ = channel. sender . send ( Some ( Err ( e. into ( ) ) ) ) . await ;
1428+ timer. done ( ) ;
1429+ output_channels. remove ( & partition) ;
1430+ return false ;
1431+ }
1432+ ( RepartitionBatch :: Spilled , false )
1433+ } ;
1434+
1435+ if channel
1436+ . sender
1437+ . send ( Some ( Ok ( batch_to_send) ) )
1438+ . await
1439+ . is_err ( )
1440+ {
1441+ if is_memory_batch {
1442+ channel. reservation . lock ( ) . shrink ( size) ;
1443+ }
1444+ timer. done ( ) ;
1445+ output_channels. remove ( & partition) ;
1446+ return false ;
1447+ }
1448+
1449+ timer. done ( ) ;
1450+ true
1451+ }
1452+
13401453 /// Pulls data from the specified input plan, feeding it to the
13411454 /// output partitions based on the desired partitioning
13421455 ///
@@ -1348,6 +1461,7 @@ impl RepartitionExec {
13481461 metrics : RepartitionMetrics ,
13491462 input_partition : usize ,
13501463 num_input_partitions : usize ,
1464+ target_batch_size : usize ,
13511465 ) -> Result < ( ) > {
13521466 let mut partitioner = match & partitioning {
13531467 Partitioning :: Hash ( exprs, num_partitions) => {
@@ -1370,6 +1484,22 @@ impl RepartitionExec {
13701484 }
13711485 } ;
13721486
1487+ let schema = stream. schema ( ) ;
1488+ // Use coalescers for hash partitioning to accumulate rows per
1489+ // output partition and only send when target_batch_size is reached.
1490+ // This avoids sending many small sub-batches through the channel.
1491+ // Skip coalescing for 0-column schemas as BatchCoalescer can't handle them.
1492+ let use_coalescers = matches ! ( & partitioning, Partitioning :: Hash ( ..) )
1493+ && !schema. fields ( ) . is_empty ( ) ;
1494+ let mut coalescers: HashMap < usize , BatchCoalescer > = if use_coalescers {
1495+ output_channels
1496+ . keys ( )
1497+ . map ( |& p| ( p, BatchCoalescer :: new ( Arc :: clone ( & schema) , target_batch_size) ) )
1498+ . collect ( )
1499+ } else {
1500+ HashMap :: new ( )
1501+ } ;
1502+
13731503 // While there are still outputs to send to, keep pulling inputs
13741504 let mut batches_until_yield = partitioner. num_partitions ( ) ;
13751505 while !output_channels. is_empty ( ) {
@@ -1389,38 +1519,44 @@ impl RepartitionExec {
13891519 continue ;
13901520 }
13911521
1392- for res in partitioner. partition_iter ( batch) ? {
1393- let ( partition, batch) = res?;
1394- let size = batch. get_array_memory_size ( ) ;
1395-
1396- let timer = metrics. send_time [ partition] . timer ( ) ;
1397- // if there is still a receiver, send to it
1398- if let Some ( channel) = output_channels. get_mut ( & partition) {
1399- let ( batch_to_send, is_memory_batch) =
1400- match channel. reservation . lock ( ) . try_grow ( size) {
1401- Ok ( _) => {
1402- // Memory available - send in-memory batch
1403- ( RepartitionBatch :: Memory ( batch) , true )
1404- }
1405- Err ( _) => {
1406- // We're memory limited - spill to SpillPool
1407- // SpillPool handles file handle reuse and rotation
1408- channel. spill_writer . push_batch ( & batch) ?;
1409- // Send marker indicating batch was spilled
1410- ( RepartitionBatch :: Spilled , false )
1411- }
1412- } ;
1413-
1414- if channel. sender . send ( Some ( Ok ( batch_to_send) ) ) . await . is_err ( ) {
1415- // If the other end has hung up, it was an early shutdown (e.g. LIMIT)
1416- // Only shrink memory if it was a memory batch
1417- if is_memory_batch {
1418- channel. reservation . lock ( ) . shrink ( size) ;
1522+ if use_coalescers {
1523+ // Hash partitioning with coalescing: compute indices and
1524+ // push into per-partition coalescers
1525+ for ( partition, indices) in
1526+ partitioner. hash_partition_indices ( & batch) ?
1527+ {
1528+ if let Some ( coalescer) = coalescers. get_mut ( & partition) {
1529+ coalescer. push_batch_with_indices (
1530+ batch. clone ( ) ,
1531+ & indices,
1532+ ) ?;
1533+
1534+ // Send any completed batches
1535+ while let Some ( completed) =
1536+ coalescer. next_completed_batch ( )
1537+ {
1538+ Self :: send_batch (
1539+ completed,
1540+ partition,
1541+ & mut output_channels,
1542+ & metrics,
1543+ )
1544+ . await ;
14191545 }
1420- output_channels. remove ( & partition) ;
14211546 }
14221547 }
1423- timer. done ( ) ;
1548+ } else {
1549+ // Round-robin or fallback: send batches directly
1550+ for res in partitioner. partition_iter ( batch) ? {
1551+ let ( partition, batch) = res?;
1552+ Self :: send_batch (
1553+ batch,
1554+ partition,
1555+ & mut output_channels,
1556+ & metrics,
1557+ )
1558+ . await ;
1559+ }
14241560 }
14251561
14261562 // If the input stream is endless, we may spin forever and
@@ -1447,6 +1583,20 @@ impl RepartitionExec {
14471583 }
14481584 }
14491585
1586+ // Flush remaining buffered batches from coalescers
1587+ for ( partition, coalescer) in & mut coalescers {
1588+ coalescer. finish_buffered_batch ( ) ?;
1589+ while let Some ( batch) = coalescer. next_completed_batch ( ) {
1590+ Self :: send_batch (
1591+ batch,
1592+ * partition,
1593+ & mut output_channels,
1594+ & metrics,
1595+ )
1596+ . await ;
1597+ }
1598+ }
1599+
14501600 // Spill writers will auto-finalize when dropped
14511601 // No need for explicit flush
14521602 Ok ( ( ) )
0 commit comments