Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion votor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ bs58 = { workspace = true }
crossbeam-channel = { workspace = true }
dashmap = { workspace = true, features = ["rayon", "raw-api"] }
etcd-client = { workspace = true, features = ["tls"] }
histogram = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
lru = { workspace = true }
num-traits = { workspace = true }
parking_lot = { workspace = true }
qualifier_attr = { workspace = true }
rayon = { workspace = true }
Expand Down
142 changes: 50 additions & 92 deletions votor/src/consensus_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::welford_stats::WelfordStats,
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
histogram::Histogram,
solana_clock::{Epoch, Slot},
solana_epoch_schedule::EpochSchedule,
solana_metrics::datapoint_info,
Expand Down Expand Up @@ -46,37 +46,14 @@ pub enum ConsensusMetricsEvent {
pub type ConsensusMetricsEventSender = Sender<(Instant, Vec<ConsensusMetricsEvent>)>;
pub type ConsensusMetricsEventReceiver = Receiver<(Instant, Vec<ConsensusMetricsEvent>)>;

/// Returns a [`Histogram`] configured for the use cases for this module.
///
/// Keeps the default precision and reduces the max value to 10s to get finer grained resolution.
fn build_histogram() -> Histogram {
Histogram::configure()
.max_value(10_000_000)
.build()
.unwrap()
}

/// Tracks all [`Vote`] metrics for a given node.
#[derive(Debug)]
#[derive(Debug, Default)]
struct NodeVoteMetrics {
notar: Histogram,
notar_fallback: Histogram,
skip: Histogram,
skip_fallback: Histogram,
final_: Histogram,
}

impl Default for NodeVoteMetrics {
fn default() -> Self {
let histogram = build_histogram();
Self {
notar: histogram.clone(),
notar_fallback: histogram.clone(),
skip: histogram.clone(),
skip_fallback: histogram.clone(),
final_: histogram,
}
}
notar: WelfordStats,
notar_fallback: WelfordStats,
skip: WelfordStats,
skip_fallback: WelfordStats,
final_: WelfordStats,
}

impl NodeVoteMetrics {
Expand All @@ -93,21 +70,13 @@ impl NodeVoteMetrics {
return;
}
};
let res = match vote {
Vote::Notarize(_) => self.notar.increment(elapsed),
Vote::NotarizeFallback(_) => self.notar_fallback.increment(elapsed),
Vote::Skip(_) => self.skip.increment(elapsed),
Vote::SkipFallback(_) => self.skip_fallback.increment(elapsed),
Vote::Finalize(_) => self.final_.increment(elapsed),
Vote::Genesis(_) => Ok(()), // Only for migration, tracked elsewhere
};
match res {
Ok(()) => (),
Err(err) => {
warn!(
"recording duration {elapsed} for vote {vote:?}: recording failed with {err}"
);
}
match vote {
Vote::Notarize(_) => self.notar.add_sample(elapsed),
Vote::NotarizeFallback(_) => self.notar_fallback.add_sample(elapsed),
Vote::Skip(_) => self.skip.add_sample(elapsed),
Vote::SkipFallback(_) => self.skip_fallback.add_sample(elapsed),
Vote::Finalize(_) => self.final_.add_sample(elapsed),
Vote::Genesis(_) => (), // Only for migration, tracked elsewhere
}
}
}
Expand All @@ -133,7 +102,7 @@ struct EpochMetrics {
node_metrics: BTreeMap<Pubkey, NodeVoteMetrics>,

/// Used to track when this node received blocks from different leaders in the network.
leader_metrics: BTreeMap<Pubkey, Histogram>,
leader_metrics: BTreeMap<Pubkey, WelfordStats>,

/// Counts number of times metrics recording failed.
metrics_recording_failed: usize,
Expand Down Expand Up @@ -260,19 +229,11 @@ impl ConsensusMetrics {
return;
}
};
let histogram = epoch_metrics
epoch_metrics
.leader_metrics
.entry(leader)
.or_insert_with(build_histogram);
match histogram.increment(elapsed) {
Ok(()) => (),
Err(err) => {
warn!(
"recording duration {elapsed} for block hash for slot {slot}: recording \
failed with {err}"
);
}
}
.or_default()
.add_sample(elapsed);
}

