Skip to content

Commit

Permalink
refactor(main): less verbose event loss error
Browse files Browse the repository at this point in the history
  • Loading branch information
qjerome committed Dec 13, 2024
1 parent 3179447 commit d7e9abf
Showing 1 changed file with 96 additions and 26 deletions.
122 changes: 96 additions & 26 deletions kunai/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use tokio::time::timeout;

use std::borrow::Cow;
use std::cmp::max;
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};

use std::fs::{self, DirBuilder, File};
use std::io::{self, BufRead, BufReader, Read, Write};
Expand Down Expand Up @@ -2091,14 +2091,64 @@ impl<'s> EventConsumer<'s> {
}
}

/// Running counters of events read and events lost, together with the
/// instant at which counting started, so that a loss ratio and an
/// events-per-second rate can be derived.
#[derive(Debug)]
struct Stats {
    /// Cumulative number of events successfully read.
    read: u64,
    /// Cumulative number of events reported as lost.
    lost: u64,
    /// Start of the measurement window used by [`Stats::eps`].
    start: time::Instant,
}

impl Stats {
    /// Creates empty statistics whose measurement window starts now.
    fn new() -> Self {
        Self {
            read: 0,
            lost: 0,
            start: time::Instant::now(),
        }
    }

    /// Adds `read` and `lost` to the cumulative counters.
    ///
    /// While the counters are still empty the measurement window is
    /// restarted, so the rate is computed from the first accounted events
    /// rather than from construction time. Counters wrap on overflow.
    fn update(&mut self, read: u64, lost: u64) {
        if self.is_empty() {
            self.start = time::Instant::now();
        }
        self.read = self.read.wrapping_add(read);
        self.lost = self.lost.wrapping_add(lost);
    }

    /// Returns the percentage of lost events over all events seen.
    ///
    /// Returns `0.0` when nothing has been recorded yet, instead of the NaN
    /// that a `0 / 0` floating point division would produce.
    fn percent_loss(&self) -> f64 {
        let total = self.total();
        if total == 0 {
            return 0.0;
        }
        self.lost as f64 * 100.0 / total as f64
    }

    /// Returns the average number of events (read + lost) per second since
    /// the start of the measurement window.
    ///
    /// Returns `0.0` when no time has elapsed yet, so the result is always
    /// finite and safe to display.
    fn eps(&self) -> f64 {
        let elapsed = self.start.elapsed().as_secs_f64();
        if elapsed > 0.0 {
            self.total() as f64 / elapsed
        } else {
            0.0
        }
    }

    /// Returns the total number of events seen, read and lost combined
    /// (wrapping on overflow).
    fn total(&self) -> u64 {
        self.read.wrapping_add(self.lost)
    }

    /// Returns `true` while both counters are zero.
    fn is_empty(&self) -> bool {
        self.read == 0 && self.lost == 0
    }
}

struct EventProducer {
config: Config,
batch: usize,
pipe: VecDeque<EncodedEvent>,
sender: mpsc::Sender<EncodedEvent>,
filter: Filter,
stats: AyaHashMap<MapData, Type, u64>,
perf_array: AsyncPerfEventArray<MapData>,
ebpf_stats_map: AyaHashMap<MapData, Type, u64>,
stats: Stats,
ebpf_perf_array: AsyncPerfEventArray<MapData>,
tasks: Vec<tokio::task::JoinHandle<Result<(), anyhow::Error>>>,
stop: bool,
// flag to be set when the producer needs to reload
Expand Down Expand Up @@ -2132,8 +2182,9 @@ impl EventProducer {
batch: 0,
sender,
filter,
stats: stats_map,
perf_array,
ebpf_stats_map: stats_map,
stats: Stats::new(),
ebpf_perf_array: perf_array,
tasks: vec![],
stop: false,
reload: false,
Expand Down Expand Up @@ -2311,7 +2362,7 @@ impl EventProducer {
let mut buf = shared
.lock()
.await
.perf_array
.ebpf_perf_array
.open(
cpu_id,
Some(optimal_page_count(
Expand All @@ -2334,6 +2385,8 @@ impl EventProducer {
.collect::<Vec<_>>();

let timeout = time::Duration::from_millis(10);
// serves as error display decision
let mut last_lost_cnt = 0;

loop {
// we time this out so that the barrier does not wait too long
Expand All @@ -2346,24 +2399,41 @@ impl EventProducer {
};

// checking out lost events
if events.lost > 0 {
error!(
"some events have been lost in the way from kernel read={} lost={}: consider filtering out some events or increase the number of buffered events in configuration",
events.read, events.lost
);

if events.lost > 0 || events.read > 0 {
{
let ep = event_producer.lock().await;
for ty in Type::variants() {
if ty.is_configurable() {
error!(
"stats {}: {}",
ty,
ep.stats.get(&ty, 0).unwrap_or_default()
);
let mut ep = event_producer.lock().await;
// update event statistics
ep.stats.update(events.read as u64, events.lost as u64);
// borrow stats
let stats = &ep.stats;
// only show error in leader cpu if needed
if cpu_id == leader_cpu_id && stats.lost > last_lost_cnt {
// easy way to create a top most frequent
let mut tree = BTreeMap::new();
for ty in Type::variants() {
if ty.is_configurable() {
tree.insert(
ep.ebpf_stats_map.get(&ty, 0).unwrap_or_default(),
ty,
);
}
}
// take top 5 most frequent events
let top = tree
.iter()
.rev()
.take(5)
.map(|(c, t)| format!("{t}={c}"))
.collect::<Vec<String>>()
.join(", ");

error!(
"some events have been lost in the way from kernel read={} lost={} loss-ratio={:.2}% eps={:.2}: consider event filtering out and/or increase the number of buffered events in configuration. Filtering hints, most frequent events: {top} ",
stats.read, stats.lost, stats.percent_loss(), stats.eps());
// update last_lost for future error display decision
last_lost_cnt = stats.lost;
// drop producer
}
// drop producer
}
}

Expand Down Expand Up @@ -2415,11 +2485,11 @@ impl EventProducer {
&& !matches!(
etype,
Type::Execve
| Type::ExecveScript
| Type::Clone
// exit and exit_group are used to cleanup hashmap
| Type::Exit
| Type::ExitGroup
| Type::ExecveScript
| Type::Clone
// exit and exit_group are used to cleanup hashmap
| Type::Exit
| Type::ExitGroup
)
{
continue;
Expand Down

0 comments on commit d7e9abf

Please sign in to comment.