diff --git a/changelog.d/buffer_utilization_mean_metrics.enhancement.md b/changelog.d/buffer_utilization_mean_metrics.enhancement.md new file mode 100644 index 0000000000000..3a88daa924c48 --- /dev/null +++ b/changelog.d/buffer_utilization_mean_metrics.enhancement.md @@ -0,0 +1,3 @@ +Added moving-mean gauges for source and transform buffers (`source_buffer_utilization_mean` and `transform_buffer_utilization_mean`), so observers can track an EWMA of buffer utilization in addition to the instant level. + +authors: bruceg diff --git a/lib/vector-buffers/src/topology/channel/limited_queue.rs b/lib/vector-buffers/src/topology/channel/limited_queue.rs index 54264e06afe60..0c47064dacc94 100644 --- a/lib/vector-buffers/src/topology/channel/limited_queue.rs +++ b/lib/vector-buffers/src/topology/channel/limited_queue.rs @@ -16,9 +16,15 @@ use crossbeam_queue::{ArrayQueue, SegQueue}; use futures::Stream; use metrics::{Gauge, Histogram, gauge, histogram}; use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError}; +use vector_common::stats::AtomicEwma; use crate::{InMemoryBufferable, config::MemoryBufferSize}; +/// The alpha value for the Exponentially Weighted Moving Average (EWMA) calculation. This is the +/// weight given to the newest value; the previous average keeps `1 - alpha`. NOTE(review): a +/// "half life" of 6-7 measurements needs alpha near 0.1; 0.9 nearly tracks the latest value. +const EWMA_ALPHA: f64 = 0.9; + /// Error returned by `LimitedSender::send` when the receiver has disconnected. #[derive(Debug, PartialEq, Eq)] pub struct SendError(pub T); @@ -108,6 +114,8 @@ impl ChannelMetricMetadata { struct Metrics { histogram: Histogram, gauge: Gauge, + mean_gauge: Gauge, + ewma: Arc, // We hold a handle to the max gauge to avoid it being dropped by the metrics collector, but // since the value is static, we never need to update it. The compiler detects this as an unused // field, so we need to suppress the warning here. 
@@ -128,6 +136,8 @@ impl Metrics { let max_gauge_name = format!("{prefix}{gauge_suffix}"); let histogram_name = format!("{prefix}_utilization"); let gauge_name = format!("{prefix}_utilization_level"); + let mean_name = format!("{prefix}_utilization_mean"); + let ewma = Arc::new(AtomicEwma::new(EWMA_ALPHA)); #[cfg(test)] let recorded_values = Arc::new(Mutex::new(Vec::new())); if let Some(label_value) = output { @@ -136,7 +146,9 @@ impl Metrics { Self { histogram: histogram!(histogram_name, "output" => label_value.clone()), gauge: gauge!(gauge_name, "output" => label_value.clone()), + mean_gauge: gauge!(mean_name, "output" => label_value.clone()), max_gauge, + ewma, #[cfg(test)] recorded_values, } @@ -146,7 +158,9 @@ impl Metrics { Self { histogram: histogram!(histogram_name), gauge: gauge!(gauge_name), + mean_gauge: gauge!(mean_name), max_gauge, + ewma, #[cfg(test)] recorded_values, } @@ -157,6 +171,8 @@ impl Metrics { fn record(&self, value: usize) { self.histogram.record(value as f64); self.gauge.set(value as f64); + let avg = self.ewma.update(value as f64); + self.mean_gauge.set(avg); #[cfg(test)] if let Ok(mut recorded) = self.recorded_values.lock() { recorded.push(value); diff --git a/lib/vector-common/src/atomic.rs b/lib/vector-common/src/atomic.rs new file mode 100644 index 0000000000000..a2d342771819a --- /dev/null +++ b/lib/vector-common/src/atomic.rs @@ -0,0 +1,49 @@ +use std::sync::atomic::{AtomicU64, Ordering}; + +use metrics::GaugeFn; + +/// Simple atomic wrapper for `f64` values. +#[derive(Debug)] +pub struct AtomicF64(AtomicU64); + +impl AtomicF64 { + /// Creates a new `AtomicF64` with the given initial value. 
+ #[must_use] + pub fn new(init: f64) -> Self { + Self(AtomicU64::new(init.to_bits())) + } + + pub fn load(&self, order: Ordering) -> f64 { + f64::from_bits(self.0.load(order)) + } + + #[expect(clippy::missing_panics_doc, reason = "fetch_update always succeeds")] + pub fn fetch_update( + &self, + set_order: Ordering, + fetch_order: Ordering, + mut f: impl FnMut(f64) -> f64, + ) -> f64 { + f64::from_bits( + self.0 + .fetch_update(set_order, fetch_order, |x| { + Some(f(f64::from_bits(x)).to_bits()) + }) + .expect("fetch_update always succeeds"), + ) + } +} + +impl GaugeFn for AtomicF64 { + fn increment(&self, amount: f64) { + self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| value + amount); + } + + fn decrement(&self, amount: f64) { + self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| value - amount); + } + + fn set(&self, value: f64) { + self.0.store(f64::to_bits(value), Ordering::Relaxed); + } +} diff --git a/lib/vector-common/src/lib.rs b/lib/vector-common/src/lib.rs index 8216f67f7a2a9..099a068578090 100644 --- a/lib/vector-common/src/lib.rs +++ b/lib/vector-common/src/lib.rs @@ -59,6 +59,8 @@ pub mod shutdown; #[cfg(feature = "sensitive_string")] pub mod sensitive_string; +pub mod atomic; +pub mod stats; pub mod trigger; #[macro_use] diff --git a/src/stats.rs b/lib/vector-common/src/stats.rs similarity index 70% rename from src/stats.rs rename to lib/vector-common/src/stats.rs index 9bcd3253515a9..ffdaad8dac5e0 100644 --- a/src/stats.rs +++ b/lib/vector-common/src/stats.rs @@ -1,4 +1,9 @@ #![allow(missing_docs)] + +use std::sync::atomic::Ordering; + +use crate::atomic::AtomicF64; + /// Exponentially Weighted Moving Average #[derive(Clone, Copy, Debug)] pub struct Ewma { @@ -7,11 +12,13 @@ pub struct Ewma { } impl Ewma { + #[must_use] pub const fn new(alpha: f64) -> Self { let average = None; Self { average, alpha } } + #[must_use] pub const fn average(&self) -> Option { self.average } @@ -35,6 +42,7 @@ pub struct EwmaDefault { } impl 
EwmaDefault { + #[must_use] pub const fn new(alpha: f64, initial_value: f64) -> Self { Self { average: initial_value, @@ -42,6 +50,7 @@ impl EwmaDefault { } } + #[must_use] pub const fn average(&self) -> f64 { self.average } @@ -67,21 +76,25 @@ pub struct MeanVariance { } impl EwmaVar { + #[must_use] pub const fn new(alpha: f64) -> Self { let state = None; Self { state, alpha } } + #[must_use] pub const fn state(&self) -> Option { self.state } #[cfg(test)] + #[must_use] pub fn average(&self) -> Option { self.state.map(|state| state.mean) } #[cfg(test)] + #[must_use] pub fn variance(&self) -> Option { self.state.map(|state| state.variance) } @@ -114,11 +127,16 @@ pub struct Mean { impl Mean { - /// Update the and return the current average + /// Update the running mean with a new data point. + #[expect( + clippy::cast_precision_loss, + reason = "We have to convert count to f64 for the calculation, it's okay to lose precision for very large counts." + )] pub fn update(&mut self, point: f64) { self.count += 1; self.mean += (point - self.mean) / self.count as f64; } + #[must_use] pub const fn average(&self) -> Option { match self.count { 0 => None, @@ -127,6 +145,43 @@ impl Mean { } } +/// Atomic EWMA that uses an `AtomicF64` to store the current average. 
+#[derive(Debug)] +pub struct AtomicEwma { + average: AtomicF64, + alpha: f64, +} + +impl AtomicEwma { + #[must_use] + pub fn new(alpha: f64) -> Self { + Self { + average: AtomicF64::new(f64::NAN), + alpha, + } + } + + pub fn update(&self, point: f64) -> f64 { + let mut result = f64::NAN; + self.average + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { + let average = if current.is_nan() { + point + } else { + point.mul_add(self.alpha, current * (1.0 - self.alpha)) + }; + result = average; + average + }); + result + } + + pub fn average(&self) -> Option { + let value = self.average.load(Ordering::Relaxed); + if value.is_nan() { None } else { Some(value) } + } +} + #[cfg(test)] mod tests { use super::*; @@ -144,16 +199,17 @@ mod tests { } #[test] + #[expect(clippy::float_cmp, reason = "none of the values will be rounded")] fn ewma_update_works() { let mut mean = Ewma::new(0.5); assert_eq!(mean.average(), None); - mean.update(2.0); + assert_eq!(mean.update(2.0), 2.0); assert_eq!(mean.average(), Some(2.0)); - mean.update(2.0); + assert_eq!(mean.update(2.0), 2.0); assert_eq!(mean.average(), Some(2.0)); - mean.update(1.0); + assert_eq!(mean.update(1.0), 1.5); assert_eq!(mean.average(), Some(1.5)); - mean.update(2.0); + assert_eq!(mean.update(2.0), 1.75); assert_eq!(mean.average(), Some(1.75)); assert_eq!(mean.average, Some(1.75)); @@ -185,4 +241,15 @@ mod tests { }) ); } + + #[test] + #[expect(clippy::float_cmp, reason = "none of the values will be rounded")] + fn atomic_ewma_update_works() { + let ewma = AtomicEwma::new(0.5); + assert_eq!(ewma.average(), None); + assert_eq!(ewma.update(2.0), 2.0); + assert_eq!(ewma.average(), Some(2.0)); + assert_eq!(ewma.update(1.0), 1.5); + assert_eq!(ewma.average(), Some(1.5)); + } } diff --git a/lib/vector-core/src/metrics/recency.rs b/lib/vector-core/src/metrics/recency.rs index dcb237d6206c5..e63de81c9b1e3 100644 --- a/lib/vector-core/src/metrics/recency.rs +++ b/lib/vector-core/src/metrics/recency.rs @@ -63,8 
+63,9 @@ use metrics_util::{ }; use parking_lot::Mutex; use quanta::{Clock, Instant}; +use vector_common::atomic::AtomicF64; -use super::storage::{AtomicF64, Histogram}; +use super::storage::Histogram; /// The generation of a metric. /// diff --git a/lib/vector-core/src/metrics/storage.rs b/lib/vector-core/src/metrics/storage.rs index b102849a6f2a9..29030e927d7ee 100644 --- a/lib/vector-core/src/metrics/storage.rs +++ b/lib/vector-core/src/metrics/storage.rs @@ -3,8 +3,9 @@ use std::sync::{ atomic::{AtomicU32, Ordering}, }; -use metrics::{GaugeFn, HistogramFn, atomics::AtomicU64}; +use metrics::{HistogramFn, atomics::AtomicU64}; use metrics_util::registry::Storage; +use vector_common::atomic::AtomicF64; use crate::event::{MetricValue, metric::Bucket}; @@ -28,50 +29,6 @@ impl Storage for VectorStorage { } } -#[derive(Debug)] -pub(super) struct AtomicF64 { - inner: AtomicU64, -} - -impl AtomicF64 { - fn new(init: f64) -> Self { - Self { - inner: AtomicU64::new(init.to_bits()), - } - } - - fn fetch_update( - &self, - set_order: Ordering, - fetch_order: Ordering, - mut f: impl FnMut(f64) -> f64, - ) { - self.inner - .fetch_update(set_order, fetch_order, |x| { - Some(f(f64::from_bits(x)).to_bits()) - }) - .expect("Cannot fail"); - } - - pub(super) fn load(&self, order: Ordering) -> f64 { - f64::from_bits(self.inner.load(order)) - } -} - -impl GaugeFn for AtomicF64 { - fn increment(&self, amount: f64) { - self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| value + amount); - } - - fn decrement(&self, amount: f64) { - self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| value - amount); - } - - fn set(&self, value: f64) { - self.inner.store(f64::to_bits(value), Ordering::Relaxed); - } -} - #[derive(Debug)] pub(super) struct Histogram { buckets: Box<[(f64, AtomicU32); 20]>, diff --git a/lib/vector-core/src/source_sender/tests.rs b/lib/vector-core/src/source_sender/tests.rs index 66e4169af7077..4b2e47ed18a24 100644 --- 
a/lib/vector-core/src/source_sender/tests.rs +++ b/lib/vector-core/src/source_sender/tests.rs @@ -266,7 +266,7 @@ async fn emits_buffer_utilization_histogram_on_send_and_receive() { .into_iter() .filter(|metric| metric.name().starts_with("source_buffer_")) .collect(); - assert_eq!(metrics.len(), 3, "expected 3 utilization metrics"); + assert_eq!(metrics.len(), 4, "expected 4 utilization metrics"); let find_metric = |name: &str| { metrics diff --git a/lib/vector-lib/src/lib.rs b/lib/vector-lib/src/lib.rs index 4672148bacdf1..3400a03c2e4cf 100644 --- a/lib/vector-lib/src/lib.rs +++ b/lib/vector-lib/src/lib.rs @@ -10,10 +10,10 @@ pub use vector_buffers as buffers; #[cfg(feature = "test")] pub use vector_common::event_test_util; pub use vector_common::{ - Error, NamedInternalEvent, Result, TimeZone, assert_event_data_eq, btreemap, byte_size_of, - byte_size_of::ByteSizeOf, conversion, encode_logfmt, finalization, finalizer, id, + Error, NamedInternalEvent, Result, TimeZone, assert_event_data_eq, atomic, btreemap, + byte_size_of, byte_size_of::ByteSizeOf, conversion, encode_logfmt, finalization, finalizer, id, impl_event_data_eq, internal_event, json_size, registered_event, request_metadata, - sensitive_string, shutdown, trigger, + sensitive_string, shutdown, stats, trigger, }; pub use vector_config as configurable; pub use vector_config::impl_generate_config_from_default; diff --git a/src/lib.rs b/src/lib.rs index 0f2dbfb5fa728..9ace77858d452 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -107,7 +107,6 @@ pub(crate) mod sink_ext; pub mod sinks; #[allow(unreachable_pub)] pub mod sources; -pub mod stats; #[cfg(feature = "api-client")] #[allow(unreachable_pub)] pub mod tap; diff --git a/src/sinks/util/adaptive_concurrency/controller.rs b/src/sinks/util/adaptive_concurrency/controller.rs index 85a0559041e81..58caf13adb5cd 100644 --- a/src/sinks/util/adaptive_concurrency/controller.rs +++ b/src/sinks/util/adaptive_concurrency/controller.rs @@ -7,6 +7,7 @@ use std::{ use 
tokio::sync::OwnedSemaphorePermit; use tower::timeout::error::Elapsed; use vector_lib::internal_event::{InternalEventHandle as _, Registered}; +use vector_lib::stats::{EwmaVar, Mean, MeanVariance}; use super::{AdaptiveConcurrencySettings, instant_now, semaphore::ShrinkableSemaphore}; #[cfg(test)] @@ -18,7 +19,6 @@ use crate::{ AdaptiveConcurrencyLimitData, AdaptiveConcurrencyObservedRtt, }, sinks::util::retries::{RetryAction, RetryLogic}, - stats::{EwmaVar, Mean, MeanVariance}, }; /// Shared class for `tokio::sync::Semaphore` that manages adjusting the diff --git a/src/sources/util/net/tcp/request_limiter.rs b/src/sources/util/net/tcp/request_limiter.rs index 63019622eacca..d79b3d6c7a9e1 100644 --- a/src/sources/util/net/tcp/request_limiter.rs +++ b/src/sources/util/net/tcp/request_limiter.rs @@ -4,8 +4,7 @@ use std::{ }; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; - -use crate::stats::EwmaDefault; +use vector_lib::stats::EwmaDefault; const EWMA_WEIGHT: f64 = 0.1; const MINIMUM_PERMITS: usize = 2; diff --git a/src/utilization.rs b/src/utilization.rs index 8874f0537f583..413505b8fc9f5 100644 --- a/src/utilization.rs +++ b/src/utilization.rs @@ -20,9 +20,7 @@ use tokio::{ time::interval, }; use tokio_stream::wrappers::IntervalStream; -use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal}; - -use crate::stats; +use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal, stats}; const UTILIZATION_EMITTER_DURATION: Duration = Duration::from_secs(5); diff --git a/website/cue/reference/components/sources.cue b/website/cue/reference/components/sources.cue index 67585490993e5..aea0e54cedb2f 100644 --- a/website/cue/reference/components/sources.cue +++ b/website/cue/reference/components/sources.cue @@ -421,5 +421,6 @@ components: sources: [Name=string]: { source_buffer_max_event_size: components.sources.internal_metrics.output.metrics.source_buffer_max_event_size source_buffer_utilization: 
components.sources.internal_metrics.output.metrics.source_buffer_utilization source_buffer_utilization_level: components.sources.internal_metrics.output.metrics.source_buffer_utilization_level + source_buffer_utilization_mean: components.sources.internal_metrics.output.metrics.source_buffer_utilization_mean } } diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index da94e39ecf645..d24a3dc22b8bd 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -719,7 +719,7 @@ components: sources: internal_metrics: { tags: _component_tags } source_buffer_max_byte_size: { - description: "The maximum number of bytes the buffer that the source's outputs send into can hold." + description: "The maximum number of bytes the source buffer can hold. The outputs of the source send data to this buffer." type: "gauge" default_namespace: "vector" tags: _component_tags & { @@ -727,7 +727,7 @@ components: sources: internal_metrics: { } } source_buffer_max_event_size: { - description: "The maximum number of events the buffer that the source's outputs send into can hold." + description: "The maximum number of events the source buffer can hold. The outputs of the source send data to this buffer." type: "gauge" default_namespace: "vector" tags: _component_tags & { @@ -735,7 +735,7 @@ components: sources: internal_metrics: { } } source_buffer_utilization: { - description: "The utilization level of the buffer that the source's outputs send into." + description: "The utilization level of the source buffer. The outputs of the source send data to this buffer." 
type: "histogram" default_namespace: "vector" tags: _component_tags & { @@ -743,7 +743,15 @@ components: sources: internal_metrics: { } } source_buffer_utilization_level: { - description: "The current utilization level of the buffer that the source's outputs send into." + description: "The current utilization level of the source buffer. The outputs of the source send data to this buffer." + type: "gauge" + default_namespace: "vector" + tags: _component_tags & { + output: _output + } + } + source_buffer_utilization_mean: { + description: "The mean utilization level of the source buffer. The outputs of the source send data to this buffer. The mean utilization is smoothed over time using an exponentially weighted moving average (EWMA)." type: "gauge" default_namespace: "vector" tags: _component_tags & { @@ -882,6 +890,14 @@ components: sources: internal_metrics: { output: _output } } + transform_buffer_utilization_mean: { + description: "The mean utilization level of the buffer that feeds into a transform. This value is smoothed over time using an exponentially weighted moving average (EWMA)." + type: "gauge" + default_namespace: "vector" + tags: _component_tags & { + output: _output + } + } uptime_seconds: { description: "The total number of seconds the Vector instance has been up." 
type: "gauge" diff --git a/website/cue/reference/components/transforms.cue b/website/cue/reference/components/transforms.cue index 5f19812435555..3ad19224ce34b 100644 --- a/website/cue/reference/components/transforms.cue +++ b/website/cue/reference/components/transforms.cue @@ -24,6 +24,7 @@ components: transforms: [Name=string]: { transform_buffer_max_byte_size: components.sources.internal_metrics.output.metrics.transform_buffer_max_byte_size transform_buffer_utilization: components.sources.internal_metrics.output.metrics.transform_buffer_utilization transform_buffer_utilization_level: components.sources.internal_metrics.output.metrics.transform_buffer_utilization_level + transform_buffer_utilization_mean: components.sources.internal_metrics.output.metrics.transform_buffer_utilization_mean utilization: components.sources.internal_metrics.output.metrics.utilization } }