Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions rs/hang/src/catalog/audio/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,26 @@ pub enum AudioCodec {
Unknown(String),
}

/// Coarse audio codec family, used for tag-only matching.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum AudioCodecKind {
AAC,
Opus,
Unknown,
}

impl AudioCodec {
/// Return the coarse codec family for tag-only matching.
pub fn kind(&self) -> AudioCodecKind {
match self {
Self::AAC(_) => AudioCodecKind::AAC,
Self::Opus => AudioCodecKind::Opus,
Self::Unknown(_) => AudioCodecKind::Unknown,
}
}
}

impl FromStr for AudioCodec {
type Err = Error;

Expand Down
26 changes: 26 additions & 0 deletions rs/hang/src/catalog/video/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,32 @@ pub enum VideoCodec {
Unknown(String),
}

/// Coarse video codec family, used for tag-only matching.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum VideoCodecKind {
H264,
H265,
VP8,
VP9,
AV1,
Unknown,
}

impl VideoCodec {
/// Return the coarse codec family for tag-only matching.
pub fn kind(&self) -> VideoCodecKind {
match self {
Self::H264(_) => VideoCodecKind::H264,
Self::H265(_) => VideoCodecKind::H265,
Self::VP9(_) => VideoCodecKind::VP9,
Self::AV1(_) => VideoCodecKind::AV1,
Self::VP8 => VideoCodecKind::VP8,
Self::Unknown(_) => VideoCodecKind::Unknown,
}
}
}

impl FromStr for VideoCodec {
type Err = Error;

Expand Down
206 changes: 197 additions & 9 deletions rs/moq-cli/src/subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use std::time::Duration;

use clap::ValueEnum;
use hang::catalog::{AudioCodecKind, VideoCodecKind};
use hang::moq_net;
use moq_mux::catalog::CatalogFormat;
use moq_mux::catalog::{self, CatalogFormat, FilterAudio, FilterVideo, Stream, TargetAudio, TargetVideo};
use tokio::io::AsyncWriteExt;

#[derive(ValueEnum, Clone, Copy)]
pub enum SubscribeFormat {
Fmp4,
Mkv,
/// H.264 Annex-B elementary stream (no container).
H264,
/// H.265 Annex-B elementary stream (no container).
H265,
}

/// `clap` adapter for [`CatalogFormat`] (which is `#[non_exhaustive]` and so
Expand All @@ -28,6 +33,44 @@ impl From<CatalogFormatArg> for CatalogFormat {
}
}

/// `clap` adapter for [`VideoCodecKind`].
#[derive(ValueEnum, Clone, Copy)]
pub enum VideoCodecArg {
H264,
H265,
Vp8,
Vp9,
Av1,
}

impl From<VideoCodecArg> for VideoCodecKind {
fn from(value: VideoCodecArg) -> Self {
match value {
VideoCodecArg::H264 => Self::H264,
VideoCodecArg::H265 => Self::H265,
VideoCodecArg::Vp8 => Self::VP8,
VideoCodecArg::Vp9 => Self::VP9,
VideoCodecArg::Av1 => Self::AV1,
}
}
}

/// `clap` adapter for [`AudioCodecKind`].
#[derive(ValueEnum, Clone, Copy)]
pub enum AudioCodecArg {
Aac,
Opus,
}

impl From<AudioCodecArg> for AudioCodecKind {
fn from(value: AudioCodecArg) -> Self {
match value {
AudioCodecArg::Aac => Self::AAC,
AudioCodecArg::Opus => Self::Opus,
}
}
}

