Skip to content

Commit ac2e1ad

Browse files
[TOOL-28] SQS Batching API (#87)
* refactor batching * rename var * add early exit when no pages
1 parent 351c6c3 commit ac2e1ad

File tree

3 files changed

+45
-57
lines changed

3 files changed

+45
-57
lines changed

apps/extract-stack/src/create-message.ts

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,33 +24,6 @@ type MessageProps<QueueUrl extends string, Shape extends ZodRawShape, MetadataSh
2424
metadataShape: MetadataShape;
2525
};
2626

27-
export function createBatchMessage<QueueUrl extends string, Shape extends ZodRawShape, MetadataShape extends ZodRawShape>({
28-
queueUrl,
29-
contentShape,
30-
metadataShape,
31-
}: MessageProps<QueueUrl, Shape, MetadataShape>) {
32-
33-
const messageSchema = z.object({
34-
content: z.object(contentShape),
35-
metadata: z.object(metadataShape),
36-
});
37-
38-
const send: BatchSend<Shape, MetadataShape> = async (content, metadata) => {
39-
console.log("sending", { content, metadata });
40-
await sqs.sendMessageBatch({
41-
QueueUrl: queueUrl,
42-
Entries: content.map((c) => ({
43-
Id: nanoid(),
44-
MessageBody: JSON.stringify(messageSchema.parse({ content: c, metadata })),
45-
})),
46-
}).promise();
47-
}
48-
49-
return {
50-
send,
51-
}
52-
}
53-
5427
export function createMessage<QueueUrl extends string, Shape extends ZodRawShape, MetadataShape extends ZodRawShape>({
5528
queueUrl,
5629
contentShape,
@@ -70,17 +43,36 @@ export function createMessage<QueueUrl extends string, Shape extends ZodRawShape
7043
}).promise();
7144
}
7245

46+
const sendAll: BatchSend<Shape, MetadataShape> = async (contentArray, metadata) => {
47+
for (let i = 0; i < contentArray.length; i += 10) {
48+
const contentBatch = contentArray.slice(i, i + 10);
49+
const Entries = contentBatch.map(content => JSON.stringify(messageSchema.parse({ content, metadata })))
50+
.map(MessageBody => ({
51+
Id: nanoid(),
52+
MessageBody
53+
}));
54+
console.log("sending batch", Entries);
55+
try {
56+
await sqs.sendMessageBatch({
57+
QueueUrl: queueUrl,
58+
Entries
59+
}).promise();
60+
} catch (error) {
61+
console.error(error);
62+
}
63+
}
64+
65+
}
66+
7367
return {
7468
send,
69+
sendAll
7570
}
7671
}
7772

7873
type Sender<Shape extends ZodRawShape, MetadataShape extends ZodRawShape> = {
7974
send: Send<Shape, MetadataShape>;
80-
}
81-
82-
type BatchSender<Shape extends ZodRawShape, MetadataShape extends ZodRawShape> = {
83-
send: BatchSend<Shape, MetadataShape>
75+
sendAll: BatchSend<Shape, MetadataShape>
8476
}
8577

8678
type MessagePayload<Shape extends ZodRawShape, MetadataShape extends ZodRawShape> = {
@@ -89,7 +81,7 @@ type MessagePayload<Shape extends ZodRawShape, MetadataShape extends ZodRawShape
8981
}
9082

