@@ -63,7 +63,7 @@ pub(crate) struct RemoteMap {
6363 disco : DiscoState ,
6464 sender : TransportsSender ,
6565 discovery : ConcurrentDiscovery ,
66- actor_tasks : Mutex < JoinSet < Vec < RemoteStateMessage > > > ,
66+ actor_tasks : Mutex < JoinSet < ( EndpointId , Vec < RemoteStateMessage > ) > > ,
6767}
6868
6969impl RemoteMap {
@@ -104,7 +104,22 @@ impl RemoteMap {
104104 senders. retain ( |_eid, sender| !sender. is_closed ( ) ) ;
105105 while let Some ( result) = self . actor_tasks . lock ( ) . expect ( "poisoned" ) . try_join_next ( ) {
106106 match result {
107- Ok ( leftover_msgs) => debug ! ( ?leftover_msgs, "TODO: handle leftover messages" ) ,
107+ Ok ( ( eid, leftover_msgs) ) => {
108+ let entry = senders. entry ( eid) ;
109+ if leftover_msgs. is_empty ( ) {
110+ match entry {
111+ hash_map:: Entry :: Occupied ( occupied_entry) => occupied_entry. remove ( ) ,
112+ hash_map:: Entry :: Vacant ( _) => {
113+ panic ! ( "this should be impossible TODO(matheus23)" ) ;
114+ }
115+ } ;
116+ } else {
117+ // The remote actor got messages while it was closing, so we're restarting
118+ debug ! ( %eid, "restarting terminated remote state actor: messages received during shutdown" ) ;
119+ let sender = self . start_remote_state_actor ( eid, leftover_msgs) ;
120+ entry. insert_entry ( sender) ;
121+ }
122+ }
108123 Err ( err) => {
109124 if let Ok ( panic) = err. try_into_panic ( ) {
110125 error ! ( "RemoteStateActor panicked." ) ;
@@ -125,7 +140,7 @@ impl RemoteMap {
125140 match handles. entry ( eid) {
126141 hash_map:: Entry :: Occupied ( entry) => entry. get ( ) . clone ( ) ,
127142 hash_map:: Entry :: Vacant ( entry) => {
128- let sender = self . start_remote_state_actor ( eid) ;
143+ let sender = self . start_remote_state_actor ( eid, vec ! [ ] ) ;
129144 entry. insert ( sender. clone ( ) ) ;
130145 sender
131146 }
@@ -135,7 +150,11 @@ impl RemoteMap {
135150 /// Starts a new remote state actor and returns a handle and a sender.
136151 ///
137152 /// The handle is not inserted into the endpoint map, this must be done by the caller of this function.
138- fn start_remote_state_actor ( & self , eid : EndpointId ) -> mpsc:: Sender < RemoteStateMessage > {
153+ fn start_remote_state_actor (
154+ & self ,
155+ eid : EndpointId ,
156+ initial_msgs : Vec < RemoteStateMessage > ,
157+ ) -> mpsc:: Sender < RemoteStateMessage > {
139158 // Ensure there is a RemoteMappedAddr for this EndpointId.
140159 self . endpoint_mapped_addrs . get ( & eid) ;
141160 RemoteStateActor :: new (
@@ -148,7 +167,10 @@ impl RemoteMap {
148167 self . sender . clone ( ) ,
149168 self . discovery . clone ( ) ,
150169 )
151- . start ( self . actor_tasks . lock ( ) . expect ( "poisoned" ) . deref_mut ( ) )
170+ . start (
171+ initial_msgs,
172+ self . actor_tasks . lock ( ) . expect ( "poisoned" ) . deref_mut ( ) ,
173+ )
152174 }
153175
154176 pub ( super ) fn handle_ping ( & self , msg : disco:: Ping , sender : EndpointId , src : transports:: Addr ) {
0 commit comments