Skip to content

Commit 5b44ba1

Browse files
committed
[feature] Add support for Blob
Closes #2206
1 parent 0d1b5e6 commit 5b44ba1

9 files changed

+775
-93
lines changed

doc/ws.md

+11-7
Original file line numberDiff line numberDiff line change
@@ -466,10 +466,11 @@ does nothing if `type` is not one of `'close'`, `'error'`, `'message'`, or
466466
- {String}
467467

468468
A string indicating the type of binary data being transmitted by the connection.
469-
This should be one of "nodebuffer", "arraybuffer" or "fragments". Defaults to
470-
"nodebuffer". Type "fragments" will emit the array of fragments as received from
471-
the sender, without copyfull concatenation, which is useful for the performance
472-
of binary protocols transferring large messages with multiple fragments.
469+
This should be one of "nodebuffer", "arraybuffer", "blob", or "fragments".
470+
Defaults to "nodebuffer". Type "fragments" will emit the array of fragments as
471+
received from the sender, without copyfull concatenation, which is useful for
472+
the performance of binary protocols transferring large messages with multiple
473+
fragments.
473474

474475
### websocket.bufferedAmount
475476

@@ -538,7 +539,8 @@ is a noop if the ready state is `CONNECTING` or `CLOSED`.
538539

539540
### websocket.ping([data[, mask]][, callback])
540541

