@@ -162,6 +162,12 @@ export interface FormattedCompletedResult {
162
162
errors ?: ReadonlyArray < GraphQLError > ;
163
163
}
164
164
165
+ interface IncrementalAggregate {
166
+ newPendingSources : Set < DeferredFragmentRecord | StreamRecord > ;
167
+ incrementalResults : Array < IncrementalResult > ;
168
+ completedResults : Array < CompletedResult > ;
169
+ }
170
+
165
171
/**
166
172
* This class is used to publish incremental results to the client, enabling semi-concurrent
167
173
* execution while preserving result order.
@@ -399,20 +405,28 @@ export class IncrementalPublisher {
399
405
return { value : undefined , done : true } ;
400
406
}
401
407
402
- for ( const item of this . _released ) {
403
- this . _pending . delete ( item ) ;
404
- }
405
- const released = this . _released ;
406
- this . _released = new Set ( ) ;
408
+ if ( this . _released . size > 0 ) {
409
+ let aggregate = this . _incrementalInitializer ( ) ;
410
+ do {
411
+ for ( const item of this . _released ) {
412
+ this . _pending . delete ( item ) ;
413
+ }
414
+ const released = this . _released ;
415
+ this . _released = new Set ( ) ;
407
416
408
- const result = this . _getIncrementalResult ( released ) ;
417
+ aggregate = this . _incrementalReducer ( aggregate , released ) ;
418
+ } while ( this . _released . size > 0 ) ;
409
419
410
- if ( this . _pending . size === 0 ) {
411
- isDone = true ;
412
- }
420
+ const hasNext = this . _pending . size > 0 ;
421
+
422
+ if ( ! hasNext ) {
423
+ isDone = true ;
424
+ }
413
425
414
- if ( result !== undefined ) {
415
- return { value : result , done : false } ;
426
+ return {
427
+ value : this . _incrementalFinalizer ( aggregate ) ,
428
+ done : false ,
429
+ } ;
416
430
}
417
431
418
432
// eslint-disable-next-line no-await-in-loop
@@ -494,37 +508,20 @@ export class IncrementalPublisher {
494
508
this . _trigger ( ) ;
495
509
}
496
510
497
- private _getIncrementalResult (
498
- completedRecords : ReadonlySet < SubsequentResultRecord > ,
499
- ) : SubsequentIncrementalExecutionResult | undefined {
500
- const { pending, incremental, completed } =
501
- this . _processPending ( completedRecords ) ;
502
-
503
- const hasNext = this . _pending . size > 0 ;
504
- if ( incremental . length === 0 && completed . length === 0 && hasNext ) {
505
- return undefined ;
506
- }
507
-
508
- const result : SubsequentIncrementalExecutionResult = { hasNext } ;
509
- if ( pending . length ) {
510
- result . pending = pending ;
511
- }
512
- if ( incremental . length ) {
513
- result . incremental = incremental ;
514
- }
515
- if ( completed . length ) {
516
- result . completed = completed ;
517
- }
518
-
519
- return result ;
511
+ private _incrementalInitializer ( ) : IncrementalAggregate {
512
+ return {
513
+ newPendingSources : new Set < DeferredFragmentRecord | StreamRecord > ( ) ,
514
+ incrementalResults : [ ] ,
515
+ completedResults : [ ] ,
516
+ } ;
520
517
}
521
518
522
- private _processPending (
519
+ private _incrementalReducer (
520
+ aggregate : IncrementalAggregate ,
523
521
completedRecords : ReadonlySet < SubsequentResultRecord > ,
524
- ) : IncrementalUpdate {
525
- const newPendingSources = new Set < DeferredFragmentRecord | StreamRecord > ( ) ;
526
- const incrementalResults : Array < IncrementalResult > = [ ] ;
527
- const completedResults : Array < CompletedResult > = [ ] ;
522
+ ) : IncrementalAggregate {
523
+ const { newPendingSources, incrementalResults, completedResults } =
524
+ aggregate ;
528
525
for ( const subsequentResultRecord of completedRecords ) {
529
526
for ( const child of subsequentResultRecord . children ) {
530
527
if ( child . filtered ) {
@@ -585,11 +582,30 @@ export class IncrementalPublisher {
585
582
}
586
583
}
587
584
588
- return {
589
- pending : this . pendingSourcesToResults ( newPendingSources ) ,
590
- incremental : incrementalResults ,
591
- completed : completedResults ,
585
+ return aggregate ;
586
+ }
587
+
588
+ private _incrementalFinalizer (
589
+ aggregate : IncrementalAggregate ,
590
+ ) : SubsequentIncrementalExecutionResult {
591
+ const { newPendingSources, incrementalResults, completedResults } =
592
+ aggregate ;
593
+ const pendingResults = this . pendingSourcesToResults ( newPendingSources ) ;
594
+
595
+ const result : SubsequentIncrementalExecutionResult = {
596
+ hasNext : this . _pending . size > 0 ,
592
597
} ;
598
+ if ( pendingResults . length ) {
599
+ result . pending = pendingResults ;
600
+ }
601
+ if ( incrementalResults . length ) {
602
+ result . incremental = incrementalResults ;
603
+ }
604
+ if ( completedResults . length ) {
605
+ result . completed = completedResults ;
606
+ }
607
+
608
+ return result ;
593
609
}
594
610
595
611
private _completedRecordToResult (
0 commit comments