From 5e8f3a67a637be7fcd6dd358434649f51b02bb64 Mon Sep 17 00:00:00 2001 From: Yuri Edward Date: Wed, 27 Mar 2024 18:55:30 +0100 Subject: [PATCH] Fixed various bugs in the protocol implementation --- src/profiler/log_msg.rs | 3 ++- src/profiler/network_types/message.rs | 13 ++++++++++--- src/profiler/thread/core.rs | 5 ++--- src/profiler/thread/net.rs | 8 ++++---- src/profiler/thread/util.rs | 6 +++--- 5 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/profiler/log_msg.rs b/src/profiler/log_msg.rs index 56b5a35..747584a 100644 --- a/src/profiler/log_msg.rs +++ b/src/profiler/log_msg.rs @@ -141,7 +141,8 @@ impl SpanLog { } pub fn clear(&mut self) { - self.msg_len = 0; + self.msg_len = 1; + self.var_count = 0; } pub fn id(&self) -> NonZeroU32 { diff --git a/src/profiler/network_types/message.rs b/src/profiler/network_types/message.rs index b9b6d41..503d972 100644 --- a/src/profiler/network_types/message.rs +++ b/src/profiler/network_types/message.rs @@ -62,6 +62,10 @@ impl MsgSize for u16 { const SIZE: usize = 2; } +impl MsgSize for i64 { + const SIZE: usize = 8; +} + #[derive(Serialize, Clone, Debug)] pub struct Duration { pub seconds: u32, @@ -197,18 +201,21 @@ impl Msg for SpanFollows { } #[derive(Serialize)] -pub struct SpanEvent<'a> { +pub struct SpanEvent { pub id: u32, pub timestamp: i64, pub level: Level, - pub message: &'a [u8], } -impl<'a> Msg for SpanEvent<'a> { +impl Msg for SpanEvent { const TYPE: MsgType = MsgType::SpanEvent; const HAS_PAYLOAD: bool = true; } +impl MsgSize for SpanEvent { + const SIZE: usize = u32::SIZE + i64::SIZE + Level::SIZE; +} + #[derive(Serialize)] pub struct SpanUpdate { pub id: u32, diff --git a/src/profiler/thread/core.rs b/src/profiler/thread/core.rs index b43906a..8d50f0e 100644 --- a/src/profiler/thread/core.rs +++ b/src/profiler/thread/core.rs @@ -116,11 +116,10 @@ impl<'a> Thread<'a> { event.write_finish(); let msg = nt::message::SpanEvent { id: event.id().map(|v| v.get()).unwrap_or(0), - message: event.as_bytes(), level: event.level(), timestamp: event.timestamp(), }; - wrap_io_debug_error!(self.net.network_write_dyn(msg, &mut self.msg).await); + wrap_io_debug_error!(self.net.network_write_fixed_payload(msg, event.as_bytes()).await); } async fn handle_control(&mut self, command: command::Control) -> bool { @@ -130,7 +129,7 @@ impl<'a> Thread<'a> { name, version, } => { - let mut cmd_line: FixedBufStr<128> = FixedBufStr::new(); + let mut cmd_line: FixedBufStr<255> = FixedBufStr::new(); read_command_line(&mut cmd_line); let app_name = app_name.str(); let name = name.str(); diff --git a/src/profiler/thread/net.rs b/src/profiler/thread/net.rs index 8c3cd8c..2b409c8 100644 --- a/src/profiler/thread/net.rs +++ b/src/profiler/thread/net.rs @@ -74,8 +74,8 @@ impl<'a> Net<'a> { if let Err(e) = message.serialize(&mut serializer) { return Err(Error::new(ErrorKind::Other, e)); } - self.write.write_u32_le(M::SIZE as _).await?; - self.write.write_all(&self.fixed_buffer[..M::SIZE]).await?; + self.write.write_u32_le((M::SIZE + 1) as _).await?; + self.write.write_all(&self.fixed_buffer[..M::SIZE + 1]).await?; Ok(()) } @@ -111,8 +111,8 @@ impl<'a> Net<'a> { if let Err(e) = message.serialize(&mut serializer) { return Err(Error::new(ErrorKind::Other, e)); } - self.write.write_u32_le((M::SIZE + buffer.as_ref().len()) as _).await?; - self.write.write_all(&self.fixed_buffer[..M::SIZE]).await?; + self.write.write_u32_le((M::SIZE + 1 + buffer.as_ref().len()) as _).await?; + self.write.write_all(&self.fixed_buffer[..M::SIZE + 1]).await?; self.write.write_all(buffer.as_ref()).await?; Ok(()) } diff --git a/src/profiler/thread/util.rs b/src/profiler/thread/util.rs index b35df01..d9c989f 100644 --- a/src/profiler/thread/util.rs +++ b/src/profiler/thread/util.rs @@ -64,15 +64,15 @@ impl FixedBufStr { impl Write for FixedBufStr { fn write_str(&mut self, value: &str) -> std::fmt::Result { - let len = std::cmp::min(value.len(), N); + let len = std::cmp::min(value.len(), self.buffer.len() - self.len as usize); unsafe { std::ptr::copy_nonoverlapping( value.as_ptr(), - std::mem::transmute(self.buffer.as_mut_ptr()), + std::mem::transmute(self.buffer.as_mut_ptr().offset(self.len as _)), len, ); } - self.len = len as _; + self.len += len as u8; Ok(()) } }