diff --git a/Runtime/Scripts/AudioStream.cs b/Runtime/Scripts/AudioStream.cs index b0d185b6..ccc028ae 100644 --- a/Runtime/Scripts/AudioStream.cs +++ b/Runtime/Scripts/AudioStream.cs @@ -15,12 +15,13 @@ public sealed class AudioStream : IDisposable { internal readonly FfiHandle Handle; private readonly AudioSource _audioSource; + private readonly AudioProbe _probe; private RingBuffer _buffer; private short[] _tempBuffer; private uint _numChannels; private uint _sampleRate; private AudioResampler _resampler = new AudioResampler(); - private object _lock = new object(); + private readonly object _lock = new object(); private bool _disposed = false; /// @@ -50,14 +51,20 @@ public AudioStream(RemoteAudioTrack audioTrack, AudioSource source) FfiClient.Instance.AudioStreamEventReceived += OnAudioStreamEvent; _audioSource = source; - var probe = _audioSource.gameObject.AddComponent(); - probe.AudioRead += OnAudioRead; + _probe = _audioSource.gameObject.AddComponent(); + _probe.AudioRead += OnAudioRead; _audioSource.Play(); } // Called on Unity audio thread private void OnAudioRead(float[] data, int channels, int sampleRate) { + if (_disposed) + { + Array.Clear(data, 0, data.Length); + return; + } + lock (_lock) { if (_buffer == null || channels != _numChannels || sampleRate != _sampleRate || data.Length != _tempBuffer.Length) @@ -90,13 +97,16 @@ static float S16ToFloat(short v) // Called on the MainThread (See FfiClient) private void OnAudioStreamEvent(AudioStreamEvent e) { + if (_disposed) + return; + if ((ulong)Handle.DangerousGetHandle() != e.StreamHandle) return; if (e.MessageCase != AudioStreamEvent.MessageOneofCase.FrameReceived) return; - var frame = new AudioFrame(e.FrameReceived.Frame); + using var frame = new AudioFrame(e.FrameReceived.Frame); lock (_lock) { @@ -105,7 +115,7 @@ private void OnAudioStreamEvent(AudioStreamEvent e) unsafe { - var uFrame = _resampler.RemixAndResample(frame, _numChannels, _sampleRate); + using var uFrame = _resampler.RemixAndResample(frame, _numChannels, _sampleRate); if (uFrame != null) { var data = new Span(uFrame.Data.ToPointer(), uFrame.Length); @@ -124,11 +134,39 @@ public void Dispose() private void Dispose(bool disposing) { - if (!_disposed && disposing) + if (_disposed) { - _audioSource.Stop(); - UnityEngine.Object.Destroy(_audioSource.GetComponent()); + return; } + + // Remove long-lived delegate references first so this instance can become collectible + // as soon as user code drops it. This also prevents late native callbacks from + // touching partially disposed state. + FfiClient.Instance.AudioStreamEventReceived -= OnAudioStreamEvent; + + lock (_lock) + { + // Native resources can be released on both the explicit-dispose and finalizer + // paths. Unity objects are only touched when Dispose() is called explicitly on the + // main thread. + if (disposing) + { + _audioSource.Stop(); + if (_probe != null) + { + _probe.AudioRead -= OnAudioRead; + UnityEngine.Object.Destroy(_probe); + } + } + + _buffer?.Dispose(); + _buffer = null; + _tempBuffer = null; + _resampler?.Dispose(); + _resampler = null; + Handle.Dispose(); + } + _disposed = true; } diff --git a/Runtime/Scripts/Internal/AudioResampler.cs b/Runtime/Scripts/Internal/AudioResampler.cs index 47fa5c44..162d9228 100644 --- a/Runtime/Scripts/Internal/AudioResampler.cs +++ b/Runtime/Scripts/Internal/AudioResampler.cs @@ -1,27 +1,34 @@ +using System; using LiveKit.Internal.FFIClients.Requests; +using LiveKit.Internal; using LiveKit.Proto; -using UnityEngine; namespace LiveKit { - public class AudioResampler + public sealed class AudioResampler : IDisposable { - internal readonly OwnedAudioResampler resampler; + private readonly FfiHandle _handle; + private bool _disposed; public AudioResampler() { using var request = FFIBridge.Instance.NewRequest(); using var response = request.Send(); FfiResponse res = response; - resampler = res.NewAudioResampler.Resampler; + _handle = FfiHandle.FromOwnedHandle(res.NewAudioResampler.Resampler.Handle); } public AudioFrame RemixAndResample(AudioFrame frame, uint numChannels, uint sampleRate) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(AudioResampler)); + } + using var request = FFIBridge.Instance.NewRequest(); using var audioFrameBufferInfo = request.TempResource(); var remix = request.request; - remix.ResamplerHandle = resampler.Handle.Id; + remix.ResamplerHandle = (ulong)_handle.DangerousGetHandle(); remix.Buffer = frame.Info; remix.NumChannels = numChannels; remix.SampleRate = sampleRate; @@ -35,5 +42,28 @@ public AudioFrame RemixAndResample(AudioFrame frame, uint numChannels, uint samp var newBuffer = res.RemixAndResample.Buffer; return new AudioFrame(newBuffer); } + + public void Dispose() + { + if (_disposed) + { + return; + } + + _handle.Dispose(); + _disposed = true; + GC.SuppressFinalize(this); + } + + ~AudioResampler() + { + if (_disposed) + { + return; + } + + _handle.Dispose(); + _disposed = true; + } } -} \ No newline at end of file +} diff --git a/Runtime/Scripts/VideoStream.cs b/Runtime/Scripts/VideoStream.cs index eadd9ce8..8d2fa5e9 100644 --- a/Runtime/Scripts/VideoStream.cs +++ b/Runtime/Scripts/VideoStream.cs @@ -18,6 +18,14 @@ public class VideoStream private bool _disposed = false; private bool _dirty = false; private YuvToRgbConverter _converter; + private readonly object _frameLock = new object(); + // Separates frame production from consumption: + // - OnVideoStreamEvent produces the latest native frame coming from Rust + // - Update() consumes at most one frame per Unity tick for upload/render + // + // Keeping the "next frame" in a dedicated slot lets Unity coalesce bursts down to the + // newest pending frame without overwriting the frame currently being uploaded. + private VideoFrameBuffer _pendingBuffer; /// Called when we receive a new frame from the VideoTrack public event FrameReceiveDelegate FrameReceived; @@ -31,6 +39,9 @@ public class VideoStream /// The texture changes every time the video resolution changes. /// Can be null if UpdateRoutine isn't started public RenderTexture Texture { private set; get; } + // The frame currently owned by the Unity update/render path. Update() swaps the latest + // pending frame into this slot before converting/uploading it, so this represents the + // frame being actively consumed rather than the next frame arriving from Rust. public VideoFrameBuffer VideoBuffer { private set; get; } protected bool _playing = false; @@ -68,19 +79,38 @@ public void Dispose() private void Dispose(bool disposing) { - if (!_disposed) + if (_disposed) + return; + + // Remove the long-lived delegate reference first so this stream can be collected as + // soon as user code drops it, and so late native callbacks cannot mutate disposed + // frame/converter state. + FfiClient.Instance.VideoStreamEventReceived -= OnVideoStreamEvent; + + lock (_frameLock) { - if (disposing) - { - VideoBuffer?.Dispose(); - } - // Unity objects must be destroyed on main thread + // Native frame buffers are not Unity objects, so always release them even on the + // finalizer path. This keeps the stream from leaking native frame handles if user + // code forgets to call Dispose(). + VideoBuffer?.Dispose(); + VideoBuffer = null; + _pendingBuffer?.Dispose(); + _pendingBuffer = null; + } + + if (disposing) + { + // Unity objects must be destroyed on main thread, so only touch the converter and + // RenderTexture when Dispose() is called explicitly by user code. _converter?.Dispose(); _converter = null; - // Texture is owned and cleaned up by _converter. Set to null to avoid holding a reference to a disposed RenderTexture. + // Texture is owned and cleaned up by _converter. Set to null to avoid holding a + // reference to a disposed RenderTexture. Texture = null; - _disposed = true; } + + Handle.Dispose(); + _disposed = true; } public virtual void Start() @@ -92,6 +122,18 @@ public virtual void Start() public virtual void Stop() { _playing = false; + + // When the stream has no active consumer, do not keep the latest native frame alive. + // Rust may still be producing frames depending on its queue configuration, but Unity + // drops them immediately until Start() is called again. + lock (_frameLock) + { + _pendingBuffer?.Dispose(); + _pendingBuffer = null; + VideoBuffer?.Dispose(); + VideoBuffer = null; + _dirty = false; + } } public IEnumerator Update() @@ -103,10 +145,32 @@ public IEnumerator Update() if (_disposed) break; - if (VideoBuffer == null || !VideoBuffer.IsValid || !_dirty) + VideoFrameBuffer nextBuffer = null; + lock (_frameLock) + { + if (_dirty) + { + nextBuffer = _pendingBuffer; + _pendingBuffer = null; + _dirty = false; + } + } + + if (nextBuffer == null) continue; - _dirty = false; + // Latest-frame-wins: if Rust buffered multiple frames, the intake path keeps only + // the newest pending frame. Update() uploads at most one frame per Unity tick. + VideoBuffer?.Dispose(); + VideoBuffer = nextBuffer; + + if (!VideoBuffer.IsValid) + { + VideoBuffer.Dispose(); + VideoBuffer = null; + continue; + } + var rWidth = VideoBuffer.Width; var rHeight = VideoBuffer.Height; @@ -127,24 +191,60 @@ public IEnumerator Update() // Handle new video stream events private void OnVideoStreamEvent(VideoStreamEvent e) { + if (_disposed) + return; + if (e.StreamHandle != (ulong)Handle.DangerousGetHandle()) return; if (e.MessageCase != VideoStreamEvent.MessageOneofCase.FrameReceived) return; - + var newBuffer = e.FrameReceived.Buffer; var handle = new FfiHandle((IntPtr)newBuffer.Handle.Id); var frameInfo = newBuffer.Info; - - var frame = new VideoFrame(frameInfo, e.FrameReceived.TimestampUs, e.FrameReceived.Rotation); + // Create a managed wrapper around the native frame handle. This does not copy the + // underlying video payload; the wrapper simply owns the FFI handle until the frame is + // either uploaded or dropped. var buffer = VideoFrameBuffer.Create(handle, frameInfo); + if (buffer == null) + { + handle.Dispose(); + return; + } + + // If there is no active consumer, keep draining frames from Rust but drop them + // immediately on the Unity side to avoid growing native memory or preserving stale + // frames. The producer queue can be size 1, bounded N, or unbounded; this behavior is + // correct for all three because Unity only wants the most recent renderable frame. + if (!_playing) + { + buffer.Dispose(); + return; + } + + lock (_frameLock) + { + if (_disposed || !_playing) + { + buffer.Dispose(); + return; + } - VideoBuffer?.Dispose(); - VideoBuffer = buffer; - _dirty = true; + // Latest-frame-wins coalescing. If Rust delivers several frames before Update() + // runs again, replace the pending frame with the newest one and drop the older + // native buffer immediately. + _pendingBuffer?.Dispose(); + _pendingBuffer = buffer; + _dirty = true; + } - FrameReceived?.Invoke(frame); + // Avoid allocating VideoFrame objects when nobody is observing them. + if (FrameReceived != null) + { + var frame = new VideoFrame(frameInfo, e.FrameReceived.TimestampUs, e.FrameReceived.Rotation); + FrameReceived.Invoke(frame); + } } } -} \ No newline at end of file +} diff --git a/Tests/EditMode/MediaStreamLifetimeTests.cs b/Tests/EditMode/MediaStreamLifetimeTests.cs new file mode 100644 index 00000000..425212d9 --- /dev/null +++ b/Tests/EditMode/MediaStreamLifetimeTests.cs @@ -0,0 +1,175 @@ +using System; +using System.IO; +using NUnit.Framework; +using UnityEngine; + +namespace LiveKit.EditModeTests +{ + public class MediaStreamLifetimeTests + { + private static readonly string[] AudioStreamPaths = + { + "Runtime/Scripts/AudioStream.cs", + "Assets/client-sdk-unity/Runtime/Scripts/AudioStream.cs", + }; + + private static readonly string[] VideoStreamPaths = + { + "Runtime/Scripts/VideoStream.cs", + "Assets/client-sdk-unity/Runtime/Scripts/VideoStream.cs", + }; + + private static readonly string[] AudioResamplerPaths = + { + "Runtime/Scripts/Internal/AudioResampler.cs", + "Assets/client-sdk-unity/Runtime/Scripts/Internal/AudioResampler.cs", + }; + + private static string ReadSource(params string[] candidates) + { + foreach (var root in SearchRoots()) + { + foreach (var candidate in candidates) + { + var combined = Path.GetFullPath(Path.Combine(root, candidate)); + if (File.Exists(combined)) + { + return File.ReadAllText(combined); + } + } + } + + foreach (var root in SearchRoots()) + { + foreach (var candidate in candidates) + { + var fileName = Path.GetFileName(candidate); + if (string.IsNullOrEmpty(fileName) || !Directory.Exists(root)) + { + continue; + } + + try + { + foreach (var match in Directory.EnumerateFiles(root, fileName, SearchOption.AllDirectories)) + { + // Keep the search specific to the intended suffix so a duplicate file + // name elsewhere in the repo does not satisfy the test accidentally. + var normalizedMatch = match.Replace('\\', '/'); + var normalizedCandidate = candidate.Replace('\\', '/'); + if (normalizedMatch.EndsWith(normalizedCandidate)) + { + return File.ReadAllText(match); + } + } + } + catch (IOException) + { + // Best-effort lookup for CI layout differences. + } + catch (UnauthorizedAccessException) + { + // Best-effort lookup for CI layout differences. + } + } + } + + foreach (var candidate in candidates) + { + if (File.Exists(candidate)) + { + return File.ReadAllText(candidate); + } + } + + Assert.Fail($"Could not find source file. Tried: {string.Join(", ", candidates)}"); + return string.Empty; + } + + private static string[] SearchRoots() + { + var roots = new System.Collections.Generic.List(); + + void AddWithParents(string path) + { + if (string.IsNullOrEmpty(path)) + { + return; + } + + var fullPath = Path.GetFullPath(path); + var dir = new DirectoryInfo(fullPath); + while (dir != null) + { + if (!roots.Contains(dir.FullName)) + { + roots.Add(dir.FullName); + } + dir = dir.Parent; + } + } + + AddWithParents(Directory.GetCurrentDirectory()); + AddWithParents(Application.dataPath); + + return roots.ToArray(); + } + + [Test] + public void AudioStream_Dispose_UnsubscribesAndReleasesOwnedResources() + { + var source = ReadSource(AudioStreamPaths); + + StringAssert.Contains("FfiClient.Instance.AudioStreamEventReceived -= OnAudioStreamEvent;", source); + StringAssert.Contains("_probe.AudioRead -= OnAudioRead;", source); + StringAssert.Contains("_buffer?.Dispose();", source); + StringAssert.Contains("_resampler?.Dispose();", source); + StringAssert.Contains("Handle.Dispose();", source); + } + + [Test] + public void AudioStream_AudioFrames_AreDisposedAfterProcessing() + { + var source = ReadSource(AudioStreamPaths); + + // Both the inbound native frame and the remixed output frame should be scoped so their + // handles are released after each callback rather than accumulating over time. + StringAssert.Contains("using var frame = new AudioFrame(e.FrameReceived.Frame);", source); + StringAssert.Contains("using var uFrame = _resampler.RemixAndResample(frame, _numChannels, _sampleRate);", source); + } + + [Test] + public void AudioResampler_IsDisposable_AndReleasesNativeHandle() + { + var source = ReadSource(AudioResamplerPaths); + + StringAssert.Contains("public sealed class AudioResampler : IDisposable", source); + StringAssert.Contains("_handle.Dispose();", source); + } + + [Test] + public void VideoStream_Dispose_UnsubscribesAndReleasesOwnedResources() + { + var source = ReadSource(VideoStreamPaths); + + StringAssert.Contains("FfiClient.Instance.VideoStreamEventReceived -= OnVideoStreamEvent;", source); + StringAssert.Contains("VideoBuffer?.Dispose();", source); + StringAssert.Contains("_pendingBuffer?.Dispose();", source); + StringAssert.Contains("Handle.Dispose();", source); + } + + [Test] + public void VideoStream_UsesLatestFrameWinsCoalescing() + { + var source = ReadSource(VideoStreamPaths); + + // The intake path should maintain a dedicated pending slot and replace/drop superseded + // frames so Unity uploads at most the latest frame per tick. + StringAssert.Contains("private VideoFrameBuffer _pendingBuffer;", source); + StringAssert.Contains("_pendingBuffer?.Dispose();", source); + StringAssert.Contains("_pendingBuffer = buffer;", source); + StringAssert.Contains("nextBuffer = _pendingBuffer;", source); + StringAssert.Contains("VideoBuffer = nextBuffer;", source); + } + } +}