diff --git a/Cargo.lock b/Cargo.lock index fa825f40ccfad..7f9cf37ad5d67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12655,6 +12655,7 @@ dependencies = [ "console-subscriber", "criterion", "csv", + "dashmap", "databend-client", "deadpool", "derivative", diff --git a/Cargo.toml b/Cargo.toml index 54d61e5a10242..15dd81022592c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -210,6 +210,7 @@ serial_test = { version = "3.2" } [dependencies] cfg-if.workspace = true clap.workspace = true +dashmap.workspace = true indoc.workspace = true paste.workspace = true pin-project.workspace = true diff --git a/changelog.d/event_processing_time_metrics.enhancement.md b/changelog.d/event_processing_time_metrics.enhancement.md new file mode 100644 index 0000000000000..2041b920da91f --- /dev/null +++ b/changelog.d/event_processing_time_metrics.enhancement.md @@ -0,0 +1,4 @@ +Added the `event_processing_time_seconds` histogram and `event_processing_time_mean_seconds` gauge internal metrics, +exposing the total time events spend between the originating source and the final sink in a topology. + +authors: bruceg diff --git a/docs/tutorials/lognamespacing.md b/docs/tutorials/lognamespacing.md index de23447f796b7..25ac82f8d54b7 100644 --- a/docs/tutorials/lognamespacing.md +++ b/docs/tutorials/lognamespacing.md @@ -112,7 +112,8 @@ separate namespace to the event data. The actual value to be placed into the field. -For the ingest timestamp this will be `chrono::Utc::now()`. Source type will be +The ingest timestamp should be recorded on the event metadata using +`log.metadata_mut().set_ingest_timestamp(chrono::Utc::now())`. The source type will be the `NAME` property of the `Config` struct. `NAME` is provided by the `configurable_component` macro. You may need to include `use vector_config::NamedComponent;`. @@ -127,11 +128,8 @@ insert both these fields into the Vector namespace: ```rust - log_namespace.insert_standard_vector_source_metadata( - log, - KafkaSourceConfig::NAME, - Utc::now(), - ); + log.metadata_mut().set_ingest_timestamp(Utc::now()); + log_namespace.insert_standard_vector_source_metadata(log, KafkaSourceConfig::NAME); ``` ### Source Metadata diff --git a/lib/vector-buffers/src/lib.rs b/lib/vector-buffers/src/lib.rs index d9c6373831c50..50daa4f19de1c 100644 --- a/lib/vector-buffers/src/lib.rs +++ b/lib/vector-buffers/src/lib.rs @@ -108,6 +108,13 @@ pub trait Bufferable: InMemoryBufferable + Encodable {} // Blanket implementation for anything that is already bufferable. impl<T> Bufferable for T where T: InMemoryBufferable + Encodable {} +/// Hook for observing items as they are sent into a `BufferSender`. +pub trait BufferInstrumentation<T>: Send + Sync + 'static { + /// Called immediately before the item is emitted to the underlying buffer. + /// The underlying type is stored in an `Arc`, so we cannot have `&mut self`. 
+ fn on_send(&self, item: &T); +} + pub trait EventCount { fn event_count(&self) -> usize; } diff --git a/lib/vector-buffers/src/topology/channel/limited_queue.rs b/lib/vector-buffers/src/topology/channel/limited_queue.rs index cbb502b748dbd..6aa88ed86dd31 100644 --- a/lib/vector-buffers/src/topology/channel/limited_queue.rs +++ b/lib/vector-buffers/src/topology/channel/limited_queue.rs @@ -16,15 +16,10 @@ use crossbeam_queue::{ArrayQueue, SegQueue}; use futures::Stream; use metrics::{Gauge, Histogram, gauge, histogram}; use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError}; -use vector_common::stats::AtomicEwma; +use vector_common::stats::EwmaGauge; use crate::{InMemoryBufferable, config::MemoryBufferSize}; -/// The alpha value for the Exponentially Weighted Moving Average (EWMA) calculation. This is a -/// measure of how much weight to give to the current value versus the previous values. A value of -/// 0.9 results in a "half life" of 6-7 measurements. -pub const DEFAULT_EWMA_ALPHA: f64 = 0.9; /// Error returned by `LimitedSender::send` when the receiver has disconnected. #[derive(Debug, PartialEq, Eq)] pub struct SendError<T>(pub T); @@ -114,8 +109,7 @@ impl ChannelMetricMetadata { struct Metrics { histogram: Histogram, gauge: Gauge, - mean_gauge: Gauge, - ewma: Arc<AtomicEwma>, + mean_gauge: EwmaGauge, // We hold a handle to the max gauge to avoid it being dropped by the metrics collector, but // since the value is static, we never need to update it. The compiler detects this as an unused // field, so we need to suppress the warning here. @@ -141,30 +135,29 @@ impl Metrics { let histogram_name = format!("{prefix}_utilization"); let gauge_name = format!("{prefix}_utilization_level"); let mean_name = format!("{prefix}_utilization_mean"); - let ewma = Arc::new(AtomicEwma::new(ewma_alpha.unwrap_or(DEFAULT_EWMA_ALPHA))); #[cfg(test)] let recorded_values = Arc::new(Mutex::new(Vec::new())); if let Some(label_value) = output { let max_gauge = gauge!(max_gauge_name, "output" => label_value.clone()); max_gauge.set(max_value); + let mean_gauge_handle = gauge!(mean_name, "output" => label_value.clone()); Self { histogram: histogram!(histogram_name, "output" => label_value.clone()), gauge: gauge!(gauge_name, "output" => label_value.clone()), - mean_gauge: gauge!(mean_name, "output" => label_value.clone()), + mean_gauge: EwmaGauge::new(mean_gauge_handle, ewma_alpha), max_gauge, - ewma, #[cfg(test)] recorded_values, } } else { let max_gauge = gauge!(max_gauge_name); max_gauge.set(max_value); + let mean_gauge_handle = gauge!(mean_name); Self { histogram: histogram!(histogram_name), gauge: gauge!(gauge_name), - mean_gauge: gauge!(mean_name), + mean_gauge: EwmaGauge::new(mean_gauge_handle, ewma_alpha), max_gauge, - ewma, #[cfg(test)] recorded_values, } @@ -175,8 +168,7 @@ fn record(&self, value: usize) { self.histogram.record(value as f64); self.gauge.set(value as f64); - let avg = self.ewma.update(value as f64); - self.mean_gauge.set(avg); + self.mean_gauge.record(value as f64); #[cfg(test)] if let Ok(mut recorded) = self.recorded_values.lock() { recorded.push(value); diff --git a/lib/vector-buffers/src/topology/channel/mod.rs b/lib/vector-buffers/src/topology/channel/mod.rs index 8e030c622a0ef..300e79d88c872 100644 --- a/lib/vector-buffers/src/topology/channel/mod.rs +++ b/lib/vector-buffers/src/topology/channel/mod.rs @@ -3,10 +3,11 @@ mod receiver; mod sender; pub use limited_queue::{ - ChannelMetricMetadata, DEFAULT_EWMA_ALPHA, LimitedReceiver, LimitedSender, SendError, 
limited, + ChannelMetricMetadata, LimitedReceiver, LimitedSender, SendError, limited, }; pub use receiver::*; pub use sender::*; +pub use vector_common::stats::DEFAULT_EWMA_ALPHA; #[cfg(test)] mod tests; diff --git a/lib/vector-buffers/src/topology/channel/sender.rs b/lib/vector-buffers/src/topology/channel/sender.rs index e68b868e042e7..0a7116f8ceea9 100644 --- a/lib/vector-buffers/src/topology/channel/sender.rs +++ b/lib/vector-buffers/src/topology/channel/sender.rs @@ -8,7 +8,7 @@ use vector_common::internal_event::{InternalEventHandle, Registered, register}; use super::limited_queue::LimitedSender; use crate::{ - Bufferable, WhenFull, + BufferInstrumentation, Bufferable, WhenFull, buffer_usage_data::BufferUsageHandle, internal_events::BufferSendDuration, variants::disk_v2::{self, ProductionFilesystem}, @@ -134,9 +134,11 @@ pub struct BufferSender<T: Bufferable> { base: SenderAdapter<T>, overflow: Option<Box<BufferSender<T>>>, when_full: WhenFull, - instrumentation: Option<BufferUsageHandle>, + usage_instrumentation: Option<BufferUsageHandle>, #[derivative(Debug = "ignore")] send_duration: Option<Registered<BufferSendDuration>>, + #[derivative(Debug = "ignore")] + custom_instrumentation: Option<Arc<dyn BufferInstrumentation<T>>>, } impl<T: Bufferable> BufferSender<T> { @@ -146,8 +148,9 @@ base, overflow: None, when_full, - instrumentation: None, + usage_instrumentation: None, send_duration: None, + custom_instrumentation: None, } } @@ -157,8 +160,9 @@ base, overflow: Some(Box::new(overflow)), when_full: WhenFull::Overflow, - instrumentation: None, + usage_instrumentation: None, send_duration: None, + custom_instrumentation: None, } } @@ -174,7 +178,7 @@ /// Configures this sender to instrument the items passing through it. pub fn with_usage_instrumentation(&mut self, handle: BufferUsageHandle) { - self.instrumentation = Some(handle); + self.usage_instrumentation = Some(handle); } /// Configures this sender to instrument the send duration. @@ -182,6 +186,11 @@ let _enter = span.enter(); self.send_duration = Some(register(BufferSendDuration { stage })); } + + /// Configures this sender to invoke a custom instrumentation hook. + pub fn with_custom_instrumentation(&mut self, instrumentation: impl BufferInstrumentation<T>) { + self.custom_instrumentation = Some(Arc::new(instrumentation)); + } } impl<T: Bufferable> BufferSender<T> { @@ -197,14 +206,17 @@ #[async_recursion] pub async fn send(&mut self, item: T, send_reference: Option<Instant>) -> crate::Result<()> { + if let Some(instrumentation) = self.custom_instrumentation.as_ref() { + instrumentation.on_send(&item); + } let item_sizing = self - .instrumentation + .usage_instrumentation .as_ref() .map(|_| (item.event_count(), item.size_of())); let mut was_dropped = false; - if let Some(instrumentation) = self.instrumentation.as_ref() + if let Some(instrumentation) = self.usage_instrumentation.as_ref() && let Some((item_count, item_size)) = item_sizing { instrumentation @@ -229,7 +241,7 @@ } } - if let Some(instrumentation) = self.instrumentation.as_ref() + if let Some(instrumentation) = self.usage_instrumentation.as_ref() && let Some((item_count, item_size)) = item_sizing && was_dropped { diff --git a/lib/vector-common/src/stats/ewma_gauge.rs b/lib/vector-common/src/stats/ewma_gauge.rs new file mode 100644 index 0000000000000..ebd635cad1f78 --- /dev/null +++ b/lib/vector-common/src/stats/ewma_gauge.rs @@ -0,0 +1,32 @@ +use std::sync::Arc; + +use metrics::Gauge; + +use super::AtomicEwma; + +/// The default alpha parameter used when constructing EWMA-backed gauges. 
+pub const DEFAULT_EWMA_ALPHA: f64 = 0.9; + +/// Couples a [`Gauge`] with an [`AtomicEwma`] so gauge readings reflect the EWMA. +#[derive(Clone, Debug)] +pub struct EwmaGauge { + gauge: Gauge, + // Note that the `Gauge` internally is equivalent to an `Arc` so we need to use the + // same semantics for the EWMA calculation as well. + ewma: Arc<AtomicEwma>, +} + +impl EwmaGauge { + #[must_use] + pub fn new(gauge: Gauge, alpha: Option<f64>) -> Self { + let alpha = alpha.unwrap_or(DEFAULT_EWMA_ALPHA); + let ewma = Arc::new(AtomicEwma::new(alpha)); + Self { gauge, ewma } + } + + /// Records a new value, updates the EWMA, and sets the gauge accordingly. + pub fn record(&self, value: f64) { + let average = self.ewma.update(value); + self.gauge.set(average); + } +} diff --git a/lib/vector-common/src/stats.rs b/lib/vector-common/src/stats/mod.rs similarity index 98% rename from lib/vector-common/src/stats.rs rename to lib/vector-common/src/stats/mod.rs index ffdaad8dac5e0..5c6c5bccc0240 100644 --- a/lib/vector-common/src/stats.rs +++ b/lib/vector-common/src/stats/mod.rs @@ -1,5 +1,9 @@ #![allow(missing_docs)] +pub mod ewma_gauge; + +pub use ewma_gauge::{DEFAULT_EWMA_ALPHA, EwmaGauge}; + use std::sync::atomic::Ordering; use crate::atomic::AtomicF64; diff --git a/lib/vector-core/src/config/global_options.rs b/lib/vector-core/src/config/global_options.rs index 93ad1d80f5934..0c034c473e5e9 100644 --- a/lib/vector-core/src/config/global_options.rs +++ b/lib/vector-core/src/config/global_options.rs @@ -143,16 +143,29 @@ pub struct GlobalOptions { /// The alpha value for the exponential weighted moving average (EWMA) of source and transform /// buffer utilization metrics. /// - /// This value specifies how much of the existing value is retained when each update is made. - /// Values closer to 1.0 result in the value adjusting slower to changes. The default value of - /// 0.9 is equivalent to a "half life" of 6-7 measurements. + /// This controls how quickly the `*_buffer_utilization_mean` gauges respond to new + /// observations. Values closer to 1.0 retain more of the previous value, leading to slower + /// adjustments. The default value of 0.9 is equivalent to a "half life" of 6-7 measurements. /// - /// Must be between 0 and 1 exclusive (0 < alpha < 1). + /// Must be between 0 and 1 exclusive (0 < alpha < 1). #[serde(default, skip_serializing_if = "crate::serde::is_default")] #[configurable(validation(range(min = 0.0, max = 1.0)))] #[configurable(metadata(docs::advanced))] pub buffer_utilization_ewma_alpha: Option<f64>, + /// The alpha value for the exponential weighted moving average (EWMA) of event processing + /// time metrics. + /// + /// This controls how quickly the `event_processing_time_mean_seconds` gauge responds to new + /// observations. Values closer to 1.0 retain more of the previous value, leading to slower + /// adjustments. The default value of 0.9 is equivalent to a "half life" of 6-7 measurements. + /// + /// Must be between 0 and 1 exclusive (0 < alpha < 1). + #[serde(default, skip_serializing_if = "crate::serde::is_default")] + #[configurable(validation(range(min = 0.0, max = 1.0)))] + #[configurable(metadata(docs::advanced))] + pub processing_time_ewma_alpha: Option<f64>, + /// The interval, in seconds, at which the internal metrics cache for VRL is refreshed. /// This must be set to be able to access metrics in VRL functions. 
/// @@ -311,6 +324,9 @@ impl GlobalOptions { buffer_utilization_ewma_alpha: self .buffer_utilization_ewma_alpha .or(with.buffer_utilization_ewma_alpha), + processing_time_ewma_alpha: self + .processing_time_ewma_alpha + .or(with.processing_time_ewma_alpha), metrics_storage_refresh_period: self .metrics_storage_refresh_period .or(with.metrics_storage_refresh_period), diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index c86848d7b0be5..62636971bcfdb 100644 --- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -2,7 +2,6 @@ use std::{collections::HashMap, fmt, num::NonZeroUsize, sync::Arc}; use bitmask_enum::bitmask; use bytes::Bytes; -use chrono::{DateTime, Utc}; mod global_options; mod log_schema; @@ -487,15 +486,16 @@ impl LogNamespace { } /// Vector: The `ingest_timestamp`, and `source_type` fields are added to "event metadata", nested - /// under the name "vector". This data will be marked as read-only in VRL. + /// under the name "vector". This data will be marked as read-only in VRL. The `ingest_timestamp` + /// is only inserted if it already exists on the event metadata. /// /// Legacy: The values of `source_type_key`, and `timestamp_key` are stored as keys on the event root, /// only if a field with that name doesn't already exist. + #[allow(clippy::missing_panics_doc, reason = "Can only panic in test")] pub fn insert_standard_vector_source_metadata( &self, log: &mut LogEvent, source_name: &'static str, - now: DateTime<Utc>, ) { self.insert_vector_metadata( log, @@ -503,12 +503,21 @@ path!("source_type"), Bytes::from_static(source_name.as_bytes()), ); - self.insert_vector_metadata( - log, - log_schema().timestamp_key(), - path!("ingest_timestamp"), - now, + // We could automatically set the ingest timestamp here, but it's better to make the source + // set it so that it can ensure that all events in a batch have the same timestamp. + #[cfg(test)] + assert!( + log.metadata().ingest_timestamp().is_some(), + "Event ingest_timestamp was not set by the source" ); + if let Some(timestamp) = log.metadata().ingest_timestamp() { + self.insert_vector_metadata( + log, + log_schema().timestamp_key(), + path!("ingest_timestamp"), + timestamp, + ); + } } /// Vector: This is added to the "event metadata", nested under the name "vector". This data @@ -587,7 +596,8 @@ mod test { let namespace = LogNamespace::Legacy; let mut event = LogEvent::from("log"); - namespace.insert_standard_vector_source_metadata(&mut event, "source", Utc::now()); + event.metadata_mut().set_ingest_timestamp(Utc::now()); + namespace.insert_standard_vector_source_metadata(&mut event, "source"); assert!(event.get(event_path!("a", "b", "c", "d")).is_some()); } diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index f860b03bb207b..8123cf59ecfc4 100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -2,6 +2,7 @@ use std::{borrow::Cow, collections::BTreeMap, fmt, sync::Arc}; +use chrono::{DateTime, Utc}; use derivative::Derivative; use lookup::OwnedTargetPath; use serde::{Deserialize, Serialize}; @@ -78,6 +79,11 @@ pub(super) struct Inner { /// An internal vector id that can be used to identify this event across all components. #[derivative(PartialEq = "ignore")] pub(crate) source_event_id: Option<Uuid>, + + /// The timestamp when the event was ingested into Vector. 
+ #[derivative(PartialEq = "ignore")] + #[serde(default, skip)] + pub(crate) ingest_timestamp: Option<DateTime<Utc>>, } /// Metric Origin metadata for submission to Datadog. @@ -239,6 +245,17 @@ impl EventMetadata { pub fn source_event_id(&self) -> Option<Uuid> { self.0.source_event_id } + + /// Returns the ingest timestamp, if it exists. + #[must_use] + pub fn ingest_timestamp(&self) -> Option<DateTime<Utc>> { + self.0.ingest_timestamp + } + + /// Sets the ingest timestamp to the provided value. + pub fn set_ingest_timestamp(&mut self, timestamp: DateTime<Utc>) { + self.get_mut().ingest_timestamp = Some(timestamp); + } } impl Default for Inner { @@ -254,6 +271,7 @@ dropped_fields: ObjectMap::new(), datadog_origin_metadata: None, source_event_id: Some(Uuid::new_v4()), + ingest_timestamp: None, } } } diff --git a/lib/vector-core/src/event/proto.rs b/lib/vector-core/src/event/proto.rs index 6a4796906d287..8ba73a9ae7540 100644 --- a/lib/vector-core/src/event/proto.rs +++ b/lib/vector-core/src/event/proto.rs @@ -688,6 +688,7 @@ impl From<Metadata> for EventMetadata { dropped_fields: ObjectMap::new(), datadog_origin_metadata, source_event_id, + ingest_timestamp: None, })) } } diff --git a/lib/vector-core/src/event/ref.rs b/lib/vector-core/src/event/ref.rs index 3c1b6d1953055..345a9ccaef880 100644 --- a/lib/vector-core/src/event/ref.rs +++ b/lib/vector-core/src/event/ref.rs @@ -64,6 +64,15 @@ impl<'a> EventRef<'a> { _ => panic!("Failed type coercion, {self:?} is not a metric reference"), } } + + /// Access the metadata for the event under this reference. + pub fn metadata(&self) -> &EventMetadata { + match self { + Self::Log(log) => log.metadata(), + Self::Metric(metric) => metric.metadata(), + Self::Trace(trace) => trace.metadata(), + } + } } impl<'a> From<&'a Event> for EventRef<'a> { diff --git a/lib/vector-core/src/source_sender/output.rs b/lib/vector-core/src/source_sender/output.rs index 4b9f29efc1f58..e06c80776af7d 100644 --- a/lib/vector-core/src/source_sender/output.rs +++ b/lib/vector-core/src/source_sender/output.rs @@ -5,7 +5,7 @@ use std::{ time::{Duration, Instant}, }; -use chrono::Utc; +use chrono::{DateTime, Utc}; use futures::{Stream, StreamExt as _}; use metrics::Histogram; use tracing::Span; @@ -135,13 +135,25 @@ impl Output { ) } + /// Set the ingest timestamp for any events that don't already have one. 
+ fn ensure_ingest_timestamp(events: &mut EventArray, now: DateTime<Utc>) { + events.iter_events_mut().for_each(|mut event| { + if event.metadata().ingest_timestamp().is_none() { + event.metadata_mut().set_ingest_timestamp(now); + } + }); + } + pub(super) async fn send( &mut self, mut events: EventArray, unsent_event_count: &mut UnsentEventCount, ) -> Result<(), SendError> { + let now = Utc::now(); + Self::ensure_ingest_timestamp(&mut events, now); + let send_reference = Instant::now(); - let reference = Utc::now().timestamp_millis(); + let reference = now.timestamp_millis(); events .iter_events() .for_each(|event| self.emit_lag_time(event, reference)); diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index a9d8aee743f9d..e8aa6f5dbc33a 100644 --- a/src/components/validation/runner/mod.rs +++ b/src/components/validation/runner/mod.rs @@ -587,7 +587,8 @@ fn spawn_input_driver( if component_type != ComponentType::Source && let Event::Log(log) = input_event.get_event() { - log_namespace.insert_standard_vector_source_metadata(log, "vector", now); + log.metadata_mut().set_ingest_timestamp(now); + log_namespace.insert_standard_vector_source_metadata(log, "vector"); } let (failure_case, mut event) = input_event.clone().get(); diff --git a/src/config/compiler.rs b/src/config/compiler.rs index ecb90f7e36f15..f493f8c8479b9 100644 --- a/src/config/compiler.rs +++ b/src/config/compiler.rs @@ -36,7 +36,7 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<String>> { errors.extend(output_errors); } - if let Err(alpha_errors) = validation::check_buffer_utilization_ewma_alpha(&builder) { + if let Err(alpha_errors) = validation::check_values(&builder) { errors.extend(alpha_errors); } diff --git a/src/config/validation.rs b/src/config/validation.rs index 0b629ea9088a6..56964e0a4a9c7 100644 --- a/src/config/validation.rs +++ b/src/config/validation.rs @@ -11,11 +11,11 @@ use super::{ }; use crate::config::schema; -/// Minimum value (exclusive) for `utilization_ewma_alpha`. +/// Minimum value (exclusive) for EWMA alpha options. /// The alpha value must be strictly greater than this value. const EWMA_ALPHA_MIN: f64 = 0.0; -/// Maximum value (exclusive) for `utilization_ewma_alpha`. +/// Maximum value (exclusive) for EWMA alpha options. /// The alpha value must be strictly less than this value. const EWMA_ALPHA_MAX: f64 = 1.0; @@ -155,17 +155,29 @@ pub fn check_resources(config: &ConfigBuilder) -> Result<(), Vec<String>> { } } -/// Validates that `buffer_utilization_ewma_alpha` value is within the valid range (0 < alpha < 1) -/// for the global configuration. -pub fn check_buffer_utilization_ewma_alpha(config: &ConfigBuilder) -> Result<(), Vec<String>> { +/// Validates that `*_ewma_alpha` values are within the valid range (0 < alpha < 1). 
+pub fn check_values(config: &ConfigBuilder) -> Result<(), Vec<String>> { + let mut errors = Vec::new(); + if let Some(alpha) = config.global.buffer_utilization_ewma_alpha && (alpha <= EWMA_ALPHA_MIN || alpha >= EWMA_ALPHA_MAX) { - Err(vec![format!( + errors.push(format!( "Global `buffer_utilization_ewma_alpha` must be between 0 and 1 exclusive (0 < alpha < 1), got {alpha}" - )]) - } else { + )); + } + if let Some(alpha) = config.global.processing_time_ewma_alpha + && (alpha <= EWMA_ALPHA_MIN || alpha >= EWMA_ALPHA_MAX) + { + errors.push(format!( + "Global `processing_time_ewma_alpha` must be between 0 and 1 exclusive (0 < alpha < 1), got {alpha}" + )); + } + + if errors.is_empty() { Ok(()) + } else { + Err(errors) } } diff --git a/src/enrichment_tables/memory/source.rs b/src/enrichment_tables/memory/source.rs index 74a5487efc3e5..8a312d99ebe16 100644 --- a/src/enrichment_tables/memory/source.rs +++ b/src/enrichment_tables/memory/source.rs @@ -107,11 +107,9 @@ impl MemorySource { EventMetadata::default(), )); let log = event.as_mut_log(); - self.log_namespace.insert_standard_vector_source_metadata( - log, - MemoryConfig::NAME, - utc_now, - ); + log.metadata_mut().set_ingest_timestamp(utc_now); + self.log_namespace + .insert_standard_vector_source_metadata(log, MemoryConfig::NAME); Some(event) }) @@ -158,11 +156,9 @@ impl MemorySource { EventMetadata::default(), )); let log = event.as_mut_log(); - self.log_namespace.insert_standard_vector_source_metadata( - log, - MemoryConfig::NAME, - Utc::now(), - ); + log.metadata_mut().set_ingest_timestamp(Utc::now()); + self.log_namespace + .insert_standard_vector_source_metadata(log, MemoryConfig::NAME); Some(event) }, ) diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index 862d6973aea5f..902272d405ef8 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -547,7 +547,8 @@ mod tests { log.insert(event_path!("message"), "the_message"); let namespace = log.namespace(); - namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now()); + log.metadata_mut().set_ingest_timestamp(Utc::now()); + namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent"); let tags = vec![ Value::Bytes("key1:value1".into()), @@ -594,7 +595,8 @@ log.insert(metadata_path!("vector", "foo"), "bar"); let namespace = log.namespace(); - namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now()); + log.metadata_mut().set_ingest_timestamp(Utc::now()); + namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent"); let tags = vec![ Value::Bytes("key1:value1".into()), @@ -634,7 +636,8 @@ ); let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition)); let namespace = log.namespace(); - namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now()); + log.metadata_mut().set_ingest_timestamp(Utc::now()); + namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent"); let tags = vec![ Value::Bytes("key1:value1".into()), diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index 0c7f9a115a96b..539c61c6af2e9 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -944,11 +944,8 @@ mod integration_tests { path!("host"), "aws.cloud.eur", ); - LogNamespace::Vector.insert_standard_vector_source_metadata( - &mut namespaced_log, - "file", - now, - ); + namespaced_log.metadata_mut().set_ingest_timestamp(now); + 
LogNamespace::Vector.insert_standard_vector_source_metadata(&mut namespaced_log, "file"); let schema = BytesDeserializerConfig .schema_definition(LogNamespace::Vector) .with_metadata_field( diff --git a/src/sinks/loki/integration_tests.rs b/src/sinks/loki/integration_tests.rs index 05f9774dbe3fd..11d1ddb4dff0c 100644 --- a/src/sinks/loki/integration_tests.rs +++ b/src/sinks/loki/integration_tests.rs @@ -113,7 +113,8 @@ fn namespaced_timestamp_generator( log.insert(timestamp_field.as_str(), Value::from(timestamp)); // We need vector metadata for it to pick up that it is in the Vector namespace. - LogNamespace::Vector.insert_standard_vector_source_metadata(&mut log, "loki", Utc::now()); + log.metadata_mut().set_ingest_timestamp(Utc::now()); + LogNamespace::Vector.insert_standard_vector_source_metadata(&mut log, "loki"); let schema = schema::Definition::new_with_default_metadata( Kind::object(Collection::empty()), diff --git a/src/sources/amqp.rs b/src/sources/amqp.rs index a0be621b0f7b8..fcaee12fc6658 100644 --- a/src/sources/amqp.rs +++ b/src/sources/amqp.rs @@ -293,6 +293,8 @@ fn populate_log_event( // This handles the transition from the original timestamp logic. Originally the // `timestamp_key` was populated by the `properties.timestamp()` time on the message, falling // back to calling `now()`. + let now = Utc::now(); + log.metadata_mut().set_ingest_timestamp(now); match log_namespace { LogNamespace::Vector => { if let Some(timestamp) = timestamp { @@ -302,11 +304,11 @@ fn populate_log_event( ); }; - log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now()); + log.insert(metadata_path!("vector", "ingest_timestamp"), now); } LogNamespace::Legacy => { if let Some(timestamp_key) = log_schema().timestamp_key_target_path() { - log.try_insert(timestamp_key, timestamp.unwrap_or_else(Utc::now)); + log.try_insert(timestamp_key, timestamp.unwrap_or(now)); } } }; diff --git a/src/sources/aws_kinesis_firehose/handlers.rs b/src/sources/aws_kinesis_firehose/handlers.rs index b9eefc857f3c3..5af9785b7b37c 100644 --- a/src/sources/aws_kinesis_firehose/handlers.rs +++ b/src/sources/aws_kinesis_firehose/handlers.rs @@ -92,6 +92,7 @@ pub(super) async fn firehose( event.add_batch_notifier(batch.clone()); } if let Event::Log(log) = event { + log.metadata_mut().set_ingest_timestamp(now); log_namespace.insert_vector_metadata( log, log_schema().source_type_key(), diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index 9ad5b47be61cc..12ea005b0e52d 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -958,20 +958,19 @@ fn handle_single_log( // This handles the transition from the original timestamp logic. Originally the // `timestamp_key` was populated by the `last_modified` time on the object, falling // back to calling `now()`. 
+ let now = Utc::now(); + log.metadata_mut().set_ingest_timestamp(now); match log_namespace { LogNamespace::Vector => { if let Some(timestamp) = timestamp { log.insert(metadata_path!(AwsS3Config::NAME, "timestamp"), timestamp); } - log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now()); + log.insert(metadata_path!("vector", "ingest_timestamp"), now); } LogNamespace::Legacy => { if let Some(timestamp_key) = log_schema().timestamp_key() { - log.try_insert( - (PathPrefix::Event, timestamp_key), - timestamp.unwrap_or_else(Utc::now), - ); + log.try_insert((PathPrefix::Event, timestamp_key), timestamp.unwrap_or(now)); } } }; diff --git a/src/sources/datadog_agent/logs.rs b/src/sources/datadog_agent/logs.rs index 7802def1dff2b..8c93a3a7be7b4 100644 --- a/src/sources/datadog_agent/logs.rs +++ b/src/sources/datadog_agent/logs.rs @@ -159,10 +159,10 @@ pub(crate) fn decode_log_body( // compute EstimatedJsonSizeOf before enrichment event_bytes_received += log.estimated_json_encoded_size_of(); + log.metadata_mut().set_ingest_timestamp(now); namespace.insert_standard_vector_source_metadata( log, DatadogAgentConfig::NAME, - now, ); if let Some(k) = &api_key { diff --git a/src/sources/demo_logs.rs b/src/sources/demo_logs.rs index 82c2fff9eb995..d5bed64f1f3d5 100644 --- a/src/sources/demo_logs.rs +++ b/src/sources/demo_logs.rs @@ -245,11 +245,9 @@ async fn demo_logs_source( let events = events.into_iter().map(|mut event| { let log = event.as_mut_log(); - log_namespace.insert_standard_vector_source_metadata( - log, - DemoLogsConfig::NAME, - now, - ); + log.metadata_mut().set_ingest_timestamp(now); + log_namespace + .insert_standard_vector_source_metadata(log, DemoLogsConfig::NAME); log_namespace.insert_source_metadata( DemoLogsConfig::NAME, log, diff --git a/src/sources/docker_logs/mod.rs b/src/sources/docker_logs/mod.rs index 45268788d45cc..bf9e6c64642c3 100644 --- a/src/sources/docker_logs/mod.rs +++ b/src/sources/docker_logs/mod.rs @@ -1193,6 +1193,8 @@ impl ContainerLogInfo { // This handles the transition from the original timestamp logic. Originally the // `timestamp_key` was only populated when a timestamp was parsed from the event. 
+ let now = Utc::now(); + log.metadata_mut().set_ingest_timestamp(now); match log_namespace { LogNamespace::Vector => { if let Some(timestamp) = timestamp { @@ -1202,7 +1204,7 @@ impl ContainerLogInfo { ); } - log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now()); + log.insert(metadata_path!("vector", "ingest_timestamp"), now); } LogNamespace::Legacy => { if let Some(timestamp) = timestamp diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs index 8563e8f5f69eb..b9450099ebc1c 100644 --- a/src/sources/exec/mod.rs +++ b/src/sources/exec/mod.rs @@ -673,7 +673,8 @@ fn handle_event( log_namespace: LogNamespace, ) { if let Event::Log(log) = event { - log_namespace.insert_standard_vector_source_metadata(log, ExecConfig::NAME, Utc::now()); + log.metadata_mut().set_ingest_timestamp(Utc::now()); + log_namespace.insert_standard_vector_source_metadata(log, ExecConfig::NAME); // Add data stream of stdin or stderr (if needed) if let Some(data_stream) = data_stream { diff --git a/src/sources/file_descriptors/mod.rs b/src/sources/file_descriptors/mod.rs index 3dca4f387e7c1..1ce5f0bc56505 100644 --- a/src/sources/file_descriptors/mod.rs +++ b/src/sources/file_descriptors/mod.rs @@ -148,11 +148,11 @@ async fn process_stream( match event{ Event::Log(_) => { let log = event.as_mut_log(); + log.metadata_mut().set_ingest_timestamp(now); log_namespace.insert_standard_vector_source_metadata( log, source_type, - now ); if let Some(hostname) = &hostname { diff --git a/src/sources/fluent/mod.rs b/src/sources/fluent/mod.rs index 4f1f3112e6f43..b899bdce34da2 100644 --- a/src/sources/fluent/mod.rs +++ b/src/sources/fluent/mod.rs @@ -793,10 +793,12 @@ impl From> for LogEvent { Bytes::from_static(FluentConfig::NAME.as_bytes()), ); + let now = Utc::now(); + log.metadata_mut().set_ingest_timestamp(now); match log_namespace { LogNamespace::Vector => { log.insert(metadata_path!(FluentConfig::NAME, "timestamp"), timestamp); - log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now()); + log.insert(metadata_path!("vector", "ingest_timestamp"), now); } LogNamespace::Legacy => { log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp); diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index 02d682cccf967..4ef288d8b9b7a 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -415,7 +415,8 @@ fn line_to_events( for event in &mut events { if let Event::Log(log) = event { - log_namespace.insert_standard_vector_source_metadata(log, LogplexConfig::NAME, now); + log.metadata_mut().set_ingest_timestamp(now); + log_namespace.insert_standard_vector_source_metadata(log, LogplexConfig::NAME); } } diff --git a/src/sources/http_client/client.rs b/src/sources/http_client/client.rs index 27d4643a73afc..0f7e3a505624f 100644 --- a/src/sources/http_client/client.rs +++ b/src/sources/http_client/client.rs @@ -565,11 +565,9 @@ impl http_client::HttpClientContext for HttpClientContext { for event in events { match event { Event::Log(log) => { - self.log_namespace.insert_standard_vector_source_metadata( - log, - HttpClientConfig::NAME, - now, - ); + log.metadata_mut().set_ingest_timestamp(now); + self.log_namespace + .insert_standard_vector_source_metadata(log, HttpClientConfig::NAME); } Event::Metric(metric) => { if let Some(source_type_key) = log_schema().source_type_key() { diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 3d77821af5ffa..6b915e409c547 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -438,6 
+438,7 @@ impl HttpSource for SimpleHttpSource { for event in events.iter_mut() { match event { Event::Log(log) => { + log.metadata_mut().set_ingest_timestamp(now); // add request_path to each event self.log_namespace.insert_source_metadata( SimpleHttpConfig::NAME, @@ -447,11 +448,8 @@ impl HttpSource for SimpleHttpSource { request_path.to_owned(), ); - self.log_namespace.insert_standard_vector_source_metadata( - log, - SimpleHttpConfig::NAME, - now, - ); + self.log_namespace + .insert_standard_vector_source_metadata(log, SimpleHttpConfig::NAME); if let Some(addr) = source_ip { self.log_namespace.insert_source_metadata( diff --git a/src/sources/internal_logs.rs b/src/sources/internal_logs.rs index e7873a7e8a560..fc32c28308930 100644 --- a/src/sources/internal_logs.rs +++ b/src/sources/internal_logs.rs @@ -190,11 +190,8 @@ async fn run( pid, ); - log_namespace.insert_standard_vector_source_metadata( - &mut log, - InternalLogsConfig::NAME, - Utc::now(), - ); + log.metadata_mut().set_ingest_timestamp(Utc::now()); + log_namespace.insert_standard_vector_source_metadata(&mut log, InternalLogsConfig::NAME); if (out.send_event(Event::from(log)).await).is_err() { // this wont trigger any infinite loop considering it stops the component diff --git a/src/sources/journald.rs b/src/sources/journald.rs index c6177fba1eb6c..95b2890065092 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -838,6 +838,8 @@ async fn get_systemd_version_from_journalctl(journalctl_path: &PathBuf) -> crate } fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) { + let now = Utc::now(); + log.metadata_mut().set_ingest_timestamp(now); match log_namespace { LogNamespace::Vector => { if let Some(host) = log @@ -886,7 +888,7 @@ fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) { // Add timestamp. match log_namespace { LogNamespace::Vector => { - log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now()); + log.insert(metadata_path!("vector", "ingest_timestamp"), now); if let Some(ts) = timestamp { log.insert(metadata_path!(JournaldConfig::NAME, "timestamp"), ts); diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 7f4895c8ed698..109a731e7c16c 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -1099,17 +1099,15 @@ impl ReceivedMessage { fn apply(&self, keys: &Keys, event: &mut Event, log_namespace: LogNamespace) { if let Event::Log(log) = event { + log.metadata_mut().set_ingest_timestamp(Utc::now()); match log_namespace { LogNamespace::Vector => { // We'll only use this function in Vector namespaces because we don't want // "timestamp" to be set automatically in legacy namespaces. In legacy // namespaces, the "timestamp" field corresponds to the Kafka message, not the // timestamp when the event was processed. 
- log_namespace.insert_standard_vector_source_metadata( - log, - KafkaSourceConfig::NAME, - Utc::now(), - ); + log_namespace + .insert_standard_vector_source_metadata(log, KafkaSourceConfig::NAME); } LogNamespace::Legacy => { if let Some(source_type_key) = log_schema().source_type_key_target_path() { diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index fa001972a59d7..bc374f0ed782f 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -1018,16 +1018,18 @@ fn create_event( path!("source_type"), Bytes::from(Config::NAME), ); + let now = Utc::now(); + log.metadata_mut().set_ingest_timestamp(now); match (log_namespace, ingestion_timestamp_field) { // When using LogNamespace::Vector always set the ingest_timestamp. (LogNamespace::Vector, _) => { log.metadata_mut() .value_mut() - .insert(path!("vector", "ingest_timestamp"), Utc::now()); + .insert(path!("vector", "ingest_timestamp"), now); } // When LogNamespace::Legacy, only set when the `ingestion_timestamp_field` is configured. (LogNamespace::Legacy, Some(ingestion_timestamp_field)) => { - log.try_insert(ingestion_timestamp_field, Utc::now()) + log.try_insert(ingestion_timestamp_field, now) } // The CRI/Docker parsers handle inserting the `log_schema().timestamp_key()` value. (LogNamespace::Legacy, None) => (), diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index a643ce0af73ba..06734ae5a241d 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -211,6 +211,7 @@ impl TcpSource for LogstashSource { let now = chrono::Utc::now(); for event in events { let log = event.as_mut_log(); + log.metadata_mut().set_ingest_timestamp(now); self.log_namespace.insert_vector_metadata( log, diff --git a/src/sources/nats/source.rs b/src/sources/nats/source.rs index 12703ee2852dc..1a10ba1e3d096 100644 --- a/src/sources/nats/source.rs +++ b/src/sources/nats/source.rs @@ -60,11 +60,9 @@ pub async fn process_message( let now = Utc::now(); let events = events.into_iter().map(|mut event| { if let Event::Log(ref mut log) = event { - log_namespace.insert_standard_vector_source_metadata( - log, - NatsSourceConfig::NAME, - now, - ); + log.metadata_mut().set_ingest_timestamp(now); + log_namespace + .insert_standard_vector_source_metadata(log, NatsSourceConfig::NAME); let legacy_subject_key_field = config .subject_key_field .path diff --git a/src/sources/okta/client.rs b/src/sources/okta/client.rs index 4b06b6a6d71b9..cc83c3cdf2def 100644 --- a/src/sources/okta/client.rs +++ b/src/sources/okta/client.rs @@ -187,11 +187,9 @@ impl SourceConfig for OktaConfig { fn enrich_events(events: &mut Vec<Event>, log_namespace: LogNamespace) { let now = Utc::now(); for event in events { - log_namespace.insert_standard_vector_source_metadata( - event.as_mut_log(), - OktaConfig::NAME, - now, - ); + let log = event.as_mut_log(); + log.metadata_mut().set_ingest_timestamp(now); + log_namespace.insert_standard_vector_source_metadata(log, OktaConfig::NAME); } } diff --git a/src/sources/pulsar.rs b/src/sources/pulsar.rs index 495e878f5a799..e8ec78b287c25 100644 --- a/src/sources/pulsar.rs +++ b/src/sources/pulsar.rs @@ -404,10 +404,10 @@ async fn parse_message( let events = events.into_iter().map(|mut event| { if let Event::Log(ref mut log) = event { + log.metadata_mut().set_ingest_timestamp(now); log_namespace.insert_standard_vector_source_metadata( log, PulsarSourceConfig::NAME, - now, ); log_namespace.insert_source_metadata( diff --git a/src/sources/socket/tcp.rs b/src/sources/socket/tcp.rs index 
5e1873e6f9cde..e4c4455e3e38a 100644 --- a/src/sources/socket/tcp.rs +++ b/src/sources/socket/tcp.rs @@ -222,11 +222,9 @@ impl TcpSource for RawTcpSource { for event in events { if let Event::Log(log) = event { - self.log_namespace.insert_standard_vector_source_metadata( - log, - SocketConfig::NAME, - now, - ); + log.metadata_mut().set_ingest_timestamp(now); + self.log_namespace + .insert_standard_vector_source_metadata(log, SocketConfig::NAME); let legacy_host_key = self .config diff --git a/src/sources/socket/udp.rs b/src/sources/socket/udp.rs index 84402e0264a95..08597f943d786 100644 --- a/src/sources/socket/udp.rs +++ b/src/sources/socket/udp.rs @@ -276,10 +276,10 @@ pub(super) fn udp( for event in &mut events { if let Event::Log(log) = event { + log.metadata_mut().set_ingest_timestamp(now); log_namespace.insert_standard_vector_source_metadata( log, SocketConfig::NAME, - now, ); let legacy_host_key = config diff --git a/src/sources/socket/unix.rs b/src/sources/socket/unix.rs index 0fb1a043e4ab1..a029460a05b01 100644 --- a/src/sources/socket/unix.rs +++ b/src/sources/socket/unix.rs @@ -101,7 +101,8 @@ fn handle_events( for event in events { if let Event::Log(log) = event { - log_namespace.insert_standard_vector_source_metadata(log, SocketConfig::NAME, now); + log.metadata_mut().set_ingest_timestamp(now); + log_namespace.insert_standard_vector_source_metadata(log, SocketConfig::NAME); if let Some(ref host) = received_from { let legacy_host_key = host_key.clone().path; diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index b020383b9fbda..cab69e79daa0c 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -1119,7 +1119,8 @@ fn raw_event( ); } - log_namespace.insert_standard_vector_source_metadata(&mut log, SplunkConfig::NAME, Utc::now()); + log.metadata_mut().set_ingest_timestamp(Utc::now()); + log_namespace.insert_standard_vector_source_metadata(&mut log, SplunkConfig::NAME); if let Some(batch) = batch { log = log.with_batch_notifier(&batch); diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index 9a3d78b8e1bad..701fdcb569f80 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -444,8 +444,8 @@ fn enrich_syslog_event( parsed_host, ); } - - log_namespace.insert_standard_vector_source_metadata(log, SyslogConfig::NAME, Utc::now()); + log.metadata_mut().set_ingest_timestamp(Utc::now()); + log_namespace.insert_standard_vector_source_metadata(log, SyslogConfig::NAME); if log_namespace == LogNamespace::Legacy { let timestamp = log diff --git a/src/sources/util/message_decoding.rs b/src/sources/util/message_decoding.rs index df7bb540c073d..1d08b2f50a5ff 100644 --- a/src/sources/util/message_decoding.rs +++ b/src/sources/util/message_decoding.rs @@ -43,6 +43,7 @@ pub fn decode_message<'a>( path!("source_type"), Bytes::from(source_type), ); + log.metadata_mut().set_ingest_timestamp(now); match log_namespace { LogNamespace::Vector => { if let Some(timestamp) = timestamp { diff --git a/src/sources/vector/mod.rs b/src/sources/vector/mod.rs index 519365086a491..50711652a55d8 100644 --- a/src/sources/vector/mod.rs +++ b/src/sources/vector/mod.rs @@ -58,11 +58,9 @@ impl proto::Service for Service { let now = Utc::now(); for event in &mut events { if let Event::Log(log) = event { - self.log_namespace.insert_standard_vector_source_metadata( - log, - VectorConfig::NAME, - now, - ); + log.metadata_mut().set_ingest_timestamp(now); + self.log_namespace + .insert_standard_vector_source_metadata(log, VectorConfig::NAME); } } diff --git 
a/src/sources/websocket/source.rs b/src/sources/websocket/source.rs index cf3c75d40f534..26e5097e79cc5 100644 --- a/src/sources/websocket/source.rs +++ b/src/sources/websocket/source.rs @@ -256,9 +256,10 @@ impl WebSocketSource { } fn add_metadata(&self, event: &mut LogEvent) { + event.metadata_mut().set_ingest_timestamp(Utc::now()); self.params .log_namespace - .insert_standard_vector_source_metadata(event, WebSocketConfig::NAME, Utc::now()); + .insert_standard_vector_source_metadata(event, WebSocketConfig::NAME); } async fn reconnect( diff --git a/src/test_util/mock/mod.rs b/src/test_util/mock/mod.rs index daf0fee5927da..01e2c6fb307b6 100644 --- a/src/test_util/mock/mod.rs +++ b/src/test_util/mock/mod.rs @@ -10,14 +10,16 @@ use vector_lib::{ use self::{ sinks::{ - BackpressureSinkConfig, BasicSinkConfig, ErrorSinkConfig, OneshotSinkConfig, - PanicSinkConfig, + BackpressureSinkConfig, BasicSinkConfig, CompletionSinkConfig, ErrorSinkConfig, + OneshotSinkConfig, PanicSinkConfig, }, sources::{ BackpressureSourceConfig, BasicSourceConfig, ErrorSourceConfig, PanicSourceConfig, TripwireSourceConfig, }, - transforms::{BasicTransformConfig, ErrorDefinitionTransformConfig}, + transforms::{ + BasicTransformConfig, ErrorDefinitionTransformConfig, NoopTransformConfig, TransformType, + }, }; pub mod sinks; @@ -112,3 +114,11 @@ pub fn oneshot_sink(tx: Sender) -> OneshotSinkConfig { pub fn panic_sink() -> PanicSinkConfig { PanicSinkConfig::default() } + +pub fn completion_sink(expected: usize, tx: Sender<bool>) -> CompletionSinkConfig { + CompletionSinkConfig::new(expected, tx) } + +pub fn noop_transform() -> NoopTransformConfig { + NoopTransformConfig::from(TransformType::Function) +} diff --git a/src/test_util/mock/sinks/completion.rs b/src/test_util/mock/sinks/completion.rs new file mode 100644 index 0000000000000..a390f33c9828b --- /dev/null +++ b/src/test_util/mock/sinks/completion.rs @@ -0,0 +1,95 @@ +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use futures_util::{FutureExt, StreamExt, future, stream::BoxStream}; +use tokio::sync::oneshot::Sender; +use vector_lib::{ + config::{AcknowledgementsConfig, Input}, + configurable::configurable_component, + event::Event, + sink::{StreamSink, VectorSink}, +}; + +use crate::{ + config::{SinkConfig, SinkContext}, + sinks::Healthcheck, +}; + +/// Configuration for the `test_completion` sink. 
+#[configurable_component(sink("test_completion", "Test (completion)."))] +#[derive(Clone, Debug, Default)] +pub struct CompletionSinkConfig { + #[serde(skip)] + expected: usize, + + #[serde(skip)] + completion_tx: Arc<Mutex<Option<Sender<bool>>>>, +} + +impl_generate_config_from_default!(CompletionSinkConfig); + +impl CompletionSinkConfig { + pub fn new(expected: usize, completion_tx: Sender<bool>) -> Self { + Self { + expected, + completion_tx: Arc::new(Mutex::new(Some(completion_tx))), + } + } +} + +#[async_trait] +#[typetag::serde(name = "test_completion")] +impl SinkConfig for CompletionSinkConfig { + async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let completion_tx = self + .completion_tx + .lock() + .expect("completion sink mutex poisoned") + .take(); + + let sink = CompletionSink { + remaining: self.expected, + completion_tx, + }; + let healthcheck = future::ready(Ok(())).boxed(); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + Input::all() + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &AcknowledgementsConfig::DEFAULT + } +} + +struct CompletionSink { + remaining: usize, + completion_tx: Option<Sender<bool>>, +} + +#[async_trait] +impl StreamSink<Event> for CompletionSink { + async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> { + while let Some(event) = input.next().await { + drop(event); + + if self.remaining > 0 { + self.remaining -= 1; + if self.remaining == 0 + && let Some(tx) = self.completion_tx.take() + { + let _ = tx.send(true); + } + } + } + + if let Some(tx) = self.completion_tx.take() { + let _ = tx.send(self.remaining == 0); + } + + Ok(()) + } +} diff --git a/src/test_util/mock/sinks/mod.rs b/src/test_util/mock/sinks/mod.rs index 862d7403603a7..226391007077b 100644 --- a/src/test_util/mock/sinks/mod.rs +++ b/src/test_util/mock/sinks/mod.rs @@ -4,6 +4,9 @@ pub use self::backpressure::BackpressureSinkConfig; mod basic; pub use self::basic::BasicSinkConfig; +mod completion; +pub use self::completion::CompletionSinkConfig; + mod error; pub use self::error::ErrorSinkConfig; diff --git a/src/topology/builder.rs b/src/topology/builder.rs index ba19f1dc61d78..3c5d836deb6e2 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, future::ready, num::NonZeroUsize, sync::{Arc, LazyLock, Mutex}, @@ -35,6 +35,7 @@ use vector_vrl_metrics::MetricsStorage; use super::{ BuiltBuffer, ConfigDiff, fanout::{self, Fanout}, + processing_time::ProcessingTimeRecorder, schema, task::{Task, TaskOutput, TaskResult}, }; @@ -543,6 +544,31 @@ impl<'a> Builder<'a> { } } + fn source_component_keys(&self, sink_key: &ComponentKey) -> Vec<ComponentKey> { + let mut sources = Vec::new(); + self.collect_source_component_keys(sink_key, &mut sources, &mut HashSet::new()); + sources + } + + fn collect_source_component_keys( + &self, + component: &ComponentKey, + sources: &mut Vec<ComponentKey>, + visited: &mut HashSet<ComponentKey>, + ) { + if visited.insert(component.clone()) { + if self.config.source(component).is_some() + || self.config.enrichment_table(component).is_some() + { + sources.push(component.clone()); + } else if let Some(inputs) = self.config.inputs_for_node(component) { + for input in inputs { + self.collect_source_component_keys(&input.component, sources, visited); + } + } + } + } + async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) { let table_sinks = self .config @@ -571,6 +597,15 @@ let 
typetag = sink.inner.get_component_name(); let input_type = sink.inner.input().data_type(); + // We need to create the processing time recorder before the span is entered, otherwise + // the metrics will be created with additional labels (i.e., `component_id`, + // `component_kind`, and `component_type`) that are not required for these metrics. + let processing_time_recorder = ProcessingTimeRecorder::new( + key, + self.source_component_keys(key), + self.config.global.processing_time_ewma_alpha, + ); + let span = error_span!( "sink", component_kind = "sink", @@ -591,7 +626,7 @@ self.errors.append(&mut err); }; - let (tx, rx) = match self.buffers.remove(key) { + let (mut tx, rx) = match self.buffers.remove(key) { Some(buffer) => buffer, _ => { let buffer_type = @@ -618,6 +653,8 @@ } }; + tx.with_custom_instrumentation(processing_time_recorder); + let cx = SinkContext { healthcheck, globals: self.config.global.clone(), diff --git a/src/topology/mod.rs b/src/topology/mod.rs index 15e8c4adf8a86..ed056c4c17ff3 100644 --- a/src/topology/mod.rs +++ b/src/topology/mod.rs @@ -12,6 +12,7 @@ pub mod schema; pub mod builder; mod controller; +mod processing_time; mod ready_arrays; mod running; mod task; diff --git a/src/topology/processing_time.rs b/src/topology/processing_time.rs new file mode 100644 index 0000000000000..71b55eccd5917 --- /dev/null +++ b/src/topology/processing_time.rs @@ -0,0 +1,166 @@ +#[cfg(test)] +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::{collections::HashMap, sync::Arc}; + +use chrono::Utc; +use metrics::{Histogram, gauge, histogram}; +use vector_lib::{buffers::BufferInstrumentation, stats::EwmaGauge}; + +use crate::{config::ComponentKey, event::EventArray}; + +const NANOS_PER_SECOND: f64 = 1_000_000_000.0; +const EVENT_PROCESSING_TIME: &str = "event_processing_time_seconds"; +const EVENT_PROCESSING_TIME_MEAN: &str = "event_processing_time_mean_seconds"; +const DEFAULT_PROCESSING_TIME_EWMA_ALPHA: f64 = 0.9; +
+#[cfg(test)] +static METRICS_ENTRY_CALLS: AtomicUsize = AtomicUsize::new(0); +
+#[derive(Debug)] +pub(crate) struct ProcessingTimeRecorder { + metrics: HashMap<Arc<ComponentKey>, Metrics>, +} +
+impl ProcessingTimeRecorder { + pub(crate) fn new( + sink_key: &ComponentKey, + sources: Vec<ComponentKey>, + ewma_alpha: Option<f64>, + ) -> Self { + let ewma_alpha = ewma_alpha.unwrap_or(DEFAULT_PROCESSING_TIME_EWMA_ALPHA); + let sink_id = Arc::from(sink_key.id().to_owned()); + let metrics = sources + .into_iter() + .map(|source_key| { + let source_id = Arc::new(source_key); + let metrics = Metrics::new(&sink_id, &source_id, ewma_alpha); + (source_id, metrics) + }) + .collect(); + Self { metrics } + } + + fn record_events(&self, events: &EventArray) { + let now = Utc::now(); + + // Under typical use, there will tend to be runs of events within each array from the same + // source. Instead of doing a relatively expensive lookup in the metrics map + // for each event, cache the last entry and skip the lookup if the source has not changed. + let mut curr_source = None; + let mut curr_metrics = None; + + for event in events.iter_events() { + let metadata = event.metadata(); + if let Some(ingest_timestamp) = metadata.ingest_timestamp() + && let Some(source_id) = metadata.source_id() + && let Some(latency_ns) = now + .signed_duration_since(ingest_timestamp) + .num_nanoseconds() + && latency_ns >= 0 + { + // We use a raw pointer to the contents of `source_id` as a cheap way to store and + // test if the source has changed with a new event. 
The alternative would be + // repeatedly cloning the `source_id`. Since this will be a hot path and we don't + // actually access anything _through_ the pointer, this is a safe use of pointers. + let source_ptr = Arc::as_ptr(source_id); + if curr_source != Some(source_ptr) { + curr_metrics = self.metrics_entry(source_id); + curr_source = Some(source_ptr); + } + + if let Some(metrics) = curr_metrics { + metrics.record(latency_ns as f64 / NANOS_PER_SECOND); + } + } + } + } + + fn metrics_entry<'a>(&'a self, source_id: &Arc<ComponentKey>) -> Option<&'a Metrics> { + #[cfg(test)] + { + // This is a test-only counter to track the number of times `record_events` calls this function. + METRICS_ENTRY_CALLS.fetch_add(1, Ordering::Relaxed); + } + + self.metrics.get(source_id) + } +} +
+impl BufferInstrumentation<EventArray> for ProcessingTimeRecorder { + fn on_send(&self, events: &EventArray) { + self.record_events(events); + } +} +
+#[derive(Debug)] +struct Metrics { + histogram: Histogram, + gauge: EwmaGauge, +} +
+impl Metrics { + fn new(sink_id: &Arc<str>, source_id: &ComponentKey, ewma_alpha: f64) -> Self { + let sink_label = sink_id.as_ref().to_owned(); + let source_label = source_id.id().to_owned(); + let histogram = histogram!( + EVENT_PROCESSING_TIME, + "sink_component_id" => sink_label.clone(), + "source_component_id" => source_label.clone(), + ); + let gauge = gauge!( + EVENT_PROCESSING_TIME_MEAN, + "sink_component_id" => sink_label, + "source_component_id" => source_label, + ); + Self { + histogram, + gauge: EwmaGauge::new(gauge, Some(ewma_alpha)), + } + } + + fn record(&self, latency_seconds: f64) { + self.histogram.record(latency_seconds); + self.gauge.record(latency_seconds); + } +} +
+#[cfg(test)] +mod tests { + use super::*; + use crate::event::{EventArray, LogEvent}; + use chrono::Utc; + + fn make_log_event(source: &Arc<ComponentKey>) -> LogEvent { + let mut log = LogEvent::default(); + log.metadata_mut().set_source_id(Arc::clone(source)); + log.metadata_mut().set_ingest_timestamp(Utc::now()); + log + } + + #[test] + fn caches_metrics_entry_by_source_run() { + METRICS_ENTRY_CALLS.store(0, Ordering::Relaxed); + + let source_a_key = ComponentKey::from("source_a"); + let source_b_key = ComponentKey::from("source_b"); + let recorder = ProcessingTimeRecorder::new( + &ComponentKey::from("sink"), + vec![source_a_key.clone(), source_b_key.clone()], + None, + ); + let source_a = Arc::new(source_a_key); + let source_b = Arc::new(source_b_key); + let events = EventArray::Logs(vec![ + make_log_event(&source_a), + make_log_event(&source_a), + make_log_event(&source_b), + make_log_event(&source_b), + make_log_event(&source_a), + make_log_event(&source_a), + ]); + + recorder.record_events(&events); + + assert_eq!(METRICS_ENTRY_CALLS.load(Ordering::Relaxed), 3); + } +} diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs index 738a73125aa8a..b10bd3e48e95b 100644 --- a/src/topology/test/mod.rs +++ b/src/topology/test/mod.rs @@ -39,6 +39,7 @@ mod crash; mod doesnt_reload; #[cfg(all(feature = "sources-http_server", feature = "sinks-http"))] mod end_to_end; +mod processing_time; #[cfg(all( feature = "sources-prometheus", feature = "sinks-prometheus", diff --git a/src/topology/test/processing_time.rs b/src/topology/test/processing_time.rs new file mode 100644 index 0000000000000..5315859c70403 --- /dev/null +++ b/src/topology/test/processing_time.rs @@ -0,0 +1,101 @@ +use tokio::{ + sync::oneshot, + time::{Duration, timeout}, +}; +use vector_lib::metrics::Controller; + +use crate::{ + config::Config, + event::{Event, LogEvent, Metric, 
MetricValue}, + test_util::{ + mock::{basic_source, completion_sink, noop_transform}, + start_topology, trace_init, + }, +}; + +#[tokio::test] +async fn sink_processing_time_metrics_emitted() { + trace_init(); + + let controller = Controller::get().expect("metrics controller"); + controller.reset(); + + let event_count = 3; + + let (mut source_tx, source_config) = basic_source(); + let transform_config = noop_transform(); + let (sink_done_tx, sink_done_rx) = oneshot::channel(); + let sink_config = completion_sink(event_count, sink_done_tx); + + let mut config = Config::builder(); + config.add_source("latency_source", source_config); + config.add_transform("latency_delay", &["latency_source"], transform_config); + config.add_sink("latency_sink", &["latency_delay"], sink_config); + + let (topology, _) = start_topology(config.build().unwrap(), false).await; + + for idx in 0..event_count { + let event = Event::Log(LogEvent::from(format!("payload-{idx}"))); + source_tx.send_event(event).await.unwrap(); + } + + drop(source_tx); + + let completed = timeout(Duration::from_secs(5), sink_done_rx) + .await + .expect("timed out waiting for completion sink to finish") + .expect("completion sink sender dropped"); + assert!( + completed, + "completion sink finished before receiving all events" + ); + + topology.stop().await; + + let metrics = controller.capture_metrics(); + let sink_id = "latency_sink"; + let source_id = "latency_source"; + + let histogram = metrics + .iter() + .find(|metric| { + metric.name() == "event_processing_time_seconds" + && has_latency_tags(metric, sink_id, source_id) + }) + .expect("event_processing_time_seconds histogram missing"); + + match histogram.value() { + MetricValue::AggregatedHistogram { count, .. } => { + assert_eq!( + *count, event_count as u64, + "histogram count should match number of events" + ); + } + other => panic!("expected aggregated histogram, got {other:?}"), + } + + let gauge = metrics + .iter() + .find(|metric| { + metric.name() == "event_processing_time_mean_seconds" + && has_latency_tags(metric, sink_id, source_id) + }) + .expect("event_processing_time_mean_seconds gauge missing"); + + match gauge.value() { + MetricValue::Gauge { value } => { + assert!( + *value >= 0.0, + "expected mean latency to be non-negative, got {value}" + ); + } + other => panic!("expected gauge metric, got {other:?}"), + } +} + +fn has_latency_tags(metric: &Metric, sink: &str, source: &str) -> bool { + metric.tags().is_some_and(|tags| { + tags.get("source_component_id") == Some(source) + && tags.get("sink_component_id") == Some(sink) + }) +} diff --git a/website/cue/reference/components/sinks.cue b/website/cue/reference/components/sinks.cue index cb399721a8c0b..b4f340e8e0b99 100644 --- a/website/cue/reference/components/sinks.cue +++ b/website/cue/reference/components/sinks.cue @@ -676,6 +676,8 @@ components: sinks: [Name=string]: { component_sent_bytes_total: components.sources.internal_metrics.output.metrics.component_sent_bytes_total component_sent_events_total: components.sources.internal_metrics.output.metrics.component_sent_events_total component_sent_event_bytes_total: components.sources.internal_metrics.output.metrics.component_sent_event_bytes_total + event_processing_time_seconds: components.sources.internal_metrics.output.metrics.event_processing_time_seconds + event_processing_time_mean_seconds: components.sources.internal_metrics.output.metrics.event_processing_time_mean_seconds utilization: components.sources.internal_metrics.output.metrics.utilization } } diff 
diff --git a/website/cue/reference/components/sinks.cue b/website/cue/reference/components/sinks.cue
index cb399721a8c0b..b4f340e8e0b99 100644
--- a/website/cue/reference/components/sinks.cue
+++ b/website/cue/reference/components/sinks.cue
@@ -676,6 +676,8 @@ components: sinks: [Name=string]: {
 				component_sent_bytes_total:       components.sources.internal_metrics.output.metrics.component_sent_bytes_total
 				component_sent_events_total:      components.sources.internal_metrics.output.metrics.component_sent_events_total
 				component_sent_event_bytes_total: components.sources.internal_metrics.output.metrics.component_sent_event_bytes_total
+				event_processing_time_seconds:      components.sources.internal_metrics.output.metrics.event_processing_time_seconds
+				event_processing_time_mean_seconds: components.sources.internal_metrics.output.metrics.event_processing_time_mean_seconds
 				utilization:                      components.sources.internal_metrics.output.metrics.utilization
 			}
 		}
diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue
index f2315772fdaf5..e4db32139f874 100644
--- a/website/cue/reference/components/sources/internal_metrics.cue
+++ b/website/cue/reference/components/sources/internal_metrics.cue
@@ -273,6 +273,23 @@ components: sources: internal_metrics: {
 					reason: _reason
 				}
 			}
+			event_processing_time_seconds: {
+				description: """
+					The elapsed time, in fractional seconds, between when a source ingests an event and when a sink receives it.
+					"""
+				type:              "histogram"
+				default_namespace: "vector"
+				tags:              _processing_time_tags
+			}
+			event_processing_time_mean_seconds: {
+				description: """
+					The mean elapsed time, in fractional seconds, between when a source ingests an event and when a sink receives it.
+					This value is smoothed over time using an exponentially weighted moving average (EWMA).
+					"""
+				type:              "gauge"
+				default_namespace: "vector"
+				tags:              _processing_time_tags
+			}
 			buffer_byte_size: {
 				description:       "The number of bytes current in the buffer."
 				type:              "gauge"
@@ -1043,6 +1060,10 @@ components: sources: internal_metrics: {
 		component_id:   _component_id
 		component_type: _component_type
 	}
+	_processing_time_tags: _internal_metrics_tags & {
+		sink_component_id:   _sink_component_id
+		source_component_id: _source_component_id
+	}

 	// All available tags
 	_collector: {
@@ -1068,6 +1089,16 @@ components: sources: internal_metrics: {
 		required: true
 		examples: ["file", "http", "honeycomb", "splunk_hec"]
 	}
+	_sink_component_id: {
+		description: "The component ID of the sink that received the event."
+		required:    true
+		examples: ["out_http", "out_console"]
+	}
+	_source_component_id: {
+		description: "The component ID of the source that originally ingested the event."
+		required:    true
+		examples: ["in_kafka", "in_file"]
+	}
 	_endpoint: {
 		description: "The absolute path of originating file."
 		required:    true
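The "half life of 6-7 measurements" figure quoted in the EWMA descriptions follows directly from the update rule: an observation's weight decays by a factor of alpha per update, so it halves after ln(0.5)/ln(alpha) updates. A quick check (assuming the `mean = alpha * mean + (1 - alpha) * value` update sketched earlier):

```rust
// Half-life, in number of updates, of a single observation's weight in an
// EWMA with the given alpha.
fn ewma_half_life(alpha: f64) -> f64 {
    (0.5_f64).ln() / alpha.ln()
}

fn main() {
    // Prints ~6.58, matching the "half life of 6-7 measurements" in the docs.
    println!("{:.2}", ewma_half_life(0.9));
}
```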
diff --git a/website/cue/reference/generated/configuration.cue b/website/cue/reference/generated/configuration.cue
index 6bf097aa7c52c..c1af86fc76003 100644
--- a/website/cue/reference/generated/configuration.cue
+++ b/website/cue/reference/generated/configuration.cue
@@ -706,11 +706,11 @@ generated: configuration: configuration: {
 			The alpha value for the exponential weighted moving average (EWMA) of source and transform
 			buffer utilization metrics.

-			This value specifies how much of the existing value is retained when each update is made.
-			Values closer to 1.0 result in the value adjusting slower to changes. The default value of
-			0.9 is equivalent to a "half life" of 6-7 measurements.
+			This controls how quickly the `*_buffer_utilization_mean` gauges respond to new
+			observations. Values closer to 1.0 retain more of the previous value, leading to slower
+			adjustments. The default value of 0.9 is equivalent to a "half life" of 6-7 measurements.

-			Must be between 0 and 1 exclusive (0 < alpha < 1).
+			Must be strictly between 0 and 1 (0 < alpha < 1).
 			"""
 			required: false
 			type: float: {}
@@ -898,6 +898,20 @@ generated: configuration: configuration: {
 			required: false
 			type: float: {}
 		}
+		processing_time_ewma_alpha: {
+			description: """
+				The alpha value for the exponentially weighted moving average (EWMA) of event processing
+				time metrics.
+
+				This controls how quickly the `event_processing_time_mean_seconds` gauge responds to new
+				observations. Values closer to 1.0 retain more of the previous value, leading to slower
+				adjustments. The default value of 0.9 is equivalent to a "half life" of 6-7 measurements.
+
+				Must be strictly between 0 and 1 (0 < alpha < 1).
+				"""
+			required: false
+			type: float: {}
+		}
 		proxy: {
 			common: false
 			description: """