Skip to content

Commit 7afb269

Browse files
committed
- Broadcast service events without connected sockets via Redis
- Document base websocket server class
1 parent c1bba2f commit 7afb269

File tree

9 files changed

+114
-62
lines changed

9 files changed

+114
-62
lines changed

CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
66
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
77

8-
## Version 0.1.3 - 2024-xx-xx
8+
## Version 0.1.3 - 2024-01-11
99

1010
### Fixed
1111

12-
- tbd
12+
- Broadcast service events without connected sockets via Redis
13+
- Document base websocket server class
1314

1415
## Version 0.1.2 - 2024-01-08
1516

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@cap-js-community/websocket",
3-
"version": "0.1.3",
3+
"version": "0.2.0",
44
"description": "WebSocket adapter for CDS",
55
"homepage": "https://cap.cloud.sap/",
66
"engines": {

src/adapter/redis.js

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,38 +6,43 @@ const cds = require("@sap/cds");
66
const LOG = cds.log("websocket/redis");
77

88
class RedisAdapter {
9-
constructor(server, channel, options) {
9+
constructor(server, prefix, options) {
1010
this.server = server;
11-
this.channel = channel;
11+
this.prefix = prefix;
1212
this.options = options;
1313
}
1414

1515
async setup() {
1616
this.client = await redis.createPrimaryClientAndConnect();
1717
}
1818

19-
async on() {
19+
async on(service) {
2020
if (!this.client) {
2121
return;
2222
}
2323
try {
24-
await this.client.subscribe(this.channel);
25-
this.client.on("message", (channel, message) => {
26-
if (channel === this.channel) {
27-
this.server.wss.broadcastAll(message);
24+
const channel = this.prefix + service;
25+
await this.client.subscribe(channel, async (message, messageChannel) => {
26+
try {
27+
if (messageChannel === channel) {
28+
await this.server.broadcast(service, message);
29+
}
30+
} catch (err) {
31+
LOG?.error(err);
2832
}
2933
});
3034
} catch (err) {
3135
LOG?.error(err);
3236
}
3337
}
3438

35-
async emit(message) {
39+
async emit(service, message) {
3640
if (!this.client) {
3741
return;
3842
}
3943
try {
40-
await this.client.publish(this.channel, message);
44+
const channel = this.prefix + service;
45+
await this.client.publish(channel, message);
4146
} catch (err) {
4247
LOG?.error(err);
4348
}

src/index.js

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,13 @@ function serveWebSocketService(socketServer, service, options) {
108108
if (["websocket", "ws"].includes(endpoint.kind)) {
109109
const servicePath = normalizeServicePath(endpoint.path, options.path);
110110
try {
111+
bindServiceEvents(socketServer, servicePath, service);
111112
socketServer.service(servicePath, (socket) => {
112113
try {
113114
socket.setup();
114115
bindServiceDefaults(socket, service);
115116
bindServiceOperations(socket, service);
116117
bindServiceEntities(socket, service);
117-
bindServiceEvents(socket, service);
118118
emitConnect(socket, service);
119119
} catch (err) {
120120
LOG?.error(err);
@@ -128,9 +128,16 @@ function serveWebSocketService(socketServer, service, options) {
128128
}
129129
}
130130

131-
async function emitConnect(socket, service) {
132-
if (service.operations(WebSocketAction.Connect).length) {
133-
await processEvent(socket, service, undefined, WebSocketAction.Connect);
131+
function bindServiceEvents(socketServer, servicePath, service) {
132+
for (const event of service.events()) {
133+
service.on(event, async (req) => {
134+
const localEventName = serviceLocalName(service, event.name);
135+
try {
136+
await socketServer.broadcast(servicePath, localEventName, req.data, null, true);
137+
} catch (err) {
138+
LOG?.error(err);
139+
}
140+
});
134141
}
135142
}
136143

@@ -190,20 +197,9 @@ function bindServiceEntities(socket, service) {
190197
}
191198
}
192199

193-
function bindServiceEvents(socket, service) {
194-
for (const event of service.events()) {
195-
service.on(event, async (req) => {
196-
const localEventName = serviceLocalName(service, event.name);
197-
await processEmit(socket, service, localEventName, req.data);
198-
});
199-
}
200-
}
201-
202-
async function processEmit(socket, service, event, data) {
203-
try {
204-
socket.emit(event, data);
205-
} catch (err) {
206-
LOG?.error(err);
200+
async function emitConnect(socket, service) {
201+
if (service.operations(WebSocketAction.Connect).length) {
202+
await processEvent(socket, service, undefined, WebSocketAction.Connect);
207203
}
208204
}
209205

src/socket/base.js

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,37 @@ const cds = require("@sap/cds");
44
const cookie = require("cookie");
55
const crypto = require("crypto");
66

7+
/**
8+
* Base class for a websocket server
9+
*/
710
class SocketServer {
11+
/**
12+
* Constructor for websocket server
13+
* @param server HTTP server from express app
14+
* @param path Protocol path, e.g. '/ws'
15+
*/
816
constructor(server, path) {
917
this.id = crypto.randomUUID();
1018
this.server = server;
1119
this.path = path;
1220
cds.ws = null;
1321
}
1422

23+
/**
24+
* Setup websocket server with async operations
25+
* @returns {Promise<void>} Promise when setup is completed
26+
*/
1527
async setup() {}
1628

29+
/**
30+
* Connect a service to websocket
31+
* @param service service path, e.g. "/chat"
32+
* @param connected Callback function to be called on every websocket connection passing socket functions (i.e. ws.on("connection", connected))
33+
*/
1734
service(service, connected) {
1835
connected &&
1936
connected({
37+
service,
2038
socket: null,
2139
setup: () => {},
2240
context: () => {},
@@ -27,6 +45,21 @@ class SocketServer {
2745
});
2846
}
2947

48+
/**
49+
* Broadcast to all websocket clients
50+
* @param service service path, e.g. "/chat"
51+
* @param event Event name
52+
* @param data Data object
53+
* @param socket Broadcast client to be excluded
54+
* @param multiple Broadcast across multiple websocket servers
55+
* @returns {Promise<void>} Promise when broadcasting completed
56+
*/
57+
async broadcast(service, event, data, socket, multiple) {}
58+
59+
/**
60+
* Mock the HTTP response object and make available at request.res
61+
* @param request HTTP request
62+
*/
3063
static mockResponse(request) {
3164
// Mock response (not available in websocket, CDS middlewares need it)
3265
const res = request.res ?? {};
@@ -67,6 +100,10 @@ class SocketServer {
67100
request.res = res;
68101
}
69102

103+
/**
104+
* Apply the authorization cookie to authorization header for local authorization testing in mocked auth scenario
105+
* @param request HTTP request
106+
*/
70107
static applyAuthCookie(request) {
71108
// Apply cookie to authorization header
72109
if (["mocked"].includes(cds.env.requires.auth.kind) && !request.headers.authorization && request.headers.cookie) {

src/socket/socket.io.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class SocketIOServer extends SocketServer {
3636
DEBUG?.("Disconnected", socket.id);
3737
});
3838
const serviceSocket = {
39+
service,
3940
socket,
4041
setup: () => {
4142
this._enforceAuth(socket);
@@ -69,6 +70,15 @@ class SocketIOServer extends SocketServer {
6970
});
7071
}
7172

73+
async broadcast(service, event, data, socket, multiple) {
74+
if (socket) {
75+
socket.broadcast.emit(event, data);
76+
} else {
77+
const io = this.io.of(service);
78+
io.emit(event, data);
79+
}
80+
}
81+
7282
async _applyAdapter() {
7383
try {
7484
const adapterImpl = cds.env.requires?.websocket?.adapter?.impl;

src/socket/ws.js

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,30 +11,17 @@ class SocketWSServer extends SocketServer {
1111
constructor(server, path) {
1212
super(server, path);
1313
this.wss = new WebSocket.Server({ server });
14-
this.wss.broadcast = (message, socket) => {
15-
this.wss.clients.forEach((client) => {
16-
if (client !== socket && client.readyState === WebSocket.OPEN) {
17-
client.send(message);
18-
}
19-
});
20-
};
21-
this.wss.broadcastAll = (message) => {
22-
this.wss.clients.forEach((client) => {
23-
if (client.readyState === WebSocket.OPEN) {
24-
client.send(message);
25-
}
26-
});
27-
};
28-
this.adapter = null;
2914
cds.ws = this.wss;
3015
cds.wss = this.wss;
16+
this.adapter = null;
3117
}
3218

3319
async setup() {
3420
await this._applyAdapter();
3521
}
3622

3723
service(service, connected) {
24+
this.adapter?.on(service);
3825
this.wss.on("connection", async (ws, request) => {
3926
ws.request = request;
4027
if (ws.request?.url !== `${this.path}${service}`) {
@@ -51,6 +38,7 @@ class SocketWSServer extends SocketServer {
5138
try {
5239
connected &&
5340
connected({
41+
service,
5442
socket: ws,
5543
setup: () => {
5644
this._enforceAuth(ws);
@@ -66,18 +54,16 @@ class SocketWSServer extends SocketServer {
6654
},
6755
on: (event, callback) => {
6856
ws.on("message", async (message) => {
69-
let payload;
57+
let payload = {};
7058
try {
7159
payload = JSON.parse(message);
7260
} catch (_) {
7361
// ignore
7462
}
7563
try {
7664
if (payload?.event === event) {
65+
await this.adapter?.emit(service, message);
7766
await callback(payload.data);
78-
if (this.adapter) {
79-
await this.adapter.emit(message);
80-
}
8167
}
8268
} catch (err) {
8369
LOG?.error(err);
@@ -93,13 +79,7 @@ class SocketWSServer extends SocketServer {
9379
);
9480
},
9581
broadcast: (event, data) => {
96-
this.wss.broadcast(
97-
JSON.stringify({
98-
event,
99-
data,
100-
}),
101-
ws,
102-
);
82+
this.broadcast(service, event, data, ws, true);
10383
},
10484
disconnect() {
10585
ws.disconnect();
@@ -112,6 +92,28 @@ class SocketWSServer extends SocketServer {
11292
});
11393
}
11494

95+
async broadcast(service, event, data, socket, multiple) {
96+
const clients = [];
97+
this.wss.clients.forEach((client) => {
98+
if (
99+
client.readyState === WebSocket.OPEN &&
100+
client !== socket &&
101+
client.request?.url === `${this.path}${service}`
102+
) {
103+
clients.push(client);
104+
}
105+
});
106+
if (clients.length > 0 || multiple) {
107+
const message = !data ? event : JSON.stringify({ event, data });
108+
clients.forEach((client) => {
109+
client.send(message);
110+
});
111+
if (multiple) {
112+
await this.adapter?.emit(service, message);
113+
}
114+
}
115+
}
116+
115117
async _applyAdapter() {
116118
try {
117119
const adapterImpl = cds.env.requires?.websocket?.adapter?.impl;
@@ -120,11 +122,10 @@ class SocketWSServer extends SocketServer {
120122
if (cds.env.requires.websocket?.adapter?.options) {
121123
options = { ...options, ...cds.env.requires.websocket?.adapter?.options };
122124
}
123-
const channel = options?.key ?? "websocket";
125+
const prefix = options?.key ?? "websocket";
124126
this.adapterFactory = require(`../adapter/${adapterImpl}`);
125-
this.adapter = new this.adapterFactory(this, channel, options);
126-
await this.adapter.setup();
127-
await this.adapter.on();
127+
this.adapter = new this.adapterFactory(this, prefix, options);
128+
await this.adapter?.setup();
128129
}
129130
} catch (err) {
130131
LOG?.error(err);

test/base.test.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ describe("Base", () => {
1919
socketServer.setup();
2020
socketServer.service("test", connected);
2121
expect(socket).toEqual({
22+
service: "test",
2223
socket: null,
2324
setup: expect.any(Function),
2425
context: expect.any(Function),

test/redis_ws.test.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ describe("Redis", () => {
3939
expect(redis.createClient).toHaveBeenCalledWith({ url: "uri" });
4040
expect(redis.client.connect).toHaveBeenCalledWith();
4141
expect(redis.client.on).toHaveBeenNthCalledWith(1, "error", expect.any(Function));
42-
expect(redis.client.on).toHaveBeenNthCalledWith(2, "message", expect.any(Function));
43-
expect(redis.client.publish).toHaveBeenCalledWith("websocket", expect.any(Buffer));
42+
expect(redis.client.subscribe).toHaveBeenNthCalledWith(1, "websocket/chat", expect.any(Function));
43+
expect(redis.client.subscribe).toHaveBeenNthCalledWith(2, "websocket/main", expect.any(Function));
44+
expect(redis.client.publish).toHaveBeenCalledWith("websocket/chat", expect.any(Buffer));
4445
});
4546
});

0 commit comments

Comments
 (0)