From e758f467b61eab65dcc15b4f950af5b107f7a5a3 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 27 Dec 2022 16:57:39 +0200 Subject: [PATCH 1/3] remove extra ticks from executeStreamField by using custom completePromise helpers --- src/execution/__tests__/stream-test.ts | 10 -- src/execution/execute.ts | 132 ++++++++++++++++++------- 2 files changed, 94 insertions(+), 48 deletions(-) diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index aed5211ae1..f9dec10d72 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -531,11 +531,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Leia', id: '3' }], path: ['friendList', 2], @@ -984,11 +979,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ nonNullName: 'Han' }], path: ['friendList', 2], diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 052cb8da25..99e8ec6c9c 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -1798,6 +1798,78 @@ function executeDeferredFragment( asyncPayloadRecord.addData(promiseOrData); } +async function completedItemsFromPromisedItem( + exeContext: ExecutionContext, + itemType: GraphQLOutputType, + fieldNodes: ReadonlyArray, + info: GraphQLResolveInfo, + path: Path, + itemPath: Path, + item: Promise, + asyncPayloadRecord: AsyncPayloadRecord, +): Promise<[unknown] | null> { + try { + try { + const resolved = await item; + let completedItem = completeValue( + exeContext, + itemType, + fieldNodes, + info, + itemPath, + resolved, + asyncPayloadRecord, + ); + if (isPromise(completedItem)) { + completedItem = await completedItem; + } + return [completedItem]; + } catch (rawError) { + const error = locatedError(rawError, fieldNodes, pathToArray(itemPath)); + const handledError = handleFieldError( + error, + itemType, + asyncPayloadRecord.errors, + ); + filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord); + return [handledError]; + } + } catch (error) { + asyncPayloadRecord.errors.push(error); + filterSubsequentPayloads(exeContext, path, asyncPayloadRecord); + return null; + } +} + +async function completedItemsFromPromisedCompletedItem( + exeContext: ExecutionContext, + itemType: GraphQLOutputType, + fieldNodes: ReadonlyArray, + path: Path, + itemPath: Path, + completedItem: Promise, + asyncPayloadRecord: AsyncPayloadRecord, +): Promise<[unknown] | null> { + try { + try { + return [await completedItem]; + } catch (rawError) { + const error = locatedError(rawError, fieldNodes, pathToArray(itemPath)); + const handledError = handleFieldError( + error, + itemType, + asyncPayloadRecord.errors, + ); + filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord); + return [handledError]; + } + } catch (error) { + asyncPayloadRecord.errors.push(error); + filterSubsequentPayloads(exeContext, path, asyncPayloadRecord); + return null; + } +} + function executeStreamField( path: Path, itemPath: Path, @@ -1816,24 +1888,18 @@ function executeStreamField( exeContext, }); if (isPromise(item)) { - const completedItems = completePromisedValue( - exeContext, - itemType, - fieldNodes, - info, - itemPath, - item, - asyncPayloadRecord, - ).then( - (value) => [value], - (error) => { - asyncPayloadRecord.errors.push(error); - filterSubsequentPayloads(exeContext, path, asyncPayloadRecord); - return null; - }, + asyncPayloadRecord.addItems( + completedItemsFromPromisedItem( + exeContext, + itemType, + fieldNodes, + info, + path, + itemPath, + item, + asyncPayloadRecord, + ), ); - - asyncPayloadRecord.addItems(completedItems); return asyncPayloadRecord; } @@ -1866,27 +1932,17 @@ function executeStreamField( } if (isPromise(completedItem)) { - const completedItems = completedItem - .then(undefined, (rawError) => { - const error = locatedError(rawError, fieldNodes, pathToArray(itemPath)); - const handledError = handleFieldError( - error, - itemType, - asyncPayloadRecord.errors, - ); - filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord); - return handledError; - }) - .then( - (value) => [value], - (error) => { - asyncPayloadRecord.errors.push(error); - filterSubsequentPayloads(exeContext, path, asyncPayloadRecord); - return null; - }, - ); - - asyncPayloadRecord.addItems(completedItems); + asyncPayloadRecord.addItems( + completedItemsFromPromisedCompletedItem( + exeContext, + itemType, + fieldNodes, + path, + itemPath, + completedItem, + asyncPayloadRecord, + ), + ); return asyncPayloadRecord; } From 3ffb33ced7024d6e86118065c4698d25debaa6fa Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 27 Dec 2022 19:31:10 +0200 Subject: [PATCH 2/3] add test to complete coverage --- src/execution/__tests__/stream-test.ts | 50 ++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index f9dec10d72..ea1175acb1 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -450,6 +450,56 @@ describe('Execute: stream directive', () => { }, ]); }); + it('Can stream a field that returns a list of promises with nested promises', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 2) { + name + id + } + } + `); + const result = await complete(document, { + friendList: () => + friends.map((f) => + Promise.resolve({ + name: Promise.resolve(f.name), + id: Promise.resolve(f.id), + }), + ), + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [ + { + name: 'Luke', + id: '1', + }, + { + name: 'Han', + id: '2', + }, + ], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [ + { + name: 'Leia', + id: '3', + }, + ], + path: ['friendList', 2], + }, + ], + hasNext: false, + }, + ]); + }); it('Handles rejections in a field that returns a list of promises before initialCount is reached', async () => { const document = parse(` query { From ea8d0f2cfaaa20a7f3552305559316316e8a2cbb Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 27 Dec 2022 21:08:05 +0200 Subject: [PATCH 3/3] remove extra ticks from executeStreamIterator by inlining executeStreamIteratorItem and using an async helper --- src/execution/__tests__/stream-test.ts | 42 ++---- src/execution/execute.ts | 178 +++++++++++++------------ 2 files changed, 108 insertions(+), 112 deletions(-) diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index ea1175acb1..621986b62d 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -678,9 +678,6 @@ describe('Execute: stream directive', () => { path: ['friendList', 2], }, ], - hasNext: true, - }, - { hasNext: false, }, ]); @@ -720,7 +717,7 @@ describe('Execute: stream directive', () => { } } `); - const result = await completeAsync(document, 3, { + const result = await completeAsync(document, 2, { async *friendList() { yield await Promise.resolve(friends[0]); yield await Promise.resolve(friends[1]); @@ -749,10 +746,9 @@ describe('Execute: stream directive', () => { path: ['friendList', 2], }, ], - hasNext: true, + hasNext: false, }, }, - { done: false, value: { hasNext: false } }, { done: true, value: undefined }, ]); }); @@ -1214,9 +1210,6 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, - }, - { hasNext: false, }, ]); @@ -1240,25 +1233,19 @@ describe('Execute: stream directive', () => { } /* c8 ignore stop */, }, }); - expectJSON(result).toDeepEqual([ - { - errors: [ - { - message: - 'Cannot return null for non-nullable field NestedObject.nonNullScalarField.', - locations: [{ line: 4, column: 11 }], - path: ['nestedObject', 'nonNullScalarField'], - }, - ], - data: { - nestedObject: null, + expectJSON(result).toDeepEqual({ + errors: [ + { + message: + 'Cannot return null for non-nullable field NestedObject.nonNullScalarField.', + locations: [{ line: 4, column: 11 }], + path: ['nestedObject', 'nonNullScalarField'], }, - hasNext: true, - }, - { - hasNext: false, + ], + data: { + nestedObject: null, }, - ]); + }); }); it('Filters payloads that are nulled by a later synchronous error', async () => { const document = parse(` @@ -1399,9 +1386,6 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, - }, - { hasNext: false, }, ]); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 99e8ec6c9c..fabe646c11 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -1950,59 +1950,49 @@ function executeStreamField( return asyncPayloadRecord; } -async function executeStreamIteratorItem( +async function completedItemsFromPromisedCompletedStreamedItem( iterator: AsyncIterator, exeContext: ExecutionContext, - fieldNodes: ReadonlyArray, - info: GraphQLResolveInfo, itemType: GraphQLOutputType, - asyncPayloadRecord: StreamRecord, + fieldNodes: ReadonlyArray, + path: Path, itemPath: Path, -): Promise> { - let item; + completedItem: Promise, + asyncPayloadRecord: AsyncPayloadRecord, +): Promise<[unknown] | null> { try { - const { value, done } = await iterator.next(); - if (done) { - asyncPayloadRecord.setIsCompletedIterator(); - return { done, value: undefined }; + try { + return [await completedItem]; + } catch (rawError) { + const error = locatedError(rawError, fieldNodes, pathToArray(itemPath)); + const handledError = handleFieldError( + error, + itemType, + asyncPayloadRecord.errors, + ); + filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord); + return [handledError]; } - item = value; - } catch (rawError) { - const error = locatedError(rawError, fieldNodes, pathToArray(itemPath)); - const value = handleFieldError(error, itemType, asyncPayloadRecord.errors); - // don't continue if iterator throws - return { done: true, value }; + } catch (error) { + handleStreamError(iterator, exeContext, path, asyncPayloadRecord, error); + return null; } - let completedItem; - try { - completedItem = completeValue( - exeContext, - itemType, - fieldNodes, - info, - itemPath, - item, - asyncPayloadRecord, - ); +} - if (isPromise(completedItem)) { - completedItem = completedItem.then(undefined, (rawError) => { - const error = locatedError(rawError, fieldNodes, pathToArray(itemPath)); - const handledError = handleFieldError( - error, - itemType, - asyncPayloadRecord.errors, - ); - filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord); - return handledError; - }); - } - return { done: false, value: completedItem }; - } catch (rawError) { - const error = locatedError(rawError, fieldNodes, pathToArray(itemPath)); - const value = handleFieldError(error, itemType, asyncPayloadRecord.errors); - filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord); - return { done: false, value }; +function handleStreamError( + iterator: AsyncIterator, + exeContext: ExecutionContext, + path: Path, + asyncPayloadRecord: AsyncPayloadRecord, + error: GraphQLError, +): void { + asyncPayloadRecord.errors.push(error); + filterSubsequentPayloads(exeContext, path, asyncPayloadRecord); + // entire stream has errored and bubbled upwards + if (iterator?.return) { + iterator.return().catch(() => { + // ignore errors + }); } } @@ -2032,50 +2022,72 @@ async function executeStreamIterator( let iteration; try { - // eslint-disable-next-line no-await-in-loop - iteration = await executeStreamIteratorItem( - iterator, - exeContext, - fieldNodes, - info, - itemType, - asyncPayloadRecord, - itemPath, - ); - } catch (error) { - asyncPayloadRecord.errors.push(error); - filterSubsequentPayloads(exeContext, path, asyncPayloadRecord); - asyncPayloadRecord.addItems(null); - // entire stream has errored and bubbled upwards - if (iterator?.return) { - iterator.return().catch(() => { - // ignore errors - }); + try { + // eslint-disable-next-line no-await-in-loop + iteration = await iterator.next(); + } catch (rawError) { + const error = locatedError(rawError, fieldNodes, pathToArray(itemPath)); + const value = handleFieldError( + error, + itemType, + asyncPayloadRecord.errors, + ); + // don't continue if iterator throws + asyncPayloadRecord.addItems([value]); + break; } - return; - } - const { done, value: completedItem } = iteration; + const { done, value: item } = iteration; - let completedItems: PromiseOrValue | null>; - if (isPromise(completedItem)) { - completedItems = completedItem.then( - (value) => [value], - (error) => { - asyncPayloadRecord.errors.push(error); - filterSubsequentPayloads(exeContext, path, asyncPayloadRecord); - return null; - }, - ); - } else { - completedItems = [completedItem]; - } + if (done) { + asyncPayloadRecord.setIsCompletedIterator(); + asyncPayloadRecord.addItems(null); + break; + } - asyncPayloadRecord.addItems(completedItems); + let completedItem; + try { + completedItem = completeValue( + exeContext, + itemType, + fieldNodes, + info, + itemPath, + item, + asyncPayloadRecord, + ); + } catch (rawError) { + const error = locatedError(rawError, fieldNodes, pathToArray(itemPath)); + completedItem = handleFieldError( + error, + itemType, + asyncPayloadRecord.errors, + ); + filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord); + } - if (done) { - break; + if (isPromise(completedItem)) { + asyncPayloadRecord.addItems( + completedItemsFromPromisedCompletedStreamedItem( + iterator, + exeContext, + itemType, + fieldNodes, + path, + itemPath, + completedItem, + asyncPayloadRecord, + ), + ); + } else { + asyncPayloadRecord.addItems([completedItem]); + } + } catch (error) { + handleStreamError(iterator, exeContext, path, asyncPayloadRecord, error); + asyncPayloadRecord.addItems(null); + return; } + previousAsyncPayloadRecord = asyncPayloadRecord; index++; }