@@ -6,7 +6,10 @@ use iroh::{
66} ;
77use n0_error:: { Result , StackResultExt , StdResultExt , ensure_any} ;
88use n0_future:: task:: AbortOnDropHandle ;
9- use tokio:: sync:: mpsc:: { UnboundedReceiver , UnboundedSender } ;
9+ use tokio:: {
10+ sync:: mpsc:: { UnboundedReceiver , UnboundedSender } ,
11+ task:: JoinSet ,
12+ } ;
1013use tracing:: { Instrument , info, info_span} ;
1114
1215const ALPN : & [ u8 ] = b"iroh/test" ;
@@ -101,15 +104,32 @@ impl Monitor {
101104 }
102105
103106 async fn run ( mut rx : UnboundedReceiver < ConnectionInfo > ) {
107+ let mut tasks = JoinSet :: new ( ) ;
104108 loop {
105109 tokio:: select! {
106110 Some ( conn) = rx. recv( ) => {
107111 let alpn = String :: from_utf8_lossy( conn. alpn( ) ) . to_string( ) ;
108112 let remote = conn. remote_id( ) . fmt_short( ) ;
109113 info!( %remote, %alpn, rtt=?conn. rtt( ) , "new connection" ) ;
114+ tasks. spawn( async move {
115+ match conn. closed( ) . await {
116+ Some ( ( close_reason, stats) ) => {
117+ // We have access to the final stats of the connection!
118+ info!( %remote, %alpn, ?close_reason, udp_rx=stats. udp_rx. bytes, udp_tx=stats. udp_tx. bytes, "connection closed" ) ;
119+ }
120+ None => {
121+ // The connection was closed before we could register our stats-on-close listener.
122+ info!( %remote, %alpn, "connection closed before tracking started" ) ;
123+ }
124+ }
125+ } . instrument( tracing:: Span :: current( ) ) ) ;
110126 }
127+ Some ( res) = tasks. join_next( ) , if !tasks. is_empty( ) => res. expect( "conn close task panicked" ) ,
111128 else => break ,
112129 }
130+ while let Some ( res) = tasks. join_next ( ) . await {
131+ res. expect ( "conn close task panicked" ) ;
132+ }
113133 }
114134 }
115135}
0 commit comments