Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions lib/cluster/ClusterSubscriberGroup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,18 @@
}

/**
* Disconnect all subscribers
* Disconnect all subscribers and clear the subscriber instances, the subscriber-to-slots index and any pending reset.
*/
stop() {
  // Shut down every sharded subscriber the group currently tracks.
  this.shardedSubscribers.forEach((subscriber) => {
    subscriber.stop();
  });

  // Forget any queued reset and drop the subscriber instances together with
  // the subscriber-to-slots index. Channels are preserved so they can be
  // resubscribed on reconnect.
  this.pendingReset = null;
  this.shardedSubscribers.clear();
  this.subscriberToSlotsIndex.clear();
}

/**
Expand Down Expand Up @@ -138,10 +144,7 @@
/**
* Resets the subscriber group by disconnecting all subscribers that are no longer needed and connecting new ones.
*/
public async reset(
clusterSlots: string[][],
clusterNodes: any[]
): Promise<void> {
async reset(clusterSlots: string[][], clusterNodes: any[]): Promise<void> {
if (this.isResetting) {
this.pendingReset = { slots: clusterSlots, nodes: clusterNodes };
return;
Expand Down Expand Up @@ -181,7 +184,7 @@

const startPromises = [];
// For each node in slots cache
for (const [nodeKey, _] of this.subscriberToSlotsIndex) {

Check warning on line 187 in lib/cluster/ClusterSubscriberGroup.ts

View workflow job for this annotation

GitHub Actions / build (14.x)

'_' is assigned a value but never used

Check warning on line 187 in lib/cluster/ClusterSubscriberGroup.ts

View workflow job for this annotation

GitHub Actions / build (12.x)

'_' is assigned a value but never used

Check warning on line 187 in lib/cluster/ClusterSubscriberGroup.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

'_' is assigned a value but never used

Check warning on line 187 in lib/cluster/ClusterSubscriberGroup.ts

View workflow job for this annotation

GitHub Actions / build (10.x)

'_' is assigned a value but never used

Check warning on line 187 in lib/cluster/ClusterSubscriberGroup.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

'_' is assigned a value but never used

Check warning on line 187 in lib/cluster/ClusterSubscriberGroup.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

'_' is assigned a value but never used
// If we already have a subscriber for this node then keep it
if (this.shardedSubscribers.has(nodeKey)) {
debug("Skipping creating new subscriber for %s", nodeKey);
Expand Down
17 changes: 14 additions & 3 deletions lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1133,10 +1133,21 @@ class Cluster extends EventEmitter {
this.subscriberGroupEmitter
);

// Callback passed to slots cache refreshes that are triggered by sharded subscribers only.
// Normal (non-subscriber) connections are created with lazyConnect: true and can
// become zombied. For sharded subscribers, a ClusterAllFailedError means all nodes
// have been lost from the subscriber perspective, and the cluster must tear down.
const refreshSlotsCacheCallback = (err?: Error) => {
  // A successful refresh, or any other kind of error, leaves the connection untouched.
  if (!(err instanceof ClusterAllFailedError)) {
    return;
  }

  this.disconnect(true);
};

this.subscriberGroupEmitter.on("-node", (redis, nodeKey) => {
this.emit("-node", redis, nodeKey);

this.refreshSlotsCache();
this.refreshSlotsCache(refreshSlotsCacheCallback);
});

this.subscriberGroupEmitter.on(
Expand All @@ -1145,13 +1156,13 @@ class Cluster extends EventEmitter {
this.emit("error", error);

setTimeout(() => {
this.refreshSlotsCache();
this.refreshSlotsCache(refreshSlotsCacheCallback);
}, delay);
}
);

this.subscriberGroupEmitter.on("moved", () => {
this.refreshSlotsCache();
this.refreshSlotsCache(refreshSlotsCacheCallback);
});

this.subscriberGroupEmitter.on("-subscriber", () => {
Expand Down
43 changes: 43 additions & 0 deletions test/functional/cluster/spub_ssub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,49 @@ describe("cluster:spub/ssub", function () {
});
});

// This test covers the error handler used only for sharded-subscriber-triggered
// slots cache refreshes. Normal (non-subscriber) connections are created with
// lazyConnect: true and can become zombied. For sharded subscribers, a
// ClusterAllFailedError means we have lost all nodes from the subscriber
// perspective and must tear down.
it("should trigger reconnect when subscriber node goes down and refresh fails", (done) => {
let clusterSlotsCallCount = 0;
const handler = function (argv) {
if (argv[0] === "cluster" && argv[1] === "SLOTS") {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

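Are we sure the uppercase "SLOTS" comparison works here? The "should re-subscribe after reconnection" test below matches lowercase "slots".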

clusterSlotsCallCount++;
// First call: during connect() - must succeed to reach "ready" state
// Second call: subscriber-triggered refresh after we kill the subscriber - fail to trigger reconnect
if (clusterSlotsCallCount === 2) {
return new Error("CLUSTERDOWN The cluster is down");
}
return [[0, 16383, ["127.0.0.1", 30001]]];
}
};
const server = new MockServer(30001, handler);

const ssub = new Cluster([{ host: "127.0.0.1", port: 30001 }], {
shardedSubscribers: true,
slotsRefreshInterval: 0, // Disable periodic refresh - test subscriber-triggered path only
});

ssub.once("ready", () => {
// Close ONLY the subscriber connections, not the main connection.
// The main connection stays open ("zombied") so that the reconnect is
// driven solely by the sharded-subscriber error path, not by pool drain.
server
.getAllClients()
.filter((client) => getConnectionName(client)?.includes("ssub"))
.forEach((client) => client.destroy());
});

// After the subscriber-triggered slots refresh fails, we expect the
// Cluster instance to transition into the reconnecting state.
ssub.on("reconnecting", () => {
ssub.disconnect();
done();
});
});

it("should re-subscribe after reconnection", function (done) {
new MockServer(30001, function (argv) {
if (argv[0] === "cluster" && argv[1] === "slots") {
Expand Down
Loading