Skip to content

Commit 3e3bfe7

Browse files
authored
feat: add StreamTrait::now() and overhaul StreamInstant API (#1139)
- Adds now() to StreamTrait so callers can query the stream clock outside the audio callback on the same time base as callback/capture/playback timestamps. - StreamInstant is reworked to mirror std::time::Instant with u64 storage (all stream clocks are monotonic and non-negative) and similar functions parameters, and return types. Closes #472
1 parent 63ea9ff commit 3e3bfe7

26 files changed

Lines changed: 835 additions & 386 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1414
- `StreamConfig` now implements `Copy`.
1515
- `StreamTrait::buffer_size()` to query the stream's current buffer size in frames per callback.
1616
- `device_by_id` is now dispatched to each backend's implementation, allowing to override it.
17+
- `StreamTrait::now()` to query the current instant on the stream's clock.
18+
- `StreamInstant` API changed and extended to mirror `std::time::Instant`/`Duration`. See
19+
[UPGRADING.md](UPGRADING.md) for migration details.
1720
- **ALSA**: `device_by_id` now accepts PCM shorthand names such as `hw:0,0` and `plughw:foo`.
1821
- **PipeWire**: New host for Linux and some BSDs using the PipeWire API.
1922
- **PulseAudio**: New host for Linux and some BSDs using the PulseAudio API.
@@ -48,6 +51,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4851
- **WebAudio**: Timestamps now include base and output latency.
4952
- **WebAudio**: Initial buffer scheduling offset now scales with buffer duration.
5053

54+
### Removed
55+
56+
- Replaced `StreamInstant::add()` and `sub()` by `checked_add()`/`+` and `checked_sub()`/`-`.
57+
5158
### Fixed
5259

5360
- Reintroduce `audio_thread_priority` feature.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,14 @@ clap = { version = ">=4.0, <=4.5.57", features = ["derive"] }
8585
# When updating this, also update the "windows-version" matrix in the CI workflow.
8686
[target.'cfg(target_os = "windows")'.dependencies]
8787
windows = { version = ">=0.59, <=0.62", features = [
88+
"Win32_Media",
8889
"Win32_Media_Audio",
8990
"Win32_Foundation",
9091
"Win32_Devices_Properties",
9192
"Win32_Media_KernelStreaming",
9293
"Win32_System_Com_StructuredStorage",
9394
"Win32_System_Threading",
95+
"Win32_System_Performance",
9496
"Win32_Security",
9597
"Win32_System_SystemServices",
9698
"Win32_System_Variant",
@@ -183,6 +185,7 @@ ndk = { version = "0.9", default-features = false, features = [
183185
] }
184186
ndk-context = "0.1"
185187
jni = "0.21"
188+
libc = "0.2"
186189
num-derive = "0.4"
187190
num-traits = "0.2"
188191

UPGRADING.md

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ This guide covers breaking changes requiring code updates. See [CHANGELOG.md](CH
88
- [ ] Optionally handle the new `DeviceBusy` variant for retryable device errors
99
- [ ] Change `build_*_stream` call sites to pass `StreamConfig` by value (drop the `&`)
1010
- [ ] For custom hosts, change `DeviceTrait` implementations to accept `StreamConfig` by value.
11+
- [ ] Remove `instant.duration_since(e)` unwraps; it now returns `Duration` (saturating).
12+
- [ ] Change `instant.add(d)` to `instant.checked_add(d)` (or use `instant + d`).
13+
- [ ] Change `instant.sub(d)` to `instant.checked_sub(d)` (or use `instant - d`).
14+
- [ ] Update `StreamInstant::new(secs, nanos)` call sites: `secs` is now `u64`.
15+
- [ ] Update `StreamInstant::from_nanos(nanos)` call sites: `nanos` is now `u64`.
16+
- [ ] Update `duration_since` call sites to pass by value (drop the `&`).
1117

1218
## 1. Error enums are now `#[non_exhaustive]`
1319

@@ -61,6 +67,73 @@ let stream = device.build_output_stream(config, data_fn, err_fn, None)?;
6167

6268
If you implement `DeviceTrait` on your own type (via the `custom` feature), update your `build_input_stream_raw` and `build_output_stream_raw` signatures from `config: &StreamConfig` to `config: StreamConfig`. Any `config.clone()` calls before `move` closures can also be removed.
6369

70+
## 4. `StreamInstant` API overhaul
71+
72+
The `StreamInstant` API has been aligned with `std::time::Instant` and `std::time::Duration`.
73+
74+
### `duration_since` now returns `Duration` (saturating)
75+
76+
**What changed:** `duration_since` now returns `Duration` directly, saturating to `Duration::ZERO`
77+
when the argument is later than `self`, instead of returning `Option<Duration>`.
78+
79+
```rust
80+
// Before (v0.17): returned Option<Duration>, argument by reference
81+
if let Some(d) = callback.duration_since(&start) {
82+
println!("elapsed: {d:?}");
83+
}
84+
85+
// After (v0.18): returns Duration (saturating), argument by value
86+
let d = callback.duration_since(start);
87+
println!("elapsed: {d:?}");
88+
89+
// For the previous Option-returning behaviour, use checked_duration_since:
90+
if let Some(d) = callback.checked_duration_since(start) {
91+
println!("elapsed: {d:?}");
92+
}
93+
```
94+
95+
**Why:** Mirrors the saturating behavior of `std::time::Instant::saturating_duration_since` in the Rust standard library.
96+
97+
### `add` / `sub` renamed to `checked_add` / `checked_sub`; operator impls added
98+
99+
**What changed:** The `add` and `sub` methods (which returned `Option`) are replaced by
100+
`checked_add` / `checked_sub` with the same semantics. `+`, `-`, `+=`, and `-=` operator impls
101+
are also added.
102+
103+
```rust
104+
// Before (v0.17)
105+
let future = instant.add(Duration::from_millis(10)).expect("overflow");
106+
let past = instant.sub(Duration::from_millis(10)).expect("underflow");
107+
108+
// After (v0.18): explicit checked form (same semantics):
109+
let future = instant.checked_add(Duration::from_millis(10)).expect("overflow");
110+
let past = instant.checked_sub(Duration::from_millis(10)).expect("underflow");
111+
112+
// Or use the operator (panics on overflow, like std::time::Instant):
113+
let future = instant + Duration::from_millis(10);
114+
let past = instant - Duration::from_millis(10);
115+
116+
// Subtract two instants to get a Duration (saturates to zero):
117+
let elapsed: Duration = later - earlier;
118+
```
119+
120+
**Why:** Aligns the API with `std::time::Instant`, making `StreamInstant` more idiomatic.
121+
122+
### `new` and `from_nanos` take unsigned integers
123+
124+
**What changed:** The `secs` parameter of `StreamInstant::new` and the `nanos` parameter of
125+
`StreamInstant::from_nanos` are now `u64` instead of `i64`.
126+
127+
```rust
128+
// Before (v0.17): negative seconds were accepted
129+
StreamInstant::new(-1_i64, 0);
130+
131+
// After (v0.18): all stream clocks are non-negative
132+
StreamInstant::new(0_u64, 0);
133+
```
134+
135+
**Why:** All audio host clocks are positive and monotonic; they are never negative.
136+
64137
---
65138

66139
# Upgrading from v0.16 to v0.17

examples/custom.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::sync::{
22
atomic::{AtomicBool, Ordering},
33
Arc,
44
};
5+
use std::time::Instant;
56

67
use cpal::{
78
traits::{DeviceTrait, HostTrait, StreamTrait},
@@ -19,7 +20,10 @@ struct MyDevice;
1920
// Only Send+Sync is needed
2021
struct MyStream {
2122
controls: Arc<StreamControls>,
22-
// option is needed since joining a thread takes ownership,
23+
// The instant the audio thread was started; shared with now() so that
24+
// callback timestamps and now() are on the same time base.
25+
start: Instant,
26+
// Option is needed since joining a thread takes ownership,
2327
// and we want to do that on drop (gives us &mut self, not self)
2428
handle: Option<std::thread::JoinHandle<()>>,
2529
}
@@ -138,9 +142,9 @@ impl DeviceTrait for MyDevice {
138142
pause: AtomicBool::new(true), // streams are expected to start out paused by default
139143
});
140144

145+
let start = Instant::now();
141146
let thread_controls = controls.clone();
142147
let handle = std::thread::spawn(move || {
143-
let start = std::time::Instant::now();
144148
let mut buffer = [0.0_f32; 4096];
145149
while !thread_controls.exit.load(Ordering::Relaxed) {
146150
std::thread::sleep(std::time::Duration::from_secs_f32(
@@ -161,7 +165,7 @@ impl DeviceTrait for MyDevice {
161165
)
162166
};
163167

164-
let duration = std::time::Instant::now().duration_since(start);
168+
let duration = Instant::now().duration_since(start);
165169
let secs = duration.as_nanos() / 1_000_000_000;
166170
let subsec_nanos = duration.as_nanos() - secs * 1_000_000_000;
167171
let stream_instant = cpal::StreamInstant::new(secs as _, subsec_nanos as _);
@@ -178,6 +182,7 @@ impl DeviceTrait for MyDevice {
178182

179183
Ok(MyStream {
180184
controls,
185+
start,
181186
handle: Some(handle),
182187
})
183188
}
@@ -193,6 +198,11 @@ impl StreamTrait for MyStream {
193198
self.controls.pause.store(true, Ordering::Relaxed);
194199
Ok(())
195200
}
201+
202+
fn now(&self) -> cpal::StreamInstant {
203+
let elapsed = self.start.elapsed();
204+
cpal::StreamInstant::new(elapsed.as_secs(), elapsed.subsec_nanos())
205+
}
196206
}
197207

198208
// streams are expected to stop when dropped
@@ -315,9 +325,7 @@ pub fn make_stream(
315325
config,
316326
move |output: &mut [f32], _: &cpal::OutputCallbackInfo| {
317327
// for 0-1s play sine, 1-2s play square, 2-3s play saw, 3-4s play triangle_wave
318-
let time_since_start = std::time::Instant::now()
319-
.duration_since(time_at_start)
320-
.as_secs_f32();
328+
let time_since_start = Instant::now().duration_since(time_at_start).as_secs_f32();
321329
if time_since_start < 1.0 {
322330
oscillator.set_waveform(Waveform::Sine);
323331
} else if time_since_start < 2.0 {

src/host/aaudio/convert.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
use std::convert::TryInto;
2-
use std::time::Duration;
1+
//! Time-conversion helpers for the AAudio backend.
32
43
extern crate ndk;
54

@@ -8,21 +7,26 @@ use crate::{
87
StreamInstant,
98
};
109

11-
pub fn to_stream_instant(duration: Duration) -> StreamInstant {
12-
StreamInstant::new(
13-
duration.as_secs().try_into().unwrap(),
14-
duration.subsec_nanos(),
15-
)
10+
/// Returns a [`StreamInstant`] for the current moment.
11+
pub fn now_stream_instant() -> StreamInstant {
12+
let mut ts = libc::timespec {
13+
tv_sec: 0,
14+
tv_nsec: 0,
15+
};
16+
let res = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) };
17+
assert_eq!(res, 0, "clock_gettime(CLOCK_MONOTONIC) failed");
18+
StreamInstant::new(ts.tv_sec as u64, ts.tv_nsec as u32)
1619
}
1720

21+
/// Returns the [`StreamInstant`] of the most recent audio frame transferred by `stream`.
1822
pub fn stream_instant(stream: &ndk::audio::AudioStream) -> StreamInstant {
1923
let ts = stream
2024
.timestamp(ndk::audio::Clockid::Monotonic)
2125
.unwrap_or(ndk::audio::Timestamp {
2226
frame_position: 0,
2327
time_nanoseconds: 0,
2428
});
25-
to_stream_instant(Duration::from_nanos(ts.time_nanoseconds as u64))
29+
StreamInstant::from_nanos(ts.time_nanoseconds as u64)
2630
}
2731

2832
impl From<ndk::audio::AudioError> for StreamError {

src/host/aaudio/mod.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ use std::cmp;
66
use std::convert::TryInto;
77
use std::sync::atomic::{AtomicI32, Ordering};
88
use std::sync::{Arc, Mutex};
9-
use std::time::{Duration, Instant};
9+
use std::time::Duration;
1010
use std::vec::IntoIter as VecIntoIter;
1111

1212
extern crate ndk;
1313

14-
use convert::{stream_instant, to_stream_instant};
14+
use convert::{now_stream_instant, stream_instant};
1515
use java_interface::{AudioDeviceInfo, AudioManager};
1616

1717
use crate::traits::{DeviceTrait, HostTrait, StreamTrait};
@@ -316,13 +316,12 @@ where
316316
E: FnMut(StreamError) + Send + 'static,
317317
{
318318
let builder = configure_for_device(builder, device, config);
319-
let created = Instant::now();
320319
let channel_count = config.channels as i32;
321320
let stream = builder
322321
.data_callback(Box::new(move |stream, data, num_frames| {
323322
let cb_info = InputCallbackInfo {
324323
timestamp: InputStreamTimestamp {
325-
callback: to_stream_instant(created.elapsed()),
324+
callback: now_stream_instant(),
326325
capture: stream_instant(stream),
327326
},
328327
};
@@ -366,7 +365,6 @@ where
366365
E: FnMut(StreamError) + Send + 'static,
367366
{
368367
let builder = configure_for_device(builder, device, config);
369-
let created = Instant::now();
370368
let channel_count = config.channels as i32;
371369
let tune_dynamically = config.buffer_size == BufferSize::Default;
372370

@@ -378,7 +376,7 @@ where
378376
// Deliver audio data to user callback
379377
let cb_info = OutputCallbackInfo {
380378
timestamp: OutputStreamTimestamp {
381-
callback: to_stream_instant(created.elapsed()),
379+
callback: now_stream_instant(),
382380
playback: stream_instant(stream),
383381
},
384382
};
@@ -719,6 +717,10 @@ impl StreamTrait for Stream {
719717
}
720718
}
721719

720+
fn now(&self) -> crate::StreamInstant {
721+
now_stream_instant()
722+
}
723+
722724
fn buffer_size(&self) -> Option<crate::FrameCount> {
723725
let stream = self.inner.lock().ok()?;
724726

src/host/alsa/mod.rs

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ use crate::{
2727
DefaultStreamConfigError, DeviceDescription, DeviceDescriptionBuilder, DeviceDirection,
2828
DeviceId, DeviceIdError, DeviceNameError, DevicesError, FrameCount, InputCallbackInfo,
2929
OutputCallbackInfo, PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamConfig,
30-
StreamError, SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange,
31-
SupportedStreamConfigsError,
30+
StreamError, StreamInstant, SupportedBufferSize, SupportedStreamConfig,
31+
SupportedStreamConfigRange, SupportedStreamConfigsError,
3232
};
3333

3434
mod enumerate;
@@ -1065,11 +1065,8 @@ fn process_input(
10651065
}?;
10661066
let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
10671067
let capture = callback
1068-
.sub(delay_duration)
1069-
.ok_or_else(|| BackendSpecificError {
1070-
description: "`capture` is earlier than representation supported by `StreamInstant`"
1071-
.to_string(),
1072-
})?;
1068+
.checked_sub(delay_duration)
1069+
.unwrap_or(StreamInstant::ZERO);
10731070
let timestamp = crate::InputStreamTimestamp { callback, capture };
10741071
let info = crate::InputCallbackInfo { timestamp };
10751072
data_callback(&data, &info);
@@ -1098,12 +1095,7 @@ fn process_output(
10981095
stream_timestamp_fallback(stream.creation_instant)
10991096
}?;
11001097
let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
1101-
let playback = callback
1102-
.add(delay_duration)
1103-
.ok_or_else(|| BackendSpecificError {
1104-
description: "`playback` occurs beyond representation supported by `StreamInstant`"
1105-
.to_string(),
1106-
})?;
1098+
let playback = callback + delay_duration;
11071099
let timestamp = crate::OutputStreamTimestamp { callback, playback };
11081100
let info = crate::OutputCallbackInfo { timestamp };
11091101
data_callback(&mut data, &info);
@@ -1135,7 +1127,7 @@ fn process_output(
11351127
#[inline]
11361128
fn stream_timestamp_hardware(
11371129
status: &alsa::pcm::Status,
1138-
) -> Result<crate::StreamInstant, BackendSpecificError> {
1130+
) -> Result<StreamInstant, BackendSpecificError> {
11391131
let trigger_ts = status.get_trigger_htstamp();
11401132
// trigger_htstamp records when the PCM stream started.
11411133
// On the first few callbacks, it might not have been set yet,
@@ -1156,7 +1148,7 @@ fn stream_timestamp_hardware(
11561148
);
11571149
return Err(BackendSpecificError { description });
11581150
}
1159-
Ok(crate::StreamInstant::from_nanos(nanos))
1151+
Ok(StreamInstant::from_nanos(nanos as u64))
11601152
}
11611153

11621154
// Use elapsed duration since stream creation as fallback when hardware timestamps are unavailable.
@@ -1165,12 +1157,13 @@ fn stream_timestamp_hardware(
11651157
#[inline]
11661158
fn stream_timestamp_fallback(
11671159
creation: std::time::Instant,
1168-
) -> Result<crate::StreamInstant, BackendSpecificError> {
1160+
) -> Result<StreamInstant, BackendSpecificError> {
11691161
let now = std::time::Instant::now();
11701162
let duration = now.duration_since(creation);
1171-
crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128).ok_or(BackendSpecificError {
1172-
description: "stream duration has exceeded `StreamInstant` representation".to_string(),
1173-
})
1163+
Ok(StreamInstant::new(
1164+
duration.as_secs(),
1165+
duration.subsec_nanos(),
1166+
))
11741167
}
11751168

11761169
// Adapted from `timestamp2ns` here:
@@ -1284,6 +1277,18 @@ impl StreamTrait for Stream {
12841277
self.inner.channel.pause(true).ok();
12851278
Ok(())
12861279
}
1280+
fn now(&self) -> StreamInstant {
1281+
if self.inner.use_hw_timestamps {
1282+
if let Ok(status) = self.inner.channel.status() {
1283+
if let Ok(instant) = stream_timestamp_hardware(&status) {
1284+
return instant;
1285+
}
1286+
}
1287+
}
1288+
stream_timestamp_fallback(self.inner.creation_instant)
1289+
.expect("stream duration exceeded `StreamInstant` range")
1290+
}
1291+
12871292
fn buffer_size(&self) -> Option<FrameCount> {
12881293
Some(self.inner.period_frames as FrameCount)
12891294
}

0 commit comments

Comments
 (0)