Improve NodeSession TCP performance (#319)
Each NodeSession message is encoded as an 8-byte length prefix
followed by the actual object data. Previously, when writing a
message to the TcpStream, the prefix and the payload were written
separately; the TcpStream is unbuffered, so the first 8-byte write
may be pushed to the underlying network stack as a tiny packet,
with the payload following as one or more separate packets.
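The problematic pattern can be sketched with a hypothetical helper (this is an illustration, not the ractor code): two separate write calls on an unbuffered stream let the 8-byte prefix leave as its own tiny segment.

```rust
use std::io::Write;

// Hypothetical sketch of the old pattern: two separate writes.
// On an unbuffered TcpStream, the first write can become a tiny packet.
fn write_framed_two_calls<W: Write>(w: &mut W, payload: &[u8]) -> std::io::Result<()> {
    w.write_all(&(payload.len() as u64).to_be_bytes())?; // tiny 8-byte write
    w.write_all(payload)?; // payload pushed separately afterwards
    Ok(())
}

fn main() -> std::io::Result<()> {
    // A Vec<u8> stands in for the stream so the sketch is self-contained.
    let mut sink = Vec::new();
    write_framed_two_calls(&mut sink, b"hello")?;
    assert_eq!(sink.len(), 8 + 5);
    assert_eq!(&sink[..8], &5u64.to_be_bytes());
    Ok(())
}
```

Against a `Vec` the two writes are harmless; against a real socket each `write_all` is a separate syscall and a separate chance for the kernel to emit a segment.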

That first tiny packet can trigger the pathological interaction
between Nagle's Algorithm and Delayed Acknowledgement in TCP,
delaying delivery of the actual payload by 40ms to 200ms. This can
be observed as an average 40ms latency between two nodes on the
same host connecting via the loopback interface.
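For reference, the classic per-socket mitigation for Nagle-induced latency is enabling TCP_NODELAY; this commit instead removes the tiny first write, but the standard library exposes the socket option as `set_nodelay`:

```rust
use std::net::{TcpListener, TcpStream};

fn main() -> std::io::Result<()> {
    // Loopback listener just so connect() succeeds in this sketch.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let stream = TcpStream::connect(listener.local_addr()?)?;

    // TCP_NODELAY disables Nagle's algorithm: small writes are sent
    // immediately instead of being held back waiting for an ACK.
    stream.set_nodelay(true)?;
    assert!(stream.nodelay()?);
    Ok(())
}
```

The trade-off is more small packets on the wire, which is why coalescing writes (as done here) is usually preferred when the application controls the framing.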

Combine the length prefix and the payload into a single write_all
call to avoid this latency anomaly. Since the message already
carries a length prefix, also skip the length-delimiter encoding
at the protobuf layer.
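The fix can be sketched standalone (without the prost types): prefix and payload share one contiguous buffer, so a single write_all hands everything to the kernel at once.

```rust
// Standalone sketch of the fixed pattern: build one contiguous buffer
// holding the 8-byte big-endian length prefix followed by the payload,
// so no tiny leading packet is ever emitted.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(8 + payload.len());
    buf.extend_from_slice(&(payload.len() as u64).to_be_bytes());
    buf.extend_from_slice(payload);
    buf
}

fn main() {
    let frame = encode_frame(b"hello");
    assert_eq!(frame.len(), 13); // 8-byte prefix + 5-byte payload
    assert_eq!(&frame[..8], &5u64.to_be_bytes());
    assert_eq!(&frame[8..], b"hello");
    // In the real code the buffer then goes out in one call:
    // stream.write_all(&frame).await, followed by a flush.
}
```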

Signed-off-by: Kan-Ru Chen <[email protected]>
kanru authored Jan 13, 2025
1 parent 9f8a239 commit 081c6b7
Showing 1 changed file with 16 additions and 25 deletions.
41 changes: 16 additions & 25 deletions ractor_cluster/src/net/session.rs
@@ -7,6 +7,7 @@
// TODO: RUSTLS + Tokio : https://github.com/tokio-rs/tls/blob/master/tokio-rustls/examples/server/src/main.rs

use std::io::Write;
use std::net::SocketAddr;

use bytes::Bytes;
@@ -240,15 +241,6 @@ enum ActorWriteHalf {
}

impl ActorWriteHalf {
async fn write_u64(&mut self, n: u64) -> tokio::io::Result<()> {
use tokio::io::AsyncWriteExt;
match self {
Self::ServerTls(t) => t.write_u64(n).await,
Self::ClientTls(t) => t.write_u64(n).await,
Self::Regular(t) => t.write_u64(n).await,
}
}

async fn write_all(&mut self, data: &[u8]) -> tokio::io::Result<()> {
use tokio::io::AsyncWriteExt;
match self {
@@ -337,23 +329,22 @@ impl Actor for SessionWriter {
w.writable().await?;
}

let encoded_data = msg.encode_length_delimited_to_vec();
if let Err(write_err) = stream.write_u64(encoded_data.len() as u64).await {
// encode payload with length prefixed of proto encoded binary data
let len = msg.encoded_len();
let mut buf: Vec<u8> = Vec::with_capacity(len + size_of::<u64>());
buf.write_all(&len.to_be_bytes())
.expect("buffer should have enough capacity");
msg.encode(&mut buf)
.expect("buffer should have enough capacity");
tracing::trace!("Writing payload (len={len})",);
// now send the object
if let Err(write_err) = stream.write_all(&buf).await {
tracing::warn!("Error writing to the stream '{write_err}'");
} else {
tracing::trace!(
"Wrote length, writing payload (len={})",
encoded_data.len()
);
// now send the object
if let Err(write_err) = stream.write_all(&encoded_data).await {
tracing::warn!("Error writing to the stream '{write_err}'");
myself.stop(Some("channel_closed".to_string()));
return Ok(());
}
// flush the stream
stream.flush().await.unwrap();
myself.stop(Some("channel_closed".to_string()));
return Ok(());
}
// flush the stream
stream.flush().await.unwrap();
}
}
_ => {
@@ -454,7 +445,7 @@ impl Actor for SessionReader {

// [buf] here should contain the exact amount of data to decode an object properly.
let bytes = Bytes::from(buf);
match crate::protocol::NetworkMessage::decode_length_delimited(bytes) {
match crate::protocol::NetworkMessage::decode(bytes) {
Ok(msg) => {
// we decoded a message, pass it up the chain
let _ = self.session.cast(SessionMessage::ObjectAvailable(msg));
