diff --git a/cdk-sfn-dmap-df/README.md b/cdk-sfn-dmap-df/README.md new file mode 100644 index 000000000..67a4573ce --- /dev/null +++ b/cdk-sfn-dmap-df/README.md @@ -0,0 +1,149 @@ +# AWS Step Functions Distributed Map with Lambda durable functions + +This pattern demonstrates how to use AWS Step Functions Distributed Map with an Amazon S3 JSON input to fan out across 50 product catalog items, invoking a Lambda durable function for each item. The key technique is using the AWS Step Functions AWS SDK service integration (`CallAwsService` targeting `lambda:invoke`) instead of the optimized Lambda integration. This is currently necessary because only the raw SDK integration exposes the `DurableExecutionName` parameter, which enables per-item idempotency, derived from each product's `itemId`, showcased in this example. + +Learn more about this pattern at Serverless Land Patterns: [https://serverlessland.com/patterns/cdk-sfn-dmap-df](https://serverlessland.com/patterns/cdk-sfn-dmap-df) + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [Node.js and npm](https://nodejs.org/) installed (Node.js 22+) +* [AWS CDK](https://docs.aws.amazon.com/cdk/latest/guide/getting_started.html) installed +* CDK bootstrapped in your target account/region + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + + ```bash + git clone https://github.com/aws-samples/serverless-patterns + ``` + +2. Change directory to the pattern directory: + + ```bash + cd cdk-sfn-dmap-df + ``` + +3. Install dependencies: + + ```bash + npm install + ``` + +4. Run tests: + + ```bash + npm test + ``` + + This runs the CDK stack assertions and the local durable function tests using `LocalDurableTestRunner`. The durable function tests verify the three-operation workflow (validate → wait → update) and price tier assignment without deploying to AWS. + +5. Deploy the CDK stack to your default AWS account and region: + + ```bash + cdk deploy + ``` + +6. Note the outputs from the CDK deployment process. These contain the resource names and ARNs used for testing. + +## How it works + +![Architecture Diagram](cdk-sfn-dmap-df.png) + +Architecture flow: +1. AWS Step Functions Distributed Map reads 50 product items from an Amazon S3 JSON file +2. For each item, the map invokes a Lambda durable function via the AWS SDK service integration (`lambda:invoke`) +3. The `DurableExecutionName` is derived from each item's `itemId` using `States.Format`, providing per-item idempotency +4. Each durable function executes a three-operation workflow: + - **`validate-item`** (step) — Validates required fields, checks price > 0, computes a pricing tier (budget / standard / premium) + - **`rate-limit-delay`** (wait) — Pauses 5 seconds to simulate downstream rate limiting. No compute charges during this wait + - **`update-catalog`** (step) — Writes the enriched catalog entry with processing timestamps and a `completed` status +5. Results are written back to Amazon S3 under the `results/` prefix + +### Why the AWS SDK Service Integration? + +AWS Step Functions offers two ways to invoke AWS Lambda: + +| Integration | ARN Pattern | `DurableExecutionName` Support | +|---|---|---| +| Optimized Lambda | `arn:aws:states:::lambda:invoke` | No | +| AWS SDK | `arn:aws:states:::aws-sdk:lambda:invoke` | Yes | + +The optimized integration is simpler but only exposes a subset of the `Lambda.Invoke` API parameters. The AWS SDK integration maps directly to the full `Lambda.Invoke` API, giving access to `DurableExecutionName`. In CDK, this is expressed with `CallAwsService`: + +```typescript +new tasks.CallAwsService(this, 'InvokeDurableFunction', { + service: 'lambda', + action: 'invoke', + parameters: { + 'FunctionName': `${itemProcessor.functionArn}:$LATEST`, + 'InvocationType': 'RequestResponse', + 'DurableExecutionName.$': "States.Format('dmap-item-{}', $.itemId)", + 'Payload.$': '$', + }, + iamResources: [ + `${itemProcessor.functionArn}:$LATEST`, + itemProcessor.functionArn, + ], +}); +``` + +The `DurableExecutionName` is derived from each item's `itemId` using `States.Format`, so re-running the state machine with the same input file produces the same execution names. This means the durable functions return their previously checkpointed results instead of re-executing, providing end-to-end idempotency from AWS Step Functions through to durable function state. + +## Testing + +### Start the State Machine + +```bash +aws stepfunctions start-execution \ + --state-machine-arn \ + --name "catalog-update-$(date +%s)" +``` + +### Monitor Execution + +Monitor the execution in the AWS Step Functions console. The Distributed Map view shows all 50 child executions and their status. Each child invokes the durable function synchronously with a unique `DurableExecutionName`. Results are written back to the Amazon S3 bucket under the `results/` prefix. + +### Verify Deployment + +```bash +# Confirm items.json was deployed to S3 +aws s3 ls s3:///items.json + +# Confirm the state machine exists +aws stepfunctions describe-state-machine \ + --state-machine-arn +``` + +### Re-running for Idempotency + +To demonstrate idempotency, start another execution with the same input: + +```bash +aws stepfunctions start-execution \ + --state-machine-arn \ + --name "catalog-update-rerun-$(date +%s)" +``` + +Since the `DurableExecutionName` values are derived from the `itemId` (which does not change between runs), the durable functions detect that executions with those names already completed and return the previously checkpointed results without re-executing any steps. + +## Cleanup + +1. Delete the stack: + + ```bash + cdk destroy + ``` + +2. Confirm the deletion when prompted. This removes all resources including the Amazon S3 bucket (configured with `autoDeleteObjects` and `RemovalPolicy.DESTROY`) and Amazon CloudWatch log groups. + +--- + +Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/cdk-sfn-dmap-df/bin/cdk-sfn-dmap-df.ts b/cdk-sfn-dmap-df/bin/cdk-sfn-dmap-df.ts new file mode 100644 index 000000000..6e8d4cf15 --- /dev/null +++ b/cdk-sfn-dmap-df/bin/cdk-sfn-dmap-df.ts @@ -0,0 +1,20 @@ +#!/usr/bin/env node +import * as cdk from 'aws-cdk-lib/core'; +import { CdkSfnDmapDfStack } from '../lib/cdk-sfn-dmap-df-stack'; + +const app = new cdk.App(); +new CdkSfnDmapDfStack(app, 'CdkSfnDmapDfStack', { + /* If you don't specify 'env', this stack will be environment-agnostic. + * Account/Region-dependent features and context lookups will not work, + * but a single synthesized template can be deployed anywhere. */ + + /* Uncomment the next line to specialize this stack for the AWS Account + * and Region that are implied by the current CLI configuration. */ + // env: { account: process.env.CDK_DEFAULT_ACCOUNT, region: process.env.CDK_DEFAULT_REGION }, + + /* Uncomment the next line if you know exactly what Account and Region you + * want to deploy the stack to. */ + // env: { account: '123456789012', region: 'us-east-1' }, + + /* For more information, see https://docs.aws.amazon.com/cdk/latest/guide/environments.html */ +}); diff --git a/cdk-sfn-dmap-df/cdk-sfn-dmap-df.png b/cdk-sfn-dmap-df/cdk-sfn-dmap-df.png new file mode 100644 index 000000000..585398325 Binary files /dev/null and b/cdk-sfn-dmap-df/cdk-sfn-dmap-df.png differ diff --git a/cdk-sfn-dmap-df/cdk.json b/cdk-sfn-dmap-df/cdk.json new file mode 100644 index 000000000..25ae7dcb7 --- /dev/null +++ b/cdk-sfn-dmap-df/cdk.json @@ -0,0 +1,103 @@ +{ + "app": "npx ts-node --prefer-ts-exts bin/cdk-sfn-dmap-df.ts", + "watch": { + "include": [ + "**" + ], + "exclude": [ + "README.md", + "cdk*.json", + "**/*.d.ts", + "**/*.js", + "tsconfig.json", + "package*.json", + "yarn.lock", + "node_modules", + "test" + ] + }, + "context": { + "@aws-cdk/aws-signer:signingProfileNamePassedToCfn": true, + "@aws-cdk/aws-ecs-patterns:secGroupsDisablesImplicitOpenListener": true, + "@aws-cdk/aws-lambda:recognizeLayerVersion": true, + "@aws-cdk/core:checkSecretUsage": true, + "@aws-cdk/core:target-partitions": [ + "aws", + "aws-cn" + ], + "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true, + "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true, + "@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true, + "@aws-cdk/aws-iam:minimizePolicies": true, + "@aws-cdk/core:validateSnapshotRemovalPolicy": true, + "@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true, + "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true, + "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true, + "@aws-cdk/aws-apigateway:disableCloudWatchRole": true, + "@aws-cdk/core:enablePartitionLiterals": true, + "@aws-cdk/aws-events:eventsTargetQueueSameAccount": true, + "@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true, + "@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true, + "@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true, + "@aws-cdk/aws-route53-patters:useCertificate": true, + "@aws-cdk/customresources:installLatestAwsSdkDefault": false, + "@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true, + "@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true, + "@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true, + "@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true, + "@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true, + "@aws-cdk/aws-redshift:columnId": true, + "@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true, + "@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true, + "@aws-cdk/aws-apigateway:requestValidatorUniqueId": true, + "@aws-cdk/aws-kms:aliasNameRef": true, + "@aws-cdk/aws-kms:applyImportedAliasPermissionsToPrincipal": true, + "@aws-cdk/aws-autoscaling:generateLaunchTemplateInsteadOfLaunchConfig": true, + "@aws-cdk/core:includePrefixInUniqueNameGeneration": true, + "@aws-cdk/aws-efs:denyAnonymousAccess": true, + "@aws-cdk/aws-opensearchservice:enableOpensearchMultiAzWithStandby": true, + "@aws-cdk/aws-lambda-nodejs:useLatestRuntimeVersion": true, + "@aws-cdk/aws-efs:mountTargetOrderInsensitiveLogicalId": true, + "@aws-cdk/aws-rds:auroraClusterChangeScopeOfInstanceParameterGroupWithEachParameters": true, + "@aws-cdk/aws-appsync:useArnForSourceApiAssociationIdentifier": true, + "@aws-cdk/aws-rds:preventRenderingDeprecatedCredentials": true, + "@aws-cdk/aws-codepipeline-actions:useNewDefaultBranchForCodeCommitSource": true, + "@aws-cdk/aws-cloudwatch-actions:changeLambdaPermissionLogicalIdForLambdaAction": true, + "@aws-cdk/aws-codepipeline:crossAccountKeysDefaultValueToFalse": true, + "@aws-cdk/aws-codepipeline:defaultPipelineTypeToV2": true, + "@aws-cdk/aws-kms:reduceCrossAccountRegionPolicyScope": true, + "@aws-cdk/aws-eks:nodegroupNameAttribute": true, + "@aws-cdk/aws-eks:useNativeOidcProvider": true, + "@aws-cdk/aws-ec2:ebsDefaultGp3Volume": true, + "@aws-cdk/aws-ecs:removeDefaultDeploymentAlarm": true, + "@aws-cdk/custom-resources:logApiResponseDataPropertyTrueDefault": false, + "@aws-cdk/aws-s3:keepNotificationInImportedBucket": false, + "@aws-cdk/core:explicitStackTags": true, + "@aws-cdk/aws-ecs:reduceEc2FargateCloudWatchPermissions": true, + "@aws-cdk/aws-dynamodb:resourcePolicyPerReplica": true, + "@aws-cdk/aws-ec2:ec2SumTImeoutEnabled": true, + "@aws-cdk/aws-appsync:appSyncGraphQLAPIScopeLambdaPermission": true, + "@aws-cdk/aws-rds:setCorrectValueForDatabaseInstanceReadReplicaInstanceResourceId": true, + "@aws-cdk/core:cfnIncludeRejectComplexResourceUpdateCreatePolicyIntrinsics": true, + "@aws-cdk/aws-lambda-nodejs:sdkV3ExcludeSmithyPackages": true, + "@aws-cdk/aws-stepfunctions-tasks:fixRunEcsTaskPolicy": true, + "@aws-cdk/aws-ec2:bastionHostUseAmazonLinux2023ByDefault": true, + "@aws-cdk/aws-route53-targets:userPoolDomainNameMethodWithoutCustomResource": true, + "@aws-cdk/aws-elasticloadbalancingV2:albDualstackWithoutPublicIpv4SecurityGroupRulesDefault": true, + "@aws-cdk/aws-iam:oidcRejectUnauthorizedConnections": true, + "@aws-cdk/core:enableAdditionalMetadataCollection": true, + "@aws-cdk/aws-lambda:createNewPoliciesWithAddToRolePolicy": false, + "@aws-cdk/aws-s3:setUniqueReplicationRoleName": true, + "@aws-cdk/aws-events:requireEventBusPolicySid": true, + "@aws-cdk/core:aspectPrioritiesMutating": true, + "@aws-cdk/aws-dynamodb:retainTableReplica": true, + "@aws-cdk/aws-stepfunctions:useDistributedMapResultWriterV2": true, + "@aws-cdk/s3-notifications:addS3TrustKeyPolicyForSnsSubscriptions": true, + "@aws-cdk/aws-ec2:requirePrivateSubnetsForEgressOnlyInternetGateway": true, + "@aws-cdk/aws-s3:publicAccessBlockedByDefault": true, + "@aws-cdk/aws-lambda:useCdkManagedLogGroup": true, + "@aws-cdk/aws-elasticloadbalancingv2:networkLoadBalancerWithSecurityGroupByDefault": true, + "@aws-cdk/aws-ecs-patterns:uniqueTargetGroupId": true, + "@aws-cdk/aws-route53-patterns:useDistribution": true + } +} diff --git a/cdk-sfn-dmap-df/data/items.json b/cdk-sfn-dmap-df/data/items.json new file mode 100644 index 000000000..7c6dc3877 --- /dev/null +++ b/cdk-sfn-dmap-df/data/items.json @@ -0,0 +1,302 @@ +[ + { + "itemId": "PROD-001", + "productName": "Wireless Mouse", + "category": "electronics", + "price": 29.99 + }, + { + "itemId": "PROD-002", + "productName": "USB-C Hub", + "category": "electronics", + "price": 49.99 + }, + { + "itemId": "PROD-003", + "productName": "Mechanical Keyboard", + "category": "electronics", + "price": 129.99 + }, + { + "itemId": "PROD-004", + "productName": "Cotton T-Shirt", + "category": "clothing", + "price": 14.99 + }, + { + "itemId": "PROD-005", + "productName": "Running Shoes", + "category": "clothing", + "price": 89.99 + }, + { + "itemId": "PROD-006", + "productName": "Desk Lamp", + "category": "home", + "price": 34.99 + }, + { + "itemId": "PROD-007", + "productName": "Bluetooth Speaker", + "category": "electronics", + "price": 59.99 + }, + { + "itemId": "PROD-008", + "productName": "Yoga Mat", + "category": "fitness", + "price": 24.99 + }, + { + "itemId": "PROD-009", + "productName": "Coffee Maker", + "category": "home", + "price": 79.99 + }, + { + "itemId": "PROD-010", + "productName": "Noise-Canceling Headphones", + "category": "electronics", + "price": 249.99 + }, + { + "itemId": "PROD-011", + "productName": "Wool Sweater", + "category": "clothing", + "price": 64.99 + }, + { + "itemId": "PROD-012", + "productName": "Standing Desk", + "category": "home", + "price": 399.99 + }, + { + "itemId": "PROD-013", + "productName": "Phone Case", + "category": "electronics", + "price": 12.99 + }, + { + "itemId": "PROD-014", + "productName": "Water Bottle", + "category": "fitness", + "price": 19.99 + }, + { + "itemId": "PROD-015", + "productName": "LED Monitor", + "category": "electronics", + "price": 199.99 + }, + { + "itemId": "PROD-016", + "productName": "Denim Jacket", + "category": "clothing", + "price": 74.99 + }, + { + "itemId": "PROD-017", + "productName": "Air Purifier", + "category": "home", + "price": 149.99 + }, + { + "itemId": "PROD-018", + "productName": "Resistance Bands", + "category": "fitness", + "price": 15.99 + }, + { + "itemId": "PROD-019", + "productName": "Webcam", + "category": "electronics", + "price": 69.99 + }, + { + "itemId": "PROD-020", + "productName": "Throw Pillow Set", + "category": "home", + "price": 39.99 + }, + { + "itemId": "PROD-021", + "productName": "Laptop Stand", + "category": "electronics", + "price": 44.99 + }, + { + "itemId": "PROD-022", + "productName": "Winter Coat", + "category": "clothing", + "price": 189.99 + }, + { + "itemId": "PROD-023", + "productName": "Blender", + "category": "home", + "price": 59.99 + }, + { + "itemId": "PROD-024", + "productName": "Foam Roller", + "category": "fitness", + "price": 22.99 + }, + { + "itemId": "PROD-025", + "productName": "4K Monitor", + "category": "electronics", + "price": 499.99 + }, + { + "itemId": "PROD-026", + "productName": "Linen Shirt", + "category": "clothing", + "price": 44.99 + }, + { + "itemId": "PROD-027", + "productName": "Robot Vacuum", + "category": "home", + "price": 299.99 + }, + { + "itemId": "PROD-028", + "productName": "Jump Rope", + "category": "fitness", + "price": 11.99 + }, + { + "itemId": "PROD-029", + "productName": "Portable Charger", + "category": "electronics", + "price": 34.99 + }, + { + "itemId": "PROD-030", + "productName": "Rain Jacket", + "category": "clothing", + "price": 99.99 + }, + { + "itemId": "PROD-031", + "productName": "Toaster Oven", + "category": "home", + "price": 69.99 + }, + { + "itemId": "PROD-032", + "productName": "Dumbbells Set", + "category": "fitness", + "price": 149.99 + }, + { + "itemId": "PROD-033", + "productName": "Wireless Earbuds", + "category": "electronics", + "price": 79.99 + }, + { + "itemId": "PROD-034", + "productName": "Hiking Boots", + "category": "clothing", + "price": 129.99 + }, + { + "itemId": "PROD-035", + "productName": "Smart Thermostat", + "category": "home", + "price": 179.99 + }, + { + "itemId": "PROD-036", + "productName": "Kettlebell", + "category": "fitness", + "price": 39.99 + }, + { + "itemId": "PROD-037", + "productName": "USB Microphone", + "category": "electronics", + "price": 89.99 + }, + { + "itemId": "PROD-038", + "productName": "Silk Scarf", + "category": "clothing", + "price": 54.99 + }, + { + "itemId": "PROD-039", + "productName": "Vacuum Sealer", + "category": "home", + "price": 64.99 + }, + { + "itemId": "PROD-040", + "productName": "Exercise Ball", + "category": "fitness", + "price": 24.99 + }, + { + "itemId": "PROD-041", + "productName": "Graphics Tablet", + "category": "electronics", + "price": 219.99 + }, + { + "itemId": "PROD-042", + "productName": "Leather Belt", + "category": "clothing", + "price": 34.99 + }, + { + "itemId": "PROD-043", + "productName": "Instant Pot", + "category": "home", + "price": 89.99 + }, + { + "itemId": "PROD-044", + "productName": "Pull-Up Bar", + "category": "fitness", + "price": 29.99 + }, + { + "itemId": "PROD-045", + "productName": "External SSD", + "category": "electronics", + "price": 109.99 + }, + { + "itemId": "PROD-046", + "productName": "Cashmere Gloves", + "category": "clothing", + "price": 59.99 + }, + { + "itemId": "PROD-047", + "productName": "Electric Kettle", + "category": "home", + "price": 44.99 + }, + { + "itemId": "PROD-048", + "productName": "Ab Roller", + "category": "fitness", + "price": 17.99 + }, + { + "itemId": "PROD-049", + "productName": "Streaming Webcam", + "category": "electronics", + "price": 159.99 + }, + { + "itemId": "PROD-050", + "productName": "Cotton Socks Pack", + "category": "clothing", + "price": 9.99 + } +] \ No newline at end of file diff --git a/cdk-sfn-dmap-df/example-pattern.json b/cdk-sfn-dmap-df/example-pattern.json new file mode 100644 index 000000000..b9fccb73b --- /dev/null +++ b/cdk-sfn-dmap-df/example-pattern.json @@ -0,0 +1,68 @@ +{ + "title": "AWS Step Functions Distributed Map with Lambda durable functions", + "description": "Fan out across Amazon S3 items using Distributed Map, invoking a Lambda durable function per item with per-item idempotency.", + "language": "TypeScript", + "level": "300", + "framework": "AWS CDK", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern uses AWS Step Functions Distributed Map to read items from an Amazon S3 JSON file and fan out processing across Lambda durable functions.", + "Each item is processed by a durable function invoked via the AWS SDK service integration (CallAwsService targeting lambda:invoke).", + "The DurableExecutionName parameter is derived from each item's ID, providing per-item idempotency across re-runs.", + "The durable function executes a three-step workflow: validate the item, wait for rate limiting, and update the catalog entry." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/cdk-sfn-dmap-df", + "templateURL": "serverless-patterns/cdk-sfn-dmap-df", + "projectFolder": "cdk-sfn-dmap-df", + "templateFile": "lib/cdk-sfn-dmap-df-stack.ts" + } + }, + "resources": { + "bullets": [ + { + "text": "AWS Step Functions Distributed Map", + "link": "https://docs.aws.amazon.com/step-functions/latest/dg/concepts-asl-use-map-state-distributed.html" + }, + { + "text": "Lambda durable functions Documentation", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html" + }, + { + "text": "AWS SDK Service Integration for Step Functions", + "link": "https://docs.aws.amazon.com/step-functions/latest/dg/supported-services-awssdk.html" + } + ] + }, + "deploy": { + "text": [ + "Clone the repository: git clone https://github.com/aws-samples/serverless-patterns", + "Change directory: cd cdk-sfn-dmap-df", + "Install dependencies: npm install", + "Run tests: npm test", + "Deploy the CDK stack: cdk deploy" + ] + }, + "testing": { + "text": [ + "Run local tests: npm test — validates CDK stack assertions and runs the durable function locally using LocalDurableTestRunner (no AWS deployment needed).", + "Start the state machine: aws stepfunctions start-execution --state-machine-arn <StateMachineArn from stack output> --name \"catalog-update-$(date +%s)\"", + "Monitor the execution in the AWS Step Functions console to see all 50 child executions.", + "Re-run with the same input to demonstrate idempotency — durable functions return cached results." + ] + }, + "cleanup": { + "text": ["Delete the stack: cdk destroy"] + }, + "authors": [ + { + "name": "Marco Jahn", + "image": "https://sessionize.com/image/e99b-400o400o2-pqR4BacUSzHrq4fgZ4wwEQ.png", + "bio": "Senior Solutions Architect, Amazon Web Services", + "linkedin": "marcojahn" + } + ] +} diff --git a/cdk-sfn-dmap-df/jest.config.js b/cdk-sfn-dmap-df/jest.config.js new file mode 100644 index 000000000..af8b12534 --- /dev/null +++ b/cdk-sfn-dmap-df/jest.config.js @@ -0,0 +1,9 @@ +module.exports = { + testEnvironment: 'node', + roots: ['/test', '/lib'], + testMatch: ['**/*.test.ts'], + transform: { + '^.+\\.tsx?$': 'ts-jest' + }, + setupFilesAfterEnv: ['aws-cdk-lib/testhelpers/jest-autoclean'], +}; diff --git a/cdk-sfn-dmap-df/lib/cdk-sfn-dmap-df-stack.ts b/cdk-sfn-dmap-df/lib/cdk-sfn-dmap-df-stack.ts new file mode 100644 index 000000000..19d6f231d --- /dev/null +++ b/cdk-sfn-dmap-df/lib/cdk-sfn-dmap-df-stack.ts @@ -0,0 +1,160 @@ +import * as cdk from 'aws-cdk-lib/core'; +import * as lambda from 'aws-cdk-lib/aws-lambda'; +import * as nodejs from 'aws-cdk-lib/aws-lambda-nodejs'; +import * as logs from 'aws-cdk-lib/aws-logs'; +import * as iam from 'aws-cdk-lib/aws-iam'; +import * as s3 from 'aws-cdk-lib/aws-s3'; +import * as s3deploy from 'aws-cdk-lib/aws-s3-deployment'; +import * as sfn from 'aws-cdk-lib/aws-stepfunctions'; +import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks'; +import { Construct } from 'constructs'; +import * as path from 'path'; + +export class CdkSfnDmapDfStack extends cdk.Stack { + constructor(scope: Construct, id: string, props?: cdk.StackProps) { + super(scope, id, props); + + // ===== S3 Bucket for input data ===== + const dataBucket = new s3.Bucket(this, 'CatalogDataBucket', { + bucketName: `sfn-dmap-df-catalog-${cdk.Aws.ACCOUNT_ID}-${cdk.Aws.REGION}`, + removalPolicy: cdk.RemovalPolicy.DESTROY, + autoDeleteObjects: true, + }); + + // Deploy items.json to the bucket + new s3deploy.BucketDeployment(this, 'DeployCatalogData', { + sources: [s3deploy.Source.asset(path.join(__dirname, '..', 'data'))], + destinationBucket: dataBucket, + }); + + // ===== Durable Lambda Function ===== + + const functionLogGroup = new logs.LogGroup(this, 'ItemProcessorLogGroup', { + logGroupName: '/aws/lambda/catalog-item-processor', + retention: logs.RetentionDays.ONE_WEEK, + removalPolicy: cdk.RemovalPolicy.DESTROY, + }); + + const itemProcessor = new nodejs.NodejsFunction(this, 'ItemProcessorFunction', { + functionName: 'catalog-item-processor', + description: 'Durable function that validates and updates a single product catalog item', + runtime: lambda.Runtime.NODEJS_22_X, + handler: 'handler', + entry: path.join(__dirname, 'lambda', 'item-processor.ts'), + timeout: cdk.Duration.minutes(1), + memorySize: 256, + logGroup: functionLogGroup, + durableConfig: { + executionTimeout: cdk.Duration.minutes(15), + retentionPeriod: cdk.Duration.days(1), + }, + bundling: { + minify: true, + sourceMap: true, + externalModules: [], + }, + environment: { + NODE_OPTIONS: '--enable-source-maps', + }, + }); + + // ===== Step Functions State Machine ===== + + // Use the AWS SDK service integration (CallAwsService) to invoke the + // durable function. The optimized Lambda integration does not expose the + // DurableExecutionName parameter, so we drop down to the raw + // lambda:invoke API call which gives us full control over all parameters. + const invokeDurableFunction = new tasks.CallAwsService(this, 'InvokeDurableFunction', { + service: 'lambda', + action: 'invoke', + iamAction: 'lambda:InvokeFunction', + parameters: { + // Qualified name required for durable invocations + 'FunctionName': `${itemProcessor.functionArn}:$LATEST`, + 'InvocationType': 'RequestResponse', + // Derive a stable execution name from the item ID for idempotency + 'DurableExecutionName.$': "States.Format('dmap-item-{}', $.itemId)", + 'Payload.$': '$', + }, + iamResources: [ + `${itemProcessor.functionArn}:$LATEST`, + itemProcessor.functionArn, + ], + // Extract just the parsed response payload + resultSelector: { + 'result.$': 'States.StringToJson($.Payload)', + }, + resultPath: '$.processingResult', + }); + + // Distributed Map reads items from S3 and fans out to the invoke task + const distributedMap = new sfn.DistributedMap(this, 'ProcessCatalogItems', { + comment: 'Fan out across 50 product items from S3, invoking a durable function per item', + maxConcurrency: 10, + itemReader: new sfn.S3JsonItemReader({ + bucket: dataBucket, + key: 'items.json', + }), + resultWriterV2: new sfn.ResultWriterV2({ + bucket: dataBucket, + prefix: 'results', + }), + }); + + distributedMap.itemProcessor(invokeDurableFunction, { + mode: sfn.ProcessorMode.DISTRIBUTED, + executionType: sfn.ProcessorType.STANDARD, + }); + + // State machine log group + const stateMachineLogGroup = new logs.LogGroup(this, 'StateMachineLogGroup', { + logGroupName: '/aws/vendedlogs/states/catalog-update-sm', + retention: logs.RetentionDays.ONE_WEEK, + removalPolicy: cdk.RemovalPolicy.DESTROY, + }); + + const stateMachine = new sfn.StateMachine(this, 'CatalogUpdateStateMachine', { + stateMachineName: 'catalog-update-distributed-map', + definitionBody: sfn.DefinitionBody.fromChainable(distributedMap), + timeout: cdk.Duration.hours(1), + logs: { + destination: stateMachineLogGroup, + level: sfn.LogLevel.ALL, + includeExecutionData: true, + }, + }); + + // Grant the state machine permission to read from and write to the S3 bucket + dataBucket.grantRead(stateMachine); + dataBucket.grantWrite(stateMachine); + + // Grant the state machine permission to start distributed map child executions + stateMachine.addToRolePolicy(new iam.PolicyStatement({ + actions: ['states:StartExecution'], + resources: [`arn:aws:states:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:stateMachine:catalog-update-distributed-map`], + })); + + // Grant the state machine the ability to describe and stop child executions + stateMachine.addToRolePolicy(new iam.PolicyStatement({ + actions: ['states:DescribeExecution', 'states:StopExecution'], + resources: [`arn:aws:states:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:execution:catalog-update-distributed-map/*`], + })); + + // ===== Stack Outputs ===== + + new cdk.CfnOutput(this, 'StateMachineArn', { + value: stateMachine.stateMachineArn, + description: 'ARN of the catalog update state machine', + }); + + new cdk.CfnOutput(this, 'ItemProcessorFunctionArn', { + value: itemProcessor.functionArn, + description: 'ARN of the durable item processor function', + }); + + new cdk.CfnOutput(this, 'DataBucketName', { + value: dataBucket.bucketName, + description: 'S3 bucket containing the product catalog data', + }); + } +} diff --git a/cdk-sfn-dmap-df/lib/lambda/item-processor.test.ts b/cdk-sfn-dmap-df/lib/lambda/item-processor.test.ts new file mode 100644 index 000000000..2616b1127 --- /dev/null +++ b/cdk-sfn-dmap-df/lib/lambda/item-processor.test.ts @@ -0,0 +1,92 @@ +import { LocalDurableTestRunner, OperationStatus, OperationType } from '@aws/durable-execution-sdk-js-testing'; +import { handler } from './item-processor'; + +describe('Product Catalog Item Processor', () => { + beforeAll(() => LocalDurableTestRunner.setupTestEnvironment({ skipTime: true })); + afterAll(() => LocalDurableTestRunner.teardownTestEnvironment()); + + it('should validate, wait, and update a product item', async () => { + const runner = new LocalDurableTestRunner({ + handlerFunction: handler, + }); + + const execution = await runner.run({ + payload: { + itemId: 'PROD-001', + productName: 'Wireless Mouse', + category: 'electronics', + price: 29.99, + }, + }); + + // Verify invocations (initial + replay after wait) + expect(execution.getInvocations().length).toBe(2); + + // Verify final result + const result = execution.getResult() as any; + expect(result.itemId).toBe('PROD-001'); + expect(result.productName).toBe('Wireless Mouse'); + expect(result.category).toBe('electronics'); + expect(result.price).toBe(29.99); + expect(result.priceTier).toBe('standard'); + expect(result.status).toBe('completed'); + expect(result.validatedAt).toBeDefined(); + expect(result.processedAt).toBeDefined(); + + // Verify 3 operations were executed + expect(execution.getOperations()).toHaveLength(3); + + // Verify validate step + const validateStep = runner.getOperation('validate-item'); + expect(validateStep.getType()).toBe(OperationType.STEP); + expect(validateStep.getStatus()).toBe(OperationStatus.SUCCEEDED); + + // Verify wait operation + const waitOp = runner.getOperation('rate-limit-delay'); + expect(waitOp.getType()).toBe(OperationType.WAIT); + expect(waitOp.getStatus()).toBe(OperationStatus.SUCCEEDED); + + // Verify update step + const updateStep = runner.getOperation('update-catalog'); + expect(updateStep.getType()).toBe(OperationType.STEP); + expect(updateStep.getStatus()).toBe(OperationStatus.SUCCEEDED); + }); + + it('should assign budget tier for low-price items', async () => { + const runner = new LocalDurableTestRunner({ + handlerFunction: handler, + }); + + const execution = await runner.run({ + payload: { + itemId: 'PROD-050', + productName: 'Cotton Socks', + category: 'clothing', + price: 9.99, + }, + }); + + const result = execution.getResult() as any; + expect(result.priceTier).toBe('budget'); + expect(result.status).toBe('completed'); + }); + + it('should assign premium tier for high-price items', async () => { + const runner = new LocalDurableTestRunner({ + handlerFunction: handler, + }); + + const execution = await runner.run({ + payload: { + itemId: 'PROD-025', + productName: '4K Monitor', + category: 'electronics', + price: 499.99, + }, + }); + + const result = execution.getResult() as any; + expect(result.priceTier).toBe('premium'); + expect(result.status).toBe('completed'); + }); +}); diff --git a/cdk-sfn-dmap-df/lib/lambda/item-processor.ts b/cdk-sfn-dmap-df/lib/lambda/item-processor.ts new file mode 100644 index 000000000..57df7539d --- /dev/null +++ b/cdk-sfn-dmap-df/lib/lambda/item-processor.ts @@ -0,0 +1,99 @@ +import { + DurableContext, + withDurableExecution, +} from "@aws/durable-execution-sdk-js"; + +interface ProductItem { + itemId: string; + productName: string; + category: string; + price: number; +} + +interface ProcessedProduct { + itemId: string; + productName: string; + category: string; + price: number; + priceTier: string; + validatedAt: string; + status: string; + processedAt: string; +} + +function computePriceTier(price: number): string { + if (price < 25) return "budget"; + if (price < 100) return "standard"; + return "premium"; +} + +export const handler = withDurableExecution( + async ( + event: ProductItem, + context: DurableContext, + ): Promise => { + context.logger.info("Starting product catalog update", { + itemId: event.itemId, + productName: event.productName, + }); + + // Note: No try/catch here by design. Errors thrown inside steps are + // checkpointed by the durable SDK — the execution is marked as failed + // and won't re-execute on replay. The Step Functions Distributed Map + // handles failed items at the orchestration level (visible in map results). + + // Step 1: Validate the product record and enrich with pricing tier + const validated = await context.step("validate-item", async (stepCtx) => { + if (!event.itemId || !event.productName || !event.category) { + throw new Error(`Missing required fields for item ${event.itemId}`); + } + if (event.price <= 0) { + throw new Error( + `Invalid price ${event.price} for item ${event.itemId}`, + ); + } + + const priceTier = computePriceTier(event.price); + const validatedAt = new Date().toISOString(); + + stepCtx.logger.info("Item validated", { + itemId: event.itemId, + priceTier, + validatedAt, + }); + + return { priceTier, validatedAt }; + }); + + // Wait: Simulate downstream rate limiting / backpressure + await context.wait("rate-limit-delay", { seconds: 5 }); + + // Step 2: Update the catalog entry + const result = await context.step("update-catalog", async (stepCtx) => { + const processedAt = new Date().toISOString(); + + stepCtx.logger.info("Catalog entry updated", { + itemId: event.itemId, + processedAt, + }); + + return { + itemId: event.itemId, + productName: event.productName, + category: event.category, + price: event.price, + priceTier: validated.priceTier, + validatedAt: validated.validatedAt, + status: "completed", + processedAt, + }; + }); + + context.logger.info("Product catalog update finished", { + itemId: result.itemId, + status: result.status, + }); + + return result; + }, +); diff --git a/cdk-sfn-dmap-df/package.json b/cdk-sfn-dmap-df/package.json new file mode 100644 index 000000000..ecb4dcf33 --- /dev/null +++ b/cdk-sfn-dmap-df/package.json @@ -0,0 +1,29 @@ +{ + "name": "cdk-sfn-dmap-df", + "version": "0.1.0", + "bin": { + "cdk-sfn-dmap-df": "bin/cdk-sfn-dmap-df.js" + }, + "scripts": { + "build": "tsc", + "watch": "tsc -w", + "test": "jest", + "cdk": "cdk" + }, + "devDependencies": { + "@aws/durable-execution-sdk-js-testing": "^1.0.0", + "@types/jest": "^30", + "@types/node": "^24.10.1", + "aws-cdk": "2.1108.0", + "esbuild": "^0.27.4", + "jest": "^30", + "ts-jest": "^29", + "ts-node": "^10.9.2", + "typescript": "~5.9.3" + }, + "dependencies": { + "@aws/durable-execution-sdk-js": "^1.0.0", + "aws-cdk-lib": "^2.240.0", + "constructs": "^10.5.0" + } +} diff --git a/cdk-sfn-dmap-df/test/cdk-sfn-dmap-df.test.ts b/cdk-sfn-dmap-df/test/cdk-sfn-dmap-df.test.ts new file mode 100644 index 000000000..ff71c454d --- /dev/null +++ b/cdk-sfn-dmap-df/test/cdk-sfn-dmap-df.test.ts @@ -0,0 +1,36 @@ +import * as cdk from 'aws-cdk-lib/core'; +import { Template } from 'aws-cdk-lib/assertions'; +import { CdkSfnDmapDfStack } from '../lib/cdk-sfn-dmap-df-stack'; + +describe('CdkSfnDmapDfStack', () => { + const app = new cdk.App(); + const stack = new CdkSfnDmapDfStack(app, 'TestStack'); + const template = Template.fromStack(stack); + + it('should create a durable Lambda function', () => { + template.hasResourceProperties('AWS::Lambda::Function', { + FunctionName: 'catalog-item-processor', + Runtime: 'nodejs22.x', + }); + }); + + it('should create a Step Functions state machine', () => { + template.hasResourceProperties('AWS::StepFunctions::StateMachine', { + StateMachineName: 'catalog-update-distributed-map', + }); + }); + + it('should create an S3 bucket for catalog data', () => { + template.hasResource('AWS::S3::Bucket', {}); + }); + + it('should configure the durable function with durableConfig', () => { + template.hasResourceProperties('AWS::Lambda::Function', { + FunctionName: 'catalog-item-processor', + DurableConfig: { + ExecutionTimeout: 900, + RetentionPeriodInDays: 1, + }, + }); + }); +}); diff --git a/cdk-sfn-dmap-df/tsconfig.json b/cdk-sfn-dmap-df/tsconfig.json new file mode 100644 index 000000000..bfc61bf83 --- /dev/null +++ b/cdk-sfn-dmap-df/tsconfig.json @@ -0,0 +1,32 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "lib": [ + "es2022" + ], + "declaration": true, + "strict": true, + "noImplicitAny": true, + "strictNullChecks": true, + "noImplicitThis": true, + "alwaysStrict": true, + "noUnusedLocals": false, + "noUnusedParameters": false, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": false, + "inlineSourceMap": true, + "inlineSources": true, + "experimentalDecorators": true, + "strictPropertyInitialization": false, + "skipLibCheck": true, + "typeRoots": [ + "./node_modules/@types" + ] + }, + "exclude": [ + "node_modules", + "cdk.out" + ] +}