Skip to content

fix(server): do not wait for response.write for response.end #2383

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .changeset/whole-planes-mate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
'@whatwg-node/node-fetch': patch
'@whatwg-node/fetch': patch
'@whatwg-node/server': patch
---

Some implementations like `compression` npm package do not implement `response.write(data, callback)` signature, but whatwg-node/server waits for it to finish the response stream.
Then it causes the response stream hangs when the compression package takes the stream over when the response data is larger than its threshold.

It is actually a bug in `compression` package;
[expressjs/compression#46](https://github.com/expressjs/compression/issues/46)
But since it is a common mistake, we prefer to workaround this on our end.

Now after calling `response.write`, it no longer uses callback but first it checks the result;

if it is `true`, it means stream is drained and we can call `response.end` immediately.
else if it is `false`, it means the stream is not drained yet, so we can wait for the `drain` event to call `response.end`.
97 changes: 55 additions & 42 deletions packages/node-fetch/src/FormData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,55 +94,68 @@ export function getStreamFromFormData(
formData: FormData,
boundary = '---',
): PonyfillReadableStream<Uint8Array> {
const entries: [string, string | PonyfillFile][] = [];
let entriesIterator: FormDataIterator<[string, FormDataEntryValue]>;
let sentInitialHeader = false;
return new PonyfillReadableStream<Buffer>({
start: controller => {
formData.forEach((value, key) => {
if (!sentInitialHeader) {
controller.enqueue(Buffer.from(`--${boundary}\r\n`));
sentInitialHeader = true;
let currentAsyncIterator: AsyncIterator<Uint8Array> | undefined;
let hasBefore = false;
function handleNextEntry(controller: ReadableStreamController<Buffer>) {
const { done, value } = entriesIterator.next();
if (done) {
controller.enqueue(Buffer.from(`\r\n--${boundary}--\r\n`));
return controller.close();
}
if (hasBefore) {
controller.enqueue(Buffer.from(`\r\n--${boundary}\r\n`));
}
if (value) {
const [key, blobOrString] = value;
if (typeof blobOrString === 'string') {
controller.enqueue(Buffer.from(`Content-Disposition: form-data; name="${key}"\r\n\r\n`));
controller.enqueue(Buffer.from(blobOrString));
} else {
let filenamePart = '';
if (blobOrString.name) {
filenamePart = `; filename="${blobOrString.name}"`;
}
entries.push([key, value as any]);
});
if (!sentInitialHeader) {
controller.enqueue(Buffer.from(`--${boundary}--\r\n`));
controller.close();
controller.enqueue(
Buffer.from(`Content-Disposition: form-data; name="${key}"${filenamePart}\r\n`),
);
controller.enqueue(
Buffer.from(`Content-Type: ${blobOrString.type || 'application/octet-stream'}\r\n\r\n`),
);
const entryStream = blobOrString.stream();
// @ts-expect-error - ReadableStream is async iterable
currentAsyncIterator = entryStream[Symbol.asyncIterator]();
}
hasBefore = true;
}
}
return new PonyfillReadableStream<Buffer>({
start: () => {
entriesIterator = formData.entries();
},
pull: async controller => {
const entry = entries.shift();
if (entry) {
const [key, value] = entry;
if (typeof value === 'string') {
controller.enqueue(Buffer.from(`Content-Disposition: form-data; name="${key}"\r\n\r\n`));
controller.enqueue(Buffer.from(value));
} else {
let filenamePart = '';
if (value.name) {
filenamePart = `; filename="${value.name}"`;
pull: controller => {
if (!sentInitialHeader) {
sentInitialHeader = true;
return controller.enqueue(Buffer.from(`--${boundary}\r\n`));
}
if (currentAsyncIterator) {
return currentAsyncIterator.next().then(({ done, value }) => {
if (done) {
currentAsyncIterator = undefined;
}
controller.enqueue(
Buffer.from(`Content-Disposition: form-data; name="${key}"${filenamePart}\r\n`),
);
controller.enqueue(
Buffer.from(`Content-Type: ${value.type || 'application/octet-stream'}\r\n\r\n`),
);
const entryStream = value.stream();
for await (const chunk of entryStream) {
controller.enqueue(chunk);
if (value) {
return controller.enqueue(value);
} else {
Comment on lines +147 to +149
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add type checking for stream values

After fixing the currentAsyncIterator type, we should ensure that the values being enqueued are of the expected type.

           if (value) {
-            return controller.enqueue(value);
+            return controller.enqueue(Buffer.isBuffer(value) ? value : Buffer.from(value));
           } else {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (value) {
return controller.enqueue(value);
} else {
if (value) {
return controller.enqueue(Buffer.isBuffer(value)
? value
: Buffer.from(value));
} else {
🤖 Prompt for AI Agents (early access)
In packages/node-fetch/src/FormData.ts around lines 146 to 148, the code
enqueues a value without verifying its type. To fix this, add a type check
before enqueuing to ensure the value matches the expected stream type. This
prevents runtime errors by only enqueuing valid stream values.

return handleNextEntry(controller);
}
}
if (entries.length === 0) {
controller.enqueue(Buffer.from(`\r\n--${boundary}--\r\n`));
controller.close();
} else {
controller.enqueue(Buffer.from(`\r\n--${boundary}\r\n`));
}
} else {
controller.enqueue(Buffer.from(`\r\n--${boundary}--\r\n`));
controller.close();
});
}
return handleNextEntry(controller);
},
cancel: err => {
entriesIterator?.return?.(err);
currentAsyncIterator?.return?.(err);
},
});
}
Expand Down
3 changes: 2 additions & 1 deletion packages/node-fetch/src/TransformStream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Transform } from 'node:stream';
import { PonyfillReadableStream } from './ReadableStream.js';
import { endStream } from './utils.js';
import { PonyfillWritableStream } from './WritableStream.js';

export class PonyfillTransformStream<I = any, O = any> implements TransformStream<I, O> {
Expand All @@ -19,7 +20,7 @@ export class PonyfillTransformStream<I = any, O = any> implements TransformStrea
transform.destroy(reason);
},
terminate() {
transform.end();
endStream(transform);
},
get desiredSize() {
return transform.writableLength;
Expand Down
50 changes: 13 additions & 37 deletions packages/node-fetch/src/WritableStream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Writable } from 'node:stream';
import { fakePromise } from './utils.js';
import { fakeRejectPromise } from '@whatwg-node/promise-helpers';
import { endStream, fakePromise, safeWrite } from './utils.js';

export class PonyfillWritableStream<W = any> implements WritableStream<W> {
writable: Writable;
Expand Down Expand Up @@ -78,36 +79,20 @@ export class PonyfillWritableStream<W = any> implements WritableStream<W> {
// no-op
},
write(chunk: W) {
const promise = fakePromise();
if (chunk == null) {
return fakePromise();
return promise;
}
return new Promise<void>((resolve, reject) => {
writable.write(chunk, (err: Error | null | undefined) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
return promise.then(() => safeWrite(chunk, writable));
},
close() {
if (!writable.errored && writable.closed) {
return fakePromise();
}
return new Promise<void>((resolve, reject) => {
if (writable.errored) {
reject(writable.errored);
} else {
writable.end((err: Error | null) => {
if (err) {
reject(err);
} else {
resolve();
}
});
}
});
if (writable.errored) {
return fakeRejectPromise(writable.errored);
}
return fakePromise().then(() => endStream(writable));
},
abort(reason) {
return new Promise<void>(resolve => {
Expand All @@ -122,19 +107,10 @@ export class PonyfillWritableStream<W = any> implements WritableStream<W> {
if (!this.writable.errored && this.writable.closed) {
return fakePromise();
}
return new Promise<void>((resolve, reject) => {
if (this.writable.errored) {
reject(this.writable.errored);
} else {
this.writable.end((err: Error | null) => {
if (err) {
reject(err);
} else {
resolve();
}
});
}
});
if (this.writable.errored) {
return fakeRejectPromise(this.writable.errored);
}
return fakePromise().then(() => endStream(this.writable));
}

abort(reason: any): Promise<void> {
Expand Down
16 changes: 10 additions & 6 deletions packages/node-fetch/src/fetchNodeHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ import { request as httpRequest, STATUS_CODES } from 'node:http';
import { request as httpsRequest } from 'node:https';
import { PassThrough, Readable } from 'node:stream';
import { createBrotliDecompress, createGunzip, createInflate, createInflateRaw } from 'node:zlib';
import { handleMaybePromise } from '@whatwg-node/promise-helpers';
import { PonyfillRequest } from './Request.js';
import { PonyfillResponse } from './Response.js';
import { PonyfillURL } from './URL.js';
import {
endStream,
getHeadersObj,
isNodeReadable,
safeWrite,
shouldRedirect,
wrapIncomingMessageWithPassthrough,
} from './utils.js';
Expand Down Expand Up @@ -56,6 +59,7 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
});
}

nodeRequest.once('error', reject);
nodeRequest.once('response', nodeResponse => {
let outputStream: PassThrough | undefined;
const contentEncoding = nodeResponse.headers['content-encoding'];
Expand Down Expand Up @@ -125,12 +129,13 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
});
resolve(ponyfillResponse);
});
nodeRequest.once('error', reject);

if (fetchRequest['_buffer'] != null) {
nodeRequest.write(fetchRequest['_buffer']);
// @ts-expect-error Avoid arguments adaptor trampoline https://v8.dev/blog/adaptor-frame
nodeRequest.end(null, null, null);
handleMaybePromise(
() => safeWrite(fetchRequest['_buffer'], nodeRequest),
() => endStream(nodeRequest),
reject,
);
} else {
const nodeReadable = (
fetchRequest.body != null
Expand All @@ -142,8 +147,7 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
if (nodeReadable) {
nodeReadable.pipe(nodeRequest);
} else {
// @ts-expect-error Avoid arguments adaptor trampoline https://v8.dev/blog/adaptor-frame
nodeRequest.end(null, null, null);
endStream(nodeRequest);
}
}
} catch (e) {
Expand Down
15 changes: 15 additions & 0 deletions packages/node-fetch/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,18 @@ export function wrapIncomingMessageWithPassthrough({
.catch(onError);
return passThrough;
}

export function endStream(stream: { end: () => void }) {
// @ts-expect-error Avoid arguments adaptor trampoline https://v8.dev/blog/adaptor-frame
return stream.end(null, null, null);
}

export function safeWrite(
chunk: any,
stream: { write: (chunk: any) => boolean; once: (event: string, listener: () => void) => void },
) {
const result = stream.write(chunk);
if (!result) {
return new Promise<void>(resolve => stream.once('drain', resolve));
}
}
67 changes: 35 additions & 32 deletions packages/promise-helpers/tests/handleMaybePromise.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,42 +64,45 @@ describe('promise-helpers', () => {
},
);

it.each(cases)('when fake value is falsy', ({ input, output }) => {
expect(
handleMaybePromise(
() => (input === 'fake' ? fakePromise(undefined) : undefined),
res => (output === 'fake' ? fakePromise(undefined) : res),
),
).toBe(undefined);
it.each(cases)(
'when fake value is falsy; input: $input output: $output',
({ input, output }) => {
expect(
handleMaybePromise(
() => (input === 'fake' ? fakePromise(undefined) : undefined),
res => (output === 'fake' ? fakePromise(undefined) : res),
),
).toBe(undefined);

expect(
handleMaybePromise(
() => (input === 'fake' ? fakePromise(null) : null),
res => (output === 'fake' ? fakePromise(null) : res),
),
).toBe(null);
expect(
handleMaybePromise(
() => (input === 'fake' ? fakePromise(null) : null),
res => (output === 'fake' ? fakePromise(null) : res),
),
).toBe(null);

expect(
handleMaybePromise(
() => (input === 'fake' ? fakePromise('') : ''),
res => (output === 'fake' ? fakePromise('') : res),
),
).toBe('');
expect(
handleMaybePromise(
() => (input === 'fake' ? fakePromise('') : ''),
res => (output === 'fake' ? fakePromise('') : res),
),
).toBe('');

expect(
handleMaybePromise(
() => (input === 'fake' ? fakePromise(false) : false),
res => (output === 'fake' ? fakePromise(false) : res),
),
).toBe(false);
expect(
handleMaybePromise(
() => (input === 'fake' ? fakePromise(false) : false),
res => (output === 'fake' ? fakePromise(false) : res),
),
).toBe(false);

expect(
handleMaybePromise(
() => (input === 'fake' ? fakePromise(0) : 0),
res => (output === 'fake' ? fakePromise(0) : res),
),
).toBe(0);
});
expect(
handleMaybePromise(
() => (input === 'fake' ? fakePromise(0) : 0),
res => (output === 'fake' ? fakePromise(0) : res),
),
).toBe(0);
},
);
});
describe('finally', () => {
describe('with promises', () => {
Expand Down
10 changes: 6 additions & 4 deletions packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@
"tslib": "^2.6.3"
},
"devDependencies": {
"@hapi/hapi": "^21.3.12",
"@hapi/hapi": "21.4.0",
"@types/compression": "1.7.5",
"@types/express": "5.0.1",
"@types/koa": "^2.15.0",
"@types/koa": "2.15.0",
"@types/node": "22.15.17",
"compression": "1.8.0",
"express": "5.1.0",
"fastify": "5.3.2",
"form-data": "^4.0.2",
"koa": "^3.0.0",
"form-data": "4.0.2",
"koa": "3.0.0",
"react": "19.1.0",
"react-dom": "19.1.0"
},
Expand Down
Loading
Loading