From 54d13b8189dc467bde9e4b5f6fb27a854e4b5ae6 Mon Sep 17 00:00:00 2001 From: David Wass Date: Tue, 12 May 2026 11:20:53 +0100 Subject: [PATCH 1/4] CCM-12392: Allocation idempotency --- .../api/module_lambda_supplier_allocator.tf | 4 +- lambdas/supplier-allocator/package.json | 1 + .../src/config/__tests__/deps.test.ts | 13 ++ .../src/config/__tests__/env.test.ts | 2 + lambdas/supplier-allocator/src/config/deps.ts | 25 +-- lambdas/supplier-allocator/src/config/env.ts | 1 + .../__tests__/allocate-handler.test.ts | 105 ++++++----- .../src/handler/allocate-handler.ts | 168 ++++++++++++------ package-lock.json | 10 +- 9 files changed, 202 insertions(+), 127 deletions(-) diff --git a/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf b/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf index 7bb3595fb..388934831 100644 --- a/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf +++ b/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf @@ -35,7 +35,8 @@ module "supplier_allocator" { log_subscription_role_arn = local.acct.log_subscription_role_arn lambda_env_vars = merge(local.common_lambda_env_vars, { - UPSERT_LETTERS_QUEUE_URL = module.sqs_letter_updates.sqs_queue_url + UPSERT_LETTERS_QUEUE_URL = module.sqs_letter_updates.sqs_queue_url, + IDEMPOTENCY_TABLE_NAME = aws_dynamodb_table.idempotency.name }) } @@ -110,6 +111,7 @@ data "aws_iam_policy_document" "supplier_allocator_lambda" { resources = [ aws_dynamodb_table.supplier-quotas.arn, + aws_dynamodb_table.idempotency.arn, "${aws_dynamodb_table.supplier-quotas.arn}/index/*" ] } diff --git a/lambdas/supplier-allocator/package.json b/lambdas/supplier-allocator/package.json index 5c6e422bb..1edbda727 100644 --- a/lambdas/supplier-allocator/package.json +++ b/lambdas/supplier-allocator/package.json @@ -1,5 +1,6 @@ { "dependencies": { + "@aws-lambda-powertools/idempotency": "^2.33.0", "@aws-sdk/client-dynamodb": "^3.858.0", "@aws-sdk/client-sqs": "^3.984.0", "@aws-sdk/lib-dynamodb": "^3.1008.0", diff --git a/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts b/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts index 88d04eab5..316fc91c7 100644 --- a/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts +++ b/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts @@ -4,6 +4,7 @@ describe("createDependenciesContainer", () => { const env = { SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable", SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable", + IDEMPOTENCY_TABLE_NAME: "IdempotencyTable", }; beforeEach(() => { @@ -27,6 +28,10 @@ describe("createDependenciesContainer", () => { SupplierQuotasRepository: jest.fn(), })); + jest.mock("@aws-lambda-powertools/idempotency/dynamodb", () => ({ + DynamoDBPersistenceLayer: jest.fn(), + })); + // Env jest.mock("../env", () => ({ envVars: env })); }); @@ -40,6 +45,9 @@ describe("createDependenciesContainer", () => { const { SupplierQuotasRepository } = jest.requireMock( "@internal/datastore", ); + const { DynamoDBPersistenceLayer } = jest.requireMock( + "@aws-lambda-powertools/idempotency/dynamodb", + ); // eslint-disable-next-line @typescript-eslint/no-require-imports const { createDependenciesContainer } = require("../deps"); const deps: Deps = createDependenciesContainer(); @@ -54,6 +62,11 @@ describe("createDependenciesContainer", () => { expect(supplierQuotasRepoCtorArgs[1]).toEqual({ supplierQuotasTableName: "SupplierQuotasTable", }); + expect(DynamoDBPersistenceLayer).toHaveBeenCalledTimes(1); + const idempotencyLayerCtorArgs = DynamoDBPersistenceLayer.mock.calls[0][0]; + expect(idempotencyLayerCtorArgs).toEqual({ + tableName: "IdempotencyTable", + }); expect(deps.env).toEqual(env); }); }); diff --git a/lambdas/supplier-allocator/src/config/__tests__/env.test.ts b/lambdas/supplier-allocator/src/config/__tests__/env.test.ts index 1f4da34cb..6274a6186 100644 --- a/lambdas/supplier-allocator/src/config/__tests__/env.test.ts +++ b/lambdas/supplier-allocator/src/config/__tests__/env.test.ts @@ -16,12 +16,14 @@ describe("lambdaEnv", () => { it("should load all environment variables successfully", () => { process.env.SUPPLIER_CONFIG_TABLE_NAME = "SupplierConfigTable"; process.env.SUPPLIER_QUOTAS_TABLE_NAME = "SupplierQuotasTable"; + process.env.IDEMPOTENCY_TABLE_NAME = "IdempotencyTable"; const { envVars } = require("../env"); expect(envVars).toEqual({ SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable", SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable", + IDEMPOTENCY_TABLE_NAME: "IdempotencyTable", }); }); }); diff --git a/lambdas/supplier-allocator/src/config/deps.ts b/lambdas/supplier-allocator/src/config/deps.ts index 5f58a00e0..e937d64ef 100644 --- a/lambdas/supplier-allocator/src/config/deps.ts +++ b/lambdas/supplier-allocator/src/config/deps.ts @@ -1,5 +1,6 @@ import { DynamoDBClient } from "@aws-sdk/client-dynamodb"; import { DynamoDBDocumentClient } from "@aws-sdk/lib-dynamodb"; +import { DynamoDBPersistenceLayer } from "@aws-lambda-powertools/idempotency/dynamodb"; import { SQSClient } from "@aws-sdk/client-sqs"; import { Logger } from "pino"; import { createLogger } from "@internal/helpers"; @@ -12,6 +13,7 @@ import { EnvVars, envVars } from "./env"; export type Deps = { supplierConfigRepo: SupplierConfigRepository; supplierQuotasRepo: SupplierQuotasRepository; + idempotencyLayer: DynamoDBPersistenceLayer; logger: Logger; env: EnvVars; sqsClient: SQSClient; @@ -22,11 +24,7 @@ function createDocumentClient(): DynamoDBDocumentClient { return DynamoDBDocumentClient.from(ddbClient); } -function createSupplierConfigRepository( - log: Logger, - // eslint-disable-next-line @typescript-eslint/no-shadow - envVars: EnvVars, -): SupplierConfigRepository { +function createSupplierConfigRepository(): SupplierConfigRepository { const config = { supplierConfigTableName: envVars.SUPPLIER_CONFIG_TABLE_NAME, }; @@ -34,11 +32,7 @@ function createSupplierConfigRepository( return new SupplierConfigRepository(createDocumentClient(), config); } -function createSupplierQuotasRepository( - log: Logger, - // eslint-disable-next-line @typescript-eslint/no-shadow - envVars: EnvVars, -): SupplierQuotasRepository { +function createSupplierQuotasRepository(): SupplierQuotasRepository { const config = { supplierQuotasTableName: envVars.SUPPLIER_QUOTAS_TABLE_NAME, }; @@ -46,12 +40,19 @@ function createSupplierQuotasRepository( return new SupplierQuotasRepository(createDocumentClient(), config); } +function createIdempotencyLayer(): DynamoDBPersistenceLayer { + return new DynamoDBPersistenceLayer({ + tableName: envVars.IDEMPOTENCY_TABLE_NAME, + }); +} + export function createDependenciesContainer(): Deps { const log = createLogger({ logLevel: envVars.PINO_LOG_LEVEL }); return { - supplierConfigRepo: createSupplierConfigRepository(log, envVars), - supplierQuotasRepo: createSupplierQuotasRepository(log, envVars), + supplierConfigRepo: createSupplierConfigRepository(), + supplierQuotasRepo: createSupplierQuotasRepository(), + idempotencyLayer: createIdempotencyLayer(), logger: log, env: envVars, sqsClient: new SQSClient({}), diff --git a/lambdas/supplier-allocator/src/config/env.ts b/lambdas/supplier-allocator/src/config/env.ts index a155e4dbc..7d99ef0df 100644 --- a/lambdas/supplier-allocator/src/config/env.ts +++ b/lambdas/supplier-allocator/src/config/env.ts @@ -4,6 +4,7 @@ const EnvVarsSchema = z.object({ SUPPLIER_CONFIG_TABLE_NAME: z.string(), SUPPLIER_QUOTAS_TABLE_NAME: z.string(), PINO_LOG_LEVEL: z.coerce.string().optional(), + IDEMPOTENCY_TABLE_NAME: z.string(), }); export type EnvVars = z.infer; diff --git a/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts b/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts index 39ae14c84..cc02087cd 100644 --- a/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts +++ b/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts @@ -7,15 +7,11 @@ import { $LetterStatusChangeEvent, LetterStatusChangeEvent, } from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src/events/letter-events"; -import { - SupplierConfigRepository, - SupplierQuotasRepository, -} from "@internal/datastore"; +import { makeIdempotent } from "@aws-lambda-powertools/idempotency"; import createSupplierAllocatorHandler from "../allocate-handler"; import * as supplierConfig from "../../services/supplier-config"; import * as supplierQuotas from "../../services/supplier-quotas"; import * as allocationConfig from "../allocation-config"; - import { Deps } from "../../config/deps"; import packageJson from "../../../package.json"; @@ -28,6 +24,14 @@ jest.mock("../../services/supplier-config"); jest.mock("../../services/supplier-quotas"); jest.mock("../allocation-config"); +jest.mock("@aws-lambda-powertools/idempotency", () => { + const original = jest.requireActual("@aws-lambda-powertools/idempotency"); + return { + ...original, + makeIdempotent: jest.fn((fn, _) => fn), + }; +}); + function createSQSEvent(records: SQSRecord[]): SQSEvent { return { Records: records, @@ -185,16 +189,15 @@ function setupDefaultMocks() { } describe("createSupplierAllocatorHandler", () => { - let mockSqsClient: jest.Mocked; - let mockedDeps: jest.Mocked; - let mockedSupplierConfigRepo: jest.Mocked; - let mockedSupplierQuotasRepo: jest.Mocked; - beforeEach(() => { - mockSqsClient = { - send: jest.fn(), - } as unknown as jest.Mocked; - - mockedSupplierConfigRepo = { + const mockedDeps: jest.Mocked = { + logger: { error: jest.fn(), info: jest.fn() } as unknown as pino.Logger, + env: { + SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable", + SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable", + IDEMPOTENCY_TABLE_NAME: "IdempotencyTable", + }, + sqsClient: { send: jest.fn() } as unknown as SQSClient, + supplierConfigRepo: { ddbClient: {} as any, config: {} as any, getLetterVariant: jest.fn(), @@ -203,27 +206,18 @@ describe("createSupplierAllocatorHandler", () => { getSuppliersDetails: jest.fn(), getSupplierPacksForPackSpecification: jest.fn(), getPackSpecification: jest.fn(), - }; - - mockedSupplierQuotasRepo = { + }, + supplierQuotasRepo: { ddbClient: {} as any, config: {} as any, getOverallAllocation: jest.fn(), updateOverallAllocation: jest.fn(), getDailyAllocation: jest.fn(), updateDailyAllocation: jest.fn(), - }; + }, + } as unknown as Deps; - mockedDeps = { - logger: { error: jest.fn(), info: jest.fn() } as unknown as pino.Logger, - env: { - SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable", - SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable", - }, - sqsClient: mockSqsClient, - supplierConfigRepo: mockedSupplierConfigRepo, - supplierQuotasRepo: mockedSupplierQuotasRepo, - }; + beforeEach(() => { jest.clearAllMocks(); }); @@ -244,8 +238,8 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(0); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -281,8 +275,8 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(0); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -315,8 +309,8 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(0); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; const messageBody = JSON.parse(sendCall.input.MessageBody); expect(messageBody.allocationDetails.supplierSpec).toEqual({ supplierId: "supplier1", @@ -361,7 +355,7 @@ describe("createSupplierAllocatorHandler", () => { const handler = createSupplierAllocatorHandler(mockedDeps); await handler(evt, {} as any, {} as any); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; const messageBody = JSON.parse(sendCall.input.MessageBody); expect(messageBody.letterEvent.data.domainId).toBe("letter-test"); }); @@ -410,7 +404,7 @@ describe("createSupplierAllocatorHandler", () => { if (!result) throw new Error("expected BatchResponse, got void"); expect(result.batchItemFailures).toHaveLength(0); - expect(mockSqsClient.send).toHaveBeenCalledTimes(2); + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(2); }); test("returns batch failure for invalid JSON", async () => { @@ -480,7 +474,7 @@ describe("createSupplierAllocatorHandler", () => { process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; const sqsError = new Error("SQS send failed"); - (mockSqsClient.send as jest.Mock).mockRejectedValueOnce(sqsError); + (mockedDeps.sqsClient.send as jest.Mock).mockRejectedValueOnce(sqsError); const handler = createSupplierAllocatorHandler(mockedDeps); const result = await handler(evt, {} as any, {} as any); @@ -513,7 +507,7 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(1); expect(result.batchItemFailures[0].itemIdentifier).toBe("fail-msg"); - expect(mockSqsClient.send).toHaveBeenCalledTimes(2); + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(2); }); test("sends correct queue URL in SQS message command", async () => { @@ -529,7 +523,7 @@ describe("createSupplierAllocatorHandler", () => { const handler = createSupplierAllocatorHandler(mockedDeps); await handler(evt, {} as any, {} as any); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall.input.QueueUrl).toBe(queueUrl); }); @@ -557,8 +551,8 @@ describe("createSupplierAllocatorHandler", () => { variantId: "lv1", }), ); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -667,8 +661,9 @@ describe("createSupplierAllocatorHandler", () => { variantId: "lv1", }), ); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock + .calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -711,8 +706,8 @@ describe("createSupplierAllocatorHandler", () => { variantId: "lv1", }), ); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -771,4 +766,22 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(0); expect(allocationConfig.selectSupplierByFactor).toHaveBeenCalledTimes(2); }); + + test("does not process a message more than once due to idempotency wrapper", async () => { + const preparedEvent = createPreparedV2Event(); + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("msg1", JSON.stringify(preparedEvent)), + ]); + + process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; + + setupDefaultMocks(); + (makeIdempotent as jest.Mock).mockImplementationOnce((_fn) => "supplier1"); + + const handler = createSupplierAllocatorHandler(mockedDeps); + await handler(evt, {} as any, {} as any); + + expect(makeIdempotent).toHaveBeenCalledTimes(1); + expect(mockedDeps.sqsClient.send).not.toHaveBeenCalled(); + }); }); diff --git a/lambdas/supplier-allocator/src/handler/allocate-handler.ts b/lambdas/supplier-allocator/src/handler/allocate-handler.ts index ec3cc106d..affecdf66 100644 --- a/lambdas/supplier-allocator/src/handler/allocate-handler.ts +++ b/lambdas/supplier-allocator/src/handler/allocate-handler.ts @@ -1,4 +1,4 @@ -import { SQSBatchItemFailure, SQSEvent, SQSHandler } from "aws-lambda"; +import { Context, SQSBatchItemFailure, SQSEvent, SQSHandler } from "aws-lambda"; import { SendMessageCommand } from "@aws-sdk/client-sqs"; import { LetterVariant, @@ -13,6 +13,10 @@ import { buildEMFObject, formatGroupId, } from "@internal/helpers"; +import { + IdempotencyConfig, + makeIdempotent, +} from "@aws-lambda-powertools/idempotency"; import { getVariantDetails, getVolumeGroupDetails, @@ -29,6 +33,10 @@ import { import { Deps } from "../config/deps"; import { PreparedEventSchema, PreparedEvents, SupplierDetails } from "./types"; +const idempotencyConfig = new IdempotencyConfig({ + eventKeyJmesPath: "id", +}); + function parseQueueMessage(queueMessage: string): PreparedEvents { const result = PreparedEventSchema.safeParse(queueMessage); @@ -258,12 +266,91 @@ async function saveAllocations( } } +type SupplierAllocationResult = [ + supplier: string, + priority: string, + AllocationMetrics, + AllocationMetrics, + VolumeGroupAllocation, +]; + +async function processSupplierAllocation( + letterEvent: PreparedEvents, + deps: Deps, + perAllocationSuccess: AllocationMetrics, + perAllocationFailure: AllocationMetrics, + volumeGroupAllocations: VolumeGroupAllocation, +): Promise { + const supplierDetails: SupplierDetails = await getSupplierFromConfig( + letterEvent, + deps, + ); + deps.logger.info({ + description: "Resolved supplier details from config", + supplierDetails, + }); + const supplierSpec = supplierDetails?.allocationDetails?.supplierSpec; + + const supplier = supplierSpec.supplierId; + const priority = String(supplierSpec.priority); + + if (supplierDetails.allocationDetails.allocationStatus.status === "PENDING") { + incrementMetric(perAllocationSuccess, supplier, priority); + emitDataMetrics(letterEvent, supplier, "extra_data_dimensions", deps); + + incrementAllocation( + volumeGroupAllocations, + supplierDetails.volumeGroupId, + supplier, + 1, + deps, + ); + } else { + incrementMetric(perAllocationFailure, supplier, priority); + } + + // Send to allocated letters queue + const queueUrl = process.env.UPSERT_LETTERS_QUEUE_URL; + if (!queueUrl) { + throw new Error("UPSERT_LETTERS_QUEUE_URL not configured"); + } + + const queueMessage = { + letterEvent, + allocationDetails: supplierDetails.allocationDetails, + }; + + deps.logger.info({ + description: "Sending message to upsert letter queue", + msg: queueMessage, + url: queueUrl, + }); + + await deps.sqsClient.send( + new SendMessageCommand({ + QueueUrl: queueUrl, + MessageBody: JSON.stringify(queueMessage), + }), + ); + return [ + supplier, + priority, + perAllocationSuccess, + perAllocationFailure, + volumeGroupAllocations, + ]; +} + export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { - return async (event: SQSEvent) => { + const getSupplierIdempotently = makeIdempotent(processSupplierAllocation, { + persistenceStore: deps.idempotencyLayer, + config: idempotencyConfig, + }); + return async (event: SQSEvent, context: Context) => { const batchItemFailures: SQSBatchItemFailure[] = []; - const perAllocationSuccess: AllocationMetrics = new Map(); - const perAllocationFailure: AllocationMetrics = new Map(); - const volumeGroupAllocations: VolumeGroupAllocation = new Map(); + let perAllocationSuccess: AllocationMetrics = new Map(); + let perAllocationFailure: AllocationMetrics = new Map(); + let volumeGroupAllocations: VolumeGroupAllocation = new Map(); const tasks = event.Records.map(async (record) => { let supplier = "unknown"; @@ -277,63 +364,26 @@ export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { messageId: record.messageId, }); - const supplierDetails: SupplierDetails = await getSupplierFromConfig( - letterEvent, - deps, - ); + idempotencyConfig.registerLambdaContext(context); - deps.logger.info({ - description: "Resolved supplier details from config", - supplierDetails, - }); - const supplierSpec = supplierDetails?.allocationDetails?.supplierSpec; - - supplier = supplierSpec.supplierId; - priority = String(supplierSpec.priority); - - if ( - supplierDetails.allocationDetails.allocationStatus.status === - "PENDING" - ) { - incrementMetric(perAllocationSuccess, supplier, priority); - - incrementAllocation( - volumeGroupAllocations, - supplierDetails.volumeGroupId, - supplier, - 1, - deps, - ); - } else { - incrementMetric(perAllocationFailure, supplier, priority); - } - - // Send to allocated letters queue - const queueUrl = process.env.UPSERT_LETTERS_QUEUE_URL; - if (!queueUrl) { - throw new Error("UPSERT_LETTERS_QUEUE_URL not configured"); - } - - const queueMessage = { + const [ + allocatedSupplier, + alocatedPriority, + updatedPerAllocationSuccess, + updatedPerAllocationFailure, + updatedVolumeGroupAllocations, + ] = await getSupplierIdempotently( letterEvent, - allocationDetails: supplierDetails.allocationDetails, - }; - - deps.logger.info({ - description: "Sending message to upsert letter queue", - msg: queueMessage, - url: queueUrl, - }); - - await deps.sqsClient.send( - new SendMessageCommand({ - QueueUrl: queueUrl, - MessageBody: JSON.stringify(queueMessage), - }), + deps, + perAllocationSuccess, + perAllocationFailure, + volumeGroupAllocations, ); - - incrementMetric(perAllocationSuccess, supplier, priority); - emitDataMetrics(letterEvent, supplier, "extra_data_dimensions", deps); + supplier = allocatedSupplier; + priority = alocatedPriority; + perAllocationSuccess = updatedPerAllocationSuccess; + perAllocationFailure = updatedPerAllocationFailure; + volumeGroupAllocations = updatedVolumeGroupAllocations; } catch (error) { deps.logger.error({ description: "Error processing allocation of record", diff --git a/package-lock.json b/package-lock.json index c8fcc257a..3baf4e24a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -257,6 +257,7 @@ "name": "nhs-notify-supplier-api-allocate-letter", "version": "0.0.1", "dependencies": { + "@aws-lambda-powertools/idempotency": "^2.33.0", "@aws-sdk/client-dynamodb": "^3.858.0", "@aws-sdk/client-sqs": "^3.984.0", "@aws-sdk/lib-dynamodb": "^3.1008.0", @@ -5197,15 +5198,6 @@ "resolved": "internal/events", "link": true }, - "node_modules/@nhsdigital/nhs-notify-event-schemas-supplier-config": { - "version": "1.0.1", - "resolved": "https://npm.pkg.github.com/download/@nhsdigital/nhs-notify-event-schemas-supplier-config/1.0.1/ff1ce566201ae291825acd5e771537229d6aa9ca", - "integrity": "sha512-gIZgfzgvkCfZE+HCosrVJ3tBce2FJRGfwPmtYtZDBG+ox/KvbpJFWXzJ5Jkh/42YzcVn2GxT1fy1L1F6pxiYWA==", - "dependencies": { - "@asyncapi/bundler": "^0.6.4", - "zod": "^4.1.12" - } - }, "node_modules/@nhsdigital/notify-digital-letters-consumer-contracts": { "version": "1.0.1", "resolved": "https://npm.pkg.github.com/download/@nhsdigital/notify-digital-letters-consumer-contracts/1.0.1/a721d9c8b1e01a61de4ecc2b62d3c692e5213bb8", From 6de3928ae2078cc749964ffcb9c188780173e52a Mon Sep 17 00:00:00 2001 From: David Wass Date: Thu, 14 May 2026 14:22:26 +0100 Subject: [PATCH 2/4] extra logging --- .../src/handler/allocate-handler.ts | 77 ++++++++++++------- 1 file changed, 51 insertions(+), 26 deletions(-) diff --git a/lambdas/supplier-allocator/src/handler/allocate-handler.ts b/lambdas/supplier-allocator/src/handler/allocate-handler.ts index affecdf66..a6678921a 100644 --- a/lambdas/supplier-allocator/src/handler/allocate-handler.ts +++ b/lambdas/supplier-allocator/src/handler/allocate-handler.ts @@ -32,6 +32,7 @@ import { import { Deps } from "../config/deps"; import { PreparedEventSchema, PreparedEvents, SupplierDetails } from "./types"; +import { domain } from "zod/v4/core/regexes.cjs"; const idempotencyConfig = new IdempotencyConfig({ eventKeyJmesPath: "id", @@ -266,13 +267,13 @@ async function saveAllocations( } } -type SupplierAllocationResult = [ - supplier: string, - priority: string, - AllocationMetrics, - AllocationMetrics, - VolumeGroupAllocation, -]; +type SupplierAllocationResult = { + supplier: string; + priority: string; + perAllocationSuccess: AllocationMetrics; + perAllocationFailure: AllocationMetrics; + volumeGroupAllocations: VolumeGroupAllocation; +}; async function processSupplierAllocation( letterEvent: PreparedEvents, @@ -281,6 +282,12 @@ async function processSupplierAllocation( perAllocationFailure: AllocationMetrics, volumeGroupAllocations: VolumeGroupAllocation, ): Promise { + deps.logger.info({ + description: "Processing supplier allocation for letter event", + eventId: letterEvent.id, + domainId: letterEvent.data.domainId, + letterVariantId: letterEvent.data.letterVariantId, + }); const supplierDetails: SupplierDetails = await getSupplierFromConfig( letterEvent, deps, @@ -332,13 +339,13 @@ async function processSupplierAllocation( MessageBody: JSON.stringify(queueMessage), }), ); - return [ + return { supplier, priority, perAllocationSuccess, perAllocationFailure, volumeGroupAllocations, - ]; + }; } export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { @@ -362,28 +369,46 @@ export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { deps.logger.info({ description: "Extracted letter event", messageId: record.messageId, + domainId: letterEvent.data.domainId, + letterVariantId: letterEvent.data.letterVariantId, }); idempotencyConfig.registerLambdaContext(context); - const [ - allocatedSupplier, - alocatedPriority, - updatedPerAllocationSuccess, - updatedPerAllocationFailure, - updatedVolumeGroupAllocations, - ] = await getSupplierIdempotently( - letterEvent, - deps, - perAllocationSuccess, - perAllocationFailure, + const supplierAllocationResult: SupplierAllocationResult = + await getSupplierIdempotently( + letterEvent, + deps, + perAllocationSuccess, + perAllocationFailure, + volumeGroupAllocations, + ); + deps.logger.info({ + description: "Completed supplier allocation for letter event", + eventId: letterEvent.id, + domainId: letterEvent.data.domainId, + letterVariantId: letterEvent.data.letterVariantId, + supplier: supplierAllocationResult.supplier, + priority: supplierAllocationResult.priority, + updatedVolumeGroupAllocations: + supplierAllocationResult.volumeGroupAllocations, + }); + supplier = supplierAllocationResult.supplier; + priority = supplierAllocationResult.priority; + perAllocationSuccess = supplierAllocationResult.perAllocationSuccess; + perAllocationFailure = supplierAllocationResult.perAllocationFailure; + volumeGroupAllocations = + supplierAllocationResult.volumeGroupAllocations; + deps.logger.info({ + description: + "Completed processing supplier allocation for letter event", + eventId: letterEvent.id, + domainId: letterEvent.data.domainId, + letterVariantId: letterEvent.data.letterVariantId, + supplier, + priority, volumeGroupAllocations, - ); - supplier = allocatedSupplier; - priority = alocatedPriority; - perAllocationSuccess = updatedPerAllocationSuccess; - perAllocationFailure = updatedPerAllocationFailure; - volumeGroupAllocations = updatedVolumeGroupAllocations; + }); } catch (error) { deps.logger.error({ description: "Error processing allocation of record", From 8449e047b48492250efeee971b00fd6ad5f179a9 Mon Sep 17 00:00:00 2001 From: David Wass Date: Fri, 15 May 2026 11:06:09 +0100 Subject: [PATCH 3/4] make stats idempotent too --- .../src/handler/allocate-handler.ts | 58 ++++++++++--------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/lambdas/supplier-allocator/src/handler/allocate-handler.ts b/lambdas/supplier-allocator/src/handler/allocate-handler.ts index a6678921a..c03ae0253 100644 --- a/lambdas/supplier-allocator/src/handler/allocate-handler.ts +++ b/lambdas/supplier-allocator/src/handler/allocate-handler.ts @@ -32,7 +32,6 @@ import { import { Deps } from "../config/deps"; import { PreparedEventSchema, PreparedEvents, SupplierDetails } from "./types"; -import { domain } from "zod/v4/core/regexes.cjs"; const idempotencyConfig = new IdempotencyConfig({ eventKeyJmesPath: "id", @@ -270,9 +269,6 @@ async function saveAllocations( type SupplierAllocationResult = { supplier: string; priority: string; - perAllocationSuccess: AllocationMetrics; - perAllocationFailure: AllocationMetrics; - volumeGroupAllocations: VolumeGroupAllocation; }; async function processSupplierAllocation( @@ -342,22 +338,42 @@ async function processSupplierAllocation( return { supplier, priority, - perAllocationSuccess, - perAllocationFailure, - volumeGroupAllocations, }; } export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { - const getSupplierIdempotently = makeIdempotent(processSupplierAllocation, { - persistenceStore: deps.idempotencyLayer, - config: idempotencyConfig, - }); + const getSupplierIdempotently = ( + perAllocationSuccess: AllocationMetrics, + perAllocationFailure: AllocationMetrics, + volumeGroupAllocations: VolumeGroupAllocation, + ) => { + return makeIdempotent( + (letterEvent: PreparedEvents, depsInner: Deps) => + processSupplierAllocation( + letterEvent, + depsInner, + perAllocationSuccess, + perAllocationFailure, + volumeGroupAllocations, + ), + { + persistenceStore: deps.idempotencyLayer, + config: idempotencyConfig, + }, + ); + }; return async (event: SQSEvent, context: Context) => { const batchItemFailures: SQSBatchItemFailure[] = []; - let perAllocationSuccess: AllocationMetrics = new Map(); - let perAllocationFailure: AllocationMetrics = new Map(); - let volumeGroupAllocations: VolumeGroupAllocation = new Map(); + const perAllocationSuccess: AllocationMetrics = new Map(); + const perAllocationFailure: AllocationMetrics = new Map(); + const volumeGroupAllocations: VolumeGroupAllocation = new Map(); + + // create an idempotent function bound to this handler's batchItemFailures + const boundGetSupplierIdempotently = getSupplierIdempotently( + perAllocationSuccess, + perAllocationFailure, + volumeGroupAllocations, + ); const tasks = event.Records.map(async (record) => { let supplier = "unknown"; @@ -376,13 +392,7 @@ export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { idempotencyConfig.registerLambdaContext(context); const supplierAllocationResult: SupplierAllocationResult = - await getSupplierIdempotently( - letterEvent, - deps, - perAllocationSuccess, - perAllocationFailure, - volumeGroupAllocations, - ); + await boundGetSupplierIdempotently(letterEvent, deps); deps.logger.info({ description: "Completed supplier allocation for letter event", eventId: letterEvent.id, @@ -390,15 +400,9 @@ export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { letterVariantId: letterEvent.data.letterVariantId, supplier: supplierAllocationResult.supplier, priority: supplierAllocationResult.priority, - updatedVolumeGroupAllocations: - supplierAllocationResult.volumeGroupAllocations, }); supplier = supplierAllocationResult.supplier; priority = supplierAllocationResult.priority; - perAllocationSuccess = supplierAllocationResult.perAllocationSuccess; - perAllocationFailure = supplierAllocationResult.perAllocationFailure; - volumeGroupAllocations = - supplierAllocationResult.volumeGroupAllocations; deps.logger.info({ description: "Completed processing supplier allocation for letter event", From dfbf62490c5576a13e177a6133168bf3c9adb416 Mon Sep 17 00:00:00 2001 From: David Wass Date: Fri, 15 May 2026 11:40:29 +0100 Subject: [PATCH 4/4] even more logging --- .../src/handler/allocate-handler.ts | 21 +++++++++++++++---- .../src/services/supplier-quotas.ts | 6 ++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/lambdas/supplier-allocator/src/handler/allocate-handler.ts b/lambdas/supplier-allocator/src/handler/allocate-handler.ts index c03ae0253..07de8dc25 100644 --- a/lambdas/supplier-allocator/src/handler/allocate-handler.ts +++ b/lambdas/supplier-allocator/src/handler/allocate-handler.ts @@ -233,20 +233,21 @@ function emitDataMetrics( } function incrementAllocation( - volumeGroupAllocation: VolumeGroupAllocation, + volumeGroupAllocations: VolumeGroupAllocation, volumeGroupId: string, supplierId: string, allocation: number, deps: Deps, ) { - const groupAllocations = volumeGroupAllocation.get(volumeGroupId) ?? {}; + const groupAllocations = volumeGroupAllocations.get(volumeGroupId) ?? {}; groupAllocations[supplierId] = (groupAllocations[supplierId] ?? 0) + allocation; - volumeGroupAllocation.set(volumeGroupId, groupAllocations); + volumeGroupAllocations.set(volumeGroupId, groupAllocations); deps.logger.info({ description: "Updated allocations for volume group and supplier", volumeGroupId, groupAllocations, + setVolumeGroupAllocations: volumeGroupAllocations.get(volumeGroupId), }); } @@ -283,6 +284,7 @@ async function processSupplierAllocation( eventId: letterEvent.id, domainId: letterEvent.data.domainId, letterVariantId: letterEvent.data.letterVariantId, + existingVolumeGroupAllocations: volumeGroupAllocations, }); const supplierDetails: SupplierDetails = await getSupplierFromConfig( letterEvent, @@ -311,6 +313,10 @@ async function processSupplierAllocation( } else { incrementMetric(perAllocationFailure, supplier, priority); } + deps.logger.info({ + description: "Completed supplier allocation logic for letter event", + finalvolumeGroupAllocations: volumeGroupAllocations, + }); // Send to allocated letters queue const queueUrl = process.env.UPSERT_LETTERS_QUEUE_URL; @@ -400,6 +406,7 @@ export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { letterVariantId: letterEvent.data.letterVariantId, supplier: supplierAllocationResult.supplier, priority: supplierAllocationResult.priority, + volumeGroupAllocations, }); supplier = supplierAllocationResult.supplier; priority = supplierAllocationResult.priority; @@ -411,7 +418,7 @@ export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { letterVariantId: letterEvent.data.letterVariantId, supplier, priority, - volumeGroupAllocations, + volumeGroupAllocations: volumeGroupAllocations.entries(), }); } catch (error) { deps.logger.error({ @@ -425,6 +432,12 @@ export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { } }); + deps.logger.info({ + description: "awaiting completion of supplier allocation tasks for batch", + totalRecords: event.Records.length, + taskCount: tasks.length, + tasks, + }); await Promise.all(tasks); emitMetrics(perAllocationSuccess, MetricStatus.Success, deps); diff --git a/lambdas/supplier-allocator/src/services/supplier-quotas.ts b/lambdas/supplier-allocator/src/services/supplier-quotas.ts index 00a0c1b46..56164fd21 100644 --- a/lambdas/supplier-allocator/src/services/supplier-quotas.ts +++ b/lambdas/supplier-allocator/src/services/supplier-quotas.ts @@ -43,6 +43,12 @@ export async function updateSupplierAllocation( newAllocation: number, deps: Deps, ): Promise { + deps.logger.info({ + description: "Updating supplier allocation", + volumeGroupId, + supplierId, + newAllocation, + }); await deps.supplierQuotasRepo.updateOverallAllocation( volumeGroupId, supplierId,