Skip to content

Commit d3d8077

Browse files
authored
Integrate end to end hashed chain exchange with F3 (#816)
Integrate the chain exchange mechanism with the F3 host and runner, without touching the core GPBFT. The implementation here leaves two major TODOs: 1) the chain broadcasting mechanism (currently coupled to GPBFT message broadcast), and 2) partial message validation prior to buffering (currently skipped entirely, but with capped buffer sizes and re-validation by core GPBFT once the messages are complete). The integration introduces the concept of a Partial GMessage: a GMessage with each chain replaced by the key to that chain. The work introduces a buffer and refill mechanism that listens for discovered chains, un-buffers the affected messages once their original GMessage has been reconstructed, and feeds them to the participation using the existing event loop. Part of #792
1 parent fb71a30 commit d3d8077

File tree

9 files changed

+624
-60
lines changed

9 files changed

+624
-60
lines changed

Diff for: cbor_gen.go

+122
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: chainexchange/pubsub.go

+41-21
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,12 @@ type PubSubChainExchange struct {
3232
*options
3333

3434
// mu guards access to chains and API calls.
35-
mu sync.Mutex
36-
chainsWanted map[uint64]*lru.Cache[string, *chainPortion]
37-
chainsDiscovered map[uint64]*lru.Cache[string, *chainPortion]
38-
topic *pubsub.Topic
39-
stop func() error
35+
mu sync.Mutex
36+
chainsWanted map[uint64]*lru.Cache[string, *chainPortion]
37+
chainsDiscovered map[uint64]*lru.Cache[string, *chainPortion]
38+
pendingCacheAsWanted chan Message
39+
topic *pubsub.Topic
40+
stop func() error
4041
}
4142

4243
func NewPubSubChainExchange(o ...Option) (*PubSubChainExchange, error) {
@@ -45,9 +46,10 @@ func NewPubSubChainExchange(o ...Option) (*PubSubChainExchange, error) {
4546
return nil, err
4647
}
4748
return &PubSubChainExchange{
48-
options: opts,
49-
chainsWanted: map[uint64]*lru.Cache[string, *chainPortion]{},
50-
chainsDiscovered: map[uint64]*lru.Cache[string, *chainPortion]{},
49+
options: opts,
50+
chainsWanted: map[uint64]*lru.Cache[string, *chainPortion]{},
51+
chainsDiscovered: map[uint64]*lru.Cache[string, *chainPortion]{},
52+
pendingCacheAsWanted: make(chan Message, 100), // TODO: parameterise.
5153
}, nil
5254
}
5355

@@ -64,7 +66,9 @@ func (p *PubSubChainExchange) Start(ctx context.Context) error {
6466
}
6567
if p.topicScoreParams != nil {
6668
if err := p.topic.SetScoreParams(p.topicScoreParams); err != nil {
67-
return fmt.Errorf("failed to set score params: %w", err)
69+
// This most likely happens because the router does not support peer scoring. It's
70+
// non-critical. Hence, the warning log.
71+
log.Warnw("failed to set topic score params", "err", err)
6872
}
6973
}
7074
subscription, err := p.topic.Subscribe(pubsub.WithBufferSize(p.subscriptionBufferSize))
@@ -79,17 +83,31 @@ func (p *PubSubChainExchange) Start(ctx context.Context) error {
7983
for ctx.Err() == nil {
8084
msg, err := subscription.Next(ctx)
8185
if err != nil {
82-
log.Debugw("failed to read nex message from subscription", "err", err)
86+
log.Debugw("failed to read next message from subscription", "err", err)
8387
continue
8488
}
8589
cmsg := msg.ValidatorData.(Message)
8690
p.cacheAsDiscoveredChain(ctx, cmsg)
8791
}
92+
log.Debug("Stopped reading messages from chainexchange subscription.")
93+
}()
94+
go func() {
95+
for ctx.Err() == nil {
96+
select {
97+
case <-ctx.Done():
98+
return
99+
case cmsg := <-p.pendingCacheAsWanted:
100+
p.cacheAsWantedChain(ctx, cmsg)
101+
}
102+
}
103+
log.Debug("Stopped caching chains as wanted.")
88104
}()
89105
p.stop = func() error {
90106
cancel()
91107
subscription.Cancel()
92-
return p.topic.Close()
108+
_ = p.pubsub.UnregisterTopicValidator(p.topicName)
109+
_ = p.topic.Close()
110+
return nil
93111
}
94112
return nil
95113
}
@@ -124,21 +142,18 @@ func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance u
124142
cacheKey := string(key)
125143

