Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ serial_test = { version = "3.2" }
[dependencies]
cfg-if.workspace = true
clap.workspace = true
dashmap.workspace = true
indoc.workspace = true
paste.workspace = true
pin-project.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions changelog.d/event_processing_time_metrics.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Added the `event_processing_time_seconds` histogram and `event_processing_time_mean_seconds` gauge internal metrics,
which expose the total time events spend between the originating source and the final sink in a topology.

authors: bruceg
10 changes: 4 additions & 6 deletions docs/tutorials/lognamespacing.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ separate namespace to the event data.

The actual value to be placed into the field.

For the ingest timestamp this will be `chrono::Utc::now()`. Source type will be
The ingest timestamp should be recorded on the event metadata using
`log.metadata_mut().set_ingest_timestamp(chrono::Utc::now())`. The source type will be
the `NAME` property of the `Config` struct. `NAME` is provided by the
`configurable_component` macro. You may need to include `use vector_config::NamedComponent;`.

Expand All @@ -127,11 +128,8 @@ insert both these fields into the Vector namespace:

```rust

log_namespace.insert_standard_vector_source_metadata(
log,
KafkaSourceConfig::NAME,
Utc::now(),
);
log.metadata_mut().set_ingest_timestamp(Utc::now());
log_namespace.insert_standard_vector_source_metadata(log, KafkaSourceConfig::NAME);
```

### Source Metadata
Expand Down
7 changes: 7 additions & 0 deletions lib/vector-buffers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ pub trait Bufferable: InMemoryBufferable + Encodable {}
// Blanket implementation for anything that is already bufferable.
impl<T> Bufferable for T where T: InMemoryBufferable + Encodable {}

/// Hook for observing items as they are sent into a `BufferSender`.
///
/// A hook is installed with `BufferSender::with_custom_instrumentation` and is held behind an
/// `Arc<dyn BufferInstrumentation<T>>`, which is why implementations must be
/// `Send + Sync + 'static`.
pub trait BufferInstrumentation<T: Bufferable>: Send + Sync + 'static {
/// Called immediately before the item is emitted to the underlying buffer.
/// The underlying type is stored in an `Arc`, so we cannot have `&mut self`.
///
/// Implementations that need to mutate state must therefore use interior mutability
/// (atomics, locks, concurrent maps, ...).
///
/// The item is only borrowed; the hook cannot modify or consume it.
// NOTE(review): `BufferSender::send` invokes this at the very top of the call, before the
// full/drop handling runs, so it appears to fire for items that are later dropped as well —
// confirm that is the intended semantics.
fn on_send(&self, item: &T);
}

/// Exposes the number of events represented by a value.
// NOTE(review): `BufferSender::send` reads this alongside `size_of()` to feed the buffer usage
// instrumentation; presumably a batch-like item reports more than one event — confirm against
// the implementors.
pub trait EventCount {
/// Returns the number of events contained in `self`.
fn event_count(&self) -> usize;
}
Expand Down
22 changes: 7 additions & 15 deletions lib/vector-buffers/src/topology/channel/limited_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,10 @@ use crossbeam_queue::{ArrayQueue, SegQueue};
use futures::Stream;
use metrics::{Gauge, Histogram, gauge, histogram};
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
use vector_common::stats::AtomicEwma;
use vector_common::stats::EwmaGauge;

use crate::{InMemoryBufferable, config::MemoryBufferSize};

/// The alpha value for the Exponentially Weighted Moving Average (EWMA) calculation. This is a
/// measure of how much weight to give to the current value versus the previous values. A value of
/// 0.9 results in a "half life" of 6-7 measurements.
pub const DEFAULT_EWMA_ALPHA: f64 = 0.9;

