Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 46 additions & 8 deletions Runtime/Scripts/AudioStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ public sealed class AudioStream : IDisposable
{
internal readonly FfiHandle Handle;
private readonly AudioSource _audioSource;
private readonly AudioProbe _probe;
private RingBuffer _buffer;
private short[] _tempBuffer;
private uint _numChannels;
private uint _sampleRate;
private AudioResampler _resampler = new AudioResampler();
private object _lock = new object();
private readonly object _lock = new object();
private bool _disposed = false;

/// <summary>
Expand Down Expand Up @@ -50,14 +51,20 @@ public AudioStream(RemoteAudioTrack audioTrack, AudioSource source)
FfiClient.Instance.AudioStreamEventReceived += OnAudioStreamEvent;

_audioSource = source;
var probe = _audioSource.gameObject.AddComponent<AudioProbe>();
probe.AudioRead += OnAudioRead;
_probe = _audioSource.gameObject.AddComponent<AudioProbe>();
_probe.AudioRead += OnAudioRead;
_audioSource.Play();
}

// Called on Unity audio thread
private void OnAudioRead(float[] data, int channels, int sampleRate)
{
if (_disposed)
{
Array.Clear(data, 0, data.Length);
return;
}

lock (_lock)
{
if (_buffer == null || channels != _numChannels || sampleRate != _sampleRate || data.Length != _tempBuffer.Length)
Expand Down Expand Up @@ -90,13 +97,16 @@ static float S16ToFloat(short v)
// Called on the MainThread (See FfiClient)
private void OnAudioStreamEvent(AudioStreamEvent e)
{
if (_disposed)
return;

if ((ulong)Handle.DangerousGetHandle() != e.StreamHandle)
return;

if (e.MessageCase != AudioStreamEvent.MessageOneofCase.FrameReceived)
return;

var frame = new AudioFrame(e.FrameReceived.Frame);
using var frame = new AudioFrame(e.FrameReceived.Frame);

lock (_lock)
{
Expand All @@ -105,7 +115,7 @@ private void OnAudioStreamEvent(AudioStreamEvent e)

unsafe
{
var uFrame = _resampler.RemixAndResample(frame, _numChannels, _sampleRate);
using var uFrame = _resampler.RemixAndResample(frame, _numChannels, _sampleRate);
if (uFrame != null)
{
var data = new Span<byte>(uFrame.Data.ToPointer(), uFrame.Length);
Expand All @@ -124,11 +134,39 @@ public void Dispose()

private void Dispose(bool disposing)
{
if (!_disposed && disposing)
if (_disposed)
{
_audioSource.Stop();
UnityEngine.Object.Destroy(_audioSource.GetComponent<AudioProbe>());
return;
}

// Remove long-lived delegate references first so this instance can become collectible
// as soon as user code drops it. This also prevents late native callbacks from
// touching partially disposed state.
FfiClient.Instance.AudioStreamEventReceived -= OnAudioStreamEvent;

lock (_lock)
{
// Native resources can be released on both the explicit-dispose and finalizer
// paths. Unity objects are only touched when Dispose() is called explicitly on the
// main thread.
if (disposing)
{
_audioSource.Stop();
if (_probe != null)
{
_probe.AudioRead -= OnAudioRead;
UnityEngine.Object.Destroy(_probe);
}
}

_buffer?.Dispose();
_buffer = null;
_tempBuffer = null;
_resampler?.Dispose();
_resampler = null;
Handle.Dispose();
}

_disposed = true;
}

Expand Down
42 changes: 36 additions & 6 deletions Runtime/Scripts/Internal/AudioResampler.cs
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
using System;
using LiveKit.Internal.FFIClients.Requests;
using LiveKit.Internal;
using LiveKit.Proto;
using UnityEngine;

namespace LiveKit
{
public class AudioResampler
public sealed class AudioResampler : IDisposable
{
internal readonly OwnedAudioResampler resampler;
private readonly FfiHandle _handle;
private bool _disposed;

public AudioResampler()
{
using var request = FFIBridge.Instance.NewRequest<NewAudioResamplerRequest>();
using var response = request.Send();
FfiResponse res = response;
resampler = res.NewAudioResampler.Resampler;
_handle = FfiHandle.FromOwnedHandle(res.NewAudioResampler.Resampler.Handle);
}

public AudioFrame RemixAndResample(AudioFrame frame, uint numChannels, uint sampleRate)
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(AudioResampler));
}

using var request = FFIBridge.Instance.NewRequest<RemixAndResampleRequest>();
using var audioFrameBufferInfo = request.TempResource<AudioFrameBufferInfo>();
var remix = request.request;
remix.ResamplerHandle = resampler.Handle.Id;
remix.ResamplerHandle = (ulong)_handle.DangerousGetHandle();
remix.Buffer = frame.Info;
remix.NumChannels = numChannels;
remix.SampleRate = sampleRate;
Expand All @@ -35,5 +42,28 @@ public AudioFrame RemixAndResample(AudioFrame frame, uint numChannels, uint samp
var newBuffer = res.RemixAndResample.Buffer;
return new AudioFrame(newBuffer);
}

public void Dispose()
{
if (_disposed)
{
return;
}

_handle.Dispose();
_disposed = true;
GC.SuppressFinalize(this);
}

~AudioResampler()
{
if (_disposed)
{
return;
}

_handle.Dispose();
_disposed = true;
}
}
}
}
136 changes: 118 additions & 18 deletions Runtime/Scripts/VideoStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ public class VideoStream
private bool _disposed = false;
private bool _dirty = false;
private YuvToRgbConverter _converter;
private readonly object _frameLock = new object();
// Separates frame production from consumption:
// - OnVideoStreamEvent produces the latest native frame coming from Rust
// - Update() consumes at most one frame per Unity tick for upload/render
//
// Keeping the "next frame" in a dedicated slot lets Unity coalesce bursts down to the
// newest pending frame without overwriting the frame currently being uploaded.
private VideoFrameBuffer _pendingBuffer;

/// Called when we receive a new frame from the VideoTrack
public event FrameReceiveDelegate FrameReceived;
Expand All @@ -31,6 +39,9 @@ public class VideoStream
/// The texture changes every time the video resolution changes.
/// Can be null if UpdateRoutine isn't started
public RenderTexture Texture { private set; get; }
// The frame currently owned by the Unity update/render path. Update() swaps the latest
// pending frame into this slot before converting/uploading it, so this represents the
// frame being actively consumed rather than the next frame arriving from Rust.
public VideoFrameBuffer VideoBuffer { private set; get; }

protected bool _playing = false;
Expand Down Expand Up @@ -68,19 +79,38 @@ public void Dispose()

private void Dispose(bool disposing)
{
if (!_disposed)
if (_disposed)
return;

// Remove the long-lived delegate reference first so this stream can be collected as
// soon as user code drops it, and so late native callbacks cannot mutate disposed
// frame/converter state.
FfiClient.Instance.VideoStreamEventReceived -= OnVideoStreamEvent;

lock (_frameLock)
{
if (disposing)
{
VideoBuffer?.Dispose();
}
// Unity objects must be destroyed on main thread
// Native frame buffers are not Unity objects, so always release them even on the
// finalizer path. This keeps the stream from leaking native frame handles if user
// code forgets to call Dispose().
VideoBuffer?.Dispose();
VideoBuffer = null;
_pendingBuffer?.Dispose();
_pendingBuffer = null;
}

if (disposing)
{
// Unity objects must be destroyed on main thread, so only touch the converter and
// RenderTexture when Dispose() is called explicitly by user code.
_converter?.Dispose();
_converter = null;
// Texture is owned and cleaned up by _converter. Set to null to avoid holding a reference to a disposed RenderTexture.
// Texture is owned and cleaned up by _converter. Set to null to avoid holding a
// reference to a disposed RenderTexture.
Texture = null;
_disposed = true;
}

Handle.Dispose();
_disposed = true;
}

public virtual void Start()
Expand All @@ -92,6 +122,18 @@ public virtual void Start()
public virtual void Stop()
{
_playing = false;

// When the stream has no active consumer, do not keep the latest native frame alive.
// Rust may still be producing frames depending on its queue configuration, but Unity
// drops them immediately until Start() is called again.
lock (_frameLock)
{
_pendingBuffer?.Dispose();
_pendingBuffer = null;
VideoBuffer?.Dispose();
VideoBuffer = null;
_dirty = false;
}
}

public IEnumerator Update()
Expand All @@ -103,10 +145,32 @@ public IEnumerator Update()
if (_disposed)
break;

if (VideoBuffer == null || !VideoBuffer.IsValid || !_dirty)
VideoFrameBuffer nextBuffer = null;
lock (_frameLock)
{
if (_dirty)
{
nextBuffer = _pendingBuffer;
_pendingBuffer = null;
_dirty = false;
}
}

if (nextBuffer == null)
continue;

_dirty = false;
// Latest-frame-wins: if Rust buffered multiple frames, the intake path keeps only
// the newest pending frame. Update() uploads at most one frame per Unity tick.
VideoBuffer?.Dispose();
VideoBuffer = nextBuffer;

if (!VideoBuffer.IsValid)
{
VideoBuffer.Dispose();
VideoBuffer = null;
continue;
}

var rWidth = VideoBuffer.Width;
var rHeight = VideoBuffer.Height;

Expand All @@ -127,24 +191,60 @@ public IEnumerator Update()
// Handle new video stream events
private void OnVideoStreamEvent(VideoStreamEvent e)
{
if (_disposed)
return;

if (e.StreamHandle != (ulong)Handle.DangerousGetHandle())
return;

if (e.MessageCase != VideoStreamEvent.MessageOneofCase.FrameReceived)
return;

var newBuffer = e.FrameReceived.Buffer;
var handle = new FfiHandle((IntPtr)newBuffer.Handle.Id);
var frameInfo = newBuffer.Info;

var frame = new VideoFrame(frameInfo, e.FrameReceived.TimestampUs, e.FrameReceived.Rotation);
// Create a managed wrapper around the native frame handle. This does not copy the
// underlying video payload; the wrapper simply owns the FFI handle until the frame is
// either uploaded or dropped.
var buffer = VideoFrameBuffer.Create(handle, frameInfo);
if (buffer == null)
{
handle.Dispose();
return;
}

// If there is no active consumer, keep draining frames from Rust but drop them
// immediately on the Unity side to avoid growing native memory or preserving stale
// frames. The producer queue can be size 1, bounded N, or unbounded; this behavior is
// correct for all three because Unity only wants the most recent renderable frame.
if (!_playing)
{
buffer.Dispose();
return;
}

lock (_frameLock)
{
if (_disposed || !_playing)
{
buffer.Dispose();
return;
}

VideoBuffer?.Dispose();
VideoBuffer = buffer;
_dirty = true;
// Latest-frame-wins coalescing. If Rust delivers several frames before Update()
// runs again, replace the pending frame with the newest one and drop the older
// native buffer immediately.
_pendingBuffer?.Dispose();
_pendingBuffer = buffer;
_dirty = true;
}

FrameReceived?.Invoke(frame);
// Avoid allocating VideoFrame objects when nobody is observing them.
if (FrameReceived != null)
{
var frame = new VideoFrame(frameInfo, e.FrameReceived.TimestampUs, e.FrameReceived.Rotation);
FrameReceived.Invoke(frame);
}
}
}
}
}
Loading
Loading