diff --git a/src/execution/__tests__/defer-test.ts b/src/execution/__tests__/defer-test.ts index fdee3a27921..f4339823a20 100644 --- a/src/execution/__tests__/defer-test.ts +++ b/src/execution/__tests__/defer-test.ts @@ -679,7 +679,6 @@ describe('Execute: defer directive', () => { hasNext: true, }, { - pending: [{ path: ['hero'], label: 'DeferID' }], incremental: [ { data: { @@ -689,12 +688,6 @@ describe('Execute: defer directive', () => { }, path: [], }, - ], - completed: [{ path: [], label: 'DeferName' }], - hasNext: true, - }, - { - incremental: [ { data: { id: '1', @@ -702,7 +695,10 @@ describe('Execute: defer directive', () => { path: ['hero'], }, ], - completed: [{ path: ['hero'], label: 'DeferID' }], + completed: [ + { path: [], label: 'DeferName' }, + { path: ['hero'], label: 'DeferID' }, + ], hasNext: false, }, ]); @@ -870,7 +866,6 @@ describe('Execute: defer directive', () => { hasNext: true, }, { - pending: [{ path: ['hero', 'nestedObject', 'deeperObject'] }], incremental: [ { data: { @@ -880,12 +875,6 @@ describe('Execute: defer directive', () => { }, path: ['hero'], }, - ], - completed: [{ path: ['hero'] }], - hasNext: true, - }, - { - incremental: [ { data: { bar: 'bar', @@ -893,7 +882,10 @@ describe('Execute: defer directive', () => { path: ['hero', 'nestedObject', 'deeperObject'], }, ], - completed: [{ path: ['hero', 'nestedObject', 'deeperObject'] }], + completed: [ + { path: ['hero'] }, + { path: ['hero', 'nestedObject', 'deeperObject'] }, + ], hasNext: false, }, ]); @@ -948,35 +940,25 @@ describe('Execute: defer directive', () => { hasNext: true, }, { - pending: [{ path: ['hero', 'nestedObject'] }], incremental: [ { data: { bar: 'bar' }, path: ['hero', 'nestedObject', 'deeperObject'], }, - ], - completed: [{ path: ['hero'] }], - hasNext: true, - }, - { - pending: [{ path: ['hero', 'nestedObject', 'deeperObject'] }], - incremental: [ { data: { baz: 'baz' }, path: ['hero', 'nestedObject', 'deeperObject'], }, - ], - hasNext: true, - completed: [{ path: ['hero', 'nestedObject'] }], - }, - { - incremental: [ { data: { bak: 'bak' }, path: ['hero', 'nestedObject', 'deeperObject'], }, ], - completed: [{ path: ['hero', 'nestedObject', 'deeperObject'] }], + completed: [ + { path: ['hero'] }, + { path: ['hero', 'nestedObject'] }, + { path: ['hero', 'nestedObject', 'deeperObject'] }, + ], hasNext: false, }, ]); @@ -1023,7 +1005,6 @@ describe('Execute: defer directive', () => { hasNext: true, }, { - pending: [{ path: ['hero', 'nestedObject', 'deeperObject'] }], incremental: [ { data: { @@ -1031,15 +1012,6 @@ describe('Execute: defer directive', () => { }, path: ['hero', 'nestedObject', 'deeperObject'], }, - ], - completed: [ - { path: ['hero', 'nestedObject', 'deeperObject'] }, - { path: ['hero'] }, - ], - hasNext: true, - }, - { - incremental: [ { data: { bar: 'bar', @@ -1047,7 +1019,11 @@ describe('Execute: defer directive', () => { path: ['hero', 'nestedObject', 'deeperObject'], }, ], - completed: [{ path: ['hero', 'nestedObject', 'deeperObject'] }], + completed: [ + { path: ['hero', 'nestedObject', 'deeperObject'] }, + { path: ['hero'] }, + { path: ['hero', 'nestedObject', 'deeperObject'] }, + ], hasNext: false, }, ]); @@ -1843,27 +1819,17 @@ describe('Execute: defer directive', () => { hasNext: true, }, { - pending: [ - { path: ['hero', 'friends', 0] }, - { path: ['hero', 'friends', 1] }, - { path: ['hero', 'friends', 2] }, - ], incremental: [ { data: { name: 'slow', friends: [{}, {}, {}] }, path: ['hero'], }, - ], - completed: [{ path: ['hero'] }], - hasNext: true, - }, - { - incremental: [ { data: { name: 'Han' }, path: ['hero', 'friends', 0] }, { data: { name: 'Leia' }, path: ['hero', 'friends', 1] }, { data: { name: 'C-3PO' }, path: ['hero', 'friends', 2] }, ], completed: [ + { path: ['hero'] }, { path: ['hero', 'friends', 0] }, { path: ['hero', 'friends', 1] }, { path: ['hero', 'friends', 2] }, @@ -1900,11 +1866,6 @@ describe('Execute: defer directive', () => { hasNext: true, }, { - pending: [ - { path: ['hero', 'friends', 0] }, - { path: ['hero', 'friends', 1] }, - { path: ['hero', 'friends', 2] }, - ], incremental: [ { data: { @@ -1913,17 +1874,12 @@ describe('Execute: defer directive', () => { }, path: ['hero'], }, - ], - completed: [{ path: ['hero'] }], - hasNext: true, - }, - { - incremental: [ { data: { name: 'Han' }, path: ['hero', 'friends', 0] }, { data: { name: 'Leia' }, path: ['hero', 'friends', 1] }, { data: { name: 'C-3PO' }, path: ['hero', 'friends', 2] }, ], completed: [ + { path: ['hero'] }, { path: ['hero', 'friends', 0] }, { path: ['hero', 'friends', 1] }, { path: ['hero', 'friends', 2] }, diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 18cac05d591..7a0f684905a 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -153,11 +153,10 @@ describe('Execute: stream directive', () => { hasNext: true, }, { - incremental: [{ items: ['banana'], path: ['scalarList', 1] }], - hasNext: true, - }, - { - incremental: [{ items: ['coconut'], path: ['scalarList', 2] }], + incremental: [ + { items: ['banana'], path: ['scalarList', 1] }, + { items: ['coconut'], path: ['scalarList', 2] }, + ], completed: [{ path: ['scalarList'] }], hasNext: false, }, @@ -177,15 +176,11 @@ describe('Execute: stream directive', () => { hasNext: true, }, { - incremental: [{ items: ['apple'], path: ['scalarList', 0] }], - hasNext: true, - }, - { - incremental: [{ items: ['banana'], path: ['scalarList', 1] }], - hasNext: true, - }, - { - incremental: [{ items: ['coconut'], path: ['scalarList', 2] }], + incremental: [ + { items: ['apple'], path: ['scalarList', 0] }, + { items: ['banana'], path: ['scalarList', 1] }, + { items: ['coconut'], path: ['scalarList', 2] }, + ], completed: [{ path: ['scalarList'] }], hasNext: false, }, @@ -235,11 +230,6 @@ describe('Execute: stream directive', () => { items: ['banana'], path: ['scalarList', 1], }, - ], - hasNext: true, - }, - { - incremental: [ { items: ['coconut'], path: ['scalarList', 2], @@ -304,11 +294,6 @@ describe('Execute: stream directive', () => { items: [['banana', 'banana', 'banana']], path: ['scalarListList', 1], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [['coconut', 'coconut', 'coconut']], path: ['scalarListList', 2], @@ -391,20 +376,10 @@ describe('Execute: stream directive', () => { items: [{ name: 'Luke', id: '1' }], path: ['friendList', 0], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Han', id: '2' }], path: ['friendList', 1], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Leia', id: '3' }], path: ['friendList', 2], @@ -549,11 +524,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ name: 'Leia', id: '3' }], path: ['friendList', 2], @@ -970,11 +940,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ nonNullName: 'Han' }], path: ['friendList', 2], @@ -1021,11 +986,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ nonNullName: 'Han' }], path: ['friendList', 2], @@ -1156,11 +1116,6 @@ describe('Execute: stream directive', () => { }, ], }, - ], - hasNext: true, - }, - { - incremental: [ { items: [{ nonNullName: 'Han' }], path: ['friendList', 2], @@ -1484,11 +1439,10 @@ describe('Execute: stream directive', () => { path: ['nestedObject', 'nestedFriendList', 0], }, ], - completed: [{ path: ['otherNestedObject'] }], - hasNext: true, - }, - { - completed: [{ path: ['nestedObject', 'nestedFriendList'] }], + completed: [ + { path: ['otherNestedObject'] }, + { path: ['nestedObject', 'nestedFriendList'] }, + ], hasNext: false, }, ]); @@ -1594,9 +1548,6 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, - }, - { completed: [{ path: ['friendList'] }], hasNext: false, }, @@ -1748,9 +1699,6 @@ describe('Execute: stream directive', () => { path: ['friendList', 2], }, ], - hasNext: true, - }, - { completed: [{ path: ['friendList'] }], hasNext: false, }, @@ -1801,17 +1749,12 @@ describe('Execute: stream directive', () => { items: [{ id: '1', name: 'Luke' }], path: ['nestedObject', 'nestedFriendList', 0], }, - ], - completed: [{ path: ['nestedObject'] }], - hasNext: true, - }, - { - incremental: [ { items: [{ id: '2', name: 'Han' }], path: ['nestedObject', 'nestedFriendList', 1], }, ], + completed: [{ path: ['nestedObject'] }], hasNext: true, }, { @@ -1871,27 +1814,18 @@ describe('Execute: stream directive', () => { data: { scalarField: 'slow', nestedFriendList: [] }, path: ['nestedObject'], }, - ], - completed: [{ path: ['nestedObject'] }], - hasNext: true, - }, - done: false, - }); - const result3 = await iterator.next(); - expectJSON(result3).toDeepEqual({ - value: { - incremental: [ { items: [{ name: 'Luke' }], path: ['nestedObject', 'nestedFriendList', 0], }, ], + completed: [{ path: ['nestedObject'] }], hasNext: true, }, done: false, }); - const result4 = await iterator.next(); - expectJSON(result4).toDeepEqual({ + const result3 = await iterator.next(); + expectJSON(result3).toDeepEqual({ value: { incremental: [ { @@ -1903,16 +1837,16 @@ describe('Execute: stream directive', () => { }, done: false, }); - const result5 = await iterator.next(); - expectJSON(result5).toDeepEqual({ + const result4 = await iterator.next(); + expectJSON(result4).toDeepEqual({ value: { completed: [{ path: ['nestedObject', 'nestedFriendList'] }], hasNext: false, }, done: false, }); - const result6 = await iterator.next(); - expectJSON(result6).toDeepEqual({ + const result5 = await iterator.next(); + expectJSON(result5).toDeepEqual({ value: undefined, done: true, }); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 9c73c85064e..a69847c9ba0 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -115,7 +115,8 @@ const collectSubfields = memoize3( type IncrementalPublisher = Publisher< SubsequentResultRecord, - SubsequentIncrementalExecutionResult + SubsequentIncrementalExecutionResult, + IncrementalAggregate >; /** @@ -138,6 +139,12 @@ export interface ExecutionContext { publisher: IncrementalPublisher; } +interface IncrementalAggregate { + newPendingSources: Set; + incrementalResults: Array; + completedResults: Array; +} + /** * The result of GraphQL execution. * @@ -197,17 +204,14 @@ export interface FormattedInitialIncrementalExecutionResult< extensions?: TExtensions; } -interface IncrementalUpdate> { - pending: ReadonlyArray; - incremental: ReadonlyArray>; - completed: ReadonlyArray; -} - export interface SubsequentIncrementalExecutionResult< TData = unknown, TExtensions = ObjMap, -> extends Partial> { +> { hasNext: boolean; + pending?: ReadonlyArray; + incremental?: ReadonlyArray>; + completed?: ReadonlyArray; extensions?: TExtensions; } @@ -579,7 +583,12 @@ export function buildExecutionContext( fieldResolver: fieldResolver ?? defaultFieldResolver, typeResolver: typeResolver ?? defaultTypeResolver, subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, - publisher: new Publisher(getIncrementalResult, returnStreamIterators), + publisher: new Publisher( + incrementalInitializer, + incrementalReducer, + incrementalFinalizer, + returnStreamIterators, + ), errors: [], }; } @@ -2512,47 +2521,21 @@ function returnStreamIteratorIgnoringError(streamRecord: StreamRecord): void { }); } -function getIncrementalResult( - completedRecords: ReadonlySet, - publisher: IncrementalPublisher, -): SubsequentIncrementalExecutionResult | undefined { - const { pending, incremental, completed } = processPending(completedRecords); - - const hasNext = publisher.hasNext(); - if (incremental.length === 0 && completed.length === 0 && hasNext) { - return undefined; - } - - const result: SubsequentIncrementalExecutionResult = { hasNext }; - if (pending.length) { - result.pending = pending; - } - if (incremental.length) { - result.incremental = incremental; - } - if (completed.length) { - result.completed = completed; - } - - return result; +function incrementalInitializer(): IncrementalAggregate { + return { + newPendingSources: new Set(), + incrementalResults: [], + completedResults: [], + }; } -function processPending( +function incrementalReducer( + aggregate: IncrementalAggregate, completedRecords: ReadonlySet, -): IncrementalUpdate { - const newPendingSources = new Set(); - const incrementalResults: Array = []; - const completedResults: Array = []; +): IncrementalAggregate { + const { newPendingSources, incrementalResults, completedResults } = aggregate; for (const subsequentResultRecord of completedRecords) { - for (const child of subsequentResultRecord.children) { - const pendingSource = isStreamItemsRecord(child) - ? child.streamRecord - : child; - if (!pendingSource.pendingSent) { - newPendingSources.add(pendingSource); - } - child.publish(); - } + publishChildRecords(subsequentResultRecord.children, newPendingSources); if (isStreamItemsRecord(subsequentResultRecord)) { if (subsequentResultRecord.isFinalRecord) { newPendingSources.delete(subsequentResultRecord.streamRecord); @@ -2598,11 +2581,51 @@ function processPending( } } - return { - pending: pendingSourcesToResults(newPendingSources), - incremental: incrementalResults, - completed: completedResults, - }; + return aggregate; +} + +function incrementalFinalizer( + aggregate: IncrementalAggregate, + hasNext: boolean, +): SubsequentIncrementalExecutionResult { + const { newPendingSources, incrementalResults, completedResults } = aggregate; + const pendingResults = pendingSourcesToResults(newPendingSources); + + const result: SubsequentIncrementalExecutionResult = { hasNext }; + if (pendingResults.length) { + result.pending = pendingResults; + } + if (incrementalResults.length) { + result.incremental = incrementalResults; + } + if (completedResults.length) { + result.completed = completedResults; + } + + return result; +} + +function publishChildRecords( + children: ReadonlySet, + newPendingSources: Set, +): void { + for (const child of children) { + if (isStreamItemsRecord(child)) { + const streamRecord = child.streamRecord; + if (!streamRecord.pendingSent) { + newPendingSources.add(streamRecord); + } + if (!child.published) { + // StreamItemsRecords potentially have multiple parents + child.publish(); + } + } else { + if (!child.pendingSent) { + newPendingSources.add(child); + } + child.publish(); + } + } } function completedRecordToResult( diff --git a/src/jsutils/Publisher.ts b/src/jsutils/Publisher.ts index 4c50165fe9f..ebaa584bfd4 100644 --- a/src/jsutils/Publisher.ts +++ b/src/jsutils/Publisher.ts @@ -1,8 +1,10 @@ /** @internal */ -export class Publisher { +export class Publisher { _released: Set; _pending: Set; - _update: (completed: Set, publisher: Publisher) => R | undefined; + _initializer: () => A; + _reducer: (aggregate: A, completed: Set) => A; + _finalizer: (aggregate: A, hasNext: boolean) => R; _onAbruptClose: (pending: ReadonlySet) => Promise; // these are assigned within the Promise executor called synchronously within the constructor @@ -10,15 +12,17 @@ export class Publisher { _resolve!: () => void; constructor( - update: ( - released: ReadonlySet, - publisher: Publisher, - ) => R | undefined, + initializer: () => A, + reducer: (aggregate: A, released: ReadonlySet) => A, + finalizer: (aggregate: A, hasNext: boolean) => R, onAbruptClose: (pending: ReadonlySet) => Promise, ) { this._released = new Set(); this._pending = new Set(); - this._update = update; + this._initializer = initializer; + this._reducer = reducer; + this._finalizer = finalizer; + this._reducer = reducer; this._onAbruptClose = onAbruptClose; this._reset(); } @@ -73,20 +77,25 @@ export class Publisher { return { value: undefined, done: true }; } - for (const item of this._released) { - this._pending.delete(item); - } - const released = this._released; - this._released = new Set(); + if (this._released.size > 0) { + let aggregate = this._initializer(); + do { + for (const item of this._released) { + this._pending.delete(item); + } + const released = this._released; + this._released = new Set(); - const result = this._update(released, this); + aggregate = this._reducer(aggregate, released); + } while (this._released.size > 0); - if (!this.hasNext()) { - isDone = true; - } + const hasNext = this.hasNext(); + + if (!hasNext) { + isDone = true; + } - if (result !== undefined) { - return { value: result, done: false }; + return { value: this._finalizer(aggregate, hasNext), done: false }; } // eslint-disable-next-line no-await-in-loop