diff --git a/Dockerfile b/Dockerfile index f8c0a8904..c31492301 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ # BUILD IMAGE --------------------------------------------------------- -FROM rust:1.82.0-slim-bookworm AS builder +FROM rust:1.84.0-slim-bookworm AS builder WORKDIR /nomos COPY . . diff --git a/ci/Dockerfile b/ci/Dockerfile index fbee5ea00..426237813 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.82.0-slim-bookworm +FROM rust:1.84.0-slim-bookworm LABEL maintainer="augustinas@status.im" \ source="https://github.com/logos-co/nomos-node" \ diff --git a/nomos-blend/core/src/membership.rs b/nomos-blend/core/src/membership.rs index 0305af0fe..61a19fcdc 100644 --- a/nomos-blend/core/src/membership.rs +++ b/nomos-blend/core/src/membership.rs @@ -64,11 +64,11 @@ where &self, rng: &mut R, amount: usize, - exclude_addrs: &HashSet<Multiaddr>, + exclude_peers: &HashSet<NodeId>, ) -> Vec<&Node<NodeId, M::PublicKey>> { self.remote_nodes .iter() - .filter(|node| !exclude_addrs.contains(&node.address)) + .filter(|node| !exclude_peers.contains(&node.id)) .choose_multiple(rng, amount) } diff --git a/nomos-blend/network/Cargo.toml b/nomos-blend/network/Cargo.toml index 7888f0ec2..604569a53 100644 --- a/nomos-blend/network/Cargo.toml +++ b/nomos-blend/network/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" cached = "0.53.1" futures = "0.3.30" futures-timer = "3.0.3" -libp2p = "0.53" +libp2p = "0.55" tracing = "0.1" nomos-blend = { path = "../core" } nomos-blend-message = { path = "../message" } @@ -20,7 +20,7 @@ tokio-stream = { version = "0.1", optional = true } [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } tokio-stream = "0.1" -libp2p = { version = "0.53", features = ["ed25519", "tokio", "quic"] } +libp2p = { version = "0.55", features = ["ed25519", "tokio", "quic"] } tracing-subscriber = "0.3.18" fixed = "1" diff --git a/nomos-blend/network/src/behaviour.rs 
b/nomos-blend/network/src/behaviour.rs index fd13a647c..46df8b33b 100644 --- a/nomos-blend/network/src/behaviour.rs +++ b/nomos-blend/network/src/behaviour.rs @@ -4,11 +4,10 @@ use crate::{ }; use cached::{Cached, TimedCache}; use libp2p::{ - core::Endpoint, + core::{transport::PortUse, Endpoint}, swarm::{ - behaviour::ConnectionEstablished, ConnectionClosed, ConnectionDenied, ConnectionId, - FromSwarm, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent, THandlerOutEvent, - ToSwarm, + ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, + NotifyHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }, Multiaddr, PeerId, }; @@ -16,7 +15,7 @@ use nomos_blend::conn_maintenance::{ConnectionMonitor, ConnectionMonitorSettings use nomos_blend_message::BlendMessage; use sha2::{Digest, Sha256}; use std::{ - collections::{HashSet, VecDeque}, + collections::{HashMap, VecDeque}, task::{Context, Poll, Waker}, }; use std::{marker::PhantomData, time::Duration}; @@ -30,7 +29,7 @@ where IntervalProvider: IntervalStreamProvider, { config: Config, - peers: HashSet<PeerId>, + negotiated_peers: HashMap<PeerId, NegotiatedPeerState>, /// Queue of events to yield to the swarm. events: VecDeque<ToSwarm<Event, FromBehaviour>>, /// Waker that handles polling @@ -42,6 +41,12 @@ where _interval_provider: PhantomData<IntervalProvider>, } +#[derive(Debug, Eq, PartialEq)] +enum NegotiatedPeerState { + Healthy, + Unhealthy, +} + #[derive(Debug)] pub struct Config { pub duplicate_cache_lifespan: u64, @@ -52,6 +57,10 @@ pub struct Config { pub enum Event { /// A message received from one of the peers. Message(Vec<u8>), + /// A peer has been detected as malicious. + MaliciousPeer(PeerId), + /// A peer has been detected as unhealthy. 
+ UnhealthyPeer(PeerId), Error(Error), } @@ -65,7 +74,7 @@ where let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan); Self { config, - peers: HashSet::new(), + negotiated_peers: HashMap::new(), events: VecDeque::new(), waker: None, duplicate_cache, @@ -103,30 +112,29 @@ where message: Vec<u8>, excluded_peer: Option<PeerId>, ) -> Result<(), Error> { - let mut peer_ids = self.peers.clone(); - if let Some(peer) = &excluded_peer { - peer_ids.remove(peer); - } - - if peer_ids.is_empty() { - return Err(Error::NoPeers); - } - - peer_ids.into_iter().for_each(|peer_id| { - tracing::debug!("Registering event for peer {:?} to send msg", peer_id); - self.events.push_back(ToSwarm::NotifyHandler { - peer_id, - handler: NotifyHandler::Any, - event: FromBehaviour::Message(message.clone()), + let mut num_peers = 0; + self.negotiated_peers + .keys() + .filter(|peer_id| match excluded_peer { + Some(excluded_peer) => **peer_id != excluded_peer, + None => true, + }) + .for_each(|peer_id| { + tracing::debug!("Registering event for peer {:?} to send msg", peer_id); + self.events.push_back(ToSwarm::NotifyHandler { + peer_id: *peer_id, + handler: NotifyHandler::Any, + event: FromBehaviour::Message(message.clone()), + }); + num_peers += 1; }); - }); - self.try_wake(); - Ok(()) - } - - pub fn peers(&self) -> &HashSet<PeerId> { - &self.peers + if num_peers == 0 { + Err(Error::NoPeers) + } else { + self.try_wake(); + Ok(()) + } } fn message_id(message: &[u8]) -> Vec<u8> { @@ -135,6 +143,13 @@ where hasher.finalize().to_vec() } + pub fn num_healthy_peers(&self) -> usize { + self.negotiated_peers + .iter() + .filter(|(_, state)| **state == NegotiatedPeerState::Healthy) + .count() + } + fn create_connection_handler(&self) -> BlendConnectionHandler<M> { let monitor = self.config.conn_monitor_settings.as_ref().map(|settings| { ConnectionMonitor::new( @@ -177,35 +192,31 @@ where _: PeerId, _: &Multiaddr, _: Endpoint, + _: PortUse, ) -> Result<THandler<Self>, 
ConnectionDenied> { Ok(self.create_connection_handler()) } /// Informs the behaviour about an event from the [`Swarm`]. fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionEstablished(ConnectionEstablished { .. }) => { - // TODO: Notify the connection handler to deny the stream if necessary - // - if max peering degree was reached. - // - if the peer has been detected as malicious. - } - FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - remaining_established, - .. - }) => { - // This event happens in one of the following cases: - // 1. The connection was closed by the peer. - // 2. The connection was closed by the local node since no stream is active. - // - // In both cases, we need to remove the peer from the list of connected peers, - // though it may be already removed from list by handling other events. - if remaining_established == 0 { - self.peers.remove(&peer_id); - } + if let FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + remaining_established, + .. + }) = event + { + // This event happens in one of the following cases: + // 1. The connection was closed by the peer. + // 2. The connection was closed by the local node since no stream is active. + // + // In both cases, we need to remove the peer from the list of connected peers, + // though it may be already removed from list by handling other events. + if remaining_established == 0 { + self.negotiated_peers.remove(&peer_id); } - _ => {} - } + }; + + self.try_wake(); } /// Handles an event generated by the [`BlendConnectionHandler`] @@ -245,28 +256,31 @@ where self.events .push_back(ToSwarm::GenerateEvent(Event::Message(message))); } - // The connection was fully negotiated by the peer, + // The inbound/outbound connection was fully negotiated by the peer, // which means that the peer supports the blend protocol. 
- ToBehaviour::FullyNegotiatedOutbound => { - self.peers.insert(peer_id); + ToBehaviour::FullyNegotiatedInbound | ToBehaviour::FullyNegotiatedOutbound => { + self.negotiated_peers + .insert(peer_id, NegotiatedPeerState::Healthy); } - ToBehaviour::NegotiationFailed => { - self.peers.remove(&peer_id); + ToBehaviour::DialUpgradeError(_) => { + self.negotiated_peers.remove(&peer_id); } ToBehaviour::MaliciousPeer => { - // TODO: Remove the peer from the connected peer list - // and add it to the malicious peer list, - // so that the peer is excluded from the future connection establishments. - // Also, notify the upper layer to try to dial new peers - // if we need more healthy peers. + tracing::debug!("Peer {:?} has been detected as malicious", peer_id); + self.negotiated_peers.remove(&peer_id); + self.events + .push_back(ToSwarm::GenerateEvent(Event::MaliciousPeer(peer_id))); } ToBehaviour::UnhealthyPeer => { - // TODO: Remove the peer from the connected 'healthy' peer list. - // Also, notify the upper layer to try to dial new peers - // if we need more healthy peers. + tracing::debug!("Peer {:?} has been detected as unhealthy", peer_id); + // TODO: Still the algorithm to revert the peer to healthy state is not defined yet. 
+ self.negotiated_peers + .insert(peer_id, NegotiatedPeerState::Unhealthy); + self.events + .push_back(ToSwarm::GenerateEvent(Event::UnhealthyPeer(peer_id))); } ToBehaviour::IOError(error) => { - self.peers.remove(&peer_id); + self.negotiated_peers.remove(&peer_id); self.events .push_back(ToSwarm::GenerateEvent(Event::Error(Error::PeerIOError { error, diff --git a/nomos-blend/network/src/handler.rs b/nomos-blend/network/src/handler.rs index b11b3e5a7..77c5b193d 100644 --- a/nomos-blend/network/src/handler.rs +++ b/nomos-blend/network/src/handler.rs @@ -9,8 +9,10 @@ use futures::{future::BoxFuture, AsyncReadExt, AsyncWriteExt, FutureExt}; use libp2p::{ core::upgrade::ReadyUpgrade, swarm::{ - handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound}, - ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError, SubstreamProtocol, + handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + }, + ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol, }, Stream, StreamProtocol, }; @@ -71,6 +73,20 @@ impl<Msg> BlendConnectionHandler<Msg> { } } + /// Mark the inbound/outbound substream state as Dropped. + /// Then the substream held by the state will be dropped from memory. + /// As a result, Swarm will decrease the ref count to the connection, + /// and close the connection when the count is 0. + /// + /// Also, this clears all pending messages and events + /// to avoid confusion for event recipients. + fn close_substreams(&mut self) { + self.inbound_substream = Some(InboundSubstreamState::Dropped); + self.outbound_substream = Some(OutboundSubstreamState::Dropped); + self.outbound_msgs.clear(); + self.pending_events_to_behaviour.clear(); + } + fn try_wake(&mut self) { if let Some(waker) = self.waker.take() { waker.wake(); @@ -82,14 +98,21 @@ impl<Msg> BlendConnectionHandler<Msg> { pub enum FromBehaviour { /// A message to be sent to the connection.
Message(Vec<u8>), + /// Close inbound/outbound substreams. + /// This happens when [`crate::Behaviour`] determines that one of the following is true. + /// - Max peering degree is reached. + /// - The peer has been detected as malicious. + CloseSubstreams, } #[derive(Debug)] pub enum ToBehaviour { + /// An inbound substream has been successfully upgraded for the blend protocol. + FullyNegotiatedInbound, /// An outbound substream has been successfully upgraded for the blend protocol. FullyNegotiatedOutbound, /// An outbound substream was failed to be upgraded for the blend protocol. - NegotiationFailed, + DialUpgradeError(DialUpgradeError<(), ReadyUpgrade<StreamProtocol>>), /// A message has been received from the connection. Message(Vec<u8>), /// Notifying that the peer is detected as malicious. @@ -109,14 +132,18 @@ where type FromBehaviour = FromBehaviour; type ToBehaviour = ToBehaviour; type InboundProtocol = ReadyUpgrade<StreamProtocol>; + #[allow(deprecated)] type InboundOpenInfo = (); type OutboundProtocol = ReadyUpgrade<StreamProtocol>; + #[allow(deprecated)] type OutboundOpenInfo = (); + #[allow(deprecated)] // Self::InboundOpenInfo is deprecated fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> { SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) } + #[allow(deprecated)] // Self::OutboundOpenInfo is deprecated fn poll( &mut self, cx: &mut Context<'_>, @@ -134,12 +161,7 @@ where if let Poll::Ready(output) = monitor.poll(cx) { match output { ConnectionMonitorOutput::Malicious => { - // Mark the inbound/outbound substream state as Dropped. - // Then the substream hold by the state will be dropped from memory. - // As a result, Swarm will decrease the ref count to the connection, - // and close the connection when the count is 0.
- self.inbound_substream = Some(InboundSubstreamState::Dropped); - self.outbound_substream = Some(OutboundSubstreamState::Dropped); + self.close_substreams(); self.pending_events_to_behaviour .push_back(ToBehaviour::MaliciousPeer); } @@ -190,8 +212,10 @@ where tracing::error!( "Failed to receive message from inbound stream: {e:?}. Dropping both inbound/outbound substreams" ); - self.inbound_substream = Some(InboundSubstreamState::Dropped); - self.outbound_substream = Some(OutboundSubstreamState::Dropped); + self.close_substreams(); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + ToBehaviour::IOError(e), + )); } Poll::Pending => { tracing::debug!("No message received from inbound stream yet. Waiting more..."); @@ -241,8 +265,7 @@ where } Poll::Ready(Err(e)) => { tracing::error!("Failed to send message to outbound stream: {e:?}. Dropping both inbound and outbound substreams"); - self.outbound_substream = Some(OutboundSubstreamState::Dropped); - self.inbound_substream = Some(InboundSubstreamState::Dropped); + self.close_substreams(); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( ToBehaviour::IOError(e), )); @@ -276,9 +299,13 @@ where FromBehaviour::Message(msg) => { self.outbound_msgs.push_back(msg); } + FromBehaviour::CloseSubstreams => { + self.close_substreams(); + } } } + #[allow(deprecated)] // Self::InboundOpenInfo and Self::OutboundOpenInfo are deprecated fn on_connection_event( &mut self, event: ConnectionEvent< @@ -296,6 +323,8 @@ where tracing::debug!("FullyNegotiatedInbound: Creating inbound substream"); self.inbound_substream = Some(InboundSubstreamState::PendingRecv(recv_msg(stream).boxed())); + self.pending_events_to_behaviour + .push_back(ToBehaviour::FullyNegotiatedInbound); VALUE_FULLY_NEGOTIATED_INBOUND } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { @@ -310,24 +339,9 @@ where } ConnectionEvent::DialUpgradeError(e) => { tracing::error!("DialUpgradeError: {:?}", e); - match e.error { - 
StreamUpgradeError::NegotiationFailed => { - self.pending_events_to_behaviour - .push_back(ToBehaviour::NegotiationFailed); - } - StreamUpgradeError::Io(e) => { - self.pending_events_to_behaviour - .push_back(ToBehaviour::IOError(e)); - } - StreamUpgradeError::Timeout => { - self.pending_events_to_behaviour - .push_back(ToBehaviour::IOError(io::Error::new( - io::ErrorKind::TimedOut, - "blend protocol negotiation timed out", - ))); - } - StreamUpgradeError::Apply(_) => unreachable!(), - }; + self.close_substreams(); // clears queued events, so the notification is pushed afterwards + self.pending_events_to_behaviour + .push_back(ToBehaviour::DialUpgradeError(e)); VALUE_DIAL_UPGRADE_ERROR } event => { diff --git a/nomos-blend/network/src/lib.rs b/nomos-blend/network/src/lib.rs index 2e1e83b15..8618cb976 100644 --- a/nomos-blend/network/src/lib.rs +++ b/nomos-blend/network/src/lib.rs @@ -150,9 +150,14 @@ mod test { // Swarm2 sends a message to Swarm1, even though expected_effective_messages is 0. // Then, Swarm1 should detect Swarm2 as a malicious peer. let task = async { + let mut num_events_waiting = 2; let mut msg_published = false; let mut publish_try_interval = tokio::time::interval(Duration::from_millis(10)); loop { + if num_events_waiting == 0 { + break; + } + select! { _ = publish_try_interval.tick() => { if !msg_published { @@ -160,11 +165,20 @@ mod test { } } event = swarm1.select_next_some() => { - if let SwarmEvent::ConnectionClosed { peer_id, num_established, .. } = event { - assert_eq!(peer_id, *swarm2.local_peer_id()); - assert_eq!(num_established, 0); - assert!(swarm1.connected_peers().next().is_none()); - break; + match event { + // We expect the behaviour reports a malicious peer. + SwarmEvent::Behaviour(Event::MaliciousPeer(peer_id)) => { + assert_eq!(peer_id, *swarm2.local_peer_id()); + num_events_waiting -= 1; + }, + // We expect that the Swarm1 closes the connection proactively. + SwarmEvent::ConnectionClosed { peer_id, num_established, ..
} => { + assert_eq!(peer_id, *swarm2.local_peer_id()); + assert_eq!(num_established, 0); + assert!(swarm1.connected_peers().next().is_none()); + num_events_waiting -= 1; + }, + _ => {}, } } _ = swarm2.select_next_some() => {} @@ -181,6 +195,71 @@ mod test { .is_ok()); } + #[tokio::test] + async fn detect_unhealthy_peer() { + // Init two swarms with connection monitoring enabled. + let conn_monitor_settings = ConnectionMonitorSettings { + interval: Duration::from_secs(1), + expected_effective_messages: U57F7::from_num(1.0), + effective_message_tolerance: U57F7::from_num(0.0), + expected_drop_messages: U57F7::from_num(0.0), + drop_message_tolerance: U57F7::from_num(0.0), + }; + let (mut nodes, mut keypairs) = nodes(2, 8390); + let node1_addr = nodes.next().unwrap().address; + let mut swarm1 = new_blend_swarm( + keypairs.next().unwrap(), + node1_addr.clone(), + Some(conn_monitor_settings), + ); + let mut swarm2 = new_blend_swarm( + keypairs.next().unwrap(), + nodes.next().unwrap().address, + Some(conn_monitor_settings), + ); + swarm2.dial(node1_addr).unwrap(); + + // Swarms don't send anything, even though expected_effective_messages is 1. + // Then, both should detect the other as unhealthy. + // Swarms shouldn't close the connection of the unhealthy peers. + let task = async { + let mut num_events_waiting = 2; + loop { + if num_events_waiting == 0 { + break; + } + + select! 
{ + event = swarm1.select_next_some() => { + if let SwarmEvent::Behaviour(Event::UnhealthyPeer(peer_id)) = event { + assert_eq!(peer_id, *swarm2.local_peer_id()); + num_events_waiting -= 1; + } + } + event = swarm2.select_next_some() => { + if let SwarmEvent::Behaviour(Event::UnhealthyPeer(peer_id)) = event { + assert_eq!(peer_id, *swarm1.local_peer_id()); + num_events_waiting -= 1; + } + } + } + } + + assert_eq!(swarm1.behaviour().num_healthy_peers(), 0); + assert_eq!(swarm1.connected_peers().count(), 1); + assert_eq!(swarm2.behaviour().num_healthy_peers(), 0); + assert_eq!(swarm2.connected_peers().count(), 1); + }; + + // Expect for the task to be completed in time + assert!(tokio::time::timeout( + conn_monitor_settings.interval + Duration::from_secs(1), + task + ) + .await + .is_ok()); + } + fn new_blend_swarm( keypair: Keypair, addr: Multiaddr, diff --git a/nomos-libp2p/Cargo.toml b/nomos-libp2p/Cargo.toml index e9c2149eb..3c9df768c 100644 --- a/nomos-libp2p/Cargo.toml +++ b/nomos-libp2p/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" multiaddr = "0.18" tokio = { version = "1", features = ["sync", "macros"] } futures = "0.3" -libp2p = { version = "0.53.2", features = [ +libp2p = { version = "0.55", features = [ "dns", "macros", "gossipsub", diff --git a/nomos-libp2p/src/lib.rs b/nomos-libp2p/src/lib.rs index 7e429ac47..4f1a806d2 100644 --- a/nomos-libp2p/src/lib.rs +++ b/nomos-libp2p/src/lib.rs @@ -117,7 +117,7 @@ impl Swarm { /// Unsubscribes from a topic /// /// Returns true if previously subscribed - pub fn unsubscribe(&mut self, topic: &str) -> Result<bool, PublishError> { + pub fn unsubscribe(&mut self, topic: &str) -> bool { self.swarm .behaviour_mut() .gossipsub diff --git a/nomos-services/blend/Cargo.toml b/nomos-services/blend/Cargo.toml index 98dfabcef..7140084b5 100644 --- a/nomos-services/blend/Cargo.toml +++ b/nomos-services/blend/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] async-trait = "0.1" futures = "0.3" -libp2p = { version = 
"0.53", features = ["ed25519"] } +libp2p = { version = "0.55", features = ["ed25519"] } nomos-libp2p = { path = "../../nomos-libp2p", optional = true } nomos-blend = { path = "../../nomos-blend/core" } nomos-core = { path = "../../nomos-core/chain-defs" } diff --git a/nomos-services/blend/src/backends/libp2p.rs b/nomos-services/blend/src/backends/libp2p.rs index d56d54cd1..8d5934502 100644 --- a/nomos-services/blend/src/backends/libp2p.rs +++ b/nomos-services/blend/src/backends/libp2p.rs @@ -1,9 +1,11 @@ -use std::{pin::Pin, time::Duration}; +use std::{collections::HashSet, pin::Pin, time::Duration}; use super::BlendBackend; use async_trait::async_trait; use futures::{Stream, StreamExt}; use libp2p::{ + allow_block_list::BlockedPeers, + connection_limits::ConnectionLimits, identity::{ed25519, Keypair}, swarm::SwarmEvent, Multiaddr, PeerId, Swarm, SwarmBuilder, @@ -11,7 +13,7 @@ use libp2p::{ use nomos_blend::{conn_maintenance::ConnectionMonitorSettings, membership::Membership}; use nomos_blend_message::sphinx::SphinxMessage; use nomos_blend_network::TokioIntervalStreamProvider; -use nomos_libp2p::secret_key_serde; +use nomos_libp2p::{secret_key_serde, NetworkBehaviour}; use overwatch_rs::overwatch::handle::OverwatchHandle; use rand::RngCore; use serde::{Deserialize, Serialize}; @@ -96,10 +98,44 @@ impl BlendBackend for Libp2pBlendBackend { } } -struct BlendSwarm { - swarm: Swarm<nomos_blend_network::Behaviour<SphinxMessage, TokioIntervalStreamProvider>>, +struct BlendSwarm<R> { + swarm: Swarm<BlendBehaviour>, swarm_messages_receiver: mpsc::Receiver<BlendSwarmMessage>, incoming_message_sender: broadcast::Sender<Vec<u8>>, + // TODO: Instead of holding the membership, we just want a way to get the list of addresses. 
+ membership: Membership<PeerId, SphinxMessage>, + rng: R, + peering_degree: u16, +} + +#[derive(NetworkBehaviour)] +struct BlendBehaviour { + blend: nomos_blend_network::Behaviour<SphinxMessage, TokioIntervalStreamProvider>, + limits: libp2p::connection_limits::Behaviour, + blocked_peers: libp2p::allow_block_list::Behaviour<BlockedPeers>, +} + +impl BlendBehaviour { + fn new(config: &Libp2pBlendBackendSettings) -> Self { + BlendBehaviour { + blend: + nomos_blend_network::Behaviour::<SphinxMessage, TokioIntervalStreamProvider>::new( + nomos_blend_network::Config { + duplicate_cache_lifespan: 60, + conn_monitor_settings: config.conn_monitor, + }, + ), + limits: libp2p::connection_limits::Behaviour::new( + ConnectionLimits::default() + .with_max_established(Some(config.max_peering_degree as u32)) + .with_max_established_incoming(Some(config.max_peering_degree as u32)) + .with_max_established_outgoing(Some(config.max_peering_degree as u32)) + // Blend protocol restricts the number of connections per peer to 1. 
+ .with_max_established_per_peer(Some(1)), + ), + blocked_peers: libp2p::allow_block_list::Behaviour::default(), + } + } } #[derive(Debug)] @@ -107,32 +143,27 @@ pub enum BlendSwarmMessage { Publish(Vec<u8>), } -impl BlendSwarm { - fn new<R>( +impl<R> BlendSwarm<R> +where + R: RngCore, +{ + fn new( config: Libp2pBlendBackendSettings, membership: Membership<PeerId, SphinxMessage>, mut rng: R, swarm_messages_receiver: mpsc::Receiver<BlendSwarmMessage>, incoming_message_sender: broadcast::Sender<Vec<u8>>, - ) -> Self - where - R: RngCore, - { + ) -> Self { let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone())); let mut swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() .with_quic() - .with_behaviour(|_| { - nomos_blend_network::Behaviour::<SphinxMessage, TokioIntervalStreamProvider>::new( - nomos_blend_network::Config { - duplicate_cache_lifespan: 60, - conn_monitor_settings: config.conn_monitor, - }, - ) - }) + .with_behaviour(|_| BlendBehaviour::new(&config)) .expect("Blend Behaviour should be built") .with_swarm_config(|cfg| { - cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)) + // The idle timeout starts ticking once there are no active streams on a connection. + // We want the connection to be closed as soon as all streams are dropped. 
+ cfg.with_idle_connection_timeout(Duration::ZERO) }) .build(); @@ -156,6 +187,9 @@ impl BlendSwarm { swarm, swarm_messages_receiver, incoming_message_sender, + membership, + rng, + peering_degree: config.peering_degree, } } @@ -176,7 +210,7 @@ impl BlendSwarm { match msg { BlendSwarmMessage::Publish(msg) => { let msg_size = msg.len(); - if let Err(e) = self.swarm.behaviour_mut().publish(msg) { + if let Err(e) = self.swarm.behaviour_mut().blend.publish(msg) { tracing::error!("Failed to publish message to blend network: {e:?}"); tracing::info!(counter.failed_outbound_messages = 1); } else { @@ -187,9 +221,11 @@ impl BlendSwarm { } } - fn handle_event(&mut self, event: SwarmEvent<nomos_blend_network::Event>) { + fn handle_event(&mut self, event: SwarmEvent<BlendBehaviourEvent>) { match event { - SwarmEvent::Behaviour(nomos_blend_network::Event::Message(msg)) => { + SwarmEvent::Behaviour(BlendBehaviourEvent::Blend( + nomos_blend_network::Event::Message(msg), + )) => { tracing::debug!("Received message from a peer: {msg:?}"); let msg_size = msg.len(); @@ -201,14 +237,71 @@ impl BlendSwarm { tracing::info!(histogram.received_data = msg_size as u64); } } - SwarmEvent::Behaviour(nomos_blend_network::Event::Error(e)) => { + SwarmEvent::Behaviour(BlendBehaviourEvent::Blend( + nomos_blend_network::Event::MaliciousPeer(peer_id), + )) => { + tracing::debug!("Peer {} is malicious", peer_id); + self.swarm.behaviour_mut().blocked_peers.block_peer(peer_id); + self.check_and_dial_new_peers(); + } + SwarmEvent::Behaviour(BlendBehaviourEvent::Blend( + nomos_blend_network::Event::UnhealthyPeer(peer_id), + )) => { + tracing::debug!("Peer {} is unhealthy", peer_id); + self.check_and_dial_new_peers(); + } + SwarmEvent::Behaviour(BlendBehaviourEvent::Blend( + nomos_blend_network::Event::Error(e), + )) => { tracing::error!("Received error from blend network: {e:?}"); + self.check_and_dial_new_peers(); tracing::info!(counter.error = 1); } + SwarmEvent::ConnectionClosed { + peer_id, + 
connection_id, + .. + } => { + tracing::error!( + "Connection closed: peer:{}, conn_id:{}", + peer_id, + connection_id + ); + self.check_and_dial_new_peers(); + } _ => { tracing::debug!("Received event from blend network: {event:?}"); tracing::info!(counter.ignored_event = 1); } } } + + /// Dial new peers, if necessary, to maintain the peering degree. + /// We aim to have at least the peering degree number of "healthy" peers. + fn check_and_dial_new_peers(&mut self) { + let num_new_conns_needed = (self.peering_degree as usize) + .saturating_sub(self.swarm.behaviour().blend.num_healthy_peers()); + if num_new_conns_needed > 0 { + self.dial_random_peers(num_new_conns_needed); + } + } + + /// Dial random peers from the membership list, + /// excluding the currently connected peers and the blocked peers. + fn dial_random_peers(&mut self, amount: usize) { + let exclude_peers: HashSet<PeerId> = self + .swarm + .connected_peers() + .chain(self.swarm.behaviour().blocked_peers.blocked_peers()) + .copied() + .collect(); + self.membership + .filter_and_choose_remote_nodes(&mut self.rng, amount, &exclude_peers) + .iter() + .for_each(|peer| { + if let Err(e) = self.swarm.dial(peer.address.clone()) { + tracing::error!("Failed to dial a peer: {e:?}"); + } + }); + } } diff --git a/nomos-services/data-availability/tests/Cargo.toml b/nomos-services/data-availability/tests/Cargo.toml index 26b9a09b3..71827365d 100644 --- a/nomos-services/data-availability/tests/Cargo.toml +++ b/nomos-services/data-availability/tests/Cargo.toml @@ -27,7 +27,7 @@ nomos-blend-service = { path = "../../blend" } nomos-blend = { path = "../../../nomos-blend/core" } nomos-blend-message = { path = "../../../nomos-blend/message" } nomos-libp2p = { path = "../../../nomos-libp2p" } -libp2p = { version = "0.53.2", features = ["ed25519"] } +libp2p = { version = "0.55", features = ["ed25519"] } once_cell = "1.19" overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" } overwatch-derive = { 
git = "https://github.com/logos-co/Overwatch", rev = "4160cdd" } diff --git a/nomos-services/network/src/backends/libp2p/swarm.rs b/nomos-services/network/src/backends/libp2p/swarm.rs index 5fce1ac95..b10906fdb 100644 --- a/nomos-services/network/src/backends/libp2p/swarm.rs +++ b/nomos-services/network/src/backends/libp2p/swarm.rs @@ -138,7 +138,7 @@ impl SwarmHandler { } Command::Unsubscribe(topic) => { tracing::debug!("unsubscribing to topic: {topic}"); - log_error!(self.swarm.unsubscribe(&topic)); + self.swarm.unsubscribe(&topic); } Command::Info { reply } => { let swarm = self.swarm.swarm(); diff --git a/testnet/Dockerfile b/testnet/Dockerfile index a90d5fea6..b753bff39 100644 --- a/testnet/Dockerfile +++ b/testnet/Dockerfile @@ -1,6 +1,6 @@ # BUILD IMAGE --------------------------------------------------------- -FROM rust:1.82.0-slim-bookworm AS builder +FROM rust:1.84.0-slim-bookworm AS builder WORKDIR /nomos COPY . .