Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,16 @@ path = "src/main.rs"
otap-df-config.workspace = true
otap-df-controller.workspace = true
otap-df-otap.workspace = true
otap-df-pdata.workspace = true
otap-df-telemetry = { path = "crates/telemetry" }
thiserror.workspace = true
quiver = { workspace = true, optional = true }
serde_json.workspace = true
clap.workspace = true
mimalloc = { workspace = true, optional = true }
tikv-jemallocator.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
rustls = { workspace = true, optional = true }

[target.'cfg(not(windows))'.dependencies]
Expand All @@ -49,6 +54,7 @@ otap-df-pdata-otlp-model = { path = "./crates/pdata/src/otlp/model"}
otap-df-config = { path = "crates/config" }
otap-df-controller = { path = "crates/controller" }
otap-df-otap = { path = "crates/otap" }
otap-df-pdata = { path = "crates/pdata" }
quiver = { package = "otap-df-quiver", path = "crates/quiver" }
data_engine_expressions = { path = "../experimental/query_engine/expressions" }
data_engine_kql_parser = { path = "../experimental/query_engine/kql-parser" }
Expand Down Expand Up @@ -137,6 +143,7 @@ syn = { version = "2.0", features = ["full", "extra-traits"] }
tempfile = "3"
thiserror = "2.0.17"
tracing = { version = ">=0.1.40", default-features = false }
tracing-core = { version = "0.1", default-features = false }
tracing-subscriber = { version = "0.3", default-features = false }
tokio = { version = "1.48.0", features = ["rt", "time", "net", "io-util", "sync", "macros", "rt-multi-thread", "fs", "io-std", "process"] }
tokio-stream = "0.1.17"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod processors;

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::time::Duration;

/// Internal logs configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
Expand All @@ -18,6 +19,75 @@ pub struct LogsConfig {
/// The list of log processors to configure.
#[serde(default)]
pub processors: Vec<processors::LogProcessorConfig>,

/// Internal collection for component-level logs.
///
/// When enabled, component logs (otel_info!, otel_warn!, etc.) are routed through
/// an internal telemetry receiver in the OTAP pipeline, allowing use of built-in
/// batch processors, retry, and exporters (console, OTLP, etc.).
Comment on lines +23 to +27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a big fan of this approach.

///
/// When disabled (default), component logs are routed to the OpenTelemetry SDK,
/// using the same export path as 3rd party logs from tokio-tracing-rs.
#[serde(default)]
pub internal_collection: InternalCollectionConfig,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Long term, I think this approach will apply not only to logs, but also to metrics and traces. At some point, we might promote this configuration to a more general level.
For now, I suggest that this field be:

  • an option, so we can remove the enabled field from InternalCollectionConfig, which I think would simplify things
  • renamed to something more explicit. I'm not fully satisfied with my proposal otap_pipeline; there is probably a better name to find

}

/// Configuration for internal collection of component logs.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct InternalCollectionConfig {
/// Enable internal collection for component logs.
/// When false, component logs use the OpenTelemetry SDK (declarative config is honored).
#[serde(default)]
pub enabled: bool,

/// Per-thread buffer size in bytes for accumulating component logs.
/// This is a pre-allocated, fixed-size buffer. When full, logs are flushed to the channel.
#[serde(default = "default_buffer_size_bytes")]
pub buffer_size_bytes: usize,

/// Maximum size in bytes for a single log record.
/// Records exceeding this size are dropped with a counter increment.
/// This limit enables encoder optimizations (2-byte length placeholders for 14-bit sizes).
#[serde(default = "default_max_record_bytes")]
pub max_record_bytes: usize,

/// Maximum number of records in the bounded channel.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"bounded" channel feels like internal implementation details; so we should avoid exposing them to public config..

/// When full, new records fall back to raw console logger.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why fallback here, instead of keeping count of the drops?

#[serde(default = "default_max_record_count")]
pub max_record_count: usize,

/// Flush interval for periodic flushing by the internal telemetry receiver.
#[serde(with = "humantime_serde", default = "default_flush_interval")]
#[schemars(with = "String")]
pub flush_interval: Duration,
}

impl Default for InternalCollectionConfig {
fn default() -> Self {
Self {
enabled: false,
buffer_size_bytes: default_buffer_size_bytes(),
max_record_bytes: default_max_record_bytes(),
max_record_count: default_max_record_count(),
flush_interval: default_flush_interval(),
}
}
}

fn default_buffer_size_bytes() -> usize {
64 * 1024 // 64 KiB - pre-allocated per thread
}

fn default_max_record_bytes() -> usize {
16 * 1024 // 16 KiB - max single record (enables 2-byte length placeholders)
}

fn default_max_record_count() -> usize {
1000 // messages (bounded channel)
}

fn default_flush_interval() -> Duration {
Duration::from_secs(1)
}

/// Log level for internal engine logs.
Expand Down Expand Up @@ -56,6 +126,41 @@ mod tests {
let config: LogsConfig = serde_yaml::from_str(yaml_str).unwrap();
assert_eq!(config.level, LogLevel::Info);
assert_eq!(config.processors.len(), 1);
assert!(!config.internal_collection.enabled);
}

