diff --git a/integration/sync_race_test.go b/integration/sync_race_test.go
new file mode 100644
index 0000000000..9d1bbcedc6
--- /dev/null
+++ b/integration/sync_race_test.go
@@ -0,0 +1,253 @@
+//go:build rpctest
+// +build rpctest
+
+package integration
+
+import (
+	"math/rand"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/btcsuite/btcd/chaincfg"
+	"github.com/btcsuite/btcd/integration/rpctest"
+	"github.com/btcsuite/btcd/wire"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	syncRaceIterations  = 1000
+	syncRaceConcurrency = 300
+	syncRaceRunDuration = 90 * time.Second
+	syncRaceProofBlocks = 5
+	syncRaceProofWait   = 8 * time.Second
+)
+
+// fakePeerConn connects to the node at nodeAddr, performs the
+// minimum version/verack handshake so the node registers a peer
+// (NewPeer), and then disconnects so the node runs DonePeer. This
+// simulates attacker traffic: many connections that complete the
+// handshake and then drop, stressing the sync manager's ordering
+// of NewPeer/DonePeer.
+func fakePeerConn(nodeAddr string) error {
+	conn, err := net.DialTimeout("tcp", nodeAddr, 5*time.Second)
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+
+	_ = conn.SetDeadline(time.Now().Add(15 * time.Second))
+
+	nodeTCP, err := net.ResolveTCPAddr("tcp", nodeAddr)
+	if err != nil {
+		return err
+	}
+	you := wire.NewNetAddress(
+		nodeTCP, wire.SFNodeNetwork|wire.SFNodeWitness,
+	)
+	me := wire.NewNetAddress(
+		&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0},
+		wire.SFNodeNetwork|wire.SFNodeWitness,
+	)
+	you.Timestamp = time.Time{}
+	me.Timestamp = time.Time{}
+
+	nonce := uint64(rand.Int63())
+	msgVersion := wire.NewMsgVersion(me, you, nonce, 0)
+	msgVersion.Services = wire.SFNodeNetwork | wire.SFNodeWitness
+
+	err = wire.WriteMessage(
+		conn, msgVersion, wire.ProtocolVersion, wire.SimNet,
+	)
+	if err != nil {
+		return err
+	}
+
+	for {
+		msg, _, err := wire.ReadMessage(conn, wire.ProtocolVersion, wire.SimNet)
+		if err != nil {
+			return err
+		}
+		switch msg.(type) {
+		case *wire.MsgVersion:
+			// Node's version; send verack.
+			err := wire.WriteMessage(
+				conn, wire.NewMsgVerAck(),
+				wire.ProtocolVersion, wire.SimNet,
+			)
+			if err != nil {
+				return err
+			}
+
+		case *wire.MsgSendAddrV2:
+			// Optional; keep reading.
+
+		case *wire.MsgVerAck:
+			// Handshake complete; close to trigger DonePeer.
+			return nil
+
+		default:
+			// Ignore other messages (e.g. wtxidrelay) and keep reading.
+		}
+	}
+}
+
+// TestSyncManagerRaceCorruption stresses a single simnet node
+// with many inbound connections that complete the version/verack
+// handshake and then disconnect. It then proves corruption: connect
+// a fresh node that generates blocks; if the stressed node does not
+// sync, it is stuck with a dead sync peer (the sync manager still
+// treats the dead peer as its sync peer, so it ignores the new live
+// one).
+func TestSyncManagerRaceCorruption(t *testing.T) {
+	stressedHarness, err := rpctest.New(&chaincfg.SimNetParams, nil, nil, "")
+	require.NoError(t, err)
+	require.NoError(t, stressedHarness.SetUp(true, 0))
+	t.Cleanup(func() {
+		require.NoError(t, stressedHarness.TearDown())
+	})
+
+	nodeAddr := stressedHarness.P2PAddress()
+	deadline := time.Now().Add(syncRaceRunDuration)
+	iter := 0
+	var done int
+	errCh := make(chan error, syncRaceConcurrency*2)
+
+	for time.Now().Before(deadline) && iter < syncRaceIterations {
+		for i := 0; i < syncRaceConcurrency; i++ {
+			go func() {
+				errCh <- fakePeerConn(nodeAddr)
+			}()
+		}
+		for i := 0; i < syncRaceConcurrency; i++ {
+			require.NoError(t, <-errCh)
+			done++
+		}
+		iter += syncRaceConcurrency
+	}
+
+	// Prove corruption: connect a live node and generate blocks. If
+	// the stressed node was corrupted (dead sync peer, 0 connected
+	// peers), it will not sync from the new one.
+	newHarness, err := rpctest.New(&chaincfg.SimNetParams, nil, nil, "")
+	require.NoError(t, err)
+	require.NoError(t, newHarness.SetUp(true, 0))
+	defer func() { _ = newHarness.TearDown() }()
+
+	require.NoError(t, rpctest.ConnectNode(stressedHarness, newHarness),
+		"stressed node must connect to the new node")
+
+	_, heightBefore, err := stressedHarness.Client.GetBestBlock()
+	require.NoError(t, err)
+
+	_, err = newHarness.Client.Generate(syncRaceProofBlocks)
+	require.NoError(t, err)
+
+	time.Sleep(syncRaceProofWait)
+
+	_, heightAfter, err := stressedHarness.Client.GetBestBlock()
+	require.NoError(t, err)
+
+	expected := heightBefore + int32(syncRaceProofBlocks)
+	require.GreaterOrEqualf(t, heightAfter, expected,
+		"sync manager corruption after %d fake "+
+			"peer cycles: node stuck with dead "+
+			"sync peer (height %d -> %d)",
+		done, heightBefore, heightAfter)
+
+	t.Logf("completed %d fake peer cycles; "+
+		"node synced (height %d -> %d)",
+		done, heightBefore, heightAfter)
+}
+
+// dialAndSendVersion connects to nodeAddr and sends a version
+// message, returning the open connection. The caller is
+// responsible for closing it.
+func dialAndSendVersion(
+	t *testing.T, nodeAddr string,
+) net.Conn {
+
+	t.Helper()
+
+	conn, err := net.DialTimeout("tcp", nodeAddr, 5*time.Second)
+	require.NoError(t, err)
+
+	_ = conn.SetDeadline(time.Now().Add(5 * time.Second))
+
+	nodeTCP, err := net.ResolveTCPAddr("tcp", nodeAddr)
+	require.NoError(t, err)
+
+	you := wire.NewNetAddress(
+		nodeTCP, wire.SFNodeNetwork|wire.SFNodeWitness,
+	)
+	me := wire.NewNetAddress(
+		&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0},
+		wire.SFNodeNetwork|wire.SFNodeWitness,
+	)
+	you.Timestamp = time.Time{}
+	me.Timestamp = time.Time{}
+
+	nonce := uint64(rand.Int63())
+	msgVersion := wire.NewMsgVersion(me, you, nonce, 0)
+	msgVersion.Services = wire.SFNodeNetwork | wire.SFNodeWitness
+
+	err = wire.WriteMessage(
+		conn, msgVersion, wire.ProtocolVersion, wire.SimNet,
+	)
+	require.NoError(t, err)
+
+	return conn
+}
+
+// TestPreVerackDisconnect verifies that a peer disconnecting
+// before completing the version/verack handshake does not corrupt
+// the sync manager state. In this case only a peerDone event is
+// produced (no peerAdd), since peerLifecycleHandler only sends
+// peerAdd when verAckCh is closed. The node must remain healthy.
+func TestPreVerackDisconnect(t *testing.T) {
+	harness, err := rpctest.New(&chaincfg.SimNetParams, nil, nil, "")
+	require.NoError(t, err)
+	require.NoError(t, harness.SetUp(true, 0))
+	t.Cleanup(func() { _ = harness.TearDown() })
+
+	nodeAddr := harness.P2PAddress()
+
+	// Connect and send version, then disconnect before receiving or
+	// sending verack. This is expected to produce a peerDone without
+	// a preceding peerAdd in the lifecycle channel.
+	const preVerackAttempts = 50
+
+	for i := 0; i < preVerackAttempts; i++ {
+		conn := dialAndSendVersion(t, nodeAddr)
+		conn.Close()
+	}
+
+	// Allow the node time to process all the disconnects.
+	time.Sleep(2 * time.Second)
+
+	// Verify the node is still healthy: connect a real peer, generate
+	// blocks, and confirm the harness syncs them.
+	helper, err := rpctest.New(&chaincfg.SimNetParams, nil, nil, "")
+	require.NoError(t, err)
+	require.NoError(t, helper.SetUp(true, 0))
+	defer func() { _ = helper.TearDown() }()
+
+	require.NoError(t, rpctest.ConnectNode(harness, helper))
+
+	_, heightBefore, err := harness.Client.GetBestBlock()
+	require.NoError(t, err)
+
+	_, err = helper.Client.Generate(3)
+	require.NoError(t, err)
+
+	time.Sleep(5 * time.Second)
+
+	_, heightAfter, err := harness.Client.GetBestBlock()
+	require.NoError(t, err)
+
+	require.GreaterOrEqual(t, heightAfter, heightBefore+3,
+		"node failed to sync after pre-verack disconnects")
+
+	t.Logf("node healthy after %d pre-verack disconnects "+
+		"(height %d -> %d)",
+		preVerackAttempts, heightBefore, heightAfter)
+}
diff --git a/peer/peer.go b/peer/peer.go
index ee6f3175da..0cd5706ff6 100644
--- a/peer/peer.go
+++ b/peer/peer.go
@@ -2477,6 +2477,13 @@ func (p *Peer) WaitForDisconnect() {
 	<-p.quit
 }
 
