Skip to content

Commit

Permalink
feat: improve initial connection times (#6343)
Browse files Browse the repository at this point in the history
Description
---
Improved initial DHT connection times for nodes at startup when all
previous connections to peers have failed, as well as when a node has
lost all connectivity. With this PR, a node in either situation will
retry peers whose previous connection attempts failed, improving general
connectivity. The DHT neighbour list is refreshed without filtering out
peers that are in their offline cooldown or whose addresses have all
failed, so that every known peer can be tried again.

Motivation and Context
---
Previously, when a node started up or lost all connectivity and every
known peer had a failed connection attempt, the node would wait for a
24-hour cooldown period before contacting the same peers again.

How Has This Been Tested?
---
System-level testing.

What process can a PR reviewer use to test or verify this change?
---
Code review.

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->
  • Loading branch information
hansieodendaal authored May 16, 2024
1 parent 16ca4b5 commit 64e650b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 21 deletions.
1 change: 1 addition & 0 deletions comms/core/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ impl ConnectivityManagerActor {
target: LOG_TARGET,
"Node is offline. Ignoring connection failure event for peer '{}'.", node_id
);
self.publish_event(ConnectivityEvent::ConnectivityStateOffline);
return Ok(());
}

Expand Down
49 changes: 28 additions & 21 deletions comms/dht/src/connectivity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
mod test;

mod metrics;
use std::{sync::Arc, time::Instant};
use std::{
sync::Arc,
time::{Duration, Instant},
};

use log::*;
pub use metrics::{MetricsCollector, MetricsCollectorHandle};
Expand Down Expand Up @@ -159,7 +162,7 @@ impl DhtConnectivity {
tokio::time::sleep(delay).await;
debug!(target: LOG_TARGET, "DHT connectivity starting after delayed for {:.0?}", delay);
}
self.refresh_neighbour_pool().await?;
self.refresh_neighbour_pool(true).await?;

let mut ticker = time::interval(self.config.connectivity.update_interval);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
Expand Down Expand Up @@ -298,7 +301,7 @@ impl DhtConnectivity {
match event {
DhtEvent::NetworkDiscoveryPeersAdded(info) => {
if info.num_new_peers > 0 {
self.refresh_peer_pools().await?;
self.refresh_peer_pools(false).await?;
}
},
_ => {},
Expand Down Expand Up @@ -332,22 +335,22 @@ impl DhtConnectivity {
Ok(())
}

async fn refresh_peer_pools(&mut self) -> Result<(), DhtConnectivityError> {
async fn refresh_peer_pools(&mut self, try_revive_connections: bool) -> Result<(), DhtConnectivityError> {
info!(
target: LOG_TARGET,
"Reinitializing neighbour pool. (size={})",
self.neighbours.len(),
);

self.refresh_neighbour_pool().await?;
self.refresh_neighbour_pool(try_revive_connections).await?;
self.refresh_random_pool().await?;

Ok(())
}

async fn refresh_neighbour_pool_if_required(&mut self) -> Result<(), DhtConnectivityError> {
if self.num_connected_neighbours() < self.config.num_neighbouring_nodes {
self.refresh_neighbour_pool().await?;
self.refresh_neighbour_pool(false).await?;
}

Ok(())
Expand All @@ -364,10 +367,10 @@ impl DhtConnectivity {
self.connection_handles.iter().map(|c| c.peer_node_id())
}

async fn refresh_neighbour_pool(&mut self) -> Result<(), DhtConnectivityError> {
async fn refresh_neighbour_pool(&mut self, try_revive_connections: bool) -> Result<(), DhtConnectivityError> {
self.remove_allow_list_peers_from_pools().await?;
let mut new_neighbours = self
.fetch_neighbouring_peers(self.config.num_neighbouring_nodes, &[])
.fetch_neighbouring_peers(self.config.num_neighbouring_nodes, &[], try_revive_connections)
.await?;

if new_neighbours.is_empty() {
Expand Down Expand Up @@ -682,7 +685,7 @@ impl DhtConnectivity {
self.dial_multiple_peers(&[node_id]).await?;
},
ConnectivityStateOnline(n) => {
self.refresh_peer_pools().await?;
self.refresh_peer_pools(false).await?;
if self.config.auto_join && self.should_send_join() {
debug!(
target: LOG_TARGET,
Expand All @@ -698,7 +701,8 @@ impl DhtConnectivity {
},
ConnectivityStateOffline => {
debug!(target: LOG_TARGET, "Node is OFFLINE");
self.refresh_peer_pools().await?;
tokio::time::sleep(Duration::from_secs(15)).await;
self.refresh_peer_pools(true).await?;
},
_ => {},
}
Expand Down Expand Up @@ -779,7 +783,7 @@ impl DhtConnectivity {
target: LOG_TARGET,
"Peer '{}' in neighbour pool is offline. Adding a new peer if possible", current_peer
);
match self.fetch_neighbouring_peers(1, &exclude).await?.pop() {
match self.fetch_neighbouring_peers(1, &exclude, false).await?.pop() {
Some(new_peer) => {
self.insert_neighbour_ordered_by_distance(new_peer.clone());
self.dial_multiple_peers(&[new_peer]).await?;
Expand Down Expand Up @@ -902,6 +906,7 @@ impl DhtConnectivity {
&mut self,
n: usize,
excluded: &[NodeId],
try_revive_connections: bool,
) -> Result<Vec<NodeId>, DhtConnectivityError> {
let peer_allow_list = self.peer_allow_list().await?;
let neighbour_distance = self.max_neighbour_distance_all_conncetions().await?;
Expand Down Expand Up @@ -934,16 +939,18 @@ impl DhtConnectivity {
return false;
}

if peer
.offline_since()
.map(|since| since <= offline_cooldown)
.unwrap_or(false)
{
return false;
}
// we have tried to connect to this peer, and we have never made a successful attempt at connection
if peer.all_addresses_failed() {
return false;
if !try_revive_connections {
if peer
.offline_since()
.map(|since| since <= offline_cooldown)
.unwrap_or(false)
{
return false;
}
// we have tried to connect to this peer, and we have never made a successful attempt at connection
if peer.all_addresses_failed() {
return false;
}
}

let is_excluded = excluded.contains(&peer.node_id);
Expand Down

0 comments on commit 64e650b

Please sign in to comment.