Improve NodeSession TCP performance
Each NodeSession message is encoded as an 8-byte (u64) length prefix
followed by the actual object data. Originally the prefix and the data
were written to the TcpStream separately. Because the TcpStream is not
buffered, the 8-byte prefix may be pushed to the underlying network
stack as a tiny packet, followed by the data as one or more packets.

The first tiny packet can trigger the pathological interaction
between Nagle's algorithm and delayed acknowledgement in TCP, delaying
delivery of the actual payload by 40ms to 200ms. This can be observed
as an average 40ms latency between two nodes on the same host
connecting via the loopback interface.

Combine the length prefix and payload data into one write_all call
to avoid this latency anomaly. Since the message is already
length-prefixed, also skip the length delimiter encoding in proto.
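
The single-write idea can be sketched with the standard library alone. This is a minimal illustration, not the code in session.rs: `encode_frame` and `send_frame` are hypothetical names, the payload is an arbitrary byte slice standing in for the proto-encoded message, and a synchronous `std::io::Write` stands in for the async stream.

```rust
use std::io::Write;

/// Build one contiguous frame: an 8-byte big-endian length prefix
/// followed by the payload bytes. (Hypothetical helper for illustration.)
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    // Cast to u64 so the prefix is 8 bytes regardless of pointer width.
    let len = payload.len() as u64;
    let mut buf = Vec::with_capacity(std::mem::size_of::<u64>() + payload.len());
    buf.extend_from_slice(&len.to_be_bytes());
    buf.extend_from_slice(payload);
    buf
}

/// Hand the whole frame to the writer in a single write_all call, so the
/// prefix is never submitted to the socket as a separate tiny write.
fn send_frame<W: Write>(writer: &mut W, payload: &[u8]) -> std::io::Result<()> {
    let frame = encode_frame(payload);
    writer.write_all(&frame)?;
    writer.flush()
}
```

Calling `send_frame(&mut stream, b"hello")` would submit 13 bytes in one buffer: the 8-byte prefix encoding 5, then the payload. Disabling Nagle's algorithm on the socket is a different way to address the same symptom; the sketch above only covers the approach this commit takes.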

Signed-off-by: Kan-Ru Chen <[email protected]>
kanru committed Jan 13, 2025
1 parent 9f8a239 commit 5c9e9d3
Showing 1 changed file with 16 additions and 25 deletions.
41 changes: 16 additions & 25 deletions ractor_cluster/src/net/session.rs
@@ -7,6 +7,7 @@
// TODO: RUSTLS + Tokio : https://github.com/tokio-rs/tls/blob/master/tokio-rustls/examples/server/src/main.rs

+use std::io::Write;
use std::net::SocketAddr;

use bytes::Bytes;
@@ -240,15 +241,6 @@ enum ActorWriteHalf {
}

impl ActorWriteHalf {
-async fn write_u64(&mut self, n: u64) -> tokio::io::Result<()> {
-use tokio::io::AsyncWriteExt;
-match self {
-Self::ServerTls(t) => t.write_u64(n).await,
-Self::ClientTls(t) => t.write_u64(n).await,
-Self::Regular(t) => t.write_u64(n).await,
-}
-}
-
async fn write_all(&mut self, data: &[u8]) -> tokio::io::Result<()> {
use tokio::io::AsyncWriteExt;
match self {
@@ -337,23 +329,22 @@ impl Actor for SessionWriter {
w.writable().await?;
}

-let encoded_data = msg.encode_length_delimited_to_vec();
-if let Err(write_err) = stream.write_u64(encoded_data.len() as u64).await {
+// encode payload with length prefixed of proto encoded binary data
+let len = msg.encoded_len();
+let mut buf: Vec<u8> = Vec::with_capacity(len + size_of::<u64>());
+buf.write_all(&len.to_be_bytes())
+.expect("buffer should have enough capacity");
+msg.encode(&mut buf)
+.expect("buffer should have enough capacity");
+tracing::trace!("Writing payload (len={len})",);
+// now send the object
+if let Err(write_err) = stream.write_all(&buf).await {
tracing::warn!("Error writing to the stream '{write_err}'");
-} else {
-tracing::trace!(
-"Wrote length, writing payload (len={})",
-encoded_data.len()
-);
-// now send the object
-if let Err(write_err) = stream.write_all(&encoded_data).await {
-tracing::warn!("Error writing to the stream '{write_err}'");
-myself.stop(Some("channel_closed".to_string()));
-return Ok(());
-}
-// flush the stream
-stream.flush().await.unwrap();
+myself.stop(Some("channel_closed".to_string()));
+return Ok(());
}
+// flush the stream
+stream.flush().await.unwrap();
}
}
_ => {
@@ -454,7 +445,7 @@ impl Actor for SessionReader {

// [buf] here should contain the exact amount of data to decode an object properly.
let bytes = Bytes::from(buf);
-match crate::protocol::NetworkMessage::decode_length_delimited(bytes) {
+match crate::protocol::NetworkMessage::decode(bytes) {
Ok(msg) => {
// we decoded a message, pass it up the chain
let _ = self.session.cast(SessionMessage::ObjectAvailable(msg));
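
For reference, the receiving side of such a frame can be sketched as follows. This is an illustration under assumptions, not the SessionReader code: `read_frame` is a hypothetical name, a synchronous `std::io::Read` stands in for the async read half, and the prefix is assumed to be the 8-byte big-endian length that the writer emits. Because the outer prefix already delimits the message, the returned bytes hold exactly one message body and need no second, inner length delimiter.

```rust
use std::io::{self, Read};

/// Read one frame: an 8-byte big-endian length prefix, then exactly that
/// many payload bytes. (Hypothetical helper for illustration.)
fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    // read_exact keeps reading until the prefix is complete, even if the
    // transport delivers it in pieces.
    let mut len_bytes = [0u8; 8];
    reader.read_exact(&mut len_bytes)?;
    let len = u64::from_be_bytes(len_bytes) as usize;

    // The payload is the raw message body with no inner delimiter.
    let mut payload = vec![0u8; len];
    reader.read_exact(&mut payload)?;
    Ok(payload)
}
```

A production reader would likely also cap `len` before allocating, since the value comes from the peer.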
