Skip to content

Commit fc9a744

Browse files
committed
comments
1 parent 52a06a4 commit fc9a744

1 file changed

Lines changed: 13 additions & 16 deletions

File tree

datafusion/physical-plan/src/spill/spill_pool.rs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -91,35 +91,33 @@ impl SpillPoolShared {
9191

9292
/// Tracks the number of live [`SpillPoolWriter`] clones.
9393
///
94-
/// Cloning increments the count; dropping decrements it.
95-
/// [`WriterCount::is_last`] returns `true` when called from the final clone,
96-
/// which the writer uses to decide whether to finalize the spill pool.
94+
/// Cloning increments the count. [`WriterCount::decrement`] atomically
95+
/// decrements the count and reports whether the caller was the last clone.
96+
/// This combined operation avoids TOCTOU races between checking the count
97+
/// and decrementing it.
9798
struct WriterCount(Arc<AtomicUsize>);
9899

99100
impl WriterCount {
100101
fn new() -> Self {
101102
Self(Arc::new(AtomicUsize::new(1)))
102103
}
103104

104-
/// Returns `true` if this is the only remaining clone.
105-
fn is_last(&self) -> bool {
106-
self.0.load(Ordering::Acquire) == 1
105+
/// Decrements the count and returns `true` if this was the last clone.
106+
///
107+
/// This is a single atomic operation, so concurrent drops cannot both
108+
/// observe themselves as "last".
109+
fn decrement(&self) -> bool {
110+
self.0.fetch_sub(1, Ordering::SeqCst) == 1
107111
}
108112
}
109113

110114
impl Clone for WriterCount {
111115
fn clone(&self) -> Self {
112-
self.0.fetch_add(1, Ordering::Relaxed);
116+
self.0.fetch_add(1, Ordering::SeqCst);
113117
Self(Arc::clone(&self.0))
114118
}
115119
}
116120

117-
impl Drop for WriterCount {
118-
fn drop(&mut self) {
119-
self.0.fetch_sub(1, Ordering::Release);
120-
}
121-
}
122-
123121
/// Writer for a spill pool. Provides coordinated write access with FIFO semantics.
124122
///
125123
/// Created by [`channel`]. See that function for architecture diagrams and usage examples.
@@ -266,10 +264,9 @@ impl SpillPoolWriter {
266264

267265
impl Drop for SpillPoolWriter {
268266
fn drop(&mut self) {
269-
if !self.writer_count.is_last() {
267+
if !self.writer_count.decrement() {
270268
// Other writer clones are still active; do not finalize or
271-
// signal EOF to readers. `self.writer_count` is decremented
272-
// automatically when this `Drop` returns.
269+
// signal EOF to readers.
273270
return;
274271
}
275272

0 commit comments

Comments
 (0)