Skip to content

Commit f857404

Browse files
committed
Replace RemoteStateHandle with storing a JoinSet in RemoteMap
1 parent e98b835 commit f857404

File tree

2 files changed

+49
-42
lines changed

2 files changed

+49
-42
lines changed

iroh/src/magicsock/remote_map.rs

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,22 @@ use std::{
22
collections::{BTreeSet, hash_map},
33
hash::Hash,
44
net::{IpAddr, SocketAddr},
5+
ops::DerefMut,
56
sync::{Arc, Mutex},
67
time::Duration,
78
};
89

9-
use iroh_base::{EndpointId, RelayUrl};
10+
use iroh_base::{EndpointAddr, EndpointId, RelayUrl};
11+
use n0_future::task::JoinSet;
1012
use rustc_hash::FxHashMap;
1113
use serde::{Deserialize, Serialize};
1214
use tokio::sync::mpsc;
13-
use tracing::warn;
15+
use tracing::{debug, error, warn};
1416

1517
pub(crate) use self::remote_state::PathsWatcher;
18+
use self::remote_state::RemoteStateActor;
1619
pub(super) use self::remote_state::RemoteStateMessage;
1720
pub use self::remote_state::{PathInfo, PathInfoList};
18-
use self::remote_state::{RemoteStateActor, RemoteStateHandle};
1921
use super::{
2022
DirectAddr, DiscoState, MagicsockMetrics,
2123
mapped_addrs::{AddrMap, EndpointIdMappedAddr, RelayMappedAddr},
@@ -45,7 +47,7 @@ pub(crate) struct RemoteMap {
4547
// State we keep about remote endpoints.
4648
//
4749
/// The actors tracking each remote endpoint.
48-
actor_handles: Mutex<FxHashMap<EndpointId, RemoteStateHandle>>,
50+
actor_senders: Mutex<FxHashMap<EndpointId, GuardedSender<RemoteStateMessage>>>,
4951
/// The mapping between [`EndpointId`]s and [`EndpointIdMappedAddr`]s.
5052
pub(super) endpoint_mapped_addrs: AddrMap<EndpointId, EndpointIdMappedAddr>,
5153
/// The mapping between endpoints via a relay and their [`RelayMappedAddr`]s.
@@ -61,6 +63,7 @@ pub(crate) struct RemoteMap {
6163
disco: DiscoState,
6264
sender: TransportsSender,
6365
discovery: ConcurrentDiscovery,
66+
actor_tasks: Mutex<JoinSet<Vec<RemoteStateMessage>>>,
6467
}
6568

6669
impl RemoteMap {
@@ -75,7 +78,7 @@ impl RemoteMap {
7578
discovery: ConcurrentDiscovery,
7679
) -> Self {
7780
Self {
78-
actor_handles: Mutex::new(FxHashMap::default()),
81+
actor_senders: Mutex::new(FxHashMap::default()),
7982
endpoint_mapped_addrs: Default::default(),
8083
relay_mapped_addrs: Default::default(),
8184
local_endpoint_id,
@@ -84,6 +87,7 @@ impl RemoteMap {
8487
disco,
8588
sender,
8689
discovery,
90+
actor_tasks: Default::default(),
8791
}
8892
}
8993

@@ -96,8 +100,19 @@ impl RemoteMap {
96100
/// This should be called periodically to remove handles to endpoint state actors
97101
/// that have shut down after their idle timeout expired.
98102
pub(super) fn remove_closed_remote_state_actors(&self) {
99-
let mut handles = self.actor_handles.lock().expect("poisoned");
100-
handles.retain(|_eid, handle| !handle.sender.is_closed())
103+
let mut senders = self.actor_senders.lock().expect("poisoned");
104+
senders.retain(|_eid, sender| !sender.is_closed());
105+
while let Some(result) = self.actor_tasks.lock().expect("poisoned").try_join_next() {
106+
match result {
107+
Ok(leftover_msgs) => debug!(?leftover_msgs, "TODO: handle leftover messages"),
108+
Err(err) => {
109+
if let Ok(panic) = err.try_into_panic() {
110+
error!("RemoteStateActor panicked.");
111+
std::panic::resume_unwind(panic);
112+
}
113+
}
114+
}
115+
}
101116
}
102117

103118
/// Returns the sender for the [`RemoteStateActor`].
@@ -106,10 +121,10 @@ impl RemoteMap {
106121
///
107122
/// [`RemoteStateActor`]: remote_state::RemoteStateActor
108123
pub(super) fn remote_state_actor(&self, eid: EndpointId) -> mpsc::Sender<RemoteStateMessage> {
109-
let mut handles = self.actor_handles.lock().expect("poisoned");
124+
let mut handles = self.actor_senders.lock().expect("poisoned");
110125
match handles.entry(eid) {
111126
hash_map::Entry::Occupied(mut entry) => {
112-
if let Some(sender) = entry.get().sender.get() {
127+
if let Some(sender) = entry.get().get() {
113128
sender
114129
} else {
115130
// The actor is dead: Start a new actor.
@@ -132,10 +147,13 @@ impl RemoteMap {
132147
fn start_remote_state_actor(
133148
&self,
134149
eid: EndpointId,
135-
) -> (RemoteStateHandle, mpsc::Sender<RemoteStateMessage>) {
150+
) -> (
151+
GuardedSender<RemoteStateMessage>,
152+
mpsc::Sender<RemoteStateMessage>,
153+
) {
136154
// Ensure there is a RemoteMappedAddr for this EndpointId.
137155
self.endpoint_mapped_addrs.get(&eid);
138-
let handle = RemoteStateActor::new(
156+
let sender = RemoteStateActor::new(
139157
eid,
140158
self.local_endpoint_id,
141159
self.local_addrs.clone(),
@@ -145,9 +163,9 @@ impl RemoteMap {
145163
self.sender.clone(),
146164
self.discovery.clone(),
147165
)
148-
.start();
149-
let sender = handle.sender.get().expect("just created");
150-
(handle, sender)
166+
.start(self.actor_tasks.lock().expect("poisoned").deref_mut());
167+
let tx = sender.get().expect("just created");
168+
(sender, tx)
151169
}
152170

153171
pub(super) fn handle_ping(&self, msg: disco::Ping, sender: EndpointId, src: transports::Addr) {

iroh/src/magicsock/remote_map/remote_state.rs

Lines changed: 17 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use iroh_base::{EndpointId, RelayUrl, TransportAddr};
1010
use n0_future::{
1111
Either, FuturesUnordered, MergeUnbounded, Stream, StreamExt,
1212
boxed::BoxStream,
13-
task::{self, AbortOnDropHandle},
13+
task::JoinSet,
1414
time::{self, Duration, Instant},
1515
};
1616
use n0_watcher::{Watchable, Watcher};
@@ -41,7 +41,7 @@ use crate::{
4141
util::MaybeFuture,
4242
};
4343

44-
mod guarded_channel;
44+
pub(crate) mod guarded_channel;
4545
mod path_state;
4646

4747
// TODO: use this
@@ -209,7 +209,10 @@ impl RemoteStateActor {
209209
}
210210
}
211211

212-
pub(super) fn start(self) -> RemoteStateHandle {
212+
pub(super) fn start(
213+
self,
214+
tasks: &mut JoinSet<Vec<RemoteStateMessage>>,
215+
) -> GuardedSender<RemoteStateMessage> {
213216
let (tx, rx) = guarded_channel(16);
214217
let me = self.local_endpoint_id;
215218
let endpoint_id = self.endpoint_id;
@@ -219,28 +222,28 @@ impl RemoteStateActor {
219222
// we don't explicitly set a span we get the spans from whatever call happens to
220223
// first create the actor, which is often very confusing as it then keeps those
221224
// spans for all logging of the actor.
222-
let task = task::spawn(self.run(rx).instrument(info_span!(
225+
tasks.spawn(self.run(rx).instrument(info_span!(
223226
parent: None,
224227
"RemoteStateActor",
225228
me = %me.fmt_short(),
226229
remote = %endpoint_id.fmt_short(),
227230
)));
228-
RemoteStateHandle {
229-
sender: tx,
230-
_task: AbortOnDropHandle::new(task),
231-
}
231+
tx
232232
}
233233

234234
/// Runs the main loop of the actor.
235235
///
236236
/// Note that the actor uses async handlers for tasks from the main loop. The actor is
237237
/// not processing items from the inbox while waiting on any async calls. So some
238238
/// discipline is needed to not turn pending for a long time.
239-
async fn run(mut self, mut inbox: GuardedReceiver<RemoteStateMessage>) {
239+
async fn run(
240+
mut self,
241+
mut inbox: GuardedReceiver<RemoteStateMessage>,
242+
) -> Vec<RemoteStateMessage> {
240243
trace!("actor started");
241244
let idle_timeout = time::sleep(ACTOR_MAX_IDLE_TIMEOUT);
242245
n0_future::pin!(idle_timeout);
243-
loop {
246+
let leftover_msgs = loop {
244247
let scheduled_path_open = match self.scheduled_open_path {
245248
Some(when) => MaybeFuture::Some(time::sleep_until(when)),
246249
None => MaybeFuture::None,
@@ -261,7 +264,7 @@ impl RemoteStateActor {
261264
msg = inbox.recv() => {
262265
match msg {
263266
Some(msg) => self.handle_message(msg).await,
264-
None => break,
267+
None => break vec![],
265268
}
266269
}
267270
Some((id, evt)) = self.path_events.next() => {
@@ -297,15 +300,16 @@ impl RemoteStateActor {
297300
_ = &mut idle_timeout => {
298301
if self.connections.is_empty() && inbox.close_if_idle() {
299302
trace!("idle timeout expired and still idle: terminate actor");
300-
break;
303+
break vec![];
301304
} else {
302305
// Seems like we weren't really idle, so we reset
303306
idle_timeout.as_mut().reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT);
304307
}
305308
}
306309
}
307-
}
310+
};
308311
trace!("actor terminating");
312+
leftover_msgs
309313
}
310314

311315
/// Handles an actor message.
@@ -1096,21 +1100,6 @@ pub(crate) enum RemoteStateMessage {
10961100
Latency(oneshot::Sender<Option<Duration>>),
10971101
}
10981102

1099-
/// A handle to a [`RemoteStateActor`].
1100-
///
1101-
/// Dropping this will stop the actor. The actor will also stop after an idle timeout
1102-
/// if it has no connections, an empty inbox, and no other senders than the one stored
1103-
/// in the endpoint map exist.
1104-
#[derive(Debug)]
1105-
pub(super) struct RemoteStateHandle {
1106-
/// Sender for the channel into the [`RemoteStateActor`].
1107-
///
1108-
/// This is a [`GuardedSender`], from which we can get a sender but only if the receiver
1109-
/// hasn't been closed.
1110-
pub(super) sender: GuardedSender<RemoteStateMessage>,
1111-
_task: AbortOnDropHandle<()>,
1112-
}
1113-
11141103
/// Information about a holepunch attempt.
11151104
#[derive(Debug)]
11161105
struct HolepunchAttempt {

0 commit comments

Comments
 (0)