-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Expand file tree
/
Copy pathLockstepRunEventStream.cs
More file actions
210 lines (174 loc) · 8.16 KB
/
LockstepRunEventStream.cs
File metadata and controls
210 lines (174 loc) · 8.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Observability;
namespace Microsoft.Agents.AI.Workflows.Execution;
/// <summary>
/// An <see cref="IRunEventStream"/> that executes supersteps on the consumer's own enumeration:
/// each iteration of <see cref="TakeEventStreamAsync"/> runs a superstep on the underlying
/// <see cref="ISuperStepRunner"/> and then yields the events that were raised while it ran.
/// </summary>
internal sealed class LockstepRunEventStream : IRunEventStream
{
    // Cancelled by StopAsync()/DisposeAsync(); linked into every enumeration's token.
    private readonly CancellationTokenSource _stopCancellation = new();
    private readonly InputWaiter _inputWaiter = new();

    // 0 while live, 1 once DisposeAsync() has run. Only touched through Interlocked/Volatile.
    private int _isDisposed;
    private readonly ISuperStepRunner _stepRunner;

    // Created by Start() and disposed by DisposeAsync(); used as the parent of each run activity.
    private Activity? _sessionActivity;

    /// <summary>
    /// Returns the current <see cref="Execution.RunStatus"/> as an already-completed task.
    /// </summary>
    /// <param name="cancellationToken">Not used; the status is returned synchronously.</param>
    public ValueTask<RunStatus> GetStatusAsync(CancellationToken cancellationToken = default) => new(this.RunStatus);

    /// <summary>
    /// Creates a stream that drives the given step runner.
    /// </summary>
    /// <param name="stepRunner">The runner whose supersteps are executed and whose outgoing events are forwarded.</param>
    public LockstepRunEventStream(ISuperStepRunner stepRunner)
    {
        this._stepRunner = stepRunner;
    }

    private RunStatus RunStatus { get; set; } = RunStatus.NotStarted;

    /// <summary>
    /// Starts the session activity and tags it with the workflow and session identifiers.
    /// </summary>
    public void Start()
    {
        // Save and restore Activity.Current so the long-lived session activity
        // doesn't leak into caller code via AsyncLocal.
        Activity? previousActivity = Activity.Current;

        this._sessionActivity = this._stepRunner.TelemetryContext.StartWorkflowSessionActivity();
        this._sessionActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId)
                              .SetTag(Tags.SessionId, this._stepRunner.SessionId);
        this._sessionActivity?.AddEvent(new ActivityEvent(EventNames.SessionStarted));

        Activity.Current = previousActivity;
    }

    /// <summary>
    /// Runs supersteps while the runner has unprocessed messages and yields the events raised by each
    /// superstep after it completes.
    /// </summary>
    /// <param name="blockOnPendingRequest">
    /// When <see langword="true"/> and the status is <see cref="RunStatus.PendingRequests"/>, the loop waits
    /// (up to one second per pass) for <see cref="SignalInput"/> before checking for messages again.
    /// When <see langword="false"/>, enumeration ends as soon as the status is <see cref="RunStatus.PendingRequests"/>.
    /// </param>
    /// <param name="cancellationToken">Ends the enumeration when cancelled; combined with the token cancelled by <see cref="StopAsync"/>.</param>
    /// <returns>
    /// The raised events, excluding <see cref="RequestHaltEvent"/>, which is not yielded and instead ends the
    /// enumeration once the remaining events of that superstep have been yielded.
    /// </returns>
    /// <exception cref="ObjectDisposedException">The stream was already disposed when enumeration began.</exception>
    /// <remarks>
    /// Exceptions thrown by a superstep, other than <see cref="OperationCanceledException"/>, propagate to the
    /// enumerating caller; when a run activity exists they are recorded on it first.
    /// </remarks>
    public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPendingRequest, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
#if NET
        ObjectDisposedException.ThrowIf(Volatile.Read(ref this._isDisposed) == 1, this);
#else
        if (Volatile.Read(ref this._isDisposed) == 1)
        {
            throw new ObjectDisposedException(nameof(LockstepRunEventStream));
        }
