Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions server/connection_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -570,9 +570,9 @@ void ConnectionHandler::OnMessage(){
// Data still being flushed — enable write mode to drain it.
// CallWriteCb will ForceClose when the buffer empties.
client_channel_->EnableWriteMode();
} else if (callback_ran) {
// Callback ran but buffer is empty and connection not
// closed. Possible cases:
} else if (callback_ran && connect_state_ == ConnectState::NONE) {
// INBOUND only (connect_state_ == NONE). Callback ran but buffer
// is empty and connection not closed. Possible cases:
// - Sync handler sent response, fast-path ForceClose'd
// → is_closing_ == true (caught by outer guard).
// - Async handler will send response later via
Expand All @@ -585,14 +585,26 @@ void ConnectionHandler::OnMessage(){
// Arm a modest fallback deadline when nothing else has —
// guarantees the timer callback eventually runs so the
// connection can be torn down if the handler hangs,
// without closing a valid in-flight request up front.
// without closing a valid in-flight request up front. This
// half-close heuristic is meaningful only for inbound
// connections; an outbound peer read-close is a definitive
// end-of-stream and falls through to ForceClose below.
if (!has_deadline_) {
SetDeadline(std::chrono::steady_clock::now() +
std::chrono::seconds(5));
}
} else {
// No callback ran (EOF without any input this cycle and
// no handler in-flight) — nothing to wait for.
// No callback ran (EOF without any input this cycle and no
// handler in-flight), OR an OUTBOUND connection whose peer
// read-closed after delivering its response (response bytes and
// EOF coalesced into one read cycle). A peer close on an outbound
// connection is a definitive end-of-stream — close now so the
// close path delivers the EOF to the borrower (empty-bytes
// on_message_callback → upstream codec EOF finalization)
// immediately, instead of waiting out the inbound-only fallback
// deadline. This makes the coalesced read path behave identically
// to the split read path (which already reaches ForceClose with
// callback_ran == false).
ForceClose();
}
}
Expand Down
78 changes: 78 additions & 0 deletions test/upstream/proxy_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -4057,6 +4057,83 @@ void TestIntegrationBackoffDoesNotBlockOtherRequests() {
}
}

// End-to-end coverage for an EOF-delimited upstream response (no Content-Length,
// no Transfer-Encoding — body framed by connection close, as in HTTP/1.0 and
// Connection: close upstreams). buffering="always" withholds the downstream
// response until the upstream response completes on EOF, so this exercises the
// full outbound peer-close -> codec EOF finalization -> buffered relay path.
// Complements the deterministic issue #48 regression guard
// (UpstreamPoolTests::TestOutboundCoalescedPeerCloseClosesPromptly): when the
// upstream's body bytes and FIN coalesce into one read drain (frequent on Linux
// loopback), the pre-fix outbound half-close mishandling stalls EOF delivery and
// the buffered body never arrives within the receive budget below.
void TestIntegrationEofDelimitedResponseRelayed() {
std::cout << "\n[TEST] Integration: EOF-delimited (Connection: close) upstream response relayed..." << std::endl;
try {
const std::string kBody = "eof-delimited-body-issue-48";
RawHttpBackendServer backend([kBody](int fd, const std::string&) {
// One write: status line + headers + body. No Content-Length and no
// Transfer-Encoding => the body is delimited by connection close.
// RawHttpBackendServer::Run() shutdown()s + close()s immediately
// after this returns, so the body bytes and the FIN frequently land
// in a single read drain on the proxy's outbound socket.
SendAll(fd,
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/plain\r\n"
"Connection: close\r\n"
"\r\n" + kBody);
});

ServerConfig gw_config;
gw_config.bind_host = "127.0.0.1";
gw_config.bind_port = 0;
gw_config.worker_threads = 1;
gw_config.http2.enabled = false;
UpstreamConfig u = MakeProxyUpstreamConfig(
"backend", "127.0.0.1", backend.GetPort(), "/eof48");
u.proxy.buffering = "always";
gw_config.upstreams.push_back(u);

HttpServer gateway(gw_config);
TestServerRunner<HttpServer> gw_runner(gateway);
int gw_port = gw_runner.GetPort();

int client_fd = TestHttpClient::ConnectRawSocket(gw_port);
if (client_fd < 0) throw std::runtime_error("gateway connect failed");

if (!SendAll(client_fd,
"GET /eof48 HTTP/1.1\r\n"
"Host: localhost\r\n"
"Connection: close\r\n"
"\r\n")) {
close(client_fd);
throw std::runtime_error("gateway send failed");
}

// The fix finalizes on EOF and the gateway closes promptly; the bug
// stalls past this budget so the buffered body never arrives.
std::string full = RecvUntilClose(client_fd, 4000);
close(client_fd);

bool pass = true;
std::string err;
if (!TestHttpClient::HasStatus(full, 200)) {
pass = false; err += "status not 200; ";
}
if (full.find(kBody) == std::string::npos) {
pass = false; err += "EOF-delimited body not delivered; ";
}

TestFramework::RecordTest(
"Integration: EOF-delimited (Connection: close) upstream response relayed",
pass, err);
} catch (const std::exception& e) {
TestFramework::RecordTest(
"Integration: EOF-delimited (Connection: close) upstream response relayed",
false, e.what());
}
}

