Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(blend): establish new connections, or deny new streams #989

Merged
merged 1 commit into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# BUILD IMAGE ---------------------------------------------------------

FROM rust:1.82.0-slim-bookworm AS builder
FROM rust:1.84.0-slim-bookworm AS builder
Copy link
Contributor Author

@youngjoon-lee youngjoon-lee Jan 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

libp2p v0.55 requires Rust >=1.83.

Why libp2p v0.55? Please see #989 (comment)


WORKDIR /nomos
COPY . .
Expand Down
2 changes: 1 addition & 1 deletion ci/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.82.0-slim-bookworm
FROM rust:1.84.0-slim-bookworm

LABEL maintainer="[email protected]" \
source="https://github.com/logos-co/nomos-node" \
Expand Down
4 changes: 2 additions & 2 deletions nomos-blend/core/src/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ where
&self,
rng: &mut R,
amount: usize,
exclude_addrs: &HashSet<Multiaddr>,
exclude_peers: &HashSet<NodeId>,
) -> Vec<&Node<NodeId, M::PublicKey>> {
self.remote_nodes
.iter()
.filter(|node| !exclude_addrs.contains(&node.address))
.filter(|node| !exclude_peers.contains(&node.id))
.choose_multiple(rng, amount)
}

Expand Down
4 changes: 2 additions & 2 deletions nomos-blend/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"
cached = "0.53.1"
futures = "0.3.30"
futures-timer = "3.0.3"
libp2p = "0.53"
libp2p = "0.55"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To use all functions of libp2p-allow-block-list, we need to upgrade libp2p to 0.55: libp2p/rust-libp2p@e63975d.

tracing = "0.1"
nomos-blend = { path = "../core" }
nomos-blend-message = { path = "../message" }
Expand All @@ -20,7 +20,7 @@ tokio-stream = { version = "0.1", optional = true }
[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
tokio-stream = "0.1"
libp2p = { version = "0.53", features = ["ed25519", "tokio", "quic"] }
libp2p = { version = "0.55", features = ["ed25519", "tokio", "quic"] }
tracing-subscriber = "0.3.18"
fixed = "1"

Expand Down
144 changes: 79 additions & 65 deletions nomos-blend/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,18 @@ use crate::{
};
use cached::{Cached, TimedCache};
use libp2p::{
core::Endpoint,
core::{transport::PortUse, Endpoint},
swarm::{
behaviour::ConnectionEstablished, ConnectionClosed, ConnectionDenied, ConnectionId,
FromSwarm, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent, THandlerOutEvent,
ToSwarm,
ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour,
NotifyHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
},
Multiaddr, PeerId,
};
use nomos_blend::conn_maintenance::{ConnectionMonitor, ConnectionMonitorSettings};
use nomos_blend_message::BlendMessage;
use sha2::{Digest, Sha256};
use std::{
collections::{HashSet, VecDeque},
collections::{HashMap, VecDeque},
task::{Context, Poll, Waker},
};
use std::{marker::PhantomData, time::Duration};
Expand All @@ -30,7 +29,7 @@ where
IntervalProvider: IntervalStreamProvider,
{
config: Config,
peers: HashSet<PeerId>,
negotiated_peers: HashMap<PeerId, NegotiatedPeerState>,
/// Queue of events to yield to the swarm.
events: VecDeque<ToSwarm<Event, FromBehaviour>>,
/// Waker that handles polling
Expand All @@ -42,6 +41,12 @@ where
_interval_provider: PhantomData<IntervalProvider>,
}

#[derive(Debug, Eq, PartialEq)]
enum NegotiatedPeerState {
Healthy,
Unhealthy,
}

#[derive(Debug)]
pub struct Config {
pub duplicate_cache_lifespan: u64,
Expand All @@ -52,6 +57,10 @@ pub struct Config {
pub enum Event {
/// A message received from one of the peers.
Message(Vec<u8>),
/// A peer has been detected as malicious.
MaliciousPeer(PeerId),
/// A peer has been detected as unhealthy.
UnhealthyPeer(PeerId),
Error(Error),
}

Expand All @@ -65,7 +74,7 @@ where
let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan);
Self {
config,
peers: HashSet::new(),
negotiated_peers: HashMap::new(),
events: VecDeque::new(),
waker: None,
duplicate_cache,
Expand Down Expand Up @@ -103,30 +112,29 @@ where
message: Vec<u8>,
excluded_peer: Option<PeerId>,
) -> Result<(), Error> {
let mut peer_ids = self.peers.clone();
if let Some(peer) = &excluded_peer {
peer_ids.remove(peer);
}

if peer_ids.is_empty() {
return Err(Error::NoPeers);
}

peer_ids.into_iter().for_each(|peer_id| {
tracing::debug!("Registering event for peer {:?} to send msg", peer_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: FromBehaviour::Message(message.clone()),
let mut num_peers = 0;
self.negotiated_peers
.keys()
.filter(|peer_id| match excluded_peer {
Some(excluded_peer) => **peer_id != excluded_peer,
None => true,
})
.for_each(|peer_id| {
tracing::debug!("Registering event for peer {:?} to send msg", peer_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::Any,
event: FromBehaviour::Message(message.clone()),
});
num_peers += 1;
});
});

self.try_wake();
Ok(())
}

pub fn peers(&self) -> &HashSet<PeerId> {
&self.peers
if num_peers == 0 {
Err(Error::NoPeers)
} else {
self.try_wake();
Ok(())
}
}

fn message_id(message: &[u8]) -> Vec<u8> {
Expand All @@ -135,6 +143,13 @@ where
hasher.finalize().to_vec()
}

pub fn num_healthy_peers(&self) -> usize {
self.negotiated_peers
.iter()
.filter(|(_, state)| **state == NegotiatedPeerState::Healthy)
.count()
}

fn create_connection_handler(&self) -> BlendConnectionHandler<M> {
let monitor = self.config.conn_monitor_settings.as_ref().map(|settings| {
ConnectionMonitor::new(
Expand Down Expand Up @@ -177,35 +192,31 @@ where
_: PeerId,
_: &Multiaddr,
_: Endpoint,
_: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(self.create_connection_handler())
}

/// Informs the behaviour about an event from the [`Swarm`].
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished { .. }) => {
// TODO: Notify the connection handler to deny the stream if necessary
// - if max peering degree was reached.
// - if the peer has been detected as malicious.
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
remaining_established,
..
}) => {
// This event happens in one of the following cases:
// 1. The connection was closed by the peer.
// 2. The connection was closed by the local node since no stream is active.
//
// In both cases, we need to remove the peer from the list of connected peers,
// though it may be already removed from list by handling other events.
if remaining_established == 0 {
self.peers.remove(&peer_id);
}
if let FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
remaining_established,
..
}) = event
{
// This event happens in one of the following cases:
// 1. The connection was closed by the peer.
// 2. The connection was closed by the local node since no stream is active.
//
// In both cases, we need to remove the peer from the list of connected peers,
// though it may be already removed from list by handling other events.
if remaining_established == 0 {
self.negotiated_peers.remove(&peer_id);
}
_ => {}
}
};

self.try_wake();
}

