Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dotnet/src/Microsoft.Agents.AI.DurableTask/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [Unreleased]

- Fixed `WorkflowOutputEvent` streaming deserialization to read `executorId` instead of the renamed `sourceId` property, with fallback for backward compatibility ([#6896](https://github.com/microsoft/agent-framework/pull/6896))
- Fix issue with resuming checkpoint after package version upgrade ([#6670](https://github.com/microsoft/agent-framework/pull/6670))
- Bind MCP threadId to the current agent and guard cross-agent session dispatch ([#6531](https://github.com/microsoft/agent-framework/pull/6531))
- Added support for durable workflows ([#4436](https://github.com/microsoft/agent-framework/pull/4436))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,13 @@ private static bool TryParseWorkflowResult(string? serializedOutput, [NotNullWhe
}

// WorkflowOutputEvent
string sourceId = root.GetProperty("sourceId").GetString() ?? string.Empty;
string outputExecutorId = root.TryGetProperty("executorId", out JsonElement execIdElem)
? execIdElem.GetString() ?? string.Empty
: root.TryGetProperty("sourceId", out JsonElement srcIdElem)
? srcIdElem.GetString() ?? string.Empty
: string.Empty;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fallback now silently accepts a malformed payload like {"data":"x"} and produces a WorkflowOutputEvent with an empty ExecutorId. That violates the documented invariant that ExecutorId is "The unique identifier of the executor that yielded this output." The surrounding caller already treats JsonException as an unparsable event (DurableStreamingWorkflowRun.cs:395-397), so the backward-compat path should still fail closed when neither key is present instead of manufacturing anonymous output event.

Suggested change
: string.Empty;
string outputExecutorId = root.TryGetProperty("executorId", out JsonElement execIdElem)
? execIdElem.GetString() ?? string.Empty
: root.TryGetProperty("sourceId", out JsonElement srcIdElem)
? srcIdElem.GetString() ?? string.Empty
: throw new JsonException("WorkflowOutputEvent is missing required executorId/sourceId.");

object? outputData = GetDataProperty(root);
return new WorkflowOutputEvent(outputData!, sourceId);
return new WorkflowOutputEvent(outputData!, outputExecutorId);
Comment thread
kshyju marked this conversation as resolved.
}

return JsonSerializer.Deserialize(json, eventType, DurableSerialization.Options) as WorkflowEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,66 @@ public async Task WatchStreamAsync_CompletedWithEventsInOutput_YieldsEventsAndCo
Assert.Equal("result", completedResult.Result);
}

[Fact]
public async Task WatchStreamAsync_WorkflowOutputEvent_RoundTripsCorrectlyAsync()
{
// Arrange
WorkflowOutputEvent outputEvent = new("test-data", "executor-1");
string serializedEvent = SerializeEvent(outputEvent);
string serializedOutput = SerializeWorkflowResult("done", [serializedEvent]);

Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput));

DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());

// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}

// Assert
Assert.Equal(2, events.Count);
WorkflowOutputEvent result = Assert.IsType<WorkflowOutputEvent>(events[0]);
Assert.Equal("executor-1", result.ExecutorId);
Assert.Equal("test-data", result.Data?.ToString());
Assert.Empty(result.Tags);
}
Comment thread
kshyju marked this conversation as resolved.

[Fact]
public async Task WatchStreamAsync_WorkflowOutputEvent_LegacySourceId_RoundTripsCorrectlyAsync()
{
// Arrange — simulate a legacy payload that uses "sourceId" instead of "executorId"
const string LegacyEventJson = """{"sourceId":"legacy-executor","data":"legacy-data"}""";
string typeName = typeof(WorkflowOutputEvent).AssemblyQualifiedName!;
string serializedEvent = JsonSerializer.Serialize(
new { typeName, data = LegacyEventJson },
DurableSerialization.Options);
string serializedOutput = SerializeWorkflowResult("done", [serializedEvent]);

Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput));

DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());

// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}

// Assert
Assert.Equal(2, events.Count);
WorkflowOutputEvent result = Assert.IsType<WorkflowOutputEvent>(events[0]);
Assert.Equal("legacy-executor", result.ExecutorId);
Assert.Equal("legacy-data", result.Data?.ToString());
}

[Fact]
public async Task WatchStreamAsync_CompletedWithoutWrapper_YieldsFailedEventAsync()
{
Expand Down
Loading