diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e9ea80f2..c8900faf0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,7 +71,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **ASIO**: `Device::driver`, `asio_streams`, and `current_callback_flag` are no longer `pub`. - **ASIO**: Timestamps now include driver-reported hardware latency. - **ASIO**: Hardware latency is now re-queried when the driver reports `kAsioLatenciesChanged`. -- **ASIO**: Stream error callback now receives `ErrorKind::Xrun` on `kAsioResyncRequest`. +- **ASIO**: Stream error callback now receives `ErrorKind::StreamInvalidated` on + `kAsioResyncRequest`. - **ASIO**: Stream error callback now receives `ErrorKind::StreamInvalidated` when the driver reports a sample rate change (`sampleRateDidChange`) of 1 Hz or more from the configured rate. - **AudioWorklet**: `BufferSize::Fixed` now sets `renderSizeHint` on the `AudioContext`. @@ -90,6 +91,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **CoreAudio**: Stream error callback now receives `ErrorKind::DeviceChanged` on iOS when headphones are unplugged. - **CoreAudio**: User timeouts are now respected when building a stream. +- **CoreAudio**: Streams no longer start automatically on creation; call `play()` manually. - **CoreAudio (iOS)**: `default_input_config()` and `default_output_config()` now prefer 48 kHz, then 44.1 kHz, then the maximum supported sample rate, instead of always taking the maximum. - **JACK**: Timestamps now use the precise hardware deadline. @@ -103,6 +105,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 port-connection failures. - **JACK**: Stream error callback now receives `ErrorKind::RealtimeDenied` once if the process callback is not running at real-time scheduling priority. +- **JACK**: Streams no longer start automatically on creation; call `play()` manually. - **Linux/BSD**: Default host in order from first to last available now is: PipeWire, PulseAudio, ALSA. - **WASAPI**: Raise `windows` dependency lower bound to 0.61. @@ -156,6 +159,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **ASIO**: Poisoned stream locks now return `ErrorKind::StreamInvalidated` instead of panicking. - **ASIO**: Output buffers are now zero-filled before the callback runs. - **ASIO**: Fix `driver.sample_rate()` failures at stream creation being silently ignored. +- **ASIO**: Fix callbacks firing before `build_*_stream` returns the `Stream` handle. +- **ASIO**: Fix overrun not being reported when the driver reports `kAsioOverload`. - **CoreAudio**: Fix default output streams silently stopping when the system default output device is unplugged; they now reroute automatically or report `ErrorKind::DeviceNotAvailable`. - **CoreAudio**: Fix undefined behaviour and silent failure in loopback device creation. diff --git a/UPGRADING.md b/UPGRADING.md index ea10a6507..3ab4beb75 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -20,6 +20,8 @@ This guide covers breaking changes requiring code updates. See [CHANGELOG.md](CH - [ ] If you relied on the default config returning `F32`, pin the sample format explicitly. - [ ] **JACK**: Handle or discard the new `Result` from `Stream::connect_to_system_outputs()` and `Stream::connect_to_system_inputs()`. +- [ ] **CoreAudio, JACK**: Add an explicit `stream.play()` call after `build_*_stream()` if you + were relying these backends to auto-start streams. ## 1. Unified `Error` and `ErrorKind` type @@ -237,7 +239,28 @@ explicitly, rename it to `realtime-dbus`. If you relied on the opt-out behavior, **Why:** Real-time scheduling is the correct default for audio applications. The previous opt-in made it easy to accidentally ship without it. -## 7. `wasm32-unknown-emscripten` target removed +## 7. Streams are returned paused on every backend + +**What changed:** `build_input_stream` and `build_output_stream` now return a paused `Stream` on +every backend. Previously, CoreAudio and JACK started the stream automatically. + +```rust +// Before (v0.17): on CoreAudio/JACK the stream was already running +let stream = device.build_output_stream(config, data_fn, err_fn, None)?; + +// After (v0.18): every backend requires play() +let stream = device.build_output_stream(config, data_fn, err_fn, None)?; +stream.play()?; +``` + +**Impact:** If you were targeting CoreAudio or JACK and never called `play()`, your callback will +never fire after upgrading. Add the `play()` call. + +**Why:** Auto-starting before the caller has the `Stream` handle creates a window where data and +error callbacks can fire before the application can pause, stop, or drop the stream. Other hosts +already required `play()`. The behavior is now uniform. + +## 8. `wasm32-unknown-emscripten` target removed **What changed:** The `emscripten` audio host and the `wasm32-unknown-emscripten` build target are no longer supported. diff --git a/src/host/aaudio/mod.rs b/src/host/aaudio/mod.rs index 657884778..0c3037a0b 100644 --- a/src/host/aaudio/mod.rs +++ b/src/host/aaudio/mod.rs @@ -327,8 +327,6 @@ where let error_callback: ErrorCallbackArc = Arc::new(Mutex::new(error_callback)); let error_callback_for_stream = error_callback.clone(); - // RT check: run once on the first callback invocation to avoid delivering RealtimeDenied - // before the Stream handle is returned to the caller. #[cfg(feature = "realtime")] let mut rt_checked = false; #[cfg(feature = "realtime")] @@ -408,8 +406,6 @@ where let error_callback: ErrorCallbackArc = Arc::new(Mutex::new(error_callback)); let error_callback_for_stream = error_callback.clone(); - // RT check: run once on the first callback invocation to avoid delivering RealtimeDenied - // before the Stream handle is returned to the caller. #[cfg(feature = "realtime")] let mut rt_checked = false; #[cfg(feature = "realtime")] diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index d53217b57..1cce43e66 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -22,6 +22,7 @@ use crate::{ host::{ equilibrium::{fill_equilibrium, DSD_EQUILIBRIUM_BYTE, U8_EQUILIBRIUM_BYTE}, frames_to_duration, + latch::Latch, }, iter::{SupportedInputConfigs, SupportedOutputConfigs}, traits::{DeviceTrait, HostTrait, StreamTrait}, @@ -236,6 +237,7 @@ impl DeviceTrait for Device { error_callback, timeout, ); + stream.signal_ready(); Ok(stream) } @@ -259,6 +261,7 @@ impl DeviceTrait for Device { error_callback, timeout, ); + stream.signal_ready(); Ok(stream) } } @@ -773,6 +776,10 @@ pub struct Stream { /// Keeps the read end of the self-pipe alive for the lifetime of the Stream, so that /// `trigger.wakeup()` never writes to a closed pipe, even if the worker exited early. _rx: Arc, + + /// Latch that prevents the worker thread from firing callbacks until the caller has received + /// the `Stream` handle. + latch: Latch, } // Compile-time assertion that Stream is Send and Sync @@ -1251,6 +1258,11 @@ fn htstamp_elapsed(status: &alsa::pcm::Status, origin: libc::timespec) -> Stream } impl Stream { + /// Releases the latch so the worker thread can begin processing audio callbacks. + pub(crate) fn signal_ready(&self) { + self.latch.release(); + } + fn new_input( inner: Arc, mut data_callback: D, @@ -1265,16 +1277,15 @@ impl Stream { let rx_thread = rx.clone(); let stream = inner.clone(); - // The barrier prevents the worker from firing data callbacks before the caller has - // received the Stream handle. Without it, callbacks could arrive before the caller can - // pause, stop, or drop the stream. - let ready = std::sync::Arc::new(std::sync::Barrier::new(2)); - let ready_worker = ready.clone(); + // The latch is released just before the `Stream` is returned so the worker cannot fire any + // callbacks before the caller has the handle. + let mut latch = Latch::new(); + let waiter = latch.waiter(); let thread = thread::Builder::new() .name("cpal_alsa_in".to_owned()) .spawn(move || { - ready_worker.wait(); + waiter.wait(); input_stream_worker( rx_thread, &stream, @@ -1284,15 +1295,15 @@ impl Stream { ); }) .unwrap(); - let stream = Self { + latch.add_thread(thread.thread().clone()); + + Self { thread: Some(thread), inner, trigger: tx, _rx: rx, - }; - - ready.wait(); - stream + latch, + } } fn new_output( @@ -1309,16 +1320,15 @@ impl Stream { let rx_thread = rx.clone(); let stream = inner.clone(); - // The barrier prevents the worker from firing data callbacks before the caller has - // received the Stream handle. Without it, callbacks could arrive before the caller can - // pause, stop, or drop the stream. - let ready = std::sync::Arc::new(std::sync::Barrier::new(2)); - let ready_worker = ready.clone(); + // The latch is released just before the `Stream` is returned so the worker cannot fire any + // callbacks before the caller has the handle. + let mut latch = Latch::new(); + let waiter = latch.waiter(); let thread = thread::Builder::new() .name("cpal_alsa_out".to_owned()) .spawn(move || { - ready_worker.wait(); + waiter.wait(); output_stream_worker( rx_thread, &stream, @@ -1328,21 +1338,23 @@ impl Stream { ); }) .unwrap(); + latch.add_thread(thread.thread().clone()); - let stream = Self { + Self { thread: Some(thread), inner, trigger: tx, _rx: rx, - }; - - ready.wait(); - stream + latch, + } } } impl Drop for Stream { fn drop(&mut self) { + // Unblock the worker in case the stream is dropped before signal_ready() was + // called. Idempotent: no effect if the worker is already running. + self.signal_ready(); self.inner.dropping.store(true, Ordering::Release); self.trigger.wakeup(); if let Some(handle) = self.thread.take() { diff --git a/src/host/asio/stream.rs b/src/host/asio/stream.rs index e64459f57..72005381f 100644 --- a/src/host/asio/stream.rs +++ b/src/host/asio/stream.rs @@ -3,8 +3,8 @@ extern crate num_traits; use std::{ sync::{ - atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, - Arc, Mutex, + atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering}, + mpsc, Arc, Mutex, }, time::Duration, }; @@ -12,7 +12,11 @@ use std::{ use self::num_traits::{FromPrimitive, PrimInt}; use super::Device; use crate::{ - host::{com, frames_to_duration}, + host::{ + com, + error_emit::{emit_error, try_emit_error}, + frames_to_duration, + }, BufferSize, Data, Error, ErrorKind, FrameCount, InputCallbackInfo, InputStreamTimestamp, OutputCallbackInfo, OutputStreamTimestamp, SampleFormat, SampleRate, StreamConfig, StreamInstant, I24, @@ -52,11 +56,35 @@ impl TimeBase { } } +/// Matches the `startTimer(500)` call JUCE uses for debouncing ASIO driver event notifications. +const ASIO_EVENT_DEBOUNCE: Duration = Duration::from_millis(500); + +#[repr(u8)] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)] +enum StreamState { + #[default] + Starting = 0, + Paused = 1, + Playing = 2, +} + +impl StreamState { + fn load(atom: &AtomicU8) -> Self { + match atom.load(Ordering::Acquire) { + 1 => StreamState::Paused, + 2 => StreamState::Playing, + _ => StreamState::Starting, + } + } + + fn store(atom: &AtomicU8, state: StreamState) { + atom.store(state as u8, Ordering::Release); + } +} + pub struct Stream { - playing: Arc, - // Ensure the `Driver` does not terminate until the last stream is dropped. + state: Arc, driver: Arc, - #[allow(dead_code)] asio_streams: Arc>, callback_id: sys::BufferCallbackId, driver_event_callback_id: sys::DriverEventCallbackId, @@ -77,12 +105,12 @@ impl Stream { } pub fn play(&self) -> Result<(), Error> { - self.playing.store(true, Ordering::Release); + StreamState::store(&self.state, StreamState::Playing); Ok(()) } pub fn pause(&self) -> Result<(), Error> { - self.playing.store(false, Ordering::Release); + StreamState::store(&self.state, StreamState::Paused); Ok(()) } @@ -159,15 +187,23 @@ impl Device { .unwrap_or(0), )); - let driver_event_callback_id = self.add_event_callback( - &driver, - error_callback, - Arc::clone(&hardware_input_latency), - true, - ); + let state = Arc::new(AtomicU8::new(StreamState::Starting as u8)); + let driver_event_callback_id = self + .add_event_callback( + &driver, + error_callback, + Arc::clone(&hardware_input_latency), + true, + Arc::clone(&state), + ) + .inspect_err(|_| { + // Roll back the input stream stored by get_or_create_input_stream. + if let Ok(mut streams) = self.asio_streams.lock() { + streams.input = None; + } + })?; - let stream_playing = Arc::new(AtomicBool::new(false)); - let playing = Arc::clone(&stream_playing); + let state_cb = Arc::clone(&state); let asio_streams = self.asio_streams.clone(); let mut current_buffer_size = buffer_size as i32; let mut last_buffer_index: i32 = -1; @@ -178,8 +214,8 @@ impl Device { // Set the input callback. // This is most performance critical part of the ASIO bindings. let callback_id = driver.add_callback(move |callback_info| unsafe { - // If not playing return early. - if !playing.load(Ordering::Acquire) { + // If not playing, return early. + if StreamState::load(&state_cb) != StreamState::Playing { return; } @@ -401,10 +437,18 @@ impl Device { let driver = Arc::new(driver); let asio_streams = self.asio_streams.clone(); - driver.start().map_err(build_stream_err)?; + if let Err(e) = driver.start() { + driver.remove_event_callback(driver_event_callback_id); + driver.remove_callback(callback_id); + if let Ok(mut streams) = asio_streams.lock() { + streams.input = None; + } + return Err(build_stream_err(e)); + } + StreamState::store(&state, StreamState::Paused); Ok(Stream { - playing: stream_playing, + state, driver, asio_streams, callback_id, @@ -473,15 +517,23 @@ impl Device { .unwrap_or(0), )); - let driver_event_callback_id = self.add_event_callback( - &driver, - error_callback, - Arc::clone(&hardware_output_latency), - false, - ); + let state = Arc::new(AtomicU8::new(StreamState::Starting as u8)); + let driver_event_callback_id = self + .add_event_callback( + &driver, + error_callback, + Arc::clone(&hardware_output_latency), + false, + Arc::clone(&state), + ) + .inspect_err(|_| { + // Roll back the output stream stored by get_or_create_output_stream. + if let Ok(mut streams) = self.asio_streams.lock() { + streams.output = None; + } + })?; - let stream_playing = Arc::new(AtomicBool::new(false)); - let playing = Arc::clone(&stream_playing); + let state_cb = Arc::clone(&state); let asio_streams = self.asio_streams.clone(); let mut current_buffer_size = buffer_size as i32; let mut last_buffer_index: i32 = -1; @@ -491,7 +543,7 @@ impl Device { let callback_id = driver.add_callback(move |callback_info| unsafe { // If not playing, return early. - if !playing.load(Ordering::Acquire) { + if StreamState::load(&state_cb) != StreamState::Playing { return; } @@ -764,10 +816,18 @@ impl Device { let driver = Arc::new(driver); let asio_streams = self.asio_streams.clone(); - driver.start().map_err(build_stream_err)?; + if let Err(e) = driver.start() { + driver.remove_event_callback(driver_event_callback_id); + driver.remove_callback(callback_id); + if let Ok(mut streams) = asio_streams.lock() { + streams.output = None; + } + return Err(build_stream_err(e)); + } + StreamState::store(&state, StreamState::Paused); Ok(Stream { - playing: stream_playing, + state, driver, asio_streams, callback_id, @@ -868,7 +928,8 @@ impl Device { error_callback: E, hardware_latency: Arc, is_input: bool, - ) -> sys::DriverEventCallbackId + state: Arc, + ) -> Result where E: FnMut(Error) + Send + 'static, { @@ -883,7 +944,47 @@ impl Device { let driver_for_latency = driver.clone(); let asio_streams_for_event = self.asio_streams.clone(); - driver.add_event_callback(move |event| { + // Debounce timer: wait for ASIO_EVENT_DEBOUNCE of silence after the most recent event + // before delivering to the user. Exits when `timer_tx` is dropped, which happens when the + // event callback closure is removed during stream teardown. + let (timer_tx, timer_rx) = mpsc::channel::(); + let error_cb_for_timer = Arc::clone(&error_callback_shared); + std::thread::Builder::new() + .name("cpal-asio-event-timer".into()) + .spawn(move || { + let mut pending: Option = None; + loop { + // Use recv() when idle (no timeout needed) so we don't spin. + let result = if pending.is_some() { + timer_rx.recv_timeout(ASIO_EVENT_DEBOUNCE) + } else { + timer_rx + .recv() + .map_err(|_| mpsc::RecvTimeoutError::Disconnected) + }; + match result { + Ok(err) => { + // New event; restart the grace window. + pending = Some(err); + } + Err(mpsc::RecvTimeoutError::Timeout) => { + // Grace period elapsed with no new events: now deliver. + if let Some(err) = pending.take() { + emit_error(&error_cb_for_timer, err); + } + } + Err(mpsc::RecvTimeoutError::Disconnected) => return, + } + } + }) + .map_err(|e| { + Error::with_message( + ErrorKind::ResourceExhausted, + format!("failed to spawn ASIO event timer thread: {e}"), + ) + })?; + + Ok(driver.add_event_callback(move |event| { match event { sys::AsioDriverEvent::Message { selector: msg, @@ -894,29 +995,38 @@ impl Device { matches!( sys::AsioMessageSelectors::from_i64(value as i64), Some(sys::AsioMessageSelectors::kAsioBufferSizeChange) + | Some(sys::AsioMessageSelectors::kAsioOverload) ) } sys::AsioMessageSelectors::kAsioResetRequest => { - error_callback_shared - .lock() - .unwrap_or_else(|e| e.into_inner())( - Error::with_message( + // Guard on Starting: some USB ASIO drivers (ASIO4ALL, Focusrite, etc.) + // fire spurious reset/resync requests during driver.start(). + if StreamState::load(&state) != StreamState::Starting { + let _ = timer_tx.send(Error::with_message( ErrorKind::StreamInvalidated, "ASIO driver requested stream reset", - ), - ); - false + )); + } + true } sys::AsioMessageSelectors::kAsioResyncRequest => { - error_callback_shared - .lock() - .unwrap_or_else(|e| e.into_inner())( - Error::with_message( - ErrorKind::Xrun, - "ASIO driver requested resynchronization", - ), - ); - false + // Per the ASIO spec (and matching JUCE's behavior), kAsioResyncRequest + // means the driver needs a full stop/reinit/start. It is *not* a simple + // xrun notification. + if StreamState::load(&state) != StreamState::Starting { + let _ = timer_tx.send(Error::with_message( + ErrorKind::StreamInvalidated, + "ASIO driver requested stream resynchronization", + )); + } + true + } + sys::AsioMessageSelectors::kAsioOverload => { + if StreamState::load(&state) == StreamState::Playing { + let _ = + try_emit_error(&error_callback_shared, Error::new(ErrorKind::Xrun)); + } + true } sys::AsioMessageSelectors::kAsioLatenciesChanged => { if let Ok(latencies) = driver_for_latency.latencies() { @@ -955,20 +1065,16 @@ impl Device { true } }; - if should_notify { - error_callback_shared - .lock() - .unwrap_or_else(|e| e.into_inner())( - Error::with_message( - ErrorKind::StreamInvalidated, - format!("ASIO driver changed sample rate to {new_rate} Hz"), - ), - ); + if should_notify && StreamState::load(&state) != StreamState::Starting { + let _ = timer_tx.send(Error::with_message( + ErrorKind::StreamInvalidated, + format!("ASIO driver changed sample rate to {new_rate} Hz"), + )); } false } } - }) + })) } } diff --git a/src/host/audioworklet/mod.rs b/src/host/audioworklet/mod.rs index 31a5636cf..e3c57352c 100644 --- a/src/host/audioworklet/mod.rs +++ b/src/host/audioworklet/mod.rs @@ -417,6 +417,7 @@ type AudioProcessorCallback = Box; /// WasmAudioProcessor provides an interface for the Javascript code /// running in the AudioWorklet to interact with Rust. #[wasm_bindgen] +#[allow(unused_variables)] pub struct WasmAudioProcessor { #[wasm_bindgen(skip)] interleaved_buffer: Vec, diff --git a/src/host/coreaudio/ios/mod.rs b/src/host/coreaudio/ios/mod.rs index 219598645..b4f90b6db 100644 --- a/src/host/coreaudio/ios/mod.rs +++ b/src/host/coreaudio/ios/mod.rs @@ -20,7 +20,7 @@ use self::enumerate::{ }; use super::{asbd_from_config, host_time_to_stream_instant}; use crate::{ - host::{frames_to_duration, try_emit_error, ErrorCallbackArc}, + host::{frames_to_duration, latch::Latch, try_emit_error, ErrorCallbackArc}, traits::{DeviceTrait, HostTrait, StreamTrait}, BufferSize, ChannelCount, Data, DeviceDescription, DeviceDescriptionBuilder, DeviceId, Error, ErrorKind, FrameCount, InputCallbackInfo, InputStreamTimestamp, OutputCallbackInfo, @@ -174,7 +174,7 @@ impl DeviceTrait for Device { let device_buffer_frames = Some(get_device_buffer_frames()); let error_callback: ErrorCallbackArc = Arc::new(Mutex::new(error_callback)); - let session_manager = SessionEventManager::new(error_callback.clone()); + let session_manager = SessionEventManager::new(error_callback.clone(), Latch::new()); // Set up input callback setup_input_callback( @@ -188,15 +188,15 @@ impl DeviceTrait for Device { }, )?; - audio_unit.start()?; - - Ok(Stream::new( + let stream = Stream::new( StreamInner { - playing: true, + playing: false, audio_unit, }, session_manager, - )) + ); + stream.signal_ready(); + Ok(stream) } /// Create an output stream. @@ -219,7 +219,7 @@ impl DeviceTrait for Device { let device_buffer_frames = Some(get_device_buffer_frames()); let error_callback: ErrorCallbackArc = Arc::new(Mutex::new(error_callback)); - let session_manager = SessionEventManager::new(error_callback.clone()); + let session_manager = SessionEventManager::new(error_callback.clone(), Latch::new()); // Set up output callback setup_output_callback( @@ -233,30 +233,41 @@ impl DeviceTrait for Device { }, )?; - audio_unit.start()?; - - Ok(Stream::new( + let stream = Stream::new( StreamInner { - playing: true, + playing: false, audio_unit, }, session_manager, - )) + ); + stream.signal_ready(); + Ok(stream) } } pub struct Stream { inner: Mutex, - _session_manager: SessionEventManager, + session_manager: SessionEventManager, } impl Stream { fn new(inner: StreamInner, session_manager: SessionEventManager) -> Self { Self { inner: Mutex::new(inner), - _session_manager: session_manager, + session_manager, } } + + fn signal_ready(&self) { + self.session_manager.signal_ready(); + } +} + +impl Drop for Stream { + fn drop(&mut self) { + // Ensure the latch is released even if signal_ready() was never called (error path). + self.session_manager.signal_ready(); + } } impl StreamTrait for Stream { diff --git a/src/host/coreaudio/ios/session_event_manager.rs b/src/host/coreaudio/ios/session_event_manager.rs index 7f9665d44..f40a63943 100644 --- a/src/host/coreaudio/ios/session_event_manager.rs +++ b/src/host/coreaudio/ios/session_event_manager.rs @@ -12,7 +12,7 @@ use objc2_avf_audio::{ use objc2_foundation::{NSNotification, NSNotificationCenter, NSNumber, NSString}; use crate::{ - host::{emit_error, ErrorCallbackArc}, + host::{emit_error, latch::Latch, ErrorCallbackArc}, Error, ErrorKind, }; @@ -46,6 +46,7 @@ unsafe fn route_change_error(notification: &NSNotification) -> Option { } pub(super) struct SessionEventManager { + latch: Latch, observers: Vec< objc2::rc::Retained>, >, @@ -57,15 +58,19 @@ unsafe impl Send for SessionEventManager {} unsafe impl Sync for SessionEventManager {} impl SessionEventManager { - pub(super) fn new(error_callback: ErrorCallbackArc) -> Self { + pub(super) fn new(error_callback: ErrorCallbackArc, latch: Latch) -> Self { let nc = NSNotificationCenter::defaultCenter(); let mut observers = Vec::new(); + let waiter = latch.waiter(); { let cb = error_callback.clone(); + let w = waiter.clone(); let block = RcBlock::new(move |notif: NonNull| { - if let Some(err) = unsafe { route_change_error(notif.as_ref()) } { - emit_error(&cb, err); + if w.is_released() { + if let Some(err) = unsafe { route_change_error(notif.as_ref()) } { + emit_error(&cb, err); + } } }); if let Some(name) = unsafe { AVAudioSessionRouteChangeNotification } { @@ -78,14 +83,17 @@ impl SessionEventManager { { let cb = error_callback.clone(); + let w = waiter.clone(); let block = RcBlock::new(move |_: NonNull| { - emit_error( - &cb, - Error::with_message( - ErrorKind::DeviceNotAvailable, - "audio media services were lost", - ), - ); + if w.is_released() { + emit_error( + &cb, + Error::with_message( + ErrorKind::DeviceNotAvailable, + "audio media services were lost", + ), + ); + } }); if let Some(name) = unsafe { AVAudioSessionMediaServicesWereLostNotification } { let observer = unsafe { @@ -97,14 +105,17 @@ impl SessionEventManager { { let cb = error_callback.clone(); + let w = waiter; let block = RcBlock::new(move |_: NonNull| { - emit_error( - &cb, - Error::with_message( - ErrorKind::StreamInvalidated, - "audio media services were reset", - ), - ); + if w.is_released() { + emit_error( + &cb, + Error::with_message( + ErrorKind::StreamInvalidated, + "audio media services were reset", + ), + ); + } }); if let Some(name) = unsafe { AVAudioSessionMediaServicesWereResetNotification } { let observer = unsafe { @@ -114,7 +125,11 @@ impl SessionEventManager { } } - Self { observers } + Self { latch, observers } + } + + pub(super) fn signal_ready(&self) { + self.latch.release(); } } diff --git a/src/host/coreaudio/macos/device.rs b/src/host/coreaudio/macos/device.rs index 702c09d0d..61c638d85 100644 --- a/src/host/coreaudio/macos/device.rs +++ b/src/host/coreaudio/macos/device.rs @@ -41,7 +41,7 @@ use objc2_core_foundation::{CFString, Type}; pub use super::enumerate::{SupportedInputConfigs, SupportedOutputConfigs}; use super::{ asbd_from_config, check_os_status, host_time_to_stream_instant, DefaultOutputMonitor, - DisconnectManager, Stream, + DisconnectManager, Monitor, Stream, }; use crate::{ host::{ @@ -825,26 +825,19 @@ impl Device { audio_unit.initialize()?; let inner_arc = Arc::new(Mutex::new(StreamInner { - playing: true, + playing: false, audio_unit, - device_id: self.audio_device_id, + _device_id: self.audio_device_id, _loopback_device: loopback_aggregate, })); let weak_inner = Arc::downgrade(&inner_arc); - let monitor: Box = Box::new(DisconnectManager::new( + let monitor: Box = Box::new(DisconnectManager::new( self.audio_device_id, weak_inner, error_callback_disconnect, )?); let stream = Stream::new(inner_arc, monitor); - - stream - .inner - .lock() - .map_err(|_| Error::with_message(ErrorKind::StreamInvalidated, "stream lock poisoned"))? - .audio_unit - .start()?; - + stream.signal_ready(); Ok(stream) } @@ -936,13 +929,13 @@ impl Device { audio_unit.initialize()?; let inner_arc = Arc::new(Mutex::new(StreamInner { - playing: true, + playing: false, audio_unit, - device_id: self.audio_device_id, + _device_id: self.audio_device_id, _loopback_device: None, })); let weak_inner = Arc::downgrade(&inner_arc); - let monitor: Box = if matches!(mode, AudioUnitMode::DefaultOutput) { + let monitor: Box = if matches!(mode, AudioUnitMode::DefaultOutput) { Box::new(DefaultOutputMonitor::new(weak_inner, error_callback)?) } else { Box::new(DisconnectManager::new( @@ -952,14 +945,7 @@ impl Device { )?) }; let stream = Stream::new(inner_arc, monitor); - - stream - .inner - .lock() - .map_err(|_| Error::with_message(ErrorKind::StreamInvalidated, "stream lock poisoned"))? - .audio_unit - .start()?; - + stream.signal_ready(); Ok(stream) } } diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index e9c447bd4..d39b2d57d 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -13,7 +13,7 @@ use property_listener::AudioObjectPropertyListener; pub use self::enumerate::{default_input_device, default_output_device, Devices}; use super::{asbd_from_config, check_os_status, host_time_to_stream_instant, OSStatus}; use crate::{ - host::{coreaudio::macos::loopback::LoopbackDevice, emit_error}, + host::{coreaudio::macos::loopback::LoopbackDevice, emit_error, latch::Latch}, traits::{HostTrait, StreamTrait}, Error, ErrorKind, FrameCount, ResultExt, StreamInstant, }; @@ -94,6 +94,13 @@ fn spawn_property_listener_thread( Ok((change_rx, shutdown_tx)) } +/// A device monitor that can signal when the owning `Stream` handle has been returned to the +/// caller, allowing the delivery thread to start processing events. +pub(super) trait Monitor: Send + Sync { + /// Unblocks the delivery thread. Called after `Stream::new()` and from `Stream::drop()`. + fn signal_ready(&self); +} + /// Manages device disconnection listener on a dedicated thread to ensure the /// AudioObjectPropertyListener is always created and dropped on the same thread. /// This avoids potential threading issues with CoreAudio APIs. @@ -104,6 +111,7 @@ fn spawn_property_listener_thread( /// /// The dedicated thread architecture ensures `Stream` can implement `Send`. struct DisconnectManager { + latch: Latch, _shutdown_tx: mpsc::Sender<()>, } @@ -167,31 +175,55 @@ impl DisconnectManager { ) })??; - std::thread::spawn(move || { - while let Ok(err) = disconnect_rx.recv() { - if let Some(stream_arc) = stream_weak.upgrade() { - if let Ok(mut stream_inner) = stream_arc.try_lock() { - let _ = stream_inner.pause(); + let mut latch = Latch::new(); + let waiter = latch.waiter(); + + let handle = std::thread::Builder::new() + .name("cpal-coreaudio-disconnect".into()) + .spawn(move || { + // If the Latch is dropped without being released (error path), exit cleanly. + if !waiter.wait() { + return; + } + while let Ok(err) = disconnect_rx.recv() { + if let Some(stream_arc) = stream_weak.upgrade() { + if let Ok(mut stream_inner) = stream_arc.try_lock() { + let _ = stream_inner.pause(); + } + emit_error(&error_callback, err); + } else { + break; } - emit_error(&error_callback, err); - } else { - break; } - } - }); - + }) + .map_err(|e| { + Error::with_message( + ErrorKind::ResourceExhausted, + format!("failed to spawn disconnect delivery thread: {e}"), + ) + })?; + + latch.add_thread(handle.thread().clone()); Ok(DisconnectManager { + latch, _shutdown_tx: shutdown_tx, }) } } +impl Monitor for DisconnectManager { + fn signal_ready(&self) { + self.latch.release(); + } +} + /// Manages the system default output device change listener on a dedicated thread. /// /// When the system default output device changes: /// - If a new valid default exists, AudioUnit reroutes and `DeviceChanged` is reported. /// - If there is no new default, the stream is paused and `DeviceNotAvailable` is reported. struct DefaultOutputMonitor { + latch: Latch, _shutdown_tx: mpsc::Sender<()>, } @@ -209,52 +241,69 @@ impl DefaultOutputMonitor { }, )?; - std::thread::spawn(move || { - while let Ok(()) = change_rx.recv() { - let Some(arc) = stream_weak.upgrade() else { - break; - }; - if default_output_device().is_none() { - if let Ok(mut inner) = arc.try_lock() { - let _ = inner.pause(); + let mut latch = Latch::new(); + let waiter = latch.waiter(); + + let handle = std::thread::Builder::new() + .name("cpal-coreaudio-default-output".into()) + .spawn(move || { + if !waiter.wait() { + return; + } + while let Ok(()) = change_rx.recv() { + let Some(arc) = stream_weak.upgrade() else { + break; + }; + if default_output_device().is_none() { + if let Ok(mut inner) = arc.try_lock() { + let _ = inner.pause(); + } + emit_error( + &error_callback, + Error::with_message( + ErrorKind::DeviceNotAvailable, + "no default output device", + ), + ); + } else { + // DefaultOutput AudioUnit rerouted automatically; notify the caller. + emit_error( + &error_callback, + Error::with_message( + ErrorKind::DeviceChanged, + "default output device changed", + ), + ); } - emit_error( - &error_callback, - Error::with_message( - ErrorKind::DeviceNotAvailable, - "no default output device", - ), - ); - } else { - // DefaultOutput AudioUnit rerouted automatically; notify the caller. - emit_error( - &error_callback, - Error::with_message( - ErrorKind::DeviceChanged, - "default output device changed", - ), - ); } - } - }); - + }) + .map_err(|e| { + Error::with_message( + ErrorKind::ResourceExhausted, + format!("failed to spawn default-output monitor thread: {e}"), + ) + })?; + + latch.add_thread(handle.thread().clone()); Ok(DefaultOutputMonitor { + latch, _shutdown_tx: shutdown_tx, }) } } +impl Monitor for DefaultOutputMonitor { + fn signal_ready(&self) { + self.latch.release(); + } +} + struct StreamInner { playing: bool, audio_unit: AudioUnit, - // Track the device with which the audio unit was spawned. - // - // We must do this so that we can avoid changing the device sample rate if there is already - // a stream associated with the device. - #[allow(dead_code)] - device_id: AudioDeviceID, - /// Manage the lifetime of the aggregate device used - /// for loopback recording + // Track the device with which the audio unit was spawned + _device_id: AudioDeviceID, + /// Manage the lifetime of the aggregate device used for loopback recording _loopback_device: Option, } @@ -282,17 +331,23 @@ impl StreamInner { pub struct Stream { inner: Arc>, - // Holds the device monitor (either DisconnectManager or DefaultOutputMonitor) to keep it - // alive for the lifetime of the stream. - _monitor: Box, + monitor: Box, } impl Stream { - fn new(inner: Arc>, monitor: Box) -> Self { - Self { - inner, - _monitor: monitor, - } + fn new(inner: Arc>, monitor: Box) -> Self { + Self { inner, monitor } + } + + fn signal_ready(&self) { + self.monitor.signal_ready(); + } +} + +impl Drop for Stream { + fn drop(&mut self) { + // Unblock monitor delivery threads if the stream is dropped early. + self.monitor.signal_ready(); } } diff --git a/src/host/jack/stream.rs b/src/host/jack/stream.rs index b24d92657..44294a1e5 100644 --- a/src/host/jack/stream.rs +++ b/src/host/jack/stream.rs @@ -1,5 +1,5 @@ use std::sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicU8, Ordering}, Arc, Mutex, }; @@ -11,9 +11,31 @@ use crate::{ OutputCallbackInfo, OutputStreamTimestamp, ResultExt, Sample, SampleRate, StreamInstant, }; +#[repr(u8)] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)] +enum StreamState { + #[default] + Starting = 0, + Paused = 1, + Playing = 2, +} + +impl StreamState { + fn load(atom: &AtomicU8, order: Ordering) -> Self { + match atom.load(order) { + 1 => Self::Paused, + 2 => Self::Playing, + _ => Self::Starting, + } + } + + fn store(self, atom: &AtomicU8, order: Ordering) { + atom.store(self as u8, order); + } +} + pub struct Stream { - // TODO: It might be faster to send a message when playing/pausing than to check this every iteration - playing: Arc, + state: Arc, async_client: jack::AsyncClient, // Port names are stored in order to connect them to other ports in jack automatically input_port_names: Vec, @@ -47,7 +69,7 @@ impl Stream { ports.push(port); } - let playing = Arc::new(AtomicBool::new(true)); + let state = Arc::new(AtomicU8::new(StreamState::Starting as u8)); let error_callback_ptr: ErrorCallbackArc = Arc::new(Mutex::new(error_callback)); let input_process_handler = LocalProcessHandler::new( @@ -57,19 +79,24 @@ impl Stream { client.buffer_size() as usize, Some(Box::new(data_callback)), None, - playing.clone(), + state.clone(), #[cfg(feature = "realtime")] error_callback_ptr.clone(), ); - let notification_handler = JackNotificationHandler::new(error_callback_ptr); + let notification_handler = JackNotificationHandler::new( + error_callback_ptr, + state.clone(), + client.sample_rate() as jack::Frames, + ); let async_client = client .activate_async(notification_handler, input_process_handler) .context("failed to activate JACK client")?; + StreamState::Paused.store(&state, Ordering::Release); Ok(Self { - playing, + state, async_client, input_port_names: port_names, output_port_names: vec![], @@ -98,7 +125,7 @@ impl Stream { ports.push(port); } - let playing = Arc::new(AtomicBool::new(true)); + let state = Arc::new(AtomicU8::new(StreamState::Starting as u8)); let error_callback_ptr: ErrorCallbackArc = Arc::new(Mutex::new(error_callback)); let output_process_handler = LocalProcessHandler::new( @@ -108,19 +135,24 @@ impl Stream { client.buffer_size() as usize, None, Some(Box::new(data_callback)), - playing.clone(), + state.clone(), #[cfg(feature = "realtime")] error_callback_ptr.clone(), ); - let notification_handler = JackNotificationHandler::new(error_callback_ptr); + let notification_handler = JackNotificationHandler::new( + error_callback_ptr, + state.clone(), + client.sample_rate() as jack::Frames, + ); let async_client = client .activate_async(notification_handler, output_process_handler) .context("failed to activate JACK client")?; + StreamState::Paused.store(&state, Ordering::Release); Ok(Self { - playing, + state, async_client, input_port_names: vec![], output_port_names: port_names, @@ -139,11 +171,8 @@ impl Stream { /// On error, connections that were made before the failure are rolled back on a best-effort /// basis so the JACK graph is left unchanged. pub fn connect_to_system_outputs(&mut self) -> Result<(), Error> { - let system_ports = self.async_client.as_client().ports( - Some("system:playback_.*"), - None, - jack::PortFlags::empty(), - ); + let client = self.async_client.as_client(); + let system_ports = client.ports(Some("system:playback_.*"), None, jack::PortFlags::empty()); let n_our = self.output_port_names.len(); let n_sys = system_ports.len(); @@ -160,18 +189,11 @@ impl Stream { for (i, (our_port, system_port)) in self.output_port_names.iter().zip(&system_ports).enumerate() { - if let Err(e) = self - .async_client - .as_client() - .connect_ports_by_name(our_port, system_port) - { + if let Err(e) = client.connect_ports_by_name(our_port, system_port) { for (prev_our, prev_sys) in self.output_port_names[..i].iter().zip(&system_ports[..i]) { - let _ = self - .async_client - .as_client() - .disconnect_ports_by_name(prev_our, prev_sys); + let _ = client.disconnect_ports_by_name(prev_our, prev_sys); } return Err(Error::with_message( @@ -195,11 +217,8 @@ impl Stream { /// On error, connections that were made before the failure are rolled back on a best-effort /// basis so the JACK graph is left unchanged. pub fn connect_to_system_inputs(&mut self) -> Result<(), Error> { - let system_ports = self.async_client.as_client().ports( - Some("system:capture_.*"), - None, - jack::PortFlags::empty(), - ); + let client = self.async_client.as_client(); + let system_ports = client.ports(Some("system:capture_.*"), None, jack::PortFlags::empty()); let n_our = self.input_port_names.len(); let n_sys = system_ports.len(); @@ -216,18 +235,11 @@ impl Stream { for (i, (system_port, our_port)) in system_ports.iter().zip(&self.input_port_names).enumerate() { - if let Err(e) = self - .async_client - .as_client() - .connect_ports_by_name(system_port, our_port) - { + if let Err(e) = client.connect_ports_by_name(system_port, our_port) { for (prev_sys, prev_our) in system_ports[..i].iter().zip(&self.input_port_names[..i]) { - let _ = self - .async_client - .as_client() - .disconnect_ports_by_name(prev_sys, prev_our); + let _ = client.disconnect_ports_by_name(prev_sys, prev_our); } return Err(Error::with_message( @@ -242,12 +254,12 @@ impl Stream { impl StreamTrait for Stream { fn play(&self) -> Result<(), Error> { - self.playing.store(true, Ordering::Relaxed); + StreamState::Playing.store(&self.state, Ordering::Relaxed); Ok(()) } fn pause(&self) -> Result<(), Error> { - self.playing.store(false, Ordering::Relaxed); + StreamState::Paused.store(&self.state, Ordering::Relaxed); Ok(()) } @@ -276,7 +288,7 @@ struct LocalProcessHandler { // JACK audio samples are 32-bit float (unless you do some custom dark magic) temp_input_buffer: Vec, temp_output_buffer: Vec, - playing: Arc, + state: Arc, #[cfg(feature = "realtime")] error_callback: ErrorCallbackArc, #[cfg(feature = "realtime")] @@ -292,7 +304,7 @@ impl LocalProcessHandler { buffer_size: usize, input_data_callback: Option, output_data_callback: Option, - playing: Arc, + state: Arc, #[cfg(feature = "realtime")] error_callback: ErrorCallbackArc, ) -> Self { let temp_input_buffer = vec![0.0; in_ports.len() * buffer_size]; @@ -307,7 +319,7 @@ impl LocalProcessHandler { output_data_callback, temp_input_buffer, temp_output_buffer, - playing, + state, #[cfg(feature = "realtime")] error_callback, #[cfg(feature = "realtime")] @@ -329,6 +341,14 @@ impl jack::ProcessHandler for LocalProcessHandler { client: &jack::Client, process_scope: &jack::ProcessScope, ) -> jack::Control { + if StreamState::load(&self.state, Ordering::Relaxed) != StreamState::Playing { + // JACK does not zero-fill output port buffers before calling the process handler + for port in &mut self.out_ports { + port.as_mut_slice(process_scope).fill(f32::EQUILIBRIUM); + } + return jack::Control::Continue; + } + #[cfg(feature = "realtime")] { if !self.rt_checked { @@ -396,10 +416,6 @@ impl jack::ProcessHandler for LocalProcessHandler { } } - if !self.playing.load(Ordering::Relaxed) { - return jack::Control::Continue; - } - // This should be equal to self.buffer_size, but the implementation will // work even if it is less. Will panic in `temp_buffer_to_data` if greater. let current_frame_count = process_scope.n_frames() as usize; @@ -507,20 +523,29 @@ fn micros_to_stream_instant(micros: u64) -> StreamInstant { /// Receives notifications from the JACK server on JACK's notification thread (single-threaded). struct JackNotificationHandler { error_callback_ptr: ErrorCallbackArc, - init_sample_rate_flag: bool, + state: Arc, + configured_sample_rate: jack::Frames, } impl JackNotificationHandler { - pub fn new(error_callback_ptr: ErrorCallbackArc) -> Self { + pub fn new( + error_callback_ptr: ErrorCallbackArc, + state: Arc, + configured_sample_rate: jack::Frames, + ) -> Self { JackNotificationHandler { error_callback_ptr, - init_sample_rate_flag: false, + state, + configured_sample_rate, } } } impl jack::NotificationHandler for JackNotificationHandler { unsafe fn shutdown(&mut self, _status: jack::ClientStatus, reason: &str) { + if StreamState::load(&self.state, Ordering::Acquire) == StreamState::Starting { + return; + } emit_error( &self.error_callback_ptr, Error::with_message( @@ -531,32 +556,29 @@ impl jack::NotificationHandler for JackNotificationHandler { } fn sample_rate(&mut self, _: &jack::Client, srate: jack::Frames) -> jack::Control { - match self.init_sample_rate_flag { - false => { - // One of these notifications is sent every time a client is started. - self.init_sample_rate_flag = true; - jack::Control::Continue - } - true => { - // The JACK server has changed the sample rate, invalidating this stream. - // The stream configuration must be rebuilt with the new sample rate. - emit_error( - &self.error_callback_ptr, - Error::with_message( - ErrorKind::StreamInvalidated, - format!("JACK server changed sample rate to {srate} Hz"), - ), - ); - jack::Control::Quit - } + if srate == self.configured_sample_rate { + // One of these notifications is sent every time a client is started. + return jack::Control::Continue; + } + if StreamState::load(&self.state, Ordering::Acquire) != StreamState::Starting { + emit_error( + &self.error_callback_ptr, + Error::with_message( + ErrorKind::StreamInvalidated, + format!("JACK server changed sample rate to {srate} Hz"), + ), + ); } + jack::Control::Quit } fn xrun(&mut self, _: &jack::Client) -> jack::Control { - let _ = try_emit_error( - &self.error_callback_ptr, - Error::with_message(ErrorKind::Xrun, "JACK xrun detected"), - ); + if StreamState::load(&self.state, Ordering::Acquire) != StreamState::Starting { + let _ = try_emit_error( + &self.error_callback_ptr, + Error::with_message(ErrorKind::Xrun, "JACK xrun detected"), + ); + } jack::Control::Continue } } diff --git a/src/host/latch.rs b/src/host/latch.rs new file mode 100644 index 000000000..6d979640d --- /dev/null +++ b/src/host/latch.rs @@ -0,0 +1,110 @@ +//! Stream-readiness latch used by backends with dedicated worker threads. +//! +//! Prevents worker threads from invoking user callbacks before the `Stream` handle has been +//! returned to the caller. + +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Weak, + }, + thread::Thread, +}; + +/// Signals worker threads that the stream handle has been given to the caller. +#[derive(Debug)] +pub(crate) struct Latch { + /// `Option` so `Drop` can move it out before unparking, closing the window where a thread + /// could wake, see the Arc still alive (flag=false), re-park, then be orphaned. + flag: Option>, + threads: Vec, +} + +/// Held by a worker thread. Parks until the matching [`Latch`] is released. +#[derive(Clone, Debug)] +pub(crate) struct LatchWaiter(Weak); + +impl Latch { + /// Creates a new stream-readiness latch. + pub(crate) fn new() -> Self { + Self { + flag: Some(Arc::new(AtomicBool::new(false))), + threads: Vec::new(), + } + } + + /// Returns a waiter that unblocks when this latch is released. + pub(crate) fn waiter(&self) -> LatchWaiter { + LatchWaiter(Arc::downgrade( + self.flag + .as_ref() + .expect("waiter called on a dropped Latch"), + )) + } + + /// Registers a thread to be unparked when [`release`](Self::release) is called. + #[cfg(any( + target_os = "macos", + target_os = "windows", + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd", + ))] + pub(crate) fn add_thread(&mut self, thread: Thread) { + self.threads.push(thread); + } + + /// Releases the latch and unparks all registered threads. + pub(crate) fn release(&self) { + if let Some(flag) = &self.flag { + flag.store(true, Ordering::Release); + } + for t in &self.threads { + t.unpark(); + } + } +} + +impl Drop for Latch { + fn drop(&mut self) { + // Invalidate the Arc *before* unparking so waiters see upgrade() == None and exit cleanly + // on the error path (latch dropped without being released). + drop(self.flag.take()); + for t in &self.threads { + t.unpark(); + } + } +} + +impl LatchWaiter { + /// Parks the calling thread until the latch is released or dropped without releasing. + /// + /// Returns `true` if the stream is ready, `false` if the [`Latch`] was dropped before release. + #[cfg(any( + target_os = "macos", + target_os = "windows", + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd", + ))] + pub(crate) fn wait(&self) -> bool { + loop { + match self.0.upgrade() { + None => return false, + Some(flag) if flag.load(Ordering::Acquire) => return true, + Some(flag) => { + drop(flag); // release strong ref before parking + std::thread::park(); + } + } + } + } + + /// Returns `true` if the latch has already been released. + #[cfg(all(target_vendor = "apple", not(target_os = "macos")))] + pub(crate) fn is_released(&self) -> bool { + self.0.upgrade().is_some_and(|f| f.load(Ordering::Acquire)) + } +} diff --git a/src/host/mod.rs b/src/host/mod.rs index 2ccf71c71..6b67defdd 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -90,6 +90,27 @@ pub(crate) mod custom; )))] pub(crate) mod null; +#[cfg(any( + target_vendor = "apple", + target_os = "windows", + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd", + all( + feature = "jack", + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd", + target_os = "macos", + target_os = "windows", + ) + ), +))] +pub(crate) mod latch; + /// Shared error-callback type that hands the callback across thread boundaries. #[allow(dead_code)] pub(crate) type ErrorCallbackArc = std::sync::Arc>; diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index deb148c72..5e69abe1d 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -21,6 +21,7 @@ use super::stream::Stream; use crate::{ host::{ emit_error, + latch::Latch, pipewire::{ stream::{ DefaultDeviceMonitor, PwInitGuard, StreamCommand, StreamData, SUPPORTED_FORMATS, @@ -319,8 +320,9 @@ impl DeviceTrait for Device { { let (pw_play_tx, pw_play_rx) = pw::channel::channel::(); - let (pw_init_tx, pw_init_rx) = std::sync::mpsc::channel::(); - let (ready_tx, ready_rx) = std::sync::mpsc::sync_channel::<()>(0); + let (init_tx, init_rx) = std::sync::mpsc::channel::>(); + let mut latch = Latch::new(); + let waiter = latch.waiter(); let device = self.clone(); let wait_timeout = timeout.unwrap_or(Duration::from_secs(2)); let initial_quantum = match config.buffer_size { @@ -336,18 +338,7 @@ impl DeviceTrait for Device { let _pw = PwInitGuard::new(); let properties = device.pw_properties(DeviceDirection::Input, &config); - let Ok(StreamData { - mainloop, - listener, - stream, - context, - core, - core_monitor, - error_callback, - pending_device_changed, - invalidated, - is_default_device, - }) = super::stream::connect_input( + let stream_data = match super::stream::connect_input( super::stream::ConnectParams { config, properties, @@ -357,55 +348,59 @@ impl DeviceTrait for Device { }, data_callback, error_callback, - ) - else { - let _ = pw_init_tx.send(false); - return; + ) { + Ok(d) => d, + Err(e) => { + let _ = init_tx.send(Err(Error::with_message( + ErrorKind::UnsupportedConfig, + format!("PipeWire stream connection failed: {e}"), + ))); + return; + } }; - let _ = pw_init_tx.send(true); - - // Wait until the caller has received the Stream handle before running the - // mainloop or invoking any callbacks. If the caller timed out and dropped - // ready_tx, exit cleanly. - if ready_rx.recv().is_err() { - return; - } - let default_monitor = - if let Some(key) = device.default_metadata_key() { - match core.get_registry_rc() { - Ok(registry) => Some(DefaultDeviceMonitor::new( - registry, - key, - error_callback.clone(), - invalidated, - pending_device_changed, - )), - Err(e) => { - emit_error( - &error_callback, - Error::with_message( - ErrorKind::BackendError, - format!("PipeWire: could not acquire registry; device change notifications will be unavailable: {e}"), - ), - ); - None - } + let StreamData { + mainloop, + listener, + stream, + context, + core, + core_monitor, + error_callback, + pending_device_changed, + invalidated, + is_default_device, + } = stream_data; + + let default_monitor = if let Some(key) = device.default_metadata_key() { + match core.get_registry_rc() { + Ok(registry) => Some(DefaultDeviceMonitor::new( + registry, + key, + error_callback.clone(), + invalidated, + pending_device_changed, + )), + Err(e) => { + let _ = init_tx.send(Err(Error::with_message( + ErrorKind::BackendError, + format!("PipeWire: could not acquire registry: {e}"), + ))); + return; } - } else { - None - }; + } + } else { + None + }; is_default_device.store(default_monitor.is_some(), Ordering::Relaxed); - let stream = stream.clone(); + let stream_clone = stream.clone(); let mainloop_rc1 = mainloop.clone(); - - #[cfg(feature = "realtime")] - let error_callback_rt = error_callback.clone(); + let error_callback_cmd = error_callback.clone(); let _receiver = pw_play_rx.attach(mainloop.loop_(), move |play| match play { StreamCommand::Toggle(state) => { - if let Err(e) = stream.set_active(state) { + if let Err(e) = stream_clone.set_active(state) { emit_error( - &error_callback, + &error_callback_cmd, Error::with_message( ErrorKind::StreamInvalidated, format!("PipeWire: set_active({state}) failed: {e}"), @@ -414,9 +409,9 @@ impl DeviceTrait for Device { } } StreamCommand::Stop => { - if let Err(e) = stream.disconnect() { + if let Err(e) = stream_clone.disconnect() { emit_error( - &error_callback, + &error_callback_cmd, Error::with_message( ErrorKind::StreamInvalidated, format!("PipeWire: stream disconnect failed: {e}"), @@ -427,15 +422,25 @@ impl DeviceTrait for Device { } }); + if init_tx.send(Ok(())).is_err() { + return; + } + + // If the Latch is dropped without being released (error path), exit cleanly. + if !waiter.wait() { + return; + } + #[cfg(feature = "realtime")] if let Err(e) = audio_thread_priority::promote_current_thread_to_real_time( device.quantum, device.rate, ) { - emit_error(&error_callback_rt, Error::from(e)); + emit_error(&error_callback, Error::from(e)); } mainloop.run(); + drop(listener); drop(default_monitor); drop(core_monitor); @@ -443,28 +448,28 @@ impl DeviceTrait for Device { drop(context); }) .map_err(|e| { - Error::with_message(ErrorKind::ResourceExhausted, format!("failed to create thread: {e}")) + Error::with_message( + ErrorKind::ResourceExhausted, + format!("failed to create thread: {e}"), + ) })?; - match pw_init_rx.recv_timeout(wait_timeout) { - Ok(true) => { - let stream = Stream { - handle: Some(handle), - controller: pw_play_tx, - last_quantum, - start, - }; - let _ = ready_tx.send(()); - Ok(stream) - } - Ok(false) => Err(Error::with_message( - ErrorKind::UnsupportedConfig, - "stream configuration rejected by PipeWire", - )), - Err(_) => Err(Error::with_message( + + let init_result = init_rx.recv_timeout(wait_timeout).unwrap_or_else(|_| { + Err(Error::with_message( ErrorKind::DeviceNotAvailable, "PipeWire timed out", - )), + )) + }); + + if let Err(e) = init_result { + drop(latch); + return Err(e); } + + latch.add_thread(handle.thread().clone()); + let stream = Stream::new(handle, pw_play_tx, last_quantum, start, latch); + stream.signal_ready(); + Ok(stream) } fn build_output_stream_raw( @@ -481,8 +486,9 @@ impl DeviceTrait for Device { { let (pw_play_tx, pw_play_rx) = pw::channel::channel::(); - let (pw_init_tx, pw_init_rx) = std::sync::mpsc::channel::(); - let (ready_tx, ready_rx) = std::sync::mpsc::sync_channel::<()>(0); + let (init_tx, init_rx) = std::sync::mpsc::channel::>(); + let mut latch = Latch::new(); + let waiter = latch.waiter(); let device = self.clone(); let wait_timeout = timeout.unwrap_or(Duration::from_secs(2)); let initial_quantum = match config.buffer_size { @@ -498,18 +504,7 @@ impl DeviceTrait for Device { let _pw = PwInitGuard::new(); let properties = device.pw_properties(DeviceDirection::Output, &config); - let Ok(StreamData { - mainloop, - listener, - stream, - context, - core, - core_monitor, - error_callback, - pending_device_changed, - invalidated, - is_default_device, - }) = super::stream::connect_output( + let stream_data = match super::stream::connect_output( super::stream::ConnectParams { config, properties, @@ -519,56 +514,59 @@ impl DeviceTrait for Device { }, data_callback, error_callback, - ) - else { - let _ = pw_init_tx.send(false); - return; + ) { + Ok(d) => d, + Err(e) => { + let _ = init_tx.send(Err(Error::with_message( + ErrorKind::UnsupportedConfig, + format!("PipeWire stream connection failed: {e}"), + ))); + return; + } }; - let _ = pw_init_tx.send(true); - - // Wait until the caller has received the Stream handle before running the - // mainloop or invoking any callbacks. If the caller timed out and dropped - // ready_tx, exit cleanly. - if ready_rx.recv().is_err() { - return; - } - - let default_monitor = - if let Some(key) = device.default_metadata_key() { - match core.get_registry_rc() { - Ok(registry) => Some(DefaultDeviceMonitor::new( - registry, - key, - error_callback.clone(), - invalidated, - pending_device_changed, - )), - Err(e) => { - emit_error( - &error_callback, - Error::with_message( - ErrorKind::BackendError, - format!("PipeWire: could not acquire registry; device change notifications will be unavailable: {e}"), - ), - ); - None - } + let StreamData { + mainloop, + listener, + stream, + context, + core, + core_monitor, + error_callback, + pending_device_changed, + invalidated, + is_default_device, + } = stream_data; + + let default_monitor = if let Some(key) = device.default_metadata_key() { + match core.get_registry_rc() { + Ok(registry) => Some(DefaultDeviceMonitor::new( + registry, + key, + error_callback.clone(), + invalidated, + pending_device_changed, + )), + Err(e) => { + let _ = init_tx.send(Err(Error::with_message( + ErrorKind::BackendError, + format!("PipeWire: could not acquire registry: {e}"), + ))); + return; } - } else { - None - }; + } + } else { + None + }; is_default_device.store(default_monitor.is_some(), Ordering::Relaxed); - let stream = stream.clone(); + let stream_clone = stream.clone(); let mainloop_rc1 = mainloop.clone(); - - #[cfg(feature = "realtime")] - let error_callback_rt = error_callback.clone(); + let error_callback_cmd = error_callback.clone(); let _receiver = pw_play_rx.attach(mainloop.loop_(), move |play| match play { StreamCommand::Toggle(state) => { - if let Err(e) = stream.set_active(state) { + if let Err(e) = stream_clone.set_active(state) { emit_error( - &error_callback, + &error_callback_cmd, Error::with_message( ErrorKind::StreamInvalidated, format!("PipeWire: set_active({state}) failed: {e}"), @@ -577,9 +575,9 @@ impl DeviceTrait for Device { } } StreamCommand::Stop => { - if let Err(e) = stream.disconnect() { + if let Err(e) = stream_clone.disconnect() { emit_error( - &error_callback, + &error_callback_cmd, Error::with_message( ErrorKind::StreamInvalidated, format!("PipeWire: stream disconnect failed: {e}"), @@ -590,12 +588,21 @@ impl DeviceTrait for Device { } }); + if init_tx.send(Ok(())).is_err() { + return; + } + + // If the Latch is dropped without being released (error path), exit cleanly. + if !waiter.wait() { + return; + } + #[cfg(feature = "realtime")] if let Err(e) = audio_thread_priority::promote_current_thread_to_real_time( device.quantum, device.rate, ) { - emit_error(&error_callback_rt, Error::from(e)); + emit_error(&error_callback, Error::from(e)); } mainloop.run(); @@ -606,28 +613,28 @@ impl DeviceTrait for Device { drop(context); }) .map_err(|e| { - Error::with_message(ErrorKind::ResourceExhausted, format!("failed to create thread: {e}")) + Error::with_message( + ErrorKind::ResourceExhausted, + format!("failed to create thread: {e}"), + ) })?; - match pw_init_rx.recv_timeout(wait_timeout) { - Ok(true) => { - let stream = Stream { - handle: Some(handle), - controller: pw_play_tx, - last_quantum, - start, - }; - let _ = ready_tx.send(()); - Ok(stream) - } - Ok(false) => Err(Error::with_message( - ErrorKind::UnsupportedConfig, - "stream configuration rejected by PipeWire", - )), - Err(_) => Err(Error::with_message( + + let init_result = init_rx.recv_timeout(wait_timeout).unwrap_or_else(|_| { + Err(Error::with_message( ErrorKind::DeviceNotAvailable, "PipeWire timed out", - )), + )) + }); + + if let Err(e) = init_result { + drop(latch); + return Err(e); } + + latch.add_thread(handle.thread().clone()); + let stream = Stream::new(handle, pw_play_tx, last_quantum, start, latch); + stream.signal_ready(); + Ok(stream) } } diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index d4e93569c..0779bd9f6 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -28,8 +28,8 @@ use pipewire::{ use crate::{ host::{ - emit_error, equilibrium::fill_equilibrium, frames_to_duration, try_emit_error, - ErrorCallbackArc, + emit_error, equilibrium::fill_equilibrium, frames_to_duration, latch::Latch, + try_emit_error, ErrorCallbackArc, }, traits::StreamTrait, Data, Error, ErrorKind, FrameCount, InputCallbackInfo, InputStreamTimestamp, @@ -75,14 +75,40 @@ pub enum StreamCommand { } pub struct Stream { - pub(crate) handle: Option>, - pub(crate) controller: pw::channel::Sender, - pub(crate) last_quantum: Arc, - pub(crate) start: Instant, + handle: Option>, + controller: pw::channel::Sender, + last_quantum: Arc, + start: Instant, + latch: Latch, +} + +impl Stream { + pub(crate) fn new( + handle: JoinHandle<()>, + controller: pw::channel::Sender, + last_quantum: Arc, + start: Instant, + latch: Latch, + ) -> Self { + Self { + handle: Some(handle), + controller, + last_quantum, + start, + latch, + } + } + + /// Releases the latch so the worker thread can begin processing audio callbacks. + pub fn signal_ready(&self) { + self.latch.release(); + } } impl Drop for Stream { fn drop(&mut self) { + // Unblock the worker in case the stream is dropped before signal_ready() was called. + self.signal_ready(); let _ = self.controller.send(StreamCommand::Stop); if let Some(handle) = self.handle.take() { // Prevent self-join: Stop was sent; the handle detaches and the thread exits after @@ -626,6 +652,7 @@ where if n_channels == 0 { return; // format not yet negotiated by param_changed } + if let Some(mut buffer) = stream.dequeue_buffer() { // Read the requested frame count before mutably borrowing datas_mut(). let requested = buffer.requested() as usize; @@ -685,7 +712,7 @@ where // RT_PROCESS is intentionally absent: with add_local_listener the process callback always // runs on this mainloop thread, not the separate data-loop thread RT_PROCESS creates. - // The mainloop thread is promoted to RT by the caller (device.rs) before mainloop.run(). + // The worker thread is promoted to RT after signalling the main thread (see device.rs). let flags = pw::stream::StreamFlags::AUTOCONNECT | pw::stream::StreamFlags::MAP_BUFFERS; stream.connect(pw::spa::utils::Direction::Output, None, flags, &mut params)?; @@ -858,6 +885,7 @@ where if n_channels == 0 { return; // format not yet negotiated by param_changed } + if let Some(mut buffer) = stream.dequeue_buffer() { let datas = buffer.datas_mut(); if datas.is_empty() { @@ -899,7 +927,7 @@ where // RT_PROCESS is intentionally absent: with add_local_listener the process callback always // runs on this mainloop thread, not the separate data-loop thread RT_PROCESS creates. - // The mainloop thread is promoted to RT by the caller (device.rs) before mainloop.run(). + // The worker thread is promoted to RT after signalling the main thread (see device.rs). let flags = pw::stream::StreamFlags::AUTOCONNECT | pw::stream::StreamFlags::MAP_BUFFERS; stream.connect(pw::spa::utils::Direction::Input, None, flags, &mut params)?; diff --git a/src/host/pulseaudio/mod.rs b/src/host/pulseaudio/mod.rs index 3caa9ebea..a2a7ca282 100644 --- a/src/host/pulseaudio/mod.rs +++ b/src/host/pulseaudio/mod.rs @@ -338,7 +338,7 @@ impl DeviceTrait for Device { }; let client = client.clone(); - if let Some(dur) = timeout { + let stream = if let Some(dur) = timeout { // Run stream creation on a thread so we can bound the wait. If the PulseAudio server // is hung, `create_record_stream` would block forever. let (tx, rx) = std::sync::mpsc::channel(); @@ -360,7 +360,9 @@ impl DeviceTrait for Device { } } else { stream::Stream::new_record(client, params, data_callback, error_callback) - } + }?; + stream.signal_ready(); + Ok(stream) } fn build_output_stream_raw( @@ -411,7 +413,7 @@ impl DeviceTrait for Device { }; let client = client.clone(); - if let Some(dur) = timeout { + let stream = if let Some(dur) = timeout { // Run stream creation on a thread so we can bound the wait. If the PulseAudio server // is hung, `create_playback_stream` would block forever. let (tx, rx) = std::sync::mpsc::channel(); @@ -433,7 +435,9 @@ impl DeviceTrait for Device { } } else { stream::Stream::new_playback(client, params, data_callback, error_callback) - } + }?; + stream.signal_ready(); + Ok(stream) } fn description(&self) -> Result { diff --git a/src/host/pulseaudio/stream.rs b/src/host/pulseaudio/stream.rs index 766c76c61..0c796b249 100644 --- a/src/host/pulseaudio/stream.rs +++ b/src/host/pulseaudio/stream.rs @@ -1,6 +1,6 @@ use std::{ sync::{ - atomic::{self, AtomicBool, AtomicU64}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, Condvar, Mutex, }, time::{Duration, Instant}, @@ -11,7 +11,7 @@ use futures::FutureExt as _; use pulseaudio::{protocol, AsPlaybackSource}; use crate::{ - host::{emit_error, ErrorCallbackArc}, + host::{emit_error, latch::Latch, ErrorCallbackArc}, traits::StreamTrait, Data, Error, ErrorKind, FrameCount, InputCallbackInfo, InputStreamTimestamp, OutputCallbackInfo, OutputStreamTimestamp, SampleFormat, StreamInstant, @@ -44,7 +44,7 @@ impl LatencyHandle { // Signal cancellation and wake the thread immediately fn cancel(&self) { - self.cancel.store(true, atomic::Ordering::Relaxed); + self.cancel.store(true, Ordering::Relaxed); self.notify(); } } @@ -57,6 +57,7 @@ enum StreamInner { pub struct Stream { inner: StreamInner, workers: Vec>, + latch: Latch, } impl Drop for Stream { @@ -74,6 +75,10 @@ impl Drop for Stream { handle.cancel(); } } + + // Unpark the threads in case they're sleeping. + self.signal_ready(); + for handle in self.workers.drain(..) { // Prevent self-join: a worker thread may surface an error // through the user's error_callback, and that callback may @@ -188,8 +193,8 @@ impl Stream { // Interpolate the latency based on elapsed time since the last // poll: as audio plays, the DAC drains the buffer at a constant // rate, so the latency decreases linearly between polls. - let stored_latency = latency_clone.load(atomic::Ordering::Relaxed); - let poll_usec = poll_clone.load(atomic::Ordering::Relaxed); + let stored_latency = latency_clone.load(Ordering::Relaxed); + let poll_usec = poll_clone.load(Ordering::Relaxed); // Cap to LATENCY_MAX_INTERVAL: the linear-drain assumption is only valid for that // window, and a stale poll_usec (e.g. after cork/uncork where timing_info blocks) // would otherwise saturate latency to zero. @@ -243,18 +248,18 @@ impl Stream { let error_callback_clone = error_callback.clone(); let cancel_driver = handle.cancel.clone(); - // The barrier prevents the worker and latency threads from firing callbacks before the - // caller has received the Stream handle. - let ready = std::sync::Arc::new(std::sync::Barrier::new(3)); + // The latch is released just before the `Stream` is returned so the driver and latency + // threads cannot fire any callbacks before the caller has the handle. + let mut latch = Latch::new(); + let waiter_driver = latch.waiter(); - let ready_worker = ready.clone(); let driver_handle = std::thread::spawn(move || { - ready_worker.wait(); + waiter_driver.wait(); if let Err(e) = block_on(stream_clone.play_all()) { // A server playback error is expected when the client // closes their stream. No need to report it back to // the client. - if !cancel_driver.load(atomic::Ordering::Relaxed) { + if !cancel_driver.load(Ordering::Relaxed) { emit_error(&error_callback_clone, Error::from(e)); } } @@ -266,11 +271,11 @@ impl Stream { let latency_clone = current_latency_micros.clone(); let poll_clone = last_poll_micros.clone(); - let ready_latency = ready.clone(); + let waiter_latency = latch.waiter(); let latency_handle = std::thread::spawn(move || { - ready_latency.wait(); + waiter_latency.wait(); loop { - if cancel_thread.load(atomic::Ordering::Relaxed) { + if cancel_thread.load(Ordering::Relaxed) { break; } @@ -284,7 +289,7 @@ impl Stream { let poll_since_epoch = Instant::now().saturating_duration_since(start).as_micros() as u64; - poll_clone.store(poll_since_epoch, atomic::Ordering::Relaxed); + poll_clone.store(poll_since_epoch, Ordering::Relaxed); store_latency( &latency_clone, @@ -304,10 +309,12 @@ impl Stream { } }); - ready.wait(); + latch.add_thread(driver_handle.thread().clone()); + latch.add_thread(latency_handle.thread().clone()); Ok(Self { inner: StreamInner::Playback(stream, start, handle), workers: vec![driver_handle, latency_handle], + latch, }) } @@ -349,8 +356,8 @@ impl Stream { // Interpolate the latency based on elapsed time since the last poll: as audio records, // the ADC fills the buffer at a constant rate, so the latency increases linearly // between polls. - let stored_latency = latency_clone.load(atomic::Ordering::Relaxed); - let poll_usec = poll_clone.load(atomic::Ordering::Relaxed); + let stored_latency = latency_clone.load(Ordering::Relaxed); + let poll_usec = poll_clone.load(Ordering::Relaxed); // Cap to LATENCY_MAX_INTERVAL: the linear-fill assumption is only valid for that // window, and a stale poll_usec (e.g. after cork/uncork where timing_info blocks) // would otherwise keep inflating the interpolated latency up to the cap. @@ -395,45 +402,61 @@ impl Stream { let stream_clone = stream.clone(); let latency_clone = current_latency_micros.clone(); let poll_clone = last_poll_micros.clone(); - let latency_handle = std::thread::spawn(move || loop { - if cancel_thread.load(atomic::Ordering::Relaxed) { - break; - } - let timing_info = match block_on(stream_clone.timing_info()) { - Ok(timing_info) => timing_info, - Err(e) => { - error_callback(Error::from(e)); + // The latch is released just before the `Stream` is returned so the latency thread cannot + // fire any callbacks before the caller has the handle. + let mut latch = Latch::new(); + let waiter_latency = latch.waiter(); + + let latency_handle = std::thread::spawn(move || { + waiter_latency.wait(); + loop { + if cancel_thread.load(Ordering::Relaxed) { break; } - }; - let poll_since_epoch = - Instant::now().saturating_duration_since(start).as_micros() as u64; - poll_clone.store(poll_since_epoch, atomic::Ordering::Relaxed); - - store_latency( - &latency_clone, - sample_spec, - timing_info.source_usec, - timing_info.write_offset, - timing_info.read_offset, - ); - - // Wait until woken by a read/play/pause/drop event or until LATENCY_MAX_INTERVAL. - let (lock, cvar) = &*update_thread; - let Ok(guard) = lock.lock() else { break }; - let (mut guard, _) = cvar - .wait_timeout_while(guard, LATENCY_MAX_INTERVAL, |notified| !*notified) - .unwrap_or_else(|e| e.into_inner()); - *guard = false; + let timing_info = match block_on(stream_clone.timing_info()) { + Ok(timing_info) => timing_info, + Err(e) => { + error_callback(Error::from(e)); + break; + } + }; + + let poll_since_epoch = + Instant::now().saturating_duration_since(start).as_micros() as u64; + poll_clone.store(poll_since_epoch, Ordering::Relaxed); + + store_latency( + &latency_clone, + sample_spec, + timing_info.source_usec, + timing_info.write_offset, + timing_info.read_offset, + ); + + // Wait until woken by a read/play/pause/drop event or until LATENCY_MAX_INTERVAL. + let (lock, cvar) = &*update_thread; + let Ok(guard) = lock.lock() else { break }; + let (mut guard, _) = cvar + .wait_timeout_while(guard, LATENCY_MAX_INTERVAL, |notified| !*notified) + .unwrap_or_else(|e| e.into_inner()); + *guard = false; + } }); + latch.add_thread(latency_handle.thread().clone()); Ok(Self { inner: StreamInner::Record(stream, start, handle), workers: vec![latency_handle], + latch, }) } + + /// Releases the latch so the worker thread can begin processing audio callbacks. + pub(crate) fn signal_ready(&self) { + self.latch.release(); + } } fn store_latency( @@ -450,6 +473,6 @@ fn store_latency( latency_micros.store( latency.as_micros().try_into().unwrap_or(u64::MAX), - atomic::Ordering::Relaxed, + Ordering::Relaxed, ); } diff --git a/src/host/wasapi/device.rs b/src/host/wasapi/device.rs index 563dd9249..9e0ea918e 100644 --- a/src/host/wasapi/device.rs +++ b/src/host/wasapi/device.rs @@ -142,7 +142,9 @@ impl DeviceTrait for Device { let stream_inner = self.build_input_stream_raw_inner(config, sample_format, timeout)?; let error_callback: ErrorCallbackArc = Arc::new(Mutex::new(error_callback)); let monitor = self.default_device_monitor()?; - Stream::new_input(stream_inner, data_callback, error_callback, monitor) + let stream = Stream::new_input(stream_inner, data_callback, error_callback, monitor)?; + stream.signal_ready(); + Ok(stream) } fn build_output_stream_raw( @@ -160,7 +162,9 @@ impl DeviceTrait for Device { let stream_inner = self.build_output_stream_raw_inner(config, sample_format, timeout)?; let error_callback: ErrorCallbackArc = Arc::new(Mutex::new(error_callback)); let monitor = self.default_device_monitor()?; - Stream::new_output(stream_inner, data_callback, error_callback, monitor) + let stream = Stream::new_output(stream_inner, data_callback, error_callback, monitor)?; + stream.signal_ready(); + Ok(stream) } } diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index 131aa4150..dbb1b9195 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -18,7 +18,10 @@ use windows::Win32::{ }; use crate::{ - host::{emit_error, equilibrium::fill_equilibrium, frames_to_duration, ErrorCallbackArc}, + host::{ + emit_error, equilibrium::fill_equilibrium, frames_to_duration, latch::Latch, + ErrorCallbackArc, + }, traits::StreamTrait, Data, Error, ErrorKind, FrameCount, InputCallbackInfo, InputStreamTimestamp, OutputCallbackInfo, OutputStreamTimestamp, ResultExt, SampleFormat, SampleRate, StreamConfig, @@ -212,6 +215,9 @@ pub struct Stream { // default changes. Dropped after the run thread joins, ensuring the HANDLE is not // waited on when it is closed. _default_device_monitor: Option, + + // Latch that ensures no callbacks fire before the caller receives the `Stream` handle. + latch: Latch, } // SAFETY: Windows Event HANDLEs are safe to send between threads - they are designed for @@ -219,6 +225,7 @@ pub struct Stream { // - JoinHandle<()> is Send // - Sender is Send // - Foundation::HANDLE is Send (Windows synchronization primitive) +// - Latch is Send // See: https://learn.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-createeventa unsafe impl Send for Stream {} @@ -228,6 +235,7 @@ unsafe impl Send for Stream {} // - JoinHandle<()> is Sync // - Sender is Sync (uses internal synchronization) // - Foundation::HANDLE for event objects supports concurrent access +// - Latch is Sync // The audio thread owns all COM objects, so no cross-thread COM access occurs. unsafe impl Sync for Stream {} @@ -341,16 +349,15 @@ impl Stream { pending_scheduled_event, }; - // The barrier prevents the worker from firing data callbacks before the caller has - // received the Stream handle. Without it, callbacks could arrive before the caller can - // pause, stop, or drop the stream. - let ready = std::sync::Arc::new(std::sync::Barrier::new(2)); - let ready_worker = ready.clone(); + // The latch is released just before the `Stream` is returned so the worker cannot fire any + // callbacks before the caller has the handle. + let mut latch = Latch::new(); + let waiter = latch.waiter(); let thread = thread::Builder::new() .name("cpal_wasapi_in".to_owned()) .spawn(move || { - ready_worker.wait(); + waiter.wait(); run_input(run_context, &mut data_callback, &error_callback) }) .map_err(|e| { @@ -360,6 +367,7 @@ impl Stream { ) })?; + latch.add_thread(thread.thread().clone()); let stream = Stream { thread: Some(thread), commands: tx, @@ -367,9 +375,8 @@ impl Stream { period_frames, qpc_frequency: qpc_frequency as u64, _default_device_monitor: default_device_monitor, + latch, }; - - ready.wait(); Ok(stream) } @@ -412,16 +419,15 @@ impl Stream { pending_scheduled_event, }; - // The barrier prevents the worker from firing data callbacks before the caller has - // received the Stream handle. Without it, callbacks could arrive before the caller can - // pause, stop, or drop the stream. - let ready = std::sync::Arc::new(std::sync::Barrier::new(2)); - let ready_worker = ready.clone(); + // The latch is released just before the `Stream` is returned so the worker cannot fire any + // callbacks before the caller has the handle. + let mut latch = Latch::new(); + let waiter = latch.waiter(); let thread = thread::Builder::new() .name("cpal_wasapi_out".to_owned()) .spawn(move || { - ready_worker.wait(); + waiter.wait(); run_output(run_context, &mut data_callback, &error_callback) }) .map_err(|e| { @@ -431,6 +437,7 @@ impl Stream { ) })?; + latch.add_thread(thread.thread().clone()); let stream = Stream { thread: Some(thread), commands: tx, @@ -438,12 +445,16 @@ impl Stream { period_frames, qpc_frequency: qpc_frequency as u64, _default_device_monitor: default_device_monitor, + latch, }; - - ready.wait(); Ok(stream) } + /// Releases the latch so the worker thread can begin processing audio callbacks. + pub(crate) fn signal_ready(&self) { + self.latch.release(); + } + fn push_command(&self, command: Command) -> Result<(), SendError> { self.commands.send(command)?; unsafe { @@ -455,6 +466,9 @@ impl Stream { impl Drop for Stream { fn drop(&mut self) { + // Release the latch in case the stream is dropped before signal_ready() was called. + self.signal_ready(); + let _ = self.push_command(Command::Terminate); if let Some(handle) = self.thread.take() { // Prevent self-join: Terminate was sent; the thread exits after the current callback diff --git a/src/lib.rs b/src/lib.rs index 37d0adbd8..9aa5d58b2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,10 +78,11 @@ //! ); //! ``` //! -//! While the stream is running, the selected audio device will periodically call the data callback -//! that was passed to the function. For input streams, the callback receives `&`[`Data`] containing -//! captured audio samples. For output streams, the callback receives `&mut`[`Data`] to be filled -//! with audio samples for playback. +//! Streams are returned in a paused state. Once the stream has been started with +//! [`Stream::play`](traits::StreamTrait::play), the selected audio device will periodically call +//! the data callback that was passed to the function. For input streams, the callback receives +//! `&`[`Data`] containing captured audio samples. For output streams, the callback receives +//! `&mut`[`Data`] to be filled with audio samples for playback. //! //! > **Note**: Creating and running a stream will *not* block the thread. On modern platforms, the //! > given callback is called by a dedicated, high-priority thread responsible for delivering @@ -117,8 +118,8 @@ //! } //! ``` //! -//! Not all platforms automatically run the stream upon creation. To ensure the stream has started, -//! we can use [`Stream::play`](traits::StreamTrait::play). +//! Streams are always returned in a paused state, so we must call +//! [`Stream::play`](traits::StreamTrait::play) to start running the data callback. //! //! ```no_run //! # use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; diff --git a/src/traits.rs b/src/traits.rs index a8d5cf4aa..32eacc079 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -409,8 +409,8 @@ pub trait DeviceTrait { pub trait StreamTrait { /// Run the stream. /// - /// Note: Not all platforms automatically run the stream upon creation, so it is important to - /// call `play` after creation if it is expected that the stream should run immediately. + /// Streams returned by `build_*_stream` are always paused, so `play` must be called before the + /// data callback will fire. /// /// # Errors ///