Skip to content

Commit

Permalink
refactor(blend): drop streams if malicious or on error
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjoon-lee committed Jan 22, 2025
1 parent c626126 commit 018e6cf
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 15 deletions.
12 changes: 11 additions & 1 deletion nomos-blend/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ where
Ok(())
}

/// Returns a reference to the set of peers currently tracked by this behaviour.
///
/// Entries are removed from this set when the last established connection to a
/// peer is closed, or when an IO error is reported for that peer, so the
/// contents may shrink between calls.
pub fn peers(&self) -> &HashSet<PeerId> {
&self.peers
}

fn message_id(message: &[u8]) -> Vec<u8> {
let mut hasher = Sha256::new();
hasher.update(message);
Expand Down Expand Up @@ -193,6 +197,12 @@ where
remaining_established,
..
}) => {
// This event happens in one of the following cases:
// 1. The connection was closed by the peer.
// 2. The connection was closed by the local node since no stream is active.
//
// In both cases, we need to remove the peer from the list of connected peers,
// though it may already have been removed from the list while handling other events.
if remaining_established == 0 {
self.peers.remove(&peer_id);
}
Expand Down Expand Up @@ -259,7 +269,7 @@ where
// if we need more healthy peers.
}
ToBehaviour::IOError(error) => {
// TODO: Consider removing the peer from the connected_peers and closing the connection
self.peers.remove(&peer_id);
self.events
.push_back(ToSwarm::GenerateEvent(Event::Error(Error::PeerIOError {
error,
Expand Down
66 changes: 52 additions & 14 deletions nomos-blend/network/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const VALUE_IGNORED: &str = "ignored";
const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/blend/0.1.0");

pub struct BlendConnectionHandler<Msg, Interval> {
inbound_substream: Option<MsgRecvFuture>,
inbound_substream: Option<InboundSubstreamState>,
outbound_substream: Option<OutboundSubstreamState>,
outbound_msgs: VecDeque<Vec<u8>>,
pending_events_to_behaviour: VecDeque<ToBehaviour>,
Expand All @@ -38,13 +38,22 @@ pub struct BlendConnectionHandler<Msg, Interval> {
type MsgSendFuture = BoxFuture<'static, Result<Stream, io::Error>>;
type MsgRecvFuture = BoxFuture<'static, Result<(Stream, Vec<u8>), io::Error>>;

/// State of the inbound substream held by [`BlendConnectionHandler`].
///
/// The handler stores this as `Option<InboundSubstreamState>`; `None` means that
/// no inbound substream has been initialized yet.
enum InboundSubstreamState {
/// A message is being received on the inbound substream.
/// The future resolves to the stream together with the received message bytes,
/// or to an IO error.
PendingRecv(MsgRecvFuture),
/// A substream has been dropped proactively.
/// This state is entered when the peer is detected as malicious or when
/// receiving/sending on one of the substreams fails. Once in this state,
/// the handler does nothing further with the inbound substream.
Dropped,
}

/// State of the outbound substream held by [`BlendConnectionHandler`].
///
/// The handler stores this as `Option<OutboundSubstreamState>`; when it is `None`,
/// the handler requests a new outbound substream.
enum OutboundSubstreamState {
/// A request to open a new outbound substream is being processed.
PendingOpenSubstream,
/// An outbound substream is open and ready to send messages.
Idle(Stream),
/// A message is being sent on the outbound substream.
/// The future resolves to the stream once the send completes, or to an IO error.
PendingSend(MsgSendFuture),
/// A substream has been dropped proactively.
/// This state is entered when the peer is detected as malicious or when
/// receiving/sending on one of the substreams fails. The handler keeps this
/// state and does not request a new outbound substream.
Dropped,
}

impl<Msg, Interval> BlendConnectionHandler<Msg, Interval> {
Expand Down Expand Up @@ -82,10 +91,12 @@ pub enum ToBehaviour {
/// A message has been received from the connection.
Message(Vec<u8>),
/// Notifying that the peer is detected as malicious.
/// The inbound/outbound streams to the peer are closed proactively.
MaliciousPeer,
/// Notifying that the peer is detected as unhealthy.
UnhealthyPeer,
/// An IO error from the connection
/// An IO error from the connection.
/// The inbound/outbound streams to the peer are closed proactively.
IOError(io::Error),
}

Expand Down Expand Up @@ -117,14 +128,17 @@ where
);

// Check if the monitor interval has elapsed, if exists.
// TODO: Refactor this to a separate function.
if let Some(monitor) = &mut self.monitor {
if let Poll::Ready(output) = monitor.poll(cx) {
match output {
ConnectionMonitorOutput::Malicious => {
// TODO: Mark the inbound/outbound substream state as Dropped.
// Mark the inbound/outbound substream state as Dropped.
// Then the substream held by the state will be dropped from memory.
// As a result, Swarm will decrease the ref count to the connection,
// and close the connection when the count is 0.
self.inbound_substream = Some(InboundSubstreamState::Dropped);
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
self.pending_events_to_behaviour
.push_back(ToBehaviour::MaliciousPeer);
}
Expand All @@ -143,9 +157,16 @@ where
}

// Process inbound stream
// TODO: Refactor this to a separate function.
tracing::debug!("Processing inbound stream");
if let Some(msg_recv_fut) = self.inbound_substream.as_mut() {
match msg_recv_fut.poll_unpin(cx) {
match self.inbound_substream.take() {
None => {
tracing::debug!("Inbound substream is not initialized yet. Doing nothing.");
self.inbound_substream = None;
}
Some(InboundSubstreamState::PendingRecv(mut msg_recv_fut)) => match msg_recv_fut
.poll_unpin(cx)
{
Poll::Ready(Ok((stream, msg))) => {
tracing::debug!("Received message from inbound stream. Notifying behaviour...");

Expand All @@ -158,21 +179,32 @@ where
}
}

self.inbound_substream = Some(recv_msg(stream).boxed());
self.inbound_substream =
Some(InboundSubstreamState::PendingRecv(recv_msg(stream).boxed()));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
ToBehaviour::Message(msg),
));
}
Poll::Ready(Err(e)) => {
tracing::error!("Failed to receive message from inbound stream: {:?}", e);
// TODO: Mark the inbound/outbound substream state as Dropped.
self.inbound_substream = None;
tracing::error!(
"Failed to receive message from inbound stream: {e:?}. Dropping both inbound/outbound substreams"
);
self.inbound_substream = Some(InboundSubstreamState::Dropped);
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
}
Poll::Pending => {}
Poll::Pending => {
tracing::debug!("No message received from inbound stream yet. Waiting more...");
self.inbound_substream = Some(InboundSubstreamState::PendingRecv(msg_recv_fut));
}
},
Some(InboundSubstreamState::Dropped) => {
tracing::debug!("Inbound substream has been dropped proactively. Doing nothing.");
self.inbound_substream = Some(InboundSubstreamState::Dropped);
}
}

// Process outbound stream
// TODO: Refactor this to a separate function.
tracing::debug!("Processing outbound stream");
loop {
match self.outbound_substream.take() {
Expand Down Expand Up @@ -207,9 +239,9 @@ where
self.outbound_substream = Some(OutboundSubstreamState::Idle(stream));
}
Poll::Ready(Err(e)) => {
tracing::error!("Failed to send message to outbound stream: {:?}", e);
// TODO: Mark the inbound/outbound substream state as Dropped.
self.outbound_substream = None;
tracing::error!("Failed to send message to outbound stream: {e:?}. Dropping both inbound and outbound substreams");
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
self.inbound_substream = Some(InboundSubstreamState::Dropped);
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
ToBehaviour::IOError(e),
));
Expand All @@ -222,6 +254,11 @@ where
}
}
}
Some(OutboundSubstreamState::Dropped) => {
tracing::debug!("Outbound substream has been dropped proactively");
self.outbound_substream = Some(OutboundSubstreamState::Dropped);
return Poll::Pending;
}
// If there is no outbound substream, request to open a new one.
None => {
self.outbound_substream = Some(OutboundSubstreamState::PendingOpenSubstream);
Expand Down Expand Up @@ -256,7 +293,8 @@ where
..
}) => {
tracing::debug!("FullyNegotiatedInbound: Creating inbound substream");
self.inbound_substream = Some(recv_msg(stream).boxed());
self.inbound_substream =
Some(InboundSubstreamState::PendingRecv(recv_msg(stream).boxed()));
VALUE_FULLY_NEGOTIATED_INBOUND
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
Expand Down
64 changes: 64 additions & 0 deletions nomos-blend/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub use behaviour::{Behaviour, Config, Event, IntervalStreamProvider};
mod test {
use std::time::Duration;

use fixed::types::U57F7;
use libp2p::{
futures::StreamExt,
identity::Keypair,
Expand Down Expand Up @@ -105,6 +106,64 @@ mod test {
}
}

/// Verifies that a peer sending more messages than the connection monitor
/// allows is treated as malicious and that the connection to it gets closed.
///
/// Both swarms are configured to expect zero messages with zero tolerance,
/// so a single message published by Swarm2 should be enough for Swarm1 to
/// close the connection.
#[tokio::test]
async fn detect_malicious_peer() {
// Init two swarms with connection monitoring enabled.
// All expected message counts and tolerances are zero.
let conn_monitor_settings = ConnectionMonitorSettings {
interval: Duration::from_secs(1),
expected_effective_messages: U57F7::from_num(0.0),
effective_message_tolerance: U57F7::from_num(0.0),
expected_drop_messages: U57F7::from_num(0.0),
drop_message_tolerance: U57F7::from_num(0.0),
};
let (mut nodes, mut keypairs) = nodes(2, 8290);
let node1_addr = nodes.next().unwrap().address;
let mut swarm1 = new_blend_swarm(
keypairs.next().unwrap(),
node1_addr.clone(),
Some(conn_monitor_settings),
);
let mut swarm2 = new_blend_swarm(
keypairs.next().unwrap(),
nodes.next().unwrap().address,
Some(conn_monitor_settings),
);
swarm2.dial(node1_addr).unwrap();

// Swarm2 sends a message to Swarm1, even though expected_effective_messages is 0.
// Then, Swarm1 should detect Swarm2 as a malicious peer.
let task = async {
let mut msg_published = false;
let mut publish_try_interval = tokio::time::interval(Duration::from_millis(10));
loop {
select! {
// Retry publishing every 10ms until it succeeds once.
// NOTE(review): presumably `publish` fails until the connection
// to Swarm1 is established — confirm.
_ = publish_try_interval.tick() => {
if !msg_published {
msg_published = swarm2.behaviour_mut().publish(vec![1; 10]).is_ok();
}
}
// Finish once Swarm1 reports that its last connection to Swarm2 is closed.
event = swarm1.select_next_some() => {
if let SwarmEvent::ConnectionClosed { peer_id, num_established, .. } = event {
assert_eq!(peer_id, *swarm2.local_peer_id());
assert_eq!(num_established, 0);
assert!(swarm1.connected_peers().next().is_none());
break;
}
}
// Keep polling Swarm2 so that it makes progress; its events are not inspected.
_ = swarm2.select_next_some() => {}
}
}
};

// Expect the task to complete within one monitoring interval plus a 1-second margin.
assert!(tokio::time::timeout(
conn_monitor_settings.interval + Duration::from_secs(1),
task
)
.await
.is_ok());
}

fn new_blend_swarm(
keypair: Keypair,
addr: Multiaddr,
Expand Down Expand Up @@ -137,6 +196,11 @@ mod test {
.unwrap()
.with_behaviour(|_| behaviour)
.unwrap()
.with_swarm_config(|config| {
// We want connections to be closed immediately as soon as
// the corresponding streams are dropped by behaviours.
config.with_idle_connection_timeout(Duration::ZERO)
})
.build();
swarm.listen_on(addr).unwrap();
swarm
Expand Down

0 comments on commit 018e6cf

Please sign in to comment.