From 5c9e9d37189facc298cf7b26a6eba5ed03f5d28c Mon Sep 17 00:00:00 2001 From: Kan-Ru Chen Date: Thu, 9 Jan 2025 20:35:30 +0900 Subject: [PATCH] Improve NodeSession TCP performance Each of NodeSession's message is encoded with a 2 bytes length prefix followed by the actual object data. Originally when writing the message to TcpStream we write them separately, however the TcpStream is not buffered, so the first 2 bytes write may be pushed to the underlying network stack as a tiny packet, then the data is pushed as one or more packets. The first tiny packet might trigger the pathological interaction between Nagle's Algorithm and Delayed Acknowledgement in TCP, causing from 40ms to 200ms delay of the actual payload delivery. This can be observed from the average 40ms latency between two nodes on the same host conneting via the loopback interface. Combine the length prefix and payload data to one write_all call to avoid this latency anomaly. Since the message is already length prefixed, also skip the length delimiter encoding in proto. Signed-off-by: Kan-Ru Chen --- ractor_cluster/src/net/session.rs | 41 ++++++++++++------------------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/ractor_cluster/src/net/session.rs b/ractor_cluster/src/net/session.rs index 5856163..c42d809 100644 --- a/ractor_cluster/src/net/session.rs +++ b/ractor_cluster/src/net/session.rs @@ -7,6 +7,7 @@ // TODO: RUSTLS + Tokio : https://github.com/tokio-rs/tls/blob/master/tokio-rustls/examples/server/src/main.rs +use std::io::Write; use std::net::SocketAddr; use bytes::Bytes; @@ -240,15 +241,6 @@ enum ActorWriteHalf { } impl ActorWriteHalf { - async fn write_u64(&mut self, n: u64) -> tokio::io::Result<()> { - use tokio::io::AsyncWriteExt; - match self { - Self::ServerTls(t) => t.write_u64(n).await, - Self::ClientTls(t) => t.write_u64(n).await, - Self::Regular(t) => t.write_u64(n).await, - } - } - async fn write_all(&mut self, data: &[u8]) -> tokio::io::Result<()> { use tokio::io::AsyncWriteExt; match self { @@ -337,23 +329,22 @@ impl Actor for SessionWriter { w.writable().await?; } - let encoded_data = msg.encode_length_delimited_to_vec(); - if let Err(write_err) = stream.write_u64(encoded_data.len() as u64).await { + // encode payload with length prefixed of proto encoded binary data + let len = msg.encoded_len(); + let mut buf: Vec = Vec::with_capacity(len + size_of::()); + buf.write_all(&len.to_be_bytes()) + .expect("buffer should have enough capacity"); + msg.encode(&mut buf) + .expect("buffer should have enough capacity"); + tracing::trace!("Writing payload (len={len})",); + // now send the object + if let Err(write_err) = stream.write_all(&buf).await { tracing::warn!("Error writing to the stream '{write_err}'"); - } else { - tracing::trace!( - "Wrote length, writing payload (len={})", - encoded_data.len() - ); - // now send the object - if let Err(write_err) = stream.write_all(&encoded_data).await { - tracing::warn!("Error writing to the stream '{write_err}'"); - myself.stop(Some("channel_closed".to_string())); - return Ok(()); - } - // flush the stream - stream.flush().await.unwrap(); + myself.stop(Some("channel_closed".to_string())); + return Ok(()); } + // flush the stream + stream.flush().await.unwrap(); } } _ => { @@ -454,7 +445,7 @@ impl Actor for SessionReader { // [buf] here should contain the exact amount of data to decode an object properly. let bytes = Bytes::from(buf); - match crate::protocol::NetworkMessage::decode_length_delimited(bytes) { + match crate::protocol::NetworkMessage::decode(bytes) { Ok(msg) => { // we decoded a message, pass it up the chain let _ = self.session.cast(SessionMessage::ObjectAvailable(msg));