Skip to content

Commit 7538971

Browse files
committed
refactor: SourcesQueueOutput silence and span logic
1 parent 5d305d1 commit 7538971

1 file changed

Lines changed: 93 additions & 88 deletions

File tree

src/queue.rs

Lines changed: 93 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use std::sync::atomic::{AtomicBool, Ordering};
55
use std::sync::{Arc, Mutex};
66
use std::time::Duration;
77

8-
use crate::source::{Empty, SeekError, Source, Zero};
8+
use dasp_sample::Sample as _;
9+
10+
use crate::source::{Empty, SeekError, Source};
911
use crate::Sample;
1012

1113
use crate::common::{ChannelCount, SampleRate};
@@ -36,7 +38,7 @@ pub fn queue(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput>, SourcesQueue
3638
signal_after_end: None,
3739
input: input.clone(),
3840
samples_consumed_in_span: 0,
39-
padding_samples_remaining: 0,
41+
silence_samples_remaining: 0,
4042
};
4143

4244
(input, output)
@@ -72,7 +74,8 @@ impl SourcesQueueInput {
7274
///
7375
/// The `Receiver` will be signalled when the sound has finished playing.
7476
///
75-
/// Enable the feature flag `crossbeam-channel` in rodio to use a `crossbeam_channel::Receiver` instead.
77+
/// Enable the feature flag `crossbeam-channel` in rodio to use a `crossbeam_channel::Receiver`
78+
/// instead.
7679
#[inline]
7780
pub fn append_with_signal<T>(&self, source: T) -> Receiver<()>
7881
where
@@ -94,6 +97,11 @@ impl SourcesQueueInput {
9497
.store(keep_alive_if_empty, Ordering::Release);
9598
}
9699

100+
/// Returns whether the queue stays alive if there's no more sound to play.
101+
pub fn keep_alive_if_empty(&self) -> bool {
102+
self.keep_alive_if_empty.load(Ordering::Acquire)
103+
}
104+
97105
/// Removes all the sounds from the queue. Returns the number of sounds cleared.
98106
pub fn clear(&self) -> usize {
99107
let mut sounds = self.next_sounds.lock().unwrap();
@@ -102,6 +110,7 @@ impl SourcesQueueInput {
102110
len
103111
}
104112
}
113+
105114
/// The output of the queue. Implements `Source`.
106115
pub struct SourcesQueueOutput {
107116
// The current iterator that produces samples.
@@ -116,72 +125,74 @@ pub struct SourcesQueueOutput {
116125
// Track samples consumed in the current span to detect mid-span endings.
117126
samples_consumed_in_span: usize,
118127

119-
// When a source ends mid-frame, this counts how many silence samples to inject
120-
// to complete the frame before transitioning to the next source.
121-
padding_samples_remaining: usize,
122-
}
123-
124-
/// Returns a threshold span length that ensures frame alignment.
125-
///
126-
/// Spans must end on frame boundaries (multiples of channel count) to prevent
127-
/// channel misalignment. Returns ~512 samples rounded to the nearest frame.
128-
#[inline]
129-
fn threshold(channels: ChannelCount) -> usize {
130-
const BASE_SAMPLES: usize = 512;
131-
let ch = channels.get() as usize;
132-
BASE_SAMPLES.div_ceil(ch) * ch
128+
// This counts how many silence samples to inject when a source ends.
129+
silence_samples_remaining: usize,
133130
}
134131

135132
impl Source for SourcesQueueOutput {
136133
#[inline]
137134
fn current_span_len(&self) -> Option<usize> {
138-
if !self.current.is_exhausted() {
139-
return self.current.current_span_len();
140-
} else if self.input.keep_alive_if_empty.load(Ordering::Acquire)
141-
&& self.input.next_sounds.lock().unwrap().is_empty()
135+
let len = match self.current.current_span_len() {
136+
Some(len) if len == 0 && self.silence_samples_remaining > 0 => {
137+
// - Current source ended mid-frame, and we're injecting silence to frame-align it.
138+
self.silence_samples_remaining
139+
}
140+
Some(len) if len > 0 || !self.input.keep_alive_if_empty() => {
141+
// - Current source is not exhausted, and is reporting some span length, or
142+
// - Current source is exhausted, and won't output silence after it: end of queue.
143+
len
144+
}
145+
_ => {
146+
// - Current source is not exhausted, and is reporting no span length, or
147+
// - Current source is exhausted, and will output silence after it.
148+
self.current.channels().get() as usize
149+
}
150+
};
151+
152+
// Special case: if the current source is `Empty` and there are queued sounds after it.
153+
if len == 0
154+
&& self
155+
.current
156+
.total_duration()
157+
.is_some_and(|duration| duration.is_zero())
142158
{
143-
// Return what that Zero's current_span_len() will be: Some(threshold(channels)).
144-
return Some(threshold(self.current.channels()));
159+
if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() {
160+
return next
161+
.current_span_len()
162+
.or_else(|| Some(next.channels().get() as usize));
163+
}
145164
}
146165

147-
None
166+
// A queue must never return None: that could cause downstream sources to assume sample
167+
// rate or channel count would never change from one queue item to the next.
168+
Some(len)
148169
}
149170

150171
#[inline]
151172
fn channels(&self) -> ChannelCount {
152-
if !self.current.is_exhausted() {
153-
// Current source is active (producing samples)
154-
// - Initially: never (Empty is exhausted immediately)
155-
// - After append: the appended source while playing
156-
// - With keep_alive: Zero (silence) while playing
157-
self.current.channels()
158-
} else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() {
159-
// Current source exhausted, peek at next queued source
160-
// This is critical: UniformSourceIterator queries metadata during append,
161-
// before any samples are pulled. We must report the next source's metadata.
162-
next.channels()
163-
} else {
164-
// Queue is empty, no sources queued
165-
// - Initially: Empty
166-
// - With keep_alive: exhausted Zero between silence chunks (matches Empty)
167-
// - Without keep_alive: Empty (will end on next())
168-
self.current.channels()
173+
if self.current.is_exhausted() && self.silence_samples_remaining == 0 {
174+
if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() {
175+
// Current source exhausted, peek at next queued source
176+
// This is critical: UniformSourceIterator queries metadata during append,
177+
// before any samples are pulled. We must report the next source's metadata.
178+
return next.channels();
179+
}
169180
}
181+
182+
self.current.channels()
170183
}
171184

172185
#[inline]
173186
fn sample_rate(&self) -> SampleRate {
174-
if !self.current.is_exhausted() {
175-
// Current source is active (producing samples)
176-
self.current.sample_rate()
177-
} else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() {
178-
// Current source exhausted, peek at next queued source
179-
// This prevents wrong resampling setup in UniformSourceIterator
180-
next.sample_rate()
181-
} else {
182-
// Queue is empty, no sources queued
183-
self.current.sample_rate()
187+
if self.current.is_exhausted() && self.silence_samples_remaining == 0 {
188+
if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() {
189+
// Current source exhausted, peek at next queued source
190+
// This prevents wrong resampling setup in UniformSourceIterator
191+
return next.sample_rate();
192+
}
184193
}
194+
195+
self.current.sample_rate()
185196
}
186197

187198
#[inline]
@@ -211,34 +222,45 @@ impl Iterator for SourcesQueueOutput {
211222
fn next(&mut self) -> Option<Self::Item> {
212223
loop {
213224
// If we're padding to complete a frame, return silence.
214-
if self.padding_samples_remaining > 0 {
215-
self.padding_samples_remaining -= 1;
216-
return Some(0.0);
225+
if self.silence_samples_remaining > 0 {
226+
self.silence_samples_remaining -= 1;
227+
return Some(Sample::EQUILIBRIUM);
217228
}
218229

219230
// Basic situation that will happen most of the time.
220231
if let Some(sample) = self.current.next() {
221-
self.samples_consumed_in_span += 1;
232+
self.samples_consumed_in_span = self
233+
.samples_consumed_in_span
234+
.checked_add(1)
235+
.unwrap_or_else(|| {
236+
self.samples_consumed_in_span % self.current.channels().get() as usize + 1
237+
});
222238
return Some(sample);
223239
}
224240

225-
// Source ended - check if we ended mid-frame and need padding.
226-
let channels = self.current.channels().get() as usize;
227-
let incomplete_frame_samples = self.samples_consumed_in_span % channels;
228-
if incomplete_frame_samples > 0 {
229-
// We're mid-frame - need to pad with silence to complete it.
230-
self.padding_samples_remaining = channels - incomplete_frame_samples;
231-
// Reset counter now since we're transitioning to a new span.
232-
self.samples_consumed_in_span = 0;
233-
// Continue loop - next iteration will inject silence.
234-
continue;
241+
// Current source is exhausted - check if we ended mid-frame and need padding.
242+
if self.samples_consumed_in_span > 0 {
243+
let channels = self.current.channels().get() as usize;
244+
let incomplete_frame_samples = self.samples_consumed_in_span % channels;
245+
if incomplete_frame_samples > 0 {
246+
// We're mid-frame - need to pad with silence to complete it.
247+
self.silence_samples_remaining = channels - incomplete_frame_samples;
248+
// Reset counter now since we're transitioning to a new span.
249+
self.samples_consumed_in_span = 0;
250+
// Continue loop - next iterations will inject silence.
251+
continue;
252+
}
235253
}
236254

237-
// Reset counter and move to next sound.
238-
// In order to avoid inlining this expensive operation, the code is in another function.
239-
self.samples_consumed_in_span = 0;
255+
// Move to next sound, play silence, or end.
256+
// In order to avoid inlining that expensive operation, the code is in another function.
240257
if self.go_next().is_err() {
241-
return None;
258+
if self.input.keep_alive_if_empty() {
259+
self.silence_samples_remaining = self.current.channels().get() as usize;
260+
continue;
261+
} else {
262+
return None;
263+
}
242264
}
243265
}
244266
}
@@ -251,7 +273,7 @@ impl Iterator for SourcesQueueOutput {
251273

252274
impl SourcesQueueOutput {
253275
// Called when `current` is empty, and we must jump to the next element.
254-
// Returns `Ok` if the sound should continue playing, or an error if it should stop.
276+
// Returns `Ok` if there is another sound should continue playing, or `Err` when there is not.
255277
//
256278
// This method is separate so that it is not inlined.
257279
fn go_next(&mut self) -> Result<(), ()> {
@@ -261,23 +283,7 @@ impl SourcesQueueOutput {
261283

262284
let (next, signal_after_end) = {
263285
let mut next = self.input.next_sounds.lock().unwrap();
264-
265-
if let Some(next) = next.pop_front() {
266-
next
267-
} else {
268-
let channels = self.current.channels();
269-
let silence = Box::new(Zero::new_samples(
270-
channels,
271-
self.current.sample_rate(),
272-
threshold(channels),
273-
)) as Box<_>;
274-
if self.input.keep_alive_if_empty.load(Ordering::Acquire) {
275-
// Play a short silence in order to avoid spinlocking.
276-
(silence, None)
277-
} else {
278-
return Err(());
279-
}
280-
}
286+
next.pop_front().ok_or(())?
281287
};
282288

283289
self.current = next;
@@ -350,7 +356,6 @@ mod tests {
350356
}
351357

352358
#[test]
353-
#[ignore] // TODO: not yet implemented
354359
fn no_delay_when_added() {
355360
let (tx, mut rx) = queue::queue(true);
356361

0 commit comments

Comments
 (0)