Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(peer-store): Remove addresses from peer store on dial failure #5926

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
119 changes: 82 additions & 37 deletions misc/peer-store/src/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{
};

use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::FromSwarm;
use libp2p_swarm::{DialError, FromSwarm};
use lru::LruCache;

use super::Store;
Expand Down Expand Up @@ -52,38 +52,48 @@ impl<T> MemoryStore<T> {
}
}

/// Update an address record and notify swarm when the address is new.
/// Returns `true` when the address is new.
pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
let is_updated = self.update_address_silent(peer, address);
/// Update an address record and notify swarm when the address is new.
/// If `permanent` is true, and the address is not yet present, calls to `remove_address` will
/// only succeed if `force` is true.
/// Returns `true` when the address is new.
pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr, permanent: bool) -> bool {
let is_updated = self.update_address_silent(peer, address, permanent);
if is_updated {
self.pending_events
.push_back(crate::store::Event::RecordUpdated(*peer));
if let Some(waker) = self.waker.take() {
waker.wake();
}
self.push_event_and_wake(crate::store::Event::RecordUpdated(*peer));
}
is_updated
}

/// Update an address record without notifying swarm.
/// Returns `true` when the address is new.
pub fn update_address_silent(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
fn update_address_silent(&mut self, peer: &PeerId, address: &Multiaddr, permanent: bool) -> bool {
if let Some(record) = self.records.get_mut(peer) {
return record.update_address(address);
return record.update_address(address, permanent);
}
let mut new_record = PeerRecord::new(self.config.record_capacity);
new_record.update_address(address);
new_record.update_address(address, permanent);
self.records.insert(*peer, new_record);
true
}

/// Remove an address record.
/// If `force` is false, addresses that have been marked `permanent` on insertion will not be
/// removed.
/// Returns `true` when the address is removed.
pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr, force: bool) -> bool {
let is_updated = self.remove_address_silent(peer, address, force);
if is_updated {
self.push_event_and_wake(crate::store::Event::RecordUpdated(*peer));
}
is_updated
}

/// Remove an address record without notifying swarm.
/// Returns `true` when the address is removed.
pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
fn remove_address_silent(&mut self, peer: &PeerId, address: &Multiaddr, force: bool) -> bool {
self.records
.get_mut(peer)
.is_some_and(|r| r.remove_address(address))
.is_some_and(|r| r.remove_address(address, force))
}
Comment on lines +93 to 97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove a peer from the hashmap if there are no more known addresses.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if and only if there is no custom data, or regardless?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, you're right. Only if there is not custom data.


pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> {
Expand All @@ -100,15 +110,11 @@ impl<T> MemoryStore<T> {
/// Insert the data and notify the swarm about the update, dropping the old data if it exists.
pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) {
self.insert_custom_data_silent(peer, custom_data);
self.pending_events
.push_back(crate::store::Event::Store(Event::CustomDataUpdated(*peer)));
if let Some(waker) = self.waker.take() {
waker.wake();
}
self.push_event_and_wake(crate::store::Event::Store(Event::CustomDataUpdated(*peer)));
}

/// Insert the data without notifying the swarm. Old data will be dropped if it exists.
pub fn insert_custom_data_silent(&mut self, peer: &PeerId, custom_data: T) {
fn insert_custom_data_silent(&mut self, peer: &PeerId, custom_data: T) {
if let Some(r) = self.records.get_mut(peer) {
return r.insert_custom_data(custom_data);
}
Expand All @@ -127,6 +133,13 @@ impl<T> MemoryStore<T> {
pub fn record_iter_mut(&mut self) -> impl Iterator<Item = (&PeerId, &mut PeerRecord<T>)> {
self.records.iter_mut()
}

fn push_event_and_wake(&mut self, event: crate::store::Event<Event>) {
self.pending_events.push_back(event);
if let Some(waker) = self.waker.take() {
waker.wake(); // wake up because of update
}
}
}

impl<T> Store for MemoryStore<T> {
Expand All @@ -135,21 +148,48 @@ impl<T> Store for MemoryStore<T> {
fn on_swarm_event(&mut self, swarm_event: &FromSwarm) {
match swarm_event {
FromSwarm::NewExternalAddrOfPeer(info) => {
self.update_address(&info.peer_id, info.addr);
self.update_address(&info.peer_id, info.addr, false);
}
FromSwarm::ConnectionEstablished(info) => {
let mut is_record_updated = false;
for failed_addr in info.failed_addresses {
is_record_updated |= self.remove_address(&info.peer_id, failed_addr);
is_record_updated |= self.remove_address_silent(&info.peer_id, failed_addr, false);
}
is_record_updated |=
self.update_address_silent(&info.peer_id, info.endpoint.get_remote_address());
self.update_address_silent(&info.peer_id, info.endpoint.get_remote_address(), false);
if is_record_updated {
self.pending_events
.push_back(crate::store::Event::RecordUpdated(info.peer_id));
if let Some(waker) = self.waker.take() {
waker.wake(); // wake up because of update
self.push_event_and_wake(crate::store::Event::RecordUpdated(info.peer_id));
}
}
FromSwarm::DialFailure(info) => {
let Some(peer) = info.peer_id else {
// We don't know which peer we are talking about here.
return;
};

match info.error {
DialError::LocalPeerId { .. } => {
// The stored peer is the local peer. Remove peer fully.
if self.records.remove(&peer).is_some() {
self.push_event_and_wake(crate::store::Event::RecordUpdated(peer));
}
}
DialError::WrongPeerId { obtained, address } => {
// The stored peer id is incorrect, remove incorrect and add correct one.
self.remove_address(&peer, address, false);
self.update_address(obtained, address, false);
}
DialError::Transport(errors) => {
// Remove all attempted addresses.
let mut is_record_updated = false;
for (addr, _) in errors {
is_record_updated |= self.remove_address_silent(&peer, addr, false);
}
if is_record_updated {
self.push_event_and_wake(crate::store::Event::RecordUpdated(peer));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Slightly orthogonal to this PR, but noticed it again while reviewing. Can be solved in a separate PR).

I find this even variant not very informative.
I've already raised it in #5724 (comment), but forgot about it again in later reviews. IMO the RecordUpdated variant should include further info about the info that was added.
For the memory store I think it would make sense to have different variants AddressAdded AddressRemove etc. Wdyt?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me.

}
}
_ => {}
}
}
_ => {}
Expand Down Expand Up @@ -205,7 +245,8 @@ impl Config {
pub struct PeerRecord<T> {
/// A LRU(Least Recently Used) cache for addresses.
/// Will delete the least-recently-used record when full.
addresses: LruCache<Multiaddr, ()>,
/// If the associated `bool` is true, the address can only be force-removed
addresses: LruCache<Multiaddr, bool>,
/// Custom data attached to the peer.
custom_data: Option<T>,
}
Expand All @@ -226,17 +267,20 @@ impl<T> PeerRecord<T> {
/// Update the address in the LRU cache, promote it to the front if it exists,
/// insert it to the front if not.
/// Returns true when the address is new.
pub fn update_address(&mut self, address: &Multiaddr) -> bool {
pub fn update_address(&mut self, address: &Multiaddr, permanent: bool) -> bool {
if self.addresses.get(address).is_some() {
return false;
}
Comment on lines +270 to 273
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If permanent is true I think we should update the permanent parameter on the address. Otherwise there is no way for the user to change an existing discovered address into a permanent one.

self.addresses.get_or_insert(address.clone(), || ());
self.addresses.get_or_insert(address.clone(), || permanent);
true
}

/// Remove the address in the LRU cache regardless of its position.
/// Returns true when the address is removed, false when not exist.
pub fn remove_address(&mut self, address: &Multiaddr) -> bool {
pub fn remove_address(&mut self, address: &Multiaddr, force: bool) -> bool {
if !force && self.addresses.peek(address) == Some(&true) {
return false;
}
self.addresses.pop(address).is_some()
}

Expand Down Expand Up @@ -272,9 +316,9 @@ mod test {
let addr1 = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");
let addr2 = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed");
let addr3 = Multiaddr::from_str("/ip4/127.0.0.3").expect("parsing to succeed");
store.update_address(&peer, &addr1);
store.update_address(&peer, &addr2);
store.update_address(&peer, &addr3);
store.update_address(&peer, &addr1, false);
store.update_address(&peer, &addr2, false);
store.update_address(&peer, &addr3, false);
assert!(
store
.records
Expand All @@ -284,7 +328,7 @@ mod test {
.collect::<Vec<_>>()
== vec![&addr3, &addr2, &addr1]
);
store.update_address(&peer, &addr1);
store.update_address(&peer, &addr1, false);
assert!(
store
.records
Expand All @@ -294,7 +338,7 @@ mod test {
.collect::<Vec<_>>()
== vec![&addr1, &addr3, &addr2]
);
store.update_address(&peer, &addr3);
store.update_address(&peer, &addr3, false);
assert!(
store
.records
Expand All @@ -315,6 +359,7 @@ mod test {
store.update_address(
&peer,
&Multiaddr::from_str(&addr_string).expect("parsing to succeed"),
false,
);
}
let first_record = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");
Expand Down
Loading