-
Notifications
You must be signed in to change notification settings - Fork 307
Expand file tree
/
Copy pathFoundryLocalPipelineTransport.cs
More file actions
218 lines (194 loc) · 8.46 KB
/
FoundryLocalPipelineTransport.cs
File metadata and controls
218 lines (194 loc) · 8.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
using Microsoft.AI.Foundry.Local;
using Microsoft.AI.Foundry.Local.Detail;
using System.ClientModel;
using System.ClientModel.Primitives;
using System.Reflection;
/// <summary>
/// A custom <see cref="PipelineTransport"/> that conducts pipeline traffic via Foundry Local CoreInterop instead of HTTP
/// network traffic.
/// </summary>
/// <remarks>
/// <para>
/// As written, this relies on Reflection for access to non-public CoreInterop fields and methods. In an integrated form,
/// CoreInterop could instead be used directly via the <see cref="Model"/>'s CoreInterop instance (or another CoreInterop
/// instance).
/// </para>
/// <para>
/// Every message is forwarded to the <c>chat_completions</c> CoreInterop command, with the request body passed through
/// verbatim as the <c>OpenAICreateRequest</c> parameter. The request method, URI, and headers are not used. Responses
/// always report status 200, with the interop result's <c>Data</c> string as the body.
/// </para>
/// </remarks>
internal class FoundryLocalPipelineTransport : PipelineTransport
{
    private readonly FoundryLocalInteropWrapper _interopWrapper;

    /// <summary>
    /// Creates a transport bound to the CoreInterop instance of <paramref name="foundryLocalModel"/>'s selected variant.
    /// </summary>
    /// <param name="foundryLocalModel">The model whose <c>SelectedVariant</c> supplies the CoreInterop instance.</param>
    /// <exception cref="ArgumentNullException"><paramref name="foundryLocalModel"/> is <see langword="null"/>.</exception>
    /// <exception cref="InvalidOperationException">
    /// The non-public <c>_coreInterop</c> field, or its <c>ExecuteCommand</c>/<c>ExecuteCommandAsync</c> methods, could
    /// not be located via reflection.
    /// </exception>
    public FoundryLocalPipelineTransport(Model foundryLocalModel)
    {
        ArgumentNullException.ThrowIfNull(foundryLocalModel);
        _interopWrapper = new(foundryLocalModel);
    }

    /// <inheritdoc/>
    protected override PipelineMessage CreateMessageCore()
    {
        return new FoundryLocalPipelineMessage(new FoundryLocalPipelineRequest());
    }

    /// <summary>Sends <paramref name="message"/> through CoreInterop synchronously and attaches the response.</summary>
    /// <exception cref="NotImplementedException">
    /// <paramref name="message"/> was not created by this transport, or its request has no content.
    /// </exception>
    protected override void ProcessCore(PipelineMessage message)
    {
        FoundryLocalPipelineMessage foundryLocalMessage = AsFoundryLocalMessage(message);
        BinaryData interopResultBytes = _interopWrapper.ExecuteInteropChat(foundryLocalMessage);
        foundryLocalMessage.SetResponse(new FoundryLocalPipelineResponse(interopResultBytes));
    }

    /// <summary>Sends <paramref name="message"/> through CoreInterop asynchronously and attaches the response.</summary>
    /// <exception cref="NotImplementedException">
    /// <paramref name="message"/> was not created by this transport, or its request has no content.
    /// </exception>
    protected override async ValueTask ProcessCoreAsync(PipelineMessage message)
    {
        FoundryLocalPipelineMessage foundryLocalMessage = AsFoundryLocalMessage(message);
        BinaryData interopResultBytes = await _interopWrapper.ExecuteInteropChatAsync(foundryLocalMessage).ConfigureAwait(false);
        foundryLocalMessage.SetResponse(new FoundryLocalPipelineResponse(interopResultBytes));
    }

    /// <summary>Narrows a pipeline message to the concrete type this transport creates, or throws.</summary>
    private static FoundryLocalPipelineMessage AsFoundryLocalMessage(PipelineMessage message)
        => message as FoundryLocalPipelineMessage
            ?? throw new NotImplementedException(
                $"{nameof(FoundryLocalPipelineTransport)} can only process messages created by its own {nameof(PipelineTransport.CreateMessage)} method.");

    /// <summary>
    /// Request headers that are accepted and discarded. Writes are no-ops and every read reports an empty collection,
    /// because the CoreInterop call made by this transport does not consume headers.
    /// </summary>
    private sealed class FoundryLocalPipelineRequestHeaders : PipelineRequestHeaders
    {
        public override void Add(string name, string value)
        {
        }

        public override void Set(string name, string value)
        {
        }

        public override bool Remove(string name) => false;

        public override bool TryGetValue(string name, out string? value)
        {
            value = null;
            return false;
        }

        public override bool TryGetValues(string name, out IEnumerable<string>? values)
        {
            values = null;
            return false;
        }

        public override IEnumerator<KeyValuePair<string, string>> GetEnumerator()
        {
            yield break;
        }
    }

