From b020207254b7c58b9b4e2cea6db1de35d7646cd6 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 18 Oct 2022 12:12:39 +0300 Subject: [PATCH] fix(incrementalDelivery) fixes null bubbling for async iterables when a null bubbles up, no further payloads should be sent. --- src/execution/__tests__/stream-test.ts | 33 ++++++-------- src/execution/execute.ts | 63 +++++++++++++++----------- 2 files changed, 50 insertions(+), 46 deletions(-) diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index c977e670fb..b97a11fd33 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -1056,8 +1056,8 @@ describe('Execute: stream directive', () => { nestedObject: { nonNullScalarField: () => Promise.resolve(null), async *nestedFriendList() { - yield await Promise.resolve(friends[0]); - }, + yield await Promise.resolve(friends[0]); /* c8 ignore start */ + } /* c8 ignore stop */, }, }); expectJSON(result).toDeepEqual({ @@ -1156,9 +1156,6 @@ describe('Execute: stream directive', () => { path: ['nestedObject', 'nestedFriendList', 0], }, ], - hasNext: true, - }, - { hasNext: false, }, ]); @@ -1183,8 +1180,8 @@ describe('Execute: stream directive', () => { deeperNestedObject: { nonNullScalarField: () => Promise.resolve(null), async *deeperNestedFriendList() { - yield await Promise.resolve(friends[0]); - }, + yield await Promise.resolve(friends[0]); /* c8 ignore start */ + } /* c8 ignore stop */, }, }, }); @@ -1271,14 +1268,17 @@ describe('Execute: stream directive', () => { it('Returns iterator and ignores errors when stream payloads are filtered', async () => { let returned = false; - let index = 0; + let requested = false; const iterable = { [Symbol.asyncIterator]: () => ({ next: () => { - const friend = friends[index++]; - if (!friend) { - return Promise.resolve({ done: true, value: undefined }); + if (requested) { + /* c8 ignore next 3 */ + // Not reached, iterator should end immediately. + expect.fail('Not reached'); } + requested = true; + const friend = friends[0]; return Promise.resolve({ done: false, value: { @@ -1356,17 +1356,12 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, + hasNext: false, }, }); - const result3 = await iterator.next(); - expectJSON(result3).toDeepEqual({ - done: false, - value: { hasNext: false }, - }); - const result4 = await iterator.next(); - expectJSON(result4).toDeepEqual({ done: true, value: undefined }); + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ done: true, value: undefined }); assert(returned); }); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 9f3cd4db74..e7a6bed841 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -2026,36 +2026,23 @@ async function executeStreamIterator( exeContext, }); - const dataPromise = executeStreamIteratorItem( - iterator, - exeContext, - fieldNodes, - info, - itemType, - asyncPayloadRecord, - itemPath, - ); - - asyncPayloadRecord.addItems( - dataPromise - .then(({ value }) => value) - .then( - (value) => [value], - (err) => { - asyncPayloadRecord.errors.push(err); - return null; - }, - ), - ); + let iteration; try { // eslint-disable-next-line no-await-in-loop - const { done } = await dataPromise; - if (done) { - break; - } - } catch (err) { - // entire stream has errored and bubbled upwards + iteration = await executeStreamIteratorItem( + iterator, + exeContext, + fieldNodes, + info, + itemType, + asyncPayloadRecord, + itemPath, + ); + } catch (error) { + asyncPayloadRecord.errors.push(error); filterSubsequentPayloads(exeContext, path, asyncPayloadRecord); + asyncPayloadRecord.addItems(null); + // entire stream has errored and bubbled upwards if (iterator?.return) { iterator.return().catch(() => { // ignore errors @@ -2063,6 +2050,28 @@ async function executeStreamIterator( } return; } + + const { done, value: completedItem } = iteration; + + let completedItems: PromiseOrValue | null>; + if (isPromise(completedItem)) { + completedItems = completedItem.then( + (value) => [value], + (error) => { + asyncPayloadRecord.errors.push(error); + filterSubsequentPayloads(exeContext, path, asyncPayloadRecord); + return null; + }, + ); + } else { + completedItems = [completedItem]; + } + + asyncPayloadRecord.addItems(completedItems); + + if (done) { + break; + } previousAsyncPayloadRecord = asyncPayloadRecord; index++; }