@@ -33,13 +33,26 @@ use {
3333 std:: {
3434 iter:: repeat_with,
3535 net:: { IpAddr , SocketAddr , UdpSocket } ,
36+ // CAUTION: be careful not to introduce any awaits while holding an RwLock.
3637 sync:: {
3738 atomic:: { AtomicBool , AtomicU64 , Ordering } ,
38- Arc , Mutex , MutexGuard , RwLock ,
39+ Arc , RwLock ,
3940 } ,
4041 time:: { Duration , Instant } ,
4142 } ,
42- tokio:: { task:: JoinHandle , time:: timeout} ,
43+ tokio:: {
44+ // CAUTION: It's kind of sketch that we're mixing async and sync locks (see the RwLock above).
45+ // This is done so that sync code can also access the stake table.
46+ // Make sure we don't hold a sync lock across an await - including the await to
47+ // lock an async Mutex. This does not happen now and should not happen as long as we
48+ // don't hold an async Mutex and sync RwLock at the same time (currently true)
49+ // but if we do, the scope of the RwLock must always be a subset of the async Mutex
50+ // (i.e. lock order is always async Mutex -> RwLock). Also, be careful not to
51+ // introduce any other awaits while holding the RwLock.
52+ sync:: { Mutex , MutexGuard } ,
53+ task:: JoinHandle ,
54+ time:: timeout,
55+ } ,
4356} ;
4457
4558const WAIT_FOR_STREAM_TIMEOUT : Duration = Duration :: from_millis ( 100 ) ;
@@ -384,7 +397,7 @@ fn handle_and_cache_new_connection(
384397 }
385398}
386399
387- fn prune_unstaked_connections_and_add_new_connection (
400+ async fn prune_unstaked_connections_and_add_new_connection (
388401 connection : Connection ,
389402 connection_table : Arc < Mutex < ConnectionTable > > ,
390403 max_connections : usize ,
@@ -395,7 +408,7 @@ fn prune_unstaked_connections_and_add_new_connection(
395408 let stats = params. stats . clone ( ) ;
396409 if max_connections > 0 {
397410 let connection_table_clone = connection_table. clone ( ) ;
398- let mut connection_table = connection_table. lock ( ) . unwrap ( ) ;
411+ let mut connection_table = connection_table. lock ( ) . await ;
399412 prune_unstaked_connection_table ( & mut connection_table, max_connections, stats) ;
400413 handle_and_cache_new_connection (
401414 connection,
@@ -505,7 +518,8 @@ async fn setup_connection(
505518
506519 match params. peer_type {
507520 ConnectionPeerType :: Staked ( stake) => {
508- let mut connection_table_l = staked_connection_table. lock ( ) . unwrap ( ) ;
521+ let mut connection_table_l = staked_connection_table. lock ( ) . await ;
522+
509523 if connection_table_l. total_size >= max_staked_connections {
510524 let num_pruned =
511525 connection_table_l. prune_random ( PRUNE_RANDOM_SAMPLE_SIZE , stake) ;
@@ -536,7 +550,9 @@ async fn setup_connection(
536550 & params,
537551 wait_for_chunk_timeout,
538552 stream_load_ema. clone ( ) ,
539- ) {
553+ )
554+ . await
555+ {
540556 stats
541557 . connection_added_from_staked_peer
542558 . fetch_add ( 1 , Ordering :: Relaxed ) ;
@@ -558,7 +574,9 @@ async fn setup_connection(
558574 & params,
559575 wait_for_chunk_timeout,
560576 stream_load_ema. clone ( ) ,
561- ) {
577+ )
578+ . await
579+ {
562580 stats
563581 . connection_added_from_unstaked_peer
564582 . fetch_add ( 1 , Ordering :: Relaxed ) ;
@@ -801,7 +819,7 @@ async fn handle_connection(
801819 }
802820 }
803821
804- let removed_connection_count = connection_table. lock ( ) . unwrap ( ) . remove_connection (
822+ let removed_connection_count = connection_table. lock ( ) . await . remove_connection (
805823 ConnectionTableKey :: new ( remote_addr. ip ( ) , params. remote_pubkey ) ,
806824 remote_addr. port ( ) ,
807825 stable_id,
0 commit comments