Skip to content

Commit 02d58f8

Browse files
committed
add pending notifications
1 parent 272c52e commit 02d58f8

File tree

4 files changed

+192
-4
lines changed

4 files changed

+192
-4
lines changed

src/execution/IncrementalPublisher.ts

+56-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import type {
1717
import type { StreamUsage } from './execute.js';
1818

1919
interface IncrementalUpdate<TData = unknown, TExtensions = ObjMap<unknown>> {
20+
pending: ReadonlyArray<PendingResult>;
2021
incremental: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
2122
completed: ReadonlyArray<CompletedResult>;
2223
}
@@ -65,6 +66,7 @@ export interface InitialIncrementalExecutionResult<
6566
TExtensions = ObjMap<unknown>,
6667
> extends ExecutionResult<TData, TExtensions> {
6768
data: TData;
69+
pending: ReadonlyArray<PendingResult>;
6870
hasNext: true;
6971
extensions?: TExtensions;
7072
}
@@ -74,6 +76,7 @@ export interface FormattedInitialIncrementalExecutionResult<
7476
TExtensions = ObjMap<unknown>,
7577
> extends FormattedExecutionResult<TData, TExtensions> {
7678
data: TData;
79+
pending: ReadonlyArray<PendingResult>;
7780
hasNext: boolean;
7881
extensions?: TExtensions;
7982
}
@@ -91,6 +94,7 @@ export interface FormattedSubsequentIncrementalExecutionResult<
9194
TExtensions = ObjMap<unknown>,
9295
> {
9396
hasNext: boolean;
97+
pending?: ReadonlyArray<PendingResult>;
9498
incremental?: ReadonlyArray<FormattedIncrementalResult<TData, TExtensions>>;
9599
completed?: ReadonlyArray<FormattedCompletedResult>;
96100
extensions?: TExtensions;
@@ -147,6 +151,11 @@ export type FormattedIncrementalResult<
147151
| FormattedIncrementalDeferResult<TData, TExtensions>
148152
| FormattedIncrementalStreamResult<TData, TExtensions>;
149153

154+
export interface PendingResult {
155+
path: ReadonlyArray<string | number>;
156+
label?: string;
157+
}
158+
150159
export interface CompletedResult {
151160
path: ReadonlyArray<string | number>;
152161
label?: string;
@@ -373,10 +382,20 @@ export class IncrementalPublisher {
373382

374383
const errors = initialResultRecord.errors;
375384
const initialResult = errors.length === 0 ? { data } : { errors, data };
376-
if (this._pending.size > 0) {
385+
const pending = this._pending;
386+
if (pending.size > 0) {
387+
const pendingSources = new Set<DeferredFragmentRecord | StreamRecord>();
388+
for (const subsequentResultRecord of pending) {
389+
const pendingSource = isStreamItemsRecord(subsequentResultRecord)
390+
? subsequentResultRecord.streamRecord
391+
: subsequentResultRecord;
392+
pendingSources.add(pendingSource);
393+
}
394+
377395
return {
378396
initialResult: {
379397
...initialResult,
398+
pending: this.pendingSourcesToResults(pendingSources),
380399
hasNext: true,
381400
},
382401
subsequentResults: this._subscribe(),
@@ -424,6 +443,23 @@ export class IncrementalPublisher {
424443
});
425444
}
426445

446+
pendingSourcesToResults(
447+
pendingSources: ReadonlySet<DeferredFragmentRecord | StreamRecord>,
448+
): Array<PendingResult> {
449+
const pendingResults: Array<PendingResult> = [];
450+
for (const pendingSource of pendingSources) {
451+
pendingSource.pendingSent = true;
452+
const pendingResult: PendingResult = {
453+
path: pendingSource.path,
454+
};
455+
if (pendingSource.label !== undefined) {
456+
pendingResult.label = pendingSource.label;
457+
}
458+
pendingResults.push(pendingResult);
459+
}
460+
return pendingResults;
461+
}
462+
427463
private _subscribe(): AsyncGenerator<
428464
SubsequentIncrementalExecutionResult,
429465
void,
@@ -538,14 +574,18 @@ export class IncrementalPublisher {
538574
private _getIncrementalResult(
539575
completedRecords: ReadonlySet<SubsequentResultRecord>,
540576
): SubsequentIncrementalExecutionResult | undefined {
541-
const { incremental, completed } = this._processPending(completedRecords);
577+
const { pending, incremental, completed } =
578+
this._processPending(completedRecords);
542579

543580
const hasNext = this._pending.size > 0;
544581
if (incremental.length === 0 && completed.length === 0 && hasNext) {
545582
return undefined;
546583
}
547584

548585
const result: SubsequentIncrementalExecutionResult = { hasNext };
586+
if (pending.length) {
587+
result.pending = pending;
588+
}
549589
if (incremental.length) {
550590
result.incremental = incremental;
551591
}
@@ -559,19 +599,27 @@ export class IncrementalPublisher {
559599
private _processPending(
560600
completedRecords: ReadonlySet<SubsequentResultRecord>,
561601
): IncrementalUpdate {
602+
const newPendingSources = new Set<DeferredFragmentRecord | StreamRecord>();
562603
const incrementalResults: Array<IncrementalResult> = [];
563604
const completedResults: Array<CompletedResult> = [];
564605
for (const subsequentResultRecord of completedRecords) {
565606
for (const child of subsequentResultRecord.children) {
566607
if (child.filtered) {
567608
continue;
568609
}
610+
const pendingSource = isStreamItemsRecord(child)
611+
? child.streamRecord
612+
: child;
613+
if (!pendingSource.pendingSent) {
614+
newPendingSources.add(pendingSource);
615+
}
569616
this._publish(child);
570617
}
571618
if (isStreamItemsRecord(subsequentResultRecord)) {
572619
if (!subsequentResultRecord.sent) {
573620
subsequentResultRecord.sent = true;
574621
if (subsequentResultRecord.isFinalRecord) {
622+
newPendingSources.delete(subsequentResultRecord.streamRecord);
575623
completedResults.push(
576624
this._completedRecordToResult(
577625
subsequentResultRecord.streamRecord,
@@ -595,6 +643,7 @@ export class IncrementalPublisher {
595643
incrementalResults.push(incrementalResult);
596644
}
597645
} else {
646+
newPendingSources.delete(subsequentResultRecord);
598647
completedResults.push(
599648
this._completedRecordToResult(subsequentResultRecord),
600649
);
@@ -619,6 +668,7 @@ export class IncrementalPublisher {
619668
}
620669

621670
return {
671+
pending: this.pendingSourcesToResults(newPendingSources),
622672
incremental: incrementalResults,
623673
completed: completedResults,
624674
};
@@ -777,6 +827,7 @@ export class DeferredFragmentRecord {
777827
deferredGroupedFieldSetRecords: Set<DeferredGroupedFieldSetRecord>;
778828
errors: Array<GraphQLError>;
779829
filtered: boolean;
830+
pendingSent?: boolean;
780831
_pending: Set<DeferredGroupedFieldSetRecord>;
781832

782833
constructor(opts: { path: Path | undefined; label: string | undefined }) {
@@ -796,6 +847,7 @@ export class StreamRecord {
796847
path: ReadonlyArray<string | number>;
797848
errors: Array<GraphQLError>;
798849
asyncIterator?: AsyncIterator<unknown> | undefined;
850+
pendingSent?: boolean;
799851
constructor(opts: {
800852
label: string | undefined;
801853
path: Path;
@@ -838,15 +890,15 @@ export type IncrementalDataRecord =
838890
| DeferredGroupedFieldSetRecord
839891
| StreamItemsRecord;
840892

841-
type SubsequentResultRecord = DeferredFragmentRecord | StreamItemsRecord;
893+
export type SubsequentResultRecord = DeferredFragmentRecord | StreamItemsRecord;
842894

843895
function isDeferredGroupedFieldSetRecord(
844896
incrementalDataRecord: unknown,
845897
): incrementalDataRecord is DeferredGroupedFieldSetRecord {
846898
return incrementalDataRecord instanceof DeferredGroupedFieldSetRecord;
847899
}
848900

849-
function isStreamItemsRecord(
901+
export function isStreamItemsRecord(
850902
subsequentResultRecord: unknown,
851903
): subsequentResultRecord is StreamItemsRecord {
852904
return subsequentResultRecord instanceof StreamItemsRecord;

0 commit comments

Comments
 (0)