From 5f1b97a3f05c058721920295c48578f0fc7676a2 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 24 May 2026 12:22:11 -0700 Subject: [PATCH 1/6] moq-mux: add catalog filter/target and Annex-B exporters Adds a small composable layer for narrowing a broadcast's catalog before handing it to an exporter, plus single-rendition H.264 / H.265 Annex-B exporters that emit raw elementary streams for piping into ffmpeg or similar tools. The catalog crate gains a public `Stream` trait, a unified `Consumer` (promoted from the internal `CatalogSource`), and two wrappers: - `Filter` hard-matches on rendition name and codec family. - `Target` soft-matches on max width/height/pixels/bitrate and reduces each axis to one rendition (Rust port of js/watch's selection algorithm). Both wrappers cache the last input snapshot and re-emit on `set_*`, which is the seam future bandwidth-driven ABR will drive. `fmp4::Export` and `mkv::Export` are now generic over `S: catalog::Stream`; the legacy `with_catalog_format` constructor is gone since callers can build the stream themselves. The new `codec::h264::Export` / `codec::h265::Export` subscribe to a single rendition via `ExportSource::for_video_raw` (no avc1/hvc1 shape transform) and emit Annex-B bytes. avc3/hev1 sources pass through; avc1 / hvc1 sources are converted via `annexb::from_length_prefixed` with VPS/SPS/PPS injected at every keyframe from the avcC/hvcC. Timestamps are dropped (Annex-B has no container framing). The CLI gains `H264` / `H265` formats plus video/audio filter and target flags, wired through the `Filter` then `Target` chain. Co-Authored-By: Claude Opus 4.7 (1M context) --- rs/hang/src/catalog/audio/codec.rs | 20 + rs/hang/src/catalog/video/codec.rs | 26 ++ rs/moq-cli/src/subscribe.rs | 187 ++++++++- rs/moq-mux/src/catalog/consumer.rs | 46 +++ rs/moq-mux/src/catalog/filter.rs | 251 ++++++++++++ rs/moq-mux/src/catalog/mod.rs | 16 + rs/moq-mux/src/catalog/stream.rs | 51 +++ rs/moq-mux/src/catalog/target.rs | 395 +++++++++++++++++++ rs/moq-mux/src/codec/annexb.rs | 47 ++- rs/moq-mux/src/codec/h264/export.rs | 165 ++++++++ rs/moq-mux/src/codec/h264/mod.rs | 51 ++- rs/moq-mux/src/codec/h265/export.rs | 156 ++++++++ rs/moq-mux/src/codec/h265/mod.rs | 58 ++- rs/moq-mux/src/container/fmp4/export.rs | 42 +- rs/moq-mux/src/container/fmp4/export_test.rs | 8 +- rs/moq-mux/src/container/mkv/export.rs | 42 +- rs/moq-mux/src/container/mkv/export_test.rs | 25 +- rs/moq-mux/src/container/mod.rs | 2 +- rs/moq-mux/src/container/source.rs | 57 ++- 19 files changed, 1530 insertions(+), 115 deletions(-) create mode 100644 rs/moq-mux/src/catalog/consumer.rs create mode 100644 rs/moq-mux/src/catalog/filter.rs create mode 100644 rs/moq-mux/src/catalog/stream.rs create mode 100644 rs/moq-mux/src/catalog/target.rs create mode 100644 rs/moq-mux/src/codec/h264/export.rs create mode 100644 rs/moq-mux/src/codec/h265/export.rs diff --git a/rs/hang/src/catalog/audio/codec.rs b/rs/hang/src/catalog/audio/codec.rs index fb6309e81..8f303a36a 100644 --- a/rs/hang/src/catalog/audio/codec.rs +++ b/rs/hang/src/catalog/audio/codec.rs @@ -20,6 +20,26 @@ pub enum AudioCodec { Unknown(String), } +/// Coarse audio codec family, used for tag-only matching. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +#[non_exhaustive] +pub enum AudioCodecKind { + AAC, + Opus, + Unknown, +} + +impl AudioCodec { + /// Return the coarse codec family for tag-only matching. + pub fn kind(&self) -> AudioCodecKind { + match self { + Self::AAC(_) => AudioCodecKind::AAC, + Self::Opus => AudioCodecKind::Opus, + Self::Unknown(_) => AudioCodecKind::Unknown, + } + } +} + impl FromStr for AudioCodec { type Err = Error; diff --git a/rs/hang/src/catalog/video/codec.rs b/rs/hang/src/catalog/video/codec.rs index 76eb228c9..8fa206b8e 100644 --- a/rs/hang/src/catalog/video/codec.rs +++ b/rs/hang/src/catalog/video/codec.rs @@ -29,6 +29,32 @@ pub enum VideoCodec { Unknown(String), } +/// Coarse video codec family, used for tag-only matching. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +#[non_exhaustive] +pub enum VideoCodecKind { + H264, + H265, + VP8, + VP9, + AV1, + Unknown, +} + +impl VideoCodec { + /// Return the coarse codec family for tag-only matching. + pub fn kind(&self) -> VideoCodecKind { + match self { + Self::H264(_) => VideoCodecKind::H264, + Self::H265(_) => VideoCodecKind::H265, + Self::VP9(_) => VideoCodecKind::VP9, + Self::AV1(_) => VideoCodecKind::AV1, + Self::VP8 => VideoCodecKind::VP8, + Self::Unknown(_) => VideoCodecKind::Unknown, + } + } +} + impl FromStr for VideoCodec { type Err = Error; diff --git a/rs/moq-cli/src/subscribe.rs b/rs/moq-cli/src/subscribe.rs index e1c0804cf..d5740693e 100644 --- a/rs/moq-cli/src/subscribe.rs +++ b/rs/moq-cli/src/subscribe.rs @@ -1,14 +1,19 @@ use std::time::Duration; use clap::ValueEnum; +use hang::catalog::{AudioCodecKind, VideoCodecKind}; use hang::moq_net; -use moq_mux::catalog::CatalogFormat; +use moq_mux::catalog::{self, CatalogFormat, FilterAudio, FilterVideo, Stream, TargetAudio, TargetVideo}; use tokio::io::AsyncWriteExt; #[derive(ValueEnum, Clone, Copy)] pub enum SubscribeFormat { Fmp4, Mkv, + /// H.264 Annex-B elementary stream (no container). + H264, + /// H.265 Annex-B elementary stream (no container). + H265, } /// `clap` adapter for [`CatalogFormat`] (which is `#[non_exhaustive]` and so @@ -28,6 +33,44 @@ impl From for CatalogFormat { } } +/// `clap` adapter for [`VideoCodecKind`]. +#[derive(ValueEnum, Clone, Copy)] +pub enum VideoCodecArg { + H264, + H265, + Vp8, + Vp9, + Av1, +} + +impl From for VideoCodecKind { + fn from(value: VideoCodecArg) -> Self { + match value { + VideoCodecArg::H264 => Self::H264, + VideoCodecArg::H265 => Self::H265, + VideoCodecArg::Vp8 => Self::VP8, + VideoCodecArg::Vp9 => Self::VP9, + VideoCodecArg::Av1 => Self::AV1, + } + } +} + +/// `clap` adapter for [`AudioCodecKind`]. +#[derive(ValueEnum, Clone, Copy)] +pub enum AudioCodecArg { + Aac, + Opus, +} + +impl From for AudioCodecKind { + fn from(value: AudioCodecArg) -> Self { + match value { + AudioCodecArg::Aac => Self::AAC, + AudioCodecArg::Opus => Self::Opus, + } + } +} + #[derive(clap::Args, Clone)] pub struct SubscribeArgs { /// The format to write to stdout. @@ -52,6 +95,42 @@ pub struct SubscribeArgs { /// (`.hang` -> hang, `.msf` -> msf), falling back to hang. #[arg(long)] pub catalog: Option, + + /// Pick the video rendition with this exact name. + #[arg(long)] + pub video_name: Option, + + /// Keep only video renditions whose codec family matches. + #[arg(long)] + pub video_codec: Option, + + /// Prefer a video rendition no wider than this (px). + #[arg(long)] + pub max_video_width: Option, + + /// Prefer a video rendition no taller than this (px). + #[arg(long)] + pub max_video_height: Option, + + /// Prefer a video rendition with at most this many pixels (`coded_width * coded_height`). + #[arg(long)] + pub max_video_pixels: Option, + + /// Prefer a video rendition under this bitrate (bits per second). + #[arg(long)] + pub max_video_bitrate: Option, + + /// Pick the audio rendition with this exact name. + #[arg(long)] + pub audio_name: Option, + + /// Keep only audio renditions whose codec family matches. + #[arg(long)] + pub audio_codec: Option, + + /// Prefer an audio rendition under this bitrate (bits per second). + #[arg(long)] + pub max_audio_bitrate: Option, } impl SubscribeArgs { @@ -63,6 +142,53 @@ impl SubscribeArgs { .or_else(|| CatalogFormat::detect(broadcast)) .unwrap_or_default() } + + /// Build a video filter from the parsed flags, plus any codec defaulted by + /// the chosen output format (e.g. `--format h264` implies `codec = H264`). + fn filter_video(&self) -> Option { + let codec = self.video_codec.map(Into::into).or(match self.format { + SubscribeFormat::H264 => Some(VideoCodecKind::H264), + SubscribeFormat::H265 => Some(VideoCodecKind::H265), + _ => None, + }); + if self.video_name.is_none() && codec.is_none() { + return None; + } + Some(FilterVideo { + name: self.video_name.clone(), + codec, + }) + } + + fn filter_audio(&self) -> Option { + if self.audio_name.is_none() && self.audio_codec.is_none() { + return None; + } + Some(FilterAudio { + name: self.audio_name.clone(), + codec: self.audio_codec.map(Into::into), + }) + } + + fn target_video(&self) -> Option { + if self.max_video_width.is_none() + && self.max_video_height.is_none() + && self.max_video_pixels.is_none() + && self.max_video_bitrate.is_none() + { + return None; + } + Some(TargetVideo { + width: self.max_video_width, + height: self.max_video_height, + pixels: self.max_video_pixels, + bitrate: self.max_video_bitrate, + }) + } + + fn target_audio(&self) -> Option { + self.max_audio_bitrate.map(|b| TargetAudio { bitrate: Some(b) }) + } } pub struct Subscribe { @@ -80,20 +206,35 @@ impl Subscribe { } } + /// Build the catalog stream from the configured filter/target flags. + fn stream(&self) -> Result>, moq_mux::Error> { + let consumer = catalog::Consumer::new(&self.broadcast, self.catalog)?; + + let mut filter = consumer.filter(); + filter.set_video(self.args.filter_video()); + filter.set_audio(self.args.filter_audio()); + + let mut target = filter.target(); + target.set_video(self.args.target_video()); + target.set_audio(self.args.target_audio()); + + Ok(target) + } + pub async fn run(self) -> anyhow::Result<()> { match self.args.format { SubscribeFormat::Fmp4 => self.run_fmp4().await, SubscribeFormat::Mkv => self.run_mkv().await, + SubscribeFormat::H264 => self.run_h264().await, + SubscribeFormat::H265 => self.run_h265().await, } } async fn run_fmp4(self) -> anyhow::Result<()> { let mut stdout = tokio::io::stdout(); - // Fmp4 subscribes to the catalog internally, builds the merged init segment - // from the first catalog snapshot, then yields moof+mdat fragments in - // timestamp order across tracks. - let mut fmp4 = moq_mux::container::fmp4::Export::with_catalog_format(self.broadcast, self.catalog)? + let stream = self.stream()?; + let mut fmp4 = moq_mux::container::fmp4::Export::new(self.broadcast.clone(), stream) .with_latency(self.args.max_latency) .with_fragment_duration(self.args.fragment_duration); @@ -108,10 +249,8 @@ impl Subscribe { async fn run_mkv(self) -> anyhow::Result<()> { let mut stdout = tokio::io::stdout(); - // Mkv writes EBML + an unknown-size Segment header, then per-fragment - // Cluster elements. Avc3/Hev1 sources are transcoded to avc1/hvc1 - // shape internally (synthesizing avcC/hvcC from inline parameter sets). - let mut mkv = moq_mux::container::mkv::Export::with_catalog_format(self.broadcast, self.catalog)? + let stream = self.stream()?; + let mut mkv = moq_mux::container::mkv::Export::new(self.broadcast.clone(), stream) .with_latency(self.args.max_latency) .with_fragment_duration(self.args.fragment_duration); @@ -122,4 +261,34 @@ impl Subscribe { Ok(()) } + + async fn run_h264(self) -> anyhow::Result<()> { + let mut stdout = tokio::io::stdout(); + + let stream = self.stream()?; + let mut h264 = + moq_mux::codec::h264::Export::new(self.broadcast.clone(), stream).with_latency(self.args.max_latency); + + while let Some(chunk) = h264.next().await? { + stdout.write_all(&chunk).await?; + stdout.flush().await?; + } + + Ok(()) + } + + async fn run_h265(self) -> anyhow::Result<()> { + let mut stdout = tokio::io::stdout(); + + let stream = self.stream()?; + let mut h265 = + moq_mux::codec::h265::Export::new(self.broadcast.clone(), stream).with_latency(self.args.max_latency); + + while let Some(chunk) = h265.next().await? { + stdout.write_all(&chunk).await?; + stdout.flush().await?; + } + + Ok(()) + } } diff --git a/rs/moq-mux/src/catalog/consumer.rs b/rs/moq-mux/src/catalog/consumer.rs new file mode 100644 index 000000000..2bea9aa30 --- /dev/null +++ b/rs/moq-mux/src/catalog/consumer.rs @@ -0,0 +1,46 @@ +//! Unified catalog consumer. +//! +//! Subscribes to whichever catalog track ([`hang`] or [`msf`]) the broadcast +//! advertises and yields [`hang::Catalog`] snapshots so callers and exporters +//! only deal with one shape. + +use std::task::Poll; + +use hang::Catalog; + +use super::{CatalogFormat, Stream}; + +/// A catalog stream sourced from a [`moq_net::BroadcastConsumer`]. +/// +/// Both variants emit [`hang::Catalog`]; the MSF variant converts each snapshot +/// on the fly. Wrap with [`Filter`](super::Filter) / [`Target`](super::Target) +/// to narrow the rendition set before handing the stream to an exporter. +pub enum Consumer { + Hang(super::hang::Consumer), + Msf(super::msf::Consumer), +} + +impl Consumer { + /// Subscribe to the catalog track advertised by `format`. + pub fn new(broadcast: &moq_net::BroadcastConsumer, format: CatalogFormat) -> Result { + Ok(match format { + CatalogFormat::Hang => { + let track = broadcast.subscribe_track(&hang::Catalog::default_track())?; + Self::Hang(super::hang::Consumer::new(track)) + } + CatalogFormat::Msf => { + let track = broadcast.subscribe_track(&moq_net::Track::new(moq_msf::DEFAULT_NAME))?; + Self::Msf(super::msf::Consumer::new(track)) + } + }) + } +} + +impl Stream for Consumer { + fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + match self { + Self::Hang(c) => c.poll_next(waiter).map_err(Into::into), + Self::Msf(c) => c.poll_next(waiter), + } + } +} diff --git a/rs/moq-mux/src/catalog/filter.rs b/rs/moq-mux/src/catalog/filter.rs new file mode 100644 index 000000000..78b9aa5b7 --- /dev/null +++ b/rs/moq-mux/src/catalog/filter.rs @@ -0,0 +1,251 @@ +//! Hard-match rendition filter. +//! +//! [`Filter`] wraps any [`Stream`] and drops renditions that don't satisfy a +//! [`FilterVideo`] / [`FilterAudio`]. Matching is exact: a `name` constraint +//! keeps only the rendition with that key, a `codec` constraint keeps only +//! renditions whose codec family matches. Multiple constraints intersect. + +use std::task::Poll; + +use hang::Catalog; +use hang::catalog::{AudioCodecKind, VideoCodecKind}; + +use super::Stream; + +/// Hard-match criteria for video renditions. +#[derive(Debug, Default, Clone)] +pub struct FilterVideo { + /// Keep only the rendition with this exact name. + pub name: Option, + /// Keep only renditions whose codec family matches. + pub codec: Option, +} + +/// Hard-match criteria for audio renditions. +#[derive(Debug, Default, Clone)] +pub struct FilterAudio { + /// Keep only the rendition with this exact name. + pub name: Option, + /// Keep only renditions whose codec family matches. + pub codec: Option, +} + +/// A [`Stream`] that drops renditions failing a [`FilterVideo`] / [`FilterAudio`]. +pub struct Filter { + inner: S, + video: Option, + audio: Option, + /// Last raw snapshot from `inner`, kept so retargeting via `set_*` can re-emit + /// without polling upstream (the foothold for future ABR retargeting). + last_input: Option, + /// True if `set_*` has been called since the last emit and we still owe a + /// re-evaluated snapshot derived from `last_input`. + dirty: bool, +} + +impl Filter { + pub fn new(inner: S) -> Self { + Self { + inner, + video: None, + audio: None, + last_input: None, + dirty: false, + } + } + + /// Set or clear the video filter. Pass `None` to clear. + pub fn set_video(&mut self, filter: impl Into>) { + self.video = filter.into(); + self.dirty = self.last_input.is_some(); + } + + /// Set or clear the audio filter. Pass `None` to clear. + pub fn set_audio(&mut self, filter: impl Into>) { + self.audio = filter.into(); + self.dirty = self.last_input.is_some(); + } + + fn apply(&self, mut catalog: Catalog) -> Catalog { + if let Some(filter) = &self.video { + catalog.video.renditions.retain(|name, config| { + if let Some(want) = &filter.name + && want != name + { + return false; + } + if let Some(want) = filter.codec + && config.codec.kind() != want + { + return false; + } + true + }); + } + if let Some(filter) = &self.audio { + catalog.audio.renditions.retain(|name, config| { + if let Some(want) = &filter.name + && want != name + { + return false; + } + if let Some(want) = filter.codec + && config.codec.kind() != want + { + return false; + } + true + }); + } + catalog + } +} + +impl Stream for Filter { + fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + if self.dirty { + self.dirty = false; + if let Some(snapshot) = self.last_input.clone() { + return Poll::Ready(Ok(Some(self.apply(snapshot)))); + } + } + + match self.inner.poll_next(waiter)? { + Poll::Ready(Some(snapshot)) => { + self.last_input = Some(snapshot.clone()); + Poll::Ready(Ok(Some(self.apply(snapshot)))) + } + Poll::Ready(None) => Poll::Ready(Ok(None)), + Poll::Pending => Poll::Pending, + } + } +} + +#[cfg(test)] +mod test { + use std::collections::BTreeMap; + + use hang::catalog::{AudioCodec, AudioConfig, Container, H264, VideoConfig}; + + use super::*; + + struct Once(Option); + + impl Stream for Once { + fn poll_next(&mut self, _: &conducer::Waiter) -> Poll>> { + Poll::Ready(Ok(self.0.take())) + } + } + + fn h264(name: &str) -> (String, VideoConfig) { + let mut config = VideoConfig::new(H264 { + profile: 0x42, + constraints: 0, + level: 0x1e, + inline: false, + }); + config.coded_width = Some(640); + config.coded_height = Some(360); + config.bitrate = Some(500_000); + config.framerate = Some(30.0); + config.container = Container::Legacy; + (name.to_string(), config) + } + + fn opus(name: &str) -> (String, AudioConfig) { + let mut config = AudioConfig::new(AudioCodec::Opus, 48_000, 2); + config.bitrate = Some(128_000); + config.container = Container::Legacy; + (name.to_string(), config) + } + + fn catalog_with(video: Vec<(String, VideoConfig)>, audio: Vec<(String, AudioConfig)>) -> Catalog { + let mut c = Catalog::default(); + c.video.renditions = BTreeMap::from_iter(video); + c.audio.renditions = BTreeMap::from_iter(audio); + c + } + + #[test] + fn codec_filter_keeps_matching() { + let mut hd = h264("hd"); + hd.1.codec = hang::catalog::VP9 { + profile: 0, + level: 10, + bit_depth: 8, + chroma_subsampling: 1, + color_primaries: 1, + transfer_characteristics: 1, + matrix_coefficients: 1, + full_range: false, + } + .into(); + let snapshot = catalog_with(vec![h264("lo"), hd], vec![]); + + let mut f = Filter::new(Once(Some(snapshot))); + f.set_video(FilterVideo { + codec: Some(VideoCodecKind::H264), + ..Default::default() + }); + + let out = match f.poll_next(&conducer::Waiter::noop()) { + Poll::Ready(Ok(Some(c))) => c, + other => panic!("expected snapshot, got {other:?}"), + }; + assert_eq!(out.video.renditions.keys().collect::>(), vec!["lo"]); + } + + #[test] + fn name_filter_exact() { + let snapshot = catalog_with(vec![h264("lo"), h264("hi")], vec![]); + let mut f = Filter::new(Once(Some(snapshot))); + f.set_video(FilterVideo { + name: Some("hi".into()), + ..Default::default() + }); + let out = match f.poll_next(&conducer::Waiter::noop()) { + Poll::Ready(Ok(Some(c))) => c, + other => panic!("got {other:?}"), + }; + assert_eq!(out.video.renditions.keys().collect::>(), vec!["hi"]); + } + + #[test] + fn audio_filter_independent_of_video() { + let snapshot = catalog_with(vec![h264("hi")], vec![opus("en"), opus("es")]); + let mut f = Filter::new(Once(Some(snapshot))); + f.set_audio(FilterAudio { + name: Some("es".into()), + ..Default::default() + }); + let out = match f.poll_next(&conducer::Waiter::noop()) { + Poll::Ready(Ok(Some(c))) => c, + other => panic!("got {other:?}"), + }; + assert_eq!(out.video.renditions.keys().collect::>(), vec!["hi"]); + assert_eq!(out.audio.renditions.keys().collect::>(), vec!["es"]); + } + + #[test] + fn set_video_after_snapshot_reemits() { + let snapshot = catalog_with(vec![h264("lo"), h264("hi")], vec![]); + let mut f = Filter::new(Once(Some(snapshot))); + + let first = match f.poll_next(&conducer::Waiter::noop()) { + Poll::Ready(Ok(Some(c))) => c, + other => panic!("got {other:?}"), + }; + assert_eq!(first.video.renditions.len(), 2); + + f.set_video(FilterVideo { + name: Some("hi".into()), + ..Default::default() + }); + + let again = match f.poll_next(&conducer::Waiter::noop()) { + Poll::Ready(Ok(Some(c))) => c, + other => panic!("expected re-emit, got {other:?}"), + }; + assert_eq!(again.video.renditions.keys().collect::>(), vec!["hi"]); + } +} diff --git a/rs/moq-mux/src/catalog/mod.rs b/rs/moq-mux/src/catalog/mod.rs index 203b416b7..417fbb25b 100644 --- a/rs/moq-mux/src/catalog/mod.rs +++ b/rs/moq-mux/src/catalog/mod.rs @@ -10,9 +10,25 @@ //! Publishing through [`hang::Producer`] writes both tracks together; //! subscribers pick one based on the broadcast's filename suffix. See //! [`CatalogFormat`] for the suffix-to-format mapping. +//! +//! On the consume side, [`Consumer`] is the unified entry point: it +//! subscribes to whichever catalog track `format` advertises and yields +//! [`::hang::Catalog`] snapshots. Wrap it with [`Filter`] (hard match on +//! name / codec family) or [`Target`] (soft match picking one rendition +//! per axis) to narrow the set before handing it to an exporter; both +//! also implement [`Stream`] so they compose either direction. pub mod hang; pub mod msf; +mod consumer; +mod filter; mod format; +mod stream; +mod target; + +pub use consumer::Consumer; +pub use filter::{Filter, FilterAudio, FilterVideo}; pub use format::*; +pub use stream::Stream; +pub use target::{Target, TargetAudio, TargetVideo}; diff --git a/rs/moq-mux/src/catalog/stream.rs b/rs/moq-mux/src/catalog/stream.rs new file mode 100644 index 000000000..fd78645f9 --- /dev/null +++ b/rs/moq-mux/src/catalog/stream.rs @@ -0,0 +1,51 @@ +//! Catalog stream trait. +//! +//! [`Stream`] yields a sequence of [`hang::Catalog`] snapshots. Both the +//! raw [`Consumer`](super::Consumer) and the rendition-selecting +//! [`Filter`](super::Filter) / [`Target`](super::Target) wrappers implement +//! it, so exporters can be written against the trait and the caller picks +//! the selection policy. + +use std::task::Poll; + +use hang::Catalog; + +use super::{Filter, Target}; + +/// A stream of catalog snapshots. +/// +/// `poll_next` returns the next snapshot (a full catalog, not a delta), or +/// `None` once the underlying track has ended. Late snapshots supersede +/// earlier ones, so an implementation may drop intermediate snapshots. +/// +/// Stream types are required to be `Send + 'static` so they can be moved +/// across threads and held inside exporters without per-call bounds. +pub trait Stream: Send + 'static { + fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>>; + + /// Wait for the next snapshot. + fn next(&mut self) -> impl std::future::Future>> + Send + where + Self: Sized, + { + async move { conducer::wait(|waiter| self.poll_next(waiter)).await } + } + + /// Wrap this stream in a [`Filter`] that drops renditions which don't + /// match a hard-match criterion (name or codec family). + fn filter(self) -> Filter + where + Self: Sized, + { + Filter::new(self) + } + + /// Wrap this stream in a [`Target`] that reduces each axis to at most + /// one rendition by soft-matching against width / height / bitrate. + fn target(self) -> Target + where + Self: Sized, + { + Target::new(self) + } +} diff --git a/rs/moq-mux/src/catalog/target.rs b/rs/moq-mux/src/catalog/target.rs new file mode 100644 index 000000000..7fa3af57a --- /dev/null +++ b/rs/moq-mux/src/catalog/target.rs @@ -0,0 +1,395 @@ +//! Soft-match rendition target. +//! +//! [`Target`] wraps any [`Stream`] and reduces each axis (video / audio) to at +//! most one rendition by ranking the input against constraints like maximum +//! width, height, pixels, or bitrate. The ranking algorithm is a Rust port of +//! [js/watch's `#select`](js/watch/src/video/source.ts). + +use std::collections::BTreeMap; +use std::task::Poll; + +use hang::Catalog; +use hang::catalog::{AudioConfig, VideoConfig}; + +use super::Stream; + +/// Soft-match constraints for the video rendition. +/// +/// Each `Option` is a *maximum* the selection will try to stay under. When a +/// rendition fits every active maximum, the largest such rendition wins; if +/// nothing fits, the algorithm degrades to the smallest over-budget rendition +/// (per constraint) and intersects across constraints. +#[derive(Debug, Default, Clone)] +pub struct TargetVideo { + pub width: Option, + pub height: Option, + pub pixels: Option, + pub bitrate: Option, +} + +/// Soft-match constraints for the audio rendition. +#[derive(Debug, Default, Clone)] +pub struct TargetAudio { + pub bitrate: Option, +} + +/// A [`Stream`] that picks one rendition per axis from the inner snapshot. +pub struct Target { + inner: S, + video: Option, + audio: Option, + last_input: Option, + dirty: bool, +} + +impl Target { + pub fn new(inner: S) -> Self { + Self { + inner, + video: None, + audio: None, + last_input: None, + dirty: false, + } + } + + /// Set or clear the video target. Pass `None` to keep every rendition. + pub fn set_video(&mut self, target: impl Into>) { + self.video = target.into(); + self.dirty = self.last_input.is_some(); + } + + /// Set or clear the audio target. Pass `None` to keep every rendition. + pub fn set_audio(&mut self, target: impl Into>) { + self.audio = target.into(); + self.dirty = self.last_input.is_some(); + } + + fn apply(&self, mut catalog: Catalog) -> Catalog { + if let Some(target) = &self.video + && let Some(name) = select_video(&catalog.video.renditions, target) + { + let mut kept = BTreeMap::new(); + if let Some(config) = catalog.video.renditions.remove(&name) { + kept.insert(name, config); + } + catalog.video.renditions = kept; + } else if self.video.is_some() { + catalog.video.renditions.clear(); + } + + if let Some(target) = &self.audio + && let Some(name) = select_audio(&catalog.audio.renditions, target) + { + let mut kept = BTreeMap::new(); + if let Some(config) = catalog.audio.renditions.remove(&name) { + kept.insert(name, config); + } + catalog.audio.renditions = kept; + } else if self.audio.is_some() { + catalog.audio.renditions.clear(); + } + + catalog + } +} + +impl Stream for Target { + fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + if self.dirty { + self.dirty = false; + if let Some(snapshot) = self.last_input.clone() { + return Poll::Ready(Ok(Some(self.apply(snapshot)))); + } + } + + match self.inner.poll_next(waiter)? { + Poll::Ready(Some(snapshot)) => { + self.last_input = Some(snapshot.clone()); + Poll::Ready(Ok(Some(self.apply(snapshot)))) + } + Poll::Ready(None) => Poll::Ready(Ok(None)), + Poll::Pending => Poll::Pending, + } + } +} + +/// Run all active video rankings and return the highest-ranked rendition +/// present in every ranking, or `None` if the intersection is empty. +fn select_video(renditions: &BTreeMap, target: &TargetVideo) -> Option { + if renditions.is_empty() { + return None; + } + if renditions.len() == 1 { + return renditions.keys().next().cloned(); + } + + let mut rankings: Vec> = Vec::new(); + if let Some(max) = target.pixels { + rankings.push(by_pixels(renditions, max)); + } + if target.width.is_some() || target.height.is_some() { + rankings.push(by_dimensions(renditions, target.width, target.height)); + } + if let Some(max) = target.bitrate { + rankings.push(by_video_bitrate(renditions, max)); + } + + if rankings.is_empty() { + return Some(best_video(renditions)); + } + + intersect_rankings(rankings) +} + +fn select_audio(renditions: &BTreeMap, target: &TargetAudio) -> Option { + if renditions.is_empty() { + return None; + } + if renditions.len() == 1 { + return renditions.keys().next().cloned(); + } + + let mut rankings: Vec> = Vec::new(); + if let Some(max) = target.bitrate { + rankings.push(by_audio_bitrate(renditions, max)); + } + + if rankings.is_empty() { + return Some(best_audio(renditions)); + } + + intersect_rankings(rankings) +} + +/// Pick the first name from `rankings[0]` that appears in every other ranking. +fn intersect_rankings(rankings: Vec>) -> Option { + use std::collections::HashSet; + let sets: Vec> = rankings.iter().map(|r| r.iter().collect()).collect(); + for name in &rankings[0] { + if sets.iter().all(|s| s.contains(name)) { + return Some(name.clone()); + } + } + tracing::warn!("conflicting rendition targets, no rendition satisfies all criteria"); + None +} + +/// Rank by area, largest-first within budget; fall back to single smallest +/// over-budget if nothing fits. Renditions without resolution metadata are +/// returned unranked when no rendition has any metadata at all (mirrors the JS). +fn by_pixels(renditions: &BTreeMap, max: u32) -> Vec { + let mut within: Vec<(String, u32)> = Vec::new(); + let mut rest: Vec<(String, u32)> = Vec::new(); + + for (name, config) in renditions { + if let (Some(w), Some(h)) = (config.coded_width, config.coded_height) { + let size = w.saturating_mul(h); + if size <= max { + within.push((name.clone(), size)); + } else { + rest.push((name.clone(), size)); + } + } + } + + within.sort_by_key(|b| std::cmp::Reverse(b.1)); + if !within.is_empty() { + return within.into_iter().map(|(n, _)| n).collect(); + } + + rest.sort_by_key(|a| a.1); + if let Some(smallest) = rest.into_iter().next() { + return vec![smallest.0]; + } + + renditions.keys().cloned().collect() +} + +fn by_dimensions(renditions: &BTreeMap, width: Option, height: Option) -> Vec { + let mut within: Vec<(String, u32)> = Vec::new(); + let mut rest: Vec<(String, u32)> = Vec::new(); + + for (name, config) in renditions { + let (Some(w), Some(h)) = (config.coded_width, config.coded_height) else { + continue; + }; + let size = w.saturating_mul(h); + let fits_w = width.is_none_or(|cap| w <= cap); + let fits_h = height.is_none_or(|cap| h <= cap); + if fits_w && fits_h { + within.push((name.clone(), size)); + } else { + rest.push((name.clone(), size)); + } + } + + within.sort_by_key(|b| std::cmp::Reverse(b.1)); + if !within.is_empty() { + return within.into_iter().map(|(n, _)| n).collect(); + } + + rest.sort_by_key(|a| a.1); + if let Some(smallest) = rest.into_iter().next() { + return vec![smallest.0]; + } + + renditions.keys().cloned().collect() +} + +fn by_video_bitrate(renditions: &BTreeMap, max: u64) -> Vec { + let mut within: Vec<(String, u64)> = Vec::new(); + let mut rest: Vec<(String, u64)> = Vec::new(); + for (name, config) in renditions { + if let Some(b) = config.bitrate { + if b <= max { + within.push((name.clone(), b)); + } else { + rest.push((name.clone(), b)); + } + } + } + within.sort_by_key(|b| std::cmp::Reverse(b.1)); + if !within.is_empty() { + return within.into_iter().map(|(n, _)| n).collect(); + } + rest.sort_by_key(|a| a.1); + if let Some(smallest) = rest.into_iter().next() { + return vec![smallest.0]; + } + renditions.keys().cloned().collect() +} + +fn by_audio_bitrate(renditions: &BTreeMap, max: u64) -> Vec { + let mut within: Vec<(String, u64)> = Vec::new(); + let mut rest: Vec<(String, u64)> = Vec::new(); + for (name, config) in renditions { + if let Some(b) = config.bitrate { + if b <= max { + within.push((name.clone(), b)); + } else { + rest.push((name.clone(), b)); + } + } + } + within.sort_by_key(|b| std::cmp::Reverse(b.1)); + if !within.is_empty() { + return within.into_iter().map(|(n, _)| n).collect(); + } + rest.sort_by_key(|a| a.1); + if let Some(smallest) = rest.into_iter().next() { + return vec![smallest.0]; + } + renditions.keys().cloned().collect() +} + +/// With no constraints, prefer the largest resolution then the highest bitrate. +fn best_video(renditions: &BTreeMap) -> String { + renditions + .iter() + .max_by_key(|(_, c)| { + let area = c.coded_width.unwrap_or(0).saturating_mul(c.coded_height.unwrap_or(0)) as u64; + (area, c.bitrate.unwrap_or(0)) + }) + .map(|(n, _)| n.clone()) + .expect("renditions non-empty checked by caller") +} + +fn best_audio(renditions: &BTreeMap) -> String { + renditions + .iter() + .max_by_key(|(_, c)| c.bitrate.unwrap_or(0)) + .map(|(n, _)| n.clone()) + .expect("renditions non-empty checked by caller") +} + +#[cfg(test)] +mod test { + use hang::catalog::{Container, H264, VideoConfig}; + + use super::*; + + fn vid(name: &str, w: u32, h: u32, bitrate: u64) -> (String, VideoConfig) { + let mut config = VideoConfig::new(H264 { + profile: 0x42, + constraints: 0, + level: 0x1e, + inline: false, + }); + config.coded_width = Some(w); + config.coded_height = Some(h); + config.bitrate = Some(bitrate); + config.framerate = Some(30.0); + config.container = Container::Legacy; + (name.to_string(), config) + } + + fn map(items: Vec<(String, VideoConfig)>) -> BTreeMap { + BTreeMap::from_iter(items) + } + + #[test] + fn pick_largest_under_width_cap() { + let renditions = map(vec![ + vid("sd", 640, 360, 500_000), + vid("hd", 1280, 720, 2_500_000), + vid("fhd", 1920, 1080, 6_000_000), + ]); + let target = TargetVideo { + width: Some(1280), + ..Default::default() + }; + assert_eq!(select_video(&renditions, &target).as_deref(), Some("hd")); + } + + #[test] + fn pick_largest_under_bitrate_cap() { + let renditions = map(vec![ + vid("sd", 640, 360, 500_000), + vid("hd", 1280, 720, 2_500_000), + vid("fhd", 1920, 1080, 6_000_000), + ]); + let target = TargetVideo { + bitrate: Some(3_000_000), + ..Default::default() + }; + assert_eq!(select_video(&renditions, &target).as_deref(), Some("hd")); + } + + #[test] + fn degrade_to_smallest_over_budget() { + let renditions = map(vec![vid("hd", 1280, 720, 2_500_000), vid("fhd", 1920, 1080, 6_000_000)]); + let target = TargetVideo { + bitrate: Some(100_000), + ..Default::default() + }; + assert_eq!(select_video(&renditions, &target).as_deref(), Some("hd")); + } + + #[test] + fn no_constraints_picks_largest() { + let renditions = map(vec![ + vid("sd", 640, 360, 500_000), + vid("hd", 1280, 720, 2_500_000), + vid("fhd", 1920, 1080, 6_000_000), + ]); + let target = TargetVideo::default(); + assert_eq!(select_video(&renditions, &target).as_deref(), Some("fhd")); + } + + #[test] + fn width_and_bitrate_intersect() { + let renditions = map(vec![ + vid("sd", 640, 360, 500_000), + vid("hd", 1280, 720, 2_500_000), + vid("fhd", 1920, 1080, 6_000_000), + ]); + let target = TargetVideo { + width: Some(1920), + bitrate: Some(1_000_000), + ..Default::default() + }; + // width allows all, bitrate allows only sd. + assert_eq!(select_video(&renditions, &target).as_deref(), Some("sd")); + } +} diff --git a/rs/moq-mux/src/codec/annexb.rs b/rs/moq-mux/src/codec/annexb.rs index 7a5b1de5f..06d8ada9e 100644 --- a/rs/moq-mux/src/codec/annexb.rs +++ b/rs/moq-mux/src/codec/annexb.rs @@ -1,8 +1,53 @@ use anyhow::{self}; -use bytes::{Buf, Bytes}; +use bytes::{Buf, Bytes, BytesMut}; pub const START_CODE: Bytes = Bytes::from_static(&[0, 0, 0, 1]); +/// Convert a length-prefixed NALU payload (avc1 / hvc1 wire shape) to Annex-B, +/// optionally prepending `prefix` bytes (typically VPS/SPS/PPS NAL units already +/// in Annex-B form, for keyframe parameter-set injection). +pub fn from_length_prefixed(payload: &[u8], length_size: usize, prefix: Option<&[u8]>) -> anyhow::Result { + anyhow::ensure!( + (1..=4).contains(&length_size), + "invalid avc1/hvc1 length size {length_size}" + ); + + let mut out = BytesMut::with_capacity(payload.len() + prefix.map(|p| p.len()).unwrap_or(0) + 16); + if let Some(p) = prefix { + out.extend_from_slice(p); + } + + let mut pos = 0; + while pos < payload.len() { + anyhow::ensure!(payload.len() >= pos + length_size, "truncated NAL length prefix"); + let mut len = 0usize; + for byte in &payload[pos..pos + length_size] { + len = (len << 8) | (*byte as usize); + } + pos += length_size; + anyhow::ensure!(payload.len() >= pos + len, "truncated NAL payload"); + out.extend_from_slice(&START_CODE); + out.extend_from_slice(&payload[pos..pos + len]); + pos += len; + } + + Ok(out.freeze()) +} + +/// Concatenate `start_code | nal` for every NAL in `nals` and freeze the +/// result. Used to build a keyframe parameter-set prefix for an Annex-B +/// elementary stream. +pub fn build_prefix<'a, I: IntoIterator>(nals: I) -> Bytes { + let nals: Vec<&Bytes> = nals.into_iter().collect(); + let total: usize = nals.iter().map(|n| n.len() + START_CODE.len()).sum(); + let mut out = BytesMut::with_capacity(total); + for nal in nals { + out.extend_from_slice(&START_CODE); + out.extend_from_slice(nal); + } + out.freeze() +} + pub struct NalIterator<'a, T: Buf + AsRef<[u8]> + 'a> { buf: &'a mut T, start: Option, diff --git a/rs/moq-mux/src/codec/h264/export.rs b/rs/moq-mux/src/codec/h264/export.rs new file mode 100644 index 000000000..03b2a3be8 --- /dev/null +++ b/rs/moq-mux/src/codec/h264/export.rs @@ -0,0 +1,165 @@ +//! H.264 single-rendition Annex-B exporter. +//! +//! Subscribes to one H.264 rendition from a catalog-narrowed stream and emits +//! a raw Annex-B elementary stream. Suitable for piping into `ffmpeg`, decoder +//! fuzzers, or recording one codec to disk. There is no container framing +//! (timestamps are dropped). +//! +//! Two source shapes are accepted: +//! - **avc3** (catalog `description` empty): payload is already Annex-B with +//! SPS/PPS inline. Pass through unchanged. +//! - **avc1** (catalog `description` is the avcC): length-prefixed NALUs. +//! Length prefixes are replaced with `00 00 00 01` start codes; SPS/PPS +//! extracted from the avcC are injected ahead of every keyframe. + +use std::task::Poll; +use std::time::Duration; + +use bytes::Bytes; +use hang::Catalog; +use hang::catalog::VideoCodecKind; + +use crate::catalog::Stream; +use crate::codec::annexb; +use crate::container::ExportSource; + +/// Single-rendition H.264 Annex-B exporter. +pub struct Export { + broadcast: moq_net::BroadcastConsumer, + catalog: Option, + latency: Duration, + track: Option, +} + +struct H264Track { + name: String, + source: ExportSource, + /// `Some` for an avc1 source: SPS/PPS prefix prebuilt from the avcC, and + /// the avcC length-prefix size. `None` for an avc3 source: Annex-B passes + /// through without conversion. + convert: Option, +} + +struct Avc1Convert { + length_size: usize, + keyframe_prefix: Bytes, +} + +impl Export { + /// Subscribe to `broadcast` and emit an Annex-B H.264 byte stream. + /// + /// `catalog` is expected to be narrowed to a single H.264 rendition (e.g. + /// `consumer.filter()` with `codec = H264` then `.target()` for ABR + /// selection). Renditions of other codecs are ignored; if multiple H.264 + /// renditions appear in a snapshot, the first by BTreeMap order wins and + /// a warning is logged. + pub fn new(broadcast: moq_net::BroadcastConsumer, catalog: S) -> Self { + Self { + broadcast, + catalog: Some(catalog), + latency: Duration::ZERO, + track: None, + } + } + + /// Set the maximum buffering latency for the per-track source. + pub fn with_latency(mut self, latency: Duration) -> Self { + self.latency = latency; + self + } + + pub async fn next(&mut self) -> anyhow::Result> { + conducer::wait(|waiter| self.poll_next(waiter)).await + } + + pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + while let Some(catalog) = self.catalog.as_mut() { + match catalog.poll_next(waiter)? { + Poll::Ready(Some(snapshot)) => self.update_catalog(&snapshot)?, + Poll::Ready(None) => { + self.catalog = None; + break; + } + Poll::Pending => break, + } + } + + loop { + let Some(track) = self.track.as_mut() else { + if self.catalog.is_none() { + return Poll::Ready(Ok(None)); + } + return Poll::Pending; + }; + + match track.source.poll_read(waiter) { + Poll::Ready(Ok(Some(frame))) => { + let bytes = match &track.convert { + None => frame.payload, + Some(convert) => { + let prefix = frame.keyframe.then(|| convert.keyframe_prefix.as_ref()); + annexb::from_length_prefixed(&frame.payload, convert.length_size, prefix)? + } + }; + if bytes.is_empty() { + continue; + } + return Poll::Ready(Ok(Some(bytes))); + } + Poll::Ready(Ok(None)) => { + self.track = None; + continue; + } + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, + } + } + } + + fn update_catalog(&mut self, catalog: &Catalog) -> anyhow::Result<()> { + let picked = catalog + .video + .renditions + .iter() + .filter(|(_, c)| c.codec.kind() == VideoCodecKind::H264) + .collect::>(); + + if picked.len() > 1 { + tracing::warn!( + count = picked.len(), + "multiple H.264 renditions in catalog snapshot; using the first by name. \ + Narrow with catalog::Target to pick one explicitly." + ); + } + + let Some((name, config)) = picked.into_iter().next() else { + self.track = None; + return Ok(()); + }; + + if self.track.as_ref().is_some_and(|t| t.name == *name) { + return Ok(()); + } + + let source = ExportSource::for_video_raw(&self.broadcast, name, config, self.latency)?; + let convert = match config.description.as_ref().filter(|d| !d.is_empty()) { + None => None, + Some(avcc) => { + let params = super::parse_avcc_param_sets(avcc)?; + let prefix = annexb::build_prefix(params.sps.iter().chain(params.pps.iter())); + Some(Avc1Convert { + length_size: params.length_size, + keyframe_prefix: prefix, + }) + } + }; + + self.track = Some(H264Track { + name: name.clone(), + source, + convert, + }); + + Ok(()) + } +} diff --git a/rs/moq-mux/src/codec/h264/mod.rs b/rs/moq-mux/src/codec/h264/mod.rs index 4f14a6e41..d2a55282e 100644 --- a/rs/moq-mux/src/codec/h264/mod.rs +++ b/rs/moq-mux/src/codec/h264/mod.rs @@ -3,11 +3,15 @@ //! Parses SPS NAL units and AVCDecoderConfigurationRecord blobs into //! catalog-ready fields. The [`Avc1`] transmuxer rewrites Annex-B input //! (inline SPS/PPS) as length-prefixed NALU + out-of-band avcC, which is -//! what every CMAF and MKV consumer expects. [`Import`] is the importer; -//! it auto-detects either wire shape from the leading bytes. +//! what every CMAF and MKV consumer expects. [`Export`] subscribes to a +//! catalog-narrowed H.264 rendition and emits an Annex-B elementary +//! stream; [`Import`] is the importer (auto-detects either wire shape +//! from the leading bytes). +mod export; mod import; +pub use export::*; pub use import::*; use anyhow::Context; @@ -145,6 +149,49 @@ pub(crate) fn build_avcc(sps_nal: &[u8], pps_nal: &[u8]) -> anyhow::Result, + pub pps: Vec, +} + +/// Pull the SPS and PPS NAL units out of an AVCDecoderConfigurationRecord. +pub fn parse_avcc_param_sets(avcc: &[u8]) -> anyhow::Result { + anyhow::ensure!(avcc.len() >= 7, "avcC too short"); + let length_size = (avcc[4] & 0x03) as usize + 1; + let num_sps = (avcc[5] & 0x1f) as usize; + + let mut pos = 6; + let mut sps = Vec::with_capacity(num_sps); + for _ in 0..num_sps { + anyhow::ensure!(avcc.len() >= pos + 2, "avcC truncated in SPS length"); + let len = u16::from_be_bytes([avcc[pos], avcc[pos + 1]]) as usize; + pos += 2; + anyhow::ensure!(avcc.len() >= pos + len, "avcC truncated in SPS payload"); + sps.push(Bytes::copy_from_slice(&avcc[pos..pos + len])); + pos += len; + } + + anyhow::ensure!(avcc.len() > pos, "avcC truncated in PPS count"); + let num_pps = avcc[pos] as usize; + pos += 1; + + let mut pps = Vec::with_capacity(num_pps); + for _ in 0..num_pps { + anyhow::ensure!(avcc.len() >= pos + 2, "avcC truncated in PPS length"); + let len = u16::from_be_bytes([avcc[pos], avcc[pos + 1]]) as usize; + pos += 2; + anyhow::ensure!(avcc.len() >= pos + len, "avcC truncated in PPS payload"); + pps.push(Bytes::copy_from_slice(&avcc[pos..pos + len])); + pos += len; + } + + Ok(AvccParamSets { length_size, sps, pps }) +} + /// Transform H.264 frames from Annex-B (inline SPS/PPS, "avc3") to /// length-prefixed NALU (out-of-band AVCDecoderConfigurationRecord, "avc1"). /// diff --git a/rs/moq-mux/src/codec/h265/export.rs b/rs/moq-mux/src/codec/h265/export.rs new file mode 100644 index 000000000..7add07a4d --- /dev/null +++ b/rs/moq-mux/src/codec/h265/export.rs @@ -0,0 +1,156 @@ +//! H.265 single-rendition Annex-B exporter. +//! +//! HEVC analogue of [`crate::codec::h264::Export`]. Accepts either a hev1 +//! (Annex-B, parameter sets inline) or hvc1 (length-prefixed + out-of-band +//! hvcC) source and emits a raw Annex-B elementary stream. Timestamps are +//! dropped. + +use std::task::Poll; +use std::time::Duration; + +use bytes::Bytes; +use hang::Catalog; +use hang::catalog::VideoCodecKind; + +use crate::catalog::Stream; +use crate::codec::annexb; +use crate::container::ExportSource; + +/// Single-rendition H.265 Annex-B exporter. +pub struct Export { + broadcast: moq_net::BroadcastConsumer, + catalog: Option, + latency: Duration, + track: Option, +} + +struct H265Track { + name: String, + source: ExportSource, + /// `Some` for an hvc1 source: VPS/SPS/PPS prefix prebuilt from the hvcC, + /// and the hvcC length-prefix size. `None` for a hev1 source: Annex-B + /// passes through without conversion. + convert: Option, +} + +struct Hvc1Convert { + length_size: usize, + keyframe_prefix: Bytes, +} + +impl Export { + /// Subscribe to `broadcast` and emit an Annex-B H.265 byte stream. + /// + /// `catalog` is expected to be narrowed to a single H.265 rendition. If + /// multiple H.265 renditions appear in a snapshot, the first by BTreeMap + /// order wins and a warning is logged. + pub fn new(broadcast: moq_net::BroadcastConsumer, catalog: S) -> Self { + Self { + broadcast, + catalog: Some(catalog), + latency: Duration::ZERO, + track: None, + } + } + + /// Set the maximum buffering latency for the per-track source. + pub fn with_latency(mut self, latency: Duration) -> Self { + self.latency = latency; + self + } + + pub async fn next(&mut self) -> anyhow::Result> { + conducer::wait(|waiter| self.poll_next(waiter)).await + } + + pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + while let Some(catalog) = self.catalog.as_mut() { + match catalog.poll_next(waiter)? { + Poll::Ready(Some(snapshot)) => self.update_catalog(&snapshot)?, + Poll::Ready(None) => { + self.catalog = None; + break; + } + Poll::Pending => break, + } + } + + loop { + let Some(track) = self.track.as_mut() else { + if self.catalog.is_none() { + return Poll::Ready(Ok(None)); + } + return Poll::Pending; + }; + + match track.source.poll_read(waiter) { + Poll::Ready(Ok(Some(frame))) => { + let bytes = match &track.convert { + None => frame.payload, + Some(convert) => { + let prefix = frame.keyframe.then(|| convert.keyframe_prefix.as_ref()); + annexb::from_length_prefixed(&frame.payload, convert.length_size, prefix)? + } + }; + if bytes.is_empty() { + continue; + } + return Poll::Ready(Ok(Some(bytes))); + } + Poll::Ready(Ok(None)) => { + self.track = None; + continue; + } + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, + } + } + } + + fn update_catalog(&mut self, catalog: &Catalog) -> anyhow::Result<()> { + let picked = catalog + .video + .renditions + .iter() + .filter(|(_, c)| c.codec.kind() == VideoCodecKind::H265) + .collect::>(); + + if picked.len() > 1 { + tracing::warn!( + count = picked.len(), + "multiple H.265 renditions in catalog snapshot; using the first by name. \ + Narrow with catalog::Target to pick one explicitly." + ); + } + + let Some((name, config)) = picked.into_iter().next() else { + self.track = None; + return Ok(()); + }; + + if self.track.as_ref().is_some_and(|t| t.name == *name) { + return Ok(()); + } + + let source = ExportSource::for_video_raw(&self.broadcast, name, config, self.latency)?; + let convert = match config.description.as_ref().filter(|d| !d.is_empty()) { + None => None, + Some(hvcc) => { + let params = super::parse_hvcc_param_sets(hvcc)?; + let prefix = annexb::build_prefix(params.vps.iter().chain(params.sps.iter()).chain(params.pps.iter())); + Some(Hvc1Convert { + length_size: params.length_size, + keyframe_prefix: prefix, + }) + } + }; + + self.track = Some(H265Track { + name: name.clone(), + source, + convert, + }); + + Ok(()) + } +} diff --git a/rs/moq-mux/src/codec/h265/mod.rs b/rs/moq-mux/src/codec/h265/mod.rs index d236ba874..0455a3a56 100644 --- a/rs/moq-mux/src/codec/h265/mod.rs +++ b/rs/moq-mux/src/codec/h265/mod.rs @@ -3,16 +3,72 @@ //! The H.265 analogue of [`crate::codec::h264`]. Parses SPS NAL units //! and HEVCDecoderConfigurationRecord blobs. The [`Hvc1`] transmuxer //! rewrites Annex-B input (inline VPS/SPS/PPS) as length-prefixed NALU -//! + out-of-band hvcC. [`Import`] is the Annex-B importer. +//! + out-of-band hvcC. [`Export`] is the single-rendition Annex-B +//! exporter; [`Import`] is the Annex-B importer. +mod export; mod import; +pub use export::*; pub use import::*; use anyhow::Context; use bytes::{Buf, BufMut, Bytes, BytesMut}; use scuffle_h265::{NALUnitType, SpsNALUnit}; +/// VPS, SPS, and PPS NAL units extracted from an hvcC. +#[derive(Debug, Clone)] +pub struct HvccParamSets { + /// NALU length size in bytes (typically 4). + pub length_size: usize, + pub vps: Vec, + pub sps: Vec, + pub pps: Vec, +} + +/// Pull the VPS/SPS/PPS NAL units out of an HEVCDecoderConfigurationRecord. +pub fn parse_hvcc_param_sets(hvcc: &[u8]) -> anyhow::Result { + anyhow::ensure!(hvcc.len() >= 23, "hvcC too short"); + let length_size = (hvcc[21] & 0x3) as usize + 1; + let num_arrays = hvcc[22] as usize; + + let mut vps = Vec::new(); + let mut sps = Vec::new(); + let mut pps = Vec::new(); + let mut pos = 23; + + for _ in 0..num_arrays { + anyhow::ensure!(hvcc.len() >= pos + 3, "hvcC truncated in array header"); + let nal_type = hvcc[pos] & 0x3f; + pos += 1; + let num_nalus = u16::from_be_bytes([hvcc[pos], hvcc[pos + 1]]) as usize; + pos += 2; + + for _ in 0..num_nalus { + anyhow::ensure!(hvcc.len() >= pos + 2, "hvcC truncated in NAL length"); + let len = u16::from_be_bytes([hvcc[pos], hvcc[pos + 1]]) as usize; + pos += 2; + anyhow::ensure!(hvcc.len() >= pos + len, "hvcC truncated in NAL payload"); + let bytes = Bytes::copy_from_slice(&hvcc[pos..pos + len]); + pos += len; + + match NALUnitType::from(nal_type) { + NALUnitType::VpsNut => vps.push(bytes), + NALUnitType::SpsNut => sps.push(bytes), + NALUnitType::PpsNut => pps.push(bytes), + _ => {} + } + } + } + + Ok(HvccParamSets { + length_size, + vps, + sps, + pps, + }) +} + /// Annex-B → length-prefixed transmuxer; the H.265 analogue of /// [`crate::codec::h264::Avc1`]. pub struct Hvc1 { diff --git a/rs/moq-mux/src/container/fmp4/export.rs b/rs/moq-mux/src/container/fmp4/export.rs index 54f5ae946..ccef53a61 100644 --- a/rs/moq-mux/src/container/fmp4/export.rs +++ b/rs/moq-mux/src/container/fmp4/export.rs @@ -7,11 +7,10 @@ use bytes::Bytes; use hang::catalog::{Catalog, Container, VideoConfig}; use mp4_atom::{DecodeMaybe, Encode}; -use crate::catalog::CatalogFormat; +use crate::catalog::Stream; +use crate::container::ExportSource; use crate::container::Frame; -use crate::container::{CatalogSource, ExportSource}; - /// Subscribe to a moq broadcast and produce a single fMP4 / CMAF byte stream. /// /// Built from a [`moq_net::BroadcastConsumer`], `Export` subscribes to the hang catalog, @@ -26,9 +25,9 @@ use crate::container::{CatalogSource, ExportSource}; /// keyframes); [`with_fragment_duration`](Self::with_fragment_duration) caps the /// fragment duration for downstream consumers that throttle by fragment rate. /// Returns `None` when the broadcast ends. -pub struct Export { +pub struct Export { broadcast: moq_net::BroadcastConsumer, - catalog: Option, + catalog: Option, latency: Duration, fragment_duration: Option, @@ -64,29 +63,16 @@ struct Fmp4Track { sequence_number: u32, } -impl Export { - /// Subscribe to `broadcast` and produce fMP4 byte chunks, using the default - /// catalog format ([`CatalogFormat::Hang`]). +impl Export { + /// Subscribe to `broadcast` and produce fMP4 byte chunks, driving track + /// (un)subscription from `catalog`. /// - /// Use [`with_catalog_format`](Self::with_catalog_format) to subscribe to a - /// non-default catalog track (e.g. MSF). - pub fn new(broadcast: moq_net::BroadcastConsumer) -> Result { - Self::with_catalog_format(broadcast, CatalogFormat::default()) - } - - /// Subscribe to `broadcast` and produce fMP4 byte chunks, selecting an - /// explicit `catalog_format` for track discovery. - /// - /// Both formats drive the same internal `hang::Catalog`-based pipeline (MSF - /// snapshots are converted on receipt), so the only observable difference - /// is which wire catalog track is consumed. - pub fn with_catalog_format( - broadcast: moq_net::BroadcastConsumer, - catalog_format: CatalogFormat, - ) -> Result { - let catalog = CatalogSource::new(&broadcast, catalog_format)?; - - Ok(Self { + /// `catalog` is any [`Stream`] of catalog snapshots — typically a + /// [`catalog::Consumer`](crate::catalog::Consumer) directly, or wrapped in + /// [`catalog::Filter`](crate::catalog::Filter) / + /// [`catalog::Target`](crate::catalog::Target) to narrow the rendition set. + pub fn new(broadcast: moq_net::BroadcastConsumer, catalog: S) -> Self { + Self { broadcast, catalog: Some(catalog), latency: Duration::ZERO, @@ -94,7 +80,7 @@ impl Export { tracks: HashMap::new(), catalog_snapshot: None, init_emitted: false, - }) + } } /// Set the maximum buffering latency for each per-track source. diff --git a/rs/moq-mux/src/container/fmp4/export_test.rs b/rs/moq-mux/src/container/fmp4/export_test.rs index 596142e2e..606e8f26a 100644 --- a/rs/moq-mux/src/container/fmp4/export_test.rs +++ b/rs/moq-mux/src/container/fmp4/export_test.rs @@ -58,7 +58,9 @@ async fn avc3_source_to_cmaf_export_roundtrip() { .unwrap(); track_producer.finish().unwrap(); - let mut exporter = crate::container::fmp4::Export::new(consumer).expect("new Fmp4"); + let catalog_stream = + crate::catalog::Consumer::new(&consumer, crate::catalog::CatalogFormat::Hang).expect("catalog consumer"); + let mut exporter = crate::container::fmp4::Export::new(consumer, catalog_stream); let init = tokio::time::timeout(std::time::Duration::from_secs(1), exporter.next()) .await @@ -121,7 +123,9 @@ async fn cmaf_source_to_cmaf_export_passthrough() { let mut buf = BytesMut::from(data.as_slice()); let _ = importer.decode(&mut buf); - let mut exporter = crate::container::fmp4::Export::new(consumer).expect("new Fmp4"); + let catalog_stream = + crate::catalog::Consumer::new(&consumer, crate::catalog::CatalogFormat::Hang).expect("catalog consumer"); + let mut exporter = crate::container::fmp4::Export::new(consumer, catalog_stream); let init = tokio::time::timeout(std::time::Duration::from_secs(1), exporter.next()) .await diff --git a/rs/moq-mux/src/container/mkv/export.rs b/rs/moq-mux/src/container/mkv/export.rs index ee335564b..26e0d0307 100644 --- a/rs/moq-mux/src/container/mkv/export.rs +++ b/rs/moq-mux/src/container/mkv/export.rs @@ -9,11 +9,10 @@ use hang::catalog::{AudioCodec, AudioConfig, Catalog, Container, VideoCodec, Vid use webm_iterable::matroska_spec::{Master, MatroskaSpec}; use webm_iterable::{WebmWriter, WriteOptions}; -use crate::catalog::CatalogFormat; +use crate::catalog::Stream; +use crate::container::ExportSource; use crate::container::Frame; -use crate::container::{CatalogSource, ExportSource}; - /// Matroska TimestampScale: 1 ms (in nanoseconds). const TIMESTAMP_SCALE_NS: u64 = 1_000_000; @@ -44,9 +43,9 @@ const TIMESTAMP_SCALE_NS: u64 = 1_000_000; /// /// Only Legacy-container tracks (raw codec payloads) are supported. CMAF tracks /// (moof+mdat passthrough) are rejected with a clear error. -pub struct Export { +pub struct Export { broadcast: moq_net::BroadcastConsumer, - catalog: Option, + catalog: Option, latency: Duration, fragment_duration: Option, @@ -151,29 +150,16 @@ impl ClusterBuilder { } } -impl Export { - /// Subscribe to `broadcast` and produce MKV byte chunks, using the default - /// catalog format ([`CatalogFormat::Hang`]). - /// - /// Use [`with_catalog_format`](Self::with_catalog_format) to subscribe to a - /// non-default catalog track (e.g. MSF). - pub fn new(broadcast: moq_net::BroadcastConsumer) -> Result { - Self::with_catalog_format(broadcast, CatalogFormat::default()) - } - - /// Subscribe to `broadcast` and produce MKV byte chunks, selecting an - /// explicit `catalog_format` for track discovery. +impl Export { + /// Subscribe to `broadcast` and produce MKV byte chunks, driving track + /// (un)subscription from `catalog`. /// - /// Both formats drive the same internal `hang::Catalog`-based pipeline (MSF - /// snapshots are converted on receipt), so the only observable difference - /// is which wire catalog track is consumed. - pub fn with_catalog_format( - broadcast: moq_net::BroadcastConsumer, - catalog_format: CatalogFormat, - ) -> Result { - let catalog = CatalogSource::new(&broadcast, catalog_format)?; - - Ok(Self { + /// `catalog` is any [`Stream`] of catalog snapshots — typically a + /// [`catalog::Consumer`](crate::catalog::Consumer) directly, or wrapped in + /// [`catalog::Filter`](crate::catalog::Filter) / + /// [`catalog::Target`](crate::catalog::Target) to narrow the rendition set. + pub fn new(broadcast: moq_net::BroadcastConsumer, catalog: S) -> Self { + Self { broadcast, catalog: Some(catalog), latency: Duration::ZERO, @@ -182,7 +168,7 @@ impl Export { catalog_snapshot: None, header_emitted: false, cluster: None, - }) + } } /// Set the maximum buffering latency for each per-track source. diff --git a/rs/moq-mux/src/container/mkv/export_test.rs b/rs/moq-mux/src/container/mkv/export_test.rs index 12311cf31..34a33dc1e 100644 --- a/rs/moq-mux/src/container/mkv/export_test.rs +++ b/rs/moq-mux/src/container/mkv/export_test.rs @@ -28,7 +28,9 @@ async fn export_header_roundtrip_vp9_opus() { importer.finish().unwrap(); // Now subscribe via the exporter and pull bytes. - let mut exporter = crate::container::mkv::Export::new(consumer).unwrap(); + let catalog_stream = + crate::catalog::Consumer::new(&consumer, crate::catalog::CatalogFormat::Hang).expect("catalog consumer"); + let mut exporter = crate::container::mkv::Export::new(consumer, catalog_stream); // First `next()` should give us the header (EBML + Segment-start + Info + Tracks). let header = tokio::time::timeout(std::time::Duration::from_secs(1), exporter.next()) @@ -153,8 +155,9 @@ async fn export_emits_blocks_for_each_frame() { importer.decode(&mut buf).unwrap(); importer.finish().unwrap(); - let mut exporter = crate::container::mkv::Export::new(consumer) - .unwrap() + let catalog_stream = + crate::catalog::Consumer::new(&consumer, crate::catalog::CatalogFormat::Hang).expect("catalog consumer"); + let mut exporter = crate::container::mkv::Export::new(consumer, catalog_stream) // Use per-frame clustering so each frame is observable as its own // Cluster chunk; batching is exercised in a dedicated test below. .with_fragment_duration(std::time::Duration::ZERO); @@ -240,7 +243,9 @@ async fn export_rejects_cmaf_track() { }; catalog.lock().video.renditions.insert(track.name.clone(), config); - let mut exporter = crate::container::mkv::Export::new(consumer).unwrap(); + let catalog_stream = + crate::catalog::Consumer::new(&consumer, crate::catalog::CatalogFormat::Hang).expect("catalog consumer"); + let mut exporter = crate::container::mkv::Export::new(consumer, catalog_stream); let result = tokio::time::timeout(std::time::Duration::from_secs(1), exporter.next()) .await .expect("exporter timed out"); @@ -314,9 +319,10 @@ async fn export_avc3_source_synthesizes_avcc_and_length_prefixes() { let mut catalog = catalog; catalog.finish().unwrap(); - let mut exporter = crate::container::mkv::Export::new(consumer) - .unwrap() - .with_fragment_duration(std::time::Duration::ZERO); + let catalog_stream = + crate::catalog::Consumer::new(&consumer, crate::catalog::CatalogFormat::Hang).expect("catalog consumer"); + let mut exporter = + crate::container::mkv::Export::new(consumer, catalog_stream).with_fragment_duration(std::time::Duration::ZERO); let mut exported: Vec = Vec::new(); let mut held_producer = Some(producer); @@ -449,8 +455,9 @@ async fn export_fragment_duration_batches_blocks() { importer.finish().unwrap(); catalog.finish().unwrap(); - let mut exporter = crate::container::mkv::Export::new(consumer) - .unwrap() + let catalog_stream = + crate::catalog::Consumer::new(&consumer, crate::catalog::CatalogFormat::Hang).expect("catalog consumer"); + let mut exporter = crate::container::mkv::Export::new(consumer, catalog_stream) .with_fragment_duration(std::time::Duration::from_secs(2)); let mut exported: Vec = Vec::new(); diff --git a/rs/moq-mux/src/container/mod.rs b/rs/moq-mux/src/container/mod.rs index 1050202e7..d6276ec14 100644 --- a/rs/moq-mux/src/container/mod.rs +++ b/rs/moq-mux/src/container/mod.rs @@ -27,7 +27,7 @@ pub mod mkv; pub use consumer::Consumer; pub use producer::Producer; -pub(crate) use source::{CatalogSource, ExportSource}; +pub(crate) use source::ExportSource; /// Microsecond presentation timestamp, the canonical timebase for media /// frames in moq-mux. diff --git a/rs/moq-mux/src/container/source.rs b/rs/moq-mux/src/container/source.rs index 9363d6ee5..bdea9cf02 100644 --- a/rs/moq-mux/src/container/source.rs +++ b/rs/moq-mux/src/container/source.rs @@ -19,45 +19,11 @@ use std::time::Duration; use bytes::Bytes; use hang::catalog::{AudioConfig, VideoCodec, VideoConfig}; -use crate::catalog::CatalogFormat; use crate::catalog::hang::Container as HangContainer; use crate::codec::h264::Avc1; use crate::codec::h265::Hvc1; use crate::container::{Consumer, Frame}; -/// Source for the catalog stream backing an exporter. -/// -/// Both variants expose the same [`hang::Catalog`] shape; the MSF variant -/// converts on the fly so the rest of the pipeline only deals with hang types. -pub(crate) enum CatalogSource { - /// The hang catalog track (track name `catalog.json`, JSON payload). - Hang(crate::catalog::hang::Consumer), - /// The MSF catalog track (track name `catalog`, MSF JSON payload converted to hang). - Msf(crate::catalog::msf::Consumer), -} - -impl CatalogSource { - pub(crate) fn new(broadcast: &moq_net::BroadcastConsumer, format: CatalogFormat) -> Result { - Ok(match format { - CatalogFormat::Hang => { - let track = broadcast.subscribe_track(&hang::Catalog::default_track())?; - CatalogSource::Hang(crate::catalog::hang::Consumer::new(track)) - } - CatalogFormat::Msf => { - let track = broadcast.subscribe_track(&moq_net::Track::new(moq_msf::DEFAULT_NAME))?; - CatalogSource::Msf(crate::catalog::msf::Consumer::new(track)) - } - }) - } - - pub(crate) fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { - match self { - Self::Hang(c) => c.poll_next(waiter).map_err(Into::into), - Self::Msf(c) => c.poll_next(waiter), - } - } -} - /// Per-track video transform that bridges between codec shapes. pub(crate) enum VideoTransform { Avc1(Avc1), @@ -114,6 +80,29 @@ impl ExportSource { }) } + /// Subscribe to a video rendition without attaching any codec-shape + /// transform. Payloads pass through untouched (Annex-B stays Annex-B, + /// avc1 length-prefixed stays length-prefixed). The Annex-B exporter + /// uses this to keep parameter sets in-band. + pub fn for_video_raw( + broadcast: &moq_net::BroadcastConsumer, + name: &str, + config: &VideoConfig, + latency: Duration, + ) -> Result { + let media: HangContainer = (&config.container).try_into()?; + let track = broadcast.subscribe_track(&moq_net::Track::new(name.to_string()))?; + let consumer = Consumer::new(track, media).with_latency(latency); + + let description = config.description.as_ref().filter(|b| !b.is_empty()).cloned(); + + Ok(Self { + consumer, + transform: None, + description, + }) + } + /// Subscribe to an audio rendition. Audio has no codec-shape transform; /// `description` is taken straight from the catalog. pub fn for_audio( From 833a51d8352cc159b53a890f95db6f3626d8668b Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 24 May 2026 12:41:28 -0700 Subject: [PATCH 2/6] moq-mux: address CodeRabbit review feedback - annexb/avcC/hvcC parsers use checked_add for offset arithmetic so malformed configs surface as errors instead of usize panics on overflow. - h264::Export and h265::Export now compare the full VideoConfig (not just the rendition name) when deciding to refresh the source, so catalog updates that change codec details while keeping the same name rebuild correctly. - avc1 / hvc1 conversion fails fast when parse_avcc_param_sets / parse_hvcc_param_sets returns empty parameter sets, instead of silently emitting keyframes without injected SPS/PPS (and VPS). - moq subscribe errors when --video-codec contradicts the format- implied codec (--format h264 + --video-codec h265 etc.) so misuse fails in the CLI rather than later in the exporter. Co-Authored-By: Claude Opus 4.7 (1M context) --- rs/moq-cli/src/subscribe.rs | 41 +++++++++++++++++++++-------- rs/moq-mux/src/codec/annexb.rs | 19 ++++++++----- rs/moq-mux/src/codec/h264/export.rs | 17 ++++++++++-- rs/moq-mux/src/codec/h264/mod.rs | 41 ++++++++++++++++------------- rs/moq-mux/src/codec/h265/export.rs | 18 +++++++++++-- rs/moq-mux/src/codec/h265/mod.rs | 27 ++++++++++++------- 6 files changed, 113 insertions(+), 50 deletions(-) diff --git a/rs/moq-cli/src/subscribe.rs b/rs/moq-cli/src/subscribe.rs index d5740693e..d0c7128bd 100644 --- a/rs/moq-cli/src/subscribe.rs +++ b/rs/moq-cli/src/subscribe.rs @@ -143,21 +143,40 @@ impl SubscribeArgs { .unwrap_or_default() } - /// Build a video filter from the parsed flags, plus any codec defaulted by - /// the chosen output format (e.g. `--format h264` implies `codec = H264`). - fn filter_video(&self) -> Option { - let codec = self.video_codec.map(Into::into).or(match self.format { + /// Codec implied by the output format. `--format h264` / `--format h265` + /// each force a single codec family; container formats leave it open. + fn format_codec(&self) -> Option { + match self.format { SubscribeFormat::H264 => Some(VideoCodecKind::H264), SubscribeFormat::H265 => Some(VideoCodecKind::H265), - _ => None, - }); + SubscribeFormat::Fmp4 | SubscribeFormat::Mkv => None, + } + } + + /// Build a video filter from the parsed flags, plus any codec defaulted by + /// the chosen output format (e.g. `--format h264` implies `codec = H264`). + /// + /// Errors if `--video-codec` contradicts the format-implied codec — fail + /// fast in the CLI rather than later in the exporter. + fn filter_video(&self) -> anyhow::Result> { + let user_codec = self.video_codec.map(VideoCodecKind::from); + let codec = match (self.format_codec(), user_codec) { + (Some(fmt), Some(user)) if fmt != user => { + anyhow::bail!( + "--format implies video codec {fmt:?}, but --video-codec {user:?} was passed; \ + remove --video-codec or pick a matching format" + ); + } + (Some(fmt), _) => Some(fmt), + (None, user) => user, + }; if self.video_name.is_none() && codec.is_none() { - return None; + return Ok(None); } - Some(FilterVideo { + Ok(Some(FilterVideo { name: self.video_name.clone(), codec, - }) + })) } fn filter_audio(&self) -> Option { @@ -207,11 +226,11 @@ impl Subscribe { } /// Build the catalog stream from the configured filter/target flags. - fn stream(&self) -> Result>, moq_mux::Error> { + fn stream(&self) -> anyhow::Result>> { let consumer = catalog::Consumer::new(&self.broadcast, self.catalog)?; let mut filter = consumer.filter(); - filter.set_video(self.args.filter_video()); + filter.set_video(self.args.filter_video()?); filter.set_audio(self.args.filter_audio()); let mut target = filter.target(); diff --git a/rs/moq-mux/src/codec/annexb.rs b/rs/moq-mux/src/codec/annexb.rs index 06d8ada9e..aed772886 100644 --- a/rs/moq-mux/src/codec/annexb.rs +++ b/rs/moq-mux/src/codec/annexb.rs @@ -1,4 +1,4 @@ -use anyhow::{self}; +use anyhow::Context; use bytes::{Buf, Bytes, BytesMut}; pub const START_CODE: Bytes = Bytes::from_static(&[0, 0, 0, 1]); @@ -19,16 +19,21 @@ pub fn from_length_prefixed(payload: &[u8], length_size: usize, prefix: Option<& let mut pos = 0; while pos < payload.len() { - anyhow::ensure!(payload.len() >= pos + length_size, "truncated NAL length prefix"); + let after_prefix = pos + .checked_add(length_size) + .context("offset overflow reading NAL length prefix")?; + anyhow::ensure!(payload.len() >= after_prefix, "truncated NAL length prefix"); let mut len = 0usize; - for byte in &payload[pos..pos + length_size] { + for byte in &payload[pos..after_prefix] { len = (len << 8) | (*byte as usize); } - pos += length_size; - anyhow::ensure!(payload.len() >= pos + len, "truncated NAL payload"); + let after_nal = after_prefix + .checked_add(len) + .context("offset overflow reading NAL payload")?; + anyhow::ensure!(payload.len() >= after_nal, "truncated NAL payload"); out.extend_from_slice(&START_CODE); - out.extend_from_slice(&payload[pos..pos + len]); - pos += len; + out.extend_from_slice(&payload[after_prefix..after_nal]); + pos = after_nal; } Ok(out.freeze()) diff --git a/rs/moq-mux/src/codec/h264/export.rs b/rs/moq-mux/src/codec/h264/export.rs index 03b2a3be8..e7ee50dcc 100644 --- a/rs/moq-mux/src/codec/h264/export.rs +++ b/rs/moq-mux/src/codec/h264/export.rs @@ -17,7 +17,7 @@ use std::time::Duration; use bytes::Bytes; use hang::Catalog; -use hang::catalog::VideoCodecKind; +use hang::catalog::{VideoCodecKind, VideoConfig}; use crate::catalog::Stream; use crate::codec::annexb; @@ -33,6 +33,11 @@ pub struct Export { struct H264Track { name: String, + /// Snapshot of the catalog config we built `source` from. Cached so that + /// a catalog update which keeps the same rendition name but changes the + /// codec config (e.g. a new avcC) triggers a full rebuild instead of + /// silently reusing a stale `convert`. + config: VideoConfig, source: ExportSource, /// `Some` for an avc1 source: SPS/PPS prefix prebuilt from the avcC, and /// the avcC length-prefix size. `None` for an avc3 source: Annex-B passes @@ -137,7 +142,7 @@ impl Export { return Ok(()); }; - if self.track.as_ref().is_some_and(|t| t.name == *name) { + if self.track.as_ref().is_some_and(|t| t.name == *name && t.config == *config) { return Ok(()); } @@ -146,6 +151,13 @@ impl Export { None => None, Some(avcc) => { let params = super::parse_avcc_param_sets(avcc)?; + anyhow::ensure!( + !params.sps.is_empty() && !params.pps.is_empty(), + "avc1 description for rendition {name:?} is missing SPS or PPS \ + (sps={}, pps={}); cannot inject parameter sets at keyframes", + params.sps.len(), + params.pps.len(), + ); let prefix = annexb::build_prefix(params.sps.iter().chain(params.pps.iter())); Some(Avc1Convert { length_size: params.length_size, @@ -156,6 +168,7 @@ impl Export { self.track = Some(H264Track { name: name.clone(), + config: config.clone(), source, convert, }); diff --git a/rs/moq-mux/src/codec/h264/mod.rs b/rs/moq-mux/src/codec/h264/mod.rs index d2a55282e..41d7d2634 100644 --- a/rs/moq-mux/src/codec/h264/mod.rs +++ b/rs/moq-mux/src/codec/h264/mod.rs @@ -165,33 +165,38 @@ pub fn parse_avcc_param_sets(avcc: &[u8]) -> anyhow::Result { let num_sps = (avcc[5] & 0x1f) as usize; let mut pos = 6; - let mut sps = Vec::with_capacity(num_sps); - for _ in 0..num_sps { - anyhow::ensure!(avcc.len() >= pos + 2, "avcC truncated in SPS length"); - let len = u16::from_be_bytes([avcc[pos], avcc[pos + 1]]) as usize; - pos += 2; - anyhow::ensure!(avcc.len() >= pos + len, "avcC truncated in SPS payload"); - sps.push(Bytes::copy_from_slice(&avcc[pos..pos + len])); - pos += len; - } + let sps = read_param_sets(avcc, &mut pos, num_sps, "SPS")?; anyhow::ensure!(avcc.len() > pos, "avcC truncated in PPS count"); let num_pps = avcc[pos] as usize; pos += 1; - let mut pps = Vec::with_capacity(num_pps); - for _ in 0..num_pps { - anyhow::ensure!(avcc.len() >= pos + 2, "avcC truncated in PPS length"); - let len = u16::from_be_bytes([avcc[pos], avcc[pos + 1]]) as usize; - pos += 2; - anyhow::ensure!(avcc.len() >= pos + len, "avcC truncated in PPS payload"); - pps.push(Bytes::copy_from_slice(&avcc[pos..pos + len])); - pos += len; - } + let pps = read_param_sets(avcc, &mut pos, num_pps, "PPS")?; Ok(AvccParamSets { length_size, sps, pps }) } +/// Read `count` length-prefixed (u16) NAL units from `buf` starting at `*pos`, +/// advancing `*pos` past the last one. All arithmetic is checked so malformed +/// configs surface as errors rather than panics. +fn read_param_sets(buf: &[u8], pos: &mut usize, count: usize, label: &str) -> anyhow::Result> { + let mut out = Vec::with_capacity(count); + for _ in 0..count { + let after_len = pos + .checked_add(2) + .with_context(|| format!("offset overflow reading {label} length"))?; + anyhow::ensure!(buf.len() >= after_len, "avcC truncated in {label} length"); + let len = u16::from_be_bytes([buf[*pos], buf[*pos + 1]]) as usize; + let after_nal = after_len + .checked_add(len) + .with_context(|| format!("offset overflow reading {label} payload"))?; + anyhow::ensure!(buf.len() >= after_nal, "avcC truncated in {label} payload"); + out.push(Bytes::copy_from_slice(&buf[after_len..after_nal])); + *pos = after_nal; + } + Ok(out) +} + /// Transform H.264 frames from Annex-B (inline SPS/PPS, "avc3") to /// length-prefixed NALU (out-of-band AVCDecoderConfigurationRecord, "avc1"). /// diff --git a/rs/moq-mux/src/codec/h265/export.rs b/rs/moq-mux/src/codec/h265/export.rs index 7add07a4d..b385b1a9a 100644 --- a/rs/moq-mux/src/codec/h265/export.rs +++ b/rs/moq-mux/src/codec/h265/export.rs @@ -10,7 +10,7 @@ use std::time::Duration; use bytes::Bytes; use hang::Catalog; -use hang::catalog::VideoCodecKind; +use hang::catalog::{VideoCodecKind, VideoConfig}; use crate::catalog::Stream; use crate::codec::annexb; @@ -26,6 +26,11 @@ pub struct Export { struct H265Track { name: String, + /// Snapshot of the catalog config we built `source` from. Cached so that + /// a catalog update which keeps the same rendition name but changes the + /// codec config (e.g. a new hvcC) triggers a full rebuild instead of + /// silently reusing a stale `convert`. + config: VideoConfig, source: ExportSource, /// `Some` for an hvc1 source: VPS/SPS/PPS prefix prebuilt from the hvcC, /// and the hvcC length-prefix size. `None` for a hev1 source: Annex-B @@ -128,7 +133,7 @@ impl Export { return Ok(()); }; - if self.track.as_ref().is_some_and(|t| t.name == *name) { + if self.track.as_ref().is_some_and(|t| t.name == *name && t.config == *config) { return Ok(()); } @@ -137,6 +142,14 @@ impl Export { None => None, Some(hvcc) => { let params = super::parse_hvcc_param_sets(hvcc)?; + anyhow::ensure!( + !params.vps.is_empty() && !params.sps.is_empty() && !params.pps.is_empty(), + "hvc1 description for rendition {name:?} is missing VPS, SPS, or PPS \ + (vps={}, sps={}, pps={}); cannot inject parameter sets at keyframes", + params.vps.len(), + params.sps.len(), + params.pps.len(), + ); let prefix = annexb::build_prefix(params.vps.iter().chain(params.sps.iter()).chain(params.pps.iter())); Some(Hvc1Convert { length_size: params.length_size, @@ -147,6 +160,7 @@ impl Export { self.track = Some(H265Track { name: name.clone(), + config: config.clone(), source, convert, }); diff --git a/rs/moq-mux/src/codec/h265/mod.rs b/rs/moq-mux/src/codec/h265/mod.rs index 0455a3a56..f2dcd6c94 100644 --- a/rs/moq-mux/src/codec/h265/mod.rs +++ b/rs/moq-mux/src/codec/h265/mod.rs @@ -35,22 +35,29 @@ pub fn parse_hvcc_param_sets(hvcc: &[u8]) -> anyhow::Result { let mut vps = Vec::new(); let mut sps = Vec::new(); let mut pps = Vec::new(); - let mut pos = 23; + let mut pos: usize = 23; for _ in 0..num_arrays { - anyhow::ensure!(hvcc.len() >= pos + 3, "hvcC truncated in array header"); + let after_hdr = pos + .checked_add(3) + .context("offset overflow reading hvcC array header")?; + anyhow::ensure!(hvcc.len() >= after_hdr, "hvcC truncated in array header"); let nal_type = hvcc[pos] & 0x3f; - pos += 1; - let num_nalus = u16::from_be_bytes([hvcc[pos], hvcc[pos + 1]]) as usize; - pos += 2; + let num_nalus = u16::from_be_bytes([hvcc[pos + 1], hvcc[pos + 2]]) as usize; + pos = after_hdr; for _ in 0..num_nalus { - anyhow::ensure!(hvcc.len() >= pos + 2, "hvcC truncated in NAL length"); + let after_len = pos + .checked_add(2) + .context("offset overflow reading hvcC NAL length")?; + anyhow::ensure!(hvcc.len() >= after_len, "hvcC truncated in NAL length"); let len = u16::from_be_bytes([hvcc[pos], hvcc[pos + 1]]) as usize; - pos += 2; - anyhow::ensure!(hvcc.len() >= pos + len, "hvcC truncated in NAL payload"); - let bytes = Bytes::copy_from_slice(&hvcc[pos..pos + len]); - pos += len; + let after_nal = after_len + .checked_add(len) + .context("offset overflow reading hvcC NAL payload")?; + anyhow::ensure!(hvcc.len() >= after_nal, "hvcC truncated in NAL payload"); + let bytes = Bytes::copy_from_slice(&hvcc[after_len..after_nal]); + pos = after_nal; match NALUnitType::from(nal_type) { NALUnitType::VpsNut => vps.push(bytes), From b5dad7eed9e809cf5bd085ca18458701d74bf898 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 24 May 2026 12:49:14 -0700 Subject: [PATCH 3/6] moq-mux: apply rustfmt to address CI format check --- rs/moq-mux/src/codec/h264/export.rs | 6 +++++- rs/moq-mux/src/codec/h265/export.rs | 6 +++++- rs/moq-mux/src/codec/h265/mod.rs | 4 +--- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/rs/moq-mux/src/codec/h264/export.rs b/rs/moq-mux/src/codec/h264/export.rs index e7ee50dcc..753447a1f 100644 --- a/rs/moq-mux/src/codec/h264/export.rs +++ b/rs/moq-mux/src/codec/h264/export.rs @@ -142,7 +142,11 @@ impl Export { return Ok(()); }; - if self.track.as_ref().is_some_and(|t| t.name == *name && t.config == *config) { + if self + .track + .as_ref() + .is_some_and(|t| t.name == *name && t.config == *config) + { return Ok(()); } diff --git a/rs/moq-mux/src/codec/h265/export.rs b/rs/moq-mux/src/codec/h265/export.rs index b385b1a9a..69e459bbb 100644 --- a/rs/moq-mux/src/codec/h265/export.rs +++ b/rs/moq-mux/src/codec/h265/export.rs @@ -133,7 +133,11 @@ impl Export { return Ok(()); }; - if self.track.as_ref().is_some_and(|t| t.name == *name && t.config == *config) { + if self + .track + .as_ref() + .is_some_and(|t| t.name == *name && t.config == *config) + { return Ok(()); } diff --git a/rs/moq-mux/src/codec/h265/mod.rs b/rs/moq-mux/src/codec/h265/mod.rs index f2dcd6c94..9cc2abb78 100644 --- a/rs/moq-mux/src/codec/h265/mod.rs +++ b/rs/moq-mux/src/codec/h265/mod.rs @@ -47,9 +47,7 @@ pub fn parse_hvcc_param_sets(hvcc: &[u8]) -> anyhow::Result { pos = after_hdr; for _ in 0..num_nalus { - let after_len = pos - .checked_add(2) - .context("offset overflow reading hvcC NAL length")?; + let after_len = pos.checked_add(2).context("offset overflow reading hvcC NAL length")?; anyhow::ensure!(hvcc.len() >= after_len, "hvcC truncated in NAL length"); let len = u16::from_be_bytes([hvcc[pos], hvcc[pos + 1]]) as usize; let after_nal = after_len From d20aea981563c0e58f7f314a9430c2edaf1f0962 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 24 May 2026 15:27:44 -0700 Subject: [PATCH 4/6] moq-mux: address review feedback round 3 Three changes from Luke's review: 1. CLI flags: rename `--max-video-*` / `--max-audio-bitrate` to put the `_max` suffix at the end (`--video-width-max`, `--video-bitrate-max`, etc.) for consistency. 2. `catalog::Stream` trait now returns `crate::Result>` instead of `anyhow::Result`. `msf::Consumer` follows suit; its rich parsing chain stays in `anyhow::Error` internally and is wrapped in a new `crate::Error::Msf(anyhow::Error)` variant at the boundary so the layered context is preserved. 3. `catalog::Filter` and `catalog::Target` swap the dirty-flag re-emit trick for a `conducer::Producer` + paired `Consumer` pair. `set_video` / `set_audio` write through the producer and the Mut::drop wakes any paired consumer waiters, so a retarget mid-poll now wakes the polling task (the dirty flag silently waited for the next upstream snapshot). The state carries a monotonic `epoch` counter so `poll_next` can tell whether the criteria changed since the last emit without diffing the structs. Inner and consumer waiters are both registered on Pending, so either a new upstream snapshot or a setter triggers a re-poll. Co-Authored-By: Claude Opus 4.7 (1M context) --- rs/moq-cli/src/subscribe.rs | 28 ++-- rs/moq-mux/src/catalog/consumer.rs | 4 +- rs/moq-mux/src/catalog/filter.rs | 176 ++++++++++++++++--------- rs/moq-mux/src/catalog/msf/consumer.rs | 17 ++- rs/moq-mux/src/catalog/stream.rs | 4 +- rs/moq-mux/src/catalog/target.rs | 154 ++++++++++++++++------ rs/moq-mux/src/error.rs | 9 ++ 7 files changed, 270 insertions(+), 122 deletions(-) diff --git a/rs/moq-cli/src/subscribe.rs b/rs/moq-cli/src/subscribe.rs index d0c7128bd..27111611f 100644 --- a/rs/moq-cli/src/subscribe.rs +++ b/rs/moq-cli/src/subscribe.rs @@ -106,19 +106,19 @@ pub struct SubscribeArgs { /// Prefer a video rendition no wider than this (px). #[arg(long)] - pub max_video_width: Option, + pub video_width_max: Option, /// Prefer a video rendition no taller than this (px). #[arg(long)] - pub max_video_height: Option, + pub video_height_max: Option, /// Prefer a video rendition with at most this many pixels (`coded_width * coded_height`). #[arg(long)] - pub max_video_pixels: Option, + pub video_pixels_max: Option, /// Prefer a video rendition under this bitrate (bits per second). #[arg(long)] - pub max_video_bitrate: Option, + pub video_bitrate_max: Option, /// Pick the audio rendition with this exact name. #[arg(long)] @@ -130,7 +130,7 @@ pub struct SubscribeArgs { /// Prefer an audio rendition under this bitrate (bits per second). #[arg(long)] - pub max_audio_bitrate: Option, + pub audio_bitrate_max: Option, } impl SubscribeArgs { @@ -190,23 +190,23 @@ impl SubscribeArgs { } fn target_video(&self) -> Option { - if self.max_video_width.is_none() - && self.max_video_height.is_none() - && self.max_video_pixels.is_none() - && self.max_video_bitrate.is_none() + if self.video_width_max.is_none() + && self.video_height_max.is_none() + && self.video_pixels_max.is_none() + && self.video_bitrate_max.is_none() { return None; } Some(TargetVideo { - width: self.max_video_width, - height: self.max_video_height, - pixels: self.max_video_pixels, - bitrate: self.max_video_bitrate, + width: self.video_width_max, + height: self.video_height_max, + pixels: self.video_pixels_max, + bitrate: self.video_bitrate_max, }) } fn target_audio(&self) -> Option { - self.max_audio_bitrate.map(|b| TargetAudio { bitrate: Some(b) }) + self.audio_bitrate_max.map(|b| TargetAudio { bitrate: Some(b) }) } } diff --git a/rs/moq-mux/src/catalog/consumer.rs b/rs/moq-mux/src/catalog/consumer.rs index 2bea9aa30..0968517bf 100644 --- a/rs/moq-mux/src/catalog/consumer.rs +++ b/rs/moq-mux/src/catalog/consumer.rs @@ -37,9 +37,9 @@ impl Consumer { } impl Stream for Consumer { - fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { match self { - Self::Hang(c) => c.poll_next(waiter).map_err(Into::into), + Self::Hang(c) => c.poll_next(waiter), Self::Msf(c) => c.poll_next(waiter), } } diff --git a/rs/moq-mux/src/catalog/filter.rs b/rs/moq-mux/src/catalog/filter.rs index 78b9aa5b7..00596d0af 100644 --- a/rs/moq-mux/src/catalog/filter.rs +++ b/rs/moq-mux/src/catalog/filter.rs @@ -30,97 +30,155 @@ pub struct FilterAudio { pub codec: Option, } +/// Shared state behind a [`Filter`]. +/// +/// `epoch` advances on every setter so [`Filter::poll_next`] can tell whether +/// the criteria changed since the last emit. +#[derive(Debug, Default, Clone)] +struct FilterState { + video: Option, + audio: Option, + epoch: u64, +} + /// A [`Stream`] that drops renditions failing a [`FilterVideo`] / [`FilterAudio`]. +/// +/// Selection criteria live behind a [`conducer::Producer`], so calls to +/// [`set_video`](Self::set_video) / [`set_audio`](Self::set_audio) wake any +/// pending `poll_next` instead of silently waiting for the next upstream +/// snapshot. pub struct Filter { inner: S, - video: Option, - audio: Option, - /// Last raw snapshot from `inner`, kept so retargeting via `set_*` can re-emit - /// without polling upstream (the foothold for future ABR retargeting). + state: conducer::Producer, + state_consumer: conducer::Consumer, + /// Last raw snapshot from `inner`, retained so a setter between snapshots + /// can re-apply without polling upstream. last_input: Option, - /// True if `set_*` has been called since the last emit and we still owe a - /// re-evaluated snapshot derived from `last_input`. - dirty: bool, + /// Epoch we already emitted against. + last_epoch: u64, + /// True once `inner` has handed us a snapshot we haven't emitted yet. + fresh_input: bool, } impl Filter { pub fn new(inner: S) -> Self { + let state = conducer::Producer::new(FilterState::default()); + let state_consumer = state.consume(); Self { inner, - video: None, - audio: None, + state, + state_consumer, last_input: None, - dirty: false, + last_epoch: 0, + fresh_input: false, } } /// Set or clear the video filter. Pass `None` to clear. pub fn set_video(&mut self, filter: impl Into>) { - self.video = filter.into(); - self.dirty = self.last_input.is_some(); + self.update(|s| s.video = filter.into()); } /// Set or clear the audio filter. Pass `None` to clear. pub fn set_audio(&mut self, filter: impl Into>) { - self.audio = filter.into(); - self.dirty = self.last_input.is_some(); + self.update(|s| s.audio = filter.into()); } - fn apply(&self, mut catalog: Catalog) -> Catalog { - if let Some(filter) = &self.video { - catalog.video.renditions.retain(|name, config| { - if let Some(want) = &filter.name - && want != name - { - return false; - } - if let Some(want) = filter.codec - && config.codec.kind() != want - { - return false; - } - true - }); - } - if let Some(filter) = &self.audio { - catalog.audio.renditions.retain(|name, config| { - if let Some(want) = &filter.name - && want != name - { - return false; - } - if let Some(want) = filter.codec - && config.codec.kind() != want - { - return false; - } - true - }); - } - catalog + fn update(&self, f: impl FnOnce(&mut FilterState)) { + // `write()` only errors when the producer is closed, which can't happen + // while `self` holds the only producer handle. + let Ok(mut state) = self.state.write() else { + return; + }; + f(&mut state); + state.epoch = state.epoch.wrapping_add(1); + // Mut::drop wakes the paired consumer waiters here. } } impl Stream for Filter { - fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { - if self.dirty { - self.dirty = false; - if let Some(snapshot) = self.last_input.clone() { - return Poll::Ready(Ok(Some(self.apply(snapshot)))); + fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + let inner_eof = loop { + match self.inner.poll_next(waiter)? { + Poll::Ready(Some(snapshot)) => { + self.last_input = Some(snapshot); + self.fresh_input = true; + } + Poll::Ready(None) => break true, + Poll::Pending => break false, } - } + }; - match self.inner.poll_next(waiter)? { - Poll::Ready(Some(snapshot)) => { - self.last_input = Some(snapshot.clone()); - Poll::Ready(Ok(Some(self.apply(snapshot)))) + let last_epoch = self.last_epoch; + let fresh_input = self.fresh_input; + let last_input = self.last_input.clone(); + + let polled = self.state_consumer.poll(waiter, |state| { + let filter_changed = state.epoch != last_epoch; + if !fresh_input && !filter_changed { + return Poll::Pending; + } + let Some(input) = last_input.clone() else { + return Poll::Pending; + }; + let emit = apply(input, state.video.as_ref(), state.audio.as_ref()); + Poll::Ready((emit, state.epoch)) + }); + + match polled { + Poll::Ready(Ok((emit, epoch))) => { + self.last_epoch = epoch; + self.fresh_input = false; + Poll::Ready(Ok(Some(emit))) + } + Poll::Ready(Err(_)) => Poll::Ready(Ok(None)), + Poll::Pending => { + if inner_eof && self.last_input.is_none() { + Poll::Ready(Ok(None)) + } else { + Poll::Pending + } } - Poll::Ready(None) => Poll::Ready(Ok(None)), - Poll::Pending => Poll::Pending, } } } +/// Apply the active video / audio filters to a raw snapshot, dropping +/// renditions that don't match. Axes with no filter pass through unchanged. +fn apply(mut catalog: Catalog, video: Option<&FilterVideo>, audio: Option<&FilterAudio>) -> Catalog { + if let Some(filter) = video { + catalog.video.renditions.retain(|name, config| { + if let Some(want) = &filter.name + && want != name + { + return false; + } + if let Some(want) = filter.codec + && config.codec.kind() != want + { + return false; + } + true + }); + } + if let Some(filter) = audio { + catalog.audio.renditions.retain(|name, config| { + if let Some(want) = &filter.name + && want != name + { + return false; + } + if let Some(want) = filter.codec + && config.codec.kind() != want + { + return false; + } + true + }); + } + catalog +} + #[cfg(test)] mod test { use std::collections::BTreeMap; @@ -132,7 +190,7 @@ mod test { struct Once(Option); impl Stream for Once { - fn poll_next(&mut self, _: &conducer::Waiter) -> Poll>> { + fn poll_next(&mut self, _: &conducer::Waiter) -> Poll>> { Poll::Ready(Ok(self.0.take())) } } diff --git a/rs/moq-mux/src/catalog/msf/consumer.rs b/rs/moq-mux/src/catalog/msf/consumer.rs index a61b3b31e..97cfa9c05 100644 --- a/rs/moq-mux/src/catalog/msf/consumer.rs +++ b/rs/moq-mux/src/catalog/msf/consumer.rs @@ -25,7 +25,7 @@ impl Consumer { } /// Poll for the next catalog update, returned as a [`hang::Catalog`]. - pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { // Drain pending groups, keeping only the newest. Remember whether the track is done // so we can distinguish "more groups may arrive" from "no more groups, ever". let track_finished = loop { @@ -40,9 +40,7 @@ impl Consumer { match group.poll_read_frame(waiter)? { Poll::Ready(Some(frame)) => { self.group = None; - let json = std::str::from_utf8(&frame).context("MSF catalog frame is not valid UTF-8")?; - let msf = moq_msf::Catalog::from_str(json).context("failed to parse MSF catalog frame")?; - let catalog = from_msf(&msf)?; + let catalog = decode_frame(&frame).map_err(crate::Error::Msf)?; return Poll::Ready(Ok(Some(catalog))); } Poll::Ready(None) => self.group = None, @@ -61,11 +59,20 @@ impl Consumer { /// /// Waits for the next MSF catalog publication and returns it converted to a /// [`hang::Catalog`]. Returns `None` when the track has ended with no further updates. - pub async fn next(&mut self) -> anyhow::Result> { + pub async fn next(&mut self) -> crate::Result> { conducer::wait(|waiter| self.poll_next(waiter)).await } } +/// Decode a single MSF catalog frame into a [`hang::Catalog`]. Internal error +/// chain stays as an [`anyhow::Error`] so the layered parsing contexts survive; +/// the caller wraps it in [`crate::Error::Msf`] at the boundary. +fn decode_frame(frame: &[u8]) -> anyhow::Result { + let json = std::str::from_utf8(frame).context("MSF catalog frame is not valid UTF-8")?; + let msf = moq_msf::Catalog::from_str(json).context("failed to parse MSF catalog frame")?; + from_msf(&msf) +} + impl From for Consumer { fn from(inner: moq_net::TrackConsumer) -> Self { Self::new(inner) diff --git a/rs/moq-mux/src/catalog/stream.rs b/rs/moq-mux/src/catalog/stream.rs index fd78645f9..0dd806577 100644 --- a/rs/moq-mux/src/catalog/stream.rs +++ b/rs/moq-mux/src/catalog/stream.rs @@ -21,10 +21,10 @@ use super::{Filter, Target}; /// Stream types are required to be `Send + 'static` so they can be moved /// across threads and held inside exporters without per-call bounds. pub trait Stream: Send + 'static { - fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>>; + fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>>; /// Wait for the next snapshot. - fn next(&mut self) -> impl std::future::Future>> + Send + fn next(&mut self) -> impl std::future::Future>> + Send where Self: Sized, { diff --git a/rs/moq-mux/src/catalog/target.rs b/rs/moq-mux/src/catalog/target.rs index 7fa3af57a..5f4a96706 100644 --- a/rs/moq-mux/src/catalog/target.rs +++ b/rs/moq-mux/src/catalog/target.rs @@ -33,85 +33,159 @@ pub struct TargetAudio { pub bitrate: Option, } +/// Shared state behind a [`Target`]. +/// +/// `epoch` advances on every setter so [`Target::poll_next`] can tell whether +/// the criteria changed since the last emit without diffing the structs. +#[derive(Debug, Default, Clone)] +struct TargetState { + video: Option, + audio: Option, + epoch: u64, +} + /// A [`Stream`] that picks one rendition per axis from the inner snapshot. +/// +/// Selection criteria live behind a [`conducer::Producer`], so calls to +/// [`set_video`](Self::set_video) / [`set_audio`](Self::set_audio) wake any +/// pending `poll_next` instead of silently waiting for the next upstream +/// snapshot. That makes the type usable as the foothold for bandwidth-driven +/// ABR retargeting. pub struct Target { inner: S, - video: Option, - audio: Option, + state: conducer::Producer, + state_consumer: conducer::Consumer, + /// Last raw snapshot from `inner`, retained so a target change between + /// snapshots can be re-applied without polling upstream. last_input: Option, - dirty: bool, + /// Epoch we already emitted against. If `state.epoch` advances past this + /// while `last_input` is `Some`, the next poll re-emits. + last_epoch: u64, + /// True once `inner` has handed us a snapshot we haven't emitted yet. + fresh_input: bool, } impl Target { pub fn new(inner: S) -> Self { + let state = conducer::Producer::new(TargetState::default()); + let state_consumer = state.consume(); Self { inner, - video: None, - audio: None, + state, + state_consumer, last_input: None, - dirty: false, + last_epoch: 0, + fresh_input: false, } } /// Set or clear the video target. Pass `None` to keep every rendition. pub fn set_video(&mut self, target: impl Into>) { - self.video = target.into(); - self.dirty = self.last_input.is_some(); + self.update(|s| s.video = target.into()); } /// Set or clear the audio target. Pass `None` to keep every rendition. pub fn set_audio(&mut self, target: impl Into>) { - self.audio = target.into(); - self.dirty = self.last_input.is_some(); + self.update(|s| s.audio = target.into()); + } + + fn update(&self, f: impl FnOnce(&mut TargetState)) { + // `write()` only errors when the producer is closed, which can't happen + // while `self` holds the only producer handle. + let Ok(mut state) = self.state.write() else { + return; + }; + f(&mut state); + state.epoch = state.epoch.wrapping_add(1); + // Mut::drop wakes the paired consumer waiters here. } +} + +impl Stream for Target { + fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + // Drain inner: the latest snapshot wins. `poll_next` registers the + // waiter on its own Pending branch. + let inner_eof = loop { + match self.inner.poll_next(waiter)? { + Poll::Ready(Some(snapshot)) => { + self.last_input = Some(snapshot); + self.fresh_input = true; + } + Poll::Ready(None) => break true, + Poll::Pending => break false, + } + }; + + // Snapshot the fields the inner closure needs so it can borrow them + // without colliding with the `&self.state_consumer` receiver. + let last_epoch = self.last_epoch; + let fresh_input = self.fresh_input; + let last_input = self.last_input.clone(); + + let polled = self.state_consumer.poll(waiter, |state| { + let target_changed = state.epoch != last_epoch; + if !fresh_input && !target_changed { + // Nothing new from inner and nothing new from caller: register + // the waiter on this consumer so the next setter wakes us. + return Poll::Pending; + } + let Some(input) = last_input.clone() else { + // Caller already retargeted, but no upstream snapshot yet to apply. + return Poll::Pending; + }; + let emit = apply(input, state.video.as_ref(), state.audio.as_ref()); + Poll::Ready((emit, state.epoch)) + }); - fn apply(&self, mut catalog: Catalog) -> Catalog { - if let Some(target) = &self.video - && let Some(name) = select_video(&catalog.video.renditions, target) - { + match polled { + Poll::Ready(Ok((emit, epoch))) => { + self.last_epoch = epoch; + self.fresh_input = false; + Poll::Ready(Ok(Some(emit))) + } + Poll::Ready(Err(_)) => { + // Producer dropped (impossible while Self holds it); treat as EOF. + Poll::Ready(Ok(None)) + } + Poll::Pending => { + if inner_eof && self.last_input.is_none() { + Poll::Ready(Ok(None)) + } else { + Poll::Pending + } + } + } + } +} + +/// Apply the active video / audio targets to a raw snapshot, narrowing each +/// axis to at most one rendition. Axes with no target pass through unchanged. +fn apply(mut catalog: Catalog, video: Option<&TargetVideo>, audio: Option<&TargetAudio>) -> Catalog { + if let Some(target) = video { + if let Some(name) = select_video(&catalog.video.renditions, target) { let mut kept = BTreeMap::new(); if let Some(config) = catalog.video.renditions.remove(&name) { kept.insert(name, config); } catalog.video.renditions = kept; - } else if self.video.is_some() { + } else { catalog.video.renditions.clear(); } + } - if let Some(target) = &self.audio - && let Some(name) = select_audio(&catalog.audio.renditions, target) - { + if let Some(target) = audio { + if let Some(name) = select_audio(&catalog.audio.renditions, target) { let mut kept = BTreeMap::new(); if let Some(config) = catalog.audio.renditions.remove(&name) { kept.insert(name, config); } catalog.audio.renditions = kept; - } else if self.audio.is_some() { + } else { catalog.audio.renditions.clear(); } - - catalog } -} -impl Stream for Target { - fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { - if self.dirty { - self.dirty = false; - if let Some(snapshot) = self.last_input.clone() { - return Poll::Ready(Ok(Some(self.apply(snapshot)))); - } - } - - match self.inner.poll_next(waiter)? { - Poll::Ready(Some(snapshot)) => { - self.last_input = Some(snapshot.clone()); - Poll::Ready(Ok(Some(self.apply(snapshot)))) - } - Poll::Ready(None) => Poll::Ready(Ok(None)), - Poll::Pending => Poll::Pending, - } - } + catalog } /// Run all active video rankings and return the highest-ranked rendition diff --git a/rs/moq-mux/src/error.rs b/rs/moq-mux/src/error.rs index 3961f89b6..a2cc16c64 100644 --- a/rs/moq-mux/src/error.rs +++ b/rs/moq-mux/src/error.rs @@ -21,6 +21,15 @@ pub enum Error { /// Error parsing or building LOC frames. #[error("loc: {0}")] Loc(#[from] moq_loc::Error), + + /// Error parsing or converting an MSF catalog snapshot. + /// + /// MSF parsing pulls together moq_msf JSON decoding, base64-decoded + /// init data, mp4_atom moov walking, and codec-specific config readers. + /// Each can fail in its own way; we wrap the resulting [`anyhow::Error`] + /// rather than enumerating every leaf type. + #[error("msf: {0}")] + Msf(anyhow::Error), } /// A Result type alias for moq-mux operations. From a87fb6b2fea1202c8f1b06570d84966154720936 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 24 May 2026 15:37:16 -0700 Subject: [PATCH 5/6] moq-mux: typed msf::Error instead of wrapping anyhow Replace `Error::Msf(anyhow::Error)` with a dedicated thiserror enum in `catalog::msf::error`. Each failure path the consumer can hit now lands in a typed variant: - transport (`Moq`, `Utf8`, `Json`, `Base64`, `Mp4`) - catalog conversion (`InvalidCodec`, `Schema`, `UnsupportedAudioPackaging`) - codec config blobs (`AudioConfig` carries `kind` tag for which blob) `Schema` is the bucket for invariants the consumer enforces on top of MSF (CMAF without init_data, audio without samplerate / channelConfig, missing codec). `AudioConfig.detail` still stringifies the inner `anyhow::Error` from `codec::aac::Config::parse` / `codec::opus::Config::parse` since those existing parsers return `anyhow`; switching them is a separate refactor. The variant boundary keeps the typed surface stable. `crate::Error::Msf` now wraps `catalog::msf::Error` via `#[from]`, and `catalog::Consumer` propagates the conversion through its `Stream` impl. Added `serde_json` direct dep (was transitive through `moq_msf`) for the `Json` variant. All msf consumer tests updated to match against the typed variants. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 1 + rs/moq-mux/Cargo.toml | 1 + rs/moq-mux/src/catalog/consumer.rs | 2 +- rs/moq-mux/src/catalog/msf/consumer.rs | 213 +++++++++++++------------ rs/moq-mux/src/catalog/msf/error.rs | 70 ++++++++ rs/moq-mux/src/catalog/msf/mod.rs | 2 + rs/moq-mux/src/error.rs | 7 +- 7 files changed, 185 insertions(+), 111 deletions(-) create mode 100644 rs/moq-mux/src/catalog/msf/error.rs diff --git a/Cargo.lock b/Cargo.lock index f985bb31d..da4fac0dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3764,6 +3764,7 @@ dependencies = [ "reqwest 0.12.28", "scuffle-av1", "scuffle-h265", + "serde_json", "thiserror 2.0.18", "tokio", "tracing", diff --git a/rs/moq-mux/Cargo.toml b/rs/moq-mux/Cargo.toml index e1e686d34..55419ec62 100644 --- a/rs/moq-mux/Cargo.toml +++ b/rs/moq-mux/Cargo.toml @@ -32,6 +32,7 @@ num_enum = "0.7" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "gzip"] } scuffle-av1 = { version = "0.1.4" } scuffle-h265 = { version = "0.2.2" } +serde_json = "1" thiserror = "2" tokio = { workspace = true, features = ["macros", "fs"] } tracing = "0.1" diff --git a/rs/moq-mux/src/catalog/consumer.rs b/rs/moq-mux/src/catalog/consumer.rs index 0968517bf..301536b3a 100644 --- a/rs/moq-mux/src/catalog/consumer.rs +++ b/rs/moq-mux/src/catalog/consumer.rs @@ -40,7 +40,7 @@ impl Stream for Consumer { fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { match self { Self::Hang(c) => c.poll_next(waiter), - Self::Msf(c) => c.poll_next(waiter), + Self::Msf(c) => c.poll_next(waiter).map_err(Into::into), } } } diff --git a/rs/moq-mux/src/catalog/msf/consumer.rs b/rs/moq-mux/src/catalog/msf/consumer.rs index 97cfa9c05..9b4d5d2ba 100644 --- a/rs/moq-mux/src/catalog/msf/consumer.rs +++ b/rs/moq-mux/src/catalog/msf/consumer.rs @@ -1,10 +1,12 @@ use std::str::FromStr; use std::task::Poll; -use anyhow::Context; use base64::Engine; +use bytes::Buf; use hang::catalog::{AudioCodec, AudioConfig, Container, VideoCodec, VideoConfig}; +use super::Error; + /// A consumer for the MSF catalog track. /// /// Mirrors [`crate::catalog::hang::Consumer`] but for the MSF (MOQT Streaming Format) catalog @@ -25,7 +27,7 @@ impl Consumer { } /// Poll for the next catalog update, returned as a [`hang::Catalog`]. - pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll, Error>> { // Drain pending groups, keeping only the newest. Remember whether the track is done // so we can distinguish "more groups may arrive" from "no more groups, ever". let track_finished = loop { @@ -40,7 +42,7 @@ impl Consumer { match group.poll_read_frame(waiter)? { Poll::Ready(Some(frame)) => { self.group = None; - let catalog = decode_frame(&frame).map_err(crate::Error::Msf)?; + let catalog = decode_frame(&frame)?; return Poll::Ready(Ok(Some(catalog))); } Poll::Ready(None) => self.group = None, @@ -59,26 +61,24 @@ impl Consumer { /// /// Waits for the next MSF catalog publication and returns it converted to a /// [`hang::Catalog`]. Returns `None` when the track has ended with no further updates. - pub async fn next(&mut self) -> crate::Result> { + pub async fn next(&mut self) -> Result, Error> { conducer::wait(|waiter| self.poll_next(waiter)).await } } -/// Decode a single MSF catalog frame into a [`hang::Catalog`]. Internal error -/// chain stays as an [`anyhow::Error`] so the layered parsing contexts survive; -/// the caller wraps it in [`crate::Error::Msf`] at the boundary. -fn decode_frame(frame: &[u8]) -> anyhow::Result { - let json = std::str::from_utf8(frame).context("MSF catalog frame is not valid UTF-8")?; - let msf = moq_msf::Catalog::from_str(json).context("failed to parse MSF catalog frame")?; - from_msf(&msf) -} - impl From for Consumer { fn from(inner: moq_net::TrackConsumer) -> Self { Self::new(inner) } } +/// Decode a single MSF catalog frame into a [`hang::Catalog`]. +fn decode_frame(frame: &[u8]) -> Result { + let json = std::str::from_utf8(frame)?; + let msf = moq_msf::Catalog::from_str(json)?; + from_msf(&msf) +} + /// Convert an MSF catalog to a hang catalog. /// /// Each MSF track is mapped onto a `hang::Catalog` rendition based on its [`moq_msf::Role`]: @@ -93,7 +93,7 @@ impl From for Consumer { /// /// Fields with no representation in `hang::Catalog` (`is_live`, `render_group`, `alt_group`, /// `max_grp_sap_starting_type`, `max_obj_sap_starting_type`) are dropped. -pub(crate) fn from_msf(msf: &moq_msf::Catalog) -> anyhow::Result { +pub(crate) fn from_msf(msf: &moq_msf::Catalog) -> Result { let mut catalog = hang::Catalog::default(); for track in &msf.tracks { @@ -146,13 +146,15 @@ pub(crate) fn from_msf(msf: &moq_msf::Catalog) -> anyhow::Result /// Returns `Err` when a CMAF track is missing or has malformed `init_data`. This is an /// intentional hard error: a CMAF rendition is unusable without its `ftyp+moov` init /// segment, and silently skipping it would mask a publisher bug. -fn container_from_msf(track: &moq_msf::Track) -> anyhow::Result> { +fn container_from_msf(track: &moq_msf::Track) -> Result, Error> { match &track.packaging { // Both LOC and Legacy represent raw payloads without ISO-BMFF boxing. moq_msf::Packaging::Loc | moq_msf::Packaging::Legacy => Ok(Some(Container::Legacy)), moq_msf::Packaging::Cmaf => { - let init = decode_init_data(track)? - .with_context(|| format!("MSF CMAF track {:?} missing init_data", track.name))?; + let init = decode_init_data(track)?.ok_or_else(|| Error::Schema { + track: track.name.clone(), + reason: "CMAF track is missing init_data", + })?; #[allow(deprecated)] Ok(Some(Container::Cmaf { init, @@ -172,7 +174,7 @@ fn container_from_msf(track: &moq_msf::Track) -> anyhow::Result anyhow::Result> { +fn decode_init_data(track: &moq_msf::Track) -> Result, Error> { track .init_data .as_ref() @@ -180,7 +182,10 @@ fn decode_init_data(track: &moq_msf::Track) -> anyhow::Result anyhow::Result anyhow::Result> { +fn legacy_description(track: &moq_msf::Track) -> Result, Error> { match track.packaging { moq_msf::Packaging::Loc | moq_msf::Packaging::Legacy => decode_init_data(track), _ => Ok(None), } } -fn video_config_from_msf(track: &moq_msf::Track) -> anyhow::Result> { +fn video_config_from_msf(track: &moq_msf::Track) -> Result, Error> { // Unsupported packaging (e.g. MediaTimeline) bubbles up as Ok(None) so the caller can // skip the track with a warning rather than fail the whole catalog. let Some(container) = container_from_msf(track)? else { return Ok(None); }; - let codec_str = track - .codec - .as_deref() - .with_context(|| format!("MSF video track {:?} missing codec", track.name))?; + let codec_str = track.codec.as_deref().ok_or_else(|| Error::Schema { + track: track.name.clone(), + reason: "video track is missing the codec field", + })?; // VideoCodec::from_str returns Ok(VideoCodec::Unknown(s)) for codecs it doesn't know, // so this only fails for malformed structured codec strings (avc1.xxx, hvc1.xxx, etc.). - let codec = VideoCodec::from_str(codec_str) - .with_context(|| format!("MSF video track {:?} has invalid codec {codec_str:?}", track.name))?; + let codec = VideoCodec::from_str(codec_str).map_err(|source| Error::InvalidCodec { + track: track.name.clone(), + codec: codec_str.to_string(), + source, + })?; let mut config = VideoConfig::new(codec); config.description = legacy_description(track)?; @@ -230,17 +238,20 @@ fn video_config_from_msf(track: &moq_msf::Track) -> anyhow::Result anyhow::Result> { +fn audio_config_from_msf(track: &moq_msf::Track) -> Result, Error> { let Some(container) = container_from_msf(track)? else { return Ok(None); }; - let codec_str = track - .codec - .as_deref() - .with_context(|| format!("MSF audio track {:?} missing codec", track.name))?; - let codec = AudioCodec::from_str(codec_str) - .with_context(|| format!("MSF audio track {:?} has invalid codec {codec_str:?}", track.name))?; + let codec_str = track.codec.as_deref().ok_or_else(|| Error::Schema { + track: track.name.clone(), + reason: "audio track is missing the codec field", + })?; + let codec = AudioCodec::from_str(codec_str).map_err(|source| Error::InvalidCodec { + track: track.name.clone(), + codec: codec_str.to_string(), + source, + })?; // MSF leaves samplerate and channelConfig optional, but hang requires both. Trust the // explicit fields when present; otherwise parse the codec init data (AAC @@ -290,22 +301,19 @@ struct DerivedAudio { /// Returns an error if `init_data` is absent, malformed, or doesn't carry usable audio /// parameters. The caller is expected to surface this as a hard failure rather than /// substitute defaults: a wrong sample rate produces silent or distorted playback. -fn derive_audio_params(track: &moq_msf::Track, codec: &AudioCodec) -> anyhow::Result { - let init = decode_init_data(track)?.with_context(|| { - format!( - "MSF audio track {:?} omits samplerate/channelConfig and has no init_data to derive from", - track.name - ) +fn derive_audio_params(track: &moq_msf::Track, codec: &AudioCodec) -> Result { + let init = decode_init_data(track)?.ok_or_else(|| Error::Schema { + track: track.name.clone(), + reason: "audio track omits samplerate/channelConfig and has no init_data to derive from", })?; match track.packaging { moq_msf::Packaging::Loc | moq_msf::Packaging::Legacy => derive_from_codec_config(track, codec, init), moq_msf::Packaging::Cmaf => derive_from_cmaf_moov(track, init), - _ => anyhow::bail!( - "MSF audio track {:?} packaging {:?} is unsupported for parameter derivation", - track.name, - track.packaging - ), + _ => Err(Error::UnsupportedAudioPackaging { + track: track.name.clone(), + packaging: track.packaging.to_string(), + }), } } @@ -313,58 +321,66 @@ fn derive_from_codec_config( track: &moq_msf::Track, codec: &AudioCodec, init: bytes::Bytes, -) -> anyhow::Result { - use bytes::Buf; +) -> Result { let mut buf = init; - match codec { + let (kind, sample_rate, channel_count) = match codec { AudioCodec::AAC(_) => { - let cfg = crate::codec::aac::Config::parse(&mut buf) - .with_context(|| format!("MSF audio track {:?} has malformed AudioSpecificConfig", track.name))?; - anyhow::ensure!( - !buf.has_remaining(), - "MSF audio track {:?} AudioSpecificConfig has trailing bytes", - track.name, - ); - Ok(DerivedAudio { - sample_rate: cfg.sample_rate, - channel_count: cfg.channel_count, - }) + let cfg = crate::codec::aac::Config::parse(&mut buf).map_err(|e| Error::AudioConfig { + track: track.name.clone(), + kind: "AudioSpecificConfig", + detail: format!("{e:#}"), + })?; + ("AudioSpecificConfig", cfg.sample_rate, cfg.channel_count) } AudioCodec::Opus => { - let cfg = crate::codec::opus::Config::parse(&mut buf) - .with_context(|| format!("MSF audio track {:?} has malformed OpusHead", track.name))?; - anyhow::ensure!( - !buf.has_remaining(), - "MSF audio track {:?} OpusHead has trailing bytes", - track.name, - ); - Ok(DerivedAudio { - sample_rate: cfg.sample_rate, - channel_count: cfg.channel_count, - }) + let cfg = crate::codec::opus::Config::parse(&mut buf).map_err(|e| Error::AudioConfig { + track: track.name.clone(), + kind: "OpusHead", + detail: format!("{e:#}"), + })?; + ("OpusHead", cfg.sample_rate, cfg.channel_count) + } + _ => { + return Err(Error::AudioConfig { + track: track.name.clone(), + kind: "init_data", + detail: format!("codec {codec:?} has no init_data parser"), + }); } - _ => anyhow::bail!( - "MSF audio track {:?} omits samplerate/channelConfig; codec {:?} has no init_data parser", - track.name, - codec, - ), + }; + + if buf.has_remaining() { + return Err(Error::AudioConfig { + track: track.name.clone(), + kind, + detail: "trailing bytes after config".to_string(), + }); } + + Ok(DerivedAudio { + sample_rate, + channel_count, + }) } -fn derive_from_cmaf_moov(track: &moq_msf::Track, init: bytes::Bytes) -> anyhow::Result { +fn derive_from_cmaf_moov(track: &moq_msf::Track, init: bytes::Bytes) -> Result { use mp4_atom::{Any, DecodeMaybe}; let mut cursor = std::io::Cursor::new(init.as_ref()); let mut moov: Option = None; - while let Some(atom) = mp4_atom::Any::decode_maybe(&mut cursor) - .with_context(|| format!("MSF audio track {:?} init segment is malformed", track.name))? - { + while let Some(atom) = mp4_atom::Any::decode_maybe(&mut cursor).map_err(|source| Error::Mp4 { + track: track.name.clone(), + source, + })? { if let Any::Moov(m) = atom { moov = Some(m); break; } } - let moov = moov.with_context(|| format!("MSF audio track {:?} init segment missing moov", track.name))?; + let moov = moov.ok_or_else(|| Error::Schema { + track: track.name.clone(), + reason: "CMAF init segment is missing moov", + })?; // Walk every trak looking for an audio sample entry. A single-track audio init is // the only thing we expect here, but rather than enforce that we just take the first @@ -390,10 +406,10 @@ fn derive_from_cmaf_moov(track: &moq_msf::Track, init: bytes::Bytes) -> anyhow:: } } } - anyhow::bail!( - "MSF audio track {:?} CMAF init has no audio sample entry to derive samplerate/channelConfig from", - track.name, - ) + Err(Error::Schema { + track: track.name.clone(), + reason: "CMAF init segment has no audio sample entry to derive samplerate/channelConfig", + }) } #[cfg(test)] @@ -517,11 +533,7 @@ mod test { tracks: vec![track], }; let err = from_msf(&msf).expect_err("malformed base64 should error"); - assert!( - err.to_string().contains("malformed init_data"), - "unexpected error: {}", - err - ); + assert!(matches!(err, Error::Base64 { .. }), "unexpected error: {err:?}"); } #[test] @@ -546,8 +558,7 @@ mod test { }; let err = from_msf(&msf).expect_err("CMAF without init_data must error"); - let msg = format!("{err:#}"); - assert!(msg.contains("init_data"), "expected init_data in error, got: {msg}"); + assert!(matches!(err, Error::Schema { .. }), "unexpected error: {err:?}"); } #[test] @@ -604,7 +615,7 @@ mod test { }; let err = from_msf(&msf).expect_err("missing fields with no init_data should error"); - assert!(err.to_string().contains("no init_data"), "unexpected error: {}", err); + assert!(matches!(err, Error::Schema { .. }), "unexpected error: {err:?}"); } #[test] @@ -750,11 +761,7 @@ mod test { }; let err = from_msf(&msf).expect_err("missing video codec must error"); - let msg = format!("{err:#}"); - assert!( - msg.contains("missing codec"), - "expected 'missing codec' in error, got: {msg}" - ); + assert!(matches!(err, Error::Schema { .. }), "unexpected error: {err:?}"); } #[test] @@ -767,11 +774,7 @@ mod test { }; let err = from_msf(&msf).expect_err("missing audio codec must error"); - let msg = format!("{err:#}"); - assert!( - msg.contains("missing codec"), - "expected 'missing codec' in error, got: {msg}" - ); + assert!(matches!(err, Error::Schema { .. }), "unexpected error: {err:?}"); } #[test] @@ -785,7 +788,9 @@ mod test { }; let err = from_msf(&msf).expect_err("malformed avc1 codec must error"); - let msg = format!("{err:#}"); - assert!(msg.contains("avc1.0"), "expected codec string in error, got: {msg}"); + match err { + Error::InvalidCodec { codec, .. } => assert_eq!(codec, "avc1.0"), + other => panic!("expected InvalidCodec, got {other:?}"), + } } } diff --git a/rs/moq-mux/src/catalog/msf/error.rs b/rs/moq-mux/src/catalog/msf/error.rs new file mode 100644 index 000000000..3c17a6d83 --- /dev/null +++ b/rs/moq-mux/src/catalog/msf/error.rs @@ -0,0 +1,70 @@ +//! Typed errors from the MSF catalog consumer. + +/// Reasons an [`super::Consumer::poll_next`] can fail. +/// +/// MSF catalog parsing flows through several layers (transport, JSON, base64, +/// ISO-BMFF, codec-specific config blobs) and any of them can surface a +/// malformed payload. Variants are grouped by the layer that produced the +/// failure, with the originating track name carried alongside when known. +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum Error { + /// Transport-layer error from the underlying track. + #[error("moq: {0}")] + Moq(#[from] moq_net::Error), + + /// Catalog frame was not valid UTF-8. + #[error("catalog frame is not valid UTF-8: {0}")] + Utf8(#[from] std::str::Utf8Error), + + /// Catalog JSON did not parse. + #[error("catalog JSON parse failed: {0}")] + Json(#[from] serde_json::Error), + + /// `init_data` bytes were not valid base64. + #[error("track {track:?} has malformed init_data: {source}")] + Base64 { + track: String, + #[source] + source: base64::DecodeError, + }, + + /// CMAF init bytes did not decode as ISO-BMFF. + #[error("CMAF track {track:?} init segment is malformed: {source}")] + Mp4 { + track: String, + #[source] + source: mp4_atom::Error, + }, + + /// Codec string was syntactically invalid. + #[error("track {track:?} has invalid codec {codec:?}: {source}")] + InvalidCodec { + track: String, + codec: String, + #[source] + source: hang::Error, + }, + + /// MSF catalog violates a schema invariant the consumer enforces (missing + /// codec, CMAF without init_data, audio without samplerate / channelConfig + /// and no init_data to derive from, etc.). `reason` is a static description + /// of which invariant was violated. + #[error("track {track:?}: {reason}")] + Schema { track: String, reason: &'static str }, + + /// Audio packaging is unsupported for parameter derivation (samplerate / + /// channelConfig must be supplied explicitly for these tracks). + #[error("audio track {track:?} packaging {packaging:?} is unsupported for parameter derivation")] + UnsupportedAudioPackaging { track: String, packaging: String }, + + /// Codec-specific audio config blob (AAC `AudioSpecificConfig`, Opus + /// `OpusHead`, CMAF audio sample entry) failed to parse. `kind` tags which + /// kind of config was being read; `detail` carries the parser's message. + #[error("audio track {track:?}: {kind}: {detail}")] + AudioConfig { + track: String, + kind: &'static str, + detail: String, + }, +} diff --git a/rs/moq-mux/src/catalog/msf/mod.rs b/rs/moq-mux/src/catalog/msf/mod.rs index 595e19cd3..5dbcd7f32 100644 --- a/rs/moq-mux/src/catalog/msf/mod.rs +++ b/rs/moq-mux/src/catalog/msf/mod.rs @@ -7,5 +7,7 @@ //! producer's job (it writes both tracks). mod consumer; +mod error; pub use consumer::Consumer; +pub use error::Error; diff --git a/rs/moq-mux/src/error.rs b/rs/moq-mux/src/error.rs index a2cc16c64..98915a5ea 100644 --- a/rs/moq-mux/src/error.rs +++ b/rs/moq-mux/src/error.rs @@ -23,13 +23,8 @@ pub enum Error { Loc(#[from] moq_loc::Error), /// Error parsing or converting an MSF catalog snapshot. - /// - /// MSF parsing pulls together moq_msf JSON decoding, base64-decoded - /// init data, mp4_atom moov walking, and codec-specific config readers. - /// Each can fail in its own way; we wrap the resulting [`anyhow::Error`] - /// rather than enumerating every leaf type. #[error("msf: {0}")] - Msf(anyhow::Error), + Msf(#[from] crate::catalog::msf::Error), } /// A Result type alias for moq-mux operations. From 945c87522bb24578ee690b9829afdffa51a4f316 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Thu, 28 May 2026 14:14:48 -0700 Subject: [PATCH 6/6] moq-mux: drop unused serde_json dep after merge The MSF catalog error type from dev parses frames without serde_json's error conversion, so the dependency is no longer referenced. Co-Authored-By: Claude Opus 4.7 --- Cargo.lock | 1 - rs/moq-mux/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 73daaac35..99a85bf3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3756,7 +3756,6 @@ dependencies = [ "reqwest 0.12.28", "scuffle-av1", "scuffle-h265", - "serde_json", "thiserror 2.0.18", "tokio", "tracing", diff --git a/rs/moq-mux/Cargo.toml b/rs/moq-mux/Cargo.toml index efad616d9..2de859118 100644 --- a/rs/moq-mux/Cargo.toml +++ b/rs/moq-mux/Cargo.toml @@ -31,7 +31,6 @@ num_enum = "0.7" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "gzip"] } scuffle-av1 = { version = "0.1.4" } scuffle-h265 = { version = "0.2.2" } -serde_json = "1" thiserror = "2" tokio = { workspace = true, features = ["macros", "fs"] } tracing = "0.1"