@@ -26,7 +26,7 @@ use n0_future::{
2626use rand:: rngs:: StdRng ;
2727use rand_core:: SeedableRng ;
2828use serde:: { Deserialize , Serialize } ;
29- use tokio:: sync:: mpsc;
29+ use tokio:: sync:: { mpsc, oneshot } ;
3030use tokio_util:: sync:: CancellationToken ;
3131use tracing:: { debug, error, error_span, trace, warn, Instrument } ;
3232
@@ -162,6 +162,15 @@ impl ProtocolHandler for Gossip {
162162 Ok ( ( ) )
163163 } )
164164 }
165+
166+ fn shutdown ( & self ) -> BoxFuture < ( ) > {
167+ let this = self . clone ( ) ;
168+ Box :: pin ( async move {
169+ if let Err ( err) = this. shutdown ( ) . await {
170+ warn ! ( "error while shutting down gossip: {err:#}" ) ;
171+ }
172+ } )
173+ }
165174}
166175
167176/// Builder to configure and construct [`Gossip`].
@@ -323,6 +332,17 @@ impl Gossip {
323332 pub fn metrics ( & self ) -> & Arc < Metrics > {
324333 & self . inner . metrics
325334 }
335+
336+ /// Shutdown the gossip instance.
337+ ///
338+ /// This leaves all topics, sending `Disconnect` messages to peers, and then
339+ /// stops the gossip actor loop and drops all state and connections.
340+ pub async fn shutdown ( & self ) -> anyhow:: Result < ( ) > {
341+ let ( reply, reply_rx) = oneshot:: channel ( ) ;
342+ self . inner . send ( ToActor :: Shutdown { reply } ) . await ?;
343+ reply_rx. await ?;
344+ Ok ( ( ) )
345+ }
326346}
327347
328348impl Inner {
@@ -457,6 +477,9 @@ enum ToActor {
457477 topic : TopicId ,
458478 receiver_id : ReceiverId ,
459479 } ,
480+ Shutdown {
481+ reply : oneshot:: Sender < ( ) > ,
482+ } ,
460483}
461484
462485/// Actor that sends and handles messages between the connection and main state loops
@@ -590,7 +613,18 @@ impl Actor {
590613 trace!( ?i, "tick: to_actor_rx" ) ;
591614 self . metrics. actor_tick_rx. inc( ) ;
592615 match msg {
593- Some ( msg) => self . handle_to_actor_msg( msg, Instant :: now( ) ) . await ?,
616+ Some ( msg) => {
617+ if let ToActor :: Shutdown { reply } = msg {
618+ debug!( "received shutdown message, quit all topics" ) ;
619+ self . quit_queue. extend( self . topics. keys( ) . copied( ) ) ;
620+ self . process_quit_queue( ) . await . ok( ) ;
621+ debug!( "all topics quit, stop gossip actor" ) ;
622+ reply. send( ( ) ) . ok( ) ;
623+ return Ok ( None )
624+ } else {
625+ self . handle_to_actor_msg( msg, Instant :: now( ) ) . await ?;
626+ }
627+ }
594628 None => {
595629 debug!( "all gossip handles dropped, stop gossip actor" ) ;
596630 return Ok ( None )
@@ -818,6 +852,7 @@ impl Actor {
818852 ToActor :: ReceiverGone { topic, receiver_id } => {
819853 self . handle_receiver_gone ( topic, receiver_id) . await ?;
820854 }
855+ ToActor :: Shutdown { .. } => unreachable ! ( "handled in main loop" ) ,
821856 }
822857 Ok ( ( ) )
823858 }
0 commit comments