// ---------------------------------------------------------------------------
// RunAllTests
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -4136,6 +4213,7 @@ void RunAllTests() {
TestIntegrationParameterizedSseAutoStreams();
TestIntegrationStreamIdleTimeoutAbortsRelay();
TestIntegrationDownstreamBackpressureSuspendsIdleTimeout();
TestIntegrationEofDelimitedResponseRelayed();
TestBackpressureDrainThenDownstreamDies();
TestBackpressureMultiplePauseResumeCyclesWithEof();
TestBackpressureGatewayShutdownWhilePaused();
Expand Down
171 changes: 171 additions & 0 deletions test/upstream/upstream_pool_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,175 @@ void TestUpstreamConnectionQueuedResumeHonorsCallbackOwnership() {
}
}

// Build a non-blocking AF_UNIX socketpair, throwing on failure. Returns via out.
static void MakeNonBlockingSocketpair(int sv[2]) {
if (::socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0) {
throw std::runtime_error("socketpair failed");
}
for (int i = 0; i < 2; ++i) {
int flags = fcntl(sv[i], F_GETFL, 0);
if (flags < 0 || fcntl(sv[i], F_SETFL, flags | O_NONBLOCK) < 0) {
::close(sv[0]);
::close(sv[1]);
sv[0] = -1; // prevent the caller's catch from re-closing (fd-reuse race)
sv[1] = -1;
throw std::runtime_error("failed to set non-blocking");
}
Comment thread
mwfj marked this conversation as resolved.
}
}

// Regression (issue #48): an OUTBOUND (upstream / client-role) connection whose
// peer delivers its response and then read-closes such that the response bytes
// and the EOF (read()==0) drain in the SAME OnMessage cycle must close PROMPTLY
// (ForceClose) so the close path delivers EOF to the borrower immediately — NOT
// arm the inbound HTTP/1 half-close 5s fallback deadline. Writing-then-shutdown
// before OnMessage runs leaves BOTH the bytes and the FIN buffered on sv[0], so
// the read loop deterministically reads the bytes then read()==0 in one cycle
// (callback_ran && peer_closed). connect_state_==CONNECTED must route to the
// ForceClose branch; pre-fix it took the deadline-arming branch and never closed.
void TestOutboundCoalescedPeerCloseClosesPromptly() {
std::cout << "\n[TEST] UpstreamPool: outbound coalesced peer-close closes promptly (issue #48)..." << std::endl;
int sv[2] = {-1, -1};
try {
MakeNonBlockingSocketpair(sv);

// Full EOF-delimited response (no Content-Length, no chunked), then
// half-close the write side so bytes + FIN are buffered together.
const std::string resp =
"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n"
"Connection: close\r\n\r\neof-delimited-body";
if (::send(sv[1], resp.data(), resp.size(), 0) !=
static_cast<ssize_t>(resp.size())) {
throw std::runtime_error("send failed");
}
::shutdown(sv[1], SHUT_WR); // FIN -> sv[0]; buffered bytes still readable

auto dispatcher = std::make_shared<Dispatcher>();
auto sock = std::make_unique<SocketHandler>(sv[0], "127.0.0.1", 9999);
sv[0] = -1; // ownership transferred to SocketHandler/ConnectionHandler
auto conn = std::shared_ptr<ConnectionHandler>(
new ConnectionHandler(dispatcher, std::move(sock)));

std::string delivered;
bool closed = false;
conn->SetOnMessageCb(
[&delivered](std::shared_ptr<ConnectionHandler>, std::string& message) {
if (!message.empty()) delivered = message;
});
conn->SetCloseCb(
[&closed](std::shared_ptr<ConnectionHandler>) { closed = true; });

// Drive the outbound connect handshake to CONNECTED. On a socketpair
// FinishConnect's getsockopt(SO_ERROR) returns 0 (already connected).
conn->RegisterOutboundCallbacks(); // connect_state_ = CONNECTING
conn->CallWriteCb(); // EPOLLOUT -> FinishConnect -> CONNECTED

// One read cycle: response bytes + EOF coalesced (both pre-buffered).
conn->OnMessage();
dispatcher->ProcessPendingTasks(); // drain any enqueued close

bool pass = true;
std::string err;
if (delivered != resp) {
pass = false;
err += "response not delivered to borrower (got " +
std::to_string(delivered.size()) + " bytes); ";
}
if (!closed) {
pass = false;
err += "outbound peer-close did not ForceClose promptly "
"(inbound 5s half-close deadline armed for an outbound conn); ";
}

::close(sv[1]); // sv[0] is owned + closed by the ConnectionHandler
Comment thread
mwfj marked this conversation as resolved.
sv[1] = -1; // prevent the catch from re-closing on a later throw
TestFramework::RecordTest(
"UpstreamPool: outbound coalesced peer-close closes promptly (issue #48)",
pass, err);
} catch (const std::exception& e) {
// Close whatever the test still owns. sv[0] is -1 once SocketHandler
// adopts it; sv[1] is -1 once closed above — so each fd closes exactly
// once across every path (no double-close, no leak).
if (sv[0] >= 0) ::close(sv[0]);
if (sv[1] >= 0) ::close(sv[1]);
TestFramework::RecordTest(
"UpstreamPool: outbound coalesced peer-close closes promptly (issue #48)",
false, e.what());
}
}

