diff --git a/Cargo.lock b/Cargo.lock index fc0a87e12..df3e9c4be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3724,6 +3724,7 @@ name = "moq-loc" version = "0.1.0" dependencies = [ "bytes", + "moq-net", "thiserror 2.0.18", ] diff --git a/rs/hang/examples/video.rs b/rs/hang/examples/video.rs index b3a76989c..833da8fa7 100644 --- a/rs/hang/examples/video.rs +++ b/rs/hang/examples/video.rs @@ -103,7 +103,7 @@ async fn run_broadcast(origin: moq_net::OriginProducer) -> anyhow::Result<()> { // Not real frames of course. The first frame is a keyframe and starts the first group. let frame = moq_mux::container::Frame { - timestamp: moq_mux::container::Timestamp::from_secs(1).unwrap(), + timestamp: moq_net::Timestamp::from_secs(1).unwrap(), payload: Bytes::from_static(b"keyframe NAL data"), keyframe: true, }; @@ -112,7 +112,7 @@ async fn run_broadcast(origin: moq_net::OriginProducer) -> anyhow::Result<()> { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; let frame = moq_mux::container::Frame { - timestamp: moq_mux::container::Timestamp::from_secs(2).unwrap(), + timestamp: moq_net::Timestamp::from_secs(2).unwrap(), payload: Bytes::from_static(b"delta NAL data"), keyframe: false, }; @@ -122,7 +122,7 @@ async fn run_broadcast(origin: moq_net::OriginProducer) -> anyhow::Result<()> { // Marking this frame as a keyframe closes the current group and starts a new one. let frame = moq_mux::container::Frame { - timestamp: moq_mux::container::Timestamp::from_secs(3).unwrap(), + timestamp: moq_net::Timestamp::from_secs(3).unwrap(), payload: Bytes::from_static(b"keyframe NAL data"), keyframe: true, }; diff --git a/rs/hang/src/catalog/audio/mod.rs b/rs/hang/src/catalog/audio/mod.rs index 4a7d0767b..e170da503 100644 --- a/rs/hang/src/catalog/audio/mod.rs +++ b/rs/hang/src/catalog/audio/mod.rs @@ -9,7 +9,7 @@ use std::collections::{BTreeMap, btree_map}; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use serde_with::{DisplayFromStr, hex::Hex}; +use serde_with::{DisplayFromStr, DurationMilliSeconds, hex::Hex}; use crate::catalog::Container; @@ -92,10 +92,13 @@ pub struct AudioConfig { /// The player's jitter buffer should be larger than this value. /// If not provided, the player should assume each frame is flushed immediately. /// + /// Serialized as an integer number of milliseconds (sub-ms precision is truncated). + /// /// NOTE: The audio "frame" duration depends on the codec, sample rate, etc. /// ex: AAC often uses 1024 samples per frame, so at 44100Hz, this would be 1024/44100 = 23ms + #[serde_as(as = "Option>")] #[serde(default)] - pub jitter: Option, + pub jitter: Option, } impl AudioConfig { diff --git a/rs/hang/src/catalog/root.rs b/rs/hang/src/catalog/root.rs index 22e5925b0..578233ecc 100644 --- a/rs/hang/src/catalog/root.rs +++ b/rs/hang/src/catalog/root.rs @@ -164,4 +164,92 @@ mod test { let output = decoded.to_string().expect("failed to encode"); assert_eq!(encoded, output, "wrong encoded output"); } + + /// Lock in the on-wire shape of the jitter field: a bare integer number + /// of milliseconds. If `Option` ever loses the `duration_millis` + /// serde adapter, this regresses to serde's default `{secs, nanos}` shape. + #[test] + fn jitter_serialized_as_millis() { + let mut encoded = r#"{ + "video": { + "renditions": { + "video": { + "codec": "avc1.64001f", + "container": {"kind": "legacy"}, + "jitter": 100 + } + } + }, + "audio": { + "renditions": { + "audio": { + "codec": "opus", + "sampleRate": 48000, + "numberOfChannels": 2, + "container": {"kind": "legacy"}, + "jitter": 40 + } + } + } + }"# + .to_string(); + encoded.retain(|c| !c.is_whitespace()); + + let mut video_renditions = BTreeMap::new(); + video_renditions.insert( + "video".to_string(), + VideoConfig { + codec: H264 { + profile: 0x64, + constraints: 0x00, + level: 0x1f, + inline: false, + } + .into(), + description: None, + coded_width: None, + coded_height: None, + display_ratio_width: None, + display_ratio_height: None, + bitrate: None, + framerate: None, + optimize_for_latency: None, + container: Container::Legacy, + jitter: Some(std::time::Duration::from_millis(100)), + }, + ); + + let mut audio_renditions = BTreeMap::new(); + audio_renditions.insert( + "audio".to_string(), + AudioConfig { + codec: Opus, + sample_rate: 48_000, + channel_count: 2, + bitrate: None, + description: None, + container: Container::Legacy, + jitter: Some(std::time::Duration::from_millis(40)), + }, + ); + + let catalog = Catalog { + video: Video { + renditions: video_renditions, + display: None, + rotation: None, + flip: None, + }, + audio: Audio { + renditions: audio_renditions, + }, + ..Default::default() + }; + + let decoded = Catalog::from_str(&encoded).expect("failed to decode"); + assert_eq!(catalog, decoded, "decode mismatch"); + + let output = catalog.to_string().expect("failed to encode"); + assert_eq!(encoded, output, "encode mismatch"); + } } diff --git a/rs/hang/src/catalog/video/mod.rs b/rs/hang/src/catalog/video/mod.rs index 0e7860fc0..0db581a25 100644 --- a/rs/hang/src/catalog/video/mod.rs +++ b/rs/hang/src/catalog/video/mod.rs @@ -14,7 +14,7 @@ use std::collections::{BTreeMap, btree_map}; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use serde_with::{DisplayFromStr, hex::Hex}; +use serde_with::{DisplayFromStr, DurationMilliSeconds, hex::Hex}; use crate::catalog::Container; @@ -141,12 +141,15 @@ pub struct VideoConfig { /// The player's jitter buffer should be larger than this value. /// If not provided, the player should assume each frame is flushed immediately. /// + /// Serialized as an integer number of milliseconds (sub-ms precision is truncated). + /// /// ex: /// - If each frame is flushed immediately, this would be 1000/fps. /// - If there can be up to 3 b-frames in a row, this would be 3 * 1000/fps. /// - If frames are buffered into 2s segments, this would be 2s. + #[serde_as(as = "Option>")] #[serde(default)] - pub jitter: Option, + pub jitter: Option, } impl VideoConfig { diff --git a/rs/hang/src/container/frame.rs b/rs/hang/src/container/frame.rs index 6cbd2cd93..fbc8ca127 100644 --- a/rs/hang/src/container/frame.rs +++ b/rs/hang/src/container/frame.rs @@ -1,9 +1,16 @@ use bytes::{Buf, Bytes, BytesMut}; use derive_more::Debug; +use moq_net::VarInt; use crate::Error; -pub type Timestamp = moq_net::Timescale<1_000_000>; +pub use moq_net::{Timescale, Timestamp}; + +/// Canonical timescale for the hang legacy wire format: microseconds. +/// +/// The legacy container's on-wire timestamp is a single VarInt with no scale tag, +/// so encoders normalize to this scale and decoders attach it. +pub const TIMESCALE: Timescale = Timescale::MICRO; /// A media frame with a timestamp and codec-specific payload. /// @@ -30,9 +37,16 @@ pub struct Frame { impl Frame { /// Encode the frame to the given group as a single moq-lite frame: /// VarInt timestamp prefix followed by the raw codec payload. + /// + /// The timestamp is normalized to [`TIMESCALE`] (microseconds) on the wire so + /// peers using a different source scale (e.g. nanoseconds from MKV) can decode + /// without knowing the producer's internal scale. pub fn encode(&self, group: &mut moq_net::GroupProducer) -> Result<(), Error> { + let timestamp = self.timestamp.convert(TIMESCALE)?; + let value = VarInt::try_from(timestamp.value()).map_err(moq_net::Error::from)?; + let mut header = BytesMut::new(); - self.timestamp.encode(&mut header).map_err(moq_net::Error::from)?; + value.encode_quic(&mut header).map_err(moq_net::Error::from)?; let size = header.len() + self.payload.len(); @@ -45,8 +59,12 @@ impl Frame { } /// Decode a frame from raw bytes (VarInt timestamp prefix + payload). + /// + /// Attaches [`TIMESCALE`] (microseconds) to the decoded timestamp, matching what + /// [`Self::encode`] writes. pub fn decode(mut buf: impl Buf) -> Result { - let timestamp = Timestamp::decode(&mut buf)?; + let value: u64 = VarInt::decode_quic(&mut buf).map_err(moq_net::Error::from)?.into(); + let timestamp = Timestamp::new(value, TIMESCALE)?; let payload = buf.copy_to_bytes(buf.remaining()); Ok(Self { timestamp, payload }) diff --git a/rs/moq-audio/src/producer.rs b/rs/moq-audio/src/producer.rs index d0f8d8652..8a5b96c2e 100644 --- a/rs/moq-audio/src/producer.rs +++ b/rs/moq-audio/src/producer.rs @@ -2,7 +2,8 @@ use bytes::Bytes; -use moq_mux::container::{Frame as MuxFrame, Timestamp}; +use moq_mux::container::Frame as MuxFrame; +use moq_net::Timestamp; use crate::codec::{Encoder, EncoderInput, EncoderOutput}; use crate::resample::Resampler; diff --git a/rs/moq-gst/src/source/imp.rs b/rs/moq-gst/src/source/imp.rs index 013ec73a9..b1985904e 100644 --- a/rs/moq-gst/src/source/imp.rs +++ b/rs/moq-gst/src/source/imp.rs @@ -525,10 +525,13 @@ async fn run_track_pump( let buffer_mut = buffer.get_mut().unwrap(); let pts = match reference_ts { - Some(reference) => { - let delta: Duration = (timestamp - reference).into(); - gst::ClockTime::from_nseconds(delta.as_nanos() as u64) - } + Some(reference) => match timestamp.checked_sub(reference) { + Ok(delta) => { + let d: Duration = delta.into(); + gst::ClockTime::from_nseconds(d.as_nanos() as u64) + } + Err(_) => gst::ClockTime::ZERO, + }, None => { reference_ts = Some(timestamp); gst::ClockTime::ZERO diff --git a/rs/moq-loc/Cargo.toml b/rs/moq-loc/Cargo.toml index 221755f9d..22a8f261a 100644 --- a/rs/moq-loc/Cargo.toml +++ b/rs/moq-loc/Cargo.toml @@ -17,4 +17,5 @@ doctest = false [dependencies] bytes = "1" +moq-net = { workspace = true } thiserror = "2" diff --git a/rs/moq-loc/src/lib.rs b/rs/moq-loc/src/lib.rs index efde0b057..5ac039b9b 100644 --- a/rs/moq-loc/src/lib.rs +++ b/rs/moq-loc/src/lib.rs @@ -22,18 +22,15 @@ //! encode. Public properties are not handled here. They belong in the MoQ //! object header and are stripped by the transport layer. //! -//! Varint encoding is QUIC-style throughout, matching the rest of the moq -//! stack. +//! Varint encoding is QUIC-style throughout via [`moq_net::VarInt`]. use bytes::{Buf, Bytes, BytesMut}; +use moq_net::{BoundsExceeded, DecodeError, EncodeError, VarInt}; /// Property IDs recognized by this implementation. const PROP_TIMESTAMP: u64 = 0x06; const PROP_TIMESCALE: u64 = 0x08; -/// Maximum value representable as a 62-bit QUIC varint. -const VARINT_MAX: u64 = (1u64 << 62) - 1; - /// A decoded LOC frame. #[derive(Clone, Debug)] pub struct Frame { @@ -67,8 +64,29 @@ pub enum Error { ShortBuffer, /// A value exceeds the 62-bit varint range. - #[error("value out of range")] - OutOfRange, + #[error("value out of range: {0}")] + OutOfRange(#[from] BoundsExceeded), +} + +// DecodeError / EncodeError intentionally collapse into ShortBuffer vs the +// caller's catch-all variant, so they stay as manual From impls; #[from] can't +// express that mapping. +impl From for Error { + fn from(err: DecodeError) -> Self { + match err { + DecodeError::Short => Self::ShortBuffer, + _ => Self::MalformedProperties, + } + } +} + +impl From for Error { + fn from(err: EncodeError) -> Self { + match err { + EncodeError::Short => Self::ShortBuffer, + _ => Self::OutOfRange(BoundsExceeded), + } + } } /// Decode a LOC frame. @@ -76,7 +94,7 @@ pub enum Error { /// Consumes the properties_length prefix, walks the bounded property block, /// and returns the remainder as `payload`. pub fn decode(mut buf: Bytes) -> Result { - let properties_length = read_varint(&mut buf)?; + let properties_length: u64 = VarInt::decode_quic(&mut buf)?.into(); let properties_length: usize = properties_length.try_into().map_err(|_| Error::MalformedProperties)?; if properties_length > buf.remaining() { @@ -91,7 +109,7 @@ pub fn decode(mut buf: Bytes) -> Result { let mut first = true; while props.has_remaining() { - let delta = read_varint(&mut props)?; + let delta: u64 = VarInt::decode_quic(&mut props)?.into(); let abs = if first { first = false; delta @@ -101,7 +119,7 @@ pub fn decode(mut buf: Bytes) -> Result { prev_type = abs; if abs % 2 == 0 { - let value = read_varint(&mut props)?; + let value: u64 = VarInt::decode_quic(&mut props)?.into(); match abs { PROP_TIMESTAMP => timestamp = Some(value), PROP_TIMESCALE => { @@ -113,7 +131,7 @@ pub fn decode(mut buf: Bytes) -> Result { _ => {} } } else { - let len = read_varint(&mut props)?; + let len: u64 = VarInt::decode_quic(&mut props)?.into(); let len: usize = len.try_into().map_err(|_| Error::MalformedProperties)?; if len > props.remaining() { return Err(Error::MalformedProperties); @@ -139,87 +157,26 @@ pub fn decode(mut buf: Bytes) -> Result { /// catalog timescale to interpret `timestamp`. pub fn encode(timestamp: u64, payload: &[u8]) -> Result { let mut props = BytesMut::with_capacity(16); - write_varint(&mut props, PROP_TIMESTAMP)?; - write_varint(&mut props, timestamp)?; + VarInt::try_from(PROP_TIMESTAMP)?.encode_quic(&mut props)?; + VarInt::try_from(timestamp)?.encode_quic(&mut props)?; let mut out = BytesMut::with_capacity(props.len() + payload.len() + 8); - write_varint(&mut out, props.len() as u64)?; + VarInt::try_from(props.len() as u64)?.encode_quic(&mut out)?; out.extend_from_slice(&props); out.extend_from_slice(payload); Ok(out.freeze()) } -/// Decode a QUIC-style varint (2-bit length tag in top bits). -fn read_varint(buf: &mut B) -> Result { - if !buf.has_remaining() { - return Err(Error::ShortBuffer); - } - let b = buf.get_u8(); - let tag = b >> 6; - let mut bytes = [0u8; 8]; - bytes[0] = b & 0b0011_1111; - let value = match tag { - 0b00 => u64::from(bytes[0]), - 0b01 => { - if buf.remaining() < 1 { - return Err(Error::ShortBuffer); - } - buf.copy_to_slice(&mut bytes[1..2]); - u64::from(u16::from_be_bytes(bytes[..2].try_into().unwrap())) - } - 0b10 => { - if buf.remaining() < 3 { - return Err(Error::ShortBuffer); - } - buf.copy_to_slice(&mut bytes[1..4]); - u64::from(u32::from_be_bytes(bytes[..4].try_into().unwrap())) - } - 0b11 => { - if buf.remaining() < 7 { - return Err(Error::ShortBuffer); - } - buf.copy_to_slice(&mut bytes[1..8]); - u64::from_be_bytes(bytes) - } - _ => unreachable!(), - }; - Ok(value) -} - -/// Encode a QUIC-style varint (2-bit length tag in top bits). -fn write_varint(buf: &mut B, value: u64) -> Result<(), Error> { - if value > VARINT_MAX { - return Err(Error::OutOfRange); - } - if value < (1u64 << 6) { - if buf.remaining_mut() < 1 { - return Err(Error::ShortBuffer); - } - buf.put_u8(value as u8); - } else if value < (1u64 << 14) { - if buf.remaining_mut() < 2 { - return Err(Error::ShortBuffer); - } - buf.put_u16(value as u16 | 0b01 << 14); - } else if value < (1u64 << 30) { - if buf.remaining_mut() < 4 { - return Err(Error::ShortBuffer); - } - buf.put_u32(value as u32 | 0b10 << 30); - } else { - if buf.remaining_mut() < 8 { - return Err(Error::ShortBuffer); - } - buf.put_u64(value | 0b11 << 62); - } - Ok(()) -} - #[cfg(test)] mod tests { use super::*; + /// Test helper: write a u64 as a QUIC varint into `buf`. + fn write_varint(buf: &mut BytesMut, value: u64) { + VarInt::try_from(value).unwrap().encode_quic(buf).unwrap(); + } + #[test] fn roundtrip() { let payload = Bytes::from_static(b"hello world"); @@ -235,13 +192,13 @@ mod tests { fn decode_per_frame_timescale() { // Manually craft: properties = [delta=0x06 timestamp=96000, delta=0x02 (abs=0x08) timescale=48000] let mut props = BytesMut::new(); - write_varint(&mut props, PROP_TIMESTAMP).unwrap(); - write_varint(&mut props, 96_000).unwrap(); - write_varint(&mut props, PROP_TIMESCALE - PROP_TIMESTAMP).unwrap(); // delta = 2 - write_varint(&mut props, 48_000).unwrap(); + write_varint(&mut props, PROP_TIMESTAMP); + write_varint(&mut props, 96_000); + write_varint(&mut props, PROP_TIMESCALE - PROP_TIMESTAMP); // delta = 2 + write_varint(&mut props, 48_000); let mut frame = BytesMut::new(); - write_varint(&mut frame, props.len() as u64).unwrap(); + write_varint(&mut frame, props.len() as u64); frame.extend_from_slice(&props); frame.extend_from_slice(b"payload"); @@ -255,14 +212,14 @@ mod tests { fn decode_skips_video_config() { // properties = [delta=0x06 timestamp=10, delta=0x07 (abs=0x0d, video config) bytes=[1,2,3]] let mut props = BytesMut::new(); - write_varint(&mut props, PROP_TIMESTAMP).unwrap(); - write_varint(&mut props, 10).unwrap(); - write_varint(&mut props, 0x0d - PROP_TIMESTAMP).unwrap(); // delta = 7 -> abs 0x0d (Video Config) - write_varint(&mut props, 3).unwrap(); // length + write_varint(&mut props, PROP_TIMESTAMP); + write_varint(&mut props, 10); + write_varint(&mut props, 0x0d - PROP_TIMESTAMP); // delta = 7 -> abs 0x0d (Video Config) + write_varint(&mut props, 3); // length props.extend_from_slice(&[0x01, 0x02, 0x03]); let mut frame = BytesMut::new(); - write_varint(&mut frame, props.len() as u64).unwrap(); + write_varint(&mut frame, props.len() as u64); frame.extend_from_slice(&props); frame.extend_from_slice(b"data"); @@ -276,11 +233,11 @@ mod tests { fn decode_missing_timestamp_errors() { // properties = [delta=0x08 timescale=1000], no timestamp let mut props = BytesMut::new(); - write_varint(&mut props, PROP_TIMESCALE).unwrap(); - write_varint(&mut props, 1000).unwrap(); + write_varint(&mut props, PROP_TIMESCALE); + write_varint(&mut props, 1000); let mut frame = BytesMut::new(); - write_varint(&mut frame, props.len() as u64).unwrap(); + write_varint(&mut frame, props.len() as u64); frame.extend_from_slice(&props); frame.extend_from_slice(b"x"); @@ -290,7 +247,7 @@ mod tests { #[test] fn decode_empty_properties_errors() { let mut frame = BytesMut::new(); - write_varint(&mut frame, 0).unwrap(); + write_varint(&mut frame, 0); frame.extend_from_slice(b"payload"); assert!(matches!(decode(frame.freeze()), Err(Error::MissingTimestamp))); @@ -300,13 +257,13 @@ mod tests { fn decode_rejects_zero_timescale() { // Per-frame 0x08 timescale of 0 is invalid (would divide by zero). let mut props = BytesMut::new(); - write_varint(&mut props, PROP_TIMESTAMP).unwrap(); - write_varint(&mut props, 10).unwrap(); - write_varint(&mut props, PROP_TIMESCALE - PROP_TIMESTAMP).unwrap(); - write_varint(&mut props, 0).unwrap(); + write_varint(&mut props, PROP_TIMESTAMP); + write_varint(&mut props, 10); + write_varint(&mut props, PROP_TIMESCALE - PROP_TIMESTAMP); + write_varint(&mut props, 0); let mut frame = BytesMut::new(); - write_varint(&mut frame, props.len() as u64).unwrap(); + write_varint(&mut frame, props.len() as u64); frame.extend_from_slice(&props); frame.extend_from_slice(b"x"); @@ -316,7 +273,7 @@ mod tests { #[test] fn decode_overflowing_properties_length_errors() { let mut frame = BytesMut::new(); - write_varint(&mut frame, 100).unwrap(); // claims 100 bytes of properties + write_varint(&mut frame, 100); // claims 100 bytes of properties frame.extend_from_slice(&[0x06]); // only 1 byte follows assert!(matches!(decode(frame.freeze()), Err(Error::MalformedProperties))); diff --git a/rs/moq-msf/src/lib.rs b/rs/moq-msf/src/lib.rs index 196bc6952..a298a3846 100644 --- a/rs/moq-msf/src/lib.rs +++ b/rs/moq-msf/src/lib.rs @@ -10,8 +10,10 @@ use std::fmt; use std::str::FromStr; +use std::time::Duration; use serde::{Deserialize, Serialize}; +use serde_with::DurationMilliSeconds; /// The default track name for the MSF catalog. pub const DEFAULT_NAME: &str = "catalog"; @@ -35,6 +37,7 @@ pub struct Catalog { /// then assign whichever optional fields they need; struct-literal /// construction (with or without `..base`) is not available outside this /// crate. +#[serde_with::serde_as] #[serde_with::skip_serializing_none] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] #[serde(rename_all = "camelCase")] @@ -94,8 +97,12 @@ pub struct Track { #[serde(rename = "maxObjSapStartingType")] pub max_obj_sap_starting_type: Option, - /// Jitter in milliseconds (non-standard extension, matches JS implementation). - pub jitter: Option, + /// Jitter (non-standard extension; not in the MSF/CMSF drafts). + /// + /// Serialized as a JSON integer number of milliseconds, matching the hang + /// catalog. Sub-ms precision isn't meaningful for jitter. + #[serde_as(as = "Option>")] + pub jitter: Option, } impl Catalog { @@ -438,7 +445,7 @@ mod test { alt_group: None, max_grp_sap_starting_type: Some(1), max_obj_sap_starting_type: Some(2), - jitter: Some(15.5), + jitter: Some(Duration::from_millis(15)), } } @@ -457,7 +464,7 @@ mod test { let track = &value["tracks"][0]; assert_eq!(track.get("maxGrpSapStartingType"), Some(&serde_json::json!(1))); assert_eq!(track.get("maxObjSapStartingType"), Some(&serde_json::json!(2))); - assert_eq!(track.get("jitter"), Some(&serde_json::json!(15.5))); + assert_eq!(track.get("jitter"), Some(&serde_json::json!(15))); // Snake-case names must NOT appear on the wire. assert!(track.get("max_grp_sap_starting_type").is_none()); @@ -503,6 +510,6 @@ mod test { assert_eq!(original, parsed); assert_eq!(parsed.tracks[0].max_grp_sap_starting_type, Some(1)); assert_eq!(parsed.tracks[0].max_obj_sap_starting_type, Some(2)); - assert_eq!(parsed.tracks[0].jitter, Some(15.5)); + assert_eq!(parsed.tracks[0].jitter, Some(Duration::from_millis(15))); } } diff --git a/rs/moq-mux/src/catalog/hang/producer.rs b/rs/moq-mux/src/catalog/hang/producer.rs index d25a173d8..b587b6c71 100644 --- a/rs/moq-mux/src/catalog/hang/producer.rs +++ b/rs/moq-mux/src/catalog/hang/producer.rs @@ -169,7 +169,7 @@ fn to_msf(catalog: &hang::Catalog) -> moq_msf::Catalog { track.alt_group = if has_multiple_video { Some(1) } else { None }; track.max_grp_sap_starting_type = sap_type; track.max_obj_sap_starting_type = sap_type; - track.jitter = config.jitter.map(|t| t.as_millis() as f64); + track.jitter = config.jitter; tracks.push(track); } @@ -200,7 +200,7 @@ fn to_msf(catalog: &hang::Catalog) -> moq_msf::Catalog { track.alt_group = if has_multiple_audio { Some(1) } else { None }; track.max_grp_sap_starting_type = Some(1); track.max_obj_sap_starting_type = Some(1); - track.jitter = config.jitter.map(|t| t.as_millis() as f64); + track.jitter = config.jitter; tracks.push(track); } @@ -375,14 +375,14 @@ mod test { video_config.coded_height = Some(720); video_config.framerate = Some(30.0); video_config.container = Container::Legacy; - video_config.jitter = Some(moq_net::Time::from_millis_unchecked(100)); + video_config.jitter = Some(std::time::Duration::from_millis(100)); let mut video_renditions = BTreeMap::new(); video_renditions.insert("video0".to_string(), video_config); let mut audio_config = AudioConfig::new(AudioCodec::Opus, 48_000, 2); audio_config.container = Container::Legacy; - audio_config.jitter = Some(moq_net::Time::from_millis_unchecked(40)); + audio_config.jitter = Some(std::time::Duration::from_millis(40)); let mut audio_renditions = BTreeMap::new(); audio_renditions.insert("audio0".to_string(), audio_config); @@ -407,13 +407,13 @@ mod test { // H.264 may carry B-frames, so SAP starting type is 2. assert_eq!(video.max_grp_sap_starting_type, Some(2)); assert_eq!(video.max_obj_sap_starting_type, Some(2)); - assert_eq!(video.jitter, Some(100.0)); + assert_eq!(video.jitter, Some(std::time::Duration::from_millis(100))); let audio = &msf.tracks[1]; assert_eq!(audio.role, Some(moq_msf::Role::Audio)); assert_eq!(audio.max_grp_sap_starting_type, Some(1)); assert_eq!(audio.max_obj_sap_starting_type, Some(1)); - assert_eq!(audio.jitter, Some(40.0)); + assert_eq!(audio.jitter, Some(std::time::Duration::from_millis(40))); } #[test] diff --git a/rs/moq-mux/src/catalog/msf/consumer.rs b/rs/moq-mux/src/catalog/msf/consumer.rs index a61b3b31e..bab7ba2b2 100644 --- a/rs/moq-mux/src/catalog/msf/consumer.rs +++ b/rs/moq-mux/src/catalog/msf/consumer.rs @@ -212,14 +212,7 @@ fn video_config_from_msf(track: &moq_msf::Track) -> anyhow::Result= 0.0) - .and_then(|v| moq_net::Time::from_millis(v as u64).ok()); + config.jitter = track.jitter; Ok(Some(config)) } @@ -256,14 +249,7 @@ fn audio_config_from_msf(track: &moq_msf::Track) -> anyhow::Result= 0.0) - .and_then(|v| moq_net::Time::from_millis(v as u64).ok()); + config.jitter = track.jitter; Ok(Some(config)) } diff --git a/rs/moq-mux/src/codec/aac/import.rs b/rs/moq-mux/src/codec/aac/import.rs index a9f28a8d8..51e1a8fe1 100644 --- a/rs/moq-mux/src/codec/aac/import.rs +++ b/rs/moq-mux/src/codec/aac/import.rs @@ -58,7 +58,7 @@ impl Import { Ok(()) } - pub fn decode(&mut self, buf: &mut T, pts: Option) -> anyhow::Result<()> { + pub fn decode(&mut self, buf: &mut T, pts: Option) -> anyhow::Result<()> { let pts = self.pts(pts)?; // Collect the input into a contiguous Bytes payload. @@ -84,15 +84,13 @@ impl Import { Ok(()) } - fn pts(&mut self, hint: Option) -> anyhow::Result { + fn pts(&mut self, hint: Option) -> anyhow::Result { if let Some(pts) = hint { return Ok(pts); } let zero = self.zero.get_or_insert_with(tokio::time::Instant::now); - Ok(crate::container::Timestamp::from_micros( - zero.elapsed().as_micros() as u64 - )?) + Ok(moq_net::Timestamp::from_micros(zero.elapsed().as_micros() as u64)?) } } diff --git a/rs/moq-mux/src/codec/av1/import.rs b/rs/moq-mux/src/codec/av1/import.rs index c1afb5208..44998e86a 100644 --- a/rs/moq-mux/src/codec/av1/import.rs +++ b/rs/moq-mux/src/codec/av1/import.rs @@ -229,7 +229,7 @@ impl Import { pub fn decode_stream>( &mut self, buf: &mut T, - pts: Option, + pts: Option, ) -> anyhow::Result<()> { let obus = ObuIterator::new(buf); @@ -246,7 +246,7 @@ impl Import { pub fn decode_frame>( &mut self, buf: &mut T, - pts: Option, + pts: Option, ) -> anyhow::Result<()> { let pts = self.pts(pts)?; let mut obus = ObuIterator::new(buf); @@ -264,7 +264,7 @@ impl Import { Ok(()) } - fn decode_obu(&mut self, obu_data: Bytes, pts: Option) -> anyhow::Result<()> { + fn decode_obu(&mut self, obu_data: Bytes, pts: Option) -> anyhow::Result<()> { anyhow::ensure!(!obu_data.is_empty(), "OBU is too short"); // Parse OBU header - this consumes header + extension + LEB128 size @@ -347,7 +347,7 @@ impl Import { Ok(()) } - fn maybe_start_frame(&mut self, pts: Option) -> anyhow::Result<()> { + fn maybe_start_frame(&mut self, pts: Option) -> anyhow::Result<()> { if !self.current.contains_frame { return Ok(()); } @@ -398,15 +398,13 @@ impl Import { self.track.is_some() } - fn pts(&mut self, hint: Option) -> anyhow::Result { + fn pts(&mut self, hint: Option) -> anyhow::Result { if let Some(pts) = hint { return Ok(pts); } let zero = self.zero.get_or_insert_with(tokio::time::Instant::now); - Ok(crate::container::Timestamp::from_micros( - zero.elapsed().as_micros() as u64 - )?) + Ok(moq_net::Timestamp::from_micros(zero.elapsed().as_micros() as u64)?) } } diff --git a/rs/moq-mux/src/codec/h264/import.rs b/rs/moq-mux/src/codec/h264/import.rs index f4c25b226..bc1b8e54f 100644 --- a/rs/moq-mux/src/codec/h264/import.rs +++ b/rs/moq-mux/src/codec/h264/import.rs @@ -206,7 +206,7 @@ impl Import { pub fn decode_stream>( &mut self, buf: &mut T, - pts: Option, + pts: Option, ) -> anyhow::Result<()> { anyhow::ensure!(matches!(self.state, State::Avc3 { .. }), "decode_stream is avc3 only"); let pts = self.pts(pts)?; @@ -225,7 +225,7 @@ impl Import { pub fn decode_frame>( &mut self, buf: &mut T, - pts: Option, + pts: Option, ) -> anyhow::Result<()> { match &self.state { State::Avc1 { .. } => self.decode_avc1(buf, pts), @@ -237,7 +237,7 @@ impl Import { fn decode_avc1>( &mut self, buf: &mut T, - pts: Option, + pts: Option, ) -> anyhow::Result<()> { let State::Avc1 { length_size } = self.state else { unreachable!("checked by decode_frame") @@ -269,7 +269,7 @@ impl Import { fn decode_avc3_frame>( &mut self, buf: &mut T, - pts: Option, + pts: Option, ) -> anyhow::Result<()> { let pts = self.pts(pts)?; let mut nals = NalIterator::new(buf); @@ -283,7 +283,7 @@ impl Import { Ok(()) } - fn decode_nal(&mut self, nal: Bytes, pts: Option) -> anyhow::Result<()> { + fn decode_nal(&mut self, nal: Bytes, pts: Option) -> anyhow::Result<()> { let header = nal.first().context("NAL unit is too short")?; let forbidden_zero_bit = (header >> 7) & 1; anyhow::ensure!(forbidden_zero_bit == 0, "forbidden zero bit is not zero"); @@ -395,7 +395,7 @@ impl Import { Ok(()) } - fn maybe_start_frame(&mut self, pts: Option) -> anyhow::Result<()> { + fn maybe_start_frame(&mut self, pts: Option) -> anyhow::Result<()> { let State::Avc3 { current, .. } = &mut self.state else { return Ok(()); }; @@ -465,14 +465,12 @@ impl Import { Ok(()) } - fn pts(&mut self, hint: Option) -> anyhow::Result { + fn pts(&mut self, hint: Option) -> anyhow::Result { if let Some(pts) = hint { return Ok(pts); } let zero = self.zero.get_or_insert_with(tokio::time::Instant::now); - Ok(crate::container::Timestamp::from_micros( - zero.elapsed().as_micros() as u64 - )?) + Ok(moq_net::Timestamp::from_micros(zero.elapsed().as_micros() as u64)?) } } diff --git a/rs/moq-mux/src/codec/h265/import.rs b/rs/moq-mux/src/codec/h265/import.rs index e3e25978e..a180aef44 100644 --- a/rs/moq-mux/src/codec/h265/import.rs +++ b/rs/moq-mux/src/codec/h265/import.rs @@ -127,7 +127,7 @@ impl Import { pub fn decode_stream>( &mut self, buf: &mut T, - pts: Option, + pts: Option, ) -> anyhow::Result<()> { let pts = self.pts(pts)?; @@ -151,7 +151,7 @@ impl Import { pub fn decode_frame>( &mut self, buf: &mut T, - pts: Option, + pts: Option, ) -> anyhow::Result<()> { let pts = self.pts(pts)?; // Iterate over the NAL units in the buffer based on start codes. @@ -175,7 +175,7 @@ impl Import { /// Decode a single NAL unit. Only reads the first header byte to extract nal_unit_type, /// Ignores nuh_layer_id and nuh_temporal_id_plus1. - fn decode_nal(&mut self, nal: Bytes, pts: Option) -> anyhow::Result<()> { + fn decode_nal(&mut self, nal: Bytes, pts: Option) -> anyhow::Result<()> { anyhow::ensure!(nal.len() >= 2, "NAL unit is too short"); // u16 header: [forbidden_zero_bit(1) | nal_unit_type(6) | nuh_layer_id(6) | nuh_temporal_id_plus1(3)] let header = nal.first().context("NAL unit is too short")?; @@ -286,7 +286,7 @@ impl Import { Ok(()) } - fn maybe_start_frame(&mut self, pts: Option) -> anyhow::Result<()> { + fn maybe_start_frame(&mut self, pts: Option) -> anyhow::Result<()> { // If we haven't seen any slices, we shouldn't flush yet. if !self.current.contains_slice { return Ok(()); @@ -338,15 +338,13 @@ impl Import { self.track.is_some() } - fn pts(&mut self, hint: Option) -> anyhow::Result { + fn pts(&mut self, hint: Option) -> anyhow::Result { if let Some(pts) = hint { return Ok(pts); } let zero = self.zero.get_or_insert_with(tokio::time::Instant::now); - Ok(crate::container::Timestamp::from_micros( - zero.elapsed().as_micros() as u64 - )?) + Ok(moq_net::Timestamp::from_micros(zero.elapsed().as_micros() as u64)?) } } diff --git a/rs/moq-mux/src/codec/opus/import.rs b/rs/moq-mux/src/codec/opus/import.rs index 90b484b19..6de8c5bb3 100644 --- a/rs/moq-mux/src/codec/opus/import.rs +++ b/rs/moq-mux/src/codec/opus/import.rs @@ -57,7 +57,7 @@ impl Import { Ok(()) } - pub fn decode(&mut self, buf: &mut T, pts: Option) -> anyhow::Result<()> { + pub fn decode(&mut self, buf: &mut T, pts: Option) -> anyhow::Result<()> { let pts = self.pts(pts)?; // Collect the input into a contiguous Bytes payload. @@ -83,15 +83,13 @@ impl Import { Ok(()) } - fn pts(&mut self, hint: Option) -> anyhow::Result { + fn pts(&mut self, hint: Option) -> anyhow::Result { if let Some(pts) = hint { return Ok(pts); } let zero = self.zero.get_or_insert_with(tokio::time::Instant::now); - Ok(crate::container::Timestamp::from_micros( - zero.elapsed().as_micros() as u64 - )?) + Ok(moq_net::Timestamp::from_micros(zero.elapsed().as_micros() as u64)?) } } diff --git a/rs/moq-mux/src/container/consumer.rs b/rs/moq-mux/src/container/consumer.rs index c8dac45d5..5f63ec70d 100644 --- a/rs/moq-mux/src/container/consumer.rs +++ b/rs/moq-mux/src/container/consumer.rs @@ -1,7 +1,9 @@ use std::collections::VecDeque; use std::task::{Poll, ready}; -use super::{Container, Frame, Timestamp}; +use moq_net::Timestamp; + +use super::{Container, Frame}; /// Decode a moq-lite track into a stream of media [`Frame`]s in latency-bounded /// presentation order. @@ -127,7 +129,7 @@ impl Consumer { && current.sequence <= self.current { match current.poll_min_timestamp(waiter, &self.format) { - Poll::Ready(Ok(ts)) => Some::(ts.into()), + Poll::Ready(Ok(ts)) => Some(std::time::Duration::from(ts)), _ => None, } } else { diff --git a/rs/moq-mux/src/container/fmp4/export.rs b/rs/moq-mux/src/container/fmp4/export.rs index 54f5ae946..376812fff 100644 --- a/rs/moq-mux/src/container/fmp4/export.rs +++ b/rs/moq-mux/src/container/fmp4/export.rs @@ -460,9 +460,21 @@ fn should_flush(track: &Fmp4Track, frame: &Frame, fragment_duration: Option true, Some(d) => { - let first = track.buffer.first().unwrap(); - let delta_us = frame.timestamp.as_micros().saturating_sub(first.timestamp.as_micros()); - delta_us >= d.as_micros() + // Frames within a track are in *decode* order; B-frames have + // non-monotonic PTS, so the span of the buffer is min..max of all + // PTS, not just first..incoming. + let mut min = Duration::from(frame.timestamp); + let mut max = min; + for f in &track.buffer { + let pts = Duration::from(f.timestamp); + if pts < min { + min = pts; + } + if pts > max { + max = pts; + } + } + max.saturating_sub(min) >= d } // No video keyframe will ever arrive to roll the fragment, so for // audio-only broadcasts in `None` mode we fall back to per-frame @@ -476,9 +488,10 @@ fn encode_fragment(track: &mut Fmp4Track, frames: Vec) -> anyhow::Result< anyhow::ensure!(!frames.is_empty(), "encode_fragment called with no frames"); let seq = track.sequence_number; track.sequence_number += 1; + let timescale = moq_net::Timescale::new(track.timescale).context("invalid track timescale")?; Ok(crate::container::fmp4::encode_fragment( track.track_id, - track.timescale, + timescale, seq, &frames, )?) diff --git a/rs/moq-mux/src/container/fmp4/export_test.rs b/rs/moq-mux/src/container/fmp4/export_test.rs index 596142e2e..fa17cfd39 100644 --- a/rs/moq-mux/src/container/fmp4/export_test.rs +++ b/rs/moq-mux/src/container/fmp4/export_test.rs @@ -16,8 +16,8 @@ use mp4_atom::{DecodeMaybe, Encode}; /// entry whose avcC is built from the inline SPS+PPS. #[tokio::test(start_paused = true)] async fn avc3_source_to_cmaf_export_roundtrip() { - use crate::container::Timestamp; use hang::catalog::{Container, H264, VideoConfig}; + use moq_net::Timestamp; let broadcast = moq_net::Broadcast::new(); let mut producer = broadcast.produce(); diff --git a/rs/moq-mux/src/container/fmp4/import.rs b/rs/moq-mux/src/container/fmp4/import.rs index 7144deb9c..535af7fdd 100644 --- a/rs/moq-mux/src/container/fmp4/import.rs +++ b/rs/moq-mux/src/container/fmp4/import.rs @@ -1,7 +1,7 @@ -use crate::container::Timestamp; use anyhow::Context; use bytes::{Buf, Bytes, BytesMut}; use hang::catalog::{AAC, AudioCodec, AudioConfig, Container, H264, H265, VP9, VideoCodec, VideoConfig}; +use moq_net::Timestamp; use mp4_atom::{Any, Atom, DecodeMaybe, Encode, Mdat, Moof, Moov, Trak}; use std::collections::HashMap; use tokio::io::{AsyncRead, AsyncReadExt}; @@ -411,7 +411,8 @@ impl Import { let tfdt = traf.tfdt.as_ref().context("missing tfdt box")?; let mut dts = tfdt.base_media_decode_time; - let timescale = trak.mdia.mdhd.timescale as u64; + let timescale = + moq_net::Timescale::new(trak.mdia.mdhd.timescale as u64).context("invalid fmp4 mdhd.timescale")?; let mut offset = traf.tfhd.base_data_offset.unwrap_or_default() as usize; let mut track_data_start: Option = None; @@ -459,7 +460,9 @@ impl Import { .unwrap_or(tfhd.default_sample_size.unwrap_or(default_sample_size)) as usize; let pts = (dts as i64 + entry.cts.unwrap_or_default() as i64) as u64; - let timestamp = crate::container::Timestamp::from_scale(pts, timescale)?; + // Preserve the fmp4 track's native timescale so a passthrough re-emit + // doesn't go through a lossy microsecond detour. + let timestamp = moq_net::Timestamp::new(pts, timescale)?; if offset + size > mdat.data.len() { anyhow::bail!("invalid data offset"); @@ -476,16 +479,16 @@ impl Import { contains_keyframe |= keyframe; - if timestamp >= max_timestamp.unwrap_or(Timestamp::ZERO) { + if max_timestamp.is_none_or(|max| timestamp >= max) { max_timestamp = Some(timestamp); } - if timestamp <= min_timestamp.unwrap_or(Timestamp::MAX) { + if min_timestamp.is_none_or(|min| timestamp <= min) { min_timestamp = Some(timestamp); } if let Some(last_timestamp) = track.last_timestamp && let Ok(duration) = timestamp.checked_sub(last_timestamp) - && duration < track.min_duration.unwrap_or(Timestamp::MAX) + && track.min_duration.is_none_or(|min| duration < min) { track.min_duration = Some(duration); } @@ -591,7 +594,7 @@ impl Import { if let (Some(min), Some(max), Some(min_duration)) = (min_timestamp, max_timestamp, track.min_duration) { let jitter = max - min + min_duration; - if jitter < track.jitter.unwrap_or(Timestamp::MAX) { + if track.jitter.is_none_or(|j| jitter < j) { track.jitter = Some(jitter); let mut catalog = self.catalog.lock(); @@ -603,7 +606,7 @@ impl Import { .renditions .get_mut(&track.track.name) .context("missing video config")?; - config.jitter = Some(jitter.convert()?); + config.jitter = Some(jitter.into()); } TrackKind::Audio => { let config = catalog @@ -611,7 +614,7 @@ impl Import { .renditions .get_mut(&track.track.name) .context("missing audio config")?; - config.jitter = Some(jitter.convert()?); + config.jitter = Some(jitter.into()); } } } diff --git a/rs/moq-mux/src/container/fmp4/mod.rs b/rs/moq-mux/src/container/fmp4/mod.rs index ff2c7d437..2d2dfc48e 100644 --- a/rs/moq-mux/src/container/fmp4/mod.rs +++ b/rs/moq-mux/src/container/fmp4/mod.rs @@ -23,7 +23,9 @@ use bytes::Bytes; use hang::catalog::{AudioCodec, AudioConfig, VideoCodec, VideoConfig}; use mp4_atom::Atom; -use crate::container::{Container, Frame, Timestamp}; +use moq_net::Timestamp; + +use crate::container::{Container, Frame}; #[derive(Debug, thiserror::Error)] #[non_exhaustive] @@ -109,7 +111,7 @@ impl Container for Wire { type Error = Error; fn write(&self, group: &mut moq_net::GroupProducer, frames: &[Frame]) -> Result<(), Self::Error> { - let timescale = self.trak.mdia.mdhd.timescale as u64; + let timescale = moq_net::Timescale::new(self.trak.mdia.mdhd.timescale as u64)?; let track_id = self.trak.tkhd.track_id; encode(group, frames, timescale, track_id) } @@ -125,12 +127,12 @@ impl Container for Wire { return Poll::Ready(Ok(None)); }; - let timescale = self.trak.mdia.mdhd.timescale as u64; + let timescale = moq_net::Timescale::new(self.trak.mdia.mdhd.timescale as u64)?; Poll::Ready(Ok(Some(decode(data, timescale)?))) } } -pub(crate) fn decode(data: Bytes, timescale: u64) -> Result, Error> { +pub(crate) fn decode(data: Bytes, timescale: moq_net::Timescale) -> Result, Error> { use mp4_atom::DecodeMaybe; let mut cursor = std::io::Cursor::new(&data); @@ -169,7 +171,8 @@ pub(crate) fn decode(data: Bytes, timescale: u64) -> Result, Error> { let cts = entry.cts.unwrap_or_default() as i64; let pts = dts.checked_add_signed(cts).ok_or(Error::PtsOverflow)?; - let timestamp = Timestamp::from_scale(pts, timescale)?; + // Preserve the fmp4 track's native scale through the pipeline. + let timestamp = Timestamp::new(pts, timescale)?; let payload = Bytes::copy_from_slice(&mdat_data[offset..end]); let flags = entry.flags.unwrap_or(0); // depends_on_no_other (bits 24-25 == 0x2) means keyframe @@ -192,7 +195,7 @@ pub(crate) fn decode(data: Bytes, timescale: u64) -> Result, Error> { pub(crate) fn encode( group: &mut moq_net::GroupProducer, frames: &[Frame], - timescale: u64, + timescale: moq_net::Timescale, track_id: u32, ) -> Result<(), Error> { if frames.is_empty() { @@ -218,7 +221,7 @@ pub(crate) fn encode( /// Returns an empty `Bytes` when `frames` is empty. pub(crate) fn encode_fragment( track_id: u32, - timescale: u64, + timescale: moq_net::Timescale, sequence_number: u32, frames: &[Frame], ) -> Result { @@ -228,7 +231,11 @@ pub(crate) fn encode_fragment( return Ok(Bytes::new()); } - let dts = (frames[0].timestamp.as_micros() * timescale as u128 / 1_000_000) as u64; + // Re-express the first frame's timestamp at the target track's scale. When the + // importer preserved the source scale (the common passthrough case), this is a + // no-op; otherwise it's a single rescale rather than the legacy `micros * scale + // / 1_000_000` round-trip. + let dts = frames[0].timestamp.as_scale(timescale) as u64; let entries: Vec<_> = frames .iter() diff --git a/rs/moq-mux/src/container/jitter.rs b/rs/moq-mux/src/container/jitter.rs index 4d3e67299..e026b3776 100644 --- a/rs/moq-mux/src/container/jitter.rs +++ b/rs/moq-mux/src/container/jitter.rs @@ -1,4 +1,6 @@ -use crate::container::Timestamp; +use std::time::Duration; + +use moq_net::Timestamp; /// Tracks the minimum duration between consecutive frames. /// @@ -18,19 +20,21 @@ impl MinFrameDuration { /// Record a new frame timestamp. /// - /// Returns the new minimum-frame-duration as a `moq_net::Time` if it - /// changed, so the caller can persist it on the catalog rendition. Returns - /// `None` when this is the first observation, the timestamps are - /// non-monotonic, or the new gap is no smaller than the recorded minimum. - pub fn observe(&mut self, ts: Timestamp) -> Option { + /// Returns the new minimum-frame-duration as a [`Duration`] if it changed, so + /// the caller can persist it on the catalog rendition. Returns `None` when this + /// is the first observation, the timestamps are non-monotonic, or the new gap + /// is no smaller than the recorded minimum. + pub fn observe(&mut self, ts: Timestamp) -> Option { let last = self.last_timestamp.replace(ts)?; let duration = ts.checked_sub(last).ok()?; - if duration >= self.min_duration.unwrap_or(Timestamp::MAX) { + if let Some(min) = self.min_duration + && duration >= min + { return None; } self.min_duration = Some(duration); - duration.convert().ok() + Some(duration.into()) } } diff --git a/rs/moq-mux/src/container/loc/mod.rs b/rs/moq-mux/src/container/loc/mod.rs index 5efc3dbb6..d2efe4336 100644 --- a/rs/moq-mux/src/container/loc/mod.rs +++ b/rs/moq-mux/src/container/loc/mod.rs @@ -7,20 +7,27 @@ use std::task::Poll; -use crate::container::{Container, Frame, Timestamp}; +use moq_net::{Timescale, Timestamp}; + +use crate::container::{Container, Frame}; + +/// LOC's catalog convention: timestamps are in microseconds when no per-frame +/// 0x08 timescale property is present. +const DEFAULT_TIMESCALE: Timescale = Timescale::MICRO; /// LOC wire format. Each moq frame holds one LOC frame. #[derive(Default)] pub struct Wire; -const DEFAULT_TIMESCALE: u64 = 1_000_000; - impl Container for Wire { type Error = crate::Error; fn write(&self, group: &mut moq_net::GroupProducer, frames: &[Frame]) -> Result<(), Self::Error> { for frame in frames { - let data = moq_loc::encode(frame.timestamp.as_micros() as u64, &frame.payload)?; + // LOC's wire format omits per-frame timescale by convention; the catalog + // default is microseconds, so convert at the boundary. + let timestamp = frame.timestamp.convert(DEFAULT_TIMESCALE).map_err(hang::Error::from)?; + let data = moq_loc::encode(timestamp.value(), &frame.payload)?; let mut chunked = group.create_frame(data.len().into())?; chunked.write(data)?; @@ -41,8 +48,14 @@ impl Container for Wire { }; let loc = moq_loc::decode(data)?; - let timescale = loc.timescale.unwrap_or(DEFAULT_TIMESCALE); - let timestamp = Timestamp::from_scale(loc.timestamp, timescale).map_err(hang::Error::from)?; + // `loc.timescale == Some(0)` is a malformed wire (caught by moq_loc::decode itself), + // so any Some(_) we see here is non-zero. Falling back to the catalog default + // keeps this code path infallible. + let scale = loc + .timescale + .and_then(|s| Timescale::new(s).ok()) + .unwrap_or(DEFAULT_TIMESCALE); + let timestamp = Timestamp::new(loc.timestamp, scale).map_err(hang::Error::from)?; Poll::Ready(Ok(Some(vec![Frame { timestamp, diff --git a/rs/moq-mux/src/container/mkv/export.rs b/rs/moq-mux/src/container/mkv/export.rs index ee335564b..38c934f2b 100644 --- a/rs/moq-mux/src/container/mkv/export.rs +++ b/rs/moq-mux/src/container/mkv/export.rs @@ -459,7 +459,11 @@ impl Export { let kind = track.kind; let payload = &frame.payload; - let frame_ticks: u64 = (frame.timestamp.as_micros() / 1_000) + // MKV's wire scale is ms (TIMESTAMP_SCALE_NS = 1_000_000). Re-express the + // frame's timestamp directly at MILLI rather than going through micros. + let frame_ticks: u64 = frame + .timestamp + .as_millis() .try_into() .context("timestamp doesn't fit in u64 ms")?; diff --git a/rs/moq-mux/src/container/mkv/export_test.rs b/rs/moq-mux/src/container/mkv/export_test.rs index 12311cf31..aa118dec2 100644 --- a/rs/moq-mux/src/container/mkv/export_test.rs +++ b/rs/moq-mux/src/container/mkv/export_test.rs @@ -255,8 +255,8 @@ async fn export_avc3_source_synthesizes_avcc_and_length_prefixes() { // Annex-B with inline SPS+PPS before keyframes. The exporter must // (a) defer the header until SPS+PPS arrive, (b) emit avcC in CodecPrivate, // (c) length-prefix the sample bytes in each SimpleBlock. - use crate::container::Timestamp; use hang::catalog::{Container, H264, VideoConfig}; + use moq_net::Timestamp; let broadcast = moq_net::Broadcast::new(); let mut producer = broadcast.produce(); diff --git a/rs/moq-mux/src/container/mkv/import.rs b/rs/moq-mux/src/container/mkv/import.rs index 256f0dd8b..61b2a0877 100644 --- a/rs/moq-mux/src/container/mkv/import.rs +++ b/rs/moq-mux/src/container/mkv/import.rs @@ -2,10 +2,10 @@ use std::collections::HashMap; use std::convert::TryFrom; use std::io::Cursor; -use crate::container::Timestamp; use anyhow::Context; use bytes::{Buf, Bytes, BytesMut}; use hang::catalog::{AAC, AudioCodec, AudioConfig, Container, H264, H265, VP9, VideoCodec, VideoConfig}; +use moq_net::Timestamp; use mp4_atom::Atom; use tokio::io::{AsyncRead, AsyncReadExt}; use webm_iterable::WebmIterator; @@ -344,7 +344,8 @@ impl Import { return Ok(()); }; - // Compute PTS in nanoseconds, then convert to the Timestamp's microsecond timescale. + // Compute PTS in MKV's native nanosecond units and stamp it on the + // timestamp at NANO scale so a passthrough re-emit preserves precision. let block_ticks = (self.cluster_timestamp as i64) + (rel_ts as i64); anyhow::ensure!(block_ticks >= 0, "negative block timestamp"); diff --git a/rs/moq-mux/src/container/mod.rs b/rs/moq-mux/src/container/mod.rs index 1050202e7..cfb87651b 100644 --- a/rs/moq-mux/src/container/mod.rs +++ b/rs/moq-mux/src/container/mod.rs @@ -29,10 +29,6 @@ pub use consumer::Consumer; pub use producer::Producer; pub(crate) use source::{CatalogSource, ExportSource}; -/// Microsecond presentation timestamp, the canonical timebase for media -/// frames in moq-mux. -pub type Timestamp = moq_net::Timescale<1_000_000>; - /// A decoded media frame: timestamp, payload bytes, keyframe flag. /// /// `payload` is the raw codec bitstream that gets handed to the decoder. @@ -42,10 +38,13 @@ pub type Timestamp = moq_net::Timescale<1_000_000>; pub struct Frame { /// Presentation timestamp. /// - /// Microsecond precision. Frames within a track must be in *decode* - /// order, not display order. B-frames may have non-monotonic - /// presentation timestamps. - pub timestamp: Timestamp, + /// Each container picks its own native scale: fmp4 uses the source + /// `mdhd.timescale`, mkv uses nanoseconds, legacy is fixed at microseconds. + /// LOC defaults to microseconds but a decoded frame keeps whatever per-frame + /// timescale the wire carried, so an exporter can re-emit without forcing + /// micros. Frames within a track must be in *decode* order, not display + /// order. B-frames may have non-monotonic presentation timestamps. + pub timestamp: moq_net::Timestamp, /// Encoded codec payload. pub payload: Bytes, diff --git a/rs/moq-mux/src/container/producer.rs b/rs/moq-mux/src/container/producer.rs index a618e63d1..5457148f0 100644 --- a/rs/moq-mux/src/container/producer.rs +++ b/rs/moq-mux/src/container/producer.rs @@ -98,12 +98,16 @@ impl Producer { } else { self.buffer.push(frame); - // Check if buffered duration exceeds latency. + // Flush if the buffered span has reached the latency budget. Compute + // min/max across the buffer rather than first/last: frames within a track + // are in *decode* order, and B-frames have non-monotonic PTS, so + // `last - first` can shrink as a B-frame lands between two earlier-PTS + // frames. The min/max pair captures the actual presentation span. if self.buffer.len() >= 2 { - let first_ts: std::time::Duration = self.buffer.first().unwrap().timestamp.into(); - let last_ts: std::time::Duration = self.buffer.last().unwrap().timestamp.into(); - - if last_ts.saturating_sub(first_ts) >= self.latency { + let mut iter = self.buffer.iter().map(|f| std::time::Duration::from(f.timestamp)); + let first = iter.next().unwrap(); + let (min, max) = iter.fold((first, first), |(min, max), d| (min.min(d), max.max(d))); + if max.saturating_sub(min) >= self.latency { self.flush()?; } } @@ -177,7 +181,7 @@ mod tests { use super::*; use crate::catalog::hang::Container; - use crate::container::Timestamp; + use moq_net::Timestamp; fn frame(timestamp_us: u64, keyframe: bool) -> Frame { Frame { diff --git a/rs/moq-mux/src/import.rs b/rs/moq-mux/src/import.rs index 0fa2ed852..80bd3b0a5 100644 --- a/rs/moq-mux/src/import.rs +++ b/rs/moq-mux/src/import.rs @@ -202,7 +202,7 @@ impl Framed { pub fn decode_frame>( &mut self, buf: &mut T, - pts: Option, + pts: Option, ) -> anyhow::Result<()> { match self.decoder { FramedKind::H264(ref mut decoder) => decoder.decode_frame(buf, pts)?, diff --git a/rs/moq-net/src/coding/varint.rs b/rs/moq-net/src/coding/varint.rs index 81e6b8ca8..bd35d9f45 100644 --- a/rs/moq-net/src/coding/varint.rs +++ b/rs/moq-net/src/coding/varint.rs @@ -169,7 +169,7 @@ impl fmt::Display for VarInt { impl VarInt { /// Decode a QUIC-style varint (2-bit length tag in top bits). - fn decode_quic(r: &mut R) -> Result { + pub fn decode_quic(r: &mut R) -> Result { if !r.has_remaining() { return Err(DecodeError::Short); } @@ -210,7 +210,7 @@ impl VarInt { } /// Encode a QUIC-style varint (2-bit length tag in top bits). - fn encode_quic(&self, w: &mut W) -> Result<(), EncodeError> { + pub fn encode_quic(&self, w: &mut W) -> Result<(), EncodeError> { let remaining = w.remaining_mut(); if self.0 < (1u64 << 6) { if remaining < 1 { diff --git a/rs/moq-net/src/lib.rs b/rs/moq-net/src/lib.rs index cff083c27..523123a66 100644 --- a/rs/moq-net/src/lib.rs +++ b/rs/moq-net/src/lib.rs @@ -62,7 +62,7 @@ mod stats; mod version; pub use client::*; -pub use coding::{BoundsExceeded, DecodeError, EncodeError}; +pub use coding::{BoundsExceeded, DecodeError, EncodeError, VarInt}; pub use error::*; pub use model::*; pub use path::*; diff --git a/rs/moq-net/src/model/time.rs b/rs/moq-net/src/model/time.rs index c24629cff..1d47addfa 100644 --- a/rs/moq-net/src/model/time.rs +++ b/rs/moq-net/src/model/time.rs @@ -1,65 +1,148 @@ -use rand::Rng; - -use crate::Error; -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - +use std::num::NonZero; use std::sync::LazyLock; use std::time::{SystemTime, UNIX_EPOCH}; -/// A timestamp representing the presentation time in milliseconds. -/// -/// The underlying implementation supports any scale, but everything uses milliseconds by default. -pub type Time = Timescale<1_000>; +use rand::Rng; + +use crate::coding::VarInt; -/// Returned when a [`Timescale`] operation would exceed the QUIC VarInt range -/// (`2^62 - 1`) or overflow during scale conversion or arithmetic. +/// Returned when a [`Timestamp`] operation would exceed the QUIC VarInt range +/// (`2^62 - 1`), overflow during scale conversion or arithmetic, or attempt +/// arithmetic between timestamps with mismatched scales. #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)] #[error("time overflow")] pub struct TimeOverflow; -/// A timestamp representing the presentation time in a given scale. ex. 1000 for milliseconds. -/// -/// All timestamps within a track are relative, so zero for one track is not zero for another. -/// Values are constrained to fit within a QUIC VarInt (2^62) so they can be encoded and decoded easily. +/// Units per second used by a track for frame timestamps. /// -/// This is [std::time::Instant] and [std::time::Duration] merged into one type for simplicity. -#[derive(Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +/// Newtype around [`NonZero`] — zero is structurally impossible, so the +/// arithmetic on [`Timestamp`] can divide by `self.scale` without ever risking +/// a divide by zero. Use the named constants ([`Self::SECOND`], [`Self::MILLI`], +/// [`Self::MICRO`], [`Self::NANO`]) instead of writing raw integers at call sites; +/// for runtime values, use [`Self::new`] which returns [`TimeOverflow`] for `0` or +/// for values past the QUIC varint range. +#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct Timescale(VarInt); +pub struct Timescale(NonZero); + +impl Timescale { + /// One unit per second (`1`). + pub const SECOND: Self = Self(NonZero::::MIN); + /// 1,000 units per second (`1_000`). + pub const MILLI: Self = match NonZero::new(1_000) { + Some(n) => Self(n), + None => unreachable!(), + }; + /// 1,000,000 units per second (`1_000_000`). Common default for media tracks. + pub const MICRO: Self = match NonZero::new(1_000_000) { + Some(n) => Self(n), + None => unreachable!(), + }; + /// 1,000,000,000 units per second (`1_000_000_000`). + pub const NANO: Self = match NonZero::new(1_000_000_000) { + Some(n) => Self(n), + None => unreachable!(), + }; + + /// Construct a timescale from a raw value (units per second). + /// + /// Returns [`TimeOverflow`] if `units_per_second` is `0` (would divide by zero) + /// or exceeds `2^62 - 1` (the QUIC varint range, matching [`Timestamp`] values). + pub const fn new(units_per_second: u64) -> Result { + // Reject values that wouldn't fit in a QUIC varint, keeping the constraint + // symmetric with Timestamp's raw value. + if VarInt::from_u64(units_per_second).is_none() { + return Err(TimeOverflow); + } + match NonZero::new(units_per_second) { + Some(n) => Ok(Self(n)), + None => Err(TimeOverflow), + } + } + + /// The raw units-per-second value (always non-zero). + pub const fn as_u64(self) -> u64 { + self.0.get() + } +} + +impl TryFrom for Timescale { + type Error = TimeOverflow; + + fn try_from(units_per_second: u64) -> Result { + Self::new(units_per_second) + } +} -impl Timescale { - /// The maximum representable instant. - pub const MAX: Self = Self(VarInt::MAX); +impl From> for Timescale { + fn from(units_per_second: NonZero) -> Self { + Self(units_per_second) + } +} + +impl From for u64 { + fn from(scale: Timescale) -> Self { + scale.0.get() + } +} + +impl From for NonZero { + fn from(scale: Timescale) -> Self { + scale.0 + } +} - /// The minimum representable instant. - pub const ZERO: Self = Self(VarInt::ZERO); +impl std::fmt::Debug for Timescale { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + Self::SECOND => write!(f, "Timescale::SECOND"), + Self::MILLI => write!(f, "Timescale::MILLI"), + Self::MICRO => write!(f, "Timescale::MICRO"), + Self::NANO => write!(f, "Timescale::NANO"), + Self(n) => write!(f, "Timescale({n})"), + } + } +} - /// Construct a timestamp directly from a value in this scale's units. Infallible - /// because any `u32` fits within the 62-bit varint range. - pub const fn new(value: u32) -> Self { - Self(VarInt::from_u32(value)) +impl std::fmt::Display for Timescale { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) } +} - /// Construct a timestamp directly from a value in this scale's units. Returns - /// [`TimeOverflow`] if `value` exceeds the 62-bit varint range. - pub const fn new_u64(value: u64) -> Result { +/// A timestamp in a track's timescale (units per second). +/// +/// All timestamps within a track are relative, so zero for one track is not zero for another. +/// The underlying value is constrained to fit within a QUIC VarInt (`2^62 - 1`) so it can be +/// encoded and decoded easily; the scale is carried alongside so frames from different +/// sources can be compared and converted without lossy detours through a single fixed scale. +/// +/// The scale is a [`Timescale`] (always non-zero), so unit conversions (`as_secs`, `as_millis`, +/// etc.) are infallible. Use [`Option`] at call sites that need a "missing" sentinel +/// instead of relying on a magic value. +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct Timestamp { + value: VarInt, + scale: Timescale, +} + +impl Timestamp { + /// Construct a timestamp directly from a raw value at the given scale. + /// Returns [`TimeOverflow`] if `value` exceeds `2^62 - 1`. + pub const fn new(value: u64, scale: Timescale) -> Result { match VarInt::from_u64(value) { - Some(varint) => Ok(Self(varint)), + Some(value) => Ok(Self { value, scale }), None => Err(TimeOverflow), } } - /// Convert a number of seconds to a timestamp, returning an error if the timestamp would overflow. + /// Convert a number of seconds to a timestamp at [`Timescale::SECOND`]. pub const fn from_secs(seconds: u64) -> Result { - // Not using from_scale because it'll be slightly faster - match seconds.checked_mul(SCALE) { - Some(value) => Self::new_u64(value), - None => Err(TimeOverflow), - } + Self::new(seconds, Timescale::SECOND) } - /// Like [`Self::from_secs`] but panics on overflow. Intended for `const` - /// initializers where overflow indicates a bug, not a runtime condition. + /// Like [`Self::from_secs`] but panics on overflow. pub const fn from_secs_unchecked(seconds: u64) -> Self { match Self::from_secs(seconds) { Ok(time) => time, @@ -67,187 +150,175 @@ impl Timescale { } } - /// Convert a number of milliseconds to a timestamp, returning an error if the timestamp would overflow. + /// Convert a number of milliseconds to a timestamp at [`Timescale::MILLI`]. pub const fn from_millis(millis: u64) -> Result { - Self::from_scale(millis, 1000) + Self::new(millis, Timescale::MILLI) } /// Like [`Self::from_millis`] but panics on overflow. pub const fn from_millis_unchecked(millis: u64) -> Self { - Self::from_scale_unchecked(millis, 1000) + match Self::from_millis(millis) { + Ok(time) => time, + Err(_) => panic!("time overflow"), + } } - /// Convert a number of microseconds to a timestamp, returning an error on overflow. + /// Convert a number of microseconds to a timestamp at [`Timescale::MICRO`]. pub const fn from_micros(micros: u64) -> Result { - Self::from_scale(micros, 1_000_000) + Self::new(micros, Timescale::MICRO) } /// Like [`Self::from_micros`] but panics on overflow. pub const fn from_micros_unchecked(micros: u64) -> Self { - Self::from_scale_unchecked(micros, 1_000_000) + match Self::from_micros(micros) { + Ok(time) => time, + Err(_) => panic!("time overflow"), + } } - /// Convert a number of nanoseconds to a timestamp, returning an error on overflow. + /// Convert a number of nanoseconds to a timestamp at [`Timescale::NANO`]. pub const fn from_nanos(nanos: u64) -> Result { - Self::from_scale(nanos, 1_000_000_000) + Self::new(nanos, Timescale::NANO) } /// Like [`Self::from_nanos`] but panics on overflow. pub const fn from_nanos_unchecked(nanos: u64) -> Self { - Self::from_scale_unchecked(nanos, 1_000_000_000) + match Self::from_nanos(nanos) { + Ok(time) => time, + Err(_) => panic!("time overflow"), + } } - /// Construct from `value` measured at the given `scale` (units per second), rescaling - /// to `SCALE`. Returns [`TimeOverflow`] if the rescaled value exceeds 2^62. - pub const fn from_scale(value: u64, scale: u64) -> Result { - match VarInt::from_u128(value as u128 * SCALE as u128 / scale as u128) { - Some(varint) => Ok(Self(varint)), - None => Err(TimeOverflow), - } + /// The raw value in the timestamp's own scale. + pub const fn value(self) -> u64 { + self.value.into_inner() + } + + /// The scale (units per second) attached to this timestamp. + pub const fn scale(self) -> Timescale { + self.scale } - /// Like [`Self::from_scale`] but accepts a `u128` source value. - pub const fn from_scale_u128(value: u128, scale: u64) -> Result { - match value.checked_mul(SCALE as u128) { - Some(value) => match VarInt::from_u128(value / scale as u128) { - Some(varint) => Ok(Self(varint)), + /// Whether the raw value is zero. Does not consider scale. + pub const fn is_zero(self) -> bool { + self.value.into_inner() == 0 + } + + /// Re-express this timestamp at a new scale. Returns [`TimeOverflow`] if the new + /// value would exceed `2^62 - 1`. + pub const fn convert(self, new_scale: Timescale) -> Result { + if self.scale.0.get() == new_scale.0.get() { + return Ok(self); + } + match (self.value.into_inner() as u128).checked_mul(new_scale.0.get() as u128) { + Some(scaled) => match VarInt::from_u128(scaled / self.scale.0.get() as u128) { + Some(value) => Ok(Self { + value, + scale: new_scale, + }), None => Err(TimeOverflow), }, None => Err(TimeOverflow), } } - /// Like [`Self::from_scale`] but panics on overflow. - pub const fn from_scale_unchecked(value: u64, scale: u64) -> Self { - match Self::from_scale(value, scale) { - Ok(time) => time, - Err(_) => panic!("time overflow"), - } + /// The value re-expressed at `target` as a `u128`. + pub const fn as_scale(self, target: Timescale) -> u128 { + self.value.into_inner() as u128 * target.0.get() as u128 / self.scale.0.get() as u128 } - /// Get the timestamp as seconds. + /// The value re-expressed in seconds. pub const fn as_secs(self) -> u64 { - self.0.into_inner() / SCALE + self.value.into_inner() / self.scale.0.get() } - /// Get the timestamp as milliseconds. - // - // This returns a u128 to avoid a possible overflow when SCALE < 250 + /// The value re-expressed in milliseconds. pub const fn as_millis(self) -> u128 { - self.as_scale(1000) + self.as_scale(Timescale::MILLI) } - /// Get the timestamp as microseconds. + /// The value re-expressed in microseconds. pub const fn as_micros(self) -> u128 { - self.as_scale(1_000_000) + self.as_scale(Timescale::MICRO) } - /// Get the timestamp as nanoseconds. + /// The value re-expressed in nanoseconds. pub const fn as_nanos(self) -> u128 { - self.as_scale(1_000_000_000) + self.as_scale(Timescale::NANO) } - /// Convert this timestamp to the given `scale` (units per second). - pub const fn as_scale(self, scale: u64) -> u128 { - self.0.into_inner() as u128 * scale as u128 / SCALE as u128 - } - - /// Get the maximum of two timestamps. + /// Return the larger of two timestamps. + /// + /// Panics if the scales differ. Use [`Self::convert`] first if you need to compare + /// across scales. pub const fn max(self, other: Self) -> Self { - if self.0.into_inner() > other.0.into_inner() { + assert!(self.scale.0.get() == other.scale.0.get(), "mismatched timestamp scales"); + if self.value.into_inner() > other.value.into_inner() { self } else { other } } - /// Add two timestamps, returning [`TimeOverflow`] if the sum exceeds 2^62. + /// Add two timestamps. Returns [`TimeOverflow`] if the sum exceeds `2^62 - 1` or + /// if the scales differ. pub const fn checked_add(self, rhs: Self) -> Result { - let lhs = self.0.into_inner(); - let rhs = rhs.0.into_inner(); - match lhs.checked_add(rhs) { - Some(result) => Self::new_u64(result), + if self.scale.0.get() != rhs.scale.0.get() { + return Err(TimeOverflow); + } + match self.value.into_inner().checked_add(rhs.value.into_inner()) { + Some(result) => Self::new(result, self.scale), None => Err(TimeOverflow), } } - /// Subtract `rhs` from `self`, returning [`TimeOverflow`] if `rhs > self`. + /// Subtract `rhs` from `self`. Returns [`TimeOverflow`] if `rhs > self` or if the + /// scales differ. pub const fn checked_sub(self, rhs: Self) -> Result { - let lhs = self.0.into_inner(); - let rhs = rhs.0.into_inner(); - match lhs.checked_sub(rhs) { - Some(result) => Self::new_u64(result), + if self.scale.0.get() != rhs.scale.0.get() { + return Err(TimeOverflow); + } + match self.value.into_inner().checked_sub(rhs.value.into_inner()) { + Some(result) => Self::new(result, self.scale), None => Err(TimeOverflow), } } - /// Whether this timestamp is [`Self::ZERO`]. - pub const fn is_zero(self) -> bool { - self.0.into_inner() == 0 - } - - /// Current time as a timestamp, derived from [`tokio::time::Instant::now`] so - /// it honors `tokio::time::pause` in tests. + /// Current time, expressed in microseconds ([`Timescale::MICRO`]). Uses + /// [`tokio::time::Instant::now`] so it honors `tokio::time::pause` in tests. pub fn now() -> Self { - // We use tokio so it can be stubbed for testing. tokio::time::Instant::now().into() } - - /// Convert this timestamp to a different scale. - /// - /// This allows converting between different TimeScale types, for example from milliseconds to microseconds. - /// Note that converting to a coarser scale may lose precision due to integer division. - pub const fn convert(self) -> Result, TimeOverflow> { - let value = self.0.into_inner(); - // Convert from SCALE to NEW_SCALE: value * NEW_SCALE / SCALE - match (value as u128).checked_mul(NEW_SCALE as u128) { - Some(v) => match v.checked_div(SCALE as u128) { - Some(v) => match VarInt::from_u128(v) { - Some(varint) => Ok(Timescale(varint)), - None => Err(TimeOverflow), - }, - None => Err(TimeOverflow), - }, - None => Err(TimeOverflow), - } - } - - /// Encode this timestamp as a QUIC varint. Version-independent. - pub fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - // Version-independent: uses QUIC varint encoding. - self.0.encode(w, crate::lite::Version::Lite01)?; - Ok(()) - } - - /// Decode a timestamp from a QUIC varint. Version-independent. - pub fn decode(r: &mut R) -> Result { - // Version-independent: uses QUIC varint encoding. - let v = VarInt::decode(r, crate::lite::Version::Lite01)?; - Ok(Self(v)) - } } -impl TryFrom for Timescale { +impl TryFrom for Timestamp { type Error = TimeOverflow; + /// Convert a [`std::time::Duration`] into a nanosecond-scale timestamp. fn try_from(duration: std::time::Duration) -> Result { - Self::from_scale_u128(duration.as_nanos(), 1_000_000_000) + match VarInt::from_u128(duration.as_nanos()) { + Some(value) => Ok(Self { + value, + scale: Timescale::NANO, + }), + None => Err(TimeOverflow), + } } } -impl From> for std::time::Duration { - fn from(time: Timescale) -> Self { - std::time::Duration::new(time.as_secs(), (time.as_nanos() % 1_000_000_000) as u32) +impl From for std::time::Duration { + fn from(time: Timestamp) -> Self { + let nanos = time.as_nanos(); + std::time::Duration::new(time.as_secs(), (nanos % 1_000_000_000) as u32) } } -impl std::fmt::Debug for Timescale { +impl std::fmt::Debug for Timestamp { #[allow(clippy::manual_is_multiple_of)] // is_multiple_of is unstable in Rust 1.85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let nanos = self.as_nanos(); - // Choose the largest unit where we don't need decimal places - // Check from largest to smallest unit + // Choose the largest unit where we don't need decimal places. if nanos % 1_000_000_000 == 0 { write!(f, "{}s", nanos / 1_000_000_000) } else if nanos % 1_000_000 == 0 { @@ -260,29 +331,56 @@ impl std::fmt::Debug for Timescale { } } -impl std::ops::Add for Timescale { +impl PartialOrd for Timestamp { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Timestamp { + /// Compare two timestamps, normalizing across scales when they differ. + /// + /// - If both scales are equal, compares raw values directly. + /// - Otherwise cross-multiplies in 128-bit so e.g. `1s > 2ms` orders correctly. + /// + /// When the cross-scale comparison would otherwise be `Equal` (e.g. `from_secs(1)` + /// vs `from_millis(1000)`), breaks ties by `(scale, value)` so the result agrees + /// with derived `PartialEq`/`Eq`/`Hash` (which are field-wise). + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + if self.scale.0.get() == other.scale.0.get() { + return self.value.cmp(&other.value); + } + let lhs = self.value.into_inner() as u128 * other.scale.0.get() as u128; + let rhs = other.value.into_inner() as u128 * self.scale.0.get() as u128; + lhs.cmp(&rhs) + .then_with(|| self.scale.0.get().cmp(&other.scale.0.get())) + .then_with(|| self.value.cmp(&other.value)) + } +} + +impl std::ops::Add for Timestamp { type Output = Self; fn add(self, rhs: Self) -> Self { - self.checked_add(rhs).expect("time overflow") + self.checked_add(rhs).expect("time overflow or scale mismatch") } } -impl std::ops::AddAssign for Timescale { +impl std::ops::AddAssign for Timestamp { fn add_assign(&mut self, rhs: Self) { *self = *self + rhs; } } -impl std::ops::Sub for Timescale { +impl std::ops::Sub for Timestamp { type Output = Self; fn sub(self, rhs: Self) -> Self { - self.checked_sub(rhs).expect("time overflow") + self.checked_sub(rhs).expect("time overflow or scale mismatch") } } -impl std::ops::SubAssign for Timescale { +impl std::ops::SubAssign for Timestamp { fn sub_assign(&mut self, rhs: Self) { *self = *self - rhs; } @@ -297,54 +395,39 @@ static TIME_ANCHOR: LazyLock<(std::time::Instant, SystemTime)> = LazyLock::new(| (std::time::Instant::now(), SystemTime::now() - jitter) }); -// Convert an Instant to a Unix timestamp -impl From for Timescale { +impl From for Timestamp { + /// Convert an [`std::time::Instant`] into a microsecond-scale timestamp anchored to a + /// jittered wall-clock reference (see `TIME_ANCHOR`). fn from(instant: std::time::Instant) -> Self { let (anchor_instant, anchor_system) = *TIME_ANCHOR; - // Conver the instant to a SystemTime. let system = match instant.checked_duration_since(anchor_instant) { Some(forward) => anchor_system + forward, None => anchor_system - anchor_instant.duration_since(instant), }; - // Convert the SystemTime to a Unix timestamp in nanoseconds. - // We'll then convert that to the desired scale. - system + let duration = system .duration_since(UNIX_EPOCH) - .expect("dude your clock is earlier than 1970") - .try_into() - .expect("dude your clock is later than 2116") + .expect("dude your clock is earlier than 1970"); + + Self::from_micros(duration.as_micros() as u64).expect("dude your clock is later than 2116") } } -impl From for Timescale { +impl From for Timestamp { fn from(instant: tokio::time::Instant) -> Self { instant.into_std().into() } } -impl Decode for Timescale { - fn decode(r: &mut R, version: crate::Version) -> Result { - let v = VarInt::decode(r, version)?; - Ok(Self(v)) - } -} - -impl Encode for Timescale { - fn encode(&self, w: &mut W, version: crate::Version) -> Result<(), EncodeError> { - self.0.encode(w, version)?; - Ok(()) - } -} - #[cfg(test)] mod tests { use super::*; #[test] fn test_from_secs() { - let time = Time::from_secs(5).unwrap(); + let time = Timestamp::from_secs(5).unwrap(); + assert_eq!(time.scale(), Timescale::SECOND); assert_eq!(time.as_secs(), 5); assert_eq!(time.as_millis(), 5000); assert_eq!(time.as_micros(), 5_000_000); @@ -353,368 +436,199 @@ mod tests { #[test] fn test_from_millis() { - let time = Time::from_millis(5000).unwrap(); + let time = Timestamp::from_millis(5000).unwrap(); + assert_eq!(time.scale(), Timescale::MILLI); assert_eq!(time.as_secs(), 5); assert_eq!(time.as_millis(), 5000); } #[test] fn test_from_micros() { - let time = Time::from_micros(5_000_000).unwrap(); + let time = Timestamp::from_micros(5_000_000).unwrap(); + assert_eq!(time.scale(), Timescale::MICRO); assert_eq!(time.as_secs(), 5); - assert_eq!(time.as_millis(), 5000); assert_eq!(time.as_micros(), 5_000_000); } #[test] fn test_from_nanos() { - let time = Time::from_nanos(5_000_000_000).unwrap(); + let time = Timestamp::from_nanos(5_000_000_000).unwrap(); + assert_eq!(time.scale(), Timescale::NANO); assert_eq!(time.as_secs(), 5); - assert_eq!(time.as_millis(), 5000); - assert_eq!(time.as_micros(), 5_000_000); assert_eq!(time.as_nanos(), 5_000_000_000); } #[test] - fn test_zero() { - let time = Time::ZERO; - assert_eq!(time.as_secs(), 0); - assert_eq!(time.as_millis(), 0); - assert_eq!(time.as_micros(), 0); - assert_eq!(time.as_nanos(), 0); - assert!(time.is_zero()); - } + fn test_timescale_new_rejects_zero_and_overflow() { + assert!(Timescale::new(0).is_err()); + assert!(Timescale::new(1).is_ok()); + assert_eq!(Timescale::new(1).unwrap(), Timescale::SECOND); + assert_eq!(Timescale::new(1_000).unwrap(), Timescale::MILLI); - #[test] - fn test_roundtrip_millis() { - let values = [0, 1, 100, 1000, 999999, 1_000_000_000]; - for &val in &values { - let time = Time::from_millis(val).unwrap(); - assert_eq!(time.as_millis(), val as u128); - } - } - - #[test] - fn test_roundtrip_micros() { - // Note: values < 1000 will lose precision when converting to milliseconds (SCALE=1000) - let values = [0, 1000, 1_000_000, 1_000_000_000]; - for &val in &values { - let time = Time::from_micros(val).unwrap(); - assert_eq!(time.as_micros(), val as u128); - } + // Above the QUIC varint range. + assert!(Timescale::new(1u64 << 62).is_err()); + // Right at the top of the varint range is still valid. + assert!(Timescale::new((1u64 << 62) - 1).is_ok()); } #[test] - fn test_different_scale_seconds() { - type TimeInSeconds = Timescale<1>; - let time = TimeInSeconds::from_secs(5).unwrap(); - assert_eq!(time.as_secs(), 5); - assert_eq!(time.as_millis(), 5000); + fn test_convert_to_finer() { + let time_ms = Timestamp::from_millis(5000).unwrap(); + let time_us = time_ms.convert(Timescale::MICRO).unwrap(); + assert_eq!(time_us.scale(), Timescale::MICRO); + assert_eq!(time_us.as_micros(), 5_000_000); } #[test] - fn test_different_scale_microseconds() { - type TimeInMicros = Timescale<1_000_000>; - let time = TimeInMicros::from_micros(5_000_000).unwrap(); - assert_eq!(time.as_secs(), 5); - assert_eq!(time.as_micros(), 5_000_000); + fn test_convert_to_coarser() { + let time_ms = Timestamp::from_millis(5000).unwrap(); + let time_s = time_ms.convert(Timescale::SECOND).unwrap(); + assert_eq!(time_s.scale(), Timescale::SECOND); + assert_eq!(time_s.as_secs(), 5); } #[test] - fn test_scale_conversion() { - // Converting 5000 milliseconds at scale 1000 to scale 1000 (should be identity) - let time = Time::from_scale(5000, 1000).unwrap(); - assert_eq!(time.as_millis(), 5000); - assert_eq!(time.as_secs(), 5); - - // Converting 5 seconds at scale 1 to scale 1000 - let time = Time::from_scale(5, 1).unwrap(); - assert_eq!(time.as_millis(), 5000); - assert_eq!(time.as_secs(), 5); + fn test_convert_precision_loss() { + // 1234 ms = 1.234 s, rounds down to 1 s + let time_ms = Timestamp::from_millis(1234).unwrap(); + let time_s = time_ms.convert(Timescale::SECOND).unwrap(); + assert_eq!(time_s.as_secs(), 1); } #[test] - fn test_add() { - let a = Time::from_secs(3).unwrap(); - let b = Time::from_secs(2).unwrap(); - let c = a + b; - assert_eq!(c.as_secs(), 5); - assert_eq!(c.as_millis(), 5000); + fn test_convert_roundtrip() { + let original = Timestamp::from_millis(5000).unwrap(); + let as_micros = original.convert(Timescale::MICRO).unwrap(); + let back = as_micros.convert(Timescale::MILLI).unwrap(); + assert_eq!(original.value(), back.value()); + assert_eq!(original.scale(), back.scale()); } #[test] - fn test_sub() { - let a = Time::from_secs(5).unwrap(); - let b = Time::from_secs(2).unwrap(); - let c = a - b; - assert_eq!(c.as_secs(), 3); - assert_eq!(c.as_millis(), 3000); + fn test_convert_same_scale() { + let time = Timestamp::from_millis(5000).unwrap(); + let converted = time.convert(Timescale::MILLI).unwrap(); + assert_eq!(time, converted); } #[test] - fn test_checked_add() { - let a = Time::from_millis(1000).unwrap(); - let b = Time::from_millis(2000).unwrap(); + fn test_add_same_scale() { + let a = Timestamp::from_millis(1000).unwrap(); + let b = Timestamp::from_millis(2000).unwrap(); let c = a.checked_add(b).unwrap(); assert_eq!(c.as_millis(), 3000); + assert_eq!(c.scale(), Timescale::MILLI); } #[test] - fn test_checked_sub() { - let a = Time::from_millis(5000).unwrap(); - let b = Time::from_millis(2000).unwrap(); - let c = a.checked_sub(b).unwrap(); - assert_eq!(c.as_millis(), 3000); + fn test_add_mismatched_scale() { + let a = Timestamp::from_millis(1000).unwrap(); + let b = Timestamp::from_micros(1000).unwrap(); + assert!(a.checked_add(b).is_err()); } #[test] - fn test_checked_sub_underflow() { - let a = Time::from_millis(1000).unwrap(); - let b = Time::from_millis(2000).unwrap(); + fn test_sub_underflow() { + let a = Timestamp::from_millis(1000).unwrap(); + let b = Timestamp::from_millis(2000).unwrap(); assert!(a.checked_sub(b).is_err()); } #[test] - fn test_max() { - let a = Time::from_secs(5).unwrap(); - let b = Time::from_secs(10).unwrap(); + fn test_max_same_scale() { + let a = Timestamp::from_secs(5).unwrap(); + let b = Timestamp::from_secs(10).unwrap(); assert_eq!(a.max(b), b); assert_eq!(b.max(a), b); } #[test] - fn test_duration_conversion() { - let duration = std::time::Duration::from_secs(5); - let time: Time = duration.try_into().unwrap(); - assert_eq!(time.as_secs(), 5); - assert_eq!(time.as_millis(), 5000); - - let duration_back: std::time::Duration = time.into(); - assert_eq!(duration_back.as_secs(), 5); - } - - #[test] - fn test_duration_with_nanos() { - let duration = std::time::Duration::new(5, 500_000_000); // 5.5 seconds - let time: Time = duration.try_into().unwrap(); - assert_eq!(time.as_millis(), 5500); - - let duration_back: std::time::Duration = time.into(); - assert_eq!(duration_back.as_millis(), 5500); + #[should_panic(expected = "mismatched timestamp scales")] + fn test_max_mismatched_scale_panics() { + let a = Timestamp::from_millis(1).unwrap(); + let b = Timestamp::from_secs(1).unwrap(); + let _ = a.max(b); } #[test] - fn test_fractional_conversion() { - // Test that 1500 millis = 1.5 seconds - let time = Time::from_millis(1500).unwrap(); - assert_eq!(time.as_secs(), 1); // Integer division - assert_eq!(time.as_millis(), 1500); - assert_eq!(time.as_micros(), 1_500_000); - } - - #[test] - fn test_precision_loss() { - // When converting from a finer scale to coarser, we lose precision - // 1234 micros = 1.234 millis, which rounds down to 1 millisecond internally - // When converting back, we get 1000 micros, not the original 1234 - let time = Time::from_micros(1234).unwrap(); - assert_eq!(time.as_millis(), 1); // 1234 micros = 1.234 millis, rounds to 1 - assert_eq!(time.as_micros(), 1000); // Precision lost: 1 milli = 1000 micros - } - - #[test] - fn test_scale_boundaries() { - // Test values near scale boundaries - let time = Time::from_millis(999).unwrap(); - assert_eq!(time.as_secs(), 0); - assert_eq!(time.as_millis(), 999); - - let time = Time::from_millis(1000).unwrap(); - assert_eq!(time.as_secs(), 1); - assert_eq!(time.as_millis(), 1000); - - let time = Time::from_millis(1001).unwrap(); - assert_eq!(time.as_secs(), 1); - assert_eq!(time.as_millis(), 1001); - } - - #[test] - fn test_large_values() { - // Test with large but valid values - let large_secs = 1_000_000_000u64; // ~31 years - let time = Time::from_secs(large_secs).unwrap(); - assert_eq!(time.as_secs(), large_secs); - } - - #[test] - fn test_new() { - let time = Time::new(5000); // 5000 in the current scale (millis) - assert_eq!(time.as_millis(), 5000); - assert_eq!(time.as_secs(), 5); - } - - #[test] - fn test_new_u64() { - let time = Time::new_u64(5000).unwrap(); - assert_eq!(time.as_millis(), 5000); - } - - #[test] - fn test_ordering() { - let a = Time::from_secs(1).unwrap(); - let b = Time::from_secs(2).unwrap(); + fn test_ordering_same_scale() { + let a = Timestamp::from_secs(1).unwrap(); + let b = Timestamp::from_secs(2).unwrap(); assert!(a < b); assert!(b > a); assert_eq!(a, a); } #[test] - fn test_unchecked_variants() { - let time = Time::from_secs_unchecked(5); - assert_eq!(time.as_secs(), 5); - - let time = Time::from_millis_unchecked(5000); - assert_eq!(time.as_millis(), 5000); - - let time = Time::from_micros_unchecked(5_000_000); - assert_eq!(time.as_micros(), 5_000_000); - - let time = Time::from_nanos_unchecked(5_000_000_000); - assert_eq!(time.as_nanos(), 5_000_000_000); - - let time = Time::from_scale_unchecked(5000, 1000); - assert_eq!(time.as_millis(), 5000); - } - - #[test] - fn test_as_scale() { - let time = Time::from_secs(1).unwrap(); - // 1 second in scale 1000 = 1000 - assert_eq!(time.as_scale(1000), 1000); - // 1 second in scale 1 = 1 - assert_eq!(time.as_scale(1), 1); - // 1 second in scale 1_000_000 = 1_000_000 - assert_eq!(time.as_scale(1_000_000), 1_000_000); - } - - #[test] - fn test_convert_to_finer() { - // Convert from milliseconds to microseconds (coarser to finer) - type TimeInMillis = Timescale<1_000>; - type TimeInMicros = Timescale<1_000_000>; - - let time_millis = TimeInMillis::from_millis(5000).unwrap(); - let time_micros: TimeInMicros = time_millis.convert().unwrap(); - - assert_eq!(time_micros.as_millis(), 5000); - assert_eq!(time_micros.as_micros(), 5_000_000); - } - - #[test] - fn test_convert_to_coarser() { - // Convert from milliseconds to seconds (finer to coarser) - type TimeInMillis = Timescale<1_000>; - type TimeInSeconds = Timescale<1>; - - let time_millis = TimeInMillis::from_millis(5000).unwrap(); - let time_secs: TimeInSeconds = time_millis.convert().unwrap(); - - assert_eq!(time_secs.as_secs(), 5); - assert_eq!(time_secs.as_millis(), 5000); - } - - #[test] - fn test_convert_precision_loss() { - // Converting 1234 millis to seconds loses precision - type TimeInMillis = Timescale<1_000>; - type TimeInSeconds = Timescale<1>; + fn test_ordering_across_known_scales() { + // Cross-scale ordering normalizes to a common scale. + let one_sec = Timestamp::from_secs(1).unwrap(); + let two_ms = Timestamp::from_millis(2).unwrap(); + assert!(one_sec > two_ms); + assert!(two_ms < one_sec); - let time_millis = TimeInMillis::from_millis(1234).unwrap(); - let time_secs: TimeInSeconds = time_millis.convert().unwrap(); + // Temporally-equivalent timestamps with different representations are NOT + // Equal under cmp: derived Eq compares fields, and Ord must agree. + let one_sec_b = Timestamp::from_millis(1000).unwrap(); + assert_ne!(one_sec.cmp(&one_sec_b), std::cmp::Ordering::Equal); + assert_ne!(one_sec, one_sec_b); + assert_eq!(one_sec.cmp(&one_sec), std::cmp::Ordering::Equal); - // 1234 millis = 1.234 seconds, rounds down to 1 second - assert_eq!(time_secs.as_secs(), 1); - assert_eq!(time_secs.as_millis(), 1000); // Lost 234 millis + // Mixed-scale sort lands in correct temporal order. + let mut items = [ + Timestamp::from_secs(2).unwrap(), + Timestamp::from_millis(500).unwrap(), + Timestamp::from_micros(1_500_000).unwrap(), + ]; + items.sort(); + assert_eq!(items[0], Timestamp::from_millis(500).unwrap()); + assert_eq!(items[1], Timestamp::from_micros(1_500_000).unwrap()); + assert_eq!(items[2], Timestamp::from_secs(2).unwrap()); } #[test] - fn test_convert_roundtrip() { - // Converting to finer and back should preserve value - type TimeInMillis = Timescale<1_000>; - type TimeInMicros = Timescale<1_000_000>; - - let original = TimeInMillis::from_millis(5000).unwrap(); - let as_micros: TimeInMicros = original.convert().unwrap(); - let back_to_millis: TimeInMillis = as_micros.convert().unwrap(); - - assert_eq!(original.as_millis(), back_to_millis.as_millis()); - } - - #[test] - fn test_convert_same_scale() { - // Converting to the same scale should be identity - type TimeInMillis = Timescale<1_000>; - - let time = TimeInMillis::from_millis(5000).unwrap(); - let converted: TimeInMillis = time.convert().unwrap(); - - assert_eq!(time.as_millis(), converted.as_millis()); - } - - #[test] - fn test_convert_microseconds_to_nanoseconds() { - type TimeInMicros = Timescale<1_000_000>; - type TimeInNanos = Timescale<1_000_000_000>; - - let time_micros = TimeInMicros::from_micros(5_000_000).unwrap(); - let time_nanos: TimeInNanos = time_micros.convert().unwrap(); - - assert_eq!(time_nanos.as_micros(), 5_000_000); - assert_eq!(time_nanos.as_nanos(), 5_000_000_000); - } - - #[test] - fn test_convert_custom_scales() { - // Test with unusual custom scales - type TimeScale60 = Timescale<60>; // 60Hz - type TimeScale90 = Timescale<90>; // 90Hz - - let time60 = TimeScale60::from_scale(120, 60).unwrap(); // 2 seconds at 60Hz - let time90: TimeScale90 = time60.convert().unwrap(); + fn test_duration_conversion() { + let duration = std::time::Duration::from_secs(5); + let time: Timestamp = duration.try_into().unwrap(); + assert_eq!(time.scale(), Timescale::NANO); + assert_eq!(time.as_secs(), 5); - // Both should represent 2 seconds - assert_eq!(time60.as_secs(), 2); - assert_eq!(time90.as_secs(), 2); + let duration_back: std::time::Duration = time.into(); + assert_eq!(duration_back.as_secs(), 5); } #[test] fn test_debug_format_units() { - // Test that Debug chooses appropriate units based on value - - // Milliseconds that are clean seconds - let t = Time::from_millis(100000).unwrap(); + let t = Timestamp::from_millis(100_000).unwrap(); assert_eq!(format!("{:?}", t), "100s"); - let t = Time::from_millis(1000).unwrap(); - assert_eq!(format!("{:?}", t), "1s"); - - // Milliseconds that are clean milliseconds - let t = Time::from_millis(100).unwrap(); + let t = Timestamp::from_millis(100).unwrap(); assert_eq!(format!("{:?}", t), "100ms"); - let t = Time::from_millis(5500).unwrap(); - assert_eq!(format!("{:?}", t), "5500ms"); - - // Zero - let t = Time::ZERO; - assert_eq!(format!("{:?}", t), "0s"); - - // Test with microsecond-scale time - type TimeMicros = Timescale<1_000_000>; - let t = TimeMicros::from_micros(1500).unwrap(); + let t = Timestamp::from_micros(1500).unwrap(); assert_eq!(format!("{:?}", t), "1500µs"); - let t = TimeMicros::from_micros(1000).unwrap(); + let t = Timestamp::from_micros(1000).unwrap(); assert_eq!(format!("{:?}", t), "1ms"); } + + #[test] + fn test_new() { + let t = Timestamp::new(5000, Timescale::MILLI).unwrap(); + assert_eq!(t.value(), 5000); + assert_eq!(t.scale(), Timescale::MILLI); + assert_eq!(t.as_millis(), 5000); + } + + #[test] + fn test_custom_scale_convert() { + // 120 units at 60Hz = 2 seconds, expressed at 1000Hz = 2000 ms. + let scale_60 = Timescale::new(60).unwrap(); + let t = Timestamp::new(120, scale_60) + .unwrap() + .convert(Timescale::MILLI) + .unwrap(); + assert_eq!(t.scale(), Timescale::MILLI); + assert_eq!(t.as_millis(), 2000); + } }