Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/video-streamer/src/streamer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,10 @@ pub fn webm_stream(
}
}
});
// If the oneshot sender is dropped (task panicked), treat as Break
rx.blocking_recv().unwrap_or(WhenEofControlFlow::Break)
rx.blocking_recv().unwrap_or_else(|_| {
warn!("when_eof oneshot sender dropped unexpectedly, treating as shutdown");
WhenEofControlFlow::Break
})
}

enum WhenEofControlFlow {
Expand Down
41 changes: 36 additions & 5 deletions crates/video-streamer/src/streamer/tag_writers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ const VPX_EFLAG_FORCE_KF: u32 = 0x00000001;
/// - 0 = never skip (disables adaptive skipping effectively)
const MAX_CONSECUTIVE_FRAME_SKIPS: u32 = 1;

/// Interval in milliseconds at which keyframes are forced in the outgoing stream.
/// Periodic keyframes improve seekability and resilience to packet loss.
const KEYFRAME_INTERVAL_MS: u64 = 10_000;

Comment on lines +23 to +26
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR description mentions adding “codec-aware VP9 keyframe detection in tag_writers.rs”, but the change here introduces periodic forced keyframes (interval-based) rather than keyframe detection. If VP9 detection work is happening elsewhere, consider updating the PR description; otherwise, ensure the intended VP9 keyframe-detection fix is included.

Copilot uses AI. Check for mistakes.
#[cfg(feature = "perf-diagnostics")]
fn duration_as_millis_u64(duration: Duration) -> u64 {
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
Expand Down Expand Up @@ -111,6 +115,7 @@ where
codec: VpxCodec,
cut_block_state: CutBlockState,
last_encoded_abs_time: Option<u64>,
last_keyframe_abs_time: Option<u64>,

// Adaptive frame skipping state
#[cfg(feature = "perf-diagnostics")]
Expand Down Expand Up @@ -204,6 +209,7 @@ where
codec: config.codec,
cut_block_state: CutBlockState::HaventMet,
last_encoded_abs_time: None,
last_keyframe_abs_time: None,
#[cfg(feature = "perf-diagnostics")]
stream_start: Instant::now(),
processing_time: Duration::ZERO,
Expand Down Expand Up @@ -399,6 +405,13 @@ where
}
}

fn should_force_keyframe(&self, abs_time: u64) -> bool {
match self.last_keyframe_abs_time {
Some(last_kf_time) => abs_time.saturating_sub(last_kf_time) >= KEYFRAME_INTERVAL_MS,
None => false,
}
}

fn should_skip_encode(&self) -> bool {
// Skip encoding when falling behind real-time. The ratio naturally self-regulates:
// skipping makes processing faster (decode-only), which pushes ratio back above 1.0,
Expand Down Expand Up @@ -484,6 +497,7 @@ where
perf_trace!(block_timestamp, "Writing block to output");
self.write_block(block)?;
self.last_encoded_abs_time = Some(abs_time);
self.last_keyframe_abs_time = Some(abs_time);
}
CutBlockState::Met {
cut_block_absolute_time,
Expand Down Expand Up @@ -513,8 +527,10 @@ where
);
self.frames_since_last_encode = 0;

let force_keyframe = self.should_force_keyframe(abs_time);

let duration = self.compute_encode_duration(abs_time);
let frame = self.reencode(current_video_block, false, duration)?;
let frame = self.reencode(current_video_block, force_keyframe, duration)?;
let Some(frame) = frame else {
perf_trace!(block_timestamp, "No frame available from encoder - skipping");
return Ok(());
Expand All @@ -524,10 +540,25 @@ where
let frame_size = frame.len();
perf_trace!(block_timestamp, frame_size, "Frame available from encoder");

let timestamp = self.compute_met_timestamp(cut_block_absolute_time, abs_time)?;
let block = SimpleBlock::new_uncheked(&frame, 1, timestamp, false, None, false, false);
perf_trace!(block_timestamp, "Writing block to output");
self.write_block(block)?;
if force_keyframe {
// Start a new cluster for the forced keyframe so the stream remains seekable.
let cluster_rel = abs_time - cut_block_absolute_time;
self.maybe_report_realtime_ratio(abs_time, cluster_rel);
self.start_new_cluster(cluster_rel)?;
self.cut_block_state = CutBlockState::Met {
cut_block_absolute_time,
last_cluster_relative_time: cluster_rel,
};
Comment on lines +547 to +551
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the forced-keyframe path, cut_block_state is updated to Met { .. last_cluster_relative_time: cluster_rel } before start_new_cluster(cluster_rel)? runs. If start_new_cluster fails (e.g., write error), the writer state is mutated even though the new cluster wasn't actually emitted. Consider calling start_new_cluster first and only updating cut_block_state after it succeeds (mirrors the ordering used in compute_met_timestamp).

Copilot uses AI. Check for mistakes.
let block = SimpleBlock::new_uncheked(&frame, 1, 0, false, None, false, true);
perf_trace!(block_timestamp, "Writing forced keyframe block to output");
self.write_block(block)?;
self.last_keyframe_abs_time = Some(abs_time);
} else {
Comment on lines 530 to +556
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new periodic keyframe forcing / cluster-splitting behavior doesn’t appear to be covered by tests. Since this is important for session shadowing correctness, consider adding an integration test (similar to existing webm_stream_* tests) that runs long enough to cross KEYFRAME_INTERVAL_MS and asserts that a new cluster starts and the corresponding SimpleBlock is marked as a keyframe.

Copilot uses AI. Check for mistakes.
let timestamp = self.compute_met_timestamp(cut_block_absolute_time, abs_time)?;
let block = SimpleBlock::new_uncheked(&frame, 1, timestamp, false, None, false, false);
perf_trace!(block_timestamp, "Writing block to output");
self.write_block(block)?;
}
self.last_encoded_abs_time = Some(abs_time);
}
}
Expand Down