diff --git a/server/connection_handler.cc b/server/connection_handler.cc index 50df1f02..d755c15b 100644 --- a/server/connection_handler.cc +++ b/server/connection_handler.cc @@ -570,9 +570,9 @@ void ConnectionHandler::OnMessage(){ // Data still being flushed — enable write mode to drain it. // CallWriteCb will ForceClose when the buffer empties. client_channel_->EnableWriteMode(); - } else if (callback_ran) { - // Callback ran but buffer is empty and connection not - // closed. Possible cases: + } else if (callback_ran && connect_state_ == ConnectState::NONE) { + // INBOUND only (connect_state_ == NONE). Callback ran but buffer + // is empty and connection not closed. Possible cases: // - Sync handler sent response, fast-path ForceClose'd // → is_closing_ == true (caught by outer guard). // - Async handler will send response later via @@ -585,14 +585,26 @@ void ConnectionHandler::OnMessage(){ // Arm a modest fallback deadline when nothing else has — // guarantees the timer callback eventually runs so the // connection can be torn down if the handler hangs, - // without closing a valid in-flight request up front. + // without closing a valid in-flight request up front. This + // half-close heuristic is meaningful only for inbound + // connections; an outbound peer read-close is a definitive + // end-of-stream and falls through to ForceClose below. if (!has_deadline_) { SetDeadline(std::chrono::steady_clock::now() + std::chrono::seconds(5)); } } else { - // No callback ran (EOF without any input this cycle and - // no handler in-flight) — nothing to wait for. + // No callback ran (EOF without any input this cycle and no + // handler in-flight), OR an OUTBOUND connection whose peer + // read-closed after delivering its response (response bytes and + // EOF coalesced into one read cycle). A peer close on an outbound + // connection is a definitive end-of-stream — close now so the + // close path delivers the EOF to the borrower (empty-bytes + // on_message_callback → upstream codec EOF finalization) + // immediately, instead of waiting out the inbound-only fallback + // deadline. This makes the coalesced read path behave identically + // to the split read path (which already reaches ForceClose with + // callback_ran == false). ForceClose(); } } diff --git a/test/upstream/proxy_test.h b/test/upstream/proxy_test.h index 04cedbfa..13620e70 100644 --- a/test/upstream/proxy_test.h +++ b/test/upstream/proxy_test.h @@ -4057,6 +4057,83 @@ void TestIntegrationBackoffDoesNotBlockOtherRequests() { } } +// End-to-end coverage for an EOF-delimited upstream response (no Content-Length, +// no Transfer-Encoding — body framed by connection close, as in HTTP/1.0 and +// Connection: close upstreams). buffering="always" withholds the downstream +// response until the upstream response completes on EOF, so this exercises the +// full outbound peer-close -> codec EOF finalization -> buffered relay path. +// Complements the deterministic issue #48 regression guard +// (UpstreamPoolTests::TestOutboundCoalescedPeerCloseClosesPromptly): when the +// upstream's body bytes and FIN coalesce into one read drain (frequent on Linux +// loopback), the pre-fix outbound half-close mishandling stalls EOF delivery and +// the buffered body never arrives within the receive budget below. +void TestIntegrationEofDelimitedResponseRelayed() { + std::cout << "\n[TEST] Integration: EOF-delimited (Connection: close) upstream response relayed..." << std::endl; + try { + const std::string kBody = "eof-delimited-body-issue-48"; + RawHttpBackendServer backend([kBody](int fd, const std::string&) { + // One write: status line + headers + body. No Content-Length and no + // Transfer-Encoding => the body is delimited by connection close. + // RawHttpBackendServer::Run() shutdown()s + close()s immediately + // after this returns, so the body bytes and the FIN frequently land + // in a single read drain on the proxy's outbound socket. + SendAll(fd, + "HTTP/1.1 200 OK\r\n" + "Content-Type: text/plain\r\n" + "Connection: close\r\n" + "\r\n" + kBody); + }); + + ServerConfig gw_config; + gw_config.bind_host = "127.0.0.1"; + gw_config.bind_port = 0; + gw_config.worker_threads = 1; + gw_config.http2.enabled = false; + UpstreamConfig u = MakeProxyUpstreamConfig( + "backend", "127.0.0.1", backend.GetPort(), "/eof48"); + u.proxy.buffering = "always"; + gw_config.upstreams.push_back(u); + + HttpServer gateway(gw_config); + TestServerRunner gw_runner(gateway); + int gw_port = gw_runner.GetPort(); + + int client_fd = TestHttpClient::ConnectRawSocket(gw_port); + if (client_fd < 0) throw std::runtime_error("gateway connect failed"); + + if (!SendAll(client_fd, + "GET /eof48 HTTP/1.1\r\n" + "Host: localhost\r\n" + "Connection: close\r\n" + "\r\n")) { + close(client_fd); + throw std::runtime_error("gateway send failed"); + } + + // The fix finalizes on EOF and the gateway closes promptly; the bug + // stalls past this budget so the buffered body never arrives. + std::string full = RecvUntilClose(client_fd, 4000); + close(client_fd); + + bool pass = true; + std::string err; + if (!TestHttpClient::HasStatus(full, 200)) { + pass = false; err += "status not 200; "; + } + if (full.find(kBody) == std::string::npos) { + pass = false; err += "EOF-delimited body not delivered; "; + } + + TestFramework::RecordTest( + "Integration: EOF-delimited (Connection: close) upstream response relayed", + pass, err); + } catch (const std::exception& e) { + TestFramework::RecordTest( + "Integration: EOF-delimited (Connection: close) upstream response relayed", + false, e.what()); + } +} + // --------------------------------------------------------------------------- // RunAllTests // --------------------------------------------------------------------------- @@ -4136,6 +4213,7 @@ void RunAllTests() { TestIntegrationParameterizedSseAutoStreams(); TestIntegrationStreamIdleTimeoutAbortsRelay(); TestIntegrationDownstreamBackpressureSuspendsIdleTimeout(); + TestIntegrationEofDelimitedResponseRelayed(); TestBackpressureDrainThenDownstreamDies(); TestBackpressureMultiplePauseResumeCyclesWithEof(); TestBackpressureGatewayShutdownWhilePaused(); diff --git a/test/upstream/upstream_pool_test.h b/test/upstream/upstream_pool_test.h index 532c87c8..2651a590 100644 --- a/test/upstream/upstream_pool_test.h +++ b/test/upstream/upstream_pool_test.h @@ -866,6 +866,175 @@ void TestUpstreamConnectionQueuedResumeHonorsCallbackOwnership() { } } +// Build a non-blocking AF_UNIX socketpair, throwing on failure. Returns via out. +static void MakeNonBlockingSocketpair(int sv[2]) { + if (::socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0) { + throw std::runtime_error("socketpair failed"); + } + for (int i = 0; i < 2; ++i) { + int flags = fcntl(sv[i], F_GETFL, 0); + if (flags < 0 || fcntl(sv[i], F_SETFL, flags | O_NONBLOCK) < 0) { + ::close(sv[0]); + ::close(sv[1]); + sv[0] = -1; // prevent the caller's catch from re-closing (fd-reuse race) + sv[1] = -1; + throw std::runtime_error("failed to set non-blocking"); + } + } +} + +// Regression (issue #48): an OUTBOUND (upstream / client-role) connection whose +// peer delivers its response and then read-closes such that the response bytes +// and the EOF (read()==0) drain in the SAME OnMessage cycle must close PROMPTLY +// (ForceClose) so the close path delivers EOF to the borrower immediately — NOT +// arm the inbound HTTP/1 half-close 5s fallback deadline. Writing-then-shutdown +// before OnMessage runs leaves BOTH the bytes and the FIN buffered on sv[0], so +// the read loop deterministically reads the bytes then read()==0 in one cycle +// (callback_ran && peer_closed). connect_state_==CONNECTED must route to the +// ForceClose branch; pre-fix it took the deadline-arming branch and never closed. +void TestOutboundCoalescedPeerCloseClosesPromptly() { + std::cout << "\n[TEST] UpstreamPool: outbound coalesced peer-close closes promptly (issue #48)..." << std::endl; + int sv[2] = {-1, -1}; + try { + MakeNonBlockingSocketpair(sv); + + // Full EOF-delimited response (no Content-Length, no chunked), then + // half-close the write side so bytes + FIN are buffered together. + const std::string resp = + "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n" + "Connection: close\r\n\r\neof-delimited-body"; + if (::send(sv[1], resp.data(), resp.size(), 0) != + static_cast(resp.size())) { + throw std::runtime_error("send failed"); + } + ::shutdown(sv[1], SHUT_WR); // FIN -> sv[0]; buffered bytes still readable + + auto dispatcher = std::make_shared(); + auto sock = std::make_unique(sv[0], "127.0.0.1", 9999); + sv[0] = -1; // ownership transferred to SocketHandler/ConnectionHandler + auto conn = std::shared_ptr( + new ConnectionHandler(dispatcher, std::move(sock))); + + std::string delivered; + bool closed = false; + conn->SetOnMessageCb( + [&delivered](std::shared_ptr, std::string& message) { + if (!message.empty()) delivered = message; + }); + conn->SetCloseCb( + [&closed](std::shared_ptr) { closed = true; }); + + // Drive the outbound connect handshake to CONNECTED. On a socketpair + // FinishConnect's getsockopt(SO_ERROR) returns 0 (already connected). + conn->RegisterOutboundCallbacks(); // connect_state_ = CONNECTING + conn->CallWriteCb(); // EPOLLOUT -> FinishConnect -> CONNECTED + + // One read cycle: response bytes + EOF coalesced (both pre-buffered). + conn->OnMessage(); + dispatcher->ProcessPendingTasks(); // drain any enqueued close + + bool pass = true; + std::string err; + if (delivered != resp) { + pass = false; + err += "response not delivered to borrower (got " + + std::to_string(delivered.size()) + " bytes); "; + } + if (!closed) { + pass = false; + err += "outbound peer-close did not ForceClose promptly " + "(inbound 5s half-close deadline armed for an outbound conn); "; + } + + ::close(sv[1]); // sv[0] is owned + closed by the ConnectionHandler + sv[1] = -1; // prevent the catch from re-closing on a later throw + TestFramework::RecordTest( + "UpstreamPool: outbound coalesced peer-close closes promptly (issue #48)", + pass, err); + } catch (const std::exception& e) { + // Close whatever the test still owns. sv[0] is -1 once SocketHandler + // adopts it; sv[1] is -1 once closed above — so each fd closes exactly + // once across every path (no double-close, no leak). + if (sv[0] >= 0) ::close(sv[0]); + if (sv[1] >= 0) ::close(sv[1]); + TestFramework::RecordTest( + "UpstreamPool: outbound coalesced peer-close closes promptly (issue #48)", + false, e.what()); + } +} + +// Companion to the outbound test above: the SAME coalesced data+EOF read cycle on +// an INBOUND connection (connect_state_ == NONE, the default — no outbound connect +// driven) must NOT close immediately. Inbound peers may half-close the write side +// while awaiting the response, so the half-close heuristic arms the bounded +// fallback deadline instead of force-closing the in-flight request. This locks in +// the inbound contract the issue #48 gate must preserve. +void TestInboundCoalescedHalfCloseDoesNotClosePromptly() { + std::cout << "\n[TEST] UpstreamPool: inbound coalesced half-close keeps conn open (issue #48 guard)..." << std::endl; + int sv[2] = {-1, -1}; + try { + MakeNonBlockingSocketpair(sv); + + const std::string req = + "GET /x HTTP/1.1\r\nHost: localhost\r\n\r\n"; + if (::send(sv[1], req.data(), req.size(), 0) != + static_cast(req.size())) { + throw std::runtime_error("send failed"); + } + ::shutdown(sv[1], SHUT_WR); // client half-closes write side + + auto dispatcher = std::make_shared(); + auto sock = std::make_unique(sv[0], "127.0.0.1", 9999); + sv[0] = -1; // ownership transferred to SocketHandler/ConnectionHandler + auto conn = std::shared_ptr( + new ConnectionHandler(dispatcher, std::move(sock))); + // connect_state_ stays NONE (inbound): no RegisterOutboundCallbacks / + // CallWriteCb. RegisterCallbacks wires the channel for inbound role. + conn->RegisterCallbacks(); + + std::string delivered; + bool closed = false; + conn->SetOnMessageCb( + [&delivered](std::shared_ptr, std::string& message) { + if (!message.empty()) delivered = message; + }); + conn->SetCloseCb( + [&closed](std::shared_ptr) { closed = true; }); + + conn->OnMessage(); // request bytes + EOF coalesced + dispatcher->ProcessPendingTasks(); + + bool pass = true; + std::string err; + if (delivered != req) { + pass = false; + err += "inbound request not delivered to handler; "; + } + if (closed) { + pass = false; + err += "inbound half-close force-closed the in-flight request " + "instead of arming the fallback deadline; "; + } + + conn->ForceClose(); // tear down the test connection + dispatcher->ProcessPendingTasks(); + ::close(sv[1]); + sv[1] = -1; // prevent the catch from re-closing on a later throw + TestFramework::RecordTest( + "UpstreamPool: inbound coalesced half-close keeps conn open (issue #48 guard)", + pass, err); + } catch (const std::exception& e) { + // Close whatever the test still owns. sv[0] is -1 once SocketHandler + // adopts it; sv[1] is -1 once closed above — so each fd closes exactly + // once across every path (no double-close, no leak). + if (sv[0] >= 0) ::close(sv[0]); + if (sv[1] >= 0) ::close(sv[1]); + TestFramework::RecordTest( + "UpstreamPool: inbound coalesced half-close keeps conn open (issue #48 guard)", + false, e.what()); + } +} + // --------------------------------------------------------------------------- // Section 4: UpstreamLease RAII tests // @@ -2579,6 +2748,8 @@ void RunAllTests() { TestUpstreamConnectionRequestCount(); TestUpstreamConnectionReadPauseResumesBufferedData(); TestUpstreamConnectionQueuedResumeHonorsCallbackOwnership(); + TestOutboundCoalescedPeerCloseClosesPromptly(); + TestInboundCoalescedHalfCloseDoesNotClosePromptly(); // Section 4: UpstreamLease RAII TestUpstreamLeaseEmptyDefault();