Skip to content

Commit abefebf

Browse files
committed
Improve snapshots reader.
1 parent 84b690d commit abefebf

File tree

2 files changed

+68
-35
lines changed

2 files changed

+68
-35
lines changed
+63-29
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,85 @@
11
package main
22

33
import (
4+
"bufio"
45
"encoding/binary"
56
"errors"
67
"flag"
78
"fmt"
9+
"io"
810
"os"
911

12+
"go.uber.org/zap"
13+
"go.uber.org/zap/zapcore"
14+
1015
"github.com/wavesplatform/gowaves/pkg/logging"
1116
"github.com/wavesplatform/gowaves/pkg/proto"
1217
"github.com/wavesplatform/gowaves/pkg/settings"
13-
"go.uber.org/zap"
14-
"go.uber.org/zap/zapcore"
1518
)
1619

1720
const (
1821
snapshotsByteSize = 4
1922
)
2023

21-
func parseSnapshots(nBlocks int, snapshotsBody *os.File, scheme proto.Scheme) []proto.BlockSnapshot {
24+
type SnapshotAtHeight struct {
25+
Height proto.Height
26+
BlockSnapshot proto.BlockSnapshot
27+
}
28+
29+
func parseSnapshots(start, end uint64, snapshotsBody io.Reader, scheme proto.Scheme) []SnapshotAtHeight {
30+
var buf []byte
2231
snapshotsSizeBytes := make([]byte, snapshotsByteSize)
23-
readPos := int64(0)
24-
var blocksSnapshots []proto.BlockSnapshot
25-
for height := uint64(1); height <= uint64(nBlocks); height++ {
26-
if _, readBerr := snapshotsBody.ReadAt(snapshotsSizeBytes, readPos); readBerr != nil {
27-
zap.S().Fatalf("failed to read the snapshots size in block %v", readBerr)
32+
var blocksSnapshots []SnapshotAtHeight
33+
for height := uint64(2); height < end; height++ {
34+
if _, readBerr := io.ReadFull(snapshotsBody, snapshotsSizeBytes); readBerr != nil {
35+
zap.S().Fatalf("failed to read the snapshots size in block: %v", readBerr)
2836
}
2937
snapshotsSize := binary.BigEndian.Uint32(snapshotsSizeBytes)
30-
if snapshotsSize == 0 {
31-
readPos += snapshotsByteSize
38+
if snapshotsSize == 0 { // add empty block snapshot
39+
if height >= start {
40+
blocksSnapshots = append(blocksSnapshots, SnapshotAtHeight{
41+
Height: height,
42+
BlockSnapshot: proto.BlockSnapshot{},
43+
})
44+
}
3245
continue
3346
}
34-
if snapshotsSize != 0 {
35-
snapshotsInBlock := proto.BlockSnapshot{}
36-
snapshots := make([]byte, snapshotsSize+snapshotsByteSize) // []{snapshot, size} + 4 bytes = size of all snapshots
37-
if _, readRrr := snapshotsBody.ReadAt(snapshots, readPos); readRrr != nil {
38-
zap.S().Fatalf("failed to read the snapshots in block %v", readRrr)
39-
}
40-
unmrshlErr := snapshotsInBlock.UnmarshalBinaryImport(snapshots, scheme)
41-
if unmrshlErr != nil {
42-
zap.S().Fatalf("failed to unmarshal snapshots in block %v", unmrshlErr)
43-
}
44-
blocksSnapshots = append(blocksSnapshots, snapshotsInBlock)
45-
readPos += int64(snapshotsSize) + snapshotsByteSize
47+
48+
if cap(buf) < int(snapshotsSize) {
49+
buf = make([]byte, snapshotsSize)
4650
}
51+
buf = buf[:snapshotsSize]
52+
53+
if _, readRrr := io.ReadFull(snapshotsBody, buf); readRrr != nil {
54+
zap.S().Fatalf("failed to read the snapshots in block: %v", readRrr)
55+
}
56+
if height < start {
57+
continue
58+
}
59+
60+
snapshotsInBlock := proto.BlockSnapshot{}
61+
unmrshlErr := snapshotsInBlock.UnmarshalBinaryImport(buf, scheme)
62+
if unmrshlErr != nil {
63+
zap.S().Fatalf("failed to unmarshal snapshots in block: %v", unmrshlErr)
64+
}
65+
blocksSnapshots = append(blocksSnapshots, SnapshotAtHeight{
66+
Height: height,
67+
BlockSnapshot: snapshotsInBlock,
68+
})
4769
}
4870
return blocksSnapshots
4971
}
5072

5173
func main() {
52-
const (
53-
defaultBlocksNumber = 1000
54-
)
5574
var (
5675
logLevel = zap.LevelFlag("log-level", zapcore.InfoLevel,
5776
"Logging level. Supported levels: DEBUG, INFO, WARN, ERROR, FATAL. Default logging level INFO.")
5877
blockchainType = flag.String("blockchain-type", "mainnet",
59-
"Blockchain type. Allowed values: mainnet/testnet/stagenet/custom. Default is 'mainnet'.")
78+
"Blockchain type. Allowed values: mainnet/testnet/stagenet. Default is 'mainnet'.")
6079
snapshotsPath = flag.String("snapshots-path", "", "Path to binary blockchain file.")
61-
nBlocks = flag.Int("blocks-number", defaultBlocksNumber, "Number of blocks to import.")
80+
blocksStart = flag.Uint64("blocks-start", 0,
81+
"Start block number. Should be greater than 1, because the snapshots file doesn't include genesis.")
82+
nBlocks = flag.Uint64("blocks-number", 1, "Number of blocks to read since 'blocks-start'.")
6283
)
6384
flag.Parse()
6485

@@ -72,6 +93,9 @@ func main() {
7293
if *snapshotsPath == "" {
7394
zap.S().Fatalf("You must specify snapshots-path option.")
7495
}
96+
if *blocksStart < 2 {
97+
zap.S().Fatalf("'blocks-start' must be greater than 1.")
98+
}
7599

76100
ss, err := settings.BlockchainSettingsByTypeName(*blockchainType)
77101
if err != nil {
@@ -82,7 +106,17 @@ func main() {
82106
if err != nil {
83107
zap.S().Fatalf("failed to open snapshots file, %v", err)
84108
}
85-
blocksSnapshots := parseSnapshots(*nBlocks, snapshotsBody, ss.AddressSchemeCharacter)
109+
defer func(snapshotsBody *os.File) {
110+
if clErr := snapshotsBody.Close(); clErr != nil {
111+
zap.S().Fatalf("failed to close snapshots file, %v", clErr)
112+
}
113+
}(snapshotsBody)
114+
const MB = 1 << 20
115+
var (
116+
start = *blocksStart
117+
end = start + *nBlocks
118+
)
119+
blocksSnapshots := parseSnapshots(start, end, bufio.NewReaderSize(snapshotsBody, MB), ss.AddressSchemeCharacter)
86120

87-
zap.S().Info(blocksSnapshots[0])
121+
zap.S().Info(blocksSnapshots)
88122
}

pkg/proto/block_snapshot.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,7 @@ func (bs *BlockSnapshot) UnmarshalBinary(data []byte, scheme Scheme) error {
6868
}
6969

7070
func (bs *BlockSnapshot) UnmarshalBinaryImport(data []byte, scheme Scheme) error {
71-
if len(data) < uint32Size {
72-
return errors.Errorf("BlockSnapshot UnmarshallBinary: invalid data size")
73-
}
74-
snapshotsBytesSize := binary.BigEndian.Uint32(data[0:uint32Size])
75-
data = data[uint32Size:] // skip size
71+
snapshotsBytesSize := len(data)
7672
var txSnapshots [][]AtomicSnapshot
7773
for i := uint32(0); snapshotsBytesSize > 0; i++ {
7874
if len(data) < uint32Size {
@@ -94,7 +90,10 @@ func (bs *BlockSnapshot) UnmarshalBinaryImport(data []byte, scheme Scheme) error
9490
}
9591
txSnapshots = append(txSnapshots, atomicTS)
9692
data = data[oneSnapshotSize:]
97-
snapshotsBytesSize = snapshotsBytesSize - oneSnapshotSize - uint32Size
93+
snapshotsBytesSize = snapshotsBytesSize - int(oneSnapshotSize) - uint32Size
94+
}
95+
if snapshotsBytesSize != 0 { // check that all bytes were read
96+
return errors.Errorf("BlockSnapshot UnmarshallBinary: invalid snapshots size")
9897
}
9998
bs.TxSnapshots = txSnapshots
10099
return nil

0 commit comments

Comments
 (0)