Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ type Config struct {
Email string // email address for account identification and key recovery
Owner string // deprecated: use Email instead

Endpoint string // fixed public endpoint (host:port) — skips STUN discovery (for cloud VMs)
AdvertiseEndpoint string // override STUN-discovered endpoint for registry advertisement (host:port) — for k8s pods where STUN returns unreachable IPs
Public bool // make this node's endpoint publicly discoverable
Hostname string // hostname for discovery (empty = none)
Endpoint string // fixed public endpoint (host:port) — skips STUN discovery (for cloud VMs)
AdvertiseEndpoint string // override STUN-discovered endpoint for registry advertisement (host:port) — for k8s pods where STUN returns unreachable IPs
Public bool // make this node's endpoint publicly discoverable
Hostname string // hostname for discovery (empty = none)

// RelayOnly hides this node's real_addr from peer resolve/lookup
// responses. Peers reach this node only via the beacon-relay path,
Expand Down Expand Up @@ -148,13 +148,13 @@ type Config struct {

// Default tuning constants (used when Config fields are zero).
const (
DefaultKeepaliveInterval = 60 * time.Second
DefaultIdleTimeout = 120 * time.Second
DefaultIdleSweepInterval = 15 * time.Second
DefaultKeepaliveInterval = 60 * time.Second
DefaultIdleTimeout = 120 * time.Second
DefaultIdleSweepInterval = 15 * time.Second
// hostnameReannounceInterval is how often the daemon re-sets its
// hostname with the registry. This heals hostname resolution after
// a registry restart/roll wipes the in-memory hostname store.
hostnameReannounceInterval = 60 * time.Second
hostnameReannounceInterval = 60 * time.Second
DefaultSYNRateLimit = 100
DefaultMaxConnectionsPerPort = 1024
DefaultMaxTotalConnections = 65536
Expand Down
38 changes: 37 additions & 1 deletion pkg/daemon/ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,23 @@ type ipcConn struct {
// avg 256B/msg ≈ 64 MB worst case).
const ipcSendBuffer = 256

// ipcWriteTimeout caps how long writeLoop will wait inside a single
// ipcutil.Write before treating the client as stalled. A stalled client
// (stops reading from the socket) fills the kernel send buffer and
// blocks ipcutil.Write indefinitely; without this deadline the dispatch
// goroutines park in ipcWrite, the per-client semaphore exhausts, the
// read loop blocks, and the daemon appears dead even though it isn't.
// On deadline, writeLoop closes the connection and exits, unblocking
// every parked ipcWrite caller via writeDone. Pinned by
// TestWriteLoopExitsOnWriteDeadline.
const ipcWriteTimeout = 10 * time.Second

// ipcDrainTimeout is the per-message deadline used on the drain path
// during Close. Shorter than ipcWriteTimeout because Close already
// signalled the conn is going away — there's no point waiting the full
// active-write budget for already-doomed messages.
const ipcDrainTimeout = 3 * time.Second

// ipcMaxInflightPerClient caps how many in-flight dispatch goroutines a
// single IPC client may have. Each request becomes a goroutine that
// handles the command and writes the reply (concurrent dispatch — see
Expand Down Expand Up @@ -236,16 +253,24 @@ func (c *ipcConn) writeLoop() {
for {
select {
case msg := <-c.sendCh:
// Bound the active write to ipcWriteTimeout (PILOT-218). The
// commit that added the test (1eff4fa3) intended to include
// this deadline but the writeLoop changes never landed; the
// test has been failing -race ever since. SetWriteDeadline
// errors are non-fatal (no-op on net.Pipe, etc.) so swallow.
_ = c.Conn.SetWriteDeadline(time.Now().Add(ipcWriteTimeout))
if err := ipcutil.Write(c.Conn, msg); err != nil {
c.Conn.Close()
return
}
case <-c.done:
// Best-effort drain of pending messages so callers that already
// pushed before Close() don't lose their data.
// pushed before Close() don't lose their data. Shorter deadline
// per message because Close already signalled teardown.
for {
select {
case msg := <-c.sendCh:
_ = c.Conn.SetWriteDeadline(time.Now().Add(ipcDrainTimeout))
if err := ipcutil.Write(c.Conn, msg); err != nil {
c.Conn.Close()
return
Expand Down Expand Up @@ -277,6 +302,17 @@ func (c *ipcConn) ipcWrite(data []byte) error {
return ErrIPCClosed
default:
}
// Also fast-fail if writeLoop has exited (e.g. SetWriteDeadline
// fired on a stalled client). The slow-path select catches the
// same condition, but a successful sendCh enqueue can be chosen
// over a closed writeDone when both are ready, and a message that
// lands on sendCh after writeLoop has exited will sit there
// orphaned. Pinned by TestWriteLoopExitsOnWriteDeadline:103.
select {
case <-c.writeDone:
return ErrIPCClosed
default:
}
// Fast path: try non-blocking enqueue.
select {
case c.sendCh <- data:
Expand Down
48 changes: 39 additions & 9 deletions pkg/daemon/keyexchange/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,42 @@ func (m *Manager) HandleAuthFrame(data []byte, from *net.UDPAddr, fromRelay bool
// Cache the verified peer pubkey.
m.SetPeerPubKey(peerNodeID, peerEd25519PubKey)

// Side-effect gate: the log line, tunnel.established bus event, and
// PostInstallHook fire ONLY when this is not a coalesced duplicate.
// A real rekey (keyChanged) or a first-time install always falls
// through to the side effects. The recovery-reply path below (the
// SendKeyExchangeToNode call) is NOT gated on `duplicate` — the
// asymmetric-recovery case (B dropped crypto for A while A retains
// it) requires A to reply on B's retransmit even though A sees it
// as a duplicate. Pinned by
// TestAsymmetricRecoveryRepliesOnDuplicatePILAWhenStale.
// Side-effect routing:
//
// - duplicate (same X25519 ephemeral within DuplicateHandshakeDebounce)
// is a tight direct+relay arrival pair or peer-side retransmit
// burst. All side effects suppressed.
//
// - sameSession (hadCrypto && !keyChanged, past the debounce window)
// is the peer's slow keepalive retransmit. The session was
// established by the earlier PILA; nothing was installed by
// this one. The bus event and PostInstallHook STILL fire so
// downstream observers can refresh endpoint observation /
// keep-alive bookkeeping (pinned by
// TestDuplicatePILAOutsideDebounceFiresHookAgain), but the
// Info-level "encrypted tunnel established" log is demoted to
// Debug: observed against list-agents on 2026-05-29, a peer
// sending a PILA every ~8 s while relayed data is being dropped
// floods the operator log with 35 false "established" lines per
// peer per 5 minutes. Pinned by
// TestSameSessionPILALogsAtDebugButFiresHook.
//
// - default (first install or real rekey): everything fires at
// Info level.
//
// The recovery-reply path below (SendKeyExchangeToNode) is gated
// independently — see TestAsymmetricRecoveryRepliesOnDuplicatePILAWhenStale.
sameSession := hadCrypto && !keyChanged
if duplicate {
slog.Debug("auth key exchange: duplicate frame coalesced",
"peer_node_id", peerNodeID, "age", time.Since(oldPC.CreatedAt), "relay", fromRelay)
} else {
if keyChanged {
slog.Info("peer rekeyed (auth), re-establishing tunnel", "peer_node_id", peerNodeID)
} else if sameSession {
slog.Debug("auth key exchange: same-session keepalive",
"peer_node_id", peerNodeID, "age", time.Since(oldPC.CreatedAt),
"endpoint", from, "relay", fromRelay)
} else {
slog.Info("encrypted tunnel established", "auth", true,
"peer_node_id", peerNodeID, "endpoint", from, "relay", fromRelay)
Expand Down Expand Up @@ -255,12 +276,21 @@ func (m *Manager) HandleUnauthFrame(data []byte, from *net.UDPAddr, fromRelay bo
m.env.Install(peerNodeID, pc)
}

// Same routing as HandleAuthFrame: duplicate suppresses everything;
// sameSession (past-debounce same-key keepalive) keeps the bus event
// + postInstall hook firing for endpoint refresh but demotes the
// Info log to Debug.
sameSession := hadCrypto && !keyChanged
if duplicate {
slog.Debug("unauth key exchange: duplicate frame coalesced",
"peer_node_id", peerNodeID, "age", time.Since(oldPC.CreatedAt), "relay", fromRelay)
} else {
if keyChanged {
slog.Info("peer rekeyed, re-establishing tunnel", "peer_node_id", peerNodeID)
} else if sameSession {
slog.Debug("unauth key exchange: same-session keepalive",
"peer_node_id", peerNodeID, "age", time.Since(oldPC.CreatedAt),
"endpoint", from, "relay", fromRelay)
} else {
slog.Info("encrypted tunnel established",
"peer_node_id", peerNodeID, "endpoint", from, "relay", fromRelay)
Expand Down
1 change: 0 additions & 1 deletion pkg/daemon/keyexchange/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func (s *Store) IsReady(peerNodeID uint32) bool {
return c != nil && c.Ready
}


// wipeCryptoSecrets clears any sensitive plaintext on a Crypto's salvage
// ring before the Crypto is dropped. PILOT-146: previously Drop simply
// did a map delete, leaving the salvage plaintext bytes alive on the heap
Expand Down
165 changes: 165 additions & 0 deletions pkg/daemon/keyexchange/zz_same_session_log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

package keyexchange_test

// Regression for the same-session log spam observed against list-agents
// on 2026-05-29: a fresh daemon registers, completes the initial PILA
// exchange (1 install, 1 postInstall, 1 "encrypted tunnel established"
// at Info), then the peer keeps retransmitting the SAME PILA at ~8 s
// cadence while the relayed data plane drops our PILS replies. Every
// such retransmit lands past DuplicateHandshakeDebounce (250 ms), so
// the duplicate gate doesn't catch it, but it carries the same X25519
// ephemeral so hadCrypto=true && keyChanged=false — structurally no
// new install. Pre-fix the daemon logged "encrypted tunnel established"
// at Info every time (35 events per peer per 5 min in field observation).
//
// The fix demotes the log to Debug for the same-session case while
// preserving:
//
// - the existing duplicate-within-debounce coalescing
// (TestDuplicatePILACoalescedSuppressesLogAndHook)
//
// - the past-debounce postInstall hook firing for endpoint refresh
// (TestDuplicatePILAOutsideDebounceFiresHookAgain pins hook count = 2)
//
// - the asymmetric-recovery reply on stale inbound
// (TestDuplicatePILAStillRepliesForAsymmetricRecovery)

import (
"bytes"
"log/slog"
"net"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/TeoSlayer/pilotprotocol/pkg/daemon/keyexchange"
)

// syncWriter serialises Writes from slog handlers so concurrent
// goroutines logging into the same bytes.Buffer don't race the writes.
type syncWriter struct {
w *bytes.Buffer
mu *sync.Mutex
}

func (s syncWriter) Write(p []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.w.Write(p)
}

// captureSlog redirects slog.Default to a buffer at the given level
// and returns a buffer-content snapshot + a restore func. NOT safe to
// use with t.Parallel() — slog.Default is process-global and parallel
// tests racing SetDefault see each other's handlers.
func captureSlog(t *testing.T, level slog.Level) (snapshot func() string, restore func()) {
t.Helper()
var (
buf bytes.Buffer
mu sync.Mutex
)
handler := slog.NewTextHandler(syncWriter{w: &buf, mu: &mu}, &slog.HandlerOptions{Level: level})
prev := slog.Default()
slog.SetDefault(slog.New(handler))
return func() string {
mu.Lock()
defer mu.Unlock()
return buf.String()
}, func() {
slog.SetDefault(prev)
}
}

// TestSameSessionPILASuppressesInfoButFiresHookAndDebug pins the fix
// end-to-end:
//
// - First PILA: Info "encrypted tunnel established", hook count = 1.
//
// - Second same-key PILA past the debounce window:
//
// - hook still fires (count = 2, pinned for endpoint refresh by
// TestDuplicatePILAOutsideDebounceFiresHookAgain).
//
// - NO second "encrypted tunnel established" at Info — that was the
// spam pre-fix.
//
// - Debug-level "same-session keepalive" present (diagnostic
// remains available for operators tracing key-exchange flow).
//
// The two slog assertions share one capture buffer (and therefore one
// SetDefault call) because parallel-test races on slog.Default would
// otherwise tear the captured output. The test itself is NOT marked
// t.Parallel().
func TestSameSessionPILASuppressesInfoButFiresHookAndDebug(t *testing.T) {
a := newPeer(t, 510)
b := newPeer(t, 511)
crossWireVerifyFuncs(a, b)
a.mgr.SetSender(func(uint32, *net.UDPAddr, []byte) error { return nil })

var hookCount atomic.Int32
a.mgr.SetPostInstallHook(func(keyexchange.PostInstallEvent) {
hookCount.Add(1)
})

bFrame := b.mgr.BuildAuthFrame()
if bFrame == nil {
t.Fatalf("BuildAuthFrame returned nil")
}
from := &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 4000}

// Capture at Debug so both the Info "established" on the first
// arrival and the Debug "same-session keepalive" on the second
// arrival land in the same buffer.
logSnap, restore := captureSlog(t, slog.LevelDebug)
defer restore()

// --- First PILA: full install path ---------------------------------

if !a.mgr.HandleAuthFrame(bFrame[4:], from, false) {
t.Fatalf("first PILA rejected")
}
if got := hookCount.Load(); got != 1 {
t.Fatalf("after first PILA: hook count = %d, want 1", got)
}
initialLog := logSnap()
if !strings.Contains(initialLog, "encrypted tunnel established") {
t.Fatalf("first PILA: expected Info log 'encrypted tunnel established', got:\n%s", initialLog)
}
initialEstablishedCount := strings.Count(initialLog, "encrypted tunnel established")
if initialEstablishedCount != 1 {
t.Fatalf("first PILA: expected exactly 1 'established' log line, got %d:\n%s",
initialEstablishedCount, initialLog)
}

// --- Second PILA past debounce: keepalive, no spam -----------------

time.Sleep(keyexchange.DuplicateHandshakeDebounce + 100*time.Millisecond)

if !a.mgr.HandleAuthFrame(bFrame[4:], from, false) {
t.Fatalf("same-session PILA rejected")
}

// Endpoint-refresh contract from TestDuplicatePILAOutsideDebounceFiresHookAgain.
if got := hookCount.Load(); got != 2 {
t.Fatalf("after same-session PILA: hook count = %d, want 2 (must still refresh endpoint)", got)
}

finalLog := logSnap()

// The new behaviour: the Info "established" count stays at one
// (no spam from the second arrival).
finalEstablishedCount := strings.Count(finalLog, "encrypted tunnel established")
if finalEstablishedCount != 1 {
t.Fatalf("after same-session PILA: 'established' log count = %d, want 1 (no Info spam):\n%s",
finalEstablishedCount, finalLog)
}

// The Debug diagnostic remains so operators can see the keepalive.
if !strings.Contains(finalLog, "same-session keepalive") {
t.Fatalf("after same-session PILA: expected Debug log 'same-session keepalive'; got:\n%s",
finalLog)
}
}
6 changes: 3 additions & 3 deletions pkg/daemon/zz_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,9 @@ func TestProcessSACKPartialOverlap(t *testing.T) {
// A segment at [200, 205) should be marked sacked even when
// the SACK block only partially overlaps (e.g. [195, 203)).
c := &Connection{}
c.TrackSend(100, []byte("hello")) // [100, 105)
c.TrackSend(200, []byte("world")) // [200, 205)
c.TrackSend(300, []byte("!!!")) // [300, 303)
c.TrackSend(100, []byte("hello")) // [100, 105)
c.TrackSend(200, []byte("world")) // [200, 205)
c.TrackSend(300, []byte("!!!")) // [300, 303)

// SACK block partially overlaps [200, 205) from the left
c.ProcessSACK([]SACKBlock{{Left: 195, Right: 203}})
Expand Down
15 changes: 10 additions & 5 deletions pkg/daemon/zz_ipc_write_deadline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,20 @@ func TestWriteLoopExitsOnWriteDeadline(t *testing.T) {
}()

// Fill sendCh + keep writing until ipcWrite blocks.
//
// Each iteration allocates its own message copy in the main
// goroutine BEFORE spawning the writer — `msg` is reused across
// iterations, so doing the copy inside the goroutine raced with
// the next iteration's msg[0] = ... write. Caught by -race on
// ubuntu-latest in the Architecture-gates job (zz_ipc_write_deadline_test
// race report 2026-05-29).
const msgSize = 4096
msg := make([]byte, msgSize)
for i := 0; i < ipcSendBuffer+10; i++ {
msg[0] = byte(i & 0xFF)
go func(m []byte) {
m2 := make([]byte, len(m))
copy(m2, m)
ic.ipcWrite(m2)
}(msg)
m := make([]byte, msgSize)
copy(m, msg)
go func() { ic.ipcWrite(m) }()
}

// Give writeLoop time to fill the kernel send buffer.
Expand Down
Loading