From 83cbc22449c824411ab63327238350a313fe623a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Ml=C3=A1dek?= Date: Fri, 16 Feb 2024 00:31:48 +0100 Subject: [PATCH 1/2] enhancement: honor span limits when collecting events --- src/layer.rs | 197 +++++++++++++++++++++++++++++++++++--------------- src/tracer.rs | 10 +++ 2 files changed, 148 insertions(+), 59 deletions(-) diff --git a/src/layer.rs b/src/layer.rs index dc3203a..8d63fb7 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -4,6 +4,7 @@ use opentelemetry::{ trace::{self as otel, noop, SpanBuilder, SpanKind, Status, TraceContextExt}, Context as OtelContext, Key, KeyValue, StringValue, Value, }; +use opentelemetry_sdk::trace::SpanLimits; use std::fmt; use std::marker; use std::thread; @@ -126,7 +127,7 @@ struct SpanBuilderUpdates { } impl SpanBuilderUpdates { - fn update(self, span_builder: &mut SpanBuilder) { + fn update(self, span_builder: &mut SpanBuilder, limits: Option) { let Self { name, span_kind, @@ -143,20 +144,46 @@ impl SpanBuilderUpdates { if let Some(status) = status { span_builder.status = status; } - if let Some(attributes) = attributes { + if let Some(mut attributes) = attributes { if let Some(builder_attributes) = &mut span_builder.attributes { + if let Some(limits) = limits { + attributes.truncate( + (limits.max_attributes_per_span as usize) + .saturating_sub(builder_attributes.len()), + ); + } builder_attributes.extend(attributes); } else { + if let Some(limits) = limits { + attributes.truncate(limits.max_attributes_per_span as usize); + } span_builder.attributes = Some(attributes); } } } } +fn push_unless_limit_reached(attributes_or_events: &mut Vec, limit: Option, value: T) { + if limit.map_or(true, |limit| (limit as usize) > attributes_or_events.len()) { + attributes_or_events.push(value); + } +} + struct SpanEventVisitor<'a, 'b> { event_builder: &'a mut otel::Event, span_builder_updates: &'b mut Option, sem_conv_config: SemConvConfig, + limits: Option, +} + +impl<'a, 'b> SpanEventVisitor<'a, 'b> { + fn push_event_attribute(&mut self, attribute: KeyValue) { + push_unless_limit_reached( + &mut self.event_builder.attributes, + self.limits.map(|limits| limits.max_attributes_per_event), + attribute, + ); + } } impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { @@ -170,9 +197,7 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { #[cfg(feature = "tracing-log")] name if name.starts_with("log.") => (), name => { - self.event_builder - .attributes - .push(KeyValue::new(name, value)); + self.push_event_attribute(KeyValue::new(name, value)); } } } @@ -187,9 +212,7 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { #[cfg(feature = "tracing-log")] name if name.starts_with("log.") => (), name => { - self.event_builder - .attributes - .push(KeyValue::new(name, value)); + self.push_event_attribute(KeyValue::new(name, value)); } } } @@ -204,9 +227,7 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { #[cfg(feature = "tracing-log")] name if name.starts_with("log.") => (), name => { - self.event_builder - .attributes - .push(KeyValue::new(name, value)); + self.push_event_attribute(KeyValue::new(name, value)); } } } @@ -229,23 +250,19 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { } if self.sem_conv_config.error_events_to_exceptions { self.event_builder.name = EVENT_EXCEPTION_NAME.into(); - self.event_builder.attributes.push(KeyValue::new( + self.push_event_attribute(KeyValue::new( FIELD_EXCEPTION_MESSAGE, format!("{:?}", value), )); } else { - self.event_builder - .attributes - .push(KeyValue::new("error", format!("{:?}", value))); + self.push_event_attribute(KeyValue::new("error", format!("{:?}", value))); } } // Skip fields that are actually log metadata that have already been handled #[cfg(feature = "tracing-log")] name if name.starts_with("log.") => (), name => { - self.event_builder - .attributes - .push(KeyValue::new(name, value.to_string())); + self.push_event_attribute(KeyValue::new(name, value.to_string())); } } } @@ -269,23 +286,19 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { } if self.sem_conv_config.error_events_to_exceptions { self.event_builder.name = EVENT_EXCEPTION_NAME.into(); - self.event_builder.attributes.push(KeyValue::new( + self.push_event_attribute(KeyValue::new( FIELD_EXCEPTION_MESSAGE, format!("{:?}", value), )); } else { - self.event_builder - .attributes - .push(KeyValue::new("error", format!("{:?}", value))); + self.push_event_attribute(KeyValue::new("error", format!("{:?}", value))); } } // Skip fields that are actually log metadata that have already been handled #[cfg(feature = "tracing-log")] name if name.starts_with("log.") => (), name => { - self.event_builder - .attributes - .push(KeyValue::new(name, format!("{:?}", value))); + self.push_event_attribute(KeyValue::new(name, format!("{:?}", value))); } } } @@ -310,9 +323,7 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { let error_msg = value.to_string(); if self.sem_conv_config.error_fields_to_exceptions { - self.event_builder - .attributes - .push(Key::new(FIELD_EXCEPTION_MESSAGE).string(error_msg.clone())); + self.push_event_attribute(Key::new(FIELD_EXCEPTION_MESSAGE).string(error_msg.clone())); // NOTE: This is actually not the stacktrace of the exception. This is // the "source chain". It represents the heirarchy of errors from the @@ -320,9 +331,7 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { // of the callsites in the code that led to the error happening. // `std::error::Error::backtrace` is a nightly-only API and cannot be // used here until the feature is stabilized. - self.event_builder - .attributes - .push(Key::new(FIELD_EXCEPTION_STACKTRACE).array(chain.clone())); + self.push_event_attribute(Key::new(FIELD_EXCEPTION_STACKTRACE).array(chain.clone())); } if self.sem_conv_config.error_records_to_exceptions { @@ -349,12 +358,8 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { )); } - self.event_builder - .attributes - .push(Key::new(field.name()).string(error_msg)); - self.event_builder - .attributes - .push(Key::new(format!("{}.chain", field.name())).array(chain)); + self.push_event_attribute(Key::new(field.name()).string(error_msg)); + self.push_event_attribute(Key::new(format!("{}.chain", field.name())).array(chain)); } } @@ -913,34 +918,62 @@ where builder.trace_id = Some(self.tracer.new_trace_id()); } + let limits = self.tracer.span_limits(); + let attribute_limit = limits.map(|limits| limits.max_attributes_per_span); let builder_attrs = builder.attributes.get_or_insert(Vec::with_capacity( - attrs.fields().len() + self.extra_span_attrs(), + (attrs.fields().len() + self.extra_span_attrs()).min( + attribute_limit + .map(|limit| limit as usize) + .unwrap_or(usize::MAX), + ), )); if self.location { let meta = attrs.metadata(); if let Some(filename) = meta.file() { - builder_attrs.push(KeyValue::new("code.filepath", filename)); + push_unless_limit_reached( + builder_attrs, + attribute_limit, + KeyValue::new("code.filepath", filename), + ); } if let Some(module) = meta.module_path() { - builder_attrs.push(KeyValue::new("code.namespace", module)); + push_unless_limit_reached( + builder_attrs, + attribute_limit, + KeyValue::new("code.namespace", module), + ); } if let Some(line) = meta.line() { - builder_attrs.push(KeyValue::new("code.lineno", line as i64)); + push_unless_limit_reached( + builder_attrs, + attribute_limit, + KeyValue::new("code.lineno", line as i64), + ); } } if self.with_threads { - THREAD_ID.with(|id| builder_attrs.push(KeyValue::new("thread.id", **id as i64))); + THREAD_ID.with(|id| { + push_unless_limit_reached( + builder_attrs, + attribute_limit, + KeyValue::new("thread.id", **id as i64), + ) + }); if let Some(name) = std::thread::current().name() { // TODO(eliza): it's a bummer that we have to allocate here, but // we can't easily get the string as a `static`. it would be // nice if `opentelemetry` could also take `Arc`s as // `String` values... - builder_attrs.push(KeyValue::new("thread.name", name.to_string())); + push_unless_limit_reached( + builder_attrs, + attribute_limit, + KeyValue::new("thread.name", name.to_string()), + ); } } @@ -950,7 +983,7 @@ where sem_conv_config: self.sem_conv_config, }); - updates.update(&mut builder); + updates.update(&mut builder, limits); extensions.insert(OtelData { builder, parent_cx }); } @@ -996,7 +1029,7 @@ where }); let mut extensions = span.extensions_mut(); if let Some(data) = extensions.get_mut::() { - updates.update(&mut data.builder); + updates.update(&mut data.builder, self.tracer.span_limits()); } } @@ -1024,8 +1057,18 @@ where .clone(); let follows_link = otel::Link::new(follows_context, Vec::new()); if let Some(ref mut links) = data.builder.links { - links.push(follows_link); - } else { + push_unless_limit_reached( + links, + self.tracer + .span_limits() + .map(|limits| limits.max_links_per_span), + follows_link, + ); + } else if self + .tracer + .span_limits() + .map_or(true, |limits| limits.max_links_per_span > 0) + { data.builder.links = Some(vec![follows_link]); } } @@ -1068,6 +1111,8 @@ where #[cfg(not(feature = "tracing-log"))] let target = target.string(meta.target()); + let limits = self.tracer.span_limits(); + let mut otel_event = otel::Event::new( String::new(), crate::time::now(), @@ -1080,6 +1125,7 @@ where event_builder: &mut otel_event, span_builder_updates: &mut builder_updates, sem_conv_config: self.sem_conv_config, + limits, }); let mut extensions = span.extensions_mut(); @@ -1095,7 +1141,21 @@ where } if let Some(builder_updates) = builder_updates { - builder_updates.update(builder); + builder_updates.update(builder, limits); + } + + if builder + .events + .as_ref() + .map(|events| events.len()) + .zip(limits) + .map_or(false, |(current_length, limits)| { + current_length >= limits.max_events_per_span as usize + }) + { + // We have reached the configured limit for events so there is no point in storing any more. + // This is however the earliest we can abort this as the event can change e.g. the span's status. + return; } if self.location { @@ -1112,20 +1172,30 @@ where ), }; + let event_attributes_limit = + limits.map(|limits| limits.max_attributes_per_event); + let event_attributes = &mut otel_event.attributes; + if let Some(file) = file { - otel_event - .attributes - .push(KeyValue::new("code.filepath", file)); + push_unless_limit_reached( + event_attributes, + event_attributes_limit, + KeyValue::new("code.filepath", file), + ); } if let Some(module) = module { - otel_event - .attributes - .push(KeyValue::new("code.namespace", module)); + push_unless_limit_reached( + event_attributes, + event_attributes_limit, + KeyValue::new("code.namespace", module), + ); } if let Some(line) = meta.line() { - otel_event - .attributes - .push(KeyValue::new("code.lineno", line as i64)); + push_unless_limit_reached( + event_attributes, + event_attributes_limit, + KeyValue::new("code.lineno", line as i64), + ); } } @@ -1159,8 +1229,17 @@ where let attributes = builder .attributes .get_or_insert_with(|| Vec::with_capacity(2)); - attributes.push(KeyValue::new(busy_ns, timings.busy)); - attributes.push(KeyValue::new(idle_ns, timings.idle)); + let limits = self.tracer.span_limits(); + push_unless_limit_reached( + attributes, + limits.map(|limits| limits.max_attributes_per_span), + KeyValue::new(busy_ns, timings.busy), + ); + push_unless_limit_reached( + attributes, + limits.map(|limits| limits.max_attributes_per_span), + KeyValue::new(idle_ns, timings.idle), + ); } } diff --git a/src/tracer.rs b/src/tracer.rs index c37966b..b45ec58 100644 --- a/src/tracer.rs +++ b/src/tracer.rs @@ -47,6 +47,11 @@ pub trait PreSampledTracer { /// Generate a new span id. fn new_span_id(&self) -> otel::SpanId; + + /// Gets the current span limits. + fn span_limits(&self) -> Option { + None + } } impl PreSampledTracer for noop::NoopTracer { @@ -111,6 +116,11 @@ impl PreSampledTracer for SdkTracer { .map(|provider| provider.config().id_generator.new_span_id()) .unwrap_or(otel::SpanId::INVALID) } + + fn span_limits(&self) -> Option { + self.provider() + .map(|provider| provider.config().span_limits) + } } fn current_trace_state( From 4bba0f4303476466490e8602ef43ac7794a5be37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Ml=C3=A1dek?= Date: Thu, 29 Feb 2024 22:16:08 +0100 Subject: [PATCH 2/2] chore: fix imports in tests --- src/layer.rs | 8 +------- src/tracer.rs | 2 +- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/src/layer.rs b/src/layer.rs index 8d63fb7..9caa337 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -1291,18 +1291,12 @@ fn thread_id_integer(id: thread::ThreadId) -> u64 { #[cfg(test)] mod tests { use super::*; - use crate::OtelData; - use opentelemetry::{ - trace::{noop, TraceFlags}, - StringValue, - }; + use opentelemetry::trace::TraceFlags; use std::{ - borrow::Cow, collections::HashMap, error::Error, fmt::Display, sync::{Arc, Mutex}, - thread, time::SystemTime, }; use tracing_subscriber::prelude::*; diff --git a/src/tracer.rs b/src/tracer.rs index b45ec58..bff7d70 100644 --- a/src/tracer.rs +++ b/src/tracer.rs @@ -168,7 +168,7 @@ fn process_sampling_result( mod tests { use super::*; use crate::OtelData; - use opentelemetry::trace::{SpanBuilder, SpanId, TracerProvider as _}; + use opentelemetry::trace::TracerProvider as _; use opentelemetry_sdk::trace::{config, Sampler, TracerProvider}; #[test]