
refactor(l1): parallelize request_block_headers function #3650


Open · wants to merge 14 commits into base: main
15 changes: 15 additions & 0 deletions crates/networking/p2p/kademlia.rs
@@ -380,6 +380,21 @@ impl KademliaTable
                 .map(|channel| (peer.node.node_id(), channel))
         })
     }
+
+    /// Returns a vector containing the channels for all the peers that
+    /// support the given capability, without considering their scores.
+    pub fn get_all_peer_channels(&self, capabilities: &[Capability]) -> Vec<PeerChannels> {
+        let filter = |peer: &PeerData| -> bool {
+            // Search for peers with an active connection that support the required capabilities
+            peer.channels.is_some()
+                && capabilities
+                    .iter()
+                    .any(|cap| peer.supported_capabilities.contains(cap))
+        };
+        self.filter_peers(&filter)
+            .filter_map(|peer| peer.channels.clone())
+            .collect()
+    }
 }
 
 /// Computes the distance between two nodes according to the discv4 protocol
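For context, the helper's selection rule can be reproduced in isolation. The sketch below is a minimal stand-alone version, with simplified stand-ins for the real `PeerData`, `Capability`, and `PeerChannels` types; it is not the actual ethrex implementation:

```rust
// Simplified stand-ins for the real ethrex types.
#[derive(Clone, Debug, PartialEq)]
enum Capability {
    Eth,
    Snap,
}

#[derive(Clone, Debug)]
struct PeerChannels; // placeholder for the real per-peer channel handles

struct PeerData {
    supported_capabilities: Vec<Capability>,
    channels: Option<PeerChannels>, // Some(_) only while a connection is active
}

fn get_all_peer_channels(peers: &[PeerData], capabilities: &[Capability]) -> Vec<PeerChannels> {
    peers
        .iter()
        // A peer qualifies if it has an active connection and supports
        // at least one of the requested capabilities.
        .filter(|peer| {
            peer.channels.is_some()
                && capabilities
                    .iter()
                    .any(|cap| peer.supported_capabilities.contains(cap))
        })
        .filter_map(|peer| peer.channels.clone())
        .collect()
}

fn main() {
    let peers = vec![
        PeerData { supported_capabilities: vec![Capability::Eth], channels: Some(PeerChannels) },
        PeerData { supported_capabilities: vec![Capability::Snap], channels: None },
    ];
    // Only the first peer has both the capability and an active connection.
    assert_eq!(get_all_peer_channels(&peers, &[Capability::Eth]).len(), 1);
}
```

Note that the filter uses `any`, so a peer qualifies if it supports at least one of the requested capabilities, not necessarily all of them.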
115 changes: 67 additions & 48 deletions crates/networking/p2p/peer_handler.rs
@@ -50,6 +50,7 @@ pub struct PeerHandler
     peer_table: Arc<Mutex<KademliaTable>>,
 }
 
+#[derive(Debug, Clone, Copy)]
 pub enum BlockRequestOrder {
     OldToNew,
     NewToOld,

@@ -110,7 +111,14 @@ impl PeerHandler
         None
     }
 
-    /// Requests block headers from any suitable peer, starting from the `start` block hash towards either older or newer blocks depending on the order
+    /// Returns a vector containing the channels for all the peers that support the given capability.
+    async fn get_all_peer_channels(&self, capabilities: &[Capability]) -> Vec<PeerChannels> {
+        let table = self.peer_table.lock().await;
+        table.get_all_peer_channels(capabilities)
+    }
+
+    /// Requests block headers to all the peers and waits for the first valid response,
+    /// starting from the `start` block hash towards either older or newer blocks depending on the order
     /// Returns the block headers or None if:
     /// - There are no available peers (the node just started up or was rejected by all other nodes)
     /// - No peer returned a valid response in the given time and retry limits
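The `Copy` derive added above is presumably what lets each spawned task in the hunk below capture `order` by value: without it, the per-peer `async move` blocks would try to move the same enum more than once. A minimal sketch of that constraint (illustrative code, not from the ethrex codebase):

```rust
// Illustrative only: why an enum captured by many spawned tasks needs Copy.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
enum BlockRequestOrder {
    OldToNew,
    NewToOld,
}

#[tokio::main]
async fn main() {
    let order = BlockRequestOrder::NewToOld;
    let mut task_set = tokio::task::JoinSet::new();
    for i in 0..3 {
        // Each `async move` block captures its own copy of `order`; without
        // `Copy` (or a clone per iteration) the second spawn would fail to
        // compile with "use of moved value: `order`".
        task_set.spawn(async move {
            println!("task {i}: requesting headers, order = {order:?}");
        });
    }
    while task_set.join_next().await.is_some() {}
}
```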
@@ -121,57 +129,68 @@
     ) -> Option<Vec<BlockHeader>> {
         for _ in 0..REQUEST_RETRY_ATTEMPTS {
             let request_id = rand::random();
-            let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
-                id: request_id,
-                startblock: start.into(),
-                limit: BLOCK_HEADER_LIMIT,
-                skip: 0,
-                reverse: matches!(order, BlockRequestOrder::NewToOld),
-            });
-            let (peer_id, mut peer_channel) = self
-                .get_peer_channel_with_retry(&SUPPORTED_ETH_CAPABILITIES)
-                .await?;
-            let mut receiver = peer_channel.receiver.lock().await;
-            if let Err(err) = peer_channel
-                .connection
-                .cast(CastMessage::BackendMessage(request))
-                .await
-            {
-                self.record_peer_failure(peer_id).await;
-                debug!("Failed to send message to peer: {err:?}");
-                continue;
-            }
-            if let Some(block_headers) = tokio::time::timeout(PEER_REPLY_TIMEOUT, async move {
-                loop {
-                    match receiver.recv().await {
-                        Some(RLPxMessage::BlockHeaders(BlockHeaders { id, block_headers }))
-                            if id == request_id =>
-                        {
-                            return Some(block_headers);
-                        }
-                        // Ignore replies that don't match the expected id (such as late responses)
-                        Some(_) => continue,
-                        None => return None, // Retry request
-                    }
-                }
-            })
-            .await
-            .ok()
-            .flatten()
-            .and_then(|headers| (!headers.is_empty()).then_some(headers))
-            {
-                if are_block_headers_chained(&block_headers, &order) {
-                    self.record_peer_success(peer_id).await;
-                    return Some(block_headers);
-                } else {
-                    warn!(
-                        "[SYNCING] Received invalid headers from peer, penalizing peer {peer_id}"
-                    );
-                    self.record_peer_critical_failure(peer_id).await;
-                }
-            }
-            warn!("[SYNCING] Didn't receive block headers from peer, penalizing peer {peer_id}...");
-            self.record_peer_failure(peer_id).await;
+            let peer_channels = self
+                .get_all_peer_channels(&SUPPORTED_ETH_CAPABILITIES)
+                .await;
+            let mut task_set = tokio::task::JoinSet::new();
+            // Request the block headers to all the active peers
+            for mut peer_channel in peer_channels {
+                task_set.spawn(async move {
+                    let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
+                        id: request_id,
+                        startblock: start.into(),
+                        limit: BLOCK_HEADER_LIMIT,
+                        skip: 0,
+                        reverse: matches!(order, BlockRequestOrder::NewToOld),
+                    });
+
+                    let mut receiver = peer_channel.receiver.lock().await;
+                    // TODO: error handling when we re-add peer scoring and penalization
+                    if let Err(err) = peer_channel
+                        .connection
+                        .cast(CastMessage::BackendMessage(request))
+                        .await
+                    {
+                        info!("Failed to send message to peer: {err:?}");
+                        return None;
+                    }
+
+                    let Ok(response) =
+                        tokio::time::timeout(PEER_REPLY_TIMEOUT, receiver.recv()).await
+                    else {
+                        return None;
+                    };
+                    response
+                });
+            }
+
+            let mut block_headers_response = None;
+            // Wait for the first valid BlockHeaders response
+            while let Some(response) = task_set.join_next().await {
+                match response {
+                    Ok(Some(RLPxMessage::BlockHeaders(BlockHeaders { id, block_headers })))
+                        if id == request_id =>
+                    {
+                        block_headers_response = Some(block_headers);
+                        break;
+                    }
+                    // Ignore replies that don't match the expected id (such as late responses)
+                    Ok(_) => continue,
+                    Err(err) => info!("Failed to receive message from peer: {err:?}"),
+                }
+            }
+            drop(task_set);
+
+            let Some(block_headers) = block_headers_response else {
+                continue;
+            };
+
+            if are_block_headers_chained(&block_headers, &order) {
+                return Some(block_headers);
+            } else {
+                warn!("[SYNCING] Received invalid headers from peer");
+            }
+            warn!("[SYNCING] Didn't receive block headers from peer");
         }
         None
     }
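Taken together, the change replaces a sequential ask-one-peer-and-retry loop with a first-response-wins fan-out over all connected peers. The sketch below shows the core pattern in isolation, assuming tokio with the `full` feature set; the channel payload stands in for an id-tagged BlockHeaders message, and the names are illustrative rather than ethrex APIs:

```rust
use std::time::Duration;
use tokio::{sync::mpsc, task::JoinSet, time::timeout};

const PEER_REPLY_TIMEOUT: Duration = Duration::from_secs(5);

// Fan one request out per peer and return the first matching, non-empty reply.
async fn first_valid_response(
    receivers: Vec<mpsc::Receiver<(u64, Vec<String>)>>,
    request_id: u64,
) -> Option<Vec<String>> {
    let mut task_set = JoinSet::new();
    // One task per peer, each bounded by the reply timeout.
    for mut rx in receivers {
        task_set.spawn(async move {
            timeout(PEER_REPLY_TIMEOUT, rx.recv()).await.ok().flatten()
        });
    }
    // Accept the first reply whose id matches; ignore timeouts and mismatches.
    while let Some(joined) = task_set.join_next().await {
        if let Ok(Some((id, headers))) = joined {
            if id == request_id && !headers.is_empty() {
                return Some(headers); // dropping task_set aborts the remaining tasks
            }
        }
    }
    None
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(1);
    tx.send((7, vec!["header".to_string()])).await.unwrap();
    // A single responding "peer" is enough for the fan-out to return.
    assert_eq!(
        first_valid_response(vec![rx], 7).await,
        Some(vec!["header".to_string()])
    );
}
```

Dropping the `JoinSet`, as the new code does with `drop(task_set)`, aborts any tasks still waiting on slower peers.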