@@ -404,7 +404,7 @@ impl ConnMgr {
404404 self . on_task_exit( task_id) . await ;
405405 }
406406 Err ( err) => {
407- error !( self . log, "Connection task panic: {err}" ) ;
407+ warn !( self . log, "Connection task panic: {err}" ) ;
408408 self . on_task_exit( err. id( ) ) . await ;
409409 }
410410
@@ -482,7 +482,19 @@ impl ConnMgr {
482482 tx,
483483 conn_type : ConnectionType :: Accepted ( addr) ,
484484 } ;
485- assert ! ( self . accepting. insert_unique( task_handle) . is_ok( ) ) ;
485+ let replaced = self . accepting . insert_overwrite ( task_handle) ;
486+ for h in replaced {
487+ // We accepted a connection from the same `SocketAddrV6` before the
488+ // old one was torn down. This should be rare, if not impossible.
489+ warn ! (
490+ self . log,
491+ "Accepted connection replaced. Aborting old task." ;
492+ "task_id" => ?h. task_id( ) ,
493+ "peer_addr" => %h. addr( ) ,
494+ ) ;
495+ h. abort ( ) ;
496+ }
497+
486498 Ok ( ( ) )
487499 }
488500
@@ -501,12 +513,38 @@ impl ConnMgr {
501513 "peer_id" => %peer_id
502514 ) ;
503515
504- let already_established = self . established . insert_unique (
516+ let replaced = self . established . insert_overwrite (
505517 EstablishedTaskHandle :: new ( peer_id, task_handle) ,
506518 ) ;
507- assert ! ( already_established. is_ok( ) ) ;
519+
520+ // The only reason for for established connections to be replaced
521+ // like this is when the IP address for a peer changes, but the
522+ // previous connection has not yet been torn down.
523+ //
524+ // Tear down usually happens quickly due to TCP reset or missed
525+ // pings. However if the new ip address is fed into the task via
526+ // `load_peer_addresses` and the peer at that address connects
527+ // before the old connection is torn down, you end up in this
528+ // situation.
529+ //
530+ // This isn't really possible, except in tests where we change port
531+ // numbers when simulating crash and restart of nodes, and do this
532+ // very quickly. We change port numbers because `NodeTask`s listen
533+ // on port 0 and use ephemeral ports to prevent collisions in tests
534+ // where the IP address is localhost.
535+ for h in replaced {
536+ warn ! (
537+ self . log,
538+ "Established connection replaced. Aborting old task." ;
539+ "task_id" => ?h. task_id( ) ,
540+ "peer_addr" => %h. addr( ) ,
541+ "peer_id" => %h. baseboard_id
542+ ) ;
543+ h. abort ( ) ;
544+ }
508545 } else {
509- error ! ( self . log, "Server handshake completed, but no server addr in map" ;
546+ warn ! ( self . log,
547+ "Server handshake completed, but no server addr in map" ;
510548 "task_id" => ?task_id,
511549 "peer_addr" => %addr,
512550 "peer_id" => %peer_id
@@ -528,12 +566,38 @@ impl ConnMgr {
528566 "peer_addr" => %addr,
529567 "peer_id" => %peer_id
530568 ) ;
531- let already_established = self . established . insert_unique (
569+ let replaced = self . established . insert_overwrite (
532570 EstablishedTaskHandle :: new ( peer_id, task_handle) ,
533571 ) ;
534- assert ! ( already_established. is_ok( ) ) ;
572+
573+ // The only reason for for established connections to be replaced
574+ // like this is when the IP address for a peer changes, but the
575+ // previous connection has not yet been torn down.
576+ //
577+ // Tear down usually happens quickly due to TCP reset or missed
578+ // pings. However if the new ip address is fed into the task via
579+ // `load_peer_addresses` and the peer at that address connects
580+ // before the old connection is torn down, you end up in this
581+ // situation.
582+ //
583+ // This isn't really possible, except in tests where we change port
584+ // numbers when simulating crash and restart of nodes, and do this
585+ // very quickly. We change port numbers because `NodeTask`s listen
586+ // on port 0 and use ephemeral ports to prevent collisions in tests
587+ // where the IP address is localhost.
588+ for h in replaced {
589+ warn ! (
590+ self . log,
591+ "Established connection replaced. Aborting old task." ;
592+ "task_id" => ?h. task_id( ) ,
593+ "peer_addr" => %h. addr( ) ,
594+ "peer_id" => %h. baseboard_id
595+ ) ;
596+ h. abort ( ) ;
597+ }
535598 } else {
536- error ! ( self . log, "Client handshake completed, but no client addr in map" ;
599+ warn ! ( self . log,
600+ "Client handshake completed, but no client addr in map" ;
537601 "task_id" => ?task_id,
538602 "peer_addr" => %addr,
539603 "peer_id" => %peer_id
@@ -634,7 +698,7 @@ impl ConnMgr {
634698 disconnected_peers
635699 }
636700
637- /// Spawn a task to estalbish a sprockets connection for the given address
701+ /// Spawn a task to establish a sprockets connection for the given address
638702 async fn connect_client (
639703 & mut self ,
640704 corpus : Vec < Utf8PathBuf > ,
0 commit comments