From 030be54c7008e54ce4020f5120482d3c963948a9 Mon Sep 17 00:00:00 2001 From: Juhana Ilmoniemi Date: Sun, 10 May 2026 18:09:03 +0300 Subject: [PATCH 1/3] spec: binary-side frame forwarder (#26) --- .../specs/architecture/26-binary-forwarder.md | 525 ++++++++++++++++++ 1 file changed, 525 insertions(+) create mode 100644 docs/specs/architecture/26-binary-forwarder.md diff --git a/docs/specs/architecture/26-binary-forwarder.md b/docs/specs/architecture/26-binary-forwarder.md new file mode 100644 index 0000000..41e7972 --- /dev/null +++ b/docs/specs/architecture/26-binary-forwarder.md @@ -0,0 +1,525 @@ +# Binary-side frame forwarder (#26) + +Per-binary read pump that unwraps each inbound routing envelope, looks +up the addressed phone within the binary's own server-id, and writes +the inner frame to that phone. Replaces the +`c.CloseRead(r.Context())` + `<-readCtx.Done()` placeholder in +`internal/relay/server_endpoint.go`. Mirror-image of the phone-side +forwarder shipped in #25 — same overall shape, but with a divergent +error policy: a single misaddressed or malformed frame, or a single +failing phone `Send`, MUST NOT tear down the binary connection. + +## Files to read first + +- `internal/relay/forward.go` (whole file, 75 lines) — `phoneSource` + interface and `StartPhoneForwarder` body. The new function mirrors + this shape exactly except for the Send-error / unknown-id / + malformed-envelope branches, which `continue` rather than `return`. + Lines 27-30 capture the "forwarder owns reading; handler owns + cleanup" rule that must apply equally here. +- `internal/relay/forward_test.go` (whole file, 270 lines) — + `fakePhone`, `fakeBinary`, `runForwarder`, `waitForSent`, + `compactJSON`, `discardLogger`, and the `// Manual stress: go test + -race -count=20 …` doc-comment header. Reuse all of these. The new + tests need a parallel "binary as source" fake (Read side) and a + "phone as sink" fake (Send side). See § Testing strategy for the + minimal additions. +- `internal/relay/server_endpoint.go:35-112` — current `/v1/server` + handler. Lines 109-110 hold the `c.CloseRead(r.Context())` + + `<-readCtx.Done()` block this spec replaces. Note the existing + `defer { ScheduleReleaseServer; Close; log }` (lines 87-91), the + heartbeat goroutine + `defer cancelHB` (lines 99-101), and the + defer order — the forwarder must NOT touch any of those, and the + LIFO unwind on forwarder return must remain `cancelHB → release → + Close`. +- `internal/relay/client_endpoint.go:74-99` — the call shape we mirror: + `_ = StartPhoneForwarder(r.Context(), reg, serverID, wsconn, logger)` + parked on the handler goroutine, return value discarded, defers + running on return. +- `internal/relay/envelope.go` (whole file, 80 lines) — `Unmarshal` + signature, the `Envelope` shape (`ConnID string`, `Frame + json.RawMessage`), and the four sentinel errors + (`ErrMalformedEnvelope`, `ErrMissingConnID`, `ErrMissingFrame`, + plus `ErrInvalidFrameJSON` from the marshal side). The forwarder + treats every `Unmarshal` error the same way (warn + drop + + continue), so `errors.Is` branching is not strictly required — + but the spec uses `errors.Is(err, ErrMalformedEnvelope) || …` in + the AC bullet to signal that all sentinels are handled, not just + the type-assertion-style `_, isJSON := err.(*json.SyntaxError)`. +- `internal/relay/registry.go:32-47` — `Conn` interface (`ConnID`, + `Send`, `Close`); `PhonesFor` at lines 246-256 — the per-frame + lookup. The forwarder iterates the returned snapshot (not the + registry's internal slice) and matches `ConnID()` against + `env.ConnID`. +- `internal/relay/registry.go:125-184` — + `ScheduleReleaseServer` / `handleGraceExpiry` semantics. Critical: + during grace, `PhonesFor` continues to return phones registered + under the (now disconnected) server; on expiry, each phone's + `Close()` is invoked by the registry. The forwarder is dead + before grace expiry (its `Read` returned), so it never sees + expiry-time state. +- `internal/relay/heartbeat.go` (whole file, 59 lines) — the + heartbeat goroutine wired alongside the binary's WS conn. The + forwarder runs synchronously on the HTTP handler goroutine in + parallel with the heartbeat goroutine; both terminate cleanly via + the existing `defer cancelHB()` + handler defer. +- `internal/relay/ws_conn.go:74-84` — `Read` method on `*WSConn` + (already shipped in #25). The binary's `*WSConn` will satisfy the + new `binarySource` interface via this same method. +- `docs/specs/architecture/25-phone-forwarder.md` (whole file) — + prior-art spec. Same loop shape, same fake-substitution test + strategy. Read it once; this spec elides anything #25 already + motivated and only spells out what diverges. +- `docs/lessons.md` § "json.RawMessage round-trips are byte-stable + modulo whitespace" (lines 37-39); § "Race-test count is a + CI-runner knob" (lines 45-47); § "A long-lived WS handler that + does not read frames will never observe peer close" (lines 13-15) + — explains why the existing `CloseRead` call must be deleted, not + retained alongside the new reader. +- `pyrycode/pyrycode/docs/protocol-mobile.md` § Routing envelope — + the wire shape `{conn_id, frame}` and the rule that the relay + treats `frame` as opaque bytes. Out-of-scope for inspection. + +## Context + +After `/v1/server` (#16, #21) claims the server-id slot and (#7) +launches the heartbeat goroutine, the handler currently parks on +`c.CloseRead(r.Context())` + `<-readCtx.Done()` so the WS processes +peer-side control frames but discards data frames. This ticket lands +the data path in the binary→phone direction: read each inbound +envelope, look up the addressed phone within `serverID`, and write +the inner frame to that phone. + +The relay continues to treat inner frames as opaque bytes +(`json.RawMessage`); only the envelope's `conn_id` and `frame` +fields are inspected, exactly as #25's outbound path inspects only +`conn_id` (it constructs the envelope) and the inbound phone-frame +is wrapped without inspection. + +Stateless relay; per-frame `PhonesFor` lookup picks up phone +registration changes transparently (a phone that just connected +between envelope #N and #N+1 becomes addressable on #N+1). + +## Design + +### Edit: `internal/relay/forward.go` + +Add one type and one function. The type is unexported (the +"interface at the consumer" pattern adopted in #25). The function is +exported for `server_endpoint.go` to call. + +```go +// binarySource is the read-side contract the forwarder needs from a +// binary connection. Defined at the consumer, not on WSConn, so tests +// can substitute a fake. Production passes *WSConn; concurrent Read +// callers are NOT supported (matches WSConn.Read's contract). +// +// Structurally identical to phoneSource; the distinct named type +// documents the call site's intent and keeps the two Start*Forwarder +// signatures self-describing. +type binarySource interface { + ConnID() string + Read(ctx context.Context) ([]byte, error) +} +``` + +```go +// StartBinaryForwarder runs the per-binary read pump synchronously: +// it reads envelopes from binary, unwraps each, finds the phone +// registered under serverID whose ConnID equals env.ConnID, and +// writes env.Frame (the verbatim inner bytes) to that phone. +// +// Returns when binary.Read errors or ctx is cancelled. Does NOT +// return on per-frame errors: a malformed envelope, an unknown +// conn_id, or a phone Send failure all log + drop + continue. A +// single bad frame from the binary MUST NOT tear the binary +// connection down — phones come and go, and an envelope addressing +// a just-disconnected phone is a normal race, not a binary fault. +// +// The caller's defer (in /v1/server) handles ScheduleReleaseServer, +// wsconn.Close, and the heartbeat cancel; the forwarder must NOT +// touch any of those. The relay treats inner frames as opaque +// bytes: only Unmarshal's structural checks inspect the envelope, +// never env.Frame. +// +// Despite the Start verb, the call is synchronous; the verb matches +// the AC and mirrors StartPhoneForwarder. +func StartBinaryForwarder( + ctx context.Context, + reg *Registry, + serverID string, + binary binarySource, + logger *slog.Logger, +) error +``` + +### Loop body + +``` +for { + wrapped, err := binary.Read(ctx) + if err != nil { + // ctx cancellation, peer close, library error — all funnel + // here. Log at info; the handler's server_released log + // closes out the lifecycle. + logger.Info("binary_forwarder_read_end", + "server_id", serverID, + "binary_conn_id", binary.ConnID(), + "err", err) + return err + } + + env, err := Unmarshal(wrapped) + if err != nil { + // Adversarial / buggy binary: malformed envelope, missing + // conn_id, or missing frame. Drop the frame, keep serving + // the rest of the conn. The binary owns its own protocol + // health; the relay does not punish it for one bad frame. + logger.Warn("binary_forwarder_unmarshal_err", + "server_id", serverID, + "binary_conn_id", binary.ConnID(), + "err", err) + continue + } + + // PhonesFor returns a fresh snapshot — safe to iterate without + // holding any registry lock. O(N) in registered phones; current + // scale (handful per server) makes this fine. + var phone Conn + for _, p := range reg.PhonesFor(serverID) { + if p.ConnID() == env.ConnID { + phone = p + break + } + } + if phone == nil { + // Unknown conn_id: phone disconnected between the binary's + // last observation and this envelope, or the binary + // addressed a phone it shouldn't know about. Either way, a + // normal race or a binary bug — drop, continue. + logger.Warn("binary_forwarder_unknown_conn_id", + "server_id", serverID, + "conn_id", env.ConnID) + continue + } + + if err := phone.Send(env.Frame); err != nil { + // Phone is wedged or already closed. The phone's own + // /v1/client handler will see its read fail (or has + // already) and run its UnregisterPhone+Close defer. Drop + // this frame; keep serving other phones on this binary. + // DIVERGES from StartPhoneForwarder, which returns on + // Send error: there, the only sink is the binary, so a + // failing Send means the conn is dead. Here, we have N + // sinks and one bad sink does not end the loop. + logger.Info("binary_forwarder_phone_send_failed", + "server_id", serverID, + "conn_id", env.ConnID, + "err", err) + continue + } +} +``` + +`env.Frame` is `json.RawMessage`, i.e. `[]byte`. It passes verbatim +to `phone.Send` (which writes a single binary WS frame). The relay +neither parses nor canonicalises the inner bytes; tests assert +byte-stability modulo whitespace via `json.Compact` (see § Testing +strategy). + +### Edit: `internal/relay/server_endpoint.go` + +Replace the placeholder block (lines 109-110) with a +`StartBinaryForwarder` call: + +```go +// Before: +readCtx := c.CloseRead(r.Context()) +<-readCtx.Done() + +// After: +_ = StartBinaryForwarder(r.Context(), reg, serverID, wsconn, logger) +``` + +Drop the `CloseRead` call entirely. The new read loop processes +control frames inline with data reads, so the drain-and-discard +goroutine is no longer needed and would race the +`*websocket.Conn`-level sole-reader contract if retained +(`docs/lessons.md` lines 13-15; `WSConn.Read` doc). + +The existing `defer { ScheduleReleaseServer; Close; log }` (lines +87-91) and `defer cancelHB()` (line 100) run in LIFO order on +forwarder return: `cancelHB` first (heartbeat goroutine observes ctx +cancel and exits without touching the conn), then the release defer +(`ScheduleReleaseServer` + `wsconn.Close()`, idempotent). The +forwarder MUST NOT call `ScheduleReleaseServer`, `wsconn.Close()`, +or `cancelHB` itself. + +The return value is discarded (`_ =`). The handler's +`server_released` log already terminates the lifecycle from the +HTTP side; the forwarder's own logs cover the data path. + +The heartbeat-related comment at lines 103-110 is replaced by a +short comment that names the forwarder and references this spec, in +the same style as the equivalent comment in `client_endpoint.go` +lines 92-98. + +## Concurrency model + +- One forwarder goroutine per binary (the HTTP handler goroutine + itself; no extra goroutine spawned). Runs in parallel with the + heartbeat goroutine on a sibling goroutine; the two never share + state. +- `binary.Read` is single-caller (forwarder). +- `phone.Send` is multi-caller across the system: every binary's + forwarder writing to a given phone, plus any future server-side + signal path. `WSConn.writeMu` (#15) serialises them. No new locks + added by this ticket. +- `reg.PhonesFor` takes RLock and returns a fresh snapshot; cheap, + contention-free, and the snapshot is iterated without holding any + registry lock — `phone.Send` and `phone.ConnID` are called + outside the registry lock. Matches the established passive-store + pattern (`docs/lessons.md` line 49-51). +- Shutdown paths: + 1. **Binary closes WS:** `c.Read` returns close error → loop + returns → handler defer runs (`cancelHB` → `ScheduleReleaseServer` + → `wsconn.Close`). + 2. **Server shutdown / request cancel:** `ctx` cancels → `c.Read` + returns ctx error → loop returns. + 3. **Heartbeat-driven close (1011):** heartbeat goroutine calls + `wsconn.CloseWithCode(1011, "heartbeat timeout")` → underlying + `*websocket.Conn` aborts in-flight `Read` → forwarder returns + via Read error. + 4. **Per-frame errors:** malformed envelope, unknown conn_id, + phone Send error — log + `continue`. Loop is not terminated; + the binary continues serving other phones / other frames. +- No goroutine leaks: all binary-fault termination paths terminate + the single forwarder goroutine, and the handler's defer cleans up + registry state. + +## Error handling + +| Cause | Action | Log level | +|----------------------------------------------------|-----------------------------------|-----------| +| `binary.Read` error (any) | log, return err | info | +| `Unmarshal` error (any sentinel; checked structurally) | log, **continue** | warn | +| `PhonesFor` snapshot lacks `env.ConnID` | log, **continue** | warn | +| `phone.Send` error | log, **continue** | info | + +The three "continue" branches are the structural divergence from +`StartPhoneForwarder`. Per the AC: "A single bad frame from the +binary MUST NOT tear the binary connection down." + +The forwarder does not branch on specific `Unmarshal` sentinels — +all four (`ErrMalformedEnvelope`, `ErrMissingConnID`, +`ErrMissingFrame`, plus a defensive catch-all) get the same +warn-and-continue treatment. Branching adds no behaviour and risks +divergence if envelope errors are added later. + +## Testing strategy + +`internal/relay/forward_test.go` (extending the existing file). All +tests use mocks against a real `Registry`; no httptest server. The +existing `compactJSON`, `discardLogger`, and `waitForSent` helpers +are reused. + +### Two new test fakes + +1. **`fakeBinarySource`** — implements `binarySource`. Holds an + `id` and a `frames chan []byte`. `Read` selects on `ctx.Done()` + and on `frames`; on closed frames-chan, returns `io.EOF`. + Mirrors `fakePhone`'s shape; ~15 LOC. + +2. **A phone fake that captures `Send`**. Two reasonable approaches + — developer picks one: + + - **(a)** Add `mu sync.Mutex` + `sent [][]byte` + `sendErr error` + fields to the existing `fakePhone`, and give it a `Send` + method that captures (mu-protected). This makes `fakePhone` + directly satisfy `Conn`, removing the need for `registryConn` + in the new tests. Existing #25 tests are unaffected (they use + `®istryConn{phone}` and never inspect `fakePhone.sent`). + + - **(b)** Introduce a parallel `fakePhoneSink` type with the + same shape as `fakeBinary` (id, mu, sent, sendErr, snapshot). + + Approach (a) is preferred — reduces type count and matches the + "one fake per role" feel of the existing file. Approach (b) is + acceptable if the developer wants strict separation between + "source-only" and "sink-only" fakes for symmetry with #25. + + Either way, a `waitForSent`-shape helper is needed for the + phone-side sink. The existing `waitForSent(t, *fakeBinary, ...)` + takes `*fakeBinary` concretely; either generalise it or + duplicate it for the new type. Either is fine — ~10 LOC. + +### Test cases (7 total) + +1. **`TestStartBinaryForwarder_RoutesToAddressedPhone`.** Claim a + `fakeBinarySource` for server-id `"s1"` (via `ClaimServer` — + wrap as `Conn` similarly to how #25 wraps `fakePhone`, since the + binary needs to be in the registry for `BinaryFor` lookups by + anyone, though this forwarder doesn't use `BinaryFor`; in + practice the registry simply needs the slot present so + `RegisterPhone` succeeds). Register two phones P1 + (`"client-s1-aaaa1111"`) and P2 (`"client-s1-bbbb2222"`) via + `RegisterPhone`. Run `StartBinaryForwarder` in a goroutine. + Build an envelope addressed to P1 with inner frame + `{"type":"hello","x":[1,2,3]}` via `Marshal`. Push it onto + `fakeBinarySource.frames`. Wait until P1.sent has 1 entry; assert + P2.sent is empty; assert P1.sent[0] is byte-equal modulo + whitespace to the inner frame via `compactJSON`. + +2. **`TestStartBinaryForwarder_MultiplePhones`.** Same setup. Push + one envelope addressed to P1 and one addressed to P2. Wait for + each to receive exactly one frame; assert the inner bytes match. + +3. **`TestStartBinaryForwarder_UnknownConnID_DropsAndContinues`.** + Same setup. Push envelope #1 addressed to a bogus + `"client-s1-deadbeef"`. Push envelope #2 addressed to P1. + Assert P1 receives exactly one frame (envelope #2's inner) and + P2 receives nothing. Assert the forwarder is still running by + then closing the binary's frames chan and observing the + forwarder return with `io.EOF` on its done chan. + +4. **`TestStartBinaryForwarder_MalformedEnvelope_DropsAndContinues`.** + Push raw bytes that fail `Unmarshal`: `[]byte("not-json")` (or + `[]byte("{}")` to trigger `ErrMissingConnID`, or + `[]byte(\`{"conn_id":"x"}\`)` for `ErrMissingFrame` — pick one; + the AC says "drop, continue" for all). Push a valid envelope + addressed to P1 immediately after. Assert P1 receives the + second envelope's inner frame; the first never reaches any + phone. Confirm continuation by closing frames and observing + `io.EOF` return. + +5. **`TestStartBinaryForwarder_PhoneSendError_DropsAndContinues`.** + Configure P1 with a non-nil `sendErr`. Push envelope to P1 + (asserted to be dropped via `len(P1.sent)==0`), then push + envelope to P2 (asserted to land). Forwarder is still running: + close the binary frames chan; observe `io.EOF` return. Asserts + the divergence from `StartPhoneForwarder`'s + return-on-Send-error behaviour. + +6. **`TestStartBinaryForwarder_BinaryDisconnect_Returns`.** Push + one envelope to P1 (assert it lands). Close + `fakeBinarySource.frames` → `Read` returns `io.EOF` → forwarder + returns. From the test goroutine, mimic the handler defer by + calling `reg.ScheduleReleaseServer("s1", 0)` (zero grace for + instant expiry) and assert the server slot becomes free — + verifies the AC bullet "the handler-level `defer` runs + `ScheduleReleaseServer` as expected" structurally. + +7. **`TestStartBinaryForwarder_ContextCancellation_Returns`.** Same + setup as test 1 but DON'T push frames. Cancel the parent ctx. + Forwarder's `Read` returns ctx.Err → loop returns. Assert + return within 100 ms (generous bound under `-race`). + +`make test` clean with `-race`. The package-level doc comment for +`forward_test.go` already documents the `go test -race -count=20 +./internal/relay/` invocation per the race-count lesson; no edit +needed there. + +## Open questions + +None blocking. Two judgement calls the developer will hit: + +1. **`binarySource` vs reusing `phoneSource`.** Spec defines a + distinct named type — three lines of code, plus a descriptive + doc comment. The two interfaces are structurally identical; + reusing `phoneSource` would compile fine. The named-type choice + is for documentation: the call sites read `phone phoneSource` + vs `binary binarySource` rather than both saying `phoneSource`. + If the developer prefers reuse, that's acceptable — argue the + call. + +2. **Test-fake reuse strategy.** § Testing strategy describes two + approaches for a phone-with-Send-capture fake (extend + `fakePhone` vs introduce a parallel `fakePhoneSink`). Approach + (a) is preferred but (b) is fine. Either keeps the file at ~140 + LOC of new tests as the AC sized. + +## Security review + +**Verdict:** PASS + +**Findings:** + +- **[Trust boundaries]** No findings — the binary is the source of + envelopes here. The single explicit boundary is `Unmarshal` in + `envelope.go:67`; all four sentinel returns funnel through the + warn-and-continue branch. `env.Frame` is forwarded as opaque + bytes — `json.RawMessage` makes it hard to accidentally inspect. + The downstream phone's protocol layer already knows it receives + untrusted-via-relay bytes (the binary is trusted relative to the + phone, not relative to the relay). +- **[Cross-server addressing]** No findings — `PhonesFor(serverID)` + scopes the lookup to the binary's own server-id slot. A binary + cannot address phones registered under a different server-id + even if it forges an `env.ConnID` collision: the iteration only + considers phones it owns. This is a structural defence, not a + runtime check, and follows from the registry's per-server-id map + shape established in #3. +- **[Tokens, secrets, credentials]** No findings — the forwarder + never reads or constructs `X-Pyrycode-Server`, headers, or any + token. The binary handshake (#16) already happened. No log call + site in this spec includes any header value; enumerated log + fields are `event`, `server_id`, `binary_conn_id`, `conn_id`, + `err` (a library or sentinel error, not a request payload). +- **[File operations]** N/A — no filesystem access. +- **[Subprocess execution]** N/A — no subprocess. +- **[Cryptographic primitives]** N/A — no cryptographic operations + in this ticket. +- **[Network & I/O]** SHOULD FIX (deferred) — `c.Read(ctx)` has no + per-message size cap. A malicious binary can send arbitrarily + large envelopes to exhaust memory in `Unmarshal` and on the + phone's write buffers. **Out of scope for #26** — same + unbounded-read posture inherited from #15 / #25; the per-frame + size cap belongs on `WSConn.SetReadLimit` (the right hook on + `*websocket.Conn`) so it covers both `/v1/server` and + `/v1/client`. Should be a follow-up ticket, not a regression. + Default nhooyr read limit is 32 MiB which provides a soft floor. + Same finding as the #25 spec; not addressed here. +- **[Network & I/O]** No finding on `PhonesFor` linear scan — O(N) + per envelope where N is phones registered for this server-id. + Current scale (handful of phones per server, low envelope rate + per phone) makes this comfortably bounded; the alternative + (per-server-id `map[connID]Conn`) is a registry-shape change + that should be motivated by observed cost, not anticipated. +- **[Network & I/O]** No finding on backpressure — a slow phone + blocks only its own `Send` call; concurrent phones for the same + binary are not affected because the binary forwarder iterates + one envelope at a time and each `Send` is independent. The + `WSConn.Send` 10 s deadline (#15) bounds any single write, so a + wedged phone causes the forwarder to log `binary_forwarder_phone_send_failed` + within 10 s and continue. A high-rate stream of envelopes + destined for a single wedged phone would still block the + binary's loop for up to 10 s per envelope; if this becomes a + real failure mode, a follow-up ticket can introduce per-phone + send queues. Not observed yet, deferred. +- **[Error messages, logs, telemetry]** No findings — the four log + call sites enumerate fields explicitly. No envelope bytes, no + frame bytes, no headers, no tokens enter logs. `err` carries + library errors and `Unmarshal` sentinels — none of which embed + user payloads (the `Unmarshal` wrap in `envelope.go:70` includes + the JSON decoder error, which can name a byte offset but not + payload contents). No telemetry / metrics added. +- **[Concurrency]** No findings — single forwarder goroutine per + binary; no new locks; per-frame `PhonesFor` snapshot is + RLock-only and does not nest with any caller-held lock; the + iteration-after-snapshot pattern matches #3's established + passive-store contract. Goroutine lifecycle: the four + termination paths in § Concurrency model cover ctx cancel, + binary close, heartbeat-driven close, and server shutdown. + Per-frame errors do not terminate. No leaks. +- **[Threat model alignment]** No findings — the relay's role per + `protocol-mobile.md` § Routing envelope is "wrap, address, + forward; never inspect". This spec preserves that: + `Unmarshal` is structural (envelope shape only), not semantic + (inner-frame contents); the binary owns inner-frame + construction and the phone owns inner-frame interpretation. + Out-of-scope items (per-message size cap, per-phone send + queueing) are named above with deferral rationale. + +**Reviewer:** architect (self-review per +`pyrycode-relay-agents/architect/security-review.md`) +**Date:** 2026-05-10 From 7031ab840acbf6b41454bee615c9aef0ddbd4fce Mon Sep 17 00:00:00 2001 From: Juhana Ilmoniemi Date: Sun, 10 May 2026 18:13:38 +0300 Subject: [PATCH 2/3] relay: binary-side frame forwarder (#26) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds StartBinaryForwarder, the binary→phone read pump that unwraps each routing envelope, finds the phone matching env.ConnID under serverID, and writes env.Frame as opaque bytes. Replaces the CloseRead+Done placeholder in /v1/server. Diverges from StartPhoneForwarder in error policy: malformed envelopes, unknown conn_ids, and phone Send failures all log+drop and continue. The binary serves N phones, so a single bad sink or bad frame must not tear the binary connection down. Tests cover routing, multi-phone fanout, unknown conn_id continuation, malformed envelope continuation, phone-send-error continuation, binary disconnect, and ctx cancel. fakePhone gained capture-Send + Close so it directly satisfies Conn for the new tests; existing #25 tests are unaffected (still use ®istryConn{phone}). Co-Authored-By: Claude Opus 4.7 --- internal/relay/forward.go | 84 +++++++ internal/relay/forward_test.go | 358 +++++++++++++++++++++++++++++- internal/relay/server_endpoint.go | 17 +- 3 files changed, 449 insertions(+), 10 deletions(-) diff --git a/internal/relay/forward.go b/internal/relay/forward.go index 12ab86d..efe0013 100644 --- a/internal/relay/forward.go +++ b/internal/relay/forward.go @@ -72,3 +72,87 @@ func StartPhoneForwarder( } } } + +// binarySource is the read-side contract the forwarder needs from a binary +// connection. Defined at the consumer (this file), not on WSConn, so tests +// can substitute a fake. Production passes *WSConn, which satisfies it via +// its Read method. +// +// Structurally identical to phoneSource; the distinct named type documents +// the call site's intent and keeps the two Start*Forwarder signatures +// self-describing. Concurrent Read callers are NOT supported (matches +// WSConn.Read's contract). +type binarySource interface { + ConnID() string + Read(ctx context.Context) ([]byte, error) +} + +// StartBinaryForwarder runs the per-binary read pump synchronously: it +// reads envelopes from binary, unwraps each, finds the phone registered +// under serverID whose ConnID equals env.ConnID, and writes env.Frame +// (the verbatim inner bytes) to that phone. +// +// Returns when binary.Read errors or ctx is cancelled. Does NOT return on +// per-frame errors: a malformed envelope, an unknown conn_id, or a phone +// Send failure all log + drop + continue. A single bad frame from the +// binary MUST NOT tear the binary connection down — phones come and go, +// and an envelope addressing a just-disconnected phone is a normal race, +// not a binary fault. This diverges from StartPhoneForwarder, where the +// only sink is the binary so a Send failure ends the loop. +// +// The caller's defer (in /v1/server) handles ScheduleReleaseServer, +// wsconn.Close, and the heartbeat cancel; the forwarder must NOT touch +// any of those. The relay treats inner frames as opaque bytes: only +// Unmarshal's structural checks inspect the envelope, never env.Frame. +// +// Despite the Start verb, the call is synchronous; the verb matches the +// AC and mirrors StartPhoneForwarder. +func StartBinaryForwarder( + ctx context.Context, + reg *Registry, + serverID string, + binary binarySource, + logger *slog.Logger, +) error { + for { + wrapped, err := binary.Read(ctx) + if err != nil { + logger.Info("binary_forwarder_read_end", + "server_id", serverID, + "binary_conn_id", binary.ConnID(), + "err", err) + return err + } + + env, err := Unmarshal(wrapped) + if err != nil { + logger.Warn("binary_forwarder_unmarshal_err", + "server_id", serverID, + "binary_conn_id", binary.ConnID(), + "err", err) + continue + } + + var phone Conn + for _, p := range reg.PhonesFor(serverID) { + if p.ConnID() == env.ConnID { + phone = p + break + } + } + if phone == nil { + logger.Warn("binary_forwarder_unknown_conn_id", + "server_id", serverID, + "conn_id", env.ConnID) + continue + } + + if err := phone.Send(env.Frame); err != nil { + logger.Info("binary_forwarder_phone_send_failed", + "server_id", serverID, + "conn_id", env.ConnID, + "err", err) + continue + } + } +} diff --git a/internal/relay/forward_test.go b/internal/relay/forward_test.go index 0a862b7..cb27721 100644 --- a/internal/relay/forward_test.go +++ b/internal/relay/forward_test.go @@ -15,11 +15,17 @@ import ( "time" ) -// fakePhone implements phoneSource. Tests push frames onto frames; closing -// the channel signals "phone disconnected" — Read returns io.EOF. +// fakePhone implements phoneSource and Conn. Tests push frames onto frames; +// closing the channel signals "phone disconnected" — Read returns io.EOF. +// Send captures bytes for the binary-forwarder tests; #25 tests still wrap +// it via registryConn (which discards Send) and never inspect sent. type fakePhone struct { id string frames chan []byte + + mu sync.Mutex + sent [][]byte + sendErr error } func newFakePhone(id string) *fakePhone { @@ -40,6 +46,75 @@ func (p *fakePhone) Read(ctx context.Context) ([]byte, error) { } } +func (p *fakePhone) Send(msg []byte) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.sendErr != nil { + return p.sendErr + } + cp := make([]byte, len(msg)) + copy(cp, msg) + p.sent = append(p.sent, cp) + return nil +} + +func (p *fakePhone) Close() {} + +func (p *fakePhone) snapshotSent() [][]byte { + p.mu.Lock() + defer p.mu.Unlock() + out := make([][]byte, len(p.sent)) + for i, m := range p.sent { + cp := make([]byte, len(m)) + copy(cp, m) + out[i] = cp + } + return out +} + +// waitForPhoneSent polls until want frames have been captured by p.Send, +// or fails the test. Mirrors waitForSent's shape for *fakeBinary. +func waitForPhoneSent(t *testing.T, p *fakePhone, want int, timeout time.Duration) [][]byte { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + got := p.snapshotSent() + if len(got) >= want { + return got + } + time.Sleep(5 * time.Millisecond) + } + got := p.snapshotSent() + t.Fatalf("waitForPhoneSent: got %d, want %d after %v", len(got), want, timeout) + return nil +} + +// fakeBinarySource implements binarySource. Tests push wire-encoded +// envelopes onto frames; closing the channel signals "binary +// disconnected" — Read returns io.EOF. +type fakeBinarySource struct { + id string + frames chan []byte +} + +func newFakeBinarySource(id string) *fakeBinarySource { + return &fakeBinarySource{id: id, frames: make(chan []byte, 16)} +} + +func (b *fakeBinarySource) ConnID() string { return b.id } + +func (b *fakeBinarySource) Read(ctx context.Context) ([]byte, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case frame, ok := <-b.frames: + if !ok { + return nil, io.EOF + } + return frame, nil + } +} + // fakeBinary implements Conn for use as the registry-side binary. Send is // called from the forwarder goroutine while the test reads sent from the // test goroutine, so writes are mu-protected. @@ -267,3 +342,282 @@ type registryConn struct{ p *fakePhone } func (c *registryConn) ConnID() string { return c.p.ConnID() } func (c *registryConn) Send(msg []byte) error { return nil } func (c *registryConn) Close() {} + +// runBinaryForwarder spawns StartBinaryForwarder on a goroutine and returns +// a done chan that closes once it returns, plus the cancel for the parent +// context. +func runBinaryForwarder(reg *Registry, serverID string, bin binarySource) (done chan error, cancel context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + done = make(chan error, 1) + go func() { + done <- StartBinaryForwarder(ctx, reg, serverID, bin, discardLogger()) + }() + return done, cancel +} + +// claimAndRegister sets up a server slot and registers the given phones +// against it. Returns the bin used for the slot (a stillborn fakeBinary +// satisfies the Conn interface; this forwarder doesn't use BinaryFor). +func claimAndRegister(t *testing.T, reg *Registry, serverID string, phones ...*fakePhone) *fakeBinary { + t.Helper() + bin := &fakeBinary{id: "bin-" + serverID} + if err := reg.ClaimServer(serverID, bin); err != nil { + t.Fatalf("ClaimServer: %v", err) + } + for _, p := range phones { + if err := reg.RegisterPhone(serverID, p); err != nil { + t.Fatalf("RegisterPhone(%s): %v", p.ConnID(), err) + } + } + return bin +} + +func mustMarshal(t *testing.T, connID string, frame []byte) []byte { + t.Helper() + out, err := Marshal(connID, frame) + if err != nil { + t.Fatalf("Marshal(%s): %v", connID, err) + } + return out +} + +func TestStartBinaryForwarder_RoutesToAddressedPhone(t *testing.T) { + t.Parallel() + + reg := NewRegistry() + p1 := newFakePhone("client-s1-aaaa1111") + p2 := newFakePhone("client-s1-bbbb2222") + claimAndRegister(t, reg, "s1", p1, p2) + + src := newFakeBinarySource("bin-s1") + done, cancel := runBinaryForwarder(reg, "s1", src) + defer cancel() + + inner := []byte(`{"type":"hello","x":[1,2,3]}`) + src.frames <- mustMarshal(t, p1.ConnID(), inner) + + got := waitForPhoneSent(t, p1, 1, 2*time.Second) + if !bytes.Equal(compactJSON(t, got[0]), compactJSON(t, inner)) { + t.Errorf("p1 inner bytes diverged\nwant: %s\n got: %s", + compactJSON(t, inner), compactJSON(t, got[0])) + } + if other := p2.snapshotSent(); len(other) != 0 { + t.Errorf("p2 sent = %d frames, want 0", len(other)) + } + + close(src.frames) + select { + case err := <-done: + if !errors.Is(err, io.EOF) { + t.Fatalf("forwarder return = %v, want io.EOF", err) + } + case <-time.After(time.Second): + t.Fatal("forwarder did not return after frames chan closed") + } +} + +func TestStartBinaryForwarder_MultiplePhones(t *testing.T) { + t.Parallel() + + reg := NewRegistry() + p1 := newFakePhone("client-s1-aaaa1111") + p2 := newFakePhone("client-s1-bbbb2222") + claimAndRegister(t, reg, "s1", p1, p2) + + src := newFakeBinarySource("bin-s1") + done, cancel := runBinaryForwarder(reg, "s1", src) + defer cancel() + + inner1 := []byte(`{"type":"keystroke","data":"abc"}`) + inner2 := []byte(`{"type":"resize","cols":80,"rows":24}`) + src.frames <- mustMarshal(t, p1.ConnID(), inner1) + src.frames <- mustMarshal(t, p2.ConnID(), inner2) + + got1 := waitForPhoneSent(t, p1, 1, 2*time.Second) + got2 := waitForPhoneSent(t, p2, 1, 2*time.Second) + + if !bytes.Equal(compactJSON(t, got1[0]), compactJSON(t, inner1)) { + t.Errorf("p1 inner bytes diverged\nwant: %s\n got: %s", + compactJSON(t, inner1), compactJSON(t, got1[0])) + } + if !bytes.Equal(compactJSON(t, got2[0]), compactJSON(t, inner2)) { + t.Errorf("p2 inner bytes diverged\nwant: %s\n got: %s", + compactJSON(t, inner2), compactJSON(t, got2[0])) + } + if len(p1.snapshotSent()) != 1 { + t.Errorf("p1 sent = %d, want 1", len(p1.snapshotSent())) + } + if len(p2.snapshotSent()) != 1 { + t.Errorf("p2 sent = %d, want 1", len(p2.snapshotSent())) + } + + close(src.frames) + <-done +} + +func TestStartBinaryForwarder_UnknownConnID_DropsAndContinues(t *testing.T) { + t.Parallel() + + reg := NewRegistry() + p1 := newFakePhone("client-s1-aaaa1111") + p2 := newFakePhone("client-s1-bbbb2222") + claimAndRegister(t, reg, "s1", p1, p2) + + src := newFakeBinarySource("bin-s1") + done, cancel := runBinaryForwarder(reg, "s1", src) + defer cancel() + + inner := []byte(`{"type":"after-bogus"}`) + src.frames <- mustMarshal(t, "client-s1-deadbeef", []byte(`{"dropped":true}`)) + src.frames <- mustMarshal(t, p1.ConnID(), inner) + + got := waitForPhoneSent(t, p1, 1, 2*time.Second) + if !bytes.Equal(compactJSON(t, got[0]), compactJSON(t, inner)) { + t.Errorf("p1 inner bytes diverged\nwant: %s\n got: %s", + compactJSON(t, inner), compactJSON(t, got[0])) + } + if other := p2.snapshotSent(); len(other) != 0 { + t.Errorf("p2 sent = %d frames, want 0", len(other)) + } + + close(src.frames) + select { + case err := <-done: + if !errors.Is(err, io.EOF) { + t.Fatalf("forwarder return = %v, want io.EOF", err) + } + case <-time.After(time.Second): + t.Fatal("forwarder did not return after frames chan closed") + } +} + +func TestStartBinaryForwarder_MalformedEnvelope_DropsAndContinues(t *testing.T) { + t.Parallel() + + reg := NewRegistry() + p1 := newFakePhone("client-s1-aaaa1111") + claimAndRegister(t, reg, "s1", p1) + + src := newFakeBinarySource("bin-s1") + done, cancel := runBinaryForwarder(reg, "s1", src) + defer cancel() + + inner := []byte(`{"type":"after-malformed"}`) + src.frames <- []byte("not-json") + src.frames <- mustMarshal(t, p1.ConnID(), inner) + + got := waitForPhoneSent(t, p1, 1, 2*time.Second) + if !bytes.Equal(compactJSON(t, got[0]), compactJSON(t, inner)) { + t.Errorf("p1 inner bytes diverged\nwant: %s\n got: %s", + compactJSON(t, inner), compactJSON(t, got[0])) + } + + close(src.frames) + select { + case err := <-done: + if !errors.Is(err, io.EOF) { + t.Fatalf("forwarder return = %v, want io.EOF", err) + } + case <-time.After(time.Second): + t.Fatal("forwarder did not return after frames chan closed") + } +} + +func TestStartBinaryForwarder_PhoneSendError_DropsAndContinues(t *testing.T) { + t.Parallel() + + reg := NewRegistry() + p1 := newFakePhone("client-s1-aaaa1111") + p1.sendErr = errors.New("phone wedged") + p2 := newFakePhone("client-s1-bbbb2222") + claimAndRegister(t, reg, "s1", p1, p2) + + src := newFakeBinarySource("bin-s1") + done, cancel := runBinaryForwarder(reg, "s1", src) + defer cancel() + + innerToP1 := []byte(`{"type":"to-p1"}`) + innerToP2 := []byte(`{"type":"to-p2"}`) + src.frames <- mustMarshal(t, p1.ConnID(), innerToP1) + src.frames <- mustMarshal(t, p2.ConnID(), innerToP2) + + got2 := waitForPhoneSent(t, p2, 1, 2*time.Second) + if !bytes.Equal(compactJSON(t, got2[0]), compactJSON(t, innerToP2)) { + t.Errorf("p2 inner bytes diverged\nwant: %s\n got: %s", + compactJSON(t, innerToP2), compactJSON(t, got2[0])) + } + // p1.Send returned an error; sendErr-path skips the append so sent stays empty. + if dropped := p1.snapshotSent(); len(dropped) != 0 { + t.Errorf("p1 sent = %d, want 0 (Send errored)", len(dropped)) + } + + close(src.frames) + select { + case err := <-done: + if !errors.Is(err, io.EOF) { + t.Fatalf("forwarder return = %v, want io.EOF", err) + } + case <-time.After(time.Second): + t.Fatal("forwarder did not return after frames chan closed") + } +} + +func TestStartBinaryForwarder_BinaryDisconnect_Returns(t *testing.T) { + t.Parallel() + + reg := NewRegistry() + p1 := newFakePhone("client-s1-aaaa1111") + claimAndRegister(t, reg, "s1", p1) + + src := newFakeBinarySource("bin-s1") + done, cancel := runBinaryForwarder(reg, "s1", src) + defer cancel() + + inner := []byte(`{"type":"first"}`) + src.frames <- mustMarshal(t, p1.ConnID(), inner) + waitForPhoneSent(t, p1, 1, 2*time.Second) + + close(src.frames) + select { + case err := <-done: + if !errors.Is(err, io.EOF) { + t.Fatalf("forwarder return = %v, want io.EOF", err) + } + case <-time.After(time.Second): + t.Fatal("forwarder did not return after binary disconnect") + } + + // Mimic the handler-level defer: ScheduleReleaseServer with zero + // grace fires the timer immediately on a runtime goroutine. Poll for + // the slot to clear — verifies the AC bullet structurally. + reg.ScheduleReleaseServer("s1", 0) + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + if _, ok := reg.BinaryFor("s1"); !ok { + return + } + time.Sleep(5 * time.Millisecond) + } + t.Fatal("BinaryFor(s1) still present after ScheduleReleaseServer(0) expired") +} + +func TestStartBinaryForwarder_ContextCancellation_Returns(t *testing.T) { + t.Parallel() + + reg := NewRegistry() + p1 := newFakePhone("client-s1-aaaa1111") + claimAndRegister(t, reg, "s1", p1) + + src := newFakeBinarySource("bin-s1") + done, cancel := runBinaryForwarder(reg, "s1", src) + + cancel() + select { + case err := <-done: + if !errors.Is(err, context.Canceled) { + t.Fatalf("forwarder return = %v, want context.Canceled", err) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("forwarder did not return promptly on ctx cancel") + } +} diff --git a/internal/relay/server_endpoint.go b/internal/relay/server_endpoint.go index 4fd710e..2a7e828 100644 --- a/internal/relay/server_endpoint.go +++ b/internal/relay/server_endpoint.go @@ -100,14 +100,15 @@ func ServerHandler(reg *Registry, logger *slog.Logger, grace time.Duration) http defer cancelHB() go runHeartbeat(hbCtx, wsconn, heartbeatInterval, heartbeatTimeout) - // Hold the connection open until the peer closes it. CloseRead - // spawns a goroutine that drains-and-discards frames (including - // control frames like ping/pong, which must be processed for the - // connection to observe a peer-side close) and returns a context - // cancelled when the conn ends. The frame loop (#6) replaces this - // block with a real read loop later. - readCtx := c.CloseRead(r.Context()) - <-readCtx.Done() + // Binary-side read pump (#26): unwraps each inbound routing + // envelope and writes its inner frame to the phone matching + // env.ConnID under serverID. Blocks until the binary closes the + // WS or ctx cancels. Per-frame errors (malformed envelope, + // unknown conn_id, phone Send failure) log+drop and continue — + // the binary connection is not torn down for a single bad frame. + // Return value is observability-only; the forwarder logs the + // cause. + _ = StartBinaryForwarder(r.Context(), reg, serverID, wsconn, logger) }) } From ba7e84be10fb2d57927be8a7dc48e6a1491fa9d7 Mon Sep 17 00:00:00 2001 From: Juhana Ilmoniemi Date: Mon, 11 May 2026 09:35:13 +0300 Subject: [PATCH 3/3] docs: binary-side frame forwarder (#26) Per-ticket file + new feature doc + INDEX entry + server-endpoint updates for the StartBinaryForwarder swap. PROJECT-MEMORY patterns folded the sink-fan-out error policy and merged the two source interfaces into one bullet now that both forwarders exist. Co-Authored-By: Claude Opus 4.7 --- docs/PROJECT-MEMORY.md | 7 +- docs/knowledge/INDEX.md | 3 +- docs/knowledge/codebase/26.md | 44 ++++++ docs/knowledge/features/binary-forwarder.md | 148 ++++++++++++++++++++ docs/knowledge/features/server-endpoint.md | 23 +-- 5 files changed, 211 insertions(+), 14 deletions(-) create mode 100644 docs/knowledge/codebase/26.md create mode 100644 docs/knowledge/features/binary-forwarder.md diff --git a/docs/PROJECT-MEMORY.md b/docs/PROJECT-MEMORY.md index 0d6e151..20fddbe 100644 --- a/docs/PROJECT-MEMORY.md +++ b/docs/PROJECT-MEMORY.md @@ -43,9 +43,10 @@ Stateless WebSocket router between mobile clients and pyry binaries. Internet-ex - **Policy values live at the wiring site, not as package-level constants.** `30*time.Second` for the grace window is a literal in `cmd/pyrycode-relay/main.go`, threaded into `ServerHandler` as a constructor parameter — it appears exactly once, the value is policy (matches protocol spec), and inlining keeps the protocol-spec linkage visible where the relay is composed. Tests pass ms-scale durations through the same parameter. Promote to a package-level constant only when a second wiring entry point needs the same value. Adopted in `/v1/server` grace duration (#21). - **Pointer-identity for stale `time.AfterFunc` fires.** `time.Timer.Stop()` returns false if the timer's func has already started executing. Wrap each pending timer in a small struct and store the wrapper pointer in a map; the `AfterFunc` closure captures the wrapper pointer and asserts `map[key] == self` under the lock before acting. If a faster goroutine replaced the entry, the pointer no longer matches and the closure no-ops. Capturing the `*time.Timer` directly forces a self-referential local var (assigned after `AfterFunc` returns) which trips the race detector under stress; the wrapper avoids that. Adopted in `Registry.ScheduleReleaseServer` (#20). Same shape applies to any "one cancellable timer per key" pattern. - **Credentials the relay does not validate are presence-checked then discarded — never logged, never put in error strings.** `/v1/client`'s `X-Pyrycode-Token` is opaque to the relay (the binary owns verification). The handler reads the token into a local string, branches on `!= ""`, and lets the local go out of scope unread. The log-event field set is enumerated explicitly in the doc; the token name does not appear in any `slog` call, `fmt.Errorf`, or response body. Defence is layered: spec, code review, and the structural absence of any code path that uses the value after the gate. Same posture extends to any future "courier credential" the relay carries but does not own (e.g. session resume tokens). Adopted in `/v1/client` (#5). -- **Hold long-lived WS handlers open with `c.CloseRead(r.Context())` plus `<-readCtx.Done()` until the real read loop lands.** `CloseRead` drains-and-discards frames (including control frames — pings must be processed for the connection to observe a peer-side close). The frame-loop ticket replaces both the `CloseRead` call and the `<-readCtx.Done()` block with the actual read pump in the same call site — keeping `CloseRead` alongside a real reader would race the sole-reader contract. Adopted in `/v1/server` (#16) and `/v1/client` (#5); `/v1/client` swapped to `StartPhoneForwarder` in #25 (`/v1/server`'s placeholder remains until the binary-side forwarder lands). -- **Forwarder owns reading; handler owns cleanup.** The phone-side frame forwarder (#25) is a pure read pump — no `UnregisterPhone`, no `wsconn.Close()`, no extra goroutine. It runs synchronously on the HTTP handler goroutine; on return, the handler's `defer { UnregisterPhone; Close; log }` runs in the right order. Adding cleanup inside the forwarder would either double-close (idempotent, but muddies the lifecycle) or unregister twice. Pattern extends to the symmetric binary-side forwarder. -- **Define narrow read-side interfaces at the consumer, not on the adapter.** `phoneSource` (`ConnID() + Read(ctx)`) lives in `forward.go`, not on `WSConn`. Production passes `*WSConn`; tests substitute a fake without touching `WSConn`. If a future caller needs the same shape, promote the interface then — don't anticipate. Adopted in #25. +- **Hold long-lived WS handlers open with `c.CloseRead(r.Context())` plus `<-readCtx.Done()` until the real read loop lands.** `CloseRead` drains-and-discards frames (including control frames — pings must be processed for the connection to observe a peer-side close). The frame-loop ticket replaces both the `CloseRead` call and the `<-readCtx.Done()` block with the actual read pump in the same call site — keeping `CloseRead` alongside a real reader would race the sole-reader contract. Adopted in `/v1/server` (#16) and `/v1/client` (#5); `/v1/client` swapped to `StartPhoneForwarder` in #25, `/v1/server` swapped to `StartBinaryForwarder` in #26. Both `CloseRead` placeholders are now gone. +- **Forwarder owns reading; handler owns cleanup.** Frame forwarders are pure read pumps — no `UnregisterPhone` / `ScheduleReleaseServer`, no `wsconn.Close()`, no `cancelHB`, no extra goroutine. The forwarder runs synchronously on the HTTP handler goroutine; on return, the handler's `defer cancelHB` (#7) and `defer { release/unregister; Close; log }` (#16/#21/#5) run in LIFO order. Adding cleanup inside the forwarder would either double-close (idempotent, but muddies the lifecycle) or unregister twice. Adopted in `StartPhoneForwarder` (#25) and `StartBinaryForwarder` (#26). +- **Forwarder error policy tracks the sink fan-out.** A forwarder with **one sink** returns on sink errors (no alternative — the conn is dead): `StartPhoneForwarder` (#25) returns on `BinaryFor` miss, marshal error, and `binary.Send` error. A forwarder with **N sinks** continues on per-sink errors (other sinks are still good): `StartBinaryForwarder` (#26) logs+continues on unknown `conn_id`, malformed envelope, and `phone.Send` error. Both return on source `Read` error or ctx cancel — those are unambiguous. The shape isn't a stylistic choice; it follows from how many independent downstream lifecycles the loop is multiplexing. +- **Define narrow read-side interfaces at the consumer, not on the adapter.** `phoneSource` and `binarySource` (`ConnID() + Read(ctx)`) live in `forward.go`, not on `WSConn`. Production passes `*WSConn`; tests substitute a fake without touching `WSConn`. The two interfaces are structurally identical but named distinctly so the `Start*Forwarder` signatures read self-describingly at call sites; promote to a shared interface only if a third caller emerges. Adopted in #25 and #26. - **Per-conn goroutines exit cleanly via LIFO defers, not by closing the conn themselves.** Pattern for any goroutine launched alongside an upgraded WS conn (heartbeat, future binary-side ack pump, etc.): derive `hbCtx, cancelHB := context.WithCancel(r.Context())`, register `defer cancelHB()` *after* the release/unregister defer, then `go runX(hbCtx, wsconn, ...)`. Under LIFO unwind `cancelHB` fires first → goroutine observes `ctx.Done()` and returns without touching the conn → release defer's `wsconn.Close()` runs unopposed. The goroutine owns ONLY the failure-path close (heartbeat: `CloseWithCode(1011, "heartbeat timeout")`); cleanup-path close is the handler defer's. `closeOnce` makes the two paths idempotent against each other regardless of which fires first. Adopted in heartbeat (#7); same shape applies to any future per-conn watchdog/timer goroutine. - **Active-conn application close codes go through `WSConn.CloseWithCode`; stillborn-conn close codes go through the underlying `*websocket.Conn`.** ADR-0005's stillborn-conn pattern (`c.Close(websocket.StatusCode(4409), reason)` directly on the underlying conn before any `Send` could have run) coexists with ADR-0007's active-conn pattern (`wsconn.CloseWithCode(1011, "heartbeat timeout")` post-claim, where `closeOnce`/`writeMu` invariants are in play). Both share the same `closeOnce` guard via `Close()` delegating to `CloseWithCode(StatusNormalClosure, "")`. The two patterns describe structurally distinct windows of a WSConn's lifecycle. Adopted: stillborn `4409`/`4404` (#16, #5); active `1011` (#7). - **Capture process-state timestamps in `main` after `flag.Parse()`, not as package-level vars.** `startedAt := time.Now()` lives inside `main` and is passed into the handler factory. A package-level `var startedAt = time.Now()` would fire at import time — before flag parsing, before `--version` early-returns — and be wrong for short-lived test binaries and any future deferred-serve setup. Adopted in #10. diff --git a/docs/knowledge/INDEX.md b/docs/knowledge/INDEX.md index cd7aa1b..8581a39 100644 --- a/docs/knowledge/INDEX.md +++ b/docs/knowledge/INDEX.md @@ -4,10 +4,11 @@ One-line pointers into the evergreen knowledge base. Newest entries at the top o ## Features +- [Binary-side frame forwarder](features/binary-forwarder.md) — per-binary read pump: unwraps each inbound routing envelope, linear-scans `PhonesFor(serverID)` for `env.ConnID`, writes `env.Frame` verbatim to that phone; opaque inner bytes; synchronous (handler discards the return); diverges from #25 in error policy — unknown `conn_id`, malformed envelope, phone `Send` error all log+continue (a single bad frame never tears down the binary); replaced `/v1/server`'s `CloseRead` placeholder (#26). - [WebSocket heartbeat](features/heartbeat.md) — per-conn goroutine on both endpoints sends RFC 6455 ping every 30s; closes with `1011 "heartbeat timeout"` if no pong within 30s. Detects half-open TCP within 60s; ctx-cancel exit path leaves close to the handler defer (#7). - [Phone-side frame forwarder](features/phone-forwarder.md) — per-phone read pump: wraps each inbound phone frame in the routing envelope keyed by the phone's `conn_id` and `Send`s it to the binary holding `serverID`; opaque inner bytes; synchronous (handler discards the return); replaced `/v1/client`'s `CloseRead` placeholder; added `WSConn.Read` (single-caller) (#25). - [`/v1/client` WS upgrade](features/client-endpoint.md) — phone-side ingress: validates `X-Pyrycode-Server` / `X-Pyrycode-Token` / `User-Agent` pre-upgrade (token presence-only, never parsed/logged); registers phone on the binary's slot, emits `4404` if no binary holds the id, hands the conn to `StartPhoneForwarder` for the data path (#5, #25). -- [`/v1/server` WS upgrade](features/server-endpoint.md) — binary-side ingress: validates `X-Pyrycode-Server` / `X-Pyrycode-Version` / `User-Agent` pre-upgrade, claims the slot, emits `4409` on conflict, holds the conn via `CloseRead` until #6's frame loop replaces it; on disconnect schedules a 30s grace release so a quick reconnect inherits the slot (#21). +- [`/v1/server` WS upgrade](features/server-endpoint.md) — binary-side ingress: validates `X-Pyrycode-Server` / `X-Pyrycode-Version` / `User-Agent` pre-upgrade, claims the slot, emits `4409` on conflict, hands the conn to `StartBinaryForwarder` for the data path (#26); on disconnect schedules a 30s grace release so a quick reconnect inherits the slot (#16, #21, #26). - [`/healthz` JSON endpoint](features/healthz.md) — unauthenticated `GET /healthz` returning `{status, version, connected_binaries, connected_phones, uptime_seconds}`; `Cache-Control: no-store`, body bounded ≈135 bytes. - [WSConn adapter](features/ws-conn-adapter.md) — wraps `nhooyr.io/websocket.Conn` to satisfy the registry's `Conn`; owns the per-conn write mutex and a `Close`-cancelled context with a 10s per-`Send` deadline. - [Connection registry](features/connection-registry.md) — thread-safe `Registry` (server-id → binary 1:1, server-id → phones 1:N) with `Conn` interface, snapshot-returning `PhonesFor`, sentinel errors for `4409` / `4404`, deferred-release `ScheduleReleaseServer` with grace-window reclaim semantics. diff --git a/docs/knowledge/codebase/26.md b/docs/knowledge/codebase/26.md new file mode 100644 index 0000000..8964c03 --- /dev/null +++ b/docs/knowledge/codebase/26.md @@ -0,0 +1,44 @@ +# Ticket #26 — Binary-side frame forwarder (unwrap envelope, dispatch to addressed phone) + +Lands the binary-to-phone data path. After `/v1/server` claims the slot (#16/#21) and launches the heartbeat goroutine (#7), the handler now hands the conn to `StartBinaryForwarder`, which reads frames from the binary, unwraps each routing envelope, looks up the phone matching `env.ConnID` under the binary's `serverID`, and writes `env.Frame` verbatim to that phone. Replaces the `c.CloseRead(r.Context())` + `<-readCtx.Done()` placeholder #16 left in place; the relay still treats inner frames as opaque bytes. + +## Implementation + +- **`internal/relay/forward.go`** — added `binarySource` interface (`ConnID() + Read(ctx)`) and `StartBinaryForwarder(ctx, reg, serverID, binary, logger) error`. Structurally a mirror of `StartPhoneForwarder` (synchronous despite the `Start` verb; consumer-defined source interface; return value is observability-only and the handler discards it). Diverges in **error policy**: unknown `conn_id`, malformed envelope, and `phone.Send` error all `log + continue`, never `return`. Only `binary.Read` errors or ctx cancellation end the loop. The divergence is structural — a binary serves N phones, and a single failing sink (or an envelope addressed to a phone that just disconnected) is a normal race, not a binary fault. +- **`internal/relay/server_endpoint.go`** — the placeholder `readCtx := c.CloseRead(r.Context()); <-readCtx.Done()` block is replaced by `_ = StartBinaryForwarder(r.Context(), reg, serverID, wsconn, logger)`. The `CloseRead` call is **deleted**, not retained — keeping it would spawn a second reader that races the forwarder for the `*websocket.Conn` sole-reader contract. Handler's existing `defer { ScheduleReleaseServer; Close; log server_released }` (#16/#21) and `defer cancelHB()` (#7) run on return in the correct LIFO order: `cancelHB → release defer`. The forwarder owns reading; the handler owns cleanup. +- **`internal/relay/forward_test.go`** — extended with seven new tests. New fakes: `fakeBinarySource` (Read-side, mirrors `fakePhone`'s frames-chan shape) and `Send`-capture fields added to the existing `fakePhone` (`mu` + `sent` + `sendErr`) so it can serve as the sink. New helpers: `runBinaryForwarder`, `waitForPhoneSent`, `claimAndRegister`, `mustMarshal`. Tests (1:1 with AC): + - `TestStartBinaryForwarder_RoutesToAddressedPhone` — envelope to P1 lands at P1, byte-stable modulo whitespace; P2 untouched. + - `TestStartBinaryForwarder_MultiplePhones` — one envelope per phone, each receives only its own. + - `TestStartBinaryForwarder_UnknownConnID_DropsAndContinues` — bogus `conn_id` dropped; subsequent valid envelope still lands on P1; closing the binary's frames chan returns `io.EOF` (proves loop never died). + - `TestStartBinaryForwarder_MalformedEnvelope_DropsAndContinues` — raw `not-json` dropped; subsequent valid envelope still lands. + - `TestStartBinaryForwarder_PhoneSendError_DropsAndContinues` — `p1.sendErr` set; envelope to P1 dropped (no panic, no return); envelope to P2 still lands. Encodes the divergence from `StartPhoneForwarder`'s return-on-Send-error. + - `TestStartBinaryForwarder_BinaryDisconnect_Returns` — close source frames chan → forwarder returns `io.EOF`; test then mimics the handler defer via `reg.ScheduleReleaseServer("s1", 0)` and polls `BinaryFor` to confirm the slot clears — verifies the handler-defer-runs-as-expected AC bullet structurally. + - `TestStartBinaryForwarder_ContextCancellation_Returns` — `cancel()` → forwarder returns `context.Canceled` within 100 ms. +- The package-level `// Manual stress: go test -race -count=20 ./internal/relay/` doc comment already in `forward_test.go` (from #25) continues to cover the new tests; no edit needed there. + +## Concurrency + +One forwarder goroutine per binary (the HTTP handler goroutine itself; no extra goroutine spawned). Runs alongside the heartbeat goroutine; the two never share state. `phone.Send` is multi-caller across the system (every binary's forwarder writing to a given phone — `WSConn.writeMu` serialises). `reg.PhonesFor` returns a fresh snapshot under RLock and is iterated without holding any registry lock — `phone.ConnID()` / `phone.Send` are called outside the lock, matching the established passive-store pattern. + +Four termination paths: +1. Binary closes WS → `Read` close error → loop returns → handler defer. +2. Server shutdown / request cancel → ctx cancels → `Read` ctx error → loop returns. +3. Heartbeat fires → `wsconn.CloseWithCode(1011, "heartbeat timeout")` → in-flight `Read` aborts → loop returns. +4. Per-frame errors → `continue`. Loop never terminated by frame faults. + +## Deliberately out of scope + +- **Per-message size cap on `WSConn`.** Same residual as #25 — belongs on `WSConn.SetReadLimit` so it covers both `/v1/server` and `/v1/client`. nhooyr's default 32 MiB read limit is a soft floor. Follow-up ticket. +- **Per-phone send queues.** A high-rate stream of envelopes destined for a single wedged phone can block the binary's loop for up to 10 s per envelope (the `WSConn.Send` deadline). Not observed yet; deferred. +- **Per-server-id `map[connID]Conn` for O(1) phone lookup.** Current iteration is O(N) per envelope; N is small (handful of phones per server). Registry-shape change should be motivated by observed cost, not anticipated. +- **No branching on specific `Unmarshal` sentinels.** All four (`ErrMalformedEnvelope`, `ErrMissingConnID`, `ErrMissingFrame`, defensive catch-all) get the same `warn + continue`; branching adds no behaviour and risks divergence if envelope errors are added later. + +## Cross-links + +- [Feature: Binary-side frame forwarder](../features/binary-forwarder.md) — full design / error-policy divergence / testing rationale. +- [Feature: `/v1/server` WS upgrade](../features/server-endpoint.md) — the wiring site; `CloseRead` placeholder removed. +- [Feature: Phone-side frame forwarder](../features/phone-forwarder.md) — mirror image; explains the shared `*Source` interface pattern. +- [Routing envelope](../features/routing-envelope.md) — `Unmarshal` is the per-frame structural check. +- [ADR-0001](../decisions/0001-routing-envelope-shape-and-opacity.md) — `json.RawMessage` opacity; sentinel errors funnel through one warn-and-continue branch. +- [Spec: 26-binary-forwarder](../../specs/architecture/26-binary-forwarder.md) — architect's pre-implementation design. +- [Protocol spec § Routing envelope](https://github.com/pyrycode/pyrycode/blob/main/docs/protocol-mobile.md#routing-envelope) — authoritative wire shape. diff --git a/docs/knowledge/features/binary-forwarder.md b/docs/knowledge/features/binary-forwarder.md new file mode 100644 index 0000000..698929e --- /dev/null +++ b/docs/knowledge/features/binary-forwarder.md @@ -0,0 +1,148 @@ +# Binary-side frame forwarder + +The binary-side data path. After `/v1/server` claims the slot for `serverID` and the heartbeat goroutine has been launched, the handler hands the connection to `StartBinaryForwarder`, which reads frames from the binary, unwraps each routing envelope, looks up the phone registered under `serverID` whose `ConnID()` equals `env.ConnID`, and writes `env.Frame` verbatim to that phone. Inner frames are opaque bytes — the relay never parses the inner protocol. + +Mirror image of the [phone-side forwarder](phone-forwarder.md). Same overall shape — synchronous despite the `Start` verb, consumer-defined source interface, return value is observability-only — with a divergent **error policy**: a single misaddressed envelope, malformed envelope, or failing `phone.Send` does NOT tear down the binary connection. + +Replaces the `c.CloseRead(r.Context())` + `<-readCtx.Done()` placeholder #16 left in place pending the read loop. + +Authoritative wire spec: [`pyrycode/pyrycode/docs/protocol-mobile.md` § Routing envelope](https://github.com/pyrycode/pyrycode/blob/main/docs/protocol-mobile.md#routing-envelope). + +## API + +Package `internal/relay` (`forward.go`): + +```go +type binarySource interface { + ConnID() string + Read(ctx context.Context) ([]byte, error) +} + +func StartBinaryForwarder( + ctx context.Context, + reg *Registry, + serverID string, + binary binarySource, + logger *slog.Logger, +) error +``` + +Despite the `Start` verb (carried from the AC), the call is **synchronous**: it blocks until `binary.Read` errors or `ctx` is cancelled. The returned error is for observability only — the `/v1/server` handler discards it (`_ = StartBinaryForwarder(...)`); lifecycle is closed by the handler's existing `defer { ScheduleReleaseServer; Close; log server_released }` (#16/#21) and `defer cancelHB()` (#7). + +`binarySource` is package-private and defined at the consumer (this file), structurally identical to `phoneSource` but distinct so the two `Start*Forwarder` signatures read self-describingly at call sites. Production passes `*WSConn`, which satisfies it via `Read`. Tests substitute a fake without touching `WSConn`'s shape. + +## Loop body + +Per iteration: + +1. `binary.Read(ctx)` for the next wire-encoded envelope. Any error → log `binary_forwarder_read_end` (info) and **return**. +2. `Unmarshal(wrapped)`. Any sentinel (`ErrMalformedEnvelope`, `ErrMissingConnID`, `ErrMissingFrame`, defensive catch-all) → log `binary_forwarder_unmarshal_err` (warn) and **continue**. +3. Linear-scan `reg.PhonesFor(serverID)` for the first `p.ConnID() == env.ConnID`. Miss → log `binary_forwarder_unknown_conn_id` (warn) and **continue**. +4. `phone.Send(env.Frame)`. Error → log `binary_forwarder_phone_send_failed` (info) and **continue**. + +The relay neither parses nor canonicalises the inner bytes; `env.Frame` is forwarded verbatim. `json.RawMessage` makes accidental inspection hard. + +## Error policy divergence from `StartPhoneForwarder` + +| Cause | `StartPhoneForwarder` | `StartBinaryForwarder` | +|---|---|---| +| Source `Read` error | return | return | +| Envelope (un)marshal error | return | **continue** | +| Sink missing (no binary / unknown conn_id) | return `nil` | **continue** | +| Sink `Send` error | return | **continue** | + +The phone-side forwarder has one sink (the binary); a failing sink means the conn is dead, so the loop ends. The binary-side forwarder has N sinks (phones); a single failing sink does not justify dropping every other phone served by this binary. The phone-side semantics encode "frame loop owns the lifecycle of its single pair"; the binary-side semantics encode "frame loop is decoupled from the lifecycle of its sinks." A bad frame from the binary MUST NOT tear the binary connection down — phones come and go, and an envelope addressing a just-disconnected phone is a normal race. + +The forwarder does not branch on specific `Unmarshal` sentinels — all four get the same warn-and-continue treatment. Branching would add no behaviour and risks divergence if envelope errors are added later. + +## Termination paths + +| Cause | Mechanism | Cleanup owner | +|---|---|---| +| Binary closes WS | `c.Read` returns close error → loop returns | handler's `defer` | +| Server shutdown / request cancel | `ctx` cancel → `c.Read` returns `ctx.Err()` → loop returns | handler's `defer` | +| Heartbeat fires (no pong) | `wsconn.CloseWithCode(1011, "heartbeat timeout")` → underlying `*websocket.Conn` aborts in-flight `Read` → loop returns | handler's `defer` (`closeOnce` swallows the second close) | +| Per-frame error | `continue` — loop does **not** terminate | n/a | + +All termination paths terminate the single goroutine; the handler's defer cleans up registry state. **No goroutine leaks.** + +## Concurrency model + +- One forwarder per binary — runs on the HTTP handler goroutine itself; no extra goroutine spawned. Runs in parallel with the heartbeat goroutine on a sibling goroutine; the two never share state. +- `binary.Read` (and `WSConn.Read`) is **single-caller**: the forwarder is the sole reader by contract. Concurrent `Read` is not supported by the underlying library. +- `phone.Send` is multi-caller across the system: every binary's forwarder writing to a given phone, plus any future server-side signal path. `WSConn.writeMu` (#15) serialises them. No new locks added by this ticket. +- `reg.PhonesFor` takes RLock and returns a fresh snapshot; cheap, contention-free. Iteration runs outside the registry lock — `phone.ConnID()` and `phone.Send` are called on snapshot entries, not registry state. Matches the established passive-store contract. +- Backpressure: a slow phone blocks only its own `Send` call; concurrent phones for the same binary are not affected because the forwarder iterates one envelope at a time and each `Send` is independent. The `WSConn.Send` 10 s deadline (#15) bounds any single write — a wedged phone causes a `binary_forwarder_phone_send_failed` log + continue within ~10 s. + +## Edits to neighbouring code + +- `/v1/server` handler now ends with `_ = StartBinaryForwarder(r.Context(), reg, serverID, wsconn, logger)` instead of `c.CloseRead(...)` + `<-readCtx.Done()`. The `CloseRead` call is **deleted entirely** — the new read loop processes control frames inline with data reads, and retaining `CloseRead` would race the forwarder for the `*websocket.Conn` sole-reader role (see `lessons.md` § "A long-lived WS handler that does not read frames will never observe peer close"). +- No changes to `WSConn`: `Read` was added in #25 and already satisfies `binarySource` via the same method. + +## Logging + +Field set is fixed; nothing else (envelope bytes, frame bytes, headers, tokens) appears. + +| event | level | fields | +|---|---|---| +| `binary_forwarder_read_end` | info | `server_id`, `binary_conn_id`, `err` | +| `binary_forwarder_unmarshal_err` | warn | `server_id`, `binary_conn_id`, `err` | +| `binary_forwarder_unknown_conn_id` | warn | `server_id`, `conn_id` | +| `binary_forwarder_phone_send_failed` | info | `server_id`, `conn_id`, `err` | + +`err` carries library errors (close codes, ctx cancellation, write deadlines) and `Unmarshal` sentinels — none of which embed user payloads (the `Unmarshal` wrap in `envelope.go:70` includes the JSON decoder error, which can name a byte offset but not payload contents). The handler's `server_released` log bookends the lifecycle. + +## What this forwarder deliberately does NOT do + +- **No `ScheduleReleaseServer`, no `wsconn.Close()`, no `cancelHB`.** The handler's defers own all three. Doing them here would either double-close or arm a stray grace timer. +- **No inner-frame parsing.** The relay's role per `protocol-mobile.md` § Routing envelope is "wrap, address, forward; never inspect." `Unmarshal`'s structural checks inspect envelope shape only, never `env.Frame`. +- **No retries.** Drop the offending frame and keep serving. +- **No buffering, no bounded channels.** A slow phone blocks this goroutine for one envelope at a time; `WSConn.Send`'s 10 s deadline bounds the worst case. +- **No per-frame size cap.** Inherited from `WSConn`'s underlying `*websocket.Conn` (nhooyr's default 32 MiB read limit). A deliberate per-message cap is a follow-up against `WSConn` so it covers both forwarders. Not a regression introduced by this ticket; named explicitly in the architect's security review. +- **No branching on specific `Unmarshal` sentinels.** All sentinels funnel through one warn-and-continue branch — `errors.Is` is not needed when every case is the same. +- **No heartbeat / ping-pong.** Separate goroutine (#7). + +## Adversarial framing + +- **Cross-server addressing.** `PhonesFor(serverID)` scopes the lookup to the binary's own slot. A binary cannot address phones registered under a different server-id even if it forges an `env.ConnID` collision — the iteration only considers phones it owns. Structural defence, not a runtime check; follows from the registry's per-server-id map shape (#3). +- **Malformed envelope.** `Unmarshal` returns a sentinel; loop logs + continues. The binary owns its own protocol health; the relay does not punish it for one bad frame. +- **Unknown `conn_id`.** Phone disconnected between the binary's last observation and this envelope, or the binary addressed a phone it shouldn't know about. Either way, a normal race or a binary bug — drop, continue. +- **Adversarial `env.Frame` contents.** Forwarded verbatim as opaque bytes. The downstream phone's protocol layer already knows it receives untrusted-via-relay bytes (the binary is trusted relative to the phone, not relative to the relay). +- **Slow phone.** Bounded by `WSConn.Send`'s 10 s deadline. A wedged phone causes log + continue within ~10 s. +- **Frame larger than any phone's tolerance.** Forwarder doesn't know or care; per-message size on the inbound side is the inherited residual on `WSConn`. +- **Crafted log content.** `err` strings come from the WS library or `Unmarshal` sentinels; no envelope/frame bytes enter logs. + +Verdict from the architect's security review: **PASS**. No new credentials handled, no new locks, no logging surface beyond the four enumerated events. The single residual (per-message size cap on `WSConn`) is shared with #25 and tracked separately. + +## Testing + +`internal/relay/forward_test.go` (extended from #25), `package relay`. All tests wire mocks to a real `Registry`; no httptest server (handler-level integration is covered by `/v1/server`'s own tests). + +Two new test fakes added alongside `fakePhone` / `fakeBinary` from #25: + +- `fakeBinarySource` — implements `binarySource`. Holds a `frames chan []byte`. `Read` selects on `ctx.Done()` and `frames`; closed chan → `io.EOF`. Mirrors `fakePhone`'s Read-side shape. +- `fakePhone` was extended (not duplicated) to satisfy `Conn` directly via a mu-protected `sent [][]byte` and a `sendErr` knob. Existing #25 tests use `®istryConn{phone}` and are unaffected (they never inspect `fakePhone.sent`). New tests register `fakePhone` directly via `reg.RegisterPhone(serverID, phone)` and inspect `phone.snapshotSent()`. + +Tests (1:1 with AC): + +- **Routes to addressed phone.** Envelope addressed to P1 lands at P1; P2 receives nothing; inner bytes byte-equal modulo whitespace via `json.Compact`. +- **Multiple phones.** One envelope per phone; each receives exactly its own. +- **Unknown conn_id drops and continues.** Bogus `client-s1-deadbeef` dropped; subsequent envelope to P1 still lands; closing the binary frames chan returns `io.EOF` (proves the forwarder is still running). +- **Malformed envelope drops and continues.** `[]byte("not-json")` dropped; subsequent valid envelope lands. +- **Phone Send error drops and continues.** `p1.sendErr` non-nil; envelope to P1 dropped (`len(p1.sent) == 0`); envelope to P2 still lands. **Encodes the divergence** from `StartPhoneForwarder`'s return-on-Send-error. +- **Binary disconnect returns.** Source `Read` returns `io.EOF` → forwarder returns; test mimics handler defer via `reg.ScheduleReleaseServer("s1", 0)` and polls `BinaryFor` to confirm the slot clears. +- **Context cancellation returns.** `cancel()` → forwarder returns `context.Canceled` within 100 ms. + +`make test` clean with `-race`. Package-level doc comment in `forward_test.go` already documents `go test -race -count=20 ./internal/relay/` per the race-count lesson. + +## Related + +- [`/v1/server`](server-endpoint.md) — sole production caller; owns the cleanup defers the forwarder must not touch. +- [Phone-side forwarder](phone-forwarder.md) — mirror image; explains why the two have distinct named source interfaces despite identical shape. +- [Routing envelope](routing-envelope.md) — `Unmarshal` is the per-frame structural check; all four sentinels funnel through one warn-and-continue branch. +- [WSConn adapter](ws-conn-adapter.md) — `Read` (added in #25) satisfies `binarySource` in production. +- [Connection registry](connection-registry.md) — `PhonesFor` is the per-frame lookup; snapshot semantics guarantee lock-free iteration. +- [Heartbeat](heartbeat.md) — sibling per-conn goroutine; closes the conn with `1011` on pong timeout, which aborts the forwarder's in-flight `Read`. +- [ADR-0001](../decisions/0001-routing-envelope-shape-and-opacity.md) — opacity-by-type and sentinel errors. +- [ADR-0006](../decisions/0006-grace-period-as-reclaim-path.md) — grace-window semantics; the forwarder is dead before grace expiry and never sees expiry-time state. +- [Protocol spec § Routing envelope](https://github.com/pyrycode/pyrycode/blob/main/docs/protocol-mobile.md#routing-envelope) — authoritative wire shape. diff --git a/docs/knowledge/features/server-endpoint.md b/docs/knowledge/features/server-endpoint.md index 15ee63b..a6ed9b6 100644 --- a/docs/knowledge/features/server-endpoint.md +++ b/docs/knowledge/features/server-endpoint.md @@ -2,7 +2,7 @@ `/v1/server` is the relay's ingress for pyry binaries. A binary opens an outbound WSS to the relay, sends three required headers, and — if no other binary holds the same `serverID` — gets registered in the connection registry and held there until the connection ends. The endpoint enforces first-claim-wins via `Registry.ClaimServer`; a duplicate claim is closed with WS code `4409`. On disconnect (clean close, network error, ping timeout), the slot is scheduled for release after a 30-second grace window so a quick reconnect lands back in the same slot with phones still attached. -This is the binary side only. Phone ingress (`/v1/client`) is #5; frame forwarding is #6; heartbeat is #7. The handler currently holds the connection open by draining-and-discarding frames — #6 swaps in the real read loop in the same call site. +This is the binary side only. Phone ingress (`/v1/client`) is #5; phone-side frame forwarding is #25; heartbeat is #7. The binary-side data path is wired here via `StartBinaryForwarder` (#26) — the handler hands the conn to the forwarder after the claim and heartbeat are in place, and the forwarder unwraps each routing envelope and writes the inner frame to the addressed phone. ## Wire shape @@ -62,7 +62,8 @@ The `30*time.Second` literal lives at the wiring site, not as a package-level co - On any other error → `wsconn.Close()`, return (defensive; not currently reachable). 6. Log `server_claimed`. 7. `defer { reg.ScheduleReleaseServer(serverID, grace); wsconn.Close; log server_released }`. The defer is registered **after** the successful claim so a missed/conflicted claim never tries to schedule a release for a slot we don't hold — and never arms a stray grace timer for an id it never owned. -8. `readCtx := c.CloseRead(r.Context()); <-readCtx.Done()` — block until the peer closes. +8. Launch the heartbeat goroutine via `hbCtx, cancelHB := context.WithCancel(r.Context()); defer cancelHB(); go runHeartbeat(hbCtx, wsconn, …)` (#7). +9. `_ = StartBinaryForwarder(r.Context(), reg, serverID, wsconn, logger)` (#26) — synchronously runs the per-binary read pump. Returns on `binary.Read` error or ctx cancel; per-frame errors (malformed envelope, unknown `conn_id`, phone `Send` failure) `log + continue` without ending the loop. Return value is observability-only. ## Logging @@ -87,17 +88,18 @@ The conflict path **deliberately omits** `binary_version`: an unauthenticated pr | Header validation | request goroutine | none | pre-upgrade; no resources held | | `websocket.Accept` | request goroutine | none | conn allocated on success | | `ClaimServer` | request goroutine | registry write lock (held internally) | one-shot | -| `CloseRead` | spawns one read-discard goroutine | none | terminates on conn close / `r.Context()` cancel | -| `<-readCtx.Done()` | request goroutine, blocking | none | unblocks on peer close | +| `runHeartbeat` | sibling goroutine | none (control frames serialised by the library) | exits on `hbCtx` cancel or pong timeout | +| `StartBinaryForwarder` | request goroutine, blocking | none (per-frame `PhonesFor` RLock, snapshot) | unblocks on `binary.Read` error or ctx cancel; per-frame errors continue | +| `defer cancelHB` | request goroutine | none | LIFO: runs before the release defer | | `defer` (schedule-release/close/log) | request goroutine | `wsconn.closeOnce` | runs only on the success path; idempotent | -Shutdown: `http.Server` cancels `r.Context()` → `CloseRead`'s read errors → `readCtx` cancels → handler unblocks → defer runs → slot scheduled for release. The grace timer is owned by `time.AfterFunc` inside the registry; the handler does not block on, observe, or reason about it. Process exit drops pending timers (the registry has no on-disk state); a binary mid-grace at shutdown loses its slot the moment the new process starts — same shape as a binary mid-claim at shutdown. +Shutdown: `http.Server` cancels `r.Context()` → forwarder's `Read` returns ctx error → forwarder returns → `cancelHB` fires (heartbeat goroutine observes `ctx.Done()` and exits without touching the conn) → release defer runs → slot scheduled for release. The grace timer is owned by `time.AfterFunc` inside the registry; the handler does not block on, observe, or reason about it. Process exit drops pending timers (the registry has no on-disk state); a binary mid-grace at shutdown loses its slot the moment the new process starts — same shape as a binary mid-claim at shutdown. ## Design notes - **`OriginPatterns: ["*"]`.** `nhooyr.io/websocket.Accept` rejects cross-origin upgrades by default. Programmatic clients don't have to send `Origin`; setting `["*"]` says "this endpoint is not browser-facing." Safe because the custom-header gate one layer above structurally excludes browser-driven CSWSH: raw browser WebSocket cannot set custom request headers, and `fetch` with custom headers triggers a CORS pre-flight this endpoint does not satisfy. - **Direct `c.Close` on conflict, not `wsconn.Close()`.** `WSConn.Close` always emits `StatusNormalClosure`; the conflict path needs `4409`. Adding a custom-code close to `WSConn` would inflate its surface for one caller. The conflict case is a stillborn WSConn — no `Send` was attempted, no goroutine holds `writeMu` — so the WSConn invariant ("reach the connection only through WSConn methods") is preserved in spirit. The handler comments name this exception so a future reader doesn't generalise it. See [ADR-0005](../decisions/0005-application-close-codes-via-underlying-conn.md). -- **`CloseRead` instead of `for { c.Read() }`.** The handler must keep the goroutine alive until the peer disconnects, but reading frames is #6's job. `CloseRead` spawns a goroutine that drains-and-discards frames (including control frames — pings must be processed for the connection to observe a peer-side close) and returns a context cancelled on close. When #6 lands, `<-readCtx.Done()` is replaced with a real frame loop in the same call site. +- **`StartBinaryForwarder` is the sole reader (#26).** The earlier `CloseRead` + `<-readCtx.Done()` placeholder is deleted, not retained alongside the forwarder: keeping it would spawn a second drain-and-discard goroutine that races the forwarder for the `*websocket.Conn` sole-reader contract. The forwarder processes control frames inline with data reads, so peer close still reaches the handler promptly. See [Binary-side forwarder](binary-forwarder.md) for the read-loop design. - **`crypto/rand`-backed `connID` suffix.** 32 bits is sufficient: scoped per server-id, used only as an opaque map key in `UnregisterPhone`. Birthday bound non-trivial only at ~2¹⁶ live conns under one server-id — far beyond v1 scale. Using `crypto/rand` over `math/rand` is forward-compat hardening: if a future ticket exposes the conn-id (e.g. echoing it in an error envelope), unguessable bytes avoid creating an oracle by accident. - **`crypto/rand` failure is fatal-by-design.** `randHex8` panics on RNG read failure; on Linux/macOS the OS RNG does not block once boot-initialised, so a failure here means a broken host. `http.Server` panic recovery terminates the connection; the slot is not yet claimed when `randHex8` runs, so the registry stays clean. - **`grace` is a constructor parameter, not a package-level var or `Options` struct.** Honest about the dependency; consistent with how the handler already takes `reg` and `logger`; the value is policy and policy belongs at the wiring site (`cmd/pyrycode-relay/main.go`). A package-level override (e.g. `setGracePeriod(t, d)`) would be action-at-a-distance and force test serialisation. An `Options` struct would be premature for a single field. @@ -109,9 +111,9 @@ Shutdown: `http.Server` cancels `r.Context()` → `CloseRead`'s read errors → - **No `400` log line.** Avoids amplifying header-floods into log volume. - **No log on `websocket.Accept` errors.** Library writes a 4xx; the failure is visible in the http access log. A per-failure log would invite log-flood from misconfigured clients. A counter (Prometheus, future) is the right shape if observability later wants this. - **No connection caps (per-IP or global).** Documented residual in `docs/threat-model.md` § DoS resistance; the WS upgrade path is named there. Inherited gap, not widened. -- **No frame loop.** `CloseRead` discards frames until #6 replaces it. +- **No inline frame handling.** `StartBinaryForwarder` owns reading; the handler is a thin upgrade-and-claim shell. See [Binary-side forwarder](binary-forwarder.md). - **No heartbeat policy in the handler.** The handler launches `go runHeartbeat(...)` after the successful claim and registers `defer cancelHB()` so the goroutine exits cleanly under handler unwind (#7). The heartbeat policy itself — 30s interval, 30s pong timeout, `1011 "heartbeat timeout"` close — lives in `heartbeat.go`. See [Heartbeat feature](heartbeat.md). -- **No frame-forward error handling during grace.** A phone whose `Send` errors against the closed binary `Conn` during the grace window is #6's territory; the handler does not observe these. +- **No frame-forward error handling during grace.** A phone whose `Send` errors against the closed binary `Conn` during the grace window is the forwarders' territory (#25 returns; #26 logs+continues); the handler does not observe these. - **No token validation.** `/v1/server` is unauthenticated by design — the binary owns the trust relationship with phones. - **No log-line sanitisation.** `slog`'s text handler quotes string values, so a newline in `binary_version` cannot forge a log line. JSON handler in production preserves the property. @@ -124,12 +126,12 @@ The handler is internet-exposed and unauthenticated. Threats considered: - **Grace-window map growth.** `binaries` and `timers` map sizes scale as `grace × accept-rate` in the worst case (an attacker rapidly connect-disconnects with distinct server-ids, leaving a 30 s residual entry per id). Each entry is bounded — the timer eventually fires and reclaims it — so growth is not unbounded. Per-IP and total-connection caps that would ceiling the accept-rate are #5 / #16 territory and explicitly deferred. - **Crafted headers (NULs, very long values).** `slog` quotes string values; `net.SplitHostPort` runs on `r.RemoteAddr` (set by `net/http`, not attacker-controlled); `r.Header.Get` does no parsing. No fragile parse step the attacker controls. - **Space-only headers.** `!= ""` accepts them. Hardening to "trim then non-empty" would be invented enforcement; the fields are informational, not enforcement, and no observed failure motivates it. -- **Frame floods after upgrade.** `CloseRead` discards. One goroutine per connection. No amplification. Per-frame work is a header read + body drop. Acceptable for v1. +- **Frame floods after upgrade.** `StartBinaryForwarder` processes each envelope: `Unmarshal` (structural) + linear `PhonesFor` scan + opaque `phone.Send`. Per-envelope work is bounded; the inherited residual is `WSConn`'s per-message size cap (nhooyr's 32 MiB soft floor). One goroutine per connection. Acceptable for v1. - **Browser-driven CSWSH via `OriginPatterns: ["*"]`.** Browsers cannot set custom request headers on raw WebSocket; `fetch` with custom headers triggers CORS pre-flight. The header gate structurally excludes browser attackers; `OriginPatterns: ["*"]` is safe because of the gate, not despite it. - **`POST /v1/server` to confuse the upgrade.** `websocket.Accept` rejects non-GET with a library 4xx. Registry untouched. Tested. - **Conn-id collision.** Birthday bound is fine at v1 scale. Widen the suffix if scale ever demands; public API doesn't change. - **`crypto/rand` failure.** Panic terminates the connection; the http server recovers; other connections continue. A failing host at this depth is operator-visible; refusing to serve is correct (loud failure over silent correction). -- **Goroutine leak on shutdown.** `r.Context()` cancellation propagates through `CloseRead` to `readCtx`; defer runs; no leak. +- **Goroutine leak on shutdown.** `r.Context()` cancellation propagates through the forwarder's `Read`; loop returns; `cancelHB` fires; release defer runs; no leak. Verdicts from #16's and #21's security reviews: **PASS**. No new defences invented; no new threats opened. #21 inherits #20's well-tested registry-side semantics for grace; the residual `grace × accept-rate` map-size scaling is named here and tracked to the per-IP/total-cap tickets that own it. @@ -152,6 +154,7 @@ Not tested: registry mocks (use the real one — it's race-tested in #3); librar ## Related +- [Binary-side forwarder](binary-forwarder.md) — the read pump wired in step 9; replaces the `CloseRead` placeholder. - [Connection registry](connection-registry.md) — `ClaimServer` / `ReleaseServer` / `ErrServerIDConflict` are the primitives this handler routes through. - [WSConn adapter](ws-conn-adapter.md) — what the handler hands to `ClaimServer`. The handler's direct-`c.Close` on conflict is the documented exception to the WSConn-only invariant. - [`/healthz`](healthz.md) — sibling unauthenticated endpoint; mirrors the handler-factory shape.