repair: add new socket and block id repair service #707
base: master
Conversation
```rust
let mut last_stats_report = Instant::now();
// throttle starts at 1024 responses => 1 second of compute
let mut throttle = DynamicPacketToProcessThreshold::default();
```
I know this is a really dumb throttle, but it's what eager repair uses 🤷
Ideally we should be throttling based on sender, but there was also pushback when I suggested we use QUIC here.
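For context, a minimal sketch of how such a packet-count throttle might behave, assuming a halve-on-slow / grow-on-fast policy; the struct name, constants, and policy here are assumptions, not the actual `DynamicPacketToProcessThreshold`:

```rust
use std::time::Duration;

/// Assumed sketch of a dynamic packets-per-batch throttle; not the real
/// DynamicPacketToProcessThreshold implementation.
struct Throttle {
    max_packets: usize,
}

impl Default for Throttle {
    fn default() -> Self {
        // starts at 1024 responses => ~1 second of compute, per the comment above
        Self { max_packets: 1024 }
    }
}

impl Throttle {
    const TARGET: Duration = Duration::from_secs(1);

    /// Called after each batch with how many packets were processed and how
    /// long the batch took.
    fn update(&mut self, processed: usize, elapsed: Duration) {
        if elapsed > Self::TARGET {
            // batch ran long: halve the budget (with a floor)
            self.max_packets = (self.max_packets / 2).max(64);
        } else if processed >= self.max_packets {
            // budget was filled cheaply: allow more next time (with a cap)
            self.max_packets = (self.max_packets * 2).min(16_384);
        }
    }
}
```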
```rust
/// Similar to [`Self::repair_request`] but for [`BlockIdRepairType`] requests.
/// Uses stake-weighted peer selection rather than cluster_slots weights.
pub(crate) fn block_id_repair_request(
```
no ❌ EpochSlots
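As a rough illustration of stake-weighted peer selection, a minimal sketch assuming a `rand` `WeightedIndex` over peer stakes; the function, its input shapes, and the fallback weight are all hypothetical, not the PR's code:

```rust
use {
    rand::distributions::{Distribution, WeightedIndex},
    solana_sdk::pubkey::Pubkey,
    std::{collections::HashMap, net::SocketAddr},
};

/// Hypothetical sketch: sample one repair peer with probability
/// proportional to its stake (no cluster_slots / EpochSlots weighting).
fn sample_peer_by_stake<'a, R: rand::Rng>(
    rng: &mut R,
    peers: &'a [(Pubkey, SocketAddr)],
    stakes: &HashMap<Pubkey, u64>,
) -> Option<&'a (Pubkey, SocketAddr)> {
    // weight each peer by stake; floor at 1 so unstaked nodes stay reachable
    let weights: Vec<u64> = peers
        .iter()
        .map(|(pk, _)| stakes.get(pk).copied().unwrap_or(0).max(1))
        .collect();
    let dist = WeightedIndex::new(&weights).ok()?;
    peers.get(dist.sample(rng))
}
```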
```rust
type OutstandingBlockIdRepairs = OutstandingRequests<BlockIdRepairType>;

const MAX_REPAIR_REQUESTS_PER_ITERATION: usize = 200;
```
🤷 can adjust this based on tuning, repair service uses 512 per 1ms sleep
512 only really needed during restart. Seems like 200 should be plenty in most cases
```rust
/// We prioritize Pong first (to respond to ping challenges), then requests with
/// lower slot #s, and then prefer metadata requests before shred requests.
impl Ord for RepairRequest {
```
this ordering is because we cap the # of repair requests we send per iteration. Better to repair stuff we can actually replay first
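A minimal sketch of that ordering, assuming requests sit in a max-heap (so `Ordering::Greater` means higher priority, matching the Pong arms quoted later in this review); the variant names are illustrative, not the PR's actual enum:

```rust
use std::cmp::Ordering;

/// Illustrative cut-down request enum; not the PR's actual definition.
#[derive(PartialEq, Eq)]
enum Req {
    Pong,
    Metadata { slot: u64 },
    Shred { slot: u64 },
}

impl Ord for Req {
    fn cmp(&self, other: &Self) -> Ordering {
        use Req::*;
        match (self, other) {
            // Pong always wins: ping challenges must be answered first
            (Pong, Pong) => Ordering::Equal,
            (Pong, _) => Ordering::Greater,
            (_, Pong) => Ordering::Less,
            // lower slot = higher priority, hence the reversed comparison
            (Metadata { slot: a }, Metadata { slot: b })
            | (Shred { slot: a }, Shred { slot: b }) => b.cmp(a),
            // on equal slots, metadata beats shred requests
            (Metadata { slot: a }, Shred { slot: b }) => b.cmp(a).then(Ordering::Greater),
            (Shred { slot: a }, Metadata { slot: b }) => b.cmp(a).then(Ordering::Less),
        }
    }
}

impl PartialOrd for Req {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
```

With this, `BinaryHeap::pop` yields Pongs first, then the lowest-slot requests, with metadata winning ties against shreds.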
```rust
if !shred_socket_batch.is_empty() {
    let total = shred_socket_batch.len();
    let _ = batch_send(
        repair_socket,
```
We send shred requests via the repair socket so the responses eventually get back to shred sigverify / window service / blockstore.
Bit annoying to maintain two sockets here, but better than copying all that logic over here.
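A minimal sketch of the two-socket split being described, using plain std `UdpSocket` sends in place of `batch_send`; the function and batch shapes are assumptions, not the PR's code:

```rust
use std::net::{SocketAddr, UdpSocket};

/// Hypothetical sketch: metadata requests leave via the dedicated block-id
/// socket, while shred requests leave via the existing repair socket so the
/// responses flow back through the normal shred-fetch -> sigverify ->
/// blockstore path.
fn send_requests(
    block_id_socket: &UdpSocket,
    repair_socket: &UdpSocket,
    metadata_batch: &[(Vec<u8>, SocketAddr)],
    shred_batch: &[(Vec<u8>, SocketAddr)],
) -> std::io::Result<()> {
    for (bytes, addr) in metadata_batch {
        // responses to these come back on block_id_socket
        block_id_socket.send_to(bytes, addr)?;
    }
    for (bytes, addr) in shred_batch {
        // responses to these are shreds, received on repair_socket
        repair_socket.send_to(bytes, addr)?;
    }
    Ok(())
}
```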
bw-solana left a comment:
Reviewed everything except core/src/repair/block_id_repair_service.rs.
Submitting comments so far while I review this giant new file.
```rust
    peer.pubkey,
    repair_request
);
Ok((out, peer.serve_repair))
```
I thought we were using the new socket for both send and receive of block ID repairs
It's a bit confusing:

```text
Metadata (ParentAndFecSetCount & FecSetRoot) & Pong:
  A BlockIdRepairService
    -> A.block_id_socket.send(B.serve_repair, request)
    -> B.serve_repair.recv
    -> B ServeRepairService
    -> B.serve_repair.send(A.block_id_socket, response)
    -> A.block_id_socket.recv
    -> A BlockIdRepairService

ShredForBlockId:
  A BlockIdRepairService
    -> A.repair_socket.send(B.serve_repair, request)
    -> B.serve_repair.recv
    -> B ServeRepairService
    -> B.serve_repair.send(A.repair_socket, response)
    -> A.repair_socket.recv
    -> A ShredFetch / ShredSigverify
    -> A Blockstore insert
```
Serve repair service just processes incoming requests and sends back responses to the same socket:
- block ID socket => send new Metadata/Pong requests and receive responses / Pings
- repair socket => send new ShredForBlockId requests and receive shreds
- serve repair socket => receive any sort of repair request from any socket, send back repair_handler's response to that same socket (or a Ping)
In the ShredForBlockId flow, does B send shreds to A's TVU port? Or does it actually go to the repair_socket?
I think I have this right now:
- block_id_socket - send all meta requests out of this socket; receive meta responses on this socket
- serve_repair - receive all requests (meta/shred) on this socket; send all responses (meta/shred) out of this socket
- repair_socket - send shred requests out of this socket; receive shreds on this socket
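To make the "respond on the same socket" behavior concrete, a minimal sketch of the serve-repair side under these assumptions; `handle_request` is a hypothetical stand-in for the PR's repair_handler:

```rust
use std::net::UdpSocket;

/// Hypothetical sketch: every request type arrives on the single serve_repair
/// socket, and the response (or a Ping challenge) goes back out of that same
/// socket to whatever source address the request came from.
fn serve_repair_loop(serve_repair: &UdpSocket) -> std::io::Result<()> {
    let mut buf = [0u8; 1232]; // typical max repair packet size (assumed)
    loop {
        let (len, from) = serve_repair.recv_from(&mut buf)?;
        let response = handle_request(&buf[..len]);
        // respond on the same socket; the requester receives it on whichever
        // socket it sent from (block_id_socket or repair_socket)
        serve_repair.send_to(&response, from)?;
    }
}

fn handle_request(request: &[u8]) -> Vec<u8> {
    // placeholder: parse the request and build a response or a Ping
    request.to_vec()
}
```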
```rust
match self {
    // Pong is always highest priority and handled separately in Ord,
    // so this should never be called. Return 0 as a fallback.
    RepairRequest::Pong { .. } => 0,
```
maybe `RepairRequest::Pong { .. } => unimplemented!("Pong requests do not have a slot")`?
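A minimal sketch of what that suggestion would look like, with an illustrative, cut-down enum rather than the PR's full variant set:

```rust
/// Illustrative enum; the real RepairRequest has more variants.
enum RepairRequest {
    Pong { nonce: u64 },
    ShredForBlockId { slot: u64 },
}

impl RepairRequest {
    fn slot(&self) -> u64 {
        match self {
            // panic loudly instead of returning a bogus 0; Ord already
            // handles Pong before slot() is ever consulted
            RepairRequest::Pong { .. } => {
                unimplemented!("Pong requests do not have a slot")
            }
            RepairRequest::ShredForBlockId { slot } => *slot,
        }
    }
}
```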
```rust
/// We prioritize Pong first (to respond to ping challenges), then requests with
/// lower slot #s, and then prefer metadata requests before shred requests.
impl Ord for RepairRequest {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
```
if we add more Request types, this is going to be a pain to maintain
```rust
Arc::new(StreamerReceiveStats::new(
    "block_id_repair_response_receiver",
)),
Some(Duration::from_millis(1)), // coalesce
```
have you done any perf measurements? Idk if we really need to coalesce here at all
```rust
Ok(event) => state.pending_repair_events.push(event),
Err(_) => break,
},
default(Duration::from_secs(1)) => (),
```
should this be a continue?
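For reference, a minimal sketch of why the answer matters: with `=> ()` a 1s timeout falls through to the periodic work after the `select!`, whereas `continue` would skip it; the loop shape and names here are illustrative, not the PR's actual loop:

```rust
use crossbeam_channel::{select, Receiver};
use std::time::Duration;

fn event_loop(receiver: Receiver<u64>) {
    let mut pending: Vec<u64> = Vec::new();
    loop {
        select! {
            recv(receiver) -> result => match result {
                Ok(event) => pending.push(event),
                Err(_) => break, // sender dropped: shut down
            },
            // `()` falls through to the periodic work below even when idle;
            // `continue` here would starve it whenever the timeout fires
            default(Duration::from_secs(1)) => (),
        }
        // periodic work that should run even with no incoming events
        retry_timed_out(&mut pending);
    }
}

fn retry_timed_out(_pending: &mut Vec<u64>) {
    // placeholder for retries / stats reporting
}
```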
```rust
// for repair events that are currently deferred
select! {
    recv(completed_slots_receiver) -> result => match result {
        Ok(_) => (),
```
should we be doing something here?
```rust
// pong has highest priority - must respond to ping challenges immediately
match (&self, &other) {
    (Pong { .. }, Pong { .. }) => return Ordering::Equal,
    (Pong { .. }, _) => return Ordering::Greater,
```
is this DoSable? Can I flood pings?
```rust
}
Some(turbine_block_id) if turbine_block_id != block_id => {
    // Turbine has a different block
    info!(
```
should probably make this a warn at least
```rust
/// For shred requests, we check if the shred has been received before retrying.
fn retry_timed_out_requests(blockstore: &Blockstore, state: &mut RepairState, now: u64) {
    // TODO(ashwin): use extract_if when we upstream (rust 1.88+)
    let mut timed_out = Vec::new();
```
should we just push directly to state.pending_repair_requests?
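A minimal sketch of the one-pass alternative the question points at, assuming the outstanding requests live in a HashMap keyed by request id; the types are illustrative stand-ins for the PR's bookkeeping:

```rust
use std::collections::HashMap;

/// Hypothetical shapes: request id -> time the request was sent.
fn retry_timed_out_requests(
    outstanding: &mut HashMap<u64, u64>,
    pending_repair_requests: &mut Vec<u64>,
    now: u64,
    timeout: u64,
) {
    // one pass: drop timed-out entries and push them straight onto the
    // pending queue, instead of collecting into an intermediate `timed_out`
    // Vec (roughly what extract_if would also give us on rust 1.88+)
    outstanding.retain(|id, sent_at| {
        if now.saturating_sub(*sent_at) > timeout {
            pending_repair_requests.push(*id);
            false // remove from outstanding
        } else {
            true
        }
    });
}
```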
Problem
We need a new socket and logic for block-id-based repair. As a refresher, this is the repair algo outlined in the paper:
[figure: repair algorithm from the paper]
Summary of Changes
Add new socket and repair service.
Plug it in from NotarizeFallback (or stronger) certificates. SafeToNotar (the other case where we need to repair blocks) will happen in a follow-up.
The service sends 3 types of requests: ParentAndFecSetCount, FecSetRoot, and ShredForBlockId.
It handles 3 types of responses: ParentAndFecSetCount / FecSetRoot metadata responses (on the block ID socket), Pings, and shreds (on the repair socket).
Because of the expected low throughput of this service (until we turn off eager repair), I opted for a simple approach: processing responses and sending requests all from one thread.
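A minimal sketch of that single-threaded shape, with illustrative names and a plain `UdpSocket` in place of the real plumbing:

```rust
use crossbeam_channel::Receiver;
use std::net::{SocketAddr, UdpSocket};
use std::time::Duration;

/// Hypothetical sketch: the same loop drains incoming responses and flushes
/// outgoing requests, which is adequate at the expected low throughput.
fn run_block_id_repair_loop(responses: Receiver<Vec<u8>>, socket: UdpSocket) {
    let mut outgoing: Vec<(Vec<u8>, SocketAddr)> = Vec::new();
    loop {
        // 1) drain whatever responses have arrived, possibly queueing
        //    follow-up requests
        for response in responses.try_iter() {
            if let Some(request) = handle_response(&response) {
                outgoing.push(request);
            }
        }
        // 2) send pending requests from the same thread
        for (bytes, addr) in outgoing.drain(..) {
            let _ = socket.send_to(&bytes, addr);
        }
        std::thread::sleep(Duration::from_millis(100));
    }
}

fn handle_response(_response: &[u8]) -> Option<(Vec<u8>, SocketAddr)> {
    None // placeholder: parse the response, maybe schedule another request
}
```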
Couple of useful things: