diff --git a/lambdas/api-handler/src/services/__tests__/letter-operations.test.ts b/lambdas/api-handler/src/services/__tests__/letter-operations.test.ts index 550fbeca..5ab0323f 100644 --- a/lambdas/api-handler/src/services/__tests__/letter-operations.test.ts +++ b/lambdas/api-handler/src/services/__tests__/letter-operations.test.ts @@ -189,28 +189,19 @@ describe("getLetterDataUrl function", () => { }); }); +function makeLetterDto(n: number): LetterDto { + return { + id: `letter${n}`, + status: "PENDING", + supplierId: `testSupplier}`, + }; +} + describe("enqueueLetterUpdateRequests function", () => { beforeEach(() => { jest.clearAllMocks(); }); - const lettersToUpdate: LetterDto[] = [ - { - id: "id1", - status: "REJECTED", - supplierId: "s1", - specificationId: "spec1", - groupId: "g1", - reasonCode: "123", - reasonText: "Reason text", - }, - { - id: "id2", - status: "ACCEPTED", - supplierId: "s1", - }, - ]; - it("should update the letter status successfully", async () => { const sqsClient = { send: jest.fn() } as unknown as SQSClient; const logger = { error: jest.fn() } as unknown as pino.Logger; @@ -219,6 +210,13 @@ describe("enqueueLetterUpdateRequests function", () => { }; const deps: Deps = { sqsClient, logger, env } as Deps; + const lettersToUpdate = Array.from({ length: 25 }, (_, i) => + makeLetterDto(i), + ); + + const sqsClientSendMock = sqsClient.send as jest.Mock; + sqsClientSendMock.mockResolvedValue({ Failed: [] }); + const result = await enqueueLetterUpdateRequests( lettersToUpdate, "correlationId1", @@ -227,58 +225,44 @@ describe("enqueueLetterUpdateRequests function", () => { expect(result).toBeUndefined(); - expect(deps.sqsClient.send).toHaveBeenNthCalledWith( - 1, - expect.objectContaining({ - input: { - QueueUrl: deps.env.QUEUE_URL, - MessageAttributes: { - CorrelationId: { - DataType: "String", - StringValue: "correlationId1", - }, - }, - MessageBody: JSON.stringify({ - id: lettersToUpdate[0].id, - status: lettersToUpdate[0].status, - supplierId: lettersToUpdate[0].supplierId, - specificationId: lettersToUpdate[0].specificationId, - groupId: lettersToUpdate[0].groupId, - reasonCode: lettersToUpdate[0].reasonCode, - reasonText: lettersToUpdate[0].reasonText, - }), - }, - }), - ); + // processes 10 at a time (25 -> 10+10+5) + expect(sqsClientSendMock).toHaveBeenCalledTimes(3); - expect(deps.sqsClient.send).toHaveBeenNthCalledWith( - 2, - expect.objectContaining({ - input: { - QueueUrl: deps.env.QUEUE_URL, - MessageAttributes: { - CorrelationId: { - DataType: "String", - StringValue: "correlationId1", - }, - }, - MessageBody: JSON.stringify({ - id: lettersToUpdate[1].id, - status: lettersToUpdate[1].status, - supplierId: lettersToUpdate[1].supplierId, - }), - }, - }), - ); + const firstCallArg = sqsClientSendMock.mock.calls[0][0]; + const firstInput = firstCallArg.input; + + expect(firstInput.QueueUrl).toBe(deps.env.QUEUE_URL); + expect(Array.isArray(firstInput.Entries)).toBe(true); + expect(firstInput.Entries.length).toBe(10); + + expect(firstInput.Entries[0].Id).toBe("0-0"); + expect(firstInput.Entries[9].Id).toBe("0-9"); + + expect( + firstInput.Entries[0].MessageAttributes.CorrelationId.StringValue, + ).toBe("correlationId1"); + + const parsed = JSON.parse(firstInput.Entries[0].MessageBody); + expect(parsed.id).toBe("letter0"); + + // check last batch had 5 entries + const thirdCallArg = sqsClientSendMock.mock.calls[2][0]; + const thirdInput = thirdCallArg.input; + expect(thirdInput.Entries.length).toBe(5); + // ids in third batch should start "2-0" + expect(thirdInput.Entries[0].Id).toBe("2-0"); }); - it("should log error if enqueueing fails", async () => { - const mockError = new Error("error"); + it("should log error when SendMessageBatch returns Failed entries", async () => { const sqsClient = { send: jest .fn() - .mockRejectedValueOnce(mockError) - .mockResolvedValueOnce({ MessageId: "m1" }), + .mockResolvedValueOnce({ Failed: [] }) // first batch succeeds + .mockResolvedValueOnce({ + Failed: [ + { Id: "1-1", SenderFault: false, Code: "Err", Message: "failed" }, + ], + }), } as unknown as SQSClient; const logger = { error: jest.fn() } as unknown as pino.Logger; const env = { @@ -286,6 +270,10 @@ describe("enqueueLetterUpdateRequests function", () => { }; const deps: Deps = { sqsClient, logger, env } as Deps; + const lettersToUpdate = Array.from({ length: 12 }, (_, i) => + makeLetterDto(i), + ); + const result = await enqueueLetterUpdateRequests( lettersToUpdate, "correlationId1", @@ -294,36 +282,44 @@ describe("enqueueLetterUpdateRequests function", () => { expect(result).toBeUndefined(); - expect(deps.sqsClient.send).toHaveBeenNthCalledWith( - 2, - expect.objectContaining({ - input: { - QueueUrl: deps.env.QUEUE_URL, - MessageAttributes: { - CorrelationId: { - DataType: "String", - StringValue: "correlationId1", - }, - }, - MessageBody: JSON.stringify({ - id: lettersToUpdate[1].id, - status: lettersToUpdate[1].status, - supplierId: lettersToUpdate[1].supplierId, - }), - }, - }), - ); + // 12 = 10 + 2 + expect(deps.sqsClient.send).toHaveBeenCalledTimes(2); expect(deps.logger.error).toHaveBeenCalledTimes(1); - expect(deps.logger.error).toHaveBeenCalledWith( - { - err: mockError, - correlationId: "correlationId1", - letterId: lettersToUpdate[0].id, - letterStatus: lettersToUpdate[0].status, - supplierId: lettersToUpdate[0].supplierId, - }, - "Error enqueuing letter status update", + const errorArgs = (deps.logger.error as jest.Mock).mock.calls[0][0]; + expect(errorArgs.failed).toBeDefined(); + expect(Array.isArray(errorArgs.failed)).toBe(true); + expect(errorArgs.failed[0].Id).toBe("1-1"); + }); + + it("should log error if enqueueing fails", async () => { + const sqsClient = { + send: jest + .fn() + .mockResolvedValueOnce({ Failed: [] }) // batch 0 + .mockImplementationOnce(() => { + throw new Error("some failure"); + }) // batch 1 + .mockResolvedValueOnce({ Failed: [] }), // batch 2 + } as unknown as SQSClient; + const logger = { error: jest.fn() } as unknown as pino.Logger; + const env = { + QUEUE_URL: "sqsUrl", + }; + const deps: Deps = { sqsClient, logger, env } as Deps; + + const lettersToUpdate = Array.from({ length: 21 }, (_, i) => + makeLetterDto(i), ); + + await enqueueLetterUpdateRequests(lettersToUpdate, "correlationId1", deps); + + // all 3 attempted + expect(deps.sqsClient.send).toHaveBeenCalledTimes(3); + + expect(deps.logger.error).toHaveBeenCalledTimes(1); + const logged = (deps.logger.error as jest.Mock).mock.calls[0][0]; + expect(logged.correlationId).toBe("correlationId1"); + expect(logged.err).toBeInstanceOf(Error); }); }); diff --git a/lambdas/api-handler/src/services/letter-operations.ts b/lambdas/api-handler/src/services/letter-operations.ts index c94e76b5..486d2a5e 100644 --- a/lambdas/api-handler/src/services/letter-operations.ts +++ b/lambdas/api-handler/src/services/letter-operations.ts @@ -1,7 +1,7 @@ import { LetterBase, LetterRepository } from "@internal/datastore"; import { getSignedUrl } from "@aws-sdk/s3-request-presigner"; import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3"; -import { SendMessageCommand } from "@aws-sdk/client-sqs"; +import { SendMessageBatchCommand } from "@aws-sdk/client-sqs"; import NotFoundError from "../errors/not-found-error"; import { LetterDto } from "../contracts/letters"; import { ApiErrorDetail } from "../contracts/errors"; @@ -81,34 +81,60 @@ export const getLetterDataUrl = async ( } }; +function chunk(arr: LetterDto[], size: number): LetterDto[][] { + const out: LetterDto[][] = []; + for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size)); + return out; +} + export async function enqueueLetterUpdateRequests( updateRequests: LetterDto[], correlationId: string, deps: Deps, ) { - const tasks = updateRequests.map(async (request: LetterDto) => { - try { - const command = new SendMessageCommand({ - QueueUrl: deps.env.QUEUE_URL, - MessageAttributes: { - CorrelationId: { DataType: "String", StringValue: correlationId }, - }, - MessageBody: JSON.stringify(request), - }); - await deps.sqsClient.send(command); - } catch (error) { - deps.logger.error( - { - err: error, - correlationId, - letterId: request.id, - letterStatus: request.status, - supplierId: request.supplierId, - }, - "Error enqueuing letter status update", - ); - } - }); + const BATCH_SIZE = 10; // SQS SendMessageBatch max + const CONCURRENCY = 5; // number of parallel batch API calls + + const batches = chunk(updateRequests, BATCH_SIZE); - await Promise.all(tasks); + // send batches in groups with limited concurrency + // BATCH_SIZE * CONCURRENCY is the number of total updates / db calls in-flight + for (let i = 0; i < batches.length; i += CONCURRENCY) { + const window = batches.slice(i, i + CONCURRENCY); + + await Promise.all( + window.map(async (batch, batchIdx) => { + const entries = batch.map((request, idx) => ({ + Id: `${i + batchIdx}-${idx}`, // unique per batch entry + MessageBody: JSON.stringify(request), + MessageAttributes: { + CorrelationId: { DataType: "String", StringValue: correlationId }, + }, + })); + + const cmd = new SendMessageBatchCommand({ + QueueUrl: deps.env.QUEUE_URL, + Entries: entries, + }); + + try { + const result = await deps.sqsClient.send(cmd); + if (result.Failed && result.Failed.length > 0) { + deps.logger.error( + { failed: result.Failed }, + "Some batch entries failed", + ); + } + } catch (error) { + deps.logger.error( + { + err: error, + correlationId, + }, + "Error enqueuing letter status updates", + ); + } + }), + ); + } }