diff --git a/include/exec/async_scope.hpp b/include/exec/async_scope.hpp index 9f04268af..d04c3a6bc 100644 --- a/include/exec/async_scope.hpp +++ b/include/exec/async_scope.hpp @@ -51,6 +51,7 @@ namespace exec { inplace_stop_source __stop_source_{}; mutable std::mutex __lock_{}; mutable __std::atomic_ptrdiff_t __active_ = 0; + mutable __std::atomic_ptrdiff_t __pending_notifiers_ = 0; // Track in-flight __complete() calls mutable __intrusive_queue<&__task::__next_> __waiters_{}; ~__impl() { @@ -83,8 +84,13 @@ namespace exec { // the waiter is queued but after __active is checked, the waiter will never be notified std::unique_lock __guard{this->__scope_->__lock_}; auto& __active = this->__scope_->__active_; + auto& __pending = this->__scope_->__pending_notifiers_; auto& __waiters = this->__scope_->__waiters_; - if (__active.load(__std::memory_order_acquire) != 0) { + // Also check __pending_notifiers_ to avoid race with in-flight __complete() calls. + // A __complete() that did fetch_sub but hasn't locked the mutex yet will have + // incremented __pending_notifiers_, preventing us from completing immediately. + if (__active.load(__std::memory_order_acquire) != 0 + || __pending.load(__std::memory_order_acquire) != 0) { __waiters.push_back(this); return; } @@ -158,9 +164,13 @@ namespace exec { __nest_op_base<_ReceiverId>* __op_; static void __complete(const __impl* __scope) noexcept { + // Increment pending BEFORE fetch_sub to close race window with on_empty(). + // This ensures on_empty() sees pending > 0 if we're about to lock the mutex. + __scope->__pending_notifiers_.fetch_add(1, __std::memory_order_acquire); auto& __active = __scope->__active_; - std::unique_lock __guard{__scope->__lock_}; if (__active.fetch_sub(1, __std::memory_order_acq_rel) == 1) { + std::unique_lock __guard{__scope->__lock_}; + __scope->__pending_notifiers_.fetch_sub(1, __std::memory_order_release); auto __local_waiters = std::move(__scope->__waiters_); __guard.unlock(); __scope = nullptr; @@ -170,6 +180,8 @@ namespace exec { __next->__notify_waiter(__next); // __scope must be considered deleted } + } else { + __scope->__pending_notifiers_.fetch_sub(1, __std::memory_order_release); } }