Skip to content

Commit

Permalink
do not deny conns, instead deny streams
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjoon-lee committed Jan 22, 2025
1 parent e801d22 commit d9e2d21
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 75 deletions.
108 changes: 43 additions & 65 deletions nomos-blend/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use cached::{Cached, TimedCache};
use libp2p::{
core::Endpoint,
swarm::{
ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour,
NotifyHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
behaviour::ConnectionEstablished, ConnectionClosed, ConnectionDenied, ConnectionId,
FromSwarm, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent, THandlerOutEvent,
ToSwarm,
},
Multiaddr, PeerId,
};
Expand Down Expand Up @@ -175,22 +176,12 @@ where
}));
}

/// Checks if a new pending or established connection should be denied.
/// This returns `ConnectionDenied` with a specific reason in one of the following cases:
/// Checks if an established stream (of the connection) should be denied.
/// This returns `true` in one of the following cases:
/// - The maximum peering degree is reached.
/// - The peer is malicious.
fn should_deny_connection(&self, peer_id: Option<PeerId>) -> Option<ConnectionDenied> {
if self.max_peering_degree_reached() {
return Some(ConnectionDenied::new("Max peering degree reached"));
}

if let Some(peer_id) = peer_id {
if self.peer_sets.is_malicious(&peer_id) {
return Some(ConnectionDenied::new("Malicious peer"));
}
}

None
fn should_deny_stream(&self, peer_id: &PeerId) -> bool {
self.max_peering_degree_reached() || self.peer_sets.is_malicious(peer_id)
}

fn create_connection_handler(
Expand Down Expand Up @@ -222,76 +213,63 @@ where
BlendConnectionHandler<M, <IntervalProvider as IntervalStreamProvider>::Stream>;
type ToSwarm = Event;

fn handle_pending_inbound_connection(
&mut self,
_: ConnectionId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<(), ConnectionDenied> {
if let Some(deny) = self.should_deny_connection(None) {
return Err(deny);
}
Ok(())
}

fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
peer_id: PeerId,
_: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
if let Some(deny) = self.should_deny_connection(Some(peer_id)) {
return Err(deny);
}
Ok(self.create_connection_handler())
}

fn handle_pending_outbound_connection(
&mut self,
_connection_id: ConnectionId,
_maybe_peer: Option<PeerId>,
_addresses: &[Multiaddr],
_effective_role: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
if let Some(deny) = self.should_deny_connection(None) {
return Err(deny);
}
Ok(vec![])
}

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
peer_id: PeerId,
_: PeerId,
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
if let Some(deny) = self.should_deny_connection(Some(peer_id)) {
return Err(deny);
}
Ok(self.create_connection_handler())
}

/// Informs the behaviour about an event from the [`Swarm`].
fn on_swarm_event(&mut self, event: FromSwarm) {
if let FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
remaining_established,
..
}) = event
{
// This event happens in one of the following cases:
// 1. The connection was closed by the peer.
// 2. The connection was closed by the local node since no stream is active.
//
// In both cases, we need to remove the peer from the list of connected peers,
// though it may be already removed from list by handling other events.
if remaining_established == 0 {
self.peer_sets.set_disconnected(&peer_id);
self.check_and_request_new_connections();
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
..
}) => {
if self.should_deny_stream(&peer_id) {
// Notify the corresponding [`ConnectionHandler`] to close the stream.
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: FromBehaviour::CloseSubstreams,
});
// We don't update [`self.peer_sets`]` now.
// It will be updated when [`FromSwarm::ConnectionClosed`] is received.
}
}
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
remaining_established,
..
}) => {
// This event happens in one of the following cases:
// 1. The connection was closed by the peer.
// 2. The connection was closed by the local node since no stream is active.
//
// In both cases, we need to remove the peer from the list of connected peers,
// though it may be already removed from list by handling other events.
if remaining_established == 0 {
self.peer_sets.set_disconnected(&peer_id);
self.check_and_request_new_connections();
}
}
_ => {}
};

self.try_wake();
}
Expand Down
35 changes: 25 additions & 10 deletions nomos-blend/network/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ impl<Msg, Interval> BlendConnectionHandler<Msg, Interval> {
}
}

/// Mark the inbound/outbound substream state as Dropped.
/// Then the substream hold by the state will be dropped from memory.
/// As a result, Swarm will decrease the ref count to the connection,
/// and close the connection when the count is 0.
///
/// Also, this clears all pending messages and events
/// to avoid confusions for event recipients.
fn close_substreams(&mut self) {
self.inbound_substream = Some(InboundSubstreamState::Dropped);
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
self.outbound_msgs.clear();
self.pending_events_to_behaviour.clear();
}

fn try_wake(&mut self) {
if let Some(waker) = self.waker.take() {
waker.wake();
Expand All @@ -80,6 +94,11 @@ impl<Msg, Interval> BlendConnectionHandler<Msg, Interval> {
pub enum FromBehaviour {
/// A message to be sent to the connection.
Message(Vec<u8>),
/// Close inbound/outbound substreams.
/// This happens when [`crate::Behaviour`] determines that one of the followings is true.
/// - Max peering degree is reached.
/// - The peer has been detected as malicious.
CloseSubstreams,
}

#[derive(Debug)]
Expand Down Expand Up @@ -135,12 +154,7 @@ where
if let Poll::Ready(output) = monitor.poll(cx) {
match output {
ConnectionMonitorOutput::Malicious => {
// Mark the inbound/outbound substream state as Dropped.
// Then the substream hold by the state will be dropped from memory.
// As a result, Swarm will decrease the ref count to the connection,
// and close the connection when the count is 0.
self.inbound_substream = Some(InboundSubstreamState::Dropped);
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
self.close_substreams();
self.pending_events_to_behaviour
.push_back(ToBehaviour::MaliciousPeer);
}
Expand Down Expand Up @@ -191,8 +205,7 @@ where
tracing::error!(
"Failed to receive message from inbound stream: {e:?}. Dropping both inbound/outbound substreams"
);
self.inbound_substream = Some(InboundSubstreamState::Dropped);
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
self.close_substreams();
}
Poll::Pending => {
tracing::debug!("No message received from inbound stream yet. Waiting more...");
Expand Down Expand Up @@ -242,8 +255,7 @@ where
}
Poll::Ready(Err(e)) => {
tracing::error!("Failed to send message to outbound stream: {e:?}. Dropping both inbound and outbound substreams");
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
self.inbound_substream = Some(InboundSubstreamState::Dropped);
self.close_substreams();
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
ToBehaviour::IOError(e),
));
Expand Down Expand Up @@ -277,6 +289,9 @@ where
FromBehaviour::Message(msg) => {
self.outbound_msgs.push_back(msg);
}
FromBehaviour::CloseSubstreams => {
self.close_substreams();
}
}
}

Expand Down

0 comments on commit d9e2d21

Please sign in to comment.