Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/buffer_utilization_mean_metrics.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added moving-mean gauges for source and transform buffers (`source_buffer_utilization_mean` and `transform_buffer_utilization_mean`), so observers can track an EWMA of buffer utilization in addition to the instant level.

authors: bruceg
16 changes: 16 additions & 0 deletions lib/vector-buffers/src/topology/channel/limited_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ use crossbeam_queue::{ArrayQueue, SegQueue};
use futures::Stream;
use metrics::{Gauge, Histogram, gauge, histogram};
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
use vector_common::stats::AtomicEwma;

use crate::{InMemoryBufferable, config::MemoryBufferSize};

/// The alpha value for the Exponentially Weighted Moving Average (EWMA) calculation. This is the
/// weight given to the current value; the previous average is weighted by `1 - alpha`.
///
/// NOTE(review): `AtomicEwma::update` computes `point * alpha + current * (1 - alpha)`, so a value
/// of 0.9 takes 90% of every update from the newest measurement and older measurements lose half
/// of their influence in well under one update. A "half life" of 6-7 measurements would instead
/// correspond to an alpha of roughly 0.1 (0.9^6.6 ≈ 0.5) — confirm which behavior is intended.
const EWMA_ALPHA: f64 = 0.9;

/// Error returned by `LimitedSender::send` when the receiver has disconnected.
#[derive(Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);
Expand Down Expand Up @@ -108,6 +114,8 @@ impl ChannelMetricMetadata {
struct Metrics {
histogram: Histogram,
gauge: Gauge,
mean_gauge: Gauge,
ewma: Arc<AtomicEwma>,
// We hold a handle to the max gauge to avoid it being dropped by the metrics collector, but
// since the value is static, we never need to update it. The compiler detects this as an unused
// field, so we need to suppress the warning here.
Expand All @@ -128,6 +136,8 @@ impl Metrics {
let max_gauge_name = format!("{prefix}{gauge_suffix}");
let histogram_name = format!("{prefix}_utilization");
let gauge_name = format!("{prefix}_utilization_level");
let mean_name = format!("{prefix}_utilization_mean");
let ewma = Arc::new(AtomicEwma::new(EWMA_ALPHA));
#[cfg(test)]
let recorded_values = Arc::new(Mutex::new(Vec::new()));
if let Some(label_value) = output {
Expand All @@ -136,7 +146,9 @@ impl Metrics {
Self {
histogram: histogram!(histogram_name, "output" => label_value.clone()),
gauge: gauge!(gauge_name, "output" => label_value.clone()),
mean_gauge: gauge!(mean_name, "output" => label_value.clone()),
max_gauge,
ewma,
#[cfg(test)]
recorded_values,
}
Expand All @@ -146,7 +158,9 @@ impl Metrics {
Self {
histogram: histogram!(histogram_name),
gauge: gauge!(gauge_name),
mean_gauge: gauge!(mean_name),
max_gauge,
ewma,
#[cfg(test)]
recorded_values,
}
Expand All @@ -157,6 +171,8 @@ impl Metrics {
fn record(&self, value: usize) {
self.histogram.record(value as f64);
self.gauge.set(value as f64);
let avg = self.ewma.update(value as f64);
self.mean_gauge.set(avg);
#[cfg(test)]
if let Ok(mut recorded) = self.recorded_values.lock() {
recorded.push(value);
Expand Down
49 changes: 49 additions & 0 deletions lib/vector-common/src/atomic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::sync::atomic::{AtomicU64, Ordering};

use metrics::GaugeFn;

/// Simple atomic wrapper for `f64` values.
///
/// The value is stored as its bit pattern inside an [`AtomicU64`], so every operation converts
/// through [`f64::to_bits`] / [`f64::from_bits`].
#[derive(Debug)]
pub struct AtomicF64(AtomicU64);

impl AtomicF64 {
    /// Creates a new `AtomicF64` with the given initial value.
    #[must_use]
    pub fn new(init: f64) -> Self {
        Self(AtomicU64::new(init.to_bits()))
    }

    /// Loads the current value using the given memory ordering.
    pub fn load(&self, order: Ordering) -> f64 {
        f64::from_bits(self.0.load(order))
    }

    /// Atomically replaces the stored value with `f(current)` and returns the **previous** value
    /// (fetch-then-update), not the newly stored one.
    ///
    /// `set_order` and `fetch_order` are passed straight through to the underlying
    /// [`AtomicU64::fetch_update`]. As with that method, `f` may be called more than once if the
    /// value is changed by another thread in the meantime.
    pub fn fetch_update(
        &self,
        set_order: Ordering,
        fetch_order: Ordering,
        mut f: impl FnMut(f64) -> f64,
    ) -> f64 {
        let outcome = self.0.fetch_update(set_order, fetch_order, |bits| {
            Some(f(f64::from_bits(bits)).to_bits())
        });
        // The closure always returns `Some`, so the update cannot be rejected; both arms carry
        // the previous bit pattern, which avoids an unreachable panic path.
        match outcome {
            Ok(previous) | Err(previous) => f64::from_bits(previous),
        }
    }
}

/// Lets an `AtomicF64` act as gauge storage. Every operation uses relaxed ordering.
impl GaugeFn for AtomicF64 {
    /// Atomically adds `amount` to the stored value.
    fn increment(&self, amount: f64) {
        self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            current + amount
        });
    }

    /// Atomically subtracts `amount` from the stored value.
    fn decrement(&self, amount: f64) {
        self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            current - amount
        });
    }

    /// Overwrites the stored value with `value`.
    fn set(&self, value: f64) {
        let bits = value.to_bits();
        self.0.store(bits, Ordering::Relaxed);
    }
}
2 changes: 2 additions & 0 deletions lib/vector-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub mod shutdown;
#[cfg(feature = "sensitive_string")]
pub mod sensitive_string;

pub mod atomic;
pub mod stats;
pub mod trigger;

#[macro_use]
Expand Down
75 changes: 71 additions & 4 deletions src/stats.rs → lib/vector-common/src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
#![allow(missing_docs)]

use std::sync::atomic::Ordering;

use crate::atomic::AtomicF64;

/// Exponentially Weighted Moving Average
#[derive(Clone, Copy, Debug)]
pub struct Ewma {
Expand All @@ -7,11 +12,13 @@ pub struct Ewma {
}

impl Ewma {
/// Creates an EWMA with the given `alpha` and no average recorded yet.
#[must_use]
pub const fn new(alpha: f64) -> Self {
    Self {
        average: None,
        alpha,
    }
}

/// Returns the stored average, which is `None` for a freshly constructed `Ewma`.
#[must_use]
pub const fn average(&self) -> Option<f64> {
self.average
}
Expand All @@ -35,13 +42,15 @@ pub struct EwmaDefault {
}

impl EwmaDefault {
/// Creates an EWMA with the given `alpha` whose average starts at `initial_value`.
#[must_use]
pub const fn new(alpha: f64, initial_value: f64) -> Self {
    let average = initial_value;
    Self { average, alpha }
}

/// Returns the stored average, which starts out as the `initial_value` passed to `new`.
#[must_use]
pub const fn average(&self) -> f64 {
self.average
}
Expand All @@ -67,21 +76,25 @@ pub struct MeanVariance {
}

impl EwmaVar {
/// Creates an EWMA mean/variance tracker with the given `alpha` and no state recorded yet.
#[must_use]
pub const fn new(alpha: f64) -> Self {
    Self { state: None, alpha }
}

/// Returns the stored mean/variance state, which is `None` for a freshly constructed `EwmaVar`.
#[must_use]
pub const fn state(&self) -> Option<MeanVariance> {
self.state
}

/// Test helper: the mean of the stored state, or `None` when there is no state.
#[cfg(test)]
#[must_use]
pub fn average(&self) -> Option<f64> {
    let current = self.state?;
    Some(current.mean)
}

/// Test helper: the variance of the stored state, or `None` when there is no state.
#[cfg(test)]
#[must_use]
pub fn variance(&self) -> Option<f64> {
    let current = self.state?;
    Some(current.variance)
}
Expand Down Expand Up @@ -114,11 +127,16 @@ pub struct Mean {

impl Mean {
/// Update the running mean with a new data point.
///
/// This method does not return a value; call `average` to read the current mean.
#[expect(
clippy::cast_precision_loss,
reason = "We have to convert count to f64 for the calculation, it's okay to lose precision for very large counts."
)]
pub fn update(&mut self, point: f64) {
self.count += 1;
// Incremental mean: move the mean toward `point` by 1/count of their difference.
self.mean += (point - self.mean) / self.count as f64;
}

#[must_use]
pub const fn average(&self) -> Option<f64> {
match self.count {
0 => None,
Expand All @@ -127,6 +145,43 @@ impl Mean {
}
}

/// Atomic EWMA that uses an `AtomicF64` to store the current average.
///
/// A stored NaN marks "no measurement recorded yet": `new` starts from NaN, `update` takes the
/// first point as the average when it sees NaN, and `average` reports NaN as `None`.
#[derive(Debug)]
pub struct AtomicEwma {
// Current average, or NaN until the first update.
average: AtomicF64,
// Weight applied to each new point; the previous average is weighted by `1 - alpha`.
alpha: f64,
}

impl AtomicEwma {
/// Creates an atomic EWMA with the given `alpha` and no measurement recorded yet.
#[must_use]
pub fn new(alpha: f64) -> Self {
    // NaN is the sentinel for "nothing recorded yet".
    let average = AtomicF64::new(f64::NAN);
    Self { average, alpha }
}

/// Folds `point` into the moving average and returns the updated average.
///
/// While the stored value is still NaN (nothing recorded yet), `point` becomes the average as-is.
pub fn update(&self, point: f64) -> f64 {
// `AtomicF64::fetch_update` returns the value from *before* the update, so the new average is
// captured from inside the closure rather than paying for a second load afterwards.
let mut result = f64::NAN;
self.average
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
let average = if current.is_nan() {
point
} else {
// point * alpha + current * (1 - alpha)
point.mul_add(self.alpha, current * (1.0 - self.alpha))
};
result = average;
average
});
result
Comment on lines +165 to +176
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let mut result = f64::NAN;
self.average
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
let average = if current.is_nan() {
point
} else {
point.mul_add(self.alpha, current * (1.0 - self.alpha))
};
result = average;
average
});
result
self.average
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
if current.is_nan() {
point
} else {
point.mul_add(self.alpha, current * (1.0 - self.alpha))
}
})

I think this would work the same as fetch_update returns an f64

Copy link
Member Author

@bruceg bruceg Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is that fetch_update returns the old value (hence the mnemonics of fetch-then-update), but we want to return the updated average which is only accessible in the closure or by adding another fetch which adds to the expense.

}

/// Returns the current average, or `None` while the stored value is still the NaN sentinel.
pub fn average(&self) -> Option<f64> {
    let current = self.average.load(Ordering::Relaxed);
    (!current.is_nan()).then_some(current)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -144,16 +199,17 @@ mod tests {
}

#[test]
#[expect(clippy::float_cmp, reason = "none of the values will be rounded")]
fn ewma_update_works() {
let mut mean = Ewma::new(0.5);
assert_eq!(mean.average(), None);
mean.update(2.0);
assert_eq!(mean.update(2.0), 2.0);
assert_eq!(mean.average(), Some(2.0));
mean.update(2.0);
assert_eq!(mean.update(2.0), 2.0);
assert_eq!(mean.average(), Some(2.0));
mean.update(1.0);
assert_eq!(mean.update(1.0), 1.5);
assert_eq!(mean.average(), Some(1.5));
mean.update(2.0);
assert_eq!(mean.update(2.0), 1.75);
assert_eq!(mean.average(), Some(1.75));

assert_eq!(mean.average, Some(1.75));
Expand Down Expand Up @@ -185,4 +241,15 @@ mod tests {
})
);
}

#[test]
#[expect(clippy::float_cmp, reason = "none of the values will be rounded")]
fn atomic_ewma_update_works() {
    let tracker = AtomicEwma::new(0.5);
    // Nothing has been recorded yet.
    assert_eq!(tracker.average(), None);

    // The first point is taken as the average directly.
    let first = tracker.update(2.0);
    assert_eq!(first, 2.0);
    assert_eq!(tracker.average(), Some(2.0));

    // With alpha = 0.5: 1.0 * 0.5 + 2.0 * 0.5 = 1.5.
    let second = tracker.update(1.0);
    assert_eq!(second, 1.5);
    assert_eq!(tracker.average(), Some(1.5));
}
}
3 changes: 2 additions & 1 deletion lib/vector-core/src/metrics/recency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ use metrics_util::{
};
use parking_lot::Mutex;
use quanta::{Clock, Instant};
use vector_common::atomic::AtomicF64;

use super::storage::{AtomicF64, Histogram};
use super::storage::Histogram;

/// The generation of a metric.
///
Expand Down
47 changes: 2 additions & 45 deletions lib/vector-core/src/metrics/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::sync::{
atomic::{AtomicU32, Ordering},
};

use metrics::{GaugeFn, HistogramFn, atomics::AtomicU64};
use metrics::{HistogramFn, atomics::AtomicU64};
use metrics_util::registry::Storage;
use vector_common::atomic::AtomicF64;

use crate::event::{MetricValue, metric::Bucket};

Expand All @@ -28,50 +29,6 @@ impl<K> Storage<K> for VectorStorage {
}
}

/// Atomic `f64` that keeps the value's bit pattern in an `AtomicU64`.
#[derive(Debug)]
pub(super) struct AtomicF64 {
    inner: AtomicU64,
}

impl AtomicF64 {
    /// Wraps `init` in a new atomic cell.
    fn new(init: f64) -> Self {
        let inner = AtomicU64::new(init.to_bits());
        Self { inner }
    }

    /// Atomically replaces the stored value with `f(current)`; nothing is returned.
    fn fetch_update(
        &self,
        set_order: Ordering,
        fetch_order: Ordering,
        mut f: impl FnMut(f64) -> f64,
    ) {
        let outcome = self.inner.fetch_update(set_order, fetch_order, |bits| {
            let next = f(f64::from_bits(bits));
            Some(next.to_bits())
        });
        // The closure always returns `Some`, so the update is never rejected.
        outcome.expect("Cannot fail");
    }

    /// Reads the current value using the given memory ordering.
    pub(super) fn load(&self, order: Ordering) -> f64 {
        let bits = self.inner.load(order);
        f64::from_bits(bits)
    }
}

/// Lets this `AtomicF64` act as gauge storage. Every operation uses relaxed ordering.
impl GaugeFn for AtomicF64 {
    /// Atomically adds `amount` to the stored value.
    fn increment(&self, amount: f64) {
        self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            current + amount
        });
    }

    /// Atomically subtracts `amount` from the stored value.
    fn decrement(&self, amount: f64) {
        self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            current - amount
        });
    }

    /// Overwrites the stored value with `value`.
    fn set(&self, value: f64) {
        let bits = value.to_bits();
        self.inner.store(bits, Ordering::Relaxed);
    }
}

