From 1b6c6750538105acac3453cc5d78a71244700487 Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Thu, 5 Mar 2026 15:28:24 -0800 Subject: [PATCH 1/2] Fix bug to emit WorkflowStartedEvent during workflow execution --- .../Execution/LockstepRunEventStream.cs | 3 + .../Execution/StreamingRunEventStream.cs | 3 + .../AgentEventsTests.cs | 69 +++++++++++++++++++ 3 files changed, 75 insertions(+) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs index 506a0d1039..72e96efb10 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs @@ -72,6 +72,9 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe this.RunStatus = RunStatus.Running; runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted)); + // Emit WorkflowStartedEvent to the event stream for consumers + eventSink.Enqueue(new WorkflowStartedEvent()); + do { while (this._stepRunner.HasUnprocessedMessages && diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index a09dedd8ad..beb3dc395b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -86,6 +86,9 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) .SetTag(Tags.SessionId, this._stepRunner.SessionId); runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted)); + // Emit WorkflowStartedEvent to the event stream for consumers + await this._eventChannel.Writer.WriteAsync(new WorkflowStartedEvent(), linkedSource.Token).ConfigureAwait(false); + // Run all available supersteps continuously // Events are streamed out in real-time as they happen via the event handler while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentEventsTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentEventsTests.cs index aadef98bac..47ceb364bd 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentEventsTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentEventsTests.cs @@ -1,7 +1,9 @@ // Copyright (c) Microsoft. All rights reserved. using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; +using FluentAssertions; using Microsoft.Extensions.AI; namespace Microsoft.Agents.AI.Workflows.UnitTests; @@ -88,4 +90,71 @@ public void AgentResponseEvent_IsWorkflowOutputEvent() Assert.Same(response, evt.Response); Assert.Same(response, evt.Data); } + + /// + /// Regression test for https://github.com/microsoft/agent-framework/issues/3789 + /// Verifies that WorkflowStartedEvent is emitted first before any SuperStepStartedEvent. + /// + [Fact] + public async Task StreamingRun_WorkflowStartedEvent_ShouldBeEmittedBefore_SuperStepStartedAsync() + { + // Arrange + TestEchoAgent agent = new("test-agent"); + Workflow workflow = AgentWorkflowBuilder.BuildSequential(agent); + ChatMessage inputMessage = new(ChatRole.User, "Hello"); + + // Act + await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new List { inputMessage }); + await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); + + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + events.Should().NotBeEmpty("workflow should produce events during execution"); + + List startedEvents = events.OfType().ToList(); + startedEvents.Should().ContainSingle("workflow should emit exactly one WorkflowStartedEvent"); + + WorkflowStartedEvent startedEvent = startedEvents.First(); + SuperStepStartedEvent? firstSuperStepEvent = events.OfType().FirstOrDefault(); + firstSuperStepEvent.Should().NotBeNull("workflow should emit SuperStepStartedEvent"); + + int startedIndex = events.IndexOf(startedEvent!); + int superStepIndex = events.IndexOf(firstSuperStepEvent!); + + startedIndex.Should().BeLessThan(superStepIndex, "WorkflowStartedEvent should be emitted before SuperStepStartedEvent"); + } + + /// + /// Regression test for https://github.com/microsoft/agent-framework/issues/3789 + /// Verifies that WorkflowStartedEvent is emitted using Lockstep execution mode. + /// + [Fact] + public async Task StreamingRun_LockstepExecution_ShouldEmit_WorkflowStartedEventAsync() + { + // Arrange + TestEchoAgent agent = new("test-agent"); + Workflow workflow = AgentWorkflowBuilder.BuildSequential(agent); + ChatMessage inputMessage = new(ChatRole.User, "Hello"); + + // Act: Use Lockstep execution mode + await using StreamingRun run = await InProcessExecution.Lockstep.RunStreamingAsync(workflow, new List { inputMessage }); + await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); + + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + events.Should().NotBeEmpty("workflow should produce events during execution"); + + List startedEvents = events.OfType().ToList(); + startedEvents.Should().ContainSingle("Lockstep execution should emit exactly one WorkflowStartedEvent"); + } } From efc8b55c027cec8ff523db16fa75617bc252a926 Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Fri, 6 Mar 2026 14:10:26 -0800 Subject: [PATCH 2/2] Updated based on PR comments --- .../Execution/StreamingRunEventStream.cs | 14 +++++++++----- .../AgentEventsTests.cs | 18 ++++++++---------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index beb3dc395b..6278f3446b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -86,14 +86,18 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) .SetTag(Tags.SessionId, this._stepRunner.SessionId); runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted)); - // Emit WorkflowStartedEvent to the event stream for consumers - await this._eventChannel.Writer.WriteAsync(new WorkflowStartedEvent(), linkedSource.Token).ConfigureAwait(false); - // Run all available supersteps continuously // Events are streamed out in real-time as they happen via the event handler - while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested) + if (this._stepRunner.HasUnprocessedMessages) { - await this._stepRunner.RunSuperStepAsync(linkedSource.Token).ConfigureAwait(false); + // Emit WorkflowStartedEvent only when there's actual work to process + // This avoids spurious events on timeout-only loop iterations + await this._eventChannel.Writer.WriteAsync(new WorkflowStartedEvent(), linkedSource.Token).ConfigureAwait(false); + + while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested) + { + await this._stepRunner.RunSuperStepAsync(linkedSource.Token).ConfigureAwait(false); + } } // Update status based on what's waiting diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentEventsTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentEventsTests.cs index 47ceb364bd..2b8c4805d1 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentEventsTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentEventsTests.cs @@ -92,7 +92,6 @@ public void AgentResponseEvent_IsWorkflowOutputEvent() } /// - /// Regression test for https://github.com/microsoft/agent-framework/issues/3789 /// Verifies that WorkflowStartedEvent is emitted first before any SuperStepStartedEvent. /// [Fact] @@ -114,23 +113,22 @@ public async Task StreamingRun_WorkflowStartedEvent_ShouldBeEmittedBefore_SuperS } // Assert - events.Should().NotBeEmpty("workflow should produce events during execution"); + events.Should().NotBeEmpty(); List startedEvents = events.OfType().ToList(); - startedEvents.Should().ContainSingle("workflow should emit exactly one WorkflowStartedEvent"); + startedEvents.Should().NotBeEmpty(); - WorkflowStartedEvent startedEvent = startedEvents.First(); + WorkflowStartedEvent? firstStartedEvent = startedEvents.FirstOrDefault(); SuperStepStartedEvent? firstSuperStepEvent = events.OfType().FirstOrDefault(); - firstSuperStepEvent.Should().NotBeNull("workflow should emit SuperStepStartedEvent"); + firstSuperStepEvent.Should().NotBeNull(); - int startedIndex = events.IndexOf(startedEvent!); + int startedIndex = events.IndexOf(firstStartedEvent!); int superStepIndex = events.IndexOf(firstSuperStepEvent!); - startedIndex.Should().BeLessThan(superStepIndex, "WorkflowStartedEvent should be emitted before SuperStepStartedEvent"); + startedIndex.Should().BeLessThan(superStepIndex); } /// - /// Regression test for https://github.com/microsoft/agent-framework/issues/3789 /// Verifies that WorkflowStartedEvent is emitted using Lockstep execution mode. /// [Fact] @@ -152,9 +150,9 @@ public async Task StreamingRun_LockstepExecution_ShouldEmit_WorkflowStartedEvent } // Assert - events.Should().NotBeEmpty("workflow should produce events during execution"); + events.Should().NotBeEmpty(); List startedEvents = events.OfType().ToList(); - startedEvents.Should().ContainSingle("Lockstep execution should emit exactly one WorkflowStartedEvent"); + startedEvents.Should().NotBeEmpty(); } }