From 4c0bfbcc988b730a3ff8cd856b531eb109070727 Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Tue, 17 Mar 2026 12:48:42 -0400 Subject: [PATCH 1/2] Fix Barrier when parent.async() yields early If `parent.async()` returns before running `@tasks.append(node)`, then `barrier.async()` itself will return before `@tasks` is updated, causing `barrier.wait` to not actually wait on the new task. --- fixtures/async/chainable_async.rb | 4 +++- lib/async/barrier.rb | 17 +++++++++++++---- test/async/barrier.rb | 25 +++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/fixtures/async/chainable_async.rb b/fixtures/async/chainable_async.rb index 826b0788..2292cee9 100644 --- a/fixtures/async/chainable_async.rb +++ b/fixtures/async/chainable_async.rb @@ -9,7 +9,9 @@ module Async let(:chainable) {subject.new(parent: parent)} it "should chain async to parent" do - expect(parent).to receive(:async).and_return(nil) + expect(parent).to receive(:async).and_return {|*arguments, **options, &block| + Async(*arguments, **options, &block) + } chainable.async do # Nothing. diff --git a/lib/async/barrier.rb b/lib/async/barrier.rb index 769b19ac..271bcf93 100644 --- a/lib/async/barrier.rb +++ b/lib/async/barrier.rb @@ -18,6 +18,7 @@ class Barrier def initialize(parent: nil) @tasks = List.new @finished = Queue.new + @cond = Condition.new @parent = parent end @@ -47,13 +48,21 @@ def async(*arguments, parent: (@parent or Task.current), **options, &block) waiting = nil - parent.async(*arguments, **options) do |task, *arguments| - waiting = TaskNode.new(task) - @tasks.append(waiting) + task = parent.async(*arguments, **options) do |task, *arguments| + node = TaskNode.new(task) + @tasks.append(node) + + waiting = node + @cond.signal + block.call(task, *arguments) ensure - @finished.signal(waiting) unless @finished.closed? + @finished.signal(node) unless @finished.closed? end + + @cond.wait while waiting.nil? + + task end # Whether there are any tasks being held by the barrier. diff --git a/test/async/barrier.rb b/test/async/barrier.rb index 8f00a80a..1a69f335 100644 --- a/test/async/barrier.rb +++ b/test/async/barrier.rb @@ -133,6 +133,31 @@ barrier.stop end end + + it "waits even if the child task yields immediately" do + class Yielder + def async(*arguments, **options, &block) + Async(*arguments, **options) do |task, *arguments| + task.yield + block.call(task, *arguments) + end + end + end + + parent = Yielder.new + + 3.times do |i| + barrier.async(parent:) {i} + end + + expect(barrier.size).to be == 3 + + results = [] + barrier.wait do |task| + results << task.wait + end + expect(results.sort).to be == [0, 1, 2] + end end with "#stop" do From 29d75323e4ac3825d3646276cbf3ee17b042dd91 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 18 Mar 2026 08:48:40 +1300 Subject: [PATCH 2/2] Code formatting & release notes. --- fixtures/async/chainable_async.rb | 2 +- lib/async/barrier.rb | 22 ++++++++++++++-------- releases.md | 4 ++++ test/async/barrier.rb | 12 ++++++------ 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/fixtures/async/chainable_async.rb b/fixtures/async/chainable_async.rb index 2292cee9..7cbab411 100644 --- a/fixtures/async/chainable_async.rb +++ b/fixtures/async/chainable_async.rb @@ -9,7 +9,7 @@ module Async let(:chainable) {subject.new(parent: parent)} it "should chain async to parent" do - expect(parent).to receive(:async).and_return {|*arguments, **options, &block| + expect(parent).to receive(:async).and_return{|*arguments, **options, &block| Async(*arguments, **options, &block) } diff --git a/lib/async/barrier.rb b/lib/async/barrier.rb index 271bcf93..582fd33b 100644 --- a/lib/async/barrier.rb +++ b/lib/async/barrier.rb @@ -18,7 +18,7 @@ class Barrier def initialize(parent: nil) @tasks = List.new @finished = Queue.new - @cond = Condition.new + @condition = Condition.new @parent = parent end @@ -43,26 +43,32 @@ def size # Execute a child task and add it to the barrier. # @asynchronous Executes the given block concurrently. + # @returns [Task] The task which was created to execute the block. def async(*arguments, parent: (@parent or Task.current), **options, &block) raise "Barrier is stopped!" if @finished.closed? waiting = nil task = parent.async(*arguments, **options) do |task, *arguments| + # Create a new list node for the task and add it to the list of waiting tasks: node = TaskNode.new(task) @tasks.append(node) - + + # Signal the outer async block that we have added the task to the list of waiting tasks, and that it can now wait for it to finish: waiting = node - @cond.signal - + @condition.signal + + # Invoke the block, which may raise an error. If it does, we will still signal that the task has finished: block.call(task, *arguments) ensure + # Signal that the task has finished, which will unblock the waiting task: @finished.signal(node) unless @finished.closed? end - - @cond.wait while waiting.nil? - - task + + # `parent.async` may yield before the child block executes, so we wait here until the child has appended itself to `@tasks`, ensuring `wait` cannot return early and miss tracking it: + @condition.wait while waiting.nil? + + return task end # Whether there are any tasks being held by the barrier. diff --git a/releases.md b/releases.md index fe731175..381f146a 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,9 @@ # Releases +## Unreleased + + - Fix `Barrier#async` when `parent.async` yields before the child block executes. Previously, `Barrier#wait` could return early and miss tracking the task entirely, because the task had not yet appended itself to the barrier's task list. + ## v2.38.0 - Rename `Task#stop` to `Task#cancel` for better clarity and consistency with common concurrency terminology. The old `stop` method is still available as an alias for backward compatibility, but it is recommended to use `cancel` going forward. diff --git a/test/async/barrier.rb b/test/async/barrier.rb index 1a69f335..690a9b70 100644 --- a/test/async/barrier.rb +++ b/test/async/barrier.rb @@ -133,7 +133,7 @@ barrier.stop end end - + it "waits even if the child task yields immediately" do class Yielder def async(*arguments, **options, &block) @@ -143,15 +143,15 @@ def async(*arguments, **options, &block) end end end - + parent = Yielder.new - + 3.times do |i| - barrier.async(parent:) {i} + barrier.async(parent:){i} end - + expect(barrier.size).to be == 3 - + results = [] barrier.wait do |task| results << task.wait