Skip to content

Commit 716dc1e

Browse files
committed
fix: do not assign capacity for pending streams
`Prioritize::send_data` has a check to prevent assigning capacity to streams that are not yet open. Assigning flow control window to pending streams could starve already open streams. This change adds a similar check to `Prioritize::reserve_capacity`. Test `capacity_not_assigned_to_unopened_streams` in `flow_control.rs` demonstrates the fix. A number of other tests must be changed because they were assuming that pending streams immediately received connection capacity. This may be related to #853.
1 parent b9d5397 commit 716dc1e

File tree

4 files changed

+145
-25
lines changed

4 files changed

+145
-25
lines changed

src/proto/streams/prioritize.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,13 +186,7 @@ impl Prioritize {
186186

187187
// `try_assign_capacity` will queue the stream to `pending_capacity` if the capcaity
188188
// cannot be assigned at the time it is called.
189-
//
190-
// Streams over the max concurrent count will still call `send_data` so we should be
191-
// careful not to put it into `pending_capacity` as it will starve the connection
192-
// capacity for other streams
193-
if !stream.is_pending_open {
194-
self.try_assign_capacity(stream);
195-
}
189+
self.try_assign_capacity(stream);
196190
}
197191

198192
if frame.is_end_stream() {
@@ -414,6 +408,12 @@ impl Prioritize {
414408

415409
/// Request capacity to send data
416410
fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
411+
// Streams over the max concurrent count should not have capacity assign to avoid starving the connection
412+
// capacity for open streams
413+
if stream.is_pending_open {
414+
return;
415+
}
416+
417417
let total_requested = stream.requested_send_capacity;
418418

419419
// Total requested should never go below actual assigned

tests/h2-tests/tests/flow_control.rs

Lines changed: 121 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ async fn stream_close_by_data_frame_releases_capacity() {
391391

392392
// The capacity should be immediately available as nothing else is
393393
// happening on the stream.
394-
assert_eq!(s1.capacity(), window_size);
394+
let mut s1 = h2.drive(util::wait_for_capacity(s1, window_size)).await;
395395

396396
let request = Request::builder()
397397
.method(Method::POST)
@@ -414,7 +414,7 @@ async fn stream_close_by_data_frame_releases_capacity() {
414414
s1.send_data("".into(), true).unwrap();
415415

416416
// The capacity should be available
417-
assert_eq!(s2.capacity(), 5);
417+
let mut s2 = h2.drive(util::wait_for_capacity(s2, 5)).await;
418418

419419
// Send the frame
420420
s2.send_data("hello".into(), true).unwrap();
@@ -461,9 +461,7 @@ async fn stream_close_by_trailers_frame_releases_capacity() {
461461
// This effectively reserves the entire connection window
462462
s1.reserve_capacity(window_size);
463463

464-
// The capacity should be immediately available as nothing else is
465-
// happening on the stream.
466-
assert_eq!(s1.capacity(), window_size);
464+
let mut s1 = h2.drive(util::wait_for_capacity(s1, window_size)).await;
467465

468466
let request = Request::builder()
469467
.method(Method::POST)
@@ -486,7 +484,7 @@ async fn stream_close_by_trailers_frame_releases_capacity() {
486484
s1.send_trailers(Default::default()).unwrap();
487485

488486
// The capacity should be available
489-
assert_eq!(s2.capacity(), 5);
487+
let mut s2 = h2.drive(util::wait_for_capacity(s2, 5)).await;
490488

491489
// Send the frame
492490
s2.send_data("hello".into(), true).unwrap();
@@ -919,10 +917,10 @@ async fn recv_no_init_window_then_receive_some_init_window() {
919917

920918
let (response, mut stream) = client.send_request(request, false).unwrap();
921919

922-
stream.reserve_capacity(11);
920+
stream.reserve_capacity(10);
923921

924-
let mut stream = h2.drive(util::wait_for_capacity(stream, 11)).await;
925-
assert_eq!(stream.capacity(), 11);
922+
let mut stream = h2.drive(util::wait_for_capacity(stream, 10)).await;
923+
assert_eq!(stream.capacity(), 10);
926924

927925
stream.send_data("hello world".into(), true).unwrap();
928926

@@ -1990,6 +1988,120 @@ async fn reclaim_reserved_capacity() {
19901988
join(mock, h2).await;
19911989
}
19921990

1991+
#[tokio::test]
1992+
async fn capacity_not_assigned_to_unopened_streams() {
1993+
h2_support::trace_init!();
1994+
1995+
let (io, mut srv) = mock::new();
1996+
1997+
let mock = async move {
1998+
let mut settings = frame::Settings::default();
1999+
settings.set_max_concurrent_streams(Some(1));
2000+
let settings = srv.assert_client_handshake_with_settings(settings).await;
2001+
assert_default_settings!(settings);
2002+
2003+
srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
2004+
.await;
2005+
srv.recv_frame(frames::data(1, "hello")).await;
2006+
srv.recv_frame(frames::data(1, "world").eos()).await;
2007+
srv.send_frame(frames::headers(1).response(200).eos()).await;
2008+
2009+
srv.recv_frame(frames::headers(3).request("POST", "https://www.example.com/"))
2010+
.await;
2011+
srv.send_frame(frames::window_update(
2012+
0,
2013+
frame::DEFAULT_INITIAL_WINDOW_SIZE + 10,
2014+
))
2015+
.await;
2016+
srv.recv_frame(frames::reset(3).cancel()).await;
2017+
};
2018+
2019+
let h2 = async move {
2020+
let (mut client, mut h2) = client::handshake(io).await.unwrap();
2021+
let request = Request::builder()
2022+
.method(Method::POST)
2023+
.uri("https://www.example.com/")
2024+
.body(())
2025+
.unwrap();
2026+
2027+
let (response1, mut stream1) = client.send_request(request.clone(), false).unwrap();
2028+
stream1.send_data("hello".into(), false).unwrap();
2029+
let (_, mut stream2) = client.send_request(request.clone(), false).unwrap();
2030+
stream2.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize);
2031+
stream1.send_data("world".into(), true).unwrap();
2032+
h2.drive(response1).await.unwrap();
2033+
let stream2 = h2
2034+
.drive(util::wait_for_capacity(
2035+
stream2,
2036+
frame::DEFAULT_INITIAL_WINDOW_SIZE as usize,
2037+
))
2038+
.await;
2039+
drop(stream2);
2040+
h2.await.unwrap();
2041+
};
2042+
2043+
join(mock, h2).await;
2044+
}
2045+
2046+
#[tokio::test]
2047+
async fn new_initial_window_size_capacity_not_assigned_to_unopened_streams() {
2048+
h2_support::trace_init!();
2049+
2050+
let (io, mut srv) = mock::new();
2051+
2052+
let mock = async move {
2053+
let mut settings = frame::Settings::default();
2054+
settings.set_max_concurrent_streams(Some(1));
2055+
settings.set_initial_window_size(Some(10));
2056+
let settings = srv.assert_client_handshake_with_settings(settings).await;
2057+
assert_default_settings!(settings);
2058+
2059+
srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
2060+
.await;
2061+
srv.recv_frame(frames::data(1, "hello")).await;
2062+
srv.send_frame(frames::settings().initial_window_size(frame::DEFAULT_INITIAL_WINDOW_SIZE))
2063+
.await;
2064+
srv.recv_frame(frames::settings_ack()).await;
2065+
srv.send_frame(frames::headers(1).response(200).eos()).await;
2066+
srv.recv_frame(frames::data(1, "world").eos()).await;
2067+
2068+
srv.recv_frame(frames::headers(3).request("POST", "https://www.example.com/"))
2069+
.await;
2070+
srv.send_frame(frames::window_update(
2071+
0,
2072+
frame::DEFAULT_INITIAL_WINDOW_SIZE + 10,
2073+
))
2074+
.await;
2075+
srv.recv_frame(frames::reset(3).cancel()).await;
2076+
};
2077+
2078+
let h2 = async move {
2079+
let (mut client, mut h2) = client::handshake(io).await.unwrap();
2080+
let request = Request::builder()
2081+
.method(Method::POST)
2082+
.uri("https://www.example.com/")
2083+
.body(())
2084+
.unwrap();
2085+
2086+
let (response1, mut stream1) = client.send_request(request.clone(), false).unwrap();
2087+
stream1.send_data("hello".into(), false).unwrap();
2088+
let (_, mut stream2) = client.send_request(request.clone(), false).unwrap();
2089+
stream2.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize);
2090+
h2.drive(response1).await.unwrap();
2091+
stream1.send_data("world".into(), true).unwrap();
2092+
let stream2 = h2
2093+
.drive(util::wait_for_capacity(
2094+
stream2,
2095+
frame::DEFAULT_INITIAL_WINDOW_SIZE as usize,
2096+
))
2097+
.await;
2098+
drop(stream2);
2099+
h2.await.unwrap();
2100+
};
2101+
2102+
join(mock, h2).await;
2103+
}
2104+
19932105
// ==== abusive window updates ====
19942106

19952107
#[tokio::test]

tests/h2-tests/tests/prioritization.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ async fn single_stream_send_large_body() {
5252
stream.reserve_capacity(payload.len());
5353

5454
// The capacity should be immediately allocated
55-
assert_eq!(stream.capacity(), payload.len());
55+
let mut stream = h2
56+
.drive(util::wait_for_capacity(stream, payload.len()))
57+
.await;
5658

5759
// Send the data
5860
stream.send_data(payload.into(), true).unwrap();
@@ -108,7 +110,9 @@ async fn multiple_streams_with_payload_greater_than_default_window() {
108110
// The capacity should be immediately
109111
// allocated to default window size (smaller than payload)
110112
stream1.reserve_capacity(payload_clone.len());
111-
assert_eq!(stream1.capacity(), DEFAULT_WINDOW_SIZE);
113+
let mut stream1 = conn
114+
.drive(util::wait_for_capacity(stream1, DEFAULT_WINDOW_SIZE))
115+
.await;
112116

113117
stream2.reserve_capacity(payload_clone.len());
114118
assert_eq!(stream2.capacity(), 0);
@@ -179,7 +183,9 @@ async fn single_stream_send_extra_large_body_multi_frames_one_buffer() {
179183
stream.reserve_capacity(payload.len());
180184

181185
// The capacity should be immediately allocated
182-
assert_eq!(stream.capacity(), payload.len());
186+
let mut stream = h2
187+
.drive(util::wait_for_capacity(stream, payload.len()))
188+
.await;
183189

184190
// Send the data
185191
stream.send_data(payload.into(), true).unwrap();
@@ -296,13 +302,13 @@ async fn single_stream_send_extra_large_body_multi_frames_multi_buffer() {
296302
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, 75, 143, 168, 233, 25, 151,
297303
33, 233, 132,
298304
])
305+
.write(frames::SETTINGS_ACK)
306+
.read(frames::SETTINGS_ACK)
299307
.write(&[
300308
// DATA
301309
0, 64, 0, 0, 0, 0, 0, 0, 1,
302310
])
303311
.write(&payload[0..16_384])
304-
.write(frames::SETTINGS_ACK)
305-
.read(frames::SETTINGS_ACK)
306312
.wait(Duration::from_millis(10))
307313
.write(&[
308314
// DATA
@@ -326,7 +332,9 @@ async fn single_stream_send_extra_large_body_multi_frames_multi_buffer() {
326332
stream.reserve_capacity(payload.len());
327333

328334
// The capacity should be immediately allocated
329-
assert_eq!(stream.capacity(), payload.len());
335+
let mut stream = h2
336+
.drive(util::wait_for_capacity(stream, payload.len()))
337+
.await;
330338

331339
// Send the data
332340
stream.send_data(payload.into(), true).unwrap();

tests/h2-tests/tests/stream_states.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ async fn send_recv_data() {
5151
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, 75, 143, 168, 233, 25, 151,
5252
33, 233, 132,
5353
])
54+
.write(frames::SETTINGS_ACK)
5455
.write(&[
5556
// DATA
5657
0, 0, 5, 0, 1, 0, 0, 0, 1, 104, 101, 108, 108, 111,
5758
])
58-
.write(frames::SETTINGS_ACK)
5959
// Read response
6060
.read(&[
6161
// HEADERS
@@ -78,10 +78,10 @@ async fn send_recv_data() {
7878
// Reserve send capacity
7979
stream.reserve_capacity(5);
8080

81-
assert_eq!(stream.capacity(), 5);
81+
let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await;
8282

8383
// Send the data
84-
stream.send_data("hello".as_bytes(), true).unwrap();
84+
stream.send_data("hello".into(), true).unwrap();
8585

8686
// Get the response
8787
let resp = h2.run(response).await.unwrap();

0 commit comments

Comments
 (0)