Skip to content

Commit

Permalink
Fixed various bugs in the protocol implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuri6037 committed Mar 27, 2024
1 parent 682fcf7 commit 5e8f3a6
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 14 deletions.
3 changes: 2 additions & 1 deletion src/profiler/log_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ impl SpanLog {
}

pub fn clear(&mut self) {
self.msg_len = 0;
self.msg_len = 1;
self.var_count = 0;
}

pub fn id(&self) -> NonZeroU32 {
Expand Down
13 changes: 10 additions & 3 deletions src/profiler/network_types/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ impl MsgSize for u16 {
const SIZE: usize = 2;
}

impl MsgSize for i64 {
const SIZE: usize = 8;
}

#[derive(Serialize, Clone, Debug)]
pub struct Duration {
pub seconds: u32,
Expand Down Expand Up @@ -197,18 +201,21 @@ impl Msg for SpanFollows {
}

#[derive(Serialize)]
pub struct SpanEvent<'a> {
pub struct SpanEvent {
pub id: u32,
pub timestamp: i64,
pub level: Level,
pub message: &'a [u8],
}

impl<'a> Msg for SpanEvent<'a> {
impl Msg for SpanEvent {
const TYPE: MsgType = MsgType::SpanEvent;
const HAS_PAYLOAD: bool = true;
}

impl MsgSize for SpanEvent {
const SIZE: usize = u32::SIZE + i64::SIZE + Level::SIZE;
}

#[derive(Serialize)]
pub struct SpanUpdate {
pub id: u32,
Expand Down
5 changes: 2 additions & 3 deletions src/profiler/thread/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,10 @@ impl<'a> Thread<'a> {
event.write_finish();
let msg = nt::message::SpanEvent {
id: event.id().map(|v| v.get()).unwrap_or(0),
message: event.as_bytes(),
level: event.level(),
timestamp: event.timestamp(),
};
wrap_io_debug_error!(self.net.network_write_dyn(msg, &mut self.msg).await);
wrap_io_debug_error!(self.net.network_write_fixed_payload(msg, event.as_bytes()).await);
}

async fn handle_control(&mut self, command: command::Control) -> bool {
Expand All @@ -130,7 +129,7 @@ impl<'a> Thread<'a> {
name,
version,
} => {
let mut cmd_line: FixedBufStr<128> = FixedBufStr::new();
let mut cmd_line: FixedBufStr<255> = FixedBufStr::new();
read_command_line(&mut cmd_line);
let app_name = app_name.str();
let name = name.str();
Expand Down
8 changes: 4 additions & 4 deletions src/profiler/thread/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ impl<'a> Net<'a> {
if let Err(e) = message.serialize(&mut serializer) {
return Err(Error::new(ErrorKind::Other, e));
}
self.write.write_u32_le(M::SIZE as _).await?;
self.write.write_all(&self.fixed_buffer[..M::SIZE]).await?;
self.write.write_u32_le((M::SIZE + 1) as _).await?;
self.write.write_all(&self.fixed_buffer[..M::SIZE + 1]).await?;
Ok(())
}

Expand Down Expand Up @@ -111,8 +111,8 @@ impl<'a> Net<'a> {
if let Err(e) = message.serialize(&mut serializer) {
return Err(Error::new(ErrorKind::Other, e));
}
self.write.write_u32_le((M::SIZE + buffer.as_ref().len()) as _).await?;
self.write.write_all(&self.fixed_buffer[..M::SIZE]).await?;
self.write.write_u32_le((M::SIZE + 1 + buffer.as_ref().len()) as _).await?;
self.write.write_all(&self.fixed_buffer[..M::SIZE + 1]).await?;
self.write.write_all(buffer.as_ref()).await?;
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions src/profiler/thread/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ impl<const N: usize> FixedBufStr<N> {

impl<const N: usize> Write for FixedBufStr<N> {
fn write_str(&mut self, value: &str) -> std::fmt::Result {
let len = std::cmp::min(value.len(), N);
let len = std::cmp::min(value.len(), self.buffer.len() - self.len as usize);
unsafe {
std::ptr::copy_nonoverlapping(
value.as_ptr(),
std::mem::transmute(self.buffer.as_mut_ptr()),
std::mem::transmute(self.buffer.as_mut_ptr().offset(self.len as _)),
len,
);
}
self.len = len as _;
self.len += len as u8;
Ok(())
}
}
Expand Down

0 comments on commit 5e8f3a6

Please sign in to comment.