diff --git a/infrastructure/terraform/modules/eventpub/README.md b/infrastructure/terraform/modules/eventpub/README.md index f013f131..c91733ea 100644 --- a/infrastructure/terraform/modules/eventpub/README.md +++ b/infrastructure/terraform/modules/eventpub/README.md @@ -46,7 +46,6 @@ | Name | Description | |------|-------------| -| [dlq](#output\_dlq) | EventPub DLQ name and ARN | | [publishing\_anomaly\_alarm](#output\_publishing\_anomaly\_alarm) | CloudWatch anomaly detection alarm details for SNS publishing | | [s3\_bucket\_event\_cache](#output\_s3\_bucket\_event\_cache) | S3 Bucket ARN and Name for event cache | | [sns\_topic](#output\_sns\_topic) | SNS Topic ARN and Name | diff --git a/infrastructure/terraform/modules/eventpub/cloudwatch_metric_alarm_dlq_alarm.tf b/infrastructure/terraform/modules/eventpub/cloudwatch_metric_alarm_dlq_alarm.tf index eae6bc47..77237d2d 100644 --- a/infrastructure/terraform/modules/eventpub/cloudwatch_metric_alarm_dlq_alarm.tf +++ b/infrastructure/terraform/modules/eventpub/cloudwatch_metric_alarm_dlq_alarm.tf @@ -12,6 +12,6 @@ resource "aws_cloudwatch_metric_alarm" "dlq_alarm" { treat_missing_data = "notBreaching" dimensions = { - QueueName = aws_sqs_queue.dlq.name + QueueName = module.sqs_queue.sqs_dlq_name } } diff --git a/infrastructure/terraform/modules/eventpub/iam_role_lambda.tf b/infrastructure/terraform/modules/eventpub/iam_role_lambda.tf index c6d5d5f8..05eea73a 100644 --- a/infrastructure/terraform/modules/eventpub/iam_role_lambda.tf +++ b/infrastructure/terraform/modules/eventpub/iam_role_lambda.tf @@ -54,33 +54,6 @@ data "aws_iam_policy_document" "lambda" { ] } - statement { - sid = "PutEvents" - effect = "Allow" - - actions = [ - "events:PutEvents", - ] - - resources = [ - var.control_plane_bus_arn, - var.data_plane_bus_arn, - ] - } - - statement { - sid = "DLQPutMessage" - effect = "Allow" - - actions = [ - "sqs:SendMessage", - ] - - resources = [ - aws_sqs_queue.dlq.arn - ] - } - statement { sid = "AllowSQSInput" effect = "Allow" diff --git a/infrastructure/terraform/modules/eventpub/lambda/eventpub/package-lock.json b/infrastructure/terraform/modules/eventpub/lambda/eventpub/package-lock.json index 6a870ecd..d35aebac 100644 --- a/infrastructure/terraform/modules/eventpub/lambda/eventpub/package-lock.json +++ b/infrastructure/terraform/modules/eventpub/lambda/eventpub/package-lock.json @@ -10,7 +10,6 @@ "license": "ISC", "dependencies": { "@aws-sdk/client-eventbridge": "^3.1004.0", - "@aws-sdk/client-sqs": "^3.1004.0", "aws-sdk-client-mock": "^4.1.0" }, "devDependencies": { @@ -196,58 +195,6 @@ "node": ">=20.0.0" } }, - "node_modules/@aws-sdk/client-sqs": { - "version": "3.1004.0", - "resolved": "https://registry.npmjs.org/@aws-sdk/client-sqs/-/client-sqs-3.1004.0.tgz", - "integrity": "sha512-aCREPa+SyOE6pD2JuD32E6HJAX9ik+qyXtyXPsTzPL5hDvCk85ccZUwSFpk1ErxubB4v832IDukvxfXOclQtzA==", - "license": "Apache-2.0", - "dependencies": { - "@aws-crypto/sha256-browser": "5.2.0", - "@aws-crypto/sha256-js": "5.2.0", - "@aws-sdk/core": "^3.973.18", - "@aws-sdk/credential-provider-node": "^3.972.18", - "@aws-sdk/middleware-host-header": "^3.972.7", - "@aws-sdk/middleware-logger": "^3.972.7", - "@aws-sdk/middleware-recursion-detection": "^3.972.7", - "@aws-sdk/middleware-sdk-sqs": "^3.972.13", - "@aws-sdk/middleware-user-agent": "^3.972.19", - "@aws-sdk/region-config-resolver": "^3.972.7", - "@aws-sdk/types": "^3.973.5", - "@aws-sdk/util-endpoints": "^3.996.4", - "@aws-sdk/util-user-agent-browser": "^3.972.7", - "@aws-sdk/util-user-agent-node": "^3.973.4", - "@smithy/config-resolver": "^4.4.10", - "@smithy/core": "^3.23.8", - "@smithy/fetch-http-handler": "^5.3.13", - "@smithy/hash-node": "^4.2.11", - "@smithy/invalid-dependency": "^4.2.11", - "@smithy/md5-js": "^4.2.11", - "@smithy/middleware-content-length": "^4.2.11", - "@smithy/middleware-endpoint": "^4.4.22", - "@smithy/middleware-retry": "^4.4.39", - "@smithy/middleware-serde": "^4.2.12", - "@smithy/middleware-stack": "^4.2.11", - "@smithy/node-config-provider": "^4.3.11", - "@smithy/node-http-handler": "^4.4.14", - "@smithy/protocol-http": "^5.3.11", - "@smithy/smithy-client": "^4.12.2", - "@smithy/types": "^4.13.0", - "@smithy/url-parser": "^4.2.11", - "@smithy/util-base64": "^4.3.2", - "@smithy/util-body-length-browser": "^4.2.2", - "@smithy/util-body-length-node": "^4.2.3", - "@smithy/util-defaults-mode-browser": "^4.3.38", - "@smithy/util-defaults-mode-node": "^4.2.41", - "@smithy/util-endpoints": "^3.3.2", - "@smithy/util-middleware": "^4.2.11", - "@smithy/util-retry": "^4.2.11", - "@smithy/util-utf8": "^4.2.2", - "tslib": "^2.6.2" - }, - "engines": { - "node": ">=20.0.0" - } - }, "node_modules/@aws-sdk/core": { "version": "3.973.18", "resolved": "https://registry.npmjs.org/@aws-sdk/core/-/core-3.973.18.tgz", @@ -500,23 +447,6 @@ "node": ">=20.0.0" } }, - "node_modules/@aws-sdk/middleware-sdk-sqs": { - "version": "3.972.13", - "resolved": "https://registry.npmjs.org/@aws-sdk/middleware-sdk-sqs/-/middleware-sdk-sqs-3.972.13.tgz", - "integrity": "sha512-DiGD+Q636hk1+nTq/zSZe1Ih5GhEwmOzH/3oR3A+0/53hVK8E4OWFqAf9dwPE9pufp4mCmn/bE6qU8mE9Q6sig==", - "license": "Apache-2.0", - "dependencies": { - "@aws-sdk/types": "^3.973.5", - "@smithy/smithy-client": "^4.12.2", - "@smithy/types": "^4.13.0", - "@smithy/util-hex-encoding": "^4.2.2", - "@smithy/util-utf8": "^4.2.2", - "tslib": "^2.6.2" - }, - "engines": { - "node": ">=20.0.0" - } - }, "node_modules/@aws-sdk/middleware-user-agent": { "version": "3.972.19", "resolved": "https://registry.npmjs.org/@aws-sdk/middleware-user-agent/-/middleware-user-agent-3.972.19.tgz", @@ -1719,20 +1649,6 @@ "node": ">=18.0.0" } }, - "node_modules/@smithy/md5-js": { - "version": "4.2.11", - "resolved": "https://registry.npmjs.org/@smithy/md5-js/-/md5-js-4.2.11.tgz", - "integrity": "sha512-350X4kGIrty0Snx2OWv7rPM6p6vM7RzryvFs6B/56Cux3w3sChOb3bymo5oidXJlPcP9fIRxGUCk7GqpiSOtng==", - "license": "Apache-2.0", - "dependencies": { - "@smithy/types": "^4.13.0", - "@smithy/util-utf8": "^4.2.2", - "tslib": "^2.6.2" - }, - "engines": { - "node": ">=18.0.0" - } - }, "node_modules/@smithy/middleware-content-length": { "version": "4.2.11", "resolved": "https://registry.npmjs.org/@smithy/middleware-content-length/-/middleware-content-length-4.2.11.tgz", diff --git a/infrastructure/terraform/modules/eventpub/lambda/eventpub/package.json b/infrastructure/terraform/modules/eventpub/lambda/eventpub/package.json index 8bd00c6b..a8cd600b 100644 --- a/infrastructure/terraform/modules/eventpub/lambda/eventpub/package.json +++ b/infrastructure/terraform/modules/eventpub/lambda/eventpub/package.json @@ -19,7 +19,6 @@ "license": "ISC", "dependencies": { "@aws-sdk/client-eventbridge": "^3.1004.0", - "@aws-sdk/client-sqs": "^3.1004.0", "aws-sdk-client-mock": "^4.1.0" } } diff --git a/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js b/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js index 58d655ad..3a982a1f 100644 --- a/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js +++ b/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js @@ -1,10 +1,8 @@ const { handler } = require('../index.js'); const { mockClient } = require('aws-sdk-client-mock'); -const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs'); const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge'); const eventBridgeMock = mockClient(EventBridgeClient); -const sqsMock = mockClient(SQSClient); const validCloudEvent = { id: "123e4567-e89b-12d3-a456-426614174000", @@ -34,72 +32,82 @@ const invalidCloudEvent = { data: {} }; -const snsEvent = { +const sqsEvent = { Records: [ - { Sns: { Message: JSON.stringify(validCloudEvent) } } + { + messageId: 'msg-1', + body: JSON.stringify({ Message: JSON.stringify(validCloudEvent) }) + } ] }; -const snsEventInvalid = { +const sqsEventInvalid = { Records: [ - { Sns: { Message: JSON.stringify(invalidCloudEvent) } } + { + messageId: 'msg-1', + body: JSON.stringify({ Message: JSON.stringify(invalidCloudEvent) }) + } ] }; -describe('SNS to EventBridge Lambda', () => { +describe('SQS to EventBridge Lambda', () => { beforeEach(() => { eventBridgeMock.reset(); - sqsMock.reset(); }); test('Valid event is sent to the correct EventBridge bus', async () => { eventBridgeMock.on(PutEventsCommand).resolves({ FailedEntryCount: 0, Entries: [{}] }); - await handler(snsEvent); + const result = await handler(sqsEvent); expect(eventBridgeMock.calls()).toHaveLength(1); + expect(result.batchItemFailures).toHaveLength(0); }); - test('Invalid event is sent to DLQ', async () => { - sqsMock.on(SendMessageCommand).resolves({ MessageId: '123' }); + test('Invalid event is reported as a batch item failure', async () => { + const result = await handler(sqsEventInvalid); - await handler(snsEventInvalid); - - expect(sqsMock.calls()).toHaveLength(1); + expect(result.batchItemFailures).toHaveLength(1); + expect(result.batchItemFailures[0].itemIdentifier).toBe('msg-1'); }); - test('Event with unknown plane field is sent to DLQ', async () => { + test('Event with unknown plane field is reported as a batch item failure', async () => { const eventUnknownPlane = { ...validCloudEvent, plane: "unknown" }; - const snsEventUnknownPlane = { + const sqsEventUnknownPlane = { Records: [ - { Sns: { Message: JSON.stringify(eventUnknownPlane) } } + { + messageId: 'msg-1', + body: JSON.stringify({ Message: JSON.stringify(eventUnknownPlane) }) + } ] }; - await handler(snsEventUnknownPlane); + const result = await handler(sqsEventUnknownPlane); - expect(sqsMock.calls()).toHaveLength(1); + expect(result.batchItemFailures).toHaveLength(1); + expect(result.batchItemFailures[0].itemIdentifier).toBe('msg-1'); }); - test('Retries on EventBridge failure and sends failed events to DLQ', async () => { + test('Retries on EventBridge failure and reports failed events as batch item failures', async () => { eventBridgeMock .on(PutEventsCommand) .rejectsOnce(Object.assign(new Error('Rate limit exceeded'), { retryable: true })) .resolves({ FailedEntryCount: 1, Entries: [{ ErrorCode: 'InternalFailure' }] }); - sqsMock.on(SendMessageCommand).resolves({ MessageId: '123' }); - await handler(snsEvent); + const result = await handler(sqsEvent); expect(eventBridgeMock.calls()).toHaveLength(2); - expect(sqsMock.calls()).toHaveLength(1); + expect(result.batchItemFailures).toHaveLength(1); }); test('Throttling delays event processing', async () => { process.env.THROTTLE_DELAY_MS = '500'; jest.useFakeTimers(); - const handlerPromise = handler(snsEvent); + eventBridgeMock.on(PutEventsCommand).resolves({ FailedEntryCount: 0, Entries: [{}] }); + + const handlerPromise = handler(sqsEvent); jest.advanceTimersByTime(500); await handlerPromise; diff --git a/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/index.js b/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/index.js index 1ae58f08..37a67a97 100644 --- a/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/index.js +++ b/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/index.js @@ -1,12 +1,9 @@ const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge'); -const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs'); const eventBridge = new EventBridgeClient({}); -const sqs = new SQSClient({}); const DATA_PLANE_EVENT_BUS_ARN = process.env.DATA_PLANE_EVENT_BUS_ARN; const CONTROL_PLANE_EVENT_BUS_ARN = process.env.CONTROL_PLANE_EVENT_BUS_ARN; -const DLQ_URL = process.env.DLQ_URL; const THROTTLE_DELAY_MS = parseInt(process.env.THROTTLE_DELAY_MS || '0', 10); const MAX_RETRIES = 3; const EVENTBRIDGE_MAX_BATCH_SIZE = 10; @@ -44,8 +41,8 @@ async function sendToEventBridge(events, eventBusArn) { const batch = events.slice(i, i + EVENTBRIDGE_MAX_BATCH_SIZE); const entries = batch.map(event => ({ Source: 'custom.event', - DetailType: event.type, - Detail: JSON.stringify(event), + DetailType: event.message.type, + Detail: JSON.stringify(event.message), EventBusName: eventBusArn })); @@ -70,8 +67,8 @@ async function sendToEventBridge(events, eventBusArn) { await new Promise(res => setTimeout(res, 2 ** attempts * 100)); attempts++; } else { - console.error(`Non-retryable error encountered. Moving ${batch.length} events to DLQ`); - failedEvents.push(...batch); + console.error(`Non-retryable error encountered. Reporting ${batch.length} as batch item failures`); + failedEvents.push(...batch.map(event => ({ itemIdentifier: event.itemIdentifier }))); break; } } @@ -80,21 +77,6 @@ async function sendToEventBridge(events, eventBusArn) { return failedEvents; } -async function sendToDLQ(events) { - if (events.length === 0) return; - - console.warn(`Sending ${events.length} failed event(s) to DLQ: ${DLQ_URL}`); - - for (const event of events) { - try { - await sqs.send(new SendMessageCommand({ QueueUrl: DLQ_URL, MessageBody: JSON.stringify(event) })); - console.debug(`Successfully sent event ${event.id} to DLQ`); - } catch (error) { - console.error(`Failed to send event ${event.id} to DLQ - Name: ${error.name}, Message: ${error.message}, Code: ${error.$metadata?.httpStatusCode}, RequestId: ${error.$metadata?.requestId}`); - } - } -} - exports.handler = async (sqsEvent) => { console.debug(`Received SQS event with ${sqsEvent.Records.length} records.`); @@ -103,28 +85,36 @@ exports.handler = async (sqsEvent) => { await new Promise(res => setTimeout(res, THROTTLE_DELAY_MS)); } - const records = sqsEvent.Records - .map(record => record.body) - .map(JSON.parse) - .map(record => record.Message) - .map(JSON.parse); - const validEvents = records.filter(validateEvent); - const invalidEvents = records.filter(event => !validateEvent(event)); - - console.debug(`Valid events: ${validEvents.length}, Invalid events: ${invalidEvents.length}`); - if (invalidEvents.length) { - console.warn(`${invalidEvents.length} event(s) failed validation and will be sent to DLQ`); - await sendToDLQ(invalidEvents); - } + const dataPlaneEvents = []; + const controlPlaneEvents = []; + const batchItemFailures = []; - const dataEvents = validEvents.filter(event => event.plane === 'data'); - const controlEvents = validEvents.filter(event => event.plane === 'control'); - const unknownEvents = validEvents.filter(event => event.plane !== 'data' && event.plane !== 'control'); + for(const sqsRecord of sqsEvent.Records) { + const record = JSON.parse(sqsRecord.body); + const message = JSON.parse(record.Message); + + if (!validateEvent(message)) { + console.error(`Invalid event received. EventID: ${message.id || 'unknown'}, EventType: ${message.type || 'unknown'}`); + batchItemFailures.push({ itemIdentifier: sqsRecord.messageId }); + } + else if (message.plane === 'data') { + dataPlaneEvents.push({ itemIdentifier: sqsRecord.messageId, message: message }); + } + else if (message.plane === 'control') { + controlPlaneEvents.push({ itemIdentifier: sqsRecord.messageId, message: message }); + } + else { + console.error(`Unknown plane type received: ${message.plane}. EventID: ${message.id || 'unknown'}, EventType: ${message.type || 'unknown'}`); + batchItemFailures.push({ itemIdentifier: sqsRecord.messageId }); + } + } - console.debug(`Data events: ${dataEvents.length}, Control events: ${controlEvents.length}, Unknown events: ${unknownEvents.length}`); + console.debug(`Data events: ${dataPlaneEvents.length}, Control events: ${controlPlaneEvents.length}, Unknown events: ${batchItemFailures.length}`); - const failedDataEvents = await sendToEventBridge(dataEvents, DATA_PLANE_EVENT_BUS_ARN); - const failedControlEvents = await sendToEventBridge(controlEvents, CONTROL_PLANE_EVENT_BUS_ARN); + batchItemFailures.push(...await sendToEventBridge(dataPlaneEvents, DATA_PLANE_EVENT_BUS_ARN)); + batchItemFailures.push(...await sendToEventBridge(controlPlaneEvents, CONTROL_PLANE_EVENT_BUS_ARN)); - await sendToDLQ([...failedDataEvents, ...failedControlEvents, ...unknownEvents]); + return { + batchItemFailures: batchItemFailures + }; }; diff --git a/infrastructure/terraform/modules/eventpub/lambda_function.tf b/infrastructure/terraform/modules/eventpub/lambda_function.tf index f5f9f618..2d1e9325 100644 --- a/infrastructure/terraform/modules/eventpub/lambda_function.tf +++ b/infrastructure/terraform/modules/eventpub/lambda_function.tf @@ -23,7 +23,6 @@ resource "aws_lambda_function" "main" { variables = { DATA_PLANE_EVENT_BUS_ARN = var.data_plane_bus_arn CONTROL_PLANE_EVENT_BUS_ARN = var.control_plane_bus_arn - DLQ_URL = aws_sqs_queue.dlq.url THROTTLE_DELAY_MS = "0" } } diff --git a/infrastructure/terraform/modules/eventpub/module_sqs_queue.tf b/infrastructure/terraform/modules/eventpub/module_sqs_queue.tf index 08f29457..39d0ebcf 100644 --- a/infrastructure/terraform/modules/eventpub/module_sqs_queue.tf +++ b/infrastructure/terraform/modules/eventpub/module_sqs_queue.tf @@ -1,16 +1,17 @@ module "sqs_queue" { source = "https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/3.1.4/terraform-sqs.zip" - aws_account_id = var.aws_account_id - component = var.component - environment = var.environment - project = var.project - region = var.region - name = local.csi - create_dlq = true - sqs_kms_key_arn = var.kms_key_arn - sqs_policy_overload = data.aws_iam_policy_document.allow_sns_send.json - message_retention_seconds = 1209600 # 14 days + aws_account_id = var.aws_account_id + component = var.component + environment = var.environment + project = var.project + region = var.region + name = var.name + create_dlq = true + sqs_kms_key_arn = var.kms_key_arn + sqs_policy_overload = data.aws_iam_policy_document.allow_sns_send.json + message_retention_seconds = 1209600 # 14 days + dlq_message_retention_seconds = 1209600 } data "aws_iam_policy_document" "allow_sns_send" { diff --git a/infrastructure/terraform/modules/eventpub/outputs.tf b/infrastructure/terraform/modules/eventpub/outputs.tf index 2787f33a..bca1f34f 100644 --- a/infrastructure/terraform/modules/eventpub/outputs.tf +++ b/infrastructure/terraform/modules/eventpub/outputs.tf @@ -6,14 +6,6 @@ output "sns_topic" { } } -output "dlq" { - description = "EventPub DLQ name and ARN" - value = { - arn = aws_sqs_queue.dlq.arn - name = aws_sqs_queue.dlq.name - } -} - output "sqs_queue" { description = "EventPub SQS queue name, ARN, and DLQ details" value = { diff --git a/infrastructure/terraform/modules/eventpub/sqs_queue_dlq.tf b/infrastructure/terraform/modules/eventpub/sqs_queue_dlq.tf deleted file mode 100644 index 9af9ddbd..00000000 --- a/infrastructure/terraform/modules/eventpub/sqs_queue_dlq.tf +++ /dev/null @@ -1,7 +0,0 @@ -resource "aws_sqs_queue" "dlq" { - name = "${local.csi}-dlq" - - kms_master_key_id = var.kms_key_arn - - message_retention_seconds = 1209600 -}