Skip to content
584 changes: 476 additions & 108 deletions packages/redis-worker/src/fair-queue/index.ts

Large diffs are not rendered by default.

16 changes: 15 additions & 1 deletion packages/redis-worker/src/fair-queue/keyProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import type { FairQueueKeyProducer } from "./types.js";
* Uses a configurable prefix and standard key structure.
*
* Key structure:
* - Master queue: {prefix}:master:{shardId}
* - Master queue: {prefix}:master:{shardId} (legacy, drain-only)
* - Dispatch index: {prefix}:dispatch:{shardId} (Level 1: tenantIds)
* - Tenant queue index: {prefix}:tenantq:{tenantId} (Level 2: queueIds)
* - Queue: {prefix}:queue:{queueId}
* - Queue items: {prefix}:queue:{queueId}:items
* - Concurrency: {prefix}:concurrency:{groupName}:{groupId}
Expand Down Expand Up @@ -70,6 +72,18 @@ export class DefaultFairQueueKeyProducer implements FairQueueKeyProducer {
return this.#buildKey("worker", consumerId);
}

// ============================================================================
// Tenant Dispatch Keys (Two-Level Index)
// ============================================================================

/**
 * Key for the Level 1 dispatch index of a shard.
 * Shape: {prefix}:dispatch:{shardId}
 */
dispatchKey(shardId: number): string {
  const shardToken = String(shardId);
  return this.#buildKey("dispatch", shardToken);
}

/**
 * Key for the Level 2 per-tenant queue index.
 * Shape: {prefix}:tenantq:{tenantId}
 */
tenantQueueIndexKey(tenantId: string): string {
  const segment = "tenantq";
  return this.#buildKey(segment, tenantId);
}

// ============================================================================
// Dead Letter Queue Keys
// ============================================================================
Expand Down
94 changes: 94 additions & 0 deletions packages/redis-worker/src/fair-queue/schedulers/drr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { createRedisClient, type Redis, type RedisOptions } from "@internal/redi
import { BaseScheduler } from "../scheduler.js";
import type {
DRRSchedulerConfig,
DispatchSchedulerContext,
FairQueueKeyProducer,
SchedulerContext,
TenantQueues,
Expand Down Expand Up @@ -132,6 +133,70 @@ export class DRRScheduler extends BaseScheduler {
}));
}

/**
 * Select queues using the two-level tenant dispatch index.
 *
 * Algorithm:
 * 1. ZRANGEBYSCORE on dispatch index (gets only tenants with queues - much smaller)
 * 2. Add quantum to each tenant's deficit (atomically)
 * 3. Check capacity as safety net (dispatch should only have tenants with capacity)
 * 4. Select tenants with deficit >= 1, sorted by deficit (highest first)
 * 5. For each tenant, fetch their queues from Level 2 index
 */
async selectQueuesFromDispatch(
  dispatchShardKey: string,
  consumerId: string,
  context: DispatchSchedulerContext
): Promise<TenantQueues[]> {
  // Level 1: read the per-shard dispatch index (tenantIds, not queueIds).
  const activeTenants = await this.#getTenantsFromDispatch(dispatchShardKey);
  if (activeTenants.length === 0) {
    return [];
  }

  const tenantIds = activeTenants.map((tenant) => tenant.tenantId);

  // Single Lua round-trip: credit every active tenant with its quantum.
  const deficits = await this.#addQuantumToTenants(tenantIds);

  // Pair each tenant with its updated deficit, keep only those that have
  // earned at least one message, and order by deficit descending.
  const eligible = tenantIds
    .flatMap((tenantId, i) => {
      const deficit = deficits[i] ?? 0;
      return deficit >= 1 ? [{ tenantId, deficit }] : [];
    })
    .sort((a, b) => b.deficit - a.deficit);

  // Pick the first tenant with available capacity and fetch their queues.
  // This keeps the scheduler cheap: O(1) in the common case where the
  // highest-deficit tenant has capacity. The consumer loop iterates fast
  // (1ms yield between rounds) so we cycle through tenants quickly.
  for (const candidate of eligible) {
    if (await context.isAtCapacity("tenant", candidate.tenantId)) {
      continue;
    }

    // Limit queues fetched to what the tenant can actually process this round.
    // deficit = max messages this tenant should process, so no point fetching
    // more queues than that (each queue yields at least 1 message).
    const queueLimit = Math.ceil(candidate.deficit);
    const queues = await context.getQueuesForTenant(candidate.tenantId, queueLimit);
    if (queues.length === 0) {
      continue;
    }

    this.logger.debug("DRR dispatch: selected tenant", {
      dispatchTenants: activeTenants.length,
      candidates: eligible.length,
      selectedTenant: candidate.tenantId,
      deficit: candidate.deficit,
      queueLimit,
      queuesReturned: queues.length,
    });

    return [{ tenantId: candidate.tenantId, queues: queues.map((queue) => queue.queueId) }];
  }

  return [];
}

