Skip to content

Commit

Permalink
fix(incrementalDelivery) fixes null bubbling for async iterables
Browse files Browse the repository at this point in the history
when a null bubbles up, no further payloads should be sent.
  • Loading branch information
yaacovCR committed Oct 18, 2022
1 parent 04a94fb commit b020207
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 46 deletions.
33 changes: 14 additions & 19 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1056,8 +1056,8 @@ describe('Execute: stream directive', () => {
nestedObject: {
nonNullScalarField: () => Promise.resolve(null),
async *nestedFriendList() {
yield await Promise.resolve(friends[0]);
},
yield await Promise.resolve(friends[0]); /* c8 ignore start */
} /* c8 ignore stop */,
},
});
expectJSON(result).toDeepEqual({
Expand Down Expand Up @@ -1156,9 +1156,6 @@ describe('Execute: stream directive', () => {
path: ['nestedObject', 'nestedFriendList', 0],
},
],
hasNext: true,
},
{
hasNext: false,
},
]);
Expand All @@ -1183,8 +1180,8 @@ describe('Execute: stream directive', () => {
deeperNestedObject: {
nonNullScalarField: () => Promise.resolve(null),
async *deeperNestedFriendList() {
yield await Promise.resolve(friends[0]);
},
yield await Promise.resolve(friends[0]); /* c8 ignore start */
} /* c8 ignore stop */,
},
},
});
Expand Down Expand Up @@ -1271,14 +1268,17 @@ describe('Execute: stream directive', () => {

it('Returns iterator and ignores errors when stream payloads are filtered', async () => {
let returned = false;
let index = 0;
let requested = false;
const iterable = {
[Symbol.asyncIterator]: () => ({
next: () => {
const friend = friends[index++];
if (!friend) {
return Promise.resolve({ done: true, value: undefined });
if (requested) {
/* c8 ignore next 3 */
// Not reached, iterator should end immediately.
expect.fail('Not reached');
}
requested = true;
const friend = friends[0];
return Promise.resolve({
done: false,
value: {
Expand Down Expand Up @@ -1356,17 +1356,12 @@ describe('Execute: stream directive', () => {
],
},
],
hasNext: true,
hasNext: false,
},
});
const result3 = await iterator.next();
expectJSON(result3).toDeepEqual({
done: false,
value: { hasNext: false },
});

const result4 = await iterator.next();
expectJSON(result4).toDeepEqual({ done: true, value: undefined });
const result3 = await iterator.next();
expectJSON(result3).toDeepEqual({ done: true, value: undefined });

assert(returned);
});
Expand Down
63 changes: 36 additions & 27 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2026,43 +2026,52 @@ async function executeStreamIterator(
exeContext,
});

const dataPromise = executeStreamIteratorItem(
iterator,
exeContext,
fieldNodes,
info,
itemType,
asyncPayloadRecord,
itemPath,
);

asyncPayloadRecord.addItems(
dataPromise
.then(({ value }) => value)
.then(
(value) => [value],
(err) => {
asyncPayloadRecord.errors.push(err);
return null;
},
),
);
let iteration;
try {
// eslint-disable-next-line no-await-in-loop
const { done } = await dataPromise;
if (done) {
break;
}
} catch (err) {
// entire stream has errored and bubbled upwards
iteration = await executeStreamIteratorItem(
iterator,
exeContext,
fieldNodes,
info,
itemType,
asyncPayloadRecord,
itemPath,
);
} catch (error) {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
asyncPayloadRecord.addItems(null);
// entire stream has errored and bubbled upwards
if (iterator?.return) {
iterator.return().catch(() => {
// ignore errors
});
}
return;
}

const { done, value: completedItem } = iteration;

let completedItems: PromiseOrValue<Array<unknown> | null>;
if (isPromise(completedItem)) {
completedItems = completedItem.then(
(value) => [value],
(error) => {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
return null;
},
);
} else {
completedItems = [completedItem];
}

asyncPayloadRecord.addItems(completedItems);

if (done) {
break;
}
previousAsyncPayloadRecord = asyncPayloadRecord;
index++;
}
Expand Down

0 comments on commit b020207

Please sign in to comment.