From 90c09920ac8c520aeed4bc1f57be46c363184f24 Mon Sep 17 00:00:00 2001 From: vianney Date: Fri, 14 Feb 2025 11:49:16 +0100 Subject: [PATCH 1/3] Add span decoder for bytes slices --- tinybytes/src/bytes_string.rs | 15 ++ .../src/msgpack_decoder/v04/decoder/mod.rs | 145 +++++++++++++++--- .../src/msgpack_decoder/v04/decoder/span.rs | 51 +++++- .../msgpack_decoder/v04/decoder/span_link.rs | 49 +++++- trace-utils/src/msgpack_decoder/v04/number.rs | 12 ++ trace-utils/src/span_v04/mod.rs | 2 +- trace-utils/src/span_v04/span.rs | 99 +++++++++++- 7 files changed, 343 insertions(+), 30 deletions(-) diff --git a/tinybytes/src/bytes_string.rs b/tinybytes/src/bytes_string.rs index 67b9ea9c3..629eedeed 100644 --- a/tinybytes/src/bytes_string.rs +++ b/tinybytes/src/bytes_string.rs @@ -82,6 +82,21 @@ impl BytesString { } } + /// Creates a `BytesString` from a string slice within the given buffer. + /// + /// # Arguments + /// + /// * `bytes` - A `tinybytes::Bytes` instance that will be converted into a `BytesString`. + /// * `slice` - The string slice pointing into the given bytes that will form the `BytesString`. + pub fn try_from_bytes_slice(bytes: &Bytes, slice: &str) -> Option { + // SAFETY: This is safe as a str slice is definitely a valid UTF-8 slice. + unsafe { + Some(Self::from_bytes_unchecked( + bytes.slice_ref(slice.as_bytes())?, + )) + } + } + /// Creates a `BytesString` from a `tinybytes::Bytes` instance without validating the bytes. /// /// This function does not perform any validation on the provided bytes, and assumes that the diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs index d5e01f6c9..188ce588b 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs @@ -6,10 +6,11 @@ mod span_link; use self::span::decode_span; use super::error::DecodeError; -use super::number::read_number_bytes; -use crate::span_v04::Span; +use super::number::{read_nullable_number_ref, read_number_bytes, read_number_ref}; +use crate::span_v04::{Span, SpanSlice}; use rmp::decode::DecodeStringError; use rmp::{decode, decode::RmpRead, Marker}; +use span::decode_span_ref; use std::{collections::HashMap, f64}; use tinybytes::{Bytes, BytesString}; @@ -58,11 +59,26 @@ const NULL_MARKER: &u8 = &0xc0; /// let decoded_span = &decoded_traces[0][0]; /// assert_eq!("test-span", decoded_span.name.as_str()); /// ``` -pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, usize), DecodeError> { - let trace_count = - rmp::decode::read_array_len(unsafe { data.as_mut_slice() }).map_err(|_| { - DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) - })?; +pub fn from_slice(data: tinybytes::Bytes) -> Result<(Vec>, usize), DecodeError> { + let mut parsed_data = data.clone(); + let (traces_ref, size) = from_slice_ref(unsafe { parsed_data.as_mut_slice() })?; + let traces_owned = traces_ref + .iter() + .map(|trace| { + trace + .iter() + // Safe to unwrap since the spans uses subslices of the `data` slice + .map(|span| span.try_to_bytes(&data).unwrap()) + .collect() + }) + .collect(); + Ok((traces_owned, size)) +} + +pub fn from_slice_ref(mut data: &[u8]) -> Result<(Vec>, usize), DecodeError> { + let trace_count = rmp::decode::read_array_len(&mut data).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) + })?; let start_len = data.len(); @@ -74,12 +90,9 @@ pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, usize), .expect("Unable to cast trace_count to usize"), ), |mut traces, _| { - let span_count = rmp::decode::read_array_len(unsafe { data.as_mut_slice() }) - .map_err(|_| { - DecodeError::InvalidFormat( - "Unable to read array len for span count".to_owned(), - ) - })?; + let span_count = rmp::decode::read_array_len(&mut data).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for span count".to_owned()) + })?; let trace = (0..span_count).try_fold( Vec::with_capacity( @@ -88,7 +101,7 @@ pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, usize), .expect("Unable to cast span_count to usize"), ), |mut trace, _| { - let span = decode_span(&mut data)?; + let span = decode_span_ref(&mut data)?; trace.push(span); Ok(trace) }, @@ -143,6 +156,15 @@ fn read_nullable_string_bytes(buf: &mut Bytes) -> Result(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { + if is_null_marker(buf) { + Ok("") + } else { + read_string_ref(buf) + } +} + #[inline] // Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the // BytesStrings. @@ -172,6 +194,35 @@ fn read_nullable_str_map_to_bytes_strings( read_str_map_to_bytes_strings(buf) } +#[inline] +// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the +// BytesStrings. +fn read_str_map_to_str_ref<'a>( + buf: &mut &'a [u8], +) -> Result, DecodeError> { + let len = decode::read_map_len(buf) + .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; + + let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); + for _ in 0..len { + let key = read_string_ref(buf)?; + let value = read_string_ref(buf)?; + map.insert(key, value); + } + Ok(map) +} + +#[inline] +fn read_nullable_str_map_to_str_ref<'a>( + buf: &mut &'a [u8], +) -> Result, DecodeError> { + if is_null_marker(buf) { + return Ok(HashMap::default()); + } + + read_str_map_to_str_ref(buf) +} + #[inline] fn read_metric_pair(buf: &mut Bytes) -> Result<(BytesString, f64), DecodeError> { let key = read_string_bytes(buf)?; @@ -215,6 +266,52 @@ fn read_meta_struct(buf: &mut Bytes) -> Result>, De read_map(len, buf, read_meta_struct_pair) } +#[inline] +fn read_metric_pair_ref<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, f64), DecodeError> { + let key = read_string_ref(buf)?; + let v = read_number_ref(buf)?; + + Ok((key, v)) +} + +#[inline] +fn read_metrics_ref<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { + if is_null_marker(buf) { + return Ok(HashMap::default()); + } + + let len = read_map_len(buf)?; + + read_map(len, buf, read_metric_pair_ref) +} + +#[inline] +fn read_meta_struct_ref<'a>(buf: &mut &'a [u8]) -> Result>, DecodeError> { + if is_null_marker(buf) { + return Ok(HashMap::default()); + } + + fn read_meta_struct_pair_ref<'a>( + buf: &mut &'a [u8], + ) -> Result<(&'a str, Vec), DecodeError> { + let key = read_string_ref(buf)?; + let array_len = decode::read_array_len(buf).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for meta_struct".to_owned()) + })?; + + let mut v = Vec::with_capacity(array_len as usize); + + for _ in 0..array_len { + let value = read_number_ref(buf)?; + v.push(value); + } + Ok((key, v)) + } + + let len = read_map_len(buf)?; + read_map(len, buf, read_meta_struct_pair_ref) +} + /// Reads a map from the buffer and returns it as a `HashMap`. /// /// This function is generic over the key and value types of the map, and it uses a provided @@ -243,14 +340,10 @@ fn read_meta_struct(buf: &mut Bytes) -> Result>, De /// * `V` - The type of the values in the map. /// * `F` - The type of the function used to read key-value pairs from the buffer. #[inline] -fn read_map( - len: usize, - buf: &mut Bytes, - read_pair: F, -) -> Result, DecodeError> +fn read_map(len: usize, buf: &mut B, read_pair: F) -> Result, DecodeError> where K: std::hash::Hash + Eq, - F: Fn(&mut Bytes) -> Result<(K, V), DecodeError>, + F: Fn(&mut B) -> Result<(K, V), DecodeError>, { let mut map = HashMap::with_capacity(len); for _ in 0..len { @@ -297,6 +390,18 @@ where } } +/// When you want to "peek" if the next value is a null marker, and only advance the buffer if it is +/// null and return the default value. If it is not null, you can continue to decode as expected. +#[inline] +fn is_null_marker(buf: &mut &[u8]) -> bool { + if buf.first() == Some(NULL_MARKER) { + *buf = &buf[1..]; + true + } else { + false + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span.rs index ccbb7d1e9..45b2d5819 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span.rs @@ -2,12 +2,15 @@ // SPDX-License-Identifier: Apache-2.0 use super::{ - read_meta_struct, read_metrics, read_nullable_str_map_to_bytes_strings, - read_nullable_string_bytes, read_string_ref, span_link::read_span_links, + read_meta_struct, read_meta_struct_ref, read_metric_pair_ref, read_metrics, read_metrics_ref, + read_nullable_number_ref, read_nullable_str_map_to_bytes_strings, + read_nullable_str_map_to_str_ref, read_nullable_string_bytes, read_nullable_string_ref, + read_number_ref, read_str_map_to_str_ref, read_string_ref, + span_link::{read_span_links, read_span_links_ref}, }; use crate::msgpack_decoder::v04::error::DecodeError; use crate::msgpack_decoder::v04::number::read_nullable_number_bytes; -use crate::span_v04::{Span, SpanKey}; +use crate::span_v04::{Span, SpanKey, SpanSlice}; use tinybytes::Bytes; /// Decodes a slice of bytes into a `Span` object. @@ -27,14 +30,24 @@ use tinybytes::Bytes; /// - The map length cannot be read. /// - Any key or value cannot be decoded. pub fn decode_span(buffer: &mut Bytes) -> Result { - let mut span = Span::default(); + let span_ref = decode_span_ref(unsafe { buffer.as_mut_slice() }); + println!("Span ref {:?}", span_ref); - let span_size = rmp::decode::read_map_len(unsafe { buffer.as_mut_slice() }).map_err(|_| { + span_ref + .unwrap() + .try_to_bytes(buffer) + .ok_or(DecodeError::IOError) +} + +pub fn decode_span_ref<'a>(buffer: &mut &'a [u8]) -> Result, DecodeError> { + let mut span = SpanSlice::default(); + + let span_size = rmp::decode::read_map_len(buffer).map_err(|_| { DecodeError::InvalidFormat("Unable to get map len for span size".to_owned()) })?; for _ in 0..span_size { - fill_span(&mut span, buffer)?; + fill_span_ref(&mut span, buffer)?; } Ok(span) @@ -66,6 +79,32 @@ fn fill_span(span: &mut Span, buf: &mut Bytes) -> Result<(), DecodeError> { Ok(()) } +// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the +// BytesStrings +fn fill_span_ref<'a>(span: &mut SpanSlice<'a>, buf: &mut &'a [u8]) -> Result<(), DecodeError> { + let key = read_string_ref(buf)? + .parse::() + .map_err(|_| DecodeError::InvalidFormat("Invalid span key".to_owned()))?; + + match key { + SpanKey::Service => span.service = read_nullable_string_ref(buf)?, + SpanKey::Name => span.name = read_nullable_string_ref(buf)?, + SpanKey::Resource => span.resource = read_nullable_string_ref(buf)?, + SpanKey::TraceId => span.trace_id = read_nullable_number_ref(buf)?, + SpanKey::SpanId => span.span_id = read_nullable_number_ref(buf)?, + SpanKey::ParentId => span.parent_id = read_nullable_number_ref(buf)?, + SpanKey::Start => span.start = read_nullable_number_ref(buf)?, + SpanKey::Duration => span.duration = read_nullable_number_ref(buf)?, + SpanKey::Error => span.error = read_nullable_number_ref(buf)?, + SpanKey::Type => span.r#type = read_nullable_string_ref(buf)?, + SpanKey::Meta => span.meta = read_nullable_str_map_to_str_ref(buf)?, + SpanKey::Metrics => span.metrics = read_metrics_ref(buf)?, + SpanKey::MetaStruct => span.meta_struct = read_meta_struct_ref(buf)?, + SpanKey::SpanLinks => span.span_links = read_span_links_ref(buf)?, + } + Ok(()) +} + #[cfg(test)] mod tests { use super::SpanKey; diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs index 2e9c4ec5f..5bb5c2716 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs @@ -5,12 +5,14 @@ use crate::msgpack_decoder::v04::decoder::{ handle_null_marker, read_str_map_to_bytes_strings, read_string_bytes, read_string_ref, }; use crate::msgpack_decoder::v04::error::DecodeError; -use crate::msgpack_decoder::v04::number::read_number_bytes; -use crate::span_v04::SpanLink; +use crate::msgpack_decoder::v04::number::{read_number_bytes, read_number_ref}; +use crate::span_v04::{SpanLink, SpanLinkSlice}; use rmp::Marker; use std::str::FromStr; use tinybytes::Bytes; +use super::{is_null_marker, read_str_map_to_str_ref}; + /// Reads a slice of bytes and decodes it into a vector of `SpanLink` objects. /// /// # Arguments @@ -48,6 +50,30 @@ pub(crate) fn read_span_links(buf: &mut Bytes) -> Result, DecodeEr )), } } + +pub(crate) fn read_span_links_ref<'a>( + buf: &mut &'a [u8], +) -> Result>, DecodeError> { + if is_null_marker(buf) { + return Ok(Vec::default()); + } + + match rmp::decode::read_marker(buf).map_err(|_| { + DecodeError::InvalidFormat("Unable to read marker for span links".to_owned()) + })? { + Marker::FixArray(len) => { + let mut vec: Vec> = Vec::with_capacity(len.into()); + for _ in 0..len { + vec.push(decode_span_link_ref(buf)?); + } + Ok(vec) + } + _ => Err(DecodeError::InvalidType( + "Unable to read span link from buffer".to_owned(), + )), + } +} + #[derive(Debug, PartialEq)] enum SpanLinkKey { TraceId, @@ -95,6 +121,25 @@ fn decode_span_link(buf: &mut Bytes) -> Result { Ok(span) } +fn decode_span_link_ref<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { + let mut span = SpanLinkSlice::default(); + let span_size = rmp::decode::read_map_len(buf) + .map_err(|_| DecodeError::InvalidType("Unable to get map len for span size".to_owned()))?; + + for _ in 0..span_size { + match read_string_ref(buf)?.parse::()? { + SpanLinkKey::TraceId => span.trace_id = read_number_ref(buf)?, + SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number_ref(buf)?, + SpanLinkKey::SpanId => span.span_id = read_number_ref(buf)?, + SpanLinkKey::Attributes => span.attributes = read_str_map_to_str_ref(buf)?, + SpanLinkKey::Tracestate => span.tracestate = read_string_ref(buf)?, + SpanLinkKey::Flags => span.flags = read_number_ref(buf)?, + } + } + + Ok(span) +} + #[cfg(test)] mod tests { use super::SpanLinkKey; diff --git a/trace-utils/src/msgpack_decoder/v04/number.rs b/trace-utils/src/msgpack_decoder/v04/number.rs index 219591481..d6192919d 100644 --- a/trace-utils/src/msgpack_decoder/v04/number.rs +++ b/trace-utils/src/msgpack_decoder/v04/number.rs @@ -209,6 +209,18 @@ pub fn read_nullable_number_bytes>( read_number(unsafe { buf.as_mut_slice() }, true)?.try_into() } +pub fn read_number_ref>( + buf: &mut &[u8], +) -> Result { + read_number(buf, false)?.try_into() +} + +pub fn read_nullable_number_ref>( + buf: &mut &[u8], +) -> Result { + read_number(buf, true)?.try_into() +} + #[cfg(test)] mod tests { use super::*; diff --git a/trace-utils/src/span_v04/mod.rs b/trace-utils/src/span_v04/mod.rs index cf8d34ee0..0265295ff 100644 --- a/trace-utils/src/span_v04/mod.rs +++ b/trace-utils/src/span_v04/mod.rs @@ -5,4 +5,4 @@ mod span; pub mod trace_utils; -pub use span::{Span, SpanKey, SpanKeyParseError, SpanLink}; +pub use span::{Span, SpanKey, SpanKeyParseError, SpanLink, SpanLinkSlice, SpanSlice}; diff --git a/trace-utils/src/span_v04/span.rs b/trace-utils/src/span_v04/span.rs index 83c220915..6ce464d0a 100644 --- a/trace-utils/src/span_v04/span.rs +++ b/trace-utils/src/span_v04/span.rs @@ -5,7 +5,7 @@ use serde::Serialize; use std::collections::HashMap; use std::fmt; use std::str::FromStr; -use tinybytes::BytesString; +use tinybytes::{Bytes, BytesString}; #[derive(Debug, PartialEq)] pub enum SpanKey { @@ -82,6 +82,103 @@ pub struct SpanLink { pub flags: u64, } +#[derive(Clone, Debug, Default, PartialEq, Serialize)] +pub struct SpanSlice<'a> { + pub service: &'a str, + pub name: &'a str, + pub resource: &'a str, + pub r#type: &'a str, + pub trace_id: u64, + pub span_id: u64, + pub parent_id: u64, + pub start: i64, + pub duration: i64, + #[serde(skip_serializing_if = "is_default")] + pub error: i32, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub meta: HashMap<&'a str, &'a str>, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub metrics: HashMap<&'a str, f64>, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub meta_struct: HashMap<&'a str, Vec>, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub span_links: Vec>, +} + +#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize)] +pub struct SpanLinkSlice<'a> { + pub trace_id: u64, + pub trace_id_high: u64, + pub span_id: u64, + pub attributes: HashMap<&'a str, &'a str>, + pub tracestate: &'a str, + pub flags: u64, +} + +impl SpanSlice<'_> { + pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { + Some(Span { + service: BytesString::try_from_bytes_slice(bytes, self.service)?, + name: BytesString::try_from_bytes_slice(bytes, self.name)?, + resource: BytesString::try_from_bytes_slice(bytes, self.resource)?, + r#type: BytesString::try_from_bytes_slice(bytes, self.r#type)?, + trace_id: self.trace_id, + span_id: self.span_id, + parent_id: self.parent_id, + start: self.start, + duration: self.duration, + error: self.error, + meta: self + .meta + .iter() + .map(|(k, v)| { + Some(( + BytesString::try_from_bytes_slice(bytes, k)?, + BytesString::try_from_bytes_slice(bytes, v)?, + )) + }) + .collect::>>()?, + metrics: self + .metrics + .iter() + .map(|(k, v)| Some((BytesString::try_from_bytes_slice(bytes, k)?, *v))) + .collect::>>()?, + meta_struct: self + .meta_struct + .iter() + .map(|(k, v)| Some((BytesString::try_from_bytes_slice(bytes, k)?, v.clone()))) + .collect::>>>()?, + span_links: self + .span_links + .iter() + .map(|link| link.try_to_bytes(bytes)) + .collect::>>()?, + }) + } +} + +impl SpanLinkSlice<'_> { + pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { + Some(SpanLink { + trace_id: self.trace_id, + trace_id_high: self.trace_id_high, + span_id: self.span_id, + attributes: self + .attributes + .iter() + .map(|(k, v)| { + Some(( + BytesString::try_from_bytes_slice(bytes, k)?, + BytesString::try_from_bytes_slice(bytes, v)?, + )) + }) + .collect::>>()?, + tracestate: BytesString::try_from_bytes_slice(bytes, self.tracestate)?, + flags: self.flags, + }) + } +} + #[derive(Debug)] pub struct SpanKeyParseError { pub message: String, From 84f88535633594f52e43180722cd503d563fc2fe Mon Sep 17 00:00:00 2001 From: vianney Date: Fri, 14 Feb 2025 15:25:25 +0100 Subject: [PATCH 2/3] Remove Bytes decoder function --- data-pipeline/src/trace_exporter/mod.rs | 2 +- .../src/msgpack_decoder/v04/decoder/mod.rs | 285 +++++++----------- .../src/msgpack_decoder/v04/decoder/span.rs | 71 +---- .../msgpack_decoder/v04/decoder/span_link.rs | 63 +--- trace-utils/src/tracer_payload.rs | 2 +- 5 files changed, 132 insertions(+), 291 deletions(-) diff --git a/data-pipeline/src/trace_exporter/mod.rs b/data-pipeline/src/trace_exporter/mod.rs index 78ed78456..470d920eb 100644 --- a/data-pipeline/src/trace_exporter/mod.rs +++ b/data-pipeline/src/trace_exporter/mod.rs @@ -588,7 +588,7 @@ impl TraceExporter { fn send_deser_ser(&self, data: tinybytes::Bytes) -> Result { // TODO base on input format - let (mut traces, size) = match msgpack_decoder::v04::decoder::from_slice(data) { + let (mut traces, size) = match msgpack_decoder::v04::decoder::from_bytes(data) { Ok(res) => res, Err(err) => { error!("Error deserializing trace from request body: {err}"); diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs index 188ce588b..cac5d0dde 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs @@ -4,22 +4,18 @@ mod span; mod span_link; -use self::span::decode_span; use super::error::DecodeError; -use super::number::{read_nullable_number_ref, read_number_bytes, read_number_ref}; +use super::number::{read_nullable_number_ref, read_number_ref}; use crate::span_v04::{Span, SpanSlice}; use rmp::decode::DecodeStringError; use rmp::{decode, decode::RmpRead, Marker}; -use span::decode_span_ref; +use span::decode_span; use std::{collections::HashMap, f64}; -use tinybytes::{Bytes, BytesString}; // https://docs.rs/rmp/latest/rmp/enum.Marker.html#variant.Null (0xc0 == 192) const NULL_MARKER: &u8 = &0xc0; -/// Decodes a slice of bytes into a vector of `TracerPayloadV04` objects. -/// -/// +/// Decodes a Bytes buffer into a vector of `TracerPayloadV04` objects. /// /// # Arguments /// @@ -40,10 +36,10 @@ const NULL_MARKER: &u8 = &0xc0; /// # Examples /// /// ``` -/// use datadog_trace_protobuf::pb::Span; -/// use datadog_trace_utils::msgpack_decoder::v04::decoder::from_slice; -/// use rmp_serde::to_vec_named; -/// use tinybytes; +/// # use datadog_trace_protobuf::pb::Span; +/// # use datadog_trace_utils::msgpack_decoder::v04::decoder::from_bytes; +/// # use rmp_serde::to_vec_named; +/// # use tinybytes; /// /// let span = Span { /// name: "test-span".to_owned(), @@ -52,22 +48,22 @@ const NULL_MARKER: &u8 = &0xc0; /// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); /// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data); /// let (decoded_traces, _payload_size) = -/// from_slice(encoded_data_as_tinybytes).expect("Decoding failed"); +/// from_bytes(encoded_data_as_tinybytes).expect("Decoding failed"); /// /// assert_eq!(1, decoded_traces.len()); /// assert_eq!(1, decoded_traces[0].len()); /// let decoded_span = &decoded_traces[0][0]; /// assert_eq!("test-span", decoded_span.name.as_str()); /// ``` -pub fn from_slice(data: tinybytes::Bytes) -> Result<(Vec>, usize), DecodeError> { +pub fn from_bytes(data: tinybytes::Bytes) -> Result<(Vec>, usize), DecodeError> { let mut parsed_data = data.clone(); - let (traces_ref, size) = from_slice_ref(unsafe { parsed_data.as_mut_slice() })?; + let (traces_ref, size) = from_slice(unsafe { parsed_data.as_mut_slice() })?; let traces_owned = traces_ref .iter() .map(|trace| { trace .iter() - // Safe to unwrap since the spans uses subslices of the `data` slice + // Safe to unwrap since the spans use subslices of the `data` slice .map(|span| span.try_to_bytes(&data).unwrap()) .collect() }) @@ -75,7 +71,47 @@ pub fn from_slice(data: tinybytes::Bytes) -> Result<(Vec>, usize), Dec Ok((traces_owned, size)) } -pub fn from_slice_ref(mut data: &[u8]) -> Result<(Vec>, usize), DecodeError> { +/// Decodes a slice of bytes into a `Vec>` object. +/// The resulting spans have the same lifetime as the initial buffer. +/// +/// # Arguments +/// +/// * `data` - A tinybytes Bytes buffer containing the encoded data. Bytes are expected to be +/// encoded msgpack data containing a list of a list of v04 spans. +/// +/// # Returns +/// +/// * `Ok(Vec>)` - The decoded `SpanSlice` objects if successful. +/// * `Err(DecodeError)` - An error if the decoding process fails. +/// +/// # Errors +/// +/// This function will return an error if: +/// - The array length for trace count or span count cannot be read. +/// - Any span cannot be decoded. +/// +/// # Examples +/// +/// ``` +/// # use datadog_trace_protobuf::pb::Span; +/// # use datadog_trace_utils::msgpack_decoder::v04::decoder::from_slice; +/// # use rmp_serde::to_vec_named; +/// # use tinybytes; +/// +/// let span = Span { +/// name: "test-span".to_owned(), +/// ..Default::default() +/// }; +/// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); +/// let (decoded_traces, _payload_size) = +/// from_slice(encoded_data.as_slice()).expect("Decoding failed"); +/// +/// assert_eq!(1, decoded_traces.len()); +/// assert_eq!(1, decoded_traces[0].len()); +/// let decoded_span = &decoded_traces[0][0]; +/// assert_eq!("test-span", decoded_span.name); +/// ``` +pub fn from_slice(mut data: &[u8]) -> Result<(Vec>, usize), DecodeError> { let trace_count = rmp::decode::read_array_len(&mut data).map_err(|_| { DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) })?; @@ -101,7 +137,7 @@ pub fn from_slice_ref(mut data: &[u8]) -> Result<(Vec>, usize), D .expect("Unable to cast span_count to usize"), ), |mut trace, _| { - let span = decode_span_ref(&mut data)?; + let span = decode_span(&mut data)?; trace.push(span); Ok(trace) }, @@ -117,7 +153,7 @@ pub fn from_slice_ref(mut data: &[u8]) -> Result<(Vec>, usize), D } #[inline] -fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { +fn read_string_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { decode::read_str_from_slice(buf).map_err(|e| match e { DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()), DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()), @@ -130,171 +166,73 @@ fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { } #[inline] -fn read_string_ref<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { - read_string_ref_nomut(buf).map(|(str, newbuf)| { +fn read_string<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { + read_string_nomut(buf).map(|(str, newbuf)| { *buf = newbuf; str }) } #[inline] -fn read_string_bytes(buf: &mut Bytes) -> Result { - // Note: we need to pass a &'static lifetime here, otherwise it'll complain - read_string_ref_nomut(unsafe { buf.as_mut_slice() }).map(|(str, newbuf)| { - let string = BytesString::from_bytes_slice(buf, str); - *unsafe { buf.as_mut_slice() } = newbuf; - string - }) -} - -#[inline] -fn read_nullable_string_bytes(buf: &mut Bytes) -> Result { - if let Some(empty_string) = handle_null_marker(buf, BytesString::default) { - Ok(empty_string) - } else { - read_string_bytes(buf) - } -} - -#[inline] -fn read_nullable_string_ref<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { +fn read_nullable_string<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { if is_null_marker(buf) { Ok("") } else { - read_string_ref(buf) - } -} - -#[inline] -// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the -// BytesStrings. -fn read_str_map_to_bytes_strings( - buf: &mut Bytes, -) -> Result, DecodeError> { - let len = decode::read_map_len(unsafe { buf.as_mut_slice() }) - .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; - - let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); - for _ in 0..len { - let key = read_string_bytes(buf)?; - let value = read_string_bytes(buf)?; - map.insert(key, value); + read_string(buf) } - Ok(map) } #[inline] -fn read_nullable_str_map_to_bytes_strings( - buf: &mut Bytes, -) -> Result, DecodeError> { - if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { - return Ok(empty_map); - } - - read_str_map_to_bytes_strings(buf) -} - -#[inline] -// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the -// BytesStrings. -fn read_str_map_to_str_ref<'a>( - buf: &mut &'a [u8], -) -> Result, DecodeError> { +fn read_str_map_to_str<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { let len = decode::read_map_len(buf) .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); for _ in 0..len { - let key = read_string_ref(buf)?; - let value = read_string_ref(buf)?; + let key = read_string(buf)?; + let value = read_string(buf)?; map.insert(key, value); } Ok(map) } #[inline] -fn read_nullable_str_map_to_str_ref<'a>( +fn read_nullable_str_map_to_str<'a>( buf: &mut &'a [u8], ) -> Result, DecodeError> { if is_null_marker(buf) { return Ok(HashMap::default()); } - read_str_map_to_str_ref(buf) -} - -#[inline] -fn read_metric_pair(buf: &mut Bytes) -> Result<(BytesString, f64), DecodeError> { - let key = read_string_bytes(buf)?; - let v = read_number_bytes(buf)?; - - Ok((key, v)) -} -#[inline] -fn read_metrics(buf: &mut Bytes) -> Result, DecodeError> { - if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { - return Ok(empty_map); - } - - let len = read_map_len(unsafe { buf.as_mut_slice() })?; - - read_map(len, buf, read_metric_pair) + read_str_map_to_str(buf) } #[inline] -fn read_meta_struct(buf: &mut Bytes) -> Result>, DecodeError> { - if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { - return Ok(empty_map); +fn read_metrics<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { + if is_null_marker(buf) { + return Ok(HashMap::default()); } - fn read_meta_struct_pair(buf: &mut Bytes) -> Result<(BytesString, Vec), DecodeError> { - let key = read_string_bytes(buf)?; - let array_len = decode::read_array_len(unsafe { buf.as_mut_slice() }).map_err(|_| { - DecodeError::InvalidFormat("Unable to read array len for meta_struct".to_owned()) - })?; - - let mut v = Vec::with_capacity(array_len as usize); + fn read_metric_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, f64), DecodeError> { + let key = read_string(buf)?; + let v = read_number_ref(buf)?; - for _ in 0..array_len { - let value = read_number_bytes(buf)?; - v.push(value); - } Ok((key, v)) } - let len = read_map_len(unsafe { buf.as_mut_slice() })?; - read_map(len, buf, read_meta_struct_pair) -} - -#[inline] -fn read_metric_pair_ref<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, f64), DecodeError> { - let key = read_string_ref(buf)?; - let v = read_number_ref(buf)?; - - Ok((key, v)) -} - -#[inline] -fn read_metrics_ref<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { - if is_null_marker(buf) { - return Ok(HashMap::default()); - } - let len = read_map_len(buf)?; - read_map(len, buf, read_metric_pair_ref) + read_map(len, buf, read_metric_pair) } #[inline] -fn read_meta_struct_ref<'a>(buf: &mut &'a [u8]) -> Result>, DecodeError> { +fn read_meta_struct<'a>(buf: &mut &'a [u8]) -> Result>, DecodeError> { if is_null_marker(buf) { return Ok(HashMap::default()); } - fn read_meta_struct_pair_ref<'a>( - buf: &mut &'a [u8], - ) -> Result<(&'a str, Vec), DecodeError> { - let key = read_string_ref(buf)?; + fn read_meta_struct_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, Vec), DecodeError> { + let key = read_string(buf)?; let array_len = decode::read_array_len(buf).map_err(|_| { DecodeError::InvalidFormat("Unable to read array len for meta_struct".to_owned()) })?; @@ -309,7 +247,7 @@ fn read_meta_struct_ref<'a>(buf: &mut &'a [u8]) -> Result(buf: &mut &'a [u8]) -> Result`. /// @@ -340,10 +278,14 @@ fn read_meta_struct_ref<'a>(buf: &mut &'a [u8]) -> Result(len: usize, buf: &mut B, read_pair: F) -> Result, DecodeError> +fn read_map<'a, K, V, F>( + len: usize, + buf: &mut &'a [u8], + read_pair: F, +) -> Result, DecodeError> where K: std::hash::Hash + Eq, - F: Fn(&mut B) -> Result<(K, V), DecodeError>, + F: Fn(&mut &'a [u8]) -> Result<(K, V), DecodeError>, { let mut map = HashMap::with_capacity(len); for _ in 0..len { @@ -374,24 +316,7 @@ fn read_map_len(buf: &mut &[u8]) -> Result { } /// When you want to "peek" if the next value is a null marker, and only advance the buffer if it is -/// null and return the default value. If it is not null, you can continue to decode as expected. -#[inline] -fn handle_null_marker(buf: &mut Bytes, default: F) -> Option -where - F: FnOnce() -> T, -{ - let slice = unsafe { buf.as_mut_slice() }; - - if slice.first() == Some(NULL_MARKER) { - *slice = &slice[1..]; - Some(default()) - } else { - None - } -} - -/// When you want to "peek" if the next value is a null marker, and only advance the buffer if it is -/// null and return the default value. If it is not null, you can continue to decode as expected. +/// null. If it is not null, you can continue to decode as expected. #[inline] fn is_null_marker(buf: &mut &[u8]) -> bool { if buf.first() == Some(NULL_MARKER) { @@ -433,7 +358,7 @@ mod tests { let encoded_data = unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; let bytes = tinybytes::Bytes::from_static(encoded_data); - let (_decoded_traces, decoded_size) = from_slice(bytes).expect("Decoding failed"); + let (_decoded_traces, decoded_size) = from_bytes(bytes).expect("Decoding failed"); assert_eq!(0, decoded_size); } @@ -448,7 +373,7 @@ mod tests { let expected_size = encoded_data.len() - 1; // rmp_serde adds additional 0 byte encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (_decoded_traces, decoded_size) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(expected_size, decoded_size); } @@ -463,7 +388,7 @@ mod tests { let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -478,7 +403,7 @@ mod tests { let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -492,7 +417,7 @@ mod tests { let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -507,7 +432,7 @@ mod tests { let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -522,7 +447,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -543,7 +468,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -567,7 +492,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -593,7 +518,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -614,7 +539,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -639,7 +564,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -661,7 +586,7 @@ mod tests { span["metrics"] = json!(expected_metrics.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -685,7 +610,7 @@ mod tests { span["metrics"] = json!(expected_metrics.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -705,7 +630,7 @@ mod tests { span["metrics"] = json!(null); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -732,7 +657,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -779,7 +704,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -801,7 +726,7 @@ mod tests { unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; let bytes = tinybytes::Bytes::from_static(encoded_data); - let result = from_slice(bytes); + let result = from_bytes(bytes); assert_eq!( Err(DecodeError::InvalidFormat( "Expected at least bytes 1, but only got 0 (pos 0)".to_owned() @@ -824,7 +749,7 @@ mod tests { unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; let bytes = tinybytes::Bytes::from_static(encoded_data); - let result = from_slice(bytes); + let result = from_bytes(bytes); assert_eq!( Err(DecodeError::Utf8Error( "invalid utf-8 sequence of 1 bytes from index 1".to_owned() @@ -844,7 +769,7 @@ mod tests { unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; let bytes = tinybytes::Bytes::from_static(encoded_data); - let result = from_slice(bytes); + let result = from_bytes(bytes); assert_eq!( Err(DecodeError::InvalidFormat( @@ -866,7 +791,7 @@ mod tests { unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; let bytes = tinybytes::Bytes::from_static(encoded_data); - let result = from_slice(bytes); + let result = from_bytes(bytes); assert_eq!( Err(DecodeError::InvalidFormat( @@ -887,7 +812,7 @@ mod tests { unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; let bytes = tinybytes::Bytes::from_static(encoded_data); - let result = from_slice(bytes); + let result = from_bytes(bytes); assert_eq!( Err(DecodeError::InvalidType( @@ -951,7 +876,7 @@ mod tests { ..Default::default() }; let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); - let result = from_slice(tinybytes::Bytes::from(encoded_data)); + let result = from_bytes(tinybytes::Bytes::from(encoded_data)); assert!(result.is_ok()); }, diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span.rs index 45b2d5819..f0d181126 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span.rs @@ -2,16 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 use super::{ - read_meta_struct, read_meta_struct_ref, read_metric_pair_ref, read_metrics, read_metrics_ref, - read_nullable_number_ref, read_nullable_str_map_to_bytes_strings, - read_nullable_str_map_to_str_ref, read_nullable_string_bytes, read_nullable_string_ref, - read_number_ref, read_str_map_to_str_ref, read_string_ref, - span_link::{read_span_links, read_span_links_ref}, + read_meta_struct, read_metrics, read_nullable_number_ref, read_nullable_str_map_to_str, + read_nullable_string, read_string, span_link::read_span_links, }; use crate::msgpack_decoder::v04::error::DecodeError; -use crate::msgpack_decoder::v04::number::read_nullable_number_bytes; -use crate::span_v04::{Span, SpanKey, SpanSlice}; -use tinybytes::Bytes; +use crate::span_v04::{SpanKey, SpanSlice}; /// Decodes a slice of bytes into a `Span` object. /// @@ -29,17 +24,7 @@ use tinybytes::Bytes; /// This function will return an error if: /// - The map length cannot be read. /// - Any key or value cannot be decoded. -pub fn decode_span(buffer: &mut Bytes) -> Result { - let span_ref = decode_span_ref(unsafe { buffer.as_mut_slice() }); - println!("Span ref {:?}", span_ref); - - span_ref - .unwrap() - .try_to_bytes(buffer) - .ok_or(DecodeError::IOError) -} - -pub fn decode_span_ref<'a>(buffer: &mut &'a [u8]) -> Result, DecodeError> { +pub fn decode_span<'a>(buffer: &mut &'a [u8]) -> Result, DecodeError> { let mut span = SpanSlice::default(); let span_size = rmp::decode::read_map_len(buffer).map_err(|_| { @@ -47,7 +32,7 @@ pub fn decode_span_ref<'a>(buffer: &mut &'a [u8]) -> Result, Decod })?; for _ in 0..span_size { - fill_span_ref(&mut span, buffer)?; + fill_span(&mut span, buffer)?; } Ok(span) @@ -55,52 +40,26 @@ pub fn decode_span_ref<'a>(buffer: &mut &'a [u8]) -> Result, Decod // Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the // BytesStrings -fn fill_span(span: &mut Span, buf: &mut Bytes) -> Result<(), DecodeError> { - let key = read_string_ref(unsafe { buf.as_mut_slice() })? +fn fill_span<'a>(span: &mut SpanSlice<'a>, buf: &mut &'a [u8]) -> Result<(), DecodeError> { + let key = read_string(buf)? .parse::() .map_err(|_| DecodeError::InvalidFormat("Invalid span key".to_owned()))?; match key { - SpanKey::Service => span.service = read_nullable_string_bytes(buf)?, - SpanKey::Name => span.name = read_nullable_string_bytes(buf)?, - SpanKey::Resource => span.resource = read_nullable_string_bytes(buf)?, - SpanKey::TraceId => span.trace_id = read_nullable_number_bytes(buf)?, - SpanKey::SpanId => span.span_id = read_nullable_number_bytes(buf)?, - SpanKey::ParentId => span.parent_id = read_nullable_number_bytes(buf)?, - SpanKey::Start => span.start = read_nullable_number_bytes(buf)?, - SpanKey::Duration => span.duration = read_nullable_number_bytes(buf)?, - SpanKey::Error => span.error = read_nullable_number_bytes(buf)?, - SpanKey::Type => span.r#type = read_nullable_string_bytes(buf)?, - SpanKey::Meta => span.meta = read_nullable_str_map_to_bytes_strings(buf)?, - SpanKey::Metrics => span.metrics = read_metrics(buf)?, - SpanKey::MetaStruct => span.meta_struct = read_meta_struct(buf)?, - SpanKey::SpanLinks => span.span_links = read_span_links(buf)?, - } - Ok(()) -} - -// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the -// BytesStrings -fn fill_span_ref<'a>(span: &mut SpanSlice<'a>, buf: &mut &'a [u8]) -> Result<(), DecodeError> { - let key = read_string_ref(buf)? - .parse::() - .map_err(|_| DecodeError::InvalidFormat("Invalid span key".to_owned()))?; - - match key { - SpanKey::Service => span.service = read_nullable_string_ref(buf)?, - SpanKey::Name => span.name = read_nullable_string_ref(buf)?, - SpanKey::Resource => span.resource = read_nullable_string_ref(buf)?, + SpanKey::Service => span.service = read_nullable_string(buf)?, + SpanKey::Name => span.name = read_nullable_string(buf)?, + SpanKey::Resource => span.resource = read_nullable_string(buf)?, SpanKey::TraceId => span.trace_id = read_nullable_number_ref(buf)?, SpanKey::SpanId => span.span_id = read_nullable_number_ref(buf)?, SpanKey::ParentId => span.parent_id = read_nullable_number_ref(buf)?, SpanKey::Start => span.start = read_nullable_number_ref(buf)?, SpanKey::Duration => span.duration = read_nullable_number_ref(buf)?, SpanKey::Error => span.error = read_nullable_number_ref(buf)?, - SpanKey::Type => span.r#type = read_nullable_string_ref(buf)?, - SpanKey::Meta => span.meta = read_nullable_str_map_to_str_ref(buf)?, - SpanKey::Metrics => span.metrics = read_metrics_ref(buf)?, - SpanKey::MetaStruct => span.meta_struct = read_meta_struct_ref(buf)?, - SpanKey::SpanLinks => span.span_links = read_span_links_ref(buf)?, + SpanKey::Type => span.r#type = read_nullable_string(buf)?, + SpanKey::Meta => span.meta = read_nullable_str_map_to_str(buf)?, + SpanKey::Metrics => span.metrics = read_metrics(buf)?, + SpanKey::MetaStruct => span.meta_struct = read_meta_struct(buf)?, + SpanKey::SpanLinks => span.span_links = read_span_links(buf)?, } Ok(()) } diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs index 5bb5c2716..30418d3d6 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs @@ -1,17 +1,14 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::msgpack_decoder::v04::decoder::{ - handle_null_marker, read_str_map_to_bytes_strings, read_string_bytes, read_string_ref, -}; +use crate::msgpack_decoder::v04::decoder::read_string; use crate::msgpack_decoder::v04::error::DecodeError; -use crate::msgpack_decoder::v04::number::{read_number_bytes, read_number_ref}; -use crate::span_v04::{SpanLink, SpanLinkSlice}; +use crate::msgpack_decoder::v04::number::read_number_ref; +use crate::span_v04::SpanLinkSlice; use rmp::Marker; use std::str::FromStr; -use tinybytes::Bytes; -use super::{is_null_marker, read_str_map_to_str_ref}; +use super::{is_null_marker, read_str_map_to_str}; /// Reads a slice of bytes and decodes it into a vector of `SpanLink` objects. /// @@ -30,28 +27,7 @@ use super::{is_null_marker, read_str_map_to_str_ref}; /// - The marker for the array length cannot be read. /// - Any `SpanLink` cannot be decoded. /// ``` -pub(crate) fn read_span_links(buf: &mut Bytes) -> Result, DecodeError> { - if let Some(empty_vec) = handle_null_marker(buf, Vec::default) { - return Ok(empty_vec); - } - - match rmp::decode::read_marker(unsafe { buf.as_mut_slice() }).map_err(|_| { - DecodeError::InvalidFormat("Unable to read marker for span links".to_owned()) - })? { - Marker::FixArray(len) => { - let mut vec: Vec = Vec::with_capacity(len.into()); - for _ in 0..len { - vec.push(decode_span_link(buf)?); - } - Ok(vec) - } - _ => Err(DecodeError::InvalidType( - "Unable to read span link from buffer".to_owned(), - )), - } -} - -pub(crate) fn read_span_links_ref<'a>( +pub(crate) fn read_span_links<'a>( buf: &mut &'a [u8], ) -> Result>, DecodeError> { if is_null_marker(buf) { @@ -64,7 +40,7 @@ pub(crate) fn read_span_links_ref<'a>( Marker::FixArray(len) => { let mut vec: Vec> = Vec::with_capacity(len.into()); for _ in 0..len { - vec.push(decode_span_link_ref(buf)?); + vec.push(decode_span_link(buf)?); } Ok(vec) } @@ -102,37 +78,18 @@ impl FromStr for SpanLinkKey { } } -fn decode_span_link(buf: &mut Bytes) -> Result { - let mut span = SpanLink::default(); - let span_size = rmp::decode::read_map_len(unsafe { buf.as_mut_slice() }) - .map_err(|_| DecodeError::InvalidType("Unable to get map len for span size".to_owned()))?; - - for _ in 0..span_size { - match read_string_ref(unsafe { buf.as_mut_slice() })?.parse::()? { - SpanLinkKey::TraceId => span.trace_id = read_number_bytes(buf)?, - SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number_bytes(buf)?, - SpanLinkKey::SpanId => span.span_id = read_number_bytes(buf)?, - SpanLinkKey::Attributes => span.attributes = read_str_map_to_bytes_strings(buf)?, - SpanLinkKey::Tracestate => span.tracestate = read_string_bytes(buf)?, - SpanLinkKey::Flags => span.flags = read_number_bytes(buf)?, - } - } - - Ok(span) -} - -fn decode_span_link_ref<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { +fn decode_span_link<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { let mut span = SpanLinkSlice::default(); let span_size = rmp::decode::read_map_len(buf) .map_err(|_| DecodeError::InvalidType("Unable to get map len for span size".to_owned()))?; for _ in 0..span_size { - match read_string_ref(buf)?.parse::()? { + match read_string(buf)?.parse::()? { SpanLinkKey::TraceId => span.trace_id = read_number_ref(buf)?, SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number_ref(buf)?, SpanLinkKey::SpanId => span.span_id = read_number_ref(buf)?, - SpanLinkKey::Attributes => span.attributes = read_str_map_to_str_ref(buf)?, - SpanLinkKey::Tracestate => span.tracestate = read_string_ref(buf)?, + SpanLinkKey::Attributes => span.attributes = read_str_map_to_str(buf)?, + SpanLinkKey::Tracestate => span.tracestate = read_string(buf)?, SpanLinkKey::Flags => span.flags = read_number_ref(buf)?, } } diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index 2623a5755..2da6b67b2 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -260,7 +260,7 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto fn try_into(self) -> Result { match self.encoding_type { TraceEncoding::V04 => { - let (traces, size) = match msgpack_decoder::v04::decoder::from_slice(self.data) { + let (traces, size) = match msgpack_decoder::v04::decoder::from_bytes(self.data) { Ok(res) => res, Err(e) => { anyhow::bail!("Error deserializing trace from request body: {e}") From 55b16bf6f36e73b32ad0fbd9cc73faaa8c398ed5 Mon Sep 17 00:00:00 2001 From: vianney Date: Mon, 17 Feb 2025 13:03:56 +0100 Subject: [PATCH 3/3] Move number, string and map decoding to separate modules --- trace-utils/src/msgpack_decoder/mod.rs | 1 + .../src/msgpack_decoder/v04/decoder/mod.rs | 191 +----------------- .../msgpack_decoder/v04/decoder/span/map.rs | 152 ++++++++++++++ .../v04/decoder/{span.rs => span/mod.rs} | 51 +++-- .../v04/{ => decoder/span}/number.rs | 50 ++--- .../v04/decoder/{ => span}/span_link.rs | 14 +- .../v04/decoder/span/string.rs | 44 ++++ trace-utils/src/msgpack_decoder/v04/error.rs | 6 + trace-utils/src/msgpack_decoder/v04/mod.rs | 3 +- 9 files changed, 271 insertions(+), 241 deletions(-) create mode 100644 trace-utils/src/msgpack_decoder/v04/decoder/span/map.rs rename trace-utils/src/msgpack_decoder/v04/decoder/{span.rs => span/mod.rs} (74%) rename trace-utils/src/msgpack_decoder/v04/{ => decoder/span}/number.rs (94%) rename trace-utils/src/msgpack_decoder/v04/decoder/{ => span}/span_link.rs (90%) create mode 100644 trace-utils/src/msgpack_decoder/v04/decoder/span/string.rs diff --git a/trace-utils/src/msgpack_decoder/mod.rs b/trace-utils/src/msgpack_decoder/mod.rs index f4e980a0a..f78316a76 100644 --- a/trace-utils/src/msgpack_decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/mod.rs @@ -1,4 +1,5 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +/// Decoding logic for V04 encoded trace payload pub mod v04; diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs index cac5d0dde..38b4df27d 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs @@ -1,19 +1,12 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +/// Read spans from msgpack mod span; -mod span_link; use super::error::DecodeError; -use super::number::{read_nullable_number_ref, read_number_ref}; use crate::span_v04::{Span, SpanSlice}; -use rmp::decode::DecodeStringError; -use rmp::{decode, decode::RmpRead, Marker}; use span::decode_span; -use std::{collections::HashMap, f64}; - -// https://docs.rs/rmp/latest/rmp/enum.Marker.html#variant.Null (0xc0 == 192) -const NULL_MARKER: &u8 = &0xc0; /// Decodes a Bytes buffer into a vector of `TracerPayloadV04` objects. /// @@ -152,181 +145,6 @@ pub fn from_slice(mut data: &[u8]) -> Result<(Vec>, usize), Decod )) } -#[inline] -fn read_string_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { - decode::read_str_from_slice(buf).map_err(|e| match e { - DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()), - DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()), - DecodeStringError::TypeMismatch(marker) => { - DecodeError::InvalidType(format!("Type mismatch at marker {:?}", marker)) - } - DecodeStringError::InvalidUtf8(_, e) => DecodeError::Utf8Error(e.to_string()), - _ => DecodeError::IOError, - }) -} - -#[inline] -fn read_string<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { - read_string_nomut(buf).map(|(str, newbuf)| { - *buf = newbuf; - str - }) -} - -#[inline] -fn read_nullable_string<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { - if is_null_marker(buf) { - Ok("") - } else { - read_string(buf) - } -} - -#[inline] -fn read_str_map_to_str<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { - let len = decode::read_map_len(buf) - .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; - - let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); - for _ in 0..len { - let key = read_string(buf)?; - let value = read_string(buf)?; - map.insert(key, value); - } - Ok(map) -} - -#[inline] -fn read_nullable_str_map_to_str<'a>( - buf: &mut &'a [u8], -) -> Result, DecodeError> { - if is_null_marker(buf) { - return Ok(HashMap::default()); - } - - read_str_map_to_str(buf) -} - -#[inline] -fn read_metrics<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { - if is_null_marker(buf) { - return Ok(HashMap::default()); - } - - fn read_metric_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, f64), DecodeError> { - let key = read_string(buf)?; - let v = read_number_ref(buf)?; - - Ok((key, v)) - } - - let len = read_map_len(buf)?; - - read_map(len, buf, read_metric_pair) -} - -#[inline] -fn read_meta_struct<'a>(buf: &mut &'a [u8]) -> Result>, DecodeError> { - if is_null_marker(buf) { - return Ok(HashMap::default()); - } - - fn read_meta_struct_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, Vec), DecodeError> { - let key = read_string(buf)?; - let array_len = decode::read_array_len(buf).map_err(|_| { - DecodeError::InvalidFormat("Unable to read array len for meta_struct".to_owned()) - })?; - - let mut v = Vec::with_capacity(array_len as usize); - - for _ in 0..array_len { - let value = read_number_ref(buf)?; - v.push(value); - } - Ok((key, v)) - } - - let len = read_map_len(buf)?; - read_map(len, buf, read_meta_struct_pair) -} - -/// Reads a map from the buffer and returns it as a `HashMap`. -/// -/// This function is generic over the key and value types of the map, and it uses a provided -/// function to read key-value pairs from the buffer. -/// -/// # Arguments -/// -/// * `len` - The number of key-value pairs to read from the buffer. -/// * `buf` - A reference to the slice containing the encoded map data. -/// * `read_pair` - A function that reads a key-value pair from the buffer and returns it as a -/// `Result<(K, V), DecodeError>`. -/// -/// # Returns -/// -/// * `Ok(HashMap)` - A `HashMap` containing the decoded key-value pairs if successful. -/// * `Err(DecodeError)` - An error if the decoding process fails. -/// -/// # Errors -/// -/// This function will return an error if: -/// - The `read_pair` function returns an error while reading a key-value pair. -/// -/// # Type Parameters -/// -/// * `K` - The type of the keys in the map. Must implement `std::hash::Hash` and `Eq`. -/// * `V` - The type of the values in the map. -/// * `F` - The type of the function used to read key-value pairs from the buffer. -#[inline] -fn read_map<'a, K, V, F>( - len: usize, - buf: &mut &'a [u8], - read_pair: F, -) -> Result, DecodeError> -where - K: std::hash::Hash + Eq, - F: Fn(&mut &'a [u8]) -> Result<(K, V), DecodeError>, -{ - let mut map = HashMap::with_capacity(len); - for _ in 0..len { - let (k, v) = read_pair(buf)?; - map.insert(k, v); - } - Ok(map) -} - -#[inline] -fn read_map_len(buf: &mut &[u8]) -> Result { - match decode::read_marker(buf) - .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))? - { - Marker::FixMap(len) => Ok(len as usize), - Marker::Map16 => buf - .read_data_u16() - .map_err(|_| DecodeError::IOError) - .map(|len| len as usize), - Marker::Map32 => buf - .read_data_u32() - .map_err(|_| DecodeError::IOError) - .map(|len| len as usize), - _ => Err(DecodeError::InvalidType( - "Unable to read map from buffer".to_owned(), - )), - } -} - -/// When you want to "peek" if the next value is a null marker, and only advance the buffer if it is -/// null. If it is not null, you can continue to decode as expected. -#[inline] -fn is_null_marker(buf: &mut &[u8]) -> bool { - if buf.first() == Some(NULL_MARKER) { - *buf = &buf[1..]; - true - } else { - false - } -} - #[cfg(test)] mod tests { use super::*; @@ -335,6 +153,7 @@ mod tests { use rmp_serde; use rmp_serde::to_vec_named; use serde_json::json; + use std::collections::HashMap; use tinybytes::BytesString; fn generate_meta_struct_element(i: u8) -> (String, Vec) { @@ -352,6 +171,7 @@ mod tests { (key, rmp_serde::to_vec_named(&map).unwrap()) } + #[test] fn test_empty_array() { let encoded_data = vec![0x90]; @@ -824,7 +644,7 @@ mod tests { #[test] #[cfg_attr(miri, ignore)] - fn fuzz_from_slice() { + fn fuzz_from_bytes() { check!() .with_type::<( String, @@ -875,10 +695,11 @@ mod tests { start, ..Default::default() }; - let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); + let encoded_data = to_vec_named(&vec![vec![span.clone()]]).unwrap(); let result = from_bytes(tinybytes::Bytes::from(encoded_data)); assert!(result.is_ok()); + assert_eq!(result.unwrap().0, vec![vec![span]]) }, ); } diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span/map.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span/map.rs new file mode 100644 index 000000000..3a217dd1d --- /dev/null +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span/map.rs @@ -0,0 +1,152 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use super::is_null_marker; +use super::number::read_number; +use super::string::read_string; +use crate::msgpack_decoder::v04::error::DecodeError; +use rmp::{decode, decode::RmpRead, Marker}; +use std::collections::HashMap; + +/// Read a map of string to string from `buf`. +#[inline] +pub fn read_str_map_to_str<'a>( + buf: &mut &'a [u8], +) -> Result, DecodeError> { + let len = decode::read_map_len(buf) + .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; + + let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); + for _ in 0..len { + let key = read_string(buf)?; + let value = read_string(buf)?; + map.insert(key, value); + } + Ok(map) +} + +/// Read a nullable map of string to string from `buf`. +#[inline] +pub fn read_nullable_str_map_to_str<'a>( + buf: &mut &'a [u8], +) -> Result, DecodeError> { + if is_null_marker(buf) { + return Ok(HashMap::default()); + } + + read_str_map_to_str(buf) +} + +/// Read a map of string to f64 from `buf`. +#[inline] +pub fn read_metrics<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { + if is_null_marker(buf) { + return Ok(HashMap::default()); + } + + fn read_metric_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, f64), DecodeError> { + let key = read_string(buf)?; + let v = read_number(buf)?; + + Ok((key, v)) + } + + let len = read_map_len(buf)?; + + read_map(len, buf, read_metric_pair) +} + +/// Read a map of string to u8 array from `buf`. +/// +/// The struct can't be a u8 slice since it is encoded as a msgpack array and not as a raw bytes +/// buffer. +#[inline] +pub fn read_meta_struct<'a>(buf: &mut &'a [u8]) -> Result>, DecodeError> { + if is_null_marker(buf) { + return Ok(HashMap::default()); + } + + fn read_meta_struct_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, Vec), DecodeError> { + let key = read_string(buf)?; + let array_len = decode::read_array_len(buf).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for meta_struct".to_owned()) + })?; + + let mut v = Vec::with_capacity(array_len as usize); + + for _ in 0..array_len { + let value = read_number(buf)?; + v.push(value); + } + Ok((key, v)) + } + + let len = read_map_len(buf)?; + read_map(len, buf, read_meta_struct_pair) +} + +/// Reads a map from the buffer and returns it as a `HashMap`. +/// +/// This function is generic over the key and value types of the map, and it uses a provided +/// function to read key-value pairs from the buffer. +/// +/// # Arguments +/// +/// * `len` - The number of key-value pairs to read from the buffer. +/// * `buf` - A reference to the slice containing the encoded map data. +/// * `read_pair` - A function that reads a key-value pair from the buffer and returns it as a +/// `Result<(K, V), DecodeError>`. +/// +/// # Returns +/// +/// * `Ok(HashMap)` - A `HashMap` containing the decoded key-value pairs if successful. +/// * `Err(DecodeError)` - An error if the decoding process fails. +/// +/// # Errors +/// +/// This function will return an error if: +/// - The `read_pair` function returns an error while reading a key-value pair. +/// +/// # Type Parameters +/// +/// * `K` - The type of the keys in the map. Must implement `std::hash::Hash` and `Eq`. +/// * `V` - The type of the values in the map. +/// * `F` - The type of the function used to read key-value pairs from the buffer. +#[inline] +fn read_map<'a, K, V, F>( + len: usize, + buf: &mut &'a [u8], + read_pair: F, +) -> Result, DecodeError> +where + K: std::hash::Hash + Eq, + F: Fn(&mut &'a [u8]) -> Result<(K, V), DecodeError>, +{ + let mut map = HashMap::with_capacity(len); + for _ in 0..len { + let (k, v) = read_pair(buf)?; + map.insert(k, v); + } + Ok(map) +} + +/// Read the length of a msgpack map. +#[inline] +fn read_map_len(buf: &mut &[u8]) -> Result { + match decode::read_marker(buf) + .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))? + { + Marker::FixMap(len) => Ok(len as usize), + Marker::Map16 => buf + .read_data_u16() + .map_err(|_| DecodeError::IOError) + .map(|len| len as usize), + Marker::Map32 => buf + .read_data_u32() + .map_err(|_| DecodeError::IOError) + .map(|len| len as usize), + _ => Err(DecodeError::InvalidType( + "Unable to read map from buffer".to_owned(), + )), + } +} diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span/mod.rs similarity index 74% rename from trace-utils/src/msgpack_decoder/v04/decoder/span.rs rename to trace-utils/src/msgpack_decoder/v04/decoder/span/mod.rs index f0d181126..ee87795aa 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span/mod.rs @@ -1,14 +1,38 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use super::{ - read_meta_struct, read_metrics, read_nullable_number_ref, read_nullable_str_map_to_str, - read_nullable_string, read_string, span_link::read_span_links, -}; +/// Read maps from msgpack +mod map; +/// Read numbers from msgpack +mod number; +/// Read span links from msgpack +mod span_link; +/// Read strings from msgpack +mod string; + use crate::msgpack_decoder::v04::error::DecodeError; use crate::span_v04::{SpanKey, SpanSlice}; +use map::{read_meta_struct, read_metrics, read_nullable_str_map_to_str}; +use number::read_nullable_number; +use span_link::read_span_links; +use string::{read_nullable_string, read_string}; + +// https://docs.rs/rmp/latest/rmp/enum.Marker.html#variant.Null (0xc0 == 192) +const NULL_MARKER: &u8 = &0xc0; + +/// When you want to "peek" if the next value is a null marker, and only advance the buffer if it is +/// null. If it is not null, you can continue to decode as expected. +#[inline] +fn is_null_marker(buf: &mut &[u8]) -> bool { + if buf.first() == Some(NULL_MARKER) { + *buf = &buf[1..]; + true + } else { + false + } +} -/// Decodes a slice of bytes into a `Span` object. +/// Decodes a slice of bytes into a `SpanSlice` object. /// /// # Arguments /// @@ -16,7 +40,7 @@ use crate::span_v04::{SpanKey, SpanSlice}; /// /// # Returns /// -/// * `Ok(Span)` - A decoded `Span` object if successful. +/// * `Ok(Span)` - A decoded `SpanSlice` object if successful. /// * `Err(DecodeError)` - An error if the decoding process fails. /// /// # Errors @@ -38,8 +62,7 @@ pub fn decode_span<'a>(buffer: &mut &'a [u8]) -> Result, DecodeErr Ok(span) } -// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the -// BytesStrings +/// Read the next entry from `buf` and update `span` corresponding field. fn fill_span<'a>(span: &mut SpanSlice<'a>, buf: &mut &'a [u8]) -> Result<(), DecodeError> { let key = read_string(buf)? .parse::() @@ -49,12 +72,12 @@ fn fill_span<'a>(span: &mut SpanSlice<'a>, buf: &mut &'a [u8]) -> Result<(), Dec SpanKey::Service => span.service = read_nullable_string(buf)?, SpanKey::Name => span.name = read_nullable_string(buf)?, SpanKey::Resource => span.resource = read_nullable_string(buf)?, - SpanKey::TraceId => span.trace_id = read_nullable_number_ref(buf)?, - SpanKey::SpanId => span.span_id = read_nullable_number_ref(buf)?, - SpanKey::ParentId => span.parent_id = read_nullable_number_ref(buf)?, - SpanKey::Start => span.start = read_nullable_number_ref(buf)?, - SpanKey::Duration => span.duration = read_nullable_number_ref(buf)?, - SpanKey::Error => span.error = read_nullable_number_ref(buf)?, + SpanKey::TraceId => span.trace_id = read_nullable_number(buf)?, + SpanKey::SpanId => span.span_id = read_nullable_number(buf)?, + SpanKey::ParentId => span.parent_id = read_nullable_number(buf)?, + SpanKey::Start => span.start = read_nullable_number(buf)?, + SpanKey::Duration => span.duration = read_nullable_number(buf)?, + SpanKey::Error => span.error = read_nullable_number(buf)?, SpanKey::Type => span.r#type = read_nullable_string(buf)?, SpanKey::Meta => span.meta = read_nullable_str_map_to_str(buf)?, SpanKey::Metrics => span.metrics = read_metrics(buf)?, diff --git a/trace-utils/src/msgpack_decoder/v04/number.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span/number.rs similarity index 94% rename from trace-utils/src/msgpack_decoder/v04/number.rs rename to trace-utils/src/msgpack_decoder/v04/decoder/span/number.rs index d6192919d..fb72eca99 100644 --- a/trace-utils/src/msgpack_decoder/v04/number.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span/number.rs @@ -1,13 +1,12 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use super::error::DecodeError; +use crate::msgpack_decoder::v04::error::DecodeError; use rmp::{decode::RmpRead, Marker}; use std::fmt; -use tinybytes::Bytes; #[derive(Debug, PartialEq)] -pub enum Number { +pub(crate) enum Number { Unsigned(u64), Signed(i64), Float(f64), @@ -150,7 +149,7 @@ impl TryFrom for f64 { } } -fn read_number(buf: &mut &[u8], allow_null: bool) -> Result { +fn read_number_inner(buf: &mut &[u8], allow_null: bool) -> Result { match rmp::decode::read_marker(buf) .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for number".to_owned()))? { @@ -197,28 +196,18 @@ fn read_number(buf: &mut &[u8], allow_null: bool) -> Result } } -pub fn read_number_bytes>( - buf: &mut Bytes, -) -> Result { - read_number(unsafe { buf.as_mut_slice() }, false)?.try_into() -} - -pub fn read_nullable_number_bytes>( - buf: &mut Bytes, -) -> Result { - read_number(unsafe { buf.as_mut_slice() }, true)?.try_into() -} - -pub fn read_number_ref>( +/// Read a msgpack encoded number from `buf`. +pub fn read_number>( buf: &mut &[u8], ) -> Result { - read_number(buf, false)?.try_into() + read_number_inner(buf, false)?.try_into() } -pub fn read_nullable_number_ref>( +/// Read a msgpack encoded number from `buf` and return 0 if null. +pub fn read_nullable_number>( buf: &mut &[u8], ) -> Result { - read_number(buf, true)?.try_into() + read_number_inner(buf, true)?.try_into() } #[cfg(test)] @@ -233,8 +222,7 @@ mod tests { let expected_value = 42; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: u8 = read_number_bytes(&mut bytes).unwrap(); + let result: u8 = read_number(&mut buf.as_slice()).unwrap(); assert_eq!(result, expected_value); } @@ -244,8 +232,7 @@ mod tests { let expected_value = 42; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: i8 = read_number_bytes(&mut bytes).unwrap(); + let result: i8 = read_number(&mut buf.as_slice()).unwrap(); assert_eq!(result, expected_value); } @@ -255,8 +242,7 @@ mod tests { let expected_value = 42.98; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: f64 = read_number_bytes(&mut bytes).unwrap(); + let result: f64 = read_number(&mut buf.as_slice()).unwrap(); assert_eq!(result, expected_value); } @@ -265,8 +251,7 @@ mod tests { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: Result = read_number_bytes(&mut bytes); + let result: Result = read_number(&mut buf.as_slice()); assert!(matches!(result, Err(DecodeError::InvalidType(_)))); assert_eq!( @@ -280,8 +265,7 @@ mod tests { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: u8 = read_nullable_number_bytes(&mut bytes).unwrap(); + let result: u8 = read_nullable_number(&mut buf.as_slice()).unwrap(); assert_eq!(result, 0); } @@ -290,8 +274,7 @@ mod tests { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: i8 = read_nullable_number_bytes(&mut bytes).unwrap(); + let result: i8 = read_nullable_number(&mut buf.as_slice()).unwrap(); assert_eq!(result, 0); } @@ -300,8 +283,7 @@ mod tests { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: f64 = read_nullable_number_bytes(&mut bytes).unwrap(); + let result: f64 = read_nullable_number(&mut buf.as_slice()).unwrap(); assert_eq!(result, 0.0); } diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span/span_link.rs similarity index 90% rename from trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs rename to trace-utils/src/msgpack_decoder/v04/decoder/span/span_link.rs index 30418d3d6..9af3d4792 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span/span_link.rs @@ -1,14 +1,14 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::msgpack_decoder::v04::decoder::read_string; +use super::number::read_number; +use super::string::read_string; use crate::msgpack_decoder::v04::error::DecodeError; -use crate::msgpack_decoder::v04::number::read_number_ref; use crate::span_v04::SpanLinkSlice; use rmp::Marker; use std::str::FromStr; -use super::{is_null_marker, read_str_map_to_str}; +use super::{is_null_marker, map::read_str_map_to_str}; /// Reads a slice of bytes and decodes it into a vector of `SpanLink` objects. /// @@ -85,12 +85,12 @@ fn decode_span_link<'a>(buf: &mut &'a [u8]) -> Result, DecodeE for _ in 0..span_size { match read_string(buf)?.parse::()? { - SpanLinkKey::TraceId => span.trace_id = read_number_ref(buf)?, - SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number_ref(buf)?, - SpanLinkKey::SpanId => span.span_id = read_number_ref(buf)?, + SpanLinkKey::TraceId => span.trace_id = read_number(buf)?, + SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number(buf)?, + SpanLinkKey::SpanId => span.span_id = read_number(buf)?, SpanLinkKey::Attributes => span.attributes = read_str_map_to_str(buf)?, SpanLinkKey::Tracestate => span.tracestate = read_string(buf)?, - SpanLinkKey::Flags => span.flags = read_number_ref(buf)?, + SpanLinkKey::Flags => span.flags = read_number(buf)?, } } diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span/string.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span/string.rs new file mode 100644 index 000000000..142206515 --- /dev/null +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span/string.rs @@ -0,0 +1,44 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use super::is_null_marker; +use crate::msgpack_decoder::v04::error::DecodeError; +use rmp::decode::{self, DecodeStringError}; + +#[inline] +fn read_string_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { + decode::read_str_from_slice(buf).map_err(|e| match e { + DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()), + DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()), + DecodeStringError::TypeMismatch(marker) => { + DecodeError::InvalidType(format!("Type mismatch at marker {:?}", marker)) + } + DecodeStringError::InvalidUtf8(_, e) => DecodeError::Utf8Error(e.to_string()), + _ => DecodeError::IOError, + }) +} + +/// Read a string from `buf`. +/// +/// # Errors +/// Fails if the buffer doesn't contain a valid utf8 msgpack string. +#[inline] +pub fn read_string<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { + read_string_nomut(buf).map(|(str, newbuf)| { + *buf = newbuf; + str + }) +} + +/// Read a nullable string from `buf`. +/// +/// # Errors +/// Fails if the buffer doesn't contain a valid utf8 msgpack string or a null marker. +#[inline] +pub fn read_nullable_string<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { + if is_null_marker(buf) { + Ok("") + } else { + read_string(buf) + } +} diff --git a/trace-utils/src/msgpack_decoder/v04/error.rs b/trace-utils/src/msgpack_decoder/v04/error.rs index ff74819f5..fb37c76bf 100644 --- a/trace-utils/src/msgpack_decoder/v04/error.rs +++ b/trace-utils/src/msgpack_decoder/v04/error.rs @@ -1,12 +1,18 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +/// Represent error that can happen while decoding msgpack #[derive(Debug, PartialEq)] pub enum DecodeError { + /// Failed to convert a number to the expected type. InvalidConversion(String), + /// Payload does not match the expected type for a trace payload. InvalidType(String), + /// Payload is not a valid msgpack object. InvalidFormat(String), + /// Failed to read the buffer. IOError, + /// The payload contains non-utf8 strings. Utf8Error(String), } diff --git a/trace-utils/src/msgpack_decoder/v04/mod.rs b/trace-utils/src/msgpack_decoder/v04/mod.rs index 5b789e7c1..bf5710d74 100644 --- a/trace-utils/src/msgpack_decoder/v04/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/mod.rs @@ -1,6 +1,7 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +/// Decode a V04 encoded trace payload pub mod decoder; +/// Error type used for decoding pub mod error; -pub mod number;