Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extract out publisher #3784

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 43 additions & 119 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import {
collectSubfields as _collectSubfields,
} from './collectFields.js';
import { mapAsyncIterable } from './mapAsyncIterable.js';
import { Publisher } from './publisher.js';
import {
getArgumentValues,
getDirectiveValues,
Expand Down Expand Up @@ -121,7 +122,11 @@ export interface ExecutionContext {
typeResolver: GraphQLTypeResolver<any, any>;
subscribeFieldResolver: GraphQLFieldResolver<any, any>;
errors: Array<GraphQLError>;
subsequentPayloads: Set<AsyncPayloadRecord>;
publisher: Publisher<
AsyncPayloadRecord,
IncrementalResult,
SubsequentIncrementalExecutionResult
>;
}

/**
Expand Down Expand Up @@ -357,13 +362,14 @@ function executeImpl(
return result.then(
(data) => {
const initialResult = buildResponse(data, exeContext.errors);
if (exeContext.subsequentPayloads.size > 0) {
const publisher = exeContext.publisher;
if (publisher.hasNext()) {
return {
initialResult: {
...initialResult,
hasNext: true,
},
subsequentResults: yieldSubsequentPayloads(exeContext),
subsequentResults: publisher.subscribe(),
};
}
return initialResult;
Expand All @@ -375,13 +381,14 @@ function executeImpl(
);
}
const initialResult = buildResponse(result, exeContext.errors);
if (exeContext.subsequentPayloads.size > 0) {
const publisher = exeContext.publisher;
if (publisher.hasNext()) {
return {
initialResult: {
...initialResult,
hasNext: true,
},
subsequentResults: yieldSubsequentPayloads(exeContext),
subsequentResults: publisher.subscribe(),
};
}
return initialResult;
Expand Down Expand Up @@ -503,7 +510,7 @@ export function buildExecutionContext(
fieldResolver: fieldResolver ?? defaultFieldResolver,
typeResolver: typeResolver ?? defaultTypeResolver,
subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver,
subsequentPayloads: new Set(),
publisher: new Publisher(resultFromAsyncPayloadRecord, payloadFromResults),
errors: [],
};
}
Expand All @@ -515,7 +522,7 @@ function buildPerEventExecutionContext(
return {
...exeContext,
rootValue: payload,
subsequentPayloads: new Set(),
publisher: new Publisher(resultFromAsyncPayloadRecord, payloadFromResults),
errors: [],
};
}
Expand Down Expand Up @@ -2038,132 +2045,49 @@ function filterSubsequentPayloads(
currentAsyncRecord: AsyncPayloadRecord | undefined,
): void {
const nullPathArray = pathToArray(nullPath);
exeContext.subsequentPayloads.forEach((asyncRecord) => {
exeContext.publisher.filter((asyncRecord) => {
if (asyncRecord === currentAsyncRecord) {
// don't remove payload from where error originates
return;
return true;
}
for (let i = 0; i < nullPathArray.length; i++) {
if (asyncRecord.path[i] !== nullPathArray[i]) {
// asyncRecord points to a path unaffected by this payload
return;
return true;
}
}
// asyncRecord path points to nulled error field
if (isStreamPayload(asyncRecord) && asyncRecord.iterator?.return) {
asyncRecord.iterator.return().catch(() => {
// ignore error
});
}
exeContext.subsequentPayloads.delete(asyncRecord);

return false;
});
}

function getCompletedIncrementalResults(
exeContext: ExecutionContext,
): Array<IncrementalResult> {
const incrementalResults: Array<IncrementalResult> = [];
for (const asyncPayloadRecord of exeContext.subsequentPayloads) {
const incrementalResult: IncrementalResult = {};
if (!asyncPayloadRecord.isCompleted) {
continue;
}
exeContext.subsequentPayloads.delete(asyncPayloadRecord);
if (isStreamPayload(asyncPayloadRecord)) {
const items = asyncPayloadRecord.items;
if (asyncPayloadRecord.isCompletedIterator) {
// async iterable resolver just finished but there may be pending payloads
continue;
}
(incrementalResult as IncrementalStreamResult).items = items;
} else {
const data = asyncPayloadRecord.data;
(incrementalResult as IncrementalDeferResult).data = data ?? null;
}

incrementalResult.path = asyncPayloadRecord.path;
if (asyncPayloadRecord.label) {
incrementalResult.label = asyncPayloadRecord.label;
}
if (asyncPayloadRecord.errors.length > 0) {
incrementalResult.errors = asyncPayloadRecord.errors;
}
incrementalResults.push(incrementalResult);
function resultFromAsyncPayloadRecord(
asyncPayloadRecord: AsyncPayloadRecord,
): IncrementalResult {
const incrementalResult: IncrementalResult = {};
if (isStreamPayload(asyncPayloadRecord)) {
const items = asyncPayloadRecord.items;
(incrementalResult as IncrementalStreamResult).items = items;
} else {
const data = asyncPayloadRecord.data;
(incrementalResult as IncrementalDeferResult).data = data ?? null;
}
return incrementalResults;
}

function yieldSubsequentPayloads(
exeContext: ExecutionContext,
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
let isDone = false;

async function next(): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> {
if (isDone) {
return { value: undefined, done: true };
}

await Promise.race(
Array.from(exeContext.subsequentPayloads).map((p) => p.promise),
);

if (isDone) {
// a different call to next has exhausted all payloads
return { value: undefined, done: true };
}

const incremental = getCompletedIncrementalResults(exeContext);
const hasNext = exeContext.subsequentPayloads.size > 0;

if (!incremental.length && hasNext) {
return next();
}

if (!hasNext) {
isDone = true;
}

return {
value: incremental.length ? { incremental, hasNext } : { hasNext },
done: false,
};
incrementalResult.path = asyncPayloadRecord.path;
if (asyncPayloadRecord.label) {
incrementalResult.label = asyncPayloadRecord.label;
}

function returnStreamIterators() {
const promises: Array<Promise<IteratorResult<unknown>>> = [];
exeContext.subsequentPayloads.forEach((asyncPayloadRecord) => {
if (
isStreamPayload(asyncPayloadRecord) &&
asyncPayloadRecord.iterator?.return
) {
promises.push(asyncPayloadRecord.iterator.return());
}
});
return Promise.all(promises);
if (asyncPayloadRecord.errors.length > 0) {
incrementalResult.errors = asyncPayloadRecord.errors;
}
return incrementalResult;
}

return {
[Symbol.asyncIterator]() {
return this;
},
next,
async return(): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> {
await returnStreamIterators();
isDone = true;
return { value: undefined, done: true };
},
async throw(
error?: unknown,
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
await returnStreamIterators();
isDone = true;
return Promise.reject(error);
},
};
function payloadFromResults(
incremental: ReadonlyArray<IncrementalResult>,
hasNext: boolean,
): SubsequentIncrementalExecutionResult {
return incremental.length ? { incremental, hasNext } : { hasNext };
}

class DeferredFragmentRecord {
Expand All @@ -2189,7 +2113,7 @@ class DeferredFragmentRecord {
this.parentContext = opts.parentContext;
this.errors = [];
this._exeContext = opts.exeContext;
this._exeContext.subsequentPayloads.add(this);
this._exeContext.publisher.add(this);
this.isCompleted = false;
this.data = null;
this.promise = new Promise<ObjMap<unknown> | null>((resolve) => {
Expand Down Expand Up @@ -2240,7 +2164,7 @@ class StreamRecord {
this.iterator = opts.iterator;
this.errors = [];
this._exeContext = opts.exeContext;
this._exeContext.subsequentPayloads.add(this);
this._exeContext.publisher.add(this);
this.isCompleted = false;
this.items = null;
this.promise = new Promise<Array<unknown> | null>((resolve) => {
Expand Down
129 changes: 129 additions & 0 deletions src/execution/publisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
interface Source {
promise: Promise<void>;
isCompleted: boolean;
isCompletedIterator?: boolean | undefined;
iterator?: AsyncIterator<unknown> | undefined;
}

type ToIncrementalResult<TSource extends Source, TIncremental> = (
source: TSource,
) => TIncremental;

type ToPayload<TIncremental, TPayload> = (
incremental: ReadonlyArray<TIncremental>,
hasNext: boolean,
) => TPayload;

/**
* @internal
*/
export class Publisher<TSource extends Source, TIncremental, TPayload> {
sources: Set<TSource>;
toIncrementalResult: ToIncrementalResult<TSource, TIncremental>;
toPayload: ToPayload<TIncremental, TPayload>;

constructor(
toIncrementalResult: ToIncrementalResult<TSource, TIncremental>,
toPayload: ToPayload<TIncremental, TPayload>,
) {
this.sources = new Set();
this.toIncrementalResult = toIncrementalResult;
this.toPayload = toPayload;
}

add(source: TSource) {
this.sources.add(source);
}

hasNext(): boolean {
return this.sources.size > 0;
}

filter(predicate: (source: TSource) => boolean): void {
this.sources.forEach((source) => {
if (predicate(source)) {
return;
}
if (source.iterator?.return) {
source.iterator.return().catch(() => {
// ignore error
});
}
this.sources.delete(source);
});
}

_getCompletedIncrementalResults(): Array<TIncremental> {
const incrementalResults: Array<TIncremental> = [];
for (const source of this.sources) {
if (!source.isCompleted) {
continue;
}
this.sources.delete(source);
if (source.isCompletedIterator) {
continue;
}
incrementalResults.push(this.toIncrementalResult(source));
}
return incrementalResults;
}

subscribe(): AsyncGenerator<TPayload, void, void> {
let isDone = false;

const next = async (): Promise<IteratorResult<TPayload, void>> => {
if (isDone) {
return { value: undefined, done: true };
}

await Promise.race(Array.from(this.sources).map((p) => p.promise));

if (isDone) {
return { value: undefined, done: true };
}

const incremental = this._getCompletedIncrementalResults();
const hasNext = this.sources.size > 0;

if (!incremental.length && hasNext) {
return next();
}

if (!hasNext) {
isDone = true;
}

return {
value: this.toPayload(incremental, hasNext),
done: false,
};
};

const returnIterators = () => {
const promises: Array<Promise<IteratorResult<unknown>>> = [];
this.sources.forEach((source) => {
if (source.iterator?.return) {
promises.push(source.iterator.return());
}
});
return Promise.all(promises);
};

return {
[Symbol.asyncIterator]() {
return this;
},
next,
async return(): Promise<IteratorResult<TPayload, void>> {
await returnIterators();
isDone = true;
return { value: undefined, done: true };
},
async throw(error?: unknown): Promise<IteratorResult<TPayload, void>> {
await returnIterators();
isDone = true;
return Promise.reject(error);
},
};
}
}