diff --git a/Containers.Test/DelayLineTests.cs b/Containers.Test/DelayLineTests.cs new file mode 100644 index 0000000..7077086 --- /dev/null +++ b/Containers.Test/DelayLineTests.cs @@ -0,0 +1,157 @@ +// Copyright (c) ktsu.dev +// All rights reserved. +// Licensed under the MIT license. + +namespace ktsu.Containers.Tests; + +using Microsoft.VisualStudio.TestTools.UnitTesting; + +[TestClass] +public class DelayLineTests +{ + [TestMethod] + public void Constructor_NonPositiveCapacity_Throws() + { + Assert.ThrowsExactly(() => new DelayLine(0)); + Assert.ThrowsExactly(() => new DelayLine(-4)); + } + + [TestMethod] + public void Capacity_MatchesRequestedMaxDelay() + { + DelayLine line = new(100); + Assert.AreEqual(100, line.Capacity); + } + + [TestMethod] + public void NewDelayLine_ReadsZero() + { + DelayLine line = new(8); + for (int delay = 0; delay <= line.Capacity; delay++) + { + Assert.AreEqual(0f, line.Read(delay)); + } + } + + [TestMethod] + public void Read_ReturnsSampleNSamplesInThePast() + { + DelayLine line = new(8); + line.Write(1f); + line.Write(2f); + line.Write(3f); + + // Delay 0 == most recent, delay 2 == oldest of the three writes. + Assert.AreEqual(3f, line.Read(0)); + Assert.AreEqual(2f, line.Read(1)); + Assert.AreEqual(1f, line.Read(2)); + } + + [TestMethod] + public void Process_OutputsInputFromExactlyDelaySamplesEarlier() + { + DelayLine line = new(4); + // Process(x, 4) yields x[n - 4]: zero until the line has been primed with 4 samples. + Assert.AreEqual(0f, line.Process(10f, 4)); + Assert.AreEqual(0f, line.Process(20f, 4)); + Assert.AreEqual(0f, line.Process(30f, 4)); + Assert.AreEqual(0f, line.Process(40f, 4)); + Assert.AreEqual(10f, line.Process(50f, 4)); + Assert.AreEqual(20f, line.Process(60f, 4)); + } + + [TestMethod] + public void Process_ZeroDelay_ReturnsInput() + { + DelayLine line = new(4); + Assert.AreEqual(7f, line.Process(7f, 0)); + Assert.AreEqual(9f, line.Process(9f, 0)); + } + + [TestMethod] + public void Read_OutOfRange_Throws() + { + DelayLine line = new(8); + Assert.ThrowsExactly(() => line.Read(-1)); + Assert.ThrowsExactly(() => line.Read(9)); + } + + [TestMethod] + public void ReadInterpolated_HalfwayBetweenSamples_AveragesNeighbours() + { + DelayLine line = new(8); + line.Write(0f); + line.Write(10f); + + // delay 0 -> 10 (newest), delay 1 -> 0 (older). 0.5 interpolates halfway. + Assert.AreEqual(5f, line.ReadInterpolated(0.5f), 1e-6f); + } + + [TestMethod] + public void ReadInterpolated_IntegerDelay_MatchesRead() + { + DelayLine line = new(8); + line.Write(3f); + line.Write(7f); + line.Write(11f); + + Assert.AreEqual(line.Read(1), line.ReadInterpolated(1f), 1e-6f); + } + + [TestMethod] + public void ReadInterpolated_OutOfRange_Throws() + { + DelayLine line = new(8); + Assert.ThrowsExactly(() => line.ReadInterpolated(-0.5f)); + Assert.ThrowsExactly(() => line.ReadInterpolated(8.5f)); + Assert.ThrowsExactly(() => line.ReadInterpolated(float.NaN)); + } + + [TestMethod] + public void Write_FlushesDenormalsToZero() + { + DelayLine line = new(4); + float denormal = float.Epsilon; // smallest subnormal float, well below the threshold + line.Write(denormal); + Assert.AreEqual(0f, line.Read(0), "Denormal input must be flushed to zero."); + } + + [TestMethod] + public void Write_KeepsNormalSmallValues() + { + DelayLine line = new(4); + const float audible = 1e-6f; // ~ -120 dBFS, a normal float that must be preserved + line.Write(audible); + Assert.AreEqual(audible, line.Read(0)); + } + + [TestMethod] + public void Clear_ZeroesAllSamples() + { + DelayLine line = new(4); + line.Write(1f); + line.Write(2f); + line.Clear(); + + for (int delay = 0; delay <= line.Capacity; delay++) + { + Assert.AreEqual(0f, line.Read(delay)); + } + } + + [TestMethod] + public void WrapAround_LongStreamReadsCorrectDelay() + { + DelayLine line = new(3); + float previous = 0f; + for (int i = 1; i <= 1000; i++) + { + // Process(i, 1) returns the value written on the previous call (i - 1), 0 on the first. + float output = line.Process(i, 1); + Assert.AreEqual(i - 1, output); + previous = output; + } + + Assert.AreEqual(999f, previous); + } +} diff --git a/Containers.Test/SpscRingBufferTests.cs b/Containers.Test/SpscRingBufferTests.cs new file mode 100644 index 0000000..af23dae --- /dev/null +++ b/Containers.Test/SpscRingBufferTests.cs @@ -0,0 +1,158 @@ +// Copyright (c) ktsu.dev +// All rights reserved. +// Licensed under the MIT license. + +namespace ktsu.Containers.Tests; + +using System.Threading; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +[TestClass] +public class SpscRingBufferTests +{ + [TestMethod] + public void Constructor_NonPositiveCapacity_Throws() + { + Assert.ThrowsExactly(() => new SpscRingBuffer(0)); + Assert.ThrowsExactly(() => new SpscRingBuffer(-1)); + } + + [TestMethod] + public void Capacity_IsAtLeastRequested() + { + SpscRingBuffer buffer = new(5); + Assert.IsTrue(buffer.Capacity >= 5, "Usable capacity must be at least the requested amount."); + } + + [TestMethod] + public void NewBuffer_IsEmpty() + { + SpscRingBuffer buffer = new(4); + Assert.IsTrue(buffer.IsEmpty); + Assert.AreEqual(0, buffer.Count); + Assert.IsFalse(buffer.TryDequeue(out _)); + } + + [TestMethod] + public void EnqueueDequeue_PreservesFifoOrder() + { + SpscRingBuffer buffer = new(8); + for (int i = 0; i < 5; i++) + { + Assert.IsTrue(buffer.TryEnqueue(i)); + } + + for (int i = 0; i < 5; i++) + { + Assert.IsTrue(buffer.TryDequeue(out int value)); + Assert.AreEqual(i, value); + } + + Assert.IsTrue(buffer.IsEmpty); + } + + [TestMethod] + public void TryEnqueue_WhenFull_ReturnsFalse() + { + SpscRingBuffer buffer = new(4); + int enqueued = 0; + while (buffer.TryEnqueue(enqueued)) + { + enqueued++; + } + + Assert.IsTrue(enqueued >= 4, "Should accept at least the requested capacity before reporting full."); + Assert.IsFalse(buffer.TryEnqueue(999)); + } + + [TestMethod] + public void TryPeek_DoesNotRemoveElement() + { + SpscRingBuffer buffer = new(4); + buffer.TryEnqueue(42); + + Assert.IsTrue(buffer.TryPeek(out int peeked)); + Assert.AreEqual(42, peeked); + Assert.AreEqual(1, buffer.Count); + + Assert.IsTrue(buffer.TryDequeue(out int dequeued)); + Assert.AreEqual(42, dequeued); + } + + [TestMethod] + public void WrapAround_ReusesSlotsCorrectly() + { + SpscRingBuffer buffer = new(4); + + // Cycle through many more items than capacity to force repeated wraparound. + for (int i = 0; i < 1000; i++) + { + Assert.IsTrue(buffer.TryEnqueue(i)); + Assert.IsTrue(buffer.TryDequeue(out int value)); + Assert.AreEqual(i, value); + } + + Assert.IsTrue(buffer.IsEmpty); + } + + [TestMethod] + public async Task ConcurrentProducerConsumer_TransfersAllItemsInOrder() + { + const int itemCount = 1_000_000; + SpscRingBuffer buffer = new(1024); + + Task producer = Task.Run(() => + { + int produced = 0; + while (produced < itemCount) + { + if (buffer.TryEnqueue(produced)) + { + produced++; + } + else + { + Thread.SpinWait(1); + } + } + }); + + Task consumer = Task.Run(() => + { + int expected = 0; + while (expected < itemCount) + { + if (buffer.TryDequeue(out int value)) + { + if (value != expected) + { + return false; + } + + expected++; + } + else + { + Thread.SpinWait(1); + } + } + + return true; + }); + + await Task.WhenAll(producer, consumer).ConfigureAwait(false); + Assert.IsTrue(await consumer.ConfigureAwait(false), "All items must be received exactly once and in order."); + Assert.IsTrue(buffer.IsEmpty); + } + + [TestMethod] + public void Dequeue_ReferenceType_ReleasesReference() + { + SpscRingBuffer buffer = new(4); + buffer.TryEnqueue("hello"); + Assert.IsTrue(buffer.TryDequeue(out string? value)); + Assert.AreEqual("hello", value); + Assert.IsTrue(buffer.IsEmpty); + } +} diff --git a/Containers/DelayLine.cs b/Containers/DelayLine.cs new file mode 100644 index 0000000..86c9df5 --- /dev/null +++ b/Containers/DelayLine.cs @@ -0,0 +1,193 @@ +// Copyright (c) ktsu.dev +// All rights reserved. +// Licensed under the MIT license. + +namespace ktsu.Containers; + +using System; + +/// +/// A fixed-capacity, single-channel circular delay line for audio-rate samples. +/// +/// +/// A delay line stores the most recent samples and lets you read any of them back by their age +/// (the number of samples ago they were written). It is the fundamental building block of delays, +/// echoes, reverbs, comb/all-pass filters, choruses and flangers. +/// +/// +/// Real-time safety. The backing store is allocated once in the constructor. After that, +/// , , and +/// never allocate, never take a lock and never block, so they are +/// safe to call from an audio callback. The buffer is intended to be owned and used by a single +/// thread (typically the audio thread). +/// +/// +/// +/// Denormal awareness. Feedback delay structures naturally decay towards zero, and on many +/// CPUs the resulting subnormal (denormal) floating-point values are tens to hundreds of times more +/// expensive to process, causing audible CPU spikes. Samples are flushed to zero once their +/// magnitude falls below on write, keeping the buffer free of +/// denormals without any audible effect. +/// +/// +/// +/// The internal capacity is rounded up to a power of two so that index wrapping uses a bitwise AND +/// rather than a modulo. +/// +/// +public sealed class DelayLine +{ + /// + /// Magnitude at or below which a sample is treated as denormal and flushed to zero on write. + /// + /// + /// The smallest normal is approximately 1.18e-38; this threshold sits well + /// above the subnormal range so all denormals are eliminated, while only flushing values far + /// below audibility (roughly -600 dBFS). + /// + public const float DenormalThreshold = 1e-30f; + + /// + /// The backing store, whose length is always a power of two. + /// + private readonly float[] buffer; + + /// + /// Bitmask equal to buffer.Length - 1, used to wrap indices. + /// + private readonly int mask; + + /// + /// Index of the next slot to be written. + /// + private int writeIndex; + + /// + /// Gets the maximum delay, in samples, that can be read back from this delay line. + /// + public int Capacity { get; } + + /// + /// Initializes a new instance of the class that can delay a signal by up + /// to samples. + /// + /// The maximum delay, in samples, the line must support. + /// Thrown when is less than one. + public DelayLine(int maxDelaySamples) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maxDelaySamples); + + // One extra slot is needed so the oldest readable sample (delay == maxDelaySamples) is not + // the slot about to be overwritten. + int length = NextPower2(maxDelaySamples + 1); + buffer = new float[length]; + mask = length - 1; + Capacity = maxDelaySamples; + } + + /// + /// Writes a new sample into the delay line, advancing the write position by one. + /// + /// The sample to store. Denormal magnitudes are flushed to zero. + public void Write(float sample) + { + buffer[writeIndex] = FlushDenormal(sample); + writeIndex = (writeIndex + 1) & mask; + } + + /// + /// Reads the sample that is samples in the past. + /// + /// The delay in samples; 0 is the most recently written sample and is the oldest. + /// The delayed sample. + /// Thrown when is not in the range [0, Capacity]. + /// With this convention a delay of d returns x[n - d], so with a delay of d outputs the input from exactly d samples earlier. + public float Read(int delaySamples) + { + ArgumentOutOfRangeException.ThrowIfNegative(delaySamples); + ArgumentOutOfRangeException.ThrowIfGreaterThan(delaySamples, Capacity); + + // The most recently written sample sits one slot behind the write cursor (delay 0). + int index = (writeIndex - 1 - delaySamples) & mask; + return buffer[index]; + } + + /// + /// Reads a fractionally delayed sample using linear interpolation between adjacent samples. + /// + /// The fractional delay in samples; must lie in the range [0, Capacity]. + /// The linearly interpolated delayed sample. + /// Thrown when is not in the range [0, Capacity]. + /// + /// Linear interpolation is the standard choice for modulated delays (chorus/flanger). For static + /// integer delays prefer , which avoids the interpolation and the high-frequency + /// roll-off it introduces. + /// + public float ReadInterpolated(float delaySamples) + { + if (float.IsNaN(delaySamples) || delaySamples < 0f || delaySamples > Capacity) + { + throw new ArgumentOutOfRangeException(nameof(delaySamples), delaySamples, $"Delay must be in the range [0, {Capacity}]."); + } + + int delayInt = (int)delaySamples; + float frac = delaySamples - delayInt; + + int index0 = (writeIndex - 1 - delayInt) & mask; + // The interpolation partner is one sample older; clamp so we never read past Capacity. + int olderDelay = Math.Min(delayInt + 1, Capacity); + int index1 = (writeIndex - 1 - olderDelay) & mask; + + float s0 = buffer[index0]; + float s1 = buffer[index1]; + return s0 + ((s1 - s0) * frac); + } + + /// + /// Writes and returns the sample delayed by in a single call. + /// + /// The new sample to store. + /// The delay in samples; must lie in the range [0, Capacity]. + /// The delayed output sample, equal to the input from samples earlier. + /// Thrown when is not in the range [0, Capacity]. + /// This is the common per-sample idiom: push the input, then tap the output. + public float Process(float input, int delaySamples) + { + Write(input); + return Read(delaySamples); + } + + /// + /// Resets the delay line, zeroing all stored samples and the write position. + /// + public void Clear() + { + Array.Clear(buffer, 0, buffer.Length); + writeIndex = 0; + } + + /// + /// Flushes denormal magnitudes to zero. + /// + /// The value to sanitise. + /// Zero if the magnitude is at or below ; otherwise the value unchanged. + private static float FlushDenormal(float value) => + value is < DenormalThreshold and > -DenormalThreshold ? 0f : value; + + /// + /// Calculates the next power of two greater than or equal to the specified value. + /// + /// The value to round up. + /// The next power of two. + private static int NextPower2(int v) + { + v--; + v |= v >> 1; + v |= v >> 2; + v |= v >> 4; + v |= v >> 8; + v |= v >> 16; + v++; + return v; + } +} diff --git a/Containers/SpscRingBuffer.cs b/Containers/SpscRingBuffer.cs new file mode 100644 index 0000000..82534bd --- /dev/null +++ b/Containers/SpscRingBuffer.cs @@ -0,0 +1,231 @@ +// Copyright (c) ktsu.dev +// All rights reserved. +// Licensed under the MIT license. + +namespace ktsu.Containers; + +using System.Diagnostics.CodeAnalysis; +using System.Runtime.InteropServices; +using System.Threading; + +/// +/// A lock-free, wait-free, fixed-capacity single-producer/single-consumer (SPSC) ring buffer. +/// +/// +/// This buffer is designed for handing data across a thread boundary without locks or +/// allocations on the hot path, which makes it suitable for real-time scenarios such as +/// publishing meter/scope telemetry from an audio callback to a UI thread. +/// +/// +/// Threading contract. Exactly one thread may call the producer operations +/// () and exactly one (different or same) thread may call the +/// consumer operations (). Concurrent use by more than one +/// producer or more than one consumer is undefined behaviour. The producer and consumer +/// may run fully concurrently with respect to each other. +/// +/// +/// +/// Real-time safety. After construction, neither nor +/// allocates, takes a lock, or blocks. Both are O(1) and never +/// throw. The producer side is therefore safe to call from an audio/render thread; the +/// consumer side is intended for the UI/worker thread. +/// +/// +/// +/// Correctness relies on acquire/release ordering: the producer publishes the written +/// slot by releasing the tail index, and the consumer observes it by acquiring the tail +/// index (and vice versa for the head index). On the .NET memory model this is provided by +/// acquire/release reads and writes. +/// +/// +/// +/// One slot is reserved internally to distinguish the full and empty states without a +/// separate count, so the usable is one less than the (power-of-two) +/// internal array length. +/// +/// +/// The type of elements stored in the buffer. +[SuppressMessage( + "Naming", + "CA1711:Identifiers should not have incorrect suffix", + Justification = "Buffer accurately describes the ring buffer." +)] +public sealed class SpscRingBuffer +{ + /// + /// The backing store. Its length is always a power of two so that index wrapping can use + /// a bitwise AND with instead of a modulo. + /// + private readonly T[] buffer; + + /// + /// Bitmask equal to buffer.Length - 1, used to wrap indices. + /// + private readonly int mask; + + /// + /// Index of the next element to be read. Owned by the consumer; read by the producer. + /// + private SpscPaddedIndex head; + + /// + /// Index of the next slot to be written. Owned by the producer; read by the consumer. + /// + private SpscPaddedIndex tail; + + /// + /// Gets the maximum number of elements the buffer can hold at once. + /// + public int Capacity { get; } + + /// + /// Initializes a new instance of the class able to hold at + /// least elements. + /// + /// The minimum number of elements the buffer must be able to hold. + /// Thrown when is less than one. + public SpscRingBuffer(int capacity) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(capacity); + + // Reserve one slot to disambiguate full vs empty, then round up to a power of two. + int arrayLength = NextPower2(capacity + 1); + buffer = new T[arrayLength]; + mask = arrayLength - 1; + Capacity = arrayLength - 1; + } + + /// + /// Gets a value indicating whether the buffer currently appears empty. + /// + /// + /// The result is a snapshot and may be stale the moment it is read if the other thread is + /// active. It is exact only when observed from the consumer thread with no concurrent producer. + /// + public bool IsEmpty => Volatile.Read(ref head.Value) == Volatile.Read(ref tail.Value); + + /// + /// Gets the approximate number of elements currently stored in the buffer. + /// + /// + /// This is a best-effort snapshot intended for diagnostics/metering. Under concurrent access + /// the true count may have changed by the time the value is returned. + /// + public int Count + { + get + { + int currentTail = Volatile.Read(ref tail.Value); + int currentHead = Volatile.Read(ref head.Value); + return (currentTail - currentHead) & mask; + } + } + + /// + /// Attempts to add an element to the buffer. Must only be called from the single producer thread. + /// + /// The element to add. + /// if the element was added; if the buffer was full. + /// This operation is lock-free, allocation-free, and never blocks or throws. + public bool TryEnqueue(T item) + { + // The producer owns Tail, so a plain read is sufficient here. + int currentTail = tail.Value; + int nextTail = (currentTail + 1) & mask; + + // Full when advancing Tail would collide with the consumer's Head. + // Acquire the consumer's published Head. + if (nextTail == Volatile.Read(ref head.Value)) + { + return false; + } + + buffer[currentTail] = item; + + // Release: publish the written slot to the consumer. + Volatile.Write(ref tail.Value, nextTail); + return true; + } + + /// + /// Attempts to remove and return the oldest element in the buffer. Must only be called from the + /// single consumer thread. + /// + /// When this method returns , contains the dequeued element; otherwise the default value. + /// if an element was removed; if the buffer was empty. + /// This operation is lock-free, allocation-free, and never blocks or throws. + public bool TryDequeue([MaybeNullWhen(false)] out T item) + { + // The consumer owns Head, so a plain read is sufficient here. + int currentHead = head.Value; + + // Empty when Head has caught up to the producer's published Tail. Acquire Tail. + if (currentHead == Volatile.Read(ref tail.Value)) + { + item = default; + return false; + } + + item = buffer[currentHead]; + + // Avoid keeping a reference alive for reference types (no-op cost for value types + // is negligible and the JIT elides it for non-reference T). + buffer[currentHead] = default!; + + // Release: publish the freed slot to the producer. + Volatile.Write(ref head.Value, (currentHead + 1) & mask); + return true; + } + + /// + /// Attempts to return the oldest element without removing it. Must only be called from the + /// single consumer thread. + /// + /// When this method returns , contains the oldest element; otherwise the default value. + /// if an element was available; if the buffer was empty. + public bool TryPeek([MaybeNullWhen(false)] out T item) + { + int currentHead = head.Value; + if (currentHead == Volatile.Read(ref tail.Value)) + { + item = default; + return false; + } + + item = buffer[currentHead]; + return true; + } + + /// + /// Calculates the next power of two greater than or equal to the specified value. + /// + /// The value to round up. + /// The next power of two. + private static int NextPower2(int v) + { + v--; + v |= v >> 1; + v |= v >> 2; + v |= v >> 4; + v |= v >> 8; + v |= v >> 16; + v++; + return v; + } +} + +/// +/// A 32-bit index padded onto its own cache line to avoid false sharing between the producer and +/// consumer of a . +/// +/// +/// This is a non-generic struct so that it is permitted to carry an explicit layout (a struct nested +/// in a generic type cannot). The value sits at offset 64 with 64 bytes of trailing padding, isolating +/// it from any adjacent field on a typical 64-byte cache line. +/// +[StructLayout(LayoutKind.Explicit, Size = 128)] +internal struct SpscPaddedIndex +{ + [FieldOffset(64)] + public int Value; +}