From 83b84a44e99e832b9a75871876b9742749ee1894 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Mon, 25 May 2026 10:18:08 -0700 Subject: [PATCH 1/9] moq-relay: restore gossip-style cluster discovery via --cluster-node The 91ea43c5 rewrite collapsed clustering onto a single hop-routed origin and removed the old `--cluster-root`/`--cluster-node` flags that let leaves discover each other through a rendezvous. The replacement `--cluster-connect` requires every operator to enumerate every peer URL in every relay's config, which is O(N^2) configuration and bites anyone adding a node. Issue #1499 hit this after merging main. Hop-based loop detection now lives on every broadcast, so the old "publish a registration broadcast to a well-known path" trick can return without the `cluster: bool` auth flag that used to gate it. Bring back `--cluster-node `: when set the relay publishes a placeholder broadcast at `.internal/origins/` on its own origin, and watches that prefix for other peers' registrations. Each newly-announced URL is dialed with the same exponential-backoff loop as `--cluster-connect`, deduplicated by URL so static and gossip peers don't double-dial. Scope the `.internal/*` namespace to mTLS sessions only. Add a small `OriginProducer::block(prefix)` / `OriginConsumer::block(prefix)` view to moq-net (symmetric with `scope`/`with_root`) that refuses publishes and hides announces under a prefix. `Cluster::publisher` and `Cluster::subscriber` apply `.block(".internal")` for any token where `internal == false`, so even a broad-scope JWT can't see or publish into the mesh namespace. `--cluster-root` / `cluster.root` is kept as a hidden Option field that bails at startup with a migration message pointing at `--cluster-connect` and `--cluster-node`. Rewrite doc/bin/relay/cluster.md to describe both the static and gossip modes, the `.internal/*` access rule, and a migration table. Update doc/bin/relay/config.md, rs/moq-relay/README.md to match. Switch demo/relay/leaf{0,1}.toml from static `connect = [root, leaf]` to gossip (`connect = [root]` + `node`), so the loop-detection exercise still runs but through discovery. Closes #1499 Co-Authored-By: Claude Opus 4.7 (1M context) --- demo/relay/leaf0.toml | 8 +- demo/relay/leaf1.toml | 10 +- doc/bin/relay/cluster.md | 139 +++++++++------- doc/bin/relay/config.md | 17 +- rs/moq-net/src/model/origin.rs | 229 ++++++++++++++++++++++++-- rs/moq-relay/README.md | 22 +-- rs/moq-relay/src/cluster.rs | 289 +++++++++++++++++++++++++++++++-- 7 files changed, 603 insertions(+), 111 deletions(-) diff --git a/demo/relay/leaf0.toml b/demo/relay/leaf0.toml index abb6caef9..d12099742 100644 --- a/demo/relay/leaf0.toml +++ b/demo/relay/leaf0.toml @@ -35,9 +35,13 @@ tls.root = ["ca.pem"] listen = "[::]:4444" [cluster] -# Connect only to the root. leaf1 will dial us directly, forming a loop -# (leaf1 -> root, leaf1 -> leaf0, leaf0 -> root) to exercise cluster loop detection. +# Dial the root for the initial cluster session. The root then gossips our +# `node` URL to other leaves via the .internal/origins/ mesh path so +# they discover and dial us back. leaf1 reaches us this way, forming a loop +# (leaf1 -> root, leaf1 -> leaf0, leaf0 -> root) that exercises cluster +# loop detection. connect = ["localhost:4443"] +node = "localhost:4444" [auth] # Allow JWT tokens that are signed by this root key. diff --git a/demo/relay/leaf1.toml b/demo/relay/leaf1.toml index e21fddf40..19a045b49 100644 --- a/demo/relay/leaf1.toml +++ b/demo/relay/leaf1.toml @@ -35,9 +35,13 @@ tls.root = ["ca.pem"] listen = "[::]:4445" [cluster] -# Connect to both the root and leaf0, forming a loop to exercise cluster loop detection. -# ex. imagine us-east connects to us-central AND directly to us-west. -connect = ["localhost:4443", "localhost:4444"] +# Dial only the root explicitly; gossip via .internal/origins/* will discover +# leaf0 and dial it automatically. Combined with leaf0's own outbound connect +# to the root, this still forms the leaf0 <-> leaf1 <-> root loop that +# exercises cluster loop detection. The mesh edge is now discovered, not +# hardcoded (ex. imagine us-east learning about us-west via us-central). +connect = ["localhost:4443"] +node = "localhost:4445" [auth] # Allow JWT tokens that are signed by this root key. diff --git a/doc/bin/relay/cluster.md b/doc/bin/relay/cluster.md index 187f7ef38..d29a28ae7 100644 --- a/doc/bin/relay/cluster.md +++ b/doc/bin/relay/cluster.md @@ -5,96 +5,125 @@ description: Run multiple moq-relay instances across multiple hosts/regions # Clustering -Multiple relay instances can cluster for geographic distribution and improved latency. +Multiple relay instances can join a cluster for geographic distribution and improved latency. Every cluster peer publishes into the same logical origin; loop detection and shortest-path preference come from a hop list on each broadcast, so peers can be connected in arbitrary topologies without duplicating data. -## Overview +## Two ways to form a cluster -`moq-relay` uses a simple clustering scheme: +Pick the mode that matches your operational constraints. Both can be combined in a single deployment. -1. **Root node** - A single relay (can serve public traffic) that tracks cluster membership -2. **Other nodes** - Accept internet traffic and consult the root for routing +### Static topology -When a relay publishes a broadcast, it advertises its `node` address to other relays via the root. +Enumerate every peer by URL. Best for small clusters (2-5 nodes) where membership rarely changes. -## Configuration +```toml +[cluster] +connect = ["us-east.example.com:4443", "eu-west.example.com:4443"] +``` + +Each entry in `connect` is dialed at startup and kept alive with exponential backoff. There is no discovery: every new node requires editing every existing config. + +### Gossip discovery + +Each relay sets `node` to its own externally-reachable URL. Connecting to a single peer is enough; that peer gossips the new node's address to everyone else. ```toml +# On the rendezvous (every other relay connects here) [cluster] -root = "https://root-relay.example.com" # Root node -node = "https://us-east.relay.example.com" # This node's address +node = "rendezvous.example.com:4443" + +# On a leaf joining the cluster +[cluster] +node = "us-east.example.com:4443" +connect = ["rendezvous.example.com:4443"] ``` -### Cluster Arguments +When a leaf with `node` set connects to `rendezvous`, it publishes a registration broadcast at `.internal/origins/` on the cluster origin. Other peers reachable from `rendezvous` see the registration and dial the new leaf, building a full mesh. Removing a node unannounces its registration, which aborts the dial on every other peer. -- `--cluster-root ` - Hostname/IP of the root node (omit to make this node the root) -- `--cluster-node ` - Hostname/IP of this instance (needs valid TLS cert) +A relay with `node` set and no `connect` entries waits passively for inbound connections. A relay with `connect` and no `node` dials peers but isn't itself advertised, so others won't discover it via gossip. -## How It Works +## How gossip works -1. Each relay connects to the root node on startup -2. When a publisher connects to any relay, that relay announces the broadcast -3. The root node tracks which relay has which broadcasts -4. When a subscriber connects, the relay queries the root to find the broadcast -5. Relays connect to each other to forward traffic +1. On startup, a relay with `cluster.node = ""` publishes a placeholder broadcast at `.internal/origins/` on its own origin. The broadcast carries no tracks: the path is the registration. +2. Cluster sessions exchange their origins both ways. The registration propagates to every connected peer, accumulating a hop chain along the way. +3. Each peer watches `.internal/origins/*` for newly announced URLs and dials any it isn't already connected to. Dials are deduplicated by URL, so a peer reached via both `connect` and gossip uses a single session. +4. When a peer goes away, its registration is unannounced and every other relay aborts the dial it spawned in response. +5. Loop detection on `publish_broadcast` refuses any broadcast whose hop chain already contains this relay's id, so re-announcing a registration through a longer path is a silent no-op. -## Benefits +## Visibility of `.internal/*` -- **Lower latency** - Users connect to nearest relay -- **Higher availability** - Redundancy across regions -- **Geographic distribution** - Serve global audiences +Mesh registrations are infrastructure, not user data. The relay restricts the `.internal/` namespace to internal sessions: -## Example Topology +- **mTLS peers** (cluster-to-cluster traffic, authenticated against `tls.root`) see `.internal/*` and can publish into it. This is how registrations flow between relays. +- **JWT-authenticated sessions** are filtered: their subscribe view hides `.internal/*` announcements, and their publish view refuses publishes to `.internal/*`. This holds even for tokens with the broadest possible scope (`subscribe = [""]`, `publish = [""]`). +- **Anonymous sessions** under `auth.public` are bound by the configured public prefixes; `.internal/` is not one of them. -```text - ┌─────────────┐ - │ Root Node │ - │ (US-C) │ - └──────┬──────┘ - ┌───────────────┼───────────────┐ - │ │ │ - ┌──────┴──────┐ ┌──────┴──────┐ ┌──────┴──────┐ - │ US-East │ │ EU-West │ │ Asia-SE │ - │ Relay │ │ Relay │ │ Relay │ - └─────────────┘ └─────────────┘ └─────────────┘ -``` +The split is enforced at session acceptance, so there is no way to reach `.internal/*` without first authenticating via a trusted client certificate. + +## Peer authentication + +Cluster peers must authenticate to each other before they exchange registrations. Two options: -## Peer Authentication +### mTLS (recommended for new deployments) + +Configure the relay with `tls.root` pointing at the CA that signed the cluster peer certificates. Inbound connections presenting a valid client cert are granted full access (`AuthToken::unrestricted`) and tagged as internal. Leaves connect outbound with a `client.tls.cert` / `client.tls.key` signed by the same CA. No JWT is required. -Cluster peers must authenticate to each other. Two options: +See [Authentication → mTLS Peer Authentication](/bin/relay/auth#mtls-peer-authentication) for the CA setup walkthrough. ### JWT token -Each leaf reads a JWT from `cluster.token` (see [Authentication](/bin/relay/auth)) -and presents it to the root on connect. The token must grant cluster privileges -and full publish/subscribe access. +Each relay reads a JWT from `cluster.token` and presents it on outbound dials. The token must grant full publish and subscribe scope (`publish: ""`, `subscribe: ""`). The receiving relay verifies it like any other JWT. -### mTLS (recommended for new deployments) +```toml +[cluster] +node = "us-east.example.com:4443" +connect = ["rendezvous.example.com:4443"] +token = "cluster.jwt" +``` -Configure the root with `tls.root` pointing at the CA that signed the leaf -certificates. Leaves connect with a client certificate signed by that CA — -no JWT needed. The leaf's cluster node name is taken from the first DNS SAN on -its certificate, so identity is bound to the cert rather than self-declared. +JWT-authenticated cluster sessions are tagged as external for stats purposes (unlike mTLS) but otherwise behave the same. Note: by default a JWT session is also filtered out of `.internal/*`. A cluster JWT must include the cluster privileges that bypass that filter; if you find yourself reaching for this path, prefer mTLS. -See [Authentication → mTLS Peer Authentication](/bin/relay/auth#mtls-peer-authentication) -for details. +## Example topology (3-node gossip cluster) -## Current Limitations +```text + ┌──────────────────────┐ + │ rendezvous.exam.com │ + │ cluster.node = ... │ + └──┬──────────────┬────┘ + │ │ + gossip ┌──┘ └──┐ gossip + │ │ + ┌──────────┴──────┐ ┌────────┴────────┐ + │ us-east.exam.com│◀──▶│ eu-west.exam.com│ + │ node + connect │ │ node + connect │ + └─────────────────┘ └─────────────────┘ + ▲ direct (gossip) ▲ + └─────────────────────────┘ +``` -- **Mesh topology** - All relays connect to all others -- **Not optimized for large clusters** - 3-5 nodes recommended -- **Single root node** - Future: multi-root for redundancy +`us-east` and `eu-west` each set `connect = ["rendezvous.example.com:4443"]`. The rendezvous gossips them to each other; the resulting topology is a full mesh. -## Production Example +## Production example -The public CDN at `cdn.moq.dev` uses this clustering approach: +The public CDN at `cdn.moq.dev` uses gossip-style discovery across regions: -- `usc.cdn.moq.dev` - US Central (root) +- `usc.cdn.moq.dev` - US Central - `euc.cdn.moq.dev` - EU Central - `sea.cdn.moq.dev` - Southeast Asia Clients use GeoDNS to connect to the nearest relay automatically. -## Next Steps +## Migration from older configs + +`cluster.root` was removed in favor of the gossip / static split. If a config still sets it (CLI flag `--cluster-root` or TOML `[cluster] root = "..."`), the relay errors at startup with a message pointing at `--cluster-connect` and `--cluster-node`. Two minimal migrations: + +| Old (pre-rewrite) | New equivalent | +|---|---| +| `root = "rendezvous:4443"` + `node = "us-east:4443"` | `connect = ["rendezvous:4443"]` + `node = "us-east:4443"` | +| `root = "rendezvous:4443"` (root-only node) | `node = "rendezvous:4443"` (passive rendezvous) | + +The `node` field on leaves keeps its meaning; only the entry-point flag was renamed from `root` to `connect`, and `connect` now accepts a list. + +## Next steps - Deploy to [Production](/bin/relay/prod) - Set up [Authentication](/bin/relay/auth) diff --git a/doc/bin/relay/config.md b/doc/bin/relay/config.md index 3047b31cf..2d0942862 100644 --- a/doc/bin/relay/config.md +++ b/doc/bin/relay/config.md @@ -120,18 +120,19 @@ Clustering configuration for multi-relay deployments. ```toml [cluster] -# Address of the root relay to connect to -# Omit this to make this relay the root -connect = "root.relay.example.com:4443" +# Static peers to dial. Each is kept alive with exponential backoff. +connect = ["rendezvous.example.com:4443"] -# JWT token file for cluster authentication -token = "cluster.jwt" +# This relay's own externally-reachable URL. When set, the relay advertises +# itself on the cluster origin so peers reached via `connect` discover and +# dial it. Omit for a relay that should not appear in the gossip mesh. +node = "us-east.example.com:4443" -# This relay's address, as reachable by other cluster nodes -node = "leaf1.relay.example.com:4443" +# JWT used for outbound cluster dials (alternative to mTLS). +token = "cluster.jwt" ``` -See [Clustering](/bin/relay/cluster) for deployment patterns. +See [Clustering](/bin/relay/cluster) for deployment patterns and the static / gossip mode split. ### \[client] diff --git a/rs/moq-net/src/model/origin.rs b/rs/moq-net/src/model/origin.rs index c5d2f2471..29f7e1b89 100644 --- a/rs/moq-net/src/model/origin.rs +++ b/rs/moq-net/src/model/origin.rs @@ -643,6 +643,10 @@ pub struct OriginProducer { // The prefix that is automatically stripped from all paths. root: PathOwned, + + // Absolute path prefixes for which publishes are silently refused. + // Populated by [`OriginProducer::block`]. Empty by default. + blocked: Vec, } impl std::ops::Deref for OriginProducer { @@ -661,6 +665,7 @@ impl OriginProducer { info, nodes: OriginNodes::default(), root: PathOwned::default(), + blocked: Vec::new(), } } @@ -690,13 +695,17 @@ impl OriginProducer { return false; } + let full = self.root.join(&path); + + if self.is_blocked(&full) { + return false; + } + let (root, rest) = match self.nodes.get(&path) { Some(root) => root, None => return false, }; - let full = self.root.join(&path); - root.lock().publish(&full, &broadcast, &rest); let root = root.clone(); @@ -719,9 +728,32 @@ impl OriginProducer { info: self.info, nodes: self.nodes.select(&prefixes)?, root: self.root.clone(), + blocked: self.blocked.clone(), }) } + /// Returns a new OriginProducer that silently refuses publishes whose absolute path + /// falls under `prefix`. Stacks: `.block(a).block(b)` refuses paths under either. + /// + /// `prefix` is interpreted relative to this producer's current root, so the resulting + /// block is fixed at the absolute path computed at call time. Subsequent + /// [`with_root`](Self::with_root) calls do not retarget the block. + pub fn block(&self, prefix: impl AsPath) -> Self { + let absolute = self.root.join(prefix); + let mut blocked = self.blocked.clone(); + blocked.push(absolute); + Self { + info: self.info, + nodes: self.nodes.clone(), + root: self.root.clone(), + blocked, + } + } + + fn is_blocked(&self, absolute: &Path<'_>) -> bool { + self.blocked.iter().any(|b| absolute.has_prefix(b.as_path())) + } + /// Subscribe to all announced broadcasts. pub fn consume(&self) -> OriginConsumer { OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone()) @@ -734,6 +766,12 @@ impl OriginProducer { #[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")] pub fn get_broadcast(&self, path: impl AsPath) -> Option { let path = path.as_path(); + if !self.blocked.is_empty() { + let absolute = self.root.join(&path); + if self.blocked.iter().any(|b| absolute.has_prefix(b.as_path())) { + return None; + } + } let (root, rest) = self.nodes.get(&path)?; let state = root.lock(); state.consume_broadcast(&rest) @@ -750,6 +788,7 @@ impl OriginProducer { info: self.info, root: self.root.join(&prefix).to_owned(), nodes: self.nodes.root(&prefix)?, + blocked: self.blocked.clone(), }) } @@ -785,6 +824,10 @@ pub struct OriginConsumer { // A prefix that is automatically stripped from all paths. root: PathOwned, + + // Absolute path prefixes whose announcements / lookups are hidden from this + // consumer. Populated by [`OriginConsumer::block`]. Empty by default. + blocked: Vec, } impl std::ops::Deref for OriginConsumer { @@ -797,6 +840,10 @@ impl std::ops::Deref for OriginConsumer { impl OriginConsumer { fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self { + Self::new_with_blocked(info, root, nodes, Vec::new()) + } + + fn new_with_blocked(info: Origin, root: PathOwned, nodes: OriginNodes, blocked: Vec) -> Self { let state = conducer::Producer::::default(); let id = ConsumerId::new(); @@ -814,6 +861,7 @@ impl OriginConsumer { nodes, state, root, + blocked, } } @@ -834,14 +882,21 @@ impl OriginConsumer { /// consumer is closed, or `Poll::Pending` after registering `waiter` to be /// notified when the next update arrives. pub fn poll_announced(&mut self, waiter: &conducer::Waiter) -> Poll> { - match self.state.poll(waiter, |state| match state.take() { - Some(item) => Poll::Ready(item), - None => Poll::Pending, - }) { - Poll::Ready(Ok(item)) => Poll::Ready(Some(item)), - // Closed: discard the Ref so its MutexGuard doesn't escape this call. - Poll::Ready(Err(_)) => Poll::Ready(None), - Poll::Pending => Poll::Pending, + loop { + let item = match self.state.poll(waiter, |state| match state.take() { + Some(item) => Poll::Ready(item), + None => Poll::Pending, + }) { + Poll::Ready(Ok(item)) => item, + // Closed: discard the Ref so its MutexGuard doesn't escape this call. + Poll::Ready(Err(_)) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + }; + + if self.is_blocked(&item.0) { + continue; + } + return Poll::Ready(Some(item)); } } @@ -850,7 +905,21 @@ impl OriginConsumer { /// Returns None if there is no update available; NOT because the consumer is closed. /// You have to use `is_closed` to check if the consumer is closed. pub fn try_announced(&mut self) -> Option { - self.state.write().ok()?.take() + loop { + let item = self.state.write().ok()?.take()?; + if self.is_blocked(&item.0) { + continue; + } + return Some(item); + } + } + + fn is_blocked(&self, path: &PathOwned) -> bool { + if self.blocked.is_empty() { + return false; + } + let absolute = self.root.join(path); + self.blocked.iter().any(|b| absolute.has_prefix(b.as_path())) } /// Create another consumer with its own announcement cursor over the same origin. @@ -867,6 +936,12 @@ impl OriginConsumer { /// landed (e.g. you're responding to an `announced()` callback). pub fn get_broadcast(&self, path: impl AsPath) -> Option { let path = path.as_path(); + if !self.blocked.is_empty() { + let absolute = self.root.join(&path); + if self.blocked.iter().any(|b| absolute.has_prefix(b.as_path())) { + return None; + } + } let (root, rest) = self.nodes.get(&path)?; let state = root.lock(); state.consume_broadcast(&rest) @@ -883,6 +958,14 @@ impl OriginConsumer { pub async fn announced_broadcast(&self, path: impl AsPath) -> Option { let path = path.as_path(); + // Refuse blocked paths up front; the post-scope loop would otherwise hang forever. + if !self.blocked.is_empty() { + let absolute = self.root.join(&path); + if self.blocked.iter().any(|b| absolute.has_prefix(b.as_path())) { + return None; + } + } + // Scope a fresh consumer down to this path so we only wake up for relevant announcements. let mut consumer = self.scope(std::slice::from_ref(&path))?; @@ -911,10 +994,11 @@ impl OriginConsumer { // TODO accept PathPrefixes instead of &[Path] pub fn scope(&self, prefixes: &[Path]) -> Option { let prefixes = PathPrefixes::new(prefixes); - Some(OriginConsumer::new( + Some(OriginConsumer::new_with_blocked( self.info, self.root.clone(), self.nodes.select(&prefixes)?, + self.blocked.clone(), )) } @@ -925,13 +1009,28 @@ impl OriginConsumer { pub fn with_root(&self, prefix: impl AsPath) -> Option { let prefix = prefix.as_path(); - Some(Self::new( + Some(Self::new_with_blocked( self.info, self.root.join(&prefix).to_owned(), self.nodes.root(&prefix)?, + self.blocked.clone(), )) } + /// Returns a new OriginConsumer that hides announcements whose absolute path falls + /// under `prefix`, and refuses `get_broadcast` / `announced_broadcast` lookups + /// targeting those paths. Stacks: `.block(a).block(b)` hides paths under either. + /// + /// `prefix` is interpreted relative to this consumer's current root, so the resulting + /// block is fixed at the absolute path computed at call time. Subsequent + /// [`with_root`](Self::with_root) calls do not retarget the block. + pub fn block(&self, prefix: impl AsPath) -> Self { + let absolute = self.root.join(prefix); + let mut blocked = self.blocked.clone(); + blocked.push(absolute); + Self::new_with_blocked(self.info, self.root.clone(), self.nodes.clone(), blocked) + } + /// Returns the prefix that is automatically stripped from all paths. pub fn root(&self) -> &Path<'_> { &self.root @@ -959,7 +1058,7 @@ impl Drop for OriginConsumer { impl Clone for OriginConsumer { fn clone(&self) -> Self { - OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone()) + OriginConsumer::new_with_blocked(self.info, self.root.clone(), self.nodes.clone(), self.blocked.clone()) } } @@ -2118,4 +2217,106 @@ mod tests { "unexpected path in pending updates", ); } + + #[tokio::test] + async fn test_consume_block_hides_announces_under_prefix() { + let origin = Origin::random().produce(); + let hidden = Broadcast::new().produce(); + let visible = Broadcast::new().produce(); + + // .internal/origins/foo is the cluster mesh path the relay needs to keep private. + origin.publish_broadcast(".internal/origins/foo", hidden.consume()); + origin.publish_broadcast("demo/bar", visible.consume()); + + let mut blocked = origin.consume().block(".internal"); + blocked.assert_next("demo/bar", &visible.consume()); + blocked.assert_next_wait(); // .internal/origins/foo is hidden + + let mut unblocked = origin.consume(); + // Insertion order isn't guaranteed across roots; just confirm both arrive. + let first = unblocked.try_announced().expect("first announce"); + let second = unblocked.try_announced().expect("second announce"); + let mut paths = [first.0.as_str(), second.0.as_str()]; + paths.sort(); + assert_eq!(paths, [".internal/origins/foo", "demo/bar"]); + } + + #[tokio::test] + async fn test_publish_block_refuses_publish_under_prefix() { + let origin = Origin::random().produce(); + let broadcast = Broadcast::new().produce(); + + let blocked = origin.block(".internal"); + assert!(!blocked.publish_broadcast(".internal/origins/foo", broadcast.consume())); + // Exact-match also blocked. + assert!(!blocked.publish_broadcast(".internal", broadcast.consume())); + // Sibling under a different first segment is fine. + assert!(blocked.publish_broadcast(".internalish", broadcast.consume())); + assert!(blocked.publish_broadcast("demo/bar", broadcast.consume())); + + let mut consumer = origin.consume(); + // Insertion-order: .internalish first, then demo/bar. + consumer.assert_next(".internalish", &broadcast.consume()); + consumer.assert_next("demo/bar", &broadcast.consume()); + consumer.assert_next_wait(); + } + + #[tokio::test] + async fn test_block_get_broadcast() { + let origin = Origin::random().produce(); + let hidden = Broadcast::new().produce(); + origin.publish_broadcast(".internal/origins/foo", hidden.consume()); + + let consumer = origin.consume().block(".internal"); + assert!(consumer.get_broadcast(".internal/origins/foo").is_none()); + assert!( + consumer + .announced_broadcast(".internal/origins/foo") + .now_or_never() + .is_some() + ); + // announced_broadcast returns None synchronously for blocked paths. + assert!( + consumer + .announced_broadcast(".internal/origins/foo") + .now_or_never() + .unwrap() + .is_none() + ); + } + + #[tokio::test] + async fn test_block_stacks() { + let origin = Origin::random().produce(); + let b1 = Broadcast::new().produce(); + let b2 = Broadcast::new().produce(); + let b3 = Broadcast::new().produce(); + + origin.publish_broadcast(".internal/x", b1.consume()); + origin.publish_broadcast(".secret/y", b2.consume()); + origin.publish_broadcast("demo/z", b3.consume()); + + let mut consumer = origin.consume().block(".internal").block(".secret"); + consumer.assert_next("demo/z", &b3.consume()); + consumer.assert_next_wait(); + } + + #[tokio::test] + async fn test_block_with_root_treats_block_as_absolute() { + let origin = Origin::random().produce(); + let internal = Broadcast::new().produce(); + let nested = Broadcast::new().produce(); + + // `.internal` set at the unrooted view stays anchored at the absolute root. + // After descending into `demo`, paths in the view start with `demo/...`, so the + // absolute `.internal/...` paths aren't reachable from the rooted view anyway. + let blocked = origin.block(".internal"); + + assert!(!blocked.publish_broadcast(".internal/x", internal.consume())); + + // .block then with_root: the descended view publishes under absolute "demo/...", + // so blocked .internal is unreachable from the rooted view and publishes succeed. + let rooted = blocked.with_root("demo").expect("rooted view"); + assert!(rooted.publish_broadcast("nested", nested.consume())); + } } diff --git a/rs/moq-relay/README.md b/rs/moq-relay/README.md index c84148805..327aa1ce1 100644 --- a/rs/moq-relay/README.md +++ b/rs/moq-relay/README.md @@ -58,22 +58,22 @@ HTTPS is currently not supported. ## Clustering -In order to scale MoQ, you will eventually need to run multiple moq-relay instances potentially in different regions. -This is called *clustering*, where the goal is that a user connects to the closest relay and they magically form a mesh behind the scenes. +To scale MoQ, you will eventually need to run multiple moq-relay instances, often in different regions. +A user connects to the nearest relay and the cluster routes broadcasts between peers behind the scenes. -**moq-relay** uses a simple clustering scheme using moq-lite. -This is both dog-fooding and a surprisingly ueeful way to distribute live metadata at scale. +**moq-relay** layers clustering on top of moq-lite: every cluster peer publishes into the same logical origin, with a hop list on each broadcast for loop detection and shortest-path preference. +There are two ways to form a cluster, which can be combined: -We currently use a single "root" node that is used to discover members of the cluster and what broadcasts they offer. -This is a normal moq-relay instance, potentially serving public traffic, unaware of the fact that it's in charge of other relays. +- **Static topology** — `--cluster-connect ` (repeatable or comma-separated). Each peer is dialed at startup and kept alive with exponential backoff. Best for 2-5 stable nodes; no discovery. +- **Gossip discovery** — `--cluster-node `. This relay advertises its URL on the cluster origin so peers reached via `--cluster-connect` discover and dial it. Pair with `--cluster-connect ` to join an existing mesh. -The other moq-relay instances accept internet traffic and consult the root for routing. -They can then advertise their internal ip/hostname to other instances when publishing a broadcast. +A relay with only `--cluster-node` set waits passively for inbound connections (acts as a rendezvous). A relay with both flags dials the rendezvous, gossips itself, and dials every peer it learns about. -Cluster arguments: +Mesh registrations live at `.internal/origins/` on the cluster origin. That namespace is mTLS-only: JWT and anonymous sessions never see or publish into `.internal/*` regardless of their declared scope. -- `--cluster-root `: The hostname/ip of the root node. If missing, this node is a root. -- `--cluster-node `: The hostname/ip of this instance. There needs to be a corresponding valid TLS certificate, potentially self-signed. If missing, published broadcasts will only be available on this specific relay. +> `--cluster-root` was removed. If you have it in an existing config, the relay errors at startup with a message pointing at the replacements above. + +See [doc/bin/relay/cluster.md](https://github.com/moq-dev/moq/blob/main/doc/bin/relay/cluster.md) for the full walkthrough, including mTLS setup and a 3-node example. ## Authentication diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index d6ef4df66..7b7cf32b1 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -1,16 +1,32 @@ -use std::path::PathBuf; +use std::{ + collections::HashMap, + path::PathBuf, + sync::{Arc, Mutex}, +}; use anyhow::Context; -use moq_net::{Origin, OriginConsumer, OriginProducer, Stats, Tier}; +use moq_net::{BroadcastProducer, Origin, OriginConsumer, OriginProducer, Path, Stats, Tier}; +use tokio::task::AbortHandle; use url::Url; use crate::AuthToken; +/// Path prefix under which cluster nodes advertise their own URLs for gossip-style +/// peer discovery. Restricted to mTLS (`token.internal`) sessions by +/// [`Cluster::subscriber`] / [`Cluster::publisher`]. +const MESH_PREFIX: &str = ".internal/origins"; + /// Configuration for relay clustering. /// -/// Each node runs a full mesh: every configured `--cluster-connect` peer is -/// dialed and kept open for the session's lifetime. Hop-based routing on -/// broadcasts prevents announcement loops. +/// Two modes that can be combined: +/// +/// - **Static** ([`Self::connect`]): explicit list of peer URLs to dial. Each is kept +/// alive for the session lifetime; no discovery happens. +/// - **Gossip** ([`Self::node`] + at least one [`Self::connect`] entry): advertise +/// this relay's URL on the cluster origin so connected peers discover and dial it, +/// and watch for the advertisements of others so we dial them too. +/// +/// Hop-based routing on broadcasts prevents announcement loops regardless of mode. #[serde_with::serde_as] #[derive(clap::Args, Clone, Debug, serde::Serialize, serde::Deserialize, Default)] #[serde_with::skip_serializing_none] @@ -30,9 +46,20 @@ pub struct ClusterConfig { #[serde_as(as = "serde_with::OneOrMany<_>")] pub connect: Vec, + /// This relay's own externally-reachable URL. When set, the relay publishes its address + /// on the cluster origin (under `.internal/origins/`) so other mTLS-authenticated + /// peers can discover and dial it. Pair with [`Self::connect`] to reach an initial peer + /// who will gossip your address onward. + #[arg(id = "cluster-node", long = "cluster-node", env = "MOQ_CLUSTER_NODE")] + pub node: Option, + /// Use the token in this file when connecting to other nodes. #[arg(id = "cluster-token", long = "cluster-token", env = "MOQ_CLUSTER_TOKEN")] pub token: Option, + + /// Removed; present only to emit a migration error. Use [`Self::connect`] instead. + #[arg(id = "cluster-root", long = "cluster-root", env = "MOQ_CLUSTER_ROOT", hide = true)] + pub root: Option, } /// A relay cluster built around a single [`OriginProducer`]. @@ -98,24 +125,45 @@ impl Cluster { } /// Returns an [`OriginConsumer`] scoped to this session's subscribe permissions. + /// + /// Non-internal tokens (i.e. JWT-authenticated end users) cannot see `.internal/*` + /// paths regardless of their declared scope. Cluster mesh registrations and other + /// infrastructure broadcasts live under that prefix. pub fn subscriber(&self, token: &AuthToken) -> Option { - Some(self.origin.with_root(&token.root)?.scope(&token.subscribe)?.consume()) + let view = self.origin.with_root(&token.root)?.scope(&token.subscribe)?.consume(); + Some(if token.internal { view } else { view.block(".internal") }) } /// Returns an [`OriginProducer`] scoped to this session's publish permissions. + /// + /// Non-internal tokens cannot publish into `.internal/*` regardless of their + /// declared scope. pub fn publisher(&self, token: &AuthToken) -> Option { - self.origin.with_root(&token.root)?.scope(&token.publish) + let view = self.origin.with_root(&token.root)?.scope(&token.publish)?; + Some(if token.internal { view } else { view.block(".internal") }) } - /// Runs the cluster event loop, dialing the configured peers and keeping - /// each connection alive indefinitely with exponential backoff on failure. + /// Runs the cluster event loop: dial static `--cluster-connect` peers, publish a + /// self-registration broadcast for `--cluster-node` (if set), and watch for other + /// peers' registrations to discover and dial them. /// - /// Completes once all dials have given up; a node with no peers (`connect` - /// empty) has no outbound work and returns immediately. Errors when peers - /// are configured but no client has been attached via - /// [`with_client`](Self::with_client). + /// Completes once all dials have given up; a node with no peers and no self URL + /// returns immediately. Errors: + /// - if `cluster.root` / `--cluster-root` is set (removed flag); + /// - if any cluster work is configured but no QUIC client has been attached via + /// [`with_client`](Self::with_client). pub async fn run(self) -> anyhow::Result<()> { - if self.config.connect.is_empty() { + if let Some(root) = &self.config.root { + anyhow::bail!( + "`cluster.root` / `--cluster-root` was removed (value: {root:?}). \ + Use `--cluster-connect ` for static peer connections, or \ + `--cluster-node ` to gossip this relay's address so other peers \ + can discover and dial it. See https://doc.moq.dev/bin/relay/cluster." + ); + } + + let has_work = !self.config.connect.is_empty() || self.config.node.is_some(); + if !has_work { tracing::info!("no cluster peers configured; running standalone"); return Ok(()); } @@ -133,15 +181,36 @@ impl Cluster { None => String::new(), }; + // Hold the self-registration broadcast alive for the lifetime of `run`. Dropping + // it would unannounce immediately and tell peers we've left. + let _self_registration: Option = self.config.node.as_deref().map(|node| { + let path = Path::new(MESH_PREFIX).join(node); + let broadcast = self + .origin + .create_broadcast(&path) + .expect(".internal/origins is within the relay origin's root"); + tracing::info!(%node, %path, "advertising cluster node"); + broadcast + }); + + // Track active dial tasks by URL so static and gossip-discovered peers don't + // duplicate, and so the discovery side can abort a task when a peer unannounces. + let active: Arc>> = Arc::new(Mutex::new(HashMap::new())); + let mut tasks = tokio::task::JoinSet::new(); + + // Seed static peers from --cluster-connect. for peer in &self.config.connect { + Self::spawn_dial(&mut tasks, &active, self.clone(), peer.clone(), token.clone()); + } + + // Spawn the gossip discovery task if --cluster-node is set. + if let Some(self_url) = self.config.node.clone() { let this = self.clone(); let token = token.clone(); - let peer = peer.clone(); + let active = active.clone(); tasks.spawn(async move { - if let Err(err) = this.run_remote(&peer, token).await { - tracing::warn!(%err, %peer, "cluster peer connection ended"); - } + this.run_discovery(self_url, token, active).await; }); } @@ -149,6 +218,79 @@ impl Cluster { Ok(()) } + /// Spawn a dial loop for `peer` and remember its abort handle. Skips if `peer` + /// is already tracked (caller-side dedup against static peers and prior discoveries). + fn spawn_dial( + tasks: &mut tokio::task::JoinSet<()>, + active: &Arc>>, + this: Self, + peer: String, + token: String, + ) { + { + let active = active.lock().expect("dial map poisoned"); + if active.contains_key(&peer) { + return; + } + } + let peer_for_task = peer.clone(); + let handle = tasks.spawn(async move { + if let Err(err) = this.run_remote(&peer_for_task, token).await { + tracing::warn!(%err, peer = %peer_for_task, "cluster peer connection ended"); + } + }); + active.lock().expect("dial map poisoned").insert(peer, handle); + } + + /// Watch `.internal/origins/*` for peer registrations and dial each newly-announced + /// URL that isn't already tracked. Unannounces abort the corresponding dial. + async fn run_discovery(self, self_url: String, token: String, active: Arc>>) { + let Some(mut consumer) = self.origin.consume().with_root(MESH_PREFIX) else { + tracing::warn!("could not scope cluster origin to {MESH_PREFIX}; discovery disabled"); + return; + }; + + while let Some((relative, announced)) = consumer.announced().await { + let peer = relative.as_str(); + if peer == self_url { + continue; + } + + match announced { + Some(_) => { + let peer = peer.to_owned(); + let already_active = { + let active = active.lock().expect("dial map poisoned"); + active.contains_key(&peer) + }; + if already_active { + tracing::debug!(%peer, "discovered peer already tracked; skipping dial"); + continue; + } + tracing::info!(%peer, "discovered cluster peer; dialing"); + let this = self.clone(); + let token = token.clone(); + let peer_for_task = peer.clone(); + let handle = tokio::spawn(async move { + if let Err(err) = this.run_remote(&peer_for_task, token).await { + tracing::warn!(%err, peer = %peer_for_task, "cluster peer connection ended"); + } + }); + active + .lock() + .expect("dial map poisoned") + .insert(peer, handle.abort_handle()); + } + None => { + tracing::info!(%peer, "cluster peer unannounced; aborting dial"); + if let Some(handle) = active.lock().expect("dial map poisoned").remove(peer) { + handle.abort(); + } + } + } + } + } + #[tracing::instrument("remote", skip_all, err, fields(%remote))] async fn run_remote(self, remote: &str, token: String) -> anyhow::Result<()> { let mut url = Url::parse(&format!("https://{remote}/"))?; @@ -209,3 +351,114 @@ impl Cluster { session.closed().await.map_err(Into::into) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::Config; + use moq_net::{Broadcast, PathOwned, PathPrefixes}; + + fn full_scope_jwt() -> AuthToken { + AuthToken { + root: PathOwned::default(), + subscribe: PathPrefixes::from(vec![PathOwned::from(String::new())]), + publish: PathPrefixes::from(vec![PathOwned::from(String::new())]), + internal: false, + } + } + + /// A JWT with the broadest possible scope is still kept out of `.internal/*`. + #[tokio::test] + async fn internal_paths_invisible_to_non_mtls_token() { + let cluster = Cluster::new(ClusterConfig::default()); + let mesh = Broadcast::new().produce(); + let user = Broadcast::new().produce(); + + cluster + .origin + .publish_broadcast(".internal/origins/peer.example.com:4443", mesh.consume()); + cluster.origin.publish_broadcast("demo/test", user.consume()); + + let token = full_scope_jwt(); + let mut subscriber = cluster.subscriber(&token).expect("subscriber"); + + // The user broadcast is visible; the mesh registration must not be. + let (path, broadcast) = subscriber.try_announced().expect("user announce"); + assert_eq!(path.as_str(), "demo/test"); + assert!(broadcast.is_some()); + assert!( + subscriber.try_announced().is_none(), + ".internal/* must not be visible to a broad-scope JWT" + ); + + // The publisher view rejects publishes to `.internal/*` even with broad scope. + let publisher = cluster.publisher(&token).expect("publisher"); + let attempt = Broadcast::new().produce(); + assert!(!publisher.publish_broadcast(".internal/origins/attacker", attempt.consume())); + } + + /// mTLS sessions see the mesh registrations they need to route between cluster peers. + #[tokio::test] + async fn internal_paths_visible_to_mtls_token() { + let cluster = Cluster::new(ClusterConfig::default()); + let mesh = Broadcast::new().produce(); + cluster + .origin + .publish_broadcast(".internal/origins/peer.example.com:4443", mesh.consume()); + + let mut subscriber = cluster.subscriber(&AuthToken::unrestricted()).expect("subscriber"); + let (path, broadcast) = subscriber.try_announced().expect("announce"); + assert_eq!(path.as_str(), ".internal/origins/peer.example.com:4443"); + assert!(broadcast.is_some()); + } + + /// Setting `cluster.root` (the removed flag) at startup must surface a migration + /// message that names both the replacement flags. + #[tokio::test] + async fn cluster_root_errors_with_migration_message() { + let config = ClusterConfig { + root: Some("legacy-root.example.com:4443".to_string()), + ..Default::default() + }; + let err = Cluster::new(config).run().await.expect_err("should error"); + let msg = format!("{err}"); + assert!(msg.contains("cluster.root"), "missing cluster.root in: {msg}"); + assert!(msg.contains("--cluster-connect"), "missing --cluster-connect in: {msg}"); + assert!(msg.contains("--cluster-node"), "missing --cluster-node in: {msg}"); + } + + /// `cluster.root` parsed from TOML triggers the same migration error. + #[test] + fn cluster_root_toml_parses_then_errors() { + let toml = "[cluster]\nroot = \"legacy-root.example.com:4443\"\n"; + let dir = std::env::temp_dir().join("moq-relay-cluster-test"); + std::fs::create_dir_all(&dir).unwrap(); + let path = dir.join("cluster-root-toml.toml"); + std::fs::write(&path, toml).unwrap(); + + let args = vec![std::ffi::OsString::from("moq-relay"), std::ffi::OsString::from(&path)]; + let config = Config::parse_and_merge(args).expect("config load"); + assert_eq!(config.cluster.root.as_deref(), Some("legacy-root.example.com:4443")); + + let rt = tokio::runtime::Runtime::new().unwrap(); + let err = rt + .block_on(Cluster::new(config.cluster).run()) + .expect_err("should error"); + assert!(format!("{err}").contains("cluster.root")); + } + + /// `cluster.node` round-trips through TOML and CLI. + #[test] + fn cluster_node_round_trips() { + let toml = "[cluster]\nnode = \"us-east.example.com:4443\"\nconnect = [\"root.example.com:4443\"]\n"; + let dir = std::env::temp_dir().join("moq-relay-cluster-test"); + std::fs::create_dir_all(&dir).unwrap(); + let path = dir.join("cluster-node-toml.toml"); + std::fs::write(&path, toml).unwrap(); + + let args = vec![std::ffi::OsString::from("moq-relay"), std::ffi::OsString::from(&path)]; + let config = Config::parse_and_merge(args).expect("config load"); + assert_eq!(config.cluster.node.as_deref(), Some("us-east.example.com:4443")); + assert_eq!(config.cluster.connect, vec!["root.example.com:4443".to_string()]); + } +} From ea8678f3b4e3b6d8b678099e2c3f508ca76f17c8 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Mon, 25 May 2026 14:00:36 -0700 Subject: [PATCH 2/9] moq-relay: fix .internal block bypass + static-peer abort on gossip churn Three review fixes for #1504. 1. block-before-root for non-internal tokens. Previously the relay built a view as `with_root(token.root).scope(token.subscribe).consume()` and *then* called `.block(".internal")` on it. The block stores absolute paths, computed at call time as `self.root.join(prefix)`. For a token with `root == ".internal"` the block resolved to absolute `.internal/.internal`, and the real `.internal/origins/*` mesh paths leaked through. Apply `.block(".internal")` to `self.origin` *before* `with_root` so the block is anchored at the true absolute root regardless of the token's `root` claim. 2. `OriginProducer::consume()` now propagates `blocked` to the derived consumer. The view is meaningfully a (publish-refuse, announce-hide) pair; the consumer half was unfiltered, so the producer's publish block left the announce side wide open. The fix in #1 exposed this: the regression test for token.root=".internal" wouldn't have failed if the consumer had been blocked. Now `.block().consume()` returns a consumer that also filters announces. 3. Static peers survive gossip unannounces. `active: HashMap` used to be shared between `--cluster-connect` (static) peers and gossip-discovered peers. A peer that appeared in both would have its static reconnect loop aborted as soon as its `.internal/origins/` registration was unannounced (e.g. peer restart). Replace the value with `DialEntry { handle, is_static }` and only abort discovered entries on unannounce. Doc: rewrote the JWT subsection in `cluster.md` to drop the incorrect claim that a "cluster JWT" can bypass `.internal/*` filtering. `.internal/*` is mTLS-only; JWT cluster peers can relay user traffic but cannot participate in gossip discovery. Tests: - `internal_paths_invisible_when_token_root_is_internal` exercises the block-before-root path. - `gossip_unannounce_preserves_static_peer` exercises the new `DialEntry.is_static` branch by inserting both a static and a discovered entry into the active map and asserting only the discovered one is removed. Extract `Cluster::handle_gossip_unannounce` so the test can hit the exact same code path the discovery loop uses. Co-Authored-By: Claude Opus 4.7 (1M context) --- doc/bin/relay/cluster.md | 2 +- rs/moq-net/src/model/origin.rs | 10 +- rs/moq-relay/src/cluster.rs | 171 ++++++++++++++++++++++++++++----- 3 files changed, 154 insertions(+), 29 deletions(-) diff --git a/doc/bin/relay/cluster.md b/doc/bin/relay/cluster.md index d29a28ae7..e4e72885b 100644 --- a/doc/bin/relay/cluster.md +++ b/doc/bin/relay/cluster.md @@ -80,7 +80,7 @@ connect = ["rendezvous.example.com:4443"] token = "cluster.jwt" ``` -JWT-authenticated cluster sessions are tagged as external for stats purposes (unlike mTLS) but otherwise behave the same. Note: by default a JWT session is also filtered out of `.internal/*`. A cluster JWT must include the cluster privileges that bypass that filter; if you find yourself reaching for this path, prefer mTLS. +JWT-authenticated cluster sessions are tagged as external for stats purposes. **`.internal/*` is mTLS-only**: a JWT session, no matter how broad its scope, is filtered out of `.internal/origins/*` and cannot publish or receive mesh registrations. JWT-only cluster peers can still relay user traffic for each other, but they will not participate in gossip discovery. Use mTLS for any deployment that wants peers to find each other automatically. ## Example topology (3-node gossip cluster) diff --git a/rs/moq-net/src/model/origin.rs b/rs/moq-net/src/model/origin.rs index 29f7e1b89..30e0313d7 100644 --- a/rs/moq-net/src/model/origin.rs +++ b/rs/moq-net/src/model/origin.rs @@ -755,8 +755,12 @@ impl OriginProducer { } /// Subscribe to all announced broadcasts. + /// + /// Any `block(prefix)` calls on this producer propagate to the consumer so the + /// view stays consistent: paths the producer refuses to publish are also hidden + /// from announce streams on the derived consumer. pub fn consume(&self) -> OriginConsumer { - OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone()) + OriginConsumer::new_with_blocked(self.info, self.root.clone(), self.nodes.clone(), self.blocked.clone()) } /// Get a broadcast by path if it has *already* been published. @@ -839,10 +843,6 @@ impl std::ops::Deref for OriginConsumer { } impl OriginConsumer { - fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self { - Self::new_with_blocked(info, root, nodes, Vec::new()) - } - fn new_with_blocked(info: Origin, root: PathOwned, nodes: OriginNodes, blocked: Vec) -> Self { let state = conducer::Producer::::default(); let id = ConsumerId::new(); diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index 7b7cf32b1..eac8fabbf 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -16,6 +16,15 @@ use crate::AuthToken; /// [`Cluster::subscriber`] / [`Cluster::publisher`]. const MESH_PREFIX: &str = ".internal/origins"; +/// One entry in the active-dial map. The provenance flag keeps +/// gossip unannounces from tearing down a peer that was also seeded statically: +/// static peers must keep their reconnect loop running even when the +/// remote temporarily disappears from `.internal/origins/*`. +struct DialEntry { + handle: AbortHandle, + is_static: bool, +} + /// Configuration for relay clustering. /// /// Two modes that can be combined: @@ -127,20 +136,33 @@ impl Cluster { /// Returns an [`OriginConsumer`] scoped to this session's subscribe permissions. /// /// Non-internal tokens (i.e. JWT-authenticated end users) cannot see `.internal/*` - /// paths regardless of their declared scope. Cluster mesh registrations and other - /// infrastructure broadcasts live under that prefix. + /// paths regardless of their declared scope or root. Cluster mesh registrations and + /// other infrastructure broadcasts live under that prefix. + /// + /// The block is applied to the absolute root before any `with_root`/`scope` so a + /// JWT whose `token.root` itself lies under `.internal/*` can't sidestep it. pub fn subscriber(&self, token: &AuthToken) -> Option { - let view = self.origin.with_root(&token.root)?.scope(&token.subscribe)?.consume(); - Some(if token.internal { view } else { view.block(".internal") }) + let origin = self.access_origin(token); + Some(origin.with_root(&token.root)?.scope(&token.subscribe)?.consume()) } /// Returns an [`OriginProducer`] scoped to this session's publish permissions. /// /// Non-internal tokens cannot publish into `.internal/*` regardless of their - /// declared scope. + /// declared scope or root. pub fn publisher(&self, token: &AuthToken) -> Option { - let view = self.origin.with_root(&token.root)?.scope(&token.publish)?; - Some(if token.internal { view } else { view.block(".internal") }) + let origin = self.access_origin(token); + origin.with_root(&token.root)?.scope(&token.publish) + } + + /// Returns the base origin a session is allowed to see. mTLS / internal sessions + /// get the full origin; everyone else gets a view that blocks `.internal/*`. + fn access_origin(&self, token: &AuthToken) -> OriginProducer { + if token.internal { + self.origin.clone() + } else { + self.origin.block(".internal") + } } /// Runs the cluster event loop: dial static `--cluster-connect` peers, publish a @@ -194,14 +216,16 @@ impl Cluster { }); // Track active dial tasks by URL so static and gossip-discovered peers don't - // duplicate, and so the discovery side can abort a task when a peer unannounces. - let active: Arc>> = Arc::new(Mutex::new(HashMap::new())); + // duplicate, and so the discovery side can abort a discovered peer's task when + // it unannounces. Static peers carry `is_static = true` and are exempt from + // unannounce-driven aborts. + let active: Arc>> = Arc::new(Mutex::new(HashMap::new())); let mut tasks = tokio::task::JoinSet::new(); // Seed static peers from --cluster-connect. for peer in &self.config.connect { - Self::spawn_dial(&mut tasks, &active, self.clone(), peer.clone(), token.clone()); + Self::spawn_dial(&mut tasks, &active, self.clone(), peer.clone(), token.clone(), true); } // Spawn the gossip discovery task if --cluster-node is set. @@ -222,10 +246,11 @@ impl Cluster { /// is already tracked (caller-side dedup against static peers and prior discoveries). fn spawn_dial( tasks: &mut tokio::task::JoinSet<()>, - active: &Arc>>, + active: &Arc>>, this: Self, peer: String, token: String, + is_static: bool, ) { { let active = active.lock().expect("dial map poisoned"); @@ -239,12 +264,16 @@ impl Cluster { tracing::warn!(%err, peer = %peer_for_task, "cluster peer connection ended"); } }); - active.lock().expect("dial map poisoned").insert(peer, handle); + active + .lock() + .expect("dial map poisoned") + .insert(peer, DialEntry { handle, is_static }); } /// Watch `.internal/origins/*` for peer registrations and dial each newly-announced - /// URL that isn't already tracked. Unannounces abort the corresponding dial. - async fn run_discovery(self, self_url: String, token: String, active: Arc>>) { + /// URL that isn't already tracked. An unannounce aborts the corresponding dial only + /// for gossip-discovered peers; static `--cluster-connect` peers keep reconnecting. + async fn run_discovery(self, self_url: String, token: String, active: Arc>>) { let Some(mut consumer) = self.origin.consume().with_root(MESH_PREFIX) else { tracing::warn!("could not scope cluster origin to {MESH_PREFIX}; discovery disabled"); return; @@ -276,18 +305,38 @@ impl Cluster { tracing::warn!(%err, peer = %peer_for_task, "cluster peer connection ended"); } }); - active - .lock() - .expect("dial map poisoned") - .insert(peer, handle.abort_handle()); + active.lock().expect("dial map poisoned").insert( + peer, + DialEntry { + handle: handle.abort_handle(), + is_static: false, + }, + ); } - None => { - tracing::info!(%peer, "cluster peer unannounced; aborting dial"); - if let Some(handle) = active.lock().expect("dial map poisoned").remove(peer) { - handle.abort(); - } + None => Self::handle_gossip_unannounce(&active, peer), + } + } + } + + /// Handle a gossip unannounce for `peer`: abort the dial only if the entry was + /// added by discovery. Static peers (seeded from `--cluster-connect`) keep their + /// reconnect loop running, since gossip churn is just remote restarts. + fn handle_gossip_unannounce(active: &Arc>>, peer: &str) { + let mut active = active.lock().expect("dial map poisoned"); + match active.get(peer) { + Some(entry) if entry.is_static => { + tracing::debug!( + %peer, + "gossip unannounce for static peer; reconnect loop kept alive" + ); + } + Some(_) => { + tracing::info!(%peer, "cluster peer unannounced; aborting dial"); + if let Some(entry) = active.remove(peer) { + entry.handle.abort(); } } + None => {} } } @@ -397,6 +446,82 @@ mod tests { assert!(!publisher.publish_broadcast(".internal/origins/attacker", attempt.consume())); } + /// Regression test for the block-before-root fix: a JWT whose `root` claim points + /// at `.internal` (or any prefix of it) must still be filtered. Before the fix the + /// `.block(".internal")` call sat on top of a view already rooted at `.internal`, + /// so the resulting block prefix was `.internal/.internal` and the real mesh paths + /// leaked through. + #[tokio::test] + async fn internal_paths_invisible_when_token_root_is_internal() { + let cluster = Cluster::new(ClusterConfig::default()); + let mesh = Broadcast::new().produce(); + cluster + .origin + .publish_broadcast(".internal/origins/peer.example.com:4443", mesh.consume()); + + let token = AuthToken { + root: PathOwned::from(".internal".to_string()), + subscribe: PathPrefixes::from(vec![PathOwned::from(String::new())]), + publish: PathPrefixes::from(vec![PathOwned::from(String::new())]), + internal: false, + }; + + let mut subscriber = cluster.subscriber(&token).expect("subscriber"); + assert!( + subscriber.try_announced().is_none(), + "token.root=.internal must not be able to read mesh registrations" + ); + + let publisher = cluster.publisher(&token).expect("publisher"); + let attempt = Broadcast::new().produce(); + // `origins/attacker` relative to root `.internal` is absolute `.internal/origins/attacker`. + assert!( + !publisher.publish_broadcast("origins/attacker", attempt.consume()), + "token.root=.internal must not be able to publish mesh registrations" + ); + } + + /// Regression test for the static-peer survival fix: gossip-driven unannounces must + /// not abort the reconnect loop of a peer that was statically configured via + /// `--cluster-connect`. Before the fix the active map didn't track provenance, so an + /// unannounce of a peer that appeared in both `connect` and gossip removed it, + /// permanently breaking that static peer's reconnect. + #[tokio::test] + async fn gossip_unannounce_preserves_static_peer() { + let active: Arc>> = Arc::new(Mutex::new(HashMap::new())); + // Stand in for a real dial task; never polled but provides an AbortHandle. + let placeholder = tokio::spawn(std::future::pending::<()>()); + let static_handle = placeholder.abort_handle(); + active.lock().unwrap().insert( + "static-peer.example.com:4443".to_string(), + DialEntry { + handle: static_handle, + is_static: true, + }, + ); + + Cluster::handle_gossip_unannounce(&active, "static-peer.example.com:4443"); + assert!( + active.lock().unwrap().contains_key("static-peer.example.com:4443"), + "static peer entry must survive a gossip unannounce" + ); + + // Now insert a discovered peer and confirm unannounce DOES drop it. + let discovered = tokio::spawn(std::future::pending::<()>()); + active.lock().unwrap().insert( + "discovered.example.com:4443".to_string(), + DialEntry { + handle: discovered.abort_handle(), + is_static: false, + }, + ); + Cluster::handle_gossip_unannounce(&active, "discovered.example.com:4443"); + assert!( + !active.lock().unwrap().contains_key("discovered.example.com:4443"), + "discovered peer entry should be removed on gossip unannounce" + ); + } + /// mTLS sessions see the mesh registrations they need to route between cluster peers. #[tokio::test] async fn internal_paths_visible_to_mtls_token() { From 0524a101c4e8827a79c4ce0abc6b96726cddf836 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Mon, 25 May 2026 14:18:12 -0700 Subject: [PATCH 3/9] moq-relay: make --cluster-node usable as passive rendezvous The README and cluster docs both describe `--cluster-node` alone as a valid "rendezvous" mode: the relay advertises its URL on the cluster origin and accepts inbound cluster sessions, but does not dial out. The actual `run()` implementation was inconsistent with this: - It required `Cluster::with_client` whenever any cluster work was configured, including node-only mode. A relay without outbound dials has no need for a QUIC client. - It spawned the gossip discovery task whenever `node` was set, even with no `connect` peers. A passive rendezvous can't usefully dial peers it learns about (they already have inbound sessions to us; dialing back creates duplicates). - The `_self_registration` BroadcastProducer was held in a local. If no dial tasks were spawned, the `while tasks.join_next()` loop exited immediately, dropping the broadcast and unannouncing the relay before any peer could see it. Fix: - Split `has_work` into `has_outbound = !connect.is_empty()` and `has_work = has_outbound || node.is_some()`. - `with_client` is only required when `has_outbound`. - Discovery is only spawned when `has_outbound` AND `node.is_some()`. - When no tasks are spawned (passive rendezvous), park forever with `std::future::pending().await` so the self-registration broadcast stays alive. `cluster.run()` is one arm of a `tokio::select!` in `main.rs`, so the process still exits cleanly via the other arms. Tests: - `passive_rendezvous_runs_without_client_and_advertises_self` builds a Cluster with only `node` set, spawns `run()`, asserts the self-registration is visible on the origin, and confirms `run()` parks (doesn't return) so the registration stays published. Co-Authored-By: Claude Opus 4.7 (1M context) --- rs/moq-relay/src/cluster.rs | 97 +++++++++++++++++++++++++++++-------- 1 file changed, 78 insertions(+), 19 deletions(-) diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index eac8fabbf..e7cd35c08 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -165,14 +165,21 @@ impl Cluster { } } - /// Runs the cluster event loop: dial static `--cluster-connect` peers, publish a - /// self-registration broadcast for `--cluster-node` (if set), and watch for other - /// peers' registrations to discover and dial them. + /// Runs the cluster event loop. Three modes, derived from config: /// - /// Completes once all dials have given up; a node with no peers and no self URL - /// returns immediately. Errors: + /// - **Standalone** (`connect` empty, `node` unset): returns immediately. + /// - **Passive rendezvous** (`node` set, `connect` empty): publishes the + /// self-registration broadcast and parks. The relay accepts inbound cluster + /// sessions through the moq-native server but does not dial out, so no QUIC + /// client is required. + /// - **Active** (`connect` non-empty, with or without `node`): requires a QUIC + /// client. Dials each static peer with exponential-backoff retry. If `node` + /// is also set, advertises self and watches `.internal/origins/*` to discover + /// and dial additional peers. + /// + /// Errors: /// - if `cluster.root` / `--cluster-root` is set (removed flag); - /// - if any cluster work is configured but no QUIC client has been attached via + /// - if `connect` is non-empty but no QUIC client was attached via /// [`with_client`](Self::with_client). pub async fn run(self) -> anyhow::Result<()> { if let Some(root) = &self.config.root { @@ -184,16 +191,19 @@ impl Cluster { ); } - let has_work = !self.config.connect.is_empty() || self.config.node.is_some(); + let has_outbound = !self.config.connect.is_empty(); + let has_work = has_outbound || self.config.node.is_some(); if !has_work { tracing::info!("no cluster peers configured; running standalone"); return Ok(()); } - anyhow::ensure!( - self.client.is_some(), - "cluster peers configured but no QUIC client attached (call Cluster::with_client)" - ); + if has_outbound { + anyhow::ensure!( + self.client.is_some(), + "cluster peers configured but no QUIC client attached (call Cluster::with_client)" + ); + } let token = match &self.config.token { Some(path) => std::fs::read_to_string(path) @@ -228,14 +238,27 @@ impl Cluster { Self::spawn_dial(&mut tasks, &active, self.clone(), peer.clone(), token.clone(), true); } - // Spawn the gossip discovery task if --cluster-node is set. - if let Some(self_url) = self.config.node.clone() { - let this = self.clone(); - let token = token.clone(); - let active = active.clone(); - tasks.spawn(async move { - this.run_discovery(self_url, token, active).await; - }); + // Spawn the gossip discovery task only when we have at least one outbound peer + // to bootstrap from. A node-only relay (passive rendezvous) has no use for + // discovery: it accepts inbound sessions and shouldn't dial peers back, since + // those peers already have a session to us. + if has_outbound { + if let Some(self_url) = self.config.node.clone() { + let this = self.clone(); + let token = token.clone(); + let active = active.clone(); + tasks.spawn(async move { + this.run_discovery(self_url, token, active).await; + }); + } + } + + if tasks.is_empty() { + // Passive rendezvous: nothing to wait on, but we must hold + // `_self_registration` alive so inbound peers continue to see our + // advertisement. Park here forever; `cluster.run()` is one arm of + // `tokio::select!` in main.rs, so the process still exits via the other arms. + std::future::pending::<()>().await } while tasks.join_next().await.is_some() {} @@ -572,6 +595,42 @@ mod tests { assert!(format!("{err}").contains("cluster.root")); } + /// A relay configured with only `cluster.node` (passive rendezvous) must run + /// without a QUIC client, publish its self-registration on the cluster origin, + /// and keep that registration alive (i.e. not exit and drop the broadcast). + #[tokio::test(start_paused = true)] + async fn passive_rendezvous_runs_without_client_and_advertises_self() { + let cluster = Cluster::new(ClusterConfig { + node: Some("rendezvous.example.com:4443".to_string()), + ..Default::default() + }); + + // Snapshot a consumer on the cluster origin before run() takes ownership of + // `cluster` so we can later check that the registration was published. + let mut watcher = cluster.origin.consume(); + + let cluster_run = cluster.clone(); + let mut handle = tokio::spawn(async move { cluster_run.run().await }); + + // Give the runtime a moment to execute the synchronous setup work. + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + + // The self-registration broadcast must be visible on the origin. + let (path, broadcast) = watcher.try_announced().expect("self-registration must be published"); + assert_eq!(path.as_str(), ".internal/origins/rendezvous.example.com:4443"); + assert!(broadcast.is_some()); + + // run() must NOT have returned: dropping the broadcast (via run returning) + // would unannounce the registration immediately. Use a short timeout to + // confirm we're still parked. + let still_running = tokio::time::timeout(tokio::time::Duration::from_millis(50), &mut handle) + .await + .is_err(); + assert!(still_running, "passive rendezvous run() should park, not return"); + + handle.abort(); + } + /// `cluster.node` round-trips through TOML and CLI. #[test] fn cluster_node_round_trips() { From c2936e47f566e5c62fa11f24339e8c8d2d7cd219 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 26 May 2026 08:55:59 -0700 Subject: [PATCH 4/9] moq-relay: rename self-advertise flag to --cluster-mesh; deprecate --cluster-node Use `--cluster-mesh ` for the gossip self-advertisement instead of reusing the old `--cluster-node` name. Both `--cluster-root` and `--cluster-node` (and their TOML counterparts) now error at startup with a migration message pointing at `--cluster-connect` and `--cluster-mesh`. `cluster.node` was a brief detour: the pre-91ea43c5 design used it for the self-URL, then the rewrite removed it, and the prior commit on this branch re-added it with the same meaning. Renaming to `mesh` makes the role explicit and frees `node` for any future per-instance identity work without overlap. ClusterConfig now has: - `connect: Vec` (unchanged) -- static peer URLs to dial. - `mesh: Option` (new) -- our own URL, advertised for gossip discovery. - `node: Option` (hidden, deprecation-only) -- bails if set. - `root: Option` (hidden, deprecation-only) -- bails if set. Both deprecation messages reference the same two replacement flags so the operator only has to learn one mapping. Tests: - Renamed `cluster_node_round_trips` -> `cluster_mesh_round_trips`. - Added `cluster_node_errors_with_migration_message` (CLI) and `cluster_node_toml_parses_then_errors` (TOML) mirroring the root tests. - `passive_rendezvous_runs_without_client_and_advertises_self` now uses `mesh: Some(...)`. Docs: `doc/bin/relay/cluster.md`, `doc/bin/relay/config.md`, `rs/moq-relay/README.md` updated. The migration table now covers both `node -> mesh` and `root -> connect`. Demos: `demo/relay/leaf{0,1}.toml` switched from `node =` to `mesh =`. Co-Authored-By: Claude Opus 4.7 (1M context) --- demo/relay/leaf0.toml | 4 +- demo/relay/leaf1.toml | 2 +- doc/bin/relay/cluster.md | 31 +++++------ doc/bin/relay/config.md | 2 +- rs/moq-relay/README.md | 6 +-- rs/moq-relay/src/cluster.rs | 105 ++++++++++++++++++++++++++---------- 6 files changed, 101 insertions(+), 49 deletions(-) diff --git a/demo/relay/leaf0.toml b/demo/relay/leaf0.toml index d12099742..0c6f6bf2c 100644 --- a/demo/relay/leaf0.toml +++ b/demo/relay/leaf0.toml @@ -36,12 +36,12 @@ listen = "[::]:4444" [cluster] # Dial the root for the initial cluster session. The root then gossips our -# `node` URL to other leaves via the .internal/origins/ mesh path so +# `mesh` URL to other leaves via the .internal/origins/ mesh path so # they discover and dial us back. leaf1 reaches us this way, forming a loop # (leaf1 -> root, leaf1 -> leaf0, leaf0 -> root) that exercises cluster # loop detection. connect = ["localhost:4443"] -node = "localhost:4444" +mesh = "localhost:4444" [auth] # Allow JWT tokens that are signed by this root key. diff --git a/demo/relay/leaf1.toml b/demo/relay/leaf1.toml index 19a045b49..09a8e255f 100644 --- a/demo/relay/leaf1.toml +++ b/demo/relay/leaf1.toml @@ -41,7 +41,7 @@ listen = "[::]:4445" # exercises cluster loop detection. The mesh edge is now discovered, not # hardcoded (ex. imagine us-east learning about us-west via us-central). connect = ["localhost:4443"] -node = "localhost:4445" +mesh = "localhost:4445" [auth] # Allow JWT tokens that are signed by this root key. diff --git a/doc/bin/relay/cluster.md b/doc/bin/relay/cluster.md index e4e72885b..638ecd853 100644 --- a/doc/bin/relay/cluster.md +++ b/doc/bin/relay/cluster.md @@ -24,29 +24,29 @@ Each entry in `connect` is dialed at startup and kept alive with exponential bac ### Gossip discovery -Each relay sets `node` to its own externally-reachable URL. Connecting to a single peer is enough; that peer gossips the new node's address to everyone else. +Each relay sets `mesh` to its own externally-reachable URL. Connecting to a single peer is enough; that peer gossips the new relay's address to everyone else. ```toml # On the rendezvous (every other relay connects here) [cluster] -node = "rendezvous.example.com:4443" +mesh = "rendezvous.example.com:4443" # On a leaf joining the cluster [cluster] -node = "us-east.example.com:4443" +mesh = "us-east.example.com:4443" connect = ["rendezvous.example.com:4443"] ``` -When a leaf with `node` set connects to `rendezvous`, it publishes a registration broadcast at `.internal/origins/` on the cluster origin. Other peers reachable from `rendezvous` see the registration and dial the new leaf, building a full mesh. Removing a node unannounces its registration, which aborts the dial on every other peer. +When a leaf with `mesh` set connects to `rendezvous`, it publishes a registration broadcast at `.internal/origins/` on the cluster origin. Other peers reachable from `rendezvous` see the registration and dial the new leaf, building a full mesh. Removing a node unannounces its registration, which aborts the dial on every other peer. -A relay with `node` set and no `connect` entries waits passively for inbound connections. A relay with `connect` and no `node` dials peers but isn't itself advertised, so others won't discover it via gossip. +A relay with `mesh` set and no `connect` entries waits passively for inbound connections (it does not need a QUIC client). A relay with `connect` and no `mesh` dials peers but isn't itself advertised, so others won't discover it via gossip. ## How gossip works -1. On startup, a relay with `cluster.node = ""` publishes a placeholder broadcast at `.internal/origins/` on its own origin. The broadcast carries no tracks: the path is the registration. +1. On startup, a relay with `cluster.mesh = ""` publishes a placeholder broadcast at `.internal/origins/` on its own origin. The broadcast carries no tracks: the path is the registration. 2. Cluster sessions exchange their origins both ways. The registration propagates to every connected peer, accumulating a hop chain along the way. -3. Each peer watches `.internal/origins/*` for newly announced URLs and dials any it isn't already connected to. Dials are deduplicated by URL, so a peer reached via both `connect` and gossip uses a single session. -4. When a peer goes away, its registration is unannounced and every other relay aborts the dial it spawned in response. +3. Each peer watches `.internal/origins/*` for newly announced URLs and dials any it isn't already connected to. Dials are deduplicated by URL, so a peer reached via both `connect` and gossip uses a single session. Static `connect` peers are exempt from gossip-driven aborts (a peer restart doesn't kill the reconnect loop). +4. When a gossip-discovered peer goes away, its registration is unannounced and every other relay aborts the dial it spawned in response. 5. Loop detection on `publish_broadcast` refuses any broadcast whose hop chain already contains this relay's id, so re-announcing a registration through a longer path is a silent no-op. ## Visibility of `.internal/*` @@ -75,7 +75,7 @@ Each relay reads a JWT from `cluster.token` and presents it on outbound dials. T ```toml [cluster] -node = "us-east.example.com:4443" +mesh = "us-east.example.com:4443" connect = ["rendezvous.example.com:4443"] token = "cluster.jwt" ``` @@ -87,14 +87,14 @@ JWT-authenticated cluster sessions are tagged as external for stats purposes. ** ```text ┌──────────────────────┐ │ rendezvous.exam.com │ - │ cluster.node = ... │ + │ cluster.mesh = ... │ └──┬──────────────┬────┘ │ │ gossip ┌──┘ └──┐ gossip │ │ ┌──────────┴──────┐ ┌────────┴────────┐ │ us-east.exam.com│◀──▶│ eu-west.exam.com│ - │ node + connect │ │ node + connect │ + │ mesh + connect │ │ mesh + connect │ └─────────────────┘ └─────────────────┘ ▲ direct (gossip) ▲ └─────────────────────────┘ @@ -114,14 +114,15 @@ Clients use GeoDNS to connect to the nearest relay automatically. ## Migration from older configs -`cluster.root` was removed in favor of the gossip / static split. If a config still sets it (CLI flag `--cluster-root` or TOML `[cluster] root = "..."`), the relay errors at startup with a message pointing at `--cluster-connect` and `--cluster-node`. Two minimal migrations: +Both `cluster.root` and `cluster.node` were removed. If a config still sets either (CLI flags `--cluster-root` / `--cluster-node` or TOML `[cluster] root = "..."` / `node = "..."`), the relay errors at startup with a message pointing at `--cluster-connect` and `--cluster-mesh`. Minimal migrations: | Old (pre-rewrite) | New equivalent | |---|---| -| `root = "rendezvous:4443"` + `node = "us-east:4443"` | `connect = ["rendezvous:4443"]` + `node = "us-east:4443"` | -| `root = "rendezvous:4443"` (root-only node) | `node = "rendezvous:4443"` (passive rendezvous) | +| `root = "rendezvous:4443"` + `node = "us-east:4443"` | `connect = ["rendezvous:4443"]` + `mesh = "us-east:4443"` | +| `root = "rendezvous:4443"` (root-only node) | `mesh = "rendezvous:4443"` (passive rendezvous) | +| `node = "us-east:4443"` (post-rewrite, briefly) | `mesh = "us-east:4443"` | -The `node` field on leaves keeps its meaning; only the entry-point flag was renamed from `root` to `connect`, and `connect` now accepts a list. +The semantics carry over: `connect` is the static dial list (one-or-more bootstrap peers), `mesh` is this relay's own advertised URL for gossip discovery. ## Next steps diff --git a/doc/bin/relay/config.md b/doc/bin/relay/config.md index 2d0942862..ef980067c 100644 --- a/doc/bin/relay/config.md +++ b/doc/bin/relay/config.md @@ -126,7 +126,7 @@ connect = ["rendezvous.example.com:4443"] # This relay's own externally-reachable URL. When set, the relay advertises # itself on the cluster origin so peers reached via `connect` discover and # dial it. Omit for a relay that should not appear in the gossip mesh. -node = "us-east.example.com:4443" +mesh = "us-east.example.com:4443" # JWT used for outbound cluster dials (alternative to mTLS). token = "cluster.jwt" diff --git a/rs/moq-relay/README.md b/rs/moq-relay/README.md index 327aa1ce1..98f6feaea 100644 --- a/rs/moq-relay/README.md +++ b/rs/moq-relay/README.md @@ -65,13 +65,13 @@ A user connects to the nearest relay and the cluster routes broadcasts between p There are two ways to form a cluster, which can be combined: - **Static topology** — `--cluster-connect ` (repeatable or comma-separated). Each peer is dialed at startup and kept alive with exponential backoff. Best for 2-5 stable nodes; no discovery. -- **Gossip discovery** — `--cluster-node `. This relay advertises its URL on the cluster origin so peers reached via `--cluster-connect` discover and dial it. Pair with `--cluster-connect ` to join an existing mesh. +- **Gossip discovery** — `--cluster-mesh `. This relay advertises its URL on the cluster origin so peers reached via `--cluster-connect` discover and dial it. Pair with `--cluster-connect ` to join an existing mesh. -A relay with only `--cluster-node` set waits passively for inbound connections (acts as a rendezvous). A relay with both flags dials the rendezvous, gossips itself, and dials every peer it learns about. +A relay with only `--cluster-mesh` set waits passively for inbound connections (acts as a rendezvous; no QUIC client required). A relay with both flags dials the rendezvous, gossips itself, and dials every peer it learns about. Mesh registrations live at `.internal/origins/` on the cluster origin. That namespace is mTLS-only: JWT and anonymous sessions never see or publish into `.internal/*` regardless of their declared scope. -> `--cluster-root` was removed. If you have it in an existing config, the relay errors at startup with a message pointing at the replacements above. +> `--cluster-root` and `--cluster-node` were removed. If you have either in an existing config, the relay errors at startup with a message pointing at `--cluster-connect` and `--cluster-mesh`. See [doc/bin/relay/cluster.md](https://github.com/moq-dev/moq/blob/main/doc/bin/relay/cluster.md) for the full walkthrough, including mTLS setup and a 3-node example. diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index e7cd35c08..746971a10 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -31,9 +31,10 @@ struct DialEntry { /// /// - **Static** ([`Self::connect`]): explicit list of peer URLs to dial. Each is kept /// alive for the session lifetime; no discovery happens. -/// - **Gossip** ([`Self::node`] + at least one [`Self::connect`] entry): advertise -/// this relay's URL on the cluster origin so connected peers discover and dial it, -/// and watch for the advertisements of others so we dial them too. +/// - **Gossip** ([`Self::mesh`]): advertise this relay's URL on the cluster origin so +/// connected peers discover and dial it, and watch for the advertisements of others +/// so we dial them too. Pair with [`Self::connect`] to bootstrap into an existing +/// mesh, or set alone to act as a passive rendezvous. /// /// Hop-based routing on broadcasts prevents announcement loops regardless of mode. #[serde_with::serde_as] @@ -58,14 +59,18 @@ pub struct ClusterConfig { /// This relay's own externally-reachable URL. When set, the relay publishes its address /// on the cluster origin (under `.internal/origins/`) so other mTLS-authenticated /// peers can discover and dial it. Pair with [`Self::connect`] to reach an initial peer - /// who will gossip your address onward. - #[arg(id = "cluster-node", long = "cluster-node", env = "MOQ_CLUSTER_NODE")] - pub node: Option, + /// who will gossip your address onward, or set alone for passive rendezvous. + #[arg(id = "cluster-mesh", long = "cluster-mesh", env = "MOQ_CLUSTER_MESH")] + pub mesh: Option, /// Use the token in this file when connecting to other nodes. #[arg(id = "cluster-token", long = "cluster-token", env = "MOQ_CLUSTER_TOKEN")] pub token: Option, + /// Removed; present only to emit a migration error. Use [`Self::mesh`] instead. + #[arg(id = "cluster-node", long = "cluster-node", env = "MOQ_CLUSTER_NODE", hide = true)] + pub node: Option, + /// Removed; present only to emit a migration error. Use [`Self::connect`] instead. #[arg(id = "cluster-root", long = "cluster-root", env = "MOQ_CLUSTER_ROOT", hide = true)] pub root: Option, @@ -167,32 +172,43 @@ impl Cluster { /// Runs the cluster event loop. Three modes, derived from config: /// - /// - **Standalone** (`connect` empty, `node` unset): returns immediately. - /// - **Passive rendezvous** (`node` set, `connect` empty): publishes the + /// - **Standalone** (`connect` empty, `mesh` unset): returns immediately. + /// - **Passive rendezvous** (`mesh` set, `connect` empty): publishes the /// self-registration broadcast and parks. The relay accepts inbound cluster /// sessions through the moq-native server but does not dial out, so no QUIC /// client is required. - /// - **Active** (`connect` non-empty, with or without `node`): requires a QUIC - /// client. Dials each static peer with exponential-backoff retry. If `node` + /// - **Active** (`connect` non-empty, with or without `mesh`): requires a QUIC + /// client. Dials each static peer with exponential-backoff retry. If `mesh` /// is also set, advertises self and watches `.internal/origins/*` to discover /// and dial additional peers. /// /// Errors: /// - if `cluster.root` / `--cluster-root` is set (removed flag); + /// - if `cluster.node` / `--cluster-node` is set (renamed to `cluster.mesh`); /// - if `connect` is non-empty but no QUIC client was attached via /// [`with_client`](Self::with_client). pub async fn run(self) -> anyhow::Result<()> { if let Some(root) = &self.config.root { anyhow::bail!( "`cluster.root` / `--cluster-root` was removed (value: {root:?}). \ - Use `--cluster-connect ` for static peer connections, or \ - `--cluster-node ` to gossip this relay's address so other peers \ - can discover and dial it. See https://doc.moq.dev/bin/relay/cluster." + Use `--cluster-connect ` to dial cluster peers, and \ + optionally `--cluster-mesh ` to gossip this relay's address \ + so other peers can discover and dial it. \ + See https://doc.moq.dev/bin/relay/cluster." + ); + } + if let Some(node) = &self.config.node { + anyhow::bail!( + "`cluster.node` / `--cluster-node` was renamed (value: {node:?}). \ + Use `--cluster-connect ` to dial cluster peers, and \ + optionally `--cluster-mesh ` to gossip this relay's address \ + so other peers can discover and dial it. \ + See https://doc.moq.dev/bin/relay/cluster." ); } let has_outbound = !self.config.connect.is_empty(); - let has_work = has_outbound || self.config.node.is_some(); + let has_work = has_outbound || self.config.mesh.is_some(); if !has_work { tracing::info!("no cluster peers configured; running standalone"); return Ok(()); @@ -215,13 +231,13 @@ impl Cluster { // Hold the self-registration broadcast alive for the lifetime of `run`. Dropping // it would unannounce immediately and tell peers we've left. - let _self_registration: Option = self.config.node.as_deref().map(|node| { - let path = Path::new(MESH_PREFIX).join(node); + let _self_registration: Option = self.config.mesh.as_deref().map(|mesh| { + let path = Path::new(MESH_PREFIX).join(mesh); let broadcast = self .origin .create_broadcast(&path) .expect(".internal/origins is within the relay origin's root"); - tracing::info!(%node, %path, "advertising cluster node"); + tracing::info!(url = %mesh, %path, "advertising cluster mesh URL"); broadcast }); @@ -239,11 +255,11 @@ impl Cluster { } // Spawn the gossip discovery task only when we have at least one outbound peer - // to bootstrap from. A node-only relay (passive rendezvous) has no use for + // to bootstrap from. A mesh-only relay (passive rendezvous) has no use for // discovery: it accepts inbound sessions and shouldn't dial peers back, since // those peers already have a session to us. if has_outbound { - if let Some(self_url) = self.config.node.clone() { + if let Some(self_url) = self.config.mesh.clone() { let this = self.clone(); let token = token.clone(); let active = active.clone(); @@ -572,7 +588,22 @@ mod tests { let msg = format!("{err}"); assert!(msg.contains("cluster.root"), "missing cluster.root in: {msg}"); assert!(msg.contains("--cluster-connect"), "missing --cluster-connect in: {msg}"); - assert!(msg.contains("--cluster-node"), "missing --cluster-node in: {msg}"); + assert!(msg.contains("--cluster-mesh"), "missing --cluster-mesh in: {msg}"); + } + + /// Setting `cluster.node` (the renamed flag) at startup must surface a migration + /// message that names both replacement flags. + #[tokio::test] + async fn cluster_node_errors_with_migration_message() { + let config = ClusterConfig { + node: Some("legacy-node.example.com:4443".to_string()), + ..Default::default() + }; + let err = Cluster::new(config).run().await.expect_err("should error"); + let msg = format!("{err}"); + assert!(msg.contains("cluster.node"), "missing cluster.node in: {msg}"); + assert!(msg.contains("--cluster-connect"), "missing --cluster-connect in: {msg}"); + assert!(msg.contains("--cluster-mesh"), "missing --cluster-mesh in: {msg}"); } /// `cluster.root` parsed from TOML triggers the same migration error. @@ -595,13 +626,33 @@ mod tests { assert!(format!("{err}").contains("cluster.root")); } - /// A relay configured with only `cluster.node` (passive rendezvous) must run + /// `cluster.node` parsed from TOML triggers the same migration error. + #[test] + fn cluster_node_toml_parses_then_errors() { + let toml = "[cluster]\nnode = \"legacy-node.example.com:4443\"\n"; + let dir = std::env::temp_dir().join("moq-relay-cluster-test"); + std::fs::create_dir_all(&dir).unwrap(); + let path = dir.join("cluster-node-toml.toml"); + std::fs::write(&path, toml).unwrap(); + + let args = vec![std::ffi::OsString::from("moq-relay"), std::ffi::OsString::from(&path)]; + let config = Config::parse_and_merge(args).expect("config load"); + assert_eq!(config.cluster.node.as_deref(), Some("legacy-node.example.com:4443")); + + let rt = tokio::runtime::Runtime::new().unwrap(); + let err = rt + .block_on(Cluster::new(config.cluster).run()) + .expect_err("should error"); + assert!(format!("{err}").contains("cluster.node")); + } + + /// A relay configured with only `cluster.mesh` (passive rendezvous) must run /// without a QUIC client, publish its self-registration on the cluster origin, /// and keep that registration alive (i.e. not exit and drop the broadcast). #[tokio::test(start_paused = true)] async fn passive_rendezvous_runs_without_client_and_advertises_self() { let cluster = Cluster::new(ClusterConfig { - node: Some("rendezvous.example.com:4443".to_string()), + mesh: Some("rendezvous.example.com:4443".to_string()), ..Default::default() }); @@ -631,18 +682,18 @@ mod tests { handle.abort(); } - /// `cluster.node` round-trips through TOML and CLI. + /// `cluster.mesh` round-trips through TOML and CLI. #[test] - fn cluster_node_round_trips() { - let toml = "[cluster]\nnode = \"us-east.example.com:4443\"\nconnect = [\"root.example.com:4443\"]\n"; + fn cluster_mesh_round_trips() { + let toml = "[cluster]\nmesh = \"us-east.example.com:4443\"\nconnect = [\"root.example.com:4443\"]\n"; let dir = std::env::temp_dir().join("moq-relay-cluster-test"); std::fs::create_dir_all(&dir).unwrap(); - let path = dir.join("cluster-node-toml.toml"); + let path = dir.join("cluster-mesh-toml.toml"); std::fs::write(&path, toml).unwrap(); let args = vec![std::ffi::OsString::from("moq-relay"), std::ffi::OsString::from(&path)]; let config = Config::parse_and_merge(args).expect("config load"); - assert_eq!(config.cluster.node.as_deref(), Some("us-east.example.com:4443")); + assert_eq!(config.cluster.mesh.as_deref(), Some("us-east.example.com:4443")); assert_eq!(config.cluster.connect, vec!["root.example.com:4443".to_string()]); } } From ea4189ea526cacecd2f3eeda604bf65c3eaaed66 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 26 May 2026 08:59:54 -0700 Subject: [PATCH 5/9] CLAUDE.md: note PR title/description maintenance when iterating on a PR Add a short section about keeping `gh pr edit --title/--body` in sync when later commits change scope (renamed flag, reshaped API, extra fix landed). The PR body becomes the squash-merge commit message, so a stale body means a misleading entry in `git log` permanently. Calls out the common drift spots and reminds to keep the `(Written by Claude)` marker on edits. Co-Authored-By: Claude Opus 4.7 (1M context) --- CLAUDE.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CLAUDE.md b/CLAUDE.md index 5e48ad6fb..bcb385b2b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -157,3 +157,15 @@ When making changes to the codebase: ## PR Reviews CodeRabbit reviews PRs automatically, but it has an hourly quota and runs out of org credits. If a PR shows a "Review limit reached" / "out of usage credits" message instead of an actual review, run the `/review` skill locally against the PR to get review feedback without waiting for the quota to refill. + +## PR Title and Description Maintenance + +When pushing additional commits to an existing PR, check whether the title and description still describe the change accurately. They often go stale during review iterations: a flag gets renamed, an API gets reshaped, an extra fix lands, etc. The PR description is what shows up in the squash-merge commit, so a stale title/body means a misleading entry in `git log` forever. + +Update them with `gh pr edit --title "..." --body "..."` whenever the scope shifts. Specifically watch for: + +- Flags, file names, or public APIs renamed in later commits but still referenced by their old name in the PR body. +- Bullet points in the "Summary" section that describe behavior the latest commits have changed or removed. +- The test-plan checklist getting out of date as new tests are added. + +When you edit a PR description you authored, keep the `(Written by Claude)` marker so reviewers still know the body wasn't human-authored. From c81ce3c1200fd1bd842dab14c567354643cd5e63 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 26 May 2026 09:13:11 -0700 Subject: [PATCH 6/9] moq-relay: address cluster review nits Three small fixes from PR review on c2936e47: 1. doc/bin/relay/cluster.md: the "Removing a node unannounces..." sentence said the dial is aborted "on every other peer", but static `connect` peers are exempt by design. Reword to match the gossip-only abort behavior plus the static exemption already documented further down. 2. rs/moq-relay/README.md: replace em dashes in the clustering bullets with parenthetical phrasing to comply with the repo's no-em-dash prose rule. 3. rs/moq-relay/src/cluster.rs: trim verbose comments in `run()` per the project's comment guideline. The function docblock is shorter, and the inline notes around self-registration, dial tracking, discovery gating, and the parking branch are reduced to non-obvious why points. DialEntry's own docblock already covers the static/discovered split, so the duplicate explanation in `active`'s declaration is removed. No behavior change; 10 cluster tests still pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- doc/bin/relay/cluster.md | 2 +- rs/moq-relay/README.md | 4 ++-- rs/moq-relay/src/cluster.rs | 45 ++++++++++++------------------------- 3 files changed, 17 insertions(+), 34 deletions(-) diff --git a/doc/bin/relay/cluster.md b/doc/bin/relay/cluster.md index 638ecd853..1e8dcb22e 100644 --- a/doc/bin/relay/cluster.md +++ b/doc/bin/relay/cluster.md @@ -37,7 +37,7 @@ mesh = "us-east.example.com:4443" connect = ["rendezvous.example.com:4443"] ``` -When a leaf with `mesh` set connects to `rendezvous`, it publishes a registration broadcast at `.internal/origins/` on the cluster origin. Other peers reachable from `rendezvous` see the registration and dial the new leaf, building a full mesh. Removing a node unannounces its registration, which aborts the dial on every other peer. +When a leaf with `mesh` set connects to `rendezvous`, it publishes a registration broadcast at `.internal/origins/` on the cluster origin. Other peers reachable from `rendezvous` see the registration and dial the new leaf, building a full mesh. Removing a node unannounces its registration, which aborts every dial that was spawned in response to that gossip. Static `connect` peers are exempt: their reconnect loop keeps running through unannounces, so a remote restart doesn't permanently break a hand-configured edge. A relay with `mesh` set and no `connect` entries waits passively for inbound connections (it does not need a QUIC client). A relay with `connect` and no `mesh` dials peers but isn't itself advertised, so others won't discover it via gossip. diff --git a/rs/moq-relay/README.md b/rs/moq-relay/README.md index 98f6feaea..53ae68d3d 100644 --- a/rs/moq-relay/README.md +++ b/rs/moq-relay/README.md @@ -64,8 +64,8 @@ A user connects to the nearest relay and the cluster routes broadcasts between p **moq-relay** layers clustering on top of moq-lite: every cluster peer publishes into the same logical origin, with a hop list on each broadcast for loop detection and shortest-path preference. There are two ways to form a cluster, which can be combined: -- **Static topology** — `--cluster-connect ` (repeatable or comma-separated). Each peer is dialed at startup and kept alive with exponential backoff. Best for 2-5 stable nodes; no discovery. -- **Gossip discovery** — `--cluster-mesh `. This relay advertises its URL on the cluster origin so peers reached via `--cluster-connect` discover and dial it. Pair with `--cluster-connect ` to join an existing mesh. +- **Static topology** (`--cluster-connect `, repeatable or comma-separated). Each peer is dialed at startup and kept alive with exponential backoff. Best for 2-5 stable nodes; no discovery. +- **Gossip discovery** (`--cluster-mesh `). This relay advertises its URL on the cluster origin so peers reached via `--cluster-connect` discover and dial it. Pair with `--cluster-connect ` to join an existing mesh. A relay with only `--cluster-mesh` set waits passively for inbound connections (acts as a rendezvous; no QUIC client required). A relay with both flags dials the rendezvous, gossips itself, and dials every peer it learns about. diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index 746971a10..9222a9f9e 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -170,23 +170,16 @@ impl Cluster { } } - /// Runs the cluster event loop. Three modes, derived from config: + /// Runs the cluster event loop. /// - /// - **Standalone** (`connect` empty, `mesh` unset): returns immediately. - /// - **Passive rendezvous** (`mesh` set, `connect` empty): publishes the - /// self-registration broadcast and parks. The relay accepts inbound cluster - /// sessions through the moq-native server but does not dial out, so no QUIC - /// client is required. - /// - **Active** (`connect` non-empty, with or without `mesh`): requires a QUIC - /// client. Dials each static peer with exponential-backoff retry. If `mesh` - /// is also set, advertises self and watches `.internal/origins/*` to discover - /// and dial additional peers. + /// Modes are derived from config: standalone (no work) returns immediately; + /// passive rendezvous (`mesh` only) parks after publishing self-registration + /// and does not require a QUIC client; active (`connect` non-empty) dials + /// peers and, if `mesh` is also set, runs gossip discovery. /// - /// Errors: - /// - if `cluster.root` / `--cluster-root` is set (removed flag); - /// - if `cluster.node` / `--cluster-node` is set (renamed to `cluster.mesh`); - /// - if `connect` is non-empty but no QUIC client was attached via - /// [`with_client`](Self::with_client). + /// Bails when removed flags `cluster.root` / `cluster.node` are set, or when + /// `connect` is non-empty but no client was attached via + /// [`with_client`](Self::with_client). pub async fn run(self) -> anyhow::Result<()> { if let Some(root) = &self.config.root { anyhow::bail!( @@ -229,8 +222,7 @@ impl Cluster { None => String::new(), }; - // Hold the self-registration broadcast alive for the lifetime of `run`. Dropping - // it would unannounce immediately and tell peers we've left. + // Held in scope so the registration stays announced until `run` exits. let _self_registration: Option = self.config.mesh.as_deref().map(|mesh| { let path = Path::new(MESH_PREFIX).join(mesh); let broadcast = self @@ -241,23 +233,16 @@ impl Cluster { broadcast }); - // Track active dial tasks by URL so static and gossip-discovered peers don't - // duplicate, and so the discovery side can abort a discovered peer's task when - // it unannounces. Static peers carry `is_static = true` and are exempt from - // unannounce-driven aborts. let active: Arc>> = Arc::new(Mutex::new(HashMap::new())); - let mut tasks = tokio::task::JoinSet::new(); - // Seed static peers from --cluster-connect. for peer in &self.config.connect { Self::spawn_dial(&mut tasks, &active, self.clone(), peer.clone(), token.clone(), true); } - // Spawn the gossip discovery task only when we have at least one outbound peer - // to bootstrap from. A mesh-only relay (passive rendezvous) has no use for - // discovery: it accepts inbound sessions and shouldn't dial peers back, since - // those peers already have a session to us. + // A mesh-only relay (passive rendezvous) already has inbound sessions from + // every peer that knows about it; running discovery would only create + // duplicate outbound sessions. if has_outbound { if let Some(self_url) = self.config.mesh.clone() { let this = self.clone(); @@ -270,10 +255,8 @@ impl Cluster { } if tasks.is_empty() { - // Passive rendezvous: nothing to wait on, but we must hold - // `_self_registration` alive so inbound peers continue to see our - // advertisement. Park here forever; `cluster.run()` is one arm of - // `tokio::select!` in main.rs, so the process still exits via the other arms. + // Passive rendezvous: park to keep `_self_registration` alive. The + // process still exits via the other arms of `tokio::select!` in main. std::future::pending::<()>().await } From 1094b83fc0098758f9f42e1497424ccfbf367513 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 26 May 2026 09:28:56 -0700 Subject: [PATCH 7/9] moq-relay: simplify cluster docs and drop the OriginProducer block() view Address review feedback on the cluster work: - Revert OriginProducer::block / OriginConsumer::block and the `Cluster::access_origin` helper. The relay relies on token scopes to keep customers away from `.internal/*`; if a token's subscribe scope is broad enough to see infrastructure paths, that's an auth configuration issue, not something the origin view should defensively paper over. Removes the `blocked` field, `is_blocked` helpers, `new_with_blocked`, and the five block-related tests in moq-net (no behavior change for any other caller). Cluster subscriber/publisher collapse back to the simple `with_root(token.root).scope(token.subscribe).consume()` form. - Rewrite doc/bin/relay/cluster.md for humans. Open with one sentence on what clustering does (relays joined to proxy announcements/subscriptions). Lead with a multi-hop chain example (eu-west <- us-east <- us-west) that shows the actual point of clustering: a relay closer to viewers caches and dedups fetches back toward the source, which a full mesh would skip. Treat `cluster.mesh` as an optional auto-discovery convenience layered on top, with a one-line note that you still need at least one connection somewhere for advertisements to flow. - Trim rs/moq-relay/README.md and doc/bin/relay/config.md to match the new framing. - Demo configs: drop `mesh =` from leaf0.toml and leaf1.toml. The demo is meant to exercise proxying (and loop detection via leaf1's connect=[root, leaf0] triangle), not gossip. Restore the original comments. No public API change beyond removing block(), which had only landed on this branch. Co-Authored-By: Claude Opus 4.7 (1M context) --- demo/relay/leaf0.toml | 8 +- demo/relay/leaf1.toml | 10 +- doc/bin/relay/cluster.md | 114 ++++------------ doc/bin/relay/config.md | 13 +- rs/moq-net/src/model/origin.rs | 233 +++------------------------------ rs/moq-relay/README.md | 18 +-- rs/moq-relay/src/cluster.rs | 137 ++----------------- 7 files changed, 70 insertions(+), 463 deletions(-) diff --git a/demo/relay/leaf0.toml b/demo/relay/leaf0.toml index 0c6f6bf2c..abb6caef9 100644 --- a/demo/relay/leaf0.toml +++ b/demo/relay/leaf0.toml @@ -35,13 +35,9 @@ tls.root = ["ca.pem"] listen = "[::]:4444" [cluster] -# Dial the root for the initial cluster session. The root then gossips our -# `mesh` URL to other leaves via the .internal/origins/ mesh path so -# they discover and dial us back. leaf1 reaches us this way, forming a loop -# (leaf1 -> root, leaf1 -> leaf0, leaf0 -> root) that exercises cluster -# loop detection. +# Connect only to the root. leaf1 will dial us directly, forming a loop +# (leaf1 -> root, leaf1 -> leaf0, leaf0 -> root) to exercise cluster loop detection. connect = ["localhost:4443"] -mesh = "localhost:4444" [auth] # Allow JWT tokens that are signed by this root key. diff --git a/demo/relay/leaf1.toml b/demo/relay/leaf1.toml index 09a8e255f..e21fddf40 100644 --- a/demo/relay/leaf1.toml +++ b/demo/relay/leaf1.toml @@ -35,13 +35,9 @@ tls.root = ["ca.pem"] listen = "[::]:4445" [cluster] -# Dial only the root explicitly; gossip via .internal/origins/* will discover -# leaf0 and dial it automatically. Combined with leaf0's own outbound connect -# to the root, this still forms the leaf0 <-> leaf1 <-> root loop that -# exercises cluster loop detection. The mesh edge is now discovered, not -# hardcoded (ex. imagine us-east learning about us-west via us-central). -connect = ["localhost:4443"] -mesh = "localhost:4445" +# Connect to both the root and leaf0, forming a loop to exercise cluster loop detection. +# ex. imagine us-east connects to us-central AND directly to us-west. +connect = ["localhost:4443", "localhost:4444"] [auth] # Allow JWT tokens that are signed by this root key. diff --git a/doc/bin/relay/cluster.md b/doc/bin/relay/cluster.md index 1e8dcb22e..64605b76e 100644 --- a/doc/bin/relay/cluster.md +++ b/doc/bin/relay/cluster.md @@ -5,124 +5,66 @@ description: Run multiple moq-relay instances across multiple hosts/regions # Clustering -Multiple relay instances can join a cluster for geographic distribution and improved latency. Every cluster peer publishes into the same logical origin; loop detection and shortest-path preference come from a hop list on each broadcast, so peers can be connected in arbitrary topologies without duplicating data. +Relays can be joined together to proxy announcements and subscriptions between each other. A viewer talks to whichever relay is closest; if their broadcast lives somewhere else in the cluster, the local relay fetches it from a neighbor and caches it. -## Two ways to form a cluster +A broadcast carries a small hop list as it travels. Each relay it passes through adds itself to the list, which is how loops are caught and how the network picks the shortest path when there's more than one. -Pick the mode that matches your operational constraints. Both can be combined in a single deployment. +## Topology -### Static topology +Each relay lists the peers it wants to dial in `cluster.connect`. That's it; the topology is whatever you draw with those links. -Enumerate every peer by URL. Best for small clusters (2-5 nodes) where membership rarely changes. +A simple chain works well when one region is the source and others are caches: -```toml -[cluster] -connect = ["us-east.example.com:4443", "eu-west.example.com:4443"] +```text +eu-west <--- us-east <--- us-west ``` -Each entry in `connect` is dialed at startup and kept alive with exponential backoff. There is no discovery: every new node requires editing every existing config. - -### Gossip discovery - -Each relay sets `mesh` to its own externally-reachable URL. Connecting to a single peer is enough; that peer gossips the new relay's address to everyone else. - ```toml -# On the rendezvous (every other relay connects here) +# us-east.toml [cluster] -mesh = "rendezvous.example.com:4443" +connect = ["eu-west.example.com:4443"] -# On a leaf joining the cluster +# us-west.toml [cluster] -mesh = "us-east.example.com:4443" -connect = ["rendezvous.example.com:4443"] +connect = ["us-east.example.com:4443"] ``` -When a leaf with `mesh` set connects to `rendezvous`, it publishes a registration broadcast at `.internal/origins/` on the cluster origin. Other peers reachable from `rendezvous` see the registration and dial the new leaf, building a full mesh. Removing a node unannounces its registration, which aborts every dial that was spawned in response to that gossip. Static `connect` peers are exempt: their reconnect loop keeps running through unannounces, so a remote restart doesn't permanently break a hand-configured edge. - -A relay with `mesh` set and no `connect` entries waits passively for inbound connections (it does not need a QUIC client). A relay with `connect` and no `mesh` dials peers but isn't itself advertised, so others won't discover it via gossip. - -## How gossip works - -1. On startup, a relay with `cluster.mesh = ""` publishes a placeholder broadcast at `.internal/origins/` on its own origin. The broadcast carries no tracks: the path is the registration. -2. Cluster sessions exchange their origins both ways. The registration propagates to every connected peer, accumulating a hop chain along the way. -3. Each peer watches `.internal/origins/*` for newly announced URLs and dials any it isn't already connected to. Dials are deduplicated by URL, so a peer reached via both `connect` and gossip uses a single session. Static `connect` peers are exempt from gossip-driven aborts (a peer restart doesn't kill the reconnect loop). -4. When a gossip-discovered peer goes away, its registration is unannounced and every other relay aborts the dial it spawned in response. -5. Loop detection on `publish_broadcast` refuses any broadcast whose hop chain already contains this relay's id, so re-announcing a registration through a longer path is a silent no-op. - -## Visibility of `.internal/*` - -Mesh registrations are infrastructure, not user data. The relay restricts the `.internal/` namespace to internal sessions: +A publisher on `eu-west` reaches a viewer on `us-west` through `us-east`. If a second `us-west` viewer subscribes to the same broadcast, `us-east` already has it cached, so only one fetch crosses the Atlantic. A full mesh (every relay dialing every other) would skip the cache entirely and waste an outbound link per pair. -- **mTLS peers** (cluster-to-cluster traffic, authenticated against `tls.root`) see `.internal/*` and can publish into it. This is how registrations flow between relays. -- **JWT-authenticated sessions** are filtered: their subscribe view hides `.internal/*` announcements, and their publish view refuses publishes to `.internal/*`. This holds even for tokens with the broadest possible scope (`subscribe = [""]`, `publish = [""]`). -- **Anonymous sessions** under `auth.public` are bound by the configured public prefixes; `.internal/` is not one of them. +Pick the shape that matches your traffic. Linear chains are great for fanout; small N-way meshes are fine when latency matters more than dedup; mixed shapes work too. -The split is enforced at session acceptance, so there is no way to reach `.internal/*` without first authenticating via a trusted client certificate. +## Auto-discovery -## Peer authentication - -Cluster peers must authenticate to each other before they exchange registrations. Two options: - -### mTLS (recommended for new deployments) - -Configure the relay with `tls.root` pointing at the CA that signed the cluster peer certificates. Inbound connections presenting a valid client cert are granted full access (`AuthToken::unrestricted`) and tagged as internal. Leaves connect outbound with a `client.tls.cert` / `client.tls.key` signed by the same CA. No JWT is required. - -See [Authentication → mTLS Peer Authentication](/bin/relay/auth#mtls-peer-authentication) for the CA setup walkthrough. - -### JWT token - -Each relay reads a JWT from `cluster.token` and presents it on outbound dials. The token must grant full publish and subscribe scope (`publish: ""`, `subscribe: ""`). The receiving relay verifies it like any other JWT. +Listing every peer by hand can get tedious in larger clusters. Set `cluster.mesh` to this relay's own URL and connected peers will discover and dial it back automatically: ```toml [cluster] -mesh = "us-east.example.com:4443" -connect = ["rendezvous.example.com:4443"] -token = "cluster.jwt" +connect = ["us-east.example.com:4443"] +mesh = "us-west.example.com:4443" ``` -JWT-authenticated cluster sessions are tagged as external for stats purposes. **`.internal/*` is mTLS-only**: a JWT session, no matter how broad its scope, is filtered out of `.internal/origins/*` and cannot publish or receive mesh registrations. JWT-only cluster peers can still relay user traffic for each other, but they will not participate in gossip discovery. Use mTLS for any deployment that wants peers to find each other automatically. +Each node with `mesh` set creates a broadcast carrying its address, which other nodes pick up. `connect` is optional once gossip is running, but you still need at least one connection somewhere (either you dial a peer or a peer dials you) for the advertisement to flow. -## Example topology (3-node gossip cluster) +A relay with `mesh` set and no `connect` is a passive rendezvous: it sits and waits for inbound connections, then helps everyone else find each other. -```text - ┌──────────────────────┐ - │ rendezvous.exam.com │ - │ cluster.mesh = ... │ - └──┬──────────────┬────┘ - │ │ - gossip ┌──┘ └──┐ gossip - │ │ - ┌──────────┴──────┐ ┌────────┴────────┐ - │ us-east.exam.com│◀──▶│ eu-west.exam.com│ - │ mesh + connect │ │ mesh + connect │ - └─────────────────┘ └─────────────────┘ - ▲ direct (gossip) ▲ - └─────────────────────────┘ -``` - -`us-east` and `eu-west` each set `connect = ["rendezvous.example.com:4443"]`. The rendezvous gossips them to each other; the resulting topology is a full mesh. +## Authentication -## Production example +Cluster peers must authenticate to each other: -The public CDN at `cdn.moq.dev` uses gossip-style discovery across regions: +- **mTLS** (recommended). Set `tls.root` to the CA that signed the cluster certificates. Inbound connections presenting a valid client cert are granted full access; outbound dials use `client.tls.cert` / `client.tls.key`. +- **JWT**. Each relay reads a token from `cluster.token` and presents it on outbound dials. The token needs broad enough scope to cover whatever paths the cluster carries. -- `usc.cdn.moq.dev` - US Central -- `euc.cdn.moq.dev` - EU Central -- `sea.cdn.moq.dev` - Southeast Asia - -Clients use GeoDNS to connect to the nearest relay automatically. +See [Authentication](/bin/relay/auth) for the full setup. ## Migration from older configs -Both `cluster.root` and `cluster.node` were removed. If a config still sets either (CLI flags `--cluster-root` / `--cluster-node` or TOML `[cluster] root = "..."` / `node = "..."`), the relay errors at startup with a message pointing at `--cluster-connect` and `--cluster-mesh`. Minimal migrations: +`cluster.root` and `cluster.node` were both removed. If a config still sets either flag, the relay errors at startup with a message pointing at the replacements: -| Old (pre-rewrite) | New equivalent | +| Old | New | |---|---| | `root = "rendezvous:4443"` + `node = "us-east:4443"` | `connect = ["rendezvous:4443"]` + `mesh = "us-east:4443"` | -| `root = "rendezvous:4443"` (root-only node) | `mesh = "rendezvous:4443"` (passive rendezvous) | -| `node = "us-east:4443"` (post-rewrite, briefly) | `mesh = "us-east:4443"` | - -The semantics carry over: `connect` is the static dial list (one-or-more bootstrap peers), `mesh` is this relay's own advertised URL for gossip discovery. +| `root = "rendezvous:4443"` only | `mesh = "rendezvous:4443"` (passive rendezvous) | +| `node = "us-east:4443"` | `mesh = "us-east:4443"` | ## Next steps diff --git a/doc/bin/relay/config.md b/doc/bin/relay/config.md index ef980067c..543fbb0bb 100644 --- a/doc/bin/relay/config.md +++ b/doc/bin/relay/config.md @@ -120,19 +120,18 @@ Clustering configuration for multi-relay deployments. ```toml [cluster] -# Static peers to dial. Each is kept alive with exponential backoff. -connect = ["rendezvous.example.com:4443"] +# Peers this relay dials. The topology is whatever you draw with these links. +connect = ["us-east.example.com:4443"] -# This relay's own externally-reachable URL. When set, the relay advertises -# itself on the cluster origin so peers reached via `connect` discover and -# dial it. Omit for a relay that should not appear in the gossip mesh. -mesh = "us-east.example.com:4443" +# Optional. Set to this relay's own URL to advertise it so other peers find +# you automatically. +mesh = "us-west.example.com:4443" # JWT used for outbound cluster dials (alternative to mTLS). token = "cluster.jwt" ``` -See [Clustering](/bin/relay/cluster) for deployment patterns and the static / gossip mode split. +See [Clustering](/bin/relay/cluster) for topology choices and the trade-off between hand-listed peers and gossip. ### \[client] diff --git a/rs/moq-net/src/model/origin.rs b/rs/moq-net/src/model/origin.rs index 30e0313d7..c5d2f2471 100644 --- a/rs/moq-net/src/model/origin.rs +++ b/rs/moq-net/src/model/origin.rs @@ -643,10 +643,6 @@ pub struct OriginProducer { // The prefix that is automatically stripped from all paths. root: PathOwned, - - // Absolute path prefixes for which publishes are silently refused. - // Populated by [`OriginProducer::block`]. Empty by default. - blocked: Vec, } impl std::ops::Deref for OriginProducer { @@ -665,7 +661,6 @@ impl OriginProducer { info, nodes: OriginNodes::default(), root: PathOwned::default(), - blocked: Vec::new(), } } @@ -695,17 +690,13 @@ impl OriginProducer { return false; } - let full = self.root.join(&path); - - if self.is_blocked(&full) { - return false; - } - let (root, rest) = match self.nodes.get(&path) { Some(root) => root, None => return false, }; + let full = self.root.join(&path); + root.lock().publish(&full, &broadcast, &rest); let root = root.clone(); @@ -728,39 +719,12 @@ impl OriginProducer { info: self.info, nodes: self.nodes.select(&prefixes)?, root: self.root.clone(), - blocked: self.blocked.clone(), }) } - /// Returns a new OriginProducer that silently refuses publishes whose absolute path - /// falls under `prefix`. Stacks: `.block(a).block(b)` refuses paths under either. - /// - /// `prefix` is interpreted relative to this producer's current root, so the resulting - /// block is fixed at the absolute path computed at call time. Subsequent - /// [`with_root`](Self::with_root) calls do not retarget the block. - pub fn block(&self, prefix: impl AsPath) -> Self { - let absolute = self.root.join(prefix); - let mut blocked = self.blocked.clone(); - blocked.push(absolute); - Self { - info: self.info, - nodes: self.nodes.clone(), - root: self.root.clone(), - blocked, - } - } - - fn is_blocked(&self, absolute: &Path<'_>) -> bool { - self.blocked.iter().any(|b| absolute.has_prefix(b.as_path())) - } - /// Subscribe to all announced broadcasts. - /// - /// Any `block(prefix)` calls on this producer propagate to the consumer so the - /// view stays consistent: paths the producer refuses to publish are also hidden - /// from announce streams on the derived consumer. pub fn consume(&self) -> OriginConsumer { - OriginConsumer::new_with_blocked(self.info, self.root.clone(), self.nodes.clone(), self.blocked.clone()) + OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone()) } /// Get a broadcast by path if it has *already* been published. @@ -770,12 +734,6 @@ impl OriginProducer { #[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")] pub fn get_broadcast(&self, path: impl AsPath) -> Option { let path = path.as_path(); - if !self.blocked.is_empty() { - let absolute = self.root.join(&path); - if self.blocked.iter().any(|b| absolute.has_prefix(b.as_path())) { - return None; - } - } let (root, rest) = self.nodes.get(&path)?; let state = root.lock(); state.consume_broadcast(&rest) @@ -792,7 +750,6 @@ impl OriginProducer { info: self.info, root: self.root.join(&prefix).to_owned(), nodes: self.nodes.root(&prefix)?, - blocked: self.blocked.clone(), }) } @@ -828,10 +785,6 @@ pub struct OriginConsumer { // A prefix that is automatically stripped from all paths. root: PathOwned, - - // Absolute path prefixes whose announcements / lookups are hidden from this - // consumer. Populated by [`OriginConsumer::block`]. Empty by default. - blocked: Vec, } impl std::ops::Deref for OriginConsumer { @@ -843,7 +796,7 @@ impl std::ops::Deref for OriginConsumer { } impl OriginConsumer { - fn new_with_blocked(info: Origin, root: PathOwned, nodes: OriginNodes, blocked: Vec) -> Self { + fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self { let state = conducer::Producer::::default(); let id = ConsumerId::new(); @@ -861,7 +814,6 @@ impl OriginConsumer { nodes, state, root, - blocked, } } @@ -882,21 +834,14 @@ impl OriginConsumer { /// consumer is closed, or `Poll::Pending` after registering `waiter` to be /// notified when the next update arrives. pub fn poll_announced(&mut self, waiter: &conducer::Waiter) -> Poll> { - loop { - let item = match self.state.poll(waiter, |state| match state.take() { - Some(item) => Poll::Ready(item), - None => Poll::Pending, - }) { - Poll::Ready(Ok(item)) => item, - // Closed: discard the Ref so its MutexGuard doesn't escape this call. - Poll::Ready(Err(_)) => return Poll::Ready(None), - Poll::Pending => return Poll::Pending, - }; - - if self.is_blocked(&item.0) { - continue; - } - return Poll::Ready(Some(item)); + match self.state.poll(waiter, |state| match state.take() { + Some(item) => Poll::Ready(item), + None => Poll::Pending, + }) { + Poll::Ready(Ok(item)) => Poll::Ready(Some(item)), + // Closed: discard the Ref so its MutexGuard doesn't escape this call. + Poll::Ready(Err(_)) => Poll::Ready(None), + Poll::Pending => Poll::Pending, } } @@ -905,21 +850,7 @@ impl OriginConsumer { /// Returns None if there is no update available; NOT because the consumer is closed. /// You have to use `is_closed` to check if the consumer is closed. pub fn try_announced(&mut self) -> Option { - loop { - let item = self.state.write().ok()?.take()?; - if self.is_blocked(&item.0) { - continue; - } - return Some(item); - } - } - - fn is_blocked(&self, path: &PathOwned) -> bool { - if self.blocked.is_empty() { - return false; - } - let absolute = self.root.join(path); - self.blocked.iter().any(|b| absolute.has_prefix(b.as_path())) + self.state.write().ok()?.take() } /// Create another consumer with its own announcement cursor over the same origin. @@ -936,12 +867,6 @@ impl OriginConsumer { /// landed (e.g. you're responding to an `announced()` callback). pub fn get_broadcast(&self, path: impl AsPath) -> Option { let path = path.as_path(); - if !self.blocked.is_empty() { - let absolute = self.root.join(&path); - if self.blocked.iter().any(|b| absolute.has_prefix(b.as_path())) { - return None; - } - } let (root, rest) = self.nodes.get(&path)?; let state = root.lock(); state.consume_broadcast(&rest) @@ -958,14 +883,6 @@ impl OriginConsumer { pub async fn announced_broadcast(&self, path: impl AsPath) -> Option { let path = path.as_path(); - // Refuse blocked paths up front; the post-scope loop would otherwise hang forever. - if !self.blocked.is_empty() { - let absolute = self.root.join(&path); - if self.blocked.iter().any(|b| absolute.has_prefix(b.as_path())) { - return None; - } - } - // Scope a fresh consumer down to this path so we only wake up for relevant announcements. let mut consumer = self.scope(std::slice::from_ref(&path))?; @@ -994,11 +911,10 @@ impl OriginConsumer { // TODO accept PathPrefixes instead of &[Path] pub fn scope(&self, prefixes: &[Path]) -> Option { let prefixes = PathPrefixes::new(prefixes); - Some(OriginConsumer::new_with_blocked( + Some(OriginConsumer::new( self.info, self.root.clone(), self.nodes.select(&prefixes)?, - self.blocked.clone(), )) } @@ -1009,28 +925,13 @@ impl OriginConsumer { pub fn with_root(&self, prefix: impl AsPath) -> Option { let prefix = prefix.as_path(); - Some(Self::new_with_blocked( + Some(Self::new( self.info, self.root.join(&prefix).to_owned(), self.nodes.root(&prefix)?, - self.blocked.clone(), )) } - /// Returns a new OriginConsumer that hides announcements whose absolute path falls - /// under `prefix`, and refuses `get_broadcast` / `announced_broadcast` lookups - /// targeting those paths. Stacks: `.block(a).block(b)` hides paths under either. - /// - /// `prefix` is interpreted relative to this consumer's current root, so the resulting - /// block is fixed at the absolute path computed at call time. Subsequent - /// [`with_root`](Self::with_root) calls do not retarget the block. - pub fn block(&self, prefix: impl AsPath) -> Self { - let absolute = self.root.join(prefix); - let mut blocked = self.blocked.clone(); - blocked.push(absolute); - Self::new_with_blocked(self.info, self.root.clone(), self.nodes.clone(), blocked) - } - /// Returns the prefix that is automatically stripped from all paths. pub fn root(&self) -> &Path<'_> { &self.root @@ -1058,7 +959,7 @@ impl Drop for OriginConsumer { impl Clone for OriginConsumer { fn clone(&self) -> Self { - OriginConsumer::new_with_blocked(self.info, self.root.clone(), self.nodes.clone(), self.blocked.clone()) + OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone()) } } @@ -2217,106 +2118,4 @@ mod tests { "unexpected path in pending updates", ); } - - #[tokio::test] - async fn test_consume_block_hides_announces_under_prefix() { - let origin = Origin::random().produce(); - let hidden = Broadcast::new().produce(); - let visible = Broadcast::new().produce(); - - // .internal/origins/foo is the cluster mesh path the relay needs to keep private. - origin.publish_broadcast(".internal/origins/foo", hidden.consume()); - origin.publish_broadcast("demo/bar", visible.consume()); - - let mut blocked = origin.consume().block(".internal"); - blocked.assert_next("demo/bar", &visible.consume()); - blocked.assert_next_wait(); // .internal/origins/foo is hidden - - let mut unblocked = origin.consume(); - // Insertion order isn't guaranteed across roots; just confirm both arrive. - let first = unblocked.try_announced().expect("first announce"); - let second = unblocked.try_announced().expect("second announce"); - let mut paths = [first.0.as_str(), second.0.as_str()]; - paths.sort(); - assert_eq!(paths, [".internal/origins/foo", "demo/bar"]); - } - - #[tokio::test] - async fn test_publish_block_refuses_publish_under_prefix() { - let origin = Origin::random().produce(); - let broadcast = Broadcast::new().produce(); - - let blocked = origin.block(".internal"); - assert!(!blocked.publish_broadcast(".internal/origins/foo", broadcast.consume())); - // Exact-match also blocked. - assert!(!blocked.publish_broadcast(".internal", broadcast.consume())); - // Sibling under a different first segment is fine. - assert!(blocked.publish_broadcast(".internalish", broadcast.consume())); - assert!(blocked.publish_broadcast("demo/bar", broadcast.consume())); - - let mut consumer = origin.consume(); - // Insertion-order: .internalish first, then demo/bar. - consumer.assert_next(".internalish", &broadcast.consume()); - consumer.assert_next("demo/bar", &broadcast.consume()); - consumer.assert_next_wait(); - } - - #[tokio::test] - async fn test_block_get_broadcast() { - let origin = Origin::random().produce(); - let hidden = Broadcast::new().produce(); - origin.publish_broadcast(".internal/origins/foo", hidden.consume()); - - let consumer = origin.consume().block(".internal"); - assert!(consumer.get_broadcast(".internal/origins/foo").is_none()); - assert!( - consumer - .announced_broadcast(".internal/origins/foo") - .now_or_never() - .is_some() - ); - // announced_broadcast returns None synchronously for blocked paths. - assert!( - consumer - .announced_broadcast(".internal/origins/foo") - .now_or_never() - .unwrap() - .is_none() - ); - } - - #[tokio::test] - async fn test_block_stacks() { - let origin = Origin::random().produce(); - let b1 = Broadcast::new().produce(); - let b2 = Broadcast::new().produce(); - let b3 = Broadcast::new().produce(); - - origin.publish_broadcast(".internal/x", b1.consume()); - origin.publish_broadcast(".secret/y", b2.consume()); - origin.publish_broadcast("demo/z", b3.consume()); - - let mut consumer = origin.consume().block(".internal").block(".secret"); - consumer.assert_next("demo/z", &b3.consume()); - consumer.assert_next_wait(); - } - - #[tokio::test] - async fn test_block_with_root_treats_block_as_absolute() { - let origin = Origin::random().produce(); - let internal = Broadcast::new().produce(); - let nested = Broadcast::new().produce(); - - // `.internal` set at the unrooted view stays anchored at the absolute root. - // After descending into `demo`, paths in the view start with `demo/...`, so the - // absolute `.internal/...` paths aren't reachable from the rooted view anyway. - let blocked = origin.block(".internal"); - - assert!(!blocked.publish_broadcast(".internal/x", internal.consume())); - - // .block then with_root: the descended view publishes under absolute "demo/...", - // so blocked .internal is unreachable from the rooted view and publishes succeed. - let rooted = blocked.with_root("demo").expect("rooted view"); - assert!(rooted.publish_broadcast("nested", nested.consume())); - } } diff --git a/rs/moq-relay/README.md b/rs/moq-relay/README.md index 53ae68d3d..14dc29f8f 100644 --- a/rs/moq-relay/README.md +++ b/rs/moq-relay/README.md @@ -58,22 +58,14 @@ HTTPS is currently not supported. ## Clustering -To scale MoQ, you will eventually need to run multiple moq-relay instances, often in different regions. -A user connects to the nearest relay and the cluster routes broadcasts between peers behind the scenes. +Relays can be joined together to proxy announcements and subscriptions. A viewer talks to whichever relay is closest; if their broadcast lives elsewhere in the cluster, the local relay fetches it from a neighbor and caches it. Hop tracking on every broadcast keeps loops out and picks the shortest path when there's more than one. -**moq-relay** layers clustering on top of moq-lite: every cluster peer publishes into the same logical origin, with a hop list on each broadcast for loop detection and shortest-path preference. -There are two ways to form a cluster, which can be combined: +- `--cluster-connect ` lists the peers this relay dials. Repeatable; defines the topology by hand. A simple chain like `eu-west <- us-east <- us-west` lets `us-east` cache and dedup the transatlantic fetches that fan out to many `us-west` viewers. +- `--cluster-mesh ` is optional. When set, this relay advertises its own URL to connected peers and dials any peers it learns about, so larger clusters don't need each node hand-configured. You still need at least one connection (in or out) so the advertisement has a path to flow. A relay with `--cluster-mesh` set and no `--cluster-connect` is a passive rendezvous. -- **Static topology** (`--cluster-connect `, repeatable or comma-separated). Each peer is dialed at startup and kept alive with exponential backoff. Best for 2-5 stable nodes; no discovery. -- **Gossip discovery** (`--cluster-mesh `). This relay advertises its URL on the cluster origin so peers reached via `--cluster-connect` discover and dial it. Pair with `--cluster-connect ` to join an existing mesh. +`--cluster-root` and `--cluster-node` from earlier versions were removed. The relay errors at startup if either is set and points at the replacements. -A relay with only `--cluster-mesh` set waits passively for inbound connections (acts as a rendezvous; no QUIC client required). A relay with both flags dials the rendezvous, gossips itself, and dials every peer it learns about. - -Mesh registrations live at `.internal/origins/` on the cluster origin. That namespace is mTLS-only: JWT and anonymous sessions never see or publish into `.internal/*` regardless of their declared scope. - -> `--cluster-root` and `--cluster-node` were removed. If you have either in an existing config, the relay errors at startup with a message pointing at `--cluster-connect` and `--cluster-mesh`. - -See [doc/bin/relay/cluster.md](https://github.com/moq-dev/moq/blob/main/doc/bin/relay/cluster.md) for the full walkthrough, including mTLS setup and a 3-node example. +See [doc/bin/relay/cluster.md](https://github.com/moq-dev/moq/blob/main/doc/bin/relay/cluster.md) for the full walkthrough, including topology trade-offs and authentication. ## Authentication diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index 9222a9f9e..504d77283 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -27,16 +27,11 @@ struct DialEntry { /// Configuration for relay clustering. /// -/// Two modes that can be combined: +/// [`Self::connect`] lists peers to dial. [`Self::mesh`] is optional: when set, this +/// relay advertises its own URL so other peers discover and dial it. Set both to +/// join an existing cluster; set mesh alone to act as a passive rendezvous. /// -/// - **Static** ([`Self::connect`]): explicit list of peer URLs to dial. Each is kept -/// alive for the session lifetime; no discovery happens. -/// - **Gossip** ([`Self::mesh`]): advertise this relay's URL on the cluster origin so -/// connected peers discover and dial it, and watch for the advertisements of others -/// so we dial them too. Pair with [`Self::connect`] to bootstrap into an existing -/// mesh, or set alone to act as a passive rendezvous. -/// -/// Hop-based routing on broadcasts prevents announcement loops regardless of mode. +/// Hop-based routing on broadcasts prevents announcement loops regardless of topology. #[serde_with::serde_as] #[derive(clap::Args, Clone, Debug, serde::Serialize, serde::Deserialize, Default)] #[serde_with::skip_serializing_none] @@ -56,10 +51,10 @@ pub struct ClusterConfig { #[serde_as(as = "serde_with::OneOrMany<_>")] pub connect: Vec, - /// This relay's own externally-reachable URL. When set, the relay publishes its address - /// on the cluster origin (under `.internal/origins/`) so other mTLS-authenticated - /// peers can discover and dial it. Pair with [`Self::connect`] to reach an initial peer - /// who will gossip your address onward, or set alone for passive rendezvous. + /// This relay's own externally-reachable URL. When set, the relay publishes its + /// address on the cluster origin so other peers can discover and dial it. Pair + /// with [`Self::connect`] to reach an initial peer who will gossip your address + /// onward, or set alone for passive rendezvous. #[arg(id = "cluster-mesh", long = "cluster-mesh", env = "MOQ_CLUSTER_MESH")] pub mesh: Option, @@ -139,35 +134,13 @@ impl Cluster { } /// Returns an [`OriginConsumer`] scoped to this session's subscribe permissions. - /// - /// Non-internal tokens (i.e. JWT-authenticated end users) cannot see `.internal/*` - /// paths regardless of their declared scope or root. Cluster mesh registrations and - /// other infrastructure broadcasts live under that prefix. - /// - /// The block is applied to the absolute root before any `with_root`/`scope` so a - /// JWT whose `token.root` itself lies under `.internal/*` can't sidestep it. pub fn subscriber(&self, token: &AuthToken) -> Option { - let origin = self.access_origin(token); - Some(origin.with_root(&token.root)?.scope(&token.subscribe)?.consume()) + Some(self.origin.with_root(&token.root)?.scope(&token.subscribe)?.consume()) } /// Returns an [`OriginProducer`] scoped to this session's publish permissions. - /// - /// Non-internal tokens cannot publish into `.internal/*` regardless of their - /// declared scope or root. pub fn publisher(&self, token: &AuthToken) -> Option { - let origin = self.access_origin(token); - origin.with_root(&token.root)?.scope(&token.publish) - } - - /// Returns the base origin a session is allowed to see. mTLS / internal sessions - /// get the full origin; everyone else gets a view that blocks `.internal/*`. - fn access_origin(&self, token: &AuthToken) -> OriginProducer { - if token.internal { - self.origin.clone() - } else { - self.origin.block(".internal") - } + self.origin.with_root(&token.root)?.scope(&token.publish) } /// Runs the cluster event loop. @@ -427,81 +400,6 @@ impl Cluster { mod tests { use super::*; use crate::Config; - use moq_net::{Broadcast, PathOwned, PathPrefixes}; - - fn full_scope_jwt() -> AuthToken { - AuthToken { - root: PathOwned::default(), - subscribe: PathPrefixes::from(vec![PathOwned::from(String::new())]), - publish: PathPrefixes::from(vec![PathOwned::from(String::new())]), - internal: false, - } - } - - /// A JWT with the broadest possible scope is still kept out of `.internal/*`. - #[tokio::test] - async fn internal_paths_invisible_to_non_mtls_token() { - let cluster = Cluster::new(ClusterConfig::default()); - let mesh = Broadcast::new().produce(); - let user = Broadcast::new().produce(); - - cluster - .origin - .publish_broadcast(".internal/origins/peer.example.com:4443", mesh.consume()); - cluster.origin.publish_broadcast("demo/test", user.consume()); - - let token = full_scope_jwt(); - let mut subscriber = cluster.subscriber(&token).expect("subscriber"); - - // The user broadcast is visible; the mesh registration must not be. - let (path, broadcast) = subscriber.try_announced().expect("user announce"); - assert_eq!(path.as_str(), "demo/test"); - assert!(broadcast.is_some()); - assert!( - subscriber.try_announced().is_none(), - ".internal/* must not be visible to a broad-scope JWT" - ); - - // The publisher view rejects publishes to `.internal/*` even with broad scope. - let publisher = cluster.publisher(&token).expect("publisher"); - let attempt = Broadcast::new().produce(); - assert!(!publisher.publish_broadcast(".internal/origins/attacker", attempt.consume())); - } - - /// Regression test for the block-before-root fix: a JWT whose `root` claim points - /// at `.internal` (or any prefix of it) must still be filtered. Before the fix the - /// `.block(".internal")` call sat on top of a view already rooted at `.internal`, - /// so the resulting block prefix was `.internal/.internal` and the real mesh paths - /// leaked through. - #[tokio::test] - async fn internal_paths_invisible_when_token_root_is_internal() { - let cluster = Cluster::new(ClusterConfig::default()); - let mesh = Broadcast::new().produce(); - cluster - .origin - .publish_broadcast(".internal/origins/peer.example.com:4443", mesh.consume()); - - let token = AuthToken { - root: PathOwned::from(".internal".to_string()), - subscribe: PathPrefixes::from(vec![PathOwned::from(String::new())]), - publish: PathPrefixes::from(vec![PathOwned::from(String::new())]), - internal: false, - }; - - let mut subscriber = cluster.subscriber(&token).expect("subscriber"); - assert!( - subscriber.try_announced().is_none(), - "token.root=.internal must not be able to read mesh registrations" - ); - - let publisher = cluster.publisher(&token).expect("publisher"); - let attempt = Broadcast::new().produce(); - // `origins/attacker` relative to root `.internal` is absolute `.internal/origins/attacker`. - assert!( - !publisher.publish_broadcast("origins/attacker", attempt.consume()), - "token.root=.internal must not be able to publish mesh registrations" - ); - } /// Regression test for the static-peer survival fix: gossip-driven unannounces must /// not abort the reconnect loop of a peer that was statically configured via @@ -544,21 +442,6 @@ mod tests { ); } - /// mTLS sessions see the mesh registrations they need to route between cluster peers. - #[tokio::test] - async fn internal_paths_visible_to_mtls_token() { - let cluster = Cluster::new(ClusterConfig::default()); - let mesh = Broadcast::new().produce(); - cluster - .origin - .publish_broadcast(".internal/origins/peer.example.com:4443", mesh.consume()); - - let mut subscriber = cluster.subscriber(&AuthToken::unrestricted()).expect("subscriber"); - let (path, broadcast) = subscriber.try_announced().expect("announce"); - assert_eq!(path.as_str(), ".internal/origins/peer.example.com:4443"); - assert!(broadcast.is_some()); - } - /// Setting `cluster.root` (the removed flag) at startup must surface a migration /// message that names both the replacement flags. #[tokio::test] From afb75426f3cb23a0847069ecce0bcfa60c794359 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 26 May 2026 10:53:25 -0700 Subject: [PATCH 8/9] moq-relay: tidy cluster::run control flow Two style fixes from review: - The `_self_registration` block and the gossip discovery spawn both depend on `self.config.mesh` being set. Merge the registration `.map(|mesh| ...)` and the separate `if has_outbound { if let Some(self_url) = ... }` into one `if let Some(mesh) = self.config.mesh .as_deref() { ... } else { None }`. Discovery still requires has_outbound (passive rendezvous has nothing to discover), but the outer mesh-is-set check happens once. - run_discovery: replace `match announced { Some(_) => ... None => ... }` with `if announced.is_some() { ... } else { ... }` since the BroadcastConsumer in the Some arm is never bound. No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) --- rs/moq-relay/src/cluster.rs | 88 ++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 44 deletions(-) diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index 504d77283..cf4f2d9e4 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -195,37 +195,38 @@ impl Cluster { None => String::new(), }; + let active: Arc>> = Arc::new(Mutex::new(HashMap::new())); + let mut tasks = tokio::task::JoinSet::new(); + + for peer in &self.config.connect { + Self::spawn_dial(&mut tasks, &active, self.clone(), peer.clone(), token.clone(), true); + } + // Held in scope so the registration stays announced until `run` exits. - let _self_registration: Option = self.config.mesh.as_deref().map(|mesh| { + // Discovery is paired with it: a mesh-only relay (passive rendezvous) has + // nothing to discover, so we only run it when we also have an outbound peer. + let _self_registration: Option = if let Some(mesh) = self.config.mesh.as_deref() { let path = Path::new(MESH_PREFIX).join(mesh); let broadcast = self .origin .create_broadcast(&path) .expect(".internal/origins is within the relay origin's root"); tracing::info!(url = %mesh, %path, "advertising cluster mesh URL"); - broadcast - }); - - let active: Arc>> = Arc::new(Mutex::new(HashMap::new())); - let mut tasks = tokio::task::JoinSet::new(); - for peer in &self.config.connect { - Self::spawn_dial(&mut tasks, &active, self.clone(), peer.clone(), token.clone(), true); - } - - // A mesh-only relay (passive rendezvous) already has inbound sessions from - // every peer that knows about it; running discovery would only create - // duplicate outbound sessions. - if has_outbound { - if let Some(self_url) = self.config.mesh.clone() { + if has_outbound { let this = self.clone(); let token = token.clone(); let active = active.clone(); + let self_url = mesh.to_owned(); tasks.spawn(async move { this.run_discovery(self_url, token, active).await; }); } - } + + Some(broadcast) + } else { + None + }; if tasks.is_empty() { // Passive rendezvous: park to keep `_self_registration` alive. The @@ -280,35 +281,34 @@ impl Cluster { continue; } - match announced { - Some(_) => { - let peer = peer.to_owned(); - let already_active = { - let active = active.lock().expect("dial map poisoned"); - active.contains_key(&peer) - }; - if already_active { - tracing::debug!(%peer, "discovered peer already tracked; skipping dial"); - continue; - } - tracing::info!(%peer, "discovered cluster peer; dialing"); - let this = self.clone(); - let token = token.clone(); - let peer_for_task = peer.clone(); - let handle = tokio::spawn(async move { - if let Err(err) = this.run_remote(&peer_for_task, token).await { - tracing::warn!(%err, peer = %peer_for_task, "cluster peer connection ended"); - } - }); - active.lock().expect("dial map poisoned").insert( - peer, - DialEntry { - handle: handle.abort_handle(), - is_static: false, - }, - ); + if announced.is_some() { + let peer = peer.to_owned(); + let already_active = { + let active = active.lock().expect("dial map poisoned"); + active.contains_key(&peer) + }; + if already_active { + tracing::debug!(%peer, "discovered peer already tracked; skipping dial"); + continue; } - None => Self::handle_gossip_unannounce(&active, peer), + tracing::info!(%peer, "discovered cluster peer; dialing"); + let this = self.clone(); + let token = token.clone(); + let peer_for_task = peer.clone(); + let handle = tokio::spawn(async move { + if let Err(err) = this.run_remote(&peer_for_task, token).await { + tracing::warn!(%err, peer = %peer_for_task, "cluster peer connection ended"); + } + }); + active.lock().expect("dial map poisoned").insert( + peer, + DialEntry { + handle: handle.abort_handle(), + is_static: false, + }, + ); + } else { + Self::handle_gossip_unannounce(&active, peer); } } } From 0b0ec73ebf6bcf149d0e8b7bd6f68b78fce5b29c Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 26 May 2026 15:39:30 -0700 Subject: [PATCH 9/9] moq-relay: don't abort dial tasks on gossip unannounce Found while testing mesh discovery against the local cluster: my discovery loop was treating every consumer unannounce as "peer gone, abort the dial" and every announce as "peer back, dial again". When the second leaf joined and gossip established a direct edge, each leaf saw the other's mesh broadcast arrive via *two* paths (root and direct). OriginProducer's "prefer shorter hop" logic then swaps which copy is active and delivers that swap to consumers as `unannounce + announce`. The discovery loop interpreted the swap as flap and re-dialed on every event, spinning a tight loop (5600 unannounces and 2700 redials in 30s for a stable cluster). Reannouncements are normal: any closer copy of the same broadcast triggers them, and that's exactly what makes mesh discovery work in the first place. The fix is to stop treating gossip unannouces as a reason to abort. The dial loop already reconnects on its own via exponential backoff; a peer that truly leaves just keeps backing off harmlessly. Replace `Arc>>` with a plain `Arc>>` used for dedup only, drop the DialEntry struct (no more provenance flag), drop handle_gossip_unannounce, and collapse the announce/unannounce branch in run_discovery to "skip if unannounced, else dial-if-new". The static-peer / discovered-peer distinction disappears with the abort path that motivated it. Removed gossip_unannounce_preserves_static_peer since the behavior it guarded (abort-only-non-static) no longer exists. Verified with a live just cluster run using cluster.mesh on both leaves: each leaf logs exactly one discovery event and dials the other once. Co-Authored-By: Claude Opus 4.7 (1M context) --- rs/moq-relay/src/cluster.rs | 167 ++++++++---------------------------- 1 file changed, 38 insertions(+), 129 deletions(-) diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index cf4f2d9e4..1bdda3f02 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -1,30 +1,19 @@ use std::{ - collections::HashMap, + collections::HashSet, path::PathBuf, sync::{Arc, Mutex}, }; use anyhow::Context; use moq_net::{BroadcastProducer, Origin, OriginConsumer, OriginProducer, Path, Stats, Tier}; -use tokio::task::AbortHandle; use url::Url; use crate::AuthToken; /// Path prefix under which cluster nodes advertise their own URLs for gossip-style -/// peer discovery. Restricted to mTLS (`token.internal`) sessions by -/// [`Cluster::subscriber`] / [`Cluster::publisher`]. +/// peer discovery. const MESH_PREFIX: &str = ".internal/origins"; -/// One entry in the active-dial map. The provenance flag keeps -/// gossip unannounces from tearing down a peer that was also seeded statically: -/// static peers must keep their reconnect loop running even when the -/// remote temporarily disappears from `.internal/origins/*`. -struct DialEntry { - handle: AbortHandle, - is_static: bool, -} - /// Configuration for relay clustering. /// /// [`Self::connect`] lists peers to dial. [`Self::mesh`] is optional: when set, this @@ -195,11 +184,15 @@ impl Cluster { None => String::new(), }; - let active: Arc>> = Arc::new(Mutex::new(HashMap::new())); + // URLs we've already spawned a dial task for (static + gossip-discovered). + // Dedup only; we never abort entries based on gossip churn, since the + // "prefer shorter hop" logic in OriginProducer delivers reannounces as + // unannounce-then-announce pairs that would otherwise drive a tight loop. + let dialed: Arc>> = Arc::new(Mutex::new(HashSet::new())); let mut tasks = tokio::task::JoinSet::new(); for peer in &self.config.connect { - Self::spawn_dial(&mut tasks, &active, self.clone(), peer.clone(), token.clone(), true); + Self::spawn_dial(&mut tasks, &dialed, self.clone(), peer.clone(), token.clone()); } // Held in scope so the registration stays announced until `run` exits. @@ -216,10 +209,10 @@ impl Cluster { if has_outbound { let this = self.clone(); let token = token.clone(); - let active = active.clone(); + let dialed = dialed.clone(); let self_url = mesh.to_owned(); tasks.spawn(async move { - this.run_discovery(self_url, token, active).await; + this.run_discovery(self_url, token, dialed).await; }); } @@ -238,100 +231,57 @@ impl Cluster { Ok(()) } - /// Spawn a dial loop for `peer` and remember its abort handle. Skips if `peer` - /// is already tracked (caller-side dedup against static peers and prior discoveries). + /// Spawn a dial loop for `peer`. No-op if `peer` is already tracked. fn spawn_dial( tasks: &mut tokio::task::JoinSet<()>, - active: &Arc>>, + dialed: &Arc>>, this: Self, peer: String, token: String, - is_static: bool, ) { - { - let active = active.lock().expect("dial map poisoned"); - if active.contains_key(&peer) { - return; - } + if !dialed.lock().expect("dial set poisoned").insert(peer.clone()) { + return; } - let peer_for_task = peer.clone(); - let handle = tasks.spawn(async move { - if let Err(err) = this.run_remote(&peer_for_task, token).await { - tracing::warn!(%err, peer = %peer_for_task, "cluster peer connection ended"); + tasks.spawn(async move { + if let Err(err) = this.run_remote(&peer, token).await { + tracing::warn!(%err, %peer, "cluster peer connection ended"); } }); - active - .lock() - .expect("dial map poisoned") - .insert(peer, DialEntry { handle, is_static }); } - /// Watch `.internal/origins/*` for peer registrations and dial each newly-announced - /// URL that isn't already tracked. An unannounce aborts the corresponding dial only - /// for gossip-discovered peers; static `--cluster-connect` peers keep reconnecting. - async fn run_discovery(self, self_url: String, token: String, active: Arc>>) { + /// Watch `.internal/origins/*` for peer registrations and dial each newly-seen + /// URL. We deliberately don't abort dials on unannounce: the "prefer shorter + /// hop" path in OriginProducer delivers reannouncements as unannounce-then- + /// announce pairs whenever a closer copy of the same broadcast arrives, + /// which would otherwise drive a tight respawn loop. The dial loop reconnects + /// on its own; if a peer truly leaves, the loop just keeps backing off. + async fn run_discovery(self, self_url: String, token: String, dialed: Arc>>) { let Some(mut consumer) = self.origin.consume().with_root(MESH_PREFIX) else { tracing::warn!("could not scope cluster origin to {MESH_PREFIX}; discovery disabled"); return; }; while let Some((relative, announced)) = consumer.announced().await { + if announced.is_none() { + continue; + } let peer = relative.as_str(); if peer == self_url { continue; } - - if announced.is_some() { - let peer = peer.to_owned(); - let already_active = { - let active = active.lock().expect("dial map poisoned"); - active.contains_key(&peer) - }; - if already_active { - tracing::debug!(%peer, "discovered peer already tracked; skipping dial"); - continue; - } - tracing::info!(%peer, "discovered cluster peer; dialing"); - let this = self.clone(); - let token = token.clone(); - let peer_for_task = peer.clone(); - let handle = tokio::spawn(async move { - if let Err(err) = this.run_remote(&peer_for_task, token).await { - tracing::warn!(%err, peer = %peer_for_task, "cluster peer connection ended"); - } - }); - active.lock().expect("dial map poisoned").insert( - peer, - DialEntry { - handle: handle.abort_handle(), - is_static: false, - }, - ); - } else { - Self::handle_gossip_unannounce(&active, peer); - } - } - } - - /// Handle a gossip unannounce for `peer`: abort the dial only if the entry was - /// added by discovery. Static peers (seeded from `--cluster-connect`) keep their - /// reconnect loop running, since gossip churn is just remote restarts. - fn handle_gossip_unannounce(active: &Arc>>, peer: &str) { - let mut active = active.lock().expect("dial map poisoned"); - match active.get(peer) { - Some(entry) if entry.is_static => { - tracing::debug!( - %peer, - "gossip unannounce for static peer; reconnect loop kept alive" - ); + let peer = peer.to_owned(); + if !dialed.lock().expect("dial set poisoned").insert(peer.clone()) { + tracing::debug!(%peer, "discovered peer already tracked; skipping dial"); + continue; } - Some(_) => { - tracing::info!(%peer, "cluster peer unannounced; aborting dial"); - if let Some(entry) = active.remove(peer) { - entry.handle.abort(); + tracing::info!(%peer, "discovered cluster peer; dialing"); + let this = self.clone(); + let token = token.clone(); + tokio::spawn(async move { + if let Err(err) = this.run_remote(&peer, token).await { + tracing::warn!(%err, %peer, "cluster peer connection ended"); } - } - None => {} + }); } } @@ -401,47 +351,6 @@ mod tests { use super::*; use crate::Config; - /// Regression test for the static-peer survival fix: gossip-driven unannounces must - /// not abort the reconnect loop of a peer that was statically configured via - /// `--cluster-connect`. Before the fix the active map didn't track provenance, so an - /// unannounce of a peer that appeared in both `connect` and gossip removed it, - /// permanently breaking that static peer's reconnect. - #[tokio::test] - async fn gossip_unannounce_preserves_static_peer() { - let active: Arc>> = Arc::new(Mutex::new(HashMap::new())); - // Stand in for a real dial task; never polled but provides an AbortHandle. - let placeholder = tokio::spawn(std::future::pending::<()>()); - let static_handle = placeholder.abort_handle(); - active.lock().unwrap().insert( - "static-peer.example.com:4443".to_string(), - DialEntry { - handle: static_handle, - is_static: true, - }, - ); - - Cluster::handle_gossip_unannounce(&active, "static-peer.example.com:4443"); - assert!( - active.lock().unwrap().contains_key("static-peer.example.com:4443"), - "static peer entry must survive a gossip unannounce" - ); - - // Now insert a discovered peer and confirm unannounce DOES drop it. - let discovered = tokio::spawn(std::future::pending::<()>()); - active.lock().unwrap().insert( - "discovered.example.com:4443".to_string(), - DialEntry { - handle: discovered.abort_handle(), - is_static: false, - }, - ); - Cluster::handle_gossip_unannounce(&active, "discovered.example.com:4443"); - assert!( - !active.lock().unwrap().contains_key("discovered.example.com:4443"), - "discovered peer entry should be removed on gossip unannounce" - ); - } - /// Setting `cluster.root` (the removed flag) at startup must surface a migration /// message that names both the replacement flags. #[tokio::test]