Skip to content

Commit 1e58a5a

Browse files
authored
Merge pull request #369 from SiaFoundation/nate/retrieve-checkpoint
Add concurrency to checkpoint retrieval
2 parents cbf9bcf + 0d628be commit 1e58a5a

File tree

3 files changed

+83
-33
lines changed

3 files changed

+83
-33
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
default: minor
3+
---
4+
5+
# Renamed SendCheckpoint to RetrieveCheckpoint and now accepts multiple peers.

syncer/syncer.go

Lines changed: 76 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -848,41 +848,85 @@ func New(l net.Listener, cm ChainManager, pm PeerStore, header gateway.Header, o
848848
return s
849849
}
850850

851-
// SendCheckpoint connects to the specified peer and downloads the requested
852-
// checkpoint.
853-
func SendCheckpoint(ctx context.Context, addr string, index types.ChainIndex, n *consensus.Network, genesisID types.BlockID) (consensus.State, types.Block, error) {
854-
conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", addr)
855-
if err != nil {
856-
return consensus.State{}, types.Block{}, err
851+
// RetrieveCheckpoint attempts to retrieve a checkpoint block and state from one of the
852+
// specified peers. The first successful response is returned.
853+
func RetrieveCheckpoint(ctx context.Context, peers []string, index types.ChainIndex, n *consensus.Network, genesisID types.BlockID) (consensus.State, types.Block, error) {
854+
if len(peers) == 0 {
855+
return consensus.State{}, types.Block{}, errors.New("no peers provided")
857856
}
858-
errChan := make(chan error)
859-
var cs consensus.State
860-
var b types.Block
857+
858+
type resp struct {
859+
state consensus.State
860+
block types.Block
861+
err error
862+
}
863+
864+
ctx, cancel := context.WithCancel(ctx)
865+
defer cancel()
866+
867+
resultCh := make(chan resp, 1)
861868
go func() {
862-
t, err := gateway.Dial(conn, gateway.Header{
863-
GenesisID: genesisID,
864-
UniqueID: gateway.GenerateUniqueID(),
865-
NetAddress: "ephemeral:0",
866-
})
867-
if err != nil {
868-
errChan <- err
869-
return
870-
}
871-
p := &Peer{
872-
t: t,
873-
ConnAddr: conn.RemoteAddr().String(),
874-
Inbound: false,
869+
sema := make(chan struct{}, 5) // limit concurrent dials
870+
for _, addr := range peers {
871+
select {
872+
case sema <- struct{}{}:
873+
case <-ctx.Done():
874+
return
875+
}
876+
go func(ctx context.Context, addr string) {
877+
defer func() { <-sema }()
878+
879+
sendResult := func(r resp) {
880+
select {
881+
case <-ctx.Done():
882+
// another goroutine succeeded
883+
case resultCh <- r:
884+
}
885+
}
886+
887+
conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", addr)
888+
if err != nil {
889+
sendResult(resp{err: err})
890+
return
891+
}
892+
defer conn.Close()
893+
894+
ctx, cancel := context.WithCancel(ctx) // cancels the cleanup goroutine if the RPC fails or completes
895+
defer cancel()
896+
go func() {
897+
<-ctx.Done()
898+
// interrupt dial or RPC by forcing the connection to close
899+
conn.Close()
900+
}()
901+
t, err := gateway.Dial(conn, gateway.Header{
902+
GenesisID: genesisID,
903+
UniqueID: gateway.GenerateUniqueID(),
904+
NetAddress: "ephemeral:0",
905+
})
906+
if err != nil {
907+
sendResult(resp{err: err})
908+
return
909+
}
910+
p := &Peer{
911+
t: t,
912+
ConnAddr: conn.RemoteAddr().String(),
913+
Inbound: false,
914+
}
915+
cs, b, err := p.SendCheckpoint(index, n, 30*time.Second)
916+
sendResult(resp{state: cs, block: b, err: err})
917+
}(ctx, addr)
875918
}
876-
cs, b, err = p.SendCheckpoint(index, n, 5*time.Second)
877-
errChan <- err
878919
}()
879-
select {
880-
case <-ctx.Done():
881-
conn.Close()
882-
<-errChan
883-
return consensus.State{}, types.Block{}, ctx.Err()
884-
case err := <-errChan:
885-
conn.Close()
886-
return cs, b, err
920+
921+
for range peers {
922+
select {
923+
case r := <-resultCh:
924+
if r.err == nil {
925+
return r.state, r.block, nil
926+
}
927+
case <-ctx.Done():
928+
return consensus.State{}, types.Block{}, ctx.Err()
929+
}
887930
}
931+
return consensus.State{}, types.Block{}, errors.New("failed to retrieve checkpoint from any peer")
888932
}

syncer/syncer_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,8 @@ func TestInstantSync(t *testing.T) {
201201
if !ok {
202202
t.Fatal("failed to get index")
203203
}
204-
cs, b, err := syncer.SendCheckpoint(context.Background(), s.Addr(), index, n, genesis.ID())
204+
205+
cs, b, err := syncer.RetrieveCheckpoint(context.Background(), []string{s.Addr()}, index, n, genesis.ID())
205206
if err != nil {
206207
t.Fatal(err)
207208
} else if cs.Index.ID != b.ParentID {

0 commit comments

Comments
 (0)