forked from microsoft/agent-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathExecutor.cs
More file actions
298 lines (255 loc) · 13.5 KB
/
Executor.cs
File metadata and controls
298 lines (255 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
// Copyright (c) Microsoft. All rights reserved.
#pragma warning disable CS0618 // Type or member is obsolete - Internal use of obsolete types for backward compatibility
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using Microsoft.Agents.AI.Workflows.Execution;
using Microsoft.Agents.AI.Workflows.Observability;
using Microsoft.Agents.AI.Workflows.Reflection;
namespace Microsoft.Agents.AI.Workflows;
/// <summary>
/// A component that processes messages in a <see cref="Workflow"/>.
/// </summary>
/// <remarks>
/// Derived types register their message handlers by overriding <see cref="ConfigureRoutes"/>. Incoming
/// messages are dispatched to those handlers through <see cref="ExecuteAsync"/>.
/// </remarks>
[DebuggerDisplay("{GetType().Name}[{Id}]")]
public abstract class Executor : IIdentified
{
    /// <summary>
    /// A unique identifier for the executor.
    /// </summary>
    public string Id { get; }

    // The activity source is named after this type's namespace; s_namespace must be initialized first.
    private static readonly string s_namespace = typeof(Executor).Namespace!;
    private static readonly ActivitySource s_activitySource = new(s_namespace);

    // TODO: Add overloads for binding with a configuration/options object once the Configured<T> hierarchy goes away.

    /// <summary>
    /// Initialize the executor with a unique identifier.
    /// </summary>
    /// <param name="id">A unique identifier for the executor.</param>
    /// <param name="options">Configuration options for the executor. If <c>null</c>, default options will be used.</param>
    /// <param name="declareCrossRunShareable">Declare that this executor may be used simultaneously by multiple runs safely.</param>
    /// <exception cref="ArgumentNullException"><paramref name="id"/> is <c>null</c>.</exception>
    protected Executor(string id, ExecutorOptions? options = null, bool declareCrossRunShareable = false)
    {
        // Fail fast: the identifier is used by the debugger display, the telemetry tags and every executor
        // event raised from ExecuteAsync, so a null value would only surface later and far from its cause.
        this.Id = id ?? throw new ArgumentNullException(nameof(id));
        this.Options = options ?? ExecutorOptions.Default;

        // Disabled check, kept for reference:
        //if (declareCrossRunShareable && this is IResettableExecutor)
        //{
        //    // We need a way to be able to let the user override this at the workflow level too, because knowing the fine
        //    // details of when to use which of these paths seems like it could be tricky, and we should not force users
        //    // to do this; instead container agents should set this when they initiate the run (via WorkflowHostAgent).
        //    throw new ArgumentException("An executor that is declared as cross-run shareable cannot also be resettable.");
        //}

        this.IsCrossRunShareable = declareCrossRunShareable;
    }

    /// <summary>
    /// Gets the value of the <c>declareCrossRunShareable</c> flag supplied at construction.
    /// </summary>
    internal bool IsCrossRunShareable { get; }

    /// <summary>
    /// Gets the configuration options for the executor.
    /// </summary>
    protected ExecutorOptions Options { get; }

    /// <summary>
    /// Override this method to register handlers for the executor.
    /// </summary>
    /// <param name="routeBuilder">The builder on which to register the handlers.</param>
    /// <returns>The <see cref="RouteBuilder"/> from which the executor's <see cref="MessageRouter"/> is built.</returns>
    protected abstract RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder);

    /// <summary>
    /// Builds the executor's <see cref="Router"/> using the supplied external request context, replacing any
    /// router that was previously assigned or lazily created.
    /// </summary>
    /// <param name="externalRequestContext">The context handed to the <see cref="RouteBuilder"/>.</param>
    internal void Configure(IExternalRequestContext externalRequestContext)
    {
        // TODO: This is an unfortunate pattern (pending the ability to rework the Configure APIs a bit):
        //   new()
        //     >>> will throw InvalidOperationException if Configure() is not invoked when using PortHandlers
        //   .Configure()
        //     >>> only usable now
        // The fix would be to change the API surface of Executor to have Configure return the contract that the workflow
        // will use to invoke the executor (currently the MessageRouter). (Ideally we would rename Executor to Node or similar,
        // and the actual Executor class will represent that Contract object)
        // Not a terrible issue right now because only InProcessExecution exists right now, and the InProccessRunContext centralizes
        // executor instantiation in EnsureExecutorAsync.
        this.Router = this.CreateRouter(externalRequestContext);
    }

    // Runs the derived type's route registration against a fresh builder and builds the resulting router.
    private MessageRouter CreateRouter(IExternalRequestContext? externalRequestContext = null)
        => this.ConfigureRoutes(new RouteBuilder(externalRequestContext)).Build();

    /// <summary>
    /// Perform any asynchronous initialization required by the executor. This method is called once per executor instance.
    /// The default implementation does nothing.
    /// </summary>
    /// <param name="context">The workflow context in which the executor executes.</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A <see cref="ValueTask"/> representing the asynchronous operation.</returns>
    protected internal virtual ValueTask InitializeAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
        => default;

    /// <summary>
    /// Override this method to declare the types of messages this executor can send.
    /// </summary>
    /// <returns>The set of sendable message types. The default is a set containing only <see cref="object"/>.</returns>
    protected virtual ISet<Type> ConfigureSentTypes() => new HashSet<Type>([typeof(object)]);

    /// <summary>
    /// Override this method to declare the types of messages this executor can yield as workflow outputs.
    /// </summary>
    /// <returns>
    /// The set of yieldable types. By default this is the router's default output types when
    /// <c>AutoYieldOutputHandlerResultObject</c> is enabled in <see cref="Options"/>, and an empty set otherwise.
    /// </returns>
    protected virtual ISet<Type> ConfigureYieldTypes()
    {
        if (this.Options.AutoYieldOutputHandlerResultObject)
        {
            return this.Router.DefaultOutputTypes;
        }

        return new HashSet<Type>();
    }

    /// <summary>
    /// Gets the router that dispatches messages to the registered handlers. If <see cref="Configure"/> has not
    /// assigned one yet, a router is created on first access without an external request context.
    /// </summary>
    internal MessageRouter Router
    {
        get => field ??= this.CreateRouter();
        private set => field = value;
    }

    /// <summary>
    /// Process an incoming message using the registered handlers.
    /// </summary>
    /// <remarks>
    /// An <see cref="ExecutorInvokedEvent"/> is added to the context before routing, followed by either an
    /// <see cref="ExecutorCompletedEvent"/> or an <see cref="ExecutorFailedEvent"/> once routing returns.
    /// A non-null handler result is additionally sent as a message and/or yielded as a workflow output,
    /// depending on <see cref="Options"/>.
    /// </remarks>
    /// <param name="message">The message to be processed by the executor.</param>
    /// <param name="messageType">The "declared" type of the message (captured when it was being sent). This is
    /// used to enable routing messages as their base types, in absence of true polymorphic type routing.
    /// Within this method it is only recorded as a telemetry tag.</param>
    /// <param name="context">The workflow context in which the executor executes.</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A ValueTask representing the asynchronous operation, wrapping the output from the executor;
    /// <c>null</c> when the handler has no return value.</returns>
    /// <exception cref="NotSupportedException">No handler found for the message type.</exception>
    /// <exception cref="TargetInvocationException">An exception is generated while handling the message.</exception>
    public async ValueTask<object?> ExecuteAsync(object message, TypeId messageType, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        using var activity = s_activitySource.StartActivity(ActivityNames.ExecutorProcess, ActivityKind.Internal);
        activity?.SetTag(Tags.ExecutorId, this.Id)
            .SetTag(Tags.ExecutorType, this.GetType().FullName)
            .SetTag(Tags.MessageType, messageType.TypeName)
            .CreateSourceLinks(context.TraceContext);

        await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message), cancellationToken).ConfigureAwait(false);

        CallResult? result = await this.Router.RouteMessageAsync(message, context, requireRoute: true, cancellationToken)
                                              .ConfigureAwait(false);

        // Note: a null result also takes the "completed" branch; the corresponding NotSupportedException is
        // only thrown after the event has been added to the context.
        ExecutorEvent executionResult;
        if (result?.IsSuccess is not false)
        {
            executionResult = new ExecutorCompletedEvent(this.Id, result?.Result);
        }
        else
        {
            executionResult = new ExecutorFailedEvent(this.Id, result.Exception);
        }

        await context.AddEventAsync(executionResult, cancellationToken).ConfigureAwait(false);

        if (result is null)
        {
            throw new NotSupportedException(
                $"No handler found for message type {message.GetType().Name} in executor {this.GetType().Name}.");
        }

        if (!result.IsSuccess)
        {
            throw new TargetInvocationException($"Error invoking handler for {message.GetType()}", result.Exception);
        }

        if (result.IsVoid)
        {
            return null; // Void result.
        }

        // If we had a real return type, raise it as a SendMessage; TODO: Should we have a way to disable this behaviour?
        if (result.Result is not null && this.Options.AutoSendMessageHandlerResultObject)
        {
            await context.SendMessageAsync(result.Result, cancellationToken: cancellationToken).ConfigureAwait(false);
        }

        // Independently of sending, the same result may also be surfaced as a workflow output.
        if (result.Result is not null && this.Options.AutoYieldOutputHandlerResultObject)
        {
            await context.YieldOutputAsync(result.Result, cancellationToken).ConfigureAwait(false);
        }

        return result.Result;
    }

    /// <summary>
    /// Invoked before a checkpoint is saved, allowing custom pre-save logic in derived classes.
    /// The default implementation does nothing.
    /// </summary>
    /// <param name="context">The workflow context.</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A ValueTask representing the asynchronous operation.</returns>
    protected internal virtual ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default) => default;

    /// <summary>
    /// Invoked after a checkpoint is loaded, allowing custom post-load logic in derived classes.
    /// The default implementation does nothing.
    /// </summary>
    /// <param name="context">The workflow context.</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A ValueTask representing the asynchronous operation.</returns>
    protected internal virtual ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default) => default;

    /// <summary>
    /// A set of <see cref="Type"/>s, representing the messages this executor can handle.
    /// </summary>
    public ISet<Type> InputTypes => this.Router.IncomingTypes;

    /// <summary>
    /// A set of <see cref="Type"/>s, representing the messages this executor can produce as output.
    /// Initialized to a set containing only <see cref="object"/>.
    /// </summary>
    public ISet<Type> OutputTypes { get; } = new HashSet<Type>([typeof(object)]);

    /// <summary>
    /// Describes the protocol for communication with this <see cref="Executor"/>.
    /// </summary>
    /// <returns>A <see cref="ProtocolDescriptor"/> built from <see cref="InputTypes"/> and the router's
    /// <c>HasCatchAll</c> value.</returns>
    public ProtocolDescriptor DescribeProtocol()
    {
        // TODO: Once burden of annotating yield/output messages becomes easier for the non-Auto case,
        // we should (1) start checking for validity on output/send side, and (2) add the Yield/Send
        // types to the ProtocolDescriptor.
        return new(this.InputTypes, this.Router.HasCatchAll);
    }

    /// <summary>
    /// Checks if the executor can handle a specific message type.
    /// </summary>
    /// <param name="messageType">The message type to check.</param>
    /// <returns>The router's answer for <paramref name="messageType"/>.</returns>
    public bool CanHandle(Type messageType) => this.Router.CanHandle(messageType);

    /// <summary>
    /// Checks if the executor can handle a message type identified by a <see cref="TypeId"/>.
    /// </summary>
    /// <param name="messageType">The identifier of the message type to check.</param>
    /// <returns>The router's answer for <paramref name="messageType"/>.</returns>
    internal bool CanHandle(TypeId messageType) => this.Router.CanHandle(messageType);

    /// <summary>
    /// Checks if a message of the given type is compatible with any of the declared <see cref="OutputTypes"/>.
    /// </summary>
    /// <param name="messageType">The message type to check.</param>
    /// <returns><c>true</c> if at least one entry in <see cref="OutputTypes"/> is assignable from
    /// <paramref name="messageType"/>; otherwise <c>false</c>.</returns>
    internal bool CanOutput(Type messageType)
    {
        foreach (Type type in this.OutputTypes)
        {
            if (type.IsAssignableFrom(messageType))
            {
                return true;
            }
        }

        return false;
    }
}
/// <summary>
/// A base class for executors built around a single handler for messages of type <typeparamref name="TInput"/>,
/// where the handler does not return a value.
/// </summary>
/// <typeparam name="TInput">The type of input message.</typeparam>
/// <param name="id">A unique identifier for the executor.</param>
/// <param name="options">Configuration options for the executor. If <c>null</c>, default options will be used.</param>
/// <param name="declareCrossRunShareable">Declare that this executor may be used simultaneously by multiple runs safely.</param>
public abstract class Executor<TInput>(string id, ExecutorOptions? options = null, bool declareCrossRunShareable = false)
    : Executor(id, options, declareCrossRunShareable), IMessageHandler<TInput>
{
    /// <summary>
    /// Registers <see cref="HandleAsync"/> as the handler for <typeparamref name="TInput"/> messages.
    /// </summary>
    /// <param name="routeBuilder">The builder on which to register the handler.</param>
    /// <returns>The result of adding the handler to <paramref name="routeBuilder"/>.</returns>
    protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
    {
        return routeBuilder.AddHandler<TInput>(this.HandleAsync);
    }

    /// <inheritdoc/>
    public abstract ValueTask HandleAsync(TInput message, IWorkflowContext context, CancellationToken cancellationToken = default);
}
/// <summary>
/// A base class for executors built around a single handler that receives messages of type
/// <typeparamref name="TInput"/> and returns a <typeparamref name="TOutput"/>.
/// </summary>
/// <typeparam name="TInput">The type of input message.</typeparam>
/// <typeparam name="TOutput">The type of output message.</typeparam>
/// <param name="id">A unique identifier for the executor.</param>
/// <param name="options">Configuration options for the executor. If <c>null</c>, default options will be used.</param>
/// <param name="declareCrossRunShareable">Declare that this executor may be used simultaneously by multiple runs safely.</param>
public abstract class Executor<TInput, TOutput>(string id, ExecutorOptions? options = null, bool declareCrossRunShareable = false)
    // The base constructor already substitutes ExecutorOptions.Default for null, so options is passed through as-is.
    : Executor(id, options, declareCrossRunShareable),
      IMessageHandler<TInput, TOutput>
{
    /// <summary>
    /// Registers <see cref="HandleAsync"/> as the handler for <typeparamref name="TInput"/> messages.
    /// </summary>
    /// <param name="routeBuilder">The builder on which to register the handler.</param>
    /// <returns>The result of adding the handler to <paramref name="routeBuilder"/>.</returns>
    protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) =>
        routeBuilder.AddHandler<TInput, TOutput>(this.HandleAsync);

    /// <inheritdoc/>
    public abstract ValueTask<TOutput> HandleAsync(TInput message, IWorkflowContext context, CancellationToken cancellationToken = default);
}