diff --git a/CLAUDE.md b/CLAUDE.md index e835bb5..2e33ca2 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Project Overview -This is a GStreamer plugin for Media over QUIC (MoQ), written in Rust. It provides `hangsink` and `hangsrc` elements that enable publishing and subscribing to media streams using the MoQ protocol over QUIC transport. +This is a GStreamer plugin for Media over QUIC (MoQ), written in Rust. It provides `moqsink` and `moqsrc` elements that enable publishing and subscribing to media streams using the MoQ protocol over QUIC transport. ## Development Setup @@ -50,7 +50,7 @@ just fix just relay # Publish video stream with broadcast name -just pub-gst bbb +just pub bbb # Subscribe to video stream with broadcast name just sub bbb @@ -59,35 +59,29 @@ just sub bbb ## Architecture ### Plugin Structure -- **lib.rs**: Main plugin entry point, registers both sink and source elements as "hang" plugin -- **sink/**: Hang sink element (`hangsink`) for publishing streams - - `mod.rs`: GStreamer element wrapper for HangSink +- **lib.rs**: Main plugin entry point, registers both sink and source elements as "moq" plugin +- **sink/**: MoQ sink element (`moqsink`) for publishing streams + - `mod.rs`: GStreamer element wrapper for MoqSink - `imp.rs`: Core implementation with async Tokio runtime -- **source/**: Hang source element (`hangsrc`) for consuming streams - - `mod.rs`: GStreamer element wrapper for HangSrc +- **source/**: MoQ source element (`moqsrc`) for consuming streams + - `mod.rs`: GStreamer element wrapper for MoqSrc - `imp.rs`: Core implementation with async Tokio runtime ### Key Dependencies -- **hang**: Higher-level hang protocol utilities and CMAF handling +- **hang**: Higher-level protocol utilities and catalog/container handling - **moq-mux**: MoQ muxing/demuxing for media streams - **moq-lite**: Lightweight MoQ protocol types - **moq-native**: Core MoQ protocol implementation with QUIC/TLS - **gstreamer**: GStreamer bindings for Rust -- **tokio**: Async runtime (single-threaded worker pool) +- **tokio**: Async runtime ### Plugin Elements -- `hangsink`: BaseSink element that accepts media data and publishes via MoQ with broadcast name -- `hangsrc`: Bin element that receives MoQ streams and outputs GStreamer buffers +- `moqsink`: Element with request pads (`video_%u`, `audio_%u`) that accepts media data and publishes via MoQ +- `moqsrc`: Bin element that receives MoQ streams and outputs GStreamer buffers -Both elements use a shared Tokio runtime and support TLS configuration options. They require broadcast names for operation. +Both elements use a shared Tokio runtime and support TLS configuration options (url, broadcast, tls-disable-verify). ## Environment Variables -- `RUST_LOG=info`: Controls logging level -- `URL=http://localhost:4443`: Default relay server URL -- `GST_PLUGIN_PATH`: Must include the built plugin directory - -## Notable Changes from moq-gst -- Renamed from moq-gst to hang-gst -- Element names changed from moqsink/moqsrc to hangsink/hangsrc -- Added broadcast parameter requirement for both elements -- Updated justfile commands to include broadcast parameters \ No newline at end of file +- `RUST_LOG`: Controls logging level (default: info, overridable via environment) +- `URL`: Relay server URL (default: http://localhost:4443) +- `GST_PLUGIN_PATH_1_0`: Must include the built plugin directory (handled automatically by justfile) \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index fdd879a..7fc77af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,12 +124,6 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" -[[package]] -name = "atomic_refcell" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41e67cd8309bbd06cd603a9e693a784ac2e5d1e955f11286e355089fcab3047c" - [[package]] name = "autocfg" version = "1.5.0" @@ -901,33 +895,6 @@ dependencies = [ "thiserror 2.0.17", ] -[[package]] -name = "gstreamer-base" -version = "0.23.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f19a74fd04ffdcb847dd322640f2cf520897129d00a7bcb92fd62a63f3e27404" -dependencies = [ - "atomic_refcell", - "cfg-if", - "glib", - "gstreamer", - "gstreamer-base-sys", - "libc", -] - -[[package]] -name = "gstreamer-base-sys" -version = "0.23.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f2fb0037b6d3c5b51f60dea11e667910f33be222308ca5a101450018a09840" -dependencies = [ - "glib-sys", - "gobject-sys", - "gstreamer-sys", - "libc", - "system-deps", -] - [[package]] name = "gstreamer-sys" version = "0.23.6" @@ -1474,7 +1441,6 @@ dependencies = [ "bytes", "gst-plugin-version-helper", "gstreamer", - "gstreamer-base", "hang", "moq-lite", "moq-mux", diff --git a/Cargo.toml b/Cargo.toml index 68c5099..0285b53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,6 @@ moq-native = "0.13.1" anyhow = { version = "1", features = ["backtrace"] } bytes = "1" gst = { package = "gstreamer", version = "0.23" } -gst-base = { package = "gstreamer-base", version = "0.23" } #gst-app = { package = "gstreamer-app", version = "0.23", features = ["v1_20"] } tokio = { version = "1", features = ["full"] } diff --git a/justfile b/justfile index 3f5d6ad..3e03a0d 100644 --- a/justfile +++ b/justfile @@ -3,8 +3,8 @@ # Using Just: https://github.com/casey/just?tab=readme-ov-file#installation export RUST_BACKTRACE := "1" -export RUST_LOG := "info" -export URL := "http://localhost:4443" +export RUST_LOG := env_var_or_default("RUST_LOG", "info") +export URL := "http://localhost:4443/anon" #export GST_DEBUG:="*:4" # List all of the available commands. @@ -44,10 +44,11 @@ pub broadcast: (download "bbb" "http://commondatastorage.googleapis.com/gtv-vide cargo build # Run gstreamer and pipe the output to our plugin - GST_PLUGIN_PATH="${PWD}/target/debug${GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}" \ - gst-launch-1.0 -v -e multifilesrc location="dev/bbb.fmp4" loop=true ! qtdemux name=demux \ - demux.video_0 ! h264parse ! queue ! identity sync=true ! isofmp4mux name=mux chunk-duration=1 fragment-duration=1 ! moqsink url="$URL" broadcast="{{broadcast}}" tls-disable-verify=true \ - demux.audio_0 ! aacparse ! queue ! mux. + GST_PLUGIN_PATH_1_0="${PWD}/target/debug${GST_PLUGIN_PATH_1_0:+:$GST_PLUGIN_PATH_1_0}" \ + gst-launch-1.0 -v -e multifilesrc location="dev/bbb.fmp4" loop=true ! parsebin name=parse \ + parse. ! queue ! identity sync=true ! mux.sink_0 \ + parse. ! queue ! identity sync=true ! mux.sink_1 \ + moqsink name=mux url="$URL" broadcast="{{broadcast}}" tls-disable-verify=true # Subscribe to a video using gstreamer sub broadcast: @@ -56,7 +57,7 @@ sub broadcast: # Run gstreamer and pipe the output to our plugin # This will render the video to the screen - GST_PLUGIN_PATH="${PWD}/target/debug${GST_PLUGIN_PATH:+:$GST_PLUGIN_PATH}" \ + GST_PLUGIN_PATH_1_0="${PWD}/target/debug${GST_PLUGIN_PATH_1_0:+:$GST_PLUGIN_PATH_1_0}" \ gst-launch-1.0 -v -e moqsrc url="$URL" broadcast="{{broadcast}}" tls-disable-verify=true ! decodebin ! videoconvert ! autovideosink # Run the CI checks diff --git a/src/sink/imp.rs b/src/sink/imp.rs index aa77c7d..9145e1a 100644 --- a/src/sink/imp.rs +++ b/src/sink/imp.rs @@ -1,15 +1,16 @@ use anyhow::Context as _; -use bytes::BytesMut; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; -use gst_base::subclass::prelude::*; -use std::sync::Arc; +use std::collections::HashMap; use std::sync::LazyLock; use std::sync::Mutex; use url::Url; +static CAT: LazyLock = + LazyLock::new(|| gst::DebugCategory::new("moq-sink", gst::DebugColorFlags::empty(), Some("MoQ Sink Element"))); + pub static RUNTIME: LazyLock = LazyLock::new(|| { tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -25,27 +26,29 @@ struct Settings { pub tls_disable_verify: bool, } -#[derive(Default)] +struct PadState { + decoder: moq_mux::import::Decoder, + reference_pts: Option, +} + struct State { - pub media: Option, - pub buffer: BytesMut, + _session: moq_lite::Session, + broadcast: moq_lite::BroadcastProducer, + catalog: hang::CatalogProducer, + pads: HashMap, } #[derive(Default)] pub struct MoqSink { settings: Mutex, - state: Arc>, + state: Mutex>, } #[glib::object_subclass] impl ObjectSubclass for MoqSink { const NAME: &'static str = "MoqSink"; type Type = super::MoqSink; - type ParentType = gst_base::BaseSink; - - fn new() -> Self { - Self::default() - } + type ParentType = gst::Element; } impl ObjectImpl for MoqSink { @@ -111,54 +114,92 @@ impl ElementImpl for MoqSink { fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { - let caps = gst::Caps::builder("video/quicktime") - .field("variant", "iso-fragmented") - .build(); + let mut caps = gst::Caps::new_empty(); + // Video + caps.merge( + gst::Caps::builder("video/x-h264") + .field("stream-format", "byte-stream") + .field("alignment", "au") + .build(), + ); + caps.merge( + gst::Caps::builder("video/x-h265") + .field("stream-format", "byte-stream") + .field("alignment", "au") + .build(), + ); + caps.merge(gst::Caps::builder("video/x-av1").build()); + // Audio + caps.merge( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("stream-format", "raw") + .build(), + ); + caps.merge(gst::Caps::builder("audio/x-opus").build()); - let pad_template = - gst::PadTemplate::new("sink", gst::PadDirection::Sink, gst::PadPresence::Always, &caps).unwrap(); + let templ = + gst::PadTemplate::new("sink_%u", gst::PadDirection::Sink, gst::PadPresence::Request, &caps).unwrap(); - vec![pad_template] + vec![templ] }); PAD_TEMPLATES.as_ref() } -} -impl BaseSinkImpl for MoqSink { - fn start(&self) -> Result<(), gst::ErrorMessage> { - let _guard = RUNTIME.enter(); - self.setup() - .map_err(|e| gst::error_msg!(gst::ResourceError::Failed, ["Failed to connect: {}", e])) + fn request_new_pad( + &self, + templ: &gst::PadTemplate, + name: Option<&str>, + _caps: Option<&gst::Caps>, + ) -> Option { + let builder = gst::Pad::builder_from_template(templ) + .chain_function(|pad, parent, buffer| { + let element = parent + .and_then(|p| p.downcast_ref::()) + .ok_or(gst::FlowError::Error)?; + element.imp().sink_chain(pad, buffer) + }) + .event_function(|pad, parent, event| { + let Some(element) = parent.and_then(|p| p.downcast_ref::()) else { + return false; + }; + element.imp().sink_event(pad, event) + }); + + let pad = if let Some(name) = name { + builder.name(name).build() + } else { + builder.build() + }; + + self.obj().add_pad(&pad).ok()?; + Some(pad) } - fn stop(&self) -> Result<(), gst::ErrorMessage> { - Ok(()) + fn release_pad(&self, pad: &gst::Pad) { + let pad_name = pad.name().to_string(); + if let Some(ref mut state) = *self.state.lock().unwrap() { + state.pads.remove(&pad_name); + } + let _ = self.obj().remove_pad(pad); } - fn render(&self, buffer: &gst::Buffer) -> Result { - let _guard = RUNTIME.enter(); - let data = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; - - let mut state = self.state.lock().unwrap(); - - // Append incoming data to our buffer - state.buffer.extend_from_slice(data.as_slice()); - - // Take media out temporarily to avoid borrow conflict - let mut media = state.media.take().expect("not initialized"); - - // Try to decode what we have buffered - let result = media.decode(&mut state.buffer); - - // Put media back - state.media = Some(media); - - if let Err(e) = result { - gst::error!(gst::CAT_DEFAULT, "Failed to decode: {}", e); - return Err(gst::FlowError::Error); + fn change_state(&self, transition: gst::StateChange) -> Result { + match transition { + gst::StateChange::ReadyToPaused => { + let _guard = RUNTIME.enter(); + self.setup().map_err(|e| { + gst::error!(CAT, obj = self.obj(), "Failed to setup: {:?}", e); + gst::StateChangeError + })?; + } + gst::StateChange::PausedToReady => { + *self.state.lock().unwrap() = None; + } + _ => (), } - Ok(gst::FlowSuccess::Ok) + self.parent_change_state(transition) } } @@ -186,14 +227,122 @@ impl MoqSink { let client = config.init()?.with_publish(origin.consume()); RUNTIME.block_on(async { - let _session = client.connect(url).await.context("failed to connect")?; - - let media = moq_mux::import::Fmp4::new(broadcast, catalog, Default::default()); + let session = client.connect(url).await.context("failed to connect")?; - let mut state = self.state.lock().unwrap(); - state.media = Some(media); + *self.state.lock().unwrap() = Some(State { + _session: session, + broadcast, + catalog, + pads: HashMap::new(), + }); anyhow::Ok(()) }) } + + fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { + match event.view() { + gst::EventView::Caps(caps_event) => { + let caps = caps_event.caps(); + if let Err(e) = self.handle_caps(pad, caps) { + gst::error!(CAT, obj = pad, "Failed to handle caps: {:?}", e); + return false; + } + true + } + _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), + } + } + + fn handle_caps(&self, pad: &gst::Pad, caps: &gst::CapsRef) -> anyhow::Result<()> { + let structure = caps.structure(0).context("empty caps")?; + let pad_name = pad.name().to_string(); + + let format = match structure.name().as_str() { + "video/x-h264" => moq_mux::import::DecoderFormat::Avc3, + "video/x-h265" => moq_mux::import::DecoderFormat::Hev1, + "video/x-av1" => moq_mux::import::DecoderFormat::Av01, + "audio/mpeg" => moq_mux::import::DecoderFormat::Aac, + "audio/x-opus" => moq_mux::import::DecoderFormat::Opus, + other => anyhow::bail!("unsupported caps: {}", other), + }; + + let mut state = self.state.lock().unwrap(); + let state = state.as_mut().context("not connected")?; + + let mut decoder = moq_mux::import::Decoder::new(state.broadcast.clone(), state.catalog.clone(), format); + + // Initialize audio decoders that need external config + match format { + moq_mux::import::DecoderFormat::Aac => { + // aacparse provides AudioSpecificConfig as codec_data in caps + let codec_data = structure + .get::("codec_data") + .context("AAC caps missing codec_data")?; + let map = codec_data.map_readable().context("failed to map codec_data buffer")?; + let mut data = bytes::Bytes::copy_from_slice(map.as_slice()); + decoder.initialize(&mut data)?; + } + moq_mux::import::DecoderFormat::Opus => { + // Synthesize OpusHead from caps fields + let channels: i32 = structure.get("channels").unwrap_or(2); + let rate: i32 = structure.get("rate").unwrap_or(48000); + + let mut opus_head = Vec::with_capacity(19); + opus_head.extend_from_slice(b"OpusHead"); + opus_head.push(1); // version + opus_head.push(channels as u8); + opus_head.extend_from_slice(&0u16.to_le_bytes()); // pre_skip + opus_head.extend_from_slice(&(rate as u32).to_le_bytes()); + opus_head.extend_from_slice(&0i16.to_le_bytes()); // gain + opus_head.push(0); // channel mapping family + + let mut data = bytes::Bytes::from(opus_head); + decoder.initialize(&mut data)?; + } + _ => {} // Video codecs self-initialize from inline data + } + + state.pads.insert( + pad_name.clone(), + PadState { + decoder, + reference_pts: None, + }, + ); + + gst::info!(CAT, obj = pad, "Configured pad {} with format {:?}", pad_name, format); + + Ok(()) + } + + fn sink_chain(&self, pad: &gst::Pad, buffer: gst::Buffer) -> Result { + let _guard = RUNTIME.enter(); + + let pad_name = pad.name(); + let mut state = self.state.lock().unwrap(); + let state = state.as_mut().ok_or(gst::FlowError::Error)?; + + let pad_state = state.pads.get_mut(pad_name.as_str()).ok_or_else(|| { + gst::error!(CAT, obj = pad, "Pad {} not configured", pad_name); + gst::FlowError::Error + })?; + + // Compute relative PTS in microseconds + let pts = buffer.pts().and_then(|pts| { + let reference = *pad_state.reference_pts.get_or_insert(pts); + let relative = pts.checked_sub(reference)?; + hang::container::Timestamp::from_micros(relative.nseconds() / 1000).ok() + }); + + let data = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; + let mut bytes = bytes::Bytes::copy_from_slice(data.as_slice()); + + pad_state.decoder.decode_frame(&mut bytes, pts).map_err(|e| { + gst::error!(CAT, obj = pad, "Failed to decode: {}", e); + gst::FlowError::Error + })?; + + Ok(gst::FlowSuccess::Ok) + } } diff --git a/src/sink/mod.rs b/src/sink/mod.rs index 17dbc07..80d890a 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -4,7 +4,7 @@ use gst::prelude::*; mod imp; glib::wrapper! { - pub struct MoqSink(ObjectSubclass) @extends gst_base::BaseSink, gst::Element, gst::Object; + pub struct MoqSink(ObjectSubclass) @extends gst::Element, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {