Skip to content

Commit 2121abe

Browse files
authored
Merge pull request #382 from SiaFoundation/nate/stuck-syncers
Fixed an issue where fully connected peers could get stuck after reorgs
2 parents 0044651 + b61c30c commit 2121abe

File tree

6 files changed

+162
-13
lines changed

6 files changed

+162
-13
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
default: patch
3+
---
4+
5+
# Fixed an issue where fully connected peers could get stuck on a stale chain after a reorg.

miner.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"go.sia.tech/core/consensus"
77
"go.sia.tech/core/types"
88
"go.sia.tech/coreutils/chain"
9+
"lukechampine.com/frand"
910
)
1011

1112
// FindBlockNonce attempts to find a nonce for b that meets the PoW target.
@@ -48,6 +49,9 @@ retry:
4849
if childHeight >= cs.Network.HardforkV2.AllowHeight {
4950
b.V2 = &types.V2BlockData{
5051
Height: childHeight,
52+
Transactions: []types.V2Transaction{
53+
{ArbitraryData: frand.Bytes(12)}, // to ensure unique block ID
54+
},
5155
}
5256
}
5357

syncer/parallel_sync.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ func (s *Syncer) parallelSync(ctx context.Context, cs consensus.State, headers [
8787
headers := headers[req.base.Height-cs.Index.Height:][:req.numBlocks]
8888
for i := range blocks {
8989
if blocks[i].ID() != headers[i].ID() {
90-
s.ban(p, errors.New("sent blocks that do not match header chain"))
90+
// note: this is not necessarily a ban-worthy offense, as it could
91+
// be caused by a peer on a fork that could be valid.
9192
return Resp{req: req, peer: p, err: errors.New("peer returned blocks that do not match header chain")}
9293
}
9394
}

syncer/syncer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -711,6 +711,7 @@ func (s *Syncer) syncLoop(ctx context.Context) error {
711711
if err := s.parallelSync(ctx, r.cs, r.headers); err != nil {
712712
s.log.Warn("sync failed", zap.Stringer("peer", r.peer), zap.Error(err))
713713
}
714+
go s.relayV2Header(r.headers[len(r.headers)-1], r.peer)
714715
}
715716
}
716717
}

syncer/syncer_test.go

Lines changed: 149 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"go.sia.tech/core/gateway"
1111
"go.sia.tech/core/types"
12+
"go.sia.tech/coreutils"
1213
"go.sia.tech/coreutils/chain"
1314
"go.sia.tech/coreutils/syncer"
1415
"go.sia.tech/coreutils/testutil"
@@ -17,7 +18,47 @@ import (
1718
"go.uber.org/zap/zaptest"
1819
)
1920

20-
func newTestSyncer(t testing.TB, name string, log *zap.Logger) (*syncer.Syncer, *chain.Manager) {
21+
// helper to wait for all provided chain managers to be synced
22+
func synced(t *testing.T, cm ...*chain.Manager) {
23+
t.Helper()
24+
25+
var heights []uint64
26+
for range 100 {
27+
heights = heights[:0]
28+
heights = append(heights, cm[0].Tip().Height)
29+
allEqual := true
30+
for _, c := range cm[1:] {
31+
heights = append(heights, c.Tip().Height)
32+
if c.Tip() != cm[0].Tip() {
33+
allEqual = false
34+
}
35+
}
36+
if allEqual {
37+
return
38+
}
39+
time.Sleep(100 * time.Millisecond)
40+
}
41+
t.Fatalf("tips are not equal: %v", heights)
42+
}
43+
44+
// helper to mine blocks on cm and broadcast to syncer s
45+
func mineBlocks(t *testing.T, s *syncer.Syncer, cm *chain.Manager, n int) {
46+
t.Helper()
47+
for range n {
48+
b, ok := coreutils.MineBlock(cm, types.VoidAddress, time.Second)
49+
if !ok {
50+
t.Fatal("failed to mine block")
51+
} else if err := cm.AddBlocks([]types.Block{b}); err != nil {
52+
t.Fatal(err)
53+
}
54+
if b.V2 != nil {
55+
// error is ignored, best effort relay
56+
s.BroadcastV2BlockOutline(gateway.OutlineBlock(b, cm.PoolTransactions(), cm.V2PoolTransactions()))
57+
}
58+
}
59+
}
60+
61+
func newTestSyncer(t testing.TB, opts ...syncer.Option) (*syncer.Syncer, *chain.Manager) {
2162
n, genesis := testutil.Network()
2263
store, tipState1, err := chain.NewDBStore(chain.NewMemDB(), n, genesis, nil)
2364
if err != nil {
@@ -33,22 +74,23 @@ func newTestSyncer(t testing.TB, name string, log *zap.Logger) (*syncer.Syncer,
3374
l.Close()
3475
})
3576

77+
opts = append([]syncer.Option{syncer.WithSyncInterval(100 * time.Millisecond)}, opts...)
3678
s := syncer.New(l, cm, testutil.NewEphemeralPeerStore(), gateway.Header{
3779
GenesisID: genesis.ID(),
3880
UniqueID: gateway.GenerateUniqueID(),
3981
NetAddress: l.Addr().String(),
40-
}, syncer.WithLogger(log.Named(name)), syncer.WithSyncInterval(100*time.Millisecond))
82+
}, opts...)
4183
go s.Run()
4284
return s, cm
4385
}
4486

4587
func TestSyncer(t *testing.T) {
4688
log := zaptest.NewLogger(t)
4789

48-
s1, cm1 := newTestSyncer(t, "syncer1", log)
90+
s1, cm1 := newTestSyncer(t, syncer.WithLogger(log.Named("syncer1")))
4991
defer s1.Close()
5092

51-
s2, cm2 := newTestSyncer(t, "syncer2", log)
93+
s2, cm2 := newTestSyncer(t, syncer.WithLogger(log.Named("syncer2")))
5294
defer s2.Close()
5395

5496
// mine enough blocks to test both v1 and v2 regimes
@@ -89,10 +131,10 @@ func (es evilManager) BlocksForHistory(history []types.BlockID, maxBlocks uint64
89131
func TestSyncWithBadPeer(t *testing.T) {
90132
log := zaptest.NewLogger(t)
91133

92-
s1, cm1 := newTestSyncer(t, "syncer1", log)
134+
s1, cm1 := newTestSyncer(t, syncer.WithLogger(log.Named("syncer1")))
93135
defer s1.Close()
94136

95-
s2, cm2 := newTestSyncer(t, "syncer2", log)
137+
s2, cm2 := newTestSyncer(t, syncer.WithLogger(log.Named("syncer2")))
96138
defer s2.Close()
97139

98140
// mine enough blocks to test both v1 and v2 regimes
@@ -141,7 +183,7 @@ func TestSyncWithBadPeer(t *testing.T) {
141183
func TestSyncerConnectAfterClose(t *testing.T) {
142184
log := zaptest.NewLogger(t)
143185

144-
s, _ := newTestSyncer(t, "syncer1", log)
186+
s, _ := newTestSyncer(t, syncer.WithLogger(log.Named("syncer1")))
145187
if err := s.Close(); err != nil {
146188
t.Fatal(err)
147189
} else if _, err := s.Connect(context.Background(), "localhost:1234"); !errors.Is(err, threadgroup.ErrClosed) {
@@ -162,10 +204,10 @@ func hashEq(a, b types.EncoderTo) bool {
162204
func TestSendCheckpoint(t *testing.T) {
163205
log := zaptest.NewLogger(t)
164206

165-
s1, cm1 := newTestSyncer(t, "syncer1", log)
207+
s1, cm1 := newTestSyncer(t, syncer.WithLogger(log.Named("syncer1")))
166208
defer s1.Close()
167209

168-
s2, _ := newTestSyncer(t, "syncer2", log)
210+
s2, _ := newTestSyncer(t, syncer.WithLogger(log.Named("syncer2")))
169211
defer s2.Close()
170212

171213
// mine above v2 hardfork height
@@ -190,7 +232,7 @@ func TestInstantSync(t *testing.T) {
190232
n, genesis := testutil.Network()
191233
log := zap.NewNop()
192234

193-
s, cm := newTestSyncer(t, "syncer", log)
235+
s, cm := newTestSyncer(t, syncer.WithLogger(log.Named("syncer")))
194236
defer s.Close()
195237

196238
// mine a few blocks above v2 hardfork height
@@ -257,10 +299,10 @@ func TestInstantSync(t *testing.T) {
257299
func TestSendHeaders(t *testing.T) {
258300
log := zaptest.NewLogger(t)
259301

260-
s1, cm1 := newTestSyncer(t, "syncer1", log)
302+
s1, cm1 := newTestSyncer(t, syncer.WithLogger(log.Named("syncer1")))
261303
defer s1.Close()
262304

263-
s2, cm2 := newTestSyncer(t, "syncer2", log)
305+
s2, cm2 := newTestSyncer(t, syncer.WithLogger(log.Named("syncer2")))
264306
defer s2.Close()
265307
cs := cm2.TipState()
266308

@@ -279,3 +321,98 @@ func TestSendHeaders(t *testing.T) {
279321
t.Fatalf("expected 10 remaining headers, got %d", rem)
280322
}
281323
}
324+
325+
func TestSyncerReorg(t *testing.T) {
326+
log := zaptest.NewLogger(t)
327+
328+
s1, cm1 := newTestSyncer(t, syncer.WithLogger(log.Named("syncer1")))
329+
defer s1.Close()
330+
331+
// s2 must only be able to sync from s1 to force reorg propagation
332+
s2, cm2 := newTestSyncer(t, syncer.WithLogger(log.Named("syncer2")), syncer.WithSyncInterval(100*time.Millisecond), syncer.WithMaxInboundPeers(1), syncer.WithMaxOutboundPeers(0))
333+
defer s2.Close()
334+
335+
s3, cm3 := newTestSyncer(t, syncer.WithLogger(log.Named("syncer3")))
336+
defer s3.Close()
337+
338+
// connect s1 and s2
339+
if _, err := s1.Connect(context.Background(), s2.Addr()); err != nil {
340+
t.Fatal(err)
341+
}
342+
log.Debug("connected s1 and s2")
343+
344+
// mine above the v2 require height
345+
mineBlocks(t, s1, cm1, int(cm1.TipState().Network.HardforkV2.RequireHeight+10))
346+
347+
// apply cm1 blocks manually to cm3 to simulate a synced node
348+
_, applied, err := cm1.UpdatesSince(types.ChainIndex{}, 1000)
349+
if err != nil {
350+
t.Fatalf("failed to get updates since genesis: %v", err)
351+
}
352+
for _, cau := range applied {
353+
if err := cm3.AddBlocks([]types.Block{cau.Block}); err != nil {
354+
t.Fatalf("failed to apply block at height %d: %v", cau.Block.V2.Height, err)
355+
}
356+
}
357+
358+
// check that all three nodes are at the same tip
359+
synced(t, cm1, cm2, cm3)
360+
361+
// mine conflicting chains on cm1 and cm3
362+
mineBlocks(t, s1, cm1, 1)
363+
mineBlocks(t, s3, cm3, 5)
364+
365+
// connect s1 and s3, triggering a reorg on cm1 and cm2
366+
if _, err := s1.Connect(context.Background(), s3.Addr()); err != nil {
367+
t.Fatal(err)
368+
}
369+
log.Debug("syncer peers", zap.Int("s1", len(s1.Peers())), zap.Int("s2", len(s2.Peers())), zap.Int("s3", len(s3.Peers())))
370+
synced(t, cm1, cm2, cm3)
371+
}
372+
373+
func TestParallelSyncReorgSplit(t *testing.T) {
374+
log := zaptest.NewLogger(t)
375+
376+
s1, cm1 := newTestSyncer(t, syncer.WithLogger(log.Named("syncer1")), syncer.WithSyncInterval(100*time.Millisecond))
377+
defer s1.Close()
378+
379+
// s2 and s3 should not be able to connect to each other
380+
s2, cm2 := newTestSyncer(t, syncer.WithLogger(log.Named("syncer2")), syncer.WithMaxInboundPeers(1), syncer.WithSyncInterval(100*time.Millisecond))
381+
defer s2.Close()
382+
383+
s3, cm3 := newTestSyncer(t, syncer.WithLogger(log.Named("syncer3")), syncer.WithMaxOutboundPeers(1), syncer.WithSyncInterval(100*time.Millisecond))
384+
defer s3.Close()
385+
386+
// mine after the v2 hardfork height
387+
testutil.MineBlocks(t, cm2, types.VoidAddress, int(cm2.TipState().Network.HardforkV2.RequireHeight+10))
388+
389+
// apply cm2 blocks manually to cm3 to simulate a synced node
390+
_, applied, err := cm2.UpdatesSince(types.ChainIndex{}, 1000)
391+
if err != nil {
392+
t.Fatalf("failed to get updates: %v", err)
393+
}
394+
for _, cau := range applied {
395+
if err := cm3.AddBlocks([]types.Block{cau.Block}); err != nil {
396+
t.Fatalf("failed to apply block: %v", err)
397+
}
398+
}
399+
400+
// create a split on cm2 and cm3
401+
testutil.MineBlocks(t, cm2, types.VoidAddress, 5)
402+
testutil.MineBlocks(t, cm3, types.VoidAddress, 6)
403+
404+
// Verify they've diverged
405+
if cm2.Tip() == cm3.Tip() {
406+
t.Fatal("chains should have diverged")
407+
}
408+
409+
// Connect s1 to both s2 and s3
410+
// s1 will get headers from s3 (longer chain) and may ask s2 for blocks
411+
if _, err := s1.Connect(context.Background(), s2.Addr()); err != nil {
412+
t.Fatal(err)
413+
}
414+
if _, err := s1.Connect(context.Background(), s3.Addr()); err != nil {
415+
t.Fatal(err)
416+
}
417+
synced(t, cm1, cm2, cm3)
418+
}

wallet/wallet_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2159,6 +2159,7 @@ func TestSplitUTXO(t *testing.T) {
21592159
if err != nil {
21602160
t.Fatal(err)
21612161
}
2162+
defer w.Close()
21622163

21632164
largestUTXO := cm.TipState().BlockReward().Sub(w.RecommendedFee().Mul64(estimatedTxnSize)) // miner fee is subtracted
21642165
// fund the wallet

0 commit comments

Comments
 (0)