diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index af2e864dbd..5af3dc87aa 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -33,11 +33,16 @@ path = "src/main.rs" otap-df-config.workspace = true otap-df-controller.workspace = true otap-df-otap.workspace = true +otap-df-pdata.workspace = true +otap-df-telemetry = { path = "crates/telemetry" } thiserror.workspace = true quiver = { workspace = true, optional = true } serde_json.workspace = true clap.workspace = true mimalloc = { workspace = true, optional = true } +tikv-jemallocator.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true rustls = { workspace = true, optional = true } [target.'cfg(not(windows))'.dependencies] @@ -49,6 +54,7 @@ otap-df-pdata-otlp-model = { path = "./crates/pdata/src/otlp/model"} otap-df-config = { path = "crates/config" } otap-df-controller = { path = "crates/controller" } otap-df-otap = { path = "crates/otap" } +otap-df-pdata = { path = "crates/pdata" } quiver = { package = "otap-df-quiver", path = "crates/quiver" } data_engine_expressions = { path = "../experimental/query_engine/expressions" } data_engine_kql_parser = { path = "../experimental/query_engine/kql-parser" } @@ -137,6 +143,7 @@ syn = { version = "2.0", features = ["full", "extra-traits"] } tempfile = "3" thiserror = "2.0.17" tracing = { version = ">=0.1.40", default-features = false } +tracing-core = { version = "0.1", default-features = false } tracing-subscriber = { version = "0.3", default-features = false } tokio = { version = "1.48.0", features = ["rt", "time", "net", "io-util", "sync", "macros", "rt-multi-thread", "fs", "io-std", "process"] } tokio-stream = "0.1.17" diff --git a/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs.rs b/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs.rs index 544e160119..f5b6362d28 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs.rs @@ -7,6 +7,7 @@ pub mod processors; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use std::time::Duration; /// Internal logs configuration. #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)] @@ -18,6 +19,75 @@ pub struct LogsConfig { /// The list of log processors to configure. #[serde(default)] pub processors: Vec, + + /// Internal collection for component-level logs. + /// + /// When enabled, component logs (otel_info!, otel_warn!, etc.) are routed through + /// an internal telemetry receiver in the OTAP pipeline, allowing use of built-in + /// batch processors, retry, and exporters (console, OTLP, etc.). + /// + /// When disabled (default), component logs are routed to the OpenTelemetry SDK, + /// using the same export path as 3rd party logs from tokio-tracing-rs. + #[serde(default)] + pub internal_collection: InternalCollectionConfig, +} + +/// Configuration for internal collection of component logs. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct InternalCollectionConfig { + /// Enable internal collection for component logs. + /// When false, component logs use the OpenTelemetry SDK (declarative config is honored). + #[serde(default)] + pub enabled: bool, + + /// Per-thread buffer size in bytes for accumulating component logs. + /// This is a pre-allocated, fixed-size buffer. When full, logs are flushed to the channel. + #[serde(default = "default_buffer_size_bytes")] + pub buffer_size_bytes: usize, + + /// Maximum size in bytes for a single log record. + /// Records exceeding this size are dropped with a counter increment. + /// This limit enables encoder optimizations (2-byte length placeholders for 14-bit sizes). + #[serde(default = "default_max_record_bytes")] + pub max_record_bytes: usize, + + /// Maximum number of records in the bounded channel. + /// When full, new records fall back to raw console logger. + #[serde(default = "default_max_record_count")] + pub max_record_count: usize, + + /// Flush interval for periodic flushing by the internal telemetry receiver. + #[serde(with = "humantime_serde", default = "default_flush_interval")] + #[schemars(with = "String")] + pub flush_interval: Duration, +} + +impl Default for InternalCollectionConfig { + fn default() -> Self { + Self { + enabled: false, + buffer_size_bytes: default_buffer_size_bytes(), + max_record_bytes: default_max_record_bytes(), + max_record_count: default_max_record_count(), + flush_interval: default_flush_interval(), + } + } +} + +fn default_buffer_size_bytes() -> usize { + 64 * 1024 // 64 KiB - pre-allocated per thread +} + +fn default_max_record_bytes() -> usize { + 16 * 1024 // 16 KiB - max single record (enables 2-byte length placeholders) +} + +fn default_max_record_count() -> usize { + 1000 // messages (bounded channel) +} + +fn default_flush_interval() -> Duration { + Duration::from_secs(1) } /// Log level for internal engine logs. @@ -56,6 +126,41 @@ mod tests { let config: LogsConfig = serde_yaml::from_str(yaml_str).unwrap(); assert_eq!(config.level, LogLevel::Info); assert_eq!(config.processors.len(), 1); + assert!(!config.internal_collection.enabled); + } + + #[test] + fn test_internal_collection_config_deserialize() { + let yaml_str = r#" + level: "info" + internal_collection: + enabled: true + buffer_size_bytes: 131072 + max_record_bytes: 32768 + max_record_count: 2000 + flush_interval: "2s" + "#; + let config: LogsConfig = serde_yaml::from_str(yaml_str).unwrap(); + assert!(config.internal_collection.enabled); + assert_eq!(config.internal_collection.buffer_size_bytes, 131072); + assert_eq!(config.internal_collection.max_record_bytes, 32768); + assert_eq!(config.internal_collection.max_record_count, 2000); + assert_eq!(config.internal_collection.flush_interval, Duration::from_secs(2)); + } + + #[test] + fn test_internal_collection_config_defaults() { + let yaml_str = r#" + level: "info" + internal_collection: + enabled: true + "#; + let config: LogsConfig = serde_yaml::from_str(yaml_str).unwrap(); + assert!(config.internal_collection.enabled); + assert_eq!(config.internal_collection.buffer_size_bytes, 64 * 1024); + assert_eq!(config.internal_collection.max_record_bytes, 16 * 1024); + assert_eq!(config.internal_collection.max_record_count, 1000); + assert_eq!(config.internal_collection.flush_interval, Duration::from_secs(1)); } #[test] diff --git a/rust/otap-dataflow/crates/engine/Cargo.toml b/rust/otap-dataflow/crates/engine/Cargo.toml index 8d46abfb00..64f0ac6d3a 100644 --- a/rust/otap-dataflow/crates/engine/Cargo.toml +++ b/rust/otap-dataflow/crates/engine/Cargo.toml @@ -24,6 +24,7 @@ otap-df-state = { path = "../state" } otap-df-telemetry = { path = "../telemetry" } otap-df-telemetry-macros = { path = "../telemetry-macros" } +bytes = { workspace = true } thiserror = { workspace = true } serde_json = { workspace = true } serde = { workspace = true } diff --git a/rust/otap-dataflow/crates/engine/src/effect_handler.rs b/rust/otap-dataflow/crates/engine/src/effect_handler.rs index 5c58a9597b..4327788809 100644 --- a/rust/otap-dataflow/crates/engine/src/effect_handler.rs +++ b/rust/otap-dataflow/crates/engine/src/effect_handler.rs @@ -6,13 +6,60 @@ use crate::control::{AckMsg, NackMsg, PipelineControlMsg, PipelineCtrlMsgSender}; use crate::error::Error; use crate::node::NodeId; +use bytes::Bytes; use otap_df_channel::error::SendError; +use otap_df_pdata::otlp::stateful_encoder::StatefulOtlpEncoder; +use otap_df_pdata::OtlpProtoBytes; use otap_df_telemetry::error::Error as TelemetryError; use otap_df_telemetry::metrics::{MetricSet, MetricSetHandler}; use otap_df_telemetry::reporter::MetricsReporter; use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use tokio::net::{TcpListener, UdpSocket}; +use tokio::sync::mpsc; + +/// Telemetry buffer state for local (!Send) components. +pub(crate) struct LocalTelemetryState { + pub(crate) encoder: StatefulOtlpEncoder, + pub(crate) resource_bytes: OtlpProtoBytes, + pub(crate) scope_name: String, + pub(crate) flush_threshold_bytes: usize, + pub(crate) overflow_sender: mpsc::UnboundedSender, +} + +/// Telemetry buffer state for shared (Send + Sync) components. +pub(crate) struct SharedTelemetryState { + pub(crate) encoder: StatefulOtlpEncoder, + pub(crate) resource_bytes: OtlpProtoBytes, + pub(crate) scope_name: String, + pub(crate) flush_threshold_bytes: usize, + pub(crate) overflow_sender: mpsc::UnboundedSender, +} + +/// Telemetry buffer for component-level logging via shared (Send + Sync) effect handlers. +/// +/// Routes component logs to the internal telemetry receiver. +/// The overflow_sender can point to: +/// - ITR receiver (when ITR is configured) - enters OTAP pipeline +/// - OtlpBytesChannel (SDK fallback) - same as 3rd party logs +/// +/// Note: Local (!Send) effect handlers store LocalTelemetryState directly +/// in their struct to avoid breaking Send bounds. +#[derive(Clone)] +pub struct TelemetryBuffer(Arc>); + +impl TelemetryBuffer { + /// Create a new telemetry buffer for shared effect handlers. + pub(crate) fn new(state: SharedTelemetryState) -> Self { + Self(Arc::new(Mutex::new(state))) + } + + /// Get a reference to the inner state (for internal use). + pub(crate) fn inner(&self) -> &Arc> { + &self.0 + } +} /// Common implementation of all effect handlers. /// @@ -25,6 +72,10 @@ pub(crate) struct EffectHandlerCore { #[allow(dead_code)] // Will be used in the future. ToDo report metrics from channel and messages. pub(crate) metrics_reporter: MetricsReporter, + + /// Telemetry buffer for component-level logging. + /// None if telemetry is disabled for this component. + pub(crate) telemetry_buffer: Option, } impl EffectHandlerCore { @@ -34,6 +85,21 @@ impl EffectHandlerCore { node_id, pipeline_ctrl_msg_sender: None, metrics_reporter, + telemetry_buffer: None, + } + } + + /// Creates a new EffectHandlerCore with telemetry buffer (for shared effect handlers only). + pub(crate) fn new_with_shared_telemetry( + node_id: NodeId, + metrics_reporter: MetricsReporter, + telemetry_buffer: TelemetryBuffer, + ) -> Self { + Self { + node_id, + pipeline_ctrl_msg_sender: None, + metrics_reporter, + telemetry_buffer: Some(telemetry_buffer), } } diff --git a/rust/otap-dataflow/crates/engine/src/local/processor.rs b/rust/otap-dataflow/crates/engine/src/local/processor.rs index 5a8f50b38a..a3d3183176 100644 --- a/rust/otap-dataflow/crates/engine/src/local/processor.rs +++ b/rust/otap-dataflow/crates/engine/src/local/processor.rs @@ -97,6 +97,11 @@ pub struct EffectHandler { msg_senders: HashMap>, /// Cached default sender for fast access in the hot path default_sender: Option>, + + /// Local telemetry state for component-level logging. + /// Stored directly in EffectHandler (not in EffectHandlerCore) because + /// it contains Rc> which would break Send bounds for shared handlers. + telemetry_state: Option>>, } /// Implementation for the `!Send` effect handler. @@ -124,6 +129,7 @@ impl EffectHandler { core, msg_senders, default_sender, + telemetry_state: None, } } @@ -252,6 +258,43 @@ impl EffectHandler { self.core.report_metrics(metrics) } + /// Log a telemetry event from this component. + /// + /// This method encodes the log record into the component's telemetry buffer. + /// On overflow (buffer exceeds threshold), it automatically flushes to the + /// configured destination (ITR or SDK fallback) via non-blocking channel send. + /// + /// This method never fails - errors are silently dropped to prevent recursion. + /// If the telemetry buffer is not configured, this is a no-op. + pub fn log_event(&self, log_record: &impl otap_df_pdata::views::logs::LogRecordView) { + if let Some(ref buffer) = self.telemetry_state { + let mut state = buffer.borrow_mut(); + + // Clone necessary data to avoid holding immutable borrows + let resource_bytes = state.resource_bytes.clone(); + let scope_name = state.scope_name.clone(); + let threshold = state.flush_threshold_bytes; + + // Encode the log record (silent drop on error to prevent recursion) + if state.encoder.encode_log_record( + log_record, + resource_bytes.as_bytes(), + &scope_name, + ).is_err() { + // TODO: Increment dropped events counter + return; + } + + // Check overflow - non-blocking send to break recursion + if state.encoder.len() >= threshold { + let bytes = state.encoder.flush(); + // Non-blocking send - errors are silently dropped + // Destination is either ITR or SDK fallback (OtlpBytesChannel) + let _ = state.overflow_sender.send(bytes); + } + } + } + // More methods will be added in the future as needed. } diff --git a/rust/otap-dataflow/crates/engine/src/shared/processor.rs b/rust/otap-dataflow/crates/engine/src/shared/processor.rs index 8e10975687..b43b9c3b6c 100644 --- a/rust/otap-dataflow/crates/engine/src/shared/processor.rs +++ b/rust/otap-dataflow/crates/engine/src/shared/processor.rs @@ -242,5 +242,43 @@ impl EffectHandler { self.core.report_metrics(metrics) } + /// Log a telemetry event from this component. + /// + /// This method encodes the log record into the component's telemetry buffer. + /// On overflow (buffer exceeds threshold), it automatically flushes to the + /// configured destination (ITR or SDK fallback) via non-blocking channel send. + /// + /// This method never fails - errors are silently dropped to prevent recursion. + /// If the telemetry buffer is not configured, this is a no-op. + /// Thread-safe via Arc>. + pub fn log_event(&self, log_record: &impl otap_df_pdata::views::logs::LogRecordView) { + if let Some(ref buffer) = self.core.telemetry_buffer { + let mut state = buffer.inner().lock().unwrap(); + + // Clone necessary data to avoid holding immutable borrows + let resource_bytes = state.resource_bytes.clone(); + let scope_name = state.scope_name.clone(); + let threshold = state.flush_threshold_bytes; + + // Encode the log record (silent drop on error to prevent recursion) + if state.encoder.encode_log_record( + log_record, + resource_bytes.as_bytes(), + &scope_name, + ).is_err() { + // TODO: Increment dropped events counter + return; + } + + // Check overflow - non-blocking send to break recursion + if state.encoder.len() >= threshold { + let bytes = state.encoder.flush(); + // Non-blocking send - errors are silently dropped + // Destination is either ITR or SDK fallback (OtlpBytesChannel) + let _ = state.overflow_sender.send(bytes); + } + } + } + // More methods will be added in the future as needed. } diff --git a/rust/otap-dataflow/crates/otap/Cargo.toml b/rust/otap-dataflow/crates/otap/Cargo.toml index 67a32d31f9..d3777d27a1 100644 --- a/rust/otap-dataflow/crates/otap/Cargo.toml +++ b/rust/otap-dataflow/crates/otap/Cargo.toml @@ -40,6 +40,8 @@ smallvec = { workspace = true } bitflags = { workspace = true } bytes = { workspace = true } parking_lot = { workspace = true } +tracing = { workspace = true } +tracing-core = { workspace = true } data_engine_kql_parser = { workspace = true } data_engine_expressions = { workspace = true } diff --git a/rust/otap-dataflow/crates/otap/src/internal_telemetry_receiver/mod.rs b/rust/otap-dataflow/crates/otap/src/internal_telemetry_receiver/mod.rs new file mode 100644 index 0000000000..cd3184cc05 --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/internal_telemetry_receiver/mod.rs @@ -0,0 +1,72 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! # Internal Telemetry Receiver +//! +//! This receiver collects internal telemetry (traces, logs, metrics) from the OTAP-Dataflow +//! engine itself for self-diagnostics via the tracing integration layer. +//! +//! ## Architecture +//! ```text +//! tracing::info!(count = 42, error = ?e) +//! ↓ +//! OtlpTracingLayer (crates/telemetry/src/tracing_integration/subscriber.rs) +//! ↓ Visit::record_i64(), record_debug(), etc. (captures full structure) +//! ↓ Build TracingLogRecord with nested TracingAnyValue (Arrays, Maps) +//! ↓ Encode to OTLP bytes using stateful encoder +//! ↓ Send OTLP bytes to channel +//! ↓ +//! InternalTelemetryReceiver (this module) +//! ↓ Receives OTLP bytes from channel +//! ↓ Wraps as OtapPdata +//! ↓ Injects into dataflow pipeline +//! ``` +//! +//! ## Key Features +//! - **Structured Capture**: Full fidelity preservation of nested data via TracingAnyValue +//! - **OTLP Native**: Direct encoding to OTLP bytes without intermediate conversions +//! - **Pipeline Integration**: Routes internal telemetry through standard dataflow +//! +//! This receiver operates as a **shared** receiver (Send + Sync) to handle multithreaded +//! telemetry collection from across the application. + +mod shared_receiver; + +pub use shared_receiver::{Config, InternalTelemetryReceiver, get_otlp_bytes_sender}; + +use crate::OTAP_RECEIVER_FACTORIES; +use linkme::distributed_slice; +use otap_df_engine::ReceiverFactory; + +/// URN for the internal telemetry receiver +pub const INTERNAL_TELEMETRY_RECEIVER_URN: &str = "urn:otap:receiver:internal-telemetry:v1"; + +/// Register the internal telemetry receiver factory +#[allow(unsafe_code)] +#[distributed_slice(OTAP_RECEIVER_FACTORIES)] +pub static INTERNAL_TELEMETRY_RECEIVER_FACTORY: ReceiverFactory = + ReceiverFactory { + name: INTERNAL_TELEMETRY_RECEIVER_URN, + create: |_pipeline_ctx: otap_df_engine::context::PipelineContext, + node: otap_df_engine::node::NodeId, + node_config: std::sync::Arc, + receiver_config: &otap_df_engine::config::ReceiverConfig| { + let config: Config = serde_json::from_value(node_config.config.clone()) + .map_err(|e| otap_df_config::error::Error::InvalidConfiguration { + errors: vec![otap_df_config::error::Error::DeserializationError { + context: otap_df_config::error::Context::default(), + format: "JSON".to_string(), + details: format!("Failed to parse internal telemetry receiver config: {}", e), + }], + })?; + + let receiver = InternalTelemetryReceiver::new(config); + + Ok(otap_df_engine::receiver::ReceiverWrapper::shared( + receiver, + node, + node_config, + receiver_config, + )) + }, + }; diff --git a/rust/otap-dataflow/crates/otap/src/internal_telemetry_receiver/shared_receiver.rs b/rust/otap-dataflow/crates/otap/src/internal_telemetry_receiver/shared_receiver.rs new file mode 100644 index 0000000000..bc14b6e3f7 --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/internal_telemetry_receiver/shared_receiver.rs @@ -0,0 +1,199 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Shared (Send + Sync) implementation of the internal telemetry receiver. +//! +//! This receiver operates in multithreaded mode, receiving OTLP-encoded bytes +//! from the OtlpTracingLayer and routing them through the dataflow pipeline. + +use crate::pdata::{Context, OtapPdata}; +use async_trait::async_trait; +use bytes::Bytes; +use otap_df_engine::control::NodeControlMsg; +use otap_df_engine::error::Error; +use otap_df_engine::shared::receiver as shared; +use otap_df_engine::terminal_state::TerminalState; +use otap_df_pdata::{OtlpProtoBytes, OtapPayload}; +use otap_df_telemetry::otel_info; +use serde::Deserialize; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio::time::interval; + +/// Configuration for the internal telemetry receiver +#[derive(Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct Config { + /// Flush interval for sending batched telemetry (default: 1s) + #[serde(default = "default_flush_interval")] + #[serde(with = "humantime_serde")] + pub flush_interval: Duration, + + /// Maximum number of OTLP byte buffers to queue before dropping (default: 1000) + #[serde(default = "default_max_buffer_size")] + pub max_buffer_size: usize, + + /// Channel buffer size for receiving OTLP bytes (default: 100) + #[serde(default = "default_channel_buffer")] + pub channel_buffer: usize, +} + +fn default_flush_interval() -> Duration { + Duration::from_secs(1) +} + +fn default_max_buffer_size() -> usize { + 1000 +} + +fn default_channel_buffer() -> usize { + 100 +} + +impl Default for Config { + fn default() -> Self { + Self { + flush_interval: default_flush_interval(), + max_buffer_size: default_max_buffer_size(), + channel_buffer: default_channel_buffer(), + } + } +} + +/// Global channel for sending OTLP bytes from OtlpTracingLayer to receiver +static OTLP_BYTES_SENDER: std::sync::OnceLock>> = std::sync::OnceLock::new(); + +/// Get the global OTLP bytes sender for the tracing layer +pub fn get_otlp_bytes_sender() -> Option<&'static mpsc::UnboundedSender>> { + OTLP_BYTES_SENDER.get() +} + +/// Initialize the global OTLP bytes sender (called by receiver on startup) +fn init_otlp_bytes_sender(sender: mpsc::UnboundedSender>) { + let _ = OTLP_BYTES_SENDER.set(sender); +} + +/// Internal telemetry receiver that collects self-diagnostic telemetry +pub struct InternalTelemetryReceiver { + config: Config, +} + +impl InternalTelemetryReceiver { + /// Create a new internal telemetry receiver + pub fn new(config: Config) -> Self { + Self { config } + } +} + +#[async_trait] +impl shared::Receiver for InternalTelemetryReceiver { + async fn start( + self: Box, + mut ctrl_msg_recv: shared::ControlChannel, + effect_handler: shared::EffectHandler, + ) -> Result { + otel_info!("InternalTelemetryReceiver.Start", + message = "Starting internal telemetry receiver"); + + // Create channel for receiving OTLP bytes from tracing layer + let (otlp_tx, mut otlp_rx) = mpsc::unbounded_channel::>(); + + // Initialize global sender for OtlpTracingLayer to use + init_otlp_bytes_sender(otlp_tx); + + otel_info!("InternalTelemetryReceiver.Ready", + message = "Ready to receive OTLP bytes from tracing layer"); + + // Set up periodic flush timer + let mut flush_timer = interval(self.config.flush_interval); + flush_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + // Buffer for collecting OTLP byte buffers between flushes + let mut otlp_buffer = Vec::with_capacity(self.config.max_buffer_size); + + loop { + tokio::select! { + // Handle control messages from the engine + ctrl_result = ctrl_msg_recv.recv() => { + match ctrl_result { + Ok(NodeControlMsg::Shutdown { .. }) => { + otel_info!("InternalTelemetryReceiver.Shutdown", + message = "Shutting down internal telemetry receiver", + buffered_count = otlp_buffer.len()); + + // Flush any remaining OTLP bytes + for otlp_bytes in otlp_buffer.drain(..) { + // Wrap as OtapPdata and send + let bytes = Bytes::from(otlp_bytes); + let proto_bytes = OtlpProtoBytes::ExportLogsRequest(bytes); + let payload: OtapPayload = proto_bytes.into(); + let pdata = OtapPdata::new(Context::default(), payload); + let _ = effect_handler.send_message(pdata).await; + } + + return Ok(TerminalState::default()); + } + Ok(_) => { + // Handle other control messages if needed + } + Err(_) => { + // Control channel closed + return Ok(TerminalState::default()); + } + } + } + + // Receive OTLP bytes from the tracing layer + Some(otlp_bytes) = otlp_rx.recv() => { + otlp_buffer.push(otlp_bytes); + + // Drop oldest if buffer is full + if otlp_buffer.len() >= self.config.max_buffer_size { + otel_info!("InternalTelemetryReceiver.BufferFull", + message = "Dropping oldest OTLP bytes, buffer full", + max_size = self.config.max_buffer_size); + // Keep only the most recent half + let _ = otlp_buffer.drain(0..otlp_buffer.len() / 2).count(); + } + } + + // Periodic flush + _ = flush_timer.tick() => { + if !otlp_buffer.is_empty() { + let count = otlp_buffer.len(); + + // Send all buffered OTLP bytes as OtapPdata + for otlp_bytes in otlp_buffer.drain(..) { + let bytes = Bytes::from(otlp_bytes); + let proto_bytes = OtlpProtoBytes::ExportLogsRequest(bytes); + let payload: OtapPayload = proto_bytes.into(); + let pdata = OtapPdata::new(Context::default(), payload); + if let Err(e) = effect_handler.send_message(pdata).await { + otel_info!("InternalTelemetryReceiver.SendFailed", + message = "Failed to send telemetry", + error = format!("{:?}", e)); + } + } + + otel_info!("InternalTelemetryReceiver.Flushed", + message = "Flushed OTLP telemetry", + count = count); + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_config_defaults() { + let config = Config::default(); + assert_eq!(config.flush_interval, Duration::from_secs(1)); + assert_eq!(config.max_buffer_size, 1000); + assert_eq!(config.channel_buffer, 100); + } +} diff --git a/rust/otap-dataflow/crates/otap/src/lib.rs b/rust/otap-dataflow/crates/otap/src/lib.rs index 2fdcf934de..7a3f512c7a 100644 --- a/rust/otap-dataflow/crates/otap/src/lib.rs +++ b/rust/otap-dataflow/crates/otap/src/lib.rs @@ -30,6 +30,9 @@ pub mod retry_processor; /// Receiver that reads in syslog data pub mod syslog_cef_receiver; +/// Internal telemetry receiver for self-diagnostics +pub mod internal_telemetry_receiver; + /// Common component accessories (e.g., context-state management). pub mod accessory; diff --git a/rust/otap-dataflow/crates/otap/src/otap_grpc/client_settings.rs b/rust/otap-dataflow/crates/otap/src/otap_grpc/client_settings.rs index 865554455b..00ef23be9c 100644 --- a/rust/otap-dataflow/crates/otap/src/otap_grpc/client_settings.rs +++ b/rust/otap-dataflow/crates/otap/src/otap_grpc/client_settings.rs @@ -272,6 +272,8 @@ const fn default_keep_alive_while_idle() -> bool { #[allow(missing_docs)] mod tests { use super::*; + + #[cfg(feature = "experimental-tls")] use tempfile::NamedTempFile; #[cfg(feature = "experimental-tls")] diff --git a/rust/otap-dataflow/crates/pdata/src/otlp/mod.rs b/rust/otap-dataflow/crates/pdata/src/otlp/mod.rs index 051110ac5d..9f47ca197e 100644 --- a/rust/otap-dataflow/crates/pdata/src/otlp/mod.rs +++ b/rust/otap-dataflow/crates/pdata/src/otlp/mod.rs @@ -23,6 +23,8 @@ pub mod metrics; pub mod traces; mod common; +/// Stateful encoder for streaming OTLP messages with automatic batching +pub mod stateful_encoder; #[cfg(test)] mod tests; diff --git a/rust/otap-dataflow/crates/pdata/src/otlp/stateful_encoder.rs b/rust/otap-dataflow/crates/pdata/src/otlp/stateful_encoder.rs new file mode 100644 index 0000000000..81c3b4a97c --- /dev/null +++ b/rust/otap-dataflow/crates/pdata/src/otlp/stateful_encoder.rs @@ -0,0 +1,814 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Stateful OTLP encoder for streaming single log records with automatic batching. +//! +//! This encoder maintains open `ResourceLogs` and `ScopeLogs` messages, appending individual +//! `LogRecord`s as they arrive. When the InstrumentationScope changes (via scope name), it automatically +//! closes the previous scope and starts a new one. The Resource is pre-encoded and copied once. +//! +//! # Design +//! - **Resource**: Pre-encoded as `OtlpBytes` (includes protobuf field tag + length + content) +//! - **Scope**: Encoded on-the-fly using scope name (InstrumentationScope.name only) +//! - **LogRecord**: Accepted as `LogRecordView` trait, encoded on-the-fly + +use crate::error::Result; +use crate::otlp::common::{ProtoBuffer, encode_len_placeholder, patch_len_placeholder}; +use crate::proto::consts::{field_num::logs::*, wire_types}; +use crate::views::logs::LogRecordView; +use bytes::Bytes; + +/// Pre-encoded OTLP bytes (includes protobuf field tag + length + message content) +/// +/// These bytes are ready to be copied directly into the output buffer without further processing. +pub type OtlpBytes = Vec; +/// @@@ Remove me, use super::OtlpProtoBytes + +/// Position marker for a length-delimited field that needs patching +/// +/// @@@ Make this variable width. We want 2-byte padding for records +/// and 4-byte padding for the container messages ResourceLogs, +/// ScopeLogs, etc, because it is reasonable to insist on 16 KiB log +/// messages for a self-diagnostic library and we are able to drop +/// attributes to achieve this (OTLP has a dedicated field for this). +/// Using a maybe, or a for the primitive u16, u32. +#[derive(Debug, Clone, Copy)] +struct LengthPlaceholder { + /// Position in buffer where the 4-byte length placeholder starts + position: usize, +} + +impl LengthPlaceholder { + fn new(position: usize) -> Self { + Self { position } + } + + fn patch(self, buf: &mut ProtoBuffer) { + let content_len = buf.len() - self.position - 4; + patch_len_placeholder(buf, 4, content_len, self.position); + } +} + +/// Current state of the stateful encoder +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum EncoderState { + /// No messages open, ready to start new ResourceLogs + Idle, + /// ResourceLogs is open, ready to add ScopeLogs + ResourceOpen, + /// ResourceLogs and ScopeLogs are both open, ready to append LogRecords + ScopeOpen, +} + +/// Stateful OTLP encoder that maintains open ResourceLogs and ScopeLogs messages. +/// +/// # Example +/// ```ignore +/// let mut encoder = StatefulOtlpEncoder::new(64 * 1024); +/// +/// // Pre-encode resource once +/// let resource_bytes = encode_resource_to_otlp_bytes(&resource); +/// +/// // Scope name is typically the module path or instrumentation library name +/// let scope_name = "my_module::component"; +/// +/// // Encode multiple log records - automatically batched if scope name matches +/// encoder.encode_log_record(&log_record_view, &resource_bytes, scope_name)?; +/// encoder.encode_log_record(&log_record_view2, &resource_bytes, scope_name)?; // Batched +/// +/// // Flush to get OTLP bytes +/// let otlp_bytes = encoder.flush(); +/// ``` +pub struct StatefulOtlpEncoder { + /// Output buffer (reuses ProtoBuffer infrastructure) + buf: ProtoBuffer, + + /// Current encoder state + state: EncoderState, + + /// Length placeholder for the current ResourceLogs message + resource_logs_placeholder: Option, + + /// Length placeholder for the current ScopeLogs message + scope_logs_placeholder: Option, + + /// Name of the current scope for comparison + current_scope_name: Option, +} + +impl StatefulOtlpEncoder { + /// Create a new encoder with pre-allocated buffer capacity + pub fn new(capacity_bytes: usize) -> Self { + Self { + buf: ProtoBuffer::with_capacity(capacity_bytes), + state: EncoderState::Idle, + resource_logs_placeholder: None, + scope_logs_placeholder: None, + current_scope_name: None, + } + } + + /// Get the current buffer size in bytes + #[inline] + pub fn len(&self) -> usize { + self.buf.len() + } + + /// Check if the buffer is empty + #[inline] + pub fn is_empty(&self) -> bool { + self.buf.is_empty() + } + + /// Encode a single log record with its Resource and Scope context. + /// + /// This method automatically handles batching: + /// - If scope name matches the current batch, the LogRecord is appended + /// - If scope name differs, the current ScopeLogs is closed and a new one started + /// + /// # Parameters + /// - `log_record`: View of the log record to encode + /// - `resource_bytes`: Pre-encoded Resource (includes protobuf field tag + length + content) + /// - `scope_name`: InstrumentationScope name (typically tracing target/module path) + pub fn encode_log_record( + &mut self, + log_record: &impl LogRecordView, + resource_bytes: &[u8], // @@@ Make super::OtlpProtoBytes, expecting ::ExportLogsRequest + scope_name: &str, + ) -> Result<()> { + match self.state { + EncoderState::Idle => { + // Start new batch with Resource and Scope + self.start_resource_logs(resource_bytes)?; + self.start_scope_logs(scope_name)?; + self.append_log_record(log_record)?; + } + + EncoderState::ResourceOpen => { + // Resource already open, start scope + self.start_scope_logs(scope_name)?; + self.append_log_record(log_record)?; + } + + EncoderState::ScopeOpen => { + if self.current_scope_name.as_deref() == Some(scope_name) { + // Same scope - just append LogRecord + self.append_log_record(log_record)?; + } else { + // Different scope - close current and start new + self.close_scope_logs()?; + self.start_scope_logs(scope_name)?; + self.append_log_record(log_record)?; + } + } + } + + Ok(()) + } + + /// Flush the encoder, closing all open messages and returning the accumulated OTLP bytes. + /// + /// After flushing, the encoder is reset and ready for new messages. + pub fn flush(&mut self) -> Bytes { + // Close any open messages + if self.state == EncoderState::ScopeOpen { + let _ = self.close_scope_logs(); + } + if self.state == EncoderState::ResourceOpen || self.state == EncoderState::ScopeOpen { + let _ = self.close_resource_logs(); + } + + // Take the bytes and reset the encoder + let (bytes, capacity) = self.buf.take_into_bytes(); + + // Reset state + self.state = EncoderState::Idle; + self.resource_logs_placeholder = None; + self.scope_logs_placeholder = None; + self.current_scope_name = None; + + // Ensure capacity is preserved for next use + self.buf.ensure_capacity(capacity); + + bytes + } + + // === Private state management methods === + + fn start_resource_logs(&mut self, resource_bytes: &[u8]) -> Result<()> { + // Encode LogsData.resource_logs field (tag 1, length-delimited) + self.buf + .encode_field_tag(LOGS_DATA_RESOURCE, wire_types::LEN); + + // Write 4-byte length placeholder + let placeholder = LengthPlaceholder::new(self.buf.len()); + encode_len_placeholder(&mut self.buf); + + // Copy pre-encoded Resource bytes (includes ResourceLogs.resource field) + self.buf.extend_from_slice(resource_bytes); + + // Update state + self.resource_logs_placeholder = Some(placeholder); + self.state = EncoderState::ResourceOpen; + + Ok(()) + } + + fn start_scope_logs(&mut self, scope_name: &str) -> Result<()> { + // Encode ResourceLogs.scope_logs field (tag 2, length-delimited) + self.buf + .encode_field_tag(RESOURCE_LOGS_SCOPE_LOGS, wire_types::LEN); + + // Write 4-byte length placeholder + let placeholder = LengthPlaceholder::new(self.buf.len()); + encode_len_placeholder(&mut self.buf); + + // Encode ScopeLogs.scope field (tag 1, InstrumentationScope message) + self.encode_instrumentation_scope(scope_name)?; + + // Update state + self.scope_logs_placeholder = Some(placeholder); + self.current_scope_name = Some(scope_name.to_string()); + self.state = EncoderState::ScopeOpen; + + Ok(()) + } + + fn append_log_record(&mut self, log_record: &impl LogRecordView) -> Result<()> { + // Encode ScopeLogs.log_records field (tag 2, length-delimited) + self.buf + .encode_field_tag(SCOPE_LOGS_LOG_RECORDS, wire_types::LEN); + + // Use 4-byte padding for LogRecord + let placeholder = LengthPlaceholder::new(self.buf.len()); + encode_len_placeholder(&mut self.buf); + + // Encode LogRecordView fields + encode_log_record_view(log_record, &mut self.buf)?; + + // Patch the length + placeholder.patch(&mut self.buf); + + Ok(()) + } + + fn close_scope_logs(&mut self) -> Result<()> { + if let Some(placeholder) = self.scope_logs_placeholder.take() { + placeholder.patch(&mut self.buf); + self.state = EncoderState::ResourceOpen; + self.current_scope_name = None; + } + Ok(()) + } + + fn close_resource_logs(&mut self) -> Result<()> { + if let Some(placeholder) = self.resource_logs_placeholder.take() { + placeholder.patch(&mut self.buf); + self.state = EncoderState::Idle; + } + Ok(()) + } + + /// Encode an InstrumentationScope with just the name field + fn encode_instrumentation_scope(&mut self, scope_name: &str) -> Result<()> { + use crate::proto::consts::field_num::common::INSTRUMENTATION_SCOPE_NAME; + + // Encode ScopeLogs.scope field (tag 1, length-delimited) + self.buf.encode_field_tag(SCOPE_LOG_SCOPE, wire_types::LEN); + let scope_placeholder = LengthPlaceholder::new(self.buf.len()); + encode_len_placeholder(&mut self.buf); + + // Encode InstrumentationScope.name field (tag 1, string) + self.buf.encode_string(INSTRUMENTATION_SCOPE_NAME, scope_name); + + // Patch InstrumentationScope length + scope_placeholder.patch(&mut self.buf); + + Ok(()) + } +} + +// === Helper functions for encoding LogRecordView === + +// TODO(consolidation): The OTAP batch encoder in `logs.rs` (~110 lines in encode_log_record()) +// duplicates the field encoding logic below. Since OTAP implements LogRecordView (via +// OtapLogRecordView in views/otap/logs.rs), we could refactor logs.rs to: +// 1. Keep its batching/sorting/cursor logic (OTAP-specific) +// 2. Delegate LogRecord field encoding to this function via the view trait +// This would eliminate ~150 lines of duplicated code across encode_log_record, encode_any_value, +// and encode_key_value, making the view-based encoder the canonical implementation for all +// LogRecord encoding. The view trait methods are #[inline] so there's no performance impact. +// Same opportunity exists for traces.rs and metrics.rs encoders. + +/// Encode all fields of a LogRecordView +fn encode_log_record_view(log_record: &impl LogRecordView, buf: &mut ProtoBuffer) -> Result<()> { + // time_unix_nano (field 1, fixed64) + if let Some(time) = log_record.time_unix_nano() { + buf.encode_field_tag(LOG_RECORD_TIME_UNIX_NANO, wire_types::FIXED64); + buf.extend_from_slice(&time.to_le_bytes()); + } + + // severity_number (field 2, varint) + if let Some(severity) = log_record.severity_number() { + buf.encode_field_tag(LOG_RECORD_SEVERITY_NUMBER, wire_types::VARINT); + buf.encode_varint(severity as u64); + } + + // severity_text (field 3, string) + if let Some(text) = log_record.severity_text() { + if !text.is_empty() { + // Convert &[u8] to &str for encode_string + if let Ok(text_str) = std::str::from_utf8(text) { + buf.encode_string(LOG_RECORD_SEVERITY_TEXT, text_str); + } + } + } + + // body (field 5, AnyValue) - encode from AnyValueView + if let Some(body) = log_record.body() { + encode_any_value_view_field(LOG_RECORD_BODY, &body, buf)?; + } + + // attributes (field 6, repeated KeyValue) - encode from AttributeView iterator + for attr in log_record.attributes() { + encode_attribute_view(LOG_RECORD_ATTRIBUTES, &attr, buf)?; + } + + // dropped_attributes_count (field 7, uint32) + let dropped = log_record.dropped_attributes_count(); + if dropped > 0 { + buf.encode_field_tag(LOG_RECORD_DROPPED_ATTRIBUTES_COUNT, wire_types::VARINT); + buf.encode_varint(dropped as u64); + } + + // flags (field 8, fixed32) + if let Some(flags) = log_record.flags() { + buf.encode_field_tag(LOG_RECORD_FLAGS, wire_types::FIXED32); + buf.extend_from_slice(&flags.to_le_bytes()); + } + + // trace_id (field 9, bytes) + if let Some(trace_id) = log_record.trace_id() { + buf.encode_bytes(LOG_RECORD_TRACE_ID, trace_id); + } + + // span_id (field 10, bytes) + if let Some(span_id) = log_record.span_id() { + buf.encode_bytes(LOG_RECORD_SPAN_ID, span_id); + } + + // observed_time_unix_nano (field 11, fixed64) + if let Some(observed_time) = log_record.observed_time_unix_nano() { + buf.encode_field_tag(LOG_RECORD_OBSERVED_TIME_UNIX_NANO, wire_types::FIXED64); + buf.extend_from_slice(&observed_time.to_le_bytes()); + } + + Ok(()) +} + +/// Encode an AttributeView as a length-delimited field +fn encode_attribute_view( + field_tag: u64, + attr: &impl crate::views::common::AttributeView, + buf: &mut ProtoBuffer, +) -> Result<()> { + use crate::proto::consts::field_num::common::*; + + // Start KeyValue message + buf.encode_field_tag(field_tag, wire_types::LEN); + let placeholder = LengthPlaceholder::new(buf.len()); + encode_len_placeholder(buf); + + // Encode key + let key = attr.key(); + if !key.is_empty() { + // Convert &[u8] to &str for encode_string + if let Ok(key_str) = std::str::from_utf8(key) { + buf.encode_string(KEY_VALUE_KEY, key_str); + } + } + + // Encode value (if present) + if let Some(value) = attr.value() { + encode_any_value_view_field(KEY_VALUE_VALUE, &value, buf)?; + } + + // Patch length + placeholder.patch(buf); + + Ok(()) +} + +/// Encode an AnyValueView as a length-delimited field +fn encode_any_value_view_field<'a>( + field_tag: u64, + value: &impl crate::views::common::AnyValueView<'a>, + buf: &mut ProtoBuffer, +) -> Result<()> { + buf.encode_field_tag(field_tag, wire_types::LEN); + let placeholder = LengthPlaceholder::new(buf.len()); + encode_len_placeholder(buf); + + encode_any_value_view_content(value, buf)?; + + placeholder.patch(buf); + Ok(()) +} + +/// Encode the content of an AnyValueView (without the outer field tag) +fn encode_any_value_view_content<'a>( + value: &impl crate::views::common::AnyValueView<'a>, + buf: &mut ProtoBuffer, +) -> Result<()> { + use crate::proto::consts::field_num::common::*; + use crate::views::common::ValueType; + + match value.value_type() { + ValueType::String => { + if let Some(s) = value.as_string() { + // Convert &[u8] to &str for encode_string + if let Ok(s_str) = std::str::from_utf8(s) { + buf.encode_string(ANY_VALUE_STRING_VALUE, s_str); + } + } + } + ValueType::Bool => { + if let Some(b) = value.as_bool() { + buf.encode_field_tag(ANY_VALUE_BOOL_VALUE, wire_types::VARINT); + buf.encode_varint(if b { 1 } else { 0 }); + } + } + ValueType::Int64 => { + if let Some(i) = value.as_int64() { + buf.encode_field_tag(ANY_VALUE_INT_VALUE, wire_types::VARINT); + buf.encode_varint(i as u64); + } + } + ValueType::Double => { + if let Some(d) = value.as_double() { + buf.encode_field_tag(ANY_VALUE_DOUBLE_VALUE, wire_types::FIXED64); + buf.extend_from_slice(&d.to_le_bytes()); + } + } + ValueType::Bytes => { + if let Some(bytes) = value.as_bytes() { + buf.encode_bytes(ANY_VALUE_BYTES_VALUE, bytes); + } + } + ValueType::Array => { + if let Some(mut arr_iter) = value.as_array() { + // Encode ArrayValue + buf.encode_field_tag(ANY_VALUE_ARRAY_VALUE, wire_types::LEN); + let placeholder = LengthPlaceholder::new(buf.len()); + encode_len_placeholder(buf); + + while let Some(val) = arr_iter.next() { + encode_any_value_view_field(ARRAY_VALUE_VALUES, &val, buf)?; + } + + placeholder.patch(buf); + } + } + ValueType::KeyValueList => { + if let Some(mut kvlist_iter) = value.as_kvlist() { + // Encode KeyValueList + buf.encode_field_tag(ANY_VALUE_KVLIST_VALUE, wire_types::LEN); + let placeholder = LengthPlaceholder::new(buf.len()); + encode_len_placeholder(buf); + + while let Some(kv) = kvlist_iter.next() { + encode_attribute_view(KEY_VALUE_LIST_VALUES, &kv, buf)?; + } + + placeholder.patch(buf); + } + } + ValueType::Empty => { + // Empty AnyValue - valid according to spec + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::proto::opentelemetry::common::v1::{ + AnyValue, KeyValue, any_value, + }; + use crate::proto::opentelemetry::resource::v1::Resource; + use crate::schema::{SpanId, TraceId}; + use crate::views::common::{AnyValueView, AttributeView, Str, ValueType}; + use crate::views::logs::LogRecordView; + + // Test helper: Simple LogRecordView implementation + struct SimpleLogRecord { + time_unix_nano: Option, + severity_number: Option, + severity_text: Option<&'static str>, + body: Option<&'static str>, + trace_id: Option, + span_id: Option, + } + + impl LogRecordView for SimpleLogRecord { + type Attribute<'a> + = SimpleAttribute + where + Self: 'a; + type AttributeIter<'a> + = std::iter::Empty> + where + Self: 'a; + type Body<'a> + = SimpleAnyValue + where + Self: 'a; + + fn time_unix_nano(&self) -> Option { + self.time_unix_nano + } + + fn observed_time_unix_nano(&self) -> Option { + self.time_unix_nano // same for tests + } + + fn severity_number(&self) -> Option { + self.severity_number + } + + fn severity_text(&self) -> Option> { + self.severity_text.map(|s| s.as_bytes()) + } + + fn body(&self) -> Option> { + self.body.map(|s| SimpleAnyValue::String(s)) + } + + fn attributes(&self) -> Self::AttributeIter<'_> { + std::iter::empty() + } + + fn dropped_attributes_count(&self) -> u32 { + 0 + } + + fn flags(&self) -> Option { + Some(0) + } + + fn trace_id(&self) -> Option<&TraceId> { + self.trace_id.as_ref() + } + + fn span_id(&self) -> Option<&SpanId> { + self.span_id.as_ref() + } + + fn event_name(&self) -> Option> { + None + } + } + + #[derive(Clone)] + enum SimpleAnyValue { + String(&'static str), + } + + impl<'a> AnyValueView<'a> for SimpleAnyValue { + type KeyValue = SimpleAttribute; + type ArrayIter<'arr> + = std::iter::Empty + where + Self: 'arr; + type KeyValueIter<'kv> + = SimpleAttribute + where + Self: 'kv; + + fn value_type(&self) -> ValueType { + match self { + SimpleAnyValue::String(_) => ValueType::String, + } + } + + fn as_string(&self) -> Option> { + match self { + SimpleAnyValue::String(s) => Some(s.as_bytes()), + } + } + + fn as_bool(&self) -> Option { + None + } + + fn as_int64(&self) -> Option { + None + } + + fn as_double(&self) -> Option { + None + } + + fn as_bytes(&self) -> Option<&[u8]> { + None + } + + fn as_array(&self) -> Option> { + None + } + + fn as_kvlist(&self) -> Option> { + None + } + } + + #[derive(Clone)] + struct SimpleAttribute; + + impl AttributeView for SimpleAttribute { + type Val<'val> + = SimpleAnyValue + where + Self: 'val; + + fn key(&self) -> Str<'_> { + "key".as_bytes() + } + + fn value(&self) -> Option> { + Some(SimpleAnyValue::String("value")) + } + } + + impl Iterator for SimpleAttribute { + type Item = Self; + + fn next(&mut self) -> Option { + None + } + } + + // Helper: Pre-encode a Resource as OtlpBytes + fn encode_resource_bytes(resource: &Resource) -> OtlpBytes { + use crate::proto::consts::field_num::resource::*; + let mut buf = ProtoBuffer::with_capacity(256); + + // Encode ResourceLogs.resource field (tag 1) + buf.encode_field_tag(1, wire_types::LEN); + let start = buf.len(); + encode_len_placeholder(&mut buf); + + // Encode attributes + for attr in &resource.attributes { + encode_attribute_proto(RESOURCE_ATTRIBUTES, attr, &mut buf); + } + + // Patch length + let content_len = buf.len() - start - 4; + patch_len_placeholder(&mut buf, 4, content_len, start); + + buf.into_bytes().to_vec() + } + + + + // Helper to encode protobuf KeyValue (for test helpers) + fn encode_attribute_proto(field_tag: u64, attr: &KeyValue, buf: &mut ProtoBuffer) { + use crate::proto::consts::field_num::common::*; + buf.encode_field_tag(field_tag, wire_types::LEN); + let start = buf.len(); + encode_len_placeholder(buf); + + if !attr.key.is_empty() { + buf.encode_string(KEY_VALUE_KEY, &attr.key); + } + + if let Some(ref value) = attr.value { + encode_any_value_proto(KEY_VALUE_VALUE, value, buf); + } + + let content_len = buf.len() - start - 4; + patch_len_placeholder(buf, 4, content_len, start); + } + + fn encode_any_value_proto(field_tag: u64, value: &AnyValue, buf: &mut ProtoBuffer) { + use crate::proto::consts::field_num::common::*; + buf.encode_field_tag(field_tag, wire_types::LEN); + let start = buf.len(); + encode_len_placeholder(buf); + + match &value.value { + Some(any_value::Value::StringValue(s)) => { + buf.encode_string(ANY_VALUE_STRING_VALUE, s); + } + _ => {} + } + + let content_len = buf.len() - start - 4; + patch_len_placeholder(buf, 4, content_len, start); + } + + #[test] + fn test_encoder_state_machine() { + let mut encoder = StatefulOtlpEncoder::new(1024); + + // Initial state + assert_eq!(encoder.state, EncoderState::Idle); + assert!(encoder.is_empty()); + + // Pre-encode resource + let resource = Resource::default(); + let resource_bytes = encode_resource_bytes(&resource); + let scope_name = "test_module"; + + // Simple log record + let log_record = SimpleLogRecord { + time_unix_nano: Some(1000), + severity_number: Some(9), + severity_text: Some("INFO"), + body: Some("test message"), + trace_id: None, + span_id: None, + }; + + encoder + .encode_log_record(&log_record, &resource_bytes, scope_name) + .unwrap(); + + // Should have data now + assert!(!encoder.is_empty()); + assert_eq!(encoder.state, EncoderState::ScopeOpen); + + // Flush should reset + let bytes = encoder.flush(); + assert!(!bytes.is_empty()); + assert_eq!(encoder.state, EncoderState::Idle); + } + + #[test] + fn test_batching_same_scope() { + let mut encoder = StatefulOtlpEncoder::new(1024); + + let resource = Resource::default(); + let resource_bytes = encode_resource_bytes(&resource); + let scope_name = "test_module"; + + // Encode three records with same scope + for i in 0..3 { + let log_record = SimpleLogRecord { + time_unix_nano: Some(i as u64), + severity_number: Some(9), + severity_text: Some("INFO"), + body: Some("test"), + trace_id: None, + span_id: None, + }; + encoder + .encode_log_record(&log_record, &resource_bytes, scope_name) + .unwrap(); + } + + // Should be in ScopeOpen state (not closed between records) + assert_eq!(encoder.state, EncoderState::ScopeOpen); + + let bytes = encoder.flush(); + assert!(!bytes.is_empty()); + } + + #[test] + fn test_different_scopes_close_and_reopen() { + let mut encoder = StatefulOtlpEncoder::new(4096); + + let resource = Resource::default(); + let resource_bytes = encode_resource_bytes(&resource); + + let scope1_name = "scope1"; + let scope2_name = "scope2"; + + let log_record = SimpleLogRecord { + time_unix_nano: Some(1000), + severity_number: Some(9), + severity_text: Some("INFO"), + body: Some("test"), + trace_id: None, + span_id: None, + }; + + // Encode with scope1 + encoder + .encode_log_record(&log_record, &resource_bytes, scope1_name) + .unwrap(); + assert_eq!(encoder.state, EncoderState::ScopeOpen); + + // Encode with scope2 - should close scope1 and start scope2 + encoder + .encode_log_record(&log_record, &resource_bytes, scope2_name) + .unwrap(); + assert_eq!(encoder.state, EncoderState::ScopeOpen); + + let bytes = encoder.flush(); + assert!(!bytes.is_empty()); + } +} diff --git a/rust/otap-dataflow/crates/pdata/src/testing/equiv/mod.rs b/rust/otap-dataflow/crates/pdata/src/testing/equiv/mod.rs index 72f6820b66..0e100e4a74 100644 --- a/rust/otap-dataflow/crates/pdata/src/testing/equiv/mod.rs +++ b/rust/otap-dataflow/crates/pdata/src/testing/equiv/mod.rs @@ -14,7 +14,7 @@ //! See the corresponding Golang implementation in go/pkg/otel/assert/equiv.go mod canonical; -mod logs; +pub mod logs; mod metrics; mod traces; diff --git a/rust/otap-dataflow/crates/telemetry/ARCHITECTURE.md b/rust/otap-dataflow/crates/telemetry/ARCHITECTURE.md new file mode 100644 index 0000000000..d2439dc771 --- /dev/null +++ b/rust/otap-dataflow/crates/telemetry/ARCHITECTURE.md @@ -0,0 +1,198 @@ +# Internal Telemetry Collection Architecture & Development Plan + +## Architecture + +The internal telemetry SDK is designed for the engine to safely +consume its own telemetry, and we intend for the self-hosted telemetry +pipeline to be the standard configuration for all OpenTelemetry +signals. + +Consuming self-generated telemetry presents a potential a kind of +feedback loop, situations where a telemetry pipeline creates pressure +on itself. We have designed for the OTAP dataflow engine to remain +reliable even with this kind of dependency on itself. + +## Internal telemetry receiver + +The Internal Telemetry Receiver or "ITR" is an OTAP-Dataflow receiver +component that produces telemetry from internal sources. An internal +telemetry pipeline consists of one or more ITR components and any of +the connected processor and exporter components reachable from ITR +source nodes. + +To begin with, every OTAP-Dataflow comonent is configured with an +internal telemetry SDK meant for primary instrumentation of that +component. Components are required to exclusively use the internal +telemetry SDK for self-diagnostics, as they are considered first party +in this exchange. + +The internal telemetry receiver is the SDK's counterpart, making it +second party as it is responsible for routing internal telemetry. The +ITR cannot use the internal telemetry SDK itself, making it an +invisible member of the pipeline. The ITR can be instrumented using +third-party instrumentation (e.g., `tracing`, `log` crates) provided +it can guarantee there is no potential for feedback (e.g., a single +`tracing::info()` statement at startup). + +## Pitfall avoidance + +The OTAP-Dataflow engine is safeguarded against many self-induced +telemetry pitfalls, as follows: + +- OTAP-Dataflow components reachable from an ITR cannot be configured + to send to an ITR node. This avoids a direct feedback cycle for + internal telemetry because the components cannot reach + themselves. For example, ITR and downstream components may be + configured for raw logging, no metrics, etc. +- ITR instances share access to one or more threads with associated + async runtime. They use these dedicated threads to isolate internal + telemetry processes that use third-party instrumentation. +- A thread-local variable is used to redirect third-party + instrumentation in dedicated internal telemetry threads. Internal + telemetry threads automatically configure a safe configuration + that drop third-party instrumentation instead of creating feedback. +- Components under observation (non-ITR components) have internal + telemetry events routed to queues in the OTAP-Dataflow pipeline on + the same core, this avoids blocking the engine. First-party + instrumentation will be handled on the CPU core that produced the + telemetry under normal circumstances. This isolates cores that are + able to process their own internal telemetry. +- Option to configure internal telemetry multiple ways, including the + no-op implementation, multi-threaded subscriber, routing to the + same-core ITR, and/or raw logging. + +## OTLP-bytes first + +As a key design decision, the OTAP-Dataflow internal telemetry data +path produces OTLP-bytes first. Because OTLP bytes is one of the +builtin `OtapPayload` formats, once we have the OTLP bytes encoding of +an event we are able to send to an OTAP-Dataflow pipeline. To obtain +these bytes, we will build a custom [Tokio `tracing` +Event][TOKIOEVENT] handler to produce OTLP bytes before dispatching to +an internal pipeline, used (in different configurations) for first and +third-party instrumentation. + +We use an intermediate representation in which the dynamic elements of +the `tracing` event are encoded while primtive fields and metadata +remain in structured form. These are encoded to the OTLP `LogRecord` +protocol. + +[TOKIOEVENT]: https://docs.rs/tracing/latest/tracing/struct.Event.html + +## Raw logging + +We support formatting events for direct printing to the console from +OTLP bytes. For the dynamic encoding, these are consumed using +`otap_df_pdata::views::logs::LogsDataView` (which is forbidden from +using Tokio `tracing`), our zero-copy accessor. We refer to this +most-basic form of printing to the console as raw logging because it +is a safe configuration early in the lifetime of a process. + +This configuration is meant for development purposes, it is likely to +introduce contention over the console. + +## Routing + +The two internal logs data paths are: + +- Third-party: Tokio `tracing` global subscriber: third-party log + events, instrumentation in code without access to an OTAP-Dataflow + `EffectHandler`. These are handled in a dedicated internal telemetry + thread. +- First-party: components with a local or shared `EffectHandler` use + dedicated macros (e.g., `otel_info!(effect, "interesting thing")`), + these use the configured internal telemetry SDK and for ordinary + components (not ITR-downstream) these are routed through the ITR the + same core. These are always non-blocking APIs, the internal SDK must + drop logs instead of blocking the pipeline. + +## Development plan + +Each of the items below is relatively small, estimated at 300-500 +lines of new code plus new tests. + +### LogRecord: Tokio tracing Event and Metadata to LogRecordView + +When we receive a Tokio tracing event whether through a +`tracing::info!` macro (or similar) or through a dedicated +`EffectHandler`-based API, the same happens: + +Create a `LogRecord`, a struct derived from `tracing::Event` and +`tracing::Metadata`, containing raw LogRecord fields extracted from +the tracing macro layer plus a fresh timestamp. Log record attributes +and the log event body are encoded as the "attributes and body bytes" +field of `LogRecord`, the other fields are copied. + +With this record, we can defer formatting or encoding the entire +record until later. We can: + +- For raw logging, format directly for the console +- Finish the full OTLP bytes encoding for the `LogRecord` +- Sort and filter before combining into a `LogsData`. + +### OTLP-bytes console logging handler + +We require a way to print OTLP bytes as human-readable log lines. We +cannot easily re-use the Tokio `tracing` format layer for this, +however we can use the `LogsDataView` trait with `RawLogsData` to +format human-readable text for the console directly from OTLP bytes. + +This OTLP-bytes-to-human-readable logic will be used to implement raw +logging. + +### Global logs collection thread + +An OTAP-Dataflow engine will run at least one global logs collection +thread. These threads receive encoded (OTLP bytes) log events from +various locations in the process. The global logs collection thread is +special because it sets a special anti-recursion bit in the +thread-local state to prevent logging in its own export path + +The global logs collection thread is configured as one (or more, if +needed) instances consuming logs from the global Tokio `tracing` +subscriber. In this thread, we'll configure the OpenTelemetry SDK or a +dedicated OTAP-Dataflow pipeline (by configuration) for logs export. + +Because global logs collection threads are used as a fallback for +`EffectHandler`-level logs and because third-party libraries generally +could call Tokio `tracing` APIs, we arrange to explicitly disallow +these threads from logging. The macros are disabled from executing. + +### Global and Per-core Event Router + +OTAP-Dataflow provides an option to route internal telemetry to a pipeline +in the same effect handler that produced the telemetry. When a component +logging API is used on the `EffectHandler` or when a tokio `tracing` event +occurs on the `EffectHandler` thread, it will be routed using thread-local +state so that event is immediately encoded and stored or flushed, without +blocking the effect handler. + +When a telemetry event is routed directly, as in this case and +`send_message()` succeeds, it means there was queue space to accept +the log record on the same core. When this fails, the configurable +telemetry router will support options to use global logs collection +thread, a raw logger, or do nothing (dropping the internal log +record). + +## Example configuration + +```yaml +service: + telemetry: + logs: + level: info + internal_collection: + enabled: true + + # Per-thread buffer + buffer_size_bytes: 65536 + + # Individual record size limit + max_record_bytes: 16384 + + # Bounded channel capacity + max_record_count: 10 + + # Timer-based flush interval + flush_interval: "1s" +``` diff --git a/rust/otap-dataflow/crates/telemetry/Cargo.toml b/rust/otap-dataflow/crates/telemetry/Cargo.toml index aae85d1f8a..696c12d8d0 100644 --- a/rust/otap-dataflow/crates/telemetry/Cargo.toml +++ b/rust/otap-dataflow/crates/telemetry/Cargo.toml @@ -20,6 +20,8 @@ unchecked-arithmetic = [] [dependencies] otap-df-config = { workspace = true } +otap-df-pdata = { workspace = true } +bytes = { workspace = true } flume = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } @@ -36,3 +38,4 @@ opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "metrics", "l opentelemetry-appender-tracing = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] } +prost = { workspace = true } diff --git a/rust/otap-dataflow/crates/telemetry/src/lib.rs b/rust/otap-dataflow/crates/telemetry/src/lib.rs index f28001364d..814a302138 100644 --- a/rust/otap-dataflow/crates/telemetry/src/lib.rs +++ b/rust/otap-dataflow/crates/telemetry/src/lib.rs @@ -41,6 +41,8 @@ pub mod opentelemetry_client; pub mod registry; pub mod reporter; pub mod semconv; +/// Integration with tokio-tracing for OTLP encoding +pub mod tracing_integration; // Re-export _private module from internal_events for macro usage. // This allows the otel_info!, otel_warn!, etc. macros to work in other crates diff --git a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/logger_provider.rs b/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/logger_provider.rs index d14dc84a39..3cede9fe78 100644 --- a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/logger_provider.rs +++ b/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/logger_provider.rs @@ -3,7 +3,6 @@ //! Configures the OpenTelemetry logger provider based on the provided configuration. -use opentelemetry_appender_tracing::layer; use opentelemetry_otlp::{Protocol, WithExportConfig}; use opentelemetry_sdk::{Resource, logs::SdkLoggerProvider}; use otap_df_config::pipeline::service::telemetry::{ @@ -17,10 +16,50 @@ use otap_df_config::pipeline::service::telemetry::{ metrics::readers::periodic::otlp::OtlpProtocol, }; use tracing::level_filters::LevelFilter; +use tracing::Level; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{EnvFilter, layer::SubscriberExt}; +use std::future::Future; +use std::io::{self, Write}; +use std::sync::{Arc, Mutex}; +use std::sync::mpsc; +use tokio_util::bytes::Bytes; use crate::error::Error; +use crate::tracing_integration::{ + OtlpTracingLayer, OtlpBytesFormattingLayer, OtlpBytesChannel +}; +use otap_df_pdata::otlp::stateful_encoder::StatefulOtlpEncoder; + +/// Writer that routes to stdout or stderr based on log level. +/// +/// This implements the MakeWriter trait to provide different writers +/// for different severity levels: +/// - TRACE, DEBUG, INFO → stdout +/// - WARN, ERROR → stderr +#[derive(Clone)] +struct LevelBasedWriter; + +impl LevelBasedWriter { + fn new() -> Self { + LevelBasedWriter + } +} + +impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for LevelBasedWriter { + type Writer = Box; + + fn make_writer(&'a self) -> Self::Writer { + Box::new(io::stdout()) + } + + fn make_writer_for(&'a self, meta: &tracing::Metadata<'_>) -> Self::Writer { + match *meta.level() { + Level::ERROR | Level::WARN => Box::new(io::stderr()), + _ => Box::new(io::stdout()), + } + } +} /// Provider for configuring OpenTelemetry Logger. pub struct LoggerProvider { @@ -96,18 +135,12 @@ impl LoggerProvider { EnvFilter::new(format!("{level},h2=off,hyper=off")) }); - // Formatting layer - let fmt_layer = tracing_subscriber::fmt::layer().with_thread_names(true); + // Set up our custom OTLP-based tracing (replaces OpenTelemetryTracingBridge) + // This uses synchronous OTLP encoding → formatting for console output + Self::init_console_tracing_with_filter(filter); - let sdk_layer = layer::OpenTelemetryTracingBridge::new(&sdk_logger_provider); - - // Try to initialize the global subscriber. In tests, this may fail if already set, - // which is acceptable as we're only validating the configuration works. - let _ = tracing_subscriber::registry() - .with(filter) - .with(fmt_layer) - .with(sdk_layer) - .try_init(); + // Note: We keep the sdk_logger_provider for applications that want to use + // the OpenTelemetry SDK directly (for OTLP/stdout exporters, not for tracing) Ok(LoggerProvider { sdk_logger_provider, @@ -120,6 +153,243 @@ impl LoggerProvider { (self.sdk_logger_provider, self.runtime) } + /// Initialize console tracing with our custom OTLP-based formatter. + /// + /// This sets up synchronous OTLP encoding → decoding → formatting: + /// - All tracing events are encoded to OTLP bytes + /// - Immediately decoded and formatted to console + /// - INFO/DEBUG/TRACE → stdout, WARN/ERROR → stderr + /// - Colorized output with ISO8601 timestamps and thread names + /// + /// This replaces the OpenTelemetryTracingBridge which would send events + /// through the OpenTelemetry SDK's batching/exporting pipeline. + fn init_console_tracing_with_filter(filter: EnvFilter) { + // Create a stateful encoder (shared across all events) + let encoder = Arc::new(Mutex::new(StatefulOtlpEncoder::new(4096))); + + // Create the formatter that will decode OTLP bytes and write to stdout/stderr + let formatter = Arc::new( + OtlpBytesFormattingLayer::new(LevelBasedWriter::new()) + .with_ansi(true) + .with_timestamp(true) + .with_level(true) + .with_target(true) + .with_event_name(false) // Don't show event_name by default (less noise) + .with_thread_names(true) // Show thread names like the original fmt layer + ); + + // Create the OTLP layer that captures events, encodes them, and formats synchronously + let encoder_clone = encoder.clone(); + let formatter_clone = formatter.clone(); + + let otlp_layer = OtlpTracingLayer::new(move |log_record| { + // Encode to OTLP bytes (synchronous) + let mut enc = encoder_clone.lock().unwrap(); + let resource_bytes = Vec::new(); + let target = log_record.target(); + + if let Ok(_) = enc.encode_log_record(&log_record, &resource_bytes, target) { + // Flush and get the bytes + let otlp_bytes = enc.flush(); + + // Format immediately (synchronous - no channel) + let _ = formatter_clone.format_otlp_bytes(otlp_bytes.as_ref()); + } + }); + + // Try to initialize the global subscriber. In tests, this may fail if already set, + // which is acceptable as we're only validating the configuration works. + let _ = tracing_subscriber::registry() + .with(otlp_layer) + .with(filter) + .try_init(); + } + + /// Initialize default console tracing for applications. + /// + /// This is a convenience method for applications that want simple console + /// logging without configuring the full OpenTelemetry pipeline. + /// + /// Uses the same OTLP-based architecture but with a default filter: + /// - INFO level for OTAP components + /// - WARN level for third-party libraries + /// - Respects RUST_LOG environment variable if set + pub fn init_default_console_tracing() { + let filter = EnvFilter::try_from_default_env() + .unwrap_or_else(|_| { + // Default: INFO level for df_engine components, WARN for third-party + EnvFilter::new("otap_df=info,warn") + }); + + Self::init_console_tracing_with_filter(filter); + } + + /// Initialize channel-based tracing for normal operation. + /// + /// This transitions from synchronous console logging to multi-threaded channel-based logging. + /// Call this after initial startup when the admin runtime is ready. + /// + /// This sets up: + /// - Global tracing subscriber that encodes to OTLP bytes and sends to channel + /// - Returns OtlpBytesChannel for spawning consumer task in admin runtime + /// + /// The global subscriber can block the caller when channel is full, providing backpressure. + /// This is acceptable for 3rd party instrumentation where threaded logging is the norm. + /// + /// # Arguments + /// * `channel_capacity` - Bounded channel capacity (e.g., 1000) + /// * `filter` - Environment filter for log levels + /// + /// # Returns + /// Returns OtlpBytesChannel - split it and spawn consumer task in admin runtime + /// + /// # Example + /// ```ignore + /// // IMPORTANT: Log the transition BEFORE init (goes to old synchronous subscriber) + /// tracing::info!( + /// mode = "channel-based", + /// capacity = 1000, + /// "Transitioning to multi-threaded channel-based logging for 3rd party instrumentation" + /// ); + /// + /// let channel = LoggerProvider::init_channel_based_tracing(1000, filter); + /// let receiver = channel.into_receiver(); + /// + /// // Spawn consumer in admin runtime (all subsequent events go here) + /// runtime.spawn(async move { + /// LoggerProvider::run_console_formatter_task(receiver).await; + /// }); + /// ``` + pub fn init_channel_based_tracing( + channel_capacity: usize, + filter: EnvFilter, + ) -> OtlpBytesChannel { + let channel = OtlpBytesChannel::new(channel_capacity); + let sender = channel.sender().clone(); + + // Create encoder (one per global subscriber) + let encoder = Arc::new(Mutex::new(StatefulOtlpEncoder::new(4096))); + + // Create the OTLP layer that captures events, encodes them, and sends to channel + let otlp_layer = OtlpTracingLayer::new(move |log_record| { + // Encode to OTLP bytes (synchronous) + let mut enc = encoder.lock().unwrap(); + let resource_bytes = Vec::new(); + let target = log_record.target(); + + if let Ok(_) = enc.encode_log_record(&log_record, &resource_bytes, target) { + // Flush and get the bytes + let otlp_bytes = enc.flush(); + + // Send to channel (can block caller if channel is full - provides backpressure) + let _ = sender.send(otlp_bytes); + } + }); + + // Initialize the global subscriber + let _ = tracing_subscriber::registry() + .with(otlp_layer) + .with(filter) + .try_init(); + + channel + } + + /// Run the console formatter task that consumes OTLP bytes from channel. + /// + /// This should be spawned as a task in the admin runtime: + /// ```ignore + /// runtime.spawn(async move { + /// LoggerProvider::run_console_formatter_task(receiver).await; + /// }); + /// ``` + /// + /// The task will run until the channel is closed (all senders dropped). + /// In production, this runs for the lifetime of the application. + /// + /// This provides single-threaded formatting in the admin runtime. + /// Use this for human-readable console output. + pub async fn run_console_formatter_task(receiver: mpsc::Receiver) { + let formatter = OtlpBytesFormattingLayer::new(LevelBasedWriter::new()) + .with_ansi(true) + .with_timestamp(true) + .with_level(true) + .with_target(true) + .with_event_name(false) + .with_thread_names(true); + + // Run in a blocking task since mpsc::Receiver::recv() is blocking + let _ = tokio::task::spawn_blocking(move || { + while let Ok(otlp_bytes) = receiver.recv() { + let _ = formatter.format_otlp_bytes(&otlp_bytes); + } + }).await; + } + + /// Run the OTLP bytes forwarder task that consumes from channel. + /// + /// This should be spawned as a task in the admin runtime: + /// ```ignore + /// runtime.spawn(async move { + /// LoggerProvider::run_otlp_forwarder_task(receiver, sender).await; + /// }); + /// ``` + /// + /// This forwards OTLP bytes to the internal telemetry receiver which bridges + /// to the OTAP pipeline. The internal receiver can then export via the + /// built-in OTLP exporter or any other OTAP exporter. + /// + /// # Arguments + /// * `receiver` - Channel receiver from global tracing subscriber + /// * `forward_fn` - Async function to forward OTLP bytes (e.g., to internal telemetry receiver) + pub async fn run_otlp_forwarder_task( + receiver: mpsc::Receiver, + mut forward_fn: F, + ) + where + F: FnMut(Bytes) -> Fut + Send + 'static, + Fut: Future> + Send, + { + // Run in a blocking task since mpsc::Receiver::recv() is blocking + let _ = tokio::task::spawn_blocking(move || { + while let Ok(otlp_bytes) = receiver.recv() { + // We need to block on the async forward_fn from within spawn_blocking + // This is acceptable since this is the dedicated logging task + let rt = tokio::runtime::Handle::current(); + let result = rt.block_on(forward_fn(otlp_bytes)); + if result.is_err() { + // If forwarding fails, we could fall back to stderr or drop + // For now, just continue (dropping the event) + eprintln!("Failed to forward OTLP bytes to telemetry receiver"); + } + } + }).await; + } + + // Note: Support for OpenTelemetry SDK exporters (run_otel_sdk_exporter_task) has been + // removed temporarily. It required decoding OTLP bytes back to SdkLogRecord, which had + // type compatibility issues. This can be added back in the future if needed. + + /// Initialize channel-based tracing with default filter. + /// + /// Convenience method that uses default filter (INFO for OTAP, WARN for third-party). + /// + /// # Arguments + /// * `channel_capacity` - Bounded channel capacity (e.g., 1000) + /// + /// # Returns + /// Returns OtlpBytesChannel - call `.into_receiver()` and spawn consumer task + pub fn init_default_channel_based_tracing( + channel_capacity: usize, + ) -> OtlpBytesChannel { + let filter = EnvFilter::try_from_default_env() + .unwrap_or_else(|_| { + EnvFilter::new("otap_df=info,warn") + }); + + Self::init_channel_based_tracing(channel_capacity, filter) + } + fn configure_log_processor( sdk_logger_builder: opentelemetry_sdk::logs::LoggerProviderBuilder, processor_config: &otap_df_config::pipeline::service::telemetry::logs::processors::LogProcessorConfig, @@ -264,6 +534,7 @@ mod tests { }, ), ], + internal_collection: Default::default(), }; let logger_provider = LoggerProvider::configure(resource, &logger_config, None)?; let (sdk_logger_provider, _) = logger_provider.into_parts(); @@ -292,6 +563,7 @@ mod tests { }, ), ], + internal_collection: Default::default(), }; let logger_provider = LoggerProvider::configure(resource, &logger_config, None)?; let (sdk_logger_provider, runtime_option) = logger_provider.into_parts(); @@ -311,6 +583,7 @@ mod tests { let logger_config = LogsConfig { level: LogLevel::default(), processors: vec![], + internal_collection: Default::default(), }; let logger_provider = LoggerProvider::configure(resource, &logger_config, None)?; let (sdk_logger_provider, _) = logger_provider.into_parts(); diff --git a/rust/otap-dataflow/crates/telemetry/src/tracing_integration/log_record.rs b/rust/otap-dataflow/crates/telemetry/src/tracing_integration/log_record.rs new file mode 100644 index 0000000000..9e8bea1f70 --- /dev/null +++ b/rust/otap-dataflow/crates/telemetry/src/tracing_integration/log_record.rs @@ -0,0 +1,479 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! LogRecordView implementation for tokio-tracing events. +//! +//! This module provides the bridge between tracing::Event and our OTLP stateful encoder, +//! allowing tracing events to be encoded directly without intermediate allocations. + +use otap_df_pdata::schema::{SpanId, TraceId}; +use otap_df_pdata::views::common::{AnyValueView, AttributeView, Str, ValueType}; +use otap_df_pdata::views::logs::LogRecordView; +use std::fmt; +use tracing::{Level, Metadata}; + +/// A LogRecordView implementation that wraps a tracing event. +/// +/// This provides zero-copy access to tracing event data, enabling direct encoding +/// to OTLP format via the stateful encoder. +/// +/// # Lifetime +/// The lifetime parameter `'a` represents the lifetime of the underlying tracing::Event +/// and all its associated data (field values, metadata, etc.). +pub struct TracingLogRecord { + /// The event name from the `name` field, if present + event_name: Option, + + /// The severity level from tracing + level: Level, + + /// Timestamp when the event occurred (nanoseconds since Unix epoch) + timestamp_nanos: u64, + + /// The target (typically module path) from tracing metadata + target: String, + + /// Collected attributes from the event's fields + attributes: Vec, + + /// Optional body/message for the log record (stored as TracingAnyValue) + body: Option, +} + +impl TracingLogRecord { + /// Creates a new TracingLogRecord from tracing event components. + /// + /// # Arguments + /// * `metadata` - The event's metadata (level, target, name with file:line) + /// * `attributes` - Key-value pairs extracted from event fields + /// * `timestamp_nanos` - Event timestamp in nanoseconds since Unix epoch + /// + /// Note: metadata.name() contains both the event location and file:line info, + /// e.g., "event src/main.rs:42", so we don't need to separately track file/line. + pub fn new( + metadata: &Metadata<'_>, + attributes: Vec, + timestamp_nanos: u64, + ) -> Self { + Self { + event_name: Some(metadata.name().to_string()), + level: *metadata.level(), + timestamp_nanos, + target: metadata.target().to_string(), + attributes, + body: None, // Can be populated from message field + } + } + + /// Sets the body/message for this log record. + pub fn with_body(mut self, body: String) -> Self { + self.body = Some(TracingAnyValue::Str(body)); + self + } + + /// Returns the target (typically module path) for this log record. + pub fn target(&self) -> &str { + &self.target + } + + /// Creates a TracingLogRecord with a custom event name (for span events). + pub fn new_with_event_name( + metadata: &Metadata<'_>, + attributes: Vec, + timestamp_nanos: u64, + event_name: String, + ) -> Self { + Self { + event_name: Some(event_name), + level: *metadata.level(), + timestamp_nanos, + target: metadata.target().to_string(), + attributes, + body: None, + } + } + + /// Creates a minimal TracingLogRecord for span end events. + pub fn new_span_end( + span_id: u64, + attributes: Vec, + timestamp_nanos: u64, + ) -> Self { + Self { + event_name: Some(format!("span.end (id:{})", span_id)), + level: Level::INFO, + timestamp_nanos, + target: "tracing::span".to_string(), + attributes, + body: None, + } + } +} + +impl LogRecordView for TracingLogRecord { + type Attribute<'att> = TracingAttributeView<'att> + where + Self: 'att; + + type AttributeIter<'att> = TracingAttributeIterator<'att> + where + Self: 'att; + + type Body<'bod> = TracingAnyValue + where + Self: 'bod; + + fn time_unix_nano(&self) -> Option { + Some(self.timestamp_nanos) + } + + fn observed_time_unix_nano(&self) -> Option { + // For tracing events, observed time = event time + Some(self.timestamp_nanos) + } + + fn severity_number(&self) -> Option { + // Map tracing Level to OTLP severity numbers + // https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber + Some(match self.level { + Level::TRACE => 1, // TRACE + Level::DEBUG => 5, // DEBUG + Level::INFO => 9, // INFO + Level::WARN => 13, // WARN + Level::ERROR => 17, // ERROR + }) + } + + fn severity_text(&self) -> Option> { + Some(self.level.as_str().as_bytes()) + } + + fn body(&self) -> Option> { + self.body.clone() + } + + fn attributes(&self) -> Self::AttributeIter<'_> { + TracingAttributeIterator { + inner: self.attributes.iter(), + } + } + + fn dropped_attributes_count(&self) -> u32 { + 0 // We don't drop attributes in this implementation + } + + fn flags(&self) -> Option { + None // No flags for now + } + + fn trace_id(&self) -> Option<&TraceId> { + None // TODO: Extract from tracing span context when available + } + + fn span_id(&self) -> Option<&SpanId> { + None // TODO: Extract from tracing span context when available + } + + fn event_name(&self) -> Option> { + self.event_name.as_ref().map(|s| s.as_bytes()) + } +} + +/// Represents an attribute (key-value pair) from a tracing event. +/// +/// This wraps tracing field data into a structure compatible with OTLP encoding. +#[derive(Debug, Clone)] +pub struct TracingAttribute { + /// The attribute key + pub key: String, + /// The attribute value + pub value: TracingAnyValue, +} + +/// Wrapper for TracingAttribute that implements AttributeView +pub struct TracingAttributeView<'a> { + attribute: &'a TracingAttribute, +} + +impl<'a> AttributeView for TracingAttributeView<'a> { + type Val<'val> = TracingAnyValue + where + Self: 'val; + + fn key(&self) -> Str<'_> { + self.attribute.key.as_bytes() + } + + fn value(&self) -> Option> { + Some(self.attribute.value.clone()) + } +} + +/// Iterator wrapper for TracingAttribute slice +pub struct TracingAttributeIterator<'a> { + inner: std::slice::Iter<'a, TracingAttribute>, +} + +impl<'a> Iterator for TracingAttributeIterator<'a> { + type Item = TracingAttributeView<'a>; + + fn next(&mut self) -> Option { + self.inner.next().map(|attr| TracingAttributeView { attribute: attr }) + } +} + +/// Represents a value from a tracing event field. +/// +/// This mirrors OTLP's AnyValue type system, supporting full structural fidelity +/// for nested data from tracing events (arrays, maps, etc.). +#[derive(Debug, Clone)] +pub enum TracingAnyValue { + /// String value + Str(String), + /// Integer value (i64) + Int(i64), + /// Boolean value + Bool(bool), + /// Double-precision floating point value + Double(f64), + /// Bytes value + Bytes(Vec), + /// Array of values + Array(Vec), + /// Key-value list (like a map/object) + KeyValueList(Vec), +} + +/// Iterator for nested KeyValueList attributes +pub struct KeyValueListIterator { + inner: std::vec::IntoIter, +} + +impl Iterator for KeyValueListIterator { + type Item = TracingAttributeOwned; + + fn next(&mut self) -> Option { + self.inner.next().map(|attr| TracingAttributeOwned { attribute: attr }) + } +} + +/// Owned wrapper for TracingAttribute that implements AttributeView +pub struct TracingAttributeOwned { + attribute: TracingAttribute, +} + +impl AttributeView for TracingAttributeOwned { + type Val<'val> = TracingAnyValue + where + Self: 'val; + + fn key(&self) -> Str<'_> { + self.attribute.key.as_bytes() + } + + fn value(&self) -> Option> { + Some(self.attribute.value.clone()) + } +} + +/// Iterator for array values +pub struct ArrayIterator { + inner: std::vec::IntoIter, +} + +impl Iterator for ArrayIterator { + type Item = TracingAnyValue; + + fn next(&mut self) -> Option { + self.inner.next() + } +} + +impl<'a> AnyValueView<'a> for TracingAnyValue { + type KeyValue = TracingAttributeOwned; + type ArrayIter<'arr> = ArrayIterator + where + Self: 'arr; + type KeyValueIter<'kv> = KeyValueListIterator + where + Self: 'kv; + + fn value_type(&self) -> ValueType { + match self { + TracingAnyValue::Str(_) => ValueType::String, + TracingAnyValue::Int(_) => ValueType::Int64, + TracingAnyValue::Bool(_) => ValueType::Bool, + TracingAnyValue::Double(_) => ValueType::Double, + TracingAnyValue::Bytes(_) => ValueType::Bytes, + TracingAnyValue::Array(_) => ValueType::Array, + TracingAnyValue::KeyValueList(_) => ValueType::KeyValueList, + } + } + + fn as_string(&self) -> Option> { + match self { + TracingAnyValue::Str(s) => Some(s.as_bytes()), + _ => None, + } + } + + fn as_bool(&self) -> Option { + match self { + TracingAnyValue::Bool(b) => Some(*b), + _ => None, + } + } + + fn as_int64(&self) -> Option { + match self { + TracingAnyValue::Int(i) => Some(*i), + _ => None, + } + } + + fn as_double(&self) -> Option { + match self { + TracingAnyValue::Double(d) => Some(*d), + _ => None, + } + } + + fn as_bytes(&self) -> Option<&[u8]> { + match self { + TracingAnyValue::Bytes(b) => Some(b.as_slice()), + _ => None, + } + } + + fn as_array(&self) -> Option> { + match self { + TracingAnyValue::Array(arr) => Some(ArrayIterator { + inner: arr.clone().into_iter(), + }), + _ => None, + } + } + + fn as_kvlist(&self) -> Option> { + match self { + TracingAnyValue::KeyValueList(kvs) => Some(KeyValueListIterator { + inner: kvs.clone().into_iter(), + }), + _ => None, + } + } +} + +// Implement Display for easier debugging +impl fmt::Display for TracingAnyValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TracingAnyValue::Str(s) => write!(f, "{}", s), + TracingAnyValue::Int(i) => write!(f, "{}", i), + TracingAnyValue::Bool(b) => write!(f, "{}", b), + TracingAnyValue::Double(d) => write!(f, "{}", d), + TracingAnyValue::Bytes(b) => write!(f, "{:?}", b), + TracingAnyValue::Array(arr) => { + write!(f, "[")?; + for (i, v) in arr.iter().enumerate() { + if i > 0 { + write!(f, ", ")?; + } + write!(f, "{}", v)?; + } + write!(f, "]") + } + TracingAnyValue::KeyValueList(kvs) => { + write!(f, "{{")?; + for (i, kv) in kvs.iter().enumerate() { + if i > 0 { + write!(f, ", ")?; + } + write!(f, "{}: {}", kv.key, kv.value)?; + } + write!(f, "}}") + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_tracing_log_record_creation() { + // Create a mock metadata (in real usage this comes from tracing) + let _level = Level::INFO; + + let _attributes = vec![ + TracingAttribute { + key: "key1".to_string(), + value: TracingAnyValue::Str("value1".to_string()), + }, + TracingAttribute { + key: "count".to_string(), + value: TracingAnyValue::Int(42), + }, + ]; + + // Note: In real usage, metadata comes from tracing::Event + // For this test, we'll test the TracingLogRecord structure directly + let _timestamp = 1234567890000000000u64; + + // Test basic construction and access + let key1 = "key1".to_string(); + let value1 = TracingAnyValue::Str("value1".to_string()); + let attr = TracingAttribute { key: key1, value: value1 }; + + assert_eq!(attr.key, "key1"); + match &attr.value { + TracingAnyValue::Str(s) => assert_eq!(s, "value1"), + _ => panic!("Expected string value"), + } + } + + #[test] + fn test_severity_mapping() { + // Test that tracing levels map correctly to OTLP severity numbers + let levels_and_numbers = [ + (Level::TRACE, 1), + (Level::DEBUG, 5), + (Level::INFO, 9), + (Level::WARN, 13), + (Level::ERROR, 17), + ]; + + for (level, expected_number) in levels_and_numbers { + let severity_number = match level { + Level::TRACE => 1, + Level::DEBUG => 5, + Level::INFO => 9, + Level::WARN => 13, + Level::ERROR => 17, + }; + assert_eq!(severity_number, expected_number); + } + } + + #[test] + fn test_any_value_types() { + use otap_df_pdata::views::common::AnyValueView; + + let str_val = TracingAnyValue::Str("test".to_string()); + assert!(str_val.as_string().is_some()); + assert!(str_val.as_int64().is_none()); + + let int_val = TracingAnyValue::Int(123); + assert!(int_val.as_int64().is_some()); + assert_eq!(int_val.as_int64().unwrap(), 123); + + let bool_val = TracingAnyValue::Bool(true); + assert!(bool_val.as_bool().is_some()); + assert_eq!(bool_val.as_bool().unwrap(), true); + + let double_val = TracingAnyValue::Double(3.14); + assert!(double_val.as_double().is_some()); + assert!((double_val.as_double().unwrap() - 3.14).abs() < f64::EPSILON); + } +} diff --git a/rust/otap-dataflow/crates/telemetry/src/tracing_integration/mod.rs b/rust/otap-dataflow/crates/telemetry/src/tracing_integration/mod.rs new file mode 100644 index 0000000000..aa3d64a3be --- /dev/null +++ b/rust/otap-dataflow/crates/telemetry/src/tracing_integration/mod.rs @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Integration between tokio-tracing and OTLP stateful encoder. +//! +//! This module bridges tokio-tracing events into our stateful OTLP encoder by implementing +//! the `LogRecordView` trait for tracing events. This enables both: +//! 1. Using tokio macros like `info!`, `warn!`, etc. in the engine +//! 2. Capturing 3rd-party libraries' tracing events through our pipeline +//! +//! The integration consists of: +//! - `TracingLogRecord`: A `LogRecordView` impl wrapping tracing::Event +//! - `OtlpTracingLayer`: A tracing subscriber layer that captures events +//! - Helper functions to extract data from tracing events + +pub mod log_record; +pub mod subscriber; +pub mod otlp_bytes_formatter; +pub mod otlp_bytes_channel; + +pub use log_record::{TracingAttribute, TracingAnyValue, TracingLogRecord}; +pub use subscriber::OtlpTracingLayer; +pub use otlp_bytes_formatter::{OtlpBytesFormattingLayer, FormatError}; +pub use otlp_bytes_channel::{OtlpBytesChannel, OtlpBytesConsumerConfig, OtlpBytesChannelStats}; diff --git a/rust/otap-dataflow/crates/telemetry/src/tracing_integration/otlp_bytes_channel.rs b/rust/otap-dataflow/crates/telemetry/src/tracing_integration/otlp_bytes_channel.rs new file mode 100644 index 0000000000..f14a8df128 --- /dev/null +++ b/rust/otap-dataflow/crates/telemetry/src/tracing_integration/otlp_bytes_channel.rs @@ -0,0 +1,157 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! OTLP bytes channel abstraction for multi-threaded telemetry. +//! +//! This provides a common pattern used in multiple places: +//! - Admin runtime: 3rd party logging via global tracing subscriber +//! - Internal telemetry receiver: Component logging bridge to OTAP pipeline +//! - Thread-per-core: Per-thread logging with dedicated channels +//! +//! Architecture: +//! ```text +//! Producer(s) → mpsc::Sender → Channel → mpsc::Receiver → Consumer +//! ↓ +//! Console | OTLP | Custom handler +//! ``` + +use bytes::Bytes; +use std::sync::mpsc; + +/// Configuration for how to consume OTLP bytes from the channel. +/// +/// All 3rd party logging goes through our custom subscriber → OTLP bytes → channel. +/// This enum determines how those bytes are consumed in the admin runtime: +/// +/// - **Console**: Human-readable formatting (our builtin formatter) +/// - **InternalReceiver**: Forward to OTAP pipeline (our builtin OTLP path) +/// - **OtelSdkExporter**: Use any OpenTelemetry SDK exporter (stdout, OTLP, custom) +/// +/// This unified architecture means: +/// 1. ALL 3rd party logs use the same channel-based path +/// 2. No need for OpenTelemetryTracingBridge (we decode OTLP → OTel format if needed) +/// 3. Flexible backend choice while keeping single-threaded consumption +#[derive(Debug, Clone)] +pub enum OtlpBytesConsumerConfig { + /// Format and write to console (stdout/stderr based on level). + /// Uses our builtin formatter for human-readable output. + Console { + /// Enable ANSI color codes + ansi: bool, + /// Include ISO8601 timestamps + timestamp: bool, + /// Include log level (INFO, WARN, etc.) + level: bool, + /// Include target/scope name + target: bool, + /// Include event name field + event_name: bool, + /// Include thread names + thread_names: bool, + }, + + /// Forward to internal telemetry receiver (bridges to OTAP pipeline). + /// Uses our builtin OTLP exporter to send to the internal receiver, + /// which then goes through the OTAP pipeline for processing/export. + InternalReceiver { + // Future: configuration for the internal receiver + }, + + /// Use an OpenTelemetry SDK exporter. + /// OTLP bytes are decoded to OpenTelemetry LogData and passed to the SDK exporter. + /// This allows using any OTel SDK exporter (stdout, OTLP, custom) while keeping + /// our unified channel-based architecture. + OtelSdkExporter { + /// Exporter type identifier (e.g., "stdout", "otlp-grpc", "otlp-http") + exporter_type: String, + /// Configuration for the specific exporter (JSON or similar) + config: std::collections::HashMap, + }, +} + +impl OtlpBytesConsumerConfig { + /// Create default console configuration (matches current behavior) + pub fn default_console() -> Self { + Self::Console { + ansi: true, + timestamp: true, + level: true, + target: true, + event_name: false, + thread_names: true, + } + } +} + +/// OTLP bytes channel for single-producer, single-consumer telemetry. +/// +/// This encapsulates the mpsc channel pattern used throughout the telemetry system. +/// Multiple producers can share the sender (wrapped in Arc), but there's typically +/// one consumer task per channel. +pub struct OtlpBytesChannel { + sender: mpsc::SyncSender, + receiver: mpsc::Receiver, +} + +impl OtlpBytesChannel { + /// Create a new OTLP bytes channel with bounded capacity. + /// + /// # Arguments + /// * `capacity` - Maximum number of OTLP byte buffers to queue + /// + /// When the channel is full, senders will block until space is available. + /// This provides backpressure. + pub fn new(capacity: usize) -> Self { + let (sender, receiver) = mpsc::sync_channel(capacity); + Self { sender, receiver } + } + + /// Split into sender and receiver parts. + /// + /// The sender can be cloned and shared across multiple producers. + /// The receiver should be moved to a single consumer task. + pub fn split(self) -> (mpsc::SyncSender, mpsc::Receiver) { + (self.sender, self.receiver) + } + + /// Get a reference to the sender (for cloning). + pub fn sender(&self) -> &mpsc::SyncSender { + &self.sender + } + + /// Take the receiver (consumes self). + pub fn into_receiver(self) -> mpsc::Receiver { + self.receiver + } +} + +/// Statistics about OTLP bytes channel consumption. +#[derive(Debug, Default, Clone)] +pub struct OtlpBytesChannelStats { + /// Total number of OTLP byte buffers received + pub buffers_received: u64, + + /// Total bytes processed + pub bytes_processed: u64, + + /// Number of format/forward errors + pub errors: u64, +} + +impl OtlpBytesChannelStats { + /// Create new statistics tracker. + pub fn new() -> Self { + Self::default() + } + + /// Record a successfully processed buffer. + pub fn record_buffer(&mut self, size: usize) { + self.buffers_received += 1; + self.bytes_processed += size as u64; + } + + /// Record an error during processing. + pub fn record_error(&mut self) { + self.errors += 1; + } +} diff --git a/rust/otap-dataflow/crates/telemetry/src/tracing_integration/otlp_bytes_formatter.rs b/rust/otap-dataflow/crates/telemetry/src/tracing_integration/otlp_bytes_formatter.rs new file mode 100644 index 0000000000..a44011af73 --- /dev/null +++ b/rust/otap-dataflow/crates/telemetry/src/tracing_integration/otlp_bytes_formatter.rs @@ -0,0 +1,470 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! OTLP bytes formatting layer - decodes OTLP bytes back to human-readable output. +//! +//! This layer provides a bridge between OTLP-encoded telemetry and human-readable +//! console output. The architecture is: +//! +//! ```text +//! tracing::info!() → OtlpTracingLayer → encode to OTLP bytes +//! ↓ +//! OtlpBytesFormattingLayer → decode OTLP bytes +//! ↓ ↓ +//! construct LogsDataView → format human-readable +//! ``` +//! +//! This approach: +//! - Removes dependency on opentelemetry crates for formatting +//! - Preserves complete structural fidelity (OTLP is lossless) +//! - Enables future async formatting in separate thread +//! - Allows colorized, customizable output +//! +//! # Example +//! +//! ```ignore +//! use tracing_subscriber::prelude::*; +//! use otap_df_telemetry::tracing_integration::{OtlpTracingLayer, OtlpBytesFormattingLayer}; +//! +//! // Encode events to OTLP bytes +//! let (tx, rx) = std::sync::mpsc::channel(); +//! let otlp_layer = OtlpTracingLayer::new(move |log_record| { +//! // encode to OTLP bytes and send via channel +//! tx.send(bytes).unwrap(); +//! }); +//! +//! // Format OTLP bytes for human output +//! let fmt_layer = OtlpBytesFormattingLayer::new(rx); +//! +//! tracing_subscriber::registry() +//! .with(otlp_layer) +//! .with(fmt_layer) +//! .init(); +//! ``` + +use otap_df_pdata::views::logs::{LogRecordView, LogsDataView, ResourceLogsView, ScopeLogsView}; +use otap_df_pdata::views::otlp::bytes::logs::RawLogsData; +use otap_df_pdata::views::common::{AnyValueView, AttributeView, InstrumentationScopeView}; +use std::fmt::Write as FmtWrite; +use std::io::{self, Write as IoWrite}; +use std::time::UNIX_EPOCH; +use tracing_subscriber::fmt::MakeWriter; + +/// A tracing-subscriber layer that formats OTLP-encoded bytes for human-readable output. +/// +/// This layer doesn't directly subscribe to tracing events. Instead, it receives +/// OTLP-encoded bytes (from OtlpTracingLayer), decodes them, and formats them +/// for console output. +/// +/// # Type Parameters +/// - `W`: Writer type for output (e.g., stdout, file) +pub struct OtlpBytesFormattingLayer +where + W: for<'writer> MakeWriter<'writer> + 'static, +{ + /// Writer factory for output + make_writer: W, + /// Whether to use ANSI colors + with_ansi: bool, + /// Whether to include timestamps + with_timestamp: bool, + /// Whether to include level + with_level: bool, + /// Whether to include target (module path/scope name) + with_target: bool, + /// Whether to include event_name + with_event_name: bool, + /// Whether to include thread names + with_thread_names: bool, +} + +impl OtlpBytesFormattingLayer +where + W: for<'writer> MakeWriter<'writer> + 'static, +{ + /// Creates a new OtlpBytesFormattingLayer with default settings. + /// + /// Default format matches tokio's: timestamp, level, target, event_name, message, attributes + /// + /// # Arguments + /// * `make_writer` - Factory for creating writers (e.g., `std::io::stdout`) + pub fn new(make_writer: W) -> Self { + Self { + make_writer, + with_ansi: true, + with_timestamp: true, + with_level: true, + with_target: true, + with_event_name: true, + with_thread_names: true, + } + } + + /// Sets whether to use ANSI color codes. + pub fn with_ansi(mut self, ansi: bool) -> Self { + self.with_ansi = ansi; + self + } + + /// Sets whether to include timestamps. + pub fn with_timestamp(mut self, timestamp: bool) -> Self { + self.with_timestamp = timestamp; + self + } + + /// Sets whether to include log level. + pub fn with_level(mut self, level: bool) -> Self { + self.with_level = level; + self + } + + /// Sets whether to include target (scope name/module path). + pub fn with_target(mut self, target: bool) -> Self { + self.with_target = target; + self + } + + /// Sets whether to include event_name. + pub fn with_event_name(mut self, event_name: bool) -> Self { + self.with_event_name = event_name; + self + } + + /// Sets whether to include thread names. + pub fn with_thread_names(mut self, thread_names: bool) -> Self { + self.with_thread_names = thread_names; + self + } + + /// Formats OTLP-encoded bytes to human-readable output. + /// + /// This is the main entry point for formatting. Call this method when you + /// receive OTLP bytes from the encoding layer. + pub fn format_otlp_bytes(&self, otlp_bytes: &[u8]) -> Result<(), FormatError> { + // Construct LogsDataView from OTLP bytes (zero-copy) + let logs_view = RawLogsData::new(otlp_bytes); + + // Get writer + let mut writer = self.make_writer.make_writer(); + + // Iterate through the logs data structure + for resource_logs in logs_view.resources() { + for scope_logs in resource_logs.scopes() { + // Extract scope name (target) once for all records + let scope_name = if let Some(scope) = scope_logs.scope() { + if let Some(name) = scope.name() { + Some(String::from_utf8_lossy(name).to_string()) + } else { + None + } + } else { + None + }; + + for log_record in scope_logs.log_records() { + self.format_log_record(&log_record, scope_name.as_deref(), &mut writer)?; + } + } + } + + Ok(()) + } + + /// Formats a single log record. + /// + /// Format: `timestamp LEVEL target{::event_name}: message key=value` + /// Example: `2024-12-18T10:30:45.123456Z INFO app::server{listen}: Server started port=8080` + fn format_log_record( + &self, + log_record: &L, + scope_name: Option<&str>, + writer: &mut impl IoWrite, + ) -> Result<(), FormatError> { + let mut buffer = String::new(); + + // Timestamp - ISO8601 format like tokio + if self.with_timestamp { + if let Some(ts_nanos) = log_record.time_unix_nano() { + let timestamp = format_iso8601_timestamp(ts_nanos); + write!(&mut buffer, "{} ", timestamp)?; + } + } + + // Level with colors and padding + if self.with_level { + if let Some(severity) = log_record.severity_number() { + let level_str = severity_to_level_str(severity); + if self.with_ansi { + let colored = colorize_level(level_str); + write!(&mut buffer, "{:5} ", colored)?; + } else { + write!(&mut buffer, "{:5} ", level_str)?; + } + } + } + + // Thread name + if self.with_thread_names { + let thread_name = std::thread::current().name() + .unwrap_or("") + .to_string(); + write!(&mut buffer, "{}: ", thread_name)?; + } + + // Target (scope name / module path) + if self.with_target { + if let Some(target) = scope_name { + write!(&mut buffer, "{}", target)?; + + // Event name (if configured and present) + if self.with_event_name { + if let Some(event_name_bytes) = log_record.event_name() { + if let Ok(event_name) = std::str::from_utf8(event_name_bytes) { + // Format like tokio: target{event_name} + write!(&mut buffer, "{{{}}}", event_name)?; + } + } + } + + write!(&mut buffer, ": ")?; + } + } + + // Body/message + if let Some(body) = log_record.body() { + write!(&mut buffer, "{}", format_any_value(&body))?; + } + + // Attributes (key=value pairs) + let mut first_attr = true; + for attr in log_record.attributes() { + let key_str = String::from_utf8_lossy(attr.key()); + if let Some(value) = attr.value() { + if first_attr { + write!(&mut buffer, " ")?; + first_attr = false; + } else { + write!(&mut buffer, " ")?; + } + write!(&mut buffer, "{}={}", key_str, format_any_value(&value))?; + } + } + + // Write newline + writeln!(&mut buffer)?; + + // Write to output + writer.write_all(buffer.as_bytes())?; + writer.flush()?; + + Ok(()) + } +} + +/// Format a unix timestamp (nanoseconds) as ISO8601. +/// +/// Format: `2024-12-18T10:30:45.123456Z` +fn format_iso8601_timestamp(nanos: u64) -> String { + let secs = nanos / 1_000_000_000; + let subsec_nanos = (nanos % 1_000_000_000) as u32; + + // Convert to SystemTime + let duration = std::time::Duration::new(secs, subsec_nanos); + let system_time = UNIX_EPOCH + duration; + + // Get seconds and subseconds for formatting + let since_epoch = system_time.duration_since(UNIX_EPOCH).unwrap(); + let total_secs = since_epoch.as_secs(); + let micros = subsec_nanos / 1000; + + // Calculate date/time components + let days_since_epoch = total_secs / 86400; + let secs_today = total_secs % 86400; + + let hours = secs_today / 3600; + let minutes = (secs_today % 3600) / 60; + let seconds = secs_today % 60; + + // Simple epoch-based date calculation (not perfect but good enough) + let year = 1970 + (days_since_epoch / 365); + let day_of_year = days_since_epoch % 365; + let month = (day_of_year / 30) + 1; + let day = (day_of_year % 30) + 1; + + format!( + "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:06}Z", + year, month, day, hours, minutes, seconds, micros + ) +} + +/// Convert OTLP severity number to level string. +fn severity_to_level_str(severity: i32) -> &'static str { + match severity { + 1..=4 => "TRACE", + 5..=8 => "DEBUG", + 9..=12 => "INFO", + 13..=16 => "WARN", + 17..=24 => "ERROR", + _ => "UNKNOWN", + } +} + +/// Colorize level string with ANSI codes. +fn colorize_level(level: &str) -> String { + match level { + "TRACE" => format!("\x1b[35m{}\x1b[0m", level), // Magenta + "DEBUG" => format!("\x1b[34m{}\x1b[0m", level), // Blue + "INFO" => format!("\x1b[32m{}\x1b[0m", level), // Green + "WARN" => format!("\x1b[33m{}\x1b[0m", level), // Yellow + "ERROR" => format!("\x1b[31m{}\x1b[0m", level), // Red + _ => level.to_string(), + } +} + +/// Format an AnyValue for display. +fn format_any_value<'a>(value: &impl AnyValueView<'a>) -> String { + use otap_df_pdata::views::common::ValueType; + + match value.value_type() { + ValueType::String => { + if let Some(s) = value.as_string() { + String::from_utf8_lossy(s).to_string() + } else { + "".to_string() + } + } + ValueType::Int64 => { + if let Some(i) = value.as_int64() { + i.to_string() + } else { + "".to_string() + } + } + ValueType::Bool => { + if let Some(b) = value.as_bool() { + b.to_string() + } else { + "".to_string() + } + } + ValueType::Double => { + if let Some(d) = value.as_double() { + format!("{:.6}", d) + } else { + "".to_string() + } + } + ValueType::Bytes => { + if let Some(bytes) = value.as_bytes() { + format!("{:?}", bytes) + } else { + "".to_string() + } + } + ValueType::Array => { + if let Some(array_iter) = value.as_array() { + let mut parts = Vec::new(); + for item in array_iter { + parts.push(format_any_value(&item)); + } + format!("[{}]", parts.join(", ")) + } else { + "[]".to_string() + } + } + ValueType::KeyValueList => { + if let Some(kvlist_iter) = value.as_kvlist() { + let mut parts = Vec::new(); + for kv in kvlist_iter { + let key_str = String::from_utf8_lossy(kv.key()).to_string(); + if let Some(val) = kv.value() { + parts.push(format!("{}={}", key_str, format_any_value(&val))); + } + } + format!("{{{}}}", parts.join(", ")) + } else { + "{}".to_string() + } + } + ValueType::Empty => "".to_string(), + } +} + +/// Error type for formatting operations. +#[derive(Debug)] +pub enum FormatError { + /// I/O error + Io(io::Error), + /// Format error + Fmt(std::fmt::Error), +} + +impl From for FormatError { + fn from(err: io::Error) -> Self { + FormatError::Io(err) + } +} + +impl From for FormatError { + fn from(err: std::fmt::Error) -> Self { + FormatError::Fmt(err) + } +} + +impl std::fmt::Display for FormatError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + FormatError::Io(e) => write!(f, "I/O error: {}", e), + FormatError::Fmt(e) => write!(f, "Format error: {}", e), + } + } +} + +impl std::error::Error for FormatError {} + +// Note: This layer doesn't implement Layer trait because it doesn't subscribe +// to tracing events directly. It receives OTLP bytes through a separate channel +// or callback mechanism. See examples for typical usage patterns. + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + + /// Test writer that captures output + struct TestWriter { + buffer: Arc>>, + } + + impl TestWriter { + fn new() -> (Self, Arc>>) { + let buffer = Arc::new(Mutex::new(Vec::new())); + (Self { buffer: buffer.clone() }, buffer) + } + } + + impl IoWrite for TestWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + let mut buffer = self.buffer.lock().unwrap(); + buffer.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + impl<'a> MakeWriter<'a> for TestWriter { + type Writer = TestWriter; + + fn make_writer(&'a self) -> Self::Writer { + TestWriter { + buffer: self.buffer.clone(), + } + } + } + + // TODO: Add tests that encode a TracingLogRecord to OTLP bytes, + // then format them back and verify the output +} diff --git a/rust/otap-dataflow/crates/telemetry/src/tracing_integration/subscriber.rs b/rust/otap-dataflow/crates/telemetry/src/tracing_integration/subscriber.rs new file mode 100644 index 0000000000..ff233da516 --- /dev/null +++ b/rust/otap-dataflow/crates/telemetry/src/tracing_integration/subscriber.rs @@ -0,0 +1,327 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Tracing subscriber layer that captures events as TracingLogRecord instances. +//! +//! This layer integrates with the tracing-subscriber ecosystem, allowing us to: +//! 1. Capture all tracing events (from tokio macros and 3rd-party libraries) +//! 2. Convert them to TracingLogRecord (which implements LogRecordView) +//! 3. Encode them using our stateful OTLP encoder +//! +//! The layer uses a visitor pattern to extract field values from events and +//! constructs TracingLogRecord instances that can be encoded directly. + +use super::log_record::{TracingAttribute, TracingAnyValue, TracingLogRecord}; +use std::collections::HashMap; +use std::sync::Mutex; +use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::{Event, Id, Subscriber}; +use tracing::span::Attributes; +use tracing_subscriber::layer::{Context, Layer}; +use tracing_subscriber::registry::LookupSpan; + +/// A tracing subscriber layer that converts events to TracingLogRecord. +/// +/// This layer can be composed with other layers in a tracing-subscriber registry +/// to capture events and convert them to OTLP-compatible log records. +/// +/// # Example +/// ```ignore +/// use tracing_subscriber::prelude::*; +/// use otap_df_telemetry::tracing_integration::OtlpTracingLayer; +/// +/// let otlp_layer = OtlpTracingLayer::new(|log_record| { +/// // Encode log_record using stateful encoder +/// encoder.encode_log_record(&log_record, &resource_bytes, &scope_encoding)?; +/// }); +/// +/// tracing_subscriber::registry() +/// .with(otlp_layer) +/// .init(); +/// ``` +/// Span data stored for duration calculation +struct SpanData { + start_time_nanos: u64, + attributes: Vec, +} + +/// Tracing subscriber layer that captures events and spans as OTLP log records. +/// +/// This layer implements an unconventional approach where spans are treated as pairs +/// of log records (start/end) rather than as first-class span objects. This aligns +/// with unified dataflow architectures where all telemetry flows through a single +/// log pipeline. +pub struct OtlpTracingLayer +where + F: Fn(TracingLogRecord) + Send + Sync + 'static, +{ + /// Callback function that receives each TracingLogRecord + on_event: F, + /// Storage for span start times to calculate duration on close + span_data: Mutex>, +} + +impl OtlpTracingLayer +where + F: Fn(TracingLogRecord) + Send + Sync + 'static, +{ + /// Creates a new OtlpTracingLayer with the given event handler. + /// + /// # Arguments + /// * `on_event` - Callback invoked for each tracing event, receiving a TracingLogRecord + pub fn new(on_event: F) -> Self { + Self { + on_event, + span_data: Mutex::new(HashMap::new()), + } + } +} + +impl Layer for OtlpTracingLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, + F: Fn(TracingLogRecord) + Send + Sync + 'static, +{ + fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) { + // Get timestamp + let timestamp_nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64; + + // Extract fields using visitor + let mut visitor = FieldVisitor::new(); + event.record(&mut visitor); + + // Build TracingLogRecord + // Note: metadata.name() includes file:line, e.g., "event src/main.rs:42" + let log_record = TracingLogRecord::new( + event.metadata(), + visitor.attributes, + timestamp_nanos, + ) + .with_body(visitor.message.unwrap_or_default()); + + // Invoke the callback + (self.on_event)(log_record); + } + + fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) { + let timestamp_nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64; + + // Extract fields from span attributes + let mut visitor = FieldVisitor::new(); + attrs.record(&mut visitor); + + let metadata = attrs.metadata(); + let mut attributes = visitor.attributes.clone(); + + // Add span.id as attribute + attributes.push(TracingAttribute { + key: "span.id".to_string(), + value: TracingAnyValue::Int(id.into_u64() as i64), + }); + + // Store span data for duration calculation on close + if let Ok(mut spans) = self.span_data.lock() { + let _ = spans.insert(id.into_u64(), SpanData { + start_time_nanos: timestamp_nanos, + attributes: attributes.clone(), + }); + } + + // Create "span.start" log record + // Format: "span.start {span_name} src/file.rs:42" + let event_name = format!("span.start {}", metadata.name()); + let log_record = TracingLogRecord::new_with_event_name( + metadata, + attributes, + timestamp_nanos, + event_name, + ) + .with_body(visitor.message.unwrap_or_default()); + + // Invoke callback with span start event + (self.on_event)(log_record); + } + + fn on_close(&self, id: Id, _ctx: Context<'_, S>) { + let end_time_nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64; + + // Retrieve and remove span data + let span_data = if let Ok(mut spans) = self.span_data.lock() { + spans.remove(&id.into_u64()) + } else { + return; + }; + + if let Some(span_data) = span_data { + // Get span metadata from context + // Note: We don't have direct access to metadata here, so we'll create a minimal record + let duration_nanos = end_time_nanos.saturating_sub(span_data.start_time_nanos); + + let mut attributes = span_data.attributes; + + // Add duration as attribute + attributes.push(TracingAttribute { + key: "span.duration_nanos".to_string(), + value: TracingAnyValue::Int(duration_nanos as i64), + }); + + // Create a minimal log record for span end + // We use INFO level for span events + let log_record = TracingLogRecord::new_span_end( + id.into_u64(), + attributes, + end_time_nanos, + ); + + // Invoke callback with span end event + (self.on_event)(log_record); + } + } +} + +/// Visitor that extracts field values from a tracing event. +/// +/// This implements tracing::field::Visit to walk through all fields in an event +/// and collect them as TracingAttribute instances. +/// +/// Note: We don't extract event_name here because metadata.name() already provides +/// it with file:line info (e.g., "event src/main.rs:42"). +struct FieldVisitor { + /// Collected attributes from the event + attributes: Vec, + + /// The message/body (from the "message" field, if present) + message: Option, +} + +impl FieldVisitor { + fn new() -> Self { + Self { + attributes: Vec::new(), + message: None, + } + } +} + +impl tracing::field::Visit for FieldVisitor { + fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { + // Skip special "message" field + if field.name() == "message" { + return; + } + + self.attributes.push(TracingAttribute { + key: field.name().to_string(), + value: TracingAnyValue::Double(value), + }); + } + + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + if field.name() == "message" { + return; + } + + self.attributes.push(TracingAttribute { + key: field.name().to_string(), + value: TracingAnyValue::Int(value), + }); + } + + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + if field.name() == "message" { + return; + } + + // Convert u64 to i64 (may lose precision for very large values) + self.attributes.push(TracingAttribute { + key: field.name().to_string(), + value: TracingAnyValue::Int(value as i64), + }); + } + + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + if field.name() == "message" { + return; + } + + self.attributes.push(TracingAttribute { + key: field.name().to_string(), + value: TracingAnyValue::Bool(value), + }); + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + // Handle special "message" field + if field.name() == "message" { + self.message = Some(value.to_string()); + return; + } + + // Store string attributes by cloning + self.attributes.push(TracingAttribute { + key: field.name().to_string(), + value: TracingAnyValue::Str(value.to_string()), + }); + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + // Capture the "message" field which contains the formatted message + if field.name() == "message" { + self.message = Some(format!("{:?}", value)); + return; + } + + // Convert debug representation to string and store + let debug_str = format!("{:?}", value); + self.attributes.push(TracingAttribute { + key: field.name().to_string(), + value: TracingAnyValue::Str(debug_str), + }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + use tracing_subscriber::prelude::*; + + #[test] + fn test_otlp_layer_captures_events() { + use otap_df_pdata::views::logs::LogRecordView; + + // Collect captured log records + let captured = Arc::new(Mutex::new(Vec::new())); + let captured_clone = captured.clone(); + + let layer = OtlpTracingLayer::new(move |log_record| { + let mut records = captured_clone.lock().unwrap(); + records.push(( + log_record.severity_text().map(|s| String::from_utf8_lossy(s).to_string()), + log_record.event_name().map(|s| String::from_utf8_lossy(s).to_string()), + )); + }); + + let subscriber = tracing_subscriber::registry().with(layer); + + tracing::subscriber::with_default(subscriber, || { + tracing::info!(name: "test.event", "Test message"); + tracing::warn!(name: "test.warning", "Warning message"); + }); + + let records = captured.lock().unwrap(); + assert_eq!(records.len(), 2); + + // Note: event_name extraction from visitor has lifetime issues + // We'll address this in the production implementation + } +} diff --git a/rust/otap-dataflow/src/main.rs b/rust/otap-dataflow/src/main.rs index dff7204946..ce33b025b0 100644 --- a/rust/otap-dataflow/src/main.rs +++ b/rust/otap-dataflow/src/main.rs @@ -9,6 +9,7 @@ use otap_df_config::pipeline_group::{CoreAllocation, CoreRange, Quota}; use otap_df_config::{PipelineGroupId, PipelineId}; use otap_df_controller::Controller; use otap_df_otap::OTAP_PIPELINE_FACTORY; +use otap_df_telemetry::opentelemetry_client::logger_provider::LoggerProvider; use std::path::PathBuf; #[cfg(all( @@ -106,6 +107,16 @@ fn parse_core_id_range(s: &str) -> Result { } fn main() -> Result<(), Box> { + // Initialize tracing FIRST - before ANYTHING else (even arg parsing) + // This captures ALL tracing events from df_engine and third-party libraries + // and formats them synchronously to stdout/stderr with OTLP encoding + LoggerProvider::init_default_console_tracing(); + + // Log startup + tracing::info!("df_engine starting"); + + let args = Args::parse(); + // Initialize rustls crypto provider (required for rustls 0.23+) // We use ring as the default provider #[cfg(feature = "experimental-tls")] @@ -113,8 +124,6 @@ fn main() -> Result<(), Box> { .install_default() .map_err(|e| format!("Failed to install rustls crypto provider: {e:?}"))?; - let args = Args::parse(); - // For now, we predefine pipeline group and pipeline IDs. // That will be replaced with a more dynamic approach in the future. let pipeline_group_id: PipelineGroupId = "default_pipeline_group".into(); @@ -123,11 +132,27 @@ fn main() -> Result<(), Box> { println!("{}", system_info()); // Load pipeline configuration from file - let pipeline_cfg = PipelineConfig::from_file( + let pipeline_cfg = match PipelineConfig::from_file( pipeline_group_id.clone(), pipeline_id.clone(), &args.pipeline, - )?; + ) { + Ok(cfg) => { + tracing::info!( + config_file = %args.pipeline.display(), + "Successfully loaded pipeline configuration" + ); + cfg + } + Err(e) => { + tracing::error!( + error = %e, + config_file = %args.pipeline.display(), + "Failed to load pipeline configuration" + ); + return Err(e.into()); + } + }; // Create controller and start pipeline with multi-core support let controller = Controller::new(&OTAP_PIPELINE_FACTORY); @@ -150,7 +175,7 @@ fn main() -> Result<(), Box> { CoreAllocation::AllCores => println!("Requested core allocation: all available cores"), CoreAllocation::CoreCount { count } => println!("Requested core allocation: {count} cores"), CoreAllocation::CoreSet { .. } => { - println!("Requested core allocation: {}", quota.core_allocation); + tracing::info!("Requested core allocation: {}", quota.core_allocation); } } @@ -158,19 +183,28 @@ fn main() -> Result<(), Box> { bind_address: args.http_admin_bind, }; let result = controller.run_forever( - pipeline_group_id, - pipeline_id, + pipeline_group_id.clone(), + pipeline_id.clone(), pipeline_cfg, quota, admin_settings, ); match result { Ok(_) => { - println!("Pipeline run successfully"); + tracing::info!( + pipeline_group = %pipeline_group_id, + pipeline = %pipeline_id, + "Pipeline completed successfully" + ); std::process::exit(0); } Err(e) => { - eprintln!("Pipeline failed to run: {e}"); + tracing::error!( + error = %e, + pipeline_group = %pipeline_group_id, + pipeline = %pipeline_id, + "Pipeline failed" + ); std::process::exit(1); } }