From 352f1aafa11e6cda5d3076cd3bbbc9b6f182fe30 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Mon, 10 Mar 2025 16:31:37 +0100 Subject: [PATCH 1/4] feat(peer-store): Remove addresses from peer store on dial failure --- misc/peer-store/src/memory_store.rs | 68 ++++++++++++++++++++++------- 1 file changed, 52 insertions(+), 16 deletions(-) diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index cc6a4d29558..a05dec088ee 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -15,7 +15,7 @@ use std::{ }; use libp2p_core::{Multiaddr, PeerId}; -use libp2p_swarm::FromSwarm; +use libp2p_swarm::{DialError, FromSwarm}; use lru::LruCache; use super::Store; @@ -57,11 +57,7 @@ impl MemoryStore { pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { let is_updated = self.update_address_silent(peer, address); if is_updated { - self.pending_events - .push_back(crate::store::Event::RecordUpdated(*peer)); - if let Some(waker) = self.waker.take() { - waker.wake(); - } + self.push_event_and_wake(crate::store::Event::RecordUpdated(*peer)); } is_updated } @@ -81,6 +77,16 @@ impl MemoryStore { /// Remove an address record. /// Returns `true` when the address is removed. pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + let is_updated = self.remove_address_silent(peer, address); + if is_updated { + self.push_event_and_wake(crate::store::Event::RecordUpdated(*peer)); + } + is_updated + } + + /// Remove an address record without notifying swarm. + /// Returns `true` when the address is removed. + pub fn remove_address_silent(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { self.records .get_mut(peer) .is_some_and(|r| r.remove_address(address)) @@ -100,11 +106,7 @@ impl MemoryStore { /// Insert the data and notify the swarm about the update, dropping the old data if it exists. pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) { self.insert_custom_data_silent(peer, custom_data); - self.pending_events - .push_back(crate::store::Event::Store(Event::CustomDataUpdated(*peer))); - if let Some(waker) = self.waker.take() { - waker.wake(); - } + self.push_event_and_wake(crate::store::Event::Store(Event::CustomDataUpdated(*peer))); } /// Insert the data without notifying the swarm. Old data will be dropped if it exists. @@ -127,6 +129,13 @@ impl MemoryStore { pub fn record_iter_mut(&mut self) -> impl Iterator)> { self.records.iter_mut() } + + fn push_event_and_wake(&mut self, event: crate::store::Event) { + self.pending_events.push_back(event); + if let Some(waker) = self.waker.take() { + waker.wake(); // wake up because of update + } + } } impl Store for MemoryStore { @@ -140,16 +149,43 @@ impl Store for MemoryStore { FromSwarm::ConnectionEstablished(info) => { let mut is_record_updated = false; for failed_addr in info.failed_addresses { - is_record_updated |= self.remove_address(&info.peer_id, failed_addr); + is_record_updated |= self.remove_address_silent(&info.peer_id, failed_addr); } is_record_updated |= self.update_address_silent(&info.peer_id, info.endpoint.get_remote_address()); if is_record_updated { - self.pending_events - .push_back(crate::store::Event::RecordUpdated(info.peer_id)); - if let Some(waker) = self.waker.take() { - waker.wake(); // wake up because of update + self.push_event_and_wake(crate::store::Event::RecordUpdated(info.peer_id)); + } + } + FromSwarm::DialFailure(info) => { + let Some(peer) = info.peer_id else { + // We don't know which peer we are talking about here. + return; + }; + + match info.error { + DialError::LocalPeerId { .. } => { + // The stored peer is the local peer. Remove peer fully. + if self.records.remove(&peer).is_some() { + self.push_event_and_wake(crate::store::Event::RecordUpdated(peer)); + } + } + DialError::WrongPeerId { obtained, address } => { + // The stored peer id is incorrect, remove incorrect and add correct one. + self.remove_address(&peer, &address); + self.update_address(obtained, address); + } + DialError::Transport(errors) => { + // Remove all attempted addresses. + let mut is_record_updated = false; + for (addr, _) in errors { + is_record_updated |= self.remove_address_silent(&peer, addr); + } + if is_record_updated { + self.push_event_and_wake(crate::store::Event::RecordUpdated(peer)); + } } + _ => {} } } _ => {} From 6b9453fff201795d25104048bfbe54bd52b164a1 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Tue, 11 Mar 2025 08:59:03 +0100 Subject: [PATCH 2/4] clippy --- misc/peer-store/src/memory_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index a05dec088ee..9c192068669 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -172,7 +172,7 @@ impl Store for MemoryStore { } DialError::WrongPeerId { obtained, address } => { // The stored peer id is incorrect, remove incorrect and add correct one. - self.remove_address(&peer, &address); + self.remove_address(&peer, address); self.update_address(obtained, address); } DialError::Transport(errors) => { From 0654830a5c621f322ec155d212e2e90e1a50b2f0 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Wed, 12 Mar 2025 12:03:40 +0100 Subject: [PATCH 3/4] make *_silent fns non-pub --- misc/peer-store/src/memory_store.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 9c192068669..d86a470de3e 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -64,7 +64,7 @@ impl MemoryStore { /// Update an address record without notifying swarm. /// Returns `true` when the address is new. - pub fn update_address_silent(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + fn update_address_silent(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { if let Some(record) = self.records.get_mut(peer) { return record.update_address(address); } @@ -86,7 +86,7 @@ impl MemoryStore { /// Remove an address record without notifying swarm. /// Returns `true` when the address is removed. - pub fn remove_address_silent(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + fn remove_address_silent(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { self.records .get_mut(peer) .is_some_and(|r| r.remove_address(address)) @@ -110,7 +110,7 @@ impl MemoryStore { } /// Insert the data without notifying the swarm. Old data will be dropped if it exists. - pub fn insert_custom_data_silent(&mut self, peer: &PeerId, custom_data: T) { + fn insert_custom_data_silent(&mut self, peer: &PeerId, custom_data: T) { if let Some(r) = self.records.get_mut(peer) { return r.insert_custom_data(custom_data); } From 0b243cc7f9009abc1d0e55308522abb75316352a Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Fri, 14 Mar 2025 15:08:13 +0100 Subject: [PATCH 4/4] allow making addresses unremovable by events --- misc/peer-store/src/memory_store.rs | 61 +++++++++++++++++------------ 1 file changed, 35 insertions(+), 26 deletions(-) diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index d86a470de3e..393cce2ed4c 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -52,10 +52,12 @@ impl MemoryStore { } } - /// Update an address record and notify swarm when the address is new. - /// Returns `true` when the address is new. - pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { - let is_updated = self.update_address_silent(peer, address); + /// Update an address record and notify swarm when the address is new. + /// If `permanent` is true, and the address is not yet present, calls to `remove_address` will + /// only succeed if `force` is true. + /// Returns `true` when the address is new. + pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr, permanent: bool) -> bool { + let is_updated = self.update_address_silent(peer, address, permanent); if is_updated { self.push_event_and_wake(crate::store::Event::RecordUpdated(*peer)); } @@ -64,20 +66,22 @@ impl MemoryStore { /// Update an address record without notifying swarm. /// Returns `true` when the address is new. - fn update_address_silent(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + fn update_address_silent(&mut self, peer: &PeerId, address: &Multiaddr, permanent: bool) -> bool { if let Some(record) = self.records.get_mut(peer) { - return record.update_address(address); + return record.update_address(address, permanent); } let mut new_record = PeerRecord::new(self.config.record_capacity); - new_record.update_address(address); + new_record.update_address(address, permanent); self.records.insert(*peer, new_record); true } /// Remove an address record. + /// If `force` is false, addresses that have been marked `permanent` on insertion will not be + /// removed. /// Returns `true` when the address is removed. - pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { - let is_updated = self.remove_address_silent(peer, address); + pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr, force: bool) -> bool { + let is_updated = self.remove_address_silent(peer, address, force); if is_updated { self.push_event_and_wake(crate::store::Event::RecordUpdated(*peer)); } @@ -86,10 +90,10 @@ impl MemoryStore { /// Remove an address record without notifying swarm. /// Returns `true` when the address is removed. - fn remove_address_silent(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + fn remove_address_silent(&mut self, peer: &PeerId, address: &Multiaddr, force: bool) -> bool { self.records .get_mut(peer) - .is_some_and(|r| r.remove_address(address)) + .is_some_and(|r| r.remove_address(address, force)) } pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> { @@ -144,15 +148,15 @@ impl Store for MemoryStore { fn on_swarm_event(&mut self, swarm_event: &FromSwarm) { match swarm_event { FromSwarm::NewExternalAddrOfPeer(info) => { - self.update_address(&info.peer_id, info.addr); + self.update_address(&info.peer_id, info.addr, false); } FromSwarm::ConnectionEstablished(info) => { let mut is_record_updated = false; for failed_addr in info.failed_addresses { - is_record_updated |= self.remove_address_silent(&info.peer_id, failed_addr); + is_record_updated |= self.remove_address_silent(&info.peer_id, failed_addr, false); } is_record_updated |= - self.update_address_silent(&info.peer_id, info.endpoint.get_remote_address()); + self.update_address_silent(&info.peer_id, info.endpoint.get_remote_address(), false); if is_record_updated { self.push_event_and_wake(crate::store::Event::RecordUpdated(info.peer_id)); } @@ -172,14 +176,14 @@ impl Store for MemoryStore { } DialError::WrongPeerId { obtained, address } => { // The stored peer id is incorrect, remove incorrect and add correct one. - self.remove_address(&peer, address); - self.update_address(obtained, address); + self.remove_address(&peer, address, false); + self.update_address(obtained, address, false); } DialError::Transport(errors) => { // Remove all attempted addresses. let mut is_record_updated = false; for (addr, _) in errors { - is_record_updated |= self.remove_address_silent(&peer, addr); + is_record_updated |= self.remove_address_silent(&peer, addr, false); } if is_record_updated { self.push_event_and_wake(crate::store::Event::RecordUpdated(peer)); @@ -241,7 +245,8 @@ impl Config { pub struct PeerRecord { /// A LRU(Least Recently Used) cache for addresses. /// Will delete the least-recently-used record when full. - addresses: LruCache, + /// If the associated `bool` is true, the address can only be force-removed + addresses: LruCache, /// Custom data attached to the peer. custom_data: Option, } @@ -262,17 +267,20 @@ impl PeerRecord { /// Update the address in the LRU cache, promote it to the front if it exists, /// insert it to the front if not. /// Returns true when the address is new. - pub fn update_address(&mut self, address: &Multiaddr) -> bool { + pub fn update_address(&mut self, address: &Multiaddr, permanent: bool) -> bool { if self.addresses.get(address).is_some() { return false; } - self.addresses.get_or_insert(address.clone(), || ()); + self.addresses.get_or_insert(address.clone(), || permanent); true } /// Remove the address in the LRU cache regardless of its position. /// Returns true when the address is removed, false when not exist. - pub fn remove_address(&mut self, address: &Multiaddr) -> bool { + pub fn remove_address(&mut self, address: &Multiaddr, force: bool) -> bool { + if !force && self.addresses.peek(address) == Some(&true) { + return false; + } self.addresses.pop(address).is_some() } @@ -308,9 +316,9 @@ mod test { let addr1 = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed"); let addr2 = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed"); let addr3 = Multiaddr::from_str("/ip4/127.0.0.3").expect("parsing to succeed"); - store.update_address(&peer, &addr1); - store.update_address(&peer, &addr2); - store.update_address(&peer, &addr3); + store.update_address(&peer, &addr1, false); + store.update_address(&peer, &addr2, false); + store.update_address(&peer, &addr3, false); assert!( store .records @@ -320,7 +328,7 @@ mod test { .collect::>() == vec![&addr3, &addr2, &addr1] ); - store.update_address(&peer, &addr1); + store.update_address(&peer, &addr1, false); assert!( store .records @@ -330,7 +338,7 @@ mod test { .collect::>() == vec![&addr1, &addr3, &addr2] ); - store.update_address(&peer, &addr3); + store.update_address(&peer, &addr3, false); assert!( store .records @@ -351,6 +359,7 @@ mod test { store.update_address( &peer, &Multiaddr::from_str(&addr_string).expect("parsing to succeed"), + false, ); } let first_record = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");