Skip to content

Commit 4cdfa3b

Browse files
committed
feat: initial version
0 parents  commit 4cdfa3b

19 files changed

+1988
-0
lines changed

.editorconfig

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
root = true
2+
3+
[*]
4+
end_of_line = lf
5+
insert_final_newline = true
6+
indent_style = tab
7+
charset = utf-8
8+
9+
[{package.json,*.yml}]
10+
indent_style = space
11+
indent_size = 2

.eslintignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
node_modules
2+
coverage

.eslintrc.json

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
{
2+
"env": {
3+
"node": true,
4+
"es6": true
5+
},
6+
"parserOptions": {
7+
"ecmaVersion": 2018,
8+
"sourceType": "script"
9+
},
10+
"extends": [
11+
"@csimi/eslint-config"
12+
]
13+
}

.gitignore

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
node_modules
2+
npm-debug.log*
3+
coverage
4+
.nyc_output

lib/ChunkStream/ChunkConstants.js

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
const TYPE_0 = 0x00;
2+
const TYPE_1 = 0x40;
3+
const TYPE_2 = 0x80;
4+
const TYPE_3 = 0xC0;
5+
6+
const ID_LIMIT_ONE = 64;
7+
const ID_LIMIT_TWO = 320;
8+
9+
const ID_MASK_ONE = 0x3F;
10+
const ID_MASK_TWO = 0x3E;
11+
12+
const CHUNK_SIZE = 128;
13+
const TIMESTAMP_MAX = 0xFFFFFF;
14+
15+
module.exports = {
16+
TYPE_0,
17+
TYPE_1,
18+
TYPE_2,
19+
TYPE_3,
20+
ID_LIMIT_ONE,
21+
ID_LIMIT_TWO,
22+
ID_MASK_ONE,
23+
ID_MASK_TWO,
24+
CHUNK_SIZE,
25+
TIMESTAMP_MAX,
26+
};

lib/ChunkStream/ChunkStreamDemuxer.js

+122
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/* eslint-disable no-bitwise */
2+
3+
const { Transform } = require('stream');
4+
const { Memo } = require('amf-codec');
5+
const {
6+
TYPE_1,
7+
TYPE_2,
8+
TYPE_3,
9+
ID_LIMIT_ONE,
10+
ID_MASK_ONE,
11+
ID_MASK_TWO,
12+
CHUNK_SIZE,
13+
} = require('./ChunkConstants');
14+
const {
15+
SET_CHUNK_SIZE,
16+
} = require('../MessageTypes');
17+
18+
const decodeBasicHeader = (buf, memo) => {
19+
const byte = buf.readUInt8(memo.consume(1));
20+
const format = byte & ~ID_MASK_ONE;
21+
22+
if (byte & ID_MASK_ONE === 0) {
23+
const chunkStreamId = buf.readUInt8(memo.consume(1)) + ID_LIMIT_ONE;
24+
return [format, chunkStreamId];
25+
}
26+
else if (byte & ID_MASK_TWO === 0) {
27+
const chunkStreamId = buf.readUInt16BE(memo.consume(2)) + ID_LIMIT_ONE;
28+
return [format, chunkStreamId];
29+
}
30+
else {
31+
const chunkStreamId = byte & ID_MASK_ONE;
32+
return [format, chunkStreamId];
33+
}
34+
};
35+
36+
const decodeMessageHeader = (buf, memo, format) => {
37+
const header = {};
38+
if (format === TYPE_3) {
39+
return header;
40+
}
41+
42+
header.timestamp = buf.readUIntBE(memo.consume(3), 3);
43+
if (format === TYPE_2) {
44+
return header;
45+
}
46+
47+
header.messageBodyLength = buf.readUIntBE(memo.consume(3), 3);
48+
header.messageTypeId = buf.readUIntBE(memo.consume(1), 1);
49+
if (format === TYPE_1) {
50+
return header;
51+
}
52+
53+
header.messageStreamId = buf.readUIntLE(memo.consume(4), 4);
54+
55+
return header;
56+
};
57+
58+
class ChunkStreamDemuxer extends Transform {
59+
constructor () {
60+
super({
61+
'readableObjectMode': true,
62+
});
63+
64+
this.chunkSize = CHUNK_SIZE;
65+
this.buffers = new Map();
66+
this.cache = new Map();
67+
}
68+
69+
onControl (messageTypeId, data) {
70+
if (messageTypeId === SET_CHUNK_SIZE) {
71+
this.chunkSize = data;
72+
}
73+
}
74+
75+
_transform (data, encoding, callback) {
76+
const memo = new Memo(0);
77+
const [format, chunkStreamId] = decodeBasicHeader(data, memo);
78+
79+
const messageHeader = {
80+
...this.cache.get(chunkStreamId),
81+
...decodeMessageHeader(data, memo, format),
82+
};
83+
this.cache.set(chunkStreamId, messageHeader);
84+
85+
const chunkBuffer = this.buffers.get(chunkStreamId) || Buffer.alloc(0);
86+
const dataSize = Math.min(
87+
this.chunkSize,
88+
data.length - memo.position,
89+
messageHeader.messageBodyLength - chunkBuffer.length,
90+
);
91+
const chunkData = Buffer.concat([
92+
chunkBuffer,
93+
data.slice(memo.position, memo.skip(dataSize)),
94+
]);
95+
96+
if (chunkData.length < messageHeader.messageBodyLength) {
97+
this.buffers.set(chunkStreamId, chunkData);
98+
}
99+
else {
100+
this.buffers.set(chunkStreamId, Buffer.alloc(0));
101+
102+
if (messageHeader.messageTypeId === 1) {
103+
this.chunkSize = chunkData.readUInt32BE();
104+
}
105+
106+
this.push({
107+
chunkStreamId,
108+
...messageHeader,
109+
'messageBody': chunkData,
110+
});
111+
}
112+
113+
if (data.length > memo.position) {
114+
/* eslint-disable-next-line no-underscore-dangle */
115+
return this._transform(data.slice(memo.position), encoding, callback);
116+
}
117+
118+
return callback();
119+
}
120+
}
121+
122+
module.exports = ChunkStreamDemuxer;

