Skip to content

Commit 7608bbc

Browse files
committed
refactor: add nodes field to Disconnect instead of sending ShuffleReply at disconnect
1 parent de765fa commit 7608bbc

File tree

1 file changed

+33
-18
lines changed

1 file changed

+33
-18
lines changed

src/proto/hyparview.rs

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ pub enum Message<PI> {
8686
/// Request to disconnect from a peer.
8787
/// If [`Disconnect::alive`] is true, the other peer is not shutting down, so it should be
8888
/// added to the passive set.
89-
Disconnect(Disconnect),
89+
Disconnect(Disconnect<PI>),
9090
}
9191

9292
/// The time-to-live for this message.
@@ -187,10 +187,12 @@ pub struct NeighborReply {}
187187

188188
/// Message sent when leaving the swarm or closing down to inform peers about us being gone.
189189
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
190-
pub struct Disconnect {
190+
pub struct Disconnect<PI> {
191191
/// Whether we are actually shutting down or closing the connection only because our limits are
192192
/// reached.
193193
alive: bool,
194+
/// A list of nodes from the sender's active and passive views
195+
nodes: Vec<PeerInfo<PI>>,
194196
}
195197

196198
/// Configuration for the swarm membership layer
@@ -358,8 +360,9 @@ where
358360
}
359361

360362
/// We received a disconnect message.
361-
fn on_disconnect(&mut self, peer: PI, details: Disconnect, io: &mut impl IO<PI>) {
363+
fn on_disconnect(&mut self, peer: PI, details: Disconnect<PI>, io: &mut impl IO<PI>) {
362364
self.neighbor_request_failed(peer, io);
365+
363366
if self.active_view.contains(&peer) {
364367
self.remove_active(
365368
&peer,
@@ -371,6 +374,7 @@ where
371374
} else if details.alive && self.passive_view.contains(&peer) {
372375
self.alive_disconnect_peers.insert(peer);
373376
}
377+
self.on_shuffle_nodes(details.nodes, Some(&peer), io);
374378
}
375379

376380
/// A connection was closed by the peer.
@@ -395,9 +399,8 @@ where
395399
// Before disconnecting, send a `ShuffleReply` with some of our nodes to
396400
// prevent the other node from running out of connections. This is especially
397401
// relevant if the other node just joined the swarm.
398-
let len = self.config.shuffle_active_view_count + self.config.shuffle_passive_view_count;
399-
self.send_shuffle_reply(peer, len, io);
400-
let message = Message::Disconnect(Disconnect { alive });
402+
let nodes = self.get_nodes_for_shuffle(None);
403+
let message = Message::Disconnect(Disconnect { alive, nodes });
401404
io.push(OutEvent::SendMessage(peer, message));
402405
io.push(OutEvent::DisconnectPeer(peer));
403406
}
@@ -524,9 +527,7 @@ where
524527
fn on_shuffle(&mut self, from: PI, shuffle: Shuffle<PI>, io: &mut impl IO<PI>) {
525528
if shuffle.ttl.expired() || self.active_view.len() <= 1 {
526529
let len = shuffle.nodes.len();
527-
for node in shuffle.nodes {
528-
self.add_passive(node.id, node.data, io);
529-
}
530+
self.on_shuffle_nodes(shuffle.nodes, None, io);
530531
self.send_shuffle_reply(shuffle.origin, len, io);
531532
} else if let Some(node) = self
532533
.active_view
@@ -541,7 +542,28 @@ where
541542
}
542543
}
543544

545+
fn on_shuffle_nodes(
546+
&mut self,
547+
nodes: Vec<PeerInfo<PI>>,
548+
skip_peer: Option<&PI>,
549+
io: &mut impl IO<PI>,
550+
) {
551+
for node in nodes {
552+
self.add_passive(node.id, node.data, io);
553+
}
554+
self.refill_active_from_passive(skip_peer, io);
555+
}
556+
544557
fn send_shuffle_reply(&mut self, to: PI, len: usize, io: &mut impl IO<PI>) {
558+
let nodes = self.get_nodes_for_shuffle(Some(len));
559+
let message = Message::ShuffleReply(ShuffleReply { nodes });
560+
io.push(OutEvent::SendMessage(to, message));
561+
}
562+
563+
fn get_nodes_for_shuffle(&mut self, len: Option<usize>) -> Vec<PeerInfo<PI>> {
564+
let len = len.unwrap_or_else(|| {
565+
self.config.shuffle_active_view_count + self.config.shuffle_passive_view_count
566+
});
545567
let mut nodes = self.passive_view.shuffled_and_capped(len, &mut self.rng);
546568
// If we don't have enough passive nodes for the expected length, we fill with
547569
// active nodes.
@@ -551,18 +573,11 @@ where
551573
.shuffled_and_capped(len - nodes.len(), &mut self.rng),
552574
);
553575
}
554-
let nodes = nodes.into_iter().map(|id| self.peer_info(&id));
555-
let message = Message::ShuffleReply(ShuffleReply {
556-
nodes: nodes.collect(),
557-
});
558-
io.push(OutEvent::SendMessage(to, message));
576+
nodes.into_iter().map(|id| self.peer_info(&id)).collect()
559577
}
560578

561579
fn on_shuffle_reply(&mut self, message: ShuffleReply<PI>, io: &mut impl IO<PI>) {
562-
for node in message.nodes {
563-
self.add_passive(node.id, node.data, io);
564-
}
565-
self.refill_active_from_passive(None, io);
580+
self.on_shuffle_nodes(message.nodes, None, io);
566581
}
567582

568583
fn handle_shuffle_timer(&mut self, io: &mut impl IO<PI>) {

0 commit comments

Comments
 (0)