diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index c62f44cc02..06a284e100 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -108,12 +108,14 @@ opentelemetry-proto = { version = "0.31", default-features = false, features = [ opentelemetry_sdk = "0.31.0" opentelemetry-stdout = "0.31.0" opentelemetry-otlp = "0.31.0" +opentelemetry-prometheus = "0.31.0" parking_lot = "0.12.5" paste = "1" parquet = { version = "57.0", default-features = false, features = ["arrow", "async", "object_store"]} portpicker = "0.1.1" pretty_assertions = "1.4.1" proc-macro2 = "1.0" +prometheus = "0.14.0" prost = "0.14" quote = "1.0" rand = "0.9.2" diff --git a/rust/otap-dataflow/configs/fake-debug-noop-promethueus-telemetry.yaml b/rust/otap-dataflow/configs/fake-debug-noop-promethueus-telemetry.yaml new file mode 100644 index 0000000000..fc194233fd --- /dev/null +++ b/rust/otap-dataflow/configs/fake-debug-noop-promethueus-telemetry.yaml @@ -0,0 +1,54 @@ +settings: + default_pipeline_ctrl_msg_channel_size: 100 + default_node_ctrl_msg_channel_size: 100 + default_pdata_channel_size: 100 + +nodes: + receiver: + kind: receiver + plugin_urn: "urn:otel:otap:fake_data_generator:receiver" + out_ports: + out_port: + destinations: + - debug + dispatch_strategy: round_robin + config: + traffic_config: + max_signal_count: 1000 + max_batch_size: 1000 + signals_per_second: 1000 + log_weight: 100 + registry_path: https://github.com/open-telemetry/semantic-conventions.git[model] + debug: + kind: processor + plugin_urn: "urn:otel:debug:processor" + out_ports: + out_port: + destinations: + - noop + dispatch_strategy: round_robin + config: + verbosity: basic + noop: + kind: exporter + plugin_urn: "urn:otel:noop:exporter" + config: + +service: + telemetry: + metrics: + readers: + - pull: + exporter: + prometheus: + host: "0.0.0.0" + port: 9090 + path: "/metrics" + views: + - selector: + instrument_name: "logs.produced" + stream: + name: "otlp.logs.produced.count" + description: "Count of logs produced" + resource: + service.name: "fake-debug-noop-service" diff --git a/rust/otap-dataflow/crates/config/README.md b/rust/otap-dataflow/crates/config/README.md index 92936bd6ba..6b5c085ba0 100644 --- a/rust/otap-dataflow/crates/config/README.md +++ b/rust/otap-dataflow/crates/config/README.md @@ -64,7 +64,9 @@ The telemetry configuration includes: - **Metrics**: OpenTelemetry metrics for pipeline observability - **Readers**: Periodic or pull-based metric readers - - **Exporters**: Console, OTLP (gRPC/HTTP), or custom exporters + - **Periodic Readers**: Export metrics at regular intervals + - **Pull Readers**: Expose metrics via HTTP endpoint for scraping (e.g., Prometheus) + - **Exporters**: Console, OTLP (gRPC/HTTP), or Prometheus - **Views**: Metric aggregation and transformation rules - **Temporality**: Delta or cumulative aggregation - **Logs**: Internal logging configuration @@ -89,11 +91,20 @@ service: deployment.environment: "production" metrics: readers: + # Periodic reader - pushes metrics to OTLP endpoint - periodic: exporter: otlp: endpoint: "http://localhost:4318" protocol: "grpc/protobuf" + interval: "60s" + # Pull reader - exposes metrics for Prometheus scraping + - pull: + exporter: + prometheus: + host: "0.0.0.0" + port: 9090 + path: "/metrics" views: - selector: instrument_name: "logs.produced" @@ -114,12 +125,22 @@ service: #### Metric Exporters +##### Periodic Exporters + - **Console**: Prints metrics to stdout (useful for debugging) - **OTLP**: OpenTelemetry Protocol exporters - **grpc/protobuf**: Binary protocol over gRPC - **http/protobuf**: Binary protobuf over HTTP - **http/json**: JSON over HTTP +##### Pull Exporters + +- **Prometheus**: Exposes metrics via HTTP endpoint for Prometheus scraping + - Configurable host, port, and path + - Exposes metrics in Prometheus text format + - Example: `http://0.0.0.0:9090/metrics` + - Compatible with Prometheus, Grafana, and other scraping systems + #### Log Exporters - **Console**: Prints logs to stdout with structured formatting diff --git a/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/metrics/readers.rs b/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/metrics/readers.rs index 33c917bcc8..cc256a51cf 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/metrics/readers.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/metrics/readers.rs @@ -4,13 +4,16 @@ //! Readers level configurations. pub mod periodic; +pub mod pull; use std::time::Duration; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use crate::pipeline::service::telemetry::metrics::readers::periodic::MetricsPeriodicExporterConfig; +use crate::pipeline::service::telemetry::metrics::readers::{ + periodic::MetricsPeriodicExporterConfig, pull::MetricsPullExporterConfig, +}; /// OpenTelemetry Metrics Reader configuration. #[derive(Debug, Clone, Serialize, JsonSchema)] @@ -19,9 +22,7 @@ pub enum MetricsReaderConfig { /// Periodic reader that exports metrics at regular intervals. Periodic(MetricsReaderPeriodicConfig), /// Pull reader that allows on-demand metric collection. - Pull { - //TODO: Add specific configuration for supported pull readers. - }, + Pull(MetricsReaderPullConfig), } /// OpenTelemetry Metrics Periodic Reader configuration. @@ -50,7 +51,7 @@ impl<'de> Deserialize<'de> for MetricsReaderConfig { #[serde(rename = "periodic")] periodic: Option, #[serde(rename = "pull")] - pull: Option<()>, + pull: Option, } let reader_options_result = ReaderOptions::deserialize(deserializer); @@ -58,8 +59,8 @@ impl<'de> Deserialize<'de> for MetricsReaderConfig { Ok(options) => { if let Some(config) = options.periodic { Ok(MetricsReaderConfig::Periodic(config)) - } else if options.pull.is_some() { - Ok(MetricsReaderConfig::Pull {}) + } else if let Some(config) = options.pull { + Ok(MetricsReaderConfig::Pull(config)) } else { Err(serde::de::Error::custom( "Expected either 'periodic' or 'pull' reader", @@ -74,6 +75,13 @@ impl<'de> Deserialize<'de> for MetricsReaderConfig { } } +/// OpenTelemetry Metrics Pull Reader configuration. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct MetricsReaderPullConfig { + /// The metrics exporter to use. + pub exporter: MetricsPullExporterConfig, +} + /// The temporality of the metrics to be exported. #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default, PartialEq)] #[serde(rename_all = "lowercase")] @@ -109,6 +117,26 @@ mod tests { } } + #[test] + fn test_metrics_reader_config_deserialize_pull() { + let yaml_str = r#" + pull: + exporter: + prometheus: + host: "0.0.0.0" + port: 9090 + "#; + let config: MetricsReaderConfig = serde_yaml::from_str(yaml_str).unwrap(); + + if let MetricsReaderConfig::Pull(pull_config) = config { + let MetricsPullExporterConfig::Prometheus(prometheus_config) = pull_config.exporter; + assert_eq!(prometheus_config.host, "0.0.0.0"); + assert_eq!(prometheus_config.port, 9090); + } else { + panic!("Expected pull reader"); + } + } + #[test] fn test_temporality_deserialize() { let yaml_str_cumulative = r#" diff --git a/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/metrics/readers/pull.rs b/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/metrics/readers/pull.rs new file mode 100644 index 0000000000..19d3b6ec91 --- /dev/null +++ b/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/metrics/readers/pull.rs @@ -0,0 +1,172 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Pull reader level configurations. + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// OpenTelemetry Metrics Pull Exporter configuration. +#[derive(Debug, Clone, Serialize, JsonSchema, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum MetricsPullExporterConfig { + /// Prometheus exporter that exposes metrics for scraping. + Prometheus(PrometheusExporterConfig), +} + +impl<'de> Deserialize<'de> for MetricsPullExporterConfig { + /// Custom deserialization to handle different exporter types. + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + use serde::de::{MapAccess, Visitor}; + use std::fmt; + struct MetricsPullExporterConfigVisitor; + + impl<'de> Visitor<'de> for MetricsPullExporterConfigVisitor { + type Value = MetricsPullExporterConfig; + + fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str("a map with 'prometheus' key") + } + + fn visit_map(self, mut map: M) -> Result + where + M: MapAccess<'de>, + { + if let Some(key) = map.next_key::()? { + match key.as_str() { + "prometheus" => { + let prometheus_config: PrometheusExporterConfig = map.next_value()?; + Ok(MetricsPullExporterConfig::Prometheus(prometheus_config)) + } + _ => Err(serde::de::Error::unknown_field(&key, &["prometheus"])), + } + } else { + Err(serde::de::Error::custom("Expected 'prometheus' exporter")) + } + } + } + + deserializer.deserialize_map(MetricsPullExporterConfigVisitor) + } +} + +/// Prometheus Exporter configuration. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct PrometheusExporterConfig { + /// The host address where the Prometheus exporter will expose metrics. + #[serde(default = "default_host")] + pub host: String, + + /// The port on which the Prometheus exporter will listen for scrape requests. + #[serde(default = "default_port")] + pub port: u16, + + /// The HTTP path where metrics will be exposed. + #[serde(default = "default_metrics_path")] + pub path: String, +} + +fn default_host() -> String { + "0.0.0.0".to_string() +} + +fn default_port() -> u16 { + 9090 +} + +fn default_metrics_path() -> String { + "/metrics".to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_metrics_pull_exporter_config_deserialize() { + let yaml_str = r#" + prometheus: + host: "127.0.0.1" + port: 9090 + path: "/" + "#; + let config: MetricsPullExporterConfig = serde_yaml::from_str(yaml_str).unwrap(); + + let MetricsPullExporterConfig::Prometheus(prometheus_config) = config; + assert_eq!(prometheus_config.host, "127.0.0.1"); + assert_eq!(prometheus_config.port, 9090); + assert_eq!(prometheus_config.path, "/"); + } + + #[test] + fn test_metrics_pull_exporter_invalid_config_deserialize() { + let yaml_str = r#" + unknown_exporter: + some_field: "value" + "#; + let result: Result = serde_yaml::from_str(yaml_str); + match result { + Ok(_) => panic!("Deserialization should have failed for unknown exporter"), + Err(err) => { + let err_msg = err.to_string(); + assert!(err_msg.contains("unknown field")); + assert!(err_msg.contains("prometheus")); + } + } + } + + #[test] + fn test_prometheus_exporter_config_deserialize() { + let yaml_str = r#" + host: "127.0.0.1" + port: 9090 + path: "/custom_metrics" + "#; + let config: PrometheusExporterConfig = serde_yaml::from_str(yaml_str).unwrap(); + assert_eq!(config.host, "127.0.0.1"); + assert_eq!(config.port, 9090); + assert_eq!(config.path, "/custom_metrics"); + } + + #[test] + fn test_prometheus_exporter_config_default_path_deserialize() { + let yaml_str = r#" + host: "127.0.0.1" + port: 9090 + "#; + let config: PrometheusExporterConfig = serde_yaml::from_str(yaml_str).unwrap(); + assert_eq!(config.host, "127.0.0.1"); + assert_eq!(config.port, 9090); + assert_eq!(config.path, "/metrics"); + } + + #[test] + fn test_prometheus_exporter_unknown_field_config_deserialize() { + let yaml_str = r#" + host: "0.0.0.0" + port: 8080 + extra_field: "unexpected" + "#; + let result: Result = serde_yaml::from_str(yaml_str); + match result { + Ok(_) => panic!("Deserialization should have failed for unknown field"), + Err(err) => { + let err_msg = err.to_string(); + assert!(err_msg.contains("unknown field `extra_field`")); + } + } + } + + #[test] + fn test_prometheus_exporter_config_defaults() { + let yaml_str = r#""#; + let config: PrometheusExporterConfig = serde_yaml::from_str(yaml_str).unwrap(); + assert_eq!(config.host, "0.0.0.0"); + assert_eq!(config.port, 9090); + assert_eq!(config.path, "/metrics"); + } +} diff --git a/rust/otap-dataflow/crates/telemetry/Cargo.toml b/rust/otap-dataflow/crates/telemetry/Cargo.toml index aae85d1f8a..dce25e697a 100644 --- a/rust/otap-dataflow/crates/telemetry/Cargo.toml +++ b/rust/otap-dataflow/crates/telemetry/Cargo.toml @@ -19,6 +19,7 @@ unchecked-index = [] unchecked-arithmetic = [] [dependencies] +axum = { workspace = true } otap-df-config = { workspace = true } flume = { workspace = true } tokio = { workspace = true } @@ -27,12 +28,17 @@ serde_yaml = { workspace = true } thiserror = { workspace = true } slotmap = { workspace = true } parking_lot = { workspace = true } +prometheus = { workspace = true } serde = { workspace = true } tonic = { workspace = true, optional = true } opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true } opentelemetry-stdout = { workspace = true } opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "metrics", "logs"] } +opentelemetry-prometheus = { workspace = true } opentelemetry-appender-tracing = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] } + +[dev-dependencies] +tower = { workspace = true } diff --git a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider.rs b/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider.rs index 9d11ab6450..3628ae7b12 100644 --- a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider.rs +++ b/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider.rs @@ -3,6 +3,7 @@ //! Configures the OpenTelemetry meter provider based on the provided configuration. +pub mod prometheus_exporter_provider; pub mod views_provider; use opentelemetry::global; @@ -19,10 +20,14 @@ use otap_df_config::pipeline::service::telemetry::metrics::{ MetricsPeriodicExporterConfig, otlp::{OtlpExporterConfig, OtlpProtocol}, }, + pull::MetricsPullExporterConfig, }, }; -use crate::error::Error; +use crate::{ + error::Error, + opentelemetry_client::meter_provider::prometheus_exporter_provider::PrometheusExporterProvider, +}; /// Wrapper around the OpenTelemetry SDK meter provider and its runtime. pub struct MeterProvider { @@ -92,9 +97,16 @@ impl MeterProvider { } Ok((sdk_meter_builder, runtime)) } - MetricsReaderConfig::Pull { .. } => Err(Error::ConfigurationError( - "Pull-based metric readers are not implemented yet".to_string(), - )), + MetricsReaderConfig::Pull(pull_config) => match &pull_config.exporter { + MetricsPullExporterConfig::Prometheus(prometheus_config) => { + (sdk_meter_builder, runtime) = PrometheusExporterProvider::configure_exporter( + sdk_meter_builder, + prometheus_config, + runtime, + )?; + Ok((sdk_meter_builder, runtime)) + } + }, } } @@ -191,7 +203,9 @@ impl MeterProvider { #[cfg(test)] mod tests { use super::*; - use otap_df_config::pipeline::service::telemetry::metrics::readers::MetricsReaderPeriodicConfig; + use otap_df_config::pipeline::service::telemetry::metrics::readers::{ + MetricsReaderPeriodicConfig, MetricsReaderPullConfig, pull::PrometheusExporterConfig, + }; #[test] fn test_meter_provider_configure_with_non_runtime_readers() -> Result<(), Error> { @@ -246,6 +260,13 @@ mod tests { temporality: Temporality::Cumulative, }), }), + MetricsReaderConfig::Pull(MetricsReaderPullConfig { + exporter: MetricsPullExporterConfig::Prometheus(PrometheusExporterConfig { + host: "0.0.0.0".to_string(), + port: 9090, + path: "/metrics".to_string(), + }), + }), ]; let metrics_config = MetricsConfig { readers: metric_readers, diff --git a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider/prometheus_exporter_provider.rs b/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider/prometheus_exporter_provider.rs new file mode 100644 index 0000000000..ed7c64d096 --- /dev/null +++ b/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/meter_provider/prometheus_exporter_provider.rs @@ -0,0 +1,244 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Provider for Prometheus exporter configuration. + +use std::net::SocketAddr; + +use axum::{Router, extract::State, http::StatusCode, response::Response, routing::get}; +use opentelemetry_sdk::metrics::MeterProviderBuilder; +use otap_df_config::pipeline::service::telemetry::metrics::readers::pull::PrometheusExporterConfig; +use prometheus::{Encoder, Registry, TextEncoder}; + +use crate::error::Error; + +/// Provider for Prometheus exporter configuration. +pub struct PrometheusExporterProvider {} + +impl PrometheusExporterProvider { + /// Configure the Prometheus exporter for the given MeterProviderBuilder. + pub fn configure_exporter( + mut sdk_meter_builder: MeterProviderBuilder, + prometheus_config: &PrometheusExporterConfig, + runtime: Option, + ) -> Result<(MeterProviderBuilder, Option), Error> { + Self::validate_config(prometheus_config)?; + + let registry = Registry::new(); + + let exporter = opentelemetry_prometheus::exporter() + .with_registry(registry.clone()) + .build() + .map_err(|e| Error::ConfigurationError(e.to_string()))?; + + sdk_meter_builder = sdk_meter_builder.with_reader(exporter); + + // If there is a tokio runtime already, use it. Otherwise, create a new one. + let mut tokio_runtime = match runtime { + Some(rt) => rt, + None => tokio::runtime::Runtime::new() + .map_err(|e| Error::ConfigurationError(e.to_string()))?, + }; + + tokio_runtime = + Self::start_async_prometheus_server(registry, tokio_runtime, prometheus_config)?; + + Ok((sdk_meter_builder, Some(tokio_runtime))) + } + + fn validate_config(prometheus_config: &PrometheusExporterConfig) -> Result<(), Error> { + let endpoint = format!("{}:{}", prometheus_config.host, prometheus_config.port); + let _parsed_endpoint = endpoint.parse::().map_err(|e| { + Error::ConfigurationError(format!("Invalid Prometheus bind address: {}", e)) + })?; + + let path = &prometheus_config.path; + if !path.starts_with('/') { + return Err(Error::ConfigurationError( + "Prometheus metrics path must start with '/'".to_string(), + )); + } + + Ok(()) + } + + fn start_async_prometheus_server( + registry: Registry, + runtime: tokio::runtime::Runtime, + prometheus_config: &PrometheusExporterConfig, + ) -> Result { + let endpoint = format!("{}:{}", prometheus_config.host, prometheus_config.port); + let path = prometheus_config.path.clone(); + let _server_handle = runtime.spawn(async move { + Self::start_prometheus_server(registry, &endpoint, &path) + .await + .map_err(|e| { + Error::ConfigurationError(format!("Failed to start Prometheus server: {}", e)) + }) + }); + Ok(runtime) + } + + async fn start_prometheus_server( + registry: Registry, + endpoint: &str, + path: &str, + ) -> Result<(), Error> { + let addr: SocketAddr = endpoint.parse().map_err(|e| { + Error::ConfigurationError(format!("Invalid Prometheus bind address: {}", e)) + })?; + + let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| { + Error::ConfigurationError(format!("Failed to bind Prometheus server: {}", e)) + })?; + + let app = Router::new() + .merge(Self::routes(path)) + .with_state(registry.clone()); + + axum::serve(listener, app) + .await + .map_err(|e| Error::ConfigurationError(format!("Prometheus server failed: {}", e)))?; + + Ok(()) + } + + /// Define the routes for the Prometheus exporter. + fn routes(path: &str) -> Router { + Router::new().route(path, get(Self::get_metrics)) + } + + /// Handler for the metrics endpoint. Ex: `/metrics`. + async fn get_metrics(State(registry): State) -> Result { + let encoder = TextEncoder::new(); + let metric_families = registry.gather(); + let mut buffer = Vec::new(); + encoder + .encode(&metric_families, &mut buffer) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + let response = Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + .body(buffer.into()) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(response) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_prometheus_exporter_provider_configure_exporter() { + let prometheus_config = PrometheusExporterConfig { + host: "0.0.0.0".to_string(), + port: 9090, + path: "/metrics".to_string(), + }; + let sdk_meter_builder = MeterProviderBuilder::default(); + let result = PrometheusExporterProvider::configure_exporter( + sdk_meter_builder, + &prometheus_config, + None, + ); + match result { + Ok((_, Some(tokio_runtime))) => { + tokio_runtime.shutdown_background(); + } + _ => panic!("Failed to configure Prometheus exporter"), + } + } + + #[test] + fn test_prometheus_invalid_host_config() { + let prometheus_config = PrometheusExporterConfig { + host: "invalid_host".to_string(), + port: 9090, + path: "/metrics".to_string(), + }; + let result = PrometheusExporterProvider::validate_config(&prometheus_config); + match result { + Err(Error::ConfigurationError(err)) => { + assert!(err.contains("Invalid Prometheus bind address")); + } + _ => panic!("Expected ConfigurationError for invalid host"), + } + } + + #[test] + fn test_prometheus_invalid_path_config() { + let prometheus_config = PrometheusExporterConfig { + host: "0.0.0.0".to_string(), + port: 9090, + path: "invalid/path/for/prometheus".to_string(), + }; + let result = PrometheusExporterProvider::validate_config(&prometheus_config); + match result { + Err(Error::ConfigurationError(err)) => { + assert!(err.contains("must start with '/'")); + } + _ => panic!("Expected ConfigurationError for invalid path"), + } + } + + #[tokio::test] + async fn test_prometheus_get_metrics_handler() { + let registry = Registry::new(); + registry + .register(Box::new( + prometheus::Counter::new("test_counter", "A test counter").unwrap(), + )) + .unwrap(); + let response = PrometheusExporterProvider::get_metrics(State(registry)).await; + match response { + Ok(resp) => { + assert_eq!(resp.status(), StatusCode::OK); + let body = resp.into_body(); + let bytes = axum::body::to_bytes(body, usize::MAX) + .await + .expect("Failed to read body bytes"); + let body_str = String::from_utf8(bytes.to_vec()).expect("Body is not valid UTF-8"); + assert!(body_str.contains("HELP test_counter A test counter")); + } + Err(_) => panic!("Failed to get metrics"), + } + } + + #[tokio::test] + async fn test_routes() { + use axum::body::Body; + use axum::http::{Method, Request}; + use tower::ServiceExt; // For oneshot + + let registry = Registry::new(); + registry + .register(Box::new( + prometheus::Counter::new("route_test_counter", "A counter for route testing") + .unwrap(), + )) + .unwrap(); + + let app = PrometheusExporterProvider::routes("/metrics").with_state(registry); + + // Test the /metrics endpoint + let request = Request::builder() + .method(Method::GET) + .uri("/metrics") + .body(Body::empty()) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + let body = response.into_body(); + let bytes = axum::body::to_bytes(body, usize::MAX) + .await + .expect("Failed to read body"); + let body_str = String::from_utf8(bytes.to_vec()).expect("Body is not valid UTF-8"); + + assert!(body_str.contains("route_test_counter")); + } +}