Skip to content

Commit 59cfe3b

Browse files
committed
comment
1 parent 09a8630 commit 59cfe3b

1 file changed

Lines changed: 11 additions & 18 deletions

File tree

datafusion/physical-plan/src/spill/spill_pool.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1368,24 +1368,21 @@ mod tests {
13681368
Ok(())
13691369
}
13701370

1371-
/// Regression test for data loss when multiple writer clones are used.
1371+
/// Verifies that the reader stays alive as long as any writer clone exists.
13721372
///
13731373
/// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning
1374-
/// mode all input partition tasks share clones of the same writer. Before
1375-
/// the fix, `Drop` unconditionally set `writer_dropped = true` even when
1376-
/// other clones were still alive. This caused the `SpillPoolReader` to
1377-
/// return EOF prematurely, silently losing every batch written by the
1378-
/// remaining writers.
1374+
/// mode multiple input partition tasks share clones of the same writer.
1375+
/// The reader must not see EOF until **all** clones have been dropped,
1376+
/// even if the queue is temporarily empty between writes from different
1377+
/// clones.
13791378
///
13801379
/// The test sequence is:
13811380
///
13821381
/// 1. writer1 writes a batch, then is dropped.
1383-
/// 2. The reader consumes that batch.
1384-
/// 3. The reader polls again — the queue is now empty.
1385-
/// - **Bug**: `writer_dropped` is already true → `Ready(None)` (EOF).
1386-
/// - **Fix**: `active_writer_count > 0` → `Pending` (wait for data).
1387-
/// 4. writer2 (still alive) writes a batch.
1388-
/// 5. The reader must see that batch — not silently lose it.
1382+
/// 2. The reader consumes that batch (queue is now empty).
1383+
/// 3. writer2 (still alive) writes a batch.
1384+
/// 4. The reader must see that batch.
1385+
/// 5. EOF is only signalled after writer2 is also dropped.
13891386
#[tokio::test]
13901387
async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> {
13911388
let (writer1, mut reader) = create_spill_channel(1024 * 1024);
@@ -1419,19 +1416,15 @@ mod tests {
14191416
// current task yields (i.e. when reader.next() returns Pending).
14201417
proceed_tx.send(()).unwrap();
14211418

1422-
// With the bug the reader returns None here because it already
1423-
// saw writer_dropped=true on an empty queue. With the fix it
1424-
// returns Pending, the runtime schedules writer2, and the batch
1425-
// becomes available.
1419+
// The reader should return Pending and wait for writer2's data instead of returning EOF.
14261420
let batch2 =
14271421
tokio::time::timeout(std::time::Duration::from_secs(5), reader.next())
14281422
.await
14291423
.expect("Reader timed out — should not hang");
14301424

14311425
assert!(
14321426
batch2.is_some(),
1433-
"Reader returned None prematurely — batch from writer2 was lost \
1434-
because dropping writer1 incorrectly signaled EOF"
1427+
"Reader must not return EOF while a writer clone is still alive"
14351428
);
14361429
let batch2 = batch2.unwrap()?;
14371430
assert_eq!(batch2.num_rows(), 10);

0 commit comments

Comments
 (0)