diff --git a/changelog.d/utilization_ewma_alpha_configuration.enhancement.md b/changelog.d/utilization_ewma_alpha_configuration.enhancement.md new file mode 100644 index 0000000000000..1f5c84bb9f515 --- /dev/null +++ b/changelog.d/utilization_ewma_alpha_configuration.enhancement.md @@ -0,0 +1,6 @@ +Added `buffer_utilization_ewma_alpha` configuration option to the global +options, allowing users to control the alpha value for the exponentially +weighted moving average (EWMA) used in source and transform buffer utilization +metrics. + +authors: bruceg diff --git a/lib/vector-buffers/src/topology/builder.rs b/lib/vector-buffers/src/topology/builder.rs index c4a42c30fab1f..0f54759c3bc65 100644 --- a/lib/vector-buffers/src/topology/builder.rs +++ b/lib/vector-buffers/src/topology/builder.rs @@ -191,12 +191,13 @@ impl TopologyBuilder { when_full: WhenFull, receiver_span: &Span, metadata: Option, + ewma_alpha: Option, ) -> (BufferSender, BufferReceiver) { let usage_handle = BufferUsageHandle::noop(); usage_handle.set_buffer_limits(None, Some(max_events.get())); let limit = MemoryBufferSize::MaxEvents(max_events); - let (sender, receiver) = limited(limit, metadata); + let (sender, receiver) = limited(limit, metadata, ewma_alpha); let mode = match when_full { WhenFull::Overflow => WhenFull::Block, @@ -232,7 +233,7 @@ impl TopologyBuilder { usage_handle.set_buffer_limits(None, Some(max_events.get())); let limit = MemoryBufferSize::MaxEvents(max_events); - let (sender, receiver) = limited(limit, metadata); + let (sender, receiver) = limited(limit, metadata, None); let mode = match when_full { WhenFull::Overflow => WhenFull::Block, diff --git a/lib/vector-buffers/src/topology/channel/limited_queue.rs b/lib/vector-buffers/src/topology/channel/limited_queue.rs index 0c47064dacc94..cbb502b748dbd 100644 --- a/lib/vector-buffers/src/topology/channel/limited_queue.rs +++ b/lib/vector-buffers/src/topology/channel/limited_queue.rs @@ -23,7 +23,7 @@ use crate::{InMemoryBufferable, 
config::MemoryBufferSize}; /// The alpha value for the Exponentially Weighted Moving Average (EWMA) calculation. This is a /// measure of how much weight to give to the current value versus the previous values. A value of /// 0.9 results in a "half life" of 6-7 measurements. -const EWMA_ALPHA: f64 = 0.9; +pub const DEFAULT_EWMA_ALPHA: f64 = 0.9; /// Error returned by `LimitedSender::send` when the receiver has disconnected. #[derive(Debug, PartialEq, Eq)] @@ -127,7 +127,11 @@ struct Metrics { impl Metrics { #[expect(clippy::cast_precision_loss)] // We have to convert buffer sizes for a gauge, it's okay to lose precision here. - fn new(limit: MemoryBufferSize, metadata: ChannelMetricMetadata) -> Self { + fn new( + limit: MemoryBufferSize, + metadata: ChannelMetricMetadata, + ewma_alpha: Option, + ) -> Self { let ChannelMetricMetadata { prefix, output } = metadata; let (gauge_suffix, max_value) = match limit { MemoryBufferSize::MaxEvents(max_events) => ("_max_event_size", max_events.get() as f64), @@ -137,7 +141,7 @@ impl Metrics { let histogram_name = format!("{prefix}_utilization"); let gauge_name = format!("{prefix}_utilization_level"); let mean_name = format!("{prefix}_utilization_mean"); - let ewma = Arc::new(AtomicEwma::new(EWMA_ALPHA)); + let ewma = Arc::new(AtomicEwma::new(ewma_alpha.unwrap_or(DEFAULT_EWMA_ALPHA))); #[cfg(test)] let recorded_values = Arc::new(Mutex::new(Vec::new())); if let Some(label_value) = output { @@ -202,9 +206,13 @@ impl Clone for Inner { } impl Inner { - fn new(limit: MemoryBufferSize, metric_metadata: Option) -> Self { + fn new( + limit: MemoryBufferSize, + metric_metadata: Option, + ewma_alpha: Option, + ) -> Self { let read_waker = Arc::new(Notify::new()); - let metrics = metric_metadata.map(|metadata| Metrics::new(limit, metadata)); + let metrics = metric_metadata.map(|metadata| Metrics::new(limit, metadata, ewma_alpha)); match limit { MemoryBufferSize::MaxEvents(max_events) => Inner { data: 
Arc::new(ArrayQueue::new(max_events.get())), @@ -397,8 +405,9 @@ impl Drop for LimitedReceiver { pub fn limited( limit: MemoryBufferSize, metric_metadata: Option, + ewma_alpha: Option, ) -> (LimitedSender, LimitedReceiver) { - let inner = Inner::new(limit, metric_metadata); + let inner = Inner::new(limit, metric_metadata, ewma_alpha); let sender = LimitedSender { inner: inner.clone(), @@ -426,7 +435,7 @@ mod tests { #[tokio::test] async fn send_receive() { let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap()); - let (mut tx, mut rx) = limited(limit, None); + let (mut tx, mut rx) = limited(limit, None, None); assert_eq!(2, tx.available_capacity()); @@ -458,6 +467,7 @@ mod tests { let (mut tx, mut rx) = limited( limit, Some(ChannelMetricMetadata::new("test_channel", None)), + None, ); let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone(); @@ -477,7 +487,7 @@ mod tests { // With this configuration a maximum of exactly 10 messages can fit in the channel let limit = MemoryBufferSize::MaxSize(NonZeroUsize::new(max_allowed_bytes).unwrap()); - let (mut tx, mut rx) = limited(limit, None); + let (mut tx, mut rx) = limited(limit, None, None); assert_eq!(max_allowed_bytes, tx.available_capacity()); @@ -511,7 +521,7 @@ mod tests { #[test] fn sender_waits_for_more_capacity_when_none_available() { let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap()); - let (mut tx, mut rx) = limited(limit, None); + let (mut tx, mut rx) = limited(limit, None, None); assert_eq!(1, tx.available_capacity()); @@ -573,7 +583,7 @@ mod tests { #[test] fn sender_waits_for_more_capacity_when_partial_available() { let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(7).unwrap()); - let (mut tx, mut rx) = limited(limit, None); + let (mut tx, mut rx) = limited(limit, None, None); assert_eq!(7, tx.available_capacity()); @@ -662,7 +672,7 @@ mod tests { #[test] fn empty_receiver_returns_none_when_last_sender_drops() { let limit = 
MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap()); - let (mut tx, mut rx) = limited(limit, None); + let (mut tx, mut rx) = limited(limit, None, None); assert_eq!(1, tx.available_capacity()); @@ -705,7 +715,7 @@ mod tests { #[test] fn receiver_returns_none_once_empty_when_last_sender_drops() { let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap()); - let (tx, mut rx) = limited::(limit, None); + let (tx, mut rx) = limited::(limit, None, None); assert_eq!(1, tx.available_capacity()); @@ -735,7 +745,7 @@ mod tests { #[test] fn oversized_send_allowed_when_empty() { let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap()); - let (mut tx, mut rx) = limited(limit, None); + let (mut tx, mut rx) = limited(limit, None, None); assert_eq!(1, tx.available_capacity()); @@ -768,7 +778,7 @@ mod tests { #[test] fn oversized_send_allowed_when_partial_capacity() { let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap()); - let (mut tx, mut rx) = limited(limit, None); + let (mut tx, mut rx) = limited(limit, None, None); assert_eq!(2, tx.available_capacity()); diff --git a/lib/vector-buffers/src/topology/channel/mod.rs b/lib/vector-buffers/src/topology/channel/mod.rs index 303abee288cc4..8e030c622a0ef 100644 --- a/lib/vector-buffers/src/topology/channel/mod.rs +++ b/lib/vector-buffers/src/topology/channel/mod.rs @@ -3,7 +3,7 @@ mod receiver; mod sender; pub use limited_queue::{ - ChannelMetricMetadata, LimitedReceiver, LimitedSender, SendError, limited, + ChannelMetricMetadata, DEFAULT_EWMA_ALPHA, LimitedReceiver, LimitedSender, SendError, limited, }; pub use receiver::*; pub use sender::*; diff --git a/lib/vector-buffers/src/variants/in_memory.rs b/lib/vector-buffers/src/variants/in_memory.rs index f6bbe87c15b29..93937a591b133 100644 --- a/lib/vector-buffers/src/variants/in_memory.rs +++ b/lib/vector-buffers/src/variants/in_memory.rs @@ -45,7 +45,7 @@ where usage_handle.set_buffer_limits(max_bytes, max_size); - let (tx, rx) = 
limited(self.capacity, None); + let (tx, rx) = limited(self.capacity, None, None); Ok((tx.into(), rx.into())) } } diff --git a/lib/vector-core/src/config/global_options.rs b/lib/vector-core/src/config/global_options.rs index e789e8ae973ab..93ad1d80f5934 100644 --- a/lib/vector-core/src/config/global_options.rs +++ b/lib/vector-core/src/config/global_options.rs @@ -140,6 +140,19 @@ pub struct GlobalOptions { #[serde(skip_serializing_if = "crate::serde::is_default")] pub expire_metrics_per_metric_set: Option>, + /// The alpha value for the exponential weighted moving average (EWMA) of source and transform + /// buffer utilization metrics. + /// + /// This value specifies how much of the existing value is retained when each update is made. + /// Values closer to 1.0 result in the value adjusting slower to changes. The default value of + /// 0.9 is equivalent to a "half life" of 6-7 measurements. + /// + /// Must be between 0 and 1 exclusive (0 < alpha < 1). + #[serde(default, skip_serializing_if = "crate::serde::is_default")] + #[configurable(validation(range(min = 0.0, max = 1.0)))] + #[configurable(metadata(docs::advanced))] + pub buffer_utilization_ewma_alpha: Option, + /// The interval, in seconds, at which the internal metrics cache for VRL is refreshed. /// This must be set to be able to access metrics in VRL functions. 
/// @@ -295,6 +308,9 @@ impl GlobalOptions { expire_metrics: self.expire_metrics.or(with.expire_metrics), expire_metrics_secs: self.expire_metrics_secs.or(with.expire_metrics_secs), expire_metrics_per_metric_set: merged_expire_metrics_per_metric_set, + buffer_utilization_ewma_alpha: self + .buffer_utilization_ewma_alpha + .or(with.buffer_utilization_ewma_alpha), metrics_storage_refresh_period: self .metrics_storage_refresh_period .or(with.metrics_storage_refresh_period), diff --git a/lib/vector-core/src/fanout.rs b/lib/vector-core/src/fanout.rs index d8ec7e57a11ad..0fd5cb034b703 100644 --- a/lib/vector-core/src/fanout.rs +++ b/lib/vector-core/src/fanout.rs @@ -490,6 +490,7 @@ mod tests { WhenFull::Block, &Span::current(), None, + None, ) } diff --git a/lib/vector-core/src/source_sender/builder.rs b/lib/vector-core/src/source_sender/builder.rs index 452d890f0d863..1992dc531ee9a 100644 --- a/lib/vector-core/src/source_sender/builder.rs +++ b/lib/vector-core/src/source_sender/builder.rs @@ -13,6 +13,7 @@ pub struct Builder { named_outputs: HashMap, lag_time: Option, timeout: Option, + ewma_alpha: Option, } impl Default for Builder { @@ -23,6 +24,7 @@ impl Default for Builder { named_outputs: Default::default(), lag_time: Some(histogram!(LAG_TIME_NAME)), timeout: None, + ewma_alpha: None, } } } @@ -40,6 +42,12 @@ impl Builder { self } + #[must_use] + pub fn with_ewma_alpha(mut self, alpha: Option) -> Self { + self.ewma_alpha = alpha; + self + } + pub fn add_source_output( &mut self, output: SourceOutput, @@ -60,6 +68,7 @@ impl Builder { log_definition, output_id, self.timeout, + self.ewma_alpha, ); self.default_output = Some(output); rx @@ -72,6 +81,7 @@ impl Builder { log_definition, output_id, self.timeout, + self.ewma_alpha, ); self.named_outputs.insert(name, output); rx diff --git a/lib/vector-core/src/source_sender/output.rs b/lib/vector-core/src/source_sender/output.rs index 5e470c0231d94..4b9f29efc1f58 100644 --- a/lib/vector-core/src/source_sender/output.rs +++ 
b/lib/vector-core/src/source_sender/output.rs @@ -115,10 +115,11 @@ impl Output { log_definition: Option>, output_id: OutputId, timeout: Option, + ewma_alpha: Option, ) -> (Self, LimitedReceiver) { let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(n).unwrap()); let metrics = ChannelMetricMetadata::new(UTILIZATION_METRIC_PREFIX, Some(output.clone())); - let (tx, rx) = channel::limited(limit, Some(metrics)); + let (tx, rx) = channel::limited(limit, Some(metrics), ewma_alpha); ( Self { sender: tx, diff --git a/lib/vector-core/src/source_sender/sender.rs b/lib/vector-core/src/source_sender/sender.rs index 8bbf09404e755..88cb50172c70a 100644 --- a/lib/vector-core/src/source_sender/sender.rs +++ b/lib/vector-core/src/source_sender/sender.rs @@ -119,6 +119,7 @@ impl SourceSender { None, output_id, timeout, + None, ); ( Self { @@ -192,7 +193,7 @@ impl SourceSender { port: Some(name.clone()), }; let (output, recv) = - Output::new_with_buffer(100, name.clone(), None, None, output_id, None); + Output::new_with_buffer(100, name.clone(), None, None, output_id, None, None); let recv = recv.into_stream().map(move |mut item| { item.events.iter_events_mut().for_each(|mut event| { let metadata = event.metadata_mut(); diff --git a/lib/vector-tap/src/controller.rs b/lib/vector-tap/src/controller.rs index f6e53b07691b3..dba9a7ec7b885 100644 --- a/lib/vector-tap/src/controller.rs +++ b/lib/vector-tap/src/controller.rs @@ -361,6 +361,7 @@ async fn tap_handler( WhenFull::DropNewest, &Span::current(), None, + None, ); let mut tap_transformer = TapTransformer::new(tx.clone(), output.clone()); diff --git a/src/config/compiler.rs b/src/config/compiler.rs index 734fd18c98205..ecb90f7e36f15 100644 --- a/src/config/compiler.rs +++ b/src/config/compiler.rs @@ -36,6 +36,10 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec), Vec< errors.extend(output_errors); } + if let Err(alpha_errors) = validation::check_buffer_utilization_ewma_alpha(&builder) { + 
errors.extend(alpha_errors); + } + let ConfigBuilder { global, #[cfg(feature = "api")] diff --git a/src/config/validation.rs b/src/config/validation.rs index 0878eedd5948a..0b629ea9088a6 100644 --- a/src/config/validation.rs +++ b/src/config/validation.rs @@ -11,6 +11,14 @@ use super::{ }; use crate::config::schema; +/// Minimum value (exclusive) for `buffer_utilization_ewma_alpha`. +/// The alpha value must be strictly greater than this value. +const EWMA_ALPHA_MIN: f64 = 0.0; + +/// Maximum value (exclusive) for `buffer_utilization_ewma_alpha`. +/// The alpha value must be strictly less than this value. +const EWMA_ALPHA_MAX: f64 = 1.0; + /// Check that provide + topology config aren't present in the same builder, which is an error. pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec> { if config.provider.is_some() @@ -147,6 +155,20 @@ pub fn check_resources(config: &ConfigBuilder) -> Result<(), Vec> { } } +/// Validates that `buffer_utilization_ewma_alpha` value is within the valid range (0 < alpha < 1) +/// for the global configuration. 
+pub fn check_buffer_utilization_ewma_alpha(config: &ConfigBuilder) -> Result<(), Vec> { + if let Some(alpha) = config.global.buffer_utilization_ewma_alpha + && (alpha.is_nan() || alpha <= EWMA_ALPHA_MIN || alpha >= EWMA_ALPHA_MAX) // NaN compares false against both bounds, so reject it explicitly + { + Err(vec![format!( + "Global `buffer_utilization_ewma_alpha` must be between 0 and 1 exclusive (0 < alpha < 1), got {alpha}" + )]) + } else { + Ok(()) + } +} + /// To avoid collisions between `output` metric tags, check that a component /// does not have a named output with the name [`DEFAULT_OUTPUT`] pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec> { diff --git a/src/test_util/mock/sources/basic.rs b/src/test_util/mock/sources/basic.rs index 614f5f7387d43..c1c09477fcc38 100644 --- a/src/test_util/mock/sources/basic.rs +++ b/src/test_util/mock/sources/basic.rs @@ -46,7 +46,7 @@ pub struct BasicSourceConfig { impl Default for BasicSourceConfig { fn default() -> Self { let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1000).unwrap()); - let (_, receiver) = limited(limit, None); + let (_, receiver) = limited(limit, None, None); Self { receiver: Arc::new(Mutex::new(Some(receiver))), event_counter: None, diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 10336af293597..ba19f1dc61d78 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -265,7 +265,8 @@ impl<'a> Builder<'a> { let mut builder = SourceSender::builder() .with_buffer(*SOURCE_SENDER_BUFFER_SIZE) - .with_timeout(source.inner.send_timeout()); + .with_timeout(source.inner.send_timeout()) + .with_ewma_alpha(self.config.global.buffer_utilization_ewma_alpha); let mut pumps = Vec::new(); let mut controls = HashMap::new(); let mut schema_definitions = HashMap::with_capacity(source_outputs.len()); @@ -528,6 +529,7 @@ impl<'a> Builder<'a> { WhenFull::Block, &span, Some(metrics), + self.config.global.buffer_utilization_ewma_alpha, ); self.inputs diff --git a/website/cue/reference/generated/configuration.cue 
b/website/cue/reference/generated/configuration.cue index 2e33a82d85193..6bf097aa7c52c 100644 --- a/website/cue/reference/generated/configuration.cue +++ b/website/cue/reference/generated/configuration.cue @@ -701,6 +701,20 @@ generated: configuration: configuration: { type: bool: {} } } + buffer_utilization_ewma_alpha: { + description: """ + The alpha value for the exponential weighted moving average (EWMA) of source and transform + buffer utilization metrics. + + This value specifies how much of the existing value is retained when each update is made. + Values closer to 1.0 result in the value adjusting slower to changes. The default value of + 0.9 is equivalent to a "half life" of 6-7 measurements. + + Must be between 0 and 1 exclusive (0 < alpha < 1). + """ + required: false + type: float: {} + } data_dir: { common: false description: """