diff --git a/appsync-events-lambda-agentcore-cdk/.gitignore b/appsync-events-lambda-agentcore-cdk/.gitignore new file mode 100644 index 000000000..84778f1de --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/.gitignore @@ -0,0 +1,15 @@ +*.swp +package-lock.json +__pycache__ +.pytest_cache +.venv +*.egg-info +.DS_Store + +# CDK asset staging directory +.cdk.staging +cdk.out + +# Dev tooling +.kiro +mise.local.toml \ No newline at end of file diff --git a/appsync-events-lambda-agentcore-cdk/.pylintrc b/appsync-events-lambda-agentcore-cdk/.pylintrc new file mode 100644 index 000000000..d88938473 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/.pylintrc @@ -0,0 +1,2 @@ +[format] +max-line-length=150 diff --git a/appsync-events-lambda-agentcore-cdk/README.md b/appsync-events-lambda-agentcore-cdk/README.md new file mode 100644 index 000000000..0059c4273 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/README.md @@ -0,0 +1,163 @@ +# AppSync Events to Lambda to Bedrock AgentCore + +This pattern deploys a real-time streaming chat service using AWS AppSync Events with Lambda to invoke a Strands agent running on Amazon Bedrock AgentCore Runtime. + +Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/appsync-events-lambda-agentcore-cdk + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. 
+* [AWS CLI installed and configured](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) +* [Git installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [Python 3.14](https://www.python.org/downloads/) with [pip](https://pip.pypa.io/en/stable/installation/) +* [Node.js 22](https://nodejs.org/en/download/) +* [AWS CDK v2](https://docs.aws.amazon.com/cdk/v2/guide/getting_started.html) (`npm install -g aws-cdk`) +* [Finch](https://runfinch.com/) or [Docker](https://docs.docker.com/get-docker/) (used for CDK bundling) + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +1. Change directory to the pattern directory: + ``` + cd serverless-patterns/appsync-events-lambda-agentcore-cdk + ``` +1. Create and activate a Python virtual environment: + ``` + python -m venv .venv + source .venv/bin/activate # On Windows: .venv\Scripts\activate + ``` +1. Install Python dependencies: + ``` + pip install -r requirements.txt + ``` +1. Set your target AWS region: + ``` + export AWS_REGION=eu-west-1 # On Windows: set AWS_REGION=eu-west-1 + ``` +1. If you are using [Finch](https://runfinch.com/) instead of Docker, set the `CDK_DOCKER` environment variable: + ``` + export CDK_DOCKER=finch # On Windows: set CDK_DOCKER=finch + ``` +1. Bootstrap CDK in your account/region (if not already done): + ``` + cdk bootstrap + ``` +1. Deploy the stack: + ``` + cdk deploy + ``` +1. Note the outputs from the CDK deployment process. These contain the AppSync Events HTTP endpoint, WebSocket endpoint, and API key needed for testing. + +## How it works + +![Architecture diagram](images/architecture.png) + +Figure 1 - Architecture + +1. The client publishes a message to the inbound channel (`/chat/{conversationId}`) via HTTP POST to AppSync Events. +2. AppSync Events triggers the agent invoker Lambda via direct Lambda integration. +3. 
The agent invoker validates the payload, invokes the stream relay Lambda asynchronously, and returns immediately. This two-Lambda split is necessary because AppSync invokes the handler synchronously — a long-running stream would block the response. +4. The stream relay calls `invoke_agent_runtime` on the Bedrock AgentCore Runtime, which hosts a Strands agent container, and consumes the Server-Sent Events (SSE) stream. +5. The stream relay publishes each chunk back to the response channel on AppSync Events (`/responses/chat/{conversationId}`). +6. The client receives agent response tokens in real time via the WebSocket subscription. + +The client subscribes to the response channel before publishing. Separate channel namespaces (`chat` for inbound, `responses` for outbound) ensure the stream relay's publishes do not re-trigger the agent invoker. + +The agent is a Strands-based research assistant with access to `http_request`, `calculator`, and `current_time` tools, backed by S3 session persistence for multi-turn conversations. + +## Testing + +### Automated tests + +Install the test dependencies: + +```bash +pip install -r requirements-dev.txt +``` + +Run the tests: + +```bash +pytest tests/unit -v # unit tests (no deployed stack needed) +pytest tests/integration -v -s # integration tests with streaming output +``` + +### Using the AppSync Pub/Sub Editor + +You can test the deployed service directly from the AWS Console using the AppSync Events built-in Pub/Sub Editor. No additional tooling required. + +1. Open the [AWS AppSync console](https://console.aws.amazon.com/appsync/) in the region you deployed to (e.g. `eu-west-1`). +1. Select the Event API created by the stack (look for the API with "EventApi" in the name). +1. Click the **Pub/Sub Editor** tab. +1. Scroll to the bottom of the page. The API key is pre-populated in the authorization token field. Click **Connect** to establish a WebSocket connection. +1. 
In the **Subscribe** panel, select `responses` from the namespace dropdown, then enter the path: + ``` + /chat/test-conversation-1 + ``` +1. Click **Subscribe**. + + ![AppSync Pub/Sub Editor — Subscribe panel](images/appsync-pubsub-subscribe.jpg) + + Figure 2 - AppSync Pub/Sub Editor - Subscribe panel + +1. Scroll back to the top of the page to the **Publish** panel. Select `chat` from the namespace dropdown, then enter the path: + ``` + /test-conversation-1 + ``` + Enter this JSON as the event payload: + ```json + [ + { + "message": "What is 347 multiplied by 29?", + "sessionId": "test-conversation-1" + } + ] + ``` + Click **Publish**. When prompted, choose **WebSocket** as the publish method. + + ![AppSync Pub/Sub Editor — Publish panel](images/appsync-pubsub-publish.jpg) + + Figure 3 - AppSync Pub/Sub Editor - Publish panel + +1. Scroll back down to the bottom of the page to watch the subscription panel — you should see streaming chunk events arrive in real time, followed by a final completion event containing the full response. + + ![AppSync Pub/Sub Editor — Subscribe results](images/appsync-pubsub-subscribe-result.jpg) + + Figure 4 - AppSync Pub/Sub Editor - Subscribe results + + +A few things to note: + +- The `sessionId` value ties messages to a conversation. Use the same `sessionId` across publishes to test multi-turn conversation with session persistence. +- The subscribe channel must be prefixed with `/responses` — the stream relay publishes responses to `/responses/chat/{conversationId}` to avoid re-triggering the agent invoker. +- You can try different prompts to exercise the agent's tools: ask it to fetch a URL (`http_request`), do arithmetic (`calculator`), or tell you the current time (`current_time`). + +## Authentication + +This example uses an API key for authentication to keep things simple. API keys are suitable for development and testing but are not recommended for production workloads. 
+ +AppSync Events supports several authentication methods that are better suited for production: + +- **Amazon Cognito user pools** — ideal for end-user authentication in web and mobile apps. +- **AWS IAM** — best for server-to-server or backend service communication. +- **OpenID Connect (OIDC)** — use with third-party identity providers. +- **Lambda authorizers** — for custom authorization logic. + +You can configure multiple authorization modes on a single API and apply different modes per channel namespace. See the [AppSync Events authorization and authentication](https://docs.aws.amazon.com/appsync/latest/eventapi/configure-event-api-auth.html) documentation for details. + +## Cleanup + +1. Delete the stack + ``` + cdk destroy + ``` + +---- +Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/appsync-events-lambda-agentcore-cdk/agents/chat/Dockerfile b/appsync-events-lambda-agentcore-cdk/agents/chat/Dockerfile new file mode 100644 index 000000000..b44708318 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/agents/chat/Dockerfile @@ -0,0 +1,20 @@ +FROM ghcr.io/astral-sh/uv:python3.14-bookworm-slim + +WORKDIR /app + +ENV UV_SYSTEM_PYTHON=1 UV_COMPILE_BYTECODE=1 + +COPY chat/requirements.txt requirements.txt +RUN uv pip install --system --no-cache -r requirements.txt + +ARG AWS_REGION +ENV AWS_REGION=${AWS_REGION} + +RUN useradd -m -u 1000 bedrock_agentcore +USER bedrock_agentcore + +EXPOSE 8080 + +COPY chat/ /app + +CMD ["opentelemetry-instrument", "python", "-m", "entrypoint"] diff --git a/appsync-events-lambda-agentcore-cdk/agents/chat/entrypoint.py b/appsync-events-lambda-agentcore-cdk/agents/chat/entrypoint.py new file mode 100644 index 000000000..129e73d0f --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/agents/chat/entrypoint.py @@ -0,0 +1,96 @@ +"""Chat agent entrypoint for AgentCore runtime. + +Pure streaming agent with S3-backed session persistence. 
+Yields response chunks via SSE. Has no knowledge of delivery +mechanism (AppSync, WebSocket, etc.). +""" + +import os +import logging + +from strands import Agent +from strands.models import BedrockModel +from strands.session.s3_session_manager import S3SessionManager +from strands_tools import http_request, calculator, current_time +from bedrock_agentcore.runtime import BedrockAgentCoreApp + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +app = BedrockAgentCoreApp() + +MODEL_ID = os.environ.get("BEDROCK_MODEL_ID") +if not MODEL_ID: + raise ValueError("BEDROCK_MODEL_ID environment variable is required") + +REGION = os.environ.get("AWS_REGION") +if not REGION: + raise ValueError("AWS_REGION environment variable is required") +SESSION_BUCKET = os.environ.get("SESSION_BUCKET") + +SYSTEM_PROMPT = """\ +You are a research assistant with access to the web, a calculator, and a clock. + +You can: +- Fetch and summarise content from any public URL using http_request +- Perform mathematical calculations using calculator +- Check the current date and time in any timezone using current_time + +When fetching web content, prefer converting HTML to markdown for readability +by setting convert_to_markdown=true. Always cite the URL you fetched. +Keep responses clear and concise. 
+""" + + +def _create_agent(session_id: str | None = None) -> Agent: + """Create a Strands agent with Bedrock model and optional session.""" + model = BedrockModel(model_id=MODEL_ID, region_name=REGION) + + kwargs = { + "system_prompt": SYSTEM_PROMPT, + "model": model, + "tools": [http_request, calculator, current_time], + } + + if session_id and SESSION_BUCKET: + kwargs["session_manager"] = S3SessionManager( + session_id=session_id, + bucket=SESSION_BUCKET, + region_name=REGION, + ) + + return Agent(**kwargs) + + +@app.entrypoint +async def invoke(payload=None): + """Stream agent response as SSE events.""" + if not payload: + yield {"status": "error", "error": "payload is required"} + return + + query = payload.get("content") or payload.get("prompt") + if not query: + yield {"status": "error", "error": "content or prompt is required"} + return + + session_id = payload.get("sessionId") + logger.info("Processing query: %s (session: %s)", query[:100], session_id) + + agent = _create_agent(session_id) + + async for event in agent.stream_async(query): + if "data" in event: + yield {"data": event["data"]} + elif "result" in event: + result = event["result"] + yield { + "result": { + "stop_reason": str(result.stop_reason), + "message": result.message, + }, + } + + +if __name__ == "__main__": + app.run() diff --git a/appsync-events-lambda-agentcore-cdk/agents/chat/requirements.txt b/appsync-events-lambda-agentcore-cdk/agents/chat/requirements.txt new file mode 100644 index 000000000..f128c9318 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/agents/chat/requirements.txt @@ -0,0 +1,4 @@ +strands-agents>=1.29.0 +strands-agents-tools>=0.2.22 +bedrock-agentcore>=1.4.4 +aws-opentelemetry-distro>=0.15.0 \ No newline at end of file diff --git a/appsync-events-lambda-agentcore-cdk/app.py b/appsync-events-lambda-agentcore-cdk/app.py new file mode 100644 index 000000000..8cbd5e077 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/app.py @@ -0,0 +1,27 @@ +#!/usr/bin/env 
python3 +"""CDK app entrypoint for the AppSync Events + Lambda + AgentCore stack.""" +import os + +import aws_cdk as cdk +from cdk_nag import AwsSolutionsChecks + +from cdk.stack import ChatStack + + +app = cdk.App() + +stack_name = app.node.try_get_context("stack_name") or "AppsyncLambdaAgentcore" + +region = os.environ.get("AWS_REGION") +if not region: + raise EnvironmentError("AWS_REGION environment variable must be set") + +ChatStack( + app, + stack_name, + env=cdk.Environment(region=region), +) + +cdk.Aspects.of(app).add(AwsSolutionsChecks(verbose=True)) + +app.synth() diff --git a/appsync-events-lambda-agentcore-cdk/cdk.json b/appsync-events-lambda-agentcore-cdk/cdk.json new file mode 100644 index 000000000..0dc343d19 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/cdk.json @@ -0,0 +1,102 @@ +{ + "app": "python3 app.py", + "watch": { + "include": [ + "**" + ], + "exclude": [ + "README.md", + "cdk*.json", + "requirements*.txt", + "source.bat", + "**/__init__.py", + "**/__pycache__", + "tests" + ] + }, + "context": { + "model_id": "anthropic.claude-haiku-4-5-20251001-v1:0", + "@aws-cdk/aws-signer:signingProfileNamePassedToCfn": true, + "@aws-cdk/aws-ecs-patterns:secGroupsDisablesImplicitOpenListener": true, + "@aws-cdk/aws-lambda:recognizeLayerVersion": true, + "@aws-cdk/core:checkSecretUsage": true, + "@aws-cdk/core:target-partitions": [ + "aws", + "aws-cn" + ], + "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true, + "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true, + "@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true, + "@aws-cdk/aws-iam:minimizePolicies": true, + "@aws-cdk/core:validateSnapshotRemovalPolicy": true, + "@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true, + "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true, + "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true, + "@aws-cdk/aws-apigateway:disableCloudWatchRole": true, + "@aws-cdk/core:enablePartitionLiterals": true, + 
"@aws-cdk/aws-events:eventsTargetQueueSameAccount": true, + "@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true, + "@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true, + "@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true, + "@aws-cdk/aws-route53-patters:useCertificate": true, + "@aws-cdk/customresources:installLatestAwsSdkDefault": false, + "@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true, + "@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true, + "@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true, + "@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true, + "@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true, + "@aws-cdk/aws-redshift:columnId": true, + "@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true, + "@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true, + "@aws-cdk/aws-apigateway:requestValidatorUniqueId": true, + "@aws-cdk/aws-kms:aliasNameRef": true, + "@aws-cdk/aws-kms:applyImportedAliasPermissionsToPrincipal": true, + "@aws-cdk/aws-autoscaling:generateLaunchTemplateInsteadOfLaunchConfig": true, + "@aws-cdk/core:includePrefixInUniqueNameGeneration": true, + "@aws-cdk/aws-efs:denyAnonymousAccess": true, + "@aws-cdk/aws-opensearchservice:enableOpensearchMultiAzWithStandby": true, + "@aws-cdk/aws-lambda-nodejs:useLatestRuntimeVersion": true, + "@aws-cdk/aws-efs:mountTargetOrderInsensitiveLogicalId": true, + "@aws-cdk/aws-rds:auroraClusterChangeScopeOfInstanceParameterGroupWithEachParameters": true, + "@aws-cdk/aws-appsync:useArnForSourceApiAssociationIdentifier": true, + "@aws-cdk/aws-rds:preventRenderingDeprecatedCredentials": true, + "@aws-cdk/aws-codepipeline-actions:useNewDefaultBranchForCodeCommitSource": true, + "@aws-cdk/aws-cloudwatch-actions:changeLambdaPermissionLogicalIdForLambdaAction": true, + "@aws-cdk/aws-codepipeline:crossAccountKeysDefaultValueToFalse": true, + 
"@aws-cdk/aws-codepipeline:defaultPipelineTypeToV2": true, + "@aws-cdk/aws-kms:reduceCrossAccountRegionPolicyScope": true, + "@aws-cdk/aws-eks:nodegroupNameAttribute": true, + "@aws-cdk/aws-eks:useNativeOidcProvider": true, + "@aws-cdk/aws-ec2:ebsDefaultGp3Volume": true, + "@aws-cdk/aws-ecs:removeDefaultDeploymentAlarm": true, + "@aws-cdk/custom-resources:logApiResponseDataPropertyTrueDefault": false, + "@aws-cdk/aws-s3:keepNotificationInImportedBucket": false, + "@aws-cdk/core:explicitStackTags": true, + "@aws-cdk/aws-ecs:reduceEc2FargateCloudWatchPermissions": true, + "@aws-cdk/aws-dynamodb:resourcePolicyPerReplica": true, + "@aws-cdk/aws-ec2:ec2SumTImeoutEnabled": true, + "@aws-cdk/aws-appsync:appSyncGraphQLAPIScopeLambdaPermission": true, + "@aws-cdk/aws-rds:setCorrectValueForDatabaseInstanceReadReplicaInstanceResourceId": true, + "@aws-cdk/core:cfnIncludeRejectComplexResourceUpdateCreatePolicyIntrinsics": true, + "@aws-cdk/aws-lambda-nodejs:sdkV3ExcludeSmithyPackages": true, + "@aws-cdk/aws-stepfunctions-tasks:fixRunEcsTaskPolicy": true, + "@aws-cdk/aws-ec2:bastionHostUseAmazonLinux2023ByDefault": true, + "@aws-cdk/aws-route53-targets:userPoolDomainNameMethodWithoutCustomResource": true, + "@aws-cdk/aws-elasticloadbalancingV2:albDualstackWithoutPublicIpv4SecurityGroupRulesDefault": true, + "@aws-cdk/aws-iam:oidcRejectUnauthorizedConnections": true, + "@aws-cdk/core:enableAdditionalMetadataCollection": true, + "@aws-cdk/aws-lambda:createNewPoliciesWithAddToRolePolicy": false, + "@aws-cdk/aws-s3:setUniqueReplicationRoleName": true, + "@aws-cdk/aws-events:requireEventBusPolicySid": true, + "@aws-cdk/core:aspectPrioritiesMutating": true, + "@aws-cdk/aws-dynamodb:retainTableReplica": true, + "@aws-cdk/aws-stepfunctions:useDistributedMapResultWriterV2": true, + "@aws-cdk/s3-notifications:addS3TrustKeyPolicyForSnsSubscriptions": true, + "@aws-cdk/aws-ec2:requirePrivateSubnetsForEgressOnlyInternetGateway": true, + "@aws-cdk/aws-s3:publicAccessBlockedByDefault": true, 
+ "@aws-cdk/aws-lambda:useCdkManagedLogGroup": true, + "@aws-cdk/aws-elasticloadbalancingv2:networkLoadBalancerWithSecurityGroupByDefault": true, + "@aws-cdk/aws-ecs-patterns:uniqueTargetGroupId": true, + "@aws-cdk/aws-route53-patterns:useDistribution": true + } +} diff --git a/appsync-events-lambda-agentcore-cdk/cdk/__init__.py b/appsync-events-lambda-agentcore-cdk/cdk/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/appsync-events-lambda-agentcore-cdk/cdk/constructs/__init__.py b/appsync-events-lambda-agentcore-cdk/cdk/constructs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/appsync-events-lambda-agentcore-cdk/cdk/constructs/chat_agent.py b/appsync-events-lambda-agentcore-cdk/cdk/constructs/chat_agent.py new file mode 100644 index 000000000..877f42e14 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/cdk/constructs/chat_agent.py @@ -0,0 +1,263 @@ +"""CDK Construct for the chat agent — AgentCore runtime + S3 session bucket. + +Creates: +- S3 bucket for conversation session storage (Strands S3SessionManager) +- Docker image from agents/chat/ +- IAM role with Bedrock, ECR, CloudWatch, X-Ray, and S3 policies +- CfnRuntime with HTTP protocol and PUBLIC network +""" + +import json +from pathlib import Path + +from constructs import Construct +from aws_cdk import ( + CfnOutput, + RemovalPolicy, + Stack, + aws_bedrockagentcore as agentcore, + aws_ecr_assets as ecr_assets, + aws_iam as iam, + aws_s3 as s3, +) +from cdk_nag import NagSuppressions + +INFERENCE_PROFILES_FILE = Path(__file__).parent / "inference_profiles.json" + + +def resolve_inference_profile(model_id: str, region: str) -> str: + """Resolve a base model ID to its cross-region inference profile for the given region. + + Raises ValueError if the model or region is not found in the mapping. + """ + if not INFERENCE_PROFILES_FILE.exists(): + raise ValueError(f"Inference profiles mapping not found at {INFERENCE_PROFILES_FILE}. 
") + + with open(INFERENCE_PROFILES_FILE, encoding="utf-8") as f: + mappings = json.load(f) + + if model_id not in mappings: + available = ", ".join(sorted(mappings.keys())) or "(none)" + raise ValueError(f"Model '{model_id}' not found in inference profiles mapping. Available models: {available}. ") + + region_map = mappings[model_id] + if region not in region_map: + available = ", ".join(sorted(region_map.keys())) + raise ValueError(f"Model '{model_id}' does not support cross-region inference in '{region}'. Supported regions: {available}. ") + + return region_map[region] + + +class ChatAgentConstruct(Construct): + """Creates the chat agent runtime with session persistence.""" + + def __init__( + self, + scope: Construct, + construct_id: str, + *, + model_id: str, + environment_variables: dict = None, + **kwargs, + ) -> None: + super().__init__(scope, construct_id, **kwargs) + + stack = Stack.of(self) + + # Resolve base model ID to cross-region inference profile + inference_profile_id = resolve_inference_profile(model_id, stack.region) + + self.session_bucket = s3.Bucket( + self, + "SessionBucket", + removal_policy=RemovalPolicy.DESTROY, + auto_delete_objects=True, + encryption=s3.BucketEncryption.S3_MANAGED, + enforce_ssl=True, + ) + + agent_image = ecr_assets.DockerImageAsset( + self, + "AgentImage", + directory="agents", + file="chat/Dockerfile", + platform=ecr_assets.Platform.LINUX_ARM64, + build_args={"AWS_REGION": stack.region}, + exclude=["**/__pycache__", "**/*.pyc"], + ) + + self.runtime_role = iam.Role( + self, + "RuntimeRole", + assumed_by=iam.ServicePrincipal( + "bedrock-agentcore.amazonaws.com", + ), + inline_policies=self._build_policies( + stack, + agent_image, + self.session_bucket, + ), + ) + + merged_env = { + "BEDROCK_MODEL_ID": inference_profile_id, + "AWS_REGION": stack.region, + "SESSION_BUCKET": self.session_bucket.bucket_name, + **(environment_variables or {}), + } + + # AgentCore Runtime — L1 construct (no L2 available yet) + runtime_name = 
f"{stack.stack_name.replace('-', '_')}_chat_agent" + self.runtime = agentcore.CfnRuntime( + self, + "Runtime", + agent_runtime_name=runtime_name, + role_arn=self.runtime_role.role_arn, + agent_runtime_artifact=agentcore.CfnRuntime.AgentRuntimeArtifactProperty( + container_configuration=agentcore.CfnRuntime.ContainerConfigurationProperty( + container_uri=agent_image.image_uri, + ), + ), + network_configuration=agentcore.CfnRuntime.NetworkConfigurationProperty( + network_mode="PUBLIC", + ), + protocol_configuration="HTTP", + environment_variables=merged_env, + description="Chat agent runtime with session persistence", + ) + + CfnOutput(self, "RuntimeArn", value=self.runtime.attr_agent_runtime_arn) + CfnOutput(self, "RuntimeName", value=runtime_name) + CfnOutput( + self, + "SessionBucketName", + value=self.session_bucket.bucket_name, + ) + + NagSuppressions.add_resource_suppressions( + self.session_bucket, + [ + { + "id": "AwsSolutions-S1", + "reason": "Access logs not required for sample code.", + } + ], + ) + + NagSuppressions.add_resource_suppressions( + self.runtime_role, + [ + { + "id": "AwsSolutions-IAM5", + "reason": "Bedrock foundation model and inference profile ARNs require wildcards — the model is resolved at synth time from inference_profiles.json.", + "applies_to": [ + "Resource::arn:aws:bedrock:*::foundation-model/*", + f"Resource::arn:aws:bedrock:{stack.region}::inference-profile/*", + ], + }, + { + "id": "AwsSolutions-IAM5", + "reason": ( + "ecr:GetAuthorizationToken requires Resource::* (account-wide token, cannot be scoped). " + "X-Ray actions (PutTraceSegments, PutTelemetryRecords, GetSamplingRules, GetSamplingTargets) " + "do not support resource-level permissions per AWS documentation." 
+ ), + "applies_to": ["Resource::*"], + }, + { + "id": "AwsSolutions-IAM5", + "reason": "S3 session objects require wildcard suffix for read/write/delete operations.", + "applies_to": [ + "Resource::/*", + ], + }, + { + "id": "AwsSolutions-IAM5", + "reason": "AgentCore runtime log group name is determined at deploy time.", + "applies_to": [ + f"Resource::arn:aws:logs:{stack.region}::log-group:/aws/bedrock-agentcore/runtimes/*", + ], + }, + ], + ) + + @staticmethod + def _build_policies(stack, agent_image, session_bucket): + """Build IAM inline policies for the runtime.""" + return { + "Bedrock": iam.PolicyDocument( + statements=[ + iam.PolicyStatement( + actions=[ + "bedrock:InvokeModel", + "bedrock:InvokeModelWithResponseStream", + ], + resources=[ + "arn:aws:bedrock:*::foundation-model/*", + f"arn:aws:bedrock:{stack.region}:{stack.account}:inference-profile/*", + ], + ), + ], + ), + "ECR": iam.PolicyDocument( + statements=[ + iam.PolicyStatement( + actions=["ecr:GetAuthorizationToken"], + resources=["*"], + ), + iam.PolicyStatement( + actions=[ + "ecr:BatchGetImage", + "ecr:GetDownloadUrlForLayer", + "ecr:BatchCheckLayerAvailability", + ], + resources=[agent_image.repository.repository_arn], + ), + ], + ), + "S3Sessions": iam.PolicyDocument( + statements=[ + iam.PolicyStatement( + actions=[ + "s3:GetObject", + "s3:PutObject", + "s3:DeleteObject", + ], + resources=[f"{session_bucket.bucket_arn}/*"], + ), + iam.PolicyStatement( + actions=["s3:ListBucket"], + resources=[session_bucket.bucket_arn], + ), + ], + ), + "CloudWatchLogs": iam.PolicyDocument( + statements=[ + iam.PolicyStatement( + actions=[ + "logs:DescribeLogStreams", + "logs:CreateLogGroup", + "logs:DescribeLogGroups", + "logs:CreateLogStream", + "logs:PutLogEvents", + ], + resources=[ + f"arn:aws:logs:{stack.region}:{stack.account}:log-group:/aws/bedrock-agentcore/runtimes/*", + ], + ), + ], + ), + "XRay": iam.PolicyDocument( + statements=[ + iam.PolicyStatement( + actions=[ + "xray:PutTraceSegments", + 
"xray:PutTelemetryRecords", + "xray:GetSamplingRules", + "xray:GetSamplingTargets", + ], + resources=["*"], + ), + ], + ), + } diff --git a/appsync-events-lambda-agentcore-cdk/cdk/constructs/chat_service.py b/appsync-events-lambda-agentcore-cdk/cdk/constructs/chat_service.py new file mode 100644 index 000000000..271f17d2b --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/cdk/constructs/chat_service.py @@ -0,0 +1,129 @@ +"""CDK Construct for the AppSync Events API and its chat handler Lambda. + +Creates: +- AppSync Event API with API key auth +- Agent invoker Lambda (thin dispatcher) wired as a direct Lambda + integration on the ``chat`` channel namespace +- Responses namespace for outbound streaming (no handler) +""" + +from constructs import Construct +from aws_cdk import ( + CfnOutput, + Duration, + aws_appsync as appsync, + aws_lambda as lambda_, + aws_logs as logs, +) +from cdk_nag import NagSuppressions + +from cdk.constructs.standard_lambda import StandardLambda + + +class ChatServiceConstruct(Construct): + """AppSync Event API with an integrated chat handler Lambda.""" + + def __init__( + self, + scope: Construct, + construct_id: str, + *, + stream_relay_function: lambda_.IFunction, + **kwargs, + ) -> None: + super().__init__(scope, construct_id, **kwargs) + + api_key_provider = appsync.AppSyncAuthProvider( + authorization_type=appsync.AppSyncAuthorizationType.API_KEY, + ) + + self.api = appsync.EventApi( + self, + "EventApi", + api_name="ChatEventApi", + authorization_config=appsync.EventApiAuthConfig( + auth_providers=[api_key_provider], + connection_auth_mode_types=[ + appsync.AppSyncAuthorizationType.API_KEY, + ], + default_publish_auth_mode_types=[ + appsync.AppSyncAuthorizationType.API_KEY, + ], + default_subscribe_auth_mode_types=[ + appsync.AppSyncAuthorizationType.API_KEY, + ], + ), + log_config=appsync.AppSyncLogConfig( + field_log_level=appsync.AppSyncFieldLogLevel.INFO, + retention=logs.RetentionDays.ONE_WEEK, + ), + ) + + # Separate 
"responses" namespace for outbound streaming — avoids + # re-invocation loops where the Lambda's own publishes would + # re-trigger the invoker. + self.api.add_channel_namespace("responses") + + self.agent_invoker = StandardLambda( + self, + "AgentInvoker", + handler="index.handler", + code_path="functions/agent_invoker", + timeout=Duration.seconds(10), + environment={ + "STREAM_RELAY_ARN": stream_relay_function.function_arn, + }, + ) + + stream_relay_function.grant_invoke( + self.agent_invoker.function, + ) + + lambda_ds = self.api.add_lambda_data_source( + "AgentInvokerDS", + self.agent_invoker.function, + ) + + # "chat" namespace with direct Lambda integration — AppSync invokes + # the agent invoker Lambda synchronously on each publish. + self.api.add_channel_namespace( + "chat", + publish_handler_config=appsync.HandlerConfig( + data_source=lambda_ds, + direct=True, + lambda_invoke_type=(appsync.LambdaInvokeType.REQUEST_RESPONSE), + ), + ) + + CfnOutput( + self, + "EventApiHttpEndpoint", + value=self.api.http_dns, + ) + CfnOutput( + self, + "EventApiRealtimeEndpoint", + value=self.api.realtime_dns, + ) + # NOTE: The API key is output in plaintext for convenience in this + # sample. For production, store it in Secrets Manager or SSM + # SecureString and retrieve it at runtime instead. 
+ CfnOutput( + self, + "EventApiApiKey", + value=self.api.api_keys["Default"].attr_api_key, + ) + + NagSuppressions.add_resource_suppressions( + self.api, + [ + { + "id": "AwsSolutions-IAM4", + "reason": ("AWSAppSyncPushToCloudWatchLogs is the standard " "managed policy for AppSync API logging."), + "applies_to": [ + "Policy::arn::iam::aws:policy/" "service-role/AWSAppSyncPushToCloudWatchLogs", + ], + }, + ], + apply_to_children=True, + ) diff --git a/appsync-events-lambda-agentcore-cdk/cdk/constructs/inference_profiles.json b/appsync-events-lambda-agentcore-cdk/cdk/constructs/inference_profiles.json new file mode 100644 index 000000000..aedde985d --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/cdk/constructs/inference_profiles.json @@ -0,0 +1,153 @@ +{ + "anthropic.claude-sonnet-4-20250514-v1:0": { + "ap-northeast-1": "apac.anthropic.claude-sonnet-4-20250514-v1:0", + "ap-northeast-2": "apac.anthropic.claude-sonnet-4-20250514-v1:0", + "ap-northeast-3": "apac.anthropic.claude-sonnet-4-20250514-v1:0", + "ap-south-1": "apac.anthropic.claude-sonnet-4-20250514-v1:0", + "ap-south-2": "apac.anthropic.claude-sonnet-4-20250514-v1:0", + "ap-southeast-1": "apac.anthropic.claude-sonnet-4-20250514-v1:0", + "ap-southeast-2": "apac.anthropic.claude-sonnet-4-20250514-v1:0", + "ap-southeast-4": "apac.anthropic.claude-sonnet-4-20250514-v1:0", + "eu-central-1": "eu.anthropic.claude-sonnet-4-20250514-v1:0", + "eu-north-1": "eu.anthropic.claude-sonnet-4-20250514-v1:0", + "eu-south-1": "eu.anthropic.claude-sonnet-4-20250514-v1:0", + "eu-south-2": "eu.anthropic.claude-sonnet-4-20250514-v1:0", + "eu-west-1": "eu.anthropic.claude-sonnet-4-20250514-v1:0", + "eu-west-3": "eu.anthropic.claude-sonnet-4-20250514-v1:0", + "us-east-1": "us.anthropic.claude-sonnet-4-20250514-v1:0", + "us-east-2": "us.anthropic.claude-sonnet-4-20250514-v1:0", + "us-west-1": "us.anthropic.claude-sonnet-4-20250514-v1:0", + "us-west-2": "us.anthropic.claude-sonnet-4-20250514-v1:0" + }, + 
"anthropic.claude-haiku-4-5-20251001-v1:0": { + "ap-northeast-1": "jp.anthropic.claude-haiku-4-5-20251001-v1:0", + "ap-northeast-3": "jp.anthropic.claude-haiku-4-5-20251001-v1:0", + "ap-southeast-2": "au.anthropic.claude-haiku-4-5-20251001-v1:0", + "ap-southeast-4": "au.anthropic.claude-haiku-4-5-20251001-v1:0", + "ca-central-1": "us.anthropic.claude-haiku-4-5-20251001-v1:0", + "eu-central-1": "eu.anthropic.claude-haiku-4-5-20251001-v1:0", + "eu-north-1": "eu.anthropic.claude-haiku-4-5-20251001-v1:0", + "eu-south-1": "eu.anthropic.claude-haiku-4-5-20251001-v1:0", + "eu-south-2": "eu.anthropic.claude-haiku-4-5-20251001-v1:0", + "eu-west-1": "eu.anthropic.claude-haiku-4-5-20251001-v1:0", + "eu-west-2": "eu.anthropic.claude-haiku-4-5-20251001-v1:0", + "eu-west-3": "eu.anthropic.claude-haiku-4-5-20251001-v1:0", + "us-east-1": "us.anthropic.claude-haiku-4-5-20251001-v1:0", + "us-east-2": "us.anthropic.claude-haiku-4-5-20251001-v1:0", + "us-west-1": "us.anthropic.claude-haiku-4-5-20251001-v1:0", + "us-west-2": "us.anthropic.claude-haiku-4-5-20251001-v1:0" + }, + "anthropic.claude-sonnet-4-6": { + "ap-northeast-1": "jp.anthropic.claude-sonnet-4-6", + "ap-northeast-3": "jp.anthropic.claude-sonnet-4-6", + "ap-southeast-2": "au.anthropic.claude-sonnet-4-6", + "ap-southeast-4": "au.anthropic.claude-sonnet-4-6", + "ca-central-1": "us.anthropic.claude-sonnet-4-6", + "eu-central-1": "eu.anthropic.claude-sonnet-4-6", + "eu-north-1": "eu.anthropic.claude-sonnet-4-6", + "eu-south-1": "eu.anthropic.claude-sonnet-4-6", + "eu-south-2": "eu.anthropic.claude-sonnet-4-6", + "eu-west-1": "eu.anthropic.claude-sonnet-4-6", + "eu-west-2": "eu.anthropic.claude-sonnet-4-6", + "eu-west-3": "eu.anthropic.claude-sonnet-4-6", + "us-east-1": "us.anthropic.claude-sonnet-4-6", + "us-east-2": "us.anthropic.claude-sonnet-4-6", + "us-west-1": "us.anthropic.claude-sonnet-4-6", + "us-west-2": "us.anthropic.claude-sonnet-4-6" + }, + "anthropic.claude-sonnet-4-5-20250929-v1:0": { + "ap-northeast-1": 
"jp.anthropic.claude-sonnet-4-5-20250929-v1:0", + "ap-northeast-3": "jp.anthropic.claude-sonnet-4-5-20250929-v1:0", + "ap-southeast-2": "au.anthropic.claude-sonnet-4-5-20250929-v1:0", + "ap-southeast-4": "au.anthropic.claude-sonnet-4-5-20250929-v1:0", + "ca-central-1": "us.anthropic.claude-sonnet-4-5-20250929-v1:0", + "eu-central-1": "eu.anthropic.claude-sonnet-4-5-20250929-v1:0", + "eu-north-1": "eu.anthropic.claude-sonnet-4-5-20250929-v1:0", + "eu-south-1": "eu.anthropic.claude-sonnet-4-5-20250929-v1:0", + "eu-south-2": "eu.anthropic.claude-sonnet-4-5-20250929-v1:0", + "eu-west-1": "eu.anthropic.claude-sonnet-4-5-20250929-v1:0", + "eu-west-2": "eu.anthropic.claude-sonnet-4-5-20250929-v1:0", + "eu-west-3": "eu.anthropic.claude-sonnet-4-5-20250929-v1:0", + "us-east-1": "us.anthropic.claude-sonnet-4-5-20250929-v1:0", + "us-east-2": "us.anthropic.claude-sonnet-4-5-20250929-v1:0", + "us-west-1": "us.anthropic.claude-sonnet-4-5-20250929-v1:0", + "us-west-2": "us.anthropic.claude-sonnet-4-5-20250929-v1:0" + }, + "anthropic.claude-3-haiku-20240307-v1:0": { + "ap-northeast-1": "apac.anthropic.claude-3-haiku-20240307-v1:0", + "ap-northeast-2": "apac.anthropic.claude-3-haiku-20240307-v1:0", + "ap-south-1": "apac.anthropic.claude-3-haiku-20240307-v1:0", + "ap-southeast-1": "apac.anthropic.claude-3-haiku-20240307-v1:0", + "ap-southeast-2": "apac.anthropic.claude-3-haiku-20240307-v1:0", + "eu-central-1": "eu.anthropic.claude-3-haiku-20240307-v1:0", + "eu-west-1": "eu.anthropic.claude-3-haiku-20240307-v1:0", + "eu-west-3": "eu.anthropic.claude-3-haiku-20240307-v1:0", + "us-east-1": "us.anthropic.claude-3-haiku-20240307-v1:0", + "us-east-2": "us.anthropic.claude-3-haiku-20240307-v1:0", + "us-west-2": "us.anthropic.claude-3-haiku-20240307-v1:0" + }, + "amazon.nova-2-lite-v1:0": { + "ap-northeast-1": "jp.amazon.nova-2-lite-v1:0", + "ap-northeast-3": "jp.amazon.nova-2-lite-v1:0", + "ca-central-1": "us.amazon.nova-2-lite-v1:0", + "eu-central-1": "eu.amazon.nova-2-lite-v1:0", + 
"eu-north-1": "eu.amazon.nova-2-lite-v1:0", + "eu-south-1": "eu.amazon.nova-2-lite-v1:0", + "eu-south-2": "eu.amazon.nova-2-lite-v1:0", + "eu-west-1": "eu.amazon.nova-2-lite-v1:0", + "eu-west-3": "eu.amazon.nova-2-lite-v1:0", + "us-east-1": "us.amazon.nova-2-lite-v1:0", + "us-east-2": "us.amazon.nova-2-lite-v1:0", + "us-west-1": "us.amazon.nova-2-lite-v1:0", + "us-west-2": "us.amazon.nova-2-lite-v1:0" + }, + "amazon.nova-lite-v1:0": { + "ap-northeast-1": "apac.amazon.nova-lite-v1:0", + "ap-northeast-2": "apac.amazon.nova-lite-v1:0", + "ap-northeast-3": "apac.amazon.nova-lite-v1:0", + "ap-south-1": "apac.amazon.nova-lite-v1:0", + "ap-southeast-1": "apac.amazon.nova-lite-v1:0", + "ap-southeast-2": "apac.amazon.nova-lite-v1:0", + "ca-central-1": "ca.amazon.nova-lite-v1:0", + "ca-west-1": "ca.amazon.nova-lite-v1:0", + "eu-central-1": "eu.amazon.nova-lite-v1:0", + "eu-north-1": "eu.amazon.nova-lite-v1:0", + "eu-west-1": "eu.amazon.nova-lite-v1:0", + "eu-west-3": "eu.amazon.nova-lite-v1:0", + "us-east-1": "us.amazon.nova-lite-v1:0", + "us-east-2": "us.amazon.nova-lite-v1:0", + "us-west-1": "us.amazon.nova-lite-v1:0", + "us-west-2": "us.amazon.nova-lite-v1:0" + }, + "amazon.nova-micro-v1:0": { + "ap-northeast-1": "apac.amazon.nova-micro-v1:0", + "ap-northeast-2": "apac.amazon.nova-micro-v1:0", + "ap-northeast-3": "apac.amazon.nova-micro-v1:0", + "ap-south-1": "apac.amazon.nova-micro-v1:0", + "ap-southeast-1": "apac.amazon.nova-micro-v1:0", + "ap-southeast-2": "apac.amazon.nova-micro-v1:0", + "eu-central-1": "eu.amazon.nova-micro-v1:0", + "eu-north-1": "eu.amazon.nova-micro-v1:0", + "eu-west-1": "eu.amazon.nova-micro-v1:0", + "eu-west-3": "eu.amazon.nova-micro-v1:0", + "us-east-1": "us.amazon.nova-micro-v1:0", + "us-east-2": "us.amazon.nova-micro-v1:0", + "us-west-2": "us.amazon.nova-micro-v1:0" + }, + "amazon.nova-pro-v1:0": { + "ap-northeast-1": "apac.amazon.nova-pro-v1:0", + "ap-northeast-2": "apac.amazon.nova-pro-v1:0", + "ap-northeast-3": "apac.amazon.nova-pro-v1:0", 
+ "ap-south-1": "apac.amazon.nova-pro-v1:0", + "ap-southeast-1": "apac.amazon.nova-pro-v1:0", + "ap-southeast-2": "apac.amazon.nova-pro-v1:0", + "eu-central-1": "eu.amazon.nova-pro-v1:0", + "eu-north-1": "eu.amazon.nova-pro-v1:0", + "eu-west-1": "eu.amazon.nova-pro-v1:0", + "eu-west-3": "eu.amazon.nova-pro-v1:0", + "us-east-1": "us.amazon.nova-pro-v1:0", + "us-east-2": "us.amazon.nova-pro-v1:0", + "us-west-1": "us.amazon.nova-pro-v1:0", + "us-west-2": "us.amazon.nova-pro-v1:0" + } +} diff --git a/appsync-events-lambda-agentcore-cdk/cdk/constructs/standard_lambda.py b/appsync-events-lambda-agentcore-cdk/cdk/constructs/standard_lambda.py new file mode 100644 index 000000000..e067d44eb --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/cdk/constructs/standard_lambda.py @@ -0,0 +1,166 @@ +"""Reusable Lambda blueprint construct with Powertools, X-Ray, and log groups.""" + +from pathlib import Path +from aws_cdk import ( + aws_lambda as lambda_, + aws_logs as logs, + BundlingOptions, + Duration, + RemovalPolicy, + Stack, +) +from constructs import Construct +from cdk_nag import NagSuppressions + + +class StandardLambda(Construct): + """ + A reusable CDK Construct that provides a standardized blueprint for creating + AWS Lambda functions — similar to the Global section of an AWS SAM template. + + This construct automatically includes: + - A dedicated CloudWatch Log Group with configurable retention and removal policies, + along with IAM permissions for the Lambda function to write logs. + - The AWS Lambda Powertools for Python V3 layer (managed by AWS), pre-configured + with POWERTOOLS_SERVICE_NAME and LOG_LEVEL environment variables. + - X-Ray active tracing enabled by default. + - Container-based dependency bundling when a requirements.txt file is detected. + + All default settings can be overridden per function. Environment variables and + layers are merged (not replaced) so that Powertools config is always preserved + unless explicitly overridden. 
+ + The function and its execution role are exposed as public attributes (self.function + and self.role) so consumers can grant additional IAM permissions after creation. + """ + + def __init__( + self, + scope: Construct, + construct_id: str, + handler: str, + code_path: str, + runtime: lambda_.Runtime = lambda_.Runtime.PYTHON_3_14, + log_retention: logs.RetentionDays = logs.RetentionDays.ONE_WEEK, + log_removal_policy: RemovalPolicy = RemovalPolicy.DESTROY, + **kwargs, + ) -> None: + super().__init__(scope, construct_id) + + stack = Stack.of(self) + + code = self._build_code(code_path, runtime) + + log_group = logs.LogGroup( + self, + "LogGroup", + retention=log_retention, + removal_policy=log_removal_policy, + ) + + # Default to ARM_64 for better price/performance + architecture = kwargs.pop("architecture", lambda_.Architecture.ARM_64) + + # Resolve Powertools layer ARN from architecture + runtime. + # Compare on .name because CDK Architecture objects are not singletons. + arch_suffix = "arm64" if architecture.name == "arm64" else "x86_64" + runtime_suffix = runtime.name.replace(".", "") + + powertools_layer = lambda_.LayerVersion.from_layer_version_arn( + self, + "PowertoolsLayer", + f"arn:aws:lambda:{stack.region}:017000801446:layer:AWSLambdaPowertoolsPythonV3-{runtime_suffix}-{arch_suffix}:27", + ) + + defaults = { + "runtime": runtime, + "architecture": architecture, + "timeout": Duration.seconds(30), + "memory_size": 256, + "layers": [powertools_layer], + "environment": { + "POWERTOOLS_SERVICE_NAME": construct_id, + "LOG_LEVEL": "DEBUG", + }, + "tracing": lambda_.Tracing.ACTIVE, + "log_group": log_group, + } + + # Pop layers and environment before the general merge so we can + # combine them additively rather than replace the defaults. 
+ user_layers = kwargs.pop("layers", None) + user_environment = kwargs.pop("environment", None) + + merged_config = {**defaults, **kwargs} + + if user_layers is not None: + merged_config["layers"] = defaults.get("layers", []) + user_layers + + if user_environment is not None: + merged_config["environment"] = { + **defaults.get("environment", {}), + **user_environment, + } + + self.function = lambda_.Function(self, "Function", handler=handler, code=code, **merged_config) + self.function_arn = self.function.function_arn + self.function_name = self.function.function_name + + log_group.grant_write(self.function) + self.role = self.function.role + + NagSuppressions.add_resource_suppressions( + self.function, + [ + { + "id": "AwsSolutions-IAM4", + "reason": "AWSLambdaBasicExecutionRole is the standard managed policy for Lambda logging.", + "applies_to": [ + "Policy::arn::iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + ], + }, + ], + apply_to_children=True, + ) + + def _build_code(self, code_path: str, runtime: lambda_.Runtime) -> lambda_.Code: + """Build the Lambda deployment package. + + If the code directory has a requirements.txt with real dependencies, + uses container bundling to pip install them into the deployment zip. + Otherwise packages the code directory as-is. + """ + code_dir = Path(code_path) + requirements_file = code_dir / "requirements.txt" + + if requirements_file.exists() and self._has_dependencies(requirements_file): + return lambda_.Code.from_asset( + code_path, + bundling=BundlingOptions( + image=runtime.bundling_image, + command=[ + "bash", + "-c", + " && ".join( + [ + "pip install -r requirements.txt -t /asset-output/", + "cp -r . 
/asset-output", + ] + ), + ], + ), + ) + + return lambda_.Code.from_asset(code_path) + + def _has_dependencies(self, requirements_file: Path) -> bool: + """Check if a requirements.txt contains actual package dependencies (not just comments).""" + try: + with open(requirements_file, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line and not line.startswith("#"): + return True + return False + except Exception: + return False diff --git a/appsync-events-lambda-agentcore-cdk/cdk/constructs/stream_relay.py b/appsync-events-lambda-agentcore-cdk/cdk/constructs/stream_relay.py new file mode 100644 index 000000000..0b7de9218 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/cdk/constructs/stream_relay.py @@ -0,0 +1,49 @@ +"""CDK Construct for the stream relay Lambda. + +Consumes the SSE stream from AgentCore Runtime and publishes +chunks to AppSync Events. Invoked asynchronously by the agent invoker. +""" + +from aws_cdk import ( + Duration, + aws_iam as iam, +) +from constructs import Construct + +from cdk.constructs.standard_lambda import StandardLambda + + +class StreamRelayConstruct(Construct): + """Creates the stream relay Lambda that bridges AgentCore to AppSync Events.""" + + def __init__( + self, + scope: Construct, + construct_id: str, + *, + agent_runtime_arn: str, + **kwargs, + ) -> None: + super().__init__(scope, construct_id, **kwargs) + + self.standard_lambda = StandardLambda( + self, + "Lambda", + handler="index.handler", + code_path="functions/stream_relay", + timeout=Duration.minutes(5), + environment={ + "AGENT_RUNTIME_ARN": agent_runtime_arn, + }, + ) + + # Grant permission to invoke AgentCore runtime + self.standard_lambda.function.add_to_role_policy( + iam.PolicyStatement( + actions=["bedrock-agentcore:InvokeAgentRuntime"], + resources=[ + agent_runtime_arn, + f"{agent_runtime_arn}/*", + ], + ), + ) diff --git a/appsync-events-lambda-agentcore-cdk/cdk/stack.py b/appsync-events-lambda-agentcore-cdk/cdk/stack.py new file 
mode 100644 index 000000000..91d57d193 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/cdk/stack.py @@ -0,0 +1,121 @@ +"""Main CDK stack for the AppSync Events + Lambda + AgentCore architecture.""" + +from aws_cdk import Stack +from cdk_nag import NagSuppressions +from constructs import Construct + +from cdk.constructs.chat_service import ChatServiceConstruct +from cdk.constructs.chat_agent import ChatAgentConstruct +from cdk.constructs.stream_relay import StreamRelayConstruct + + +class ChatStack(Stack): + """AppSync Events chat stack with Lambda invoker and AgentCore runtime.""" + + def __init__( + self, + scope: Construct, + construct_id: str, + **kwargs, + ) -> None: + super().__init__(scope, construct_id, **kwargs) + + self.chat_agent = ChatAgentConstruct( + self, + "ChatAgent", + model_id=self.node.try_get_context("model_id"), + ) + + self.stream_relay = StreamRelayConstruct( + self, + "StreamRelay", + agent_runtime_arn=(self.chat_agent.runtime.attr_agent_runtime_arn), + ) + + self.chat_service = ChatServiceConstruct( + self, + "ChatService", + stream_relay_function=(self.stream_relay.standard_lambda.function), + ) + + # Pass AppSync endpoint and API key to the stream relay so it + # can publish response chunks back to the client. 
+ self.stream_relay.standard_lambda.function.add_environment( + "APPSYNC_HTTP_ENDPOINT", + self.chat_service.api.http_dns, + ) + self.stream_relay.standard_lambda.function.add_environment( + "APPSYNC_API_KEY", + self.chat_service.api.api_keys["Default"].attr_api_key, + ) + + self._add_nag_suppressions() + + def _add_nag_suppressions(self): + """Add cdk-nag suppressions for CDK-managed resources.""" + + for child in self.node.children: + if child.node.id.startswith("LogRetention"): + NagSuppressions.add_resource_suppressions( + child, + [ + { + "id": "AwsSolutions-IAM4", + "reason": ("CDK LogRetention custom resource " "uses AWS managed policy."), + "applies_to": [ + "Policy::arn::iam::aws:" + "policy/service-role/" + "AWSLambdaBasicExecutionRole", + ], + }, + { + "id": "AwsSolutions-IAM5", + "reason": ("CDK LogRetention custom resource " + "requires wildcard to set retention " + "on any log group."), + "applies_to": ["Resource::*"], + }, + ], + apply_to_children=True, + ) + + NagSuppressions.add_resource_suppressions( + self.chat_service, + [ + { + "id": "AwsSolutions-IAM5", + "reason": ("grant_invoke appends :* to allow invocation " "of any version/alias of the target Lambda."), + "applies_to": [ + "Resource:::*", + ], + }, + { + "id": "AwsSolutions-IAM5", + "reason": ("AppSync data source grant_invoke appends :* " "for Lambda version/alias invocations."), + "applies_to": [ + "Resource:::*", + ], + }, + { + "id": "AwsSolutions-IAM5", + "reason": ("X-Ray actions do not support " + "resource-level permissions."), + "applies_to": ["Resource::*"], + }, + ], + apply_to_children=True, + ) + + NagSuppressions.add_resource_suppressions( + self.stream_relay, + [ + { + "id": "AwsSolutions-IAM5", + "reason": ("AgentCore runtime ARN requires /* suffix " "for endpoint invocation."), + "applies_to": [ + "Resource::/*", + ], + }, + { + "id": "AwsSolutions-IAM5", + "reason": ("X-Ray actions do not support " "resource-level permissions."), + "applies_to": ["Resource::*"], + }, + 
], + apply_to_children=True, + ) diff --git a/appsync-events-lambda-agentcore-cdk/example-pattern.json b/appsync-events-lambda-agentcore-cdk/example-pattern.json new file mode 100644 index 000000000..dec0a254f --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/example-pattern.json @@ -0,0 +1,61 @@ +{ + "title": "Real-time streaming chat with AppSync Events, Lambda, and Bedrock AgentCore", + "description": "Stream AI agent responses in real time using AppSync Events WebSockets with a two-phase Lambda architecture and Bedrock AgentCore Runtime.", + "language": "Python", + "level": "300", + "framework": "AWS CDK", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern deploys a real-time streaming chat service using AWS AppSync Events with a two-phase Lambda architecture to invoke a Strands agent running on Amazon Bedrock AgentCore Runtime.", + "The client publishes messages to an inbound channel via HTTP. AppSync Events triggers an agent invoker Lambda via direct Lambda integration (synchronous only), which asynchronously invokes a separate stream relay Lambda and returns immediately. Two Lambdas are needed because AppSync Events does not support async Lambda invocation, so the invoker must return quickly. The stream relay calls the AgentCore Runtime, consumes the Server-Sent Events (SSE) stream, and publishes response chunks back to a separate response channel.", + "The client receives agent response tokens in real time via a WebSocket subscription. Separate channel namespaces for inbound and outbound traffic prevent re-invocation loops." 
+ ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/appsync-events-lambda-agentcore-cdk", + "templateURL": "serverless-patterns/appsync-events-lambda-agentcore-cdk", + "projectFolder": "appsync-events-lambda-agentcore-cdk", + "templateFile": "cdk/stack.py" + } + }, + "resources": { + "bullets": [ + { + "text": "AppSync Events API", + "link": "https://docs.aws.amazon.com/appsync/latest/eventapi/event-api-welcome.html" + }, + { + "text": "Amazon Bedrock AgentCore", + "link": "https://docs.aws.amazon.com/bedrock/latest/userguide/agentcore.html" + } + ] + }, + "deploy": { + "text": [ + "python -m venv .venv && source .venv/bin/activate", + "pip install -r requirements.txt", + "cdk bootstrap", + "cdk deploy" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions using the AppSync Pub/Sub Editor and automated tests." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: cdk destroy." + ] + }, + "authors": [ + { + "name": "Pete Davis", + "image": "https://github.com/pjdavis-aws.png", + "bio": "Senior Partner Solution Architect at AWS", + "linkedin": "peter-davis-2676585" + } + ] +} diff --git a/appsync-events-lambda-agentcore-cdk/functions/agent_invoker/index.py b/appsync-events-lambda-agentcore-cdk/functions/agent_invoker/index.py new file mode 100644 index 000000000..63d599e60 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/functions/agent_invoker/index.py @@ -0,0 +1,91 @@ +"""Agent invoker — thin dispatcher for AppSync Events direct Lambda integration. + +Receives chat events, invokes the stream relay Lambda asynchronously, +and returns immediately so AppSync gets a fast response. + +Requires sessionId in the message payload — used for both session +persistence and channel isolation. 
+""" + +import json +import os + +import boto3 +from aws_lambda_powertools import Logger, Tracer + +logger = Logger() +tracer = Tracer() + +lambda_client = boto3.client("lambda") + +STREAM_RELAY_ARN = os.environ["STREAM_RELAY_ARN"] + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def handler(event: dict, context) -> dict: + """Handle incoming chat messages from AppSync Events.""" + logger.info("Received event", extra={"event": event}) + + incoming_events = event.get("events", []) + channel = event.get("info", {}).get("channel", {}).get("path", "/chat/default") + results = [] + + for e in incoming_events: + payload = e.get("payload", {}) + event_id = e.get("id") + + message = payload.get("message") + if not message or not str(message).strip(): + results.append( + { + "id": event_id, + "payload": { + "error": "message is required and cannot be empty", + }, + } + ) + continue + + session_id = payload.get("sessionId") + if not session_id or not str(session_id).strip(): + results.append( + { + "id": event_id, + "payload": { + "error": "sessionId is required and cannot be empty", + }, + } + ) + continue + + relay_payload = { + "content": payload.get("message", ""), + "channel": f"/responses{channel}", + "eventId": event_id, + "sessionId": session_id, + } + + logger.info( + "Invoking stream relay", + extra={ + "event_id": event_id, + "channel": channel, + "session_id": session_id, + }, + ) + + lambda_client.invoke( + FunctionName=STREAM_RELAY_ARN, + InvocationType="Event", + Payload=json.dumps(relay_payload).encode(), + ) + + results.append( + { + "id": event_id, + "payload": {**payload, "sessionId": session_id}, + } + ) + + return {"events": results} diff --git a/appsync-events-lambda-agentcore-cdk/functions/agent_invoker/requirements.txt b/appsync-events-lambda-agentcore-cdk/functions/agent_invoker/requirements.txt new file mode 100644 index 000000000..61b2360d8 --- /dev/null +++ 
b/appsync-events-lambda-agentcore-cdk/functions/agent_invoker/requirements.txt @@ -0,0 +1 @@ +# Dependencies provided by Lambda runtime (boto3) and Powertools layer diff --git a/appsync-events-lambda-agentcore-cdk/functions/stream_relay/index.py b/appsync-events-lambda-agentcore-cdk/functions/stream_relay/index.py new file mode 100644 index 000000000..a369feb49 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/functions/stream_relay/index.py @@ -0,0 +1,175 @@ +"""Stream relay — consumes SSE stream from AgentCore and publishes chunks to AppSync Events. + +Invoked asynchronously by the agent_invoker Lambda. Has up to 15 minutes +to consume the full agent response stream. + +Flow: +1. Receives agent runtime ARN, channel, and event context +2. Calls invoke_agent_runtime and consumes the SSE stream +3. Publishes each chunk to the AppSync Events channel +4. Publishes a completion event when the stream ends +""" + +import json +import os +import urllib.request + +import boto3 +from aws_lambda_powertools import Logger, Tracer + +logger = Logger() +tracer = Tracer() + +agentcore_client = boto3.client("bedrock-agentcore") + +APPSYNC_HTTP_ENDPOINT = os.environ["APPSYNC_HTTP_ENDPOINT"] +APPSYNC_API_KEY = os.environ["APPSYNC_API_KEY"] +AGENT_RUNTIME_ARN = os.environ["AGENT_RUNTIME_ARN"] + + +def _publish_to_channel(channel: str, event: dict): + """Publish an event to an AppSync Events channel via HTTP.""" + url = f"https://{APPSYNC_HTTP_ENDPOINT}/event" + body = json.dumps( + { + "channel": channel, + "events": [json.dumps(event)], + } + ).encode() + + req = urllib.request.Request( + url, + data=body, + method="POST", + headers={ + "Content-Type": "application/json", + "x-api-key": APPSYNC_API_KEY, + }, + ) + try: + with urllib.request.urlopen(req) as resp: + logger.debug("Published to %s: %s", channel, resp.status) + except Exception: + logger.exception("Failed to publish to %s", channel) + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def 
handler(event: dict, context) -> dict: + """Consume agent SSE stream and relay chunks to AppSync Events.""" + channel = event["channel"] + event_id = event["eventId"] + content = event["content"] + session_id = event["sessionId"] + + logger.info( + "Starting stream relay", + extra={ + "channel": channel, + "event_id": event_id, + "session_id": session_id, + }, + ) + + payload = json.dumps( + { + "content": content, + "sessionId": session_id, + } + ).encode() + response = agentcore_client.invoke_agent_runtime( + agentRuntimeArn=AGENT_RUNTIME_ARN, + payload=payload, + ) + + content_type = response.get("contentType", "") + sequence = 0 + full_response = "" + current_chunk = "" + + if "text/event-stream" in content_type: + # Streaming SSE response — consume line by line + for line in response["response"].iter_lines(chunk_size=1024): + if not line: + continue + decoded = line.decode("utf-8") + if not decoded.startswith("data: "): + continue + + data_str = decoded[6:] # strip "data: " prefix + try: + data = json.loads(data_str) + except json.JSONDecodeError: + # Raw text chunk + current_chunk += data_str + full_response += data_str + continue + + # Skip non-dict events (shouldn't happen but be safe) + if not isinstance(data, dict): + logger.debug("Non-dict SSE event", extra={"data": data}) + continue + + # Skip Strands SDK control events + if any( + k in data + for k in ( + "init_event_loop", + "start", + "start_event_loop", + "force_stop", + "complete", + ) + ): + logger.debug("Control event", extra={"data": data}) + continue + + # Strands streams text tokens in the "data" key + text = data.get("data", "") + if isinstance(text, str) and text: + current_chunk += text + full_response += text + + if len(current_chunk) >= 50 or text.endswith((".", "!", "?", "\n")): + _publish_to_channel( + channel, + { + "type": "chunk", + "sequence": sequence, + "content": current_chunk, + "eventId": event_id, + }, + ) + sequence += 1 + current_chunk = "" + else: + # Non-streaming JSON 
response + raw = response["response"].read().decode("utf-8") + try: + body = json.loads(raw) + full_response = body.get("message", raw) + except json.JSONDecodeError: + full_response = raw + + # Publish completion event (includes any remaining chunk content) + _publish_to_channel( + channel, + { + "type": "complete", + "sequence": sequence, + "content": current_chunk, + "response": full_response, + "eventId": event_id, + }, + ) + + logger.info( + "Stream relay complete", + extra={ + "event_id": event_id, + "chunks_sent": sequence + 1, + "response_length": len(full_response), + }, + ) + + return {"status": "success", "chunks_sent": sequence + 1} diff --git a/appsync-events-lambda-agentcore-cdk/functions/stream_relay/requirements.txt b/appsync-events-lambda-agentcore-cdk/functions/stream_relay/requirements.txt new file mode 100644 index 000000000..61b2360d8 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/functions/stream_relay/requirements.txt @@ -0,0 +1 @@ +# Dependencies provided by Lambda runtime (boto3) and Powertools layer diff --git a/appsync-events-lambda-agentcore-cdk/images/appsync-pubsub-publish.jpg b/appsync-events-lambda-agentcore-cdk/images/appsync-pubsub-publish.jpg new file mode 100644 index 000000000..d58091df8 Binary files /dev/null and b/appsync-events-lambda-agentcore-cdk/images/appsync-pubsub-publish.jpg differ diff --git a/appsync-events-lambda-agentcore-cdk/images/appsync-pubsub-subscribe-result.jpg b/appsync-events-lambda-agentcore-cdk/images/appsync-pubsub-subscribe-result.jpg new file mode 100644 index 000000000..b874049a8 Binary files /dev/null and b/appsync-events-lambda-agentcore-cdk/images/appsync-pubsub-subscribe-result.jpg differ diff --git a/appsync-events-lambda-agentcore-cdk/images/appsync-pubsub-subscribe.jpg b/appsync-events-lambda-agentcore-cdk/images/appsync-pubsub-subscribe.jpg new file mode 100644 index 000000000..13a3fef01 Binary files /dev/null and 
b/appsync-events-lambda-agentcore-cdk/images/appsync-pubsub-subscribe.jpg differ diff --git a/appsync-events-lambda-agentcore-cdk/images/architecture.drawio b/appsync-events-lambda-agentcore-cdk/images/architecture.drawio new file mode 100644 index 000000000..7a1797ac3 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/images/architecture.drawio @@ -0,0 +1,67 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/appsync-events-lambda-agentcore-cdk/images/architecture.png b/appsync-events-lambda-agentcore-cdk/images/architecture.png new file mode 100644 index 000000000..818b12d72 Binary files /dev/null and b/appsync-events-lambda-agentcore-cdk/images/architecture.png differ diff --git a/appsync-events-lambda-agentcore-cdk/mise-tasks/bedrock/models/generate b/appsync-events-lambda-agentcore-cdk/mise-tasks/bedrock/models/generate new file mode 100755 index 000000000..eb297da46 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/mise-tasks/bedrock/models/generate @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +#MISE description="Refresh cross-region inference profile mapping for a model" +#USAGE arg "" help="Base model ID (e.g. anthropic.claude-haiku-4-5-20251001-v1:0)" + +"""Query all standard AWS regions for cross-region inference profiles +matching the given model ID and upsert the mapping into +cdk/constructs/inference_profiles.json. 
+""" + +import json +import os +import re +import sys + +from pathlib import Path + +import boto3 + +MAPPING_FILE = Path(os.environ.get("MISE_PROJECT_DIR", ".")) / "cdk" / "constructs" / "inference_profiles.json" + +REGIONS = [ + "us-east-1", + "us-east-2", + "us-west-1", + "us-west-2", + "eu-west-1", + "eu-west-2", + "eu-west-3", + "eu-central-1", + "eu-north-1", + "ap-southeast-1", + "ap-southeast-2", + "ap-northeast-1", + "ap-northeast-2", + "ap-northeast-3", + "ap-south-1", + "ca-central-1", + "sa-east-1", +] + +MODEL_ID_PATTERN = re.compile(r"^[a-z][a-z0-9-]+\.[a-z][a-z0-9._-]+(:\d+)?$") + + +def validate_model_id(model_id: str) -> None: + if not MODEL_ID_PATTERN.match(model_id): + print(f"Error: '{model_id}' is not a valid model ID.", file=sys.stderr) + print("Expected format: provider.model-name-version:N", file=sys.stderr) + print("Example: anthropic.claude-haiku-4-5-20251001-v1:0", file=sys.stderr) + sys.exit(1) + + +def find_profiles_in_region(region: str, model_id: str) -> dict[str, str]: + """Query a region for cross-region inference profiles containing the model. + + Returns {covered_region: profile_id} for each region in the profile's models array. 
+ """ + client = boto3.client("bedrock", region_name=region) + region_map = {} + + try: + paginator = client.get_paginator("list_inference_profiles") + for page in paginator.paginate(typeEquals="SYSTEM_DEFINED"): + for profile in page.get("inferenceProfileSummaries", []): + profile_id = profile["inferenceProfileId"] + + # Match profiles ending with the model ID, skip global profiles + if not profile_id.endswith(model_id): + continue + if profile_id.startswith("global."): + continue + + # Extract covered regions from the models array + for model_ref in profile.get("models", []): + arn = model_ref.get("modelArn", "") + arn_parts = arn.split(":") + if len(arn_parts) >= 4 and arn_parts[3]: + region_map[arn_parts[3]] = profile_id + + except Exception as e: + print(f" Warning: could not query {region}: {e}", file=sys.stderr) + + return region_map + + +def main(): + model_id = os.environ.get("usage_model_id", "").strip() + if not model_id: + print("Error: model_id argument is required.", file=sys.stderr) + sys.exit(1) + + validate_model_id(model_id) + print(f"Refreshing inference profiles for: {model_id}") + + all_regions: dict[str, str] = {} + + for region in REGIONS: + print(f" Querying {region}...", end="", flush=True) + region_map = find_profiles_in_region(region, model_id) + new_count = len(set(region_map.keys()) - set(all_regions.keys())) + all_regions.update(region_map) + + if region_map: + print(f" found {new_count} new region(s)") + else: + print(" no profiles") + + if not all_regions: + print(f"\nError: No cross-region inference profiles found for '{model_id}'.", file=sys.stderr) + print("Check the model ID is correct and supports cross-region inference.", file=sys.stderr) + sys.exit(1) + + sorted_regions = dict(sorted(all_regions.items())) + + # Load existing mapping or start fresh + if MAPPING_FILE.exists(): + with open(MAPPING_FILE, encoding="utf-8") as f: + mappings = json.load(f) + else: + mappings = {} + + mappings[model_id] = sorted_regions + + 
MAPPING_FILE.parent.mkdir(parents=True, exist_ok=True)
+    with open(MAPPING_FILE, "w", encoding="utf-8") as f:
+        json.dump(mappings, f, indent=2)
+        f.write("\n")
+
+    print(f"\nUpdated {MAPPING_FILE}")
+    print(f"Regions mapped for {model_id}:")
+    for r, pid in sorted_regions.items():
+        print(f"  {r} -> {pid}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/appsync-events-lambda-agentcore-cdk/mise-tasks/bedrock/models/refresh b/appsync-events-lambda-agentcore-cdk/mise-tasks/bedrock/models/refresh
new file mode 100755
index 000000000..08c4ca4bd
--- /dev/null
+++ b/appsync-events-lambda-agentcore-cdk/mise-tasks/bedrock/models/refresh
@@ -0,0 +1,50 @@
+#!/usr/bin/env python3
+#MISE description="Refresh inference profiles for all models in the mapping file"
+
+"""Reads model IDs from cdk/constructs/inference_profiles.json and
+runs 'mise run bedrock:models:generate' for each one.
+"""
+
+import json
+import os
+import subprocess
+import sys
+from pathlib import Path
+
+MAPPING_FILE = Path(os.environ.get("MISE_PROJECT_DIR", ".")) / "cdk" / "constructs" / "inference_profiles.json"
+
+
+def main():
+    if not MAPPING_FILE.exists():
+        print(f"Error: {MAPPING_FILE} not found. 
Run 'mise run bedrock:models:generate ' first.", file=sys.stderr) + sys.exit(1) + + with open(MAPPING_FILE) as f: + mappings = json.load(f) + + if not mappings: + print("No models found in mapping file.", file=sys.stderr) + sys.exit(1) + + model_ids = sorted(mappings.keys()) + print(f"Refreshing {len(model_ids)} model(s):\n") + + failed = [] + for model_id in model_ids: + print(f"{'=' * 60}") + print(f" {model_id}") + print(f"{'=' * 60}") + result = subprocess.run(["mise", "run", "bedrock:models:generate", model_id]) + if result.returncode != 0: + failed.append(model_id) + print() + + if failed: + print(f"\nFailed to refresh: {', '.join(failed)}", file=sys.stderr) + sys.exit(1) + + print(f"All {len(model_ids)} model(s) refreshed successfully.") + + +if __name__ == "__main__": + main() diff --git a/appsync-events-lambda-agentcore-cdk/mise.toml b/appsync-events-lambda-agentcore-cdk/mise.toml new file mode 100644 index 000000000..cd8e4ea0b --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/mise.toml @@ -0,0 +1,111 @@ +[env] +_.python.venv = { path = ".venv", create = true } +AWS_REGION = "eu-west-1" +STACK_NAME = "AppsyncLambdaAgentcore" + +[tools] +node = "22" +python = "3.14" +uv = "latest" +"npm:aws-cdk" = "latest" + +[tasks.init] +description = "Initialise the environment" +run = [ + "for f in functions/*/requirements.txt agents/*/requirements.txt requirements.txt requirements-dev.txt; do [ -f \"$f\" ] && uv pip install -r \"$f\"; done", + "cdk bootstrap" +] +run_windows = [ + "uv pip install -r requirements.txt", + "uv pip install -r requirements-dev.txt", + "cdk bootstrap" +] + +[tasks.clean] +description = "Remove build artifacts and dependencies" +run = [ + "rm -rf .venv cdk.out", + "find . 
-type d \\( -name '__pycache__' -o -name '.pytest_cache' \\) -exec rm -rf {} +" +] +run_windows = "echo Not implemented" + +[tasks."cdk:synth"] +description = "Synthesize CDK Stack" +run = "cdk synth -c stack_name=$STACK_NAME" + +[tasks."cdk:sync"] +description = "Sync CDK Stack - Development only" +run = "cdk deploy --watch -c stack_name=$STACK_NAME" + +[tasks."cdk:deploy"] +description = "Deploy CDK Stack" +run = "cdk deploy --require-approval never -c stack_name=$STACK_NAME" + +[tasks.deploy] +description = "Install dependencies, bootstrap, and deploy" +depends = "init" +depends_post = "cdk:deploy" + +[tasks."cdk:destroy"] +description = "Destroy CDK Stack" +run = "cdk destroy -c stack_name=$STACK_NAME" + +[tasks.destroy] +description = "Destroy stack and clean up orphaned CloudWatch log groups" +run = """ +echo "Collecting log group names from stack $STACK_NAME..." + +# AppSync API log group — extract API ID from ARN +APPSYNC_ARN=$(aws cloudformation list-stack-resources --stack-name $STACK_NAME --region $AWS_REGION \ + --query "StackResourceSummaries[?ResourceType=='AWS::AppSync::Api'].PhysicalResourceId" --output text) +APPSYNC_API_ID=$(echo "$APPSYNC_ARN" | grep -oE '[^/]+$') +APPSYNC_LG="/aws/appsync/apis/$APPSYNC_API_ID" + +# AgentCore Runtime log group +RUNTIME_ID=$(aws cloudformation list-stack-resources --stack-name $STACK_NAME --region $AWS_REGION \ + --query "StackResourceSummaries[?ResourceType=='AWS::BedrockAgentCore::Runtime'].PhysicalResourceId" --output text) +AGENTCORE_LG="/aws/bedrock-agentcore/runtimes/${RUNTIME_ID}-DEFAULT" + +# S3 Auto-Delete custom resource Lambda log group +S3_AUTO_DELETE_FN=$(aws cloudformation list-stack-resources --stack-name $STACK_NAME --region $AWS_REGION \ + --query "StackResourceSummaries[?ResourceType=='AWS::Lambda::Function' && contains(LogicalResourceId,'CustomS3AutoDelete')].PhysicalResourceId" --output text) +S3_AUTO_DELETE_LG="/aws/lambda/$S3_AUTO_DELETE_FN" + +# Log Retention Lambda log group 
+LOG_RETENTION_FN=$(aws cloudformation list-stack-resources --stack-name $STACK_NAME --region $AWS_REGION \ + --query "StackResourceSummaries[?ResourceType=='AWS::Lambda::Function' && contains(LogicalResourceId,'LogRetention')].PhysicalResourceId" --output text) +LOG_RETENTION_LG="/aws/lambda/$LOG_RETENTION_FN" + +LOG_GROUPS="$APPSYNC_LG $AGENTCORE_LG $S3_AUTO_DELETE_LG $LOG_RETENTION_LG" + +echo "Log groups to clean up after destroy:" +for lg in $LOG_GROUPS; do echo " $lg"; done + +mise run cdk:destroy + +echo "Deleting orphaned log groups..." +for lg in $LOG_GROUPS; do + if aws logs describe-log-groups --log-group-name-prefix "$lg" --region $AWS_REGION --query "logGroups[?logGroupName=='$lg'].logGroupName" --output text | grep -q .; then + echo "Deleting $lg" + aws logs delete-log-group --log-group-name "$lg" --region $AWS_REGION + fi +done +echo "Done." +""" +run_windows = "echo Not implemented" + +[tasks.test] +run = [ + "mise run test:unit", + "mise run test:integration" +] + +[tasks."test:unit"] +run = "pytest tests/unit -v" + +[tasks."test:integration"] +run = "pytest tests/integration -v" + +[tasks."test:integration:verbose"] +description = "Run integration tests with WebSocket output visible" +run = "pytest tests/integration -v -s" diff --git a/appsync-events-lambda-agentcore-cdk/pyproject.toml b/appsync-events-lambda-agentcore-cdk/pyproject.toml new file mode 100644 index 000000000..6b313bcfc --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/pyproject.toml @@ -0,0 +1,2 @@ +[tool.black] +line-length = 150 diff --git a/appsync-events-lambda-agentcore-cdk/requirements-dev.txt b/appsync-events-lambda-agentcore-cdk/requirements-dev.txt new file mode 100644 index 000000000..5c9ed205c --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/requirements-dev.txt @@ -0,0 +1,6 @@ +pytest>=9.0.2 +pytest-asyncio>=1.3.0 +boto3>=1.42.65 +websockets>=16.0 +aws-lambda-powertools>=3.25.0 +aws-xray-sdk>=2.15.0 diff --git 
a/appsync-events-lambda-agentcore-cdk/requirements.txt b/appsync-events-lambda-agentcore-cdk/requirements.txt new file mode 100644 index 000000000..8bd62fdee --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/requirements.txt @@ -0,0 +1,3 @@ +aws-cdk-lib>=2.242.0,<3.0.0 +constructs>=10.5.1,<11.0.0 +cdk-nag>=2.35.0,<3.0.0 diff --git a/appsync-events-lambda-agentcore-cdk/tests/__init__.py b/appsync-events-lambda-agentcore-cdk/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/appsync-events-lambda-agentcore-cdk/tests/integration/__init__.py b/appsync-events-lambda-agentcore-cdk/tests/integration/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/appsync-events-lambda-agentcore-cdk/tests/integration/conftest.py b/appsync-events-lambda-agentcore-cdk/tests/integration/conftest.py new file mode 100644 index 000000000..3a84a5743 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/tests/integration/conftest.py @@ -0,0 +1,160 @@ +"""Shared fixtures for integration tests.""" + +# pylint: disable=redefined-outer-name + +import asyncio +import base64 +import json +import os +from contextlib import asynccontextmanager +import urllib.request +import uuid + +import boto3 +import pytest +import websockets + +# --------------------------------------------------------------------------- +# Output key prefixes (CDK appends hash suffixes) +# --------------------------------------------------------------------------- +_PREFIX_HTTP = "ChatServiceEventApiHttpEndpoint" +_PREFIX_WS = "ChatServiceEventApiRealtimeEndpoint" +_PREFIX_KEY = "ChatServiceEventApiApiKey" + +_DEFAULT_STACK_NAME = "AppsyncLambdaAgentcore" + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="session") +def stack_outputs(): + """Fetch all CloudFormation stack outputs once for the test session.""" + stack_name = 
os.environ.get("STACK_NAME") or _DEFAULT_STACK_NAME + + region = os.environ.get("AWS_REGION") + if not region: + raise EnvironmentError("AWS_REGION environment variable must be set") + + cfn = boto3.client("cloudformation", region_name=region) + response = cfn.describe_stacks(StackName=stack_name) + raw = response["Stacks"][0].get("Outputs", []) + return {o["OutputKey"]: o["OutputValue"] for o in raw} + + +@pytest.fixture(scope="session") +def get_output(stack_outputs): + """Return a callable that looks up a stack output by key prefix.""" + + def _lookup(prefix: str) -> str: + for key, value in stack_outputs.items(): + if key.startswith(prefix): + return value + pytest.skip(f"Stack output starting with '{prefix}' not found") + return "" + + return _lookup + + +@pytest.fixture(scope="session") +def api_config(get_output): + """Resolve the three AppSync Events endpoints/key from stack outputs.""" + return { + "http_endpoint": get_output(_PREFIX_HTTP), + "realtime_endpoint": get_output(_PREFIX_WS), + "api_key": get_output(_PREFIX_KEY), + } + + +@pytest.fixture(scope="session") +def auth_subprotocol(api_config): + """Base64-encoded auth subprotocol string for WebSocket connections.""" + header = json.dumps( + { + "host": api_config["http_endpoint"], + "x-api-key": api_config["api_key"], + } + ).encode() + return "header-" + base64.b64encode(header).decode().rstrip("=") + + +@pytest.fixture(scope="session") +def publish(api_config): + """Return a callable that publishes to a channel via HTTP.""" + + def _do_publish(channel: str, message: dict) -> dict: + url = f"https://{api_config['http_endpoint']}/event" + payload = json.dumps( + { + "channel": f"/{channel}", + "events": [json.dumps(message)], + } + ).encode() + req = urllib.request.Request( + url, + data=payload, + method="POST", + headers={ + "Content-Type": "application/json", + "x-api-key": api_config["api_key"], + }, + ) + with urllib.request.urlopen(req) as resp: + return json.loads(resp.read().decode()) + + 
return _do_publish + + +@pytest.fixture +def subscribe(api_config, auth_subprotocol): + """Async context manager that connects, inits, and subscribes to a channel. + + Yields (ws, sub_id) — the WebSocket and subscription ID. + + Usage: + async with subscribe("/responses/chat/abc") as (ws, sub_id): + ... + """ + + @asynccontextmanager + async def _subscribe(channel: str): + ws_url = f"wss://{api_config['realtime_endpoint']}/event/realtime" + async with websockets.connect( + ws_url, + subprotocols=["aws-appsync-event-ws", auth_subprotocol], + additional_headers={}, + close_timeout=2, + ) as ws: + await ws.send(json.dumps({"type": "connection_init"})) + ack = json.loads(await ws.recv()) + assert ack["type"] == "connection_ack" + + sub_id = str(uuid.uuid4()) + await ws.send( + json.dumps( + { + "type": "subscribe", + "id": sub_id, + "channel": channel, + "authorization": { + "x-api-key": api_config["api_key"], + "host": api_config["http_endpoint"], + }, + } + ) + ) + + while True: + msg = json.loads( + await asyncio.wait_for(ws.recv(), timeout=10), + ) + if msg["type"] == "ka": + continue + assert msg["type"] == "subscribe_success" + break + + yield ws, sub_id + + return _subscribe diff --git a/appsync-events-lambda-agentcore-cdk/tests/integration/test_appsync_events.py b/appsync-events-lambda-agentcore-cdk/tests/integration/test_appsync_events.py new file mode 100644 index 000000000..e8c5de9ec --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/tests/integration/test_appsync_events.py @@ -0,0 +1,327 @@ +"""Integration tests for the deployed AppSync Event API. + +Tests HTTP publish, streaming responses, tool use (http_request, +calculator, current_time), multi-turn conversations, and error +handling for the full end-to-end flow. 
+""" + +import asyncio +import json +import re +import uuid +from datetime import datetime, timezone + +import pytest + + +async def _collect_response(ws, sub_id, timeout=60): + """Collect streaming events until a completion event arrives.""" + events = [] + complete = None + deadline = asyncio.get_event_loop().time() + timeout + + while asyncio.get_event_loop().time() < deadline: + try: + raw = await asyncio.wait_for(ws.recv(), timeout=10) + except asyncio.TimeoutError: + break + msg = json.loads(raw) + if msg["type"] == "ka": + continue + if msg["type"] == "data" and msg["id"] == sub_id: + event_data = msg["event"] + if isinstance(event_data, str): + event_data = json.loads(event_data) + events.append(event_data) + print(f"Chunk {len(events)}: {event_data}") + + payload = event_data.get("payload", event_data) + if payload.get("type") == "complete": + complete = payload + break + + return events, complete + + +def test_publish_returns_success(publish): + """Publish a chat message and verify the API accepts it.""" + body = publish("chat/test", {"message": "ping", "sessionId": str(uuid.uuid4())}) + + assert "successful" in body, f"Expected 'successful' key: {body}" + assert len(body["successful"]) == 1 + assert len(body["failed"]) == 0 + + +@pytest.mark.asyncio +async def test_http_request_tool(subscribe, publish): + """Ask the agent to fetch a URL using http_request and verify + it streams a summarised response back via the channel.""" + conversation_id = str(uuid.uuid4()) + publish_channel = f"chat/{conversation_id}" + + async with subscribe(f"/responses/{publish_channel}") as (ws, sub_id): + publish( + publish_channel, + { + "message": ("Fetch the AWS blog homepage at https://aws.amazon.com/blogs/ and show me the latest 5 blog posts titles."), + "sessionId": conversation_id, + }, + ) + + received, complete = await _collect_response(ws, sub_id) + + assert len(received) > 0, "Did not receive any data messages" + assert complete is not None, "Did not receive a 
completion event" + assert complete.get("response"), "Completion should contain a response" + + +@pytest.mark.asyncio +async def test_conversation_with_session(subscribe, publish): + """Test multi-turn conversation using sessionId for continuity. + + Turn 1: Ask the agent to fetch and summarise an AWS blog post. + Turn 2: Ask a follow-up question — the agent should remember + the blog content from the first turn via session persistence. + """ + session_id = str(uuid.uuid4()) + publish_channel = f"chat/{session_id}" + + async with subscribe(f"/responses/{publish_channel}") as (ws, sub_id): + # Turn 1: fetch and summarise an AWS blog post + publish( + publish_channel, + { + "message": ( + "Fetch https://aws.amazon.com/blogs/aws/ and give me " + "a short summary of the first blog post you see. " + "Keep it under 100 words." + ), + "sessionId": session_id, + }, + ) + + _, complete_1 = await _collect_response(ws, sub_id) + assert complete_1 is not None, "Turn 1 did not complete" + + # Turn 2: ask a follow-up — agent should remember the blog post + publish( + publish_channel, + { + "message": ("Based on that blog post, what AWS services were mentioned? 
" "List them out."), + "sessionId": session_id, + }, + ) + + _, complete_2 = await _collect_response(ws, sub_id) + assert complete_2 is not None, "Turn 2 did not complete" + response_2 = complete_2.get("response", "") + + assert len(response_2) > 0, "Turn 2 should have a non-empty response" + + +@pytest.mark.asyncio +async def test_unsubscribe_stops_receiving_events(subscribe, publish): + """After unsubscribing, the client should stop receiving events + on that channel while the WebSocket remains open.""" + conversation_id = str(uuid.uuid4()) + publish_channel = f"chat/{conversation_id}" + response_channel = f"/responses/{publish_channel}" + + async with subscribe(response_channel) as (ws, sub_id): + # Verify subscription works — publish and collect response + publish( + publish_channel, + { + "message": "Say hello", + "sessionId": conversation_id, + }, + ) + _, complete = await _collect_response(ws, sub_id, timeout=30) + assert complete is not None, "Should receive response while subscribed" + + # Unsubscribe from the channel + await ws.send( + json.dumps( + { + "type": "unsubscribe", + "id": sub_id, + } + ) + ) + + # Wait for unsubscribe ack + deadline = asyncio.get_event_loop().time() + 5 + while asyncio.get_event_loop().time() < deadline: + raw = await asyncio.wait_for(ws.recv(), timeout=5) + msg = json.loads(raw) + if msg["type"] == "ka": + continue + if msg["type"] == "unsubscribe_success": + break + + # Publish again — should NOT receive anything on this sub + new_session = str(uuid.uuid4()) + publish( + publish_channel, + { + "message": "Say goodbye", + "sessionId": new_session, + }, + ) + + # Wait briefly — we should only see keepalives, no data + got_data = False + deadline = asyncio.get_event_loop().time() + 10 + while asyncio.get_event_loop().time() < deadline: + try: + raw = await asyncio.wait_for(ws.recv(), timeout=5) + except asyncio.TimeoutError: + break + msg = json.loads(raw) + if msg["type"] == "ka": + continue + if msg["type"] == "data" and 
msg["id"] == sub_id: + got_data = True + break + + assert not got_data, "Should not receive data after unsubscribing" + + +@pytest.mark.asyncio +async def test_channel_isolation(subscribe, publish): + """Messages published to one conversation channel should not + leak to a subscriber on a different conversation channel.""" + id_a = str(uuid.uuid4()) + id_b = str(uuid.uuid4()) + + async with subscribe(f"/responses/chat/{id_a}") as (ws_a, sub_a): + async with subscribe(f"/responses/chat/{id_b}") as (ws_b, sub_b): + # Publish only to channel A + publish( + f"chat/{id_a}", + { + "message": "Say hello from channel A", + "sessionId": id_a, + }, + ) + + # Channel A should receive the response + _, complete_a = await _collect_response( + ws_a, + sub_a, + timeout=30, + ) + assert complete_a is not None, "Channel A should receive a response" + + # Channel B should NOT receive anything + got_data_b = False + deadline = asyncio.get_event_loop().time() + 10 + while asyncio.get_event_loop().time() < deadline: + try: + raw = await asyncio.wait_for(ws_b.recv(), timeout=5) + except asyncio.TimeoutError: + break + msg = json.loads(raw) + if msg["type"] == "ka": + continue + if msg["type"] == "data" and msg["id"] == sub_b: + got_data_b = True + break + + assert not got_data_b, "Channel B should not receive events from channel A" + + +async def _expect_error_event(subscribe, publish, channel_id, payload, expected_text): + """Publish an invalid payload and verify the Lambda returns an error + event to WebSocket subscribers on the chat channel.""" + publish_channel = f"chat/{channel_id}" + + async with subscribe(f"/{publish_channel}") as (ws, sub_id): + publish(publish_channel, payload) + + deadline = asyncio.get_event_loop().time() + 15 + while asyncio.get_event_loop().time() < deadline: + try: + raw = await asyncio.wait_for(ws.recv(), timeout=5) + except asyncio.TimeoutError: + break + msg = json.loads(raw) + if msg["type"] == "ka": + continue + if msg["type"] == "data" and msg["id"] 
== sub_id: + event_data = msg["event"] + if isinstance(event_data, str): + event_data = json.loads(event_data) + error = event_data.get("payload", event_data).get("error", "") + if error: + assert expected_text in error.lower(), f"Expected '{expected_text}' in error: {error}" + return + + pytest.fail(f"Did not receive error event for payload: {payload}") + + +@pytest.mark.asyncio +async def test_calculator_tool(subscribe, publish): + """Ask the agent to perform a calculation and verify it uses the + calculator tool and returns the correct result.""" + conversation_id = str(uuid.uuid4()) + publish_channel = f"chat/{conversation_id}" + + async with subscribe(f"/responses/{publish_channel}") as (ws, sub_id): + publish( + publish_channel, + { + "message": "Use the calculator to compute 347 * 29. Reply with only the number.", + "sessionId": conversation_id, + }, + ) + + _, complete = await _collect_response(ws, sub_id) + assert complete is not None, "Did not receive a completion event" + response = complete.get("response", "") + assert "10063" in response, f"Expected calculator result 10063 in response: {response}" + + +@pytest.mark.asyncio +async def test_current_time_tool(subscribe, publish): + """Ask the agent for the current UTC time and verify the response + matches the actual time to the minute (±2 min grace).""" + conversation_id = str(uuid.uuid4()) + publish_channel = f"chat/{conversation_id}" + + async with subscribe(f"/responses/{publish_channel}") as (ws, sub_id): + publish( + publish_channel, + { + "message": ( + "Get the current time in UTC. 
" "Reply with ONLY the time in this exact format: YYYY-MM-DD HH:MM " "For example: 2026-03-11 14:05" + ), + "sessionId": conversation_id, + }, + ) + + now_utc = datetime.now(timezone.utc) + _, complete = await _collect_response(ws, sub_id) + assert complete is not None, "Did not receive a completion event" + response = complete.get("response", "") + + match = re.search(r"\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}", response) + assert match, f"Could not find YYYY-MM-DD HH:MM in response: {response}" + + reported = datetime.strptime(match.group(), "%Y-%m-%d %H:%M").replace( + tzinfo=timezone.utc, + ) + diff = abs((reported - now_utc).total_seconds()) + assert diff <= 120, f"Reported time {match.group()} differs from actual " f"{now_utc.strftime('%Y-%m-%d %H:%M')} by {diff:.0f}s (max 120s)" + + +@pytest.mark.asyncio +async def test_missing_session_id_returns_error(subscribe, publish): + """Publishing without a sessionId should return an error via WebSocket.""" + await _expect_error_event( + subscribe, + publish, + channel_id=str(uuid.uuid4()), + payload={"message": "hello"}, + expected_text="sessionid", + ) diff --git a/appsync-events-lambda-agentcore-cdk/tests/unit/__init__.py b/appsync-events-lambda-agentcore-cdk/tests/unit/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/appsync-events-lambda-agentcore-cdk/tests/unit/conftest.py b/appsync-events-lambda-agentcore-cdk/tests/unit/conftest.py new file mode 100644 index 000000000..23bed0f68 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/tests/unit/conftest.py @@ -0,0 +1,44 @@ +"""Unit test configuration — sets up isolated environment before any imports.""" + +import os +from dataclasses import dataclass + +import pytest + +# Prevent any real AWS service calls — fake credentials and region +_REGION = os.environ.get("AWS_REGION", "eu-west-1") + +os.environ["AWS_ACCESS_KEY_ID"] = "testing" +os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" +os.environ["AWS_SECURITY_TOKEN"] = "testing" 
+os.environ["AWS_SESSION_TOKEN"] = "testing" +os.environ.setdefault("AWS_DEFAULT_REGION", _REGION) +os.environ.setdefault("AWS_REGION", _REGION) + +# Disable X-Ray tracing so Tracer uses a no-op provider +os.environ["POWERTOOLS_TRACE_DISABLED"] = "true" +os.environ["POWERTOOLS_SERVICE_NAME"] = "test" + +# Agent invoker Lambda env vars +os.environ["STREAM_RELAY_ARN"] = f"arn:aws:lambda:{_REGION}:123456789012:function:stream-relay" + +# Stream relay Lambda env vars +os.environ["APPSYNC_HTTP_ENDPOINT"] = f"test.appsync-api.{_REGION}.amazonaws.com" +os.environ["APPSYNC_API_KEY"] = "test-api-key" +os.environ["AGENT_RUNTIME_ARN"] = f"arn:aws:bedrock-agentcore:{_REGION}:123456789012:runtime/test" + + +@dataclass +class FakeLambdaContext: + """Minimal Lambda context for Powertools inject_lambda_context.""" + + function_name: str = "test-function" + memory_limit_in_mb: int = 256 + invoked_function_arn: str = f"arn:aws:lambda:{_REGION}:123456789012:function:test" + aws_request_id: str = "test-request-id" + + +@pytest.fixture +def lambda_context(): + """Provide a fake Lambda context for Powertools.""" + return FakeLambdaContext() diff --git a/appsync-events-lambda-agentcore-cdk/tests/unit/test_agent_invoker.py b/appsync-events-lambda-agentcore-cdk/tests/unit/test_agent_invoker.py new file mode 100644 index 000000000..d2c4be5d2 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/tests/unit/test_agent_invoker.py @@ -0,0 +1,122 @@ +"""Unit tests for the agent invoker Lambda handler.""" + +import json +import os +from unittest.mock import patch + +from functions.agent_invoker.index import handler + + +def _make_event(payload, channel="/chat/test-123"): + """Build a minimal AppSync Events direct Lambda integration event.""" + return { + "events": [{"id": "evt-1", "payload": payload}], + "info": {"channel": {"path": channel}}, + } + + +def _make_multi_event(payloads, channel="/chat/test-123"): + """Build an event with multiple published messages.""" + return { + "events": 
[{"id": f"evt-{i}", "payload": p} for i, p in enumerate(payloads)], + "info": {"channel": {"path": channel}}, + } + + +@patch("functions.agent_invoker.index.lambda_client") +def test_valid_message_invokes_stream_relay(mock_client, lambda_context): + """Valid payload triggers async Lambda invoke with correct relay payload.""" + event = _make_event({"message": "hello", "sessionId": "sess-1"}) + result = handler(event, lambda_context) + + mock_client.invoke.assert_called_once() + call_kwargs = mock_client.invoke.call_args[1] + assert call_kwargs["InvocationType"] == "Event" + assert call_kwargs["FunctionName"] == os.environ["STREAM_RELAY_ARN"] + + relay = json.loads(call_kwargs["Payload"]) + assert relay["content"] == "hello" + assert relay["sessionId"] == "sess-1" + + assert len(result["events"]) == 1 + assert "error" not in result["events"][0]["payload"] + + +@patch("functions.agent_invoker.index.lambda_client") +def test_missing_message_returns_error(mock_client, lambda_context): + """Missing message key returns error without invoking stream relay.""" + event = _make_event({"sessionId": "sess-1"}) + result = handler(event, lambda_context) + + mock_client.invoke.assert_not_called() + assert "message" in result["events"][0]["payload"]["error"].lower() + + +@patch("functions.agent_invoker.index.lambda_client") +def test_empty_message_returns_error(mock_client, lambda_context): + """Whitespace-only message is rejected.""" + event = _make_event({"message": " ", "sessionId": "sess-1"}) + result = handler(event, lambda_context) + + mock_client.invoke.assert_not_called() + assert "message" in result["events"][0]["payload"]["error"].lower() + + +@patch("functions.agent_invoker.index.lambda_client") +def test_missing_session_id_returns_error(mock_client, lambda_context): + """Missing sessionId returns error without invoking stream relay.""" + event = _make_event({"message": "hello"}) + result = handler(event, lambda_context) + + mock_client.invoke.assert_not_called() + 
assert "sessionid" in result["events"][0]["payload"]["error"].lower() + + +@patch("functions.agent_invoker.index.lambda_client") +def test_empty_session_id_returns_error(mock_client, lambda_context): + """Whitespace-only sessionId is rejected.""" + event = _make_event({"message": "hello", "sessionId": " "}) + result = handler(event, lambda_context) + + mock_client.invoke.assert_not_called() + assert "sessionid" in result["events"][0]["payload"]["error"].lower() + + +@patch("functions.agent_invoker.index.lambda_client") +def test_empty_payload_returns_error(mock_client, lambda_context): + """Empty payload returns message error (first validation hit).""" + event = _make_event({}) + result = handler(event, lambda_context) + + mock_client.invoke.assert_not_called() + assert "message" in result["events"][0]["payload"]["error"].lower() + + +@patch("functions.agent_invoker.index.lambda_client") +def test_response_channel_prefixed_with_responses(mock_client, lambda_context): + """Relay payload channel should be /responses{original_channel}.""" + event = _make_event( + {"message": "hi", "sessionId": "s1"}, + channel="/chat/conv-abc", + ) + handler(event, lambda_context) + + relay = json.loads(mock_client.invoke.call_args[1]["Payload"]) + assert relay["channel"] == "/responses/chat/conv-abc" + + +@patch("functions.agent_invoker.index.lambda_client") +def test_multiple_events_processed_independently(mock_client, lambda_context): + """Batch with one valid and one invalid event returns mixed results.""" + event = _make_multi_event( + [ + {"message": "hello", "sessionId": "s1"}, + {"message": "world"}, # missing sessionId + ] + ) + result = handler(event, lambda_context) + + assert mock_client.invoke.call_count == 1 # only valid event invoked + events = result["events"] + assert "error" not in events[0]["payload"] + assert "error" in events[1]["payload"] diff --git a/appsync-events-lambda-agentcore-cdk/tests/unit/test_stream_relay.py 
b/appsync-events-lambda-agentcore-cdk/tests/unit/test_stream_relay.py new file mode 100644 index 000000000..0c9aabf78 --- /dev/null +++ b/appsync-events-lambda-agentcore-cdk/tests/unit/test_stream_relay.py @@ -0,0 +1,127 @@ +"""Unit tests for the stream relay Lambda handler.""" + +import json +from unittest.mock import MagicMock, patch + +from functions.stream_relay.index import handler + + +def _make_event(content="hello", channel="/responses/chat/conv-1", event_id="evt-1", session_id="sess-1"): + return { + "content": content, + "channel": channel, + "eventId": event_id, + "sessionId": session_id, + } + + +def _mock_sse_response(lines): + """Create a mock AgentCore response with SSE streaming.""" + encoded_lines = [line.encode("utf-8") for line in lines] + stream = MagicMock() + stream.iter_lines.return_value = encoded_lines + return { + "contentType": "text/event-stream", + "response": stream, + } + + +@patch("functions.stream_relay.index._publish_to_channel") +@patch("functions.stream_relay.index.agentcore_client") +def test_streaming_sse_publishes_chunks(mock_ac, mock_publish, lambda_context): + """SSE stream with data events publishes chunks to AppSync channel.""" + mock_ac.invoke_agent_runtime.return_value = _mock_sse_response( + [ + 'data: {"data": "Hello, "}', + 'data: {"data": "how are you?"}', + ] + ) + + result = handler(_make_event(), lambda_context) + + assert result["status"] == "success" + assert mock_publish.call_count >= 1 + last_call = mock_publish.call_args_list[-1] + completion = last_call[0][1] + assert completion["type"] == "complete" + assert "Hello, " in completion["response"] + assert "how are you?" 
in completion["response"] + + +@patch("functions.stream_relay.index._publish_to_channel") +@patch("functions.stream_relay.index.agentcore_client") +def test_completion_event_includes_full_response(mock_ac, mock_publish, lambda_context): + """Final publish contains the assembled full response text.""" + mock_ac.invoke_agent_runtime.return_value = _mock_sse_response( + [ + 'data: {"data": "Part one. "}', + 'data: {"data": "Part two."}', + ] + ) + + handler(_make_event(), lambda_context) + + completion = mock_publish.call_args_list[-1][0][1] + assert completion["type"] == "complete" + assert completion["response"] == "Part one. Part two." + assert completion["eventId"] == "evt-1" + + +@patch("functions.stream_relay.index._publish_to_channel") +@patch("functions.stream_relay.index.agentcore_client") +def test_control_events_are_skipped(mock_ac, mock_publish, lambda_context): + """Strands control events should not produce chunk publishes.""" + mock_ac.invoke_agent_runtime.return_value = _mock_sse_response( + [ + 'data: {"init_event_loop": true}', + 'data: {"start": true}', + 'data: {"data": "actual text."}', + 'data: {"complete": true}', + ] + ) + + handler(_make_event(), lambda_context) + + completion = mock_publish.call_args_list[-1][0][1] + assert completion["response"] == "actual text." 
+ for c in mock_publish.call_args_list: + payload = c[0][1] + if payload["type"] == "chunk": + assert "init_event_loop" not in payload["content"] + assert "start" not in payload["content"] + + +@patch("functions.stream_relay.index._publish_to_channel") +@patch("functions.stream_relay.index.agentcore_client") +def test_non_streaming_response_handled(mock_ac, mock_publish, lambda_context): + """Non-SSE response is read and published as completion.""" + stream = MagicMock() + stream.read.return_value = json.dumps({"message": "static reply"}).encode() + mock_ac.invoke_agent_runtime.return_value = { + "contentType": "application/json", + "response": stream, + } + + handler(_make_event(), lambda_context) + + completion = mock_publish.call_args_list[-1][0][1] + assert completion["type"] == "complete" + assert completion["response"] == "static reply" + + +@patch("functions.stream_relay.index._publish_to_channel") +@patch("functions.stream_relay.index.agentcore_client") +def test_chunk_batching_on_punctuation(mock_ac, mock_publish, lambda_context): + """Chunks should flush on sentence-ending punctuation.""" + mock_ac.invoke_agent_runtime.return_value = _mock_sse_response( + [ + 'data: {"data": "Short."}', + 'data: {"data": " More text after."}', + ] + ) + + handler(_make_event(), lambda_context) + + chunk_publishes = [c[0][1] for c in mock_publish.call_args_list if c[0][1]["type"] == "chunk"] + assert len(chunk_publishes) >= 1 + assert chunk_publishes[0]["content"] == "Short."