Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions include/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ class ConnectionHandler : public std::enable_shared_from_this<ConnectionHandler>
// completion path) acquire-load before touching the pointers.
std::atomic<bool> obs_attached_{false};
bool net_active_incremented_ = false;
// Separate latch for the monotonic accepted Counter — bumped by
// RecordAcceptedConnection before the fast-close guard, independent of
// the net_active_incremented_ latch that gates the active gauge +1/-1.
bool net_accepted_incremented_ = false;
OBSERVABILITY_NAMESPACE::UpDownCounter* net_active_counter_ = nullptr;
OBSERVABILITY_NAMESPACE::Counter* net_accepted_counter_ = nullptr;
// Null when no application protocol has been confirmed yet. Holds
Expand Down Expand Up @@ -319,14 +323,22 @@ class ConnectionHandler : public std::enable_shared_from_this<ConnectionHandler>

bool IsTimeOut(std::chrono::seconds) const;

// Bumps the monotonic `reactor.net.connections.accepted` Counter +1 on
// first call (idempotent). Called before the fast-close guard in
// HttpServer::HandleNewConnection so a connection the kernel accepted is
// always counted, even when it closes before protocol setup. Never emits
// the active gauge — that is AttachTransportObservability's job, gated on
// the connection surviving the guard.
void RecordAcceptedConnection(OBSERVABILITY_NAMESPACE::ObservabilityManager* mgr);
// Transport-level observability wiring. Idempotent — second calls
// are silently ignored so accept paths that retry stay safe. Caches
// catalog instrument pointers under the assumption that the
// ObservabilityManager outlives this connection (every ConnectionHandler
// is destroyed before the ObservabilityManager via the documented
// four-phase shutdown).
// Bumps `reactor.net.connections.active` +1 and `reactor.net.connections.accepted` +1 on first call;
// the matching -1 against the active gauge fires from ~ConnectionHandler.
// four-phase shutdown).
// Bumps `reactor.net.connections.active` +1 on first call; the matching
// -1 against the active gauge fires from ~ConnectionHandler. The accepted
// Counter is emitted separately by RecordAcceptedConnection.
void AttachTransportObservability(OBSERVABILITY_NAMESPACE::ObservabilityManager* mgr);
// Called once when the L7 protocol becomes known (HTTP/1.1 after
// first parse, HTTP/2 after preface accept).
Expand Down
42 changes: 31 additions & 11 deletions server/connection_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ ConnectionHandler::ConnectionHandler(std::shared_ptr<Dispatcher> _dispatcher, st
// Out-of-line destructor: unique_ptr<TlsConnection> requires complete type.
// TlsConnection is forward-declared in the header; full definition is available here.
ConnectionHandler::~ConnectionHandler() {
// Symmetric decrements for the accept-time +1 and any
// protocol-confirmed +1 still in flight. Run unconditionally — the
// latch flag protects against double-decrement, and operating on a
// Symmetric -1 for the net.connections.active +1 and any
// protocol-confirmed +1 still in flight. The monotonic accepted
// Counter (RecordAcceptedConnection) has no -1. Run unconditionally —
// the latch flag protects against double-decrement, and operating on a
// cached instrument pointer keeps the dtor independent of whether
// the manager's catalog has been swapped since AttachTransportObservability.
if (net_active_incremented_ && net_active_counter_ != nullptr) {
Expand All @@ -34,32 +35,51 @@ ConnectionHandler::~ConnectionHandler() {
}
}

void ConnectionHandler::RecordAcceptedConnection(
OBSERVABILITY_NAMESPACE::ObservabilityManager* mgr) {
if (mgr == nullptr) return;
// Monotonic Counter — bumped once per accepted connection, never
// decremented. Kept separate from the active-gauge publish in
// AttachTransportObservability so this fires BEFORE the fast-close
// guard in HttpServer::HandleNewConnection (the connection was
// genuinely accepted by the kernel) WITHOUT publishing
// net.connections.active for a connection that may never enter
// http_connections_ — an active +1 there would be balanced only by
// ~ConnectionHandler, which the timer-map reap can defer, surfacing a
// phantom active connection on /metrics.
if (net_accepted_incremented_) return;
net_accepted_incremented_ = true;
net_accepted_counter_ = mgr->catalog().reactor_net_connections_accepted;
if (net_accepted_counter_ != nullptr) net_accepted_counter_->Add(1.0, {});
}

void ConnectionHandler::AttachTransportObservability(
OBSERVABILITY_NAMESPACE::ObservabilityManager* mgr) {
if (mgr == nullptr) return;

// First-call work: cache instrument pointers, publish via
// obs_attached_=true, and emit the accept-time +1s. Idempotent —
// a defensive second call short-circuits this block but still
// runs the stash drain below in case a worker stashed between
// the first and second invocation.
// obs_attached_=true, and emit the net.connections.active +1.
// Idempotent — a defensive second call short-circuits this block but
// still runs the stash drain below in case a worker stashed between
// the first and second invocation. The accepted Counter is emitted
// separately by RecordAcceptedConnection; this method runs only for
// connections that survive the fast-close guard, so the active gauge
// is never published for a connection that won't be tracked.
if (!net_active_incremented_) {
net_active_incremented_ = true; // set before side effects (mirrors bound_once_ pattern)
const auto& cat = mgr->catalog();
net_active_counter_ = cat.reactor_net_connections_active;
net_accepted_counter_ = cat.reactor_net_connections_accepted;
http_active_counter_ = cat.reactor_http_connections_active;
tls_handshakes_counter_ = cat.reactor_tls_handshakes;
// Release-store: publishes the four preceding pointer writes to any
// Release-store: publishes the three preceding pointer writes to any
// socket-worker thread that later acquire-loads obs_attached_. Without
// this, the cross-thread happens-before edge from NetServer's EnQueue
// runs the wrong way (EnQueue happens BEFORE AttachTransportObservability
// in the accept-dispatcher call sequence) and the worker's read of
// http_active_counter_ races with this write. Plain bool writes / reads
// would be TSan-flagged even if benign on x86.
obs_attached_.store(true, std::memory_order_release);
if (net_active_counter_ != nullptr) net_active_counter_->Add(1.0, {});
if (net_accepted_counter_ != nullptr) net_accepted_counter_->Add(1.0, {});
if (net_active_counter_ != nullptr) net_active_counter_->Add(1.0, {});
}

// Drain any protocol label the worker stashed before obs_attached_
Expand Down
27 changes: 22 additions & 5 deletions server/http/http_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4603,18 +4603,35 @@ void HttpServer::SafeNotifyWsClose(const std::shared_ptr<HttpConnectionHandler>&
}

void HttpServer::HandleNewConnection(std::shared_ptr<ConnectionHandler> conn) {
// Record the accept-time Counter bump BEFORE the fast-close early-exit.
// The connection has been accepted by the kernel (NetServer::
// HandleNewConnection just returned from accept()), so the monotonic
// `reactor.net.connections.accepted` Counter must reflect it regardless
// of whether the peer races us to close. The previous ordering — early-
// exit first, attach second — silently dropped the +1 in the window
// where the socket worker dispatcher fired CallCloseCb between
// RegisterCallbacks (epoll_ctl_add) and this callback. This Counter has
// no symmetric -1, so emitting it before the guard is safe.
conn->RecordAcceptedConnection(observability_manager_.get());

// Guard: if the connection already closed (fast disconnect between
// RegisterCallbacks enabling epoll and new_conn_callback running here),
// skip entirely. Inserting a handler for a closed connection would leave
// stale state in http_connections_ (potentially under fd -1 after ReleaseFd).
// skip the protocol-detection setup AND the active-gauge publish below.
// Inserting a handler for a closed connection would leave stale state in
// http_connections_ (potentially under fd -1 after ReleaseFd); publishing
// `reactor.net.connections.active` for a connection that never enters
// http_connections_ would leave the +1 balanced only by ~ConnectionHandler,
// which the timer-map reap can defer if the queued AddConnection races
// ahead of the close-path RemoveTimerConnectionIfMatch — surfacing a
// phantom active connection on /metrics until the idle scan reclaims it.
if (conn->IsClosing()) {
logging::Get()->debug("New connection already closing fd={}, skipping", conn->fd());
return;
}

// Wire transport-level observability counters. Idempotent and a no-op
// when no manager is installed; the matching -1 against
// reactor.net.connections.active fires from ~ConnectionHandler.
// Wire transport-level + protocol observability for the surviving
// connection. Idempotent; the matching `reactor.net.connections.active`
// -1 fires from ~ConnectionHandler gated on `net_active_incremented_`.
conn->AttachTransportObservability(observability_manager_.get());

// NOTE: total_accepted_ and active_connections_ are NOT incremented here.
Expand Down
9 changes: 9 additions & 0 deletions server/net_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,15 @@ void NetServer::HandleNewConnection(std::unique_ptr<SocketHandler> cilent_sock){
auto dispatcher = socket_dispatchers_[idx];
dispatcher->EnQueue([weak_conn, dispatcher]() {
if (auto c = weak_conn.lock()) {
// Skip timer registration for a connection that already
// started closing. The close path EnQueues
// RemoveTimerConnectionIfMatch, which no-ops on a
// not-yet-registered conn; if that removal lost the race and
// ran before this AddConnection, registering now would strand
// the closed handler in the timer map until the idle scan
// (or, with idle timeout disabled, until shutdown). is_closing_
// is a one-way latch, so a closing conn never needs scanning.
if (c->IsClosing()) return;
dispatcher->AddConnection(c);
}
});
Expand Down
30 changes: 18 additions & 12 deletions test/cli/cli_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -2749,21 +2749,27 @@ void TestReloadAcceptsPidFile() {
void TestHelpIncludesReload() {
std::cout << "\n[TEST] CliParser: help text includes 'reload'..." << std::endl;

// Run reactor_server --help (or help subcommand) and capture output.
// We redirect stdout to a pipe so we can inspect it.
std::string cmd = "./server_runner help 2>&1";
FILE* fp = popen(cmd.c_str(), "r");
std::string output;
if (fp) {
char buf[256];
while (fgets(buf, sizeof(buf), fp)) output += buf;
pclose(fp);
}

// Call CliParser::PrintUsage() directly and capture std::cout in-process.
// This exercises the SAME usage text the `help` subcommand prints, without
// shelling out to ./server_runner — the old popen() form coupled this unit
// test to a separately-built binary's presence, the current working
// directory, and that binary's freshness, so it spuriously failed whenever
// only test_runner had been rebuilt (the help text itself was never broken).
std::ostringstream captured;
std::streambuf* prev = std::cout.rdbuf(captured.rdbuf());
try {
CliParser::PrintUsage("reactor_server");
} catch (...) {
std::cout.rdbuf(prev); // restore before rethrow so failures still print
throw;
}
std::cout.rdbuf(prev); // restore before RecordTest emits to the real stdout

const std::string output = captured.str();
bool has_reload = (output.find("reload") != std::string::npos);
TestFramework::RecordTest("CliParser: help text includes 'reload'",
has_reload,
has_reload ? "" : "Help output does not mention 'reload'",
has_reload ? "" : "PrintUsage output does not mention 'reload'",
CLI_CATEGORY);
}

Expand Down
50 changes: 36 additions & 14 deletions test/observability/observability_connection_metrics_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -481,16 +481,22 @@ inline void TestNetAcceptedIsMonotonic() {
"reactor.net.connections.accepted");

constexpr int N = 5;
// Send a real HTTP request and drain the response per connection.
// A bare connect+close races the accept dispatcher on slow CI
// runners — the RST can arrive before the accept callback fires,
// losing the +1 on `reactor.net.connections.accepted`. Driving a
// full request/response cycle forces the accept to complete
// before the close.
// Per-iteration polling: drive each connection through a full
// request/response cycle, then wait for ITS accept-counter +1 to
// land BEFORE opening the next. A batch-style "open all N, then
// poll once" loop loses races on heavily contended CI runners —
// ConnectTcp returns once the kernel's SYN/ACK handshake is done,
// but the userspace accept() that bumps the counter runs on a
// dispatcher thread that can be CPU-starved for seconds while
// sibling sanitizer jobs hog the runner. Per-iteration polling
// serializes accepts, adapts to whatever dispatcher cadence the
// runner allows, and pins any failure to the specific iteration
// whose accept never landed within its per-iter budget.
const std::string req =
"GET /h HTTP/1.1\r\n"
"Host: localhost\r\n"
"Connection: close\r\n\r\n";
int timed_out_iter = -1;
for (int i = 0; i < N; ++i) {
int fd = ConnectTcp(port);
if (fd < 0) {
Expand All @@ -508,19 +514,31 @@ inline void TestNetAcceptedIsMonotonic() {
}
(void)DrainSocket(fd, 500);
::close(fd);
}
// Poll for the counter to reach N — the accept-time bump runs on
// the dispatcher thread, so even after the response drains there
// is a small visibility gap before the Snapshot picks it up.
{
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
while (std::chrono::steady_clock::now() < deadline) {

// Wait for this connection's +1 before starting the next one.
// 10 s per-iteration budget covers worst-case starvation on
// heavily contended shared runners (multiple sanitizer jobs +
// parallel test shards on a 2-4 core box); normal runs exit
// this loop in tens of milliseconds.
auto iter_deadline = std::chrono::steady_clock::now()
+ std::chrono::seconds(10);
bool seen = false;
while (std::chrono::steady_clock::now() < iter_deadline) {
auto snap_poll = fix.manager->meter_provider()->Snapshot();
double delta_poll = SumCounter(snap_poll,
"reactor.net.connections.accepted") - accepted_before;
if (delta_poll >= static_cast<double>(N)) break;
if (delta_poll >= static_cast<double>(i + 1)) {
seen = true;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
if (!seen) {
// Let the final assertion run so the recorded delta and
// the timed-out iteration index both reach the failure log.
timed_out_iter = i;
break;
}
}

auto snap_after = fix.manager->meter_provider()->Snapshot();
Expand All @@ -533,6 +551,10 @@ inline void TestNetAcceptedIsMonotonic() {
if (!pass) {
err = "delta=" + std::to_string(accepted_delta) +
" expected>=" + std::to_string(N);
if (timed_out_iter >= 0) {
err += " (per-iter poll timed out at i="
+ std::to_string(timed_out_iter) + ")";
}
}
TestFramework::RecordTest(TAG, pass, err,
TestFramework::TestCategory::OTHER);
Expand Down
Loading