Skip to content

Commit c3a3925

Browse files
fabianfettglbrntt
andauthored
Fix crash when response completes before request body finishes uploading (#895)
- Fix a `fatalError("Invalid state: idle")` crash in `HTTP1ConnectionStateMachine.demandMoreResponseBodyParts()` that occurs when a response completes before the request body finishes uploading - The root cause is that `self.request` was only nilled out inside the write-completion callback for `.sendRequestEnd`, creating a window where `demandResponseBodyStream` could still see the old request and call into the state machine after it had already transitioned to .idle - The fix nils out `self.request` synchronously when handling `.sendRequestEnd` (before the write completes), and moves `requestBodyStreamSent()` to fire after the final action rather than before it --------- Co-authored-by: George Barnett <gbarnett@apple.com>
1 parent f559681 commit c3a3925

2 files changed

Lines changed: 133 additions & 11 deletions

File tree

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -243,38 +243,45 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
243243
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: writePromise)
244244

245245
case .sendRequestEnd(let trailers, let writePromise, let finalAction):
246-
247-
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
248246
// We need to defer succeeding the old request to avoid ordering issues
247+
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
248+
// It is fine to bang the request here, as we have just verified with the state machine
249+
// that the request is still ongoing.
250+
// TODO: In the future, we should likely move the request into the state machine to
251+
// prevent diverging state.
252+
let oldRequest = self.request!
253+
254+
switch finalAction {
255+
case .none:
256+
// we must not nil out the request here, as we are still uploading the request
257+
// and therefore still need the reference to it.
258+
break
259+
case .informConnectionIsIdle:
260+
self.request = nil
261+
case .close:
262+
self.request = nil
263+
}
249264

250265
writePromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { result in
251-
guard let oldRequest = self.request else {
252-
// in the meantime an error might have happened, which is why this request is
253-
// not reference anymore.
254-
return
255-
}
256-
oldRequest.requestBodyStreamSent()
257266
switch result {
258267
case .success:
259268
// If our final action is not `none`, that means we've already received
260269
// the complete response. As a result, once we've uploaded all the body parts
261270
// we need to tell the pool that the connection is idle or, if we were asked to
262271
// close when we're done, send the close. Either way, we then succeed the request
263-
264272
switch finalAction {
265273
case .none:
266274
// we must not nil out the request here, as we are still uploading the request
267275
// and therefore still need the reference to it.
268276
break
269277

270278
case .informConnectionIsIdle:
271-
self.request = nil
272279
self.onConnectionIdle()
273280

274281
case .close:
275-
self.request = nil
276282
context.close(promise: nil)
277283
}
284+
oldRequest.requestBodyStreamSent()
278285

279286
case .failure(let error):
280287
context.close(promise: nil)

Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -947,6 +947,121 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
947947
)
948948
}
949949

