Skip to content

Commit 95c9c72

Browse files
wtn and claude committed
Fix retire/release race when resource close yields.
Co-authored-by: Claude <noreply@anthropic.com>
1 parent 42f2142 commit 95c9c72

2 files changed

Lines changed: 88 additions & 1 deletion

File tree

lib/async/pool/controller.rb

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@
1212
require "async/semaphore"
1313

1414
require "thread"
15+
require "set"
1516

1617
module Async
1718
module Pool
@@ -50,6 +51,9 @@ def initialize(constructor, limit: nil, concurrency: (limit || 1), policy: nil,
5051
# Used to signal when a resource has been released:
5152
@mutex = Thread::Mutex.new
5253
@condition = Thread::ConditionVariable.new
54+
55+
# Resources currently being retired (close may yield):
56+
@retiring = Set.new
5357
end
5458

5559
# @attribute [Proc] The constructor used to create new resources.
@@ -234,9 +238,11 @@ def prune(retain = 0)
234238
def retire(resource)
235239
Console.debug(self){"Retire #{resource}"}
236240

237-
@resources.delete(resource)
241+
return false unless @resources.delete(resource)
238242

243+
@retiring.add(resource)
239244
resource.close
245+
@retiring.delete(resource)
240246

241247
@mutex.synchronize{@condition.broadcast}
242248

@@ -286,6 +292,9 @@ def reuse(resource)
286292

287293
usage = @resources[resource]
288294

295+
# Retire is still in progress (e.g. connection reset while retire's close yields), so there is nothing to reuse:
296+
return false if usage.nil? && @retiring.include?(resource)
297+
289298
if usage.nil? || usage.zero?
290299
raise "Trying to reuse unacquired resource: #{resource}!"
291300
end

test/async/pool/retire_race.rb

Lines changed: 78 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,78 @@
1+
# frozen_string_literal: true
2+
3+
# Verifies that retire + release on the same resource does not raise
4+
# "Trying to reuse unacquired resource".
5+
#
6+
# In practice this happens with async-http when an HTTP/2 connection is reset:
7+
# the background reader retires the connection, but retire's resource.close
8+
# yields (e.g. sending GOAWAY), allowing another fiber to call release while
9+
# the resource is deleted from @resources but still reports reusable? = true.
10+
11+
require "async/pool/controller"
12+
require "async/pool/resource"
13+
require "sus/fixtures/async/reactor_context"
14+
15+
# A resource whose close yields (simulating async IO like sending GOAWAY),
16+
# but whose reusable? check is synchronous (no yield).
17+
class SlowCloseResource < Async::Pool::Resource
18+
def close
19+
Async::Task.current.yield
20+
super
21+
end
22+
end
23+
24+
describe Async::Pool::Controller do
25+
include Sus::Fixtures::Async::ReactorContext
26+
27+
with "retire/release race on slow-close resource" do
28+
let(:pool) {subject.new(SlowCloseResource)}
29+
30+
it "gracefully handles release after retire begins closing" do
31+
resource = pool.acquire
32+
33+
# Start retire in a child task. It runs synchronously up to the
34+
# yield inside SlowCloseResource#close, then pauses. At that point
35+
# @resources[resource] has been deleted but resource.close hasn't
36+
# finished, so reusable? still returns true.
37+
retire_task = Async do
38+
pool.retire(resource)
39+
end
40+
41+
# No yield needed — the child already ran to its yield point.
42+
# Verify the race window exists:
43+
expect(resource).to be(:reusable?)
44+
expect(pool.resources).not.to be(:key?, resource)
45+
46+
# The client's error handler now tries to release the same resource.
47+
# This should not raise — retire already claimed ownership.
48+
pool.release(resource)
49+
50+
retire_task.wait
51+
end
52+
53+
it "gracefully handles multiplexed release after retire begins closing" do
54+
constructor = lambda { SlowCloseResource.new(128) }
55+
pool = subject.new(constructor)
56+
57+
# Two streams on the same HTTP/2 connection:
58+
r1 = pool.acquire
59+
r2 = pool.acquire
60+
expect(r1).to be_equal(r2)
61+
62+
# Background reader retires (yields during close):
63+
retire_task = Async do
64+
pool.retire(r1)
65+
end
66+
67+
# The race window is open: resource deleted from pool but not
68+
# yet closed. First stream's error handler hits the race:
69+
expect(r1).to be(:reusable?)
70+
expect(pool.resources).not.to be(:key?, r1)
71+
72+
# Should not raise:
73+
pool.release(r1)
74+
75+
retire_task.wait
76+
end
77+
end
78+
end

0 commit comments

Comments
 (0)