diff --git a/.changeset/add_support_for_frame_level_packet_trailer.md b/.changeset/add_support_for_frame_level_packet_trailer.md new file mode 100644 index 000000000..3c7c6e90c --- /dev/null +++ b/.changeset/add_support_for_frame_level_packet_trailer.md @@ -0,0 +1,19 @@ +--- +livekit: minor +livekit-protocol: minor +livekit-api: minor +livekit-wakeword: no changelog additions +soxr-sys: no changelog additions +webrtc-sys-build: no changelog additions +webrtc-sys: minor +livekit-ffi: minor +yuv-sys: no changelog additions +libwebrtc: minor +imgproc: no changelog additions +--- + +# Add support for frame level packet trailer + +#890 by @chenosaurus + +- Add support to attach/parse frame level timestamps & frame ID to VideoTracks as a custom payload trailer. \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 66e4735da..822b4bfbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4168,10 +4168,11 @@ dependencies = [ [[package]] name = "local_video" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "bytemuck", + "chrono", "clap", "eframe", "egui", diff --git a/examples/local_video/Cargo.toml b/examples/local_video/Cargo.toml index b865a4808..3206a5a2b 100644 --- a/examples/local_video/Cargo.toml +++ b/examples/local_video/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "local_video" -version = "0.1.0" +version = "0.2.0" edition.workspace = true publish = false @@ -41,6 +41,7 @@ wgpu = { workspace = true } winit = { workspace = true, features = [ "android-native-activity" ] } parking_lot = { workspace = true, features = ["deadlock_detection"] } anyhow = { workspace = true } +chrono = "0.4" bytemuck = { version = "1.16", features = ["derive"] } nokhwa = { version = "0.10", default-features = false, features = ["output-threaded"] } diff --git a/examples/local_video/README.md b/examples/local_video/README.md index e6b0b04b5..7cc6f3a20 100644 --- a/examples/local_video/README.md +++ b/examples/local_video/README.md @@ -30,6 +30,38 @@ Publisher usage: 
--url https://your.livekit.server \ --api-key YOUR_KEY \ --api-secret YOUR_SECRET + + # publish with a user timestamp attached to every frame + cargo run -p local_video -F desktop --bin publisher -- \ + --camera-index 0 \ + --room-name demo \ + --identity cam-1 \ + --attach-timestamp + + # publish with timestamp burned into the video and a frame ID in the packet trailer + cargo run -p local_video -F desktop --bin publisher -- \ + --camera-index 0 \ + --room-name demo \ + --identity cam-1 \ + --attach-timestamp \ + --burn-timestamp \ + --attach-frame-id + + # publish at a custom resolution and framerate + cargo run -p local_video -F desktop --bin publisher -- \ + --camera-index 0 \ + --width 1920 \ + --height 1080 \ + --fps 60 \ + --room-name demo \ + --identity cam-1 + + # publish with end-to-end encryption + cargo run -p local_video -F desktop --bin publisher -- \ + --camera-index 0 \ + --room-name demo \ + --identity cam-1 \ + --e2ee-key my-secret-key ``` List devices usage: @@ -38,9 +70,17 @@ List devices usage: ``` Publisher flags (in addition to the common connection flags above): +- `--camera-index `: Camera index to use (default: `0`). Use `--list-cameras` to see available indices. +- `--width `: Desired capture width (default: `1280`). +- `--height `: Desired capture height (default: `720`). +- `--fps `: Desired capture framerate (default: `30`). - `--h265`: Use H.265/HEVC encoding if supported (falls back to H.264 on failure). - `--simulcast`: Publish simulcast video (multiple layers when the resolution is large enough). - `--max-bitrate `: Max video bitrate for the main (highest) layer in bits per second (e.g. `1500000`). +- `--attach-timestamp`: Attach the current wall-clock time (microseconds since UNIX epoch) as the user timestamp on each published frame. The subscriber can display this to measure end-to-end latency. +- `--burn-timestamp`: Burn the attached timestamp into the video frame as a visible overlay. 
Has no effect unless `--attach-timestamp` is also set. +- `--attach-frame-id`: Attach a monotonically increasing frame ID to each published frame via the packet trailer. The subscriber displays this in the timestamp overlay when `--display-timestamp` is used. +- `--e2ee-key `: Enable end-to-end encryption with the given shared key. The subscriber must use the same key to decrypt. Subscriber usage: ``` @@ -55,13 +95,31 @@ Subscriber usage: --api-key YOUR_KEY \ --api-secret YOUR_SECRET - # subscribe to a specific participant's video only - cargo run -p local_video -F desktop --bin subscriber -- \ - --room-name demo \ - --identity viewer-1 \ - --participant alice + # subscribe to a specific participant's video only + cargo run -p local_video -F desktop --bin subscriber -- \ + --room-name demo \ + --identity viewer-1 \ + --participant alice + + # display timestamp overlay (requires publisher to use --attach-timestamp) + cargo run -p local_video -F desktop --bin subscriber -- \ + --room-name demo \ + --identity viewer-1 \ + --display-timestamp + + # subscribe with end-to-end encryption (must match publisher's key) + cargo run -p local_video -F desktop --bin subscriber -- \ + --room-name demo \ + --identity viewer-1 \ + --e2ee-key my-secret-key ``` +Subscriber flags (in addition to the common connection flags above): +- `--participant `: Only subscribe to video tracks from the specified participant. +- `--display-timestamp`: Show a top-left overlay with frame ID, the publisher's timestamp, the subscriber's current time, and the computed end-to-end latency. Timestamp fields require the publisher to use `--attach-timestamp`; frame ID requires `--attach-frame-id`. +- `--e2ee-key `: Enable end-to-end decryption with the given shared key. Must match the key used by the publisher. + Notes: -- `--participant` limits subscription to video tracks from the specified participant identity. 
- If the active video track is unsubscribed or unpublished, the app clears its state and will automatically attach to the next matching video track when it appears. +- For E2EE to work, both publisher and subscriber must specify the same `--e2ee-key` value. If the keys don't match, the subscriber will not be able to decode the video. +- The timestamp overlay updates at ~2 Hz so the latency value is readable rather than flickering every frame. diff --git a/examples/local_video/src/publisher.rs b/examples/local_video/src/publisher.rs index dc56f5969..21aa4950a 100644 --- a/examples/local_video/src/publisher.rs +++ b/examples/local_video/src/publisher.rs @@ -1,8 +1,11 @@ use anyhow::Result; use clap::Parser; -use livekit::options::{TrackPublishOptions, VideoCodec, VideoEncoding}; +use livekit::e2ee::{key_provider::*, E2eeOptions, EncryptionType}; +use livekit::options::{ + self, video as video_presets, TrackPublishOptions, VideoCodec, VideoEncoding, VideoPreset, +}; use livekit::prelude::*; -use livekit::webrtc::video_frame::{I420Buffer, VideoFrame, VideoRotation}; +use livekit::webrtc::video_frame::{FrameMetadata, I420Buffer, VideoFrame, VideoRotation}; use livekit::webrtc::video_source::native::NativeVideoSource; use livekit::webrtc::video_source::{RtcVideoSource, VideoResolution}; use livekit_api::access_token; @@ -18,9 +21,13 @@ use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use yuv_sys; +mod timestamp_burn; + +use timestamp_burn::TimestampOverlay; + #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { @@ -75,6 +82,103 @@ struct Args { /// Use H.265/HEVC encoding if supported (falls back to H.264 on failure) #[arg(long, default_value_t = false)] h265: bool, + + /// Attach the current system time (microseconds since UNIX epoch) as the user timestamp on each frame + #[arg(long, default_value_t = false)] + 
attach_timestamp: bool, + + /// Burn the attached timestamp into each video frame; does nothing unless --attach-timestamp is also enabled + #[arg(long, default_value_t = false)] + burn_timestamp: bool, + + /// Attach a monotonically increasing frame ID to each published frame via the packet trailer + #[arg(long, default_value_t = false)] + attach_frame_id: bool, + + /// Shared encryption key for E2EE (enables AES-GCM end-to-end encryption when set) + #[arg(long)] + e2ee_key: Option, +} + +fn unix_time_us_now() -> u64 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros() as u64 +} + +#[derive(Default)] +struct RollingMs { + total_ms: f64, + samples: u64, +} + +impl RollingMs { + fn record(&mut self, value_ms: f64) { + self.total_ms += value_ms; + self.samples += 1; + } + + fn average(&self) -> Option { + (self.samples > 0).then_some(self.total_ms / self.samples as f64) + } + + fn reset(&mut self) { + *self = Self::default(); + } +} + +#[derive(Default)] +struct PublisherTimingSummary { + paced_wait_ms: RollingMs, + camera_frame_read_ms: RollingMs, + decode_mjpeg_ms: RollingMs, + buffer_convert_ms: RollingMs, + frame_draw_ms: RollingMs, + submit_to_webrtc_ms: RollingMs, + capture_to_webrtc_total_ms: RollingMs, +} + +impl PublisherTimingSummary { + fn reset(&mut self) { + self.paced_wait_ms.reset(); + self.camera_frame_read_ms.reset(); + self.decode_mjpeg_ms.reset(); + self.buffer_convert_ms.reset(); + self.frame_draw_ms.reset(); + self.submit_to_webrtc_ms.reset(); + self.capture_to_webrtc_total_ms.reset(); + } +} + +fn format_timing_line(timings: &PublisherTimingSummary) -> String { + let line_one = vec![ + format!("paced_wait {:.2}", timings.paced_wait_ms.average().unwrap_or_default()), + format!( + "camera_frame_read {:.2}", + timings.camera_frame_read_ms.average().unwrap_or_default() + ), + ]; + let mut line_two = Vec::new(); + + if let Some(decode_ms) = timings.decode_mjpeg_ms.average() { + line_two.push(format!("decode_mjpeg {:.2}", decode_ms)); 
+ } + + line_two.push(format!( + "convert_to_i420 {:.2}", + timings.buffer_convert_ms.average().unwrap_or_default() + )); + if let Some(frame_draw_ms) = timings.frame_draw_ms.average() { + line_two.push(format!("frame_draw {:.2}", frame_draw_ms)); + } + line_two.push(format!( + "submit_to_webrtc {:.2}", + timings.submit_to_webrtc_ms.average().unwrap_or_default() + )); + line_two.push(format!( + "capture_to_webrtc_total {:.2}", + timings.capture_to_webrtc_total_ms.average().unwrap_or_default() + )); + + format!("Timing ms: {}\nTiming ms: {}", line_one.join(" | "), line_two.join(" | ")) } fn list_cameras() -> Result<()> { @@ -137,10 +241,29 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { info!("Connecting to LiveKit room '{}' as '{}'...", args.room_name, args.identity); let mut room_options = RoomOptions::default(); room_options.auto_subscribe = true; + room_options.dynacast = true; + + // Configure E2EE if an encryption key is provided + if let Some(ref e2ee_key) = args.e2ee_key { + let key_provider = KeyProvider::with_shared_key( + KeyProviderOptions::default(), + e2ee_key.as_bytes().to_vec(), + ); + room_options.encryption = + Some(E2eeOptions { encryption_type: EncryptionType::Gcm, key_provider }); + info!("E2EE enabled with AES-GCM encryption"); + } + let (room, _) = Room::connect(&url, &token, room_options).await?; let room = std::sync::Arc::new(room); info!("Connected: {} - {}", room.name(), room.sid().await); + // Enable E2EE after connection + if args.e2ee_key.is_some() { + room.e2ee_manager().set_enabled(true); + info!("End-to-end encryption activated"); + } + // Log room events { let room_clone = room.clone(); @@ -193,18 +316,49 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { let requested_codec = if args.h265 { VideoCodec::H265 } else { VideoCodec::H264 }; info!("Attempting publish with codec: {}", requested_codec.as_str()); - let publish_opts = |codec: VideoCodec| { - let mut opts = TrackPublishOptions { - source: 
TrackSource::Camera, - simulcast: args.simulcast, - video_codec: codec, - ..Default::default() - }; - if let Some(bitrate) = args.max_bitrate { - opts.video_encoding = - Some(VideoEncoding { max_bitrate: bitrate, max_framerate: args.fps as f64 }); + // Compute an explicit video encoding so all simulcast layers use 30 fps. + // The SDK defaults reduce lower layers to 15/20 fps; we override that here. + let target_fps = args.fps as f64; + let main_encoding = { + let base = options::compute_appropriate_encoding(false, width, height, VideoCodec::H264); + VideoEncoding { + max_bitrate: args.max_bitrate.unwrap_or(base.max_bitrate), + max_framerate: target_fps, } - opts + }; + let simulcast_presets = compute_simulcast_presets_30fps(width, height, target_fps); + info!( + "Video encoding: {}x{} @ {:.0} fps, {} bps (simulcast layers: {})", + width, + height, + target_fps, + main_encoding.max_bitrate, + simulcast_presets + .iter() + .map(|p| format!( + "{}x{}@{:.0}fps/{}bps", + p.width, p.height, p.encoding.max_framerate, p.encoding.max_bitrate + )) + .collect::>() + .join(", "), + ); + + let mut packet_trailer_features = Vec::new(); + if args.attach_timestamp { + packet_trailer_features.push(PacketTrailerFeature::PtfUserTimestamp); + } + if args.attach_frame_id { + packet_trailer_features.push(PacketTrailerFeature::PtfFrameId); + } + + let publish_opts = |codec: VideoCodec| TrackPublishOptions { + source: TrackSource::Camera, + simulcast: args.simulcast, + video_codec: codec, + packet_trailer_features: packet_trailer_features.clone(), + video_encoding: Some(main_encoding.clone()), + simulcast_layers: Some(simulcast_presets.clone()), + ..Default::default() }; let publish_result = room @@ -230,6 +384,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { let mut frame = VideoFrame { rotation: VideoRotation::VideoRotation0, timestamp_us: 0, + frame_metadata: None, buffer: I420Buffer::new(width, height), }; let is_yuyv = fmt.format() == FrameFormat::YUYV; @@ -252,34 
+407,34 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { info!("Target frame interval: {:.2} ms", target.as_secs_f64() * 1000.0); // Timing accumulators (ms) for rolling stats - let mut sum_get_ms = 0.0; - let mut sum_decode_ms = 0.0; - let mut sum_convert_ms = 0.0; - let mut sum_capture_ms = 0.0; - let mut sum_sleep_ms = 0.0; - let mut sum_iter_ms = 0.0; + let mut timings = PublisherTimingSummary::default(); let mut logged_mjpeg_fallback = false; + let mut frame_counter: u32 = 1; + let mut timestamp_overlay = (args.attach_timestamp && args.burn_timestamp) + .then(|| TimestampOverlay::new(width, height)); loop { if ctrl_c_received.load(Ordering::Acquire) { break; } // Wait until the scheduled next frame time - let wait_start = Instant::now(); + let paced_wait_started_at = Instant::now(); ticker.tick().await; - let iter_start = Instant::now(); + let paced_wait_finished_at = Instant::now(); - // Get frame as RGB24 (decoded by nokhwa if needed) - let t0 = Instant::now(); + // Capture the frame as early as possible so the attached timestamp is + // close to the camera acquisition point. 
+ let capture_wall_time_us = unix_time_us_now(); + let camera_capture_started_at = Instant::now(); let frame_buf = camera.frame()?; - let t1 = Instant::now(); + let camera_frame_acquired_at = Instant::now(); let (stride_y, stride_u, stride_v) = frame.buffer.strides(); let (data_y, data_u, data_v) = frame.buffer.data_mut(); - // Fast path for YUYV: convert directly to I420 via libyuv - let t2 = if is_yuyv { + let stride_y_usize = stride_y as usize; + let (decode_finished_at, convert_finished_at, used_decode_path) = if is_yuyv { + // Fast path for YUYV: convert directly to I420 via libyuv let src = frame_buf.buffer(); let src_bytes = src.as_ref(); let src_stride = (width * 2) as i32; // YUYV packed 4:2:2 - let t2_local = t1; // no decode step in YUYV path unsafe { // returns 0 on success let _ = yuv_sys::rs_YUY2ToI420( @@ -295,11 +450,11 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { height as i32, ); } - t2_local + (camera_frame_acquired_at, Instant::now(), false) } else { // Auto path (either RGB24 already or compressed MJPEG) let src = frame_buf.buffer(); - let t2_local = if src.len() == (width as usize * height as usize * 3) { + if src.len() == (width as usize * height as usize * 3) { // Already RGB24 from backend; convert directly unsafe { let _ = yuv_sys::rs_RGB24ToI420( @@ -315,11 +470,11 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { height as i32, ); } - Instant::now() + (camera_frame_acquired_at, Instant::now(), false) } else { // Try fast MJPEG->I420 via libyuv if available; fallback to image crate let mut used_fast_mjpeg = false; - let t2_try = unsafe { + let fast_mjpeg_buffer_ready_at = unsafe { // rs_MJPGToI420 returns 0 on success let ret = yuv_sys::rs_MJPGToI420( src.as_ref().as_ptr(), @@ -339,16 +494,17 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { used_fast_mjpeg = true; Instant::now() } else { - t1 + camera_frame_acquired_at } }; if used_fast_mjpeg { - t2_try + (fast_mjpeg_buffer_ready_at, 
fast_mjpeg_buffer_ready_at, true) } else { // Fallback: decode MJPEG using image crate then RGB24->I420 match image::load_from_memory(src.as_ref()) { Ok(img_dyn) => { let rgb8 = img_dyn.to_rgb8(); + let decode_finished_at = Instant::now(); let dec_w = rgb8.width() as u32; let dec_h = rgb8.height() as u32; if dec_w != width || dec_h != height { @@ -372,7 +528,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { height as i32, ); } - Instant::now() + (decode_finished_at, Instant::now(), true) } Err(e2) => { if !logged_mjpeg_fallback { @@ -386,62 +542,97 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { } } } - }; - t2_local + } }; - let t3 = Instant::now(); + + let mut buffer_ready_at = convert_finished_at; + let mut frame_draw_ms = None; + if let Some(overlay) = timestamp_overlay.as_mut() { + let overlay_started_at = Instant::now(); + overlay.draw(data_y, stride_y_usize, capture_wall_time_us); + let overlay_finished_at = Instant::now(); + frame_draw_ms = Some((overlay_finished_at - overlay_started_at).as_secs_f64() * 1000.0); + buffer_ready_at = overlay_finished_at; + } // Update RTP timestamp (monotonic, microseconds since start) frame.timestamp_us = start_ts.elapsed().as_micros() as i64; + // Build frame metadata from enabled packet trailer features + let user_ts = if args.attach_timestamp { Some(capture_wall_time_us) } else { None }; + let fid = if args.attach_frame_id { + let id = frame_counter; + frame_counter = frame_counter.wrapping_add(1); + Some(id) + } else { + None + }; + frame.frame_metadata = if user_ts.is_some() || fid.is_some() { + Some(FrameMetadata { user_timestamp_us: user_ts, frame_id: fid }) + } else { + None + }; rtc_source.capture_frame(&frame); - let t4 = Instant::now(); + let webrtc_capture_finished_at = Instant::now(); frames += 1; - // We already paced via interval; measure actual sleep time for logging only - let sleep_dur = iter_start - wait_start; // Per-iteration timing bookkeeping - let t_end = 
Instant::now(); - let get_ms = (t1 - t0).as_secs_f64() * 1000.0; - let decode_ms = (t2 - t1).as_secs_f64() * 1000.0; - let convert_ms = (t3 - t2).as_secs_f64() * 1000.0; - let capture_ms = (t4 - t3).as_secs_f64() * 1000.0; - let sleep_ms = sleep_dur.as_secs_f64() * 1000.0; - let iter_ms = (t_end - iter_start).as_secs_f64() * 1000.0; - sum_get_ms += get_ms; - sum_decode_ms += decode_ms; - sum_convert_ms += convert_ms; - sum_capture_ms += capture_ms; - sum_sleep_ms += sleep_ms; - sum_iter_ms += iter_ms; + timings + .paced_wait_ms + .record((paced_wait_finished_at - paced_wait_started_at).as_secs_f64() * 1000.0); + timings + .camera_frame_read_ms + .record((camera_frame_acquired_at - camera_capture_started_at).as_secs_f64() * 1000.0); + if used_decode_path { + timings + .decode_mjpeg_ms + .record((decode_finished_at - camera_frame_acquired_at).as_secs_f64() * 1000.0); + } + timings + .buffer_convert_ms + .record((convert_finished_at - decode_finished_at).as_secs_f64() * 1000.0); + if let Some(frame_draw_ms) = frame_draw_ms { + timings.frame_draw_ms.record(frame_draw_ms); + } + timings + .submit_to_webrtc_ms + .record((webrtc_capture_finished_at - buffer_ready_at).as_secs_f64() * 1000.0); + timings.capture_to_webrtc_total_ms.record( + (webrtc_capture_finished_at - camera_capture_started_at).as_secs_f64() * 1000.0, + ); if last_fps_log.elapsed() >= std::time::Duration::from_secs(2) { let secs = last_fps_log.elapsed().as_secs_f64(); let fps_est = frames as f64 / secs; - let n = frames.max(1) as f64; info!( - "Publishing video: {}x{}, ~{:.1} fps | avg ms: get {:.2}, decode {:.2}, convert {:.2}, capture {:.2}, sleep {:.2}, iter {:.2} | target {:.2}", + "Video status: {}x{} | ~{:.1} fps | target {:.2} ms", width, height, fps_est, - sum_get_ms / n, - sum_decode_ms / n, - sum_convert_ms / n, - sum_capture_ms / n, - sum_sleep_ms / n, - sum_iter_ms / n, target.as_secs_f64() * 1000.0, ); + info!("{}", format_timing_line(&timings)); frames = 0; - sum_get_ms = 0.0; - sum_decode_ms 
= 0.0; - sum_convert_ms = 0.0; - sum_capture_ms = 0.0; - sum_sleep_ms = 0.0; - sum_iter_ms = 0.0; + timings.reset(); last_fps_log = Instant::now(); } } Ok(()) } + +/// Build simulcast presets that match the SDK defaults but with a uniform frame rate. +/// The SDK's built-in `DEFAULT_SIMULCAST_PRESETS` use 15/20 fps for lower layers; +/// this keeps the same resolutions and bitrates but overrides fps to `target_fps`. +fn compute_simulcast_presets_30fps(width: u32, height: u32, target_fps: f64) -> Vec { + let ar = width as f32 / height as f32; + let defaults: &[VideoPreset] = if f32::abs(ar - 16.0 / 9.0) < f32::abs(ar - 4.0 / 3.0) { + video_presets::DEFAULT_SIMULCAST_PRESETS + } else { + livekit::options::video43::DEFAULT_SIMULCAST_PRESETS + }; + defaults + .iter() + .map(|p| VideoPreset::new(p.width, p.height, p.encoding.max_bitrate, target_fps)) + .collect() +} diff --git a/examples/local_video/src/subscriber.rs b/examples/local_video/src/subscriber.rs index 255f9f867..70291c894 100644 --- a/examples/local_video/src/subscriber.rs +++ b/examples/local_video/src/subscriber.rs @@ -1,10 +1,12 @@ use anyhow::Result; +use chrono::{DateTime, Utc}; use clap::Parser; use eframe::egui; use eframe::wgpu::{self, util::DeviceExt}; use egui_wgpu as egui_wgpu_backend; use egui_wgpu_backend::CallbackTrait; use futures::StreamExt; +use livekit::e2ee::{key_provider::*, E2eeOptions, EncryptionType}; use livekit::prelude::*; use livekit::webrtc::video_stream::native::NativeVideoStream; use livekit_api::access_token; @@ -17,7 +19,7 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, }, - time::{Duration, Instant}, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; async fn wait_for_shutdown(flag: Arc) { @@ -52,20 +54,33 @@ struct Args { /// Only subscribe to video from this participant identity #[arg(long)] participant: Option, + + /// Display user timestamp, current timestamp, and latency overlay + #[arg(long)] + display_timestamp: bool, + + /// Shared encryption key for E2EE 
(enables AES-GCM end-to-end encryption when set; must match publisher's key) + #[arg(long)] + e2ee_key: Option, } struct SharedYuv { width: u32, height: u32, - stride_y: u32, - stride_u: u32, - stride_v: u32, + y_bytes_per_row: u32, + uv_bytes_per_row: u32, y: Vec, u: Vec, v: Vec, codec: String, fps: f32, dirty: bool, + /// Time when the latest frame became available to the subscriber code. + received_at_us: Option, + /// Packet-trailer metadata from the most recent frame, if any. + frame_metadata: Option, + /// Whether the publisher advertised PTF_USER_TIMESTAMP in its track info. + has_user_timestamp: bool, } #[derive(Clone)] @@ -114,6 +129,86 @@ fn infer_quality_from_dims( } } +fn find_video_inbound_stats( + stats: &[livekit::webrtc::stats::RtcStats], +) -> Option { + stats.iter().find_map(|stat| match stat { + livekit::webrtc::stats::RtcStats::InboundRtp(inbound) if inbound.stream.kind == "video" => { + Some(inbound.clone()) + } + _ => None, + }) +} + +fn log_video_inbound_stats(stats: &[livekit::webrtc::stats::RtcStats]) { + let mut codec_by_id: HashMap = HashMap::new(); + for stat in stats { + if let livekit::webrtc::stats::RtcStats::Codec(codec) = stat { + codec_by_id.insert( + codec.rtc.id.clone(), + (codec.codec.mime_type.clone(), codec.codec.sdp_fmtp_line.clone()), + ); + } + } + + if let Some(inbound) = find_video_inbound_stats(stats) { + if let Some((mime, fmtp)) = codec_by_id.get(&inbound.stream.codec_id) { + info!("Inbound codec: {} (fmtp: {})", mime, fmtp); + } else { + info!("Inbound codec id: {}", inbound.stream.codec_id); + } + info!( + "Inbound current layer: {}x{} ~{:.1} fps, decoder: {}, power_efficient: {}", + inbound.inbound.frame_width, + inbound.inbound.frame_height, + inbound.inbound.frames_per_second, + inbound.inbound.decoder_implementation, + inbound.inbound.power_efficient_decoder + ); + } +} + +fn update_simulcast_quality_from_stats( + stats: &[livekit::webrtc::stats::RtcStats], + simulcast: &Arc>, +) { + let Some(inbound) = 
find_video_inbound_stats(stats) else { + return; + }; + let Some((fw, fh)) = simulcast_state_full_dims(simulcast) else { + return; + }; + + let q = infer_quality_from_dims( + fw, + fh, + inbound.inbound.frame_width as u32, + inbound.inbound.frame_height as u32, + ); + let mut sc = simulcast.lock(); + sc.active_quality = Some(q); +} + +/// Returns the current wall-clock time as microseconds since Unix epoch. +fn current_timestamp_us() -> u64 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_micros() as u64 +} + +/// Format a user timestamp (microseconds since Unix epoch) as +/// `yyyy-mm-dd hh:mm:ss:xxx` where xxx is milliseconds. +fn format_timestamp_us(ts_us: u64) -> String { + DateTime::::from_timestamp_micros(ts_us as i64) + .map(|dt| { + dt.format("%Y-%m-%d %H:%M:%S:").to_string() + + &format!("{:03}", dt.timestamp_subsec_millis()) + }) + .unwrap_or_else(|| format!("")) +} + +fn format_optional_timestamp_us(ts_us: Option) -> String { + ts_us.map(format_timestamp_us).unwrap_or_else(|| "N/A".to_string()) +} + fn simulcast_state_full_dims(state: &Arc>) -> Option<(u32, u32)> { let sc = state.lock(); sc.full_dims @@ -162,65 +257,29 @@ async fn handle_track_subscribed( *active = Some(sid.clone()); } - // Update HUD codec label early (before first frame arrives) + // Update HUD codec label and feature flags early (before first frame arrives) { let mut s = shared.lock(); s.codec = codec; + s.has_user_timestamp = + publication.packet_trailer_features().contains(&PacketTrailerFeature::PtfUserTimestamp); } info!( - "Subscribed to video track: {} (sid {}) from {} - codec: {}, simulcast: {}, dimension: {}x{}", + "Subscribed to video track: {} (sid {}) from {} - codec: {}, simulcast: {}, dimension: {}x{}, packet_trailer_features: {:?}", publication.name(), publication.sid(), participant.identity(), publication.mime_type(), publication.simulcasted(), publication.dimension().0, - publication.dimension().1 + publication.dimension().1, + 
publication.packet_trailer_features(), ); - // Try to fetch inbound RTP/codec stats for more details - match video_track.get_stats().await { - Ok(stats) => { - let mut codec_by_id: HashMap = HashMap::new(); - let mut inbound: Option = None; - for s in stats.iter() { - match s { - livekit::webrtc::stats::RtcStats::Codec(c) => { - codec_by_id.insert( - c.rtc.id.clone(), - (c.codec.mime_type.clone(), c.codec.sdp_fmtp_line.clone()), - ); - } - livekit::webrtc::stats::RtcStats::InboundRtp(i) => { - if i.stream.kind == "video" { - inbound = Some(i.clone()); - } - } - _ => {} - } - } - - if let Some(i) = inbound { - if let Some((mime, fmtp)) = codec_by_id.get(&i.stream.codec_id) { - info!("Inbound codec: {} (fmtp: {})", mime, fmtp); - } else { - info!("Inbound codec id: {}", i.stream.codec_id); - } - info!( - "Inbound current layer: {}x{} ~{:.1} fps, decoder: {}, power_efficient: {}", - i.inbound.frame_width, - i.inbound.frame_height, - i.inbound.frames_per_second, - i.inbound.decoder_implementation, - i.inbound.power_efficient_decoder - ); - } - } - Err(e) => debug!("Failed to get stats for video track: {:?}", e), - } + let rtc_track = video_track.rtc_track(); - // Start background sink thread + // Start background sink thread immediately so stats lookup cannot delay first-frame handling. 
let shared2 = shared.clone(); let active_sid2 = active_sid.clone(); let my_sid = sid.clone(); @@ -236,13 +295,11 @@ async fn handle_track_subscribed( sc.active_quality = None; sc.publication = Some(publication.clone()); } - let simulcast2 = simulcast.clone(); std::thread::spawn(move || { - let mut sink = NativeVideoStream::new(video_track.rtc_track()); + let mut sink = NativeVideoStream::new(rtc_track); let mut frames: u64 = 0; let mut last_log = Instant::now(); let mut logged_first = false; - let mut last_stats = Instant::now(); let mut fps_window_frames: u64 = 0; let mut fps_window_start = Instant::now(); let mut fps_smoothed: f32 = 0.0; @@ -261,6 +318,7 @@ async fn handle_track_subscribed( } }); let Some(frame) = next else { break }; + let received_at_us = current_timestamp_us(); let w = frame.buffer.width(); let h = frame.buffer.height(); @@ -269,41 +327,40 @@ async fn handle_track_subscribed( logged_first = true; } - // Convert to I420 on CPU, but keep planes separate for GPU sampling let i420 = frame.buffer.to_i420(); let (sy, su, sv) = i420.strides(); let (dy, du, dv) = i420.data(); - let ch = (h + 1) / 2; + let width = w as u32; + let height = h as u32; + let uv_w = (width + 1) / 2; + let uv_h = (height + 1) / 2; + let y_bytes_per_row = align_up(width, 256); + let uv_bytes_per_row = align_up(uv_w, 256); - // Ensure capacity and copy full plane slices - let y_size = (sy * h) as usize; - let u_size = (su * ch) as usize; - let v_size = (sv * ch) as usize; - if y_buf.len() != y_size { - y_buf.resize(y_size, 0); - } - if u_buf.len() != u_size { - u_buf.resize(u_size, 0); - } - if v_buf.len() != v_size { - v_buf.resize(v_size, 0); - } - y_buf.copy_from_slice(dy); - u_buf.copy_from_slice(du); - v_buf.copy_from_slice(dv); + pack_plane(dy, sy as u32, width, height, y_bytes_per_row, &mut y_buf); + pack_plane(du, su as u32, uv_w, uv_h, uv_bytes_per_row, &mut u_buf); + pack_plane(dv, sv as u32, uv_w, uv_h, uv_bytes_per_row, &mut v_buf); // Swap buffers into shared 
state let mut s = shared2.lock(); - s.width = w as u32; - s.height = h as u32; - s.stride_y = sy as u32; - s.stride_u = su as u32; - s.stride_v = sv as u32; + s.width = width; + s.height = height; + s.y_bytes_per_row = y_bytes_per_row; + s.uv_bytes_per_row = uv_bytes_per_row; std::mem::swap(&mut s.y, &mut y_buf); std::mem::swap(&mut s.u, &mut u_buf); std::mem::swap(&mut s.v, &mut v_buf); s.dirty = true; + s.received_at_us = Some(received_at_us); + + s.frame_metadata = frame.frame_metadata; + + if !s.has_user_timestamp + && frame.frame_metadata.and_then(|m| m.user_timestamp_us).is_some() + { + s.has_user_timestamp = true; + } // Update smoothed FPS (~500ms window) fps_window_frames += 1; @@ -329,32 +386,6 @@ async fn handle_track_subscribed( frames = 0; last_log = Instant::now(); } - // Periodically infer active simulcast quality from inbound stats - if last_stats.elapsed() >= Duration::from_secs(1) { - if let Ok(stats) = rt_clone.block_on(video_track.get_stats()) { - let mut inbound: Option = None; - for s in stats.iter() { - if let livekit::webrtc::stats::RtcStats::InboundRtp(i) = s { - if i.stream.kind == "video" { - inbound = Some(i.clone()); - } - } - } - if let Some(i) = inbound { - if let Some((fw, fh)) = simulcast_state_full_dims(&simulcast2) { - let q = infer_quality_from_dims( - fw, - fh, - i.inbound.frame_width as u32, - i.inbound.frame_height as u32, - ); - let mut sc = simulcast2.lock(); - sc.active_quality = Some(q); - } - } - } - last_stats = Instant::now(); - } } info!("Video stream ended for {}", my_sid); // Clear active sid if still ours @@ -363,6 +394,42 @@ async fn handle_track_subscribed( *active = None; } }); + + let ctrl_c_stats = ctrl_c_received.clone(); + let active_sid_stats = active_sid.clone(); + let my_sid_stats = sid.clone(); + let simulcast_stats = simulcast.clone(); + tokio::spawn(async move { + let mut logged_initial = false; + let mut interval = tokio::time::interval(Duration::from_secs(1)); + 
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + if ctrl_c_stats.load(Ordering::Acquire) { + break; + } + if active_sid_stats.lock().as_ref() != Some(&my_sid_stats) { + break; + } + + match video_track.get_stats().await { + Ok(stats) => { + if !logged_initial { + log_video_inbound_stats(&stats); + logged_initial = true; + } + update_simulcast_quality_from_stats(&stats, &simulcast_stats); + } + Err(e) if !logged_initial => { + debug!("Failed to get stats for video track: {:?}", e); + logged_initial = true; + } + Err(_) => {} + } + + interval.tick().await; + } + }); } fn clear_hud_and_simulcast(shared: &Arc>, simulcast: &Arc>) { @@ -370,6 +437,9 @@ fn clear_hud_and_simulcast(shared: &Arc>, simulcast: &Arc>, ctrl_c_received: Arc, locked_aspect: Option, + display_timestamp: bool, + /// Cached timestamp overlay text to avoid layout churn on every repaint. + last_timestamp_text: String, } impl eframe::App for VideoApp { @@ -459,16 +532,31 @@ impl eframe::App for VideoApp { ); }); - // Resolution/FPS overlay: top-left + // Resolution/FPS overlay: top-right egui::Area::new("video_hud".into()) - .anchor(egui::Align2::LEFT_TOP, egui::vec2(10.0, 10.0)) + .anchor(egui::Align2::RIGHT_TOP, egui::vec2(-10.0, 10.0)) .interactable(false) .show(ctx, |ui| { let s = self.shared.lock(); if s.width == 0 || s.height == 0 || s.fps <= 0.0 || s.codec.is_empty() { return; } - let text = format!("{} {}x{} {:.1}fps", s.codec, s.width, s.height, s.fps); + let mut text = format!("{} {}x{} {:.1}fps", s.codec, s.width, s.height, s.fps); + let sc = self.simulcast.lock(); + if sc.available { + let layer = sc + .active_quality + .map(|q| match q { + livekit::track::VideoQuality::Low => "Low", + livekit::track::VideoQuality::Medium => "Medium", + livekit::track::VideoQuality::High => "High", + }) + .unwrap_or("?"); + text.push_str(&format!("\nSimulcast: {}", layer)); + } else { + text.push_str("\nSimulcast: off"); + } + drop(sc); egui::Frame::NONE 
.fill(egui::Color32::from_black_alpha(140)) .corner_radius(egui::CornerRadius::same(4)) @@ -481,6 +569,63 @@ impl eframe::App for VideoApp { }); }); + if self.display_timestamp { + let s = self.shared.lock(); + let meta = s.frame_metadata; + let receive_us = s.received_at_us; + let has_user_timestamp = s.has_user_timestamp; + drop(s); + + let publish_us = meta.and_then(|m| m.user_timestamp_us); + let frame_id = meta.and_then(|m| m.frame_id); + + if publish_us.is_some() || frame_id.is_some() { + let frame_id_line = match frame_id { + Some(fid) => format!("Frame ID: {}", fid), + None => "Frame ID: N/A".to_string(), + }; + if has_user_timestamp { + let latency = match (publish_us, receive_us) { + (Some(pub_ts), Some(recv_ts)) => { + format!("{:.1}ms", recv_ts.saturating_sub(pub_ts) as f64 / 1000.0) + } + _ => "N/A".to_string(), + }; + self.last_timestamp_text = format!( + "{}\nPublish: {}\nReceive: {}\nLatency: {}", + frame_id_line, + format_optional_timestamp_us(publish_us), + format_optional_timestamp_us(receive_us), + latency, + ); + } else { + self.last_timestamp_text = frame_id_line; + } + } + + if !self.last_timestamp_text.is_empty() { + egui::Area::new("timestamp_hud".into()) + .anchor(egui::Align2::LEFT_TOP, egui::vec2(10.0, 10.0)) + .interactable(false) + .show(ctx, |ui| { + egui::Frame::NONE + .fill(egui::Color32::from_black_alpha(140)) + .corner_radius(egui::CornerRadius::same(4)) + .inner_margin(egui::Margin::same(6)) + .show(ui, |ui| { + ui.add( + egui::Label::new( + egui::RichText::new(&self.last_timestamp_text) + .color(egui::Color32::WHITE) + .monospace(), + ) + .extend(), + ); + }); + }); + } + } + // Simulcast layer controls: bottom-left overlay egui::Area::new("simulcast_controls".into()) .anchor(egui::Align2::LEFT_BOTTOM, egui::vec2(10.0, -10.0)) @@ -560,23 +705,45 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { info!("Connecting to LiveKit room '{}' as '{}'...", args.room_name, args.identity); let mut room_options = 
RoomOptions::default(); room_options.auto_subscribe = true; + room_options.dynacast = true; + room_options.adaptive_stream = true; + + // Configure E2EE if an encryption key is provided + if let Some(ref e2ee_key) = args.e2ee_key { + let key_provider = KeyProvider::with_shared_key( + KeyProviderOptions::default(), + e2ee_key.as_bytes().to_vec(), + ); + room_options.encryption = + Some(E2eeOptions { encryption_type: EncryptionType::Gcm, key_provider }); + info!("E2EE enabled with AES-GCM encryption"); + } + let (room, _) = Room::connect(&url, &token, room_options).await?; let room = Arc::new(room); info!("Connected: {} - {}", room.name(), room.sid().await); + // Enable E2EE after connection + if args.e2ee_key.is_some() { + room.e2ee_manager().set_enabled(true); + info!("End-to-end encryption activated"); + } + // Shared YUV buffer for UI/GPU let shared = Arc::new(Mutex::new(SharedYuv { width: 0, height: 0, - stride_y: 0, - stride_u: 0, - stride_v: 0, + y_bytes_per_row: 0, + uv_bytes_per_row: 0, y: Vec::new(), u: Vec::new(), v: Vec::new(), codec: String::new(), fps: 0.0, dirty: false, + received_at_us: None, + frame_metadata: None, + has_user_timestamp: false, })); // Subscribe to room events: on first video track, start sink task @@ -628,8 +795,10 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { simulcast, ctrl_c_received: ctrl_c_received.clone(), locked_aspect: None, + display_timestamp: args.display_timestamp, + last_timestamp_text: String::new(), }; - let native_options = eframe::NativeOptions::default(); + let native_options = eframe::NativeOptions { vsync: false, ..Default::default() }; eframe::run_native( "LiveKit Video Subscriber", native_options, @@ -663,6 +832,9 @@ struct YuvGpuState { y_pad_w: u32, uv_pad_w: u32, dims: (u32, u32), + upload_y: Vec, + upload_u: Vec, + upload_v: Vec, } impl YuvGpuState { @@ -708,6 +880,32 @@ fn align_up(value: u32, alignment: u32) -> u32 { ((value + alignment - 1) / alignment) * alignment } +fn 
resize_reused_buffer(buf: &mut Vec, len: usize) { + if buf.len() != len { + buf.resize(len, 0); + } +} + +fn pack_plane( + src: &[u8], + src_stride: u32, + row_width: u32, + rows: u32, + dst_stride: u32, + dst: &mut Vec, +) { + resize_reused_buffer(dst, (dst_stride * rows) as usize); + for row in 0..rows { + let src_off = (row * src_stride) as usize; + let dst_off = (row * dst_stride) as usize; + let row_end = dst_off + row_width as usize; + dst[dst_off..row_end].copy_from_slice(&src[src_off..src_off + row_width as usize]); + if dst_stride > row_width { + dst[row_end..dst_off + dst_stride as usize].fill(0); + } + } +} + #[repr(C)] #[derive(Clone, Copy, bytemuck::Pod, bytemuck::Zeroable)] struct ParamsUniform { @@ -908,24 +1106,34 @@ impl CallbackTrait for YuvPaintCallback { y_pad_w: 256, uv_pad_w: 256, dims: (0, 0), + upload_y: Vec::new(), + upload_u: Vec::new(), + upload_v: Vec::new(), }; resources.insert(new_state); } let state = resources.get_mut::().unwrap(); - // Upload planes when marked dirty - // Recreate textures/bind group on size change - if state.dims != (shared.width, shared.height) { - let y_pad_w = align_up(shared.width, 256); - let uv_w = (shared.width + 1) / 2; + let dims = (shared.width, shared.height); + let upload_row_bytes = (shared.y_bytes_per_row, shared.uv_bytes_per_row); + let has_dirty_frame = if shared.dirty { + std::mem::swap(&mut state.upload_y, &mut shared.y); + std::mem::swap(&mut state.upload_u, &mut shared.u); + std::mem::swap(&mut state.upload_v, &mut shared.v); + shared.dirty = false; + true + } else { + false + }; + drop(shared); + + // Recreate textures/bind group on size change. 
+ if state.dims != dims { + let y_pad_w = align_up(dims.0, 256); + let uv_w = (dims.0 + 1) / 2; let uv_pad_w = align_up(uv_w, 256); - let (y_tex, u_tex, v_tex, y_view, u_view, v_view) = YuvGpuState::create_textures( - device, - shared.width, - shared.height, - y_pad_w, - uv_pad_w, - ); + let (y_tex, u_tex, v_tex, y_view, u_view, v_view) = + YuvGpuState::create_textures(device, dims.0, dims.1, y_pad_w, uv_pad_w); let bind_group = device.create_bind_group(&wgpu::BindGroupDescriptor { label: Some("yuv_bind_group"), layout: &state.bind_layout, @@ -961,24 +1169,14 @@ impl CallbackTrait for YuvPaintCallback { state.bind_group = bind_group; state.y_pad_w = y_pad_w; state.uv_pad_w = uv_pad_w; - state.dims = (shared.width, shared.height); + state.dims = dims; } - if shared.dirty { - let y_bytes_per_row = align_up(shared.width, 256); - let uv_w = (shared.width + 1) / 2; - let uv_h = (shared.height + 1) / 2; - let uv_bytes_per_row = align_up(uv_w, 256); + if has_dirty_frame { + let uv_w = (dims.0 + 1) / 2; + let uv_h = (dims.1 + 1) / 2; - // Pack and upload Y - if shared.stride_y >= shared.width { - let mut packed = vec![0u8; (y_bytes_per_row * shared.height) as usize]; - for row in 0..shared.height { - let src = - &shared.y[(row * shared.stride_y) as usize..][..shared.width as usize]; - let dst_off = (row * y_bytes_per_row) as usize; - packed[dst_off..dst_off + shared.width as usize].copy_from_slice(src); - } + if upload_row_bytes.0 >= dims.0 { queue.write_texture( wgpu::TexelCopyTextureInfo { texture: &state.y_tex, @@ -986,31 +1184,17 @@ impl CallbackTrait for YuvPaintCallback { origin: wgpu::Origin3d::ZERO, aspect: wgpu::TextureAspect::All, }, - &packed, + &state.upload_y, wgpu::TexelCopyBufferLayout { offset: 0, - bytes_per_row: Some(y_bytes_per_row), - rows_per_image: Some(shared.height), - }, - wgpu::Extent3d { - width: state.y_pad_w, - height: shared.height, - depth_or_array_layers: 1, + bytes_per_row: Some(upload_row_bytes.0), + rows_per_image: Some(dims.1), }, + 
wgpu::Extent3d { width: dims.0, height: dims.1, depth_or_array_layers: 1 }, ); } - // Pack and upload U,V - if shared.stride_u >= uv_w && shared.stride_v >= uv_w { - let mut packed_u = vec![0u8; (uv_bytes_per_row * uv_h) as usize]; - let mut packed_v = vec![0u8; (uv_bytes_per_row * uv_h) as usize]; - for row in 0..uv_h { - let src_u = &shared.u[(row * shared.stride_u) as usize..][..uv_w as usize]; - let src_v = &shared.v[(row * shared.stride_v) as usize..][..uv_w as usize]; - let dst_off = (row * uv_bytes_per_row) as usize; - packed_u[dst_off..dst_off + uv_w as usize].copy_from_slice(src_u); - packed_v[dst_off..dst_off + uv_w as usize].copy_from_slice(src_v); - } + if upload_row_bytes.1 >= uv_w { queue.write_texture( wgpu::TexelCopyTextureInfo { texture: &state.u_tex, @@ -1018,17 +1202,13 @@ impl CallbackTrait for YuvPaintCallback { origin: wgpu::Origin3d::ZERO, aspect: wgpu::TextureAspect::All, }, - &packed_u, + &state.upload_u, wgpu::TexelCopyBufferLayout { offset: 0, - bytes_per_row: Some(uv_bytes_per_row), + bytes_per_row: Some(upload_row_bytes.1), rows_per_image: Some(uv_h), }, - wgpu::Extent3d { - width: state.uv_pad_w, - height: uv_h, - depth_or_array_layers: 1, - }, + wgpu::Extent3d { width: uv_w, height: uv_h, depth_or_array_layers: 1 }, ); queue.write_texture( wgpu::TexelCopyTextureInfo { @@ -1037,30 +1217,26 @@ impl CallbackTrait for YuvPaintCallback { origin: wgpu::Origin3d::ZERO, aspect: wgpu::TextureAspect::All, }, - &packed_v, + &state.upload_v, wgpu::TexelCopyBufferLayout { offset: 0, - bytes_per_row: Some(uv_bytes_per_row), + bytes_per_row: Some(upload_row_bytes.1), rows_per_image: Some(uv_h), }, - wgpu::Extent3d { - width: state.uv_pad_w, - height: uv_h, - depth_or_array_layers: 1, - }, + wgpu::Extent3d { width: uv_w, height: uv_h, depth_or_array_layers: 1 }, ); } - // Update params uniform - let params = ParamsUniform { - src_w: shared.width, - src_h: shared.height, - y_tex_w: state.y_pad_w, - uv_tex_w: state.uv_pad_w, - }; - 
queue.write_buffer(&state.params_buf, 0, bytemuck::bytes_of(¶ms)); - - shared.dirty = false; + queue.write_buffer( + &state.params_buf, + 0, + bytemuck::bytes_of(&ParamsUniform { + src_w: dims.0, + src_h: dims.1, + y_tex_w: state.y_pad_w, + uv_tex_w: state.uv_pad_w, + }), + ); } Vec::new() @@ -1072,26 +1248,15 @@ impl CallbackTrait for YuvPaintCallback { render_pass: &mut wgpu::RenderPass<'static>, resources: &egui_wgpu_backend::CallbackResources, ) { - // Acquire device/queue via screen_descriptor? Not available; use resources to fetch our state - let shared = self.shared.lock(); - if shared.width == 0 || shared.height == 0 { - return; - } - - // Build pipeline and textures on first paint or on resize let Some(state) = resources.get::() else { - // prepare may not have created the state yet (race with first frame); skip this paint return; }; - - if state.dims != (shared.width, shared.height) { - // We cannot rebuild here (no device access); skip drawing until next frame where prepare will rebuild + if state.dims == (0, 0) { return; } render_pass.set_pipeline(&state.pipeline); render_pass.set_bind_group(0, &state.bind_group, &[]); - // Fullscreen triangle without vertex buffer render_pass.draw(0..3, 0..1); } } diff --git a/examples/local_video/src/timestamp_burn.rs b/examples/local_video/src/timestamp_burn.rs new file mode 100644 index 000000000..e4c557f2f --- /dev/null +++ b/examples/local_video/src/timestamp_burn.rs @@ -0,0 +1,161 @@ +use chrono::{DateTime, Datelike, Timelike, Utc}; + +const TIMESTAMP_TEXT_LEN: usize = 23; // YYYY-MM-DD HH:MM:SS:SSS +const TIMESTAMP_GLYPH_COUNT: usize = 13; // 0-9, :, -, space +const TIMESTAMP_GLYPH_WIDTH: usize = 5; +const TIMESTAMP_GLYPH_HEIGHT: usize = 7; +const TIMESTAMP_GLYPH_SCALE: usize = 4; +const TIMESTAMP_GLYPH_SPACING: usize = 2; +const TIMESTAMP_PADDING_X: usize = 4; +const TIMESTAMP_PADDING_Y: usize = 4; +const TIMESTAMP_MARGIN: usize = 8; +const TIMESTAMP_BG_LUMA: u8 = 16; +const TIMESTAMP_FG_LUMA: u8 = 235; +const 
TIMESTAMP_RASTER_WIDTH: usize = TIMESTAMP_GLYPH_WIDTH * TIMESTAMP_GLYPH_SCALE; +const TIMESTAMP_RASTER_HEIGHT: usize = TIMESTAMP_GLYPH_HEIGHT * TIMESTAMP_GLYPH_SCALE; +const TIMESTAMP_GLYPH_COLON: u8 = 10; +const TIMESTAMP_GLYPH_DASH: u8 = 11; +const TIMESTAMP_GLYPH_SPACE: u8 = 12; + +type TimestampGlyph = [[u8; TIMESTAMP_RASTER_WIDTH]; TIMESTAMP_RASTER_HEIGHT]; + +const TIMESTAMP_GLYPH_PATTERNS: [[u8; TIMESTAMP_GLYPH_HEIGHT]; TIMESTAMP_GLYPH_COUNT] = [ + [0b01110, 0b10001, 0b10011, 0b10101, 0b11001, 0b10001, 0b01110], // 0 + [0b00100, 0b01100, 0b00100, 0b00100, 0b00100, 0b00100, 0b01110], // 1 + [0b01110, 0b10001, 0b00001, 0b00010, 0b00100, 0b01000, 0b11111], // 2 + [0b11110, 0b00001, 0b00001, 0b01110, 0b00001, 0b00001, 0b11110], // 3 + [0b00010, 0b00110, 0b01010, 0b10010, 0b11111, 0b00010, 0b00010], // 4 + [0b11111, 0b10000, 0b10000, 0b11110, 0b00001, 0b00001, 0b11110], // 5 + [0b01110, 0b10000, 0b10000, 0b11110, 0b10001, 0b10001, 0b01110], // 6 + [0b11111, 0b00001, 0b00010, 0b00100, 0b01000, 0b01000, 0b01000], // 7 + [0b01110, 0b10001, 0b10001, 0b01110, 0b10001, 0b10001, 0b01110], // 8 + [0b01110, 0b10001, 0b10001, 0b01111, 0b00001, 0b00001, 0b01110], // 9 + [0b00000, 0b00000, 0b00100, 0b00000, 0b00100, 0b00000, 0b00000], // : + [0b00000, 0b00000, 0b00000, 0b01110, 0b00000, 0b00000, 0b00000], // - + [0b00000, 0b00000, 0b00000, 0b00000, 0b00000, 0b00000, 0b00000], // space +]; + +pub struct TimestampOverlay { + glyphs: [TimestampGlyph; TIMESTAMP_GLYPH_COUNT], + glyph_ids: [u8; TIMESTAMP_TEXT_LEN], + box_x: usize, + box_y: usize, + box_width: usize, + box_height: usize, + text_x: usize, + text_y: usize, + enabled: bool, +} + +impl TimestampOverlay { + pub fn new(frame_width: u32, frame_height: u32) -> Self { + let text_width = TIMESTAMP_TEXT_LEN * TIMESTAMP_RASTER_WIDTH + + (TIMESTAMP_TEXT_LEN.saturating_sub(1)) * TIMESTAMP_GLYPH_SPACING; + let box_width = text_width + TIMESTAMP_PADDING_X * 2; + let box_height = TIMESTAMP_RASTER_HEIGHT + TIMESTAMP_PADDING_Y * 2; + 
let frame_width = frame_width as usize; + let frame_height = frame_height as usize; + let enabled = frame_width >= box_width + TIMESTAMP_MARGIN + && frame_height >= box_height + TIMESTAMP_MARGIN; + let box_x = TIMESTAMP_MARGIN; + let box_y = frame_height.saturating_sub(TIMESTAMP_MARGIN + box_height); + + Self { + glyphs: rasterize_timestamp_glyphs(), + glyph_ids: [0; TIMESTAMP_TEXT_LEN], + box_x, + box_y, + box_width, + box_height, + text_x: box_x + TIMESTAMP_PADDING_X, + text_y: box_y + TIMESTAMP_PADDING_Y, + enabled, + } + } + + pub fn draw(&mut self, data_y: &mut [u8], stride_y: usize, timestamp_us: u64) { + if !self.enabled { + return; + } + + format_timestamp_glyphs(timestamp_us, &mut self.glyph_ids); + + for row in 0..self.box_height { + let row_start = (self.box_y + row) * stride_y + self.box_x; + let row_end = row_start + self.box_width; + data_y[row_start..row_end].fill(TIMESTAMP_BG_LUMA); + } + + for (glyph_pos, glyph_id) in self.glyph_ids.iter().copied().enumerate() { + let glyph = &self.glyphs[glyph_id as usize]; + let glyph_x = + self.text_x + glyph_pos * (TIMESTAMP_RASTER_WIDTH + TIMESTAMP_GLYPH_SPACING); + for (row, glyph_row) in glyph.iter().enumerate() { + let row_start = (self.text_y + row) * stride_y + glyph_x; + let row_end = row_start + TIMESTAMP_RASTER_WIDTH; + data_y[row_start..row_end].copy_from_slice(glyph_row); + } + } + } +} + +fn rasterize_timestamp_glyphs() -> [TimestampGlyph; TIMESTAMP_GLYPH_COUNT] { + let mut glyphs = [[[TIMESTAMP_BG_LUMA; TIMESTAMP_RASTER_WIDTH]; TIMESTAMP_RASTER_HEIGHT]; + TIMESTAMP_GLYPH_COUNT]; + + for (glyph_idx, pattern) in TIMESTAMP_GLYPH_PATTERNS.iter().enumerate() { + for (src_y, row_bits) in pattern.iter().copied().enumerate() { + for scale_y in 0..TIMESTAMP_GLYPH_SCALE { + let dst_row = &mut glyphs[glyph_idx][src_y * TIMESTAMP_GLYPH_SCALE + scale_y]; + for src_x in 0..TIMESTAMP_GLYPH_WIDTH { + let bit = 1 << (TIMESTAMP_GLYPH_WIDTH - 1 - src_x); + if row_bits & bit != 0 { + let dst_x = src_x * 
TIMESTAMP_GLYPH_SCALE; + dst_row[dst_x..dst_x + TIMESTAMP_GLYPH_SCALE].fill(TIMESTAMP_FG_LUMA); + } + } + } + } + } + + glyphs +} + +fn format_timestamp_glyphs(timestamp_us: u64, out: &mut [u8; TIMESTAMP_TEXT_LEN]) { + let Some(dt) = DateTime::::from_timestamp_micros(timestamp_us as i64) else { + out.fill(0); + return; + }; + + write_four_digits(&mut out[0..4], dt.year_ce().1); + out[4] = TIMESTAMP_GLYPH_DASH; + write_two_digits(&mut out[5..7], dt.month()); + out[7] = TIMESTAMP_GLYPH_DASH; + write_two_digits(&mut out[8..10], dt.day()); + out[10] = TIMESTAMP_GLYPH_SPACE; + write_two_digits(&mut out[11..13], dt.hour()); + out[13] = TIMESTAMP_GLYPH_COLON; + write_two_digits(&mut out[14..16], dt.minute()); + out[16] = TIMESTAMP_GLYPH_COLON; + write_two_digits(&mut out[17..19], dt.second()); + out[19] = TIMESTAMP_GLYPH_COLON; + write_three_digits(&mut out[20..23], dt.timestamp_subsec_millis()); +} + +fn write_two_digits(dst: &mut [u8], value: u32) { + dst[0] = (value / 10) as u8; + dst[1] = (value % 10) as u8; +} + +fn write_three_digits(dst: &mut [u8], value: u32) { + dst[0] = (value / 100) as u8; + dst[1] = ((value / 10) % 10) as u8; + dst[2] = (value % 10) as u8; +} + +fn write_four_digits(dst: &mut [u8], value: u32) { + dst[0] = ((value / 1_000) % 10) as u8; + dst[1] = ((value / 100) % 10) as u8; + dst[2] = ((value / 10) % 10) as u8; + dst[3] = (value % 10) as u8; +} diff --git a/examples/screensharing/src/lib.rs b/examples/screensharing/src/lib.rs index 7902199c3..f8e26bec6 100644 --- a/examples/screensharing/src/lib.rs +++ b/examples/screensharing/src/lib.rs @@ -185,8 +185,9 @@ mod test { let callback = { let mut frame_buffer = VideoFrame { rotation: VideoRotation::VideoRotation0, - buffer: I420Buffer::new(1, 1), timestamp_us: 0, + frame_metadata: None, + buffer: I420Buffer::new(1, 1), }; move |result: Result| { let frame = match result { diff --git a/examples/wgpu_room/src/logo_track.rs b/examples/wgpu_room/src/logo_track.rs index a7d46dee0..4e184f661 100644 --- 
a/examples/wgpu_room/src/logo_track.rs +++ b/examples/wgpu_room/src/logo_track.rs @@ -116,8 +116,9 @@ impl LogoTrack { framebuffer: Arc::new(Mutex::new(vec![0u8; FB_WIDTH * FB_HEIGHT * 4])), video_frame: Arc::new(Mutex::new(VideoFrame { rotation: VideoRotation::VideoRotation0, - buffer: I420Buffer::new(FB_WIDTH as u32, FB_HEIGHT as u32), timestamp_us: 0, + frame_metadata: None, + buffer: I420Buffer::new(FB_WIDTH as u32, FB_HEIGHT as u32), })), pos: (0, 0), direction: (1, 1), diff --git a/libwebrtc/src/lib.rs b/libwebrtc/src/lib.rs index e0c1482d0..bf4ad8294 100644 --- a/libwebrtc/src/lib.rs +++ b/libwebrtc/src/lib.rs @@ -68,7 +68,9 @@ pub mod video_track; pub mod native { pub use webrtc_sys::webrtc::ffi::create_random_uuid; - pub use crate::imp::{apm, audio_mixer, audio_resampler, frame_cryptor, yuv_helper}; + pub use crate::imp::{ + apm, audio_mixer, audio_resampler, frame_cryptor, packet_trailer, yuv_helper, + }; } #[cfg(target_os = "android")] diff --git a/libwebrtc/src/native/frame_cryptor.rs b/libwebrtc/src/native/frame_cryptor.rs index 0a076bf9a..7f463f900 100644 --- a/libwebrtc/src/native/frame_cryptor.rs +++ b/libwebrtc/src/native/frame_cryptor.rs @@ -19,8 +19,8 @@ use parking_lot::Mutex; use webrtc_sys::frame_cryptor::{self as sys_fc}; use crate::{ - peer_connection_factory::PeerConnectionFactory, rtp_receiver::RtpReceiver, - rtp_sender::RtpSender, + native::packet_trailer::PacketTrailerHandler, peer_connection_factory::PeerConnectionFactory, + rtp_receiver::RtpReceiver, rtp_sender::RtpSender, }; pub type OnStateChange = Box; @@ -185,6 +185,10 @@ impl FrameCryptor { pub fn on_state_change(&self, handler: Option) { *self.observer.state_change_handler.lock() = handler; } + + pub fn set_packet_trailer_handler(&self, handler: &PacketTrailerHandler) { + self.sys_handle.set_packet_trailer_handler(handler.sys_handle()); + } } #[derive(Clone)] diff --git a/libwebrtc/src/native/media_stream.rs b/libwebrtc/src/native/media_stream.rs index 10180d79a..67b13ec57 100644 
--- a/libwebrtc/src/native/media_stream.rs +++ b/libwebrtc/src/native/media_stream.rs @@ -43,7 +43,7 @@ impl MediaStream { self.sys_handle .get_video_tracks() .into_iter() - .map(|t| video_track::RtcVideoTrack { handle: RtcVideoTrack { sys_handle: t.ptr } }) + .map(|t| video_track::RtcVideoTrack { handle: RtcVideoTrack::new(t.ptr) }) .collect() } } diff --git a/libwebrtc/src/native/media_stream_track.rs b/libwebrtc/src/native/media_stream_track.rs index 36424986b..43165e1b2 100644 --- a/libwebrtc/src/native/media_stream_track.rs +++ b/libwebrtc/src/native/media_stream_track.rs @@ -44,7 +44,7 @@ pub fn new_media_stream_track( }) } else if sys_handle.kind() == MEDIA_TYPE_VIDEO { MediaStreamTrack::Video(video_track::RtcVideoTrack { - handle: RtcVideoTrack { sys_handle: unsafe { media_to_video(sys_handle) } }, + handle: RtcVideoTrack::new(unsafe { media_to_video(sys_handle) }), }) } else { panic!("unknown track kind") diff --git a/libwebrtc/src/native/mod.rs b/libwebrtc/src/native/mod.rs index 183f5ab66..de56e3345 100644 --- a/libwebrtc/src/native/mod.rs +++ b/libwebrtc/src/native/mod.rs @@ -27,6 +27,7 @@ pub mod frame_cryptor; pub mod ice_candidate; pub mod media_stream; pub mod media_stream_track; +pub mod packet_trailer; pub mod peer_connection; pub mod peer_connection_factory; pub mod rtp_parameters; diff --git a/libwebrtc/src/native/packet_trailer.rs b/libwebrtc/src/native/packet_trailer.rs new file mode 100644 index 000000000..b06e37c55 --- /dev/null +++ b/libwebrtc/src/native/packet_trailer.rs @@ -0,0 +1,134 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Packet trailer support for end-to-end frame metadata propagation. +//! +//! This module provides functionality to embed user-supplied metadata +//! in encoded video frames as trailers. The timestamps/frameIDs are preserved +//! through the WebRTC pipeline and can be extracted on the receiver side. +//! +//! On the send side, user timestamps/frameIDs are stored in the handler's internal +//! map keyed by RTP timestamp. When the encoder produces a frame, +//! the transformer looks up the metadata via the frame's CaptureTime(). +//! +//! On the receive side, extracted frame metadata is stored in an +//! internal map keyed by RTP timestamp. Decoded frames look up their +//! metadata via lookup_frame_metadata(rtp_timestamp). + +use cxx::SharedPtr; +use webrtc_sys::packet_trailer::ffi as sys_pt; + +use crate::{ + peer_connection_factory::PeerConnectionFactory, rtp_receiver::RtpReceiver, + rtp_sender::RtpSender, +}; + +/// Handler for packet trailer embedding/extraction on RTP streams. +/// +/// For sender side: Stores frame metadata keyed by capture timestamp +/// and embeds them as binary payload trailers on encoded frames before they +/// are sent. Use `store_frame_metadata()` to associate metadata with +/// a captured frame. +/// +/// For receiver side: Extracts frame metadata from received frames +/// and makes them available for retrieval via `lookup_frame_metadata()`. +#[derive(Clone)] +pub struct PacketTrailerHandler { + sys_handle: SharedPtr, +} + +impl PacketTrailerHandler { + /// Enable or disable timestamp embedding/extraction. 
+ pub fn set_enabled(&self, enabled: bool) { + self.sys_handle.set_enabled(enabled); + } + + /// Check if timestamp embedding/extraction is enabled. + pub fn enabled(&self) -> bool { + self.sys_handle.enabled() + } + + /// Lookup the frame metadata for a given RTP timestamp (receiver side). + /// Returns `Some((user_timestamp_us, frame_id))` if found, `None` otherwise. + /// The entry is removed from the map after a successful lookup. + pub fn lookup_frame_metadata(&self, rtp_timestamp: u32) -> Option<(u64, u32)> { + let ts = self.sys_handle.lookup_timestamp(rtp_timestamp); + if ts != u64::MAX { + let frame_id = self.sys_handle.last_lookup_frame_id(); + Some((ts, frame_id)) + } else { + None + } + } + + /// Store frame metadata for a given capture timestamp (sender side). + /// + /// The `capture_timestamp_us` must be the TimestampAligner-adjusted + /// timestamp (as produced by `VideoTrackSource::on_captured_frame`), + /// NOT the original `timestamp_us` from the VideoFrame. The transformer + /// looks up the metadata by the frame's `CaptureTime()` which is + /// derived from the aligned value. + /// + /// In normal usage this is called automatically by the C++ layer -- + /// callers should set `user_timestamp_us` and `frame_id` on the + /// `VideoFrame` and let `capture_frame` / `on_captured_frame` handle + /// the rest. + pub fn store_frame_metadata( + &self, + capture_timestamp_us: i64, + user_timestamp_us: u64, + frame_id: u32, + ) { + self.sys_handle.store_frame_metadata(capture_timestamp_us, user_timestamp_us, frame_id); + } + + pub(crate) fn sys_handle(&self) -> SharedPtr { + self.sys_handle.clone() + } +} + +/// Create a sender-side packet trailer handler. +/// +/// This handler will embed frame metadata into encoded frames before +/// they are packetized and sent. Use `store_frame_metadata()` to +/// associate metadata with a captured frame's capture timestamp. 
+pub fn create_sender_handler( + peer_factory: &PeerConnectionFactory, + sender: &RtpSender, +) -> PacketTrailerHandler { + PacketTrailerHandler { + sys_handle: sys_pt::new_packet_trailer_sender( + peer_factory.handle.sys_handle.clone(), + sender.handle.sys_handle.clone(), + ), + } +} + +/// Create a receiver-side packet trailer handler. +/// +/// This handler will extract frame metadata from received frames +/// and store them in a map keyed by RTP timestamp. Use +/// `lookup_frame_metadata(rtp_timestamp)` to retrieve the metadata +/// for a specific decoded frame. +pub fn create_receiver_handler( + peer_factory: &PeerConnectionFactory, + receiver: &RtpReceiver, +) -> PacketTrailerHandler { + PacketTrailerHandler { + sys_handle: sys_pt::new_packet_trailer_receiver( + peer_factory.handle.sys_handle.clone(), + receiver.handle.sys_handle.clone(), + ), + } +} diff --git a/libwebrtc/src/native/peer_connection_factory.rs b/libwebrtc/src/native/peer_connection_factory.rs index 4edc63047..ae082aecc 100644 --- a/libwebrtc/src/native/peer_connection_factory.rs +++ b/libwebrtc/src/native/peer_connection_factory.rs @@ -46,7 +46,6 @@ impl Default for PeerConnectionFactory { if log_sink.is_none() { *log_sink = Some(sys_rtc::ffi::new_log_sink(|msg, _| { let msg = msg.strip_suffix("\r\n").or(msg.strip_suffix('\n')).unwrap_or(&msg); - log::debug!(target: "libwebrtc", "{}", msg); })); } @@ -76,11 +75,9 @@ impl PeerConnectionFactory { pub fn create_video_track(&self, label: &str, source: NativeVideoSource) -> RtcVideoTrack { RtcVideoTrack { - handle: imp_vt::RtcVideoTrack { - sys_handle: self - .sys_handle - .create_video_track(label.to_string(), source.handle.sys_handle()), - }, + handle: imp_vt::RtcVideoTrack::new( + self.sys_handle.create_video_track(label.to_string(), source.handle.sys_handle()), + ), } } diff --git a/libwebrtc/src/native/video_source.rs b/libwebrtc/src/native/video_source.rs index a6e78e014..619a0ea77 100644 --- a/libwebrtc/src/native/video_source.rs +++ 
b/libwebrtc/src/native/video_source.rs @@ -23,6 +23,7 @@ use parking_lot::Mutex; use webrtc_sys::{video_frame as vf_sys, video_frame::ffi::VideoRotation, video_track as vt_sys}; use crate::{ + native::packet_trailer::PacketTrailerHandler, video_frame::{I420Buffer, VideoBuffer, VideoFrame}, video_source::VideoResolution, }; @@ -80,7 +81,14 @@ impl NativeVideoSource { let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); builder.pin_mut().set_timestamp_us(now.as_micros() as i64); - source.sys_handle.on_captured_frame(&builder.pin_mut().build()); + source.sys_handle.on_captured_frame( + &builder.pin_mut().build(), + &vt_sys::ffi::FrameMetadata { + has_packet_trailer: false, + user_timestamp_us: 0, + frame_id: 0, + }, + ); } } }); @@ -93,22 +101,44 @@ impl NativeVideoSource { } pub fn capture_frame>(&self, frame: &VideoFrame) { - let mut inner = self.inner.lock(); - inner.captured_frames += 1; - let mut builder = vf_sys::ffi::new_video_frame_builder(); builder.pin_mut().set_rotation(frame.rotation.into()); builder.pin_mut().set_video_frame_buffer(frame.buffer.as_ref().sys_handle()); - if frame.timestamp_us == 0 { - // If the timestamp is set to 0, default to now + let capture_ts = if frame.timestamp_us == 0 { let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); - builder.pin_mut().set_timestamp_us(now.as_micros() as i64); + now.as_micros() as i64 } else { - builder.pin_mut().set_timestamp_us(frame.timestamp_us); - } + frame.timestamp_us + }; + builder.pin_mut().set_timestamp_us(capture_ts); + + let (has_trailer, user_ts, fid) = match frame.frame_metadata { + Some(meta) => (true, meta.user_timestamp_us.unwrap_or(0), meta.frame_id.unwrap_or(0)), + None => (false, 0, 0), + }; + + self.inner.lock().captured_frames += 1; + + self.sys_handle.on_captured_frame( + &builder.pin_mut().build(), + &vt_sys::ffi::FrameMetadata { + has_packet_trailer: has_trailer, + user_timestamp_us: user_ts, + frame_id: fid, + }, + ); + } - 
self.sys_handle.on_captured_frame(&builder.pin_mut().build()); + /// Set the packet trailer handler used by this source. + /// + /// When set, any frame captured with a `user_timestamp_us` value will + /// automatically have its timestamp stored in the handler so the + /// `PacketTrailerTransformer` can embed it into the encoded frame. + /// The handler is set on the C++ VideoTrackSource so it has access to + /// the TimestampAligner-adjusted capture timestamp for correct keying. + pub fn set_packet_trailer_handler(&self, handler: PacketTrailerHandler) { + self.sys_handle.set_packet_trailer_handler(handler.sys_handle()); } pub fn video_resolution(&self) -> VideoResolution { diff --git a/libwebrtc/src/native/video_stream.rs b/libwebrtc/src/native/video_stream.rs index 07774f87b..aacfd3c40 100644 --- a/libwebrtc/src/native/video_stream.rs +++ b/libwebrtc/src/native/video_stream.rs @@ -25,12 +25,14 @@ use webrtc_sys::video_track as sys_vt; use super::video_frame::new_video_frame_buffer; use crate::{ - video_frame::{BoxVideoFrame, VideoFrame}, + native::packet_trailer::PacketTrailerHandler, + video_frame::{BoxVideoFrame, FrameMetadata, VideoFrame}, video_track::RtcVideoTrack, }; pub struct NativeVideoStream { native_sink: SharedPtr, + observer: Arc, video_track: RtcVideoTrack, frame_rx: mpsc::UnboundedReceiver, } @@ -38,7 +40,14 @@ pub struct NativeVideoStream { impl NativeVideoStream { pub fn new(video_track: RtcVideoTrack) -> Self { let (frame_tx, frame_rx) = mpsc::unbounded_channel(); - let observer = Arc::new(VideoTrackObserver { frame_tx }); + + // Auto-wire the packet trailer handler from the track if one is set. 
+ let handler = video_track.handle.packet_trailer_handler(); + + let observer = Arc::new(VideoTrackObserver { + frame_tx, + packet_trailer_handler: parking_lot::Mutex::new(handler), + }); let native_sink = sys_vt::ffi::new_native_video_sink(Box::new( sys_vt::VideoSinkWrapper::new(observer.clone()), )); @@ -46,7 +55,21 @@ impl NativeVideoStream { let video = unsafe { sys_vt::ffi::media_to_video(video_track.sys_handle()) }; video.add_sink(&native_sink); - Self { native_sink, video_track, frame_rx } + Self { native_sink, observer, video_track, frame_rx } + } + + /// Set the packet trailer handler for this stream. + /// + /// When set, each frame produced by this stream will have its + /// `user_timestamp_us` field populated from the handler's receive + /// map (looked up by RTP timestamp). + /// + /// Note: If the handler was already set on the `RtcVideoTrack` before + /// creating this stream, it is automatically wired up. This method is + /// only needed if you want to override or set the handler after + /// construction. 
+ pub fn set_packet_trailer_handler(&self, handler: PacketTrailerHandler) { + *self.observer.packet_trailer_handler.lock() = Some(handler); } pub fn track(&self) -> RtcVideoTrack { @@ -77,13 +100,26 @@ impl Stream for NativeVideoStream { struct VideoTrackObserver { frame_tx: mpsc::UnboundedSender, + packet_trailer_handler: parking_lot::Mutex>, } impl sys_vt::VideoSink for VideoTrackObserver { fn on_frame(&self, frame: UniquePtr) { + let rtp_timestamp = frame.timestamp(); + let frame_metadata = self + .packet_trailer_handler + .lock() + .as_ref() + .and_then(|h| h.lookup_frame_metadata(rtp_timestamp)) + .map(|(ts, fid)| FrameMetadata { + user_timestamp_us: Some(ts), + frame_id: if fid != 0 { Some(fid) } else { None }, + }); + let _ = self.frame_tx.send(VideoFrame { rotation: frame.rotation().into(), timestamp_us: frame.timestamp_us(), + frame_metadata, buffer: new_video_frame_buffer(unsafe { frame.video_frame_buffer() }), }); } diff --git a/libwebrtc/src/native/video_track.rs b/libwebrtc/src/native/video_track.rs index efc3937df..64699e153 100644 --- a/libwebrtc/src/native/video_track.rs +++ b/libwebrtc/src/native/video_track.rs @@ -12,22 +12,45 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::sync::Arc; + use cxx::SharedPtr; +use parking_lot::Mutex; use sys_vt::ffi::video_to_media; use webrtc_sys::video_track as sys_vt; use super::media_stream_track::impl_media_stream_track; +use super::packet_trailer::PacketTrailerHandler; use crate::media_stream_track::RtcTrackState; #[derive(Clone)] pub struct RtcVideoTrack { pub(crate) sys_handle: SharedPtr, + packet_trailer_handler: Arc>>, } impl RtcVideoTrack { impl_media_stream_track!(video_to_media); + pub(crate) fn new(sys_handle: SharedPtr) -> Self { + Self { sys_handle, packet_trailer_handler: Arc::new(Mutex::new(None)) } + } + pub fn sys_handle(&self) -> SharedPtr { video_to_media(self.sys_handle.clone()) } + + /// Set the packet trailer handler for this track. + /// + /// When set, any `NativeVideoStream` created from this track will + /// automatically use this handler to populate `user_timestamp_us` + /// on each decoded frame. + pub fn set_packet_trailer_handler(&self, handler: PacketTrailerHandler) { + self.packet_trailer_handler.lock().replace(handler); + } + + /// Get the packet trailer handler, if one has been set. + pub fn packet_trailer_handler(&self) -> Option { + self.packet_trailer_handler.lock().clone() + } } diff --git a/libwebrtc/src/video_frame.rs b/libwebrtc/src/video_frame.rs index 926b45572..6046c1713 100644 --- a/libwebrtc/src/video_frame.rs +++ b/libwebrtc/src/video_frame.rs @@ -52,6 +52,18 @@ pub enum VideoBufferType { NV12, } +/// Metadata carried alongside a video frame via the packet trailer mechanism. +/// +/// Each field corresponds to an independently negotiable packet trailer feature +/// (`PTF_USER_TIMESTAMP`, `PTF_FRAME_ID`), so individual fields are `Option`. +#[derive(Debug, Clone, Copy)] +pub struct FrameMetadata { + /// Wall-clock capture time in microseconds, when `PTF_USER_TIMESTAMP` is enabled. + pub user_timestamp_us: Option, + /// Monotonically increasing frame identifier, when `PTF_FRAME_ID` is enabled. 
+ pub frame_id: Option, +} + #[derive(Debug)] pub struct VideoFrame where @@ -59,9 +71,17 @@ where { pub rotation: VideoRotation, pub timestamp_us: i64, // When the frame was captured in microseconds + /// Packet-trailer metadata, if any trailer features are active. + pub frame_metadata: Option, pub buffer: T, } +impl> VideoFrame { + pub fn new(rotation: VideoRotation, buffer: T) -> Self { + Self { rotation, timestamp_us: 0, frame_metadata: None, buffer } + } +} + pub type BoxVideoBuffer = Box; pub type BoxVideoFrame = VideoFrame; diff --git a/libwebrtc/src/video_source.rs b/libwebrtc/src/video_source.rs index f8dc9964a..dc9e62afc 100644 --- a/libwebrtc/src/video_source.rs +++ b/libwebrtc/src/video_source.rs @@ -50,6 +50,7 @@ pub mod native { use std::fmt::{Debug, Formatter}; use super::*; + use crate::native::packet_trailer::PacketTrailerHandler; use crate::video_frame::{VideoBuffer, VideoFrame}; #[derive(Clone)] @@ -78,6 +79,16 @@ pub mod native { self.handle.capture_frame(frame) } + /// Set the packet trailer handler used by this source. + /// + /// When set, any frame captured with a `user_timestamp_us` value will + /// automatically have its timestamp stored in the handler (keyed by + /// the TimestampAligner-adjusted capture timestamp) so the + /// `PacketTrailerTransformer` can embed it into the encoded frame. 
+ pub fn set_packet_trailer_handler(&self, handler: PacketTrailerHandler) { + self.handle.set_packet_trailer_handler(handler) + } + pub fn video_resolution(&self) -> VideoResolution { self.handle.video_resolution() } diff --git a/libwebrtc/src/video_stream.rs b/libwebrtc/src/video_stream.rs index fbdd01abe..b7a2fa952 100644 --- a/libwebrtc/src/video_stream.rs +++ b/libwebrtc/src/video_stream.rs @@ -26,7 +26,10 @@ pub mod native { }; use super::stream_imp; - use crate::{video_frame::BoxVideoFrame, video_track::RtcVideoTrack}; + use crate::{ + native::packet_trailer::PacketTrailerHandler, video_frame::BoxVideoFrame, + video_track::RtcVideoTrack, + }; use livekit_runtime::Stream; pub struct NativeVideoStream { @@ -44,6 +47,20 @@ pub mod native { Self { handle: stream_imp::NativeVideoStream::new(video_track) } } + /// Set the packet trailer handler for this stream. + /// + /// When set, each frame produced by this stream will have its + /// `user_timestamp_us` field populated by looking up the user + /// timestamp for each frame's RTP timestamp. + /// + /// Note: If the handler was already set on the `RtcVideoTrack` + /// before creating this stream, it is automatically wired up. + /// This method is only needed to override or set the handler + /// after construction. 
+ pub fn set_packet_trailer_handler(&self, handler: PacketTrailerHandler) { + self.handle.set_packet_trailer_handler(handler); + } + pub fn track(&self) -> RtcVideoTrack { self.handle.track() } diff --git a/libwebrtc/src/video_track.rs b/libwebrtc/src/video_track.rs index a223d4858..706da563b 100644 --- a/libwebrtc/src/video_track.rs +++ b/libwebrtc/src/video_track.rs @@ -19,6 +19,9 @@ use crate::{ media_stream_track::{media_stream_track, RtcTrackState}, }; +#[cfg(not(target_arch = "wasm32"))] +use crate::native::packet_trailer::PacketTrailerHandler; + #[derive(Clone)] pub struct RtcVideoTrack { pub(crate) handle: imp_vt::RtcVideoTrack, @@ -26,6 +29,22 @@ pub struct RtcVideoTrack { impl RtcVideoTrack { media_stream_track!(); + + /// Set the packet trailer handler for this track. + /// + /// When set, any `NativeVideoStream` created from this track will + /// automatically use this handler to populate `user_timestamp_us` + /// on each decoded frame. + #[cfg(not(target_arch = "wasm32"))] + pub fn set_packet_trailer_handler(&self, handler: PacketTrailerHandler) { + self.handle.set_packet_trailer_handler(handler); + } + + /// Get the packet trailer handler, if one has been set. 
+ #[cfg(not(target_arch = "wasm32"))] + pub fn packet_trailer_handler(&self) -> Option { + self.handle.packet_trailer_handler() + } } impl Debug for RtcVideoTrack { diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index c74d39fc0..cf0a64db4 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -52,7 +52,7 @@ pub const JOIN_RESPONSE_TIMEOUT: Duration = Duration::from_secs(5); pub const SIGNAL_CONNECT_TIMEOUT: Duration = Duration::from_secs(5); const REGION_FETCH_TIMEOUT: Duration = Duration::from_secs(3); const VALIDATE_TIMEOUT: Duration = Duration::from_secs(3); -pub const PROTOCOL_VERSION: u32 = 16; +pub const PROTOCOL_VERSION: u32 = 17; #[derive(Error, Debug)] pub enum SignalError { diff --git a/livekit-ffi-node-bindings/proto/audio_frame_pb.d.ts b/livekit-ffi-node-bindings/proto/audio_frame_pb.d.ts index 29303e20c..1cb120b5e 100644 --- a/livekit-ffi-node-bindings/proto/audio_frame_pb.d.ts +++ b/livekit-ffi-node-bindings/proto/audio_frame_pb.d.ts @@ -185,6 +185,16 @@ export declare class NewAudioStreamRequest extends Message); static readonly runtime: typeof proto2; @@ -268,6 +278,16 @@ export declare class AudioStreamFromParticipantRequest extends Message); static readonly runtime: typeof proto2; diff --git a/livekit-ffi-node-bindings/proto/audio_frame_pb.js b/livekit-ffi-node-bindings/proto/audio_frame_pb.js index 17e6d9f0d..2e33235ea 100644 --- a/livekit-ffi-node-bindings/proto/audio_frame_pb.js +++ b/livekit-ffi-node-bindings/proto/audio_frame_pb.js @@ -101,6 +101,7 @@ const NewAudioStreamRequest = /*@__PURE__*/ proto2.makeMessageType( { no: 5, name: "audio_filter_module_id", kind: "scalar", T: 9 /* ScalarType.STRING */, opt: true }, { no: 6, name: "audio_filter_options", kind: "scalar", T: 9 /* ScalarType.STRING */, opt: true }, { no: 7, name: "frame_size_ms", kind: "scalar", T: 13 /* ScalarType.UINT32 */, opt: true }, + { no: 8, name: "queue_size_frames", kind: "scalar", T: 13 
/* ScalarType.UINT32 */, opt: true }, ], ); @@ -128,6 +129,7 @@ const AudioStreamFromParticipantRequest = /*@__PURE__*/ proto2.makeMessageType( { no: 7, name: "audio_filter_module_id", kind: "scalar", T: 9 /* ScalarType.STRING */, opt: true }, { no: 8, name: "audio_filter_options", kind: "scalar", T: 9 /* ScalarType.STRING */, opt: true }, { no: 9, name: "frame_size_ms", kind: "scalar", T: 13 /* ScalarType.UINT32 */, opt: true }, + { no: 10, name: "queue_size_frames", kind: "scalar", T: 13 /* ScalarType.UINT32 */, opt: true }, ], ); diff --git a/livekit-ffi-node-bindings/proto/e2ee_pb.d.ts b/livekit-ffi-node-bindings/proto/e2ee_pb.d.ts index 6c815b49d..3e6a5316c 100644 --- a/livekit-ffi-node-bindings/proto/e2ee_pb.d.ts +++ b/livekit-ffi-node-bindings/proto/e2ee_pb.d.ts @@ -40,6 +40,21 @@ export declare enum EncryptionType { CUSTOM = 2, } +/** + * @generated from enum livekit.proto.KeyDerivationFunction + */ +export declare enum KeyDerivationFunction { + /** + * @generated from enum value: PBKDF2 = 0; + */ + PBKDF2 = 0, + + /** + * @generated from enum value: HKDF = 1; + */ + HKDF = 1, +} + /** * @generated from enum livekit.proto.EncryptionState */ @@ -147,6 +162,16 @@ export declare class KeyProviderOptions extends Message { */ failureTolerance?: number; + /** + * @generated from field: required int32 key_ring_size = 5; + */ + keyRingSize?: number; + + /** + * @generated from field: required livekit.proto.KeyDerivationFunction key_derivation_function = 6; + */ + keyDerivationFunction?: KeyDerivationFunction; + constructor(data?: PartialMessage); static readonly runtime: typeof proto2; diff --git a/livekit-ffi-node-bindings/proto/e2ee_pb.js b/livekit-ffi-node-bindings/proto/e2ee_pb.js index 511338d6e..8696e62e3 100644 --- a/livekit-ffi-node-bindings/proto/e2ee_pb.js +++ b/livekit-ffi-node-bindings/proto/e2ee_pb.js @@ -34,6 +34,17 @@ const EncryptionType = /*@__PURE__*/ proto2.makeEnum( ], ); +/** + * @generated from enum livekit.proto.KeyDerivationFunction + */ 
+const KeyDerivationFunction = /*@__PURE__*/ proto2.makeEnum( + "livekit.proto.KeyDerivationFunction", + [ + {no: 0, name: "PBKDF2"}, + {no: 1, name: "HKDF"}, + ], +); + /** * @generated from enum livekit.proto.EncryptionState */ @@ -73,6 +84,8 @@ const KeyProviderOptions = /*@__PURE__*/ proto2.makeMessageType( { no: 2, name: "ratchet_window_size", kind: "scalar", T: 5 /* ScalarType.INT32 */, req: true }, { no: 3, name: "ratchet_salt", kind: "scalar", T: 12 /* ScalarType.BYTES */, req: true }, { no: 4, name: "failure_tolerance", kind: "scalar", T: 5 /* ScalarType.INT32 */, req: true }, + { no: 5, name: "key_ring_size", kind: "scalar", T: 5 /* ScalarType.INT32 */, req: true }, + { no: 6, name: "key_derivation_function", kind: "enum", T: proto2.getEnumType(KeyDerivationFunction), req: true }, ], ); @@ -325,6 +338,7 @@ const E2eeResponse = /*@__PURE__*/ proto2.makeMessageType( exports.EncryptionType = EncryptionType; +exports.KeyDerivationFunction = KeyDerivationFunction; exports.EncryptionState = EncryptionState; exports.FrameCryptor = FrameCryptor; exports.KeyProviderOptions = KeyProviderOptions; diff --git a/livekit-ffi-node-bindings/proto/participant_pb.d.ts b/livekit-ffi-node-bindings/proto/participant_pb.d.ts index 932b8879c..053486b08 100644 --- a/livekit-ffi-node-bindings/proto/participant_pb.d.ts +++ b/livekit-ffi-node-bindings/proto/participant_pb.d.ts @@ -201,6 +201,11 @@ export declare enum DisconnectReason { * @generated from enum value: MEDIA_FAILURE = 15; */ MEDIA_FAILURE = 15, + + /** + * @generated from enum value: AGENT_ERROR = 16; + */ + AGENT_ERROR = 16, } /** diff --git a/livekit-ffi-node-bindings/proto/participant_pb.js b/livekit-ffi-node-bindings/proto/participant_pb.js index 739cf93c6..b7252e24e 100644 --- a/livekit-ffi-node-bindings/proto/participant_pb.js +++ b/livekit-ffi-node-bindings/proto/participant_pb.js @@ -76,6 +76,7 @@ const DisconnectReason = /*@__PURE__*/ proto2.makeEnum( {no: 13, name: "SIP_TRUNK_FAILURE"}, {no: 14, name: 
"CONNECTION_TIMEOUT"}, {no: 15, name: "MEDIA_FAILURE"}, + {no: 16, name: "AGENT_ERROR"}, ], ); diff --git a/livekit-ffi-node-bindings/proto/room_pb.d.ts b/livekit-ffi-node-bindings/proto/room_pb.d.ts index 14d81e121..842f1e9d0 100644 --- a/livekit-ffi-node-bindings/proto/room_pb.d.ts +++ b/livekit-ffi-node-bindings/proto/room_pb.d.ts @@ -310,6 +310,11 @@ export declare class DisconnectRequest extends Message { */ requestAsyncId?: bigint; + /** + * @generated from field: optional livekit.proto.DisconnectReason reason = 3; + */ + reason?: DisconnectReason; + constructor(data?: PartialMessage); static readonly runtime: typeof proto2; @@ -1744,6 +1749,20 @@ export declare class RoomOptions extends Message { */ encryption?: E2eeOptions; + /** + * use single peer connection for both publish/subscribe (default: false) + * + * @generated from field: optional bool single_peer_connection = 8; + */ + singlePeerConnection?: boolean; + + /** + * timeout in milliseconds for each signal connection attempt (default: 5000) + * + * @generated from field: optional uint64 connect_timeout_ms = 9; + */ + connectTimeoutMs?: bigint; + constructor(data?: PartialMessage); static readonly runtime: typeof proto2; diff --git a/livekit-ffi-node-bindings/proto/room_pb.js b/livekit-ffi-node-bindings/proto/room_pb.js index 579d0474d..410908d8f 100644 --- a/livekit-ffi-node-bindings/proto/room_pb.js +++ b/livekit-ffi-node-bindings/proto/room_pb.js @@ -160,6 +160,7 @@ const DisconnectRequest = /*@__PURE__*/ proto2.makeMessageType( () => [ { no: 1, name: "room_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, { no: 2, name: "request_async_id", kind: "scalar", T: 4 /* ScalarType.UINT64 */, opt: true }, + { no: 3, name: "reason", kind: "enum", T: proto2.getEnumType(DisconnectReason), opt: true }, ], ); @@ -679,6 +680,8 @@ const RoomOptions = /*@__PURE__*/ proto2.makeMessageType( { no: 5, name: "rtc_config", kind: "message", T: RtcConfig, opt: true }, { no: 6, name: "join_retries", 
kind: "scalar", T: 13 /* ScalarType.UINT32 */, opt: true }, { no: 7, name: "encryption", kind: "message", T: E2eeOptions, opt: true }, + { no: 8, name: "single_peer_connection", kind: "scalar", T: 8 /* ScalarType.BOOL */, opt: true }, + { no: 9, name: "connect_timeout_ms", kind: "scalar", T: 4 /* ScalarType.UINT64 */, opt: true }, ], ); diff --git a/livekit-ffi-node-bindings/proto/track_pb.d.ts b/livekit-ffi-node-bindings/proto/track_pb.d.ts index 89cb7e8b8..e4a854afd 100644 --- a/livekit-ffi-node-bindings/proto/track_pb.d.ts +++ b/livekit-ffi-node-bindings/proto/track_pb.d.ts @@ -135,6 +135,21 @@ export declare enum AudioTrackFeature { TF_PRECONNECT_BUFFER = 6, } +/** + * @generated from enum livekit.proto.PacketTrailerFeature + */ +export declare enum PacketTrailerFeature { + /** + * @generated from enum value: PTF_USER_TIMESTAMP = 0; + */ + PTF_USER_TIMESTAMP = 0, + + /** + * @generated from enum value: PTF_FRAME_ID = 1; + */ + PTF_FRAME_ID = 1, +} + /** * Create a new VideoTrack from a VideoSource * @@ -415,6 +430,11 @@ export declare class TrackPublicationInfo extends Message */ audioFeatures: AudioTrackFeature[]; + /** + * @generated from field: repeated livekit.proto.PacketTrailerFeature packet_trailer_features = 13; + */ + packetTrailerFeatures: PacketTrailerFeature[]; + constructor(data?: PartialMessage); static readonly runtime: typeof proto2; diff --git a/livekit-ffi-node-bindings/proto/track_pb.js b/livekit-ffi-node-bindings/proto/track_pb.js index 4a150c747..4755e0b2f 100644 --- a/livekit-ffi-node-bindings/proto/track_pb.js +++ b/livekit-ffi-node-bindings/proto/track_pb.js @@ -79,6 +79,17 @@ const AudioTrackFeature = /*@__PURE__*/ proto2.makeEnum( ], ); +/** + * @generated from enum livekit.proto.PacketTrailerFeature + */ +const PacketTrailerFeature = /*@__PURE__*/ proto2.makeEnum( + "livekit.proto.PacketTrailerFeature", + [ + {no: 0, name: "PTF_USER_TIMESTAMP"}, + {no: 1, name: "PTF_FRAME_ID"}, + ], +); + /** * Create a new VideoTrack from a 
VideoSource * @@ -184,6 +195,7 @@ const TrackPublicationInfo = /*@__PURE__*/ proto2.makeMessageType( { no: 10, name: "remote", kind: "scalar", T: 8 /* ScalarType.BOOL */, req: true }, { no: 11, name: "encryption_type", kind: "enum", T: proto2.getEnumType(EncryptionType), req: true }, { no: 12, name: "audio_features", kind: "enum", T: proto2.getEnumType(AudioTrackFeature), repeated: true }, + { no: 13, name: "packet_trailer_features", kind: "enum", T: proto2.getEnumType(PacketTrailerFeature), repeated: true }, ], ); @@ -307,6 +319,7 @@ exports.TrackKind = TrackKind; exports.TrackSource = TrackSource; exports.StreamState = StreamState; exports.AudioTrackFeature = AudioTrackFeature; +exports.PacketTrailerFeature = PacketTrailerFeature; exports.CreateVideoTrackRequest = CreateVideoTrackRequest; exports.CreateVideoTrackResponse = CreateVideoTrackResponse; exports.CreateAudioTrackRequest = CreateAudioTrackRequest; diff --git a/livekit-ffi-node-bindings/proto/video_frame_pb.d.ts b/livekit-ffi-node-bindings/proto/video_frame_pb.d.ts index 14f952268..83027f4bf 100644 --- a/livekit-ffi-node-bindings/proto/video_frame_pb.d.ts +++ b/livekit-ffi-node-bindings/proto/video_frame_pb.d.ts @@ -324,12 +324,17 @@ export declare class NewVideoSourceRequest extends Message); static readonly runtime: typeof proto2; diff --git a/livekit-ffi-node-bindings/proto/video_frame_pb.js b/livekit-ffi-node-bindings/proto/video_frame_pb.js index a6b174c4f..86f14d906 100644 --- a/livekit-ffi-node-bindings/proto/video_frame_pb.js +++ b/livekit-ffi-node-bindings/proto/video_frame_pb.js @@ -159,6 +159,7 @@ const NewVideoSourceRequest = /*@__PURE__*/ proto2.makeMessageType( () => [ { no: 1, name: "type", kind: "enum", T: proto2.getEnumType(VideoSourceType), req: true }, { no: 2, name: "resolution", kind: "message", T: VideoSourceResolution, req: true }, + { no: 3, name: "is_screencast", kind: "scalar", T: 8 /* ScalarType.BOOL */, opt: true }, ], ); diff --git a/livekit-ffi/protocol/track.proto 
b/livekit-ffi/protocol/track.proto index bcb9ee785..17ced5ec3 100644 --- a/livekit-ffi/protocol/track.proto +++ b/livekit-ffi/protocol/track.proto @@ -91,6 +91,7 @@ message TrackPublicationInfo { required bool remote = 10; required EncryptionType encryption_type = 11; repeated AudioTrackFeature audio_features = 12; + repeated PacketTrailerFeature packet_trailer_features = 13; } message OwnedTrackPublication { @@ -159,3 +160,8 @@ enum AudioTrackFeature { TF_ENHANCED_NOISE_CANCELLATION = 5; TF_PRECONNECT_BUFFER = 6; // client will buffer audio once available and send it to the server via bytes stream once connected } + +enum PacketTrailerFeature { + PTF_USER_TIMESTAMP = 0; + PTF_FRAME_ID = 1; +} diff --git a/livekit-ffi/src/conversion/room.rs b/livekit-ffi/src/conversion/room.rs index 206bd5fde..89b889800 100644 --- a/livekit-ffi/src/conversion/room.rs +++ b/livekit-ffi/src/conversion/room.rs @@ -290,9 +290,11 @@ impl From for TrackPublishOptions { red: opts.red.unwrap_or(default_publish_options.red), simulcast: opts.simulcast.unwrap_or(default_publish_options.simulcast), stream: opts.stream.unwrap_or(default_publish_options.stream), + simulcast_layers: default_publish_options.simulcast_layers, preconnect_buffer: opts .preconnect_buffer .unwrap_or(default_publish_options.preconnect_buffer), + packet_trailer_features: default_publish_options.packet_trailer_features, } } } diff --git a/livekit-ffi/src/conversion/track.rs b/livekit-ffi/src/conversion/track.rs index 9c3418b6e..653af6c36 100644 --- a/livekit-ffi/src/conversion/track.rs +++ b/livekit-ffi/src/conversion/track.rs @@ -39,6 +39,11 @@ impl From<&FfiPublication> for proto::TrackPublicationInfo { .into_iter() .map(|i| proto::AudioTrackFeature::from(i).into()) .collect(), + packet_trailer_features: publication + .packet_trailer_features() + .into_iter() + .map(|i| proto::PacketTrailerFeature::from(i).into()) + .collect(), } } } @@ -158,3 +163,16 @@ impl From for proto::AudioTrackFeature { } } } + +impl From for 
proto::PacketTrailerFeature { + fn from(value: livekit_protocol::PacketTrailerFeature) -> Self { + match value { + livekit_protocol::PacketTrailerFeature::PtfUserTimestamp => { + proto::PacketTrailerFeature::PtfUserTimestamp + } + livekit_protocol::PacketTrailerFeature::PtfFrameId => { + proto::PacketTrailerFeature::PtfFrameId + } + } + } +} diff --git a/livekit-ffi/src/server/video_source.rs b/livekit-ffi/src/server/video_source.rs index 7f33b67c3..8e9c70794 100644 --- a/livekit-ffi/src/server/video_source.rs +++ b/livekit-ffi/src/server/video_source.rs @@ -67,6 +67,7 @@ impl FfiVideoSource { let frame = VideoFrame { rotation: capture.rotation().into(), timestamp_us: capture.timestamp_us, + frame_metadata: None, buffer, }; diff --git a/livekit-protocol/protocol b/livekit-protocol/protocol index f734574de..c5536cb98 160000 --- a/livekit-protocol/protocol +++ b/livekit-protocol/protocol @@ -1 +1 @@ -Subproject commit f734574de339d94dd83f70fbe1723ba1cdc61c2f +Subproject commit c5536cb98c1f32d7fd2dae384478d79fb2df5978 diff --git a/livekit-protocol/src/livekit.rs b/livekit-protocol/src/livekit.rs index 9d2c88193..29f52480c 100644 --- a/livekit-protocol/src/livekit.rs +++ b/livekit-protocol/src/livekit.rs @@ -2058,6 +2058,7 @@ impl AudioTrackFeature { #[repr(i32)] pub enum PacketTrailerFeature { PtfUserTimestamp = 0, + PtfFrameId = 1, } impl PacketTrailerFeature { /// String value of the enum field names used in the ProtoBuf definition. @@ -2067,12 +2068,14 @@ impl PacketTrailerFeature { pub fn as_str_name(&self) -> &'static str { match self { PacketTrailerFeature::PtfUserTimestamp => "PTF_USER_TIMESTAMP", + PacketTrailerFeature::PtfFrameId => "PTF_FRAME_ID", } } /// Creates an enum from field names used in the ProtoBuf definition. 
pub fn from_str_name(value: &str) -> ::core::option::Option { match value { "PTF_USER_TIMESTAMP" => Some(Self::PtfUserTimestamp), + "PTF_FRAME_ID" => Some(Self::PtfFrameId), _ => None, } } diff --git a/livekit-protocol/src/livekit.serde.rs b/livekit-protocol/src/livekit.serde.rs index 1ae29c2af..8c84d300c 100644 --- a/livekit-protocol/src/livekit.serde.rs +++ b/livekit-protocol/src/livekit.serde.rs @@ -21596,6 +21596,7 @@ impl serde::Serialize for PacketTrailerFeature { { let variant = match self { Self::PtfUserTimestamp => "PTF_USER_TIMESTAMP", + Self::PtfFrameId => "PTF_FRAME_ID", }; serializer.serialize_str(variant) } @@ -21608,6 +21609,7 @@ impl<'de> serde::Deserialize<'de> for PacketTrailerFeature { { const FIELDS: &[&str] = &[ "PTF_USER_TIMESTAMP", + "PTF_FRAME_ID", ]; struct GeneratedVisitor; @@ -21649,6 +21651,7 @@ impl<'de> serde::Deserialize<'de> for PacketTrailerFeature { { match value { "PTF_USER_TIMESTAMP" => Ok(PacketTrailerFeature::PtfUserTimestamp), + "PTF_FRAME_ID" => Ok(PacketTrailerFeature::PtfFrameId), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/livekit/src/prelude.rs b/livekit/src/prelude.rs index 3e6bda23b..2757aa63d 100644 --- a/livekit/src/prelude.rs +++ b/livekit/src/prelude.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-pub use livekit_protocol::AudioTrackFeature; +pub use livekit_protocol::{AudioTrackFeature, PacketTrailerFeature}; pub use crate::{ id::*, diff --git a/livekit/src/room/e2ee/manager.rs b/livekit/src/room/e2ee/manager.rs index 1e583b9c4..5f82caccf 100644 --- a/livekit/src/room/e2ee/manager.rs +++ b/livekit/src/room/e2ee/manager.rs @@ -15,12 +15,16 @@ use std::{collections::HashMap, sync::Arc}; use libwebrtc::{ - native::frame_cryptor::{ - DataPacketCryptor, EncryptedPacket, EncryptionAlgorithm, EncryptionState, FrameCryptor, + native::{ + frame_cryptor::{ + DataPacketCryptor, EncryptedPacket, EncryptionAlgorithm, EncryptionState, FrameCryptor, + }, + packet_trailer, }, rtp_receiver::RtpReceiver, rtp_sender::RtpSender, }; +use livekit_protocol::PacketTrailerFeature; use parking_lot::Mutex; use super::{key_provider::KeyProvider, EncryptionType}; @@ -91,48 +95,95 @@ impl E2eeManager { self.inner.lock().options.is_some() } - /// Called by the room pub(crate) fn on_track_subscribed( &self, track: RemoteTrack, publication: RemoteTrackPublication, participant: RemoteParticipant, ) { - if !self.initialized() { - return; + let identity = participant.identity(); + let receiver = track.transceiver().unwrap().receiver(); + let mut packet_trailer_handler = None; + + let has_packet_trailer = publication.proto_info().packet_trailer_features.iter().any(|f| { + *f == PacketTrailerFeature::PtfUserTimestamp as i32 + || *f == PacketTrailerFeature::PtfFrameId as i32 + }); + + if let RemoteTrack::Video(video_track) = &track { + let handler = packet_trailer::create_receiver_handler( + LkRuntime::instance().pc_factory(), + &receiver, + ); + video_track.set_packet_trailer_handler(handler.clone()); + packet_trailer_handler = Some(handler); + + if has_packet_trailer { + log::info!( + "attached packet_trailer handler for subscribed track {} from {}", + publication.sid(), + identity, + ); + } else { + log::info!( + "attached packet_trailer handler for subscribed track {} from {} without 
advertised packet trailer support", + publication.sid(), + identity, + ); + } } - if publication.encryption_type() == EncryptionType::None { + if !self.initialized() || publication.encryption_type() == EncryptionType::None { return; } - let identity = participant.identity(); - let receiver = track.transceiver().unwrap().receiver(); let frame_cryptor = self.setup_rtp_receiver(&identity, receiver); + if let Some(handler) = packet_trailer_handler.as_ref() { + frame_cryptor.set_packet_trailer_handler(handler); + } self.setup_cryptor(&frame_cryptor); let mut inner = self.inner.lock(); inner.frame_cryptors.insert((identity, publication.sid()), frame_cryptor.clone()); } - /// Called by the room pub(crate) fn on_local_track_published( &self, track: LocalTrack, publication: LocalTrackPublication, participant: LocalParticipant, ) { - if !self.initialized() { - return; - } + let identity = participant.identity(); + let sender = track.transceiver().unwrap().sender(); - if publication.encryption_type() == EncryptionType::None { + let packet_trailer_handler = if let LocalTrack::Video(video_track) = &track { + let handler = video_track.packet_trailer_handler(); + if handler.is_some() { + log::info!( + "packet_trailer enabled for published track {} from {}", + publication.sid(), + identity, + ); + } else { + log::info!( + "packet_trailer not enabled for published track {} from {}", + publication.sid(), + identity, + ); + } + handler + } else { + None + }; + + if !self.initialized() || publication.encryption_type() == EncryptionType::None { return; } - let identity = participant.identity(); - let sender = track.transceiver().unwrap().sender(); let frame_cryptor = self.setup_rtp_sender(&identity, sender); + if let Some(handler) = packet_trailer_handler.as_ref() { + frame_cryptor.set_packet_trailer_handler(handler); + } self.setup_cryptor(&frame_cryptor); let mut inner = self.inner.lock(); diff --git a/livekit/src/room/options.rs b/livekit/src/room/options.rs index 
fbcb6ba94..b043b7f52 100644 --- a/livekit/src/room/options.rs +++ b/livekit/src/room/options.rs @@ -14,6 +14,7 @@ use libwebrtc::prelude::*; use livekit_protocol as proto; +use proto::PacketTrailerFeature; use crate::prelude::*; @@ -84,10 +85,14 @@ pub struct TrackPublishOptions { pub dtx: bool, pub red: bool, pub simulcast: bool, + /// Custom simulcast layer presets (low, mid). When set, these override the + /// SDK's built-in defaults which reduce fps on lower layers. + pub simulcast_layers: Option>, // pub name: String, pub source: TrackSource, pub stream: String, pub preconnect_buffer: bool, + pub packet_trailer_features: Vec, } impl Default for TrackPublishOptions { @@ -99,9 +104,11 @@ impl Default for TrackPublishOptions { dtx: true, red: true, simulcast: true, + simulcast_layers: None, source: TrackSource::Unknown, stream: "".to_string(), preconnect_buffer: false, + packet_trailer_features: Vec::new(), } } } @@ -147,7 +154,10 @@ pub fn compute_video_encodings( return into_rtp_encodings(width, height, &[initial_preset]); } - let mut simulcast_presets = compute_default_simulcast_presets(screenshare, &initial_preset); + let mut simulcast_presets = match options.simulcast_layers { + Some(ref custom) => custom.clone(), + None => compute_default_simulcast_presets(screenshare, &initial_preset), + }; let mid_preset = simulcast_presets.pop(); let low_preset = simulcast_presets.pop(); diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index c72b5f4c4..0098a8858 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -35,11 +35,16 @@ use crate::{ options::{self, compute_video_encodings, video_layers_from_encodings, TrackPublishOptions}, prelude::*, room::participant::rpc::{RpcError, RpcErrorCode, RpcInvocationData, MAX_PAYLOAD_BYTES}, + rtc_engine::lk_runtime::LkRuntime, rtc_engine::{EngineError, RtcEngine}, ChatMessage, DataPacket, 
RoomSession, RpcAck, RpcRequest, RpcResponse, SipDTMF, Transcription, }; use chrono::Utc; -use libwebrtc::{native::create_random_uuid, rtp_parameters::RtpEncodingParameters}; +use libwebrtc::{ + native::{create_random_uuid, packet_trailer}, + rtp_parameters::RtpEncodingParameters, + video_source::RtcVideoSource, +}; use livekit_api::signal_client::SignalError; use livekit_protocol as proto; use livekit_runtime::timeout; @@ -273,6 +278,9 @@ impl LocalParticipant { req.audio_features.push(proto::AudioTrackFeature::TfPreconnectBuffer as i32); } + req.packet_trailer_features = + options.packet_trailer_features.iter().map(|f| *f as i32).collect(); + let mut encodings = Vec::default(); match &track { LocalTrack::Video(video_track) => { @@ -318,6 +326,23 @@ impl LocalParticipant { track.set_transceiver(Some(transceiver)); + if !options.packet_trailer_features.is_empty() { + if let LocalTrack::Video(video_track) = &track { + log::info!("packet_trailer enabled for local video track {}", publication.sid(),); + let sender = track.transceiver().unwrap().sender(); + let handler = packet_trailer::create_sender_handler( + LkRuntime::instance().pc_factory(), + &sender, + ); + video_track.set_packet_trailer_handler(handler.clone()); + + #[cfg(not(target_arch = "wasm32"))] + if let RtcVideoSource::Native(ref native_source) = video_track.rtc_source() { + native_source.set_packet_trailer_handler(handler.clone()); + } + } + } + self.inner.rtc_engine.publisher_negotiation_needed(); publication.update_publish_options(options); diff --git a/livekit/src/room/publication/local.rs b/livekit/src/room/publication/local.rs index 97e365537..0a12e4e27 100644 --- a/livekit/src/room/publication/local.rs +++ b/livekit/src/room/publication/local.rs @@ -14,7 +14,7 @@ use std::{fmt::Debug, sync::Arc}; -use livekit_protocol::{self as proto, AudioTrackFeature}; +use livekit_protocol::{self as proto, AudioTrackFeature, PacketTrailerFeature}; use parking_lot::Mutex; use super::TrackPublicationInner; @@ 
-149,4 +149,8 @@ impl LocalTrackPublication { pub fn audio_features(&self) -> Vec { self.inner.info.read().audio_features.clone() } + + pub fn packet_trailer_features(&self) -> Vec { + self.inner.info.read().packet_trailer_features.clone() + } } diff --git a/livekit/src/room/publication/mod.rs b/livekit/src/room/publication/mod.rs index 54e86bd4c..a64eae37d 100644 --- a/livekit/src/room/publication/mod.rs +++ b/livekit/src/room/publication/mod.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use livekit_protocol::enum_dispatch; -use livekit_protocol::{self as proto, AudioTrackFeature}; +use livekit_protocol::{self as proto, AudioTrackFeature, PacketTrailerFeature}; use parking_lot::{Mutex, RwLock}; use super::track::TrackDimension; @@ -60,6 +60,7 @@ impl TrackPublication { pub fn is_remote(self: &Self) -> bool; pub fn encryption_type(self: &Self) -> EncryptionType; pub fn audio_features(self: &Self) -> Vec; + pub fn packet_trailer_features(self: &Self) -> Vec; pub(crate) fn on_muted(self: &Self, on_mute: impl Fn(TrackPublication) + Send + 'static) -> (); pub(crate) fn on_unmuted(self: &Self, on_unmute: impl Fn(TrackPublication) + Send + 'static) -> (); @@ -96,6 +97,7 @@ struct PublicationInfo { pub proto_info: proto::TrackInfo, pub encryption_type: EncryptionType, pub audio_features: Vec, + pub packet_trailer_features: Vec, } pub(crate) type MutedHandler = Box; @@ -133,6 +135,11 @@ pub(super) fn new_inner( .into_iter() .map(|item| item.try_into().unwrap()) .collect(), + packet_trailer_features: info + .packet_trailer_features + .iter() + .filter_map(|v| PacketTrailerFeature::try_from(*v).ok()) + .collect(), }; Arc::new(TrackPublicationInner { info: RwLock::new(info), events: Default::default() }) @@ -154,6 +161,11 @@ pub(super) fn update_info( info.mime_type = new_info.mime_type.clone(); info.simulcasted = new_info.simulcast; info.audio_features = new_info.audio_features().collect(); + info.packet_trailer_features = new_info + .packet_trailer_features + .iter() + 
.filter_map(|v| PacketTrailerFeature::try_from(*v).ok()) + .collect(); } pub(super) fn set_track( diff --git a/livekit/src/room/publication/remote.rs b/livekit/src/room/publication/remote.rs index 6d27003df..9e21b1383 100644 --- a/livekit/src/room/publication/remote.rs +++ b/livekit/src/room/publication/remote.rs @@ -14,7 +14,7 @@ use std::{fmt::Debug, sync::Arc}; -use livekit_protocol::{self as proto, AudioTrackFeature}; +use livekit_protocol::{self as proto, AudioTrackFeature, PacketTrailerFeature}; use parking_lot::{Mutex, RwLock}; use super::{PermissionStatus, SubscriptionStatus, TrackPublication, TrackPublicationInner}; @@ -399,4 +399,8 @@ impl RemoteTrackPublication { pub fn audio_features(&self) -> Vec { self.inner.info.read().audio_features.clone() } + + pub fn packet_trailer_features(&self) -> Vec { + self.inner.info.read().packet_trailer_features.clone() + } } diff --git a/livekit/src/room/track/local_video_track.rs b/livekit/src/room/track/local_video_track.rs index c7c26649b..811fabca1 100644 --- a/livekit/src/room/track/local_video_track.rs +++ b/livekit/src/room/track/local_video_track.rs @@ -14,8 +14,9 @@ use std::{fmt::Debug, sync::Arc}; -use libwebrtc::{prelude::*, stats::RtcStats}; +use libwebrtc::{native::packet_trailer::PacketTrailerHandler, prelude::*, stats::RtcStats}; use livekit_protocol as proto; +use parking_lot::Mutex; use super::TrackInner; use crate::{prelude::*, rtc_engine::lk_runtime::LkRuntime}; @@ -24,6 +25,7 @@ use crate::{prelude::*, rtc_engine::lk_runtime::LkRuntime}; pub struct LocalVideoTrack { inner: Arc, source: RtcVideoSource, + packet_trailer_handler: Arc>>, } impl Debug for LocalVideoTrack { @@ -46,6 +48,7 @@ impl LocalVideoTrack { MediaStreamTrack::Video(rtc_track), )), source, + packet_trailer_handler: Arc::new(Mutex::new(None)), } } @@ -123,6 +126,18 @@ impl LocalVideoTrack { self.source.clone() } + /// Returns the packet trailer handler associated with this track, if any. 
+ /// When present on the sender side, callers can store per-frame user + /// timestamps which will be embedded into encoded frames. + pub fn packet_trailer_handler(&self) -> Option { + self.packet_trailer_handler.lock().clone() + } + + /// Internal: set the packet trailer handler used for this track. + pub(crate) fn set_packet_trailer_handler(&self, handler: PacketTrailerHandler) { + *self.packet_trailer_handler.lock() = Some(handler); + } + pub async fn get_stats(&self) -> RoomResult> { super::local_track::get_stats(&self.inner).await } diff --git a/livekit/src/room/track/remote_video_track.rs b/livekit/src/room/track/remote_video_track.rs index 2076a3b1c..87f9beb3c 100644 --- a/livekit/src/room/track/remote_video_track.rs +++ b/livekit/src/room/track/remote_video_track.rs @@ -14,7 +14,7 @@ use std::{fmt::Debug, sync::Arc}; -use libwebrtc::{prelude::*, stats::RtcStats}; +use libwebrtc::{native::packet_trailer::PacketTrailerHandler, prelude::*, stats::RtcStats}; use livekit_protocol as proto; use super::{remote_track, TrackInner}; @@ -94,6 +94,20 @@ impl RemoteVideoTrack { true } + /// Returns a clone of the packet trailer handler, if one has been set. + pub fn packet_trailer_handler(&self) -> Option { + self.rtc_track().packet_trailer_handler() + } + + /// Internal: set the handler that extracts packet trailers for this track. + /// + /// The handler is stored on the underlying `RtcVideoTrack`, so any + /// `NativeVideoStream` created from this track will automatically + /// pick it up — no manual wiring required. 
+ pub(crate) fn set_packet_trailer_handler(&self, handler: PacketTrailerHandler) { + self.rtc_track().set_packet_trailer_handler(handler); + } + pub async fn get_stats(&self) -> RoomResult> { super::remote_track::get_stats(&self.inner).await } diff --git a/livekit/tests/common/e2e/video.rs b/livekit/tests/common/e2e/video.rs index 3887e3698..fac27cac1 100644 --- a/livekit/tests/common/e2e/video.rs +++ b/livekit/tests/common/e2e/video.rs @@ -107,8 +107,7 @@ impl SolidColorTrack { data_u.fill(128); data_v.fill(128); - let frame = - VideoFrame { rotation: VideoRotation::VideoRotation0, timestamp_us: 0, buffer }; + let frame = VideoFrame::new(VideoRotation::VideoRotation0, buffer); rtc_source.capture_frame(&frame); time::sleep(interval).await; } diff --git a/webrtc-sys/build.rs b/webrtc-sys/build.rs index 4d779777e..072794ecf 100644 --- a/webrtc-sys/build.rs +++ b/webrtc-sys/build.rs @@ -54,6 +54,7 @@ fn main() { "src/prohibit_libsrtp_initialization.rs", "src/apm.rs", "src/audio_mixer.rs", + "src/packet_trailer.rs", ]; if is_desktop { @@ -89,6 +90,7 @@ fn main() { "src/prohibit_libsrtp_initialization.cpp", "src/apm.cpp", "src/audio_mixer.cpp", + "src/packet_trailer.cpp", ]); if is_desktop { diff --git a/webrtc-sys/include/livekit/frame_cryptor.h b/webrtc-sys/include/livekit/frame_cryptor.h index c100aeed0..da36356b5 100644 --- a/webrtc-sys/include/livekit/frame_cryptor.h +++ b/webrtc-sys/include/livekit/frame_cryptor.h @@ -39,6 +39,7 @@ struct EncryptedPacket; enum class Algorithm : ::std::int32_t; class RtcFrameCryptorObserverWrapper; class NativeFrameCryptorObserver; +class PacketTrailerHandler; /// Shared secret key for frame encryption. class KeyProvider { @@ -158,6 +159,10 @@ class FrameCryptor { void unregister_observer() const; + /// Attach a packet trailer transformer for chained processing. 
+ void set_packet_trailer_handler( + std::shared_ptr handler) const; + private: std::shared_ptr rtc_runtime_; const rust::String participant_id_; @@ -167,6 +172,8 @@ class FrameCryptor { webrtc::scoped_refptr sender_; webrtc::scoped_refptr receiver_; mutable webrtc::scoped_refptr observer_; + mutable rtc::scoped_refptr + chained_transformer_; }; class NativeFrameCryptorObserver diff --git a/webrtc-sys/include/livekit/packet_trailer.h b/webrtc-sys/include/livekit/packet_trailer.h new file mode 100644 index 000000000..e4c6a4e7e --- /dev/null +++ b/webrtc-sys/include/livekit/packet_trailer.h @@ -0,0 +1,218 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "absl/types/optional.h" +#include "api/frame_transformer_interface.h" +#include "api/rtp_sender_interface.h" +#include "api/rtp_receiver_interface.h" +#include "api/scoped_refptr.h" +#include "livekit/webrtc.h" +#include "rtc_base/synchronization/mutex.h" +#include "rust/cxx.h" + +// Forward declarations to avoid circular includes +// (video_track.h -> packet_trailer.h -> peer_connection.h -> media_stream.h -> video_track.h) +namespace livekit_ffi { +class PeerConnectionFactory; +class RtpSender; +class RtpReceiver; +} // namespace livekit_ffi + +namespace livekit_ffi { + +// Magic bytes to identify packet trailers: "LKTS" (LiveKit TimeStamp) +constexpr uint8_t kPacketTrailerMagic[4] = {'L', 'K', 'T', 'S'}; + +// Trailer envelope: [trailer_len: 1B] [magic: 4B] = 5 bytes. +// Always present at the end of every trailer. +constexpr size_t kTrailerEnvelopeSize = 5; + +// TLV element overhead: [tag: 1B] [len: 1B] = 2 bytes before value. +// All TLV bytes (tag, len, value) are XORed with 0xFF. + +// TLV tag IDs +constexpr uint8_t kTagTimestampUs = 0x01; // value: 8 bytes big-endian uint64 +constexpr uint8_t kTagFrameId = 0x02; // value: 4 bytes big-endian uint32 + +constexpr size_t kTimestampTlvSize = 10; // tag + len + 8-byte value +constexpr size_t kFrameIdTlvSize = 6; // tag + len + 4-byte value + +// Trailer size varies because frame_id is omitted when it is unset (0). +constexpr size_t kPacketTrailerMinSize = + kTimestampTlvSize + kTrailerEnvelopeSize; +constexpr size_t kPacketTrailerMaxSize = + kTimestampTlvSize + kFrameIdTlvSize + kTrailerEnvelopeSize; + +struct PacketTrailerMetadata { + uint64_t user_timestamp_us; + uint32_t frame_id; + uint32_t ssrc; // SSRC that produced this entry (for simulcast tracking) +}; + +/// Frame transformer that appends/extracts packet trailers. +/// This transformer can be used standalone or in conjunction with e2ee. 
+/// +/// On the send side, user timestamps are stored in an internal map keyed +/// by capture timestamp (microseconds). When TransformSend fires it +/// looks up the user timestamp via the frame's CaptureTime(). +/// +/// On the receive side, extracted frame metadata is stored in an +/// internal map keyed by RTP timestamp (uint32_t). Decoded frames can +/// look up their metadata via lookup_frame_metadata(rtp_ts). +class PacketTrailerTransformer : public webrtc::FrameTransformerInterface { + public: + enum class Direction { kSend, kReceive }; + + explicit PacketTrailerTransformer(Direction direction); + ~PacketTrailerTransformer() override = default; + + // FrameTransformerInterface implementation + void Transform( + std::unique_ptr frame) override; + void RegisterTransformedFrameCallback( + rtc::scoped_refptr callback) override; + void RegisterTransformedFrameSinkCallback( + rtc::scoped_refptr callback, + uint32_t ssrc) override; + void UnregisterTransformedFrameCallback() override; + void UnregisterTransformedFrameSinkCallback(uint32_t ssrc) override; + + /// Enable/disable timestamp embedding + void set_enabled(bool enabled); + bool enabled() const; + + /// Lookup the frame metadata associated with a given RTP timestamp. + /// Returns the metadata if found, nullopt otherwise. + /// The entry is removed from the map after lookup. + std::optional lookup_frame_metadata(uint32_t rtp_timestamp); + + /// Store frame metadata for a given capture timestamp (sender side). + /// Called from VideoTrackSource::on_captured_frame with the + /// TimestampAligner-adjusted timestamp, which matches CaptureTime() + /// in the encoder pipeline. 
+ void store_frame_metadata(int64_t capture_timestamp_us, + uint64_t user_timestamp_us, + uint32_t frame_id); + + private: + void TransformSend( + std::unique_ptr frame); + void TransformReceive( + std::unique_ptr frame); + + /// Append frame metadata trailer to frame data + std::vector AppendTrailer( + rtc::ArrayView data, + uint64_t user_timestamp_us, + uint32_t frame_id); + + /// Extract and remove frame metadata trailer from frame data + std::optional ExtractTrailer( + rtc::ArrayView data, + std::vector& out_data); + + const Direction direction_; + std::atomic enabled_{true}; + mutable webrtc::Mutex mutex_; + rtc::scoped_refptr callback_; + std::unordered_map> + sink_callbacks_; + // Send-side map: capture timestamp (us) -> frame metadata. + // Populated by store_frame_metadata(), consumed by TransformSend() + // via CaptureTime() lookup. + mutable webrtc::Mutex send_map_mutex_; + mutable std::unordered_map send_map_; + mutable std::deque send_map_order_; + static constexpr size_t kMaxSendMapEntries = 300; + + // Receive-side map: RTP timestamp -> frame metadata. + // Keyed by RTP timestamp so decoded frames can look up their + // metadata regardless of frame drops or reordering. + mutable webrtc::Mutex recv_map_mutex_; + mutable std::unordered_map recv_map_; + mutable std::deque recv_map_order_; + static constexpr size_t kMaxRecvMapEntries = 300; + + // Simulcast tracking: detect layer switches and flush stale entries. + mutable uint32_t recv_active_ssrc_{0}; +}; + +/// Wrapper class for Rust FFI that manages packet trailer transformers. 
+class PacketTrailerHandler { + public: + PacketTrailerHandler( + std::shared_ptr rtc_runtime, + rtc::scoped_refptr sender); + + PacketTrailerHandler( + std::shared_ptr rtc_runtime, + rtc::scoped_refptr receiver); + + ~PacketTrailerHandler() = default; + + /// Enable/disable timestamp embedding + void set_enabled(bool enabled) const; + bool enabled() const; + + /// Lookup the user timestamp for a given RTP timestamp (receiver side). + /// Returns UINT64_MAX if not found. The entry is removed after lookup. + /// Also caches the frame_id for retrieval via last_lookup_frame_id(). + uint64_t lookup_timestamp(uint32_t rtp_timestamp) const; + + /// Returns the frame_id from the most recent successful + /// lookup_timestamp() call. Returns 0 if no lookup succeeded. + uint32_t last_lookup_frame_id() const; + + /// Store frame metadata for a given capture timestamp (sender side). + void store_frame_metadata(int64_t capture_timestamp_us, + uint64_t user_timestamp_us, + uint32_t frame_id) const; + + /// Access the underlying transformer for chaining. 
+ rtc::scoped_refptr transformer() const; + + private: + std::shared_ptr rtc_runtime_; + rtc::scoped_refptr transformer_; + rtc::scoped_refptr sender_; + rtc::scoped_refptr receiver_; + mutable uint32_t last_frame_id_{0}; +}; + +// Factory functions for Rust FFI + +std::shared_ptr new_packet_trailer_sender( + std::shared_ptr peer_factory, + std::shared_ptr sender); + +std::shared_ptr new_packet_trailer_receiver( + std::shared_ptr peer_factory, + std::shared_ptr receiver); + +} // namespace livekit_ffi diff --git a/webrtc-sys/include/livekit/video_track.h b/webrtc-sys/include/livekit/video_track.h index d27eb4daa..3f4e1c1ac 100644 --- a/webrtc-sys/include/livekit/video_track.h +++ b/webrtc-sys/include/livekit/video_track.h @@ -33,6 +33,7 @@ namespace livekit_ffi { class VideoTrack; class NativeVideoSink; class VideoTrackSource; +class PacketTrailerHandler; // forward declaration to avoid circular include } // namespace livekit_ffi #include "webrtc-sys/src/video_track.rs.h" @@ -98,12 +99,17 @@ class VideoTrackSource { SourceState state() const override; bool remote() const override; VideoResolution video_resolution() const; - bool on_captured_frame(const webrtc::VideoFrame& frame); + bool on_captured_frame(const webrtc::VideoFrame& frame, + const FrameMetadata& frame_metadata); + + void set_packet_trailer_handler( + std::shared_ptr handler); private: mutable webrtc::Mutex mutex_; webrtc::TimestampAligner timestamp_aligner_; VideoResolution resolution_; + std::shared_ptr packet_trailer_handler_; bool is_screencast_; }; @@ -112,9 +118,13 @@ class VideoTrackSource { VideoResolution video_resolution() const; - bool on_captured_frame(const std::unique_ptr& frame) + bool on_captured_frame(const std::unique_ptr& frame, + const FrameMetadata& frame_metadata) const; // frames pushed from Rust (+interior mutability) + void set_packet_trailer_handler( + std::shared_ptr handler) const; + webrtc::scoped_refptr get() const; private: diff --git a/webrtc-sys/src/frame_cryptor.cpp 
b/webrtc-sys/src/frame_cryptor.cpp index 9ed50f38d..01ef7feec 100644 --- a/webrtc-sys/src/frame_cryptor.cpp +++ b/webrtc-sys/src/frame_cryptor.cpp @@ -22,12 +22,61 @@ #include "api/make_ref_counted.h" #include "livekit/peer_connection.h" #include "livekit/peer_connection_factory.h" +#include "livekit/packet_trailer.h" #include "livekit/webrtc.h" #include "rtc_base/thread.h" #include "webrtc-sys/src/frame_cryptor.rs.h" namespace livekit_ffi { +class ChainedFrameTransformer : public webrtc::FrameTransformerInterface, + public webrtc::TransformedFrameCallback { + public: + ChainedFrameTransformer( + rtc::scoped_refptr first, + rtc::scoped_refptr second) + : first_(std::move(first)), second_(std::move(second)) {} + + void Transform( + std::unique_ptr frame) override { + first_->Transform(std::move(frame)); + } + + void RegisterTransformedFrameCallback( + rtc::scoped_refptr callback) override { + second_->RegisterTransformedFrameCallback(callback); + first_->RegisterTransformedFrameCallback( + rtc::scoped_refptr(this)); + } + + void RegisterTransformedFrameSinkCallback( + rtc::scoped_refptr callback, + uint32_t ssrc) override { + second_->RegisterTransformedFrameSinkCallback(callback, ssrc); + first_->RegisterTransformedFrameSinkCallback( + rtc::scoped_refptr(this), ssrc); + } + + void UnregisterTransformedFrameCallback() override { + first_->UnregisterTransformedFrameCallback(); + second_->UnregisterTransformedFrameCallback(); + } + + void UnregisterTransformedFrameSinkCallback(uint32_t ssrc) override { + first_->UnregisterTransformedFrameSinkCallback(ssrc); + second_->UnregisterTransformedFrameSinkCallback(ssrc); + } + + void OnTransformedFrame( + std::unique_ptr frame) override { + second_->Transform(std::move(frame)); + } + + private: + rtc::scoped_refptr first_; + rtc::scoped_refptr second_; +}; + webrtc::FrameCryptorTransformer::Algorithm AlgorithmToFrameCryptorAlgorithm( Algorithm algorithm) { switch (algorithm) { @@ -136,6 +185,40 @@ void 
FrameCryptor::unregister_observer() const { e2ee_transformer_->UnRegisterFrameCryptorTransformerObserver(); } +void FrameCryptor::set_packet_trailer_handler( + std::shared_ptr handler) const { + if (!handler) { + return; + } + + auto timestamp_transformer = handler->transformer(); + if (!timestamp_transformer) { + return; + } + + rtc::scoped_refptr first; + rtc::scoped_refptr second; + if (sender_) { + first = e2ee_transformer_; + second = timestamp_transformer; + } else if (receiver_) { + first = timestamp_transformer; + second = e2ee_transformer_; + } else { + return; + } + + chained_transformer_ = + rtc::make_ref_counted(first, second); + + if (sender_) { + sender_->SetEncoderToPacketizerFrameTransformer(chained_transformer_); + } + if (receiver_) { + receiver_->SetDepacketizerToDecoderFrameTransformer(chained_transformer_); + } +} + NativeFrameCryptorObserver::NativeFrameCryptorObserver( rust::Box observer, const FrameCryptor* fc) diff --git a/webrtc-sys/src/frame_cryptor.rs b/webrtc-sys/src/frame_cryptor.rs index 55774a0d2..c402ce0da 100644 --- a/webrtc-sys/src/frame_cryptor.rs +++ b/webrtc-sys/src/frame_cryptor.rs @@ -102,10 +102,12 @@ pub mod ffi { include!("livekit/rtp_sender.h"); include!("livekit/rtp_receiver.h"); include!("livekit/peer_connection_factory.h"); + include!("livekit/packet_trailer.h"); type RtpSender = crate::rtp_sender::ffi::RtpSender; type RtpReceiver = crate::rtp_receiver::ffi::RtpReceiver; type PeerConnectionFactory = crate::peer_connection_factory::ffi::PeerConnectionFactory; + type PacketTrailerHandler = crate::packet_trailer::ffi::PacketTrailerHandler; pub type FrameCryptor; @@ -141,6 +143,11 @@ pub mod ffi { ); pub fn unregister_observer(self: &FrameCryptor); + + pub fn set_packet_trailer_handler( + self: &FrameCryptor, + handler: SharedPtr, + ); } unsafe extern "C++" { diff --git a/webrtc-sys/src/lib.rs b/webrtc-sys/src/lib.rs index fde63e14a..94f4eed0c 100644 --- a/webrtc-sys/src/lib.rs +++ b/webrtc-sys/src/lib.rs @@ -27,6 +27,7 @@ 
pub mod helper; pub mod jsep; pub mod media_stream; pub mod media_stream_track; +pub mod packet_trailer; pub mod peer_connection; pub mod peer_connection_factory; pub mod prohibit_libsrtp_initialization; diff --git a/webrtc-sys/src/packet_trailer.cpp b/webrtc-sys/src/packet_trailer.cpp new file mode 100644 index 000000000..0d6899263 --- /dev/null +++ b/webrtc-sys/src/packet_trailer.cpp @@ -0,0 +1,467 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "livekit/packet_trailer.h" + +#include +#include + +#include "api/make_ref_counted.h" +#include "livekit/peer_connection_factory.h" +#include "livekit/rtp_receiver.h" +#include "livekit/rtp_sender.h" +#include "rtc_base/logging.h" +#include "webrtc-sys/src/packet_trailer.rs.h" + +namespace livekit_ffi { + +// PacketTrailerTransformer implementation + +PacketTrailerTransformer::PacketTrailerTransformer(Direction direction) + : direction_(direction) {} + +void PacketTrailerTransformer::Transform( + std::unique_ptr frame) { + uint32_t ssrc = frame->GetSsrc(); + uint32_t rtp_timestamp = frame->GetTimestamp(); + + if (!enabled_.load()) { + rtc::scoped_refptr cb; + { + webrtc::MutexLock lock(&mutex_); + auto it = sink_callbacks_.find(ssrc); + if (it != sink_callbacks_.end()) { + cb = it->second; + } else { + cb = callback_; + } + } + + if (cb) { + cb->OnTransformedFrame(std::move(frame)); + } else { + RTC_LOG(LS_WARNING) + << "PacketTrailerTransformer::Transform (disabled) has no callback" + << " direction=" + << (direction_ == Direction::kSend ? "send" : "recv") + << " ssrc=" << ssrc << " rtp_ts=" << rtp_timestamp; + } + return; + } + + if (direction_ == Direction::kSend) { + TransformSend(std::move(frame)); + } else { + TransformReceive(std::move(frame)); + } +} + +void PacketTrailerTransformer::TransformSend( + std::unique_ptr frame) { + uint32_t rtp_timestamp = frame->GetTimestamp(); + uint32_t ssrc = frame->GetSsrc(); + + auto data = frame->GetData(); + + // Look up the frame metadata by the frame's capture time. + // CaptureTime() returns Timestamp::Millis(capture_time_ms_) where + // capture_time_ms_ = timestamp_us / 1000. So capture_time->us() + // has millisecond precision (bottom 3 digits always zero). + // store_frame_metadata() truncates its key the same way. 
+ PacketTrailerMetadata meta_to_embed{0, 0, 0}; + auto capture_time = frame->CaptureTime(); + if (capture_time.has_value()) { + int64_t capture_us = capture_time->us(); + + webrtc::MutexLock lock(&send_map_mutex_); + auto it = send_map_.find(capture_us); + if (it != send_map_.end()) { + meta_to_embed = it->second; + // Don't erase — simulcast layers share the same capture time. + // Entries are pruned by capacity in store_frame_metadata(). + } + } else { + RTC_LOG(LS_WARNING) + << "PacketTrailerTransformer::TransformSend CaptureTime() not available" + << " ssrc=" << ssrc << " rtp_ts=" << rtp_timestamp; + } + + // Always append trailer when enabled (even if timestamp is 0, + // which indicates no metadata was set for this frame) + std::vector new_data; + if (enabled_.load()) { + new_data = AppendTrailer(data, meta_to_embed.user_timestamp_us, + meta_to_embed.frame_id); + frame->SetData(rtc::ArrayView(new_data)); + } + + // Forward to the appropriate callback (either global or per-SSRC sink). + rtc::scoped_refptr cb; + { + webrtc::MutexLock lock(&mutex_); + auto it = sink_callbacks_.find(ssrc); + if (it != sink_callbacks_.end()) { + cb = it->second; + } else { + cb = callback_; + } + } + + if (cb) { + cb->OnTransformedFrame(std::move(frame)); + } else { + RTC_LOG(LS_WARNING) + << "PacketTrailerTransformer::TransformSend has no callback" + << " ssrc=" << ssrc << " rtp_ts=" << rtp_timestamp; + } +} + +void PacketTrailerTransformer::TransformReceive( + std::unique_ptr frame) { + uint32_t ssrc = frame->GetSsrc(); + uint32_t rtp_timestamp = frame->GetTimestamp(); + auto data = frame->GetData(); + std::vector stripped_data; + + auto meta = ExtractTrailer(data, stripped_data); + + if (meta.has_value()) { + meta->ssrc = ssrc; + + { + webrtc::MutexLock lock(&recv_map_mutex_); + + // Detect simulcast layer switch (SSRC change). 
+ // When the SFU switches us to a different layer, the old layer's + // entries are stale and can cause RTP timestamp collisions or + // return wrong user timestamps on lookup. Flush them. + if (recv_active_ssrc_ != 0 && recv_active_ssrc_ != ssrc) { + auto oit = recv_map_order_.begin(); + while (oit != recv_map_order_.end()) { + auto mit = recv_map_.find(*oit); + if (mit != recv_map_.end() && mit->second.ssrc != ssrc) { + recv_map_.erase(mit); + oit = recv_map_order_.erase(oit); + } else { + ++oit; + } + } + } + recv_active_ssrc_ = ssrc; + + bool collision = recv_map_.find(rtp_timestamp) != recv_map_.end(); + + // Evict oldest entry if at capacity + while (recv_map_.size() >= kMaxRecvMapEntries && + !recv_map_order_.empty()) { + auto evicted_rtp = recv_map_order_.front(); + recv_map_.erase(evicted_rtp); + recv_map_order_.pop_front(); + } + if (!collision) { + recv_map_order_.push_back(rtp_timestamp); + } + recv_map_[rtp_timestamp] = meta.value(); + } + + // Update frame with stripped data + frame->SetData(rtc::ArrayView(stripped_data)); + } + + // Forward to the appropriate callback (either global or per-SSRC sink). + rtc::scoped_refptr cb; + { + webrtc::MutexLock lock(&mutex_); + auto it = sink_callbacks_.find(ssrc); + if (it != sink_callbacks_.end()) { + cb = it->second; + } else { + cb = callback_; + } + } + + if (cb) { + cb->OnTransformedFrame(std::move(frame)); + } else { + RTC_LOG(LS_WARNING) + << "PacketTrailerTransformer::TransformReceive has no callback" + << " ssrc=" << ssrc << " rtp_ts=" << rtp_timestamp; + } +} + +std::vector PacketTrailerTransformer::AppendTrailer( + rtc::ArrayView data, + uint64_t user_timestamp_us, + uint32_t frame_id) { + const bool has_frame_id = frame_id != 0; + const size_t trailer_len = kTimestampTlvSize + + (has_frame_id ? 
kFrameIdTlvSize : 0) + + kTrailerEnvelopeSize; + std::vector result; + result.reserve(data.size() + trailer_len); + + // Copy original data + result.insert(result.end(), data.begin(), data.end()); + + // All TLV bytes are XORed with 0xFF to prevent H.264 NAL start code + // sequences (0x000001 / 0x00000001) from appearing inside the trailer. + + // TLV: timestamp_us (tag=0x01, len=8, 8 bytes big-endian) + result.push_back(kTagTimestampUs ^ 0xFF); + result.push_back(8 ^ 0xFF); + for (int i = 7; i >= 0; --i) { + result.push_back( + static_cast(((user_timestamp_us >> (i * 8)) & 0xFF) ^ 0xFF)); + } + + if (has_frame_id) { + // TLV: frame_id (tag=0x02, len=4, 4 bytes big-endian) + result.push_back(kTagFrameId ^ 0xFF); + result.push_back(4 ^ 0xFF); + for (int i = 3; i >= 0; --i) { + result.push_back( + static_cast(((frame_id >> (i * 8)) & 0xFF) ^ 0xFF)); + } + } + + // Envelope: trailer_len (1B, XORed) + magic (4B, NOT XORed) + result.push_back(static_cast(trailer_len ^ 0xFF)); + result.insert(result.end(), std::begin(kPacketTrailerMagic), + std::end(kPacketTrailerMagic)); + + return result; +} + +std::optional PacketTrailerTransformer::ExtractTrailer( + rtc::ArrayView data, + std::vector& out_data) { + if (data.size() < kTrailerEnvelopeSize) { + out_data.assign(data.begin(), data.end()); + return std::nullopt; + } + + // Check for magic bytes at the end + const uint8_t* magic_start = data.data() + data.size() - 4; + if (std::memcmp(magic_start, kPacketTrailerMagic, 4) != 0) { + out_data.assign(data.begin(), data.end()); + return std::nullopt; + } + + uint8_t trailer_len = data[data.size() - 5] ^ 0xFF; + + if (trailer_len < kTrailerEnvelopeSize || trailer_len > data.size()) { + out_data.assign(data.begin(), data.end()); + return std::nullopt; + } + + // Walk the TLV region: everything from trailer_start up to the envelope. 
+ const uint8_t* trailer_start = data.data() + data.size() - trailer_len; + size_t tlv_region_len = trailer_len - kTrailerEnvelopeSize; + + PacketTrailerMetadata meta{0, 0, 0}; + bool found_any = false; + size_t pos = 0; + + while (pos + 2 <= tlv_region_len) { + uint8_t tag = trailer_start[pos] ^ 0xFF; + uint8_t len = trailer_start[pos + 1] ^ 0xFF; + pos += 2; + + if (pos + len > tlv_region_len) { + break; + } + + const uint8_t* val = trailer_start + pos; + + if (tag == kTagTimestampUs && len == 8) { + uint64_t ts = 0; + for (int i = 0; i < 8; ++i) { + ts = (ts << 8) | (val[i] ^ 0xFF); + } + meta.user_timestamp_us = ts; + found_any = true; + } else if (tag == kTagFrameId && len == 4) { + uint32_t fid = 0; + for (int i = 0; i < 4; ++i) { + fid = (fid << 8) | (val[i] ^ 0xFF); + } + meta.frame_id = fid; + found_any = true; + } + // Unknown tags are silently skipped. + + pos += len; + } + + out_data.assign(data.begin(), data.end() - trailer_len); + + if (!found_any) { + return std::nullopt; + } + return meta; +} + +void PacketTrailerTransformer::RegisterTransformedFrameCallback( + rtc::scoped_refptr callback) { + webrtc::MutexLock lock(&mutex_); + callback_ = callback; +} + +void PacketTrailerTransformer::RegisterTransformedFrameSinkCallback( + rtc::scoped_refptr callback, + uint32_t ssrc) { + webrtc::MutexLock lock(&mutex_); + sink_callbacks_[ssrc] = callback; +} + +void PacketTrailerTransformer::UnregisterTransformedFrameCallback() { + webrtc::MutexLock lock(&mutex_); + callback_ = nullptr; +} + +void PacketTrailerTransformer::UnregisterTransformedFrameSinkCallback( + uint32_t ssrc) { + webrtc::MutexLock lock(&mutex_); + sink_callbacks_.erase(ssrc); +} + +void PacketTrailerTransformer::set_enabled(bool enabled) { + enabled_.store(enabled); +} + +bool PacketTrailerTransformer::enabled() const { + return enabled_.load(); +} + +std::optional PacketTrailerTransformer::lookup_frame_metadata( + uint32_t rtp_timestamp) { + webrtc::MutexLock lock(&recv_map_mutex_); + auto it 
= recv_map_.find(rtp_timestamp); + if (it == recv_map_.end()) { + return std::nullopt; + } + PacketTrailerMetadata meta = it->second; + recv_map_.erase(it); + for (auto oit = recv_map_order_.begin(); oit != recv_map_order_.end(); + ++oit) { + if (*oit == rtp_timestamp) { + recv_map_order_.erase(oit); + break; + } + } + return meta; +} + +void PacketTrailerTransformer::store_frame_metadata( + int64_t capture_timestamp_us, + uint64_t user_timestamp_us, + uint32_t frame_id) { + // Truncate to millisecond precision to match what WebRTC stores + // internally. The encoder pipeline converts the VideoFrame's + // timestamp_us to capture_time_ms_ = timestamp_us / 1000, and + // CaptureTime() returns Timestamp::Millis(capture_time_ms_). + // When we call capture_time->us() in TransformSend we get a value + // with the bottom 3 digits zeroed, so we must store with the same + // truncation to ensure the lookup succeeds. + // + // The caller (VideoTrackSource::on_captured_frame) passes the + // TimestampAligner-adjusted timestamp here, which is the same + // value that becomes CaptureTime() in the encoder pipeline. 
+ int64_t key = (capture_timestamp_us / 1000) * 1000; + + webrtc::MutexLock lock(&send_map_mutex_); + + // Evict oldest entries if at capacity + while (send_map_.size() >= kMaxSendMapEntries && !send_map_order_.empty()) { + send_map_.erase(send_map_order_.front()); + send_map_order_.pop_front(); + } + + if (send_map_.find(key) == send_map_.end()) { + send_map_order_.push_back(key); + } + send_map_[key] = PacketTrailerMetadata{user_timestamp_us, frame_id, 0}; +} + +// PacketTrailerHandler implementation + +PacketTrailerHandler::PacketTrailerHandler( + std::shared_ptr rtc_runtime, + rtc::scoped_refptr sender) + : rtc_runtime_(rtc_runtime), sender_(sender) { + transformer_ = rtc::make_ref_counted( + PacketTrailerTransformer::Direction::kSend); + sender->SetEncoderToPacketizerFrameTransformer(transformer_); +} + +PacketTrailerHandler::PacketTrailerHandler( + std::shared_ptr rtc_runtime, + rtc::scoped_refptr receiver) + : rtc_runtime_(rtc_runtime), receiver_(receiver) { + transformer_ = rtc::make_ref_counted( + PacketTrailerTransformer::Direction::kReceive); + receiver->SetDepacketizerToDecoderFrameTransformer(transformer_); +} + +void PacketTrailerHandler::set_enabled(bool enabled) const { + transformer_->set_enabled(enabled); +} + +bool PacketTrailerHandler::enabled() const { + return transformer_->enabled(); +} + +uint64_t PacketTrailerHandler::lookup_timestamp(uint32_t rtp_timestamp) const { + auto meta = transformer_->lookup_frame_metadata(rtp_timestamp); + if (meta.has_value()) { + last_frame_id_ = meta->frame_id; + return meta->user_timestamp_us; + } + return UINT64_MAX; +} + +uint32_t PacketTrailerHandler::last_lookup_frame_id() const { + return last_frame_id_; +} + +void PacketTrailerHandler::store_frame_metadata( + int64_t capture_timestamp_us, + uint64_t user_timestamp_us, + uint32_t frame_id) const { + transformer_->store_frame_metadata(capture_timestamp_us, user_timestamp_us, frame_id); +} + +rtc::scoped_refptr PacketTrailerHandler::transformer() const { + 
return transformer_; +} + +// Factory functions + +std::shared_ptr new_packet_trailer_sender( + std::shared_ptr peer_factory, + std::shared_ptr sender) { + return std::make_shared( + peer_factory->rtc_runtime(), sender->rtc_sender()); +} + +std::shared_ptr new_packet_trailer_receiver( + std::shared_ptr peer_factory, + std::shared_ptr receiver) { + return std::make_shared( + peer_factory->rtc_runtime(), receiver->rtc_receiver()); +} + +} // namespace livekit_ffi diff --git a/webrtc-sys/src/packet_trailer.rs b/webrtc-sys/src/packet_trailer.rs new file mode 100644 index 000000000..03c3408ab --- /dev/null +++ b/webrtc-sys/src/packet_trailer.rs @@ -0,0 +1,69 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::impl_thread_safety; + +#[cxx::bridge(namespace = "livekit_ffi")] +pub mod ffi { + unsafe extern "C++" { + include!("livekit/packet_trailer.h"); + include!("livekit/rtp_sender.h"); + include!("livekit/rtp_receiver.h"); + include!("livekit/peer_connection_factory.h"); + + type RtpSender = crate::rtp_sender::ffi::RtpSender; + type RtpReceiver = crate::rtp_receiver::ffi::RtpReceiver; + type PeerConnectionFactory = crate::peer_connection_factory::ffi::PeerConnectionFactory; + + /// Handler for packet trailer embedding/extraction on RTP streams. + pub type PacketTrailerHandler; + + /// Enable/disable timestamp embedding. 
+ fn set_enabled(self: &PacketTrailerHandler, enabled: bool); + + /// Check if timestamp embedding is enabled. + fn enabled(self: &PacketTrailerHandler) -> bool; + + /// Lookup the user timestamp for a given RTP timestamp (receiver side). + /// Returns `u64::MAX` if not found. The entry is removed after lookup. + /// Also caches the frame_id for retrieval via last_lookup_frame_id(). + fn lookup_timestamp(self: &PacketTrailerHandler, rtp_timestamp: u32) -> u64; + + /// Returns the frame_id from the most recent successful + /// lookup_timestamp() call. + fn last_lookup_frame_id(self: &PacketTrailerHandler) -> u32; + + /// Store frame metadata for a given capture timestamp (sender side). + fn store_frame_metadata( + self: &PacketTrailerHandler, + capture_timestamp_us: i64, + user_timestamp_us: u64, + frame_id: u32, + ); + + /// Create a new packet trailer handler for a sender. + fn new_packet_trailer_sender( + peer_factory: SharedPtr, + sender: SharedPtr, + ) -> SharedPtr; + + /// Create a new packet trailer handler for a receiver.
+ fn new_packet_trailer_receiver( + peer_factory: SharedPtr, + receiver: SharedPtr, + ) -> SharedPtr; + } +} + +impl_thread_safety!(ffi::PacketTrailerHandler, Send + Sync); diff --git a/webrtc-sys/src/video_track.cpp b/webrtc-sys/src/video_track.cpp index 111575786..637cad28e 100644 --- a/webrtc-sys/src/video_track.cpp +++ b/webrtc-sys/src/video_track.cpp @@ -26,6 +26,7 @@ #include "audio/remix_resample.h" #include "common_audio/include/audio_util.h" #include "livekit/media_stream.h" +#include "livekit/packet_trailer.h" #include "livekit/video_track.h" #include "rtc_base/logging.h" #include "rtc_base/ref_counted_object.h" @@ -133,12 +134,23 @@ VideoResolution VideoTrackSource::InternalSource::video_resolution() const { } bool VideoTrackSource::InternalSource::on_captured_frame( - const webrtc::VideoFrame& frame) { + const webrtc::VideoFrame& frame, + const FrameMetadata& frame_metadata) { webrtc::MutexLock lock(&mutex_); int64_t aligned_timestamp_us = timestamp_aligner_.TranslateTimestamp( frame.timestamp_us(), webrtc::TimeMicros()); + // If a packet trailer was provided on this frame and we have a handler, + // store the mapping keyed by the aligned timestamp. This is the value + // that CaptureTime() will return in TransformSend, so the lookup will + // succeed. 
+ if (frame_metadata.has_packet_trailer && packet_trailer_handler_) { + packet_trailer_handler_->store_frame_metadata( + aligned_timestamp_us, frame_metadata.user_timestamp_us, + frame_metadata.frame_id); + } + webrtc::scoped_refptr buffer = frame.video_frame_buffer(); @@ -175,6 +187,12 @@ bool VideoTrackSource::InternalSource::on_captured_frame( return true; } +void VideoTrackSource::InternalSource::set_packet_trailer_handler( + std::shared_ptr handler) { + webrtc::MutexLock lock(&mutex_); + packet_trailer_handler_ = std::move(handler); +} + VideoTrackSource::VideoTrackSource(const VideoResolution& resolution, bool is_screencast) { source_ = webrtc::make_ref_counted(resolution, is_screencast); } @@ -184,9 +202,15 @@ VideoResolution VideoTrackSource::video_resolution() const { } bool VideoTrackSource::on_captured_frame( - const std::unique_ptr& frame) const { + const std::unique_ptr& frame, + const FrameMetadata& frame_metadata) const { auto rtc_frame = frame->get(); - return source_->on_captured_frame(rtc_frame); + return source_->on_captured_frame(rtc_frame, frame_metadata); +} + +void VideoTrackSource::set_packet_trailer_handler( + std::shared_ptr handler) const { + source_->set_packet_trailer_handler(std::move(handler)); } webrtc::scoped_refptr VideoTrackSource::get() diff --git a/webrtc-sys/src/video_track.rs b/webrtc-sys/src/video_track.rs index d18712281..dccf9668f 100644 --- a/webrtc-sys/src/video_track.rs +++ b/webrtc-sys/src/video_track.rs @@ -42,6 +42,13 @@ pub mod ffi { pub height: u32, } + #[derive(Debug)] + pub struct FrameMetadata { + pub has_packet_trailer: bool, + pub user_timestamp_us: u64, + pub frame_id: u32, + } + extern "C++" { include!("livekit/video_frame.h"); include!("livekit/media_stream_track.h"); @@ -50,9 +57,15 @@ pub mod ffi { type MediaStreamTrack = crate::media_stream_track::ffi::MediaStreamTrack; } - unsafe extern "C++" { + extern "C++" { + include!("livekit/packet_trailer.h"); include!("livekit/video_track.h"); + type 
PacketTrailerHandler = crate::packet_trailer::ffi::PacketTrailerHandler; + } + + unsafe extern "C++" { + type VideoTrack; type NativeVideoSink; type VideoTrackSource; @@ -66,7 +79,15 @@ pub mod ffi { fn new_native_video_sink(observer: Box) -> SharedPtr; fn video_resolution(self: &VideoTrackSource) -> VideoResolution; - fn on_captured_frame(self: &VideoTrackSource, frame: &UniquePtr) -> bool; + fn on_captured_frame( + self: &VideoTrackSource, + frame: &UniquePtr, + frame_metadata: &FrameMetadata, + ) -> bool; + fn set_packet_trailer_handler( + self: &VideoTrackSource, + handler: SharedPtr, + ); fn new_video_track_source( resolution: &VideoResolution, is_screencast: bool,