Skip to content

Commit 50c9724

Browse files
committed
feat: add connection monitor
1 parent 4aa6f6d commit 50c9724

File tree

3 files changed

+172
-1
lines changed

3 files changed

+172
-1
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
use std::{sync::Arc, time::Duration};
2+
3+
use iroh::{
4+
Endpoint, RelayMode,
5+
endpoint::{ConnectionInfo, ConnectionMonitor},
6+
};
7+
use n0_error::{Result, StackResultExt, StdResultExt, ensure_any};
8+
use n0_future::task::AbortOnDropHandle;
9+
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
10+
use tracing::{Instrument, info, info_span};
11+
12+
const ALPN: &[u8] = b"iroh/test";
13+
14+
#[tokio::main]
15+
async fn main() -> Result {
16+
tracing_subscriber::fmt()
17+
.with_env_filter(
18+
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
19+
)
20+
.init();
21+
22+
let monitor = Monitor::new();
23+
let server = Endpoint::empty_builder(RelayMode::Disabled)
24+
.alpns(vec![ALPN.to_vec()])
25+
.monitor_connections(monitor.clone())
26+
.bind()
27+
.instrument(info_span!("server"))
28+
.await?;
29+
let server_addr = server.addr();
30+
31+
let count = 2;
32+
33+
let client_task = tokio::spawn(
34+
async move {
35+
let client = Endpoint::empty_builder(RelayMode::Disabled)
36+
.bind()
37+
.instrument(info_span!("client"))
38+
.await?;
39+
for _i in 0..count {
40+
let conn = client.connect(server_addr.clone(), ALPN).await?;
41+
let mut s = conn.accept_uni().await.anyerr()?;
42+
let data = s.read_to_end(2).await.anyerr()?;
43+
ensure_any!(data == b"hi", "unexpected data");
44+
conn.close(23u32.into(), b"bye");
45+
}
46+
client.close().await;
47+
n0_error::Ok(client)
48+
}
49+
.instrument(info_span!("client")),
50+
);
51+
52+
let server_task = tokio::spawn(
53+
async move {
54+
for _i in 0..count {
55+
let conn = server
56+
.accept()
57+
.await
58+
.context("server endpoint closed")?
59+
.await?;
60+
let mut s = conn.open_uni().await.anyerr()?;
61+
s.write_all(b"hi").await.anyerr()?;
62+
conn.closed().await;
63+
}
64+
info!("done");
65+
server.close().await;
66+
n0_error::Ok(())
67+
}
68+
.instrument(info_span!("server")),
69+
);
70+
client_task.await.std_context("client")?.context("client")?;
71+
server_task.await.std_context("server")?.context("server")?;
72+
tokio::time::sleep(Duration::from_secs(1)).await;
73+
drop(monitor);
74+
Ok(())
75+
}
76+
77+
/// Our connection monitor impl.
78+
///
79+
/// This here only logs connection open and close events via tracing.
80+
/// It could also maintain a datastructure of all connections, or send the stats to some metrics service.
81+
#[derive(Clone)]
82+
struct Monitor {
83+
tx: UnboundedSender<ConnectionInfo>,
84+
_task: Arc<AbortOnDropHandle<()>>,
85+
}
86+
87+
impl ConnectionMonitor for Monitor {
88+
fn on_connection(&self, connection: ConnectionInfo) {
89+
self.tx.send(connection).ok();
90+
}
91+
}
92+
93+
impl Monitor {
94+
fn new() -> Self {
95+
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
96+
let task = tokio::spawn(Self::run(rx).instrument(info_span!("watcher")));
97+
Self {
98+
tx,
99+
_task: Arc::new(AbortOnDropHandle::new(task)),
100+
}
101+
}
102+
103+
async fn run(mut rx: UnboundedReceiver<ConnectionInfo>) {
104+
loop {
105+
tokio::select! {
106+
Some(conn) = rx.recv() => {
107+
let alpn = String::from_utf8_lossy(conn.alpn()).to_string();
108+
let remote = conn.remote_id().fmt_short();
109+
info!(%remote, %alpn, rtt=?conn.rtt(), "new connection");
110+
}
111+
else => break,
112+
}
113+
}
114+
}
115+
}

iroh/src/endpoint.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ pub struct Builder {
117117
#[cfg(any(test, feature = "test-utils"))]
118118
path_selection: PathSelection,
119119
max_tls_tickets: usize,
120+
#[debug("{}", connection_monitor.as_ref().map(|_| "Some(Box<dyn ConnectionMonitor>)").unwrap_or("None"))]
121+
connection_monitor: Option<Box<dyn ConnectionMonitor>>,
120122
}
121123

122124
impl Builder {
@@ -158,6 +160,7 @@ impl Builder {
158160
#[cfg(any(test, feature = "test-utils"))]
159161
path_selection: PathSelection::default(),
160162
max_tls_tickets: DEFAULT_MAX_TLS_TICKETS,
163+
connection_monitor: None,
161164
}
162165
}
163166

@@ -208,6 +211,7 @@ impl Builder {
208211
// #[cfg(any(test, feature = "test-utils"))]
209212
// path_selection: self.path_selection,
210213
metrics,
214+
connection_monitor: self.connection_monitor,
211215
};
212216