// Companion to the outbound test above: the SAME coalesced data+EOF read cycle on
// an INBOUND connection (connect_state_ == NONE, the default — no outbound connect
// driven) must NOT close immediately. Inbound peers may half-close the write side
// while awaiting the response, so the half-close heuristic arms the bounded
// fallback deadline instead of force-closing the in-flight request. This locks in
// the inbound contract the issue #48 gate must preserve.
void TestInboundCoalescedHalfCloseDoesNotClosePromptly() {
std::cout << "\n[TEST] UpstreamPool: inbound coalesced half-close keeps conn open (issue #48 guard)..." << std::endl;
int sv[2] = {-1, -1};
try {
MakeNonBlockingSocketpair(sv);

const std::string req =
"GET /x HTTP/1.1\r\nHost: localhost\r\n\r\n";
if (::send(sv[1], req.data(), req.size(), 0) !=
static_cast<ssize_t>(req.size())) {
throw std::runtime_error("send failed");
}
::shutdown(sv[1], SHUT_WR); // client half-closes write side

auto dispatcher = std::make_shared<Dispatcher>();
auto sock = std::make_unique<SocketHandler>(sv[0], "127.0.0.1", 9999);
sv[0] = -1; // ownership transferred to SocketHandler/ConnectionHandler
auto conn = std::shared_ptr<ConnectionHandler>(
new ConnectionHandler(dispatcher, std::move(sock)));
// connect_state_ stays NONE (inbound): no RegisterOutboundCallbacks /
// CallWriteCb. RegisterCallbacks wires the channel for inbound role.
conn->RegisterCallbacks();

std::string delivered;
bool closed = false;
conn->SetOnMessageCb(
[&delivered](std::shared_ptr<ConnectionHandler>, std::string& message) {
if (!message.empty()) delivered = message;
});
conn->SetCloseCb(
[&closed](std::shared_ptr<ConnectionHandler>) { closed = true; });

conn->OnMessage(); // request bytes + EOF coalesced
dispatcher->ProcessPendingTasks();

bool pass = true;
std::string err;
if (delivered != req) {
pass = false;
err += "inbound request not delivered to handler; ";
}
if (closed) {
pass = false;
err += "inbound half-close force-closed the in-flight request "
"instead of arming the fallback deadline; ";
}

conn->ForceClose(); // tear down the test connection
dispatcher->ProcessPendingTasks();
::close(sv[1]);
Comment thread
mwfj marked this conversation as resolved.
sv[1] = -1; // prevent the catch from re-closing on a later throw
TestFramework::RecordTest(
"UpstreamPool: inbound coalesced half-close keeps conn open (issue #48 guard)",
pass, err);
} catch (const std::exception& e) {
// Close whatever the test still owns. sv[0] is -1 once SocketHandler
// adopts it; sv[1] is -1 once closed above — so each fd closes exactly
// once across every path (no double-close, no leak).
if (sv[0] >= 0) ::close(sv[0]);
if (sv[1] >= 0) ::close(sv[1]);
TestFramework::RecordTest(
"UpstreamPool: inbound coalesced half-close keeps conn open (issue #48 guard)",
false, e.what());
}
}

// ---------------------------------------------------------------------------
// Section 4: UpstreamLease RAII tests
//
Expand Down Expand Up @@ -2579,6 +2748,8 @@ void RunAllTests() {
TestUpstreamConnectionRequestCount();
TestUpstreamConnectionReadPauseResumesBufferedData();
TestUpstreamConnectionQueuedResumeHonorsCallbackOwnership();
TestOutboundCoalescedPeerCloseClosesPromptly();
TestInboundCoalescedHalfCloseDoesNotClosePromptly();

// Section 4: UpstreamLease RAII
TestUpstreamLeaseEmptyDefault();
Expand Down
Loading