Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement dynamic fusing manifest #884

Merged
merged 8 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 107 additions & 45 deletions manifest/fusing_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package manifest

import (
"context"
"fmt"
"time"

"github.com/filecoin-project/go-f3/ec"
Expand All @@ -18,13 +17,13 @@ type HeadGetter interface {

var _ ManifestProvider = (*FusingManifestProvider)(nil)

// FusingManifestProvider is a ManifestProvider that starts by providing dynamic manifest updates
// then switches to a static manifest when we get within finality of said manifest's bootstrap
// FusingManifestProvider is a ManifestProvider that starts by providing secondary manifest updates
// then switches to a primary manifest when we get within finality of said manifest's bootstrap
// epoch.
type FusingManifestProvider struct {
ec HeadGetter
dynamic ManifestProvider
static *Manifest
ec HeadGetter
secondary ManifestProvider
primary ManifestProvider

manifestCh chan *Manifest

Expand All @@ -34,19 +33,17 @@ type FusingManifestProvider struct {
clock clock.Clock
}

func NewFusingManifestProvider(ctx context.Context, ec HeadGetter, dynamic ManifestProvider, static *Manifest) (*FusingManifestProvider, error) {
if err := static.Validate(); err != nil {
return nil, err
}

// NewFusingManifestProvider creates a provider that will lock into the primary manifest onces it reaches BootstrapEpoch-Finality of primary manifest
// the primary ManifestProvider needs to provide at least one manifest (or nil), a sign of life, to enable forwarding of secondary manifests.
func NewFusingManifestProvider(ctx context.Context, ec HeadGetter, secondary ManifestProvider, primary ManifestProvider) (*FusingManifestProvider, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OCD: secondary comes as first argument 😱

clk := clock.GetClock(ctx)
ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))
errgrp, ctx := errgroup.WithContext(ctx)

return &FusingManifestProvider{
ec: ec,
dynamic: dynamic,
static: static,
secondary: secondary,
primary: primary,
errgrp: errgrp,
cancel: cancel,
runningCtx: ctx,
Expand All @@ -60,41 +57,112 @@ func (m *FusingManifestProvider) ManifestUpdates() <-chan *Manifest {
}

func (m *FusingManifestProvider) Start(ctx context.Context) error {
head, err := m.ec.GetHead(ctx)
if err != nil {
return fmt.Errorf("failed to determine current head epoch")
if err := m.primary.Start(ctx); err != nil {
return err
}

switchEpoch := m.static.BootstrapEpoch - m.static.EC.Finality
headEpoch := head.Epoch()

if headEpoch >= switchEpoch {
m.manifestCh <- m.static
return nil
if err := m.secondary.Start(ctx); err != nil {
return err
}

epochDelay := switchEpoch - headEpoch
start := head.Timestamp().Add(time.Duration(epochDelay) * m.static.EC.Period)
m.errgrp.Go(func() error {
defer m.primary.Stop(context.Background())
defer m.secondary.Stop(context.Background())

startTimeOfPriority := func(head ec.TipSet, mani *Manifest) time.Time {
headEpoch := head.Epoch()
switchEpoch := mani.BootstrapEpoch - mani.EC.Finality
epochDelay := switchEpoch - headEpoch
start := head.Timestamp().Add(time.Duration(epochDelay) * mani.EC.Period)
return start
}

if err := m.dynamic.Start(ctx); err != nil {
return err
}
var primaryManifest *Manifest
// create a stopped timer
timer := m.clock.Timer(time.Hour)
timer.Stop()

log.Infof("starting the fusing manifest provider, will switch to the static manifest at %s", start)
first := true
for m.runningCtx.Err() == nil {
if !first {
m.clock.Sleep(5 * time.Second)
first = false
}

select {
case primaryManifest = <-m.primary.ManifestUpdates():
case <-m.runningCtx.Done():
// we were stopped, clean exit
return nil
}

head, err := m.ec.GetHead(m.runningCtx)
if err != nil {
log.Errorf("failed to determine current head epoch: %w", err)
continue
}
headEpoch := head.Epoch()
// exit early if primaryManifest is relevant right now
if primaryManifest != nil && headEpoch >= primaryManifest.BootstrapEpoch-primaryManifest.EC.Finality {
m.manifestCh <- primaryManifest
return nil
}

if primaryManifest == nil {
// init with stopped timer
break
}
startTime := startTimeOfPriority(head, primaryManifest)
log.Infof("starting the fusing manifest provider, will switch to the primary manifest at %s",
startTime)
if err != nil {
log.Errorf("trying to compute start time: %w", err)
continue
}
timer.Reset(m.clock.Until(startTime))
break
}

if m.runningCtx.Err() != nil {
return nil
}

m.errgrp.Go(func() error {
dynamicUpdates := m.dynamic.ManifestUpdates()
timer := m.clock.Timer(m.clock.Until(start))
defer timer.Stop()

for m.runningCtx.Err() == nil {
select {
case primaryManifest = <-m.primary.ManifestUpdates():
if primaryManifest == nil {
timer.Stop()
continue
}
head, err := m.ec.GetHead(m.runningCtx)
if err != nil {
log.Errorf("getting head in fusing manifest: %+v", err)
}
startTime := startTimeOfPriority(head, primaryManifest)
if err != nil {
log.Errorf("trying to compute start time: %+v", err)
// set timer in one epoch, shouldn't happen but be defensive
timer.Reset(primaryManifest.EC.Period)
continue
}

log.Infof("got new primaryManifest, will switch to the primary manifest at %s",
startTime)
timer.Reset(m.clock.Until(startTime))
case <-timer.C:
if primaryManifest == nil {
// just a consistency check, timer might have fired before it was stopped
continue
}

// Make sure we're actually at the target epoch. This shouldn't be
// an issue unless our clocks are really funky, the network is
// behind, or we're in a lotus integration test
// (https://github.com/filecoin-project/lotus/issues/12557).
head, err := m.ec.GetHead(m.runningCtx)
switchEpoch := primaryManifest.BootstrapEpoch - primaryManifest.EC.Finality
switch {
case err != nil:
log.Errorw("failed to get head in fusing manifest provider", "error", err)
Expand All @@ -103,32 +171,26 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error {
log.Infow("delaying fusing manifest switch-over because head is behind the target epoch",
"head", head.Epoch(),
"target epoch", switchEpoch,
"bootstrap epoch", m.static.BootstrapEpoch,
"bootstrap epoch", primaryManifest.BootstrapEpoch,
)
timer.Reset(m.static.EC.Period)
timer.Reset(primaryManifest.EC.Period)
continue
}

log.Infow(
"fusing to the static manifest, stopping the dynamic manifest provider",
"network", m.static.NetworkName,
"bootstrap epoch", m.static.BootstrapEpoch,
"fusing to the primary manifest, stopping the secondary manifest provider",
"network", primaryManifest.NetworkName,
"bootstrap epoch", primaryManifest.BootstrapEpoch,
"current epoch", head.Epoch(),
)
m.updateManifest(m.static)
// Log any errors and move on. We don't bubble it because we don't
// want to stop everything if shutting down the dynamic manifest
// provider fails when switching over to a static manifest.
if err := m.dynamic.Stop(context.Background()); err != nil {
log.Errorw("failure when stopping dynamic manifest provider", "error", err)
}
m.updateManifest(primaryManifest)
return nil
case update := <-dynamicUpdates:
case update := <-m.secondary.ManifestUpdates():
m.updateManifest(update)
case <-m.runningCtx.Done():
}
}
return m.dynamic.Stop(context.Background())
return nil
})

return nil
Expand Down
70 changes: 66 additions & 4 deletions manifest/fusing_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/stretchr/testify/require"
)

// Static manifest provider that doesn't allow any changes
// in runtime to the initial manifest set in the provider
type testManifestProvider chan *manifest.Manifest

func (testManifestProvider) Start(context.Context) error { return nil }
Expand All @@ -34,8 +32,12 @@ func TestFusingManifestProvider(t *testing.T) {

fakeEc := consensus.NewFakeEC(ctx)
manifestCh := make(chan *manifest.Manifest, 10)
priorityManifestCh := make(chan *manifest.Manifest, 1)
priorityManifestProvider := testManifestProvider(priorityManifestCh)
priorityManifestCh <- initialManifest

prov, err := manifest.NewFusingManifestProvider(ctx,
fakeEc, (testManifestProvider)(manifestCh), initialManifest)
fakeEc, (testManifestProvider)(manifestCh), priorityManifestProvider)
require.NoError(t, err)

require.NoError(t, prov.Start(ctx))
Expand Down Expand Up @@ -95,10 +97,70 @@ func TestFusingManifestProviderStop(t *testing.T) {

fakeEc := consensus.NewFakeEC(ctx)
manifestCh := make(chan *manifest.Manifest, 1)
priorityManifestCh := make(chan *manifest.Manifest, 1)
priorityManifestProvider := testManifestProvider(priorityManifestCh)
priorityManifestCh <- initialManifest

prov, err := manifest.NewFusingManifestProvider(ctx,
fakeEc, (testManifestProvider)(manifestCh), initialManifest)
fakeEc, (testManifestProvider)(manifestCh), priorityManifestProvider)
require.NoError(t, err)

require.NoError(t, prov.Start(ctx))
require.NoError(t, prov.Stop(ctx))
}

func TestFusingManifestProviderSwitchToPriority(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctx, clk := clock.WithMockClock(ctx)
t.Cleanup(cancel)

initialManifest := manifest.LocalDevnetManifest()
initialManifest.BootstrapEpoch = 2000
initialManifest.EC.Finality = 900

fakeEc := consensus.NewFakeEC(ctx)
manifestCh := make(chan *manifest.Manifest, 10)

priorityManifestCh := make(chan *manifest.Manifest, 1)
priorityManifestProvider := testManifestProvider(priorityManifestCh)
priorityManifestCh <- nil

prov, err := manifest.NewFusingManifestProvider(ctx,
fakeEc, (testManifestProvider)(manifestCh), priorityManifestProvider)
require.NoError(t, err)

require.NoError(t, prov.Start(ctx))
priorityManifestCh <- initialManifest

// Create and push a dynamic manifest with bootstrap epoch < 1100
dynamicManifest := *initialManifest
dynamicManifest.BootstrapEpoch = 1000
select {
case manifestCh <- &dynamicManifest:
default:
t.Fatal("failed to enqueue dynamic manifest")
}

select {
case m := <-prov.ManifestUpdates():
require.True(t, m.Equal(&dynamicManifest), "expected dynamic manifest")
case <-time.After(time.Second):
t.Fatal("expected a manifest update")
}

// Add time to reach the priority manifest switch epoch
clk.Add(initialManifest.EC.Period * 1200)
for i := 0; i < 10; i++ {
// fixes weird quirk with fake time
// where the initial manifest doesn't get processed before the initial clk.Add
// and the timer doesn't fire until another clk.Add
clk.Add(1)
}
t.Logf("clck now: %s", clk.Now())
select {
case m := <-prov.ManifestUpdates():
require.True(t, m.Equal(initialManifest), "expected to receive the priority manifest")
case <-time.After(time.Second):
t.Fatal("expected a manifest update")
}
}
Loading