diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index cc6a4d29558..393cce2ed4c 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -15,7 +15,7 @@ use std::{ }; use libp2p_core::{Multiaddr, PeerId}; -use libp2p_swarm::FromSwarm; +use libp2p_swarm::{DialError, FromSwarm}; use lru::LruCache; use super::Store; @@ -52,38 +52,48 @@ impl MemoryStore { } } - /// Update an address record and notify swarm when the address is new. - /// Returns `true` when the address is new. - pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { - let is_updated = self.update_address_silent(peer, address); + /// Update an address record and notify swarm when the address is new. + /// If `permanent` is true, and the address is not yet present, calls to `remove_address` will + /// only succeed if `force` is true. + /// Returns `true` when the address is new. + pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr, permanent: bool) -> bool { + let is_updated = self.update_address_silent(peer, address, permanent); if is_updated { - self.pending_events - .push_back(crate::store::Event::RecordUpdated(*peer)); - if let Some(waker) = self.waker.take() { - waker.wake(); - } + self.push_event_and_wake(crate::store::Event::RecordUpdated(*peer)); } is_updated } /// Update an address record without notifying swarm. /// Returns `true` when the address is new. - pub fn update_address_silent(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + fn update_address_silent(&mut self, peer: &PeerId, address: &Multiaddr, permanent: bool) -> bool { if let Some(record) = self.records.get_mut(peer) { - return record.update_address(address); + return record.update_address(address, permanent); } let mut new_record = PeerRecord::new(self.config.record_capacity); - new_record.update_address(address); + new_record.update_address(address, permanent); self.records.insert(*peer, new_record); true } /// Remove an address record. + /// If `force` is false, addresses that have been marked `permanent` on insertion will not be + /// removed. + /// Returns `true` when the address is removed. + pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr, force: bool) -> bool { + let is_updated = self.remove_address_silent(peer, address, force); + if is_updated { + self.push_event_and_wake(crate::store::Event::RecordUpdated(*peer)); + } + is_updated + } + + /// Remove an address record without notifying swarm. /// Returns `true` when the address is removed. - pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + fn remove_address_silent(&mut self, peer: &PeerId, address: &Multiaddr, force: bool) -> bool { self.records .get_mut(peer) - .is_some_and(|r| r.remove_address(address)) + .is_some_and(|r| r.remove_address(address, force)) } pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> { @@ -100,15 +110,11 @@ impl MemoryStore { /// Insert the data and notify the swarm about the update, dropping the old data if it exists. pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) { self.insert_custom_data_silent(peer, custom_data); - self.pending_events - .push_back(crate::store::Event::Store(Event::CustomDataUpdated(*peer))); - if let Some(waker) = self.waker.take() { - waker.wake(); - } + self.push_event_and_wake(crate::store::Event::Store(Event::CustomDataUpdated(*peer))); } /// Insert the data without notifying the swarm. Old data will be dropped if it exists. - pub fn insert_custom_data_silent(&mut self, peer: &PeerId, custom_data: T) { + fn insert_custom_data_silent(&mut self, peer: &PeerId, custom_data: T) { if let Some(r) = self.records.get_mut(peer) { return r.insert_custom_data(custom_data); } @@ -127,6 +133,13 @@ impl MemoryStore { pub fn record_iter_mut(&mut self) -> impl Iterator)> { self.records.iter_mut() } + + fn push_event_and_wake(&mut self, event: crate::store::Event) { + self.pending_events.push_back(event); + if let Some(waker) = self.waker.take() { + waker.wake(); // wake up because of update + } + } } impl Store for MemoryStore { @@ -135,21 +148,48 @@ impl Store for MemoryStore { fn on_swarm_event(&mut self, swarm_event: &FromSwarm) { match swarm_event { FromSwarm::NewExternalAddrOfPeer(info) => { - self.update_address(&info.peer_id, info.addr); + self.update_address(&info.peer_id, info.addr, false); } FromSwarm::ConnectionEstablished(info) => { let mut is_record_updated = false; for failed_addr in info.failed_addresses { - is_record_updated |= self.remove_address(&info.peer_id, failed_addr); + is_record_updated |= self.remove_address_silent(&info.peer_id, failed_addr, false); } is_record_updated |= - self.update_address_silent(&info.peer_id, info.endpoint.get_remote_address()); + self.update_address_silent(&info.peer_id, info.endpoint.get_remote_address(), false); if is_record_updated { - self.pending_events - .push_back(crate::store::Event::RecordUpdated(info.peer_id)); - if let Some(waker) = self.waker.take() { - waker.wake(); // wake up because of update + self.push_event_and_wake(crate::store::Event::RecordUpdated(info.peer_id)); + } + } + FromSwarm::DialFailure(info) => { + let Some(peer) = info.peer_id else { + // We don't know which peer we are talking about here. + return; + }; + + match info.error { + DialError::LocalPeerId { .. } => { + // The stored peer is the local peer. Remove peer fully. + if self.records.remove(&peer).is_some() { + self.push_event_and_wake(crate::store::Event::RecordUpdated(peer)); + } + } + DialError::WrongPeerId { obtained, address } => { + // The stored peer id is incorrect, remove incorrect and add correct one. + self.remove_address(&peer, address, false); + self.update_address(obtained, address, false); + } + DialError::Transport(errors) => { + // Remove all attempted addresses. + let mut is_record_updated = false; + for (addr, _) in errors { + is_record_updated |= self.remove_address_silent(&peer, addr, false); + } + if is_record_updated { + self.push_event_and_wake(crate::store::Event::RecordUpdated(peer)); + } } + _ => {} } } _ => {} @@ -205,7 +245,8 @@ impl Config { pub struct PeerRecord { /// A LRU(Least Recently Used) cache for addresses. /// Will delete the least-recently-used record when full. - addresses: LruCache, + /// If the associated `bool` is true, the address can only be force-removed + addresses: LruCache, /// Custom data attached to the peer. custom_data: Option, } @@ -226,17 +267,20 @@ impl PeerRecord { /// Update the address in the LRU cache, promote it to the front if it exists, /// insert it to the front if not. /// Returns true when the address is new. - pub fn update_address(&mut self, address: &Multiaddr) -> bool { + pub fn update_address(&mut self, address: &Multiaddr, permanent: bool) -> bool { if self.addresses.get(address).is_some() { return false; } - self.addresses.get_or_insert(address.clone(), || ()); + self.addresses.get_or_insert(address.clone(), || permanent); true } /// Remove the address in the LRU cache regardless of its position. /// Returns true when the address is removed, false when not exist. - pub fn remove_address(&mut self, address: &Multiaddr) -> bool { + pub fn remove_address(&mut self, address: &Multiaddr, force: bool) -> bool { + if !force && self.addresses.peek(address) == Some(&true) { + return false; + } self.addresses.pop(address).is_some() } @@ -272,9 +316,9 @@ mod test { let addr1 = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed"); let addr2 = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed"); let addr3 = Multiaddr::from_str("/ip4/127.0.0.3").expect("parsing to succeed"); - store.update_address(&peer, &addr1); - store.update_address(&peer, &addr2); - store.update_address(&peer, &addr3); + store.update_address(&peer, &addr1, false); + store.update_address(&peer, &addr2, false); + store.update_address(&peer, &addr3, false); assert!( store .records @@ -284,7 +328,7 @@ mod test { .collect::>() == vec![&addr3, &addr2, &addr1] ); - store.update_address(&peer, &addr1); + store.update_address(&peer, &addr1, false); assert!( store .records @@ -294,7 +338,7 @@ mod test { .collect::>() == vec![&addr1, &addr3, &addr2] ); - store.update_address(&peer, &addr3); + store.update_address(&peer, &addr3, false); assert!( store .records @@ -315,6 +359,7 @@ mod test { store.update_address( &peer, &Multiaddr::from_str(&addr_string).expect("parsing to succeed"), + false, ); } let first_record = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");