/// Handles an event generated by the [`BlendConnectionHandler`]
Expand Down Expand Up @@ -245,28 +256,31 @@ where
self.events
.push_back(ToSwarm::GenerateEvent(Event::Message(message)));
}
// The connection was fully negotiated by the peer,
// The inbound/outbound connection was fully negotiated by the peer,
// which means that the peer supports the blend protocol.
ToBehaviour::FullyNegotiatedOutbound => {
self.peers.insert(peer_id);
ToBehaviour::FullyNegotiatedInbound | ToBehaviour::FullyNegotiatedOutbound => {
self.negotiated_peers
.insert(peer_id, NegotiatedPeerState::Healthy);
}
ToBehaviour::NegotiationFailed => {
self.peers.remove(&peer_id);
ToBehaviour::DialUpgradeError(_) => {
self.negotiated_peers.remove(&peer_id);
}
ToBehaviour::MaliciousPeer => {
// TODO: Remove the peer from the connected peer list
// and add it to the malicious peer list,
// so that the peer is excluded from the future connection establishments.
// Also, notify the upper layer to try to dial new peers
// if we need more healthy peers.
tracing::debug!("Peer {:?} has been detected as malicious", peer_id);
self.negotiated_peers.remove(&peer_id);
self.events
.push_back(ToSwarm::GenerateEvent(Event::MaliciousPeer(peer_id)));
}
ToBehaviour::UnhealthyPeer => {
// TODO: Remove the peer from the connected 'healthy' peer list.
// Also, notify the upper layer to try to dial new peers
// if we need more healthy peers.
tracing::debug!("Peer {:?} has been detected as unhealthy", peer_id);
// TODO: Still the algorithm to revert the peer to healthy state is not defined yet.
self.negotiated_peers
.insert(peer_id, NegotiatedPeerState::Unhealthy);
self.events
.push_back(ToSwarm::GenerateEvent(Event::UnhealthyPeer(peer_id)));
}
ToBehaviour::IOError(error) => {
self.peers.remove(&peer_id);
self.negotiated_peers.remove(&peer_id);
self.events
.push_back(ToSwarm::GenerateEvent(Event::Error(Error::PeerIOError {
error,
Expand Down
Loading
Loading