From 20fce3a77d806bf0cc07985d2daebb6bfb6610b6 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 6 Mar 2026 18:54:13 +0000 Subject: [PATCH 1/4] feat: implement per-channel backpressure in RepartitionExec distributor channels Replace the global gate mechanism with per-channel bounded capacity. Each channel independently enforces a buffer limit (CHANNEL_CAPACITY=2), so senders targeting a full channel block without affecting senders to other channels. This eliminates head-of-line blocking and provides more granular backpressure control. https://claude.ai/code/session_01MiFZEjX5FyFwJGAVjv8tJ3 --- .../src/repartition/distributor_channels.rs | 410 +++++++----------- 1 file changed, 165 insertions(+), 245 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/distributor_channels.rs b/datafusion/physical-plan/src/repartition/distributor_channels.rs index 22872d1e32d49..55316ab3c8e9c 100644 --- a/datafusion/physical-plan/src/repartition/distributor_channels.rs +++ b/datafusion/physical-plan/src/repartition/distributor_channels.rs @@ -16,27 +16,33 @@ // under the License. //! Special channel construction to distribute data from various inputs into N outputs -//! minimizing buffering but preventing deadlocks when repartitioning +//! using per-channel backpressure to prevent unbounded buffering. //! //! # Design //! +//! Each channel independently enforces a capacity limit. When a channel's buffer +//! is full, only senders targeting that specific channel are blocked. Senders to +//! other channels with available capacity can continue making progress. +//! //! ```text -//! +----+ +------+ -//! | TX |==|| | Gate | -//! +----+ || | | +--------+ +----+ -//! ====| |==| Buffer |==| RX | -//! +----+ || | | +--------+ +----+ -//! | TX |==|| | | -//! +----+ | | -//! | | -//! +----+ | | +--------+ +----+ -//! | TX |======| |==| Buffer |==| RX | -//! +----+ +------+ +--------+ +----+ +//! +----+ +-------------------+ +//! | TX |==|| | Channel 0 | +//! 
+----+ || | [capacity limit] | +----+ +//! ====| Buffer <=2 |==| RX | +//! +----+ || +-------------------+ +----+ +//! | TX |==|| +//! +----+ +//! +-------------------+ +//! +----+ | Channel 1 | +----+ +//! | TX |======| Buffer <=2 |==| RX | +//! +----+ +-------------------+ +----+ //! ``` //! -//! There are `N` virtual MPSC (multi-producer, single consumer) channels with unbounded capacity. However, if all -//! buffers/channels are non-empty, than a global gate will be closed preventing new data from being written (the -//! sender futures will be [pending](Poll::Pending)) until at least one channel is empty (and not closed). +//! There are `N` virtual MPSC (multi-producer, single consumer) channels, each with a +//! bounded capacity of [`CHANNEL_CAPACITY`]. When a channel's buffer reaches capacity, +//! senders to that channel will be [pending](Poll::Pending) until the receiver drains +//! data from that specific channel. This provides per-channel backpressure without +//! head-of-line blocking across channels. use std::{ collections::VecDeque, future::Future, @@ -51,30 +57,29 @@ use std::{ use parking_lot::Mutex; -/// Create `n` empty channels. +/// Per-channel backpressure capacity. +/// +/// Each channel can buffer up to this many items before blocking senders. +/// A value of 2 allows sender and receiver to overlap operations for better +/// throughput while still providing meaningful backpressure. +const CHANNEL_CAPACITY: usize = 2; + +/// Create `n` empty channels with per-channel backpressure. 
pub fn channels( n: usize, ) -> (Vec>, Vec>) { let channels = (0..n) - .map(|id| Arc::new(Channel::new_with_one_sender(id))) + .map(|_| Arc::new(Channel::new_with_one_sender(CHANNEL_CAPACITY))) .collect::>(); - let gate = Arc::new(Gate { - empty_channels: AtomicUsize::new(n), - send_wakers: Mutex::new(None), - }); let senders = channels .iter() .map(|channel| DistributionSender { channel: Arc::clone(channel), - gate: Arc::clone(&gate), }) .collect(); let receivers = channels .into_iter() - .map(|channel| DistributionReceiver { - channel, - gate: Arc::clone(&gate), - }) + .map(|channel| DistributionReceiver { channel }) .collect(); (senders, receivers) } @@ -119,19 +124,19 @@ impl std::error::Error for SendError {} /// receive `None` afterwards. #[derive(Debug)] pub struct DistributionSender { - /// To prevent lock inversion / deadlock, channel lock is always acquired prior to gate lock channel: SharedChannel, - gate: SharedGate, } impl DistributionSender { /// Send data. /// /// This fails if the [receiver](DistributionReceiver) is gone. + /// + /// If the channel's buffer is at capacity, the returned future will be + /// [pending](Poll::Pending) until the receiver drains data from this channel. pub fn send(&self, element: T) -> SendFuture<'_, T> { SendFuture { channel: &self.channel, - gate: &self.gate, element: Box::new(Some(element)), } } @@ -143,7 +148,6 @@ impl Clone for DistributionSender { Self { channel: Arc::clone(&self.channel), - gate: Arc::clone(&self.gate), } } } @@ -159,30 +163,6 @@ impl Drop for DistributionSender { let receivers = { let mut state = self.channel.state.lock(); - // During the shutdown of a empty channel, both the sender and the receiver side will be dropped. However we - // only want to decrement the "empty channels" counter once. - // - // We are within a critical section here, so we we can safely assume that either the last sender or the - // receiver (there's only one) will be dropped first. 
- // - // If the last sender is dropped first, `state.data` will still exists and the sender side decrements the - // signal. The receiver side then MUST check the `n_senders` counter during the section and if it is zero, - // it infers that it is dropped afterwards and MUST NOT decrement the counter. - // - // If the receiver end is dropped first, it will infer -- based on `n_senders` -- that there are still - // senders and it will decrement the `empty_channels` counter. It will also set `data` to `None`. The sender - // side will then see that `data` is `None` and can therefore infer that the receiver end was dropped, and - // hence it MUST NOT decrement the `empty_channels` counter. - if state - .data - .as_ref() - .map(|data| data.is_empty()) - .unwrap_or_default() - { - // channel is gone, so we need to clear our signal - self.gate.decr_empty_channels(); - } - // make sure that nobody can add wakers anymore state.recv_wakers.take().expect("not closed yet") }; @@ -198,7 +178,6 @@ impl Drop for DistributionSender { #[derive(Debug)] pub struct SendFuture<'a, T> { channel: &'a SharedChannel, - gate: &'a SharedGate, // the additional Box is required for `Self: Unpin` element: Box>, } @@ -212,31 +191,26 @@ impl Future for SendFuture<'_, T> { // lock scope let to_wake = { - let mut guard_channel_state = this.channel.state.lock(); + let mut guard = this.channel.state.lock(); - let Some(data) = guard_channel_state.data.as_mut() else { + let Some(data) = guard.data.as_mut() else { // receiver end dead return Poll::Ready(Err(SendError( this.element.take().expect("just checked"), ))); }; - // does ANY receiver need data? 
- // if so, allow sender to create another - if this.gate.empty_channels.load(Ordering::SeqCst) == 0 { - let mut guard = this.gate.send_wakers.lock(); - if let Some(send_wakers) = guard.deref_mut() { - send_wakers.push((cx.waker().clone(), this.channel.id)); - return Poll::Pending; - } + // Per-channel backpressure: block if this channel is at capacity + if data.len() >= this.channel.capacity { + guard.send_wakers.push(cx.waker().clone()); + return Poll::Pending; } let was_empty = data.is_empty(); data.push_back(this.element.take().expect("just checked")); if was_empty { - this.gate.decr_empty_channels(); - guard_channel_state.take_recv_wakers() + guard.take_recv_wakers() } else { Vec::with_capacity(0) } @@ -255,7 +229,6 @@ impl Future for SendFuture<'_, T> { #[derive(Debug)] pub struct DistributionReceiver { channel: SharedChannel, - gate: SharedGate, } impl DistributionReceiver { @@ -265,7 +238,6 @@ impl DistributionReceiver { pub fn recv(&mut self) -> RecvFuture<'_, T> { RecvFuture { channel: &mut self.channel, - gate: &mut self.gate, rdy: false, } } @@ -273,25 +245,22 @@ impl DistributionReceiver { impl Drop for DistributionReceiver { fn drop(&mut self) { - let mut guard_channel_state = self.channel.state.lock(); - let data = guard_channel_state.data.take().expect("not dropped yet"); - - // See `DistributedSender::drop` for an explanation of the drop order and when the "empty channels" counter is - // decremented. 
- if data.is_empty() && (self.channel.n_senders.load(Ordering::SeqCst) > 0) { - // channel is gone, so we need to clear our signal - self.gate.decr_empty_channels(); - } + let mut guard = self.channel.state.lock(); + guard.data.take().expect("not dropped yet"); - // senders may be waiting for gate to open but should error now that the channel is closed - self.gate.wake_channel_senders(self.channel.id); + // Wake all blocked senders so they see the receiver is gone and return SendError + let send_wakers = std::mem::take(&mut guard.send_wakers); + drop(guard); + + for waker in send_wakers { + waker.wake(); + } } } /// Future backing [recv](DistributionReceiver::recv). pub struct RecvFuture<'a, T> { channel: &'a mut SharedChannel, - gate: &'a mut SharedGate, rdy: bool, } @@ -308,32 +277,21 @@ impl Future for RecvFuture<'_, T> { match data.pop_front() { Some(element) => { - // change "empty" signal for this channel? - if data.is_empty() && channel_state.recv_wakers.is_some() { - // update counter - let old_counter = - this.gate.empty_channels.fetch_add(1, Ordering::SeqCst); - - // open gate? - let to_wake = if old_counter == 0 { - let mut guard = this.gate.send_wakers.lock(); - - // check after lock to see if we should still change the state - if this.gate.empty_channels.load(Ordering::SeqCst) > 0 { - guard.take().unwrap_or_default() - } else { - Vec::with_capacity(0) - } + // Wake blocked senders if the buffer was at capacity before this pop. + // After popping, data.len() == old_len - 1, so old_len == data.len() + 1. + // If old_len >= capacity, senders may have been blocked. 
+ let to_wake = + if data.len() + 1 >= this.channel.capacity { + std::mem::take(&mut channel_state.send_wakers) } else { Vec::with_capacity(0) }; - drop(guard_channel_state); + drop(guard_channel_state); - // wake outside of lock scope - for (waker, _channel_id) in to_wake { - waker.wake(); - } + // wake outside of lock scope + for waker in to_wake { + waker.wake(); } this.rdy = true; @@ -358,24 +316,23 @@ struct Channel { /// Reference counter for the sender side. n_senders: AtomicUsize, - /// Channel ID. - /// - /// This is used to address [send wakers](Gate::send_wakers). - id: usize, + /// Per-channel capacity limit for backpressure. + capacity: usize, /// Mutable state. state: Mutex>, } impl Channel { - /// Create new channel with one sender (so we don't need to [fetch-add](AtomicUsize::fetch_add) directly afterwards). - fn new_with_one_sender(id: usize) -> Self { + /// Create new channel with one sender and the given capacity. + fn new_with_one_sender(capacity: usize) -> Self { Channel { n_senders: AtomicUsize::new(1), - id, + capacity, state: Mutex::new(ChannelState { data: Some(VecDeque::default()), recv_wakers: Some(Vec::default()), + send_wakers: Vec::default(), }), } } @@ -393,6 +350,12 @@ struct ChannelState { /// The receiver will be pending if the [buffer](Self::data) is empty and /// there are senders left (otherwise this is set to [`None`]). recv_wakers: Option>, + + /// Wakers for blocked senders. + /// + /// Senders are blocked when the channel buffer reaches [capacity](Channel::capacity). + /// They are woken when the receiver consumes data, making space in the buffer. + send_wakers: Vec, } impl ChannelState { @@ -415,63 +378,6 @@ impl ChannelState { /// One or multiple senders and a single receiver will share a channel. type SharedChannel = Arc>; -/// The "all channels have data" gate. -#[derive(Debug)] -struct Gate { - /// Number of currently empty (and still open) channels. 
- empty_channels: AtomicUsize, - - /// Wakers for the sender side, including their channel IDs. - /// - /// This is `None` if the there are non-empty channels. - send_wakers: Mutex>>, -} - -impl Gate { - /// Wake senders for a specific channel. - /// - /// This is helpful to signal that the receiver side is gone and the senders shall now error. - fn wake_channel_senders(&self, id: usize) { - // lock scope - let to_wake = { - let mut guard = self.send_wakers.lock(); - - if let Some(send_wakers) = guard.deref_mut() { - // `drain_filter` is unstable, so implement our own - let (wake, keep) = - send_wakers.drain(..).partition(|(_waker, id2)| id == *id2); - - *send_wakers = keep; - - wake - } else { - Vec::with_capacity(0) - } - }; - - // wake outside of lock scope - for (waker, _id) in to_wake { - waker.wake(); - } - } - - fn decr_empty_channels(&self) { - let old_count = self.empty_channels.fetch_sub(1, Ordering::SeqCst); - - if old_count == 1 { - let mut guard = self.send_wakers.lock(); - - // double-check state during lock - if self.empty_channels.load(Ordering::SeqCst) == 0 && guard.is_none() { - *guard = Some(Vec::new()); - } - } - } -} - -/// Gate shared by all senders and receivers. 
-type SharedGate = Arc; - #[cfg(test)] mod tests { use std::sync::atomic::AtomicBool; @@ -481,33 +387,35 @@ mod tests { use super::*; #[test] - fn test_single_channel_no_gate() { - // use two channels so that the first one never hits the gate - let (mut txs, mut rxs) = channels(2); + fn test_single_channel_send_recv() { + let (txs, mut rxs) = channels(1); let mut recv_fut = rxs[0].recv(); let waker = poll_pending(&mut recv_fut); poll_ready(&mut txs[0].send("foo")).unwrap(); assert!(waker.woken()); - assert_eq!(poll_ready(&mut recv_fut), Some("foo"),); + assert_eq!(poll_ready(&mut recv_fut), Some("foo")); + // Send up to capacity (2) poll_ready(&mut txs[0].send("bar")).unwrap(); poll_ready(&mut txs[0].send("baz")).unwrap(); - poll_ready(&mut txs[0].send("end")).unwrap(); - assert_eq!(poll_ready(&mut rxs[0].recv()), Some("bar"),); - assert_eq!(poll_ready(&mut rxs[0].recv()), Some("baz"),); - // close channel - txs.remove(0); - assert_eq!(poll_ready(&mut rxs[0].recv()), Some("end"),); - assert_eq!(poll_ready(&mut rxs[0].recv()), None,); - assert_eq!(poll_ready(&mut rxs[0].recv()), None,); + // Channel at capacity - next send blocks + let mut send_fut = txs[0].send("end"); + let send_waker = poll_pending(&mut send_fut); + + // Consume one to make room + assert_eq!(poll_ready(&mut rxs[0].recv()), Some("bar")); + assert!(send_waker.woken()); + poll_ready(&mut send_fut).unwrap(); + + assert_eq!(poll_ready(&mut rxs[0].recv()), Some("baz")); + assert_eq!(poll_ready(&mut rxs[0].recv()), Some("end")); } #[test] fn test_multi_sender() { - // use two channels so that the first one never hits the gate let (txs, mut rxs) = channels(2); let tx_clone = txs[0].clone(); @@ -515,49 +423,54 @@ mod tests { poll_ready(&mut txs[0].send("foo")).unwrap(); poll_ready(&mut tx_clone.send("bar")).unwrap(); - assert_eq!(poll_ready(&mut rxs[0].recv()), Some("foo"),); - assert_eq!(poll_ready(&mut rxs[0].recv()), Some("bar"),); + assert_eq!(poll_ready(&mut rxs[0].recv()), Some("foo")); + 
assert_eq!(poll_ready(&mut rxs[0].recv()), Some("bar")); } #[test] - fn test_gate() { + fn test_per_channel_backpressure() { let (txs, mut rxs) = channels(2); - // gate initially open + // Fill channel 0 to capacity poll_ready(&mut txs[0].send("0_a")).unwrap(); - - // gate still open because channel 1 is still empty poll_ready(&mut txs[0].send("0_b")).unwrap(); - // gate still open because channel 1 is still empty prior to this call, so this call still goes through + // Channel 0 at capacity - blocks + let mut send_fut_0 = txs[0].send("0_c"); + let waker_0 = poll_pending(&mut send_fut_0); + + // Channel 1 is independent - can still send! poll_ready(&mut txs[1].send("1_a")).unwrap(); + poll_ready(&mut txs[1].send("1_b")).unwrap(); - // both channels non-empty => gate closed + // Channel 1 now also at capacity - blocks + let mut send_fut_1 = txs[1].send("1_c"); + let waker_1 = poll_pending(&mut send_fut_1); - let mut send_fut = txs[1].send("1_b"); - let waker = poll_pending(&mut send_fut); + // Drain channel 0 - only channel 0 senders wake + assert_eq!(poll_ready(&mut rxs[0].recv()), Some("0_a")); + assert!(waker_0.woken()); + assert!(!waker_1.woken()); - // drain channel 0 - assert_eq!(poll_ready(&mut rxs[0].recv()), Some("0_a"),); - poll_pending(&mut send_fut); - assert_eq!(poll_ready(&mut rxs[0].recv()), Some("0_b"),); + // Channel 0 sender can now proceed + poll_ready(&mut send_fut_0).unwrap(); - // channel 0 empty => gate open - assert!(waker.woken()); - poll_ready(&mut send_fut).unwrap(); + // Drain channel 1 + assert_eq!(poll_ready(&mut rxs[1].recv()), Some("1_a")); + assert!(waker_1.woken()); + poll_ready(&mut send_fut_1).unwrap(); } #[test] fn test_close_channel_by_dropping_tx() { - let (mut txs, mut rxs) = channels(2); + let (mut txs, mut rxs) = channels::<&str>(2); let tx0 = txs.remove(0); - let tx1 = txs.remove(0); + let _tx1 = txs.remove(0); let tx0_clone = tx0.clone(); let mut recv_fut = rxs[0].recv(); - poll_ready(&mut tx1.send("a")).unwrap(); let 
recv_waker = poll_pending(&mut recv_fut); // drop original sender @@ -565,74 +478,58 @@ mod tests { // not yet closed (there's a clone left) assert!(!recv_waker.woken()); - poll_ready(&mut tx1.send("b")).unwrap(); let recv_waker = poll_pending(&mut recv_fut); // create new clone let tx0_clone2 = tx0_clone.clone(); assert!(!recv_waker.woken()); - poll_ready(&mut tx1.send("c")).unwrap(); let recv_waker = poll_pending(&mut recv_fut); // drop first clone drop(tx0_clone); assert!(!recv_waker.woken()); - poll_ready(&mut tx1.send("d")).unwrap(); let recv_waker = poll_pending(&mut recv_fut); // drop last clone drop(tx0_clone2); - // channel closed => also close gate - poll_pending(&mut tx1.send("e")); + // channel closed assert!(recv_waker.woken()); - assert_eq!(poll_ready(&mut recv_fut), None,); + assert_eq!(poll_ready(&mut recv_fut), None); } #[test] - fn test_close_channel_by_dropping_rx_on_open_gate() { + fn test_close_channel_by_dropping_rx() { let (txs, mut rxs) = channels(2); let rx0 = rxs.remove(0); let _rx1 = rxs.remove(0); - poll_ready(&mut txs[1].send("a")).unwrap(); - - // drop receiver => also close gate + // drop receiver drop(rx0); - poll_pending(&mut txs[1].send("b")); - assert_eq!(poll_ready(&mut txs[0].send("foo")), Err(SendError("foo")),); + assert_eq!(poll_ready(&mut txs[0].send("foo")), Err(SendError("foo"))); } #[test] - fn test_close_channel_by_dropping_rx_on_closed_gate() { - let (txs, mut rxs) = channels(2); + fn test_close_channel_by_dropping_rx_wakes_blocked_senders() { + let (txs, mut rxs) = channels(1); let rx0 = rxs.remove(0); - let mut rx1 = rxs.remove(0); - // fill both channels - poll_ready(&mut txs[0].send("0_a")).unwrap(); - poll_ready(&mut txs[1].send("1_a")).unwrap(); + // Fill channel to capacity + poll_ready(&mut txs[0].send("a")).unwrap(); + poll_ready(&mut txs[0].send("b")).unwrap(); - let mut send_fut0 = txs[0].send("0_b"); - let mut send_fut1 = txs[1].send("1_b"); - let waker0 = poll_pending(&mut send_fut0); - let waker1 = 
poll_pending(&mut send_fut1); + // Sender blocked at capacity + let mut send_fut = txs[0].send("c"); + let waker = poll_pending(&mut send_fut); - // drop receiver + // Drop receiver - should wake blocked sender drop(rx0); - assert!(waker0.woken()); - assert!(!waker1.woken()); - assert_eq!(poll_ready(&mut send_fut0), Err(SendError("0_b")),); - - // gate closed, so cannot send on channel 1 - poll_pending(&mut send_fut1); - - // channel 1 can still receive data - assert_eq!(poll_ready(&mut rx1.recv()), Some("1_a"),); + assert!(waker.woken()); + assert_eq!(poll_ready(&mut send_fut), Err(SendError("c"))); } #[test] @@ -646,7 +543,7 @@ mod tests { let rx1 = rxs.remove(0); let _rx2 = rxs.remove(0); - // fill channels + // fill channels (one item each, below capacity) poll_ready(&mut tx0.send("0_a")).unwrap(); poll_ready(&mut tx1.send("1_a")).unwrap(); poll_ready(&mut tx2.send("2_a")).unwrap(); @@ -655,12 +552,13 @@ mod tests { drop(rx1); // receive data - assert_eq!(poll_ready(&mut rx0.recv()), Some("0_a"),); + assert_eq!(poll_ready(&mut rx0.recv()), Some("0_a")); // use senders again poll_ready(&mut tx0.send("0_b")).unwrap(); - assert_eq!(poll_ready(&mut tx1.send("1_b")), Err(SendError("1_b")),); - poll_pending(&mut tx2.send("2_b")); + assert_eq!(poll_ready(&mut tx1.send("1_b")), Err(SendError("1_b"))); + // Per-channel: tx2 can still send (channel 2 has capacity) + poll_ready(&mut tx2.send("2_b")).unwrap(); } #[test] @@ -697,15 +595,22 @@ mod tests { assert!(waker_1a.woken()); assert!(waker_1b.woken()); assert!(waker_2.woken()); - assert_eq!(poll_ready(&mut recv_fut), Some("a"),); + assert_eq!(poll_ready(&mut recv_fut), Some("a")); + // Send up to capacity poll_ready(&mut txs[0].send("b")).unwrap(); - let mut send_fut = txs[0].send("c"); + poll_ready(&mut txs[0].send("c")).unwrap(); + + // Channel at capacity, next send blocks + let mut send_fut = txs[0].send("d"); let waker_3 = poll_pending(&mut send_fut); - assert_eq!(poll_ready(&mut rxs[0].recv()), Some("b"),); + + 
assert_eq!(poll_ready(&mut rxs[0].recv()), Some("b")); assert!(waker_3.woken()); poll_ready(&mut send_fut).unwrap(); + assert_eq!(poll_ready(&mut rxs[0].recv()), Some("c")); + assert_eq!(poll_ready(&mut rxs[0].recv()), Some("d")); let mut recv_fut = rxs[0].recv(); let waker_4 = poll_pending(&mut recv_fut); @@ -713,17 +618,32 @@ mod tests { let mut recv_fut = rxs[0].recv(); let waker_5 = poll_pending(&mut recv_fut); - poll_ready(&mut txs[0].send("d")).unwrap(); - let mut send_fut = txs[0].send("e"); - let waker_6a = poll_pending(&mut send_fut); - let waker_6b = poll_pending(&mut send_fut); + poll_ready(&mut txs[0].send("e")).unwrap(); assert!(waker_4.woken()); assert!(waker_5.woken()); - assert_eq!(poll_ready(&mut recv_fut), Some("d"),); + assert_eq!(poll_ready(&mut recv_fut), Some("e")); + } + + #[test] + fn test_poll_send_blocked_twice() { + let (txs, mut rxs) = channels(1); + + // Fill to capacity + poll_ready(&mut txs[0].send("a")).unwrap(); + poll_ready(&mut txs[0].send("b")).unwrap(); + + // Blocked - poll twice with different wakers + let mut send_fut = txs[0].send("c"); + let waker_a = poll_pending(&mut send_fut); + let waker_b = poll_pending(&mut send_fut); + + // Drain to make space + assert_eq!(poll_ready(&mut rxs[0].recv()), Some("a")); - assert!(waker_6a.woken()); - assert!(waker_6b.woken()); + // Both wakers should be notified + assert!(waker_a.woken()); + assert!(waker_b.woken()); poll_ready(&mut send_fut).unwrap(); } From e32e3900a53befbaebadf8625ee62d73334bee99 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 6 Mar 2026 19:55:50 +0000 Subject: [PATCH 2/4] Fix deadlocks in per-channel backpressure for RepartitionExec Two issues caused deadlocks with per-channel backpressure: 1. wait_for_task sent completion signals to channels sequentially. If one channel was full, it blocked sending to all remaining channels. Fixed by using futures::future::join_all to send concurrently. 2. 
Unclaimed output partitions (when only a subset of partitions are executed) would fill to capacity and block senders indefinitely. Fixed by tracking receiver_active state - backpressure is only enforced once the receiver has actively consumed data. Channels with inactive receivers allow unbounded buffering, preventing deadlocks while still providing backpressure for active channels. https://claude.ai/code/session_01MiFZEjX5FyFwJGAVjv8tJ3 --- .../src/repartition/distributor_channels.rs | 35 ++++++++++++++- .../physical-plan/src/repartition/mod.rs | 44 ++++++++++++------- 2 files changed, 62 insertions(+), 17 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/distributor_channels.rs b/datafusion/physical-plan/src/repartition/distributor_channels.rs index 55316ab3c8e9c..14e06eefee8f7 100644 --- a/datafusion/physical-plan/src/repartition/distributor_channels.rs +++ b/datafusion/physical-plan/src/repartition/distributor_channels.rs @@ -193,6 +193,8 @@ impl Future for SendFuture<'_, T> { let to_wake = { let mut guard = this.channel.state.lock(); + let receiver_active = guard.receiver_active; + let Some(data) = guard.data.as_mut() else { // receiver end dead return Poll::Ready(Err(SendError( @@ -200,8 +202,11 @@ impl Future for SendFuture<'_, T> { ))); }; - // Per-channel backpressure: block if this channel is at capacity - if data.len() >= this.channel.capacity { + // Per-channel backpressure: block if this channel is at capacity. + // Only apply backpressure when the receiver is actively consuming data. + // If the receiver has never consumed data (e.g., unclaimed output partition), + // allow unbounded buffering to prevent deadlocks. + if receiver_active && data.len() >= this.channel.capacity { guard.send_wakers.push(cx.waker().clone()); return Poll::Pending; } @@ -277,6 +282,10 @@ impl Future for RecvFuture<'_, T> { match data.pop_front() { Some(element) => { + // Mark the receiver as active now that data has been consumed. 
+ // This enables per-channel backpressure for this channel. + channel_state.receiver_active = true; + // Wake blocked senders if the buffer was at capacity before this pop. // After popping, data.len() == old_len - 1, so old_len == data.len() + 1. // If old_len >= capacity, senders may have been blocked. @@ -333,6 +342,7 @@ impl Channel { data: Some(VecDeque::default()), recv_wakers: Some(Vec::default()), send_wakers: Vec::default(), + receiver_active: false, }), } } @@ -356,6 +366,12 @@ struct ChannelState { /// Senders are blocked when the channel buffer reaches [capacity](Channel::capacity). /// They are woken when the receiver consumes data, making space in the buffer. send_wakers: Vec, + + /// Whether the receiver has actively consumed data from this channel. + /// Backpressure is only enforced when the receiver is active. This prevents + /// deadlocks when some output partitions are never consumed (e.g., when + /// only a subset of partitions are executed). + receiver_active: bool, } impl ChannelState { @@ -431,6 +447,13 @@ mod tests { fn test_per_channel_backpressure() { let (txs, mut rxs) = channels(2); + // Activate receivers by sending and receiving one item each. + // Backpressure only applies once a receiver has actively consumed data. 
+ poll_ready(&mut txs[0].send("0_warmup")).unwrap(); + poll_ready(&mut txs[1].send("1_warmup")).unwrap(); + assert_eq!(poll_ready(&mut rxs[0].recv()), Some("0_warmup")); + assert_eq!(poll_ready(&mut rxs[1].recv()), Some("1_warmup")); + // Fill channel 0 to capacity poll_ready(&mut txs[0].send("0_a")).unwrap(); poll_ready(&mut txs[0].send("0_b")).unwrap(); @@ -515,6 +538,10 @@ mod tests { fn test_close_channel_by_dropping_rx_wakes_blocked_senders() { let (txs, mut rxs) = channels(1); + // Activate receiver by consuming one item + poll_ready(&mut txs[0].send("warmup")).unwrap(); + assert_eq!(poll_ready(&mut rxs[0].recv()), Some("warmup")); + let rx0 = rxs.remove(0); // Fill channel to capacity @@ -629,6 +656,10 @@ mod tests { fn test_poll_send_blocked_twice() { let (txs, mut rxs) = channels(1); + // Activate receiver + poll_ready(&mut txs[0].send("warmup")).unwrap(); + assert_eq!(poll_ready(&mut rxs[0].recv()), Some("warmup")); + // Fill to capacity poll_ready(&mut txs[0].send("a")).unwrap(); poll_ready(&mut txs[0].send("b")).unwrap(); diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 081f10d482e1e..d16c25fe6a7a5 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1463,37 +1463,51 @@ impl RepartitionExec { ) { // wait for completion, and propagate error // note we ignore errors on send (.ok) as that means the receiver has already shutdown. + // + // IMPORTANT: sends to all channels must happen concurrently (via join_all) + // rather than sequentially. With per-channel backpressure, a sequential loop + // could deadlock if one channel is full while other channels' receivers are + // waiting for their completion signal. 
match input_task.join().await { // Error in joining task Err(e) => { let e = Arc::new(e); - for (_, tx) in txs { - let err = Err(DataFusionError::Context( - "Join Error".to_string(), - Box::new(DataFusionError::External(Box::new(Arc::clone(&e)))), - )); - tx.send(Some(err)).await.ok(); - } + futures::future::join_all(txs.into_values().map(|tx| { + let e = Arc::clone(&e); + async move { + let err = Err(DataFusionError::Context( + "Join Error".to_string(), + Box::new(DataFusionError::External(Box::new(e))), + )); + tx.send(Some(err)).await.ok(); + } + })) + .await; } // Error from running input task Ok(Err(e)) => { // send the same Arc'd error to all output partitions let e = Arc::new(e); - for (_, tx) in txs { - // wrap it because need to send error to all output partitions - let err = Err(DataFusionError::from(&e)); - tx.send(Some(err)).await.ok(); - } + futures::future::join_all(txs.into_values().map(|tx| { + let e = Arc::clone(&e); + async move { + let err = Err(DataFusionError::from(&e)); + tx.send(Some(err)).await.ok(); + } + })) + .await; } // Input task completed successfully Ok(Ok(())) => { // notify each output partition that this input partition has no more data - for (_partition, tx) in txs { - tx.send(None).await.ok(); - } + futures::future::join_all( + txs.into_values() + .map(|tx| async move { tx.send(None).await.ok() }), + ) + .await; } } } From 31d2da31271ed55b777c9d64cc302c1ddc1f46de Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 6 Mar 2026 20:52:35 +0000 Subject: [PATCH 3/4] Increase CHANNEL_CAPACITY to 8 for better throughput https://claude.ai/code/session_01MiFZEjX5FyFwJGAVjv8tJ3 --- .../src/repartition/distributor_channels.rs | 47 ++++++++++++------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/distributor_channels.rs b/datafusion/physical-plan/src/repartition/distributor_channels.rs index 14e06eefee8f7..37c830c223b89 100644 --- 
a/datafusion/physical-plan/src/repartition/distributor_channels.rs +++ b/datafusion/physical-plan/src/repartition/distributor_channels.rs @@ -60,16 +60,24 @@ use parking_lot::Mutex; /// Per-channel backpressure capacity. /// /// Each channel can buffer up to this many items before blocking senders. -/// A value of 2 allows sender and receiver to overlap operations for better +/// A value of 8 allows sender and receiver to overlap operations for better /// throughput while still providing meaningful backpressure. -const CHANNEL_CAPACITY: usize = 2; +const CHANNEL_CAPACITY: usize = 8; /// Create `n` empty channels with per-channel backpressure. pub fn channels( n: usize, +) -> (Vec>, Vec>) { + channels_with_capacity(n, CHANNEL_CAPACITY) +} + +/// Create `n` empty channels with a specific capacity per channel. +fn channels_with_capacity( + n: usize, + capacity: usize, ) -> (Vec>, Vec>) { let channels = (0..n) - .map(|_| Arc::new(Channel::new_with_one_sender(CHANNEL_CAPACITY))) + .map(|_| Arc::new(Channel::new_with_one_sender(capacity))) .collect::>(); let senders = channels .iter() @@ -402,9 +410,14 @@ mod tests { use super::*; + /// Test helper: create channels with capacity 2 (matching original test assumptions). + fn test_channels(n: usize) -> (Vec>, Vec>) { + channels_with_capacity(n, 2) + } + #[test] fn test_single_channel_send_recv() { - let (txs, mut rxs) = channels(1); + let (txs, mut rxs) = test_channels(1); let mut recv_fut = rxs[0].recv(); let waker = poll_pending(&mut recv_fut); @@ -432,7 +445,7 @@ mod tests { #[test] fn test_multi_sender() { - let (txs, mut rxs) = channels(2); + let (txs, mut rxs) = test_channels(2); let tx_clone = txs[0].clone(); @@ -445,7 +458,7 @@ mod tests { #[test] fn test_per_channel_backpressure() { - let (txs, mut rxs) = channels(2); + let (txs, mut rxs) = test_channels(2); // Activate receivers by sending and receiving one item each. // Backpressure only applies once a receiver has actively consumed data. 
@@ -486,7 +499,7 @@ mod tests { #[test] fn test_close_channel_by_dropping_tx() { - let (mut txs, mut rxs) = channels::<&str>(2); + let (mut txs, mut rxs) = test_channels::<&str>(2); let tx0 = txs.remove(0); let _tx1 = txs.remove(0); @@ -523,7 +536,7 @@ mod tests { #[test] fn test_close_channel_by_dropping_rx() { - let (txs, mut rxs) = channels(2); + let (txs, mut rxs) = test_channels(2); let rx0 = rxs.remove(0); let _rx1 = rxs.remove(0); @@ -536,7 +549,7 @@ mod tests { #[test] fn test_close_channel_by_dropping_rx_wakes_blocked_senders() { - let (txs, mut rxs) = channels(1); + let (txs, mut rxs) = test_channels(1); // Activate receiver by consuming one item poll_ready(&mut txs[0].send("warmup")).unwrap(); @@ -561,7 +574,7 @@ mod tests { #[test] fn test_drop_rx_three_channels() { - let (mut txs, mut rxs) = channels(3); + let (mut txs, mut rxs) = test_channels(3); let tx0 = txs.remove(0); let tx1 = txs.remove(0); @@ -590,7 +603,7 @@ mod tests { #[test] fn test_close_channel_by_dropping_rx_clears_data() { - let (txs, rxs) = channels(1); + let (txs, rxs) = test_channels(1); let obj = Arc::new(()); let counter = Arc::downgrade(&obj); @@ -609,7 +622,7 @@ mod tests { /// Ensure that polling "pending" futures work even when you poll them too often (which happens under some circumstances). 
#[test] fn test_poll_empty_channel_twice() { - let (txs, mut rxs) = channels(1); + let (txs, mut rxs) = test_channels(1); let mut recv_fut = rxs[0].recv(); let waker_1a = poll_pending(&mut recv_fut); @@ -654,7 +667,7 @@ #[test] fn test_poll_send_blocked_twice() { - let (txs, mut rxs) = channels(1); + let (txs, mut rxs) = test_channels(1); // Activate receiver poll_ready(&mut txs[0].send("warmup")).unwrap(); @@ -681,7 +694,7 @@ #[test] #[should_panic(expected = "polled ready future")] fn test_panic_poll_send_future_after_ready_ok() { - let (txs, _rxs) = channels(1); + let (txs, _rxs) = test_channels(1); let mut fut = txs[0].send("foo"); poll_ready(&mut fut).unwrap(); poll_ready(&mut fut).ok(); @@ -690,7 +703,7 @@ #[test] #[should_panic(expected = "polled ready future")] fn test_panic_poll_send_future_after_ready_err() { - let (txs, rxs) = channels(1); + let (txs, rxs) = test_channels(1); drop(rxs); @@ -702,7 +715,7 @@ #[test] #[should_panic(expected = "polled ready future")] fn test_panic_poll_recv_future_after_ready_some() { - let (txs, mut rxs) = channels(1); + let (txs, mut rxs) = test_channels(1); poll_ready(&mut txs[0].send("foo")).unwrap(); @@ -714,7 +727,7 @@ #[test] #[should_panic(expected = "polled ready future")] fn test_panic_poll_recv_future_after_ready_none() { - let (txs, mut rxs) = channels::<u8>(1); + let (txs, mut rxs) = test_channels::<u8>(1); drop(txs); From 97e8c5aa3017f72a60781d3d7a846c79bfc6ccff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 7 Mar 2026 06:59:51 +0100 Subject: [PATCH 4/4] Increase CHANNEL_CAPACITY to 16 Increased the channel capacity from 8 to 16 to improve throughput.
--- .../physical-plan/src/repartition/distributor_channels.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/distributor_channels.rs b/datafusion/physical-plan/src/repartition/distributor_channels.rs index 37c830c223b89..f49f859bee03d 100644 --- a/datafusion/physical-plan/src/repartition/distributor_channels.rs +++ b/datafusion/physical-plan/src/repartition/distributor_channels.rs @@ -60,9 +60,9 @@ use parking_lot::Mutex; /// Per-channel backpressure capacity. /// /// Each channel can buffer up to this many items before blocking senders. -/// A value of 8 allows sender and receiver to overlap operations for better +/// A value of 16 allows sender and receiver to overlap operations for better /// throughput while still providing meaningful backpressure. -const CHANNEL_CAPACITY: usize = 8; +const CHANNEL_CAPACITY: usize = 16; /// Create `n` empty channels with per-channel backpressure. pub fn channels(