lib/ChunkStream/ChunkStreamEncoder.js

+126
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/* eslint-disable no-bitwise */
2+
3+
const { Transform } = require('stream');
4+
const {
5+
TYPE_0,
6+
TYPE_1,
7+
TYPE_2,
8+
TYPE_3,
9+
ID_LIMIT_ONE,
10+
ID_LIMIT_TWO,
11+
CHUNK_SIZE,
12+
TIMESTAMP_MAX,
13+
} = require('./ChunkConstants');
14+
15+
const createTimestamp = () => {
16+
let memo = 0;
17+
return () => {
18+
const now = Date.now();
19+
const timestamp = now - memo;
20+
21+
if (timestamp < TIMESTAMP_MAX) {
22+
return timestamp;
23+
}
24+
else {
25+
memo = now;
26+
return 0;
27+
}
28+
};
29+
};
30+
31+
const encodeBasicHeader = (format, chunkStreamId) => {
32+
if (chunkStreamId < ID_LIMIT_ONE) {
33+
return Buffer.from([chunkStreamId | format]);
34+
}
35+
else if (chunkStreamId < ID_LIMIT_TWO) {
36+
return Buffer.from([format, chunkStreamId - ID_LIMIT_ONE]);
37+
}
38+
else {
39+
const buf = Buffer.allocUnsafe(3);
40+
41+
buf.writeUInt8(format | 1, 0);
42+
buf.writeUInt16BE(chunkStreamId - ID_LIMIT_ONE, 1);
43+
44+
return buf;
45+
}
46+
};
47+
48+
const getMessageHeaderSize = (format) => {
49+
if (format === TYPE_0) {
50+
return 11;
51+
}
52+
else if (format === TYPE_1) {
53+
return 7;
54+
}
55+
else if (format === TYPE_2) {
56+
return 3;
57+
}
58+
return 0;
59+
};
60+
61+
const encodeMessageHeader = (format, timestamp, messageBodyLength, messageTypeId, messageStreamId) => {
62+
const buf = Buffer.allocUnsafe(getMessageHeaderSize(format));
63+
if (format === TYPE_3) {
64+
return buf;
65+
}
66+
67+
buf.writeUIntBE(timestamp, 0, 3);
68+
if (format === TYPE_2) {
69+
return buf;
70+
}
71+
72+
buf.writeUIntBE(messageBodyLength, 3, 3);
73+
buf.writeUIntBE(messageTypeId, 6, 1);
74+
if (format === TYPE_1) {
75+
return buf;
76+
}
77+
78+
buf.writeUIntLE(messageStreamId, 7, 4);
79+
80+
return buf;
81+
};
82+
83+
class ChunkStreamEncoder extends Transform {
84+
constructor (chunkStreamId = 2) {
85+
super({
86+
'writableObjectMode': true,
87+
});
88+
89+
this.chunkStreamId = chunkStreamId;
90+
this.timestamp = createTimestamp();
91+
}
92+
93+
_transform ({ messageTypeId, messageStreamId, messageBody }, encoding, callback) {
94+
let format = TYPE_0;
95+
const timestamp = this.timestamp();
96+
for (let position = 0; position < messageBody.length; position += CHUNK_SIZE) {
97+
const basicHeader = encodeBasicHeader(
98+
format,
99+
this.chunkStreamId,
100+
);
101+
const messageHeader = encodeMessageHeader(
102+
format,
103+
timestamp,
104+
messageBody.length,
105+
messageTypeId,
106+
messageStreamId,
107+
);
108+
109+
format = TYPE_3;
110+
const chunk = messageBody.slice(
111+
position,
112+
Math.min(messageBody.length, position + CHUNK_SIZE),
113+
);
114+
115+
this.push(Buffer.concat([
116+
basicHeader,
117+
messageHeader,
118+
chunk,
119+
]));
120+
}
121+
122+
callback();
123+
}
124+
}
125+
126+
module.exports = ChunkStreamEncoder;

lib/ChunkStream/ChunkStreamFilter.js

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
const { Transform } = require('stream');
2+
3+
class ChunkStreamFilter extends Transform {
4+
constructor (chunkStreamId) {
5+
super({
6+
'objectMode': true,
7+
});
8+
9+
this.chunkStreamId = chunkStreamId;
10+
}
11+
12+
_transform (chunk, encoding, callback) {
13+
if (chunk.chunkStreamId === this.chunkStreamId) {
14+
this.push(chunk);
15+
}
16+
callback();
17+
}
18+
}
19+
20+
module.exports = ChunkStreamFilter;

0 commit comments

Comments
 (0)