From f2b8dd9f68bede62f724c096aa86435d8962236d Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 21 May 2026 16:46:34 +0000 Subject: [PATCH] moq-net: split announcement cursor out of OriginConsumer OriginConsumer used to bundle a tree-read handle with an mpsc cursor and a registered ConsumerId. Every clone (and every transient .consume() / .scope() / .with_root() chain) paid for a fresh channel plus a consume_initial() walk that replayed every active broadcast as backlog. Callers like get_broadcast, announced_broadcast, and the FFI wrappers paid that cost just to look up one path or re-root a handle. Split announcement subscription into AnnounceProducer and AnnounceConsumer. OriginConsumer is now a cheap, Clone-friendly read handle; call .announced() on it to allocate the channel and register the cursor. AnnounceConsumer owns the Drop-side unregistration. The deprecated OriginProducer::get_broadcast shim and its libmoq Updates the lite/ietf publishers, FFI, libmoq, relay, clock, moq-boy, hang examples, and integration tests to grab a cursor explicitly (consumer.announced() -> AnnounceConsumer::next/try_next). Adds a regression test that cloning OriginConsumer does not drain another cursor's channel and that a freshly-built AnnounceConsumer still receives the active backlog. https://claude.ai/code/session_01BSuKtGPPntcBMEQiGb3Pn6 --- rs/hang/examples/subscribe.rs | 3 +- rs/libmoq/src/origin.rs | 10 +- rs/moq-boy/src/input.rs | 4 +- rs/moq-boy/src/main.rs | 3 +- rs/moq-ffi/src/origin.rs | 18 +- rs/moq-native/examples/clock.rs | 5 +- rs/moq-native/tests/backend.rs | 8 +- rs/moq-native/tests/broadcast.rs | 12 +- rs/moq-net/src/ietf/publisher.rs | 19 +- rs/moq-net/src/lite/publisher.rs | 20 +- rs/moq-net/src/model/origin.rs | 438 ++++++++++++++++++++----------- rs/moq-net/src/stats.rs | 18 +- rs/moq-relay/src/cluster.rs | 9 +- rs/moq-relay/src/web.rs | 5 +- 14 files changed, 353 insertions(+), 219 deletions(-) diff --git a/rs/hang/examples/subscribe.rs b/rs/hang/examples/subscribe.rs index 571823ee1..886b45747 100644 --- a/rs/hang/examples/subscribe.rs +++ b/rs/hang/examples/subscribe.rs @@ -38,10 +38,11 @@ async fn run_session(origin: moq_net::OriginProducer) -> anyhow::Result<()> { } // Subscribe to a broadcast and read media frames. -async fn run_subscribe(mut consumer: moq_net::OriginConsumer) -> anyhow::Result<()> { +async fn run_subscribe(consumer: moq_net::OriginConsumer) -> anyhow::Result<()> { // Wait for a broadcast to be announced. let (path, broadcast) = consumer .announced() + .next() .await .ok_or_else(|| anyhow::anyhow!("origin closed"))?; diff --git a/rs/libmoq/src/origin.rs b/rs/libmoq/src/origin.rs index 0c645f7e0..a9b27470b 100644 --- a/rs/libmoq/src/origin.rs +++ b/rs/libmoq/src/origin.rs @@ -40,7 +40,7 @@ impl Origin { pub fn announced(&mut self, origin: Id, on_announce: OnStatus) -> Result { let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?; - let consumer = origin.consume(); + let consumer = origin.consume().announced(); let channel = oneshot::channel(); let entry = TaskEntry { @@ -64,8 +64,8 @@ impl Origin { Ok(id) } - async fn run_announced(task_id: Id, mut consumer: moq_net::OriginConsumer) -> Result<(), Error> { - while let Some((path, broadcast)) = consumer.announced().await { + async fn run_announced(task_id: Id, mut consumer: moq_net::AnnounceConsumer) -> Result<(), Error> { + while let Some((path, broadcast)) = consumer.next().await { let mut state = State::lock(); // Stop if the callback was revoked by close. @@ -108,9 +108,7 @@ impl Origin { let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?; // TODO: expose an async variant backed by `announced_broadcast` so FFI callers can wait // for gossip instead of racing it. - // Uses the deprecated direct lookup to avoid the per-call cost of OriginProducer::consume(). - #[allow(deprecated)] - origin.get_broadcast(path).ok_or(Error::BroadcastNotFound) + origin.consume().get_broadcast(path).ok_or(Error::BroadcastNotFound) } pub fn publish( diff --git a/rs/moq-boy/src/input.rs b/rs/moq-boy/src/input.rs index ddaa0561f..a0283ee6c 100644 --- a/rs/moq-boy/src/input.rs +++ b/rs/moq-boy/src/input.rs @@ -61,11 +61,11 @@ pub enum Command { /// Handles discovered viewers: subscribes to their command tracks. pub async fn handle_viewers( - viewer_origin: &mut moq_net::OriginConsumer, + viewer_origin: &mut moq_net::AnnounceConsumer, cmd_tx: &tokio::sync::mpsc::Sender, ) -> anyhow::Result<()> { loop { - let Some((path, broadcast)) = viewer_origin.announced().await else { + let Some((path, broadcast)) = viewer_origin.next().await else { break; }; diff --git a/rs/moq-boy/src/main.rs b/rs/moq-boy/src/main.rs index 74d42b0a9..5776600b1 100644 --- a/rs/moq-boy/src/main.rs +++ b/rs/moq-boy/src/main.rs @@ -228,7 +228,8 @@ async fn run(config: &Config) -> Result<()> { let mut viewer_consumer = consume_origin .with_root(&viewer_path) .expect("viewer prefix should be valid") - .consume(); + .consume() + .announced(); tracing::info!(url = %config.url, %name, broadcast = %broadcast_path, "connecting to relay"); diff --git a/rs/moq-ffi/src/origin.rs b/rs/moq-ffi/src/origin.rs index eac8ea283..f7a195ed1 100644 --- a/rs/moq-ffi/src/origin.rs +++ b/rs/moq-ffi/src/origin.rs @@ -21,13 +21,13 @@ pub struct MoqAnnounced { } struct Announced { - inner: moq_net::OriginConsumer, + inner: moq_net::AnnounceConsumer, } impl Announced { async fn next(&mut self) -> Result>, MoqError> { loop { - match self.inner.announced().await { + match self.inner.next().await { Some((path, Some(broadcast))) => { return Ok(Some(Arc::new(MoqAnnouncement { path: path.to_string(), @@ -43,7 +43,7 @@ impl Announced { async fn available(&mut self) -> Result, MoqError> { loop { - match self.inner.announced().await { + match self.inner.next().await { Some((_path, Some(broadcast))) => { return Ok(Arc::new(MoqBroadcastConsumer::new(broadcast))); } @@ -109,18 +109,22 @@ impl MoqOriginConsumer { /// Subscribe to all broadcast announcements under a prefix. pub fn announced(&self, prefix: String) -> Result, MoqError> { let _guard = crate::ffi::RUNTIME.enter(); - let origin = self.inner.clone().with_root(prefix).ok_or(MoqError::Unauthorized)?; + let origin = self.inner.with_root(prefix).ok_or(MoqError::Unauthorized)?; Ok(Arc::new(MoqAnnounced { - task: Task::new(Announced { inner: origin }), + task: Task::new(Announced { + inner: origin.announced(), + }), })) } /// Wait for a specific broadcast to be announced by path. pub fn announced_broadcast(&self, path: String) -> Result, MoqError> { let _guard = crate::ffi::RUNTIME.enter(); - let origin = self.inner.clone().with_root(path).ok_or(MoqError::Unauthorized)?; + let origin = self.inner.with_root(path).ok_or(MoqError::Unauthorized)?; Ok(Arc::new(MoqAnnouncedBroadcast { - task: Task::new(Announced { inner: origin }), + task: Task::new(Announced { + inner: origin.announced(), + }), })) } } diff --git a/rs/moq-native/examples/clock.rs b/rs/moq-native/examples/clock.rs index 491ee2cb2..c59f2355d 100644 --- a/rs/moq-native/examples/clock.rs +++ b/rs/moq-native/examples/clock.rs @@ -94,13 +94,14 @@ async fn main() -> anyhow::Result<()> { let mut origin = origin .scope(&[path]) .context("not allowed to consume broadcast")? - .consume(); + .consume() + .announced(); let mut clock: Option = None; loop { tokio::select! { - Some(announce) = origin.announced() => match announce { + Some(announce) = origin.next() => match announce { (path, Some(broadcast)) => { tracing::info!(broadcast = %path, "broadcast is online, subscribing to track"); let track = broadcast.subscribe_track(&track)?; diff --git a/rs/moq-native/tests/backend.rs b/rs/moq-native/tests/backend.rs index 271f592e2..271406f96 100644 --- a/rs/moq-native/tests/backend.rs +++ b/rs/moq-native/tests/backend.rs @@ -34,7 +34,7 @@ async fn backend_test(scheme: &str, backend: moq_native::QuicBackend) { // ── subscriber (client) ───────────────────────────────────────── let sub_origin = Origin::random().produce(); - let mut announcements = sub_origin.consume(); + let mut announcements = sub_origin.consume().announced(); let mut client_config = moq_native::ClientConfig::default(); client_config.tls.disable_verify = Some(true); @@ -61,7 +61,7 @@ async fn backend_test(scheme: &str, backend: moq_native::QuicBackend) { .expect("client connect timed out") .expect("client connect failed"); - let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.announced()) + let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.next()) .await .expect("announce timed out") .expect("origin closed"); @@ -173,7 +173,7 @@ async fn iroh_connect() { // ── subscriber (client) ───────────────────────────────────────── let sub_origin = Origin::random().produce(); - let mut announcements = sub_origin.consume(); + let mut announcements = sub_origin.consume().announced(); // Create client iroh endpoint let mut client_iroh_config = IrohEndpointConfig::default(); @@ -213,7 +213,7 @@ async fn iroh_connect() { .expect("client connect timed out") .expect("client connect failed"); - let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.announced()) + let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.next()) .await .expect("announce timed out") .expect("origin closed"); diff --git a/rs/moq-native/tests/broadcast.rs b/rs/moq-native/tests/broadcast.rs index a0f20da5c..ab983932c 100644 --- a/rs/moq-native/tests/broadcast.rs +++ b/rs/moq-native/tests/broadcast.rs @@ -46,7 +46,7 @@ async fn broadcast_test(scheme: &str, client_version: Option<&str>, server_versi // ── subscriber (client) ───────────────────────────────────────── let sub_origin = Origin::random().produce(); - let mut announcements = sub_origin.consume(); + let mut announcements = sub_origin.consume().announced(); let mut client_config = moq_native::ClientConfig::default(); client_config.tls.disable_verify = Some(true); @@ -78,7 +78,7 @@ async fn broadcast_test(scheme: &str, client_version: Option<&str>, server_versi .expect("client connect failed"); // Wait for the broadcast announcement. - let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.announced()) + let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.next()) .await .expect("announce timed out") .expect("origin closed"); @@ -460,7 +460,7 @@ async fn broadcast_websocket() { // ── subscriber (client) ───────────────────────────────────────── let sub_origin = Origin::random().produce(); - let mut announcements = sub_origin.consume(); + let mut announcements = sub_origin.consume().announced(); let mut client_config = moq_native::ClientConfig::default(); client_config.tls.disable_verify = Some(true); @@ -490,7 +490,7 @@ async fn broadcast_websocket() { .expect("client connect failed"); // Wait for the broadcast announcement. - let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.announced()) + let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.next()) .await .expect("announce timed out") .expect("origin closed"); @@ -564,7 +564,7 @@ async fn broadcast_websocket_fallback() { // ── subscriber (client) ───────────────────────────────────────── let sub_origin = Origin::random().produce(); - let mut announcements = sub_origin.consume(); + let mut announcements = sub_origin.consume().announced(); let mut client_config = moq_native::ClientConfig::default(); client_config.tls.disable_verify = Some(true); @@ -597,7 +597,7 @@ async fn broadcast_websocket_fallback() { .expect("client connect failed"); // Wait for the broadcast announcement. - let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.announced()) + let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.next()) .await .expect("announce timed out") .expect("origin closed"); diff --git a/rs/moq-net/src/ietf/publisher.rs b/rs/moq-net/src/ietf/publisher.rs index 45772bde5..3219f91f8 100644 --- a/rs/moq-net/src/ietf/publisher.rs +++ b/rs/moq-net/src/ietf/publisher.rs @@ -437,17 +437,18 @@ impl Publisher { } /// Outgoing PublishNamespace: announce each namespace via a bidi stream. - async fn run_announce(mut self) -> Result<(), Error> { + async fn run_announce(self) -> Result<(), Error> { let mut namespace_streams: HashMap)> = HashMap::new(); + let mut announced = self.origin.announced(); loop { - let announced = tokio::select! { + let next = tokio::select! { biased; _ = self.session.closed() => return Ok(()), - announced = self.origin.announced() => announced, + next = announced.next() => next, }; - let Some((path, active)) = announced else { + let Some((path, active)) = next else { break; }; @@ -548,7 +549,7 @@ impl Publisher { tracing::debug!(prefix = %self.origin.absolute(&prefix), "subscribe_namespace stream"); - let mut origin = self.origin.scope(&[prefix.as_path()]).ok_or(Error::Unauthorized)?; + let origin = self.origin.scope(&[prefix.as_path()]).ok_or(Error::Unauthorized)?; // Send OK response match self.version { @@ -585,8 +586,10 @@ impl Publisher { } // v16+: Send Namespace/NamespaceDone entries on this bidi stream. _ => { + let mut announced = origin.announced(); + // Send initial NAMESPACE messages for currently active namespaces - while let Some((path, active)) = origin.try_announced() { + while let Some((path, active)) = announced.try_next() { let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path"); if active.is_some() { tracing::debug!(broadcast = %origin.absolute(&path), "namespace"); @@ -605,8 +608,8 @@ impl Publisher { tokio::select! { biased; res = stream.reader.closed() => return res, - announced = origin.announced() => { - match announced { + next = announced.next() => { + match next { Some((path, active)) => { let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path").to_owned(); if active.is_some() { diff --git a/rs/moq-net/src/lite/publisher.rs b/rs/moq-net/src/lite/publisher.rs index e88e49bd6..a51f7d304 100644 --- a/rs/moq-net/src/lite/publisher.rs +++ b/rs/moq-net/src/lite/publisher.rs @@ -5,8 +5,8 @@ use web_async::FuturesExt; use web_transport_trait::Stats; use crate::{ - AsPath, BroadcastConsumer, Error, Origin, OriginConsumer, OriginList, StatsHandle as MoqStats, Track, - TrackConsumer, + AnnounceConsumer, AsPath, BroadcastConsumer, Error, Origin, OriginConsumer, OriginList, StatsHandle as MoqStats, + Track, TrackConsumer, coding::{Stream, Writer}, lite::{ self, @@ -142,7 +142,8 @@ impl Publisher { let prefix = interest.prefix.to_owned(); let exclude_hop = interest.exclude_hop; - let mut origin = self.origin.scope(&[prefix.as_path()]).ok_or(Error::Unauthorized)?; + let origin = self.origin.scope(&[prefix.as_path()]).ok_or(Error::Unauthorized)?; + let mut announced = origin.announced(); let version = self.version; let self_origin = self.self_origin; @@ -150,7 +151,8 @@ impl Publisher { web_async::spawn(async move { if let Err(err) = Self::run_announce( &mut stream, - &mut origin, + &origin, + &mut announced, &prefix, self_origin, exclude_hop, @@ -175,9 +177,11 @@ impl Publisher { Ok(()) } + #[allow(clippy::too_many_arguments)] async fn run_announce( stream: &mut Stream, - origin: &mut OriginConsumer, + origin: &OriginConsumer, + announced: &mut AnnounceConsumer, prefix: impl AsPath, self_origin: Origin, // Peer's session-level origin id, sent in AnnounceInterest. We skip @@ -202,7 +206,7 @@ impl Publisher { // Send ANNOUNCE_INIT as the first message with all currently active paths // We use `try_next()` to synchronously get the initial updates. - while let Some((path, active)) = origin.try_announced() { + while let Some((path, active)) = announced.try_next() { let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path"); if active.is_some() { @@ -233,8 +237,8 @@ impl Publisher { tokio::select! { biased; res = stream.reader.closed() => return res, - announced = origin.announced() => { - match announced { + next = announced.next() => { + match next { Some((path, active)) => { let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path").to_owned(); diff --git a/rs/moq-net/src/model/origin.rs b/rs/moq-net/src/model/origin.rs index c5d2f2471..a23538b89 100644 --- a/rs/moq-net/src/model/origin.rs +++ b/rs/moq-net/src/model/origin.rs @@ -299,12 +299,12 @@ impl OriginConsumerState { } #[derive(Clone)] -struct OriginConsumerNotify { +struct AnnounceConsumerNotify { root: PathOwned, state: conducer::Producer, } -impl OriginConsumerNotify { +impl AnnounceConsumerNotify { fn announce(&self, path: impl AsPath, broadcast: BroadcastConsumer) { let path = path.as_path().strip_prefix(&self.root).unwrap().to_owned(); self.state @@ -332,7 +332,7 @@ struct NotifyNode { // Consumers that are subscribed to this node. // We store a consumer ID so we can remove it easily when it closes. - consumers: HashMap, + consumers: HashMap, } impl NotifyNode { @@ -452,12 +452,12 @@ impl OriginNode { } } - fn consume(&mut self, id: ConsumerId, mut notify: OriginConsumerNotify) { + fn consume(&mut self, id: ConsumerId, mut notify: AnnounceConsumerNotify) { self.consume_initial(&mut notify); self.notify.lock().consumers.insert(id, notify); } - fn consume_initial(&mut self, notify: &mut OriginConsumerNotify) { + fn consume_initial(&mut self, notify: &mut AnnounceConsumerNotify) { if let Some(broadcast) = &self.broadcast { notify.announce(&broadcast.path, broadcast.active.clone()); } @@ -722,21 +722,21 @@ impl OriginProducer { }) } - /// Subscribe to all announced broadcasts. + /// Cheap read handle over this origin's broadcast tree. + /// + /// Use [`OriginConsumer::announced`] to register interest and start receiving + /// announcement events; the consumer itself does not allocate any channels. pub fn consume(&self) -> OriginConsumer { OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone()) } - /// Get a broadcast by path if it has *already* been published. + /// Handle to the announcement stream for this producer's subtree. /// - /// Equivalent to `self.consume().get_broadcast(path)` but skips the - /// announcement-cursor allocation, which is currently relatively expensive. - #[deprecated(note = "use `consume().get_broadcast(path)` once `consume()` is cheap")] - pub fn get_broadcast(&self, path: impl AsPath) -> Option { - let path = path.as_path(); - let (root, rest) = self.nodes.get(&path)?; - let state = root.lock(); - state.consume_broadcast(&rest) + /// Symmetric counterpart to [`Self::consume`]; call + /// [`AnnounceProducer::consume`] to get an [`AnnounceConsumer`] that + /// receives announce / unannounce events. + pub fn announces(&self) -> AnnounceProducer { + AnnounceProducer::new(self.root.clone(), self.nodes.clone()) } /// Returns a new OriginProducer that automatically strips out the provided prefix. @@ -770,19 +770,17 @@ impl OriginProducer { } } -/// Consumes announced broadcasts matching against an optional prefix. +/// Cheap read handle over an origin's broadcast tree. /// -/// NOTE: Clone is expensive, try to avoid it. +/// Clones share the underlying tree state without allocating any per-cursor +/// resources. To actually receive announce / unannounce events, call +/// [`Self::announced`] to obtain an [`AnnounceConsumer`]. +#[derive(Clone)] pub struct OriginConsumer { - id: ConsumerId, // Identity of the origin this consumer was derived from. info: Origin, nodes: OriginNodes, - // Pending updates queued for this consumer. Coalesced so a slow consumer - // can't accumulate redundant announce/unannounce pairs. - state: conducer::Producer, - // A prefix that is automatically stripped from all paths. root: PathOwned, } @@ -797,63 +795,20 @@ impl std::ops::Deref for OriginConsumer { impl OriginConsumer { fn new(info: Origin, root: PathOwned, nodes: OriginNodes) -> Self { - let state = conducer::Producer::::default(); - let id = ConsumerId::new(); - - for (_, node) in &nodes.nodes { - let notify = OriginConsumerNotify { - root: root.clone(), - state: state.clone(), - }; - node.lock().consume(id, notify); - } - - Self { - id, - info, - nodes, - state, - root, - } - } - - /// Returns the next (un)announced broadcast and the absolute path. - /// - /// The broadcast will only be announced if it was previously unannounced. - /// The same path won't be announced/unannounced twice, instead it will toggle. - /// Returns None if the consumer is closed. - /// - /// Note: The returned path is absolute and will always match this consumer's prefix. - pub async fn announced(&mut self) -> Option { - conducer::wait(|waiter| self.poll_announced(waiter)).await - } - - /// Poll for the next (un)announced broadcast, without blocking. - /// - /// Returns `Poll::Ready(Some(_))` for an update, `Poll::Ready(None)` if the - /// consumer is closed, or `Poll::Pending` after registering `waiter` to be - /// notified when the next update arrives. - pub fn poll_announced(&mut self, waiter: &conducer::Waiter) -> Poll> { - match self.state.poll(waiter, |state| match state.take() { - Some(item) => Poll::Ready(item), - None => Poll::Pending, - }) { - Poll::Ready(Ok(item)) => Poll::Ready(Some(item)), - // Closed: discard the Ref so its MutexGuard doesn't escape this call. - Poll::Ready(Err(_)) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } + Self { info, nodes, root } } - /// Returns the next (un)announced broadcast and the absolute path without blocking. + /// Subscribe to announce / unannounce events for this consumer's subtree. /// - /// Returns None if there is no update available; NOT because the consumer is closed. - /// You have to use `is_closed` to check if the consumer is closed. - pub fn try_announced(&mut self) -> Option { - self.state.write().ok()?.take() + /// Allocates a per-cursor coalescing buffer, registers it with each root + /// in this consumer's scope, and replays the currently active broadcast + /// set as initial announcements. Drop the returned [`AnnounceConsumer`] + /// to unregister. + pub fn announced(&self) -> AnnounceConsumer { + AnnounceConsumer::new(self.root.clone(), self.nodes.clone()) } - /// Create another consumer with its own announcement cursor over the same origin. + /// Returns a cheap duplicate of this read handle. pub fn consume(&self) -> Self { self.clone() } @@ -861,7 +816,7 @@ impl OriginConsumer { /// Get a broadcast by path if it has *already* been announced. /// /// Returns `None` when the path is unknown to this consumer right now. Synchronous - /// lookup races announcement gossip — a freshly-connected consumer will see `None` + /// lookup races announcement gossip. A freshly-connected consumer will see `None` /// even when the broadcast is about to arrive. Prefer [`Self::announced_broadcast`] /// (blocks until announced) unless you can guarantee the announcement has already /// landed (e.g. you're responding to an `announced()` callback). @@ -876,7 +831,7 @@ impl OriginConsumer { /// /// Returns `None` if the path is outside this consumer's allowed prefixes or if the consumer /// is closed before the broadcast is announced. The returned broadcast may itself be closed - /// later — subscribers should watch [`BroadcastConsumer::closed`] to react to that. + /// later. Subscribers should watch [`BroadcastConsumer::closed`] to react to that. /// /// Prefer this over [`Self::get_broadcast`] when you know the exact path you want but /// cannot guarantee the announcement has already been received. @@ -884,19 +839,20 @@ impl OriginConsumer { let path = path.as_path(); // Scope a fresh consumer down to this path so we only wake up for relevant announcements. - let mut consumer = self.scope(std::slice::from_ref(&path))?; + let consumer = self.scope(std::slice::from_ref(&path))?; // `scope` keeps narrower permissions intact: if we ask for `foo` on a consumer limited - // to `foo/specific`, `scope` returns a consumer scoped to `foo/specific` — no + // to `foo/specific`, `scope` returns a consumer scoped to `foo/specific`. No // announcement at the exact path `foo` can ever arrive. Bail rather than loop forever. if !consumer.allowed().any(|allowed| path.has_prefix(allowed)) { return None; } + let mut announced = consumer.announced(); loop { - let (announced, broadcast) = consumer.announced().await?; + let (announced_path, broadcast) = announced.next().await?; // `scope` narrows by prefix, but we only want an exact-path match. - if announced.as_path() == path { + if announced_path.as_path() == path { if let Some(broadcast) = broadcast { return Some(broadcast); } @@ -949,17 +905,122 @@ impl OriginConsumer { } } -impl Drop for OriginConsumer { - fn drop(&mut self) { - for (_, root) in &self.nodes.nodes { - root.lock().unconsume(self.id); +/// Handle to the announcement stream for a subtree. +/// +/// Symmetric counterpart of [`AnnounceConsumer`]. Cheap to clone; call +/// [`Self::consume`] to obtain an [`AnnounceConsumer`] that receives events. +#[derive(Clone)] +pub struct AnnounceProducer { + nodes: OriginNodes, + root: PathOwned, +} + +impl AnnounceProducer { + fn new(root: PathOwned, nodes: OriginNodes) -> Self { + Self { nodes, root } + } + + /// Subscribe to announce / unannounce events for this subtree. + /// + /// Allocates a per-cursor coalescing buffer and replays the currently active broadcast set + /// as initial announcements. Drop the returned [`AnnounceConsumer`] to + /// unregister. + pub fn consume(&self) -> AnnounceConsumer { + AnnounceConsumer::new(self.root.clone(), self.nodes.clone()) + } + + /// Returns the prefix that is automatically stripped from announced paths. + pub fn root(&self) -> &Path<'_> { + &self.root + } +} + +/// Receives announce / unannounce events for a subtree. +/// +/// Created by [`OriginConsumer::announced`] or [`AnnounceProducer::consume`]. +/// Drop to unregister. +pub struct AnnounceConsumer { + id: ConsumerId, + nodes: OriginNodes, + root: PathOwned, + + // Pending updates queued for this cursor. Coalesced so a slow consumer + // can't accumulate redundant announce/unannounce pairs. + state: conducer::Producer, +} + +impl AnnounceConsumer { + fn new(root: PathOwned, nodes: OriginNodes) -> Self { + let state = conducer::Producer::::default(); + let id = ConsumerId::new(); + + for (_, node) in &nodes.nodes { + let notify = AnnounceConsumerNotify { + root: root.clone(), + state: state.clone(), + }; + node.lock().consume(id, notify); } + + Self { id, nodes, root, state } + } + + /// Returns the next (un)announced broadcast and its path relative to this + /// cursor's root. + /// + /// The broadcast will only be announced if it was previously unannounced. + /// The same path won't be announced/unannounced twice in a row; instead it + /// toggles. Returns None if the cursor is closed. + pub async fn next(&mut self) -> Option { + conducer::wait(|waiter| self.poll_next(waiter)).await + } + + /// Poll for the next (un)announced broadcast, without blocking. + /// + /// Returns `Poll::Ready(Some(_))` for an update, `Poll::Ready(None)` if the + /// cursor is closed, or `Poll::Pending` after registering `waiter` to be + /// notified when the next update arrives. + pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll> { + match self.state.poll(waiter, |state| match state.take() { + Some(item) => Poll::Ready(item), + None => Poll::Pending, + }) { + Poll::Ready(Ok(item)) => Poll::Ready(Some(item)), + // Closed: discard the Ref so its MutexGuard doesn't escape this call. + Poll::Ready(Err(_)) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } + + /// Returns the next (un)announced broadcast without blocking. + /// + /// Returns None if there is no update available; NOT because the cursor is closed. + /// Use [`Self::is_closed`] to check if the cursor is closed. + pub fn try_next(&mut self) -> Option { + self.state.write().ok()?.take() + } + + /// Returns true if the cursor is closed (no more updates will arrive). + pub fn is_closed(&self) -> bool { + self.state.write().is_err() + } + + /// Returns the prefix that is automatically stripped from emitted paths. + pub fn root(&self) -> &Path<'_> { + &self.root + } + + /// Converts a relative path to an absolute path. + pub fn absolute(&self, path: impl AsPath) -> Path<'_> { + self.root.join(path) } } -impl Clone for OriginConsumer { - fn clone(&self) -> Self { - OriginConsumer::new(self.info, self.root.clone(), self.nodes.clone()) +impl Drop for AnnounceConsumer { + fn drop(&mut self) { + for (_, root) in &self.nodes.nodes { + root.lock().unconsume(self.id); + } } } @@ -967,30 +1028,30 @@ impl Clone for OriginConsumer { use futures::FutureExt; #[cfg(test)] -impl OriginConsumer { +impl AnnounceConsumer { pub fn assert_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) { let expected = expected.as_path(); - let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next"); + let (path, active) = self.next().now_or_never().expect("next blocked").expect("no next"); assert_eq!(path, expected, "wrong path"); assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast"); } pub fn assert_try_next(&mut self, expected: impl AsPath, broadcast: &BroadcastConsumer) { let expected = expected.as_path(); - let (path, active) = self.try_announced().expect("no next"); + let (path, active) = self.try_next().expect("no next"); assert_eq!(path, expected, "wrong path"); assert!(active.unwrap().is_clone(broadcast), "should be the same broadcast"); } pub fn assert_next_none(&mut self, expected: impl AsPath) { let expected = expected.as_path(); - let (path, active) = self.announced().now_or_never().expect("next blocked").expect("no next"); + let (path, active) = self.next().now_or_never().expect("next blocked").expect("no next"); assert_eq!(path, expected, "wrong path"); assert!(active.is_none(), "should be unannounced"); } pub fn assert_next_wait(&mut self) { - if let Some(res) = self.announced().now_or_never() { + if let Some(res) = self.next().now_or_never() { panic!("next should block: got {:?}", res.map(|(path, _)| path)); } } @@ -998,7 +1059,7 @@ impl OriginConsumer { /* pub fn assert_next_closed(&mut self) { assert!( - self.announced().now_or_never().expect("next blocked").is_none(), + self.next().now_or_never().expect("next blocked").is_none(), "next should be closed" ); } @@ -1038,7 +1099,7 @@ mod tests { let broadcast1 = Broadcast::new().produce(); let broadcast2 = Broadcast::new().produce(); - let mut consumer1 = origin.consume(); + let mut consumer1 = origin.consume().announced(); // Make a new consumer that should get it. consumer1.assert_next_wait(); @@ -1050,7 +1111,7 @@ mod tests { // Make a new consumer that should get the existing broadcast. // But we don't consume it yet. - let mut consumer2 = origin.consume(); + let mut consumer2 = origin.consume().announced(); // Publish the second broadcast. origin.publish_broadcast("test2", broadcast2.consume()); @@ -1075,7 +1136,7 @@ mod tests { consumer2.assert_next_wait(); // And a new consumer only gets the last broadcast. - let mut consumer3 = origin.consume(); + let mut consumer3 = origin.consume().announced(); consumer3.assert_next("test2", &broadcast2.consume()); consumer3.assert_next_wait(); @@ -1110,7 +1171,8 @@ mod tests { let consumer2 = broadcast2.consume(); let consumer3 = broadcast3.consume(); - let mut consumer = origin.consume(); + let consumer = origin.consume(); + let mut announced = consumer.announced(); origin.publish_broadcast("test", consumer1.clone()); origin.publish_broadcast("test", consumer2.clone()); @@ -1118,11 +1180,11 @@ mod tests { assert!(consumer.get_broadcast("test").is_some()); // On equal hop lengths, each new publish replaces the active and reannounces. - // Because the consumer hasn't drained between publishes, the stale announces + // Because the cursor hasn't drained between publishes, the stale announces // collapse with their following unannounces and only the final active broadcast // is delivered. - consumer.assert_next("test", &consumer3); - consumer.assert_next_wait(); + announced.assert_next("test", &consumer3); + announced.assert_next_wait(); // Drop a backup, nothing should change. drop(broadcast2); @@ -1131,7 +1193,7 @@ mod tests { tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; assert!(consumer.get_broadcast("test").is_some()); - consumer.assert_next_wait(); + announced.assert_next_wait(); // Drop the active, we should reannounce with the remaining backup. drop(broadcast3); @@ -1140,8 +1202,8 @@ mod tests { tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; assert!(consumer.get_broadcast("test").is_some()); - consumer.assert_next_none("test"); - consumer.assert_next("test", &consumer1); + announced.assert_next_none("test"); + announced.assert_next("test", &consumer1); // Drop the final broadcast, we should unannounce. drop(broadcast1); @@ -1150,8 +1212,8 @@ mod tests { tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; assert!(consumer.get_broadcast("test").is_none()); - consumer.assert_next_none("test"); - consumer.assert_next_wait(); + announced.assert_next_none("test"); + announced.assert_next_wait(); } #[tokio::test] @@ -1208,7 +1270,7 @@ mod tests { let origin = Origin::random().produce(); let broadcast = Broadcast::new().produce(); - let mut consumer = origin.consume(); + let mut consumer = origin.consume().announced(); for i in 0..256 { origin.publish_broadcast(format!("test{i:03}"), broadcast.consume()); } @@ -1224,7 +1286,7 @@ mod tests { let origin = Origin::random().produce(); let broadcast = Broadcast::new().produce(); - let mut consumer = origin.consume(); + let mut consumer = origin.consume().announced(); for i in 0..256 { origin.publish_broadcast(format!("test{i:03}"), broadcast.consume()); } @@ -1243,7 +1305,7 @@ mod tests { let foo_producer = origin.with_root("foo").expect("should create root"); assert_eq!(foo_producer.root().as_str(), "foo"); - let mut consumer = origin.consume(); + let mut consumer = origin.consume().announced(); // When publishing to "bar/baz", it should actually publish to "foo/bar/baz" assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume())); @@ -1251,7 +1313,7 @@ mod tests { consumer.assert_next("foo/bar/baz", &broadcast.consume()); // A consumer created from the rooted producer should see the stripped path - let mut foo_consumer = foo_producer.consume(); + let mut foo_consumer = foo_producer.consume().announced(); foo_consumer.assert_next("bar/baz", &broadcast.consume()); } @@ -1265,7 +1327,7 @@ mod tests { let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root"); assert_eq!(foo_bar_producer.root().as_str(), "foo/bar"); - let mut consumer = origin.consume(); + let mut consumer = origin.consume().announced(); // Publishing to "baz" should actually publish to "foo/bar/baz" assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume())); @@ -1273,7 +1335,7 @@ mod tests { consumer.assert_next("foo/bar/baz", &broadcast.consume()); // Consumer from foo_bar_producer sees just "baz" - let mut foo_bar_consumer = foo_bar_producer.consume(); + let mut foo_bar_consumer = foo_bar_producer.consume().announced(); foo_bar_consumer.assert_next("baz", &broadcast.consume()); } @@ -1313,7 +1375,7 @@ mod tests { let broadcast2 = Broadcast::new().produce(); let broadcast3 = Broadcast::new().produce(); - let mut consumer = origin.consume(); + let mut consumer = origin.consume().announced(); // Publish to different paths origin.publish_broadcast("allowed", broadcast1.consume()); @@ -1324,7 +1386,8 @@ mod tests { let mut limited_consumer = origin .consume() .scope(&["allowed".into()]) - .expect("should create limited consumer"); + .expect("should create limited consumer") + .announced(); // Should only receive broadcasts under "allowed" limited_consumer.assert_next("allowed", &broadcast1.consume()); @@ -1352,7 +1415,8 @@ mod tests { let mut limited_consumer = origin .consume() .scope(&["foo".into(), "bar".into()]) - .expect("should create limited consumer"); + .expect("should create limited consumer") + .announced(); // Order depends on PathPrefixes canonical sort (lexicographic for same length) limited_consumer.assert_next("bar/test", &broadcast2.consume()); @@ -1373,7 +1437,7 @@ mod tests { .scope(&["bar".into(), "goop/pee".into()]) .expect("should create limited producer"); - let mut consumer = origin.consume(); + let mut consumer = origin.consume().announced(); // Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee) assert!(limited_producer.publish_broadcast("bar", broadcast.consume())); @@ -1412,7 +1476,8 @@ mod tests { let mut limited_consumer = foo_producer .consume() .scope(&["bar".into(), "goop/pee".into()]) - .expect("should create limited consumer"); + .expect("should create limited consumer") + .announced(); // Should only see allowed paths (without foo prefix) limited_consumer.assert_next("bar/test", &broadcast1.consume()); @@ -1520,17 +1585,20 @@ mod tests { let mut foo_consumer = origin .consume() .scope(&["foo".into()]) - .expect("should create foo consumer"); + .expect("should create foo consumer") + .announced(); let mut bar_consumer = origin .consume() .scope(&["bar".into()]) - .expect("should create bar consumer"); + .expect("should create bar consumer") + .announced(); let mut foobar_consumer = origin .consume() .scope(&["foo".into(), "bar".into()]) - .expect("should create foobar consumer"); + .expect("should create foobar consumer") + .announced(); // Each consumer should only see their allowed paths foo_consumer.assert_next("foo/test", &broadcast1.consume()); @@ -1564,11 +1632,12 @@ mod tests { let mut consumer = limited_producer .consume() .scope(&["".into()]) - .expect("should create consumer with empty prefix"); + .expect("should create consumer with empty prefix") + .announced(); // Should see both broadcasts (order depends on PathPrefixes sort) - let a1 = consumer.try_announced().expect("expected first announcement"); - let a2 = consumer.try_announced().expect("expected second announcement"); + let a1 = consumer.try_next().expect("expected first announcement"); + let a2 = consumer.try_next().expect("expected second announcement"); consumer.assert_next_wait(); let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect(); @@ -1598,7 +1667,8 @@ mod tests { let mut worm_consumer = limited_producer .consume() .scope(&["worm-node".into()]) - .expect("should create worm-node consumer"); + .expect("should create worm-node consumer") + .announced(); // Should see worm-node content with paths stripped to "" worm_consumer.assert_next("worm-node", &broadcast1.consume()); @@ -1609,7 +1679,8 @@ mod tests { let mut foo_consumer = limited_producer .consume() .scope(&["worm-node/foo".into()]) - .expect("should create worm-node/foo consumer"); + .expect("should create worm-node/foo consumer") + .announced(); foo_consumer.assert_next("worm-node/foo", &broadcast2.consume()); foo_consumer.assert_next_wait(); // Should NOT see other content @@ -1636,7 +1707,8 @@ mod tests { let mut consumer = limited_producer .consume() .scope(&["".into()]) - .expect("should create consumer with empty prefix"); + .expect("should create consumer with empty prefix") + .announced(); // Should see all broadcasts from all roots consumer.assert_next("app1/data", &broadcast1.consume()); @@ -1686,7 +1758,8 @@ mod tests { let mut team2_consumer = limited_producer .consume() .scope(&["org/team2".into()]) - .expect("should create team2 consumer"); + .expect("should create team2 consumer") + .announced(); team2_consumer.assert_next("org/team2/project1", &broadcast3.consume()); team2_consumer.assert_next_wait(); // Should NOT see team1 content @@ -1695,7 +1768,8 @@ mod tests { let mut project1_consumer = limited_producer .consume() .scope(&["org/team1/project1".into()]) - .expect("should create project1 consumer"); + .expect("should create project1 consumer") + .announced(); // Should only see project1 content at root project1_consumer.assert_next("org/team1/project1", &broadcast1.consume()); @@ -1726,7 +1800,7 @@ mod tests { // Use an owned String so the trailing slash is NOT normalized away. let prefix = "some_prefix/".to_string(); - let mut consumer = origin.consume().with_root(prefix).unwrap(); + let mut consumer = origin.consume().with_root(prefix).unwrap().announced(); let b = origin.create_broadcast("some_prefix/test").unwrap(); consumer.assert_next("test", &b.consume()); @@ -1743,7 +1817,7 @@ mod tests { let b = rooted.create_broadcast("test").unwrap(); - let mut consumer = rooted.consume(); + let mut consumer = rooted.consume().announced(); consumer.assert_next("test", &b.consume()); } @@ -1755,7 +1829,7 @@ mod tests { let origin = Origin::random().produce(); let prefix = "some_prefix/".to_string(); - let mut consumer = origin.consume().with_root(prefix).unwrap(); + let mut consumer = origin.consume().with_root(prefix).unwrap().announced(); let b = origin.create_broadcast("some_prefix/test").unwrap(); consumer.assert_next("test", &b.consume()); @@ -1788,11 +1862,12 @@ mod tests { let mut consumer = user_producer .consume() .scope(&["".into()]) - .expect("scope with empty prefix should not fail when user has specific permissions"); + .expect("scope with empty prefix should not fail when user has specific permissions") + .announced(); // Should still receive broadcasts from allowed paths (order not guaranteed) - let a1 = consumer.try_announced().expect("expected first announcement"); - let a2 = consumer.try_announced().expect("expected second announcement"); + let a1 = consumer.try_next().expect("expected first announcement"); + let a2 = consumer.try_next().expect("expected second announcement"); consumer.assert_next_wait(); let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect(); @@ -1803,7 +1878,8 @@ mod tests { let mut narrow_consumer = user_producer .consume() .scope(&["worm-node".into()]) - .expect("should be able to narrow scope to worm-node"); + .expect("should be able to narrow scope to worm-node") + .announced(); narrow_consumer.assert_next("worm-node/data", &broadcast1.consume()); narrow_consumer.assert_next_wait(); // Should not see foobar @@ -1821,7 +1897,7 @@ mod tests { assert!(producer.publish_broadcast("demo/stream", broadcast.consume())); - let mut consumer = producer.consume(); + let mut consumer = producer.consume().announced(); consumer.assert_next("demo/stream", &broadcast.consume()); consumer.assert_next_wait(); } @@ -1839,7 +1915,7 @@ mod tests { // Can still publish under "demo/bar" since "demo" covers everything assert!(producer.publish_broadcast("demo/bar/stream", broadcast.consume())); - let mut consumer = producer.consume(); + let mut consumer = producer.consume().announced(); consumer.assert_next("demo/bar/stream", &broadcast.consume()); consumer.assert_next_wait(); } @@ -1856,7 +1932,7 @@ mod tests { assert!(producer.publish_broadcast("demo/foo/stream", broadcast.consume())); - let mut consumer = producer.consume(); + let mut consumer = producer.consume().announced(); // Should only get ONE announcement (not two from overlapping nodes) consumer.assert_next("demo/foo/stream", &broadcast.consume()); consumer.assert_next_wait(); @@ -1994,16 +2070,16 @@ mod tests { assert!(result.is_none()); } - // Coalescing tests: a slow consumer that doesn't drain between updates + // Coalescing tests: a slow cursor that doesn't drain between updates // should observe a bounded number of deliveries. #[tokio::test] async fn test_coalesce_announce_then_unannounce() { - // announce + unannounce that the consumer hasn't observed yet collapses to nothing. + // announce + unannounce that the cursor hasn't observed yet collapses to nothing. tokio::time::pause(); let origin = Origin::random().produce(); - let mut consumer = origin.consume(); + let mut announced = origin.consume().announced(); let broadcast = Broadcast::new().produce(); origin.publish_broadcast("test", broadcast.consume()); @@ -2011,17 +2087,17 @@ mod tests { tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - consumer.assert_next_wait(); + announced.assert_next_wait(); } #[tokio::test] async fn test_coalesce_announce_unannounce_announce() { - // announce, unannounce, announce that the consumer hasn't drained collapses + // announce, unannounce, announce that the cursor hasn't drained collapses // to a single Announce of the latest broadcast. tokio::time::pause(); let origin = Origin::random().produce(); - let mut consumer = origin.consume(); + let mut announced = origin.consume().announced(); let broadcast1 = Broadcast::new().produce(); let broadcast2 = Broadcast::new().produce(); @@ -2031,22 +2107,22 @@ mod tests { tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; origin.publish_broadcast("test", broadcast2.consume()); - consumer.assert_next("test", &broadcast2.consume()); - consumer.assert_next_wait(); + announced.assert_next("test", &broadcast2.consume()); + announced.assert_next_wait(); } #[tokio::test] async fn test_coalesce_unannounce_announce_preserved() { // unannounce followed by announce of a different broadcast must be preserved - // as two deliveries so the consumer learns the origin changed. + // as two deliveries so the cursor learns the origin changed. tokio::time::pause(); let origin = Origin::random().produce(); let broadcast1 = Broadcast::new().produce(); origin.publish_broadcast("test", broadcast1.consume()); - let mut consumer = origin.consume(); - consumer.assert_next("test", &broadcast1.consume()); + let mut announced = origin.consume().announced(); + announced.assert_next("test", &broadcast1.consume()); // Drop, then publish a fresh broadcast at the same path. drop(broadcast1); @@ -2055,10 +2131,10 @@ mod tests { let broadcast2 = Broadcast::new().produce(); origin.publish_broadcast("test", broadcast2.consume()); - // The consumer must see the unannounce before the new announce. - consumer.assert_next_none("test"); - consumer.assert_next("test", &broadcast2.consume()); - consumer.assert_next_wait(); + // The cursor must see the unannounce before the new announce. + announced.assert_next_none("test"); + announced.assert_next("test", &broadcast2.consume()); + announced.assert_next_wait(); } #[tokio::test] @@ -2071,8 +2147,8 @@ mod tests { let broadcast1 = Broadcast::new().produce(); origin.publish_broadcast("test", broadcast1.consume()); - let mut consumer = origin.consume(); - consumer.assert_next("test", &broadcast1.consume()); + let mut announced = origin.consume().announced(); + announced.assert_next("test", &broadcast1.consume()); drop(broadcast1); tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; @@ -2082,20 +2158,20 @@ mod tests { drop(broadcast2); tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - consumer.assert_next_none("test"); - consumer.assert_next_wait(); + announced.assert_next_none("test"); + announced.assert_next_wait(); } #[tokio::test] async fn test_coalesce_churn_bounded() { // A churn loop on a single path should keep the pending set bounded. - // Backup promotion during cleanup can leave the consumer with zero or one + // Backup promotion during cleanup can leave the cursor with zero or one // pending update for "test" depending on the order tasks run; we only // require that churn doesn't accumulate across iterations. tokio::time::pause(); let origin = Origin::random().produce(); - let mut consumer = origin.consume(); + let mut announced = origin.consume().announced(); for _ in 0..1000 { let broadcast = Broadcast::new().produce(); @@ -2105,7 +2181,7 @@ mod tests { tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; let mut collected = Vec::new(); - while let Some(update) = consumer.try_announced() { + while let Some(update) = announced.try_next() { collected.push(update); } assert!( @@ -2118,4 +2194,48 @@ mod tests { "unexpected path in pending updates", ); } + + // OriginConsumer should be cheap to clone: cloning must NOT drain any + // other cursor's announce channel. A freshly-built AnnounceConsumer + // still receives the active backlog. + #[tokio::test] + async fn test_consumer_clone_is_side_effect_free() { + let origin = Origin::random().produce(); + let broadcast1 = Broadcast::new().produce(); + let broadcast2 = Broadcast::new().produce(); + + origin.publish_broadcast("test1", broadcast1.consume()); + origin.publish_broadcast("test2", broadcast2.consume()); + + let consumer = origin.consume(); + let mut announced = consumer.announced(); + + // Cloning the OriginConsumer many times and looking up broadcasts + // must not consume any events from the existing cursor. + for _ in 0..16 { + let cloned = consumer.clone(); + assert!(cloned.get_broadcast("test1").is_some()); + assert!(cloned.get_broadcast("test2").is_some()); + } + + // The original cursor still sees both announcements in their + // natural order, undisturbed by the clones above. + let a1 = announced.try_next().expect("first announcement"); + let a2 = announced.try_next().expect("second announcement"); + announced.assert_next_wait(); + + let mut paths: Vec<_> = [&a1, &a2].iter().map(|(p, _)| p.to_string()).collect(); + paths.sort(); + assert_eq!(paths, ["test1", "test2"]); + + // A freshly-built AnnounceConsumer still receives the active backlog. + let mut fresh = consumer.announced(); + let b1 = fresh.try_next().expect("backlog: first"); + let b2 = fresh.try_next().expect("backlog: second"); + fresh.assert_next_wait(); + + let mut paths: Vec<_> = [&b1, &b2].iter().map(|(p, _)| p.to_string()).collect(); + paths.sort(); + assert_eq!(paths, ["test1", "test2"]); + } } diff --git a/rs/moq-net/src/stats.rs b/rs/moq-net/src/stats.rs index aa178e481..14a368f32 100644 --- a/rs/moq-net/src/stats.rs +++ b/rs/moq-net/src/stats.rs @@ -1063,7 +1063,7 @@ mod tests { async fn task_spawns_on_first_subscribe_and_announces() { let origin = Origin::random().produce(); let stats = Stats::new(".stats", 1, Some(PathOwned::from("sjc")), origin.clone()); - let mut consumer = origin.consume(); + let mut consumer = origin.consume().announced(); let bs = stats.tier(Tier::External).broadcast("foo/bar"); let p = bs.publisher(); @@ -1073,7 +1073,7 @@ mod tests { // levels=1 + broadcast "foo/bar" → buckets ["", "foo"]: root + per-first-segment. let mut seen = std::collections::HashSet::new(); for _ in 0..2 { - let (path, broadcast) = consumer.announced().await.expect("expected announce"); + let (path, broadcast) = consumer.next().await.expect("expected announce"); assert!(broadcast.is_some()); seen.insert(path.as_str().to_string()); } @@ -1085,7 +1085,7 @@ mod tests { async fn task_spawns_with_node_suffix() { let origin = Origin::random().produce(); let stats = Stats::new(".stats", 2, Some(PathOwned::from("sjc")), origin.clone()); - let mut consumer = origin.consume(); + let mut consumer = origin.consume().announced(); let bs = stats.tier(Tier::External).broadcast("foo/bar"); let p = bs.publisher(); @@ -1096,7 +1096,7 @@ mod tests { // suffixed with `/sjc`. let mut seen = std::collections::HashSet::new(); for _ in 0..3 { - let (path, broadcast) = consumer.announced().await.expect("expected announce"); + let (path, broadcast) = consumer.next().await.expect("expected announce"); assert!(broadcast.is_some()); seen.insert(path.as_str().to_string()); } @@ -1110,7 +1110,7 @@ mod tests { // node=None: paths should omit the trailing / segment. let origin = Origin::random().produce(); let stats = Stats::new(".stats", 1, None, origin.clone()); - let mut consumer = origin.consume(); + let mut consumer = origin.consume().announced(); let bs = stats.tier(Tier::External).broadcast("foo/bar"); let p = bs.publisher(); @@ -1119,7 +1119,7 @@ mod tests { tokio::time::advance(Duration::from_millis(1)).await; let mut seen = std::collections::HashSet::new(); for _ in 0..2 { - let (path, broadcast) = consumer.announced().await.expect("expected announce"); + let (path, broadcast) = consumer.next().await.expect("expected announce"); assert!(broadcast.is_some()); seen.insert(path.as_str().to_string()); } @@ -1134,7 +1134,7 @@ mod tests { // the first-segment prefix; the broadcast's own path isn't reachable at // this depth, so we get exactly two stats announces). let stats = Stats::new(".stats", 1, Some(PathOwned::from("sjc")), origin.clone()); - let mut consumer = origin.consume(); + let mut consumer = origin.consume().announced(); let bs = stats.tier(Tier::External).broadcast("foo/bar"); let p = bs.publisher(); @@ -1143,7 +1143,7 @@ mod tests { tokio::time::advance(Duration::from_millis(1)).await; let mut announced: Vec = Vec::new(); for _ in 0..2 { - let (path, broadcast) = consumer.announced().await.expect("expected announce"); + let (path, broadcast) = consumer.next().await.expect("expected announce"); assert!(broadcast.is_some(), "expected an active announce"); announced.push(path.as_str().to_string()); } @@ -1157,7 +1157,7 @@ mod tests { tokio::time::advance(Duration::from_secs(2)).await; let mut unannounced: Vec = Vec::new(); for _ in 0..2 { - let (path, broadcast) = consumer.announced().await.expect("expected unannounce"); + let (path, broadcast) = consumer.next().await.expect("expected unannounce"); assert!(broadcast.is_none(), "expected an unannounce"); unannounced.push(path.as_str().to_string()); } diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index a52fecac1..86d24df4c 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -344,10 +344,11 @@ impl Cluster { /// unannounce-then-announce within sub-milliseconds, which clears the /// pending-cleanup timestamp long before the sweep fires. async fn run_discovery(self, self_url: String, token: String, dialed: DialMap) { - let Some(mut consumer) = self.origin.consume().with_root(MESH_PREFIX) else { + let Some(consumer) = self.origin.consume().with_root(MESH_PREFIX) else { tracing::warn!("could not scope cluster origin to {MESH_PREFIX}; discovery disabled"); return; }; + let mut announced = consumer.announced(); let mut sweep = tokio::time::interval(SWEEP_INTERVAL); sweep.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -356,7 +357,7 @@ impl Cluster { loop { tokio::select! { - ann = consumer.announced() => { + ann = announced.next() => { let Some((relative, announced)) = ann else { return; }; let peer = relative.as_str(); if peer == self_url { @@ -624,7 +625,7 @@ mod tests { // Snapshot a consumer on the cluster origin before run() takes ownership of // `cluster` so we can later check that the registration was published. - let mut watcher = cluster.origin.consume(); + let mut watcher = cluster.origin.consume().announced(); let cluster_run = cluster.clone(); let mut handle = tokio::spawn(async move { cluster_run.run().await }); @@ -633,7 +634,7 @@ mod tests { tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; // The self-registration broadcast must be visible on the origin. - let (path, broadcast) = watcher.try_announced().expect("self-registration must be published"); + let (path, broadcast) = watcher.try_next().expect("self-registration must be published"); assert_eq!(path.as_str(), ".internal/origins/rendezvous.example.com:4443"); assert!(broadcast.is_some()); diff --git a/rs/moq-relay/src/web.rs b/rs/moq-relay/src/web.rs index 61f4c48f2..1e81f3e15 100644 --- a/rs/moq-relay/src/web.rs +++ b/rs/moq-relay/src/web.rs @@ -458,13 +458,14 @@ async fn serve_announced( } else { state.auth.verify(¶ms).await? }; - let Some(mut origin) = state.cluster.subscriber(&token) else { + let Some(origin) = state.cluster.subscriber(&token) else { return Err(StatusCode::UNAUTHORIZED.into()); }; + let mut announced = origin.announced(); let mut broadcasts = Vec::new(); - while let Some((suffix, active)) = origin.try_announced() { + while let Some((suffix, active)) = announced.try_next() { if active.is_some() { broadcasts.push(suffix); }