+// Done returns a channel that is closed when the peer has been
+// disconnected. This allows callers to select on peer disconnect
+// alongside other channels.
+func (p *Peer) Done() <-chan struct{} {
+	return p.quit
+}
+
 // ShouldDowngradeToV1 is called when we try to connect to a peer via v2 BIP324
 // transport and they hang up. In this case, we should reconnect with the
 // legacy transport.
diff --git a/server.go b/server.go
index 40755c8e9b..2a22dead5d 100644
--- a/server.go
+++ b/server.go
@@ -151,6 +151,25 @@ type updatePeerHeightsMsg struct {
 	originPeer *peer.Peer
 }
 
+// peerLifecycleAction describes the type of peer lifecycle event.
+type peerLifecycleAction uint8
+
+const (
+	peerAdd peerLifecycleAction = iota
+	peerDone
+)
+
+// peerLifecycleEvent represents a peer connection or disconnection
+// event. Both event types for a given peer are sent by a single
+// goroutine (peerLifecycleHandler), guaranteeing that if peerAdd is
+// sent, it is always enqueued before peerDone. peerAdd may be
+// skipped entirely when the peer disconnects before or concurrently
+// with verack.
+type peerLifecycleEvent struct {
+	action peerLifecycleAction
+	sp     *serverPeer
+}
+
 // peerState maintains state of inbound, persistent, outbound peers as well
 // as banned peers and outbound groups.
 type peerState struct {
@@ -218,8 +237,7 @@ type server struct {
 	cpuMiner             *cpuminer.CPUMiner
 	modifyRebroadcastInv chan interface{}
 	p2pDowngrader        *peer.P2PDowngrader
-	newPeers             chan *serverPeer
-	donePeers            chan *serverPeer
+	peerLifecycle        chan peerLifecycleEvent
 	banPeers             chan *serverPeer
 	query                chan interface{}
 	relayInv             chan relayMsg
@@ -279,6 +297,8 @@ type serverPeer struct {
 	knownAddresses lru.Cache
 	banScore       connmgr.DynamicBanScore
 	quit           chan struct{}
+	// Closed when OnVerAck fires.
+	verAckCh chan struct{}
 	// The following chans are used to sync blockmanager and server.
 	txProcessed    chan struct{}
 	blockProcessed chan struct{}
@@ -293,6 +313,7 @@ func newServerPeer(s *server, isPersistent bool) *serverPeer {
 		filter:         bloom.LoadFilter(nil),
 		knownAddresses: lru.NewCache(5000),
 		quit:           make(chan struct{}),
+		verAckCh:       make(chan struct{}),
 		txProcessed:    make(chan struct{}, 1),
 		blockProcessed: make(chan struct{}, 1),
 	}
@@ -535,10 +556,17 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej
 	return nil
 }
 
-// OnVerAck is invoked when a peer receives a verack bitcoin message and is used
-// to kick start communication with them.
+// OnVerAck is invoked when a peer receives a verack bitcoin message.
+// It signals the peer's lifecycle handler that the handshake is
+// complete so it can register the peer with the server.
 func (sp *serverPeer) OnVerAck(_ *peer.Peer, _ *wire.MsgVerAck) {
-	sp.server.AddPeer(sp)
+	select {
+	case <-sp.verAckCh:
+		peerLog.Errorf("OnVerAck called more than once "+
+			"for peer %v", sp)
+	default:
+		close(sp.verAckCh)
+	}
 }
 
 // OnMemPool is invoked when a peer receives a mempool bitcoin message.
@@ -1907,7 +1935,19 @@ func (s *server) handleDonePeerMsg(state *peerState, sp *serverPeer) {
 	}
 	delete(list, sp.ID())
 	srvrLog.Debugf("Removed peer %s", sp)
-	return
+	}
+
+	// Notify the sync manager that the peer is gone and evict any
+	// remaining orphans that were sent by the peer.
+	if sp.VerAckReceived() {
+		s.syncManager.DonePeer(sp.Peer)
+
+		numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID()))
+		if numEvicted > 0 {
+			txmpLog.Debugf("Evicted %d %s from peer %v (id %d)",
+				numEvicted, pickNoun(numEvicted, "orphan",
+				"orphans"), sp, sp.ID())
+		}
+	}
 }
