diff --git a/cmd/pyrycode-relay/main.go b/cmd/pyrycode-relay/main.go index a541435..6fa5389 100644 --- a/cmd/pyrycode-relay/main.go +++ b/cmd/pyrycode-relay/main.go @@ -7,12 +7,14 @@ package main import ( + "context" "errors" "flag" "fmt" "log/slog" "net/http" "os" + "os/signal" "slices" "syscall" "time" @@ -20,26 +22,48 @@ import ( "github.com/pyrycode/pyrycode-relay/internal/relay" ) +// drainDeadline bounds how long Shutdown will wait for in-flight WS +// close handshakes before force-closing. nhooyr.io/websocket.Conn.Close +// waits up to 5s per conn for the peer's reciprocal close (lessons.md); +// 10s leaves ~5s of headroom while keeping a stuck drain from delaying +// a fly machine update indefinitely. Lives at the wiring site per the +// established policy-values-in-main convention (#21, #60). +const drainDeadline = 10 * time.Second + // Version is overridden at build time via -ldflags. var Version = "dev" func main() { + os.Exit(run(os.Args[1:], signalContextFor(syscall.SIGTERM, syscall.SIGINT))) +} + +// signalContextFor returns a context that is cancelled when any of the +// listed signals is received. Equivalent to signal.NotifyContext; broken +// out so tests can drive the shutdown path with a synthetic cancellation +// instead of the real signal handler. +func signalContextFor(sigs ...os.Signal) context.Context { + ctx, _ := signal.NotifyContext(context.Background(), sigs...) + return ctx +} + +func run(args []string, sigCtx context.Context) int { + fs := flag.NewFlagSet("pyrycode-relay", flag.ExitOnError) var ( - domain = flag.String("domain", "", "Public domain for Let's Encrypt cert issuance (required unless --insecure-listen is set).") - certCache = flag.String("cert-cache", defaultCertCache(), "Directory for autocert's TLS certificate cache.") - insecureListen = flag.String("insecure-listen", "", "Listen address for plain HTTP (e.g. :8080). Disables autocert; use only when fronted by a reverse proxy.") - metricsListen = flag.String("metrics-listen", "127.0.0.1:9090", "Listen address for the /metrics endpoint. Must be a loopback IP literal (e.g. 127.0.0.1:9090, [::1]:9090). Empty disables.") - trustXFF = flag.Bool("trust-x-forwarded-for", false, + domain = fs.String("domain", "", "Public domain for Let's Encrypt cert issuance (required unless --insecure-listen is set).") + certCache = fs.String("cert-cache", defaultCertCache(), "Directory for autocert's TLS certificate cache.") + insecureListen = fs.String("insecure-listen", "", "Listen address for plain HTTP (e.g. :8080). Disables autocert; use only when fronted by a reverse proxy.") + metricsListen = fs.String("metrics-listen", "127.0.0.1:9090", "Listen address for the /metrics endpoint. Must be a loopback IP literal (e.g. 127.0.0.1:9090, [::1]:9090). Empty disables.") + trustXFF = fs.Bool("trust-x-forwarded-for", false, "Trust the X-Forwarded-For header as the source IP for per-IP rate limiting. 
"+ "WARNING: enabling this without a trusted reverse proxy in front of the relay "+ "allows clients to spoof their source IP and bypass per-IP rate limits.") - showVersion = flag.Bool("version", false, "Print version and exit.") + showVersion = fs.Bool("version", false, "Print version and exit.") ) - flag.Parse() + _ = fs.Parse(args) if *showVersion { fmt.Println(Version) - return + return 0 } logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) @@ -47,7 +71,7 @@ func main() { if *insecureListen == "" && *domain == "" { logger.Error("either --domain (for autocert) or --insecure-listen (for behind-proxy mode) must be set") - os.Exit(2) + return 2 } // Boot-time env-var validation runs BEFORE CheckInsecureListenInProduction @@ -65,7 +89,7 @@ func main() { } else { logger.Error("refusing to start: invalid env-var config", "err", err) } - os.Exit(2) + return 2 } if err := relay.CheckInsecureListenInProduction(*insecureListen, os.Getenv); err != nil { @@ -73,7 +97,7 @@ func main() { "err", err, "env_var", "PYRYCODE_RELAY_PRODUCTION", "fix", "remove --insecure-listen and set --domain, or unset PYRYCODE_RELAY_PRODUCTION") - os.Exit(2) + return 2 } // CheckRunningAsRoot is the in-process backstop for the CI non-root-build @@ -89,14 +113,14 @@ func main() { "env_var", "PYRYCODE_RELAY_PRODUCTION", "effective_uid", syscall.Geteuid(), "fix", "drop privileges before exec (e.g. Dockerfile USER directive or --user , kubernetes securityContext.runAsUser), or unset PYRYCODE_RELAY_PRODUCTION if the deploy is truly dev") - os.Exit(2) + return 2 } if err := relay.CheckCapabilities(); err != nil { logger.Error("refusing to start: unexpected Linux capabilities", "err", err, "fix", "drop extra capabilities (e.g. --cap-drop=ALL --cap-add=NET_BIND_SERVICE on docker, or securityContext.capabilities on kubernetes)") - os.Exit(2) + return 2 } startedAt := time.Now() @@ -117,7 +141,7 @@ func main() { "err", err, "value", *metricsListen, "fix", "use a loopback IP literal such as 127.0.0.1:9090 or [::1]:9090, or pass --metrics-listen= to disable") - os.Exit(2) + return 2 } // maxFrameBytes: 256 KiB per-frame read cap. Derivation: @@ -139,11 +163,9 @@ func main() { rateLimitEvictionInterval = 5 * time.Minute ) limiter := relay.NewIPRateLimiter(rateLimitRefillEvery, rateLimitBurst, rateLimitEvictionInterval) - // Best-effort: the current listener block calls os.Exit on error, which - // skips defers. A real graceful-shutdown path (signal handler + server - // Shutdown) is out of scope per #47; this defer runs on clean returns - // (e.g. --version above does not reach here, but future test entry - // points might). + // Graceful shutdown (#31) reaches limiter.Close on the clean return + // path: a signal or a listener-goroutine error triggers relay.Shutdown + // before run returns. 
defer limiter.Close() rateLimit := relay.NewRateLimitMiddleware(limiter, logger, *trustXFF) @@ -172,7 +194,7 @@ func main() { if err != nil { logger.Error("refusing to start: invalid listener address", "err", err, "addr", srv.Addr) - os.Exit(2) + return 2 } expected := map[uint16]struct{}{port: {}} actual := map[uint16]struct{}{port: {}} @@ -181,7 +203,7 @@ func main() { if err != nil { logger.Error("refusing to start: invalid listener address", "err", err, "addr", metricsSrv.Addr) - os.Exit(2) + return 2 } expected[mp] = struct{}{} actual[mp] = struct{}{} @@ -192,28 +214,22 @@ func main() { "err", err, "unexpected_ports", surplus, "expected_ports", expectedList) - os.Exit(2) + return 2 } + servers := []*http.Server{srv} if metricsSrv != nil { + servers = append(servers, metricsSrv) logger.Info("starting metrics listener", "listen", metricsSrv.Addr) - go func() { - if err := metricsSrv.ListenAndServe(); err != nil { - logger.Error("metrics listener failed", "err", err) - os.Exit(1) - } - }() - } - if err := srv.ListenAndServe(); err != nil { - logger.Error("listen failed", "err", err) - os.Exit(1) } - return + return runServers(sigCtx, logger, reg, servers, func(s *http.Server) error { + return s.ListenAndServe() + }) } mgr, err := relay.NewAutocertManager(*domain, *certCache) if err != nil { logger.Error("autocert setup failed", "err", err) - os.Exit(1) + return 1 } httpsSrv := &http.Server{ @@ -242,13 +258,13 @@ func main() { if err != nil { logger.Error("refusing to start: invalid listener address", "err", err, "addr", httpsSrv.Addr) - os.Exit(2) + return 2 } httpPort, err := relay.ListenerPort(httpSrv.Addr) if err != nil { logger.Error("refusing to start: invalid listener address", "err", err, "addr", httpSrv.Addr) - os.Exit(2) + return 2 } expected := map[uint16]struct{}{443: {}, 80: {}} actual := map[uint16]struct{}{httpsPort: {}, httpPort: {}} @@ -257,7 +273,7 @@ func main() { if err != nil { logger.Error("refusing to start: invalid listener address", "err", err, "addr", metricsSrv.Addr) - os.Exit(2) + return 2 } expected[mp] = struct{}{} actual[mp] = struct{}{} @@ -268,33 +284,69 @@ func main() { "err", err, "unexpected_ports", surplus, "expected_ports", expectedList) - os.Exit(2) + return 2 } logger.Info("starting", "version", Version, "mode", "autocert", "domain", *domain, "cert_cache", *certCache) + servers := []*http.Server{httpsSrv, httpSrv} if metricsSrv != nil { + servers = append(servers, metricsSrv) logger.Info("starting metrics listener", "listen", metricsSrv.Addr) + } + return runServers(sigCtx, logger, reg, servers, func(s *http.Server) error { + if s == httpsSrv { + return s.ListenAndServeTLS("", "") + } + return s.ListenAndServe() + }) +} + +// runServers launches one goroutine per *http.Server invoking listen(s) +// and blocks until either sigCtx is cancelled (operator signal) or one +// of the listeners returns a non-ErrServerClosed error. Either way it +// runs relay.Shutdown(drainCtx, …) and returns: +// +// - 0 on signal-triggered drain (clean operator action), +// - 1 on listener-error-triggered drain (process supervisor restart). +// +// Errors are logged with their source listener's Addr; only the first +// listener error wins (buffered chan). 
+func runServers(sigCtx context.Context, logger *slog.Logger, reg *relay.Registry, servers []*http.Server, listen func(*http.Server) error) int { + listenerErr := make(chan error, 1) + for _, s := range servers { + s := s go func() { - if err := metricsSrv.ListenAndServe(); err != nil { - logger.Error("metrics listener failed", "err", err) - os.Exit(1) + if err := listen(s); err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Error("listener failed", "addr", s.Addr, "err", err) + select { + case listenerErr <- err: + default: + } } }() } - go func() { - if err := httpSrv.ListenAndServe(); err != nil { - logger.Error("http-01 listener failed", "err", err) - os.Exit(1) - } - }() + var triggeredByErr bool + select { + case <-sigCtx.Done(): + logger.Info("shutdown signal received; draining") + case <-listenerErr: + triggeredByErr = true + logger.Error("listener error; draining") + } + + drainCtx, cancel := context.WithTimeout(context.Background(), drainDeadline) + defer cancel() + if err := relay.Shutdown(drainCtx, logger, reg, servers...); err != nil { + logger.Warn("drain incomplete", "err", err) + } - if err := httpsSrv.ListenAndServeTLS("", ""); err != nil { - logger.Error("https listener failed", "err", err) - os.Exit(1) + if triggeredByErr { + return 1 } + return 0 } // listenerPortLists returns the ascending-sorted surplus (actual\expected) diff --git a/cmd/pyrycode-relay/main_e2e_test.go b/cmd/pyrycode-relay/main_e2e_test.go new file mode 100644 index 0000000..cdbc61a --- /dev/null +++ b/cmd/pyrycode-relay/main_e2e_test.go @@ -0,0 +1,149 @@ +package main + +import ( + "context" + "errors" + "fmt" + "net" + "net/http" + "testing" + "time" + + "nhooyr.io/websocket" +) + +// TestRun_SigtermClosesWSConnsWithGoingAway boots the relay via run() +// on a loopback port, opens a /v1/server WebSocket, triggers a graceful +// shutdown via the signal context, and asserts that the WS Read returns +// a close error carrying websocket.StatusGoingAway. Exits the run() +// goroutine and asserts the exit code is 0 (signal-triggered drain). +func TestRun_SigtermClosesWSConnsWithGoingAway(t *testing.T) { + addr, err := freePort() + if err != nil { + t.Fatalf("freePort: %v", err) + } + + sigCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + exit := make(chan int, 1) + go func() { + exit <- run([]string{ + "--insecure-listen", addr, + "--metrics-listen", "", + }, sigCtx) + }() + + // Wait for the listener to actually be accepting connections. + if err := waitForDial(addr, 3*time.Second); err != nil { + t.Fatalf("relay did not accept connections: %v", err) + } + + // Open a /v1/server WS connection. + dialCtx, dialCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer dialCancel() + c, _, err := websocket.Dial(dialCtx, "ws://"+addr+"/v1/server", &websocket.DialOptions{ + HTTPHeader: http.Header{ + "X-Pyrycode-Server": []string{"s-e2e-1"}, + "X-Pyrycode-Version": []string{"0.0.0"}, + "User-Agent": []string{"pyrycode-relay-e2e/1.0"}, + }, + }) + if err != nil { + t.Fatalf("websocket.Dial: %v", err) + } + defer c.Close(websocket.StatusInternalError, "") + + // Trigger shutdown. + cancel() + + // Next Read must surface a close error with StatusGoingAway. 
+ readCtx, readCancel := context.WithTimeout(context.Background(), 3*time.Second) + defer readCancel() + if _, _, err := c.Read(readCtx); err == nil { + t.Fatal("Read: got nil error after shutdown, want close error") + } else if code := websocket.CloseStatus(err); code != websocket.StatusGoingAway { + t.Errorf("close status: got %d, want %d (StatusGoingAway); err=%v", + code, websocket.StatusGoingAway, err) + } + + // run() must exit within the drain deadline. + select { + case got := <-exit: + if got != 0 { + t.Errorf("run exit code: got %d, want 0", got) + } + case <-time.After(drainDeadline + 2*time.Second): + t.Fatal("run did not return after shutdown") + } +} + +// TestRun_SigtermWithNoConns exits cleanly even with no live WS conns. +func TestRun_SigtermWithNoConns(t *testing.T) { + addr, err := freePort() + if err != nil { + t.Fatalf("freePort: %v", err) + } + + sigCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + exit := make(chan int, 1) + go func() { + exit <- run([]string{ + "--insecure-listen", addr, + "--metrics-listen", "", + }, sigCtx) + }() + + if err := waitForDial(addr, 3*time.Second); err != nil { + t.Fatalf("relay did not accept connections: %v", err) + } + cancel() + + select { + case got := <-exit: + if got != 0 { + t.Errorf("run exit code: got %d, want 0", got) + } + case <-time.After(drainDeadline + 2*time.Second): + t.Fatal("run did not return after shutdown") + } + + // After shutdown the listener is no longer reachable. + if conn, err := net.DialTimeout("tcp", addr, 200*time.Millisecond); err == nil { + conn.Close() + t.Error("listener still accepting connections after run returned") + } +} + +// freePort opens a loopback listener on port 0, captures the assigned +// port, closes the listener, and returns the addr string. There is an +// inherent race between close and the test re-binding it, but the +// window is short enough that this is reliable in practice. +func freePort() (string, error) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return "", err + } + addr := ln.Addr().String() + if err := ln.Close(); err != nil { + return "", err + } + return addr, nil +} + +// waitForDial polls until a TCP dial to addr succeeds or the deadline +// elapses. +func waitForDial(addr string, deadline time.Duration) error { + end := time.Now().Add(deadline) + for time.Now().Before(end) { + conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond) + if err == nil { + conn.Close() + return nil + } + time.Sleep(20 * time.Millisecond) + } + return fmt.Errorf("dial %s: %w", addr, errors.New("deadline elapsed")) +} diff --git a/docs/knowledge/INDEX.md b/docs/knowledge/INDEX.md index 9ffdce9..18d0eef 100644 --- a/docs/knowledge/INDEX.md +++ b/docs/knowledge/INDEX.md @@ -4,6 +4,7 @@ One-line pointers into the evergreen knowledge base. Newest entries at the top o ## Features +- [Graceful shutdown on SIGTERM](features/graceful-shutdown.md) — `SIGTERM` / `SIGINT` triggers a bounded drain: stop accepting new connections, emit `1001 StatusGoingAway` on every live WS conn, give peers a configurable window (default 10s) to ack, then force-close and exit. Closes the "graceful-shutdown signal handler" gap that #47, #57, and #60 deferred to this ticket. Three pieces: (a) `func (r *Registry) Snapshot() []Conn` — freshly-allocated slice of every registered binary + phone conn, RLock-scoped, nil on empty, mirrors `PhonesFor`'s shape; preserves ADR-0003's passive-registry contract (slow work happens outside the lock). 
(b) `func relay.Shutdown(ctx, logger, reg, servers...) error` in `internal/relay/shutdown.go` — concurrently invokes `srv.Shutdown(ctx)` on each `*http.Server` (closes listeners synchronously, waits for non-hijacked handlers) AND fans `CloseWithCode(StatusGoingAway, "shutting down")` out across `reg.Snapshot()` (one goroutine per conn; concurrent fan-out is load-bearing because `nhooyr.io/websocket.Conn.Close` blocks up to 5s per conn for the peer's reciprocal handshake — serial iteration would scale linearly in conn count and blow past the deadline at the second stuck peer). A type-assertion seam against the private `gracefulCloser` interface (`CloseWithCode(websocket.StatusCode, string)`) lets `*WSConn` (#7, ADR-0007) take the code-aware path while test fakes and any future narrow-`Conn` impls fall back to plain `Close()`. Returns `nil` on clean drain, `ctx.Err()` on deadline expiry — and on the deadline path explicitly iterates `srv.Close()` per server because `http.Server.Shutdown` returns on ctx expiry without force-closing still-active conns (it closes listeners and idle conns up front but never interrupts mid-request conns; the unit test asserts wall-clock < 300ms after a 50ms deadline to pin the prompt deadline path). Errors from `srv.Shutdown` are logged but never abort the drain — every server gets a Shutdown call, every conn gets a close call. (c) `cmd/pyrycode-relay/main.go` refactor splitting `main` into a testable `run(args []string, sigCtx context.Context) int` entry point invoked from a one-line `main` (`os.Exit(run(os.Args[1:], signalContextFor(SIGTERM, SIGINT)))`); the split exists so `main_e2e_test.go` can drive the full boot+drain sequence with a synthetic `context.Context` instead of forking a subprocess and sending real signals. `runServers(sigCtx, logger, reg, servers, listen)` is the single sink for both `--insecure-listen` and `--domain` modes — launches one goroutine per `*http.Server` running the `listen` closure (`ListenAndServe` or `ListenAndServeTLS("", "")` for the autocert HTTPS server), forwards the first non-`ErrServerClosed` listener error to a 1-slot buffered channel (first error wins, additional non-blocking sends drop), `select`s on `<-sigCtx.Done()` vs `<-listenerErr`, then builds `context.WithTimeout(_, drainDeadline)` and calls `relay.Shutdown(...)`. Exit codes: 0 for signal-triggered drain (clean operator action), 1 for listener-error-triggered drain (process supervisor restart) — preserves today's `os.Exit(1)` semantics on listener errors while enabling clean shutdown on signals. `drainDeadline = 10 * time.Second` declared as a `const` at the top of `main.go` per the established policy-values-at-wiring-site convention (#21, #60, #79); leaves ~5s of headroom beyond a single close-handshake. The five pre-existing `os.Exit(1)` calls inside listener goroutines collapse into the buffered-error channel — and `defer limiter.Close()` (introduced by #47) now actually runs on every shutdown path. Shutdown ordering is structural, not enforced by a barrier: the listener-close is in flight before the snapshot is taken, so no new WS handshake can complete between snapshot and close. Handler goroutines are NOT joined by the helper — `CloseWithCode` unblocks the handler's `Read`, the handler unwinds through its existing `defer reg.ScheduleReleaseServer / wsconn.Close` chain, and anything still alive at process exit is reaped by the kernel. A second SIGTERM during drain is ignored (`NotifyContext` cancels its context once); SIGKILL bypasses Go entirely. 
Empty registry is a no-op (`Snapshot()` returns nil, fan-out has zero goroutines, helper returns nil promptly). Tests: `shutdown_test.go` (empty-registry, all-conns-get-StatusGoingAway, deadline-expiry-with-wall-clock-bound, real-`*WSConn`-idempotency under race), `registry_test.go` (Snapshot empty/membership/isolation/race-freedom), `main_e2e_test.go` (full boot + WS dial + cancel-sigCtx + assert `websocket.CloseStatus(err) == StatusGoingAway` + assert exit 0 within drainDeadline+2s). Out of scope: protocol-level reconnect/resume coordination (lives in `protocol-mobile.md`), in-flight frame persistence (relay is stateless), `--drain-deadline` CLI flag (10s constant for v1; promote when ops experience demands), SIGHUP config reload (no runtime-reloadable config exists). (#31). - [Upgrade-attempt and register-failure counters](features/upgrade-and-register-failure-counters.md) — two CounterVecs at the `/v1/server` + `/v1/client` upgrade call sites so operators can distinguish "flood of malformed headers" from "flood of 4409 conflicts" from "flood of 4429 phone-cap rejections" from "flood of rate-limited clients" without log-grepping. `pyrycode_relay_ws_upgrade_attempts_total{endpoint, outcome}` (`endpoint` ∈ `{server, client}`, `outcome` ∈ `{accept, reject_headers, reject_409, reject_404, reject_429, reject_rate_limit}`) is incremented exactly once per terminal decision; `pyrycode_relay_register_failures_total{kind}` (`kind` ∈ `{no_server, server_in_use, phones_at_cap, ratelimit}`) co-increments in lockstep with the matching `outcome=reject_*` cell. All 16 series are pre-bound via `WithLabelValues` in `NewUpgradeMetrics(reg prometheus.Registerer)` — including the three structurally unreachable cells (`server×reject_404`, `server×reject_429`, `client×reject_409`) which stay at 0 by construction — so ops dashboards see a stable scrape shape rather than series flickering into existence on first occurrence. Nine event-named, nil-receiver-safe methods (`ServerAccept`, `ServerHeaderReject`, `ServerIDConflict`, `ServerRateLimitDeny`, `ClientAccept`, `ClientHeaderReject`, `ClientNoServer`, `ClientPhonesAtCap`, `ClientRateLimitDeny`); the five composite methods are the lockstep contract — a single call bumps both the endpoint×outcome and kind cells, so tests cannot accidentally advance one without the other. **Two load-bearing seam choices**: (a) handler-parameter threading (`*UpgradeMetrics` as the final arg to `ServerHandler` / `ClientHandler`) instead of #58's hooks-on-`Registry`, because three of the five terminal sites (header-reject, success, rate-limit-deny) have NO Registry call to hook through — fan-out is 7 mechanical `, nil` test appends, under the 10-call-site red line; (b) `WrapServerRateLimitDeny` / `WrapClientRateLimitDeny` as a status-code observer wrapper stacked OUTSIDE the rate-limit middleware (#47), because adding a callback parameter to `NewRateLimitMiddleware` would cascade across 11 call sites — HTTP 429 in either pipe is unambiguously the rate-limit deny signal (header-gate writes 400, success writes 101, WS close codes 4404/4409/4429 travel inside the upgraded socket). The wrapper's `statusObserverResponseWriter` **must implement `http.Hijacker`** because the wrapped pipe terminates in `websocket.Accept` which type-asserts the writer for hijacking; without forwarding, the upgrade fails with HTTP 500 — a constraint that applies to any future middleware wrapping a WS upgrade. 
Increments fire structurally tight to the deciding branch (between `http.Error`/`logger.Info` and `return`) so "did this code path execute?" is one statement from "did the counter increment?". All three label values (`endpoint`, `outcome`, `kind`) are hard-coded compile-time constants set per event-named method — none derived from request headers, request paths, or any other attacker-influenced input; cardinality is exactly 16 regardless of traffic. Registered against the private `metricsReg` per ADR-0008 § Scope of use; package-level `TestMetricsRegistry_NoGlobalRegistrarLeak` plus a local sibling `TestUpgradeMetrics_NoGlobalRegistrarLeak` enforce no `DefaultRegisterer` leak structurally. The defensive `wsconn.Close()` non-`Is`-matched branches in both handlers stay MUST-NOT-increment — a future error variant would be a new outcome and belongs in a follow-up ticket. Out of scope: latency histograms for the upgrade handshake, a second HTTP-429 source on the upgrade pipes (would invalidate the observer's filter), source-IP-labelled attempt counters (would carry attacker-influenced label values) (#57). - [Per-IP rate-limit middleware (`/v1/server` + `/v1/client`)](features/rate-limit-middleware.md) — HTTP middleware that throttles WS upgrade attempts per source IP **before** `websocket.Accept` runs, before the registry is touched, before the mobile/binary token header is read. One new exported symbol: `relay.NewRateLimitMiddleware(limiter *IPRateLimiter, logger *slog.Logger, trustForwardedFor bool) func(http.Handler) http.Handler` — returns the conventional composition shape, matching `EnforceHost`. Per request: extract `ip := ClientIP(r, trustForwardedFor)`, deny on `ip == ""` (empty-IP guard runs **before** `Allow` because the limiter's `Allow("")` is a normal map key, not a deny — loud-failure rule: refuse un-attributable traffic rather than admit it un-throttled), deny on `!limiter.Allow(ip)`, otherwise `next.ServeHTTP`. Both deny paths emit one `logger.Warn("rate_limited", "remote", strconv.Quote(ip))` line — single allowlisted key, `strconv.Quote` defends against control-byte log injection, the empty-IP case renders `remote="\"\""` so operators can distinguish the two deny causes by the value. 429 body is empty (`http.Error(w, "", http.StatusTooManyRequests)` — matches the existing `BadRequest` shape in `server_endpoint.go` / `client_endpoint.go`, no internal state leaked). New `--trust-x-forwarded-for` CLI flag (default `false`, Usage carries the explicit spoofing warning) is threaded once into the factory. Policy lives at the wiring site in `cmd/pyrycode-relay/main.go`: `rateLimitRefillEvery = 6s`, `rateLimitBurst = 20`, `rateLimitEvictionInterval = 5min` → ~10 sustained attempts/IP/min with 20-attempt burst headroom; `5min` sweep is well above `burst*refillEvery = 120s`. One shared `*IPRateLimiter` is applied to both `mux.Handle` registrations — a misbehaving IP retrying against either endpoint shares one bucket; `/healthz` is intentionally NOT wrapped (must remain pollable from monitoring). `defer limiter.Close()` is best-effort (current `os.Exit` listener path skips defers; graceful shutdown out of scope). Concurrency safety fully delegated to the limiter; middleware is stateless, no new locks, no new goroutines. Mobile/binary `X-Pyrycode-Token` structurally not read on this code path. Why middleware not constructor injection: keeps frame-routing handlers ignorant of admission control, makes `/healthz`'s exemption fall out for free, mirrors `EnforceHost`. 
Closes the third leg of `docs/threat-model.md` § *DoS resistance* alongside #29 (per-frame size cap) and #30 (per-server-id phone cap); the "token-bucket rate limit on /v1/server and /v1/client" line moves from *Future hardening* to v1 mitigation. Out of scope: connection-count cap (different threat surface — *attempt rate* vs *resident count*), CIDR-aware trusted-proxy chain (today's flag is flat all-or-nothing), multi-instance shared-state (would need Redis), rate-limit metrics counter (future ticket parallel to #58), adaptive policy, graceful-shutdown signal handler (#47). - [Metrics listener (localhost-only)](features/metrics-listener.md) — separate `*http.Server` for `/metrics`, bound to a loopback IP literal (default `127.0.0.1:9090`); kept off the internet-exposed public listener that serves `/healthz` + `/v1/{server,client}` because metric values leak operational state. Two exports + one sentinel in `internal/relay/metrics_listen.go`: `ErrNonLoopbackBind` (branchable via `errors.Is`), `CheckLoopbackBind(addr)` (pure validator — `net.SplitHostPort` → `ListenerPort` for the port-0 / range / format rejects inherited from #81 → `net.ParseIP(host).IsLoopback()`; hostnames including `localhost:9090` are rejected even when they currently resolve to loopback — the DNS-time TOCTOU window is closed structurally, not by re-resolving), and `NewMetricsServer(addr, h)` (opt-out-aware: `addr == ""` → `(nil, nil)`, validator failure → `(nil, err)`, otherwise an `*http.Server` with the public listener's four timeouts duplicated literally so either listener can drift independently in a future ticket). Wired into both listener branches in `cmd/pyrycode-relay/main.go`: a goroutine launch mirroring the autocert mode's http-01 listener pattern (`os.Exit(1)` on `ListenAndServe` failure — loud-fail-over-silent because a relay booted with metrics enabled but silently not serving them would mislead operator scrapes), the metrics port joins **both** the `expected` and `actual` sets of #81's allowlist (declared secondary listener — must land on both sides of the asymmetric check). Empty-flag opt-out is structural via `metricsSrv != nil` guards at every reference site — no repeated `if *metricsListen == "" {}` branches. TLS / authn deliberately out of scope (loopback IS the defence); same-host adversary in scope but not a defence target; graceful shutdown deferred to #31. Three tests in `metrics_listen_test.go` — 12-row validator matrix, 3-row constructor matrix with timeouts pinned to literal values (not a shared constant), and an end-to-end happy-path that drives validator + constructor + `net.Listen("tcp", "127.0.0.1:0")` + actual `http.Get` round-trip by exploiting `http.Server.Serve(l)` ignoring `Addr` once a listener is supplied (#60). diff --git a/docs/knowledge/codebase/31.md b/docs/knowledge/codebase/31.md new file mode 100644 index 0000000..78d6387 --- /dev/null +++ b/docs/knowledge/codebase/31.md @@ -0,0 +1,74 @@ +# Ticket #31 — graceful shutdown on SIGTERM: drain WS connections before exit + +The relay had no signal handling. SIGTERM (fly machine update, container stop, systemd restart, k8s pod evict) killed in-flight WS frames mid-write and peers reconnected after a TCP reset. This ticket wires SIGTERM/SIGINT through a bounded drain: stop accepting new connections, emit `1001 StatusGoingAway` on every live WS conn, give peers a configurable window to ack, then force-close. Closes the "graceful-shutdown signal handler" gap that #47, #57, and #60 had explicitly deferred to this ticket. 
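+
+For orientation before the file-by-file notes, the entry-point shape in one screen. `main` and `signalContextFor` match the diff above; `run` is stubbed here purely for illustration (the real body parses flags, boots the listeners, and drains via `runServers`):
+
+```go
+package main
+
+import (
+	"context"
+	"os"
+	"os/signal"
+	"syscall"
+)
+
+// signalContextFor wraps signal.NotifyContext; the stop func is
+// deliberately discarded because receiving a signal here means the
+// process is exiting, so there is no "after" to restore handling for.
+func signalContextFor(sigs ...os.Signal) context.Context {
+	ctx, _ := signal.NotifyContext(context.Background(), sigs...)
+	return ctx
+}
+
+// run is stubbed for this sketch; the real body is in the main.go diff.
+func run(args []string, sigCtx context.Context) int {
+	<-sigCtx.Done()
+	return 0
+}
+
+func main() {
+	// os.Exit lives only here, so tests can drive run with a
+	// synthetic context instead of real signals.
+	os.Exit(run(os.Args[1:], signalContextFor(syscall.SIGTERM, syscall.SIGINT)))
+}
+```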
+ +## Implementation + +- **`internal/relay/registry.go` (+30 lines)** — new `func (r *Registry) Snapshot() []Conn` accessor mirroring `PhonesFor`'s shape: + - `RLock` for the duration of the copy; iterate `r.binaries` + `r.phones`; return a freshly-allocated `[]Conn`. The total size is computed under the lock (`len(r.binaries) + sum(len(s) for s in r.phones)`) so the slice is allocated once with `make([]Conn, 0, total)` — no resize churn during the copy. + - Returns nil on empty registry (matches `PhonesFor`'s nil-on-empty convention). + - Order is unspecified (map-iteration order); the only consumer (shutdown helper) doesn't care. + - The returned `Conn` handles are the same references the registry holds; calling `Close` on them affects the live connection. The accessor preserves the passive-registry pattern (ADR-0003) — slow work (`CloseWithCode` fan-out) happens entirely outside the registry lock. +- **`internal/relay/shutdown.go` (new, ~85 lines)** — one exported function plus a private adapter interface: + - `func Shutdown(ctx context.Context, logger *slog.Logger, reg *Registry, servers ...*http.Server) error` — concurrent fan-out of two parallel drains tracked by a single `sync.WaitGroup`: (a) one goroutine per `*http.Server` calling `srv.Shutdown(ctx)` (closes the listener synchronously, then waits for non-hijacked handlers); (b) one goroutine per snapshotted `Conn` calling `c.CloseWithCode(websocket.StatusGoingAway, "shutting down")` if the conn satisfies the unexported `gracefulCloser` interface, else falling back to `c.Close()`. After both fan-outs are launched, the helper races a done-channel (closed when the WaitGroup hits zero) against `ctx.Done()`. On ctx-first, iterate `servers` and call `srv.Close()` to force-close any non-WS conns still pending; return `ctx.Err()`. On done-first, return `nil`. + - **`gracefulCloser` interface** is `interface { CloseWithCode(websocket.StatusCode, string) }` — `*WSConn` satisfies it (ADR-0007); the `Conn` interface intentionally does not, so test fakes and any future `Conn` impls that have no code-aware close fall back to plain `Close()`. The type assertion `c.(gracefulCloser)` is the single seam. + - **Errors from `srv.Shutdown` are logged but never abort the drain.** Every server gets a Shutdown call, every conn gets a close call. The function's return value reflects only the deadline outcome (`nil` clean, `ctx.Err()` timed out). `context.Canceled` / `context.DeadlineExceeded` from `srv.Shutdown` are filtered out of the error log to keep the deadline path quiet. + - **Concurrent close fan-out is load-bearing.** `nhooyr.io/websocket.Conn.Close` blocks up to 5s waiting for the peer's reciprocal close handshake (lessons.md). Serial fan-out would scale linearly in conn count and blow past the 10s deadline at the second stuck peer. One goroutine per conn keeps wall-clock bounded at ~5s regardless of conn count. The handler goroutines themselves are not joined — `CloseWithCode` unblocks the handler's `Read`, the handler unwinds through its existing defers, process exit reaps anything still alive. +- **`cmd/pyrycode-relay/main.go` (152 lines changed, +102/-50)** — uniform listener+drain pattern across both modes: + - `main` now delegates to `os.Exit(run(os.Args[1:], signalContextFor(syscall.SIGTERM, syscall.SIGINT)))`. The split exists so `main_e2e_test.go` can drive `run` with a synthetic `context.Context` instead of the real signal handler. 
+ - `signalContextFor(sigs ...os.Signal) context.Context` wraps `signal.NotifyContext` and discards the stop func — the helper is process-scoped and never explicitly stopped. + - `run(args []string, sigCtx context.Context) int` returns an `int` exit code instead of calling `os.Exit` from the listener goroutines. All five pre-existing `os.Exit(1)` listener-error exits (insecure listener, autocert https, autocert http-01, metrics listener × 2) collapse into one shape: a buffered `listenerErr chan error` in `runServers`. + - **`runServers(sigCtx, logger, reg, servers, listen)`** — launches one goroutine per `*http.Server` invoking `listen(s)`; on a non-`http.ErrServerClosed` return, sends the error to the 1-slot buffered channel (first error wins, subsequent non-blocking sends drop). A `select` blocks on `<-sigCtx.Done()` or `<-listenerErr`. Either branch builds `drainCtx, cancel := context.WithTimeout(context.Background(), drainDeadline)`, calls `relay.Shutdown(drainCtx, logger, reg, servers...)`, and returns 0 (signal) or 1 (listener error). + - **`drainDeadline = 10 * time.Second`** — declared as a `const` at the top of `main.go` per the established policy-values-at-wiring-site convention (#21, #60, #79). Leaves ~5s of headroom beyond a single close-handshake; bumped here without touching the helper. + - The `listen func(*http.Server) error` closure is the seam for the per-server choice between `ListenAndServe` and `ListenAndServeTLS("", "")` — `if s == httpsSrv { return s.ListenAndServeTLS("", "") }` in the autocert mode is the one site where the autocert HTTPS server diverges. Insecure and metrics modes pass `s.ListenAndServe()` uniformly. + - `defer limiter.Close()` (introduced by #47) now actually runs — `run` returns normally on every path that used to `os.Exit` from a listener goroutine. +- **`internal/relay/registry_test.go` (+113 lines)** — four new tests for `Snapshot`: + - `TestSnapshot_EmptyRegistry` — pins `nil` (not zero-length slice) for an empty registry, matching `PhonesFor`'s contract. + - `TestSnapshot_IncludesBinariesAndPhones` — registers 2 binaries + 3 phones across 2 server-ids; asserts `len == 5` and the ConnID set membership (no order assumption). + - `TestSnapshot_FreshSliceIsolation` — mutates the returned slice (`snap[0] = nil`, `snap = snap[:0]`); asserts a subsequent `Snapshot` is unaffected and a new `BinaryFor` / `PhonesFor` still returns the original handles; then registers a third entry and confirms the new snapshot reflects it. + - `TestSnapshot_RaceFreedom` — hammers `Snapshot` concurrently with `ClaimServer` / `ReleaseServer` / `RegisterPhone` across multiple goroutines under `-race`; the assertion is the absence of a panic or data race, plus an internal-consistency check that each individual snapshot's `len` matches its sample's binary+phone counts. +- **`internal/relay/shutdown_test.go` (new, 262 lines)** — four tests, each `t.Parallel()`-safe: + - `shutdownFakeConn` extends `fakeConn`'s pattern with a captured `closeCode` / `closeMsg` and an optional `blockFor time.Duration` knob for the deadline-expiry test. `Close()` delegates to `CloseWithCode(StatusNormalClosure, "")` mirroring `*WSConn`. Locked with its own `sync.Mutex`; the `snapshot()` helper returns the captured state. + - `newTestServer(t)` binds a fresh loopback port via `net.Listen("tcp", "127.0.0.1:0")`, launches `srv.Serve(ln)` in a goroutine, and returns the server plus a buffered `errCh` carrying the `Serve` return value. 
The Listen-then-Serve shape sidesteps `ListenAndServe`'s race between dial and accept.
+  - `TestShutdown_EmptyRegistry` — covers both no-args and one-server-no-conns shapes; both must return nil and the server's `Serve` must return `http.ErrServerClosed`.
+  - `TestShutdown_ClosesAllConnsWithGoingAway` — registers 1 binary + 2 phones, runs `Shutdown` with a 2s deadline, asserts all three conns observed `CloseWithCode(StatusGoingAway, …)` and the server's listener closed.
+  - `TestShutdown_DeadlineExpiryReturnsCtxErr` — registers a conn whose `CloseWithCode` blocks 500ms; passes a 50ms deadline; asserts `errors.Is(err, context.DeadlineExceeded)`, asserts wall-clock elapsed < 300ms (the helper must NOT wait for the stuck close; the select racing `ctx.Done()` is what keeps the deadline path prompt), asserts `Serve` returned `http.ErrServerClosed` (force-close path).
+  - `TestShutdown_CloseIdempotentOnRealWSConn` — pins the assumption that `closeOnce` on `*WSConn` (#7, ADR-0007) makes the shutdown-path `CloseWithCode` and a concurrent handler-defer `Close` safe under race. Spins up a real `httptest`-style server, `websocket.Accept`s the dial, and races `CloseWithCode(StatusGoingAway)` + `Close()` on the same `*WSConn` — both must return.
+- **`cmd/pyrycode-relay/main_e2e_test.go` (new, 149 lines)** — two end-to-end tests driving the `run` entry point with a synthetic `sigCtx`:
+  - `TestRun_SigtermClosesWSConnsWithGoingAway` — `freePort()`s a loopback address, calls `run(["--insecure-listen", addr, "--metrics-listen", ""], sigCtx)` in a goroutine, `waitForDial`s the listener, opens a real `/v1/server` WS upgrade (with valid `X-Pyrycode-Server` / `X-Pyrycode-Version` / `User-Agent` headers), cancels `sigCtx`, asserts the next `c.Read` returns a close error with `websocket.CloseStatus(err) == StatusGoingAway`, asserts `run` returns exit code 0 within `drainDeadline + 2s`.
+  - `TestRun_SigtermWithNoConns` — the empty-registry happy path: boot, no dials, cancel, exit 0; also asserts the listener is no longer dialable after `run` returns (the listener actually closed, not just the goroutine ended).
+  - `freePort()` opens `127.0.0.1:0`, captures the assigned port, closes the listener, returns the addr string. The inherent close→rebind race window is short enough to be reliable in practice; tests use distinct ports per case so failures don't cascade.
+
+## Acceptance criteria — verification map
+
+- **AC-1** (SIGTERM/SIGINT triggers shutdown, exit 0 on completion or deadline): `signalContextFor` registers both signals; `runServers` returns 0 on the `<-sigCtx.Done()` branch regardless of whether `Shutdown` returns nil or `ctx.Err()`. Test: `TestRun_SigtermClosesWSConnsWithGoingAway`, `TestRun_SigtermWithNoConns`.
+- **AC-2** (during shutdown, new `/v1/server` and `/v1/client` upgrades do not complete): `srv.Shutdown(ctx)` closes the listener synchronously at function entry, so a dial arriving after `Shutdown` is invoked sees `ECONNREFUSED`. Test: `TestRun_SigtermWithNoConns` asserts `net.DialTimeout` fails after `run` returns.
+- **AC-3** (every registered WS conn receives a 1001 close frame): `Shutdown` snapshots via `reg.Snapshot()` and fans `CloseWithCode(StatusGoingAway, …)` out concurrently. Tests: `TestShutdown_ClosesAllConnsWithGoingAway` (unit), `TestRun_SigtermClosesWSConnsWithGoingAway` (e2e — peer reads `StatusGoingAway` over a real WS). 
+- **AC-4** (configurable deadline; force-close at deadline; default 10s): `drainDeadline = 10*time.Second` at the wiring site; `Shutdown`'s `select` returns `ctx.Err()` and force-closes via `srv.Close()` on deadline. Test: `TestShutdown_DeadlineExpiryReturnsCtxErr`. +- **AC-5** (both `--insecure-listen` and `--domain` modes handle the signal identically): `runServers` is the single sink for both branches; the only divergence is the `listen` closure that maps `httpsSrv → ListenAndServeTLS`. Both branches now build `servers []*http.Server` (including `metricsSrv` when non-nil) and hand it to the same helper. + +## Patterns established + +- **Listener-error → drain-via-helper instead of `os.Exit(1)` from inside the listener goroutine.** Previously each listener goroutine could `os.Exit(1)` directly, which made `defer limiter.Close()` (#47) best-effort and made graceful drain structurally impossible. The new shape buffers the first listener error to a 1-slot channel, lets `main`'s goroutine see it, and runs `Shutdown` before returning. Carry forward: any future listener added to `runServers`'s `servers` slice inherits both the drain and the listener-error pathway for free; no new `os.Exit` calls in goroutines. +- **`run(args, ctx) int` as the testable entry point.** Splitting `main` into a `run(args []string, sigCtx context.Context) int` function whose only side effect is the return-int-to-`os.Exit` lets `main_e2e_test.go` drive the full boot + drain sequence with a synthetic `context.Context` instead of forking a subprocess and sending real signals. The test code is ordinary Go, races correctly under `-race`, and exercises the listener + drain together. The `os.Args[1:]` + `signalContextFor` glue stays in `main` and is uncovered by tests; everything below it is in `run` and is coverable. Same shape as the developer convention for any `cmd/`-resident binary going forward. +- **Concurrent fan-out for any operation where each item can block up to N seconds.** The 5s close-handshake gate (lessons.md) is the canonical example; the rule generalises: when an iteration's per-item wall-clock cost is bounded but non-trivial, launch one goroutine per item and join with a `sync.WaitGroup` + done-channel. Serial iteration would make the wall-clock cost scale linearly in N and would force the deadline to grow without bound. +- **Type-assertion seam for "some impls have richer behaviour".** `gracefulCloser` is a private interface that `*WSConn` satisfies but the registry's `Conn` interface deliberately does not. The shutdown helper type-asserts at use time and falls back to the narrower contract (`Close()`) when the assertion fails. Tests can write the narrow contract (`shutdownFakeConn` here, `fakeConn` in `registry_test.go`) without forcing every test fake to implement methods that have no behaviour to verify. Carry forward: when a new caller needs richer-than-`Conn` behaviour on a registered conn, add a private interface in the caller's file and assert there, rather than widening `Conn`. + +## Lessons learned + +- **`http.Server.Shutdown(ctx)`'s "do not wait for hijacked conns" contract is the load-bearing fact.** The natural reading of "graceful shutdown" is `srv.Shutdown(ctx)` and done — but that gives the WS handler goroutines no signal to unwind. The handler is blocked in `Read` on the hijacked conn; `Shutdown` won't touch hijacked conns; the conn won't close until the handler's defer runs; the handler's defer won't run until `Read` returns. 
The only break in the cycle is to close the conn ourselves from outside the handler. That's why the registry has to be enumerable from the shutdown path, and why `Snapshot()` exists.
+- **`srv.Close()` after deadline expiry is a separate force-close, not redundant.** `srv.Shutdown(ctx)` returns when `ctx` expires, but it never force-closes conns that are still mid-request: it closes the listeners and idle conns up front, then simply stops waiting. The deadline-path explicit `srv.Close()` loop in `Shutdown` reaps those still-active, non-hijacked conns; without it, they would stay open until the process actually exits. `TestShutdown_DeadlineExpiryReturnsCtxErr` asserts elapsed wall-clock < 300ms specifically to catch a regression that drops the force-close.
+- **`signal.NotifyContext`'s discarded `stop` func is intentional, not lazy.** The returned `stop` would restore default signal handling for the listed signals, which is useful if the program continues running after handling a signal. Here, receiving a signal means we're exiting — there's no "after". Discarding `stop` with `_` is the right shape; the lint suggestion to capture it would just produce an always-unused local.
+- **The buffered-1 listener error channel was not obvious — a buffered-N would be wrong.** If we used `make(chan error, len(servers))`, all listener goroutines that error in a tight time window would each write, and a slow consumer could observe a stale error after the first one already triggered drain. Buffered-1 + non-blocking `select { case listenerErr <- err: default: }` is "first error wins, additional errors are logged and dropped" — exactly the semantics we want: one error is enough to trigger drain, more errors don't change behaviour. A runnable sketch of this pattern closes this note.
+- **The `t.Parallel()` posture on `shutdown_test.go` survives `freePort`-style port grabs because each test gets a distinct ephemeral port.** `freePort` close→rebind windows are independent across tests; tests do not share a port. Had we hard-coded a port, parallelism would have caused intermittent `EADDRINUSE`. Same constraint applies to any future test that uses `net.Listen("tcp", "127.0.0.1:0")` + close + reuse.
+
+## Cross-links
+
+- [Feature: Graceful shutdown on SIGTERM](../features/graceful-shutdown.md) — evergreen feature doc.
+- [Architect spec](../../specs/architecture/31-graceful-shutdown-sigterm.md) — full design rationale, including the rejected alternatives (serial close fan-out, dedicated drain goroutine, in-handler ctx-cancel propagation).
+- [ADR-0003: Connection registry as a passive store](../decisions/0003-connection-registry-passive-store.md) — the contract `Snapshot()` preserves (no callbacks under the registry lock; slow work outside).
+- [ADR-0007: `WSConn.CloseWithCode` for active-conn application close codes](../decisions/0007-wsconn-closewithcode-for-active-conn.md) — the `closeOnce` idempotency that lets the shutdown path race the handler-defer.
+- [Codebase note #47](47.md) — `defer limiter.Close()` is no longer best-effort; it runs on every shutdown path now.
+- [Codebase note #60](60.md) — metrics listener; the third `*http.Server` that joins `runServers`'s slice.
+- [Codebase note #7](7.md) — `WSConn.Close` / `CloseWithCode` semantics; the per-conn primitive used by the fan-out.
+- [Lessons § nhooyr Close 5s handshake](../../lessons.md) — the constraint that forces concurrent fan-out.
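+
+A runnable, self-contained illustration of the buffered-1 semantics from the lessons above; the sequential loop stands in for listener goroutines erroring in a tight window, and the names are illustrative rather than copied from `main.go`:
+
+```go
+package main
+
+import (
+	"errors"
+	"fmt"
+)
+
+func main() {
+	// 1-slot buffer: the first send fills it; later non-blocking
+	// sends fall through to default and are dropped, never queued.
+	listenerErr := make(chan error, 1)
+	for i := 0; i < 3; i++ {
+		err := fmt.Errorf("listener %d: %w", i, errors.New("bind failed"))
+		select {
+		case listenerErr <- err:
+		default: // first error already buffered; drop this one
+		}
+	}
+	fmt.Println(<-listenerErr) // always "listener 0: bind failed"
+}
+```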
diff --git a/docs/knowledge/features/autocert-tls.md b/docs/knowledge/features/autocert-tls.md
index 2dbc88f..77dd43c 100644
--- a/docs/knowledge/features/autocert-tls.md
+++ b/docs/knowledge/features/autocert-tls.md
@@ -98,7 +98,7 @@ Two goroutines, no shared mutable state:
 1. Main — `httpsSrv.ListenAndServeTLS("", "")`. The empty cert/key paths defer to `TLSConfig.GetCertificate`, which `manager.TLSConfig()` populates. This is the documented autocert pattern.
 2. Background — `httpSrv.ListenAndServe()` for ACME http-01.
 
-Either listener failing → log + `os.Exit(1)`. No graceful shutdown (mirrors the insecure path).
+Either listener failing → first error wins via #31's buffered `listenerErr` channel; `relay.Shutdown` drains both listeners (and the metrics listener) together; `run` returns exit 1. On SIGTERM/SIGINT both listeners drain via the same path and `run` returns exit 0.
 
 The `*autocert.Manager` is constructed once and used read-only; its internal locking is autocert's contract.
diff --git a/docs/knowledge/features/graceful-shutdown.md b/docs/knowledge/features/graceful-shutdown.md
new file mode 100644
index 0000000..83ed127
--- /dev/null
+++ b/docs/knowledge/features/graceful-shutdown.md
@@ -0,0 +1,143 @@
+# Graceful shutdown on SIGTERM
+
+On `SIGTERM` or `SIGINT`, the relay drains in-flight WebSocket connections to a clean `1001 StatusGoingAway` close before exiting. Connected phones and binaries observe the close code on the wire and run their existing reconnect path instead of seeing a TCP reset mid-frame.
+
+Best-effort by design: the drain has a bounded deadline (default 10s), and conns still alive at expiry are force-closed. Phones / binaries that have stopped reading from their socket will hit the 5s close-handshake gate (per the underlying `nhooyr.io/websocket` library) — the deadline budgets for one such stuck peer per fan-out goroutine and force-closes the rest.
+
+## Contract
+
+- **Signals handled**: `SIGTERM`, `SIGINT`. Captured via `signal.NotifyContext` at program entry. A second signal during drain is ignored — `NotifyContext` cancels its context once. `SIGKILL` is unhandled (the kernel reaps the process and no Go code runs).
+- **Exit codes**:
+  - `0` — drain triggered by a signal (clean operator action). Returned whether `Shutdown` completed cleanly or hit the deadline.
+  - `1` — drain triggered by a listener error (`ListenAndServe[TLS]` returned a non-`http.ErrServerClosed` error). Process supervisors should restart.
+  - `2` — boot-time configuration refusal (unchanged from before this ticket).
+- **New-connection refusal**: the listener is closed synchronously at the start of the drain (`http.Server.Shutdown(ctx)` shuts down the listener before waiting on handlers). Dials arriving after the drain begins see `ECONNREFUSED`; in-progress upgrades that haven't completed their handshake fail.
+- **Close code on the wire**: every WS conn alive at the moment of drain receives a WebSocket close frame with code 1001 (`StatusGoingAway`) and reason `"shutting down"`. Idempotent: a handler-side `Close` racing the shutdown-side `CloseWithCode` does not double-emit (ADR-0007, `closeOnce`). See the peer-side sketch after this list.
+- **Drain deadline**: 10 seconds by default, declared as `const drainDeadline = 10 * time.Second` at the wiring site in `cmd/pyrycode-relay/main.go`. Conns whose close handshake has not completed by the deadline are abandoned: `http.Server.Close()` force-closes the non-hijacked stragglers, and the hijacked WS sockets close when the process exits (`Close` never touches hijacked conns).
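+
+What the contract above looks like from a peer's side: a sketch using `nhooyr.io/websocket` that mirrors the assertion in `main_e2e_test.go` (the `awaitDrain` helper is illustrative, not part of the relay):
+
+```go
+package peer
+
+import (
+	"context"
+
+	"nhooyr.io/websocket"
+)
+
+// awaitDrain blocks on Read and reports whether the relay announced a
+// graceful drain (1001 StatusGoingAway) rather than failing some other
+// way. CloseStatus returns -1 when err carries no close frame, e.g. a
+// TCP reset after a deadline-expiry force-close.
+func awaitDrain(ctx context.Context, c *websocket.Conn) bool {
+	_, _, err := c.Read(ctx)
+	return websocket.CloseStatus(err) == websocket.StatusGoingAway
+}
+```
+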
+ +## API + +Package `internal/relay` (`shutdown.go`): + +```go +func Shutdown( + ctx context.Context, + logger *slog.Logger, + reg *Registry, + servers ...*http.Server, +) error +``` + +- Returns `nil` on clean drain within `ctx`'s deadline. +- Returns `ctx.Err()` (`context.DeadlineExceeded`) on deadline expiry; force-closes each `*http.Server` via `srv.Close()` before returning. +- Errors from `srv.Shutdown` are logged via `logger` but do NOT abort the drain — every server gets a `Shutdown` call, every conn gets a close call. The function's return value reflects only the deadline outcome. +- Safe to call exactly once per process. Concurrent or repeat invocations are undefined. + +Package `internal/relay` (`registry.go`): + +```go +func (r *Registry) Snapshot() []Conn +``` + +- Freshly-allocated slice of every `Conn` (binaries and phones) registered at the moment of the call. +- `nil` on empty registry. +- Order unspecified. +- Snapshotted under `RLock`; the caller iterates and closes outside the lock — preserves the passive-registry pattern (ADR-0003). + +## Sequence + +1. `main` calls `signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)` and threads the resulting context into `run`. +2. `run` launches one goroutine per `*http.Server` (insecure: public + optional metrics; autocert: HTTPS + HTTP-01 + optional metrics) invoking `ListenAndServe` or `ListenAndServeTLS("", "")`. Each goroutine forwards non-`http.ErrServerClosed` returns to a 1-slot buffered `listenerErr` channel (first error wins, additional errors dropped). +3. `run` `select`s on `<-sigCtx.Done()` (operator-triggered) or `<-listenerErr` (listener-triggered). +4. Either branch builds `drainCtx, cancel := context.WithTimeout(context.Background(), drainDeadline)` and calls `relay.Shutdown(drainCtx, logger, reg, servers...)`. +5. Inside `Shutdown`: + - For each server: launch a goroutine running `srv.Shutdown(drainCtx)` — closes the listener synchronously, waits for non-hijacked handlers. + - Snapshot `reg.Snapshot()`. For each conn: launch a goroutine running `c.CloseWithCode(StatusGoingAway, "shutting down")` (or plain `c.Close()` if the conn doesn't implement the unexported `gracefulCloser` interface). The fan-out unblocks every WS handler's `Read`, which lets each handler unwind through its existing defers. + - Wait on `done` (WaitGroup zero) or `drainCtx.Done()`. On ctx-first, iterate `servers` and call `srv.Close()` to force-close any non-WS conn or listener still pending; return `ctx.Err()`. On done-first, return `nil`. +6. `run` returns 0 or 1 based on the trigger; `main` calls `os.Exit(run(...))`. + +## Concurrency model + +During a drain, the relay holds: + +- **Listener goroutines** (1 per `*http.Server`) — block in `ListenAndServe[TLS]` until `srv.Shutdown` is called, at which point they return `http.ErrServerClosed` and exit. +- **Per-server `Shutdown` goroutines** (1 per server, inside the helper) — wait for non-hijacked handlers and the listener-close. Tracked by the helper's `WaitGroup`. +- **Per-conn close goroutines** (1 per snapshotted conn, inside the helper) — each may block up to 5s on the WS close handshake. Run in parallel so wall-clock stays bounded at ~5s regardless of conn count. Tracked by the same `WaitGroup`. +- **Handler goroutines** (1 per live WS) — NOT joined by the helper. `CloseWithCode` unblocks the handler's `Read`; the handler unwinds through its existing `defer reg.ScheduleReleaseServer / wsconn.Close` chain. Process exit reaps anything still alive at the deadline. 
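+
+The fan-out-and-race skeleton those bullets describe, as a sketch. `Conn` and `gracefulCloser` are simplified stand-ins and logging is elided; this mirrors the structure of `internal/relay/shutdown.go` as documented here, not its exact source:
+
+```go
+package shutdownsketch
+
+import (
+	"context"
+	"net/http"
+	"sync"
+
+	"nhooyr.io/websocket"
+)
+
+// Simplified stand-ins for the real internal/relay types.
+type Conn interface{ Close() error }
+
+type gracefulCloser interface {
+	CloseWithCode(websocket.StatusCode, string)
+}
+
+func drain(ctx context.Context, conns []Conn, servers []*http.Server) error {
+	var wg sync.WaitGroup
+	for _, srv := range servers {
+		wg.Add(1)
+		go func(s *http.Server) {
+			defer wg.Done()
+			_ = s.Shutdown(ctx) // closes the listener, then waits on non-hijacked handlers
+		}(srv)
+	}
+	for _, c := range conns {
+		wg.Add(1)
+		go func(c Conn) {
+			defer wg.Done()
+			if gc, ok := c.(gracefulCloser); ok {
+				gc.CloseWithCode(websocket.StatusGoingAway, "shutting down")
+				return
+			}
+			_ = c.Close() // narrow fallback for impls without a code-aware close
+		}(c)
+	}
+	done := make(chan struct{})
+	go func() { wg.Wait(); close(done) }()
+	select {
+	case <-done:
+		return nil
+	case <-ctx.Done():
+		// Shutdown never force-closes still-active conns on ctx
+		// expiry; do it explicitly so stragglers don't linger.
+		for _, srv := range servers {
+			_ = srv.Close()
+		}
+		return ctx.Err()
+	}
+}
+```
+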
**Ordering inside the helper is structural, not enforced by a barrier.** The listener-close goroutine and the close-fan-out are launched in sequence on the helper's goroutine; the snapshot is taken after the per-server `Shutdown` calls are in flight, so no new WS handshake can complete between snapshot and close (the listener is already closed). A conn that registered *just before* the snapshot is covered; a conn that tried to register *just after* the snapshot was started never got its handshake accepted.
+
+**Why concurrent fan-out is load-bearing**: `nhooyr.io/websocket.Conn.Close` blocks up to 5s waiting for the peer's reciprocal close (see [Lessons § nhooyr Close 5s handshake](../../lessons.md)). Serial iteration would scale linearly in conn count — two stuck peers would already blow past the 10s deadline. The drain deadline budgets for one such 5s handshake plus ~5s of headroom; concurrency keeps that budget achievable.
+
+## Force-close vs graceful close
+
+`http.Server.Shutdown(ctx)` does NOT call `srv.Close()` internally when `ctx` expires: it closes the listeners and idle conns up front, then simply stops waiting, and conns still mid-request stay open. The helper's deadline-path explicit `srv.Close()` loop reaps those stragglers; without it, they would stay open until process exit.
+
+`Close()` is the difference between "deadline expired, straggler conns linger until the process exits" and "deadline expired, immediate force-close". The unit test `TestShutdown_DeadlineExpiryReturnsCtxErr` asserts wall-clock < 300ms after a 50ms deadline specifically to catch a regression that drops the force-close.
+
+## Wiring (`cmd/pyrycode-relay/main.go`)
+
+Three structural seams:
+
+1. **Process entry split**: `main` is one line — `os.Exit(run(os.Args[1:], signalContextFor(syscall.SIGTERM, syscall.SIGINT)))`. The split exists so end-to-end tests in `main_e2e_test.go` can drive `run` with a synthetic `context.Context` instead of forking a subprocess to send real signals.
+2. **`run(args []string, sigCtx context.Context) int`**: builds the servers slice for the active mode (insecure: 1 + optional metrics; autocert: 2 + optional metrics) and delegates to `runServers`.
+3. **`runServers(sigCtx, logger, reg, servers, listen)`**: single sink for both modes. The `listen func(*http.Server) error` closure abstracts the per-server choice between `ListenAndServe` and `ListenAndServeTLS("", "")`.
+
+The `defer limiter.Close()` introduced by #47 (per-IP rate limiter eviction goroutine) now runs on every shutdown path — previously the `os.Exit(1)` calls inside listener goroutines skipped it.
+
+## Threat model alignment
+
+- **DoS resistance**: an attacker cannot stall the relay's shutdown beyond `drainDeadline`. Per-conn close goroutines are independent; one stuck peer doesn't block another's close. At deadline, `srv.Close()` force-closes regardless of peer state.
+- **Information disclosure**: the close reason `"shutting down"` is hardcoded and operator-visible only on the wire to already-connected peers; no env/host/process info leaks.
+- **Log hygiene**: the shutdown-path log lines emit only `err` (the wrapped `ctx.Err()` for deadline expiry) and the static "shutdown complete" / "shutdown signal received; draining" / "listener error; draining" / "drain incomplete" messages. No attacker-influenced values. 
+- **Sibling-defenses preserved**: the rate-limiter, header gate, registry caps all stay in force during drain — the listener is closed before the snapshot, so no new connection can be accepted; existing handlers continue to enforce their invariants until they unwind. + +## Failure modes + +| Failure | Surface | Recovery | +| --- | --- | --- | +| `SIGTERM` / `SIGINT` arrives during steady state | Drain runs; exit 0 | Operator restarts process or supervisor moves on | +| `ListenAndServe[TLS]` returns a non-`ErrServerClosed` error | Drain runs (best-effort close of any live conns); exit 1 | Process supervisor restarts | +| Drain deadline expires before all close handshakes complete | `srv.Close()` force-closes listeners; helper returns `context.DeadlineExceeded`; exit 0 (signal trigger) or 1 (listener-error trigger) | None — peers whose handshakes didn't complete see a TCP reset on those specific conns (no regression from pre-#31 behaviour) | +| Second `SIGTERM` arrives during drain | Ignored (`NotifyContext` cancels its context once) | Send `SIGKILL` if drain is wedged; kernel reaps the process | +| Empty registry at drain start | `Snapshot()` returns nil; close fan-out is a no-op; per-server `Shutdown` calls return promptly; helper returns nil | n/a | + +## What this deliberately does NOT do + +- **Reconnect / resume protocol coordination with peers** — the relay sends `1001` and exits; phones and binaries handle reconnect per their own logic. Resume semantics live in `pyrycode/pyrycode/docs/protocol-mobile.md` and are out of scope here. +- **Wait for handler goroutines to finish** — the helper unblocks handlers via `CloseWithCode` but does not join them. The handler unwind path is fast (existing defers) and any goroutine still alive at process exit is reaped by the kernel. +- **Persist in-flight frames** — the relay is stateless. Frames mid-write at drain time are lost; the peer's reconnect path re-establishes state. +- **Special-case the metrics listener** — `metricsSrv` joins the same `servers` slice as the public listener(s) and goes through the same `srv.Shutdown` / `srv.Close` path. Loopback scrapers see the same `ECONNREFUSED` after drain as any other client. +- **Configurable deadline via CLI flag** — 10s is a wiring-site constant. A future ticket can promote it to a flag if operational experience demands it; the helper takes `context.Context` so the change is localised. +- **Drain on `SIGHUP`** — `SIGHUP` is typically a config-reload signal; the relay has no runtime-reloadable config (everything is flags or build-time), so handling it would have no caller. Add when needed. + +## Testing + +`internal/relay/shutdown_test.go`: + +- `TestShutdown_EmptyRegistry` — no-args and one-server-no-conns; both return nil; `Serve` returns `http.ErrServerClosed`. +- `TestShutdown_ClosesAllConnsWithGoingAway` — 1 binary + 2 phones; all three observe `CloseWithCode(StatusGoingAway, …)`; server's listener closes. +- `TestShutdown_DeadlineExpiryReturnsCtxErr` — stuck conn + 50ms deadline; `errors.Is(err, context.DeadlineExceeded)`; wall-clock < 300ms; `Serve` returns `http.ErrServerClosed` (force-close path). +- `TestShutdown_CloseIdempotentOnRealWSConn` — races `CloseWithCode(StatusGoingAway)` + `Close()` on the same real `*WSConn`; both return. + +`internal/relay/registry_test.go`: + +- `TestSnapshot_EmptyRegistry` — pins `nil` on empty. +- `TestSnapshot_IncludesBinariesAndPhones` — set-membership over ConnIDs (no order). 
+- `TestSnapshot_FreshSliceIsolation` — mutating the returned slice does not affect the registry. +- `TestSnapshot_RaceFreedom` — concurrent `Snapshot` + `ClaimServer` / `ReleaseServer` / `RegisterPhone` under `-race`. + +`cmd/pyrycode-relay/main_e2e_test.go`: + +- `TestRun_SigtermClosesWSConnsWithGoingAway` — full boot via `run`, real WS dial against `/v1/server`, cancel the synthetic sigCtx, assert the next `Read` returns a close error with `websocket.CloseStatus(err) == StatusGoingAway`, assert exit code 0 within `drainDeadline + 2s`. +- `TestRun_SigtermWithNoConns` — empty-registry happy path; also asserts listener is no longer dialable after `run` returns. + +## Related + +- [Codebase: #31](../codebase/31.md) — implementation notes for this ticket. +- [ADR-0003: Connection registry as a passive store](../decisions/0003-connection-registry-passive-store.md) — the pattern `Snapshot` preserves. +- [ADR-0007: `WSConn.CloseWithCode` for active-conn application close codes](../decisions/0007-wsconn-closewithcode-for-active-conn.md) — the `closeOnce` idempotency the shutdown path inherits. +- [Connection registry](connection-registry.md) — the registry whose `Snapshot()` accessor this ticket adds. +- [Per-IP rate-limit middleware](rate-limit-middleware.md) — `defer limiter.Close()` now runs on every shutdown path. +- [Metrics listener (localhost-only)](metrics-listener.md) — the third `*http.Server` that joins the drain. +- [Lessons § nhooyr Close 5s handshake](../../lessons.md) — the constraint that forces concurrent fan-out. diff --git a/docs/knowledge/features/metrics-listener.md b/docs/knowledge/features/metrics-listener.md index 5c6a721..d3bf9b3 100644 --- a/docs/knowledge/features/metrics-listener.md +++ b/docs/knowledge/features/metrics-listener.md @@ -85,7 +85,7 @@ One new goroutine: the metrics listener's `srv.ListenAndServe()`. Started only a - TLS on the metrics listener — loopback is the entire defence (see *Threat model*). - Authentication on `/metrics` — same rationale. -- Graceful shutdown of either listener — the public listener has none either; SIGTERM-driven shutdown is open ticket #31, which will retrofit both listeners together. Structuring `metricsSrv` as a top-level local in `main` keeps that retrofit a localised edit. +- Per-listener teardown logic baked into this file — graceful shutdown (#31) lives in `internal/relay/shutdown.go`, takes `servers ...*http.Server`, and the metrics server joins that variadic alongside the public listener(s). The "top-level local in `main`" shape that this feature established was the seam that made #31 a localised edit. - A `--metrics-listen` unix-socket form — out of scope; would need its own threat-model review (file permissions, peer authentication). - A second `/metrics` collector beyond #61's `NewConnectionsMetrics` — siblings #57 / #58 append more `NewXxxMetrics(metricsReg, …)` calls at the wiring site this ticket establishes. diff --git a/docs/knowledge/features/rate-limit-middleware.md b/docs/knowledge/features/rate-limit-middleware.md index 60c3896..1c29f8d 100644 --- a/docs/knowledge/features/rate-limit-middleware.md +++ b/docs/knowledge/features/rate-limit-middleware.md @@ -85,7 +85,7 @@ One `*IPRateLimiter`, one middleware, applied to both `mux.Handle` registrations ### `defer limiter.Close()` is best-effort -The current listener block calls `os.Exit` on bind/serve errors, which skips defers. 
A real graceful-shutdown path (signal handler + `Server.Shutdown`) is out of scope; the defer runs on clean returns and any future shutdown sequence picks it up for free. +`defer limiter.Close()` now runs on every shutdown path: #31's `runServers` collapses the per-listener `os.Exit(1)` calls into a buffered-error channel, so `main` returns normally on signal or listener error and the deferred limiter stop fires. ## Why middleware, not constructor injection @@ -102,7 +102,6 @@ The middleware is stateless aside from the shared `*IPRateLimiter`. Concurrency - **Multi-instance shared-state rate limiting**: v1 is single-instance; multi-instance would need Redis or equivalent. - **Rate-limit metrics counter**: a future ticket, parallel to #58's frame-forward / grace-expiry counters. - **Adaptive (load-aware) policy**: fixed token-bucket is sufficient for v1. -- **Graceful-shutdown signal handler**: pre-existing gap, deferred until WS lifecycle work justifies it. ## Related diff --git a/docs/specs/architecture/31-graceful-shutdown-sigterm.md b/docs/specs/architecture/31-graceful-shutdown-sigterm.md new file mode 100644 index 0000000..81dd221 --- /dev/null +++ b/docs/specs/architecture/31-graceful-shutdown-sigterm.md @@ -0,0 +1,299 @@ +# Spec — graceful shutdown on SIGTERM: drain WS connections before exit (#31) + +## Files to read first + +- `cmd/pyrycode-relay/main.go:26-326` — full `main`. Two things to extract: + 1. The two listener branches (insecure: `:8080`-style `ListenAndServe` at + lines 161-211; autocert: `:443` `ListenAndServeTLS` + `:80` ACME at + lines 213-297, plus the loopback metrics listener at lines 198-205 / + 277-285). Both branches currently call the last `ListenAndServe[TLS]` + synchronously and `os.Exit(1)` on any goroutine listener error — this + spec replaces both shapes with a goroutine-per-listener + signal-wait + pattern. + 2. The boot-refusal flow above the listener block (lines 48-100). All + existing `os.Exit(2)` exits stay; the shutdown path attaches only to + post-boot, post-listener-start runtime. +- `internal/relay/registry.go:62-103,275-339` — the registry's lock shape and + the `PhonesFor` (lines 306-324) / `Counts` (lines 326-339) accessors. The + new accessor in this spec mirrors `PhonesFor` exactly: snapshot under + `RLock`, return a freshly-allocated slice, do all slow work (Close + fan-out) outside the registry lock. +- `internal/relay/ws_conn.go:96-120` — `Close` (delegates to + `CloseWithCode(StatusNormalClosure, "")`) and `CloseWithCode` semantics: + `closeOnce` makes both safe under concurrent unwind from a handler + defer, so the shutdown-path fan-out cannot collide with a handler + unwinding at the same instant (e.g. via heartbeat 1011). +- `internal/relay/server_endpoint.go:41-110` and + `internal/relay/client_endpoint.go:40-91` — the WS handlers. The + `websocket.Accept` call hijacks the underlying TCP conn; the handler + goroutine then runs the forwarder loop inline. This is what makes the + WS handlers invisible to `http.Server.Shutdown` (which by Go's contract + "does not attempt to close nor wait for hijacked connections such as + WebSockets" — `net/http` docs). The shutdown helper must enumerate them + itself from the registry. +- `docs/lessons.md` § *"nhooyr.io/websocket.Conn.Close performs a 5s + close-handshake, gating goroutine exit"* (lines 77-79) — every + `CloseWithCode` call can block for up to 5s waiting for the peer's + reciprocal close. 
Implication: the close-fan-out MUST be concurrent + (one goroutine per conn) so the wall-clock cost stays bounded at ~5s + regardless of conn count. The default drain deadline (10s) leaves ~5s + of headroom beyond that worst case. +- `docs/specs/architecture/3-connection-registry.md` (skim) — the + passive-registry pattern: registry never invokes `Send` or `Close` on a + `Conn` under its own lock. The new accessor preserves this — the lock + scope is the slice copy; the caller iterates and closes outside. +- `docs/specs/architecture/60-metrics-listener.md` (skim) — established + policy-values-at-wiring-site convention; the 10s drain deadline is one + more such literal. + +## Context + +The relay binary has no signal handling today. `grep -rn 'signal.Notify'` +returns zero hits in `cmd/` or `internal/`. SIGTERM (fly machine update, +container stop, systemd restart, k8s pod evict) kills in-flight WS frames +mid-write; peers reconnect after seeing a TCP reset. + +The fix is best-effort clean close: stop accepting new connections, send a +1001 (`StatusGoingAway`) close frame on every live WS conn, give peers a +bounded window to acknowledge, then exit. Phones and binaries observe the +close code on the wire and can run their existing reconnect path +(documented out-of-scope in `pyrycode/pyrycode/docs/protocol-mobile.md`). + +The shape is constrained by three pre-existing facts in this repo: + +- `http.Server.Shutdown(ctx)` is blind to hijacked WS conns (Go's + documented contract). Listener-close happens promptly inside `Shutdown`, + but the WS handlers won't unwind until we close their underlying conns. + The registry is the only authoritative list. +- `WSConn.CloseWithCode` (#7) already exists and is idempotent under + `closeOnce`, so the shutdown-path and a concurrent handler-defer Close + cannot double-fire. +- `nhooyr.io/websocket.Conn.Close` waits up to 5s for the peer's + reciprocal close. The shutdown helper has to fan closes out concurrently + or the wall-clock cost scales linearly in conn count. + +## Design + +Three pieces, in dependency order: + +### 1. Registry accessor: `Snapshot() []Conn` + +Add one method to `*Registry` in `internal/relay/registry.go`: + +```go +// Snapshot returns a freshly-allocated slice of every Conn currently +// registered as a binary or a phone. The caller may iterate, append, or +// otherwise mutate the slice without affecting the registry's internal +// state or holding any registry lock. Returns nil when no conns are +// registered. Used by the graceful-shutdown path (#31) to fan close +// frames out without exposing internal maps. +// +// The Conn handles are the same references the registry holds; calling +// Close on them affects the live connection. +func (r *Registry) Snapshot() []Conn +``` + +Lock shape: `RLock` for the duration of the copy; return outside the +lock. Same pattern as `PhonesFor`. Estimated count: snapshot capacity is +`len(r.binaries) + sum(len(s) for s in r.phones)` — compute under the +lock, single allocation, no resize churn. Order is unspecified +(map-iteration order); callers don't care. + +Why not `Snapshot() (binaries, phones []Conn)`: the only consumer +(shutdown helper) treats them uniformly — close them all with the same +code. Returning one flat slice keeps the caller boring. Future consumers +that care about role can read the registry through the existing +role-typed accessors. + +### 2. Shutdown helper: `Shutdown(ctx, logger, reg, servers...)` + +New file `internal/relay/shutdown.go`. 
Function signature: + +```go +// Shutdown initiates a graceful drain: it concurrently invokes +// http.Server.Shutdown on each server (which closes their listeners and +// waits for non-hijacked handlers), snapshots every WS conn from reg and +// emits a 1001 StatusGoingAway close frame on each, and returns once +// both fan-outs complete or ctx expires. On ctx expiry it force-closes +// each server via http.Server.Close before returning. +// +// Safe to call exactly once per process. Concurrent / repeat invocations +// are undefined. +func Shutdown(ctx context.Context, logger *slog.Logger, reg *Registry, servers ...*http.Server) error +``` + +Behavior (described, not pre-written): + +- Start one goroutine per server invoking `srv.Shutdown(ctx)`. Listener + close happens synchronously at the start of `Shutdown`, so new + `/v1/server` and `/v1/client` upgrade attempts will fail the handshake + the moment we enter this function. +- Snapshot via `reg.Snapshot()`; start one goroutine per snapshotted + conn invoking `c.CloseWithCode(websocket.StatusGoingAway, "shutting down")`. +- A `sync.WaitGroup` tracks the union of (a) per-server `Shutdown` + calls and (b) per-conn close calls. A single done-channel closes when + the WaitGroup hits zero. +- `select { case <-done: ... case <-ctx.Done(): ... }`. On ctx-first, + iterate `servers` and call `srv.Close()` to force-close any non-WS + conns that the per-server `Shutdown` was still waiting on; return + `ctx.Err()`. On done-first, return `nil`. +- Errors from `srv.Shutdown` are logged but do not abort the drain — + every server gets a Shutdown call, every conn gets a CloseWithCode + call. The function's return code reflects only deadline outcome. + +Error contract: returns `nil` on clean drain within deadline, `ctx.Err()` +on deadline expiry. Test for these two outcomes. + +### 3. `main.go` wiring + +Replace the two listener tails (lines 161-211 for `--insecure-listen` +mode, lines 213-297 for `--domain` mode) with a uniform pattern. The +boot-refusal flow above stays untouched. + +Sequence after `CheckListenerPorts` passes: + +1. Build the `[]*http.Server` slice for the active mode (insecure: one + public + optional metrics; autocert: HTTPS + HTTP-01 + optional + metrics). +2. `signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)` + — captures the signal as a context-cancel. +3. For each server, launch a listener goroutine: `srv.ListenAndServe()` + (or `ListenAndServeTLS("", "")` for the HTTPS server). On + non-`http.ErrServerClosed` return, send the error to a buffered + `listenerErr chan error` (one slot is enough — first error wins). +4. `select { case <-sigCtx.Done(): ... case err := <-listenerErr: ... }`. + On either branch, proceed to shutdown. +5. `drainCtx, cancel := context.WithTimeout(context.Background(), drainDeadline)` + (default `10*time.Second`, declared as a `const drainDeadline` at the + top of `main` per the wiring-site-policy convention). +6. `relay.Shutdown(drainCtx, logger, reg, servers...)`. Log the outcome. +7. Return from `main` with the appropriate exit code (`0` on signal or + `Shutdown` clean return, `1` if a listener-goroutine error was the + trigger). + +The existing `defer limiter.Close()` (main.go:147) now runs on the clean +return path. The comment at lines 142-146 referencing #47 as the +out-of-scope graceful-shutdown ticket can be updated to point to this +ticket — but the architect leaves the wording to the developer. 
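+
+Step 3 in miniature: a sketch of one listener goroutine and its error
+sink, assuming the 1-slot buffered channel described above. Illustrative
+only; the developer owns the final shape:
+
+```go
+listenerErr := make(chan error, 1)
+for _, srv := range servers {
+	srv := srv
+	go func() {
+		// The HTTPS server uses ListenAndServeTLS("", "") instead.
+		err := srv.ListenAndServe()
+		if err != nil && !errors.Is(err, http.ErrServerClosed) {
+			select {
+			case listenerErr <- err: // first error wins; triggers the drain
+			default: // a drain is already in flight; drop
+			}
+		}
+	}()
+}
+```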
+ +The `os.Exit(1)` calls currently sitting inside the listener goroutines +(main.go:202, 207, 281, 289, 295) are replaced by the error-channel send +described above. None of the `os.Exit(2)` boot-refusal exits change. + +## Concurrency model + +Goroutines that exist during a drain: + +- **Listener goroutines** (1 per `*http.Server`): unchanged from today + except for the error sink. Each blocks in `ListenAndServe[TLS]` until + `http.Server.Shutdown` is invoked, at which point it returns + `http.ErrServerClosed`. +- **Per-server `Shutdown` goroutines** (1 per `*http.Server`, inside the + helper): synchronous from the helper's WaitGroup perspective. +- **Per-conn close goroutines** (1 per snapshotted `Conn`, inside the + helper): each may block up to 5s on the close handshake; running them + in parallel keeps wall-clock bounded. +- **Handler goroutines** (1 per live WS): not joined. Process exit reaps + them. The `CloseWithCode` from the close-fan-out is what unblocks the + handler's `Read` and lets it unwind through its existing + `defer reg.ScheduleReleaseServer / wsconn.Close` chain. + +Shutdown ordering is enforced by the helper, not by main: the moment the +helper is entered, listener-close is in flight; the moment the snapshot +is taken, every WS conn alive at that instant is targeted. Conns +registered after the snapshot starts cannot exist — listener is closed +before the snapshot fires (sequential inside the helper's first two +steps), so no new WS handshake can complete. + +## Error handling and edge cases + +- **Listener error during normal serving (TCP bind drop, autocert + failure, etc.)**: surfaced via `listenerErr` channel → triggers drain + → exit code 1. +- **Listener returns `http.ErrServerClosed`**: expected during shutdown. + Goroutine returns silently; no error-channel send. +- **Second SIGTERM during drain**: ignored. `signal.NotifyContext` only + cancels the context once; the shutdown helper proceeds on its + `drainCtx`. Sending a separate hard-kill signal (SIGKILL) bypasses Go + entirely and the kernel reaps the process — there is nothing for the + relay to do. +- **Drain deadline elapses with some conns still mid-handshake**: the + helper calls `srv.Close()` on each server (force-close of any non-WS + conn still around) and returns `ctx.Err()`. The hijacked WS conns' + underlying TCP sockets close when the process exits; their peers see + a TCP reset on those specific conns. Acceptable per the AC ("a peer + reading the conn observes the close code on the wire" applies to + every conn alive at drain start — those that DON'T finish the close + handshake within 10s degrade to today's behavior, no regression). +- **Empty registry at drain start**: `Snapshot()` returns nil; the close + fan-out is a no-op; the per-server `Shutdown` calls return promptly; + function returns `nil`. +- **Boot fails before listener launch**: existing `os.Exit(2)` paths are + unchanged. The shutdown helper is only reached after at least one + listener is running. + +## Testing strategy + +Tests split between the registry accessor, the shutdown helper, and a +single end-to-end smoke through `main`-equivalent wiring. + +**Registry — `internal/relay/registry_test.go`** (new test function): + +- `Snapshot` on an empty registry returns nil (or a zero-length slice; + pick one and stick to it — match `PhonesFor`'s nil-on-empty). +- `Snapshot` returns one entry per registered binary plus one per + registered phone (verify by ConnID set membership, not by order). 
+- `Snapshot` returns a fresh slice: mutating the returned slice does
+  not affect a subsequent `Snapshot` or `BinaryFor` / `PhonesFor`.
+- Concurrent `Snapshot` + `ClaimServer` + `RegisterPhone` under
+  `-race`: no data race, no panic, internally consistent (each
+  individual `Snapshot` reflects some valid state — not a torn read).
+
+**Shutdown helper — `internal/relay/shutdown_test.go`** (new file):
+
+- Clean drain: pass a registry holding two fake `Conn`s (use the
+  existing test `fakeConn` pattern from `registry_test.go:48-onwards`,
+  extended with a `closeCode` field if needed) and two pre-`Listen`'d
+  `httptest.NewServer().Config` instances. Assert both conns observed
+  `CloseWithCode(StatusGoingAway, …)` and both servers' listeners are
+  closed; helper returns `nil`.
+- Deadline expiry: use a fake `Conn` whose `Close` blocks past the
+  deadline; pass a ctx with 50ms deadline; helper returns `ctx.Err()`;
+  `srv.Close` was invoked on each server (assert via a wrapper or by
+  observing the listener's `Accept` returns).
+- Empty registry: helper returns `nil`, no panic, server-Shutdown still
+  ran.
+- Idempotency of close: re-Close on the same WSConn does not panic and
+  does not double-emit (already covered by `closeOnce` in `ws_conn`,
+  but worth one assertion in the shutdown helper test).
+
+**End-to-end — `cmd/pyrycode-relay/main_e2e_test.go`** (new file, or
+add to existing `deps_test.go`'s package if package-internal access is
+useful):
+
+- Boot the relay on `--insecure-listen=127.0.0.1:0` via a test entry
+  point that returns the chosen port and a "trigger shutdown" hook.
+  Open a `/v1/server` WS, read once to consume the handshake response,
+  invoke the shutdown hook, assert the next read returns a close error
+  whose `websocket.CloseStatus` equals `StatusGoingAway`. Assert the
+  test entry point returns within the drain deadline.
+
+  The "test entry point" can be as small as exporting `run(args []string,
+  signal <-chan os.Signal) int` from `cmd/pyrycode-relay` and having
+  `main` call `os.Exit(run(os.Args[1:], handlerCtxFor(SIGTERM/SIGINT)))`.
+  The exact shape is a developer choice; the spec asserts only that a
+  test can drive the shutdown path without forking a real subprocess.
+
+## Open questions
+
+- **Default drain deadline (10s)**: matches the ticket body and leaves
+  ~5s headroom beyond a single close-handshake. If operational
+  experience shows phones routinely take >5s to ack a close, this is
+  the knob to raise. Architect proposes 10s; PO / ops can override at
+  the wiring site without touching the helper.
+- **Should `listenerErr` triggering a drain still emit exit code 0?**
+  Architect's proposal: exit 1 when shutdown was triggered by a
+  listener error (so process supervisors restart), exit 0 when
+  triggered by signal (clean operator action). The signal path is the
+  ticket's headline; the listener-error path is a side-effect of the
+  refactor and should not regress today's `os.Exit(1)` semantics.
diff --git a/internal/relay/registry.go b/internal/relay/registry.go
index 3b72f68..08cbb97 100644
--- a/internal/relay/registry.go
+++ b/internal/relay/registry.go
@@ -323,6 +323,36 @@ func (r *Registry) PhonesFor(serverID string) []Conn {
 	return out
 }
 
+// Snapshot returns a freshly-allocated slice of every Conn currently
+// registered as a binary or a phone. The caller may iterate, append, or
+// otherwise mutate the slice without affecting the registry's internal
+// state or holding any registry lock. Returns nil when no conns are
+// registered.
Used by the graceful-shutdown path (#31) to fan close +// frames out without exposing internal maps. +// +// Order is unspecified (map-iteration order); callers don't care. The +// Conn handles inside the slice are the same references the registry +// holds; calling Close on them affects the live connection. +func (r *Registry) Snapshot() []Conn { + r.mu.RLock() + defer r.mu.RUnlock() + total := len(r.binaries) + for _, s := range r.phones { + total += len(s) + } + if total == 0 { + return nil + } + out := make([]Conn, 0, total) + for _, c := range r.binaries { + out = append(out, c) + } + for _, s := range r.phones { + out = append(out, s...) + } + return out +} + // Counts returns the number of binaries currently claimed and the total // number of phone connections summed across all server-ids. For the health // endpoint (#10). One call is internally consistent; two concurrent calls diff --git a/internal/relay/registry_test.go b/internal/relay/registry_test.go index 54e6202..085faae 100644 --- a/internal/relay/registry_test.go +++ b/internal/relay/registry_test.go @@ -539,6 +539,119 @@ func TestScheduleReleaseServer_RaceFreedomUnderRapidCycles(t *testing.T) { _, _ = r.Counts() } +func TestSnapshot_EmptyRegistry(t *testing.T) { + t.Parallel() + r := NewRegistry() + if got := r.Snapshot(); got != nil { + t.Errorf("Snapshot on empty registry: got %v, want nil", got) + } +} + +func TestSnapshot_IncludesBinariesAndPhones(t *testing.T) { + t.Parallel() + r := NewRegistry() + + b1 := &fakeConn{id: "b-1"} + b2 := &fakeConn{id: "b-2"} + p1 := &fakeConn{id: "p-1"} + p2 := &fakeConn{id: "p-2"} + p3 := &fakeConn{id: "p-3"} + + if err := r.ClaimServer("s1", b1); err != nil { + t.Fatalf("ClaimServer s1: %v", err) + } + if err := r.ClaimServer("s2", b2); err != nil { + t.Fatalf("ClaimServer s2: %v", err) + } + if err := r.RegisterPhone("s1", p1); err != nil { + t.Fatalf("RegisterPhone p1: %v", err) + } + if err := r.RegisterPhone("s1", p2); err != nil { + t.Fatalf("RegisterPhone p2: %v", err) + } + if err := r.RegisterPhone("s2", p3); err != nil { + t.Fatalf("RegisterPhone p3: %v", err) + } + + snap := r.Snapshot() + if len(snap) != 5 { + t.Fatalf("Snapshot len: got %d, want 5", len(snap)) + } + got := map[string]bool{} + for _, c := range snap { + got[c.ConnID()] = true + } + for _, want := range []string{"b-1", "b-2", "p-1", "p-2", "p-3"} { + if !got[want] { + t.Errorf("Snapshot: missing %q (got %v)", want, got) + } + } +} + +func TestSnapshot_FreshSliceIsolation(t *testing.T) { + t.Parallel() + r := NewRegistry() + if err := r.ClaimServer("s1", &fakeConn{id: "b-1"}); err != nil { + t.Fatalf("ClaimServer: %v", err) + } + if err := r.RegisterPhone("s1", &fakeConn{id: "p-1"}); err != nil { + t.Fatalf("RegisterPhone: %v", err) + } + + snap := r.Snapshot() + if len(snap) != 2 { + t.Fatalf("Snapshot len: got %d, want 2", len(snap)) + } + snap[0] = &fakeConn{id: "evil"} + + again := r.Snapshot() + for _, c := range again { + if c.ConnID() == "evil" { + t.Fatal("registry observed mutation through Snapshot") + } + } + + // Mutating registry after snapshot: the original snap length stays put. 
+
+	if err := r.RegisterPhone("s1", &fakeConn{id: "p-2"}); err != nil {
+		t.Fatalf("RegisterPhone p2: %v", err)
+	}
+	if len(snap) != 2 {
+		t.Errorf("original snapshot length changed: got %d, want 2", len(snap))
+	}
+	if got := len(r.Snapshot()); got != 3 {
+		t.Errorf("new Snapshot after register: got %d, want 3", got)
+	}
+}
+
+// TestSnapshot_RaceFreedom hammers Snapshot concurrently with mutation.
+// The assertion is the absence of DATA RACE reports under -race;
+// snapshot consistency itself is pinned by the deterministic tests
+// above.
+func TestSnapshot_RaceFreedom(t *testing.T) {
+	t.Parallel()
+	r := NewRegistry()
+
+	const goroutines = 16
+	const opsPer = 200
+
+	var wg sync.WaitGroup
+	wg.Add(goroutines)
+	for g := 0; g < goroutines; g++ {
+		g := g
+		go func() {
+			defer wg.Done()
+			sid := fmt.Sprintf("s-%d", g%4)
+			for i := 0; i < opsPer; i++ {
+				_ = r.ClaimServer(sid, &fakeConn{id: fmt.Sprintf("b-%d-%d", g, i)})
+				_ = r.RegisterPhone(sid, &fakeConn{id: fmt.Sprintf("p-%d-%d", g, i)})
+				_ = r.Snapshot()
+				r.UnregisterPhone(sid, fmt.Sprintf("p-%d-%d", g, i))
+				_ = r.ReleaseServer(sid)
+			}
+		}()
+	}
+	wg.Wait()
+}
+
 // TestRegistry_RaceFreedom hammers the public API from many goroutines and
 // asserts the absence of DATA RACE reports under -race.
 //
diff --git a/internal/relay/shutdown.go b/internal/relay/shutdown.go
new file mode 100644
index 0000000..fb90b90
--- /dev/null
+++ b/internal/relay/shutdown.go
@@ -0,0 +1,87 @@
+package relay
+
+import (
+	"context"
+	"errors"
+	"log/slog"
+	"net/http"
+	"sync"
+
+	"nhooyr.io/websocket"
+)
+
+// gracefulCloser is the narrow contract that lets Shutdown emit a
+// specific WebSocket close code on each live conn. *WSConn satisfies
+// it; the registry's Conn interface intentionally does not, so tests
+// (fakeConn) and any future Conn impls that do not need code-aware
+// close fall back to the plain Close() path.
+type gracefulCloser interface {
+	CloseWithCode(code websocket.StatusCode, reason string)
+}
+
+// Shutdown initiates a graceful drain. It concurrently invokes
+// http.Server.Shutdown on each server (which closes their listeners
+// and waits for non-hijacked handlers), snapshots every WS conn from
+// reg, and emits a 1001 StatusGoingAway close frame on each. It
+// returns once both fan-outs complete or ctx expires.
+//
+// On ctx expiry it force-closes each server via http.Server.Close
+// before returning ctx.Err(); the helper does not wait for the
+// per-conn close goroutines that are still mid-handshake — those
+// will be reaped by process exit.
+//
+// Errors from srv.Shutdown are logged but do not abort the drain.
+// Every server gets a Shutdown call, every conn gets a close call.
+// The function's return reflects only deadline outcome:
+//
+// - nil on clean drain within deadline,
+// - ctx.Err() on deadline expiry.
+//
+// Safe to call exactly once per process. Concurrent or repeat
+// invocations are undefined.
+func Shutdown(ctx context.Context, logger *slog.Logger, reg *Registry, servers ...*http.Server) error {
+	var wg sync.WaitGroup
+
+	for _, srv := range servers {
+		srv := srv
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			if err := srv.Shutdown(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
+				logger.Error("http server shutdown", "err", err)
+			}
+		}()
+	}
+
+	conns := reg.Snapshot()
+	for _, c := range conns {
+		c := c
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			if gc, ok := c.(gracefulCloser); ok {
+				gc.CloseWithCode(websocket.StatusGoingAway, "shutting down")
+				return
+			}
+			c.Close()
+		}()
+	}
+
+	done := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		logger.Info("shutdown complete")
+		return nil
+	case <-ctx.Done():
+		for _, srv := range servers {
+			_ = srv.Close()
+		}
+		logger.Warn("shutdown deadline exceeded; force-closed listeners", "err", ctx.Err())
+		return ctx.Err()
+	}
+}
diff --git a/internal/relay/shutdown_test.go b/internal/relay/shutdown_test.go
new file mode 100644
index 0000000..b5b0dbd
--- /dev/null
+++ b/internal/relay/shutdown_test.go
@@ -0,0 +1,262 @@
+package relay
+
+import (
+	"context"
+	"errors"
+	"io"
+	"log/slog"
+	"net"
+	"net/http"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"nhooyr.io/websocket"
+)
+
+// shutdownFakeConn extends the registry-test fakeConn pattern with a
+// captured close code and an optional Close-blocking knob, so shutdown
+// tests can assert both the per-conn code emission and the
+// deadline-expiry path.
+type shutdownFakeConn struct {
+	id        string
+	mu        sync.Mutex
+	closed    bool
+	closeCode websocket.StatusCode
+	closeMsg  string
+	blockFor  time.Duration
+}
+
+func (c *shutdownFakeConn) ConnID() string { return c.id }
+func (c *shutdownFakeConn) Send(msg []byte) error { return nil }
+
+func (c *shutdownFakeConn) Close() {
+	c.CloseWithCode(websocket.StatusNormalClosure, "")
+}
+
+func (c *shutdownFakeConn) CloseWithCode(code websocket.StatusCode, reason string) {
+	if c.blockFor > 0 {
+		time.Sleep(c.blockFor)
+	}
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.closed {
+		return
+	}
+	c.closed = true
+	c.closeCode = code
+	c.closeMsg = reason
+}
+
+func (c *shutdownFakeConn) snapshot() (closed bool, code websocket.StatusCode, msg string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.closed, c.closeCode, c.closeMsg
+}
+
+// silentLogger discards output so tests don't spam stderr.
+func silentLogger() *slog.Logger {
+	return slog.New(slog.NewTextHandler(io.Discard, nil))
+}
+
+// newTestServer binds a fresh loopback listener and starts srv.Serve(ln)
+// in a goroutine. It returns the server and a 1-slot channel that
+// receives Serve's return value. The bind happens synchronously before
+// newTestServer returns, so tests never race the serving goroutine for
+// the port.
+func newTestServer(t *testing.T) (*http.Server, <-chan error) { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + srv := &http.Server{Handler: http.NewServeMux(), Addr: ln.Addr().String()} + errCh := make(chan error, 1) + go func() { + errCh <- srv.Serve(ln) + }() + return srv, errCh +} + +func TestShutdown_EmptyRegistry(t *testing.T) { + t.Parallel() + reg := NewRegistry() + srv, errCh := newTestServer(t) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + if err := Shutdown(ctx, silentLogger(), reg); err != nil { + t.Errorf("Shutdown empty: got %v, want nil", err) + } + + // And with a server present but no conns. + if err := Shutdown(ctx, silentLogger(), reg, srv); err != nil { + t.Errorf("Shutdown with server, no conns: got %v, want nil", err) + } + select { + case err := <-errCh: + if !errors.Is(err, http.ErrServerClosed) { + t.Errorf("Serve return: got %v, want http.ErrServerClosed", err) + } + case <-time.After(time.Second): + t.Fatal("Serve did not return after Shutdown") + } +} + +func TestShutdown_ClosesAllConnsWithGoingAway(t *testing.T) { + t.Parallel() + reg := NewRegistry() + srv, errCh := newTestServer(t) + + b := &shutdownFakeConn{id: "b-1"} + p1 := &shutdownFakeConn{id: "p-1"} + p2 := &shutdownFakeConn{id: "p-2"} + + if err := reg.ClaimServer("s1", b); err != nil { + t.Fatalf("ClaimServer: %v", err) + } + if err := reg.RegisterPhone("s1", p1); err != nil { + t.Fatalf("RegisterPhone p1: %v", err) + } + if err := reg.RegisterPhone("s1", p2); err != nil { + t.Fatalf("RegisterPhone p2: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + if err := Shutdown(ctx, silentLogger(), reg, srv); err != nil { + t.Errorf("Shutdown: got %v, want nil", err) + } + + for _, c := range []*shutdownFakeConn{b, p1, p2} { + closed, code, _ := c.snapshot() + if !closed { + t.Errorf("%s: not closed", c.id) + } + if code != websocket.StatusGoingAway { + t.Errorf("%s: close code = %d, want %d (StatusGoingAway)", c.id, code, websocket.StatusGoingAway) + } + } + + select { + case err := <-errCh: + if !errors.Is(err, http.ErrServerClosed) { + t.Errorf("Serve return: got %v, want http.ErrServerClosed", err) + } + case <-time.After(time.Second): + t.Fatal("Serve did not return after Shutdown") + } +} + +func TestShutdown_DeadlineExpiryReturnsCtxErr(t *testing.T) { + t.Parallel() + reg := NewRegistry() + + // Conn whose Close blocks past the deadline. + stuck := &shutdownFakeConn{id: "stuck", blockFor: 500 * time.Millisecond} + if err := reg.ClaimServer("s1", stuck); err != nil { + t.Fatalf("ClaimServer: %v", err) + } + + // A server whose listener stays open — Shutdown should still return + // once the ctx fires, and srv.Close should force-close the listener. + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + srv := &http.Server{Handler: http.NewServeMux(), Addr: ln.Addr().String()} + serveErr := make(chan error, 1) + go func() { serveErr <- srv.Serve(ln) }() + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + start := time.Now() + err = Shutdown(ctx, silentLogger(), reg, srv) + elapsed := time.Since(start) + + if !errors.Is(err, context.DeadlineExceeded) { + t.Errorf("Shutdown: got %v, want context.DeadlineExceeded", err) + } + // Should return promptly after deadline, well before the stuck-close completes. 
+	if elapsed > 300*time.Millisecond {
+		t.Errorf("Shutdown took %v after 50ms deadline; expected prompt return after srv.Close()", elapsed)
+	}
+
+	// srv.Close was invoked → Serve returns http.ErrServerClosed.
+	select {
+	case err := <-serveErr:
+		if !errors.Is(err, http.ErrServerClosed) {
+			t.Errorf("Serve return: got %v, want http.ErrServerClosed", err)
+		}
+	case <-time.After(time.Second):
+		t.Fatal("Serve did not return after Shutdown deadline expiry")
+	}
+}
+
+func TestShutdown_CloseIdempotentOnRealWSConn(t *testing.T) {
+	// Verifies that Shutdown's CloseWithCode against a *WSConn is
+	// idempotent under closeOnce. The pyrycode WSConn already covers
+	// this directly; this test pins the assumption that the shutdown
+	// path inherits it.
+	t.Parallel()
+
+	// handlerDone gates the test's return: a t.Errorf from the handler
+	// goroutine after the test has completed would panic the test
+	// binary, so the test waits for the handler to finish first.
+	handlerDone := make(chan struct{})
+	srv := httpServerForWS(t, func(w http.ResponseWriter, r *http.Request) {
+		defer close(handlerDone)
+		c, err := websocket.Accept(w, r, &websocket.AcceptOptions{OriginPatterns: []string{"*"}})
+		if err != nil {
+			return
+		}
+		wsconn := NewWSConn(c, "x", 1024)
+
+		// Race: handler's own close vs shutdown's CloseWithCode.
+		var ran atomic.Int32
+		var wg sync.WaitGroup
+		wg.Add(2)
+		go func() {
+			defer wg.Done()
+			wsconn.CloseWithCode(websocket.StatusGoingAway, "shutting down")
+			ran.Add(1)
+		}()
+		go func() {
+			defer wg.Done()
+			wsconn.Close()
+			ran.Add(1)
+		}()
+		wg.Wait()
+		if ran.Load() != 2 {
+			t.Errorf("expected both close calls to return; ran=%d", ran.Load())
+		}
+	})
+	defer srv.Close()
+
+	dial(t, srv)
+
+	select {
+	case <-handlerDone:
+	case <-time.After(10 * time.Second): // > nhooyr's 5s close-handshake cap
+		t.Fatal("handler did not finish both close calls")
+	}
+}
+
+// httpServerForWS spins up a bare *http.Server on a loopback listener
+// with the given handler so the test can exercise a real
+// *websocket.Conn close path.
+func httpServerForWS(t *testing.T, h http.HandlerFunc) *http.Server {
+	t.Helper()
+	ln, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("listen: %v", err)
+	}
+	srv := &http.Server{Handler: h, Addr: ln.Addr().String()}
+	go func() { _ = srv.Serve(ln) }()
+	return srv
+}
+
+func dial(t *testing.T, srv *http.Server) {
+	t.Helper()
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+	defer cancel()
+	c, _, err := websocket.Dial(ctx, "ws://"+srv.Addr, nil)
+	if err != nil {
+		t.Fatalf("dial: %v", err)
+	}
+	// Read until close so the handler observes the peer disconnect.
+	_, _, _ = c.Read(ctx)
+	c.Close(websocket.StatusNormalClosure, "")
+}