Skip to content

Decouple SSE #824

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions lerna.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"version": "0.0.0",
"packages": [
"packages/*"
],
"publishConfig": {
"directory": "dist"
}
}
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
"@extensionengine/tce-scorm": "github:ExtensionEngine/tce-scorm#85d39a8d73f303f",
"@mdi/font": "^4.8.95",
"@ungap/global-this": "^0.4.0",
"JSONStream": "^1.3.5",
"auto-bind": "^3.0.0",
"aws-sdk": "^2.814.0",
"axios": "^0.21.1",
Expand Down Expand Up @@ -94,6 +93,7 @@
"is-url": "^1.2.2",
"joycon": "^2.2.5",
"jquery": "^3.5.0",
"JSONStream": "^1.3.5",
"jsonwebtoken": "^8.0.0",
"jszip": "^3.2.2",
"listr": "^0.14.3",
Expand Down Expand Up @@ -188,6 +188,7 @@
"eslint-plugin-vuetify": "^1.0.0-beta.6",
"exports-loader": "^0.7.0",
"imports-loader": "^0.8.0",
"lerna": "^3.22.1",
"nodemon": "^2.0.3",
"poi": "^12.10.3",
"rewrite-lockfile": "^1.0.1",
Expand Down
12 changes: 12 additions & 0 deletions packages/sse/.eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict';

module.exports = {
root: true,
extends: '@extensionengine/eslint-config',
overrides: [{
files: ['src/lib/**'],
parserOptions: {
sourceType: 'script'
}
}]
};
11 changes: 11 additions & 0 deletions packages/sse/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# `sse`

> TODO: description

## Usage

```
const sse = require('sse');

// TODO: DEMONSTRATE API
```
60 changes: 60 additions & 0 deletions packages/sse/lib/channels.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
'use strict';

const { EventEmitter } = require('events');

const channels = new Map();

class Channel extends EventEmitter {
constructor(id) {
super();
this._id = id;
this._connections = new Map();
}

get id() {
return this._id;
}

add(connection) {
this._connections.set(connection.id, connection);
connection.prependOnceListener('close', () => this.remove(connection));
return this;
}

remove(connection) {
this._connections.delete(connection.id);
if (!this._connections.size) this.emit('close');
return this;
}

send(_event, _data) {
this._connections.forEach(connection => connection.send(...arguments));
return this;
}
}

module.exports = {
getChannel,
addConnection,
removeConnection
};

function getChannel(channelId) {
channelId = String(channelId);
return channels.get(channelId) || new Channel('\0dummy');
}

function addConnection(channelId, connection) {
channelId = String(channelId);
if (channels.has(channelId)) {
return channels.get(channelId).add(connection);
}
const channel = new Channel(channelId);
channel.prependOnceListener('close', () => channels.delete(channelId));
channels.set(channelId, channel);
return channel.add(connection);
}

function removeConnection(channelId, connection) {
return getChannel(channelId).remove(connection);
}
129 changes: 129 additions & 0 deletions packages/sse/lib/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
'use strict';

const channels = require('./channels');
const cuid = require('cuid');
const { EventEmitter } = require('events');

const SSE_TIMEOUT_MARGIN = 0.10;
const SSE_DEFAULT_TIMEOUT = 60000; /* ms */
const SSE_HEADERS = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-transform',
Connection: 'keep-alive',
'Transfer-Encoding': 'identity',
// NOTE: This controls nginx proxy buffering
// https://nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
'X-Accel-Buffering': 'no'
};

const hasProp = (obj, prop) => Object.prototype.hasOwnProperty.call(obj, prop);

class SSEConnection extends EventEmitter {
constructor(res) {
super();
this._id = null;
this._res = res;
this._req = res.req;
this._lastEventId = 0;
this._heartbeat = null;
this.initialize();
}

static create(res) {
return new this(res);
}

get id() {
return this._id;
}

get socket() {
return this._res.socket;
}

get query() {
return this._req.query;
}

header(name) {
return this._req.header(name);
}

get timeout() {
const connectionTimeout = parseInt(this.header('connection-timeout'), 10);
const timeout = connectionTimeout || SSE_DEFAULT_TIMEOUT;
return timeout * (1 - SSE_TIMEOUT_MARGIN);
}

initialize() {
// Set socket properties.
this.socket.setTimeout(0);
this.socket.setNoDelay(true);
this.socket.setKeepAlive(true);
// Gracefully handle termination.
this._req.once('close', () => this.close());
// Set event stream headers.
this._res.writeHead(200, SSE_HEADERS);
this._res.flushHeaders();
// Ensure connection id is correctly set.
this._id = cuid.isCuid(this.query.id) ? this.query.id : cuid();
// Setup heartbeat interval.
if (this.timeout > 0) {
this._heartbeat = setInterval(() => this.write(':ping'), this.timeout);
}
// Start stream.
return this.write(':ok');
}

write(payload = '') {
return this._res.write(`${payload}\n\n`);
}

send(event, data = '') {
const id = this._lastEventId += 1;
this.emit('data', { id, event, data });
const json = JSON.stringify(data);
const payload = [
`id: ${id}`,
`event: ${event}`,
`data: ${json}`
].join('\n');
this.write(payload);
if (hasProp(this.query, 'debug')) {
this.debug({ id, type: event, data });
}
return this;
}

debug(data = '') {
const json = JSON.stringify(data);
this.write(`data: ${json}`);
return this;
}

close() {
if (this._heartbeat) clearInterval(this._heartbeat);
this._res.end();
this.emit('close');
}

static channel(channelId) {
return channels.getChannel(channelId);
}

join(channelId) {
return channels.addConnection(channelId, this);
}

leave(channelId) {
return channels.removeConnection(channelId, this);
}
}

module.exports = SSEConnection;
module.exports.middleware = middleware;

function middleware(_req, res, next) {
res.sse = SSEConnection.create(res);
next();
}
Loading