Skip to content
582 changes: 477 additions & 105 deletions packages/redis-worker/src/fair-queue/index.ts

Large diffs are not rendered by default.

16 changes: 15 additions & 1 deletion packages/redis-worker/src/fair-queue/keyProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import type { FairQueueKeyProducer } from "./types.js";
* Uses a configurable prefix and standard key structure.
*
* Key structure:
* - Master queue: {prefix}:master:{shardId}
* - Master queue: {prefix}:master:{shardId} (legacy, drain-only)
* - Dispatch index: {prefix}:dispatch:{shardId} (Level 1: tenantIds)
* - Tenant queue index: {prefix}:tenantq:{tenantId} (Level 2: queueIds)
* - Queue: {prefix}:queue:{queueId}
* - Queue items: {prefix}:queue:{queueId}:items
* - Concurrency: {prefix}:concurrency:{groupName}:{groupId}
Expand Down Expand Up @@ -70,6 +72,18 @@ export class DefaultFairQueueKeyProducer implements FairQueueKeyProducer {
return this.#buildKey("worker", consumerId);
}

// ============================================================================
// Tenant Dispatch Keys (Two-Level Index)
// ============================================================================

dispatchKey(shardId: number): string {
return this.#buildKey("dispatch", shardId.toString());
}

tenantQueueIndexKey(tenantId: string): string {
return this.#buildKey("tenantq", tenantId);
}

// ============================================================================
// Dead Letter Queue Keys
// ============================================================================
Expand Down
102 changes: 102 additions & 0 deletions packages/redis-worker/src/fair-queue/schedulers/drr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { createRedisClient, type Redis, type RedisOptions } from "@internal/redi
import { BaseScheduler } from "../scheduler.js";
import type {
DRRSchedulerConfig,
DispatchSchedulerContext,
FairQueueKeyProducer,
SchedulerContext,
TenantQueues,
Expand Down Expand Up @@ -132,6 +133,78 @@ export class DRRScheduler extends BaseScheduler {
}));
}

/**
* Select queues using the two-level tenant dispatch index.
*
* Algorithm:
* 1. ZRANGEBYSCORE on dispatch index (gets only tenants with queues - much smaller)
* 2. Add quantum to each tenant's deficit (atomically)
* 3. Check capacity as safety net (dispatch should only have tenants with capacity)
* 4. Select tenants with deficit >= 1, sorted by deficit (highest first)
* 5. For each tenant, fetch their queues from Level 2 index
*/
async selectQueuesFromDispatch(
dispatchShardKey: string,
consumerId: string,
context: DispatchSchedulerContext
): Promise<TenantQueues[]> {
// Level 1: Get tenants from dispatch index
const tenants = await this.#getTenantsFromDispatch(dispatchShardKey);

if (tenants.length === 0) {
return [];
}

const tenantIds = tenants.map((t) => t.tenantId);

// Add quantum to all active tenants atomically
const deficits = await this.#addQuantumToTenants(tenantIds);

// Build tenant data with deficits and capacity checks
const tenantData: Array<{
tenantId: string;
deficit: number;
isAtCapacity: boolean;
}> = await Promise.all(
tenantIds.map(async (tenantId, index) => {
// Capacity check as safety net - dispatch should already exclude at-capacity tenants
// once capacity-based pruning is implemented as a follow-up
const isAtCapacity = await context.isAtCapacity("tenant", tenantId);
return {
tenantId,
deficit: deficits[index] ?? 0,
isAtCapacity,
};
})
);

// Filter out tenants at capacity or with no deficit
const eligibleTenants = tenantData.filter((t) => !t.isAtCapacity && t.deficit >= 1);

// Sort by deficit (highest first for fairness)
eligibleTenants.sort((a, b) => b.deficit - a.deficit);

this.logger.debug("DRR dispatch: tenant selection complete", {
dispatchTenants: tenants.length,
eligibleTenants: eligibleTenants.length,
topTenantDeficit: eligibleTenants[0]?.deficit,
});

// Level 2: For each eligible tenant, fetch their queues
const result: TenantQueues[] = [];
for (const { tenantId } of eligibleTenants) {
const queues = await context.getQueuesForTenant(tenantId);
if (queues.length > 0) {
result.push({
tenantId,
queues: queues.map((q) => q.queueId),
});
}
}

return result;
}

