Skip to content

Commit 54635a8

Browse files
Use client-generated request_async_id for Unity FFI async requests and replace delegate fan-out with pending callback dispatch (#195)
* A few improvements * fix the build
1 parent bfc4946 commit 54635a8

9 files changed

Lines changed: 499 additions & 191 deletions

File tree

Runtime/Scripts/DataStream.cs

Lines changed: 78 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,9 @@ public ReadAllInstruction ReadAll()
169169
var readAllReq = request.request;
170170
readAllReq.ReaderHandle = (ulong)_handle.DangerousGetHandle();
171171

172+
var instruction = new ReadAllInstruction(request.RequestAsyncId);
172173
using var response = request.Send();
173-
FfiResponse res = response;
174-
return new ReadAllInstruction(res.TextReadAll.AsyncId);
174+
return instruction;
175175
}
176176

177177
/// <summary>
@@ -188,7 +188,10 @@ public sealed class ReadAllInstruction : YieldInstruction
188188
internal ReadAllInstruction(ulong asyncId)
189189
{
190190
_asyncId = asyncId;
191-
FfiClient.Instance.TextStreamReaderReadAllReceived += OnReadAll;
191+
// ReadAll is modeled as a single async completion rather than a stream of
192+
// incremental events, so it uses the request_async_id pending map. Rust returns
193+
// the same value through callback.AsyncId.
194+
FfiClient.Instance.RegisterPendingCallback(asyncId, static e => e.TextStreamReaderReadAll, OnReadAll, OnCanceled);
192195
}
193196

194197
internal void OnReadAll(TextStreamReaderReadAllCallback e)
@@ -207,7 +210,13 @@ internal void OnReadAll(TextStreamReaderReadAllCallback e)
207210
break;
208211
}
209212
IsDone = true;
210-
FfiClient.Instance.TextStreamReaderReadAllReceived -= OnReadAll;
213+
}
214+
215+
void OnCanceled()
216+
{
217+
Error = new StreamError("Canceled");
218+
IsError = true;
219+
IsDone = true;
211220
}
212221

213222
public string Text
@@ -331,9 +340,9 @@ public ReadAllInstruction ReadAll()
331340
var readAllReq = request.request;
332341
readAllReq.ReaderHandle = (ulong)_handle.DangerousGetHandle();
333342

343+
var instruction = new ReadAllInstruction(request.RequestAsyncId);
334344
using var response = request.Send();
335-
FfiResponse res = response;
336-
return new ReadAllInstruction(res.ByteReadAll.AsyncId);
345+
return instruction;
337346
}
338347

339348
/// <summary>
@@ -373,9 +382,9 @@ public WriteToFileInstruction WriteToFile(string directory = null, string nameOv
373382
writeToFileReq.Directory = directory;
374383
writeToFileReq.NameOverride = nameOverride;
375384

385+
var instruction = new WriteToFileInstruction(request.RequestAsyncId);
376386
using var response = request.Send();
377-
FfiResponse res = response;
378-
return new WriteToFileInstruction(res.ByteWriteToFile.AsyncId);
387+
return instruction;
379388
}
380389

381390
/// <summary>
@@ -392,7 +401,7 @@ public sealed class ReadAllInstruction : YieldInstruction
392401
internal ReadAllInstruction(ulong asyncId)
393402
{
394403
_asyncId = asyncId;
395-
FfiClient.Instance.ByteStreamReaderReadAllReceived += OnReadAll;
404+
FfiClient.Instance.RegisterPendingCallback(asyncId, static e => e.ByteStreamReaderReadAll, OnReadAll, OnCanceled);
396405
}
397406

398407
internal void OnReadAll(ByteStreamReaderReadAllCallback e)
@@ -411,7 +420,13 @@ internal void OnReadAll(ByteStreamReaderReadAllCallback e)
411420
break;
412421
}
413422
IsDone = true;
414-
FfiClient.Instance.ByteStreamReaderReadAllReceived -= OnReadAll;
423+
}
424+
425+
void OnCanceled()
426+
{
427+
Error = new StreamError("Canceled");
428+
IsError = true;
429+
IsDone = true;
415430
}
416431

