Merged
Changes from 14 commits
184 changes: 144 additions & 40 deletions rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs.rs
@@ -5,74 +5,178 @@
pub mod processors;

use crate::error::Error;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

/// Internal Telemetry Receiver node URN for internal logging using OTLP bytes.
Contributor
Suggestion: Do not include any source code changes as part of this PR unless it is a complete POC.
I'm not clear on how these changes align with the design, and there might be problems when starting the SDK threads when some objects are present.

Contributor Author
If you and other reviewers feel that this configuration is OK, I'll be glad to remove this change and include it in the PR with implementation.

pub const INTERNAL_TELEMETRY_RECEIVER_URN: &str = "urn:otel:otlp:telemetry:receiver";
Contributor
why here?

Contributor Author
At startup, the controller or engine needs to know which node is the ITR. The ITR is implemented in the crates/otap area, so this constant just needs to live somewhere both can depend on, and it is a kind of configuration. It could go in the engine, the controller, or another low-level crate.

Contributor
In my opinion, this URN is ambiguous. We should have a URN that clearly expresses that this is a receiver used by our internal telemetry. I would suggest something like urn:otelcol:internal:receiver, or something along those lines.
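
To make the discussion about a shared constant concrete, here is a minimal sketch of how startup code might locate the ITR node by its URN. `NodeSpec` and `find_itr_node` are hypothetical names invented for this illustration and are not part of the PR; only the constant and its value come from the diff.

```rust
/// URN constant as it appears in the diff under review.
pub const INTERNAL_TELEMETRY_RECEIVER_URN: &str = "urn:otel:otlp:telemetry:receiver";

/// Hypothetical, simplified node description (not the real config type).
pub struct NodeSpec {
    pub name: String,
    pub plugin_urn: String,
}

/// Returns the first node whose URN matches the ITR constant, if any.
pub fn find_itr_node(nodes: &[NodeSpec]) -> Option<&NodeSpec> {
    nodes
        .iter()
        .find(|n| n.plugin_urn == INTERNAL_TELEMETRY_RECEIVER_URN)
}
```

Because both the lookup and the receiver implementation would compare against the same constant, renaming the URN as suggested above would only require changing one string.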


/// Internal logs configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct LogsConfig {
Contributor
This configuration is more "pipeline" level than engine level. That's why I don't see this structure configured here.
I would expect the configuration to be defined per component, with defaults for the type of component. For example, the internal telemetry receiver and subsequent nodes should be Noop or console only.

Contributor Author
Note that @lquerel also doesn't think the new configuration belongs here. I'm not sure where it belongs? This feels OK to me.

I do see this as configuration for the main function, so the global, engine, and internal aspects inside providers are for global-level, engine-level, and internal-pipeline logging configuration. As mentioned in #1741 (comment), this is something we can configure at the component level, per tenant, etc.

Contributor
In the future, I expect a lot of changes at this level. Right now we are talking about LogConfig, but in the long run this will be a configuration for the entire internal telemetry pipeline/system and for all internally used signal types (metric set, events, ...).

I can see this being acceptable as an intermediate step for now, but as I mentioned, I don’t think this configuration is 1) general enough, or 2) in the right place.

Contributor Author
@lquerel this position in the YAML is the same as in the OTel Collector, at service::telemetry::logs with the level field. I'm open to placing it anywhere you and @andborja can agree on. The resource is definitely used for both logs and metrics; I would expect other signals' internal configuration to mirror this.

/// The log level for internal engine logs.
#[serde(default)]
/// The log level for internal logs.
#[serde(default = "default_level")]
pub level: LogLevel,

/// The list of log processors to configure.
#[serde(default)]
/// Logging provider configuration.
#[serde(default = "default_providers")]
pub providers: LoggingProviders,

/// What to do with collected log events. This applies when any ProviderMode
/// in providers indicates Buffered or Unbuffered. Does not apply if all
/// providers are in [Noop, Raw, OpenTelemetry].
#[serde(default = "default_output")]
pub output: OutputMode,

/// OpenTelemetry SDK is configured via processors.
pub processors: Vec<processors::LogProcessorConfig>,
}

/// Log level for internal engine logs.
///
/// TODO: Change default to `Info` once per-thread subscriber is implemented
/// to avoid contention from the global tracing subscriber.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default, PartialEq)]
/// Log level for dataflow engine logs.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum LogLevel {
/// Logging is completely disabled.
#[default]
Off,
/// Debug level logging.
Debug,
/// Info level logging.
#[default]
Info,
/// Warn level logging.
Warn,
/// Error level logging.
Error,
}
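
The `#[serde(rename_all = "lowercase")]` attribute means a YAML value such as `level: "info"` maps to `LogLevel::Info`. The sketch below imitates that mapping with the standard library's `FromStr` instead of serde, purely for illustration. It assumes the new default is `Info`, as `default_level()` later in the diff suggests; the `FromStr` impl and its error type are not part of the PR.

```rust
use std::str::FromStr;

/// Mirror of the enum in the diff, without the serde derives.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum LogLevel {
    Off,
    Debug,
    #[default] // assumption: Info is the new default
    Info,
    Warn,
    Error,
}

impl FromStr for LogLevel {
    type Err = String;

    /// Accepts only the lowercase variant names (illustrative stand-in for serde).
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "off" => Ok(LogLevel::Off),
            "debug" => Ok(LogLevel::Debug),
            "info" => Ok(LogLevel::Info),
            "warn" => Ok(LogLevel::Warn),
            "error" => Ok(LogLevel::Error),
            other => Err(format!("unknown log level: {}", other)),
        }
    }
}
```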

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_logs_config_deserialize() {
let yaml_str = r#"
level: "info"
processors:
- batch:
exporter:
console:
"#;
let config: LogsConfig = serde_yaml::from_str(yaml_str).unwrap();
assert_eq!(config.level, LogLevel::Info);
assert_eq!(config.processors.len(), 1);
/// Logging providers for different execution contexts.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct LoggingProviders {
/// Provider mode for non-engine threads. This defines the global Tokio
/// `tracing` subscriber. Default is Unbuffered. Note that Buffered
/// requires opt-in thread-local setup.
pub global: ProviderMode,

/// Provider mode for engine/pipeline threads. This defines how the
/// engine thread / core sets the Tokio `tracing`
/// subscriber. Default is Buffered. Internal logs will be flushed
/// by either the Internal Telemetry Receiver or the main pipeline
/// controller.
pub engine: ProviderMode,

/// Provider mode for nodes downstream of Internal Telemetry receiver.
/// This defaults to Noop to avoid internal feedback.
#[serde(default = "default_internal_provider")]
pub internal: ProviderMode,
}
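
As a rough illustration of how the three provider slots could be consulted, the sketch below picks a `ProviderMode` based on where a thread runs. `ThreadContext`, `documented_defaults`, and `mode_for` are hypothetical names made up for this example; the default values are taken from the doc comments and `default_providers()` in the diff.

```rust
/// Mirror of the enum in the diff, without the serde derives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProviderMode {
    Noop,
    Buffered,
    Unbuffered,
    OpenTelemetry,
    Raw,
}

/// Mirror of the struct in the diff, without the serde derives.
pub struct LoggingProviders {
    pub global: ProviderMode,
    pub engine: ProviderMode,
    pub internal: ProviderMode,
}

/// Hypothetical classification of the thread that is setting up logging.
#[derive(Debug, Clone, Copy)]
pub enum ThreadContext {
    Global,
    Engine,
    Internal,
}

impl LoggingProviders {
    /// Defaults as documented in the diff: Unbuffered / Buffered / Noop.
    pub fn documented_defaults() -> Self {
        Self {
            global: ProviderMode::Unbuffered,
            engine: ProviderMode::Buffered,
            internal: ProviderMode::Noop,
        }
    }

    /// Selects the provider mode for the given execution context.
    pub fn mode_for(&self, ctx: ThreadContext) -> ProviderMode {
        match ctx {
            ThreadContext::Global => self.global,
            ThreadContext::Engine => self.engine,
            ThreadContext::Internal => self.internal,
        }
    }
}
```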

/// Logs producer: how log events are captured and routed.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProviderMode {
/// Log events are silently ignored.
Noop,

/// Place into a thread-local buffer.
Buffered,
Contributor
I think this should not be an option at production time. If someone wishes to buffer and batch events, that should be done in an internal telemetry pipeline (not at telemetry production time).

Contributor Author
The internal telemetry pipeline is the buffer. The internal telemetry pipeline starts with a buffer in each engine thread.
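
For readers unfamiliar with the idea of a per-thread buffer at the head of the pipeline, here is a minimal sketch using only the standard library. It is not the PR's implementation: `LOG_BUFFER`, `buffer_log`, and `flush_logs` are hypothetical names, and plain strings stand in for whatever record type the engine actually uses.

```rust
use std::cell::RefCell;

thread_local! {
    /// Per-thread buffer of pending log records (hypothetical representation).
    static LOG_BUFFER: RefCell<Vec<String>> = RefCell::new(Vec::new());
}

/// Producer side: append to the current thread's buffer without touching shared state.
pub fn buffer_log(record: &str) {
    LOG_BUFFER.with(|buf| buf.borrow_mut().push(record.to_string()));
}

/// Consumer side: drain everything buffered on the current thread, leaving it empty.
pub fn flush_logs() -> Vec<String> {
    LOG_BUFFER.with(|buf| std::mem::take(&mut *buf.borrow_mut()))
}
```

In this sketch the producer never takes a lock, which is the property the Buffered mode appears to be after; whoever flushes (for example the Internal Telemetry Receiver) has to run on the same thread as the producer.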


/// Non-blocking, immediate delivery.
Unbuffered,

/// Use OTel-Rust as the provider.
OpenTelemetry,

/// Use synchronous logging. Note! This can block the producing thread.
Raw,
}

/// Output mode: what the recipient does with received events for
/// Buffered and Unbuffered provider logging modes.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum OutputMode {
/// Noop prevents the use of Buffered and Unbuffered modes. This
/// output mode can be set when all providers are configured to
/// avoid the internal output configuration through Noop, Raw, or
/// OpenTelemetry settings.
Noop,

/// Raw logging: format and print directly to console
/// (stdout/stderr) from the logs collector thread. ERROR and
/// WARN go to stderr, others to stdout.
#[default]
Raw,

/// Route to Internal Telemetry Receiver node. The pipeline must
/// include a node with INTERNAL_TELEMETRY_RECEIVER_URN. The
/// engine provider mode must be Buffered for internal output.
Internal,
}
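
The stream-selection rule described for `OutputMode::Raw` (ERROR and WARN to stderr, everything else to stdout) can be sketched as follows. `Severity`, `ConsoleStream`, `stream_for`, and `write_raw` are hypothetical names for this illustration; the PR does not show how the console writer is implemented.

```rust
/// Hypothetical severity of a single log event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Severity {
    Debug,
    Info,
    Warn,
    Error,
}

/// Which console stream a record is written to.
#[derive(Debug, PartialEq, Eq)]
pub enum ConsoleStream {
    Stdout,
    Stderr,
}

/// WARN and ERROR go to stderr; all other severities go to stdout.
pub fn stream_for(sev: Severity) -> ConsoleStream {
    match sev {
        Severity::Warn | Severity::Error => ConsoleStream::Stderr,
        Severity::Debug | Severity::Info => ConsoleStream::Stdout,
    }
}

/// Writes an already-formatted line to the stream chosen by `stream_for`.
pub fn write_raw(sev: Severity, line: &str) {
    match stream_for(sev) {
        ConsoleStream::Stderr => eprintln!("{}", line),
        ConsoleStream::Stdout => println!("{}", line),
    }
}
```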

fn default_output() -> OutputMode {
OutputMode::Raw
}

fn default_level() -> LogLevel {
LogLevel::Info
}

fn default_internal_provider() -> ProviderMode {
ProviderMode::Noop
}

fn default_providers() -> LoggingProviders {
LoggingProviders {
global: ProviderMode::Unbuffered,
engine: ProviderMode::Buffered,
internal: default_internal_provider(),
}
}

#[test]
fn test_log_level_deserialize() {
let yaml_str = r#"
level: "info"
"#;
let config: LogsConfig = serde_yaml::from_str(yaml_str).unwrap();
assert_eq!(config.level, LogLevel::Info);
impl Default for LogsConfig {
fn default() -> Self {
Self {
level: default_level(),
providers: default_providers(),
output: default_output(),
processors: Vec::new(),
}
}
}

impl LogsConfig {
/// Validate the logs configuration.
///
/// Returns an error if:
/// - `output` is `Noop` but a provider uses `Buffered` or `Unbuffered`
/// (logs would be sent but discarded)
/// - `output` is `Internal` but engine provider is not `Buffered`
pub fn validate(&self) -> Result<(), Error> {
if self.output == OutputMode::Noop {
let global_sends = matches!(
self.providers.global,
ProviderMode::Buffered | ProviderMode::Unbuffered
);
let engine_sends = matches!(
self.providers.engine,
ProviderMode::Buffered | ProviderMode::Unbuffered
);

if global_sends || engine_sends {
return Err(Error::InvalidUserConfig {
error: "output mode is 'noop' but a provider uses buffered or unbuffered"
.into(),
});
}
}

if self.output == OutputMode::Internal && self.providers.engine != ProviderMode::Buffered {
return Err(Error::InvalidUserConfig {
error: "output mode is 'internal', engine must use buffered provider".into(),
});
}

#[test]
fn test_logs_config_default_deserialize() -> Result<(), serde_yaml::Error> {
let yaml_str = r#""#;
let config: LogsConfig = serde_yaml::from_str(yaml_str)?;
assert_eq!(config.level, LogLevel::Off);
assert!(config.processors.is_empty());
Ok(())
}
}
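
The two validation rules listed in the doc comment of `validate` can be restated as a standalone sketch. The `check` function below is hypothetical: it uses local copies of the enums and a `String` error instead of the crate's `Error::InvalidUserConfig`, so it can be read without the rest of the crate.

```rust
/// Local copy of the provider enum from the diff, without serde derives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProviderMode {
    Noop,
    Buffered,
    Unbuffered,
    OpenTelemetry,
    Raw,
}

/// Local copy of the output enum from the diff, without serde derives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputMode {
    Noop,
    Raw,
    Internal,
}

/// True when the provider hands events to the output stage.
fn sends(mode: ProviderMode) -> bool {
    matches!(mode, ProviderMode::Buffered | ProviderMode::Unbuffered)
}

/// Applies the two rules from the doc comment of `validate`.
pub fn check(
    global: ProviderMode,
    engine: ProviderMode,
    output: OutputMode,
) -> Result<(), String> {
    // Rule 1: events would be sent but then discarded.
    if output == OutputMode::Noop && (sends(global) || sends(engine)) {
        return Err("output mode is 'noop' but a provider uses buffered or unbuffered".into());
    }
    // Rule 2: internal output requires the engine provider to be buffered.
    if output == OutputMode::Internal && engine != ProviderMode::Buffered {
        return Err("output mode is 'internal', engine must use buffered provider".into());
    }
    Ok(())
}
```

With the defaults from the diff (global Unbuffered, engine Buffered, output Raw) the check passes; switching only the output to Noop trips the first rule.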