diff --git a/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf b/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf index 7bb3595f..38893483 100644 --- a/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf +++ b/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf @@ -35,7 +35,8 @@ module "supplier_allocator" { log_subscription_role_arn = local.acct.log_subscription_role_arn lambda_env_vars = merge(local.common_lambda_env_vars, { - UPSERT_LETTERS_QUEUE_URL = module.sqs_letter_updates.sqs_queue_url + UPSERT_LETTERS_QUEUE_URL = module.sqs_letter_updates.sqs_queue_url, + IDEMPOTENCY_TABLE_NAME = aws_dynamodb_table.idempotency.name }) } @@ -110,6 +111,7 @@ data "aws_iam_policy_document" "supplier_allocator_lambda" { resources = [ aws_dynamodb_table.supplier-quotas.arn, + aws_dynamodb_table.idempotency.arn, "${aws_dynamodb_table.supplier-quotas.arn}/index/*" ] } diff --git a/lambdas/supplier-allocator/package.json b/lambdas/supplier-allocator/package.json index 5c6e422b..1edbda72 100644 --- a/lambdas/supplier-allocator/package.json +++ b/lambdas/supplier-allocator/package.json @@ -1,5 +1,6 @@ { "dependencies": { + "@aws-lambda-powertools/idempotency": "^2.33.0", "@aws-sdk/client-dynamodb": "^3.858.0", "@aws-sdk/client-sqs": "^3.984.0", "@aws-sdk/lib-dynamodb": "^3.1008.0", diff --git a/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts b/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts index 88d04eab..316fc91c 100644 --- a/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts +++ b/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts @@ -4,6 +4,7 @@ describe("createDependenciesContainer", () => { const env = { SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable", SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable", + IDEMPOTENCY_TABLE_NAME: "IdempotencyTable", }; beforeEach(() => { @@ -27,6 +28,10 @@ describe("createDependenciesContainer", () => { SupplierQuotasRepository: jest.fn(), })); + jest.mock("@aws-lambda-powertools/idempotency/dynamodb", () => ({ + DynamoDBPersistenceLayer: jest.fn(), + })); + // Env jest.mock("../env", () => ({ envVars: env })); }); @@ -40,6 +45,9 @@ describe("createDependenciesContainer", () => { const { SupplierQuotasRepository } = jest.requireMock( "@internal/datastore", ); + const { DynamoDBPersistenceLayer } = jest.requireMock( + "@aws-lambda-powertools/idempotency/dynamodb", + ); // eslint-disable-next-line @typescript-eslint/no-require-imports const { createDependenciesContainer } = require("../deps"); const deps: Deps = createDependenciesContainer(); @@ -54,6 +62,11 @@ describe("createDependenciesContainer", () => { expect(supplierQuotasRepoCtorArgs[1]).toEqual({ supplierQuotasTableName: "SupplierQuotasTable", }); + expect(DynamoDBPersistenceLayer).toHaveBeenCalledTimes(1); + const idempotencyLayerCtorArgs = DynamoDBPersistenceLayer.mock.calls[0][0]; + expect(idempotencyLayerCtorArgs).toEqual({ + tableName: "IdempotencyTable", + }); expect(deps.env).toEqual(env); }); }); diff --git a/lambdas/supplier-allocator/src/config/__tests__/env.test.ts b/lambdas/supplier-allocator/src/config/__tests__/env.test.ts index 1f4da34c..6274a618 100644 --- a/lambdas/supplier-allocator/src/config/__tests__/env.test.ts +++ b/lambdas/supplier-allocator/src/config/__tests__/env.test.ts @@ -16,12 +16,14 @@ describe("lambdaEnv", () => { it("should load all environment variables successfully", () => { process.env.SUPPLIER_CONFIG_TABLE_NAME = "SupplierConfigTable"; process.env.SUPPLIER_QUOTAS_TABLE_NAME = "SupplierQuotasTable"; + process.env.IDEMPOTENCY_TABLE_NAME = "IdempotencyTable"; const { envVars } = require("../env"); expect(envVars).toEqual({ SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable", SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable", + IDEMPOTENCY_TABLE_NAME: "IdempotencyTable", }); }); }); diff --git a/lambdas/supplier-allocator/src/config/deps.ts b/lambdas/supplier-allocator/src/config/deps.ts index 5f58a00e..e937d64e 100644 --- a/lambdas/supplier-allocator/src/config/deps.ts +++ b/lambdas/supplier-allocator/src/config/deps.ts @@ -1,5 +1,6 @@ import { DynamoDBClient } from "@aws-sdk/client-dynamodb"; import { DynamoDBDocumentClient } from "@aws-sdk/lib-dynamodb"; +import { DynamoDBPersistenceLayer } from "@aws-lambda-powertools/idempotency/dynamodb"; import { SQSClient } from "@aws-sdk/client-sqs"; import { Logger } from "pino"; import { createLogger } from "@internal/helpers"; @@ -12,6 +13,7 @@ import { EnvVars, envVars } from "./env"; export type Deps = { supplierConfigRepo: SupplierConfigRepository; supplierQuotasRepo: SupplierQuotasRepository; + idempotencyLayer: DynamoDBPersistenceLayer; logger: Logger; env: EnvVars; sqsClient: SQSClient; @@ -22,11 +24,7 @@ function createDocumentClient(): DynamoDBDocumentClient { return DynamoDBDocumentClient.from(ddbClient); } -function createSupplierConfigRepository( - log: Logger, - // eslint-disable-next-line @typescript-eslint/no-shadow - envVars: EnvVars, -): SupplierConfigRepository { +function createSupplierConfigRepository(): SupplierConfigRepository { const config = { supplierConfigTableName: envVars.SUPPLIER_CONFIG_TABLE_NAME, }; @@ -34,11 +32,7 @@ function createSupplierConfigRepository( return new SupplierConfigRepository(createDocumentClient(), config); } -function createSupplierQuotasRepository( - log: Logger, - // eslint-disable-next-line @typescript-eslint/no-shadow - envVars: EnvVars, -): SupplierQuotasRepository { +function createSupplierQuotasRepository(): SupplierQuotasRepository { const config = { supplierQuotasTableName: envVars.SUPPLIER_QUOTAS_TABLE_NAME, }; @@ -46,12 +40,19 @@ function createSupplierQuotasRepository( return new SupplierQuotasRepository(createDocumentClient(), config); } +function createIdempotencyLayer(): DynamoDBPersistenceLayer { + return new DynamoDBPersistenceLayer({ + tableName: envVars.IDEMPOTENCY_TABLE_NAME, + }); +} + export function createDependenciesContainer(): Deps { const log = createLogger({ logLevel: envVars.PINO_LOG_LEVEL }); return { - supplierConfigRepo: createSupplierConfigRepository(log, envVars), - supplierQuotasRepo: createSupplierQuotasRepository(log, envVars), + supplierConfigRepo: createSupplierConfigRepository(), + supplierQuotasRepo: createSupplierQuotasRepository(), + idempotencyLayer: createIdempotencyLayer(), logger: log, env: envVars, sqsClient: new SQSClient({}), diff --git a/lambdas/supplier-allocator/src/config/env.ts b/lambdas/supplier-allocator/src/config/env.ts index a155e4db..7d99ef0d 100644 --- a/lambdas/supplier-allocator/src/config/env.ts +++ b/lambdas/supplier-allocator/src/config/env.ts @@ -4,6 +4,7 @@ const EnvVarsSchema = z.object({ SUPPLIER_CONFIG_TABLE_NAME: z.string(), SUPPLIER_QUOTAS_TABLE_NAME: z.string(), PINO_LOG_LEVEL: z.coerce.string().optional(), + IDEMPOTENCY_TABLE_NAME: z.string(), }); export type EnvVars = z.infer; diff --git a/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts b/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts index 39ae14c8..cc02087c 100644 --- a/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts +++ b/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts @@ -7,15 +7,11 @@ import { $LetterStatusChangeEvent, LetterStatusChangeEvent, } from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src/events/letter-events"; -import { - SupplierConfigRepository, - SupplierQuotasRepository, -} from "@internal/datastore"; +import { makeIdempotent } from "@aws-lambda-powertools/idempotency"; import createSupplierAllocatorHandler from "../allocate-handler"; import * as supplierConfig from "../../services/supplier-config"; import * as supplierQuotas from "../../services/supplier-quotas"; import * as allocationConfig from "../allocation-config"; - import { Deps } from "../../config/deps"; import packageJson from "../../../package.json"; @@ -28,6 +24,14 @@ jest.mock("../../services/supplier-config"); jest.mock("../../services/supplier-quotas"); jest.mock("../allocation-config"); +jest.mock("@aws-lambda-powertools/idempotency", () => { + const original = jest.requireActual("@aws-lambda-powertools/idempotency"); + return { + ...original, + makeIdempotent: jest.fn((fn, _) => fn), + }; +}); + function createSQSEvent(records: SQSRecord[]): SQSEvent { return { Records: records, @@ -185,16 +189,15 @@ function setupDefaultMocks() { } describe("createSupplierAllocatorHandler", () => { - let mockSqsClient: jest.Mocked; - let mockedDeps: jest.Mocked; - let mockedSupplierConfigRepo: jest.Mocked; - let mockedSupplierQuotasRepo: jest.Mocked; - beforeEach(() => { - mockSqsClient = { - send: jest.fn(), - } as unknown as jest.Mocked; - - mockedSupplierConfigRepo = { + const mockedDeps: jest.Mocked = { + logger: { error: jest.fn(), info: jest.fn() } as unknown as pino.Logger, + env: { + SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable", + SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable", + IDEMPOTENCY_TABLE_NAME: "IdempotencyTable", + }, + sqsClient: { send: jest.fn() } as unknown as SQSClient, + supplierConfigRepo: { ddbClient: {} as any, config: {} as any, getLetterVariant: jest.fn(), @@ -203,27 +206,18 @@ describe("createSupplierAllocatorHandler", () => { getSuppliersDetails: jest.fn(), getSupplierPacksForPackSpecification: jest.fn(), getPackSpecification: jest.fn(), - }; - - mockedSupplierQuotasRepo = { + }, + supplierQuotasRepo: { ddbClient: {} as any, config: {} as any, getOverallAllocation: jest.fn(), updateOverallAllocation: jest.fn(), getDailyAllocation: jest.fn(), updateDailyAllocation: jest.fn(), - }; + }, + } as unknown as Deps; - mockedDeps = { - logger: { error: jest.fn(), info: jest.fn() } as unknown as pino.Logger, - env: { - SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable", - SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable", - }, - sqsClient: mockSqsClient, - supplierConfigRepo: mockedSupplierConfigRepo, - supplierQuotasRepo: mockedSupplierQuotasRepo, - }; + beforeEach(() => { jest.clearAllMocks(); }); @@ -244,8 +238,8 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(0); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -281,8 +275,8 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(0); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -315,8 +309,8 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(0); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; const messageBody = JSON.parse(sendCall.input.MessageBody); expect(messageBody.allocationDetails.supplierSpec).toEqual({ supplierId: "supplier1", @@ -361,7 +355,7 @@ describe("createSupplierAllocatorHandler", () => { const handler = createSupplierAllocatorHandler(mockedDeps); await handler(evt, {} as any, {} as any); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; const messageBody = JSON.parse(sendCall.input.MessageBody); expect(messageBody.letterEvent.data.domainId).toBe("letter-test"); }); @@ -410,7 +404,7 @@ describe("createSupplierAllocatorHandler", () => { if (!result) throw new Error("expected BatchResponse, got void"); expect(result.batchItemFailures).toHaveLength(0); - expect(mockSqsClient.send).toHaveBeenCalledTimes(2); + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(2); }); test("returns batch failure for invalid JSON", async () => { @@ -480,7 +474,7 @@ describe("createSupplierAllocatorHandler", () => { process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; const sqsError = new Error("SQS send failed"); - (mockSqsClient.send as jest.Mock).mockRejectedValueOnce(sqsError); + (mockedDeps.sqsClient.send as jest.Mock).mockRejectedValueOnce(sqsError); const handler = createSupplierAllocatorHandler(mockedDeps); const result = await handler(evt, {} as any, {} as any); @@ -513,7 +507,7 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(1); expect(result.batchItemFailures[0].itemIdentifier).toBe("fail-msg"); - expect(mockSqsClient.send).toHaveBeenCalledTimes(2); + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(2); }); test("sends correct queue URL in SQS message command", async () => { @@ -529,7 +523,7 @@ describe("createSupplierAllocatorHandler", () => { const handler = createSupplierAllocatorHandler(mockedDeps); await handler(evt, {} as any, {} as any); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall.input.QueueUrl).toBe(queueUrl); }); @@ -557,8 +551,8 @@ describe("createSupplierAllocatorHandler", () => { variantId: "lv1", }), ); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -667,8 +661,9 @@ describe("createSupplierAllocatorHandler", () => { variantId: "lv1", }), ); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock + .calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -711,8 +706,8 @@ describe("createSupplierAllocatorHandler", () => { variantId: "lv1", }), ); - expect(mockSqsClient.send).toHaveBeenCalledTimes(1); - const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0]; expect(sendCall).toBeInstanceOf(SendMessageCommand); const messageBody = JSON.parse(sendCall.input.MessageBody); @@ -771,4 +766,22 @@ describe("createSupplierAllocatorHandler", () => { expect(result.batchItemFailures).toHaveLength(0); expect(allocationConfig.selectSupplierByFactor).toHaveBeenCalledTimes(2); }); + + test("does not process a message more than once due to idempotency wrapper", async () => { + const preparedEvent = createPreparedV2Event(); + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("msg1", JSON.stringify(preparedEvent)), + ]); + + process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; + + setupDefaultMocks(); + (makeIdempotent as jest.Mock).mockImplementationOnce((_fn) => "supplier1"); + + const handler = createSupplierAllocatorHandler(mockedDeps); + await handler(evt, {} as any, {} as any); + + expect(makeIdempotent).toHaveBeenCalledTimes(1); + expect(mockedDeps.sqsClient.send).not.toHaveBeenCalled(); + }); }); diff --git a/lambdas/supplier-allocator/src/handler/allocate-handler.ts b/lambdas/supplier-allocator/src/handler/allocate-handler.ts index ec3cc106..07de8dc2 100644 --- a/lambdas/supplier-allocator/src/handler/allocate-handler.ts +++ b/lambdas/supplier-allocator/src/handler/allocate-handler.ts @@ -1,4 +1,4 @@ -import { SQSBatchItemFailure, SQSEvent, SQSHandler } from "aws-lambda"; +import { Context, SQSBatchItemFailure, SQSEvent, SQSHandler } from "aws-lambda"; import { SendMessageCommand } from "@aws-sdk/client-sqs"; import { LetterVariant, @@ -13,6 +13,10 @@ import { buildEMFObject, formatGroupId, } from "@internal/helpers"; +import { + IdempotencyConfig, + makeIdempotent, +} from "@aws-lambda-powertools/idempotency"; import { getVariantDetails, getVolumeGroupDetails, @@ -29,6 +33,10 @@ import { import { Deps } from "../config/deps"; import { PreparedEventSchema, PreparedEvents, SupplierDetails } from "./types"; +const idempotencyConfig = new IdempotencyConfig({ + eventKeyJmesPath: "id", +}); + function parseQueueMessage(queueMessage: string): PreparedEvents { const result = PreparedEventSchema.safeParse(queueMessage); @@ -225,20 +233,21 @@ function emitDataMetrics( } function incrementAllocation( - volumeGroupAllocation: VolumeGroupAllocation, + volumeGroupAllocations: VolumeGroupAllocation, volumeGroupId: string, supplierId: string, allocation: number, deps: Deps, ) { - const groupAllocations = volumeGroupAllocation.get(volumeGroupId) ?? {}; + const groupAllocations = volumeGroupAllocations.get(volumeGroupId) ?? {}; groupAllocations[supplierId] = (groupAllocations[supplierId] ?? 0) + allocation; - volumeGroupAllocation.set(volumeGroupId, groupAllocations); + volumeGroupAllocations.set(volumeGroupId, groupAllocations); deps.logger.info({ description: "Updated allocations for volume group and supplier", volumeGroupId, groupAllocations, + setVolumeGroupAllocations: volumeGroupAllocations.get(volumeGroupId), }); } @@ -258,13 +267,120 @@ async function saveAllocations( } } +type SupplierAllocationResult = { + supplier: string; + priority: string; +}; + +async function processSupplierAllocation( + letterEvent: PreparedEvents, + deps: Deps, + perAllocationSuccess: AllocationMetrics, + perAllocationFailure: AllocationMetrics, + volumeGroupAllocations: VolumeGroupAllocation, +): Promise { + deps.logger.info({ + description: "Processing supplier allocation for letter event", + eventId: letterEvent.id, + domainId: letterEvent.data.domainId, + letterVariantId: letterEvent.data.letterVariantId, + existingVolumeGroupAllocations: volumeGroupAllocations, + }); + const supplierDetails: SupplierDetails = await getSupplierFromConfig( + letterEvent, + deps, + ); + deps.logger.info({ + description: "Resolved supplier details from config", + supplierDetails, + }); + const supplierSpec = supplierDetails?.allocationDetails?.supplierSpec; + + const supplier = supplierSpec.supplierId; + const priority = String(supplierSpec.priority); + + if (supplierDetails.allocationDetails.allocationStatus.status === "PENDING") { + incrementMetric(perAllocationSuccess, supplier, priority); + emitDataMetrics(letterEvent, supplier, "extra_data_dimensions", deps); + + incrementAllocation( + volumeGroupAllocations, + supplierDetails.volumeGroupId, + supplier, + 1, + deps, + ); + } else { + incrementMetric(perAllocationFailure, supplier, priority); + } + deps.logger.info({ + description: "Completed supplier allocation logic for letter event", + finalvolumeGroupAllocations: volumeGroupAllocations, + }); + + // Send to allocated letters queue + const queueUrl = process.env.UPSERT_LETTERS_QUEUE_URL; + if (!queueUrl) { + throw new Error("UPSERT_LETTERS_QUEUE_URL not configured"); + } + + const queueMessage = { + letterEvent, + allocationDetails: supplierDetails.allocationDetails, + }; + + deps.logger.info({ + description: "Sending message to upsert letter queue", + msg: queueMessage, + url: queueUrl, + }); + + await deps.sqsClient.send( + new SendMessageCommand({ + QueueUrl: queueUrl, + MessageBody: JSON.stringify(queueMessage), + }), + ); + return { + supplier, + priority, + }; +} + export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { - return async (event: SQSEvent) => { + const getSupplierIdempotently = ( + perAllocationSuccess: AllocationMetrics, + perAllocationFailure: AllocationMetrics, + volumeGroupAllocations: VolumeGroupAllocation, + ) => { + return makeIdempotent( + (letterEvent: PreparedEvents, depsInner: Deps) => + processSupplierAllocation( + letterEvent, + depsInner, + perAllocationSuccess, + perAllocationFailure, + volumeGroupAllocations, + ), + { + persistenceStore: deps.idempotencyLayer, + config: idempotencyConfig, + }, + ); + }; + return async (event: SQSEvent, context: Context) => { const batchItemFailures: SQSBatchItemFailure[] = []; const perAllocationSuccess: AllocationMetrics = new Map(); const perAllocationFailure: AllocationMetrics = new Map(); const volumeGroupAllocations: VolumeGroupAllocation = new Map(); + // create an idempotent function bound to this handler's batchItemFailures + const boundGetSupplierIdempotently = getSupplierIdempotently( + perAllocationSuccess, + perAllocationFailure, + volumeGroupAllocations, + ); + const tasks = event.Records.map(async (record) => { let supplier = "unknown"; let priority = "unknown"; @@ -275,65 +391,35 @@ export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { deps.logger.info({ description: "Extracted letter event", messageId: record.messageId, + domainId: letterEvent.data.domainId, + letterVariantId: letterEvent.data.letterVariantId, }); - const supplierDetails: SupplierDetails = await getSupplierFromConfig( - letterEvent, - deps, - ); + idempotencyConfig.registerLambdaContext(context); + const supplierAllocationResult: SupplierAllocationResult = + await boundGetSupplierIdempotently(letterEvent, deps); deps.logger.info({ - description: "Resolved supplier details from config", - supplierDetails, + description: "Completed supplier allocation for letter event", + eventId: letterEvent.id, + domainId: letterEvent.data.domainId, + letterVariantId: letterEvent.data.letterVariantId, + supplier: supplierAllocationResult.supplier, + priority: supplierAllocationResult.priority, + volumeGroupAllocations, }); - const supplierSpec = supplierDetails?.allocationDetails?.supplierSpec; - - supplier = supplierSpec.supplierId; - priority = String(supplierSpec.priority); - - if ( - supplierDetails.allocationDetails.allocationStatus.status === - "PENDING" - ) { - incrementMetric(perAllocationSuccess, supplier, priority); - - incrementAllocation( - volumeGroupAllocations, - supplierDetails.volumeGroupId, - supplier, - 1, - deps, - ); - } else { - incrementMetric(perAllocationFailure, supplier, priority); - } - - // Send to allocated letters queue - const queueUrl = process.env.UPSERT_LETTERS_QUEUE_URL; - if (!queueUrl) { - throw new Error("UPSERT_LETTERS_QUEUE_URL not configured"); - } - - const queueMessage = { - letterEvent, - allocationDetails: supplierDetails.allocationDetails, - }; - + supplier = supplierAllocationResult.supplier; + priority = supplierAllocationResult.priority; deps.logger.info({ - description: "Sending message to upsert letter queue", - msg: queueMessage, - url: queueUrl, + description: + "Completed processing supplier allocation for letter event", + eventId: letterEvent.id, + domainId: letterEvent.data.domainId, + letterVariantId: letterEvent.data.letterVariantId, + supplier, + priority, + volumeGroupAllocations: volumeGroupAllocations.entries(), }); - - await deps.sqsClient.send( - new SendMessageCommand({ - QueueUrl: queueUrl, - MessageBody: JSON.stringify(queueMessage), - }), - ); - - incrementMetric(perAllocationSuccess, supplier, priority); - emitDataMetrics(letterEvent, supplier, "extra_data_dimensions", deps); } catch (error) { deps.logger.error({ description: "Error processing allocation of record", @@ -346,6 +432,12 @@ export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { } }); + deps.logger.info({ + description: "awaiting completion of supplier allocation tasks for batch", + totalRecords: event.Records.length, + taskCount: tasks.length, + tasks, + }); await Promise.all(tasks); emitMetrics(perAllocationSuccess, MetricStatus.Success, deps); diff --git a/lambdas/supplier-allocator/src/services/supplier-quotas.ts b/lambdas/supplier-allocator/src/services/supplier-quotas.ts index 00a0c1b4..56164fd2 100644 --- a/lambdas/supplier-allocator/src/services/supplier-quotas.ts +++ b/lambdas/supplier-allocator/src/services/supplier-quotas.ts @@ -43,6 +43,12 @@ export async function updateSupplierAllocation( newAllocation: number, deps: Deps, ): Promise { + deps.logger.info({ + description: "Updating supplier allocation", + volumeGroupId, + supplierId, + newAllocation, + }); await deps.supplierQuotasRepo.updateOverallAllocation( volumeGroupId, supplierId, diff --git a/package-lock.json b/package-lock.json index c8fcc257..3baf4e24 100644 --- a/package-lock.json +++ b/package-lock.json @@ -257,6 +257,7 @@ "name": "nhs-notify-supplier-api-allocate-letter", "version": "0.0.1", "dependencies": { + "@aws-lambda-powertools/idempotency": "^2.33.0", "@aws-sdk/client-dynamodb": "^3.858.0", "@aws-sdk/client-sqs": "^3.984.0", "@aws-sdk/lib-dynamodb": "^3.1008.0", @@ -5197,15 +5198,6 @@ "resolved": "internal/events", "link": true }, - "node_modules/@nhsdigital/nhs-notify-event-schemas-supplier-config": { - "version": "1.0.1", - "resolved": "https://npm.pkg.github.com/download/@nhsdigital/nhs-notify-event-schemas-supplier-config/1.0.1/ff1ce566201ae291825acd5e771537229d6aa9ca", - "integrity": "sha512-gIZgfzgvkCfZE+HCosrVJ3tBce2FJRGfwPmtYtZDBG+ox/KvbpJFWXzJ5Jkh/42YzcVn2GxT1fy1L1F6pxiYWA==", - "dependencies": { - "@asyncapi/bundler": "^0.6.4", - "zod": "^4.1.12" - } - }, "node_modules/@nhsdigital/notify-digital-letters-consumer-contracts": { "version": "1.0.1", "resolved": "https://npm.pkg.github.com/download/@nhsdigital/notify-digital-letters-consumer-contracts/1.0.1/a721d9c8b1e01a61de4ecc2b62d3c692e5213bb8",