#endif

        using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(this._stopCancellation.Token, cancellationToken);

        ConcurrentQueue<WorkflowEvent> eventSink = [];

        // Not 'using' — must dispose explicitly in finally for deterministic export.
        Activity? runActivity = null;

        this._stepRunner.OutgoingEvents.EventRaised += OnWorkflowEventAsync;

        try
        {
            // The run activity is created inside the try so that the handler subscribed above is
            // always removed by the finally block, even if starting telemetry throws.

            // Re-establish session as parent so the run activity nests correctly.
            Activity.Current = this._sessionActivity;

            runActivity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
            runActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId);

            this.RunStatus = RunStatus.Running;
            runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));

            // Emit WorkflowStartedEvent to the event stream for consumers
            eventSink.Enqueue(new WorkflowStartedEvent());

            // NOTE(review): if this method is entered while HasUnprocessedMessages is false, the status
            // stays Running, neither the wait below nor ShouldBreak() applies, and this do/while appears
            // to spin without awaiting until cancellation. Confirm whether callers can reach that state.
            do
            {
                while (this._stepRunner.HasUnprocessedMessages &&
                       !linkedSource.Token.IsCancellationRequested)
                {
                    // Because we may be yielding out of this function, we need to ensure that the Activity.Current
                    // is set to our activity for the duration of this loop iteration.
                    Activity.Current = runActivity;

                    // Drain SuperSteps while there are steps to run
                    try
                    {
                        await this._stepRunner.RunSuperStepAsync(linkedSource.Token).ConfigureAwait(false);
                    }
                    catch (OperationCanceledException)
                    {
                        // Deliberately swallowed; the cancellation check right below ends the enumeration.
                    }
                    catch (Exception ex) when (runActivity is not null)
                    {
                        runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
                            { Tags.ErrorType, ex.GetType().FullName },
                            { Tags.ErrorMessage, ex.Message },
                        }));
                        runActivity.CaptureException(ex);
                        throw;
                    }

                    if (linkedSource.Token.IsCancellationRequested)
                    {
                        yield break; // Exit if cancellation is requested
                    }

                    bool hadRequestHaltEvent = false;

                    // Swap in a fresh queue so events raised from here on land in the next batch.
                    foreach (WorkflowEvent raisedEvent in Interlocked.Exchange(ref eventSink, []))
                    {
                        if (linkedSource.Token.IsCancellationRequested)
                        {
                            yield break; // Exit if cancellation is requested
                        }

                        // TODO: Do we actually want to interpret this as a termination request?
                        if (raisedEvent is RequestHaltEvent)
                        {
                            hadRequestHaltEvent = true;
                        }
                        else
                        {
                            yield return raisedEvent;
                        }
                    }

                    if (hadRequestHaltEvent || linkedSource.Token.IsCancellationRequested)
                    {
                        // If we had a completion event, we are done.
                        yield break;
                    }

                    this.RunStatus = this._stepRunner.HasUnservicedRequests ? RunStatus.PendingRequests : RunStatus.Idle;
                }

                if (blockOnPendingRequest && this.RunStatus == RunStatus.PendingRequests)
                {
                    try
                    {
                        await this._inputWaiter.WaitForInputAsync(TimeSpan.FromSeconds(1), linkedSource.Token).ConfigureAwait(false);
                    }
                    catch (OperationCanceledException)
                    {
                        // Deliberately swallowed; ShouldBreak() observes the cancelled token.
                    }
                }
            } while (!ShouldBreak());

            runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
        }
        finally
        {
            this.RunStatus = this._stepRunner.HasUnservicedRequests ? RunStatus.PendingRequests : RunStatus.Idle;
            this._stepRunner.OutgoingEvents.EventRaised -= OnWorkflowEventAsync;

            // Explicitly dispose the Activity so Activity.Stop fires deterministically,
            // regardless of how the async iterator enumerator is disposed.
            runActivity?.Dispose();
        }

        // Buffers every event raised by the runner until the current superstep's batch is drained.
        ValueTask OnWorkflowEventAsync(object? sender, WorkflowEvent e)
        {
            eventSink.Enqueue(e);
            return default;
        }

        // If we are Idle or Ended, we should break out of the loop
        // If we are PendingRequests and not blocking on pending requests, we should break out of the loop
        // If cancellation is requested, we should break out of the loop
        bool ShouldBreak() => this.RunStatus is RunStatus.Idle or RunStatus.Ended ||
                              (this.RunStatus == RunStatus.PendingRequests && !blockOnPendingRequest) ||
                              linkedSource.Token.IsCancellationRequested;
    }

    /// <summary>
    /// Signals that new input has been provided and the run loop should continue processing.
    /// Called by AsyncRunHandle when the user enqueues a message or response.
    /// </summary>
    public void SignalInput()
    {
        // _inputWaiter is readonly and initialized inline, so no null check is needed.
        this._inputWaiter.SignalInput();
    }

    /// <summary>
    /// Requests that any in-progress enumeration stop by cancelling the stream's stop token.
    /// Does nothing once the stream has been disposed.
    /// </summary>
    /// <returns>An already-completed task.</returns>
    public ValueTask StopAsync()
    {
        // DisposeAsync() cancels the token itself before disposing the source, so once the stream
        // is disposed there is nothing left to stop and Cancel() would throw.
        if (Volatile.Read(ref this._isDisposed) == 0)
        {
            try
            {
                this._stopCancellation.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // Lost a race with a concurrent DisposeAsync(); the token was already cancelled there.
            }
        }

        return default;
    }

    /// <summary>
    /// Cancels any in-progress enumeration, completes the session activity, and releases the
    /// cancellation source and input waiter. Only the first call has any effect.
    /// </summary>
    /// <returns>An already-completed task.</returns>
    public ValueTask DisposeAsync()
    {
        if (Interlocked.Exchange(ref this._isDisposed, 1) == 0)
        {
            this._stopCancellation.Cancel();

            // Stop the session activity
            if (this._sessionActivity is not null)
            {
                this._sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionCompleted));
                this._sessionActivity.Dispose();
                this._sessionActivity = null;
            }

            this._stopCancellation.Dispose();
            this._inputWaiter.Dispose();
        }

        return default;
    }
}