Skip to content

Commit 4709210

Browse files
committed
enhance: use native AbortSignal and AbortController APIs
1 parent 64184c4 commit 4709210

File tree

8 files changed

+57
-75
lines changed

8 files changed

+57
-75
lines changed

.changeset/fair-queens-sniff.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@graphql-mesh/transport-http-callback': patch
3+
'@graphql-tools/executor-http': patch
4+
'@graphql-hive/gateway-runtime': patch
5+
---
6+
7+
Use native AbortSignal, AbortController APIs instead of custom ones

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@
6565
"@opentelemetry/otlp-exporter-base": "patch:@opentelemetry/otlp-exporter-base@npm%3A0.56.0#~/.yarn/patches/@opentelemetry-otlp-exporter-base-npm-0.56.0-ba3dc5f5c5.patch",
6666
"@opentelemetry/resources": "patch:@opentelemetry/resources@npm%3A1.29.0#~/.yarn/patches/@opentelemetry-resources-npm-1.29.0-112f89f0c5.patch",
6767
"@vitest/snapshot@npm:3.0.4": "patch:@vitest/snapshot@npm%3A3.0.4#~/.yarn/patches/@vitest-snapshot-npm-3.0.0-d5b381862b.patch",
68+
"@whatwg-node/node-fetch": "0.7.8-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8",
69+
"@whatwg-node/server": "0.10.0-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8",
6870
"cross-spawn": "7.0.6",
6971
"esbuild": "0.24.2",
7072
"graphql": "16.10.0",

