diff --git a/crates/video-streamer/src/streamer/mod.rs b/crates/video-streamer/src/streamer/mod.rs index 2bce9059e..7d4f13eb7 100644 --- a/crates/video-streamer/src/streamer/mod.rs +++ b/crates/video-streamer/src/streamer/mod.rs @@ -220,8 +220,10 @@ pub fn webm_stream( } } }); - // If the oneshot sender is dropped (task panicked), treat as Break - rx.blocking_recv().unwrap_or(WhenEofControlFlow::Break) + rx.blocking_recv().unwrap_or_else(|_| { + warn!("when_eof oneshot sender dropped unexpectedly, treating as shutdown"); + WhenEofControlFlow::Break + }) } enum WhenEofControlFlow { diff --git a/crates/video-streamer/src/streamer/tag_writers.rs b/crates/video-streamer/src/streamer/tag_writers.rs index 61557df57..70cd2e72e 100644 --- a/crates/video-streamer/src/streamer/tag_writers.rs +++ b/crates/video-streamer/src/streamer/tag_writers.rs @@ -20,6 +20,10 @@ const VPX_EFLAG_FORCE_KF: u32 = 0x00000001; /// - 0 = never skip (disables adaptive skipping effectively) const MAX_CONSECUTIVE_FRAME_SKIPS: u32 = 1; +/// Interval in milliseconds at which keyframes are forced in the outgoing stream. +/// Periodic keyframes improve seekability and resilience to packet loss. +const KEYFRAME_INTERVAL_MS: u64 = 10_000; + #[cfg(feature = "perf-diagnostics")] fn duration_as_millis_u64(duration: Duration) -> u64 { u64::try_from(duration.as_millis()).unwrap_or(u64::MAX) @@ -111,6 +115,7 @@ where codec: VpxCodec, cut_block_state: CutBlockState, last_encoded_abs_time: Option, + last_keyframe_abs_time: Option, // Adaptive frame skipping state #[cfg(feature = "perf-diagnostics")] @@ -204,6 +209,7 @@ where codec: config.codec, cut_block_state: CutBlockState::HaventMet, last_encoded_abs_time: None, + last_keyframe_abs_time: None, #[cfg(feature = "perf-diagnostics")] stream_start: Instant::now(), processing_time: Duration::ZERO, @@ -399,6 +405,13 @@ where } } + fn should_force_keyframe(&self, abs_time: u64) -> bool { + match self.last_keyframe_abs_time { + Some(last_kf_time) => abs_time.saturating_sub(last_kf_time) >= KEYFRAME_INTERVAL_MS, + None => false, + } + } + fn should_skip_encode(&self) -> bool { // Skip encoding when falling behind real-time. The ratio naturally self-regulates: // skipping makes processing faster (decode-only), which pushes ratio back above 1.0, @@ -484,6 +497,7 @@ where perf_trace!(block_timestamp, "Writing block to output"); self.write_block(block)?; self.last_encoded_abs_time = Some(abs_time); + self.last_keyframe_abs_time = Some(abs_time); } CutBlockState::Met { cut_block_absolute_time, @@ -513,8 +527,10 @@ where ); self.frames_since_last_encode = 0; + let force_keyframe = self.should_force_keyframe(abs_time); + let duration = self.compute_encode_duration(abs_time); - let frame = self.reencode(current_video_block, false, duration)?; + let frame = self.reencode(current_video_block, force_keyframe, duration)?; let Some(frame) = frame else { perf_trace!(block_timestamp, "No frame available from encoder - skipping"); return Ok(()); @@ -524,10 +540,25 @@ where let frame_size = frame.len(); perf_trace!(block_timestamp, frame_size, "Frame available from encoder"); - let timestamp = self.compute_met_timestamp(cut_block_absolute_time, abs_time)?; - let block = SimpleBlock::new_uncheked(&frame, 1, timestamp, false, None, false, false); - perf_trace!(block_timestamp, "Writing block to output"); - self.write_block(block)?; + if force_keyframe { + // Start a new cluster for the forced keyframe so the stream remains seekable. + let cluster_rel = abs_time - cut_block_absolute_time; + self.maybe_report_realtime_ratio(abs_time, cluster_rel); + self.start_new_cluster(cluster_rel)?; + self.cut_block_state = CutBlockState::Met { + cut_block_absolute_time, + last_cluster_relative_time: cluster_rel, + }; + let block = SimpleBlock::new_uncheked(&frame, 1, 0, false, None, false, true); + perf_trace!(block_timestamp, "Writing forced keyframe block to output"); + self.write_block(block)?; + self.last_keyframe_abs_time = Some(abs_time); + } else { + let timestamp = self.compute_met_timestamp(cut_block_absolute_time, abs_time)?; + let block = SimpleBlock::new_uncheked(&frame, 1, timestamp, false, None, false, false); + perf_trace!(block_timestamp, "Writing block to output"); + self.write_block(block)?; + } self.last_encoded_abs_time = Some(abs_time); } }