2 changes: 2 additions & 0 deletions rust/otap-dataflow/Cargo.toml
@@ -108,12 +108,14 @@ opentelemetry-proto = { version = "0.31", default-features = false, features = [
opentelemetry_sdk = "0.31.0"
opentelemetry-stdout = "0.31.0"
opentelemetry-otlp = "0.31.0"
opentelemetry-prometheus = "0.31.0"
parking_lot = "0.12.5"
paste = "1"
parquet = { version = "57.0", default-features = false, features = ["arrow", "async", "object_store"]}
portpicker = "0.1.1"
pretty_assertions = "1.4.1"
proc-macro2 = "1.0"
prometheus = "0.14.0"
prost = "0.14"
quote = "1.0"
rand = "0.9.2"
@@ -0,0 +1,54 @@
settings:
default_pipeline_ctrl_msg_channel_size: 100
default_node_ctrl_msg_channel_size: 100
default_pdata_channel_size: 100

nodes:
receiver:
kind: receiver
plugin_urn: "urn:otel:otap:fake_data_generator:receiver"
out_ports:
out_port:
destinations:
- debug
dispatch_strategy: round_robin
config:
traffic_config:
max_signal_count: 1000
max_batch_size: 1000
signals_per_second: 1000
log_weight: 100
registry_path: https://github.com/open-telemetry/semantic-conventions.git[model]
debug:
kind: processor
plugin_urn: "urn:otel:debug:processor"
out_ports:
out_port:
destinations:
- noop
dispatch_strategy: round_robin
config:
verbosity: basic
noop:
kind: exporter
plugin_urn: "urn:otel:noop:exporter"
config:

service:
telemetry:
metrics:
readers:
- pull:
exporter:
prometheus:
host: "0.0.0.0"
port: 9090
path: "/metrics"
views:
- selector:
instrument_name: "logs.produced"
stream:
name: "otlp.logs.produced.count"
description: "Count of logs produced"
resource:
service.name: "fake-debug-noop-service"
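With this configuration, the pull reader exposes the pipeline's internal metrics for scraping at http://0.0.0.0:9090/metrics. Below is a minimal sketch, not part of this change, for manually checking that endpoint once the pipeline is running locally; it uses only the standard library.

```rust
use std::io::{Read, Write};
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    // Host and port from service.telemetry.metrics.readers in the config above.
    let mut stream = TcpStream::connect("127.0.0.1:9090")?;
    // Plain HTTP/1.1 GET against the configured metrics path.
    stream.write_all(b"GET /metrics HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")?;
    let mut response = String::new();
    let _ = stream.read_to_string(&mut response)?;
    // The scrape output should contain the renamed view from the config above
    // (Prometheus sanitizes dots, e.g. otlp.logs.produced.count -> otlp_logs_produced_count).
    println!("{response}");
    Ok(())
}
```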
@@ -4,13 +4,16 @@
//! Readers level configurations.

pub mod periodic;
pub mod pull;

use std::time::Duration;

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::pipeline::service::telemetry::metrics::readers::periodic::MetricsPeriodicExporterConfig;
use crate::pipeline::service::telemetry::metrics::readers::{
periodic::MetricsPeriodicExporterConfig, pull::MetricsPullExporterConfig,
};

/// OpenTelemetry Metrics Reader configuration.
#[derive(Debug, Clone, Serialize, JsonSchema)]
@@ -19,9 +22,7 @@ pub enum MetricsReaderConfig {
/// Periodic reader that exports metrics at regular intervals.
Periodic(MetricsReaderPeriodicConfig),
/// Pull reader that allows on-demand metric collection.
Pull {
//TODO: Add specific configuration for supported pull readers.
},
Pull(MetricsReaderPullConfig),
}

/// OpenTelemetry Metrics Periodic Reader configuration.
@@ -50,16 +51,16 @@ impl<'de> Deserialize<'de> for MetricsReaderConfig {
#[serde(rename = "periodic")]
periodic: Option<MetricsReaderPeriodicConfig>,
#[serde(rename = "pull")]
pull: Option<()>,
pull: Option<MetricsReaderPullConfig>,
}

let reader_options_result = ReaderOptions::deserialize(deserializer);
match reader_options_result {
Ok(options) => {
if let Some(config) = options.periodic {
Ok(MetricsReaderConfig::Periodic(config))
} else if options.pull.is_some() {
Ok(MetricsReaderConfig::Pull {})
} else if let Some(config) = options.pull {
Ok(MetricsReaderConfig::Pull(config))
} else {
Err(serde::de::Error::custom(
"Expected either 'periodic' or 'pull' reader",
@@ -74,6 +75,13 @@ impl<'de> Deserialize<'de> for MetricsReaderConfig {
}
}

/// OpenTelemetry Metrics Pull Reader configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct MetricsReaderPullConfig {
/// The metrics exporter to use.
pub exporter: MetricsPullExporterConfig,
}

/// The temporality of the metrics to be exported.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
@@ -109,6 +117,26 @@ mod tests {
}
}

#[test]
fn test_metrics_reader_config_deserialize_pull() {
let yaml_str = r#"
pull:
exporter:
prometheus:
host: "0.0.0.0"
port: 9090
"#;
let config: MetricsReaderConfig = serde_yaml::from_str(yaml_str).unwrap();

if let MetricsReaderConfig::Pull(pull_config) = config {
let MetricsPullExporterConfig::Prometheus(prometheus_config) = pull_config.exporter;
assert_eq!(prometheus_config.host, "0.0.0.0");
assert_eq!(prometheus_config.port, 9090);
} else {
panic!("Expected pull reader");
}
}

#[test]
fn test_temporality_deserialize() {
let yaml_str_cumulative = r#"
@@ -0,0 +1,172 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Pull reader level configurations.

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

/// OpenTelemetry Metrics Pull Exporter configuration.
#[derive(Debug, Clone, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum MetricsPullExporterConfig {
/// Prometheus exporter that exposes metrics for scraping.
Prometheus(PrometheusExporterConfig),
}

impl<'de> Deserialize<'de> for MetricsPullExporterConfig {
/// Custom deserialization to handle different exporter types.
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::{MapAccess, Visitor};
use std::fmt;
struct MetricsPullExporterConfigVisitor;

impl<'de> Visitor<'de> for MetricsPullExporterConfigVisitor {
type Value = MetricsPullExporterConfig;

fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.write_str("a map with 'prometheus' key")
}

fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
where
M: MapAccess<'de>,
{
if let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"prometheus" => {
let prometheus_config: PrometheusExporterConfig = map.next_value()?;
Ok(MetricsPullExporterConfig::Prometheus(prometheus_config))
}
_ => Err(serde::de::Error::unknown_field(&key, &["prometheus"])),
}
} else {
Err(serde::de::Error::custom("Expected 'prometheus' exporter"))
}
}
}

deserializer.deserialize_map(MetricsPullExporterConfigVisitor)
}
}
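For reference, this visitor consumes the usual externally tagged map shape (`prometheus: { ... }`), so the wire format is the same one a derived enum with the `lowercase` rename would accept; the hand-written impl presumably exists to control the error messages. A purely illustrative sketch of that derive-only equivalent (hypothetical type name, not part of this change):

```rust
use serde::Deserialize;

// Hypothetical derive-only counterpart, shown only to make the accepted
// YAML shape concrete; it is not the type under review.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "lowercase")]
enum PullExporterSketch {
    Prometheus(PrometheusExporterConfig),
}
```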

/// Prometheus Exporter configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct PrometheusExporterConfig {
/// The host address where the Prometheus exporter will expose metrics.
#[serde(default = "default_host")]
pub host: String,

/// The port on which the Prometheus exporter will listen for scrape requests.
#[serde(default = "default_port")]
pub port: u16,

/// The HTTP path where metrics will be exposed.
#[serde(default = "default_metrics_path")]
pub path: String,
}

fn default_host() -> String {
"0.0.0.0".to_string()
}

fn default_port() -> u16 {
9090
}

fn default_metrics_path() -> String {
"/metrics".to_string()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_metrics_pull_exporter_config_deserialize() {
let yaml_str = r#"
prometheus:
host: "127.0.0.1"
port: 9090
path: "/"
"#;
let config: MetricsPullExporterConfig = serde_yaml::from_str(yaml_str).unwrap();

let MetricsPullExporterConfig::Prometheus(prometheus_config) = config;
assert_eq!(prometheus_config.host, "127.0.0.1");
assert_eq!(prometheus_config.port, 9090);
assert_eq!(prometheus_config.path, "/");
}

#[test]
fn test_metrics_pull_exporter_invalid_config_deserialize() {
let yaml_str = r#"
unknown_exporter:
some_field: "value"
"#;
let result: Result<MetricsPullExporterConfig, _> = serde_yaml::from_str(yaml_str);
match result {
Ok(_) => panic!("Deserialization should have failed for unknown exporter"),
Err(err) => {
let err_msg = err.to_string();
assert!(err_msg.contains("unknown field"));
assert!(err_msg.contains("prometheus"));
}
}
}

#[test]
fn test_prometheus_exporter_config_deserialize() {
let yaml_str = r#"
host: "127.0.0.1"
port: 9090
path: "/custom_metrics"
"#;
let config: PrometheusExporterConfig = serde_yaml::from_str(yaml_str).unwrap();
assert_eq!(config.host, "127.0.0.1");
assert_eq!(config.port, 9090);
assert_eq!(config.path, "/custom_metrics");
}

#[test]
fn test_prometheus_exporter_config_default_path_deserialize() {
let yaml_str = r#"
host: "127.0.0.1"
port: 9090
"#;
let config: PrometheusExporterConfig = serde_yaml::from_str(yaml_str).unwrap();
assert_eq!(config.host, "127.0.0.1");
assert_eq!(config.port, 9090);
assert_eq!(config.path, "/metrics");
}

#[test]
fn test_prometheus_exporter_unknown_field_config_deserialize() {
let yaml_str = r#"
host: "0.0.0.0"
port: 8080
extra_field: "unexpected"
"#;
let result: Result<PrometheusExporterConfig, _> = serde_yaml::from_str(yaml_str);
match result {
Ok(_) => panic!("Deserialization should have failed for unknown field"),
Err(err) => {
let err_msg = err.to_string();
assert!(err_msg.contains("unknown field `extra_field`"));
}
}
}

#[test]
fn test_prometheus_exporter_config_defaults() {
let yaml_str = r#""#;
let config: PrometheusExporterConfig = serde_yaml::from_str(yaml_str).unwrap();
assert_eq!(config.host, "0.0.0.0");
assert_eq!(config.port, 9090);
assert_eq!(config.path, "/metrics");
}
}
6 changes: 6 additions & 0 deletions rust/otap-dataflow/crates/telemetry/Cargo.toml
@@ -19,6 +19,7 @@ unchecked-index = []
unchecked-arithmetic = []

[dependencies]
axum = { workspace = true }
otap-df-config = { workspace = true }
flume = { workspace = true }
tokio = { workspace = true }
@@ -27,12 +28,17 @@ serde_yaml = { workspace = true }
thiserror = { workspace = true }
slotmap = { workspace = true }
parking_lot = { workspace = true }
prometheus = { workspace = true }
serde = { workspace = true }
tonic = { workspace = true, optional = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
opentelemetry-stdout = { workspace = true }
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "metrics", "logs"] }
opentelemetry-prometheus = { workspace = true }
opentelemetry-appender-tracing = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] }

[dev-dependencies]
tower = { workspace = true }
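Taken together, the new prometheus, opentelemetry-prometheus, and axum dependencies are the pieces a pull-based /metrics endpoint needs: the SDK exports into a prometheus::Registry, and an HTTP handler renders that registry in the text exposition format. The sketch below is illustrative only, not this crate's implementation; the handler and variable names are invented here, and it assumes the documented opentelemetry-prometheus builder API and axum 0.7+ with tokio.

```rust
use axum::{extract::State, routing::get, Router};
use opentelemetry::{global, KeyValue};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{Registry, TextEncoder};

// Render the registry in the Prometheus text exposition format.
async fn metrics_handler(State(registry): State<Registry>) -> String {
    TextEncoder::new()
        .encode_to_string(&registry.gather())
        .unwrap_or_default()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let registry = Registry::new();
    // The Prometheus exporter acts as a pull-based MetricReader for the SDK.
    let exporter = opentelemetry_prometheus::exporter()
        .with_registry(registry.clone())
        .build()?;
    let provider = SdkMeterProvider::builder().with_reader(exporter).build();
    global::set_meter_provider(provider);

    // Record something so the scrape output is non-empty.
    let counter = global::meter("demo").u64_counter("logs.produced").build();
    counter.add(1, &[KeyValue::new("source", "demo")]);

    // Serve /metrics on the host/port used by the example pipeline config.
    let app = Router::new()
        .route("/metrics", get(metrics_handler))
        .with_state(registry);
    let listener = tokio::net::TcpListener::bind("0.0.0.0:9090").await?;
    axum::serve(listener, app).await?;
    Ok(())
}
```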