From e7b0e3f5a10c017139d3e904fe16272366a4dd76 Mon Sep 17 00:00:00 2001 From: Ridhima Bhalerao Date: Thu, 30 Apr 2026 21:47:47 -0500 Subject: [PATCH] Added new feature to investigate failed/timed out DevOps Agent investigation using Lambda Durable Function --- .../README.md | 253 ++++++++++++++++ .../example-pattern.json | 63 ++++ .../callback_handler.cpython-314.pyc | Bin 0 -> 4777 bytes .../src/__pycache__/helpers.cpython-314.pyc | Bin 0 -> 5482 bytes .../src/callback_handler.py | 145 ++++++++++ .../src/helpers.py | 112 ++++++++ .../src/lambda_function.py | 269 ++++++++++++++++++ .../src/requirements.txt | 1 + .../template.yaml | 177 ++++++++++++ 9 files changed, 1020 insertions(+) create mode 100644 lambda-durable-investigation-escalation/README.md create mode 100644 lambda-durable-investigation-escalation/example-pattern.json create mode 100644 lambda-durable-investigation-escalation/src/__pycache__/callback_handler.cpython-314.pyc create mode 100644 lambda-durable-investigation-escalation/src/__pycache__/helpers.cpython-314.pyc create mode 100644 lambda-durable-investigation-escalation/src/callback_handler.py create mode 100644 lambda-durable-investigation-escalation/src/helpers.py create mode 100644 lambda-durable-investigation-escalation/src/lambda_function.py create mode 100644 lambda-durable-investigation-escalation/src/requirements.txt create mode 100644 lambda-durable-investigation-escalation/template.yaml diff --git a/lambda-durable-investigation-escalation/README.md b/lambda-durable-investigation-escalation/README.md new file mode 100644 index 000000000..e79656855 --- /dev/null +++ b/lambda-durable-investigation-escalation/README.md @@ -0,0 +1,253 @@ +# Investigation Failure Escalation Workflow with AWS Lambda durable functions + +This pattern demonstrates an investigation failure escalation workflow using AWS Lambda durable functions. When a DevOps Agent's investigation fails or times out, the durable function gathers failure context, creates an incident ticket in DynamoDB, pages on-call personnel via SNS, and uses durable function callbacks to wait for human acknowledgment. The function incurs no compute charges while waiting for a response. + +**Important:** Please check the [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html) for regions currently supported by AWS Lambda durable functions. + +Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/lambda-durable-investigation-escalation + +## Architecture + +The pattern uses two Lambda functions, two DynamoDB tables, an SNS topic, and an API Gateway endpoint: + +- **Escalation_Function** (Durable Lambda) — Orchestrates the entire workflow: validates the event, gathers context, creates an incident, pages on-call personnel, and tracks resolution. Pauses execution (no compute charges) while waiting for acknowledgment. +- **Callback_Handler** (Standard Lambda) — Receives acknowledgment clicks via API Gateway and sends durable execution callbacks to resume the paused Escalation_Function. +- **Incident_Table** (DynamoDB) — Stores incident records with failure context, escalation history, and resolution status. +- **Callback_Table** (DynamoDB) — Maps short UUIDs to durable function callback IDs for clean acknowledgment URLs. +- **Notification_Topic** (SNS) — Sends escalation email notifications to on-call personnel. +- **Escalation_API** (API Gateway) — Provides the `GET /{uuid}` acknowledgment endpoint. + +### Workflow Steps + +1. **gather-context** — The DevOps Agent invokes the Escalation_Function directly via `aws lambda invoke`. The function validates the incoming event and extracts failure context (investigation ID, failure type, service, region, error details, timestamp), generating a unique incident ID. +2. **create-incident** — Writes an incident record to the Incident_Table with status `open` and an empty escalation history. *(Integration Point)* +3. **notify-oncall** — Creates a durable callback with a configurable timeout, stores the UUID-to-callback mapping, and publishes a notification via SNS with an acknowledgment link. The function then pauses, incurring no compute charges while waiting. *(Integration Point)* +4. **resolve-incident** — When an on-call responder clicks the acknowledgment link, the Callback_Handler sends a durable callback, resuming the function. The incident is updated to `status: acknowledged`. *(Integration Point)* +5. **resolve-unacknowledged** — If the timeout expires without acknowledgment, the incident is updated to `status: unacknowledged` and the workflow completes. *(Integration Point)* + +## Key Features + +- ✅ **No Compute Charges During Wait** — Function is suspended while waiting for human response +- ✅ **Configurable Timeout** — Acknowledgment timeout duration configurable via SAM template parameter +- ✅ **Extensible Integration Points** — Incident ticket and notification operations are isolated as helper functions that can be replaced with calls to Jira, ServiceNow, PagerDuty, Opsgenie, or other ITSM tools + +## Prerequisites + +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) >= 1.150.0 (required for `DurableConfig` support) +* Python 3.14 runtime (automatically provided by Lambda) + +## Deployment + +1. Navigate to the pattern directory: + ```bash + cd lambda-durable-investigation-escalation + ``` + +2. Build the SAM application: + ```bash + sam build + ``` + +3. Deploy the application: + ```bash + sam deploy --guided --region us-east-2 + ``` + + During the guided deployment, provide the required values: + - **OncallEmail**: Email address to receive escalation notifications + - **AckTimeout**: Timeout in seconds to wait for acknowledgment (default: 900) + - Allow SAM CLI to create IAM roles when prompted + +4. **Confirm SNS subscription**: Check your email and click the confirmation link from Amazon SNS + +5. Note the `ApiEndpoint` and `EscalationFunctionArn` from the outputs + +## Input Payload Format + +The Escalation_Function expects the following JSON payload when invoked: + +```json +{ + "investigationId": "INV-2025-001", + "failureType": "investigation_failed", + "service": "payment-service", + "region": "us-east-1", + "errorDetails": "Connection timeout after 3 retries to downstream API", + "timestamp": "2025-01-15T10:30:00Z" +} +``` + +| Field | Type | Required | Description | +|---|---|---|---| +| `investigationId` | string | Yes | Unique identifier for the failed investigation | +| `failureType` | string | Yes | One of `investigation_failed` or `investigation_timed_out` | +| `service` | string | Yes | The service that was being investigated | +| `region` | string | Yes | The AWS region where the failure occurred | +| `errorDetails` | string | Yes | Description of the error or failure | +| `timestamp` | string | Yes | ISO 8601 timestamp of the failure | + +If any required field is missing or `failureType` is invalid, the function records a validation failure in the Incident_Table and terminates without sending notifications. + +## Testing + +### Trigger an Escalation + +Invoke the Escalation_Function directly using the Lambda CLI: + +```bash +aws lambda invoke \ + --function-name \ + --invocation-type Event \ + --region us-east-2 \ + --cli-binary-format raw-in-base64-out \ + --payload '{ + "investigationId": "INV-2025-001", + "failureType": "investigation_failed", + "service": "payment-service", + "region": "us-east-1", + "errorDetails": "Connection timeout after 3 retries to downstream API", + "timestamp": "2025-01-15T10:30:00Z" + }' \ + /dev/null +``` + +### Check Your Email + +You will receive an escalation notification email containing: +- Incident ID and investigation details +- Failure type, service, region, and error details +- An **acknowledgment link** + +### Acknowledge the Incident + +Click the acknowledgment link in the email. You will see an HTML confirmation page indicating the incident has been acknowledged. The Escalation_Function resumes and updates the incident record to `acknowledged`. + +If you do not click the link within the timeout (default 15 minutes), the incident is marked `unacknowledged`. + +### Verify in DynamoDB + +```bash +aws dynamodb scan --table-name -incidents --region us-east-2 +``` + +## Integrating with a DevOps Agent + +The Escalation_Function is designed to be invoked programmatically by a DevOps Agent when an investigation fails. The agent calls `lambda:Invoke` with `InvocationType: Event` (async, non-blocking) and passes the payload described above. + +### Example: Python (boto3) + +```python +import boto3 +import json +from datetime import datetime, timezone + +lambda_client = boto3.client('lambda', region_name='us-east-2') + +def trigger_escalation(investigation_id, failure_type, service, region, error_details): + lambda_client.invoke( + FunctionName='', + InvocationType='Event', + Payload=json.dumps({ + 'investigationId': investigation_id, + 'failureType': failure_type, + 'service': service, + 'region': region, + 'errorDetails': str(error_details), + 'timestamp': datetime.now(timezone.utc).isoformat() + }) + ) +``` + +Call this from your agent's error handler: + +```python +try: + result = run_investigation(issue) +except TimeoutError: + trigger_escalation('INV-123', 'investigation_timed_out', 'my-service', 'us-east-1', 'Timed out') +except Exception as e: + trigger_escalation('INV-123', 'investigation_failed', 'my-service', 'us-east-1', str(e)) +``` + +### IAM Permission + +The agent's execution role needs `lambda:InvokeFunction` on the Escalation Function: + +```yaml +- Effect: Allow + Action: lambda:InvokeFunction + Resource: +``` + +## Configuration + +| Parameter | Default | Description | +|---|---|---| +| `AckTimeout` | 900 (15 min) | Seconds to wait for on-call acknowledgment before marking unacknowledged | +| `OncallEmail` | — | Email address subscribed to the SNS notification topic | + +To change values after deployment: + +```bash +sam deploy --parameter-overrides AckTimeout=600 OncallEmail=oncall@example.com --region us-east-2 +``` + +The timeout must be less than the overall `DurableConfig.ExecutionTimeout` of 3600 seconds (1 hour) defined in the SAM template. + +## Extensibility and Integration Points + +The incident ticket and notification operations are isolated into dedicated helper functions in `src/helpers.py`. You can replace any of these functions with calls to external ITSM and alerting services without modifying the core orchestration logic in `lambda_function.py`. + +### Helper Functions + +| Function | Default Behavior | Replacement Example | +|---|---|---| +| `create_incident_ticket(incident)` | DynamoDB `put_item` to Incident_Table | Jira, ServiceNow, Zendesk, or any ITSM platform with a REST API | +| `resolve_incident(incident_id, status, details)` | DynamoDB `update_item` on Incident_Table | Jira issue transition, ServiceNow resolve, or Zendesk ticket update | +| `send_escalation_notification(topic_arn, message)` | SNS `publish` to Notification_Topic | PagerDuty Events API v2, Opsgenie Alert API, Slack, or Microsoft Teams webhook | +| `store_callback_mapping(uuid, callback_id, incident_id, tier, ttl)` | DynamoDB `put_item` to Callback_Table | Typically not replaced | + +### Swapping DynamoDB for an ITSM Tool + +1. Replace `create_incident_ticket()` to call the external API and create a ticket. +2. Replace `resolve_incident()` to transition the external ticket to a resolved state. +3. Add API keys or credentials as environment variables or AWS Secrets Manager references. +4. Update the SAM template IAM policies to allow outbound HTTPS calls if running in a VPC. + +### Swapping SNS for an Alerting Service + +1. Replace `send_escalation_notification()` to call the external API (e.g., PagerDuty, Opsgenie, Slack). +2. Pass the acknowledgment URL in the notification payload so responders can still click to acknowledge. +3. Add the API integration key as an environment variable or Secrets Manager reference. + +## Monitoring + +### CloudWatch Logs + +```bash +aws logs tail /aws/lambda/-EscalationFunction \ + --region us-east-2 --follow + +aws logs tail /aws/lambda/-CallbackHandlerFunction \ + --region us-east-2 --follow +``` + +## Cleanup + +```bash +sam delete --stack-name --region us-east-2 +``` + +## Learn More + +- [Lambda durable functions Documentation](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html) +- [Durable Execution SDK (Python)](https://github.com/aws/aws-durable-execution-sdk-python) +- [Callback Operations](https://docs.aws.amazon.com/lambda/latest/dg/durable-callback.html) +- [SendDurableExecutionCallbackSuccess API](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/send_durable_execution_callback_success.html) + +--- + +Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/lambda-durable-investigation-escalation/example-pattern.json b/lambda-durable-investigation-escalation/example-pattern.json new file mode 100644 index 000000000..df8d81152 --- /dev/null +++ b/lambda-durable-investigation-escalation/example-pattern.json @@ -0,0 +1,63 @@ +{ + "title": "AWS Lambda durable functions Investigation Failure Escalation Workflow", + "description": "Tiered investigation failure escalation workflow using Lambda durable functions with callback-based pausing, automatic tier escalation, and human acknowledgment handling", + "language": "Python", + "level": "300", + "framework": "AWS SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern implements a tiered escalation workflow using Lambda durable functions. When a DevOps Agent's investigation fails or times out, the Escalation Function gathers failure context and creates an incident ticket in DynamoDB.", + "The workflow pages Tier 1 on-call personnel via SNS and pauses execution using create_callback(), incurring no compute charges while waiting for human acknowledgment.", + "If no acknowledgment is received within a configurable timeout (default 15 minutes), the workflow automatically escalates to Tier 2 by paging the next responder and waiting again.", + "On-call responders acknowledge incidents by clicking a link in the notification, which triggers an API Gateway endpoint that resumes the paused durable function via callback.", + "The incident ticket creation and notification steps are designed as extensible integration points, allowing customers to replace DynamoDB and SNS with external services such as Jira, ServiceNow, PagerDuty, or Opsgenie." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-durable-investigation-escalation-sam", + "templateURL": "serverless-patterns/lambda-durable-investigation-escalation-sam", + "projectFolder": "lambda-durable-investigation-escalation-sam", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Lambda durable functions Documentation", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html" + }, + { + "text": "Durable Execution SDK for Python", + "link": "https://github.com/aws/aws-durable-execution-sdk-python" + } + ] + }, + "deploy": { + "text": [ + "Note: Lambda durable functions have limited regional availability. Please check the AWS documentation for current regional support.", + "sam build", + "sam deploy --guided --region us-east-2" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete --region us-east-2." + ] + }, + "authors": [ + { + "name": "Your name", + "image": "link-to-your-photo.jpg", + "bio": "Your bio.", + "linkedin": "linked-in-ID", + "twitter": "twitter-handle" + } + ] +} diff --git a/lambda-durable-investigation-escalation/src/__pycache__/callback_handler.cpython-314.pyc b/lambda-durable-investigation-escalation/src/__pycache__/callback_handler.cpython-314.pyc new file mode 100644 index 0000000000000000000000000000000000000000..594ac301a8b41894f93fea47ea09b7f298e36d7a GIT binary patch literal 4777 zcmdT|TTC0-89w&-GQMEE7z`vNIY7ihf-w-15HQ{)Tmnrt>y8|hP06CcbHJ0>Gt-$d z7iS~!n6?kQ8?6*nRVtM#m3H@m$9?RoyV*QC0u(g6QX_3%`erFqT6JIgAKQa%f|N?l zLpv7d`k(*&*YBLeSznDC!T0sIc`@ck=m**{KYOY1@(DE7(1(b#Oe4KmTXYNbt<%;i zt8SfQbmmlX+@{-Z+hPTtCYxg2ia3U|arTSWVqbRvov)gP_JGb&MaSM|V}V!cZd+pY zua0ug?qlWA0ZXy1dpNcc>0Zv#j=Eg`po7=DtOhIaI?=^6F<4q=Wr-M$6rY|K_^d0V zg$S?dq5dw5;Zez=C`)q^Pici!sAXZ^2#k$hyLNeW?CM;6^zyZdx$C3vO(aU{6*i`3 zU=Xssdu>CbTM0jwyJ1FGsqO)i~6l*`xK0G)=CvQrn0A4EB3I>SgY3_ zN3#xdo0!XNm1~Zfv4$<+qLQ2n!#QS{OVATM2(=*2$+4V^b8{Y`@^Up{rX9@yO{D~! zud=7u4$sGc)8UV+<@^IBdnoq*XvJFAfc}oM?YLvyajs<+ z!o2v!>daQTZbh>PVHFFO$IX_>E!J#~l~Gi+5@u=Q&^9A`~lifi^DXk#bJ{LEI| z7I$$c%*0Wc3Vlz;nQ9+%&JUkd_~;b%5k|9Kla~Vtm#8llX9bsr*;;cx?lh%3%$EyR z@NGPL1=rNigqe)>1mc<%=R3~!QY~6UG|ZepqUE)+;G;QL<^aTDr7W#SuXw#IU2WNm zQ@kXk2u^0wyoCA0T}fU}5n(Y+Bpok_JRXfr<0}9Y%ls<7G7-n&dzp+V+|T+sk|5#| z5inT-+97QC6kbopcoi^0C#q)HC}H+=OqCNv(?n_UF#82r#W!wDj}Kvh80IQjHEh#5 zNgHlW=k<&>CJRLT43f`qCJ8S9%dB@ymUP%H6kkv&SmNJ1oB9B@a%R3zX=H^Q2}nj#}@3m{x*flrI6)ge5piaf}b=2t??qM#>-@P*!9 zWrdEYi=s4y&qELM8C@R63NHv?pCQ~|prC4n_>{N^R0+_QsKcf_$v$(lgq)JqA>1w` z&Yd4P4+I?fWrk=vt`IiaS5iZX#*?CU*#0R|x{G00l$KyO0fRe|yoSk&BC15_iG&qU zFibV1bn$7 zV7oD;hStdmRfV|)q9>9xR2Eb@jmHZI&G_XZEbfCZvp>EOZx(j?U-o24zzXkW8;*3w zb;DcESR%v?7d^%yd(sM=?(VU&ryLy7GATXVVvb=A+=jJGg68HzCPhP*{|lVbZ^XpY zrMSSO2Y^D#$x}$WLe3P!llBXFbgz()v`fEN)2SP4yrhLRqKXT{RkW|9*`e9PDOWXJ zoqknQN2KDx{QZMgPsxt>9yZgme~`ZjX^AU{61*zlY8B^Y6>x0`_W=}WGR?tuK@h4{ zE7F8iUJx_d5FP{!IoOz*=|K7c3LaHcG))&(3S0mb3zz8c5P`fB(vrL&FAw2f+z(O= zz@NG}&v*8A7k)i`UF9WZWdMinm5?l?%L=R>F8WIovjlLAq^ndH6S4#g3LY&|3{Z+9 z6o%$yU6<1)`V}{cgi8TZg|sC&6T~H)Nb#CB(ngJ10!M0$NW~Q0v@P1v-q+jPdu})q zhN*JAML>fRi<0x&qDMw;D+o}6(ZU9b3BVP~^z~4M+W=$vlQRp2U>#)+qMXZtp#6=N*3wsttKMcj$rXprSCezd7 z0Ba$U>)zDK_y2*FxZ;jcMjU#DDLXHsM7wGYyP}GcZrJW(ThslBi{y?JYA z;VrO+_8rLX&UssY7ytdO-`?7^Z=dMNwS;r--W^+C-ht|ybIo15fis&Ib_4xu*Iw9M zIdAV)%a$+aerLz_Zr;&tcYhyfdKCRMy0NtRQLZEMNpw3f@_^m*ytUr6<7s{FZ&^?5 z`J30T=ej3z$FJ=9gL&j_w&k7vdS4#-Ykc|BsPWYL(r&P0hdy%Vg-L}UDKD+s7_TlUwJO9-Ehwkl$@Uw3H|PWBEQT1 zQ{D~?f7*xVT_y9*F9MrC|2(+K=6roSZ2y17{9oEQ1kG30&hZiS)!ES?RDT;BwM{bU z>k-c+V!mNR6T{3m=Pk57Y=`nO+k68ACULCzu}Shp?Ce}*6@!1owb@hkN@N7ucPZ{ sw$AQ&qHF9P>;KsIE8i#TqwJ^IT-(Ln#=&j&(wcqWF=4g1)j{C-Z#O;B$p8QV literal 0 HcmV?d00001 diff --git a/lambda-durable-investigation-escalation/src/__pycache__/helpers.cpython-314.pyc b/lambda-durable-investigation-escalation/src/__pycache__/helpers.cpython-314.pyc new file mode 100644 index 0000000000000000000000000000000000000000..3d9fd3b1efe3ca0a4a9d25e705922c421609f186 GIT binary patch literal 5482 zcmcIoTW=f372YM67n8EAi|ae&ab-)QO;L87#I2JUpe-4p?I;2zt1ofb)$T~zdbzup znYBU%1(ZJ2L0_r@1>(n~FG2o7ABsNpS&reF*uEHTe}HNSe(pK57cPARS|bEF+?~sr zGiSc@opX3*aBoS0&tLwwYh^|ib=EJ3!vxopQhH#y`2er z6RD*)mYRQw)N-?MP!R*oBD8y(d!QX`mZ06&EJHigRL7OdfPNQ_Uurmlt}$I$u0z(T z?NLTneaFCePF7t;#5yGn=MLq0MMbwLcD3%5-fFB!T+81-!3pz=s2UDbVC5UcIkv`rlm z9mo>v1deBOB2b3K%xO<&qC@aJmI0bg(b`(AO0HQ8fQ zjk(r*{iC@+2}ZBfZ``EqK^cJ_+Fl?E>^Z|5*l4xe%>B z4*meB$ohXmR9s;cZh?T!EXS}+Fb^0H)HmT>9;Fd=x0ZCcSb6;xhugy?X-F;LEt7w4 z{V`>Hm=!||jckN_3Cp-m1uww{AOk@o@0@qIpl!l^KnZ{etqvwk*&Q^38M;Z_RT5(Y z^r^Eoez8R0lbm46vOd^C`D}AR1S_2F-U&#c6v9c|Jmk|^_U$np{+7&e@#$z1sc_LVpZpm!Q zIpA8Lk@<8R4n1S*?G;m>HhqRdrmeKgPxrK%=6ZXEGh-$s3tsa&!GTc2R=ig$yh}C+ zTmA&T9GflWheJwk^pSS-zIOEMLp$2hziX$T!hm9=p;?3u3E)||=Jfz>s>{lZ+RQYw z%W6|w&eYVIOfz>Ok_V8^1EdT7pAphGJrf~~bQbG5$z$6S0mhfs)jf}*k=@iykoE8? z-U#M?wEvF{^Oj|jDd3j~=&7?Jk6DuFZ_`dq!df1C3S2ar?a+geaRoaJLnqSfW1W>ilDH&Wp@x13o(#(3@nxkl7G8I70+ z6y=z*oNeZ-c!cQVWtYJbt4HE@MOn@@3ls4A3c1Bs%H8vV+~RX`ODOk~D6@h{m+S2s zKQ+rp6wBFV4fGhepoRTF5PN|j2K`^c;+GLb9P}`p6H7qcvM>(;Jh-+mJt^eyHzEc| z1t2|=(th+9_&4#ro&gIe>?kfSL|2$lAq4^rO4zm_W2ZbQ z%V!y3ILFcq_z&`{kOGG!#_rH0nP5nL@4=UU1sh!HtAD+1QWqG@Czc`jm zJ@u$C@qJ-JPQCE3cmbx4pWZq1!Nc)OF!lQVBKfyA@<{7v+mqcBXSzR~-rXZ9HWyQD zLP4M?o=Z~fyHe}d=P{UwLk5#cWiDADCe2^O%w-|0YGhiRWQW9Y1y)C4*&=HdO6F62 zttHpP(gF&z|2H$|mAczOt2?8M^K3N;s}R0vL8cRwt6{8 zD{WR*g_)nimybaM5&nRZE8e~S#r5v+k?zph?$ALD=0ndEEk;^8#0|h2iL|S7@#~R> z8FMoOSi=CVLD`3 zhI$;(U|Y`ZYX5#80-?j55uby6DA;+T834w1-P^?Xa78JqYYc2?SrK6_ovVh zW*BG*Ho#4i68ObagrJPYJ{N0WzkRw;eQu%7xocQZyWq}I!HOE=66EhNWQApI4`9*I zIrPIge}~F3nY?rZq!H@5P*-y(?Hx1xyGQro4-+62BfKl$g9Kbdiv;i&N_hJXQ-%z7 z5bzdcK`6tbO;)1ul&}(IW$3TuSPdtZlQGK-!v(HqXQAN;X9xnZvmC&}m$s4x5sDb* zzaGwi2O4mbv80}SJTS67+Z{M`?@ZLl*WvB+yXE{~X*a9njy+R~k@r-FgF>rix<;!N zWLzHPR$Sqp4~iIhd}dIFjvM3wc32-sPtVD}If4Rp?pVxq*m0N<#=mp;goUn5(`B%d z?ktu4PLa8H7}<*N!@ohkO)s(cVLGPK{I}5TW>r=FPo|)1Pe&BB^bciVH?Ir~|NheN zF5Nr*pnPhp*v*&k+P|^4-+7Qfyp?@CeE6%eKa72y`L^^;>D$3?26tv=|GM&kG#(6J j+Zyb?cJxVpKt1zhY`>a+QX|k@Rrjg+A5I=r!y4hg-AgIP literal 0 HcmV?d00001 diff --git a/lambda-durable-investigation-escalation/src/callback_handler.py b/lambda-durable-investigation-escalation/src/callback_handler.py new file mode 100644 index 000000000..d15bac60f --- /dev/null +++ b/lambda-durable-investigation-escalation/src/callback_handler.py @@ -0,0 +1,145 @@ +import json +import os +from datetime import datetime, timezone + +import boto3 + +lambda_client = boto3.client('lambda') +dynamodb = boto3.resource('dynamodb') +table = dynamodb.Table(os.environ['CALLBACK_TABLE_NAME']) + + +def lambda_handler(event, context): + """Handle human acknowledgment via API Gateway GET /{uuid}""" + + print(f"Received event: {json.dumps(event)}") + + # Extract UUID from path parameters + request_uuid = event['pathParameters']['uuid'] + + print(f"Processing acknowledgment for UUID: {request_uuid}") + + # Fetch callback ID from DynamoDB + try: + response = table.get_item(Key={'uuid': request_uuid}) + if 'Item' not in response: + return { + 'statusCode': 404, + 'headers': {'Content-Type': 'text/html; charset=utf-8'}, + 'body': """ + + + + + Not Found + + +

