Skip to content

Commit 9527e8f

Browse files
fix(server): do not wait for response.write for response.end (#2383)
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent 953c840 commit 9527e8f

14 files changed

+307
-191
lines changed

.changeset/whole-planes-mate.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
---
2+
'@whatwg-node/node-fetch': patch
3+
'@whatwg-node/fetch': patch
4+
'@whatwg-node/server': patch
5+
---
6+
7+
Some implementations like `compression` npm package do not implement `response.write(data, callback)` signature, but whatwg-node/server waits for it to finish the response stream.
8+
Then it causes the response stream hangs when the compression package takes the stream over when the response data is larger than its threshold.
9+
10+
It is actually a bug in `compression` package;
11+
[expressjs/compression#46](https://github.com/expressjs/compression/issues/46)
12+
But since it is a common mistake, we prefer to workaround this on our end.
13+
14+
Now after calling `response.write`, it no longer uses callback but first it checks the result;
15+
16+
if it is `true`, it means stream is drained and we can call `response.end` immediately.
17+
else if it is `false`, it means the stream is not drained yet, so we can wait for the `drain` event to call `response.end`.

packages/node-fetch/src/FormData.ts

Lines changed: 55 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -94,55 +94,68 @@ export function getStreamFromFormData(
9494
formData: FormData,
9595
boundary = '---',
9696
): PonyfillReadableStream<Uint8Array> {
97-
const entries: [string, string | PonyfillFile][] = [];
97+
let entriesIterator: FormDataIterator<[string, FormDataEntryValue]>;
9898
let sentInitialHeader = false;
99-
return new PonyfillReadableStream<Buffer>({
100-
start: controller => {
101-
formData.forEach((value, key) => {
102-
if (!sentInitialHeader) {
103-
controller.enqueue(Buffer.from(`--${boundary}\r\n`));
104-
sentInitialHeader = true;
99+
let currentAsyncIterator: AsyncIterator<Uint8Array> | undefined;
100+
let hasBefore = false;
101+
function handleNextEntry(controller: ReadableStreamController<Buffer>) {
102+
const { done, value } = entriesIterator.next();
103+
if (done) {
104+
controller.enqueue(Buffer.from(`\r\n--${boundary}--\r\n`));
105+
return controller.close();
106+
}
107+
if (hasBefore) {
108+
controller.enqueue(Buffer.from(`\r\n--${boundary}\r\n`));
109+
}
110+
if (value) {
111+
const [key, blobOrString] = value;
112+
if (typeof blobOrString === 'string') {
113+
controller.enqueue(Buffer.from(`Content-Disposition: form-data; name="${key}"\r\n\r\n`));
114+
controller.enqueue(Buffer.from(blobOrString));
115+
} else {
116+
let filenamePart = '';
117+
if (blobOrString.name) {
118+
filenamePart = `; filename="${blobOrString.name}"`;
105119
}
106-
entries.push([key, value as any]);
107-
});
108-
if (!sentInitialHeader) {
109-
controller.enqueue(Buffer.from(`--${boundary}--\r\n`));
110-
controller.close();
120+
controller.enqueue(
121+
Buffer.from(`Content-Disposition: form-data; name="${key}"${filenamePart}\r\n`),
122+
);
123+
controller.enqueue(
124+
Buffer.from(`Content-Type: ${blobOrString.type || 'application/octet-stream'}\r\n\r\n`),
125+
);
126+
const entryStream = blobOrString.stream();
127+
// @ts-expect-error - ReadableStream is async iterable
128+
currentAsyncIterator = entryStream[Symbol.asyncIterator]();
111129
}
130+
hasBefore = true;
131+
}
132+
}
133+
return new PonyfillReadableStream<Buffer>({
134+
start: () => {
135+
entriesIterator = formData.entries();
112136
},
113-
pull: async controller => {
114-
const entry = entries.shift();
115-
if (entry) {
116-
const [key, value] = entry;
117-
if (typeof value === 'string') {
118-
controller.enqueue(Buffer.from(`Content-Disposition: form-data; name="${key}"\r\n\r\n`));
119-
controller.enqueue(Buffer.from(value));
120-
} else {
121-
let filenamePart = '';
122-
if (value.name) {
123-
filenamePart = `; filename="${value.name}"`;
137+
pull: controller => {
138+
if (!sentInitialHeader) {
139+
sentInitialHeader = true;
140+
return controller.enqueue(Buffer.from(`--${boundary}\r\n`));
141+
}
142+
if (currentAsyncIterator) {
143+
return currentAsyncIterator.next().then(({ done, value }) => {
144+
if (done) {
145+
currentAsyncIterator = undefined;
124146
}
125-
controller.enqueue(
126-
Buffer.from(`Content-Disposition: form-data; name="${key}"${filenamePart}\r\n`),
127-
);
128-
controller.enqueue(
129-
Buffer.from(`Content-Type: ${value.type || 'application/octet-stream'}\r\n\r\n`),
130-
);
131-
const entryStream = value.stream();
132-
for await (const chunk of entryStream) {
133-
controller.enqueue(chunk);
147+
if (value) {
148+
return controller.enqueue(value);
149+
} else {
150+
return handleNextEntry(controller);
134151
}
135-
}
136-
if (entries.length === 0) {
137-
controller.enqueue(Buffer.from(`\r\n--${boundary}--\r\n`));
138-
controller.close();
139-
} else {
140-
controller.enqueue(Buffer.from(`\r\n--${boundary}\r\n`));
141-
}
142-
} else {
143-
controller.enqueue(Buffer.from(`\r\n--${boundary}--\r\n`));
144-
controller.close();
152+
});
145153
}
154+
return handleNextEntry(controller);
155+
},
156+
cancel: err => {
157+
entriesIterator?.return?.(err);
158+
currentAsyncIterator?.return?.(err);
146159
},
147160
});
148161
}

packages/node-fetch/src/TransformStream.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Transform } from 'node:stream';
22
import { PonyfillReadableStream } from './ReadableStream.js';
3+
import { endStream } from './utils.js';
34
import { PonyfillWritableStream } from './WritableStream.js';
45

56
export class PonyfillTransformStream<I = any, O = any> implements TransformStream<I, O> {
@@ -19,7 +20,7 @@ export class PonyfillTransformStream<I = any, O = any> implements TransformStrea
1920
transform.destroy(reason);
2021
},
2122
terminate() {
22-
transform.end();
23+
endStream(transform);
2324
},
2425
get desiredSize() {
2526
return transform.writableLength;

packages/node-fetch/src/WritableStream.ts

Lines changed: 13 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Writable } from 'node:stream';
2-
import { fakePromise } from './utils.js';
2+
import { fakeRejectPromise } from '@whatwg-node/promise-helpers';
3+
import { endStream, fakePromise, safeWrite } from './utils.js';
34

45
export class PonyfillWritableStream<W = any> implements WritableStream<W> {
56
writable: Writable;
@@ -78,36 +79,20 @@ export class PonyfillWritableStream<W = any> implements WritableStream<W> {
7879
// no-op
7980
},
8081
write(chunk: W) {
82+
const promise = fakePromise();
8183
if (chunk == null) {
82-
return fakePromise();
84+
return promise;
8385
}
84-
return new Promise<void>((resolve, reject) => {
85-
writable.write(chunk, (err: Error | null | undefined) => {
86-
if (err) {
87-
reject(err);
88-
} else {
89-
resolve();
90-
}
91-
});
92-
});
86+
return promise.then(() => safeWrite(chunk, writable));
9387
},
9488
close() {
9589
if (!writable.errored && writable.closed) {
9690
return fakePromise();
9791
}
98-
return new Promise<void>((resolve, reject) => {
99-
if (writable.errored) {
100-
reject(writable.errored);
101-
} else {
102-
writable.end((err: Error | null) => {
103-
if (err) {
104-
reject(err);
105-
} else {
106-
resolve();
107-
}
108-
});
109-
}
110-
});
92+
if (writable.errored) {
93+
return fakeRejectPromise(writable.errored);
94+
}
95+
return fakePromise().then(() => endStream(writable));
11196
},
11297
abort(reason) {
11398
return new Promise<void>(resolve => {
@@ -122,19 +107,10 @@ export class PonyfillWritableStream<W = any> implements WritableStream<W> {
122107
if (!this.writable.errored && this.writable.closed) {
123108
return fakePromise();
124109
}
125-
return new Promise<void>((resolve, reject) => {
126-
if (this.writable.errored) {
127-
reject(this.writable.errored);
128-
} else {
129-
this.writable.end((err: Error | null) => {
130-
if (err) {
131-
reject(err);
132-
} else {
133-
resolve();
134-
}
135-
});
136-
}
137-
});
110+
if (this.writable.errored) {
111+
return fakeRejectPromise(this.writable.errored);
112+
}
113+
return fakePromise().then(() => endStream(this.writable));
138114
}
139115

140116
abort(reason: any): Promise<void> {

packages/node-fetch/src/fetchNodeHttp.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ import { request as httpRequest, STATUS_CODES } from 'node:http';
22
import { request as httpsRequest } from 'node:https';
33
import { PassThrough, Readable } from 'node:stream';
44
import { createBrotliDecompress, createGunzip, createInflate, createInflateRaw } from 'node:zlib';
5+
import { handleMaybePromise } from '@whatwg-node/promise-helpers';
56
import { PonyfillRequest } from './Request.js';
67
import { PonyfillResponse } from './Response.js';
78
import { PonyfillURL } from './URL.js';
89
import {
10+
endStream,
911
getHeadersObj,
1012
isNodeReadable,
13+
safeWrite,
1114
shouldRedirect,
1215
wrapIncomingMessageWithPassthrough,
1316
} from './utils.js';
@@ -56,6 +59,7 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
5659
});
5760
}
5861

62+
nodeRequest.once('error', reject);
5963
nodeRequest.once('response', nodeResponse => {
6064
let outputStream: PassThrough | undefined;
6165
const contentEncoding = nodeResponse.headers['content-encoding'];
@@ -125,12 +129,13 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
125129
});
126130
resolve(ponyfillResponse);
127131
});
128-
nodeRequest.once('error', reject);
129132

130133
if (fetchRequest['_buffer'] != null) {
131-
nodeRequest.write(fetchRequest['_buffer']);
132-
// @ts-expect-error Avoid arguments adaptor trampoline https://v8.dev/blog/adaptor-frame
133-
nodeRequest.end(null, null, null);
134+
handleMaybePromise(
135+
() => safeWrite(fetchRequest['_buffer'], nodeRequest),
136+
() => endStream(nodeRequest),
137+
reject,
138+
);
134139
} else {
135140
const nodeReadable = (
136141
fetchRequest.body != null
@@ -142,8 +147,7 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
142147
if (nodeReadable) {
143148
nodeReadable.pipe(nodeRequest);
144149
} else {
145-
// @ts-expect-error Avoid arguments adaptor trampoline https://v8.dev/blog/adaptor-frame
146-
nodeRequest.end(null, null, null);
150+
endStream(nodeRequest);
147151
}
148152
}
149153
} catch (e) {

packages/node-fetch/src/utils.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,18 @@ export function wrapIncomingMessageWithPassthrough({
7070
.catch(onError);
7171
return passThrough;
7272
}
73+
74+
export function endStream(stream: { end: () => void }) {
75+
// @ts-expect-error Avoid arguments adaptor trampoline https://v8.dev/blog/adaptor-frame
76+
return stream.end(null, null, null);
77+
}
78+
79+
export function safeWrite(
80+
chunk: any,
81+
stream: { write: (chunk: any) => boolean; once: (event: string, listener: () => void) => void },
82+
) {
83+
const result = stream.write(chunk);
84+
if (!result) {
85+
return new Promise<void>(resolve => stream.once('drain', resolve));
86+
}
87+
}

packages/promise-helpers/tests/handleMaybePromise.spec.ts

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -64,42 +64,45 @@ describe('promise-helpers', () => {
6464
},
6565
);
6666

67-
it.each(cases)('when fake value is falsy', ({ input, output }) => {
68-
expect(
69-
handleMaybePromise(
70-
() => (input === 'fake' ? fakePromise(undefined) : undefined),
71-
res => (output === 'fake' ? fakePromise(undefined) : res),
72-
),
73-
).toBe(undefined);
67+
it.each(cases)(
68+
'when fake value is falsy; input: $input output: $output',
69+
({ input, output }) => {
70+
expect(
71+
handleMaybePromise(
72+
() => (input === 'fake' ? fakePromise(undefined) : undefined),
73+
res => (output === 'fake' ? fakePromise(undefined) : res),
74+
),
75+
).toBe(undefined);
7476

75-
expect(
76-
handleMaybePromise(
77-
() => (input === 'fake' ? fakePromise(null) : null),
78-
res => (output === 'fake' ? fakePromise(null) : res),
79-
),
80-
).toBe(null);
77+
expect(
78+
handleMaybePromise(
79+
() => (input === 'fake' ? fakePromise(null) : null),
80+
res => (output === 'fake' ? fakePromise(null) : res),
81+
),
82+
).toBe(null);
8183

82-
expect(
83-
handleMaybePromise(
84-
() => (input === 'fake' ? fakePromise('') : ''),
85-
res => (output === 'fake' ? fakePromise('') : res),
86-
),
87-
).toBe('');
84+
expect(
85+
handleMaybePromise(
86+
() => (input === 'fake' ? fakePromise('') : ''),
87+
res => (output === 'fake' ? fakePromise('') : res),
88+
),
89+
).toBe('');
8890

89-
expect(
90-
handleMaybePromise(
91-
() => (input === 'fake' ? fakePromise(false) : false),
92-
res => (output === 'fake' ? fakePromise(false) : res),
93-
),
94-
).toBe(false);
91+
expect(
92+
handleMaybePromise(
93+
() => (input === 'fake' ? fakePromise(false) : false),
94+
res => (output === 'fake' ? fakePromise(false) : res),
95+
),
96+
).toBe(false);
9597

96-
expect(
97-
handleMaybePromise(
98-
() => (input === 'fake' ? fakePromise(0) : 0),
99-
res => (output === 'fake' ? fakePromise(0) : res),
100-
),
101-
).toBe(0);
102-
});
98+
expect(
99+
handleMaybePromise(
100+
() => (input === 'fake' ? fakePromise(0) : 0),
101+
res => (output === 'fake' ? fakePromise(0) : res),
102+
),
103+
).toBe(0);
104+
},
105+
);
103106
});
104107
describe('finally', () => {
105108
describe('with promises', () => {

packages/server/package.json

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,16 @@
4141
"tslib": "^2.6.3"
4242
},
4343
"devDependencies": {
44-
"@hapi/hapi": "^21.3.12",
44+
"@hapi/hapi": "21.4.0",
45+
"@types/compression": "1.7.5",
4546
"@types/express": "5.0.1",
46-
"@types/koa": "^2.15.0",
47+
"@types/koa": "2.15.0",
4748
"@types/node": "22.15.17",
49+
"compression": "1.8.0",
4850
"express": "5.1.0",
4951
"fastify": "5.3.2",
50-
"form-data": "^4.0.2",
51-
"koa": "^3.0.0",
52+
"form-data": "4.0.2",
53+
"koa": "3.0.0",
5254
"react": "19.1.0",
5355
"react-dom": "19.1.0"
5456
},

0 commit comments

Comments
 (0)