Skip to content

Commit 21ffb56

Browse files
committed
feat(f3): refactor the F3 participation module to improve testing
1. Move it next to all the F3 code. 2. Make it look more like a "service" so I won't have to duplicate the start/stop code in the itests. 3. Rename participator to participant. Not perfect, but it's a word. 4. Make all the constants public. 5. Only depend on the parts of the API we need (will help with unit testing).
1 parent 4586794 commit 21ffb56

File tree

3 files changed

+248
-778
lines changed

3 files changed

+248
-778
lines changed

chain/lf3/participation.go

+247
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
package lf3
2+
3+
import (
4+
"context"
5+
"errors"
6+
"time"
7+
8+
"github.com/jpillora/backoff"
9+
"golang.org/x/sync/errgroup"
10+
"golang.org/x/xerrors"
11+
12+
"github.com/filecoin-project/go-address"
13+
"github.com/filecoin-project/go-f3/gpbft"
14+
"github.com/filecoin-project/go-f3/manifest"
15+
16+
"github.com/filecoin-project/lotus/api"
17+
"github.com/filecoin-project/lotus/node/modules/dtypes"
18+
)
19+
20+
const (
21+
// ParticipationCheckProgressMaxAttempts defines the maximum number of failed attempts
22+
// before we abandon the current lease and restart the participation process.
23+
//
24+
// The default backoff takes 12 attempts to reach a maximum delay of 1 minute.
25+
// Allowing for 13 failures results in approximately 2 minutes of backoff since
26+
// the lease was granted. Given a lease validity of up to 5 instances, this means
27+
// we would give up on checking the lease during its mid-validity period;
28+
// typically when we would try to renew the participation ticket. Hence, the value
29+
// of 13.
30+
ParticipationCheckProgressMaxAttempts = 13
31+
32+
// ParticipationCheckProgressInterval defines the duration between progress checks in normal operation mode.
33+
// This interval is used when there are no errors in retrieving the current progress.
34+
ParticipationCheckProgressInterval = 10 * time.Second
35+
36+
// ParticipationLeaseTerm is the number of instances the miner will attempt to lease from nodes.
37+
ParticipationLeaseTerm = 5
38+
)
39+
40+
// F3ParticipationAPI is the subset of node API methods that Participant calls.
// Participant depends on this interface rather than on a full node API so that
// only these four methods need to be provided by whatever backs it.
type F3ParticipationAPI interface {
41+
// F3GetOrRenewParticipationTicket is called to obtain a ticket for minerID;
// Participant passes the ticket of its last successful participation as previous.
F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) //perm:sign
42+
// F3Participate is called to exchange a ticket for a participation lease.
F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error)
43+
// F3GetProgress is polled while a lease is held to compare the current
// instance against the lease's end.
F3GetProgress(ctx context.Context) (gpbft.Instant, error)
44+
// F3GetManifest is polled while a lease is held to detect a change of network name.
F3GetManifest(ctx context.Context) (*manifest.Manifest, error)
45+
}
46+
47+
// Participant keeps a single miner participating in F3 by repeatedly acquiring
// a participation ticket, exchanging it for a lease, and renewing shortly
// before the lease runs out. It is driven as a service via Start and Stop.
type Participant struct {
48+
// node is the API the participation loop talks to.
node F3ParticipationAPI
49+
// participant is the miner address tickets are requested for.
participant address.Address
50+
// backoff paces retries; it is shared by all phases of the loop and reset at
// the start of each one.
backoff *backoff.Backoff
51+
// maxCheckProgressAttempts is the attempt count above which lease monitoring
// gives up and participation is restarted from ticket acquisition.
maxCheckProgressAttempts int
52+
// previousTicket is the ticket used in the most recent successful
// participation; it is passed along when requesting the next ticket.
previousTicket api.F3ParticipationTicket
53+
// checkProgressInterval is the wait between progress checks while a lease is
// still comfortably valid.
checkProgressInterval time.Duration
54+
// leaseTerm is the number of instances requested with each ticket.
leaseTerm uint64
55+
56+
// runningCtx is the context the participation loop runs under.
runningCtx context.Context
57+
// cancelCtx cancels runningCtx; Stop calls it.
cancelCtx context.CancelFunc
58+
// errgrp owns the loop goroutine so that Stop can wait for it to exit.
errgrp *errgroup.Group
59+
}
60+
61+
func NewParticipant(ctx context.Context, node F3ParticipationAPI, participant dtypes.MinerAddress, backoff *backoff.Backoff, maxCheckProgress int, checkProgressInterval time.Duration, leaseTerm uint64) *Participant {
62+
runningCtx, cancel := context.WithCancel(context.WithoutCancel(ctx))
63+
errgrp, runningCtx := errgroup.WithContext(runningCtx)
64+
return &Participant{
65+
node: node,
66+
participant: address.Address(participant),
67+
backoff: backoff,
68+
maxCheckProgressAttempts: maxCheckProgress,
69+
previousTicket: []byte{},
70+
checkProgressInterval: checkProgressInterval,
71+
leaseTerm: leaseTerm,
72+
runningCtx: runningCtx,
73+
cancelCtx: cancel,
74+
errgrp: errgrp,
75+
}
76+
}
77+
78+
func (p *Participant) Start(ctx context.Context) error {
79+
p.errgrp.Go(func() error {
80+
return p.run(p.runningCtx)
81+
})
82+
return nil
83+
}
84+
85+
func (p *Participant) Stop(ctx context.Context) error {
86+
p.cancelCtx()
87+
return p.errgrp.Wait()
88+
}
89+
90+
func (p *Participant) run(ctx context.Context) (_err error) {
91+
defer func() {
92+
if ctx.Err() == nil && _err != nil {
93+
log.Errorw("F3 participation stopped unexpectedly", "error", _err)
94+
}
95+
}()
96+
97+
for ctx.Err() == nil {
98+
start := time.Now()
99+
ticket, err := p.tryGetF3ParticipationTicket(ctx)
100+
if err != nil {
101+
return err
102+
}
103+
lease, participating, err := p.tryF3Participate(ctx, ticket)
104+
if err != nil {
105+
return err
106+
}
107+
if participating {
108+
if err := p.awaitLeaseExpiry(ctx, lease); err != nil {
109+
return err
110+
}
111+
}
112+
const minPeriod = 500 * time.Millisecond
113+
if sinceLastLoop := time.Since(start); sinceLastLoop < minPeriod {
114+
select {
115+
case <-time.After(minPeriod - sinceLastLoop):
116+
case <-ctx.Done():
117+
return ctx.Err()
118+
}
119+
}
120+
log.Info("Renewing F3 participation")
121+
}
122+
return ctx.Err()
123+
}
124+
125+
func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context) (api.F3ParticipationTicket, error) {
126+
p.backoff.Reset()
127+
for ctx.Err() == nil {
128+
switch ticket, err := p.node.F3GetOrRenewParticipationTicket(ctx, p.participant, p.previousTicket, p.leaseTerm); {
129+
case ctx.Err() != nil:
130+
return api.F3ParticipationTicket{}, ctx.Err()
131+
case errors.Is(err, api.ErrF3Disabled):
132+
log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
133+
return api.F3ParticipationTicket{}, xerrors.Errorf("acquiring F3 participation ticket: %w", err)
134+
case err != nil:
135+
log.Errorw("Failed to acquire F3 participation ticket; retrying after backoff", "backoff", p.backoff.Duration(), "err", err)
136+
p.backOff(ctx)
137+
log.Debugw("Reattempting to acquire F3 participation ticket.", "attempts", p.backoff.Attempt())
138+
continue
139+
default:
140+
log.Debug("Successfully acquired F3 participation ticket")
141+
return ticket, nil
142+
}
143+
}
144+
return api.F3ParticipationTicket{}, ctx.Err()
145+
}
146+
147+
func (p *Participant) tryF3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, bool, error) {
148+
p.backoff.Reset()
149+
for ctx.Err() == nil {
150+
switch lease, err := p.node.F3Participate(ctx, ticket); {
151+
case ctx.Err() != nil:
152+
return api.F3ParticipationLease{}, false, ctx.Err()
153+
case errors.Is(err, api.ErrF3Disabled):
154+
log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
155+
return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
156+
case errors.Is(err, api.ErrF3ParticipationTicketExpired):
157+
log.Warnw("F3 participation ticket expired while attempting to participate. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err)
158+
return api.F3ParticipationLease{}, false, nil
159+
case errors.Is(err, api.ErrF3ParticipationTicketStartBeforeExisting):
160+
log.Warnw("F3 participation ticket starts before the existing lease. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err)
161+
return api.F3ParticipationLease{}, false, nil
162+
case errors.Is(err, api.ErrF3ParticipationTicketInvalid):
163+
log.Errorw("F3 participation ticket is not valid. Acquiring a new ticket after backoff.", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err)
164+
p.backOff(ctx)
165+
return api.F3ParticipationLease{}, false, nil
166+
case errors.Is(err, api.ErrF3ParticipationIssuerMismatch):
167+
log.Warnw("Node is not the issuer of F3 participation ticket. Miner maybe load-balancing or node has changed. Retrying F3 participation after backoff.", "backoff", p.backoff.Duration(), "err", err)
168+
p.backOff(ctx)
169+
log.Debugw("Reattempting F3 participation with the same ticket.", "attempts", p.backoff.Attempt())
170+
continue
171+
case errors.Is(err, api.ErrF3NotReady):
172+
log.Warnw("F3 is not ready. Retrying F3 participation after backoff.", "backoff", p.backoff.Duration(), "err", err)
173+
p.backOff(ctx)
174+
continue
175+
case err != nil:
176+
log.Errorw("Unexpected error while attempting F3 participation. Retrying after backoff", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err)
177+
p.backOff(ctx)
178+
continue
179+
default:
180+
log.Infow("Successfully acquired F3 participation lease.",
181+
"issuer", lease.Issuer,
182+
"not-before", lease.FromInstance,
183+
"not-after", lease.ToInstance(),
184+
)
185+
p.previousTicket = ticket
186+
return lease, true, nil
187+
}
188+
}
189+
return api.F3ParticipationLease{}, false, ctx.Err()
190+
}
191+
192+
func (p *Participant) awaitLeaseExpiry(ctx context.Context, lease api.F3ParticipationLease) error {
193+
p.backoff.Reset()
194+
for ctx.Err() == nil {
195+
switch manifest, err := p.node.F3GetManifest(ctx); {
196+
case errors.Is(err, api.ErrF3Disabled):
197+
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
198+
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
199+
case err != nil:
200+
if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) {
201+
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err)
202+
return nil
203+
}
204+
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err)
205+
p.backOff(ctx)
206+
case manifest.NetworkName != lease.Network:
207+
return nil
208+
}
209+
switch progress, err := p.node.F3GetProgress(ctx); {
210+
case errors.Is(err, api.ErrF3Disabled):
211+
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
212+
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
213+
case err != nil:
214+
if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) {
215+
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err)
216+
return nil
217+
}
218+
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err)
219+
p.backOff(ctx)
220+
case progress.ID+2 >= lease.ToInstance():
221+
log.Infof("F3 progressed (%d) to within two instances of lease expiry (%d). Renewing participation.", progress.ID, lease.ToInstance())
222+
return nil
223+
default:
224+
remainingInstanceLease := lease.ToInstance() - progress.ID
225+
log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", remainingInstanceLease, p.checkProgressInterval)
226+
p.backOffFor(ctx, p.checkProgressInterval)
227+
}
228+
}
229+
return ctx.Err()
230+
}
231+
232+
func (p *Participant) backOff(ctx context.Context) {
233+
p.backOffFor(ctx, p.backoff.Duration())
234+
}
235+
236+
func (p *Participant) backOffFor(ctx context.Context, d time.Duration) {
237+
// Create a timer every time to avoid potential risk of deadlock or the need for
238+
// mutex despite the fact that f3Participator is never (and should never) be
239+
// called from multiple goroutines.
240+
timer := time.NewTimer(d)
241+
defer timer.Stop()
242+
select {
243+
case <-ctx.Done():
244+
return
245+
case <-timer.C:
246+
}
247+
}

itests/f3_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func TestF3_Enabled(t *testing.T) {
4646
blocktime := 100 * time.Millisecond
4747
e := setup(t, blocktime)
4848

49-
e.waitTillF3Instance(modules.F3LeaseTerm+1, 40*time.Second)
49+
e.waitTillF3Instance(lf3.ParticipationLeaseTerm+1, 40*time.Second)
5050
}
5151

5252
// Test that checks that F3 can be rebootstrapped by changing the manifest

0 commit comments

Comments
 (0)