packages/executors/http/src/index.ts

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { abortSignalAny } from '@graphql-hive/gateway-abort-signal-any';
21
import {
32
defaultPrintFn,
43
SerializedExecutionRequest,
@@ -132,24 +131,6 @@ export interface HTTPExecutorOptions {
132131

133132
export type HeadersConfig = Record<string, string>;
134133

135-
// To prevent event listener warnings
136-
function createSignalWrapper(signal: AbortSignal): AbortSignal {
137-
const listeners = new Set<EventListener>();
138-
signal.onabort = (event) => {
139-
for (const listener of listeners) {
140-
listener(event);
141-
}
142-
};
143-
return Object.assign(signal, {
144-
addEventListener(_type: 'abort', listener: EventListener) {
145-
listeners.add(listener);
146-
},
147-
removeEventListener(_type: 'abort', listener: EventListener) {
148-
listeners.delete(listener);
149-
},
150-
});
151-
}
152-
153134
export function buildHTTPExecutor(
154135
options?: Omit<HTTPExecutorOptions, 'fetch'> & {
155136
fetch: SyncFetchFn;
@@ -177,13 +158,12 @@ export function buildHTTPExecutor(
177158
): DisposableExecutor<any, HTTPExecutorOptions> {
178159
const printFn = options?.print ?? defaultPrintFn;
179160
const disposeCtrl = new AbortController();
180-
const sharedSignal = createSignalWrapper(disposeCtrl.signal);
181161
const baseExecutor = (
182162
request: ExecutionRequest<any, any, any, HTTPExecutorOptions>,
183163
excludeQuery?: boolean,
184164
) => {
185-
if (sharedSignal.aborted) {
186-
return createResultForAbort(sharedSignal.reason);
165+
if (disposeCtrl.signal.aborted) {
166+
return createResultForAbort(disposeCtrl.signal.reason);
187167
}
188168
const fetchFn = request.extensions?.fetch ?? options?.fetch ?? defaultFetch;
189169
let method = request.extensions?.method || options?.method;
@@ -230,7 +210,7 @@ export function buildHTTPExecutor(
230210
request.extensions = restExtensions;
231211
}
232212

233-
const signals = [sharedSignal];
213+
const signals = [disposeCtrl.signal];
234214
const signalFromRequest = request.signal || request.info?.signal;
235215
if (signalFromRequest) {
236216
if (signalFromRequest.aborted) {
@@ -242,7 +222,7 @@ export function buildHTTPExecutor(
242222
signals.push(AbortSignal.timeout(options.timeout));
243223
}
244224

245-
const signal = abortSignalAny(signals);
225+
const signal = AbortSignal.any(signals);
246226

247227
const upstreamErrorExtensions: UpstreamErrorExtensions = {
248228
request: {
@@ -489,8 +469,8 @@ export function buildHTTPExecutor(
489469
function retryAttempt():
490470
| PromiseLike<ExecutionResult<any>>
491471
| ExecutionResult<any> {
492-
if (sharedSignal.aborted) {
493-
return createResultForAbort(sharedSignal.reason);
472+
if (disposeCtrl.signal.aborted) {
473+
return createResultForAbort(disposeCtrl.signal.reason);
494474
}
495475
attempt++;
496476
if (attempt > options!.retry!) {
Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
import {
2-
abortSignalAny,
3-
isAbortSignalFromAny,
4-
} from '@graphql-hive/gateway-abort-signal-any';
51
import { GraphQLResolveInfo } from '@graphql-tools/utils';
62
import type { GatewayPlugin } from '../types';
73

@@ -21,14 +17,10 @@ export function useUpstreamCancel(): GatewayPlugin {
2117
if (signalInInfo) {
2218
signals.push(signalInInfo);
2319
}
24-
if (isAbortSignalFromAny(options.signal)) {
25-
options.signal.addSignals(signals);
26-
} else {
27-
if (options.signal) {
28-
signals.push(options.signal);
29-
}
30-
options.signal = abortSignalAny(signals);
20+
if (options.signal) {
21+
signals.push(options.signal);
3122
}
23+
options.signal = AbortSignal.any(signals);
3224
},
3325
onSubgraphExecute({ executionRequest }) {
3426
const signals: AbortSignal[] = [];
@@ -38,14 +30,10 @@ export function useUpstreamCancel(): GatewayPlugin {
3830
if (executionRequest.context?.request?.signal) {
3931
signals.push(executionRequest.context.request.signal);
4032
}
41-
if (isAbortSignalFromAny(executionRequest.signal)) {
42-
executionRequest.signal.addSignals(signals);
43-
} else {
44-
if (executionRequest.signal) {
45-
signals.push(executionRequest.signal);
46-
}
47-
executionRequest.signal = abortSignalAny(signals);
33+
if (executionRequest.signal) {
34+
signals.push(executionRequest.signal);
4835
}
36+
executionRequest.signal = AbortSignal.any(signals);
4937
},
5038
};
5139
}

packages/runtime/src/plugins/useUpstreamTimeout.ts

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
import { abortSignalAny } from '@graphql-hive/gateway-abort-signal-any';
21
import { subgraphNameByExecutionRequest } from '@graphql-mesh/fusion-runtime';
32
import { UpstreamErrorExtensions } from '@graphql-mesh/transport-common';
43
import { getHeadersObj } from '@graphql-mesh/utils';
54
import {
5+
createDeferred,
66
createGraphQLError,
77
ExecutionRequest,
88
ExecutionResult,
9-
getAbortPromise,
109
isAsyncIterable,
1110
MaybeAsyncIterable,
1211
MaybePromise,
@@ -54,21 +53,28 @@ export function useUpstreamTimeout<TContext extends Record<string, any>>(
5453
// Comment the line above
5554
// And uncomment the line below to see that statement leaks specificaly
5655
// timeoutSignal = { addEventListener() {} } as AbortSignal;
56+
timeoutSignalsByExecutionRequest.set(
57+
executionRequest,
58+
timeoutSignal,
59+
);
5760
}
58-
timeoutSignalsByExecutionRequest.set(executionRequest, timeoutSignal);
59-
const timeout$ = getAbortPromise(timeoutSignal);
60-
let finalSignal: AbortSignal | undefined = timeoutSignal;
61-
const signals = new Set<AbortSignal>();
62-
signals.add(timeoutSignal);
61+
const timeoutDeferred = createDeferred<void>();
62+
function rejectDeferred() {
63+
timeoutDeferred.reject(timeoutSignal?.reason);
64+
}
65+
timeoutSignal.addEventListener('abort', rejectDeferred, {
66+
once: true,
67+
});
68+
const signals: AbortSignal[] = [];
69+
signals.push(timeoutSignal);
6370
if (executionRequest.signal) {
64-
signals.add(executionRequest.signal);
65-
finalSignal = abortSignalAny(signals);
71+
signals.push(executionRequest.signal);
6672
}
6773
return Promise.race([
68-
timeout$,
74+
timeoutDeferred.promise,
6975
executor({
7076
...executionRequest,
71-
signal: finalSignal,
77+
signal: AbortSignal.any(signals),
7278
}),
7379
])
7480
.then((result) => {
@@ -99,6 +105,8 @@ export function useUpstreamTimeout<TContext extends Record<string, any>>(
99105
throw e;
100106
})
101107
.finally(() => {
108+
timeoutDeferred.resolve();
109+
timeoutSignal.removeEventListener('abort', rejectDeferred);
102110
// Remove from the map after used so we don't see it again
103111
errorExtensionsByExecRequest.delete(executionRequest);
104112
timeoutSignalsByExecutionRequest.delete(executionRequest);
@@ -131,15 +139,15 @@ export function useUpstreamTimeout<TContext extends Record<string, any>>(
131139
} else {
132140
timeoutSignal = AbortSignal.timeout(timeout);
133141
}
134-
const signals = new Set<AbortSignal>();
135-
signals.add(timeoutSignal);
142+
const signals: AbortSignal[] = [];
143+
signals.push(timeoutSignal);
136144
if (options.signal) {
137-
signals.add(options.signal);
138-
setOptions({
139-
...options,
140-
signal: abortSignalAny(signals),
141-
});
145+
signals.push(options.signal);
142146
}
147+
setOptions({
148+
...options,
149+
signal: AbortSignal.any(signals),
150+
});
143151
}
144152
}
145153
if (executionRequest) {

packages/runtime/tests/upstream-timeout.test.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@ describe('Upstream Timeout', () => {
4545
throw new Error('Unexpected subgraph');
4646
},
4747
});
48-
setTimeout(() => {
49-
greetingsDeferred.resolve('Hello, World!');
50-
}, 1500);
5148
const res = await gateway.fetch('http://localhost:4000/graphql', {
5249
method: 'POST',
5350
headers: {
@@ -75,6 +72,7 @@ describe('Upstream Timeout', () => {
7572
}),
7673
],
7774
});
75+
greetingsDeferred.resolve('Hello, World!');
7876
});
7977
it('issue #303 - does not leak when it does not time out', async () => {
8078
const upstreamSchema = createSchema({

packages/transports/http-callback/src/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { process } from '@graphql-mesh/cross-helpers';
22
import { getInterpolatedHeadersFactory } from '@graphql-mesh/string-interpolation';
33
import {
4-
abortSignalAny,
54
type DisposableExecutor,
65
type Transport,
76
} from '@graphql-mesh/transport-common';
@@ -158,7 +157,7 @@ export default {
158157
}
159158
let signal = executionRequest.signal || executionRequest.info?.signal;
160159
if (signal) {
161-
signal = abortSignalAny([reqAbortCtrl.signal, signal]);
160+
signal = AbortSignal.any([reqAbortCtrl.signal, signal]);
162161
}
163162
const subFetchCall$ = mapMaybePromise(
164163
fetch(

yarn.lock

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7013,14 +7013,14 @@ __metadata:
70137013
languageName: node
70147014
linkType: hard
70157015

7016-
"@whatwg-node/node-fetch@npm:^0.7.1, @whatwg-node/node-fetch@npm:^0.7.7":
7017-
version: 0.7.7
7018-
resolution: "@whatwg-node/node-fetch@npm:0.7.7"
7016+
"@whatwg-node/node-fetch@npm:0.7.8-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8":
7017+
version: 0.7.8-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8
7018+
resolution: "@whatwg-node/node-fetch@npm:0.7.8-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8"
70197019
dependencies:
70207020
"@whatwg-node/disposablestack": "npm:^0.0.5"
70217021
busboy: "npm:^1.6.0"
70227022
tslib: "npm:^2.6.3"
7023-
checksum: 10c0/f61c45f256363f1c98ddcbcfc9265c8a98a64d09461a19ce32bcf76ab38619c7d7ee52ee7abfe80e49ddc7d6336e85ee481552294146ae934fca453feb970d23
7023+
checksum: 10c0/ceac8adad72cfbc98fe6b16265defaaa7b31bea8853663b230076b81bd473ee69ae5deb94844dc9ac57842a862c38634f802d3f77231b4f5a1582f0536941122
70247024
languageName: node
70257025
linkType: hard
70267026

@@ -7036,14 +7036,14 @@ __metadata:
70367036
languageName: node
70377037
linkType: hard
70387038

7039-
"@whatwg-node/server@npm:^0.9.55, @whatwg-node/server@npm:^0.9.60, @whatwg-node/server@npm:^0.9.64":
7040-
version: 0.9.65
7041-
resolution: "@whatwg-node/server@npm:0.9.65"
7039+
"@whatwg-node/server@npm:0.10.0-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8":
7040+
version: 0.10.0-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8
7041+
resolution: "@whatwg-node/server@npm:0.10.0-alpha-20250205002109-e61c2d65aed8aa582581368511e66b20e33632b8"
70427042
dependencies:
70437043
"@whatwg-node/disposablestack": "npm:^0.0.5"
70447044
"@whatwg-node/fetch": "npm:^0.10.0"
70457045
tslib: "npm:^2.6.3"
7046-
checksum: 10c0/f6fde2995c28223278484432b6107908d3bb917e76efb401b132df44ec45f140d3ef97db6ad03d0d197133036f85fbdb9274f4ed75363594b0469391c178bbfb
7046+
checksum: 10c0/97992d4070541692a184f8722ae1fc908bc9737cfefffeb2f14133f162c0ee95d373bcf0bef6469a8a6a68f26ae1725b3b141a2fd3a3e40c47de381c00a768e2
70477047
languageName: node
70487048
linkType: hard
70497049

0 commit comments

Comments
 (0)