Skip to content

Commit 3a8c0af

Browse files
committed
Remove guarded_channel and use tokio::mpsc instead. Return leftover messages
1 parent f857404 commit 3a8c0af

File tree

3 files changed

+23
-115
lines changed

3 files changed

+23
-115
lines changed

iroh/src/magicsock/remote_map.rs

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
time::Duration,
88
};
99

10-
use iroh_base::{EndpointAddr, EndpointId, RelayUrl};
10+
use iroh_base::{EndpointId, RelayUrl};
1111
use n0_future::task::JoinSet;
1212
use rustc_hash::FxHashMap;
1313
use serde::{Deserialize, Serialize};
@@ -47,7 +47,7 @@ pub(crate) struct RemoteMap {
4747
// State we keep about remote endpoints.
4848
//
4949
/// The actors tracking each remote endpoint.
50-
actor_senders: Mutex<FxHashMap<EndpointId, GuardedSender<RemoteStateMessage>>>,
50+
actor_senders: Mutex<FxHashMap<EndpointId, mpsc::Sender<RemoteStateMessage>>>,
5151
/// The mapping between [`EndpointId`]s and [`EndpointIdMappedAddr`]s.
5252
pub(super) endpoint_mapped_addrs: AddrMap<EndpointId, EndpointIdMappedAddr>,
5353
/// The mapping between endpoints via a relay and their [`RelayMappedAddr`]s.
@@ -123,19 +123,10 @@ impl RemoteMap {
123123
pub(super) fn remote_state_actor(&self, eid: EndpointId) -> mpsc::Sender<RemoteStateMessage> {
124124
let mut handles = self.actor_senders.lock().expect("poisoned");
125125
match handles.entry(eid) {
126-
hash_map::Entry::Occupied(mut entry) => {
127-
if let Some(sender) = entry.get().get() {
128-
sender
129-
} else {
130-
// The actor is dead: Start a new actor.
131-
let (handle, sender) = self.start_remote_state_actor(eid);
132-
entry.insert(handle);
133-
sender
134-
}
135-
}
126+
hash_map::Entry::Occupied(entry) => entry.get().clone(),
136127
hash_map::Entry::Vacant(entry) => {
137-
let (handle, sender) = self.start_remote_state_actor(eid);
138-
entry.insert(handle);
128+
let sender = self.start_remote_state_actor(eid);
129+
entry.insert(sender.clone());
139130
sender
140131
}
141132
}
@@ -144,16 +135,10 @@ impl RemoteMap {
144135
/// Starts a new remote state actor and returns a handle and a sender.
145136
///
146137
/// The handle is not inserted into the endpoint map, this must be done by the caller of this function.
147-
fn start_remote_state_actor(
148-
&self,
149-
eid: EndpointId,
150-
) -> (
151-
GuardedSender<RemoteStateMessage>,
152-
mpsc::Sender<RemoteStateMessage>,
153-
) {
138+
fn start_remote_state_actor(&self, eid: EndpointId) -> mpsc::Sender<RemoteStateMessage> {
154139
// Ensure there is a RemoteMappedAddr for this EndpointId.
155140
self.endpoint_mapped_addrs.get(&eid);
156-
let sender = RemoteStateActor::new(
141+
RemoteStateActor::new(
157142
eid,
158143
self.local_endpoint_id,
159144
self.local_addrs.clone(),
@@ -163,9 +148,7 @@ impl RemoteMap {
163148
self.sender.clone(),
164149
self.discovery.clone(),
165150
)
166-
.start(self.actor_tasks.lock().expect("poisoned").deref_mut());
167-
let tx = sender.get().expect("just created");
168-
(sender, tx)
151+
.start(self.actor_tasks.lock().expect("poisoned").deref_mut())
169152
}
170153

171154
pub(super) fn handle_ping(&self, msg: disco::Ping, sender: EndpointId, src: transports::Addr) {

iroh/src/magicsock/remote_map/remote_state.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@ use quinn_proto::{PathError, PathEvent, PathId, PathStatus};
1919
use rustc_hash::FxHashMap;
2020
use smallvec::SmallVec;
2121
use sync_wrapper::SyncStream;
22-
use tokio::sync::oneshot;
2322
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
2423
use tracing::{Instrument, Level, debug, error, event, info_span, instrument, trace, warn};
2524

26-
use self::{
27-
guarded_channel::{GuardedReceiver, GuardedSender, guarded_channel},
28-
path_state::RemotePathState,
29-
};
25+
use self::path_state::RemotePathState;
26+
use tokio::sync::{mpsc, oneshot};
27+
3028
use super::Source;
3129
use crate::{
3230
disco::{self},
@@ -41,7 +39,6 @@ use crate::{
4139
util::MaybeFuture,
4240
};
4341

44-
pub(crate) mod guarded_channel;
4542
mod path_state;
4643

4744
// TODO: use this
@@ -212,8 +209,8 @@ impl RemoteStateActor {
212209
pub(super) fn start(
213210
self,
214211
tasks: &mut JoinSet<Vec<RemoteStateMessage>>,
215-
) -> GuardedSender<RemoteStateMessage> {
216-
let (tx, rx) = guarded_channel(16);
212+
) -> mpsc::Sender<RemoteStateMessage> {
213+
let (tx, rx) = mpsc::channel(16);
217214
let me = self.local_endpoint_id;
218215
let endpoint_id = self.endpoint_id;
219216

@@ -238,7 +235,7 @@ impl RemoteStateActor {
238235
/// discipline is needed to not turn pending for a long time.
239236
async fn run(
240237
mut self,
241-
mut inbox: GuardedReceiver<RemoteStateMessage>,
238+
mut inbox: mpsc::Receiver<RemoteStateMessage>,
242239
) -> Vec<RemoteStateMessage> {
243240
trace!("actor started");
244241
let idle_timeout = time::sleep(ACTOR_MAX_IDLE_TIMEOUT);
@@ -254,7 +251,7 @@ impl RemoteStateActor {
254251
None => MaybeFuture::None,
255252
};
256253
n0_future::pin!(scheduled_hp);
257-
if !inbox.is_idle() || !self.connections.is_empty() {
254+
if !inbox.is_empty() || !self.connections.is_empty() {
258255
idle_timeout
259256
.as_mut()
260257
.reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT);
@@ -298,16 +295,22 @@ impl RemoteStateActor {
298295
self.handle_discovery_item(item);
299296
}
300297
_ = &mut idle_timeout => {
301-
if self.connections.is_empty() && inbox.close_if_idle() {
298+
if self.connections.is_empty() && inbox.is_empty() {
302299
trace!("idle timeout expired and still idle: terminate actor");
303-
break vec![];
300+
inbox.close();
301+
// There might be a race between checking `inbox.is_empty()` and `inbox.close()`,
302+
// so we pull out all messages that are left over.
303+
let mut leftover_msgs = Vec::with_capacity(inbox.len());
304+
inbox.recv_many(&mut leftover_msgs, inbox.len()).await;
305+
break leftover_msgs;
304306
} else {
305307
// Seems like we weren't really idle, so we reset
306308
idle_timeout.as_mut().reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT);
307309
}
308310
}
309311
}
310312
};
313+
311314
trace!("actor terminating");
312315
leftover_msgs
313316
}

iroh/src/magicsock/remote_map/remote_state/guarded_channel.rs

Lines changed: 0 additions & 78 deletions
This file was deleted.

0 commit comments

Comments
 (0)