From 091b790163e5470fd9cee3e265b6ed87b4ad532f Mon Sep 17 00:00:00 2001 From: Or Aharonee <17099688+Aharonee@users.noreply.github.com> Date: Thu, 12 Feb 2026 12:03:27 +0200 Subject: [PATCH 1/4] server: fix peer add/done race between peerHandler and syncManager peerDoneHandler ran as a separate goroutine per peer and independently notified both peerHandler (via donePeers channel) and the sync manager (via syncManager.DonePeer) about a peer disconnect. Because these two sends were unsynchronized, the sync manager could observe DonePeer before NewPeer when a peer connected and disconnected quickly. This caused the sync manager to log "unknown peer", then later register the already-dead peer as a sync candidate that was never cleaned up, potentially leaving it stuck with a dead sync peer. Two structural changes eliminate the race: 1. Merge the newPeers and donePeers channels into a single peerLifecycle channel. Since OnVerAck (add) always fires before WaitForDisconnect returns (done), a single FIFO channel guarantees peerHandler always processes add before done for a given peer, removing the select-ambiguity where Go could pick done first. 2. Move the syncManager.DonePeer call and orphan eviction from peerDoneHandler into handleDonePeerMsg, which runs inside peerHandler. All sync manager peer lifecycle notifications now originate from the single peerHandler goroutine and flow into sm.msgChan in guaranteed add-before-done order. 
--- integration/sync_race_test.go | 228 ++++++++++++++++++++++++++++++++++ server.go | 77 +++++++----- 2 files changed, 275 insertions(+), 30 deletions(-) create mode 100644 integration/sync_race_test.go diff --git a/integration/sync_race_test.go b/integration/sync_race_test.go new file mode 100644 index 0000000000..910e637e69 --- /dev/null +++ b/integration/sync_race_test.go @@ -0,0 +1,228 @@ +//go:build rpctest +// +build rpctest + +package integration + +import ( + "math/rand" + "net" + "testing" + "time" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/integration/rpctest" + "github.com/btcsuite/btcd/wire" + "github.com/stretchr/testify/require" +) + +const ( + syncRaceIterations = 1000 + syncRaceConcurrency = 300 + syncRaceRunDuration = 90 * time.Second + syncRaceProofBlocks = 5 + syncRaceProofWait = 8 * time.Second +) + +// fakePeerConn connects to the node at nodeAddr, performs the minimum version/verack +// handshake so the node registers a peer (NewPeer) and then disconnects so the node +// runs DonePeer. This simulates attacker traffic: many connections that complete +// handshake then drop, stressing the sync manager's ordering of NewPeer/DonePeer. 
+func fakePeerConn(nodeAddr string) error { + conn, err := net.DialTimeout("tcp", nodeAddr, 5*time.Second) + if err != nil { + return err + } + defer conn.Close() + + _ = conn.SetDeadline(time.Now().Add(15 * time.Second)) + + nodeTCP, err := net.ResolveTCPAddr("tcp", nodeAddr) + if err != nil { + return err + } + you := wire.NewNetAddress( + nodeTCP, wire.SFNodeNetwork|wire.SFNodeWitness, + ) + me := wire.NewNetAddress( + &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}, + wire.SFNodeNetwork|wire.SFNodeWitness, + ) + you.Timestamp = time.Time{} + me.Timestamp = time.Time{} + + nonce := uint64(rand.Int63()) + msgVersion := wire.NewMsgVersion(me, you, nonce, 0) + msgVersion.Services = wire.SFNodeNetwork | wire.SFNodeWitness + + if err := wire.WriteMessage(conn, msgVersion, wire.ProtocolVersion, wire.SimNet); err != nil { + return err + } + + for { + msg, _, err := wire.ReadMessage(conn, wire.ProtocolVersion, wire.SimNet) + if err != nil { + return err + } + switch msg.(type) { + case *wire.MsgVersion: + // Node's version; send verack. + if err := wire.WriteMessage(conn, wire.NewMsgVerAck(), wire.ProtocolVersion, wire.SimNet); err != nil { + return err + } + + case *wire.MsgSendAddrV2: + // Optional; keep reading. + + case *wire.MsgVerAck: + // Handshake complete; close to trigger DonePeer. + return nil + + default: + // Ignore other messages (e.g. wtxidrelay) and keep reading. + } + } +} + +// TestSyncManagerRaceCorruption stresses a single simnet node with many inbound +// connections that complete the version/verack handshake then disconnect. It then +// proves corruption without a dedicated RPC: connect a fresh node that generates +// blocks; if the stressed node does not sync, it was stuck with a dead sync peer +// (getpeerinfo returns 0 peers when all disconnected; in the corrupted state the +// sync manager still has a dead peer as sync peer, so it ignores the new live one). 
+func TestSyncManagerRaceCorruption(t *testing.T) { + stressedHarness, err := rpctest.New(&chaincfg.SimNetParams, nil, nil, "") + require.NoError(t, err) + require.NoError(t, stressedHarness.SetUp(true, 0)) + t.Cleanup(func() { + require.NoError(t, stressedHarness.TearDown()) + }) + + nodeAddr := stressedHarness.P2PAddress() + deadline := time.Now().Add(syncRaceRunDuration) + iter := 0 + var done int + doneCh := make(chan struct{}, syncRaceConcurrency*2) + + for time.Now().Before(deadline) && iter < syncRaceIterations { + for i := 0; i < syncRaceConcurrency; i++ { + go func() { + _ = fakePeerConn(nodeAddr) + doneCh <- struct{}{} + }() + } + for i := 0; i < syncRaceConcurrency; i++ { + <-doneCh + done++ + } + iter += syncRaceConcurrency + } + + // Prove corruption: connect a live node and generate blocks. + // If the stressed node was corrupted (dead sync peer, 0 connected peers per getpeerinfo), it will not sync from the new one. + newHarness, err := rpctest.New(&chaincfg.SimNetParams, nil, nil, "") + require.NoError(t, err) + require.NoError(t, newHarness.SetUp(true, 0)) + defer func() { _ = newHarness.TearDown() }() + + require.NoError(t, rpctest.ConnectNode(stressedHarness, newHarness), + "stressed node must connect to the new node") + + _, heightBefore, err := stressedHarness.Client.GetBestBlock() + require.NoError(t, err) + + _, err = newHarness.Client.Generate(syncRaceProofBlocks) + require.NoError(t, err) + + time.Sleep(syncRaceProofWait) + + _, heightAfter, err := stressedHarness.Client.GetBestBlock() + require.NoError(t, err) + + if heightAfter < heightBefore+int32(syncRaceProofBlocks) { + t.Fatalf("proved sync manager corruption after %d fake peer cycles: stressed node did not sync from new peer (height %d -> %d); it was stuck with a dead sync peer instead of the new live one", + done, heightBefore, heightAfter) + } + + t.Logf("completed %d fake peer cycles; stressed node synced from new peer (height %d -> %d), no corruption observed", done, heightBefore, 
heightAfter) +} + +// TestPreVerackDisconnect verifies that a peer disconnecting before completing +// the version/verack handshake does not corrupt the sync manager state. In this +// case only a peerDone event is produced (no peerAdd), since AddPeer is only +// called from OnVerAck. The node must remain healthy and able to sync afterward. +func TestPreVerackDisconnect(t *testing.T) { + harness, err := rpctest.New(&chaincfg.SimNetParams, nil, nil, "") + require.NoError(t, err) + require.NoError(t, harness.SetUp(true, 0)) + t.Cleanup(func() { _ = harness.TearDown() }) + + nodeAddr := harness.P2PAddress() + + // Connect and send version, then disconnect before receiving or + // sending verack. This produces a peerDone without a preceding + // peerAdd in the lifecycle channel. + for i := 0; i < 50; i++ { + conn, err := net.DialTimeout("tcp", nodeAddr, 5*time.Second) + if err != nil { + continue + } + + _ = conn.SetDeadline(time.Now().Add(5 * time.Second)) + + nodeTCP, err := net.ResolveTCPAddr("tcp", nodeAddr) + if err != nil { + conn.Close() + continue + } + + you := wire.NewNetAddress( + nodeTCP, wire.SFNodeNetwork|wire.SFNodeWitness, + ) + me := wire.NewNetAddress( + &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}, + wire.SFNodeNetwork|wire.SFNodeWitness, + ) + you.Timestamp = time.Time{} + me.Timestamp = time.Time{} + + nonce := uint64(rand.Int63()) + msgVersion := wire.NewMsgVersion(me, you, nonce, 0) + msgVersion.Services = wire.SFNodeNetwork | wire.SFNodeWitness + + _ = wire.WriteMessage( + conn, msgVersion, wire.ProtocolVersion, wire.SimNet, + ) + + // Close immediately without completing the handshake. + conn.Close() + } + + // Allow the node time to process all the disconnects. + time.Sleep(2 * time.Second) + + // Verify the node is still healthy: connect a real peer, generate + // blocks, and confirm the harness syncs them. 
+ helper, err := rpctest.New(&chaincfg.SimNetParams, nil, nil, "") + require.NoError(t, err) + require.NoError(t, helper.SetUp(true, 0)) + defer func() { _ = helper.TearDown() }() + + require.NoError(t, rpctest.ConnectNode(harness, helper)) + + _, heightBefore, err := harness.Client.GetBestBlock() + require.NoError(t, err) + + _, err = helper.Client.Generate(3) + require.NoError(t, err) + + time.Sleep(5 * time.Second) + + _, heightAfter, err := harness.Client.GetBestBlock() + require.NoError(t, err) + + require.GreaterOrEqual(t, heightAfter, heightBefore+3, + "node failed to sync after pre-verack disconnects") + + t.Logf("node healthy after 50 pre-verack disconnects (height %d -> %d)", + heightBefore, heightAfter) +} diff --git a/server.go b/server.go index 40755c8e9b..f5b6c0b28a 100644 --- a/server.go +++ b/server.go @@ -151,6 +151,23 @@ type updatePeerHeightsMsg struct { originPeer *peer.Peer } +// peerLifecycleAction describes the type of peer lifecycle event. +type peerLifecycleAction uint8 + +const ( + peerAdd peerLifecycleAction = iota + peerDone +) + +// peerLifecycleEvent represents a peer connection or disconnection event. +// Using a single channel for both event types guarantees FIFO ordering: +// the add event from OnVerAck is always enqueued before the done event +// from peerDoneHandler, so the receiver always sees add before done. +type peerLifecycleEvent struct { + action peerLifecycleAction + sp *serverPeer +} + // peerState maintains state of inbound, persistent, outbound peers as well // as banned peers and outbound groups. 
type peerState struct { @@ -218,8 +235,7 @@ type server struct { cpuMiner *cpuminer.CPUMiner modifyRebroadcastInv chan interface{} p2pDowngrader *peer.P2PDowngrader - newPeers chan *serverPeer - donePeers chan *serverPeer + peerLifecycle chan peerLifecycleEvent banPeers chan *serverPeer query chan interface{} relayInv chan relayMsg @@ -1907,7 +1923,22 @@ func (s *server) handleDonePeerMsg(state *peerState, sp *serverPeer) { } delete(list, sp.ID()) srvrLog.Debugf("Removed peer %s", sp) - return + } + + // Notify the sync manager the peer is gone and evict any remaining + // orphans that were sent by the peer. This is done here rather than in + // peerDoneHandler so that the notification is serialized with NewPeer + // calls through the peerHandler goroutine, guaranteeing that the sync + // manager always sees NewPeer before DonePeer for a given peer. + if sp.VerAckReceived() { + s.syncManager.DonePeer(sp.Peer) + + numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID())) + if numEvicted > 0 { + txmpLog.Debugf("Evicted %d %s from peer %v (id %d)", + numEvicted, pickNoun(numEvicted, "orphan", + "orphans"), sp, sp.ID()) + } } } @@ -2291,21 +2322,7 @@ func (s *server) peerDoneHandler(sp *serverPeer) { s.p2pDowngrader.MarkForDowngrade(sp.Addr()) } - // This is sent to a buffered channel, so it may not execute immediately. - s.donePeers <- sp - - // Only tell sync manager we are gone if we ever told it we existed. - if sp.VerAckReceived() { - s.syncManager.DonePeer(sp.Peer) - - // Evict any remaining orphans that were sent by the peer. - numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID())) - if numEvicted > 0 { - txmpLog.Debugf("Evicted %d %s from peer %v (id %d)", - numEvicted, pickNoun(numEvicted, "orphan", - "orphans"), sp, sp.ID()) - } - } + s.peerLifecycle <- peerLifecycleEvent{action: peerDone, sp: sp} close(sp.quit) } @@ -2348,13 +2365,15 @@ func (s *server) peerHandler() { out: for { select { - // New peers connected to the server. 
- case p := <-s.newPeers: - s.handleAddPeerMsg(state, p) - - // Disconnected peers. - case p := <-s.donePeers: - s.handleDonePeerMsg(state, p) + // Peer connected or disconnected. + case event := <-s.peerLifecycle: + switch event.action { + case peerAdd: + s.handleAddPeerMsg(state, event.sp) + + case peerDone: + s.handleDonePeerMsg(state, event.sp) + } // Block accepted in mainchain or orphan, update peer height. case umsg := <-s.peerHeightsUpdate: @@ -2395,8 +2414,7 @@ out: cleanup: for { select { - case <-s.newPeers: - case <-s.donePeers: + case <-s.peerLifecycle: case <-s.peerHeightsUpdate: case <-s.relayInv: case <-s.broadcast: @@ -2411,7 +2429,7 @@ cleanup: // AddPeer adds a new peer that has already been connected to the server. func (s *server) AddPeer(sp *serverPeer) { - s.newPeers <- sp + s.peerLifecycle <- peerLifecycleEvent{action: peerAdd, sp: sp} } // BanPeer bans a peer that has already been connected to the server by ip. @@ -2847,8 +2865,7 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string, s := server{ chainParams: chainParams, addrManager: amgr, - newPeers: make(chan *serverPeer, cfg.MaxPeers), - donePeers: make(chan *serverPeer, cfg.MaxPeers), + peerLifecycle: make(chan peerLifecycleEvent, cfg.MaxPeers*2), banPeers: make(chan *serverPeer, cfg.MaxPeers), query: make(chan interface{}), relayInv: make(chan relayMsg, cfg.MaxPeers), From ce913598086f80abef604659c026effd45d1fac1 Mon Sep 17 00:00:00 2001 From: Or Aharonee <17099688+Aharonee@users.noreply.github.com> Date: Mon, 16 Feb 2026 16:58:12 +0200 Subject: [PATCH 2/4] server: serialize peer lifecycle via single goroutine Address review feedback on the peer add/done race fix: - Make peerLifecycleHandler (renamed from peerDoneHandler) the sole sender of both peerAdd and peerDone events for each peer. 
OnVerAck now closes a signal channel (verAckCh) instead of sending directly, and peerLifecycleHandler selects on verAckCh vs peer.Done() to decide whether to send peerAdd before peerDone. This guarantees ordering by construction: a single goroutine sends both events sequentially, eliminating the negotiateTimeout race window. - Add Done() method to peer.Peer exposing the quit channel read-only, enabling select-based disconnect detection from server code. - Remove the now-unused AddPeer method. - Address style feedback: 80-char line limit, empty lines between switch cases, break long function calls, use require.GreaterOrEqualf instead of if+Fatalf, bump syncRaceConcurrency to 300 for backpressure testing, add TestPreVerackDisconnect for disconnect prior to verack. --- integration/sync_race_test.go | 61 ++++++++++++++++++----------- peer/peer.go | 7 ++++ server.go | 73 +++++++++++++++++++++-------------- 3 files changed, 89 insertions(+), 52 deletions(-) diff --git a/integration/sync_race_test.go b/integration/sync_race_test.go index 910e637e69..b3135990aa 100644 --- a/integration/sync_race_test.go +++ b/integration/sync_race_test.go @@ -23,10 +23,12 @@ const ( syncRaceProofWait = 8 * time.Second ) -// fakePeerConn connects to the node at nodeAddr, performs the minimum version/verack -// handshake so the node registers a peer (NewPeer) and then disconnects so the node -// runs DonePeer. This simulates attacker traffic: many connections that complete -// handshake then drop, stressing the sync manager's ordering of NewPeer/DonePeer. +// fakePeerConn connects to the node at nodeAddr, performs the +// minimum version/verack handshake so the node registers a peer +// (NewPeer) and then disconnects so the node runs DonePeer. This +// simulates attacker traffic: many connections that complete the +// handshake then drop, stressing the sync manager's ordering of +// NewPeer/DonePeer. 
func fakePeerConn(nodeAddr string) error { conn, err := net.DialTimeout("tcp", nodeAddr, 5*time.Second) if err != nil { @@ -54,7 +56,10 @@ func fakePeerConn(nodeAddr string) error { msgVersion := wire.NewMsgVersion(me, you, nonce, 0) msgVersion.Services = wire.SFNodeNetwork | wire.SFNodeWitness - if err := wire.WriteMessage(conn, msgVersion, wire.ProtocolVersion, wire.SimNet); err != nil { + err = wire.WriteMessage( + conn, msgVersion, wire.ProtocolVersion, wire.SimNet, + ) + if err != nil { return err } @@ -66,7 +71,11 @@ func fakePeerConn(nodeAddr string) error { switch msg.(type) { case *wire.MsgVersion: // Node's version; send verack. - if err := wire.WriteMessage(conn, wire.NewMsgVerAck(), wire.ProtocolVersion, wire.SimNet); err != nil { + err := wire.WriteMessage( + conn, wire.NewMsgVerAck(), + wire.ProtocolVersion, wire.SimNet, + ) + if err != nil { return err } @@ -83,12 +92,12 @@ func fakePeerConn(nodeAddr string) error { } } -// TestSyncManagerRaceCorruption stresses a single simnet node with many inbound -// connections that complete the version/verack handshake then disconnect. It then -// proves corruption without a dedicated RPC: connect a fresh node that generates -// blocks; if the stressed node does not sync, it was stuck with a dead sync peer -// (getpeerinfo returns 0 peers when all disconnected; in the corrupted state the -// sync manager still has a dead peer as sync peer, so it ignores the new live one). +// TestSyncManagerRaceCorruption stresses a single simnet node +// with many inbound connections that complete the version/verack +// handshake then disconnect. It then proves corruption: connect a +// fresh node that generates blocks; if the stressed node does not +// sync, it was stuck with a dead sync peer (the sync manager still +// has a dead peer as sync peer, so it ignores the new live one). 
func TestSyncManagerRaceCorruption(t *testing.T) { stressedHarness, err := rpctest.New(&chaincfg.SimNetParams, nil, nil, "") require.NoError(t, err) @@ -117,8 +126,9 @@ func TestSyncManagerRaceCorruption(t *testing.T) { iter += syncRaceConcurrency } - // Prove corruption: connect a live node and generate blocks. - // If the stressed node was corrupted (dead sync peer, 0 connected peers per getpeerinfo), it will not sync from the new one. + // Prove corruption: connect a live node and generate blocks. If + // the stressed node was corrupted (dead sync peer, 0 connected + // peers), it will not sync from the new one. newHarness, err := rpctest.New(&chaincfg.SimNetParams, nil, nil, "") require.NoError(t, err) require.NoError(t, newHarness.SetUp(true, 0)) @@ -138,18 +148,23 @@ func TestSyncManagerRaceCorruption(t *testing.T) { _, heightAfter, err := stressedHarness.Client.GetBestBlock() require.NoError(t, err) - if heightAfter < heightBefore+int32(syncRaceProofBlocks) { - t.Fatalf("proved sync manager corruption after %d fake peer cycles: stressed node did not sync from new peer (height %d -> %d); it was stuck with a dead sync peer instead of the new live one", - done, heightBefore, heightAfter) - } + expected := heightBefore + int32(syncRaceProofBlocks) + require.GreaterOrEqualf(t, heightAfter, expected, + "sync manager corruption after %d fake "+ + "peer cycles: node stuck with dead "+ + "sync peer (height %d -> %d)", + done, heightBefore, heightAfter) - t.Logf("completed %d fake peer cycles; stressed node synced from new peer (height %d -> %d), no corruption observed", done, heightBefore, heightAfter) + t.Logf("completed %d fake peer cycles; "+ + "node synced (height %d -> %d)", + done, heightBefore, heightAfter) } -// TestPreVerackDisconnect verifies that a peer disconnecting before completing -// the version/verack handshake does not corrupt the sync manager state. 
In this -// case only a peerDone event is produced (no peerAdd), since AddPeer is only -// called from OnVerAck. The node must remain healthy and able to sync afterward. +// TestPreVerackDisconnect verifies that a peer disconnecting +// before completing the version/verack handshake does not corrupt +// the sync manager state. In this case only a peerDone event is +// produced (no peerAdd), since peerLifecycleHandler only sends +// peerAdd when verAckCh is closed. The node must remain healthy. func TestPreVerackDisconnect(t *testing.T) { harness, err := rpctest.New(&chaincfg.SimNetParams, nil, nil, "") require.NoError(t, err) diff --git a/peer/peer.go b/peer/peer.go index ee6f3175da..0cd5706ff6 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -2477,6 +2477,13 @@ func (p *Peer) WaitForDisconnect() { <-p.quit } +// Done returns a channel that is closed when the peer has been +// disconnected. This allows callers to select on peer disconnect +// alongside other channels. +func (p *Peer) Done() <-chan struct{} { + return p.quit +} + // ShouldDowngradeToV1 is called when we try to connect to a peer via v2 BIP324 // transport and they hang up. In this case, we should reconnect with the // legacy transport. diff --git a/server.go b/server.go index f5b6c0b28a..a8a5fe0c1a 100644 --- a/server.go +++ b/server.go @@ -155,14 +155,14 @@ type updatePeerHeightsMsg struct { type peerLifecycleAction uint8 const ( - peerAdd peerLifecycleAction = iota + peerAdd peerLifecycleAction = iota peerDone ) -// peerLifecycleEvent represents a peer connection or disconnection event. -// Using a single channel for both event types guarantees FIFO ordering: -// the add event from OnVerAck is always enqueued before the done event -// from peerDoneHandler, so the receiver always sees add before done. +// peerLifecycleEvent represents a peer connection or disconnection +// event. 
Both event types for a given peer are sent by a single +// goroutine (peerLifecycleHandler), guaranteeing that peerAdd is +// always enqueued before peerDone. type peerLifecycleEvent struct { action peerLifecycleAction sp *serverPeer @@ -295,6 +295,7 @@ type serverPeer struct { knownAddresses lru.Cache banScore connmgr.DynamicBanScore quit chan struct{} + verAckCh chan struct{} // closed when OnVerAck fires // The following chans are used to sync blockmanager and server. txProcessed chan struct{} blockProcessed chan struct{} @@ -309,6 +310,7 @@ func newServerPeer(s *server, isPersistent bool) *serverPeer { filter: bloom.LoadFilter(nil), knownAddresses: lru.NewCache(5000), quit: make(chan struct{}), + verAckCh: make(chan struct{}), txProcessed: make(chan struct{}, 1), blockProcessed: make(chan struct{}, 1), } @@ -551,10 +553,11 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej return nil } -// OnVerAck is invoked when a peer receives a verack bitcoin message and is used -// to kick start communication with them. +// OnVerAck is invoked when a peer receives a verack bitcoin message. +// It signals the peer's lifecycle handler that the handshake is +// complete so it can register the peer with the server. func (sp *serverPeer) OnVerAck(_ *peer.Peer, _ *wire.MsgVerAck) { - sp.server.AddPeer(sp) + close(sp.verAckCh) } // OnMemPool is invoked when a peer receives a mempool bitcoin message. @@ -1925,11 +1928,8 @@ func (s *server) handleDonePeerMsg(state *peerState, sp *serverPeer) { srvrLog.Debugf("Removed peer %s", sp) } - // Notify the sync manager the peer is gone and evict any remaining - // orphans that were sent by the peer. This is done here rather than in - // peerDoneHandler so that the notification is serialized with NewPeer - // calls through the peerHandler goroutine, guaranteeing that the sync - // manager always sees NewPeer before DonePeer for a given peer. 
+ // Notify the sync manager the peer is gone and evict any + // remaining orphans that were sent by the peer. if sp.VerAckReceived() { s.syncManager.DonePeer(sp.Peer) @@ -2262,7 +2262,7 @@ func (s *server) inboundPeerConnected(conn net.Conn) { sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) sp.Peer = peer.NewInboundPeer(newPeerConfig(sp)) sp.AssociateConnection(conn) - go s.peerDoneHandler(sp) + go s.peerLifecycleHandler(sp) } // outboundPeerConnected is invoked by the connection manager when a new @@ -2304,25 +2304,45 @@ func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) { sp.connReq = c sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) sp.AssociateConnection(conn) - go s.peerDoneHandler(sp) + go s.peerLifecycleHandler(sp) } -// peerDoneHandler handles peer disconnects by notifying the server that it's -// done along with other performing other desirable cleanup. -func (s *server) peerDoneHandler(sp *serverPeer) { +// peerLifecycleHandler is the sole sender of lifecycle events for a +// given peer. It waits for either verack (handshake complete) or +// disconnect (handshake failed/timed out), sends peerAdd if verack +// was received, then waits for disconnect and sends peerDone. +// Because both sends originate from this single goroutine, +// peerAdd is always enqueued before peerDone. +func (s *server) peerLifecycleHandler(sp *serverPeer) { + // Wait for the handshake to complete or the peer to + // disconnect, whichever comes first. + select { + case <-sp.verAckCh: + s.peerLifecycle <- peerLifecycleEvent{ + action: peerAdd, sp: sp, + } + + case <-sp.Peer.Done(): + // Disconnected before verack; no peerAdd needed. + } + + // Wait for full disconnect (may already be done). sp.WaitForDisconnect() - // If this is an outbound peer and the shouldDowngradeToV1 bool is set - // on the underlying Peer, trigger a reconnect using the OG v1 - // connection scheme. 
+ // If this is an outbound peer and the shouldDowngradeToV1 + // bool is set on the underlying Peer, trigger a reconnect + // using the OG v1 connection scheme. if !sp.Inbound() && sp.Peer.ShouldDowngradeToV1() { - srvrLog.Infof("Peer %s indicated v2->v1 downgrade. "+ - "Marking for next attempt as v1.", sp.Addr()) + srvrLog.Infof("Peer %s indicated v2->v1 downgrade."+ + " Marking for next attempt as v1.", + sp.Addr()) s.p2pDowngrader.MarkForDowngrade(sp.Addr()) } - s.peerLifecycle <- peerLifecycleEvent{action: peerDone, sp: sp} + s.peerLifecycle <- peerLifecycleEvent{ + action: peerDone, sp: sp, + } close(sp.quit) } @@ -2427,11 +2447,6 @@ cleanup: srvrLog.Tracef("Peer handler done") } -// AddPeer adds a new peer that has already been connected to the server. -func (s *server) AddPeer(sp *serverPeer) { - s.peerLifecycle <- peerLifecycleEvent{action: peerAdd, sp: sp} -} - // BanPeer bans a peer that has already been connected to the server by ip. func (s *server) BanPeer(sp *serverPeer) { s.banPeers <- sp From f6d69f7a153077d06016d791b9baf8ab1bd85faf Mon Sep 17 00:00:00 2001 From: Or Aharonee <17099688+Aharonee@users.noreply.github.com> Date: Wed, 18 Feb 2026 14:43:40 +0200 Subject: [PATCH 3/4] server: address review feedback on peer lifecycle handling Leave the peerLifecycleHandler select unprioritized: when verAckCh and Peer.Done() are both ready, Go may choose either case, so peerAdd can be skipped; peerDone is still always sent. Guard OnVerAck against double-close by checking the channel before closing, logging an error instead of panicking. Adjust peerLifecycleEvent comment to reflect that peerAdd may be skipped when the peer disconnects before or concurrently with verack. Fix verAckCh field comment formatting. --- server.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/server.go b/server.go index a8a5fe0c1a..2a22dead5d 100644 --- a/server.go +++ b/server.go @@ -161,8 +161,10 @@ const ( // peerLifecycleEvent represents a peer connection or disconnection // event. 
Both event types for a given peer are sent by a single -// goroutine (peerLifecycleHandler), guaranteeing that peerAdd is -// always enqueued before peerDone. +// goroutine (peerLifecycleHandler), guaranteeing that if peerAdd is +// sent, it is always enqueued before peerDone. peerAdd may be +// skipped entirely when the peer disconnects before or concurrently +// with verack. type peerLifecycleEvent struct { action peerLifecycleAction sp *serverPeer @@ -295,7 +297,8 @@ type serverPeer struct { knownAddresses lru.Cache banScore connmgr.DynamicBanScore quit chan struct{} - verAckCh chan struct{} // closed when OnVerAck fires + // Closed when OnVerAck fires. + verAckCh chan struct{} // The following chans are used to sync blockmanager and server. txProcessed chan struct{} blockProcessed chan struct{} @@ -557,7 +560,13 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej // It signals the peer's lifecycle handler that the handshake is // complete so it can register the peer with the server. func (sp *serverPeer) OnVerAck(_ *peer.Peer, _ *wire.MsgVerAck) { - close(sp.verAckCh) + select { + case <-sp.verAckCh: + peerLog.Errorf("OnVerAck called more than once "+ + "for peer %v", sp) + default: + close(sp.verAckCh) + } } // OnMemPool is invoked when a peer receives a mempool bitcoin message. From 08be37f3b6ed35ce5ca18fa109b9531575820c84 Mon Sep 17 00:00:00 2001 From: Or Aharonee <17099688+Aharonee@users.noreply.github.com> Date: Thu, 9 Apr 2026 18:19:15 +0300 Subject: [PATCH 4/4] server, integration: add unit regression tests for peer lifecycle fix Address review feedback on the peer add/done race fix: Add three direct unit tests in server_test.go that exercise the fix without the full server or rpctest harness: - TestOnVerAckDoubleCall: call OnVerAck twice on the same serverPeer, assert no panic and verAckCh remains closed. - TestPeerLifecycleOrdering: verack before disconnect emits peerAdd then peerDone in order. 
- TestPeerLifecycleSimultaneousReady: both verAckCh and Peer.Done() ready before the handler runs; assert peerDone always arrives and peerAdd, if emitted, precedes it (100 iterations). Harden integration tests in sync_race_test.go: - Check fakePeerConn errors via require.NoError instead of discarding. - Extract dialAndSendVersion helper for TestPreVerackDisconnect; check all errors instead of silently continuing. - Fix comment wording ("produces" -> "is expected to produce"). --- integration/sync_race_test.go | 90 ++++++++++++--------- server_test.go | 147 ++++++++++++++++++++++++++++++++++ 2 files changed, 197 insertions(+), 40 deletions(-) create mode 100644 server_test.go diff --git a/integration/sync_race_test.go b/integration/sync_race_test.go index b3135990aa..9d1bbcedc6 100644 --- a/integration/sync_race_test.go +++ b/integration/sync_race_test.go @@ -110,17 +110,16 @@ func TestSyncManagerRaceCorruption(t *testing.T) { deadline := time.Now().Add(syncRaceRunDuration) iter := 0 var done int - doneCh := make(chan struct{}, syncRaceConcurrency*2) + errCh := make(chan error, syncRaceConcurrency*2) for time.Now().Before(deadline) && iter < syncRaceIterations { for i := 0; i < syncRaceConcurrency; i++ { go func() { - _ = fakePeerConn(nodeAddr) - doneCh <- struct{}{} + errCh <- fakePeerConn(nodeAddr) }() } for i := 0; i < syncRaceConcurrency; i++ { - <-doneCh + require.NoError(t, <-errCh) done++ } iter += syncRaceConcurrency @@ -160,6 +159,45 @@ func TestSyncManagerRaceCorruption(t *testing.T) { done, heightBefore, heightAfter) } +// dialAndSendVersion connects to nodeAddr and sends a version +// message, returning the open connection. The caller is +// responsible for closing it. 
+func dialAndSendVersion( + t *testing.T, nodeAddr string, +) net.Conn { + + t.Helper() + + conn, err := net.DialTimeout("tcp", nodeAddr, 5*time.Second) + require.NoError(t, err) + + _ = conn.SetDeadline(time.Now().Add(5 * time.Second)) + + nodeTCP, err := net.ResolveTCPAddr("tcp", nodeAddr) + require.NoError(t, err) + + you := wire.NewNetAddress( + nodeTCP, wire.SFNodeNetwork|wire.SFNodeWitness, + ) + me := wire.NewNetAddress( + &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}, + wire.SFNodeNetwork|wire.SFNodeWitness, + ) + you.Timestamp = time.Time{} + me.Timestamp = time.Time{} + + nonce := uint64(rand.Int63()) + msgVersion := wire.NewMsgVersion(me, you, nonce, 0) + msgVersion.Services = wire.SFNodeNetwork | wire.SFNodeWitness + + err = wire.WriteMessage( + conn, msgVersion, wire.ProtocolVersion, wire.SimNet, + ) + require.NoError(t, err) + + return conn +} + // TestPreVerackDisconnect verifies that a peer disconnecting // before completing the version/verack handshake does not corrupt // the sync manager state. In this case only a peerDone event is @@ -174,41 +212,12 @@ func TestPreVerackDisconnect(t *testing.T) { nodeAddr := harness.P2PAddress() // Connect and send version, then disconnect before receiving or - // sending verack. This produces a peerDone without a preceding - // peerAdd in the lifecycle channel. - for i := 0; i < 50; i++ { - conn, err := net.DialTimeout("tcp", nodeAddr, 5*time.Second) - if err != nil { - continue - } - - _ = conn.SetDeadline(time.Now().Add(5 * time.Second)) - - nodeTCP, err := net.ResolveTCPAddr("tcp", nodeAddr) - if err != nil { - conn.Close() - continue - } + // sending verack. This is expected to produce a peerDone without + // a preceding peerAdd in the lifecycle channel. 
+ const preVerackAttempts = 50 - you := wire.NewNetAddress( - nodeTCP, wire.SFNodeNetwork|wire.SFNodeWitness, - ) - me := wire.NewNetAddress( - &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}, - wire.SFNodeNetwork|wire.SFNodeWitness, - ) - you.Timestamp = time.Time{} - me.Timestamp = time.Time{} - - nonce := uint64(rand.Int63()) - msgVersion := wire.NewMsgVersion(me, you, nonce, 0) - msgVersion.Services = wire.SFNodeNetwork | wire.SFNodeWitness - - _ = wire.WriteMessage( - conn, msgVersion, wire.ProtocolVersion, wire.SimNet, - ) - - // Close immediately without completing the handshake. + for i := 0; i < preVerackAttempts; i++ { + conn := dialAndSendVersion(t, nodeAddr) conn.Close() } @@ -238,6 +247,7 @@ func TestPreVerackDisconnect(t *testing.T) { require.GreaterOrEqual(t, heightAfter, heightBefore+3, "node failed to sync after pre-verack disconnects") - t.Logf("node healthy after 50 pre-verack disconnects (height %d -> %d)", - heightBefore, heightAfter) + t.Logf("node healthy after %d pre-verack disconnects "+ + "(height %d -> %d)", + preVerackAttempts, heightBefore, heightAfter) } diff --git a/server_test.go b/server_test.go new file mode 100644 index 0000000000..2b07552c24 --- /dev/null +++ b/server_test.go @@ -0,0 +1,147 @@ +package main + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/peer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMain(m *testing.M) { + // logRotator must be non-nil or any log write (e.g. from + // OnVerAck's double-call guard) panics via logWriter.Write. + initLogRotator(filepath.Join(os.TempDir(), "btcd-server-test.log")) + os.Exit(m.Run()) +} + +// newTestServerPeer creates a minimal serverPeer suitable for unit +// tests that exercise the peer lifecycle logic without starting the +// full server. The returned server's peerLifecycle channel is +// buffered so the handler never blocks during tests. 
+func newTestServerPeer(t *testing.T) (*server, *serverPeer) { + t.Helper() + + s := &server{ + peerLifecycle: make(chan peerLifecycleEvent, 10), + } + sp := newServerPeer(s, false) + sp.Peer = peer.NewInboundPeer(&peer.Config{ + ChainParams: &chaincfg.SimNetParams, + }) + + return s, sp +} + +// recvLifecycleEvent reads a single event from the peerLifecycle +// channel or fails the test after a timeout. +func recvLifecycleEvent( + t *testing.T, ch <-chan peerLifecycleEvent, +) peerLifecycleEvent { + + t.Helper() + + select { + case ev := <-ch: + return ev + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for peerLifecycleEvent") + return peerLifecycleEvent{} + } +} + +// TestOnVerAckDoubleCall verifies that calling OnVerAck twice on +// the same serverPeer does not panic. The double-call guard must +// log an error and leave verAckCh closed. +func TestOnVerAckDoubleCall(t *testing.T) { + t.Parallel() + + _, sp := newTestServerPeer(t) + + sp.OnVerAck(nil, nil) + + select { + case <-sp.verAckCh: + default: + t.Fatal("verAckCh should be closed after first OnVerAck call") + } + + require.NotPanics(t, func() { + sp.OnVerAck(nil, nil) + }) + + select { + case <-sp.verAckCh: + default: + t.Fatal("verAckCh should still be closed after second OnVerAck call") + } +} + +// TestPeerLifecycleOrdering verifies that when verack arrives before +// disconnect, peerLifecycleHandler emits peerAdd followed by peerDone +// on the peerLifecycle channel -- never out of order. +func TestPeerLifecycleOrdering(t *testing.T) { + t.Parallel() + + s, sp := newTestServerPeer(t) + + // Simulate verack received before the handler starts. + close(sp.verAckCh) + + go s.peerLifecycleHandler(sp) + + first := recvLifecycleEvent(t, s.peerLifecycle) + require.Equal(t, peerAdd, first.action, + "first lifecycle event must be peerAdd") + require.Equal(t, sp, first.sp) + + // Trigger disconnect after peerAdd is observed. 
+ sp.Peer.Disconnect() + + second := recvLifecycleEvent(t, s.peerLifecycle) + require.Equal(t, peerDone, second.action, + "second lifecycle event must be peerDone") + require.Equal(t, sp, second.sp) +} + +// TestPeerLifecycleSimultaneousReady verifies that when both verAckCh +// and Peer.Done() are ready before the handler runs, the system stays +// stable: peerDone is always emitted, and if peerAdd is emitted it +// precedes peerDone. Go's select is nondeterministic so peerAdd may +// be skipped -- both outcomes are valid per documented behavior. +func TestPeerLifecycleSimultaneousReady(t *testing.T) { + t.Parallel() + + const iterations = 100 + var addEmitted int + + for i := 0; i < iterations; i++ { + s, sp := newTestServerPeer(t) + + close(sp.verAckCh) + sp.Peer.Disconnect() + + go s.peerLifecycleHandler(sp) + + first := recvLifecycleEvent(t, s.peerLifecycle) + if first.action == peerAdd { + addEmitted++ + second := recvLifecycleEvent(t, s.peerLifecycle) + assert.Equal(t, peerDone, second.action, + "iteration %d: peerAdd must be "+ + "followed by peerDone", i) + } else { + assert.Equal(t, peerDone, first.action, + "iteration %d: sole event must "+ + "be peerDone", i) + } + } + + t.Logf("peerAdd emitted in %d/%d iterations "+ + "(both outcomes are valid per documented behavior)", + addEmitted, iterations) +}