-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpresence-service.js
More file actions
95 lines (81 loc) · 2.88 KB
/
presence-service.js
File metadata and controls
95 lines (81 loc) · 2.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
const logger = require('./logger').getLogger('PRESNC');
const redis = require('./redis-cli');
const { getUserSocket } = require('./message-service');
let pubsubMap = new Map();
exports.handleMessage = (data, ws) => {
if (data.type === 'publish') { // {type: 'publish', status: 'online/offline/busy'}
logger.info(`Received PUBLISH from ${ws.userId}: ${data.status}`);
if (data.status === 'typing') {
let subscriber = getUserSocket(data.to);
if (subscriber) {
subscriber.send(JSON.stringify({
type: 'notify',
subscriptionId: ws.userId,
status: data.status
}));
}
} else {
redis.updatePresence(ws.userId, data.status);
notifyAll(data, ws.userId);
}
} else if (data.type === 'subscribe') { // {type: 'subscribe, subscriptionId: 'chaunsa@university.com'}
logger.info(`Received SUBSCRIBE from ${ws.userId} for ${data.subscriptionId}`);
addSubscriber(data.subscriptionId, ws);
notifyFromCache(data, ws);
} else {
logger.error(`Unknown type received: ${data.type}`);
}
}
exports.handleClose = (ws) => {
const ts = new Date().toISOString();
redis.updatePresence(ws.userId, `offline#${ts}`);
notifyAll({ status: 'offline', timestamp: ts }, ws.userId);
removeSubscriber(ws);
}
function addSubscriber(subscriptionId, ws) {
if (!pubsubMap.has(subscriptionId)) {
pubsubMap.set(subscriptionId, new Set());
}
pubsubMap.get(subscriptionId).add(ws);
}
// Implement efficient solution for removing subscriber
// by keeping a user-subscription map
function removeSubscriber(ws) {
for (let subscriberSet of pubsubMap.values())
subscriberSet.delete(ws);
}
function getSubscribers(subscriptionId) {
if (pubsubMap.has(subscriptionId))
return Array.from(pubsubMap.get(subscriptionId));
return [];
}
async function notifyFromCache(data, ws) {
const notification = {
type: 'notify',
subscriptionId: data.subscriptionId,
};
const status = await redis.getPresence(data.subscriptionId);
if (!status) {
notification.status = 'offline';
} else if (status.startsWith('offline')) {
let [_, ts] = status.split('#');
notification.status = 'offline';
notification.timestamp = ts;
} else {
notification.status = status;
}
ws.send(JSON.stringify(notification));
}
function notifyAll(data, subscriptionId) {
let subscribers = getSubscribers(subscriptionId);
for (let subscriber of subscribers) {
const notification = {
type: 'notify',
subscriptionId: subscriptionId,
status: data.status
};
if (data.timestamp)
notification.timestamp = data.timestamp;
subscriber.send(JSON.stringify(notification));
}
}