Skip to content

fix(trace-utils): v05 events and span links serialization #980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 199 additions & 14 deletions trace-utils/src/span/v05/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
pub mod dict;

use crate::span::v05::dict::SharedDict;
use crate::span::SpanBytes;
use crate::span::{AttributeArrayValue, SpanBytes, SpanEventBytes};
use anyhow::Result;
use serde::Serialize;
use std::collections::HashMap;
use tinybytes::BytesString;

use super::{AttributeAnyValueBytes, AttributeArrayValueBytes};

/// Structure that represent a TraceChunk Span which String fields are interned in a shared
/// dictionary. The number of elements is fixed by the spec and they all need to be serialized, in
Expand All @@ -29,24 +32,146 @@ pub struct Span {
pub r#type: u32,
}

///This structure is a wrapper around a slice of span events
///
/// It is meant to overrdide the default serialization, so we can serialize attributes
/// differently from the original impl.
/// Span events are serialized to JSON and added to "meta" when serializing to v0.5
///
/// The main difference with messagepacck serialization is that attributes with any types
/// are supposed to be mapped to their natural JSON representation.
///
/// Sadly, I haven't found a good way of overriding the default Serialize behavior, other
/// than just doing it for the whole data structures that embed it.
Comment on lines +35 to +45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
///This structure is a wrapper around a slice of span events
///
/// It is meant to overrdide the default serialization, so we can serialize attributes
/// differently from the original impl.
/// Span events are serialized to JSON and added to "meta" when serializing to v0.5
///
/// The main difference with messagepacck serialization is that attributes with any types
/// are supposed to be mapped to their natural JSON representation.
///
/// Sadly, I haven't found a good way of overriding the default Serialize behavior, other
/// than just doing it for the whole data structures that embed it.
/// This structure is a wrapper around a slice of span events
///
/// It is meant to override the default serialization, so we can serialize attributes
/// differently from the original impl.
/// Span events are serialized to JSON and added to "meta" when serializing to v0.5
///
/// The main difference with messagepack serialization is that attributes with any types
/// are supposed to be mapped to their natural JSON representation.

you can skip the justification and personalization.

struct SpanEventsSerializerV05<'a>(&'a [SpanEventBytes]);
struct SpanEventSerializerV05<'a>(&'a SpanEventBytes);
struct SpanEventAttributesSerializerV05<'a>(&'a HashMap<BytesString, AttributeAnyValueBytes>);
struct AttributeAnyValueV05<'a>(&'a AttributeAnyValueBytes);

impl serde::Serialize for SpanEventsSerializerV05<'_> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeSeq;
let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
for span_event in self.0 {
seq.serialize_element(&SpanEventSerializerV05(span_event))?;
}
seq.end()
}
}

impl serde::Serialize for SpanEventSerializerV05<'_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
use serde::ser::SerializeStruct;
let mut state = serializer.serialize_struct("SpanEventV05", 3)?;
state.serialize_field("time_unix_nano", &self.0.time_unix_nano)?;
state.serialize_field("name", &self.0.name)?;
state.serialize_field(
"attributes",
&SpanEventAttributesSerializerV05(&self.0.attributes),
)?;
state.end()
}
}

impl serde::Serialize for SpanEventAttributesSerializerV05<'_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
use serde::ser::SerializeMap;
let mut map = serializer.serialize_map(Some(self.0.len()))?;
for (key, value) in self.0 {
map.serialize_entry(key, &AttributeAnyValueV05(value))?;
}
map.end()
}
}

impl serde::Serialize for AttributeAnyValueV05<'_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
use serde::ser::SerializeSeq;
match self.0 {
AttributeAnyValueBytes::SingleValue(v) => {
AttributeArrayValueV05::from_inner(v).serialize(serializer)
}
super::AttributeAnyValue::Array(attribute_array_values) => {
let mut seq = serializer.serialize_seq(Some(attribute_array_values.len()))?;
for value in attribute_array_values {
seq.serialize_element(&AttributeArrayValueV05::from_inner(value))?;
}
seq.end()
}
}
}
}

#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum AttributeArrayValueV05<'a> {
String(&'a BytesString),
Boolean(bool),
Integer(i64),
Double(f64),
}

impl<'a> AttributeArrayValueV05<'a> {
fn from_inner(v: &'a AttributeArrayValueBytes) -> Self {
use AttributeArrayValue::*;
match v {
String(string) => AttributeArrayValueV05::String(string),
Boolean(boolean) => AttributeArrayValueV05::Boolean(*boolean),
Integer(integer) => AttributeArrayValueV05::Integer(*integer),
Double(double) => AttributeArrayValueV05::Double(*double),
}
}
}