#[test]
fn test_internal_collection_config_deserialize() {
let yaml_str = r#"
level: "info"
internal_collection:
enabled: true
buffer_size_bytes: 131072
max_record_bytes: 32768
max_record_count: 2000
flush_interval: "2s"
"#;
let config: LogsConfig = serde_yaml::from_str(yaml_str).unwrap();
assert!(config.internal_collection.enabled);
assert_eq!(config.internal_collection.buffer_size_bytes, 131072);
assert_eq!(config.internal_collection.max_record_bytes, 32768);
assert_eq!(config.internal_collection.max_record_count, 2000);
assert_eq!(config.internal_collection.flush_interval, Duration::from_secs(2));
}

#[test]
fn test_internal_collection_config_defaults() {
let yaml_str = r#"
level: "info"
internal_collection:
enabled: true
"#;
let config: LogsConfig = serde_yaml::from_str(yaml_str).unwrap();
assert!(config.internal_collection.enabled);
assert_eq!(config.internal_collection.buffer_size_bytes, 64 * 1024);
assert_eq!(config.internal_collection.max_record_bytes, 16 * 1024);
assert_eq!(config.internal_collection.max_record_count, 1000);
assert_eq!(config.internal_collection.flush_interval, Duration::from_secs(1));
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ otap-df-state = { path = "../state" }
otap-df-telemetry = { path = "../telemetry" }
otap-df-telemetry-macros = { path = "../telemetry-macros" }

bytes = { workspace = true }
thiserror = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true }
Expand Down
66 changes: 66 additions & 0 deletions rust/otap-dataflow/crates/engine/src/effect_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,60 @@
use crate::control::{AckMsg, NackMsg, PipelineControlMsg, PipelineCtrlMsgSender};
use crate::error::Error;
use crate::node::NodeId;
use bytes::Bytes;
use otap_df_channel::error::SendError;
use otap_df_pdata::otlp::stateful_encoder::StatefulOtlpEncoder;
use otap_df_pdata::OtlpProtoBytes;
use otap_df_telemetry::error::Error as TelemetryError;
use otap_df_telemetry::metrics::{MetricSet, MetricSetHandler};
use otap_df_telemetry::reporter::MetricsReporter;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::net::{TcpListener, UdpSocket};
use tokio::sync::mpsc;

/// Telemetry buffer state for local (!Send) components.
pub(crate) struct LocalTelemetryState {
pub(crate) encoder: StatefulOtlpEncoder,
pub(crate) resource_bytes: OtlpProtoBytes,
pub(crate) scope_name: String,
pub(crate) flush_threshold_bytes: usize,
pub(crate) overflow_sender: mpsc::UnboundedSender<Bytes>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it okay to use a unbounded sender for overflow purposes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is a draft, so it is probably a temporary shortcut. The rule I personally follow is to systematically use bounded channels together with a policy that specifies what should happen in case of overflow, that is, drop the incoming message or block the sender. This policy can be configurable, or chosen directly in the code depending on whether it is worth making configurable.

}

/// Telemetry buffer state for shared (Send + Sync) components.
pub(crate) struct SharedTelemetryState {
pub(crate) encoder: StatefulOtlpEncoder,
pub(crate) resource_bytes: OtlpProtoBytes,
pub(crate) scope_name: String,
pub(crate) flush_threshold_bytes: usize,
pub(crate) overflow_sender: mpsc::UnboundedSender<Bytes>,
}

/// Telemetry buffer for component-level logging via shared (Send + Sync) effect handlers.
///
/// Routes component logs to the internal telemetry receiver.
/// The overflow_sender can point to:
/// - ITR receiver (when ITR is configured) - enters OTAP pipeline
/// - OtlpBytesChannel (SDK fallback) - same as 3rd party logs
///
/// Note: Local (!Send) effect handlers store LocalTelemetryState directly
/// in their struct to avoid breaking Send bounds.
#[derive(Clone)]
pub struct TelemetryBuffer(Arc<Mutex<SharedTelemetryState>>);

impl TelemetryBuffer {
/// Create a new telemetry buffer for shared effect handlers.
pub(crate) fn new(state: SharedTelemetryState) -> Self {
Self(Arc::new(Mutex::new(state)))
}

/// Get a reference to the inner state (for internal use).
pub(crate) fn inner(&self) -> &Arc<Mutex<SharedTelemetryState>> {
&self.0
}
}

/// Common implementation of all effect handlers.
///
Expand All @@ -25,6 +72,10 @@ pub(crate) struct EffectHandlerCore<PData> {
#[allow(dead_code)]
// Will be used in the future. ToDo report metrics from channel and messages.
pub(crate) metrics_reporter: MetricsReporter,

/// Telemetry buffer for component-level logging.
/// None if telemetry is disabled for this component.
pub(crate) telemetry_buffer: Option<TelemetryBuffer>,
}

impl<PData> EffectHandlerCore<PData> {
Expand All @@ -34,6 +85,21 @@ impl<PData> EffectHandlerCore<PData> {
node_id,
pipeline_ctrl_msg_sender: None,
metrics_reporter,
telemetry_buffer: None,
}
}

