Skip to content

Commit 12a5aa1

Browse files
Stebalienrjan90
authored andcommitted
feat(f3): refactor the F3 participation module to improve testing (#12589)
1. Move it next to all the F3 code. 2. Make it look more like a "service" so I won't have to duplicate the start/stop code in the itests. 3. Rename participator to participant. Not perfect, but it's a word. 4. Make all the constants public. 5. Only depend on the parts of the API we need (will help with unit testing).
1 parent 7c11a8d commit 12a5aa1

File tree

3 files changed

+256
-205
lines changed

3 files changed

+256
-205
lines changed

chain/lf3/participation.go

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package lf3
2+
3+
import (
4+
"context"
5+
"errors"
6+
"time"
7+
8+
"github.com/jpillora/backoff"
9+
"golang.org/x/sync/errgroup"
10+
"golang.org/x/xerrors"
11+
12+
"github.com/filecoin-project/go-address"
13+
"github.com/filecoin-project/go-f3/gpbft"
14+
"github.com/filecoin-project/go-f3/manifest"
15+
16+
"github.com/filecoin-project/lotus/api"
17+
"github.com/filecoin-project/lotus/node/modules/dtypes"
18+
)
19+
20+
const (
21+
// maxCheckProgressAttempts defines the maximum number of failed attempts
22+
// before we abandon the current lease and restart the participation process.
23+
//
24+
// The default backoff takes 12 attempts to reach a maximum delay of 1 minute.
25+
// Allowing for 13 failures results in approximately 2 minutes of backoff since
26+
// the lease was granted. Given a lease validity of up to 5 instances, this means
27+
// we would give up on checking the lease during its mid-validity period;
28+
// typically when we would try to renew the participation ticket. Hence, the value
29+
// to 13.
30+
ParticipationCheckProgressMaxAttempts = 13
31+
32+
// ParticipationLeaseTerm is the number of instances the miner will attempt to lease from nodes.
33+
ParticipationLeaseTerm = 5
34+
)
35+
36+
type F3ParticipationAPI interface {
37+
F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) //perm:sign
38+
F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error)
39+
F3GetProgress(ctx context.Context) (gpbft.Instant, error)
40+
F3GetManifest(ctx context.Context) (*manifest.Manifest, error)
41+
}
42+
43+
type Participant struct {
44+
node F3ParticipationAPI
45+
participant address.Address
46+
backoff *backoff.Backoff
47+
maxCheckProgressAttempts int
48+
previousTicket api.F3ParticipationTicket
49+
leaseTerm uint64
50+
51+
runningCtx context.Context
52+
cancelCtx context.CancelFunc
53+
errgrp *errgroup.Group
54+
}
55+
56+
func NewParticipant(ctx context.Context, node F3ParticipationAPI, participant dtypes.MinerAddress, backoff *backoff.Backoff, maxCheckProgress int, leaseTerm uint64) *Participant {
57+
runningCtx, cancel := context.WithCancel(context.WithoutCancel(ctx))
58+
errgrp, runningCtx := errgroup.WithContext(runningCtx)
59+
return &Participant{
60+
node: node,
61+
participant: address.Address(participant),
62+
backoff: backoff,
63+
maxCheckProgressAttempts: maxCheckProgress,
64+
leaseTerm: leaseTerm,
65+
runningCtx: runningCtx,
66+
cancelCtx: cancel,
67+
errgrp: errgrp,
68+
}
69+
}
70+
71+
func (p *Participant) Start(ctx context.Context) error {
72+
p.errgrp.Go(func() error {
73+
return p.run(p.runningCtx)
74+
})
75+
return nil
76+
}
77+
78+
func (p *Participant) Stop(ctx context.Context) error {
79+
p.cancelCtx()
80+
return p.errgrp.Wait()
81+
}
82+
83+
func (p *Participant) run(ctx context.Context) (_err error) {
84+
defer func() {
85+
if ctx.Err() == nil && _err != nil {
86+
log.Errorw("F3 participation stopped unexpectedly", "error", _err)
87+
}
88+
}()
89+
90+
for ctx.Err() == nil {
91+
start := time.Now()
92+
ticket, err := p.tryGetF3ParticipationTicket(ctx)
93+
if err != nil {
94+
return err
95+
}
96+
lease, participating, err := p.tryF3Participate(ctx, ticket)
97+
if err != nil {
98+
return err
99+
}
100+
if participating {
101+
if err := p.awaitLeaseExpiry(ctx, lease); err != nil {
102+
return err
103+
}
104+
}
105+
const minPeriod = 500 * time.Millisecond
106+
if sinceLastLoop := time.Since(start); sinceLastLoop < minPeriod {
107+
select {
108+
case <-time.After(minPeriod - sinceLastLoop):
109+
case <-ctx.Done():
110+
return ctx.Err()
111+
}
112+
}
113+
log.Info("Renewing F3 participation")
114+
}
115+
return ctx.Err()
116+
}
117+
118+
func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context) (api.F3ParticipationTicket, error) {
119+
p.backoff.Reset()
120+
for ctx.Err() == nil {
121+
switch ticket, err := p.node.F3GetOrRenewParticipationTicket(ctx, p.participant, p.previousTicket, p.leaseTerm); {
122+
case ctx.Err() != nil:
123+
return api.F3ParticipationTicket{}, ctx.Err()
124+
case errors.Is(err, api.ErrF3Disabled):
125+
log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
126+
return api.F3ParticipationTicket{}, xerrors.Errorf("acquiring F3 participation ticket: %w", err)
127+
case err != nil:
128+
log.Errorw("Failed to acquire F3 participation ticket; retrying after backoff", "backoff", p.backoff.Duration(), "err", err)
129+
p.backOff(ctx)
130+
log.Debugw("Reattempting to acquire F3 participation ticket.", "attempts", p.backoff.Attempt())
131+
continue
132+
default:
133+
log.Debug("Successfully acquired F3 participation ticket")
134+
return ticket, nil
135+
}
136+
}
137+
return api.F3ParticipationTicket{}, ctx.Err()
138+
}
139+
140+
func (p *Participant) tryF3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, bool, error) {
141+
p.backoff.Reset()
142+
for ctx.Err() == nil {
143+
switch lease, err := p.node.F3Participate(ctx, ticket); {
144+
case ctx.Err() != nil:
145+
return api.F3ParticipationLease{}, false, ctx.Err()
146+
case errors.Is(err, api.ErrF3Disabled):
147+
log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
148+
return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
149+
case errors.Is(err, api.ErrF3ParticipationTicketExpired):
150+
log.Warnw("F3 participation ticket expired while attempting to participate. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err)
151+
return api.F3ParticipationLease{}, false, nil
152+
case errors.Is(err, api.ErrF3ParticipationTicketStartBeforeExisting):
153+
log.Warnw("F3 participation ticket starts before the existing lease. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err)
154+
return api.F3ParticipationLease{}, false, nil
155+
case errors.Is(err, api.ErrF3ParticipationTicketInvalid):
156+
log.Errorw("F3 participation ticket is not valid. Acquiring a new ticket after backoff.", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err)
157+
p.backOff(ctx)
158+
return api.F3ParticipationLease{}, false, nil
159+
case errors.Is(err, api.ErrF3ParticipationIssuerMismatch):
160+
log.Warnw("Node is not the issuer of F3 participation ticket. Miner maybe load-balancing or node has changed. Retrying F3 participation after backoff.", "backoff", p.backoff.Duration(), "err", err)
161+
p.backOff(ctx)
162+
log.Debugw("Reattempting F3 participation with the same ticket.", "attempts", p.backoff.Attempt())
163+
continue
164+
case errors.Is(err, api.ErrF3NotReady):
165+
log.Warnw("F3 is not ready. Retrying F3 participation after backoff.", "backoff", p.backoff.Duration(), "err", err)
166+
p.backOff(ctx)
167+
continue
168+
case err != nil:
169+
log.Errorw("Unexpected error while attempting F3 participation. Retrying after backoff", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err)
170+
p.backOff(ctx)
171+
continue
172+
default:
173+
log.Infow("Successfully acquired F3 participation lease.",
174+
"issuer", lease.Issuer,
175+
"not-before", lease.FromInstance,
176+
"not-after", lease.ToInstance(),
177+
)
178+
p.previousTicket = ticket
179+
return lease, true, nil
180+
}
181+
}
182+
return api.F3ParticipationLease{}, false, ctx.Err()
183+
}
184+
185+
func (p *Participant) awaitLeaseExpiry(ctx context.Context, lease api.F3ParticipationLease) error {
186+
p.backoff.Reset()
187+
renewLeaseWithin := p.leaseTerm / 2
188+
for ctx.Err() == nil {
189+
manifest, err := p.node.F3GetManifest(ctx)
190+
switch {
191+
case errors.Is(err, api.ErrF3Disabled):
192+
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
193+
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
194+
case err != nil:
195+
if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) {
196+
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err)
197+
return nil
198+
}
199+
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err)
200+
p.backOff(ctx)
201+
case manifest == nil || manifest.NetworkName != lease.Network:
202+
// If we got an unexpected manifest, or no manifest, go back to the
203+
// beginning and try to get another ticket. Switching from having a manifest
204+
// to having no manifest can theoretically happen if the lotus node reboots
205+
// and has no static manifest.
206+
return nil
207+
}
208+
switch progress, err := p.node.F3GetProgress(ctx); {
209+
case errors.Is(err, api.ErrF3Disabled):
210+
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
211+
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
212+
case err != nil:
213+
if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) {
214+
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err)
215+
return nil
216+
}
217+
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err)
218+
p.backOff(ctx)
219+
case progress.ID+renewLeaseWithin >= lease.ToInstance():
220+
log.Infof("F3 progressed (%d) to within %d instances of lease expiry (%d). Renewing participation.", progress.ID, renewLeaseWithin, lease.ToInstance())
221+
return nil
222+
default:
223+
remainingInstanceLease := lease.ToInstance() - progress.ID
224+
waitTime := time.Duration(remainingInstanceLease-renewLeaseWithin) * manifest.CatchUpAlignment
225+
if waitTime == 0 {
226+
waitTime = 100 * time.Millisecond
227+
}
228+
log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", remainingInstanceLease, waitTime)
229+
p.backOffFor(ctx, waitTime)
230+
}
231+
}
232+
return ctx.Err()
233+
}
234+
235+
func (p *Participant) backOff(ctx context.Context) {
236+
p.backOffFor(ctx, p.backoff.Duration())
237+
}
238+
239+
func (p *Participant) backOffFor(ctx context.Context, d time.Duration) {
240+
// Create a timer every time to avoid potential risk of deadlock or the need for
241+
// mutex despite the fact that f3Participator is never (and should never) be
242+
// called from multiple goroutines.
243+
timer := time.NewTimer(d)
244+
defer timer.Stop()
245+
select {
246+
case <-ctx.Done():
247+
return
248+
case <-timer.C:
249+
}
250+
}

itests/f3_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func TestF3_Enabled(t *testing.T) {
4646
blocktime := 100 * time.Millisecond
4747
e := setup(t, blocktime)
4848

49-
e.waitTillF3Instance(modules.F3LeaseTerm+1, 40*time.Second)
49+
e.waitTillF3Instance(lf3.ParticipationLeaseTerm+1, 40*time.Second)
5050
}
5151

5252
// Test that checks that F3 can be rebootsrapped by changing the manifest

0 commit comments

Comments
 (0)