Skip to content

streams: implement min option for ReadableStreamBYOBReader.read(view) #50888

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jan 4, 2024
Merged
15 changes: 12 additions & 3 deletions doc/api/webstreams.md
Original file line number Diff line number Diff line change
@@ -492,7 +492,7 @@ added: v16.5.0
-->

* Returns: A promise fulfilled with an object:
* `value` {ArrayBuffer}
* `value` {any}
* `done` {boolean}

Requests the next chunk of data from the underlying {ReadableStream}
@@ -617,15 +617,24 @@ added: v16.5.0
{ReadableStream} is closed or rejected if the stream errors or the reader's
lock is released before the stream finishes closing.

#### `readableStreamBYOBReader.read(view)`
#### `readableStreamBYOBReader.read(view[, options])`

<!-- YAML
added: v16.5.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/50888
description: Added `min` option.
-->

* `view` {Buffer|TypedArray|DataView}
* `options` {Object}
* `min` {number} When set, the returned promise will only be
fulfilled as soon as `min` number of elements are available.
When not set, the promise fulfills when at least one element
is available.
* Returns: A promise fulfilled with an object:
* `value` {ArrayBuffer}
* `value` {TypedArray|DataView}
* `done` {boolean}

Requests the next chunk of data from the underlying {ReadableStream}
8 changes: 1 addition & 7 deletions lib/internal/encoding.js
Original file line number Diff line number Diff line change
@@ -47,9 +47,7 @@ const {
const {
validateString,
validateObject,
kValidateObjectAllowNullable,
kValidateObjectAllowArray,
kValidateObjectAllowFunction,
kValidateObjectAllowObjectsAndNull,
} = require('internal/validators');
const binding = internalBinding('encoding_binding');
const {
@@ -393,10 +391,6 @@ const TextDecoder =
makeTextDecoderICU() :
makeTextDecoderJS();

const kValidateObjectAllowObjectsAndNull = kValidateObjectAllowNullable |
kValidateObjectAllowArray |
kValidateObjectAllowFunction;

function makeTextDecoderICU() {
const {
decode: _decode,
7 changes: 7 additions & 0 deletions lib/internal/validators.js
Original file line number Diff line number Diff line change
@@ -222,6 +222,11 @@ const kValidateObjectNone = 0;
const kValidateObjectAllowNullable = 1 << 0;
const kValidateObjectAllowArray = 1 << 1;
const kValidateObjectAllowFunction = 1 << 2;
const kValidateObjectAllowObjects = kValidateObjectAllowArray |
kValidateObjectAllowFunction;
const kValidateObjectAllowObjectsAndNull = kValidateObjectAllowNullable |
kValidateObjectAllowArray |
kValidateObjectAllowFunction;

/**
* @callback validateObject
@@ -583,6 +588,8 @@ module.exports = {
kValidateObjectAllowNullable,
kValidateObjectAllowArray,
kValidateObjectAllowFunction,
kValidateObjectAllowObjects,
kValidateObjectAllowObjectsAndNull,
validateOneOf,
validatePlainFunction,
validatePort,
107 changes: 67 additions & 40 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ const {
SymbolAsyncIterator,
SymbolDispose,
SymbolToStringTag,
TypedArrayPrototypeGetLength,
Uint8Array,
} = primordials;

@@ -33,6 +34,7 @@ const {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_STATE,
ERR_INVALID_THIS,
ERR_OUT_OF_RANGE,
},
} = require('internal/errors');

@@ -58,8 +60,8 @@ const {
validateAbortSignal,
validateBuffer,
validateObject,
kValidateObjectAllowNullable,
kValidateObjectAllowFunction,
kValidateObjectAllowObjects,
kValidateObjectAllowObjectsAndNull,
} = require('internal/validators');

const {
@@ -246,10 +248,10 @@ class ReadableStream {
* @param {UnderlyingSource} [source]
* @param {QueuingStrategy} [strategy]
*/
constructor(source = {}, strategy = kEmptyObject) {
constructor(source = kEmptyObject, strategy = kEmptyObject) {
markTransferMode(this, false, true);
if (source === null)
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
validateObject(source, 'source', kValidateObjectAllowObjects);
validateObject(strategy, 'strategy', kValidateObjectAllowObjectsAndNull);
this[kState] = createReadableStreamState();

this[kIsClosedPromise] = createDeferredPromise();
@@ -332,7 +334,7 @@ class ReadableStream {
getReader(options = kEmptyObject) {
if (!isReadableStream(this))
throw new ERR_INVALID_THIS('ReadableStream');
validateObject(options, 'options', kValidateObjectAllowNullable | kValidateObjectAllowFunction);
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
const mode = options?.mode;

if (mode === undefined)
@@ -370,6 +372,7 @@ class ReadableStream {

// The web platform tests require that these be handled one at a
// time and in a specific order. options can be null or undefined.
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
const preventAbort = options?.preventAbort;
const preventCancel = options?.preventCancel;
const preventClose = options?.preventClose;
@@ -412,6 +415,7 @@ class ReadableStream {
destination);
}

validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
const preventAbort = options?.preventAbort;
const preventCancel = options?.preventCancel;
const preventClose = options?.preventClose;
@@ -456,10 +460,8 @@ class ReadableStream {
values(options = kEmptyObject) {
if (!isReadableStream(this))
throw new ERR_INVALID_THIS('ReadableStream');
validateObject(options, 'options');
const {
preventCancel = false,
} = options;
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
const preventCancel = !!(options?.preventCancel);

// eslint-disable-next-line no-use-before-define
const reader = new ReadableStreamDefaultReader(this);
@@ -931,47 +933,62 @@ class ReadableStreamBYOBReader {

/**
* @param {ArrayBufferView} view
* @param {{
* min? : number
* }} [options]
* @returns {Promise<{
* view : ArrayBufferView,
* value : ArrayBufferView,
* done : boolean,
* }>}
*/
read(view) {
async read(view, options = kEmptyObject) {
if (!isReadableStreamBYOBReader(this))
return PromiseReject(new ERR_INVALID_THIS('ReadableStreamBYOBReader'));
throw new ERR_INVALID_THIS('ReadableStreamBYOBReader');
if (!isArrayBufferView(view)) {
return PromiseReject(
new ERR_INVALID_ARG_TYPE(
'view',
[
'Buffer',
'TypedArray',
'DataView',
],
view));
throw new ERR_INVALID_ARG_TYPE(
'view',
[
'Buffer',
'TypedArray',
'DataView',
],
view,
);
}
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);

const viewByteLength = ArrayBufferViewGetByteLength(view);
const viewBuffer = ArrayBufferViewGetBuffer(view);
const viewBufferByteLength = ArrayBufferPrototypeGetByteLength(viewBuffer);

if (viewByteLength === 0 || viewBufferByteLength === 0) {
return PromiseReject(
new ERR_INVALID_STATE.TypeError(
'View or Viewed ArrayBuffer is zero-length or detached',
),
);
throw new ERR_INVALID_STATE.TypeError(
'View or Viewed ArrayBuffer is zero-length or detached');
}

// Supposed to assert here that the view's buffer is not
// detached, but there's no API available to use to check that.

const min = options?.min ?? 1;
if (typeof min !== 'number')
throw new ERR_INVALID_ARG_TYPE('options.min', 'number', min);
if (!NumberIsInteger(min))
throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be an integer');
if (min <= 0)
throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be greater than 0');
if (!isDataView(view)) {
if (min > TypedArrayPrototypeGetLength(view)) {
throw new ERR_OUT_OF_RANGE('options.min', '<= view.length', min);
}
} else if (min > viewByteLength) {
throw new ERR_OUT_OF_RANGE('options.min', '<= view.byteLength', min);
}

if (this[kState].stream === undefined) {
return PromiseReject(
new ERR_INVALID_STATE.TypeError(
'The reader is not attached to a stream'));
throw new ERR_INVALID_STATE.TypeError('The reader is not attached to a stream');
}
const readIntoRequest = new ReadIntoRequest();
readableStreamBYOBReaderRead(this, view, readIntoRequest);
readableStreamBYOBReaderRead(this, view, min, readIntoRequest);
return readIntoRequest.promise;
}

@@ -1885,7 +1902,7 @@ function readableByteStreamTee(stream) {
reading = false;
},
};
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
readableStreamBYOBReaderRead(reader, view, 1, readIntoRequest);
}

function pull1Algorithm() {
@@ -2212,7 +2229,7 @@ function readableStreamReaderGenericRelease(reader) {
reader[kState].stream = undefined;
}

function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
function readableStreamBYOBReaderRead(reader, view, min, readIntoRequest) {
const {
stream,
} = reader[kState];
@@ -2225,6 +2242,7 @@ function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
readableByteStreamControllerPullInto(
stream[kState].controller,
view,
min,
readIntoRequest);
}

@@ -2497,7 +2515,7 @@ function readableByteStreamControllerClose(controller) {

if (pendingPullIntos.length) {
const firstPendingPullInto = pendingPullIntos[0];
if (firstPendingPullInto.bytesFilled > 0) {
if (firstPendingPullInto.bytesFilled % firstPendingPullInto.elementSize !== 0) {
const error = new ERR_INVALID_STATE.TypeError('Partial read');
readableByteStreamControllerError(controller, error);
throw error;
@@ -2514,7 +2532,7 @@ function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {

let done = false;
if (stream[kState].state === 'closed') {
desc.bytesFilled = 0;
assert(desc.bytesFilled % desc.elementSize === 0);
done = true;
}

@@ -2603,6 +2621,7 @@ function readableByteStreamControllerHandleQueueDrain(controller) {
function readableByteStreamControllerPullInto(
controller,
view,
min,
readIntoRequest) {
const {
closeRequested,
@@ -2615,6 +2634,11 @@ function readableByteStreamControllerPullInto(
elementSize = view.constructor.BYTES_PER_ELEMENT;
ctor = view.constructor;
}

const minimumFill = min * elementSize;
assert(minimumFill >= elementSize && minimumFill <= view.byteLength);
assert(minimumFill % elementSize === 0);

const buffer = ArrayBufferViewGetBuffer(view);
const byteOffset = ArrayBufferViewGetByteOffset(view);
const byteLength = ArrayBufferViewGetByteLength(view);
@@ -2633,6 +2657,7 @@ function readableByteStreamControllerPullInto(
byteOffset,
byteLength,
bytesFilled: 0,
minimumFill,
elementSize,
ctor,
type: 'byob',
@@ -2720,7 +2745,7 @@ function readableByteStreamControllerRespond(controller, bytesWritten) {
}

function readableByteStreamControllerRespondInClosedState(controller, desc) {
assert(!desc.bytesFilled);
assert(desc.bytesFilled % desc.elementSize === 0);
if (desc.type === 'none') {
readableByteStreamControllerShiftPendingPullInto(controller);
}
@@ -2897,17 +2922,18 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
byteLength,
byteOffset,
bytesFilled,
minimumFill,
elementSize,
} = desc;
const currentAlignedBytes = bytesFilled - (bytesFilled % elementSize);
const maxBytesToCopy = MathMin(
controller[kState].queueTotalSize,
byteLength - bytesFilled);
const maxBytesFilled = bytesFilled + maxBytesToCopy;
const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
let totalBytesToCopyRemaining = maxBytesToCopy;
let ready = false;
if (maxAlignedBytes > currentAlignedBytes) {
assert(bytesFilled < minimumFill);
if (maxAlignedBytes >= minimumFill) {
totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled;
ready = true;
}
@@ -2950,7 +2976,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
if (!ready) {
assert(!controller[kState].queueTotalSize);
assert(desc.bytesFilled > 0);
assert(desc.bytesFilled < elementSize);
assert(desc.bytesFilled < minimumFill);
}
return ready;
}
@@ -3006,7 +3032,7 @@ function readableByteStreamControllerRespondInReadableState(
return;
}

if (desc.bytesFilled < desc.elementSize)
if (desc.bytesFilled < desc.minimumFill)
return;

readableByteStreamControllerShiftPendingPullInto(controller);
@@ -3191,6 +3217,7 @@ function readableByteStreamControllerPullSteps(controller, readRequest) {
byteOffset: 0,
byteLength: autoAllocateChunkSize,
bytesFilled: 0,
minimumFill: 1,
elementSize: 1,
ctor: Uint8Array,
type: 'default',
11 changes: 10 additions & 1 deletion lib/internal/webstreams/transformstream.js
Original file line number Diff line number Diff line change
@@ -29,6 +29,12 @@ const {
kEnumerableProperty,
} = require('internal/util');

const {
validateObject,
kValidateObjectAllowObjects,
kValidateObjectAllowObjectsAndNull,
} = require('internal/validators');

const {
kDeserialize,
kTransfer,
@@ -119,10 +125,13 @@ class TransformStream {
* @param {QueuingStrategy} [readableStrategy]
*/
constructor(
transformer = null,
transformer = kEmptyObject,
writableStrategy = kEmptyObject,
readableStrategy = kEmptyObject) {
markTransferMode(this, false, true);
validateObject(transformer, 'transformer', kValidateObjectAllowObjects);
validateObject(writableStrategy, 'writableStrategy', kValidateObjectAllowObjectsAndNull);
validateObject(readableStrategy, 'readableStrategy', kValidateObjectAllowObjectsAndNull);
const readableType = transformer?.readableType;
const writableType = transformer?.writableType;
const start = transformer?.start;
Loading