Skip to content

Commit bae93e2

Browse files
authored
Clear out old instances from partial message management (#851)
* Clear out old instances from partial message management: once a decision about an instance is received, remove all state related to older instances from the partial message manager and chain exchange. Part of #792.
* Remove sleep from asserting epoch finalisation.
1 parent 2d636c9 commit bae93e2

File tree

4 files changed

+43
-4
lines changed

4 files changed

+43
-4
lines changed

Diff for: chainexchange/pubsub.go

+10 -2
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,16 @@ func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Messa
321321
func (p *PubSubChainExchange) RemoveChainsByInstance(_ context.Context, instance uint64) error {
322322
p.mu.Lock()
323323
defer p.mu.Unlock()
324-
delete(p.chainsWanted, instance)
325-
delete(p.chainsDiscovered, instance)
324+
for i := range p.chainsWanted {
325+
if i < instance {
326+
delete(p.chainsWanted, i)
327+
}
328+
}
329+
for i := range p.chainsDiscovered {
330+
if i < instance {
331+
delete(p.chainsDiscovered, i)
332+
}
333+
}
326334
return nil
327335
}
328336

Diff for: f3_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,6 @@ func (e *testEnv) waitForEpochFinalized(epoch int64) {
541541
// here and reduce the clock advance to give messages a chance of being
542542
// delivered in time. See:
543543
// - https://github.com/filecoin-project/go-f3/issues/818
544-
time.Sleep(20 * time.Millisecond)
545544
for _, nd := range e.nodes {
546545
if nd.f3 == nil || !nd.f3.IsRunning() {
547546
continue

Diff for: host.go

+3 -1
Original file line numberDiff line numberDiff line change
@@ -788,7 +788,9 @@ func (h *gpbftHost) ReceiveDecision(decision *gpbft.Justification) (time.Time, e
788788
log.Infow("reached a decision", "instance", decision.Vote.Instance,
789789
"ecHeadEpoch", decision.Vote.Value.Head().Epoch)
790790
if decision.Vote.Instance > 0 {
791-
h.pmCache.RemoveGroupsLessThan(decision.Vote.Instance - 1)
791+
oldInstance := decision.Vote.Instance - 1
792+
h.pmCache.RemoveGroupsLessThan(oldInstance)
793+
h.pmm.RemoveMessagesBeforeInstance(context.Background(), oldInstance)
792794
}
793795
cert, err := h.saveDecision(decision)
794796
if err != nil {

Diff for: partial_msg.go

+30
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ type partialMessageManager struct {
4545
pendingDiscoveredChains chan *discoveredChain
4646
// pendingChainBroadcasts is a channel of chains that are pending to be broadcasted.
4747
pendingChainBroadcasts chan chainexchange.Message
48+
// pendingInstanceRemoval is a channel of instances that are pending to be removed.
49+
pendingInstanceRemoval chan uint64
4850
// rebroadcastInterval is the interval at which chains are re-broadcasted.
4951
rebroadcastInterval time.Duration
5052

@@ -58,6 +60,7 @@ func newPartialMessageManager(progress gpbft.Progress, ps *pubsub.PubSub, m *man
5860
pendingDiscoveredChains: make(chan *discoveredChain, 100), // TODO: parameterize buffer size.
5961
pendingPartialMessages: make(chan *PartiallyValidatedMessage, 100), // TODO: parameterize buffer size.
6062
pendingChainBroadcasts: make(chan chainexchange.Message, 100), // TODO: parameterize buffer size.
63+
pendingInstanceRemoval: make(chan uint64, 10),
6164
rebroadcastInterval: m.ChainExchange.RebroadcastInterval,
6265
}
6366
var err error
@@ -150,6 +153,23 @@ func (pmm *partialMessageManager) Start(ctx context.Context) (<-chan *PartiallyV
150153
// TODO: Add equivocation metrics: check if the message is different and if so
151154
// increment the equivocations counter tagged by phase.
152155
// See: https://github.com/filecoin-project/go-f3/issues/812
156+
case instance, ok := <-pmm.pendingInstanceRemoval:
157+
if !ok {
158+
return
159+
}
160+
for i := range pmm.pmByInstance {
161+
if i < instance {
162+
delete(pmm.pmByInstance, i)
163+
}
164+
}
165+
for i := range pmm.pmkByInstanceByChainKey {
166+
if i < instance {
167+
delete(pmm.pmkByInstanceByChainKey, i)
168+
}
169+
}
170+
if err := pmm.chainex.RemoveChainsByInstance(ctx, instance); err != nil {
171+
log.Errorw("Failed to remove chains by instance form chainexchange.", "instance", instance, "error", err)
172+
}
153173
}
154174
}
155175
}()
@@ -366,6 +386,16 @@ func inferJustificationVoteValue(pgmsg *PartialGMessage) {
366386
}
367387
}
368388

389+
func (pmm *partialMessageManager) RemoveMessagesBeforeInstance(ctx context.Context, instance uint64) {
390+
select {
391+
case <-ctx.Done():
392+
return
393+
case pmm.pendingInstanceRemoval <- instance:
394+
default:
395+
log.Warnw("Dropped instance removal request as partial message manager is too slow.", "instance", instance)
396+
}
397+
}
398+
369399
func (pmm *partialMessageManager) Shutdown(ctx context.Context) error {
370400
if pmm.stop != nil {
371401
pmm.stop()

0 commit comments

Comments
 (0)