#[derive(clap::Args, Clone)]
pub struct SubscribeArgs {
/// The format to write to stdout.
Expand All @@ -52,6 +95,42 @@ pub struct SubscribeArgs {
/// (`.hang` -> hang, `.msf` -> msf), falling back to hang.
#[arg(long)]
pub catalog: Option<CatalogFormatArg>,

/// Pick the video rendition with this exact name.
#[arg(long)]
pub video_name: Option<String>,

/// Keep only video renditions whose codec family matches.
#[arg(long)]
pub video_codec: Option<VideoCodecArg>,

/// Prefer a video rendition no wider than this (px).
#[arg(long)]
pub video_width_max: Option<u32>,

/// Prefer a video rendition no taller than this (px).
#[arg(long)]
pub video_height_max: Option<u32>,

/// Prefer a video rendition with at most this many pixels (`coded_width * coded_height`).
#[arg(long)]
pub video_pixels_max: Option<u32>,

/// Prefer a video rendition under this bitrate (bits per second).
#[arg(long)]
pub video_bitrate_max: Option<u64>,

/// Pick the audio rendition with this exact name.
#[arg(long)]
pub audio_name: Option<String>,

/// Keep only audio renditions whose codec family matches.
#[arg(long)]
pub audio_codec: Option<AudioCodecArg>,

/// Prefer an audio rendition under this bitrate (bits per second).
#[arg(long)]
pub audio_bitrate_max: Option<u64>,
}

impl SubscribeArgs {
Expand All @@ -63,6 +142,72 @@ impl SubscribeArgs {
.or_else(|| CatalogFormat::detect(broadcast))
.unwrap_or_default()
}

/// Codec implied by the output format. `--format h264` / `--format h265`
/// each force a single codec family; container formats leave it open.
fn format_codec(&self) -> Option<VideoCodecKind> {
match self.format {
SubscribeFormat::H264 => Some(VideoCodecKind::H264),
SubscribeFormat::H265 => Some(VideoCodecKind::H265),
SubscribeFormat::Fmp4 | SubscribeFormat::Mkv => None,
}
}

/// Build a video filter from the parsed flags, plus any codec defaulted by
/// the chosen output format (e.g. `--format h264` implies `codec = H264`).
///
/// Errors if `--video-codec` contradicts the format-implied codec — fail
/// fast in the CLI rather than later in the exporter.
fn filter_video(&self) -> anyhow::Result<Option<FilterVideo>> {
let user_codec = self.video_codec.map(VideoCodecKind::from);
let codec = match (self.format_codec(), user_codec) {
(Some(fmt), Some(user)) if fmt != user => {
anyhow::bail!(
"--format implies video codec {fmt:?}, but --video-codec {user:?} was passed; \
remove --video-codec or pick a matching format"
);
}
(Some(fmt), _) => Some(fmt),
(None, user) => user,
};
if self.video_name.is_none() && codec.is_none() {
return Ok(None);
}
Ok(Some(FilterVideo {
name: self.video_name.clone(),
codec,
}))
}

fn filter_audio(&self) -> Option<FilterAudio> {
if self.audio_name.is_none() && self.audio_codec.is_none() {
return None;
}
Some(FilterAudio {
name: self.audio_name.clone(),
codec: self.audio_codec.map(Into::into),
})
}

fn target_video(&self) -> Option<TargetVideo> {
if self.video_width_max.is_none()
&& self.video_height_max.is_none()
&& self.video_pixels_max.is_none()
&& self.video_bitrate_max.is_none()
{
return None;
}
Some(TargetVideo {
width: self.video_width_max,
height: self.video_height_max,
pixels: self.video_pixels_max,
bitrate: self.video_bitrate_max,
})
}

fn target_audio(&self) -> Option<TargetAudio> {
self.audio_bitrate_max.map(|b| TargetAudio { bitrate: Some(b) })
}
}

pub struct Subscribe {
Expand All @@ -80,20 +225,35 @@ impl Subscribe {
}
}

/// Build the catalog stream from the configured filter/target flags.
fn stream(&self) -> anyhow::Result<catalog::Target<catalog::Filter<catalog::Consumer>>> {
let consumer = catalog::Consumer::new(&self.broadcast, self.catalog)?;

let mut filter = consumer.filter();
filter.set_video(self.args.filter_video()?);
filter.set_audio(self.args.filter_audio());

let mut target = filter.target();
target.set_video(self.args.target_video());
target.set_audio(self.args.target_audio());

Ok(target)
}

pub async fn run(self) -> anyhow::Result<()> {
match self.args.format {
SubscribeFormat::Fmp4 => self.run_fmp4().await,
SubscribeFormat::Mkv => self.run_mkv().await,
SubscribeFormat::H264 => self.run_h264().await,
SubscribeFormat::H265 => self.run_h265().await,
}
}

async fn run_fmp4(self) -> anyhow::Result<()> {
let mut stdout = tokio::io::stdout();

// Fmp4 subscribes to the catalog internally, builds the merged init segment
// from the first catalog snapshot, then yields moof+mdat fragments in
// timestamp order across tracks.
let mut fmp4 = moq_mux::container::fmp4::Export::with_catalog_format(self.broadcast, self.catalog)?
let stream = self.stream()?;
let mut fmp4 = moq_mux::container::fmp4::Export::new(self.broadcast.clone(), stream)
.with_latency(self.args.max_latency)
.with_fragment_duration(self.args.fragment_duration);

Expand All @@ -108,10 +268,8 @@ impl Subscribe {
async fn run_mkv(self) -> anyhow::Result<()> {
let mut stdout = tokio::io::stdout();

// Mkv writes EBML + an unknown-size Segment header, then per-fragment
// Cluster elements. Avc3/Hev1 sources are transcoded to avc1/hvc1
// shape internally (synthesizing avcC/hvcC from inline parameter sets).
let mut mkv = moq_mux::container::mkv::Export::with_catalog_format(self.broadcast, self.catalog)?
let stream = self.stream()?;
let mut mkv = moq_mux::container::mkv::Export::new(self.broadcast.clone(), stream)
.with_latency(self.args.max_latency)
.with_fragment_duration(self.args.fragment_duration);

Expand All @@ -122,4 +280,34 @@ impl Subscribe {

Ok(())
}

async fn run_h264(self) -> anyhow::Result<()> {
let mut stdout = tokio::io::stdout();

let stream = self.stream()?;
let mut h264 =
moq_mux::codec::h264::Export::new(self.broadcast.clone(), stream).with_latency(self.args.max_latency);

while let Some(chunk) = h264.next().await? {
stdout.write_all(&chunk).await?;
stdout.flush().await?;
}

Ok(())
}

async fn run_h265(self) -> anyhow::Result<()> {
let mut stdout = tokio::io::stdout();

let stream = self.stream()?;
let mut h265 =
moq_mux::codec::h265::Export::new(self.broadcast.clone(), stream).with_latency(self.args.max_latency);

while let Some(chunk) = h265.next().await? {
stdout.write_all(&chunk).await?;
stdout.flush().await?;
}

Ok(())
}
}
46 changes: 46 additions & 0 deletions rs/moq-mux/src/catalog/consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//! Unified catalog consumer.
//!
//! Subscribes to whichever catalog track ([`hang`] or [`msf`]) the broadcast
//! advertises and yields [`hang::Catalog`] snapshots so callers and exporters
//! only deal with one shape.

use std::task::Poll;

use hang::Catalog;

use super::{CatalogFormat, Stream};

/// A catalog stream sourced from a [`moq_net::BroadcastConsumer`].
///
/// Both variants emit [`hang::Catalog`]; the MSF variant converts each snapshot
/// on the fly. Wrap with [`Filter`](super::Filter) / [`Target`](super::Target)
/// to narrow the rendition set before handing the stream to an exporter.
pub enum Consumer {
Hang(super::hang::Consumer),
Msf(super::msf::Consumer),
}

impl Consumer {
/// Subscribe to the catalog track advertised by `format`.
pub fn new(broadcast: &moq_net::BroadcastConsumer, format: CatalogFormat) -> Result<Self, crate::Error> {
Ok(match format {
CatalogFormat::Hang => {
let track = broadcast.subscribe_track(&hang::Catalog::default_track())?;
Self::Hang(super::hang::Consumer::new(track))
}
CatalogFormat::Msf => {
let track = broadcast.subscribe_track(&moq_net::Track::new(moq_msf::DEFAULT_NAME))?;
Self::Msf(super::msf::Consumer::new(track))
}
})
}
}

impl Stream for Consumer {
fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll<crate::Result<Option<Catalog>>> {
match self {
Self::Hang(c) => c.poll_next(waiter),
Self::Msf(c) => c.poll_next(waiter).map_err(Into::into),
}
}
}
Loading
Loading