Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://xunit.net/schema/current/xunit.runner.schema.json",
"parallelizeTestCollections": false,
"parallelizeTestCollections": true,
"parallelizeAssembly": false,
"maxParallelThreads": 1
"maxParallelThreads": 4
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public async Task GetResultAsync_FreshExecution_SuspendsExecution()

// GetResultAsync should signal termination and return a never-completing task.
var resultTask = callback.GetResultAsync();
await Task.Delay(10);
await tm.WaitForTerminationAsync();

Assert.True(tm.IsTerminated);
Assert.False(resultTask.IsCompleted);
Expand Down Expand Up @@ -193,7 +193,7 @@ public async Task ReplayStarted_DoesNotReFlushStart_AndSuspendsOnGetResult()
Assert.False(tm.IsTerminated);

var resultTask = callback.GetResultAsync();
await Task.Delay(10);
await tm.WaitForTerminationAsync();

Assert.True(tm.IsTerminated);
Assert.False(resultTask.IsCompleted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public async Task RunInChildContextAsync_ChildSuspendsOnWait_TerminatesWithWaitS
},
name: "phase");

await Task.Delay(50);
await tm.WaitForTerminationAsync();

Assert.True(tm.IsTerminated);
Assert.False(task.IsCompleted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ public async Task WaitAsync_NewExecution_SignalsTermination()
var waitTask = context.WaitAsync(TimeSpan.FromSeconds(30), name: "my_wait");

// Give it a moment to execute
await Task.Delay(10);
await tm.WaitForTerminationAsync();

Assert.True(tm.IsTerminated);
Assert.False(waitTask.IsCompleted);
Expand Down Expand Up @@ -433,7 +433,7 @@ public async Task WaitAsync_StartedButNotExpired_ResuspendsWithoutNewCheckpoint(

var waitTask = context.WaitAsync(TimeSpan.FromSeconds(30), name: "pending_wait");

await Task.Delay(10);
await tm.WaitForTerminationAsync();

Assert.True(tm.IsTerminated);
Assert.False(waitTask.IsCompleted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public async Task InvokeAsync_PreservesUnqualifiedArn_AndPassesItThrough()
payload: "x",
name: "noversion");

await Task.Delay(20);
await tm.WaitForTerminationAsync();
Assert.True(tm.IsTerminated);
Assert.False(task.IsCompleted);

Expand All @@ -100,7 +100,7 @@ public async Task InvokeAsync_FreshExecution_CheckpointsStartAndSuspends()

// Service-side suspend mechanics: TerminationManager fires before the
// user task completes; the task itself never resolves on the fresh path.
await Task.Delay(20);
await tm.WaitForTerminationAsync();
Assert.True(tm.IsTerminated);
Assert.False(task.IsCompleted);

Expand Down Expand Up @@ -130,7 +130,7 @@ public async Task InvokeAsync_FreshExecution_NoTenantId_OmitsTenantId()

var task = context.InvokeAsync<string, string>(FunctionArn, "payload", name: "no_tenant");

await Task.Delay(20);
await tm.WaitForTerminationAsync();
Assert.True(tm.IsTerminated);
Assert.False(task.IsCompleted);

Expand All @@ -154,7 +154,7 @@ public async Task InvokeAsync_FreshExecution_StartIsSyncFlushed()
var (context, recorder, tm, _) = CreateContext();

var task = context.InvokeAsync<string, string>(FunctionArn, "x", name: "sync_flush");
await Task.Delay(20);
await tm.WaitForTerminationAsync();

Assert.True(tm.IsTerminated);
Assert.False(task.IsCompleted);
Expand Down Expand Up @@ -350,7 +350,7 @@ public async Task InvokeAsync_ReplayStarted_ResuspendsWithoutRecheckpoint()
});

var task = context.InvokeAsync<string, string>(FunctionArn, "x", name: "still_running");
await Task.Delay(20);
await tm.WaitForTerminationAsync();

Assert.True(tm.IsTerminated);
Assert.False(task.IsCompleted);
Expand All @@ -377,7 +377,7 @@ public async Task InvokeAsync_ReplayPending_ResuspendsWithoutRecheckpoint()
});

var task = context.InvokeAsync<string, string>(FunctionArn, "x", name: "pending");
await Task.Delay(20);
await tm.WaitForTerminationAsync();

Assert.True(tm.IsTerminated);
Assert.False(task.IsCompleted);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using Amazon.Lambda.DurableExecution.Internal;

namespace Amazon.Lambda.DurableExecution.Tests;

