Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ serial_test = { version = "3.2" }
[dependencies]
cfg-if.workspace = true
clap.workspace = true
dashmap.workspace = true
indoc.workspace = true
paste.workspace = true
pin-project.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions changelog.d/event_processing_time_metrics.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Added the `event_processing_time_seconds` histogram and `event_processing_time_mean_seconds` gauge internal metrics,
exposing the total time events spend between the originating source and final sink in a topology.

authors: bruceg
10 changes: 4 additions & 6 deletions docs/tutorials/lognamespacing.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ separate namespace to the event data.

The actual value to be placed into the field.

For the ingest timestamp this will be `chrono::Utc::now()`. Source type will be
For the ingest timestamp this should be recorded on the event metadata via
`log.metadata_mut().set_ingest_timestamp(chrono::Utc::now())`. Source type will be
the `NAME` property of the `Config` struct. `NAME` is provided by the
`configurable_component` macro. You may need to include `use vector_config::NamedComponent;`.

Expand All @@ -127,11 +128,8 @@ insert both these fields into the Vector namespace:

```rust

log_namespace.insert_standard_vector_source_metadata(
log,
KafkaSourceConfig::NAME,
Utc::now(),
);
log.metadata_mut().set_ingest_timestamp(Utc::now());
log_namespace.insert_standard_vector_source_metadata(log, KafkaSourceConfig::NAME);
```

### Source Metadata
Expand Down
7 changes: 7 additions & 0 deletions lib/vector-buffers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ pub trait Bufferable: InMemoryBufferable + Encodable {}
// Blanket implementation for anything that is already bufferable.
impl<T> Bufferable for T where T: InMemoryBufferable + Encodable {}

/// Hook for observing items as they are sent into a `BufferSender`.
///
/// A hook is installed with `BufferSender::with_custom_instrumentation`, which keeps it as an
/// `Arc<dyn BufferInstrumentation<T>>` for the lifetime of the sender.
pub trait BufferInstrumentation<T: Bufferable>: Send + Sync + 'static {
/// Called immediately before the item is emitted to the underlying buffer.
///
/// The hook only receives a shared reference to the item being sent.
/// The underlying type is stored in an `Arc`, so we cannot have `&mut self`; implementations
/// that need to accumulate state across calls must use interior mutability.
fn on_send(&self, item: &T);
}

pub trait EventCount {
fn event_count(&self) -> usize;
}
Expand Down
22 changes: 7 additions & 15 deletions lib/vector-buffers/src/topology/channel/limited_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,10 @@ use crossbeam_queue::{ArrayQueue, SegQueue};
use futures::Stream;
use metrics::{Gauge, Histogram, gauge, histogram};
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
use vector_common::stats::AtomicEwma;
use vector_common::stats::EwmaGauge;

use crate::{InMemoryBufferable, config::MemoryBufferSize};

/// The alpha value for the Exponentially Weighted Moving Average (EWMA) calculation. This is a
/// measure of how much weight to give to the current value versus the previous values. A value of
/// 0.9 results in a "half life" of 6-7 measurements.
pub const DEFAULT_EWMA_ALPHA: f64 = 0.9;

