diff --git a/infrastructure/terraform/modules/eventpub/README.md b/infrastructure/terraform/modules/eventpub/README.md
index f013f131..c91733ea 100644
--- a/infrastructure/terraform/modules/eventpub/README.md
+++ b/infrastructure/terraform/modules/eventpub/README.md
@@ -46,7 +46,6 @@
| Name | Description |
|------|-------------|
-| [dlq](#output\_dlq) | EventPub DLQ name and ARN |
| [publishing\_anomaly\_alarm](#output\_publishing\_anomaly\_alarm) | CloudWatch anomaly detection alarm details for SNS publishing |
| [s3\_bucket\_event\_cache](#output\_s3\_bucket\_event\_cache) | S3 Bucket ARN and Name for event cache |
| [sns\_topic](#output\_sns\_topic) | SNS Topic ARN and Name |
diff --git a/infrastructure/terraform/modules/eventpub/cloudwatch_metric_alarm_dlq_alarm.tf b/infrastructure/terraform/modules/eventpub/cloudwatch_metric_alarm_dlq_alarm.tf
index eae6bc47..77237d2d 100644
--- a/infrastructure/terraform/modules/eventpub/cloudwatch_metric_alarm_dlq_alarm.tf
+++ b/infrastructure/terraform/modules/eventpub/cloudwatch_metric_alarm_dlq_alarm.tf
@@ -12,6 +12,6 @@ resource "aws_cloudwatch_metric_alarm" "dlq_alarm" {
treat_missing_data = "notBreaching"
dimensions = {
- QueueName = aws_sqs_queue.dlq.name
+ QueueName = module.sqs_queue.sqs_dlq_name
}
}
diff --git a/infrastructure/terraform/modules/eventpub/iam_role_lambda.tf b/infrastructure/terraform/modules/eventpub/iam_role_lambda.tf
index c6d5d5f8..05eea73a 100644
--- a/infrastructure/terraform/modules/eventpub/iam_role_lambda.tf
+++ b/infrastructure/terraform/modules/eventpub/iam_role_lambda.tf
@@ -54,33 +54,6 @@ data "aws_iam_policy_document" "lambda" {
]
}
- statement {
- sid = "PutEvents"
- effect = "Allow"
-
- actions = [
- "events:PutEvents",
- ]
-
- resources = [
- var.control_plane_bus_arn,
- var.data_plane_bus_arn,
- ]
- }
-
- statement {
- sid = "DLQPutMessage"
- effect = "Allow"
-
- actions = [
- "sqs:SendMessage",
- ]
-
- resources = [
- aws_sqs_queue.dlq.arn
- ]
- }
-
statement {
sid = "AllowSQSInput"
effect = "Allow"
diff --git a/infrastructure/terraform/modules/eventpub/lambda/eventpub/package-lock.json b/infrastructure/terraform/modules/eventpub/lambda/eventpub/package-lock.json
index 6a870ecd..d35aebac 100644
--- a/infrastructure/terraform/modules/eventpub/lambda/eventpub/package-lock.json
+++ b/infrastructure/terraform/modules/eventpub/lambda/eventpub/package-lock.json
@@ -10,7 +10,6 @@
"license": "ISC",
"dependencies": {
"@aws-sdk/client-eventbridge": "^3.1004.0",
- "@aws-sdk/client-sqs": "^3.1004.0",
"aws-sdk-client-mock": "^4.1.0"
},
"devDependencies": {
@@ -196,58 +195,6 @@
"node": ">=20.0.0"
}
},
- "node_modules/@aws-sdk/client-sqs": {
- "version": "3.1004.0",
- "resolved": "https://registry.npmjs.org/@aws-sdk/client-sqs/-/client-sqs-3.1004.0.tgz",
- "integrity": "sha512-aCREPa+SyOE6pD2JuD32E6HJAX9ik+qyXtyXPsTzPL5hDvCk85ccZUwSFpk1ErxubB4v832IDukvxfXOclQtzA==",
- "license": "Apache-2.0",
- "dependencies": {
- "@aws-crypto/sha256-browser": "5.2.0",
- "@aws-crypto/sha256-js": "5.2.0",
- "@aws-sdk/core": "^3.973.18",
- "@aws-sdk/credential-provider-node": "^3.972.18",
- "@aws-sdk/middleware-host-header": "^3.972.7",
- "@aws-sdk/middleware-logger": "^3.972.7",
- "@aws-sdk/middleware-recursion-detection": "^3.972.7",
- "@aws-sdk/middleware-sdk-sqs": "^3.972.13",
- "@aws-sdk/middleware-user-agent": "^3.972.19",
- "@aws-sdk/region-config-resolver": "^3.972.7",
- "@aws-sdk/types": "^3.973.5",
- "@aws-sdk/util-endpoints": "^3.996.4",
- "@aws-sdk/util-user-agent-browser": "^3.972.7",
- "@aws-sdk/util-user-agent-node": "^3.973.4",
- "@smithy/config-resolver": "^4.4.10",
- "@smithy/core": "^3.23.8",
- "@smithy/fetch-http-handler": "^5.3.13",
- "@smithy/hash-node": "^4.2.11",
- "@smithy/invalid-dependency": "^4.2.11",
- "@smithy/md5-js": "^4.2.11",
- "@smithy/middleware-content-length": "^4.2.11",
- "@smithy/middleware-endpoint": "^4.4.22",
- "@smithy/middleware-retry": "^4.4.39",
- "@smithy/middleware-serde": "^4.2.12",
- "@smithy/middleware-stack": "^4.2.11",
- "@smithy/node-config-provider": "^4.3.11",
- "@smithy/node-http-handler": "^4.4.14",
- "@smithy/protocol-http": "^5.3.11",
- "@smithy/smithy-client": "^4.12.2",
- "@smithy/types": "^4.13.0",
- "@smithy/url-parser": "^4.2.11",
- "@smithy/util-base64": "^4.3.2",
- "@smithy/util-body-length-browser": "^4.2.2",
- "@smithy/util-body-length-node": "^4.2.3",
- "@smithy/util-defaults-mode-browser": "^4.3.38",
- "@smithy/util-defaults-mode-node": "^4.2.41",
- "@smithy/util-endpoints": "^3.3.2",
- "@smithy/util-middleware": "^4.2.11",
- "@smithy/util-retry": "^4.2.11",
- "@smithy/util-utf8": "^4.2.2",
- "tslib": "^2.6.2"
- },
- "engines": {
- "node": ">=20.0.0"
- }
- },
"node_modules/@aws-sdk/core": {
"version": "3.973.18",
"resolved": "https://registry.npmjs.org/@aws-sdk/core/-/core-3.973.18.tgz",
@@ -500,23 +447,6 @@
"node": ">=20.0.0"
}
},
- "node_modules/@aws-sdk/middleware-sdk-sqs": {
- "version": "3.972.13",
- "resolved": "https://registry.npmjs.org/@aws-sdk/middleware-sdk-sqs/-/middleware-sdk-sqs-3.972.13.tgz",
- "integrity": "sha512-DiGD+Q636hk1+nTq/zSZe1Ih5GhEwmOzH/3oR3A+0/53hVK8E4OWFqAf9dwPE9pufp4mCmn/bE6qU8mE9Q6sig==",
- "license": "Apache-2.0",
- "dependencies": {
- "@aws-sdk/types": "^3.973.5",
- "@smithy/smithy-client": "^4.12.2",
- "@smithy/types": "^4.13.0",
- "@smithy/util-hex-encoding": "^4.2.2",
- "@smithy/util-utf8": "^4.2.2",
- "tslib": "^2.6.2"
- },
- "engines": {
- "node": ">=20.0.0"
- }
- },
"node_modules/@aws-sdk/middleware-user-agent": {
"version": "3.972.19",
"resolved": "https://registry.npmjs.org/@aws-sdk/middleware-user-agent/-/middleware-user-agent-3.972.19.tgz",
@@ -1719,20 +1649,6 @@
"node": ">=18.0.0"
}
},
- "node_modules/@smithy/md5-js": {
- "version": "4.2.11",
- "resolved": "https://registry.npmjs.org/@smithy/md5-js/-/md5-js-4.2.11.tgz",
- "integrity": "sha512-350X4kGIrty0Snx2OWv7rPM6p6vM7RzryvFs6B/56Cux3w3sChOb3bymo5oidXJlPcP9fIRxGUCk7GqpiSOtng==",
- "license": "Apache-2.0",
- "dependencies": {
- "@smithy/types": "^4.13.0",
- "@smithy/util-utf8": "^4.2.2",
- "tslib": "^2.6.2"
- },
- "engines": {
- "node": ">=18.0.0"
- }
- },
"node_modules/@smithy/middleware-content-length": {
"version": "4.2.11",
"resolved": "https://registry.npmjs.org/@smithy/middleware-content-length/-/middleware-content-length-4.2.11.tgz",
diff --git a/infrastructure/terraform/modules/eventpub/lambda/eventpub/package.json b/infrastructure/terraform/modules/eventpub/lambda/eventpub/package.json
index 8bd00c6b..a8cd600b 100644
--- a/infrastructure/terraform/modules/eventpub/lambda/eventpub/package.json
+++ b/infrastructure/terraform/modules/eventpub/lambda/eventpub/package.json
@@ -19,7 +19,6 @@
"license": "ISC",
"dependencies": {
"@aws-sdk/client-eventbridge": "^3.1004.0",
- "@aws-sdk/client-sqs": "^3.1004.0",
"aws-sdk-client-mock": "^4.1.0"
}
}
diff --git a/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js b/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js
index 58d655ad..3a982a1f 100644
--- a/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js
+++ b/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js
@@ -1,10 +1,8 @@
const { handler } = require('../index.js');
const { mockClient } = require('aws-sdk-client-mock');
-const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');
const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
const eventBridgeMock = mockClient(EventBridgeClient);
-const sqsMock = mockClient(SQSClient);
const validCloudEvent = {
id: "123e4567-e89b-12d3-a456-426614174000",
@@ -34,72 +32,82 @@ const invalidCloudEvent = {
data: {}
};
-const snsEvent = {
+const sqsEvent = {
Records: [
- { Sns: { Message: JSON.stringify(validCloudEvent) } }
+ {
+ messageId: 'msg-1',
+ body: JSON.stringify({ Message: JSON.stringify(validCloudEvent) })
+ }
]
};
-const snsEventInvalid = {
+const sqsEventInvalid = {
Records: [
- { Sns: { Message: JSON.stringify(invalidCloudEvent) } }
+ {
+ messageId: 'msg-1',
+ body: JSON.stringify({ Message: JSON.stringify(invalidCloudEvent) })
+ }
]
};
-describe('SNS to EventBridge Lambda', () => {
+describe('SQS to EventBridge Lambda', () => {
beforeEach(() => {
eventBridgeMock.reset();
- sqsMock.reset();
});
test('Valid event is sent to the correct EventBridge bus', async () => {
eventBridgeMock.on(PutEventsCommand).resolves({ FailedEntryCount: 0, Entries: [{}] });
- await handler(snsEvent);
+ const result = await handler(sqsEvent);
expect(eventBridgeMock.calls()).toHaveLength(1);
+ expect(result.batchItemFailures).toHaveLength(0);
});
- test('Invalid event is sent to DLQ', async () => {
- sqsMock.on(SendMessageCommand).resolves({ MessageId: '123' });
+ test('Invalid event is reported as a batch item failure', async () => {
+ const result = await handler(sqsEventInvalid);
- await handler(snsEventInvalid);
-
- expect(sqsMock.calls()).toHaveLength(1);
+ expect(result.batchItemFailures).toHaveLength(1);
+ expect(result.batchItemFailures[0].itemIdentifier).toBe('msg-1');
});
- test('Event with unknown plane field is sent to DLQ', async () => {
+ test('Event with unknown plane field is reported as a batch item failure', async () => {
const eventUnknownPlane = { ...validCloudEvent, plane: "unknown" };
- const snsEventUnknownPlane = {
+ const sqsEventUnknownPlane = {
Records: [
- { Sns: { Message: JSON.stringify(eventUnknownPlane) } }
+ {
+ messageId: 'msg-1',
+ body: JSON.stringify({ Message: JSON.stringify(eventUnknownPlane) })
+ }
]
};
- await handler(snsEventUnknownPlane);
+ const result = await handler(sqsEventUnknownPlane);
- expect(sqsMock.calls()).toHaveLength(1);
+ expect(result.batchItemFailures).toHaveLength(1);
+ expect(result.batchItemFailures[0].itemIdentifier).toBe('msg-1');
});
- test('Retries on EventBridge failure and sends failed events to DLQ', async () => {
+ test('Retries on EventBridge failure and reports failed events as batch item failures', async () => {
eventBridgeMock
.on(PutEventsCommand)
.rejectsOnce(Object.assign(new Error('Rate limit exceeded'), { retryable: true }))
.resolves({ FailedEntryCount: 1, Entries: [{ ErrorCode: 'InternalFailure' }] });
- sqsMock.on(SendMessageCommand).resolves({ MessageId: '123' });
- await handler(snsEvent);
+ const result = await handler(sqsEvent);
expect(eventBridgeMock.calls()).toHaveLength(2);
- expect(sqsMock.calls()).toHaveLength(1);
+ expect(result.batchItemFailures).toHaveLength(1);
});
test('Throttling delays event processing', async () => {
process.env.THROTTLE_DELAY_MS = '500';
jest.useFakeTimers();
- const handlerPromise = handler(snsEvent);
+ eventBridgeMock.on(PutEventsCommand).resolves({ FailedEntryCount: 0, Entries: [{}] });
+
+ const handlerPromise = handler(sqsEvent);
jest.advanceTimersByTime(500);
await handlerPromise;
diff --git a/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/index.js b/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/index.js
index 1ae58f08..37a67a97 100644
--- a/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/index.js
+++ b/infrastructure/terraform/modules/eventpub/lambda/eventpub/src/index.js
@@ -1,12 +1,9 @@
const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
-const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');
const eventBridge = new EventBridgeClient({});
-const sqs = new SQSClient({});
const DATA_PLANE_EVENT_BUS_ARN = process.env.DATA_PLANE_EVENT_BUS_ARN;
const CONTROL_PLANE_EVENT_BUS_ARN = process.env.CONTROL_PLANE_EVENT_BUS_ARN;
-const DLQ_URL = process.env.DLQ_URL;
const THROTTLE_DELAY_MS = parseInt(process.env.THROTTLE_DELAY_MS || '0', 10);
const MAX_RETRIES = 3;
const EVENTBRIDGE_MAX_BATCH_SIZE = 10;
@@ -44,8 +41,8 @@ async function sendToEventBridge(events, eventBusArn) {
const batch = events.slice(i, i + EVENTBRIDGE_MAX_BATCH_SIZE);
const entries = batch.map(event => ({
Source: 'custom.event',
- DetailType: event.type,
- Detail: JSON.stringify(event),
+ DetailType: event.message.type,
+ Detail: JSON.stringify(event.message),
EventBusName: eventBusArn
}));
@@ -70,8 +67,8 @@ async function sendToEventBridge(events, eventBusArn) {
await new Promise(res => setTimeout(res, 2 ** attempts * 100));
attempts++;
} else {
- console.error(`Non-retryable error encountered. Moving ${batch.length} events to DLQ`);
- failedEvents.push(...batch);
+ console.error(`Non-retryable error encountered. Reporting ${batch.length} as batch item failures`);
+ failedEvents.push(...batch.map(event => ({ itemIdentifier: event.itemIdentifier })));
break;
}
}
@@ -80,21 +77,6 @@ async function sendToEventBridge(events, eventBusArn) {
return failedEvents;
}
-async function sendToDLQ(events) {
- if (events.length === 0) return;
-
- console.warn(`Sending ${events.length} failed event(s) to DLQ: ${DLQ_URL}`);
-
- for (const event of events) {
- try {
- await sqs.send(new SendMessageCommand({ QueueUrl: DLQ_URL, MessageBody: JSON.stringify(event) }));
- console.debug(`Successfully sent event ${event.id} to DLQ`);
- } catch (error) {
- console.error(`Failed to send event ${event.id} to DLQ - Name: ${error.name}, Message: ${error.message}, Code: ${error.$metadata?.httpStatusCode}, RequestId: ${error.$metadata?.requestId}`);
- }
- }
-}
-
exports.handler = async (sqsEvent) => {
console.debug(`Received SQS event with ${sqsEvent.Records.length} records.`);
@@ -103,28 +85,36 @@ exports.handler = async (sqsEvent) => {
await new Promise(res => setTimeout(res, THROTTLE_DELAY_MS));
}
- const records = sqsEvent.Records
- .map(record => record.body)
- .map(JSON.parse)
- .map(record => record.Message)
- .map(JSON.parse);
- const validEvents = records.filter(validateEvent);
- const invalidEvents = records.filter(event => !validateEvent(event));
-
- console.debug(`Valid events: ${validEvents.length}, Invalid events: ${invalidEvents.length}`);
- if (invalidEvents.length) {
- console.warn(`${invalidEvents.length} event(s) failed validation and will be sent to DLQ`);
- await sendToDLQ(invalidEvents);
- }
+ const dataPlaneEvents = [];
+ const controlPlaneEvents = [];
+ const batchItemFailures = [];
- const dataEvents = validEvents.filter(event => event.plane === 'data');
- const controlEvents = validEvents.filter(event => event.plane === 'control');
- const unknownEvents = validEvents.filter(event => event.plane !== 'data' && event.plane !== 'control');
+ for(const sqsRecord of sqsEvent.Records) {
+ const record = JSON.parse(sqsRecord.body);
+ const message = JSON.parse(record.Message);
+
+ if (!validateEvent(message)) {
+ console.error(`Invalid event received. EventID: ${message.id || 'unknown'}, EventType: ${message.type || 'unknown'}`);
+ batchItemFailures.push({ itemIdentifier: sqsRecord.messageId });
+ }
+ else if (message.plane === 'data') {
+ dataPlaneEvents.push({ itemIdentifier: sqsRecord.messageId, message: message });
+ }
+ else if (message.plane === 'control') {
+ controlPlaneEvents.push({ itemIdentifier: sqsRecord.messageId, message: message });
+ }
+ else {
+ console.error(`Unknown plane type received: ${message.plane}. EventID: ${message.id || 'unknown'}, EventType: ${message.type || 'unknown'}`);
+ batchItemFailures.push({ itemIdentifier: sqsRecord.messageId });
+ }
+ }
- console.debug(`Data events: ${dataEvents.length}, Control events: ${controlEvents.length}, Unknown events: ${unknownEvents.length}`);
+ console.debug(`Data events: ${dataPlaneEvents.length}, Control events: ${controlPlaneEvents.length}, Unknown events: ${batchItemFailures.length}`);
- const failedDataEvents = await sendToEventBridge(dataEvents, DATA_PLANE_EVENT_BUS_ARN);
- const failedControlEvents = await sendToEventBridge(controlEvents, CONTROL_PLANE_EVENT_BUS_ARN);
+ batchItemFailures.push(...await sendToEventBridge(dataPlaneEvents, DATA_PLANE_EVENT_BUS_ARN));
+ batchItemFailures.push(...await sendToEventBridge(controlPlaneEvents, CONTROL_PLANE_EVENT_BUS_ARN));
- await sendToDLQ([...failedDataEvents, ...failedControlEvents, ...unknownEvents]);
+ return {
+ batchItemFailures: batchItemFailures
+ };
};
diff --git a/infrastructure/terraform/modules/eventpub/lambda_function.tf b/infrastructure/terraform/modules/eventpub/lambda_function.tf
index f5f9f618..2d1e9325 100644
--- a/infrastructure/terraform/modules/eventpub/lambda_function.tf
+++ b/infrastructure/terraform/modules/eventpub/lambda_function.tf
@@ -23,7 +23,6 @@ resource "aws_lambda_function" "main" {
variables = {
DATA_PLANE_EVENT_BUS_ARN = var.data_plane_bus_arn
CONTROL_PLANE_EVENT_BUS_ARN = var.control_plane_bus_arn
- DLQ_URL = aws_sqs_queue.dlq.url
THROTTLE_DELAY_MS = "0"
}
}
diff --git a/infrastructure/terraform/modules/eventpub/module_sqs_queue.tf b/infrastructure/terraform/modules/eventpub/module_sqs_queue.tf
index 08f29457..39d0ebcf 100644
--- a/infrastructure/terraform/modules/eventpub/module_sqs_queue.tf
+++ b/infrastructure/terraform/modules/eventpub/module_sqs_queue.tf
@@ -1,16 +1,17 @@
module "sqs_queue" {
source = "https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/3.1.4/terraform-sqs.zip"
- aws_account_id = var.aws_account_id
- component = var.component
- environment = var.environment
- project = var.project
- region = var.region
- name = local.csi
- create_dlq = true
- sqs_kms_key_arn = var.kms_key_arn
- sqs_policy_overload = data.aws_iam_policy_document.allow_sns_send.json
- message_retention_seconds = 1209600 # 14 days
+ aws_account_id = var.aws_account_id
+ component = var.component
+ environment = var.environment
+ project = var.project
+ region = var.region
+ name = var.name
+ create_dlq = true
+ sqs_kms_key_arn = var.kms_key_arn
+ sqs_policy_overload = data.aws_iam_policy_document.allow_sns_send.json
+ message_retention_seconds = 1209600 # 14 days
+ dlq_message_retention_seconds = 1209600
}
data "aws_iam_policy_document" "allow_sns_send" {
diff --git a/infrastructure/terraform/modules/eventpub/outputs.tf b/infrastructure/terraform/modules/eventpub/outputs.tf
index 2787f33a..bca1f34f 100644
--- a/infrastructure/terraform/modules/eventpub/outputs.tf
+++ b/infrastructure/terraform/modules/eventpub/outputs.tf
@@ -6,14 +6,6 @@ output "sns_topic" {
}
}
-output "dlq" {
- description = "EventPub DLQ name and ARN"
- value = {
- arn = aws_sqs_queue.dlq.arn
- name = aws_sqs_queue.dlq.name
- }
-}
-
output "sqs_queue" {
description = "EventPub SQS queue name, ARN, and DLQ details"
value = {
diff --git a/infrastructure/terraform/modules/eventpub/sqs_queue_dlq.tf b/infrastructure/terraform/modules/eventpub/sqs_queue_dlq.tf
deleted file mode 100644
index 9af9ddbd..00000000
--- a/infrastructure/terraform/modules/eventpub/sqs_queue_dlq.tf
+++ /dev/null
@@ -1,7 +0,0 @@
-resource "aws_sqs_queue" "dlq" {
- name = "${local.csi}-dlq"
-
- kms_master_key_id = var.kms_key_arn
-
- message_retention_seconds = 1209600
-}