/// <summary>
/// Shared helpers for tests that exercise the suspend/terminate path.
/// </summary>
internal static class TerminationTestHelpers
{
/// <summary>
/// Waits for the suspend signal deterministically instead of a fixed delay, which races under
/// CI thread-pool pressure (the original <c>Task.Delay</c> assumed the suspend happened within a
/// fixed window, which isn't guaranteed). The suspend path trips
/// <see cref="TerminationManager.Terminate"/>, which completes
/// <see cref="TerminationManager.TerminationTask"/>. Bounded by a timeout so a genuine
/// non-suspension fails fast at the following assert instead of hanging.
/// </summary>
public static Task WaitForTerminationAsync(this TerminationManager tm, int timeoutSeconds = 10) =>
Task.WhenAny(tm.TerminationTask, Task.Delay(TimeSpan.FromSeconds(timeoutSeconds)));
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public async Task FreshExecution_StrategyContinues_EmitsRetryAndSuspends()
},
name: "poll");

await Task.Delay(50);
await tm.WaitForTerminationAsync();

Assert.True(tm.IsTerminated);
Assert.False(task.IsCompleted);
Expand Down Expand Up @@ -818,7 +818,7 @@ public async Task FreshExecution_FlushesStartBeforeSuspending()
},
name: "poll");

await Task.Delay(50);
await tm.WaitForTerminationAsync();

Assert.True(tm.IsTerminated);
Assert.False(task.IsCompleted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,21 @@ public void CreateStream_OnDemandMode_ReturnsValidStream()
/// Validates: Requirements 1.3, 2.2, 2.3
/// </summary>
[Fact]
public void CreateStream_MultiConcurrencyMode_ReturnsValidStream()
public Task CreateStream_MultiConcurrencyMode_ReturnsValidStream()
{
var mock = new MockStreamingRuntimeApiClient();
InitializeWithMock("req-2", isMultiConcurrency: true, mock);
// Run on an isolated execution-context flow (Task.Run) so the multi-concurrency
// AsyncLocal context this writes does not leak onto the reused xUnit worker thread and
// contaminate a later on-demand test (see StreamingE2EWithMoq flake).
return Task.Run(() =>
{
var mock = new MockStreamingRuntimeApiClient();
InitializeWithMock("req-2", isMultiConcurrency: true, mock);

var stream = ResponseStreamFactory.CreateStream(Array.Empty<byte>());
var stream = ResponseStreamFactory.CreateStream(Array.Empty<byte>());

Assert.NotNull(stream);
Assert.IsAssignableFrom<ResponseStream>(stream);
Assert.NotNull(stream);
Assert.IsAssignableFrom<ResponseStream>(stream);
});
}

// --- Property 4: Single Stream Per Invocation ---
Expand Down Expand Up @@ -192,15 +198,21 @@ public void InitializeInvocation_OnDemand_SetsUpContext()
}

[Fact]
public void InitializeInvocation_MultiConcurrency_SetsUpContext()
public Task InitializeInvocation_MultiConcurrency_SetsUpContext()
{
var mock = new MockStreamingRuntimeApiClient();
InitializeWithMock("req-5", isMultiConcurrency: true, mock);
// Run on an isolated execution-context flow (Task.Run) so the multi-concurrency
// AsyncLocal context this writes does not leak onto the reused xUnit worker thread and
// contaminate a later on-demand test (see StreamingE2EWithMoq flake).
return Task.Run(() =>
{
var mock = new MockStreamingRuntimeApiClient();
InitializeWithMock("req-5", isMultiConcurrency: true, mock);

Assert.Null(ResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: true));
Assert.Null(ResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: true));

var stream = ResponseStreamFactory.CreateStream(Array.Empty<byte>());
Assert.NotNull(stream);
var stream = ResponseStreamFactory.CreateStream(Array.Empty<byte>());
Assert.NotNull(stream);
});
}

[Fact]
Expand Down Expand Up @@ -264,21 +276,27 @@ public void StateIsolation_SequentialInvocations_NoLeakage()
/// Validates: Requirements 2.9, 2.10
/// </summary>
[Fact]
public async Task StateIsolation_MultiConcurrency_UsesAsyncLocal()
public Task StateIsolation_MultiConcurrency_UsesAsyncLocal()
{
var mock = new MockStreamingRuntimeApiClient();
InitializeWithMock("req-9", isMultiConcurrency: true, mock);
var stream = ResponseStreamFactory.CreateStream(Array.Empty<byte>());
Assert.NotNull(stream);

bool childSawNull = false;
await Task.Run(() =>
// Run the whole body on an isolated execution-context flow (Task.Run) so the
// multi-concurrency AsyncLocal context written here does not leak onto the reused xUnit
// worker thread and contaminate a later on-demand test (see StreamingE2EWithMoq flake).
return Task.Run(async () =>
{
ResponseStreamFactory.CleanupInvocation(isMultiConcurrency: true);
childSawNull = ResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: true) == null;
var mock = new MockStreamingRuntimeApiClient();
InitializeWithMock("req-9", isMultiConcurrency: true, mock);
var stream = ResponseStreamFactory.CreateStream(Array.Empty<byte>());
Assert.NotNull(stream);

bool childSawNull = false;
await Task.Run(() =>
{
ResponseStreamFactory.CleanupInvocation(isMultiConcurrency: true);
childSawNull = ResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency: true) == null;
});

Assert.True(childSawNull);
});