/// Records when a given slot started.
Expand Down Expand Up @@ -314,42 +275,42 @@ impl ConsensusMetrics {
datapoint_info!("consensus_vote_metrics",
"address" => addr,
("epoch", epoch, i64),
("notar_vote_count", metrics.notar.entries(), i64),
("notar_vote_us_mean", metrics.notar.mean().ok(), Option<i64>),
("notar_vote_us_stddev", metrics.notar.stddev(), Option<i64>),
("notar_vote_us_maximum", metrics.notar.maximum().ok(), Option<i64>),

("notar_fallback_vote_count", metrics.notar_fallback.entries(), i64),
("notar_fallback_vote_us_mean", metrics.notar_fallback.mean().ok(), Option<i64>),
("notar_fallback_vote_us_stddev", metrics.notar_fallback.stddev(), Option<i64>),
("notar_fallback_vote_us_maximum", metrics.notar_fallback.maximum().ok(), Option<i64>),

("skip_vote_count", metrics.skip.entries(), i64),
("skip_vote_us_mean", metrics.skip.mean().ok(), Option<i64>),
("skip_vote_us_stddev", metrics.skip.stddev(), Option<i64>),
("skip_vote_us_maximum", metrics.skip.maximum().ok(), Option<i64>),

("skip_fallback_vote_count", metrics.skip_fallback.entries(), i64),
("skip_fallback_vote_us_mean", metrics.skip_fallback.mean().ok(), Option<i64>),
("skip_fallback_vote_us_stddev", metrics.skip_fallback.stddev(), Option<i64>),
("skip_fallback_vote_us_maximum", metrics.skip_fallback.maximum().ok(), Option<i64>),

("finalize_vote_count", metrics.final_.entries(), i64),
("finalize_vote_us_mean", metrics.final_.mean().ok(), Option<i64>),
("finalize_vote_us_stddev", metrics.final_.stddev(), Option<i64>),
("finalize_vote_us_maximum", metrics.final_.maximum().ok(), Option<i64>),
("notar_vote_count", metrics.notar.count(), i64),
("notar_vote_us_mean", metrics.notar.mean::<i64>(), Option<i64>),
("notar_vote_us_stddev", metrics.notar.stddev::<i64>(), Option<i64>),
("notar_vote_us_maximum", metrics.notar.maximum::<i64>(), Option<i64>),

("notar_fallback_vote_count", metrics.notar_fallback.count(), i64),
("notar_fallback_vote_us_mean", metrics.notar_fallback.mean::<i64>(), Option<i64>),
("notar_fallback_vote_us_stddev", metrics.notar_fallback.stddev::<i64>(), Option<i64>),
("notar_fallback_vote_us_maximum", metrics.notar_fallback.maximum::<i64>(), Option<i64>),

("skip_vote_count", metrics.skip.count(), i64),
("skip_vote_us_mean", metrics.skip.mean::<i64>(), Option<i64>),
("skip_vote_us_stddev", metrics.skip.stddev::<i64>(), Option<i64>),
("skip_vote_us_maximum", metrics.skip.maximum::<i64>(), Option<i64>),

("skip_fallback_vote_count", metrics.skip_fallback.count(), i64),
("skip_fallback_vote_us_mean", metrics.skip_fallback.mean::<i64>(), Option<i64>),
("skip_fallback_vote_us_stddev", metrics.skip_fallback.stddev::<i64>(), Option<i64>),
("skip_fallback_vote_us_maximum", metrics.skip_fallback.maximum::<i64>(), Option<i64>),

("finalize_vote_count", metrics.final_.count(), i64),
("finalize_vote_us_mean", metrics.final_.mean::<i64>(), Option<i64>),
("finalize_vote_us_stddev", metrics.final_.stddev::<i64>(), Option<i64>),
("finalize_vote_us_maximum", metrics.final_.maximum::<i64>(), Option<i64>),
);
}

