Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Added `buffer_utilization_ewma_alpha` configuration option to the global
options, allowing users to control the alpha value for the exponentially
weighted moving average (EWMA) used in source and transform buffer utilization
metrics.

authors: bruceg
5 changes: 3 additions & 2 deletions lib/vector-buffers/src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,13 @@ impl<T: Bufferable> TopologyBuilder<T> {
when_full: WhenFull,
receiver_span: &Span,
metadata: Option<ChannelMetricMetadata>,
ewma_alpha: Option<f64>,
) -> (BufferSender<T>, BufferReceiver<T>) {
let usage_handle = BufferUsageHandle::noop();
usage_handle.set_buffer_limits(None, Some(max_events.get()));

let limit = MemoryBufferSize::MaxEvents(max_events);
let (sender, receiver) = limited(limit, metadata);
let (sender, receiver) = limited(limit, metadata, ewma_alpha);

let mode = match when_full {
WhenFull::Overflow => WhenFull::Block,
Expand Down Expand Up @@ -232,7 +233,7 @@ impl<T: Bufferable> TopologyBuilder<T> {
usage_handle.set_buffer_limits(None, Some(max_events.get()));

let limit = MemoryBufferSize::MaxEvents(max_events);
let (sender, receiver) = limited(limit, metadata);
let (sender, receiver) = limited(limit, metadata, None);

let mode = match when_full {
WhenFull::Overflow => WhenFull::Block,
Expand Down
38 changes: 24 additions & 14 deletions lib/vector-buffers/src/topology/channel/limited_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{InMemoryBufferable, config::MemoryBufferSize};
/// The alpha value for the Exponentially Weighted Moving Average (EWMA) calculation. This is a
/// measure of how much weight to give to the current value versus the previous values. A value of
/// 0.9 results in a "half life" of 6-7 measurements.
const EWMA_ALPHA: f64 = 0.9;
pub const DEFAULT_EWMA_ALPHA: f64 = 0.9;

/// Error returned by `LimitedSender::send` when the receiver has disconnected.
#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -127,7 +127,11 @@ struct Metrics {

impl Metrics {
#[expect(clippy::cast_precision_loss)] // We have to convert buffer sizes for a gauge, it's okay to lose precision here.
fn new(limit: MemoryBufferSize, metadata: ChannelMetricMetadata) -> Self {
fn new(
limit: MemoryBufferSize,
metadata: ChannelMetricMetadata,
ewma_alpha: Option<f64>,
) -> Self {
let ChannelMetricMetadata { prefix, output } = metadata;
let (gauge_suffix, max_value) = match limit {
MemoryBufferSize::MaxEvents(max_events) => ("_max_event_size", max_events.get() as f64),
Expand All @@ -137,7 +141,7 @@ impl Metrics {
let histogram_name = format!("{prefix}_utilization");
let gauge_name = format!("{prefix}_utilization_level");
let mean_name = format!("{prefix}_utilization_mean");
let ewma = Arc::new(AtomicEwma::new(EWMA_ALPHA));
let ewma = Arc::new(AtomicEwma::new(ewma_alpha.unwrap_or(DEFAULT_EWMA_ALPHA)));
#[cfg(test)]
let recorded_values = Arc::new(Mutex::new(Vec::new()));
if let Some(label_value) = output {
Expand Down Expand Up @@ -202,9 +206,13 @@ impl<T> Clone for Inner<T> {
}

impl<T: InMemoryBufferable> Inner<T> {
fn new(limit: MemoryBufferSize, metric_metadata: Option<ChannelMetricMetadata>) -> Self {
fn new(
limit: MemoryBufferSize,
metric_metadata: Option<ChannelMetricMetadata>,
ewma_alpha: Option<f64>,
) -> Self {
let read_waker = Arc::new(Notify::new());
let metrics = metric_metadata.map(|metadata| Metrics::new(limit, metadata));
let metrics = metric_metadata.map(|metadata| Metrics::new(limit, metadata, ewma_alpha));
match limit {
MemoryBufferSize::MaxEvents(max_events) => Inner {
data: Arc::new(ArrayQueue::new(max_events.get())),
Expand Down Expand Up @@ -397,8 +405,9 @@ impl<T> Drop for LimitedReceiver<T> {
pub fn limited<T: InMemoryBufferable + fmt::Debug>(
limit: MemoryBufferSize,
metric_metadata: Option<ChannelMetricMetadata>,
ewma_alpha: Option<f64>,
) -> (LimitedSender<T>, LimitedReceiver<T>) {
let inner = Inner::new(limit, metric_metadata);
let inner = Inner::new(limit, metric_metadata, ewma_alpha);

let sender = LimitedSender {
inner: inner.clone(),
Expand Down Expand Up @@ -426,7 +435,7 @@ mod tests {
#[tokio::test]
async fn send_receive() {
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
let (mut tx, mut rx) = limited(limit, None);
let (mut tx, mut rx) = limited(limit, None, None);

assert_eq!(2, tx.available_capacity());

Expand Down Expand Up @@ -458,6 +467,7 @@ mod tests {
let (mut tx, mut rx) = limited(
limit,
Some(ChannelMetricMetadata::new("test_channel", None)),
None,
);

let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
Expand All @@ -477,7 +487,7 @@ mod tests {

// With this configuration a maximum of exactly 10 messages can fit in the channel
let limit = MemoryBufferSize::MaxSize(NonZeroUsize::new(max_allowed_bytes).unwrap());
let (mut tx, mut rx) = limited(limit, None);
let (mut tx, mut rx) = limited(limit, None, None);

assert_eq!(max_allowed_bytes, tx.available_capacity());

Expand Down Expand Up @@ -511,7 +521,7 @@ mod tests {
#[test]
fn sender_waits_for_more_capacity_when_none_available() {
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
let (mut tx, mut rx) = limited(limit, None);
let (mut tx, mut rx) = limited(limit, None, None);

assert_eq!(1, tx.available_capacity());

Expand Down Expand Up @@ -573,7 +583,7 @@ mod tests {
#[test]
fn sender_waits_for_more_capacity_when_partial_available() {
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(7).unwrap());
let (mut tx, mut rx) = limited(limit, None);
let (mut tx, mut rx) = limited(limit, None, None);

assert_eq!(7, tx.available_capacity());

Expand Down Expand Up @@ -662,7 +672,7 @@ mod tests {
#[test]
fn empty_receiver_returns_none_when_last_sender_drops() {
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
let (mut tx, mut rx) = limited(limit, None);
let (mut tx, mut rx) = limited(limit, None, None);

assert_eq!(1, tx.available_capacity());

Expand Down Expand Up @@ -705,7 +715,7 @@ mod tests {
#[test]
fn receiver_returns_none_once_empty_when_last_sender_drops() {
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
let (tx, mut rx) = limited::<Sample>(limit, None);
let (tx, mut rx) = limited::<Sample>(limit, None, None);

assert_eq!(1, tx.available_capacity());

Expand Down Expand Up @@ -735,7 +745,7 @@ mod tests {
#[test]
fn oversized_send_allowed_when_empty() {
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
let (mut tx, mut rx) = limited(limit, None);
let (mut tx, mut rx) = limited(limit, None, None);

assert_eq!(1, tx.available_capacity());

Expand Down Expand Up @@ -768,7 +778,7 @@ mod tests {
#[test]
fn oversized_send_allowed_when_partial_capacity() {
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
let (mut tx, mut rx) = limited(limit, None);
let (mut tx, mut rx) = limited(limit, None, None);

assert_eq!(2, tx.available_capacity());

Expand Down
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/topology/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod receiver;
mod sender;

pub use limited_queue::{
ChannelMetricMetadata, LimitedReceiver, LimitedSender, SendError, limited,
ChannelMetricMetadata, DEFAULT_EWMA_ALPHA, LimitedReceiver, LimitedSender, SendError, limited,
};
pub use receiver::*;
pub use sender::*;
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/variants/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where

usage_handle.set_buffer_limits(max_bytes, max_size);

let (tx, rx) = limited(self.capacity, None);
let (tx, rx) = limited(self.capacity, None, None);
Ok((tx.into(), rx.into()))
}
}
16 changes: 16 additions & 0 deletions lib/vector-core/src/config/global_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,19 @@ pub struct GlobalOptions {
#[serde(skip_serializing_if = "crate::serde::is_default")]
pub expire_metrics_per_metric_set: Option<Vec<PerMetricSetExpiration>>,

/// The alpha value for the exponential weighted moving average (EWMA) of source and transform
/// buffer utilization metrics.
///
/// This value specifies how much of the existing value is retained when each update is made.
/// Values closer to 1.0 result in the value adjusting slower to changes. The default value of
/// 0.9 is equivalent to a "half life" of 6-7 measurements.
///
/// Must be between 0 and 1 exclusive (0 < alpha < 1).
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
#[configurable(validation(range(min = 0.0, max = 1.0)))]
#[configurable(metadata(docs::advanced))]
pub buffer_utilization_ewma_alpha: Option<f64>,

/// The interval, in seconds, at which the internal metrics cache for VRL is refreshed.
/// This must be set to be able to access metrics in VRL functions.
///
Expand Down Expand Up @@ -295,6 +308,9 @@ impl GlobalOptions {
expire_metrics: self.expire_metrics.or(with.expire_metrics),
expire_metrics_secs: self.expire_metrics_secs.or(with.expire_metrics_secs),
expire_metrics_per_metric_set: merged_expire_metrics_per_metric_set,
buffer_utilization_ewma_alpha: self
.buffer_utilization_ewma_alpha
.or(with.buffer_utilization_ewma_alpha),
metrics_storage_refresh_period: self
.metrics_storage_refresh_period
.or(with.metrics_storage_refresh_period),
Expand Down
1 change: 1 addition & 0 deletions lib/vector-core/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ mod tests {
WhenFull::Block,
&Span::current(),
None,
None,
)
}

Expand Down
10 changes: 10 additions & 0 deletions lib/vector-core/src/source_sender/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct Builder {
named_outputs: HashMap<String, Output>,
lag_time: Option<Histogram>,
timeout: Option<Duration>,
ewma_alpha: Option<f64>,
}

impl Default for Builder {
Expand All @@ -23,6 +24,7 @@ impl Default for Builder {
named_outputs: Default::default(),
lag_time: Some(histogram!(LAG_TIME_NAME)),
timeout: None,
ewma_alpha: None,
}
}
}
Expand All @@ -40,6 +42,12 @@ impl Builder {
self
}

#[must_use]
pub fn with_ewma_alpha(mut self, alpha: Option<f64>) -> Self {
self.ewma_alpha = alpha;
self
}

pub fn add_source_output(
&mut self,
output: SourceOutput,
Expand All @@ -60,6 +68,7 @@ impl Builder {
log_definition,
output_id,
self.timeout,
self.ewma_alpha,
);
self.default_output = Some(output);
rx
Expand All @@ -72,6 +81,7 @@ impl Builder {
log_definition,
output_id,
self.timeout,
self.ewma_alpha,
);
self.named_outputs.insert(name, output);
rx
Expand Down
3 changes: 2 additions & 1 deletion lib/vector-core/src/source_sender/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,11 @@ impl Output {
log_definition: Option<Arc<Definition>>,
output_id: OutputId,
timeout: Option<Duration>,
ewma_alpha: Option<f64>,
) -> (Self, LimitedReceiver<SourceSenderItem>) {
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(n).unwrap());
let metrics = ChannelMetricMetadata::new(UTILIZATION_METRIC_PREFIX, Some(output.clone()));
let (tx, rx) = channel::limited(limit, Some(metrics));
let (tx, rx) = channel::limited(limit, Some(metrics), ewma_alpha);
(
Self {
sender: tx,
Expand Down
3 changes: 2 additions & 1 deletion lib/vector-core/src/source_sender/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl SourceSender {
None,
output_id,
timeout,
None,
);
(
Self {
Expand Down Expand Up @@ -192,7 +193,7 @@ impl SourceSender {
port: Some(name.clone()),
};
let (output, recv) =
Output::new_with_buffer(100, name.clone(), None, None, output_id, None);
Output::new_with_buffer(100, name.clone(), None, None, output_id, None, None);
let recv = recv.into_stream().map(move |mut item| {
item.events.iter_events_mut().for_each(|mut event| {
let metadata = event.metadata_mut();
Expand Down
1 change: 1 addition & 0 deletions lib/vector-tap/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ async fn tap_handler(
WhenFull::DropNewest,
&Span::current(),
None,
None,
);
let mut tap_transformer = TapTransformer::new(tx.clone(), output.clone());

Expand Down
4 changes: 4 additions & 0 deletions src/config/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<
errors.extend(output_errors);
}

if let Err(alpha_errors) = validation::check_buffer_utilization_ewma_alpha(&builder) {
errors.extend(alpha_errors);
}

let ConfigBuilder {
global,
#[cfg(feature = "api")]
Expand Down
22 changes: 22 additions & 0 deletions src/config/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ use super::{
};
use crate::config::schema;

/// Minimum value (exclusive) for `utilization_ewma_alpha`.
/// The alpha value must be strictly greater than this value.
const EWMA_ALPHA_MIN: f64 = 0.0;

/// Maximum value (exclusive) for `utilization_ewma_alpha`.
/// The alpha value must be strictly less than this value.
const EWMA_ALPHA_MAX: f64 = 1.0;

/// Check that provide + topology config aren't present in the same builder, which is an error.
pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec<String>> {
if config.provider.is_some()
Expand Down Expand Up @@ -147,6 +155,20 @@ pub fn check_resources(config: &ConfigBuilder) -> Result<(), Vec<String>> {
}
}

/// Validates that `buffer_utilization_ewma_alpha` value is within the valid range (0 < alpha < 1)
/// for the global configuration.
pub fn check_buffer_utilization_ewma_alpha(config: &ConfigBuilder) -> Result<(), Vec<String>> {
if let Some(alpha) = config.global.buffer_utilization_ewma_alpha
&& (alpha <= EWMA_ALPHA_MIN || alpha >= EWMA_ALPHA_MAX)
{
Err(vec![format!(
"Global `buffer_utilization_ewma_alpha` must be between 0 and 1 exclusive (0 < alpha < 1), got {alpha}"
)])
} else {
Ok(())
}
}

/// To avoid collisions between `output` metric tags, check that a component
/// does not have a named output with the name [`DEFAULT_OUTPUT`]
pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
Expand Down
2 changes: 1 addition & 1 deletion src/test_util/mock/sources/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct BasicSourceConfig {
impl Default for BasicSourceConfig {
fn default() -> Self {
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1000).unwrap());
let (_, receiver) = limited(limit, None);
let (_, receiver) = limited(limit, None, None);
Self {
receiver: Arc::new(Mutex::new(Some(receiver))),
event_counter: None,
Expand Down
4 changes: 3 additions & 1 deletion src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ impl<'a> Builder<'a> {

let mut builder = SourceSender::builder()
.with_buffer(*SOURCE_SENDER_BUFFER_SIZE)
.with_timeout(source.inner.send_timeout());
.with_timeout(source.inner.send_timeout())
.with_ewma_alpha(self.config.global.buffer_utilization_ewma_alpha);
let mut pumps = Vec::new();
let mut controls = HashMap::new();
let mut schema_definitions = HashMap::with_capacity(source_outputs.len());
Expand Down Expand Up @@ -528,6 +529,7 @@ impl<'a> Builder<'a> {
WhenFull::Block,
&span,
Some(metrics),
self.config.global.buffer_utilization_ewma_alpha,
);

self.inputs
Expand Down
Loading
Loading