Skip to content

Commit 4d08211

Browse files
committedOct 2, 2021
Wait for both reads and writes to finish before finalizing pipe
1 parent 0ec648d commit 4d08211

File tree

1 file changed

+45
-19
lines changed

1 file changed

+45
-19
lines changed
 

‎reference-implementation/lib/abstract-ops/readable-streams.js

+45-19
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
141141

142142
let shuttingDown = false;
143143

144-
// This is used to keep track of the spec's requirement that we wait for ongoing writes during shutdown.
144+
// This is used to keep track of the spec's requirement that we wait for ongoing reads and writes during shutdown.
145+
let currentRead = promiseResolvedWith(undefined);
145146
let currentWrite = promiseResolvedWith(undefined);
146147

147148
return new Promise((resolve, reject) => {
@@ -200,21 +201,21 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
200201
}
201202

202203
return transformPromiseWith(writer._readyPromise, () => {
203-
return new Promise((resolveRead, rejectRead) => {
204+
currentRead = new Promise((resolveRead, rejectRead) => {
204205
ReadableStreamDefaultReaderRead(
205206
reader,
206207
{
207208
chunkSteps: chunk => {
208-
currentWrite = transformPromiseWith(
209-
WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => {}
210-
);
209+
currentWrite = WritableStreamDefaultWriterWrite(writer, chunk);
210+
setPromiseIsHandledToTrue(currentWrite);
211211
resolveRead(false);
212212
},
213213
closeSteps: () => resolveRead(true),
214214
errorSteps: rejectRead
215215
}
216216
);
217217
});
218+
return currentRead;
218219
});
219220
}
220221

@@ -259,13 +260,38 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
259260
setPromiseIsHandledToTrue(pipeLoop());
260261

261262
function waitForWritesToFinish() {
262-
// Another write may have started while we were waiting on this currentWrite, so we have to be sure to wait
263-
// for that too.
264-
const oldCurrentWrite = currentWrite;
265-
return transformPromiseWith(
266-
currentWrite,
267-
() => oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined
268-
);
263+
let oldCurrentWrite;
264+
return promiseResolvedWith(check());
265+
266+
function check() {
267+
// Another write may have started while we were waiting on this currentWrite,
268+
// so we have to be sure to wait for that too.
269+
if (oldCurrentWrite !== currentWrite) {
270+
oldCurrentWrite = currentWrite;
271+
return transformPromiseWith(currentWrite, check, check);
272+
}
273+
return undefined;
274+
}
275+
}
276+
277+
function waitForReadsAndWritesToFinish() {
278+
let oldCurrentRead;
279+
let oldCurrentWrite;
280+
return promiseResolvedWith(check());
281+
282+
function check() {
283+
// Another read or write may have started while we were waiting on this currentRead or currentWrite,
284+
// so we have to be sure to wait for that too.
285+
if (oldCurrentRead !== currentRead) {
286+
oldCurrentRead = currentRead;
287+
return transformPromiseWith(currentRead, check, check);
288+
}
289+
if (oldCurrentWrite !== currentWrite) {
290+
oldCurrentWrite = currentWrite;
291+
return transformPromiseWith(currentWrite, check, check);
292+
}
293+
return undefined;
294+
}
269295
}
270296

271297
function isOrBecomesErrored(stream, promise, action) {
@@ -299,8 +325,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
299325
function doTheRest() {
300326
uponPromise(
301327
action(),
302-
() => finalize(originalIsError, originalError),
303-
newError => finalize(true, newError)
328+
() => waitForReadsAndWritesThenFinalize(originalIsError, originalError),
329+
newError => waitForReadsAndWritesThenFinalize(true, newError)
304330
);
305331
}
306332
}
@@ -311,11 +337,11 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
311337
}
312338
shuttingDown = true;
313339

314-
if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) {
315-
uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error));
316-
} else {
317-
finalize(isError, error);
318-
}
340+
waitForReadsAndWritesThenFinalize(isError, error);
341+
}
342+
343+
function waitForReadsAndWritesThenFinalize(isError, error) {
344+
uponFulfillment(waitForReadsAndWritesToFinish(), () => finalize(isError, error));
319345
}
320346

321347
function finalize(isError, error) {

0 commit comments

Comments
 (0)