Request Not Found

+

This acknowledgment link is invalid or has expired.

+ + +""" + } + + callback_id = response['Item']['callbackId'] + incident_id = response['Item'].get('incidentId', 'Unknown') + tier = response['Item'].get('tier', 'Unknown') + print(f"Found callback ID: {callback_id} for incident: {incident_id}, tier: {tier}") + + except Exception as e: + print(f"Error fetching from DynamoDB: {str(e)}") + return { + 'statusCode': 500, + 'headers': {'Content-Type': 'text/html; charset=utf-8'}, + 'body': f""" + + + + + Error + + +

Error

+

{str(e)}

+ + +""" + } + + # Prepare callback result with acknowledgment and timestamp + result = { + 'acknowledged': True, + 'timestamp': datetime.now(timezone.utc).isoformat() + } + + try: + # Send callback using Lambda API + result_json = json.dumps(result) + + response = lambda_client.send_durable_execution_callback_success( + CallbackId=callback_id, + Result=result_json + ) + + print(f"Callback sent successfully: {response}") + + # Return HTML confirmation page + return { + 'statusCode': 200, + 'headers': {'Content-Type': 'text/html; charset=utf-8'}, + 'body': f""" + + + + + Incident Acknowledged + + + +
+
+

Incident Acknowledged!

