From 8bd166e44b0dbabdcbbc3e5ccf1a7d32c1b44292 Mon Sep 17 00:00:00 2001 From: Teodor Calin Date: Fri, 29 May 2026 11:07:18 -0700 Subject: [PATCH 1/4] fix(keyexchange): demote same-session PILA log to Debug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Observed against list-agents (node 179172) on 2026-05-29: a fresh daemon sends its first PILA, peer replies and trust is established, but the relayed data plane drops our PILS replies. Peer retransmits its PILA every ~8 s as a keepalive. Each arrival carries the SAME X25519 ephemeral (hadCrypto=true, keyChanged=false) and lands well outside DuplicateHandshakeDebounce (250 ms), so the existing duplicate gate doesn't catch it — every retransmit fires another 'encrypted tunnel established' Info log and another tunnel.established bus event even though structurally nothing was installed. 35 false-positive 'established' lines per peer per 5 minutes in field measurement. Fix demotes the log to Debug for the same-session case while keeping the bus event + postInstall hook firing (existing endpoint-refresh contract pinned by TestDuplicatePILAOutsideDebounceFiresHookAgain). Mirrors the demotion in HandleUnauthFrame (PILK) for the same reason. Adds TestSameSessionPILASuppressesInfoButFiresHookAndDebug that pins: - first PILA still produces an Info 'established' line + hook count 1 - second same-key PILA past debounce: hook count = 2 (endpoint refresh preserved), 'established' Info line count stays at 1, Debug log 'same-session keepalive' present. Does NOT change crypto/network behaviour. The asymmetric-recovery reply path (TestAsymmetricRecoveryRepliesOnDuplicatePILAWhenStale) and reply-rate-limit (TestReplyRateLimit*) gates are independent and remain intact. --- pkg/daemon/daemon.go | 16 +- pkg/daemon/keyexchange/handle.go | 48 ++++- pkg/daemon/keyexchange/store.go | 1 - .../keyexchange/zz_same_session_log_test.go | 165 ++++++++++++++++++ pkg/daemon/zz_helpers_test.go | 6 +- 5 files changed, 215 insertions(+), 21 deletions(-) create mode 100644 pkg/daemon/keyexchange/zz_same_session_log_test.go diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index c603e006..3238ae2b 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -86,10 +86,10 @@ type Config struct { Email string // email address for account identification and key recovery Owner string // deprecated: use Email instead - Endpoint string // fixed public endpoint (host:port) — skips STUN discovery (for cloud VMs) - AdvertiseEndpoint string // override STUN-discovered endpoint for registry advertisement (host:port) — for k8s pods where STUN returns unreachable IPs - Public bool // make this node's endpoint publicly discoverable - Hostname string // hostname for discovery (empty = none) + Endpoint string // fixed public endpoint (host:port) — skips STUN discovery (for cloud VMs) + AdvertiseEndpoint string // override STUN-discovered endpoint for registry advertisement (host:port) — for k8s pods where STUN returns unreachable IPs + Public bool // make this node's endpoint publicly discoverable + Hostname string // hostname for discovery (empty = none) // RelayOnly hides this node's real_addr from peer resolve/lookup // responses. Peers reach this node only via the beacon-relay path, @@ -148,13 +148,13 @@ type Config struct { // Default tuning constants (used when Config fields are zero). const ( - DefaultKeepaliveInterval = 60 * time.Second - DefaultIdleTimeout = 120 * time.Second - DefaultIdleSweepInterval = 15 * time.Second + DefaultKeepaliveInterval = 60 * time.Second + DefaultIdleTimeout = 120 * time.Second + DefaultIdleSweepInterval = 15 * time.Second // hostnameReannounceInterval is how often the daemon re-sets its // hostname with the registry. This heals hostname resolution after // a registry restart/roll wipes the in-memory hostname store. - hostnameReannounceInterval = 60 * time.Second + hostnameReannounceInterval = 60 * time.Second DefaultSYNRateLimit = 100 DefaultMaxConnectionsPerPort = 1024 DefaultMaxTotalConnections = 65536 diff --git a/pkg/daemon/keyexchange/handle.go b/pkg/daemon/keyexchange/handle.go index 3393febc..9d79d0bb 100644 --- a/pkg/daemon/keyexchange/handle.go +++ b/pkg/daemon/keyexchange/handle.go @@ -110,21 +110,42 @@ func (m *Manager) HandleAuthFrame(data []byte, from *net.UDPAddr, fromRelay bool // Cache the verified peer pubkey. m.SetPeerPubKey(peerNodeID, peerEd25519PubKey) - // Side-effect gate: the log line, tunnel.established bus event, and - // PostInstallHook fire ONLY when this is not a coalesced duplicate. - // A real rekey (keyChanged) or a first-time install always falls - // through to the side effects. The recovery-reply path below (the - // SendKeyExchangeToNode call) is NOT gated on `duplicate` — the - // asymmetric-recovery case (B dropped crypto for A while A retains - // it) requires A to reply on B's retransmit even though A sees it - // as a duplicate. Pinned by - // TestAsymmetricRecoveryRepliesOnDuplicatePILAWhenStale. + // Side-effect routing: + // + // - duplicate (same X25519 ephemeral within DuplicateHandshakeDebounce) + // is a tight direct+relay arrival pair or peer-side retransmit + // burst. All side effects suppressed. + // + // - sameSession (hadCrypto && !keyChanged, past the debounce window) + // is the peer's slow keepalive retransmit. The session was + // established by the earlier PILA; nothing was installed by + // this one. The bus event and PostInstallHook STILL fire so + // downstream observers can refresh endpoint observation / + // keep-alive bookkeeping (pinned by + // TestDuplicatePILAOutsideDebounceFiresHookAgain), but the + // Info-level "encrypted tunnel established" log is demoted to + // Debug: observed against list-agents on 2026-05-29, a peer + // sending a PILA every ~8 s while relayed data is being dropped + // floods the operator log with 35 false "established" lines per + // peer per 5 minutes. Pinned by + // TestSameSessionPILALogsAtDebugButFiresHook. + // + // - default (first install or real rekey): everything fires at + // Info level. + // + // The recovery-reply path below (SendKeyExchangeToNode) is gated + // independently — see TestAsymmetricRecoveryRepliesOnDuplicatePILAWhenStale. + sameSession := hadCrypto && !keyChanged if duplicate { slog.Debug("auth key exchange: duplicate frame coalesced", "peer_node_id", peerNodeID, "age", time.Since(oldPC.CreatedAt), "relay", fromRelay) } else { if keyChanged { slog.Info("peer rekeyed (auth), re-establishing tunnel", "peer_node_id", peerNodeID) + } else if sameSession { + slog.Debug("auth key exchange: same-session keepalive", + "peer_node_id", peerNodeID, "age", time.Since(oldPC.CreatedAt), + "endpoint", from, "relay", fromRelay) } else { slog.Info("encrypted tunnel established", "auth", true, "peer_node_id", peerNodeID, "endpoint", from, "relay", fromRelay) @@ -255,12 +276,21 @@ func (m *Manager) HandleUnauthFrame(data []byte, from *net.UDPAddr, fromRelay bo m.env.Install(peerNodeID, pc) } + // Same routing as HandleAuthFrame: duplicate suppresses everything; + // sameSession (past-debounce same-key keepalive) keeps the bus event + // + postInstall hook firing for endpoint refresh but demotes the + // Info log to Debug. + sameSession := hadCrypto && !keyChanged if duplicate { slog.Debug("unauth key exchange: duplicate frame coalesced", "peer_node_id", peerNodeID, "age", time.Since(oldPC.CreatedAt), "relay", fromRelay) } else { if keyChanged { slog.Info("peer rekeyed, re-establishing tunnel", "peer_node_id", peerNodeID) + } else if sameSession { + slog.Debug("unauth key exchange: same-session keepalive", + "peer_node_id", peerNodeID, "age", time.Since(oldPC.CreatedAt), + "endpoint", from, "relay", fromRelay) } else { slog.Info("encrypted tunnel established", "peer_node_id", peerNodeID, "endpoint", from, "relay", fromRelay) diff --git a/pkg/daemon/keyexchange/store.go b/pkg/daemon/keyexchange/store.go index 0810f852..b99b7fec 100644 --- a/pkg/daemon/keyexchange/store.go +++ b/pkg/daemon/keyexchange/store.go @@ -111,7 +111,6 @@ func (s *Store) IsReady(peerNodeID uint32) bool { return c != nil && c.Ready } - // wipeCryptoSecrets clears any sensitive plaintext on a Crypto's salvage // ring before the Crypto is dropped. PILOT-146: previously Drop simply // did a map delete, leaving the salvage plaintext bytes alive on the heap diff --git a/pkg/daemon/keyexchange/zz_same_session_log_test.go b/pkg/daemon/keyexchange/zz_same_session_log_test.go new file mode 100644 index 00000000..b1679180 --- /dev/null +++ b/pkg/daemon/keyexchange/zz_same_session_log_test.go @@ -0,0 +1,165 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +package keyexchange_test + +// Regression for the same-session log spam observed against list-agents +// on 2026-05-29: a fresh daemon registers, completes the initial PILA +// exchange (1 install, 1 postInstall, 1 "encrypted tunnel established" +// at Info), then the peer keeps retransmitting the SAME PILA at ~8 s +// cadence while the relayed data plane drops our PILS replies. Every +// such retransmit lands past DuplicateHandshakeDebounce (250 ms), so +// the duplicate gate doesn't catch it, but it carries the same X25519 +// ephemeral so hadCrypto=true && keyChanged=false — structurally no +// new install. Pre-fix the daemon logged "encrypted tunnel established" +// at Info every time (35 events per peer per 5 min in field observation). +// +// The fix demotes the log to Debug for the same-session case while +// preserving: +// +// - the existing duplicate-within-debounce coalescing +// (TestDuplicatePILACoalescedSuppressesLogAndHook) +// +// - the past-debounce postInstall hook firing for endpoint refresh +// (TestDuplicatePILAOutsideDebounceFiresHookAgain pins hook count = 2) +// +// - the asymmetric-recovery reply on stale inbound +// (TestDuplicatePILAStillRepliesForAsymmetricRecovery) + +import ( + "bytes" + "log/slog" + "net" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/TeoSlayer/pilotprotocol/pkg/daemon/keyexchange" +) + +// syncWriter serialises Writes from slog handlers so concurrent +// goroutines logging into the same bytes.Buffer don't race the writes. +type syncWriter struct { + w *bytes.Buffer + mu *sync.Mutex +} + +func (s syncWriter) Write(p []byte) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.w.Write(p) +} + +// captureSlog redirects slog.Default to a buffer at the given level +// and returns a buffer-content snapshot + a restore func. NOT safe to +// use with t.Parallel() — slog.Default is process-global and parallel +// tests racing SetDefault see each other's handlers. +func captureSlog(t *testing.T, level slog.Level) (snapshot func() string, restore func()) { + t.Helper() + var ( + buf bytes.Buffer + mu sync.Mutex + ) + handler := slog.NewTextHandler(syncWriter{w: &buf, mu: &mu}, &slog.HandlerOptions{Level: level}) + prev := slog.Default() + slog.SetDefault(slog.New(handler)) + return func() string { + mu.Lock() + defer mu.Unlock() + return buf.String() + }, func() { + slog.SetDefault(prev) + } +} + +// TestSameSessionPILASuppressesInfoButFiresHookAndDebug pins the fix +// end-to-end: +// +// - First PILA: Info "encrypted tunnel established", hook count = 1. +// +// - Second same-key PILA past the debounce window: +// +// - hook still fires (count = 2, pinned for endpoint refresh by +// TestDuplicatePILAOutsideDebounceFiresHookAgain). +// +// - NO second "encrypted tunnel established" at Info — that was the +// spam pre-fix. +// +// - Debug-level "same-session keepalive" present (diagnostic +// remains available for operators tracing key-exchange flow). +// +// The two slog assertions share one capture buffer (and therefore one +// SetDefault call) because parallel-test races on slog.Default would +// otherwise tear the captured output. The test itself is NOT marked +// t.Parallel(). +func TestSameSessionPILASuppressesInfoButFiresHookAndDebug(t *testing.T) { + a := newPeer(t, 510) + b := newPeer(t, 511) + crossWireVerifyFuncs(a, b) + a.mgr.SetSender(func(uint32, *net.UDPAddr, []byte) error { return nil }) + + var hookCount atomic.Int32 + a.mgr.SetPostInstallHook(func(keyexchange.PostInstallEvent) { + hookCount.Add(1) + }) + + bFrame := b.mgr.BuildAuthFrame() + if bFrame == nil { + t.Fatalf("BuildAuthFrame returned nil") + } + from := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4000} + + // Capture at Debug so both the Info "established" on the first + // arrival and the Debug "same-session keepalive" on the second + // arrival land in the same buffer. + logSnap, restore := captureSlog(t, slog.LevelDebug) + defer restore() + + // --- First PILA: full install path --------------------------------- + + if !a.mgr.HandleAuthFrame(bFrame[4:], from, false) { + t.Fatalf("first PILA rejected") + } + if got := hookCount.Load(); got != 1 { + t.Fatalf("after first PILA: hook count = %d, want 1", got) + } + initialLog := logSnap() + if !strings.Contains(initialLog, "encrypted tunnel established") { + t.Fatalf("first PILA: expected Info log 'encrypted tunnel established', got:\n%s", initialLog) + } + initialEstablishedCount := strings.Count(initialLog, "encrypted tunnel established") + if initialEstablishedCount != 1 { + t.Fatalf("first PILA: expected exactly 1 'established' log line, got %d:\n%s", + initialEstablishedCount, initialLog) + } + + // --- Second PILA past debounce: keepalive, no spam ----------------- + + time.Sleep(keyexchange.DuplicateHandshakeDebounce + 100*time.Millisecond) + + if !a.mgr.HandleAuthFrame(bFrame[4:], from, false) { + t.Fatalf("same-session PILA rejected") + } + + // Endpoint-refresh contract from TestDuplicatePILAOutsideDebounceFiresHookAgain. + if got := hookCount.Load(); got != 2 { + t.Fatalf("after same-session PILA: hook count = %d, want 2 (must still refresh endpoint)", got) + } + + finalLog := logSnap() + + // The new behaviour: the Info "established" count stays at one + // (no spam from the second arrival). + finalEstablishedCount := strings.Count(finalLog, "encrypted tunnel established") + if finalEstablishedCount != 1 { + t.Fatalf("after same-session PILA: 'established' log count = %d, want 1 (no Info spam):\n%s", + finalEstablishedCount, finalLog) + } + + // The Debug diagnostic remains so operators can see the keepalive. + if !strings.Contains(finalLog, "same-session keepalive") { + t.Fatalf("after same-session PILA: expected Debug log 'same-session keepalive'; got:\n%s", + finalLog) + } +} diff --git a/pkg/daemon/zz_helpers_test.go b/pkg/daemon/zz_helpers_test.go index 977d2f09..0be8c624 100644 --- a/pkg/daemon/zz_helpers_test.go +++ b/pkg/daemon/zz_helpers_test.go @@ -405,9 +405,9 @@ func TestProcessSACKPartialOverlap(t *testing.T) { // A segment at [200, 205) should be marked sacked even when // the SACK block only partially overlaps (e.g. [195, 203)). c := &Connection{} - c.TrackSend(100, []byte("hello")) // [100, 105) - c.TrackSend(200, []byte("world")) // [200, 205) - c.TrackSend(300, []byte("!!!")) // [300, 303) + c.TrackSend(100, []byte("hello")) // [100, 105) + c.TrackSend(200, []byte("world")) // [200, 205) + c.TrackSend(300, []byte("!!!")) // [300, 303) // SACK block partially overlaps [200, 205) from the left c.ProcessSACK([]SACKBlock{{Left: 195, Right: 203}}) From 137d7426895558d3e07f92c952606cd5ae761eb3 Mon Sep 17 00:00:00 2001 From: Teodor Calin Date: Fri, 29 May 2026 11:18:08 -0700 Subject: [PATCH 2/4] test(daemon): fix data race in TestWriteLoopExitsOnWriteDeadline (pre-existing) The fan-out loop reused a single msg buffer across iterations and did the per-iteration copy INSIDE the spawned goroutine. The main goroutine's next msg[0] = byte(i & 0xFF) raced with the previous goroutine's copy(m2, m). Caught by go test -race in the Architecture gates job (report: zz_ipc_write_deadline_test.go:75 read vs :72 write). This was added in 1eff4fa3 (PILOT-218 write-deadline fix) before this branch existed; every PR opened since then has been failing the race-detector check. Hoisting the copy out of the goroutine fixes it without changing the test's intent (still floods ic.ipcWrite with ipcSendBuffer+10 distinct messages to fill the kernel send buffer). Touched alongside the keyexchange log-spam fix because the same PR job runs both and we can't merge until -race is green. --- pkg/daemon/zz_ipc_write_deadline_test.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/daemon/zz_ipc_write_deadline_test.go b/pkg/daemon/zz_ipc_write_deadline_test.go index b4484fa6..712a0418 100644 --- a/pkg/daemon/zz_ipc_write_deadline_test.go +++ b/pkg/daemon/zz_ipc_write_deadline_test.go @@ -66,15 +66,20 @@ func TestWriteLoopExitsOnWriteDeadline(t *testing.T) { }() // Fill sendCh + keep writing until ipcWrite blocks. + // + // Each iteration allocates its own message copy in the main + // goroutine BEFORE spawning the writer — `msg` is reused across + // iterations, so doing the copy inside the goroutine raced with + // the next iteration's msg[0] = ... write. Caught by -race on + // ubuntu-latest in the Architecture-gates job (zz_ipc_write_deadline_test + // race report 2026-05-29). const msgSize = 4096 msg := make([]byte, msgSize) for i := 0; i < ipcSendBuffer+10; i++ { msg[0] = byte(i & 0xFF) - go func(m []byte) { - m2 := make([]byte, len(m)) - copy(m2, m) - ic.ipcWrite(m2) - }(msg) + m := make([]byte, msgSize) + copy(m, msg) + go func() { ic.ipcWrite(m) }() } // Give writeLoop time to fill the kernel send buffer. From 01cb11547d91e9fe68c6452041674583e76b45a8 Mon Sep 17 00:00:00 2001 From: Teodor Calin Date: Fri, 29 May 2026 11:22:57 -0700 Subject: [PATCH 3/4] fix(daemon): actually set the IPC writeLoop deadline (PILOT-218) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #156 / commit 1eff4fa3 was titled 'add write deadline to IPC writeLoop and bypass semaphore for CmdHealth', and its commit message correctly described the design (10 s per active write, 3 s drain on Close). The CmdHealth inline-dispatch half landed; the writeLoop SetWriteDeadline half didn't. Result: TestWriteLoopExitsOnWriteDeadline has been failing since #156 merged — first with a -race report (a buffer-reuse bug in the test's fan-out loop, fixed in the prior commit on this branch), then with 'writeLoop did not exit within deadline window' because the deadline the test waits on doesn't actually exist. Every PR opened since #156 has been silently blocked on Architecture gates for this reason. Adds: - ipcWriteTimeout = 10 s (the active-write deadline) - ipcDrainTimeout = 3 s (the Close-drain deadline) - SetWriteDeadline calls in writeLoop's both arms, matching the contract the test expects. SetWriteDeadline errors are deliberately swallowed — net.Pipe ignores the call, and any real socket that doesn't support it is already broken in ways the next Write will surface. Semantically a no-op for the happy path (normal clients read fast, the deadline never trips). For a stalled client it does what PILOT-218 wanted: writeLoop times out → c.Conn.Close() → writeDone closes → every parked ipcWrite caller returns ErrIPCClosed → semaphore drains → daemon is responsive again. --- pkg/daemon/ipc.go | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/pkg/daemon/ipc.go b/pkg/daemon/ipc.go index b695d3c1..f52575bf 100644 --- a/pkg/daemon/ipc.go +++ b/pkg/daemon/ipc.go @@ -168,6 +168,23 @@ type ipcConn struct { // avg 256B/msg ≈ 64 MB worst case). const ipcSendBuffer = 256 +// ipcWriteTimeout caps how long writeLoop will wait inside a single +// ipcutil.Write before treating the client as stalled. A stalled client +// (stops reading from the socket) fills the kernel send buffer and +// blocks ipcutil.Write indefinitely; without this deadline the dispatch +// goroutines park in ipcWrite, the per-client semaphore exhausts, the +// read loop blocks, and the daemon appears dead even though it isn't. +// On deadline, writeLoop closes the connection and exits, unblocking +// every parked ipcWrite caller via writeDone. Pinned by +// TestWriteLoopExitsOnWriteDeadline. +const ipcWriteTimeout = 10 * time.Second + +// ipcDrainTimeout is the per-message deadline used on the drain path +// during Close. Shorter than ipcWriteTimeout because Close already +// signalled the conn is going away — there's no point waiting the full +// active-write budget for already-doomed messages. +const ipcDrainTimeout = 3 * time.Second + // ipcMaxInflightPerClient caps how many in-flight dispatch goroutines a // single IPC client may have. Each request becomes a goroutine that // handles the command and writes the reply (concurrent dispatch — see @@ -235,16 +252,24 @@ func (c *ipcConn) writeLoop() { for { select { case msg := <-c.sendCh: + // Bound the active write to ipcWriteTimeout (PILOT-218). The + // commit that added the test (1eff4fa3) intended to include + // this deadline but the writeLoop changes never landed; the + // test has been failing -race ever since. SetWriteDeadline + // errors are non-fatal (no-op on net.Pipe, etc.) so swallow. + _ = c.Conn.SetWriteDeadline(time.Now().Add(ipcWriteTimeout)) if err := ipcutil.Write(c.Conn, msg); err != nil { c.Conn.Close() return } case <-c.done: // Best-effort drain of pending messages so callers that already - // pushed before Close() don't lose their data. + // pushed before Close() don't lose their data. Shorter deadline + // per message because Close already signalled teardown. for { select { case msg := <-c.sendCh: + _ = c.Conn.SetWriteDeadline(time.Now().Add(ipcDrainTimeout)) if err := ipcutil.Write(c.Conn, msg); err != nil { c.Conn.Close() return From 28cb452ff35936dd765142d64fd3c77079e935f1 Mon Sep 17 00:00:00 2001 From: Teodor Calin Date: Fri, 29 May 2026 11:27:18 -0700 Subject: [PATCH 4/4] fix(daemon): ipcWrite must fast-fail when writeLoop has exited MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Companion to the prior commit (actually-set the writeLoop deadline). After writeLoop hits ipcWriteTimeout and closes writeDone, ipcWrite still had a sendCh-with-room enqueue path that returned nil — the message would land in the channel, sit there orphaned, and the caller would think it succeeded. The slow-path select did catch writeDone, but only after the buffer filled. Added a non-blocking writeDone check next to the existing c.done fast-fail, so any ipcWrite after writeLoop exits returns ErrIPCClosed immediately regardless of sendCh capacity. Pinned by TestWriteLoopExitsOnWriteDeadline's final assertion. --- pkg/daemon/ipc.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/daemon/ipc.go b/pkg/daemon/ipc.go index f52575bf..44b10b0d 100644 --- a/pkg/daemon/ipc.go +++ b/pkg/daemon/ipc.go @@ -301,6 +301,17 @@ func (c *ipcConn) ipcWrite(data []byte) error { return ErrIPCClosed default: } + // Also fast-fail if writeLoop has exited (e.g. SetWriteDeadline + // fired on a stalled client). The slow-path select catches the + // same condition, but a successful sendCh enqueue can be chosen + // over a closed writeDone when both are ready, and a message that + // lands on sendCh after writeLoop has exited will sit there + // orphaned. Pinned by TestWriteLoopExitsOnWriteDeadline:103. + select { + case <-c.writeDone: + return ErrIPCClosed + default: + } // Fast path: try non-blocking enqueue. select { case c.sendCh <- data: