Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 87 additions & 91 deletions lambdas/api-handler/src/services/__tests__/letter-operations.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,28 +189,19 @@ describe("getLetterDataUrl function", () => {
});
});

function makeLetterDto(n: number): LetterDto {
return {
id: `letter${n}`,
status: "PENDING",
supplierId: `testSupplier}`,
};
}

describe("enqueueLetterUpdateRequests function", () => {
beforeEach(() => {
jest.clearAllMocks();
});

const lettersToUpdate: LetterDto[] = [
{
id: "id1",
status: "REJECTED",
supplierId: "s1",
specificationId: "spec1",
groupId: "g1",
reasonCode: "123",
reasonText: "Reason text",
},
{
id: "id2",
status: "ACCEPTED",
supplierId: "s1",
},
];

it("should update the letter status successfully", async () => {
const sqsClient = { send: jest.fn() } as unknown as SQSClient;
const logger = { error: jest.fn() } as unknown as pino.Logger;
Expand All @@ -219,6 +210,13 @@ describe("enqueueLetterUpdateRequests function", () => {
};
const deps: Deps = { sqsClient, logger, env } as Deps;

const lettersToUpdate = Array.from({ length: 25 }, (_, i) =>
makeLetterDto(i),
);

const sqsClientSendMock = sqsClient.send as jest.Mock;
sqsClientSendMock.mockResolvedValue({ Failed: [] });

const result = await enqueueLetterUpdateRequests(
lettersToUpdate,
"correlationId1",
Expand All @@ -227,65 +225,55 @@ describe("enqueueLetterUpdateRequests function", () => {

expect(result).toBeUndefined();

expect(deps.sqsClient.send).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
input: {
QueueUrl: deps.env.QUEUE_URL,
MessageAttributes: {
CorrelationId: {
DataType: "String",
StringValue: "correlationId1",
},
},
MessageBody: JSON.stringify({
id: lettersToUpdate[0].id,
status: lettersToUpdate[0].status,
supplierId: lettersToUpdate[0].supplierId,
specificationId: lettersToUpdate[0].specificationId,
groupId: lettersToUpdate[0].groupId,
reasonCode: lettersToUpdate[0].reasonCode,
reasonText: lettersToUpdate[0].reasonText,
}),
},
}),
);
// processes 10 at a time (25 -> 10+10+5)
expect(sqsClientSendMock).toHaveBeenCalledTimes(3);

expect(deps.sqsClient.send).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
input: {
QueueUrl: deps.env.QUEUE_URL,
MessageAttributes: {
CorrelationId: {
DataType: "String",
StringValue: "correlationId1",
},
},
MessageBody: JSON.stringify({
id: lettersToUpdate[1].id,
status: lettersToUpdate[1].status,
supplierId: lettersToUpdate[1].supplierId,
}),
},
}),
);
const firstCallArg = sqsClientSendMock.mock.calls[0][0];
const firstInput = firstCallArg.input;

expect(firstInput.QueueUrl).toBe(deps.env.QUEUE_URL);
expect(Array.isArray(firstInput.Entries)).toBe(true);
expect(firstInput.Entries.length).toBe(10);

expect(firstInput.Entries[0].Id).toBe("0-0");
expect(firstInput.Entries[9].Id).toBe("0-9");

expect(
firstInput.Entries[0].MessageAttributes.CorrelationId.StringValue,
).toBe("correlationId1");

const parsed = JSON.parse(firstInput.Entries[0].MessageBody);
expect(parsed.id).toBe("letter0");

// check last batch had 5 entries
const thirdCallArg = sqsClientSendMock.mock.calls[2][0];
const thirdInput = thirdCallArg.input;
expect(thirdInput.Entries.length).toBe(5);
// ids in third batch should start "2-0"
expect(thirdInput.Entries[0].Id).toBe("2-0");
});

