Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
this.RunStatus = RunStatus.Running;
runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));

// Emit WorkflowStartedEvent to the event stream for consumers
eventSink.Enqueue(new WorkflowStartedEvent());

do
{
while (this._stepRunner.HasUnprocessedMessages &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,16 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)

// Run all available supersteps continuously
// Events are streamed out in real-time as they happen via the event handler
while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested)
if (this._stepRunner.HasUnprocessedMessages)
{
await this._stepRunner.RunSuperStepAsync(linkedSource.Token).ConfigureAwait(false);
// Emit WorkflowStartedEvent only when there's actual work to process
// This avoids spurious events on timeout-only loop iterations
await this._eventChannel.Writer.WriteAsync(new WorkflowStartedEvent(), linkedSource.Token).ConfigureAwait(false);

while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested)
{
await this._stepRunner.RunSuperStepAsync(linkedSource.Token).ConfigureAwait(false);
}
}

// Update status based on what's waiting
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.AI;

namespace Microsoft.Agents.AI.Workflows.UnitTests;
Expand Down Expand Up @@ -88,4 +90,69 @@ public void AgentResponseEvent_IsWorkflowOutputEvent()
Assert.Same(response, evt.Response);
Assert.Same(response, evt.Data);
}

/// <summary>
/// Verifies that WorkflowStartedEvent is emitted first before any SuperStepStartedEvent.
/// </summary>
[Fact]
public async Task StreamingRun_WorkflowStartedEvent_ShouldBeEmittedBefore_SuperStepStartedAsync()
{
// Arrange
TestEchoAgent agent = new("test-agent");
Workflow workflow = AgentWorkflowBuilder.BuildSequential(agent);
ChatMessage inputMessage = new(ChatRole.User, "Hello");

// Act
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new List<ChatMessage> { inputMessage });
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}

// Assert
events.Should().NotBeEmpty();

List<WorkflowStartedEvent> startedEvents = events.OfType<WorkflowStartedEvent>().ToList();
startedEvents.Should().NotBeEmpty();

WorkflowStartedEvent? firstStartedEvent = startedEvents.FirstOrDefault();
SuperStepStartedEvent? firstSuperStepEvent = events.OfType<SuperStepStartedEvent>().FirstOrDefault();
firstSuperStepEvent.Should().NotBeNull();

int startedIndex = events.IndexOf(firstStartedEvent!);
int superStepIndex = events.IndexOf(firstSuperStepEvent!);

startedIndex.Should().BeLessThan(superStepIndex);
}

/// <summary>
/// Verifies that WorkflowStartedEvent is emitted using Lockstep execution mode.
/// </summary>
[Fact]
public async Task StreamingRun_LockstepExecution_ShouldEmit_WorkflowStartedEventAsync()
{
// Arrange
TestEchoAgent agent = new("test-agent");
Workflow workflow = AgentWorkflowBuilder.BuildSequential(agent);
ChatMessage inputMessage = new(ChatRole.User, "Hello");

// Act: Use Lockstep execution mode
await using StreamingRun run = await InProcessExecution.Lockstep.RunStreamingAsync(workflow, new List<ChatMessage> { inputMessage });
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}

// Assert
events.Should().NotBeEmpty();

List<WorkflowStartedEvent> startedEvents = events.OfType<WorkflowStartedEvent>().ToList();
startedEvents.Should().NotBeEmpty();
}
}
Loading