/**
* Record that a message was processed from a tenant.
* Decrements the tenant's deficit.
Expand Down Expand Up @@ -200,6 +265,35 @@ export class DRRScheduler extends BaseScheduler {
return `${this.keys.masterQueueKey(0).split(":")[0]}:drr:deficit`;
}

/**
 * Read up to `masterQueueLimit` tenant entries from a dispatch shard ZSET,
 * including only members whose score is not in the future.
 */
async #getTenantsFromDispatch(
  dispatchKey: string
): Promise<Array<{ tenantId: string; score: number }>> {
  const flat = await this.redis.zrangebyscore(
    dispatchKey,
    "-inf",
    Date.now(),
    "WITHSCORES",
    "LIMIT",
    0,
    this.masterQueueLimit
  );

  // ioredis returns a flat [member, score, member, score, ...] array;
  // walk it two entries at a time, skipping any malformed pair.
  const tenants: Array<{ tenantId: string; score: number }> = [];
  for (let i = 0; i + 1 < flat.length; i += 2) {
    const tenantId = flat[i];
    const rawScore = flat[i + 1];
    if (tenantId && rawScore) {
      tenants.push({ tenantId, score: parseFloat(rawScore) });
    }
  }

  return tenants;
}

async #getQueuesFromShard(shardKey: string): Promise<QueueWithScore[]> {
const now = Date.now();
const results = await this.redis.zrangebyscore(
Expand Down
26 changes: 24 additions & 2 deletions packages/redis-worker/src/fair-queue/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export interface FairQueueMetrics {
// Observable gauges (registered with callbacks)
queueLength: ObservableGauge;
masterQueueLength: ObservableGauge;
dispatchLength: ObservableGauge;
inflightCount: ObservableGauge;
dlqLength: ObservableGauge;
}
Expand Down Expand Up @@ -250,6 +251,7 @@ export class FairQueueTelemetry {
registerGaugeCallbacks(callbacks: {
getQueueLength?: (queueId: string) => Promise<number>;
getMasterQueueLength?: (shardId: number) => Promise<number>;
getDispatchLength?: (shardId: number) => Promise<number>;
getInflightCount?: (shardId: number) => Promise<number>;
getDLQLength?: (tenantId: string) => Promise<number>;
shardCount?: number;
Expand All @@ -273,7 +275,7 @@ export class FairQueueTelemetry {
});
}

// Master queue length gauge
// Legacy master queue length gauge (draining, should trend to 0)
if (callbacks.getMasterQueueLength && callbacks.shardCount) {
const getMasterQueueLength = callbacks.getMasterQueueLength;
const shardCount = callbacks.shardCount;
Expand All @@ -288,6 +290,21 @@ export class FairQueueTelemetry {
});
}

// Dispatch index length gauge (new two-level dispatch, tenant count per shard)
if (callbacks.getDispatchLength && callbacks.shardCount) {
const getDispatchLength = callbacks.getDispatchLength;
const shardCount = callbacks.shardCount;

this.metrics.dispatchLength.addCallback(async (observableResult) => {
for (let shardId = 0; shardId < shardCount; shardId++) {
const length = await getDispatchLength(shardId);
observableResult.observe(length, {
[FairQueueAttributes.SHARD_ID]: shardId.toString(),
});
}
});
}

// Inflight count gauge
if (callbacks.getInflightCount && callbacks.shardCount) {
const getInflightCount = callbacks.getInflightCount;
Expand Down Expand Up @@ -317,6 +334,7 @@ export class FairQueueTelemetry {
}
});
}

}

