68 changes: 49 additions & 19 deletions .github/workflows/weekly-valgrind.yml
@@ -23,24 +23,29 @@ name: Weekly Valgrind
 # valgrind the 10-50x slowdown collapses their timing assertions. The
 # protocol / lifecycle suites carry the high-value memory-safety signal.
 #
-# Topology: sharded into 4 parallel matrix jobs. The previous single
+# Topology: sharded into 6 parallel matrix jobs. The original single
 # sequential job consumed the entire 6h GitHub job cap on the `auth`
 # umbrella alone (without ever finishing), starving every suite
-# scheduled after it. The shards keep critical-path wall time well
-# under 30 min and let an individual shard hang without blocking the
-# others (`fail-fast: false`). Per-shard 4h cap catches hangs while
-# leaving plenty of buffer for the slowest expected shard (~10 min
-# observed locally; CI is ~1.5-2x slower).
+# scheduled after it. A first 4-shard split brought critical-path wall
+# time down but left `auth-crypto` at 78 min (jwks alone 23 min);
+# splitting that one shard 3 ways drops the critical path to ~25 min.
+# `fail-fast: false` so an individual shard hanging or finding an
+# error doesn't cancel the others. Per-shard 4h cap catches hangs
+# while leaving plenty of buffer.
 #
 # Shard composition:
-#   - `protocol-fast`: lifecycle + parser + small unit tests
-#     (~4 min local, expect ~6-8 min on GitHub runner).
-#   - `proxy-heavy`: upstream-pool-heavy + auth_intro (the single
-#     longest auth sub-suite — moved out of `auth-crypto` for balance).
-#   - `auth-crypto`: auth umbrella minus the three integration sub-
-#     suites whose HTTP plumbing is already covered upstream
-#     (`auth2`, `auth_reload`, `auth_multi`). Loss of valgrind-unique
-#     coverage on those is bounded — see the table in PR #42 follow-up.
+#   - `protocol-fast`: lifecycle + parser + small unit tests.
+#   - `proxy-mixed`: upstream-pool-heavy + the longest auth integration
+#     suite (`auth_intro` — distinct from the `intro_client` unit suite
+#     that lives in the auth-intro shard below). The `-mixed` name
+#     flags the cross-theme content so contributors following the
+#     'pick the shard whose theme matches' convention see the exception.
+#   - `auth-jwks` / `auth-mgr` / `auth-intro`: auth umbrella split 3
+#     ways by measured per-suite cost (jwks/intro_client/auth_mgr are
+#     the heavy hitters; tiny suites distributed for balance).
+#     `auth2`, `auth_reload`, `auth_multi` are dropped because their
+#     HTTP plumbing is already covered upstream; `auth_race` is
+#     dropped per the race-suite-under-Valgrind exclusion.
 #   - `observability`: all obs_* suites — every one is small enough
 #     that the whole family fits in one shard.
 #
@@ -81,15 +86,28 @@ jobs:
               basic lru_cache http http2 ws tls cli route upstream h2_upstream
               rate_limit dns h2_trailer grpc_obs grpc_web_edge
               introspection_cache
-          - shard: proxy-heavy
+          # Renamed from `proxy-heavy` to signal that this shard packs
+          # one auth integration suite (`auth_intro`) alongside the
+          # upstream-heavy ones — operators following the
+          # "pick the shard whose theme matches" convention should see
+          # the mixed grouping from the name.
+          - shard: proxy-mixed
             suites: >-
               circuit_breaker proxy streaming_request grpc grpc_proxy grpc_web
               auth_intro
-          - shard: auth-crypto
+          # Auth umbrella split into 3 by measured CI per-suite wall-clock
+          # (run 26460686695): jwks 23m, intro_client 16m, auth_mgr 14m,
+          # jwt 7m dominate the budget. Greedy LPT balances each shard at
+          # ~21min suites + 2min build = ~23min total.
+          - shard: auth-jwks
             suites: >-
-              auth_foundation hrauth auth_mgr auth_fail auth_ws
-              jwt jwks oidc intro_client auth_race
-              router_async auth_observability
+              jwks auth_foundation hrauth
+          - shard: auth-mgr
+            suites: >-
+              auth_mgr jwt oidc router_async
+          - shard: auth-intro
+            suites: >-
+              intro_client auth_ws auth_fail auth_observability
           - shard: observability
             suites: >-
               obs_foundation obs_tracer obs_metrics obs_mgr obs_propagator
@@ -101,6 +119,18 @@ jobs:
       - uses: actions/checkout@v4
       - name: Install dependencies
         run: sudo apt-get update && sudo apt-get install -y libssl-dev valgrind
+      # Per DEVELOPMENT_RULES.md "CI workflow maintenance": every Linux
+      # CI job wraps the build in ccache-action with per-dimension keys.
+      # All 6 sharded matrix jobs build identical valgrind-flagged
+      # binaries, so they share the single `ccache-linux-valgrind` key.
+      # The sanitizer-class 500M cap matches the ASan/TSan jobs in
+      # ci.yml — valgrind builds carry similar debug-info bloat.
+      - name: ccache
+        uses: hendrikmuhs/ccache-action@v1.2
+        with:
+          key: ccache-linux-valgrind
+          max-size: 500M
+          create-symlink: true
       - name: Build (debug + frame pointers — valgrind needs accurate stacks)
         run: |
           make -j$(nproc) \
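Reviewer note on the shard balancing: the workflow comment describes a greedy LPT (longest processing time first) assignment over measured per-suite wall-clock. A minimal sketch of that heuristic follows, assuming only the four timings quoted in the comment (jwks 23m, intro_client 16m, auth_mgr 14m, jwt 7m); timings for the smaller suites are not given, so they are left out. `Suite`, `Shard` and `BalanceLpt` are hypothetical names for illustration and are not part of the repository.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical types for illustration only.
struct Suite {
    std::string name;
    int minutes;  // measured wall-clock cost
};

struct Shard {
    int total_minutes = 0;
    std::vector<std::string> suites;
};

// Greedy LPT: visit suites from most to least expensive and hand each one
// to the shard that currently carries the smallest total load.
std::vector<Shard> BalanceLpt(std::vector<Suite> suites, std::size_t shard_count) {
    assert(shard_count > 0);
    std::vector<Shard> shards(shard_count);
    std::stable_sort(suites.begin(), suites.end(),
                     [](const Suite& a, const Suite& b) {
                         return a.minutes > b.minutes;
                     });
    for (const Suite& suite : suites) {
        // min_element returns the first shard among ties, so the result
        // is deterministic for a given input order.
        auto lightest = std::min_element(
            shards.begin(), shards.end(),
            [](const Shard& a, const Shard& b) {
                return a.total_minutes < b.total_minutes;
            });
        lightest->total_minutes += suite.minutes;
        lightest->suites.push_back(suite.name);
    }
    return shards;
}
```

Calling `BalanceLpt` with those four suites and `shard_count = 3` gives loads of 23, 16 and 21 minutes, with `auth_mgr` and `jwt` sharing a shard. That is consistent with the `auth-mgr` shard in the matrix; where the remaining small suites land depends on timings the comment does not list.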
31 changes: 31 additions & 0 deletions include/connection_handler.h
@@ -142,6 +142,31 @@ class ConnectionHandler : public std::enable_shared_from_this<ConnectionHandler>
     // so the dtor can issue a matching -1 against the SAME labeled
     // series without per-connection string allocation.
     const char* http_protocol_label_ = nullptr;
+    // Stash slot for MarkApplicationProtocolConfirmed when it loses the
+    // race vs AttachTransportObservability (i.e. headers parsed on the
+    // worker thread before the accept dispatcher publishes the
+    // observability pointers). Late-running AttachTransportObservability
+    // exchanges this slot post-publish and EnQueues a replay on the
+    // worker dispatcher so the gauge bump still lands.
+    //
+    // Concurrent access model: BOTH threads read and write.
+    //   - Worker: CAS(nullptr → label) on stash; exchange(nullptr) on take-back.
+    //   - Accept dispatcher: exchange(nullptr) post-publish to drain.
+    //   - Worker: exchange(nullptr) in HandOffToWebSocket to suppress the
+    //     replay path for connections about to become WebSockets.
+    // Arbitration is via the exchange's total-order semantics on this
+    // single atomic — exactly one site observes a non-null value per
+    // stash event.
+    std::atomic<const char*> pending_http_protocol_label_{nullptr};
+
+    // Set by HandOffToWebSocket to suppress any late EnQueued replay of
+    // MarkApplicationProtocolConfirmed: once the connection has been
+    // committed to WebSocket, the http/1.1 gauge MUST NOT be bumped
+    // even if a stash drained successfully and the replay was already
+    // queued. Both HandOffToWebSocket and the replay (via Mark) run on
+    // the same worker dispatcher, so program order on that thread plus
+    // release/acquire on this flag arbitrate the race.
+    std::atomic<bool> ws_handoff_done_{false};
     OBSERVABILITY_NAMESPACE::UpDownCounter* http_active_counter_ = nullptr;
     OBSERVABILITY_NAMESPACE::Counter* tls_handshakes_counter_ = nullptr;
 public:
@@ -209,6 +234,12 @@ class ConnectionHandler : public std::enable_shared_from_this<ConnectionHandler>
         read_pump_paused_.store(false, std::memory_order_release);
         close_on_resume_.store(false, std::memory_order_release);
         has_pending_reads_.store(false, std::memory_order_release);
+        // Inbound observability state — clear so a future code path
+        // that calls Mark / HandOff on a recycled outbound connection
+        // (today no such path exists) cannot inherit the prior
+        // borrower's stash or commitment flag.
+        pending_http_protocol_label_.store(nullptr, std::memory_order_release);
+        ws_handoff_done_.store(false, std::memory_order_release);
     }
     // Dispatcher-thread-only for reuse validation.
     size_t InputBufferSize() const { return input_bf_.Size(); }
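Reviewer note on the stash slot: the header comment says arbitration rests on a single atomic, with a CAS to stash and an `exchange(nullptr)` at every drain site, so exactly one site sees the non-null label per stash event. A minimal sketch of that slot in isolation is below. `LabelStash`, `TryStash`, `Take` and `DrainTwice` are hypothetical names, not identifiers from the PR, and the sketch runs its drains sequentially on one thread purely to show the hand-over rule.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-alone model of pending_http_protocol_label_.
class LabelStash {
public:
    // Stash side: publish the label only if the slot is empty.
    // Returns false when an earlier stash is still pending (first wins).
    bool TryStash(const char* label) {
        const char* expected = nullptr;
        return slot_.compare_exchange_strong(expected, label,
                                             std::memory_order_release,
                                             std::memory_order_relaxed);
    }

    // Drain side: atomically take whatever is in the slot and leave it
    // empty. Only one caller per stash event can receive a non-null value.
    const char* Take() {
        return slot_.exchange(nullptr, std::memory_order_acq_rel);
    }

private:
    std::atomic<const char*> slot_{nullptr};
};

// Stashes one label, then drains twice, and reports how many drains
// observed the label.
int DrainTwice(LabelStash& stash, const char* label) {
    const bool stashed = stash.TryStash(label);
    assert(stashed && "slot was expected to be empty");
    (void)stashed;
    int winners = 0;
    if (stash.Take() != nullptr) ++winners;
    if (stash.Take() != nullptr) ++winners;
    return winners;
}
```

`DrainTwice` returns 1 for an empty slot: the first `Take` receives the label and the second receives `nullptr`. In the PR the competing drains run on different threads, but the same property holds because each drain is a single atomic read-modify-write on the slot.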
164 changes: 142 additions & 22 deletions server/connection_handler.cc
@@ -36,36 +36,144 @@ ConnectionHandler::~ConnectionHandler() {

 void ConnectionHandler::AttachTransportObservability(
     OBSERVABILITY_NAMESPACE::ObservabilityManager* mgr) {
-    if (net_active_incremented_) return;
     if (mgr == nullptr) return;
-    net_active_incremented_ = true; // set before side effects (mirrors bound_once_ pattern)
-    const auto& cat = mgr->catalog();
-    net_active_counter_ = cat.reactor_net_connections_active;
-    net_accepted_counter_ = cat.reactor_net_connections_accepted;
-    http_active_counter_ = cat.reactor_http_connections_active;
-    tls_handshakes_counter_ = cat.reactor_tls_handshakes;
-    // Release-store: publishes the four preceding pointer writes to any
-    // socket-worker thread that later acquire-loads obs_attached_. Without
-    // this, the cross-thread happens-before edge from NetServer's EnQueue
-    // runs the wrong way (EnQueue happens BEFORE AttachTransportObservability
-    // in the accept-dispatcher call sequence) and the worker's read of
-    // http_active_counter_ races with this write. Plain bool writes / reads
-    // would be TSan-flagged even if benign on x86.
-    obs_attached_.store(true, std::memory_order_release);
-    if (net_active_counter_ != nullptr) net_active_counter_->Add(1.0, {});
-    if (net_accepted_counter_ != nullptr) net_accepted_counter_->Add(1.0, {});
+
+    // First-call work: cache instrument pointers, publish via
+    // obs_attached_=true, and emit the accept-time +1s. Idempotent —
+    // a defensive second call short-circuits this block but still
+    // runs the stash drain below in case a worker stashed between
+    // the first and second invocation.
+    if (!net_active_incremented_) {
+        net_active_incremented_ = true; // set before side effects (mirrors bound_once_ pattern)
+        const auto& cat = mgr->catalog();
+        net_active_counter_ = cat.reactor_net_connections_active;
+        net_accepted_counter_ = cat.reactor_net_connections_accepted;
+        http_active_counter_ = cat.reactor_http_connections_active;
+        tls_handshakes_counter_ = cat.reactor_tls_handshakes;
+        // Release-store: publishes the four preceding pointer writes to any
+        // socket-worker thread that later acquire-loads obs_attached_. Without
+        // this, the cross-thread happens-before edge from NetServer's EnQueue
+        // runs the wrong way (EnQueue happens BEFORE AttachTransportObservability
+        // in the accept-dispatcher call sequence) and the worker's read of
+        // http_active_counter_ races with this write. Plain bool writes / reads
+        // would be TSan-flagged even if benign on x86.
+        obs_attached_.store(true, std::memory_order_release);
+        if (net_active_counter_ != nullptr) net_active_counter_->Add(1.0, {});
+        if (net_accepted_counter_ != nullptr) net_accepted_counter_->Add(1.0, {});
+    }
+
+    // Drain any protocol label the worker stashed before obs_attached_
+    // became visible. The exchange MUST happen AFTER the release-store
+    // above so a worker reaching its post-stash recheck observes
+    // obs_attached_=true and arbitrates correctly. Replay on the worker
+    // dispatcher to keep http_protocol_label_ single-writer (race-free
+    // vs concurrent MarkApplicationProtocolConfirmed / HandOffToWebSocket
+    // calls that also run on the worker).
+    //
+    // Pre-load shared_from_this() BEFORE the exchange so a throw
+    // (`std::bad_weak_ptr` if the connection isn't shared_ptr-managed)
+    // does not consume the stash with no rollback. If the lookup
+    // succeeds AND we can queue, only then do we take ownership of the
+    // label via exchange.
+    std::weak_ptr<ConnectionHandler> weak_self;
+    try {
+        weak_self = shared_from_this();
+    } catch (const std::bad_weak_ptr&) {
+        // Caller invoked attach without owning the connection via a
+        // shared_ptr. The stash drain is best-effort; leave the label
+        // in the slot so a future attach call (or no one) sees it.
+        // Both branches (no drain / late drain) are race-correct.
+        logging::Get()->warn(
+            "AttachTransportObservability: shared_from_this() failed; "
+            "stash drain skipped fd={}", fd());
+        return;
+    }
+    if (!event_dispatcher_) {
+        logging::Get()->warn(
+            "AttachTransportObservability: event_dispatcher_ is null; "
+            "stash drain skipped fd={}", fd());
+        return;
+    }
+    const char* stashed = pending_http_protocol_label_.exchange(
+        nullptr, std::memory_order_acquire);
+    if (stashed == nullptr) return;
+    event_dispatcher_->EnQueue([weak_self, stashed]() {
+        if (auto self = weak_self.lock()) {
+            // MarkApplicationProtocolConfirmed's `ws_handoff_done_`
+            // check at the top suppresses this publish if a
+            // HandOffToWebSocket happened on this worker between the
+            // exchange above and this lambda firing — otherwise we'd
+            // bump http/1.1 for a connection that is now a WebSocket.
+            self->MarkApplicationProtocolConfirmed(stashed);
+        }
+    });
 }

 void ConnectionHandler::MarkApplicationProtocolConfirmed(
     const char* protocol_label) {
     if (protocol_label == nullptr) return;
+    // Suppress publish once the connection has been committed to
+    // WebSocket — a late EnQueued replay arriving after the handoff
+    // would otherwise +1 the http/1.1 gauge for a connection that is
+    // already a WebSocket. HandOffToWebSocket sets ws_handoff_done_
+    // before doing anything else; both it and this function run on the
+    // same worker dispatcher, so the acquire-load here pairs with the
+    // release-store there.
+    if (ws_handoff_done_.load(std::memory_order_acquire)) return;
     // Acquire-load — pairs with AttachTransportObservability's release-
     // store so the pointer fields below are visible on this thread.
-    // When false, AttachTransportObservability never ran (observability
-    // disabled OR accept-time call hadn't completed yet); the counter
-    // pointers stay nullptr semantics — short-circuit to keep behavior
-    // identical to "no observability".
-    if (!obs_attached_.load(std::memory_order_acquire)) return;
+    // When false, the accept-dispatcher's attach has not finished
+    // publishing the counter pointers. Returning early would silently
+    // drop the gauge bump on fast-parsed connections where the worker
+    // outpaces the accept dispatcher (issue #43). Stash the label so
+    // attach replays it post-publish; recheck + take-back covers the
+    // window where attach completed between our load and our stash.
+    if (!obs_attached_.load(std::memory_order_acquire)) {
+        const char* expected = nullptr;
+        if (!pending_http_protocol_label_.compare_exchange_strong(
+                expected, protocol_label,
+                std::memory_order_release,
+                std::memory_order_relaxed)) {
+            // Another stash already in flight; first wins. Today the
+            // invariant is one Mark call per connection (H1 keep-alive
+            // re-confirms use the same literal), so `expected` always
+            // equals `protocol_label` here. A different literal means a
+            // future code path is racing two distinct protocol labels —
+            // a real bug surface; log so it doesn't fail silently.
+            if (expected != nullptr && protocol_label != expected) {
+                logging::Get()->error(
+                    "MarkApplicationProtocolConfirmed CAS-fail with label "
+                    "mismatch — stashed={}, dropped={}, fd={}",
+                    expected, protocol_label, fd());
+            }
+            return;
+        }
+        // Race recheck: if attach's exchange already ran (and got nullptr)
+        // before our stash landed, it won't run again and won't see our
+        // stash. Take it back and publish inline on this worker thread.
+        if (!obs_attached_.load(std::memory_order_acquire)) {
+            return; // attach hasn't run — its post-store exchange will pick this up
+        }
+        const char* taken = pending_http_protocol_label_.exchange(
+            nullptr, std::memory_order_acquire);
+        if (taken == nullptr) {
+            return; // attach grabbed our stash; replay will fire via EnQueue
+        }
+        // taken must equal protocol_label: pending_http_protocol_label_ is
+        // single-writer-via-CAS (this function) plus drain-readers
+        // (Attach and HandOffToWebSocket); none of the drainers write a
+        // non-null value back. If a future feature adds another writer,
+        // log the divergence so the silent override is observable.
+        if (taken != protocol_label) {
+            logging::Get()->error(
+                "MarkApplicationProtocolConfirmed take-back saw foreign "
+                "label — taken={}, expected={}, fd={}",
+                taken, protocol_label, fd());
+        }
+        protocol_label = taken;
+        // Fall through to publish inline (we're on the worker thread, so
+        // http_protocol_label_ writes below are single-writer).
+    }
     if (http_protocol_label_ != nullptr) {
         // Same-label re-confirm is normal: H1 keep-alive calls this on every
         // headers_complete. Different label is a real bug (protocol change
@@ -85,6 +193,18 @@ void ConnectionHandler::MarkApplicationProtocolConfirmed(
 }

 void ConnectionHandler::HandOffToWebSocket() {
+    // Suppress any late-arriving Mark replay before doing anything else.
+    // Set the flag FIRST so a racing MarkApplicationProtocolConfirmed
+    // (running on this same worker, serialized through the dispatcher
+    // loop, so 'racing' is really 'queued behind us') observes the
+    // commitment-to-WS via acquire-load and short-circuits.
+    ws_handoff_done_.store(true, std::memory_order_release);
+    // Also drain the pending stash so a not-yet-run Attach skips its
+    // EnQueue path. Best-effort optimisation — if Attach has already
+    // taken the stash and queued the replay, the ws_handoff_done_ check
+    // at the top of MarkApplicationProtocolConfirmed catches it.
+    pending_http_protocol_label_.exchange(nullptr, std::memory_order_acq_rel);
+
     // Acquire-load — same publication contract as MarkApplicationProtocolConfirmed.
     if (!obs_attached_.load(std::memory_order_acquire)) return;
     if (http_protocol_label_ == nullptr) return;
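Reviewer note on the interleavings: the three functions in this file cooperate through `obs_attached_`, `pending_http_protocol_label_` and `ws_handoff_done_`. The sketch below is a simplified, single-threaded model for stepping through the orderings by hand; it is not the PR's code. `ConnModel`, `gauge`, `published`, `worker_queue` and `RunQueued` are hypothetical stand-ins (for the connection, the http active counter, `http_protocol_label_`, the worker dispatcher queue, and the dispatcher loop), and the parts of `HandOffToWebSocket` and the publish step that the diff does not show are deliberately left unmodeled or reduced to a single increment.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical model; names and the int gauge are assumptions for illustration.
struct ConnModel {
    std::atomic<bool> obs_attached{false};
    std::atomic<bool> ws_handoff_done{false};
    std::atomic<const char*> pending{nullptr};
    const char* published = nullptr;                  // models http_protocol_label_
    int gauge = 0;                                    // models the http active counter
    std::vector<std::function<void()>> worker_queue;  // models EnQueue on the worker

    // Accept-dispatcher side: publish first, then drain the stash.
    void Attach() {
        obs_attached.store(true, std::memory_order_release);
        const char* stashed = pending.exchange(nullptr, std::memory_order_acquire);
        if (stashed == nullptr) return;
        worker_queue.push_back([this, stashed] { Mark(stashed); });
    }

    // Worker side: publish inline when attached, otherwise stash and recheck.
    void Mark(const char* label) {
        if (label == nullptr) return;
        if (ws_handoff_done.load(std::memory_order_acquire)) return;
        if (!obs_attached.load(std::memory_order_acquire)) {
            const char* expected = nullptr;
            if (!pending.compare_exchange_strong(expected, label,
                                                 std::memory_order_release,
                                                 std::memory_order_relaxed)) {
                return;  // an earlier stash is still pending; first wins
            }
            if (!obs_attached.load(std::memory_order_acquire)) {
                return;  // attach has not published yet; it will drain later
            }
            const char* taken = pending.exchange(nullptr, std::memory_order_acquire);
            if (taken == nullptr) return;  // attach already took the stash
            assert(taken == label);        // the PR logs this case instead
            label = taken;
        }
        if (published != nullptr) return;  // re-confirm: already counted
        published = label;
        ++gauge;
    }

    // Worker side: commit to WebSocket. Only the two steps added by the
    // diff are modeled; the rest of the real function is not shown there.
    void HandOff() {
        ws_handoff_done.store(true, std::memory_order_release);
        pending.exchange(nullptr, std::memory_order_acq_rel);
    }

    // Runs everything queued so far, like one pass of the worker loop.
    void RunQueued() {
        std::vector<std::function<void()>> tasks;
        tasks.swap(worker_queue);
        for (auto& task : tasks) task();
    }
};
```

Two orderings are worth walking through with this model. `Mark`, `Attach`, `RunQueued` leaves the gauge at 1, which is the dropped-bump case the PR sets out to fix. `Mark`, `Attach`, `HandOff`, `RunQueued` leaves it at 0, because the queued replay hits the `ws_handoff_done` check first. A single-threaded model cannot demonstrate the memory-ordering claims; those still need TSan or the Valgrind shards.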