950+
func testDemandResponseBodyStreamAfterEarlyResponseDoesNotCrash() async throws {
951+
// This test reproduces a crash where `demandMoreResponseBodyParts()` was called on the
952+
// state machine after it had already transitioned to `.idle`. The scenario is:
953+
//
954+
// 1. A streaming POST request is in progress
955+
// 2. The full response (head + end) arrives before the request body is finished
956+
// 3. The response end is forwarded with `finalAction: .none` (body still uploading)
957+
// 4. The request body stream finishes -> `requestStreamFinished` -> state machine
958+
// transitions to `.idle` and returns `.sendRequestEnd(.informConnectionIsIdle)`
959+
// 5. The `.sendRequestEnd` handler writes `.end` to the channel and registers a
960+
// callback on the write promise
961+
// 6. Before the write callback fires (write hasn't completed yet),
962+
// `demandResponseBodyStream` is called (from a delegate on another event loop)
963+
// 7. Without the fix, `self.request` was still set (only nilled in the write
964+
// callback), so the guard passed and `demandMoreResponseBodyParts()` hit
965+
// `fatalError("Invalid state: idle")`
966+
//
967+
// The fix nils out `self.request` synchronously in `.sendRequestEnd` (before the
968+
// write callback), so the guard in `demandResponseBodyStream0` fails and returns early.
969+
970+
final class DelayEndHandler: ChannelOutboundHandler {
971+
typealias OutboundIn = HTTPClientRequestPart
972+
typealias OutboundOut = HTTPClientRequestPart
973+
974+
private(set) var endPromise: EventLoopPromise<Void>?
975+
976+
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
977+
if case .end = self.unwrapOutboundIn(data) {
978+
self.endPromise = promise
979+
context.write(data, promise: nil)
980+
} else {
981+
context.write(data, promise: promise)
982+
}
983+
}
984+
}
985+
986+
let eventLoop = EmbeddedEventLoop()
987+
let delayEndHandler = DelayEndHandler()
988+
let handler = HTTP1ClientChannelHandler(
989+
eventLoop: eventLoop,
990+
backgroundLogger: Logger(label: "no-op", factory: SwiftLogNoOpLogHandler.init),
991+
connectionIdLoggerMetadata: "test connection"
992+
)
993+
var connectionIsIdle = false
994+
handler.onConnectionIdle = { connectionIsIdle = true }
995+
let channel = EmbeddedChannel(handlers: [delayEndHandler, handler], loop: eventLoop)
996+
XCTAssertNoThrow(try channel.connect(to: .init(ipAddress: "127.0.0.1", port: 80)).wait())
997+
998+
let request = MockHTTPExecutableRequest(
999+
head: .init(version: .http1_1, method: .POST, uri: "http://localhost/"),
1000+
framingMetadata: RequestFramingMetadata(connectionClose: false, body: .stream),
1001+
raiseErrorIfUnimplementedMethodIsCalled: false
1002+
)
1003+
1004+
let executor = handler.requestExecutor
1005+
1006+
// When the body stream is resumed, write one part but do NOT finish the stream yet.
1007+
request.resumeRequestBodyStreamCallback = {
1008+
executor.writeRequestBodyPart(.byteBuffer(.init(string: "Hello")), request: request, promise: nil)
1009+
}
1010+
1011+
// Start the request
1012+
channel.write(request, promise: nil)
1013+
1014+
// Verify request head was sent
1015+
XCTAssertEqual(try channel.readOutbound(as: HTTPClientRequestPart.self), .head(request.requestHead))
1016+
// Verify body part was sent
1017+
XCTAssertEqual(
1018+
try channel.readOutbound(as: HTTPClientRequestPart.self),
1019+
.body(.byteBuffer(.init(string: "Hello")))
1020+
)
1021+
1022+
// Now send the full response while the request body stream is still open.
1023+
// This causes forwardResponseEnd with finalAction: .none (body not done yet).
1024+
XCTAssertNoThrow(try channel.writeInbound(HTTPClientResponsePart.head(.init(version: .http1_1, status: .ok))))
1025+
// Issue a read to advance the response stream state so it accepts the end properly.
1026+
channel.read()
1027+
XCTAssertNoThrow(try channel.writeInbound(HTTPClientResponsePart.end(nil)))
1028+
1029+
// Finish the request body stream. This transitions the state machine to `.idle`
1030+
// and writes `.end` to the channel. The DelayEndHandler intercepts the `.end`
1031+
// write and holds the promise, preventing the write callback from firing.
1032+
executor.finishRequestBodyStream(trailers: nil, request: request, promise: nil)
1033+
1034+
// Verify the .end was written through to the channel
1035+
XCTAssertEqual(try channel.readOutbound(as: HTTPClientRequestPart.self), .end(nil))
1036+
1037+
// At this point:
1038+
// - The state machine has transitioned to `.idle`
1039+
// - The write promise has NOT been fulfilled (held by DelayEndHandler)
1040+
// - In old code: self.request is still set (only nilled in the write callback)
1041+
// - In fixed code: self.request is already nil (nilled synchronously)
1042+
1043+
// Now call demandResponseBodyStream, simulating a delegate on a different event
1044+
// loop calling it after receiving the response end but before the write completes.
1045+
// Without the fix, self.request is still set, the guard passes, and
1046+
// state.demandMoreResponseBodyParts() crashes with "Invalid state: idle".
1047+
// With the fix, self.request was already nilled, the guard fails, and this is a no-op.
1048+
executor.demandResponseBodyStream(request)
1049+
1050+
// Complete the delayed write to clean up properly.
1051+
delayEndHandler.endPromise?.succeed(())
1052+
eventLoop.run()
1053+
1054+
XCTAssertTrue(connectionIsIdle)
1055+
1056+
XCTAssertEqual(
1057+
request.events.map(\.kind),
1058+
[
1059+
.willExecuteRequest, .requestHeadSent, .resumeRequestBodyStream,
1060+
.receiveResponseHead, .receiveResponseEnd, .requestBodySent,
1061+
]
1062+
)
1063+
}
1064+
9501065
func testDefaultMaxBufferSize() {
9511066
if MemoryLayout<Int>.size == 8 {
9521067
XCTAssertEqual(ResponseAccumulator.maxByteBufferSize, Int(UInt32.max))

0 commit comments

Comments
 (0)