9183
export function QueueHandler<Shape extends ZodRawShape, MetadataShape extends ZodRawShape>(
92-
_sender: Sender<Shape, MetadataShape> | BatchSender<Shape, MetadataShape>,
84+
_sender: Sender<Shape, MetadataShape>,
9385
cb: (
9486
message: MessagePayload<Shape, MetadataShape>
9587
) => Promise<void>

apps/extract-stack/src/extract-member.ts

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import type { Namespace, Repository } from "@acme/extract-schema";
1010
import { GitHubSourceControl, GitlabSourceControl } from "@acme/source-control";
1111
import type { Pagination } from "@acme/source-control";
1212
import { Config } from "sst/node/config";
13-
import { extractMemberPageBatchMessage } from "./messages";
13+
import { extractMemberPageMessage } from "./messages";
1414

1515
import { QueueHandler } from "./create-message";
1616

@@ -61,7 +61,7 @@ const extractMembersPage = async ({ namespace, repository, sourceControl, userId
6161
context.integrations.sourceControl = await initSourceControl(userId, sourceControl);
6262
} catch (error) {
6363
console.error(error);
64-
return;
64+
throw error;
6565
}
6666

6767
const { paginationInfo: resultPaginationInfo } = await getMembers({
@@ -76,11 +76,7 @@ const extractMembersPage = async ({ namespace, repository, sourceControl, userId
7676
return resultPaginationInfo;
7777
};
7878

79-
const range = (a: number, b: number) => Array.apply(0, { length: b - a + 1 } as number[]).map((_, index) => index + a);
80-
const chunks = <T>(array: Array<T>, size: number) => Array.apply(0, { length: Math.ceil(array.length / size) } as unknown[]).map((_, index) => array.slice(index * size, (index + 1) * size));
81-
8279
export const eventHandler = EventHandler(extractRepositoryEvent, async (ev) => {
83-
8480
const pagination = await extractMembersPage({
8581
namespace: ev.properties.namespace,
8682
repository: ev.properties.repository,
@@ -89,32 +85,32 @@ export const eventHandler = EventHandler(extractRepositoryEvent, async (ev) => {
8985
paginationInfo: { page: 1, perPage: 2, totalPages: 1000 },
9086
});
9187

92-
if (!pagination) return;
93-
94-
const remainingMemberPages = range(2, pagination.totalPages)
95-
.map(page => ({
96-
page,
97-
perPage: pagination.perPage,
98-
totalPages: pagination.totalPages
99-
} satisfies Pagination));
100-
101-
const batchedPages = chunks(remainingMemberPages, 10);
102-
103-
await Promise.all(batchedPages.map(batch => extractMemberPageBatchMessage.send(
104-
batch.map(page => ({
88+
const arrayOfExtractMemberPageMessageContent: { repository: Repository, namespace: Namespace | null, pagination: Pagination }[] = [];
89+
for (let i = 2; i <= pagination.totalPages; i++) {
90+
arrayOfExtractMemberPageMessageContent.push({
10591
namespace: ev.properties.namespace,
10692
repository: ev.properties.repository,
107-
pagination: page
108-
})), {
93+
pagination: {
94+
page: i,
95+
perPage: pagination.perPage,
96+
totalPages: pagination.totalPages
97+
}
98+
})
99+
}
100+
101+
if (arrayOfExtractMemberPageMessageContent.length === 0) return console.log("No more pages left, no need to enqueue");
102+
103+
await extractMemberPageMessage.sendAll(arrayOfExtractMemberPageMessageContent, {
109104
version: 1,
110105
caller: 'extract-member',
111106
sourceControl: ev.metadata.sourceControl,
112107
userId: ev.metadata.userId,
113108
timestamp: new Date().getTime(),
114-
})));
109+
})
110+
115111
});
116112

117-
export const queueHandler = QueueHandler(extractMemberPageBatchMessage, async (message) => {
113+
export const queueHandler = QueueHandler(extractMemberPageMessage, async (message) => {
118114
await extractMembersPage({
119115
namespace: message.content.namespace,
120116
paginationInfo: message.content.pagination,

apps/extract-stack/src/messages.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { z } from "zod";
22
import { RepositorySchema } from "@acme/extract-schema";
33
import { NamespaceSchema } from "@acme/extract-schema/src/namespaces";
4-
import { createBatchMessage } from "./create-message";
4+
import { createMessage } from "./create-message";
55
import { Queue } from 'sst/node/queue'
66

77
const paginationSchema = z.object({
@@ -24,7 +24,7 @@ const metadataSchema = z.object({
2424
userId: z.string(),
2525
});
2626

27-
export const extractMemberPageBatchMessage = createBatchMessage({
27+
export const extractMemberPageMessage = createMessage({
2828
metadataShape: metadataSchema.shape,
2929
contentShape: extractMemberPageMessageSchema.shape,
3030
queueUrl: Queue.ExtractMemberPageQueue.queueUrl

0 commit comments

Comments
 (0)