Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/rust/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,6 @@ async fn main() -> anyhow::Result<()> {
.inspect_err(|error| error!("failed to connect: {error}"))
})
.await
.map_err(Into::into)
}

async fn geyser_health_watch(mut client: GeyserGrpcClient<impl Interceptor>) -> anyhow::Result<()> {
Expand Down
119 changes: 103 additions & 16 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
use {
crate::{
config::{ConfigGrpc, ConfigTokio},
metrics::{self, DebugClientMessage},
metrics::{
self, set_subscriber_pace, set_subscriber_recv_bandwidth_load,
set_subscriber_send_bandwidth_load, DebugClientMessage,
},
util::{
ema::{Ema, EmaReactivity, DEFAULT_EMA_WINDOW},
stream::{
load_aware_channel, LoadAwareReceiver, LoadAwareSender, StatsSettings,
TrafficWeighted,
},
},
version::GrpcVersionInfo,
},
anyhow::Context,
Expand All @@ -24,7 +34,6 @@ use {
task::spawn_blocking,
time::{sleep, Duration, Instant},
},
tokio_stream::wrappers::ReceiverStream,
tonic::{
service::interceptor::interceptor,
transport::{
Expand Down Expand Up @@ -55,6 +64,7 @@ use {
IsBlockhashValidResponse, PingRequest, PongResponse, SubscribeReplayInfoRequest,
SubscribeReplayInfoResponse, SubscribeRequest,
},
prost::Message as ProstMessage,
},
};

Expand Down Expand Up @@ -86,6 +96,32 @@ struct BlockMetaStorageInner {
finalized: Option<u64>,
}

impl TrafficWeighted for FilteredUpdate {
fn weight(&self) -> u32 {
self.encoded_len() as u32
}
}

impl TrafficWeighted for Status {
fn weight(&self) -> u32 {
// Rough estimate of the size of a Status message, we don't really care about the exact size
self.message().len() as u32
}
}

impl<T, E> TrafficWeighted for Result<T, E>
where
T: TrafficWeighted,
E: TrafficWeighted,
{
fn weight(&self) -> u32 {
match self {
Ok(item) => item.weight(),
Err(err) => err.weight(),
}
}
}

#[derive(Debug)]
struct BlockMetaStorage {
read_sem: Semaphore,
Expand Down Expand Up @@ -252,7 +288,7 @@ struct MessageId {
}

impl MessageId {
fn next(&mut self) -> u64 {
const fn next(&mut self) -> u64 {
self.id = self.id.checked_add(1).expect("message id overflow");
self.id
}
Expand Down Expand Up @@ -650,6 +686,7 @@ impl GrpcService {
}
// Dedup accounts by max write_version
Message::Account(msg) => {
metrics::observe_geyser_account_update_received(msg.account.data.len());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line of code is inside the "geyser loop" which process every geyser event from agave.
I measure the account data size and put it inside an histogram.

let write_version = msg.account.write_version;
let msg_index = slot_messages.messages.len() - 1;
if let Some(entry) = slot_messages.accounts_dedup.get_mut(&msg.account.pubkey) {
Expand Down Expand Up @@ -845,8 +882,9 @@ impl GrpcService {
#[allow(clippy::too_many_arguments)]
async fn client_loop(
id: usize,
subscriber_id: Option<String>,
endpoint: String,
stream_tx: mpsc::Sender<TonicResult<FilteredUpdate>>,
stream_tx: LoadAwareSender<TonicResult<FilteredUpdate>>,
mut client_rx: mpsc::UnboundedReceiver<Option<(Option<u64>, Filter)>>,
mut snapshot_rx: Option<crossbeam_channel::Receiver<Box<Message>>>,
mut messages_rx: broadcast::Receiver<BroadcastedMessage>,
Expand All @@ -856,7 +894,7 @@ impl GrpcService {
) {
let mut filter = Filter::default();
metrics::update_subscriptions(&endpoint, None, Some(&filter));

let subscriber_id = subscriber_id.unwrap_or("UNKNOWN".to_owned());
metrics::connections_total_inc();
DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateFilter {
id,
Expand All @@ -869,7 +907,7 @@ impl GrpcService {
Self::client_loop_snapshot(
id,
&endpoint,
&stream_tx,
stream_tx.clone(),
&mut client_rx,
snapshot_rx,
&mut is_alive,
Expand All @@ -878,8 +916,28 @@ impl GrpcService {
.await;
}

let client_loop_pace = Ema::builder()
.window(DEFAULT_EMA_WINDOW)
.reactivity(EmaReactivity::Reactive)
.build();

if is_alive {
'outer: loop {
set_subscriber_pace(
&subscriber_id,
client_loop_pace.current_load().per_second() as i64,
);

set_subscriber_send_bandwidth_load(
&subscriber_id,
stream_tx.estimated_send_rate().per_second() as i64,
);

set_subscriber_recv_bandwidth_load(
&subscriber_id,
stream_tx.estimated_consuming_rate().per_second() as i64,
);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I measure three things:

The overall loop processing pace set_subscriber_pace where the unit is "geyser event"/second.

The actual bandwidth load we are sending to the downstream client (only filtered data matching client's filters).
The actual bandwidth consumption rate a client do per second.


tokio::select! {
mut message = client_rx.recv() => {
// forward to latest filter
Expand Down Expand Up @@ -946,8 +1004,12 @@ impl GrpcService {
messages.sort_by_key(|msg| msg.0);
for (_msgid, message) in messages.iter() {
for message in filter.get_updates(message, Some(commitment)) {
let proto_size = message.encoded_len().min(u32::MAX as usize) as u32;
match stream_tx.send(Ok(message)).await {
Ok(()) => {}
Ok(()) => {
metrics::incr_grpc_message_sent_counter(&subscriber_id);
metrics::incr_grpc_bytes_sent(&subscriber_id, proto_size);
}
Err(mpsc::error::SendError(_)) => {
error!("client #{id}: stream closed");
break 'outer;
Expand Down Expand Up @@ -980,11 +1042,17 @@ impl GrpcService {
}
};

client_loop_pace.record_load(Instant::now().into(), messages.len() as u32);

if commitment == filter.get_commitment_level() {
for (_msgid, message) in messages.iter() {
for message in filter.get_updates(message, Some(commitment)) {
let proto_size = message.encoded_len().min(u32::MAX as usize) as u32;
match stream_tx.try_send(Ok(message)) {
Ok(()) => {}
Ok(()) => {
metrics::incr_grpc_message_sent_counter(&subscriber_id);
metrics::incr_grpc_bytes_sent(&subscriber_id, proto_size);
Comment on lines +1042 to +1043
Copy link
Contributor Author

@lvboudre lvboudre Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@linuskendall
This is the main sending code to the client.
Here I measure the size of the proto payload I will send to the client via gRPC.
Only payload that passes client's filter will be measure.
Finnally, I increase the subscriber message counter.

}
Err(mpsc::error::TrySendError::Full(_)) => {
error!("client #{id}: lagged to send an update");
tokio::spawn(async move {
Expand All @@ -999,6 +1067,8 @@ impl GrpcService {
}
}
}
} else {
stream_tx.no_load();
}

if commitment == CommitmentLevel::Processed && debug_client_tx.is_some() {
Expand All @@ -1023,7 +1093,7 @@ impl GrpcService {
async fn client_loop_snapshot(
id: usize,
endpoint: &str,
stream_tx: &mpsc::Sender<TonicResult<FilteredUpdate>>,
stream_tx: LoadAwareSender<TonicResult<FilteredUpdate>>,
client_rx: &mut mpsc::UnboundedReceiver<Option<(Option<u64>, Filter)>>,
snapshot_rx: crossbeam_channel::Receiver<Box<Message>>,
is_alive: &mut bool,
Expand Down Expand Up @@ -1086,7 +1156,7 @@ impl GrpcService {

#[tonic::async_trait]
impl Geyser for GrpcService {
type SubscribeStream = ReceiverStream<TonicResult<FilteredUpdate>>;
type SubscribeStream = LoadAwareReceiver<TonicResult<FilteredUpdate>>;

async fn subscribe(
&self,
Expand All @@ -1100,11 +1170,21 @@ impl Geyser for GrpcService {
} else {
None
};
let (stream_tx, stream_rx) = mpsc::channel(if snapshot_rx.is_some() {
self.config_snapshot_client_channel_capacity
} else {
self.config_channel_capacity
});

let client_stats_settigns = StatsSettings::default()
.tx_ema_reactivity(EmaReactivity::Reactive)
.tx_ema_window(DEFAULT_EMA_WINDOW)
.rx_ema_reactivity(EmaReactivity::Reactive)
.rx_ema_window(DEFAULT_EMA_WINDOW);

let (stream_tx, stream_rx) = load_aware_channel(
if snapshot_rx.is_some() {
self.config_snapshot_client_channel_capacity
} else {
self.config_channel_capacity
},
client_stats_settigns,
);
let (client_tx, client_rx) = mpsc::unbounded_channel();
let notify_exit1 = Arc::new(Notify::new());
let notify_exit2 = Arc::new(Notify::new());
Expand Down Expand Up @@ -1142,6 +1222,12 @@ impl Geyser for GrpcService {
.and_then(|h| h.to_str().ok().map(|s| s.to_string()))
.unwrap_or_else(|| "".to_owned());

let subscriber_id = request
.metadata()
.get("x-subscription-id")
.and_then(|h| h.to_str().ok().map(|s| s.to_string()))
.or(request.remote_addr().map(|addr| addr.to_string()));

Comment on lines +1217 to +1222
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to identitfy a downstream client I check if x-subscription-id is present otherwise I use its ip address.

let config_filter_limits = Arc::clone(&self.config_filter_limits);
let filter_names = Arc::clone(&self.filter_names);
let incoming_stream_tx = stream_tx.clone();
Expand Down Expand Up @@ -1201,6 +1287,7 @@ impl Geyser for GrpcService {

tokio::spawn(Self::client_loop(
id,
subscriber_id,
endpoint,
stream_tx,
client_rx,
Expand All @@ -1214,7 +1301,7 @@ impl Geyser for GrpcService {
},
));

Ok(Response::new(ReceiverStream::new(stream_rx)))
Ok(Response::new(stream_rx))
}

async fn subscribe_first_available_slot(
Expand Down
1 change: 1 addition & 0 deletions yellowstone-grpc-geyser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod config;
pub mod grpc;
pub mod metrics;
pub mod plugin;
pub(crate) mod util;
pub mod version;

pub fn get_thread_name() -> String {
Expand Down
89 changes: 88 additions & 1 deletion yellowstone-grpc-geyser/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use {
server::conn::auto::Builder as ServerBuilder,
},
log::{error, info},
prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder},
prometheus::{
Histogram, HistogramOpts, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder,
},
solana_clock::Slot,
std::{
collections::{hash_map::Entry as HashMapEntry, HashMap},
Expand Down Expand Up @@ -67,6 +69,51 @@ lazy_static::lazy_static! {
Opts::new("missed_status_message_total", "Number of missed messages by commitment"),
&["status"]
).unwrap();

static ref GRPC_MESSAGE_SENT: IntCounterVec = IntCounterVec::new(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Measure how many geyser event we sent to each subscriber.

Opts::new(
"grpc_message_sent_count",
"Number of message sent over grpc to downstream client",
),
&["subscriber_id"]
).unwrap();

static ref GRPC_BYTES_SENT: IntCounterVec = IntCounterVec::new(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Measures how many bytes (protobuffer encoded data) to each subscriber

Opts::new("grpc_bytes_sent", "Number of bytes sent over grpc to downstream client"),
&["subscriber_id"]
).unwrap();

static ref GRPC_SUBSCRIBER_MESSAGE_PROCESSING_PACE: IntGaugeVec = IntGaugeVec::new(
Opts::new(
"grpc_subscriber_message_processing_pace_sec",
"How many subscriber loop process incoming geyser message per second"
),
&["subscriber_id"]
).unwrap();

static ref GRPC_SUBSCRIBER_SEND_BANDWIDTH_LOAD: IntGaugeVec = IntGaugeVec::new(
Opts::new(
"grpc_subscriber_send_bandwidth_load",
"Current Send load we send to subscriber channel (in bytes per second)"
),
&["subscriber_id"]
).unwrap();

static ref GRPC_SUBCRIBER_RX_LOAD: IntGaugeVec = IntGaugeVec::new(
Opts::new(
"grpc_subscriber_recv_bandwidth_load",
"Current Receive load of subscriber channel (in bytes per second)"
),
&["subscriber_id"]
).unwrap();

static ref GEYSER_ACCOUNT_UPDATE_RECEIVED: Histogram = Histogram::with_opts(
HistogramOpts::new(
"geyser_account_update_data_size_kib",
"Histogram of all account update data (kib) received from Geyser plugin"
)
.buckets(vec![5.0, 10.0, 20.0, 30.0, 50.0, 100.0, 200.0, 300.0, 500.0, 1000.0, 2000.0, 3000.0, 5000.0, 10000.0])
).unwrap();
}
Comment on lines +110 to 117
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opted for an histogram to measure account data our geyser plugin receives since histogram gives us two other metrics for "free":

the total sum of data (geyser_account_update_data_size_kib_sum) and the account update counts (geyser_account_update_data_size_kib_count).

The bucket is based of previous work I did for fumarole.
The P90 of account data size should be below 5KiB.
The P95 should be < 20kIB.
About 1% of account data can be above 1MiB.
The max size (bucket) is ~10mb which match the max account data size.


#[derive(Debug)]
Expand Down Expand Up @@ -199,6 +246,12 @@ impl PrometheusService {
register!(CONNECTIONS_TOTAL);
register!(SUBSCRIPTIONS_TOTAL);
register!(MISSED_STATUS_MESSAGE);
register!(GRPC_MESSAGE_SENT);
register!(GRPC_BYTES_SENT);
register!(GRPC_SUBSCRIBER_MESSAGE_PROCESSING_PACE);
register!(GEYSER_ACCOUNT_UPDATE_RECEIVED);
register!(GRPC_SUBSCRIBER_SEND_BANDWIDTH_LOAD);
register!(GRPC_SUBCRIBER_RX_LOAD);

VERSION
.with_label_values(&[
Expand Down Expand Up @@ -314,6 +367,18 @@ fn not_found_handler() -> http::Result<Response<BoxBody<Bytes, Infallible>>> {
.body(BodyEmpty::new().boxed())
}

pub fn incr_grpc_bytes_sent<S: AsRef<str>>(remote_id: S, byte_sent: u32) {
GRPC_BYTES_SENT
.with_label_values(&[remote_id.as_ref()])
.inc_by(byte_sent as u64);
}

pub fn incr_grpc_message_sent_counter<S: AsRef<str>>(remote_id: S) {
GRPC_MESSAGE_SENT
.with_label_values(&[remote_id.as_ref()])
.inc();
}

pub fn update_slot_status(status: &GeyserSlosStatus, slot: u64) {
SLOT_STATUS
.with_label_values(&[status.as_str()])
Expand Down Expand Up @@ -370,3 +435,25 @@ pub fn missed_status_message_inc(status: SlotStatus) {
.with_label_values(&[status.as_str()])
.inc()
}

pub fn set_subscriber_pace<S: AsRef<str>>(subscriber_id: S, pace: i64) {
GRPC_SUBSCRIBER_MESSAGE_PROCESSING_PACE
.with_label_values(&[subscriber_id.as_ref()])
.set(pace);
}

pub fn observe_geyser_account_update_received(data_bytesize: usize) {
GEYSER_ACCOUNT_UPDATE_RECEIVED.observe(data_bytesize as f64 / 1024.0);
}

pub fn set_subscriber_send_bandwidth_load<S: AsRef<str>>(subscriber_id: S, load: i64) {
GRPC_SUBSCRIBER_SEND_BANDWIDTH_LOAD
.with_label_values(&[subscriber_id.as_ref()])
.set(load);
}

pub fn set_subscriber_recv_bandwidth_load<S: AsRef<str>>(subscriber_id: S, load: i64) {
GRPC_SUBCRIBER_RX_LOAD
.with_label_values(&[subscriber_id.as_ref()])
.set(load);
}
Loading
Loading