diff --git a/integration-tests/bin/app.ts b/integration-tests/bin/app.ts index 08806d31d..b2294b075 100644 --- a/integration-tests/bin/app.ts +++ b/integration-tests/bin/app.ts @@ -5,6 +5,7 @@ import {OnDemand} from '../lib/stacks/on-demand'; import {Otlp} from '../lib/stacks/otlp'; import {Snapstart} from '../lib/stacks/snapstart'; import {LambdaManagedInstancesStack} from '../lib/stacks/lmi'; +import {DurableExecutionStatusStack} from '../lib/stacks/de-status'; import {ACCOUNT, getIdentifier, REGION} from '../config'; import {CapacityProviderStack} from "../lib/capacity-provider"; @@ -34,6 +35,9 @@ const stacks = [ new LambdaManagedInstancesStack(app, `integ-${identifier}-lmi`, { env, }), + new DurableExecutionStatusStack(app, `integ-${identifier}-de-status`, { + env, + }), ] // Tag all stacks so we can easily clean them up diff --git a/integration-tests/lambda/de-status-python/lambda_function.py b/integration-tests/lambda/de-status-python/lambda_function.py new file mode 100644 index 000000000..34e23baa6 --- /dev/null +++ b/integration-tests/lambda/de-status-python/lambda_function.py @@ -0,0 +1,53 @@ +""" +Durable execution Lambda handler for testing durable_function_execution_status tag. + +This handler uses the @durable_execution decorator from the AWS Durable Execution SDK +to create a durable function. When invoked, the extension should detect the durable +execution status from the response and add the `durable_function_execution_status` tag +to the aws.lambda span. +""" +import json +import logging + +from aws_durable_execution_sdk_python import ( + DurableContext, + durable_execution, + durable_step, + StepContext, +) + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +@durable_step +def do_work(step_context: StepContext, message: str) -> str: + """A simple durable step that returns the input message.""" + logger.info(f"Executing durable step with message: {message}") + return f"Processed: {message}" + + +@durable_execution +def handler(event: dict, context: DurableContext) -> dict: + """ + Durable Lambda handler. + + The @durable_execution decorator transforms this into a durable function. + When invoked, AWS Lambda returns a response with {"Status": "SUCCEEDED|FAILED|PENDING", ...} + which the extension should parse to set the durable_function_execution_status tag. + """ + logger.info("Hello world!") + + message = event.get("message", "Hello from durable function!") + + # Execute a durable step - this creates a checkpoint + result = context.step(do_work(message)) + + logger.info(f"Durable step completed with result: {result}") + + return { + "statusCode": 200, + "body": json.dumps({ + "message": result, + }) + } diff --git a/integration-tests/lambda/de-status-python/requirements.txt b/integration-tests/lambda/de-status-python/requirements.txt new file mode 100644 index 000000000..b2109b255 --- /dev/null +++ b/integration-tests/lambda/de-status-python/requirements.txt @@ -0,0 +1 @@ +aws-durable-execution-sdk-python>=1.3.0 diff --git a/integration-tests/lib/stacks/de-status.ts b/integration-tests/lib/stacks/de-status.ts new file mode 100644 index 000000000..8cb2ed1c2 --- /dev/null +++ b/integration-tests/lib/stacks/de-status.ts @@ -0,0 +1,65 @@ +import * as cdk from 'aws-cdk-lib'; +import * as lambda from 'aws-cdk-lib/aws-lambda'; +import { LayerVersion } from 'aws-cdk-lib/aws-lambda'; +import { Construct } from 'constructs'; +import { + createLogGroup, + defaultDatadogEnvVariables, + defaultDatadogSecretPolicy, + getExtensionLayer, + getDefaultPythonLayer, + defaultPythonRuntime, +} from '../util'; + +// Use custom Python layer ARN if provided, otherwise use default +const customPythonLayerArn = process.env.PYTHON_TRACER_LAYER_ARN; + +/** + * CDK Stack for testing durable_function_execution_status tag. + * + * Deploys a Python Lambda function with durable execution enabled. + * The durable function uses the @durable_execution decorator from the + * AWS Durable Execution SDK, which causes Lambda to return responses + * with {"Status": "SUCCEEDED|FAILED|PENDING", ...}. + * + * The datadog-lambda-python library extracts this status and adds the + * `durable_function_execution_status` tag to the aws.lambda span. + */ +export class DurableExecutionStatusStack extends cdk.Stack { + constructor(scope: Construct, id: string, props: cdk.StackProps) { + super(scope, id, props); + + const extensionLayer = getExtensionLayer(this); + const pythonLayer = customPythonLayerArn + ? LayerVersion.fromLayerVersionArn(this, 'CustomPythonLayer', customPythonLayerArn) + : getDefaultPythonLayer(this); + + const pythonFunctionName = `${id}-python-lambda`; + const pythonFunction = new lambda.Function(this, pythonFunctionName, { + runtime: defaultPythonRuntime, + architecture: lambda.Architecture.ARM_64, + handler: 'datadog_lambda.handler.handler', + code: lambda.Code.fromAsset('./lambda/de-status-python/package'), + functionName: pythonFunctionName, + timeout: cdk.Duration.seconds(60), + memorySize: 512, + environment: { + ...defaultDatadogEnvVariables, + DD_SERVICE: pythonFunctionName, + DD_TRACE_ENABLED: 'true', + DD_LAMBDA_HANDLER: 'lambda_function.handler', + DD_TRACE_AGENT_URL: 'http://127.0.0.1:8126', + }, + logGroup: createLogGroup(this, pythonFunctionName), + // Enable durable execution + // executionTimeout must be <= 15 minutes for synchronous invocation + durableConfig: { + executionTimeout: cdk.Duration.minutes(15), + retentionPeriod: cdk.Duration.days(14), + }, + }); + pythonFunction.addToRolePolicy(defaultDatadogSecretPolicy); + pythonFunction.addLayers(extensionLayer); + pythonFunction.addLayers(pythonLayer); + } +} diff --git a/integration-tests/tests/de-status.test.ts b/integration-tests/tests/de-status.test.ts new file mode 100644 index 000000000..9c153fc54 --- /dev/null +++ b/integration-tests/tests/de-status.test.ts @@ -0,0 +1,116 @@ +import { invokeAndCollectTelemetry, FunctionConfig } from './utils/default'; +import { DatadogTelemetry } from './utils/datadog'; +import { getIdentifier } from '../config'; + +const identifier = getIdentifier(); +const stackName = `integ-${identifier}-de-status`; + +describe('Durable Execution Status Tag Integration Tests', () => { + let results: Record; + + beforeAll(async () => { + // Durable functions require a qualified ARN (with version) for invocation + const functions: FunctionConfig[] = [ + { + functionName: `${stackName}-python-lambda:$LATEST`, + runtime: 'python', + }, + ]; + + console.log('Invoking durable execution functions...'); + + // Invoke all durable execution functions and collect telemetry + results = await invokeAndCollectTelemetry(functions, 1); + + console.log('Durable execution invocation and data fetching completed'); + }, 600000); + + describe('Python Runtime with Durable Execution', () => { + const getResult = () => results['python']?.[0]?.[0]; + + it('should invoke Lambda successfully', () => { + const result = getResult(); + expect(result).toBeDefined(); + expect(result.statusCode).toBe(200); + }); + + it('should have logs in Datadog', () => { + const result = getResult(); + expect(result).toBeDefined(); + expect(result.logs).toBeDefined(); + expect(result.logs!.length).toBeGreaterThan(0); + }); + + it('should have "Hello world!" log message', () => { + const result = getResult(); + expect(result).toBeDefined(); + expect(result.logs).toBeDefined(); + const helloWorldLog = result.logs!.find((log: any) => + log.message && log.message.includes('Hello world!') + ); + expect(helloWorldLog).toBeDefined(); + }); + + it('should send one trace to Datadog', () => { + const result = getResult(); + expect(result).toBeDefined(); + expect(result.traces?.length).toEqual(1); + }); + + it('trace should have exactly one span with operation_name=aws.lambda', () => { + const result = getResult(); + expect(result).toBeDefined(); + const trace = result.traces![0]; + expect(trace.spans).toBeDefined(); + + const awsLambdaSpans = trace.spans.filter((span: any) => + span.attributes.operation_name === 'aws.lambda' + ); + expect(awsLambdaSpans).toBeDefined(); + expect(awsLambdaSpans.length).toEqual(1); + }); + + it('aws.lambda span should have durable_function_execution_status tag', () => { + const result = getResult(); + expect(result).toBeDefined(); + const trace = result.traces![0]; + const awsLambdaSpan = trace.spans.find((span: any) => + span.attributes.operation_name === 'aws.lambda' + ); + expect(awsLambdaSpan).toBeDefined(); + + // The durable_function_execution_status tag should be present + const status = awsLambdaSpan?.attributes.custom?.durable_function_execution_status; + expect(status).toBeDefined(); + }); + + it('durable_function_execution_status tag should be one of: SUCCEEDED, FAILED, STOPPED, TIMED_OUT', () => { + const result = getResult(); + expect(result).toBeDefined(); + const trace = result.traces![0]; + const awsLambdaSpan = trace.spans.find((span: any) => + span.attributes.operation_name === 'aws.lambda' + ); + expect(awsLambdaSpan).toBeDefined(); + + const status = awsLambdaSpan?.attributes.custom?.durable_function_execution_status; + expect(status).toBeDefined(); + expect(['SUCCEEDED', 'FAILED', 'STOPPED', 'TIMED_OUT']).toContain(status); + }); + + it('durable_function_execution_status tag should be SUCCEEDED for successful execution', () => { + const result = getResult(); + expect(result).toBeDefined(); + const trace = result.traces![0]; + const awsLambdaSpan = trace.spans.find((span: any) => + span.attributes.operation_name === 'aws.lambda' + ); + expect(awsLambdaSpan).toBeDefined(); + + // For a successful durable function invocation that completes in one call, + // the status should be SUCCEEDED + const status = awsLambdaSpan?.attributes.custom?.durable_function_execution_status; + expect(status).toBe('SUCCEEDED'); + }); + }); +});