417432
public byte[] Bytes
@@ -500,7 +515,7 @@ public sealed class WriteToFileInstruction : YieldInstruction
500515
internal WriteToFileInstruction(ulong asyncId)
501516
{
502517
_asyncId = asyncId;
503-
FfiClient.Instance.ByteStreamReaderWriteToFileReceived += OnWriteToFile;
518+
FfiClient.Instance.RegisterPendingCallback(asyncId, static e => e.ByteStreamReaderWriteToFile, OnWriteToFile, OnCanceled);
504519
}
505520

506521
internal void OnWriteToFile(ByteStreamReaderWriteToFileCallback e)
@@ -519,7 +534,13 @@ internal void OnWriteToFile(ByteStreamReaderWriteToFileCallback e)
519534
break;
520535
}
521536
IsDone = true;
522-
FfiClient.Instance.ByteStreamReaderWriteToFileReceived -= OnWriteToFile;
537+
}
538+
539+
void OnCanceled()
540+
{
541+
Error = new StreamError("Canceled");
542+
IsError = true;
543+
IsDone = true;
523544
}
524545

525546
/// <summary>
@@ -641,9 +662,9 @@ public WriteInstruction Write(string text)
641662
writeReq.WriterHandle = (ulong)_handle.DangerousGetHandle();
642663
writeReq.Text = text;
643664

665+
var instruction = new WriteInstruction(request.RequestAsyncId);
644666
using var response = request.Send();
645-
FfiResponse res = response;
646-
return new WriteInstruction(res.TextStreamWrite.AsyncId);
667+
return instruction;
647668
}
648669

649670
/// <summary>
@@ -661,9 +682,9 @@ public CloseInstruction Close(string reason = null)
661682
closeReq.WriterHandle = (ulong)_handle.DangerousGetHandle();
662683
closeReq.Reason = reason;
663684

685+
var instruction = new CloseInstruction(request.RequestAsyncId);
664686
using var response = request.Send();
665-
FfiResponse res = response;
666-
return new CloseInstruction(res.TextStreamWrite.AsyncId);
687+
return instruction;
667688
}
668689

669690
/// <summary>
@@ -679,10 +700,10 @@ public sealed class WriteInstruction : YieldInstruction
679700
internal WriteInstruction(ulong asyncId)
680701
{
681702
_asyncId = asyncId;
682-
FfiClient.Instance.ByteStreamWriterWriteReceived += OnWrite;
703+
FfiClient.Instance.RegisterPendingCallback(asyncId, static e => e.TextStreamWriterWrite, OnWrite, OnCanceled);
683704
}
684705

685-
internal void OnWrite(ByteStreamWriterWriteCallback e)
706+
internal void OnWrite(TextStreamWriterWriteCallback e)
686707
{
687708
if (e.AsyncId != _asyncId)
688709
return;
@@ -693,7 +714,13 @@ internal void OnWrite(ByteStreamWriterWriteCallback e)
693714
IsError = true;
694715
}
695716
IsDone = true;
696-
FfiClient.Instance.ByteStreamWriterWriteReceived -= OnWrite;
717+
}
718+
719+
void OnCanceled()
720+
{
721+
Error = new StreamError("Canceled");
722+
IsError = true;
723+
IsDone = true;
697724
}
698725

699726
public StreamError Error { get; private set; }
@@ -712,10 +739,10 @@ public sealed class CloseInstruction : YieldInstruction
712739
internal CloseInstruction(ulong asyncId)
713740
{
714741
_asyncId = asyncId;
715-
FfiClient.Instance.ByteStreamWriterCloseReceived += OnClose;
742+
FfiClient.Instance.RegisterPendingCallback(asyncId, static e => e.TextStreamWriterClose, OnClose, OnCanceled);
716743
}
717744

