Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rs/hang/examples/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ async fn run_session(origin: moq_net::OriginProducer) -> anyhow::Result<()> {
}

// Subscribe to a broadcast and read media frames.
async fn run_subscribe(mut consumer: moq_net::OriginConsumer) -> anyhow::Result<()> {
async fn run_subscribe(consumer: moq_net::OriginConsumer) -> anyhow::Result<()> {
// Wait for a broadcast to be announced.
let (path, broadcast) = consumer
.announced()
.next()
.await
.ok_or_else(|| anyhow::anyhow!("origin closed"))?;

Expand Down
10 changes: 4 additions & 6 deletions rs/libmoq/src/origin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Origin {

pub fn announced(&mut self, origin: Id, on_announce: OnStatus) -> Result<Id, Error> {
let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?;
let consumer = origin.consume();
let consumer = origin.consume().announced();
let channel = oneshot::channel();

let entry = TaskEntry {
Expand All @@ -64,8 +64,8 @@ impl Origin {
Ok(id)
}

async fn run_announced(task_id: Id, mut consumer: moq_net::OriginConsumer) -> Result<(), Error> {
while let Some((path, broadcast)) = consumer.announced().await {
async fn run_announced(task_id: Id, mut consumer: moq_net::AnnounceConsumer) -> Result<(), Error> {
while let Some((path, broadcast)) = consumer.next().await {
let mut state = State::lock();

// Stop if the callback was revoked by close.
Expand Down Expand Up @@ -108,9 +108,7 @@ impl Origin {
let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?;
// TODO: expose an async variant backed by `announced_broadcast` so FFI callers can wait
// for gossip instead of racing it.
// Uses the deprecated direct lookup to avoid the per-call cost of OriginProducer::consume().
#[allow(deprecated)]
origin.get_broadcast(path).ok_or(Error::BroadcastNotFound)
origin.consume().get_broadcast(path).ok_or(Error::BroadcastNotFound)
}

pub fn publish<P: moq_net::AsPath>(
Expand Down
4 changes: 2 additions & 2 deletions rs/moq-boy/src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ pub enum Command {

/// Handles discovered viewers: subscribes to their command tracks.
pub async fn handle_viewers(
viewer_origin: &mut moq_net::OriginConsumer,
viewer_origin: &mut moq_net::AnnounceConsumer,
cmd_tx: &tokio::sync::mpsc::Sender<Command>,
) -> anyhow::Result<()> {
loop {
let Some((path, broadcast)) = viewer_origin.announced().await else {
let Some((path, broadcast)) = viewer_origin.next().await else {
break;
};

Expand Down
3 changes: 2 additions & 1 deletion rs/moq-boy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ async fn run(config: &Config) -> Result<()> {
let mut viewer_consumer = consume_origin
.with_root(&viewer_path)
.expect("viewer prefix should be valid")
.consume();
.consume()
.announced();

tracing::info!(url = %config.url, %name, broadcast = %broadcast_path, "connecting to relay");

Expand Down
18 changes: 11 additions & 7 deletions rs/moq-ffi/src/origin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ pub struct MoqAnnounced {
}

struct Announced {
inner: moq_net::OriginConsumer,
inner: moq_net::AnnounceConsumer,
}

impl Announced {
async fn next(&mut self) -> Result<Option<Arc<MoqAnnouncement>>, MoqError> {
loop {
match self.inner.announced().await {
match self.inner.next().await {
Some((path, Some(broadcast))) => {
return Ok(Some(Arc::new(MoqAnnouncement {
path: path.to_string(),
Expand All @@ -43,7 +43,7 @@ impl Announced {

async fn available(&mut self) -> Result<Arc<MoqBroadcastConsumer>, MoqError> {
loop {
match self.inner.announced().await {
match self.inner.next().await {
Some((_path, Some(broadcast))) => {
return Ok(Arc::new(MoqBroadcastConsumer::new(broadcast)));
}
Expand Down Expand Up @@ -109,18 +109,22 @@ impl MoqOriginConsumer {
/// Subscribe to all broadcast announcements under a prefix.
pub fn announced(&self, prefix: String) -> Result<Arc<MoqAnnounced>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let origin = self.inner.clone().with_root(prefix).ok_or(MoqError::Unauthorized)?;
let origin = self.inner.with_root(prefix).ok_or(MoqError::Unauthorized)?;
Ok(Arc::new(MoqAnnounced {
task: Task::new(Announced { inner: origin }),
task: Task::new(Announced {
inner: origin.announced(),
}),
}))
}

/// Wait for a specific broadcast to be announced by path.
pub fn announced_broadcast(&self, path: String) -> Result<Arc<MoqAnnouncedBroadcast>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let origin = self.inner.clone().with_root(path).ok_or(MoqError::Unauthorized)?;
let origin = self.inner.with_root(path).ok_or(MoqError::Unauthorized)?;
Ok(Arc::new(MoqAnnouncedBroadcast {
task: Task::new(Announced { inner: origin }),
task: Task::new(Announced {
inner: origin.announced(),
}),
}))
}
}
Expand Down
5 changes: 3 additions & 2 deletions rs/moq-native/examples/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,14 @@ async fn main() -> anyhow::Result<()> {
let mut origin = origin
.scope(&[path])
.context("not allowed to consume broadcast")?
.consume();
.consume()
.announced();

let mut clock: Option<Subscriber> = None;

loop {
tokio::select! {
Some(announce) = origin.announced() => match announce {
Some(announce) = origin.next() => match announce {
(path, Some(broadcast)) => {
tracing::info!(broadcast = %path, "broadcast is online, subscribing to track");
let track = broadcast.subscribe_track(&track)?;
Expand Down
8 changes: 4 additions & 4 deletions rs/moq-native/tests/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn backend_test(scheme: &str, backend: moq_native::QuicBackend) {

// ── subscriber (client) ─────────────────────────────────────────
let sub_origin = Origin::random().produce();
let mut announcements = sub_origin.consume();
let mut announcements = sub_origin.consume().announced();

let mut client_config = moq_native::ClientConfig::default();
client_config.tls.disable_verify = Some(true);
Expand All @@ -61,7 +61,7 @@ async fn backend_test(scheme: &str, backend: moq_native::QuicBackend) {
.expect("client connect timed out")
.expect("client connect failed");

let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.announced())
let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.next())
.await
.expect("announce timed out")
.expect("origin closed");
Expand Down Expand Up @@ -173,7 +173,7 @@ async fn iroh_connect() {

// ── subscriber (client) ─────────────────────────────────────────
let sub_origin = Origin::random().produce();
let mut announcements = sub_origin.consume();
let mut announcements = sub_origin.consume().announced();

// Create client iroh endpoint
let mut client_iroh_config = IrohEndpointConfig::default();
Expand Down Expand Up @@ -213,7 +213,7 @@ async fn iroh_connect() {
.expect("client connect timed out")
.expect("client connect failed");

let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.announced())
let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.next())
.await
.expect("announce timed out")
.expect("origin closed");
Expand Down
12 changes: 6 additions & 6 deletions rs/moq-native/tests/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn broadcast_test(scheme: &str, client_version: Option<&str>, server_versi

// ── subscriber (client) ─────────────────────────────────────────
let sub_origin = Origin::random().produce();
let mut announcements = sub_origin.consume();
let mut announcements = sub_origin.consume().announced();

let mut client_config = moq_native::ClientConfig::default();
client_config.tls.disable_verify = Some(true);
Expand Down Expand Up @@ -78,7 +78,7 @@ async fn broadcast_test(scheme: &str, client_version: Option<&str>, server_versi
.expect("client connect failed");

// Wait for the broadcast announcement.
let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.announced())
let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.next())
.await
.expect("announce timed out")
.expect("origin closed");
Expand Down Expand Up @@ -460,7 +460,7 @@ async fn broadcast_websocket() {

// ── subscriber (client) ─────────────────────────────────────────
let sub_origin = Origin::random().produce();
let mut announcements = sub_origin.consume();
let mut announcements = sub_origin.consume().announced();

let mut client_config = moq_native::ClientConfig::default();
client_config.tls.disable_verify = Some(true);
Expand Down Expand Up @@ -490,7 +490,7 @@ async fn broadcast_websocket() {
.expect("client connect failed");

// Wait for the broadcast announcement.
let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.announced())
let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.next())
.await
.expect("announce timed out")
.expect("origin closed");
Expand Down Expand Up @@ -564,7 +564,7 @@ async fn broadcast_websocket_fallback() {

// ── subscriber (client) ─────────────────────────────────────────
let sub_origin = Origin::random().produce();
let mut announcements = sub_origin.consume();
let mut announcements = sub_origin.consume().announced();

let mut client_config = moq_native::ClientConfig::default();
client_config.tls.disable_verify = Some(true);
Expand Down Expand Up @@ -597,7 +597,7 @@ async fn broadcast_websocket_fallback() {
.expect("client connect failed");

// Wait for the broadcast announcement.
let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.announced())
let (path, bc) = tokio::time::timeout(TIMEOUT, announcements.next())
.await
.expect("announce timed out")
.expect("origin closed");
Expand Down
19 changes: 11 additions & 8 deletions rs/moq-net/src/ietf/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,17 +437,18 @@ impl<S: web_transport_trait::Session> Publisher<S> {
}

/// Outgoing PublishNamespace: announce each namespace via a bidi stream.
async fn run_announce(mut self) -> Result<(), Error> {
async fn run_announce(self) -> Result<(), Error> {
let mut namespace_streams: HashMap<crate::PathOwned, (RequestId, Stream<S, Version>)> = HashMap::new();
let mut announced = self.origin.announced();

loop {
let announced = tokio::select! {
let next = tokio::select! {
biased;
_ = self.session.closed() => return Ok(()),
announced = self.origin.announced() => announced,
next = announced.next() => next,
};

let Some((path, active)) = announced else {
let Some((path, active)) = next else {
break;
};

Expand Down Expand Up @@ -548,7 +549,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {

tracing::debug!(prefix = %self.origin.absolute(&prefix), "subscribe_namespace stream");

let mut origin = self.origin.scope(&[prefix.as_path()]).ok_or(Error::Unauthorized)?;
let origin = self.origin.scope(&[prefix.as_path()]).ok_or(Error::Unauthorized)?;

// Send OK response
match self.version {
Expand Down Expand Up @@ -585,8 +586,10 @@ impl<S: web_transport_trait::Session> Publisher<S> {
}
// v16+: Send Namespace/NamespaceDone entries on this bidi stream.
_ => {
let mut announced = origin.announced();

// Send initial NAMESPACE messages for currently active namespaces
while let Some((path, active)) = origin.try_announced() {
while let Some((path, active)) = announced.try_next() {
let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path");
if active.is_some() {
tracing::debug!(broadcast = %origin.absolute(&path), "namespace");
Expand All @@ -605,8 +608,8 @@ impl<S: web_transport_trait::Session> Publisher<S> {
tokio::select! {
biased;
res = stream.reader.closed() => return res,
announced = origin.announced() => {
match announced {
next = announced.next() => {
match next {
Some((path, active)) => {
let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path").to_owned();
if active.is_some() {
Expand Down
20 changes: 12 additions & 8 deletions rs/moq-net/src/lite/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use web_async::FuturesExt;
use web_transport_trait::Stats;

use crate::{
AsPath, BroadcastConsumer, Error, Origin, OriginConsumer, OriginList, StatsHandle as MoqStats, Track,
TrackConsumer,
AnnounceConsumer, AsPath, BroadcastConsumer, Error, Origin, OriginConsumer, OriginList, StatsHandle as MoqStats,
Track, TrackConsumer,
coding::{Stream, Writer},
lite::{
self,
Expand Down Expand Up @@ -142,15 +142,17 @@ impl<S: web_transport_trait::Session> Publisher<S> {
let prefix = interest.prefix.to_owned();
let exclude_hop = interest.exclude_hop;

let mut origin = self.origin.scope(&[prefix.as_path()]).ok_or(Error::Unauthorized)?;
let origin = self.origin.scope(&[prefix.as_path()]).ok_or(Error::Unauthorized)?;
let mut announced = origin.announced();

let version = self.version;
let self_origin = self.self_origin;
let stats = self.stats.clone();
web_async::spawn(async move {
if let Err(err) = Self::run_announce(
&mut stream,
&mut origin,
&origin,
&mut announced,
&prefix,
self_origin,
exclude_hop,
Expand All @@ -175,9 +177,11 @@ impl<S: web_transport_trait::Session> Publisher<S> {
Ok(())
}

#[allow(clippy::too_many_arguments)]
async fn run_announce(
stream: &mut Stream<S, Version>,
origin: &mut OriginConsumer,
origin: &OriginConsumer,
announced: &mut AnnounceConsumer,
prefix: impl AsPath,
self_origin: Origin,
// Peer's session-level origin id, sent in AnnounceInterest. We skip
Expand All @@ -202,7 +206,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {

// Send ANNOUNCE_INIT as the first message with all currently active paths
// We use `try_next()` to synchronously get the initial updates.
while let Some((path, active)) = origin.try_announced() {
while let Some((path, active)) = announced.try_next() {
let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path");

if active.is_some() {
Expand Down Expand Up @@ -233,8 +237,8 @@ impl<S: web_transport_trait::Session> Publisher<S> {
tokio::select! {
biased;
res = stream.reader.closed() => return res,
announced = origin.announced() => {
match announced {
next = announced.next() => {
match next {
Some((path, active)) => {
let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path").to_owned();

Expand Down
Loading
Loading