Assert.True(childSawNull);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,15 @@ public class RuntimeSupportStateCheckCollection { }
[Collection("RuntimeSupportStateCheck")]
public class StreamingE2EWithMoq : IDisposable
{
public void Dispose()
// Reset the factory's static/async-local state before AND after each test so these tests
// start from a clean slate. The root cause of the cross-test leak (multi-concurrency tests
// writing the AsyncLocal on a reused xUnit worker thread) is contained at its source by
// running those tests on isolated Task.Run flows; this reset is belt-and-suspenders.
public StreamingE2EWithMoq() => ResetFactoryState();

public void Dispose() => ResetFactoryState();

private static void ResetFactoryState()
{
ResponseStreamFactory.CleanupInvocation(isMultiConcurrency: false);
ResponseStreamFactory.CleanupInvocation(isMultiConcurrency: true);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

namespace Amazon.Lambda.RuntimeSupport.UnitTests.TestHelpers
{
Expand All @@ -19,13 +17,14 @@ public TestFileStream(Action<byte[], int, int> writeAction)

public override void Write(byte[] buffer, int offset, int count)
{
WriteAction(TrimTrailingNullBytes(buffer).Take(count).ToArray(), offset, count);
}

private static IEnumerable<byte> TrimTrailingNullBytes(IEnumerable<byte> buffer)
{
// Trim trailing null bytes to make testing assertions easier
return buffer.Reverse().SkipWhile(x => x == 0).Reverse();
// Capture exactly the bytes that were written: [offset, offset + count).
// The previous implementation trimmed trailing null bytes from the buffer, which was
// flaky: a log header ends with an 8-byte big-endian microsecond timestamp, and roughly
// 1 in 256 timestamps ends in a 0x00 byte. Trimming that legitimate byte made the
// captured header 15 bytes instead of 16 and failed MaxSizeProducesOneLogFrame.
var written = new byte[count];
Array.Copy(buffer, offset, written, 0, count);
WriteAction(written, offset, count);
}
}
}
35 changes: 23 additions & 12 deletions Libraries/test/IntegrationTests.Helpers/LambdaHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,50 @@
// SPDX-License-Identifier: Apache-2.0

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Amazon.CloudFormation;
using Amazon.CloudFormation.Model;
using Amazon.Lambda;
using Amazon.Lambda.Model;

namespace IntegrationTests.Helpers
{
public class LambdaHelper
{
// Resource type that SAM AWS::Serverless::Function resources are transformed into in the deployed stack.
private const string LambdaFunctionResourceType = "AWS::Lambda::Function";

private readonly IAmazonLambda _lambdaClient;
private readonly IAmazonCloudFormation _cloudFormationClient;

public LambdaHelper(IAmazonLambda lambdaClient)
public LambdaHelper(IAmazonLambda lambdaClient, IAmazonCloudFormation cloudFormationClient)
{
_lambdaClient = lambdaClient;
_cloudFormationClient = cloudFormationClient;
}

/// <summary>
/// Returns the Lambda functions belonging to a CloudFormation stack by listing the stack's
/// resources directly. This is O(stack size) and independent of how many functions exist in
/// the account, unlike scanning every function and reading its tags one at a time, which is
/// slow and prone to throttling in a shared test account.
/// </summary>
public async Task<List<LambdaFunction>> FilterByCloudFormationStackAsync(string stackName)
{
const string stackNameKey = "aws:cloudformation:stack-name";
const string logicalIdKey = "aws:cloudformation:logical-id";
var lambdaFunctions = new List<LambdaFunction>();
var paginator = _lambdaClient.Paginators.ListFunctions(new ListFunctionsRequest());
var paginator = _cloudFormationClient.Paginators.ListStackResources(
new ListStackResourcesRequest { StackName = stackName });

await foreach (var function in paginator.Functions)
await foreach (var resource in paginator.StackResourceSummaries)
{
var tags = (await _lambdaClient.ListTagsAsync(new ListTagsRequest { Resource = function.FunctionArn })).Tags;
if (tags.ContainsKey(stackNameKey) && string.Equals(tags[stackNameKey], stackName))
if (string.Equals(resource.ResourceType, LambdaFunctionResourceType))
{
var lambdaFunction = new LambdaFunction
lambdaFunctions.Add(new LambdaFunction
{
LogicalId = tags[logicalIdKey],
Name = function.FunctionName
};
lambdaFunctions.Add(lambdaFunction);
LogicalId = resource.LogicalResourceId,
Name = resource.PhysicalResourceId
});
}
}

Expand Down
Loading
Loading