Skip to content

Commit 08ea12c

Browse files
committed
Implement dynamic fusing manifest
Signed-off-by: Jakub Sztandera <[email protected]>
1 parent fa4d168 commit 08ea12c

File tree

2 files changed

+148
-35
lines changed

2 files changed

+148
-35
lines changed

manifest/fusing_provider.go

+82-33
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ type HeadGetter interface {
1919
var _ ManifestProvider = (*FusingManifestProvider)(nil)
2020

2121
// FusingManifestProvider is a ManifestProvider that starts by providing dynamic manifest updates
22-
// then switches to a static manifest when we get within finality of said manifest's bootstrap
22+
// then switches to a priority manifest when we get within finality of said manifest's bootstrap
2323
// epoch.
2424
type FusingManifestProvider struct {
25-
ec HeadGetter
26-
dynamic ManifestProvider
27-
static *Manifest
25+
ec HeadGetter
26+
dynamic ManifestProvider
27+
priority ManifestProvider
2828

2929
manifestCh chan *Manifest
3030

@@ -34,19 +34,17 @@ type FusingManifestProvider struct {
3434
clock clock.Clock
3535
}
3636

37-
func NewFusingManifestProvider(ctx context.Context, ec HeadGetter, dynamic ManifestProvider, static *Manifest) (*FusingManifestProvider, error) {
38-
if err := static.Validate(); err != nil {
39-
return nil, err
40-
}
41-
37+
// NewFusingManifestProvider creates a provider that will lock into the priority manifest onces it reaches BootstrapEpoch of priority manifest
38+
// the priority ManifestProvider needs to provide at least one manifest (or nil), a sign of life, to enable forwarding of dynamic manifests
39+
func NewFusingManifestProvider(ctx context.Context, ec HeadGetter, dynamic ManifestProvider, priority ManifestProvider) (*FusingManifestProvider, error) {
4240
clk := clock.GetClock(ctx)
4341
ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))
4442
errgrp, ctx := errgroup.WithContext(ctx)
4543

4644
return &FusingManifestProvider{
4745
ec: ec,
4846
dynamic: dynamic,
49-
static: static,
47+
priority: priority,
5048
errgrp: errgrp,
5149
cancel: cancel,
5250
runningCtx: ctx,
@@ -60,41 +58,92 @@ func (m *FusingManifestProvider) ManifestUpdates() <-chan *Manifest {
6058
}
6159

6260
func (m *FusingManifestProvider) Start(ctx context.Context) error {
63-
head, err := m.ec.GetHead(ctx)
64-
if err != nil {
65-
return fmt.Errorf("failed to determine current head epoch")
61+
if err := m.priority.Start(ctx); err != nil {
62+
return err
6663
}
6764

68-
switchEpoch := m.static.BootstrapEpoch - m.static.EC.Finality
69-
headEpoch := head.Epoch()
70-
71-
if headEpoch >= switchEpoch {
72-
m.manifestCh <- m.static
73-
return nil
65+
priorityManifest := <-m.priority.ManifestUpdates()
66+
var timer *clock.Timer
67+
startTimeOfPriority := func(mani *Manifest) (time.Time, error) {
68+
head, err := m.ec.GetHead(ctx)
69+
if err != nil {
70+
return time.Time{}, fmt.Errorf("failed to determine current head epoch: %w", err)
71+
}
72+
headEpoch := head.Epoch()
73+
switchEpoch := mani.BootstrapEpoch - mani.EC.Finality
74+
epochDelay := switchEpoch - headEpoch
75+
start := head.Timestamp().Add(time.Duration(epochDelay) * mani.EC.Period)
76+
return start, nil
7477
}
7578

76-
epochDelay := switchEpoch - headEpoch
77-
start := head.Timestamp().Add(time.Duration(epochDelay) * m.static.EC.Period)
79+
{
80+
head, err := m.ec.GetHead(ctx)
81+
if err != nil {
82+
return fmt.Errorf("failed to determine current head epoch: %w", err)
83+
}
84+
headEpoch := head.Epoch()
85+
// exit early if priorityManifest is relevant right now
86+
if priorityManifest != nil && headEpoch >= priorityManifest.BootstrapEpoch-priorityManifest.EC.Finality {
87+
m.priority.Stop(ctx)
88+
m.manifestCh <- priorityManifest
89+
return nil
90+
}
91+
92+
if priorityManifest != nil {
93+
startTime, err := startTimeOfPriority(priorityManifest)
94+
log.Infof("starting the fusing manifest provider, will switch to the priority manifest at %s,(now %s)",
95+
startTime, m.clock.Now())
96+
if err != nil {
97+
return fmt.Errorf("trying to compute start time: %w", err)
98+
}
99+
timer = m.clock.Timer(m.clock.Until(startTime))
100+
} else {
101+
// create a stopped timer
102+
timer = m.clock.Timer(time.Hour)
103+
timer.Stop()
104+
}
105+
}
78106

79107
if err := m.dynamic.Start(ctx); err != nil {
80108
return err
81109
}
82110

83-
log.Infof("starting the fusing manifest provider, will switch to the static manifest at %s", start)
84-
85111
m.errgrp.Go(func() error {
86-
dynamicUpdates := m.dynamic.ManifestUpdates()
87-
timer := m.clock.Timer(m.clock.Until(start))
88112
defer timer.Stop()
113+
defer m.priority.Stop(context.Background())
89114

90115
for m.runningCtx.Err() == nil {
91116
select {
117+
case priorityManifest = <-m.priority.ManifestUpdates():
118+
if priorityManifest == nil {
119+
timer.Stop()
120+
continue
121+
}
122+
startTime, err := startTimeOfPriority(priorityManifest)
123+
if err != nil {
124+
log.Errorf("trying to compute start time: %+v", err)
125+
// set timer in one epoch, shouldn't happen but be defensive
126+
timer.Reset(priorityManifest.EC.Period)
127+
continue
128+
}
129+
130+
log.Infof("got new priorityManifest, will switch to the priority manifest at %s",
131+
startTime)
132+
timer.Reset(m.clock.Until(startTime))
92133
case <-timer.C:
134+
log.Errorf("timer fired")
135+
if priorityManifest == nil {
136+
log.Errorf("nil priorityManifest")
137+
// just a consistency check, timer might have fired before it was stopped
138+
continue
139+
}
140+
93141
// Make sure we're actually at the target epoch. This shouldn't be
94142
// an issue unless our clocks are really funky, the network is
95143
// behind, or we're in a lotus integration test
96144
// (https://github.com/filecoin-project/lotus/issues/12557).
97145
head, err := m.ec.GetHead(m.runningCtx)
146+
switchEpoch := priorityManifest.BootstrapEpoch - priorityManifest.EC.Finality
98147
switch {
99148
case err != nil:
100149
log.Errorw("failed to get head in fusing manifest provider", "error", err)
@@ -103,27 +152,27 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error {
103152
log.Infow("delaying fusing manifest switch-over because head is behind the target epoch",
104153
"head", head.Epoch(),
105154
"target epoch", switchEpoch,
106-
"bootstrap epoch", m.static.BootstrapEpoch,
155+
"bootstrap epoch", priorityManifest.BootstrapEpoch,
107156
)
108-
timer.Reset(m.static.EC.Period)
157+
timer.Reset(priorityManifest.EC.Period)
109158
continue
110159
}
111160

112161
log.Infow(
113-
"fusing to the static manifest, stopping the dynamic manifest provider",
114-
"network", m.static.NetworkName,
115-
"bootstrap epoch", m.static.BootstrapEpoch,
162+
"fusing to the priority manifest, stopping the dynamic manifest provider",
163+
"network", priorityManifest.NetworkName,
164+
"bootstrap epoch", priorityManifest.BootstrapEpoch,
116165
"current epoch", head.Epoch(),
117166
)
118-
m.updateManifest(m.static)
167+
m.updateManifest(priorityManifest)
119168
// Log any errors and move on. We don't bubble it because we don't
120169
// want to stop everything if shutting down the dynamic manifest
121-
// provider fails when switching over to a static manifest.
170+
// provider fails when switching over to a priority manifest.
122171
if err := m.dynamic.Stop(context.Background()); err != nil {
123172
log.Errorw("failure when stopping dynamic manifest provider", "error", err)
124173
}
125174
return nil
126-
case update := <-dynamicUpdates:
175+
case update := <-m.dynamic.ManifestUpdates():
127176
m.updateManifest(update)
128177
case <-m.runningCtx.Done():
129178
}

manifest/fusing_provider_test.go

+66-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,12 @@ func TestFusingManifestProvider(t *testing.T) {
3232

3333
fakeEc := consensus.NewFakeEC(ctx)
3434
manifestCh := make(chan *manifest.Manifest, 10)
35+
priorityManifestCh := make(chan *manifest.Manifest, 1)
36+
priorityManifestProvider := testManifestProvider(priorityManifestCh)
37+
priorityManifestCh <- initialManifest
38+
3539
prov, err := manifest.NewFusingManifestProvider(ctx,
36-
fakeEc, (testManifestProvider)(manifestCh), initialManifest)
40+
fakeEc, (testManifestProvider)(manifestCh), priorityManifestProvider)
3741
require.NoError(t, err)
3842

3943
require.NoError(t, prov.Start(ctx))
@@ -93,10 +97,70 @@ func TestFusingManifestProviderStop(t *testing.T) {
9397

9498
fakeEc := consensus.NewFakeEC(ctx)
9599
manifestCh := make(chan *manifest.Manifest, 1)
100+
priorityManifestCh := make(chan *manifest.Manifest, 1)
101+
priorityManifestProvider := testManifestProvider(priorityManifestCh)
102+
priorityManifestCh <- initialManifest
103+
96104
prov, err := manifest.NewFusingManifestProvider(ctx,
97-
fakeEc, (testManifestProvider)(manifestCh), initialManifest)
105+
fakeEc, (testManifestProvider)(manifestCh), priorityManifestProvider)
98106
require.NoError(t, err)
99107

100108
require.NoError(t, prov.Start(ctx))
101109
require.NoError(t, prov.Stop(ctx))
102110
}
111+
112+
func TestFusingManifestProviderSwitchToPriority(t *testing.T) {
113+
ctx, cancel := context.WithCancel(context.Background())
114+
ctx, clk := clock.WithMockClock(ctx)
115+
t.Cleanup(cancel)
116+
117+
initialManifest := manifest.LocalDevnetManifest()
118+
initialManifest.BootstrapEpoch = 2000
119+
initialManifest.EC.Finality = 900
120+
121+
fakeEc := consensus.NewFakeEC(ctx)
122+
manifestCh := make(chan *manifest.Manifest, 10)
123+
124+
priorityManifestCh := make(chan *manifest.Manifest, 1)
125+
priorityManifestProvider := testManifestProvider(priorityManifestCh)
126+
priorityManifestCh <- nil
127+
128+
prov, err := manifest.NewFusingManifestProvider(ctx,
129+
fakeEc, (testManifestProvider)(manifestCh), priorityManifestProvider)
130+
require.NoError(t, err)
131+
132+
require.NoError(t, prov.Start(ctx))
133+
priorityManifestCh <- initialManifest
134+
135+
// Create and push a dynamic manifest with bootstrap epoch < 1100
136+
dynamicManifest := *initialManifest
137+
dynamicManifest.BootstrapEpoch = 1000
138+
select {
139+
case manifestCh <- &dynamicManifest:
140+
default:
141+
t.Fatal("failed to enqueue dynamic manifest")
142+
}
143+
144+
select {
145+
case m := <-prov.ManifestUpdates():
146+
require.True(t, m.Equal(&dynamicManifest), "expected dynamic manifest")
147+
case <-time.After(time.Second):
148+
t.Fatal("expected a manifest update")
149+
}
150+
151+
// Add time to reach the priority manifest switch epoch
152+
clk.Add(initialManifest.EC.Period * 1200)
153+
for i := 0; i < 10; i++ {
154+
// fixes weird quirk with fake time
155+
// where the initial manifest doesn't get processed before the initial clk.Add
156+
// and the timer doesn't fire until another clk.Add
157+
clk.Add(1)
158+
}
159+
t.Logf("clck now: %s", clk.Now())
160+
select {
161+
case m := <-prov.ManifestUpdates():
162+
require.True(t, m.Equal(initialManifest), "expected to receive the priority manifest")
163+
case <-time.After(time.Second):
164+
t.Fatal("expected a manifest update")
165+
}
166+
}

0 commit comments

Comments
 (0)