Skip to content

Commit e277ff0

Browse files
committed
feat: remote_info and watcher for selected path
1 parent 3424780 commit e277ff0

File tree

7 files changed

+184
-96
lines changed

7 files changed

+184
-96
lines changed

iroh/examples/monitor-connections.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl Monitor {
110110
Some(conn) = rx.recv() => {
111111
let alpn = String::from_utf8_lossy(conn.alpn()).to_string();
112112
let remote = conn.remote_id().fmt_short();
113-
info!(%remote, %alpn, rtt=?conn.latency(), "new connection");
113+
info!(%remote, %alpn, rtt=?conn.rtt(), "new connection");
114114
tasks.spawn(async move {
115115
match conn.closed().await {
116116
Some((close_reason, stats)) => {

iroh/examples/transfer.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -522,13 +522,21 @@ fn parse_byte_size(s: &str) -> std::result::Result<u64, parse_size::Error> {
522522
}
523523

524524
fn watch_conn_type(endpoint: &Endpoint, endpoint_id: EndpointId) -> AbortOnDropHandle<()> {
525-
let mut stream = endpoint.conn_type(endpoint_id).unwrap().stream();
525+
let info = endpoint.remote_info(endpoint_id).unwrap();
526+
let mut stream = info.selected_path().stream();
526527
let task = tokio::task::spawn(async move {
527-
while let Some(conn_type) = stream.next().await {
528-
println!(
529-
"[{}] Connection type changed to: {conn_type}",
530-
endpoint_id.fmt_short()
531-
);
528+
while let Some(selected_path) = stream.next().await {
529+
if let Some(selected_path) = selected_path {
530+
let label = match selected_path {
531+
TransportAddr::Ip(addr) => format!("direct ({addr})"),
532+
TransportAddr::Relay(url) => format!("relay ({url})"),
533+
_ => format!("unknown transport"),
534+
};
535+
println!(
536+
"[{}] Connection type changed to: {label}",
537+
endpoint_id.fmt_short()
538+
);
539+
}
532540
}
533541
});
534542
AbortOnDropHandle::new(task)

iroh/src/endpoint.rs

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use url::Url;
2626

2727
pub use super::magicsock::{
2828
AddEndpointAddrError, ConnectionType, DirectAddr, DirectAddrType, PathInfo, PathsInfo,
29-
endpoint_map::Source,
29+
RemoteInfo, endpoint_map::Source,
3030
};
3131
#[cfg(wasm_browser)]
3232
use crate::discovery::pkarr::PkarrResolver;
@@ -55,8 +55,8 @@ pub use quinn::{
5555
AcceptBi, AcceptUni, AckFrequencyConfig, ApplicationClose, Chunk, ClosedStream,
5656
ConnectionClose, ConnectionError, ConnectionStats, MtuDiscoveryConfig, OpenBi, OpenUni,
5757
ReadDatagram, ReadError, ReadExactError, ReadToEndError, RecvStream, ResetError, RetryError,
58-
SendDatagramError, SendStream, ServerConfig, StoppedError, StreamId, TransportConfig, VarInt,
59-
WeakConnectionHandle, WriteError,
58+
SendDatagramError, SendStream, ServerConfig, Side, StoppedError, StreamId, TransportConfig,
59+
VarInt, WeakConnectionHandle, WriteError,
6060
};
6161
pub use quinn_proto::{
6262
FrameStats, PathStats, TransportError, TransportErrorCode, UdpStats, Written,
@@ -1005,38 +1005,24 @@ impl Endpoint {
10051005
//
10061006
// Partially they return things passed into the builder.
10071007

1008-
/// Returns a [`Watcher`] that reports the current connection type and any changes for
1009-
/// given remote endpoint.
1008+
/// Information about a remote endpoint.
10101009
///
1011-
/// This watcher allows observing a stream of [`ConnectionType`] items by calling
1012-
/// [`Watcher::stream()`]. If the underlying connection to a remote endpoint changes, it will
1013-
/// yield a new item. These connection changes are when the connection switches between
1014-
/// using the Relay server and a direct connection.
1010+
/// From the [`RemoteInfo`] you can watch which path is selected, get the current
1011+
/// round-trip time (latency), and get a list of [`ConnectionInfo`].
10151012
///
1016-
/// Note that this does not guarantee each connection change is yielded in the stream.
1017-
/// If the connection type changes several times before this stream is polled, only the
1018-
/// last recorded state is returned. This can be observed e.g. right at the start of a
1019-
/// connection when the switch from a relayed to a direct connection can be so fast that
1020-
/// the relayed state is never exposed.
1021-
///
1022-
/// If there is currently a connection with the remote endpoint, then using [`Watcher::get`]
1023-
/// will immediately return either [`ConnectionType::Relay`], [`ConnectionType::Direct`]
1024-
/// or [`ConnectionType::Mixed`].
1025-
///
1026-
/// It is possible for the connection type to be [`ConnectionType::None`] if you've
1027-
/// recently connected to this endpoint id but previous methods of reaching the endpoint have
1028-
/// become inaccessible.
1029-
///
1030-
/// Will return `None` if we do not have any address information for the given `endpoint_id`.
1031-
pub fn conn_type(&self, endpoint_id: EndpointId) -> Option<n0_watcher::Direct<ConnectionType>> {
1032-
self.msock.conn_type(endpoint_id)
1013+
/// Returns `None` if we don't have any state for this remote.
1014+
pub fn remote_info(&self, endpoint_id: EndpointId) -> Option<RemoteInfo> {
1015+
self.msock.endpoint_map.remote_info(endpoint_id)
10331016
}
10341017

1035-
/// Returns the currently lowest latency for this endpoint.
1018+
/// Returns a list of all remote endpoints that this endpoint is dealing with.
1019+
///
1020+
/// This includes all endpoints to which we have active connections. It also may include endpoints
1021+
/// to which we are in the process of connecting, or have recently been connected to.
10361022
///
1037-
/// Will return `None` if we do not have any address information for the given `endpoint_id`.
1038-
pub async fn latency(&self, endpoint_id: EndpointId) -> Option<Duration> {
1039-
self.msock.latency(endpoint_id).await
1023+
/// TODO: Expand docs.
1024+
pub fn remotes(&self) -> Vec<RemoteInfo> {
1025+
self.msock.endpoint_map.remotes()
10401026
}
10411027

10421028
/// Returns the DNS resolver used in this [`Endpoint`].

iroh/src/endpoint/connection.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use n0_watcher::{Watchable, Watcher};
3535
use pin_project::pin_project;
3636
use quinn::{
3737
AcceptBi, AcceptUni, ConnectionError, ConnectionStats, OpenBi, OpenUni, ReadDatagram,
38-
RetryError, SendDatagramError, ServerConfig, VarInt, WeakConnectionHandle,
38+
RetryError, SendDatagramError, ServerConfig, Side, VarInt, WeakConnectionHandle,
3939
};
4040
use quinn_proto::PathId;
4141
use tracing::warn;
@@ -249,8 +249,7 @@ fn conn_from_quinn_conn(
249249
};
250250
let info = conn.to_info();
251251
// Register this connection with the magicsock.
252-
ep.msock
253-
.register_connection(info, paths_info_watchable.clone());
252+
ep.msock.register_connection(info, paths_info_watchable);
254253
Ok(conn)
255254
}
256255

@@ -1533,6 +1532,11 @@ impl Connection {
15331532
self.inner.set_max_concurrent_bi_streams(count)
15341533
}
15351534

1535+
/// Returns the side of the connection (client or server).
1536+
pub fn side(&self) -> Side {
1537+
self.inner.side()
1538+
}
1539+
15361540
/// Returns a [`ConnectionInfo`], which is a weak handle to the connection
15371541
/// that does not keep the connection alive, but does allow to access some information
15381542
/// about the connection, and allows to wait for the connection to be closed.
@@ -1542,6 +1546,7 @@ impl Connection {
15421546
remote_id: self.remote_id,
15431547
inner: self.inner.weak_handle(),
15441548
paths_info: self.paths_info.clone(),
1549+
side: self.side(),
15451550
}
15461551
}
15471552
}
@@ -1550,6 +1555,7 @@ impl Connection {
15501555
/// but does not keep the connection alive.
15511556
#[derive(Debug, Clone)]
15521557
pub struct ConnectionInfo {
1558+
pub(crate) side: Side,
15531559
pub(crate) alpn: Vec<u8>,
15541560
pub(crate) remote_id: EndpointId,
15551561
pub(crate) inner: WeakConnectionHandle,
@@ -1584,10 +1590,10 @@ impl ConnectionInfo {
15841590
self.paths_info.get().keys().any(|addr| addr.is_ip())
15851591
}
15861592

1587-
/// Returns the latency for this connection.
1593+
/// Current best estimate of this connection's latency (round-trip-time)
15881594
///
15891595
/// Returns `None` if the connection has been dropped.
1590-
pub fn latency(&self) -> Option<Duration> {
1596+
pub fn rtt(&self) -> Option<Duration> {
15911597
self.inner.upgrade().map(|conn| conn.rtt())
15921598
}
15931599

@@ -1599,10 +1605,17 @@ impl ConnectionInfo {
15991605
}
16001606

16011607
/// Waits for the connection to be closed, and returns the close reason and final connection stats.
1608+
///
1609+
/// Returns `None` if the connection has been dropped already before this call.
16021610
pub async fn closed(&self) -> Option<(ConnectionError, ConnectionStats)> {
16031611
let fut = self.inner.upgrade()?.on_closed();
16041612
Some(fut.await)
16051613
}
1614+
1615+
/// Returns the side of the connection (client or server).
1616+
pub fn side(&self) -> Side {
1617+
self.side
1618+
}
16061619
}
16071620

16081621
#[cfg(test)]

iroh/src/magicsock.rs

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ pub(crate) mod transports;
7878
use mapped_addrs::{EndpointIdMappedAddr, MappedAddr};
7979

8080
pub use self::{
81-
endpoint_map::{ConnectionType, PathInfo, PathsInfo},
81+
endpoint_map::{ConnectionType, PathInfo, PathsInfo, RemoteInfo},
8282
metrics::Metrics,
8383
};
8484

@@ -299,9 +299,8 @@ impl MagicSock {
299299
// have a ZrttConnection::into_connection() function which can be async and actually
300300
// send this. Before the handshake has completed we don't have anything useful to
301301
// do with this connection inside of the EndpointStateActor anyway.
302-
let weak_handle = conn.inner.clone();
303302
let endpoint_state = self.endpoint_map.endpoint_state_actor(conn.remote_id);
304-
let msg = EndpointStateMessage::AddConnection(weak_handle, paths_info);
303+
let msg = EndpointStateMessage::AddConnection(conn, paths_info);
305304

306305
task::spawn(async move {
307306
endpoint_state.send(msg).await.ok();
@@ -407,32 +406,6 @@ impl MagicSock {
407406
})
408407
}
409408

410-
/// Returns a [`n0_watcher::Direct`] that reports the [`ConnectionType`] we have to the
411-
/// given `endpoint_id`.
412-
///
413-
/// This gets us a copy of the [`n0_watcher::Direct`] for the [`Watchable`] with a
414-
/// [`ConnectionType`] that the `EndpointMap` stores for each `endpoint_id`'s endpoint.
415-
///
416-
/// # Errors
417-
///
418-
/// Will return `None` if there is no address information known about the
419-
/// given `endpoint_id`.
420-
pub(crate) fn conn_type(&self, eid: EndpointId) -> Option<n0_watcher::Direct<ConnectionType>> {
421-
self.endpoint_map.conn_type(eid)
422-
}
423-
424-
// TODO: Build better info to expose to the user about remote nodes. We probably want
425-
// to expose this as part of path information instead.
426-
pub(crate) async fn latency(&self, eid: EndpointId) -> Option<Duration> {
427-
let endpoint_state = self.endpoint_map.endpoint_state_actor(eid);
428-
let (tx, rx) = oneshot::channel();
429-
endpoint_state
430-
.send(EndpointStateMessage::Latency(tx))
431-
.await
432-
.ok();
433-
rx.await.unwrap_or_default()
434-
}
435-
436409
/// Returns the socket address which can be used by the QUIC layer to dial this endpoint.
437410
pub(crate) fn get_endpoint_mapped_addr(&self, eid: EndpointId) -> EndpointIdMappedAddr {
438411
self.endpoint_map.endpoint_mapped_addr(eid)

iroh/src/magicsock/endpoint_map.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ use super::{
1717
mapped_addrs::{AddrMap, EndpointIdMappedAddr, MultipathMappedAddr, RelayMappedAddr},
1818
transports::{self, OwnedTransmit, TransportsSender},
1919
};
20-
use crate::disco::{self};
20+
use crate::disco;
2121
// #[cfg(any(test, feature = "test-utils"))]
2222
// use crate::endpoint::PathSelection;
2323

2424
mod endpoint_state;
2525
mod path_state;
2626

2727
pub(super) use endpoint_state::EndpointStateMessage;
28-
pub use endpoint_state::{ConnectionType, PathInfo, PathsInfo};
28+
pub use endpoint_state::{ConnectionType, PathInfo, PathsInfo, RemoteInfo};
2929
use endpoint_state::{EndpointStateActor, EndpointStateHandle};
3030

3131
// TODO: use this
@@ -122,17 +122,23 @@ impl EndpointMap {
122122
}
123123
}
124124

125-
/// Returns a [`n0_watcher::Direct`] for given endpoint's [`ConnectionType`].
126-
///
127-
/// # Errors
128-
///
129-
/// Will return `None` if there is not an entry in the [`EndpointMap`] for
130-
/// the `endpoint_id`
131-
pub(super) fn conn_type(
132-
&self,
133-
_endpoint_id: EndpointId,
134-
) -> Option<n0_watcher::Direct<ConnectionType>> {
135-
todo!();
125+
pub(crate) fn remote_info(&self, eid: EndpointId) -> Option<RemoteInfo> {
126+
self.actor_handles
127+
.lock()
128+
.expect("poisoned")
129+
.get(&eid)
130+
.map(|handle| RemoteInfo::new(eid, handle.sender.clone(), handle.selected_path.watch()))
131+
}
132+
133+
pub(crate) fn remotes(&self) -> Vec<RemoteInfo> {
134+
self.actor_handles
135+
.lock()
136+
.expect("poisoned")
137+
.iter()
138+
.map(|(eid, handle)| {
139+
RemoteInfo::new(*eid, handle.sender.clone(), handle.selected_path.watch())
140+
})
141+
.collect()
136142
}
137143

138144
/// Returns the sender for the [`EndpointStateActor`].

0 commit comments

Comments
 (0)