diff --git a/fixtures/async/chainable_async.rb b/fixtures/async/chainable_async.rb index 826b0788..7cbab411 100644 --- a/fixtures/async/chainable_async.rb +++ b/fixtures/async/chainable_async.rb @@ -9,7 +9,9 @@ module Async let(:chainable) {subject.new(parent: parent)} it "should chain async to parent" do - expect(parent).to receive(:async).and_return(nil) + expect(parent).to receive(:async).and_return{|*arguments, **options, &block| + Async(*arguments, **options, &block) + } chainable.async do # Nothing. diff --git a/lib/async/barrier.rb b/lib/async/barrier.rb index 769b19ac..582fd33b 100644 --- a/lib/async/barrier.rb +++ b/lib/async/barrier.rb @@ -18,6 +18,7 @@ class Barrier def initialize(parent: nil) @tasks = List.new @finished = Queue.new + @condition = Condition.new @parent = parent end @@ -42,18 +43,32 @@ def size # Execute a child task and add it to the barrier. # @asynchronous Executes the given block concurrently. + # @returns [Task] The task which was created to execute the block. def async(*arguments, parent: (@parent or Task.current), **options, &block) raise "Barrier is stopped!" if @finished.closed? waiting = nil - parent.async(*arguments, **options) do |task, *arguments| - waiting = TaskNode.new(task) - @tasks.append(waiting) + task = parent.async(*arguments, **options) do |task, *arguments| + # Create a new list node for the task and add it to the list of waiting tasks: + node = TaskNode.new(task) + @tasks.append(node) + + # Signal the outer async block that we have added the task to the list of waiting tasks, and that it can now wait for it to finish: + waiting = node + @condition.signal + + # Invoke the block, which may raise an error. If it does, we will still signal that the task has finished: block.call(task, *arguments) ensure - @finished.signal(waiting) unless @finished.closed? + # Signal that the task has finished, which will unblock the waiting task: + @finished.signal(node) unless @finished.closed? end + + # `parent.async` may yield before the child block executes, so we wait here until the child has appended itself to `@tasks`, ensuring `wait` cannot return early and miss tracking it: + @condition.wait while waiting.nil? + + return task end # Whether there are any tasks being held by the barrier. diff --git a/releases.md b/releases.md index fe731175..381f146a 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,9 @@ # Releases +## Unreleased + + - Fix `Barrier#async` when `parent.async` yields before the child block executes. Previously, `Barrier#wait` could return early and miss tracking the task entirely, because the task had not yet appended itself to the barrier's task list. + ## v2.38.0 - Rename `Task#stop` to `Task#cancel` for better clarity and consistency with common concurrency terminology. The old `stop` method is still available as an alias for backward compatibility, but it is recommended to use `cancel` going forward. diff --git a/test/async/barrier.rb b/test/async/barrier.rb index 8f00a80a..690a9b70 100644 --- a/test/async/barrier.rb +++ b/test/async/barrier.rb @@ -133,6 +133,31 @@ barrier.stop end end + + it "waits even if the child task yields immediately" do + class Yielder + def async(*arguments, **options, &block) + Async(*arguments, **options) do |task, *arguments| + task.yield + block.call(task, *arguments) + end + end + end + + parent = Yielder.new + + 3.times do |i| + barrier.async(parent:){i} + end + + expect(barrier.size).to be == 3 + + results = [] + barrier.wait do |task| + results << task.wait + end + expect(results.sort).to be == [0, 1, 2] + end end with "#stop" do