diff --git a/streams/buffer.ts b/streams/buffer.ts index a84f452b06df..ba0c59fa9169 100644 --- a/streams/buffer.ts +++ b/streams/buffer.ts @@ -24,9 +24,16 @@ const DEFAULT_CHUNK_SIZE = 16_640; export class Buffer { #buf: Uint8Array; // contents are the bytes buf[off : len(buf)] #off = 0; // read at buf[off], write at buf[buf.byteLength] + #startedPromise = Promise.withResolvers(); + #startedBool = false; #readable: ReadableStream = new ReadableStream({ type: "bytes", - pull: (controller) => { + pull: async (controller) => { + if (!this.#startedBool) { + await this.#startedPromise.promise; + this.#startedBool = true; + } + const view = new Uint8Array(controller.byobRequest!.view!.buffer); if (this.empty()) { // Buffer is empty, reset to recover space. @@ -37,7 +44,9 @@ export class Buffer { } const nread = copy(this.#buf.subarray(this.#off), view); this.#off += nread; - controller.byobRequest!.respond(nread); + if (nread !== 0) { + controller.byobRequest!.respond(nread); + } }, autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, }); @@ -51,6 +60,7 @@ export class Buffer { write: (chunk) => { const m = this.#grow(chunk.byteLength); copy(chunk, this.#buf, m); + this.#startedPromise.resolve(undefined); }, }); @@ -61,7 +71,13 @@ export class Buffer { /** Constructs a new instance. */ constructor(ab?: ArrayBufferLike | ArrayLike) { - this.#buf = ab === undefined ? new Uint8Array(0) : new Uint8Array(ab); + if (ab === undefined) { + this.#buf = new Uint8Array(0); + } else { + this.#buf = new Uint8Array(ab); + this.#startedBool = true; + this.#startedPromise.resolve(undefined); + } } /** Returns a slice holding the unread portion of the buffer.