Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ sources-logs = [
"sources-internal_logs",
"sources-journald",
"sources-kafka",
"sources-kubernetes_events",
"sources-kubernetes_logs",
"sources-logstash",
"sources-mqtt",
Expand Down Expand Up @@ -672,6 +673,7 @@ sources-internal_metrics = []
sources-static_metrics = []
sources-journald = []
sources-kafka = ["dep:rdkafka"]
sources-kubernetes_events = ["kubernetes"]
sources-kubernetes_logs = ["vector-lib/file-source", "kubernetes", "transforms-reduce"]
sources-logstash = ["sources-utils-net-tcp", "tokio-util/net"]
sources-mongodb_metrics = ["dep:mongodb"]
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/kubernetes_events_source.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Introduced a `kubernetes_events` source that streams Kubernetes Event objects through the API, with optional deduplication and enrichment helpers for singleton cluster deployments.

authors: elohmeier
78 changes: 78 additions & 0 deletions src/internal_events/kubernetes_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use metrics::counter;
use vector_lib::NamedInternalEvent;
use vector_lib::{
internal_event::{
ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
},
json_size::JsonSize,
};

#[derive(Debug, NamedInternalEvent)]
pub struct KubernetesEventsReceived {
pub byte_size: JsonSize,
}

impl InternalEvent for KubernetesEventsReceived {
fn emit(self) {
trace!(
message = "Kubernetes event received.",
count = 1,
byte_size = %self.byte_size,
);

counter!("component_received_events_total").increment(1);
counter!("component_received_event_bytes_total").increment(self.byte_size.get() as u64);
}
}

#[derive(Debug, NamedInternalEvent)]
pub struct KubernetesEventsWatchError<E> {
pub error: E,
}

impl<E: std::fmt::Display> InternalEvent for KubernetesEventsWatchError<E> {
fn emit(self) {
error!(
message = "Kubernetes events watcher error.",
error = %self.error,
error_type = error_type::READER_FAILED,
stage = error_stage::RECEIVING,
);
counter!(
"component_errors_total",
"error_type" => error_type::READER_FAILED,
"stage" => error_stage::RECEIVING,
)
.increment(1);
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: 1,
reason: "watcher_error"
});
}
}

#[derive(Debug, NamedInternalEvent)]
pub struct KubernetesEventsSerializationError<E> {
pub error: E,
}

impl<E: std::fmt::Display> InternalEvent for KubernetesEventsSerializationError<E> {
fn emit(self) {
error!(
message = "Failed to serialize Kubernetes event.",
error = %self.error,
error_type = error_type::ENCODER_FAILED,
stage = error_stage::PROCESSING,
);
counter!(
"component_errors_total",
"error_type" => error_type::ENCODER_FAILED,
"stage" => error_stage::PROCESSING,
)
.increment(1);
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: 1,
reason: "serialization_failed"
});
}
}
4 changes: 4 additions & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ mod internal_logs;
mod journald;
#[cfg(any(feature = "sources-kafka", feature = "sinks-kafka"))]
mod kafka;
#[cfg(feature = "sources-kubernetes_events")]
mod kubernetes_events;
#[cfg(feature = "sources-kubernetes_logs")]
mod kubernetes_logs;
#[cfg(feature = "transforms-log_to_metric")]
Expand Down Expand Up @@ -231,6 +233,8 @@ pub(crate) use self::internal_logs::*;
pub(crate) use self::journald::*;
#[cfg(any(feature = "sources-kafka", feature = "sinks-kafka"))]
pub(crate) use self::kafka::*;
#[cfg(feature = "sources-kubernetes_events")]
pub(crate) use self::kubernetes_events::*;
#[cfg(feature = "sources-kubernetes_logs")]
pub(crate) use self::kubernetes_logs::*;
#[cfg(feature = "transforms-log_to_metric")]
Expand Down
Loading
Loading