Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ serial_test = { version = "3.2" }
cfg-if.workspace = true
clap.workspace = true
clap_complete.workspace = true
dashmap.workspace = true
indoc.workspace = true
paste.workspace = true
pin-project.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions changelog.d/event_processing_time_metrics.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Added the `event_processing_time_seconds` histogram and `event_processing_time_mean_seconds` gauge internal_metrics,
exposing the total time events spend between the originating source and final sink in a topology.

authors: bruceg
4 changes: 3 additions & 1 deletion docs/tutorials/lognamespacing.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ separate namespace to the event data.

The actual value to be placed into the field.

For the ingest timestamp this will be `chrono::Utc::now()`. Source type will be
The ingest timestamp should be recorded once per batch (e.g. `let now =
chrono::Utc::now();`) and passed into `insert_standard_vector_source_metadata`,
which takes care of updating the event metadata. The source type will be
the `NAME` property of the `Config` struct. `NAME` is provided by the
`configurable_component` macro. You may need to include `use vector_config::NamedComponent;`.

Expand Down
7 changes: 7 additions & 0 deletions lib/vector-buffers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ pub trait Bufferable: InMemoryBufferable + Encodable {}
// Blanket implementation for anything that is already bufferable.
impl<T> Bufferable for T where T: InMemoryBufferable + Encodable {}

/// Hook for observing items as they are sent into a `BufferSender`.
pub trait BufferInstrumentation<T: Bufferable>: Send + Sync + 'static {
/// Called immediately before the item is emitted to the underlying buffer.
/// The underlying type is stored in an `Arc`, so we cannot have `&mut self`.
fn on_send(&self, item: &T);
}

pub trait EventCount {
fn event_count(&self) -> usize;
}
Expand Down
22 changes: 7 additions & 15 deletions lib/vector-buffers/src/topology/channel/limited_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,10 @@ use crossbeam_queue::{ArrayQueue, SegQueue};
use futures::Stream;
use metrics::{Gauge, Histogram, gauge, histogram};
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
use vector_common::stats::AtomicEwma;
use vector_common::stats::EwmaGauge;

use crate::{InMemoryBufferable, config::MemoryBufferSize};

/// The alpha value for the Exponentially Weighted Moving Average (EWMA) calculation. This is a
/// measure of how much weight to give to the current value versus the previous values. A value of
/// 0.9 results in a "half life" of 6-7 measurements.
pub const DEFAULT_EWMA_ALPHA: f64 = 0.9;