541-
- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The
542+
- `data`
543+
{Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The
542544
data to send in the ping frame.
543545
- `mask` {Boolean} Specifies whether `data` should be masked or not. Defaults to
544546
`true` when `websocket` is not a server client.
@@ -550,7 +552,8 @@ Send a ping. This method throws an error if the ready state is `CONNECTING`.
550552

551553
### websocket.pong([data[, mask]][, callback])
552554

553-
- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The
555+
- `data`
556+
{Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The
554557
data to send in the pong frame.
555558
- `mask` {Boolean} Specifies whether `data` should be masked or not. Defaults to
556559
`true` when `websocket` is not a server client.
@@ -588,7 +591,8 @@ only removes listeners added with
588591

589592
### websocket.send(data[, options][, callback])
590593

591-
- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The
594+
- `data`
595+
{Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The
592596
data to send. `Object` values are only supported if they conform to the
593597
requirements of [`Buffer.from()`][]. If those constraints are not met, a
594598
`TypeError` is thrown.

lib/constants.js

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
'use strict';
22

3+
const BINARY_TYPES = ['nodebuffer', 'arraybuffer', 'fragments'];
4+
const hasBlob = typeof Blob !== 'undefined';
5+
6+
if (hasBlob) BINARY_TYPES.push('blob');
7+
38
module.exports = {
4-
BINARY_TYPES: ['nodebuffer', 'arraybuffer', 'fragments'],
9+
BINARY_TYPES,
510
EMPTY_BUFFER: Buffer.alloc(0),
611
GUID: '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
12+
hasBlob,
713
kForOnEventAttribute: Symbol('kIsForOnEventAttribute'),
814
kListener: Symbol('kListener'),
915
kStatusCode: Symbol('status-code'),

lib/receiver.js

+2
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,8 @@ class Receiver extends Writable {
559559
data = concat(fragments, messageLength);
560560
} else if (this._binaryType === 'arraybuffer') {
561561
data = toArrayBuffer(concat(fragments, messageLength));
562+
} else if (this._binaryType === 'blob') {
563+
data = new Blob(fragments);
562564
} else {
563565
data = fragments;
564566
}

lib/sender.js

+151-46
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ const { Duplex } = require('stream');
66
const { randomFillSync } = require('crypto');
77

88
const PerMessageDeflate = require('./permessage-deflate');
9-
const { EMPTY_BUFFER } = require('./constants');
10-
const { isValidStatusCode } = require('./validation');
9+
const { EMPTY_BUFFER, kWebSocket, NOOP } = require('./constants');
10+
const { isBlob, isValidStatusCode } = require('./validation');
1111
const { mask: applyMask, toBuffer } = require('./buffer-util');
1212

1313
const kByteLength = Symbol('kByteLength');
@@ -16,6 +16,10 @@ const RANDOM_POOL_SIZE = 8 * 1024;
1616
let randomPool;
1717
let randomPoolPointer = RANDOM_POOL_SIZE;
1818

19+
const DEFAULT = 0;
20+
const DEFLATING = 1;
21+
const GET_BLOB_DATA = 2;
22+
1923
/**
2024
* HyBi Sender implementation.
2125
*/
@@ -42,8 +46,10 @@ class Sender {
4246
this._compress = false;
4347

4448
this._bufferedBytes = 0;
45-
this._deflating = false;
4649
this._queue = [];
50+
this._state = DEFAULT;
51+
this.onerror = NOOP;
52+
this[kWebSocket] = undefined;
4753
}
4854

4955
/**
@@ -210,7 +216,7 @@ class Sender {
210216
rsv1: false
211217
};
212218

213-
if (this._deflating) {
219+
if (this._state !== DEFAULT) {
214220
this.enqueue([this.dispatch, buf, false, options, cb]);
215221
} else {
216222
this.sendFrame(Sender.frame(buf, options), cb);
@@ -232,6 +238,9 @@ class Sender {
232238
if (typeof data === 'string') {
233239
byteLength = Buffer.byteLength(data);
234240
readOnly = false;
241+
} else if (isBlob(data)) {
242+
byteLength = data.size;
243+
readOnly = false;
235244
} else {
236245
data = toBuffer(data);
237246
byteLength = data.length;
@@ -253,7 +262,13 @@ class Sender {
253262
rsv1: false
254263
};
255264

256-
if (this._deflating) {
265+
if (isBlob(data)) {
266+
if (this._state !== DEFAULT) {
267+
this.enqueue([this.getBlobData, data, false, options, cb]);
268+
} else {
269+
this.getBlobData(data, false, options, cb);
270+
}
271+
} else if (this._state !== DEFAULT) {
257272
this.enqueue([this.dispatch, data, false, options, cb]);
258273
} else {
259274
this.sendFrame(Sender.frame(data, options), cb);
@@ -275,6 +290,9 @@ class Sender {
275290
if (typeof data === 'string') {
276291
byteLength = Buffer.byteLength(data);
277292
readOnly = false;
293+
} else if (isBlob(data)) {
294+
byteLength = data.size;
295+
readOnly = false;
278296
} else {
279297
data = toBuffer(data);
280298
byteLength = data.length;
@@ -296,7 +314,13 @@ class Sender {
296314
rsv1: false
297315
};
298316

299-
if (this._deflating) {
317+
if (isBlob(data)) {
318+
if (this._state !== DEFAULT) {
319+
this.enqueue([this.getBlobData, data, false, options, cb]);
320+
} else {
321+
this.getBlobData(data, false, options, cb);
322+
}
323+
} else if (this._state !== DEFAULT) {
300324
this.enqueue([this.dispatch, data, false, options, cb]);
301325
} else {
302326
this.sendFrame(Sender.frame(data, options), cb);
@@ -330,6 +354,9 @@ class Sender {
330354
if (typeof data === 'string') {
331355
byteLength = Buffer.byteLength(data);
332356
readOnly = false;
357+
} else if (isBlob(data)) {
358+
byteLength = data.size;
359+
readOnly = false;
333360
} else {
334361
data = toBuffer(data);
335362
byteLength = data.length;
@@ -357,40 +384,94 @@ class Sender {
357384

358385
if (options.fin) this._firstFragment = true;
359386

360-
if (perMessageDeflate) {
361-
const opts = {
362-
[kByteLength]: byteLength,
363-
fin: options.fin,
364-
generateMask: this._generateMask,
365-
mask: options.mask,
366-
maskBuffer: this._maskBuffer,
367-
opcode,
368-
readOnly,
369-
rsv1
370-
};
371-
372-
if (this._deflating) {
373-
this.enqueue([this.dispatch, data, this._compress, opts, cb]);
387+
const opts = {
388+
[kByteLength]: byteLength,
389+
fin: options.fin,
390+
generateMask: this._generateMask,
391+
mask: options.mask,
392+
maskBuffer: this._maskBuffer,
393+
opcode,
394+
readOnly,
395+
rsv1
396+
};
397+
398+
if (isBlob(data)) {
399+
if (this._state !== DEFAULT) {
400+
this.enqueue([this.getBlobData, data, this._compress, opts, cb]);
374401
} else {
375-
this.dispatch(data, this._compress, opts, cb);
402+
this.getBlobData(data, this._compress, opts, cb);
376403
}
404+
} else if (this._state !== DEFAULT) {
405+
this.enqueue([this.dispatch, data, this._compress, opts, cb]);
377406
} else {
378-
this.sendFrame(
379-
Sender.frame(data, {
380-
[kByteLength]: byteLength,
381-
fin: options.fin,
382-
generateMask: this._generateMask,
383-
mask: options.mask,
384-
maskBuffer: this._maskBuffer,
385-
opcode,
386-
readOnly,
387-
rsv1: false
388-
}),
389-
cb
390-
);
407+
this.dispatch(data, this._compress, opts, cb);
391408
}
392409
}
393410

411+
/**
412+
* Gets the contents of a blob as binary data.
413+
*
414+
* @param {Blob} blob The blob
415+
* @param {Boolean} [compress=false] Specifies whether or not to compress
416+
* the data
417+
* @param {Object} options Options object
418+
* @param {Boolean} [options.fin=false] Specifies whether or not to set the
419+
* FIN bit
420+
* @param {Function} [options.generateMask] The function used to generate the
421+
* masking key
422+
* @param {Boolean} [options.mask=false] Specifies whether or not to mask
423+
* `data`
424+
* @param {Buffer} [options.maskBuffer] The buffer used to store the masking
425+
* key
426+
* @param {Number} options.opcode The opcode
427+
* @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
428+
* modified
429+
* @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
430+
* RSV1 bit
431+
* @param {Function} [cb] Callback
432+
* @private
433+
*/
434+
getBlobData(blob, compress, options, cb) {
435+
this._bufferedBytes += options[kByteLength];
436+
this._state = GET_BLOB_DATA;
437+
438+
blob
439+
.arrayBuffer()
440+
.then((arrayBuffer) => {
441+
if (this._socket.destroyed) {
442+
const err = new Error(
443+
'The socket was closed while the blob was being read'
444+
);
445+
446+
//
447+
// `callCallbacks` is called in the next tick to ensure that errors
448+
// that might be thrown in the callbacks behave like errors thrown
449+
// outside the promise chain.
450+
//
451+
process.nextTick(callCallbacks, this, err, cb);
452+
return;
453+
}
454+
455+
this._bufferedBytes -= options[kByteLength];
456+
const data = toBuffer(arrayBuffer);
457+
458+
if (!compress) {
459+
this._state = DEFAULT;
460+
this.sendFrame(Sender.frame(data, options), cb);
461+
this.dequeue();
462+
} else {
463+
this.dispatch(data, compress, options, cb);
464+
}
465+
})
466+
.catch((err) => {
467+
//
468+
// `onError` is called in the next tick for the same reason that
469+
// `callCallbacks` above is.
470+
//
471+
process.nextTick(onError, this, err, cb);
472+
});
473+
}
474+
394475
/**
395476
* Dispatches a message.
396477
*
@@ -423,27 +504,19 @@ class Sender {
423504
const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
424505

425506
this._bufferedBytes += options[kByteLength];
426-
this._deflating = true;
507+
this._state = DEFLATING;
427508
perMessageDeflate.compress(data, options.fin, (_, buf) => {
428509
if (this._socket.destroyed) {
429510
const err = new Error(
430511
'The socket was closed while data was being compressed'
431512
);
432513

433-
if (typeof cb === 'function') cb(err);
434-
435-
for (let i = 0; i < this._queue.length; i++) {
436-
const params = this._queue[i];
437-
const callback = params[params.length - 1];
438-
439-
if (typeof callback === 'function') callback(err);
440-
}
441-
514+
callCallbacks(this, err, cb);
442515
return;
443516
}
444517

445518
this._bufferedBytes -= options[kByteLength];
446-
this._deflating = false;
519+
this._state = DEFAULT;
447520
options.readOnly = false;
448521
this.sendFrame(Sender.frame(buf, options), cb);
449522
this.dequeue();
@@ -456,7 +529,7 @@ class Sender {
456529
* @private
457530
*/
458531
dequeue() {
459-
while (!this._deflating && this._queue.length) {
532+
while (this._state === DEFAULT && this._queue.length) {
460533
const params = this._queue.shift();
461534

462535
this._bufferedBytes -= params[3][kByteLength];
@@ -495,3 +568,35 @@ class Sender {
495568
}
496569

497570
module.exports = Sender;
571+
572+
/**
573+
* Calls queued callbacks with an error.
574+
*
575+
* @param {Sender} sender The `Sender` instance
576+
* @param {Error} err The error to call the callbacks with
577+
* @param {Function} [cb] The first callback
578+
* @private
579+
*/
580+
function callCallbacks(sender, err, cb) {
581+
if (typeof cb === 'function') cb(err);
582+
583+
for (let i = 0; i < sender._queue.length; i++) {
584+
const params = sender._queue[i];
585+
const callback = params[params.length - 1];
586+
587+
if (typeof callback === 'function') callback(err);
588+
}
589+
}
590+
591+
/**
592+
* Handles a `Sender` error.
593+
*
594+
* @param {Sender} sender The `Sender` instance
595+
* @param {Error} err The error
596+
* @param {Function} [cb] The first pending callback
597+
* @private
598+
*/
599+
function onError(sender, err, cb) {
600+
callCallbacks(sender, err, cb);
601+
sender.onerror(err);
602+
}

0 commit comments

Comments
 (0)