From 8e6a8d35e94b72f1355cc2ec2d5a094e8a230df1 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 28 Aug 2025 17:03:33 +0100 Subject: [PATCH] fix(run-queue): Scan for queues using a duplicate redis client instead of the instance version --- .../run-engine/src/run-queue/index.ts | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index ec12c19f1d..940745bd83 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -909,7 +909,9 @@ export class RunQueue { onError?: (error: Error) => void ): { stream: Readable; redis: Redis } { const pattern = this.keys.currentConcurrencySetKeyScanPattern(); - const stream = this.redis.scanStream({ + const redis = this.redis.duplicate(); + + const stream = redis.scanStream({ match: pattern, count, type: "set", @@ -925,7 +927,7 @@ export class RunQueue { return { stream, - redis: this.redis, + redis, }; } @@ -1938,11 +1940,23 @@ export class RunQueue { ); }); + const [scanError] = await tryCatch(promise); + + if (scanError) { + this.logger.error("Error scanning concurrency sets", { + error: scanError, + }); + } + + await redis.quit(); + return promise; } private async processConcurrencySet(concurrencyKey: string) { - const stream = this.redis.sscanStream(concurrencyKey, { + const redis = this.redis.duplicate(); + + const stream = redis.sscanStream(concurrencyKey, { count: 100, }); @@ -1991,6 +2005,16 @@ export class RunQueue { stream.resume(); }); + const [scanError] = await tryCatch(promise); + + if (scanError) { + this.logger.error("Error scanning concurrency sets", { + error: scanError, + }); + } + + await redis.quit(); + return promise; }