Skip to content

Commit 4d5f13c

Browse files
committed
refactor: add nodes field to Disconnect instead of sending ShuffleReply at disconnect
1 parent 23fc931 commit 4d5f13c

File tree

1 file changed

+33
-18
lines changed

1 file changed

+33
-18
lines changed

src/proto/hyparview.rs

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ pub enum Message<PI> {
8787
/// Request to disconnect from a peer.
8888
/// If [`Disconnect::alive`] is true, the other peer is not shutting down, so it should be
8989
/// added to the passive set.
90-
Disconnect(Disconnect),
90+
Disconnect(Disconnect<PI>),
9191
}
9292

9393
/// The time-to-live for this message.
@@ -188,10 +188,12 @@ pub struct NeighborReply {}
188188

189189
/// Message sent when leaving the swarm or closing down to inform peers about us being gone.
190190
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
191-
pub struct Disconnect {
191+
pub struct Disconnect<PI> {
192192
/// Whether we are actually shutting down or closing the connection only because our limits are
193193
/// reached.
194194
alive: bool,
195+
/// A list of nodes from the sender's active and passive views
196+
nodes: Vec<PeerInfo<PI>>,
195197
}
196198

197199
/// Configuration for the swarm membership layer
@@ -359,8 +361,9 @@ where
359361
}
360362

361363
/// We received a disconnect message.
362-
fn on_disconnect(&mut self, peer: PI, details: Disconnect, io: &mut impl IO<PI>) {
364+
fn on_disconnect(&mut self, peer: PI, details: Disconnect<PI>, io: &mut impl IO<PI>) {
363365
self.neighbor_request_failed(peer, io);
366+
364367
if self.active_view.contains(&peer) {
365368
self.remove_active(
366369
&peer,
@@ -372,6 +375,7 @@ where
372375
} else if details.alive && self.passive_view.contains(&peer) {
373376
self.alive_disconnect_peers.insert(peer);
374377
}
378+
self.on_shuffle_nodes(details.nodes, Some(&peer), io);
375379
}
376380

377381
/// A connection was closed by the peer.
@@ -396,9 +400,8 @@ where
396400
// Before disconnecting, send a `ShuffleReply` with some of our nodes to
397401
// prevent the other node from running out of connections. This is especially
398402
// relevant if the other node just joined the swarm.
399-
let len = self.config.shuffle_active_view_count + self.config.shuffle_passive_view_count;
400-
self.send_shuffle_reply(peer, len, io);
401-
let message = Message::Disconnect(Disconnect { alive });
403+
let nodes = self.get_nodes_for_shuffle(None);
404+
let message = Message::Disconnect(Disconnect { alive, nodes });
402405
io.push(OutEvent::SendMessage(peer, message));
403406
io.push(OutEvent::DisconnectPeer(peer));
404407
}
@@ -525,9 +528,7 @@ where
525528
fn on_shuffle(&mut self, from: PI, shuffle: Shuffle<PI>, io: &mut impl IO<PI>) {
526529
if shuffle.ttl.expired() || self.active_view.len() <= 1 {
527530
let len = shuffle.nodes.len();
528-
for node in shuffle.nodes {
529-
self.add_passive(node.id, node.data, io);
530-
}
531+
self.on_shuffle_nodes(shuffle.nodes, None, io);
531532
self.send_shuffle_reply(shuffle.origin, len, io);
532533
} else if let Some(node) = self
533534
.active_view
@@ -542,7 +543,28 @@ where
542543
}
543544
}
544545

546+
fn on_shuffle_nodes(
547+
&mut self,
548+
nodes: Vec<PeerInfo<PI>>,
549+
skip_peer: Option<&PI>,
550+
io: &mut impl IO<PI>,
551+
) {
552+
for node in nodes {
553+
self.add_passive(node.id, node.data, io);
554+
}
555+
self.refill_active_from_passive(skip_peer, io);
556+
}
557+
545558
fn send_shuffle_reply(&mut self, to: PI, len: usize, io: &mut impl IO<PI>) {
559+
let nodes = self.get_nodes_for_shuffle(Some(len));
560+
let message = Message::ShuffleReply(ShuffleReply { nodes });
561+
io.push(OutEvent::SendMessage(to, message));
562+
}
563+
564+
fn get_nodes_for_shuffle(&mut self, len: Option<usize>) -> Vec<PeerInfo<PI>> {
565+
let len = len.unwrap_or_else(|| {
566+
self.config.shuffle_active_view_count + self.config.shuffle_passive_view_count
567+
});
546568
let mut nodes = self.passive_view.shuffled_and_capped(len, &mut self.rng);
547569
// If we don't have enough passive nodes for the expected length, we fill with
548570
// active nodes.
@@ -552,18 +574,11 @@ where
552574
.shuffled_and_capped(len - nodes.len(), &mut self.rng),
553575
);
554576
}
555-
let nodes = nodes.into_iter().map(|id| self.peer_info(&id));
556-
let message = Message::ShuffleReply(ShuffleReply {
557-
nodes: nodes.collect(),
558-
});
559-
io.push(OutEvent::SendMessage(to, message));
577+
nodes.into_iter().map(|id| self.peer_info(&id)).collect()
560578
}
561579

562580
fn on_shuffle_reply(&mut self, message: ShuffleReply<PI>, io: &mut impl IO<PI>) {
563-
for node in message.nodes {
564-
self.add_passive(node.id, node.data, io);
565-
}
566-
self.refill_active_from_passive(None, io);
581+
self.on_shuffle_nodes(message.nodes, None, io);
567582
}
568583

569584
fn handle_shuffle_timer(&mut self, io: &mut impl IO<PI>) {

0 commit comments

Comments
 (0)