    /// <summary>A minimal request whose only meaningful part is <see cref="PipelineRequest.Content"/>.</summary>
    private sealed class FoundryLocalPipelineRequest : PipelineRequest
    {
        protected override string MethodCore { get; set; } = "POST";

        protected override Uri? UriCore { get; set; }

        protected override PipelineRequestHeaders HeadersCore { get; } = new FoundryLocalPipelineRequestHeaders();

        protected override BinaryContent? ContentCore { get; set; }

        /// <summary>Disposes the request content (if any); safe to call more than once.</summary>
        public override void Dispose()
        {
            BinaryContent? content = ContentCore;
            ContentCore = null;
            content?.Dispose();
        }
    }

    /// <summary>
    /// Response headers for an interop result: always empty, since CoreInterop returns only a payload string.
    /// </summary>
    private sealed class FoundryLocalPipelineResponseHeaders : PipelineResponseHeaders
    {
        public override bool TryGetValue(string name, out string? value)
        {
            value = null;
            return false;
        }

        public override bool TryGetValues(string name, out IEnumerable<string>? values)
        {
            values = null;
            return false;
        }

        public override IEnumerator<KeyValuePair<string, string>> GetEnumerator()
        {
            yield break;
        }
    }

    /// <summary>A successful (status 200) response whose body is the CoreInterop result payload.</summary>
    private sealed class FoundryLocalPipelineResponse : PipelineResponse
    {
        private BinaryData? _content;

        /// <param name="interopResultBytes">The payload to expose through <see cref="ContentStream"/>.</param>
        public FoundryLocalPipelineResponse(BinaryData interopResultBytes)
        {
            ContentStream = interopResultBytes.ToStream();
        }

        public override int Status => 200;

        public override string ReasonPhrase => "OK";

        public override Stream? ContentStream { get; set; }

        /// <summary>
        /// The buffered body. The first access reads <see cref="ContentStream"/> and caches the result, then rewinds the
        /// stream when it is seekable so it can still be consumed directly.
        /// </summary>
        public override BinaryData Content
        {
            get
            {
                if (_content is null && ContentStream is not null)
                {
                    _content = BinaryData.FromStream(ContentStream);

                    // ContentStream is publicly settable, so a replacement stream may not support seeking.
                    if (ContentStream.CanSeek)
                    {
                        ContentStream.Position = 0;
                    }
                }

                return _content ??= BinaryData.Empty;
            }
        }

        protected override PipelineResponseHeaders HeadersCore { get; } = new FoundryLocalPipelineResponseHeaders();

        public override BinaryData BufferContent(CancellationToken cancellationToken = default) => Content;

        public override ValueTask<BinaryData> BufferContentAsync(CancellationToken cancellationToken = default) => ValueTask.FromResult(Content);

        public override void Dispose()
        {
        }
    }

    /// <summary>A pipeline message whose response can be assigned by this transport.</summary>
    private sealed class FoundryLocalPipelineMessage : PipelineMessage
    {
        public FoundryLocalPipelineMessage(PipelineRequest request)
            : base(request)
        {
        }

        public void SetResponse(FoundryLocalPipelineResponse response)
        {
            Response = response;
        }
    }

    /// <summary>
    /// Reflection-based access to a model variant's non-public CoreInterop instance and its
    /// <c>ExecuteCommand</c>/<c>ExecuteCommandAsync</c> methods.
    /// </summary>
    private sealed class FoundryLocalInteropWrapper
    {
        private const string ChatCompletionsCommand = "chat_completions";
        private const string OpenAICreateRequestParameter = "OpenAICreateRequest";

        // The CoreInterop instance read from ModelVariant._coreInterop (the value, not the FieldInfo).
        private readonly object _coreInterop;
        private readonly MethodInfo _executeCommandInfo;
        private readonly MethodInfo _executeCommandAsyncInfo;

        /// <exception cref="InvalidOperationException">A required non-public member could not be located.</exception>
        public FoundryLocalInteropWrapper(Model foundryLocalModel)
        {
            _coreInterop = typeof(ModelVariant)
                .GetField("_coreInterop", BindingFlags.Instance | BindingFlags.NonPublic)
                ?.GetValue(foundryLocalModel.SelectedVariant)
                ?? throw new InvalidOperationException(
                    $"Could not read the non-public '_coreInterop' field of {nameof(ModelVariant)} via reflection.");

            Type coreInteropType = _coreInterop.GetType();

            _executeCommandInfo = coreInteropType
                .GetMethod("ExecuteCommand", BindingFlags.Instance | BindingFlags.Public, [typeof(string), typeof(CoreInteropRequest)])
                ?? throw new InvalidOperationException(
                    $"Could not find ExecuteCommand(string, {nameof(CoreInteropRequest)}) on '{coreInteropType.FullName}'.");

            _executeCommandAsyncInfo = coreInteropType
                .GetMethod("ExecuteCommandAsync", BindingFlags.Instance | BindingFlags.Public, [typeof(string), typeof(CoreInteropRequest), typeof(CancellationToken?)])
                ?? throw new InvalidOperationException(
                    $"Could not find ExecuteCommandAsync(string, {nameof(CoreInteropRequest)}, CancellationToken?) on '{coreInteropType.FullName}'.");
        }

        /// <summary>Runs the chat-completions command synchronously for <paramref name="message"/>'s request body.</summary>
        /// <returns>The result's <c>Data</c> string as bytes, or <see cref="BinaryData.Empty"/> when it is absent.</returns>
        /// <exception cref="NotImplementedException">The request has no content.</exception>
        /// <exception cref="InvalidOperationException">The interop call returned <see langword="null"/>.</exception>
        public BinaryData ExecuteInteropChat(PipelineMessage message)
        {
            CoreInteropRequest interopRequest = CreateInteropRequest(GetRequestContent(message), message.CancellationToken);

            // DoNotWrapExceptions surfaces CoreInterop's own exception rather than a TargetInvocationException.
            object reflectedInteropResult = _executeCommandInfo.Invoke(
                _coreInterop,
                BindingFlags.DoNotWrapExceptions,
                binder: null,
                [ChatCompletionsCommand, interopRequest],
                culture: null)
                ?? throw new InvalidOperationException("CoreInterop ExecuteCommand returned null.");

            return GetInteropResultBytes(reflectedInteropResult);
        }

        /// <summary>Runs the chat-completions command asynchronously for <paramref name="message"/>'s request body.</summary>
        /// <returns>The result's <c>Data</c> string as bytes, or <see cref="BinaryData.Empty"/> when it is absent.</returns>
        /// <exception cref="NotImplementedException">The request has no content.</exception>
        /// <exception cref="InvalidOperationException">The interop call did not return a task.</exception>
        public async Task<BinaryData> ExecuteInteropChatAsync(PipelineMessage message)
        {
            CoreInteropRequest interopRequest = await CreateInteropRequestAsync(GetRequestContent(message), message.CancellationToken)
                .ConfigureAwait(false);

            object? invocationResult = _executeCommandAsyncInfo.Invoke(
                _coreInterop,
                BindingFlags.DoNotWrapExceptions,
                binder: null,
                [ChatCompletionsCommand, interopRequest, message.CancellationToken],
                culture: null);

            // NOTE(review): assumes ExecuteCommandAsync returns a Task<T>; T is not accessible here, so the result
            // is read back through the Task<T>.Result property after awaiting.
            if (invocationResult is not Task pendingInteropCall)
            {
                throw new InvalidOperationException("CoreInterop ExecuteCommandAsync did not return a Task.");
            }

            await pendingInteropCall.ConfigureAwait(false);
            object? reflectedInteropResult = pendingInteropCall.GetType().GetProperty("Result")?.GetValue(pendingInteropCall);
            return GetInteropResultBytes(reflectedInteropResult);
        }

        /// <summary>Returns the request body, or throws when there is none to forward.</summary>
        private static BinaryContent GetRequestContent(PipelineMessage message)
            => message.Request.Content
                ?? throw new NotImplementedException("Only requests with content can be forwarded to Foundry Local CoreInterop.");

        /// <summary>Serializes <paramref name="content"/> synchronously and wraps it in a CoreInterop request.</summary>
        private static CoreInteropRequest CreateInteropRequest(BinaryContent content, CancellationToken cancellationToken)
        {
            using MemoryStream contentStream = new();
            content.WriteTo(contentStream, cancellationToken);
            return BuildInteropRequest(contentStream);
        }

        /// <summary>Serializes <paramref name="content"/> asynchronously and wraps it in a CoreInterop request.</summary>
        private static async Task<CoreInteropRequest> CreateInteropRequestAsync(BinaryContent content, CancellationToken cancellationToken)
        {
            using MemoryStream contentStream = new();
            await content.WriteToAsync(contentStream, cancellationToken).ConfigureAwait(false);
            return BuildInteropRequest(contentStream);
        }

        /// <summary>Decodes the buffered body as text and places it in the <c>OpenAICreateRequest</c> parameter.</summary>
        private static CoreInteropRequest BuildInteropRequest(MemoryStream contentStream)
        {
            contentStream.Position = 0;
            using StreamReader contentReader = new(contentStream);
            return new CoreInteropRequest()
            {
                Params = new()
                {
                    [OpenAICreateRequestParameter] = contentReader.ReadToEnd()
                }
            };
        }

        /// <summary>Extracts the non-public <c>Data</c> string field from a reflected interop result.</summary>
        private static BinaryData GetInteropResultBytes(object? reflectedInteropResult)
        {
            // NOTE(review): when 'Data' is missing or not a string (presumably an interop failure — confirm), the
            // caller still receives a 200 response with an empty body; consider surfacing the failure instead.
            object? reflectedData = reflectedInteropResult
                ?.GetType()
                .GetField("Data", BindingFlags.Instance | BindingFlags.NonPublic)
                ?.GetValue(reflectedInteropResult);

            return reflectedData is string rawReflectedData
                ? BinaryData.FromString(rawReflectedData)
                : BinaryData.Empty;
        }
    }
}