-
Notifications
You must be signed in to change notification settings - Fork 499
Add durable execution Step + Wait end-to-end #2360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
GarrettBeatty
wants to merge
6
commits into
feature/durablefunction
Choose a base branch
from
GarrettBeatty/stack/2
base: feature/durablefunction
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
ec232db
Add durable execution Step + Wait end-to-end
GarrettBeatty 8b853ed
Track replay state per operation rather than via a global flag
GarrettBeatty d4d5d3d
Add to sln
GarrettBeatty 6ca4868
Update Libraries.sln to put Durable Function project in right solutio…
normj e6a88cc
Use ILambdaContext.Serializer in DurableExecution; remove ICheckpoint…
GarrettBeatty c3c251b
Address PR review feedback
GarrettBeatty File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
11 changes: 11 additions & 0 deletions
11
.autover/changes/35ada24f-0a68-4947-aded-0a27de9ad05a.json
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| { | ||
| "Projects": [ | ||
| { | ||
| "Name": "Amazon.Lambda.DurableExecution", | ||
| "Type": "Patch", | ||
| "ChangelogMessages": [ | ||
| "Use ILambdaContext.Serializer for step checkpoint and workflow input/output serialization; remove ICheckpointSerializer<T>, ReflectionJsonCheckpointSerializer<T>, the JsonSerializerContext-taking WrapAsync overloads, and the [RequiresUnreferencedCode]/[RequiresDynamicCode] attributes that previously forked AOT vs reflection paths" | ||
| ] | ||
| } | ||
| ] | ||
| } |
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
5 changes: 0 additions & 5 deletions
5
Libraries/src/Amazon.Lambda.DurableExecution/AssemblyMarker.cs
This file was deleted.
Oops, something went wrong.
128 changes: 128 additions & 0 deletions
128
Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,128 @@ | ||
| using Amazon.Lambda.Core; | ||
| using Amazon.Lambda.DurableExecution.Internal; | ||
| using Microsoft.Extensions.Logging; | ||
| using Microsoft.Extensions.Logging.Abstractions; | ||
|
|
||
| namespace Amazon.Lambda.DurableExecution; | ||
|
|
||
| /// <summary> | ||
| /// Implementation of <see cref="IDurableContext"/>. Constructs and dispatches | ||
| /// per-operation classes (<see cref="StepOperation{T}"/>, <see cref="WaitOperation"/>); | ||
| /// the replay logic lives in those classes. | ||
| /// </summary> | ||
| internal sealed class DurableContext : IDurableContext | ||
| { | ||
| private readonly ExecutionState _state; | ||
| private readonly TerminationManager _terminationManager; | ||
| private readonly OperationIdGenerator _idGenerator; | ||
| private readonly string _durableExecutionArn; | ||
| private readonly CheckpointBatcher? _batcher; | ||
|
|
||
| public DurableContext( | ||
| ExecutionState state, | ||
| TerminationManager terminationManager, | ||
| OperationIdGenerator idGenerator, | ||
| string durableExecutionArn, | ||
| ILambdaContext lambdaContext, | ||
| CheckpointBatcher? batcher = null) | ||
| { | ||
| _state = state; | ||
| _terminationManager = terminationManager; | ||
| _idGenerator = idGenerator; | ||
| _durableExecutionArn = durableExecutionArn; | ||
| _batcher = batcher; | ||
| LambdaContext = lambdaContext; | ||
| } | ||
|
|
||
| // Replay-safe logger ships in a follow-up PR; see IDurableContext.Logger doc. | ||
| public ILogger Logger => NullLogger.Instance; | ||
| public IExecutionContext ExecutionContext => new DurableExecutionContext(_durableExecutionArn); | ||
| public ILambdaContext LambdaContext { get; } | ||
|
GarrettBeatty marked this conversation as resolved.
|
||
|
|
||
| public Task<T> StepAsync<T>( | ||
| Func<IStepContext, Task<T>> func, | ||
| string? name = null, | ||
| StepConfig? config = null, | ||
| CancellationToken cancellationToken = default) | ||
| => RunStep(func, name, config, cancellationToken); | ||
|
|
||
| public async Task StepAsync( | ||
| Func<IStepContext, Task> func, | ||
| string? name = null, | ||
| StepConfig? config = null, | ||
| CancellationToken cancellationToken = default) | ||
| { | ||
| // Void steps don't carry a meaningful payload — wrap with an object?-typed | ||
| // step that always returns null. The serializer isn't actually invoked | ||
| // with a non-null value, so any registered ILambdaSerializer suffices. | ||
| await RunStep<object?>( | ||
| async (ctx) => { await func(ctx); return null; }, | ||
| name, config, cancellationToken); | ||
| } | ||
|
|
||
| private Task<T> RunStep<T>( | ||
| Func<IStepContext, Task<T>> func, | ||
| string? name, | ||
| StepConfig? config, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| var serializer = LambdaContext.Serializer | ||
| ?? throw new InvalidOperationException( | ||
| "No ILambdaSerializer is registered on ILambdaContext.Serializer. " + | ||
| "Register a serializer via LambdaBootstrapBuilder.Create(handler, serializer) " + | ||
| "(or in tests, set TestLambdaContext.Serializer)."); | ||
|
|
||
| var operationId = _idGenerator.NextId(); | ||
| var op = new StepOperation<T>( | ||
| operationId, name, func, config, serializer, Logger, | ||
| _state, _terminationManager, _durableExecutionArn, _batcher); | ||
| return op.ExecuteAsync(cancellationToken); | ||
| } | ||
|
|
||
| public Task WaitAsync( | ||
| TimeSpan duration, | ||
| string? name = null, | ||
| CancellationToken cancellationToken = default) | ||
| { | ||
| // Service timer granularity is 1 second; sub-second waits would round to 0. | ||
| // WaitOptions.WaitSeconds is integer in [1, 31_622_400] (1 second to ~1 year). | ||
| if (duration < TimeSpan.FromSeconds(1)) | ||
| throw new ArgumentOutOfRangeException(nameof(duration), duration, "Wait duration must be at least 1 second."); | ||
|
|
||
| if (duration > TimeSpan.FromSeconds(31_622_400)) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we be validating this on our end? |
||
| throw new ArgumentOutOfRangeException(nameof(duration), duration, "Wait duration must be at most 31,622,400 seconds (~1 year)."); | ||
|
|
||
| cancellationToken.ThrowIfCancellationRequested(); | ||
|
|
||
| var operationId = _idGenerator.NextId(); | ||
| var waitSeconds = (int)Math.Max(1, Math.Ceiling(duration.TotalSeconds)); | ||
|
GarrettBeatty marked this conversation as resolved.
|
||
| var op = new WaitOperation( | ||
| operationId, name, waitSeconds, | ||
| _state, _terminationManager, _durableExecutionArn, _batcher); | ||
| return op.ExecuteAsync(cancellationToken); | ||
| } | ||
| } | ||
|
|
||
| internal sealed class DurableExecutionContext : IExecutionContext | ||
| { | ||
| public DurableExecutionContext(string durableExecutionArn) | ||
| { | ||
| DurableExecutionArn = durableExecutionArn; | ||
| } | ||
|
|
||
| public string DurableExecutionArn { get; } | ||
| } | ||
|
|
||
| internal sealed class StepContext : IStepContext | ||
| { | ||
| public StepContext(string operationId, int attemptNumber, ILogger logger) | ||
| { | ||
| OperationId = operationId; | ||
| AttemptNumber = attemptNumber; | ||
| Logger = logger; | ||
| } | ||
|
|
||
| public ILogger Logger { get; } | ||
| public int AttemptNumber { get; } | ||
| public string OperationId { get; } | ||
| } | ||
49 changes: 49 additions & 0 deletions
49
Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| namespace Amazon.Lambda.DurableExecution; | ||
|
GarrettBeatty marked this conversation as resolved.
|
||
|
|
||
| /// <summary> | ||
| /// Base exception for all durable execution errors. | ||
| /// </summary> | ||
| public class DurableExecutionException : Exception | ||
| { | ||
| /// <summary>Creates an empty <see cref="DurableExecutionException"/>.</summary> | ||
| public DurableExecutionException() { } | ||
| /// <summary>Creates a <see cref="DurableExecutionException"/> with the given message.</summary> | ||
| public DurableExecutionException(string message) : base(message) { } | ||
| /// <summary>Creates a <see cref="DurableExecutionException"/> wrapping an inner exception.</summary> | ||
| public DurableExecutionException(string message, Exception innerException) : base(message, innerException) { } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Thrown when code has changed between invocations, causing a replay mismatch. | ||
| /// For example, a step at index 0 was previously a WAIT but is now a STEP. | ||
| /// </summary> | ||
| public class NonDeterministicExecutionException : DurableExecutionException | ||
| { | ||
| /// <summary>Creates an empty <see cref="NonDeterministicExecutionException"/>.</summary> | ||
| public NonDeterministicExecutionException() { } | ||
| /// <summary>Creates a <see cref="NonDeterministicExecutionException"/> with the given message.</summary> | ||
| public NonDeterministicExecutionException(string message) : base(message) { } | ||
| /// <summary>Creates a <see cref="NonDeterministicExecutionException"/> wrapping an inner exception.</summary> | ||
| public NonDeterministicExecutionException(string message, Exception innerException) : base(message, innerException) { } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Thrown when user code inside a step fails (after retries exhausted). | ||
| /// Contains the original error details from the checkpoint. | ||
| /// </summary> | ||
| public class StepException : DurableExecutionException | ||
| { | ||
| /// <summary>The fully-qualified type name of the original exception.</summary> | ||
| public string? ErrorType { get; init; } | ||
| /// <summary>Optional structured error data attached by the user.</summary> | ||
| public string? ErrorData { get; init; } | ||
| /// <summary>Stack trace of the original exception, captured before serialization.</summary> | ||
| public IReadOnlyList<string>? OriginalStackTrace { get; init; } | ||
|
|
||
| /// <summary>Creates an empty <see cref="StepException"/>.</summary> | ||
| public StepException() { } | ||
| /// <summary>Creates a <see cref="StepException"/> with the given message.</summary> | ||
| public StepException(string message) : base(message) { } | ||
| /// <summary>Creates a <see cref="StepException"/> wrapping an inner exception.</summary> | ||
| public StepException(string message, Exception innerException) : base(message, innerException) { } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.