Skip to content

Commit 16f312c

Browse files
authored
use whatwg streams (#103)
* use whatwg streams * could not use optional changing in v12 * could not use fs/promise in v12
1 parent 5ba554e commit 16f312c

File tree

5 files changed

+89
-75
lines changed

5 files changed

+89
-75
lines changed

Diff for: README.md

+6-48
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@ npm install fetch-blob
2222
- internal Buffer.from was replaced with TextEncoder/Decoder
2323
- internal buffers was replaced with Uint8Arrays
2424
- CommonJS was replaced with ESM
25-
- The node stream returned by calling `blob.stream()` was replaced with a simple generator function that yields Uint8Array (Breaking change)
26-
(Read "Differences from other blobs" for more info.)
27-
28-
All of this changes have made it dependency free of any core node modules, so it would be possible to just import it using http-import from a CDN without any bundling
25+
- The node stream returned by calling `blob.stream()` was replaced with whatwg streams
26+
- (Read "Differences from other blobs" for more info.)
2927

3028
</details>
3129

@@ -36,48 +34,12 @@ npm install fetch-blob
3634
- This blob version is more arbitrary, it can be constructed with blob parts that isn't a instance of itself
3735
it has to look and behave as a blob to be accepted as a blob part.
3836
- The benefit of this is that you can create other types of blobs that don't contain any internal data that has to be read in other ways, such as the `BlobDataItem` created in `from.js` that wraps a file path into a blob-like item and read lazily (nodejs plans to [implement this][fs-blobs] as well)
39-
- The `blob.stream()` is the most noticeable differences. It returns a AsyncGeneratorFunction that yields Uint8Arrays
40-
41-
The reasoning behind `Blob.prototype.stream()` is that NodeJS readable stream
42-
isn't spec compatible with whatwg streams and we didn't want to import the hole whatwg stream polyfill for node
43-
or browserify NodeJS streams for the browsers and picking any flavor over the other. So we decided to opted out
44-
of any stream and just implement the bear minium of what both streams have in common which is the asyncIterator
45-
that both yields Uint8Array. this is the most isomorphic way with the use of `for-await-of` loops.
46-
It would be redundant to convert anything to whatwg streams and than convert it back to
47-
node streams since you work inside of Node.
48-
It will probably stay like this until nodejs get native support for whatwg<sup>[1][https://github.com/nodejs/whatwg-stream]</sup> streams and whatwg stream add the node
49-
equivalent for `Readable.from(iterable)`<sup>[2](https://github.com/whatwg/streams/issues/1018)</sup>
50-
51-
But for now if you really need a Node Stream then you can do so using this transformation
37+
- The `blob.stream()` is the most noticeable differences. It returns a WHATWG stream now. to keep it as a node stream you would have to do:
38+
5239
```js
5340
import {Readable} from 'stream'
5441
const stream = Readable.from(blob.stream())
5542
```
56-
But if you don't need it to be a stream then you can just use the asyncIterator part of it that is isomorphic.
57-
```js
58-
for await (const chunk of blob.stream()) {
59-
console.log(chunk) // uInt8Array
60-
}
61-
```
62-
If you need to make some feature detection to fix this different behavior
63-
```js
64-
if (Blob.prototype.stream?.constructor?.name === 'AsyncGeneratorFunction') {
65-
// not spec compatible, monkey patch it...
66-
// (Alternative you could extend the Blob and use super.stream())
67-
let orig = Blob.prototype.stream
68-
Blob.prototype.stream = function () {
69-
const iterator = orig.call(this)
70-
return new ReadableStream({
71-
async pull (ctrl) {
72-
const next = await iterator.next()
73-
return next.done ? ctrl.close() : ctrl.enqueue(next.value)
74-
}
75-
})
76-
}
77-
}
78-
```
79-
Possible feature whatwg version: `ReadableStream.from(iterator)`
80-
It's also possible to delete this method and instead use `.slice()` and `.arrayBuffer()` since it has both a public and private stream method
8143
</details>
8244

8345
## Usage
@@ -100,12 +62,8 @@ const blob = new Blob(['hello, world'])
10062
await blob.text()
10163
await blob.arrayBuffer()
10264
for await (let chunk of blob.stream()) { ... }
103-
104-
// turn the async iterator into a node stream
105-
stream.Readable.from(blob.stream())
106-
107-
// turn the async iterator into a whatwg stream (feature)
108-
globalThis.ReadableStream.from(blob.stream())
65+
blob.stream().getReader().read()
66+
blob.stream().getReader({mode: 'byob'}).read(view)
10967
```
11068

11169
### Blob part backed up by filesystem

Diff for: from.js

+7-8
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
import {statSync, createReadStream} from 'fs';
2-
import {stat} from 'fs/promises';
1+
import {statSync, createReadStream, promises as fs} from 'fs';
32
import {basename} from 'path';
43
import File from './file.js';
54
import Blob from './index.js';
65
import {MessageChannel} from 'worker_threads';
76

7+
const {stat} = fs;
8+
89
const DOMException = globalThis.DOMException || (() => {
910
const port = new MessageChannel().port1
1011
const ab = new ArrayBuffer(0)
@@ -86,12 +87,10 @@ class BlobDataItem {
8687
if (mtimeMs > this.lastModified) {
8788
throw new DOMException('The requested file could not be read, typically due to permission problems that have occurred after a reference to a file was acquired.', 'NotReadableError');
8889
}
89-
if (this.size) {
90-
yield * createReadStream(this.#path, {
91-
start: this.#start,
92-
end: this.#start + this.size - 1
93-
});
94-
}
90+
yield * createReadStream(this.#path, {
91+
start: this.#start,
92+
end: this.#start + this.size - 1
93+
});
9594
}
9695

9796
get [Symbol.toStringTag]() {

Diff for: index.js

+63-15
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,43 @@
1+
2+
// TODO (jimmywarting): in the feature use conditional loading with top level await (requires 14.x)
3+
// Node has recently added whatwg stream into core, want to use that instead when it becomes avalible.
4+
5+
import * as stream from 'web-streams-polyfill/dist/ponyfill.es2018.js'
6+
7+
const ReadableStream = globalThis.ReadableStream || stream.ReadableStream
8+
const ByteLengthQueuingStrategy = globalThis.ByteLengthQueuingStrategy || stream.ReadableStream
9+
10+
/** @typedef {import('buffer').Blob} NodeBlob} */
11+
12+
// Fix buffer.Blob's missing stream implantation
13+
import('buffer').then(m => {
14+
if (m.Blob && !m.Blob.prototype.stream) {
15+
m.Blob.prototype.stream = function name(params) {
16+
let position = 0;
17+
const blob = this;
18+
const stratergy = new ByteLengthQueuingStrategy({ highWaterMark: POOL_SIZE });
19+
20+
return new ReadableStream({
21+
type: "bytes",
22+
async pull(ctrl) {
23+
const chunk = blob.slice(position, Math.min(blob.size, position + POOL_SIZE));
24+
const buffer = await chunk.arrayBuffer();
25+
position += buffer.byteLength;
26+
ctrl.enqueue(new Uint8Array(buffer))
27+
28+
if (position === blob.size) {
29+
ctrl.close()
30+
}
31+
}
32+
}, stratergy)
33+
}
34+
}
35+
}, () => {})
36+
137
// 64 KiB (same size chrome slice theirs blob into Uint8array's)
238
const POOL_SIZE = 65536;
339

4-
/** @param {(Blob | Uint8Array)[]} parts */
40+
/** @param {(Blob | NodeBlob | Uint8Array)[]} parts */
541
async function * toIterator (parts, clone = true) {
642
for (let part of parts) {
743
if ('stream' in part) {
@@ -20,6 +56,7 @@ async function * toIterator (parts, clone = true) {
2056
yield part;
2157
}
2258
} else {
59+
/* c8 ignore start */
2360
// For blobs that have arrayBuffer but no stream method (nodes buffer.Blob)
2461
let position = 0;
2562
while (position !== part.size) {
@@ -28,6 +65,7 @@ async function * toIterator (parts, clone = true) {
2865
position += buffer.byteLength;
2966
yield new Uint8Array(buffer);
3067
}
68+
/* c8 ignore end */
3169
}
3270
}
3371
}
@@ -116,6 +154,11 @@ export default class Blob {
116154
* @return {Promise<ArrayBuffer>}
117155
*/
118156
async arrayBuffer() {
157+
// Easier way... Just a unnecessary overhead
158+
// const view = new Uint8Array(this.size);
159+
// await this.stream().getReader({mode: 'byob'}).read(view);
160+
// return view.buffer;
161+
119162
const data = new Uint8Array(this.size);
120163
let offset = 0;
121164
for await (const chunk of toIterator(this.#parts, false)) {
@@ -126,14 +169,17 @@ export default class Blob {
126169
return data.buffer;
127170
}
128171

129-
/**
130-
* The Blob stream() implements partial support of the whatwg stream
131-
* by only being async iterable.
132-
*
133-
* @returns {AsyncGenerator<Uint8Array>}
134-
*/
135-
async * stream() {
136-
yield * toIterator(this.#parts, true);
172+
stream() {
173+
const it = toIterator(this.#parts, true);
174+
const stratergy = new ByteLengthQueuingStrategy({ highWaterMark: POOL_SIZE });
175+
176+
return new ReadableStream({
177+
type: "bytes",
178+
async pull(ctrl) {
179+
const chunk = await it.next();
180+
chunk.done ? ctrl.close() : ctrl.enqueue(chunk.value);
181+
}
182+
}, stratergy)
137183
}
138184

139185
/**
@@ -157,6 +203,11 @@ export default class Blob {
157203
let added = 0;
158204

159205
for (const part of parts) {
206+
// don't add the overflow to new blobParts
207+
if (added >= span) {
208+
break;
209+
}
210+
160211
const size = ArrayBuffer.isView(part) ? part.byteLength : part.size;
161212
if (relativeStart && size <= relativeStart) {
162213
// Skip the beginning and change the relative
@@ -174,11 +225,6 @@ export default class Blob {
174225
}
175226
blobParts.push(chunk);
176227
relativeStart = 0; // All next sequential parts should start at 0
177-
178-
// don't add the overflow to new blobParts
179-
if (added >= span) {
180-
break;
181-
}
182228
}
183229
}
184230

@@ -195,7 +241,9 @@ export default class Blob {
195241

196242
static [Symbol.hasInstance](object) {
197243
return (
198-
typeof object?.constructor === 'function' &&
244+
object &&
245+
typeof object === 'object' &&
246+
typeof object.constructor === 'function' &&
199247
(
200248
typeof object.stream === 'function' ||
201249
typeof object.arrayBuffer === 'function'

Diff for: package.json

+4-1
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,8 @@
7575
"type": "paypal",
7676
"url": "https://paypal.me/jimmywarting"
7777
}
78-
]
78+
],
79+
"dependencies": {
80+
"web-streams-polyfill": "^3.0.3"
81+
}
7982
}

Diff for: test.js

+9-3
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ test('blob part backed up by filesystem', async t => {
160160
test('Reading after modified should fail', async t => {
161161
const blob = blobFromSync('./LICENSE');
162162
await new Promise(resolve => {
163-
setTimeout(resolve, 100);
163+
setTimeout(resolve, 500);
164164
});
165165
fs.closeSync(fs.openSync('./LICENSE', 'a'));
166166
const error = await t.throwsAsync(blob.text());
@@ -174,7 +174,7 @@ test('Reading after modified should fail', async t => {
174174
// The lastModifiedDate is deprecated and removed from spec
175175
t.false('lastModifiedDate' in file);
176176
const mod = file.lastModified - Date.now();
177-
t.true(mod <= 0 && mod >= -100); // Close to tolerance: 0.100ms
177+
t.true(mod <= 0 && mod >= -500); // Close to tolerance: 0.500ms
178178
});
179179

180180
test('Reading file after modified should fail', async t => {
@@ -241,7 +241,7 @@ test('Parts are immutable', async t => {
241241
test('Blobs are immutable', async t => {
242242
const buf = new Uint8Array([97]);
243243
const blob = new Blob([buf]);
244-
const chunk = await blob.stream().next();
244+
const chunk = await blob.stream().getReader().read();
245245
t.is(chunk.value[0], 97);
246246
chunk.value[0] = 98;
247247
t.is(await blob.text(), 'a');
@@ -344,3 +344,9 @@ test('new File() throws with too few args', t => {
344344
message: 'Failed to construct \'File\': 2 arguments required, but only 0 present.'
345345
});
346346
});
347+
348+
test('can slice zero sized blobs', async t => {
349+
const blob = new Blob();
350+
const txt = await blob.slice(0, 0).text();
351+
t.is(txt, '');
352+
});

0 commit comments

Comments
 (0)