Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

pub mod deny_list;
mod either;
pub mod toggle;

Expand Down
60 changes: 60 additions & 0 deletions swarm/src/behaviour/deny_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use crate::behaviour::THandlerInEvent;
use crate::{dummy, CloseConnection, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{ConnectedPoint, PeerId};
use std::collections::{HashSet, VecDeque};
use std::task::{Context, Poll};

#[derive(Default)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[derive(Default)]
#[derive(Default, Debug, Clone)]

pub struct Behaviour {
list: HashSet<PeerId>,
to_disconnect: VecDeque<PeerId>,
}

impl Behaviour {
pub fn add_peer(&mut self, peer: PeerId) {
self.list.insert(peer);
Comment on lines +8 to +15
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub struct Behaviour {
list: HashSet<PeerId>,
to_disconnect: VecDeque<PeerId>,
}
impl Behaviour {
pub fn add_peer(&mut self, peer: PeerId) {
self.list.insert(peer);
pub struct Behaviour<T = ()> {
list: HashMap<PeerId, T>,
to_disconnect: VecDeque<PeerId>,
}
impl Behaviour<()> {
pub fn add_peer(&mut self, peer: PeerId) {
self.list.insert(peer, ());
self.to_disconnect.push_back(peer);
}
}
impl<T> Behaviour<T> {
pub fn add_peer_because(&mut self, peer: PeerId, reason: T) {
self.list.insert(peer, reason);

We could allow an optional reason parameter that we'd then also include in the Denied struct. Might be useful if there are multiple reasons why a peer can be banned.
If we'd additionally expose what peers are currently banned (and for what reason) it would allow a more sophisticated management of banned peers. E.g. if a peer was only banned because an Application-Layer limit was reached.

But not sure if it's worth the extra complexity. Could wait until there is actually a use-case for this.

self.to_disconnect.push_back(peer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also lead to waking up the swarm. Perhaps using an async channel is the easiest and most readable solution? (instead of rebuilding such functionality with a VecDeque and an Option<Waker>)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mxinden was talking about building a WakingVec abstraction because we are coming across this very often actually.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a quick search on crates.io but couldn't find anything that does what we need. It might be possible that I used the wrong keywords!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a PoC here: #3160

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While that may have independent merit (not yet reviewed), I meant simply using async-channel instead of the VecDeque.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While that may have independent merit (not yet reviewed), I meant simply using async-channel instead of the VecDeque.

I always find it confusing when the same data structure holds both ends of a channel.

But I agree with you that it would solve the problem.

}

pub fn remove_peer(&mut self, peer: PeerId) {
self.list.remove(&peer);
}
}

#[derive(Debug, thiserror::Error)]
#[error("peer is on a deny list")]
pub struct Denied;

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = dummy::ConnectionHandler;
type ConnectionDenied = Denied;
type OutEvent = ();

fn new_handler(
&mut self,
peer: &PeerId,
_: &ConnectedPoint,
) -> Result<Self::ConnectionHandler, Self::ConnectionDenied> {
if self.list.contains(peer) {
return Err(Denied);
}

Ok(dummy::ConnectionHandler)
}

fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self::ConnectionHandler>>>
{
if let Some(peer_id) = self.to_disconnect.pop_front() {
return Poll::Ready(NetworkBehaviourAction::CloseConnection {
peer_id,
connection: CloseConnection::All,
});
}

Poll::Pending
}
}
216 changes: 27 additions & 189 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,6 @@ where
/// similar mechanisms.
external_addrs: Addresses,

/// List of nodes for which we deny any incoming connection.
banned_peers: HashSet<PeerId>,

/// Connections for which we withhold any reporting. These belong to banned peers.
///
/// Note: Connections to a peer that are established at the time of banning that peer
Expand Down Expand Up @@ -524,14 +521,6 @@ where
return Err(DialError::DialPeerConditionFalse(condition));
}

// Check if peer is banned.
if self.banned_peers.contains(&peer_id) {
let error = DialError::Banned;
#[allow(deprecated)]
self.behaviour.inject_dial_failure(Some(peer_id), &error);
return Err(error);
}
Comment on lines -527 to -533
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We lose this early check here, i.e. we will initiate a dial and then immediately drop the connection instead of aborting the connection early. I guess that is the price for implementing this kind of functionality in a generic way.


// Retrieve the addresses to dial.
let addresses = {
let mut addresses = match swarm_dial_opts.0 {
Expand Down Expand Up @@ -717,25 +706,6 @@ where
}
}

/// Bans a peer by its peer ID.
///
/// Any incoming connection and any dialing attempt will immediately be rejected.
/// This function has no effect if the peer is already banned.
pub fn ban_peer_id(&mut self, peer_id: PeerId) {
if self.banned_peers.insert(peer_id) {
// Note that established connections to the now banned peer are closed but not
// added to [`Swarm::banned_peer_connections`]. They have been previously reported
// as open to the behaviour and need be reported as closed once closing the
// connection finishes.
self.pool.disconnect(peer_id);
}
}

/// Unbans a peer.
pub fn unban_peer_id(&mut self, peer_id: PeerId) {
self.banned_peers.remove(&peer_id);
}

/// Disconnects a peer by its peer ID, closing all connections to said peer.
///
/// Returns `Ok(())` if there was one or more established connections to the peer.
Expand Down Expand Up @@ -797,49 +767,41 @@ where
concurrent_dial_errors,
supported_protocols,
} => {
if self.banned_peers.contains(&peer_id) {
// Mark the connection for the banned peer as banned, thus withholding any
// future events from the connection to the behaviour.
self.banned_peer_connections.insert(id);
self.pool.disconnect(peer_id);
return Some(SwarmEvent::BannedPeer { peer_id, endpoint });
} else {
let num_established = NonZeroU32::new(
u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
)
.expect("n + 1 is always non-zero; qed");
let non_banned_established = other_established_connection_ids
.into_iter()
.filter(|conn_id| !self.banned_peer_connections.contains(conn_id))
.count();
let num_established = NonZeroU32::new(
u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
)
.expect("n + 1 is always non-zero; qed");
let non_banned_established = other_established_connection_ids
.into_iter()
.filter(|conn_id| !self.banned_peer_connections.contains(conn_id))
.count();

log::debug!(
log::debug!(
"Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}",
peer_id,
endpoint,
num_established,
non_banned_established + 1,
);
let failed_addresses = concurrent_dial_errors
.as_ref()
.map(|es| es.iter().map(|(a, _)| a).cloned().collect());
#[allow(deprecated)]
self.behaviour.inject_connection_established(
&peer_id,
&id,
&endpoint,
failed_addresses.as_ref(),
non_banned_established,
);
self.supported_protocols = supported_protocols;
let failed_addresses = concurrent_dial_errors
.as_ref()
.map(|es| es.iter().map(|(a, _)| a).cloned().collect());
#[allow(deprecated)]
self.behaviour.inject_connection_established(
&peer_id,
&id,
&endpoint,
failed_addresses.as_ref(),
non_banned_established,
);
self.supported_protocols = supported_protocols;

return Some(SwarmEvent::ConnectionEstablished {
peer_id,
num_established,
endpoint,
concurrent_dial_errors,
});
}
return Some(SwarmEvent::ConnectionEstablished {
peer_id,
num_established,
endpoint,
concurrent_dial_errors,
});
}
PoolEvent::PendingOutboundConnectionError { id: _, error, peer } => {
let error = error.into();
Expand Down Expand Up @@ -1596,7 +1558,6 @@ where
supported_protocols: Default::default(),
listened_addrs: HashMap::new(),
external_addrs: Addresses::default(),
banned_peers: HashSet::new(),
banned_peer_connections: HashSet::new(),
pending_event: None,
}
Expand Down Expand Up @@ -1833,129 +1794,6 @@ mod tests {
&& !swarm2.is_connected(swarm1.local_peer_id())
}

/// Establishes multiple connections between two peers,
/// after which one peer bans the other.
///
/// The test expects both behaviours to be notified via pairs of
/// [`NetworkBehaviour::inject_connection_established`] / [`NetworkBehaviour::inject_connection_closed`]
/// calls while unbanned.
///
/// While the ban is in effect, further dials occur. For these connections no
/// [`NetworkBehaviour::inject_connection_established`], [`NetworkBehaviour::inject_connection_closed`]
/// calls should be registered.
#[test]
fn test_connect_disconnect_ban() {
// Since the test does not try to open any substreams, we can
// use the dummy protocols handler.
let handler_proto = keep_alive::ConnectionHandler;

let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build();
let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build();

let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

swarm1.listen_on(addr1).unwrap();
swarm2.listen_on(addr2.clone()).unwrap();

let swarm1_id = *swarm1.local_peer_id();

enum Stage {
/// Waiting for the peers to connect. Banning has not occurred.
Connecting,
/// Ban occurred.
Banned,
// Ban is in place and a dial is ongoing.
BannedDial,
// Mid-ban dial was registered and the peer was unbanned.
Unbanned,
// There are dial attempts ongoing for the no longer banned peers.
Reconnecting,
}

let num_connections = 10;

for _ in 0..num_connections {
swarm1.dial(addr2.clone()).unwrap();
}

let mut s1_expected_conns = num_connections;
let mut s2_expected_conns = num_connections;

let mut stage = Stage::Connecting;

executor::block_on(future::poll_fn(move |cx| loop {
let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
match stage {
Stage::Connecting => {
if swarm1.behaviour.assert_connected(s1_expected_conns, 1)
&& swarm2.behaviour.assert_connected(s2_expected_conns, 1)
{
// Setup to test that already established connections are correctly closed
// and reported as such after the peer is banned.
swarm2.ban_peer_id(swarm1_id);
stage = Stage::Banned;
}
}
Stage::Banned => {
if swarm1.behaviour.assert_disconnected(s1_expected_conns, 1)
&& swarm2.behaviour.assert_disconnected(s2_expected_conns, 1)
{
// Setup to test that new connections of banned peers are not reported.
swarm1.dial(addr2.clone()).unwrap();
s1_expected_conns += 1;
stage = Stage::BannedDial;
}
}
Stage::BannedDial => {
if swarm2.network_info().num_peers() == 1 {
// The banned connection was established. Check that it was not reported to
// the behaviour of the banning swarm.
assert_eq!(
swarm2.behaviour.on_connection_established.len(), s2_expected_conns,
"No additional closed connections should be reported for the banned peer"
);

// Setup to test that the banned connection is not reported upon closing
// even if the peer is unbanned.
swarm2.unban_peer_id(swarm1_id);
stage = Stage::Unbanned;
}
}
Stage::Unbanned => {
if swarm2.network_info().num_peers() == 0 {
// The banned connection has closed. Check that it was not reported.
assert_eq!(
swarm2.behaviour.on_connection_closed.len(), s2_expected_conns,
"No additional closed connections should be reported for the banned peer"
);
assert!(swarm2.banned_peer_connections.is_empty());

// Setup to test that a ban lifted does not affect future connections.
for _ in 0..num_connections {
swarm1.dial(addr2.clone()).unwrap();
}
s1_expected_conns += num_connections;
s2_expected_conns += num_connections;
stage = Stage::Reconnecting;
}
}
Stage::Reconnecting => {
if swarm1.behaviour.on_connection_established.len() == s1_expected_conns
&& swarm2.behaviour.assert_connected(s2_expected_conns, 2)
{
return Poll::Ready(());
}
}
}

if poll1.is_pending() && poll2.is_pending() {
return Poll::Pending;
}
}))
}

/// Establishes multiple connections between two peers,
/// after which one peer disconnects the other using [`Swarm::disconnect_peer_id`].
///
Expand Down
23 changes: 0 additions & 23 deletions swarm/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,29 +204,6 @@ where
.count()
}

/// Checks that when the expected number of closed connection notifications are received, a
/// given number of expected disconnections have been received as well.
///
/// Returns if the first condition is met.
pub fn assert_disconnected(
&self,
expected_closed_connections: usize,
expected_disconnections: usize,
) -> bool {
if self.on_connection_closed.len() == expected_closed_connections {
assert_eq!(
self.on_connection_closed
.iter()
.filter(|(.., remaining_established)| { *remaining_established == 0 })
.count(),
expected_disconnections
);
return true;
}

false
}

/// Checks that when the expected number of established connection notifications are received,
/// a given number of expected connections have been received as well.
///
Expand Down