+

The escalation workflow has been notified and will update the incident record.

+
+ + +""" + } + + except Exception as e: + print(f"Error sending callback: {str(e)}") + return { + 'statusCode': 500, + 'headers': {'Content-Type': 'text/html; charset=utf-8'}, + 'body': f""" + + + + + Error + + +

Error Processing Acknowledgment

+

{str(e)}

+ + +""" + } diff --git a/lambda-durable-investigation-escalation/src/helpers.py b/lambda-durable-investigation-escalation/src/helpers.py new file mode 100644 index 000000000..4f8a1e6cd --- /dev/null +++ b/lambda-durable-investigation-escalation/src/helpers.py @@ -0,0 +1,112 @@ +""" +Integration helper functions for the Investigation Escalation workflow. + +These functions serve as extensible integration points. Each function isolates +a specific external interaction (DynamoDB write, SNS publish) so that customers +can replace the default implementation with calls to third-party services +(e.g., Jira, ServiceNow, PagerDuty) without modifying the core orchestration logic. +""" + +import os +import boto3 + +# Module-level clients initialized from environment variables +dynamodb = boto3.resource('dynamodb') +sns = boto3.client('sns') + +incident_table = dynamodb.Table(os.environ.get('INCIDENT_TABLE_NAME', '')) +callback_table = dynamodb.Table(os.environ.get('CALLBACK_TABLE_NAME', '')) + + +def create_incident_ticket(incident): + """ + Create an incident record in the Incident_Table. + + This is an Integration Point — replace this function to create tickets + in an external system such as Jira or ServiceNow instead of DynamoDB. + + Args: + incident (dict): The full incident record containing incidentId, + investigationId, failureType, service, region, errorDetails, + timestamp, status, escalationHistory, createdAt, and ttl. + """ + incident_table.put_item(Item=incident) + + +def resolve_incident(incident_id, status, details): + """ + Update the incident record with its final resolution status. + + This is an Integration Point — replace this function to transition tickets + in an external system such as Jira or ServiceNow instead of DynamoDB. + + Args: + incident_id (str): The incident's partition key. + status (str): The resolution status ('acknowledged' or 'unacknowledged'). + details (dict): Additional resolution details. For acknowledged incidents, + should contain 'acknowledgedAt' (ISO 8601 timestamp) and + 'acknowledgedTier' (int). For unacknowledged incidents, may be empty + or contain supplementary info. + """ + update_expr = 'SET #s = :status' + expr_names = {'#s': 'status'} + expr_values = {':status': status} + + if details.get('acknowledgedAt'): + update_expr += ', acknowledgedAt = :ack_at' + expr_values[':ack_at'] = details['acknowledgedAt'] + + if details.get('acknowledgedTier'): + update_expr += ', acknowledgedTier = :ack_tier' + expr_values[':ack_tier'] = details['acknowledgedTier'] + + incident_table.update_item( + Key={'incidentId': incident_id}, + UpdateExpression=update_expr, + ExpressionAttributeNames=expr_names, + ExpressionAttributeValues=expr_values, + ) + + +def send_escalation_notification(topic_arn, message): + """ + Publish an escalation notification to the Notification_Topic. + + This is an Integration Point — replace this function to send notifications + via an external service such as PagerDuty or Opsgenie instead of SNS. + + Args: + topic_arn (str): The ARN of the SNS topic to publish to. + message (str): The formatted notification message body. + """ + sns.publish( + TopicArn=topic_arn, + Subject='Investigation Escalation Alert', + Message=message, + ) + + +def store_callback_mapping(uuid, callback_id, incident_id, tier, ttl): + """ + Store a UUID-to-callback-ID mapping in the Callback_Table. + + This mapping enables clean acknowledgment URLs. When an on-call responder + clicks the link, the Callback_Handler uses this mapping to look up the + durable function callback ID. + + Args: + uuid (str): The short UUID used in the acknowledgment URL. + callback_id (str): The durable function callback ID. + incident_id (str): The associated incident ID. + tier (int): The escalation tier (1 or 2). + ttl (int): TTL in epoch seconds for automatic cleanup. + """ + callback_table.put_item( + Item={ + 'uuid': uuid, + 'callbackId': callback_id, + 'incidentId': incident_id, + 'tier': tier, + 'ttl': ttl, + } + ) diff --git a/lambda-durable-investigation-escalation/src/lambda_function.py b/lambda-durable-investigation-escalation/src/lambda_function.py new file mode 100644 index 000000000..e14ebb73f --- /dev/null +++ b/lambda-durable-investigation-escalation/src/lambda_function.py @@ -0,0 +1,269 @@ +""" +Durable execution handler for the Investigation Failure Escalation workflow. + +This Lambda function orchestrates an escalation workflow using the +aws_durable_execution_sdk_python SDK. When a DevOps Agent's investigation +fails or times out, this function: + +1. Validates the incoming event +2. Gathers failure context and generates an incident ID +3. Creates an incident ticket (Integration Point) +4. Pages on-call personnel and waits for acknowledgment +5. Tracks final resolution (acknowledged or unacknowledged) + +Each major step is checkpointed via context.step() so the workflow +survives Lambda restarts and retries automatically. +""" + +import json +import os +import uuid +import time +from datetime import datetime, timezone + +from aws_durable_execution_sdk_python import DurableContext, durable_execution +from aws_durable_execution_sdk_python.config import CallbackConfig, Duration +from aws_durable_execution_sdk_python.exceptions import CallbackError +import boto3 + +import helpers + +# Valid failure types for the Investigation_Event +VALID_FAILURE_TYPES = {'investigation_failed', 'investigation_timed_out'} + +# Required fields in the Investigation_Event payload +REQUIRED_FIELDS = [ + 'investigationId', + 'failureType', + 'service', + 'region', + 'errorDetails', + 'timestamp', +] + + +def validate_event(event): + """ + Validate the Investigation_Event payload. + + Checks that all required fields are present and that failureType + is one of the allowed enum values. + """ + missing = [f for f in REQUIRED_FIELDS if f not in event or event[f] is None] + if missing: + return False, f"Missing required fields: {', '.join(missing)}" + + if event.get('failureType') not in VALID_FAILURE_TYPES: + return False, ( + f"Invalid failureType: '{event.get('failureType')}'. " + f"Must be one of: {', '.join(sorted(VALID_FAILURE_TYPES))}" + ) + + return True, None + + +def build_context_summary(event): + """ + Build a context summary from a validated Investigation_Event. + + Extracts all required fields and generates a new incidentId (UUID v4). + """ + return { + 'incidentId': str(uuid.uuid4()), + 'investigationId': event['investigationId'], + 'failureType': event['failureType'], + 'service': event['service'], + 'region': event['region'], + 'errorDetails': event['errorDetails'], + 'timestamp': event['timestamp'], + } + + +def build_incident_record(context_summary): + """ + Build a full incident record from a context summary. + + Adds default fields: status=open, empty escalationHistory, + createdAt timestamp, and TTL set to 7 days from creation. + """ + now = datetime.now(timezone.utc) + created_at = now.isoformat() + ttl = int(now.timestamp()) + 604800 # 7 days in seconds + + return { + **context_summary, + 'status': 'open', + 'escalationHistory': [], + 'createdAt': created_at, + 'ttl': ttl, + } + + +def build_notification_message(incident_data, ack_url): + """ + Build a notification message for escalation alerts. + """ + return ( + f"INVESTIGATION ESCALATION ALERT\n" + f"\n" + f"Incident ID: {incident_data['incidentId']}\n" + f"Investigation ID: {incident_data['investigationId']}\n" + f"Failure Type: {incident_data['failureType']}\n" + f"Service: {incident_data['service']}\n" + f"Region: {incident_data['region']}\n" + f"Error Details: {incident_data['errorDetails']}\n" + f"\n" + f"Acknowledge this incident:\n" + f"{ack_url}\n" + ) + + +def build_final_result(incident_id, investigation_id, status, escalation_history): + """ + Build the final result object returned by the Escalation_Function. + """ + return { + 'incidentId': incident_id, + 'investigationId': investigation_id, + 'status': status, + 'escalationHistory': escalation_history, + } + + +@durable_execution +def lambda_handler(event, context: DurableContext): + """ + Durable execution handler for the investigation failure escalation workflow. + + Gathers context, creates an incident, pages on-call personnel, + waits for acknowledgment, and tracks the final resolution. + """ + # --- Input Validation --- + is_valid, error_message = validate_event(event) + if not is_valid: + now = datetime.now(timezone.utc) + validation_record = { + 'incidentId': str(uuid.uuid4()), + 'investigationId': event.get('investigationId', 'unknown'), + 'failureType': event.get('failureType', 'unknown'), + 'service': event.get('service', 'unknown'), + 'region': event.get('region', 'unknown'), + 'errorDetails': error_message, + 'timestamp': event.get('timestamp', now.isoformat()), + 'status': 'validation_failed', + 'escalationHistory': [], + 'createdAt': now.isoformat(), + 'ttl': int(now.timestamp()) + 604800, + } + helpers.create_incident_ticket(validation_record) + print(f"Validation failed: {error_message}") + return {'error': error_message, 'status': 'validation_failed'} + + # --- Step 1: Gather Context --- + def gather_context(_): + summary = build_context_summary(event) + print(f"Context gathered — incidentId: {summary['incidentId']}") + return summary + + context_summary = context.step(gather_context, name='gather-context') + + incident_id = context_summary['incidentId'] + investigation_id = context_summary['investigationId'] + + # --- Step 2: Create Incident --- + def create_incident(_): + record = build_incident_record(context_summary) + helpers.create_incident_ticket(record) + print(f"Incident created: {incident_id}") + return record + + incident_record = context.step(create_incident, name='create-incident') + escalation_history = incident_record.get('escalationHistory', []) + + # --- Read API Gateway URL from SSM --- + param_name = os.environ.get('API_GATEWAY_PARAM') + if not param_name: + raise ValueError("API_GATEWAY_PARAM environment variable is not set") + + ssm = boto3.client('ssm') + response = ssm.get_parameter(Name=param_name) + api_base_url = response['Parameter']['Value'] + print(f"API Base URL: {api_base_url}") + + # --- Read configurable timeout --- + ack_timeout = int(os.environ.get('ACK_TIMEOUT', '900')) + + # Callback table entry TTL (1 hour) + callback_ttl = int(time.time()) + 3600 + + topic_arn = os.environ['NOTIFICATION_TOPIC_ARN'] + + # --- Create Callback, Store Mapping, Notify On-Call --- + callback = context.create_callback( + name='wait-for-ack', + config=CallbackConfig(timeout=Duration.from_seconds(ack_timeout)), + ) + + callback_uuid = str(uuid.uuid4()) + helpers.store_callback_mapping( + uuid=callback_uuid, + callback_id=callback.callback_id, + incident_id=incident_id, + tier=1, + ttl=callback_ttl, + ) + + def notify_oncall(_): + ack_url = f"{api_base_url}/{callback_uuid}" + message = build_notification_message(context_summary, ack_url) + helpers.send_escalation_notification(topic_arn, message) + print(f"On-call notification sent — ack URL: {ack_url}") + + context.step(notify_oncall, name='notify-oncall') + + # --- Pause: Wait for Acknowledgment --- + try: + result = callback.result() + + # Acknowledged + if isinstance(result, str): + ack_data = json.loads(result) + else: + ack_data = result + + ack_timestamp = ack_data.get('timestamp', datetime.now(timezone.utc).isoformat()) + + def resolve_acknowledged(_): + helpers.resolve_incident(incident_id, 'acknowledged', { + 'acknowledgedAt': ack_timestamp, + }) + escalation_history.append({ + 'action': 'acknowledged', + 'timestamp': ack_timestamp, + }) + print(f"Incident {incident_id} acknowledged") + + context.step(resolve_acknowledged, name='resolve-incident') + + return build_final_result( + incident_id, investigation_id, 'acknowledged', escalation_history + ) + + except CallbackError: + print(f"Callback timed out — resolving as unacknowledged") + + # --- Timeout → Resolve Unacknowledged --- + def resolve_unacknowledged(_): + timeout_timestamp = datetime.now(timezone.utc).isoformat() + helpers.resolve_incident(incident_id, 'unacknowledged', {}) + escalation_history.append({ + 'action': 'timeout', + 'timestamp': timeout_timestamp, + }) + print(f"Incident {incident_id} unacknowledged — timeout expired") + + context.step(resolve_unacknowledged, name='resolve-unacknowledged') + + return build_final_result( + incident_id, investigation_id, 'unacknowledged', escalation_history + ) diff --git a/lambda-durable-investigation-escalation/src/requirements.txt b/lambda-durable-investigation-escalation/src/requirements.txt new file mode 100644 index 000000000..8f4105d97 --- /dev/null +++ b/lambda-durable-investigation-escalation/src/requirements.txt @@ -0,0 +1 @@ +boto3>=1.42.1 diff --git a/lambda-durable-investigation-escalation/template.yaml b/lambda-durable-investigation-escalation/template.yaml new file mode 100644 index 000000000..5086a2849 --- /dev/null +++ b/lambda-durable-investigation-escalation/template.yaml @@ -0,0 +1,177 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: Investigation Failure Escalation Pattern using AWS Lambda durable functions + +Parameters: + OncallEmail: + Type: String + Description: Email address to receive escalation notifications + AckTimeout: + Type: Number + Default: 900 + Description: Timeout in seconds to wait for on-call acknowledgment before marking unacknowledged + +Resources: + # DynamoDB Table for Incident Records + IncidentTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub '${AWS::StackName}-incidents' + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: incidentId + AttributeType: S + KeySchema: + - AttributeName: incidentId + KeyType: HASH + TimeToLiveSpecification: + AttributeName: ttl + Enabled: true + + # DynamoDB Table for Callback Mapping + CallbackTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub '${AWS::StackName}-callbacks' + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: uuid + AttributeType: S + KeySchema: + - AttributeName: uuid + KeyType: HASH + TimeToLiveSpecification: + AttributeName: ttl + Enabled: true + + # SNS Topic for Escalation Notifications + NotificationTopic: + Type: AWS::SNS::Topic + Properties: + Subscription: + - Protocol: email + Endpoint: !Ref OncallEmail + + # Durable Escalation Function + EscalationFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub '${AWS::StackName}-EscalationFunction' + CodeUri: src/ + Handler: lambda_function.lambda_handler + Runtime: python3.14 + Timeout: 120 + AutoPublishAlias: live + DurableConfig: + ExecutionTimeout: 3600 + RetentionPeriodInDays: 7 + Environment: + Variables: + INCIDENT_TABLE_NAME: !Ref IncidentTable + CALLBACK_TABLE_NAME: !Ref CallbackTable + NOTIFICATION_TOPIC_ARN: !Ref NotificationTopic + API_GATEWAY_PARAM: !Sub '/${AWS::StackName}/api-gateway-url' + ACK_TIMEOUT: !Ref AckTimeout + Policies: + - DynamoDBWritePolicy: + TableName: !Ref IncidentTable + - DynamoDBWritePolicy: + TableName: !Ref CallbackTable + - SNSPublishMessagePolicy: + TopicName: !GetAtt NotificationTopic.TopicName + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecution + Resource: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-EscalationFunction-*' + - Effect: Allow + Action: + - ssm:GetParameter + Resource: !Sub 'arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/${AWS::StackName}/api-gateway-url' + + # Callback Handler Function + CallbackHandlerFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub '${AWS::StackName}-CallbackHandlerFunction' + CodeUri: src/ + Handler: callback_handler.lambda_handler + Runtime: python3.14 + Timeout: 30 + Environment: + Variables: + CALLBACK_TABLE_NAME: !Ref CallbackTable + Policies: + - DynamoDBReadPolicy: + TableName: !Ref CallbackTable + - Statement: + - Effect: Allow + Action: + - dynamodb:GetItem + Resource: !GetAtt CallbackTable.Arn + - Effect: Allow + Action: + - lambda:SendDurableExecutionCallbackSuccess + Resource: '*' + + # Lambda Permission for API Gateway — EscalationFunction (alias) + EscalationFunctionInvokePermission: + Type: AWS::Lambda::Permission + Properties: + FunctionName: !Ref EscalationFunction.Alias + Action: lambda:InvokeFunction + Principal: apigateway.amazonaws.com + SourceArn: !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${EscalationApi}/*' + + # Lambda Permission for API Gateway — CallbackHandlerFunction + CallbackHandlerInvokePermission: + Type: AWS::Lambda::Permission + Properties: + FunctionName: !Ref CallbackHandlerFunction + Action: lambda:InvokeFunction + Principal: apigateway.amazonaws.com + SourceArn: !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${EscalationApi}/*/*/{uuid}' + + # API Gateway for Acknowledgment Endpoint + EscalationApi: + Type: AWS::Serverless::Api + Properties: + StageName: prod + DefinitionBody: + openapi: 3.0.1 + paths: + /{uuid}: + get: + parameters: + - name: uuid + in: path + required: true + schema: + type: string + responses: + '200': + description: Callback processed + x-amazon-apigateway-integration: + type: aws_proxy + httpMethod: POST + uri: !Sub 'arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${CallbackHandlerFunction.Arn}/invocations' + passthroughBehavior: when_no_match + + # Store API Gateway URL in Parameter Store + ApiGatewayUrlParameter: + Type: AWS::SSM::Parameter + Properties: + Name: !Sub '/${AWS::StackName}/api-gateway-url' + Type: String + Value: !Sub 'https://${EscalationApi}.execute-api.${AWS::Region}.amazonaws.com/prod' + Description: 'API Gateway URL for the escalation workflow' + DependsOn: EscalationApi + +Outputs: + ApiEndpoint: + Description: API Gateway endpoint URL + Value: !Sub 'https://${EscalationApi}.execute-api.${AWS::Region}.amazonaws.com/prod' + + EscalationFunctionArn: + Description: Escalation Durable Function ARN + Value: !GetAtt EscalationFunction.Arn