From ce2d8c0012280bd0f0490e3a8a63d9c38f39d2b4 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 6 Mar 2026 21:01:35 +0000 Subject: [PATCH] Refactor RepartitionExec to use join_all for concurrent sends Replace sequential send loops in wait_for_task with futures::future::join_all to send to all output partitions concurrently instead of one at a time. https://claude.ai/code/session_01GDTBavJzih6tSSBd9SRNmk --- .../physical-plan/src/repartition/mod.rs | 40 ++++++++++++------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 081f10d482e1e..eefe79d32da56 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -71,6 +71,7 @@ use crate::sort_pushdown::SortOrderPushdownResult; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; use futures::stream::Stream; +use futures::future::join_all; use futures::{FutureExt, StreamExt, TryStreamExt, ready}; use log::trace; use parking_lot::Mutex; @@ -1469,31 +1470,40 @@ impl RepartitionExec { Err(e) => { let e = Arc::new(e); - for (_, tx) in txs { - let err = Err(DataFusionError::Context( - "Join Error".to_string(), - Box::new(DataFusionError::External(Box::new(Arc::clone(&e)))), - )); - tx.send(Some(err)).await.ok(); - } + join_all(txs.into_values().map(|tx| { + let e = Arc::clone(&e); + async move { + let err = Err(DataFusionError::Context( + "Join Error".to_string(), + Box::new(DataFusionError::External(Box::new(e))), + )); + tx.send(Some(err)).await.ok(); + } + })) + .await; } // Error from running input task Ok(Err(e)) => { // send the same Arc'd error to all output partitions let e = Arc::new(e); - for (_, tx) in txs { - // wrap it because need to send error to all output partitions - let err = Err(DataFusionError::from(&e)); - tx.send(Some(err)).await.ok(); - } + join_all(txs.into_values().map(|tx| { + let e = Arc::clone(&e); + async move { + let err = Err(DataFusionError::from(&e)); + tx.send(Some(err)).await.ok(); + } + })) + .await; } // Input task completed successfully Ok(Ok(())) => { // notify each output partition that this input partition has no more data - for (_partition, tx) in txs { - tx.send(None).await.ok(); - } + join_all( + txs.into_values() + .map(|tx| async move { tx.send(None).await.ok() }), + ) + .await; } } }