Skip to content

Commit bc32ccd

Browse files
committed
optimize further
1 parent 4090fb4 commit bc32ccd

File tree

6 files changed

+387
-238
lines changed

6 files changed

+387
-238
lines changed

Diff for: src/execution/IncrementalPublisher.ts

+108-73
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,15 @@ export interface FormattedSubsequentIncrementalExecutionResult<
9393
extensions?: TExtensions;
9494
}
9595

96+
interface RawDeferResult<TData = ObjMap<unknown>> {
97+
errors?: ReadonlyArray<GraphQLError>;
98+
data: TData;
99+
}
100+
96101
export interface IncrementalDeferResult<
97102
TData = ObjMap<unknown>,
98103
TExtensions = ObjMap<unknown>,
99-
> {
100-
errors?: ReadonlyArray<GraphQLError>;
101-
data: TData;
104+
> extends RawDeferResult<TData> {
102105
id: string;
103106
subPath?: ReadonlyArray<string | number>;
104107
extensions?: TExtensions;
@@ -115,12 +118,15 @@ export interface FormattedIncrementalDeferResult<
115118
extensions?: TExtensions;
116119
}
117120

118-
export interface IncrementalStreamResult<
119-
TData = Array<unknown>,
120-
TExtensions = ObjMap<unknown>,
121-
> {
121+
interface RawStreamItemsResult<TData = ReadonlyArray<unknown>> {
122122
errors?: ReadonlyArray<GraphQLError>;
123123
items: TData;
124+
}
125+
126+
export interface IncrementalStreamResult<
127+
TData = ReadonlyArray<unknown>,
128+
TExtensions = ObjMap<unknown>,
129+
> extends RawStreamItemsResult<TData> {
124130
id: string;
125131
subPath?: ReadonlyArray<string | number>;
126132
extensions?: TExtensions;
@@ -166,23 +172,27 @@ export interface FormattedCompletedResult {
166172
}
167173

168174
export function buildIncrementalResponse(
175+
context: IncrementalPublisherContext,
169176
result: ObjMap<unknown>,
170-
errors: ReadonlyArray<GraphQLError>,
177+
errors: ReadonlyArray<GraphQLError> | undefined,
171178
futures: ReadonlyArray<Future>,
172-
cancellableStreams: Set<StreamRecord>,
173179
): ExperimentalIncrementalExecutionResults {
174-
const incrementalPublisher = new IncrementalPublisher(cancellableStreams);
180+
const incrementalPublisher = new IncrementalPublisher(context);
175181
return incrementalPublisher.buildResponse(result, errors, futures);
176182
}
177183

184+
interface IncrementalPublisherContext {
185+
cancellableStreams?: Set<StreamRecord> | undefined;
186+
}
187+
178188
/**
179189
* This class is used to publish incremental results to the client, enabling semi-concurrent
180190
* execution while preserving result order.
181191
*
182192
* @internal
183193
*/
184194
class IncrementalPublisher {
185-
private _cancellableStreams: Set<StreamRecord>;
195+
private _context: IncrementalPublisherContext;
186196
private _nextId: number;
187197
private _pending: Set<SubsequentResultRecord>;
188198
private _completedResultQueue: Array<FutureResult>;
@@ -193,8 +203,8 @@ class IncrementalPublisher {
193203
private _signalled!: Promise<unknown>;
194204
private _resolve!: () => void;
195205

196-
constructor(cancellableStreams: Set<StreamRecord>) {
197-
this._cancellableStreams = cancellableStreams;
206+
constructor(context: IncrementalPublisherContext) {
207+
this._context = context;
198208
this._nextId = 0;
199209
this._pending = new Set();
200210
this._completedResultQueue = [];
@@ -206,7 +216,7 @@ class IncrementalPublisher {
206216

207217
buildResponse(
208218
data: ObjMap<unknown>,
209-
errors: ReadonlyArray<GraphQLError>,
219+
errors: ReadonlyArray<GraphQLError> | undefined,
210220
futures: ReadonlyArray<Future>,
211221
): ExperimentalIncrementalExecutionResults {
212222
this._addFutures(futures);
@@ -215,7 +225,7 @@ class IncrementalPublisher {
215225
const pending = this._pendingSourcesToResults();
216226

217227
const initialResult: InitialIncrementalExecutionResult =
218-
errors.length === 0
228+
errors === undefined
219229
? { data, pending, hasNext: true }
220230
: { errors, data, pending, hasNext: true };
221231

@@ -425,8 +435,12 @@ class IncrementalPublisher {
425435
};
426436

427437
const returnStreamIterators = async (): Promise<void> => {
438+
const cancellableStreams = this._context.cancellableStreams;
439+
if (cancellableStreams === undefined) {
440+
return;
441+
}
428442
const promises: Array<Promise<unknown>> = [];
429-
for (const streamRecord of this._cancellableStreams) {
443+
for (const streamRecord of cancellableStreams) {
430444
if (streamRecord.earlyReturn !== undefined) {
431445
promises.push(streamRecord.earlyReturn());
432446
}
@@ -475,27 +489,36 @@ class IncrementalPublisher {
475489
}
476490

477491
private _handleCompletedDeferredGroupedFieldSet(
478-
result: DeferredGroupedFieldSetResult,
492+
deferredGroupedFieldSetResult: DeferredGroupedFieldSetResult,
479493
): void {
480-
if (!isReconcilableDeferredGroupedFieldSetResult(result)) {
481-
for (const deferredFragmentRecord of result.deferredFragmentRecords) {
494+
if (
495+
isNonReconcilableDeferredGroupedFieldSetResult(
496+
deferredGroupedFieldSetResult,
497+
)
498+
) {
499+
for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) {
482500
const id = deferredFragmentRecord.id;
483501
if (id !== undefined) {
484-
this._completed.push({ id, errors: result.errors });
502+
this._completed.push({
503+
id,
504+
errors: deferredGroupedFieldSetResult.errors,
505+
});
485506
this._pending.delete(deferredFragmentRecord);
486507
}
487508
}
488509
return;
489510
}
490-
for (const deferredFragmentRecord of result.deferredFragmentRecords) {
491-
deferredFragmentRecord.reconcilableResults.push(result);
511+
for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) {
512+
deferredFragmentRecord.reconcilableResults.push(
513+
deferredGroupedFieldSetResult,
514+
);
492515
}
493516

494-
if (result.futures) {
495-
this._addFutures(result.futures);
517+
if (deferredGroupedFieldSetResult.futures) {
518+
this._addFutures(deferredGroupedFieldSetResult.futures);
496519
}
497520

498-
for (const deferredFragmentRecord of result.deferredFragmentRecords) {
521+
for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) {
499522
const id = deferredFragmentRecord.id;
500523
// TODO: add test case for this.
501524
// Presumably, this can occur if an error causes a fragment to be completed early,
@@ -522,12 +545,9 @@ class IncrementalPublisher {
522545
fragmentResult,
523546
);
524547
const incrementalEntry: IncrementalDeferResult = {
525-
data: fragmentResult.data,
548+
...fragmentResult.result,
526549
id: bestId,
527550
};
528-
if (result.errors.length > 0) {
529-
incrementalEntry.errors = fragmentResult.errors;
530-
}
531551
if (subPath !== undefined) {
532552
incrementalEntry.subPath = subPath;
533553
}
@@ -548,44 +568,48 @@ class IncrementalPublisher {
548568
this._pruneEmpty();
549569
}
550570

551-
private _handleCompletedStreamItems(result: StreamItemsResult): void {
552-
const streamRecord = result.streamRecord;
571+
private _handleCompletedStreamItems(
572+
streamItemsResult: StreamItemsResult,
573+
): void {
574+
const streamRecord = streamItemsResult.streamRecord;
553575
const id = streamRecord.id;
554576
// TODO: Consider adding invariant or non-null assertion, as this should never happen. Since the stream is converted into a linked list
555577
// for ordering purposes, if an entry errors, additional entries will not be processed.
556578
/* c8 ignore next 3 */
557579
if (id === undefined) {
558580
return;
559581
}
560-
if (result.items === undefined) {
582+
if (streamItemsResult.result === undefined) {
561583
this._completed.push({ id });
562584
this._pending.delete(streamRecord);
563-
this._cancellableStreams.delete(streamRecord);
564-
} else if (result.items === null) {
585+
const cancellableStreams = this._context.cancellableStreams;
586+
if (cancellableStreams !== undefined) {
587+
cancellableStreams.delete(streamRecord);
588+
}
589+
} else if (streamItemsResult.result === null) {
565590
this._completed.push({
566591
id,
567-
errors: result.errors,
592+
errors: streamItemsResult.errors,
568593
});
569594
this._pending.delete(streamRecord);
570-
this._cancellableStreams.delete(streamRecord);
595+
const cancellableStreams = this._context.cancellableStreams;
596+
if (cancellableStreams !== undefined) {
597+
cancellableStreams.delete(streamRecord);
598+
}
571599
streamRecord.earlyReturn?.().catch(() => {
572600
/* c8 ignore next 1 */
573601
// ignore error
574602
});
575603
} else {
576604
const incrementalEntry: IncrementalStreamResult = {
577605
id,
578-
items: result.items as Array<unknown>, // FIX!
606+
...streamItemsResult.result,
579607
};
580608

581-
if (result.errors !== undefined && result.errors.length > 0) {
582-
incrementalEntry.errors = result.errors;
583-
}
584-
585609
this._incremental.push(incrementalEntry);
586610

587-
if (result.futures) {
588-
this._addFutures(result.futures);
611+
if (streamItemsResult.futures) {
612+
this._addFutures(streamItemsResult.futures);
589613
this._pruneEmpty();
590614
}
591615
}
@@ -639,35 +663,39 @@ export function isDeferredGroupedFieldSetRecord(
639663
export interface IncrementalContext {
640664
deferUsageSet: DeferUsageSet | undefined;
641665
path: Path | undefined;
642-
errors: Array<GraphQLError>;
643-
errorPaths: Set<Path>;
644-
futures: Array<Future>;
666+
errors?: Map<Path | undefined, GraphQLError> | undefined;
667+
futures?: Array<Future> | undefined;
645668
}
646669

647-
export interface DeferredGroupedFieldSetResult {
648-
deferredFragmentRecords: ReadonlyArray<DeferredFragmentRecord>;
649-
path: Array<string | number>;
650-
data: ObjMap<unknown> | null;
651-
futures?: ReadonlyArray<Future>;
652-
errors: ReadonlyArray<GraphQLError>;
653-
}
670+
export type DeferredGroupedFieldSetResult =
671+
| ReconcilableDeferredGroupedFieldSetResult
672+
| NonReconcilableDeferredGroupedFieldSetResult;
654673

655674
export function isDeferredGroupedFieldSetResult(
656675
subsequentResult: DeferredGroupedFieldSetResult | StreamItemsResult,
657676
): subsequentResult is DeferredGroupedFieldSetResult {
658677
return 'deferredFragmentRecords' in subsequentResult;
659678
}
660679

661-
interface ReconcilableDeferredGroupedFieldSetResult
662-
extends DeferredGroupedFieldSetResult {
663-
data: ObjMap<unknown>;
680+
interface ReconcilableDeferredGroupedFieldSetResult {
681+
deferredFragmentRecords: ReadonlyArray<DeferredFragmentRecord>;
682+
path: Array<string | number>;
683+
result: RawDeferResult;
684+
futures?: ReadonlyArray<Future> | undefined;
664685
sent?: true | undefined;
665686
}
666687

667-
export function isReconcilableDeferredGroupedFieldSetResult(
688+
interface NonReconcilableDeferredGroupedFieldSetResult {
689+
result: null;
690+
errors: ReadonlyArray<GraphQLError>;
691+
deferredFragmentRecords: ReadonlyArray<DeferredFragmentRecord>;
692+
path: Array<string | number>;
693+
}
694+
695+
export function isNonReconcilableDeferredGroupedFieldSetResult(
668696
deferredGroupedFieldSetResult: DeferredGroupedFieldSetResult,
669-
): deferredGroupedFieldSetResult is ReconcilableDeferredGroupedFieldSetResult {
670-
return deferredGroupedFieldSetResult.data !== null;
697+
): deferredGroupedFieldSetResult is NonReconcilableDeferredGroupedFieldSetResult {
698+
return deferredGroupedFieldSetResult.result === null;
671699
}
672700

673701
/** @internal */
@@ -691,9 +719,6 @@ export class DeferredGroupedFieldSetRecord {
691719
const incrementalContext: IncrementalContext = {
692720
deferUsageSet,
693721
path,
694-
errors: [],
695-
errorPaths: new Set(),
696-
futures: [],
697722
};
698723

699724
for (const deferredFragmentRecord of deferredFragmentRecords) {
@@ -752,24 +777,36 @@ export class StreamRecord {
752777
}
753778
}
754779

755-
interface NonTerminatingStreamItemsResult {
780+
interface NonReconcilableStreamItemsResult {
756781
streamRecord: StreamRecord;
757-
items: ReadonlyArray<unknown> | null;
758-
futures?: ReadonlyArray<Future>;
782+
result: null;
759783
errors: ReadonlyArray<GraphQLError>;
760784
}
761785

786+
interface NonTerminatingStreamItemsResult {
787+
streamRecord: StreamRecord;
788+
result: RawStreamItemsResult;
789+
futures?: ReadonlyArray<Future> | undefined;
790+
}
791+
762792
interface TerminatingStreamItemsResult {
763793
streamRecord: StreamRecord;
764-
items?: never;
794+
result?: never;
765795
futures?: never;
766796
errors?: never;
767797
}
768798

769799
export type StreamItemsResult =
800+
| NonReconcilableStreamItemsResult
770801
| NonTerminatingStreamItemsResult
771802
| TerminatingStreamItemsResult;
772803

804+
export function isNonTerminatingStreamItemsResult(
805+
streamItemsResult: StreamItemsResult,
806+
): streamItemsResult is NonTerminatingStreamItemsResult {
807+
return streamItemsResult.result != null;
808+
}
809+
773810
/** @internal */
774811
export class StreamItemsRecord {
775812
streamRecord: StreamRecord;
@@ -789,9 +826,6 @@ export class StreamItemsRecord {
789826
const incrementalContext: IncrementalContext = {
790827
deferUsageSet: undefined,
791828
path: itemPath,
792-
errors: [],
793-
errorPaths: new Set(),
794-
futures: [],
795829
};
796830

797831
this._result = executor(incrementalContext);
@@ -810,12 +844,13 @@ export class StreamItemsRecord {
810844
private _prependNextStreamItems(
811845
result: StreamItemsResult,
812846
): StreamItemsResult {
813-
return this.nextStreamItems === undefined
814-
? result
815-
: {
847+
return isNonTerminatingStreamItemsResult(result) &&
848+
this.nextStreamItems !== undefined
849+
? {
816850
...result,
817851
futures: [this.nextStreamItems, ...(result.futures ?? [])],
818-
};
852+
}
853+
: result;
819854
}
820855
}
821856

0 commit comments

Comments
 (0)