pub fn from_span_bytes(span: &SpanBytes, dict: &mut SharedDict) -> Result<Span> {
let service = dict.get_or_insert(&span.service)?;
let name = dict.get_or_insert(&span.name)?;
let resource = dict.get_or_insert(&span.resource)?;
let mut meta = span.meta.iter().try_fold(
HashMap::with_capacity(span.meta.len()),
|mut meta, (k, v)| -> anyhow::Result<HashMap<u32, u32>> {
meta.insert(dict.get_or_insert(k)?, dict.get_or_insert(v)?);
Ok(meta)
},
)?;
if !span.span_links.is_empty() {
let serialized_span_links = serde_json::to_string(&span.span_links)?;
Comment on lines +149 to +150
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should check here for the 25kb limit on the tag value, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, anything beyond 25KB limits is dropped. the implementation should try to stuff as many as links possible under the limit and drop the others.

There used to be dropped_attributes_count to count the dropped attributes in case of oversized span links which i didn't see our implementation.

meta.insert(
dict.get_or_insert(&tinybytes::BytesString::from("span_links"))?,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the RFC there is some discussion about "span_links" or "_dd.span_links". Did they finally choose the former?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's a mistake the key should definitely be "_dd.span_links" from other implementations

dict.get_or_insert(&tinybytes::BytesString::from(serialized_span_links))?,
);
}
if !span.span_events.is_empty() {
let serialized_span_events =
serde_json::to_string(&SpanEventsSerializerV05(&span.span_events))?;
meta.insert(
dict.get_or_insert(&tinybytes::BytesString::from("events"))?,
dict.get_or_insert(&tinybytes::BytesString::from(serialized_span_events))?,
);
}
Ok(Span {
service: dict.get_or_insert(&span.service)?,
name: dict.get_or_insert(&span.name)?,
resource: dict.get_or_insert(&span.resource)?,
service,
name,
resource,
trace_id: span.trace_id,
span_id: span.span_id,
parent_id: span.parent_id,
start: span.start,
duration: span.duration,
error: span.error,
meta: span.meta.iter().try_fold(
HashMap::with_capacity(span.meta.len()),
|mut meta, (k, v)| -> anyhow::Result<HashMap<u32, u32>> {
meta.insert(dict.get_or_insert(k)?, dict.get_or_insert(v)?);
Ok(meta)
},
)?,
meta,
metrics: span.metrics.iter().try_fold(
HashMap::with_capacity(span.metrics.len()),
|mut metrics, (k, v)| -> anyhow::Result<HashMap<u32, f64>> {
Expand All @@ -60,6 +185,8 @@ pub fn from_span_bytes(span: &SpanBytes, dict: &mut SharedDict) -> Result<Span>

#[cfg(test)]
mod tests {
use crate::span::SpanLinkBytes;

use super::*;
use tinybytes::BytesString;

Expand All @@ -82,8 +209,47 @@ mod tests {
)]),
metrics: HashMap::from([(BytesString::from("metrics_field"), 1.1)]),
meta_struct: HashMap::new(),
span_links: vec![],
span_events: vec![],
span_links: vec![SpanLinkBytes {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/suggestion

  • would be nice to have test case for multiple span links
  • a complex attributes (remember we do some flattening for array and maps), I'm not sure at this point we have them already or we have to do ourselves
    eg
{"key": [[1,2], ["3", "4"]]} to {"key.0.0": "1", "key.0.1": 2, "key.1.0": 3, "key.1.1":4}

trace_id: 12345,
trace_id_high: 67890,
span_id: 54321,
attributes: HashMap::from([(BytesString::from("key"), BytesString::from("val"))]),
tracestate: BytesString::from("tracestate_value"),
flags: 1,
}],
span_events: vec![
SpanEventBytes {
time_unix_nano: 123,
name: BytesString::from("ev1"),
attributes: HashMap::from([(
BytesString::from("str_attr"),
AttributeAnyValueBytes::SingleValue(AttributeArrayValueBytes::String(
BytesString::from("val"),
)),
)]),
},
SpanEventBytes {
time_unix_nano: 456,
name: BytesString::from("ev2"),
attributes: HashMap::from([(
BytesString::from("bool_attr"),
AttributeAnyValueBytes::SingleValue(AttributeArrayValueBytes::Boolean(
true,
)),
)]),
},
SpanEventBytes {
time_unix_nano: 789,
name: BytesString::from("ev3"),
attributes: HashMap::from([(
BytesString::from("list_attr"),
AttributeAnyValueBytes::Array(vec![
AttributeArrayValueBytes::String(BytesString::from("val1")),
AttributeArrayValueBytes::String(BytesString::from("val2")),
]),
)]),
},
],
};

let mut dict = SharedDict::default();
Expand All @@ -109,7 +275,7 @@ mod tests {
assert_eq!(v05_span.start, 1);
assert_eq!(v05_span.duration, 111);
assert_eq!(v05_span.error, 0);
assert_eq!(v05_span.meta.len(), 1);
assert_eq!(v05_span.meta.len(), 3);
assert_eq!(v05_span.metrics.len(), 1);

assert_eq!(
Expand All @@ -126,5 +292,24 @@ mod tests {
.unwrap(),
1.1
);
let mut meta = Vec::new();
for (key, value) in &v05_span.meta {
meta.push((dict[*key as usize].as_str(), dict[*value as usize].as_str()));
}
meta.sort();
assert_eq!(meta, &[
(
"events",
"[{\"time_unix_nano\":123,\"name\":\"ev1\",\"attributes\":{\"str_attr\":\"val\"}},{\"time_unix_nano\":456,\"name\":\"ev2\",\"attributes\":{\"bool_attr\":true}},{\"time_unix_nano\":789,\"name\":\"ev3\",\"attributes\":{\"list_attr\":[\"val1\",\"val2\"]}}]",
),
(
"meta_field",
"meta_value",
),
(
"span_links",
"[{\"trace_id\":12345,\"trace_id_high\":67890,\"span_id\":54321,\"attributes\":{\"key\":\"val\"},\"tracestate\":\"tracestate_value\",\"flags\":1}]",
),
]);
}
}
Loading