diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index 455257e342..76332f970c 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -33,7 +33,10 @@ path = "src/main.rs" otap-df-config.workspace = true otap-df-controller.workspace = true otap-df-otap.workspace = true +otap-df-telemetry.workspace = true thiserror.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true quiver = { workspace = true, optional = true } serde_json.workspace = true clap.workspace = true @@ -147,6 +150,7 @@ sysinfo = "0.37" tempfile = "3" thiserror = "2.0.17" tracing = { version = ">=0.1.40", default-features = false } +tracing-core = { version = ">=0.1.32", default-features = false } tracing-subscriber = { version = "0.3", default-features = false } tokio = { version = "1.48.0", features = ["rt", "time", "net", "io-util", "sync", "macros", "rt-multi-thread", "fs", "io-std", "process"] } tokio-stream = "0.1.17" diff --git a/rust/otap-dataflow/configs/fake-batch-debug-noop.yaml b/rust/otap-dataflow/configs/fake-batch-debug-noop.yaml index 194334b4eb..b162bcef5c 100644 --- a/rust/otap-dataflow/configs/fake-batch-debug-noop.yaml +++ b/rust/otap-dataflow/configs/fake-batch-debug-noop.yaml @@ -51,3 +51,12 @@ nodes: kind: exporter plugin_urn: "urn:otel:noop:exporter" config: + +service: + telemetry: + logs: + level: "debug" + providers: + global: immediate + engine: immediate + output: direct diff --git a/rust/otap-dataflow/configs/internal-telemetry.yaml b/rust/otap-dataflow/configs/internal-telemetry.yaml new file mode 100644 index 0000000000..55d63863ce --- /dev/null +++ b/rust/otap-dataflow/configs/internal-telemetry.yaml @@ -0,0 +1,79 @@ +settings: + default_pipeline_ctrl_msg_channel_size: 100 + default_node_ctrl_msg_channel_size: 100 + default_pdata_channel_size: 100 + +nodes: + receiver: + kind: receiver + plugin_urn: "urn:otel:otap:fake_data_generator:receiver" + out_ports: + out_port: + destinations: + - debug + dispatch_strategy: round_robin + config: + traffic_config: + max_signal_count: 100000 + max_batch_size: 1000 + signals_per_second: 1000 + log_weight: 100 + registry_path: https://github.com/open-telemetry/semantic-conventions.git[model] + debug: + kind: processor + plugin_urn: "urn:otel:debug:processor" + out_ports: + out_port: + destinations: + - noop + dispatch_strategy: round_robin + config: + verbosity: basic + noop: + kind: exporter + plugin_urn: "urn:otel:noop:exporter" + config: {} + +# Internal telemetry pipeline - separate from main pipeline +# Uses hardcoded settings: single thread, no admin server +internal: + telemetry: + kind: receiver + plugin_urn: "urn:otel:internal:otlp:receiver" + out_ports: + out_port: + destinations: + - batch + dispatch_strategy: round_robin + config: {} + batch: + kind: processor + plugin_urn: "urn:otel:batch:processor" + out_ports: + out_port: + destinations: + - console + dispatch_strategy: round_robin + config: + otap: + min_size: 1000 + sizer: items + flush_timeout: 3s + format: preserve + console: + kind: exporter + plugin_urn: "urn:otel:console:exporter" + config: {} + +service: + telemetry: + logs: + level: "debug" + providers: + global: immediate + engine: immediate + internal: raw # Avoid feedback in internal pipeline + output: internal + resource: + service.id: 1234 + service.name: test diff --git a/rust/otap-dataflow/crates/config/src/error.rs b/rust/otap-dataflow/crates/config/src/error.rs index 4b0f378f2b..a8f1436870 100644 --- a/rust/otap-dataflow/crates/config/src/error.rs +++ 
b/rust/otap-dataflow/crates/config/src/error.rs @@ -118,6 +118,34 @@ pub enum Error { /// The id of the pipeline that was duplicated. pipeline_id: PipelineId, }, + + /// A receiver in the internal telemetry pipeline has an invalid plugin URN. + /// Only Internal Telemetry Receivers (ITR) are allowed in the internal pipeline. + #[error( + "Invalid receiver in internal pipeline: node `{node_id}` has plugin_urn `{plugin_urn}`, \ + but only Internal Telemetry Receivers are allowed\nContext: {context}" )] + #[diagnostic(code(data_plane::invalid_internal_receiver), url(docsrs))] + InvalidInternalReceiver { + /// The context in which the error occurred. + context: Context, + /// The node id of the invalid receiver. + node_id: NodeId, + /// The plugin URN of the invalid receiver. + plugin_urn: String, + }, + + /// The internal telemetry pipeline is required but not configured. + #[error( + "Internal telemetry pipeline required but not configured. \ + When output mode is 'internal', the `internal` section must be present \ + with at least one Internal Telemetry Receiver.\nContext: {context}" )] + #[diagnostic(code(data_plane::missing_internal_pipeline), url(docsrs))] + MissingInternalPipeline { + /// The context in which the error occurred. + context: Context, + }, } /// Information that all errors provide to help identify diff --git a/rust/otap-dataflow/crates/config/src/pipeline.rs b/rust/otap-dataflow/crates/config/src/pipeline.rs index fcc49e482e..7f96b031f5 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline.rs @@ -10,6 +10,7 @@ use crate::health::HealthPolicy; use crate::node::{DispatchStrategy, HyperEdgeConfig, NodeKind, NodeUserConfig}; use crate::observed_state::ObservedStateSettings; use crate::pipeline::service::ServiceConfig; +use crate::pipeline::service::telemetry::logs::INTERNAL_TELEMETRY_RECEIVER_URN; use crate::{Description, NodeId, NodeUrn, PipelineGroupId, PipelineId, PortName}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -42,11 +43,18 @@ pub struct PipelineConfig { #[serde(default)] settings: PipelineSettings, - /// All nodes in this pipeline, keyed by node ID. + /// All nodes in this pipeline. /// /// Note: We use `Arc` to allow sharing the same pipeline configuration /// across multiple cores/threads without cloning the entire configuration. - nodes: HashMap<NodeId, Arc<NodeUserConfig>>, + #[serde(default)] + nodes: PipelineNodes, + + /// Internal telemetry pipeline nodes. These have the same structure + /// as `nodes` but are independent and isolated to a separate internal + /// telemetry runtime. + #[serde(default, skip_serializing_if = "PipelineNodes::is_empty")] + internal: PipelineNodes, /// Service-level telemetry configuration. #[serde(default)] @@ -68,6 +76,184 @@ pub enum PipelineType { /// OpenTelemetry with Apache Arrow Protocol (OTAP) pipeline. Otap, } + +/// A collection of nodes forming a pipeline graph (hyper-DAG). One of +/// these is the main pipeline, and one is the internal telemetry pipeline. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)] +#[serde(transparent)] +pub struct PipelineNodes(HashMap<NodeId, Arc<NodeUserConfig>>); + +impl PipelineNodes { + /// Returns true if the node collection is empty. + #[must_use] + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// Returns the number of nodes. + #[must_use] + pub fn len(&self) -> usize { + self.0.len() + } + + /// Returns a reference to the node with the given ID, if it exists.
+ #[must_use] + pub fn get(&self, id: &str) -> Option<&Arc<NodeUserConfig>> { + self.0.get(id) + } + + /// Returns true if a node with the given ID exists. + #[must_use] + pub fn contains_key(&self, id: &str) -> bool { + self.0.contains_key(id) + } + + /// Returns an iterator visiting all nodes. + pub fn iter(&self) -> impl Iterator<Item = (&NodeId, &Arc<NodeUserConfig>)> { + self.0.iter() + } + + /// Returns an iterator over node IDs. + pub fn keys(&self) -> impl Iterator<Item = &NodeId> { + self.0.keys() + } + + /// Validate the node graph structure. + /// + /// Checks for: + /// - Invalid hyper-edges (missing target nodes) + /// - Cycles in the DAG + pub fn validate( + &self, + pipeline_group_id: &PipelineGroupId, + pipeline_id: &PipelineId, + errors: &mut Vec<Error>, + ) { + self.validate_hyper_edges(pipeline_group_id, pipeline_id, errors); + + // Only check for cycles if no hyper-edge errors + if errors.is_empty() { + for cycle in self.detect_cycles() { + errors.push(Error::CycleDetected { + context: Context::new(pipeline_group_id.clone(), pipeline_id.clone()), + nodes: cycle, + }); + } + } + } + + /// Validate hyper-edges (check that all destination nodes exist). + fn validate_hyper_edges( + &self, + pipeline_group_id: &PipelineGroupId, + pipeline_id: &PipelineId, + errors: &mut Vec<Error>, + ) { + for (node_id, node) in self.0.iter() { + for edge in node.out_ports.values() { + let missing_targets: Vec<_> = edge + .destinations + .iter() + .filter(|target| !self.0.contains_key(*target)) + .cloned() + .collect(); + + if !missing_targets.is_empty() { + errors.push(Error::InvalidHyperEdgeSpec { + context: Context::new(pipeline_group_id.clone(), pipeline_id.clone()), + source_node: node_id.clone(), + missing_source: false, + details: Box::new(HyperEdgeSpecDetails { + target_nodes: edge.destinations.iter().cloned().collect(), + dispatch_strategy: edge.dispatch_strategy.clone(), + missing_targets, + }), + }); + } + } + } + } + + /// Detect cycles in the node graph. + fn detect_cycles(&self) -> Vec<Vec<NodeId>> { + fn visit( + node: &NodeId, + nodes: &HashMap<NodeId, Arc<NodeUserConfig>>, + visiting: &mut HashSet<NodeId>, + visited: &mut HashSet<NodeId>, + current_path: &mut Vec<NodeId>, + cycles: &mut Vec<Vec<NodeId>>, + ) { + if visited.contains(node) { + return; + } + if visiting.contains(node) { + if let Some(pos) = current_path.iter().position(|n| n == node) { + cycles.push(current_path[pos..].to_vec()); + } + return; + } + _ = visiting.insert(node.clone()); + current_path.push(node.clone()); + + if let Some(n) = nodes.get(node) { + for edge in n.out_ports.values() { + for tgt in &edge.destinations { + visit(tgt, nodes, visiting, visited, current_path, cycles); + } + } + } + + _ = visiting.remove(node); + _ = visited.insert(node.clone()); + _ = current_path.pop(); + } + + let mut visiting = HashSet::new(); + let mut current_path = Vec::new(); + let mut visited = HashSet::new(); + let mut cycles = Vec::new(); + + for node in self.0.keys() { + if !visited.contains(node) { + visit( + node, + &self.0, + &mut visiting, + &mut visited, + &mut current_path, + &mut cycles, + ); + } + } + + cycles + } +} + +impl std::ops::Index<&str> for PipelineNodes { + type Output = Arc<NodeUserConfig>; + + fn index(&self, id: &str) -> &Self::Output { + &self.0[id] + } +} + +impl IntoIterator for PipelineNodes { + type Item = (NodeId, Arc<NodeUserConfig>); + type IntoIter = std::collections::hash_map::IntoIter<NodeId, Arc<NodeUserConfig>>; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +impl FromIterator<(NodeId, Arc<NodeUserConfig>)> for PipelineNodes { + fn from_iter<T: IntoIterator<Item = (NodeId, Arc<NodeUserConfig>)>>(iter: T) -> Self { + Self(iter.into_iter().collect()) + } +} + /// A configuration for a pipeline.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] pub struct PipelineSettings { @@ -86,6 +272,8 @@ pub struct PipelineSettings { pub default_pdata_channel_size: usize, /// Observed state settings. + /// + /// TODO: consider whether this belongs in the internal logging configuration. #[serde(default)] pub observed_state: ObservedStateSettings, @@ -100,6 +288,11 @@ /// /// This is distinct from `service.telemetry`, which configures exporting of OpenTelemetry /// signals to external backends. + /// + /// TODO: fix the overlap with service::telemetry! This struct carries only + /// metric configuration; that one mixes OpenTelemetry-declarative settings + /// with this PR's LoggingProviders (three ProviderMode fields) and OutputMode + /// choices. #[serde(default)] pub telemetry: TelemetrySettings, } @@ -262,6 +455,12 @@ impl PipelineConfig { &self.settings } + /// Returns a reference to the main pipeline nodes. + #[must_use] + pub fn nodes(&self) -> &PipelineNodes { + &self.nodes + } + /// Returns an iterator visiting all nodes in the pipeline. pub fn node_iter(&self) -> impl Iterator<Item = (&NodeId, &Arc<NodeUserConfig>)> { self.nodes.iter() @@ -278,13 +477,67 @@ &self.service } + /// Returns true if the internal telemetry pipeline is configured. + #[must_use] + pub fn has_internal_pipeline(&self) -> bool { + !self.internal.is_empty() + } + + /// Returns a reference to the internal pipeline nodes. + #[must_use] + pub fn internal_nodes(&self) -> &PipelineNodes { + &self.internal + } + + /// Returns an iterator visiting all nodes in the internal telemetry pipeline. + pub fn internal_node_iter(&self) -> impl Iterator<Item = (&NodeId, &Arc<NodeUserConfig>)> { + self.internal.iter() + } + + /// Extracts the internal pipeline as a separate, independent `PipelineConfig`. + /// + /// This creates a complete pipeline configuration from the internal nodes, + /// with hardcoded settings appropriate for internal telemetry processing. + #[must_use] + pub fn extract_internal_config(&self) -> Option<PipelineConfig> { + if self.internal.is_empty() { + return None; + } + + Some(PipelineConfig { + r#type: self.r#type.clone(), + settings: Self::internal_pipeline_settings(), + nodes: self.internal.clone(), + internal: PipelineNodes::default(), + service: ServiceConfig::default(), + }) + } + + /// Returns hardcoded settings for the internal telemetry pipeline. + /// + /// TODO: these are hardcoded; add configurability. + #[must_use] + pub fn internal_pipeline_settings() -> PipelineSettings { + PipelineSettings { + default_node_ctrl_msg_channel_size: 50, + default_pipeline_ctrl_msg_channel_size: 50, + default_pdata_channel_size: 50, + observed_state: ObservedStateSettings::default(), + health_policy: HealthPolicy::default(), + telemetry: TelemetrySettings { + pipeline_metrics: false, + tokio_metrics: false, + channel_metrics: false, + }, + } + } + /// Validate the pipeline specification.
/// /// This method checks for: - /// - Duplicate node IDs - /// - Duplicate out-ports (same source node + port name) - /// - Invalid hyper-edges (missing source or target nodes) + /// - Invalid hyper-edges (missing target nodes) /// - Cycles in the DAG + /// - Non-ITR receivers in the internal pipeline pub fn validate( &self, pipeline_group_id: &PipelineGroupId, @@ -292,41 +545,28 @@ ) -> Result<(), Error> { let mut errors = Vec::new(); - // Check for invalid hyper-edges (references to non-existent nodes) - for (node_id, node) in self.nodes.iter() { - for edge in node.out_ports.values() { - let mut missing_targets = Vec::new(); - - for target in &edge.destinations { - if !self.nodes.contains_key(target) { - missing_targets.push(target.clone()); - } - } - - if !missing_targets.is_empty() { - errors.push(Error::InvalidHyperEdgeSpec { + // Validate main pipeline + self.nodes + .validate(pipeline_group_id, pipeline_id, &mut errors); + + // Validate internal pipeline if present + if !self.internal.is_empty() { + // Check that receivers in internal pipeline are only ITR nodes + for (node_id, node) in self.internal.iter() { + if node.kind == NodeKind::Receiver + && node.plugin_urn.as_ref() != INTERNAL_TELEMETRY_RECEIVER_URN + { + errors.push(Error::InvalidInternalReceiver { context: Context::new(pipeline_group_id.clone(), pipeline_id.clone()), - source_node: node_id.clone(), - missing_source: false, // source exists since we're iterating over nodes - details: Box::new(HyperEdgeSpecDetails { - target_nodes: edge.destinations.iter().cloned().collect(), - dispatch_strategy: edge.dispatch_strategy.clone(), - missing_targets, - }), + node_id: node_id.clone(), + plugin_urn: node.plugin_urn.to_string(), }); } } - } - // Check for cycles if no errors so far - if errors.is_empty() { - let cycles = self.detect_cycles(); - for cycle in cycles { - errors.push(Error::CycleDetected { - context: Context::new(pipeline_group_id.clone(), pipeline_id.clone()), - nodes: cycle, - }); - } + // Validate internal pipeline graph structure + self.internal + .validate(pipeline_group_id, pipeline_id, &mut errors); } if !errors.is_empty() { @@ -335,62 +575,6 @@ Ok(()) } } - - fn detect_cycles(&self) -> Vec<Vec<NodeId>> { - fn visit( - node: &NodeId, - nodes: &HashMap<NodeId, Arc<NodeUserConfig>>, - visiting: &mut HashSet<NodeId>, - visited: &mut HashSet<NodeId>, - current_path: &mut Vec<NodeId>, - cycles: &mut Vec<Vec<NodeId>>, - ) { - if visited.contains(node) { - return; - } - if visiting.contains(node) { - // Cycle found - if let Some(pos) = current_path.iter().position(|n| n == node) { - cycles.push(current_path[pos..].to_vec()); - } - return; - } - _ = visiting.insert(node.clone()); - current_path.push(node.clone()); - - if let Some(n) = nodes.get(node) { - for edge in n.out_ports.values() { - for tgt in &edge.destinations { - visit(tgt, nodes, visiting, visited, current_path, cycles); - } - } - } - - _ = visiting.remove(node); - _ = visited.insert(node.clone()); - _ = current_path.pop(); - } - - let mut visiting = HashSet::new(); - let mut current_path = Vec::new(); - let mut visited = HashSet::new(); - let mut cycles = Vec::new(); - - for node in self.nodes.keys() { - if !visited.contains(node) { - visit( - node, - &self.nodes, - &mut visiting, - &mut visited, - &mut current_path, - &mut cycles, - ); - } - } - - cycles - } } /// A builder for constructing a [`PipelineConfig`].
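For orientation, a minimal sketch (not part of this diff) of how the accessors above compose, assuming a hypothetical `config: PipelineConfig` already deserialized from a YAML such as configs/internal-telemetry.yaml:

```rust
// Sketch only: `config` is a hypothetical, already-parsed PipelineConfig.
if let Some(internal) = config.extract_internal_config() {
    // The extracted pipeline holds a copy of the `internal` nodes and an
    // empty `internal` section of its own, so extraction cannot recurse.
    assert!(!internal.has_internal_pipeline());
    // It carries the hardcoded internal settings: channel sizes of 50,
    // with pipeline/tokio/channel metrics disabled.
    assert_eq!(internal.pipeline_settings().default_pdata_channel_size, 50);
}
```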
@@ -656,6 +840,7 @@ impl PipelineConfigBuilder { .into_iter() .map(|(id, node)| (id, Arc::new(node))) .collect(), + internal: PipelineNodes::default(), settings: PipelineSettings::default(), r#type: pipeline_type, service: ServiceConfig::default(), diff --git a/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry.rs b/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry.rs index 07edce8d85..7323f57554 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry.rs @@ -55,6 +55,7 @@ fn default_reporting_interval() -> Duration { } /// Attribute value types for telemetry resource attributes. +/// TODO: Replace with OTLP AnyValue? #[derive(Debug, Clone, PartialEq, Serialize, JsonSchema)] pub enum AttributeValue { /// String type attribute value. diff --git a/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs.rs b/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs.rs index 544e160119..d435f36020 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs.rs @@ -5,34 +5,45 @@ pub mod processors; +use crate::error::Error; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +/// Internal Telemetry Receiver node URN for internal logging using OTLP bytes. +pub const INTERNAL_TELEMETRY_RECEIVER_URN: &str = "urn:otel:internal:otlp:receiver"; + /// Internal logs configuration. -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)] +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] pub struct LogsConfig { - /// The log level for internal engine logs. - #[serde(default)] + /// The log level for internal logs. + #[serde(default = "default_level")] pub level: LogLevel, - /// The list of log processors to configure. + /// Logging provider configuration. + #[serde(default = "default_providers")] + pub providers: LoggingProviders, + + /// What to do with collected log events. This applies when any ProviderMode + /// in providers is Immediate. Does not apply if all + /// providers are in [Noop, Raw, OpenTelemetry]. + #[serde(default = "default_output")] + pub output: OutputMode, + + /// The OpenTelemetry SDK is configured via these processors. #[serde(default)] pub processors: Vec, } -/// Log level for internal engine logs. -/// -/// TODO: Change default to `Info` once per-thread subscriber is implemented -/// to avoid contention from the global tracing subscriber. -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default, PartialEq)] +/// Log level for dataflow engine logs. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, Default, PartialEq)] #[serde(rename_all = "lowercase")] pub enum LogLevel { /// Logging is completely disabled. - #[default] Off, /// Debug level logging. Debug, /// Info level logging. + #[default] Info, /// Warn level logging. Warn, @@ -40,39 +51,148 @@ Error, } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_logs_config_deserialize() { - let yaml_str = r#" - level: "info" - processors: - - batch: - exporter: - console: - "#; - let config: LogsConfig = serde_yaml::from_str(yaml_str).unwrap(); - assert_eq!(config.level, LogLevel::Info); - assert_eq!(config.processors.len(), 1); +/// Logging providers for different execution contexts.
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct LoggingProviders { + /// Provider mode for non-engine threads. This defines the global Tokio + /// `tracing` subscriber. Default is Immediate. + pub global: ProviderMode, + + /// Provider mode for engine/pipeline threads. This defines how each + /// engine thread/core sets the Tokio `tracing` + /// subscriber. Default is Immediate. Internal logs will be flushed + /// by either the Internal Telemetry Receiver or the main pipeline + /// controller. + pub engine: ProviderMode, + + /// Provider mode for nodes downstream of the Internal Telemetry Receiver. + /// This defaults to Noop to avoid internal feedback. + #[serde(default = "default_internal_provider")] + pub internal: ProviderMode, +} + +/// Logs producer: how log events are captured and routed. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum ProviderMode { + /// Log events are silently ignored. + Noop, + + /// Immediate delivery to the internal telemetry pipeline. + Immediate, + + /// Use OTel-Rust as the provider. + OpenTelemetry, + + /// Synchronous console logging. Note! This can block the producing thread. + Raw, +} + +impl ProviderMode { + /// Returns true if this requires a LogsReporter channel for + /// asynchronous logging. + #[must_use] + pub fn needs_reporter(&self) -> bool { + matches!(self, Self::Immediate) + } +} + +/// Output mode: what the collector does with log events received +/// from reporting providers. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Default)] +#[serde(rename_all = "lowercase")] +pub enum OutputMode { + /// No output is configured: no provider may report events internally. + /// This output mode can only be set when every provider is configured + /// as Noop, Raw, or OpenTelemetry, which avoid the internal reporter. + Noop, + + /// Direct console logging: format and print directly to console + /// (stdout/stderr) from the logs collector thread, bypassing any + /// internal use of the dataflow engine. ERROR and WARN go to + /// stderr, others to stdout. + #[default] + Direct, + + /// Route to the internal telemetry pipeline. + Internal, +} + +fn default_output() -> OutputMode { + OutputMode::Direct +} + +fn default_level() -> LogLevel { + LogLevel::Info +} + +fn default_internal_provider() -> ProviderMode { + ProviderMode::Noop +} + +fn default_providers() -> LoggingProviders { + LoggingProviders { + global: ProviderMode::Immediate, + engine: ProviderMode::Immediate, + internal: default_internal_provider(), } } - #[test] - fn test_log_level_deserialize() { - let yaml_str = r#" - level: "info" - "#; - let config: LogsConfig = serde_yaml::from_str(yaml_str).unwrap(); - assert_eq!(config.level, LogLevel::Info); +impl Default for LogsConfig { + fn default() -> Self { + Self { + level: default_level(), + providers: default_providers(), + output: default_output(), + processors: Vec::new(), + } } +} + +impl LogsConfig { + /// Validate the logs configuration.
+ /// + /// Returns an error if: + /// - `internal` provider uses `Immediate` + /// (the internal pipeline cannot report into itself) + /// - `output` is `Noop` but a provider uses `Immediate` + /// (logs would be sent but discarded) + /// - `engine` is `OpenTelemetry` but `global` is not + /// (current implementation restriction: the SDK logger provider is only + /// configured when global uses OpenTelemetry) + pub fn validate(&self) -> Result<(), Error> { + if self.providers.internal.needs_reporter() { + return Err(Error::InvalidUserConfig { + error: format!( + "internal provider is invalid: {:?}", + self.providers.internal + ), + }); + } + if self.output == OutputMode::Noop { + let global_reports = self.providers.global.needs_reporter(); + let engine_reports = self.providers.engine.needs_reporter(); + + if global_reports || engine_reports { + return Err(Error::InvalidUserConfig { + error: "output mode is 'noop' but a provider uses an internal reporter".into(), + }); + } + } + + // Current implementation restriction: engine OpenTelemetry requires global OpenTelemetry. + // The SDK logger provider is only created when the global provider is OpenTelemetry. + // This could be lifted in the future by creating the logger provider independently. + if self.providers.engine == ProviderMode::OpenTelemetry + && self.providers.global != ProviderMode::OpenTelemetry + { + return Err(Error::InvalidUserConfig { + error: "engine provider 'opentelemetry' requires global provider to also be \ + 'opentelemetry' (current implementation restriction)" + .into(), + }); + } - #[test] - fn test_logs_config_default_deserialize() -> Result<(), serde_yaml::Error> { - let yaml_str = r#""#; - let config: LogsConfig = serde_yaml::from_str(yaml_str)?; - assert_eq!(config.level, LogLevel::Off); - assert!(config.processors.is_empty()); Ok(()) } } diff --git a/rust/otap-dataflow/crates/controller/Cargo.toml b/rust/otap-dataflow/crates/controller/Cargo.toml index ad214acc03..fc379825bd 100644 --- a/rust/otap-dataflow/crates/controller/Cargo.toml +++ b/rust/otap-dataflow/crates/controller/Cargo.toml @@ -27,3 +27,4 @@ miette = { workspace = true } core_affinity = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } +tracing = { workspace = true } diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index 0d6129cda7..13bae0f3e8 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -21,6 +21,7 @@ use crate::error::Error; use crate::thread_task::spawn_thread_local_task; use core_affinity::CoreId; use otap_df_config::engine::HttpAdminSettings; +use otap_df_config::pipeline::service::telemetry::logs::OutputMode; use otap_df_config::{ PipelineGroupId, PipelineId, pipeline::PipelineConfig, @@ -32,13 +33,17 @@ use otap_df_engine::control::{ PipelineCtrlMsgReceiver, PipelineCtrlMsgSender, pipeline_ctrl_msg_channel, }; use otap_df_engine::error::{Error as EngineError, error_summary_from}; +use otap_df_engine::receiver::InternalTelemetrySettings; use otap_df_state::DeployedPipelineKey; use otap_df_state::event::{ErrorSummary, ObservedEvent}; use otap_df_state::reporter::ObservedEventReporter; use otap_df_state::store::ObservedStateStore; -use otap_df_telemetry::opentelemetry_client::OpentelemetryClient; +use otap_df_telemetry::logs::{DirectCollector, TelemetrySetup}; use otap_df_telemetry::reporter::MetricsReporter; +use otap_df_telemetry::self_tracing::ConsoleWriter; +use otap_df_telemetry::telemetry_runtime::TelemetryRuntime; +use otap_df_telemetry::{InternalTelemetrySystem,
otel_info, otel_info_span, otel_warn}; +use std::sync::mpsc as std_mpsc; use std::thread; /// Error types and helpers for the controller module. @@ -83,7 +88,36 @@ impl Controller { node_ctrl_msg_channel_size = settings.default_node_ctrl_msg_channel_size, pipeline_ctrl_msg_channel_size = settings.default_pipeline_ctrl_msg_channel_size ); - let opentelemetry_client = OpentelemetryClient::new(telemetry_config)?; + + // Validate logs configuration. + // TODO: add metrics, validate the whole config. + telemetry_config + .logs + .validate() + .map_err(|err| Error::InvalidConfiguration { + errors: [err].into(), + })?; + + // Create telemetry runtime according to the various options. + let mut telemetry_runtime = TelemetryRuntime::new(telemetry_config)?; + + let direct_collector = (telemetry_config.logs.output == OutputMode::Direct).then(|| { + DirectCollector::new( + ConsoleWriter::color(), + telemetry_runtime.take_logs_receiver().expect("valid"), + ) + }); + + // Start the logs collector thread if needed for direct output. + let _logs_collector_handle = if telemetry_config.logs.output == OutputMode::Direct { + Some(spawn_thread_local_task( + "logs-collector", + move |_cancellation_token| direct_collector.expect("ok").run(), + )?) + } else { + None + }; + let metrics_system = InternalTelemetrySystem::new(telemetry_config); let metrics_dispatcher = metrics_system.dispatcher(); let metrics_reporter = metrics_system.reporter(); @@ -115,6 +149,102 @@ impl Controller { obs_state_store.run(cancellation_token) })?; + // Create telemetry setup for engine and internal pipelines from provider configuration. + let engine_telemetry_setup = + telemetry_runtime.telemetry_setup_for(telemetry_config.logs.providers.engine); + let internal_telemetry_setup = + telemetry_runtime.telemetry_setup_for(telemetry_config.logs.providers.internal); + let log_level = telemetry_config.logs.level; + + // Spawn internal telemetry pipeline thread, if configured. 
+ let _internal_pipeline_thread = if let Some(internal_config) = + pipeline.extract_internal_config() + { + // Take internal telemetry settings (logs receiver + resource bytes) if available + let internal_telemetry_settings = telemetry_runtime.take_internal_telemetry_settings(); + let internal_factory = self.pipeline_factory; + let internal_pipeline_id: PipelineId = "internal".into(); + let internal_pipeline_key = DeployedPipelineKey { + pipeline_group_id: pipeline_group_id.clone(), + pipeline_id: internal_pipeline_id.clone(), + core_id: 0, + }; + let internal_pipeline_ctx = controller_ctx.pipeline_context_with( + pipeline_group_id.clone(), + internal_pipeline_id.clone(), + 0, + 0, + ); + let internal_obs_evt_reporter = obs_evt_reporter.clone(); + let internal_metrics_reporter = metrics_reporter.clone(); + + // Create control message channel for internal pipeline + let (internal_ctrl_tx, internal_ctrl_rx) = pipeline_ctrl_msg_channel( + internal_config + .pipeline_settings() + .default_pipeline_ctrl_msg_channel_size, + ); + + // Create a channel to signal startup success/failure + // This allows us to fail fast if the internal pipeline can't build + let (startup_tx, startup_rx) = std_mpsc::channel::>(); + + let thread_name = "internal-pipeline".to_string(); + let internal_telemetry_setup = internal_telemetry_setup.clone(); + let handle = thread::Builder::new() + .name(thread_name.clone()) + .spawn(move || { + Self::run_internal_pipeline_thread( + internal_pipeline_key, + CoreId { id: 0 }, // No pinning for internal pipeline + internal_config, + internal_factory, + internal_pipeline_ctx, + internal_obs_evt_reporter, + internal_metrics_reporter, + internal_telemetry_setup, + log_level, // TODO: separate log level for internal pipeline. + internal_telemetry_settings, + internal_ctrl_tx, + internal_ctrl_rx, + startup_tx, + ) + }) + .map_err(|e| Error::ThreadSpawnError { + thread_name: thread_name.clone(), + source: e, + })?; + + // Wait for the internal pipeline to signal successful startup + // This ensures we fail fast with a clear error if the internal pipeline can't build + match startup_rx.recv() { + Ok(Ok(())) => { + // Internal pipeline built successfully and is running + } + Ok(Err(e)) => { + // Internal pipeline failed to build - propagate the error + return Err(e); + } + Err(err) => { + // Channel closed unexpectedly - thread may have panicked + return Err(Error::PipelineRuntimeError{source: Box::new(err)}) + } + } + + Some((thread_name, handle)) + } else { + None + }; + + // Initialize the global subscriber AFTER the internal pipeline has signaled successful startup. + // This ensures the channel receiver is being consumed before we start sending logs. 
+ telemetry_runtime.init_global_subscriber(); + + otel_info!( + "InternalPipeline.Started", + num_nodes = pipeline.internal_nodes().len() + ); + // Start one thread per requested core // Get available CPU cores for pinning let requested_cores = Self::select_cores_for_quota( @@ -148,6 +278,7 @@ impl Controller { thread_id, ); let metrics_reporter = metrics_reporter.clone(); + let telemetry_setup = engine_telemetry_setup.clone(); let thread_name = format!("pipeline-core-{}", core_id.id); let obs_evt_reporter = obs_evt_reporter.clone(); @@ -162,6 +293,8 @@ impl Controller { pipeline_handle, obs_evt_reporter, metrics_reporter, + telemetry_setup, + log_level, pipeline_ctrl_msg_tx, pipeline_ctrl_msg_rx, ) @@ -257,7 +390,7 @@ impl Controller { handle.shutdown_and_join()?; } obs_state_join_handle.shutdown_and_join()?; - opentelemetry_client.shutdown()?; + telemetry_runtime.shutdown()?; Ok(()) } @@ -379,54 +512,137 @@ impl Controller { pipeline_context: PipelineContext, obs_evt_reporter: ObservedEventReporter, metrics_reporter: MetricsReporter, + telemetry_setup: TelemetrySetup, + log_level: otap_df_config::pipeline::service::telemetry::logs::LogLevel, pipeline_ctrl_msg_tx: PipelineCtrlMsgSender, pipeline_ctrl_msg_rx: PipelineCtrlMsgReceiver, ) -> Result, Error> { - // Create a tracing span for this pipeline thread - // so that all logs within this scope include pipeline context. - let span = otel_info_span!("pipeline_thread", core.id = core_id.id); - let _guard = span.enter(); - - // Pin thread to specific core - if !core_affinity::set_for_current(core_id) { - // Continue execution even if pinning fails. - // This is acceptable because the OS will still schedule the thread, but performance may be less predictable. - otel_warn!( - "CoreAffinity.SetFailed", - message = "Failed to set core affinity for pipeline thread. Performance may be less predictable." - ); - } + // Run with the appropriate tracing subscriber for this pipeline. + telemetry_setup.with_subscriber(log_level, || { + // Create a tracing span for this pipeline thread + // so that all logs within this scope include pipeline context. + let span = otel_info_span!("pipeline_thread", core.id = core_id.id); + let _guard = span.enter(); + + // Pin thread to specific core + if !core_affinity::set_for_current(core_id) { + // Continue execution even if pinning fails. + // This is acceptable because the OS will still schedule the thread, but performance may be less predictable. + otel_warn!( + "CoreAffinity.SetFailed", + message = "Failed to set core affinity for pipeline thread. Performance may be less predictable." 
+ ); + } - obs_evt_reporter.report(ObservedEvent::admitted( - pipeline_key.clone(), - Some("Pipeline admission successful.".to_owned()), - )); + obs_evt_reporter.report(ObservedEvent::admitted( + pipeline_key.clone(), + Some("Pipeline admission successful.".to_owned()), + )); + + // Build the runtime pipeline from the configuration + // Regular pipelines don't need ITR injection - that's only for the internal pipeline + let runtime_pipeline = pipeline_factory + .build(pipeline_context.clone(), pipeline_config.clone(), None) + .map_err(|e| { + Error::PipelineRuntimeError { + source: Box::new(e), + } + })?; - // Build the runtime pipeline from the configuration - let runtime_pipeline = pipeline_factory - .build(pipeline_context.clone(), pipeline_config.clone()) - .map_err(|e| Error::PipelineRuntimeError { - source: Box::new(e), - })?; + obs_evt_reporter.report(ObservedEvent::ready( + pipeline_key.clone(), + Some("Pipeline initialization successful.".to_owned()), + )); + + // Start the pipeline (this will use the current thread's Tokio runtime) + runtime_pipeline + .run_forever( + pipeline_key, + pipeline_context, + obs_evt_reporter, + metrics_reporter, + pipeline_ctrl_msg_tx, + pipeline_ctrl_msg_rx, + ) + .map_err(|e| Error::PipelineRuntimeError { + source: Box::new(e), + }) + }) + } - obs_evt_reporter.report(ObservedEvent::ready( - pipeline_key.clone(), - Some("Pipeline initialization successful.".to_owned()), - )); - - // Start the pipeline (this will use the current thread's Tokio runtime) - runtime_pipeline - .run_forever( - pipeline_key, - pipeline_context, - obs_evt_reporter, - metrics_reporter, - pipeline_ctrl_msg_tx, - pipeline_ctrl_msg_rx, - ) - .map_err(|e| Error::PipelineRuntimeError { - source: Box::new(e), - }) + /// Runs the internal telemetry pipeline in the current thread. + /// + /// This is similar to `run_pipeline_thread` but includes a startup signal channel + /// to notify the parent thread when the pipeline has successfully built and is ready + /// to receive logs. This allows the controller to fail fast with a clear error message + /// if the internal pipeline configuration is invalid. + #[allow(clippy::too_many_arguments)] + fn run_internal_pipeline_thread( + pipeline_key: DeployedPipelineKey, + core_id: CoreId, + pipeline_config: PipelineConfig, + pipeline_factory: &'static PipelineFactory, + pipeline_context: PipelineContext, + obs_evt_reporter: ObservedEventReporter, + metrics_reporter: MetricsReporter, + telemetry_setup: TelemetrySetup, + log_level: otap_df_config::pipeline::service::telemetry::logs::LogLevel, + internal_telemetry: Option, + pipeline_ctrl_msg_tx: PipelineCtrlMsgSender, + pipeline_ctrl_msg_rx: PipelineCtrlMsgReceiver, + startup_tx: std_mpsc::Sender>, + ) -> Result, Error> { + // Run with the appropriate tracing subscriber for this pipeline. 
+ telemetry_setup.with_subscriber(log_level, || { + // Create a tracing span for this pipeline thread + let span = otel_info_span!("internal_pipeline_thread", core.id = core_id.id); + let _guard = span.enter(); + + // No core pinning for internal pipeline - it's lightweight + + obs_evt_reporter.report(ObservedEvent::admitted( + pipeline_key.clone(), + Some("Internal pipeline admission successful.".to_owned()), + )); + + // Build the runtime pipeline from the configuration + let runtime_pipeline = match pipeline_factory.build( + pipeline_context.clone(), + pipeline_config.clone(), + internal_telemetry, + ) { + Ok(pipeline) => pipeline, + Err(e) => { + // Send error to main thread and exit; main thread will propagate it + let _ = startup_tx.send(Err(Error::PipelineRuntimeError { + source: Box::new(e), + })); + return Ok(vec![]); + } + }; + + obs_evt_reporter.report(ObservedEvent::ready( + pipeline_key.clone(), + Some("Internal pipeline initialization successful.".to_owned()), + )); + + // Signal successful startup - the pipeline is built and ready to run + let _ = startup_tx.send(Ok(())); + + // Start the pipeline (this will use the current thread's Tokio runtime) + runtime_pipeline + .run_forever( + pipeline_key, + pipeline_context, + obs_evt_reporter, + metrics_reporter, + pipeline_ctrl_msg_tx, + pipeline_ctrl_msg_rx, + ) + .map_err(|e| Error::PipelineRuntimeError { + source: Box::new(e), + }) + }) } } diff --git a/rust/otap-dataflow/crates/controller/src/thread_task.rs b/rust/otap-dataflow/crates/controller/src/thread_task.rs index c3788ec130..e5a98a0257 100644 --- a/rust/otap-dataflow/crates/controller/src/thread_task.rs +++ b/rust/otap-dataflow/crates/controller/src/thread_task.rs @@ -4,6 +4,7 @@ //! Utilities to run a non-Send async task on a dedicated OS thread with a //! single-threaded Tokio runtime and LocalSet, plus a shutdown signal. +use otap_df_telemetry::raw_error; use std::future::Future; use std::thread; use tokio::{runtime::Builder as RtBuilder, task::LocalSet}; @@ -50,7 +51,6 @@ impl ThreadLocalTaskHandle { } impl Drop for ThreadLocalTaskHandle { - #[allow(clippy::print_stderr)] fn drop(&mut self) { // Best-effort, idempotent shutdown on drop. self.cancel_token.cancel(); @@ -64,17 +64,18 @@ impl Drop for ThreadLocalTaskHandle { Ok(Err(_)) => { // Task returned an error; can't propagate it from Drop, so just log. // ToDo Replace this eprintln once we have selected a logging solution - eprintln!( - "Thread '{}' finished with an error during drop; error suppressed", - self.name + raw_error!( + "Thread finished with an error during drop; error suppressed", + thread_name = &self.name ); } Err(panic) => { // Don't panic in Drop; report and suppress. 
// ToDo Replace this eprintln once we have selected a logging solution - eprintln!( - "Thread '{}' panicked during drop: {panic:?}; panic suppressed", - self.name + raw_error!( + "Thread panicked during drop; panic suppressed", + thread_name = &self.name, + panicked = ?panic ); } } diff --git a/rust/otap-dataflow/crates/engine/Cargo.toml b/rust/otap-dataflow/crates/engine/Cargo.toml index 8d46abfb00..2eafccf98a 100644 --- a/rust/otap-dataflow/crates/engine/Cargo.toml +++ b/rust/otap-dataflow/crates/engine/Cargo.toml @@ -41,6 +41,7 @@ uuid = { workspace = true } once_cell = { workspace = true } data-encoding = { workspace = true } prost = { workspace = true } +bytes = { workspace = true } libmimalloc-sys = { workspace = true } byte-unit = { workspace = true } cpu-time = { workspace = true } diff --git a/rust/otap-dataflow/crates/engine/src/lib.rs b/rust/otap-dataflow/crates/engine/src/lib.rs index 4519004855..13d3b5b197 100644 --- a/rust/otap-dataflow/crates/engine/src/lib.rs +++ b/rust/otap-dataflow/crates/engine/src/lib.rs @@ -25,6 +25,7 @@ use otap_df_config::{ PortName, node::{DispatchStrategy, NodeUserConfig}, pipeline::PipelineConfig, + pipeline::service::telemetry::logs::INTERNAL_TELEMETRY_RECEIVER_URN, }; use otap_df_telemetry::otel_debug; use std::borrow::Cow; @@ -296,6 +297,7 @@ impl PipelineFactory { self: &PipelineFactory, pipeline_ctx: PipelineContext, config: PipelineConfig, + internal_telemetry: Option, ) -> Result, Error> { let mut receivers = Vec::new(); let mut processors = Vec::new(); @@ -332,7 +334,7 @@ impl PipelineFactory { match node_config.kind { otap_df_config::node::NodeKind::Receiver => { - let wrapper = self.create_receiver( + let mut wrapper = self.create_receiver( &pipeline_ctx, &mut receiver_names, &mut nodes, @@ -340,6 +342,14 @@ impl PipelineFactory { name.clone(), node_config.clone(), )?; + + // Inject internal telemetry settings if this is the target node + if let Some(ref settings) = internal_telemetry { + if node_config.plugin_urn.as_ref() == INTERNAL_TELEMETRY_RECEIVER_URN { + wrapper.set_internal_telemetry(settings.clone()); + } + } + receivers.push(wrapper.with_control_channel_metrics( &pipeline_ctx, &mut channel_metrics, diff --git a/rust/otap-dataflow/crates/engine/src/local/receiver.rs b/rust/otap-dataflow/crates/engine/src/local/receiver.rs index e3b5caf8fb..9da335143c 100644 --- a/rust/otap-dataflow/crates/engine/src/local/receiver.rs +++ b/rust/otap-dataflow/crates/engine/src/local/receiver.rs @@ -37,6 +37,7 @@ use crate::effect_handler::{EffectHandlerCore, TelemetryTimerCancelHandle, Timer use crate::error::{Error, ReceiverErrorKind, TypedError}; use crate::local::message::LocalSender; use crate::node::NodeId; +use crate::receiver::InternalTelemetrySettings; use crate::terminal_state::TerminalState; use async_trait::async_trait; use otap_df_channel::error::RecvError; @@ -120,6 +121,9 @@ impl ControlChannel { } } +/// Type alias for the internal logs receiver channel. +pub type LogsReceiver = otap_df_telemetry::LogsReceiver; + /// A `!Send` implementation of the EffectHandler. #[derive(Clone)] pub struct EffectHandler { @@ -130,6 +134,8 @@ pub struct EffectHandler { msg_senders: HashMap>, /// Cached default sender for fast access in the hot path default_sender: Option>, + /// Internal telemetry settings (for internal telemetry receiver). + internal_telemetry: Option, } /// Implementation for the `!Send` effect handler. 
@@ -159,9 +165,27 @@ impl EffectHandler { core, msg_senders, default_sender, + internal_telemetry: None, } } + /// Sets the internal telemetry settings. + pub fn set_internal_telemetry(&mut self, settings: InternalTelemetrySettings) { + self.internal_telemetry = Some(settings); + } + + /// Returns the logs receiver, if configured. + #[must_use] + pub fn logs_receiver(&self) -> Option<&LogsReceiver> { + self.internal_telemetry.as_ref().map(|s| &s.logs_receiver) + } + + /// Returns the pre-encoded resource bytes, if configured. + #[must_use] + pub fn resource_bytes(&self) -> Option<&bytes::Bytes> { + self.internal_telemetry.as_ref().map(|s| &s.resource_bytes) + } + /// Returns the id of the receiver associated with this handler. #[must_use] pub fn receiver_id(&self) -> NodeId { diff --git a/rust/otap-dataflow/crates/engine/src/receiver.rs b/rust/otap-dataflow/crates/engine/src/receiver.rs index b1ea36d534..5c223b7819 100644 --- a/rust/otap-dataflow/crates/engine/src/receiver.rs +++ b/rust/otap-dataflow/crates/engine/src/receiver.rs @@ -27,6 +27,12 @@ use otap_df_telemetry::reporter::MetricsReporter; use std::collections::HashMap; use std::sync::Arc; +/// Type alias for the internal logs receiver channel. +pub type LogsReceiver = otap_df_telemetry::LogsReceiver; + +/// Re-export from telemetry crate for convenience. +pub use otap_df_telemetry::InternalTelemetrySettings; + /// A wrapper for the receiver that allows for both `Send` and `!Send` receivers. /// /// Note: This is useful for creating a single interface for the receiver regardless of their @@ -50,6 +56,8 @@ pub enum ReceiverWrapper { pdata_senders: HashMap>, /// A receiver for pdata messages. pdata_receiver: Option>, + /// Internal telemetry settings (for internal telemetry receiver). + internal_telemetry: Option, }, /// A receiver with a `Send` implementation. Shared { @@ -69,6 +77,8 @@ pub enum ReceiverWrapper { pdata_senders: HashMap>, /// A receiver for pdata messages. pdata_receiver: Option>, + /// Internal telemetry settings (for internal telemetry receiver). + internal_telemetry: Option, }, } @@ -108,6 +118,7 @@ impl ReceiverWrapper { control_receiver: LocalReceiver::mpsc(control_receiver), pdata_senders: HashMap::new(), pdata_receiver: None, + internal_telemetry: None, } } @@ -133,6 +144,7 @@ impl ReceiverWrapper { control_receiver: SharedReceiver::mpsc(control_receiver), pdata_senders: HashMap::new(), pdata_receiver: None, + internal_telemetry: None, } } @@ -155,7 +167,7 @@ impl ReceiverWrapper { receiver, pdata_senders, pdata_receiver, - .. + internal_telemetry, } => { let channel_id = control_channel_id(&node_id); let control_sender = match control_sender.into_mpsc() { @@ -189,6 +201,7 @@ impl ReceiverWrapper { control_receiver, pdata_senders, pdata_receiver, + internal_telemetry, } } ReceiverWrapper::Shared { @@ -200,7 +213,7 @@ impl ReceiverWrapper { receiver, pdata_senders, pdata_receiver, - .. + internal_telemetry, } => { let channel_id = control_channel_id(&node_id); let control_sender = match control_sender.into_mpsc() { @@ -234,6 +247,7 @@ impl ReceiverWrapper { control_receiver, pdata_senders, pdata_receiver, + internal_telemetry, } } } @@ -253,6 +267,7 @@ impl ReceiverWrapper { control_receiver, pdata_senders, user_config, + internal_telemetry, .. 
}, metrics_reporter, @@ -269,13 +284,17 @@ impl ReceiverWrapper { }; let default_port = user_config.default_out_port.clone(); let ctrl_msg_chan = local::ControlChannel::new(Receiver::Local(control_receiver)); - let effect_handler = local::EffectHandler::new( + let mut effect_handler = local::EffectHandler::new( node_id, msg_senders, default_port, pipeline_ctrl_msg_tx, metrics_reporter, ); + // Inject internal telemetry settings if configured + if let Some(settings) = internal_telemetry { + effect_handler.set_internal_telemetry(settings); + } receiver.start(ctrl_msg_chan, effect_handler).await } ( @@ -367,6 +386,31 @@ impl Node for ReceiverWrapper { } } +impl ReceiverWrapper { + /// Set the internal telemetry settings for this receiver. + /// + /// This is used by the Internal Telemetry Receiver to receive logs + /// from all threads via the logs channel. + pub fn set_internal_telemetry(&mut self, settings: InternalTelemetrySettings) { + match self { + ReceiverWrapper::Local { internal_telemetry, .. } => { + *internal_telemetry = Some(settings); + } + ReceiverWrapper::Shared { internal_telemetry, .. } => { + *internal_telemetry = Some(settings); + } + } + } + + /// Take the internal telemetry settings, if set. + pub fn take_internal_telemetry(&mut self) -> Option { + match self { + ReceiverWrapper::Local { internal_telemetry, .. } => internal_telemetry.take(), + ReceiverWrapper::Shared { internal_telemetry, .. } => internal_telemetry.take(), + } + } +} + impl NodeWithPDataSender for ReceiverWrapper { fn set_pdata_sender( &mut self, diff --git a/rust/otap-dataflow/crates/otap/src/console_exporter/formatter.rs b/rust/otap-dataflow/crates/otap/src/console_exporter/formatter.rs new file mode 100644 index 0000000000..3917bbb901 --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/console_exporter/formatter.rs @@ -0,0 +1,644 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Hierarchical formatter for OTLP data with tree-style output. +//! +//! This module uses the shared `format_log_line` from `otap_df_telemetry::self_tracing` +//! to render OTLP log data in a hierarchical tree format. The tree structure is +//! inserted via the level-formatting callback, keeping timestamp left-aligned: +//! +//! ```text +//! 2026-01-14T18:29:09.645Z RESOURCE v1.Resource: [service.name=my-service] +//! 2026-01-14T18:29:09.645Z │ SCOPE v1.InstrumentationScope: [name=my-lib] +//! 2026-01-14T18:29:09.645Z │ └─ DEBUG event_name: body [attr=value] +//! ``` + +use otap_df_pdata::OtlpProtoBytes; +use otap_df_pdata::schema::{SpanId, TraceId}; +use otap_df_pdata::views::common::{ + AnyValueView, AttributeView, InstrumentationScopeView, Str, ValueType, +}; +use otap_df_pdata::views::logs::{LogRecordView, LogsDataView, ResourceLogsView, ScopeLogsView}; +use otap_df_pdata::views::otlp::bytes::logs::RawLogsData; +use otap_df_pdata::views::resource::ResourceView; +use otap_df_telemetry::self_tracing::{AnsiCode, BufWriter, ConsoleWriter, LOG_BUFFER_SIZE}; +use std::io::Write; + +/// Tree drawing characters for Unicode mode. +mod unicode_tree { + pub const VERTICAL: &str = "│"; + pub const TEE: &str = "├─"; + pub const CORNER: &str = "└─"; +} + +/// Tree drawing characters for ASCII mode. +mod ascii_tree { + pub const VERTICAL: &str = "|"; + pub const TEE: &str = "+-"; + pub const CORNER: &str = "\\-"; +} + +/// Tree drawing characters. 
+#[derive(Clone, Copy)] +struct TreeChars { + vertical: &'static str, + tee: &'static str, + corner: &'static str, +} + +impl TreeChars { + fn unicode() -> Self { + Self { + vertical: unicode_tree::VERTICAL, + tee: unicode_tree::TEE, + corner: unicode_tree::CORNER, + } + } + + fn ascii() -> Self { + Self { + vertical: ascii_tree::VERTICAL, + tee: ascii_tree::TEE, + corner: ascii_tree::CORNER, + } + } +} + +/// Hierarchical formatter for OTLP data. +pub struct HierarchicalFormatter { + writer: ConsoleWriter, + tree: TreeChars, +} + +impl HierarchicalFormatter { + /// Create a new hierarchical formatter. + #[must_use] + pub fn new(use_color: bool, use_unicode: bool) -> Self { + let writer = if use_color { + ConsoleWriter::color() + } else { + ConsoleWriter::no_color() + }; + let tree = if use_unicode { + TreeChars::unicode() + } else { + TreeChars::ascii() + }; + Self { writer, tree } + } + + /// Format logs from OTLP bytes. + pub fn format_logs_bytes(&self, bytes: &OtlpProtoBytes) { + if let OtlpProtoBytes::ExportLogsRequest(data) = bytes { + let logs_data = RawLogsData::new(data.as_ref()); + self.format_logs_data(&logs_data); + } + } + + /// Format logs from a LogsDataView. + fn format_logs_data(&self, logs_data: &L) { + for resource_logs in logs_data.resources() { + self.format_resource_logs(&resource_logs); + } + } + + /// Format a ResourceLogs with its nested scopes. + fn format_resource_logs(&self, resource_logs: &R) { + // Get first timestamp from nested log records + let first_ts = self.get_first_log_timestamp(resource_logs); + + // Always format resource line (even if empty) for consistent tree structure + match resource_logs.resource() { + Some(resource) => { + let view = ResourceLogView::new(&resource); + self.print_line(first_ts, "v1.Resource", &view, |w, cw| { + cw.write_ansi(w, AnsiCode::Cyan); + cw.write_ansi(w, AnsiCode::Bold); + let _ = w.write_all(b"RESOURCE"); + cw.write_ansi(w, AnsiCode::Reset); + let _ = w.write_all(b" "); + }); + } + None => { + self.print_resource_line(first_ts); + } + } + + // Format each scope + let scopes: Vec<_> = resource_logs.scopes().collect(); + let scope_count = scopes.len(); + for (i, scope_logs) in scopes.into_iter().enumerate() { + let is_last_scope = i == scope_count - 1; + self.format_scope_logs(&scope_logs, is_last_scope); + } + } + + /// Print a resource line with no attributes (used when resource is None). + fn print_resource_line(&self, timestamp_ns: u64) { + let mut buf = [0u8; LOG_BUFFER_SIZE]; + let mut w = std::io::Cursor::new(buf.as_mut_slice()); + + self.writer.write_ansi(&mut w, AnsiCode::Dim); + ConsoleWriter::write_timestamp(&mut w, timestamp_ns); + self.writer.write_ansi(&mut w, AnsiCode::Reset); + let _ = w.write_all(b" "); + self.writer.write_ansi(&mut w, AnsiCode::Cyan); + self.writer.write_ansi(&mut w, AnsiCode::Bold); + let _ = w.write_all(b"RESOURCE"); + self.writer.write_ansi(&mut w, AnsiCode::Reset); + let _ = w.write_all(b" v1.Resource:\n"); + + let len = w.position() as usize; + let _ = std::io::stdout().write_all(&buf[..len]); + } + + /// Get the first timestamp from log records in a ResourceLogs. + fn get_first_log_timestamp(&self, resource_logs: &R) -> u64 { + for scope_logs in resource_logs.scopes() { + for log_record in scope_logs.log_records() { + if let Some(ts) = log_record.time_unix_nano() { + return ts; + } + if let Some(ts) = log_record.observed_time_unix_nano() { + return ts; + } + } + } + 0 + } + + /// Format a ScopeLogs with its nested log records. 
+ fn format_scope_logs(&self, scope_logs: &S, is_last_scope: bool) { + // Get first timestamp from log records + let first_ts = scope_logs + .log_records() + .find_map(|lr| lr.time_unix_nano().or_else(|| lr.observed_time_unix_nano())) + .unwrap_or(0); + + // Always format scope line (even if empty) for consistent tree structure + match scope_logs.scope() { + Some(scope) => { + let view = ScopeLogView::new(&scope); + let tree = self.tree; + self.print_line(first_ts, "v1.InstrumentationScope", &view, |w, cw| { + let _ = w.write_all(tree.vertical.as_bytes()); + let _ = w.write_all(b" "); + cw.write_ansi(w, AnsiCode::Magenta); + cw.write_ansi(w, AnsiCode::Bold); + let _ = w.write_all(b"SCOPE"); + cw.write_ansi(w, AnsiCode::Reset); + let _ = w.write_all(b" "); + }); + } + None => { + self.print_scope_line(first_ts); + } + } + + // Format each log record + let records: Vec<_> = scope_logs.log_records().collect(); + let record_count = records.len(); + for (i, log_record) in records.into_iter().enumerate() { + let is_last_record = i == record_count - 1; + self.format_log_record(&log_record, is_last_scope, is_last_record); + } + } + + /// Print a scope line with no attributes (used when scope is None). + fn print_scope_line(&self, timestamp_ns: u64) { + let mut buf = [0u8; LOG_BUFFER_SIZE]; + let mut w = std::io::Cursor::new(buf.as_mut_slice()); + + self.writer.write_ansi(&mut w, AnsiCode::Dim); + ConsoleWriter::write_timestamp(&mut w, timestamp_ns); + self.writer.write_ansi(&mut w, AnsiCode::Reset); + let _ = w.write_all(b" "); + let _ = w.write_all(self.tree.vertical.as_bytes()); + let _ = w.write_all(b" "); + self.writer.write_ansi(&mut w, AnsiCode::Magenta); + self.writer.write_ansi(&mut w, AnsiCode::Bold); + let _ = w.write_all(b"SCOPE"); + self.writer.write_ansi(&mut w, AnsiCode::Reset); + let _ = w.write_all(b" v1.InstrumentationScope:\n"); + + let len = w.position() as usize; + let _ = std::io::stdout().write_all(&buf[..len]); + } + + /// Format a single log record using format_log_line. + fn format_log_record( + &self, + log_record: &L, + is_last_scope: bool, + is_last_record: bool, + ) { + let ts = log_record + .time_unix_nano() + .or_else(|| log_record.observed_time_unix_nano()) + .unwrap_or(0); + + let event_name = log_record + .event_name() + .map(|s| String::from_utf8_lossy(s).into_owned()) + .unwrap_or_else(|| "event".to_string()); + + let severity = log_record.severity_number(); + let tree = self.tree; + + self.print_line(ts, &event_name, log_record, |w, cw| { + // Tree prefix + let _ = w.write_all(tree.vertical.as_bytes()); + let _ = w.write_all(b" "); + if is_last_record && is_last_scope { + let _ = w.write_all(tree.corner.as_bytes()); + } else { + let _ = w.write_all(tree.tee.as_bytes()); + } + let _ = w.write_all(b" "); + // Severity with color + cw.write_severity(w, severity); + }); + } + + /// Print a line using the shared format_log_line. 
+ fn print_line(&self, timestamp_ns: u64, event_name: &str, record: &V, format_level: F) + where + V: LogRecordView, + F: FnOnce(&mut BufWriter<'_>, &ConsoleWriter), + { + let mut buf = [0u8; LOG_BUFFER_SIZE]; + let mut w = std::io::Cursor::new(buf.as_mut_slice()); + + self.writer + .format_log_line(&mut w, timestamp_ns, event_name, record, format_level); + + let len = w.position() as usize; + let _ = std::io::stdout().write_all(&buf[..len]); + } +} + +// ============================================================================ +// View adapters that present Resource and Scope as LogRecordView +// ============================================================================ + +/// A view adapter that presents a Resource as a LogRecordView. +/// +/// The resource attributes become the log record's attributes. +/// Body is empty, severity is ignored. +struct ResourceLogView<'a, R: ResourceView> { + resource: &'a R, +} + +impl<'a, R: ResourceView> ResourceLogView<'a, R> { + fn new(resource: &'a R) -> Self { + Self { resource } + } +} + +impl LogRecordView for ResourceLogView<'_, R> { + type Attribute<'att> + = R::Attribute<'att> + where + Self: 'att; + type AttributeIter<'att> + = R::AttributesIter<'att> + where + Self: 'att; + type Body<'bod> + = EmptyAnyValue + where + Self: 'bod; + + fn time_unix_nano(&self) -> Option { + None + } + fn observed_time_unix_nano(&self) -> Option { + None + } + fn severity_number(&self) -> Option { + None + } + fn severity_text(&self) -> Option> { + None + } + fn body(&self) -> Option> { + None + } + fn attributes(&self) -> Self::AttributeIter<'_> { + self.resource.attributes() + } + fn dropped_attributes_count(&self) -> u32 { + 0 + } + fn flags(&self) -> Option { + None + } + fn trace_id(&self) -> Option<&TraceId> { + None + } + fn span_id(&self) -> Option<&SpanId> { + None + } + fn event_name(&self) -> Option> { + None + } +} + +/// A view adapter that presents an InstrumentationScope as a LogRecordView. +/// +/// The scope's name, version, and attributes are merged into the attributes iterator. +struct ScopeLogView<'a, S: InstrumentationScopeView> { + scope: &'a S, +} + +impl<'a, S: InstrumentationScopeView> ScopeLogView<'a, S> { + fn new(scope: &'a S) -> Self { + Self { scope } + } +} + +impl LogRecordView for ScopeLogView<'_, S> { + type Attribute<'att> + = ScopeAttribute<'att, S> + where + Self: 'att; + type AttributeIter<'att> + = ScopeAttributeIter<'att, S> + where + Self: 'att; + type Body<'bod> + = EmptyAnyValue + where + Self: 'bod; + + fn time_unix_nano(&self) -> Option { + None + } + fn observed_time_unix_nano(&self) -> Option { + None + } + fn severity_number(&self) -> Option { + None + } + fn severity_text(&self) -> Option> { + None + } + fn body(&self) -> Option> { + None + } + fn attributes(&self) -> Self::AttributeIter<'_> { + ScopeAttributeIter::new(self.scope) + } + fn dropped_attributes_count(&self) -> u32 { + 0 + } + fn flags(&self) -> Option { + None + } + fn trace_id(&self) -> Option<&TraceId> { + None + } + fn span_id(&self) -> Option<&SpanId> { + None + } + fn event_name(&self) -> Option> { + None + } +} + +/// Iterator that yields scope name, version, and attributes as a unified attribute stream. 
+struct ScopeAttributeIter<'a, S: InstrumentationScopeView> {
+    scope: &'a S,
+    phase: ScopeIterPhase<'a, S>,
+}
+
+enum ScopeIterPhase<'a, S: InstrumentationScopeView + 'a> {
+    Name,
+    Version,
+    Attributes(S::AttributeIter<'a>),
+    Done,
+}
+
+impl<'a, S: InstrumentationScopeView + 'a> ScopeAttributeIter<'a, S> {
+    fn new(scope: &'a S) -> Self {
+        Self {
+            scope,
+            phase: ScopeIterPhase::Name,
+        }
+    }
+}
+
+impl<'a, S: InstrumentationScopeView + 'a> Iterator for ScopeAttributeIter<'a, S> {
+    type Item = ScopeAttribute<'a, S>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        loop {
+            match &mut self.phase {
+                ScopeIterPhase::Name => {
+                    if let Some(name) = self.scope.name() {
+                        self.phase = ScopeIterPhase::Version;
+                        return Some(ScopeAttribute::Name(name));
+                    }
+                    self.phase = ScopeIterPhase::Version;
+                }
+                ScopeIterPhase::Version => {
+                    if let Some(version) = self.scope.version() {
+                        self.phase = ScopeIterPhase::Attributes(self.scope.attributes());
+                        return Some(ScopeAttribute::Version(version));
+                    }
+                    self.phase = ScopeIterPhase::Attributes(self.scope.attributes());
+                }
+                ScopeIterPhase::Attributes(iter) => {
+                    if let Some(attr) = iter.next() {
+                        return Some(ScopeAttribute::Attr(attr));
+                    }
+                    self.phase = ScopeIterPhase::Done;
+                    return None;
+                }
+                ScopeIterPhase::Done => return None,
+            }
+        }
+    }
+}
+
+/// A synthetic attribute for scope name/version or a real scope attribute.
+enum ScopeAttribute<'a, S: InstrumentationScopeView + 'a> {
+    Name(Str<'a>),
+    Version(Str<'a>),
+    Attr(S::Attribute<'a>),
+}
+
+impl<'a, S: InstrumentationScopeView + 'a> AttributeView for ScopeAttribute<'a, S> {
+    type Val<'val>
+        = ScopeAttributeValue<'a, 'val, S>
+    where
+        Self: 'val;
+
+    fn key(&self) -> Str<'_> {
+        match self {
+            ScopeAttribute::Name(_) => b"name",
+            ScopeAttribute::Version(_) => b"version",
+            ScopeAttribute::Attr(a) => a.key(),
+        }
+    }
+
+    fn value(&self) -> Option<Self::Val<'_>> {
+        match self {
+            ScopeAttribute::Name(s) => Some(ScopeAttributeValue::String(s)),
+            ScopeAttribute::Version(s) => Some(ScopeAttributeValue::String(s)),
+            ScopeAttribute::Attr(a) => a.value().map(ScopeAttributeValue::Delegated),
+        }
+    }
+}
+
+/// Value type for scope attributes - either a string (name/version) or delegated.
+/// 'a is the lifetime of the underlying data.
+/// 'val is the borrow lifetime for the value call.
+enum ScopeAttributeValue<'a, 'val, S: InstrumentationScopeView + 'a>
+where
+    S::Attribute<'a>: 'val,
+{
+    String(Str<'a>),
+    Delegated(<S::Attribute<'a> as AttributeView>::Val<'val>),
+}
+
+impl<'a, 'val, S: InstrumentationScopeView + 'a> AnyValueView<'val>
+    for ScopeAttributeValue<'a, 'val, S>
+where
+    'a: 'val,
+{
+    type KeyValue = EmptyAttribute;
+    type ArrayIter<'arr>
+        = std::iter::Empty<Self>
+    where
+        Self: 'arr;
+    type KeyValueIter<'kv>
+        = std::iter::Empty<Self::KeyValue>
+    where
+        Self: 'kv;
+
+    fn value_type(&self) -> ValueType {
+        match self {
+            ScopeAttributeValue::String(_) => ValueType::String,
+            ScopeAttributeValue::Delegated(v) => v.value_type(),
+        }
+    }
+
+    fn as_string(&self) -> Option<Str<'val>> {
+        match self {
+            ScopeAttributeValue::String(s) => Some(s),
+            ScopeAttributeValue::Delegated(v) => v.as_string(),
+        }
+    }
+
+    fn as_int64(&self) -> Option<i64> {
+        match self {
+            ScopeAttributeValue::String(_) => None,
+            ScopeAttributeValue::Delegated(v) => v.as_int64(),
+        }
+    }
+
+    fn as_bool(&self) -> Option<bool> {
+        match self {
+            ScopeAttributeValue::String(_) => None,
+            ScopeAttributeValue::Delegated(v) => v.as_bool(),
+        }
+    }
+
+    fn as_double(&self) -> Option<f64> {
+        match self {
+            ScopeAttributeValue::String(_) => None,
+            ScopeAttributeValue::Delegated(v) => v.as_double(),
+        }
+    }
+
+    fn as_bytes(&self) -> Option<&[u8]> {
+        match self {
+            ScopeAttributeValue::String(_) => None,
+            ScopeAttributeValue::Delegated(v) => v.as_bytes(),
+        }
+    }
+
+    fn as_array(&self) -> Option<Self::ArrayIter<'_>> {
+        None
+    }
+
+    fn as_kvlist(&self) -> Option<Self::KeyValueIter<'_>> {
+        None
+    }
+}
+
+/// An empty AnyValue (used as placeholder for body).
+struct EmptyAnyValue;
+
+impl<'a> AnyValueView<'a> for EmptyAnyValue {
+    type KeyValue = EmptyAttribute;
+    type ArrayIter<'arr>
+        = std::iter::Empty<Self>
+    where
+        Self: 'arr;
+    type KeyValueIter<'kv>
+        = std::iter::Empty<Self::KeyValue>
+    where
+        Self: 'kv;
+
+    fn value_type(&self) -> ValueType {
+        ValueType::Empty
+    }
+    fn as_string(&self) -> Option<Str<'a>> {
+        None
+    }
+    fn as_int64(&self) -> Option<i64> {
+        None
+    }
+    fn as_bool(&self) -> Option<bool> {
+        None
+    }
+    fn as_double(&self) -> Option<f64> {
+        None
+    }
+    fn as_bytes(&self) -> Option<&[u8]> {
+        None
+    }
+    fn as_array(&self) -> Option<Self::ArrayIter<'_>> {
+        None
+    }
+    fn as_kvlist(&self) -> Option<Self::KeyValueIter<'_>> {
+        None
+    }
+}
+
+/// An empty attribute (used as placeholder).
+struct EmptyAttribute;
+
+impl AttributeView for EmptyAttribute {
+    type Val<'val>
+        = EmptyAnyValue
+    where
+        Self: 'val;
+
+    fn key(&self) -> Str<'_> {
+        b""
+    }
+
+    fn value(&self) -> Option<Self::Val<'_>> {
+        None
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_tree_chars() {
+        let unicode = TreeChars::unicode();
+        let ascii = TreeChars::ascii();
+
+        assert_eq!(unicode.vertical, "│");
+        assert_eq!(unicode.tee, "├─");
+
+        assert_eq!(ascii.vertical, "|");
+        assert_eq!(ascii.tee, "+-");
+    }
+}
diff --git a/rust/otap-dataflow/crates/otap/src/console_exporter/mod.rs b/rust/otap-dataflow/crates/otap/src/console_exporter/mod.rs
new file mode 100644
index 0000000000..cd4820a422
--- /dev/null
+++ b/rust/otap-dataflow/crates/otap/src/console_exporter/mod.rs
@@ -0,0 +1,193 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+//! Console exporter that prints OTLP data with hierarchical formatting.
+//!
+//! This exporter displays logs (traces/metrics support is planned) with
+//! resource and scope grouping using tree-style output:
+//!
+//! ```text
+//! RESOURCE {service.name=my-service, ...}
+//! │ SCOPE {name=my-scope, version=1.0}
+//! │ ├─ INFO event_name: message [attr=value]
+//! │ ├─ WARN event_name: warning message
+//! │ └─ ERROR event_name: error message [code=500]
+//! ```
+
+use crate::OTAP_EXPORTER_FACTORIES;
+use crate::pdata::OtapPdata;
+use async_trait::async_trait;
+use linkme::distributed_slice;
+use otap_df_config::SignalType;
+use otap_df_config::node::NodeUserConfig;
+use otap_df_engine::config::ExporterConfig;
+use otap_df_engine::context::PipelineContext;
+use otap_df_engine::control::{AckMsg, NodeControlMsg};
+use otap_df_engine::error::Error;
+use otap_df_engine::exporter::ExporterWrapper;
+use otap_df_engine::local::exporter::{EffectHandler, Exporter};
+use otap_df_engine::message::{Message, MessageChannel};
+use otap_df_engine::node::NodeId;
+use otap_df_engine::terminal_state::TerminalState;
+use otap_df_engine::{ConsumerEffectHandlerExtension, ExporterFactory};
+use otap_df_pdata::OtapPayload;
+use otap_df_telemetry::raw_error;
+use std::sync::Arc;
+
+mod formatter;
+
+use formatter::HierarchicalFormatter;
+
+/// The URN for the console exporter
+pub const CONSOLE_EXPORTER_URN: &str = "urn:otel:console:exporter";
+
+/// Configuration for the console exporter
+#[derive(Debug, Clone, Default, serde::Deserialize)]
+pub struct ConsoleExporterConfig {
+    /// Whether to use ANSI colors in output (default: true)
+    #[serde(default = "default_color")]
+    pub color: bool,
+    /// Whether to use Unicode box-drawing characters (default: true)
+    #[serde(default = "default_unicode")]
+    pub unicode: bool,
+}
+
+fn default_color() -> bool {
+    true
+}
+
+fn default_unicode() -> bool {
+    true
+}
+
+/// Console exporter that prints OTLP data with hierarchical formatting
+pub struct ConsoleExporter {
+    formatter: HierarchicalFormatter,
+}
+
+impl ConsoleExporter {
+    /// Create a new console exporter with the given configuration.
+    #[must_use]
+    pub fn new(config: ConsoleExporterConfig) -> Self {
+        Self {
+            formatter: HierarchicalFormatter::new(config.color, config.unicode),
+        }
+    }
+}
+
+/// Declare the Console Exporter as a local exporter factory
+#[allow(unsafe_code)]
+#[distributed_slice(OTAP_EXPORTER_FACTORIES)]
+pub static CONSOLE_EXPORTER: ExporterFactory<OtapPdata> = ExporterFactory {
+    name: CONSOLE_EXPORTER_URN,
+    create: |_pipeline: PipelineContext,
+             node: NodeId,
+             node_config: Arc<NodeUserConfig>,
+             exporter_config: &ExporterConfig| {
+        let config: ConsoleExporterConfig = serde_json::from_value(node_config.config.clone())
+            .map_err(|e| otap_df_config::error::Error::InvalidUserConfig {
+                error: format!("Failed to parse console exporter config: {}", e),
+            })?;
+        Ok(ExporterWrapper::local(
+            ConsoleExporter::new(config),
+            node,
+            node_config,
+            exporter_config,
+        ))
+    },
+};
+
+#[async_trait(?Send)]
+impl Exporter<OtapPdata> for ConsoleExporter {
+    async fn start(
+        self: Box<Self>,
+        mut msg_chan: MessageChannel<OtapPdata>,
+        effect_handler: EffectHandler<OtapPdata>,
+    ) -> Result<TerminalState, Error> {
+        loop {
+            match msg_chan.recv().await? {
+                Message::Control(NodeControlMsg::Shutdown { .. }) => break,
+                Message::PData(data) => {
+                    self.export(&data);
+                    effect_handler.notify_ack(AckMsg::new(data)).await?;
+                }
+                _ => {
+                    // do nothing
+                }
+            }
+        }
+
+        Ok(TerminalState::default())
+    }
+}
+
+impl ConsoleExporter {
+    fn export(&self, data: &OtapPdata) {
+        let (_, payload) = data.clone().into_parts();
+        match payload.signal_type() {
+            SignalType::Logs => self.export_logs(&payload),
+            SignalType::Traces => self.export_traces(&payload),
+            SignalType::Metrics => self.export_metrics(&payload),
+        }
+    }
+
+    fn export_logs(&self, payload: &OtapPayload) {
+        match payload {
+            OtapPayload::OtlpBytes(bytes) => {
+                self.formatter.format_logs_bytes(bytes);
+            }
+            OtapPayload::OtapArrowRecords(_records) => {
+                // TODO: Support Arrow format
+                eprintln!("Console exporter: Arrow format not yet supported for logs");
+            }
+        }
+    }
+
+    fn export_traces(&self, _payload: &OtapPayload) {
+        // TODO: Implement traces formatting
+        raw_error!("Console exporter: Traces formatting not yet implemented");
+    }
+
+    fn export_metrics(&self, _payload: &OtapPayload) {
+        // TODO: Implement metrics formatting
+        raw_error!("Console exporter: Metrics formatting not yet implemented");
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::testing::*;
+    use otap_df_engine::Interests;
+    use serde_json::json;
+
+    #[test]
+    fn test_console_exporter_no_subscription() {
+        test_exporter_no_subscription(&CONSOLE_EXPORTER, json!({}));
+    }
+
+    #[test]
+    fn test_console_exporter_with_subscription() {
+        test_exporter_with_subscription(
+            &CONSOLE_EXPORTER,
+            json!({}),
+            Interests::ACKS,
+            Interests::ACKS,
+        );
+    }
+
+    #[test]
+    fn test_console_exporter_config_defaults() {
+        let config: ConsoleExporterConfig = serde_json::from_value(json!({})).unwrap();
+        assert!(config.color);
+        assert!(config.unicode);
+    }
+
+    #[test]
+    fn test_console_exporter_config_custom() {
+        let config: ConsoleExporterConfig =
+            serde_json::from_value(json!({"color": false, "unicode": false})).unwrap();
+        assert!(!config.color);
+        assert!(!config.unicode);
+    }
+}
diff --git a/rust/otap-dataflow/crates/otap/src/internal_telemetry_receiver.rs b/rust/otap-dataflow/crates/otap/src/internal_telemetry_receiver.rs
new file mode 100644
index 0000000000..f3538f1ca5
--- /dev/null
+++ b/rust/otap-dataflow/crates/otap/src/internal_telemetry_receiver.rs
@@ -0,0 +1,173 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+//! Internal telemetry receiver.
+//!
+//! This receiver consumes internal logs from the logging channel and emits
+//! the logs as OTLP ExportLogsRequest messages into the pipeline.
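+//!
+//! A sketch of the channel pairing this receiver depends on (the actual
+//! wiring is performed elsewhere in the engine; the size is illustrative):
+//!
+//! ```ignore
+//! let (reporter, receiver) = otap_df_telemetry::logs::channel(100);
+//! // `reporter` feeds an ImmediateLayer; `receiver` reaches this node
+//! // through the effect handler's logs_receiver().
+//! ```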
+
+use crate::OTAP_RECEIVER_FACTORIES;
+use crate::pdata::{Context, OtapPdata};
+use async_trait::async_trait;
+use linkme::distributed_slice;
+use otap_df_config::node::NodeUserConfig;
+use otap_df_engine::ReceiverFactory;
+use otap_df_engine::config::ReceiverConfig;
+use otap_df_engine::context::PipelineContext;
+use otap_df_engine::control::NodeControlMsg;
+use otap_df_engine::error::Error;
+use otap_df_engine::local::receiver as local;
+use otap_df_engine::node::NodeId;
+use otap_df_engine::receiver::ReceiverWrapper;
+use otap_df_engine::terminal_state::TerminalState;
+use bytes::Bytes;
+use otap_df_pdata::otlp::ProtoBuffer;
+use otap_df_pdata::OtlpProtoBytes;
+use otap_df_telemetry::logs::LogPayload;
+use otap_df_telemetry::metrics::MetricSetSnapshot;
+use otap_df_telemetry::self_tracing::{SavedCallsite, encode_export_logs_request};
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use std::sync::Arc;
+
+/// The URN for the internal telemetry receiver.
+pub use otap_df_config::pipeline::service::telemetry::logs::INTERNAL_TELEMETRY_RECEIVER_URN;
+
+/// Configuration for the internal telemetry receiver.
+#[derive(Clone, Deserialize, Serialize, Default)]
+#[serde(deny_unknown_fields)]
+pub struct Config {}
+
+/// A receiver that consumes internal logs from the logging channel and emits OTLP logs.
+pub struct InternalTelemetryReceiver {
+    #[allow(dead_code)]
+    config: Config,
+}
+
+/// Declares the internal telemetry receiver as a local receiver factory.
+#[allow(unsafe_code)]
+#[distributed_slice(OTAP_RECEIVER_FACTORIES)]
+pub static INTERNAL_TELEMETRY_RECEIVER: ReceiverFactory<OtapPdata> = ReceiverFactory {
+    name: INTERNAL_TELEMETRY_RECEIVER_URN,
+    create: |_pipeline: PipelineContext,
+             node: NodeId,
+             node_config: Arc<NodeUserConfig>,
+             receiver_config: &ReceiverConfig| {
+        Ok(ReceiverWrapper::local(
+            InternalTelemetryReceiver::from_config(&node_config.config)?,
+            node,
+            node_config,
+            receiver_config,
+        ))
+    },
+};
+
+impl InternalTelemetryReceiver {
+    /// Create a new receiver with the given configuration.
+    #[must_use]
+    pub fn new(config: Config) -> Self {
+        Self { config }
+    }
+
+    /// Create a receiver from a JSON configuration.
+    pub fn from_config(config: &Value) -> Result<Self, otap_df_config::error::Error> {
+        let config: Config = serde_json::from_value(config.clone()).map_err(|e| {
+            otap_df_config::error::Error::InvalidUserConfig {
+                error: e.to_string(),
+            }
+        })?;
+        Ok(Self::new(config))
+    }
+}
+
+#[async_trait(?Send)]
+impl local::Receiver<OtapPdata> for InternalTelemetryReceiver {
+    async fn start(
+        self: Box<Self>,
+        mut ctrl_msg_recv: local::ControlChannel,
+        effect_handler: local::EffectHandler<OtapPdata>,
+    ) -> Result<TerminalState, Error> {
+        // Get the logs receiver channel from the effect handler
+        let logs_receiver = effect_handler
+            .logs_receiver()
+            .expect("InternalTelemetryReceiver requires a logs_receiver to be configured");
+
+        // Start periodic telemetry collection
+        let _ = effect_handler
+            .start_periodic_telemetry(std::time::Duration::from_secs(1))
+            .await?;
+
+        // Reusable buffer for encoding log records
+        let mut buf = ProtoBuffer::with_capacity(512);
+
+        loop {
+            tokio::select! {
+                biased;
+
+                // Handle control messages with priority
+                ctrl_msg = ctrl_msg_recv.recv() => {
+                    match ctrl_msg {
+                        Ok(NodeControlMsg::Shutdown { deadline, .. }) => {
+                            // Drain any remaining logs from channel before shutdown
+                            while let Ok(payload) = logs_receiver.try_recv() {
+                                self.send_payload(&effect_handler, payload, &mut buf).await?;
+                            }
+                            return Ok(TerminalState::new::<[MetricSetSnapshot; 0]>(deadline, []));
+                        }
+                        Ok(NodeControlMsg::CollectTelemetry { .. }) => {
+                            // No metrics to report for now
+                        }
+                        Err(e) => {
+                            return Err(Error::ChannelRecvError(e));
+                        }
+                        _ => {
+                            // Ignore other control messages
+                        }
+                    }
+                }
+
+                // Receive logs from the channel
+                result = logs_receiver.recv_async() => {
+                    match result {
+                        Ok(payload) => {
+                            self.send_payload(&effect_handler, payload, &mut buf).await?;
+                        }
+                        Err(_) => {
+                            // Channel closed, exit gracefully
+                            return Ok(TerminalState::default());
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl InternalTelemetryReceiver {
+    /// Send a log payload as OTLP logs.
+    async fn send_payload(
+        &self,
+        effect_handler: &local::EffectHandler<OtapPdata>,
+        payload: LogPayload,
+        buf: &mut ProtoBuffer,
+    ) -> Result<(), Error> {
+        match payload {
+            LogPayload::Singleton(record) => {
+                let callsite = SavedCallsite::new(record.callsite_id.0.metadata());
+                encode_export_logs_request(
+                    buf,
+                    record,
+                    &callsite,
+                    effect_handler.resource_bytes(),
+                );
+
+                let pdata = OtapPdata::new(
+                    Context::default(),
+                    OtlpProtoBytes::ExportLogsRequest(Bytes::copy_from_slice(buf.as_ref())).into(),
+                );
+                effect_handler.send_message(pdata).await?;
+            }
+        }
+        Ok(())
+    }
+}
diff --git a/rust/otap-dataflow/crates/otap/src/lib.rs b/rust/otap-dataflow/crates/otap/src/lib.rs
index 63ffe331b1..ff2315f323 100644
--- a/rust/otap-dataflow/crates/otap/src/lib.rs
+++ b/rust/otap-dataflow/crates/otap/src/lib.rs
@@ -49,9 +49,15 @@ pub mod filter_processor;
 /// Implementation of a noop exporter that acts as a exporter placeholder
 pub mod noop_exporter;
 
+/// Console exporter that prints OTLP data with hierarchical formatting
+pub mod console_exporter;
+
 /// An error-exporter returns a static error.
 pub mod error_exporter;
 
+/// Internal telemetry receiver that drains engine logs into the pipeline.
+pub mod internal_telemetry_receiver;
+
 /// Experimental exporters and processors
 #[cfg(any(
     feature = "experimental-exporters",
diff --git a/rust/otap-dataflow/crates/state/Cargo.toml b/rust/otap-dataflow/crates/state/Cargo.toml
index 61e0af312c..fb9e035460 100644
--- a/rust/otap-dataflow/crates/state/Cargo.toml
+++ b/rust/otap-dataflow/crates/state/Cargo.toml
@@ -13,7 +13,8 @@ rust-version.workspace = true
 workspace = true
 
 [dependencies]
-otap-df-config = { path = "../config" }
+otap-df-config = { workspace = true }
+otap-df-telemetry = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 
@@ -22,4 +23,7 @@ flume = { workspace = true }
 thiserror = { workspace = true }
 tokio-util = { workspace = true }
 tokio = { workspace = true }
-log = { workspace = true }
+
+# Note: used for tracing::field::debug() annotations, because the otel_xxx!
+# macros do not support them directly. Consider whether to add or consolidate.
+tracing = { workspace = true }
diff --git a/rust/otap-dataflow/crates/state/src/reporter.rs b/rust/otap-dataflow/crates/state/src/reporter.rs
index c030335ceb..785b3bbe0b 100644
--- a/rust/otap-dataflow/crates/state/src/reporter.rs
+++ b/rust/otap-dataflow/crates/state/src/reporter.rs
@@ -4,6 +4,7 @@
 //! A reporter of observed events.
 
 use crate::event::ObservedEvent;
+use otap_df_telemetry::raw_error;
 use std::time::Duration;
 
 /// A sharable/clonable observed event reporter sending events to an `ObservedStore`.
@@ -25,17 +26,19 @@ impl ObservedEventReporter {
     /// Note: This method does not return an error if sending the event to the reporting channel
     /// fails, as this is not sufficient reason to interrupt the normal flow of the system under
     /// observation. However, an error message is logged to the standard error output.
-    #[allow(
-        clippy::print_stderr,
-        reason = "Use `eprintln!` while waiting for a decision on a framework for debugging/tracing."
-    )]
     pub fn report(&self, event: ObservedEvent) {
         match self.sender.send_timeout(event, self.timeout) {
             Err(flume::SendTimeoutError::Timeout(event)) => {
-                eprintln!("Timeout sending observed event: {event:?}")
+                raw_error!(
+                    "Timeout sending observed event",
+                    event = ?event
+                );
             }
             Err(flume::SendTimeoutError::Disconnected(event)) => {
-                eprintln!("Disconnected event: {event:?}")
+                raw_error!(
+                    "Disconnected event observer",
+                    event = ?event
+                );
             }
             Ok(_) => {}
         }
diff --git a/rust/otap-dataflow/crates/state/src/store.rs b/rust/otap-dataflow/crates/state/src/store.rs
index c7ad071f8e..f82b189524 100644
--- a/rust/otap-dataflow/crates/state/src/store.rs
+++ b/rust/otap-dataflow/crates/state/src/store.rs
@@ -11,6 +11,7 @@
 use crate::pipeline_rt_status::{ApplyOutcome, PipelineRuntimeStatus};
 use crate::pipeline_status::PipelineStatus;
 use crate::reporter::ObservedEventReporter;
 use otap_df_config::pipeline::PipelineSettings;
+use otap_df_telemetry::{otel_warn, raw_error};
 use serde::{Serialize, Serializer};
 use std::collections::HashMap;
 use std::sync::{Arc, Mutex};
@@ -51,7 +52,7 @@
         match self.pipelines.lock() {
             Ok(guard) => guard.clone(),
             Err(poisoned) => {
-                log::warn!(
+                otel_warn!(
                     "ObservedStateHandle mutex was poisoned; returning possibly stale snapshot"
                 );
                 poisoned.into_inner().clone()
@@ -101,23 +102,28 @@
     }
 
     /// Reports a new observed event in the store.
-    #[allow(
-        clippy::print_stderr,
-        reason = "Use `eprintln!` while waiting for https://github.com/open-telemetry/otel-arrow/issues/1237."
-    )]
     fn report(&self, observed_event: ObservedEvent) -> Result<ApplyOutcome, Error> {
         // ToDo Event reporting see: https://github.com/open-telemetry/otel-arrow/issues/1237
         // The code below is temporary and should be replaced with a proper event reporting
         // mechanism (see previous todo).
         match &observed_event.r#type {
-            EventType::Request(_) | EventType::Error(_) => {
-                eprintln!("Observed event: {observed_event:?}")
+            EventType::Request(_) => {
+                raw_error!(
+                    "request.event",
+                    observed_event = ?observed_event,
+                );
+            }
+            EventType::Error(_) => {
+                raw_error!(
+                    "error.event",
+                    observed_event = ?observed_event,
+                );
             }
             EventType::Success(_) => { /* no console output for success events */ }
         }
 
         let mut pipelines = self.pipelines.lock().unwrap_or_else(|poisoned| {
-            log::warn!(
+            otel_warn!(
                 "ObservedStateStore mutex was poisoned; continuing with possibly inconsistent state"
            );
             poisoned.into_inner()
@@ -154,7 +160,10 @@
             // Exit the loop if the channel is closed
             while let Ok(event) = self.receiver.recv_async().await {
                 if let Err(e) = self.report(event) {
-                    log::error!("Error reporting observed event: {e}");
+                    raw_error!(
+                        "Error reporting observed event",
+                        error = ?e,
+                    );
                 }
             }
         } => { /* Channel closed, exit gracefully */ }
diff --git a/rust/otap-dataflow/crates/telemetry/src/attributes.rs b/rust/otap-dataflow/crates/telemetry/src/attributes.rs
index 5b70d0b970..3de94a238c 100644
--- a/rust/otap-dataflow/crates/telemetry/src/attributes.rs
+++ b/rust/otap-dataflow/crates/telemetry/src/attributes.rs
@@ -122,6 +122,9 @@ pub trait AttributeSetHandler {
 }
 
 /// Represents a single attribute value that can be of different types.
+///
+/// TODO: Near-duplicate of AttributeValue in crates/config/src/pipeline/service/telemetry.rs;
+/// consolidate on that type or on OTLP AnyValue?
 #[derive(Debug, Clone, PartialEq, Serialize)]
 pub enum AttributeValue {
     /// String attribute value
diff --git a/rust/otap-dataflow/crates/telemetry/src/error.rs b/rust/otap-dataflow/crates/telemetry/src/error.rs
index 77bef53512..949aeeba47 100644
--- a/rust/otap-dataflow/crates/telemetry/src/error.rs
+++ b/rust/otap-dataflow/crates/telemetry/src/error.rs
@@ -20,6 +20,10 @@ pub enum Error {
     #[error("Metrics channel was closed")]
     MetricsChannelClosed,
 
+    /// The logs channel was closed unexpectedly.
+    #[error("Logs channel was closed")]
+    LogsChannelClosed,
+
     /// Error during shutdown of a component.
     #[error("Shutdown error: {0}")]
     ShutdownError(String),
@@ -27,4 +31,13 @@ pub enum Error {
     /// Error during configuration of a component.
     #[error("Configuration error: {0}")]
     ConfigurationError(String),
+
+    /// Error during logs send.
+    #[error("Log send error, dropped: {dropped}: {message}")]
+    LogSendError {
+        /// Number of records dropped.
+        dropped: usize,
+        /// Reason.
+        message: String,
+    },
 }
diff --git a/rust/otap-dataflow/crates/telemetry/src/internal_events.rs b/rust/otap-dataflow/crates/telemetry/src/internal_events.rs
index 07a68bd98f..9225754974 100644
--- a/rust/otap-dataflow/crates/telemetry/src/internal_events.rs
+++ b/rust/otap-dataflow/crates/telemetry/src/internal_events.rs
@@ -12,7 +12,11 @@
 
 #[doc(hidden)]
 pub mod _private {
-    pub use tracing::{debug, error, info, warn};
+    pub use tracing::callsite::{Callsite, DefaultCallsite};
+    pub use tracing::field::ValueSet;
+    pub use tracing::metadata::Kind;
+    pub use tracing::{Event, Level};
+    pub use tracing::{callsite2, debug, error, info, valueset, warn};
 }
 
 /// Macro for logging informational messages.
@@ -118,3 +122,49 @@ macro_rules! otel_error {
         )
     };
 }
+
+/// Create a subscriber that writes directly to console (bypassing channels).
+fn raw_logging_subscriber() -> impl tracing::Subscriber {
+    use crate::self_tracing::{ConsoleWriter, RawLoggingLayer};
+    use tracing_subscriber::layer::SubscriberExt;
+
+    tracing_subscriber::registry().with(RawLoggingLayer::new(ConsoleWriter::no_color()))
+}
+
+/// Execute a closure with a raw logging subscriber that writes directly to console.
+#[inline]
+pub fn with_raw_logging<F, R>(f: F) -> R
+where
+    F: FnOnce() -> R,
+{
+    tracing::subscriber::with_default(raw_logging_subscriber(), f)
+}
+
+/// Log an error message directly to stderr, bypassing the tracing dispatcher.
+/// TODO: as written, this supports the full tracing syntax for debug and
+/// display formatting of field values; the macros above should be extended
+/// similarly.
+#[macro_export]
+macro_rules! raw_error {
+    ($name:expr $(, $($fields:tt)*)?) => {{
+        use $crate::self_tracing::{ConsoleWriter, RawLoggingLayer};
+        use $crate::_private::Callsite;
+
+        static __CALLSITE: $crate::_private::DefaultCallsite = $crate::_private::callsite2! {
+            name: $name,
+            kind: $crate::_private::Kind::EVENT,
+            target: module_path!(),
+            level: $crate::_private::Level::ERROR,
+            fields: $($($fields)*)?
+        };
+
+        let meta = __CALLSITE.metadata();
+        let layer = RawLoggingLayer::new(ConsoleWriter::no_color());
+
+        // Use a closure to extend the valueset lifetime (same pattern as tracing::event!)
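+        // For example, `raw_error!("Timeout sending observed event", event = ?event)`
+        // expands to a static callsite named by the message plus a ValueSet over the
+        // `event` field, dispatched as one Event through the throwaway layer above.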
+        (|valueset: $crate::_private::ValueSet<'_>| {
+            let event = $crate::_private::Event::new(meta, &valueset);
+            layer.dispatch_event(&event);
+        })($crate::_private::valueset!(meta.fields(), $($($fields)*)?));
+    }};
+}
diff --git a/rust/otap-dataflow/crates/telemetry/src/lib.rs b/rust/otap-dataflow/crates/telemetry/src/lib.rs
index c5fea44e73..487f6a64ef 100644
--- a/rust/otap-dataflow/crates/telemetry/src/lib.rs
+++ b/rust/otap-dataflow/crates/telemetry/src/lib.rs
@@ -30,21 +30,27 @@ use std::sync::Arc;
 use crate::error::Error;
 use crate::registry::TelemetryRegistryHandle;
 use otap_df_config::pipeline::service::telemetry::TelemetryConfig;
+use otap_df_config::pipeline::service::telemetry::logs::LogLevel;
 use tokio_util::sync::CancellationToken;
+use tracing::level_filters::LevelFilter;
+use tracing_subscriber::EnvFilter;
 
 pub mod attributes;
 pub mod collector;
 pub mod descriptor;
 pub mod error;
 pub mod instrument;
-/// Internal logs/events module for engine.
+/// Internal logging macros.
 pub mod internal_events;
+/// Internal logs collection and transport.
+pub mod logs;
 pub mod metrics;
-pub mod opentelemetry_client;
 pub mod registry;
 pub mod reporter;
+pub mod resource;
 pub mod self_tracing;
 pub mod semconv;
+pub mod telemetry_runtime;
 
 // Re-export _private module from internal_events for macro usage.
 // This allows the otel_info!, otel_warn!, etc. macros to work in other crates
@@ -62,6 +68,23 @@ pub use tracing::info_span as otel_info_span;
 pub use tracing::trace_span as otel_trace_span;
 pub use tracing::warn_span as otel_warn_span;
 
+// Re-export commonly used logs types for convenience.
+pub use logs::{
+    DirectCollector, ImmediateLayer, LogPayload, LogsReceiver, LogsReporter, TelemetrySetup,
+};
+
+/// Runtime settings for internal telemetry injection into a receiver.
+///
+/// This struct bundles the logs receiver channel and pre-encoded resource bytes
+/// that should be injected into the Internal Telemetry Receiver node.
+#[derive(Clone)]
+pub struct InternalTelemetrySettings {
+    /// The logs receiver channel.
+    pub logs_receiver: LogsReceiver,
+    /// Pre-encoded resource bytes for OTLP log encoding.
+    pub resource_bytes: bytes::Bytes,
+}
+
 // TODO This should be #[cfg(test)], but something is preventing it from working.
 // The #[cfg(test)]-labeled otap_batch_processor::test_helpers::from_config
 // can't load this module unless I remove #[cfg(test)]! See #1304.
@@ -153,3 +176,23 @@ impl Default for InternalTelemetrySystem {
         Self::new(&TelemetryConfig::default())
     }
 }
+
+/// Creates an `EnvFilter` for the given log level.
+///
+/// If `RUST_LOG` is set in the environment, it takes precedence for fine-grained control.
+/// Otherwise, falls back to the config level with known noisy dependencies (h2, hyper) silenced.
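+///
+/// Illustrative behavior with `RUST_LOG` unset (a sketch, not a test):
+///
+/// ```ignore
+/// let filter = get_env_filter(LogLevel::Debug);
+/// // equivalent to EnvFilter::new("DEBUG,h2=off,hyper=off")
+/// ```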
+#[must_use]
+pub fn get_env_filter(level: LogLevel) -> EnvFilter {
+    let level = match level {
+        LogLevel::Off => LevelFilter::OFF,
+        LogLevel::Debug => LevelFilter::DEBUG,
+        LogLevel::Info => LevelFilter::INFO,
+        LogLevel::Warn => LevelFilter::WARN,
+        LogLevel::Error => LevelFilter::ERROR,
+    };
+
+    EnvFilter::try_from_default_env().unwrap_or_else(|_| {
+        // Default filter: use config level, but silence known noisy HTTP dependencies
+        EnvFilter::new(format!("{level},h2=off,hyper=off"))
+    })
+}
diff --git a/rust/otap-dataflow/crates/telemetry/src/logs.rs b/rust/otap-dataflow/crates/telemetry/src/logs.rs
new file mode 100644
index 0000000000..cecf9ae08d
--- /dev/null
+++ b/rust/otap-dataflow/crates/telemetry/src/logs.rs
@@ -0,0 +1,196 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+//! Internal logs collection for OTAP-Dataflow.
+
+use crate::self_tracing::{ConsoleWriter, LogRecord, RawLoggingLayer, SavedCallsite};
+use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
+use opentelemetry_sdk::logs::SdkLoggerProvider;
+use otap_df_config::pipeline::service::telemetry::logs::LogLevel;
+use tracing::{Event, Subscriber};
+use tracing_subscriber::Registry;
+use tracing_subscriber::layer::{Context, Layer as TracingLayer, SubscriberExt};
+use tracing_subscriber::registry::LookupSpan;
+
+/// A payload of log data.
+/// TODO: merge with Event in crates/state
+pub enum LogPayload {
+    /// A single record.
+    Singleton(LogRecord),
+}
+
+/// Reporter for sending log batches through a channel.
+pub type LogsReporter = flume::Sender<LogPayload>;
+
+/// Type alias for the log payload receiver channel.
+pub type LogsReceiver = flume::Receiver<LogPayload>;
+
+/// Create a reporter and receiver pair without the collector.
+///
+/// Use this when the receiver will be consumed elsewhere (e.g., by the
+/// Internal Telemetry Receiver node).
+#[must_use]
+pub fn channel(channel_size: usize) -> (LogsReporter, LogsReceiver) {
+    flume::bounded(channel_size)
+}
+
+/// Direct logs collector.
+pub struct DirectCollector {
+    writer: ConsoleWriter,
+    receiver: LogsReceiver,
+}
+
+impl DirectCollector {
+    /// New collector with writer.
+    pub fn new(writer: ConsoleWriter, receiver: LogsReceiver) -> Self {
+        Self { writer, receiver }
+    }
+
+    /// Run the collection loop until the channel is closed.
+    pub async fn run(self) -> Result<(), crate::Error> {
+        loop {
+            match self.receiver.recv_async().await {
+                Ok(payload) => {
+                    self.write_batch(payload);
+                }
+                Err(err) => {
+                    crate::raw_error!("log collector error", err = err.to_string());
+                    return Ok(());
+                }
+            }
+        }
+    }
+
+    /// Write a batch of log records to console.
+    fn write_batch(&self, payload: LogPayload) {
+        match payload {
+            LogPayload::Singleton(record) => self.write_record(record),
+        }
+    }
+
+    /// Write one record.
+    fn write_record(&self, record: LogRecord) {
+        // Identifier.0 is the &'static dyn Callsite
+        let metadata = record.callsite_id.0.metadata();
+        let saved = SavedCallsite::new(metadata);
+        // Use ConsoleWriter's routing: ERROR/WARN to stderr, others to stdout
+        self.writer.raw_print(&record, &saved);
+    }
+}
+
+/// A tracing Layer that sends each record immediately.
+pub struct ImmediateLayer {
+    /// Reporter for sending to the channel.
+    reporter: LogsReporter,
+}
+
+impl ImmediateLayer {
+    /// Create a new unbuffered layer.
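+    ///
+    /// A sketch of the pairing (the channel size is illustrative):
+    ///
+    /// ```ignore
+    /// let (reporter, receiver) = crate::logs::channel(100);
+    /// let layer = ImmediateLayer::new(reporter); // records arrive on `receiver`
+    /// ```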
+    #[must_use]
+    pub fn new(reporter: LogsReporter) -> Self {
+        Self { reporter }
+    }
+}
+
+impl<S> TracingLayer<S> for ImmediateLayer
+where
+    S: Subscriber + for<'a> LookupSpan<'a>,
+{
+    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
+        let record = LogRecord::new(event);
+
+        match self.reporter.try_send(LogPayload::Singleton(record)) {
+            Ok(()) => {}
+            Err(err) => {
+                crate::raw_error!("failed to send log", err = %err);
+            }
+        };
+    }
+}
+
+/// Telemetry setup for pipeline threads, carrying the data needed for each mode.
+///
+/// This enum is constructed based on `config.logs.providers.engine` (for main pipelines)
+/// or `config.logs.providers.internal` (for the internal telemetry pipeline).
+/// Pipeline threads use `with_subscriber()` to run with the appropriate logging layer.
+#[derive(Clone)]
+pub enum TelemetrySetup {
+    /// Logs are silently dropped.
+    Noop,
+    /// Synchronous raw logging to console.
+    Raw,
+    /// Immediate: each log is sent immediately.
+    Immediate {
+        /// Reporter to send singletons through.
+        reporter: LogsReporter,
+    },
+    /// OpenTelemetry SDK: logs go through the OpenTelemetry logging pipeline.
+    OpenTelemetry {
+        /// The OpenTelemetry SDK logger provider.
+        logger_provider: SdkLoggerProvider,
+    },
+}
+
+impl TelemetrySetup {
+    /// Initialize this setup as the global tracing subscriber.
+    ///
+    /// This is used during startup to set the global subscriber. Returns an error
+    /// if a global subscriber has already been set.
+    pub fn try_init_global(
+        &self,
+        log_level: LogLevel,
+    ) -> Result<(), tracing_subscriber::util::TryInitError> {
+        use tracing_subscriber::util::SubscriberInitExt;
+
+        let filter = crate::get_env_filter(log_level);
+
+        match self {
+            TelemetrySetup::Noop => tracing::subscriber::NoSubscriber::new().try_init(),
+            TelemetrySetup::Raw => Registry::default()
+                .with(filter)
+                .with(RawLoggingLayer::new(ConsoleWriter::default()))
+                .try_init(),
+            TelemetrySetup::Immediate { reporter } => {
+                let layer = ImmediateLayer::new(reporter.clone());
+                Registry::default().with(filter).with(layer).try_init()
+            }
+            TelemetrySetup::OpenTelemetry { logger_provider } => {
+                let sdk_layer = OpenTelemetryTracingBridge::new(logger_provider);
+                Registry::default().with(filter).with(sdk_layer).try_init()
+            }
+        }
+    }
+
+    /// Run a closure with the appropriate tracing subscriber for this setup.
+    ///
+    /// The closure runs with the configured logging layer active.
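+    ///
+    /// Illustrative pipeline-thread usage (`run_pipeline` is a placeholder):
+    ///
+    /// ```ignore
+    /// let out = setup.with_subscriber(LogLevel::Info, || run_pipeline());
+    /// ```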
+    pub fn with_subscriber<F, R>(&self, log_level: LogLevel, f: F) -> R
+    where
+        F: FnOnce() -> R,
+    {
+        let filter = crate::get_env_filter(log_level);
+
+        match self {
+            TelemetrySetup::Noop => {
+                let subscriber = tracing::subscriber::NoSubscriber::new();
+                tracing::subscriber::with_default(subscriber, f)
+            }
+            TelemetrySetup::Raw => {
+                let subscriber = Registry::default()
+                    .with(filter)
+                    .with(RawLoggingLayer::new(ConsoleWriter::default()));
+                tracing::subscriber::with_default(subscriber, f)
+            }
+            TelemetrySetup::Immediate { reporter } => {
+                let layer = ImmediateLayer::new(reporter.clone());
+                let subscriber = Registry::default().with(filter).with(layer);
+                tracing::subscriber::with_default(subscriber, f)
+            }
+            TelemetrySetup::OpenTelemetry { logger_provider } => {
+                let sdk_layer = OpenTelemetryTracingBridge::new(logger_provider);
+                let subscriber = Registry::default().with(filter).with(sdk_layer);
+                tracing::subscriber::with_default(subscriber, f)
+            }
+        }
+    }
+}
diff --git a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client.rs b/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client.rs
deleted file mode 100644
index 272d65cbe9..0000000000
--- a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client.rs
+++ /dev/null
@@ -1,220 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-//! OpenTelemetry SDK integration for telemetry collection and reporting as a client.
-
-pub mod logger_provider;
-pub mod meter_provider;
-
-use opentelemetry::KeyValue;
-use opentelemetry_sdk::{Resource, logs::SdkLoggerProvider, metrics::SdkMeterProvider};
-use otap_df_config::pipeline::service::telemetry::{
-    AttributeValue, AttributeValueArray, TelemetryConfig,
-};
-
-use crate::{
-    error::Error,
-    opentelemetry_client::{logger_provider::LoggerProvider, meter_provider::MeterProvider},
-};
-
-/// Client for the OpenTelemetry SDK.
-pub struct OpentelemetryClient {
-    /// The tokio runtime used to run the OpenTelemetry SDK OTLP exporter.
-    /// The reference is kept to ensure the runtime lives as long as the client.
-    _runtime: Option<tokio::runtime::Runtime>,
-    meter_provider: SdkMeterProvider,
-    logger_provider: SdkLoggerProvider,
-    // TODO: Add traces providers.
-}
-
-impl OpentelemetryClient {
-    /// Create a new OpenTelemetry client from the given configuration.
-    pub fn new(config: &TelemetryConfig) -> Result<Self, Error> {
-        let sdk_resource = Self::configure_resource(&config.resource);
-
-        let runtime = None;
-
-        let meter_provider =
-            MeterProvider::configure(sdk_resource.clone(), &config.metrics, runtime)?;
-
-        // Extract the meter provider and runtime by consuming the MeterProvider
-        let (meter_provider, runtime) = meter_provider.into_parts();
-
-        let logger_provider = LoggerProvider::configure(sdk_resource, &config.logs, runtime)?;
-
-        let (logger_provider, runtime) = logger_provider.into_parts();
-
-        //TODO: Configure traces provider.
-
-        Ok(Self {
-            _runtime: runtime,
-            meter_provider,
-            logger_provider,
-        })
-    }
-
-    fn configure_resource(
-        resource_attributes: &std::collections::HashMap<String, AttributeValue>,
-    ) -> Resource {
-        let mut sdk_resource_builder = Resource::builder_empty();
-        for (k, v) in resource_attributes.iter() {
-            sdk_resource_builder = sdk_resource_builder
-                .with_attribute(KeyValue::new(k.clone(), Self::to_sdk_value(v)));
-        }
-        sdk_resource_builder.build()
-    }
-
-    fn to_sdk_value(attr_value: &AttributeValue) -> opentelemetry::Value {
-        match attr_value {
-            AttributeValue::String(s) => opentelemetry::Value::String(s.clone().into()),
-            AttributeValue::Bool(b) => opentelemetry::Value::Bool(*b),
-            AttributeValue::I64(i) => opentelemetry::Value::I64(*i),
-            AttributeValue::F64(f) => opentelemetry::Value::F64(*f),
-            AttributeValue::Array(arr) => match arr {
-                AttributeValueArray::String(array_s) => {
-                    let sdk_values = array_s.iter().map(|s| s.clone().into()).collect();
-                    opentelemetry::Value::Array(opentelemetry::Array::String(sdk_values))
-                }
-                AttributeValueArray::Bool(array_b) => {
-                    let sdk_values = array_b.to_vec();
-                    opentelemetry::Value::Array(opentelemetry::Array::Bool(sdk_values))
-                }
-                AttributeValueArray::I64(array_i) => {
-                    let sdk_values = array_i.to_vec();
-                    opentelemetry::Value::Array(opentelemetry::Array::I64(sdk_values))
-                }
-                AttributeValueArray::F64(array_f) => {
-                    let sdk_values = array_f.to_vec();
-                    opentelemetry::Value::Array(opentelemetry::Array::F64(sdk_values))
-                }
-            },
-        }
-    }
-
-    /// Get a reference to the meter provider.
-    #[must_use]
-    pub fn meter_provider(&self) -> &SdkMeterProvider {
-        &self.meter_provider
-    }
-
-    /// Get a reference to the logger provider.
-    #[must_use]
-    pub fn logger_provider(&self) -> &SdkLoggerProvider {
-        &self.logger_provider
-    }
-
-    /// Shutdown the OpenTelemetry SDK.
-    pub fn shutdown(&self) -> Result<(), Error> {
-        let meter_shutdown_result = self.meter_provider().shutdown();
-        let logger_provider_shutdown_result = self.logger_provider().shutdown();
-
-        if let Err(e) = meter_shutdown_result {
-            return Err(Error::ShutdownError(e.to_string()));
-        }
-
-        if let Err(e) = logger_provider_shutdown_result {
-            return Err(Error::ShutdownError(e.to_string()));
-        }
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use opentelemetry::global;
-    use otap_df_config::pipeline::service::telemetry::{
-        AttributeValue,
-        logs::LogsConfig,
-        metrics::{
-            MetricsConfig,
-            readers::{
-                MetricsReaderConfig, MetricsReaderPeriodicConfig,
-                periodic::MetricsPeriodicExporterConfig,
-            },
-        },
-    };
-
-    use super::*;
-    use std::{f64::consts::PI, time::Duration};
-
-    #[test]
-    fn test_configure_minimal_opentelemetry_client() -> Result<(), Error> {
-        let config = TelemetryConfig::default();
-        let client = OpentelemetryClient::new(&config)?;
-        let meter = global::meter("test-meter");
-
-        let counter = meter.u64_counter("test-counter").build();
-        counter.add(1, &[]);
-        // There is nothing to assert here. The test validates that nothing panics/crashes
-
-        client.shutdown()?;
-        Ok(())
-    }
-
-    #[test]
-    fn test_configure_opentelemetry_client() -> Result<(), Error> {
-        let mut resource = std::collections::HashMap::new();
-        _ = resource.insert(
-            "service.name".to_string(),
-            AttributeValue::String("test-service".to_string()),
-        );
-
-        let metrics_config = MetricsConfig {
-            readers: vec![MetricsReaderConfig::Periodic(MetricsReaderPeriodicConfig {
-                exporter: MetricsPeriodicExporterConfig::Console,
-                interval: Duration::from_millis(10),
-            })],
-            views: Vec::new(),
-        };
-
-        let config = TelemetryConfig {
-            reporting_channel_size: 10,
-            reporting_interval: Duration::from_millis(10),
-            metrics: metrics_config,
-            logs: LogsConfig::default(),
-            resource,
-        };
-        let client = OpentelemetryClient::new(&config)?;
-        let meter = global::meter("test-meter");
-
-        let counter = meter.u64_counter("test-counter").build();
-        counter.add(1, &[]);
-        // There is nothing to assert here. The test validates that nothing panics/crashes
-
-        client.shutdown()?;
-        Ok(())
-    }
-
-    #[test]
-    fn test_to_sdk_value() {
-        let string_attr = AttributeValue::String("example".to_string());
-        assert_eq!(
-            OpentelemetryClient::to_sdk_value(&string_attr),
-            opentelemetry::Value::String("example".into())
-        );
-
-        let bool_attr = AttributeValue::Bool(true);
-        assert_eq!(
-            OpentelemetryClient::to_sdk_value(&bool_attr),
-            opentelemetry::Value::Bool(true)
-        );
-
-        let i64_attr = AttributeValue::I64(42);
-        assert_eq!(
-            OpentelemetryClient::to_sdk_value(&i64_attr),
-            opentelemetry::Value::I64(42)
-        );
-
-        let f64_attr = AttributeValue::F64(PI);
-        assert_eq!(
-            OpentelemetryClient::to_sdk_value(&f64_attr),
-            opentelemetry::Value::F64(PI)
-        );
-
-        let array_attr = AttributeValue::Array(AttributeValueArray::I64(vec![1, 2, 3]));
-        assert_eq!(
-            OpentelemetryClient::to_sdk_value(&array_attr),
-            opentelemetry::Value::Array(opentelemetry::Array::I64(vec![1, 2, 3]))
-        );
-    }
-}
diff --git a/rust/otap-dataflow/crates/telemetry/src/resource.rs b/rust/otap-dataflow/crates/telemetry/src/resource.rs
new file mode 100644
index 0000000000..5f59464c46
--- /dev/null
+++ b/rust/otap-dataflow/crates/telemetry/src/resource.rs
@@ -0,0 +1,79 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+//! Form OTLP Resource encodings from the configuration struct.
+
+use bytes::Bytes;
+use otap_df_config::pipeline::service::telemetry::AttributeValue;
+use otap_df_pdata::otlp::ProtoBuffer;
+use otap_df_pdata::proto::consts::field_num::common::{
+    ANY_VALUE_BOOL_VALUE, ANY_VALUE_DOUBLE_VALUE, ANY_VALUE_INT_VALUE, ANY_VALUE_STRING_VALUE,
+    KEY_VALUE_KEY, KEY_VALUE_VALUE,
+};
+use otap_df_pdata::proto::consts::field_num::logs::RESOURCE_LOGS_RESOURCE;
+use otap_df_pdata::proto::consts::field_num::resource::RESOURCE_ATTRIBUTES;
+use otap_df_pdata::proto::consts::wire_types;
+use otap_df_pdata::proto_encode_len_delimited_unknown_size;
+use std::collections::HashMap;
+
+/// Encode OTLP bytes of the ResourceLogs.resource field, the whole
+/// tag-and-bytes representation for a single copy.
+#[must_use]
+pub fn encode_resource_bytes(attrs: &HashMap<String, AttributeValue>) -> Bytes {
+    if attrs.is_empty() {
+        return Bytes::new();
+    }
+
+    let mut buf = ProtoBuffer::with_capacity(attrs.len() * 64);
+
+    // Encode: field 1 (RESOURCE_LOGS_RESOURCE) -> Resource message
+    proto_encode_len_delimited_unknown_size!(
+        RESOURCE_LOGS_RESOURCE,
+        {
+            // Resource { attributes: [ KeyValue, ... ] }
+            for (key, value) in attrs {
+                encode_resource_attribute(&mut buf, key, value);
+            }
+        },
+        &mut buf
+    );
+
+    buf.into_bytes()
+}
+
+/// Encode a single resource attribute as a KeyValue message.
+fn encode_resource_attribute(buf: &mut ProtoBuffer, key: &str, value: &AttributeValue) {
+    proto_encode_len_delimited_unknown_size!(
+        RESOURCE_ATTRIBUTES,
+        {
+            buf.encode_string(KEY_VALUE_KEY, key);
+            proto_encode_len_delimited_unknown_size!(
+                KEY_VALUE_VALUE,
+                {
+                    match value {
+                        AttributeValue::String(s) => {
+                            buf.encode_string(ANY_VALUE_STRING_VALUE, s);
+                        }
+                        AttributeValue::Bool(b) => {
+                            buf.encode_field_tag(ANY_VALUE_BOOL_VALUE, wire_types::VARINT);
+                            buf.encode_varint(u64::from(*b));
+                        }
+                        AttributeValue::I64(i) => {
+                            buf.encode_field_tag(ANY_VALUE_INT_VALUE, wire_types::VARINT);
+                            buf.encode_varint(*i as u64);
+                        }
+                        AttributeValue::F64(f) => {
+                            buf.encode_field_tag(ANY_VALUE_DOUBLE_VALUE, wire_types::FIXED64);
+                            buf.extend_from_slice(&f.to_le_bytes());
+                        }
+                        AttributeValue::Array(_) => {
+                            crate::raw_error!("Arrays are not supported in resource attributes");
+                        }
+                    }
+                },
+                buf
+            );
+        },
+        buf
+    );
+}
diff --git a/rust/otap-dataflow/crates/telemetry/src/self_tracing.rs b/rust/otap-dataflow/crates/telemetry/src/self_tracing.rs
index 9d03fd56c7..86fb85a541 100644
--- a/rust/otap-dataflow/crates/telemetry/src/self_tracing.rs
+++ b/rust/otap-dataflow/crates/telemetry/src/self_tracing.rs
@@ -18,7 +18,8 @@ use tracing::callsite::Identifier;
 use tracing::{Event, Level, Metadata};
 
 pub use encoder::DirectLogRecordEncoder;
-pub use formatter::{ConsoleWriter, RawLoggingLayer};
+pub use encoder::encode_export_logs_request;
+pub use formatter::{AnsiCode, BufWriter, ColorMode, ConsoleWriter, RawLoggingLayer, LOG_BUFFER_SIZE};
 
 /// A log record with structural metadata and pre-encoded body/attributes.
 #[derive(Debug, Clone)]
diff --git a/rust/otap-dataflow/crates/telemetry/src/self_tracing/encoder.rs b/rust/otap-dataflow/crates/telemetry/src/self_tracing/encoder.rs
index 688f7eb19f..1a8034dfcb 100644
--- a/rust/otap-dataflow/crates/telemetry/src/self_tracing/encoder.rs
+++ b/rust/otap-dataflow/crates/telemetry/src/self_tracing/encoder.rs
@@ -47,7 +47,7 @@ impl<'buf> DirectLogRecordEncoder<'buf> {
             .encode_field_tag(LOG_RECORD_SEVERITY_NUMBER, wire_types::VARINT);
         self.buf.encode_varint(severity as u64);
 
-        // Node we skip encoding severity_text (field 3, string)
+        // Note we skip encoding severity_text (field 3, string)
 
         // Encode event_name (field 12, string) - format: "target::name (file:line)"
         encode_event_name(self.buf, callsite);
@@ -301,3 +301,47 @@ pub fn level_to_severity_number(level: &Level) -> u8 {
         Level::ERROR => 17,
     }
 }
+
+/// Encode a single LogRecord as an OTLP ExportLogsServiceRequest, also
+/// known as LogsData.
+///
+/// The buffer is cleared before encoding. After this call, the buffer
+/// contains the complete encoded request.
+pub fn encode_export_logs_request(
+    buf: &mut ProtoBuffer,
+    record: LogRecord,
+    callsite: &SavedCallsite,
+    resource_bytes: Option<&bytes::Bytes>,
+) {
+    buf.clear();
+
+    // ExportLogsServiceRequest { resource_logs: [ ResourceLogs { ... } ] }
+    proto_encode_len_delimited_unknown_size!(
+        LOGS_DATA_RESOURCE, // field 1: resource_logs
+        {
+            // Insert pre-encoded resource (field 1: resource) if available
+            if let Some(res_bytes) = resource_bytes {
+                buf.extend_from_slice(res_bytes);
+            }
+
+            // ResourceLogs { scope_logs: [ ScopeLogs { ... } ] }
+            proto_encode_len_delimited_unknown_size!(
+                RESOURCE_LOGS_SCOPE_LOGS, // field 2: scope_logs
+                {
+                    // ScopeLogs { log_records: [ LogRecord { ... } ] }
+                    // TODO: add scope (field 1)
+                    proto_encode_len_delimited_unknown_size!(
+                        SCOPE_LOGS_LOG_RECORDS, // field 2: log_records
+                        {
+                            let mut encoder = DirectLogRecordEncoder::new(buf);
+                            let _ = encoder.encode_log_record(record, callsite);
+                        },
+                        buf
+                    );
+                },
+                buf
+            );
+        },
+        buf
+    );
+}
diff --git a/rust/otap-dataflow/crates/telemetry/src/self_tracing/formatter.rs b/rust/otap-dataflow/crates/telemetry/src/self_tracing/formatter.rs
index 2d6b4c2b79..d436bd0557 100644
--- a/rust/otap-dataflow/crates/telemetry/src/self_tracing/formatter.rs
+++ b/rust/otap-dataflow/crates/telemetry/src/self_tracing/formatter.rs
@@ -1,7 +1,16 @@
 // Copyright The OpenTelemetry Authors
 // SPDX-License-Identifier: Apache-2.0
 
-//! An alternative to Tokio fmt::layer().
+//! Log formatting primitives for console output.
+//!
+//! This module provides shared formatting infrastructure used by:
+//! - `RawLoggingLayer`: Flat format for tracing events before the OTLP pipeline is ready
+//! - Console exporter: Hierarchical format for OTLP log data with tree structure
+//!
+//! The core abstraction is [`ConsoleWriter`], which provides methods for formatting
+//! timestamps, levels, bodies, and attributes. The [`ConsoleWriter::format_log_line`]
+//! method accepts a callback for customizing the "level" section, enabling different
+//! output formats (flat vs hierarchical) while sharing all other formatting logic.
 
 use super::{LogRecord, SavedCallsite};
 use bytes::Bytes;
@@ -23,19 +32,29 @@ pub const LOG_BUFFER_SIZE: usize = 4096;
 
 /// ANSI codes a.k.a. "Select Graphic Rendition" codes.
 #[derive(Clone, Copy)]
 #[repr(u8)]
-enum AnsiCode {
+pub enum AnsiCode {
+    /// Reset all attributes.
     Reset = 0,
+    /// Bold text.
     Bold = 1,
+    /// Dim/faint text.
     Dim = 2,
+    /// Red foreground.
     Red = 31,
+    /// Green foreground.
     Green = 32,
+    /// Yellow foreground.
     Yellow = 33,
+    /// Blue foreground.
     Blue = 34,
+    /// Magenta foreground.
     Magenta = 35,
+    /// Cyan foreground.
+    Cyan = 36,
 }
 
 /// Color mode for console output.
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum ColorMode {
     /// Enable ANSI color codes.
     Color,
@@ -43,37 +62,6 @@ pub enum ColorMode {
     NoColor,
 }
 
-impl ColorMode {
-    /// Write an ANSI escape sequence (no-op for NoColor).
-    #[inline]
-    fn write_ansi(self, w: &mut BufWriter<'_>, code: AnsiCode) {
-        if let ColorMode::Color = self {
-            let _ = write!(w, "\x1b[{}m", code as u8);
-        }
-    }
-
-    /// Write level with color and padding.
-    #[inline]
-    fn write_level(self, w: &mut BufWriter<'_>, level: &Level) {
-        self.write_ansi(w, Self::color(level));
-        let _ = w.write_all(level.as_str().as_bytes());
-        self.write_ansi(w, AnsiCode::Reset);
-        let _ = w.write_all(b" ");
-    }
-
-    /// Get ANSI color code for a severity level.
-    #[inline]
-    fn color(level: &Level) -> AnsiCode {
-        match *level {
-            Level::ERROR => AnsiCode::Red,
-            Level::WARN => AnsiCode::Yellow,
-            Level::INFO => AnsiCode::Green,
-            Level::DEBUG => AnsiCode::Blue,
-            Level::TRACE => AnsiCode::Magenta,
-        }
-    }
-}
-
 /// Console writes formatted text to stdout or stderr.
 #[derive(Debug, Clone, Copy)]
 pub struct ConsoleWriter {
@@ -97,6 +85,13 @@ impl RawLoggingLayer {
 /// Uses `std::io::Cursor` for position tracking with `std::io::Write`.
 pub type BufWriter<'a> = Cursor<&'a mut [u8]>;
 
+impl Default for ConsoleWriter {
+    /// Defaults to no-color output.
+    fn default() -> Self {
+        Self::no_color()
+    }
+}
+
 impl ConsoleWriter {
     /// Create a writer that outputs to stdout without ANSI colors.
     #[must_use]
@@ -114,55 +109,27 @@
         }
     }
 
-    /// Format a LogRecord as a human-readable string (for testing/compatibility).
-    ///
-    /// Output format: `2026-01-06T10:30:45.123Z INFO target::name (file.rs:42): body [attr=value, ...]`
-    pub fn format_log_record(&self, record: &LogRecord, callsite: &SavedCallsite) -> String {
-        let mut buf = [0u8; LOG_BUFFER_SIZE];
-        let len = self.write_log_record(&mut buf, record, callsite);
-        // The buffer contains valid UTF-8 since we only write ASCII and valid UTF-8 strings
-        String::from_utf8_lossy(&buf[..len]).into_owned()
+    /// Returns the color mode.
+    #[must_use]
+    pub fn color_mode(&self) -> ColorMode {
+        self.color_mode
     }
 
-    /// Write a LogRecord to a byte buffer. Returns the number of bytes written.
-    pub fn write_log_record(
-        &self,
-        buf: &mut [u8],
-        record: &LogRecord,
-        callsite: &SavedCallsite,
-    ) -> usize {
-        let mut w = Cursor::new(buf);
-        let cm = self.color_mode;
-
-        cm.write_ansi(&mut w, AnsiCode::Dim);
-        Self::write_timestamp(&mut w, record.timestamp_ns);
-        cm.write_ansi(&mut w, AnsiCode::Reset);
-        let _ = w.write_all(b" ");
-        cm.write_level(&mut w, callsite.level());
-        cm.write_ansi(&mut w, AnsiCode::Bold);
-        Self::write_event_name(&mut w, callsite);
-        cm.write_ansi(&mut w, AnsiCode::Reset);
-        let _ = w.write_all(b": ");
-        Self::write_body_attrs(&mut w, &record.body_attrs_bytes);
-        let _ = w.write_all(b"\n");
-
-        w.position() as usize
-    }
+    // ========================================================================
+    // Core formatting primitives - used by both flat and hierarchical formats
+    // ========================================================================
 
-    /// Write callsite details as event_name to buffer.
+    /// Write an ANSI escape sequence (no-op for NoColor mode).
     #[inline]
-    fn write_event_name(w: &mut BufWriter<'_>, callsite: &SavedCallsite) {
-        let _ = w.write_all(callsite.target().as_bytes());
-        let _ = w.write_all(b"::");
-        let _ = w.write_all(callsite.name().as_bytes());
-        if let (Some(file), Some(line)) = (callsite.file(), callsite.line()) {
-            let _ = write!(w, " ({}:{})", file, line);
+    pub fn write_ansi(&self, w: &mut BufWriter<'_>, code: AnsiCode) {
+        if self.color_mode == ColorMode::Color {
+            let _ = write!(w, "\x1b[{}m", code as u8);
         }
     }
 
     /// Write nanosecond timestamp as ISO 8601 (UTC) to buffer.
     #[inline]
-    fn write_timestamp(w: &mut BufWriter<'_>, nanos: u64) {
+    pub fn write_timestamp(w: &mut BufWriter<'_>, nanos: u64) {
         let secs = (nanos / 1_000_000_000) as i64;
         let subsec_nanos = (nanos % 1_000_000_000) as u32;
 
@@ -187,55 +154,76 @@
         }
     }
 
-    /// Write body+attrs bytes to buffer using LogRecordView.
-    fn write_body_attrs(w: &mut BufWriter<'_>, bytes: &Bytes) {
-        if bytes.is_empty() {
-            return;
+    /// Write a tracing level with color and padding.
+    ///
+    /// Format: `INFO  ` (level string + padding to 6 chars total)
+    #[inline]
+    pub fn write_level(&self, w: &mut BufWriter<'_>, level: &Level) {
+        self.write_ansi(w, Self::level_color(level));
+        let _ = w.write_all(level.as_str().as_bytes());
+        self.write_ansi(w, AnsiCode::Reset);
+        // Pad to 6 chars total (longest is "ERROR" = 5, plus 1 space minimum)
+        let padding = 6 - level.as_str().len();
+        for _ in 0..padding {
+            let _ = w.write_all(b" ");
         }
+    }
 
-        // A partial protobuf message (just body + attributes) is still a valid message.
-        // We can use the RawLogRecord view to access just the fields we encoded.
-        let record = RawLogRecord::new(bytes.as_ref());
+    /// Get ANSI color code for a tracing level.
+    #[inline]
+    #[must_use]
+    pub fn level_color(level: &Level) -> AnsiCode {
+        match *level {
+            Level::ERROR => AnsiCode::Red,
+            Level::WARN => AnsiCode::Yellow,
+            Level::INFO => AnsiCode::Green,
+            Level::DEBUG => AnsiCode::Blue,
+            Level::TRACE => AnsiCode::Magenta,
+        }
+    }
 
-        // Write body if present
-        if let Some(body) = record.body() {
-            Self::write_any_value(w, &body);
+    /// Write OTLP severity with color and padding.
+    ///
+    /// Converts OTLP severity number to level string and writes with appropriate color.
+    /// Format: `INFO  ` (level string + padding to 6 chars total)
+    #[inline]
+    pub fn write_severity(&self, w: &mut BufWriter<'_>, severity: Option<i32>) {
+        let (text, color) = Self::severity_to_text_and_color(severity);
+        self.write_ansi(w, color);
+        let _ = w.write_all(text.as_bytes());
+        self.write_ansi(w, AnsiCode::Reset);
+        // Pad to 6 chars total (longest is "ERROR" = 5, plus 1 space minimum)
+        let padding = 6 - text.len();
+        for _ in 0..padding {
+            let _ = w.write_all(b" ");
        }
    }
 
-        // Write attributes if present
-        let mut attrs = record.attributes().peekable();
-        if attrs.peek().is_some() {
-            let _ = w.write_all(b" [");
-            let mut first = true;
-            for attr in attrs {
-                if Self::is_full(w) {
-                    break;
-                }
-                if !first {
-                    let _ = w.write_all(b", ");
-                }
-                first = false;
-                let _ = w.write_all(attr.key());
-                let _ = w.write_all(b"=");
-                match attr.value() {
-                    Some(v) => Self::write_any_value(w, &v),
-                    None => {
-                        let _ = w.write_all(b"");
-                    }
-                }
-            }
-            let _ = w.write_all(b"]");
+    /// Convert OTLP severity number to display text and ANSI color.
+    ///
+    /// See: <https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber>
+    #[inline]
+    #[must_use]
+    pub fn severity_to_text_and_color(severity: Option<i32>) -> (&'static str, AnsiCode) {
+        match severity {
+            Some(n) if n >= 17 => ("ERROR", AnsiCode::Red),   // FATAL, ERROR
+            Some(n) if n >= 13 => ("WARN", AnsiCode::Yellow), // WARN
+            Some(n) if n >= 9 => ("INFO", AnsiCode::Green),   // INFO
+            Some(n) if n >= 5 => ("DEBUG", AnsiCode::Blue),   // DEBUG
+            Some(_) => ("TRACE", AnsiCode::Magenta),          // TRACE
+            None => ("INFO", AnsiCode::Green),                // Default to INFO
         }
     }
 
-    /// Check if the buffer is full (position >= capacity).
+    /// Check if OTLP severity indicates error or warning (for stderr routing).
     #[inline]
-    fn is_full(w: &BufWriter<'_>) -> bool {
-        w.position() as usize >= w.get_ref().len()
+    #[must_use]
+    pub fn severity_is_error_or_warn(severity: Option<i32>) -> bool {
+        matches!(severity, Some(n) if n >= 13)
     }
 
     /// Write an AnyValue to buffer.
-    fn write_any_value<'a>(w: &mut BufWriter<'_>, value: &impl AnyValueView<'a>) {
+    pub fn write_any_value<'a>(w: &mut BufWriter<'_>, value: &impl AnyValueView<'a>) {
         match value.value_type() {
             ValueType::String => {
                 if let Some(s) = value.as_string() {
@@ -259,14 +247,7 @@
             }
             ValueType::Bytes => {
                 if let Some(bytes) = value.as_bytes() {
-                    let _ = w.write_all(b"[");
-                    for (i, b) in bytes.iter().enumerate() {
-                        if i > 0 {
-                            let _ = w.write_all(b", ");
-                        }
-                        let _ = write!(w, "{}", b);
-                    }
-                    let _ = w.write_all(b"]");
+                    let _ = write!(w, "<{} bytes>", bytes.len());
                 }
             }
             ValueType::Array => {
@@ -305,8 +286,50 @@
         }
     }
 
-    /// Write a log line to stdout or stderr.
-    fn write_line(&self, level: &Level, data: &[u8]) {
+    /// Write attributes in `[key=value, ...]` format.
+    ///
+    /// Writes nothing if the iterator is empty.
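+    ///
+    /// ```ignore
+    /// // Illustrative: for two attributes this writes ` [code=500, retry=true]`.
+    /// ConsoleWriter::write_attrs(&mut w, record.attributes());
+    /// ```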
+    pub fn write_attrs<A, I>(w: &mut BufWriter<'_>, attrs: I)
+    where
+        A: AttributeView,
+        I: Iterator<Item = A>,
+    {
+        let mut attrs = attrs.peekable();
+        if attrs.peek().is_some() {
+            let _ = w.write_all(b" [");
+            let mut first = true;
+            for attr in attrs {
+                if Self::is_full(w) {
+                    break;
+                }
+                if !first {
+                    let _ = w.write_all(b", ");
+                }
+                first = false;
+                let _ = w.write_all(attr.key());
+                let _ = w.write_all(b"=");
+                match attr.value() {
+                    Some(v) => Self::write_any_value(w, &v),
+                    None => {
+                        let _ = w.write_all(b"");
+                    }
+                }
+            }
+            let _ = w.write_all(b"]");
+        }
+    }
+
+    /// Check if the buffer is full (position >= capacity).
+    #[inline]
+    #[must_use]
+    pub fn is_full(w: &BufWriter<'_>) -> bool {
+        w.position() as usize >= w.get_ref().len()
+    }
+
+    /// Write a log line to stdout or stderr based on level.
+    ///
+    /// ERROR and WARN go to stderr, others go to stdout.
+    pub fn write_output(&self, level: &Level, data: &[u8]) {
         let use_stderr = matches!(*level, Level::ERROR | Level::WARN);
         let _ = if use_stderr {
             std::io::stderr().write_all(data)
@@ -314,6 +337,215 @@
             std::io::stdout().write_all(data)
         };
     }
+
+    // ========================================================================
+    // Generic log line formatting with customizable level section
+    // ========================================================================
+
+    /// Format a log line from a `LogRecordView` with customizable level formatting.
+    ///
+    /// This is the core formatting method used by both:
+    /// - Flat format (tracing events): callback writes `INFO  ` with color
+    /// - Hierarchical format (OTLP): callback writes tree chars + `RESOURCE`/`SCOPE`/level
+    ///
+    /// Output format: `<timestamp> <level section> <event_name>: <body> [attrs]`
+    ///
+    /// # Arguments
+    /// * `w` - Buffer to write to
+    /// * `timestamp_ns` - Timestamp in nanoseconds since UNIX epoch
+    /// * `event_name` - Event name to display (e.g., "target::name" or "v1.Resource")
+    /// * `record` - LogRecordView providing body() and attributes()
+    /// * `format_level` - Callback to format the level section; receives (writer, console_writer)
+    pub fn format_log_line<V, F>(
+        &self,
+        w: &mut BufWriter<'_>,
+        timestamp_ns: u64,
+        event_name: &str,
+        record: &V,
+        format_level: F,
+    ) where
+        V: LogRecordView,
+        F: FnOnce(&mut BufWriter<'_>, &Self),
+    {
+        // Dim timestamp
+        self.write_ansi(w, AnsiCode::Dim);
+        Self::write_timestamp(w, timestamp_ns);
+        self.write_ansi(w, AnsiCode::Reset);
+        let _ = w.write_all(b" ");
+
+        // Level section (delegated to callback)
+        format_level(w, self);
+
+        // Bold event name
+        self.write_ansi(w, AnsiCode::Bold);
+        let _ = w.write_all(event_name.as_bytes());
+        self.write_ansi(w, AnsiCode::Reset);
+        let _ = w.write_all(b": ");
+
+        // Body
+        if let Some(body) = record.body() {
+            Self::write_any_value(w, &body);
+        }
+
+        // Attributes
+        Self::write_attrs(w, record.attributes());
+
+        let _ = w.write_all(b"\n");
+    }
+
+    // ========================================================================
+    // Tracing-specific methods (for RawLoggingLayer compatibility)
+    // ========================================================================
+
+    /// Format a LogRecord as a human-readable string (for testing/compatibility).
+    ///
+    /// Output format: `2026-01-06T10:30:45.123Z INFO target::name (file.rs:42): body [attr=value, ...]`
+    pub fn format_log_record(&self, record: &LogRecord, callsite: &SavedCallsite) -> String {
+        let mut buf = [0u8; LOG_BUFFER_SIZE];
+        let len = self.write_log_record(&mut buf, record, callsite);
+        // The buffer contains valid UTF-8 since we only write ASCII and valid UTF-8 strings
+        String::from_utf8_lossy(&buf[..len]).into_owned()
+    }
+
+    /// Write a LogRecord to stdout or stderr (based on level).
+    ///
+    /// ERROR and WARN go to stderr, others go to stdout.
+    /// This is the same routing logic used by RawLoggingLayer.
+    pub fn raw_print(&self, record: &LogRecord, callsite: &SavedCallsite) {
+        let mut buf = [0u8; LOG_BUFFER_SIZE];
+        let len = self.write_log_record(&mut buf, record, callsite);
+        self.write_output(callsite.level(), &buf[..len]);
+    }
+
+    /// Write a LogRecord to a byte buffer. Returns the number of bytes written.
+    pub(crate) fn write_log_record(
+        &self,
+        buf: &mut [u8],
+        record: &LogRecord,
+        callsite: &SavedCallsite,
+    ) -> usize {
+        let mut w = Cursor::new(buf);
+
+        self.write_ansi(&mut w, AnsiCode::Dim);
+        Self::write_timestamp(&mut w, record.timestamp_ns);
+        self.write_ansi(&mut w, AnsiCode::Reset);
+        let _ = w.write_all(b" ");
+        self.write_level(&mut w, callsite.level());
+        self.write_ansi(&mut w, AnsiCode::Bold);
+        Self::write_event_name(&mut w, callsite);
+        self.write_ansi(&mut w, AnsiCode::Reset);
+        let _ = w.write_all(b": ");
+        Self::write_body_attrs(&mut w, &record.body_attrs_bytes);
+        let _ = w.write_all(b"\n");
+
+        w.position() as usize
+    }
+
+    /// Write callsite details as event_name to buffer.
+    #[inline]
+    fn write_event_name(w: &mut BufWriter<'_>, callsite: &SavedCallsite) {
+        let _ = w.write_all(callsite.target().as_bytes());
+        let _ = w.write_all(b"::");
+        let _ = w.write_all(callsite.name().as_bytes());
+        if let (Some(file), Some(line)) = (callsite.file(), callsite.line()) {
+            let _ = write!(w, " ({}:{})", file, line);
+        }
+    }
+
+    /// Write body+attrs bytes to buffer using LogRecordView.
+    fn write_body_attrs(w: &mut BufWriter<'_>, bytes: &Bytes) {
+        if bytes.is_empty() {
+            return;
+        }
+
+        // A partial protobuf message (just body + attributes) is still a valid message.
+        // We can use the RawLogRecord view to access just the fields we encoded.
+        let record = RawLogRecord::new(bytes.as_ref());
+
+        // Write body if present
+        if let Some(body) = record.body() {
+            Self::write_any_value(w, &body);
+        }
+
+        // Write attributes if present
+        Self::write_attrs(w, record.attributes());
+    }
+
+    /// Write a raw error message directly to stderr, bypassing tracing entirely.
+    ///
+    /// This method is safe to call from within tracing subscriber callbacks
+    /// (e.g., `on_event`) where calling `tracing::subscriber::with_default`
+    /// would cause a "RefCell already borrowed" panic.
+    ///
+    /// Output format matches the standard log format:
+    /// `2026-01-06T10:30:45.123Z ERROR target::name: message [key=value, ...]`
+    pub fn raw_write_error(&self, target: &str, name: &str, message: &str, attrs: &[(&str, &str)]) {
+        use std::time::{SystemTime, UNIX_EPOCH};
+
+        let mut buf = [0u8; LOG_BUFFER_SIZE];
+        let mut w = Cursor::new(buf.as_mut_slice());
+
+        // Timestamp
+        let nanos = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_nanos() as u64;
+
+        self.write_ansi(&mut w, AnsiCode::Dim);
+        Self::write_timestamp(&mut w, nanos);
+        self.write_ansi(&mut w, AnsiCode::Reset);
+        let _ = w.write_all(b" ");
+
+        // Level (always ERROR for raw_write_error)
+        self.write_level(&mut w, &Level::ERROR);
+
+        // Event name (target::name)
+        self.write_ansi(&mut w, AnsiCode::Bold);
+        let _ = w.write_all(target.as_bytes());
+        let _ = w.write_all(b"::");
+        let _ = w.write_all(name.as_bytes());
+        self.write_ansi(&mut w, AnsiCode::Reset);
+        let _ = w.write_all(b": ");
+
+        // Message body
+        let _ = w.write_all(message.as_bytes());
+
+        // Attributes
+        if !attrs.is_empty() {
+            let _ = w.write_all(b" [");
+            for (i, (key, value)) in attrs.iter().enumerate() {
+                if i > 0 {
+                    let _ = w.write_all(b", ");
+                }
+                let _ = w.write_all(key.as_bytes());
+                let _ = w.write_all(b"=");
+                let _ = w.write_all(value.as_bytes());
+            }
+            let _ = w.write_all(b"]");
+        }
+
+        let _ = w.write_all(b"\n");
+
+        // Always write to stderr for errors
+        let len = w.position() as usize;
+        let _ = std::io::stderr().write_all(&buf[..len]);
+    }
+}
+
+impl RawLoggingLayer {
+    /// Process a tracing Event directly, bypassing the dispatcher.
+    ///
+    /// This method is safe to call from within tracing subscriber callbacks
+    /// (e.g., `on_event`) where calling `tracing::subscriber::with_default`
+    /// would cause a "RefCell already borrowed" panic.
+    ///
+    /// It performs the same formatting as the Layer's on_event, writing
+    /// directly to stdout/stderr based on the event's level.
+    pub fn dispatch_event(&self, event: &Event<'_>) {
+        let record = LogRecord::new(event);
+        let callsite = SavedCallsite::new(event.metadata());
+        self.writer.raw_print(&record, &callsite);
+    }
 }
 
 impl<S> TracingLayer<S> for RawLoggingLayer
@@ -326,12 +558,11 @@ where
         // TODO: there are allocations implied here that we would prefer
         // to avoid, it will be an extensive change in the ProtoBuffer to
        // stack-allocate this temporary.
+        // RawLoggingLayer is used before the logs infrastructure is set up,
+        // so no producer_key context is available.
         let record = LogRecord::new(event);
         let callsite = SavedCallsite::new(event.metadata());
-
-        let mut buf = [0u8; LOG_BUFFER_SIZE];
-        let len = self.writer.write_log_record(&mut buf, &record, &callsite);
-        self.writer.write_line(callsite.level(), &buf[..len]);
+        self.writer.raw_print(&record, &callsite);
     }
 
     // Note! This tracing layer does not implement Span-related features
@@ -350,10 +581,10 @@ mod tests {
     use crate::self_tracing::encoder::level_to_severity_number;
     use bytes::Bytes;
     use otap_df_pdata::otlp::ProtoBuffer;
+    use otap_df_pdata::prost::Message;
     use otap_df_pdata::proto::opentelemetry::common::v1::any_value::Value;
     use otap_df_pdata::proto::opentelemetry::common::v1::{AnyValue, KeyValue};
     use otap_df_pdata::proto::opentelemetry::logs::v1::LogRecord as ProtoLogRecord;
-    use prost::Message;
     use std::sync::{Arc, Mutex};
     use tracing_subscriber::prelude::*;
@@ -537,8 +768,6 @@ mod tests {
         let writer = ConsoleWriter::no_color();
         let output = writer.format_log_record(&record, &test_callsite());
 
-        // Note that the severity text is formatted using the Metadata::Level
-        // so the text appears, unlike the protobuf case.
         assert_eq!(
             output,
             "2024-01-15T12:30:45.678Z INFO test_module::submodule::test_event (src/test.rs:123): \n"
diff --git a/rust/otap-dataflow/crates/telemetry/src/telemetry_runtime.rs b/rust/otap-dataflow/crates/telemetry/src/telemetry_runtime.rs
new file mode 100644
index 0000000000..4f5a77162e
--- /dev/null
+++ b/rust/otap-dataflow/crates/telemetry/src/telemetry_runtime.rs
@@ -0,0 +1,397 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+//! OpenTelemetry SDK integration for telemetry collection and reporting as a client.
+
+pub mod logger_provider;
+pub mod meter_provider;
+
+use opentelemetry::KeyValue;
+use opentelemetry_sdk::{Resource, logs::SdkLoggerProvider, metrics::SdkMeterProvider};
+use otap_df_config::pipeline::service::telemetry::{
+    AttributeValue, AttributeValueArray, TelemetryConfig,
+    logs::{OutputMode, ProviderMode},
+};
+
+use crate::{
+    LogsReceiver,
+    error::Error,
+    logs::channel as logs_channel,
+    logs::{LogsReporter, TelemetrySetup},
+    telemetry_runtime::logger_provider::LoggerProvider,
+    telemetry_runtime::meter_provider::MeterProvider,
+};
+use otap_df_config::pipeline::service::telemetry::logs::LogLevel;
+
+/// Client for the OpenTelemetry SDK and internal telemetry settings.
+///
+/// This struct owns all telemetry infrastructure including:
+/// - OpenTelemetry SDK meter and logger providers
+/// - Internal logs reporter and receiver channels
+pub struct TelemetryRuntime {
+    /// The tokio runtime used to run the OpenTelemetry SDK OTLP exporter.
+    /// The reference is kept to ensure the runtime lives as long as the client.
+    _runtime: Option<tokio::runtime::Runtime>,
+    meter_provider: SdkMeterProvider,
+    logger_provider: Option<SdkLoggerProvider>,
+    /// Reporter for sending logs through the internal channel.
+    /// Present when global or engine provider mode needs a channel.
+    logs_reporter: Option<LogsReporter>,
+    /// Receiver for the internal logs channel (Internal output mode only).
+    /// The ITR node consumes this to process internal telemetry.
+    logs_receiver: Option<LogsReceiver>,
+    /// Pre-encoded resource bytes for OTLP log encoding.
+    resource_bytes: bytes::Bytes,
+    /// Deferred global subscriber setup. Must be initialized by controller
+    /// AFTER the internal pipeline is started (so the channel is being consumed).
+    global_setup: Option<TelemetrySetup>,
+    /// Log level for the global subscriber.
+    global_log_level: LogLevel,
+    // TODO: Add traces providers.
+}
+
+impl TelemetryRuntime {
+    /// Create a new OpenTelemetry client from the given configuration.
+    ///
+    /// Logging-specific notes:
+    ///
+    /// The log level can be controlled via:
+    /// 1. The `logs.level` config setting (off, debug, info, warn, error)
+    /// 2. The `RUST_LOG` environment variable for fine-grained control
+    ///
+    /// When `RUST_LOG` is set, it takes precedence and allows filtering by target.
+    /// Example: `RUST_LOG=info,h2=warn,hyper=warn` enables info level but silences
+    /// noisy HTTP/2 and hyper logs.
+    ///
+    /// The logs reporter is created internally based on the configuration:
+    /// - For `Direct` output: creates reporter + receiver (collector must be spawned)
+    /// - For `Internal` output: creates reporter + receiver (receiver goes to ITR node)
+    /// - For `Noop` output: no reporter is created
+    ///
+    /// The logger provider is configured when either the global or the engine
+    /// provider is set to `OpenTelemetry`. This allows the engine to use the same
+    /// SDK pipeline even when global uses a different logging strategy.
+    pub fn new(config: &TelemetryConfig) -> Result<Self, Error> {
+        let sdk_resource = Self::configure_resource(&config.resource);
+
+        // Pre-encode resource bytes once for internal telemetry
+        let resource_bytes = crate::resource::encode_resource_bytes(&config.resource);
+
+        let runtime = None;
+
+        let (meter_provider, runtime) =
+            MeterProvider::configure(sdk_resource.clone(), &config.metrics, runtime)?.into_parts();
+
+        // Create the logs reporter and receiver
+        let (logs_reporter, logs_receiver) = match config.logs.output {
+            OutputMode::Direct | OutputMode::Internal => {
+                let (x, y) = logs_channel(config.reporting_channel_size);
+                (Some(x), Some(y))
+            }
+            _ => (None, None),
+        };
+
+        // Check if either global or engine needs the OpenTelemetry logger provider
+        let global_needs_otel = config.logs.providers.global == ProviderMode::OpenTelemetry;
+        let engine_needs_otel = config.logs.providers.engine == ProviderMode::OpenTelemetry;
+
+        // Configure the logger provider if either global or engine needs it
+        let (logger_provider, runtime) = if global_needs_otel || engine_needs_otel {
+            let (provider, rt) =
+                LoggerProvider::configure(sdk_resource.clone(), &config.logs, runtime)?
+                    .into_parts();
+            (Some(provider), rt)
+        } else {
+            (None, runtime)
+        };
+
+        // Build the global setup but DO NOT initialize it yet.
+        // The controller must call init_global_subscriber() after the internal
+        // pipeline is started, so the channel receiver is being consumed.
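+        //
+        // A rough sketch of the intended controller-side ordering (the
+        // `spawn_internal_pipeline` name is hypothetical, for illustration only):
+        //
+        //   let mut rt = TelemetryRuntime::new(&config)?;
+        //   let settings = rt.take_internal_telemetry_settings();
+        //   spawn_internal_pipeline(settings);   // receiver is now consumed
+        //   rt.init_global_subscriber();         // safe: the channel drains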
+        let global_setup = Self::make_telemetry_setup(
+            config.logs.providers.global,
+            logs_reporter.as_ref(),
+            logger_provider.as_ref(),
+        )?;
+
+        Ok(Self {
+            _runtime: runtime,
+            meter_provider,
+            logger_provider,
+            logs_reporter,
+            logs_receiver,
+            resource_bytes,
+            global_setup: Some(global_setup),
+            global_log_level: config.logs.level,
+        })
+    }
+
+    fn configure_resource(
+        resource_attributes: &std::collections::HashMap<String, AttributeValue>,
+    ) -> Resource {
+        let mut sdk_resource_builder = Resource::builder_empty();
+        for (k, v) in resource_attributes.iter() {
+            sdk_resource_builder = sdk_resource_builder
+                .with_attribute(KeyValue::new(k.clone(), Self::to_sdk_value(v)));
+        }
+        sdk_resource_builder.build()
+    }
+
+    fn to_sdk_value(attr_value: &AttributeValue) -> opentelemetry::Value {
+        match attr_value {
+            AttributeValue::String(s) => opentelemetry::Value::String(s.clone().into()),
+            AttributeValue::Bool(b) => opentelemetry::Value::Bool(*b),
+            AttributeValue::I64(i) => opentelemetry::Value::I64(*i),
+            AttributeValue::F64(f) => opentelemetry::Value::F64(*f),
+            AttributeValue::Array(arr) => match arr {
+                AttributeValueArray::String(array_s) => {
+                    let sdk_values = array_s.iter().map(|s| s.clone().into()).collect();
+                    opentelemetry::Value::Array(opentelemetry::Array::String(sdk_values))
+                }
+                AttributeValueArray::Bool(array_b) => {
+                    let sdk_values = array_b.to_vec();
+                    opentelemetry::Value::Array(opentelemetry::Array::Bool(sdk_values))
+                }
+                AttributeValueArray::I64(array_i) => {
+                    let sdk_values = array_i.to_vec();
+                    opentelemetry::Value::Array(opentelemetry::Array::I64(sdk_values))
+                }
+                AttributeValueArray::F64(array_f) => {
+                    let sdk_values = array_f.to_vec();
+                    opentelemetry::Value::Array(opentelemetry::Array::F64(sdk_values))
+                }
+            },
+        }
+    }
+
+    /// Get a reference to the meter provider.
+    #[must_use]
+    pub fn meter_provider(&self) -> &SdkMeterProvider {
+        &self.meter_provider
+    }
+
+    /// Get a reference to the logger provider.
+    #[must_use]
+    pub fn logger_provider(&self) -> &Option<SdkLoggerProvider> {
+        &self.logger_provider
+    }
+
+    /// Get a reference to the logs reporter.
+    ///
+    /// Returns `Some` when the configuration requires a channel-based reporter
+    /// (global or engine provider is `Immediate`).
+    #[must_use]
+    pub fn logs_reporter(&self) -> Option<&LogsReporter> {
+        self.logs_reporter.as_ref()
+    }
+
+    /// Take the logs receiver for the internal telemetry pipeline.
+    ///
+    /// Returns `Some` only when output mode is `Internal`. The receiver should
+    /// be passed to the Internal Telemetry Receiver (ITR) node.
+    ///
+    /// This method takes ownership of the receiver (can only be called once).
+    pub fn take_logs_receiver(&mut self) -> Option<LogsReceiver> {
+        self.logs_receiver.take()
+    }
+
+    /// Take the internal telemetry settings for injection into the ITR node.
+    ///
+    /// Returns `Some` only when output mode is `Internal`. This bundles the
+    /// logs receiver channel and pre-encoded resource bytes together.
+    ///
+    /// This method takes ownership of the receiver (can only be called once).
+    pub fn take_internal_telemetry_settings(&mut self) -> Option<crate::InternalTelemetrySettings> {
+        self.logs_receiver.take().map(|rx| crate::InternalTelemetrySettings {
+            logs_receiver: rx,
+            resource_bytes: self.resource_bytes.clone(),
+        })
+    }
+
+    /// Initialize the global tracing subscriber.
+    ///
+    /// This MUST be called AFTER the internal pipeline is started (when using
+    /// Internal output mode), so the channel receiver is being actively consumed.
+    /// Otherwise, logs sent before the receiver starts will fill the channel buffer.
+    ///
+    /// For other output modes (Direct, Noop), this can be called at any time.
+    pub fn init_global_subscriber(&mut self) {
+        if let Some(setup) = self.global_setup.take() {
+            if let Err(err) = setup.try_init_global(self.global_log_level) {
+                crate::raw_error!("tracing.subscriber.init", error = err.to_string());
+            }
+        }
+    }
+
+    /// Create a `TelemetrySetup` for the given provider mode.
+    ///
+    /// This uses the runtime's shared `logs_reporter` and `logger_provider` to configure
+    /// the setup for the given provider mode.
+    ///
+    /// # Panics
+    /// Panics if the provider mode requires a resource that wasn't configured:
+    /// - `Immediate` requires `logs_reporter` to be present
+    /// - `OpenTelemetry` requires `logger_provider` to be present
+    #[must_use]
+    pub fn telemetry_setup_for(&self, provider_mode: ProviderMode) -> TelemetrySetup {
+        Self::make_telemetry_setup(
+            provider_mode,
+            self.logs_reporter.as_ref(),
+            self.logger_provider.as_ref(),
+        )
+        .expect("validated: provider mode resources should be configured")
+    }
+
+    /// Helper to create a TelemetrySetup from a ProviderMode and optional resources.
+    ///
+    /// Returns an error if the mode requires a resource that isn't provided.
+    fn make_telemetry_setup(
+        provider_mode: ProviderMode,
+        logs_reporter: Option<&LogsReporter>,
+        logger_provider: Option<&SdkLoggerProvider>,
+    ) -> Result<TelemetrySetup, Error> {
+        match provider_mode {
+            ProviderMode::Noop => Ok(TelemetrySetup::Noop),
+            ProviderMode::Raw => Ok(TelemetrySetup::Raw),
+            ProviderMode::Immediate => {
+                let reporter = logs_reporter.ok_or_else(|| {
+                    Error::ConfigurationError(
+                        "Immediate provider mode requires logs_reporter".into(),
+                    )
+                })?;
+                Ok(TelemetrySetup::Immediate {
+                    reporter: reporter.clone(),
+                })
+            }
+            ProviderMode::OpenTelemetry => {
+                let provider = logger_provider.ok_or_else(|| {
+                    Error::ConfigurationError(
+                        "OpenTelemetry provider mode requires logger_provider".into(),
+                    )
+                })?;
+                Ok(TelemetrySetup::OpenTelemetry {
+                    logger_provider: provider.clone(),
+                })
+            }
+        }
+    }
+
+    /// Shutdown the OpenTelemetry SDK.
+    pub fn shutdown(&self) -> Result<(), Error> {
+        let meter_shutdown_result = self.meter_provider().shutdown();
+        let logger_provider_shutdown_result = self
+            .logger_provider()
+            .as_ref()
+            .map(|x| x.shutdown())
+            .transpose();
+
+        if let Err(e) = meter_shutdown_result {
+            return Err(Error::ShutdownError(e.to_string()));
+        }
+
+        if let Err(e) = logger_provider_shutdown_result {
+            return Err(Error::ShutdownError(e.to_string()));
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use opentelemetry::global;
+    use otap_df_config::pipeline::service::telemetry::{
+        AttributeValue,
+        logs::LogsConfig,
+        metrics::{
+            MetricsConfig,
+            readers::{
+                MetricsReaderConfig, MetricsReaderPeriodicConfig,
+                periodic::MetricsPeriodicExporterConfig,
+            },
+        },
+    };
+
+    use super::*;
+    use std::{f64::consts::PI, time::Duration};
+
+    #[test]
+    fn test_configure_minimal_telemetry_runtime() -> Result<(), Error> {
+        let config = TelemetryConfig::default();
+        let client = TelemetryRuntime::new(&config)?;
+        let meter = global::meter("test-meter");
+
+        let counter = meter.u64_counter("test-counter").build();
+        counter.add(1, &[]);
+        // There is nothing to assert here. The test validates that nothing panics/crashes.
+
+        client.shutdown()?;
+        Ok(())
+    }
+
+    #[test]
+    fn test_configure_telemetry_runtime() -> Result<(), Error> {
+        let mut resource = std::collections::HashMap::new();
+        _ = resource.insert(
+            "service.name".to_string(),
+            AttributeValue::String("test-service".to_string()),
+        );
+
+        let metrics_config = MetricsConfig {
+            readers: vec![MetricsReaderConfig::Periodic(MetricsReaderPeriodicConfig {
+                exporter: MetricsPeriodicExporterConfig::Console,
+                interval: Duration::from_millis(10),
+            })],
+            views: Vec::new(),
+        };
+
+        let config = TelemetryConfig {
+            reporting_channel_size: 10,
+            reporting_interval: Duration::from_millis(10),
+            metrics: metrics_config,
+            logs: LogsConfig::default(),
+            resource,
+        };
+        let client = TelemetryRuntime::new(&config)?;
+        let meter = global::meter("test-meter");
+
+        let counter = meter.u64_counter("test-counter").build();
+        counter.add(1, &[]);
+        // There is nothing to assert here. The test validates that nothing panics/crashes.
+
+        client.shutdown()?;
+        Ok(())
+    }
+
+    #[test]
+    fn test_to_sdk_value() {
+        let string_attr = AttributeValue::String("example".to_string());
+        assert_eq!(
+            TelemetryRuntime::to_sdk_value(&string_attr),
+            opentelemetry::Value::String("example".into())
+        );
+
+        let bool_attr = AttributeValue::Bool(true);
+        assert_eq!(
+            TelemetryRuntime::to_sdk_value(&bool_attr),
+            opentelemetry::Value::Bool(true)
+        );
+
+        let i64_attr = AttributeValue::I64(42);
+        assert_eq!(
+            TelemetryRuntime::to_sdk_value(&i64_attr),
+            opentelemetry::Value::I64(42)
+        );
+
+        let f64_attr = AttributeValue::F64(PI);
+        assert_eq!(
+            TelemetryRuntime::to_sdk_value(&f64_attr),
+            opentelemetry::Value::F64(PI)
+        );
+
+        let array_attr = AttributeValue::Array(AttributeValueArray::I64(vec![1, 2, 3]));
+        assert_eq!(
+            TelemetryRuntime::to_sdk_value(&array_attr),
+            opentelemetry::Value::Array(opentelemetry::Array::I64(vec![1, 2, 3]))
+        );
+    }
+}
diff --git a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/logger_provider.rs b/rust/otap-dataflow/crates/telemetry/src/telemetry_runtime/logger_provider.rs
similarity index 76%
rename from rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/logger_provider.rs
rename to rust/otap-dataflow/crates/telemetry/src/telemetry_runtime/logger_provider.rs
index d14dc84a39..f5f2660b16 100644
--- a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/logger_provider.rs
+++ b/rust/otap-dataflow/crates/telemetry/src/telemetry_runtime/logger_provider.rs
@@ -3,12 +3,11 @@
 
 //! Configures the OpenTelemetry logger provider based on the provided configuration.
 
-use opentelemetry_appender_tracing::layer;
 use opentelemetry_otlp::{Protocol, WithExportConfig};
 use opentelemetry_sdk::{Resource, logs::SdkLoggerProvider};
 use otap_df_config::pipeline::service::telemetry::{
     logs::{
-        LogLevel, LogsConfig,
+        LogsConfig,
         processors::{
             BatchLogProcessorConfig,
             batch::{LogBatchProcessorExporterConfig, otlp::OtlpExporterConfig},
         },
     },
     metrics::readers::periodic::otlp::OtlpProtocol,
 };
-use tracing::level_filters::LevelFilter;
-use tracing_subscriber::util::SubscriberInitExt;
-use tracing_subscriber::{EnvFilter, layer::SubscriberExt};
 
 use crate::error::Error;
@@ -29,39 +25,7 @@ pub struct LoggerProvider {
 }
 
 impl LoggerProvider {
-    /// Initializes internal logging for the OTAP engine.
-    ///
-    /// The log level can be controlled via:
-    /// 1. The `logs.level` config setting (off, debug, info, warn, error)
-    /// 2. The `RUST_LOG` environment variable for fine-grained control
-    ///
-    /// When `RUST_LOG` is set, it takes precedence and allows filtering by target.
-    /// Example: `RUST_LOG=info,h2=warn,hyper=warn` enables info level but silences
-    /// noisy HTTP/2 and hyper logs.
-    ///
-    /// TODO: The engine uses a thread-per-core model
-    /// and is NUMA aware.
-    /// The fmt::init() here is truly global, and hence
-    /// this will be a source of contention.
-    /// We need to evaluate alternatives:
-    ///
-    /// 1. Set up per thread subscriber.
-    /// ```ignore
-    /// // start of thread
-    /// let _guard = tracing::subscriber::set_default(subscriber);
-    /// // now, with this thread, all tracing calls will go to this subscriber
-    /// // eliminating contention.
-    /// // end of thread
-    /// ```
-    ///
-    /// 2. Use custom subscriber that batches logs in thread-local buffer, and
-    ///    flushes them periodically.
-    ///
-    /// The TODO here is to evaluate these options and implement one of them.
-    /// As of now, this causes contention, and we just need to accept temporarily.
-    ///
-    /// TODO: Evaluate also alternatives for the contention caused by the global
-    /// OpenTelemetry logger provider added as layer.
+    /// Initializes the OpenTelemetry logger provider for the OTAP engine.
     pub fn configure(
         sdk_resource: Resource,
         logger_config: &LogsConfig,
@@ -80,35 +44,6 @@ impl LoggerProvider {
 
         let sdk_logger_provider = sdk_logger_builder.build();
 
-        let level = match logger_config.level {
-            LogLevel::Off => LevelFilter::OFF,
-            LogLevel::Debug => LevelFilter::DEBUG,
-            LogLevel::Info => LevelFilter::INFO,
-            LogLevel::Warn => LevelFilter::WARN,
-            LogLevel::Error => LevelFilter::ERROR,
-        };
-
-        // If RUST_LOG is set, use it for fine-grained control.
-        // Otherwise, fall back to the config level with some noisy dependencies silenced.
-        // Users can override by setting RUST_LOG explicitly.
-        let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
-            // Default filter: use config level, but silence known noisy HTTP dependencies
-            EnvFilter::new(format!("{level},h2=off,hyper=off"))
-        });
-
-        // Formatting layer
-        let fmt_layer = tracing_subscriber::fmt::layer().with_thread_names(true);
-
-        let sdk_layer = layer::OpenTelemetryTracingBridge::new(&sdk_logger_provider);
-
-        // Try to initialize the global subscriber. In tests, this may fail if already set,
-        // which is acceptable as we're only validating the configuration works.
-        let _ = tracing_subscriber::registry()
-            .with(filter)
-            .with(fmt_layer)
-            .with(sdk_layer)
-            .try_init();
-
         Ok(LoggerProvider {
             sdk_logger_provider,
             runtime,
@@ -250,6 +185,18 @@ impl LoggerProvider {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use opentelemetry_otlp::Protocol;
+    use opentelemetry_sdk::Resource;
+    use otap_df_config::pipeline::service::telemetry::{
+        logs::{
+            LogLevel, LogsConfig,
+            processors::{
+                BatchLogProcessorConfig,
+                batch::{LogBatchProcessorExporterConfig, otlp::OtlpExporterConfig},
+            },
+        },
+        metrics::readers::periodic::otlp::OtlpProtocol,
+    };
     use tracing::error;
 
     #[test]
@@ -264,6 +211,7 @@ mod tests {
                 },
             ),
         ],
+        ..Default::default()
     };
     let logger_provider = LoggerProvider::configure(resource, &logger_config, None)?;
     let (sdk_logger_provider, _) = logger_provider.into_parts();
@@ -292,6 +240,7 @@ mod tests {
                 },
             ),
         ],
+        ..Default::default()
     };
     let logger_provider = LoggerProvider::configure(resource, &logger_config, None)?;
     let (sdk_logger_provider, runtime_option) = logger_provider.into_parts();
@@ -311,6 +260,7 @@ mod tests {
     let logger_config = LogsConfig {
         level: LogLevel::default(),
         processors: vec![],
+        ..Default::default()
     };
     let logger_provider = LoggerProvider::configure(resource, &logger_config, None)?;
     let (sdk_logger_provider, _) = logger_provider.into_parts();
diff --git a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider.rs b/rust/otap-dataflow/crates/telemetry/src/telemetry_runtime/meter_provider.rs
similarity index 99%
rename from rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider.rs
rename to rust/otap-dataflow/crates/telemetry/src/telemetry_runtime/meter_provider.rs
index 358d8dad76..e22f163d9b 100644
--- a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider.rs
+++ b/rust/otap-dataflow/crates/telemetry/src/telemetry_runtime/meter_provider.rs
@@ -22,7 +22,7 @@ use otap_df_config::pipeline::service::telemetry::metrics::{
 
 use crate::{
     error::Error,
-    opentelemetry_client::meter_provider::{
+    telemetry_runtime::meter_provider::{
         otlp_exporter_provider::OtlpExporterProvider,
         prometheus_exporter_provider::PrometheusExporterProvider,
     },
diff --git a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider/otlp_exporter_provider.rs b/rust/otap-dataflow/crates/telemetry/src/telemetry_runtime/meter_provider/otlp_exporter_provider.rs
similarity index 100%
rename from rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider/otlp_exporter_provider.rs
rename to rust/otap-dataflow/crates/telemetry/src/telemetry_runtime/meter_provider/otlp_exporter_provider.rs
diff --git a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider/prometheus_exporter_provider.rs b/rust/otap-dataflow/crates/telemetry/src/telemetry_runtime/meter_provider/prometheus_exporter_provider.rs
similarity index 100%
rename from rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider/prometheus_exporter_provider.rs
rename to rust/otap-dataflow/crates/telemetry/src/telemetry_runtime/meter_provider/prometheus_exporter_provider.rs
diff --git a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider/views_provider.rs b/rust/otap-dataflow/crates/telemetry/src/telemetry_runtime/meter_provider/views_provider.rs
similarity index 100%
rename from rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider/views_provider.rs
rename to rust/otap-dataflow/crates/telemetry/src/telemetry_runtime/meter_provider/views_provider.rs
diff --git a/rust/otap-dataflow/src/main.rs b/rust/otap-dataflow/src/main.rs
index e242b472cf..b33b38e31d 100644
--- a/rust/otap-dataflow/src/main.rs
+++ b/rust/otap-dataflow/src/main.rs
@@ -9,8 +9,12 @@ use otap_df_config::pipeline_group::{CoreAllocation, CoreRange, Quota};
 use otap_df_config::{PipelineGroupId, PipelineId};
 use otap_df_controller::Controller;
 use otap_df_otap::OTAP_PIPELINE_FACTORY;
+use otap_df_telemetry::raw_error;
+use otap_df_telemetry::self_tracing::{ConsoleWriter, RawLoggingLayer};
 use std::path::PathBuf;
 use sysinfo::System;
+use tracing_subscriber::Registry;
+use tracing_subscriber::layer::SubscriberExt;
 
 #[cfg(all(
     not(windows),
@@ -121,43 +125,57 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
     let pipeline_group_id: PipelineGroupId = "default_pipeline_group".into();
     let pipeline_id: PipelineId = "default_pipeline".into();
 
-    println!("{}", system_info());
-
-    // Load pipeline configuration from file
-    let pipeline_cfg = PipelineConfig::from_file(
-        pipeline_group_id.clone(),
-        pipeline_id.clone(),
-        &args.pipeline,
-    )?;
+    // Use with_default for a thread-local subscriber during startup.
+    // This covers config loading and early info logging.
+    // TelemetryRuntime::new() (called inside run_forever) will set the actual global subscriber.
+    let early_subscriber = Registry::default().with(RawLoggingLayer::new(ConsoleWriter::color()));
+    let (pipeline_cfg, quota, admin_settings) =
+        tracing::subscriber::with_default(early_subscriber, || {
+            // Load pipeline configuration
+            let pipeline_cfg = PipelineConfig::from_file(
+                pipeline_group_id.clone(),
+                pipeline_id.clone(),
+                &args.pipeline,
+            )?;
+
+            tracing::info!("{}", system_info());
+
+            // Map CLI arguments to the core allocation enum
+            let core_allocation = if let Some(range) = args.core_id_range.clone() {
+                range
+            } else if args.num_cores == 0 {
+                CoreAllocation::AllCores
+            } else {
+                CoreAllocation::CoreCount {
+                    count: args.num_cores,
+                }
+            };
+
+            let quota = Quota { core_allocation };
+
+            // Print the requested core configuration
+            match &quota.core_allocation {
+                CoreAllocation::AllCores => {
+                    tracing::info!("Requested core allocation: all available cores")
+                }
+                CoreAllocation::CoreCount { count } => {
+                    tracing::info!("Requested core allocation: {count} cores")
+                }
+                CoreAllocation::CoreSet { .. } => {
+                    tracing::info!("Requested core allocation: {}", quota.core_allocation);
+                }
+            }
+
+            let admin_settings = otap_df_config::engine::HttpAdminSettings {
+                bind_address: args.http_admin_bind.clone(),
+            };
+
+            Ok::<_, Box<dyn std::error::Error>>((pipeline_cfg, quota, admin_settings))
+        })?;
 
     // Create controller and start pipeline with multi-core support
     let controller = Controller::new(&OTAP_PIPELINE_FACTORY);
 
-    // Map CLI arguments to the core allocation enum
-    let core_allocation = if let Some(range) = args.core_id_range {
-        range
-    } else if args.num_cores == 0 {
-        CoreAllocation::AllCores
-    } else {
-        CoreAllocation::CoreCount {
-            count: args.num_cores,
-        }
-    };
-
-    let quota = Quota { core_allocation };
-
-    // Print the requested core configuration
-    match &quota.core_allocation {
-        CoreAllocation::AllCores => println!("Requested core allocation: all available cores"),
-        CoreAllocation::CoreCount { count } => println!("Requested core allocation: {count} cores"),
-        CoreAllocation::CoreSet { .. } => {
-            println!("Requested core allocation: {}", quota.core_allocation);
-        }
-    }
-
-    let admin_settings = otap_df_config::engine::HttpAdminSettings {
-        bind_address: args.http_admin_bind,
-    };
 
     let result = controller.run_forever(
         pipeline_group_id,
         pipeline_id,
@@ -167,11 +185,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
     );
     match result {
         Ok(_) => {
-            println!("Pipeline run successfully");
+            tracing::info!("Pipeline run successfully");
             std::process::exit(0);
         }
         Err(e) => {
-            eprintln!("Pipeline failed to run: {e}");
+            raw_error!("Pipeline failed to run", error = format!("{e}"));
             std::process::exit(1);
         }
     }
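Reviewer note: the `main.rs` change above leans on `tracing::subscriber::with_default`, which installs a subscriber for the current thread only, for the duration of the closure, and returns the closure's result. A self-contained sketch of that pattern, substituting `tracing-subscriber`'s stock `fmt` layer for `RawLoggingLayer` (which would pull in the whole telemetry crate):

```rust
use tracing_subscriber::{Registry, layer::SubscriberExt};

fn main() {
    // Thread-local subscriber: active only inside the closure. The global
    // default is untouched, so it can still be installed later (in this PR,
    // by TelemetryRuntime::init_global_subscriber via the controller).
    let early = Registry::default().with(tracing_subscriber::fmt::layer());
    let cfg = tracing::subscriber::with_default(early, || {
        tracing::info!("early startup logging");
        "loaded-config" // with_default returns the closure's value
    });
    assert_eq!(cfg, "loaded-config");
}
```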