#[derive(Debug)]
pub(super) struct Histogram {
buckets: Box<[(f64, AtomicU32); 20]>,
Expand Down
6 changes: 3 additions & 3 deletions lib/vector-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ pub use vector_buffers as buffers;
#[cfg(feature = "test")]
pub use vector_common::event_test_util;
pub use vector_common::{
Error, NamedInternalEvent, Result, TimeZone, assert_event_data_eq, btreemap, byte_size_of,
byte_size_of::ByteSizeOf, conversion, encode_logfmt, finalization, finalizer, id,
Error, NamedInternalEvent, Result, TimeZone, assert_event_data_eq, atomic, btreemap,
byte_size_of, byte_size_of::ByteSizeOf, conversion, encode_logfmt, finalization, finalizer, id,
impl_event_data_eq, internal_event, json_size, registered_event, request_metadata,
sensitive_string, shutdown, trigger,
sensitive_string, shutdown, stats, trigger,
};
pub use vector_config as configurable;
pub use vector_config::impl_generate_config_from_default;
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ pub(crate) mod sink_ext;
pub mod sinks;
#[allow(unreachable_pub)]
pub mod sources;
pub mod stats;
#[cfg(feature = "api-client")]
#[allow(unreachable_pub)]
pub mod tap;
Expand Down
Loading
Loading