@@ -2231,7 +2271,7 @@ func (s *server) inboundPeerConnected(conn net.Conn) {
 	sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
 	sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
 	sp.AssociateConnection(conn)
-	go s.peerDoneHandler(sp)
+	go s.peerLifecycleHandler(sp)
 }
 
 // outboundPeerConnected is invoked by the connection manager when a new
@@ -2273,38 +2313,44 @@ func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
 	sp.connReq = c
 	sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
 	sp.AssociateConnection(conn)
-	go s.peerDoneHandler(sp)
+	go s.peerLifecycleHandler(sp)
 }
 
-// peerDoneHandler handles peer disconnects by notifying the server that it's
-// done along with other performing other desirable cleanup.
-func (s *server) peerDoneHandler(sp *serverPeer) {
+// peerLifecycleHandler is the sole sender of lifecycle events for a
+// given peer. It waits for either verack (handshake complete) or
+// disconnect (handshake failed/timed out), sends peerAdd if verack
+// was received, then waits for disconnect and sends peerDone.
+// Because both sends originate from this single goroutine,
+// peerAdd is always enqueued before peerDone.
+func (s *server) peerLifecycleHandler(sp *serverPeer) {
+	// Wait for the handshake to complete or the peer to
+	// disconnect, whichever comes first.
+	select {
+	case <-sp.verAckCh:
+		s.peerLifecycle <- peerLifecycleEvent{
+			action: peerAdd, sp: sp,
+		}
+
+	case <-sp.Peer.Done():
+		// Disconnected before verack; no peerAdd needed.
+	}
+
+	// Wait for full disconnect (may already be done).
 	sp.WaitForDisconnect()
 
-	// If this is an outbound peer and the shouldDowngradeToV1 bool is set
-	// on the underlying Peer, trigger a reconnect using the OG v1
-	// connection scheme.
+	// If this is an outbound peer and the shouldDowngradeToV1
+	// bool is set on the underlying Peer, trigger a reconnect
+	// using the OG v1 connection scheme.
 	if !sp.Inbound() && sp.Peer.ShouldDowngradeToV1() {
-		srvrLog.Infof("Peer %s indicated v2->v1 downgrade. "+
-			"Marking for next attempt as v1.", sp.Addr())
+		srvrLog.Infof("Peer %s indicated v2->v1 downgrade."+
+			" Marking for next attempt as v1.",
+			sp.Addr())
 		s.p2pDowngrader.MarkForDowngrade(sp.Addr())
 	}
 
-	// This is sent to a buffered channel, so it may not execute immediately.
-	s.donePeers <- sp
-
-	// Only tell sync manager we are gone if we ever told it we existed.
-	if sp.VerAckReceived() {
-		s.syncManager.DonePeer(sp.Peer)
-
-		// Evict any remaining orphans that were sent by the peer.
-		numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID()))
-		if numEvicted > 0 {
-			txmpLog.Debugf("Evicted %d %s from peer %v (id %d)",
-				numEvicted, pickNoun(numEvicted, "orphan",
-				"orphans"), sp, sp.ID())
-		}
+	s.peerLifecycle <- peerLifecycleEvent{
+		action: peerDone, sp: sp,
 	}
 	close(sp.quit)
 }
