diff --git a/rs/hang/src/catalog/audio/codec.rs b/rs/hang/src/catalog/audio/codec.rs index fb6309e81..8f303a36a 100644 --- a/rs/hang/src/catalog/audio/codec.rs +++ b/rs/hang/src/catalog/audio/codec.rs @@ -20,6 +20,26 @@ pub enum AudioCodec { Unknown(String), } +/// Coarse audio codec family, used for tag-only matching. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +#[non_exhaustive] +pub enum AudioCodecKind { + AAC, + Opus, + Unknown, +} + +impl AudioCodec { + /// Return the coarse codec family for tag-only matching. + pub fn kind(&self) -> AudioCodecKind { + match self { + Self::AAC(_) => AudioCodecKind::AAC, + Self::Opus => AudioCodecKind::Opus, + Self::Unknown(_) => AudioCodecKind::Unknown, + } + } +} + impl FromStr for AudioCodec { type Err = Error; diff --git a/rs/hang/src/catalog/video/codec.rs b/rs/hang/src/catalog/video/codec.rs index 76eb228c9..8fa206b8e 100644 --- a/rs/hang/src/catalog/video/codec.rs +++ b/rs/hang/src/catalog/video/codec.rs @@ -29,6 +29,32 @@ pub enum VideoCodec { Unknown(String), } +/// Coarse video codec family, used for tag-only matching. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +#[non_exhaustive] +pub enum VideoCodecKind { + H264, + H265, + VP8, + VP9, + AV1, + Unknown, +} + +impl VideoCodec { + /// Return the coarse codec family for tag-only matching. + pub fn kind(&self) -> VideoCodecKind { + match self { + Self::H264(_) => VideoCodecKind::H264, + Self::H265(_) => VideoCodecKind::H265, + Self::VP9(_) => VideoCodecKind::VP9, + Self::AV1(_) => VideoCodecKind::AV1, + Self::VP8 => VideoCodecKind::VP8, + Self::Unknown(_) => VideoCodecKind::Unknown, + } + } +} + impl FromStr for VideoCodec { type Err = Error; diff --git a/rs/moq-cli/src/subscribe.rs b/rs/moq-cli/src/subscribe.rs index e1c0804cf..27111611f 100644 --- a/rs/moq-cli/src/subscribe.rs +++ b/rs/moq-cli/src/subscribe.rs @@ -1,14 +1,19 @@ use std::time::Duration; use clap::ValueEnum; +use hang::catalog::{AudioCodecKind, VideoCodecKind}; use hang::moq_net; -use moq_mux::catalog::CatalogFormat; +use moq_mux::catalog::{self, CatalogFormat, FilterAudio, FilterVideo, Stream, TargetAudio, TargetVideo}; use tokio::io::AsyncWriteExt; #[derive(ValueEnum, Clone, Copy)] pub enum SubscribeFormat { Fmp4, Mkv, + /// H.264 Annex-B elementary stream (no container). + H264, + /// H.265 Annex-B elementary stream (no container). + H265, } /// `clap` adapter for [`CatalogFormat`] (which is `#[non_exhaustive]` and so @@ -28,6 +33,44 @@ impl From for CatalogFormat { } } +/// `clap` adapter for [`VideoCodecKind`]. +#[derive(ValueEnum, Clone, Copy)] +pub enum VideoCodecArg { + H264, + H265, + Vp8, + Vp9, + Av1, +} + +impl From for VideoCodecKind { + fn from(value: VideoCodecArg) -> Self { + match value { + VideoCodecArg::H264 => Self::H264, + VideoCodecArg::H265 => Self::H265, + VideoCodecArg::Vp8 => Self::VP8, + VideoCodecArg::Vp9 => Self::VP9, + VideoCodecArg::Av1 => Self::AV1, + } + } +} + +/// `clap` adapter for [`AudioCodecKind`]. +#[derive(ValueEnum, Clone, Copy)] +pub enum AudioCodecArg { + Aac, + Opus, +} + +impl From for AudioCodecKind { + fn from(value: AudioCodecArg) -> Self { + match value { + AudioCodecArg::Aac => Self::AAC, + AudioCodecArg::Opus => Self::Opus, + } + } +} + #[derive(clap::Args, Clone)] pub struct SubscribeArgs { /// The format to write to stdout. @@ -52,6 +95,42 @@ pub struct SubscribeArgs { /// (`.hang` -> hang, `.msf` -> msf), falling back to hang. #[arg(long)] pub catalog: Option, + + /// Pick the video rendition with this exact name. + #[arg(long)] + pub video_name: Option, + + /// Keep only video renditions whose codec family matches. + #[arg(long)] + pub video_codec: Option, + + /// Prefer a video rendition no wider than this (px). + #[arg(long)] + pub video_width_max: Option, + + /// Prefer a video rendition no taller than this (px). + #[arg(long)] + pub video_height_max: Option, + + /// Prefer a video rendition with at most this many pixels (`coded_width * coded_height`). + #[arg(long)] + pub video_pixels_max: Option, + + /// Prefer a video rendition under this bitrate (bits per second). + #[arg(long)] + pub video_bitrate_max: Option, + + /// Pick the audio rendition with this exact name. + #[arg(long)] + pub audio_name: Option, + + /// Keep only audio renditions whose codec family matches. + #[arg(long)] + pub audio_codec: Option, + + /// Prefer an audio rendition under this bitrate (bits per second). + #[arg(long)] + pub audio_bitrate_max: Option, } impl SubscribeArgs { @@ -63,6 +142,72 @@ impl SubscribeArgs { .or_else(|| CatalogFormat::detect(broadcast)) .unwrap_or_default() } + + /// Codec implied by the output format. `--format h264` / `--format h265` + /// each force a single codec family; container formats leave it open. + fn format_codec(&self) -> Option { + match self.format { + SubscribeFormat::H264 => Some(VideoCodecKind::H264), + SubscribeFormat::H265 => Some(VideoCodecKind::H265), + SubscribeFormat::Fmp4 | SubscribeFormat::Mkv => None, + } + } + + /// Build a video filter from the parsed flags, plus any codec defaulted by + /// the chosen output format (e.g. `--format h264` implies `codec = H264`). + /// + /// Errors if `--video-codec` contradicts the format-implied codec — fail + /// fast in the CLI rather than later in the exporter. + fn filter_video(&self) -> anyhow::Result> { + let user_codec = self.video_codec.map(VideoCodecKind::from); + let codec = match (self.format_codec(), user_codec) { + (Some(fmt), Some(user)) if fmt != user => { + anyhow::bail!( + "--format implies video codec {fmt:?}, but --video-codec {user:?} was passed; \ + remove --video-codec or pick a matching format" + ); + } + (Some(fmt), _) => Some(fmt), + (None, user) => user, + }; + if self.video_name.is_none() && codec.is_none() { + return Ok(None); + } + Ok(Some(FilterVideo { + name: self.video_name.clone(), + codec, + })) + } + + fn filter_audio(&self) -> Option { + if self.audio_name.is_none() && self.audio_codec.is_none() { + return None; + } + Some(FilterAudio { + name: self.audio_name.clone(), + codec: self.audio_codec.map(Into::into), + }) + } + + fn target_video(&self) -> Option { + if self.video_width_max.is_none() + && self.video_height_max.is_none() + && self.video_pixels_max.is_none() + && self.video_bitrate_max.is_none() + { + return None; + } + Some(TargetVideo { + width: self.video_width_max, + height: self.video_height_max, + pixels: self.video_pixels_max, + bitrate: self.video_bitrate_max, + }) + } + + fn target_audio(&self) -> Option { + self.audio_bitrate_max.map(|b| TargetAudio { bitrate: Some(b) }) + } } pub struct Subscribe { @@ -80,20 +225,35 @@ impl Subscribe { } } + /// Build the catalog stream from the configured filter/target flags. + fn stream(&self) -> anyhow::Result>> { + let consumer = catalog::Consumer::new(&self.broadcast, self.catalog)?; + + let mut filter = consumer.filter(); + filter.set_video(self.args.filter_video()?); + filter.set_audio(self.args.filter_audio()); + + let mut target = filter.target(); + target.set_video(self.args.target_video()); + target.set_audio(self.args.target_audio()); + + Ok(target) + } + pub async fn run(self) -> anyhow::Result<()> { match self.args.format { SubscribeFormat::Fmp4 => self.run_fmp4().await, SubscribeFormat::Mkv => self.run_mkv().await, + SubscribeFormat::H264 => self.run_h264().await, + SubscribeFormat::H265 => self.run_h265().await, } } async fn run_fmp4(self) -> anyhow::Result<()> { let mut stdout = tokio::io::stdout(); - // Fmp4 subscribes to the catalog internally, builds the merged init segment - // from the first catalog snapshot, then yields moof+mdat fragments in - // timestamp order across tracks. - let mut fmp4 = moq_mux::container::fmp4::Export::with_catalog_format(self.broadcast, self.catalog)? + let stream = self.stream()?; + let mut fmp4 = moq_mux::container::fmp4::Export::new(self.broadcast.clone(), stream) .with_latency(self.args.max_latency) .with_fragment_duration(self.args.fragment_duration); @@ -108,10 +268,8 @@ impl Subscribe { async fn run_mkv(self) -> anyhow::Result<()> { let mut stdout = tokio::io::stdout(); - // Mkv writes EBML + an unknown-size Segment header, then per-fragment - // Cluster elements. Avc3/Hev1 sources are transcoded to avc1/hvc1 - // shape internally (synthesizing avcC/hvcC from inline parameter sets). - let mut mkv = moq_mux::container::mkv::Export::with_catalog_format(self.broadcast, self.catalog)? + let stream = self.stream()?; + let mut mkv = moq_mux::container::mkv::Export::new(self.broadcast.clone(), stream) .with_latency(self.args.max_latency) .with_fragment_duration(self.args.fragment_duration); @@ -122,4 +280,34 @@ impl Subscribe { Ok(()) } + + async fn run_h264(self) -> anyhow::Result<()> { + let mut stdout = tokio::io::stdout(); + + let stream = self.stream()?; + let mut h264 = + moq_mux::codec::h264::Export::new(self.broadcast.clone(), stream).with_latency(self.args.max_latency); + + while let Some(chunk) = h264.next().await? { + stdout.write_all(&chunk).await?; + stdout.flush().await?; + } + + Ok(()) + } + + async fn run_h265(self) -> anyhow::Result<()> { + let mut stdout = tokio::io::stdout(); + + let stream = self.stream()?; + let mut h265 = + moq_mux::codec::h265::Export::new(self.broadcast.clone(), stream).with_latency(self.args.max_latency); + + while let Some(chunk) = h265.next().await? { + stdout.write_all(&chunk).await?; + stdout.flush().await?; + } + + Ok(()) + } } diff --git a/rs/moq-mux/src/catalog/consumer.rs b/rs/moq-mux/src/catalog/consumer.rs new file mode 100644 index 000000000..301536b3a --- /dev/null +++ b/rs/moq-mux/src/catalog/consumer.rs @@ -0,0 +1,46 @@ +//! Unified catalog consumer. +//! +//! Subscribes to whichever catalog track ([`hang`] or [`msf`]) the broadcast +//! advertises and yields [`hang::Catalog`] snapshots so callers and exporters +//! only deal with one shape. + +use std::task::Poll; + +use hang::Catalog; + +use super::{CatalogFormat, Stream}; + +/// A catalog stream sourced from a [`moq_net::BroadcastConsumer`]. +/// +/// Both variants emit [`hang::Catalog`]; the MSF variant converts each snapshot +/// on the fly. Wrap with [`Filter`](super::Filter) / [`Target`](super::Target) +/// to narrow the rendition set before handing the stream to an exporter. +pub enum Consumer { + Hang(super::hang::Consumer), + Msf(super::msf::Consumer), +} + +impl Consumer { + /// Subscribe to the catalog track advertised by `format`. + pub fn new(broadcast: &moq_net::BroadcastConsumer, format: CatalogFormat) -> Result { + Ok(match format { + CatalogFormat::Hang => { + let track = broadcast.subscribe_track(&hang::Catalog::default_track())?; + Self::Hang(super::hang::Consumer::new(track)) + } + CatalogFormat::Msf => { + let track = broadcast.subscribe_track(&moq_net::Track::new(moq_msf::DEFAULT_NAME))?; + Self::Msf(super::msf::Consumer::new(track)) + } + }) + } +} + +impl Stream for Consumer { + fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + match self { + Self::Hang(c) => c.poll_next(waiter), + Self::Msf(c) => c.poll_next(waiter).map_err(Into::into), + } + } +} diff --git a/rs/moq-mux/src/catalog/filter.rs b/rs/moq-mux/src/catalog/filter.rs new file mode 100644 index 000000000..00596d0af --- /dev/null +++ b/rs/moq-mux/src/catalog/filter.rs @@ -0,0 +1,309 @@ +//! Hard-match rendition filter. +//! +//! [`Filter`] wraps any [`Stream`] and drops renditions that don't satisfy a +//! [`FilterVideo`] / [`FilterAudio`]. Matching is exact: a `name` constraint +//! keeps only the rendition with that key, a `codec` constraint keeps only +//! renditions whose codec family matches. Multiple constraints intersect. + +use std::task::Poll; + +use hang::Catalog; +use hang::catalog::{AudioCodecKind, VideoCodecKind}; + +use super::Stream; + +/// Hard-match criteria for video renditions. +#[derive(Debug, Default, Clone)] +pub struct FilterVideo { + /// Keep only the rendition with this exact name. + pub name: Option, + /// Keep only renditions whose codec family matches. + pub codec: Option, +} + +/// Hard-match criteria for audio renditions. +#[derive(Debug, Default, Clone)] +pub struct FilterAudio { + /// Keep only the rendition with this exact name. + pub name: Option, + /// Keep only renditions whose codec family matches. + pub codec: Option, +} + +/// Shared state behind a [`Filter`]. +/// +/// `epoch` advances on every setter so [`Filter::poll_next`] can tell whether +/// the criteria changed since the last emit. +#[derive(Debug, Default, Clone)] +struct FilterState { + video: Option, + audio: Option, + epoch: u64, +} + +/// A [`Stream`] that drops renditions failing a [`FilterVideo`] / [`FilterAudio`]. +/// +/// Selection criteria live behind a [`conducer::Producer`], so calls to +/// [`set_video`](Self::set_video) / [`set_audio`](Self::set_audio) wake any +/// pending `poll_next` instead of silently waiting for the next upstream +/// snapshot. +pub struct Filter { + inner: S, + state: conducer::Producer, + state_consumer: conducer::Consumer, + /// Last raw snapshot from `inner`, retained so a setter between snapshots + /// can re-apply without polling upstream. + last_input: Option, + /// Epoch we already emitted against. + last_epoch: u64, + /// True once `inner` has handed us a snapshot we haven't emitted yet. + fresh_input: bool, +} + +impl Filter { + pub fn new(inner: S) -> Self { + let state = conducer::Producer::new(FilterState::default()); + let state_consumer = state.consume(); + Self { + inner, + state, + state_consumer, + last_input: None, + last_epoch: 0, + fresh_input: false, + } + } + + /// Set or clear the video filter. Pass `None` to clear. + pub fn set_video(&mut self, filter: impl Into>) { + self.update(|s| s.video = filter.into()); + } + + /// Set or clear the audio filter. Pass `None` to clear. + pub fn set_audio(&mut self, filter: impl Into>) { + self.update(|s| s.audio = filter.into()); + } + + fn update(&self, f: impl FnOnce(&mut FilterState)) { + // `write()` only errors when the producer is closed, which can't happen + // while `self` holds the only producer handle. + let Ok(mut state) = self.state.write() else { + return; + }; + f(&mut state); + state.epoch = state.epoch.wrapping_add(1); + // Mut::drop wakes the paired consumer waiters here. + } +} + +impl Stream for Filter { + fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + let inner_eof = loop { + match self.inner.poll_next(waiter)? { + Poll::Ready(Some(snapshot)) => { + self.last_input = Some(snapshot); + self.fresh_input = true; + } + Poll::Ready(None) => break true, + Poll::Pending => break false, + } + }; + + let last_epoch = self.last_epoch; + let fresh_input = self.fresh_input; + let last_input = self.last_input.clone(); + + let polled = self.state_consumer.poll(waiter, |state| { + let filter_changed = state.epoch != last_epoch; + if !fresh_input && !filter_changed { + return Poll::Pending; + } + let Some(input) = last_input.clone() else { + return Poll::Pending; + }; + let emit = apply(input, state.video.as_ref(), state.audio.as_ref()); + Poll::Ready((emit, state.epoch)) + }); + + match polled { + Poll::Ready(Ok((emit, epoch))) => { + self.last_epoch = epoch; + self.fresh_input = false; + Poll::Ready(Ok(Some(emit))) + } + Poll::Ready(Err(_)) => Poll::Ready(Ok(None)), + Poll::Pending => { + if inner_eof && self.last_input.is_none() { + Poll::Ready(Ok(None)) + } else { + Poll::Pending + } + } + } + } +} + +/// Apply the active video / audio filters to a raw snapshot, dropping +/// renditions that don't match. Axes with no filter pass through unchanged. +fn apply(mut catalog: Catalog, video: Option<&FilterVideo>, audio: Option<&FilterAudio>) -> Catalog { + if let Some(filter) = video { + catalog.video.renditions.retain(|name, config| { + if let Some(want) = &filter.name + && want != name + { + return false; + } + if let Some(want) = filter.codec + && config.codec.kind() != want + { + return false; + } + true + }); + } + if let Some(filter) = audio { + catalog.audio.renditions.retain(|name, config| { + if let Some(want) = &filter.name + && want != name + { + return false; + } + if let Some(want) = filter.codec + && config.codec.kind() != want + { + return false; + } + true + }); + } + catalog +} + +#[cfg(test)] +mod test { + use std::collections::BTreeMap; + + use hang::catalog::{AudioCodec, AudioConfig, Container, H264, VideoConfig}; + + use super::*; + + struct Once(Option); + + impl Stream for Once { + fn poll_next(&mut self, _: &conducer::Waiter) -> Poll>> { + Poll::Ready(Ok(self.0.take())) + } + } + + fn h264(name: &str) -> (String, VideoConfig) { + let mut config = VideoConfig::new(H264 { + profile: 0x42, + constraints: 0, + level: 0x1e, + inline: false, + }); + config.coded_width = Some(640); + config.coded_height = Some(360); + config.bitrate = Some(500_000); + config.framerate = Some(30.0); + config.container = Container::Legacy; + (name.to_string(), config) + } + + fn opus(name: &str) -> (String, AudioConfig) { + let mut config = AudioConfig::new(AudioCodec::Opus, 48_000, 2); + config.bitrate = Some(128_000); + config.container = Container::Legacy; + (name.to_string(), config) + } + + fn catalog_with(video: Vec<(String, VideoConfig)>, audio: Vec<(String, AudioConfig)>) -> Catalog { + let mut c = Catalog::default(); + c.video.renditions = BTreeMap::from_iter(video); + c.audio.renditions = BTreeMap::from_iter(audio); + c + } + + #[test] + fn codec_filter_keeps_matching() { + let mut hd = h264("hd"); + hd.1.codec = hang::catalog::VP9 { + profile: 0, + level: 10, + bit_depth: 8, + chroma_subsampling: 1, + color_primaries: 1, + transfer_characteristics: 1, + matrix_coefficients: 1, + full_range: false, + } + .into(); + let snapshot = catalog_with(vec![h264("lo"), hd], vec![]); + + let mut f = Filter::new(Once(Some(snapshot))); + f.set_video(FilterVideo { + codec: Some(VideoCodecKind::H264), + ..Default::default() + }); + + let out = match f.poll_next(&conducer::Waiter::noop()) { + Poll::Ready(Ok(Some(c))) => c, + other => panic!("expected snapshot, got {other:?}"), + }; + assert_eq!(out.video.renditions.keys().collect::>(), vec!["lo"]); + } + + #[test] + fn name_filter_exact() { + let snapshot = catalog_with(vec![h264("lo"), h264("hi")], vec![]); + let mut f = Filter::new(Once(Some(snapshot))); + f.set_video(FilterVideo { + name: Some("hi".into()), + ..Default::default() + }); + let out = match f.poll_next(&conducer::Waiter::noop()) { + Poll::Ready(Ok(Some(c))) => c, + other => panic!("got {other:?}"), + }; + assert_eq!(out.video.renditions.keys().collect::>(), vec!["hi"]); + } + + #[test] + fn audio_filter_independent_of_video() { + let snapshot = catalog_with(vec![h264("hi")], vec![opus("en"), opus("es")]); + let mut f = Filter::new(Once(Some(snapshot))); + f.set_audio(FilterAudio { + name: Some("es".into()), + ..Default::default() + }); + let out = match f.poll_next(&conducer::Waiter::noop()) { + Poll::Ready(Ok(Some(c))) => c, + other => panic!("got {other:?}"), + }; + assert_eq!(out.video.renditions.keys().collect::>(), vec!["hi"]); + assert_eq!(out.audio.renditions.keys().collect::>(), vec!["es"]); + } + + #[test] + fn set_video_after_snapshot_reemits() { + let snapshot = catalog_with(vec![h264("lo"), h264("hi")], vec![]); + let mut f = Filter::new(Once(Some(snapshot))); + + let first = match f.poll_next(&conducer::Waiter::noop()) { + Poll::Ready(Ok(Some(c))) => c, + other => panic!("got {other:?}"), + }; + assert_eq!(first.video.renditions.len(), 2); + + f.set_video(FilterVideo { + name: Some("hi".into()), + ..Default::default() + }); + + let again = match f.poll_next(&conducer::Waiter::noop()) { + Poll::Ready(Ok(Some(c))) => c, + other => panic!("expected re-emit, got {other:?}"), + }; + assert_eq!(again.video.renditions.keys().collect::>(), vec!["hi"]); + } +} diff --git a/rs/moq-mux/src/catalog/mod.rs b/rs/moq-mux/src/catalog/mod.rs index 203b416b7..417fbb25b 100644 --- a/rs/moq-mux/src/catalog/mod.rs +++ b/rs/moq-mux/src/catalog/mod.rs @@ -10,9 +10,25 @@ //! Publishing through [`hang::Producer`] writes both tracks together; //! subscribers pick one based on the broadcast's filename suffix. See //! [`CatalogFormat`] for the suffix-to-format mapping. +//! +//! On the consume side, [`Consumer`] is the unified entry point: it +//! subscribes to whichever catalog track `format` advertises and yields +//! [`::hang::Catalog`] snapshots. Wrap it with [`Filter`] (hard match on +//! name / codec family) or [`Target`] (soft match picking one rendition +//! per axis) to narrow the set before handing it to an exporter; both +//! also implement [`Stream`] so they compose either direction. pub mod hang; pub mod msf; +mod consumer; +mod filter; mod format; +mod stream; +mod target; + +pub use consumer::Consumer; +pub use filter::{Filter, FilterAudio, FilterVideo}; pub use format::*; +pub use stream::Stream; +pub use target::{Target, TargetAudio, TargetVideo}; diff --git a/rs/moq-mux/src/catalog/stream.rs b/rs/moq-mux/src/catalog/stream.rs new file mode 100644 index 000000000..0dd806577 --- /dev/null +++ b/rs/moq-mux/src/catalog/stream.rs @@ -0,0 +1,51 @@ +//! Catalog stream trait. +//! +//! [`Stream`] yields a sequence of [`hang::Catalog`] snapshots. Both the +//! raw [`Consumer`](super::Consumer) and the rendition-selecting +//! [`Filter`](super::Filter) / [`Target`](super::Target) wrappers implement +//! it, so exporters can be written against the trait and the caller picks +//! the selection policy. + +use std::task::Poll; + +use hang::Catalog; + +use super::{Filter, Target}; + +/// A stream of catalog snapshots. +/// +/// `poll_next` returns the next snapshot (a full catalog, not a delta), or +/// `None` once the underlying track has ended. Late snapshots supersede +/// earlier ones, so an implementation may drop intermediate snapshots. +/// +/// Stream types are required to be `Send + 'static` so they can be moved +/// across threads and held inside exporters without per-call bounds. +pub trait Stream: Send + 'static { + fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>>; + + /// Wait for the next snapshot. + fn next(&mut self) -> impl std::future::Future>> + Send + where + Self: Sized, + { + async move { conducer::wait(|waiter| self.poll_next(waiter)).await } + } + + /// Wrap this stream in a [`Filter`] that drops renditions which don't + /// match a hard-match criterion (name or codec family). + fn filter(self) -> Filter + where + Self: Sized, + { + Filter::new(self) + } + + /// Wrap this stream in a [`Target`] that reduces each axis to at most + /// one rendition by soft-matching against width / height / bitrate. + fn target(self) -> Target + where + Self: Sized, + { + Target::new(self) + } +} diff --git a/rs/moq-mux/src/catalog/target.rs b/rs/moq-mux/src/catalog/target.rs new file mode 100644 index 000000000..5f4a96706 --- /dev/null +++ b/rs/moq-mux/src/catalog/target.rs @@ -0,0 +1,469 @@ +//! Soft-match rendition target. +//! +//! [`Target`] wraps any [`Stream`] and reduces each axis (video / audio) to at +//! most one rendition by ranking the input against constraints like maximum +//! width, height, pixels, or bitrate. The ranking algorithm is a Rust port of +//! [js/watch's `#select`](js/watch/src/video/source.ts). + +use std::collections::BTreeMap; +use std::task::Poll; + +use hang::Catalog; +use hang::catalog::{AudioConfig, VideoConfig}; + +use super::Stream; + +/// Soft-match constraints for the video rendition. +/// +/// Each `Option` is a *maximum* the selection will try to stay under. When a +/// rendition fits every active maximum, the largest such rendition wins; if +/// nothing fits, the algorithm degrades to the smallest over-budget rendition +/// (per constraint) and intersects across constraints. +#[derive(Debug, Default, Clone)] +pub struct TargetVideo { + pub width: Option, + pub height: Option, + pub pixels: Option, + pub bitrate: Option, +} + +/// Soft-match constraints for the audio rendition. +#[derive(Debug, Default, Clone)] +pub struct TargetAudio { + pub bitrate: Option, +} + +/// Shared state behind a [`Target`]. +/// +/// `epoch` advances on every setter so [`Target::poll_next`] can tell whether +/// the criteria changed since the last emit without diffing the structs. +#[derive(Debug, Default, Clone)] +struct TargetState { + video: Option, + audio: Option, + epoch: u64, +} + +/// A [`Stream`] that picks one rendition per axis from the inner snapshot. +/// +/// Selection criteria live behind a [`conducer::Producer`], so calls to +/// [`set_video`](Self::set_video) / [`set_audio`](Self::set_audio) wake any +/// pending `poll_next` instead of silently waiting for the next upstream +/// snapshot. That makes the type usable as the foothold for bandwidth-driven +/// ABR retargeting. +pub struct Target { + inner: S, + state: conducer::Producer, + state_consumer: conducer::Consumer, + /// Last raw snapshot from `inner`, retained so a target change between + /// snapshots can be re-applied without polling upstream. + last_input: Option, + /// Epoch we already emitted against. If `state.epoch` advances past this + /// while `last_input` is `Some`, the next poll re-emits. + last_epoch: u64, + /// True once `inner` has handed us a snapshot we haven't emitted yet. + fresh_input: bool, +} + +impl Target { + pub fn new(inner: S) -> Self { + let state = conducer::Producer::new(TargetState::default()); + let state_consumer = state.consume(); + Self { + inner, + state, + state_consumer, + last_input: None, + last_epoch: 0, + fresh_input: false, + } + } + + /// Set or clear the video target. Pass `None` to keep every rendition. + pub fn set_video(&mut self, target: impl Into>) { + self.update(|s| s.video = target.into()); + } + + /// Set or clear the audio target. Pass `None` to keep every rendition. + pub fn set_audio(&mut self, target: impl Into>) { + self.update(|s| s.audio = target.into()); + } + + fn update(&self, f: impl FnOnce(&mut TargetState)) { + // `write()` only errors when the producer is closed, which can't happen + // while `self` holds the only producer handle. + let Ok(mut state) = self.state.write() else { + return; + }; + f(&mut state); + state.epoch = state.epoch.wrapping_add(1); + // Mut::drop wakes the paired consumer waiters here. + } +} + +impl Stream for Target { + fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + // Drain inner: the latest snapshot wins. `poll_next` registers the + // waiter on its own Pending branch. + let inner_eof = loop { + match self.inner.poll_next(waiter)? { + Poll::Ready(Some(snapshot)) => { + self.last_input = Some(snapshot); + self.fresh_input = true; + } + Poll::Ready(None) => break true, + Poll::Pending => break false, + } + }; + + // Snapshot the fields the inner closure needs so it can borrow them + // without colliding with the `&self.state_consumer` receiver. + let last_epoch = self.last_epoch; + let fresh_input = self.fresh_input; + let last_input = self.last_input.clone(); + + let polled = self.state_consumer.poll(waiter, |state| { + let target_changed = state.epoch != last_epoch; + if !fresh_input && !target_changed { + // Nothing new from inner and nothing new from caller: register + // the waiter on this consumer so the next setter wakes us. + return Poll::Pending; + } + let Some(input) = last_input.clone() else { + // Caller already retargeted, but no upstream snapshot yet to apply. + return Poll::Pending; + }; + let emit = apply(input, state.video.as_ref(), state.audio.as_ref()); + Poll::Ready((emit, state.epoch)) + }); + + match polled { + Poll::Ready(Ok((emit, epoch))) => { + self.last_epoch = epoch; + self.fresh_input = false; + Poll::Ready(Ok(Some(emit))) + } + Poll::Ready(Err(_)) => { + // Producer dropped (impossible while Self holds it); treat as EOF. + Poll::Ready(Ok(None)) + } + Poll::Pending => { + if inner_eof && self.last_input.is_none() { + Poll::Ready(Ok(None)) + } else { + Poll::Pending + } + } + } + } +} + +/// Apply the active video / audio targets to a raw snapshot, narrowing each +/// axis to at most one rendition. Axes with no target pass through unchanged. +fn apply(mut catalog: Catalog, video: Option<&TargetVideo>, audio: Option<&TargetAudio>) -> Catalog { + if let Some(target) = video { + if let Some(name) = select_video(&catalog.video.renditions, target) { + let mut kept = BTreeMap::new(); + if let Some(config) = catalog.video.renditions.remove(&name) { + kept.insert(name, config); + } + catalog.video.renditions = kept; + } else { + catalog.video.renditions.clear(); + } + } + + if let Some(target) = audio { + if let Some(name) = select_audio(&catalog.audio.renditions, target) { + let mut kept = BTreeMap::new(); + if let Some(config) = catalog.audio.renditions.remove(&name) { + kept.insert(name, config); + } + catalog.audio.renditions = kept; + } else { + catalog.audio.renditions.clear(); + } + } + + catalog +} + +/// Run all active video rankings and return the highest-ranked rendition +/// present in every ranking, or `None` if the intersection is empty. +fn select_video(renditions: &BTreeMap, target: &TargetVideo) -> Option { + if renditions.is_empty() { + return None; + } + if renditions.len() == 1 { + return renditions.keys().next().cloned(); + } + + let mut rankings: Vec> = Vec::new(); + if let Some(max) = target.pixels { + rankings.push(by_pixels(renditions, max)); + } + if target.width.is_some() || target.height.is_some() { + rankings.push(by_dimensions(renditions, target.width, target.height)); + } + if let Some(max) = target.bitrate { + rankings.push(by_video_bitrate(renditions, max)); + } + + if rankings.is_empty() { + return Some(best_video(renditions)); + } + + intersect_rankings(rankings) +} + +fn select_audio(renditions: &BTreeMap, target: &TargetAudio) -> Option { + if renditions.is_empty() { + return None; + } + if renditions.len() == 1 { + return renditions.keys().next().cloned(); + } + + let mut rankings: Vec> = Vec::new(); + if let Some(max) = target.bitrate { + rankings.push(by_audio_bitrate(renditions, max)); + } + + if rankings.is_empty() { + return Some(best_audio(renditions)); + } + + intersect_rankings(rankings) +} + +/// Pick the first name from `rankings[0]` that appears in every other ranking. +fn intersect_rankings(rankings: Vec>) -> Option { + use std::collections::HashSet; + let sets: Vec> = rankings.iter().map(|r| r.iter().collect()).collect(); + for name in &rankings[0] { + if sets.iter().all(|s| s.contains(name)) { + return Some(name.clone()); + } + } + tracing::warn!("conflicting rendition targets, no rendition satisfies all criteria"); + None +} + +/// Rank by area, largest-first within budget; fall back to single smallest +/// over-budget if nothing fits. Renditions without resolution metadata are +/// returned unranked when no rendition has any metadata at all (mirrors the JS). +fn by_pixels(renditions: &BTreeMap, max: u32) -> Vec { + let mut within: Vec<(String, u32)> = Vec::new(); + let mut rest: Vec<(String, u32)> = Vec::new(); + + for (name, config) in renditions { + if let (Some(w), Some(h)) = (config.coded_width, config.coded_height) { + let size = w.saturating_mul(h); + if size <= max { + within.push((name.clone(), size)); + } else { + rest.push((name.clone(), size)); + } + } + } + + within.sort_by_key(|b| std::cmp::Reverse(b.1)); + if !within.is_empty() { + return within.into_iter().map(|(n, _)| n).collect(); + } + + rest.sort_by_key(|a| a.1); + if let Some(smallest) = rest.into_iter().next() { + return vec![smallest.0]; + } + + renditions.keys().cloned().collect() +} + +fn by_dimensions(renditions: &BTreeMap, width: Option, height: Option) -> Vec { + let mut within: Vec<(String, u32)> = Vec::new(); + let mut rest: Vec<(String, u32)> = Vec::new(); + + for (name, config) in renditions { + let (Some(w), Some(h)) = (config.coded_width, config.coded_height) else { + continue; + }; + let size = w.saturating_mul(h); + let fits_w = width.is_none_or(|cap| w <= cap); + let fits_h = height.is_none_or(|cap| h <= cap); + if fits_w && fits_h { + within.push((name.clone(), size)); + } else { + rest.push((name.clone(), size)); + } + } + + within.sort_by_key(|b| std::cmp::Reverse(b.1)); + if !within.is_empty() { + return within.into_iter().map(|(n, _)| n).collect(); + } + + rest.sort_by_key(|a| a.1); + if let Some(smallest) = rest.into_iter().next() { + return vec![smallest.0]; + } + + renditions.keys().cloned().collect() +} + +fn by_video_bitrate(renditions: &BTreeMap, max: u64) -> Vec { + let mut within: Vec<(String, u64)> = Vec::new(); + let mut rest: Vec<(String, u64)> = Vec::new(); + for (name, config) in renditions { + if let Some(b) = config.bitrate { + if b <= max { + within.push((name.clone(), b)); + } else { + rest.push((name.clone(), b)); + } + } + } + within.sort_by_key(|b| std::cmp::Reverse(b.1)); + if !within.is_empty() { + return within.into_iter().map(|(n, _)| n).collect(); + } + rest.sort_by_key(|a| a.1); + if let Some(smallest) = rest.into_iter().next() { + return vec![smallest.0]; + } + renditions.keys().cloned().collect() +} + +fn by_audio_bitrate(renditions: &BTreeMap, max: u64) -> Vec { + let mut within: Vec<(String, u64)> = Vec::new(); + let mut rest: Vec<(String, u64)> = Vec::new(); + for (name, config) in renditions { + if let Some(b) = config.bitrate { + if b <= max { + within.push((name.clone(), b)); + } else { + rest.push((name.clone(), b)); + } + } + } + within.sort_by_key(|b| std::cmp::Reverse(b.1)); + if !within.is_empty() { + return within.into_iter().map(|(n, _)| n).collect(); + } + rest.sort_by_key(|a| a.1); + if let Some(smallest) = rest.into_iter().next() { + return vec![smallest.0]; + } + renditions.keys().cloned().collect() +} + +/// With no constraints, prefer the largest resolution then the highest bitrate. +fn best_video(renditions: &BTreeMap) -> String { + renditions + .iter() + .max_by_key(|(_, c)| { + let area = c.coded_width.unwrap_or(0).saturating_mul(c.coded_height.unwrap_or(0)) as u64; + (area, c.bitrate.unwrap_or(0)) + }) + .map(|(n, _)| n.clone()) + .expect("renditions non-empty checked by caller") +} + +fn best_audio(renditions: &BTreeMap) -> String { + renditions + .iter() + .max_by_key(|(_, c)| c.bitrate.unwrap_or(0)) + .map(|(n, _)| n.clone()) + .expect("renditions non-empty checked by caller") +} + +#[cfg(test)] +mod test { + use hang::catalog::{Container, H264, VideoConfig}; + + use super::*; + + fn vid(name: &str, w: u32, h: u32, bitrate: u64) -> (String, VideoConfig) { + let mut config = VideoConfig::new(H264 { + profile: 0x42, + constraints: 0, + level: 0x1e, + inline: false, + }); + config.coded_width = Some(w); + config.coded_height = Some(h); + config.bitrate = Some(bitrate); + config.framerate = Some(30.0); + config.container = Container::Legacy; + (name.to_string(), config) + } + + fn map(items: Vec<(String, VideoConfig)>) -> BTreeMap { + BTreeMap::from_iter(items) + } + + #[test] + fn pick_largest_under_width_cap() { + let renditions = map(vec![ + vid("sd", 640, 360, 500_000), + vid("hd", 1280, 720, 2_500_000), + vid("fhd", 1920, 1080, 6_000_000), + ]); + let target = TargetVideo { + width: Some(1280), + ..Default::default() + }; + assert_eq!(select_video(&renditions, &target).as_deref(), Some("hd")); + } + + #[test] + fn pick_largest_under_bitrate_cap() { + let renditions = map(vec![ + vid("sd", 640, 360, 500_000), + vid("hd", 1280, 720, 2_500_000), + vid("fhd", 1920, 1080, 6_000_000), + ]); + let target = TargetVideo { + bitrate: Some(3_000_000), + ..Default::default() + }; + assert_eq!(select_video(&renditions, &target).as_deref(), Some("hd")); + } + + #[test] + fn degrade_to_smallest_over_budget() { + let renditions = map(vec![vid("hd", 1280, 720, 2_500_000), vid("fhd", 1920, 1080, 6_000_000)]); + let target = TargetVideo { + bitrate: Some(100_000), + ..Default::default() + }; + assert_eq!(select_video(&renditions, &target).as_deref(), Some("hd")); + } + + #[test] + fn no_constraints_picks_largest() { + let renditions = map(vec![ + vid("sd", 640, 360, 500_000), + vid("hd", 1280, 720, 2_500_000), + vid("fhd", 1920, 1080, 6_000_000), + ]); + let target = TargetVideo::default(); + assert_eq!(select_video(&renditions, &target).as_deref(), Some("fhd")); + } + + #[test] + fn width_and_bitrate_intersect() { + let renditions = map(vec![ + vid("sd", 640, 360, 500_000), + vid("hd", 1280, 720, 2_500_000), + vid("fhd", 1920, 1080, 6_000_000), + ]); + let target = TargetVideo { + width: Some(1920), + bitrate: Some(1_000_000), + ..Default::default() + }; + // width allows all, bitrate allows only sd. + assert_eq!(select_video(&renditions, &target).as_deref(), Some("sd")); + } +} diff --git a/rs/moq-mux/src/codec/annexb.rs b/rs/moq-mux/src/codec/annexb.rs index d05a72609..b6fdaae5a 100644 --- a/rs/moq-mux/src/codec/annexb.rs +++ b/rs/moq-mux/src/codec/annexb.rs @@ -1,4 +1,4 @@ -use bytes::{Buf, Bytes}; +use bytes::{Buf, Bytes, BytesMut}; pub const START_CODE: Bytes = Bytes::from_static(&[0, 0, 0, 1]); @@ -11,10 +11,65 @@ pub enum Error { #[error("invalid Annex B start code")] InvalidStartCode, + + #[error("invalid avc1/hvc1 length size {0}")] + InvalidLengthSize(usize), + + #[error("truncated length-prefixed NAL unit")] + Truncated, } pub type Result = std::result::Result; +/// Convert a length-prefixed NALU payload (avc1 / hvc1 wire shape) to Annex-B, +/// optionally prepending `prefix` bytes (typically VPS/SPS/PPS NAL units already +/// in Annex-B form, for keyframe parameter-set injection). +pub fn from_length_prefixed(payload: &[u8], length_size: usize, prefix: Option<&[u8]>) -> Result { + if !(1..=4).contains(&length_size) { + return Err(Error::InvalidLengthSize(length_size)); + } + + let mut out = BytesMut::with_capacity(payload.len() + prefix.map(|p| p.len()).unwrap_or(0) + 16); + if let Some(p) = prefix { + out.extend_from_slice(p); + } + + let mut pos = 0; + while pos < payload.len() { + let after_prefix = pos.checked_add(length_size).ok_or(Error::Truncated)?; + if payload.len() < after_prefix { + return Err(Error::Truncated); + } + let mut len = 0usize; + for byte in &payload[pos..after_prefix] { + len = (len << 8) | (*byte as usize); + } + let after_nal = after_prefix.checked_add(len).ok_or(Error::Truncated)?; + if payload.len() < after_nal { + return Err(Error::Truncated); + } + out.extend_from_slice(&START_CODE); + out.extend_from_slice(&payload[after_prefix..after_nal]); + pos = after_nal; + } + + Ok(out.freeze()) +} + +/// Concatenate `start_code | nal` for every NAL in `nals` and freeze the +/// result. Used to build a keyframe parameter-set prefix for an Annex-B +/// elementary stream. +pub fn build_prefix<'a, I: IntoIterator>(nals: I) -> Bytes { + let nals: Vec<&Bytes> = nals.into_iter().collect(); + let total: usize = nals.iter().map(|n| n.len() + START_CODE.len()).sum(); + let mut out = BytesMut::with_capacity(total); + for nal in nals { + out.extend_from_slice(&START_CODE); + out.extend_from_slice(nal); + } + out.freeze() +} + pub struct NalIterator<'a, T: Buf + AsRef<[u8]> + 'a> { buf: &'a mut T, start: Option, diff --git a/rs/moq-mux/src/codec/h264/export.rs b/rs/moq-mux/src/codec/h264/export.rs new file mode 100644 index 000000000..601f62ac1 --- /dev/null +++ b/rs/moq-mux/src/codec/h264/export.rs @@ -0,0 +1,183 @@ +//! H.264 single-rendition Annex-B exporter. +//! +//! Subscribes to one H.264 rendition from a catalog-narrowed stream and emits +//! a raw Annex-B elementary stream. Suitable for piping into `ffmpeg`, decoder +//! fuzzers, or recording one codec to disk. There is no container framing +//! (timestamps are dropped). +//! +//! Two source shapes are accepted: +//! - **avc3** (catalog `description` empty): payload is already Annex-B with +//! SPS/PPS inline. Pass through unchanged. +//! - **avc1** (catalog `description` is the avcC): length-prefixed NALUs. +//! Length prefixes are replaced with `00 00 00 01` start codes; SPS/PPS +//! extracted from the avcC are injected ahead of every keyframe. + +use std::task::Poll; +use std::time::Duration; + +use bytes::Bytes; +use hang::Catalog; +use hang::catalog::{VideoCodecKind, VideoConfig}; + +use crate::catalog::Stream; +use crate::codec::annexb; +use crate::container::ExportSource; + +/// Single-rendition H.264 Annex-B exporter. +pub struct Export { + broadcast: moq_net::BroadcastConsumer, + catalog: Option, + latency: Duration, + track: Option, +} + +struct H264Track { + name: String, + /// Snapshot of the catalog config we built `source` from. Cached so that + /// a catalog update which keeps the same rendition name but changes the + /// codec config (e.g. a new avcC) triggers a full rebuild instead of + /// silently reusing a stale `convert`. + config: VideoConfig, + source: ExportSource, + /// `Some` for an avc1 source: SPS/PPS prefix prebuilt from the avcC, and + /// the avcC length-prefix size. `None` for an avc3 source: Annex-B passes + /// through without conversion. + convert: Option, +} + +struct Avc1Convert { + length_size: usize, + keyframe_prefix: Bytes, +} + +impl Export { + /// Subscribe to `broadcast` and emit an Annex-B H.264 byte stream. + /// + /// `catalog` is expected to be narrowed to a single H.264 rendition (e.g. + /// `consumer.filter()` with `codec = H264` then `.target()` for ABR + /// selection). Renditions of other codecs are ignored; if multiple H.264 + /// renditions appear in a snapshot, the first by BTreeMap order wins and + /// a warning is logged. + pub fn new(broadcast: moq_net::BroadcastConsumer, catalog: S) -> Self { + Self { + broadcast, + catalog: Some(catalog), + latency: Duration::ZERO, + track: None, + } + } + + /// Set the maximum buffering latency for the per-track source. + pub fn with_latency(mut self, latency: Duration) -> Self { + self.latency = latency; + self + } + + pub async fn next(&mut self) -> crate::Result> { + conducer::wait(|waiter| self.poll_next(waiter)).await + } + + pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + while let Some(catalog) = self.catalog.as_mut() { + match catalog.poll_next(waiter)? { + Poll::Ready(Some(snapshot)) => self.update_catalog(&snapshot)?, + Poll::Ready(None) => { + self.catalog = None; + break; + } + Poll::Pending => break, + } + } + + loop { + let Some(track) = self.track.as_mut() else { + if self.catalog.is_none() { + return Poll::Ready(Ok(None)); + } + return Poll::Pending; + }; + + match track.source.poll_read(waiter) { + Poll::Ready(Ok(Some(frame))) => { + let bytes = match &track.convert { + None => frame.payload, + Some(convert) => { + let prefix = frame.keyframe.then(|| convert.keyframe_prefix.as_ref()); + annexb::from_length_prefixed(&frame.payload, convert.length_size, prefix)? + } + }; + if bytes.is_empty() { + continue; + } + return Poll::Ready(Ok(Some(bytes))); + } + Poll::Ready(Ok(None)) => { + self.track = None; + continue; + } + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, + } + } + } + + fn update_catalog(&mut self, catalog: &Catalog) -> crate::Result<()> { + let picked = catalog + .video + .renditions + .iter() + .filter(|(_, c)| c.codec.kind() == VideoCodecKind::H264) + .collect::>(); + + if picked.len() > 1 { + tracing::warn!( + count = picked.len(), + "multiple H.264 renditions in catalog snapshot; using the first by name. \ + Narrow with catalog::Target to pick one explicitly." + ); + } + + let Some((name, config)) = picked.into_iter().next() else { + self.track = None; + return Ok(()); + }; + + if self + .track + .as_ref() + .is_some_and(|t| t.name == *name && t.config == *config) + { + return Ok(()); + } + + let source = ExportSource::for_video_raw(&self.broadcast, name, config, self.latency)?; + let convert = match config.description.as_ref().filter(|d| !d.is_empty()) { + None => None, + Some(avcc) => { + let params = super::parse_avcc_param_sets(avcc)?; + if params.sps.is_empty() || params.pps.is_empty() { + return Err(super::Error::MissingParamSets { + name: name.clone(), + sps: params.sps.len(), + pps: params.pps.len(), + } + .into()); + } + let prefix = annexb::build_prefix(params.sps.iter().chain(params.pps.iter())); + Some(Avc1Convert { + length_size: params.length_size, + keyframe_prefix: prefix, + }) + } + }; + + self.track = Some(H264Track { + name: name.clone(), + config: config.clone(), + source, + convert, + }); + + Ok(()) + } +} diff --git a/rs/moq-mux/src/codec/h264/mod.rs b/rs/moq-mux/src/codec/h264/mod.rs index 5e574ae2d..08cdb0882 100644 --- a/rs/moq-mux/src/codec/h264/mod.rs +++ b/rs/moq-mux/src/codec/h264/mod.rs @@ -3,11 +3,15 @@ //! Parses SPS NAL units and AVCDecoderConfigurationRecord blobs into //! catalog-ready fields. The [`Avc1`] transmuxer rewrites Annex-B input //! (inline SPS/PPS) as length-prefixed NALU + out-of-band avcC, which is -//! what every CMAF and MKV consumer expects. [`Import`] is the importer; -//! it auto-detects either wire shape from the leading bytes. +//! what every CMAF and MKV consumer expects. [`Export`] subscribes to a +//! catalog-narrowed H.264 rendition and emits an Annex-B elementary +//! stream; [`Import`] is the importer (auto-detects either wire shape +//! from the leading bytes). +mod export; mod import; +pub use export::*; pub use import::*; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -29,6 +33,12 @@ pub enum Error { #[error("AVCDecoderConfigurationRecord too short")] AvccTooShort, + #[error("AVCDecoderConfigurationRecord truncated")] + AvccTruncated, + + #[error("avc1 description for rendition {name:?} is missing SPS or PPS (sps={sps}, pps={pps})")] + MissingParamSets { name: String, sps: usize, pps: usize }, + #[error("SPS too large for avcC length field ({0} > {max})", max = u16::MAX)] SpsTooLarge(usize), @@ -190,6 +200,58 @@ pub(crate) fn build_avcc(sps_nal: &[u8], pps_nal: &[u8]) -> Result { Ok(out.freeze()) } +/// SPS and PPS NAL units extracted from an avcC. +#[derive(Debug, Clone)] +pub struct AvccParamSets { + /// NALU length size in bytes (typically 4). + pub length_size: usize, + pub sps: Vec, + pub pps: Vec, +} + +/// Pull the SPS and PPS NAL units out of an AVCDecoderConfigurationRecord. +pub fn parse_avcc_param_sets(avcc: &[u8]) -> Result { + if avcc.len() < 7 { + return Err(Error::AvccTooShort); + } + let length_size = (avcc[4] & 0x03) as usize + 1; + let num_sps = (avcc[5] & 0x1f) as usize; + + let mut pos = 6; + let sps = read_param_sets(avcc, &mut pos, num_sps)?; + + if avcc.len() <= pos { + return Err(Error::AvccTruncated); + } + let num_pps = avcc[pos] as usize; + pos += 1; + + let pps = read_param_sets(avcc, &mut pos, num_pps)?; + + Ok(AvccParamSets { length_size, sps, pps }) +} + +/// Read `count` length-prefixed (u16) NAL units from `buf` starting at `*pos`, +/// advancing `*pos` past the last one. All arithmetic is checked so malformed +/// configs surface as errors rather than panics. +fn read_param_sets(buf: &[u8], pos: &mut usize, count: usize) -> Result> { + let mut out = Vec::with_capacity(count); + for _ in 0..count { + let after_len = pos.checked_add(2).ok_or(Error::AvccTruncated)?; + if buf.len() < after_len { + return Err(Error::AvccTruncated); + } + let len = u16::from_be_bytes([buf[*pos], buf[*pos + 1]]) as usize; + let after_nal = after_len.checked_add(len).ok_or(Error::AvccTruncated)?; + if buf.len() < after_nal { + return Err(Error::AvccTruncated); + } + out.push(Bytes::copy_from_slice(&buf[after_len..after_nal])); + *pos = after_nal; + } + Ok(out) +} + /// Transform H.264 frames from Annex-B (inline SPS/PPS, "avc3") to /// length-prefixed NALU (out-of-band AVCDecoderConfigurationRecord, "avc1"). /// diff --git a/rs/moq-mux/src/codec/h265/export.rs b/rs/moq-mux/src/codec/h265/export.rs new file mode 100644 index 000000000..ee5ba64a0 --- /dev/null +++ b/rs/moq-mux/src/codec/h265/export.rs @@ -0,0 +1,175 @@ +//! H.265 single-rendition Annex-B exporter. +//! +//! HEVC analogue of [`crate::codec::h264::Export`]. Accepts either a hev1 +//! (Annex-B, parameter sets inline) or hvc1 (length-prefixed + out-of-band +//! hvcC) source and emits a raw Annex-B elementary stream. Timestamps are +//! dropped. + +use std::task::Poll; +use std::time::Duration; + +use bytes::Bytes; +use hang::Catalog; +use hang::catalog::{VideoCodecKind, VideoConfig}; + +use crate::catalog::Stream; +use crate::codec::annexb; +use crate::container::ExportSource; + +/// Single-rendition H.265 Annex-B exporter. +pub struct Export { + broadcast: moq_net::BroadcastConsumer, + catalog: Option, + latency: Duration, + track: Option, +} + +struct H265Track { + name: String, + /// Snapshot of the catalog config we built `source` from. Cached so that + /// a catalog update which keeps the same rendition name but changes the + /// codec config (e.g. a new hvcC) triggers a full rebuild instead of + /// silently reusing a stale `convert`. + config: VideoConfig, + source: ExportSource, + /// `Some` for an hvc1 source: VPS/SPS/PPS prefix prebuilt from the hvcC, + /// and the hvcC length-prefix size. `None` for a hev1 source: Annex-B + /// passes through without conversion. + convert: Option, +} + +struct Hvc1Convert { + length_size: usize, + keyframe_prefix: Bytes, +} + +impl Export { + /// Subscribe to `broadcast` and emit an Annex-B H.265 byte stream. + /// + /// `catalog` is expected to be narrowed to a single H.265 rendition. If + /// multiple H.265 renditions appear in a snapshot, the first by BTreeMap + /// order wins and a warning is logged. + pub fn new(broadcast: moq_net::BroadcastConsumer, catalog: S) -> Self { + Self { + broadcast, + catalog: Some(catalog), + latency: Duration::ZERO, + track: None, + } + } + + /// Set the maximum buffering latency for the per-track source. + pub fn with_latency(mut self, latency: Duration) -> Self { + self.latency = latency; + self + } + + pub async fn next(&mut self) -> crate::Result> { + conducer::wait(|waiter| self.poll_next(waiter)).await + } + + pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + while let Some(catalog) = self.catalog.as_mut() { + match catalog.poll_next(waiter)? { + Poll::Ready(Some(snapshot)) => self.update_catalog(&snapshot)?, + Poll::Ready(None) => { + self.catalog = None; + break; + } + Poll::Pending => break, + } + } + + loop { + let Some(track) = self.track.as_mut() else { + if self.catalog.is_none() { + return Poll::Ready(Ok(None)); + } + return Poll::Pending; + }; + + match track.source.poll_read(waiter) { + Poll::Ready(Ok(Some(frame))) => { + let bytes = match &track.convert { + None => frame.payload, + Some(convert) => { + let prefix = frame.keyframe.then(|| convert.keyframe_prefix.as_ref()); + annexb::from_length_prefixed(&frame.payload, convert.length_size, prefix)? + } + }; + if bytes.is_empty() { + continue; + } + return Poll::Ready(Ok(Some(bytes))); + } + Poll::Ready(Ok(None)) => { + self.track = None; + continue; + } + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, + } + } + } + + fn update_catalog(&mut self, catalog: &Catalog) -> crate::Result<()> { + let picked = catalog + .video + .renditions + .iter() + .filter(|(_, c)| c.codec.kind() == VideoCodecKind::H265) + .collect::>(); + + if picked.len() > 1 { + tracing::warn!( + count = picked.len(), + "multiple H.265 renditions in catalog snapshot; using the first by name. \ + Narrow with catalog::Target to pick one explicitly." + ); + } + + let Some((name, config)) = picked.into_iter().next() else { + self.track = None; + return Ok(()); + }; + + if self + .track + .as_ref() + .is_some_and(|t| t.name == *name && t.config == *config) + { + return Ok(()); + } + + let source = ExportSource::for_video_raw(&self.broadcast, name, config, self.latency)?; + let convert = match config.description.as_ref().filter(|d| !d.is_empty()) { + None => None, + Some(hvcc) => { + let params = super::parse_hvcc_param_sets(hvcc)?; + if params.vps.is_empty() || params.sps.is_empty() || params.pps.is_empty() { + return Err(super::Error::MissingParamSets { + name: name.clone(), + vps: params.vps.len(), + sps: params.sps.len(), + pps: params.pps.len(), + } + .into()); + } + let prefix = annexb::build_prefix(params.vps.iter().chain(params.sps.iter()).chain(params.pps.iter())); + Some(Hvc1Convert { + length_size: params.length_size, + keyframe_prefix: prefix, + }) + } + }; + + self.track = Some(H265Track { + name: name.clone(), + config: config.clone(), + source, + convert, + }); + + Ok(()) + } +} diff --git a/rs/moq-mux/src/codec/h265/mod.rs b/rs/moq-mux/src/codec/h265/mod.rs index 8139731c7..b9c95d516 100644 --- a/rs/moq-mux/src/codec/h265/mod.rs +++ b/rs/moq-mux/src/codec/h265/mod.rs @@ -3,10 +3,13 @@ //! The H.265 analogue of [`crate::codec::h264`]. Parses SPS NAL units //! and HEVCDecoderConfigurationRecord blobs. The [`Hvc1`] transmuxer //! rewrites Annex-B input (inline VPS/SPS/PPS) as length-prefixed NALU -//! + out-of-band hvcC. [`Import`] is the Annex-B importer. +//! + out-of-band hvcC. [`Export`] is the single-rendition Annex-B +//! exporter; [`Import`] is the Annex-B importer. +mod export; mod import; +pub use export::*; pub use import::*; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -43,12 +46,88 @@ pub enum Error { #[error("missing timestamp")] MissingTimestamp, + #[error("HEVCDecoderConfigurationRecord too short")] + HvccTooShort, + + #[error("HEVCDecoderConfigurationRecord truncated")] + HvccTruncated, + + #[error("hvc1 description for rendition {name:?} is missing VPS, SPS, or PPS (vps={vps}, sps={sps}, pps={pps})")] + MissingParamSets { + name: String, + vps: usize, + sps: usize, + pps: usize, + }, + #[error("annexb: {0}")] Annexb(#[from] crate::codec::annexb::Error), } pub type Result = std::result::Result; +/// VPS, SPS, and PPS NAL units extracted from an hvcC. +#[derive(Debug, Clone)] +pub struct HvccParamSets { + /// NALU length size in bytes (typically 4). + pub length_size: usize, + pub vps: Vec, + pub sps: Vec, + pub pps: Vec, +} + +/// Pull the VPS/SPS/PPS NAL units out of an HEVCDecoderConfigurationRecord. +pub fn parse_hvcc_param_sets(hvcc: &[u8]) -> Result { + if hvcc.len() < 23 { + return Err(Error::HvccTooShort); + } + let length_size = (hvcc[21] & 0x3) as usize + 1; + let num_arrays = hvcc[22] as usize; + + let mut vps = Vec::new(); + let mut sps = Vec::new(); + let mut pps = Vec::new(); + let mut pos: usize = 23; + + for _ in 0..num_arrays { + let after_hdr = pos.checked_add(3).ok_or(Error::HvccTruncated)?; + if hvcc.len() < after_hdr { + return Err(Error::HvccTruncated); + } + let nal_type = hvcc[pos] & 0x3f; + let num_nalus = u16::from_be_bytes([hvcc[pos + 1], hvcc[pos + 2]]) as usize; + pos = after_hdr; + + for _ in 0..num_nalus { + let after_len = pos.checked_add(2).ok_or(Error::HvccTruncated)?; + if hvcc.len() < after_len { + return Err(Error::HvccTruncated); + } + let len = u16::from_be_bytes([hvcc[pos], hvcc[pos + 1]]) as usize; + let after_nal = after_len.checked_add(len).ok_or(Error::HvccTruncated)?; + if hvcc.len() < after_nal { + return Err(Error::HvccTruncated); + } + let bytes = Bytes::copy_from_slice(&hvcc[after_len..after_nal]); + pos = after_nal; + + match NALUnitType::from(nal_type) { + NALUnitType::VpsNut => vps.push(bytes), + NALUnitType::SpsNut => sps.push(bytes), + NALUnitType::PpsNut => pps.push(bytes), + _ => {} + } + } + } + + Ok(HvccParamSets { + length_size, + vps, + sps, + pps, + }) +} + /// Annex-B → length-prefixed transmuxer; the H.265 analogue of /// [`crate::codec::h264::Avc1`]. pub struct Hvc1 { diff --git a/rs/moq-mux/src/container/fmp4/export.rs b/rs/moq-mux/src/container/fmp4/export.rs index 44e1bf79d..24e4e6d6d 100644 --- a/rs/moq-mux/src/container/fmp4/export.rs +++ b/rs/moq-mux/src/container/fmp4/export.rs @@ -7,12 +7,11 @@ use hang::catalog::{Catalog, Container, VideoConfig}; use mp4_atom::{DecodeMaybe, Encode}; use crate::Result; -use crate::catalog::CatalogFormat; +use crate::catalog::Stream; +use crate::container::ExportSource; use crate::container::Frame; use crate::container::fmp4::Error; -use crate::container::{CatalogSource, ExportSource}; - /// Subscribe to a moq broadcast and produce a single fMP4 / CMAF byte stream. /// /// Built from a [`moq_net::BroadcastConsumer`], `Export` subscribes to the hang catalog, @@ -27,9 +26,9 @@ use crate::container::{CatalogSource, ExportSource}; /// keyframes); [`with_fragment_duration`](Self::with_fragment_duration) caps the /// fragment duration for downstream consumers that throttle by fragment rate. /// Returns `None` when the broadcast ends. -pub struct Export { +pub struct Export { broadcast: moq_net::BroadcastConsumer, - catalog: Option, + catalog: Option, latency: Duration, fragment_duration: Option, @@ -65,26 +64,16 @@ struct Fmp4Track { sequence_number: u32, } -impl Export { - /// Subscribe to `broadcast` and produce fMP4 byte chunks, using the default - /// catalog format ([`CatalogFormat::Hang`]). - /// - /// Use [`with_catalog_format`](Self::with_catalog_format) to subscribe to a - /// non-default catalog track (e.g. MSF). - pub fn new(broadcast: moq_net::BroadcastConsumer) -> Result { - Self::with_catalog_format(broadcast, CatalogFormat::default()) - } - - /// Subscribe to `broadcast` and produce fMP4 byte chunks, selecting an - /// explicit `catalog_format` for track discovery. +impl Export { + /// Subscribe to `broadcast` and produce fMP4 byte chunks, driving track + /// (un)subscription from `catalog`. /// - /// Both formats drive the same internal `hang::Catalog`-based pipeline (MSF - /// snapshots are converted on receipt), so the only observable difference - /// is which wire catalog track is consumed. - pub fn with_catalog_format(broadcast: moq_net::BroadcastConsumer, catalog_format: CatalogFormat) -> Result { - let catalog = CatalogSource::new(&broadcast, catalog_format)?; - - Ok(Self { + /// `catalog` is any [`Stream`] of catalog snapshots, typically a + /// [`catalog::Consumer`](crate::catalog::Consumer) directly, or wrapped in + /// [`catalog::Filter`](crate::catalog::Filter) / + /// [`catalog::Target`](crate::catalog::Target) to narrow the rendition set. + pub fn new(broadcast: moq_net::BroadcastConsumer, catalog: S) -> Self { + Self { broadcast, catalog: Some(catalog), latency: Duration::ZERO, @@ -92,7 +81,7 @@ impl Export { tracks: HashMap::new(), catalog_snapshot: None, init_emitted: false, - }) + } } /// Set the maximum buffering latency for each per-track source. diff --git a/rs/moq-mux/src/container/fmp4/export_test.rs b/rs/moq-mux/src/container/fmp4/export_test.rs index fa17cfd39..50283cdc7 100644 --- a/rs/moq-mux/src/container/fmp4/export_test.rs +++ b/rs/moq-mux/src/container/fmp4/export_test.rs @@ -58,7 +58,9 @@ async fn avc3_source_to_cmaf_export_roundtrip() { .unwrap(); track_producer.finish().unwrap(); - let mut exporter = crate::container::fmp4::Export::new(consumer).expect("new Fmp4"); + let catalog_stream = + crate::catalog::Consumer::new(&consumer, crate::catalog::CatalogFormat::Hang).expect("catalog consumer"); + let mut exporter = crate::container::fmp4::Export::new(consumer, catalog_stream); let init = tokio::time::timeout(std::time::Duration::from_secs(1), exporter.next()) .await @@ -121,7 +123,9 @@ async fn cmaf_source_to_cmaf_export_passthrough() { let mut buf = BytesMut::from(data.as_slice()); let _ = importer.decode(&mut buf); - let mut exporter = crate::container::fmp4::Export::new(consumer).expect("new Fmp4"); + let catalog_stream = + crate::catalog::Consumer::new(&consumer, crate::catalog::CatalogFormat::Hang).expect("catalog consumer"); + let mut exporter = crate::container::fmp4::Export::new(consumer, catalog_stream); let init = tokio::time::timeout(std::time::Duration::from_secs(1), exporter.next()) .await diff --git a/rs/moq-mux/src/container/mkv/export.rs b/rs/moq-mux/src/container/mkv/export.rs index 539c3db19..10fc9d02f 100644 --- a/rs/moq-mux/src/container/mkv/export.rs +++ b/rs/moq-mux/src/container/mkv/export.rs @@ -9,12 +9,11 @@ use webm_iterable::matroska_spec::{Master, MatroskaSpec}; use webm_iterable::{WebmWriter, WriteOptions}; use crate::Result; -use crate::catalog::CatalogFormat; +use crate::catalog::Stream; +use crate::container::ExportSource; use crate::container::Frame; use crate::container::mkv::Error; -use crate::container::{CatalogSource, ExportSource}; - /// Matroska TimestampScale: 1 ms (in nanoseconds). const TIMESTAMP_SCALE_NS: u64 = 1_000_000; @@ -45,9 +44,9 @@ const TIMESTAMP_SCALE_NS: u64 = 1_000_000; /// /// Only Legacy-container tracks (raw codec payloads) are supported. CMAF tracks /// (moof+mdat passthrough) are rejected with a clear error. -pub struct Export { +pub struct Export { broadcast: moq_net::BroadcastConsumer, - catalog: Option, + catalog: Option, latency: Duration, fragment_duration: Option, @@ -152,26 +151,16 @@ impl ClusterBuilder { } } -impl Export { - /// Subscribe to `broadcast` and produce MKV byte chunks, using the default - /// catalog format ([`CatalogFormat::Hang`]). - /// - /// Use [`with_catalog_format`](Self::with_catalog_format) to subscribe to a - /// non-default catalog track (e.g. MSF). - pub fn new(broadcast: moq_net::BroadcastConsumer) -> Result { - Self::with_catalog_format(broadcast, CatalogFormat::default()) - } - - /// Subscribe to `broadcast` and produce MKV byte chunks, selecting an - /// explicit `catalog_format` for track discovery. +impl Export { + /// Subscribe to `broadcast` and produce MKV byte chunks, driving track + /// (un)subscription from `catalog`. /// - /// Both formats drive the same internal `hang::Catalog`-based pipeline (MSF - /// snapshots are converted on receipt), so the only observable difference - /// is which wire catalog track is consumed. - pub fn with_catalog_format(broadcast: moq_net::BroadcastConsumer, catalog_format: CatalogFormat) -> Result { - let catalog = CatalogSource::new(&broadcast, catalog_format)?; - - Ok(Self { + /// `catalog` is any [`Stream`] of catalog snapshots, typically a + /// [`catalog::Consumer`](crate::catalog::Consumer) directly, or wrapped in + /// [`catalog::Filter`](crate::catalog::Filter) / + /// [`catalog::Target`](crate::catalog::Target) to narrow the rendition set. + pub fn new(broadcast: moq_net::BroadcastConsumer, catalog: S) -> Self { + Self { broadcast, catalog: Some(catalog), latency: Duration::ZERO, @@ -180,7 +169,7 @@ impl Export { catalog_snapshot: None, header_emitted: false, cluster: None, - }) + } } /// Set the maximum buffering latency for each per-track source. diff --git a/rs/moq-mux/src/container/mkv/export_test.rs b/rs/moq-mux/src/container/mkv/export_test.rs index aa118dec2..a896d89ab 100644 --- a/rs/moq-mux/src/container/mkv/export_test.rs +++ b/rs/moq-mux/src/container/mkv/export_test.rs @@ -28,7 +28,9 @@ async fn export_header_roundtrip_vp9_opus() { importer.finish().unwrap(); // Now subscribe via the exporter and pull bytes. - let mut exporter = crate::container::mkv::Export::new(consumer).unwrap(); + let catalog_stream = + crate::catalog::Consumer::new(&consumer, crate::catalog::CatalogFormat::Hang).expect("catalog consumer"); + let mut exporter = crate::container::mkv::Export::new(consumer, catalog_stream); // First `next()` should give us the header (EBML + Segment-start + Info + Tracks). let header = tokio::time::timeout(std::time::Duration::from_secs(1), exporter.next()) @@ -153,8 +155,9 @@ async fn export_emits_blocks_for_each_frame() { importer.decode(&mut buf).unwrap(); importer.finish().unwrap(); - let mut exporter = crate::container::mkv::Export::new(consumer) - .unwrap() + let catalog_stream = + crate::catalog::Consumer::new(&consumer, crate::catalog::CatalogFormat::Hang).expect("catalog consumer"); + let mut exporter = crate::container::mkv::Export::new(consumer, catalog_stream) // Use per-frame clustering so each frame is observable as its own // Cluster chunk; batching is exercised in a dedicated test below. .with_fragment_duration(std::time::Duration::ZERO); @@ -240,7 +243,9 @@ async fn export_rejects_cmaf_track() { }; catalog.lock().video.renditions.insert(track.name.clone(), config); - let mut exporter = crate::container::mkv::Export::new(consumer).unwrap(); + let catalog_stream = + crate::catalog::Consumer::new(&consumer, crate::catalog::CatalogFormat::Hang).expect("catalog consumer"); + let mut exporter = crate::container::mkv::Export::new(consumer, catalog_stream); let result = tokio::time::timeout(std::time::Duration::from_secs(1), exporter.next()) .await .expect("exporter timed out"); @@ -314,9 +319,10 @@ async fn export_avc3_source_synthesizes_avcc_and_length_prefixes() { let mut catalog = catalog; catalog.finish().unwrap(); - let mut exporter = crate::container::mkv::Export::new(consumer) - .unwrap() - .with_fragment_duration(std::time::Duration::ZERO); + let catalog_stream = + crate::catalog::Consumer::new(&consumer, crate::catalog::CatalogFormat::Hang).expect("catalog consumer"); + let mut exporter = + crate::container::mkv::Export::new(consumer, catalog_stream).with_fragment_duration(std::time::Duration::ZERO); let mut exported: Vec = Vec::new(); let mut held_producer = Some(producer); @@ -449,8 +455,9 @@ async fn export_fragment_duration_batches_blocks() { importer.finish().unwrap(); catalog.finish().unwrap(); - let mut exporter = crate::container::mkv::Export::new(consumer) - .unwrap() + let catalog_stream = + crate::catalog::Consumer::new(&consumer, crate::catalog::CatalogFormat::Hang).expect("catalog consumer"); + let mut exporter = crate::container::mkv::Export::new(consumer, catalog_stream) .with_fragment_duration(std::time::Duration::from_secs(2)); let mut exported: Vec = Vec::new(); diff --git a/rs/moq-mux/src/container/mod.rs b/rs/moq-mux/src/container/mod.rs index cfb87651b..8d854508c 100644 --- a/rs/moq-mux/src/container/mod.rs +++ b/rs/moq-mux/src/container/mod.rs @@ -27,7 +27,7 @@ pub mod mkv; pub use consumer::Consumer; pub use producer::Producer; -pub(crate) use source::{CatalogSource, ExportSource}; +pub(crate) use source::ExportSource; /// A decoded media frame: timestamp, payload bytes, keyframe flag. /// diff --git a/rs/moq-mux/src/container/source.rs b/rs/moq-mux/src/container/source.rs index 9644b932b..d7e001ef3 100644 --- a/rs/moq-mux/src/container/source.rs +++ b/rs/moq-mux/src/container/source.rs @@ -19,45 +19,11 @@ use std::time::Duration; use bytes::Bytes; use hang::catalog::{AudioConfig, VideoCodec, VideoConfig}; -use crate::catalog::CatalogFormat; use crate::catalog::hang::Container as HangContainer; use crate::codec::h264::Avc1; use crate::codec::h265::Hvc1; use crate::container::{Consumer, Frame}; -/// Source for the catalog stream backing an exporter. -/// -/// Both variants expose the same [`hang::Catalog`] shape; the MSF variant -/// converts on the fly so the rest of the pipeline only deals with hang types. -pub(crate) enum CatalogSource { - /// The hang catalog track (track name `catalog.json`, JSON payload). - Hang(crate::catalog::hang::Consumer), - /// The MSF catalog track (track name `catalog`, MSF JSON payload converted to hang). - Msf(crate::catalog::msf::Consumer), -} - -impl CatalogSource { - pub(crate) fn new(broadcast: &moq_net::BroadcastConsumer, format: CatalogFormat) -> Result { - Ok(match format { - CatalogFormat::Hang => { - let track = broadcast.subscribe_track(&hang::Catalog::default_track())?; - CatalogSource::Hang(crate::catalog::hang::Consumer::new(track)) - } - CatalogFormat::Msf => { - let track = broadcast.subscribe_track(&moq_net::Track::new(moq_msf::DEFAULT_NAME))?; - CatalogSource::Msf(crate::catalog::msf::Consumer::new(track)) - } - }) - } - - pub(crate) fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { - match self { - Self::Hang(c) => c.poll_next(waiter).map_err(Into::into), - Self::Msf(c) => c.poll_next(waiter), - } - } -} - /// Per-track video transform that bridges between codec shapes. pub(crate) enum VideoTransform { Avc1(Avc1), @@ -114,6 +80,29 @@ impl ExportSource { }) } + /// Subscribe to a video rendition without attaching any codec-shape + /// transform. Payloads pass through untouched (Annex-B stays Annex-B, + /// avc1 length-prefixed stays length-prefixed). The Annex-B exporter + /// uses this to keep parameter sets in-band. + pub fn for_video_raw( + broadcast: &moq_net::BroadcastConsumer, + name: &str, + config: &VideoConfig, + latency: Duration, + ) -> Result { + let media: HangContainer = (&config.container).try_into()?; + let track = broadcast.subscribe_track(&moq_net::Track::new(name.to_string()))?; + let consumer = Consumer::new(track, media).with_latency(latency); + + let description = config.description.as_ref().filter(|b| !b.is_empty()).cloned(); + + Ok(Self { + consumer, + transform: None, + description, + }) + } + /// Subscribe to an audio rendition. Audio has no codec-shape transform; /// `description` is taken straight from the catalog. pub fn for_audio(