From 15a697343dfc817e9085fef238ff1b72c2769dc3 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Mon, 20 Jan 2025 15:10:52 +0900 Subject: [PATCH] refactor(blend): drop streams if malicious or on error --- nomos-blend/network/src/behaviour.rs | 12 ++++- nomos-blend/network/src/handler.rs | 66 ++++++++++++++++++++++------ nomos-blend/network/src/lib.rs | 64 +++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 15 deletions(-) diff --git a/nomos-blend/network/src/behaviour.rs b/nomos-blend/network/src/behaviour.rs index 4fde05409..e838deb59 100644 --- a/nomos-blend/network/src/behaviour.rs +++ b/nomos-blend/network/src/behaviour.rs @@ -125,6 +125,10 @@ where Ok(()) } + pub fn peers(&self) -> &HashSet { + &self.peers + } + fn message_id(message: &[u8]) -> Vec { let mut hasher = Sha256::new(); hasher.update(message); @@ -193,6 +197,12 @@ where remaining_established, .. }) => { + // This event happens in one of the following cases: + // 1. The connection was closed by the peer. + // 2. The connection was closed by the local node since no stream is active. + // + // In both cases, we need to remove the peer from the list of connected peers, + // though it may be already removed from list by handling other events. if remaining_established == 0 { self.peers.remove(&peer_id); } @@ -259,7 +269,7 @@ where // if we need more healthy peers. } ToBehaviour::IOError(error) => { - // TODO: Consider removing the peer from the connected_peers and closing the connection + self.peers.remove(&peer_id); self.events .push_back(ToSwarm::GenerateEvent(Event::Error(Error::PeerIOError { error, diff --git a/nomos-blend/network/src/handler.rs b/nomos-blend/network/src/handler.rs index 936cee0e7..2eefe5fe1 100644 --- a/nomos-blend/network/src/handler.rs +++ b/nomos-blend/network/src/handler.rs @@ -26,7 +26,7 @@ const VALUE_IGNORED: &str = "ignored"; const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/blend/0.1.0"); pub struct BlendConnectionHandler { - inbound_substream: Option, + inbound_substream: Option, outbound_substream: Option, outbound_msgs: VecDeque>, pending_events_to_behaviour: VecDeque, @@ -38,6 +38,13 @@ pub struct BlendConnectionHandler { type MsgSendFuture = BoxFuture<'static, Result>; type MsgRecvFuture = BoxFuture<'static, Result<(Stream, Vec), io::Error>>; +enum InboundSubstreamState { + /// A message is being received on the inbound substream. + PendingRecv(MsgRecvFuture), + /// A substream has been dropped proactively. + Dropped, +} + enum OutboundSubstreamState { /// A request to open a new outbound substream is being processed. PendingOpenSubstream, @@ -45,6 +52,8 @@ enum OutboundSubstreamState { Idle(Stream), /// A message is being sent on the outbound substream. PendingSend(MsgSendFuture), + /// A substream has been dropped proactively. + Dropped, } impl BlendConnectionHandler { @@ -82,10 +91,12 @@ pub enum ToBehaviour { /// A message has been received from the connection. Message(Vec), /// Notifying that the peer is detected as malicious. + /// The inbound/outbound streams to the peer are closed proactively. MaliciousPeer, /// Notifying that the peer is detected as unhealthy. UnhealthyPeer, - /// An IO error from the connection + /// An IO error from the connection. + /// The inbound/outbound streams to the peer are closed proactively. IOError(io::Error), } @@ -117,14 +128,17 @@ where ); // Check if the monitor interval has elapsed, if exists. + // TODO: Refactor this to a separate function. if let Some(monitor) = &mut self.monitor { if let Poll::Ready(output) = monitor.poll(cx) { match output { ConnectionMonitorOutput::Malicious => { - // TODO: Mark the inbound/outbound substream state as Dropped. + // Mark the inbound/outbound substream state as Dropped. // Then the substream hold by the state will be dropped from memory. // As a result, Swarm will decrease the ref count to the connection, // and close the connection when the count is 0. + self.inbound_substream = Some(InboundSubstreamState::Dropped); + self.outbound_substream = Some(OutboundSubstreamState::Dropped); self.pending_events_to_behaviour .push_back(ToBehaviour::MaliciousPeer); } @@ -143,9 +157,16 @@ where } // Process inbound stream + // TODO: Refactor this to a separate function. tracing::debug!("Processing inbound stream"); - if let Some(msg_recv_fut) = self.inbound_substream.as_mut() { - match msg_recv_fut.poll_unpin(cx) { + match self.inbound_substream.take() { + None => { + tracing::debug!("Inbound substream is not initialized yet. Doing nothing."); + self.inbound_substream = None; + } + Some(InboundSubstreamState::PendingRecv(mut msg_recv_fut)) => match msg_recv_fut + .poll_unpin(cx) + { Poll::Ready(Ok((stream, msg))) => { tracing::debug!("Received message from inbound stream. Notifying behaviour..."); @@ -158,21 +179,32 @@ where } } - self.inbound_substream = Some(recv_msg(stream).boxed()); + self.inbound_substream = + Some(InboundSubstreamState::PendingRecv(recv_msg(stream).boxed())); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( ToBehaviour::Message(msg), )); } Poll::Ready(Err(e)) => { - tracing::error!("Failed to receive message from inbound stream: {:?}", e); - // TODO: Mark the inbound/outbound substream state as Dropped. - self.inbound_substream = None; + tracing::error!( + "Failed to receive message from inbound stream: {e:?}. Dropping both inbound/outbound substreams" + ); + self.inbound_substream = Some(InboundSubstreamState::Dropped); + self.outbound_substream = Some(OutboundSubstreamState::Dropped); } - Poll::Pending => {} + Poll::Pending => { + tracing::debug!("No message received from inbound stream yet. Waiting more..."); + self.inbound_substream = Some(InboundSubstreamState::PendingRecv(msg_recv_fut)); + } + }, + Some(InboundSubstreamState::Dropped) => { + tracing::debug!("Inbound substream has been dropped proactively. Doing nothing."); + self.inbound_substream = Some(InboundSubstreamState::Dropped); } } // Process outbound stream + // TODO: Refactor this to a separate function. tracing::debug!("Processing outbound stream"); loop { match self.outbound_substream.take() { @@ -207,9 +239,9 @@ where self.outbound_substream = Some(OutboundSubstreamState::Idle(stream)); } Poll::Ready(Err(e)) => { - tracing::error!("Failed to send message to outbound stream: {:?}", e); - // TODO: Mark the inbound/outbound substream state as Dropped. - self.outbound_substream = None; + tracing::error!("Failed to send message to outbound stream: {e:?}. Dropping both inbound and outbound substreams"); + self.outbound_substream = Some(OutboundSubstreamState::Dropped); + self.inbound_substream = Some(InboundSubstreamState::Dropped); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( ToBehaviour::IOError(e), )); @@ -222,6 +254,11 @@ where } } } + Some(OutboundSubstreamState::Dropped) => { + tracing::debug!("Outbound substream has been dropped proactively"); + self.outbound_substream = Some(OutboundSubstreamState::Dropped); + return Poll::Pending; + } // If there is no outbound substream, request to open a new one. None => { self.outbound_substream = Some(OutboundSubstreamState::PendingOpenSubstream); @@ -256,7 +293,8 @@ where .. }) => { tracing::debug!("FullyNegotiatedInbound: Creating inbound substream"); - self.inbound_substream = Some(recv_msg(stream).boxed()); + self.inbound_substream = + Some(InboundSubstreamState::PendingRecv(recv_msg(stream).boxed())); VALUE_FULLY_NEGOTIATED_INBOUND } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { diff --git a/nomos-blend/network/src/lib.rs b/nomos-blend/network/src/lib.rs index 01cee7d68..f058433db 100644 --- a/nomos-blend/network/src/lib.rs +++ b/nomos-blend/network/src/lib.rs @@ -29,6 +29,7 @@ impl IntervalStreamProvider for TokioIntervalStreamProvider { mod test { use std::time::Duration; + use fixed::types::U57F7; use libp2p::{ futures::StreamExt, identity::Keypair, @@ -122,6 +123,64 @@ mod test { } } + #[tokio::test] + async fn detect_malicious_peer() { + // Init two swarms with connection monitoring enabled. + let conn_monitor_settings = ConnectionMonitorSettings { + interval: Duration::from_secs(1), + expected_effective_messages: U57F7::from_num(0.0), + effective_message_tolerance: U57F7::from_num(0.0), + expected_drop_messages: U57F7::from_num(0.0), + drop_message_tolerance: U57F7::from_num(0.0), + }; + let (mut nodes, mut keypairs) = nodes(2, 8290); + let node1_addr = nodes.next().unwrap().address; + let mut swarm1 = new_blend_swarm( + keypairs.next().unwrap(), + node1_addr.clone(), + Some(conn_monitor_settings), + ); + let mut swarm2 = new_blend_swarm( + keypairs.next().unwrap(), + nodes.next().unwrap().address, + Some(conn_monitor_settings), + ); + swarm2.dial(node1_addr).unwrap(); + + // Swarm2 sends a message to Swarm1, even though expected_effective_messages is 0. + // Then, Swarm1 should detect Swarm2 as a malicious peer. + let task = async { + let mut msg_published = false; + let mut publish_try_interval = tokio::time::interval(Duration::from_millis(10)); + loop { + select! { + _ = publish_try_interval.tick() => { + if !msg_published { + msg_published = swarm2.behaviour_mut().publish(vec![1; 10]).is_ok(); + } + } + event = swarm1.select_next_some() => { + if let SwarmEvent::ConnectionClosed { peer_id, num_established, .. } = event { + assert_eq!(peer_id, *swarm2.local_peer_id()); + assert_eq!(num_established, 0); + assert!(swarm1.connected_peers().next().is_none()); + break; + } + } + _ = swarm2.select_next_some() => {} + } + } + }; + + // Expect for the task to be completed in time + assert!(tokio::time::timeout( + conn_monitor_settings.interval + Duration::from_secs(1), + task + ) + .await + .is_ok()); + } + fn new_blend_swarm( keypair: Keypair, addr: Multiaddr, @@ -154,6 +213,11 @@ mod test { .unwrap() .with_behaviour(|_| behaviour) .unwrap() + .with_swarm_config(|config| { + // We want connections to be closed immediately as soon as + // the corresponding streams are dropped by behaviours. + config.with_idle_connection_timeout(Duration::ZERO) + }) .build(); swarm.listen_on(addr).unwrap(); swarm