Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
this.RunStatus = RunStatus.Running;
runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));

// Emit WorkflowStartedEvent to the event stream for consumers
eventSink.Enqueue(new WorkflowStartedEvent());
Comment thread
peibekwe marked this conversation as resolved.

do
{
while (this._stepRunner.HasUnprocessedMessages &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
.SetTag(Tags.SessionId, this._stepRunner.SessionId);
runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));

// Emit WorkflowStartedEvent to the event stream for consumers
await this._eventChannel.Writer.WriteAsync(new WorkflowStartedEvent(), linkedSource.Token).ConfigureAwait(false);

Comment thread
peibekwe marked this conversation as resolved.
Outdated
// Run all available supersteps continuously
// Events are streamed out in real-time as they happen via the event handler
while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.AI;

namespace Microsoft.Agents.AI.Workflows.UnitTests;
Expand Down Expand Up @@ -88,4 +90,71 @@ public void AgentResponseEvent_IsWorkflowOutputEvent()
Assert.Same(response, evt.Response);
Assert.Same(response, evt.Data);
}

/// <summary>
/// Regression test for https://github.com/microsoft/agent-framework/issues/3789
/// Verifies that WorkflowStartedEvent is emitted first before any SuperStepStartedEvent.
/// </summary>
[Fact]
public async Task StreamingRun_WorkflowStartedEvent_ShouldBeEmittedBefore_SuperStepStartedAsync()
{
// Arrange
TestEchoAgent agent = new("test-agent");
Workflow workflow = AgentWorkflowBuilder.BuildSequential(agent);
ChatMessage inputMessage = new(ChatRole.User, "Hello");

// Act
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new List<ChatMessage> { inputMessage });
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}

// Assert
events.Should().NotBeEmpty("workflow should produce events during execution");

List<WorkflowStartedEvent> startedEvents = events.OfType<WorkflowStartedEvent>().ToList();
startedEvents.Should().ContainSingle("workflow should emit exactly one WorkflowStartedEvent");

WorkflowStartedEvent startedEvent = startedEvents.First();
SuperStepStartedEvent? firstSuperStepEvent = events.OfType<SuperStepStartedEvent>().FirstOrDefault();
firstSuperStepEvent.Should().NotBeNull("workflow should emit SuperStepStartedEvent");

int startedIndex = events.IndexOf(startedEvent!);
int superStepIndex = events.IndexOf(firstSuperStepEvent!);

startedIndex.Should().BeLessThan(superStepIndex, "WorkflowStartedEvent should be emitted before SuperStepStartedEvent");
}
Comment thread
peibekwe marked this conversation as resolved.
Outdated

/// <summary>
/// Regression test for https://github.com/microsoft/agent-framework/issues/3789
/// Verifies that WorkflowStartedEvent is emitted using Lockstep execution mode.
/// </summary>
[Fact]
public async Task StreamingRun_LockstepExecution_ShouldEmit_WorkflowStartedEventAsync()
{
// Arrange
TestEchoAgent agent = new("test-agent");
Workflow workflow = AgentWorkflowBuilder.BuildSequential(agent);
ChatMessage inputMessage = new(ChatRole.User, "Hello");

// Act: Use Lockstep execution mode
await using StreamingRun run = await InProcessExecution.Lockstep.RunStreamingAsync(workflow, new List<ChatMessage> { inputMessage });
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}

// Assert
events.Should().NotBeEmpty("workflow should produce events during execution");

List<WorkflowStartedEvent> startedEvents = events.OfType<WorkflowStartedEvent>().ToList();
startedEvents.Should().ContainSingle("Lockstep execution should emit exactly one WorkflowStartedEvent");
}
}
Loading