From a9db96c65de8b074aabb1bfa7ae1b5379c5a4113 Mon Sep 17 00:00:00 2001 From: iyzhang Date: Thu, 1 May 2025 10:02:17 -0700 Subject: [PATCH] [inetstack] Bug Fix: Simultaneous close misses FIN resend --- .../input/tcp/close/close-local.pkt | 5 +- .../input/tcp/close/close-simultaneous.pkt | 19 +- .../protocols/layer4/tcp/established/mod.rs | 37 +- .../layer4/tcp/established/receiver.rs | 331 ++++++++++-------- .../layer4/tcp/established/sender.rs | 92 +++-- 5 files changed, 272 insertions(+), 212 deletions(-) diff --git a/network_simulator/input/tcp/close/close-local.pkt b/network_simulator/input/tcp/close/close-local.pkt index d2d71b713..cc662dde7 100644 --- a/network_simulator/input/tcp/close/close-local.pkt +++ b/network_simulator/input/tcp/close/close-local.pkt @@ -25,10 +25,11 @@ // Receive FIN segment. +.1 TCP < F. seq 2001(0) ack 65537 win 65535 +// Send ACK on FIN segment. ++.0 TCP > . seq 65537(0) ack 2002 win 65534 + // Succeed to close connection immediately because we have linger set to 0. +0 wait(500, ...) = 0 -// Send ACK on FIN segment. -+.0 TCP > . seq 65537(0) ack 2002 win 65534 diff --git a/network_simulator/input/tcp/close/close-simultaneous.pkt b/network_simulator/input/tcp/close/close-simultaneous.pkt index 7d14965fd..6a74548cd 100644 --- a/network_simulator/input/tcp/close/close-simultaneous.pkt +++ b/network_simulator/input/tcp/close/close-simultaneous.pkt @@ -1,4 +1,4 @@ -// Tests for simultaneous close. +// Tests for simultaneous close. This is Figure 14 of RFC 793. // Establish a connection. +.0 socket(..., SOCK_STREAM, IPPROTO_TCP) = 500 @@ -18,14 +18,17 @@ // Close connection. +.2 close(500) = 0 -// Send FIN segment. +// Send FIN segment. Should move to FINWAIT-1 +.0 TCP > F. seq 65536(0) ack 101 win 65535 -// Receive ACK on FIN segment. -+.1 TCP < F. seq 101(0) ack 65537 win 65535 -// Succeed to close connection immediately because we have linger set to 0. -+0 wait(500, ...) = 0 +// Receive FIN but no ack on FIN. Should move to Closing. ++.1 TCP < F. seq 101(0) ack 65536 win 65535 + +// Resend FIN on simultaneous FIN segment with FIN ack. ++.0 TCP > F. seq 65536(0) ack 102 win 65534 -// Send ACK on FIN segment. -+.0 TCP > . seq 65537(0) ack 102 win 65534 +// Receive ACK on FIN segment. Should move to TIME WAIT ++.1 TCP < . seq 102(0) ack 65537 win 65534 +// Succeed to close connection immediately because we have linger set to 0. ++0 wait(500, ...) = 0 diff --git a/src/rust/inetstack/protocols/layer4/tcp/established/mod.rs b/src/rust/inetstack/protocols/layer4/tcp/established/mod.rs index 32428a47b..5138a4469 100644 --- a/src/rust/inetstack/protocols/layer4/tcp/established/mod.rs +++ b/src/rust/inetstack/protocols/layer4/tcp/established/mod.rs @@ -35,13 +35,12 @@ use crate::{ SharedDemiRuntime, SharedObject, }, }; -use ::futures::pin_mut; -use ::futures::FutureExt; +use ::futures::{join, pin_mut, FutureExt}; use ::std::{ net::SocketAddrV4, ops::{Deref, DerefMut}, - time::Duration, - time::Instant, + pin::pin, + time::{Duration, Instant}, }; //====================================================================================================================== @@ -206,19 +205,17 @@ impl SharedEstablishedSocket { async fn local_close(&mut self) -> Result<(), Fail> { // 1. Start close protocol by setting state and sending FIN. self.cb.state = State::FinWait1; - Sender::push_fin_and_wait_for_ack(&mut self.cb).await?; - - // 2. Got ACK to our FIN. Check if we also received a FIN from remote in the meantime. - let state: State = self.cb.state; - match state { - State::FinWait1 => { - self.cb.state = State::FinWait2; - // Haven't received a FIN yet from remote, so wait. - self.cb.receiver.wait_for_fin().await?; - }, - State::Closing => self.cb.state = State::TimeWait, - state => unreachable!("Cannot be in any other state at this point: {:?}", state), - }; + Sender::push_fin(&mut self.cb)?; + + // 2. Wait for FIN and FIN ack. + let mut me2: SharedEstablishedSocket = self.clone(); + let mut me3: SharedEstablishedSocket = self.clone(); + let wait_for_fin = pin!(me3.cb.receiver.wait_for_fin().fuse()); + let wait_for_fin_ack = pin!(Sender::wait_for_fin_ack(&mut me2.cb).fuse()); + let (result1, result2) = join!(wait_for_fin, wait_for_fin_ack); + result1?; + result2?; + // 3. TIMED_WAIT debug_assert_eq!(self.cb.state, State::TimeWait); trace!("socket options: {:?}", self.cb.socket_options.get_linger()); @@ -232,8 +229,10 @@ impl SharedEstablishedSocket { // 0. Move state forward self.cb.state = State::LastAck; // 1. Send FIN and wait for ack before closing. - Sender::push_fin_and_wait_for_ack(&mut self.cb).await?; - self.cb.state = State::Closed; + Sender::push_fin(&mut self.cb)?; + Sender::wait_for_fin_ack(&mut self.cb).await?; + debug_assert_eq!(self.cb.state, State::Closed); + Ok(()) } diff --git a/src/rust/inetstack/protocols/layer4/tcp/established/receiver.rs b/src/rust/inetstack/protocols/layer4/tcp/established/receiver.rs index 5979027cf..e6bbb78f0 100644 --- a/src/rust/inetstack/protocols/layer4/tcp/established/receiver.rs +++ b/src/rust/inetstack/protocols/layer4/tcp/established/receiver.rs @@ -111,94 +111,6 @@ impl Receiver { } } - // This function causes a EOF to be returned to the user. We also know that there will be no more incoming - // data after this sequence number. - fn push_fin(&mut self) { - self.pop_queue.push(DemiBuffer::new(0)); - debug_assert_eq!(self.receive_next_seq_no, self.fin_seq_no.get().unwrap()); - // Reset it to wake up any close coroutines waiting for FIN to arrive. - self.fin_seq_no.set(Some(self.receive_next_seq_no)); - // Move RECV_NXT over the FIN. - self.receive_next_seq_no = self.receive_next_seq_no + 1.into(); - } - - pub fn get_receive_window_size(&self) -> u32 { - let bytes_unread: u32 = (self.receive_next_seq_no - self.reader_next_seq_no).into(); - // The window should be less than 1GB or 64KB without scaling. - debug_assert!( - (self.window_scale_shift_bits == 0 && bytes_unread <= MAX_WINDOW_SIZE_WITHOUT_SCALING) - || bytes_unread <= MAX_WINDOW_SIZE_WITH_SCALING - ); - debug!( - "Receive window size: bytes_unread={:?} buffer_size_bytes={:?} ", - bytes_unread, self.buffer_size_bytes - ); - self.buffer_size_bytes - bytes_unread - } - - pub fn hdr_window_size(&self) -> u16 { - let window_size: u32 = self.get_receive_window_size(); - let hdr_window_size: u16 = expect_ok!( - (window_size >> self.window_scale_shift_bits).try_into(), - "Window size overflow" - ); - debug!( - "Window size -> {} (hdr {}, scale {})", - (hdr_window_size as u32) << self.window_scale_shift_bits, - hdr_window_size, - self.window_scale_shift_bits, - ); - hdr_window_size - } - - // This routine takes an incoming in-order TCP segment and adds the data to the user's receive queue. If the new - // segment fills a "hole" in the receive sequence number space allowing previously stored out-of-order data to now - // be received, it receives that too. - // - // This routine also updates receive_next to reflect any data now considered "received". - fn receive_data(&mut self, seg_start: SeqNumber, buf: DemiBuffer) { - // This routine should only be called with in-order segment data. - debug_assert_eq!(seg_start, self.receive_next_seq_no); - - // Push the new segment data onto the end of the receive queue. - self.receive_next_seq_no = self.receive_next_seq_no + SeqNumber::from(buf.len() as u32); - // This inserts the segment and wakes a waiting pop coroutine. - self.pop_queue.push(buf); - - // Okay, we've successfully received some new data. Check if any of the formerly out-of-order data waiting in - // the out-of-order queue is now in-order. If so, we can move it to the receive queue. - while !self.out_of_order_frames.is_empty() { - if let Some(stored_entry) = self.out_of_order_frames.front() { - if stored_entry.0 == self.receive_next_seq_no { - // Move this entry's buffer from the out-of-order store to the receive queue. - // This data is now considered to be "received" by TCP, and included in our RCV.NXT calculation. - debug!("Recovering out-of-order packet at {}", self.receive_next_seq_no); - if let Some(temp) = self.out_of_order_frames.pop_front() { - self.receive_next_seq_no = self.receive_next_seq_no + SeqNumber::from(temp.1.len() as u32); - // This inserts the segment and wakes a waiting pop coroutine. - self.pop_queue.push(temp.1); - } - } else { - // Since our out-of-order list is sorted, we can stop when the next segment is not in sequence. - break; - } - } - } - } - - // Block until the remote sends a FIN (plus all previous data has arrived). - pub async fn wait_for_fin(&mut self) -> Result<(), Fail> { - let mut fin_seq_no: Option = self.fin_seq_no.get(); - loop { - match fin_seq_no { - Some(fin_seq_no) if self.receive_next_seq_no >= fin_seq_no => return Ok(()), - _ => { - fin_seq_no = self.fin_seq_no.wait_for_change(None).await?; - }, - } - } - } - // Block until some data is received, up to an optional size. pub async fn pop(&mut self, size: Option) -> Result { debug!("waiting on pop {:?}", size); @@ -274,22 +186,49 @@ impl Receiver { warn!("Got packet with URG bit set!"); } - let has_data: bool = if data.len() > 0 { - Self::process_data(cb, layer3_endpoint, data, seg_start, seg_end, seg_len)?; - true - } else { - false - }; + // Store whether the packet has data here because processing it will consume the DemiBuffer. + let has_data: bool = data.len() > 0; + Self::process_data_if_any(cb, layer3_endpoint, data, seg_start, seg_end, seg_len)?; // Deal with FIN flag, saving the FIN for later if it is out of order. + Self::check_and_process_fin(cb, &header, seg_end, layer3_endpoint, now)?; + + // We should ACK this segment, preferably via piggybacking on a response. + if cb.receiver.ack_deadline_time_secs.get().is_none() { + // Start the delayed ACK timer to ensure an ACK gets sent soon even if no piggyback opportunity occurs. + let timeout: Duration = cb.receiver.ack_delay_timeout_secs; + // Getting the current time is extremely cheap as it is just a variable lookup. + cb.receiver.ack_deadline_time_secs.set(Some(now + timeout)); + } else if has_data { + // We already owe our peer an ACK (the timer was already running), so cancel the timer and ACK now. + cb.receiver.ack_deadline_time_secs.set(None); + trace!("process_packet(): sending ack before deadline because another packet arrived"); + Sender::send_ack(cb, layer3_endpoint); + } + + Ok(()) + } + + // This function causes a EOF to be returned to the user. We also know that there will be no more incoming + // data after this sequence number. + fn check_and_process_fin( + cb: &mut ControlBlock, + header: &TcpHeader, + seg_end: SeqNumber, + layer3_endpoint: &mut SharedLayer3Endpoint, + now: Instant, + ) -> Result<(), Fail> { + // Is this a FIN packet? if header.fin { match cb.receiver.fin_seq_no.get() { - // We've already received this FIN, so ignore. - Some(seq_no) if seq_no != seg_end => warn!( - "Received a FIN with a different sequence number, ignoring. previous={:?} new={:?}", - seq_no, seg_end, - ), - Some(_) => trace!("Received duplicate FIN"), + // We've already received this FIN. + Some(seq_no) if seg_end != seq_no => { + warn!( + "Received a FIN with a different sequence number, ignoring. previous={:?} new={:?}", + seq_no, seg_end, + ) + }, + Some(_) => (), None => { trace!("Received FIN"); cb.receiver.fin_seq_no.set(seg_end.into()); @@ -297,37 +236,116 @@ impl Receiver { } }; - // Check whether we've received the last packet in this TCP stream. + // Have we received all data before the FIN? if cb .receiver .fin_seq_no .get() .is_some_and(|seq_no| seq_no == cb.receiver.receive_next_seq_no) { - // Once we know there is no more data coming, begin closing down the connection. - Self::process_fin(cb); + let state = match cb.state { + State::Established => State::CloseWait, + State::FinWait1 => State::Closing, + State::FinWait2 => State::TimeWait, + state => unreachable!("Cannot be in any other state at this point: {:?}", state), + }; + cb.state = state; + cb.receiver.pop_queue.push(DemiBuffer::new(0)); + debug_assert_eq!(cb.receiver.receive_next_seq_no, cb.receiver.fin_seq_no.get().unwrap()); + // Reset it to wake up any close coroutines waiting for FIN to arrive. + cb.receiver.fin_seq_no.set(Some(cb.receiver.receive_next_seq_no)); + // Move RECV_NXT over the FIN. + cb.receiver.receive_next_seq_no = cb.receiver.receive_next_seq_no + 1.into(); } - // Send an ack on every FIN. We do this separately here because if the FIN is in order, we ack it after the - // previous line, otherwise we do not ack the FIN. + // Have we processed all of the data and the FIN? if header.fin { - Sender::send_ack(cb, layer3_endpoint) + if cb.state == State::Closing { + warn!("Simultaneous close detected. Resending FIN."); + Sender::send_fin(cb, layer3_endpoint, now)?; + } else { + Sender::send_ack(cb, layer3_endpoint); + } } - // We should ACK this segment, preferably via piggybacking on a response. - if cb.receiver.ack_deadline_time_secs.get().is_none() { - // Start the delayed ACK timer to ensure an ACK gets sent soon even if no piggyback opportunity occurs. - let timeout: Duration = cb.receiver.ack_delay_timeout_secs; - // Getting the current time is extremely cheap as it is just a variable lookup. - cb.receiver.ack_deadline_time_secs.set(Some(now + timeout)); - } else if has_data { - // We already owe our peer an ACK (the timer was already running), so cancel the timer and ACK now. - cb.receiver.ack_deadline_time_secs.set(None); - trace!("process_packet(): sending ack before deadline because another packet arrived"); - Sender::send_ack(cb, layer3_endpoint); + Ok(()) + } + + pub fn get_receive_window_size(&self) -> u32 { + let bytes_unread: u32 = (self.receive_next_seq_no - self.reader_next_seq_no).into(); + // The window should be less than 1GB or 64KB without scaling. + debug_assert!( + (self.window_scale_shift_bits == 0 && bytes_unread <= MAX_WINDOW_SIZE_WITHOUT_SCALING) + || bytes_unread <= MAX_WINDOW_SIZE_WITH_SCALING + ); + debug!( + "Receive window size: bytes_unread={:?} buffer_size_bytes={:?} ", + bytes_unread, self.buffer_size_bytes + ); + self.buffer_size_bytes - bytes_unread + } + + pub fn hdr_window_size(&self) -> u16 { + let window_size: u32 = self.get_receive_window_size(); + let hdr_window_size: u16 = expect_ok!( + (window_size >> self.window_scale_shift_bits).try_into(), + "Window size overflow" + ); + debug!( + "Window size -> {} (hdr {}, scale {})", + (hdr_window_size as u32) << self.window_scale_shift_bits, + hdr_window_size, + self.window_scale_shift_bits, + ); + hdr_window_size + } + + // This routine takes an incoming in-order TCP segment and adds the data to the user's receive queue. If the new + // segment fills a "hole" in the receive sequence number space allowing previously stored out-of-order data to now + // be received, it receives that too. + // + // This routine also updates receive_next to reflect any data now considered "received". + fn receive_data(&mut self, seg_start: SeqNumber, buf: DemiBuffer) { + // This routine should only be called with in-order segment data. + debug_assert_eq!(seg_start, self.receive_next_seq_no); + + // Push the new segment data onto the end of the receive queue. + self.receive_next_seq_no = self.receive_next_seq_no + SeqNumber::from(buf.len() as u32); + // This inserts the segment and wakes a waiting pop coroutine. + self.pop_queue.push(buf); + + // Okay, we've successfully received some new data. Check if any of the formerly out-of-order data waiting in + // the out-of-order queue is now in-order. If so, we can move it to the receive queue. + while !self.out_of_order_frames.is_empty() { + if let Some(stored_entry) = self.out_of_order_frames.front() { + if stored_entry.0 == self.receive_next_seq_no { + // Move this entry's buffer from the out-of-order store to the receive queue. + // This data is now considered to be "received" by TCP, and included in our RCV.NXT calculation. + debug!("Recovering out-of-order packet at {}", self.receive_next_seq_no); + if let Some(temp) = self.out_of_order_frames.pop_front() { + self.receive_next_seq_no = self.receive_next_seq_no + SeqNumber::from(temp.1.len() as u32); + // This inserts the segment and wakes a waiting pop coroutine. + self.pop_queue.push(temp.1); + } + } else { + // Since our out-of-order list is sorted, we can stop when the next segment is not in sequence. + break; + } + } } + } - Ok(()) + // Block until the remote sends a FIN (plus all previous data has arrived). + pub async fn wait_for_fin(&mut self) -> Result<(), Fail> { + let mut fin_seq_no: Option = self.fin_seq_no.get(); + loop { + match fin_seq_no { + Some(fin_seq_no) if self.receive_next_seq_no >= fin_seq_no => return Ok(()), + _ => { + fin_seq_no = self.fin_seq_no.wait_for_change(None).await?; + }, + } + } } // Check to see if the segment is acceptable sequence-wise (i.e. contains some data that fits within the receive @@ -460,8 +478,19 @@ impl Receiver { return Ok(()); } info!("Received RST: remote reset connection"); - if cb.receiver.fin_seq_no.get().is_none() { - cb.receiver.push_fin(); + match cb.receiver.fin_seq_no.get() { + // We've already received a FIN. + Some(seq_no) if seq_no > header.seq_num => { + warn!( + "Received a RST with a lower sequence number, updating. previous={:?} new={:?}", + seq_no, header.seq_num, + ) + }, + Some(_) => (), + None => { + trace!("Received FIN"); + cb.receiver.fin_seq_no.set(Some(header.seq_num)); + }, } cb.state = State::Closed; return Err(Fail::new(libc::ECONNRESET, "remote reset connection")); @@ -505,7 +534,7 @@ impl Receiver { Ok(()) } - fn process_data( + fn process_data_if_any( cb: &mut ControlBlock, layer3_endpoint: &mut SharedLayer3Endpoint, data: DemiBuffer, @@ -513,32 +542,34 @@ impl Receiver { seg_end: SeqNumber, seg_len: u32, ) -> Result<(), Fail> { - // We can only process in-order data. Check for out-of-order segment. - if seg_start != cb.receiver.receive_next_seq_no { - debug!( - "Received out-of-order segment; out_of_order_frames.len() = {:?}", - cb.receiver.out_of_order_frames.len() - ); - debug_assert_ne!(seg_len, 0); - // This segment is out-of-order. If it carries data, we should store it for later processing - // after the "hole" in the sequence number space has been filled. - match cb.state { - State::Established | State::FinWait1 | State::FinWait2 => { - debug_assert_eq!(seg_len, data.len() as u32); - cb.receiver.store_out_of_order_segment(seg_start, seg_end, data); - // Sending an ACK here is only a "MAY" according to the RFCs, but helpful for fast retransmit. - trace!("process_data(): send ack on out-of-order segment"); - Sender::send_ack(cb, layer3_endpoint); - }, - state => warn!("Ignoring data received after FIN (in state {:?}).", state), + if data.len() > 0 { + // We can only process in-order data. Check for out-of-order segment. + if seg_start != cb.receiver.receive_next_seq_no { + debug!( + "Received out-of-order segment; out_of_order_frames.len() = {:?}", + cb.receiver.out_of_order_frames.len() + ); + debug_assert_ne!(seg_len, 0); + // This segment is out-of-order. If it carries data, we should store it for later processing + // after the "hole" in the sequence number space has been filled. + match cb.state { + State::Established | State::FinWait1 | State::FinWait2 => { + debug_assert_eq!(seg_len, data.len() as u32); + cb.receiver.store_out_of_order_segment(seg_start, seg_end, data); + // Sending an ACK here is only a "MAY" according to the RFCs, but helpful for fast retransmit. + trace!("process_data(): send ack on out-of-order segment"); + Sender::send_ack(cb, layer3_endpoint); + }, + state => warn!("Ignoring data received after FIN (in state {:?}).", state), + } + + // We're done with this out-of-order segment. + return Ok(()); } - // We're done with this out-of-order segment. - return Ok(()); + // We can only legitimately receive data in ESTABLISHED, FIN-WAIT-1, and FIN-WAIT-2. + cb.receiver.receive_data(seg_start, data); } - - // We can only legitimately receive data in ESTABLISHED, FIN-WAIT-1, and FIN-WAIT-2. - cb.receiver.receive_data(seg_start, data); Ok(()) } @@ -645,23 +676,13 @@ impl Receiver { } } - fn process_fin(cb: &mut ControlBlock) { - let state = match cb.state { - State::Established => State::CloseWait, - State::FinWait1 => State::Closing, - State::FinWait2 => State::TimeWait, - state => unreachable!("Cannot be in any other state at this point: {:?}", state), - }; - cb.state = state; - cb.receiver.push_fin(); - } - pub async fn acknowledger( cb: &mut ControlBlock, layer3_endpoint: &mut SharedLayer3Endpoint, ) -> Result { let mut ack_deadline: SharedAsyncValue> = cb.receiver.ack_deadline_time_secs.clone(); let mut deadline: Option = ack_deadline.get(); + loop { // TODO: Implement TCP delayed ACKs, subject to restrictions from RFC 1122 // - TCP should implement a delayed ACK diff --git a/src/rust/inetstack/protocols/layer4/tcp/established/sender.rs b/src/rust/inetstack/protocols/layer4/tcp/established/sender.rs index a1e74398c..5a6d20bb3 100644 --- a/src/rust/inetstack/protocols/layer4/tcp/established/sender.rs +++ b/src/rust/inetstack/protocols/layer4/tcp/established/sender.rs @@ -12,7 +12,7 @@ use crate::{ protocols::{ layer3::SharedLayer3Endpoint, layer4::tcp::{ - established::{rto::RtoCalculator, ControlBlock}, + established::{ctrlblk::State, rto::RtoCalculator, ControlBlock}, header::TcpHeader, SeqNumber, }, @@ -138,7 +138,7 @@ impl Sender { } } - fn process_acked_fin(&mut self, bytes_remaining: usize, ack_num: SeqNumber) -> usize { + fn process_acked_fin(cb: &mut ControlBlock, bytes_remaining: usize, ack_num: SeqNumber) -> usize { // This buffer is the end-of-send marker. So we should only have one byte of acknowledged // sequence space remaining (corresponding to our FIN). debug_assert_eq!(bytes_remaining, 1); @@ -146,10 +146,22 @@ impl Sender { // Double check that the ack is for the FIN sequence number. debug_assert_eq!( ack_num, - self.fin_seq_no + cb.sender + .fin_seq_no .map(|s| { s + 1.into() }) .expect("should have a FIN set") ); + + cb.state = match cb.state { + State::FinWait1 => State::FinWait2, + State::Closing => State::TimeWait, + State::LastAck => State::Closed, + state => unreachable!( + "cannot receive a response to a FIN if one was not sent in state {:?}", + state + ), + }; + 0 } @@ -257,16 +269,21 @@ impl Sender { } // Places a FIN marker in the outgoing data stream. No data can be pushed after this. - pub async fn push_fin_and_wait_for_ack(cb: &mut ControlBlock) -> Result<(), Fail> { + pub fn push_fin(cb: &mut ControlBlock) -> Result<(), Fail> { debug_assert!(cb.sender.fin_seq_no.is_none()); // TODO: We need to fix this the correct way: limit our send buffer size to the amount we're willing to buffer. - if cb.sender.unsent_queue.len() > UNSENT_QUEUE_CUTOFF { + if cb.sender.unsent_queue.len() >= UNSENT_QUEUE_CUTOFF { return Err(Fail::new(libc::EBUSY, "too many packets to send")); } cb.sender.fin_seq_no = Some(cb.sender.unsent_next_seq_no); cb.sender.unsent_next_seq_no = cb.sender.unsent_next_seq_no + 1.into(); cb.sender.unsent_queue.push(None); + Ok(()) + } + + pub async fn wait_for_fin_ack(cb: &mut ControlBlock) -> Result<(), Fail> { + debug_assert_eq!(cb.sender.unsent_next_seq_no, cb.sender.fin_seq_no.unwrap() + 1.into()); let mut send_unacked_watched: SharedAsyncValue = cb.sender.send_unacked.clone(); let fin_ack_num: SeqNumber = cb.sender.unsent_next_seq_no; @@ -293,24 +310,40 @@ impl Sender { } } - fn send_fin(cb: &mut ControlBlock, layer3_endpoint: &mut SharedLayer3Endpoint, now: Instant) -> Result<(), Fail> { - let mut header: TcpHeader = Self::tcp_header(cb, None); - debug_assert!(cb.sender.fin_seq_no.is_some_and(|s| { s == header.seq_num })); - header.fin = true; - Self::emit(cb, layer3_endpoint, header, None); - // Update SND.NXT. - cb.sender.send_next_seq_no.modify(|s| s + 1.into()); + // Send (or resend) FIN. + pub fn send_fin( + cb: &mut ControlBlock, + layer3_endpoint: &mut SharedLayer3Endpoint, + now: Instant, + ) -> Result<(), Fail> { + // First time we are sending a FIN. + if let Some(fin_seq_no) = cb.sender.fin_seq_no { + if fin_seq_no > cb.sender.send_next_seq_no.get() { + return Err(Fail::new(libc::EAGAIN, "FIN still in the unsent queue.")); + } else if fin_seq_no == cb.sender.send_next_seq_no.get() { + // Previously had not sent a FIN yet. + // Update SND.NXT. + cb.sender.send_next_seq_no.modify(|s| s + 1.into()); + + // Add the FIN to our unacknowledged queue. + let unacked_segment = UnackedSegment { + bytes: None, + initial_tx: Some(now), + }; + cb.sender.unacked_queue.push(unacked_segment); + } + debug_assert_eq!(cb.sender.send_next_seq_no.get(), fin_seq_no + 1.into()); - // Add the FIN to our unacknowledged queue. - let unacked_segment = UnackedSegment { - bytes: None, - initial_tx: Some(now), - }; - cb.sender.unacked_queue.push(unacked_segment); - // Set the retransmit timer. - if cb.sender.retransmit_deadline_time_secs.get().is_none() { - let rto: Duration = cb.sender.rto_calculator.rto(); - cb.sender.retransmit_deadline_time_secs.set(Some(now + rto)); + // Send the FIN. + let mut header: TcpHeader = Self::tcp_header(cb, Some(fin_seq_no)); + header.fin = true; + Self::emit(cb, layer3_endpoint, header, None); + + // Set the retransmit timer. + if cb.sender.retransmit_deadline_time_secs.get().is_none() { + let rto: Duration = cb.sender.rto_calculator.rto(); + cb.sender.retransmit_deadline_time_secs.set(Some(now + rto)); + } } Ok(()) } @@ -561,9 +594,7 @@ impl Sender { pin_mut!(something_changed); match conditional_yield_until(something_changed, rtx_deadline).await { Ok(()) => match cb.sender.fin_seq_no { - Some(fin_seq_no) if cb.sender.send_unacked.get() > fin_seq_no => { - return Err(Fail::new(libc::ECONNRESET, "connection closed")); - }, + Some(fin_seq_no) if cb.sender.send_unacked.get() > fin_seq_no => {}, _ => continue, }, Err(Fail { errno, cause: _ }) if errno == libc::ETIMEDOUT => { @@ -637,7 +668,7 @@ impl Sender { while bytes_remaining != 0 { bytes_remaining = match cb.sender.unacked_queue.try_pop() { Some(segment) if segment.bytes.is_none() => { - cb.sender.process_acked_fin(bytes_remaining, header.ack_num) + Self::process_acked_fin(cb, bytes_remaining, header.ack_num) }, Some(segment) => cb.sender.process_acked_segment(bytes_remaining, segment, now), None => { @@ -686,11 +717,16 @@ impl Sender { // Only perform this debug print in debug builds. debug_assertions is compiler set in non-optimized builds. let mut pkt = match body { Some(body) => { - debug!("L4 OUTGOING {} bytes + {:?}", body.len(), header); + debug!( + "L4 OUTGOING {:?} Connection sending {} bytes + {:?}", + cb.state, + body.len(), + header + ); body }, _ => { - debug!("L4 OUTGOING 0 bytes + {:?}", header); + debug!("L4 OUTGOING {:?} Connection sending 0 bytes + {:?}", cb.state, header); DemiBuffer::new_with_headroom(0, MAX_HEADER_SIZE as u16) }, };