From 0b0b4751dcf3b7230fca37a8d497a3d82255e88a Mon Sep 17 00:00:00 2001 From: mwfj Date: Wed, 27 May 2026 00:08:45 +0800 Subject: [PATCH 1/3] Optimize week valgrind ci job --- .github/workflows/weekly-valgrind.yml | 2 +- server/http/http_router.cc | 21 ++++++++++++++++-- test/auth/introspection_cache_test.h | 32 +++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 3 deletions(-) diff --git a/.github/workflows/weekly-valgrind.yml b/.github/workflows/weekly-valgrind.yml index 2678e3a6..3da81ffe 100644 --- a/.github/workflows/weekly-valgrind.yml +++ b/.github/workflows/weekly-valgrind.yml @@ -88,7 +88,7 @@ jobs: - shard: auth-crypto suites: >- auth_foundation hrauth auth_mgr auth_fail auth_ws - jwt jwks oidc intro_client auth_race + jwt jwks oidc intro_client router_async auth_observability - shard: observability suites: >- diff --git a/server/http/http_router.cc b/server/http/http_router.cc index 9901fa09..59581c9b 100644 --- a/server/http/http_router.cc +++ b/server/http/http_router.cc @@ -45,7 +45,13 @@ void AsyncPendingState::Complete(AsyncMiddlewarePayload payload) { if (completed_) return; // one-shot completed_ = true; if (resume_armed_) { - cb_to_fire = resume_cb_; // copy under lock + // Move (not copy) — the resume callback's lambda captures `state` + // by value (strong shared_ptr), so leaving the std::function + // alive in resume_cb_ keeps a self-cycle that prevents the state + // (and any captured Span / IssueTraceContext) from destructing. + // Resume is one-shot; after Complete fires it, no future call + // ever wants it again. + cb_to_fire = std::move(resume_cb_); payload_to_fire = std::move(payload); } else { result_slot_ = std::move(payload); @@ -81,7 +87,10 @@ void AsyncPendingState::ArmResume( decrement_owed_ = false; } if (completion_pending_) { - cb_to_fire = resume_cb_; // copy under lock + // Move (not copy) — see Complete() for the cycle rationale. Resume + // is one-shot; once this inline-replay fires, no future caller + // ever uses resume_cb_ again. + cb_to_fire = std::move(resume_cb_); payload_to_fire = std::move(result_slot_); completion_pending_ = false; } @@ -134,6 +143,14 @@ void AsyncPendingState::TripCancel() { { std::lock_guard lk(mu_); local = std::move(cancel_cb_); + // Also release any armed resume callback — after cancel, resume is + // a no-op contract-wise and leaving it alive sustains the + // resume_cb→state self-cycle (the lambda captures `state` by value, + // so a never-fired resume_cb keeps the state alive forever). + // Safe vs a racing Complete: Complete's resume_armed_ check still + // sees true, but its move-out of resume_cb_ produces an empty + // std::function and the post-lock fire is the documented no-op. + resume_cb_ = nullptr; } if (local) { try { local(); } diff --git a/test/auth/introspection_cache_test.h b/test/auth/introspection_cache_test.h index 03494770..b70c9ed9 100644 --- a/test/auth/introspection_cache_test.h +++ b/test/auth/introspection_cache_test.h @@ -560,6 +560,18 @@ static void Test_ApplyReload_MaxEntriesDecrease_AmortisedEviction() { // 18. ConcurrentInsertLookup_16Threads — 16 threads, 50/50 mix, no crash. // --------------------------------------------------------------------------- static void Test_ConcurrentInsertLookup_16Threads() { + // Skip under valgrind: 16 × 10,000 iteration-bounded ops × 10-50x + // interpretation overhead burns >15 min wall clock without producing + // signal valgrind cares about (the same Insert/Lookup paths are + // exercised by every non-concurrent test in this suite, which are + // the ones that would catch actual UAF / uninit). The race semantics + // (no torn reads, no error) are validated by + // LookupDuringEviction_NoUseAfterFree which is wall-clock-bounded. + if (std::getenv("VALGRIND_TEST") != nullptr) { + Record("IntrospectionCache: ConcurrentInsertLookup_16Threads", + true, "skipped under VALGRIND_TEST"); + return; + } try { IntrospectionCache cache("issuer", 100000, 16); constexpr int kThreads = 16; @@ -600,6 +612,17 @@ static void Test_ConcurrentInsertLookup_16Threads() { // 19. ConcurrentReloadDuringInsert — reload thread races with insert. // --------------------------------------------------------------------------- static void Test_ConcurrentReloadDuringInsert() { + // Skip under valgrind / sanitizers: the 200ms wall-clock window is too + // short for inserter+reloader threads to make observable progress under + // 10-50x interpretation overhead, so `entries > 0` becomes a coin flip. + // The race semantics this asserts are also exercised by + // ConcurrentInsertLookup_16Threads and ConcurrentReload_MultipleThreads + // which have longer wall-clock budgets and validate the same invariants. + if (std::getenv("VALGRIND_TEST") != nullptr) { + Record("IntrospectionCache: ConcurrentReloadDuringInsert", + true, "skipped under VALGRIND_TEST"); + return; + } try { IntrospectionCache cache("issuer", 50000, 16); std::atomic stop{false}; @@ -973,6 +996,15 @@ static void Test_Insert_EmptyAuthContext_NegativeEntry_RoundTrips() { // remains usable. // --------------------------------------------------------------------------- static void Test_ConcurrentReload_MultipleThreads() { + // Skip under valgrind: 150ms wall-clock × 12 threads × 10-50x + // interpretation overhead → reload_count / lookup_count assertion + // becomes a coin flip. Race semantics validated by + // LookupDuringEviction_NoUseAfterFree. + if (std::getenv("VALGRIND_TEST") != nullptr) { + Record("IntrospectionCache: ConcurrentReload_MultipleThreads", + true, "skipped under VALGRIND_TEST"); + return; + } try { IntrospectionCache cache("issuer", 4096, 16); // Seed some entries so Lookup has non-empty shards. From d796516b22c32a22e6566b39d542e27f6d5cf203 Mon Sep 17 00:00:00 2001 From: mwfj Date: Wed, 27 May 2026 15:17:44 +0800 Subject: [PATCH 2/3] Optimize week valgrind ci job & fix issue --- .github/workflows/weekly-valgrind.yml | 68 ++++++++--- include/connection_handler.h | 31 +++++ server/connection_handler.cc | 164 ++++++++++++++++++++++---- 3 files changed, 222 insertions(+), 41 deletions(-) diff --git a/.github/workflows/weekly-valgrind.yml b/.github/workflows/weekly-valgrind.yml index 3da81ffe..fc1488fe 100644 --- a/.github/workflows/weekly-valgrind.yml +++ b/.github/workflows/weekly-valgrind.yml @@ -23,24 +23,29 @@ name: Weekly Valgrind # valgrind the 10-50x slowdown collapses their timing assertions. The # protocol / lifecycle suites carry the high-value memory-safety signal. # -# Topology: sharded into 4 parallel matrix jobs. The previous single +# Topology: sharded into 6 parallel matrix jobs. The original single # sequential job consumed the entire 6h GitHub job cap on the `auth` # umbrella alone (without ever finishing), starving every suite -# scheduled after it. The shards keep critical-path wall time well -# under 30 min and let an individual shard hang without blocking the -# others (`fail-fast: false`). Per-shard 4h cap catches hangs while -# leaving plenty of buffer for the slowest expected shard (~10 min -# observed locally; CI is ~1.5-2x slower). +# scheduled after it. A first 4-shard split brought critical-path wall +# time down but left `auth-crypto` at 78 min (jwks alone 23 min); +# splitting that one shard 3 ways drops the critical path to ~25 min. +# `fail-fast: false` so an individual shard hanging or finding an +# error doesn't cancel the others. Per-shard 4h cap catches hangs +# while leaving plenty of buffer. # # Shard composition: -# - `protocol-fast`: lifecycle + parser + small unit tests -# (~4 min local, expect ~6-8 min on GitHub runner). -# - `proxy-heavy`: upstream-pool-heavy + auth_intro (the single -# longest auth sub-suite — moved out of `auth-crypto` for balance). -# - `auth-crypto`: auth umbrella minus the three integration sub- -# suites whose HTTP plumbing is already covered upstream -# (`auth2`, `auth_reload`, `auth_multi`). Loss of valgrind-unique -# coverage on those is bounded — see the table in PR #42 follow-up. +# - `protocol-fast`: lifecycle + parser + small unit tests. +# - `proxy-mixed`: upstream-pool-heavy + the longest auth integration +# suite (`auth_intro` — distinct from the `intro_client` unit suite +# that lives in the auth-intro shard below). The `-mixed` name +# flags the cross-theme content so contributors following the +# 'pick the shard whose theme matches' convention see the exception. +# - `auth-jwks` / `auth-mgr` / `auth-intro`: auth umbrella split 3 +# ways by measured per-suite cost (jwks/intro_client/auth_mgr are +# the heavy hitters; tiny suites distributed for balance). +# `auth2`, `auth_reload`, `auth_multi` are dropped because their +# HTTP plumbing is already covered upstream; `auth_race` is +# dropped per the race-suite-under-Valgrind exclusion. # - `observability`: all obs_* suites — every one is small enough # that the whole family fits in one shard. # @@ -81,15 +86,28 @@ jobs: basic lru_cache http http2 ws tls cli route upstream h2_upstream rate_limit dns h2_trailer grpc_obs grpc_web_edge introspection_cache - - shard: proxy-heavy + # Renamed from `proxy-heavy` to signal that this shard packs + # one auth integration suite (`auth_intro`) alongside the + # upstream-heavy ones — operators following the + # "pick the shard whose theme matches" convention should see + # the mixed grouping from the name. + - shard: proxy-mixed suites: >- circuit_breaker proxy streaming_request grpc grpc_proxy grpc_web auth_intro - - shard: auth-crypto + # Auth umbrella split into 3 by measured CI per-suite wall-clock + # (run 26460686695): jwks 23m, intro_client 16m, auth_mgr 14m, + # jwt 7m dominate the budget. Greedy LPT balances each shard at + # ~21min suites + 2min build = ~23min total. + - shard: auth-jwks suites: >- - auth_foundation hrauth auth_mgr auth_fail auth_ws - jwt jwks oidc intro_client - router_async auth_observability + jwks auth_foundation hrauth + - shard: auth-mgr + suites: >- + auth_mgr jwt oidc router_async + - shard: auth-intro + suites: >- + intro_client auth_ws auth_fail auth_observability - shard: observability suites: >- obs_foundation obs_tracer obs_metrics obs_mgr obs_propagator @@ -101,6 +119,18 @@ jobs: - uses: actions/checkout@v4 - name: Install dependencies run: sudo apt-get update && sudo apt-get install -y libssl-dev valgrind + # Per DEVELOPMENT_RULES.md "CI workflow maintenance": every Linux + # CI job wraps the build in ccache-action with per-dimension keys. + # All 6 sharded matrix jobs build identical valgrind-flagged + # binaries, so they share the single `ccache-linux-valgrind` key. + # The sanitizer-class 500M cap matches the ASan/TSan jobs in + # ci.yml — valgrind builds carry similar debug-info bloat. + - name: ccache + uses: hendrikmuhs/ccache-action@v1.2 + with: + key: ccache-linux-valgrind + max-size: 500M + create-symlink: true - name: Build (debug + frame pointers — valgrind needs accurate stacks) run: | make -j$(nproc) \ diff --git a/include/connection_handler.h b/include/connection_handler.h index 239128d7..d2174902 100644 --- a/include/connection_handler.h +++ b/include/connection_handler.h @@ -142,6 +142,31 @@ class ConnectionHandler : public std::enable_shared_from_this // so the dtor can issue a matching -1 against the SAME labeled // series without per-connection string allocation. const char* http_protocol_label_ = nullptr; + // Stash slot for MarkApplicationProtocolConfirmed when it loses the + // race vs AttachTransportObservability (i.e. headers parsed on the + // worker thread before the accept dispatcher publishes the + // observability pointers). Late-running AttachTransportObservability + // exchanges this slot post-publish and EnQueues a replay on the + // worker dispatcher so the gauge bump still lands. + // + // Concurrent access model: BOTH threads read and write. + // - Worker: CAS(nullptr → label) on stash; exchange(nullptr) on take-back. + // - Accept dispatcher: exchange(nullptr) post-publish to drain. + // - Worker: exchange(nullptr) in HandOffToWebSocket to suppress the + // replay path for connections about to become WebSockets. + // Arbitration is via the exchange's total-order semantics on this + // single atomic — exactly one site observes a non-null value per + // stash event. + std::atomic pending_http_protocol_label_{nullptr}; + + // Set by HandOffToWebSocket to suppress any late EnQueued replay of + // MarkApplicationProtocolConfirmed: once the connection has been + // committed to WebSocket, the http/1.1 gauge MUST NOT be bumped + // even if a stash drained successfully and the replay was already + // queued. Both HandOffToWebSocket and the replay (via Mark) run on + // the same worker dispatcher, so program order on that thread plus + // release/acquire on this flag arbitrate the race. + std::atomic ws_handoff_done_{false}; OBSERVABILITY_NAMESPACE::UpDownCounter* http_active_counter_ = nullptr; OBSERVABILITY_NAMESPACE::Counter* tls_handshakes_counter_ = nullptr; public: @@ -209,6 +234,12 @@ class ConnectionHandler : public std::enable_shared_from_this read_pump_paused_.store(false, std::memory_order_release); close_on_resume_.store(false, std::memory_order_release); has_pending_reads_.store(false, std::memory_order_release); + // Inbound observability state — clear so a future code path + // that calls Mark / HandOff on a recycled outbound connection + // (today no such path exists) cannot inherit the prior + // borrower's stash or commitment flag. + pending_http_protocol_label_.store(nullptr, std::memory_order_release); + ws_handoff_done_.store(false, std::memory_order_release); } // Dispatcher-thread-only for reuse validation. size_t InputBufferSize() const { return input_bf_.Size(); } diff --git a/server/connection_handler.cc b/server/connection_handler.cc index 5b23114f..44837eb7 100644 --- a/server/connection_handler.cc +++ b/server/connection_handler.cc @@ -36,36 +36,144 @@ ConnectionHandler::~ConnectionHandler() { void ConnectionHandler::AttachTransportObservability( OBSERVABILITY_NAMESPACE::ObservabilityManager* mgr) { - if (net_active_incremented_) return; if (mgr == nullptr) return; - net_active_incremented_ = true; // set before side effects (mirrors bound_once_ pattern) - const auto& cat = mgr->catalog(); - net_active_counter_ = cat.reactor_net_connections_active; - net_accepted_counter_ = cat.reactor_net_connections_accepted; - http_active_counter_ = cat.reactor_http_connections_active; - tls_handshakes_counter_ = cat.reactor_tls_handshakes; - // Release-store: publishes the four preceding pointer writes to any - // socket-worker thread that later acquire-loads obs_attached_. Without - // this, the cross-thread happens-before edge from NetServer's EnQueue - // runs the wrong way (EnQueue happens BEFORE AttachTransportObservability - // in the accept-dispatcher call sequence) and the worker's read of - // http_active_counter_ races with this write. Plain bool writes / reads - // would be TSan-flagged even if benign on x86. - obs_attached_.store(true, std::memory_order_release); - if (net_active_counter_ != nullptr) net_active_counter_->Add(1.0, {}); - if (net_accepted_counter_ != nullptr) net_accepted_counter_->Add(1.0, {}); + + // First-call work: cache instrument pointers, publish via + // obs_attached_=true, and emit the accept-time +1s. Idempotent — + // a defensive second call short-circuits this block but still + // runs the stash drain below in case a worker stashed between + // the first and second invocation. + if (!net_active_incremented_) { + net_active_incremented_ = true; // set before side effects (mirrors bound_once_ pattern) + const auto& cat = mgr->catalog(); + net_active_counter_ = cat.reactor_net_connections_active; + net_accepted_counter_ = cat.reactor_net_connections_accepted; + http_active_counter_ = cat.reactor_http_connections_active; + tls_handshakes_counter_ = cat.reactor_tls_handshakes; + // Release-store: publishes the four preceding pointer writes to any + // socket-worker thread that later acquire-loads obs_attached_. Without + // this, the cross-thread happens-before edge from NetServer's EnQueue + // runs the wrong way (EnQueue happens BEFORE AttachTransportObservability + // in the accept-dispatcher call sequence) and the worker's read of + // http_active_counter_ races with this write. Plain bool writes / reads + // would be TSan-flagged even if benign on x86. + obs_attached_.store(true, std::memory_order_release); + if (net_active_counter_ != nullptr) net_active_counter_->Add(1.0, {}); + if (net_accepted_counter_ != nullptr) net_accepted_counter_->Add(1.0, {}); + } + + // Drain any protocol label the worker stashed before obs_attached_ + // became visible. The exchange MUST happen AFTER the release-store + // above so a worker reaching its post-stash recheck observes + // obs_attached_=true and arbitrates correctly. Replay on the worker + // dispatcher to keep http_protocol_label_ single-writer (race-free + // vs concurrent MarkApplicationProtocolConfirmed / HandOffToWebSocket + // calls that also run on the worker). + // + // Pre-load shared_from_this() BEFORE the exchange so a throw + // (`std::bad_weak_ptr` if the connection isn't shared_ptr-managed) + // does not consume the stash with no rollback. If the lookup + // succeeds AND we can queue, only then do we take ownership of the + // label via exchange. + std::weak_ptr weak_self; + try { + weak_self = shared_from_this(); + } catch (const std::bad_weak_ptr&) { + // Caller invoked attach without owning the connection via a + // shared_ptr. The stash drain is best-effort; leave the label + // in the slot so a future attach call (or no one) sees it. + // Both branches (no drain / late drain) are race-correct. + logging::Get()->warn( + "AttachTransportObservability: shared_from_this() failed; " + "stash drain skipped fd={}", fd()); + return; + } + if (!event_dispatcher_) { + logging::Get()->warn( + "AttachTransportObservability: event_dispatcher_ is null; " + "stash drain skipped fd={}", fd()); + return; + } + const char* stashed = pending_http_protocol_label_.exchange( + nullptr, std::memory_order_acquire); + if (stashed == nullptr) return; + event_dispatcher_->EnQueue([weak_self, stashed]() { + if (auto self = weak_self.lock()) { + // MarkApplicationProtocolConfirmed's `ws_handoff_done_` + // check at the top suppresses this publish if a + // HandOffToWebSocket happened on this worker between the + // exchange above and this lambda firing — otherwise we'd + // bump http/1.1 for a connection that is now a WebSocket. + self->MarkApplicationProtocolConfirmed(stashed); + } + }); } void ConnectionHandler::MarkApplicationProtocolConfirmed( const char* protocol_label) { if (protocol_label == nullptr) return; + // Suppress publish once the connection has been committed to + // WebSocket — a late EnQueued replay arriving after the handoff + // would otherwise +1 the http/1.1 gauge for a connection that is + // already a WebSocket. HandOffToWebSocket sets ws_handoff_done_ + // before doing anything else; both it and this function run on the + // same worker dispatcher, so the acquire-load here pairs with the + // release-store there. + if (ws_handoff_done_.load(std::memory_order_acquire)) return; // Acquire-load — pairs with AttachTransportObservability's release- // store so the pointer fields below are visible on this thread. - // When false, AttachTransportObservability never ran (observability - // disabled OR accept-time call hadn't completed yet); the counter - // pointers stay nullptr semantics — short-circuit to keep behavior - // identical to "no observability". - if (!obs_attached_.load(std::memory_order_acquire)) return; + // When false, the accept-dispatcher's attach has not finished + // publishing the counter pointers. Returning early would silently + // drop the gauge bump on fast-parsed connections where the worker + // outpaces the accept dispatcher (issue #43). Stash the label so + // attach replays it post-publish; recheck + take-back covers the + // window where attach completed between our load and our stash. + if (!obs_attached_.load(std::memory_order_acquire)) { + const char* expected = nullptr; + if (!pending_http_protocol_label_.compare_exchange_strong( + expected, protocol_label, + std::memory_order_release, + std::memory_order_relaxed)) { + // Another stash already in flight; first wins. Today the + // invariant is one Mark call per connection (H1 keep-alive + // re-confirms use the same literal), so `expected` always + // equals `protocol_label` here. A different literal means a + // future code path is racing two distinct protocol labels — + // a real bug surface; log so it doesn't fail silently. + if (expected != nullptr && protocol_label != expected) { + logging::Get()->error( + "MarkApplicationProtocolConfirmed CAS-fail with label " + "mismatch — stashed={}, dropped={}, fd={}", + expected, protocol_label, fd()); + } + return; + } + // Race recheck: if attach's exchange already ran (and got nullptr) + // before our stash landed, it won't run again and won't see our + // stash. Take it back and publish inline on this worker thread. + if (!obs_attached_.load(std::memory_order_acquire)) { + return; // attach hasn't run — its post-store exchange will pick this up + } + const char* taken = pending_http_protocol_label_.exchange( + nullptr, std::memory_order_acquire); + if (taken == nullptr) { + return; // attach grabbed our stash; replay will fire via EnQueue + } + // taken must equal protocol_label: pending_http_protocol_label_ is + // single-writer-via-CAS (this function) plus drain-readers + // (Attach and HandOffToWebSocket); none of the drainers write a + // non-null value back. If a future feature adds another writer, + // log the divergence so the silent override is observable. + if (taken != protocol_label) { + logging::Get()->error( + "MarkApplicationProtocolConfirmed take-back saw foreign " + "label — taken={}, expected={}, fd={}", + taken, protocol_label, fd()); + } + protocol_label = taken; + // Fall through to publish inline (we're on the worker thread, so + // http_protocol_label_ writes below are single-writer). + } if (http_protocol_label_ != nullptr) { // Same-label re-confirm is normal: H1 keep-alive calls this on every // headers_complete. Different label is a real bug (protocol change @@ -85,6 +193,18 @@ void ConnectionHandler::MarkApplicationProtocolConfirmed( } void ConnectionHandler::HandOffToWebSocket() { + // Suppress any late-arriving Mark replay before doing anything else. + // Set the flag FIRST so a racing MarkApplicationProtocolConfirmed + // (running on this same worker, serialized through the dispatcher + // loop, so 'racing' is really 'queued behind us') observes the + // commitment-to-WS via acquire-load and short-circuits. + ws_handoff_done_.store(true, std::memory_order_release); + // Also drain the pending stash so a not-yet-run Attach skips its + // EnQueue path. Best-effort optimisation — if Attach has already + // taken the stash and queued the replay, the ws_handoff_done_ check + // at the top of MarkApplicationProtocolConfirmed catches it. + pending_http_protocol_label_.exchange(nullptr, std::memory_order_acq_rel); + // Acquire-load — same publication contract as MarkApplicationProtocolConfirmed. if (!obs_attached_.load(std::memory_order_acquire)) return; if (http_protocol_label_ == nullptr) return; From 7d921e372300fd9b72be5e186deedaf18bba61ca Mon Sep 17 00:00:00 2001 From: mwfj Date: Wed, 27 May 2026 16:50:54 +0800 Subject: [PATCH 3/3] Fix review comment --- server/http/http_router.cc | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/server/http/http_router.cc b/server/http/http_router.cc index 59581c9b..2d23850a 100644 --- a/server/http/http_router.cc +++ b/server/http/http_router.cc @@ -93,6 +93,20 @@ void AsyncPendingState::ArmResume( cb_to_fire = std::move(resume_cb_); payload_to_fire = std::move(result_slot_); completion_pending_ = false; + } else if (cancelled_.load(std::memory_order_relaxed)) { + // TripCancel ran BEFORE us — at that point resume_cb_ was + // empty so TripCancel's own clear was a no-op, and no + // Complete is expected to follow (the upstream cancel + // propagation purges queued work silently per the comment + // in TripCancel). Without this clear, the resume_cb_→state + // self-cycle the move-on-fire pattern would otherwise + // resolve stays intact, leaking the state plus any + // captured Span / IssueTraceContext. Bookkeeping is + // already covered by the decrement_owed_ hand-off above + // (TripCancel sets it when active_counter_ wasn't wired; + // ArmResume consumes it on wire-in), so dropping + // resume_cb_ here does NOT leak active_requests_. + resume_cb_ = nullptr; } } if (consume_now) {