forked from microsoft/agent-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathStatefulExecutor.cs
More file actions
188 lines (164 loc) · 9.33 KB
/
StatefulExecutor.cs
File metadata and controls
188 lines (164 loc) · 9.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Copyright (c) Microsoft. All rights reserved.
#pragma warning disable CS0618 // Type or member is obsolete - Internal use of obsolete types for backward compatibility
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Reflection;
using Microsoft.Shared.Diagnostics;
namespace Microsoft.Agents.AI.Workflows;
/// <summary>
/// Provides a base class for executors that maintain and manage state across multiple message handling operations.
/// </summary>
/// <remarks>
/// The most recently read or written state is cached on the executor instance. The cache is bypassed whenever the
/// workflow context reports <c>ConcurrentRunsEnabled</c>; in that case every access goes through the context.
/// </remarks>
/// <typeparam name="TState">The type of state associated with this Executor.</typeparam>
public abstract class StatefulExecutor<TState> : Executor
{
    private readonly Func<TState> _initialStateFactory;

    // The cached value is only meaningful while _hasCachedState is true. A separate flag is required because
    // TState is unconstrained: for a value-type TState the field can never be null, so a null test cannot
    // distinguish "nothing cached yet" from "cached default(TState)".
    private TState? _stateCache;
    private bool _hasCachedState;

    /// <summary>
    /// Initializes the executor with a unique id and an initial value for the state.
    /// </summary>
    /// <param name="id">The unique identifier for this executor instance. Cannot be null or empty.</param>
    /// <param name="initialStateFactory">A factory to initialize the state value to be used by the executor.</param>
    /// <param name="options">Optional configuration settings for the executor. If null, default options are used.</param>
    /// <param name="declareCrossRunShareable">true to declare that the executor's state can be shared across multiple runs; otherwise, false.</param>
    protected StatefulExecutor(string id,
                               Func<TState> initialStateFactory,
                               StatefulExecutorOptions? options = null,
                               bool declareCrossRunShareable = false)
        : base(id, options ?? new StatefulExecutorOptions(), declareCrossRunShareable)
    {
        // The base class was handed a StatefulExecutorOptions instance above; re-expose it with its concrete type.
        this.Options = (StatefulExecutorOptions)base.Options;
        this._initialStateFactory = Throw.IfNull(initialStateFactory);
    }

    /// <summary>
    /// Gets the options of this executor, typed as <see cref="StatefulExecutorOptions"/>.
    /// </summary>
    protected new StatefulExecutorOptions Options { get; }

    // Fallback key used when the options do not specify one: "<runtime type name>.State".
    private string DefaultStateKey => $"{this.GetType().Name}.State";

    /// <summary>
    /// Gets the key used to identify the executor's state.
    /// </summary>
    protected string StateKey => this.Options.StateKey ?? this.DefaultStateKey;

    /// <summary>
    /// Reads the state associated with this executor. If it is not initialized, it will be set to the initial state.
    /// </summary>
    /// <param name="context">The workflow context in which the executor executes.</param>
    /// <param name="skipCache">Ignore the cached value, if any. State is not cached when running in Cross-Run Shareable
    /// mode.</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>The cached state when the cache may be used and holds a value; otherwise the state obtained from
    /// <paramref name="context"/>.</returns>
    protected async ValueTask<TState> ReadStateAsync(IWorkflowContext context, bool skipCache = false, CancellationToken cancellationToken = default)
    {
        bool cachingAllowed = !context.ConcurrentRunsEnabled;

        // Never serve a cached value when concurrent runs are enabled, even if ResetAsync populated the cache.
        if (!skipCache && cachingAllowed && this._hasCachedState)
        {
            return this._stateCache!;
        }

        TState? state = await context.ReadOrInitStateAsync(this.StateKey, this._initialStateFactory, this.Options.ScopeName, cancellationToken)
                                     .ConfigureAwait(false);

        if (cachingAllowed)
        {
            this.CacheState(state);
        }

        return state;
    }

    /// <summary>
    /// Queues up an update to the executor's state.
    /// </summary>
    /// <param name="state">The new value of state.</param>
    /// <param name="context">The workflow context in which the executor executes.</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>The <see cref="ValueTask"/> returned by <paramref name="context"/> for the queued update.</returns>
    protected ValueTask QueueStateUpdateAsync(TState state, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (!context.ConcurrentRunsEnabled)
        {
            this.CacheState(state);
        }

        return context.QueueStateUpdateAsync(this.StateKey, state, this.Options.ScopeName, cancellationToken);
    }

