Skip to content

Commit 1ac64c4

Browse files
committed
refactor - extract utility method to wait for in-flight commands to complete
1 parent 85a646f commit 1ac64c4

File tree

2 files changed

+13
-16
lines changed

2 files changed

+13
-16
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ export default class RedisCommandsQueue {
7777
this.#maxLength = maxLength;
7878
this.#onShardedChannelMoved = onShardedChannelMoved;
7979
this.decoder = this.#initiateDecoder();
80-
this.#waitingForReply.events.on('empty', this.events.emit.bind(this.events, 'waitingForReplyEmpty'))
8180
}
8281

8382
#onReply(reply: ReplyUnion) {
@@ -154,8 +153,15 @@ export default class RedisCommandsQueue {
154153
this.#invalidateCallback = callback;
155154
}
156155

157-
isWaitingForReply(): boolean {
158-
return this.#waitingForReply.length > 0;
156+
async waitForInflightCommandsToComplete(): Promise<void> {
157+
// In-flight commands already completed
158+
if(this.#waitingForReply.length === 0) {
159+
return
160+
};
161+
// Otherwise wait for in-flight commands to fire `empty` event
162+
return new Promise(resolve => {
163+
this.#waitingForReply.events.on('empty', resolve)
164+
});
159165
}
160166

161167
addCommand<T>(

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
1919

2020
// Queue:
2121
// toWrite [ C D E ]
22-
// waitingForReply [ A B ]
22+
// waitingForReply [ A B ] - aka In-flight commands
2323
//
2424
// time: ---1-2---3-4-5-6---------------------------
2525
//
2626
// 1. [EVENT] MOVING PN received
2727
// 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete )
2828
// 3. [EVENT] New socket connected
29-
// 4. [EVENT] WaitingForReply commands completed
29+
// 4. [EVENT] In-flight commands completed
3030
// 5. [ACTION] Destroy old socket
3131
// 6. [ACTION] Resume writing -> we are going to write to the new socket from now on
3232
#onMoving = async (
@@ -46,17 +46,8 @@ export default class EnterpriseMaintenanceManager extends EventEmitter {
4646
await newSocket.connect();
4747
// 3 [EVENT] New socket connected
4848

49-
// Wait until waitingForReply is empty
50-
await new Promise<void>((resolve) => {
51-
if (!this.commandsQueue.isWaitingForReply()) {
52-
resolve();
53-
} else {
54-
this.commandsQueue.events.once("waitingForReplyEmpty", () => {
55-
resolve();
56-
});
57-
}
58-
});
59-
// 4 [EVENT] WaitingForReply commands completed
49+
await this.commandsQueue.waitForInflightCommandsToComplete();
50+
// 4 [EVENT] In-flight commands completed
6051

6152
// 5 + 6
6253
this.emit('resume', newSocket);

0 commit comments

Comments
 (0)