/// Error returned by `LimitedSender::send` when the receiver has disconnected.
#[derive(Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);
Expand Down Expand Up @@ -114,8 +109,7 @@ impl ChannelMetricMetadata {
struct Metrics {
histogram: Histogram,
gauge: Gauge,
mean_gauge: Gauge,
ewma: Arc<AtomicEwma>,
mean_gauge: EwmaGauge,
// We hold a handle to the max gauge to avoid it being dropped by the metrics collector, but
// since the value is static, we never need to update it. The compiler detects this as an unused
// field, so we need to suppress the warning here.
Expand Down Expand Up @@ -150,37 +144,36 @@ impl Metrics {
let histogram_name = format!("{prefix}_utilization");
let gauge_name = format!("{prefix}_utilization_level");
let mean_name = format!("{prefix}_utilization_mean");
let ewma = Arc::new(AtomicEwma::new(ewma_alpha.unwrap_or(DEFAULT_EWMA_ALPHA)));
#[cfg(test)]
let recorded_values = Arc::new(Mutex::new(Vec::new()));
if let Some(label_value) = output {
let max_gauge = gauge!(max_gauge_name, "output" => label_value.clone());
max_gauge.set(max_value);
let mean_gauge_handle = gauge!(mean_name, "output" => label_value.clone());
// DEPRECATED: buffer-bytes-events-metrics
let legacy_max_gauge = gauge!(legacy_max_gauge_name, "output" => label_value.clone());
legacy_max_gauge.set(max_value);
Self {
histogram: histogram!(histogram_name, "output" => label_value.clone()),
gauge: gauge!(gauge_name, "output" => label_value.clone()),
mean_gauge: gauge!(mean_name, "output" => label_value.clone()),
mean_gauge: EwmaGauge::new(mean_gauge_handle, ewma_alpha),
max_gauge,
ewma,
legacy_max_gauge,
#[cfg(test)]
recorded_values,
}
} else {
let max_gauge = gauge!(max_gauge_name);
max_gauge.set(max_value);
let mean_gauge_handle = gauge!(mean_name);
// DEPRECATED: buffer-bytes-events-metrics
let legacy_max_gauge = gauge!(legacy_max_gauge_name);
legacy_max_gauge.set(max_value);
Self {
histogram: histogram!(histogram_name),
gauge: gauge!(gauge_name),
mean_gauge: gauge!(mean_name),
mean_gauge: EwmaGauge::new(mean_gauge_handle, ewma_alpha),
max_gauge,
ewma,
legacy_max_gauge,
#[cfg(test)]
recorded_values,
Expand All @@ -192,8 +185,7 @@ impl Metrics {
fn record(&self, value: usize) {
self.histogram.record(value as f64);
self.gauge.set(value as f64);
let avg = self.ewma.update(value as f64);
self.mean_gauge.set(avg);
self.mean_gauge.record(value as f64);
#[cfg(test)]
if let Ok(mut recorded) = self.recorded_values.lock() {
recorded.push(value);
Expand Down
3 changes: 2 additions & 1 deletion lib/vector-buffers/src/topology/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ mod receiver;
mod sender;

pub use limited_queue::{
ChannelMetricMetadata, DEFAULT_EWMA_ALPHA, LimitedReceiver, LimitedSender, SendError, limited,
ChannelMetricMetadata, LimitedReceiver, LimitedSender, SendError, limited,
};
pub use receiver::*;
pub use sender::*;
pub use vector_common::stats::DEFAULT_EWMA_ALPHA;

#[cfg(test)]
mod tests;
28 changes: 20 additions & 8 deletions lib/vector-buffers/src/topology/channel/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use vector_common::internal_event::{InternalEventHandle, Registered, register};

use super::limited_queue::LimitedSender;
use crate::{
Bufferable, WhenFull,
BufferInstrumentation, Bufferable, WhenFull,
buffer_usage_data::BufferUsageHandle,
internal_events::BufferSendDuration,
variants::disk_v2::{self, ProductionFilesystem},
Expand Down Expand Up @@ -134,9 +134,11 @@ pub struct BufferSender<T: Bufferable> {
base: SenderAdapter<T>,
overflow: Option<Box<BufferSender<T>>>,
when_full: WhenFull,
instrumentation: Option<BufferUsageHandle>,
usage_instrumentation: Option<BufferUsageHandle>,
#[derivative(Debug = "ignore")]
send_duration: Option<Registered<BufferSendDuration>>,
#[derivative(Debug = "ignore")]
custom_instrumentation: Option<Arc<dyn BufferInstrumentation<T>>>,
}

impl<T: Bufferable> BufferSender<T> {
Expand All @@ -146,8 +148,9 @@ impl<T: Bufferable> BufferSender<T> {
base,
overflow: None,
when_full,
instrumentation: None,
usage_instrumentation: None,
send_duration: None,
custom_instrumentation: None,
}
}

Expand All @@ -157,8 +160,9 @@ impl<T: Bufferable> BufferSender<T> {
base,
overflow: Some(Box::new(overflow)),
when_full: WhenFull::Overflow,
instrumentation: None,
usage_instrumentation: None,
send_duration: None,
custom_instrumentation: None,
}
}

Expand All @@ -174,14 +178,19 @@ impl<T: Bufferable> BufferSender<T> {

/// Configures this sender to instrument the items passing through it.
pub fn with_usage_instrumentation(&mut self, handle: BufferUsageHandle) {
self.instrumentation = Some(handle);
self.usage_instrumentation = Some(handle);
}

/// Configures this sender to instrument the send duration.
pub fn with_send_duration_instrumentation(&mut self, stage: usize, span: &Span) {
let _enter = span.enter();
self.send_duration = Some(register(BufferSendDuration { stage }));
}

/// Configures this sender to invoke a custom instrumentation hook.
pub fn with_custom_instrumentation(&mut self, instrumentation: impl BufferInstrumentation<T>) {
self.custom_instrumentation = Some(Arc::new(instrumentation));
}
}

impl<T: Bufferable> BufferSender<T> {
Expand All @@ -197,14 +206,17 @@ impl<T: Bufferable> BufferSender<T> {

#[async_recursion]
pub async fn send(&mut self, item: T, send_reference: Option<Instant>) -> crate::Result<()> {
if let Some(instrumentation) = self.custom_instrumentation.as_ref() {
instrumentation.on_send(&item);
}
let item_sizing = self
.instrumentation
.usage_instrumentation
.as_ref()
.map(|_| (item.event_count(), item.size_of()));

let mut was_dropped = false;

if let Some(instrumentation) = self.instrumentation.as_ref()
if let Some(instrumentation) = self.usage_instrumentation.as_ref()
&& let Some((item_count, item_size)) = item_sizing
{
instrumentation
Expand All @@ -229,7 +241,7 @@ impl<T: Bufferable> BufferSender<T> {
}
}

if let Some(instrumentation) = self.instrumentation.as_ref()
if let Some(instrumentation) = self.usage_instrumentation.as_ref()
&& let Some((item_count, item_size)) = item_sizing
&& was_dropped
{
Expand Down
32 changes: 32 additions & 0 deletions lib/vector-common/src/stats/ewma_gauge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::sync::Arc;

use metrics::Gauge;

use super::AtomicEwma;

/// The default alpha parameter used when constructing EWMA-backed gauges.
pub const DEFAULT_EWMA_ALPHA: f64 = 0.9;

/// Couples a [`Gauge`] with an [`AtomicEwma`] so gauge readings reflect the EWMA.
#[derive(Clone, Debug)]
pub struct EwmaGauge {
gauge: Gauge,
// Note that the `Gauge` internally is equivalent to an `Arc<AtomicF64>` so we need to use the
// same semantics for the EWMA calculation as well.
ewma: Arc<AtomicEwma>,
}

impl EwmaGauge {
#[must_use]
pub fn new(gauge: Gauge, alpha: Option<f64>) -> Self {
let alpha = alpha.unwrap_or(DEFAULT_EWMA_ALPHA);
let ewma = Arc::new(AtomicEwma::new(alpha));
Self { gauge, ewma }
}

/// Records a new value, updates the EWMA, and sets the gauge accordingly.
pub fn record(&self, value: f64) {
let average = self.ewma.update(value);
self.gauge.set(average);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#![allow(missing_docs)]

pub mod ewma_gauge;

pub use ewma_gauge::{DEFAULT_EWMA_ALPHA, EwmaGauge};

use std::sync::atomic::Ordering;

use crate::atomic::AtomicF64;
Expand Down
24 changes: 20 additions & 4 deletions lib/vector-core/src/config/global_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,29 @@ pub struct GlobalOptions {
/// The alpha value for the exponential weighted moving average (EWMA) of source and transform
/// buffer utilization metrics.
///
/// This value specifies how much of the existing value is retained when each update is made.
/// Values closer to 1.0 result in the value adjusting slower to changes. The default value of
/// 0.9 is equivalent to a "half life" of 6-7 measurements.
/// This controls how quickly the `*_buffer_utilization_mean` gauges respond to new
/// observations. Values closer to 1.0 retain more of the previous value, leading to slower
/// adjustments. The default value of 0.9 is equivalent to a "half life" of 6-7 measurements.
///
/// Must be between 0 and 1 exclusive (0 < alpha < 1).
/// Must be between 0 and 1 exclusively (0 < alpha < 1).
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
#[configurable(validation(range(min = 0.0, max = 1.0)))]
#[configurable(metadata(docs::advanced))]
pub buffer_utilization_ewma_alpha: Option<f64>,

/// The alpha value for the exponential weighted moving average (EWMA) of transform processing
/// time metrics.
///
/// This controls how quickly the `event_processing_time_mean_seconds` gauge responds to new
/// observations. Values closer to 1.0 retain more of the previous value, leading to slower
/// adjustments. The default value of 0.9 is equivalent to a "half life" of 6-7 measurements.
///
/// Must be between 0 and 1 exclusively (0 < alpha < 1).
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
#[configurable(validation(range(min = 0.0, max = 1.0)))]
#[configurable(metadata(docs::advanced))]
pub processing_time_ewma_alpha: Option<f64>,

/// The interval, in seconds, at which the internal metrics cache for VRL is refreshed.
/// This must be set to be able to access metrics in VRL functions.
///
Expand Down Expand Up @@ -311,6 +324,9 @@ impl GlobalOptions {
buffer_utilization_ewma_alpha: self
.buffer_utilization_ewma_alpha
.or(with.buffer_utilization_ewma_alpha),
processing_time_ewma_alpha: self
.processing_time_ewma_alpha
.or(with.processing_time_ewma_alpha),
metrics_storage_refresh_period: self
.metrics_storage_refresh_period
.or(with.metrics_storage_refresh_period),
Expand Down
1 change: 1 addition & 0 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ impl LogNamespace {
path!("source_type"),
Bytes::from_static(source_name.as_bytes()),
);
log.metadata_mut().set_ingest_timestamp(now);
self.insert_vector_metadata(
log,
log_schema().timestamp_key(),
Expand Down
18 changes: 18 additions & 0 deletions lib/vector-core/src/event/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::{borrow::Cow, collections::BTreeMap, fmt, sync::Arc};

use chrono::{DateTime, Utc};
use derivative::Derivative;
use lookup::OwnedTargetPath;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -78,6 +79,11 @@ pub(super) struct Inner {
/// An internal vector id that can be used to identify this event across all components.
#[derivative(PartialEq = "ignore")]
pub(crate) source_event_id: Option<Uuid>,

/// The timestamp when the event was ingested into Vector.
#[derivative(PartialEq = "ignore")]
#[serde(default, skip)]
pub(crate) ingest_timestamp: Option<DateTime<Utc>>,
}

/// Metric Origin metadata for submission to Datadog.
Expand Down Expand Up @@ -239,6 +245,17 @@ impl EventMetadata {
pub fn source_event_id(&self) -> Option<Uuid> {
self.0.source_event_id
}

/// Returns the ingest timestamp, if it exists.
#[must_use]
pub fn ingest_timestamp(&self) -> Option<DateTime<Utc>> {
self.0.ingest_timestamp
}

/// Sets the ingest timestamp to the provided value.
pub fn set_ingest_timestamp(&mut self, timestamp: DateTime<Utc>) {
self.get_mut().ingest_timestamp = Some(timestamp);
}
}

impl Default for Inner {
Expand All @@ -254,6 +271,7 @@ impl Default for Inner {
dropped_fields: ObjectMap::new(),
datadog_origin_metadata: None,
source_event_id: Some(Uuid::new_v4()),
ingest_timestamp: None,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/vector-core/src/event/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ impl From<Metadata> for EventMetadata {
dropped_fields: ObjectMap::new(),
datadog_origin_metadata,
source_event_id,
ingest_timestamp: None,
}))
}
}
Expand Down
9 changes: 9 additions & 0 deletions lib/vector-core/src/event/ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ impl<'a> EventRef<'a> {
_ => panic!("Failed type coercion, {self:?} is not a metric reference"),
}
}

/// Access the metadata for the event under this reference.
pub fn metadata(&self) -> &EventMetadata {
match self {
Self::Log(log) => log.metadata(),
Self::Metric(metric) => metric.metadata(),
Self::Trace(trace) => trace.metadata(),
}
}
}

impl<'a> From<&'a Event> for EventRef<'a> {
Expand Down
Loading
Loading