@@ -169,6 +169,12 @@ export interface FormattedCompletedResult {
169
169
errors ?: ReadonlyArray < GraphQLError > ;
170
170
}
171
171
172
+ interface IncrementalAggregate {
173
+ newPendingSources : Set < DeferredFragmentRecord | StreamRecord > ;
174
+ incrementalResults : Array < IncrementalResult > ;
175
+ completedResults : Array < CompletedResult > ;
176
+ }
177
+
172
178
/**
173
179
* This class is used to publish incremental results to the client, enabling semi-concurrent
174
180
* execution while preserving result order.
@@ -482,20 +488,28 @@ export class IncrementalPublisher {
482
488
return { value : undefined , done : true } ;
483
489
}
484
490
485
- for ( const item of this . _released ) {
486
- this . _pending . delete ( item ) ;
487
- }
488
- const released = this . _released ;
489
- this . _released = new Set ( ) ;
491
+ if ( this . _released . size > 0 ) {
492
+ let aggregate = this . _incrementalInitializer ( ) ;
493
+ do {
494
+ for ( const item of this . _released ) {
495
+ this . _pending . delete ( item ) ;
496
+ }
497
+ const released = this . _released ;
498
+ this . _released = new Set ( ) ;
490
499
491
- const result = this . _getIncrementalResult ( released ) ;
500
+ aggregate = this . _incrementalReducer ( aggregate , released ) ;
501
+ } while ( this . _released . size > 0 ) ;
492
502
493
- if ( this . _pending . size === 0 ) {
494
- isDone = true ;
495
- }
503
+ const hasNext = this . _pending . size > 0 ;
504
+
505
+ if ( ! hasNext ) {
506
+ isDone = true ;
507
+ }
496
508
497
- if ( result !== undefined ) {
498
- return { value : result , done : false } ;
509
+ return {
510
+ value : this . _incrementalFinalizer ( aggregate ) ,
511
+ done : false ,
512
+ } ;
499
513
}
500
514
501
515
// eslint-disable-next-line no-await-in-loop
@@ -577,37 +591,20 @@ export class IncrementalPublisher {
577
591
this . _trigger ( ) ;
578
592
}
579
593
580
- private _getIncrementalResult (
581
- completedRecords : ReadonlySet < SubsequentResultRecord > ,
582
- ) : SubsequentIncrementalExecutionResult | undefined {
583
- const { pending, incremental, completed } =
584
- this . _processPending ( completedRecords ) ;
585
-
586
- const hasNext = this . _pending . size > 0 ;
587
- if ( incremental . length === 0 && completed . length === 0 && hasNext ) {
588
- return undefined ;
589
- }
590
-
591
- const result : SubsequentIncrementalExecutionResult = { hasNext } ;
592
- if ( pending . length ) {
593
- result . pending = pending ;
594
- }
595
- if ( incremental . length ) {
596
- result . incremental = incremental ;
597
- }
598
- if ( completed . length ) {
599
- result . completed = completed ;
600
- }
601
-
602
- return result ;
594
+ private _incrementalInitializer ( ) : IncrementalAggregate {
595
+ return {
596
+ newPendingSources : new Set < DeferredFragmentRecord | StreamRecord > ( ) ,
597
+ incrementalResults : [ ] ,
598
+ completedResults : [ ] ,
599
+ } ;
603
600
}
604
601
605
- private _processPending (
602
+ private _incrementalReducer (
603
+ aggregate : IncrementalAggregate ,
606
604
completedRecords : ReadonlySet < SubsequentResultRecord > ,
607
- ) : IncrementalUpdate {
608
- const newPendingSources = new Set < DeferredFragmentRecord | StreamRecord > ( ) ;
609
- const incrementalResults : Array < IncrementalResult > = [ ] ;
610
- const completedResults : Array < CompletedResult > = [ ] ;
605
+ ) : IncrementalAggregate {
606
+ const { newPendingSources, incrementalResults, completedResults } =
607
+ aggregate ;
611
608
for ( const subsequentResultRecord of completedRecords ) {
612
609
for ( const child of subsequentResultRecord . children ) {
613
610
if ( child . filtered ) {
@@ -673,11 +670,30 @@ export class IncrementalPublisher {
673
670
}
674
671
}
675
672
676
- return {
677
- pending : this . pendingSourcesToResults ( newPendingSources ) ,
678
- incremental : incrementalResults ,
679
- completed : completedResults ,
673
+ return aggregate ;
674
+ }
675
+
676
+ private _incrementalFinalizer (
677
+ aggregate : IncrementalAggregate ,
678
+ ) : SubsequentIncrementalExecutionResult {
679
+ const { newPendingSources, incrementalResults, completedResults } =
680
+ aggregate ;
681
+ const pendingResults = this . pendingSourcesToResults ( newPendingSources ) ;
682
+
683
+ const result : SubsequentIncrementalExecutionResult = {
684
+ hasNext : this . _pending . size > 0 ,
680
685
} ;
686
+ if ( pendingResults . length ) {
687
+ result . pending = pendingResults ;
688
+ }
689
+ if ( incrementalResults . length ) {
690
+ result . incremental = incrementalResults ;
691
+ }
692
+ if ( completedResults . length ) {
693
+ result . completed = completedResults ;
694
+ }
695
+
696
+ return result ;
681
697
}
682
698
683
699
private _completedRecordToResult (
0 commit comments