/**
* Record that a message was processed from a tenant.
* Decrements the tenant's deficit.
Expand Down Expand Up @@ -200,6 +273,35 @@ export class DRRScheduler extends BaseScheduler {
return `${this.keys.masterQueueKey(0).split(":")[0]}:drr:deficit`;
}

async #getTenantsFromDispatch(
dispatchKey: string
): Promise<Array<{ tenantId: string; score: number }>> {
const now = Date.now();
const results = await this.redis.zrangebyscore(
dispatchKey,
"-inf",
now,
"WITHSCORES",
"LIMIT",
0,
this.masterQueueLimit
);

const tenants: Array<{ tenantId: string; score: number }> = [];
for (let i = 0; i < results.length; i += 2) {
const tenantId = results[i];
const scoreStr = results[i + 1];
if (tenantId && scoreStr) {
tenants.push({
tenantId,
score: parseFloat(scoreStr),
});
}
}

return tenants;
}

async #getQueuesFromShard(shardKey: string): Promise<QueueWithScore[]> {
const now = Date.now();
const results = await this.redis.zrangebyscore(
Expand Down
26 changes: 24 additions & 2 deletions packages/redis-worker/src/fair-queue/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export interface FairQueueMetrics {
// Observable gauges (registered with callbacks)
queueLength: ObservableGauge;
masterQueueLength: ObservableGauge;
dispatchLength: ObservableGauge;
inflightCount: ObservableGauge;
dlqLength: ObservableGauge;
}
Expand Down Expand Up @@ -250,6 +251,7 @@ export class FairQueueTelemetry {
registerGaugeCallbacks(callbacks: {
getQueueLength?: (queueId: string) => Promise<number>;
getMasterQueueLength?: (shardId: number) => Promise<number>;
getDispatchLength?: (shardId: number) => Promise<number>;
getInflightCount?: (shardId: number) => Promise<number>;
getDLQLength?: (tenantId: string) => Promise<number>;
shardCount?: number;
Expand All @@ -273,7 +275,7 @@ export class FairQueueTelemetry {
});
}

// Master queue length gauge
// Legacy master queue length gauge (draining, should trend to 0)
if (callbacks.getMasterQueueLength && callbacks.shardCount) {
const getMasterQueueLength = callbacks.getMasterQueueLength;
const shardCount = callbacks.shardCount;
Expand All @@ -288,6 +290,21 @@ export class FairQueueTelemetry {
});
}

// Dispatch index length gauge (new two-level dispatch, tenant count per shard)
if (callbacks.getDispatchLength && callbacks.shardCount) {
const getDispatchLength = callbacks.getDispatchLength;
const shardCount = callbacks.shardCount;

this.metrics.dispatchLength.addCallback(async (observableResult) => {
for (let shardId = 0; shardId < shardCount; shardId++) {
const length = await getDispatchLength(shardId);
observableResult.observe(length, {
[FairQueueAttributes.SHARD_ID]: shardId.toString(),
});
}
});
}

// Inflight count gauge
if (callbacks.getInflightCount && callbacks.shardCount) {
const getInflightCount = callbacks.getInflightCount;
Expand Down Expand Up @@ -317,6 +334,7 @@ export class FairQueueTelemetry {
}
});
}

}

// ============================================================================
Expand Down Expand Up @@ -414,9 +432,13 @@ export class FairQueueTelemetry {
unit: "messages",
}),
masterQueueLength: this.meter.createObservableGauge(`${this.name}.master_queue.length`, {
description: "Number of queues in master queue shard",
description: "Number of queues in legacy master queue shard (draining)",
unit: "queues",
}),
dispatchLength: this.meter.createObservableGauge(`${this.name}.dispatch.length`, {
description: "Number of tenants in dispatch index shard",
unit: "tenants",
}),
inflightCount: this.meter.createObservableGauge(`${this.name}.inflight.count`, {
description: "Number of messages currently being processed",
unit: "messages",
Expand Down
Loading
Loading