it("should log error if enqueueing fails", async () => {
const mockError = new Error("error");
it("should log error when SendMessageBatch returns Failed entries", async () => {
const sqsClient = {
send: jest
.fn()
.mockRejectedValueOnce(mockError)
.mockResolvedValueOnce({ MessageId: "m1" }),
.mockResolvedValueOnce({ Failed: [] }) // first batch succeeds
.mockResolvedValueOnce({
Failed: [
{ Id: "1-1", SenderFault: false, Code: "Err", Message: "failed" },
],
}),
} as unknown as SQSClient;
const logger = { error: jest.fn() } as unknown as pino.Logger;
const env = {
QUEUE_URL: "sqsUrl",
};
const deps: Deps = { sqsClient, logger, env } as Deps;

const lettersToUpdate = Array.from({ length: 12 }, (_, i) =>
makeLetterDto(i),
);

const result = await enqueueLetterUpdateRequests(
lettersToUpdate,
"correlationId1",
Expand All @@ -294,36 +282,44 @@ describe("enqueueLetterUpdateRequests function", () => {

expect(result).toBeUndefined();

expect(deps.sqsClient.send).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
input: {
QueueUrl: deps.env.QUEUE_URL,
MessageAttributes: {
CorrelationId: {
DataType: "String",
StringValue: "correlationId1",
},
},
MessageBody: JSON.stringify({
id: lettersToUpdate[1].id,
status: lettersToUpdate[1].status,
supplierId: lettersToUpdate[1].supplierId,
}),
},
}),
);
// 12 = 10 + 2
expect(deps.sqsClient.send).toHaveBeenCalledTimes(2);

expect(deps.logger.error).toHaveBeenCalledTimes(1);
expect(deps.logger.error).toHaveBeenCalledWith(
{
err: mockError,
correlationId: "correlationId1",
letterId: lettersToUpdate[0].id,
letterStatus: lettersToUpdate[0].status,
supplierId: lettersToUpdate[0].supplierId,
},
"Error enqueuing letter status update",
const errorArgs = (deps.logger.error as jest.Mock).mock.calls[0][0];
expect(errorArgs.failed).toBeDefined();
expect(Array.isArray(errorArgs.failed)).toBe(true);
expect(errorArgs.failed[0].Id).toBe("1-1");
});

it("should log error if enqueueing fails", async () => {
const sqsClient = {
send: jest
.fn()
.mockResolvedValueOnce({ Failed: [] }) // batch 0
.mockImplementationOnce(() => {
throw new Error("some failure");
}) // batch 1
.mockResolvedValueOnce({ Failed: [] }), // batch 2
} as unknown as SQSClient;
const logger = { error: jest.fn() } as unknown as pino.Logger;
const env = {
QUEUE_URL: "sqsUrl",
};
const deps: Deps = { sqsClient, logger, env } as Deps;

const lettersToUpdate = Array.from({ length: 21 }, (_, i) =>
makeLetterDto(i),
);

await enqueueLetterUpdateRequests(lettersToUpdate, "correlationId1", deps);

// all 3 attempted
expect(deps.sqsClient.send).toHaveBeenCalledTimes(3);

expect(deps.logger.error).toHaveBeenCalledTimes(1);
const logged = (deps.logger.error as jest.Mock).mock.calls[0][0];
expect(logged.correlationId).toBe("correlationId1");
expect(logged.err).toBeInstanceOf(Error);
});
});
76 changes: 51 additions & 25 deletions lambdas/api-handler/src/services/letter-operations.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { LetterBase, LetterRepository } from "@internal/datastore";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3";
import { SendMessageCommand } from "@aws-sdk/client-sqs";
import { SendMessageBatchCommand } from "@aws-sdk/client-sqs";
import NotFoundError from "../errors/not-found-error";
import { LetterDto } from "../contracts/letters";
import { ApiErrorDetail } from "../contracts/errors";
Expand Down Expand Up @@ -81,34 +81,60 @@ export const getLetterDataUrl = async (
}
};

function chunk(arr: LetterDto[], size: number): LetterDto[][] {
const out: LetterDto[][] = [];
for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size));
return out;
}

export async function enqueueLetterUpdateRequests(
updateRequests: LetterDto[],
correlationId: string,
deps: Deps,
) {
const tasks = updateRequests.map(async (request: LetterDto) => {
try {
const command = new SendMessageCommand({
QueueUrl: deps.env.QUEUE_URL,
MessageAttributes: {
CorrelationId: { DataType: "String", StringValue: correlationId },
},
MessageBody: JSON.stringify(request),
});
await deps.sqsClient.send(command);
} catch (error) {
deps.logger.error(
{
err: error,
correlationId,
letterId: request.id,
letterStatus: request.status,
supplierId: request.supplierId,
},
"Error enqueuing letter status update",
);
}
});
const BATCH_SIZE = 10; // SQS SendMessageBatch max
const CONCURRENCY = 5; // number of parallel batch API calls

const batches = chunk(updateRequests, BATCH_SIZE);

await Promise.all(tasks);
// send batches in groups with limited concurrency
// BATCH_SIZE * CONCURRENCY is the number of total updates / db calls in-flight
for (let i = 0; i < batches.length; i += CONCURRENCY) {
const window = batches.slice(i, i + CONCURRENCY);

await Promise.all(
window.map(async (batch, batchIdx) => {
const entries = batch.map((request, idx) => ({
Id: `${i + batchIdx}-${idx}`, // unique per batch entry
MessageBody: JSON.stringify(request),
MessageAttributes: {
CorrelationId: { DataType: "String", StringValue: correlationId },
},
}));

const cmd = new SendMessageBatchCommand({
QueueUrl: deps.env.QUEUE_URL,
Entries: entries,
});

try {
const result = await deps.sqsClient.send(cmd);
if (result.Failed && result.Failed.length > 0) {
deps.logger.error(
{ failed: result.Failed },
"Some batch entries failed",
);
}
} catch (error) {
deps.logger.error(
{
err: error,
correlationId,
},
"Error enqueuing letter status updates",
);
}
}),
);
}
}
Loading