diff --git a/ractor_cluster/src/net/session.rs b/ractor_cluster/src/net/session.rs index 5856163..c42d809 100644 --- a/ractor_cluster/src/net/session.rs +++ b/ractor_cluster/src/net/session.rs @@ -7,6 +7,7 @@ // TODO: RUSTLS + Tokio : https://github.com/tokio-rs/tls/blob/master/tokio-rustls/examples/server/src/main.rs +use std::io::Write; use std::net::SocketAddr; use bytes::Bytes; @@ -240,15 +241,6 @@ enum ActorWriteHalf { } impl ActorWriteHalf { - async fn write_u64(&mut self, n: u64) -> tokio::io::Result<()> { - use tokio::io::AsyncWriteExt; - match self { - Self::ServerTls(t) => t.write_u64(n).await, - Self::ClientTls(t) => t.write_u64(n).await, - Self::Regular(t) => t.write_u64(n).await, - } - } - async fn write_all(&mut self, data: &[u8]) -> tokio::io::Result<()> { use tokio::io::AsyncWriteExt; match self { @@ -337,23 +329,22 @@ impl Actor for SessionWriter { w.writable().await?; } - let encoded_data = msg.encode_length_delimited_to_vec(); - if let Err(write_err) = stream.write_u64(encoded_data.len() as u64).await { + // encode payload with length prefixed of proto encoded binary data + let len = msg.encoded_len(); + let mut buf: Vec = Vec::with_capacity(len + size_of::()); + buf.write_all(&len.to_be_bytes()) + .expect("buffer should have enough capacity"); + msg.encode(&mut buf) + .expect("buffer should have enough capacity"); + tracing::trace!("Writing payload (len={len})",); + // now send the object + if let Err(write_err) = stream.write_all(&buf).await { tracing::warn!("Error writing to the stream '{write_err}'"); - } else { - tracing::trace!( - "Wrote length, writing payload (len={})", - encoded_data.len() - ); - // now send the object - if let Err(write_err) = stream.write_all(&encoded_data).await { - tracing::warn!("Error writing to the stream '{write_err}'"); - myself.stop(Some("channel_closed".to_string())); - return Ok(()); - } - // flush the stream - stream.flush().await.unwrap(); + myself.stop(Some("channel_closed".to_string())); + return Ok(()); } + // flush the stream + stream.flush().await.unwrap(); } } _ => { @@ -454,7 +445,7 @@ impl Actor for SessionReader { // [buf] here should contain the exact amount of data to decode an object properly. let bytes = Bytes::from(buf); - match crate::protocol::NetworkMessage::decode_length_delimited(bytes) { + match crate::protocol::NetworkMessage::decode(bytes) { Ok(msg) => { // we decoded a message, pass it up the chain let _ = self.session.cast(SessionMessage::ObjectAvailable(msg));