moq-mux: catalog filter/target and Annex-B exporters#1487
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: ⛔ Files ignored due to path filters (1)
📒 Files selected for processing (10)
WalkthroughThis PR implements end-to-end raw H.264/H.265 Annex-B export with catalog filtering and constraint-based rendition selection. It adds codec-family abstractions to the Hang library, introduces a unified Stream trait for catalog snapshots with Filter/Target wrappers, implements H.264 and H.265 single-rendition exporters with parameter-set parsing and length-prefixed→Annex‑B conversion, generalizes fMP4 and MKV exporters to accept external catalog streams, and extends the CLI subscribe command with new output formats and rendition selection flags. All components compose through a pipeline where catalogs flow through filtering, targeting, and export layers. 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 8
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-cli/src/subscribe.rs`:
- Around line 149-153: The current codec selection lets self.video_codec
override the format-implied codec (codec variable), enabling contradictory
inputs like `--format h264 --video-codec h265`; modify the logic in the
Subscribe request handling (where codec is computed from self.video_codec and
self.format, and types SubscribeFormat and VideoCodecKind are used) to enforce
consistency for Annex-B formats: if self.format is SubscribeFormat::H264 or
SubscribeFormat::H265 then if self.video_codec is Some(...) and it does not
match the format-implied VideoCodecKind, return an error (or propagate a
descriptive validation error) instead of silently choosing one, otherwise set
codec to the single implied VideoCodecKind when not provided by the user. Ensure
the check happens before any exporter setup so conflicts are deterministic and
fail fast.
In `@rs/moq-mux/src/codec/annexb.rs`:
- Around line 22-30: The code currently does arithmetic like pos + length_size
and pos + len before bounds checks which can overflow; update the parsing in
annexb.rs to use checked_add for both additions: first compute let len_pos =
pos.checked_add(length_size).ok_or_else(|| anyhow::anyhow!("NAL length prefix
overflow"))? and use anyhow::ensure!(payload.len() >= len_pos, ...) then read
the length bytes and compute len, then compute let payload_end =
len_pos.checked_add(len).ok_or_else(|| anyhow::anyhow!("NAL payload length
overflow"))? and use anyhow::ensure!(payload.len() >= payload_end, ...); finally
slice payload[len_pos..payload_end] and set pos = payload_end; reference
symbols: pos, length_size, len, payload, out, START_CODE in annexb.rs.
In `@rs/moq-mux/src/codec/h264/export.rs`:
- Around line 148-153: The Avc1 conversion currently proceeds even when
parse_avcc_param_sets returns empty SPS/PPS; change the code that constructs
Avc1Convert to validate params.sps and params.pps and fail fast if either is
empty. Specifically, after calling parse_avcc_param_sets(avcc) and before
building prefix/returning Some(Avc1Convert { ... }), check params.sps.is_empty()
or params.pps.is_empty() and return None (or propagate an error) instead of
creating Avc1Convert; this ensures that Avc1Convert, its length_size and
keyframe_prefix, are only created when at least one SPS and one PPS are present.
- Around line 140-142: The current early-return in the method that checks only
self.track.as_ref().is_some_and(|t| t.name == *name) can skip necessary
refreshes when codec/description changed; update the comparison to include the
rendition/config identity (e.g., compare t.description or codec details, a
config_id, or perform full config equality) or remove the name-only shortcut so
the track is rebuilt when the snapshot/config differs; modify the condition
around self.track and name to also verify the config/description matches (or
always rebuild on snapshot changes) so stale conversion config cannot persist.
In `@rs/moq-mux/src/codec/h264/mod.rs`:
- Around line 170-188: The avcC parser uses expressions like avcc.len() >= pos +
2 and pos + len which can overflow; change the arithmetic to use checked_add and
fail with an appropriate anyhow error when it returns None. For each occurrence
(parsing SPS and PPS lengths and payloads in mod.rs where avcc, pos, sps, and
pps are used), compute end = pos.checked_add(2).ok_or_else(||
anyhow::anyhow!("avcC truncated in SPS/PPS length"))? (or use anyhow::ensure on
None), use end <= avcc.len() to validate, then for payload compute end2 =
end.checked_add(len).ok_or_else(|| anyhow::anyhow!("avcC truncated in SPS/PPS
payload"))? and check end2 <= avcc.len(); push slices using these checked
indices and update pos = end2. Ensure all pos arithmetic uses checked_add to
prevent usize overflow panics.
In `@rs/moq-mux/src/codec/h265/export.rs`:
- Around line 131-133: The early return that only checks
self.track.as_ref().is_some_and(|t| t.name == *name) can leave stale codec/state
when catalog entries change but the rendition name stays the same; update the
guard in the function containing self.track so it also compares the codec/config
(e.g., hvcC/codec fields or the whole track config) between the existing
self.track and the incoming track info instead of only t.name, or remove the
short-circuit and explicitly refresh/replace self.track when any relevant config
fields differ; look for symbols self.track, t.name and the codec/config fields
(hvcC, codec parameters, or track equality method) to implement the proper
comparison and refresh logic.
- Around line 139-144: Ensure parse_hvcc_param_sets results include non-empty
params.vps, params.sps and params.pps before constructing Hvc1Convert: if any of
those vectors are empty return an Err early rather than proceeding; reference
the parse_hvcc_param_sets call, the returned params.vps/sps/pps, and the
Hvc1Convert creation (length_size and keyframe_prefix built via
annexb::build_prefix) so the code aborts when required VPS/SPS/PPS sets are
missing and avoids emitting keyframes without the proper prefix.
In `@rs/moq-mux/src/codec/h265/mod.rs`:
- Around line 41-52: The hvcc parsing uses unchecked additions like pos + 1 and
pos + len that can overflow; replace those comparisons with checked_add-based
bounds checks: use pos.checked_add(1) / pos.checked_add(2) /
pos.checked_add(len) to compute end indices, convert the Option to an error (via
anyhow::ensure or an early return) if None, then compare hvcc.len() >= end and
use the computed end for slicing (e.g., for the u16::from_be_bytes inputs and
the Bytes::copy_from_slice call inside the for loop over num_nalus). Ensure you
update all occurrences (the initial nal_type read, the num_nalus read, and the
NAL length/payload slicing) to use checked_add and the validated end indices
instead of raw pos + n arithmetic.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d12a3c04-a55d-490f-b061-37bf15ba5643
📒 Files selected for processing (19)
rs/hang/src/catalog/audio/codec.rsrs/hang/src/catalog/video/codec.rsrs/moq-cli/src/subscribe.rsrs/moq-mux/src/catalog/consumer.rsrs/moq-mux/src/catalog/filter.rsrs/moq-mux/src/catalog/mod.rsrs/moq-mux/src/catalog/stream.rsrs/moq-mux/src/catalog/target.rsrs/moq-mux/src/codec/annexb.rsrs/moq-mux/src/codec/h264/export.rsrs/moq-mux/src/codec/h264/mod.rsrs/moq-mux/src/codec/h265/export.rsrs/moq-mux/src/codec/h265/mod.rsrs/moq-mux/src/container/fmp4/export.rsrs/moq-mux/src/container/fmp4/export_test.rsrs/moq-mux/src/container/mkv/export.rsrs/moq-mux/src/container/mkv/export_test.rsrs/moq-mux/src/container/mod.rsrs/moq-mux/src/container/source.rs
Adds a small composable layer for narrowing a broadcast's catalog before handing it to an exporter, plus single-rendition H.264 / H.265 Annex-B exporters that emit raw elementary streams for piping into ffmpeg or similar tools. The catalog crate gains a public `Stream` trait, a unified `Consumer` (promoted from the internal `CatalogSource`), and two wrappers: - `Filter<S>` hard-matches on rendition name and codec family. - `Target<S>` soft-matches on max width/height/pixels/bitrate and reduces each axis to one rendition (Rust port of js/watch's selection algorithm). Both wrappers cache the last input snapshot and re-emit on `set_*`, which is the seam future bandwidth-driven ABR will drive. `fmp4::Export` and `mkv::Export` are now generic over `S: catalog::Stream`; the legacy `with_catalog_format` constructor is gone since callers can build the stream themselves. The new `codec::h264::Export` / `codec::h265::Export` subscribe to a single rendition via `ExportSource::for_video_raw` (no avc1/hvc1 shape transform) and emit Annex-B bytes. avc3/hev1 sources pass through; avc1 / hvc1 sources are converted via `annexb::from_length_prefixed` with VPS/SPS/PPS injected at every keyframe from the avcC/hvcC. Timestamps are dropped (Annex-B has no container framing). The CLI gains `H264` / `H265` formats plus video/audio filter and target flags, wired through the `Filter` then `Target` chain. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
20df921 to
5f1b97a
Compare
- annexb/avcC/hvcC parsers use checked_add for offset arithmetic so malformed configs surface as errors instead of usize panics on overflow. - h264::Export and h265::Export now compare the full VideoConfig (not just the rendition name) when deciding to refresh the source, so catalog updates that change codec details while keeping the same name rebuild correctly. - avc1 / hvc1 conversion fails fast when parse_avcc_param_sets / parse_hvcc_param_sets returns empty parameter sets, instead of silently emitting keyframes without injected SPS/PPS (and VPS). - moq subscribe errors when --video-codec contradicts the format- implied codec (--format h264 + --video-codec h265 etc.) so misuse fails in the CLI rather than later in the exporter. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 3
♻️ Duplicate comments (7)
rs/moq-mux/src/codec/annexb.rs (1)
22-31:⚠️ Potential issue | 🟠 Major | ⚡ Quick winUse
checked_addbefore bounds checks and slicing.
pos + length_size/pos + lencan overflow before validation, allowing malformed input to reach panicking slices.Suggested fix
- anyhow::ensure!(payload.len() >= pos + length_size, "truncated NAL length prefix"); + let end_prefix = pos + .checked_add(length_size) + .context("overflow while reading NAL length prefix")?; + anyhow::ensure!(payload.len() >= end_prefix, "truncated NAL length prefix"); let mut len = 0usize; - for byte in &payload[pos..pos + length_size] { + for byte in &payload[pos..end_prefix] { len = (len << 8) | (*byte as usize); } - pos += length_size; - anyhow::ensure!(payload.len() >= pos + len, "truncated NAL payload"); + pos = end_prefix; + let end_nal = pos + .checked_add(len) + .context("overflow while reading NAL payload")?; + anyhow::ensure!(payload.len() >= end_nal, "truncated NAL payload"); out.extend_from_slice(&START_CODE); - out.extend_from_slice(&payload[pos..pos + len]); - pos += len; + out.extend_from_slice(&payload[pos..end_nal]); + pos = end_nal;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-mux/src/codec/annexb.rs` around lines 22 - 31, The bounds checks and slices in the Annex B NAL parsing loop can overflow because expressions like pos + length_size and pos + len are computed before ensuring they don't wrap; change those to use checked_add (e.g., let end = pos.checked_add(length_size).ok_or_else(|| anyhow::anyhow!("truncated NAL length prefix"))? ) and similarly compute let payload_end = pos.checked_add(len).ok_or_else(|| anyhow::anyhow!("truncated NAL payload"))? ), perform the ensure/ok_or on those checked results, then use payload[pos..end] and payload[pos..payload_end] and update pos = end / pos = payload_end; apply this to the code that builds len and extends out (references: pos, length_size, len, payload, out, START_CODE) so no pre-check overflow can occur.rs/moq-mux/src/codec/h264/export.rs (2)
148-153:⚠️ Potential issue | 🟠 Major | ⚡ Quick winRequire SPS and PPS before enabling
avc1conversion.Conversion currently proceeds even when avcC yields no SPS/PPS, which can produce undecodable keyframes.
Proposed fix
let params = super::parse_avcc_param_sets(avcc)?; +anyhow::ensure!(!params.sps.is_empty(), "avcC missing SPS"); +anyhow::ensure!(!params.pps.is_empty(), "avcC missing PPS"); let prefix = annexb::build_prefix(params.sps.iter().chain(params.pps.iter()));🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-mux/src/codec/h264/export.rs` around lines 148 - 153, The current Avc1Convert creation enables avc1 conversion even when parse_avcc_param_sets(avcc) returns no SPS/PPS, leading to undecodable keyframes; update the logic around Avc1Convert construction (the parse_avcc_param_sets call and the Avc1Convert { length_size, keyframe_prefix } creation) to first check that params.sps and params.pps are non-empty (and possibly length_size is valid) and only return Some(Avc1Convert) when both SPS and PPS exist; otherwise return None (or skip conversion) so avc1 is not enabled without required parameter sets.
140-142:⚠️ Potential issue | 🟠 Major | ⚡ Quick winRefresh track on config changes, not just rendition name.
The early return can preserve stale conversion/source state when
descriptionor codec metadata changes under the samename.Proposed fix
- if self.track.as_ref().is_some_and(|t| t.name == *name) { + if self.track.as_ref().is_some_and(|t| { + t.name == *name + && t.convert.as_ref().map(|c| c.length_size) + == config.description.as_ref() + .filter(|d| !d.is_empty()) + .and_then(|avcc| super::parse_avcc_param_sets(avcc).ok().map(|p| p.length_size)) + }) { return Ok(()); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-mux/src/codec/h264/export.rs` around lines 140 - 142, The early return in the export logic only checks self.track.as_ref().is_some_and(|t| t.name == *name) which can leave stale state when other config fields (e.g., description or codec metadata) change under the same rendition name; instead, update the guard so you refresh the track when any relevant metadata differs: compare self.track's identifying fields (name plus description and codec/config fields used by conversion) against the incoming values and only return early when all of them match, otherwise update/replace self.track and rebuild any conversion/source state; locate the check referencing self.track and name and extend it to include the additional metadata comparisons (or remove the early-return and explicitly handle track refresh) to ensure state is refreshed on config changes.rs/moq-mux/src/codec/h265/export.rs (2)
139-144:⚠️ Potential issue | 🟠 Major | ⚡ Quick winValidate VPS/SPS/PPS presence before constructing
Hvc1Convert.
hvc1conversion should fail fast when any required parameter set is missing, otherwise keyframes may be emitted without a decodable prefix.Proposed fix
let params = super::parse_hvcc_param_sets(hvcc)?; +anyhow::ensure!(!params.vps.is_empty(), "hvcC missing VPS"); +anyhow::ensure!(!params.sps.is_empty(), "hvcC missing SPS"); +anyhow::ensure!(!params.pps.is_empty(), "hvcC missing PPS"); let prefix = annexb::build_prefix(params.vps.iter().chain(params.sps.iter()).chain(params.pps.iter()));🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-mux/src/codec/h265/export.rs` around lines 139 - 144, The code currently constructs Hvc1Convert using params from parse_hvcc_param_sets without ensuring params.vps, params.sps, and params.pps are non-empty; change the logic in the block building Hvc1Convert (the code that calls parse_hvcc_param_sets and annexb::build_prefix) to validate that params.vps, params.sps, and params.pps each contain at least one entry and return an Err (propagate the failure) if any are missing, instead of proceeding to call annexb::build_prefix and creating Hvc1Convert with an incomplete prefix; keep length_size from params.length_size and only build keyframe_prefix when all three sets are present.
131-133:⚠️ Potential issue | 🟠 Major | ⚡ Quick winDo not short-circuit on name-only equality for track refresh.
If rendition metadata changes while
nameremains stable, the exporter may keep stale conversion/source state.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-mux/src/codec/h265/export.rs` around lines 131 - 133, The early return using self.track.as_ref().is_some_and(|t| t.name == *name) skips updating the exporter when only the name matches but rendition/metadata may have changed; remove this name-only short-circuit in the export path and instead compare the full relevant track metadata (e.g., rendition, codec parameters, source identifiers) or always refresh the internal conversion/source state when a new track info arrives; update the matching logic around self.track and the name variable in the exporter to validate all fields needed to preserve conversion state consistency rather than relying solely on name equality.rs/moq-mux/src/codec/h265/mod.rs (1)
41-53:⚠️ Potential issue | 🟠 Major | ⚡ Quick winUse checked offset arithmetic in
hvcCparser loops.
pos + 3,pos + 2, andpos + lencan overflow before comparison and open panic paths with malformed input.Proposed fix
- anyhow::ensure!(hvcc.len() >= pos + 3, "hvcC truncated in array header"); + let hdr_end = pos.checked_add(3).context("hvcC truncated in array header")?; + anyhow::ensure!(hdr_end <= hvcc.len(), "hvcC truncated in array header"); @@ - anyhow::ensure!(hvcc.len() >= pos + 2, "hvcC truncated in NAL length"); + let len_end = pos.checked_add(2).context("hvcC truncated in NAL length")?; + anyhow::ensure!(len_end <= hvcc.len(), "hvcC truncated in NAL length"); @@ - anyhow::ensure!(hvcc.len() >= pos + len, "hvcC truncated in NAL payload"); - let bytes = Bytes::copy_from_slice(&hvcc[pos..pos + len]); - pos += len; + let payload_end = pos.checked_add(len).context("hvcC truncated in NAL payload")?; + anyhow::ensure!(payload_end <= hvcc.len(), "hvcC truncated in NAL payload"); + let bytes = Bytes::copy_from_slice(&hvcc[pos..payload_end]); + pos = payload_end;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-mux/src/codec/h265/mod.rs` around lines 41 - 53, The hvcC parser currently computes offsets like pos + 3, pos + 2 and pos + len which can overflow and panic; update the parsing logic (the block handling hvcc, pos, nal_type, num_nalus and the NAL loop) to use checked arithmetic (e.g., pos.checked_add(3), pos.checked_add(1), pos.checked_add(2), pos.checked_add(len)) and bail/anyhow::ensure with an appropriate "hvcC truncated" error if any checked_add returns None or if the resulting end offset exceeds hvcc.len(); ensure you replace each direct addition (pos + X) with checks before indexing and update pos only after successful checked bounds verification in the functions/blocks that reference hvcc, pos, nal_type, num_nalus and len.rs/moq-mux/src/codec/h264/mod.rs (1)
170-175:⚠️ Potential issue | 🟠 Major | ⚡ Quick winUse checked index math in
avcCparameter parsing.
pos + 2/pos + lencan overflow before the bounds check and lead to panic paths on malformed input.Proposed fix
for _ in 0..num_sps { - anyhow::ensure!(avcc.len() >= pos + 2, "avcC truncated in SPS length"); - let len = u16::from_be_bytes([avcc[pos], avcc[pos + 1]]) as usize; - pos += 2; - anyhow::ensure!(avcc.len() >= pos + len, "avcC truncated in SPS payload"); - sps.push(Bytes::copy_from_slice(&avcc[pos..pos + len])); - pos += len; + let len_end = pos.checked_add(2).context("avcC truncated in SPS length")?; + anyhow::ensure!(len_end <= avcc.len(), "avcC truncated in SPS length"); + let len = u16::from_be_bytes([avcc[pos], avcc[pos + 1]]) as usize; + pos = len_end; + let payload_end = pos.checked_add(len).context("avcC truncated in SPS payload")?; + anyhow::ensure!(payload_end <= avcc.len(), "avcC truncated in SPS payload"); + sps.push(Bytes::copy_from_slice(&avcc[pos..payload_end])); + pos = payload_end; } @@ for _ in 0..num_pps { - anyhow::ensure!(avcc.len() >= pos + 2, "avcC truncated in PPS length"); - let len = u16::from_be_bytes([avcc[pos], avcc[pos + 1]]) as usize; - pos += 2; - anyhow::ensure!(avcc.len() >= pos + len, "avcC truncated in PPS payload"); - pps.push(Bytes::copy_from_slice(&avcc[pos..pos + len])); - pos += len; + let len_end = pos.checked_add(2).context("avcC truncated in PPS length")?; + anyhow::ensure!(len_end <= avcc.len(), "avcC truncated in PPS length"); + let len = u16::from_be_bytes([avcc[pos], avcc[pos + 1]]) as usize; + pos = len_end; + let payload_end = pos.checked_add(len).context("avcC truncated in PPS payload")?; + anyhow::ensure!(payload_end <= avcc.len(), "avcC truncated in PPS payload"); + pps.push(Bytes::copy_from_slice(&avcc[pos..payload_end])); + pos = payload_end; }Also applies to: 184-189
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-mux/src/codec/h264/mod.rs` around lines 170 - 175, The parsing uses unchecked index arithmetic (avcc, pos) so pos + 2 / pos + len can overflow before the bounds checks; change to checked/borrowed slicing to avoid panics: use avcc.get(pos..pos.checked_add(2).ok_or(...)?) (or compute end = pos.checked_add(2).ok_or(...) and then avcc.get(pos..end).ok_or(...)) to read the two-byte length, convert that safe slice into the u16 length, then compute payload_end = pos.checked_add(len).ok_or(...) and use avcc.get(pos..payload_end).ok_or(...) to push Bytes::copy_from_slice of that safe subslice and finally set pos = payload_end; apply the same pattern for the block at lines 184-189. Refer to variables/functions: avcc, pos, sps, Bytes::copy_from_slice, and the surrounding ensure/error handling to propagate errors instead of panicking.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-mux/src/catalog/stream.rs`:
- Around line 23-31: The Stream trait currently exposes anyhow::Result; change
the public signatures of Stream::poll_next and Stream::next to return
crate::Result<Option<Catalog>> (i.e. use the library's Result alias) and update
any impls to convert/propagate errors into crate::Error (use the #[from]
conversions / From::from where needed) instead of returning anyhow::Error;
ensure imports reference crate::Result and crate::Error from error.rs and keep
the async next() body using conducer::wait unchanged except for the new return
type so all implementations compile against the typed Error/Result.
In `@rs/moq-mux/src/codec/annexb.rs`:
- Around line 9-10: The public APIs in this module (from_length_prefixed,
NalIterator::flush, after_start_code and the iterator Item type) currently
expose anyhow::Result and must be changed to use the crate-level Result/Error;
update the signatures to return crate::Result<Bytes> (and crate::Result<()>
where appropriate) and change the iterator alias type Item =
crate::Result<Bytes>. Add new error variants to crate::error::Error (e.g.
AnnexBParseError and LengthPrefixedError or similar names) using thiserror to
represent the specific parsing failures, then replace any
anyhow::ensure!/anyhow::Error usage inside from_length_prefixed,
NalIterator::flush, after_start_code and iterator yield points with
Err(Error::AnnexBParseError(...)) or Err(Error::LengthPrefixedError(...)) as
appropriate (or map existing anyhow errors into these variants), and update call
sites/tests to propagate the crate::Result type.
In `@rs/moq-mux/src/codec/h264/mod.rs`:
- Around line 162-163: Change the public API of parse_avcc_param_sets to return
a typed error instead of anyhow::Result: define a #[non_exhaustive] pub enum
(e.g. ParseAvccError) using thiserror::Error with variants for the cases the
function can produce (e.g. TooShort, InvalidFormat, and an External(#[from]
OtherError) or From<anyhow::Error>-style wrapper for propagated errors), update
the signature to fn parse_avcc_param_sets(avcc: &[u8]) -> Result<AvccParamSets,
ParseAvccError>, replace anyhow::ensure! and other anyhow usages inside
parse_avcc_param_sets with explicit Err(ParseAvccError::TooShort) or the
appropriate variant, and convert any internal error propagation to use the
#[from] variants so callers get a concrete ParseAvccError type instead of
anyhow::Error.
---
Duplicate comments:
In `@rs/moq-mux/src/codec/annexb.rs`:
- Around line 22-31: The bounds checks and slices in the Annex B NAL parsing
loop can overflow because expressions like pos + length_size and pos + len are
computed before ensuring they don't wrap; change those to use checked_add (e.g.,
let end = pos.checked_add(length_size).ok_or_else(|| anyhow::anyhow!("truncated
NAL length prefix"))? ) and similarly compute let payload_end =
pos.checked_add(len).ok_or_else(|| anyhow::anyhow!("truncated NAL payload"))? ),
perform the ensure/ok_or on those checked results, then use payload[pos..end]
and payload[pos..payload_end] and update pos = end / pos = payload_end; apply
this to the code that builds len and extends out (references: pos, length_size,
len, payload, out, START_CODE) so no pre-check overflow can occur.
In `@rs/moq-mux/src/codec/h264/export.rs`:
- Around line 148-153: The current Avc1Convert creation enables avc1 conversion
even when parse_avcc_param_sets(avcc) returns no SPS/PPS, leading to undecodable
keyframes; update the logic around Avc1Convert construction (the
parse_avcc_param_sets call and the Avc1Convert { length_size, keyframe_prefix }
creation) to first check that params.sps and params.pps are non-empty (and
possibly length_size is valid) and only return Some(Avc1Convert) when both SPS
and PPS exist; otherwise return None (or skip conversion) so avc1 is not enabled
without required parameter sets.
- Around line 140-142: The early return in the export logic only checks
self.track.as_ref().is_some_and(|t| t.name == *name) which can leave stale state
when other config fields (e.g., description or codec metadata) change under the
same rendition name; instead, update the guard so you refresh the track when any
relevant metadata differs: compare self.track's identifying fields (name plus
description and codec/config fields used by conversion) against the incoming
values and only return early when all of them match, otherwise update/replace
self.track and rebuild any conversion/source state; locate the check referencing
self.track and name and extend it to include the additional metadata comparisons
(or remove the early-return and explicitly handle track refresh) to ensure state
is refreshed on config changes.
In `@rs/moq-mux/src/codec/h264/mod.rs`:
- Around line 170-175: The parsing uses unchecked index arithmetic (avcc, pos)
so pos + 2 / pos + len can overflow before the bounds checks; change to
checked/borrowed slicing to avoid panics: use
avcc.get(pos..pos.checked_add(2).ok_or(...)?) (or compute end =
pos.checked_add(2).ok_or(...) and then avcc.get(pos..end).ok_or(...)) to read
the two-byte length, convert that safe slice into the u16 length, then compute
payload_end = pos.checked_add(len).ok_or(...) and use
avcc.get(pos..payload_end).ok_or(...) to push Bytes::copy_from_slice of that
safe subslice and finally set pos = payload_end; apply the same pattern for the
block at lines 184-189. Refer to variables/functions: avcc, pos, sps,
Bytes::copy_from_slice, and the surrounding ensure/error handling to propagate
errors instead of panicking.
In `@rs/moq-mux/src/codec/h265/export.rs`:
- Around line 139-144: The code currently constructs Hvc1Convert using params
from parse_hvcc_param_sets without ensuring params.vps, params.sps, and
params.pps are non-empty; change the logic in the block building Hvc1Convert
(the code that calls parse_hvcc_param_sets and annexb::build_prefix) to validate
that params.vps, params.sps, and params.pps each contain at least one entry and
return an Err (propagate the failure) if any are missing, instead of proceeding
to call annexb::build_prefix and creating Hvc1Convert with an incomplete prefix;
keep length_size from params.length_size and only build keyframe_prefix when all
three sets are present.
- Around line 131-133: The early return using
self.track.as_ref().is_some_and(|t| t.name == *name) skips updating the exporter
when only the name matches but rendition/metadata may have changed; remove this
name-only short-circuit in the export path and instead compare the full relevant
track metadata (e.g., rendition, codec parameters, source identifiers) or always
refresh the internal conversion/source state when a new track info arrives;
update the matching logic around self.track and the name variable in the
exporter to validate all fields needed to preserve conversion state consistency
rather than relying solely on name equality.
In `@rs/moq-mux/src/codec/h265/mod.rs`:
- Around line 41-53: The hvcC parser currently computes offsets like pos + 3,
pos + 2 and pos + len which can overflow and panic; update the parsing logic
(the block handling hvcc, pos, nal_type, num_nalus and the NAL loop) to use
checked arithmetic (e.g., pos.checked_add(3), pos.checked_add(1),
pos.checked_add(2), pos.checked_add(len)) and bail/anyhow::ensure with an
appropriate "hvcC truncated" error if any checked_add returns None or if the
resulting end offset exceeds hvcc.len(); ensure you replace each direct addition
(pos + X) with checks before indexing and update pos only after successful
checked bounds verification in the functions/blocks that reference hvcc, pos,
nal_type, num_nalus and len.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b6551753-a076-4cb1-91b3-cb316ff3f2a2
📒 Files selected for processing (19)
rs/hang/src/catalog/audio/codec.rsrs/hang/src/catalog/video/codec.rsrs/moq-cli/src/subscribe.rsrs/moq-mux/src/catalog/consumer.rsrs/moq-mux/src/catalog/filter.rsrs/moq-mux/src/catalog/mod.rsrs/moq-mux/src/catalog/stream.rsrs/moq-mux/src/catalog/target.rsrs/moq-mux/src/codec/annexb.rsrs/moq-mux/src/codec/h264/export.rsrs/moq-mux/src/codec/h264/mod.rsrs/moq-mux/src/codec/h265/export.rsrs/moq-mux/src/codec/h265/mod.rsrs/moq-mux/src/container/fmp4/export.rsrs/moq-mux/src/container/fmp4/export_test.rsrs/moq-mux/src/container/mkv/export.rsrs/moq-mux/src/container/mkv/export_test.rsrs/moq-mux/src/container/mod.rsrs/moq-mux/src/container/source.rs
| pub fn from_length_prefixed(payload: &[u8], length_size: usize, prefix: Option<&[u8]>) -> anyhow::Result<Bytes> { | ||
| anyhow::ensure!( |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate and inspect the target file and nearby code
FILE="rs/moq-mux/src/codec/annexb.rs"
echo "== File exists =="
ls -l "$FILE"
echo "== Show lines 1-120 =="
nl -ba "$FILE" | sed -n '1,160p'
echo "== Show Cargo manifest for moq-mux =="
ls -l rs/moq-mux
nl -ba rs/moq-mux/Cargo.toml | sed -n '1,220p'
echo "== Find other public funcs returning anyhow::Result in rs/moq-mux =="
rg -n --hidden --glob '!**/target/**' "pub\s+fn\s+.*anyhow::Result" rs/moq-mux/src || true
echo "== Find occurrences of annexb::from_length_prefixed usage in repo =="
rg -n --hidden --glob '!**/target/**' "from_length_prefixed\(" rs || trueRepository: moq-dev/moq
Length of output: 239
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="rs/moq-mux/src/codec/annexb.rs"
echo "== File =="
ls -l "$FILE"
echo "== annexb.rs (relevant section) =="
nl -ba "$FILE" | sed -n '1,220p'
echo "== moq-mux/Cargo.toml =="
ls -l rs/moq-mux
nl -ba rs/moq-mux/Cargo.toml | sed -n '1,260p'
echo "== Public fns returning anyhow::Result in moq-mux =="
rg -n --hidden --glob '!**/target/**' "pub\s+fn\s+.*anyhow::Result" rs/moq-mux/src || true
echo "== Other public APIs returning anyhow::Result in rs/moq-mux =="
rg -n --hidden --glob '!**/target/**' "pub\s+.*->\s*anyhow::Result" rs/moq-mux/src || trueRepository: moq-dev/moq
Length of output: 243
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="rs/moq-mux/src/codec/annexb.rs"
echo "== annexb.rs (first 220 lines) =="
sed -n '1,220p' "$FILE"
echo "== moq-mux/Cargo.toml =="
sed -n '1,260p' rs/moq-mux/Cargo.toml
echo "== Public funcs returning anyhow::Result in moq-mux/src =="
rg -n --hidden --glob '!**/target/**' "pub\s+fn\s+.*anyhow::Result" rs/moq-mux/src || true
echo "== Public return types mentioning anyhow::Result (broader) =="
rg -n --hidden --glob '!**/target/**' "->\s*anyhow::Result" rs/moq-mux/src || true
echo "== Usage of from_length_prefixed =="
rg -n --hidden --glob '!**/target/**' "from_length_prefixed\(" rs || trueRepository: moq-dev/moq
Length of output: 12882
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) Exact line numbers around annexb.rs pub functions
FILE="rs/moq-mux/src/codec/annexb.rs"
echo "== pub anyhow::Result in annexb.rs with line numbers =="
awk 'NR>=1 && NR<=140 {print NR":"$0}' "$FILE" | rg -n "pub .*anyhow::Result" || true
echo "== annexb.rs lines 1-140 (numbered) =="
awk 'NR>=1 && NR<=140 {print NR":"$0}' "$FILE"
# 2) How annexb module is wired into the public API
echo "== Find module wiring for annexb in rs/moq-mux/src/codec =="
rg -n "mod annexb|pub mod annexb|annexb::" rs/moq-mux/src/codec || true
echo "== Find where codec module re-exports annexb from crate root =="
rg -n "pub mod codec|mod codec|pub use.*annexb|pub use.*codec" rs/moq-mux/src || true
# 3) Confirm the public nature: check rs/moq-mux/src/lib.rs
echo "== moq-mux/src/lib.rs =="
ls -l rs/moq-mux/src
sed -n '1,220p' rs/moq-mux/src/lib.rs
# 4) usages of from_length_prefixed within the crate
echo "== usages of from_length_prefixed =="
rg -n "annexb::from_length_prefixed\(" rs/moq-mux/src || trueRepository: moq-dev/moq
Length of output: 8102
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== moq-mux/src/error.rs =="
sed -n '1,240p' rs/moq-mux/src/error.rs
echo "== codec/mod.rs =="
sed -n '1,120p' rs/moq-mux/src/codec/mod.rs
echo "== codec/annexb.rs uses anyhow import line(s) =="
rg -n "use anyhow|\banhow::Result\b|anyhow::ensure!|anyhow::bail!" rs/moq-mux/src/codec/annexb.rsRepository: moq-dev/moq
Length of output: 1894
Avoid leaking anyhow::Result from moq-mux public APIs.
rs/moq-mux/src/codec/annexb.rs exposes anyhow::Result in the library surface (from_length_prefixed and also NalIterator::flush, after_start_code, and the iterator type Item = anyhow::Result<Bytes>), even though moq-mux already has crate::error::Error + crate::Result<T> (thiserror, #[non_exhaustive]). Switch these public signatures to return crate::Result and add thiserror variants for the Annex-B/length-prefixed parsing failures.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-mux/src/codec/annexb.rs` around lines 9 - 10, The public APIs in this
module (from_length_prefixed, NalIterator::flush, after_start_code and the
iterator Item type) currently expose anyhow::Result and must be changed to use
the crate-level Result/Error; update the signatures to return
crate::Result<Bytes> (and crate::Result<()> where appropriate) and change the
iterator alias type Item = crate::Result<Bytes>. Add new error variants to
crate::error::Error (e.g. AnnexBParseError and LengthPrefixedError or similar
names) using thiserror to represent the specific parsing failures, then replace
any anyhow::ensure!/anyhow::Error usage inside from_length_prefixed,
NalIterator::flush, after_start_code and iterator yield points with
Err(Error::AnnexBParseError(...)) or Err(Error::LengthPrefixedError(...)) as
appropriate (or map existing anyhow errors into these variants), and update call
sites/tests to propagate the crate::Result type.
| pub fn parse_avcc_param_sets(avcc: &[u8]) -> anyhow::Result<AvccParamSets> { | ||
| anyhow::ensure!(avcc.len() >= 7, "avcC too short"); |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify library-facing anyhow usage in this crate
fd -p Cargo.toml rs/moq-mux | xargs -I{} sh -c 'echo "== {} =="; sed -n "1,120p" {}'
rg -n --type rust 'pub fn .*-> anyhow::Result|pub async fn .*-> anyhow::Result' rs/moq-mux/srcRepository: moq-dev/moq
Length of output: 6156
Align public library errors with project Rust policy (thiserror over anyhow).
rs/moq-mux/src/codec/h264/mod.rs’s public parse_avcc_param_sets(avcc: &[u8]) -> anyhow::Result<AvccParamSets> exposes anyhow::Result from a library crate; per policy, switch to a typed thiserror error enum (using #[from] and #[non_exhaustive] for public enums) and convert internal errors into that type.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-mux/src/codec/h264/mod.rs` around lines 162 - 163, Change the public
API of parse_avcc_param_sets to return a typed error instead of anyhow::Result:
define a #[non_exhaustive] pub enum (e.g. ParseAvccError) using thiserror::Error
with variants for the cases the function can produce (e.g. TooShort,
InvalidFormat, and an External(#[from] OtherError) or From<anyhow::Error>-style
wrapper for propagated errors), update the signature to fn
parse_avcc_param_sets(avcc: &[u8]) -> Result<AvccParamSets, ParseAvccError>,
replace anyhow::ensure! and other anyhow usages inside parse_avcc_param_sets
with explicit Err(ParseAvccError::TooShort) or the appropriate variant, and
convert any internal error propagation to use the #[from] variants so callers
get a concrete ParseAvccError type instead of anyhow::Error.
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rs/moq-mux/src/codec/h265/mod.rs (1)
30-30:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftSwitch public
parse_hvcc_param_setsfromanyhow::Resultto typedthiserrorerrors.
parse_hvcc_param_setsis a public library API (rs/moq-mux/src/codec/h265/mod.rs) but currently returnsanyhow::Result, whilemoq-muxalready provides a typedthiserrorsurface viacrate::Error(#[non_exhaustive]) andpub type Result<T>. Add/extend a typed hvcc parse error (and keep it#[non_exhaustive]if public) and update callers likers/moq-mux/src/codec/h265/export.rsto use the typedcrate::Result<HvccParamSets>.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-mux/src/codec/h265/mod.rs` at line 30, Change the public function parse_hvcc_param_sets signature from returning anyhow::Result to the crate's typed Result by returning crate::Result<HvccParamSets>; create a new thiserror-compatible error variant (e.g., HvccParseError) added to the public #[non_exhaustive] crate::Error enum to represent parse failures and preserve any original error details as source/cause, update parse_hvcc_param_sets to map or wrap internal errors into that new variant, and update all callers (notably functions in rs/moq-mux/src/codec/h265/export.rs) to expect crate::Result<HvccParamSets> instead of anyhow::Result so error types align with the library surface.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@rs/moq-mux/src/codec/h265/mod.rs`:
- Line 30: Change the public function parse_hvcc_param_sets signature from
returning anyhow::Result to the crate's typed Result by returning
crate::Result<HvccParamSets>; create a new thiserror-compatible error variant
(e.g., HvccParseError) added to the public #[non_exhaustive] crate::Error enum
to represent parse failures and preserve any original error details as
source/cause, update parse_hvcc_param_sets to map or wrap internal errors into
that new variant, and update all callers (notably functions in
rs/moq-mux/src/codec/h265/export.rs) to expect crate::Result<HvccParamSets>
instead of anyhow::Result so error types align with the library surface.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 20cb48fc-2acd-424f-b34b-71103214addd
📒 Files selected for processing (6)
rs/moq-cli/src/subscribe.rsrs/moq-mux/src/codec/annexb.rsrs/moq-mux/src/codec/h264/export.rsrs/moq-mux/src/codec/h264/mod.rsrs/moq-mux/src/codec/h265/export.rsrs/moq-mux/src/codec/h265/mod.rs
Three changes from Luke's review: 1. CLI flags: rename `--max-video-*` / `--max-audio-bitrate` to put the `_max` suffix at the end (`--video-width-max`, `--video-bitrate-max`, etc.) for consistency. 2. `catalog::Stream` trait now returns `crate::Result<Option<Catalog>>` instead of `anyhow::Result`. `msf::Consumer` follows suit; its rich parsing chain stays in `anyhow::Error` internally and is wrapped in a new `crate::Error::Msf(anyhow::Error)` variant at the boundary so the layered context is preserved. 3. `catalog::Filter` and `catalog::Target` swap the dirty-flag re-emit trick for a `conducer::Producer<State>` + paired `Consumer` pair. `set_video` / `set_audio` write through the producer and the Mut::drop wakes any paired consumer waiters, so a retarget mid-poll now wakes the polling task (the dirty flag silently waited for the next upstream snapshot). The state carries a monotonic `epoch` counter so `poll_next` can tell whether the criteria changed since the last emit without diffing the structs. Inner and consumer waiters are both registered on Pending, so either a new upstream snapshot or a setter triggers a re-poll. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace `Error::Msf(anyhow::Error)` with a dedicated thiserror enum in `catalog::msf::error`. Each failure path the consumer can hit now lands in a typed variant: - transport (`Moq`, `Utf8`, `Json`, `Base64`, `Mp4`) - catalog conversion (`InvalidCodec`, `Schema`, `UnsupportedAudioPackaging`) - codec config blobs (`AudioConfig` carries `kind` tag for which blob) `Schema` is the bucket for invariants the consumer enforces on top of MSF (CMAF without init_data, audio without samplerate / channelConfig, missing codec). `AudioConfig.detail` still stringifies the inner `anyhow::Error` from `codec::aac::Config::parse` / `codec::opus::Config::parse` since those existing parsers return `anyhow`; switching them is a separate refactor. The variant boundary keeps the typed surface stable. `crate::Error::Msf` now wraps `catalog::msf::Error` via `#[from]`, and `catalog::Consumer` propagates the conversion through its `Stream` impl. Added `serde_json` direct dep (was transitive through `moq_msf`) for the `Json` variant. All msf consumer tests updated to match against the typed variants. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
Adds a composable layer for narrowing a broadcast's catalog before handing it to an exporter, plus single-rendition H.264 / H.265 Annex-B exporters that emit raw elementary streams for piping into ffmpeg or similar tools.
What changed
catalog::Stream+ selection wrappers (new)catalog::Stream— trait yieldinghang::Catalogsnapshots, boundedSend + 'static. Defaultnext,filter,targetmethods.catalog::Consumer— public unified hang/MSF consumer (promoted from the internalCatalogSource).catalog::Filter<S>— hard match on renditionnameandcodecfamily. Drops renditions that don't match.catalog::Target<S>— soft match on maxwidth/height/pixels/bitrate. Reduces each axis to at most one rendition; ranking algorithm is a Rust port of js/watch's#select.Both wrappers cache the last input snapshot and re-emit on
set_video/set_audio, which is the seam future bandwidth-driven ABR will drive.Existing exporters refactored
fmp4::Exportandmkv::Exportare now generic overS: catalog::Stream. The legacywith_catalog_format(broadcast, fmt)constructor is gone — callers build their own stream and pass it toExport::new(broadcast, stream). The internalCatalogSourceis deleted (superseded by the publiccatalog::Consumer).Annex-B exporters (new)
codec::h264::Exportandcodec::h265::Exportsubscribe to a single rendition via the newExportSource::for_video_raw(skips the avc1/hvc1 shape transform) and emit Annex-B bytes:annexb::from_length_prefixed; VPS/SPS/PPS extracted from the avcC/hvcC are injected ahead of every keyframe.Timestamps are dropped (Annex-B has no container framing).
hangcodec kindsVideoCodec::kind() -> VideoCodecKindandAudioCodec::kind() -> AudioCodecKindfor tag-only matching without pattern-matching the full codec enums.CLI
moq subscribegains:--format h264/--format h265(Annex-B output).--video-name,--video-codec,--max-video-{width,height,pixels,bitrate}.--audio-name,--audio-codec,--max-audio-bitrate.Flags wire through a
FilterthenTargetchain before reaching the exporter.Test plan
just checkpasses (cargo + clippy + rustfmt + rustdoc + remark).cargo test -p moq-mux— all 150 tests pass, including refactored fmp4/mkv round-trip tests and new unit tests forFilterandTargetselection / re-emit-on-retarget behavior.--format fmp4 --max-video-width 640and confirmffprobesees only the targeted rendition.moq subscribe --format h264 ... | ffplay -against an avc3 source.moq subscribe --format h264 ... | ffplay -against an avc1 (CMAF) source — verifies SPS/PPS keyframe injection.Notes for reviewers
with_catalog_formatis a breaking change forfmp4::Export/mkv::Exportcallers. The replacement isExport::new(broadcast, catalog_stream)where the stream comes fromcatalog::Consumer::new(&broadcast, format)?(optionally wrapped inFilter/Target). The CLI'sSubscribe::stream()is a representative call site.annexb::from_length_prefixedandannexb::build_prefixhelpers live incodec::annexbsince they're NALU-format-agnostic (used by bothcodec::h264::Exportandcodec::h265::Export).set_*re-emit semantics are the only foothold this PR puts in place.🤖 Generated with Claude Code