diff --git a/Cargo.lock b/Cargo.lock index df3e9c4be..99a85bf3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3741,7 +3741,6 @@ dependencies = [ name = "moq-mux" version = "0.5.1" dependencies = [ - "anyhow", "base64 0.22.1", "bytes", "conducer", diff --git a/demo/pub/justfile b/demo/pub/justfile index ccf74cac0..b6781ffa3 100644 --- a/demo/pub/justfile +++ b/demo/pub/justfile @@ -17,6 +17,7 @@ deploy: # Use ffmpeg to fragment before uploading: # ffmpeg -i input.mp4 -c copy -f mp4 \ # -movflags cmaf+separate_moof+delay_moov+skip_trailer+frag_every_frame \ + # output.mp4 upload file: bun install diff --git a/go/justfile b/go/justfile index 9647d2829..94ec1a2c4 100644 --- a/go/justfile +++ b/go/justfile @@ -10,11 +10,13 @@ default: # Build moq-ffi for the host, run uniffi-bindgen-go, stage everything # into a tmp dir, and run `go build`/`go vet`/`go test` from there. + # Skips cleanly if cargo, go, or uniffi-bindgen-go is missing. check: bash scripts/check.sh # Stage the in-tree go/ source + per-target moq-ffi libs + generated + # bindings into a single Go module ready for publish. package *args: bash scripts/package.sh {{ args }} @@ -22,6 +24,7 @@ package *args: # Full Go CI: `check` builds moq-ffi, regenerates bindings, runs go # vet/build/test. Takes a newline-separated list of changed files; # skips if FILES is non-empty and none match the Go scope. Run + # `just go ci` (no FILES) to force-run. ci FILES="": #!/usr/bin/env bash diff --git a/infra/apt/justfile b/infra/apt/justfile index bfec5d405..f8654f08f 100644 --- a/infra/apt/justfile +++ b/infra/apt/justfile @@ -6,6 +6,7 @@ deploy: bun wrangler deploy # Regenerate apt repo metadata and upload to the apt-moq-dev R2 bucket. + # Reads .deb files from $ARTIFACTS_DIR (defaults to ./artifacts). publish artifacts="artifacts": ARTIFACTS_DIR="{{ artifacts }}" ./publish.sh diff --git a/infra/rpm/justfile b/infra/rpm/justfile index 2c6b1be9b..f1811346b 100644 --- a/infra/rpm/justfile +++ b/infra/rpm/justfile @@ -6,6 +6,7 @@ deploy: bun wrangler deploy # Regenerate rpm repo metadata and upload to the rpm-moq-dev R2 bucket. + # Reads .rpm files from $ARTIFACTS_DIR (defaults to ./artifacts). publish artifacts="artifacts": ARTIFACTS_DIR="{{ artifacts }}" ./publish.sh diff --git a/js/justfile b/js/justfile index 9d2a4256b..4d5592332 100644 --- a/js/justfile +++ b/js/justfile @@ -43,6 +43,7 @@ build: # Full JS CI: lint + tests + build. Takes a newline-separated list of # changed files; skips if FILES is non-empty and none match the JS + # scope. Run `just js ci` (no FILES) to force-run everything. ci FILES="": #!/usr/bin/env bash diff --git a/justfile b/justfile index 09147f61e..daf67b106 100644 --- a/justfile +++ b/justfile @@ -1,8 +1,7 @@ #!/usr/bin/env just --justfile - # Using Just: https://github.com/casey/just?tab=readme-ov-file#installation - # Per-language modules. Anything that's specific to one language lives in + # its own justfile; the recipes below orchestrate across them. mod js mod rs @@ -10,14 +9,11 @@ mod py mod kt mod swift mod go - # Demos and infra. mod demo mod infra - # GitHub Actions workflow linting. mod gh '.github' - # Shortcuts to avoid `demo::` prefix. mod boy 'demo/boy' mod pub 'demo/pub' @@ -34,6 +30,7 @@ dev: just demo # Install repo-wide tooling. Per-language deps install on first invocation + # of `just check`. install: bun install @@ -42,6 +39,7 @@ install: # Fast inner-loop checks. Runs JS, Rust, and Markdown lints. # Shell + workflow + TOML + Nix + justfile lints skip silently if their # binaries aren't on $PATH; `nix develop` provides them, and `just ci` + # requires them. check *args: just js check @@ -55,6 +53,7 @@ check *args: # Run every per-language `ci` with the diff vs BASE; each greps for its # own scope and skips when nothing relevant changed. Pass BASE="" to + # default to $GITHUB_BASE_REF (CI) or origin/main (local). ci BASE="": #!/usr/bin/env bash @@ -101,6 +100,7 @@ ci BASE="": just gh ci # Auto-fix linting/formatting issues across all languages. + # shfmt / taplo / nixfmt / just --fmt skipped silently if missing locally. fix: just js fix diff --git a/kt/justfile b/kt/justfile index 9250aca97..536fc536a 100644 --- a/kt/justfile +++ b/kt/justfile @@ -10,12 +10,14 @@ default: just check # Build moq-ffi for the host, regenerate uniffi bindings, run :moq:jvmTest. + # Skips cleanly if cargo, java, or gradle is missing. check: bash scripts/check.sh # Assemble the KMP module from per-target moq-ffi binaries + bindings. # Used by .github/workflows/release-kt.yml; see kt/README.md for the + # expected --lib-dir layout. package *args: bash scripts/package.sh {{ args }} @@ -23,6 +25,7 @@ package *args: # Full Kotlin CI: `check` already builds moq-ffi and runs gradle tests. # Takes a newline-separated list of changed files; skips if FILES is # non-empty and none match the Kotlin scope. Run `just kt ci` (no + # FILES) to force-run. ci FILES="": #!/usr/bin/env bash diff --git a/py/justfile b/py/justfile index c572b4efb..6f3a8c0fb 100644 --- a/py/justfile +++ b/py/justfile @@ -11,6 +11,7 @@ default: # Lint + format + maturin source build + pyright. `--no-install-workspace` # installs the root dev group (ruff, maturin, pyright, pytest) without # trying to pip-build moq-rs; `maturin develop` then installs moq-rs + # as an editable wheel with the rs/moq-ffi cdylib + uniffi bindings. check: uv sync --no-install-workspace @@ -30,6 +31,7 @@ test: uv run --no-sync pytest moq-rs/tests/ # Local dev build: produces an editable install of moq-rs (with the + # moq-ffi cdylib + uniffi bindings) into the workspace venv. build: uv sync --no-install-workspace @@ -38,6 +40,7 @@ build: # Full Python CI: lint + tests + build. Takes a newline-separated list # of changed files; skips if FILES is non-empty and none match the # Python scope (which includes rs/moq-ffi because py bundles it via + # maturin). Run `just py ci` (no FILES) to force-run everything. ci FILES="": #!/usr/bin/env bash diff --git a/rs/justfile b/rs/justfile index cf4fb0a6f..050a93773 100644 --- a/rs/justfile +++ b/rs/justfile @@ -27,6 +27,7 @@ check *args: # force-run everything. `cargo publish --dry-run` lives in release-rs.yml. # # `cargo deny` runs here (not in `check`) so the inner-loop stays fast + # and devs aren't blocked by a fresh upstream advisory mid-edit. ci FILES="": #!/usr/bin/env bash @@ -80,6 +81,7 @@ update: # # Examples: # just rs package moq-relay deb + # just rs package moq-cli rpm package crate packager: #!/usr/bin/env bash diff --git a/rs/libmoq/src/error.rs b/rs/libmoq/src/error.rs index 69b0378c4..728d4351b 100644 --- a/rs/libmoq/src/error.rs +++ b/rs/libmoq/src/error.rs @@ -83,13 +83,9 @@ pub enum Error { #[error("unknown format: {0}")] UnknownFormat(String), - /// Media decoder initialization failed. - #[error("init failed: {0}")] - InitFailed(Arc), - - /// Media frame decode failed. - #[error("decode failed: {0}")] - DecodeFailed(Arc), + /// Buffer was not fully consumed. + #[error("buffer was not fully consumed")] + BufferNotConsumed, /// Timestamp value overflow. #[error("timestamp overflow")] @@ -117,7 +113,7 @@ pub enum Error { /// Error from the moq-mux consumer layer. #[error("mux error: {0}")] - Mux(Arc), + Mux(#[from] moq_mux::Error), /// Index out of bounds. #[error("no index")] @@ -144,16 +140,6 @@ impl From for Error { } } -impl From for Error { - fn from(err: moq_mux::Error) -> Self { - match err { - moq_mux::Error::Moq(e) => Error::Moq(e), - moq_mux::Error::Hang(e) => Error::Hang(e), - e => Error::Mux(Arc::new(e)), - } - } -} - impl ffi::ReturnCode for Error { fn code(&self) -> i32 { tracing::error!("{}", self); @@ -167,8 +153,6 @@ impl ffi::ReturnCode for Error { Error::InvalidId => -7, Error::NotFound => -8, Error::UnknownFormat(_) => -9, - Error::InitFailed(_) => -10, - Error::DecodeFailed(_) => -11, Error::TimestampOverflow(_) => -13, Error::Level(_) => -14, Error::InvalidCode => -15, @@ -187,6 +171,7 @@ impl ffi::ReturnCode for Error { Error::FrameNotFound => -28, Error::Mux(_) => -29, Error::Audio(_) => -30, + Error::BufferNotConsumed => -31, } } } diff --git a/rs/libmoq/src/publish.rs b/rs/libmoq/src/publish.rs index a1ff8193a..224fe9a0a 100644 --- a/rs/libmoq/src/publish.rs +++ b/rs/libmoq/src/publish.rs @@ -1,4 +1,4 @@ -use std::{str::FromStr, sync::Arc}; +use std::str::FromStr; use bytes::Buf; use moq_mux::import; @@ -50,8 +50,7 @@ impl Publish { let (broadcast, catalog) = self.broadcasts.get(broadcast).ok_or(Error::BroadcastNotFound)?; let format = import::FramedFormat::from_str(format).map_err(|_| Error::UnknownFormat(format.to_string()))?; - let decoder = import::Framed::new(broadcast.clone(), catalog.clone(), format, &mut init) - .map_err(|err| Error::InitFailed(Arc::new(err)))?; + let decoder = import::Framed::new(broadcast.clone(), catalog.clone(), format, &mut init)?; let id = self.media.insert(decoder)?; Ok(id) @@ -65,14 +64,10 @@ impl Publish { ) -> Result<(), Error> { let media = self.media.get_mut(media).ok_or(Error::MediaNotFound)?; - media - .decode_frame(&mut data, Some(timestamp)) - .map_err(|err| Error::DecodeFailed(Arc::new(err)))?; + media.decode_frame(&mut data, Some(timestamp))?; if data.has_remaining() { - return Err(Error::DecodeFailed(Arc::new(anyhow::anyhow!( - "buffer was not fully consumed" - )))); + return Err(Error::BufferNotConsumed); } Ok(()) @@ -80,7 +75,7 @@ impl Publish { pub fn media_close(&mut self, media: Id) -> Result<(), Error> { let mut decoder = self.media.remove(media).ok_or(Error::MediaNotFound)?; - decoder.finish().map_err(|err| Error::DecodeFailed(Arc::new(err)))?; + decoder.finish()?; Ok(()) } } diff --git a/rs/moq-cli/src/publish.rs b/rs/moq-cli/src/publish.rs index 813ca6f83..baf6b6ded 100644 --- a/rs/moq-cli/src/publish.rs +++ b/rs/moq-cli/src/publish.rs @@ -24,8 +24,8 @@ impl PublishDecoder { /// Decode a chunk of bytes from stdin (Avc3 or Fmp4 only). fn decode_buf(&mut self, buffer: &mut bytes::BytesMut) -> anyhow::Result<()> { match self { - Self::Avc3(d) => d.decode_stream(buffer, None), - Self::Fmp4(d) => d.decode(buffer), + Self::Avc3(d) => Ok(d.decode_stream(buffer, None)?), + Self::Fmp4(d) => Ok(d.decode(buffer)?), Self::Hls(_) => unreachable!(), } } @@ -67,7 +67,7 @@ impl Publish { pub async fn run(mut self) -> anyhow::Result<()> { if let PublishDecoder::Hls(decoder) = &mut self.decoder { decoder.init().await?; - decoder.run().await + Ok(decoder.run().await?) } else { let mut stdin = tokio::io::stdin(); let mut buffer = bytes::BytesMut::new(); diff --git a/rs/moq-loc/src/lib.rs b/rs/moq-loc/src/lib.rs index 5ac039b9b..220d8531d 100644 --- a/rs/moq-loc/src/lib.rs +++ b/rs/moq-loc/src/lib.rs @@ -48,7 +48,7 @@ pub struct Frame { } /// Errors from LOC frame encode/decode. -#[derive(Debug, thiserror::Error)] +#[derive(Debug, Clone, thiserror::Error)] #[non_exhaustive] pub enum Error { /// The frame's property block did not contain a 0x06 (Timestamp) entry. diff --git a/rs/moq-mux/Cargo.toml b/rs/moq-mux/Cargo.toml index b0e588cb6..2de859118 100644 --- a/rs/moq-mux/Cargo.toml +++ b/rs/moq-mux/Cargo.toml @@ -17,7 +17,6 @@ doctest = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -anyhow = "1" base64 = "0.22" bytes = "1" conducer = { workspace = true } diff --git a/rs/moq-mux/src/catalog/msf/consumer.rs b/rs/moq-mux/src/catalog/msf/consumer.rs index bab7ba2b2..f3c039200 100644 --- a/rs/moq-mux/src/catalog/msf/consumer.rs +++ b/rs/moq-mux/src/catalog/msf/consumer.rs @@ -1,10 +1,12 @@ use std::str::FromStr; use std::task::Poll; -use anyhow::Context; use base64::Engine; use hang::catalog::{AudioCodec, AudioConfig, Container, VideoCodec, VideoConfig}; +use crate::Result; +use crate::catalog::msf::Error; + /// A consumer for the MSF catalog track. /// /// Mirrors [`crate::catalog::hang::Consumer`] but for the MSF (MOQT Streaming Format) catalog @@ -25,7 +27,7 @@ impl Consumer { } /// Poll for the next catalog update, returned as a [`hang::Catalog`]. - pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { // Drain pending groups, keeping only the newest. Remember whether the track is done // so we can distinguish "more groups may arrive" from "no more groups, ever". let track_finished = loop { @@ -40,8 +42,8 @@ impl Consumer { match group.poll_read_frame(waiter)? { Poll::Ready(Some(frame)) => { self.group = None; - let json = std::str::from_utf8(&frame).context("MSF catalog frame is not valid UTF-8")?; - let msf = moq_msf::Catalog::from_str(json).context("failed to parse MSF catalog frame")?; + let json = std::str::from_utf8(&frame).map_err(|_| Error::InvalidUtf8)?; + let msf = moq_msf::Catalog::from_str(json).map_err(|_| Error::ParseFrame)?; let catalog = from_msf(&msf)?; return Poll::Ready(Ok(Some(catalog))); } @@ -61,7 +63,7 @@ impl Consumer { /// /// Waits for the next MSF catalog publication and returns it converted to a /// [`hang::Catalog`]. Returns `None` when the track has ended with no further updates. - pub async fn next(&mut self) -> anyhow::Result> { + pub async fn next(&mut self) -> Result> { conducer::wait(|waiter| self.poll_next(waiter)).await } } @@ -86,7 +88,7 @@ impl From for Consumer { /// /// Fields with no representation in `hang::Catalog` (`is_live`, `render_group`, `alt_group`, /// `max_grp_sap_starting_type`, `max_obj_sap_starting_type`) are dropped. -pub(crate) fn from_msf(msf: &moq_msf::Catalog) -> anyhow::Result { +pub(crate) fn from_msf(msf: &moq_msf::Catalog) -> Result { let mut catalog = hang::Catalog::default(); for track in &msf.tracks { @@ -139,13 +141,12 @@ pub(crate) fn from_msf(msf: &moq_msf::Catalog) -> anyhow::Result /// Returns `Err` when a CMAF track is missing or has malformed `init_data`. This is an /// intentional hard error: a CMAF rendition is unusable without its `ftyp+moov` init /// segment, and silently skipping it would mask a publisher bug. -fn container_from_msf(track: &moq_msf::Track) -> anyhow::Result> { +fn container_from_msf(track: &moq_msf::Track) -> Result> { match &track.packaging { // Both LOC and Legacy represent raw payloads without ISO-BMFF boxing. moq_msf::Packaging::Loc | moq_msf::Packaging::Legacy => Ok(Some(Container::Legacy)), moq_msf::Packaging::Cmaf => { - let init = decode_init_data(track)? - .with_context(|| format!("MSF CMAF track {:?} missing init_data", track.name))?; + let init = decode_init_data(track)?.ok_or_else(|| Error::MissingCmafInit(track.name.clone()))?; #[allow(deprecated)] Ok(Some(Container::Cmaf { init, @@ -165,7 +166,7 @@ fn container_from_msf(track: &moq_msf::Track) -> anyhow::Result anyhow::Result> { +fn decode_init_data(track: &moq_msf::Track) -> Result> { track .init_data .as_ref() @@ -173,7 +174,7 @@ fn decode_init_data(track: &moq_msf::Track) -> anyhow::Result anyhow::Result anyhow::Result> { +fn legacy_description(track: &moq_msf::Track) -> Result> { match track.packaging { moq_msf::Packaging::Loc | moq_msf::Packaging::Legacy => decode_init_data(track), _ => Ok(None), } } -fn video_config_from_msf(track: &moq_msf::Track) -> anyhow::Result> { +fn video_config_from_msf(track: &moq_msf::Track) -> Result> { // Unsupported packaging (e.g. MediaTimeline) bubbles up as Ok(None) so the caller can // skip the track with a warning rather than fail the whole catalog. let Some(container) = container_from_msf(track)? else { @@ -199,11 +200,13 @@ fn video_config_from_msf(track: &moq_msf::Track) -> anyhow::Result anyhow::Result anyhow::Result> { +fn audio_config_from_msf(track: &moq_msf::Track) -> Result> { let Some(container) = container_from_msf(track)? else { return Ok(None); }; @@ -224,9 +227,11 @@ fn audio_config_from_msf(track: &moq_msf::Track) -> anyhow::Result anyhow::Result { - let init = decode_init_data(track)?.with_context(|| { - format!( - "MSF audio track {:?} omits samplerate/channelConfig and has no init_data to derive from", - track.name - ) - })?; +fn derive_audio_params(track: &moq_msf::Track, codec: &AudioCodec) -> Result { + let init = decode_init_data(track)?.ok_or_else(|| Error::MissingAudioParams(track.name.clone()))?; match track.packaging { moq_msf::Packaging::Loc | moq_msf::Packaging::Legacy => derive_from_codec_config(track, codec, init), moq_msf::Packaging::Cmaf => derive_from_cmaf_moov(track, init), - _ => anyhow::bail!( - "MSF audio track {:?} packaging {:?} is unsupported for parameter derivation", - track.name, - track.packaging - ), + _ => Err(Error::UnsupportedDerivationPackaging { + name: track.name.clone(), + packaging: format!("{:?}", track.packaging), + } + .into()), } } -fn derive_from_codec_config( - track: &moq_msf::Track, - codec: &AudioCodec, - init: bytes::Bytes, -) -> anyhow::Result { +fn derive_from_codec_config(track: &moq_msf::Track, codec: &AudioCodec, init: bytes::Bytes) -> Result { use bytes::Buf; let mut buf = init; match codec { AudioCodec::AAC(_) => { - let cfg = crate::codec::aac::Config::parse(&mut buf) - .with_context(|| format!("MSF audio track {:?} has malformed AudioSpecificConfig", track.name))?; - anyhow::ensure!( - !buf.has_remaining(), - "MSF audio track {:?} AudioSpecificConfig has trailing bytes", - track.name, - ); + let cfg = + crate::codec::aac::Config::parse(&mut buf).map_err(|_| Error::MalformedAac(track.name.clone()))?; + if buf.has_remaining() { + return Err(Error::AacTrailingBytes(track.name.clone()).into()); + } Ok(DerivedAudio { sample_rate: cfg.sample_rate, channel_count: cfg.channel_count, }) } AudioCodec::Opus => { - let cfg = crate::codec::opus::Config::parse(&mut buf) - .with_context(|| format!("MSF audio track {:?} has malformed OpusHead", track.name))?; - anyhow::ensure!( - !buf.has_remaining(), - "MSF audio track {:?} OpusHead has trailing bytes", - track.name, - ); + let cfg = + crate::codec::opus::Config::parse(&mut buf).map_err(|_| Error::MalformedOpus(track.name.clone()))?; + if buf.has_remaining() { + return Err(Error::OpusTrailingBytes(track.name.clone()).into()); + } Ok(DerivedAudio { sample_rate: cfg.sample_rate, channel_count: cfg.channel_count, }) } - _ => anyhow::bail!( - "MSF audio track {:?} omits samplerate/channelConfig; codec {:?} has no init_data parser", - track.name, - codec, - ), + _ => Err(Error::UnsupportedDerivationCodec(track.name.clone()).into()), } } -fn derive_from_cmaf_moov(track: &moq_msf::Track, init: bytes::Bytes) -> anyhow::Result { +fn derive_from_cmaf_moov(track: &moq_msf::Track, init: bytes::Bytes) -> Result { use mp4_atom::{Any, DecodeMaybe}; let mut cursor = std::io::Cursor::new(init.as_ref()); let mut moov: Option = None; - while let Some(atom) = mp4_atom::Any::decode_maybe(&mut cursor) - .with_context(|| format!("MSF audio track {:?} init segment is malformed", track.name))? + while let Some(atom) = + mp4_atom::Any::decode_maybe(&mut cursor).map_err(|_| Error::MalformedInitSegment(track.name.clone()))? { if let Any::Moov(m) = atom { moov = Some(m); break; } } - let moov = moov.with_context(|| format!("MSF audio track {:?} init segment missing moov", track.name))?; + let moov = moov.ok_or_else(|| Error::MissingInitMoov(track.name.clone()))?; // Walk every trak looking for an audio sample entry. A single-track audio init is // the only thing we expect here, but rather than enforce that we just take the first @@ -369,10 +357,7 @@ fn derive_from_cmaf_moov(track: &moq_msf::Track, init: bytes::Bytes) -> anyhow:: } } } - anyhow::bail!( - "MSF audio track {:?} CMAF init has no audio sample entry to derive samplerate/channelConfig from", - track.name, - ) + Err(Error::MissingAudioSampleEntry(track.name.clone()).into()) } #[cfg(test)] diff --git a/rs/moq-mux/src/catalog/msf/mod.rs b/rs/moq-mux/src/catalog/msf/mod.rs index 595e19cd3..ebe5ce0d2 100644 --- a/rs/moq-mux/src/catalog/msf/mod.rs +++ b/rs/moq-mux/src/catalog/msf/mod.rs @@ -9,3 +9,64 @@ mod consumer; pub use consumer::Consumer; + +/// MSF catalog decoding errors. +#[derive(Debug, Clone, thiserror::Error)] +#[non_exhaustive] +pub enum Error { + #[error("MSF catalog frame is not valid UTF-8")] + InvalidUtf8, + + #[error("failed to parse MSF catalog frame")] + ParseFrame, + + #[error("MSF CMAF track {0:?} missing init_data")] + MissingCmafInit(String), + + #[error("MSF track {0:?} has malformed init_data")] + MalformedInitData(String), + + #[error("MSF video track {0:?} missing codec")] + MissingVideoCodec(String), + + #[error("MSF audio track {0:?} missing codec")] + MissingAudioCodec(String), + + #[error("MSF video track {name:?} has invalid codec {codec:?}")] + InvalidVideoCodec { name: String, codec: String }, + + #[error("MSF audio track {name:?} has invalid codec {codec:?}")] + InvalidAudioCodec { name: String, codec: String }, + + #[error("MSF audio track {0:?} omits samplerate/channelConfig and has no init_data to derive from")] + MissingAudioParams(String), + + #[error("MSF audio track {name:?} packaging {packaging:?} is unsupported for parameter derivation")] + UnsupportedDerivationPackaging { name: String, packaging: String }, + + #[error("MSF audio track {0:?} has malformed AudioSpecificConfig")] + MalformedAac(String), + + #[error("MSF audio track {0:?} has malformed OpusHead")] + MalformedOpus(String), + + #[error("MSF audio track {0:?} AudioSpecificConfig has trailing bytes")] + AacTrailingBytes(String), + + #[error("MSF audio track {0:?} OpusHead has trailing bytes")] + OpusTrailingBytes(String), + + #[error("MSF audio track {0:?} omits samplerate/channelConfig; codec has no init_data parser")] + UnsupportedDerivationCodec(String), + + #[error("MSF audio track {0:?} init segment is malformed")] + MalformedInitSegment(String), + + #[error("MSF audio track {0:?} init segment missing moov")] + MissingInitMoov(String), + + #[error("MSF audio track {0:?} CMAF init has no audio sample entry to derive samplerate/channelConfig from")] + MissingAudioSampleEntry(String), +} + +pub type Result = std::result::Result; diff --git a/rs/moq-mux/src/codec/aac/import.rs b/rs/moq-mux/src/codec/aac/import.rs index 51e1a8fe1..99b069563 100644 --- a/rs/moq-mux/src/codec/aac/import.rs +++ b/rs/moq-mux/src/codec/aac/import.rs @@ -19,7 +19,7 @@ impl Import { mut broadcast: moq_net::BroadcastProducer, mut catalog: crate::catalog::hang::Producer, config: Config, - ) -> anyhow::Result { + ) -> crate::Result { let track = broadcast.unique_track(".aac")?; let mut audio_config = hang::catalog::AudioConfig::new( @@ -47,18 +47,18 @@ impl Import { } /// Finish the track, flushing the current group. - pub fn finish(&mut self) -> anyhow::Result<()> { + pub fn finish(&mut self) -> crate::Result<()> { self.track.finish()?; Ok(()) } /// Close the current group and open the next one at `sequence`. - pub fn seek(&mut self, sequence: u64) -> anyhow::Result<()> { + pub fn seek(&mut self, sequence: u64) -> crate::Result<()> { self.track.seek(sequence)?; Ok(()) } - pub fn decode(&mut self, buf: &mut T, pts: Option) -> anyhow::Result<()> { + pub fn decode(&mut self, buf: &mut T, pts: Option) -> crate::Result<()> { let pts = self.pts(pts)?; // Collect the input into a contiguous Bytes payload. @@ -84,7 +84,7 @@ impl Import { Ok(()) } - fn pts(&mut self, hint: Option) -> anyhow::Result { + fn pts(&mut self, hint: Option) -> crate::Result { if let Some(pts) = hint { return Ok(pts); } diff --git a/rs/moq-mux/src/codec/aac/mod.rs b/rs/moq-mux/src/codec/aac/mod.rs index be11cd25c..0360f6c82 100644 --- a/rs/moq-mux/src/codec/aac/mod.rs +++ b/rs/moq-mux/src/codec/aac/mod.rs @@ -7,9 +7,30 @@ mod import; pub use import::*; -use anyhow::Context; use bytes::{Buf, Bytes}; +/// AAC parsing errors. +#[derive(Debug, Clone, thiserror::Error)] +#[non_exhaustive] +pub enum Error { + #[error("AudioSpecificConfig must be at least 2 bytes")] + ConfigTooShort, + + #[error("extended audioObjectType requires 2 additional bytes")] + ExtendedConfigTooShort, + + #[error("AudioSpecificConfig incomplete")] + IncompleteConfig, + + #[error("explicit sample rate requires 3 additional bytes")] + ExplicitSampleRateTooShort, + + #[error("unsupported sample rate index: {0}")] + UnsupportedSampleRateIndex(u8), +} + +pub type Result = std::result::Result; + /// Typed AAC configuration mirroring the relevant fields of an /// AudioSpecificConfig. pub struct Config { @@ -24,8 +45,10 @@ impl Config { /// Handles basic formats (object_type < 31), extended formats /// (object_type == 31), and explicit sample rates (freq_index == 15). /// Any SBR/PS extension bytes after the core fields are consumed. - pub fn parse(buf: &mut T) -> anyhow::Result { - anyhow::ensure!(buf.remaining() >= 2, "AudioSpecificConfig must be at least 2 bytes"); + pub fn parse(buf: &mut T) -> Result { + if buf.remaining() < 2 { + return Err(Error::ConfigTooShort); + } // Read first byte let b0 = buf.get_u8(); @@ -33,10 +56,9 @@ impl Config { let freq_index; let (profile, sample_rate, channel_count) = if object_type == 31 { - anyhow::ensure!( - buf.remaining() >= 2, - "extended audioObjectType requires 2 additional bytes" - ); + if buf.remaining() < 2 { + return Err(Error::ExtendedConfigTooShort); + } // Extended format: next 6 bits are the extended object_type (32-63). // Bits 5-7 of b0 are the first 3 bits of extended object_type. let b_ext = buf.get_u8(); @@ -49,7 +71,9 @@ impl Config { let channel_config_high = b_ext & 0x01; // Read next byte for rest of channelConfiguration. - anyhow::ensure!(buf.remaining() >= 1, "AudioSpecificConfig incomplete"); + if buf.remaining() < 1 { + return Err(Error::IncompleteConfig); + } let b1 = buf.get_u8(); // Bits 5-7 of b1 are the remaining 3 bits of channelConfiguration. let channel_config = (channel_config_high << 3) | ((b1 >> 5) & 0x07); @@ -66,7 +90,9 @@ impl Config { // Standard format: bits 5-7 of b0 are first 3 bits of freq_index. let mut freq_index_local = (b0 & 0x07) << 1; - anyhow::ensure!(buf.remaining() >= 1, "AudioSpecificConfig incomplete"); + if buf.remaining() < 1 { + return Err(Error::IncompleteConfig); + } let b1 = buf.get_u8(); // Complete frequency index (bit 7 of b1 is bit 0 of freq_index). @@ -140,13 +166,15 @@ impl Config { } } -fn sample_rate_from_index(freq_index: u8, buf: &mut T) -> anyhow::Result { +fn sample_rate_from_index(freq_index: u8, buf: &mut T) -> Result { const SAMPLE_RATES: [u32; 13] = [ 96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350, ]; if freq_index == 15 { - anyhow::ensure!(buf.remaining() >= 3, "explicit sample rate requires 3 additional bytes"); + if buf.remaining() < 3 { + return Err(Error::ExplicitSampleRateTooShort); + } let rate_bytes = [buf.get_u8(), buf.get_u8(), buf.get_u8()]; return Ok(((rate_bytes[0] as u32) << 16) | ((rate_bytes[1] as u32) << 8) | (rate_bytes[2] as u32)); } @@ -154,7 +182,7 @@ fn sample_rate_from_index(freq_index: u8, buf: &mut T) -> anyhow::Result SAMPLE_RATES .get(freq_index as usize) .copied() - .context("unsupported sample rate index") + .ok_or(Error::UnsupportedSampleRateIndex(freq_index)) } /// Map an AAC `channel_config` (ISO 14496-3 Table 1.19) to its real channel count. diff --git a/rs/moq-mux/src/codec/annexb.rs b/rs/moq-mux/src/codec/annexb.rs index 7a5b1de5f..d05a72609 100644 --- a/rs/moq-mux/src/codec/annexb.rs +++ b/rs/moq-mux/src/codec/annexb.rs @@ -1,8 +1,20 @@ -use anyhow::{self}; use bytes::{Buf, Bytes}; pub const START_CODE: Bytes = Bytes::from_static(&[0, 0, 0, 1]); +/// Annex B parsing errors. +#[derive(Debug, Clone, thiserror::Error)] +#[non_exhaustive] +pub enum Error { + #[error("missing Annex B start code")] + MissingStartCode, + + #[error("invalid Annex B start code")] + InvalidStartCode, +} + +pub type Result = std::result::Result; + pub struct NalIterator<'a, T: Buf + AsRef<[u8]> + 'a> { buf: &'a mut T, start: Option, @@ -15,7 +27,7 @@ impl<'a, T: Buf + AsRef<[u8]> + 'a> NalIterator<'a, T> { /// Assume the buffer ends with a NAL unit and flush it. /// This is more efficient because we cache the last "start" code position. - pub fn flush(self) -> anyhow::Result> { + pub fn flush(self) -> Result> { let start = match self.start { Some(start) => start, None => { @@ -34,7 +46,7 @@ impl<'a, T: Buf + AsRef<[u8]> + 'a> NalIterator<'a, T> { } impl<'a, T: Buf + AsRef<[u8]> + 'a> Iterator for NalIterator<'a, T> { - type Item = anyhow::Result; + type Item = Result; fn next(&mut self) -> Option { let start = match self.start { @@ -55,21 +67,22 @@ impl<'a, T: Buf + AsRef<[u8]> + 'a> Iterator for NalIterator<'a, T> { } // Return the size of the start code at the start of the buffer. -pub fn after_start_code(b: &[u8]) -> anyhow::Result> { +pub fn after_start_code(b: &[u8]) -> Result> { if b.len() < 3 { return Ok(None); } // NOTE: We have to check every byte, so the `find_start_code` optimization doesn't matter. - anyhow::ensure!(b[0] == 0, "missing Annex B start code"); - anyhow::ensure!(b[1] == 0, "missing Annex B start code"); + if b[0] != 0 || b[1] != 0 { + return Err(Error::MissingStartCode); + } match b[2] { 0 if b.len() < 4 => Ok(None), - 0 if b[3] != 1 => anyhow::bail!("missing Annex B start code"), + 0 if b[3] != 1 => Err(Error::MissingStartCode), 0 => Ok(Some(4)), 1 => Ok(Some(3)), - _ => anyhow::bail!("invalid Annex B start code"), + _ => Err(Error::InvalidStartCode), } } diff --git a/rs/moq-mux/src/codec/av1/import.rs b/rs/moq-mux/src/codec/av1/import.rs index 44998e86a..aa7af2ef6 100644 --- a/rs/moq-mux/src/codec/av1/import.rs +++ b/rs/moq-mux/src/codec/av1/import.rs @@ -1,10 +1,12 @@ use crate::container::jitter::MinFrameDuration; -use anyhow::Context; use bytes::BytesMut; use bytes::{Buf, Bytes}; use scuffle_av1::seq::SequenceHeaderObu; +use super::Error; +use crate::Result; + /// A decoder for AV1 with inline sequence headers. pub struct Import { // The broadcast being produced. @@ -49,7 +51,7 @@ impl Import { } } - fn init(&mut self, seq_header: &SequenceHeaderObu) -> anyhow::Result<()> { + fn init(&mut self, seq_header: &SequenceHeaderObu) -> Result<()> { let mut config = hang::catalog::VideoConfig::new(hang::catalog::AV1 { profile: seq_header.seq_profile, level: seq_header @@ -110,7 +112,7 @@ impl Import { } /// Initialize with minimal config if sequence header parsing fails - fn init_minimal(&mut self) -> anyhow::Result<()> { + fn init_minimal(&mut self) -> Result<()> { let mut config = hang::catalog::VideoConfig::new(hang::catalog::AV1 { profile: 0, // Main profile level: 0, // Unknown @@ -145,7 +147,7 @@ impl Import { } /// Initialize the decoder with sequence header and other metadata OBUs. - pub fn initialize>(&mut self, buf: &mut T) -> anyhow::Result<()> { + pub fn initialize>(&mut self, buf: &mut T) -> Result<()> { let data = buf.as_ref(); // Handle av1C format (MP4/container initialization) @@ -169,7 +171,7 @@ impl Import { Ok(()) } - fn init_from_av1c(&mut self, data: &[u8]) -> anyhow::Result<()> { + fn init_from_av1c(&mut self, data: &[u8]) -> Result<()> { // Parse av1C box structure let seq_profile = (data[1] >> 5) & 0x07; let seq_level_idx = data[1] & 0x1F; @@ -221,16 +223,12 @@ impl Import { } /// Returns a reference to the underlying track producer. - pub fn track(&self) -> anyhow::Result<&moq_net::TrackProducer> { - Ok(self.track.as_ref().context("not initialized")?.track()) + pub fn track(&self) -> Result<&moq_net::TrackProducer> { + Ok(self.track.as_ref().ok_or(Error::NotInitialized)?.track()) } /// Decode as much data as possible from the given buffer. - pub fn decode_stream>( - &mut self, - buf: &mut T, - pts: Option, - ) -> anyhow::Result<()> { + pub fn decode_stream>(&mut self, buf: &mut T, pts: Option) -> Result<()> { let obus = ObuIterator::new(buf); for obu in obus { @@ -243,11 +241,7 @@ impl Import { } /// Decode all data in the buffer, assuming the buffer contains (the rest of) a frame. - pub fn decode_frame>( - &mut self, - buf: &mut T, - pts: Option, - ) -> anyhow::Result<()> { + pub fn decode_frame>(&mut self, buf: &mut T, pts: Option) -> Result<()> { let pts = self.pts(pts)?; let mut obus = ObuIterator::new(buf); @@ -264,8 +258,10 @@ impl Import { Ok(()) } - fn decode_obu(&mut self, obu_data: Bytes, pts: Option) -> anyhow::Result<()> { - anyhow::ensure!(!obu_data.is_empty(), "OBU is too short"); + fn decode_obu(&mut self, obu_data: Bytes, pts: Option) -> Result<()> { + if obu_data.is_empty() { + return Err(Error::ObuTooShort.into()); + } // Parse OBU header - this consumes header + extension + LEB128 size let mut reader = &obu_data[..]; @@ -347,16 +343,13 @@ impl Import { Ok(()) } - fn maybe_start_frame(&mut self, pts: Option) -> anyhow::Result<()> { + fn maybe_start_frame(&mut self, pts: Option) -> Result<()> { if !self.current.contains_frame { return Ok(()); } - let track = self - .track - .as_mut() - .context("expected sequence header before any frames")?; - let pts = pts.context("missing timestamp")?; + let track = self.track.as_mut().ok_or(Error::MissingSequenceHeader)?; + let pts = pts.ok_or(Error::MissingTimestamp)?; let payload = std::mem::take(&mut self.current.chunks).freeze(); @@ -381,15 +374,19 @@ impl Import { } /// Finish the track, flushing the current group. - pub fn finish(&mut self) -> anyhow::Result<()> { - let track = self.track.as_mut().context("not initialized")?; + pub fn finish(&mut self) -> Result<()> { + let track = self.track.as_mut().ok_or(Error::NotInitialized)?; track.finish()?; Ok(()) } /// Close the current group and open the next one at `sequence`. - pub fn seek(&mut self, sequence: u64) -> anyhow::Result<()> { - let track = self.track.as_mut().context("not initialized")?; + /// + /// Any in-flight access unit is dropped. Pre-seek OBUs would otherwise leak + /// into the post-seek group with the wrong timestamp. + pub fn seek(&mut self, sequence: u64) -> Result<()> { + self.current = Frame::default(); + let track = self.track.as_mut().ok_or(Error::NotInitialized)?; track.seek(sequence)?; Ok(()) } @@ -398,7 +395,7 @@ impl Import { self.track.is_some() } - fn pts(&mut self, hint: Option) -> anyhow::Result { + fn pts(&mut self, hint: Option) -> Result { if let Some(pts) = hint { return Ok(pts); } @@ -427,7 +424,7 @@ impl<'a, T: Buf + AsRef<[u8]> + 'a> ObuIterator<'a, T> { Self { buf } } - pub fn flush(self) -> anyhow::Result> { + pub fn flush(self) -> Result> { let remaining = self.buf.remaining(); if remaining == 0 { return Ok(None); @@ -439,7 +436,7 @@ impl<'a, T: Buf + AsRef<[u8]> + 'a> ObuIterator<'a, T> { } impl<'a, T: Buf + AsRef<[u8]> + 'a> Iterator for ObuIterator<'a, T> { - type Item = anyhow::Result; + type Item = Result; fn next(&mut self) -> Option { if self.buf.remaining() == 0 { @@ -490,7 +487,7 @@ impl<'a, T: Buf + AsRef<[u8]> + 'a> Iterator for ObuIterator<'a, T> { } if shift >= 56 { - return Some(Err(anyhow::anyhow!("OBU size too large"))); + return Some(Err(Error::ObuSizeTooLarge.into())); } } diff --git a/rs/moq-mux/src/codec/av1/mod.rs b/rs/moq-mux/src/codec/av1/mod.rs index a1f53f5bf..ca44ebeeb 100644 --- a/rs/moq-mux/src/codec/av1/mod.rs +++ b/rs/moq-mux/src/codec/av1/mod.rs @@ -10,6 +10,37 @@ pub use import::*; use hang::catalog::AV1; +/// AV1 parsing errors. +#[derive(Debug, Clone, thiserror::Error)] +#[non_exhaustive] +pub enum Error { + #[error("OBU is too short")] + ObuTooShort, + + #[error("OBU size too large")] + ObuSizeTooLarge, + + #[error("not initialized")] + NotInitialized, + + #[error("expected sequence header before any frames")] + MissingSequenceHeader, + + #[error("missing timestamp")] + MissingTimestamp, + + #[error("OBU header parse: {0}")] + ObuHeaderParse(std::sync::Arc), +} + +impl From for Error { + fn from(err: std::io::Error) -> Self { + Error::ObuHeaderParse(std::sync::Arc::new(err)) + } +} + +pub type Result = std::result::Result; + /// Map a parsed `mp4_atom::Av1c` (AV1CodecConfigurationRecord) to the /// hang catalog's AV1 codec struct. /// diff --git a/rs/moq-mux/src/codec/h264/import.rs b/rs/moq-mux/src/codec/h264/import.rs index bc1b8e54f..26085c04c 100644 --- a/rs/moq-mux/src/codec/h264/import.rs +++ b/rs/moq-mux/src/codec/h264/import.rs @@ -7,11 +7,11 @@ //! leading start code; callers that already know it can also force the //! mode via [`with_mode`](Import::with_mode). -use anyhow::Context; use bytes::{Buf, Bytes, BytesMut}; use tokio::io::{AsyncRead, AsyncReadExt}; -use super::Sps; +use super::{Error, Sps}; +use crate::Result; use crate::codec::annexb::{NalIterator, START_CODE}; use crate::container::jitter::MinFrameDuration; @@ -78,7 +78,7 @@ impl Import { /// inside [`initialize`](Self::initialize). Eagerly creates the broadcast /// track for avc3 sources so the caller can observe subscriber state /// (`used()` / `unused()`) before any frames arrive. - pub fn with_mode(mut self, mode: Mode) -> anyhow::Result { + pub fn with_mode(mut self, mode: Mode) -> Result { match mode { Mode::Avc1 => { self.state = State::Pending { @@ -117,7 +117,7 @@ impl Import { /// is parsed as Annex-B NALs to seed the cached SPS/PPS. /// /// The buffer is fully consumed. - pub fn initialize>(&mut self, buf: &mut T) -> anyhow::Result<()> { + pub fn initialize>(&mut self, buf: &mut T) -> Result<()> { let mode = match &self.state { State::Pending { mode_hint } => mode_hint.unwrap_or_else(|| detect_mode(buf.as_ref())), State::Avc1 { .. } => Mode::Avc1, @@ -131,7 +131,7 @@ impl Import { } /// Initialize the avc1 path from an `AVCDecoderConfigurationRecord` buffer. - fn initialize_avc1>(&mut self, buf: &mut T) -> anyhow::Result<()> { + fn initialize_avc1>(&mut self, buf: &mut T) -> Result<()> { let avcc_bytes = buf.as_ref(); let avcc = super::Avcc::parse(avcc_bytes)?; self.state = State::Avc1 { @@ -157,7 +157,7 @@ impl Import { /// Initialize the avc3 path by parsing Annex-B NALs (SPS/PPS seed the /// catalog rendition; the track is created eagerly on first SPS). - fn initialize_avc3>(&mut self, buf: &mut T) -> anyhow::Result<()> { + fn initialize_avc3>(&mut self, buf: &mut T) -> Result<()> { // Eager-create the track + state on first switch into Avc3 mode so // callers can observe `used()` / `unused()` before any frames arrive. if !matches!(self.state, State::Avc3 { .. }) { @@ -192,7 +192,7 @@ impl Import { /// Decode from an asynchronous reader. avc3 only — for avc1, the caller /// already has framed buffers and uses [`decode_frame`](Self::decode_frame). - pub async fn decode_from(&mut self, reader: &mut T) -> anyhow::Result<()> { + pub async fn decode_from(&mut self, reader: &mut T) -> Result<()> { let mut buffer = BytesMut::new(); while reader.read_buf(&mut buffer).await? > 0 { self.decode_stream(&mut buffer, None)?; @@ -203,12 +203,10 @@ impl Import { /// Decode a buffer where frame boundaries are unknown (avc3 streaming /// input). The leading start code of the *next* frame is what signals the /// previous frame is done. - pub fn decode_stream>( - &mut self, - buf: &mut T, - pts: Option, - ) -> anyhow::Result<()> { - anyhow::ensure!(matches!(self.state, State::Avc3 { .. }), "decode_stream is avc3 only"); + pub fn decode_stream>(&mut self, buf: &mut T, pts: Option) -> Result<()> { + if !matches!(self.state, State::Avc3 { .. }) { + return Err(Error::StreamNotAvc3.into()); + } let pts = self.pts(pts)?; let nals = NalIterator::new(buf); for nal in nals { @@ -222,33 +220,22 @@ impl Import { /// - avc1: the buffer is written as one length-prefixed-NALU frame. /// - avc3: NALs are parsed; any trailing NAL without a start code is /// flushed as the last NAL of this frame. - pub fn decode_frame>( - &mut self, - buf: &mut T, - pts: Option, - ) -> anyhow::Result<()> { + pub fn decode_frame>(&mut self, buf: &mut T, pts: Option) -> Result<()> { match &self.state { State::Avc1 { .. } => self.decode_avc1(buf, pts), State::Avc3 { .. } => self.decode_avc3_frame(buf, pts), - State::Pending { .. } => anyhow::bail!("not initialized; call initialize() or with_mode() first"), + State::Pending { .. } => Err(Error::NotInitialized.into()), } } - fn decode_avc1>( - &mut self, - buf: &mut T, - pts: Option, - ) -> anyhow::Result<()> { + fn decode_avc1>(&mut self, buf: &mut T, pts: Option) -> Result<()> { let State::Avc1 { length_size } = self.state else { unreachable!("checked by decode_frame") }; let data = buf.as_ref(); let pts = self.pts(pts)?; let keyframe = avc1_is_keyframe(data, length_size); - let track = self - .track - .as_mut() - .context("not initialized; call initialize() first")?; + let track = self.track.as_mut().ok_or(Error::NotInitialized)?; track.write(crate::container::Frame { timestamp: pts, @@ -266,11 +253,7 @@ impl Import { Ok(()) } - fn decode_avc3_frame>( - &mut self, - buf: &mut T, - pts: Option, - ) -> anyhow::Result<()> { + fn decode_avc3_frame>(&mut self, buf: &mut T, pts: Option) -> Result<()> { let pts = self.pts(pts)?; let mut nals = NalIterator::new(buf); while let Some(nal) = nals.next().transpose()? { @@ -283,10 +266,12 @@ impl Import { Ok(()) } - fn decode_nal(&mut self, nal: Bytes, pts: Option) -> anyhow::Result<()> { - let header = nal.first().context("NAL unit is too short")?; + fn decode_nal(&mut self, nal: Bytes, pts: Option) -> Result<()> { + let header = nal.first().ok_or(Error::NalTooShort)?; let forbidden_zero_bit = (header >> 7) & 1; - anyhow::ensure!(forbidden_zero_bit == 0, "forbidden zero bit is not zero"); + if forbidden_zero_bit != 0 { + return Err(Error::ForbiddenZeroBit.into()); + } let nal_unit_type = header & 0b11111; let nal_type = Avc3NalType::try_from(nal_unit_type).ok(); @@ -348,7 +333,7 @@ impl Import { | Some(Avc3NalType::DataPartitionA) | Some(Avc3NalType::DataPartitionB) | Some(Avc3NalType::DataPartitionC) => { - if nal.get(1).context("NAL unit is too short")? & 0x80 != 0 { + if nal.get(1).ok_or(Error::NalTooShort)? & 0x80 != 0 { self.maybe_start_frame(pts)?; } let State::Avc3 { current, .. } = &mut self.state else { @@ -369,7 +354,7 @@ impl Import { Ok(()) } - fn init_from_sps(&mut self, sps: &Sps) -> anyhow::Result<()> { + fn init_from_sps(&mut self, sps: &Sps) -> Result<()> { let mut config = hang::catalog::VideoConfig::new(hang::catalog::H264 { profile: sps.profile, constraints: sps.constraints, @@ -388,21 +373,21 @@ impl Import { // The avc3 track was created eagerly in initialize_avc3; just publish // (or republish) the catalog rendition with the latest config. - let track_name = self.track.as_ref().context("avc3 track not created")?.name.clone(); + let track_name = self.track.as_ref().ok_or(Error::Avc3TrackNotCreated)?.name.clone(); let mut catalog = self.catalog.lock(); catalog.video.renditions.insert(track_name, config.clone()); self.config = Some(config); Ok(()) } - fn maybe_start_frame(&mut self, pts: Option) -> anyhow::Result<()> { + fn maybe_start_frame(&mut self, pts: Option) -> Result<()> { let State::Avc3 { current, .. } = &mut self.state else { return Ok(()); }; if !current.contains_slice { return Ok(()); } - let pts = pts.context("missing timestamp")?; + let pts = pts.ok_or(Error::MissingTimestamp)?; let payload = std::mem::take(&mut current.chunks).freeze(); let keyframe = current.contains_idr; current.contains_idr = false; @@ -410,7 +395,7 @@ impl Import { current.contains_sps = false; current.contains_pps = false; - let track = self.track.as_mut().context("avc3 track not created")?; + let track = self.track.as_mut().ok_or(Error::Avc3TrackNotCreated)?; track.write(crate::container::Frame { timestamp: pts, payload, @@ -427,7 +412,7 @@ impl Import { /// Replace the current track + catalog rendition with `config`. Used by /// the avc1 path on every (re)initialization. - fn swap_config(&mut self, config: hang::catalog::VideoConfig, suffix: &str) -> anyhow::Result<()> { + fn swap_config(&mut self, config: hang::catalog::VideoConfig, suffix: &str) -> Result<()> { if let Some(old) = &self.config && old == &config { @@ -452,20 +437,26 @@ impl Import { } /// Finish the track, flushing any buffered data. - pub fn finish(&mut self) -> anyhow::Result<()> { - let track = self.track.as_mut().context("not initialized")?; + pub fn finish(&mut self) -> Result<()> { + let track = self.track.as_mut().ok_or(Error::NotInitialized)?; track.finish()?; Ok(()) } /// Close the current group and open the next one at `sequence`. - pub fn seek(&mut self, sequence: u64) -> anyhow::Result<()> { - let track = self.track.as_mut().context("not initialized")?; + /// + /// Any in-flight avc3 access unit is dropped. Pre-seek NALs would otherwise + /// leak into the post-seek group with the wrong timestamp. + pub fn seek(&mut self, sequence: u64) -> Result<()> { + if let State::Avc3 { current, .. } = &mut self.state { + *current = Avc3Frame::default(); + } + let track = self.track.as_mut().ok_or(Error::NotInitialized)?; track.seek(sequence)?; Ok(()) } - fn pts(&mut self, hint: Option) -> anyhow::Result { + fn pts(&mut self, hint: Option) -> Result { if let Some(pts) = hint { return Ok(pts); } diff --git a/rs/moq-mux/src/codec/h264/mod.rs b/rs/moq-mux/src/codec/h264/mod.rs index 4f14a6e41..5e574ae2d 100644 --- a/rs/moq-mux/src/codec/h264/mod.rs +++ b/rs/moq-mux/src/codec/h264/mod.rs @@ -10,13 +10,58 @@ mod import; pub use import::*; -use anyhow::Context; use bytes::{Buf, BufMut, Bytes, BytesMut}; // H.264 NAL unit types (ISO/IEC 14496-10 §7.4.1). const NAL_TYPE_SPS: u8 = 7; const NAL_TYPE_PPS: u8 = 8; +/// H.264 parsing and transform errors. +#[derive(Debug, Clone, thiserror::Error)] +#[non_exhaustive] +pub enum Error { + #[error("SPS NAL too short")] + SpsTooShort, + + #[error("failed to parse SPS")] + SpsParse, + + #[error("AVCDecoderConfigurationRecord too short")] + AvccTooShort, + + #[error("SPS too large for avcC length field ({0} > {max})", max = u16::MAX)] + SpsTooLarge(usize), + + #[error("PPS too large for avcC length field ({0} > {max})", max = u16::MAX)] + PpsTooLarge(usize), + + #[error("NAL too large for 4-byte length prefix")] + NalTooLarge, + + #[error("NAL unit is too short")] + NalTooShort, + + #[error("forbidden zero bit is not zero")] + ForbiddenZeroBit, + + #[error("not initialized; call initialize() or with_mode() first")] + NotInitialized, + + #[error("avc3 track not created")] + Avc3TrackNotCreated, + + #[error("missing timestamp")] + MissingTimestamp, + + #[error("decode_stream is avc3 only")] + StreamNotAvc3, + + #[error("annexb: {0}")] + Annexb(#[from] crate::codec::annexb::Error), +} + +pub type Result = std::result::Result; + /// Parsed H.264 SPS (Sequence Parameter Set) NAL. /// /// Wraps [`h264_parser::Sps`] with the codec-config fields that the hang @@ -33,10 +78,12 @@ pub struct Sps { impl Sps { /// Parse an SPS NAL unit. - pub fn parse(nal: &[u8]) -> anyhow::Result { - anyhow::ensure!(nal.len() >= 4, "SPS NAL too short"); + pub fn parse(nal: &[u8]) -> Result { + if nal.len() < 4 { + return Err(Error::SpsTooShort); + } let rbsp = h264_parser::nal::ebsp_to_rbsp(&nal[1..]); - let sps = h264_parser::Sps::parse(&rbsp).context("failed to parse SPS")?; + let sps = h264_parser::Sps::parse(&rbsp).map_err(|_| Error::SpsParse)?; Ok(Self { profile: sps.profile_idc, constraints: pack_constraint_flags(&sps), @@ -66,8 +113,10 @@ pub struct Avcc { impl Avcc { /// Parse an AVCDecoderConfigurationRecord buffer. - pub fn parse(avcc: &[u8]) -> anyhow::Result { - anyhow::ensure!(avcc.len() >= 6, "AVCDecoderConfigurationRecord too short"); + pub fn parse(avcc: &[u8]) -> Result { + if avcc.len() < 6 { + return Err(Error::AvccTooShort); + } let profile = avcc[1]; let constraints = avcc[2]; @@ -111,20 +160,16 @@ fn pack_constraint_flags(sps: &h264_parser::Sps) -> u8 { /// Build an AVCDecoderConfigurationRecord (ISO/IEC 14496-15 §5.3.3.1.2) from a /// single SPS and PPS NAL. -pub(crate) fn build_avcc(sps_nal: &[u8], pps_nal: &[u8]) -> anyhow::Result { - anyhow::ensure!( - sps_nal.len() <= u16::MAX as usize, - "SPS too large for avcC length field ({} > {})", - sps_nal.len(), - u16::MAX - ); - anyhow::ensure!( - pps_nal.len() <= u16::MAX as usize, - "PPS too large for avcC length field ({} > {})", - pps_nal.len(), - u16::MAX - ); - anyhow::ensure!(sps_nal.len() >= 4, "SPS NAL too short"); +pub(crate) fn build_avcc(sps_nal: &[u8], pps_nal: &[u8]) -> Result { + if sps_nal.len() > u16::MAX as usize { + return Err(Error::SpsTooLarge(sps_nal.len())); + } + if pps_nal.len() > u16::MAX as usize { + return Err(Error::PpsTooLarge(pps_nal.len())); + } + if sps_nal.len() < 4 { + return Err(Error::SpsTooShort); + } let profile_idc = sps_nal[1]; let constraints = sps_nal[2]; @@ -187,7 +232,7 @@ impl Avc1 { /// - `Ok(None)` if the input contained only parameter sets and the /// transform is still waiting for slice NALs (avcC may have been built /// as a side effect). - pub fn transform(&mut self, payload: Bytes) -> anyhow::Result> { + pub fn transform(&mut self, payload: Bytes) -> Result> { // Parse Annex-B NALs, strip SPS/PPS into the cache, length-prefix // the rest. NalIterator advances the Bytes cursor; the trailing NAL // has to be pulled separately via flush(). @@ -201,7 +246,7 @@ impl Avc1 { loop { let nal = match nal_iter.next() { Some(Ok(n)) => n, - Some(Err(e)) => return Err(e), + Some(Err(e)) => return Err(e.into()), None => break, }; if self.process_nal(&nal, &mut out, &mut sps_pps_changed)? { @@ -230,7 +275,7 @@ impl Avc1 { /// Process one NAL: SPS/PPS go into the cache, everything else gets /// length-prefixed and appended to `out`. Returns true if the NAL was a /// slice (i.e. produced sample bytes). - fn process_nal(&mut self, nal: &Bytes, out: &mut BytesMut, sps_pps_changed: &mut bool) -> anyhow::Result { + fn process_nal(&mut self, nal: &Bytes, out: &mut BytesMut, sps_pps_changed: &mut bool) -> Result { if nal.is_empty() { return Ok(false); } @@ -251,7 +296,7 @@ impl Avc1 { Ok(false) } _ => { - let len = u32::try_from(nal.len()).context("NAL too large for 4-byte length prefix")?; + let len = u32::try_from(nal.len()).map_err(|_| Error::NalTooLarge)?; out.extend_from_slice(&len.to_be_bytes()); out.extend_from_slice(nal); Ok(true) @@ -259,7 +304,7 @@ impl Avc1 { } } - fn rebuild_avcc(&mut self) -> anyhow::Result<()> { + fn rebuild_avcc(&mut self) -> Result<()> { let (Some(sps), Some(pps)) = (&self.sps, &self.pps) else { return Ok(()); }; diff --git a/rs/moq-mux/src/codec/h265/import.rs b/rs/moq-mux/src/codec/h265/import.rs index a180aef44..d3b616781 100644 --- a/rs/moq-mux/src/codec/h265/import.rs +++ b/rs/moq-mux/src/codec/h265/import.rs @@ -1,10 +1,12 @@ use crate::codec::annexb::{NalIterator, START_CODE}; use crate::container::jitter::MinFrameDuration; -use anyhow::Context; use bytes::{Buf, Bytes, BytesMut}; use scuffle_h265::{NALUnitType, SpsNALUnit}; +use super::Error; +use crate::Result; + /// A decoder for H.265 with inline SPS/PPS. /// Only supports single layer streams (VPS is cached but not parsed). pub struct Import { @@ -52,7 +54,7 @@ impl Import { } } - fn init(&mut self, sps: &SpsNALUnit) -> anyhow::Result<()> { + fn init(&mut self, sps: &SpsNALUnit) -> Result<()> { let profile = &sps.rbsp.profile_tier_level.general_profile; let vui_data = sps.rbsp.vui_parameters.as_ref().map(VuiData::new).unwrap_or_default(); @@ -62,7 +64,7 @@ impl Import { profile_idc: profile.profile_idc, profile_compatibility_flags: profile.profile_compatibility_flag.bits().to_be_bytes(), tier_flag: profile.tier_flag, - level_idc: profile.level_idc.context("missing level_idc in SPS")?, + level_idc: profile.level_idc.ok_or(Error::MissingLevelIdc)?, constraint_flags: crate::codec::h265::pack_constraint_flags(profile), }); config.coded_width = Some(sps.rbsp.cropped_width() as u32); @@ -99,7 +101,7 @@ impl Import { } /// Initialize the decoder with SPS/PPS and other non-slice NALs. - pub fn initialize>(&mut self, buf: &mut T) -> anyhow::Result<()> { + pub fn initialize>(&mut self, buf: &mut T) -> Result<()> { let mut nals = NalIterator::new(buf); while let Some(nal) = nals.next().transpose()? { @@ -114,8 +116,8 @@ impl Import { } /// Returns a reference to the underlying track producer. - pub fn track(&self) -> anyhow::Result<&moq_net::TrackProducer> { - Ok(self.track.as_ref().context("not initialized")?.track()) + pub fn track(&self) -> Result<&moq_net::TrackProducer> { + Ok(self.track.as_ref().ok_or(Error::NotInitialized)?.track()) } /// Decode as much data as possible from the given buffer. @@ -124,11 +126,7 @@ impl Import { /// This means it works for streaming media (ex. stdin) but adds a frame of latency. /// /// TODO: This currently associates PTS with the *previous* frame, as part of `maybe_start_frame`. - pub fn decode_stream>( - &mut self, - buf: &mut T, - pts: Option, - ) -> anyhow::Result<()> { + pub fn decode_stream>(&mut self, buf: &mut T, pts: Option) -> Result<()> { let pts = self.pts(pts)?; // Iterate over the NAL units in the buffer based on start codes. @@ -148,11 +146,7 @@ impl Import { /// This can also be used when EOF is detected to flush the final frame. /// /// NOTE: The next decode will fail if it doesn't begin with a start code. - pub fn decode_frame>( - &mut self, - buf: &mut T, - pts: Option, - ) -> anyhow::Result<()> { + pub fn decode_frame>(&mut self, buf: &mut T, pts: Option) -> Result<()> { let pts = self.pts(pts)?; // Iterate over the NAL units in the buffer based on start codes. let mut nals = NalIterator::new(buf); @@ -175,13 +169,17 @@ impl Import { /// Decode a single NAL unit. Only reads the first header byte to extract nal_unit_type, /// Ignores nuh_layer_id and nuh_temporal_id_plus1. - fn decode_nal(&mut self, nal: Bytes, pts: Option) -> anyhow::Result<()> { - anyhow::ensure!(nal.len() >= 2, "NAL unit is too short"); + fn decode_nal(&mut self, nal: Bytes, pts: Option) -> Result<()> { + if nal.len() < 2 { + return Err(Error::NalTooShort.into()); + } // u16 header: [forbidden_zero_bit(1) | nal_unit_type(6) | nuh_layer_id(6) | nuh_temporal_id_plus1(3)] - let header = nal.first().context("NAL unit is too short")?; + let header = nal.first().ok_or(Error::NalTooShort)?; let forbidden_zero_bit = (header >> 7) & 1; - anyhow::ensure!(forbidden_zero_bit == 0, "forbidden zero bit is not zero"); + if forbidden_zero_bit != 0 { + return Err(Error::ForbiddenZeroBit.into()); + } // Bits 1-6: nal_unit_type let nal_unit_type = (header >> 1) & 0b111111; @@ -198,7 +196,7 @@ impl Import { self.maybe_start_frame(pts)?; // Try to reinitialize the track if the SPS has changed. - let sps = SpsNALUnit::parse(&mut &nal[..]).context("failed to parse SPS NAL unit")?; + let sps = SpsNALUnit::parse(&mut &nal[..]).map_err(|_| Error::SpsParse)?; self.init(&sps)?; // SPS changed mid-AU. Cached VPS/PPS are tied to the old SPS @@ -270,7 +268,7 @@ impl Import { | NALUnitType::RaslN | NALUnitType::RaslR => { // Check first_slice_segment_in_pic_flag (bit 7 of third byte, after 2-byte header) - if nal.get(2).context("NAL unit is too short")? & 0x80 != 0 { + if nal.get(2).ok_or(Error::NalTooShort)? & 0x80 != 0 { self.maybe_start_frame(pts)?; } self.current.contains_slice = true; @@ -286,14 +284,14 @@ impl Import { Ok(()) } - fn maybe_start_frame(&mut self, pts: Option) -> anyhow::Result<()> { + fn maybe_start_frame(&mut self, pts: Option) -> Result<()> { // If we haven't seen any slices, we shouldn't flush yet. if !self.current.contains_slice { return Ok(()); } - let track = self.track.as_mut().context("expected SPS before any frames")?; - let pts = pts.context("missing timestamp")?; + let track = self.track.as_mut().ok_or(Error::MissingSps)?; + let pts = pts.ok_or(Error::MissingTimestamp)?; let payload = std::mem::take(&mut self.current.chunks).freeze(); @@ -321,15 +319,19 @@ impl Import { } /// Finish the track, flushing the current group. - pub fn finish(&mut self) -> anyhow::Result<()> { - let track = self.track.as_mut().context("not initialized")?; + pub fn finish(&mut self) -> Result<()> { + let track = self.track.as_mut().ok_or(Error::NotInitialized)?; track.finish()?; Ok(()) } /// Close the current group and open the next one at `sequence`. - pub fn seek(&mut self, sequence: u64) -> anyhow::Result<()> { - let track = self.track.as_mut().context("not initialized")?; + /// + /// Any in-flight access unit is dropped. Pre-seek NALs would otherwise leak + /// into the post-seek group with the wrong timestamp. + pub fn seek(&mut self, sequence: u64) -> Result<()> { + self.current = Frame::default(); + let track = self.track.as_mut().ok_or(Error::NotInitialized)?; track.seek(sequence)?; Ok(()) } @@ -338,7 +340,7 @@ impl Import { self.track.is_some() } - fn pts(&mut self, hint: Option) -> anyhow::Result { + fn pts(&mut self, hint: Option) -> Result { if let Some(pts) = hint { return Ok(pts); } diff --git a/rs/moq-mux/src/codec/h265/mod.rs b/rs/moq-mux/src/codec/h265/mod.rs index d236ba874..8139731c7 100644 --- a/rs/moq-mux/src/codec/h265/mod.rs +++ b/rs/moq-mux/src/codec/h265/mod.rs @@ -9,10 +9,46 @@ mod import; pub use import::*; -use anyhow::Context; use bytes::{Buf, BufMut, Bytes, BytesMut}; use scuffle_h265::{NALUnitType, SpsNALUnit}; +/// H.265 parsing and transform errors. +#[derive(Debug, Clone, thiserror::Error)] +#[non_exhaustive] +pub enum Error { + #[error("NAL unit is too short")] + NalTooShort, + + #[error("{0} too large for hvcC length field ({1} > {max})", max = u16::MAX)] + NalTooLargeForHvcc(&'static str, usize), + + #[error("NAL too large for 4-byte length prefix")] + NalTooLarge, + + #[error("failed to parse SPS NAL unit")] + SpsParse, + + #[error("missing level_idc in SPS")] + MissingLevelIdc, + + #[error("forbidden zero bit is not zero")] + ForbiddenZeroBit, + + #[error("not initialized")] + NotInitialized, + + #[error("expected SPS before any frames")] + MissingSps, + + #[error("missing timestamp")] + MissingTimestamp, + + #[error("annexb: {0}")] + Annexb(#[from] crate::codec::annexb::Error), +} + +pub type Result = std::result::Result; + /// Annex-B → length-prefixed transmuxer; the H.265 analogue of /// [`crate::codec::h264::Avc1`]. pub struct Hvc1 { @@ -51,7 +87,7 @@ impl Hvc1 { /// - `Ok(None)` if the input contained only parameter sets and the /// transform is still waiting for slice NALs (hvcC may have been /// built as a side effect). - pub fn transform(&mut self, payload: Bytes) -> anyhow::Result> { + pub fn transform(&mut self, payload: Bytes) -> Result> { let mut buf = payload.clone(); let mut nal_iter = crate::codec::annexb::NalIterator::new(&mut buf); @@ -62,7 +98,7 @@ impl Hvc1 { loop { let nal = match nal_iter.next() { Some(Ok(n)) => n, - Some(Err(e)) => return Err(e), + Some(Err(e)) => return Err(e.into()), None => break, }; if self.process_nal(&nal, &mut out, &mut params_changed)? { @@ -88,7 +124,7 @@ impl Hvc1 { Ok(Some(out.freeze())) } - fn process_nal(&mut self, nal: &Bytes, out: &mut BytesMut, params_changed: &mut bool) -> anyhow::Result { + fn process_nal(&mut self, nal: &Bytes, out: &mut BytesMut, params_changed: &mut bool) -> Result { if nal.is_empty() { return Ok(false); } @@ -119,7 +155,7 @@ impl Hvc1 { Ok(false) } _ => { - let len = u32::try_from(nal.len()).context("NAL too large for 4-byte length prefix")?; + let len = u32::try_from(nal.len()).map_err(|_| Error::NalTooLarge)?; out.extend_from_slice(&len.to_be_bytes()); out.extend_from_slice(nal); Ok(true) @@ -127,7 +163,7 @@ impl Hvc1 { } } - fn rebuild_hvcc(&mut self) -> anyhow::Result<()> { + fn rebuild_hvcc(&mut self) -> Result<()> { let (Some(vps), Some(sps), Some(pps)) = (&self.vps, &self.sps, &self.pps) else { return Ok(()); }; @@ -138,20 +174,16 @@ impl Hvc1 { /// Build an HEVCDecoderConfigurationRecord (ISO/IEC 14496-15 §8.3.3). /// Single-layer streams only. -pub(crate) fn build_hvcc(vps_nal: &[u8], sps_nal: &[u8], pps_nal: &[u8]) -> anyhow::Result { +pub(crate) fn build_hvcc(vps_nal: &[u8], sps_nal: &[u8], pps_nal: &[u8]) -> Result { for (label, nal) in [("VPS", vps_nal), ("SPS", sps_nal), ("PPS", pps_nal)] { - anyhow::ensure!( - nal.len() <= u16::MAX as usize, - "{} too large for hvcC length field ({} > {})", - label, - nal.len(), - u16::MAX - ); + if nal.len() > u16::MAX as usize { + return Err(Error::NalTooLargeForHvcc(label, nal.len())); + } } - let sps = SpsNALUnit::parse(&mut &sps_nal[..]).context("failed to parse SPS NAL unit for hvcC")?; + let sps = SpsNALUnit::parse(&mut &sps_nal[..]).map_err(|_| Error::SpsParse)?; let profile = &sps.rbsp.profile_tier_level.general_profile; - let level_idc = profile.level_idc.context("missing level_idc in SPS")?; + let level_idc = profile.level_idc.ok_or(Error::MissingLevelIdc)?; let constraint_flags = pack_constraint_flags(profile); let compat = profile.profile_compatibility_flag.bits().to_be_bytes(); let num_temporal_layers = sps.rbsp.sps_max_sub_layers_minus1 + 1; diff --git a/rs/moq-mux/src/codec/opus/import.rs b/rs/moq-mux/src/codec/opus/import.rs index 6de8c5bb3..1c9312897 100644 --- a/rs/moq-mux/src/codec/opus/import.rs +++ b/rs/moq-mux/src/codec/opus/import.rs @@ -19,7 +19,7 @@ impl Import { mut broadcast: moq_net::BroadcastProducer, mut catalog: crate::catalog::hang::Producer, config: Config, - ) -> anyhow::Result { + ) -> crate::Result { let track = broadcast.unique_track(".opus")?; let mut audio_config = hang::catalog::AudioConfig::new( @@ -46,18 +46,18 @@ impl Import { } /// Finish the track, flushing the current group. - pub fn finish(&mut self) -> anyhow::Result<()> { + pub fn finish(&mut self) -> crate::Result<()> { self.track.finish()?; Ok(()) } /// Close the current group and open the next one at `sequence`. - pub fn seek(&mut self, sequence: u64) -> anyhow::Result<()> { + pub fn seek(&mut self, sequence: u64) -> crate::Result<()> { self.track.seek(sequence)?; Ok(()) } - pub fn decode(&mut self, buf: &mut T, pts: Option) -> anyhow::Result<()> { + pub fn decode(&mut self, buf: &mut T, pts: Option) -> crate::Result<()> { let pts = self.pts(pts)?; // Collect the input into a contiguous Bytes payload. @@ -83,7 +83,7 @@ impl Import { Ok(()) } - fn pts(&mut self, hint: Option) -> anyhow::Result { + fn pts(&mut self, hint: Option) -> crate::Result { if let Some(pts) = hint { return Ok(pts); } diff --git a/rs/moq-mux/src/codec/opus/mod.rs b/rs/moq-mux/src/codec/opus/mod.rs index 7416ca958..035a397a0 100644 --- a/rs/moq-mux/src/codec/opus/mod.rs +++ b/rs/moq-mux/src/codec/opus/mod.rs @@ -11,6 +11,19 @@ use bytes::{Buf, Bytes}; const OPUS_HEAD: u64 = u64::from_be_bytes(*b"OpusHead"); +/// Opus parsing errors. +#[derive(Debug, Clone, thiserror::Error)] +#[non_exhaustive] +pub enum Error { + #[error("OpusHead must be at least 19 bytes")] + HeadTooShort, + + #[error("invalid OpusHead signature")] + InvalidSignature, +} + +pub type Result = std::result::Result; + /// Typed Opus configuration mirroring the parsed fields of an OpusHead packet. pub struct Config { pub sample_rate: u32, @@ -23,10 +36,14 @@ impl Config { /// Verifies the magic signature; reads channel count and sample rate; /// ignores pre-skip, gain, and channel mapping. Any trailing bytes are /// consumed. - pub fn parse(buf: &mut T) -> anyhow::Result { - anyhow::ensure!(buf.remaining() >= 19, "OpusHead must be at least 19 bytes"); + pub fn parse(buf: &mut T) -> Result { + if buf.remaining() < 19 { + return Err(Error::HeadTooShort); + } let signature = buf.get_u64(); - anyhow::ensure!(signature == OPUS_HEAD, "invalid OpusHead signature"); + if signature != OPUS_HEAD { + return Err(Error::InvalidSignature); + } buf.advance(1); // Skip version let channel_count = buf.get_u8() as u32; diff --git a/rs/moq-mux/src/container/fmp4/export.rs b/rs/moq-mux/src/container/fmp4/export.rs index 376812fff..44e1bf79d 100644 --- a/rs/moq-mux/src/container/fmp4/export.rs +++ b/rs/moq-mux/src/container/fmp4/export.rs @@ -2,13 +2,14 @@ use std::collections::HashMap; use std::task::Poll; use std::time::Duration; -use anyhow::Context; use bytes::Bytes; use hang::catalog::{Catalog, Container, VideoConfig}; use mp4_atom::{DecodeMaybe, Encode}; +use crate::Result; use crate::catalog::CatalogFormat; use crate::container::Frame; +use crate::container::fmp4::Error; use crate::container::{CatalogSource, ExportSource}; @@ -70,7 +71,7 @@ impl Export { /// /// Use [`with_catalog_format`](Self::with_catalog_format) to subscribe to a /// non-default catalog track (e.g. MSF). - pub fn new(broadcast: moq_net::BroadcastConsumer) -> Result { + pub fn new(broadcast: moq_net::BroadcastConsumer) -> Result { Self::with_catalog_format(broadcast, CatalogFormat::default()) } @@ -80,10 +81,7 @@ impl Export { /// Both formats drive the same internal `hang::Catalog`-based pipeline (MSF /// snapshots are converted on receipt), so the only observable difference /// is which wire catalog track is consumed. - pub fn with_catalog_format( - broadcast: moq_net::BroadcastConsumer, - catalog_format: CatalogFormat, - ) -> Result { + pub fn with_catalog_format(broadcast: moq_net::BroadcastConsumer, catalog_format: CatalogFormat) -> Result { let catalog = CatalogSource::new(&broadcast, catalog_format)?; Ok(Self { @@ -128,12 +126,12 @@ impl Export { /// subsequent call returns one moof+mdat fragment. Fragments arrive in ascending /// timestamp order across tracks. Returns `None` when the catalog and every track /// have ended. - pub async fn next(&mut self) -> anyhow::Result> { + pub async fn next(&mut self) -> Result> { conducer::wait(|waiter| self.poll_next(waiter)).await } /// Poll-based variant of [`Self::next`]. - pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { // 1. Drain catalog updates and (un)subscribe tracks accordingly. while let Some(catalog) = self.catalog.as_mut() { match catalog.poll_next(waiter)? { @@ -255,7 +253,7 @@ impl Export { Poll::Pending } - fn update_catalog(&mut self, catalog: &Catalog) -> anyhow::Result<()> { + fn update_catalog(&mut self, catalog: &Catalog) -> Result<()> { let mut active: HashMap = HashMap::new(); for name in catalog.video.renditions.keys() { active.insert(name.clone(), ()); @@ -328,24 +326,24 @@ impl Export { /// Build the merged ftyp + multi-track moov init segment from the cached /// catalog snapshot. CMAF tracks pass their existing init segment through; /// Legacy tracks synthesize a `trak` from codec config + dimensions. - fn build_init(&self) -> anyhow::Result { - let catalog = self.catalog_snapshot.as_ref().context("no catalog snapshot")?; + fn build_init(&self) -> Result { + let catalog = self.catalog_snapshot.as_ref().ok_or(Error::NoCatalogSnapshot)?; let mut traks: Vec = Vec::new(); let mut trexs: Vec = Vec::new(); let mut ftyp_data: Option = None; for (name, config) in &catalog.video.renditions { - let track = self.tracks.get(name).context("video track not subscribed")?; + let track = self + .tracks + .get(name) + .ok_or_else(|| Error::MissingVideoTrack(name.clone()))?; match &config.container { Container::Cmaf { init, .. } => { extract_init(init, &mut ftyp_data, &mut traks, &mut trexs)?; } Container::Legacy | Container::Loc => { - let description = track - .source - .description() - .context("video track missing codec config for synthesized init")?; + let description = track.source.description().ok_or(Error::MissingVideoConfig)?; let trak = crate::container::fmp4::synthesize_video_trak( track.track_id, track.timescale, @@ -363,7 +361,10 @@ impl Export { } for (name, config) in &catalog.audio.renditions { - let track = self.tracks.get(name).context("audio track not subscribed")?; + let track = self + .tracks + .get(name) + .ok_or_else(|| Error::MissingAudioTrack(name.clone()))?; match &config.container { Container::Cmaf { init, .. } => { extract_init(init, &mut ftyp_data, &mut traks, &mut trexs)?; @@ -419,7 +420,7 @@ fn extract_init( ftyp_data: &mut Option, traks: &mut Vec, trexs: &mut Vec, -) -> anyhow::Result<()> { +) -> Result<()> { let mut cursor = std::io::Cursor::new(init.as_ref()); while let Some(atom) = mp4_atom::Any::decode_maybe(&mut cursor)? { match atom { @@ -484,11 +485,13 @@ fn should_flush(track: &Fmp4Track, frame: &Frame, fragment_duration: Option) -> anyhow::Result { - anyhow::ensure!(!frames.is_empty(), "encode_fragment called with no frames"); +fn encode_fragment(track: &mut Fmp4Track, frames: Vec) -> Result { + if frames.is_empty() { + return Err(Error::NoFrames.into()); + } let seq = track.sequence_number; track.sequence_number += 1; - let timescale = moq_net::Timescale::new(track.timescale).context("invalid track timescale")?; + let timescale = moq_net::Timescale::new(track.timescale)?; Ok(crate::container::fmp4::encode_fragment( track.track_id, timescale, @@ -513,13 +516,13 @@ fn catalog_timescale_audio(config: &hang::catalog::AudioConfig) -> u64 { } } -fn parse_timescale_from_init(init: &[u8]) -> anyhow::Result { +fn parse_timescale_from_init(init: &[u8]) -> Result { let mut cursor = std::io::Cursor::new(init); while let Some(atom) = mp4_atom::Any::decode_maybe(&mut cursor)? { if let mp4_atom::Any::Moov(moov) = atom { - let trak = moov.trak.first().context("no tracks in moov")?; + let trak = moov.trak.first().ok_or(Error::NoTracks)?; return Ok(trak.mdia.mdhd.timescale as u64); } } - anyhow::bail!("no moov in init data") + Err(Error::NoMoov.into()) } diff --git a/rs/moq-mux/src/container/fmp4/import.rs b/rs/moq-mux/src/container/fmp4/import.rs index 535af7fdd..da8ff861b 100644 --- a/rs/moq-mux/src/container/fmp4/import.rs +++ b/rs/moq-mux/src/container/fmp4/import.rs @@ -1,4 +1,3 @@ -use anyhow::Context; use bytes::{Buf, Bytes, BytesMut}; use hang::catalog::{AAC, AudioCodec, AudioConfig, Container, H264, H265, VP9, VideoCodec, VideoConfig}; use moq_net::Timestamp; @@ -6,6 +5,9 @@ use mp4_atom::{Any, Atom, DecodeMaybe, Encode, Mdat, Moof, Moov, Trak}; use std::collections::HashMap; use tokio::io::{AsyncRead, AsyncReadExt}; +use super::Error; +use crate::Result; + /// Converts fMP4/CMAF files into MoQ broadcast streams using CMAF passthrough. /// /// This struct processes fragmented MP4 (fMP4) files and transports complete @@ -82,7 +84,7 @@ impl Import { } /// Decode from an asynchronous reader. - pub async fn decode_from(&mut self, reader: &mut T) -> anyhow::Result<()> { + pub async fn decode_from(&mut self, reader: &mut T) -> Result<()> { let mut buffer = BytesMut::new(); while reader.read_buf(&mut buffer).await? > 0 { self.decode(&mut buffer)?; @@ -92,7 +94,7 @@ impl Import { } /// Decode a buffer of bytes. - pub fn decode>(&mut self, buf: &mut T) -> anyhow::Result<()> { + pub fn decode>(&mut self, buf: &mut T) -> Result<()> { let mut cursor = std::io::Cursor::new(buf); let mut position = 0; @@ -109,7 +111,9 @@ impl Import { self.init(moov)?; } Any::Moof(moof) => { - anyhow::ensure!(self.moof.is_none(), "duplicate moof box"); + if self.moof.is_some() { + return Err(Error::DuplicateMoof.into()); + } self.moof.replace(moof); self.moof_size = size; } @@ -135,7 +139,7 @@ impl Import { self.moov.is_some() } - fn init(&mut self, moov: Moov) -> anyhow::Result<()> { + fn init(&mut self, moov: Moov) -> Result<()> { // Clone the catalog to avoid the borrow checker. let mut catalog = self.catalog.clone(); let mut catalog = catalog.lock(); @@ -158,8 +162,12 @@ impl Import { catalog.audio.renditions.insert(track.name.clone(), config); TrackKind::Audio } - b"sbtl" => anyhow::bail!("subtitle tracks are not supported"), - handler => anyhow::bail!("unknown track type: {:?}", handler), + b"sbtl" => return Err(Error::UnsupportedSubtitle.into()), + handler => { + let mut buf = [0u8; 4]; + buf[..handler.len().min(4)].copy_from_slice(&handler[..handler.len().min(4)]); + return Err(Error::UnknownTrackHandler(buf).into()); + } }; self.tracks.insert( @@ -183,7 +191,7 @@ impl Import { Ok(()) } - fn container(&self, trak: &Trak, moov: &Moov) -> anyhow::Result { + fn container(&self, trak: &Trak, moov: &Moov) -> Result { // Build a single-track init segment (ftyp+moov) for this track. { let ftyp = mp4_atom::Ftyp { @@ -229,14 +237,14 @@ impl Import { } } - fn init_video(&mut self, trak: &Trak, moov: &Moov) -> anyhow::Result { + fn init_video(&mut self, trak: &Trak, moov: &Moov) -> Result { let container = self.container(trak, moov)?; let stsd = &trak.mdia.minf.stbl.stsd; let codec = match stsd.codecs.len() { - 0 => anyhow::bail!("missing codec"), + 0 => return Err(Error::MissingCodec.into()), 1 => &stsd.codecs[0], - _ => anyhow::bail!("multiple codecs"), + _ => return Err(Error::MultipleCodecs.into()), }; let config = match codec { @@ -293,8 +301,8 @@ impl Import { config.container = container; config } - mp4_atom::Codec::Unknown(unknown) => anyhow::bail!("unknown codec: {:?}", unknown), - unsupported => anyhow::bail!("unsupported codec: {:?}", unsupported), + mp4_atom::Codec::Unknown(unknown) => return Err(Error::UnknownCodec(*unknown).into()), + unsupported => return Err(Error::UnsupportedCodec(Box::new(unsupported.clone())).into()), }; Ok(config) @@ -306,7 +314,7 @@ impl Import { hvcc: &mp4_atom::Hvcc, visual: &mp4_atom::Visual, container: Container, - ) -> anyhow::Result { + ) -> Result { let mut description = BytesMut::new(); hvcc.encode_body(&mut description)?; @@ -326,14 +334,14 @@ impl Import { Ok(config) } - fn init_audio(&mut self, trak: &Trak, moov: &Moov) -> anyhow::Result { + fn init_audio(&mut self, trak: &Trak, moov: &Moov) -> Result { let container = self.container(trak, moov)?; let stsd = &trak.mdia.minf.stbl.stsd; let codec = match stsd.codecs.len() { - 0 => anyhow::bail!("missing codec"), + 0 => return Err(Error::MissingCodec.into()), 1 => &stsd.codecs[0], - _ => anyhow::bail!("multiple codecs"), + _ => return Err(Error::MultipleCodecs.into()), }; let config = match codec { @@ -342,7 +350,7 @@ impl Import { // TODO Also support mp4a.67 if desc.object_type_indication != 0x40 { - anyhow::bail!("unsupported codec: MPEG2"); + return Err(Error::UnsupportedMpeg2.into()); } let bitrate = desc.avg_bitrate.max(desc.max_bitrate); @@ -374,31 +382,31 @@ impl Import { config.container = container; config } - mp4_atom::Codec::Unknown(unknown) => anyhow::bail!("unknown codec: {:?}", unknown), - unsupported => anyhow::bail!("unsupported codec: {:?}", unsupported), + mp4_atom::Codec::Unknown(unknown) => return Err(Error::UnknownCodec(*unknown).into()), + unsupported => return Err(Error::UnsupportedCodec(Box::new(unsupported.clone())).into()), }; Ok(config) } // Extract all frames out of an mdat atom using CMAF passthrough. - fn extract(&mut self, mdat: Mdat, mdat_raw: &[u8]) -> anyhow::Result<()> { - let moov = self.moov.as_ref().context("missing moov box")?; - let moof = self.moof.take().context("missing moof box")?; + fn extract(&mut self, mdat: Mdat, mdat_raw: &[u8]) -> Result<()> { + let moov = self.moov.as_ref().ok_or(Error::NoMoov)?; + let moof = self.moof.take().ok_or(Error::NoMoof)?; let moof_size = self.moof_size; let header_size = mdat_raw.len() - mdat.data.len(); // Loop over all of the traf boxes in the moof. for traf in &moof.traf { let track_id = traf.tfhd.track_id; - let track = self.tracks.get_mut(&track_id).context("unknown track")?; + let track = self.tracks.get_mut(&track_id).ok_or(Error::UnknownTrack(track_id))?; // Find the track information in the moov let trak = moov .trak .iter() .find(|trak| trak.tkhd.track_id == track_id) - .context("unknown track")?; + .ok_or(Error::UnknownTrack(track_id))?; let trex = moov .mvex .as_ref() @@ -409,16 +417,15 @@ impl Import { let default_sample_size = trex.map(|trex| trex.default_sample_size).unwrap_or_default(); let default_sample_flags = trex.map(|trex| trex.default_sample_flags).unwrap_or_default(); - let tfdt = traf.tfdt.as_ref().context("missing tfdt box")?; + let tfdt = traf.tfdt.as_ref().ok_or(Error::MissingTfdt)?; let mut dts = tfdt.base_media_decode_time; - let timescale = - moq_net::Timescale::new(trak.mdia.mdhd.timescale as u64).context("invalid fmp4 mdhd.timescale")?; + let timescale = moq_net::Timescale::new(trak.mdia.mdhd.timescale as u64)?; let mut offset = traf.tfhd.base_data_offset.unwrap_or_default() as usize; let mut track_data_start: Option = None; if traf.trun.is_empty() { - anyhow::bail!("missing trun box"); + return Err(Error::MissingTrun.into()); } // Keep track of the minimum and maximum timestamp for this track to compute the jitter. @@ -431,16 +438,16 @@ impl Import { if let Some(data_offset) = trun.data_offset { let base_offset = tfhd.base_data_offset.unwrap_or_default() as usize; - let data_offset: usize = data_offset.try_into().context("invalid data offset")?; + let data_offset: usize = data_offset.try_into().map_err(|_| Error::InvalidDataOffset)?; let relative_offset = data_offset .checked_sub(moof_size) .and_then(|v| v.checked_sub(header_size)) - .context("invalid data offset: underflow")?; + .ok_or(Error::InvalidDataOffset)?; offset = base_offset .checked_add(relative_offset) - .context("invalid data offset: overflow")?; + .ok_or(Error::InvalidDataOffset)?; } // Capture the actual start offset for this traf before consuming samples @@ -465,7 +472,7 @@ impl Import { let timestamp = moq_net::Timestamp::new(pts, timescale)?; if offset + size > mdat.data.len() { - anyhow::bail!("invalid data offset"); + return Err(Error::InvalidDataOffset.into()); } let keyframe = match track.kind { @@ -514,13 +521,14 @@ impl Import { // The per-track sample range must be in bounds of the original mdat. // If not, the parsed sample sizes/offsets disagree with the actual data // and we cannot safely emit a passthrough fragment with rewritten offsets. - anyhow::ensure!( - track_data_start <= track_data_end && track_data_end <= mdat.data.len(), - "track sample range {}..{} is out of bounds of mdat (len {})", - track_data_start, - track_data_end, - mdat.data.len() - ); + if !(track_data_start <= track_data_end && track_data_end <= mdat.data.len()) { + return Err(Error::SampleRangeOutOfBounds { + start: track_data_start, + end: track_data_end, + len: mdat.data.len(), + } + .into()); + } let track_mdat_data = &mdat.data[track_data_start..track_data_end]; let mut adjusted_moof = single_traf_moof; @@ -584,7 +592,7 @@ impl Import { None => track.track.append_group()?, } } else { - track.group.take().context("no keyframe at start")? + track.group.take().ok_or(Error::NoKeyframe)? }; g.write_frame(fragment_bytes)?; @@ -605,7 +613,7 @@ impl Import { .video .renditions .get_mut(&track.track.name) - .context("missing video config")?; + .ok_or_else(|| Error::MissingVideoTrack(track.track.name.clone()))?; config.jitter = Some(jitter.into()); } TrackKind::Audio => { @@ -613,7 +621,7 @@ impl Import { .audio .renditions .get_mut(&track.track.name) - .context("missing audio config")?; + .ok_or_else(|| Error::MissingAudioTrack(track.track.name.clone()))?; config.jitter = Some(jitter.into()); } } @@ -627,7 +635,7 @@ impl Import { impl Import { /// Finish all tracks, flushing current groups. - pub fn finish(&mut self) -> anyhow::Result<()> { + pub fn finish(&mut self) -> Result<()> { for track in self.tracks.values_mut() { if let Some(mut g) = track.group.take() { g.finish()?; @@ -641,7 +649,7 @@ impl Import { /// /// Broadcast-wide: every track inside this fMP4 import advances together; per-track /// control is intentionally not exposed. - pub fn seek(&mut self, sequence: u64) -> anyhow::Result<()> { + pub fn seek(&mut self, sequence: u64) -> Result<()> { for track in self.tracks.values_mut() { if let Some(mut g) = track.group.take() { g.finish()?; diff --git a/rs/moq-mux/src/container/fmp4/mod.rs b/rs/moq-mux/src/container/fmp4/mod.rs index 2d2dfc48e..e93984863 100644 --- a/rs/moq-mux/src/container/fmp4/mod.rs +++ b/rs/moq-mux/src/container/fmp4/mod.rs @@ -27,11 +27,11 @@ use moq_net::Timestamp; use crate::container::{Container, Frame}; -#[derive(Debug, thiserror::Error)] +#[derive(Debug, Clone, thiserror::Error)] #[non_exhaustive] pub enum Error { #[error("mp4: {0}")] - Mp4(#[from] mp4_atom::Error), + Mp4(std::sync::Arc), #[error("moq: {0}")] Moq(#[from] moq_net::Error), @@ -48,13 +48,13 @@ pub enum Error { #[error("PTS overflow")] PtsOverflow, - #[error("no moof found in CMAF frame data")] + #[error("missing moof")] NoMoof, - #[error("no mdat found in CMAF frame data")] + #[error("missing mdat")] NoMdat, - #[error("no moov found in init data")] + #[error("missing moov")] NoMoov, #[error("no tracks in moov")] @@ -65,8 +65,73 @@ pub enum Error { #[error("can't synthesize CMAF init for {0}")] UnsupportedSynthesis(String), + + #[error("subtitle tracks are not supported")] + UnsupportedSubtitle, + + #[error("unknown track handler: {0:?}")] + UnknownTrackHandler([u8; 4]), + + #[error("missing codec")] + MissingCodec, + + #[error("multiple codecs")] + MultipleCodecs, + + #[error("unknown codec: {0:?}")] + UnknownCodec(mp4_atom::FourCC), + + #[error("unsupported codec: {0:?}")] + UnsupportedCodec(Box), + + #[error("unsupported codec: MPEG2")] + UnsupportedMpeg2, + + #[error("duplicate moof")] + DuplicateMoof, + + #[error("missing trun")] + MissingTrun, + + #[error("missing tfdt")] + MissingTfdt, + + #[error("missing video config for synthesized init")] + MissingVideoConfig, + + #[error("video track {0} missing in catalog")] + MissingVideoTrack(String), + + #[error("audio track {0} missing in catalog")] + MissingAudioTrack(String), + + #[error("invalid data offset")] + InvalidDataOffset, + + #[error("unknown track {0}")] + UnknownTrack(u32), + + #[error("no keyframe at start of group")] + NoKeyframe, + + #[error("track sample range {start}..{end} is out of bounds of mdat (len {len})")] + SampleRangeOutOfBounds { start: usize, end: usize, len: usize }, + + #[error("no catalog snapshot")] + NoCatalogSnapshot, + + #[error("encode_fragment called with no frames")] + NoFrames, } +impl From for Error { + fn from(err: mp4_atom::Error) -> Self { + Error::Mp4(std::sync::Arc::new(err)) + } +} + +pub type Result = std::result::Result; + /// CMAF container: encodes/decodes a single track's moof+mdat fragments. /// /// Build from a CMAF init segment with [`Wire::from_init`], or wrap a @@ -86,7 +151,7 @@ impl Wire { } /// Parse a CMAF init segment (ftyp+moov), extracting the single track. - pub fn from_init(init_data: &[u8]) -> Result { + pub fn from_init(init_data: &[u8]) -> Result { use mp4_atom::DecodeMaybe; let mut cursor = std::io::Cursor::new(init_data); @@ -110,7 +175,7 @@ impl Wire { impl Container for Wire { type Error = Error; - fn write(&self, group: &mut moq_net::GroupProducer, frames: &[Frame]) -> Result<(), Self::Error> { + fn write(&self, group: &mut moq_net::GroupProducer, frames: &[Frame]) -> std::result::Result<(), Self::Error> { let timescale = moq_net::Timescale::new(self.trak.mdia.mdhd.timescale as u64)?; let track_id = self.trak.tkhd.track_id; encode(group, frames, timescale, track_id) @@ -120,7 +185,7 @@ impl Container for Wire { &self, group: &mut moq_net::GroupConsumer, waiter: &conducer::Waiter, - ) -> Poll>, Self::Error>> { + ) -> Poll>, Self::Error>> { use std::task::ready; let Some(data) = ready!(group.poll_read_frame(waiter)?) else { @@ -132,7 +197,7 @@ impl Container for Wire { } } -pub(crate) fn decode(data: Bytes, timescale: moq_net::Timescale) -> Result, Error> { +pub(crate) fn decode(data: Bytes, timescale: moq_net::Timescale) -> Result> { use mp4_atom::DecodeMaybe; let mut cursor = std::io::Cursor::new(&data); @@ -166,7 +231,11 @@ pub(crate) fn decode(data: Bytes, timescale: moq_net::Timescale) -> Result mdat_data.len() { - return Ok(frames); + return Err(Error::SampleRangeOutOfBounds { + start: offset, + end, + len: mdat_data.len(), + }); } let cts = entry.cts.unwrap_or_default() as i64; @@ -197,7 +266,7 @@ pub(crate) fn encode( frames: &[Frame], timescale: moq_net::Timescale, track_id: u32, -) -> Result<(), Error> { +) -> Result<()> { if frames.is_empty() { return Ok(()); } @@ -224,7 +293,7 @@ pub(crate) fn encode_fragment( timescale: moq_net::Timescale, sequence_number: u32, frames: &[Frame], -) -> Result { +) -> Result { use mp4_atom::Encode; if frames.is_empty() { @@ -296,7 +365,7 @@ pub(crate) fn synthesize_video_trak( timescale: u64, config: &VideoConfig, description: &[u8], -) -> Result { +) -> Result { let width = config.coded_width.unwrap_or(0) as u16; let height = config.coded_height.unwrap_or(0) as u16; let visual = mp4_atom::Visual { @@ -309,7 +378,7 @@ pub(crate) fn synthesize_video_trak( let sample_entry = match &config.codec { VideoCodec::H264(_) => { let mut cursor = std::io::Cursor::new(description); - let avcc = mp4_atom::Avcc::decode_body(&mut cursor).map_err(Error::Mp4)?; + let avcc = mp4_atom::Avcc::decode_body(&mut cursor).map_err(Error::from)?; mp4_atom::Codec::from(mp4_atom::Avc1 { visual, avcc, @@ -318,7 +387,7 @@ pub(crate) fn synthesize_video_trak( } VideoCodec::H265(h265) => { let mut cursor = std::io::Cursor::new(description); - let hvcc = mp4_atom::Hvcc::decode_body(&mut cursor).map_err(Error::Mp4)?; + let hvcc = mp4_atom::Hvcc::decode_body(&mut cursor).map_err(Error::from)?; // `in_band` (catalog) ↔ hev1 sample entry; otherwise hvc1. if h265.in_band { mp4_atom::Codec::from(mp4_atom::Hev1 { @@ -341,11 +410,7 @@ pub(crate) fn synthesize_video_trak( } /// Synthesize a CMAF `Trak` for an audio rendition that has no init segment. -pub(crate) fn synthesize_audio_trak( - track_id: u32, - timescale: u64, - config: &AudioConfig, -) -> Result { +pub(crate) fn synthesize_audio_trak(track_id: u32, timescale: u64, config: &AudioConfig) -> Result { let audio = mp4_atom::Audio { data_reference_index: 1, channel_count: config.channel_count as u16, diff --git a/rs/moq-mux/src/container/hls/import.rs b/rs/moq-mux/src/container/hls/import.rs index f6c41c9d4..9a93c4115 100644 --- a/rs/moq-mux/src/container/hls/import.rs +++ b/rs/moq-mux/src/container/hls/import.rs @@ -8,7 +8,6 @@ use std::collections::hash_map::Entry; use std::path::PathBuf; use std::time::Duration; -use anyhow::Context; use bytes::Bytes; use m3u8_rs::{ AlternativeMedia, AlternativeMediaType, Map, MasterPlaylist, MediaPlaylist, MediaSegment, Resolution, VariantStream, @@ -17,7 +16,9 @@ use reqwest::Client; use tracing::{debug, info, warn}; use url::Url; +use crate::Result; use crate::container::fmp4::Import as Fmp4; +use crate::container::hls::Error; /// Configuration for the single-rendition HLS ingest loop. #[derive(Clone)] @@ -38,9 +39,9 @@ impl Config { /// Parse the playlist string into a URL. /// If it starts with http:// or https://, parse as URL. /// Otherwise, treat as a file path and convert to file:// URL. - fn parse_playlist(&self) -> anyhow::Result { + fn parse_playlist(&self) -> Result { if self.playlist.starts_with("http://") || self.playlist.starts_with("https://") { - Url::parse(&self.playlist).context("invalid playlist URL") + Url::parse(&self.playlist).map_err(|_| Error::InvalidPlaylistUrl.into()) } else { let path = PathBuf::from(&self.playlist); let absolute = if path.is_absolute() { @@ -48,7 +49,7 @@ impl Config { } else { std::env::current_dir()?.join(path) }; - Url::from_file_path(&absolute).ok().context("invalid file path") + Url::from_file_path(&absolute).map_err(|_| Error::InvalidFilePath.into()) } } } @@ -116,7 +117,7 @@ impl Import { broadcast: moq_net::BroadcastProducer, catalog: crate::catalog::hang::Producer, cfg: Config, - ) -> anyhow::Result { + ) -> Result { let base_url = cfg.parse_playlist()?; let client = cfg.client.unwrap_or_else(|| { Client::builder() @@ -139,7 +140,7 @@ impl Import { /// Fetch the latest playlist, download the init segment, and prime the importer with a buffer of segments. /// /// Returns the number of segments buffered during initialization. - pub async fn init(&mut self) -> anyhow::Result<()> { + pub async fn init(&mut self) -> Result<()> { let buffered = self.prime().await?; if buffered == 0 { warn!("HLS playlist had no new segments during init step"); @@ -150,7 +151,7 @@ impl Import { } /// Run the ingest loop until cancelled. - pub async fn run(&mut self) -> anyhow::Result<()> { + pub async fn run(&mut self) -> Result<()> { loop { let outcome = self.step().await?; let delay = self.refresh_delay(outcome.target_duration, outcome.wrote_segments); @@ -167,7 +168,7 @@ impl Import { } /// Internal: fetch the latest playlist, download the init segment, and buffer segments. - async fn prime(&mut self) -> anyhow::Result { + async fn prime(&mut self) -> Result { self.ensure_tracks().await?; let mut buffered = 0usize; @@ -205,7 +206,7 @@ impl Import { /// This fetches the current media playlists, consumes any fresh segments, /// and returns how many segments were written along with the target /// duration to guide scheduling of the next step. - async fn step(&mut self) -> anyhow::Result { + async fn step(&mut self) -> Result { self.ensure_tracks().await?; let mut wrote = 0usize; @@ -257,17 +258,16 @@ impl Import { base } - async fn fetch_media_playlist(&self, url: Url) -> anyhow::Result { + async fn fetch_media_playlist(&self, url: Url) -> Result { let body = self.fetch_bytes(url).await?; // Nom errors take ownership of the input, so we need to stringify any error messages. - let playlist = m3u8_rs::parse_media_playlist_res(&body) - .map_err(|e| anyhow::anyhow!("failed to parse media playlist: {}", e))?; + let playlist = m3u8_rs::parse_media_playlist_res(&body).map_err(|e| Error::ParsePlaylist(e.to_string()))?; Ok(playlist) } - async fn ensure_tracks(&mut self) -> anyhow::Result<()> { + async fn ensure_tracks(&mut self) -> Result<()> { // Tracks already discovered. if !self.video.is_empty() { return Ok(()); @@ -276,7 +276,9 @@ impl Import { let body = self.fetch_bytes(self.base_url.clone()).await?; if let Ok((_, master)) = m3u8_rs::parse_master_playlist(&body) { let variants = select_variants(&master); - anyhow::ensure!(!variants.is_empty(), "no usable variants found in master playlist"); + if variants.is_empty() { + return Err(Error::NoVariants.into()); + } // Create a video track state for every usable variant. for variant in &variants { @@ -319,7 +321,7 @@ impl Import { track: &mut TrackState, playlist: &MediaPlaylist, limit: Option, - ) -> anyhow::Result { + ) -> Result { self.ensure_init_segment(kind, track, playlist).await?; let next_seq = track.next_sequence.unwrap_or(0); @@ -383,12 +385,12 @@ impl Import { kind: TrackKind, track: &mut TrackState, playlist: &MediaPlaylist, - ) -> anyhow::Result<()> { + ) -> Result<()> { if track.init_ready { return Ok(()); } - let map = self.find_map(playlist).context("playlist missing EXT-X-MAP")?; + let map = self.find_map(playlist).ok_or(Error::MissingMap)?; let url = resolve_uri(&track.playlist, &map.uri)?; let mut bytes = self.fetch_bytes(url).await?; @@ -397,13 +399,14 @@ impl Import { TrackKind::Audio => self.ensure_audio_importer(), }; - importer.decode(&mut bytes).context("init segment parse error")?; + importer.decode(&mut bytes)?; - anyhow::ensure!(bytes.is_empty(), "init segment was not fully consumed"); - anyhow::ensure!( - importer.is_initialized(), - "init segment did not initialize the importer" - ); + if !bytes.is_empty() { + return Err(Error::InitNotConsumed.into()); + } + if !importer.is_initialized() { + return Err(Error::InitNotInitialized.into()); + } track.init_ready = true; info!(?kind, "loaded HLS init segment"); @@ -416,8 +419,10 @@ impl Import { track: &mut TrackState, segment: &MediaSegment, sequence: u64, - ) -> anyhow::Result<()> { - anyhow::ensure!(!segment.uri.is_empty(), "encountered segment with empty URI"); + ) -> Result<()> { + if segment.uri.is_empty() { + return Err(Error::EmptySegmentUri.into()); + } let url = resolve_uri(&track.playlist, &segment.uri)?; let mut bytes = self.fetch_bytes(url).await?; @@ -438,13 +443,10 @@ impl Import { // Final check after ensuring init segment if !importer.is_initialized() { - return Err(anyhow::anyhow!( - "importer not initialized for {:?} after ensure_init_segment - init segment processing failed", - kind - )); + return Err(Error::ImporterNotInitialized(format!("{:?}", kind)).into()); } - importer.decode(&mut bytes).context("failed to parse media segment")?; + importer.decode(&mut bytes)?; track.next_sequence = Some(sequence + 1); Ok(()) @@ -454,15 +456,15 @@ impl Import { playlist.segments.iter().find_map(|segment| segment.map.as_ref()) } - async fn fetch_bytes(&self, url: Url) -> anyhow::Result { + async fn fetch_bytes(&self, url: Url) -> Result { if url.scheme() == "file" { - let path = url.to_file_path().ok().context("invalid file URL")?; - let bytes = tokio::fs::read(&path).await.context("failed to read file")?; + let path = url.to_file_path().map_err(|_| Error::InvalidFileUrl)?; + let bytes = tokio::fs::read(&path).await.map_err(Error::from)?; Ok(Bytes::from(bytes)) } else { - let response = self.client.get(url).send().await?; - let response = response.error_for_status()?; - let bytes = response.bytes().await.context("failed to read response body")?; + let response = self.client.get(url).send().await.map_err(Error::from)?; + let response = response.error_for_status().map_err(Error::from)?; + let bytes = response.bytes().await.map_err(Error::from)?; Ok(bytes) } } diff --git a/rs/moq-mux/src/container/hls/mod.rs b/rs/moq-mux/src/container/hls/mod.rs index 84eb90c1c..634dd9da7 100644 --- a/rs/moq-mux/src/container/hls/mod.rs +++ b/rs/moq-mux/src/container/hls/mod.rs @@ -7,3 +7,61 @@ mod import; pub use import::*; + +/// HLS ingest errors. +#[derive(Debug, Clone, thiserror::Error)] +#[non_exhaustive] +pub enum Error { + #[error("invalid playlist URL")] + InvalidPlaylistUrl, + + #[error("invalid file path")] + InvalidFilePath, + + #[error("invalid file URL")] + InvalidFileUrl, + + #[error("failed to parse media playlist: {0}")] + ParsePlaylist(String), + + #[error("no usable variants found in master playlist")] + NoVariants, + + #[error("playlist missing EXT-X-MAP")] + MissingMap, + + #[error("init segment was not fully consumed")] + InitNotConsumed, + + #[error("init segment did not initialize the importer")] + InitNotInitialized, + + #[error("encountered segment with empty URI")] + EmptySegmentUri, + + #[error("importer not initialized for {0:?} after ensure_init_segment - init segment processing failed")] + ImporterNotInitialized(String), + + #[error("url parse: {0}")] + UrlParse(#[from] url::ParseError), + + #[error("reqwest: {0}")] + Reqwest(std::sync::Arc), + + #[error("io: {0}")] + Io(std::sync::Arc), +} + +impl From for Error { + fn from(err: reqwest::Error) -> Self { + Error::Reqwest(std::sync::Arc::new(err)) + } +} + +impl From for Error { + fn from(err: std::io::Error) -> Self { + Error::Io(std::sync::Arc::new(err)) + } +} + +pub type Result = std::result::Result; diff --git a/rs/moq-mux/src/container/mkv/export.rs b/rs/moq-mux/src/container/mkv/export.rs index 38c934f2b..539c3db19 100644 --- a/rs/moq-mux/src/container/mkv/export.rs +++ b/rs/moq-mux/src/container/mkv/export.rs @@ -3,14 +3,15 @@ use std::io::Cursor; use std::task::Poll; use std::time::Duration; -use anyhow::Context; use bytes::{BufMut, Bytes, BytesMut}; use hang::catalog::{AudioCodec, AudioConfig, Catalog, Container, VideoCodec, VideoConfig}; use webm_iterable::matroska_spec::{Master, MatroskaSpec}; use webm_iterable::{WebmWriter, WriteOptions}; +use crate::Result; use crate::catalog::CatalogFormat; use crate::container::Frame; +use crate::container::mkv::Error; use crate::container::{CatalogSource, ExportSource}; @@ -112,11 +113,11 @@ impl ClusterBuilder { keyframe: bool, payload: &[u8], is_video: bool, - ) -> anyhow::Result<()> { + ) -> Result<()> { let rel = (frame_ticks as i64) .checked_sub(self.start_ticks as i64) - .context("cluster underflow")?; - let rel: i16 = rel.try_into().context("block timestamp doesn't fit in i16")?; + .ok_or(Error::ClusterUnderflow)?; + let rel: i16 = rel.try_into().map_err(|_| Error::BlockTimestampOverflow)?; let sb_body = encode_simple_block_body(track_number, rel, keyframe, payload); write_tag_id(&mut self.body, ID_SIMPLEBLOCK as u32); @@ -157,7 +158,7 @@ impl Export { /// /// Use [`with_catalog_format`](Self::with_catalog_format) to subscribe to a /// non-default catalog track (e.g. MSF). - pub fn new(broadcast: moq_net::BroadcastConsumer) -> Result { + pub fn new(broadcast: moq_net::BroadcastConsumer) -> Result { Self::with_catalog_format(broadcast, CatalogFormat::default()) } @@ -167,10 +168,7 @@ impl Export { /// Both formats drive the same internal `hang::Catalog`-based pipeline (MSF /// snapshots are converted on receipt), so the only observable difference /// is which wire catalog track is consumed. - pub fn with_catalog_format( - broadcast: moq_net::BroadcastConsumer, - catalog_format: CatalogFormat, - ) -> Result { + pub fn with_catalog_format(broadcast: moq_net::BroadcastConsumer, catalog_format: CatalogFormat) -> Result { let catalog = CatalogSource::new(&broadcast, catalog_format)?; Ok(Self { @@ -207,11 +205,11 @@ impl Export { } /// Get the next byte chunk. - pub async fn next(&mut self) -> anyhow::Result> { + pub async fn next(&mut self) -> Result> { conducer::wait(|waiter| self.poll_next(waiter)).await } - pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + pub fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { // 1. Drain catalog updates. while let Some(catalog) = self.catalog.as_mut() { match catalog.poll_next(waiter)? { @@ -310,7 +308,7 @@ impl Export { Poll::Pending } - fn update_catalog(&mut self, catalog: Catalog) -> anyhow::Result<()> { + fn update_catalog(&mut self, catalog: Catalog) -> Result<()> { let mut active: HashMap = HashMap::new(); for name in catalog.video.renditions.keys() { active.insert(name.clone(), ()); @@ -324,12 +322,12 @@ impl Export { if self.header_emitted { for name in active.keys() { if !self.tracks.contains_key(name) { - anyhow::bail!("MKV track layout changed after header was emitted: track '{name}' added"); + return Err(Error::HeaderAddedTrack(name.clone()).into()); } } for name in self.tracks.keys() { if !active.contains_key(name) { - anyhow::bail!("MKV track layout changed after header was emitted: track '{name}' removed"); + return Err(Error::HeaderRemovedTrack(name.clone()).into()); } } self.catalog_snapshot = Some(catalog); @@ -387,8 +385,8 @@ impl Export { self.tracks.values().all(|t| t.source.header_ready()) } - fn build_header(&self) -> anyhow::Result { - let catalog = self.catalog_snapshot.as_ref().context("no catalog snapshot")?; + fn build_header(&self) -> Result { + let catalog = self.catalog_snapshot.as_ref().ok_or(Error::NoCatalogSnapshot)?; // Decide DocType: webm only if every codec is WebM-allowed. let webm_only = catalog @@ -405,7 +403,10 @@ impl Export { let mut entries: Vec = Vec::new(); for (name, config) in catalog.video.renditions.iter() { - let track = self.tracks.get(name).context("video track not subscribed")?; + let track = self + .tracks + .get(name) + .ok_or_else(|| Error::MissingVideoTrack(name.clone()))?; entries.push(build_video_track_entry( track.track_number, config, @@ -413,29 +414,40 @@ impl Export { )?); } for (name, config) in catalog.audio.renditions.iter() { - let track = self.tracks.get(name).context("audio track not subscribed")?; + let track = self + .tracks + .get(name) + .ok_or_else(|| Error::MissingAudioTrack(name.clone()))?; entries.push(build_audio_track_entry(track.track_number, config)?); } let mut dest = Cursor::new(Vec::new()); { let mut writer = WebmWriter::new(&mut dest); - writer.write(&MatroskaSpec::Ebml(Master::Full(vec![ - MatroskaSpec::DocType(doc_type.to_string()), - MatroskaSpec::DocTypeVersion(4), - MatroskaSpec::DocTypeReadVersion(2), - ])))?; - writer.write_advanced( - &MatroskaSpec::Segment(Master::Start), - WriteOptions::is_unknown_sized_element(), - )?; - writer.write(&MatroskaSpec::Info(Master::Full(vec![ - MatroskaSpec::TimestampScale(TIMESTAMP_SCALE_NS), - MatroskaSpec::MuxingApp("moq-mux".to_string()), - MatroskaSpec::WritingApp("moq-mux".to_string()), - ])))?; - writer.write(&MatroskaSpec::Tracks(Master::Full(entries)))?; - writer.flush()?; + writer + .write(&MatroskaSpec::Ebml(Master::Full(vec![ + MatroskaSpec::DocType(doc_type.to_string()), + MatroskaSpec::DocTypeVersion(4), + MatroskaSpec::DocTypeReadVersion(2), + ]))) + .map_err(Error::from)?; + writer + .write_advanced( + &MatroskaSpec::Segment(Master::Start), + WriteOptions::is_unknown_sized_element(), + ) + .map_err(Error::from)?; + writer + .write(&MatroskaSpec::Info(Master::Full(vec![ + MatroskaSpec::TimestampScale(TIMESTAMP_SCALE_NS), + MatroskaSpec::MuxingApp("moq-mux".to_string()), + MatroskaSpec::WritingApp("moq-mux".to_string()), + ]))) + .map_err(Error::from)?; + writer + .write(&MatroskaSpec::Tracks(Master::Full(entries))) + .map_err(Error::from)?; + writer.flush().map_err(Error::from)?; } Ok(Bytes::from(dest.into_inner())) @@ -453,8 +465,8 @@ impl Export { /// a chunk if the cluster rolled over (the returned chunk is the /// *previous* cluster; the new frame becomes the first block of a new /// open cluster). - fn feed_frame(&mut self, name: &str, frame: Frame) -> anyhow::Result> { - let track = self.tracks.get(name).context("missing track")?; + fn feed_frame(&mut self, name: &str, frame: Frame) -> Result> { + let track = self.tracks.get(name).ok_or(Error::MissingTrack)?; let track_number = track.track_number; let kind = track.kind; let payload = &frame.payload; @@ -465,7 +477,7 @@ impl Export { .timestamp .as_millis() .try_into() - .context("timestamp doesn't fit in u64 ms")?; + .map_err(|_| Error::TimestampU64)?; let is_video = kind == TrackKind::Video; let keyframe = frame.keyframe; @@ -505,14 +517,16 @@ impl Export { } } -fn ensure_legacy(container: &Container, kind: &str, name: &str) -> anyhow::Result<()> { +fn ensure_legacy(container: &Container, kind: &str, name: &str) -> Result<()> { match container { // MKV emits raw codec payloads, so it accepts both wire formats whose // frames are raw codec bitstreams (Legacy varint, LOC properties). Container::Legacy | Container::Loc => Ok(()), - Container::Cmaf { .. } => { - anyhow::bail!("MKV export does not support CMAF {} track '{}'", kind, name); + Container::Cmaf { .. } => Err(Error::UnsupportedCmafTrack { + kind: kind.to_string(), + name: name.to_string(), } + .into()), } } @@ -520,7 +534,7 @@ fn build_video_track_entry( track_number: u64, config: &VideoConfig, description: Option<&Bytes>, -) -> anyhow::Result { +) -> Result { // The description came from either the catalog (avc1/hvc1 sources) or // the codec transform (Avc3/Hev1 sources synthesizing it from inline params). let codec_private = description.map(|b| b.to_vec()); @@ -530,14 +544,14 @@ fn build_video_track_entry( VideoCodec::VP9(_) => ("V_VP9", None), VideoCodec::AV1(_) => ("V_AV1", codec_private), VideoCodec::H264(_) => { - let avcc = codec_private.context("H.264 track missing AVCDecoderConfigurationRecord")?; + let avcc = codec_private.ok_or(Error::MissingH264Avcc)?; ("V_MPEG4/ISO/AVC", Some(avcc)) } VideoCodec::H265(_) => { - let hvcc = codec_private.context("H.265 track missing HEVCDecoderConfigurationRecord")?; + let hvcc = codec_private.ok_or(Error::MissingH265Hvcc)?; ("V_MPEGH/ISO/HEVC", Some(hvcc)) } - other => anyhow::bail!("MKV export does not support video codec {:?}", other), + other => return Err(Error::UnsupportedVideoExport(format!("{:?}", other)).into()), }; let mut video_children: Vec = Vec::new(); @@ -564,7 +578,7 @@ fn build_video_track_entry( Ok(MatroskaSpec::TrackEntry(Master::Full(entry))) } -fn build_audio_track_entry(track_number: u64, config: &AudioConfig) -> anyhow::Result { +fn build_audio_track_entry(track_number: u64, config: &AudioConfig) -> Result { let (codec_id, codec_private) = match &config.codec { AudioCodec::Opus => ( "A_OPUS", @@ -583,11 +597,11 @@ fn build_audio_track_entry(track_number: u64, config: &AudioConfig) -> anyhow::R config .description .as_ref() - .context("AAC track missing AudioSpecificConfig (description)")? + .ok_or(Error::MissingAacDescription)? .to_vec(), ), ), - other => anyhow::bail!("MKV export does not support audio codec {:?}", other), + other => return Err(Error::UnsupportedAudioExport(format!("{:?}", other)).into()), }; let entry = vec![ diff --git a/rs/moq-mux/src/container/mkv/import.rs b/rs/moq-mux/src/container/mkv/import.rs index 61b2a0877..73993c44a 100644 --- a/rs/moq-mux/src/container/mkv/import.rs +++ b/rs/moq-mux/src/container/mkv/import.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::convert::TryFrom; use std::io::Cursor; -use anyhow::Context; +use crate::Result; use bytes::{Buf, Bytes, BytesMut}; use hang::catalog::{AAC, AudioCodec, AudioConfig, Container, H264, H265, VP9, VideoCodec, VideoConfig}; use moq_net::Timestamp; @@ -13,6 +13,8 @@ use webm_iterable::errors::TagIteratorError; use webm_iterable::iterator::AllowableErrors; use webm_iterable::matroska_spec::{Master, MatroskaSpec, SimpleBlock}; +use super::Error; + /// Default Matroska TimestampScale: 1 ms (in nanoseconds). const DEFAULT_TIMESTAMP_SCALE_NS: u64 = 1_000_000; @@ -86,7 +88,7 @@ impl Import { } /// Decode from an asynchronous reader. Drives [`Self::decode`] in a loop. - pub async fn decode_from(&mut self, reader: &mut T) -> anyhow::Result<()> { + pub async fn decode_from(&mut self, reader: &mut T) -> Result<()> { let mut chunk = BytesMut::with_capacity(64 * 1024); loop { chunk.clear(); @@ -104,7 +106,7 @@ impl Import { /// The buffer is fully consumed on every call (data is moved into the internal /// scratch). Bytes that cannot yet form a complete top-level tag are retained /// for the next call. - pub fn decode>(&mut self, buf: &mut T) -> anyhow::Result<()> { + pub fn decode>(&mut self, buf: &mut T) -> Result<()> { // Move the input into our scratch buffer. while buf.has_remaining() { let chunk = buf.chunk(); @@ -123,7 +125,7 @@ impl Import { /// blocks). After parsing stops (UnexpectedEOF or end of buffer), bytes up to the start /// of the most-recently emitted top-level tag are discarded so memory does not grow /// unboundedly. - fn drain(&mut self) -> anyhow::Result<()> { + fn drain(&mut self) -> Result<()> { // Buffer master tags that are bounded and convenient to handle atomically. let buffered = [ MatroskaSpec::Ebml(Master::Start), @@ -160,8 +162,8 @@ impl Import { self.handle_tag(tag)?; } Some(Err(TagIteratorError::UnexpectedEOF { .. })) => break, - Some(Err(e)) => { - return Err(anyhow::Error::new(e).context("matroska parse error")); + Some(Err(_e)) => { + return Err(Error::MatroskaParse.into()); } None => { last_offset = snapshot.len(); @@ -182,7 +184,7 @@ impl Import { Ok(()) } - fn handle_tag(&mut self, tag: MatroskaSpec) -> anyhow::Result<()> { + fn handle_tag(&mut self, tag: MatroskaSpec) -> Result<()> { match tag { MatroskaSpec::Ebml(Master::Full(children)) => { self.handle_ebml(&children)?; @@ -214,7 +216,7 @@ impl Import { self.cluster_timestamp = v; } MatroskaSpec::SimpleBlock(ref data) => { - let sb = SimpleBlock::try_from(data.as_slice()).context("invalid SimpleBlock")?; + let sb = SimpleBlock::try_from(data.as_slice()).map_err(|_| Error::InvalidSimpleBlock)?; self.handle_block(sb.track, sb.timestamp, sb.keyframe, sb.raw_frame_data())?; } MatroskaSpec::BlockGroup(Master::Full(children)) => { @@ -226,19 +228,19 @@ impl Import { Ok(()) } - fn handle_ebml(&self, children: &[MatroskaSpec]) -> anyhow::Result<()> { + fn handle_ebml(&self, children: &[MatroskaSpec]) -> Result<()> { for c in children { if let MatroskaSpec::DocType(doc) = c { match doc.as_str() { "matroska" | "webm" => return Ok(()), - other => anyhow::bail!("unsupported EBML DocType: {}", other), + other => return Err(Error::UnsupportedDocType(other.to_string()).into()), } } } - anyhow::bail!("EBML header missing DocType"); + Err(Error::MissingDocType.into()) } - fn handle_tracks(&mut self, entries: Vec) -> anyhow::Result<()> { + fn handle_tracks(&mut self, entries: Vec) -> Result<()> { for entry in entries { if let MatroskaSpec::TrackEntry(Master::Full(children)) = entry { if let Err(e) = self.add_track(children) { @@ -249,7 +251,7 @@ impl Import { Ok(()) } - fn add_track(&mut self, children: Vec) -> anyhow::Result<()> { + fn add_track(&mut self, children: Vec) -> Result<()> { let mut track_number: Option = None; let mut track_type: Option = None; let mut codec_id: Option = None; @@ -269,9 +271,9 @@ impl Import { } } - let track_number = track_number.context("TrackEntry missing TrackNumber")?; - let track_type = track_type.context("TrackEntry missing TrackType")?; - let codec_id = codec_id.context("TrackEntry missing CodecID")?; + let track_number = track_number.ok_or(Error::MissingTrackNumber)?; + let track_type = track_type.ok_or(Error::MissingTrackType)?; + let codec_id = codec_id.ok_or(Error::MissingCodecId)?; // Matroska TrackType: 1 = video, 2 = audio. let (kind, suffix) = match track_type { @@ -313,7 +315,7 @@ impl Import { Ok(()) } - fn handle_block_group(&mut self, children: &[MatroskaSpec]) -> anyhow::Result<()> { + fn handle_block_group(&mut self, children: &[MatroskaSpec]) -> Result<()> { let mut block_data: Option<&[u8]> = None; let mut has_reference = false; @@ -332,13 +334,13 @@ impl Import { // `Block` has the same on-wire header as `SimpleBlock` minus the keyframe flag. // We parse it via `SimpleBlock::try_from` (which works on the raw slice) but // derive keyframe from the absence of `ReferenceBlock`. - let parsed = SimpleBlock::try_from(data).context("invalid Block payload")?; + let parsed = SimpleBlock::try_from(data).map_err(|_| Error::InvalidBlock)?; let keyframe = !has_reference; self.handle_block(parsed.track, parsed.timestamp, keyframe, parsed.raw_frame_data()) } - fn handle_block(&mut self, track_number: u64, rel_ts: i16, keyframe: bool, payload: &[u8]) -> anyhow::Result<()> { + fn handle_block(&mut self, track_number: u64, rel_ts: i16, keyframe: bool, payload: &[u8]) -> Result<()> { let Some(track) = self.tracks.get_mut(&track_number) else { // Unknown or skipped track. return Ok(()); @@ -347,7 +349,9 @@ impl Import { // Compute PTS in MKV's native nanosecond units and stamp it on the // timestamp at NANO scale so a passthrough re-emit preserves precision. let block_ticks = (self.cluster_timestamp as i64) + (rel_ts as i64); - anyhow::ensure!(block_ticks >= 0, "negative block timestamp"); + if block_ticks < 0 { + return Err(Error::NegativeBlockTimestamp.into()); + } // Skip blocks we've already emitted on a previous decode() pass (buffer replay). if let Some(last) = track.last_emitted_ticks @@ -359,7 +363,7 @@ impl Import { let pts_ns = (block_ticks as u64) .checked_mul(self.timestamp_scale_ns) - .context("timestamp overflow")?; + .ok_or(Error::TimestampOverflow)?; let timestamp = Timestamp::from_nanos(pts_ns)?; // Audio tracks: always treat as keyframes (matches fmp4 behavior). @@ -394,7 +398,7 @@ impl Import { /// /// Broadcast-wide: every track inside this MKV import advances together; per-track /// control is intentionally not exposed. - pub fn seek(&mut self, sequence: u64) -> anyhow::Result<()> { + pub fn seek(&mut self, sequence: u64) -> Result<()> { for track in self.tracks.values_mut() { track.track.seek(sequence)?; } @@ -402,7 +406,7 @@ impl Import { } /// Finish all tracks, flushing current groups. - pub fn finish(&mut self) -> anyhow::Result<()> { + pub fn finish(&mut self) -> Result<()> { for track in self.tracks.values_mut() { if let Some(mut g) = track.group.take() { g.finish()?; @@ -433,7 +437,7 @@ fn build_video_config( codec_id: &str, codec_private: Option<&Bytes>, video_children: Option<&[MatroskaSpec]>, -) -> anyhow::Result { +) -> Result { let (width, height) = video_children .map(|cs| { let mut w = None; @@ -476,7 +480,7 @@ fn build_video_config( "V_MPEG4/ISO/AVC" => build_h264_config(codec_private)?, "V_MPEGH/ISO/HEVC" => build_h265_config(codec_private)?, "V_AV1" => build_av1_config(codec_private)?, - other => anyhow::bail!("unsupported video CodecID: {}", other), + other => return Err(Error::UnsupportedVideoCodec(other.to_string()).into()), }; if config.coded_width.is_none() { @@ -493,7 +497,7 @@ fn build_audio_config( codec_id: &str, codec_private: Option<&Bytes>, audio_children: Option<&[MatroskaSpec]>, -) -> anyhow::Result { +) -> Result { let mut sample_rate: u32 = 0; let mut channels: u32 = 0; @@ -527,7 +531,10 @@ fn build_audio_config( Ok(config) } "A_AAC" => { - let priv_data = codec_private.context("A_AAC missing CodecPrivate (AudioSpecificConfig)")?; + let priv_data = codec_private.ok_or(Error::MissingCodecPrivate { + codec_id: "A_AAC", + purpose: "AudioSpecificConfig", + })?; let mut cursor = priv_data.clone(); let cfg = crate::codec::aac::Config::parse(&mut cursor)?; @@ -548,12 +555,15 @@ fn build_audio_config( config.container = Container::Legacy; Ok(config) } - other => anyhow::bail!("unsupported audio CodecID: {}", other), + other => Err(Error::UnsupportedAudioCodec(other.to_string()).into()), } } -fn build_h264_config(codec_private: Option<&Bytes>) -> anyhow::Result { - let avcc_bytes = codec_private.context("V_MPEG4/ISO/AVC missing CodecPrivate (AVCDecoderConfigurationRecord)")?; +fn build_h264_config(codec_private: Option<&Bytes>) -> Result { + let avcc_bytes = codec_private.ok_or(Error::MissingCodecPrivate { + codec_id: "V_MPEG4/ISO/AVC", + purpose: "AVCDecoderConfigurationRecord", + })?; let avcc = crate::codec::h264::Avcc::parse(avcc_bytes)?; let mut config = VideoConfig::new(H264 { @@ -569,10 +579,13 @@ fn build_h264_config(codec_private: Option<&Bytes>) -> anyhow::Result) -> anyhow::Result { - let hvcc_data = codec_private.context("V_MPEGH/ISO/HEVC missing CodecPrivate (HEVCDecoderConfigurationRecord)")?; +fn build_h265_config(codec_private: Option<&Bytes>) -> Result { + let hvcc_data = codec_private.ok_or(Error::MissingCodecPrivate { + codec_id: "V_MPEGH/ISO/HEVC", + purpose: "HEVCDecoderConfigurationRecord", + })?; let mut cursor = Cursor::new(hvcc_data.as_ref()); - let hvcc = mp4_atom::Hvcc::decode_body(&mut cursor).context("invalid HEVCDecoderConfigurationRecord")?; + let hvcc = mp4_atom::Hvcc::decode_body(&mut cursor).map_err(|_| Error::InvalidHvcc)?; let mut description = BytesMut::new(); hvcc.encode_body(&mut description)?; @@ -591,10 +604,13 @@ fn build_h265_config(codec_private: Option<&Bytes>) -> anyhow::Result) -> anyhow::Result { - let av1c_data = codec_private.context("V_AV1 missing CodecPrivate (AV1CodecConfigurationRecord)")?; +fn build_av1_config(codec_private: Option<&Bytes>) -> Result { + let av1c_data = codec_private.ok_or(Error::MissingCodecPrivate { + codec_id: "V_AV1", + purpose: "AV1CodecConfigurationRecord", + })?; let mut cursor = Cursor::new(av1c_data.as_ref()); - let av1c = mp4_atom::Av1c::decode_body(&mut cursor).context("invalid AV1CodecConfigurationRecord")?; + let av1c = mp4_atom::Av1c::decode_body(&mut cursor).map_err(|_| Error::InvalidAv1c)?; let mut description = BytesMut::new(); av1c.encode_body(&mut description)?; diff --git a/rs/moq-mux/src/container/mkv/mod.rs b/rs/moq-mux/src/container/mkv/mod.rs index 2bd85f43a..862b6c392 100644 --- a/rs/moq-mux/src/container/mkv/mod.rs +++ b/rs/moq-mux/src/container/mkv/mod.rs @@ -14,3 +14,112 @@ pub use import::*; mod export_test; #[cfg(test)] mod import_test; + +/// MKV parsing and emission errors. +#[derive(Debug, Clone, thiserror::Error)] +#[non_exhaustive] +pub enum Error { + #[error("unsupported EBML DocType: {0}")] + UnsupportedDocType(String), + + #[error("EBML header missing DocType")] + MissingDocType, + + #[error("invalid SimpleBlock")] + InvalidSimpleBlock, + + #[error("invalid Block payload")] + InvalidBlock, + + #[error("negative block timestamp")] + NegativeBlockTimestamp, + + #[error("timestamp overflow")] + TimestampOverflow, + + #[error("TrackEntry missing TrackNumber")] + MissingTrackNumber, + + #[error("TrackEntry missing TrackType")] + MissingTrackType, + + #[error("TrackEntry missing CodecID")] + MissingCodecId, + + #[error("unsupported video CodecID: {0}")] + UnsupportedVideoCodec(String), + + #[error("unsupported audio CodecID: {0}")] + UnsupportedAudioCodec(String), + + #[error("{codec_id} missing CodecPrivate ({purpose})")] + MissingCodecPrivate { + codec_id: &'static str, + purpose: &'static str, + }, + + #[error("invalid HEVCDecoderConfigurationRecord")] + InvalidHvcc, + + #[error("invalid AV1CodecConfigurationRecord")] + InvalidAv1c, + + #[error("MKV track layout changed after header was emitted: track '{0}' added")] + HeaderAddedTrack(String), + + #[error("MKV track layout changed after header was emitted: track '{0}' removed")] + HeaderRemovedTrack(String), + + #[error("MKV export does not support CMAF {kind} track '{name}'")] + UnsupportedCmafTrack { kind: String, name: String }, + + #[error("MKV export does not support video codec {0}")] + UnsupportedVideoExport(String), + + #[error("MKV export does not support audio codec {0}")] + UnsupportedAudioExport(String), + + #[error("AAC track missing AudioSpecificConfig (description)")] + MissingAacDescription, + + #[error("H.264 track missing AVCDecoderConfigurationRecord")] + MissingH264Avcc, + + #[error("H.265 track missing HEVCDecoderConfigurationRecord")] + MissingH265Hvcc, + + #[error("cluster underflow")] + ClusterUnderflow, + + #[error("block timestamp doesn't fit in i16")] + BlockTimestampOverflow, + + #[error("missing track")] + MissingTrack, + + #[error("timestamp doesn't fit in u64 ms")] + TimestampU64, + + #[error("video track {0} missing in tracks map")] + MissingVideoTrack(String), + + #[error("audio track {0} missing in tracks map")] + MissingAudioTrack(String), + + #[error("no catalog snapshot")] + NoCatalogSnapshot, + + #[error("matroska parse error")] + MatroskaParse, + + #[error("matroska write error: {0}")] + MatroskaWrite(std::sync::Arc), +} + +impl From for Error { + fn from(err: webm_iterable::errors::TagWriterError) -> Self { + Error::MatroskaWrite(std::sync::Arc::new(err)) + } +} + +pub type Result = std::result::Result; diff --git a/rs/moq-mux/src/container/source.rs b/rs/moq-mux/src/container/source.rs index 9363d6ee5..9644b932b 100644 --- a/rs/moq-mux/src/container/source.rs +++ b/rs/moq-mux/src/container/source.rs @@ -50,7 +50,7 @@ impl CatalogSource { }) } - pub(crate) fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { + pub(crate) fn poll_next(&mut self, waiter: &conducer::Waiter) -> Poll>> { match self { Self::Hang(c) => c.poll_next(waiter).map_err(Into::into), Self::Msf(c) => c.poll_next(waiter), @@ -72,10 +72,10 @@ impl VideoTransform { } } - fn transform(&mut self, payload: Bytes) -> anyhow::Result> { + fn transform(&mut self, payload: Bytes) -> crate::Result> { match self { - VideoTransform::Avc1(t) => t.transform(payload), - VideoTransform::Hvc1(t) => t.transform(payload), + VideoTransform::Avc1(t) => Ok(t.transform(payload)?), + VideoTransform::Hvc1(t) => Ok(t.transform(payload)?), } } } @@ -150,12 +150,12 @@ impl ExportSource { /// Parameter-only frames (SPS/PPS-only inputs to the Avc3 transform) are /// absorbed and the next frame is polled. Returns `Ready(None)` at /// end-of-track. - pub fn poll_read(&mut self, waiter: &conducer::Waiter) -> Poll>> { + pub fn poll_read(&mut self, waiter: &conducer::Waiter) -> Poll>> { loop { let frame = match self.consumer.poll_read(waiter) { Poll::Ready(Ok(Some(f))) => f, Poll::Ready(Ok(None)) => return Poll::Ready(Ok(None)), - Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())), + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), Poll::Pending => return Poll::Pending, }; diff --git a/rs/moq-mux/src/error.rs b/rs/moq-mux/src/error.rs index 3961f89b6..89bf7caca 100644 --- a/rs/moq-mux/src/error.rs +++ b/rs/moq-mux/src/error.rs @@ -1,9 +1,10 @@ /// Errors from moq-mux operations. /// -/// Most variants are simple delegations to underlying layers — [`moq_net::Error`] for -/// transport / pub-sub failures, [`hang::Error`] for catalog/codec parsing, and -/// [`fmp4::Error`](crate::container::fmp4::Error) for CMAF wire-format problems. -#[derive(Debug, thiserror::Error)] +/// Most variants are delegations to underlying layers — [`moq_net::Error`] for +/// transport / pub-sub failures, [`hang::Error`] for catalog/codec parsing, the +/// per-format Errors for container shape problems, and the per-codec Errors for +/// bitstream parsing problems. +#[derive(Debug, Clone, thiserror::Error)] #[non_exhaustive] pub enum Error { /// Error from the underlying moq-net transport. @@ -18,9 +19,85 @@ pub enum Error { #[error("cmaf: {0}")] Cmaf(#[from] crate::container::fmp4::Error), + /// Error parsing or building MKV / WebM streams. + #[error("mkv: {0}")] + Mkv(#[from] crate::container::mkv::Error), + + /// Error during HLS ingest. + #[error("hls: {0}")] + Hls(#[from] crate::container::hls::Error), + + /// Error decoding the MSF catalog. + #[error("msf: {0}")] + Msf(#[from] crate::catalog::msf::Error), + /// Error parsing or building LOC frames. #[error("loc: {0}")] Loc(#[from] moq_loc::Error), + + /// Error parsing an Annex B NAL stream. + #[error("annexb: {0}")] + Annexb(#[from] crate::codec::annexb::Error), + + /// Error parsing AAC. + #[error("aac: {0}")] + Aac(#[from] crate::codec::aac::Error), + + /// Error parsing Opus. + #[error("opus: {0}")] + Opus(#[from] crate::codec::opus::Error), + + /// Error parsing H.264. + #[error("h264: {0}")] + H264(#[from] crate::codec::h264::Error), + + /// Error parsing H.265. + #[error("h265: {0}")] + H265(#[from] crate::codec::h265::Error), + + /// Error parsing AV1. + #[error("av1: {0}")] + Av1(#[from] crate::codec::av1::Error), + + /// Timestamp overflow when converting between timescales. + #[error("timestamp overflow")] + TimestampOverflow(#[from] moq_net::TimeOverflow), + + /// Error decoding or encoding an mp4 atom. + #[error("mp4: {0}")] + Mp4(std::sync::Arc), + + /// I/O error. + #[error("io: {0}")] + Io(std::sync::Arc), + + /// URL parse error. + #[error("url: {0}")] + Url(#[from] url::ParseError), + + /// Unknown media format. + #[error("unknown format: {0}")] + UnknownFormat(String), + + /// Buffer was not fully consumed. + #[error("buffer was not fully consumed")] + BufferNotConsumed, + + /// Importer dispatcher cannot return a single track for multi-track containers. + #[error("{0} can contain multiple tracks")] + MultipleTracks(&'static str), +} + +impl From for Error { + fn from(err: mp4_atom::Error) -> Self { + Error::Mp4(std::sync::Arc::new(err)) + } +} + +impl From for Error { + fn from(err: std::io::Error) -> Self { + Error::Io(std::sync::Arc::new(err)) + } } /// A Result type alias for moq-mux operations. diff --git a/rs/moq-mux/src/import.rs b/rs/moq-mux/src/import.rs index 80bd3b0a5..ef0af0bdc 100644 --- a/rs/moq-mux/src/import.rs +++ b/rs/moq-mux/src/import.rs @@ -10,9 +10,9 @@ use std::{fmt, str::FromStr}; -use anyhow::Context; use bytes::Buf; -use hang::Error; + +use crate::Result; /// The supported framed formats (known frame boundaries). #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -37,9 +37,9 @@ pub enum FramedFormat { } impl FromStr for FramedFormat { - type Err = Error; + type Err = crate::Error; - fn from_str(s: &str) -> Result { + fn from_str(s: &str) -> std::result::Result { match s { "avc1" | "avcc" => Ok(FramedFormat::Avc1), "avc3" | "h264" => Ok(FramedFormat::Avc3), @@ -49,7 +49,7 @@ impl FromStr for FramedFormat { "aac" => Ok(FramedFormat::Aac), "opus" => Ok(FramedFormat::Opus), "mkv" | "webm" | "matroska" => Ok(FramedFormat::Mkv), - _ => Err(Error::UnknownFormat(s.to_string())), + _ => Err(crate::Error::UnknownFormat(s.to_string())), } } } @@ -111,7 +111,7 @@ impl Framed { catalog: crate::catalog::hang::Producer, format: FramedFormat, buf: &mut T, - ) -> anyhow::Result { + ) -> Result { use crate::codec::h264::Mode as H264Mode; let decoder = match format { FramedFormat::Avc1 => { @@ -154,13 +154,15 @@ impl Framed { } }; - anyhow::ensure!(!buf.has_remaining(), "buffer was not fully consumed"); + if buf.has_remaining() { + return Err(crate::Error::BufferNotConsumed); + } Ok(Self { decoder }) } /// Finish the decoder, flushing any buffered data. - pub fn finish(&mut self) -> anyhow::Result<()> { + pub fn finish(&mut self) -> Result<()> { match self.decoder { FramedKind::H264(ref mut decoder) => decoder.finish(), FramedKind::Fmp4(ref mut decoder) => decoder.finish(), @@ -173,7 +175,7 @@ impl Framed { } /// Close the current group and open the next one at `sequence`. - pub fn seek(&mut self, sequence: u64) -> anyhow::Result<()> { + pub fn seek(&mut self, sequence: u64) -> Result<()> { match self.decoder { FramedKind::H264(ref mut decoder) => decoder.seek(sequence), FramedKind::Fmp4(ref mut decoder) => decoder.seek(sequence), @@ -186,24 +188,22 @@ impl Framed { } /// Return the single track produced by this importer. - pub fn track(&self) -> anyhow::Result<&moq_net::TrackProducer> { + pub fn track(&self) -> Result<&moq_net::TrackProducer> { match self.decoder { - FramedKind::H264(ref decoder) => decoder.track().context("H.264 track not yet created"), - FramedKind::Fmp4(_) => anyhow::bail!("fmp4 can contain multiple tracks"), + FramedKind::H264(ref decoder) => decoder + .track() + .ok_or_else(|| crate::codec::h264::Error::Avc3TrackNotCreated.into()), + FramedKind::Fmp4(_) => Err(crate::Error::MultipleTracks("fmp4")), FramedKind::Hev1(ref decoder) => decoder.track(), FramedKind::Av01(ref decoder) => decoder.track(), FramedKind::Aac(ref decoder) => Ok(decoder.track()), FramedKind::Opus(ref decoder) => Ok(decoder.track()), - FramedKind::Mkv(_) => anyhow::bail!("mkv can contain multiple tracks"), + FramedKind::Mkv(_) => Err(crate::Error::MultipleTracks("mkv")), } } /// Decode a frame from the given buffer. - pub fn decode_frame>( - &mut self, - buf: &mut T, - pts: Option, - ) -> anyhow::Result<()> { + pub fn decode_frame>(&mut self, buf: &mut T, pts: Option) -> Result<()> { match self.decoder { FramedKind::H264(ref mut decoder) => decoder.decode_frame(buf, pts)?, FramedKind::Fmp4(ref mut decoder) => decoder.decode(buf)?, @@ -217,7 +217,9 @@ impl Framed { } } - anyhow::ensure!(!buf.has_remaining(), "buffer was not fully consumed"); + if buf.has_remaining() { + return Err(crate::Error::BufferNotConsumed); + } Ok(()) } @@ -261,16 +263,16 @@ pub enum StreamFormat { } impl FromStr for StreamFormat { - type Err = Error; + type Err = crate::Error; - fn from_str(s: &str) -> Result { + fn from_str(s: &str) -> std::result::Result { match s { "avc3" | "h264" => Ok(StreamFormat::Avc3), "hev1" => Ok(StreamFormat::Hev1), "fmp4" | "cmaf" => Ok(StreamFormat::Fmp4), "av01" | "av1" | "av1c" | "av1C" => Ok(StreamFormat::Av01), "mkv" | "webm" | "matroska" => Ok(StreamFormat::Mkv), - _ => Err(Error::UnknownFormat(s.to_string())), + _ => Err(crate::Error::UnknownFormat(s.to_string())), } } } @@ -312,7 +314,7 @@ impl Stream { broadcast: moq_net::BroadcastProducer, catalog: crate::catalog::hang::Producer, format: StreamFormat, - ) -> anyhow::Result { + ) -> Result { use crate::codec::h264::Mode as H264Mode; let decoder = match format { StreamFormat::Avc3 => { @@ -332,7 +334,7 @@ impl Stream { /// This is not required for self-describing formats like fMP4 or AVC3. /// /// The buffer will be fully consumed, or an error will be returned. - pub fn initialize>(&mut self, buf: &mut T) -> anyhow::Result<()> { + pub fn initialize>(&mut self, buf: &mut T) -> Result<()> { match self.decoder { StreamKind::Avc3(ref mut decoder) => decoder.initialize(buf)?, StreamKind::Fmp4(ref mut decoder) => decoder.decode(buf)?, @@ -341,13 +343,15 @@ impl Stream { StreamKind::Mkv(ref mut decoder) => decoder.decode(buf)?, } - anyhow::ensure!(!buf.has_remaining(), "buffer was not fully consumed"); + if buf.has_remaining() { + return Err(crate::Error::BufferNotConsumed); + } Ok(()) } /// Decode a stream of data from the given buffer. - pub fn decode_stream>(&mut self, buf: &mut T) -> anyhow::Result<()> { + pub fn decode_stream>(&mut self, buf: &mut T) -> Result<()> { match self.decoder { StreamKind::Avc3(ref mut decoder) => decoder.decode_stream(buf, None), StreamKind::Fmp4(ref mut decoder) => decoder.decode(buf), @@ -358,7 +362,7 @@ impl Stream { } /// Finish the decoder, flushing any buffered data. - pub fn finish(&mut self) -> anyhow::Result<()> { + pub fn finish(&mut self) -> Result<()> { match self.decoder { StreamKind::Avc3(ref mut decoder) => decoder.finish(), StreamKind::Fmp4(ref mut decoder) => decoder.finish(), @@ -369,7 +373,7 @@ impl Stream { } /// Close the current group and open the next one at `sequence`. - pub fn seek(&mut self, sequence: u64) -> anyhow::Result<()> { + pub fn seek(&mut self, sequence: u64) -> Result<()> { match self.decoder { StreamKind::Avc3(ref mut decoder) => decoder.seek(sequence), StreamKind::Fmp4(ref mut decoder) => decoder.seek(sequence), diff --git a/swift/justfile b/swift/justfile index 4a011add9..e503385cc 100644 --- a/swift/justfile +++ b/swift/justfile @@ -9,12 +9,14 @@ default: just check # Build moq-ffi for the host, regenerate uniffi bindings, run swift test. + # Skips cleanly on non-macOS hosts. check: bash scripts/check.sh # Assemble the XCFramework + Package.swift from per-target moq-ffi # binaries + bindings. Used by .github/workflows/release-swift.yml; see + # swift/README.md for the expected --lib-dir layout. package *args: bash scripts/package.sh {{ args }} @@ -22,6 +24,7 @@ package *args: # Full Swift CI: `check` already builds moq-ffi and runs swift test. # Takes a newline-separated list of changed files; skips if FILES is # non-empty and none match the Swift scope. Run `just swift ci` (no + # FILES) to force-run. ci FILES="": #!/usr/bin/env bash