126144
// Check wanted keys first.
127-
p.mu.Lock()
145+
128146
wanted := p.getChainsWantedAt(instance)
129-
p.mu.Unlock()
130147
if portion, found := wanted.Get(cacheKey); found && !portion.IsPlaceholder() {
131148
return portion.chain, true
132149
}
133150

134151
// Check if the chain for the key is discovered.
135-
p.mu.Lock()
136152
discovered := p.getChainsDiscoveredAt(instance)
137153
if portion, found := discovered.Get(cacheKey); found {
138154
// Add it to the wanted cache and remove it from the discovered cache.
139155
wanted.Add(cacheKey, portion)
140156
discovered.Remove(cacheKey)
141-
p.mu.Unlock()
142157

143158
chain := portion.chain
144159
if p.listener != nil {
@@ -147,7 +162,6 @@ func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance u
147162
// TODO: Do we want to pull all the suffixes of the chain into wanted cache?
148163
return chain, true
149164
}
150-
p.mu.Unlock()
151165

152166
// Otherwise, add a placeholder for the wanted key as a way to prioritise its
153167
// retention via LRU recent-ness.
@@ -156,6 +170,8 @@ func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance u
156170
}
157171

158172
func (p *PubSubChainExchange) getChainsWantedAt(instance uint64) *lru.Cache[string, *chainPortion] {
173+
p.mu.Lock()
174+
defer p.mu.Unlock()
159175
wanted, exists := p.chainsWanted[instance]
160176
if !exists {
161177
wanted = p.newChainPortionCache(p.maxWantedChainsPerInstance)
@@ -165,6 +181,8 @@ func (p *PubSubChainExchange) getChainsWantedAt(instance uint64) *lru.Cache[stri
165181
}
166182

167183
func (p *PubSubChainExchange) getChainsDiscoveredAt(instance uint64) *lru.Cache[string, *chainPortion] {
184+
p.mu.Lock()
185+
defer p.mu.Unlock()
168186
discovered, exists := p.chainsDiscovered[instance]
169187
if !exists {
170188
discovered = p.newChainPortionCache(p.maxDiscoveredChainsPerInstance)
@@ -208,8 +226,6 @@ func (p *PubSubChainExchange) validatePubSubMessage(_ context.Context, _ peer.ID
208226
}
209227

210228
func (p *PubSubChainExchange) cacheAsDiscoveredChain(ctx context.Context, cmsg Message) {
211-
p.mu.Lock()
212-
defer p.mu.Unlock()
213229

214230
wanted := p.getChainsDiscoveredAt(cmsg.Instance)
215231
discovered := p.getChainsDiscoveredAt(cmsg.Instance)
@@ -245,7 +261,13 @@ func (p *PubSubChainExchange) cacheAsDiscoveredChain(ctx context.Context, cmsg M
245261
func (p *PubSubChainExchange) Broadcast(ctx context.Context, msg Message) error {
246262

247263
// Optimistically cache the broadcast chain and all of its prefixes as wanted.
248-
p.cacheAsWantedChain(ctx, msg)
264+
select {
265+
case p.pendingCacheAsWanted <- msg:
266+
case <-ctx.Done():
267+
return ctx.Err()
268+
default:
269+
log.Warnw("Dropping wanted cache entry. Chain exchange is too slow to process chains as wanted", "msg", msg)
270+
}
249271

250272
// TODO: integrate zstd compression.
251273
var buf bytes.Buffer
@@ -266,7 +288,6 @@ type discovery struct {
266288

267289
func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Message) {
268290
var notifications []discovery
269-
p.mu.Lock()
270291
wanted := p.getChainsWantedAt(cmsg.Instance)
271292
for offset := len(cmsg.Chain); offset >= 0 && ctx.Err() == nil; offset-- {
272293
// TODO: Expose internals of merkle.go so that keys can be generated
@@ -290,7 +311,6 @@ func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Messa
290311
// been evicted from the cache or not. This should be cheap enough considering the
291312
// added complexity of tracking evictions relative to chain prefixes.
292313
}
293-
p.mu.Unlock()
294314

295315
// Notify the listener outside the lock.
296316
if p.listener != nil {

Diff for: chainexchange/pubsub_test.go

+29-12
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package chainexchange_test
22

33
import (
44
"context"
5+
"slices"
6+
"sync"
57
"testing"
68
"time"
79

@@ -52,32 +54,38 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
5254
chain, found := subject.GetChainByInstance(ctx, instance, key)
5355
require.False(t, found)
5456
require.Nil(t, chain)
55-
require.Empty(t, testListener.notifications)
57+
require.Empty(t, testListener.getNotifications())
5658

5759
require.NoError(t, subject.Broadcast(ctx, chainexchange.Message{
5860
Instance: instance,
5961
Chain: ecChain,
6062
}))
6163

62-
chain, found = subject.GetChainByInstance(ctx, instance, key)
63-
require.True(t, found)
64+
require.Eventually(t, func() bool {
65+
chain, found = subject.GetChainByInstance(ctx, instance, key)
66+
return found
67+
}, time.Second, 100*time.Millisecond)
6468
require.Equal(t, ecChain, chain)
6569

6670
baseChain := ecChain.BaseChain()
6771
baseKey := subject.Key(baseChain)
68-
chain, found = subject.GetChainByInstance(ctx, instance, baseKey)
69-
require.True(t, found)
72+
require.Eventually(t, func() bool {
73+
chain, found = subject.GetChainByInstance(ctx, instance, baseKey)
74+
return found
75+
}, time.Second, 100*time.Millisecond)
7076
require.Equal(t, baseChain, chain)
7177

7278
// Assert that we have received 2 notifications, because ecChain has 2 tipsets.
7379
// First should be the ecChain, second should be the baseChain.
74-
require.Len(t, testListener.notifications, 2)
75-
require.Equal(t, instance, testListener.notifications[1].instance)
76-
require.Equal(t, baseKey, testListener.notifications[1].key)
77-
require.Equal(t, baseChain, testListener.notifications[1].chain)
78-
require.Equal(t, instance, testListener.notifications[0].instance)
79-
require.Equal(t, key, testListener.notifications[0].key)
80-
require.Equal(t, ecChain, testListener.notifications[0].chain)
80+
81+
notifications := testListener.getNotifications()
82+
require.Len(t, notifications, 2)
83+
require.Equal(t, instance, notifications[1].instance)
84+
require.Equal(t, baseKey, notifications[1].key)
85+
require.Equal(t, baseChain, notifications[1].chain)
86+
require.Equal(t, instance, notifications[0].instance)
87+
require.Equal(t, key, notifications[0].key)
88+
require.Equal(t, ecChain, notifications[0].chain)
8189

8290
require.NoError(t, subject.Shutdown(ctx))
8391
}
@@ -88,13 +96,22 @@ type notification struct {
8896
chain gpbft.ECChain
8997
}
9098
type listener struct {
99+
mu sync.Mutex
91100
notifications []notification
92101
}
93102

94103
func (l *listener) NotifyChainDiscovered(_ context.Context, key chainexchange.Key, instance uint64, chain gpbft.ECChain) {
104+
l.mu.Lock()
105+
defer l.mu.Unlock()
95106
l.notifications = append(l.notifications, notification{key: key, instance: instance, chain: chain})
96107
}
97108

109+
func (l *listener) getNotifications() []notification {
110+
l.mu.Lock()
111+
defer l.mu.Unlock()
112+
return slices.Clone(l.notifications)
113+
}
114+
98115
// TODO: Add more tests, specifically:
99116
// - validation
100117
// - discovery through other chainexchange instance

Diff for: f3_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ func (e *testEnv) waitForEpochFinalized(epoch int64) {
548548
}
549549
}
550550
return false
551-
}, 30*time.Second)
551+
}, 60*time.Second)
552552
}
553553

554554
if head < epoch-100 {

0 commit comments

Comments
 (0)