Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions integration-tests/bin/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {OnDemand} from '../lib/stacks/on-demand';
import {Otlp} from '../lib/stacks/otlp';
import {Snapstart} from '../lib/stacks/snapstart';
import {LambdaManagedInstancesStack} from '../lib/stacks/lmi';
import {DurableExecutionStatusStack} from '../lib/stacks/de-status';
import {ACCOUNT, getIdentifier, REGION} from '../config';
import {CapacityProviderStack} from "../lib/capacity-provider";

Expand Down Expand Up @@ -34,6 +35,9 @@ const stacks = [
new LambdaManagedInstancesStack(app, `integ-${identifier}-lmi`, {
env,
}),
new DurableExecutionStatusStack(app, `integ-${identifier}-de-status`, {
env,
}),
]

// Tag all stacks so we can easily clean them up
Expand Down
53 changes: 53 additions & 0 deletions integration-tests/lambda/de-status-python/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Durable execution Lambda handler for testing durable_function_execution_status tag.

This handler uses the @durable_execution decorator from the AWS Durable Execution SDK
to create a durable function. When invoked, the extension should detect the durable
execution status from the response and add the `durable_function_execution_status` tag
to the aws.lambda span.
"""
import json
import logging

from aws_durable_execution_sdk_python import (
DurableContext,
durable_execution,
durable_step,
StepContext,
)

logger = logging.getLogger()
logger.setLevel(logging.INFO)


@durable_step
def do_work(step_context: StepContext, message: str) -> str:
"""A simple durable step that returns the input message."""
logger.info(f"Executing durable step with message: {message}")
return f"Processed: {message}"


@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
"""
Durable Lambda handler.

