diff --git a/docs/package.json b/docs/package.json index c694856e1..ae9980271 100644 --- a/docs/package.json +++ b/docs/package.json @@ -15,6 +15,7 @@ "debug": "JEKYLL_ENV=development BUNDLE_GEMFILE=Gemfile bundle exec jekyll serve --config _config.yml,_config.dev.yml,_config.version.yml --limit_posts 100 --trace", "generate-includes": "./generate-includes.sh", "lint": "echo \"Documentation module has no code to lint\"", + "lint:fix": "echo \"Documentation module has no code to lint\"", "test:unit": "echo \"Documentation module has no unit tests\"", "typecheck": "echo \"Documentation module has no typescript to typecheck\"" }, diff --git a/infrastructure/terraform/components/api/module_lambda_letter_updates_transformer.tf b/infrastructure/terraform/components/api/module_lambda_letter_updates_transformer.tf index d30c8c84a..52a444ec4 100644 --- a/infrastructure/terraform/components/api/module_lambda_letter_updates_transformer.tf +++ b/infrastructure/terraform/components/api/module_lambda_letter_updates_transformer.tf @@ -35,7 +35,8 @@ module "letter_updates_transformer" { log_subscription_role_arn = local.acct.log_subscription_role_arn lambda_env_vars = merge(local.common_lambda_env_vars, { - EVENTPUB_SNS_TOPIC_ARN = "${module.eventpub.sns_topic.arn}" + EVENTPUB_SNS_TOPIC_ARN = "${module.eventpub.sns_topic.arn}", + EVENT_SOURCE = "/data-plane/supplier-api/${var.group}/${var.environment}/letters" }) } diff --git a/internal/events/package.json b/internal/events/package.json index 04fcac176..08abc7453 100644 --- a/internal/events/package.json +++ b/internal/events/package.json @@ -50,5 +50,5 @@ "typecheck": "tsc --noEmit" }, "types": "dist/index.d.ts", - "version": "1.0.8" + "version": "1.0.9" } diff --git a/internal/events/schemas/examples/letter.ACCEPTED.json b/internal/events/schemas/examples/letter.ACCEPTED.json index c6533b93b..77a66c5b2 100644 --- a/internal/events/schemas/examples/letter.ACCEPTED.json +++ b/internal/events/schemas/examples/letter.ACCEPTED.json @@ -18,7 +18,7 @@ "recordedtime": "2025-08-28T08:45:00.000Z", "severitynumber": 2, "severitytext": "INFO", - "source": "/data-plane/supplier-api/prod/update-status", + "source": "/data-plane/supplier-api/nhs-supplier-api-prod/main/update-status", "specversion": "1.0", "subject": "letter-origin/letter-rendering/letter/f47ac10b-58cc-4372-a567-0e02b2c3d479", "time": "2025-08-28T08:45:00.000Z", diff --git a/internal/events/schemas/examples/letter.FORWARDED.json b/internal/events/schemas/examples/letter.FORWARDED.json index 6661fe6c1..cd5bd1731 100644 --- a/internal/events/schemas/examples/letter.FORWARDED.json +++ b/internal/events/schemas/examples/letter.FORWARDED.json @@ -20,7 +20,7 @@ "recordedtime": "2025-08-28T08:45:00.000Z", "severitynumber": 2, "severitytext": "INFO", - "source": "/data-plane/supplier-api/prod/update-status", + "source": "/data-plane/supplier-api/nhs-supplier-api-prod/main/update-status", "specversion": "1.0", "subject": "letter-origin/letter-rendering/letter/f47ac10b-58cc-4372-a567-0e02b2c3d479", "time": "2025-08-28T08:45:00.000Z", diff --git a/internal/events/schemas/examples/letter.RETURNED.json b/internal/events/schemas/examples/letter.RETURNED.json index f0cfa3761..1f54aea5a 100644 --- a/internal/events/schemas/examples/letter.RETURNED.json +++ b/internal/events/schemas/examples/letter.RETURNED.json @@ -20,7 +20,7 @@ "recordedtime": "2025-08-28T08:45:00.000Z", "severitynumber": 2, "severitytext": "INFO", - "source": "/data-plane/supplier-api/prod/update-status", + "source": "/data-plane/supplier-api/nhs-supplier-api-prod/main/update-status", "specversion": "1.0", "subject": "letter-origin/letter-rendering/letter/f47ac10b-58cc-4372-a567-0e02b2c3d479", "time": "2025-08-28T08:45:00.000Z", diff --git a/lambdas/letter-updates-transformer/src/__tests__/letter-updates-transformer.test.ts b/lambdas/letter-updates-transformer/src/__tests__/letter-updates-transformer.test.ts index 67211462e..065a9a1e3 100644 --- a/lambdas/letter-updates-transformer/src/__tests__/letter-updates-transformer.test.ts +++ b/lambdas/letter-updates-transformer/src/__tests__/letter-updates-transformer.test.ts @@ -26,15 +26,18 @@ jest.mock("crypto", () => ({ randomBytes: (size: number) => randomBytes[String(size)], })); -describe("letter-updates-transformer Lambda", () => { - const mockedDeps: jest.Mocked = { - snsClient: { send: jest.fn() } as unknown as SNSClient, - logger: { info: jest.fn(), error: jest.fn() } as unknown as pino.Logger, - env: { - EVENTPUB_SNS_TOPIC_ARN: "arn:aws:sns:region:account:topic", - } as unknown as EnvVars, - } as Deps; +const eventSource = + "/data-plane/supplier-api/nhs-supplier-api-dev/main/letters"; +const mockedDeps: jest.Mocked = { + snsClient: { send: jest.fn() } as unknown as SNSClient, + logger: { info: jest.fn(), error: jest.fn() } as unknown as pino.Logger, + env: { + EVENTPUB_SNS_TOPIC_ARN: "arn:aws:sns:region:account:topic", + EVENT_SOURCE: eventSource, + } as unknown as EnvVars, +} as Deps; +describe("letter-updates-transformer Lambda", () => { beforeEach(() => { jest.useFakeTimers(); }); @@ -50,7 +53,9 @@ describe("letter-updates-transformer Lambda", () => { const newLetter = generateLetter("PRINTED"); const expectedEntries = [ expect.objectContaining({ - Message: JSON.stringify(mapLetterToCloudEvent(newLetter)), + Message: JSON.stringify( + mapLetterToCloudEvent(newLetter, eventSource), + ), }), ]; @@ -76,7 +81,9 @@ describe("letter-updates-transformer Lambda", () => { newLetter.reasonCode = "R1"; const expectedEntries = [ expect.objectContaining({ - Message: JSON.stringify(mapLetterToCloudEvent(newLetter)), + Message: JSON.stringify( + mapLetterToCloudEvent(newLetter, eventSource), + ), }), ]; @@ -103,7 +110,9 @@ describe("letter-updates-transformer Lambda", () => { newLetter.reasonCode = "R2"; const expectedEntries = [ expect.objectContaining({ - Message: JSON.stringify(mapLetterToCloudEvent(newLetter)), + Message: JSON.stringify( + mapLetterToCloudEvent(newLetter, eventSource), + ), }), ]; @@ -135,14 +144,28 @@ describe("letter-updates-transformer Lambda", () => { expect(mockedDeps.snsClient.send).not.toHaveBeenCalled(); }); - it("does not publish non-modify events", async () => { + it("publishes INSERT events", async () => { const handler = createHandler(mockedDeps); const newLetter = generateLetter("ACCEPTED"); + const expectedEntries = [ + expect.objectContaining({ + Message: JSON.stringify( + mapLetterToCloudEvent(newLetter, eventSource), + ), + }), + ]; const testData = generateKinesisEvent([generateInsertRecord(newLetter)]); await handler(testData, mockDeep(), jest.fn()); - expect(mockedDeps.snsClient.send).not.toHaveBeenCalled(); + expect(mockedDeps.snsClient.send).toHaveBeenCalledWith( + expect.objectContaining({ + input: expect.objectContaining({ + TopicArn: "arn:aws:sns:region:account:topic", + PublishBatchRequestEntries: expectedEntries, + }), + }), + ); }); it("does not publish invalid letter data", async () => { @@ -159,6 +182,55 @@ describe("letter-updates-transformer Lambda", () => { expect(mockedDeps.snsClient.send).not.toHaveBeenCalled(); }); + + it("throws error when kinesis data contains malformed JSON", async () => { + const handler = createHandler(mockedDeps); + + // Create a Kinesis event with malformed JSON data + const malformedKinesisEvent: KinesisStreamEvent = { + Records: [ + { + kinesis: { + data: Buffer.from("invalid-json-data").toString("base64"), + sequenceNumber: "12345", + }, + } as any, + ], + }; + + await expect( + handler(malformedKinesisEvent, mockDeep(), jest.fn()), + ).rejects.toThrow(); + + expect(mockedDeps.logger.error).toHaveBeenCalledWith( + expect.objectContaining({ + description: "Error extracting payload", + error: expect.any(Error), + record: expect.objectContaining({ + kinesis: expect.objectContaining({ + data: Buffer.from("invalid-json-data").toString("base64"), + }), + }), + }), + ); + }); + + it("handles events with no records", async () => { + const handler = createHandler(mockedDeps); + + // Create a Kinesis event with empty Records array + const emptyKinesisEvent: KinesisStreamEvent = { Records: [] }; + + await handler(emptyKinesisEvent, mockDeep(), jest.fn()); + + expect(mockedDeps.logger.info).toHaveBeenCalledWith( + expect.objectContaining({ + description: "Number of records", + count: 0, + }), + ); + expect(mockedDeps.snsClient.send).not.toHaveBeenCalled(); + }); }); describe("Batching", () => { @@ -168,7 +240,7 @@ describe("letter-updates-transformer Lambda", () => { const newLetters = generateLetters(10, "PRINTED"); const expectedEntries = newLetters.map((letter) => expect.objectContaining({ - Message: JSON.stringify(mapLetterToCloudEvent(letter)), + Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)), }), ); @@ -197,19 +269,19 @@ describe("letter-updates-transformer Lambda", () => { newLetters.slice(0, 10).map((letter, index) => expect.objectContaining({ Id: expect.stringMatching(new RegExp(`-${index}$`)), - Message: JSON.stringify(mapLetterToCloudEvent(letter)), + Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)), }), ), newLetters.slice(10, 20).map((letter, index) => expect.objectContaining({ Id: expect.stringMatching(new RegExp(`-${index}$`)), - Message: JSON.stringify(mapLetterToCloudEvent(letter)), + Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)), }), ), newLetters.slice(20).map((letter, index) => expect.objectContaining({ Id: expect.stringMatching(new RegExp(`-${index}$`)), - Message: JSON.stringify(mapLetterToCloudEvent(letter)), + Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)), }), ), ]; diff --git a/lambdas/letter-updates-transformer/src/env.ts b/lambdas/letter-updates-transformer/src/env.ts index f93bbf39f..a91381864 100644 --- a/lambdas/letter-updates-transformer/src/env.ts +++ b/lambdas/letter-updates-transformer/src/env.ts @@ -2,6 +2,7 @@ import { z } from "zod"; const EnvVarsSchema = z.object({ EVENTPUB_SNS_TOPIC_ARN: z.string(), + EVENT_SOURCE: z.string(), }); export type EnvVars = z.infer; diff --git a/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts b/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts index 85c6d2c94..c53151c07 100644 --- a/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts +++ b/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts @@ -20,17 +20,20 @@ const BATCH_SIZE = 10; export default function createHandler(deps: Deps): Handler { return async (streamEvent: KinesisStreamEvent) => { deps.logger.info({ description: "Received event", streamEvent }); + deps.logger.info({ + description: "Number of records", + count: streamEvent.Records?.length || 0, + }); - const cloudEvents: LetterEvent[] = streamEvent.Records.map((record) => + // Ensure logging by extracting all records first + const ddbRecords: DynamoDBRecord[] = streamEvent.Records.map((record) => extractPayload(record, deps), - ) - .filter((record) => record.eventName === "MODIFY") - .filter( - (record) => - isChanged(record, "status") || isChanged(record, "reasonCode"), - ) + ); + + const cloudEvents: LetterEvent[] = ddbRecords + .filter((record) => filterRecord(record, deps)) .map((element) => extractNewLetter(element)) - .map((element) => mapLetterToCloudEvent(element)); + .map((element) => mapLetterToCloudEvent(element, deps.env.EVENT_SOURCE)); for (const batch of generateBatches(cloudEvents)) { deps.logger.info({ @@ -50,14 +53,54 @@ export default function createHandler(deps: Deps): Handler { }; } +function filterRecord(record: DynamoDBRecord, deps: Deps): boolean { + let allowEvent = false; + if (record.eventName === "INSERT") { + allowEvent = true; + } + + if ( + record.eventName === "MODIFY" && + (isChanged(record, "status") || isChanged(record, "reasonCode")) + ) { + allowEvent = true; + } + + deps.logger.info({ + description: "Filtering record", + eventName: record.eventName, + eventId: record.eventID, + allowEvent, + }); + + return allowEvent; +} + function extractPayload( record: KinesisStreamRecord, deps: Deps, ): DynamoDBRecord { - // Kinesis data is base64 encoded - const payload = Buffer.from(record.kinesis.data, "base64").toString("utf8"); - deps.logger.info({ description: "Extracted dynamoDBRecord", payload }); - return JSON.parse(payload); + try { + deps.logger.info({ + description: "Processing Kinesis record", + recordId: record.kinesis.sequenceNumber, + }); + + // Kinesis data is base64 encoded + const payload = Buffer.from(record.kinesis.data, "base64").toString("utf8"); + deps.logger.info({ description: "Decoded payload", payload }); + + const jsonParsed = JSON.parse(payload); + deps.logger.info({ description: "Extracted dynamoDBRecord", jsonParsed }); + return jsonParsed; + } catch (error) { + deps.logger.error({ + description: "Error extracting payload", + error, + record, + }); + throw error; + } } function isChanged(record: DynamoDBRecord, property: string): boolean { diff --git a/lambdas/letter-updates-transformer/src/mappers/__tests__/letter-mapper.test.ts b/lambdas/letter-updates-transformer/src/mappers/__tests__/letter-mapper.test.ts index d36f52b40..ba91099c4 100644 --- a/lambdas/letter-updates-transformer/src/mappers/__tests__/letter-mapper.test.ts +++ b/lambdas/letter-updates-transformer/src/mappers/__tests__/letter-mapper.test.ts @@ -17,7 +17,8 @@ describe("letter-mapper", () => { source: "letter-rendering/source/test", subject: "letter-rendering/source/letter/letter-id", } as Letter; - const event = mapLetterToCloudEvent(letter); + const source = "/data-plane/supplier-api/nhs-supplier-api-dev/main/letters"; + const event = mapLetterToCloudEvent(letter, source); // Check it conforms to the letter event schema - parse will throw an error if not $LetterEvent.parse(event); @@ -25,7 +26,7 @@ describe("letter-mapper", () => { expect(event.dataschema).toBe( `https://notify.nhs.uk/cloudevents/schemas/supplier-api/letter.PRINTED.${event.dataschemaversion}.schema.json`, ); - expect(event.dataschemaversion).toBe("1.0.8"); + expect(event.dataschemaversion).toMatch(/1\.\d+\.\d+/); expect(event.subject).toBe("letter-origin/supplier-api/letter/id1"); expect(event.time).toBe("2025-11-24T15:55:18.000Z"); expect(event.recordedtime).toBe("2025-11-24T15:55:18.000Z"); @@ -45,5 +46,6 @@ describe("letter-mapper", () => { event: event.id, }, }); + expect(event.source).toBe(source); }); }); diff --git a/lambdas/letter-updates-transformer/src/mappers/letter-mapper.ts b/lambdas/letter-updates-transformer/src/mappers/letter-mapper.ts index 34cd23c56..468d9ffd6 100644 --- a/lambdas/letter-updates-transformer/src/mappers/letter-mapper.ts +++ b/lambdas/letter-updates-transformer/src/mappers/letter-mapper.ts @@ -5,6 +5,7 @@ import { LetterForEventPub } from "../types"; export default function mapLetterToCloudEvent( letter: LetterForEventPub, + source: string, ): LetterEvent { const eventId = randomUUID(); const dataschemaversion = eventSchemaPackage.version; @@ -15,7 +16,7 @@ export default function mapLetterToCloudEvent( plane: "data", dataschema: `https://notify.nhs.uk/cloudevents/schemas/supplier-api/letter.${letter.status}.${dataschemaversion}.schema.json`, dataschemaversion, - source: "/data-plane/supplier-api/letters", + source, subject: `letter-origin/supplier-api/letter/${letter.id}`, data: { diff --git a/package-lock.json b/package-lock.json index baee66ec3..61aace1fc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -123,7 +123,7 @@ }, "internal/events": { "name": "@nhsdigital/nhs-notify-event-schemas-supplier-api", - "version": "1.0.8", + "version": "1.0.9", "license": "MIT", "dependencies": { "@asyncapi/bundler": "^0.6.4", diff --git a/pact-contracts/package.json b/pact-contracts/package.json index c28d516d7..f97c4569f 100644 --- a/pact-contracts/package.json +++ b/pact-contracts/package.json @@ -1,25 +1,26 @@ { - "name": "@nhsdigital/notify-supplier-api-consumer-contracts", - "version": "1.0.1", "description": "NHS Notify Supplier API Pact contracts", - "license": "MIT", - "repository": { - "type": "git", - "url": "git+ssh://git@github.com/NHSDigital/nhs-notify-supplier-api.git" - }, "exports": { "./pacts/*.json": { "default": "./pacts/*.json" } }, + "license": "MIT", + "name": "@nhsdigital/notify-supplier-api-consumer-contracts", "publishConfig": { "access": "public", "registry": "https://npm.pkg.github.com" }, + "repository": { + "type": "git", + "url": "git+ssh://git@github.com/NHSDigital/nhs-notify-supplier-api.git" + }, "scripts": { "lint": "echo Linting not required", + "lint:fix": "echo Linting not required", "test:unit": "echo Unit tests not required", "test:unit:coverage": "echo Unit tests not required", "typecheck": "echo Typecheck not required" - } + }, + "version": "1.0.1" }