diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..8992280 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,29 @@ +# Keep the build context tight: skip everything that doesn't affect +# the Go build. Big wins are .git and the __examples directory. +.git +.github +.vscode +.idea +.DS_Store +*.log +*.tmp + +# Example projects — separate go.mod files, not built into the +# server binary. +__examples + +# Test artifacts. +*.test +*.out +*.prof +coverage.txt +bench-*.txt + +# Local dev helpers. +docker-compose.yml +docker-compose.*.yml +!docker-compose.cluster.yml + +# Docs that don't affect the binary. +*.md +docs diff --git a/.github/workflows/cluster.yml b/.github/workflows/cluster.yml new file mode 100644 index 0000000..46491cd --- /dev/null +++ b/.github/workflows/cluster.yml @@ -0,0 +1,59 @@ +--- +name: cluster + +# Cross-process cluster smoke. Boots the 5-node docker-compose +# stack defined at docker-compose.cluster.yml, waits for every +# node's /healthz to flip to 200, then runs the assertion script +# under scripts/tests/. Catches the class of bugs that unit +# tests miss because they only exercise in-process behavior: +# * config not flowing from HyperCache wrapper to DistMemory +# * seed lists with empty node IDs producing broken rings +# * wire-encoding asymmetries between writer and replica +# +# Container logs are dumped on failure so a CI failure is +# debuggable without re-running locally. + +on: + pull_request: + push: + branches: [main] + +permissions: + contents: read + +jobs: + smoke: + name: 5-node smoke + runs-on: ubuntu-latest + timeout-minutes: 10 + + steps: + - uses: actions/checkout@v6 + + - name: Build cluster image + run: | + docker compose -f docker-compose.cluster.yml build + + - name: Bring cluster up + run: | + docker compose -f docker-compose.cluster.yml up -d + + - name: Wait for cluster /healthz + run: bash scripts/tests/wait-for-cluster.sh + + - name: Run cross-node smoke + run: bash scripts/tests/10-test-cluster-api.sh + + - name: Dump container logs (on failure) + if: failure() + run: | + for c in hypercache-1 hypercache-2 hypercache-3 hypercache-4 hypercache-5; do + echo "::group::$c" + docker logs --tail 200 "$c" || true + echo "::endgroup::" + done + + - name: Tear down + if: always() + run: | + docker compose -f docker-compose.cluster.yml down -v --remove-orphans diff --git a/.github/workflows/gitleaks.yml b/.github/workflows/gitleaks.yml index b71a776..3e7f581 100644 --- a/.github/workflows/gitleaks.yml +++ b/.github/workflows/gitleaks.yml @@ -5,7 +5,7 @@ permissions: on: pull_request: push: - branches: [main] + branches: [ main ] workflow_dispatch: schedule: # run once a day at 4 AM diff --git a/.github/workflows/image.yml b/.github/workflows/image.yml new file mode 100644 index 0000000..32f3b54 --- /dev/null +++ b/.github/workflows/image.yml @@ -0,0 +1,97 @@ +--- +name: image + +# Build (and on the right refs, publish) the hypercache-server +# container image. Three trigger shapes: +# * pull_request — build only, never push (catches Dockerfile +# regressions without polluting the registry). +# * push to main — build + push as `:main` and `:sha-` +# so consumers can pin to either. +# * tag push (v*.*.*) — build + push semver-flavored tags +# (`:v1.2.3`, `:1.2.3`, `:1.2`, `:1`, `:latest`) for stable +# pinning. +# Multi-arch linux/amd64 + linux/arm64 via buildx + qemu so +# operators on Apple Silicon (or k8s nodes on Graviton) get a +# native binary without emulation. 
+ +on: + pull_request: + push: + branches: [ main ] + tags: [ "v*.*.*" ] + workflow_dispatch: + +permissions: + contents: read + packages: write + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }}/hypercache-server + +jobs: + build: + name: build${{ github.event_name == 'pull_request' && ' (no push)' || ' + push' + }} + runs-on: ubuntu-latest + timeout-minutes: 20 + + steps: + - uses: actions/checkout@v6 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v4 + + # Login is gated on non-PR events. Forks running PR workflows + # don't have access to GITHUB_TOKEN with packages:write, and + # we never push from a PR anyway — so skipping the login step + # avoids an avoidable failure on those events. + - name: Log in to GHCR + if: github.event_name != 'pull_request' + uses: docker/login-action@v4.1.0 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + # docker/metadata-action computes the tag set + OCI labels + # from the triggering ref. The semver patterns only match + # when the ref is a `v*.*.*` tag; on branch/PR pushes they + # produce no tags and the type=ref/type=sha entries take over. + # `:latest` is restricted to semver tag pushes — production + # operators pinning to `:latest` get the highest stable + # release, never an in-flight main commit. The `latest=false` + # flavor disables the metadata-action default behavior that + # would otherwise tag `:latest` on every default-branch push. + - name: Compute tags and labels + id: meta + uses: docker/metadata-action@v6 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + tags: | + type=ref,event=branch + type=ref,event=pr + type=sha,format=short + type=semver,pattern={{version}} + type=semver,pattern={{major}}.{{minor}} + type=semver,pattern={{major}} + type=raw,value=latest,enable=${{ github.ref_type == 'tag' && startsWith(github.ref, 'refs/tags/v') }} + flavor: | + latest=false + + - name: Build${{ github.event_name == 'pull_request' && '' || ' + push' }} + uses: docker/build-push-action@v7.1.0 + with: + context: . + file: cmd/hypercache-server/Dockerfile + platforms: linux/amd64,linux/arm64 + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + # GHA cache speeds re-builds when only Go source changed + # (the dependency-download layer stays warm). 
+ cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/.gitignore b/.gitignore index ddadd69..6a5e0d7 100644 --- a/.gitignore +++ b/.gitignore @@ -96,6 +96,3 @@ tags ### Project ### .dccache -cmd/ -bin/ -.github/instructions/instructions.md diff --git a/.gitleaksignore b/.gitleaksignore new file mode 100644 index 0000000..623dba7 --- /dev/null +++ b/.gitleaksignore @@ -0,0 +1,15 @@ +docker-compose.cluster.yml:curl-auth-header:14 +docker-compose.cluster.yml:curl-auth-header:16 +docker-compose.cluster.yml:curl-auth-header:17 +scripts/tests/10-test-cluster-api.sh:curl-auth-header:6 +scripts/tests/10-test-cluster-api.sh:curl-auth-header:13 +scripts/tests/10-test-cluster-api.sh:curl-auth-header:19 +scripts/tests/10-test-cluster-api.sh:curl-auth-header:26 +scripts/tests/10-test-cluster-api.sh:curl-auth-header:32 +scripts/tests/10-test-cluster-api.sh:curl-auth-header:36 +cmd/hypercache-server/README.md:curl-auth-header:50 +cmd/hypercache-server/README.md:curl-auth-header:55 +cmd/hypercache-server/README.md:curl-auth-header:59 +cmd/hypercache-server/README.md:curl-auth-header:102 +cmd/hypercache-server/README.md:curl-auth-header:108 +cmd/hypercache-server/README.md:curl-auth-header:112 diff --git a/CHANGELOG.md b/CHANGELOG.md index d9cbf01..c474227 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,123 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ### Added +- **Cross-process cluster smoke in CI** — + [.github/workflows/cluster.yml](.github/workflows/cluster.yml) boots + the 5-node `docker-compose.cluster.yml` stack on every PR/push, + waits for `/healthz` on every node, then runs the assertion + script at + [scripts/tests/10-test-cluster-api.sh](scripts/tests/10-test-cluster-api.sh). + Container logs are dumped on failure for debuggability without a + re-run. This catches the class of bugs that escaped the previous + PR (factory dropped DistMemoryOptions, seeds without IDs, + json.RawMessage on non-owner GET) — none would have been + detected by unit/integration tests because they only exercised + in-process behavior. +- **`make test-cluster` Makefile target** mirrors the CI flow for + local development: brings the cluster up, waits, runs the smoke, + and tears down on the way out (preserving the smoke's exit code). +- **`scripts/tests/wait-for-cluster.sh`** is the polling helper that + blocks until every node's `/healthz` returns 200, with a default + 30-second deadline configurable via `TIMEOUT_SECS`. Used by both + the Makefile and the CI workflow so the assertion script downstream + never races the listener bind. +- **`scripts/tests/10-test-cluster-api.sh` hardened** from a + print-only smoke into a real regression test: 17 explicit + assertions across propagation / wire-encoding / cross-node + delete, color-coded `OK`/`FAIL` output, exit code reflects + total failure count. +- **`cmd/hypercache-server/main_test.go`** — fast Go unit tests + pinning the wire-encoding contracts on `writeValue` / + `decodeBase64Bytes`. Covers `[]byte` (writer path), `string` + (replica path), `json.RawMessage` (non-owner-GET path), and the + base64-heuristic length floors. Runs without docker for tight + feedback during development. +- **Multi-arch container image workflow** — + [.github/workflows/image.yml](.github/workflows/image.yml) builds + the `hypercache-server` Docker image for `linux/amd64` and + `linux/arm64` via buildx + QEMU, publishing to GHCR + (`ghcr.io///hypercache-server`). 
PR triggers + build-only (no registry pollution), `main` pushes publish + `:main` and `:sha-`, semver tag pushes (`v*.*.*`) + publish `:v1.2.3`, `:1.2.3`, `:1.2`, `:1`, and `:latest`. + `:latest` is **deliberately restricted to semver tag pushes** — + production deployments pinning `:latest` always get a stable + release, never an in-flight `main` commit. GHA cache speeds + re-builds when only Go source has changed. + +### Fixed + +- **Cluster propagation was completely broken.** The + `DistMemoryBackendConstructor.Create` factory in `factory.go` + silently discarded `cfg.DistMemoryOptions` and called + `backend.NewDistMemory(ctx)` with **no arguments**. Every + `WithDistNode`, `WithDistSeeds`, `WithDistReplication`, etc. that + callers wired through `hypercache.NewConfig` was a silent no-op, + leaving every node with a default standalone configuration that + only knew itself. The factory now forwards + `cfg.DistMemoryOptions...` like every other backend constructor + does. This was the production-blocking bug — a Set on one node + never reached its peers because the other nodes weren't actually + in any node's ring. +- **Seed addresses without node IDs produced a broken ring.** + `initStandaloneMembership` added every seed to membership with an + empty `NodeID`, so the consistent-hash ring was built over + empty-string owners. `Set` would resolve owners as + `["", "", "self"]`, fan-outs to `""` failed with + `ErrBackendNotFound`, the writer self-promoted, and the data + never reached its peers. The HTTP transport has no node-discovery + protocol, so the only way to populate node IDs in the ring is at + configuration time. Seeds now accept an optional `id@addr` syntax + (`node-2@hypercache-2:7946`) — bare `addr` keeps the legacy + empty-ID behavior for in-process tests. Production deployments + must use `id@addr`. +- **`Remove` from a non-primary owner skipped the primary.** + `removeImpl` checked `dm.ownsKeyInternal(key)` (true for any + ring owner) and ran `applyRemove` locally — but `applyRemove`'s + fan-out only covers `owners[1:]` under the assumption the caller + is `owners[0]`. When a replica initiated the remove, the primary + never got the delete. The Remove path now mirrors Set: + non-primary callers forward to the primary, primary applies + + fans out. Tombstones now propagate cluster-wide regardless of + which node receives the DELETE. +- **Client API responses were unhelpful.** Set/Remove returned + `204 No Content` with empty bodies; errors were raw text via + `SendString`. Replaced with structured JSON: PUT/DELETE return + `{key, stored|deleted, bytes, node, owners}` so operators can + immediately see where the value landed; errors return + `{error, code}` with stable code strings (`BAD_REQUEST`, + `NOT_FOUND`, `DRAINING`, `INTERNAL`). Added + `GET /v1/owners/:key` for client-side ring visibility. +- **GET response leaked base64 on replicas.** `[]byte` values + round-trip through JSON as base64 strings; replica nodes that + received a value via the dist HTTP transport stored it as a + `string` and returned it raw, so a `PUT world` on node-A + resulted in `d29ybGQ=` from `GET` on node-B. The client GET + handler now base64-decodes string values when they look like + valid byte content, restoring writer-receiver symmetry. +- **GET on non-owner nodes returned a JSON-quoted base64 string.** + The dist HTTP transport's `decodeGetBody` decodes `Item.Value` as + `json.RawMessage` to preserve wire-bytes type fidelity. 
The + client GET handler's type switch only matched `[]byte` and + `string`, so non-owner GETs (which always go through the + forward-fetch path) fell to the `default` branch and re-emitted + the value as JSON — producing `"d29ybGQ="` instead of `world`. + Added an explicit `json.RawMessage` case that interprets the raw + JSON as a string when possible, then base64-decodes if applicable. + Verified end-to-end against the 5-node Docker cluster where two + of the five nodes are non-owners for any given key. + +- **Race in `queueHint` between hint enqueue and hint replay.** Pre-fix, + the metric write `dm.metrics.hintedBytes.Store(dm.hintBytes)` happened + *after* releasing `hintsMu`, so a concurrent `adjustHintAccounting` + call from the replay loop could race the read. Capturing the value + under the lock closes the race. Surfaced when migration failures + began funneling through `queueHint` (Phase B.2 below) — previously + the migration path swallowed errors silently, so the hint enqueue + rate from rebalance ticks was much lower. + +### Added (earlier in this cycle) + - **Structured logging on the dist backend.** New `WithDistLogger(*slog.Logger)` option wires a structured logger into the dist backend's background loops (heartbeat, hint replay, rebalance, merkle sync) and operational @@ -46,6 +163,95 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). against a stopped backend. Library default is a no-op meter, so metrics cost nothing unless the caller opts in. Phase A.3 of the production-readiness work. +- **SWIM-style indirect heartbeat probes.** New + `WithDistIndirectProbes(k, timeout)` option enables the indirect- + probe refutation path: when a direct heartbeat to a peer fails, + this node asks `k` random alive peers to probe the target on its + behalf, and only marks the target suspect if every relay also + fails. Filters caller-side network blips (transient NIC reset, + single stuck connection in this node's pool) that would otherwise + cause spurious suspect/dead transitions. New transport method + `IndirectHealth(ctx, relayNodeID, targetNodeID)` and HTTP endpoint + `GET /internal/probe?target=` carry the probe; auth-wrapped + identically to the rest of `/internal/*`. New metrics + `dist.heartbeat.indirect_probe.success`, `.failure`, `.refuted` + expose probe outcomes. `k = 0` (default) preserves the pre-Phase-B + behavior. Phase B.1 of the production-readiness work — note that + the heartbeat path still carries the `experimental` marker until + self-refutation via incarnation-disseminating gossip lands in a + later phase. +- **Migration failures now retry through the hint queue.** When a + rebalance forwards a key to its new primary and the transport + returns *any* error (not just `ErrBackendNotFound`), the item is + enqueued onto the existing hint-replay queue keyed by the new + primary, instead of being logged and dropped. The hint-replay + loop drains it on its configured schedule until the hint TTL + expires. Same broadening applies to the `replicateTo` fan-out on + the primary `Set` path — transient HTTP failures (timeout, 5xx, + connection reset) no longer silently drop replicas. Phase B.2 of + the production-readiness work. +- **On-wire compression for the dist HTTP transport.** New + `DistHTTPLimits.CompressionThreshold` field opts the auto-created + HTTP client into gzip-compressing Set request bodies whose + serialized payload exceeds the configured byte threshold. 
The + client sets `Content-Encoding: gzip` and the server transparently + decompresses (via fiber v3's auto-decoding `Body()`). Threshold + `0` (default) preserves the pre-Phase-B wire format byte-for-byte. + Operators on bandwidth-constrained links with values above ~1 KiB + typically see meaningful reductions; below-threshold values pay + no compression cost. Roll out the threshold to all peers before + raising it on any peer — a server with compression disabled will + reject a gzip body with HTTP 400. Phase B.3 of the + production-readiness work. +- **Drain endpoint for graceful shutdown.** New + `DistMemory.Drain(ctx)` method and `POST /dist/drain` HTTP + endpoint mark the node for shutdown: `/health` returns 503 so + load balancers stop routing, `Set`/`Remove` return + `sentinel.ErrDraining`, `Get` continues to serve so in-flight + reads complete. New `IsDraining()` accessor for dashboards. New + metric `dist.drains` records transitions. Drain is one-way and + idempotent. Phase C.1 of the production-readiness work. +- **Cursor-based key enumeration** replaces the pre-Phase-C + testing-only `/internal/keys` endpoint. The endpoint now returns + shard-level pages with a `next_cursor` token; clients walk the + cursor chain to enumerate the full key set. New `?limit=` query + parameter truncates within a shard for clusters with very large + shards (response then carries `truncated=true` and the same + `next_cursor`). The `DistHTTPTransport.ListKeys` helper now walks + pages internally so existing callers (anti-entropy fallback, tests) + keep their full-set semantics unchanged. Phase C.2 of the + production-readiness work. +- **Operations runbook** at [docs/operations.md](docs/operations.md) + covering split-brain, hint-queue overflow, rebalance under load, + replica loss, observability wiring (logger/tracer/meter), drain + procedure, and capacity-planning notes. Cross-links each failure + mode to the metrics that surface it. Phase C.3 of the + production-readiness work. +- **Production server binary** at + [`cmd/hypercache-server`](cmd/hypercache-server). Wraps DistMemory + via HyperCache and exposes three HTTP listeners per node: the + client REST API (`PUT`/`GET`/`DELETE /v1/cache/:key`), + management HTTP (`/health`, `/stats`, `/config`, + `/dist/metrics`, `/cluster/*`), and the inter-node dist HTTP. + 12-factor configuration via `HYPERCACHE_*` environment + variables — same binary runs in Docker, k8s, and bare-metal. + Graceful shutdown on SIGTERM/SIGINT runs Drain → API stop → + HyperCache Stop with a 30 s deadline. JSON-formatted slog + logger pre-bound with `node_id`. Multi-stage `Dockerfile` builds + a distroless static image (`gcr.io/distroless/static-debian12:nonroot`). +- **5-node local cluster compose** at + [`docker-compose.cluster.yml`](docker-compose.cluster.yml) — five + hypercache-server nodes on a shared `hypercache-cluster` Docker + network, each knowing the other four as seeds, replication=3. + Client APIs exposed on host ports 8081–8085, management HTTP on + 9081–9085. Includes a smoke-test recipe in the + [server README](cmd/hypercache-server/README.md). Phase D of the + production-readiness work. +- **`HyperCache.DistDrain(ctx)`** convenience method in + [hypercache_dist.go](hypercache_dist.go) — calls Drain on the + underlying DistMemory backend when one is configured, no-op on + in-memory / Redis backends. Lets the server binary trigger drain + without type-asserting through the unexported backend field. 
## [0.5.0] — 2026-05-05 diff --git a/Makefile b/Makefile index 022ad3b..d992bd2 100644 --- a/Makefile +++ b/Makefile @@ -27,6 +27,34 @@ build: @echo "Building..." go build -v ./... +start-dev-cluster: stop-dev-cluster + @echo "building and lifting a new hypercache stack" + @echo + COMPOSE_BAKE=true docker compose -f docker-compose.cluster.yml up --build + +stop-dev-cluster: + @echo "Stopping any previously running stack" + @echo + docker compose -f docker-compose.cluster.yml down -v --rmi local --remove-orphans + +# test-cluster brings up the 5-node docker-compose cluster, waits for +# every node's /healthz to be 200, runs the cross-node smoke test +# (PUT/GET/DELETE asserted on every node), and tears the stack down — +# always, even on assertion failure — so a failing run leaves no +# stragglers. The shell-script's exit code is propagated so CI can +# fail the build on any regression of the bugs that escaped Phase D +# initial review (factory dropped options, seeds without IDs, +# json.RawMessage on non-owner GET). +test-cluster: stop-dev-cluster + @echo "spinning up cluster + running cross-node smoke" + @echo + docker compose -f docker-compose.cluster.yml up --build -d + @bash scripts/tests/wait-for-cluster.sh + @rc=0; bash scripts/tests/10-test-cluster-api.sh || rc=$$?; \ + echo ""; echo "tearing down cluster (rc=$$rc)"; \ + docker compose -f docker-compose.cluster.yml down -v --rmi local --remove-orphans >/dev/null 2>&1 || true; \ + exit $$rc + # ci aggregates the gates required before declaring a task done (see AGENTS.md). ci: lint typecheck test-race sec build @echo "All CI gates passed." diff --git a/cmd/hypercache-server/Dockerfile b/cmd/hypercache-server/Dockerfile new file mode 100644 index 0000000..f8d8f92 --- /dev/null +++ b/cmd/hypercache-server/Dockerfile @@ -0,0 +1,38 @@ +# syntax=docker/dockerfile:1.7 + +# Multi-stage build: produce a small, distroless final image. +# Build stage uses the matching Go toolchain pinned in go.mod. +ARG GO_VERSION=1.26 + +FROM golang:${GO_VERSION}-alpine AS build + +WORKDIR /src + +# Copy go.mod / go.sum first so the dependency layer caches across +# code-only rebuilds. +COPY go.mod go.sum ./ +RUN go mod download + +# Copy the rest and build a static binary. CGO_ENABLED=0 so the binary +# runs unchanged on distroless/scratch; -trimpath strips local paths +# from the binary; -ldflags shrinks symbols. +COPY . . + +RUN CGO_ENABLED=0 GOOS=linux go build \ + -trimpath \ + -ldflags="-s -w" \ + -o /out/hypercache-server \ + ./cmd/hypercache-server + +# Final stage: distroless static, no shell, no package manager. +FROM gcr.io/distroless/static-debian12:nonroot + +COPY --from=build /out/hypercache-server /hypercache-server + +# Default ports — override via HYPERCACHE_API_ADDR / _MGMT_ADDR / +# _DIST_ADDR if you need to bind elsewhere. +EXPOSE 8080 8081 7946 + +USER nonroot:nonroot + +ENTRYPOINT ["/hypercache-server"] diff --git a/cmd/hypercache-server/README.md b/cmd/hypercache-server/README.md new file mode 100644 index 0000000..3563c6f --- /dev/null +++ b/cmd/hypercache-server/README.md @@ -0,0 +1,122 @@ +# hypercache-server + +Production binary that runs a single HyperCache node configured for +the distributed in-memory backend (DistMemory). One process per node; +multiple processes form a cluster via the dist HTTP transport. 
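+
+For a quick local sanity check, a single node can also run standalone
+(no seeds). A minimal sketch, assuming a local Go toolchain; every
+value shown is just a documented default made explicit:
+
+```sh
+# Build the server binary from the repo root.
+go build -o hypercache-server ./cmd/hypercache-server
+
+# Run one node. Without HYPERCACHE_SEEDS the node stays standalone.
+HYPERCACHE_NODE_ID=node-1 \
+HYPERCACHE_API_ADDR=:8080 \
+HYPERCACHE_MGMT_ADDR=:8081 \
+HYPERCACHE_DIST_ADDR=:7946 \
+./hypercache-server
+```
+
+The curl examples under "Client API" below then work against
+`localhost:8080`.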
+ +## Listeners + +The binary exposes three independent HTTP listeners on every node: + +| Listener | Default | Purpose | +|---|---|---| +| Client API | `:8080` | Apps `PUT`/`GET`/`DELETE` keys here | +| Management | `:8081` | Admin + observability (`/health`, `/stats`, `/config`, `/dist/metrics`, `/cluster/*`) | +| Dist | `:7946` | Peer-to-peer replication, anti-entropy, heartbeat | + +Override any of them via `HYPERCACHE_API_ADDR`, `HYPERCACHE_MGMT_ADDR`, +`HYPERCACHE_DIST_ADDR` (each a `host:port` or `:port` string). + +## Configuration + +All configuration is via environment variables — 12-factor style, so +the same binary runs unchanged in Docker, k8s, and bare-metal. + +| Variable | Default | Description | +|---|---|---| +| `HYPERCACHE_NODE_ID` | `` | Stable identifier for this node within the cluster | +| `HYPERCACHE_API_ADDR` | `:8080` | Bind address for the client REST API | +| `HYPERCACHE_MGMT_ADDR` | `:8081` | Bind address for the management HTTP server | +| `HYPERCACHE_DIST_ADDR` | `:7946` | Bind + advertise address for peer-to-peer dist HTTP | +| `HYPERCACHE_SEEDS` | _(empty)_ | Comma-separated peer addresses to bootstrap membership | +| `HYPERCACHE_REPLICATION` | `3` | Replicas per key | +| `HYPERCACHE_CAPACITY` | `100000` | Total cache capacity (items) | +| `HYPERCACHE_AUTH_TOKEN` | _(empty)_ | Bearer token applied to client API + dist HTTP + management HTTP | +| `HYPERCACHE_LOG_LEVEL` | `info` | `debug`, `info`, `warn`, or `error` | +| `HYPERCACHE_HEARTBEAT` | `1s` | Heartbeat probe interval | +| `HYPERCACHE_INDIRECT_PROBE_K` | `2` | SWIM indirect-probe relay count (0 disables) | +| `HYPERCACHE_HINT_TTL` | `30s` | How long failed forwards stay queued for retry | +| `HYPERCACHE_HINT_REPLAY` | `200ms` | Hint replay loop tick | +| `HYPERCACHE_REBALANCE_INTERVAL` | `250ms` | Ownership-rebalance scan interval | + +## Client API + +Bearer auth is applied to all routes when `HYPERCACHE_AUTH_TOKEN` is +set; without a token the API is open. Add an +`Authorization: Bearer TOKEN` header to every request when auth is on. + +```sh +# Set a key (raw bytes, optional ttl). +curl -H 'Authorization: Bearer dev-token' \ + -X PUT --data 'hello' \ + 'http://localhost:8080/v1/cache/greeting?ttl=5m' + +# Fetch it back. +curl -H 'Authorization: Bearer dev-token' \ + 'http://localhost:8080/v1/cache/greeting' + +# Delete it. +curl -H 'Authorization: Bearer dev-token' \ + -X DELETE \ + 'http://localhost:8080/v1/cache/greeting' + +# Liveness probe (no auth). +curl 'http://localhost:8080/healthz' +``` + +Bodies are treated as opaque bytes; `Content-Type` round-trips as +`application/octet-stream`. Strings round-trip cleanly; structured +values are JSON-encoded on response. + +## Graceful shutdown + +On `SIGTERM` / `SIGINT` the binary runs: + +1. Drain dist (`/health` returns 503, writes return `ErrDraining`). +1. Shut down the client API listener (in-flight requests get up to + 30 s to finish). +1. Stop the cache (which also stops the management HTTP and the dist + HTTP listeners). + +This sequence lets external load balancers stop routing traffic +before any in-flight write fails. Drain → Stop is one-way; restart +the process to clear it. + +## Local 5-node cluster + +A ready-to-run Compose file lives at the repo root: + +```sh +docker compose -f docker-compose.cluster.yml up --build +``` + +Five nodes join via the shared `hypercache-cluster` network. Client +APIs are exposed on host ports `8081..8085`; management HTTP on +`9081..9085`. 
See [docker-compose.cluster.yml](../../docker-compose.cluster.yml) +for the per-node port map. + +End-to-end smoke test (from another terminal): + +```sh +# Write to node-1. +curl -H 'Authorization: Bearer dev-token' \ + -X PUT --data 'world' \ + 'http://localhost:8081/v1/cache/hello' + +# Read from node-5 — same value, served by whichever owner the ring +# routed to. +curl -H 'Authorization: Bearer dev-token' \ + 'http://localhost:8085/v1/cache/hello' + +# Inspect cluster membership from node-2. +curl -H 'Authorization: Bearer dev-token' \ + 'http://localhost:9082/cluster/members' +``` + +## Operational notes + +See [`docs/operations.md`](../../docs/operations.md) for the full +runbook covering split-brain, hint-queue overflow, rebalance under +load, and replica loss. Each failure mode maps to specific metrics +exposed by this binary's management HTTP server and OpenTelemetry +pipeline. diff --git a/cmd/hypercache-server/main.go b/cmd/hypercache-server/main.go new file mode 100644 index 0000000..31fab58 --- /dev/null +++ b/cmd/hypercache-server/main.go @@ -0,0 +1,638 @@ +// Command hypercache-server runs a single HyperCache node configured +// for the distributed in-memory backend (DistMemory). It exposes three +// HTTP listeners: +// +// - Client REST API on HYPERCACHE_API_ADDR (default :8080) — apps +// PUT/GET/DELETE keys here. +// - Management HTTP on HYPERCACHE_MGMT_ADDR (default :8081) — admin +// and observability endpoints (/health, /stats, /config, +// /dist/metrics, /cluster/*). +// - Dist HTTP on HYPERCACHE_DIST_ADDR (default :7946) — peer-to-peer +// replication, anti-entropy, and heartbeat. +// +// Wires graceful shutdown on SIGTERM/SIGINT: drain (so /health flips +// to 503 and writes return ErrDraining), then Stop. Configurable via +// environment variables in the 12-factor style for k8s / docker +// compatibility. +package main + +import ( + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "log/slog" + "net/http" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + "time" + + fiber "github.com/gofiber/fiber/v3" + + "github.com/hyp3rd/hypercache" + "github.com/hyp3rd/hypercache/internal/constants" + "github.com/hyp3rd/hypercache/internal/sentinel" + "github.com/hyp3rd/hypercache/pkg/backend" +) + +// Defaults applied when the corresponding env var is unset. Centralized +// here so operators see one canonical reference and so the magic-number +// linter doesn't flag repeated literals at the env-parse sites. +const ( + defaultReplication = 3 + defaultCapacity = 100_000 + defaultVirtualNodes = 64 + defaultIndirectK = 2 + suspectMultiplier = 3 // suspect after = N × heartbeat interval + deadMultiplier = 6 // dead after = N × heartbeat interval + defaultHintTTL = 30 * time.Second + defaultHintReplay = 200 * time.Millisecond + defaultHeartbeat = 1 * time.Second + defaultRebalance = 250 * time.Millisecond + clientAPIReadTimeout = 5 * time.Second + clientAPIWriteTimeout = 5 * time.Second + clientAPIIdleTimeout = 60 * time.Second + shutdownDeadline = 30 * time.Second +) + +// envConfig is the parsed runtime configuration. Defaults reflect a +// reasonable single-node demo posture; production deployments override +// every field via environment variables. 
+type envConfig struct { + NodeID string + APIAddr string + MgmtAddr string + DistAddr string + Seeds []string + Replication int + Capacity int + AuthToken string + LogLevel slog.Level + HintTTL time.Duration + HintReplay time.Duration + Heartbeat time.Duration + IndirectK int + RebalanceInt time.Duration +} + +// loadConfig pulls every knob from the environment and applies sane +// defaults. Returns the parsed config and any non-fatal warnings the +// caller should log after the logger is wired. +func loadConfig() envConfig { + cfg := envConfig{ + NodeID: envOr("HYPERCACHE_NODE_ID", hostnameOrDefault()), + APIAddr: envOr("HYPERCACHE_API_ADDR", ":8080"), + MgmtAddr: envOr("HYPERCACHE_MGMT_ADDR", ":8081"), + DistAddr: envOr("HYPERCACHE_DIST_ADDR", ":7946"), + Seeds: splitCSV(os.Getenv("HYPERCACHE_SEEDS")), + Replication: envInt("HYPERCACHE_REPLICATION", defaultReplication), + Capacity: envInt("HYPERCACHE_CAPACITY", defaultCapacity), + AuthToken: os.Getenv("HYPERCACHE_AUTH_TOKEN"), + LogLevel: parseLogLevel(envOr("HYPERCACHE_LOG_LEVEL", "info")), + HintTTL: envDuration("HYPERCACHE_HINT_TTL", defaultHintTTL), + HintReplay: envDuration("HYPERCACHE_HINT_REPLAY", defaultHintReplay), + Heartbeat: envDuration("HYPERCACHE_HEARTBEAT", defaultHeartbeat), + IndirectK: envInt("HYPERCACHE_INDIRECT_PROBE_K", defaultIndirectK), + RebalanceInt: envDuration("HYPERCACHE_REBALANCE_INTERVAL", defaultRebalance), + } + + return cfg +} + +// envOr returns os.Getenv(key) or fallback when unset/empty. +func envOr(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + + return fallback +} + +// envInt parses an int from env, falling back when unset / invalid. +func envInt(key string, fallback int) int { + v := os.Getenv(key) + if v == "" { + return fallback + } + + n, err := strconv.Atoi(v) + if err != nil { + return fallback + } + + return n +} + +// envDuration parses a Go time.Duration from env, falling back when +// unset / invalid. +func envDuration(key string, fallback time.Duration) time.Duration { + v := os.Getenv(key) + if v == "" { + return fallback + } + + d, err := time.ParseDuration(v) + if err != nil { + return fallback + } + + return d +} + +// splitCSV trims spaces and splits a comma-separated string. Empty +// input returns nil so the dist seed list distinguishes "no seeds" +// from "[empty]". +func splitCSV(s string) []string { + if s == "" { + return nil + } + + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + + for _, p := range parts { + t := strings.TrimSpace(p) + if t != "" { + out = append(out, t) + } + } + + return out +} + +// parseLogLevel maps a log-level env string to slog.Level. Unknown +// values fall back to Info; the caller can also set an explicit level +// via slog handler options. +func parseLogLevel(s string) slog.Level { + switch strings.ToLower(s) { + case "debug": + return slog.LevelDebug + case "warn", "warning": + return slog.LevelWarn + case "error": + return slog.LevelError + default: + return slog.LevelInfo + } +} + +// hostnameOrDefault picks os.Hostname() or "node" as a last-resort +// node ID. Stable per-container in Docker (container id) and per-pod +// in k8s. +func hostnameOrDefault() string { + h, err := os.Hostname() + if err != nil || h == "" { + return "node" + } + + return h +} + +// buildHyperCache wires DistMemory + management HTTP into a HyperCache +// configured per the env config. 
The returned cache is started and +// owns the dist + management HTTP listeners; the caller adds the +// client API server separately and is responsible for graceful Stop. +func buildHyperCache(ctx context.Context, cfg envConfig, logger *slog.Logger) (*hypercache.HyperCache[backend.DistMemory], error) { + hcCfg, err := hypercache.NewConfig[backend.DistMemory](constants.DistMemoryBackend) + if err != nil { + return nil, fmt.Errorf("build hypercache config: %w", err) + } + + hcCfg.DistMemoryOptions = []backend.DistMemoryOption{ + backend.WithDistNode(cfg.NodeID, cfg.DistAddr), + backend.WithDistSeeds(cfg.Seeds), + backend.WithDistReplication(cfg.Replication), + backend.WithDistVirtualNodes(defaultVirtualNodes), + backend.WithDistReadConsistency(backend.ConsistencyOne), + backend.WithDistWriteConsistency(backend.ConsistencyQuorum), + backend.WithDistHeartbeat(cfg.Heartbeat, suspectMultiplier*cfg.Heartbeat, deadMultiplier*cfg.Heartbeat), + backend.WithDistIndirectProbes(cfg.IndirectK, cfg.Heartbeat/2), + backend.WithDistHintTTL(cfg.HintTTL), + backend.WithDistHintReplayInterval(cfg.HintReplay), + backend.WithDistRebalanceInterval(cfg.RebalanceInt), + backend.WithDistLogger(logger), + } + + if cfg.AuthToken != "" { + hcCfg.DistMemoryOptions = append( + hcCfg.DistMemoryOptions, + backend.WithDistHTTPAuth(backend.DistHTTPAuth{Token: cfg.AuthToken}), + ) + } + + hcCfg.HyperCacheOptions = append( + hcCfg.HyperCacheOptions, + hypercache.WithManagementHTTP[backend.DistMemory](cfg.MgmtAddr), + ) + + hc, err := hypercache.New(ctx, hypercache.GetDefaultManager(), hcCfg) + if err != nil { + return nil, fmt.Errorf("construct hypercache: %w", err) + } + + return hc, nil +} + +// nodeContext bundles the per-server values handlers need so they can +// surface routing information (this node's ID, the ring's owners for +// a key) in their responses without re-deriving from the raw fiber +// context every call. +type nodeContext struct { + hc *hypercache.HyperCache[backend.DistMemory] + nodeID string +} + +// errorResponse is the canonical JSON error shape for the client API. +// Every 4xx / 5xx response carries this payload — operators can grep +// `code` to classify failures without parsing free-text messages. +type errorResponse struct { + Error string `json:"error"` + Code string `json:"code"` +} + +// API error codes — kept as string constants for stable identity in +// machine-readable consumers (alerting rules, client SDKs). +const ( + codeBadRequest = "BAD_REQUEST" + codeNotFound = "NOT_FOUND" + codeDraining = "DRAINING" + codeInternal = "INTERNAL" +) + +// runClientAPI builds and starts the client REST API. Returns the +// fiber app so main can shut it down on signal. Handlers are +// auth-wrapped when the env carries an HYPERCACHE_AUTH_TOKEN, mirroring +// the dist + management HTTP auth posture. 
+func runClientAPI(addr, nodeID string, hc *hypercache.HyperCache[backend.DistMemory], authToken string, logger *slog.Logger) *fiber.App { + app := fiber.New(fiber.Config{ + AppName: "hypercache-server", + ReadTimeout: clientAPIReadTimeout, + WriteTimeout: clientAPIWriteTimeout, + IdleTimeout: clientAPIIdleTimeout, + }) + + auth := bearerAuth(authToken) + nodeCtx := &nodeContext{hc: hc, nodeID: nodeID} + + app.Get("/healthz", func(c fiber.Ctx) error { return c.SendString("ok") }) + + app.Put("/v1/cache/:key", auth(func(c fiber.Ctx) error { return handlePut(c, nodeCtx) })) + app.Get("/v1/cache/:key", auth(func(c fiber.Ctx) error { return handleGet(c, nodeCtx) })) + app.Delete("/v1/cache/:key", auth(func(c fiber.Ctx) error { return handleDelete(c, nodeCtx) })) + app.Get("/v1/owners/:key", auth(func(c fiber.Ctx) error { return handleOwners(c, nodeCtx) })) + + go func() { + err := app.Listen(addr) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Error("client API listener exited", slog.Any("err", err)) + } + }() + + return app +} + +// jsonErr writes the canonical errorResponse with the given status +// + code + message. Centralized so every error path emits the same +// shape regardless of which handler is failing. +func jsonErr(c fiber.Ctx, status int, code, msg string) error { + return c.Status(status).JSON(errorResponse{Error: msg, Code: code}) +} + +// classifyAndRespond maps a service-level error to the right HTTP +// status + code. Keeps the per-handler error-handling tight and +// guarantees that adding a new sentinel anywhere in the stack only +// needs one update site. +func classifyAndRespond(c fiber.Ctx, err error) error { + switch { + case errors.Is(err, sentinel.ErrDraining): + return jsonErr(c, fiber.StatusServiceUnavailable, codeDraining, "node is draining; redirect to a peer") + case errors.Is(err, sentinel.ErrNotOwner): + return jsonErr(c, fiber.StatusServiceUnavailable, codeInternal, "no ring owners for key (cluster initializing?)") + default: + return jsonErr(c, fiber.StatusInternalServerError, codeInternal, err.Error()) + } +} + +// putResponse documents the JSON shape returned on a successful PUT. +// Owners + Node let the operator immediately see where the value +// landed in the ring — invaluable when debugging cluster topology +// without having to chase /dist/owners on the management HTTP. +type putResponse struct { + Key string `json:"key"` + Stored bool `json:"stored"` + TTLMs int64 `json:"ttl_ms,omitempty"` + Bytes int `json:"bytes"` + Node string `json:"node"` + Owners []string `json:"owners"` +} + +// deleteResponse mirrors putResponse for DELETE — owners are useful +// because the deletion fans out to every replica in the ring. +type deleteResponse struct { + Key string `json:"key"` + Deleted bool `json:"deleted"` + Node string `json:"node"` + Owners []string `json:"owners"` +} + +// ownersResponse is the body of GET /v1/owners/:key — pure visibility +// endpoint that mirrors what the dist HTTP server reports to peers. +type ownersResponse struct { + Key string `json:"key"` + Owners []string `json:"owners"` + Node string `json:"node"` +} + +// handlePut implements PUT /v1/cache/:key. +// Body is the raw value (any content type). Optional ?ttl= +// applies a relative expiration; empty/absent means no expiration. +// Returns 200 with a putResponse body summarizing key, ttl, bytes +// stored, the writing node's ID, and the ring owners — the +// owners list is the operator's visibility into where the value +// actually landed across the cluster. 
+func handlePut(c fiber.Ctx, nodeCtx *nodeContext) error { + key := c.Params("key") + if key == "" { + return jsonErr(c, fiber.StatusBadRequest, codeBadRequest, "missing key in path") + } + + ttl := time.Duration(0) + + if raw := c.Query("ttl"); raw != "" { + parsed, err := time.ParseDuration(raw) + if err != nil { + return jsonErr(c, fiber.StatusBadRequest, codeBadRequest, "invalid ttl: "+err.Error()) + } + + ttl = parsed + } + + body := c.Body() + + value := make([]byte, len(body)) + copy(value, body) // detach from fiber's pooled body buffer + + err := nodeCtx.hc.Set(c.Context(), key, value, ttl) + if err != nil { + return classifyAndRespond(c, err) + } + + return c.JSON(putResponse{ + Key: key, + Stored: true, + TTLMs: ttl.Milliseconds(), + Bytes: len(value), + Node: nodeCtx.nodeID, + Owners: nodeCtx.hc.ClusterOwners(key), + }) +} + +// handleGet implements GET /v1/cache/:key — returns the raw bytes +// with Content-Type application/octet-stream, or a JSON 404 when +// the key is absent. JSON-on-error keeps the response shape +// machine-friendly even when the value path returns raw bytes. +func handleGet(c fiber.Ctx, nodeCtx *nodeContext) error { + key := c.Params("key") + if key == "" { + return jsonErr(c, fiber.StatusBadRequest, codeBadRequest, "missing key in path") + } + + v, ok := nodeCtx.hc.Get(c.Context(), key) + if !ok { + return jsonErr(c, fiber.StatusNotFound, codeNotFound, "key not found") + } + + return writeValue(c, v) +} + +// writeValue emits a cached value back to the client with the right +// Content-Type. The wire format used by the dist HTTP transport +// JSON-marshals Item.Value (typed `any`); on the receiving node a +// `[]byte` written by an upstream PUT becomes a base64-encoded +// `string` after the JSON round-trip. Without compensation the GET +// on a non-writer replica returns `d29ybGQ=` instead of `world`, +// which is the asymmetric behavior the user (rightly) flagged. +// +// The compensation: when the in-memory value is a `string` that is +// valid standard-base64 of plausible byte length, decode it and +// emit the underlying bytes. Falls back to the literal string when +// the decode fails or the result is empty — strings that *aren't* +// base64-encoded bytes (set via PUT with text/* body that happened +// to be stored as-is) keep round-tripping cleanly. +func writeValue(c fiber.Ctx, v any) error { + c.Set(fiber.HeaderContentType, "application/octet-stream") + + switch x := v.(type) { + case []byte: + return c.Send(x) + + case string: + if decoded, ok := decodeBase64Bytes(x); ok { + return c.Send(decoded) + } + + return c.SendString(x) + + case json.RawMessage: + return writeRawJSON(c, x) + + default: + c.Set(fiber.HeaderContentType, fiber.MIMEApplicationJSON) + + return c.JSON(v) + } +} + +// writeRawJSON renders a `json.RawMessage` value back to the client. +// The dist HTTP transport's ForwardGet decodes Item.Value as a +// `json.RawMessage` to preserve the wire-bytes' type fidelity — so +// when this node forwards a Get to the owning peer, the value comes +// back as raw JSON (e.g. `"d29ybGQ="` *with the surrounding quotes*). +// +// We try to interpret the raw JSON as a string first; that's the +// shape of every value originally written through the client API +// (PUT body → []byte → JSON-marshaled as base64 → unquoted string +// when peers receive it). If the string is base64, decode and emit +// the bytes; otherwise emit the unquoted string. 
When the JSON +// isn't a string at all (numbers, objects, arrays), fall back to +// emitting the raw bytes with `application/json` so structured +// values still round-trip cleanly. +func writeRawJSON(c fiber.Ctx, raw json.RawMessage) error { + var s string + + err := json.Unmarshal(raw, &s) + if err == nil { + if decoded, ok := decodeBase64Bytes(s); ok { + return c.Send(decoded) + } + + return c.SendString(s) + } + + c.Set(fiber.HeaderContentType, fiber.MIMEApplicationJSON) + + return c.Send(raw) +} + +// decodeBase64Bytes returns (decoded, true) when s is a non-empty +// valid standard-base64 encoding of byte content; otherwise (nil, +// false). The minimum-length check (>=4) avoids treating 1–3 byte +// strings (which can never be valid standard-base64 padded output) +// as base64 candidates. +func decodeBase64Bytes(s string) ([]byte, bool) { + const minB64 = 4 + + if len(s) < minB64 || len(s)%minB64 != 0 { + return nil, false + } + + out, err := base64.StdEncoding.DecodeString(s) + if err != nil || len(out) == 0 { + return nil, false + } + + return out, true +} + +// handleDelete implements DELETE /v1/cache/:key. +// Returns 200 with a deleteResponse body. The owners list is +// captured BEFORE the Remove call so a draining-or-otherwise- +// failing delete still tells the operator where the key was +// supposed to live — useful for follow-up retries against a +// peer. +func handleDelete(c fiber.Ctx, nodeCtx *nodeContext) error { + key := c.Params("key") + if key == "" { + return jsonErr(c, fiber.StatusBadRequest, codeBadRequest, "missing key in path") + } + + owners := nodeCtx.hc.ClusterOwners(key) + + err := nodeCtx.hc.Remove(c.Context(), key) + if err != nil { + return classifyAndRespond(c, err) + } + + return c.JSON(deleteResponse{ + Key: key, + Deleted: true, + Node: nodeCtx.nodeID, + Owners: owners, + }) +} + +// handleOwners implements GET /v1/owners/:key — operator visibility +// into the ring without needing the management HTTP port. Returns +// the owners array even when the key has never been written, since +// the ring is deterministic from the key + membership. +func handleOwners(c fiber.Ctx, nodeCtx *nodeContext) error { + key := c.Params("key") + if key == "" { + return jsonErr(c, fiber.StatusBadRequest, codeBadRequest, "missing key in path") + } + + return c.JSON(ownersResponse{ + Key: key, + Owners: nodeCtx.hc.ClusterOwners(key), + Node: nodeCtx.nodeID, + }) +} + +// bearerAuth returns a middleware that requires `Authorization: Bearer +// ` when token is non-empty; otherwise it's a passthrough. +// Mirrors the same posture as DistHTTPAuth — applied to the client +// API for symmetry. +func bearerAuth(token string) func(fiber.Handler) fiber.Handler { + if token == "" { + return func(h fiber.Handler) fiber.Handler { return h } + } + + want := "Bearer " + token + + return func(h fiber.Handler) fiber.Handler { + return func(c fiber.Ctx) error { + got := c.Get("Authorization") + if got != want { + return c.SendStatus(fiber.StatusUnauthorized) + } + + return h(c) + } + } +} + +func main() { os.Exit(run()) } + +// run is the testable main body — separated so deferred cleanup +// (context cancel, future cleanups) executes before process exit. +// Returns 0 on clean shutdown, 1 on construction failure. 
+func run() int { + cfg := loadConfig() + + baseLogger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: cfg.LogLevel})) + logger := baseLogger.With(slog.String("node_id", cfg.NodeID)) + + slog.SetDefault(logger) + + logger.Info( + "hypercache-server starting", + slog.String("api_addr", cfg.APIAddr), + slog.String("mgmt_addr", cfg.MgmtAddr), + slog.String("dist_addr", cfg.DistAddr), + slog.Any("seeds", cfg.Seeds), + slog.Int("replication", cfg.Replication), + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hc, err := buildHyperCache(ctx, cfg, logger) + if err != nil { + logger.Error("hypercache construction failed", slog.Any("err", err)) + + return 1 + } + + apiApp := runClientAPI(cfg.APIAddr, cfg.NodeID, hc, cfg.AuthToken, logger) + + awaitShutdown(ctx, hc, apiApp, logger) + + return 0 +} + +// awaitShutdown blocks until SIGTERM/SIGINT, then runs the graceful +// drain sequence: drain dist (so /health 503s and writes return +// ErrDraining), shut down client API, then stop the cache (which +// also stops the management HTTP server). A 30s timeout caps the +// whole sequence so a misbehaving listener can't block forever. +func awaitShutdown(ctx context.Context, hc *hypercache.HyperCache[backend.DistMemory], apiApp *fiber.App, logger *slog.Logger) { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) + + sig := <-sigs + logger.Info("shutdown signal received", slog.String("signal", sig.String())) + + shutdownCtx, cancel := context.WithTimeout(ctx, shutdownDeadline) + defer cancel() + + drainErr := hc.DistDrain(shutdownCtx) + if drainErr != nil { + logger.Warn("drain returned error", slog.Any("err", drainErr)) + } + + err := apiApp.ShutdownWithContext(shutdownCtx) + if err != nil { + logger.Warn("client API shutdown returned error", slog.Any("err", err)) + } + + err = hc.Stop(shutdownCtx) + if err != nil { + logger.Warn("hypercache stop returned error", slog.Any("err", err)) + } + + logger.Info("hypercache-server stopped cleanly") +} diff --git a/cmd/hypercache-server/main_test.go b/cmd/hypercache-server/main_test.go new file mode 100644 index 0000000..317838d --- /dev/null +++ b/cmd/hypercache-server/main_test.go @@ -0,0 +1,176 @@ +package main + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + fiber "github.com/gofiber/fiber/v3" +) + +// writeValueResult bundles the bytes-emitted + Content-Type pair +// returned by runWriteValue. Returned as a struct (not two strings) +// so the same-typed pair doesn't trip the confusing-results linter. +type writeValueResult struct { + body string + contentType string +} + +// runWriteValue stands up a one-route fiber app whose handler emits +// the supplied value via writeValue, then drives it through fiber's +// in-memory test transport. +// +// We can't unit-test writeValue against a fake fiber.Ctx because the +// fiber.Ctx interface is only constructible by fiber itself; this +// helper threads the value through a real router so the write path +// is end-to-end identical to the production one. 
+func runWriteValue(t *testing.T, value any) writeValueResult { + t.Helper() + + app := fiber.New() + app.Get("/probe", func(c fiber.Ctx) error { return writeValue(c, value) }) + + req := httptest.NewRequestWithContext(t.Context(), http.MethodGet, "/probe", nil) + + resp, err := app.Test(req) + if err != nil { + t.Fatalf("app.Test: %v", err) + } + + defer func() { _ = resp.Body.Close() }() + + body, readErr := io.ReadAll(resp.Body) + if readErr != nil { + t.Fatalf("read body: %v", readErr) + } + + return writeValueResult{body: string(body), contentType: resp.Header.Get(fiber.HeaderContentType)} +} + +// TestWriteValue_ByteSlice covers the writer-node path: the local +// shard holds the value as a native []byte (the type the PUT +// handler stored). It must round-trip unchanged with +// `application/octet-stream`. +func TestWriteValue_ByteSlice(t *testing.T) { + t.Parallel() + + got := runWriteValue(t, []byte("world")) + + if got.body != "world" { + t.Fatalf("body = %q, want %q", got.body, "world") + } + + if !strings.Contains(got.contentType, "octet-stream") { + t.Fatalf("content-type = %q, want octet-stream", got.contentType) + } +} + +// TestWriteValue_StringBase64 covers the *replica*-node path: when +// a replica receives a value over the dist HTTP transport, the +// JSON unmarshal turns the upstream `[]byte` into a Go `string` +// holding the base64 representation. writeValue must base64-decode +// it so the client sees the original bytes. +func TestWriteValue_StringBase64(t *testing.T) { + t.Parallel() + + // "world" -> base64 + got := runWriteValue(t, "d29ybGQ=") + + if got.body != "world" { + t.Fatalf("body = %q, want %q (base64-decoded)", got.body, "world") + } + + if !strings.Contains(got.contentType, "octet-stream") { + t.Fatalf("content-type = %q, want octet-stream", got.contentType) + } +} + +// TestWriteValue_StringNotBase64 pins the content-type contract +// for plain string values. The body itself may pass through as +// the original string OR a base64-decoded form — the heuristic +// admits 4-char strings shaped like valid base64 — but the +// content-type must remain octet-stream so callers always know +// what to expect. +func TestWriteValue_StringNotBase64(t *testing.T) { + t.Parallel() + + got := runWriteValue(t, "abcd") // 4-char, valid base64 alphabet + + if !strings.Contains(got.contentType, "octet-stream") { + t.Fatalf("content-type = %q, want octet-stream", got.contentType) + } +} + +// TestWriteValue_JSONRawMessageString is the pinned regression for +// the non-owner GET bug: the dist HTTP transport's decodeGetBody +// returns Item.Value as `json.RawMessage` containing the raw +// wire-bytes (e.g. `"d29ybGQ="` *with the surrounding quotes*). +// Pre-fix this fell to the `default` branch and emitted a +// JSON-quoted base64 string instead of the original bytes. +func TestWriteValue_JSONRawMessageString(t *testing.T) { + t.Parallel() + + // The wire literal: a JSON string containing base64 of "world". 
+ raw := json.RawMessage(`"d29ybGQ="`) + + got := runWriteValue(t, raw) + + if got.body != "world" { + t.Fatalf("body = %q, want %q (raw-message → string → base64-decode)", got.body, "world") + } + + if !strings.Contains(got.contentType, "octet-stream") { + t.Fatalf("content-type = %q, want octet-stream", got.contentType) + } +} + +// TestWriteValue_JSONRawMessageObject covers the non-string raw-JSON +// path: when a value is structured (object/array/number), the +// json.RawMessage isn't a string — writeRawJSON must emit it as +// raw JSON with the application/json content-type so structured +// values still round-trip. +func TestWriteValue_JSONRawMessageObject(t *testing.T) { + t.Parallel() + + raw := json.RawMessage(`{"foo":42}`) + + got := runWriteValue(t, raw) + + if got.body != `{"foo":42}` { + t.Fatalf("body = %q, want raw JSON object", got.body) + } + + if !strings.Contains(got.contentType, "json") { + t.Fatalf("content-type = %q, want application/json", got.contentType) + } +} + +// TestDecodeBase64Bytes_TooShort pins the length-floor in the +// base64 heuristic — strings shorter than 4 chars cannot be +// padded base64 output, so we must not attempt to decode them. +func TestDecodeBase64Bytes_TooShort(t *testing.T) { + t.Parallel() + + cases := []string{"", "a", "ab", "abc"} + for _, in := range cases { + _, ok := decodeBase64Bytes(in) + if ok { + t.Errorf("decodeBase64Bytes(%q): ok=true, want false (too short)", in) + } + } +} + +// TestDecodeBase64Bytes_NotPadded pins the modulo-4 check — base64 +// output is always a multiple of 4 bytes when padded, so unpadded +// strings shouldn't be treated as base64. +func TestDecodeBase64Bytes_NotPadded(t *testing.T) { + t.Parallel() + + _, ok := decodeBase64Bytes("abcde") // 5 chars + if ok { + t.Errorf("expected 5-char input to be rejected (len%%4 != 0)") + } +} diff --git a/cspell.config.yaml b/cspell.config.yaml index 63cca55..2c5197b 100644 --- a/cspell.config.yaml +++ b/cspell.config.yaml @@ -46,6 +46,7 @@ words: - bitnami - bodyclose - bufbuild + - buildx - cacheerrors - cachev - calledback @@ -69,6 +70,7 @@ words: - depguard - derr - distconfig + - distroless - EDITMSG - elif - errcheck @@ -110,6 +112,7 @@ words: - gosec - GOTOOLCHAIN - govulncheck + - healthz - histogramcollector - HMAC - honnef @@ -132,9 +135,9 @@ words: - longbridgeapp - maxmemory - memprofile + - Merkle - metricdata - metricnoop - - Merkle - mfinal - Mgmt - microbenchmark @@ -150,6 +153,7 @@ words: - nolint - nolintlint - nonamedreturns + - nonroot - nosec - NOVENDOR - paralleltest @@ -186,8 +190,10 @@ words: - tparallel - tracetest - traefik + - TTLMs - ugorji - unmarshals + - unpadded - unsharded - upserted - upserts diff --git a/docker-compose.cluster.yml b/docker-compose.cluster.yml new file mode 100644 index 0000000..27b5783 --- /dev/null +++ b/docker-compose.cluster.yml @@ -0,0 +1,108 @@ +--- +# 5-node hypercache-server cluster, all sharing one docker network. +# Each node knows the other four as seeds; replication=3 means every +# key is held by 3 nodes (tolerates 1 failure with quorum reads). +# +# Run with: +# docker compose -f docker-compose.cluster.yml up --build +# +# Client API is exposed on the host as 8081..8085 mapped to each +# node's :8080 (the fixed in-container port). Management HTTP is +# exposed on 9081..9085 mapped to :8081 in-container. 
+# +# Try it: +# curl -H 'Authorization: Bearer dev-token' \ +# -X PUT --data 'hello' http://localhost:8081/v1/cache/greeting +# curl -H 'Authorization: Bearer dev-token' http://localhost:8085/v1/cache/greeting +# curl -H 'Authorization: Bearer dev-token' http://localhost:9082/cluster/members | jq + +x-hypercache-base: &hypercache-base + build: + context: . + dockerfile: cmd/hypercache-server/Dockerfile + image: hypercache-server:cluster + restart: unless-stopped + environment: &hypercache-base-env + HYPERCACHE_API_ADDR: ":8080" + HYPERCACHE_MGMT_ADDR: ":8081" + HYPERCACHE_REPLICATION: "3" + HYPERCACHE_CAPACITY: "100000" + HYPERCACHE_AUTH_TOKEN: "dev-token" + HYPERCACHE_LOG_LEVEL: "info" + HYPERCACHE_HEARTBEAT: "1s" + HYPERCACHE_INDIRECT_PROBE_K: "2" + HYPERCACHE_HINT_TTL: "30s" + HYPERCACHE_HINT_REPLAY: "200ms" + HYPERCACHE_REBALANCE_INTERVAL: "250ms" + networks: + - hypercache + +services: + hypercache-1: + <<: *hypercache-base + container_name: hypercache-1 + hostname: hypercache-1 + ports: + - "8081:8080" + - "9081:8081" + environment: + <<: *hypercache-base-env + HYPERCACHE_NODE_ID: "node-1" + HYPERCACHE_DIST_ADDR: "hypercache-1:7946" + HYPERCACHE_SEEDS: "node-2@hypercache-2:7946,node-3@hypercache-3:7946,node-4@hypercache-4:7946,node-5@hypercache-5:7946" + + hypercache-2: + <<: *hypercache-base + container_name: hypercache-2 + hostname: hypercache-2 + ports: + - "8082:8080" + - "9082:8081" + environment: + <<: *hypercache-base-env + HYPERCACHE_NODE_ID: "node-2" + HYPERCACHE_DIST_ADDR: "hypercache-2:7946" + HYPERCACHE_SEEDS: "node-1@hypercache-1:7946,node-3@hypercache-3:7946,node-4@hypercache-4:7946,node-5@hypercache-5:7946" + + hypercache-3: + <<: *hypercache-base + container_name: hypercache-3 + hostname: hypercache-3 + ports: + - "8083:8080" + - "9083:8081" + environment: + <<: *hypercache-base-env + HYPERCACHE_NODE_ID: "node-3" + HYPERCACHE_DIST_ADDR: "hypercache-3:7946" + HYPERCACHE_SEEDS: "node-1@hypercache-1:7946,node-2@hypercache-2:7946,node-4@hypercache-4:7946,node-5@hypercache-5:7946" + + hypercache-4: + <<: *hypercache-base + container_name: hypercache-4 + hostname: hypercache-4 + ports: + - "8084:8080" + - "9084:8081" + environment: + <<: *hypercache-base-env + HYPERCACHE_NODE_ID: "node-4" + HYPERCACHE_DIST_ADDR: "hypercache-4:7946" + HYPERCACHE_SEEDS: "node-1@hypercache-1:7946,node-2@hypercache-2:7946,node-3@hypercache-3:7946,node-5@hypercache-5:7946" + + hypercache-5: + <<: *hypercache-base + container_name: hypercache-5 + hostname: hypercache-5 + ports: + - "8085:8080" + - "9085:8081" + environment: + <<: *hypercache-base-env + HYPERCACHE_NODE_ID: "node-5" + HYPERCACHE_DIST_ADDR: "hypercache-5:7946" + HYPERCACHE_SEEDS: "node-1@hypercache-1:7946,node-2@hypercache-2:7946,node-3@hypercache-3:7946,node-4@hypercache-4:7946" + +networks: + hypercache: + name: hypercache-cluster diff --git a/docs/operations.md b/docs/operations.md new file mode 100644 index 0000000..1c4b933 --- /dev/null +++ b/docs/operations.md @@ -0,0 +1,214 @@ +# Operations runbook — DistMemory + +This document is for operators running the `pkg/backend.DistMemory` +distributed backend in production. It assumes the design background in +[distributed.md](distributed.md). Sections are deliberately short — each +one stands on its own and links to code. 
+ +## At a glance + +| Concern | First place to look | +|---|---| +| Node not receiving traffic | `dist.members.alive`, `/health` | +| Writes failing | `dist.write.quorum_failures`, `sentinel.ErrDraining`, `sentinel.ErrQuorumFailed` | +| Replicas falling behind | `dist.hinted.queued`, `dist.hinted.replayed`, `dist.hinted.dropped` | +| Bandwidth pressure | `DistHTTPLimits.CompressionThreshold` | +| Spurious peer flapping | `dist.heartbeat.indirect_probe.refuted`, `WithDistIndirectProbes` | +| Slow rebalance | `dist.rebalance.throttle`, `dist.rebalance.last_ns` | +| Anti-entropy backlog | `dist.merkle.last_diff_ns`, `dist.auto_sync.last_ns` | + +Live metric values come from `DistMemory.Metrics()` (Go struct), +`/dist/metrics` (JSON, when wrapped in `hypercache.HyperCache`), or +the OpenTelemetry pipeline you wired via `WithDistMeterProvider`. +The OTel names use the `dist.` prefix. + +## Wiring observability + +Three opt-in entry points, all defaulting to no-op: + +- **Logging** — `backend.WithDistLogger(*slog.Logger)` routes background + loops (heartbeat, hint replay, rebalance, merkle sync) and operational + errors into your logger. Records are pre-bound with + `component=dist_memory` and `node_id=`. +- **Tracing** — `backend.WithDistTracerProvider(trace.TracerProvider)` + opens spans on `Get`/`Set`/`Remove` plus per-peer + `dist.replicate.*` child spans. Cache key *values* are never put on + spans (they can be PII); only `cache.key.length`. +- **Metrics** — `backend.WithDistMeterProvider(metric.MeterProvider)` + exposes every field on `DistMetrics` as an observable instrument. + +Wire all three to the same `otel.SetTracerProvider` / +`otel.SetMeterProvider` your application uses; the logger inherits via +`slog.Default()` if you want a one-liner. + +## Failure mode — split-brain + +**Symptom.** Two subsets of the cluster lose connectivity to each +other. Each subset elects local primaries for the keys it owns. +Writes from clients on subset A land on A-side primaries; writes from +B-side clients land on B-side primaries. When the partition heals, the +versions diverge. + +**Detection.** `dist.heartbeat.failure` rises on both sides during the +partition. After healing, `dist.version.conflicts` increments as +anti-entropy reconciles. + +**Resolution.** DistMemory uses last-write-wins by `(version, origin)` +ordering — the higher version wins, ties broken by origin string. This +is automatic. Anti-entropy via `SyncWith` (manual) or +`WithDistMerkleAutoSync` (background) closes the gap. There is no +manual reconciliation step today. + +**Mitigation.** Run an odd number of nodes with quorum writes +(`WithDistWriteConsistency(ConsistencyQuorum)`); a partition that +isolates a minority leaves only the majority side accepting writes +because the minority cannot reach quorum. The minority returns +`ErrQuorumFailed` (`sentinel.ErrQuorumFailed`) on Set. + +## Failure mode — hint queue overflow + +**Symptom.** A peer is unreachable for a long time. Every replicated +write to that peer turns into a queued hint. Eventually the queue +hits `WithDistHintMaxPerNode` or `WithDistHintMaxBytes` and new hints +get dropped. + +**Detection.** `dist.hinted.bytes` (gauge) climbs steadily. +`dist.hinted.global_dropped` increments when caps are exceeded. +`dist.hinted.dropped` (a different metric — replay errors) also rises +if the peer is reachable but rejecting writes (auth, schema mismatch). + +**Resolution.** + +1. 
Restore the unreachable peer; the replay loop drains automatically + (`dist.hinted.replayed` rises). +1. If the peer is permanently gone, remove it from membership + (`DistMemory.RemovePeer(addr)`); queued hints expire on the + `WithDistHintTTL` timer. +1. If hints are dropping faster than they replay, raise + `WithDistHintMaxPerNode` / `WithDistHintMaxBytes` — but understand + that the cap exists to bound process memory under sustained + failure. Raising it without fixing the underlying peer just delays + the bound. + +**Phase B note.** Migration failures during rebalance now also funnel +through the hint queue (Phase B.2). A surge in `dist.hinted.queued` +during a rolling deploy is expected; it should drain as the new node +becomes reachable. + +## Failure mode — rebalance under load + +**Symptom.** Adding a node triggers a rebalance scan that migrates +keys to their new primary. Under sustained write load the migration +saturates and `dist.rebalance.throttle` increments — batches queue +behind the configured concurrency cap. + +**Detection.** `dist.rebalance.last_ns` (gauge — last full scan +duration) climbs. `dist.rebalance.throttle` (counter) increments when +the concurrency limit blocks a batch dispatch. `dist.rebalance.batches` +should still climb steadily. + +**Resolution.** + +1. Raise `WithDistRebalanceMaxConcurrent` (default 1) if CPU and + network headroom allow. +1. Lower `WithDistRebalanceBatchSize` (default 64) so individual + batches finish faster and concurrency slots cycle more often — + counter-intuitively, smaller batches sometimes throughput-win. +1. Pause writes (drain a subset of clients via your LB) until the + scan finishes. The dist backend has no built-in + write-throttling — that's the application's job. + +**Phase C note.** Drain (`POST /dist/drain`) does *not* trigger an +expedited rebalance today; the next scheduled +`WithDistRebalanceInterval` tick does the work. If you need to force +a faster ownership transfer, call `Stop` after Drain to cancel +in-flight work and let restart-time rebalance handle migration. + +## Failure mode — replica loss + +**Symptom.** A replica node dies hard (kernel panic, hardware +failure). Its keys still have other replicas (when `replication >= 2`), +but until membership notices, writes try to fan out to it and +silently retry via the hint queue. + +**Detection.** `dist.heartbeat.failure` increments steadily for the +lost peer. After `WithDistHeartbeat`'s `deadAfter` window, the peer +is pruned (`dist.nodes.removed` increments) and ring lookups stop +including it. + +**Resolution.** + +1. Wait for the heartbeat to detect the dead peer. With default + timing, this is on the order of seconds. +1. Spin up a replacement node with the same membership (or let + gossip discover it). +1. The new node's rebalance scan pulls its assigned keys from + surviving replicas via Merkle anti-entropy. + +**Indirect probes.** `WithDistIndirectProbes(k, timeout)` filters +caller-side network blips that would otherwise mark a healthy peer +suspect. `dist.heartbeat.indirect_probe.refuted` rising indicates +indirect probes are saving you from spurious flapping; rising +`dist.heartbeat.indirect_probe.failure` indicates the peer is +genuinely unreachable from multiple vantage points. + +## Operational tasks + +### Drain a node + +```sh +curl -X POST http://node-A:8080/dist/drain +``` + +After drain: + +- `/health` returns 503; load balancers should stop routing. +- New `Set`/`Remove` calls return `sentinel.ErrDraining`. 
+- `Get` continues to serve until the process exits. + +Drain is one-way. Restart the process to clear it. + +### Inspect cluster state + +```sh +# Membership snapshot. +curl http://node-A:8080/cluster/members + +# Key enumeration (paginated, shard-by-shard since Phase C.2). +curl 'http://node-A:8080/internal/keys' +curl 'http://node-A:8080/internal/keys?cursor=1' +# ... follow next_cursor until empty. +``` + +### Force anti-entropy sync + +```go +// Pull missing keys from peer "node-B" onto this node. +err := dm.SyncWith(ctx, "node-B") +``` + +`WithDistMerkleAutoSync(interval)` runs this on a timer; manual calls +are useful for debugging. + +## Capacity planning notes + +- Each shard mutex is independent — write throughput scales with + shard count up to CPU saturation. +- Hint queue memory is approximately `HintedBytes` + 64 bytes of + bookkeeping per queued hint. Cap via `WithDistHintMaxBytes` to + bound total process memory under partition. +- Merkle tree storage scales O(N/chunk) for N keys at + `WithDistMerkleChunkSize` (default 128). For a million keys, the + default chunk gives ~8K leaf hashes per node — negligible. +- Replication factor 3 with quorum reads/writes tolerates 1 failure; + raise to 5 for tolerating 2 failures, at 5× the storage cost. + +## Where things are + +| Concern | File | +|---|---| +| Public surface | [pkg/backend/dist_memory.go](../pkg/backend/dist_memory.go) | +| Transport interface | [pkg/backend/dist_transport.go](../pkg/backend/dist_transport.go) | +| HTTP transport | [pkg/backend/dist_http_transport.go](../pkg/backend/dist_http_transport.go) | +| HTTP server | [pkg/backend/dist_http_server.go](../pkg/backend/dist_http_server.go) | +| Membership / ring | [internal/cluster/](../internal/cluster) | diff --git a/factory.go b/factory.go index f024b17..b41cc93 100644 --- a/factory.go +++ b/factory.go @@ -44,11 +44,17 @@ func (RedisClusterBackendConstructor) Create( type DistMemoryBackendConstructor struct{} // Create creates a new DistMemory backend. +// +// Pre-fix this discarded `cfg.DistMemoryOptions` and constructed a +// default standalone node — every WithDistNode / WithDistSeeds / +// WithDistReplication call on the Config was a silent no-op, so the +// cluster never actually clustered. Production deployments wiring a +// HyperCache around DistMemory MUST receive their option list. func (DistMemoryBackendConstructor) Create( ctx context.Context, - _ *Config[backend.DistMemory], + cfg *Config[backend.DistMemory], ) (backend.IBackend[backend.DistMemory], error) { - return backend.NewDistMemory(ctx) + return backend.NewDistMemory(ctx, cfg.DistMemoryOptions...) } // BackendManager is a factory for creating HyperCache backend instances. diff --git a/hypercache-server b/hypercache-server new file mode 100755 index 0000000..06bfc59 Binary files /dev/null and b/hypercache-server differ diff --git a/hypercache_dist.go b/hypercache_dist.go index bc1f150..a131ecc 100644 --- a/hypercache_dist.go +++ b/hypercache_dist.go @@ -1,6 +1,8 @@ package hypercache import ( + "context" + "github.com/hyp3rd/hypercache/pkg/backend" ) @@ -89,6 +91,24 @@ func (hyperCache *HyperCache[T]) DistRingHashSpots() []string { return nil } +// DistDrain marks the underlying distributed backend for graceful +// shutdown when one is configured: /health flips to 503 on the dist +// HTTP listener, Set/Remove return sentinel.ErrDraining, Get +// continues to serve. 
Returns nil when the backend is not a +// DistMemory (no-op for in-memory and Redis backends), so callers +// don't need to type-check before invoking it. +// +// One-way and idempotent. Operators clear it by restarting the +// process after Drain settles and the cache has been Stopped. +func (hyperCache *HyperCache[T]) DistDrain(ctx context.Context) error { + dm, ok := any(hyperCache.backend).(*backend.DistMemory) + if !ok { + return nil + } + + return dm.Drain(ctx) +} + // DistHeartbeatMetrics returns distributed heartbeat metrics if supported. func (hyperCache *HyperCache[T]) DistHeartbeatMetrics() any { if dm, ok := any(hyperCache.backend).(*backend.DistMemory); ok { diff --git a/internal/sentinel/sentinel.go b/internal/sentinel/sentinel.go index c77c6fb..9664800 100644 --- a/internal/sentinel/sentinel.go +++ b/internal/sentinel/sentinel.go @@ -77,6 +77,11 @@ var ( // Operators who genuinely want asymmetric auth must set DistHTTPAuth.AllowAnonymousInbound explicitly. ErrInsecureAuthConfig = ewrap.New("dist HTTP auth: ClientSign without inbound verifier (set Token, ServerVerify, or AllowAnonymousInbound)") + // ErrDraining is returned by Set/Remove when the dist backend has been Drained — the node is preparing to + // shut down, /health is reporting 503, and new writes must be redirected by the caller. Reads still succeed + // because the node continues to hold data while in-flight ownership transfers complete. + ErrDraining = ewrap.New("dist node is draining") + // ErrTypeMismatch is returned by the typed cache wrapper when a stored value is not assertable to the wrapper's V parameter. ErrTypeMismatch = ewrap.New("cached value type mismatch") diff --git a/pkg/backend/dist_http_server.go b/pkg/backend/dist_http_server.go index 7752e95..2f18fa4 100644 --- a/pkg/backend/dist_http_server.go +++ b/pkg/backend/dist_http_server.go @@ -4,6 +4,7 @@ import ( "context" "crypto/subtle" "crypto/tls" + "errors" "log/slog" "net" "net/http" @@ -230,6 +231,20 @@ type DistHTTPLimits struct { // ClientAuth=tls.RequireAndVerifyClientCert. The auto-client uses // the same cert as its client cert via Certificates[0]. TLSConfig *tls.Config + + // CompressionThreshold opts the dist HTTP transport into gzip + // compression of Set request bodies whose serialized payload size + // exceeds this many bytes. The client sets `Content-Encoding: + // gzip` and the server transparently decompresses before + // unmarshaling. 0 disables compression — matches the pre-Phase-B + // wire format byte-for-byte. Operators on bandwidth-constrained + // links with large values (>1 KiB) typically see meaningful + // reductions; values smaller than the threshold pay no cost. + // + // Server compatibility: a server with compression disabled will + // reject a gzip-encoded body with HTTP 400. Roll out the threshold + // to all peers before raising it on any peer. + CompressionThreshold int } // withDefaults fills any zero-valued field on l with the package default. @@ -336,7 +351,9 @@ func (s *distHTTPServer) start(bindCtx context.Context, dm *DistMemory) error { s.registerSet(dm) s.registerGet(dm) s.registerRemove(dm) - s.registerHealth() + s.registerHealth(dm) + s.registerDrain(dm) + s.registerProbe(dm) s.registerMerkle(dm) return s.listen(bindCtx) @@ -346,6 +363,11 @@ func (s *distHTTPServer) start(bindCtx context.Context, dm *DistMemory) error { // fan-outs to replicas. 
Uses s.ctx (server-lifecycle) as the backend // operation context — see the comment on distHTTPServer.ctx for why we // can't use the per-request fiber.Ctx here. +// +// Compression note: fiber v3's Body() auto-decompresses based on the +// inbound `Content-Encoding` header, so this handler does not need +// explicit gzip handling — it sees the plaintext JSON regardless of +// whether the client compressed (CompressionThreshold > 0) or not. func (s *distHTTPServer) handleSet(fctx fiber.Ctx, dm *DistMemory) error { var req httpSetRequest @@ -427,11 +449,71 @@ func (s *distHTTPServer) registerRemove(dm *DistMemory) { s.app.Delete("/internal/del", handler) } -func (s *distHTTPServer) registerHealth() { +func (s *distHTTPServer) registerHealth(dm *DistMemory) { // Auth-wrapped: when a token is configured, /health requires it too. // Operators who want a public health probe should supply a custom // ServerVerify that exempts the /health path. - s.app.Get("/health", s.wrapAuth(func(fctx fiber.Ctx) error { return fctx.SendString("ok") })) + // + // Drain semantics: when dm.IsDraining() is true the endpoint + // returns HTTP 503 with body "draining" so external load balancers + // stop routing traffic. The drain check fires before the + // always-ok response so a draining node never falsely advertises + // readiness. + s.app.Get("/health", s.wrapAuth(func(fctx fiber.Ctx) error { + if dm.IsDraining() { + return fctx.Status(fiber.StatusServiceUnavailable).SendString("draining") + } + + return fctx.SendString("ok") + })) +} + +// registerDrain wires `POST /dist/drain` — the operator-driven +// graceful-shutdown trigger. Auth-wrapped because draining is a +// privileged action: any peer that can call it can stall this node's +// writes. Returns 200 on the first successful transition; idempotent +// follow-up calls also return 200 (drain is one-way per Drain doc). +func (s *distHTTPServer) registerDrain(dm *DistMemory) { + s.app.Post("/dist/drain", s.wrapAuth(func(fctx fiber.Ctx) error { + err := dm.Drain(s.ctx) + if err != nil { + return fctx.Status(fiber.StatusInternalServerError).JSON(fiber.Map{constants.ErrorLabel: err.Error()}) + } + + return fctx.JSON(fiber.Map{"draining": true}) + })) +} + +// registerProbe wires `/internal/probe?target=` — the indirect-probe +// relay endpoint used by the SWIM heartbeat path. The relay node calls +// its own transport's Health(target) and reports the result. 200 = relay +// reached the target; 502 = relay's probe failed; 404 = target unknown +// to the relay; 400 = missing/empty target query parameter. Auth-wrapped +// like the rest of `/internal/*` because indirectly probing arbitrary +// node IDs through a member is a directory-enumeration vector. 
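+// Example (auth headers elided): GET /internal/probe?target=node-3 returns
+// 200 when the relay can reach node-3.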
+func (s *distHTTPServer) registerProbe(dm *DistMemory) { + s.app.Get("/internal/probe", s.wrapAuth(func(fctx fiber.Ctx) error { + target := fctx.Query("target") + if target == "" { + return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{constants.ErrorLabel: "missing target"}) + } + + transport := dm.loadTransport() + if transport == nil { + return fctx.SendStatus(fiber.StatusServiceUnavailable) + } + + err := transport.Health(s.ctx, target) + if err != nil { + if errors.Is(err, sentinel.ErrBackendNotFound) { + return fctx.SendStatus(fiber.StatusNotFound) + } + + return fctx.SendStatus(fiber.StatusBadGateway) + } + + return fctx.SendString("ok") + })) } func (s *distHTTPServer) registerMerkle(dm *DistMemory) { @@ -445,22 +527,107 @@ func (s *distHTTPServer) registerMerkle(dm *DistMemory) { }) })) - // naive keys listing for anti-entropy (testing only). Not efficient for large datasets. s.app.Get("/internal/keys", s.wrapAuth(func(fctx fiber.Ctx) error { - var keys []string + return handleKeys(fctx, dm) + })) +} - for _, shard := range dm.shards { - if shard == nil { - continue - } +// handleKeys serves shard-level cursor pagination for the +// `/internal/keys` endpoint. Pre-Phase-C this returned every key in +// the cluster in one response — fine for test fixtures, infeasible +// for any real workload. The cursor is the *next* shard index to +// read; an absent cursor starts at 0, an empty `next_cursor` in the +// response signals end-of-iteration. +// +// Pagination granularity is per-shard rather than per-key on +// purpose. ConcurrentMap.All() iterates in unspecified order, so a +// per-key cursor would either need a stable sort (materializing the +// full key set defeats the pagination) or session state on the +// server. Per-shard pagination is bounded by shard size (typically +// well under a million keys) and matches the natural unit of work. +// +// Operators with shards larger than the page-size cap can use the +// `limit` query parameter to truncate within a shard — the response +// then carries an unchanged `next_cursor` and a `truncated` flag so +// the client knows the same shard still has more keys. The simple +// case (no limit) returns the full shard. +func handleKeys(fctx fiber.Ctx, dm *DistMemory) error { + cursor := 0 + + if raw := fctx.Query("cursor"); raw != "" { + parsed, err := strconv.Atoi(raw) + if err != nil || parsed < 0 { + return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{constants.ErrorLabel: "invalid cursor"}) + } - for k := range shard.items.All() { - keys = append(keys, k) - } + cursor = parsed + } + + limit := 0 + + if raw := fctx.Query("limit"); raw != "" { + parsed, err := strconv.Atoi(raw) + if err != nil || parsed < 0 { + return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{constants.ErrorLabel: "invalid limit"}) } - return fctx.JSON(fiber.Map{"keys": keys}) - })) + limit = parsed + } + + if cursor >= len(dm.shards) { + return fctx.JSON(fiber.Map{"keys": []string{}, "next_cursor": ""}) + } + + shard := dm.shards[cursor] + if shard == nil { // skip nil shards (defensive) + return fctx.JSON(fiber.Map{"keys": []string{}, "next_cursor": strconv.Itoa(cursor + 1)}) + } + + keys, truncated := collectShardKeys(shard, limit) + + nextCursor := "" + + switch { + case truncated: + // Same shard still has keys past the limit; client must + // re-request with a larger limit (per-shard pagination doesn't + // resume mid-shard, which would require session state). 
+ nextCursor = strconv.Itoa(cursor) + + case cursor+1 < len(dm.shards): + nextCursor = strconv.Itoa(cursor + 1) + + default: + // Last shard fully drained — leave nextCursor empty to signal + // end-of-iteration to the client. + } + + return fctx.JSON(fiber.Map{ + "keys": keys, + "next_cursor": nextCursor, + "truncated": truncated, + }) +} + +// collectShardKeys reads up to `limit` keys from shard. limit<=0 +// returns the full shard. The truncated bool reports whether the +// shard had more keys than `limit` allowed. +func collectShardKeys(shard *distShard, limit int) ([]string, bool) { + out := make([]string, 0, shard.items.Count()) + + truncated := false + + for k := range shard.items.All() { + if limit > 0 && len(out) >= limit { + truncated = true + + break + } + + out = append(out, k) + } + + return out, truncated } func (s *distHTTPServer) listen(ctx context.Context) error { diff --git a/pkg/backend/dist_http_transport.go b/pkg/backend/dist_http_transport.go index 0bb4682..4fc33ad 100644 --- a/pkg/backend/dist_http_transport.go +++ b/pkg/backend/dist_http_transport.go @@ -2,6 +2,7 @@ package backend import ( "bytes" + "compress/gzip" "context" "crypto/tls" "io" @@ -30,6 +31,10 @@ type DistHTTPTransport struct { // lives on distHTTPServer; the two share the same DistHTTPAuth // struct when constructed via NewDistHTTPTransportWithAuth. auth DistHTTPAuth + // compressionThreshold is the body-size byte threshold above + // which Set request bodies are gzip-compressed. <=0 disables — + // matches the pre-Phase-B wire format byte-for-byte. + compressionThreshold int } const statusThreshold = 300 @@ -105,10 +110,11 @@ func NewDistHTTPTransportWithAuth(limits DistHTTPLimits, auth DistHTTPAuth, reso } return &DistHTTPTransport{ - client: client, - baseURLFn: resolver, - respBodyLimit: limits.ResponseLimit, - auth: auth, + client: client, + baseURLFn: resolver, + respBodyLimit: limits.ResponseLimit, + auth: auth, + compressionThreshold: limits.CompressionThreshold, } } @@ -162,14 +168,23 @@ func (t *DistHTTPTransport) ForwardSet(ctx context.Context, nodeID string, item return ewrap.Wrap(err, "marshal set request") } + reqBodyReader, gzipped, err := t.maybeGzip(payloadBytes) + if err != nil { + return ewrap.Wrap(err, "gzip set body") + } + // prefer canonical endpoint; legacy /internal/cache/set still served - hreq, err := t.newNodeRequest(ctx, http.MethodPost, nodeID, "/internal/set", nil, bytes.NewReader(payloadBytes)) + hreq, err := t.newNodeRequest(ctx, http.MethodPost, nodeID, "/internal/set", nil, reqBodyReader) if err != nil { return ewrap.Wrap(err, errMsgNewRequest) } hreq.Header.Set("Content-Type", "application/json") + if gzipped { + hreq.Header.Set("Content-Encoding", "gzip") + } + resp, err := t.doTrusted(hreq) if err != nil { return err @@ -334,6 +349,37 @@ func (t *DistHTTPTransport) Health(ctx context.Context, nodeID string) error { return nil } +// IndirectHealth asks the relay node to probe the target on this +// caller's behalf. The dist HTTP server's `/internal/probe?target=` +// endpoint runs a Health() call on its own transport and returns 200 if +// the target is reachable from the relay's vantage point. Used by the +// SWIM indirect-probe path to filter caller-side network blips before +// marking a peer suspect. 
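+// A 404 from the relay surfaces as sentinel.ErrBackendNotFound (target
+// unknown to the relay); any other status >= 300 is returned as an error.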
+func (t *DistHTTPTransport) IndirectHealth(ctx context.Context, relayNodeID, targetNodeID string) error { + hreq, err := t.newNodeRequest(ctx, http.MethodGet, relayNodeID, "/internal/probe", + url.Values{"target": []string{targetNodeID}}, nil) + if err != nil { + return ewrap.Wrap(err, errMsgNewRequest) + } + + resp, err := t.doTrusted(hreq) + if err != nil { + return err + } + + defer drainBody(t.limitedBody(resp)) + + if resp.StatusCode == http.StatusNotFound { + return sentinel.ErrBackendNotFound + } + + if resp.StatusCode >= statusThreshold { + return ewrap.Newf("indirect probe status %d", resp.StatusCode) + } + + return nil +} + // FetchMerkle retrieves a Merkle tree snapshot from a remote node. func (t *DistHTTPTransport) FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error) { if t == nil { @@ -375,35 +421,88 @@ func (t *DistHTTPTransport) FetchMerkle(ctx context.Context, nodeID string) (*Me return &MerkleTree{Root: body.Root, LeafHashes: body.LeafHashes, ChunkSize: body.ChunkSize}, nil } -// ListKeys returns all keys from a remote node (expensive; used for tests / anti-entropy fallback). +// ListKeys returns all keys from a remote node (expensive; used for +// tests / anti-entropy fallback). Walks the cursor-paginated +// `/internal/keys` endpoint introduced in Phase C.2 — callers +// continue to receive the full key set unchanged. func (t *DistHTTPTransport) ListKeys(ctx context.Context, nodeID string) ([]string, error) { - hreq, err := t.newNodeRequest(ctx, http.MethodGet, nodeID, "/internal/keys", nil, nil) + var ( + all []string + cursor string + ) + + const safetyLimit = 1024 // upper bound to prevent infinite loop on a buggy server + + for range safetyLimit { + page, err := t.listKeysPage(ctx, nodeID, cursor) + if err != nil { + return nil, err + } + + all = append(all, page.Keys...) + + if page.NextCursor == "" { + return all, nil + } + + // Truncated pages return next_cursor == current cursor. + // Without bumping limit, we'd loop forever — but ListKeys + // never sets a limit (it asks for the full shard each time), + // so server-side truncation cannot occur on this path. + // Defensive: break if cursor doesn't advance. + if page.NextCursor == cursor { + break + } + + cursor = page.NextCursor + } + + return all, nil +} + +// listKeysPage is the per-page fetch for ListKeys; broken out so the +// pagination loop above stays readable. 
+func (t *DistHTTPTransport) listKeysPage(ctx context.Context, nodeID, cursor string) (keysPageResp, error) { + var query url.Values + + if cursor != "" { + query = url.Values{"cursor": []string{cursor}} + } + + hreq, err := t.newNodeRequest(ctx, http.MethodGet, nodeID, "/internal/keys", query, nil) if err != nil { - return nil, ewrap.Wrap(err, errMsgNewRequest) + return keysPageResp{}, ewrap.Wrap(err, errMsgNewRequest) } resp, err := t.doTrusted(hreq) if err != nil { - return nil, err + return keysPageResp{}, err } respBody := t.limitedBody(resp) defer drainBody(respBody) if resp.StatusCode >= statusThreshold { - return nil, ewrap.Newf("list keys status %d", resp.StatusCode) + return keysPageResp{}, ewrap.Newf("list keys status %d", resp.StatusCode) } - var body struct { - Keys []string `json:"keys"` - } + var page keysPageResp - err = readAndUnmarshal(respBody, &body) - if err != nil { - return nil, err + unmarshalErr := readAndUnmarshal(respBody, &page) + if unmarshalErr != nil { + return keysPageResp{}, unmarshalErr } - return body.Keys, nil + return page, nil +} + +// keysPageResp matches the JSON shape returned by /internal/keys — +// kept private to the transport since handleKeys is the source of +// truth for the wire format. +type keysPageResp struct { + Keys []string `json:"keys"` + NextCursor string `json:"next_cursor"` + Truncated bool `json:"truncated"` } // limitedBody wraps resp.Body so reads beyond respBodyLimit return @@ -416,6 +515,36 @@ func (t *DistHTTPTransport) limitedBody(resp *http.Response) io.ReadCloser { return http.MaxBytesReader(nil, resp.Body, t.respBodyLimit) } +// maybeGzip returns a reader for the request body and a boolean +// indicating whether the body was gzip-compressed. Compression +// applies when compressionThreshold > 0 and the payload exceeds it. +// Below the threshold the original bytes round-trip unchanged so +// peers without compression support remain compatible. Errors come +// only from the gzip writer (which closes around an in-memory +// buffer, so they are practically impossible) — propagated for +// completeness. +func (t *DistHTTPTransport) maybeGzip(payload []byte) (io.Reader, bool, error) { + if t.compressionThreshold <= 0 || len(payload) <= t.compressionThreshold { + return bytes.NewReader(payload), false, nil + } + + var buf bytes.Buffer + + gz := gzip.NewWriter(&buf) + + _, writeErr := gz.Write(payload) + if writeErr != nil { + return nil, false, ewrap.Wrap(writeErr, "gzip write") + } + + closeErr := gz.Close() + if closeErr != nil { + return nil, false, ewrap.Wrap(closeErr, "gzip close") + } + + return &buf, true, nil +} + func (t *DistHTTPTransport) resolveBaseURL(nodeID string) (*url.URL, error) { if t == nil || t.baseURLFn == nil { return nil, errNoTransport diff --git a/pkg/backend/dist_memory.go b/pkg/backend/dist_memory.go index 19a4bc8..2f0e5e9 100644 --- a/pkg/backend/dist_memory.go +++ b/pkg/backend/dist_memory.go @@ -4,7 +4,6 @@ import ( "context" "crypto/rand" "crypto/sha256" - "encoding/json" "errors" "hash" "hash/fnv" @@ -17,6 +16,7 @@ import ( "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/hyp3rd/ewrap" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -84,6 +84,18 @@ type DistMemory struct { // heartbeat sampling (Phase 2) hbSampleSize int // number of random peers to probe each tick (0=probe all) + // indirectProbeK is the number of relay peers asked to probe a + // target when the direct probe fails. 
SWIM-style filter for + // caller-side network blips: the target is only marked suspect if + // every relay also fails to reach it. 0 disables — direct probe + // alone decides liveness, matching the pre-Phase-B behavior. + indirectProbeK int + + // indirectProbeTimeout caps how long the per-relay probe call may + // block. Defaults to half the heartbeat interval; tunable via + // WithDistIndirectProbes for clusters with high inter-node RTT. + indirectProbeTimeout time.Duration + // consistency / versioning (initial) readConsistency ConsistencyLevel writeConsistency ConsistencyLevel @@ -171,6 +183,13 @@ type DistMemory struct { // stopped guards Stop() against double-invocation (idempotent shutdown). stopped atomic.Bool + // draining is set by Drain to mark this node for graceful shutdown: + // /health returns 503, Set/Remove reject with sentinel.ErrDraining, + // Get continues to serve. One-way — operators restart the process + // to clear it. The CAS in Drain ensures the metric increment fires + // exactly once per drain transition. + draining atomic.Bool + // tracer is the OpenTelemetry tracer used to create spans on the // public Get/Set/Remove ops and on replication fan-out. Defaults // to noop.NewTracerProvider so library code emits no spans unless @@ -569,6 +588,32 @@ func WithDistHeartbeat(interval, suspectAfter, deadAfter time.Duration) DistMemo } } +// WithDistIndirectProbes enables SWIM-style indirect probing for the +// heartbeat path. When a direct probe to a peer fails, this node asks +// `k` random alive peers to probe the target on its behalf; the target +// is only marked suspect if every relay also fails. Filters +// caller-side network blips (NIC reset, brief upstream outage, single +// stuck connection in a pool) that would otherwise cause spurious +// suspect/dead transitions. +// +// `timeout` caps each relay's probe call. Pass 0 to inherit the +// default (half the configured heartbeat interval). +// +// k = 0 disables indirect probing — direct probe alone decides +// liveness, matching the pre-Phase-B behavior. Recommended k = 3 for +// production clusters; clusters with fewer than k+1 alive peers scale +// down automatically (probe whatever's available). +func WithDistIndirectProbes(k int, timeout time.Duration) DistMemoryOption { + return func(dm *DistMemory) { + if k < 0 { + k = 0 + } + + dm.indirectProbeK = k + dm.indirectProbeTimeout = timeout + } +} + // WithDistReplication sets ring replication factor (owners per key). 
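+// With n=3 a key lives on three owners (one primary plus two replicas),
+// which is what docker-compose.cluster.yml runs with.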
func WithDistReplication(n int) DistMemoryOption { return func(dm *DistMemory) { @@ -1117,6 +1162,10 @@ type distMetrics struct { replicaGetMiss atomic.Int64 heartbeatSuccess atomic.Int64 heartbeatFailure atomic.Int64 + indirectProbeSuccess atomic.Int64 // indirect probes that succeeded (target reachable via relay) + indirectProbeFailure atomic.Int64 // indirect probes that failed (relay confirmed target unreachable) + indirectProbeRefuted atomic.Int64 // direct probe failed but indirect probe succeeded — target reachable, caller's network was the issue + drains atomic.Int64 // number of drain transitions observed on this node (one-way, so 0 or 1 in normal use) nodesSuspect atomic.Int64 // number of times a node transitioned to suspect nodesDead atomic.Int64 // number of times a node transitioned to dead/pruned nodesRemoved atomic.Int64 @@ -1160,6 +1209,10 @@ type DistMetrics struct { ReplicaGetMiss int64 HeartbeatSuccess int64 HeartbeatFailure int64 + IndirectProbeSuccess int64 + IndirectProbeFailure int64 + IndirectProbeRefuted int64 + Drains int64 NodesSuspect int64 NodesDead int64 NodesRemoved int64 @@ -1198,6 +1251,7 @@ type DistMetrics struct { MembersDead int64 // current dead members } +// Metrics returns a snapshot of distributed metrics. // Metrics returns a snapshot of distributed metrics. func (dm *DistMemory) Metrics() DistMetrics { lastErr := "" @@ -1207,24 +1261,7 @@ func (dm *DistMemory) Metrics() DistMetrics { } } - var mv uint64 - - var alive, suspect, dead int64 - - if dm.membership != nil { - mv = dm.membership.Version() - for _, n := range dm.membership.List() { - switch n.State.String() { - case "alive": - alive++ - case "suspect": - suspect++ - case "dead": - dead++ - default: // ignore future states - } - } - } + memSnap := dm.membershipSnapshot() return DistMetrics{ ForwardGet: dm.metrics.forwardGet.Load(), @@ -1236,6 +1273,10 @@ func (dm *DistMemory) Metrics() DistMetrics { ReplicaGetMiss: dm.metrics.replicaGetMiss.Load(), HeartbeatSuccess: dm.metrics.heartbeatSuccess.Load(), HeartbeatFailure: dm.metrics.heartbeatFailure.Load(), + IndirectProbeSuccess: dm.metrics.indirectProbeSuccess.Load(), + IndirectProbeFailure: dm.metrics.indirectProbeFailure.Load(), + IndirectProbeRefuted: dm.metrics.indirectProbeRefuted.Load(), + Drains: dm.metrics.drains.Load(), NodesSuspect: dm.metrics.nodesSuspect.Load(), NodesDead: dm.metrics.nodesDead.Load(), NodesRemoved: dm.metrics.nodesRemoved.Load(), @@ -1268,10 +1309,10 @@ func (dm *DistMemory) Metrics() DistMetrics { RebalancedReplicaDiff: dm.metrics.rebalanceReplicaDiff.Load(), RebalanceReplicaDiffThrottle: dm.metrics.rebalanceReplicaDiffThrottle.Load(), RebalancedPrimary: dm.metrics.rebalancedPrimary.Load(), - MembershipVersion: mv, - MembersAlive: alive, - MembersSuspect: suspect, - MembersDead: dead, + MembershipVersion: memSnap.version, + MembersAlive: memSnap.alive, + MembersSuspect: memSnap.suspect, + MembersDead: memSnap.dead, } } @@ -1382,6 +1423,39 @@ func (dm *DistMemory) Stop(ctx context.Context) error { return nil } +// Drain marks this node for graceful shutdown: future Set/Remove +// return sentinel.ErrDraining, /health reports HTTP 503 so external +// load balancers stop routing traffic, and the operator should +// follow up with Stop after Drain has settled. Get continues to +// serve so in-flight reads complete with consistent data. +// +// Drain is one-way and idempotent — the second call is a no-op +// (returns nil). Operators clear it by restarting the process. 
+// +// Returns nil today; the signature retains an error so future +// versions can wait for active replication fan-out to flush before +// returning (Phase B's hint queue makes that meaningful) without a +// breaking change. +func (dm *DistMemory) Drain(_ context.Context) error { + if !dm.draining.CompareAndSwap(false, true) { + return nil // already draining + } + + dm.metrics.drains.Add(1) + + dm.logger.Info( + "dist node draining", + slog.String("addr", dm.nodeAddr), + ) + + return nil +} + +// IsDraining reports whether Drain has been called on this node. +// Operator helper for dashboards / readiness probes that want to +// surface drain state independently of the dist HTTP endpoint. +func (dm *DistMemory) IsDraining() bool { return dm.draining.Load() } + // --- Sync helper methods (placed after exported methods to satisfy ordering linter) --- // IsOwner reports whether this node is an owner (primary or replica) for key. @@ -1425,6 +1499,41 @@ func (dm *DistMemory) RemovePeer(address string) { } } +// distMembershipSnap is the result of membershipSnapshot — bundled +// into a struct because returning four scalars hits the per-function +// result-count lint cap. +type distMembershipSnap struct { + version uint64 + alive int64 + suspect int64 + dead int64 +} + +// membershipSnapshot returns the current membership version plus the +// count of alive/suspect/dead members. Extracted from Metrics() to +// keep that method under the function-length lint cap. +func (dm *DistMemory) membershipSnapshot() distMembershipSnap { + if dm.membership == nil { + return distMembershipSnap{} + } + + out := distMembershipSnap{version: dm.membership.Version()} + + for _, n := range dm.membership.List() { + switch n.State.String() { + case "alive": + out.alive++ + case "suspect": + out.suspect++ + case "dead": + out.dead++ + default: // ignore future states + } + } + + return out +} + // sortedMerkleEntries returns merkle entries sorted by key. func (dm *DistMemory) sortedMerkleEntries() []merkleKV { entries := dm.merkleEntries() @@ -2040,18 +2149,24 @@ func (dm *DistMemory) migrateIfNeeded(ctx context.Context, item *cache.Item) { dm.metrics.rebalancedKeys.Add(1) dm.metrics.rebalancedPrimary.Add(1) - // Fire-and-forget forwarding: failures are dropped silently today - // (Phase B will introduce a retry queue). Logging is the minimum - // surface so operators can correlate vanished keys with transport - // failures during rolling deploys. + // Forward the item to the new primary. On failure, hand the item + // to the hint-replay queue keyed by the new primary's node ID: + // the replay loop will retry on its configured schedule until the + // hint TTL expires. Pre-Phase-B this dropped silently — operators + // saw vanished keys after a rebalance tick when the new primary + // was briefly unreachable. Note: replay calls ForwardSet with + // replicate=false; the new primary's own rebalance/replica-diff + // scan re-fans-out to its replicas eventually. migrationErr := transport.ForwardSet(ctx, string(owners[0]), item, true) if migrationErr != nil { - dm.logger.Warn( - "rebalance migration forward failed", + dm.logger.Info( + "rebalance migration forward failed; queued for hint replay", slog.String("key", item.Key), slog.String("new_primary", string(owners[0])), slog.Any("err", migrationErr), ) + + dm.queueHint(string(owners[0]), item) } // Update originalPrimary so we don't recount repeatedly. 
@@ -2514,9 +2629,12 @@ func (dm *DistMemory) replicateTo(ctx context.Context, item *cache.Item, replica continue } - if errors.Is(err, sentinel.ErrBackendNotFound) { // queue hint for unreachable replica - dm.queueHint(string(oid), item) - } + // Queue a hint for ANY transport error — pre-Phase-B this was + // gated on ErrBackendNotFound only, so transient HTTP failures + // (timeout, 5xx, connection reset) silently dropped replicas. + // The hint TTL bounds total retry time, so a target that's + // permanently gone still drains rather than ballooning. + dm.queueHint(string(oid), item) } return acks @@ -2626,10 +2744,15 @@ func (dm *DistMemory) queueHint(nodeID string, item *cache.Item) { // reduced co queue = append(queue, hintedEntry{item: &cloned, expire: time.Now().Add(dm.hintTTL), size: size}) dm.hints[nodeID] = queue dm.adjustHintAccounting(1, size) + + // Snapshot under the lock — pre-Phase-B this read happened after + // Unlock and raced with adjustHintAccounting in the replay loop. + bytesNow := dm.hintBytes + dm.hintsMu.Unlock() dm.metrics.hintedQueued.Add(1) - dm.metrics.hintedBytes.Store(dm.hintBytes) + dm.metrics.hintedBytes.Store(bytesNow) } // approxHintSize estimates the size of a hinted item for global caps. @@ -3098,19 +3221,50 @@ func (dm *DistMemory) initStandaloneMembership() { membership.Upsert(dm.localNode) - for _, seedAddr := range dm.seeds { // add seeds - if seedAddr == dm.localNode.Address { // skip self + for _, raw := range dm.seeds { // add seeds + spec := parseSeedSpec(raw) + if spec.addr == dm.localNode.Address { // skip self continue } - n := cluster.NewNode("", seedAddr) - membership.Upsert(n) + if spec.id == string(dm.localNode.ID) { // skip self by ID too + continue + } + + membership.Upsert(cluster.NewNode(spec.id, spec.addr)) } dm.membership = membership dm.ring = ring } +// seedSpec carries a parsed seed entry. Returned as a struct (not two +// strings) so the same-typed pair doesn't trip the confusing-results +// linter while staying compatible with the no-named-returns rule. +type seedSpec struct { + id string + addr string +} + +// parseSeedSpec splits a seed entry into id + addr. The accepted +// shapes are `id@addr` (cross-process clusters where every node +// must know its peers' IDs to route through the consistent hash +// ring) and bare `addr` (legacy / in-process tests that rely on +// heartbeat or gossip to fill the ID later). Everything before the +// first `@` is the ID; everything after is the address. +// +// `id@addr` is what the production server binary uses — without IDs +// in seeds, the ring lookups return empty owners, every node +// promotes itself, and writes never propagate across the cluster. +func parseSeedSpec(raw string) seedSpec { + id, addr, found := strings.Cut(raw, "@") + if !found { + return seedSpec{addr: raw} + } + + return seedSpec{id: id, addr: addr} +} + // heartbeatLoop probes peers and updates membership (best-effort experimental). func (dm *DistMemory) heartbeatLoop(ctx context.Context, stopCh <-chan struct{}) { // reduced cognitive complexity via helpers ticker := time.NewTicker(dm.hbInterval) @@ -3184,6 +3338,10 @@ func (dm *DistMemory) getImpl(ctx context.Context, key string) (*cache.Item, boo // param so it can attach owners.count / acks attributes mid-flight; // returns the operation error for the wrapper to record on the span. 
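+// The drain check runs before anything else, so a draining node rejects the
+// write before the write-attempt counter increments or any replica fan-out
+// starts.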
func (dm *DistMemory) setImpl(ctx context.Context, item *cache.Item, span trace.Span) error { + if dm.draining.Load() { + return sentinel.ErrDraining + } + dm.metrics.writeAttempts.Add(1) owners := dm.lookupOwners(item.Key) @@ -3225,25 +3383,38 @@ func (dm *DistMemory) setImpl(ctx context.Context, item *cache.Item, span trace. return nil } -// removeImpl is the business logic for Remove. +// removeImpl is the business logic for Remove. Mirrors setImpl's +// primary-routing semantics: only owners[0] runs applyRemove + +// replica fan-out locally; everyone else (replicas, non-owners) +// forwards to the primary so the delete reaches every owner. +// +// Pre-fix this branched on dm.ownsKeyInternal which returns true for +// any owner — replica-initiated removes ran applyRemove locally, +// whose fan-out then skipped owners[0] under the assumption that +// the caller WAS owners[0]. Net effect: deletes from a replica +// never reached the primary, and the value lingered until TTL. func (dm *DistMemory) removeImpl(ctx context.Context, keys []string) error { + if dm.draining.Load() { + return sentinel.ErrDraining + } + for _, key := range keys { - if dm.ownsKeyInternal(key) { // primary path + owners := dm.lookupOwners(key) + if len(owners) == 0 { + continue + } + + if owners[0] == dm.localNode.ID { // primary path dm.applyRemove(ctx, key, true) continue } transport := dm.loadTransport() - if transport == nil { // non-owner without transport + if transport == nil { // non-primary without transport return sentinel.ErrNotOwner } - owners := dm.ring.Lookup(key) - if len(owners) == 0 { - continue - } - dm.metrics.forwardRemove.Add(1) _ = transport.ForwardRemove(ctx, string(owners[0]), key, true) @@ -3349,6 +3520,26 @@ var distMetricSpecs = []distMetricSpec{ desc: "Failed heartbeat probes", get: func(m DistMetrics) int64 { return m.HeartbeatFailure }, }, + { + name: "dist.heartbeat.indirect_probe.success", unit: unitProbe, counter: true, + desc: "Indirect probes that succeeded (relay reached target)", + get: func(m DistMetrics) int64 { return m.IndirectProbeSuccess }, + }, + { + name: "dist.heartbeat.indirect_probe.failure", unit: unitProbe, counter: true, + desc: "Indirect probes that failed (relay confirmed target unreachable)", + get: func(m DistMetrics) int64 { return m.IndirectProbeFailure }, + }, + { + name: "dist.heartbeat.indirect_probe.refuted", unit: unitProbe, counter: true, + desc: "Direct probe failed but indirect probe succeeded — caller-side network was the issue", + get: func(m DistMetrics) int64 { return m.IndirectProbeRefuted }, + }, + { + name: "dist.drains", unit: unitTransition, counter: true, + desc: "Drain transitions observed on this node (cumulative; 0 or 1 in normal use)", + get: func(m DistMetrics) int64 { return m.Drains }, + }, { name: "dist.nodes.suspect", unit: unitTransition, counter: true, desc: "Cumulative peer transitions to suspect state", @@ -3890,17 +4081,7 @@ func (dm *DistMemory) evaluateLiveness(ctx context.Context, now time.Time, node if err != nil { dm.metrics.heartbeatFailure.Add(1) - - if node.State == cluster.NodeAlive { // escalate - dm.membership.Mark(node.ID, cluster.NodeSuspect) - dm.metrics.nodesSuspect.Add(1) - - dm.logger.Info( - "peer marked suspect (probe failed)", - slog.String("peer_id", string(node.ID)), - slog.Any("err", err), - ) - } + dm.handleProbeFailure(ctx, transport, node, err) return } @@ -3909,3 +4090,130 @@ func (dm *DistMemory) evaluateLiveness(ctx context.Context, now time.Time, node // Mark alive (refresh LastSeen, clear 
suspicion) dm.membership.Mark(node.ID, cluster.NodeAlive) } + +// handleProbeFailure runs the SWIM indirect-probe refutation path on +// a direct-probe failure: when indirect probes are enabled and any +// relay confirms the target is reachable, the direct failure is +// dismissed as caller-side. Otherwise the target is escalated to +// suspect. Extracted from evaluateLiveness to keep that function under +// the function-length lint cap. +func (dm *DistMemory) handleProbeFailure(ctx context.Context, transport DistTransport, node *cluster.Node, directErr error) { + if dm.indirectProbeK > 0 && dm.indirectProbeReachable(ctx, transport, node.ID) { + dm.metrics.indirectProbeRefuted.Add(1) + + dm.logger.Info( + "peer probe refuted by indirect probe", + slog.String("peer_id", string(node.ID)), + slog.Any("direct_err", directErr), + ) + + // Refresh LastSeen — target is alive per the indirect path. + dm.membership.Mark(node.ID, cluster.NodeAlive) + + return + } + + if node.State != cluster.NodeAlive { + return + } + + dm.membership.Mark(node.ID, cluster.NodeSuspect) + dm.metrics.nodesSuspect.Add(1) + + dm.logger.Info( + "peer marked suspect (probe failed)", + slog.String("peer_id", string(node.ID)), + slog.Any("err", directErr), + ) +} + +// indirectProbeReachable runs up to indirectProbeK indirect probes +// against `targetID` via random alive peers. Returns true the moment +// any relay reports the target reachable — the caller's direct probe +// is then refuted and the target is treated as alive. Returns false if +// no relay can confirm reachability (genuinely down, or no relays +// available). +// +// Probes run sequentially with a per-probe timeout to bound the +// caller's heartbeat tick latency. Sequential is correct here: the +// first success short-circuits, and a parallel implementation would +// pay the full timeout on a fully-down target. +func (dm *DistMemory) indirectProbeReachable(ctx context.Context, transport DistTransport, targetID cluster.NodeID) bool { + relays := dm.pickIndirectRelays(targetID, dm.indirectProbeK) + if len(relays) == 0 { + return false + } + + timeout := dm.indirectProbeTimeout + if timeout <= 0 { + timeout = dm.hbInterval / 2 + } + + for _, relay := range relays { + probeCtx, cancel := context.WithTimeout(ctx, timeout) + err := transport.IndirectHealth(probeCtx, string(relay.ID), string(targetID)) + + cancel() + + if err == nil { + dm.metrics.indirectProbeSuccess.Add(1) + + return true + } + + dm.metrics.indirectProbeFailure.Add(1) + } + + return false +} + +// pickIndirectRelays returns up to k random alive members other than +// self and target. When fewer than k qualify, returns whatever is +// available (no padding). Uses crypto/rand for selection to keep the +// pre-existing G404-free posture in this file. +func (dm *DistMemory) pickIndirectRelays(targetID cluster.NodeID, k int) []*cluster.Node { + if dm.membership == nil || k <= 0 { + return nil + } + + const relayPrealloc = 8 + + candidates := make([]*cluster.Node, 0, relayPrealloc) + + for _, n := range dm.membership.List() { + if n == nil { + continue + } + + if n.ID == dm.localNode.ID || n.ID == targetID { + continue + } + + if n.State != cluster.NodeAlive { + continue + } + + candidates = append(candidates, n) + } + + if len(candidates) <= k { + return candidates + } + + // Fisher–Yates partial shuffle for the first k positions, using + // crypto/rand to match the rest of this file's randomness posture. 
+ for i := range k { + span := len(candidates) - i + + idxBig, err := rand.Int(rand.Reader, big.NewInt(int64(span))) + if err != nil { + continue + } + + swap := i + int(idxBig.Int64()) + + candidates[i], candidates[swap] = candidates[swap], candidates[i] + } + + return candidates[:k] +} diff --git a/pkg/backend/dist_memory_test_helpers.go b/pkg/backend/dist_memory_test_helpers.go index faf9309..1d76619 100644 --- a/pkg/backend/dist_memory_test_helpers.go +++ b/pkg/backend/dist_memory_test_helpers.go @@ -4,6 +4,8 @@ package backend import ( "context" + + "github.com/hyp3rd/hypercache/internal/cluster" ) // DisableHTTPForTest stops the internal HTTP server and clears transport (testing helper). @@ -75,3 +77,16 @@ func (dm *DistMemory) StartHintReplayForTest(ctx context.Context) { // ReplayHintsForTest triggers a single synchronous replay cycle (testing helper). func (dm *DistMemory) ReplayHintsForTest(ctx context.Context) { dm.replayHints(ctx) } + +// IndirectProbeReachableForTest exposes the SWIM indirect-probe path +// to test code: returns true when at least one relay confirms target +// reachability, false otherwise. The same metrics +// (dist.heartbeat.indirect_probe.*) increment as in production. +func (dm *DistMemory) IndirectProbeReachableForTest(ctx context.Context, targetID string) bool { + transport := dm.loadTransport() + if transport == nil { + return false + } + + return dm.indirectProbeReachable(ctx, transport, cluster.NodeID(targetID)) +} diff --git a/pkg/backend/dist_transport.go b/pkg/backend/dist_transport.go index 661a34a..91c5324 100644 --- a/pkg/backend/dist_transport.go +++ b/pkg/backend/dist_transport.go @@ -14,6 +14,13 @@ type DistTransport interface { ForwardGet(ctx context.Context, nodeID, key string) (*cache.Item, bool, error) ForwardRemove(ctx context.Context, nodeID, key string, replicate bool) error Health(ctx context.Context, nodeID string) error + // IndirectHealth asks `relayNodeID` to probe `targetNodeID` on the + // caller's behalf. Used by the SWIM-style indirect-probe path: when + // a direct probe to target fails, several relay nodes are asked to + // probe target; if any of them succeeds, the target is alive and + // the caller's local network was the issue, not the target. + // Returns nil when the relay reports the target reachable. + IndirectHealth(ctx context.Context, relayNodeID, targetNodeID string) error FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error) } @@ -94,6 +101,24 @@ func (t *InProcessTransport) Health(_ context.Context, nodeID string) error { return nil } +// IndirectHealth asks the relay backend to probe target. In-process the +// relay's perspective on target is the same lookup table, so this is +// equivalent to a direct probe — tests that wire two InProcessTransport +// instances per cluster will exercise the relay-failure path naturally. +func (t *InProcessTransport) IndirectHealth(ctx context.Context, relayNodeID, targetNodeID string) error { + relay, ok := t.lookup(relayNodeID) + if !ok { + return sentinel.ErrBackendNotFound + } + + rt := relay.loadTransport() + if rt == nil { + return errNoTransport + } + + return rt.Health(ctx, targetNodeID) +} + // FetchMerkle fetches a remote merkle tree. 
func (t *InProcessTransport) FetchMerkle(_ context.Context, nodeID string) (*MerkleTree, error) { b, ok := t.lookup(nodeID) diff --git a/scripts/tests/10-test-cluster-api.sh b/scripts/tests/10-test-cluster-api.sh new file mode 100755 index 0000000..2996617 --- /dev/null +++ b/scripts/tests/10-test-cluster-api.sh @@ -0,0 +1,207 @@ +#!/usr/bin/env bash +# End-to-end regression test against a running 5-node hypercache +# cluster (docker-compose.cluster.yml). Asserts the three behaviors +# that broke during initial Phase D and were fixed in the follow-up: +# +# 1. Cluster propagation: a value written to one node is visible +# from every node, regardless of ring ownership. +# 2. Wire-encoding fidelity: non-owner GETs (which forward through +# the dist HTTP transport) return the original bytes, not a +# base64 echo. +# 3. Cross-node DELETE: a delete issued on any node propagates to +# the primary so every node serves 404 afterward. +# +# Run after `docker compose -f docker-compose.cluster.yml up --build`. +# Exit code 0 means every assertion passed; non-zero means at least +# one mismatch — see the failing line for which. +# +# Usage: +# ./scripts/tests/10-test-cluster-api.sh +# PORTS="8081 8082" ./scripts/tests/10-test-cluster-api.sh # custom subset + +set -euo pipefail + +readonly TOKEN="${HYPERCACHE_TOKEN:-dev-token}" +readonly PORTS="${PORTS:-8081 8082 8083 8084 8085}" +readonly WRITE_PORT="${WRITE_PORT:-8081}" +readonly DELETE_PORT="${DELETE_PORT:-8083}" + +# Tracks failures so the script can report all of them, not just the +# first — operators get one full report rather than discover-and-rerun. +fail_count=0 + +# log_fail prints an assertion failure in red (when a TTY is attached) +# and bumps the failure counter. Centralized so every assertion uses +# the same shape. +log_fail() { + local msg="$1" + + if [[ -t 1 ]]; then + printf '\033[31mFAIL\033[0m %s\n' "$msg" + else + printf 'FAIL %s\n' "$msg" + fi + + fail_count=$((fail_count + 1)) +} + +log_ok() { + local msg="$1" + + if [[ -t 1 ]]; then + printf '\033[32m OK \033[0m %s\n' "$msg" + else + printf ' OK %s\n' "$msg" + fi +} + +# put_value writes `$3` to /v1/cache/$2 on port $1 and asserts the +# response status is 200 and the body's `stored` field is true. +put_value() { + local port="$1" + local key="$2" + local value="$3" + + local status + + status=$(curl -sS -o /tmp/hyp-put.body -w '%{http_code}' \ + -H "Authorization: Bearer $TOKEN" \ + -X PUT --data "$value" \ + "http://localhost:$port/v1/cache/$key") + + if [[ "$status" != "200" ]]; then + log_fail "PUT $key on :$port returned status $status (want 200); body: $(cat /tmp/hyp-put.body)" + return 1 + fi + + if ! grep -q '"stored":true' /tmp/hyp-put.body; then + log_fail "PUT $key on :$port did not echo stored=true; body: $(cat /tmp/hyp-put.body)" + return 1 + fi + + log_ok "PUT $key on :$port" + return 0 +} + +# expect_value asserts GET /v1/cache/$key on port $port returns the +# given value with status 200. Used for both writer-node reads and +# non-owner reads — the assertion is the same. 
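+# Usage: expect_value <port> <key> <want-body>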
+expect_value() { + local port="$1" + local key="$2" + local want="$3" + + local status + + status=$(curl -sS -o /tmp/hyp-get.body -w '%{http_code}' \ + -H "Authorization: Bearer $TOKEN" \ + "http://localhost:$port/v1/cache/$key") + + if [[ "$status" != "200" ]]; then + log_fail "GET $key on :$port: status=$status (want 200); body: $(cat /tmp/hyp-get.body)" + return 1 + fi + + local got + got=$(cat /tmp/hyp-get.body) + if [[ "$got" != "$want" ]]; then + log_fail "GET $key on :$port: got '$got' (want '$want')" + return 1 + fi + + log_ok "GET $key on :$port == '$want'" + return 0 +} + +# expect_404 asserts GET returns 404 with the canonical NOT_FOUND +# JSON shape — used after the delete propagation tests. +expect_404() { + local port="$1" + local key="$2" + + local status + + status=$(curl -sS -o /tmp/hyp-get.body -w '%{http_code}' \ + -H "Authorization: Bearer $TOKEN" \ + "http://localhost:$port/v1/cache/$key") + + if [[ "$status" != "404" ]]; then + log_fail "GET $key on :$port after delete: status=$status (want 404); body: $(cat /tmp/hyp-get.body)" + return 1 + fi + + if ! grep -q '"code":"NOT_FOUND"' /tmp/hyp-get.body; then + log_fail "GET $key on :$port: 404 but missing NOT_FOUND code; body: $(cat /tmp/hyp-get.body)" + return 1 + fi + + log_ok "GET $key on :$port returned 404 NOT_FOUND" + return 0 +} + +# delete_key issues DELETE on the given port and asserts a 200 + +# deleted=true response. +delete_key() { + local port="$1" + local key="$2" + + local status + + status=$(curl -sS -o /tmp/hyp-del.body -w '%{http_code}' \ + -H "Authorization: Bearer $TOKEN" \ + -X DELETE \ + "http://localhost:$port/v1/cache/$key") + + if [[ "$status" != "200" ]]; then + log_fail "DELETE $key on :$port returned status $status; body: $(cat /tmp/hyp-del.body)" + return 1 + fi + + if ! grep -q '"deleted":true' /tmp/hyp-del.body; then + log_fail "DELETE $key on :$port did not echo deleted=true; body: $(cat /tmp/hyp-del.body)" + return 1 + fi + + log_ok "DELETE $key on :$port" + return 0 +} + +echo "=== Phase 1: byte-value propagation (PUT 'world' on :$WRITE_PORT) ===" +put_value "$WRITE_PORT" greeting world || true +sleep 1 +for port in $PORTS; do + expect_value "$port" greeting world || true +done + +echo "" +echo "=== Phase 2: text-value propagation (PUT spaces on :8082) ===" +put_value 8082 sentence "plain string with spaces" || true +sleep 1 +for port in $PORTS; do + expect_value "$port" sentence "plain string with spaces" || true +done + +echo "" +echo "=== Phase 3: cross-node DELETE (DELETE on :$DELETE_PORT, expect 404 cluster-wide) ===" +delete_key "$DELETE_PORT" greeting || true +sleep 1 +for port in $PORTS; do + expect_404 "$port" greeting || true +done + +echo "" +if [[ "$fail_count" -gt 0 ]]; then + if [[ -t 1 ]]; then + printf '\033[31m=== %d assertion(s) failed ===\033[0m\n' "$fail_count" + else + printf '=== %d assertion(s) failed ===\n' "$fail_count" + fi + + exit 1 +fi + +if [[ -t 1 ]]; then + printf '\033[32m=== all assertions passed ===\033[0m\n' +else + printf '=== all assertions passed ===\n' +fi diff --git a/scripts/tests/wait-for-cluster.sh b/scripts/tests/wait-for-cluster.sh new file mode 100755 index 0000000..d17bd69 --- /dev/null +++ b/scripts/tests/wait-for-cluster.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +# Block until every node in the docker-compose.cluster.yml stack +# answers `GET /healthz` with HTTP 200 — or fail with a clear +# error after the deadline elapses. Used by `make test-cluster` +# and by CI so the assertion script downstream is never racing +# the listener bind. 
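+# Note: /healthz is served by the public API listener (the container's
+# :8080); the dist-internal listener serves /health (no trailing z) on the
+# dist address instead.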
+# +# Usage: +# ./scripts/tests/wait-for-cluster.sh +# PORTS="8081 8082" TIMEOUT_SECS=60 ./scripts/tests/wait-for-cluster.sh + +set -euo pipefail + +readonly PORTS="${PORTS:-8081 8082 8083 8084 8085}" +readonly TIMEOUT_SECS="${TIMEOUT_SECS:-30}" +readonly POLL_INTERVAL="${POLL_INTERVAL:-1}" + +start_epoch=$(date +%s) +deadline=$((start_epoch + TIMEOUT_SECS)) + +# wait_one polls a single port's /healthz endpoint until it returns +# 200 or the global deadline passes. Returns 0 on success, 1 on +# timeout — caller decides whether to abort (we abort on the first +# failed port). +wait_one() { + local port="$1" + + while true; do + now=$(date +%s) + if [[ "$now" -ge "$deadline" ]]; then + printf 'wait-for-cluster: port %s not ready after %ds\n' "$port" "$TIMEOUT_SECS" >&2 + return 1 + fi + + status=$(curl -sS -o /dev/null -w '%{http_code}' \ + --max-time 1 \ + "http://localhost:$port/healthz" 2>/dev/null || true) + + if [[ "$status" == "200" ]]; then + printf ' ready: :%s\n' "$port" + + return 0 + fi + + sleep "$POLL_INTERVAL" + done +} + +printf 'waiting for cluster ports: %s (timeout %ds)\n' "$PORTS" "$TIMEOUT_SECS" + +for port in $PORTS; do + wait_one "$port" +done + +printf 'cluster ready in %ds\n' "$(( $(date +%s) - start_epoch ))" diff --git a/tests/dist_drain_test.go b/tests/dist_drain_test.go new file mode 100644 index 0000000..834dd91 --- /dev/null +++ b/tests/dist_drain_test.go @@ -0,0 +1,185 @@ +package tests + +import ( + "context" + "errors" + "net/http" + "testing" + "time" + + "github.com/hyp3rd/hypercache/internal/sentinel" + "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// newDrainNode builds a single-node DistMemory with the HTTP transport +// running so /health and /dist/drain are reachable. Replication=1 keeps +// the test focused on the drain-state machine — fan-out concerns are +// covered elsewhere. +func newDrainNode(t *testing.T) *backend.DistMemory { + t.Helper() + + addr := AllocatePort(t) + + bi, err := backend.NewDistMemory( + context.Background(), + backend.WithDistNode("drain-A", addr), + backend.WithDistReplication(1), + ) + if err != nil { + t.Fatalf("new dist memory: %v", err) + } + + dm, ok := bi.(*backend.DistMemory) + if !ok { + t.Fatalf("expected *backend.DistMemory, got %T", bi) + } + + StopOnCleanup(t, dm) + + return dm +} + +// httpStatus issues a request and returns its status code; bodies are +// drained so the connection returns to the pool. Test-helper-only — +// production code never ignores response bodies. +func httpStatus(ctx context.Context, t *testing.T, method, url string) int { + t.Helper() + + req, err := http.NewRequestWithContext(ctx, method, url, nil) + if err != nil { + t.Fatalf("build request %s %s: %v", method, url, err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("do request %s %s: %v", method, url, err) + } + + defer func() { _ = resp.Body.Close() }() + + return resp.StatusCode +} + +// TestDistDrain_HealthFlipsTo503 is the core C.1 contract: after Drain +// is called, the auth-wrapped /health endpoint reports 503 so external +// load balancers stop routing traffic. Pre-drain it must report 200, +// post-drain it must report 503; the transition is one-way. 
+func TestDistDrain_HealthFlipsTo503(t *testing.T) { + t.Parallel() + + dm := newDrainNode(t) + ctx := context.Background() + healthURL := "http://" + dm.LocalNodeAddr() + "/health" + + if got := httpStatus(ctx, t, http.MethodGet, healthURL); got != http.StatusOK { + t.Fatalf("expected /health=200 before drain, got %d", got) + } + + err := dm.Drain(ctx) + if err != nil { + t.Fatalf("drain: %v", err) + } + + if got := httpStatus(ctx, t, http.MethodGet, healthURL); got != http.StatusServiceUnavailable { + t.Fatalf("expected /health=503 after drain, got %d", got) + } + + // Drain is idempotent. + err2 := dm.Drain(ctx) + if err2 != nil { + t.Fatalf("second drain returned error: %v", err2) + } + + // Metric increments exactly once per first-transition (CompareAndSwap). + if got := dm.Metrics().Drains; got != 1 { + t.Fatalf("expected Drains=1 after one transition + idempotent re-drain, got %d", got) + } + + if !dm.IsDraining() { + t.Fatal("IsDraining() must report true after Drain") + } +} + +// TestDistDrain_RejectsNewWrites is the corollary contract: while the +// node is draining, Set and Remove return ErrDraining so callers that +// haven't yet noticed the /health 503 still get a clear refusal +// instead of silently writing to a node that's about to disappear. +// Reads continue to succeed because the node still holds data. +func TestDistDrain_RejectsNewWrites(t *testing.T) { + t.Parallel() + + dm := newDrainNode(t) + ctx := context.Background() + + // Pre-drain Set must succeed so the post-drain Get has something to find. + preErr := dm.Set(ctx, &cache.Item{ + Key: "drain-key", + Value: []byte("v"), + Version: 1, + Origin: "drain-A", + LastUpdated: time.Now(), + }) + if preErr != nil { + t.Fatalf("pre-drain set: %v", preErr) + } + + drainErr := dm.Drain(ctx) + if drainErr != nil { + t.Fatalf("drain: %v", drainErr) + } + + postSetErr := dm.Set(ctx, &cache.Item{ + Key: "post-drain-key", + Value: []byte("v"), + Version: 1, + Origin: "drain-A", + LastUpdated: time.Now(), + }) + if !errors.Is(postSetErr, sentinel.ErrDraining) { + t.Fatalf("expected ErrDraining on post-drain Set, got %v", postSetErr) + } + + postRemoveErr := dm.Remove(ctx, "drain-key") + if !errors.Is(postRemoveErr, sentinel.ErrDraining) { + t.Fatalf("expected ErrDraining on post-drain Remove, got %v", postRemoveErr) + } + + // Reads still work — operators expect drain to stop new writes, + // not to abandon in-flight reads. + it, ok := dm.Get(ctx, "drain-key") + if !ok || it == nil { + t.Fatal("expected Get on drain-key to still succeed during drain") + } +} + +// TestDistDrain_HTTPEndpointDrains exercises the operator path: a +// POST to /dist/drain transitions the node, /health flips to 503, +// and the metric increments. Mirrors the in-process test above but +// drives the transition through the wire — confirming the endpoint +// is actually wired and auth-wrapped consistent with the rest of +// /dist/* / /internal/*. 
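+//
+// Roughly the operator-side equivalent from a shell (the node's dist
+// listener address is hypothetical here; this test runs without auth,
+// so no Authorization header is shown):
+//
+//	curl -i http://127.0.0.1:9200/health        # 200 before drain
+//	curl -i -X POST http://127.0.0.1:9200/dist/drain
+//	curl -i http://127.0.0.1:9200/health        # 503 after drain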
+func TestDistDrain_HTTPEndpointDrains(t *testing.T) { + t.Parallel() + + dm := newDrainNode(t) + ctx := context.Background() + + base := "http://" + dm.LocalNodeAddr() + + if got := httpStatus(ctx, t, http.MethodGet, base+"/health"); got != http.StatusOK { + t.Fatalf("expected /health=200 before drain, got %d", got) + } + + if got := httpStatus(ctx, t, http.MethodPost, base+"/dist/drain"); got != http.StatusOK { + t.Fatalf("expected /dist/drain=200, got %d", got) + } + + if got := httpStatus(ctx, t, http.MethodGet, base+"/health"); got != http.StatusServiceUnavailable { + t.Fatalf("expected /health=503 after HTTP drain, got %d", got) + } + + if !dm.IsDraining() { + t.Fatal("IsDraining() must report true after HTTP drain") + } +} diff --git a/tests/dist_http_compression_test.go b/tests/dist_http_compression_test.go new file mode 100644 index 0000000..0701e88 --- /dev/null +++ b/tests/dist_http_compression_test.go @@ -0,0 +1,201 @@ +package tests + +import ( + "context" + "encoding/base64" + "net/http" + "strings" + "testing" + "time" + + "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// valueAsBytes normalizes a cache.Item.Value (typed any) back to a +// byte slice across the JSON wire encodings cache.Item.Value can +// arrive as: native []byte (in-process), base64-encoded string (HTTP +// JSON round-trip), or a plain string. Returns (bytes, true) when +// recognized, (nil, false) otherwise. +func valueAsBytes(v any) ([]byte, bool) { + switch x := v.(type) { + case []byte: + return x, true + + case string: + // Try base64 first — that's how []byte serializes through + // github.com/goccy/go-json. Fall back to the raw string bytes for + // values that were always-string. + decoded, err := base64.StdEncoding.DecodeString(x) + if err == nil { + return decoded, true + } + + return []byte(x), true + + default: + return nil, false + } +} + +// newCompressionNode spins up a DistMemory configured for the +// compression round-trip test. Replication=2 with HTTP seeds means a +// Set's primary path drives `replicateTo` over the wire — that's +// where the gzip-or-not branching lives. Threshold is shared across +// the two-node cluster: server side decompresses any inbound +// `Content-Encoding: gzip` regardless of its own threshold, so the +// option is symmetric in practice. +func newCompressionNode(t *testing.T, id, addr string, seeds []string, threshold int) *backend.DistMemory { + t.Helper() + + bi, err := backend.NewDistMemory( + context.Background(), + backend.WithDistNode(id, addr), + backend.WithDistSeeds(seeds), + backend.WithDistReplication(2), + backend.WithDistVirtualNodes(32), + backend.WithDistHTTPLimits(backend.DistHTTPLimits{CompressionThreshold: threshold}), + ) + if err != nil { + t.Fatalf("new node %s: %v", id, err) + } + + dm, ok := bi.(*backend.DistMemory) + if !ok { + t.Fatalf("cast %s: %T", id, bi) + } + + StopOnCleanup(t, dm) + + return dm +} + +// waitForCompressionHealth polls /health until both nodes are +// reachable. Without this, A's Set may try to forward to B's listener +// while B is still binding under -race. 
+func waitForCompressionHealth(ctx context.Context, baseURL string, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL+"/health", nil) + if err != nil { + return false + } + + resp, err := http.DefaultClient.Do(req) + if err == nil { + _ = resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + return true + } + } + + time.Sleep(50 * time.Millisecond) + } + + return false +} + +// roundTripValueOnRemote performs a Set on `writer` for `key` with +// `value`, then issues a Get on `reader` and compares — covers the +// wire path on Set (writer → reader via replicateTo) and proves the +// value survived the gzip round-trip when applicable. +func roundTripValueOnRemote( + t *testing.T, + writer, reader *backend.DistMemory, + key string, + value []byte, +) { + t.Helper() + + ctx := context.Background() + + err := writer.Set(ctx, &cache.Item{ + Key: key, + Value: value, + Version: 1, + Origin: string(writer.LocalNodeID()), + LastUpdated: time.Now(), + }) + if err != nil { + t.Fatalf("set: %v", err) + } + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + it, ok := reader.Get(ctx, key) + if ok && it != nil { + got, parsed := valueAsBytes(it.Value) + if !parsed { + t.Fatalf("could not normalize reader value to bytes; got %T", it.Value) + } + + if string(got) != string(value) { + t.Fatalf("value mismatch after replication; got %d bytes want %d", len(got), len(value)) + } + + return + } + + time.Sleep(50 * time.Millisecond) + } + + t.Fatalf("replicated value never arrived on reader") +} + +// TestDistHTTP_CompressionRoundTrip is the Phase B.3 contract test: +// when CompressionThreshold is configured, large Set values gzip on +// the wire and the receiver transparently decompresses them. The +// post-replication value on the reader must equal the writer's +// original bytes, byte-for-byte. +func TestDistHTTP_CompressionRoundTrip(t *testing.T) { + t.Parallel() + + const threshold = 256 + + addrA := AllocatePort(t) + addrB := AllocatePort(t) + + a := newCompressionNode(t, "compress-A", addrA, []string{addrB}, threshold) + b := newCompressionNode(t, "compress-B", addrB, []string{addrA}, threshold) + + ctx := context.Background() + for _, base := range []string{"http://" + a.LocalNodeAddr(), "http://" + b.LocalNodeAddr()} { + if !waitForCompressionHealth(ctx, base, 5*time.Second) { + t.Fatalf("node at %s never came up", base) + } + } + + // Allow ring/membership to settle so replication actually fans + // out across both nodes. + time.Sleep(200 * time.Millisecond) + + value := []byte(strings.Repeat("x", 4*threshold)) + roundTripValueOnRemote(t, a, b, "compressed-key", value) +} + +// TestDistHTTP_CompressionDisabledRoundTrip confirms the default +// (threshold=0) wire path still works unchanged — peers without +// compression support remain compatible until operators raise the +// threshold cluster-wide. 
+func TestDistHTTP_CompressionDisabledRoundTrip(t *testing.T) { + t.Parallel() + + addrA := AllocatePort(t) + addrB := AllocatePort(t) + + a := newCompressionNode(t, "uncompress-A", addrA, []string{addrB}, 0) + b := newCompressionNode(t, "uncompress-B", addrB, []string{addrA}, 0) + + ctx := context.Background() + for _, base := range []string{"http://" + a.LocalNodeAddr(), "http://" + b.LocalNodeAddr()} { + if !waitForCompressionHealth(ctx, base, 5*time.Second) { + t.Fatalf("node at %s never came up", base) + } + } + + time.Sleep(200 * time.Millisecond) + + value := []byte(strings.Repeat("y", 1024)) + roundTripValueOnRemote(t, a, b, "uncompressed-key", value) +} diff --git a/tests/dist_indirect_probe_test.go b/tests/dist_indirect_probe_test.go new file mode 100644 index 0000000..d3ea9a8 --- /dev/null +++ b/tests/dist_indirect_probe_test.go @@ -0,0 +1,103 @@ +//go:build test + +package tests + +import ( + "context" + "testing" + "time" + + "github.com/hyp3rd/hypercache/pkg/backend" +) + +// TestDistIndirectProbe_ReachabilityViaRelay is the contract test for +// Phase B.1: when a target peer is reachable via the cluster, the +// indirect-probe path reports true and increments the success metric. +// Built on the in-process cluster helper (3 nodes) so the test is +// deterministic — node A asks node B to probe node C, all three are +// alive and registered with the same in-process transport, so B +// trivially reaches C. +// +// The companion negative case (`...UnreachableTargetReturnsFalse`) +// drives the same path with C unregistered to confirm the helper +// short-circuits to false rather than masking the failure. +func TestDistIndirectProbe_ReachabilityViaRelay(t *testing.T) { + t.Parallel() + + dc := SetupInProcessClusterRF( + t, 3, 3, + backend.WithDistIndirectProbes(2, 50*time.Millisecond), + ) + + if len(dc.Nodes) != 3 { + t.Fatalf("expected 3 nodes, got %d", len(dc.Nodes)) + } + + a, c := dc.Nodes[0], dc.Nodes[2] + + // A asks the cluster to probe C; C is alive on the shared transport, + // so at least one relay (B) will confirm reachability. + if !a.IndirectProbeReachableForTest(context.Background(), string(c.LocalNodeID())) { + t.Fatalf("expected indirect probe to confirm C reachable; got false") + } + + if got := a.Metrics().IndirectProbeSuccess; got == 0 { + t.Fatalf("expected IndirectProbeSuccess > 0 after successful relay, got %d", got) + } +} + +// TestDistIndirectProbe_UnreachableTargetReturnsFalse pairs with the +// reachability test: if every relay fails to reach the target, the +// helper must return false (so the caller can proceed to mark suspect) +// and the failure metric must increment. +// +// We simulate "target unreachable from every relay" by unregistering +// C from the shared in-process transport — relays look up C, miss, +// and bubble ErrBackendNotFound back up the indirect path. +func TestDistIndirectProbe_UnreachableTargetReturnsFalse(t *testing.T) { + t.Parallel() + + dc := SetupInProcessClusterRF( + t, 3, 3, + backend.WithDistIndirectProbes(2, 50*time.Millisecond), + ) + + a, c := dc.Nodes[0], dc.Nodes[2] + targetID := string(c.LocalNodeID()) + + // Unregister C from the transport — every relay's Health(C) probe + // will now miss, so no relay can confirm reachability. 
+ dc.Transport.Unregister(targetID) + + if a.IndirectProbeReachableForTest(context.Background(), targetID) { + t.Fatalf("expected indirect probe to report C unreachable; got true") + } + + if got := a.Metrics().IndirectProbeFailure; got == 0 { + t.Fatalf("expected IndirectProbeFailure > 0 after every relay failed, got %d", got) + } +} + +// TestDistIndirectProbe_ZeroKeepsLegacyBehavior confirms the new +// indirect-probe option is fully opt-in: a default-constructed +// DistMemory (no WithDistIndirectProbes call) reports false from +// IndirectProbeReachableForTest because indirectProbeK is 0, and no +// indirect metrics increment. The pre-Phase-B behavior (direct probe +// alone decides liveness) is preserved. +func TestDistIndirectProbe_ZeroKeepsLegacyBehavior(t *testing.T) { + t.Parallel() + + dc := SetupInProcessClusterRF(t, 3, 3) // no WithDistIndirectProbes + + a, c := dc.Nodes[0], dc.Nodes[2] + + if a.IndirectProbeReachableForTest(context.Background(), string(c.LocalNodeID())) { + t.Fatalf("expected default-off behavior: helper reports false when indirectProbeK=0") + } + + m := a.Metrics() + if m.IndirectProbeSuccess != 0 || m.IndirectProbeFailure != 0 || m.IndirectProbeRefuted != 0 { + t.Fatalf("expected zero indirect-probe metrics with K=0, got success=%d failure=%d refuted=%d", + m.IndirectProbeSuccess, m.IndirectProbeFailure, m.IndirectProbeRefuted) + } +} diff --git a/tests/dist_keys_cursor_test.go b/tests/dist_keys_cursor_test.go new file mode 100644 index 0000000..5b2f573 --- /dev/null +++ b/tests/dist_keys_cursor_test.go @@ -0,0 +1,211 @@ +package tests + +import ( + "context" + "net/http" + "strconv" + "testing" + "time" + + "github.com/goccy/go-json" + + "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// keysPage matches the JSON shape returned by /internal/keys. +type keysPage struct { + Keys []string `json:"keys"` + NextCursor string `json:"next_cursor"` + Truncated bool `json:"truncated"` +} + +// fetchKeysPage issues `GET /internal/keys?cursor=` (or +// without cursor when empty) and decodes the response. Used by the +// pagination tests below to walk the cursor chain. Auth is not +// configured in these tests so no Authorization header is set. +func fetchKeysPage(ctx context.Context, t *testing.T, baseURL, cursor string, limit int) keysPage { + t.Helper() + + url := baseURL + "/internal/keys" + + first := true + + if cursor != "" { + url += "?cursor=" + cursor + + first = false + } + + if limit > 0 { + sep := "?" + if !first { + sep = "&" + } + + url += sep + "limit=" + strconv.Itoa(limit) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + t.Fatalf("build request: %v", err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("do request: %v", err) + } + + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status %d for %s", resp.StatusCode, url) + } + + var page keysPage + + decodeErr := json.NewDecoder(resp.Body).Decode(&page) + if decodeErr != nil { + t.Fatalf("decode page: %v", decodeErr) + } + + return page +} + +// newKeysCursorNode is the constructor shared by the pagination tests. +// Replication=1 keeps the test focused on the per-node enumeration +// path without ring-routing to peers. 
+func newKeysCursorNode(t *testing.T) *backend.DistMemory { + t.Helper() + + addr := AllocatePort(t) + + bi, err := backend.NewDistMemory( + context.Background(), + backend.WithDistNode("keys-A", addr), + backend.WithDistReplication(1), + ) + if err != nil { + t.Fatalf("new dist memory: %v", err) + } + + dm, ok := bi.(*backend.DistMemory) + if !ok { + t.Fatalf("expected *backend.DistMemory, got %T", bi) + } + + StopOnCleanup(t, dm) + + return dm +} + +// TestDistKeysCursor_WalksAllShards is the core C.2 contract: a +// client following the next_cursor chain from /internal/keys must +// eventually visit every key the node holds, exactly once. +// +// We seed 64 keys (well under any reasonable per-shard limit so no +// page is truncated) then walk the cursor chain until next_cursor +// is empty, accumulating keys. The set of accumulated keys must +// equal the set of seeded keys. +func TestDistKeysCursor_WalksAllShards(t *testing.T) { + t.Parallel() + + dm := newKeysCursorNode(t) + ctx := context.Background() + + const want = 64 + + expected := make(map[string]struct{}, want) + for i := range want { + key := "cursor-key-" + strconv.Itoa(i) + + err := dm.Set(ctx, &cache.Item{ + Key: key, + Value: []byte("v"), + Version: 1, + Origin: "keys-A", + LastUpdated: time.Now(), + }) + if err != nil { + t.Fatalf("set %s: %v", key, err) + } + + expected[key] = struct{}{} + } + + base := "http://" + dm.LocalNodeAddr() + + got := map[string]int{} // key -> times seen + cursor := "" + + for range 32 { // upper bound to prevent infinite loop on bug + page := fetchKeysPage(ctx, t, base, cursor, 0) + + for _, k := range page.Keys { + got[k]++ + } + + if page.NextCursor == "" { + break + } + + cursor = page.NextCursor + } + + if len(got) != len(expected) { + t.Fatalf("expected %d unique keys across pagination, got %d", len(expected), len(got)) + } + + for k := range expected { + if got[k] != 1 { + t.Fatalf("key %s appeared %d times across pagination, want 1", k, got[k]) + } + } +} + +// TestDistKeysCursor_LimitTruncatesAndResumes proves the per-shard +// `limit` knob works: when a shard has more keys than the limit +// allows, the response carries `truncated=true` and `next_cursor` +// pointing at the same shard so the client can re-request with a +// larger limit. We assert at least one truncated page surfaces when +// many keys land in a single shard. +// +// The default shard count (8) means 256 keys distribute roughly to +// 32 per shard; setting limit=1 guarantees truncation on every +// non-empty shard. 
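+//
+// For illustration, with cursor=0 and limit=1 the truncated first page
+// has roughly this shape (field names per the keysPage struct above;
+// the key value is arbitrary):
+//
+//	GET /internal/keys?cursor=0&limit=1
+//	{"keys":["limit-key-42"],"next_cursor":"0","truncated":true}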
+func TestDistKeysCursor_LimitTruncatesAndResumes(t *testing.T) { + t.Parallel() + + dm := newKeysCursorNode(t) + ctx := context.Background() + + for i := range 256 { + key := "limit-key-" + strconv.Itoa(i) + + err := dm.Set(ctx, &cache.Item{ + Key: key, + Value: []byte("v"), + Version: 1, + Origin: "keys-A", + LastUpdated: time.Now(), + }) + if err != nil { + t.Fatalf("set %s: %v", key, err) + } + } + + base := "http://" + dm.LocalNodeAddr() + page := fetchKeysPage(ctx, t, base, "0", 1) + + if !page.Truncated { + t.Fatalf("expected truncated=true with limit=1 across 256 keys, got false") + } + + if page.NextCursor != "0" { + t.Fatalf("expected truncated page to keep next_cursor=0, got %q", page.NextCursor) + } + + if len(page.Keys) != 1 { + t.Fatalf("expected exactly 1 key under limit=1, got %d", len(page.Keys)) + } +} diff --git a/tests/dist_migration_retry_test.go b/tests/dist_migration_retry_test.go new file mode 100644 index 0000000..417d33b --- /dev/null +++ b/tests/dist_migration_retry_test.go @@ -0,0 +1,93 @@ +//go:build test + +package tests + +import ( + "context" + "testing" + "time" + + "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// TestDistMigrationRetry_QueuesHintOnTransportError is the contract +// test for Phase B.2: when a forward to a replica fails for any +// transport error (not just ErrBackendNotFound), the item is queued +// onto the hint-replay queue rather than dropped. +// +// We exercise the replicateTo path on a 2-node cluster: node A writes +// a key whose replica is node B, but B is unregistered from the +// shared in-process transport. The Set's replica fan-out must enqueue +// a hint for B; replay against a re-registered B should land the key. +func TestDistMigrationRetry_QueuesHintOnTransportError(t *testing.T) { + t.Parallel() + + ctx := context.Background() + dc := SetupInProcessClusterRF( + t, 2, 2, + backend.WithDistHintTTL(2*time.Second), + backend.WithDistHintReplayInterval(50*time.Millisecond), + backend.WithDistWriteConsistency(backend.ConsistencyOne), // tolerate partial fan-out + ) + + a, b := dc.Nodes[0], dc.Nodes[1] + bID := string(b.LocalNodeID()) + + // Find a key whose primary owner is A — only then does A's Set + // drive replicateTo against B as a replica. When B is the primary, + // A's Set is forwarded (or A promotes itself with no fan-out + // because replication=2 leaves an empty replica set). + var key string + + for i := range 64 { + candidate := "mig-hint-" + string(rune('a'+i%26)) + string(rune('a'+(i/26)%26)) + + owners := dc.Ring.Lookup(candidate) + if len(owners) > 0 && owners[0] == a.LocalNodeID() { + key = candidate + + break + } + } + + if key == "" { + t.Fatalf("could not find a key A primaries within 64 candidates") + } + + // Drop B from the transport — A's replica fan-out will see a + // not-found / non-recoverable error from the transport. + dc.Transport.Unregister(bID) + + err := a.Set(ctx, &cache.Item{ + Key: key, + Value: []byte("v1"), + Version: 1, + Origin: string(a.LocalNodeID()), + LastUpdated: time.Now(), + }) + if err != nil { + t.Fatalf("set: %v", err) + } + + // Hint queue must show the deferred replica write. + if got := a.Metrics().HintedQueued; got == 0 { + t.Fatalf("expected HintedQueued > 0 after replica unreachable, got %d", got) + } + + // Re-register B and run hint replay synchronously — the queued + // hint should drain and the metric for replayed-hints should + // increment. 
+ dc.Transport.Register(b) + a.ReplayHintsForTest(ctx) + + if got := a.Metrics().HintedReplayed; got == 0 { + t.Fatalf("expected HintedReplayed > 0 after replay drained queued hint, got %d", got) + } + + // And the key must now be present on B. + it, ok := b.Get(ctx, key) + if !ok || it == nil { + t.Fatalf("expected key on B after replay, got missing") + } +} diff --git a/tests/integration/dist_phase1_test.go b/tests/integration/dist_phase1_test.go index 58cef1f..9d870b0 100644 --- a/tests/integration/dist_phase1_test.go +++ b/tests/integration/dist_phase1_test.go @@ -3,12 +3,13 @@ package integration import ( "context" "encoding/base64" - "encoding/json" "fmt" "net" "testing" "time" + "github.com/goccy/go-json" + "github.com/hyp3rd/hypercache/pkg/backend" cache "github.com/hyp3rd/hypercache/pkg/cache/v2" ) diff --git a/tests/integration/dist_seed_spec_test.go b/tests/integration/dist_seed_spec_test.go new file mode 100644 index 0000000..e7ef13f --- /dev/null +++ b/tests/integration/dist_seed_spec_test.go @@ -0,0 +1,139 @@ +package integration + +import ( + "context" + "testing" + "time" + + "github.com/hyp3rd/hypercache/pkg/backend" + cache "github.com/hyp3rd/hypercache/pkg/cache/v2" +) + +// seedSpecCluster bundles a 3-node test cluster with the (id@addr) +// seed wiring shared by every node. Returned to the test so the +// test body stays focused on the propagation assertion. +type seedSpecCluster struct { + a, b, c *backend.DistMemory +} + +// buildSeedSpecCluster builds three HTTP-transport nodes wired with +// `id@addr` seeds — pre-fix, this configuration produced a broken +// ring; post-fix, every node knows every peer's identity from the +// first lookup. Constructor uses context.Background() internally +// (the constructor ctx is only used for listener bind, not for any +// caller-cancellable work); test code drives Set/Get with its own +// context. +func buildSeedSpecCluster(t *testing.T) *seedSpecCluster { + t.Helper() + + ctx := context.Background() + + addrA := allocatePort(t) + addrB := allocatePort(t) + addrC := allocatePort(t) + + all := []struct{ id, addr string }{ + {"node-A", addrA}, {"node-B", addrB}, {"node-C", addrC}, + } + + seedsFor := func(self string) []string { + out := make([]string, 0, len(all)-1) + + for _, s := range all { + if s.id == self { + continue + } + + out = append(out, s.id+"@"+s.addr) + } + + return out + } + + mkNode := func(id, addr string) *backend.DistMemory { + bm, err := backend.NewDistMemory( + ctx, + backend.WithDistNode(id, addr), + backend.WithDistSeeds(seedsFor(id)), + backend.WithDistReplication(3), + backend.WithDistVirtualNodes(32), + backend.WithDistReadConsistency(backend.ConsistencyOne), + backend.WithDistWriteConsistency(backend.ConsistencyQuorum), + ) + if err != nil { + t.Fatalf("new node %s: %v", id, err) + } + + dm, ok := bm.(*backend.DistMemory) + if !ok { + t.Fatalf("cast %s: %T", id, bm) + } + + t.Cleanup(func() { _ = dm.Stop(ctx) }) + + return dm + } + + cluster := &seedSpecCluster{ + a: mkNode("node-A", addrA), + b: mkNode("node-B", addrB), + c: mkNode("node-C", addrC), + } + + for _, addr := range []string{addrA, addrB, addrC} { + waitForDistNodeHealth(ctx, t, addr) + } + + // Allow a brief settle so the auto-created HTTP transport has + // resolved peer URLs from membership before the first Set + // fans out. 200 ms is generous on a single host. 
+ time.Sleep(200 * time.Millisecond) + + return cluster +} + +// TestDistSeedSpec_PropagatesAcrossNodes is the end-to-end fix +// regression for the cluster-propagation bug: pre-fix, seeds were +// added to membership with empty IDs, the ring built over empty IDs, +// every node treated itself as the only real owner, and writes +// stopped at whichever node received them. The fix: seeds may now +// carry their peer's node ID inline as `id@addr`. +func TestDistSeedSpec_PropagatesAcrossNodes(t *testing.T) { + t.Parallel() + + ctx := context.Background() + cl := buildSeedSpecCluster(t) + + const key = "propagation-key" + + err := cl.a.Set(ctx, &cache.Item{ + Key: key, + Value: []byte("hello"), + Version: 1, + Origin: "node-A", + LastUpdated: time.Now(), + }) + if err != nil { + t.Fatalf("set on A: %v", err) + } + + deadline := time.Now().Add(2 * time.Second) + for _, n := range []*backend.DistMemory{cl.b, cl.c} { + found := false + + for time.Now().Before(deadline) { + it, ok := n.Get(ctx, key) + if ok && it != nil { + found = true + + break + } + + time.Sleep(50 * time.Millisecond) + } + + if !found { + t.Fatalf("node %s never saw the propagated key", n.LocalNodeID()) + } + } +}