Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.10.8"
version = "0.11.0"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down
20 changes: 19 additions & 1 deletion apps/hermes/server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@ pub mod token;
pub mod types;
mod ws;

#[derive(Clone, Debug)]
pub struct StreamingConfig {
pub disconnect_slow_consumers: bool,
pub ws_max_write_buffer_bytes: usize,
}

pub struct ApiState<S> {
pub state: Arc<S>,
pub ws: Arc<ws::WsState>,
pub metrics: Arc<metrics_middleware::ApiMetrics>,
pub streaming: StreamingConfig,
}

/// Manually implement `Clone` as the derive macro will try and slap `Clone` on
Expand All @@ -34,12 +41,18 @@ impl<S> Clone for ApiState<S> {
state: self.state.clone(),
ws: self.ws.clone(),
metrics: self.metrics.clone(),
streaming: self.streaming.clone(),
}
}
}

impl<S> ApiState<S> {
pub fn new(state: Arc<S>, ws_whitelist: Vec<IpNet>, requester_ip_header_name: String) -> Self
pub fn new(
state: Arc<S>,
ws_whitelist: Vec<IpNet>,
requester_ip_header_name: String,
streaming: StreamingConfig,
) -> Self
where
S: Metrics,
S: Send + Sync + 'static,
Expand All @@ -52,6 +65,7 @@ impl<S> ApiState<S> {
state.clone(),
)),
state,
streaming,
}
}
}
Expand All @@ -71,6 +85,10 @@ where
state,
opts.rpc.ws_whitelist,
opts.rpc.requester_ip_header_name,
StreamingConfig {
disconnect_slow_consumers: opts.rpc.disconnect_slow_consumers,
ws_max_write_buffer_bytes: opts.rpc.ws_max_write_buffer_bytes,
},
)
};

Expand Down
52 changes: 51 additions & 1 deletion apps/hermes/server/src/api/metrics_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,24 @@ use {
},
prometheus_client::{
encoding::EncodeLabelSet,
metrics::{counter::Counter, family::Family, histogram::Histogram},
metrics::{counter::Counter, family::Family, gauge::Gauge, histogram::Histogram},
},
std::sync::Arc,
tokio::time::Instant,
};

#[derive(Clone, Debug, PartialEq, Eq, Hash, EncodeLabelSet)]
pub struct StreamProtocolLabel {
pub protocol: String,
}

pub struct ApiMetrics {
pub requests: Family<Labels, Counter>,
pub latencies: Family<LatencyLabels, Histogram>,
pub sse_broadcast_latency: Histogram,
pub stream_active_connections: Family<StreamProtocolLabel, Gauge>,
pub stream_slow_consumer_disconnects: Family<StreamProtocolLabel, Counter>,
pub sse_broadcast_lagged: Counter,
}

impl ApiMetrics {
Expand All @@ -43,12 +51,18 @@ impl ApiMetrics {
]
.into_iter(),
),
stream_active_connections: Family::default(),
stream_slow_consumer_disconnects: Family::default(),
sse_broadcast_lagged: Counter::default(),
};

{
let requests = new.requests.clone();
let latencies = new.latencies.clone();
let sse_broadcast_latency = new.sse_broadcast_latency.clone();
let stream_active_connections = new.stream_active_connections.clone();
let stream_slow_consumer_disconnects = new.stream_slow_consumer_disconnects.clone();
let sse_broadcast_lagged = new.sse_broadcast_lagged.clone();

tokio::spawn(async move {
Metrics::register(
Expand Down Expand Up @@ -76,6 +90,36 @@ impl ApiMetrics {
),
)
.await;

Metrics::register(
&*state,
(
"stream_active_connections",
"Number of active WebSocket and SSE streaming connections",
stream_active_connections,
),
)
.await;

Metrics::register(
&*state,
(
"stream_slow_consumer_disconnects_total",
"Total streaming connections closed due to slow consumption",
stream_slow_consumer_disconnects,
),
)
.await;

Metrics::register(
&*state,
(
"sse_broadcast_lagged_total",
"Total SSE broadcast lag events (client missed updates)",
sse_broadcast_lagged,
),
)
.await;
});
}

Expand Down Expand Up @@ -141,3 +185,9 @@ pub async fn track_metrics<B, S>(

response
}

pub fn stream_protocol_label(protocol: &str) -> StreamProtocolLabel {
StreamProtocolLabel {
protocol: protocol.to_string(),
}
}
31 changes: 28 additions & 3 deletions apps/hermes/server/src/api/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ where
mod tests {
use {
super::*,
crate::api::StreamingConfig,
crate::state::{
aggregate::{
AggregationEvent, PriceFeedsWithUpdateData, PublisherStakeCapsWithUpdateData,
Expand Down Expand Up @@ -215,7 +216,15 @@ mod tests {
available_ids.insert(id2);

let mock_state = MockAggregates { available_ids };
let api_state = ApiState::new(Arc::new(mock_state), vec![], String::new());
let api_state = ApiState::new(
Arc::new(mock_state),
vec![],
String::new(),
StreamingConfig {
disconnect_slow_consumers: true,
ws_max_write_buffer_bytes: 2 * 1024 * 1024,
},
);

let input_ids = vec![id1, id2];
let result = validate_price_ids(&api_state, &input_ids, false).await;
Expand All @@ -234,7 +243,15 @@ mod tests {
available_ids.insert(id2);

let mock_state = MockAggregates { available_ids };
let api_state = ApiState::new(Arc::new(mock_state), vec![], String::new());
let api_state = ApiState::new(
Arc::new(mock_state),
vec![],
String::new(),
StreamingConfig {
disconnect_slow_consumers: true,
ws_max_write_buffer_bytes: 2 * 1024 * 1024,
},
);

let input_ids = vec![id1, id2, id3];
let result = validate_price_ids(&api_state, &input_ids, true).await;
Expand All @@ -253,7 +270,15 @@ mod tests {
available_ids.insert(id2);

let mock_state = MockAggregates { available_ids };
let api_state = ApiState::new(Arc::new(mock_state), vec![], String::new());
let api_state = ApiState::new(
Arc::new(mock_state),
vec![],
String::new(),
StreamingConfig {
disconnect_slow_consumers: true,
ws_max_write_buffer_bytes: 2 * 1024 * 1024,
},
);

let input_ids = vec![id1, id2, id3];
let result = validate_price_ids(&api_state, &input_ids, false).await;
Expand Down
Loading
Loading