From 559fe6421c8b67daf798f95632868cc6a03fd7d9 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 15 Nov 2024 23:46:07 +0000 Subject: [PATCH] Update user_events metrics exporter to otel 0.27, add internal logs (#129) --- .../CHANGELOG.md | 6 +- opentelemetry-user-events-metrics/Cargo.toml | 6 +- .../examples/basic-metrics.rs | 28 +-- .../src/exporter/mod.rs | 173 ++++++++++++------ .../src/tracepoint/mod.rs | 25 ++- 5 files changed, 153 insertions(+), 85 deletions(-) diff --git a/opentelemetry-user-events-metrics/CHANGELOG.md b/opentelemetry-user-events-metrics/CHANGELOG.md index fa2303ed..76bb6655 100644 --- a/opentelemetry-user-events-metrics/CHANGELOG.md +++ b/opentelemetry-user-events-metrics/CHANGELOG.md @@ -1,9 +1,13 @@ # Changelog -## vNext +## v0.8.0 ### Changed +- Bump opentelemetry and opentelemetry_sdk versions to 0.27 +- Bump opentelemetry-proto version to 0.27 +- Uses internal logging from `opentelemetry` crate, which routes internal logs + via `tracing` - Add support for skipping the metric data point if the size exceeds 65360 bytes. ## v0.7.0 diff --git a/opentelemetry-user-events-metrics/Cargo.toml b/opentelemetry-user-events-metrics/Cargo.toml index 748a1e6c..add80acc 100644 --- a/opentelemetry-user-events-metrics/Cargo.toml +++ b/opentelemetry-user-events-metrics/Cargo.toml @@ -11,9 +11,9 @@ edition = "2021" rust-version = "1.71.1" [dependencies] -opentelemetry = { workspace = true, features = ["metrics"] } -opentelemetry_sdk = { workspace = true, features = ["metrics", "rt-tokio"] } -opentelemetry-proto = { workspace = true, features = ["gen-tonic", "metrics"] } +opentelemetry = { version = "0.27", features = ["metrics"] } +opentelemetry_sdk = { version = "0.27", features = ["metrics", "rt-tokio"] } +opentelemetry-proto = { version = "0.27", features = ["gen-tonic", "metrics"] } eventheader = { version = "= 0.4.0" } async-trait = "0.1" prost = "0.13" diff --git a/opentelemetry-user-events-metrics/examples/basic-metrics.rs b/opentelemetry-user-events-metrics/examples/basic-metrics.rs index 80eee892..bcec937d 100644 --- a/opentelemetry-user-events-metrics/examples/basic-metrics.rs +++ b/opentelemetry-user-events-metrics/examples/basic-metrics.rs @@ -32,27 +32,31 @@ async fn main() -> Result<(), Box> { .f64_counter("counter_f64_test") .with_description("test_decription") .with_unit("test_unit") - .init(); + .build(); let counter2 = meter .u64_counter("counter_u64_test") .with_description("test_decription") .with_unit("test_unit") - .init(); + .build(); // Create an UpDownCounter Instrument. - let updown_counter = meter.i64_up_down_counter("up_down_counter_i64_test").init(); - let updown_counter2 = meter.f64_up_down_counter("up_down_counter_f64_test").init(); + let updown_counter = meter + .i64_up_down_counter("up_down_counter_i64_test") + .build(); + let updown_counter2 = meter + .f64_up_down_counter("up_down_counter_f64_test") + .build(); // Create a Histogram Instrument. let histogram = meter .f64_histogram("histogram_f64_test") .with_description("test_description") - .init(); + .build(); let histogram2 = meter .u64_histogram("histogram_u64_test") .with_description("test_description") - .init(); + .build(); // Create a ObservableGauge instrument and register a callback that reports the measurement. let _gauge = meter @@ -68,7 +72,7 @@ async fn main() -> Result<(), Box> { ], ) }) - .init(); + .build(); let _gauge2 = meter .u64_observable_gauge("observable_gauge_u64_test") @@ -83,7 +87,7 @@ async fn main() -> Result<(), Box> { ], ) }) - .init(); + .build(); // Create a ObservableCounter instrument and register a callback that reports the measurement. let _observable_counter = meter @@ -99,7 +103,7 @@ async fn main() -> Result<(), Box> { ], ) }) - .init(); + .build(); let _observable_counter2 = meter .f64_observable_counter("observable_counter_f64_test") @@ -114,7 +118,7 @@ async fn main() -> Result<(), Box> { ], ) }) - .init(); + .build(); // Create a Observable UpDownCounter instrument and register a callback that reports the measurement. let _observable_up_down_counter = meter @@ -130,7 +134,7 @@ async fn main() -> Result<(), Box> { ], ) }) - .init(); + .build(); let _observable_up_down_counter2 = meter .f64_observable_up_down_counter("observable_up_down_counter_f64_test") .with_description("test_description") @@ -144,7 +148,7 @@ async fn main() -> Result<(), Box> { ], ) }) - .init(); + .build(); loop { // Record measurements using the Counter instrument. diff --git a/opentelemetry-user-events-metrics/src/exporter/mod.rs b/opentelemetry-user-events-metrics/src/exporter/mod.rs index af2cf402..1f19ce09 100644 --- a/opentelemetry-user-events-metrics/src/exporter/mod.rs +++ b/opentelemetry-user-events-metrics/src/exporter/mod.rs @@ -1,16 +1,16 @@ use async_trait::async_trait; -use opentelemetry::metrics::{MetricsError, Result}; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_sdk::metrics::data; +use opentelemetry_sdk::metrics::exporter::PushMetricExporter; use opentelemetry_sdk::metrics::{ data::{ ExponentialBucket, ExponentialHistogramDataPoint, Metric, ResourceMetrics, ScopeMetrics, - Temporality, }, - exporter::PushMetricsExporter, - reader::TemporalitySelector, - InstrumentKind, + Temporality, }; +use opentelemetry_sdk::metrics::{MetricError, MetricResult}; + +use opentelemetry::{otel_debug, otel_warn}; use crate::tracepoint; use eventheader::_internal as ehi; @@ -42,22 +42,6 @@ impl Default for MetricsExporter { } } -impl TemporalitySelector for MetricsExporter { - // This is matching OTLP exporters delta. - fn temporality(&self, kind: InstrumentKind) -> Temporality { - match kind { - InstrumentKind::Counter - | InstrumentKind::ObservableCounter - | InstrumentKind::ObservableGauge - | InstrumentKind::Histogram - | InstrumentKind::Gauge => Temporality::Delta, - InstrumentKind::UpDownCounter | InstrumentKind::ObservableUpDownCounter => { - Temporality::Cumulative - } - } - } -} - impl Debug for MetricsExporter { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("user_events metrics exporter") @@ -65,7 +49,12 @@ impl Debug for MetricsExporter { } impl MetricsExporter { - fn serialize_and_write(&self, resource_metric: &ResourceMetrics) -> Result<()> { + fn serialize_and_write( + &self, + resource_metric: &ResourceMetrics, + metric_name: &str, + metric_type: &str, + ) -> MetricResult<()> { // Allocate a local buffer for each write operation // TODO: Investigate if this can be optimized to avoid reallocation or // allocate a fixed buffer size for all writes @@ -73,28 +62,62 @@ impl MetricsExporter { // Convert to proto message let proto_message: ExportMetricsServiceRequest = resource_metric.into(); + otel_debug!(name: "SerializeStart", + metric_name = metric_name, + metric_type = metric_type); // Encode directly into the buffer - proto_message - .encode(&mut byte_array) - .map_err(|err| MetricsError::Other(err.to_string()))?; + match proto_message.encode(&mut byte_array) { + Ok(_) => { + otel_debug!(name: "SerializeSuccess", + metric_name = metric_name, + metric_type = metric_type, + size = byte_array.len()); + } + Err(err) => { + otel_debug!(name: "SerializeFailed", + error = err.to_string(), + metric_name = metric_name, + metric_type = metric_type, + size = byte_array.len()); + return Err(MetricError::Other(err.to_string())); + } + } // Check if the encoded message exceeds the 64 KB limit if byte_array.len() > MAX_EVENT_SIZE { - return Err(MetricsError::Other( + otel_debug!( + name: "MaxEventSizeExceeded", + reason = format!("Encoded event size exceeds maximum allowed limit of {} bytes. Event will be dropped.", MAX_EVENT_SIZE), + metric_name = metric_name, + metric_type = metric_type, + size = byte_array.len() + ); + return Err(MetricError::Other( "Event size exceeds maximum allowed limit".into(), )); } // Write to the tracepoint - tracepoint::write(&self.trace_point, &byte_array); + let result = tracepoint::write(&self.trace_point, &byte_array); + if result > 0 { + otel_debug!(name: "TracepointWrite", message = "Encoded data successfully written to tracepoint", size = byte_array.len(), metric_name = metric_name, metric_type = metric_type); + } + Ok(()) } } #[async_trait] -impl PushMetricsExporter for MetricsExporter { - async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> { +impl PushMetricExporter for MetricsExporter { + async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> { + otel_debug!(name: "ExportStart", message = "Starting metrics export"); + if !self.trace_point.enabled() { + // TODO - This can flood the logs if the tracepoint is disabled for long periods of time + otel_warn!(name: "TracepointDisabled", message = "Tracepoint is disabled, skipping export"); + return Ok(()); + } + if self.trace_point.enabled() { let mut errors = Vec::new(); @@ -119,8 +142,12 @@ impl PushMetricsExporter for MetricsExporter { }], }], }; - if let Err(e) = self.serialize_and_write(&resource_metric) { - errors.push(e); + if let Err(e) = self.serialize_and_write( + &resource_metric, + &metric.name, + "Histogram", + ) { + errors.push(e.to_string()); } } } else if let Some(histogram) = data.downcast_ref::>() { @@ -140,8 +167,12 @@ impl PushMetricsExporter for MetricsExporter { }], }], }; - if let Err(e) = self.serialize_and_write(&resource_metric) { - errors.push(e); + if let Err(e) = self.serialize_and_write( + &resource_metric, + &metric.name, + "Histogram", + ) { + errors.push(e.to_string()); } } } else if let Some(gauge) = data.downcast_ref::>() { @@ -160,8 +191,12 @@ impl PushMetricsExporter for MetricsExporter { }], }], }; - if let Err(e) = self.serialize_and_write(&resource_metric) { - errors.push(e); + if let Err(e) = self.serialize_and_write( + &resource_metric, + &metric.name, + "Gauge", + ) { + errors.push(e.to_string()); } } } else if let Some(gauge) = data.downcast_ref::>() { @@ -180,8 +215,12 @@ impl PushMetricsExporter for MetricsExporter { }], }], }; - if let Err(e) = self.serialize_and_write(&resource_metric) { - errors.push(e); + if let Err(e) = self.serialize_and_write( + &resource_metric, + &metric.name, + "Gauge", + ) { + errors.push(e.to_string()); } } } else if let Some(gauge) = data.downcast_ref::>() { @@ -200,8 +239,12 @@ impl PushMetricsExporter for MetricsExporter { }], }], }; - if let Err(e) = self.serialize_and_write(&resource_metric) { - errors.push(e); + if let Err(e) = self.serialize_and_write( + &resource_metric, + &metric.name, + "Gauge", + ) { + errors.push(e.to_string()); } } } else if let Some(sum) = data.downcast_ref::>() { @@ -222,8 +265,10 @@ impl PushMetricsExporter for MetricsExporter { }], }], }; - if let Err(e) = self.serialize_and_write(&resource_metric) { - errors.push(e); + if let Err(e) = + self.serialize_and_write(&resource_metric, &metric.name, "Sum") + { + errors.push(e.to_string()); } } } else if let Some(sum) = data.downcast_ref::>() { @@ -244,8 +289,10 @@ impl PushMetricsExporter for MetricsExporter { }], }], }; - if let Err(e) = self.serialize_and_write(&resource_metric) { - errors.push(e); + if let Err(e) = + self.serialize_and_write(&resource_metric, &metric.name, "Sum") + { + errors.push(e.to_string()); } } } else if let Some(sum) = data.downcast_ref::>() { @@ -266,8 +313,10 @@ impl PushMetricsExporter for MetricsExporter { }], }], }; - if let Err(e) = self.serialize_and_write(&resource_metric) { - errors.push(e); + if let Err(e) = + self.serialize_and_write(&resource_metric, &metric.name, "Sum") + { + errors.push(e.to_string()); } } } else if let Some(exp_hist) = @@ -315,8 +364,12 @@ impl PushMetricsExporter for MetricsExporter { }], }], }; - if let Err(e) = self.serialize_and_write(&resource_metric) { - errors.push(e); + if let Err(e) = self.serialize_and_write( + &resource_metric, + &metric.name, + "ExponentialHistogram", + ) { + errors.push(e.to_string()); } } } else if let Some(exp_hist) = @@ -364,8 +417,12 @@ impl PushMetricsExporter for MetricsExporter { }], }], }; - if let Err(e) = self.serialize_and_write(&resource_metric) { - errors.push(e); + if let Err(e) = self.serialize_and_write( + &resource_metric, + &metric.name, + "ExponentialHistogram", + ) { + errors.push(e.to_string()); } } } @@ -374,20 +431,26 @@ impl PushMetricsExporter for MetricsExporter { // Return any errors if present if !errors.is_empty() { - return Err(MetricsError::Other(format!( - "Encountered {} errors during export", - errors.len() - ))); + let error_message = format!( + "Export encountered {} errors: [{}]", + errors.len(), + errors.join("; ") + ); + return Err(MetricError::Other(error_message)); } } Ok(()) } - async fn force_flush(&self) -> Result<()> { + fn temporality(&self) -> Temporality { + Temporality::Delta + } + + async fn force_flush(&self) -> MetricResult<()> { Ok(()) // In this implementation, flush does nothing } - fn shutdown(&self) -> Result<()> { + fn shutdown(&self) -> MetricResult<()> { // TracepointState automatically unregisters when dropped // https://github.com/microsoft/LinuxTracepoints-Rust/blob/main/eventheader/src/native.rs#L618 Ok(()) diff --git a/opentelemetry-user-events-metrics/src/tracepoint/mod.rs b/opentelemetry-user-events-metrics/src/tracepoint/mod.rs index acddfe38..1f41fe68 100644 --- a/opentelemetry-user-events-metrics/src/tracepoint/mod.rs +++ b/opentelemetry-user-events-metrics/src/tracepoint/mod.rs @@ -1,6 +1,6 @@ use core::ffi; use eventheader::_internal as ehi; -use opentelemetry::{global, metrics::MetricsError}; +use opentelemetry::{otel_debug, otel_error, otel_info}; use std::panic; use std::pin::Pin; @@ -41,12 +41,12 @@ pub fn write(trace_point: &ehi::TracepointState, buffer: &[u8]) -> i32 { // This must stay in sync with the METRICS_EVENT_DEF string. // Return error -1 if buffer exceeds max size if buffer.len() > u16::MAX as usize { - eprintln!("Buffer exceeds max length."); + otel_debug!(name: "TracePointWriteError", reason = "Buffer exceeds max length.", buffer_size = buffer.len()); return -1; } if PROTOBUF_VERSION.len() != 8 { - eprintln!("Version must be char[8]."); + otel_debug!(name: "TracePointWriteError", reason = "Version must be char[8].", version = PROTOBUF_VERSION); return -1; } @@ -93,24 +93,21 @@ pub unsafe fn register(trace_point: Pin<&ehi::TracepointState>) -> i32 { if value == 0 { // Temporary print as a measure for quick testing // will be replaced with proper logging mechanism - println!("Tracepoint registered successfully.") + otel_info!(name: "TracePointRegistered", reason = "Tracepoint registered successfully."); } else if value == 95 { - global::handle_error(MetricsError::Other( - "Trace/debug file systems are not mounted.".into(), - )); + otel_error!(name: "TracePointRegisterError", reason = "Trace/debug file systems are not mounted."); } else if value == 13 { - global::handle_error(MetricsError::Other( - "Insufficient permissions. You need read/write/execute permissions to user_events tracing directory.".into(), - )); + otel_error!(name: "TracePointRegisterError", reason = "Insufficient permissions. You need read/write/execute permissions to user_events tracing directory."); } value } // We don't want to ever panic so we catch the error and return a unique code for retry Err(err) => { - global::handle_error(MetricsError::Other(format!( - "Tracepoint failed to register: {:?}.", - err, - ))); + otel_error!( + name: "TracePointRegisterError", + reason = "Tracepoint failed to register.", + error = format!("{:?}", err) + ); -1 } }