718-
internal void OnClose(ByteStreamWriterCloseCallback e)
745+
internal void OnClose(TextStreamWriterCloseCallback e)
719746
{
720747
if (e.AsyncId != _asyncId)
721748
return;
@@ -726,7 +753,13 @@ internal void OnClose(ByteStreamWriterCloseCallback e)
726753
IsError = true;
727754
}
728755
IsDone = true;
729-
FfiClient.Instance.ByteStreamWriterCloseReceived -= OnClose;
756+
}
757+
758+
void OnCanceled()
759+
{
760+
Error = new StreamError("Canceled");
761+
IsError = true;
762+
IsDone = true;
730763
}
731764

732765
public StreamError Error { get; private set; }
@@ -763,9 +796,9 @@ public WriteInstruction Write(byte[] bytes)
763796
writeReq.WriterHandle = (ulong)_handle.DangerousGetHandle();
764797
writeReq.Bytes = Google.Protobuf.ByteString.CopyFrom(bytes);
765798

799+
var instruction = new WriteInstruction(request.RequestAsyncId);
766800
using var response = request.Send();
767-
FfiResponse res = response;
768-
return new WriteInstruction(res.ByteStreamWrite.AsyncId);
801+
return instruction;
769802
}
770803

771804
/// <summary>
@@ -782,9 +815,9 @@ public CloseInstruction Close(string reason = null)
782815
closeReq.WriterHandle = (ulong)_handle.DangerousGetHandle();
783816
closeReq.Reason = reason;
784817

818+
var instruction = new CloseInstruction(request.RequestAsyncId);
785819
using var response = request.Send();
786-
FfiResponse res = response;
787-
return new CloseInstruction(res.ByteStreamWrite.AsyncId);
820+
return instruction;
788821
}
789822

790823
/// <summary>
@@ -800,10 +833,10 @@ public sealed class WriteInstruction : YieldInstruction
800833
internal WriteInstruction(ulong asyncId)
801834
{
802835
_asyncId = asyncId;
803-
FfiClient.Instance.TextStreamWriterWriteReceived += OnWrite;
836+
FfiClient.Instance.RegisterPendingCallback(asyncId, static e => e.ByteStreamWriterWrite, OnWrite, OnCanceled);
804837
}
805838

806-
internal void OnWrite(TextStreamWriterWriteCallback e)
839+
internal void OnWrite(ByteStreamWriterWriteCallback e)
807840
{
808841
if (e.AsyncId != _asyncId)
809842
return;
@@ -814,7 +847,13 @@ internal void OnWrite(TextStreamWriterWriteCallback e)
814847
IsError = true;
815848
}
816849
IsDone = true;
817-
FfiClient.Instance.TextStreamWriterWriteReceived -= OnWrite;
850+
}
851+
852+
void OnCanceled()
853+
{
854+
Error = new StreamError("Canceled");
855+
IsError = true;
856+
IsDone = true;
818857
}
819858

820859
public StreamError Error { get; private set; }
@@ -833,10 +872,10 @@ public sealed class CloseInstruction : YieldInstruction
833872
internal CloseInstruction(ulong asyncId)
834873
{
835874
_asyncId = asyncId;
836-
FfiClient.Instance.TextStreamWriterCloseReceived += OnClose;
875+
FfiClient.Instance.RegisterPendingCallback(asyncId, static e => e.ByteStreamWriterClose, OnClose, OnCanceled);
837876
}
838877

839-
internal void OnClose(TextStreamWriterCloseCallback e)
878+
internal void OnClose(ByteStreamWriterCloseCallback e)
840879
{
841880
if (e.AsyncId != _asyncId)
842881
return;
@@ -847,7 +886,13 @@ internal void OnClose(TextStreamWriterCloseCallback e)
847886
IsError = true;
848887
}
849888
IsDone = true;
850-
FfiClient.Instance.TextStreamWriterCloseReceived -= OnClose;
889+
}
890+
891+
void OnCanceled()
892+
{
893+
Error = new StreamError("Canceled");
894+
IsError = true;
895+
IsDone = true;
851896
}
852897

853898
public StreamError Error { get; private set; }
@@ -898,4 +943,4 @@ internal bool Dispatch(ByteStreamReader reader, string participantIdentity)
898943
return false;
899944
}
900945
}
901-
}
946+
}

0 commit comments

Comments
 (0)