Skip to content

Commit 2b127d2

Browse files
Merge #4256
4256: Add version beacon badger storage r=janezpodhostnik a=janezpodhostnik Extracted from #3736, and added more cases. Co-authored-by: Janez Podhostnik <[email protected]>
2 parents f5a23f9 + bcffa32 commit 2b127d2

File tree

10 files changed

+406
-8
lines changed

10 files changed

+406
-8
lines changed

model/flow/version_beacon.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ type VersionBeacon struct {
3030
Sequence uint64
3131
}
3232

33+
// SealedVersionBeacon is a VersionBeacon with a SealHeight field.
34+
// Version beacons are effective only after they are sealed.
35+
type SealedVersionBeacon struct {
36+
*VersionBeacon
37+
SealHeight uint64
38+
}
39+
3340
func (v *VersionBeacon) ServiceEvent() ServiceEvent {
3441
return ServiceEvent{
3542
Type: ServiceEventVersionBeacon,

storage/badger/operation/common.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,43 @@ func traverse(prefix []byte, iteration iterationFunc) func(*badger.Txn) error {
521521
}
522522
}
523523

524+
// findHighestAtOrBelow searches for the highest key with the given prefix and a height
525+
// at or below the target height, and retrieves and decodes the value associated with the
526+
// key into the given entity.
527+
// If no key is found, the function returns storage.ErrNotFound.
528+
func findHighestAtOrBelow(
529+
prefix []byte,
530+
height uint64,
531+
entity interface{},
532+
) func(*badger.Txn) error {
533+
return func(tx *badger.Txn) error {
534+
if len(prefix) == 0 {
535+
return fmt.Errorf("prefix must not be empty")
536+
}
537+
538+
opts := badger.DefaultIteratorOptions
539+
opts.Prefix = prefix
540+
opts.Reverse = true
541+
542+
it := tx.NewIterator(opts)
543+
defer it.Close()
544+
545+
it.Seek(append(prefix, b(height)...))
546+
547+
if !it.Valid() {
548+
return storage.ErrNotFound
549+
}
550+
551+
return it.Item().Value(func(val []byte) error {
552+
err := msgpack.Unmarshal(val, entity)
553+
if err != nil {
554+
return fmt.Errorf("could not decode entity: %w", err)
555+
}
556+
return nil
557+
})
558+
}
559+
}
560+
524561
// Fail returns a DB operation function that always fails with the given error.
525562
func Fail(err error) func(*badger.Txn) error {
526563
return func(_ *badger.Txn) error {

storage/badger/operation/common_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,3 +614,97 @@ func TestIterateBoundaries(t *testing.T) {
614614
assert.ElementsMatch(t, keysInRange, found, "backward iteration should go over correct keys")
615615
})
616616
}
617+
618+
func TestFindHighestAtOrBelow(t *testing.T) {
619+
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
620+
prefix := []byte("test_prefix")
621+
622+
type Entity struct {
623+
Value uint64
624+
}
625+
626+
entity1 := Entity{Value: 41}
627+
entity2 := Entity{Value: 42}
628+
entity3 := Entity{Value: 43}
629+
630+
err := db.Update(func(tx *badger.Txn) error {
631+
key := append(prefix, b(uint64(15))...)
632+
val, err := msgpack.Marshal(entity3)
633+
if err != nil {
634+
return err
635+
}
636+
err = tx.Set(key, val)
637+
if err != nil {
638+
return err
639+
}
640+
641+
key = append(prefix, b(uint64(5))...)
642+
val, err = msgpack.Marshal(entity1)
643+
if err != nil {
644+
return err
645+
}
646+
err = tx.Set(key, val)
647+
if err != nil {
648+
return err
649+
}
650+
651+
key = append(prefix, b(uint64(10))...)
652+
val, err = msgpack.Marshal(entity2)
653+
if err != nil {
654+
return err
655+
}
656+
err = tx.Set(key, val)
657+
if err != nil {
658+
return err
659+
}
660+
return nil
661+
})
662+
require.NoError(t, err)
663+
664+
var entity Entity
665+
666+
t.Run("target height exists", func(t *testing.T) {
667+
err = findHighestAtOrBelow(
668+
prefix,
669+
10,
670+
&entity)(db.NewTransaction(false))
671+
require.NoError(t, err)
672+
require.Equal(t, uint64(42), entity.Value)
673+
})
674+
675+
t.Run("target height above", func(t *testing.T) {
676+
err = findHighestAtOrBelow(
677+
prefix,
678+
11,
679+
&entity)(db.NewTransaction(false))
680+
require.NoError(t, err)
681+
require.Equal(t, uint64(42), entity.Value)
682+
})
683+
684+
t.Run("target height above highest", func(t *testing.T) {
685+
err = findHighestAtOrBelow(
686+
prefix,
687+
20,
688+
&entity)(db.NewTransaction(false))
689+
require.NoError(t, err)
690+
require.Equal(t, uint64(43), entity.Value)
691+
})
692+
693+
t.Run("target height below lowest", func(t *testing.T) {
694+
err = findHighestAtOrBelow(
695+
prefix,
696+
4,
697+
&entity)(db.NewTransaction(false))
698+
require.ErrorIs(t, err, storage.ErrNotFound)
699+
})
700+
701+
t.Run("empty prefix", func(t *testing.T) {
702+
err = findHighestAtOrBelow(
703+
[]byte{},
704+
5,
705+
&entity)(db.NewTransaction(false))
706+
require.Error(t, err)
707+
require.Contains(t, err.Error(), "prefix must not be empty")
708+
})
709+
})
710+
}

