@@ -160,14 +160,13 @@ class Stream<T> extends ExchangeImpl<T> {
160
160
// send lock: prevent sending DataFrames after reset occurred.
161
161
private final Lock sendLock = new ReentrantLock ();
162
162
private final Lock stateLock = new ReentrantLock ();
163
-
164
163
/**
165
164
* A reference to this Stream's connection Send Window controller. The
166
165
* stream MUST acquire the appropriate amount of Send Window before
167
166
* sending any data. Will be null for PushStreams, as they cannot send data.
168
167
*/
169
168
private final WindowController windowController ;
170
- private final WindowUpdateSender windowUpdater ;
169
+ private final WindowUpdateSender streamWindowUpdater ;
171
170
172
171
// Only accessed in all method calls from incoming(), no need for volatile
173
172
private boolean endStreamSeen ;
@@ -217,7 +216,8 @@ private void schedule() {
217
216
int size = Utils .remaining (dsts , Integer .MAX_VALUE );
218
217
if (size == 0 && finished ) {
219
218
inputQ .remove ();
220
- connection .ensureWindowUpdated (df ); // must update connection window
219
+ // consumed will not be called
220
+ connection .releaseUnconsumed (df ); // must update connection window
221
221
Log .logTrace ("responseSubscriber.onComplete" );
222
222
if (debug .on ()) debug .log ("incoming: onComplete" );
223
223
connection .decrementStreamsCount (streamid );
@@ -232,7 +232,11 @@ private void schedule() {
232
232
try {
233
233
subscriber .onNext (dsts );
234
234
} catch (Throwable t ) {
235
- connection .dropDataFrame (df ); // must update connection window
235
+ // Data frames that have been added to the inputQ
236
+ // must be released using releaseUnconsumed() to
237
+ // account for the amount of unprocessed bytes
238
+ // tracked by the connection.windowUpdater.
239
+ connection .releaseUnconsumed (df );
236
240
throw t ;
237
241
}
238
242
if (consumed (df )) {
@@ -283,8 +287,12 @@ private void schedule() {
283
287
private void drainInputQueue () {
284
288
Http2Frame frame ;
285
289
while ((frame = inputQ .poll ()) != null ) {
286
- if (frame instanceof DataFrame ) {
287
- connection .dropDataFrame ((DataFrame )frame );
290
+ if (frame instanceof DataFrame df ) {
291
+ // Data frames that have been added to the inputQ
292
+ // must be released using releaseUnconsumed() to
293
+ // account for the amount of unprocessed bytes
294
+ // tracked by the connection.windowUpdater.
295
+ connection .releaseUnconsumed (df );
288
296
}
289
297
}
290
298
}
@@ -310,12 +318,13 @@ private boolean consumed(DataFrame df) {
310
318
boolean endStream = df .getFlag (DataFrame .END_STREAM );
311
319
if (len == 0 ) return endStream ;
312
320
313
- connection .windowUpdater .update (len );
314
-
321
+ connection .windowUpdater .processed (len );
315
322
if (!endStream ) {
323
+ streamWindowUpdater .processed (len );
324
+ } else {
316
325
// Don't send window update on a stream which is
317
326
// closed or half closed.
318
- windowUpdater . update (len );
327
+ streamWindowUpdater . released (len );
319
328
}
320
329
321
330
// true: end of stream; false: more data coming
@@ -385,8 +394,21 @@ public String toString() {
385
394
}
386
395
387
396
private void receiveDataFrame (DataFrame df ) {
388
- inputQ .add (df );
389
- sched .runOrSchedule ();
397
+ try {
398
+ int len = df .payloadLength ();
399
+ if (len > 0 ) {
400
+ // we return from here if the connection is being closed.
401
+ if (!connection .windowUpdater .canBufferUnprocessedBytes (len )) return ;
402
+ // we return from here if the stream is being closed.
403
+ if (closed || !streamWindowUpdater .canBufferUnprocessedBytes (len )) {
404
+ connection .releaseUnconsumed (df );
405
+ return ;
406
+ }
407
+ }
408
+ inputQ .add (df );
409
+ } finally {
410
+ sched .runOrSchedule ();
411
+ }
390
412
}
391
413
392
414
/** Handles a RESET frame. RESET is always handled inline in the queue. */
@@ -470,7 +492,7 @@ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
470
492
this .responseHeadersBuilder = new HttpHeadersBuilder ();
471
493
this .rspHeadersConsumer = new HeadersConsumer ();
472
494
this .requestPseudoHeaders = createPseudoHeaders (request );
473
- this .windowUpdater = new StreamWindowUpdateSender (connection );
495
+ this .streamWindowUpdater = new StreamWindowUpdateSender (connection );
474
496
}
475
497
476
498
private boolean checkRequestCancelled () {
@@ -506,6 +528,8 @@ void incoming(Http2Frame frame) throws IOException {
506
528
if (debug .on ()) {
507
529
debug .log ("request cancelled or stream closed: dropping data frame" );
508
530
}
531
+ // Data frames that have not been added to the inputQ
532
+ // can be released using dropDataFrame
509
533
connection .dropDataFrame (df );
510
534
} else {
511
535
receiveDataFrame (df );
@@ -1427,12 +1451,18 @@ void cancel(IOException cause) {
1427
1451
1428
1452
@ Override
1429
1453
void onProtocolError (final IOException cause ) {
1454
+ onProtocolError (cause , ResetFrame .PROTOCOL_ERROR );
1455
+ }
1456
+
1457
+ void onProtocolError (final IOException cause , int code ) {
1430
1458
if (debug .on ()) {
1431
- debug .log ("cancelling exchange on stream %d due to protocol error: %s" , streamid , cause .getMessage ());
1459
+ debug .log ("cancelling exchange on stream %d due to protocol error [%s]: %s" ,
1460
+ streamid , ErrorFrame .stringForCode (code ),
1461
+ cause .getMessage ());
1432
1462
}
1433
1463
Log .logError ("cancelling exchange on stream {0} due to protocol error: {1}\n " , streamid , cause );
1434
1464
// send a RESET frame and close the stream
1435
- cancelImpl (cause , ResetFrame . PROTOCOL_ERROR );
1465
+ cancelImpl (cause , code );
1436
1466
}
1437
1467
1438
1468
void connectionClosing (Throwable cause ) {
@@ -1736,6 +1766,13 @@ String dbgString() {
1736
1766
return dbgString = dbg ;
1737
1767
}
1738
1768
}
1769
+
1770
+ @ Override
1771
+ protected boolean windowSizeExceeded (long received ) {
1772
+ onProtocolError (new ProtocolException ("stream %s flow control window exceeded"
1773
+ .formatted (streamid )), ResetFrame .FLOW_CONTROL_ERROR );
1774
+ return true ;
1775
+ }
1739
1776
}
1740
1777
1741
1778
/**
0 commit comments