    /// <summary>
    /// Invokes an asynchronous operation that reads, updates, and persists workflow state associated with the specified
    /// key.
    /// </summary>
    /// <param name="invocation">A delegate that receives the current state, workflow context, and cancellation token,
    /// and returns the updated state asynchronously.</param>
    /// <param name="context">The workflow context in which the executor executes.</param>
    /// <param name="skipCache">Ignore the cached value, if any. State is not cached when running in Cross-Run Shareable
    /// mode.</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A ValueTask that represents the asynchronous operation.</returns>
    protected async ValueTask InvokeWithStateAsync(
        Func<TState, IWorkflowContext, CancellationToken, ValueTask<TState?>> invocation,
        IWorkflowContext context,
        bool skipCache = false,
        CancellationToken cancellationToken = default)
    {
        if (skipCache || context.ConcurrentRunsEnabled)
        {
            // Cache is not usable: delegate the whole read/invoke/update cycle to the context.
            await context.InvokeWithStateAsync(invocation,
                                               this.StateKey,
                                               this._initialStateFactory,
                                               this.Options.ScopeName,
                                               cancellationToken)
                         .ConfigureAwait(false);
            return;
        }

        // Goes through ReadStateAsync so that a cache miss reads (or initializes) the state held by the context
        // instead of silently restarting from the initial state.
        TState currentState = await this.ReadStateAsync(context, skipCache: false, cancellationToken: cancellationToken)
                                        .ConfigureAwait(false);

        // A null result from the delegate is replaced by a freshly created initial state.
        TState newState = await invocation(currentState, context, cancellationToken).ConfigureAwait(false)
                          ?? this._initialStateFactory();

        await context.QueueStateUpdateAsync(this.StateKey,
                                            newState,
                                            this.Options.ScopeName,
                                            cancellationToken).ConfigureAwait(false);

        // Only remember the new state once the update has been queued successfully.
        this.CacheState(newState);
    }

    /// <inheritdoc cref="IResettableExecutor.ResetAsync"/>
    protected ValueTask ResetAsync()
    {
        // Resetting replaces whatever was cached with a freshly created initial state.
        this.CacheState(this._initialStateFactory());

        return default;
    }

    /// <summary>
    /// Stores <paramref name="state"/> in the cache. A null value is recorded as "nothing cached", so the next
    /// cached read falls through to the workflow context.
    /// </summary>
    /// <param name="state">The state value to remember.</param>
    private void CacheState(TState? state)
    {
        this._stateCache = state;
        this._hasCachedState = state is not null;
    }
}
/// <summary>
/// A stateful executor built around one message handler: <c>HandleAsync</c> is registered as the handler for
/// <typeparamref name="TInput"/> messages, and state is maintained across invocations by the base class.
/// </summary>
/// <typeparam name="TState">The type of state associated with this Executor.</typeparam>
/// <typeparam name="TInput">The type of input message.</typeparam>
/// <param name="id">A unique identifier for the executor.</param>
/// <param name="initialStateFactory">A factory to initialize the state value to be used by the executor.</param>
/// <param name="options">Configuration options for the executor. If <c>null</c>, default options will be used.</param>
/// <param name="declareCrossRunShareable">Declare that this executor may be used simultaneously by multiple runs safely.</param>
public abstract class StatefulExecutor<TState, TInput>(string id, Func<TState> initialStateFactory, StatefulExecutorOptions? options = null, bool declareCrossRunShareable = false)
    : StatefulExecutor<TState>(id, initialStateFactory, options, declareCrossRunShareable), IMessageHandler<TInput>
{
    /// <inheritdoc/>
    public abstract ValueTask HandleAsync(TInput message, IWorkflowContext context, CancellationToken cancellationToken = default);

    /// <inheritdoc/>
    protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
    {
        // The only route this executor exposes: TInput messages go to HandleAsync.
        return routeBuilder.AddHandler<TInput>(this.HandleAsync);
    }
}
/// <summary>
/// A stateful executor built around one message handler that produces a result: <c>HandleAsync</c> is registered as
/// the handler taking <typeparamref name="TInput"/> and returning <typeparamref name="TOutput"/>, and state is
/// maintained across invocations by the base class.
/// </summary>
/// <typeparam name="TState">The type of state associated with this Executor.</typeparam>
/// <typeparam name="TInput">The type of input message.</typeparam>
/// <typeparam name="TOutput">The type of output message.</typeparam>
/// <param name="id">A unique identifier for the executor.</param>
/// <param name="initialStateFactory">A factory to initialize the state value to be used by the executor.</param>
/// <param name="options">Configuration options for the executor. If <c>null</c>, default options will be used.</param>
/// <param name="declareCrossRunShareable">Declare that this executor may be used simultaneously by multiple runs safely.</param>
public abstract class StatefulExecutor<TState, TInput, TOutput>(string id, Func<TState> initialStateFactory, StatefulExecutorOptions? options = null, bool declareCrossRunShareable = false)
    : StatefulExecutor<TState>(id, initialStateFactory, options, declareCrossRunShareable), IMessageHandler<TInput, TOutput>
{
    /// <inheritdoc/>
    public abstract ValueTask<TOutput> HandleAsync(TInput message, IWorkflowContext context, CancellationToken cancellationToken = default);

    /// <inheritdoc/>
    protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
    {
        // The only route this executor exposes: TInput messages go to HandleAsync, which yields a TOutput.
        return routeBuilder.AddHandler<TInput, TOutput>(this.HandleAsync);
    }
}