Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 108 additions & 1 deletion datafusion/physical-plan/src/spill/spill_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ struct SpillPoolShared {
/// Writer's reference to the current file (shared by all cloned writers).
/// Has its own lock to allow I/O without blocking queue access.
current_write_file: Option<Arc<Mutex<ActiveSpillFileShared>>>,
/// Number of active writer clones. Only when this reaches zero should
/// `writer_dropped` be set to true. This prevents premature EOF signaling
/// when one writer clone is dropped while others are still active.
active_writer_count: usize,
}

impl SpillPoolShared {
Expand All @@ -72,6 +76,7 @@ impl SpillPoolShared {
waker: None,
writer_dropped: false,
current_write_file: None,
active_writer_count: 1,
}
}

Expand All @@ -97,7 +102,6 @@ impl SpillPoolShared {
/// The writer automatically manages file rotation based on the `max_file_size_bytes`
/// configured in [`channel`]. When the last writer clone is dropped, it finalizes the
/// current file so readers can access all written data.
#[derive(Clone)]
pub struct SpillPoolWriter {
/// Maximum size in bytes before rotating to a new file.
/// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`.
Expand All @@ -106,6 +110,18 @@ pub struct SpillPoolWriter {
shared: Arc<Mutex<SpillPoolShared>>,
}

impl Clone for SpillPoolWriter {
fn clone(&self) -> Self {
// Increment the active writer count so that `writer_dropped` is only
// set to true when the *last* clone is dropped.
self.shared.lock().active_writer_count += 1;
Self {
max_file_size_bytes: self.max_file_size_bytes,
shared: Arc::clone(&self.shared),
}
}
}

impl SpillPoolWriter {
/// Spills a batch to the pool, rotating files when necessary.
///
Expand Down Expand Up @@ -233,6 +249,15 @@ impl Drop for SpillPoolWriter {
fn drop(&mut self) {
let mut shared = self.shared.lock();

shared.active_writer_count -= 1;
let is_last_writer = shared.active_writer_count == 0;

if !is_last_writer {
// Other writer clones are still active; do not finalize or
// signal EOF to readers.
return;
}

// Finalize the current file when the last writer is dropped
if let Some(current_file) = shared.current_write_file.take() {
// Release shared lock before locking file
Expand Down Expand Up @@ -1343,6 +1368,88 @@ mod tests {
Ok(())
}

/// Regression test: dropping one `SpillPoolWriter` clone must not signal
/// EOF to the reader while other clones are still alive.
///
/// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning
/// mode every input partition task holds a clone of the same writer.
/// Before the fix, `Drop` unconditionally set `writer_dropped = true`, so
/// the `SpillPoolReader` reported EOF as soon as the *first* clone went
/// away — silently losing every batch the surviving writers produced.
///
/// Scenario exercised here:
///
/// 1. writer1 writes a batch, then is dropped.
/// 2. The reader consumes that batch, leaving the queue empty.
/// 3. The reader polls again.
///    - **Bug**: `writer_dropped` is already true → `Ready(None)` (EOF).
///    - **Fix**: `active_writer_count > 0` → `Pending` (wait for data).
/// 4. writer2 (still alive) writes a batch.
/// 5. The reader must observe that batch — not silently lose it.
#[tokio::test]
async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> {
    let (writer1, mut reader) = create_spill_channel(1024 * 1024);
    let writer2 = writer1.clone();

    // One-shot gate: writer2 waits on this before producing its batch.
    let (go_tx, go_rx) = tokio::sync::oneshot::channel::<()>();

    // Spawn writer2 — it blocks on the gate until told to proceed.
    let writer2_task = SpawnedTask::spawn(async move {
        go_rx.await.unwrap();
        writer2.push_batch(&create_test_batch(10, 10)).unwrap();
        // writer2 is dropped here (last clone → true EOF)
    });

    // Produce one batch from writer1, then drop it.
    writer1.push_batch(&create_test_batch(0, 10))?;
    drop(writer1);

    // Consume writer1's batch and sanity-check its contents.
    let first = reader.next().await.unwrap()?;
    assert_eq!(first.num_rows(), 10);
    let first_col = first
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(first_col.value(0), 0);

    // Open the gate. writer2 actually runs once this task yields — i.e.
    // when reader.next() returns Pending.
    go_tx.send(()).unwrap();

    // Buggy behavior: None here, because the empty queue combined with
    // writer_dropped=true looked like EOF. Fixed behavior: Pending, the
    // runtime schedules writer2, and its batch becomes available.
    let second =
        tokio::time::timeout(std::time::Duration::from_secs(5), reader.next())
            .await
            .expect("Reader timed out — should not hang");

    assert!(
        second.is_some(),
        "Reader returned None prematurely — batch from writer2 was lost \
         because dropping writer1 incorrectly signaled EOF"
    );
    let second = second.unwrap()?;
    assert_eq!(second.num_rows(), 10);
    let second_col = second
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(second_col.value(0), 10);

    writer2_task.await.unwrap();

    // Every writer is gone now, so the reader must report genuine EOF.
    assert!(reader.next().await.is_none());

    Ok(())
}

#[tokio::test]
async fn test_disk_usage_decreases_as_files_consumed() -> Result<()> {
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
Expand Down