Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,7 @@ impl Prioritize {

// `try_assign_capacity` will queue the stream to `pending_capacity` if the capcaity
// cannot be assigned at the time it is called.
//
// Streams over the max concurrent count will still call `send_data` so we should be
// careful not to put it into `pending_capacity` as it will starve the connection
// capacity for other streams
if !stream.is_pending_open {
self.try_assign_capacity(stream);
}
self.try_assign_capacity(stream);
}

if frame.is_end_stream() {
Expand Down Expand Up @@ -414,6 +408,12 @@ impl Prioritize {

/// Request capacity to send data
fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
// Streams over the max concurrent count should not have capacity assign to avoid starving the connection
// capacity for open streams
if stream.is_pending_open {
return;
}

let total_requested = stream.requested_send_capacity;

// Total requested should never go below actual assigned
Expand Down
130 changes: 121 additions & 9 deletions tests/h2-tests/tests/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ async fn stream_close_by_data_frame_releases_capacity() {

// The capacity should be immediately available as nothing else is
// happening on the stream.
assert_eq!(s1.capacity(), window_size);
let mut s1 = h2.drive(util::wait_for_capacity(s1, window_size)).await;

let request = Request::builder()
.method(Method::POST)
Expand All @@ -414,7 +414,7 @@ async fn stream_close_by_data_frame_releases_capacity() {
s1.send_data("".into(), true).unwrap();

// The capacity should be available
assert_eq!(s2.capacity(), 5);
let mut s2 = h2.drive(util::wait_for_capacity(s2, 5)).await;

// Send the frame
s2.send_data("hello".into(), true).unwrap();
Expand Down Expand Up @@ -461,9 +461,7 @@ async fn stream_close_by_trailers_frame_releases_capacity() {
// This effectively reserves the entire connection window
s1.reserve_capacity(window_size);

// The capacity should be immediately available as nothing else is
// happening on the stream.
assert_eq!(s1.capacity(), window_size);
let mut s1 = h2.drive(util::wait_for_capacity(s1, window_size)).await;

let request = Request::builder()
.method(Method::POST)
Expand All @@ -486,7 +484,7 @@ async fn stream_close_by_trailers_frame_releases_capacity() {
s1.send_trailers(Default::default()).unwrap();

// The capacity should be available
assert_eq!(s2.capacity(), 5);
let mut s2 = h2.drive(util::wait_for_capacity(s2, 5)).await;

// Send the frame
s2.send_data("hello".into(), true).unwrap();
Expand Down Expand Up @@ -919,10 +917,10 @@ async fn recv_no_init_window_then_receive_some_init_window() {

let (response, mut stream) = client.send_request(request, false).unwrap();

stream.reserve_capacity(11);
stream.reserve_capacity(10);

let mut stream = h2.drive(util::wait_for_capacity(stream, 11)).await;
assert_eq!(stream.capacity(), 11);
let mut stream = h2.drive(util::wait_for_capacity(stream, 10)).await;
assert_eq!(stream.capacity(), 10);

stream.send_data("hello world".into(), true).unwrap();

Expand Down Expand Up @@ -1990,6 +1988,120 @@ async fn reclaim_reserved_capacity() {
join(mock, h2).await;
}

#[tokio::test]
async fn capacity_not_assigned_to_unopened_streams() {
h2_support::trace_init!();

let (io, mut srv) = mock::new();

let mock = async move {
let mut settings = frame::Settings::default();
settings.set_max_concurrent_streams(Some(1));
let settings = srv.assert_client_handshake_with_settings(settings).await;
assert_default_settings!(settings);

srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
.await;
srv.recv_frame(frames::data(1, "hello")).await;
srv.recv_frame(frames::data(1, "world").eos()).await;
srv.send_frame(frames::headers(1).response(200).eos()).await;

srv.recv_frame(frames::headers(3).request("POST", "https://www.example.com/"))
.await;
srv.send_frame(frames::window_update(
0,
frame::DEFAULT_INITIAL_WINDOW_SIZE + 10,
))
.await;
srv.recv_frame(frames::reset(3).cancel()).await;
};

let h2 = async move {
let (mut client, mut h2) = client::handshake(io).await.unwrap();
let request = Request::builder()
.method(Method::POST)
.uri("https://www.example.com/")
.body(())
.unwrap();

let (response1, mut stream1) = client.send_request(request.clone(), false).unwrap();
stream1.send_data("hello".into(), false).unwrap();
let (_, mut stream2) = client.send_request(request.clone(), false).unwrap();
stream2.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize);
stream1.send_data("world".into(), true).unwrap();
h2.drive(response1).await.unwrap();
let stream2 = h2
.drive(util::wait_for_capacity(
stream2,
frame::DEFAULT_INITIAL_WINDOW_SIZE as usize,
))
.await;
drop(stream2);
h2.await.unwrap();
};

join(mock, h2).await;
}

#[tokio::test]
async fn new_initial_window_size_capacity_not_assigned_to_unopened_streams() {
h2_support::trace_init!();

let (io, mut srv) = mock::new();

let mock = async move {
let mut settings = frame::Settings::default();
settings.set_max_concurrent_streams(Some(1));
settings.set_initial_window_size(Some(10));
let settings = srv.assert_client_handshake_with_settings(settings).await;
assert_default_settings!(settings);

srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
.await;
srv.recv_frame(frames::data(1, "hello")).await;
srv.send_frame(frames::settings().initial_window_size(frame::DEFAULT_INITIAL_WINDOW_SIZE))
.await;
srv.recv_frame(frames::settings_ack()).await;
srv.send_frame(frames::headers(1).response(200).eos()).await;
srv.recv_frame(frames::data(1, "world").eos()).await;

srv.recv_frame(frames::headers(3).request("POST", "https://www.example.com/"))
.await;
srv.send_frame(frames::window_update(
0,
frame::DEFAULT_INITIAL_WINDOW_SIZE + 10,
))
.await;
srv.recv_frame(frames::reset(3).cancel()).await;
};

let h2 = async move {
let (mut client, mut h2) = client::handshake(io).await.unwrap();
let request = Request::builder()
.method(Method::POST)
.uri("https://www.example.com/")
.body(())
.unwrap();

let (response1, mut stream1) = client.send_request(request.clone(), false).unwrap();
stream1.send_data("hello".into(), false).unwrap();
let (_, mut stream2) = client.send_request(request.clone(), false).unwrap();
stream2.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize);
h2.drive(response1).await.unwrap();
stream1.send_data("world".into(), true).unwrap();
let stream2 = h2
.drive(util::wait_for_capacity(
stream2,
frame::DEFAULT_INITIAL_WINDOW_SIZE as usize,
))
.await;
drop(stream2);
h2.await.unwrap();
};

join(mock, h2).await;
}

// ==== abusive window updates ====

#[tokio::test]
Expand Down
20 changes: 14 additions & 6 deletions tests/h2-tests/tests/prioritization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ async fn single_stream_send_large_body() {
stream.reserve_capacity(payload.len());

// The capacity should be immediately allocated
assert_eq!(stream.capacity(), payload.len());
let mut stream = h2
.drive(util::wait_for_capacity(stream, payload.len()))
.await;

// Send the data
stream.send_data(payload.into(), true).unwrap();
Expand Down Expand Up @@ -108,7 +110,9 @@ async fn multiple_streams_with_payload_greater_than_default_window() {
// The capacity should be immediately
// allocated to default window size (smaller than payload)
stream1.reserve_capacity(payload_clone.len());
assert_eq!(stream1.capacity(), DEFAULT_WINDOW_SIZE);
let mut stream1 = conn
.drive(util::wait_for_capacity(stream1, DEFAULT_WINDOW_SIZE))
.await;

stream2.reserve_capacity(payload_clone.len());
assert_eq!(stream2.capacity(), 0);
Expand Down Expand Up @@ -179,7 +183,9 @@ async fn single_stream_send_extra_large_body_multi_frames_one_buffer() {
stream.reserve_capacity(payload.len());

// The capacity should be immediately allocated
assert_eq!(stream.capacity(), payload.len());
let mut stream = h2
.drive(util::wait_for_capacity(stream, payload.len()))
.await;

// Send the data
stream.send_data(payload.into(), true).unwrap();
Expand Down Expand Up @@ -296,13 +302,13 @@ async fn single_stream_send_extra_large_body_multi_frames_multi_buffer() {
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, 75, 143, 168, 233, 25, 151,
33, 233, 132,
])
.write(frames::SETTINGS_ACK)
.read(frames::SETTINGS_ACK)
.write(&[
// DATA
0, 64, 0, 0, 0, 0, 0, 0, 1,
])
.write(&payload[0..16_384])
.write(frames::SETTINGS_ACK)
.read(frames::SETTINGS_ACK)
.wait(Duration::from_millis(10))
.write(&[
// DATA
Expand All @@ -326,7 +332,9 @@ async fn single_stream_send_extra_large_body_multi_frames_multi_buffer() {
stream.reserve_capacity(payload.len());

// The capacity should be immediately allocated
assert_eq!(stream.capacity(), payload.len());
let mut stream = h2
.drive(util::wait_for_capacity(stream, payload.len()))
.await;

// Send the data
stream.send_data(payload.into(), true).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions tests/h2-tests/tests/stream_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ async fn send_recv_data() {
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, 75, 143, 168, 233, 25, 151,
33, 233, 132,
])
.write(frames::SETTINGS_ACK)
.write(&[
// DATA
0, 0, 5, 0, 1, 0, 0, 0, 1, 104, 101, 108, 108, 111,
])
.write(frames::SETTINGS_ACK)
// Read response
.read(&[
// HEADERS
Expand All @@ -78,10 +78,10 @@ async fn send_recv_data() {
// Reserve send capacity
stream.reserve_capacity(5);

assert_eq!(stream.capacity(), 5);
let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await;

// Send the data
stream.send_data("hello".as_bytes(), true).unwrap();
stream.send_data("hello".into(), true).unwrap();

// Get the response
let resp = h2.run(response).await.unwrap();
Expand Down