@@ -2348,13 +2394,15 @@ func (s *server) peerHandler() {
 out:
 	for {
 		select {
-		// New peers connected to the server.
-		case p := <-s.newPeers:
-			s.handleAddPeerMsg(state, p)
-
-		// Disconnected peers.
-		case p := <-s.donePeers:
-			s.handleDonePeerMsg(state, p)
+		// Peer connected or disconnected.
+		case event := <-s.peerLifecycle:
+			switch event.action {
+			case peerAdd:
+				s.handleAddPeerMsg(state, event.sp)
+
+			case peerDone:
+				s.handleDonePeerMsg(state, event.sp)
+			}
 
 		// Block accepted in mainchain or orphan, update peer height.
 		case umsg := <-s.peerHeightsUpdate:
@@ -2395,8 +2443,7 @@ out:
 cleanup:
 	for {
 		select {
-		case <-s.newPeers:
-		case <-s.donePeers:
+		case <-s.peerLifecycle:
 		case <-s.peerHeightsUpdate:
 		case <-s.relayInv:
 		case <-s.broadcast:
@@ -2409,11 +2456,6 @@ cleanup:
 	srvrLog.Tracef("Peer handler done")
 }
 
-// AddPeer adds a new peer that has already been connected to the server.
-func (s *server) AddPeer(sp *serverPeer) {
-	s.newPeers <- sp
-}
-
 // BanPeer bans a peer that has already been connected to the server by ip.
 func (s *server) BanPeer(sp *serverPeer) {
 	s.banPeers <- sp
 }
@@ -2847,8 +2889,7 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string,
 	s := server{
 		chainParams:          chainParams,
 		addrManager:          amgr,
-		newPeers:             make(chan *serverPeer, cfg.MaxPeers),
-		donePeers:            make(chan *serverPeer, cfg.MaxPeers),
+		peerLifecycle:        make(chan peerLifecycleEvent, cfg.MaxPeers*2),
 		banPeers:             make(chan *serverPeer, cfg.MaxPeers),
 		query:                make(chan interface{}),
 		relayInv:             make(chan relayMsg, cfg.MaxPeers),
diff --git a/server_test.go b/server_test.go
new file mode 100644
index 0000000000..2b07552c24
--- /dev/null
+++ b/server_test.go
@@ -0,0 +1,147 @@
+package main
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/btcsuite/btcd/chaincfg"
+	"github.com/btcsuite/btcd/peer"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestMain(m *testing.M) {
+	// logRotator must be non-nil or any log write (e.g. from
+	// OnVerAck's double-call guard) panics via logWriter.Write.
+	initLogRotator(filepath.Join(os.TempDir(), "btcd-server-test.log"))
+	os.Exit(m.Run())
+}
+
+// newTestServerPeer creates a minimal serverPeer suitable for unit
+// tests that exercise the peer lifecycle logic without starting the
+// full server. The returned server's peerLifecycle channel is
+// buffered so the handler never blocks during tests.
+func newTestServerPeer(t *testing.T) (*server, *serverPeer) {
+	t.Helper()
+
+	s := &server{
+		peerLifecycle: make(chan peerLifecycleEvent, 10),
+	}
+	sp := newServerPeer(s, false)
+	sp.Peer = peer.NewInboundPeer(&peer.Config{
+		ChainParams: &chaincfg.SimNetParams,
+	})
+
+	return s, sp
+}
+
+// recvLifecycleEvent reads a single event from the peerLifecycle
+// channel or fails the test after a timeout.
+func recvLifecycleEvent(
+	t *testing.T, ch <-chan peerLifecycleEvent,
+) peerLifecycleEvent {
+
+	t.Helper()
+
+	select {
+	case ev := <-ch:
+		return ev
+	case <-time.After(5 * time.Second):
+		t.Fatal("timed out waiting for peerLifecycleEvent")
+		return peerLifecycleEvent{}
+	}
+}
+
+// TestOnVerAckDoubleCall verifies that calling OnVerAck twice on
+// the same serverPeer does not panic. The double-call guard must
+// log an error and leave verAckCh closed.
+func TestOnVerAckDoubleCall(t *testing.T) {
+	t.Parallel()
+
+	_, sp := newTestServerPeer(t)
+
+	sp.OnVerAck(nil, nil)
+
+	select {
+	case <-sp.verAckCh:
+	default:
+		t.Fatal("verAckCh should be closed after first OnVerAck call")
+	}
+
+	require.NotPanics(t, func() {
+		sp.OnVerAck(nil, nil)
+	})
+
+	select {
+	case <-sp.verAckCh:
+	default:
+		t.Fatal("verAckCh should still be closed after second OnVerAck call")
+	}
+}
+
+// TestPeerLifecycleOrdering verifies that when verack arrives before
+// disconnect, peerLifecycleHandler emits peerAdd followed by peerDone
+// on the peerLifecycle channel -- never out of order.
+func TestPeerLifecycleOrdering(t *testing.T) {
+	t.Parallel()
+
+	s, sp := newTestServerPeer(t)
+
+	// Simulate verack received before the handler starts.
+	close(sp.verAckCh)
+
+	go s.peerLifecycleHandler(sp)
+
+	first := recvLifecycleEvent(t, s.peerLifecycle)
+	require.Equal(t, peerAdd, first.action,
+		"first lifecycle event must be peerAdd")
+	require.Equal(t, sp, first.sp)
+
+	// Trigger disconnect after peerAdd is observed.
+	sp.Peer.Disconnect()
+
+	second := recvLifecycleEvent(t, s.peerLifecycle)
+	require.Equal(t, peerDone, second.action,
+		"second lifecycle event must be peerDone")
+	require.Equal(t, sp, second.sp)
+}
+
+// TestPeerLifecycleSimultaneousReady verifies that when both verAckCh
+// and Peer.Done() are ready before the handler runs, the system stays
+// stable: peerDone is always emitted, and if peerAdd is emitted it
+// precedes peerDone. Go's select is nondeterministic, so peerAdd may
+// be skipped -- both outcomes are valid per the documented behavior.
+func TestPeerLifecycleSimultaneousReady(t *testing.T) {
+	t.Parallel()
+
+	const iterations = 100
+	var addEmitted int
+
+	for i := 0; i < iterations; i++ {
+		s, sp := newTestServerPeer(t)
+
+		close(sp.verAckCh)
+		sp.Peer.Disconnect()
+
+		go s.peerLifecycleHandler(sp)
+
+		first := recvLifecycleEvent(t, s.peerLifecycle)
+		if first.action == peerAdd {
+			addEmitted++
+			second := recvLifecycleEvent(t, s.peerLifecycle)
+			assert.Equal(t, peerDone, second.action,
+				"iteration %d: peerAdd must be "+
+					"followed by peerDone", i)
+		} else {
+			assert.Equal(t, peerDone, first.action,
+				"iteration %d: sole event must "+
+					"be peerDone", i)
+		}
+	}
+
+	t.Logf("peerAdd emitted in %d/%d iterations "+
+		"(both outcomes are valid per documented behavior)",
+		addEmitted, iterations)
+}