for (addr, histogram) in &epoch_metrics.leader_metrics {
for (addr, stats) in &epoch_metrics.leader_metrics {
let addr = addr.to_string();
datapoint_info!("consensus_block_hash_seen_metrics",
"address" => addr,
("epoch", epoch, i64),
("block_hash_seen_count", histogram.entries(), i64),
("block_hash_seen_us_mean", histogram.mean().ok(), Option<i64>),
("block_hash_seen_us_stddev", histogram.stddev(), Option<i64>),
("block_hash_seen_us_maximum", histogram.maximum().ok(), Option<i64>),
("block_hash_seen_count", stats.count(), i64),
("block_hash_seen_us_mean", stats.mean::<i64>(), Option<i64>),
("block_hash_seen_us_stddev", stats.stddev::<i64>(), Option<i64>),
("block_hash_seen_us_maximum", stats.maximum::<i64>(), Option<i64>),
);
}

Expand Down Expand Up @@ -416,8 +377,8 @@ mod tests {
metrics.record_vote(pubkey, &Vote::Skip(SkipVote { slot: 42 }), Instant::now());

let node = &metrics.epoch_metrics[&0].node_metrics[&pubkey];
assert_eq!(node.skip.entries(), 1);
assert!(node.skip.mean().unwrap() > 0);
assert_eq!(node.skip.count(), 1);
assert!(node.skip.mean::<i64>().unwrap() > 0);
}

#[test]
Expand Down Expand Up @@ -503,9 +464,6 @@ mod tests {
metrics.record_start_of_slot(42, Instant::now());
metrics.record_block_hash_seen(leader, 42, Instant::now());

assert_eq!(
metrics.epoch_metrics[&0].leader_metrics[&leader].entries(),
1
);
assert_eq!(metrics.epoch_metrics[&0].leader_metrics[&leader].count(), 1);
}
}
1 change: 1 addition & 0 deletions votor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod vote_history_storage;
pub mod voting_service;
pub mod voting_utils;
pub mod votor;
mod welford_stats;

#[macro_use]
extern crate log;
Expand Down
135 changes: 135 additions & 0 deletions votor/src/welford_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use num_traits::NumCast;

/// Welford's online algorithm for computing running mean, variance, and standard deviation.
///
/// Each call to `add_sample` updates all statistics in O(1) with a single pass over the
/// data, so no sample buffer is kept.
#[derive(Debug, Clone, Default)]
pub(crate) struct WelfordStats {
    /// Number of samples added.
    count: u64,
    /// Running mean, updated incrementally with each sample.
    mean: f64,
    /// Welford's M2 accumulator: sum of squared differences from the current mean.
    /// Sample variance is `m2 / (count - 1)` (Bessel-corrected), per `stddev`.
    m2: f64,
    /// Maximum value seen.
    max: u64,
}

impl WelfordStats {
    /// Folds one sample into the running statistics.
    ///
    /// Mean and M2 are updated with Welford's single-pass recurrence; the maximum is
    /// tracked separately. Panics if the sample count would overflow a `u64`.
    pub(crate) fn add_sample(&mut self, value: u64) {
        self.count = self.count.checked_add(1).unwrap();
        let sample = value as f64;
        let delta = sample - self.mean;
        self.mean += delta / self.count as f64;
        // Multiplying by the deviation from the *updated* mean is what makes
        // Welford's recurrence numerically stable.
        self.m2 += delta * (sample - self.mean);
        if value > self.max {
            self.max = value;
        }
    }

    /// Returns the number of samples added so far.
    pub(crate) fn count(&self) -> u64 {
        self.count
    }

