From 5a2c958023df3b534210d024a88df1413b01f531 Mon Sep 17 00:00:00 2001 From: mwfj Date: Thu, 28 May 2026 10:20:35 +0800 Subject: [PATCH 1/2] Fix observability unit test flaky issue --- server/http/http_server.cc | 22 +++++--- .../observability_connection_metrics_test.h | 50 +++++++++++++------ 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/server/http/http_server.cc b/server/http/http_server.cc index e4d80f6b..f44f74ef 100644 --- a/server/http/http_server.cc +++ b/server/http/http_server.cc @@ -4603,20 +4603,28 @@ void HttpServer::SafeNotifyWsClose(const std::shared_ptr& } void HttpServer::HandleNewConnection(std::shared_ptr conn) { + // Wire transport-level observability counters BEFORE the fast-close + // early-exit below. The connection has been accepted by the kernel + // (NetServer::HandleNewConnection just returned from accept()), so the + // `reactor.net.connections.accepted` Counter must reflect it regardless + // of whether the peer races us to close. The previous ordering — early- + // exit first, attach second — silently dropped the +1 in the window + // where the socket worker dispatcher fired CallCloseCb between + // RegisterCallbacks (epoll_ctl_add) and this callback. The matching + // `reactor.net.connections.active` -1 still fires from ~ConnectionHandler + // gated on `net_active_incremented_`, preserving gauge balance. + conn->AttachTransportObservability(observability_manager_.get()); + // Guard: if the connection already closed (fast disconnect between // RegisterCallbacks enabling epoll and new_conn_callback running here), - // skip entirely. Inserting a handler for a closed connection would leave - // stale state in http_connections_ (potentially under fd -1 after ReleaseFd). + // skip the protocol-detection setup. Inserting a handler for a closed + // connection would leave stale state in http_connections_ (potentially + // under fd -1 after ReleaseFd). if (conn->IsClosing()) { logging::Get()->debug("New connection already closing fd={}, skipping", conn->fd()); return; } - // Wire transport-level observability counters. Idempotent and a no-op - // when no manager is installed; the matching -1 against - // reactor.net.connections.active fires from ~ConnectionHandler. - conn->AttachTransportObservability(observability_manager_.get()); - // NOTE: total_accepted_ and active_connections_ are NOT incremented here. // They are incremented at map-insertion points (pending_detection_ in this // method, http_connections_ in the http2_disabled path, or h2_connections_/ diff --git a/test/observability/observability_connection_metrics_test.h b/test/observability/observability_connection_metrics_test.h index 218cf70c..e9e6f52d 100644 --- a/test/observability/observability_connection_metrics_test.h +++ b/test/observability/observability_connection_metrics_test.h @@ -481,16 +481,22 @@ inline void TestNetAcceptedIsMonotonic() { "reactor.net.connections.accepted"); constexpr int N = 5; - // Send a real HTTP request and drain the response per connection. - // A bare connect+close races the accept dispatcher on slow CI - // runners — the RST can arrive before the accept callback fires, - // losing the +1 on `reactor.net.connections.accepted`. Driving a - // full request/response cycle forces the accept to complete - // before the close. + // Per-iteration polling: drive each connection through a full + // request/response cycle, then wait for ITS accept-counter +1 to + // land BEFORE opening the next. A batch-style "open all N, then + // poll once" loop loses races on heavily contended CI runners — + // ConnectTcp returns once the kernel's SYN/ACK handshake is done, + // but the userspace accept() that bumps the counter runs on a + // dispatcher thread that can be CPU-starved for seconds while + // sibling sanitizer jobs hog the runner. Per-iteration polling + // serializes accepts, adapts to whatever dispatcher cadence the + // runner allows, and pins any failure to the specific iteration + // whose accept never landed within its per-iter budget. const std::string req = "GET /h HTTP/1.1\r\n" "Host: localhost\r\n" "Connection: close\r\n\r\n"; + int timed_out_iter = -1; for (int i = 0; i < N; ++i) { int fd = ConnectTcp(port); if (fd < 0) { @@ -508,19 +514,31 @@ inline void TestNetAcceptedIsMonotonic() { } (void)DrainSocket(fd, 500); ::close(fd); - } - // Poll for the counter to reach N — the accept-time bump runs on - // the dispatcher thread, so even after the response drains there - // is a small visibility gap before the Snapshot picks it up. - { - auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - while (std::chrono::steady_clock::now() < deadline) { + + // Wait for this connection's +1 before starting the next one. + // 10 s per-iteration budget covers worst-case starvation on + // heavily contended shared runners (multiple sanitizer jobs + + // parallel test shards on a 2-4 core box); normal runs exit + // this loop in tens of milliseconds. + auto iter_deadline = std::chrono::steady_clock::now() + + std::chrono::seconds(10); + bool seen = false; + while (std::chrono::steady_clock::now() < iter_deadline) { auto snap_poll = fix.manager->meter_provider()->Snapshot(); double delta_poll = SumCounter(snap_poll, "reactor.net.connections.accepted") - accepted_before; - if (delta_poll >= static_cast(N)) break; + if (delta_poll >= static_cast(i + 1)) { + seen = true; + break; + } std::this_thread::sleep_for(std::chrono::milliseconds(20)); } + if (!seen) { + // Let the final assertion run so the recorded delta and + // the timed-out iteration index both reach the failure log. + timed_out_iter = i; + break; + } } auto snap_after = fix.manager->meter_provider()->Snapshot(); @@ -533,6 +551,10 @@ inline void TestNetAcceptedIsMonotonic() { if (!pass) { err = "delta=" + std::to_string(accepted_delta) + " expected>=" + std::to_string(N); + if (timed_out_iter >= 0) { + err += " (per-iter poll timed out at i=" + + std::to_string(timed_out_iter) + ")"; + } } TestFramework::RecordTest(TAG, pass, err, TestFramework::TestCategory::OTHER); From 960ef79566285b54c54eb337a6adfd1167f46ae1 Mon Sep 17 00:00:00 2001 From: mwfj Date: Thu, 28 May 2026 12:12:19 +0800 Subject: [PATCH 2/2] Fix review comment --- include/connection_handler.h | 18 +++++++++++++--- server/connection_handler.cc | 42 ++++++++++++++++++++++++++---------- server/http/http_server.cc | 29 ++++++++++++++++--------- server/net_server.cc | 9 ++++++++ test/cli/cli_test.h | 30 +++++++++++++++----------- 5 files changed, 92 insertions(+), 36 deletions(-) diff --git a/include/connection_handler.h b/include/connection_handler.h index d2174902..4e9f1f80 100644 --- a/include/connection_handler.h +++ b/include/connection_handler.h @@ -135,6 +135,10 @@ class ConnectionHandler : public std::enable_shared_from_this // completion path) acquire-load before touching the pointers. std::atomic obs_attached_{false}; bool net_active_incremented_ = false; + // Separate latch for the monotonic accepted Counter — bumped by + // RecordAcceptedConnection before the fast-close guard, independent of + // the net_active_incremented_ latch that gates the active gauge +1/-1. + bool net_accepted_incremented_ = false; OBSERVABILITY_NAMESPACE::UpDownCounter* net_active_counter_ = nullptr; OBSERVABILITY_NAMESPACE::Counter* net_accepted_counter_ = nullptr; // Null when no application protocol has been confirmed yet. Holds @@ -319,14 +323,22 @@ class ConnectionHandler : public std::enable_shared_from_this bool IsTimeOut(std::chrono::seconds) const; + // Bumps the monotonic `reactor.net.connections.accepted` Counter +1 on + // first call (idempotent). Called before the fast-close guard in + // HttpServer::HandleNewConnection so a connection the kernel accepted is + // always counted, even when it closes before protocol setup. Never emits + // the active gauge — that is AttachTransportObservability's job, gated on + // the connection surviving the guard. + void RecordAcceptedConnection(OBSERVABILITY_NAMESPACE::ObservabilityManager* mgr); // Transport-level observability wiring. Idempotent — second calls // are silently ignored so accept paths that retry stay safe. Caches // catalog instrument pointers under the assumption that the // ObservabilityManager outlives this connection (every ConnectionHandler // is destroyed before the ObservabilityManager via the documented - // four-phase shutdown). - // Bumps `reactor.net.connections.active` +1 and `reactor.net.connections.accepted` +1 on first call; - // the matching -1 against the active gauge fires from ~ConnectionHandler. + // four-phase shutdown). + // Bumps `reactor.net.connections.active` +1 on first call; the matching + // -1 against the active gauge fires from ~ConnectionHandler. The accepted + // Counter is emitted separately by RecordAcceptedConnection. void AttachTransportObservability(OBSERVABILITY_NAMESPACE::ObservabilityManager* mgr); // Called once when the L7 protocol becomes known (HTTP/1.1 after // first parse, HTTP/2 after preface accept). diff --git a/server/connection_handler.cc b/server/connection_handler.cc index 44837eb7..50df1f02 100644 --- a/server/connection_handler.cc +++ b/server/connection_handler.cc @@ -20,9 +20,10 @@ ConnectionHandler::ConnectionHandler(std::shared_ptr _dispatcher, st // Out-of-line destructor: unique_ptr requires complete type. // TlsConnection is forward-declared in the header; full definition is available here. ConnectionHandler::~ConnectionHandler() { - // Symmetric decrements for the accept-time +1 and any - // protocol-confirmed +1 still in flight. Run unconditionally — the - // latch flag protects against double-decrement, and operating on a + // Symmetric -1 for the net.connections.active +1 and any + // protocol-confirmed +1 still in flight. The monotonic accepted + // Counter (RecordAcceptedConnection) has no -1. Run unconditionally — + // the latch flag protects against double-decrement, and operating on a // cached instrument pointer keeps the dtor independent of whether // the manager's catalog has been swapped since AttachTransportObservability. if (net_active_incremented_ && net_active_counter_ != nullptr) { @@ -34,23 +35,43 @@ ConnectionHandler::~ConnectionHandler() { } } +void ConnectionHandler::RecordAcceptedConnection( + OBSERVABILITY_NAMESPACE::ObservabilityManager* mgr) { + if (mgr == nullptr) return; + // Monotonic Counter — bumped once per accepted connection, never + // decremented. Kept separate from the active-gauge publish in + // AttachTransportObservability so this fires BEFORE the fast-close + // guard in HttpServer::HandleNewConnection (the connection was + // genuinely accepted by the kernel) WITHOUT publishing + // net.connections.active for a connection that may never enter + // http_connections_ — an active +1 there would be balanced only by + // ~ConnectionHandler, which the timer-map reap can defer, surfacing a + // phantom active connection on /metrics. + if (net_accepted_incremented_) return; + net_accepted_incremented_ = true; + net_accepted_counter_ = mgr->catalog().reactor_net_connections_accepted; + if (net_accepted_counter_ != nullptr) net_accepted_counter_->Add(1.0, {}); +} + void ConnectionHandler::AttachTransportObservability( OBSERVABILITY_NAMESPACE::ObservabilityManager* mgr) { if (mgr == nullptr) return; // First-call work: cache instrument pointers, publish via - // obs_attached_=true, and emit the accept-time +1s. Idempotent — - // a defensive second call short-circuits this block but still - // runs the stash drain below in case a worker stashed between - // the first and second invocation. + // obs_attached_=true, and emit the net.connections.active +1. + // Idempotent — a defensive second call short-circuits this block but + // still runs the stash drain below in case a worker stashed between + // the first and second invocation. The accepted Counter is emitted + // separately by RecordAcceptedConnection; this method runs only for + // connections that survive the fast-close guard, so the active gauge + // is never published for a connection that won't be tracked. if (!net_active_incremented_) { net_active_incremented_ = true; // set before side effects (mirrors bound_once_ pattern) const auto& cat = mgr->catalog(); net_active_counter_ = cat.reactor_net_connections_active; - net_accepted_counter_ = cat.reactor_net_connections_accepted; http_active_counter_ = cat.reactor_http_connections_active; tls_handshakes_counter_ = cat.reactor_tls_handshakes; - // Release-store: publishes the four preceding pointer writes to any + // Release-store: publishes the three preceding pointer writes to any // socket-worker thread that later acquire-loads obs_attached_. Without // this, the cross-thread happens-before edge from NetServer's EnQueue // runs the wrong way (EnQueue happens BEFORE AttachTransportObservability @@ -58,8 +79,7 @@ void ConnectionHandler::AttachTransportObservability( // http_active_counter_ races with this write. Plain bool writes / reads // would be TSan-flagged even if benign on x86. obs_attached_.store(true, std::memory_order_release); - if (net_active_counter_ != nullptr) net_active_counter_->Add(1.0, {}); - if (net_accepted_counter_ != nullptr) net_accepted_counter_->Add(1.0, {}); + if (net_active_counter_ != nullptr) net_active_counter_->Add(1.0, {}); } // Drain any protocol label the worker stashed before obs_attached_ diff --git a/server/http/http_server.cc b/server/http/http_server.cc index f44f74ef..70844b7c 100644 --- a/server/http/http_server.cc +++ b/server/http/http_server.cc @@ -4603,28 +4603,37 @@ void HttpServer::SafeNotifyWsClose(const std::shared_ptr& } void HttpServer::HandleNewConnection(std::shared_ptr conn) { - // Wire transport-level observability counters BEFORE the fast-close - // early-exit below. The connection has been accepted by the kernel - // (NetServer::HandleNewConnection just returned from accept()), so the + // Record the accept-time Counter bump BEFORE the fast-close early-exit. + // The connection has been accepted by the kernel (NetServer:: + // HandleNewConnection just returned from accept()), so the monotonic // `reactor.net.connections.accepted` Counter must reflect it regardless // of whether the peer races us to close. The previous ordering — early- // exit first, attach second — silently dropped the +1 in the window // where the socket worker dispatcher fired CallCloseCb between - // RegisterCallbacks (epoll_ctl_add) and this callback. The matching - // `reactor.net.connections.active` -1 still fires from ~ConnectionHandler - // gated on `net_active_incremented_`, preserving gauge balance. - conn->AttachTransportObservability(observability_manager_.get()); + // RegisterCallbacks (epoll_ctl_add) and this callback. This Counter has + // no symmetric -1, so emitting it before the guard is safe. + conn->RecordAcceptedConnection(observability_manager_.get()); // Guard: if the connection already closed (fast disconnect between // RegisterCallbacks enabling epoll and new_conn_callback running here), - // skip the protocol-detection setup. Inserting a handler for a closed - // connection would leave stale state in http_connections_ (potentially - // under fd -1 after ReleaseFd). + // skip the protocol-detection setup AND the active-gauge publish below. + // Inserting a handler for a closed connection would leave stale state in + // http_connections_ (potentially under fd -1 after ReleaseFd); publishing + // `reactor.net.connections.active` for a connection that never enters + // http_connections_ would leave the +1 balanced only by ~ConnectionHandler, + // which the timer-map reap can defer if the queued AddConnection races + // ahead of the close-path RemoveTimerConnectionIfMatch — surfacing a + // phantom active connection on /metrics until the idle scan reclaims it. if (conn->IsClosing()) { logging::Get()->debug("New connection already closing fd={}, skipping", conn->fd()); return; } + // Wire transport-level + protocol observability for the surviving + // connection. Idempotent; the matching `reactor.net.connections.active` + // -1 fires from ~ConnectionHandler gated on `net_active_incremented_`. + conn->AttachTransportObservability(observability_manager_.get()); + // NOTE: total_accepted_ and active_connections_ are NOT incremented here. // They are incremented at map-insertion points (pending_detection_ in this // method, http_connections_ in the http2_disabled path, or h2_connections_/ diff --git a/server/net_server.cc b/server/net_server.cc index dbfa411c..9bda04d3 100644 --- a/server/net_server.cc +++ b/server/net_server.cc @@ -460,6 +460,15 @@ void NetServer::HandleNewConnection(std::unique_ptr cilent_sock){ auto dispatcher = socket_dispatchers_[idx]; dispatcher->EnQueue([weak_conn, dispatcher]() { if (auto c = weak_conn.lock()) { + // Skip timer registration for a connection that already + // started closing. The close path EnQueues + // RemoveTimerConnectionIfMatch, which no-ops on a + // not-yet-registered conn; if that removal lost the race and + // ran before this AddConnection, registering now would strand + // the closed handler in the timer map until the idle scan + // (or, with idle timeout disabled, until shutdown). is_closing_ + // is a one-way latch, so a closing conn never needs scanning. + if (c->IsClosing()) return; dispatcher->AddConnection(c); } }); diff --git a/test/cli/cli_test.h b/test/cli/cli_test.h index 8905fa33..0d75ede3 100644 --- a/test/cli/cli_test.h +++ b/test/cli/cli_test.h @@ -2749,21 +2749,27 @@ void TestReloadAcceptsPidFile() { void TestHelpIncludesReload() { std::cout << "\n[TEST] CliParser: help text includes 'reload'..." << std::endl; - // Run reactor_server --help (or help subcommand) and capture output. - // We redirect stdout to a pipe so we can inspect it. - std::string cmd = "./server_runner help 2>&1"; - FILE* fp = popen(cmd.c_str(), "r"); - std::string output; - if (fp) { - char buf[256]; - while (fgets(buf, sizeof(buf), fp)) output += buf; - pclose(fp); - } - + // Call CliParser::PrintUsage() directly and capture std::cout in-process. + // This exercises the SAME usage text the `help` subcommand prints, without + // shelling out to ./server_runner — the old popen() form coupled this unit + // test to a separately-built binary's presence, the current working + // directory, and that binary's freshness, so it spuriously failed whenever + // only test_runner had been rebuilt (the help text itself was never broken). + std::ostringstream captured; + std::streambuf* prev = std::cout.rdbuf(captured.rdbuf()); + try { + CliParser::PrintUsage("reactor_server"); + } catch (...) { + std::cout.rdbuf(prev); // restore before rethrow so failures still print + throw; + } + std::cout.rdbuf(prev); // restore before RecordTest emits to the real stdout + + const std::string output = captured.str(); bool has_reload = (output.find("reload") != std::string::npos); TestFramework::RecordTest("CliParser: help text includes 'reload'", has_reload, - has_reload ? "" : "Help output does not mention 'reload'", + has_reload ? "" : "PrintUsage output does not mention 'reload'", CLI_CATEGORY); }