refactor(blend): establish new connections, or deny new streams #989

Open · wants to merge 3 commits into base: blend-drop-conns

Changes from 1 commit
4 changes: 2 additions & 2 deletions nomos-blend/core/src/membership.rs
@@ -64,11 +64,11 @@ where
&self,
rng: &mut R,
amount: usize,
exclude_addrs: &HashSet<Multiaddr>,
exclude_peers: &HashSet<NodeId>,
) -> Vec<&Node<NodeId, M::PublicKey>> {
self.remote_nodes
.iter()
.filter(|node| !exclude_addrs.contains(&node.address))
.filter(|node| !exclude_peers.contains(&node.id))
.choose_multiple(rng, amount)
}

152 changes: 110 additions & 42 deletions nomos-blend/network/src/behaviour.rs
@@ -1,6 +1,7 @@
use crate::{
error::Error,
handler::{BlendConnectionHandler, FromBehaviour, ToBehaviour},
peer_sets::PeerSets,
};
use cached::{Cached, TimedCache};
use libp2p::{
@@ -30,7 +31,7 @@ where
IntervalProvider: IntervalStreamProvider,
{
config: Config,
peers: HashSet<PeerId>,
peer_sets: PeerSets,
/// Queue of events to yield to the swarm.
events: VecDeque<ToSwarm<Event, FromBehaviour>>,
/// Waker that handles polling
@@ -45,13 +46,20 @@
#[derive(Debug)]
pub struct Config {
pub duplicate_cache_lifespan: u64,
pub peering_degree: usize,
pub max_peering_degree: usize,
pub conn_monitor_settings: Option<ConnectionMonitorSettings>,
}

#[derive(Debug)]
pub enum Event {
/// A message received from one of the peers.
Message(Vec<u8>),
/// New connections need to be established.
NewConnectionsRequested {
amount: usize,
exclude_peers: HashSet<PeerId>,
},
Error(Error),
}

@@ -65,7 +73,7 @@
let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan);
Self {
config,
peers: HashSet::new(),
peer_sets: PeerSets::new(),
events: VecDeque::new(),
waker: None,
duplicate_cache,
@@ -103,30 +111,29 @@
message: Vec<u8>,
excluded_peer: Option<PeerId>,
) -> Result<(), Error> {
let mut peer_ids = self.peers.clone();
if let Some(peer) = &excluded_peer {
peer_ids.remove(peer);
}

if peer_ids.is_empty() {
return Err(Error::NoPeers);
}

peer_ids.into_iter().for_each(|peer_id| {
tracing::debug!("Registering event for peer {:?} to send msg", peer_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: FromBehaviour::Message(message.clone()),
let mut num_peers = 0;
self.peer_sets
.negotiated_peers()
.filter(|peer_id| match excluded_peer {
Some(excluded_peer) => **peer_id != excluded_peer,
None => true,
})
.for_each(|peer_id| {
tracing::debug!("Registering event for peer {:?} to send msg", peer_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::Any,
event: FromBehaviour::Message(message.clone()),
});
num_peers += 1;
});
});

self.try_wake();
Ok(())
}

pub fn peers(&self) -> &HashSet<PeerId> {
&self.peers
if num_peers == 0 {
Err(Error::NoPeers)
} else {
self.try_wake();
Ok(())
}
}

fn message_id(message: &[u8]) -> Vec<u8> {
@@ -135,6 +142,48 @@ where
hasher.finalize().to_vec()
}

/// Returns the number of new connections needed to reach `peering_degree`,
/// which is the minimum number of 'healthy' connections that the node should maintain.
/// Also, the total number of connections (healthy + unhealthy) should not exceed `max_peering_degree`.
fn num_new_conns_needed(&self) -> usize {
if self.max_peering_degree_reached() {
return 0;
}
self.config
.peering_degree
.saturating_sub(self.peer_sets.num_negotiated_healthy_peers())
}

fn max_peering_degree_reached(&self) -> bool {
self.peer_sets.num_negotiated_peers() >= self.config.max_peering_degree
}

/// Schedules a `NewConnectionsRequested` event if needed.
/// The event contains the set of peers to exclude: those already connected or known to be malicious.
fn check_and_request_new_connections(&mut self) {
let new_conns_needed = self.num_new_conns_needed();
if new_conns_needed == 0 {
return;
}
self.events
.push_back(ToSwarm::GenerateEvent(Event::NewConnectionsRequested {
amount: new_conns_needed,
exclude_peers: self
.peer_sets
.negotiated_or_malicious_peers()
.copied()
.collect(),
}));
}
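
For context, the upper layer driving the Swarm is expected to answer this event by dialing fresh peers. Below is a minimal sketch of that handling inside the application's swarm event loop. It assumes the membership is instantiated with NodeId = PeerId; `choose_remote_nodes` stands in for the membership method changed in the membership.rs hunk above, whose real name lies outside the visible hunk:

// Hypothetical event-loop fragment (not part of this diff).
SwarmEvent::Behaviour(Event::NewConnectionsRequested { amount, exclude_peers }) => {
    // Pick random non-excluded members and dial them by address.
    for node in membership.choose_remote_nodes(&mut rng, amount, &exclude_peers) {
        if let Err(e) = swarm.dial(node.address.clone()) {
            tracing::warn!("Failed to dial {:?}: {e:?}", node.address);
        }
    }
}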

/// Checks if an established stream (of the connection) should be denied.
/// This returns `true` in one of the following cases:
/// - The maximum peering degree is reached.
/// - The peer is malicious.
fn should_deny_stream(&self, peer_id: &PeerId) -> bool {
self.max_peering_degree_reached() || self.peer_sets.is_malicious(peer_id)
}
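
The `peer_sets` module imported above is not among the files shown in this view. For reference, here is a minimal sketch of a `PeerSets` that is consistent with every call site in this file; the internal layout is an assumption:

use std::collections::HashSet;
use libp2p::PeerId;

/// Sketch: negotiated peers split by health, plus remembered malicious peers.
#[derive(Default)]
pub struct PeerSets {
    healthy: HashSet<PeerId>,
    unhealthy: HashSet<PeerId>,
    malicious: HashSet<PeerId>,
}

impl PeerSets {
    pub fn new() -> Self {
        Self::default()
    }

    /// All currently negotiated peers, healthy or not.
    pub fn negotiated_peers(&self) -> impl Iterator<Item = &PeerId> {
        self.healthy.iter().chain(self.unhealthy.iter())
    }

    pub fn num_negotiated_peers(&self) -> usize {
        self.healthy.len() + self.unhealthy.len()
    }

    pub fn num_negotiated_healthy_peers(&self) -> usize {
        self.healthy.len()
    }

    /// Peers that should be excluded when establishing new connections.
    pub fn negotiated_or_malicious_peers(&self) -> impl Iterator<Item = &PeerId> {
        self.healthy
            .iter()
            .chain(self.unhealthy.iter())
            .chain(self.malicious.iter())
    }

    pub fn is_malicious(&self, peer_id: &PeerId) -> bool {
        self.malicious.contains(peer_id)
    }

    pub fn add_healthy(&mut self, peer_id: PeerId) {
        self.unhealthy.remove(&peer_id);
        self.healthy.insert(peer_id);
    }

    pub fn set_unhealthy(&mut self, peer_id: PeerId) {
        self.healthy.remove(&peer_id);
        self.unhealthy.insert(peer_id);
    }

    pub fn set_malicious(&mut self, peer_id: PeerId) {
        self.healthy.remove(&peer_id);
        self.unhealthy.remove(&peer_id);
        self.malicious.insert(peer_id);
    }

    pub fn set_disconnected(&mut self, peer_id: &PeerId) {
        self.healthy.remove(peer_id);
        self.unhealthy.remove(peer_id);
    }
}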

fn create_connection_handler(&self) -> BlendConnectionHandler<M> {
let monitor = self.config.conn_monitor_settings.as_ref().map(|settings| {
ConnectionMonitor::new(
@@ -184,10 +233,21 @@ where
/// Informs the behaviour about an event from the [`Swarm`].
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished { .. }) => {
// TODO: Notify the connection handler to deny the stream if necessary
// - if max peering degree was reached.
// - if the peer has been detected as malicious.
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
..
}) => {
if self.should_deny_stream(&peer_id) {
// Notify the corresponding [`ConnectionHandler`] to close the stream.
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: FromBehaviour::CloseSubstreams,
});
Collaborator:

This goes before the calls to handle_established_outbound/inbound_connection, right? In any case, we should check whether a stream has already been opened by the time this event is processed, because we open streams for any in/outbound connection. Is that the case?

Contributor Author:

Since they don't have proper documentation, I investigated their code.

This ConnectionEstablished event is triggered AFTER handle_established_outbound/inbound_connection is called and the ConnectionHandler is returned from it.

It means that the FromBehaviour::CloseSubstreams that I'm scheduling here will definitely be delivered to the ConnectionHandler. If the ConnectionHandler has already established inbound/outbound substreams, they will be replaced with the Dropped state, as I implemented in #988. Even if they're not initialized yet, they'll be set as Dropped and will never be initialized.

Collaborator:

I still think in this case it is easier to just reject the connection in handle_established_outbound/inbound_connection from this behaviour anyway. Accepting a connection just to get it closed feels weird. Maybe I'm missing something here.

Contributor Author (youngjoon-lee, Jan 24, 2025):

Okay. That was my initial implementation when I opened this PR, but I changed it while discussing other PRs. Rejecting the connection itself is the way libp2p recommends, as far as I understand. However, as I've been saying, if we have to consider the case where multiple behaviours share a connection, things get complicated, and libp2p doesn't give us a clear approach. We need to decide.

Contributor Author (youngjoon-lee, Jan 24, 2025):

Let's use the way libp2p recommends. I will revert the handle_established_outbound/inbound_connection implementations to deny the connection.

Contributor Author (youngjoon-lee, Jan 25, 2025):

4680e94: I just updated this PR to use libp2p-allow-block-list and libp2p-connection-limits. It would be better to review this PR itself again, but it might also be easy to review the specific commit if you want.
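
For readers unfamiliar with those crates: both ship ready-made NetworkBehaviour implementations that deny connections at establishment time, which is the libp2p-recommended path discussed above. A rough sketch of how they could back the two denial rules in this PR; the function and parameter names are illustrative, not the PR's actual code:

use libp2p::{
    allow_block_list::{self, BlockedPeers},
    connection_limits::{self, ConnectionLimits},
    PeerId,
};

// Illustrative only: these behaviours would be composed with the blend
// behaviour via #[derive(NetworkBehaviour)] on a wrapper struct.
fn connection_guards(
    max_peering_degree: u32,
    known_malicious: &[PeerId],
) -> (
    connection_limits::Behaviour,
    allow_block_list::Behaviour<BlockedPeers>,
) {
    // Deny any connection that would exceed the maximum peering degree.
    let limits = connection_limits::Behaviour::new(
        ConnectionLimits::default().with_max_established(Some(max_peering_degree)),
    );
    // Deny connections to/from peers already known to be malicious;
    // newly detected peers can be added later with `block_peer`.
    let mut block_list = allow_block_list::Behaviour::default();
    for peer_id in known_malicious {
        block_list.block_peer(*peer_id);
    }
    (limits, block_list)
}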

// We don't update [`self.peer_sets`] now.
// It will be updated when [`FromSwarm::ConnectionClosed`] is received.
}
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
Expand All @@ -201,11 +261,14 @@ where
// In both cases, we need to remove the peer from the list of connected peers,
// though it may already have been removed from the list while handling other events.
if remaining_established == 0 {
self.peers.remove(&peer_id);
self.peer_sets.set_disconnected(&peer_id);
self.check_and_request_new_connections();
}
}
_ => {}
}
};

self.try_wake();
}

/// Handles an event generated by the [`BlendConnectionHandler`]
@@ -245,34 +308,39 @@ where
self.events
.push_back(ToSwarm::GenerateEvent(Event::Message(message)));
}
// The connection was fully negotiated by the peer,
// The inbound connection was fully negotiated by the peer,
// which means that the peer supports the blend protocol.
ToBehaviour::FullyNegotiatedInbound => {
self.peer_sets.add_healthy(peer_id);
}
// The outbound connection was fully negotiated by the peer,
// which means that the peer supports the blend protocol.
ToBehaviour::FullyNegotiatedOutbound => {
self.peers.insert(peer_id);
self.peer_sets.add_healthy(peer_id);
}
ToBehaviour::NegotiationFailed => {
self.peers.remove(&peer_id);
self.peer_sets.set_disconnected(&peer_id);
self.check_and_request_new_connections();
}
ToBehaviour::MaliciousPeer => {
// TODO: Remove the peer from the connected peer list
// and add it to the malicious peer list,
// so that the peer is excluded from the future connection establishments.
// Also, notify the upper layer to try to dial new peers
// if we need more healthy peers.
tracing::debug!("Peer {:?} has been detected as malicious", peer_id);
self.peer_sets.set_malicious(peer_id);
self.check_and_request_new_connections();
}
ToBehaviour::UnhealthyPeer => {
// TODO: Remove the peer from the connected 'healthy' peer list.
// Also, notify the upper layer to try to dial new peers
// if we need more healthy peers.
tracing::debug!("Peer {:?} has been detected as unhealthy", peer_id);
self.peer_sets.set_unhealthy(peer_id);
self.check_and_request_new_connections();
}
ToBehaviour::IOError(error) => {
self.peers.remove(&peer_id);
self.peer_sets.set_disconnected(&peer_id);
self.events
.push_back(ToSwarm::GenerateEvent(Event::Error(Error::PeerIOError {
error,
peer_id,
connection_id,
})));
self.check_and_request_new_connections();
}
}

41 changes: 30 additions & 11 deletions nomos-blend/network/src/handler.rs
@@ -71,6 +71,20 @@ impl<Msg> BlendConnectionHandler<Msg> {
}
}

/// Marks the inbound/outbound substream states as Dropped,
/// so the substreams held by those states are dropped from memory.
/// As a result, Swarm decreases its ref count to the connection,
/// and closes the connection once the count reaches 0.
///
/// Also, this clears all pending messages and events
/// to avoid confusion for event recipients.
fn close_substreams(&mut self) {
self.inbound_substream = Some(InboundSubstreamState::Dropped);
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
self.outbound_msgs.clear();
self.pending_events_to_behaviour.clear();
}

fn try_wake(&mut self) {
if let Some(waker) = self.waker.take() {
waker.wake();
Expand All @@ -82,13 +96,20 @@ impl<Msg> BlendConnectionHandler<Msg> {
pub enum FromBehaviour {
/// A message to be sent to the connection.
Message(Vec<u8>),
/// Close inbound/outbound substreams.
/// This happens when [`crate::Behaviour`] determines that one of the followings is true.
/// - Max peering degree is reached.
/// - The peer has been detected as malicious.
CloseSubstreams,
}

#[derive(Debug)]
pub enum ToBehaviour {
/// An inbound substream has been successfully upgraded for the blend protocol.
FullyNegotiatedInbound,
/// An outbound substream has been successfully upgraded for the blend protocol.
FullyNegotiatedOutbound,
/// An outbound substream was failed to be upgraded for the blend protocol.
/// A substream failed to be upgraded for the blend protocol.
NegotiationFailed,
/// A message has been received from the connection.
Message(Vec<u8>),
@@ -134,12 +155,7 @@
if let Poll::Ready(output) = monitor.poll(cx) {
match output {
ConnectionMonitorOutput::Malicious => {
// Mark the inbound/outbound substream state as Dropped.
// Then the substream hold by the state will be dropped from memory.
// As a result, Swarm will decrease the ref count to the connection,
// and close the connection when the count is 0.
self.inbound_substream = Some(InboundSubstreamState::Dropped);
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
self.close_substreams();
self.pending_events_to_behaviour
.push_back(ToBehaviour::MaliciousPeer);
}
@@ -190,8 +206,7 @@
tracing::error!(
"Failed to receive message from inbound stream: {e:?}. Dropping both inbound/outbound substreams"
);
self.inbound_substream = Some(InboundSubstreamState::Dropped);
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
self.close_substreams();
}
Poll::Pending => {
tracing::debug!("No message received from inbound stream yet. Waiting more...");
@@ -241,8 +256,7 @@
}
Poll::Ready(Err(e)) => {
tracing::error!("Failed to send message to outbound stream: {e:?}. Dropping both inbound and outbound substreams");
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
self.inbound_substream = Some(InboundSubstreamState::Dropped);
self.close_substreams();
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
ToBehaviour::IOError(e),
));
@@ -276,6 +290,9 @@
FromBehaviour::Message(msg) => {
self.outbound_msgs.push_back(msg);
}
FromBehaviour::CloseSubstreams => {
self.close_substreams();
}
}
}

@@ -296,6 +313,8 @@
tracing::debug!("FullyNegotiatedInbound: Creating inbound substream");
self.inbound_substream =
Some(InboundSubstreamState::PendingRecv(recv_msg(stream).boxed()));
self.pending_events_to_behaviour
.push_back(ToBehaviour::FullyNegotiatedInbound);
VALUE_FULLY_NEGOTIATED_INBOUND
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {