diff --git a/lambda-durable-investigation-escalation/README.md b/lambda-durable-investigation-escalation/README.md
new file mode 100644
index 000000000..e79656855
--- /dev/null
+++ b/lambda-durable-investigation-escalation/README.md
@@ -0,0 +1,253 @@
+# Investigation Failure Escalation Workflow with AWS Lambda durable functions
+
+This pattern demonstrates an investigation failure escalation workflow using AWS Lambda durable functions. When a DevOps Agent's investigation fails or times out, the durable function gathers failure context, creates an incident ticket in DynamoDB, pages on-call personnel via SNS, and uses durable function callbacks to wait for human acknowledgment. The function incurs no compute charges while waiting for a response.
+
+**Important:** Please check the [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html) for regions currently supported by AWS Lambda durable functions.
+
+Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/lambda-durable-investigation-escalation
+
+## Architecture
+
+The pattern uses two Lambda functions, two DynamoDB tables, an SNS topic, and an API Gateway endpoint:
+
+- **Escalation_Function** (Durable Lambda): Orchestrates the entire workflow: validates the event, gathers context, creates an incident, pages on-call personnel, and tracks resolution. Pauses execution (no compute charges) while waiting for acknowledgment.
+- **Callback_Handler** (Standard Lambda): Receives acknowledgment clicks via API Gateway and sends durable execution callbacks to resume the paused Escalation_Function.
+- **Incident_Table** (DynamoDB): Stores incident records with failure context, escalation history, and resolution status.
+- **Callback_Table** (DynamoDB): Maps short UUIDs to durable function callback IDs for clean acknowledgment URLs.
+- **Notification_Topic** (SNS): Sends escalation email notifications to on-call personnel.
+- **Escalation_API** (API Gateway): Provides the `GET /{uuid}` acknowledgment endpoint.
+
+### Workflow Steps
+
+1. **gather-context**: The DevOps Agent invokes the Escalation_Function directly via `aws lambda invoke`. The function validates the incoming event and extracts failure context (investigation ID, failure type, service, region, error details, timestamp), generating a unique incident ID.
+2. **create-incident**: Writes an incident record to the Incident_Table with status `open` and an empty escalation history. *(Integration Point)*
+3. **notify-oncall**: Creates a durable callback with a configurable timeout, stores the UUID-to-callback mapping, and publishes a notification via SNS with an acknowledgment link. The function then pauses, incurring no compute charges while waiting. *(Integration Point)*
+4. **resolve-incident**: When an on-call responder clicks the acknowledgment link, the Callback_Handler sends a durable callback, resuming the function. The incident is updated to `status: acknowledged`. *(Integration Point)*
+5. **resolve-unacknowledged**: If the timeout expires without acknowledgment, the incident is updated to `status: unacknowledged` and the workflow completes.
*(Integration Point)*
+
+## Key Features
+
+- ✅ **No Compute Charges During Wait**: Function is suspended while waiting for human response
+- ✅ **Configurable Timeout**: Acknowledgment timeout duration configurable via SAM template parameter
+- ✅ **Extensible Integration Points**: Incident ticket and notification operations are isolated as helper functions that can be replaced with calls to Jira, ServiceNow, PagerDuty, Opsgenie, or other ITSM tools
+
+## Prerequisites
+
+* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
+* [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) >= 1.150.0 (required for `DurableConfig` support)
+* Python 3.14 runtime (automatically provided by Lambda)
+
+## Deployment
+
+1. Navigate to the pattern directory:
+   ```bash
+   cd lambda-durable-investigation-escalation
+   ```
+
+2. Build the SAM application:
+   ```bash
+   sam build
+   ```
+
+3. Deploy the application:
+   ```bash
+   sam deploy --guided --region us-east-2
+   ```
+
+   During the guided deployment, provide the required values:
+   - **OncallEmail**: Email address to receive escalation notifications
+   - **AckTimeout**: Timeout in seconds to wait for acknowledgment (default: 900)
+   - Allow SAM CLI to create IAM roles when prompted
+
+4. **Confirm SNS subscription**: Check your email and click the confirmation link from Amazon SNS
+
+5. 
Note the `ApiEndpoint` and `EscalationFunctionArn` from the outputs
+
+## Input Payload Format
+
+The Escalation_Function expects the following JSON payload when invoked:
+
+```json
+{
+  "investigationId": "INV-2025-001",
+  "failureType": "investigation_failed",
+  "service": "payment-service",
+  "region": "us-east-1",
+  "errorDetails": "Connection timeout after 3 retries to downstream API",
+  "timestamp": "2025-01-15T10:30:00Z"
+}
+```
+
+| Field | Type | Required | Description |
+|---|---|---|---|
+| `investigationId` | string | Yes | Unique identifier for the failed investigation |
+| `failureType` | string | Yes | One of `investigation_failed` or `investigation_timed_out` |
+| `service` | string | Yes | The service that was being investigated |
+| `region` | string | Yes | The AWS region where the failure occurred |
+| `errorDetails` | string | Yes | Description of the error or failure |
+| `timestamp` | string | Yes | ISO 8601 timestamp of the failure |
+
+If any required field is missing or `failureType` is invalid, the function records a validation failure in the Incident_Table and terminates without sending notifications.
+
+## Testing
+
+### Trigger an Escalation
+
+Invoke the Escalation_Function directly using the AWS CLI:
+
+```bash
+aws lambda invoke \
+  --function-name \
+  --invocation-type Event \
+  --region us-east-2 \
+  --cli-binary-format raw-in-base64-out \
+  --payload '{
+    "investigationId": "INV-2025-001",
+    "failureType": "investigation_failed",
+    "service": "payment-service",
+    "region": "us-east-1",
+    "errorDetails": "Connection timeout after 3 retries to downstream API",
+    "timestamp": "2025-01-15T10:30:00Z"
+  }' \
+  /dev/null
+```
+
+### Check Your Email
+
+You will receive an escalation notification email containing:
+- Incident ID and investigation details
+- Failure type, service, region, and error details
+- An **acknowledgment link**
+
+### Acknowledge the Incident
+
+Click the acknowledgment link in the email. 
You will see an HTML confirmation page indicating the incident has been acknowledged. The Escalation_Function resumes and updates the incident record to `acknowledged`.
+
+If you do not click the link within the timeout (default 15 minutes), the incident is marked `unacknowledged`.
+
+### Verify in DynamoDB
+
+```bash
+aws dynamodb scan --table-name -incidents --region us-east-2
+```
+
+## Integrating with a DevOps Agent
+
+The Escalation_Function is designed to be invoked programmatically by a DevOps Agent when an investigation fails or times out. The agent calls the Lambda `Invoke` API with `InvocationType: Event` (asynchronous, non-blocking) and passes the payload described above.
+
+### Example: Python (boto3)
+
+```python
+import boto3
+import json
+from datetime import datetime, timezone
+
+lambda_client = boto3.client('lambda', region_name='us-east-2')
+
+def trigger_escalation(investigation_id, failure_type, service, region, error_details):
+    lambda_client.invoke(
+        FunctionName='',
+        InvocationType='Event',
+        Payload=json.dumps({
+            'investigationId': investigation_id,
+            'failureType': failure_type,
+            'service': service,
+            'region': region,
+            'errorDetails': str(error_details),
+            'timestamp': datetime.now(timezone.utc).isoformat()
+        })
+    )
+```
+
+Call this from your agent's error handler:
+
+```python
+try:
+    result = run_investigation(issue)
+except TimeoutError:
+    trigger_escalation('INV-123', 'investigation_timed_out', 'my-service', 'us-east-1', 'Timed out')
+except Exception as e:
+    trigger_escalation('INV-123', 'investigation_failed', 'my-service', 'us-east-1', str(e))
+```
+
+### IAM Permission
+
+The agent's execution role needs the `lambda:InvokeFunction` permission on the Escalation_Function:
+
+```yaml
+- Effect: Allow
+  Action: lambda:InvokeFunction
+  Resource:
+```
+
+## Configuration
+
+| Parameter | Default | Description |
+|---|---|---|
+| `AckTimeout` | 900 (15 min) | Seconds to wait for on-call acknowledgment before marking unacknowledged |
+| `OncallEmail` | (none) | Email address 
subscribed to the SNS notification topic |
+
+To change values after deployment:
+
+```bash
+sam deploy --parameter-overrides AckTimeout=600 OncallEmail=oncall@example.com --region us-east-2
+```
+
+`AckTimeout` must be less than the overall `DurableConfig.ExecutionTimeout` of 3600 seconds (1 hour) defined in the SAM template.
+
+## Extensibility and Integration Points
+
+The incident ticket and notification operations are isolated into dedicated helper functions in `src/helpers.py`. You can replace any of these functions with calls to external ITSM and alerting services without modifying the core orchestration logic in `lambda_function.py`.
+
+### Helper Functions
+
+| Function | Default Behavior | Replacement Example |
+|---|---|---|
+| `create_incident_ticket(incident)` | DynamoDB `put_item` to Incident_Table | Jira, ServiceNow, Zendesk, or any ITSM platform with a REST API |
+| `resolve_incident(incident_id, status, details)` | DynamoDB `update_item` on Incident_Table | Jira issue transition, ServiceNow resolve, or Zendesk ticket update |
+| `send_escalation_notification(topic_arn, message)` | SNS `publish` to Notification_Topic | PagerDuty Events API v2, Opsgenie Alert API, Slack, or Microsoft Teams webhook |
+| `store_callback_mapping(uuid, callback_id, incident_id, tier, ttl)` | DynamoDB `put_item` to Callback_Table | Typically not replaced |
+
+### Swapping DynamoDB for an ITSM Tool
+
+1. Replace `create_incident_ticket()` to call the external API and create a ticket.
+2. Replace `resolve_incident()` to transition the external ticket to a resolved state.
+3. Add API keys or credentials as environment variables or AWS Secrets Manager references, and grant the function's role read access to any secrets it uses.
+4. If the function runs in a VPC, make sure it has outbound HTTPS access (for example, through a NAT gateway). IAM policies do not control outbound network traffic.
+
+### Swapping SNS for an Alerting Service
+
+1. Replace `send_escalation_notification()` to call the external API (e.g., PagerDuty, Opsgenie, Slack).
+2. 
Pass the acknowledgment URL in the notification payload so responders can still click to acknowledge.
+3. Add the API integration key as an environment variable or Secrets Manager reference.
+
+## Monitoring
+
+### CloudWatch Logs
+
+```bash
+aws logs tail /aws/lambda/-EscalationFunction \
+  --region us-east-2 --follow
+
+aws logs tail /aws/lambda/-CallbackHandlerFunction \
+  --region us-east-2 --follow
+```
+
+## Cleanup
+
+```bash
+sam delete --stack-name --region us-east-2
+```
+
+## Learn More
+
+- [Lambda durable functions Documentation](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html)
+- [Durable Execution SDK (Python)](https://github.com/aws/aws-durable-execution-sdk-python)
+- [Callback Operations](https://docs.aws.amazon.com/lambda/latest/dg/durable-callback.html)
+- [SendDurableExecutionCallbackSuccess API](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/send_durable_execution_callback_success.html)
+
+---
+
+Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+SPDX-License-Identifier: MIT-0
diff --git a/lambda-durable-investigation-escalation/example-pattern.json b/lambda-durable-investigation-escalation/example-pattern.json
new file mode 100644
index 000000000..df8d81152
--- /dev/null
+++ b/lambda-durable-investigation-escalation/example-pattern.json
@@ -0,0 +1,63 @@
+{
+  "title": "AWS Lambda durable functions Investigation Failure Escalation Workflow",
+  "description": "Investigation failure escalation workflow using Lambda durable functions with callback-based pausing, a configurable acknowledgment timeout, and human acknowledgment handling",
+  "language": "Python",
+  "level": "300",
+  "framework": "AWS SAM",
+  "introBox": {
+    "headline": "How it works",
+    "text": [
+      "This pattern implements an escalation workflow using Lambda durable functions. 
When a DevOps Agent's investigation fails or times out, the Escalation Function gathers failure context and creates an incident ticket in DynamoDB.",
+      "The workflow pages on-call personnel via SNS and pauses execution using create_callback(), incurring no compute charges while waiting for human acknowledgment.",
+      "If no acknowledgment is received within a configurable timeout (default 15 minutes), the workflow marks the incident as unacknowledged and completes.",
+      "On-call responders acknowledge incidents by clicking a link in the notification, which triggers an API Gateway endpoint that resumes the paused durable function via callback.",
+      "The incident ticket creation and notification steps are designed as extensible integration points, allowing customers to replace DynamoDB and SNS with external services such as Jira, ServiceNow, PagerDuty, or Opsgenie."
+    ]
+  },
+  "gitHub": {
+    "template": {
+      "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-durable-investigation-escalation",
+      "templateURL": "serverless-patterns/lambda-durable-investigation-escalation",
+      "projectFolder": "lambda-durable-investigation-escalation",
+      "templateFile": "template.yaml"
+    }
+  },
+  "resources": {
+    "bullets": [
+      {
+        "text": "Lambda durable functions Documentation",
+        "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html"
+      },
+      {
+        "text": "Durable Execution SDK for Python",
+        "link": "https://github.com/aws/aws-durable-execution-sdk-python"
+      }
+    ]
+  },
+  "deploy": {
+    "text": [
+      "Note: Lambda durable functions have limited regional availability. Please check the AWS documentation for current regional support.",
+      "sam build",
+      "sam deploy --guided --region us-east-2"
+    ]
+  },
+  "testing": {
+    "text": [
+      "See the GitHub repo for detailed testing instructions."
+    ]
+  },
+  "cleanup": {
+    "text": [
+      "Delete the stack: sam delete --region us-east-2."
+ ] + }, + "authors": [ + { + "name": "Your name", + "image": "link-to-your-photo.jpg", + "bio": "Your bio.", + "linkedin": "linked-in-ID", + "twitter": "twitter-handle" + } + ] +} diff --git a/lambda-durable-investigation-escalation/src/__pycache__/callback_handler.cpython-314.pyc b/lambda-durable-investigation-escalation/src/__pycache__/callback_handler.cpython-314.pyc new file mode 100644 index 000000000..594ac301a Binary files /dev/null and b/lambda-durable-investigation-escalation/src/__pycache__/callback_handler.cpython-314.pyc differ diff --git a/lambda-durable-investigation-escalation/src/__pycache__/helpers.cpython-314.pyc b/lambda-durable-investigation-escalation/src/__pycache__/helpers.cpython-314.pyc new file mode 100644 index 000000000..3d9fd3b1e Binary files /dev/null and b/lambda-durable-investigation-escalation/src/__pycache__/helpers.cpython-314.pyc differ diff --git a/lambda-durable-investigation-escalation/src/callback_handler.py b/lambda-durable-investigation-escalation/src/callback_handler.py new file mode 100644 index 000000000..d15bac60f --- /dev/null +++ b/lambda-durable-investigation-escalation/src/callback_handler.py @@ -0,0 +1,145 @@ +import json +import os +from datetime import datetime, timezone + +import boto3 + +lambda_client = boto3.client('lambda') +dynamodb = boto3.resource('dynamodb') +table = dynamodb.Table(os.environ['CALLBACK_TABLE_NAME']) + + +def lambda_handler(event, context): + """Handle human acknowledgment via API Gateway GET /{uuid}""" + + print(f"Received event: {json.dumps(event)}") + + # Extract UUID from path parameters + request_uuid = event['pathParameters']['uuid'] + + print(f"Processing acknowledgment for UUID: {request_uuid}") + + # Fetch callback ID from DynamoDB + try: + response = table.get_item(Key={'uuid': request_uuid}) + if 'Item' not in response: + return { + 'statusCode': 404, + 'headers': {'Content-Type': 'text/html; charset=utf-8'}, + 'body': """ + + + + + Not Found + + +

Request Not Found

+

This acknowledgment link is invalid or has expired.

+ + +""" + } + + callback_id = response['Item']['callbackId'] + incident_id = response['Item'].get('incidentId', 'Unknown') + tier = response['Item'].get('tier', 'Unknown') + print(f"Found callback ID: {callback_id} for incident: {incident_id}, tier: {tier}") + + except Exception as e: + print(f"Error fetching from DynamoDB: {str(e)}") + return { + 'statusCode': 500, + 'headers': {'Content-Type': 'text/html; charset=utf-8'}, + 'body': f""" + + + + + Error + + +

Error

+

An internal error occurred while looking up this acknowledgment link. See the function logs for details.

+ + +""" + } + + # Prepare callback result with acknowledgment and timestamp + result = { + 'acknowledged': True, + 'timestamp': datetime.now(timezone.utc).isoformat() + } + + try: + # Send callback using Lambda API + result_json = json.dumps(result) + + response = lambda_client.send_durable_execution_callback_success( + CallbackId=callback_id, + Result=result_json + ) + + print(f"Callback sent successfully: {response}") + + # Return HTML confirmation page + return { + 'statusCode': 200, + 'headers': {'Content-Type': 'text/html; charset=utf-8'}, + 'body': f""" + + + + + Incident Acknowledged + + + +
+
+

Incident Acknowledged!

+

The escalation workflow has been notified and will update the incident record.

+
+ + +""" + } + + except Exception as e: + print(f"Error sending callback: {str(e)}") + return { + 'statusCode': 500, + 'headers': {'Content-Type': 'text/html; charset=utf-8'}, + 'body': f""" + + + + + Error + + +

Error Processing Acknowledgment

+

The acknowledgment could not be recorded. See the function logs for details.

+ + +""" + } diff --git a/lambda-durable-investigation-escalation/src/helpers.py b/lambda-durable-investigation-escalation/src/helpers.py new file mode 100644 index 000000000..4f8a1e6cd --- /dev/null +++ b/lambda-durable-investigation-escalation/src/helpers.py @@ -0,0 +1,112 @@ +""" +Integration helper functions for the Investigation Escalation workflow. + +These functions serve as extensible integration points. Each function isolates +a specific external interaction (DynamoDB write, SNS publish) so that customers +can replace the default implementation with calls to third-party services +(e.g., Jira, ServiceNow, PagerDuty) without modifying the core orchestration logic. +""" + +import os +import boto3 + +# Module-level clients initialized from environment variables +dynamodb = boto3.resource('dynamodb') +sns = boto3.client('sns') + +incident_table = dynamodb.Table(os.environ.get('INCIDENT_TABLE_NAME', '')) +callback_table = dynamodb.Table(os.environ.get('CALLBACK_TABLE_NAME', '')) + + +def create_incident_ticket(incident): + """ + Create an incident record in the Incident_Table. + + This is an Integration Point — replace this function to create tickets + in an external system such as Jira or ServiceNow instead of DynamoDB. + + Args: + incident (dict): The full incident record containing incidentId, + investigationId, failureType, service, region, errorDetails, + timestamp, status, escalationHistory, createdAt, and ttl. + """ + incident_table.put_item(Item=incident) + + +def resolve_incident(incident_id, status, details): + """ + Update the incident record with its final resolution status. + + This is an Integration Point — replace this function to transition tickets + in an external system such as Jira or ServiceNow instead of DynamoDB. + + Args: + incident_id (str): The incident's partition key. + status (str): The resolution status ('acknowledged' or 'unacknowledged'). + details (dict): Additional resolution details. 
For acknowledged incidents, + should contain 'acknowledgedAt' (ISO 8601 timestamp) and + 'acknowledgedTier' (int). For unacknowledged incidents, may be empty + or contain supplementary info. + """ + update_expr = 'SET #s = :status' + expr_names = {'#s': 'status'} + expr_values = {':status': status} + + if details.get('acknowledgedAt'): + update_expr += ', acknowledgedAt = :ack_at' + expr_values[':ack_at'] = details['acknowledgedAt'] + + if details.get('acknowledgedTier'): + update_expr += ', acknowledgedTier = :ack_tier' + expr_values[':ack_tier'] = details['acknowledgedTier'] + + incident_table.update_item( + Key={'incidentId': incident_id}, + UpdateExpression=update_expr, + ExpressionAttributeNames=expr_names, + ExpressionAttributeValues=expr_values, + ) + + +def send_escalation_notification(topic_arn, message): + """ + Publish an escalation notification to the Notification_Topic. + + This is an Integration Point — replace this function to send notifications + via an external service such as PagerDuty or Opsgenie instead of SNS. + + Args: + topic_arn (str): The ARN of the SNS topic to publish to. + message (str): The formatted notification message body. + """ + sns.publish( + TopicArn=topic_arn, + Subject='Investigation Escalation Alert', + Message=message, + ) + + +def store_callback_mapping(uuid, callback_id, incident_id, tier, ttl): + """ + Store a UUID-to-callback-ID mapping in the Callback_Table. + + This mapping enables clean acknowledgment URLs. When an on-call responder + clicks the link, the Callback_Handler uses this mapping to look up the + durable function callback ID. + + Args: + uuid (str): The short UUID used in the acknowledgment URL. + callback_id (str): The durable function callback ID. + incident_id (str): The associated incident ID. + tier (int): The escalation tier (1 or 2). + ttl (int): TTL in epoch seconds for automatic cleanup. 
+ """ + callback_table.put_item( + Item={ + 'uuid': uuid, + 'callbackId': callback_id, + 'incidentId': incident_id, + 'tier': tier, + 'ttl': ttl, + } + ) diff --git a/lambda-durable-investigation-escalation/src/lambda_function.py b/lambda-durable-investigation-escalation/src/lambda_function.py new file mode 100644 index 000000000..e14ebb73f --- /dev/null +++ b/lambda-durable-investigation-escalation/src/lambda_function.py @@ -0,0 +1,269 @@ +""" +Durable execution handler for the Investigation Failure Escalation workflow. + +This Lambda function orchestrates an escalation workflow using the +aws_durable_execution_sdk_python SDK. When a DevOps Agent's investigation +fails or times out, this function: + +1. Validates the incoming event +2. Gathers failure context and generates an incident ID +3. Creates an incident ticket (Integration Point) +4. Pages on-call personnel and waits for acknowledgment +5. Tracks final resolution (acknowledged or unacknowledged) + +Each major step is checkpointed via context.step() so the workflow +survives Lambda restarts and retries automatically. +""" + +import json +import os +import uuid +import time +from datetime import datetime, timezone + +from aws_durable_execution_sdk_python import DurableContext, durable_execution +from aws_durable_execution_sdk_python.config import CallbackConfig, Duration +from aws_durable_execution_sdk_python.exceptions import CallbackError +import boto3 + +import helpers + +# Valid failure types for the Investigation_Event +VALID_FAILURE_TYPES = {'investigation_failed', 'investigation_timed_out'} + +# Required fields in the Investigation_Event payload +REQUIRED_FIELDS = [ + 'investigationId', + 'failureType', + 'service', + 'region', + 'errorDetails', + 'timestamp', +] + + +def validate_event(event): + """ + Validate the Investigation_Event payload. + + Checks that all required fields are present and that failureType + is one of the allowed enum values. 
+ """ + missing = [f for f in REQUIRED_FIELDS if f not in event or event[f] is None] + if missing: + return False, f"Missing required fields: {', '.join(missing)}" + + if event.get('failureType') not in VALID_FAILURE_TYPES: + return False, ( + f"Invalid failureType: '{event.get('failureType')}'. " + f"Must be one of: {', '.join(sorted(VALID_FAILURE_TYPES))}" + ) + + return True, None + + +def build_context_summary(event): + """ + Build a context summary from a validated Investigation_Event. + + Extracts all required fields and generates a new incidentId (UUID v4). + """ + return { + 'incidentId': str(uuid.uuid4()), + 'investigationId': event['investigationId'], + 'failureType': event['failureType'], + 'service': event['service'], + 'region': event['region'], + 'errorDetails': event['errorDetails'], + 'timestamp': event['timestamp'], + } + + +def build_incident_record(context_summary): + """ + Build a full incident record from a context summary. + + Adds default fields: status=open, empty escalationHistory, + createdAt timestamp, and TTL set to 7 days from creation. + """ + now = datetime.now(timezone.utc) + created_at = now.isoformat() + ttl = int(now.timestamp()) + 604800 # 7 days in seconds + + return { + **context_summary, + 'status': 'open', + 'escalationHistory': [], + 'createdAt': created_at, + 'ttl': ttl, + } + + +def build_notification_message(incident_data, ack_url): + """ + Build a notification message for escalation alerts. 
+ """ + return ( + f"INVESTIGATION ESCALATION ALERT\n" + f"\n" + f"Incident ID: {incident_data['incidentId']}\n" + f"Investigation ID: {incident_data['investigationId']}\n" + f"Failure Type: {incident_data['failureType']}\n" + f"Service: {incident_data['service']}\n" + f"Region: {incident_data['region']}\n" + f"Error Details: {incident_data['errorDetails']}\n" + f"\n" + f"Acknowledge this incident:\n" + f"{ack_url}\n" + ) + + +def build_final_result(incident_id, investigation_id, status, escalation_history): + """ + Build the final result object returned by the Escalation_Function. + """ + return { + 'incidentId': incident_id, + 'investigationId': investigation_id, + 'status': status, + 'escalationHistory': escalation_history, + } + + +@durable_execution +def lambda_handler(event, context: DurableContext): + """ + Durable execution handler for the investigation failure escalation workflow. + + Gathers context, creates an incident, pages on-call personnel, + waits for acknowledgment, and tracks the final resolution. 
+ """ + # --- Input Validation --- + is_valid, error_message = validate_event(event) + if not is_valid: + now = datetime.now(timezone.utc) + validation_record = { + 'incidentId': str(uuid.uuid4()), + 'investigationId': event.get('investigationId', 'unknown'), + 'failureType': event.get('failureType', 'unknown'), + 'service': event.get('service', 'unknown'), + 'region': event.get('region', 'unknown'), + 'errorDetails': error_message, + 'timestamp': event.get('timestamp', now.isoformat()), + 'status': 'validation_failed', + 'escalationHistory': [], + 'createdAt': now.isoformat(), + 'ttl': int(now.timestamp()) + 604800, + } + helpers.create_incident_ticket(validation_record) + print(f"Validation failed: {error_message}") + return {'error': error_message, 'status': 'validation_failed'} + + # --- Step 1: Gather Context --- + def gather_context(_): + summary = build_context_summary(event) + print(f"Context gathered — incidentId: {summary['incidentId']}") + return summary + + context_summary = context.step(gather_context, name='gather-context') + + incident_id = context_summary['incidentId'] + investigation_id = context_summary['investigationId'] + + # --- Step 2: Create Incident --- + def create_incident(_): + record = build_incident_record(context_summary) + helpers.create_incident_ticket(record) + print(f"Incident created: {incident_id}") + return record + + incident_record = context.step(create_incident, name='create-incident') + escalation_history = incident_record.get('escalationHistory', []) + + # --- Read API Gateway URL from SSM --- + param_name = os.environ.get('API_GATEWAY_PARAM') + if not param_name: + raise ValueError("API_GATEWAY_PARAM environment variable is not set") + + ssm = boto3.client('ssm') + response = ssm.get_parameter(Name=param_name) + api_base_url = response['Parameter']['Value'] + print(f"API Base URL: {api_base_url}") + + # --- Read configurable timeout --- + ack_timeout = int(os.environ.get('ACK_TIMEOUT', '900')) + + # Callback table entry 
TTL (1 hour)
+    callback_ttl = int(time.time()) + 3600
+
+    topic_arn = os.environ['NOTIFICATION_TOPIC_ARN']
+
+    # --- Create Callback, Store Mapping, Notify On-Call ---
+    callback = context.create_callback(
+        name='wait-for-ack',
+        config=CallbackConfig(timeout=Duration.from_seconds(ack_timeout)),
+    )
+
+    def store_mapping(_):
+        # Generate the UUID inside a step so a replay reuses the checkpointed
+        # value instead of minting a new one and writing a duplicate mapping.
+        new_uuid = str(uuid.uuid4())
+        helpers.store_callback_mapping(
+            uuid=new_uuid,
+            callback_id=callback.callback_id,
+            incident_id=incident_id,
+            tier=1,
+            ttl=callback_ttl,
+        )
+        return new_uuid
+
+    callback_uuid = context.step(store_mapping, name='store-callback-mapping')
+
+    def notify_oncall(_):
+        ack_url = f"{api_base_url}/{callback_uuid}"
+        message = build_notification_message(context_summary, ack_url)
+        helpers.send_escalation_notification(topic_arn, message)
+        print(f"On-call notification sent, ack URL: {ack_url}")
+
+    context.step(notify_oncall, name='notify-oncall')
+
+    # --- Pause: Wait for Acknowledgment ---
+    try:
+        result = callback.result()
+
+        # Acknowledged: the callback result may arrive as a JSON string or a dict
+        if isinstance(result, str):
+            ack_data = json.loads(result)
+        else:
+            ack_data = result
+
+        ack_timestamp = ack_data.get('timestamp', datetime.now(timezone.utc).isoformat())
+
+        def resolve_acknowledged(_):
+            helpers.resolve_incident(incident_id, 'acknowledged', {
+                'acknowledgedAt': ack_timestamp,
+            })
+            print(f"Incident {incident_id} acknowledged")
+
+        context.step(resolve_acknowledged, name='resolve-incident')
+
+        # Appended outside the step so the entry is rebuilt if the handler replays
+        escalation_history.append({
+            'action': 'acknowledged',
+            'timestamp': ack_timestamp,
+        })
+
+        return build_final_result(
+            incident_id, investigation_id, 'acknowledged', escalation_history
+        )
+
+    except CallbackError:
+        print("Callback timed out; resolving as unacknowledged")
+
+        # --- Timeout → Resolve Unacknowledged ---
+        def resolve_unacknowledged(_):
+            timeout_timestamp = datetime.now(timezone.utc).isoformat()
+            helpers.resolve_incident(incident_id, 'unacknowledged', {})
+            escalation_history.append({
+                'action': 'timeout',
+                'timestamp': timeout_timestamp,
+            })
+            print(f"Incident {incident_id} unacknowledged: timeout expired")
+
+        
context.step(resolve_unacknowledged, name='resolve-unacknowledged') + + return build_final_result( + incident_id, investigation_id, 'unacknowledged', escalation_history + ) diff --git a/lambda-durable-investigation-escalation/src/requirements.txt b/lambda-durable-investigation-escalation/src/requirements.txt new file mode 100644 index 000000000..8f4105d97 --- /dev/null +++ b/lambda-durable-investigation-escalation/src/requirements.txt @@ -0,0 +1 @@ +boto3>=1.42.1 diff --git a/lambda-durable-investigation-escalation/template.yaml b/lambda-durable-investigation-escalation/template.yaml new file mode 100644 index 000000000..5086a2849 --- /dev/null +++ b/lambda-durable-investigation-escalation/template.yaml @@ -0,0 +1,177 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: Investigation Failure Escalation Pattern using AWS Lambda durable functions + +Parameters: + OncallEmail: + Type: String + Description: Email address to receive escalation notifications + AckTimeout: + Type: Number + Default: 900 + Description: Timeout in seconds to wait for on-call acknowledgment before marking unacknowledged + +Resources: + # DynamoDB Table for Incident Records + IncidentTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub '${AWS::StackName}-incidents' + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: incidentId + AttributeType: S + KeySchema: + - AttributeName: incidentId + KeyType: HASH + TimeToLiveSpecification: + AttributeName: ttl + Enabled: true + + # DynamoDB Table for Callback Mapping + CallbackTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub '${AWS::StackName}-callbacks' + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: uuid + AttributeType: S + KeySchema: + - AttributeName: uuid + KeyType: HASH + TimeToLiveSpecification: + AttributeName: ttl + Enabled: true + + # SNS Topic for Escalation Notifications + NotificationTopic: + Type: AWS::SNS::Topic + 
Properties: + Subscription: + - Protocol: email + Endpoint: !Ref OncallEmail + + # Durable Escalation Function + EscalationFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub '${AWS::StackName}-EscalationFunction' + CodeUri: src/ + Handler: lambda_function.lambda_handler + Runtime: python3.14 + Timeout: 120 + AutoPublishAlias: live + DurableConfig: + ExecutionTimeout: 3600 + RetentionPeriodInDays: 7 + Environment: + Variables: + INCIDENT_TABLE_NAME: !Ref IncidentTable + CALLBACK_TABLE_NAME: !Ref CallbackTable + NOTIFICATION_TOPIC_ARN: !Ref NotificationTopic + API_GATEWAY_PARAM: !Sub '/${AWS::StackName}/api-gateway-url' + ACK_TIMEOUT: !Ref AckTimeout + Policies: + - DynamoDBWritePolicy: + TableName: !Ref IncidentTable + - DynamoDBWritePolicy: + TableName: !Ref CallbackTable + - SNSPublishMessagePolicy: + TopicName: !GetAtt NotificationTopic.TopicName + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecution + Resource: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-EscalationFunction-*' + - Effect: Allow + Action: + - ssm:GetParameter + Resource: !Sub 'arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/${AWS::StackName}/api-gateway-url' + + # Callback Handler Function + CallbackHandlerFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub '${AWS::StackName}-CallbackHandlerFunction' + CodeUri: src/ + Handler: callback_handler.lambda_handler + Runtime: python3.14 + Timeout: 30 + Environment: + Variables: + CALLBACK_TABLE_NAME: !Ref CallbackTable + Policies: + - DynamoDBReadPolicy: + TableName: !Ref CallbackTable + - Statement: + - Effect: Allow + Action: + - dynamodb:GetItem + Resource: !GetAtt CallbackTable.Arn + - Effect: Allow + Action: + - lambda:SendDurableExecutionCallbackSuccess + Resource: '*' + + # Lambda Permission for API Gateway — EscalationFunction (alias) + EscalationFunctionInvokePermission: + Type: AWS::Lambda::Permission + Properties: + 
FunctionName: !Ref EscalationFunction.Alias + Action: lambda:InvokeFunction + Principal: apigateway.amazonaws.com + SourceArn: !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${EscalationApi}/*' + + # Lambda Permission for API Gateway — CallbackHandlerFunction + CallbackHandlerInvokePermission: + Type: AWS::Lambda::Permission + Properties: + FunctionName: !Ref CallbackHandlerFunction + Action: lambda:InvokeFunction + Principal: apigateway.amazonaws.com + SourceArn: !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${EscalationApi}/*/*/{uuid}' + + # API Gateway for Acknowledgment Endpoint + EscalationApi: + Type: AWS::Serverless::Api + Properties: + StageName: prod + DefinitionBody: + openapi: 3.0.1 + paths: + /{uuid}: + get: + parameters: + - name: uuid + in: path + required: true + schema: + type: string + responses: + '200': + description: Callback processed + x-amazon-apigateway-integration: + type: aws_proxy + httpMethod: POST + uri: !Sub 'arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${CallbackHandlerFunction.Arn}/invocations' + passthroughBehavior: when_no_match + + # Store API Gateway URL in Parameter Store + ApiGatewayUrlParameter: + Type: AWS::SSM::Parameter + Properties: + Name: !Sub '/${AWS::StackName}/api-gateway-url' + Type: String + Value: !Sub 'https://${EscalationApi}.execute-api.${AWS::Region}.amazonaws.com/prod' + Description: 'API Gateway URL for the escalation workflow' + DependsOn: EscalationApi + +Outputs: + ApiEndpoint: + Description: API Gateway endpoint URL + Value: !Sub 'https://${EscalationApi}.execute-api.${AWS::Region}.amazonaws.com/prod' + + EscalationFunctionArn: + Description: Escalation Durable Function ARN + Value: !GetAtt EscalationFunction.Arn
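
The two stack outputs above are what a caller needs after deployment: `EscalationFunctionArn` to invoke the workflow and `ApiEndpoint` as the base of the acknowledgment URL. A minimal sketch of reading them programmatically, assuming the stack was deployed through SAM as a regular CloudFormation stack. The helper names `outputs_to_dict` and `fetch_stack_outputs` are hypothetical and not part of this change; only the boto3 CloudFormation `describe_stacks` call is an existing API.

```python
def outputs_to_dict(describe_stacks_response):
    """Flatten the Outputs list of a CloudFormation describe_stacks response.

    Hypothetical helper: turns [{'OutputKey': k, 'OutputValue': v}, ...]
    into a plain {k: v} dict for the first stack in the response.
    """
    stacks = describe_stacks_response.get("Stacks", [])
    if not stacks:
        raise ValueError("response contains no stacks")
    outputs = stacks[0].get("Outputs", [])
    return {item["OutputKey"]: item["OutputValue"] for item in outputs}


def fetch_stack_outputs(stack_name, region_name):
    """Call CloudFormation and return the stack outputs as a dict.

    Hypothetical helper: needs AWS credentials with
    cloudformation:DescribeStacks on the stack.
    """
    # Imported lazily so outputs_to_dict can be used without boto3 installed.
    import boto3

    cfn = boto3.client("cloudformation", region_name=region_name)
    return outputs_to_dict(cfn.describe_stacks(StackName=stack_name))
```

With a deployed stack, `fetch_stack_outputs(stack_name, "us-east-2")["EscalationFunctionArn"]` would give the value to pass as `FunctionName` when invoking the workflow, where `stack_name` is whatever name was chosen during `sam deploy --guided`.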