/// Creates a new EffectHandlerCore with telemetry buffer (for shared effect handlers only).
pub(crate) fn new_with_shared_telemetry(
node_id: NodeId,
metrics_reporter: MetricsReporter,
telemetry_buffer: TelemetryBuffer,
) -> Self {
Self {
node_id,
pipeline_ctrl_msg_sender: None,
metrics_reporter,
telemetry_buffer: Some(telemetry_buffer),
}
}

Expand Down
43 changes: 43 additions & 0 deletions rust/otap-dataflow/crates/engine/src/local/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ pub struct EffectHandler<PData> {
msg_senders: HashMap<PortName, LocalSender<PData>>,
/// Cached default sender for fast access in the hot path
default_sender: Option<LocalSender<PData>>,

/// Local telemetry state for component-level logging.
/// Stored directly in EffectHandler (not in EffectHandlerCore) because
/// it contains Rc<RefCell<>> which would break Send bounds for shared handlers.
telemetry_state: Option<std::rc::Rc<std::cell::RefCell<crate::effect_handler::LocalTelemetryState>>>,
}

/// Implementation for the `!Send` effect handler.
Expand Down Expand Up @@ -124,6 +129,7 @@ impl<PData> EffectHandler<PData> {
core,
msg_senders,
default_sender,
telemetry_state: None,
}
}

Expand Down Expand Up @@ -252,6 +258,43 @@ impl<PData> EffectHandler<PData> {
self.core.report_metrics(metrics)
}

/// Log a telemetry event from this component.
///
/// This method encodes the log record into the component's telemetry buffer.
/// On overflow (buffer exceeds threshold), it automatically flushes to the
/// configured destination (ITR or SDK fallback) via non-blocking channel send.
///
/// This method never fails - errors are silently dropped to prevent recursion.
/// If the telemetry buffer is not configured, this is a no-op.
pub fn log_event(&self, log_record: &impl otap_df_pdata::views::logs::LogRecordView) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to find a way to reuse the NodeAttributeSet that we already use for metrics. That will let every log emitted by a node share a common context with the metrics.

if let Some(ref buffer) = self.telemetry_state {
let mut state = buffer.borrow_mut();

// Clone necessary data to avoid holding immutable borrows
let resource_bytes = state.resource_bytes.clone();
let scope_name = state.scope_name.clone();
let threshold = state.flush_threshold_bytes;

// Encode the log record (silent drop on error to prevent recursion)
if state.encoder.encode_log_record(
log_record,
resource_bytes.as_bytes(),
&scope_name,
).is_err() {
// TODO: Increment dropped events counter
return;
}

// Check overflow - non-blocking send to break recursion
if state.encoder.len() >= threshold {
let bytes = state.encoder.flush();
// Non-blocking send - errors are silently dropped
// Destination is either ITR or SDK fallback (OtlpBytesChannel)
let _ = state.overflow_sender.send(bytes);
}
}
}

// More methods will be added in the future as needed.
}

Expand Down
38 changes: 38 additions & 0 deletions rust/otap-dataflow/crates/engine/src/shared/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,5 +242,43 @@ impl<PData> EffectHandler<PData> {
self.core.report_metrics(metrics)
}

/// Log a telemetry event from this component.
///
/// This method encodes the log record into the component's telemetry buffer.
/// On overflow (buffer exceeds threshold), it automatically flushes to the
/// configured destination (ITR or SDK fallback) via non-blocking channel send.
///
/// This method never fails - errors are silently dropped to prevent recursion.
/// If the telemetry buffer is not configured, this is a no-op.
/// Thread-safe via Arc<Mutex<>>.
pub fn log_event(&self, log_record: &impl otap_df_pdata::views::logs::LogRecordView) {
if let Some(ref buffer) = self.core.telemetry_buffer {
let mut state = buffer.inner().lock().unwrap();

// Clone necessary data to avoid holding immutable borrows
let resource_bytes = state.resource_bytes.clone();
let scope_name = state.scope_name.clone();
let threshold = state.flush_threshold_bytes;

// Encode the log record (silent drop on error to prevent recursion)
if state.encoder.encode_log_record(
log_record,
resource_bytes.as_bytes(),
&scope_name,
).is_err() {
// TODO: Increment dropped events counter
return;
}

// Check overflow - non-blocking send to break recursion
if state.encoder.len() >= threshold {
let bytes = state.encoder.flush();
// Non-blocking send - errors are silently dropped
// Destination is either ITR or SDK fallback (OtlpBytesChannel)
let _ = state.overflow_sender.send(bytes);
}
}
}

// More methods will be added in the future as needed.
}
2 changes: 2 additions & 0 deletions rust/otap-dataflow/crates/otap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ smallvec = { workspace = true }
bitflags = { workspace = true }
bytes = { workspace = true }
parking_lot = { workspace = true }
tracing = { workspace = true }
tracing-core = { workspace = true }

data_engine_kql_parser = { workspace = true }
data_engine_expressions = { workspace = true }
Expand Down
Loading