// ============================================================================
Expand Down Expand Up @@ -414,9 +432,13 @@ export class FairQueueTelemetry {
unit: "messages",
}),
masterQueueLength: this.meter.createObservableGauge(`${this.name}.master_queue.length`, {
description: "Number of queues in master queue shard",
description: "Number of queues in legacy master queue shard (draining)",
unit: "queues",
}),
dispatchLength: this.meter.createObservableGauge(`${this.name}.dispatch.length`, {
description: "Number of tenants in dispatch index shard",
unit: "tenants",
}),
inflightCount: this.meter.createObservableGauge(`${this.name}.inflight.count`, {
description: "Number of messages currently being processed",
unit: "messages",
Expand Down
182 changes: 182 additions & 0 deletions packages/redis-worker/src/fair-queue/tenantDispatch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import { createRedisClient, type Redis, type RedisOptions } from "@internal/redis";
import { jumpHash } from "@trigger.dev/core/v3/serverOnly";
import type { FairQueueKeyProducer, QueueWithScore } from "./types.js";

/** Construction options for {@link TenantDispatch}. */
export interface TenantDispatchOptions {
  /** Connection options for the Redis client the dispatcher creates (closed via `close()`). */
  redis: RedisOptions;
  /** Produces the dispatch/tenantq key names used for both index levels. */
  keys: FairQueueKeyProducer;
  /** Number of Level 1 dispatch shards; the constructor clamps this to at least 1. */
  shardCount: number;
}

/** A tenant entry read from a Level 1 dispatch shard ZSET. */
export interface TenantWithScore {
  /** Tenant identifier stored as the ZSET member. */
  tenantId: string;
  /** ZSET score — per the index design, the oldest message timestamp across the tenant's queues. */
  score: number;
}

/**
 * TenantDispatch manages the two-level tenant dispatch index.
 *
 * Level 1 - Dispatch Index (per shard):
 *   Key: {prefix}:dispatch:{shardId}
 *   ZSET of tenantIds scored by oldest message timestamp across their queues.
 *   Only tenants with queues containing messages appear here.
 *
 * Level 2 - Per-Tenant Queue Index:
 *   Key: {prefix}:tenantq:{tenantId}
 *   ZSET of queueIds scored by oldest message timestamp in that queue.
 *
 * This replaces the flat master queue for new enqueues, isolating each tenant's
 * queue backlog so the scheduler iterates tenants (not queues) at Level 1.
 */
export class TenantDispatch {
  private redis: Redis;
  private keys: FairQueueKeyProducer;
  private shardCount: number;

  constructor(options: TenantDispatchOptions) {
    this.redis = createRedisClient(options.redis);
    this.keys = options.keys;
    // Clamp so jumpHash always has at least one bucket to map into.
    this.shardCount = Math.max(1, options.shardCount);
  }

  /**
   * Get the shard ID for a queue.
   * Uses the same jump consistent hash as MasterQueue for consistency.
   */
  getShardForQueue(queueId: string): number {
    return jumpHash(queueId, this.shardCount);
  }

  /**
   * Get eligible tenants from a dispatch shard (Level 1).
   * Returns tenants ordered by oldest message (lowest score first).
   *
   * @param shardId - dispatch shard to read
   * @param limit - maximum number of entries to return (default 1000)
   * @param maxScore - upper score bound, inclusive (default: `Date.now()`)
   */
  async getTenantsFromShard(
    shardId: number,
    limit: number = 1000,
    maxScore?: number
  ): Promise<TenantWithScore[]> {
    const entries = await this.#readZSetByScore(this.keys.dispatchKey(shardId), limit, maxScore);
    return entries.map(({ member, score }) => ({ tenantId: member, score }));
  }

  /**
   * Get queues for a specific tenant (Level 2).
   * Returns queues ordered by oldest message (lowest score first).
   *
   * @param tenantId - tenant whose queue index to read
   * @param limit - maximum number of entries to return (default 1000)
   * @param maxScore - upper score bound, inclusive (default: `Date.now()`)
   */
  async getQueuesForTenant(
    tenantId: string,
    limit: number = 1000,
    maxScore?: number
  ): Promise<QueueWithScore[]> {
    const entries = await this.#readZSetByScore(
      this.keys.tenantQueueIndexKey(tenantId),
      limit,
      maxScore
    );
    return entries.map(({ member, score }) => ({ queueId: member, score, tenantId }));
  }

  /**
   * Get the number of tenants in a dispatch shard.
   */
  async getShardTenantCount(shardId: number): Promise<number> {
    const dispatchKey = this.keys.dispatchKey(shardId);
    return await this.redis.zcard(dispatchKey);
  }

  /**
   * Get total tenant count across all dispatch shards.
   * Note: tenants may appear in multiple shards, so this may overcount.
   */
  async getTotalTenantCount(): Promise<number> {
    const counts = await Promise.all(
      Array.from({ length: this.shardCount }, (_, i) => this.getShardTenantCount(i))
    );
    return counts.reduce((sum, count) => sum + count, 0);
  }

  /**
   * Get the number of queues for a tenant.
   */
  async getTenantQueueCount(tenantId: string): Promise<number> {
    const tenantQueueKey = this.keys.tenantQueueIndexKey(tenantId);
    return await this.redis.zcard(tenantQueueKey);
  }

  /**
   * Remove a tenant from a specific dispatch shard.
   */
  async removeTenantFromShard(shardId: number, tenantId: string): Promise<void> {
    const dispatchKey = this.keys.dispatchKey(shardId);
    await this.redis.zrem(dispatchKey, tenantId);
  }

  /**
   * Add a tenant to a dispatch shard with the given score.
   */
  async addTenantToShard(shardId: number, tenantId: string, score: number): Promise<void> {
    const dispatchKey = this.keys.dispatchKey(shardId);
    await this.redis.zadd(dispatchKey, score, tenantId);
  }

  /**
   * Remove a queue from a tenant's queue index.
   */
  async removeQueueFromTenant(tenantId: string, queueId: string): Promise<void> {
    const tenantQueueKey = this.keys.tenantQueueIndexKey(tenantId);
    await this.redis.zrem(tenantQueueKey, queueId);
  }

  /**
   * Close the Redis connection.
   */
  async close(): Promise<void> {
    await this.redis.quit();
  }

  /**
   * Read `[member, score]` pairs from a ZSET, lowest score first, bounded by
   * score (`-inf` .. `maxScore ?? now`) and count. Shared by the Level 1 and
   * Level 2 readers; malformed pairs are skipped.
   */
  async #readZSetByScore(
    key: string,
    limit: number,
    maxScore?: number
  ): Promise<Array<{ member: string; score: number }>> {
    const results = await this.redis.zrangebyscore(
      key,
      "-inf",
      maxScore ?? Date.now(),
      "WITHSCORES",
      "LIMIT",
      0,
      limit
    );

    // ioredis returns a flat [member, score, member, score, ...] string array.
    const entries: Array<{ member: string; score: number }> = [];
    for (let i = 0; i < results.length; i += 2) {
      const member = results[i];
      const scoreStr = results[i + 1];
      if (member && scoreStr) {
        entries.push({ member, score: parseFloat(scoreStr) });
      }
    }

    return entries;
  }
}
Loading