Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type {
EventsMap,
TypedEventBroadcaster,
} from "./typed-events";
import { PUBLISH } from "./util";

const debug = debugModule("socket.io-emitter");

Expand Down Expand Up @@ -41,13 +42,19 @@ export interface EmitterOptions {
* Defaults to notepack.io, a MessagePack implementation.
*/
parser?: Parser;

/**
* Enable the sharded PubSub or not. Default to `false`.
*/
sharded?: false;
}

interface BroadcastOptions {
nsp: string;
broadcastChannel: string;
requestChannel: string;
parser: Parser;
sharded: false;
}

interface BroadcastFlags {
Expand All @@ -68,6 +75,7 @@ export class Emitter<EmitEvents extends EventsMap = DefaultEventsMap> {
{
key: "socket.io",
parser: msgpack,
sharded: false,
},
opts
);
Expand All @@ -76,6 +84,7 @@ export class Emitter<EmitEvents extends EventsMap = DefaultEventsMap> {
broadcastChannel: this.opts.key + "#" + nsp + "#",
requestChannel: this.opts.key + "-request#" + nsp + "#",
parser: this.opts.parser,
sharded: this.opts.sharded,
};
}

Expand Down Expand Up @@ -233,7 +242,12 @@ export class Emitter<EmitEvents extends EventsMap = DefaultEventsMap> {
data: args,
});

this.redisClient.publish(this.broadcastOptions.requestChannel, request);
PUBLISH(
this.redisClient,
this.broadcastOptions.requestChannel,
request,
this.opts.sharded
);
}
}

Expand Down Expand Up @@ -386,7 +400,7 @@ export class BroadcastOperator<EmitEvents extends EventsMap>

debug("publishing message to channel %s", channel);

this.redisClient.publish(channel, msg);
PUBLISH(this.redisClient, channel, msg, this.broadcastOptions.sharded);

return true;
}
Expand All @@ -407,7 +421,12 @@ export class BroadcastOperator<EmitEvents extends EventsMap>
rooms: Array.isArray(rooms) ? rooms : [rooms],
});

this.redisClient.publish(this.broadcastOptions.requestChannel, request);
PUBLISH(
this.redisClient,
this.broadcastOptions.requestChannel,
request,
this.broadcastOptions.sharded
);
}

/**
Expand All @@ -426,7 +445,12 @@ export class BroadcastOperator<EmitEvents extends EventsMap>
rooms: Array.isArray(rooms) ? rooms : [rooms],
});

this.redisClient.publish(this.broadcastOptions.requestChannel, request);
PUBLISH(
this.redisClient,
this.broadcastOptions.requestChannel,
request,
this.broadcastOptions.sharded
);
}

/**
Expand All @@ -445,6 +469,11 @@ export class BroadcastOperator<EmitEvents extends EventsMap>
close,
});

this.redisClient.publish(this.broadcastOptions.requestChannel, request);
PUBLISH(
this.redisClient,
this.broadcastOptions.requestChannel,
request,
this.broadcastOptions.sharded
);
}
}
48 changes: 48 additions & 0 deletions lib/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Whether the client comes from the `redis` package
*
* @param redisClient
*
* @see https://github.com/redis/node-redis
*/
function isRedisV4Client(redisClient: any) {
return typeof redisClient.sSubscribe === "function";
}

/**
* Whether sharded publish using `redis` or `iosredis` package
* @param redisClient
* @param channel
* @param payload
*/
export function SPUBLISH(
redisClient: any,
channel: string,
payload: string | Uint8Array
) {
if (isRedisV4Client(redisClient)) {
redisClient.sPublish(channel, payload);
} else {
redisClient.spublish(channel, payload);
}
}

/**
* Whether publish in sharded mode.
* @param redisClient
* @param channel
* @param payload
* @param sharded
*/
export function PUBLISH(
redisClient: any,
channel: string,
payload: string | Uint8Array,
sharded: boolean
) {
if (sharded) {
SPUBLISH(redisClient, channel, payload);
} else {
redisClient.publish(channel, payload);
}
}