@@ -61,6 +61,10 @@ struct SpillPoolShared {
6161 /// Writer's reference to the current file (shared by all cloned writers).
6262 /// Has its own lock to allow I/O without blocking queue access.
6363 current_write_file : Option < Arc < Mutex < ActiveSpillFileShared > > > ,
64+ /// Number of active writer clones. Only when this reaches zero should
65+ /// `writer_dropped` be set to true. This prevents premature EOF signaling
66+ /// when one writer clone is dropped while others are still active.
67+ active_writer_count : usize ,
6468}
6569
6670impl SpillPoolShared {
@@ -72,6 +76,7 @@ impl SpillPoolShared {
7276 waker : None ,
7377 writer_dropped : false ,
7478 current_write_file : None ,
79+ active_writer_count : 1 ,
7580 }
7681 }
7782
@@ -97,7 +102,6 @@ impl SpillPoolShared {
97102/// The writer automatically manages file rotation based on the `max_file_size_bytes`
98103/// configured in [`channel`]. When the last writer clone is dropped, it finalizes the
99104/// current file so readers can access all written data.
100- #[ derive( Clone ) ]
101105pub struct SpillPoolWriter {
102106 /// Maximum size in bytes before rotating to a new file.
103107 /// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`.
@@ -106,6 +110,18 @@ pub struct SpillPoolWriter {
106110 shared : Arc < Mutex < SpillPoolShared > > ,
107111}
108112
113+ impl Clone for SpillPoolWriter {
114+ fn clone ( & self ) -> Self {
115+ // Increment the active writer count so that `writer_dropped` is only
116+ // set to true when the *last* clone is dropped.
117+ self . shared . lock ( ) . active_writer_count += 1 ;
118+ Self {
119+ max_file_size_bytes : self . max_file_size_bytes ,
120+ shared : Arc :: clone ( & self . shared ) ,
121+ }
122+ }
123+ }
124+
109125impl SpillPoolWriter {
110126 /// Spills a batch to the pool, rotating files when necessary.
111127 ///
@@ -233,6 +249,15 @@ impl Drop for SpillPoolWriter {
233249 fn drop ( & mut self ) {
234250 let mut shared = self . shared . lock ( ) ;
235251
252+ shared. active_writer_count -= 1 ;
253+ let is_last_writer = shared. active_writer_count == 0 ;
254+
255+ if !is_last_writer {
256+ // Other writer clones are still active; do not finalize or
257+ // signal EOF to readers.
258+ return ;
259+ }
260+
236261 // Finalize the current file when the last writer is dropped
237262 if let Some ( current_file) = shared. current_write_file . take ( ) {
238263 // Release shared lock before locking file
@@ -1343,6 +1368,90 @@ mod tests {
13431368 Ok ( ( ) )
13441369 }
13451370
1371+ /// Regression test for data loss when multiple writer clones are used.
1372+ ///
1373+ /// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning
1374+ /// mode all input partition tasks share clones of the same writer. Before
1375+ /// the fix, `Drop` unconditionally set `writer_dropped = true` even when
1376+ /// other clones were still alive. This caused the `SpillPoolReader` to
1377+ /// return EOF prematurely, silently losing every batch written by the
1378+ /// remaining writers.
1379+ ///
1380+ /// The test sequence is:
1381+ ///
1382+ /// 1. writer1 writes a batch, then is dropped.
1383+ /// 2. The reader consumes that batch.
1384+ /// 3. The reader polls again — the queue is now empty.
1385+ /// - **Bug**: `writer_dropped` is already true → `Ready(None)` (EOF).
1386+ /// - **Fix**: `active_writer_count > 0` → `Pending` (wait for data).
1387+ /// 4. writer2 (still alive) writes a batch.
1388+ /// 5. The reader must see that batch — not silently lose it.
1389+ #[ tokio:: test]
1390+ async fn test_clone_drop_does_not_signal_eof_prematurely ( ) -> Result < ( ) > {
1391+ let ( writer1, mut reader) = create_spill_channel ( 1024 * 1024 ) ;
1392+ let writer2 = writer1. clone ( ) ;
1393+
1394+ // Synchronization: tell writer2 when it may proceed.
1395+ let ( proceed_tx, proceed_rx) = tokio:: sync:: oneshot:: channel :: < ( ) > ( ) ;
1396+
1397+ // Spawn writer2 — it waits for the signal before writing.
1398+ let writer2_handle = SpawnedTask :: spawn ( async move {
1399+ proceed_rx. await . unwrap ( ) ;
1400+ writer2. push_batch ( & create_test_batch ( 10 , 10 ) ) . unwrap ( ) ;
1401+ // writer2 is dropped here (last clone → true EOF)
1402+ } ) ;
1403+
1404+ // Writer1 writes one batch, then drops.
1405+ writer1. push_batch ( & create_test_batch ( 0 , 10 ) ) ?;
1406+ drop ( writer1) ;
1407+
1408+ // Read writer1's batch.
1409+ let batch1 = reader. next ( ) . await . unwrap ( ) ?;
1410+ assert_eq ! ( batch1. num_rows( ) , 10 ) ;
1411+ let col = batch1
1412+ . column ( 0 )
1413+ . as_any ( )
1414+ . downcast_ref :: < Int32Array > ( )
1415+ . unwrap ( ) ;
1416+ assert_eq ! ( col. value( 0 ) , 0 ) ;
1417+
1418+ // Signal writer2 to write its batch. It will execute when the
1419+ // current task yields (i.e. when reader.next() returns Pending).
1420+ proceed_tx. send ( ( ) ) . unwrap ( ) ;
1421+
1422+ // With the bug the reader returns None here because it already
1423+ // saw writer_dropped=true on an empty queue. With the fix it
1424+ // returns Pending, the runtime schedules writer2, and the batch
1425+ // becomes available.
1426+ let batch2 = tokio:: time:: timeout (
1427+ std:: time:: Duration :: from_secs ( 5 ) ,
1428+ reader. next ( ) ,
1429+ )
1430+ . await
1431+ . expect ( "Reader timed out — should not hang" ) ;
1432+
1433+ assert ! (
1434+ batch2. is_some( ) ,
1435+ "Reader returned None prematurely — batch from writer2 was lost \
1436+ because dropping writer1 incorrectly signaled EOF"
1437+ ) ;
1438+ let batch2 = batch2. unwrap ( ) ?;
1439+ assert_eq ! ( batch2. num_rows( ) , 10 ) ;
1440+ let col = batch2
1441+ . column ( 0 )
1442+ . as_any ( )
1443+ . downcast_ref :: < Int32Array > ( )
1444+ . unwrap ( ) ;
1445+ assert_eq ! ( col. value( 0 ) , 10 ) ;
1446+
1447+ writer2_handle. await . unwrap ( ) ;
1448+
1449+ // All writers dropped — reader should see real EOF now.
1450+ assert ! ( reader. next( ) . await . is_none( ) ) ;
1451+
1452+ Ok ( ( ) )
1453+ }
1454+
13461455 #[ tokio:: test]
13471456 async fn test_disk_usage_decreases_as_files_consumed ( ) -> Result < ( ) > {
13481457 use datafusion_execution:: runtime_env:: RuntimeEnvBuilder ;
0 commit comments