Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 48 additions & 10 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func getBlockFromRPC(height int) (*walletrpc.CompactBlock, error) {

heightJSON, err := json.Marshal(strconv.Itoa(height))
if err != nil {
Log.Fatal("getBlockFromRPC bad height argument", height, err)
return nil, fmt.Errorf("getBlockFromRPC bad height argument %d: %w", height, err)
}
// Fetch the block using the verbose option ("1") because it provides
// both the list of txids, which we're not yet able to compute for
Expand All @@ -348,11 +348,11 @@ func getBlockFromRPC(height int) (*walletrpc.CompactBlock, error) {
var block1 ZcashRpcReplyGetblock1
err = json.Unmarshal(result, &block1)
if err != nil {
Log.Fatal("getBlockFromRPC: Can't unmarshal block:", err)
return nil, fmt.Errorf("getBlockFromRPC: can't unmarshal block: %w", err)
}
blockHash, err := json.Marshal(block1.Hash)
if err != nil {
Log.Fatal("getBlockFromRPC bad block hash", block1.Hash)
return nil, fmt.Errorf("getBlockFromRPC bad block hash %s: %w", block1.Hash, err)
}
// non-verbose (raw hex) version of block
params = []json.RawMessage{blockHash, json.RawMessage("0")}
Expand Down Expand Up @@ -422,6 +422,33 @@ func stopIngestor() {
func BlockIngestor(c *BlockCache, rep int) {
lastLog := Time.Now()
lastHeightLogged := 0
rpcFailing := false
rpcRetries := 0
lastFailLog := Time.Now()

retryDelay := func() time.Duration {
// Exponential backoff: 8s, 16s, 32s, 64s, capped at 2 minutes.
d := 8 * time.Second * (1 << min(rpcRetries, 4))
if d > 2*time.Minute {
d = 2 * time.Minute
}
return d
}

shouldLog := func() bool {
// Log the first failure, then re-log every 5 minutes.
return !rpcFailing || Time.Now().Sub(lastFailLog) >= 5*time.Minute
}

rpcFail := func(fields logrus.Fields, msg string) {
if shouldLog() {
Log.WithFields(fields).Warn(msg)
lastFailLog = Time.Now()
}
rpcFailing = true
rpcRetries++
Time.Sleep(retryDelay())
}

// Start listening for new blocks
for i := 0; rep == 0 || i < rep; i++ {
Expand All @@ -434,18 +461,27 @@ func BlockIngestor(c *BlockCache, rep int) {

result, err := RawRequest("getbestblockhash", []json.RawMessage{})
if err != nil {
Log.WithFields(logrus.Fields{
"error": err,
}).Fatal("error " + NodeName + " getbestblockhash rpc")
rpcFail(logrus.Fields{"error": err},
"error "+NodeName+" getbestblockhash rpc, retrying...")
continue
}
var hashHex string
err = json.Unmarshal(result, &hashHex)
if err != nil {
Log.Fatal("bad getbestblockhash return:", err, result)
rpcFail(logrus.Fields{"error": err},
"bad getbestblockhash return, retrying...")
continue
}
lastBestBlockHashBE, err := hash32.Decode(hashHex)
if err != nil {
Log.Fatal("error decoding getbestblockhash", err, hashHex)
rpcFail(logrus.Fields{"error": err},
"error decoding getbestblockhash, retrying...")
continue
}
if rpcFailing {
Log.Info(NodeName + " getbestblockhash RPC recovered")
rpcFailing = false
rpcRetries = 0
}

height := c.GetNextHeight()
Expand All @@ -463,13 +499,15 @@ func BlockIngestor(c *BlockCache, rep int) {
var block *walletrpc.CompactBlock
block, err = getBlockFromRPC(height)
if err != nil {
Log.Info("getblock ", height, " failed, will retry: ", err)
Log.Warn("getblock ", height, " failed, will retry: ", err)
Time.Sleep(8 * time.Second)
continue
}
if block != nil && c.HashMatch(hash32.FromSlice(block.PrevHash)) {
if err = c.Add(height, block); err != nil {
Log.Fatal("Cache add failed:", err)
Log.Warn("Cache add failed, will retry: ", err)
Time.Sleep(8 * time.Second)
continue
}
// Don't log these too often.
if DarksideEnabled || Time.Now().Sub(lastLog).Seconds() >= 4 {
Expand Down