/// Error returned by `LimitedSender::send` when the receiver has disconnected.
#[derive(Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);
Expand Down Expand Up @@ -114,8 +109,7 @@ impl ChannelMetricMetadata {
struct Metrics {
histogram: Histogram,
gauge: Gauge,
mean_gauge: Gauge,
ewma: Arc<AtomicEwma>,
mean_gauge: EwmaGauge,
// We hold a handle to the max gauge to avoid it being dropped by the metrics collector, but
// since the value is static, we never need to update it. The compiler detects this as an unused
// field, so we need to suppress the warning here.
Expand All @@ -141,30 +135,29 @@ impl Metrics {
let histogram_name = format!("{prefix}_utilization");
let gauge_name = format!("{prefix}_utilization_level");
let mean_name = format!("{prefix}_utilization_mean");
let ewma = Arc::new(AtomicEwma::new(ewma_alpha.unwrap_or(DEFAULT_EWMA_ALPHA)));
#[cfg(test)]
let recorded_values = Arc::new(Mutex::new(Vec::new()));
if let Some(label_value) = output {
let max_gauge = gauge!(max_gauge_name, "output" => label_value.clone());
max_gauge.set(max_value);
let mean_gauge_handle = gauge!(mean_name, "output" => label_value.clone());
Self {
histogram: histogram!(histogram_name, "output" => label_value.clone()),
gauge: gauge!(gauge_name, "output" => label_value.clone()),
mean_gauge: gauge!(mean_name, "output" => label_value.clone()),
mean_gauge: EwmaGauge::new(mean_gauge_handle, ewma_alpha),
max_gauge,
ewma,
#[cfg(test)]
recorded_values,
}
} else {
let max_gauge = gauge!(max_gauge_name);
max_gauge.set(max_value);
let mean_gauge_handle = gauge!(mean_name);
Self {
histogram: histogram!(histogram_name),
gauge: gauge!(gauge_name),
mean_gauge: gauge!(mean_name),
mean_gauge: EwmaGauge::new(mean_gauge_handle, ewma_alpha),
max_gauge,
ewma,
#[cfg(test)]
recorded_values,
}
Expand All @@ -175,8 +168,7 @@ impl Metrics {
fn record(&self, value: usize) {
self.histogram.record(value as f64);
self.gauge.set(value as f64);
let avg = self.ewma.update(value as f64);
self.mean_gauge.set(avg);
self.mean_gauge.record(value as f64);
#[cfg(test)]
if let Ok(mut recorded) = self.recorded_values.lock() {
recorded.push(value);
Expand Down
3 changes: 2 additions & 1 deletion lib/vector-buffers/src/topology/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ mod receiver;
mod sender;

pub use limited_queue::{
ChannelMetricMetadata, DEFAULT_EWMA_ALPHA, LimitedReceiver, LimitedSender, SendError, limited,
ChannelMetricMetadata, LimitedReceiver, LimitedSender, SendError, limited,
};
pub use receiver::*;
pub use sender::*;
pub use vector_common::stats::DEFAULT_EWMA_ALPHA;

#[cfg(test)]
mod tests;
28 changes: 20 additions & 8 deletions lib/vector-buffers/src/topology/channel/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use vector_common::internal_event::{InternalEventHandle, Registered, register};

use super::limited_queue::LimitedSender;
use crate::{
Bufferable, WhenFull,
BufferInstrumentation, Bufferable, WhenFull,
buffer_usage_data::BufferUsageHandle,
internal_events::BufferSendDuration,
variants::disk_v2::{self, ProductionFilesystem},
Expand Down Expand Up @@ -134,9 +134,11 @@ pub struct BufferSender<T: Bufferable> {
base: SenderAdapter<T>,
overflow: Option<Box<BufferSender<T>>>,
when_full: WhenFull,
instrumentation: Option<BufferUsageHandle>,
usage_instrumentation: Option<BufferUsageHandle>,
#[derivative(Debug = "ignore")]
send_duration: Option<Registered<BufferSendDuration>>,
#[derivative(Debug = "ignore")]
custom_instrumentation: Option<Arc<dyn BufferInstrumentation<T>>>,
}

impl<T: Bufferable> BufferSender<T> {
Expand All @@ -146,8 +148,9 @@ impl<T: Bufferable> BufferSender<T> {
base,
overflow: None,
when_full,
instrumentation: None,
usage_instrumentation: None,
send_duration: None,
custom_instrumentation: None,
}
}

Expand All @@ -157,8 +160,9 @@ impl<T: Bufferable> BufferSender<T> {
base,
overflow: Some(Box::new(overflow)),
when_full: WhenFull::Overflow,
instrumentation: None,
usage_instrumentation: None,
send_duration: None,
custom_instrumentation: None,
}
}

Expand All @@ -174,14 +178,19 @@ impl<T: Bufferable> BufferSender<T> {

/// Configures this sender to instrument the items passing through it.
pub fn with_usage_instrumentation(&mut self, handle: BufferUsageHandle) {
self.instrumentation = Some(handle);
self.usage_instrumentation = Some(handle);
}

/// Enables send-duration instrumentation on this sender.
///
/// A `BufferSendDuration` event carrying `stage` is registered while `span` is entered
/// (presumably so the registered handle is associated with that span — confirm), and the
/// resulting handle is kept on the sender.
pub fn with_send_duration_instrumentation(&mut self, stage: usize, span: &Span) {
    // The guard must stay alive until after `register` returns.
    let _span_guard = span.enter();
    let handle = register(BufferSendDuration { stage });
    self.send_duration = Some(handle);
}

/// Installs a custom instrumentation hook on this sender.
///
/// The hook is type-erased behind an `Arc<dyn BufferInstrumentation<T>>` and replaces any
/// hook that was configured earlier.
pub fn with_custom_instrumentation(&mut self, instrumentation: impl BufferInstrumentation<T>) {
    let hook: Arc<dyn BufferInstrumentation<T>> = Arc::new(instrumentation);
    self.custom_instrumentation = Some(hook);
}
}

impl<T: Bufferable> BufferSender<T> {
Expand All @@ -197,14 +206,17 @@ impl<T: Bufferable> BufferSender<T> {

#[async_recursion]
pub async fn send(&mut self, item: T, send_reference: Option<Instant>) -> crate::Result<()> {
if let Some(instrumentation) = self.custom_instrumentation.as_ref() {
instrumentation.on_send(&item);
}
let item_sizing = self
.instrumentation
.usage_instrumentation
.as_ref()
.map(|_| (item.event_count(), item.size_of()));

let mut was_dropped = false;

if let Some(instrumentation) = self.instrumentation.as_ref()
if let Some(instrumentation) = self.usage_instrumentation.as_ref()
&& let Some((item_count, item_size)) = item_sizing
{
instrumentation
Expand All @@ -229,7 +241,7 @@ impl<T: Bufferable> BufferSender<T> {
}
}

if let Some(instrumentation) = self.instrumentation.as_ref()
if let Some(instrumentation) = self.usage_instrumentation.as_ref()
&& let Some((item_count, item_size)) = item_sizing
&& was_dropped
{
Expand Down
32 changes: 32 additions & 0 deletions lib/vector-common/src/stats/ewma_gauge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::sync::Arc;

use metrics::Gauge;

use super::AtomicEwma;

/// The default alpha parameter used when constructing EWMA-backed gauges.
///
/// [`EwmaGauge::new`] falls back to this value when no explicit alpha is supplied. A value of
/// 0.9 results in a "half life" of 6-7 measurements.
pub const DEFAULT_EWMA_ALPHA: f64 = 0.9;

/// Couples a [`Gauge`] with an [`AtomicEwma`] so gauge readings reflect the EWMA.
///
/// Each call to `record` feeds the raw value into the EWMA and publishes the resulting average
/// through the gauge, so the gauge never shows the raw value itself.
#[derive(Clone, Debug)]
pub struct EwmaGauge {
// Handle to the metric that exposes the current moving average.
gauge: Gauge,
// Note that the `Gauge` internally is equivalent to an `Arc<AtomicF64>` so we need to use the
// same semantics for the EWMA calculation as well. Keeping the EWMA behind an `Arc` means
// clones of this struct update one shared running average.
ewma: Arc<AtomicEwma>,
}

impl EwmaGauge {
    /// Builds an EWMA-backed wrapper around `gauge`.
    ///
    /// When `alpha` is `None`, [`DEFAULT_EWMA_ALPHA`] is used for the moving average.
    #[must_use]
    pub fn new(gauge: Gauge, alpha: Option<f64>) -> Self {
        Self {
            gauge,
            ewma: Arc::new(AtomicEwma::new(alpha.unwrap_or(DEFAULT_EWMA_ALPHA))),
        }
    }

    /// Feeds `value` into the moving average and publishes the updated average on the gauge.
    pub fn record(&self, value: f64) {
        self.gauge.set(self.ewma.update(value));
    }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#![allow(missing_docs)]

pub mod ewma_gauge;

pub use ewma_gauge::{DEFAULT_EWMA_ALPHA, EwmaGauge};

use std::sync::atomic::Ordering;

use crate::atomic::AtomicF64;
Expand Down
24 changes: 20 additions & 4 deletions lib/vector-core/src/config/global_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,29 @@ pub struct GlobalOptions {
/// The alpha value for the exponential weighted moving average (EWMA) of source and transform
/// buffer utilization metrics.
///
/// This value specifies how much of the existing value is retained when each update is made.
/// Values closer to 1.0 result in the value adjusting slower to changes. The default value of
/// 0.9 is equivalent to a "half life" of 6-7 measurements.
/// This controls how quickly the `*_buffer_utilization_mean` gauges respond to new
/// observations. Values closer to 1.0 retain more of the previous value, leading to slower
/// adjustments. The default value of 0.9 is equivalent to a "half life" of 6-7 measurements.
///
/// Must be between 0 and 1 exclusive (0 < alpha < 1).
/// Must be between 0 and 1 exclusively (0 < alpha < 1).
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
#[configurable(validation(range(min = 0.0, max = 1.0)))]
#[configurable(metadata(docs::advanced))]
pub buffer_utilization_ewma_alpha: Option<f64>,

/// The alpha value for the exponential weighted moving average (EWMA) of event processing
/// time metrics.
///
/// This controls how quickly the `event_processing_time_mean_seconds` gauge responds to new
/// observations. Values closer to 1.0 retain more of the previous value, leading to slower
/// adjustments. The default value of 0.9 is equivalent to a "half life" of 6-7 measurements.
///
/// Must be between 0 and 1 exclusively (0 < alpha < 1).
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
#[configurable(validation(range(min = 0.0, max = 1.0)))]
#[configurable(metadata(docs::advanced))]
pub processing_time_ewma_alpha: Option<f64>,

/// The interval, in seconds, at which the internal metrics cache for VRL is refreshed.
/// This must be set to be able to access metrics in VRL functions.
///
Expand Down Expand Up @@ -311,6 +324,9 @@ impl GlobalOptions {
buffer_utilization_ewma_alpha: self
.buffer_utilization_ewma_alpha
.or(with.buffer_utilization_ewma_alpha),
processing_time_ewma_alpha: self
.processing_time_ewma_alpha
.or(with.processing_time_ewma_alpha),
metrics_storage_refresh_period: self
.metrics_storage_refresh_period
.or(with.metrics_storage_refresh_period),
Expand Down
28 changes: 19 additions & 9 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{collections::HashMap, fmt, num::NonZeroUsize, sync::Arc};

use bitmask_enum::bitmask;
use bytes::Bytes;
use chrono::{DateTime, Utc};

mod global_options;
mod log_schema;
Expand Down Expand Up @@ -487,28 +486,38 @@ impl LogNamespace {
}

/// Vector: The `ingest_timestamp`, and `source_type` fields are added to "event metadata", nested
/// under the name "vector". This data will be marked as read-only in VRL.
/// under the name "vector". This data will be marked as read-only in VRL. The `ingest_timestamp`
/// is only inserted if it already exists on the event metadata.
///
/// Legacy: The values of `source_type_key`, and `timestamp_key` are stored as keys on the event root,
/// only if a field with that name doesn't already exist.
#[allow(clippy::missing_panics_doc, reason = "Can only panic in test")]
pub fn insert_standard_vector_source_metadata(
&self,
log: &mut LogEvent,
source_name: &'static str,
now: DateTime<Utc>,
) {
Copy link
Contributor

@thomasqueirozb thomasqueirozb Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried the removal of now is creating a bit of an anti-pattern where log.metadata_mut().set_ingest_timestamp(now) needs to be called every time before insert_standard_vector_source_metadata.

Since we're already getting &mut LogEvent here, wouldn't it be better to have something like so:

    pub fn insert_standard_vector_source_metadata(
        &self,
        log: &mut LogEvent,
        source_name: &'static str,
        now: Option<DateTime<Utc>>, // Not sure if Option is needed here
    ) {
        if let Some(now) = now {
			log.metadata_mut().set_ingest_timestamp(now);
        }
		// ...

This way we can remove the many event.metadata_mut().set_ingest_timestamp(now); and log.metadata_mut().set_ingest_timestamp(now); lines. Not opposed to renaming the method or adding a wrapper method to make it more clear (maybe something like ingest_standard_vector_source_metadata, but not exactly that because insert and ingest look way too similar)

Or maybe we can move this into insert_vector_metadata (thinking of situations where insert_standard_vector_source_metadata isn't called but insert_vector_metadata is like in src/sources/aws_kinesis_firehose/handlers.rs)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can get behind this, and it will shrink the diff too. I don't think insert_vector_metadata makes sense, though, as that only inserts one field into either the event metadata or a log field based on the namespace. i.e. it has nothing to do with timestamps specifically.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 1645439

self.insert_vector_metadata(
log,
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(source_name.as_bytes()),
);
self.insert_vector_metadata(
log,
log_schema().timestamp_key(),
path!("ingest_timestamp"),
now,
// We could automatically set the ingest timestamp here, but it's better to make the source
// set it so that it can ensure that all events in a batch have the same timestamp.
#[cfg(test)]
assert!(
log.metadata().ingest_timestamp().is_some(),
"Event ingest_timestamp was not set by the source"
);
if let Some(timestamp) = log.metadata().ingest_timestamp() {
self.insert_vector_metadata(
log,
log_schema().timestamp_key(),
path!("ingest_timestamp"),
timestamp,
);
}
}

/// Vector: This is added to the "event metadata", nested under the name "vector". This data
Expand Down Expand Up @@ -587,7 +596,8 @@ mod test {

let namespace = LogNamespace::Legacy;
let mut event = LogEvent::from("log");
namespace.insert_standard_vector_source_metadata(&mut event, "source", Utc::now());
event.metadata_mut().set_ingest_timestamp(Utc::now());
namespace.insert_standard_vector_source_metadata(&mut event, "source");

assert!(event.get(event_path!("a", "b", "c", "d")).is_some());
}
Expand Down
Loading
Loading