The @durable_execution decorator transforms this into a durable function.
When invoked, AWS Lambda returns a response with {"Status": "SUCCEEDED|FAILED|PENDING", ...}
which the extension should parse to set the durable_function_execution_status tag.
"""
logger.info("Hello world!")

message = event.get("message", "Hello from durable function!")

# Execute a durable step - this creates a checkpoint
result = context.step(do_work(message))

logger.info(f"Durable step completed with result: {result}")

return {
"statusCode": 200,
"body": json.dumps({
"message": result,
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
aws-durable-execution-sdk-python>=1.3.0
65 changes: 65 additions & 0 deletions integration-tests/lib/stacks/de-status.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { LayerVersion } from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
import {
createLogGroup,
defaultDatadogEnvVariables,
defaultDatadogSecretPolicy,
getExtensionLayer,
getDefaultPythonLayer,
defaultPythonRuntime,
} from '../util';

// Use custom Python layer ARN if provided, otherwise use default
const customPythonLayerArn = process.env.PYTHON_TRACER_LAYER_ARN;

/**
* CDK Stack for testing durable_function_execution_status tag.
*
* Deploys a Python Lambda function with durable execution enabled.
* The durable function uses the @durable_execution decorator from the
* AWS Durable Execution SDK, which causes Lambda to return responses
* with {"Status": "SUCCEEDED|FAILED|PENDING", ...}.
*
* The datadog-lambda-python library extracts this status and adds the
* `durable_function_execution_status` tag to the aws.lambda span.
*/
export class DurableExecutionStatusStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: cdk.StackProps) {
super(scope, id, props);

const extensionLayer = getExtensionLayer(this);
const pythonLayer = customPythonLayerArn
? LayerVersion.fromLayerVersionArn(this, 'CustomPythonLayer', customPythonLayerArn)
: getDefaultPythonLayer(this);

const pythonFunctionName = `${id}-python-lambda`;
const pythonFunction = new lambda.Function(this, pythonFunctionName, {
runtime: defaultPythonRuntime,
architecture: lambda.Architecture.ARM_64,
handler: 'datadog_lambda.handler.handler',
code: lambda.Code.fromAsset('./lambda/de-status-python/package'),
functionName: pythonFunctionName,
timeout: cdk.Duration.seconds(60),
memorySize: 512,
environment: {
...defaultDatadogEnvVariables,
DD_SERVICE: pythonFunctionName,
DD_TRACE_ENABLED: 'true',
DD_LAMBDA_HANDLER: 'lambda_function.handler',
DD_TRACE_AGENT_URL: 'http://127.0.0.1:8126',
},
logGroup: createLogGroup(this, pythonFunctionName),
// Enable durable execution
// executionTimeout must be <= 15 minutes for synchronous invocation
durableConfig: {
executionTimeout: cdk.Duration.minutes(15),
retentionPeriod: cdk.Duration.days(14),
},
});
pythonFunction.addToRolePolicy(defaultDatadogSecretPolicy);
pythonFunction.addLayers(extensionLayer);
pythonFunction.addLayers(pythonLayer);
}
}
116 changes: 116 additions & 0 deletions integration-tests/tests/de-status.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import { invokeAndCollectTelemetry, FunctionConfig } from './utils/default';
import { DatadogTelemetry } from './utils/datadog';
import { getIdentifier } from '../config';

const identifier = getIdentifier();
const stackName = `integ-${identifier}-de-status`;

describe('Durable Execution Status Tag Integration Tests', () => {
let results: Record<string, DatadogTelemetry[][]>;

beforeAll(async () => {
// Durable functions require a qualified ARN (with version) for invocation
const functions: FunctionConfig[] = [
{
functionName: `${stackName}-python-lambda:$LATEST`,
runtime: 'python',
},
];

console.log('Invoking durable execution functions...');

// Invoke all durable execution functions and collect telemetry
results = await invokeAndCollectTelemetry(functions, 1);

console.log('Durable execution invocation and data fetching completed');
}, 600000);

describe('Python Runtime with Durable Execution', () => {
const getResult = () => results['python']?.[0]?.[0];

it('should invoke Lambda successfully', () => {
const result = getResult();
expect(result).toBeDefined();
expect(result.statusCode).toBe(200);
});

it('should have logs in Datadog', () => {
const result = getResult();
expect(result).toBeDefined();
expect(result.logs).toBeDefined();
expect(result.logs!.length).toBeGreaterThan(0);
});

it('should have "Hello world!" log message', () => {
const result = getResult();
expect(result).toBeDefined();
expect(result.logs).toBeDefined();
const helloWorldLog = result.logs!.find((log: any) =>
log.message && log.message.includes('Hello world!')
);
expect(helloWorldLog).toBeDefined();
});

it('should send one trace to Datadog', () => {
const result = getResult();
expect(result).toBeDefined();
expect(result.traces?.length).toEqual(1);
});

it('trace should have exactly one span with operation_name=aws.lambda', () => {
const result = getResult();
expect(result).toBeDefined();
const trace = result.traces![0];
expect(trace.spans).toBeDefined();

const awsLambdaSpans = trace.spans.filter((span: any) =>
span.attributes.operation_name === 'aws.lambda'
);
expect(awsLambdaSpans).toBeDefined();
expect(awsLambdaSpans.length).toEqual(1);
});

it('aws.lambda span should have durable_function_execution_status tag', () => {
const result = getResult();
expect(result).toBeDefined();
const trace = result.traces![0];
const awsLambdaSpan = trace.spans.find((span: any) =>
span.attributes.operation_name === 'aws.lambda'
);
expect(awsLambdaSpan).toBeDefined();

// The durable_function_execution_status tag should be present
const status = awsLambdaSpan?.attributes.custom?.durable_function_execution_status;
expect(status).toBeDefined();
});

it('durable_function_execution_status tag should be one of: SUCCEEDED, FAILED, PENDING', () => {
const result = getResult();
expect(result).toBeDefined();
const trace = result.traces![0];
const awsLambdaSpan = trace.spans.find((span: any) =>
span.attributes.operation_name === 'aws.lambda'
);
expect(awsLambdaSpan).toBeDefined();

const status = awsLambdaSpan?.attributes.custom?.durable_function_execution_status;
expect(status).toBeDefined();
expect(['SUCCEEDED', 'FAILED', 'PENDING']).toContain(status);
});

it('durable_function_execution_status tag should be SUCCEEDED for successful execution', () => {
const result = getResult();
expect(result).toBeDefined();
const trace = result.traces![0];
const awsLambdaSpan = trace.spans.find((span: any) =>
span.attributes.operation_name === 'aws.lambda'
);
expect(awsLambdaSpan).toBeDefined();

// For a successful durable function invocation that completes in one call,
// the status should be SUCCEEDED
const status = awsLambdaSpan?.attributes.custom?.durable_function_execution_status;
expect(status).toBe('SUCCEEDED');
});
});
});
Loading