diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index c603e006..3238ae2b 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -86,10 +86,10 @@ type Config struct { Email string // email address for account identification and key recovery Owner string // deprecated: use Email instead - Endpoint string // fixed public endpoint (host:port) — skips STUN discovery (for cloud VMs) - AdvertiseEndpoint string // override STUN-discovered endpoint for registry advertisement (host:port) — for k8s pods where STUN returns unreachable IPs - Public bool // make this node's endpoint publicly discoverable - Hostname string // hostname for discovery (empty = none) + Endpoint string // fixed public endpoint (host:port) — skips STUN discovery (for cloud VMs) + AdvertiseEndpoint string // override STUN-discovered endpoint for registry advertisement (host:port) — for k8s pods where STUN returns unreachable IPs + Public bool // make this node's endpoint publicly discoverable + Hostname string // hostname for discovery (empty = none) // RelayOnly hides this node's real_addr from peer resolve/lookup // responses. Peers reach this node only via the beacon-relay path, @@ -148,13 +148,13 @@ type Config struct { // Default tuning constants (used when Config fields are zero). const ( - DefaultKeepaliveInterval = 60 * time.Second - DefaultIdleTimeout = 120 * time.Second - DefaultIdleSweepInterval = 15 * time.Second + DefaultKeepaliveInterval = 60 * time.Second + DefaultIdleTimeout = 120 * time.Second + DefaultIdleSweepInterval = 15 * time.Second // hostnameReannounceInterval is how often the daemon re-sets its // hostname with the registry. This heals hostname resolution after // a registry restart/roll wipes the in-memory hostname store. - hostnameReannounceInterval = 60 * time.Second + hostnameReannounceInterval = 60 * time.Second DefaultSYNRateLimit = 100 DefaultMaxConnectionsPerPort = 1024 DefaultMaxTotalConnections = 65536 diff --git a/pkg/daemon/ipc.go b/pkg/daemon/ipc.go index 6dc91a8b..a86832d9 100644 --- a/pkg/daemon/ipc.go +++ b/pkg/daemon/ipc.go @@ -169,6 +169,23 @@ type ipcConn struct { // avg 256B/msg ≈ 64 MB worst case). const ipcSendBuffer = 256 +// ipcWriteTimeout caps how long writeLoop will wait inside a single +// ipcutil.Write before treating the client as stalled. A stalled client +// (stops reading from the socket) fills the kernel send buffer and +// blocks ipcutil.Write indefinitely; without this deadline the dispatch +// goroutines park in ipcWrite, the per-client semaphore exhausts, the +// read loop blocks, and the daemon appears dead even though it isn't. +// On deadline, writeLoop closes the connection and exits, unblocking +// every parked ipcWrite caller via writeDone. Pinned by +// TestWriteLoopExitsOnWriteDeadline. +const ipcWriteTimeout = 10 * time.Second + +// ipcDrainTimeout is the per-message deadline used on the drain path +// during Close. Shorter than ipcWriteTimeout because Close already +// signalled the conn is going away — there's no point waiting the full +// active-write budget for already-doomed messages. +const ipcDrainTimeout = 3 * time.Second + // ipcMaxInflightPerClient caps how many in-flight dispatch goroutines a // single IPC client may have. Each request becomes a goroutine that // handles the command and writes the reply (concurrent dispatch — see @@ -236,16 +253,24 @@ func (c *ipcConn) writeLoop() { for { select { case msg := <-c.sendCh: + // Bound the active write to ipcWriteTimeout (PILOT-218). The + // commit that added the test (1eff4fa3) intended to include + // this deadline but the writeLoop changes never landed; the + // test has been failing -race ever since. SetWriteDeadline + // errors are non-fatal (no-op on net.Pipe, etc.) so swallow. + _ = c.Conn.SetWriteDeadline(time.Now().Add(ipcWriteTimeout)) if err := ipcutil.Write(c.Conn, msg); err != nil { c.Conn.Close() return } case <-c.done: // Best-effort drain of pending messages so callers that already - // pushed before Close() don't lose their data. + // pushed before Close() don't lose their data. Shorter deadline + // per message because Close already signalled teardown. for { select { case msg := <-c.sendCh: + _ = c.Conn.SetWriteDeadline(time.Now().Add(ipcDrainTimeout)) if err := ipcutil.Write(c.Conn, msg); err != nil { c.Conn.Close() return @@ -277,6 +302,17 @@ func (c *ipcConn) ipcWrite(data []byte) error { return ErrIPCClosed default: } + // Also fast-fail if writeLoop has exited (e.g. SetWriteDeadline + // fired on a stalled client). The slow-path select catches the + // same condition, but a successful sendCh enqueue can be chosen + // over a closed writeDone when both are ready, and a message that + // lands on sendCh after writeLoop has exited will sit there + // orphaned. Pinned by TestWriteLoopExitsOnWriteDeadline:103. + select { + case <-c.writeDone: + return ErrIPCClosed + default: + } // Fast path: try non-blocking enqueue. select { case c.sendCh <- data: diff --git a/pkg/daemon/keyexchange/handle.go b/pkg/daemon/keyexchange/handle.go index 3393febc..9d79d0bb 100644 --- a/pkg/daemon/keyexchange/handle.go +++ b/pkg/daemon/keyexchange/handle.go @@ -110,21 +110,42 @@ func (m *Manager) HandleAuthFrame(data []byte, from *net.UDPAddr, fromRelay bool // Cache the verified peer pubkey. m.SetPeerPubKey(peerNodeID, peerEd25519PubKey) - // Side-effect gate: the log line, tunnel.established bus event, and - // PostInstallHook fire ONLY when this is not a coalesced duplicate. - // A real rekey (keyChanged) or a first-time install always falls - // through to the side effects. The recovery-reply path below (the - // SendKeyExchangeToNode call) is NOT gated on `duplicate` — the - // asymmetric-recovery case (B dropped crypto for A while A retains - // it) requires A to reply on B's retransmit even though A sees it - // as a duplicate. Pinned by - // TestAsymmetricRecoveryRepliesOnDuplicatePILAWhenStale. + // Side-effect routing: + // + // - duplicate (same X25519 ephemeral within DuplicateHandshakeDebounce) + // is a tight direct+relay arrival pair or peer-side retransmit + // burst. All side effects suppressed. + // + // - sameSession (hadCrypto && !keyChanged, past the debounce window) + // is the peer's slow keepalive retransmit. The session was + // established by the earlier PILA; nothing was installed by + // this one. The bus event and PostInstallHook STILL fire so + // downstream observers can refresh endpoint observation / + // keep-alive bookkeeping (pinned by + // TestDuplicatePILAOutsideDebounceFiresHookAgain), but the + // Info-level "encrypted tunnel established" log is demoted to + // Debug: observed against list-agents on 2026-05-29, a peer + // sending a PILA every ~8 s while relayed data is being dropped + // floods the operator log with 35 false "established" lines per + // peer per 5 minutes. Pinned by + // TestSameSessionPILALogsAtDebugButFiresHook. + // + // - default (first install or real rekey): everything fires at + // Info level. + // + // The recovery-reply path below (SendKeyExchangeToNode) is gated + // independently — see TestAsymmetricRecoveryRepliesOnDuplicatePILAWhenStale. + sameSession := hadCrypto && !keyChanged if duplicate { slog.Debug("auth key exchange: duplicate frame coalesced", "peer_node_id", peerNodeID, "age", time.Since(oldPC.CreatedAt), "relay", fromRelay) } else { if keyChanged { slog.Info("peer rekeyed (auth), re-establishing tunnel", "peer_node_id", peerNodeID) + } else if sameSession { + slog.Debug("auth key exchange: same-session keepalive", + "peer_node_id", peerNodeID, "age", time.Since(oldPC.CreatedAt), + "endpoint", from, "relay", fromRelay) } else { slog.Info("encrypted tunnel established", "auth", true, "peer_node_id", peerNodeID, "endpoint", from, "relay", fromRelay) @@ -255,12 +276,21 @@ func (m *Manager) HandleUnauthFrame(data []byte, from *net.UDPAddr, fromRelay bo m.env.Install(peerNodeID, pc) } + // Same routing as HandleAuthFrame: duplicate suppresses everything; + // sameSession (past-debounce same-key keepalive) keeps the bus event + // + postInstall hook firing for endpoint refresh but demotes the + // Info log to Debug. + sameSession := hadCrypto && !keyChanged if duplicate { slog.Debug("unauth key exchange: duplicate frame coalesced", "peer_node_id", peerNodeID, "age", time.Since(oldPC.CreatedAt), "relay", fromRelay) } else { if keyChanged { slog.Info("peer rekeyed, re-establishing tunnel", "peer_node_id", peerNodeID) + } else if sameSession { + slog.Debug("unauth key exchange: same-session keepalive", + "peer_node_id", peerNodeID, "age", time.Since(oldPC.CreatedAt), + "endpoint", from, "relay", fromRelay) } else { slog.Info("encrypted tunnel established", "peer_node_id", peerNodeID, "endpoint", from, "relay", fromRelay) diff --git a/pkg/daemon/keyexchange/store.go b/pkg/daemon/keyexchange/store.go index 0810f852..b99b7fec 100644 --- a/pkg/daemon/keyexchange/store.go +++ b/pkg/daemon/keyexchange/store.go @@ -111,7 +111,6 @@ func (s *Store) IsReady(peerNodeID uint32) bool { return c != nil && c.Ready } - // wipeCryptoSecrets clears any sensitive plaintext on a Crypto's salvage // ring before the Crypto is dropped. PILOT-146: previously Drop simply // did a map delete, leaving the salvage plaintext bytes alive on the heap diff --git a/pkg/daemon/keyexchange/zz_same_session_log_test.go b/pkg/daemon/keyexchange/zz_same_session_log_test.go new file mode 100644 index 00000000..b1679180 --- /dev/null +++ b/pkg/daemon/keyexchange/zz_same_session_log_test.go @@ -0,0 +1,165 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +package keyexchange_test + +// Regression for the same-session log spam observed against list-agents +// on 2026-05-29: a fresh daemon registers, completes the initial PILA +// exchange (1 install, 1 postInstall, 1 "encrypted tunnel established" +// at Info), then the peer keeps retransmitting the SAME PILA at ~8 s +// cadence while the relayed data plane drops our PILS replies. Every +// such retransmit lands past DuplicateHandshakeDebounce (250 ms), so +// the duplicate gate doesn't catch it, but it carries the same X25519 +// ephemeral so hadCrypto=true && keyChanged=false — structurally no +// new install. Pre-fix the daemon logged "encrypted tunnel established" +// at Info every time (35 events per peer per 5 min in field observation). +// +// The fix demotes the log to Debug for the same-session case while +// preserving: +// +// - the existing duplicate-within-debounce coalescing +// (TestDuplicatePILACoalescedSuppressesLogAndHook) +// +// - the past-debounce postInstall hook firing for endpoint refresh +// (TestDuplicatePILAOutsideDebounceFiresHookAgain pins hook count = 2) +// +// - the asymmetric-recovery reply on stale inbound +// (TestDuplicatePILAStillRepliesForAsymmetricRecovery) + +import ( + "bytes" + "log/slog" + "net" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/TeoSlayer/pilotprotocol/pkg/daemon/keyexchange" +) + +// syncWriter serialises Writes from slog handlers so concurrent +// goroutines logging into the same bytes.Buffer don't race the writes. +type syncWriter struct { + w *bytes.Buffer + mu *sync.Mutex +} + +func (s syncWriter) Write(p []byte) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.w.Write(p) +} + +// captureSlog redirects slog.Default to a buffer at the given level +// and returns a buffer-content snapshot + a restore func. NOT safe to +// use with t.Parallel() — slog.Default is process-global and parallel +// tests racing SetDefault see each other's handlers. +func captureSlog(t *testing.T, level slog.Level) (snapshot func() string, restore func()) { + t.Helper() + var ( + buf bytes.Buffer + mu sync.Mutex + ) + handler := slog.NewTextHandler(syncWriter{w: &buf, mu: &mu}, &slog.HandlerOptions{Level: level}) + prev := slog.Default() + slog.SetDefault(slog.New(handler)) + return func() string { + mu.Lock() + defer mu.Unlock() + return buf.String() + }, func() { + slog.SetDefault(prev) + } +} + +// TestSameSessionPILASuppressesInfoButFiresHookAndDebug pins the fix +// end-to-end: +// +// - First PILA: Info "encrypted tunnel established", hook count = 1. +// +// - Second same-key PILA past the debounce window: +// +// - hook still fires (count = 2, pinned for endpoint refresh by +// TestDuplicatePILAOutsideDebounceFiresHookAgain). +// +// - NO second "encrypted tunnel established" at Info — that was the +// spam pre-fix. +// +// - Debug-level "same-session keepalive" present (diagnostic +// remains available for operators tracing key-exchange flow). +// +// The two slog assertions share one capture buffer (and therefore one +// SetDefault call) because parallel-test races on slog.Default would +// otherwise tear the captured output. The test itself is NOT marked +// t.Parallel(). +func TestSameSessionPILASuppressesInfoButFiresHookAndDebug(t *testing.T) { + a := newPeer(t, 510) + b := newPeer(t, 511) + crossWireVerifyFuncs(a, b) + a.mgr.SetSender(func(uint32, *net.UDPAddr, []byte) error { return nil }) + + var hookCount atomic.Int32 + a.mgr.SetPostInstallHook(func(keyexchange.PostInstallEvent) { + hookCount.Add(1) + }) + + bFrame := b.mgr.BuildAuthFrame() + if bFrame == nil { + t.Fatalf("BuildAuthFrame returned nil") + } + from := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4000} + + // Capture at Debug so both the Info "established" on the first + // arrival and the Debug "same-session keepalive" on the second + // arrival land in the same buffer. + logSnap, restore := captureSlog(t, slog.LevelDebug) + defer restore() + + // --- First PILA: full install path --------------------------------- + + if !a.mgr.HandleAuthFrame(bFrame[4:], from, false) { + t.Fatalf("first PILA rejected") + } + if got := hookCount.Load(); got != 1 { + t.Fatalf("after first PILA: hook count = %d, want 1", got) + } + initialLog := logSnap() + if !strings.Contains(initialLog, "encrypted tunnel established") { + t.Fatalf("first PILA: expected Info log 'encrypted tunnel established', got:\n%s", initialLog) + } + initialEstablishedCount := strings.Count(initialLog, "encrypted tunnel established") + if initialEstablishedCount != 1 { + t.Fatalf("first PILA: expected exactly 1 'established' log line, got %d:\n%s", + initialEstablishedCount, initialLog) + } + + // --- Second PILA past debounce: keepalive, no spam ----------------- + + time.Sleep(keyexchange.DuplicateHandshakeDebounce + 100*time.Millisecond) + + if !a.mgr.HandleAuthFrame(bFrame[4:], from, false) { + t.Fatalf("same-session PILA rejected") + } + + // Endpoint-refresh contract from TestDuplicatePILAOutsideDebounceFiresHookAgain. + if got := hookCount.Load(); got != 2 { + t.Fatalf("after same-session PILA: hook count = %d, want 2 (must still refresh endpoint)", got) + } + + finalLog := logSnap() + + // The new behaviour: the Info "established" count stays at one + // (no spam from the second arrival). + finalEstablishedCount := strings.Count(finalLog, "encrypted tunnel established") + if finalEstablishedCount != 1 { + t.Fatalf("after same-session PILA: 'established' log count = %d, want 1 (no Info spam):\n%s", + finalEstablishedCount, finalLog) + } + + // The Debug diagnostic remains so operators can see the keepalive. + if !strings.Contains(finalLog, "same-session keepalive") { + t.Fatalf("after same-session PILA: expected Debug log 'same-session keepalive'; got:\n%s", + finalLog) + } +} diff --git a/pkg/daemon/zz_helpers_test.go b/pkg/daemon/zz_helpers_test.go index 977d2f09..0be8c624 100644 --- a/pkg/daemon/zz_helpers_test.go +++ b/pkg/daemon/zz_helpers_test.go @@ -405,9 +405,9 @@ func TestProcessSACKPartialOverlap(t *testing.T) { // A segment at [200, 205) should be marked sacked even when // the SACK block only partially overlaps (e.g. [195, 203)). c := &Connection{} - c.TrackSend(100, []byte("hello")) // [100, 105) - c.TrackSend(200, []byte("world")) // [200, 205) - c.TrackSend(300, []byte("!!!")) // [300, 303) + c.TrackSend(100, []byte("hello")) // [100, 105) + c.TrackSend(200, []byte("world")) // [200, 205) + c.TrackSend(300, []byte("!!!")) // [300, 303) // SACK block partially overlaps [200, 205) from the left c.ProcessSACK([]SACKBlock{{Left: 195, Right: 203}}) diff --git a/pkg/daemon/zz_ipc_write_deadline_test.go b/pkg/daemon/zz_ipc_write_deadline_test.go index b4484fa6..712a0418 100644 --- a/pkg/daemon/zz_ipc_write_deadline_test.go +++ b/pkg/daemon/zz_ipc_write_deadline_test.go @@ -66,15 +66,20 @@ func TestWriteLoopExitsOnWriteDeadline(t *testing.T) { }() // Fill sendCh + keep writing until ipcWrite blocks. + // + // Each iteration allocates its own message copy in the main + // goroutine BEFORE spawning the writer — `msg` is reused across + // iterations, so doing the copy inside the goroutine raced with + // the next iteration's msg[0] = ... write. Caught by -race on + // ubuntu-latest in the Architecture-gates job (zz_ipc_write_deadline_test + // race report 2026-05-29). const msgSize = 4096 msg := make([]byte, msgSize) for i := 0; i < ipcSendBuffer+10; i++ { msg[0] = byte(i & 0xFF) - go func(m []byte) { - m2 := make([]byte, len(m)) - copy(m2, m) - ic.ipcWrite(m2) - }(msg) + m := make([]byte, msgSize) + copy(m, msg) + go func() { ic.ipcWrite(m) }() } // Give writeLoop time to fill the kernel send buffer.