Skip to content

Commit

Permalink
tracker: fetch block data in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
RaghavSood committed Jun 22, 2024
1 parent 03c58a8 commit d370f8a
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 25 deletions.
14 changes: 14 additions & 0 deletions bitcoinrpc/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,20 @@ func (rpc *RpcClient) GetBlockStats(height int64) (types.BlockStats, error) {
return stats, nil
}

func (rpc *RpcClient) GetBlockHash(height int64) (string, error) {
result, err := rpc.Do("getblockhash", []interface{}{height})
if err != nil {
return "", err
}

var hash string
if err := json.Unmarshal(result, &hash); err != nil {
return "", fmt.Errorf("failed to unmarshal getblockhash response: %v", err)
}

return hash, nil
}

func (rpc *RpcClient) GetBlock(hash string) (types.Block, error) {
// Ensure we always get the block in the most verbose mode
result, err := rpc.Do("getblock", []interface{}{hash, 3})
Expand Down
124 changes: 99 additions & 25 deletions tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,34 +233,117 @@ func (t *Tracker) processScriptQueue() {
}
}

func (t *Tracker) processBlock(height int64) error {
log.Info().Int64("block_height", height).Msg("Processing block")
start := time.Now()
func (t *Tracker) getBlockStats(height int64, blockStatsCh chan btypes.BlockStats, errCh chan error, wg *sync.WaitGroup) {
startBlockStats := time.Now()
blockStats, err := t.client.GetBlockStats(int64(height))
defer wg.Done()
blockStats, err := t.client.GetBlockStats(height)
if err != nil {
return err
errCh <- err
return
}

blockStatsCh <- blockStats
elapsedBlockStats := time.Since(startBlockStats)
log.Info().Stringer("block_stats_elapsed", elapsedBlockStats).Int64("height", height).Msg("Block stats fetched")
}

func (t *Tracker) getCoinStats(height int64, txOutSetInfoCh chan btypes.TxOutSetInfo, errCh chan error, wg *sync.WaitGroup) {
startCoinStats := time.Now()
defer wg.Done()
coinStats, err := t.client.GetTxOutSetInfo("muhash", height, true)
if err != nil {
errCh <- err
return
}

txOutSetInfoCh <- coinStats
elapsedCoinStats := time.Since(startCoinStats)
log.Info().Stringer("coin_stats_elapsed", elapsedCoinStats).Int64("height", height).Msg("Coin stats fetched")
}

func (t *Tracker) getBlock(height int64, blockCh chan btypes.Block, errCh chan error, wg *sync.WaitGroup) {
startBlockTime := time.Now()
defer wg.Done()
hash, err := t.client.GetBlockHash(height)
if err != nil {
errCh <- err
return
}

block, err := t.client.GetBlock(hash)
if err != nil {
errCh <- err
return
}

blockCh <- block
elapsedBlockTime := time.Since(startBlockTime)
log.Info().Stringer("block_time_elapsed", elapsedBlockTime).Int64("height", height).Msg("Block fetched")
}

func (t *Tracker) processBlock(height int64) error {
log.Info().Int64("block_height", height).Msg("Processing block")
start := time.Now()

blockStatsCh := make(chan btypes.BlockStats)
txOutSetInfoCh := make(chan btypes.TxOutSetInfo)
blockCh := make(chan btypes.Block)
errCh := make(chan error, 3)

var wg sync.WaitGroup
wg.Add(3)

go t.getBlockStats(height, blockStatsCh, errCh, &wg)
go t.getCoinStats(height, txOutSetInfoCh, errCh, &wg)
go t.getBlock(height, blockCh, errCh, &wg)

go func() {
wg.Wait()
close(blockStatsCh)
close(txOutSetInfoCh)
close(blockCh)
close(errCh)
}()

var blockStats btypes.BlockStats
var coinStats btypes.TxOutSetInfo
var block btypes.Block

for blockStatsCh != nil || txOutSetInfoCh != nil || blockCh != nil {
select {
case stats, ok := <-blockStatsCh:
if !ok {
blockStatsCh = nil
continue
}
blockStats = stats
case coin, ok := <-txOutSetInfoCh:
if !ok {
txOutSetInfoCh = nil
continue
}
coinStats = coin
case b, ok := <-blockCh:
if !ok {
blockCh = nil
continue
}
block = b
case err := <-errCh:
if err != nil {
return fmt.Errorf("failed to fetch block data: %v", err)
}
}
}

log.Info().
Int64("subsidy", blockStats.Subsidy).
Str("hash", blockStats.Blockhash).
Stringer("block_stats_elapsed", elapsedBlockStats).
Int64("totalfee", blockStats.Totalfee).
Int64("height", blockStats.Height).
Msg("Block stats")

startCoinStats := time.Now()
coinStats, err := t.client.GetTxOutSetInfo("muhash", int64(height), true)
if err != nil {
return err
}

elapsedCoinStats := time.Since(startCoinStats)

log.Info().
Stringer("coin_stats_elapsed", elapsedCoinStats).
Stringer("total_amount", coinStats.TotalAmount).
Stringer("total_unspendable_amount", coinStats.TotalUnspendableAmount).
Str("bestblock", coinStats.Bestblock).
Expand All @@ -273,16 +356,7 @@ func (t *Tracker) processBlock(height int64) error {
Int64("height", coinStats.Height).
Msg("Coin stats")

startBlockTime := time.Now()
block, err := t.client.GetBlock(blockStats.Blockhash)
if err != nil {
return err
}

elapsedBlockTime := time.Since(startBlockTime)

log.Info().
Stringer("block_time_elapsed", elapsedBlockTime).
Int("nTx", block.NTx).
Int64("height", block.Height).
Msg("Block")
Expand Down Expand Up @@ -360,7 +434,7 @@ func (t *Tracker) processBlock(height int64) error {
}

startRecord := time.Now()
err = t.db.RecordBlockIndexResults(types.FromRPCBlock(block), types.FromRPCTxOutSetInfo(coinStats), blockStats, losses, transactions, spentTxids, spentVouts)
err := t.db.RecordBlockIndexResults(types.FromRPCBlock(block), types.FromRPCTxOutSetInfo(coinStats), blockStats, losses, transactions, spentTxids, spentVouts)
if err != nil {
return fmt.Errorf("failed to record block index results: %v", err)
}
Expand Down

0 comments on commit d370f8a

Please sign in to comment.