/// Error returned by `LimitedSender::send` when the receiver has disconnected.
#[derive(Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);
Expand Down Expand Up @@ -114,8 +109,7 @@ impl ChannelMetricMetadata {
struct Metrics {
histogram: Histogram,
gauge: Gauge,
mean_gauge: Gauge,
ewma: Arc<AtomicEwma>,
mean_gauge: EwmaGauge,
// We hold a handle to the max gauge to avoid it being dropped by the metrics collector, but
// since the value is static, we never need to update it. The compiler detects this as an unused
// field, so we need to suppress the warning here.
Expand All @@ -141,30 +135,29 @@ impl Metrics {
let histogram_name = format!("{prefix}_utilization");
let gauge_name = format!("{prefix}_utilization_level");
let mean_name = format!("{prefix}_utilization_mean");
let ewma = Arc::new(AtomicEwma::new(ewma_alpha.unwrap_or(DEFAULT_EWMA_ALPHA)));
#[cfg(test)]
let recorded_values = Arc::new(Mutex::new(Vec::new()));
if let Some(label_value) = output {
let max_gauge = gauge!(max_gauge_name, "output" => label_value.clone());
max_gauge.set(max_value);
let mean_gauge_handle = gauge!(mean_name, "output" => label_value.clone());
Self {
histogram: histogram!(histogram_name, "output" => label_value.clone()),
gauge: gauge!(gauge_name, "output" => label_value.clone()),
mean_gauge: gauge!(mean_name, "output" => label_value.clone()),
mean_gauge: EwmaGauge::new(mean_gauge_handle, ewma_alpha),
max_gauge,
ewma,
#[cfg(test)]
recorded_values,
}
} else {
let max_gauge = gauge!(max_gauge_name);
max_gauge.set(max_value);
let mean_gauge_handle = gauge!(mean_name);
Self {
histogram: histogram!(histogram_name),
gauge: gauge!(gauge_name),
mean_gauge: gauge!(mean_name),
mean_gauge: EwmaGauge::new(mean_gauge_handle, ewma_alpha),
max_gauge,
ewma,
#[cfg(test)]
recorded_values,
}
Expand All @@ -175,8 +168,7 @@ impl Metrics {
fn record(&self, value: usize) {
self.histogram.record(value as f64);
self.gauge.set(value as f64);
let avg = self.ewma.update(value as f64);
self.mean_gauge.set(avg);
self.mean_gauge.record(value as f64);
#[cfg(test)]
if let Ok(mut recorded) = self.recorded_values.lock() {
recorded.push(value);
Expand Down
3 changes: 2 additions & 1 deletion lib/vector-buffers/src/topology/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ mod receiver;
mod sender;

pub use limited_queue::{
ChannelMetricMetadata, DEFAULT_EWMA_ALPHA, LimitedReceiver, LimitedSender, SendError, limited,
ChannelMetricMetadata, LimitedReceiver, LimitedSender, SendError, limited,
};
pub use receiver::*;
pub use sender::*;
pub use vector_common::stats::DEFAULT_EWMA_ALPHA;

#[cfg(test)]
mod tests;
28 changes: 20 additions & 8 deletions lib/vector-buffers/src/topology/channel/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use vector_common::internal_event::{InternalEventHandle, Registered, register};

use super::limited_queue::LimitedSender;
use crate::{
Bufferable, WhenFull,
BufferInstrumentation, Bufferable, WhenFull,
buffer_usage_data::BufferUsageHandle,
internal_events::BufferSendDuration,
variants::disk_v2::{self, ProductionFilesystem},
Expand Down Expand Up @@ -134,9 +134,11 @@ pub struct BufferSender<T: Bufferable> {
base: SenderAdapter<T>,
overflow: Option<Box<BufferSender<T>>>,
when_full: WhenFull,
instrumentation: Option<BufferUsageHandle>,
usage_instrumentation: Option<BufferUsageHandle>,
#[derivative(Debug = "ignore")]
send_duration: Option<Registered<BufferSendDuration>>,
#[derivative(Debug = "ignore")]
custom_instrumentation: Option<Arc<dyn BufferInstrumentation<T>>>,
}

impl<T: Bufferable> BufferSender<T> {
Expand All @@ -146,8 +148,9 @@ impl<T: Bufferable> BufferSender<T> {
base,
overflow: None,
when_full,
instrumentation: None,
usage_instrumentation: None,
send_duration: None,
custom_instrumentation: None,
}
}

Expand All @@ -157,8 +160,9 @@ impl<T: Bufferable> BufferSender<T> {
base,
overflow: Some(Box::new(overflow)),
when_full: WhenFull::Overflow,
instrumentation: None,
usage_instrumentation: None,
send_duration: None,
custom_instrumentation: None,
}
}

Expand All @@ -174,14 +178,19 @@ impl<T: Bufferable> BufferSender<T> {

/// Configures this sender to instrument the items passing through it.
pub fn with_usage_instrumentation(&mut self, handle: BufferUsageHandle) {
self.instrumentation = Some(handle);
self.usage_instrumentation = Some(handle);
}

/// Configures this sender to instrument the send duration.
///
/// Registers a `BufferSendDuration` internal event for the given buffer `stage` and stores
/// the resulting handle on the sender, replacing any previously configured handle.
pub fn with_send_duration_instrumentation(&mut self, stage: usize, span: &Span) {
// Keep the span entered for the duration of the registration below; the guard is released
// when this function returns.
// NOTE(review): presumably this is so the registered metric inherits the span's
// fields/labels — confirm.
let _enter = span.enter();
self.send_duration = Some(register(BufferSendDuration { stage }));
}

/// Installs a custom instrumentation hook on this sender.
///
/// The hook is wrapped in an `Arc` and stored as a trait object; any hook that was
/// configured earlier is replaced.
pub fn with_custom_instrumentation(&mut self, instrumentation: impl BufferInstrumentation<T>) {
    let hook: Arc<dyn BufferInstrumentation<T>> = Arc::new(instrumentation);
    self.custom_instrumentation = Some(hook);
}
}

impl<T: Bufferable> BufferSender<T> {
Expand All @@ -197,14 +206,17 @@ impl<T: Bufferable> BufferSender<T> {

#[async_recursion]
pub async fn send(&mut self, item: T, send_reference: Option<Instant>) -> crate::Result<()> {
if let Some(instrumentation) = self.custom_instrumentation.as_ref() {
instrumentation.on_send(&item);
}
let item_sizing = self
.instrumentation
.usage_instrumentation
.as_ref()
.map(|_| (item.event_count(), item.size_of()));

let mut was_dropped = false;

if let Some(instrumentation) = self.instrumentation.as_ref()
if let Some(instrumentation) = self.usage_instrumentation.as_ref()
&& let Some((item_count, item_size)) = item_sizing
{
instrumentation
Expand All @@ -229,7 +241,7 @@ impl<T: Bufferable> BufferSender<T> {
}
}

if let Some(instrumentation) = self.instrumentation.as_ref()
if let Some(instrumentation) = self.usage_instrumentation.as_ref()
&& let Some((item_count, item_size)) = item_sizing
&& was_dropped
{
Expand Down
32 changes: 32 additions & 0 deletions lib/vector-common/src/stats/ewma_gauge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::sync::Arc;

use metrics::Gauge;

use super::AtomicEwma;

/// The default alpha parameter used when constructing EWMA-backed gauges.
///
/// Alpha is a measure of how much weight to give to the current value versus the previous
/// values. A value of 0.9 results in a "half life" of 6-7 measurements.
pub const DEFAULT_EWMA_ALPHA: f64 = 0.9;

/// Couples a [`Gauge`] with an [`AtomicEwma`] so gauge readings reflect the EWMA.
///
/// Cloning an `EwmaGauge` clones the `Arc`, so every clone feeds the same running average.
#[derive(Clone, Debug)]
pub struct EwmaGauge {
// Metrics handle that is set to the latest average on every `record` call.
gauge: Gauge,
// Note that the `Gauge` internally is equivalent to an `Arc<AtomicF64>` so we need to use the
// same semantics for the EWMA calculation as well.
ewma: Arc<AtomicEwma>,
}

impl EwmaGauge {
    /// Builds an EWMA-backed gauge around `gauge`.
    ///
    /// When `alpha` is `None`, [`DEFAULT_EWMA_ALPHA`] is used for the moving average.
    #[must_use]
    pub fn new(gauge: Gauge, alpha: Option<f64>) -> Self {
        Self {
            gauge,
            ewma: Arc::new(AtomicEwma::new(alpha.unwrap_or(DEFAULT_EWMA_ALPHA))),
        }
    }

    /// Feeds `value` into the moving average and publishes the updated average on the gauge.
    pub fn record(&self, value: f64) {
        self.gauge.set(self.ewma.update(value));
    }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#![allow(missing_docs)]

pub mod ewma_gauge;

pub use ewma_gauge::{DEFAULT_EWMA_ALPHA, EwmaGauge};

use std::sync::atomic::Ordering;

use crate::atomic::AtomicF64;
Expand Down
24 changes: 20 additions & 4 deletions lib/vector-core/src/config/global_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,29 @@ pub struct GlobalOptions {
/// The alpha value for the exponential weighted moving average (EWMA) of source and transform
/// buffer utilization metrics.
///
/// This value specifies how much of the existing value is retained when each update is made.
/// Values closer to 1.0 result in the value adjusting slower to changes. The default value of
/// 0.9 is equivalent to a "half life" of 6-7 measurements.
/// This controls how quickly the `*_buffer_utilization_mean` gauges respond to new
/// observations. Values closer to 1.0 retain more of the previous value, leading to slower
/// adjustments. The default value of 0.9 is equivalent to a "half life" of 6-7 measurements.
///
/// Must be between 0 and 1 exclusive (0 < alpha < 1).
/// Must be between 0 and 1, exclusive (0 < alpha < 1).
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
#[configurable(validation(range(min = 0.0, max = 1.0)))]
#[configurable(metadata(docs::advanced))]
pub buffer_utilization_ewma_alpha: Option<f64>,

/// The alpha value for the exponential weighted moving average (EWMA) of event processing
/// time metrics.
///
/// This controls how quickly the `event_processing_time_mean_seconds` gauge responds to new
/// observations. Values closer to 1.0 retain more of the previous value, leading to slower
/// adjustments. The default value of 0.9 is equivalent to a "half life" of 6-7 measurements.
///
/// Must be between 0 and 1, exclusive (0 < alpha < 1).
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
#[configurable(validation(range(min = 0.0, max = 1.0)))]
#[configurable(metadata(docs::advanced))]
pub processing_time_ewma_alpha: Option<f64>,

/// The interval, in seconds, at which the internal metrics cache for VRL is refreshed.
/// This must be set to be able to access metrics in VRL functions.
///
Expand Down Expand Up @@ -311,6 +324,9 @@ impl GlobalOptions {
buffer_utilization_ewma_alpha: self
.buffer_utilization_ewma_alpha
.or(with.buffer_utilization_ewma_alpha),
processing_time_ewma_alpha: self
.processing_time_ewma_alpha
.or(with.processing_time_ewma_alpha),
metrics_storage_refresh_period: self
.metrics_storage_refresh_period
.or(with.metrics_storage_refresh_period),
Expand Down
28 changes: 19 additions & 9 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{collections::HashMap, fmt, num::NonZeroUsize, sync::Arc};

use bitmask_enum::bitmask;
use bytes::Bytes;
use chrono::{DateTime, Utc};

mod global_options;
mod log_schema;
Expand Down Expand Up @@ -487,28 +486,38 @@ impl LogNamespace {
}

/// Vector: The `ingest_timestamp`, and `source_type` fields are added to "event metadata", nested
/// under the name "vector". This data will be marked as read-only in VRL.
/// under the name "vector". This data will be marked as read-only in VRL. The `ingest_timestamp`
/// is only inserted if it already exists on the event metadata.
///
/// Legacy: The values of `source_type_key`, and `timestamp_key` are stored as keys on the event root,
/// only if a field with that name doesn't already exist.
#[allow(clippy::missing_panics_doc, reason = "Can only panic in test")]
pub fn insert_standard_vector_source_metadata(
&self,
log: &mut LogEvent,
source_name: &'static str,
now: DateTime<Utc>,
) {
self.insert_vector_metadata(
log,
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(source_name.as_bytes()),
);
self.insert_vector_metadata(
log,
log_schema().timestamp_key(),
path!("ingest_timestamp"),
now,
// We could automatically set the ingest timestamp here, but it's better to make the source
// set it so that it can ensure that all events in a batch have the same timestamp.
#[cfg(test)]
assert!(
log.metadata().ingest_timestamp().is_some(),
"Event ingest_timestamp was not set by the source"
);
if let Some(timestamp) = log.metadata().ingest_timestamp() {
self.insert_vector_metadata(
log,
log_schema().timestamp_key(),
path!("ingest_timestamp"),
timestamp,
);
}
}

/// Vector: This is added to the "event metadata", nested under the name "vector". This data
Expand Down Expand Up @@ -587,7 +596,8 @@ mod test {

let namespace = LogNamespace::Legacy;
let mut event = LogEvent::from("log");
namespace.insert_standard_vector_source_metadata(&mut event, "source", Utc::now());
event.metadata_mut().set_ingest_timestamp(Utc::now());
namespace.insert_standard_vector_source_metadata(&mut event, "source");

assert!(event.get(event_path!("a", "b", "c", "d")).is_some());
}
Expand Down
Loading
Loading