storage/badger/operation/prefix.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,13 @@ const (
6666
codePayloadResults = 58 // index mapping block ID to payload results
6767
codeAllBlockReceipts = 59 // index mapping of blockID to multiple receipts
6868

69-
// codes related to epoch information
69+
// codes related to protocol level information
7070
codeEpochSetup = 61 // EpochSetup service event, keyed by ID
7171
codeEpochCommit = 62 // EpochCommit service event, keyed by ID
7272
codeBeaconPrivateKey = 63 // BeaconPrivateKey, keyed by epoch counter
7373
codeDKGStarted = 64 // flag that the DKG for an epoch has been started
7474
codeDKGEnded = 65 // flag that the DKG for an epoch has ended (stores end state)
75+
codeVersionBeacon = 67 // flag for storing version beacons
7576

7677
// code for ComputationResult upload status storage
7778
// NOTE: for now only GCP uploader is supported. When other uploader (AWS e.g.) needs to
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package operation
2+
3+
import (
4+
"github.com/dgraph-io/badger/v2"
5+
6+
"github.com/onflow/flow-go/model/flow"
7+
)
8+
9+
// IndexVersionBeaconByHeight stores a sealed version beacon indexed by
10+
// flow.SealedVersionBeacon.SealHeight.
11+
//
12+
// No errors are expected during normal operation.
13+
func IndexVersionBeaconByHeight(
14+
beacon flow.SealedVersionBeacon,
15+
) func(*badger.Txn) error {
16+
return upsert(makePrefix(codeVersionBeacon, beacon.SealHeight), beacon)
17+
}
18+
19+
// LookupLastVersionBeaconByHeight finds the highest flow.VersionBeacon but no higher
20+
// than maxHeight. Returns storage.ErrNotFound if no version beacon exists at or below
21+
// the given height.
22+
func LookupLastVersionBeaconByHeight(
23+
maxHeight uint64,
24+
versionBeacon *flow.SealedVersionBeacon,
25+
) func(*badger.Txn) error {
26+
return findHighestAtOrBelow(
27+
makePrefix(codeVersionBeacon),
28+
maxHeight,
29+
versionBeacon,
30+
)
31+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package operation
2+
3+
import (
4+
"testing"
5+
6+
"github.com/dgraph-io/badger/v2"
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/onflow/flow-go/model/flow"
10+
"github.com/onflow/flow-go/storage"
11+
"github.com/onflow/flow-go/utils/unittest"
12+
)
13+
14+
func TestResults_IndexByServiceEvents(t *testing.T) {
15+
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
16+
height1 := uint64(21)
17+
height2 := uint64(37)
18+
height3 := uint64(55)
19+
vb1 := flow.SealedVersionBeacon{
20+
VersionBeacon: unittest.VersionBeaconFixture(
21+
unittest.WithBoundaries(
22+
flow.VersionBoundary{
23+
Version: "1.0.0",
24+
BlockHeight: height1 + 5,
25+
},
26+
),
27+
),
28+
SealHeight: height1,
29+
}
30+
vb2 := flow.SealedVersionBeacon{
31+
VersionBeacon: unittest.VersionBeaconFixture(
32+
unittest.WithBoundaries(
33+
flow.VersionBoundary{
34+
Version: "1.1.0",
35+
BlockHeight: height2 + 5,
36+
},
37+
),
38+
),
39+
SealHeight: height2,
40+
}
41+
vb3 := flow.SealedVersionBeacon{
42+
VersionBeacon: unittest.VersionBeaconFixture(
43+
unittest.WithBoundaries(
44+
flow.VersionBoundary{
45+
Version: "2.0.0",
46+
BlockHeight: height3 + 5,
47+
},
48+
),
49+
),
50+
SealHeight: height3,
51+
}
52+
53+
// indexing 3 version beacons at different heights
54+
err := db.Update(IndexVersionBeaconByHeight(vb1))
55+
require.NoError(t, err)
56+
57+
err = db.Update(IndexVersionBeaconByHeight(vb2))
58+
require.NoError(t, err)
59+
60+
err = db.Update(IndexVersionBeaconByHeight(vb3))
61+
require.NoError(t, err)
62+
63+
// index version beacon 2 again to make sure we tolerate duplicates
64+
// it is possible for two or more events of the same type to be from the same height
65+
err = db.Update(IndexVersionBeaconByHeight(vb2))
66+
require.NoError(t, err)
67+
68+
t.Run("retrieve exact height match", func(t *testing.T) {
69+
var actualVB flow.SealedVersionBeacon
70+
err := db.View(LookupLastVersionBeaconByHeight(height1, &actualVB))
71+
require.NoError(t, err)
72+
require.Equal(t, vb1, actualVB)
73+
74+
err = db.View(LookupLastVersionBeaconByHeight(height2, &actualVB))
75+
require.NoError(t, err)
76+
require.Equal(t, vb2, actualVB)
77+
78+
err = db.View(LookupLastVersionBeaconByHeight(height3, &actualVB))
79+
require.NoError(t, err)
80+
require.Equal(t, vb3, actualVB)
81+
})
82+
83+
t.Run("finds highest but not higher than given", func(t *testing.T) {
84+
var actualVB flow.SealedVersionBeacon
85+
86+
err := db.View(LookupLastVersionBeaconByHeight(height3-1, &actualVB))
87+
require.NoError(t, err)
88+
require.Equal(t, vb2, actualVB)
89+
})
90+
91+
t.Run("finds highest", func(t *testing.T) {
92+
var actualVB flow.SealedVersionBeacon
93+
94+
err := db.View(LookupLastVersionBeaconByHeight(height3+1, &actualVB))
95+
require.NoError(t, err)
96+
require.Equal(t, vb3, actualVB)
97+
})
98+
99+
t.Run("height below lowest entry returns nothing", func(t *testing.T) {
100+
var actualVB flow.SealedVersionBeacon
101+
102+
err := db.View(LookupLastVersionBeaconByHeight(height1-1, &actualVB))
103+
require.ErrorIs(t, err, storage.ErrNotFound)
104+
})
105+
})
106+
}

storage/badger/version_beacon.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package badger
2+
3+
import (
4+
"github.com/dgraph-io/badger/v2"
5+
6+
"github.com/onflow/flow-go/model/flow"
7+
"github.com/onflow/flow-go/storage"
8+
"github.com/onflow/flow-go/storage/badger/operation"
9+
)
10+
11+
type VersionBeacons struct {
12+
db *badger.DB
13+
}
14+
15+
var _ storage.VersionBeacons = (*VersionBeacons)(nil)
16+
17+
func NewVersionBeacons(db *badger.DB) *VersionBeacons {
18+
res := &VersionBeacons{
19+
db: db,
20+
}
21+
22+
return res
23+
}
24+
25+
func (r *VersionBeacons) Highest(
26+
belowOrEqualTo uint64,
27+
) (*flow.SealedVersionBeacon, error) {
28+
tx := r.db.NewTransaction(false)
29+
defer tx.Discard()
30+
31+
var beacon *flow.SealedVersionBeacon
32+
33+
err := operation.LookupLastVersionBeaconByHeight(belowOrEqualTo, beacon)(tx)
34+
if err != nil {
35+
return nil, err
36+
}
37+
return beacon, nil
38+
}

0 commit comments

Comments
 (0)