Skip to content

Commit ccb80d8

Browse files
committed
fix(fusion): handle promises of async iterables
1 parent 4472019 commit ccb80d8

File tree

4 files changed

+98
-5
lines changed

4 files changed

+98
-5
lines changed

.changeset/cyan-worms-pull.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@graphql-mesh/fusion-execution": patch
3+
---
4+
5+
Handle promises of async iterables correctly

packages/fusion/execution/src/execution.ts

+32-4
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,14 @@ export function executeResolverOperationNodesWithDependenciesInParallel({
306306
if (isAsyncIterable(fieldOpResult$)) {
307307
fieldOpAsyncIterables.push(mapAsyncIterator(fieldOpResult$, handleFieldOpResult));
308308
} else if (isPromise(fieldOpResult$)) {
309-
fieldOpPromises.push(fieldOpResult$.then(handleFieldOpResult as any));
309+
fieldOpPromises.push(
310+
fieldOpResult$.then(fieldOpResult => {
311+
if (isAsyncIterable(fieldOpResult)) {
312+
return mapAsyncIterator(fieldOpResult, handleFieldOpResult as any);
313+
}
314+
return handleFieldOpResult(fieldOpResult as any);
315+
}),
316+
);
310317
} else {
311318
handleFieldOpResult(fieldOpResult$);
312319
}
@@ -373,7 +380,16 @@ export function executeResolverOperationNodesWithDependenciesInParallel({
373380
const mergedIterable = Repeater.merge([...fieldOpPromises, ...fieldOpAsyncIterables]);
374381
asyncIterables.push(mapAsyncIterator(mergedIterable, handleFieldOpResults));
375382
} else if (fieldOpPromises.length) {
376-
dependencyPromises.push(Promise.all(fieldOpPromises).then(handleFieldOpResults));
383+
dependencyPromises.push(
384+
Promise.all(fieldOpPromises).then(fieldOpPromiseResults => {
385+
const asyncIterablesInResults = fieldOpPromiseResults.filter(isAsyncIterable);
386+
if (asyncIterablesInResults.length) {
387+
const mergedIterable = Repeater.merge(asyncIterablesInResults);
388+
return mapAsyncIterator(mergedIterable, handleFieldOpResults);
389+
}
390+
return handleFieldOpResults();
391+
}),
392+
);
377393
} else {
378394
handleFieldOpResults();
379395
}
@@ -390,7 +406,14 @@ export function executeResolverOperationNodesWithDependenciesInParallel({
390406
return mapAsyncIterator(mergedIterable, handleDependencyPromises);
391407
}
392408
if (dependencyPromises.length) {
393-
return Promise.all(dependencyPromises).then(handleDependencyPromises);
409+
return Promise.all(dependencyPromises).then(dependencyPromiseResults => {
410+
const asyncIterablesInResults = dependencyPromiseResults.filter(isAsyncIterable);
411+
if (asyncIterablesInResults.length) {
412+
const mergedIterable = Repeater.merge(asyncIterablesInResults);
413+
return mapAsyncIterator(mergedIterable, handleDependencyPromises);
414+
}
415+
return handleDependencyPromises();
416+
});
394417
}
395418
return handleDependencyPromises();
396419
}
@@ -627,7 +650,12 @@ export function executeResolverOperationNode({
627650
}
628651

629652
if (isPromise(result$)) {
630-
return result$.then(handleResult);
653+
return result$.then(result => {
654+
if (isAsyncIterable(result)) {
655+
return mapAsyncIterator(result as AsyncIterableIterator<any>, handleResult as any);
656+
}
657+
return handleResult(result);
658+
});
631659
}
632660

633661
return handleResult(result$);

packages/fusion/execution/src/operations.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,12 @@ export function executeOperationPlan({
295295
errors,
296296
});
297297
if (isPromise(res$)) {
298-
return res$.then(res => prepareExecutionResult(res, errors, executablePlan));
298+
return res$.then(res => {
299+
if (isAsyncIterable(res)) {
300+
return mapAsyncIterator(res, res => prepareExecutionResult(res, errors, executablePlan));
301+
}
302+
return prepareExecutionResult(res, errors, executablePlan);
303+
});
299304
}
300305
if (isAsyncIterable(res$)) {
301306
return mapAsyncIterator(res$, res => prepareExecutionResult(res, errors, executablePlan));

packages/fusion/runtime/tests/runtime.spec.ts

+55
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,28 @@ describe('useFusiongraph', () => {
3838
},
3939
},
4040
});
41+
const cSchema = createSchema({
42+
typeDefs: `
43+
type Query {
44+
c: String
45+
}
46+
type Subscription {
47+
c: String
48+
}
49+
`,
50+
resolvers: {
51+
Query: {
52+
c: () => 'c',
53+
},
54+
Subscription: {
55+
c: {
56+
async *subscribe() {
57+
yield { c: 'c' };
58+
},
59+
},
60+
},
61+
},
62+
});
4163
const yoga = createYoga({
4264
plugins: [
4365
useFusiongraph({
@@ -51,6 +73,10 @@ describe('useFusiongraph', () => {
5173
name: 'b',
5274
schema: bSchema,
5375
},
76+
{
77+
name: 'c',
78+
schema: cSchema,
79+
},
5480
]),
5581
transports() {
5682
return {
@@ -60,6 +86,11 @@ describe('useFusiongraph', () => {
6086
return createDefaultExecutor(aSchema);
6187
case 'b':
6288
return createDefaultExecutor(bSchema);
89+
case 'c':
90+
// Returns Promise<AsyncIterableIterator<ExecutionResult>>
91+
return async function (args) {
92+
return createDefaultExecutor(cSchema)(args);
93+
};
6394
}
6495
throw new Error(`Unknown subgraph: ${subgraphName}`);
6596
},
@@ -100,4 +131,28 @@ describe('useFusiongraph', () => {
100131
},
101132
});
102133
}
134+
it('works with executors that return Promise<AsyncIterableIterator<ExecutionResult>>', async () => {
135+
expect.assertions(1);
136+
const res = await yoga.fetch('http://localhost:4000/graphql', {
137+
method: 'POST',
138+
headers: {
139+
'Content-Type': 'application/json',
140+
Accept: 'text/event-stream',
141+
},
142+
body: JSON.stringify({
143+
query: `
144+
subscription {
145+
c
146+
}
147+
`,
148+
}),
149+
});
150+
const chunks: Uint8Array[] = [];
151+
// eslint-disable-next-line no-unreachable-loop
152+
for await (const chunk of res.body) {
153+
chunks.push(chunk);
154+
}
155+
const resText = Buffer.concat(chunks).toString('utf-8');
156+
expect(resText).toMatch(/{"c":"c"}/);
157+
});
103158
});

0 commit comments

Comments
 (0)