diff --git a/crates/networking/p2p/kademlia.rs b/crates/networking/p2p/kademlia.rs index 4bf794cc89..8f0c720e96 100644 --- a/crates/networking/p2p/kademlia.rs +++ b/crates/networking/p2p/kademlia.rs @@ -380,6 +380,21 @@ impl KademliaTable { .map(|channel| (peer.node.node_id(), channel)) }) } + + /// Returns a vector containing the channels for all the peers that + /// support the given capability, without considering their scores. + pub fn get_all_peer_channels(&self, capabilities: &[Capability]) -> Vec { + let filter = |peer: &PeerData| -> bool { + // Search for peers with an active connection that support the required capabilities + peer.channels.is_some() + && capabilities + .iter() + .any(|cap| peer.supported_capabilities.contains(cap)) + }; + self.filter_peers(&filter) + .filter_map(|peer| peer.channels.clone()) + .collect() + } } /// Computes the distance between two nodes according to the discv4 protocol diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 3ad37442e4..2bad7a57b9 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -50,6 +50,7 @@ pub struct PeerHandler { peer_table: Arc>, } +#[derive(Debug, Clone, Copy)] pub enum BlockRequestOrder { OldToNew, NewToOld, @@ -110,7 +111,14 @@ impl PeerHandler { None } - /// Requests block headers from any suitable peer, starting from the `start` block hash towards either older or newer blocks depending on the order + /// Returns a vector containing the channels for all the peers that support the given capability. + async fn get_all_peer_channels(&self, capabilities: &[Capability]) -> Vec { + let table = self.peer_table.lock().await; + table.get_all_peer_channels(capabilities) + } + + /// Requests block headers to all the peers and waits for the first valid response, + /// starting from the `start` block hash towards either older or newer blocks depending on the order /// Returns the block headers or None if: /// - There are no available peers (the node just started up or was rejected by all other nodes) /// - No peer returned a valid response in the given time and retry limits @@ -121,57 +129,68 @@ impl PeerHandler { ) -> Option> { for _ in 0..REQUEST_RETRY_ATTEMPTS { let request_id = rand::random(); - let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders { - id: request_id, - startblock: start.into(), - limit: BLOCK_HEADER_LIMIT, - skip: 0, - reverse: matches!(order, BlockRequestOrder::NewToOld), - }); - let (peer_id, mut peer_channel) = self - .get_peer_channel_with_retry(&SUPPORTED_ETH_CAPABILITIES) - .await?; - let mut receiver = peer_channel.receiver.lock().await; - if let Err(err) = peer_channel - .connection - .cast(CastMessage::BackendMessage(request)) - .await - { - self.record_peer_failure(peer_id).await; - debug!("Failed to send message to peer: {err:?}"); - continue; + let peer_channels = self + .get_all_peer_channels(&SUPPORTED_ETH_CAPABILITIES) + .await; + let mut task_set = tokio::task::JoinSet::new(); + // Request the block headers to all the active peers + for mut peer_channel in peer_channels { + task_set.spawn(async move { + let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders { + id: request_id, + startblock: start.into(), + limit: BLOCK_HEADER_LIMIT, + skip: 0, + reverse: matches!(order, BlockRequestOrder::NewToOld), + }); + + let mut receiver = peer_channel.receiver.lock().await; + // TODO: error handling when we re-add peer scoring and penalization + if let Err(err) = peer_channel + .connection + .cast(CastMessage::BackendMessage(request)) + .await + { + info!("Failed to send message to peer: {err:?}"); + return None; + } + + let Ok(response) = + tokio::time::timeout(PEER_REPLY_TIMEOUT, receiver.recv()).await + else { + return None; + }; + response + }); } - if let Some(block_headers) = tokio::time::timeout(PEER_REPLY_TIMEOUT, async move { - loop { - match receiver.recv().await { - Some(RLPxMessage::BlockHeaders(BlockHeaders { id, block_headers })) - if id == request_id => - { - return Some(block_headers); - } - // Ignore replies that don't match the expected id (such as late responses) - Some(_) => continue, - None => return None, // Retry request + + let mut block_headers_response = None; + // Wait for the first valid BlockHeaders response + while let Some(response) = task_set.join_next().await { + match response { + Ok(Some(RLPxMessage::BlockHeaders(BlockHeaders { id, block_headers }))) + if id == request_id => + { + block_headers_response = Some(block_headers); + break; } - } - }) - .await - .ok() - .flatten() - .and_then(|headers| (!headers.is_empty()).then_some(headers)) - { - if are_block_headers_chained(&block_headers, &order) { - self.record_peer_success(peer_id).await; - return Some(block_headers); - } else { - warn!( - "[SYNCING] Received invalid headers from peer, penalizing peer {peer_id}" - ); - self.record_peer_critical_failure(peer_id).await; + // Ignore replies that don't match the expected id (such as late responses) + Ok(_) => continue, + Err(err) => info!("Failed to receive message from peer: {err:?}"), } } - warn!("[SYNCING] Didn't receive block headers from peer, penalizing peer {peer_id}..."); - self.record_peer_failure(peer_id).await; + drop(task_set); + + let Some(block_headers) = block_headers_response else { + continue; + }; + + if are_block_headers_chained(&block_headers, &order) { + return Some(block_headers); + } else { + warn!("[SYNCING] Received invalid headers from peer"); + } + warn!("[SYNCING] Didn't receive block headers from peer"); } None }