Skip to content

Commit

Permalink
refactor(blend): establish new connections, or deny new streams
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjoon-lee committed Jan 23, 2025
1 parent 21cf051 commit 6e56328
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 69 deletions.
4 changes: 2 additions & 2 deletions nomos-blend/core/src/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ where
&self,
rng: &mut R,
amount: usize,
exclude_addrs: &HashSet<Multiaddr>,
exclude_peers: &HashSet<NodeId>,
) -> Vec<&Node<NodeId, M::PublicKey>> {
self.remote_nodes
.iter()
.filter(|node| !exclude_addrs.contains(&node.address))
.filter(|node| !exclude_peers.contains(&node.id))
.choose_multiple(rng, amount)
}

Expand Down
152 changes: 110 additions & 42 deletions nomos-blend/network/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
error::Error,
handler::{BlendConnectionHandler, FromBehaviour, ToBehaviour},
peer_sets::PeerSets,
};
use cached::{Cached, TimedCache};
use libp2p::{
Expand Down Expand Up @@ -30,7 +31,7 @@ where
IntervalProvider: IntervalStreamProvider,
{
config: Config,
peers: HashSet<PeerId>,
peer_sets: PeerSets,
/// Queue of events to yield to the swarm.
events: VecDeque<ToSwarm<Event, FromBehaviour>>,
/// Waker that handles polling
Expand All @@ -45,13 +46,20 @@ where
#[derive(Debug)]
pub struct Config {
pub duplicate_cache_lifespan: u64,
pub peering_degree: usize,
pub max_peering_degree: usize,
pub conn_monitor_settings: Option<ConnectionMonitorSettings>,
}

#[derive(Debug)]
pub enum Event {
/// A message received from one of the peers.
Message(Vec<u8>),
/// New connections need to be established.
NewConnectionsRequested {
amount: usize,
exclude_peers: HashSet<PeerId>,
},
Error(Error),
}

Expand All @@ -65,7 +73,7 @@ where
let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan);
Self {
config,
peers: HashSet::new(),
peer_sets: PeerSets::new(),
events: VecDeque::new(),
waker: None,
duplicate_cache,
Expand Down Expand Up @@ -103,30 +111,29 @@ where
message: Vec<u8>,
excluded_peer: Option<PeerId>,
) -> Result<(), Error> {
let mut peer_ids = self.peers.clone();
if let Some(peer) = &excluded_peer {
peer_ids.remove(peer);
}

if peer_ids.is_empty() {
return Err(Error::NoPeers);
}

peer_ids.into_iter().for_each(|peer_id| {
tracing::debug!("Registering event for peer {:?} to send msg", peer_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: FromBehaviour::Message(message.clone()),
let mut num_peers = 0;
self.peer_sets
.negotiated_peers()
.filter(|peer_id| match excluded_peer {
Some(excluded_peer) => **peer_id != excluded_peer,
None => true,
})
.for_each(|peer_id| {
tracing::debug!("Registering event for peer {:?} to send msg", peer_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::Any,
event: FromBehaviour::Message(message.clone()),
});
num_peers += 1;
});
});

self.try_wake();
Ok(())
}

pub fn peers(&self) -> &HashSet<PeerId> {
&self.peers
if num_peers == 0 {
Err(Error::NoPeers)
} else {
self.try_wake();
Ok(())
}
}

fn message_id(message: &[u8]) -> Vec<u8> {
Expand All @@ -135,6 +142,48 @@ where
hasher.finalize().to_vec()
}

/// Returns the number of new connections needed to reach `peering_degree`,
/// which is the minimum number of 'healthy' connections that the node should maintain.
/// Also, the total number of connections (healthy + unhealthy) should not exceed `max_peering_degree`.
fn num_new_conns_needed(&self) -> usize {
if self.max_peering_degree_reached() {
return 0;
}
self.config
.peering_degree
.saturating_sub(self.peer_sets.num_negotiated_healthy_peers())
}

fn max_peering_degree_reached(&self) -> bool {
self.peer_sets.num_negotiated_peers() >= self.config.max_peering_degree
}

/// Schedules a `NewConnectionsRequested` event if needed.
/// The event contains a list of peers to be excluded, which are connected or malicious.
fn check_and_request_new_connections(&mut self) {
let new_conns_needed = self.num_new_conns_needed();
if new_conns_needed == 0 {
return;
}
self.events
.push_back(ToSwarm::GenerateEvent(Event::NewConnectionsRequested {
amount: new_conns_needed,
exclude_peers: self
.peer_sets
.negotiated_or_malicious_peers()
.copied()
.collect(),
}));
}

/// Checks if an established stream (of the connection) should be denied.
/// This returns `true` in one of the following cases:
/// - The maximum peering degree is reached.
/// - The peer is malicious.
fn should_deny_stream(&self, peer_id: &PeerId) -> bool {
self.max_peering_degree_reached() || self.peer_sets.is_malicious(peer_id)
}

fn create_connection_handler(&self) -> BlendConnectionHandler<M> {
let monitor = self.config.conn_monitor_settings.as_ref().map(|settings| {
ConnectionMonitor::new(
Expand Down Expand Up @@ -184,10 +233,21 @@ where
/// Informs the behaviour about an event from the [`Swarm`].
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished { .. }) => {
// TODO: Notify the connection handler to deny the stream if necessary
// - if max peering degree was reached.
// - if the peer has been detected as malicious.
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
..
}) => {
if self.should_deny_stream(&peer_id) {
// Notify the corresponding [`ConnectionHandler`] to close the stream.
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: FromBehaviour::CloseSubstreams,
});
// We don't update [`self.peer_sets`]` now.
// It will be updated when [`FromSwarm::ConnectionClosed`] is received.
}
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
Expand All @@ -201,11 +261,14 @@ where
// In both cases, we need to remove the peer from the list of connected peers,
// though it may be already removed from list by handling other events.
if remaining_established == 0 {
self.peers.remove(&peer_id);
self.peer_sets.set_disconnected(&peer_id);
self.check_and_request_new_connections();
}
}
_ => {}
}
};

