Skip to content

Commit 30336f9

Browse files
committed
close streams when a SocketError occurs
1 parent 49387b5 commit 30336f9

2 files changed

Lines changed: 32 additions & 16 deletions

File tree

lib/net-http2/client.rb

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ module NetHttp2
99
PROXY_SETTINGS_KEYS = [:proxy_addr, :proxy_port, :proxy_user, :proxy_pass]
1010

1111
AsyncRequestTimeout = Class.new(StandardError)
12+
SocketClosedError = Class.new(StandardError)
1213

1314
class Client
1415

@@ -39,8 +40,7 @@ def call(method, path, options={})
3940

4041
def call_async(request)
4142
ensure_open
42-
stream = new_monitored_stream_for request
43-
stream.async_call_with request
43+
new_stream.async_call_with request
4444
end
4545

4646
def prepare_request(method, path, options={})
@@ -53,6 +53,7 @@ def ssl?
5353

5454
def close
5555
exit_thread(@socket_thread)
56+
@socket_error = SocketClosedError.new
5657
init_vars
5758
end
5859

@@ -74,10 +75,14 @@ def stream_count
7475

7576
private
7677

77-
def init_vars
78+
def init_vars(error: nil)
7879
@mutex.synchronize do
7980
@socket.close if @socket && !@socket.closed?
8081

82+
(@streams || {}).each do |k, v|
83+
v.force_close(@socket_error)
84+
end
85+
8186
@h2 = nil
8287
@socket = nil
8388
@socket_thread = nil
@@ -86,20 +91,16 @@ def init_vars
8691
end
8792
end
8893

89-
def new_stream
90-
@mutex.synchronize { NetHttp2::Stream.new(h2_stream: h2.new_stream) }
91-
rescue StandardError => e
92-
close
93-
raise e
94-
end
95-
96-
def new_monitored_stream_for(request)
97-
stream = new_stream
94+
def new_stream()
95+
stream = @mutex.synchronize { NetHttp2::Stream.new(h2_stream: h2.new_stream) }
9896

99-
@streams[stream.id] = true
100-
request.on(:close) { @streams.delete(stream.id) }
97+
@streams[stream.id] = stream
98+
stream.on(:close) { @streams.delete(stream.id) }
10199

102100
stream
101+
rescue StandardError => e
102+
close
103+
raise e
103104
end
104105

105106
def ensure_open
@@ -115,13 +116,15 @@ def ensure_open
115116

116117
rescue EOFError
117118
# socket closed
119+
@socket_error = SocketError.new('Socket was remotely closed')
118120
init_vars
119-
callback_or_raise SocketError.new('Socket was remotely closed')
121+
callback_or_raise @socket_error
120122

121123
rescue Exception => e
122124
# error on socket
125+
@socket_error = e
123126
init_vars
124-
callback_or_raise e
127+
callback_or_raise @socket_error
125128
end
126129
end.tap { |t| t.abort_on_exception = true }
127130
end

lib/net-http2/stream.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ def initialize(options={})
1010
@async = false
1111
@completed = false
1212
@mutex = Mutex.new
13+
@error = nil
1314
@cv = ConditionVariable.new
1415

1516
listen_for_headers
@@ -42,6 +43,15 @@ def async?
4243
@async
4344
end
4445

46+
def wait
47+
wait_for_completed
48+
end
49+
50+
def force_close(error = nil)
51+
@error = error
52+
@mutex.synchronize { @cv.signal }
53+
end
54+
4555
private
4656

4757
def listen_for_headers
@@ -98,6 +108,9 @@ def sync_respond
98108

99109
def wait_for_completed
100110
@mutex.synchronize { @cv.wait(@mutex, @request.timeout) }
111+
if @error || !@completed
112+
raise @error
113+
end
101114
end
102115
end
103116
end

0 commit comments

Comments
 (0)