Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/buffer_utilization_mean_metrics.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added moving-mean gauges for source and transform buffers (`source_buffer_utilization_mean` and `transform_buffer_utilization_mean`), so observers can track an EWMA of buffer utilization in addition to the instant level.

authors: bruceg
16 changes: 16 additions & 0 deletions lib/vector-buffers/src/topology/channel/limited_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ use crossbeam_queue::{ArrayQueue, SegQueue};
use futures::Stream;
use metrics::{Gauge, Histogram, gauge, histogram};
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
use vector_common::stats::AtomicEwma;

use crate::{InMemoryBufferable, config::MemoryBufferSize};

/// The alpha value for the Exponentially Weighted Moving Average (EWMA) calculation. This is a
/// measure of how much weight to give to the current value versus the previous values. A value of
/// 0.9 results in a "half life" of 6-7 measurements.
const EWMA_ALPHA: f64 = 0.9;

/// Error returned by `LimitedSender::send` when the receiver has disconnected.
#[derive(Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);
Expand Down Expand Up @@ -108,6 +114,8 @@ impl ChannelMetricMetadata {
struct Metrics {
histogram: Histogram,
gauge: Gauge,
mean_gauge: Gauge,
ewma: Arc<AtomicEwma>,
// We hold a handle to the max gauge to avoid it being dropped by the metrics collector, but
// since the value is static, we never need to update it. The compiler detects this as an unused
// field, so we need to suppress the warning here.
Expand All @@ -128,6 +136,8 @@ impl Metrics {
let max_gauge_name = format!("{prefix}{gauge_suffix}");
let histogram_name = format!("{prefix}_utilization");
let gauge_name = format!("{prefix}_utilization_level");
let mean_name = format!("{prefix}_utilization_mean");
let ewma = Arc::new(AtomicEwma::new(EWMA_ALPHA));
#[cfg(test)]
let recorded_values = Arc::new(Mutex::new(Vec::new()));
if let Some(label_value) = output {
Expand All @@ -136,7 +146,9 @@ impl Metrics {
Self {
histogram: histogram!(histogram_name, "output" => label_value.clone()),
gauge: gauge!(gauge_name, "output" => label_value.clone()),
mean_gauge: gauge!(mean_name, "output" => label_value.clone()),
max_gauge,
ewma,
#[cfg(test)]
recorded_values,
}
Expand All @@ -146,7 +158,9 @@ impl Metrics {
Self {
histogram: histogram!(histogram_name),
gauge: gauge!(gauge_name),
mean_gauge: gauge!(mean_name),
max_gauge,
ewma,
#[cfg(test)]
recorded_values,
}
Expand All @@ -157,6 +171,8 @@ impl Metrics {
fn record(&self, value: usize) {
self.histogram.record(value as f64);
self.gauge.set(value as f64);
let avg = self.ewma.update(value as f64);
self.mean_gauge.set(avg);
#[cfg(test)]
if let Ok(mut recorded) = self.recorded_values.lock() {
recorded.push(value);
Expand Down
49 changes: 49 additions & 0 deletions lib/vector-common/src/atomic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::sync::atomic::{AtomicU64, Ordering};

use metrics::GaugeFn;

/// Simple atomic wrapper for `f64` values.
#[derive(Debug)]
pub struct AtomicF64(AtomicU64);

impl AtomicF64 {
/// Creates a new `AtomicF64` with the given initial value.
#[must_use]
pub fn new(init: f64) -> Self {
Self(AtomicU64::new(init.to_bits()))
}

pub fn load(&self, order: Ordering) -> f64 {
f64::from_bits(self.0.load(order))
}

#[expect(clippy::missing_panics_doc, reason = "fetch_update always succeeds")]
pub fn fetch_update(
&self,
set_order: Ordering,
fetch_order: Ordering,
mut f: impl FnMut(f64) -> f64,
) -> f64 {
f64::from_bits(
self.0
.fetch_update(set_order, fetch_order, |x| {
Some(f(f64::from_bits(x)).to_bits())
})
.expect("fetch_update always succeeds"),
)
}
}

impl GaugeFn for AtomicF64 {
fn increment(&self, amount: f64) {
self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| value + amount);
}

fn decrement(&self, amount: f64) {
self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| value - amount);
}

fn set(&self, value: f64) {
self.0.store(f64::to_bits(value), Ordering::Relaxed);
}
}
2 changes: 2 additions & 0 deletions lib/vector-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub mod shutdown;
#[cfg(feature = "sensitive_string")]
pub mod sensitive_string;

pub mod atomic;
pub mod stats;
pub mod trigger;

#[macro_use]
Expand Down
75 changes: 71 additions & 4 deletions src/stats.rs → lib/vector-common/src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
#![allow(missing_docs)]

use std::sync::atomic::Ordering;

use crate::atomic::AtomicF64;

/// Exponentially Weighted Moving Average
#[derive(Clone, Copy, Debug)]
pub struct Ewma {
Expand All @@ -7,11 +12,13 @@ pub struct Ewma {
}

impl Ewma {
#[must_use]
pub const fn new(alpha: f64) -> Self {
let average = None;
Self { average, alpha }
}

#[must_use]
pub const fn average(&self) -> Option<f64> {
self.average
}
Expand All @@ -35,13 +42,15 @@ pub struct EwmaDefault {
}

impl EwmaDefault {
#[must_use]
pub const fn new(alpha: f64, initial_value: f64) -> Self {
Self {
average: initial_value,
alpha,
}
}

#[must_use]
pub const fn average(&self) -> f64 {
self.average
}
Expand All @@ -67,21 +76,25 @@ pub struct MeanVariance {
}

impl EwmaVar {
#[must_use]
pub const fn new(alpha: f64) -> Self {
let state = None;
Self { state, alpha }
}

#[must_use]
pub const fn state(&self) -> Option<MeanVariance> {
self.state
}

#[cfg(test)]
#[must_use]
pub fn average(&self) -> Option<f64> {
self.state.map(|state| state.mean)
}

#[cfg(test)]
#[must_use]
pub fn variance(&self) -> Option<f64> {
self.state.map(|state| state.variance)
}
Expand Down Expand Up @@ -114,11 +127,16 @@ pub struct Mean {

impl Mean {
/// Update the and return the current average
#[expect(
clippy::cast_precision_loss,
reason = "We have to convert count to f64 for the calculation, it's okay to lose precision for very large counts."
)]
pub fn update(&mut self, point: f64) {
self.count += 1;
self.mean += (point - self.mean) / self.count as f64;
}

#[must_use]
pub const fn average(&self) -> Option<f64> {
match self.count {
0 => None,
Expand All @@ -127,6 +145,43 @@ impl Mean {
}
}

/// Atomic EWMA that uses an `AtomicF64` to store the current average.
#[derive(Debug)]
pub struct AtomicEwma {
average: AtomicF64,
alpha: f64,
}

impl AtomicEwma {
#[must_use]
pub fn new(alpha: f64) -> Self {
Self {
average: AtomicF64::new(f64::NAN),
alpha,
}
}

pub fn update(&self, point: f64) -> f64 {
let mut result = f64::NAN;
self.average
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
let average = if current.is_nan() {
point
} else {
point.mul_add(self.alpha, current * (1.0 - self.alpha))
};
result = average;
average
});
result
Comment on lines +165 to +176
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let mut result = f64::NAN;
self.average
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
let average = if current.is_nan() {
point
} else {
point.mul_add(self.alpha, current * (1.0 - self.alpha))
};
result = average;
average
});
result
self.average
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
if current.is_nan() {
point
} else {
point.mul_add(self.alpha, current * (1.0 - self.alpha))
}
})

I think this would work the same as fetch_update returns an f64

Copy link
Member Author

@bruceg bruceg Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is that fetch_update returns the old value (hence the mnemonics of fetch-then-update), but we want to return the updated average which is only accessible in the closure or by adding another fetch which adds to the expense.

}

pub fn average(&self) -> Option<f64> {
let value = self.average.load(Ordering::Relaxed);
if value.is_nan() { None } else { Some(value) }
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -144,16 +199,17 @@ mod tests {
}

#[test]
#[expect(clippy::float_cmp, reason = "none of the values will be rounded")]
fn ewma_update_works() {
let mut mean = Ewma::new(0.5);
assert_eq!(mean.average(), None);
mean.update(2.0);
assert_eq!(mean.update(2.0), 2.0);
assert_eq!(mean.average(), Some(2.0));
mean.update(2.0);
assert_eq!(mean.update(2.0), 2.0);
assert_eq!(mean.average(), Some(2.0));
mean.update(1.0);
assert_eq!(mean.update(1.0), 1.5);
assert_eq!(mean.average(), Some(1.5));
mean.update(2.0);
assert_eq!(mean.update(2.0), 1.75);
assert_eq!(mean.average(), Some(1.75));

assert_eq!(mean.average, Some(1.75));
Expand Down Expand Up @@ -185,4 +241,15 @@ mod tests {
})
);
}

#[test]
#[expect(clippy::float_cmp, reason = "none of the values will be rounded")]
fn atomic_ewma_update_works() {
let ewma = AtomicEwma::new(0.5);
assert_eq!(ewma.average(), None);
assert_eq!(ewma.update(2.0), 2.0);
assert_eq!(ewma.average(), Some(2.0));
assert_eq!(ewma.update(1.0), 1.5);
assert_eq!(ewma.average(), Some(1.5));
}
}
3 changes: 2 additions & 1 deletion lib/vector-core/src/metrics/recency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ use metrics_util::{
};
use parking_lot::Mutex;
use quanta::{Clock, Instant};
use vector_common::atomic::AtomicF64;

use super::storage::{AtomicF64, Histogram};
use super::storage::Histogram;

/// The generation of a metric.
///
Expand Down
47 changes: 2 additions & 45 deletions lib/vector-core/src/metrics/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::sync::{
atomic::{AtomicU32, Ordering},
};

use metrics::{GaugeFn, HistogramFn, atomics::AtomicU64};
use metrics::{HistogramFn, atomics::AtomicU64};
use metrics_util::registry::Storage;
use vector_common::atomic::AtomicF64;

use crate::event::{MetricValue, metric::Bucket};

Expand All @@ -28,50 +29,6 @@ impl<K> Storage<K> for VectorStorage {
}
}

#[derive(Debug)]
pub(super) struct AtomicF64 {
inner: AtomicU64,
}

impl AtomicF64 {
fn new(init: f64) -> Self {
Self {
inner: AtomicU64::new(init.to_bits()),
}
}

fn fetch_update(
&self,
set_order: Ordering,
fetch_order: Ordering,
mut f: impl FnMut(f64) -> f64,
) {
self.inner
.fetch_update(set_order, fetch_order, |x| {
Some(f(f64::from_bits(x)).to_bits())
})
.expect("Cannot fail");
}

pub(super) fn load(&self, order: Ordering) -> f64 {
f64::from_bits(self.inner.load(order))
}
}

impl GaugeFn for AtomicF64 {
fn increment(&self, amount: f64) {
self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| value + amount);
}

fn decrement(&self, amount: f64) {
self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| value - amount);
}

fn set(&self, value: f64) {
self.inner.store(f64::to_bits(value), Ordering::Relaxed);
}
}

#[derive(Debug)]
pub(super) struct Histogram {
buckets: Box<[(f64, AtomicU32); 20]>,
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-core/src/source_sender/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ async fn emits_buffer_utilization_histogram_on_send_and_receive() {
.into_iter()
.filter(|metric| metric.name().starts_with("source_buffer_"))
.collect();
assert_eq!(metrics.len(), 3, "expected 3 utilization metrics");
assert_eq!(metrics.len(), 4, "expected 4 utilization metrics");

let find_metric = |name: &str| {
metrics
Expand Down
6 changes: 3 additions & 3 deletions lib/vector-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ pub use vector_buffers as buffers;
#[cfg(feature = "test")]
pub use vector_common::event_test_util;
pub use vector_common::{
Error, NamedInternalEvent, Result, TimeZone, assert_event_data_eq, btreemap, byte_size_of,
byte_size_of::ByteSizeOf, conversion, encode_logfmt, finalization, finalizer, id,
Error, NamedInternalEvent, Result, TimeZone, assert_event_data_eq, atomic, btreemap,
byte_size_of, byte_size_of::ByteSizeOf, conversion, encode_logfmt, finalization, finalizer, id,
impl_event_data_eq, internal_event, json_size, registered_event, request_metadata,
sensitive_string, shutdown, trigger,
sensitive_string, shutdown, stats, trigger,
};
pub use vector_config as configurable;
pub use vector_config::impl_generate_config_from_default;
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ pub(crate) mod sink_ext;
pub mod sinks;
#[allow(unreachable_pub)]
pub mod sources;
pub mod stats;
#[cfg(feature = "api-client")]
#[allow(unreachable_pub)]
pub mod tap;
Expand Down
Loading
Loading