self.try_wake();
}

/// Handles an event generated by the [`BlendConnectionHandler`]
Expand Down Expand Up @@ -245,34 +308,39 @@ where
self.events
.push_back(ToSwarm::GenerateEvent(Event::Message(message)));
}
// The connection was fully negotiated by the peer,
// The inbound connection was fully negotiated by the peer,
// which means that the peer supports the blend protocol.
ToBehaviour::FullyNegotiatedInbound => {
self.peer_sets.add_healthy(peer_id);
}
// The outbound connection was fully negotiated by the peer,
// which means that the peer supports the blend protocol.
ToBehaviour::FullyNegotiatedOutbound => {
self.peers.insert(peer_id);
self.peer_sets.add_healthy(peer_id);
}
ToBehaviour::NegotiationFailed => {
self.peers.remove(&peer_id);
self.peer_sets.set_disconnected(&peer_id);
self.check_and_request_new_connections();
}
ToBehaviour::MaliciousPeer => {
// TODO: Remove the peer from the connected peer list
// and add it to the malicious peer list,
// so that the peer is excluded from the future connection establishments.
// Also, notify the upper layer to try to dial new peers
// if we need more healthy peers.
tracing::debug!("Peer {:?} has been detected as malicious", peer_id);
self.peer_sets.set_malicious(peer_id);
self.check_and_request_new_connections();
}
ToBehaviour::UnhealthyPeer => {
// TODO: Remove the peer from the connected 'healthy' peer list.
// Also, notify the upper layer to try to dial new peers
// if we need more healthy peers.
tracing::debug!("Peer {:?} has been detected as unhealthy", peer_id);
self.peer_sets.set_unhealthy(peer_id);
self.check_and_request_new_connections();
}
ToBehaviour::IOError(error) => {
self.peers.remove(&peer_id);
self.peer_sets.set_disconnected(&peer_id);
self.events
.push_back(ToSwarm::GenerateEvent(Event::Error(Error::PeerIOError {
error,
peer_id,
connection_id,
})));
self.check_and_request_new_connections();
}
}

