Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion h3i/src/actions/h3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
//! Actions, that h3i iterates over in sequence and executes.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use crate::quiche;
Expand All @@ -50,7 +51,7 @@ use crate::encode_header_block_literal;
/// sequentially. Note that packets will be flushed when said iteration has
/// completed, regardless of if an [`Action::FlushPackets`] was the terminal
/// action.
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq)]
pub enum Action {
/// Send a [quiche::h3::frame::Frame] over a stream.
SendFrame {
Expand Down Expand Up @@ -110,6 +111,43 @@ pub enum Action {
Wait {
wait_type: WaitType,
},

/// Run a user-provided callback. See [CustomCallback] for details.
RunCallback {
cb: CustomCallback,
},
}

/// A type-erased, shareable callback that can be executed via
/// [`Action::RunCallback`].
///
/// The closure is stored behind an [`Arc`] so that cloning is a cheap
/// refcount bump. `Action` derives [`Clone`], so this type's `Clone` impl
/// must be callable at runtime (a panicking impl would make any
/// `Action::RunCallback` clone abort).
pub struct CustomCallback {
    // Shared, type-erased closure; `Send + Sync` so actions can move
    // across threads.
    cb: Arc<dyn Fn() + Send + Sync + 'static>,
}

impl CustomCallback {
    /// Wraps an already-shared closure in a `CustomCallback`.
    pub fn new(cb: Arc<dyn Fn() + Send + Sync + 'static>) -> Self {
        Self { cb }
    }

    /// Invokes the stored closure.
    pub fn run(&self) {
        (self.cb)()
    }
}

impl PartialEq for CustomCallback {
    fn eq(&self, other: &Self) -> bool {
        // Closures have no structural equality; two callbacks are equal
        // only when they share the same underlying `Arc` allocation. This
        // keeps equality reflexive across clones (the previous
        // always-`false` impl made `a != a.clone()`).
        Arc::ptr_eq(&self.cb, &other.cb)
    }
}

impl Clone for CustomCallback {
    fn clone(&self) -> Self {
        // Bump the refcount instead of panicking: `Action` derives
        // `Clone`, so this is reachable whenever an action list containing
        // `RunCallback` is cloned.
        Self {
            cb: Arc::clone(&self.cb),
        }
    }
}

impl std::fmt::Debug for CustomCallback {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The closure itself is opaque; print just the type name.
        f.debug_struct("CustomCallback").finish()
    }
}

/// Configure the wait behavior for a connection.
Expand Down
1 change: 1 addition & 0 deletions h3i/src/client/async_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ impl ApplicationOverQuic for H3iDriver {
Action::StopSending { .. } |
Action::OpenUniStream { .. } |
Action::ConnectionClose { .. } |
Action::RunCallback { .. } |
Action::SendHeadersFrame { .. } => {
if self.should_fire() {
// Reset the fire time such that the next action will
Expand Down
4 changes: 4 additions & 0 deletions h3i/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ pub(crate) fn execute_action(
stream_parsers: &mut StreamParserMap,
) {
match action {
Action::RunCallback { cb } => {
cb.run();
},

Action::SendFrame {
stream_id,
fin_stream,
Expand Down
2 changes: 2 additions & 0 deletions h3i/src/recordreplay/qlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ pub struct H3FrameCreatedEx {
impl From<&Action> for QlogEvents {
fn from(action: &Action) -> Self {
match action {
Action::RunCallback { .. } => unreachable!(),

Action::SendFrame {
stream_id,
fin_stream,
Expand Down
17 changes: 6 additions & 11 deletions tokio-quiche/src/http3/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use std::error::Error;
use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Instant;

use datagram_socket::StreamClosureKind;
use foundations::telemetry::log;
Expand Down Expand Up @@ -726,18 +725,11 @@ impl<H: DriverHooks> H3Driver<H> {
};

if let Err(h3::Error::StreamBlocked) = res {
ctx.first_full_headers_flush_fail_time
.get_or_insert(Instant::now());
ctx.full_headers_flush_blocked();
}

if res.is_ok() {
if let Some(first) =
ctx.first_full_headers_flush_fail_time.take()
{
ctx.audit_stats.add_header_flush_duration(
Instant::now().duration_since(first),
);
}
ctx.full_headers_flush_success();
}

res
Expand Down Expand Up @@ -1266,10 +1258,13 @@ impl<H: DriverHooks> ApplicationOverQuic for H3Driver<H> {

impl<H: DriverHooks> Drop for H3Driver<H> {
fn drop(&mut self) {
for stream in self.stream_map.values() {
for stream in self.stream_map.values_mut() {
stream
.audit_stats
.set_recvd_stream_fin(StreamClosureKind::Implicit);

// Update stats if there were pending header sends on this stream.
stream.full_headers_flush_aborted();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix for flaky test could be to move the contents of this function to IoWorker::close_connection and wait until metrics.connections_in_memory is 0 before making assertions in the test.

}
}
}
Expand Down
37 changes: 36 additions & 1 deletion tokio-quiche/src/http3/driver/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub(crate) struct StreamCtx {
/// Indicates the stream sent initial headers.
pub(crate) initial_headers_sent: bool,
/// First time that a HEADERS frame was not fully flushed.
pub(crate) first_full_headers_flush_fail_time: Option<Instant>,
first_full_headers_flush_fail_time: Option<Instant>,
/// Indicates the stream received fin or reset. No more data will be
/// received.
pub(crate) fin_or_reset_recv: bool,
Expand Down Expand Up @@ -92,6 +92,37 @@ impl StreamCtx {
(ctx, PollSender::new(backward_sender), forward_receiver)
}

/// Signal that there was a headers flush attempt that failed due
/// to insufficient flow control or congestion control window.
///
/// Only the first blocked attempt is recorded; repeat calls are no-ops
/// until the in-flight flush completes or is aborted.
pub(crate) fn full_headers_flush_blocked(&mut self) {
    if self.first_full_headers_flush_fail_time.is_some() {
        // Already tracking a blocked flush for this stream.
        return;
    }

    self.audit_stats.set_headers_pending_flush(true);
    self.first_full_headers_flush_fail_time = Some(Instant::now());
}

/// Signal that a headers flush attempt was successful.
pub(crate) fn full_headers_flush_success(&mut self) {
    // Headers are no longer pending once a flush goes through.
    self.audit_stats.set_headers_pending_flush(false);
    self.maybe_update_header_flush_duration();
}

/// Signal that the headers flush was aborted due to stream being reset.
///
/// Folds any accumulated blocked-flush time into the audit stats.
/// NOTE(review): unlike `full_headers_flush_success`, this does not clear
/// the `headers_pending_flush` flag — presumably deliberate so an aborted
/// stream still reports that it ended with headers pending; confirm.
pub(crate) fn full_headers_flush_aborted(&mut self) {
    self.maybe_update_header_flush_duration();
}

// Helper used to implement `full_headers_flush_success` and
// `full_headers_flush_aborted`: if a blocked flush was being tracked,
// folds the elapsed time since it was first blocked into the stream's
// cumulative header-flush duration and clears the tracking timestamp.
fn maybe_update_header_flush_duration(&mut self) {
    if let Some(first) = self.first_full_headers_flush_fail_time.take() {
        self.audit_stats
            .add_header_flush_duration(Instant::now().duration_since(first));
    }
}

/// Creates a [Future] that resolves when `send` has capacity again.
pub(crate) fn wait_for_send(&mut self, stream_id: u64) -> WaitForStream {
WaitForStream::Upstream(WaitForUpstreamCapacity {
Expand Down Expand Up @@ -122,6 +153,7 @@ impl StreamCtx {
self.audit_stats
.set_recvd_stop_sending_error_code(wire_err_code as i64);
self.fin_or_reset_sent = true;
self.full_headers_flush_aborted();
// Drop any pending data and close the write side.
// We can't accept additional frames
self.queued_frame = None;
Expand All @@ -137,6 +169,7 @@ impl StreamCtx {
self.audit_stats
.set_recvd_reset_stream_error_code(wire_err_code as i64);
self.fin_or_reset_recv = true;
self.full_headers_flush_aborted();
self.send = None;
}

Expand All @@ -145,6 +178,7 @@ impl StreamCtx {
self.audit_stats
.set_sent_reset_stream_error_code(wire_err_code as i64);
self.fin_or_reset_sent = true;
self.full_headers_flush_aborted();
}

pub(crate) fn handle_sent_stop_sending(&mut self, wire_err_code: u64) {
Expand All @@ -155,6 +189,7 @@ impl StreamCtx {
// must still send a fin or reset_stream with its final size, we don't
// need to read it from the stream. Quiche will take care of that.
self.fin_or_reset_recv = true;
self.full_headers_flush_aborted();
self.send = None;
}

Expand Down
32 changes: 32 additions & 0 deletions tokio-quiche/src/http3/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
Expand Down Expand Up @@ -69,6 +70,8 @@ pub struct H3AuditStats {
/// Measured across all HEADERS frames sent on the stream. A value of 0
/// indicates there was no failed flushing.
headers_flush_duration: AtomicCell<Duration>,
/// True if the stream currently has headers pending flush.
headers_pending_flush: AtomicBool,
}

impl H3AuditStats {
Expand All @@ -84,6 +87,7 @@ impl H3AuditStats {
recvd_stream_fin: AtomicCell::new(StreamClosureKind::None),
sent_stream_fin: AtomicCell::new(StreamClosureKind::None),
headers_flush_duration: AtomicCell::new(Duration::from_secs(0)),
headers_pending_flush: AtomicBool::new(false),
}
}

Expand Down Expand Up @@ -149,6 +153,11 @@ impl H3AuditStats {
self.sent_stream_fin.load()
}

/// Whether the stream currently has headers pending flush, i.e. a
/// HEADERS flush attempt was blocked and has not yet succeeded.
#[inline]
pub fn headers_pending_flush(&self) -> bool {
    self.headers_pending_flush.load(Ordering::SeqCst)
}

/// Cumulative time between HEADERS failed flush and complete.
///
/// Measured as the duration between the first moment a HEADERS frame was
Expand Down Expand Up @@ -221,4 +230,27 @@ impl H3AuditStats {
let current = self.headers_flush_duration.load();
self.headers_flush_duration.store(current + duration);
}

/// Records whether the stream currently has headers pending flush
/// (`true` when a HEADERS flush is blocked, `false` once one succeeds).
#[inline]
pub fn set_headers_pending_flush(&self, value: bool) {
    self.headers_pending_flush.store(value, Ordering::SeqCst);
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Reports whether the `AtomicCell` specialization for `T` uses native
    /// atomics rather than crossbeam's global lock table.
    fn lock_free<T>(_cell: &AtomicCell<T>) -> bool {
        AtomicCell::<T>::is_lock_free()
    }

    #[test]
    fn atomic_cells_are_lock_free() {
        let stats = H3AuditStats::new(0x07ac0ca707ac0ca7);

        assert!(lock_free(&stats.recvd_stream_fin));
        assert!(lock_free(&stats.sent_stream_fin));

        // Known limitation: `AtomicCell<Duration>` is too wide for native
        // atomics, so crossbeam falls back to its global lock.
        assert!(!lock_free(&stats.headers_flush_duration));
    }
}
Loading