diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index ec12c19f1d..940745bd83 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -909,7 +909,9 @@ export class RunQueue { onError?: (error: Error) => void ): { stream: Readable; redis: Redis } { const pattern = this.keys.currentConcurrencySetKeyScanPattern(); - const stream = this.redis.scanStream({ + const redis = this.redis.duplicate(); + + const stream = redis.scanStream({ match: pattern, count, type: "set", @@ -925,7 +927,7 @@ export class RunQueue { return { stream, - redis: this.redis, + redis, }; } @@ -1938,11 +1940,23 @@ export class RunQueue { ); }); + const [scanError] = await tryCatch(promise); + + if (scanError) { + this.logger.error("Error scanning concurrency sets", { + error: scanError, + }); + } + + await redis.quit(); + return promise; } private async processConcurrencySet(concurrencyKey: string) { - const stream = this.redis.sscanStream(concurrencyKey, { + const redis = this.redis.duplicate(); + + const stream = redis.sscanStream(concurrencyKey, { count: 100, }); @@ -1991,6 +2005,16 @@ export class RunQueue { stream.resume(); }); + const [scanError] = await tryCatch(promise); + + if (scanError) { + this.logger.error("Error scanning concurrency sets", { + error: scanError, + }); + } + + await redis.quit(); + return promise; }