Expand Down
41 changes: 30 additions & 11 deletions nomos-blend/network/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,20 @@ impl<Msg> BlendConnectionHandler<Msg> {
}
}

/// Mark the inbound/outbound substream state as Dropped.
/// Then the substream hold by the state will be dropped from memory.
/// As a result, Swarm will decrease the ref count to the connection,
/// and close the connection when the count is 0.
///
/// Also, this clears all pending messages and events
/// to avoid confusions for event recipients.
fn close_substreams(&mut self) {
self.inbound_substream = Some(InboundSubstreamState::Dropped);
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
self.outbound_msgs.clear();
self.pending_events_to_behaviour.clear();
}

fn try_wake(&mut self) {
if let Some(waker) = self.waker.take() {
waker.wake();
Expand All @@ -82,13 +96,20 @@ impl<Msg> BlendConnectionHandler<Msg> {
pub enum FromBehaviour {
/// A message to be sent to the connection.
Message(Vec<u8>),
/// Close inbound/outbound substreams.
/// This happens when [`crate::Behaviour`] determines that one of the followings is true.
/// - Max peering degree is reached.
/// - The peer has been detected as malicious.
CloseSubstreams,
}

#[derive(Debug)]
pub enum ToBehaviour {
/// An inbound substream has been successfully upgraded for the blend protocol.
FullyNegotiatedInbound,
/// An outbound substream has been successfully upgraded for the blend protocol.
FullyNegotiatedOutbound,
/// An outbound substream was failed to be upgraded for the blend protocol.
/// A substream was failed to be upgraded for the blend protocol.
NegotiationFailed,
/// A message has been received from the connection.
Message(Vec<u8>),
Expand Down Expand Up @@ -134,12 +155,7 @@ where
if let Poll::Ready(output) = monitor.poll(cx) {
match output {
ConnectionMonitorOutput::Malicious => {
// Mark the inbound/outbound substream state as Dropped.
// Then the substream hold by the state will be dropped from memory.
// As a result, Swarm will decrease the ref count to the connection,
// and close the connection when the count is 0.
self.inbound_substream = Some(InboundSubstreamState::Dropped);
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
self.close_substreams();
self.pending_events_to_behaviour
.push_back(ToBehaviour::MaliciousPeer);
}
Expand Down Expand Up @@ -190,8 +206,7 @@ where
tracing::error!(
"Failed to receive message from inbound stream: {e:?}. Dropping both inbound/outbound substreams"
);
self.inbound_substream = Some(InboundSubstreamState::Dropped);
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
self.close_substreams();
}
Poll::Pending => {
tracing::debug!("No message received from inbound stream yet. Waiting more...");
Expand Down Expand Up @@ -241,8 +256,7 @@ where
}
Poll::Ready(Err(e)) => {
tracing::error!("Failed to send message to outbound stream: {e:?}. Dropping both inbound and outbound substreams");
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
self.inbound_substream = Some(InboundSubstreamState::Dropped);
self.close_substreams();
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
ToBehaviour::IOError(e),
));
Expand Down Expand Up @@ -276,6 +290,9 @@ where
FromBehaviour::Message(msg) => {
self.outbound_msgs.push_back(msg);
}
FromBehaviour::CloseSubstreams => {
self.close_substreams();
}
}
}

Expand All @@ -296,6 +313,8 @@ where
tracing::debug!("FullyNegotiatedInbound: Creating inbound substream");
self.inbound_substream =
Some(InboundSubstreamState::PendingRecv(recv_msg(stream).boxed()));
self.pending_events_to_behaviour
.push_back(ToBehaviour::FullyNegotiatedInbound);
VALUE_FULLY_NEGOTIATED_INBOUND
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
Expand Down
Loading

0 comments on commit 6e56328

Please sign in to comment.