diff --git a/data-pipeline/src/trace_exporter/mod.rs b/data-pipeline/src/trace_exporter/mod.rs index 78ed78456..470d920eb 100644 --- a/data-pipeline/src/trace_exporter/mod.rs +++ b/data-pipeline/src/trace_exporter/mod.rs @@ -588,7 +588,7 @@ impl TraceExporter { fn send_deser_ser(&self, data: tinybytes::Bytes) -> Result { // TODO base on input format - let (mut traces, size) = match msgpack_decoder::v04::decoder::from_slice(data) { + let (mut traces, size) = match msgpack_decoder::v04::decoder::from_bytes(data) { Ok(res) => res, Err(err) => { error!("Error deserializing trace from request body: {err}"); diff --git a/tinybytes/src/bytes_string.rs b/tinybytes/src/bytes_string.rs index 67b9ea9c3..629eedeed 100644 --- a/tinybytes/src/bytes_string.rs +++ b/tinybytes/src/bytes_string.rs @@ -82,6 +82,21 @@ impl BytesString { } } + /// Creates a `BytesString` from a string slice within the given buffer. + /// + /// # Arguments + /// + /// * `bytes` - A `tinybytes::Bytes` instance that will be converted into a `BytesString`. + /// * `slice` - The string slice pointing into the given bytes that will form the `BytesString`. + pub fn try_from_bytes_slice(bytes: &Bytes, slice: &str) -> Option { + // SAFETY: This is safe as a str slice is definitely a valid UTF-8 slice. + unsafe { + Some(Self::from_bytes_unchecked( + bytes.slice_ref(slice.as_bytes())?, + )) + } + } + /// Creates a `BytesString` from a `tinybytes::Bytes` instance without validating the bytes. /// /// This function does not perform any validation on the provided bytes, and assumes that the diff --git a/trace-utils/src/msgpack_decoder/mod.rs b/trace-utils/src/msgpack_decoder/mod.rs index f4e980a0a..f78316a76 100644 --- a/trace-utils/src/msgpack_decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/mod.rs @@ -1,4 +1,5 @@ // Copyright 2024-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +/// Decoding logic for V04 encoded trace payload pub mod v04; diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs index d5e01f6c9..38b4df27d 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/mod.rs @@ -1,24 +1,14 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +/// Read spans from msgpack mod span; -mod span_link; -use self::span::decode_span; use super::error::DecodeError; -use super::number::read_number_bytes; -use crate::span_v04::Span; -use rmp::decode::DecodeStringError; -use rmp::{decode, decode::RmpRead, Marker}; -use std::{collections::HashMap, f64}; -use tinybytes::{Bytes, BytesString}; +use crate::span_v04::{Span, SpanSlice}; +use span::decode_span; -// https://docs.rs/rmp/latest/rmp/enum.Marker.html#variant.Null (0xc0 == 192) -const NULL_MARKER: &u8 = &0xc0; - -/// Decodes a slice of bytes into a vector of `TracerPayloadV04` objects. -/// -/// +/// Decodes a Bytes buffer into a vector of `TracerPayloadV04` objects. 
/// /// # Arguments /// @@ -39,10 +29,10 @@ const NULL_MARKER: &u8 = &0xc0; /// # Examples /// /// ``` -/// use datadog_trace_protobuf::pb::Span; -/// use datadog_trace_utils::msgpack_decoder::v04::decoder::from_slice; -/// use rmp_serde::to_vec_named; -/// use tinybytes; +/// # use datadog_trace_protobuf::pb::Span; +/// # use datadog_trace_utils::msgpack_decoder::v04::decoder::from_bytes; +/// # use rmp_serde::to_vec_named; +/// # use tinybytes; /// /// let span = Span { /// name: "test-span".to_owned(), @@ -51,18 +41,73 @@ const NULL_MARKER: &u8 = &0xc0; /// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); /// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data); /// let (decoded_traces, _payload_size) = -/// from_slice(encoded_data_as_tinybytes).expect("Decoding failed"); +/// from_bytes(encoded_data_as_tinybytes).expect("Decoding failed"); /// /// assert_eq!(1, decoded_traces.len()); /// assert_eq!(1, decoded_traces[0].len()); /// let decoded_span = &decoded_traces[0][0]; /// assert_eq!("test-span", decoded_span.name.as_str()); /// ``` -pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, usize), DecodeError> { - let trace_count = - rmp::decode::read_array_len(unsafe { data.as_mut_slice() }).map_err(|_| { - DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) - })?; +pub fn from_bytes(data: tinybytes::Bytes) -> Result<(Vec>, usize), DecodeError> { + let mut parsed_data = data.clone(); + let (traces_ref, size) = from_slice(unsafe { parsed_data.as_mut_slice() })?; + let traces_owned = traces_ref + .iter() + .map(|trace| { + trace + .iter() + // Safe to unwrap since the spans use subslices of the `data` slice + .map(|span| span.try_to_bytes(&data).unwrap()) + .collect() + }) + .collect(); + Ok((traces_owned, size)) +} + +/// Decodes a slice of bytes into a `Vec<Vec<SpanSlice>>` object. +/// The resulting spans have the same lifetime as the initial buffer. 
+/// +/// # Arguments +/// +/// * `data` - A slice of bytes containing the encoded data. Bytes are expected to be +/// encoded msgpack data containing a list of a list of v04 spans. +/// +/// # Returns +/// +/// * `Ok(Vec<Vec<SpanSlice>>)` - The decoded `SpanSlice` objects if successful. +/// * `Err(DecodeError)` - An error if the decoding process fails. +/// +/// # Errors +/// +/// This function will return an error if: +/// - The array length for trace count or span count cannot be read. +/// - Any span cannot be decoded. +/// +/// # Examples +/// +/// ``` +/// # use datadog_trace_protobuf::pb::Span; +/// # use datadog_trace_utils::msgpack_decoder::v04::decoder::from_slice; +/// # use rmp_serde::to_vec_named; +/// # use tinybytes; +/// +/// let span = Span { +/// name: "test-span".to_owned(), +/// ..Default::default() +/// }; +/// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); +/// let (decoded_traces, _payload_size) = +/// from_slice(encoded_data.as_slice()).expect("Decoding failed"); +/// +/// assert_eq!(1, decoded_traces.len()); +/// assert_eq!(1, decoded_traces[0].len()); +/// let decoded_span = &decoded_traces[0][0]; +/// assert_eq!("test-span", decoded_span.name); +/// ``` +pub fn from_slice(mut data: &[u8]) -> Result<(Vec>, usize), DecodeError> { + let trace_count = rmp::decode::read_array_len(&mut data).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) + })?; let start_len = data.len(); @@ -74,12 +119,9 @@ pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, usize), .expect("Unable to cast trace_count to usize"), ), |mut traces, _| { - let span_count = rmp::decode::read_array_len(unsafe { data.as_mut_slice() }) - .map_err(|_| { - DecodeError::InvalidFormat( - "Unable to read array len for span count".to_owned(), - ) - })?; + let span_count = rmp::decode::read_array_len(&mut data).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for span count".to_owned()) + })?; let trace 
= (0..span_count).try_fold( Vec::with_capacity( @@ -103,200 +145,6 @@ pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, usize), )) } -#[inline] -fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { - decode::read_str_from_slice(buf).map_err(|e| match e { - DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()), - DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()), - DecodeStringError::TypeMismatch(marker) => { - DecodeError::InvalidType(format!("Type mismatch at marker {:?}", marker)) - } - DecodeStringError::InvalidUtf8(_, e) => DecodeError::Utf8Error(e.to_string()), - _ => DecodeError::IOError, - }) -} - -#[inline] -fn read_string_ref<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { - read_string_ref_nomut(buf).map(|(str, newbuf)| { - *buf = newbuf; - str - }) -} - -#[inline] -fn read_string_bytes(buf: &mut Bytes) -> Result { - // Note: we need to pass a &'static lifetime here, otherwise it'll complain - read_string_ref_nomut(unsafe { buf.as_mut_slice() }).map(|(str, newbuf)| { - let string = BytesString::from_bytes_slice(buf, str); - *unsafe { buf.as_mut_slice() } = newbuf; - string - }) -} - -#[inline] -fn read_nullable_string_bytes(buf: &mut Bytes) -> Result { - if let Some(empty_string) = handle_null_marker(buf, BytesString::default) { - Ok(empty_string) - } else { - read_string_bytes(buf) - } -} - -#[inline] -// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the -// BytesStrings. 
-fn read_str_map_to_bytes_strings( - buf: &mut Bytes, -) -> Result, DecodeError> { - let len = decode::read_map_len(unsafe { buf.as_mut_slice() }) - .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; - - let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); - for _ in 0..len { - let key = read_string_bytes(buf)?; - let value = read_string_bytes(buf)?; - map.insert(key, value); - } - Ok(map) -} - -#[inline] -fn read_nullable_str_map_to_bytes_strings( - buf: &mut Bytes, -) -> Result, DecodeError> { - if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { - return Ok(empty_map); - } - - read_str_map_to_bytes_strings(buf) -} - -#[inline] -fn read_metric_pair(buf: &mut Bytes) -> Result<(BytesString, f64), DecodeError> { - let key = read_string_bytes(buf)?; - let v = read_number_bytes(buf)?; - - Ok((key, v)) -} -#[inline] -fn read_metrics(buf: &mut Bytes) -> Result, DecodeError> { - if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { - return Ok(empty_map); - } - - let len = read_map_len(unsafe { buf.as_mut_slice() })?; - - read_map(len, buf, read_metric_pair) -} - -#[inline] -fn read_meta_struct(buf: &mut Bytes) -> Result>, DecodeError> { - if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { - return Ok(empty_map); - } - - fn read_meta_struct_pair(buf: &mut Bytes) -> Result<(BytesString, Vec), DecodeError> { - let key = read_string_bytes(buf)?; - let array_len = decode::read_array_len(unsafe { buf.as_mut_slice() }).map_err(|_| { - DecodeError::InvalidFormat("Unable to read array len for meta_struct".to_owned()) - })?; - - let mut v = Vec::with_capacity(array_len as usize); - - for _ in 0..array_len { - let value = read_number_bytes(buf)?; - v.push(value); - } - Ok((key, v)) - } - - let len = read_map_len(unsafe { buf.as_mut_slice() })?; - read_map(len, buf, read_meta_struct_pair) -} - -/// Reads a map from the buffer and returns it as a 
`HashMap`. -/// -/// This function is generic over the key and value types of the map, and it uses a provided -/// function to read key-value pairs from the buffer. -/// -/// # Arguments -/// -/// * `len` - The number of key-value pairs to read from the buffer. -/// * `buf` - A reference to the Bytes containing the encoded map data. -/// * `read_pair` - A function that reads a key-value pair from the buffer and returns it as a -/// `Result<(K, V), DecodeError>`. -/// -/// # Returns -/// -/// * `Ok(HashMap)` - A `HashMap` containing the decoded key-value pairs if successful. -/// * `Err(DecodeError)` - An error if the decoding process fails. -/// -/// # Errors -/// -/// This function will return an error if: -/// - The `read_pair` function returns an error while reading a key-value pair. -/// -/// # Type Parameters -/// -/// * `K` - The type of the keys in the map. Must implement `std::hash::Hash` and `Eq`. -/// * `V` - The type of the values in the map. -/// * `F` - The type of the function used to read key-value pairs from the buffer. -#[inline] -fn read_map( - len: usize, - buf: &mut Bytes, - read_pair: F, -) -> Result, DecodeError> -where - K: std::hash::Hash + Eq, - F: Fn(&mut Bytes) -> Result<(K, V), DecodeError>, -{ - let mut map = HashMap::with_capacity(len); - for _ in 0..len { - let (k, v) = read_pair(buf)?; - map.insert(k, v); - } - Ok(map) -} - -#[inline] -fn read_map_len(buf: &mut &[u8]) -> Result { - match decode::read_marker(buf) - .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))? 
- { - Marker::FixMap(len) => Ok(len as usize), - Marker::Map16 => buf - .read_data_u16() - .map_err(|_| DecodeError::IOError) - .map(|len| len as usize), - Marker::Map32 => buf - .read_data_u32() - .map_err(|_| DecodeError::IOError) - .map(|len| len as usize), - _ => Err(DecodeError::InvalidType( - "Unable to read map from buffer".to_owned(), - )), - } -} - -/// When you want to "peek" if the next value is a null marker, and only advance the buffer if it is -/// null and return the default value. If it is not null, you can continue to decode as expected. -#[inline] -fn handle_null_marker(buf: &mut Bytes, default: F) -> Option -where - F: FnOnce() -> T, -{ - let slice = unsafe { buf.as_mut_slice() }; - - if slice.first() == Some(NULL_MARKER) { - *slice = &slice[1..]; - Some(default()) - } else { - None - } -} - #[cfg(test)] mod tests { use super::*; @@ -305,6 +153,7 @@ mod tests { use rmp_serde; use rmp_serde::to_vec_named; use serde_json::json; + use std::collections::HashMap; use tinybytes::BytesString; fn generate_meta_struct_element(i: u8) -> (String, Vec) { @@ -322,13 +171,14 @@ mod tests { (key, rmp_serde::to_vec_named(&map).unwrap()) } + #[test] fn test_empty_array() { let encoded_data = vec![0x90]; let encoded_data = unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; let bytes = tinybytes::Bytes::from_static(encoded_data); - let (_decoded_traces, decoded_size) = from_slice(bytes).expect("Decoding failed"); + let (_decoded_traces, decoded_size) = from_bytes(bytes).expect("Decoding failed"); assert_eq!(0, decoded_size); } @@ -343,7 +193,7 @@ mod tests { let expected_size = encoded_data.len() - 1; // rmp_serde adds additional 0 byte encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (_decoded_traces, decoded_size) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(expected_size, 
decoded_size); } @@ -358,7 +208,7 @@ mod tests { let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -373,7 +223,7 @@ mod tests { let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -387,7 +237,7 @@ mod tests { let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -402,7 +252,7 @@ mod tests { let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -417,7 +267,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding 
failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -438,7 +288,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -462,7 +312,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -488,7 +338,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -509,7 +359,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -534,7 +384,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -556,7 +406,7 @@ 
mod tests { span["metrics"] = json!(expected_metrics.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -580,7 +430,7 @@ mod tests { span["metrics"] = json!(expected_metrics.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -600,7 +450,7 @@ mod tests { span["metrics"] = json!(null); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -627,7 +477,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -674,7 +524,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -696,7 +546,7 @@ mod tests { unsafe { std::mem::transmute::<&'_ 
[u8], &'static [u8]>(encoded_data.as_ref()) }; let bytes = tinybytes::Bytes::from_static(encoded_data); - let result = from_slice(bytes); + let result = from_bytes(bytes); assert_eq!( Err(DecodeError::InvalidFormat( "Expected at least bytes 1, but only got 0 (pos 0)".to_owned() @@ -719,7 +569,7 @@ mod tests { unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; let bytes = tinybytes::Bytes::from_static(encoded_data); - let result = from_slice(bytes); + let result = from_bytes(bytes); assert_eq!( Err(DecodeError::Utf8Error( "invalid utf-8 sequence of 1 bytes from index 1".to_owned() @@ -739,7 +589,7 @@ mod tests { unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; let bytes = tinybytes::Bytes::from_static(encoded_data); - let result = from_slice(bytes); + let result = from_bytes(bytes); assert_eq!( Err(DecodeError::InvalidFormat( @@ -761,7 +611,7 @@ mod tests { unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; let bytes = tinybytes::Bytes::from_static(encoded_data); - let result = from_slice(bytes); + let result = from_bytes(bytes); assert_eq!( Err(DecodeError::InvalidFormat( @@ -782,7 +632,7 @@ mod tests { unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; let bytes = tinybytes::Bytes::from_static(encoded_data); - let result = from_slice(bytes); + let result = from_bytes(bytes); assert_eq!( Err(DecodeError::InvalidType( @@ -794,7 +644,7 @@ mod tests { #[test] #[cfg_attr(miri, ignore)] - fn fuzz_from_slice() { + fn fuzz_from_bytes() { check!() .with_type::<( String, @@ -845,10 +695,11 @@ mod tests { start, ..Default::default() }; - let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); - let result = from_slice(tinybytes::Bytes::from(encoded_data)); + let encoded_data = to_vec_named(&vec![vec![span.clone()]]).unwrap(); + let result = from_bytes(tinybytes::Bytes::from(encoded_data)); assert!(result.is_ok()); + assert_eq!(result.unwrap().0, 
vec![vec![span]]) }, ); } diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span/map.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span/map.rs new file mode 100644 index 000000000..3a217dd1d --- /dev/null +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span/map.rs @@ -0,0 +1,152 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use super::is_null_marker; +use super::number::read_number; +use super::string::read_string; +use crate::msgpack_decoder::v04::error::DecodeError; +use rmp::{decode, decode::RmpRead, Marker}; +use std::collections::HashMap; + +/// Read a map of string to string from `buf`. +#[inline] +pub fn read_str_map_to_str<'a>( + buf: &mut &'a [u8], +) -> Result, DecodeError> { + let len = decode::read_map_len(buf) + .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; + + let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); + for _ in 0..len { + let key = read_string(buf)?; + let value = read_string(buf)?; + map.insert(key, value); + } + Ok(map) +} + +/// Read a nullable map of string to string from `buf`. +#[inline] +pub fn read_nullable_str_map_to_str<'a>( + buf: &mut &'a [u8], +) -> Result, DecodeError> { + if is_null_marker(buf) { + return Ok(HashMap::default()); + } + + read_str_map_to_str(buf) +} + +/// Read a map of string to f64 from `buf`. +#[inline] +pub fn read_metrics<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { + if is_null_marker(buf) { + return Ok(HashMap::default()); + } + + fn read_metric_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, f64), DecodeError> { + let key = read_string(buf)?; + let v = read_number(buf)?; + + Ok((key, v)) + } + + let len = read_map_len(buf)?; + + read_map(len, buf, read_metric_pair) +} + +/// Read a map of string to u8 array from `buf`. 
+/// +/// The struct can't be a u8 slice since it is encoded as a msgpack array and not as a raw bytes +/// buffer. +#[inline] +pub fn read_meta_struct<'a>(buf: &mut &'a [u8]) -> Result>, DecodeError> { + if is_null_marker(buf) { + return Ok(HashMap::default()); + } + + fn read_meta_struct_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, Vec), DecodeError> { + let key = read_string(buf)?; + let array_len = decode::read_array_len(buf).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for meta_struct".to_owned()) + })?; + + let mut v = Vec::with_capacity(array_len as usize); + + for _ in 0..array_len { + let value = read_number(buf)?; + v.push(value); + } + Ok((key, v)) + } + + let len = read_map_len(buf)?; + read_map(len, buf, read_meta_struct_pair) +} + +/// Reads a map from the buffer and returns it as a `HashMap`. +/// +/// This function is generic over the key and value types of the map, and it uses a provided +/// function to read key-value pairs from the buffer. +/// +/// # Arguments +/// +/// * `len` - The number of key-value pairs to read from the buffer. +/// * `buf` - A reference to the slice containing the encoded map data. +/// * `read_pair` - A function that reads a key-value pair from the buffer and returns it as a +/// `Result<(K, V), DecodeError>`. +/// +/// # Returns +/// +/// * `Ok(HashMap)` - A `HashMap` containing the decoded key-value pairs if successful. +/// * `Err(DecodeError)` - An error if the decoding process fails. +/// +/// # Errors +/// +/// This function will return an error if: +/// - The `read_pair` function returns an error while reading a key-value pair. +/// +/// # Type Parameters +/// +/// * `K` - The type of the keys in the map. Must implement `std::hash::Hash` and `Eq`. +/// * `V` - The type of the values in the map. +/// * `F` - The type of the function used to read key-value pairs from the buffer. 
+#[inline] +fn read_map<'a, K, V, F>( + len: usize, + buf: &mut &'a [u8], + read_pair: F, +) -> Result, DecodeError> +where + K: std::hash::Hash + Eq, + F: Fn(&mut &'a [u8]) -> Result<(K, V), DecodeError>, +{ + let mut map = HashMap::with_capacity(len); + for _ in 0..len { + let (k, v) = read_pair(buf)?; + map.insert(k, v); + } + Ok(map) +} + +/// Read the length of a msgpack map. +#[inline] +fn read_map_len(buf: &mut &[u8]) -> Result { + match decode::read_marker(buf) + .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))? + { + Marker::FixMap(len) => Ok(len as usize), + Marker::Map16 => buf + .read_data_u16() + .map_err(|_| DecodeError::IOError) + .map(|len| len as usize), + Marker::Map32 => buf + .read_data_u32() + .map_err(|_| DecodeError::IOError) + .map(|len| len as usize), + _ => Err(DecodeError::InvalidType( + "Unable to read map from buffer".to_owned(), + )), + } +} diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span/mod.rs similarity index 60% rename from trace-utils/src/msgpack_decoder/v04/decoder/span.rs rename to trace-utils/src/msgpack_decoder/v04/decoder/span/mod.rs index ccbb7d1e9..ee87795aa 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span/mod.rs @@ -1,16 +1,38 @@ // Copyright 2024-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use super::{ - read_meta_struct, read_metrics, read_nullable_str_map_to_bytes_strings, - read_nullable_string_bytes, read_string_ref, span_link::read_span_links, -}; +/// Read maps from msgpack +mod map; +/// Read numbers from msgpack +mod number; +/// Read span links from msgpack +mod span_link; +/// Read strings from msgpack +mod string; + use crate::msgpack_decoder::v04::error::DecodeError; -use crate::msgpack_decoder::v04::number::read_nullable_number_bytes; -use crate::span_v04::{Span, SpanKey}; -use tinybytes::Bytes; +use crate::span_v04::{SpanKey, SpanSlice}; +use map::{read_meta_struct, read_metrics, read_nullable_str_map_to_str}; +use number::read_nullable_number; +use span_link::read_span_links; +use string::{read_nullable_string, read_string}; + +// https://docs.rs/rmp/latest/rmp/enum.Marker.html#variant.Null (0xc0 == 192) +const NULL_MARKER: &u8 = &0xc0; + +/// When you want to "peek" if the next value is a null marker, and only advance the buffer if it is +/// null. If it is not null, you can continue to decode as expected. +#[inline] +fn is_null_marker(buf: &mut &[u8]) -> bool { + if buf.first() == Some(NULL_MARKER) { + *buf = &buf[1..]; + true + } else { + false + } +} -/// Decodes a slice of bytes into a `Span` object. +/// Decodes a slice of bytes into a `SpanSlice` object. /// /// # Arguments /// @@ -18,7 +40,7 @@ use tinybytes::Bytes; /// /// # Returns /// -/// * `Ok(Span)` - A decoded `Span` object if successful. +/// * `Ok(SpanSlice)` - A decoded `SpanSlice` object if successful. /// * `Err(DecodeError)` - An error if the decoding process fails. /// /// # Errors @@ -26,10 +48,10 @@ use tinybytes::Bytes; /// This function will return an error if: /// - The map length cannot be read. /// - Any key or value cannot be decoded. 
-pub fn decode_span(buffer: &mut Bytes) -> Result { - let mut span = Span::default(); +pub fn decode_span<'a>(buffer: &mut &'a [u8]) -> Result, DecodeError> { + let mut span = SpanSlice::default(); - let span_size = rmp::decode::read_map_len(unsafe { buffer.as_mut_slice() }).map_err(|_| { + let span_size = rmp::decode::read_map_len(buffer).map_err(|_| { DecodeError::InvalidFormat("Unable to get map len for span size".to_owned()) })?; @@ -40,25 +62,24 @@ pub fn decode_span(buffer: &mut Bytes) -> Result { Ok(span) } -// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the -// BytesStrings -fn fill_span(span: &mut Span, buf: &mut Bytes) -> Result<(), DecodeError> { - let key = read_string_ref(unsafe { buf.as_mut_slice() })? +/// Read the next entry from `buf` and update the corresponding field of `span`. +fn fill_span<'a>(span: &mut SpanSlice<'a>, buf: &mut &'a [u8]) -> Result<(), DecodeError> { + let key = read_string(buf)? .parse::() .map_err(|_| DecodeError::InvalidFormat("Invalid span key".to_owned()))?; match key { - SpanKey::Service => span.service = read_nullable_string_bytes(buf)?, - SpanKey::Name => span.name = read_nullable_string_bytes(buf)?, - SpanKey::Resource => span.resource = read_nullable_string_bytes(buf)?, - SpanKey::TraceId => span.trace_id = read_nullable_number_bytes(buf)?, - SpanKey::SpanId => span.span_id = read_nullable_number_bytes(buf)?, - SpanKey::ParentId => span.parent_id = read_nullable_number_bytes(buf)?, - SpanKey::Start => span.start = read_nullable_number_bytes(buf)?, - SpanKey::Duration => span.duration = read_nullable_number_bytes(buf)?, - SpanKey::Error => span.error = read_nullable_number_bytes(buf)?, - SpanKey::Type => span.r#type = read_nullable_string_bytes(buf)?, - SpanKey::Meta => span.meta = read_nullable_str_map_to_bytes_strings(buf)?, + SpanKey::Service => span.service = read_nullable_string(buf)?, + SpanKey::Name => span.name = read_nullable_string(buf)?, + SpanKey::Resource => span.resource = 
read_nullable_string(buf)?, + SpanKey::TraceId => span.trace_id = read_nullable_number(buf)?, + SpanKey::SpanId => span.span_id = read_nullable_number(buf)?, + SpanKey::ParentId => span.parent_id = read_nullable_number(buf)?, + SpanKey::Start => span.start = read_nullable_number(buf)?, + SpanKey::Duration => span.duration = read_nullable_number(buf)?, + SpanKey::Error => span.error = read_nullable_number(buf)?, + SpanKey::Type => span.r#type = read_nullable_string(buf)?, + SpanKey::Meta => span.meta = read_nullable_str_map_to_str(buf)?, SpanKey::Metrics => span.metrics = read_metrics(buf)?, SpanKey::MetaStruct => span.meta_struct = read_meta_struct(buf)?, SpanKey::SpanLinks => span.span_links = read_span_links(buf)?, diff --git a/trace-utils/src/msgpack_decoder/v04/number.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span/number.rs similarity index 95% rename from trace-utils/src/msgpack_decoder/v04/number.rs rename to trace-utils/src/msgpack_decoder/v04/decoder/span/number.rs index 219591481..fb72eca99 100644 --- a/trace-utils/src/msgpack_decoder/v04/number.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span/number.rs @@ -1,13 +1,12 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use super::error::DecodeError; +use crate::msgpack_decoder::v04::error::DecodeError; use rmp::{decode::RmpRead, Marker}; use std::fmt; -use tinybytes::Bytes; #[derive(Debug, PartialEq)] -pub enum Number { +pub(crate) enum Number { Unsigned(u64), Signed(i64), Float(f64), @@ -150,7 +149,7 @@ impl TryFrom for f64 { } } -fn read_number(buf: &mut &[u8], allow_null: bool) -> Result { +fn read_number_inner(buf: &mut &[u8], allow_null: bool) -> Result { match rmp::decode::read_marker(buf) .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for number".to_owned()))? 
{ @@ -197,16 +196,18 @@ fn read_number(buf: &mut &[u8], allow_null: bool) -> Result } } -pub fn read_number_bytes>( - buf: &mut Bytes, +/// Read a msgpack encoded number from `buf`. +pub fn read_number>( + buf: &mut &[u8], ) -> Result { - read_number(unsafe { buf.as_mut_slice() }, false)?.try_into() + read_number_inner(buf, false)?.try_into() } -pub fn read_nullable_number_bytes>( - buf: &mut Bytes, +/// Read a msgpack encoded number from `buf` and return 0 if null. +pub fn read_nullable_number>( + buf: &mut &[u8], ) -> Result { - read_number(unsafe { buf.as_mut_slice() }, true)?.try_into() + read_number_inner(buf, true)?.try_into() } #[cfg(test)] @@ -221,8 +222,7 @@ mod tests { let expected_value = 42; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: u8 = read_number_bytes(&mut bytes).unwrap(); + let result: u8 = read_number(&mut buf.as_slice()).unwrap(); assert_eq!(result, expected_value); } @@ -232,8 +232,7 @@ mod tests { let expected_value = 42; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: i8 = read_number_bytes(&mut bytes).unwrap(); + let result: i8 = read_number(&mut buf.as_slice()).unwrap(); assert_eq!(result, expected_value); } @@ -243,8 +242,7 @@ mod tests { let expected_value = 42.98; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: f64 = read_number_bytes(&mut bytes).unwrap(); + let result: f64 = read_number(&mut buf.as_slice()).unwrap(); assert_eq!(result, expected_value); } @@ -253,8 +251,7 @@ mod tests { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: Result = read_number_bytes(&mut bytes); + let result: Result = read_number(&mut 
buf.as_slice()); assert!(matches!(result, Err(DecodeError::InvalidType(_)))); assert_eq!( @@ -268,8 +265,7 @@ mod tests { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: u8 = read_nullable_number_bytes(&mut bytes).unwrap(); + let result: u8 = read_nullable_number(&mut buf.as_slice()).unwrap(); assert_eq!(result, 0); } @@ -278,8 +274,7 @@ mod tests { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: i8 = read_nullable_number_bytes(&mut bytes).unwrap(); + let result: i8 = read_nullable_number(&mut buf.as_slice()).unwrap(); assert_eq!(result, 0); } @@ -288,8 +283,7 @@ mod tests { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: f64 = read_nullable_number_bytes(&mut bytes).unwrap(); + let result: f64 = read_nullable_number(&mut buf.as_slice()).unwrap(); assert_eq!(result, 0.0); } diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span/span_link.rs similarity index 73% rename from trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs rename to trace-utils/src/msgpack_decoder/v04/decoder/span/span_link.rs index 2e9c4ec5f..9af3d4792 100644 --- a/trace-utils/src/msgpack_decoder/v04/decoder/span_link.rs +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span/span_link.rs @@ -1,15 +1,14 @@ // Copyright 2024-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::msgpack_decoder::v04::decoder::{ - handle_null_marker, read_str_map_to_bytes_strings, read_string_bytes, read_string_ref, -}; +use super::number::read_number; +use super::string::read_string; use crate::msgpack_decoder::v04::error::DecodeError; -use crate::msgpack_decoder::v04::number::read_number_bytes; -use crate::span_v04::SpanLink; +use crate::span_v04::SpanLinkSlice; use rmp::Marker; use std::str::FromStr; -use tinybytes::Bytes; + +use super::{is_null_marker, map::read_str_map_to_str}; /// Reads a slice of bytes and decodes it into a vector of `SpanLink` objects. /// @@ -28,16 +27,18 @@ use tinybytes::Bytes; /// - The marker for the array length cannot be read. /// - Any `SpanLink` cannot be decoded. /// ``` -pub(crate) fn read_span_links(buf: &mut Bytes) -> Result, DecodeError> { - if let Some(empty_vec) = handle_null_marker(buf, Vec::default) { - return Ok(empty_vec); +pub(crate) fn read_span_links<'a>( + buf: &mut &'a [u8], +) -> Result>, DecodeError> { + if is_null_marker(buf) { + return Ok(Vec::default()); } - match rmp::decode::read_marker(unsafe { buf.as_mut_slice() }).map_err(|_| { + match rmp::decode::read_marker(buf).map_err(|_| { DecodeError::InvalidFormat("Unable to read marker for span links".to_owned()) })? 
{ Marker::FixArray(len) => { - let mut vec: Vec = Vec::with_capacity(len.into()); + let mut vec: Vec> = Vec::with_capacity(len.into()); for _ in 0..len { vec.push(decode_span_link(buf)?); } @@ -48,6 +49,7 @@ pub(crate) fn read_span_links(buf: &mut Bytes) -> Result, DecodeEr )), } } + #[derive(Debug, PartialEq)] enum SpanLinkKey { TraceId, @@ -76,19 +78,19 @@ impl FromStr for SpanLinkKey { } } -fn decode_span_link(buf: &mut Bytes) -> Result { - let mut span = SpanLink::default(); - let span_size = rmp::decode::read_map_len(unsafe { buf.as_mut_slice() }) +fn decode_span_link<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { + let mut span = SpanLinkSlice::default(); + let span_size = rmp::decode::read_map_len(buf) .map_err(|_| DecodeError::InvalidType("Unable to get map len for span size".to_owned()))?; for _ in 0..span_size { - match read_string_ref(unsafe { buf.as_mut_slice() })?.parse::()? { - SpanLinkKey::TraceId => span.trace_id = read_number_bytes(buf)?, - SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number_bytes(buf)?, - SpanLinkKey::SpanId => span.span_id = read_number_bytes(buf)?, - SpanLinkKey::Attributes => span.attributes = read_str_map_to_bytes_strings(buf)?, - SpanLinkKey::Tracestate => span.tracestate = read_string_bytes(buf)?, - SpanLinkKey::Flags => span.flags = read_number_bytes(buf)?, + match read_string(buf)?.parse::()? 
{ + SpanLinkKey::TraceId => span.trace_id = read_number(buf)?, + SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number(buf)?, + SpanLinkKey::SpanId => span.span_id = read_number(buf)?, + SpanLinkKey::Attributes => span.attributes = read_str_map_to_str(buf)?, + SpanLinkKey::Tracestate => span.tracestate = read_string(buf)?, + SpanLinkKey::Flags => span.flags = read_number(buf)?, } } diff --git a/trace-utils/src/msgpack_decoder/v04/decoder/span/string.rs b/trace-utils/src/msgpack_decoder/v04/decoder/span/string.rs new file mode 100644 index 000000000..142206515 --- /dev/null +++ b/trace-utils/src/msgpack_decoder/v04/decoder/span/string.rs @@ -0,0 +1,44 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use super::is_null_marker; +use crate::msgpack_decoder::v04::error::DecodeError; +use rmp::decode::{self, DecodeStringError}; + +#[inline] +fn read_string_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { + decode::read_str_from_slice(buf).map_err(|e| match e { + DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()), + DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()), + DecodeStringError::TypeMismatch(marker) => { + DecodeError::InvalidType(format!("Type mismatch at marker {:?}", marker)) + } + DecodeStringError::InvalidUtf8(_, e) => DecodeError::Utf8Error(e.to_string()), + _ => DecodeError::IOError, + }) +} + +/// Read a string from `buf`. +/// +/// # Errors +/// Fails if the buffer doesn't contain a valid utf8 msgpack string. +#[inline] +pub fn read_string<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { + read_string_nomut(buf).map(|(str, newbuf)| { + *buf = newbuf; + str + }) +} + +/// Read a nullable string from `buf`. +/// +/// # Errors +/// Fails if the buffer doesn't contain a valid utf8 msgpack string or a null marker. 
+#[inline] +pub fn read_nullable_string<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { + if is_null_marker(buf) { + Ok("") + } else { + read_string(buf) + } +} diff --git a/trace-utils/src/msgpack_decoder/v04/error.rs b/trace-utils/src/msgpack_decoder/v04/error.rs index ff74819f5..fb37c76bf 100644 --- a/trace-utils/src/msgpack_decoder/v04/error.rs +++ b/trace-utils/src/msgpack_decoder/v04/error.rs @@ -1,12 +1,18 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +/// Represent error that can happen while decoding msgpack #[derive(Debug, PartialEq)] pub enum DecodeError { + /// Failed to convert a number to the expected type. InvalidConversion(String), + /// Payload does not match the expected type for a trace payload. InvalidType(String), + /// Payload is not a valid msgpack object. InvalidFormat(String), + /// Failed to read the buffer. IOError, + /// The payload contains non-utf8 strings. Utf8Error(String), } diff --git a/trace-utils/src/msgpack_decoder/v04/mod.rs b/trace-utils/src/msgpack_decoder/v04/mod.rs index 5b789e7c1..bf5710d74 100644 --- a/trace-utils/src/msgpack_decoder/v04/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/mod.rs @@ -1,6 +1,7 @@ // Copyright 2024-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +/// Decode a V04 encoded trace payload pub mod decoder; +/// Error type used for decoding pub mod error; -pub mod number; diff --git a/trace-utils/src/span_v04/mod.rs b/trace-utils/src/span_v04/mod.rs index cf8d34ee0..0265295ff 100644 --- a/trace-utils/src/span_v04/mod.rs +++ b/trace-utils/src/span_v04/mod.rs @@ -5,4 +5,4 @@ mod span; pub mod trace_utils; -pub use span::{Span, SpanKey, SpanKeyParseError, SpanLink}; +pub use span::{Span, SpanKey, SpanKeyParseError, SpanLink, SpanLinkSlice, SpanSlice}; diff --git a/trace-utils/src/span_v04/span.rs b/trace-utils/src/span_v04/span.rs index 83c220915..6ce464d0a 100644 --- a/trace-utils/src/span_v04/span.rs +++ b/trace-utils/src/span_v04/span.rs @@ -5,7 +5,7 @@ use serde::Serialize; use std::collections::HashMap; use std::fmt; use std::str::FromStr; -use tinybytes::BytesString; +use tinybytes::{Bytes, BytesString}; #[derive(Debug, PartialEq)] pub enum SpanKey { @@ -82,6 +82,103 @@ pub struct SpanLink { pub flags: u64, } +#[derive(Clone, Debug, Default, PartialEq, Serialize)] +pub struct SpanSlice<'a> { + pub service: &'a str, + pub name: &'a str, + pub resource: &'a str, + pub r#type: &'a str, + pub trace_id: u64, + pub span_id: u64, + pub parent_id: u64, + pub start: i64, + pub duration: i64, + #[serde(skip_serializing_if = "is_default")] + pub error: i32, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub meta: HashMap<&'a str, &'a str>, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub metrics: HashMap<&'a str, f64>, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub meta_struct: HashMap<&'a str, Vec>, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub span_links: Vec>, +} + +#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize)] +pub struct SpanLinkSlice<'a> { + pub trace_id: u64, + pub trace_id_high: u64, + pub span_id: u64, + pub attributes: HashMap<&'a str, &'a str>, + pub tracestate: &'a str, + pub flags: u64, 
+} + +impl SpanSlice<'_> { + pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { + Some(Span { + service: BytesString::try_from_bytes_slice(bytes, self.service)?, + name: BytesString::try_from_bytes_slice(bytes, self.name)?, + resource: BytesString::try_from_bytes_slice(bytes, self.resource)?, + r#type: BytesString::try_from_bytes_slice(bytes, self.r#type)?, + trace_id: self.trace_id, + span_id: self.span_id, + parent_id: self.parent_id, + start: self.start, + duration: self.duration, + error: self.error, + meta: self + .meta + .iter() + .map(|(k, v)| { + Some(( + BytesString::try_from_bytes_slice(bytes, k)?, + BytesString::try_from_bytes_slice(bytes, v)?, + )) + }) + .collect::>>()?, + metrics: self + .metrics + .iter() + .map(|(k, v)| Some((BytesString::try_from_bytes_slice(bytes, k)?, *v))) + .collect::>>()?, + meta_struct: self + .meta_struct + .iter() + .map(|(k, v)| Some((BytesString::try_from_bytes_slice(bytes, k)?, v.clone()))) + .collect::>>>()?, + span_links: self + .span_links + .iter() + .map(|link| link.try_to_bytes(bytes)) + .collect::>>()?, + }) + } +} + +impl SpanLinkSlice<'_> { + pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { + Some(SpanLink { + trace_id: self.trace_id, + trace_id_high: self.trace_id_high, + span_id: self.span_id, + attributes: self + .attributes + .iter() + .map(|(k, v)| { + Some(( + BytesString::try_from_bytes_slice(bytes, k)?, + BytesString::try_from_bytes_slice(bytes, v)?, + )) + }) + .collect::>>()?, + tracestate: BytesString::try_from_bytes_slice(bytes, self.tracestate)?, + flags: self.flags, + }) + } +} + #[derive(Debug)] pub struct SpanKeyParseError { pub message: String, diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index 2623a5755..2da6b67b2 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -260,7 +260,7 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto fn try_into(self) -> Result { match self.encoding_type { TraceEncoding::V04 => { - let 
(traces, size) = match msgpack_decoder::v04::decoder::from_slice(self.data) { + let (traces, size) = match msgpack_decoder::v04::decoder::from_bytes(self.data) { Ok(res) => res, Err(e) => { anyhow::bail!("Error deserializing trace from request body: {e}")