Skip to content

Commit e379fc1

Browse files
authored
Fix redis worker debounce (#1966)
* Added some Redis worker debounce tests (one failing that reproduces a bug) * Added some tests for acking * Added a deduplicationKey to prevent acking when items are queued * The worker passes the deduplicationKey back in for acking * Improved logs and removed events from test
1 parent c0172d4 commit e379fc1

File tree

3 files changed

+345
-11
lines changed

3 files changed

+345
-11
lines changed

packages/redis-worker/src/queue.ts

+43-8
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export type QueueItem<TMessageCatalog extends MessageCatalogSchema> = {
2727
visibilityTimeoutMs: number;
2828
attempt: number;
2929
timestamp: Date;
30+
deduplicationKey?: string;
3031
};
3132

3233
export type AnyQueueItem = {
@@ -36,6 +37,7 @@ export type AnyQueueItem = {
3637
visibilityTimeoutMs: number;
3738
attempt: number;
3839
timestamp: Date;
40+
deduplicationKey?: string;
3941
};
4042

4143
export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
@@ -98,11 +100,13 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
98100
}): Promise<void> {
99101
try {
100102
const score = availableAt ? availableAt.getTime() : Date.now();
103+
const deduplicationKey = nanoid();
101104
const serializedItem = JSON.stringify({
102105
job,
103106
item,
104107
visibilityTimeoutMs,
105108
attempt,
109+
deduplicationKey,
106110
});
107111

108112
const result = await this.redis.enqueueItem(
@@ -136,7 +140,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
136140
return [];
137141
}
138142

139-
const dequeuedItems = [];
143+
const dequeuedItems: Array<QueueItem<TMessageCatalog>> = [];
140144

141145
for (const [id, serializedItem, score] of results) {
142146
const parsedItem = JSON.parse(serializedItem) as any;
@@ -186,6 +190,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
186190
visibilityTimeoutMs,
187191
attempt: parsedItem.attempt ?? 0,
188192
timestamp,
193+
deduplicationKey: parsedItem.deduplicationKey,
189194
});
190195
}
191196

@@ -200,14 +205,26 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
200205
}
201206
}
202207

203-
async ack(id: string): Promise<void> {
208+
async ack(id: string, deduplicationKey?: string): Promise<void> {
204209
try {
205-
await this.redis.ackItem(`queue`, `items`, id);
210+
const result = await this.redis.ackItem(`queue`, `items`, id, deduplicationKey ?? "");
211+
if (result !== 1) {
212+
this.logger.debug(
213+
`SimpleQueue ${this.name}.ack(): ack operation returned ${result}. This means it was not removed from the queue.`,
214+
{
215+
queue: this.name,
216+
id,
217+
deduplicationKey,
218+
result,
219+
}
220+
);
221+
}
206222
} catch (e) {
207223
this.logger.error(`SimpleQueue ${this.name}.ack(): error acknowledging item`, {
208224
queue: this.name,
209225
error: e,
210226
id,
227+
deduplicationKey,
211228
});
212229
throw e;
213230
}
@@ -367,15 +384,32 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
367384
this.redis.defineCommand("ackItem", {
368385
numberOfKeys: 2,
369386
lua: `
370-
local queue = KEYS[1]
371-
local items = KEYS[2]
387+
local queueKey = KEYS[1]
388+
local itemsKey = KEYS[2]
372389
local id = ARGV[1]
390+
local deduplicationKey = ARGV[2]
373391
374-
redis.call('ZREM', queue, id)
375-
redis.call('HDEL', items, id)
392+
-- Get the item from the hash
393+
local item = redis.call('HGET', itemsKey, id)
394+
if not item then
395+
return -1
396+
end
376397
398+
-- Only check deduplicationKey if a non-empty one was passed in
399+
if deduplicationKey and deduplicationKey ~= "" then
400+
local success, parsed = pcall(cjson.decode, item)
401+
if success then
402+
if parsed.deduplicationKey and parsed.deduplicationKey ~= deduplicationKey then
403+
return 0
404+
end
405+
end
406+
end
407+
408+
-- Remove from sorted set and hash
409+
redis.call('ZREM', queueKey, id)
410+
redis.call('HDEL', itemsKey, id)
377411
return 1
378-
`,
412+
`,
379413
});
380414

381415
this.redis.defineCommand("moveToDeadLetterQueue", {
@@ -468,6 +502,7 @@ declare module "@internal/redis" {
468502
queue: string,
469503
items: string,
470504
id: string,
505+
deduplicationKey: string,
471506
callback?: Callback<number>
472507
): Result<number, Context>;
473508

0 commit comments

Comments
 (0)