    /// Returns the arithmetic mean, or `None` if no samples have been added.
    pub(crate) fn mean<T: NumCast>(&self) -> Option<T> {
        if self.count == 0 {
            return None;
        }
        NumCast::from(self.mean)
    }

    /// Returns the sample (Bessel-corrected) standard deviation, or `None` if
    /// fewer than 2 samples have been added.
    pub(crate) fn stddev<T: NumCast>(&self) -> Option<T> {
        if self.count < 2 {
            return None;
        }
        let variance = self.m2 / (self.count - 1) as f64;
        NumCast::from(variance.sqrt())
    }

    /// Returns the maximum value seen, or `None` if no samples have been added.
    pub(crate) fn maximum<T: NumCast>(&self) -> Option<T> {
        if self.count == 0 {
            return None;
        }
        NumCast::from(self.max)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    const EPSILON: f64 = 1e-10;

    /// Builds a `WelfordStats` populated with the given samples.
    fn make_stats(values: &[u64]) -> WelfordStats {
        let mut stats = WelfordStats::default();
        for &value in values {
            stats.add_sample(value);
        }
        stats
    }

    /// Closed-form sample stddev of the sequence `1..=n` (and of any constant
    /// shift of it): sqrt(n * (n + 1) / 12).
    fn expected_sequential_stddev(n: u64) -> f64 {
        let numerator = n.saturating_mul(n.saturating_add(1));
        (numerator as f64 / 12.0).sqrt()
    }

    #[test]
    fn test_empty_returns_none() {
        let stats = WelfordStats::default();
        assert_eq!(stats.count(), 0);
        assert_eq!(stats.mean::<f64>(), None);
        assert_eq!(stats.stddev::<f64>(), None);
        assert_eq!(stats.maximum::<u64>(), None);
    }

    #[test]
    fn test_sample_counts() {
        for n in [1usize, 5, 10, 100_000] {
            for use_sequential in [false, true] {
                let values: Vec<u64> = if use_sequential {
                    (1..=n as u64).collect()
                } else {
                    vec![42; n]
                };
                let stats = make_stats(&values);

                assert_eq!(stats.count(), n as u64);
                assert!(stats.mean::<f64>().is_some());
                assert!(stats.maximum::<u64>().is_some());
                // stddev is only defined once there are at least two samples.
                assert_eq!(stats.stddev::<f64>().is_some(), n > 1);
            }
        }
    }

    #[test]
    fn test_sequential_stats() {
        for n in [1usize, 5, 10, 100_000] {
            let stats = make_stats(&(1..=n as u64).collect::<Vec<_>>());

            let expected_mean = (n as f64 + 1.0) / 2.0;
            assert!((stats.mean::<f64>().unwrap() - expected_mean).abs() < EPSILON);
            assert_eq!(stats.maximum::<u64>(), Some(n as u64));

            if n > 1 {
                let expected_stddev = expected_sequential_stddev(n as u64);
                assert!((stats.stddev::<f64>().unwrap() - expected_stddev).abs() < EPSILON);
            }
        }
    }

    #[test]
    fn test_constant_has_zero_stddev() {
        for n in [2usize, 5, 10, 100_000] {
            let stats = make_stats(&vec![999; n]);
            assert_eq!(stats.mean::<i64>(), Some(999));
            assert_eq!(stats.stddev::<f64>(), Some(0.0));
            assert_eq!(stats.maximum::<u64>(), Some(999));
        }
    }

    #[test]
    fn test_numerical_stability_large_values() {
        // A large base with tiny offsets would destroy precision in a naive
        // sum-of-squares implementation; Welford's recurrence should be exact here.
        let base = 1_000_000_000_000u64;
        let stats = make_stats(&[base, base + 1, base + 2, base + 3, base + 4]);

        assert_eq!(stats.mean::<i64>(), Some((base + 2) as i64));
        assert!((stats.stddev::<f64>().unwrap() - expected_sequential_stddev(5)).abs() < EPSILON);
        assert_eq!(stats.maximum::<u64>(), Some(base + 4));
    }
}