213217
let msock = magicsock::MagicSock::spawn(msock_opts).await?;
@@ -432,6 +436,44 @@ impl Builder {
432436
self.max_tls_tickets = n;
433437
self
434438
}
439+
440+
// TODO docs
441+
/// Register a handler that is invoked for each connection the endpoint accepts or initiates.
442+
///
443+
/// The [`ConnectionMonitor::on_connection`] method is invoked synchronosuly, from within a tokio
444+
/// context. So you can spawn tasks if needed.
445+
/// Make sure that whatever you do with the connection info here is non-blocking.
446+
/// Usually you'd want to send the info over a broadcast or unbounded channel,
447+
/// or insert it into some persistent datastructure.
448+
///
449+
/// The `ConnectionInfo` internally contains a weak reference to the connection,
450+
/// so keeping the struct alive does not keep the connection alive.
451+
/// Note however that `ConnectionInfo` does keep an allocation per connection alive
452+
/// so to not leak memory you should drop the `ConnectionInfo` eventually
453+
///
454+
/// [`ConnectionMonitor`] is implemented for `Fn(ConnectionInfo)`, so you can
455+
/// also pass a closure that takes [`ConnectionInfo`] to this function.
456+
pub fn monitor_connections(mut self, monitor: impl ConnectionMonitor) -> Self {
457+
self.connection_monitor = Some(Box::new(monitor));
458+
self
459+
}
460+
}
461+
462+
/// Monitor each connection accepted or initiated by the endpoint.
463+
pub trait ConnectionMonitor: Send + Sync + 'static {
464+
/// Called for each new connection the endpoint accepts or initiates.
465+
///
466+
/// This is only called when a connection is fully established.
467+
fn on_connection(&self, connection: ConnectionInfo);
468+
}
469+
470+
impl<T> ConnectionMonitor for T
471+
where
472+
T: Fn(ConnectionInfo) + Send + Sync + 'static,
473+
{
474+
fn on_connection(&self, connection: ConnectionInfo) {
475+
(self)(connection)
476+
}
435477
}
436478

437479
/// Configuration for a [`quinn::Endpoint`] that cannot be changed at runtime.

iroh/src/magicsock.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ use crate::{
6363
defaults::timeouts::NET_REPORT_TIMEOUT,
6464
disco::{self, SendAddr},
6565
discovery::{ConcurrentDiscovery, Discovery, EndpointData, UserData},
66-
endpoint::ConnectionInfo,
66+
endpoint::{ConnectionInfo, ConnectionMonitor},
6767
key::{DecryptionError, SharedSecret, public_ed_box, secret_ed_box},
6868
metrics::EndpointMetrics,
6969
net_report::{self, IfStateDetails, Report},
@@ -148,6 +148,8 @@ pub(crate) struct Options {
148148
// #[cfg(any(test, feature = "test-utils"))]
149149
// pub(crate) path_selection: PathSelection,
150150
pub(crate) metrics: EndpointMetrics,
151+
#[debug("{}", connection_monitor.as_ref().map(|_| "Some(Box<dyn ConnectionMonitor>)").unwrap_or("None"))]
152+
pub(crate) connection_monitor: Option<Box<dyn ConnectionMonitor>>,
151153
}
152154

153155
/// Handle for [`MagicSock`].
@@ -219,6 +221,9 @@ pub(crate) struct MagicSock {
219221

220222
/// Metrics
221223
pub(crate) metrics: EndpointMetrics,
224+
225+
#[debug("{}", connection_monitor.as_ref().map(|_| "Some(Box<dyn ConnectionMonitor>)").unwrap_or("None"))]
226+
connection_monitor: Option<Box<dyn ConnectionMonitor>>,
222227
}
223228

224229
#[allow(missing_docs)]
@@ -274,6 +279,11 @@ impl MagicSock {
274279
conn: ConnectionInfo,
275280
paths_info: n0_watcher::Watchable<PathsInfo>,
276281
) {
282+
// Inform the monitor about the new connection.
283+
if let Some(monitor) = self.connection_monitor.as_ref() {
284+
monitor.on_connection(conn.clone());
285+
}
286+
277287
// TODO: Spawning tasks like this is obviously bad. But it is solvable:
278288
// - This is only called from inside Connection::new.
279289
// - Connection::new is called from:
@@ -942,6 +952,7 @@ impl Handle {
942952
// #[cfg(any(test, feature = "test-utils"))]
943953
// path_selection,
944954
metrics,
955+
connection_monitor,
945956
} = opts;
946957

947958
let discovery = ConcurrentDiscovery::default();
@@ -1018,6 +1029,7 @@ impl Handle {
10181029
local_addrs_watch: transports.local_addrs_watch(),
10191030
#[cfg(not(wasm_browser))]
10201031
ip_bind_addrs: transports.ip_bind_addrs(),
1032+
connection_monitor,
10211033
});
10221034

10231035
let mut endpoint_config = quinn::EndpointConfig::default();
@@ -1919,6 +1931,7 @@ mod tests {
19191931
// path_selection: PathSelection::default(),
19201932
discovery_user_data: None,
19211933
metrics: Default::default(),
1934+
connection_monitor: None,
19221935
}
19231936
}
19241937

@@ -2351,6 +2364,7 @@ mod tests {
23512364
insecure_skip_relay_cert_verify: false,
23522365
// path_selection: PathSelection::default(),
23532366
metrics: Default::default(),
2367+
connection_monitor: None,
23542368
};
23552369
let msock = MagicSock::spawn(opts).await?;
23562370
Ok(msock)

0 commit comments

Comments
 (0)