Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion py/moq-rs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ async def main():
catalog = await announcement.broadcast.catalog()

for name in catalog.audio:
async for frame in announcement.broadcast.subscribe_media(name):
media = await announcement.broadcast.subscribe_media(name)
async for frame in media:
print(f"Got frame: {len(frame.payload)} bytes, ts={frame.timestamp_us}")

asyncio.run(main())
Expand Down
2 changes: 1 addition & 1 deletion py/moq-rs/examples/clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async def subscribe(url: str, broadcast_name: str, track_name: str, tls_verify:
broadcast = await client.announced_broadcast(broadcast_name)

print(f"subscribed to {broadcast_name!r} track={track_name!r}")
track = broadcast.subscribe_track(track_name)
track = await broadcast.subscribe_track(track_name)

async for group in track:
prefix: bytes | None = None
Expand Down
16 changes: 8 additions & 8 deletions py/moq-rs/moq/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,17 @@ class BroadcastConsumer:
def __init__(self, inner: MoqBroadcastConsumer) -> None:
self._inner = inner

def subscribe_catalog(self) -> CatalogConsumer:
return CatalogConsumer(self._inner.subscribe_catalog())
async def subscribe_catalog(self) -> CatalogConsumer:
return CatalogConsumer(await self._inner.subscribe_catalog())

def subscribe_track(self, name: str) -> TrackConsumer:
"""Subscribe to a track — receive arbitrary byte payloads."""
return TrackConsumer(self._inner.subscribe_track(name))
async def subscribe_track(self, name: str) -> TrackConsumer:
"""Subscribe to a track. Receive arbitrary byte payloads."""
return TrackConsumer(await self._inner.subscribe_track(name))

def subscribe_media(self, name: str, container: Container, max_latency_ms: int) -> MediaConsumer:
return MediaConsumer(self._inner.subscribe_media(name, container, max_latency_ms))
async def subscribe_media(self, name: str, container: Container, max_latency_ms: int) -> MediaConsumer:
return MediaConsumer(await self._inner.subscribe_media(name, container, max_latency_ms))

async def catalog(self) -> Catalog:
"""Convenience: subscribe and return the first catalog."""
consumer = self.subscribe_catalog()
consumer = await self.subscribe_catalog()
return await anext(consumer)
20 changes: 10 additions & 10 deletions py/moq-rs/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async def test_local_publish_consume_audio():
assert audio.sample_rate == 48000
assert audio.channel_count == 2

media_consumer = announcement.broadcast.subscribe_media(track_name, audio.container, 10_000)
media_consumer = await announcement.broadcast.subscribe_media(track_name, audio.container, 10_000)

payload = b"opus audio payload data"
media.write_frame(payload, 1_000_000)
Expand Down Expand Up @@ -144,7 +144,7 @@ async def test_video_publish_consume():
assert video.coded.width == 1280
assert video.coded.height == 720

media_consumer = announcement.broadcast.subscribe_media(track_name, video.container, 10_000)
media_consumer = await announcement.broadcast.subscribe_media(track_name, video.container, 10_000)

keyframe = bytes([0x00, 0x00, 0x00, 0x01, 0x65, 0xAA, 0xBB, 0xCC])
media.write_frame(keyframe, 0)
Expand All @@ -169,7 +169,7 @@ async def test_multiple_frames_ordering():
catalog = await announcement.broadcast.catalog()
track_name = list(catalog.audio.keys())[0]
audio = catalog.audio[track_name]
media_consumer = announcement.broadcast.subscribe_media(track_name, audio.container, 10_000)
media_consumer = await announcement.broadcast.subscribe_media(track_name, audio.container, 10_000)

timestamps = [0, 20_000, 40_000, 60_000, 80_000]
for i, ts in enumerate(timestamps):
Expand All @@ -193,7 +193,7 @@ async def test_catalog_update_on_new_track():
consumer = origin.consume()

async for announcement in consumer.announced():
cat_consumer = announcement.broadcast.subscribe_catalog()
cat_consumer = await announcement.broadcast.subscribe_catalog()

# First catalog: 1 audio track.
catalog1 = await anext(cat_consumer)
Expand Down Expand Up @@ -226,7 +226,7 @@ async def test_announced_broadcast():

async for announcement in consumer.announced():
assert announcement.path == "test/broadcast"
_catalog = announcement.broadcast.subscribe_catalog()
_catalog = await announcement.broadcast.subscribe_catalog()
break


Expand Down Expand Up @@ -334,7 +334,7 @@ async def test_raw_publish_consume():
async for announcement in consumer.announced():
assert announcement.path == "robot/arm"

raw_consumer = announcement.broadcast.subscribe_track("events")
raw_consumer = await announcement.broadcast.subscribe_track("events")

payload = b'{"cmd": "button_changed", "arm": "left", "button": "THUMB", "state": "PRESSED"}'
raw.write_frame(payload)
Expand All @@ -357,7 +357,7 @@ async def test_raw_multiple_frames():
consumer = origin.consume()

async for announcement in consumer.announced():
raw_consumer = announcement.broadcast.subscribe_track("commands")
raw_consumer = await announcement.broadcast.subscribe_track("commands")

messages = [
b'{"cmd": "led", "arm": "left", "led": "THUMB", "state": 1}',
Expand Down Expand Up @@ -419,7 +419,7 @@ async def test_broadcast_producer_consume_direct():
raw = broadcast.publish_track("events")
consumer = broadcast.consume()

raw_consumer = consumer.subscribe_track("events")
raw_consumer = await consumer.subscribe_track("events")
raw.write_frame(b"event-0")

async for group in raw_consumer:
Expand All @@ -439,7 +439,7 @@ async def test_raw_group_sequence():
consumer = origin.consume()

async for announcement in consumer.announced():
raw_consumer = announcement.broadcast.subscribe_track("seq")
raw_consumer = await announcement.broadcast.subscribe_track("seq")

sent_sequences = []
for i in range(3):
Expand Down Expand Up @@ -470,7 +470,7 @@ async def test_raw_multi_frame_group():
consumer = origin.consume()

async for announcement in consumer.announced():
raw_consumer = announcement.broadcast.subscribe_track("chunks")
raw_consumer = await announcement.broadcast.subscribe_track("chunks")

group_producer = raw.append_group()
chunks = [b"chunk-0", b"chunk-1", b"chunk-2"]
Expand Down
6 changes: 6 additions & 0 deletions rs/conducer/src/waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ impl Waiter {
pub fn register(&self, list: &mut WaiterList) {
list.register(self);
}

/// The underlying [`Waker`]. Useful for bridging into other async machinery
/// (e.g. `futures::task::AtomicWaker`) that needs a `Waker` directly.
pub fn waker(&self) -> &Waker {
&self.waker
}
}

/// A list of weak wakers waiting for notification.
Expand Down
23 changes: 15 additions & 8 deletions rs/hang/examples/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ async fn run_subscribe(mut consumer: moq_net::OriginConsumer) -> anyhow::Result<
tracing::info!(%path, "broadcast announced");

// Read the catalog to discover available tracks.
let catalog_track = broadcast.subscribe_track(&hang::Catalog::default_track())?;
let catalog_track = broadcast
.subscribe_track(hang::Catalog::DEFAULT_NAME, moq_net::Subscription::default())
.await?;
let mut catalog = moq_mux::catalog::Consumer::new(catalog_track);

let info = catalog.next().await?.ok_or_else(|| anyhow::anyhow!("no catalog"))?;
Expand All @@ -71,13 +73,18 @@ async fn run_subscribe(mut consumer: moq_net::OriginConsumer) -> anyhow::Result<
"subscribing to video track"
);

// Subscribe to the video track.
let track = moq_net::Track {
name: name.clone(),
priority: 1,
};

let track_consumer = broadcast.subscribe_track(&track)?;
// Subscribe to the video track. Priority is a subscriber preference; the
// publisher's authoritative Track properties (including timescale) arrive
// on the returned TrackConsumer.
let track_consumer = broadcast
.subscribe_track(
name,
moq_net::Subscription {
priority: 1,
timeout: Duration::ZERO,
},
)
.await?;
let mut ordered = moq_mux::container::Consumer::new(track_consumer, moq_mux::container::Hang::Legacy)
.with_latency(Duration::from_millis(500));

Expand Down
1 change: 1 addition & 0 deletions rs/hang/examples/video.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ fn create_track(broadcast: &mut moq_net::BroadcastProducer) -> anyhow::Result<mo
let video_track = moq_net::Track {
name: "video".to_string(),
priority: 1, // Video typically has lower priority than audio
timescale: moq_net::Timescale::MICRO,
};

// Example video configuration
Expand Down
2 changes: 1 addition & 1 deletion rs/hang/src/catalog/audio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,5 @@ pub struct AudioConfig {
/// NOTE: The audio "frame" duration depends on the codec, sample rate, etc.
/// ex: AAC often uses 1024 samples per frame, so at 44100Hz, this would be 1024/44100 = 23ms
#[serde(default)]
pub jitter: Option<moq_net::Time>,
pub jitter: Option<moq_net::Timestamp>,
}
1 change: 1 addition & 0 deletions rs/hang/src/catalog/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl Catalog {
moq_net::Track {
name: Catalog::DEFAULT_NAME.to_string(),
priority: 100,
timescale: moq_net::Timescale::UNKNOWN,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion rs/hang/src/catalog/video/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,5 @@ pub struct VideoConfig {
/// - If there can be up to 3 b-frames in a row, this would be 3 * 1000/fps.
/// - If frames are buffered into 2s segments, this would be 2s.
#[serde(default)]
pub jitter: Option<moq_net::Time>,
pub jitter: Option<moq_net::Timestamp>,
}
31 changes: 25 additions & 6 deletions rs/hang/src/container/frame.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
use bytes::{Buf, Bytes, BytesMut};
use derive_more::Debug;
use moq_net::coding::VarInt;

use crate::Error;

pub type Timestamp = moq_net::Timescale<1_000_000>;
pub use moq_net::Timestamp;

/// Canonical timescale for hang frame timestamps: microseconds.
pub const TIMESCALE: moq_net::Timescale = moq_net::Timescale::MICRO;

/// Re-export so callers don't need a direct `moq_net` import to refer to the
/// hang container timescale by type.
pub type Timescale = moq_net::Timescale;

/// A media frame with a timestamp and codec-specific payload.
///
Expand All @@ -29,14 +37,24 @@ pub struct Frame {

impl Frame {
/// Encode the frame to the given group as a single moq-lite frame:
/// VarInt timestamp prefix followed by the raw codec payload.
/// VarInt timestamp prefix followed by the raw codec payload. Also stamps
/// the moq-net [`moq_net::Frame::timestamp`] so the wire layer can
/// delta-encode it independently on Lite05+ (the container-level prefix
/// stays as a duplicate for now).
pub fn encode(&self, group: &mut moq_net::GroupProducer) -> Result<(), Error> {
// Normalize to the hang container timescale on the wire so peers using a
// different source scale (e.g. nanoseconds from MKV) can decode without
// knowing the producer's internal scale.
let timestamp = self.timestamp.convert(TIMESCALE)?;

let mut header = BytesMut::new();
self.timestamp.encode(&mut header).map_err(moq_net::Error::from)?;
let value = VarInt::try_from(timestamp.value()).map_err(moq_net::Error::from)?;
value.encode_quic(&mut header).map_err(moq_net::Error::from)?;

let size = header.len() + self.payload.len();
let size = (header.len() + self.payload.len()) as u64;

let mut chunked = group.create_frame(size.into())?;
let net_frame = moq_net::Frame { size, timestamp };
let mut chunked = group.create_frame(net_frame)?;
chunked.write(header.freeze())?;
chunked.write(self.payload.clone())?;
chunked.finish()?;
Expand All @@ -46,7 +64,8 @@ impl Frame {

/// Decode a frame from raw bytes (VarInt timestamp prefix + payload).
pub fn decode(mut buf: impl Buf) -> Result<Self, Error> {
let timestamp = Timestamp::decode(&mut buf)?;
let value: u64 = VarInt::decode_quic(&mut buf).map_err(moq_net::Error::from)?.into();
let timestamp = Timestamp::from_micros(value)?;
let payload = buf.copy_to_bytes(buf.remaining());

Ok(Self { timestamp, payload })
Expand Down
Loading