Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@

.idea/

.env
.env

.local
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["examples", "jito_protos", "proxy"]
resolver = "2"

[workspace.package]
version = "0.2.10"
version = "0.2.12"
description = "Fast path to receive shreds from Jito, forwarding to local consumers. See https://docs.jito.wtf/lowlatencytxnfeed/ for details."
authors = ["Jito Team <[email protected]>"]
homepage = "https://jito.wtf/"
Expand All @@ -22,6 +22,7 @@ hostname = "0.4.0"
itertools = "0.14.0"
jito-protos = { path = "jito_protos" }
log = "0.4"
libc = "0.2"
prost = "0.13"
prost-types = "0.13"
rand = "0.8"
Expand Down
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ RUN --mount=type=cache,mode=0777,target=/home/root/app/target \

################################################################################
FROM --platform=linux/amd64 debian:bullseye-slim as base_image
RUN apt-get -qq update && apt-get install -qq -y ca-certificates libssl1.1 && rm -rf /var/lib/apt/lists/*
# keep iproute2 for multicast route parsing
RUN apt-get -qq update && apt-get install -qq -y ca-certificates libssl1.1 iproute2 && rm -rf /var/lib/apt/lists/*

################################################################################
FROM base_image as shredstream_proxy
Expand Down
1 change: 1 addition & 0 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ hostname = { workspace = true }
itertools = { workspace = true }
jito-protos = { workspace = true }
log = { workspace = true }
libc = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
rand = { workspace = true }
Expand Down
76 changes: 74 additions & 2 deletions proxy/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
Arc, RwLock,
},
thread::{Builder, JoinHandle},
time::{Duration, SystemTime},
time::{Duration, Instant, SystemTime},
};

use arc_swap::ArcSwap;
Expand Down Expand Up @@ -242,10 +242,13 @@ fn recv_from_channel_and_send_multiple_dest(

let mut packet_batch_vec = vec![packet_batch];

let t = Instant::now();
let num_deduped = solana_perf::deduper::dedup_packets_and_count_discards(
&deduper.read().unwrap(),
&mut packet_batch_vec,
);
let t_dedup = t.elapsed().as_micros() as u64;
metrics.dedup_time_spent.fetch_add(t_dedup, Ordering::Relaxed);
// Store stats for each Packet
packet_batch_vec.iter().for_each(|batch| {
batch.iter().for_each(|packet| {
Expand Down Expand Up @@ -275,7 +278,16 @@ fn recv_from_channel_and_send_multiple_dest(
Some((data, addr))
})
.collect::<Vec<(&[u8], &SocketAddr)>>();

let t = Instant::now();
metrics
.send_batch_size_sum
.fetch_add(packets_with_dest.len() as u64, Ordering::Relaxed);
metrics.send_batch_count.fetch_add(1, Ordering::Relaxed);
const MAX_IOV: usize = libc::UIO_MAXIOV as usize;
let max_iov_count = packets_with_dest.len() / MAX_IOV;
let unsaturated_iov_count = packets_with_dest.len() % MAX_IOV;
metrics.saturated_iov_count.fetch_add(max_iov_count as u64, Ordering::Relaxed);
metrics.unsaturated_iov_count.fetch_add(unsaturated_iov_count as u64, Ordering::Relaxed);
match batch_send(send_socket, &packets_with_dest) {
Ok(_) => {
metrics
Expand All @@ -297,6 +309,8 @@ fn recv_from_channel_and_send_multiple_dest(
);
}
}
let t_send = t.elapsed().as_micros() as u64;
metrics.batch_send_time_spent.fetch_add(t_send, Ordering::Relaxed);
});

// Count TraceShred shreds
Expand Down Expand Up @@ -463,6 +477,13 @@ pub struct ShredMetrics {
pub duplicate: AtomicU64,
/// (discarded, not discarded, from other shredstream instances)
pub packets_received: DashMap<IpAddr, (u64, u64)>,
/// The batch size we are sending to batch_send solana crate call.
pub send_batch_size_sum: AtomicU64,
pub send_batch_count: AtomicU64,
/// Number of occurrences we can saturated the iovecs in sendmmsg
pub saturated_iov_count: AtomicU64,
/// Number of occurrences we could not saturate the iovecs in sendmmsg
pub unsaturated_iov_count: AtomicU64,

// service metrics
pub enabled_grpc_service: bool,
Expand All @@ -481,6 +502,10 @@ pub struct ShredMetrics {
/// Number of times we couldn't find the previous DATA_COMPLETE_SHRED flag but tried to deshred+deserialize, and failed
pub unknown_start_position_error_count: AtomicU64,

// cumulative time spent in deduping packets
pub dedup_time_spent: AtomicU64,
pub batch_send_time_spent: AtomicU64,

// cumulative metrics (persist after reset)
pub agg_received_cumulative: AtomicU64,
pub agg_success_forward_cumulative: AtomicU64,
Expand Down Expand Up @@ -514,6 +539,12 @@ impl ShredMetrics {
agg_success_forward_cumulative: Default::default(),
agg_fail_forward_cumulative: Default::default(),
duplicate_cumulative: Default::default(),
dedup_time_spent: Default::default(),
batch_send_time_spent: Default::default(),
send_batch_size_sum: Default::default(),
send_batch_count: Default::default(),
saturated_iov_count: Default::default(),
unsaturated_iov_count: Default::default(),
}
}

Expand All @@ -534,6 +565,41 @@ impl ShredMetrics {
("duplicate", self.duplicate.load(Ordering::Relaxed), i64),
);

datapoint_info!(
"shredstream_proxy-sendmmsg_iov_metrics",
("max_iov_count", self.saturated_iov_count.load(Ordering::Relaxed), i64),
(
"unsaturated_iov_count",
self.unsaturated_iov_count.load(Ordering::Relaxed),
i64
),
);

datapoint_info!(
"shredstream_proxy-batch_send_metrics",
(
"send_batch_size_sum", self.send_batch_size_sum.load(Ordering::Relaxed), i64
),
(
"send_batch_count", self.send_batch_count.load(Ordering::Relaxed), i64
)
);

datapoint_info!(
"shredstream_proxy-time_allocation",
(
"deduping",
self.dedup_time_spent.load(Ordering::Relaxed),
i64
),
(
"batch_send",
self.batch_send_time_spent.load(Ordering::Relaxed),
i64
),
);


if self.enabled_grpc_service {
datapoint_info!(
"shredstream_proxy-service_metrics",
Expand Down Expand Up @@ -598,6 +664,12 @@ impl ShredMetrics {
);
self.duplicate_cumulative
.fetch_add(self.duplicate.swap(0, Ordering::Relaxed), Ordering::Relaxed);
self.dedup_time_spent.swap(0, Ordering::Relaxed);
self.batch_send_time_spent.swap(0, Ordering::Relaxed);
self.send_batch_size_sum.swap(0, Ordering::Relaxed);
self.send_batch_count.swap(0, Ordering::Relaxed);
self.saturated_iov_count.swap(0, Ordering::Relaxed);
self.unsaturated_iov_count.swap(0, Ordering::Relaxed);
}
}

Expand Down
3 changes: 2 additions & 1 deletion proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ fn main() -> Result<(), ShredstreamProxyError> {
&args.multicast_device,
args.multicast_subscribe_port,
args.multicast_bind_ip,
);
)
.inspect(|mcast_socket| info!("Multicast listeners found: {mcast_socket:?}."));
let forwarder_hdls = forwarder::start_forwarder_threads(
unioned_dest_sockets.clone(),
args.src_bind_addr,
Expand Down
10 changes: 5 additions & 5 deletions proxy/src/multicast_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use itertools::{Either, Itertools};
use log::{info, warn};
use log::{debug, info, warn};
use serde::Deserialize;

fn run_ip_json(args: &[&str]) -> io::Result<Vec<u8>> {
Expand Down Expand Up @@ -121,11 +121,11 @@ pub fn create_multicast_socket_on_device(
multicast_ip: Option<IpAddr>,
) -> Option<Vec<UdpSocket>> {
let device_ipv4 = ipv4_addr_for_device(device_name).unwrap_or_else(|e| {
warn!("Failed to resolve IPv4 address for device {device_name}: {e}");
debug!("Failed to resolve IPv4 address for device {device_name}: {e}");
None
});
let device_ifindex_v6 = ifindex_for_device(device_name).unwrap_or_else(|e| {
warn!("Failed to resolve ifindex for device {device_name}: {e}");
debug!("Failed to resolve ifindex for device {device_name}: {e}");
None
});

Expand All @@ -138,14 +138,14 @@ pub fn create_multicast_socket_on_device(
IpAddr::V6(v6) => Either::Right(v6),
}),
Err(e) => {
warn!("Failed to parse 'ip route list' for {device_name}: {e}");
debug!("Failed to parse 'ip route list' for {device_name}: {e}");
(Vec::new(), Vec::new())
}
},
};

if groups_v4.is_empty() && groups_v6.is_empty() {
warn!("No multicast groups found for device {device_name}; skipping multicast listener");
debug!("No multicast groups found for device {device_name}; skipping multicast listener");
return None;
}

Expand Down
Loading