From fa4d16896638876d2763f5f63cf1877a27b74ef0 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Thu, 13 Feb 2025 15:42:17 +0100 Subject: [PATCH 1/8] chore: Remove unused comments from fusing_provider_test.go Signed-off-by: Jakub Sztandera --- manifest/fusing_provider_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/manifest/fusing_provider_test.go b/manifest/fusing_provider_test.go index 2cdc482d..3b9044cb 100644 --- a/manifest/fusing_provider_test.go +++ b/manifest/fusing_provider_test.go @@ -11,8 +11,6 @@ import ( "github.com/stretchr/testify/require" ) -// Static manifest provider that doesn't allow any changes -// in runtime to the initial manifest set in the provider type testManifestProvider chan *manifest.Manifest func (testManifestProvider) Start(context.Context) error { return nil } From 08ea12c4eef36da3042d6dd95684531c99aa440f Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Thu, 13 Feb 2025 15:42:23 +0100 Subject: [PATCH 2/8] Implement dynamic fusing manifest Signed-off-by: Jakub Sztandera --- manifest/fusing_provider.go | 115 ++++++++++++++++++++++--------- manifest/fusing_provider_test.go | 68 +++++++++++++++++- 2 files changed, 148 insertions(+), 35 deletions(-) diff --git a/manifest/fusing_provider.go b/manifest/fusing_provider.go index 104fe837..25302901 100644 --- a/manifest/fusing_provider.go +++ b/manifest/fusing_provider.go @@ -19,12 +19,12 @@ type HeadGetter interface { var _ ManifestProvider = (*FusingManifestProvider)(nil) // FusingManifestProvider is a ManifestProvider that starts by providing dynamic manifest updates -// then switches to a static manifest when we get within finality of said manifest's bootstrap +// then switches to a priority manifest when we get within finality of said manifest's bootstrap // epoch. type FusingManifestProvider struct { - ec HeadGetter - dynamic ManifestProvider - static *Manifest + ec HeadGetter + dynamic ManifestProvider + priority ManifestProvider manifestCh chan *Manifest @@ -34,11 +34,9 @@ type FusingManifestProvider struct { clock clock.Clock } -func NewFusingManifestProvider(ctx context.Context, ec HeadGetter, dynamic ManifestProvider, static *Manifest) (*FusingManifestProvider, error) { - if err := static.Validate(); err != nil { - return nil, err - } - +// NewFusingManifestProvider creates a provider that will lock into the priority manifest onces it reaches BootstrapEpoch of priority manifest +// the priority ManifestProvider needs to provide at least one manifest (or nil), a sign of life, to enable forwarding of dynamic manifests +func NewFusingManifestProvider(ctx context.Context, ec HeadGetter, dynamic ManifestProvider, priority ManifestProvider) (*FusingManifestProvider, error) { clk := clock.GetClock(ctx) ctx, cancel := context.WithCancel(context.WithoutCancel(ctx)) errgrp, ctx := errgroup.WithContext(ctx) @@ -46,7 +44,7 @@ func NewFusingManifestProvider(ctx context.Context, ec HeadGetter, dynamic Manif return &FusingManifestProvider{ ec: ec, dynamic: dynamic, - static: static, + priority: priority, errgrp: errgrp, cancel: cancel, runningCtx: ctx, @@ -60,41 +58,92 @@ func (m *FusingManifestProvider) ManifestUpdates() <-chan *Manifest { } func (m *FusingManifestProvider) Start(ctx context.Context) error { - head, err := m.ec.GetHead(ctx) - if err != nil { - return fmt.Errorf("failed to determine current head epoch") + if err := m.priority.Start(ctx); err != nil { + return err } - switchEpoch := m.static.BootstrapEpoch - m.static.EC.Finality - headEpoch := head.Epoch() - - if headEpoch >= switchEpoch { - m.manifestCh <- m.static - return nil + priorityManifest := <-m.priority.ManifestUpdates() + var timer *clock.Timer + startTimeOfPriority := func(mani *Manifest) (time.Time, error) { + head, err := m.ec.GetHead(ctx) + if err != nil { + return time.Time{}, fmt.Errorf("failed to determine current head epoch: %w", err) + } + headEpoch := head.Epoch() + switchEpoch := mani.BootstrapEpoch - mani.EC.Finality + epochDelay := switchEpoch - headEpoch + start := head.Timestamp().Add(time.Duration(epochDelay) * mani.EC.Period) + return start, nil } - epochDelay := switchEpoch - headEpoch - start := head.Timestamp().Add(time.Duration(epochDelay) * m.static.EC.Period) + { + head, err := m.ec.GetHead(ctx) + if err != nil { + return fmt.Errorf("failed to determine current head epoch: %w", err) + } + headEpoch := head.Epoch() + // exit early if priorityManifest is relevant right now + if priorityManifest != nil && headEpoch >= priorityManifest.BootstrapEpoch-priorityManifest.EC.Finality { + m.priority.Stop(ctx) + m.manifestCh <- priorityManifest + return nil + } + + if priorityManifest != nil { + startTime, err := startTimeOfPriority(priorityManifest) + log.Infof("starting the fusing manifest provider, will switch to the priority manifest at %s,(now %s)", + startTime, m.clock.Now()) + if err != nil { + return fmt.Errorf("trying to compute start time: %w", err) + } + timer = m.clock.Timer(m.clock.Until(startTime)) + } else { + // create a stopped timer + timer = m.clock.Timer(time.Hour) + timer.Stop() + } + } if err := m.dynamic.Start(ctx); err != nil { return err } - log.Infof("starting the fusing manifest provider, will switch to the static manifest at %s", start) - m.errgrp.Go(func() error { - dynamicUpdates := m.dynamic.ManifestUpdates() - timer := m.clock.Timer(m.clock.Until(start)) defer timer.Stop() + defer m.priority.Stop(context.Background()) for m.runningCtx.Err() == nil { select { + case priorityManifest = <-m.priority.ManifestUpdates(): + if priorityManifest == nil { + timer.Stop() + continue + } + startTime, err := startTimeOfPriority(priorityManifest) + if err != nil { + log.Errorf("trying to compute start time: %+v", err) + // set timer in one epoch, shouldn't happen but be defensive + timer.Reset(priorityManifest.EC.Period) + continue + } + + log.Infof("got new priorityManifest, will switch to the priority manifest at %s", + startTime) + timer.Reset(m.clock.Until(startTime)) case <-timer.C: + log.Errorf("timer fired") + if priorityManifest == nil { + log.Errorf("nil priorityManifest") + // just a consistency check, timer might have fired before it was stopped + continue + } + // Make sure we're actually at the target epoch. This shouldn't be // an issue unless our clocks are really funky, the network is // behind, or we're in a lotus integration test // (https://github.com/filecoin-project/lotus/issues/12557). head, err := m.ec.GetHead(m.runningCtx) + switchEpoch := priorityManifest.BootstrapEpoch - priorityManifest.EC.Finality switch { case err != nil: log.Errorw("failed to get head in fusing manifest provider", "error", err) @@ -103,27 +152,27 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { log.Infow("delaying fusing manifest switch-over because head is behind the target epoch", "head", head.Epoch(), "target epoch", switchEpoch, - "bootstrap epoch", m.static.BootstrapEpoch, + "bootstrap epoch", priorityManifest.BootstrapEpoch, ) - timer.Reset(m.static.EC.Period) + timer.Reset(priorityManifest.EC.Period) continue } log.Infow( - "fusing to the static manifest, stopping the dynamic manifest provider", - "network", m.static.NetworkName, - "bootstrap epoch", m.static.BootstrapEpoch, + "fusing to the priority manifest, stopping the dynamic manifest provider", + "network", priorityManifest.NetworkName, + "bootstrap epoch", priorityManifest.BootstrapEpoch, "current epoch", head.Epoch(), ) - m.updateManifest(m.static) + m.updateManifest(priorityManifest) // Log any errors and move on. We don't bubble it because we don't // want to stop everything if shutting down the dynamic manifest - // provider fails when switching over to a static manifest. + // provider fails when switching over to a priority manifest. if err := m.dynamic.Stop(context.Background()); err != nil { log.Errorw("failure when stopping dynamic manifest provider", "error", err) } return nil - case update := <-dynamicUpdates: + case update := <-m.dynamic.ManifestUpdates(): m.updateManifest(update) case <-m.runningCtx.Done(): } diff --git a/manifest/fusing_provider_test.go b/manifest/fusing_provider_test.go index 3b9044cb..745e186d 100644 --- a/manifest/fusing_provider_test.go +++ b/manifest/fusing_provider_test.go @@ -32,8 +32,12 @@ func TestFusingManifestProvider(t *testing.T) { fakeEc := consensus.NewFakeEC(ctx) manifestCh := make(chan *manifest.Manifest, 10) + priorityManifestCh := make(chan *manifest.Manifest, 1) + priorityManifestProvider := testManifestProvider(priorityManifestCh) + priorityManifestCh <- initialManifest + prov, err := manifest.NewFusingManifestProvider(ctx, - fakeEc, (testManifestProvider)(manifestCh), initialManifest) + fakeEc, (testManifestProvider)(manifestCh), priorityManifestProvider) require.NoError(t, err) require.NoError(t, prov.Start(ctx)) @@ -93,10 +97,70 @@ func TestFusingManifestProviderStop(t *testing.T) { fakeEc := consensus.NewFakeEC(ctx) manifestCh := make(chan *manifest.Manifest, 1) + priorityManifestCh := make(chan *manifest.Manifest, 1) + priorityManifestProvider := testManifestProvider(priorityManifestCh) + priorityManifestCh <- initialManifest + prov, err := manifest.NewFusingManifestProvider(ctx, - fakeEc, (testManifestProvider)(manifestCh), initialManifest) + fakeEc, (testManifestProvider)(manifestCh), priorityManifestProvider) require.NoError(t, err) require.NoError(t, prov.Start(ctx)) require.NoError(t, prov.Stop(ctx)) } + +func TestFusingManifestProviderSwitchToPriority(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + ctx, clk := clock.WithMockClock(ctx) + t.Cleanup(cancel) + + initialManifest := manifest.LocalDevnetManifest() + initialManifest.BootstrapEpoch = 2000 + initialManifest.EC.Finality = 900 + + fakeEc := consensus.NewFakeEC(ctx) + manifestCh := make(chan *manifest.Manifest, 10) + + priorityManifestCh := make(chan *manifest.Manifest, 1) + priorityManifestProvider := testManifestProvider(priorityManifestCh) + priorityManifestCh <- nil + + prov, err := manifest.NewFusingManifestProvider(ctx, + fakeEc, (testManifestProvider)(manifestCh), priorityManifestProvider) + require.NoError(t, err) + + require.NoError(t, prov.Start(ctx)) + priorityManifestCh <- initialManifest + + // Create and push a dynamic manifest with bootstrap epoch < 1100 + dynamicManifest := *initialManifest + dynamicManifest.BootstrapEpoch = 1000 + select { + case manifestCh <- &dynamicManifest: + default: + t.Fatal("failed to enqueue dynamic manifest") + } + + select { + case m := <-prov.ManifestUpdates(): + require.True(t, m.Equal(&dynamicManifest), "expected dynamic manifest") + case <-time.After(time.Second): + t.Fatal("expected a manifest update") + } + + // Add time to reach the priority manifest switch epoch + clk.Add(initialManifest.EC.Period * 1200) + for i := 0; i < 10; i++ { + // fixes weird quirk with fake time + // where the initial manifest doesn't get processed before the initial clk.Add + // and the timer doesn't fire until another clk.Add + clk.Add(1) + } + t.Logf("clck now: %s", clk.Now()) + select { + case m := <-prov.ManifestUpdates(): + require.True(t, m.Equal(initialManifest), "expected to receive the priority manifest") + case <-time.After(time.Second): + t.Fatal("expected a manifest update") + } +} From 57518965c4f8545556dd59d399e08e07b80db9af Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Thu, 13 Feb 2025 17:20:32 +0100 Subject: [PATCH 3/8] Move the init code into goroutine Signed-off-by: Jakub Sztandera --- manifest/fusing_provider.go | 105 ++++++++++++++++++++---------------- 1 file changed, 59 insertions(+), 46 deletions(-) diff --git a/manifest/fusing_provider.go b/manifest/fusing_provider.go index 25302901..f012cbdf 100644 --- a/manifest/fusing_provider.go +++ b/manifest/fusing_provider.go @@ -34,7 +34,7 @@ type FusingManifestProvider struct { clock clock.Clock } -// NewFusingManifestProvider creates a provider that will lock into the priority manifest onces it reaches BootstrapEpoch of priority manifest +// NewFusingManifestProvider creates a provider that will lock into the priority manifest onces it reaches BootstrapEpoch-Finality of priority manifest // the priority ManifestProvider needs to provide at least one manifest (or nil), a sign of life, to enable forwarding of dynamic manifests func NewFusingManifestProvider(ctx context.Context, ec HeadGetter, dynamic ManifestProvider, priority ManifestProvider) (*FusingManifestProvider, error) { clk := clock.GetClock(ctx) @@ -62,55 +62,74 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { return err } - priorityManifest := <-m.priority.ManifestUpdates() - var timer *clock.Timer - startTimeOfPriority := func(mani *Manifest) (time.Time, error) { - head, err := m.ec.GetHead(ctx) - if err != nil { - return time.Time{}, fmt.Errorf("failed to determine current head epoch: %w", err) - } - headEpoch := head.Epoch() - switchEpoch := mani.BootstrapEpoch - mani.EC.Finality - epochDelay := switchEpoch - headEpoch - start := head.Timestamp().Add(time.Duration(epochDelay) * mani.EC.Period) - return start, nil + if err := m.dynamic.Start(ctx); err != nil { + return err } - { - head, err := m.ec.GetHead(ctx) - if err != nil { - return fmt.Errorf("failed to determine current head epoch: %w", err) - } - headEpoch := head.Epoch() - // exit early if priorityManifest is relevant right now - if priorityManifest != nil && headEpoch >= priorityManifest.BootstrapEpoch-priorityManifest.EC.Finality { - m.priority.Stop(ctx) - m.manifestCh <- priorityManifest - return nil + m.errgrp.Go(func() error { + startTimeOfPriority := func(mani *Manifest) (time.Time, error) { + head, err := m.ec.GetHead(m.runningCtx) + if err != nil { + return time.Time{}, fmt.Errorf("failed to determine current head epoch: %w", err) + } + headEpoch := head.Epoch() + switchEpoch := mani.BootstrapEpoch - mani.EC.Finality + epochDelay := switchEpoch - headEpoch + start := head.Timestamp().Add(time.Duration(epochDelay) * mani.EC.Period) + return start, nil } + var priorityManifest *Manifest + var timer *clock.Timer + + first := true + for m.runningCtx.Err() != nil { + if !first { + m.clk.Sleep(5 * time.Second) + first = false + } - if priorityManifest != nil { - startTime, err := startTimeOfPriority(priorityManifest) - log.Infof("starting the fusing manifest provider, will switch to the priority manifest at %s,(now %s)", - startTime, m.clock.Now()) + priorityManifest = <-m.priority.ManifestUpdates() + + head, err := m.ec.GetHead(ctx) if err != nil { - return fmt.Errorf("trying to compute start time: %w", err) + log.Errorf("failed to determine current head epoch: %w", err) + continue + } + headEpoch := head.Epoch() + // exit early if priorityManifest is relevant right now + if priorityManifest != nil && headEpoch >= priorityManifest.BootstrapEpoch-priorityManifest.EC.Finality { + m.priority.Stop(ctx) + m.manifestCh <- priorityManifest + m.priority.Stop() + m.dynamic.Stop() + return nil + } + + if priorityManifest != nil { + startTime, err := startTimeOfPriority(priorityManifest) + log.Infof("starting the fusing manifest provider, will switch to the priority manifest at %s", + startTime) + if err != nil { + log.Errorf("trying to compute start time: %w", err) + continue + } + timer = m.clock.Timer(m.clock.Until(startTime)) + break + } else { + // create a stopped timer + timer = m.clock.Timer(time.Hour) + timer.Stop() + break } - timer = m.clock.Timer(m.clock.Until(startTime)) - } else { - // create a stopped timer - timer = m.clock.Timer(time.Hour) - timer.Stop() } - } - if err := m.dynamic.Start(ctx); err != nil { - return err - } + if m.runningCtx.Err() != nil { + return nil + } - m.errgrp.Go(func() error { defer timer.Stop() defer m.priority.Stop(context.Background()) + defer m.dynamic.Stop(context.Background()) for m.runningCtx.Err() == nil { select { @@ -165,19 +184,13 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { "current epoch", head.Epoch(), ) m.updateManifest(priorityManifest) - // Log any errors and move on. We don't bubble it because we don't - // want to stop everything if shutting down the dynamic manifest - // provider fails when switching over to a priority manifest. - if err := m.dynamic.Stop(context.Background()); err != nil { - log.Errorw("failure when stopping dynamic manifest provider", "error", err) - } return nil case update := <-m.dynamic.ManifestUpdates(): m.updateManifest(update) case <-m.runningCtx.Done(): } } - return m.dynamic.Stop(context.Background()) + return }) return nil From fe2621a5f61e1ccea9a7e65cd8492c2a214f16f8 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Thu, 13 Feb 2025 17:22:42 +0100 Subject: [PATCH 4/8] fix typo, move to always defer stop Signed-off-by: Jakub Sztandera --- manifest/fusing_provider.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/manifest/fusing_provider.go b/manifest/fusing_provider.go index f012cbdf..415fbf09 100644 --- a/manifest/fusing_provider.go +++ b/manifest/fusing_provider.go @@ -67,6 +67,9 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { } m.errgrp.Go(func() error { + defer m.priority.Stop(context.Background()) + defer m.dynamic.Stop(context.Background()) + startTimeOfPriority := func(mani *Manifest) (time.Time, error) { head, err := m.ec.GetHead(m.runningCtx) if err != nil { @@ -78,13 +81,14 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { start := head.Timestamp().Add(time.Duration(epochDelay) * mani.EC.Period) return start, nil } + var priorityManifest *Manifest var timer *clock.Timer first := true for m.runningCtx.Err() != nil { if !first { - m.clk.Sleep(5 * time.Second) + m.clock.Sleep(5 * time.Second) first = false } @@ -98,10 +102,7 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { headEpoch := head.Epoch() // exit early if priorityManifest is relevant right now if priorityManifest != nil && headEpoch >= priorityManifest.BootstrapEpoch-priorityManifest.EC.Finality { - m.priority.Stop(ctx) m.manifestCh <- priorityManifest - m.priority.Stop() - m.dynamic.Stop() return nil } @@ -128,8 +129,6 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { } defer timer.Stop() - defer m.priority.Stop(context.Background()) - defer m.dynamic.Stop(context.Background()) for m.runningCtx.Err() == nil { select { @@ -190,7 +189,7 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { case <-m.runningCtx.Done(): } } - return + return nil }) return nil From 3fd26b6f43523ddd7f0aaa3b08a5a3472e2b86f4 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Thu, 13 Feb 2025 17:38:42 +0100 Subject: [PATCH 5/8] refactor startTimeOfPriority Signed-off-by: Jakub Sztandera --- manifest/fusing_provider.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/manifest/fusing_provider.go b/manifest/fusing_provider.go index 415fbf09..eb7639a4 100644 --- a/manifest/fusing_provider.go +++ b/manifest/fusing_provider.go @@ -2,7 +2,6 @@ package manifest import ( "context" - "fmt" "time" "github.com/filecoin-project/go-f3/ec" @@ -70,16 +69,12 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { defer m.priority.Stop(context.Background()) defer m.dynamic.Stop(context.Background()) - startTimeOfPriority := func(mani *Manifest) (time.Time, error) { - head, err := m.ec.GetHead(m.runningCtx) - if err != nil { - return time.Time{}, fmt.Errorf("failed to determine current head epoch: %w", err) - } + startTimeOfPriority := func(head ec.TipSet, mani *Manifest) time.Time { headEpoch := head.Epoch() switchEpoch := mani.BootstrapEpoch - mani.EC.Finality epochDelay := switchEpoch - headEpoch start := head.Timestamp().Add(time.Duration(epochDelay) * mani.EC.Period) - return start, nil + return start } var priorityManifest *Manifest @@ -107,7 +102,7 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { } if priorityManifest != nil { - startTime, err := startTimeOfPriority(priorityManifest) + startTime := startTimeOfPriority(head, priorityManifest) log.Infof("starting the fusing manifest provider, will switch to the priority manifest at %s", startTime) if err != nil { @@ -137,7 +132,11 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { timer.Stop() continue } - startTime, err := startTimeOfPriority(priorityManifest) + head, err := m.ec.GetHead(m.runningCtx) + if err != nil { + log.Errorf("getting head in fusing manifest: %+v", err) + } + startTime := startTimeOfPriority(head, priorityManifest) if err != nil { log.Errorf("trying to compute start time: %+v", err) // set timer in one epoch, shouldn't happen but be defensive @@ -149,7 +148,6 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { startTime) timer.Reset(m.clock.Until(startTime)) case <-timer.C: - log.Errorf("timer fired") if priorityManifest == nil { log.Errorf("nil priorityManifest") // just a consistency check, timer might have fired before it was stopped From ebf0fbfa6583f0389a9c73fc9a0ca52d5e225cf5 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Thu, 13 Feb 2025 17:52:36 +0100 Subject: [PATCH 6/8] Remove error log Signed-off-by: Jakub Sztandera --- manifest/fusing_provider.go | 1 - 1 file changed, 1 deletion(-) diff --git a/manifest/fusing_provider.go b/manifest/fusing_provider.go index eb7639a4..7df9a4aa 100644 --- a/manifest/fusing_provider.go +++ b/manifest/fusing_provider.go @@ -149,7 +149,6 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { timer.Reset(m.clock.Until(startTime)) case <-timer.C: if priorityManifest == nil { - log.Errorf("nil priorityManifest") // just a consistency check, timer might have fired before it was stopped continue } From 86fb31d1c183d93fe987279134f0197a2da9d56f Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Fri, 14 Feb 2025 16:08:23 +0100 Subject: [PATCH 7/8] Fix init behaviour Signed-off-by: Jakub Sztandera --- manifest/fusing_provider.go | 40 ++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/manifest/fusing_provider.go b/manifest/fusing_provider.go index 7df9a4aa..53906c9f 100644 --- a/manifest/fusing_provider.go +++ b/manifest/fusing_provider.go @@ -78,18 +78,25 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { } var priorityManifest *Manifest - var timer *clock.Timer + // create a stopped timer + timer := m.clock.Timer(time.Hour) + timer.Stop() first := true - for m.runningCtx.Err() != nil { + for m.runningCtx.Err() == nil { if !first { m.clock.Sleep(5 * time.Second) first = false } - priorityManifest = <-m.priority.ManifestUpdates() + select { + case priorityManifest = <-m.priority.ManifestUpdates(): + case <-m.runningCtx.Done(): + // we were stopped, clean exit + return nil + } - head, err := m.ec.GetHead(ctx) + head, err := m.ec.GetHead(m.runningCtx) if err != nil { log.Errorf("failed to determine current head epoch: %w", err) continue @@ -101,22 +108,19 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { return nil } - if priorityManifest != nil { - startTime := startTimeOfPriority(head, priorityManifest) - log.Infof("starting the fusing manifest provider, will switch to the priority manifest at %s", - startTime) - if err != nil { - log.Errorf("trying to compute start time: %w", err) - continue - } - timer = m.clock.Timer(m.clock.Until(startTime)) - break - } else { - // create a stopped timer - timer = m.clock.Timer(time.Hour) - timer.Stop() + if priorityManifest == nil { + // init with stopped timer break } + startTime := startTimeOfPriority(head, priorityManifest) + log.Infof("starting the fusing manifest provider, will switch to the priority manifest at %s", + startTime) + if err != nil { + log.Errorf("trying to compute start time: %w", err) + continue + } + timer.Reset(m.clock.Until(startTime)) + break } if m.runningCtx.Err() != nil { From 8c9208d654528016cd5349753c458c0418f7e7fd Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Mon, 17 Feb 2025 16:40:40 +0100 Subject: [PATCH 8/8] Switch to primary secondary Signed-off-by: Jakub Sztandera --- manifest/fusing_provider.go | 72 ++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/manifest/fusing_provider.go b/manifest/fusing_provider.go index 53906c9f..c4fab8a7 100644 --- a/manifest/fusing_provider.go +++ b/manifest/fusing_provider.go @@ -17,13 +17,13 @@ type HeadGetter interface { var _ ManifestProvider = (*FusingManifestProvider)(nil) -// FusingManifestProvider is a ManifestProvider that starts by providing dynamic manifest updates -// then switches to a priority manifest when we get within finality of said manifest's bootstrap +// FusingManifestProvider is a ManifestProvider that starts by providing secondary manifest updates +// then switches to a primary manifest when we get within finality of said manifest's bootstrap // epoch. type FusingManifestProvider struct { - ec HeadGetter - dynamic ManifestProvider - priority ManifestProvider + ec HeadGetter + secondary ManifestProvider + primary ManifestProvider manifestCh chan *Manifest @@ -33,17 +33,17 @@ type FusingManifestProvider struct { clock clock.Clock } -// NewFusingManifestProvider creates a provider that will lock into the priority manifest onces it reaches BootstrapEpoch-Finality of priority manifest -// the priority ManifestProvider needs to provide at least one manifest (or nil), a sign of life, to enable forwarding of dynamic manifests -func NewFusingManifestProvider(ctx context.Context, ec HeadGetter, dynamic ManifestProvider, priority ManifestProvider) (*FusingManifestProvider, error) { +// NewFusingManifestProvider creates a provider that will lock into the primary manifest onces it reaches BootstrapEpoch-Finality of primary manifest +// the primary ManifestProvider needs to provide at least one manifest (or nil), a sign of life, to enable forwarding of secondary manifests. +func NewFusingManifestProvider(ctx context.Context, ec HeadGetter, secondary ManifestProvider, primary ManifestProvider) (*FusingManifestProvider, error) { clk := clock.GetClock(ctx) ctx, cancel := context.WithCancel(context.WithoutCancel(ctx)) errgrp, ctx := errgroup.WithContext(ctx) return &FusingManifestProvider{ ec: ec, - dynamic: dynamic, - priority: priority, + secondary: secondary, + primary: primary, errgrp: errgrp, cancel: cancel, runningCtx: ctx, @@ -57,17 +57,17 @@ func (m *FusingManifestProvider) ManifestUpdates() <-chan *Manifest { } func (m *FusingManifestProvider) Start(ctx context.Context) error { - if err := m.priority.Start(ctx); err != nil { + if err := m.primary.Start(ctx); err != nil { return err } - if err := m.dynamic.Start(ctx); err != nil { + if err := m.secondary.Start(ctx); err != nil { return err } m.errgrp.Go(func() error { - defer m.priority.Stop(context.Background()) - defer m.dynamic.Stop(context.Background()) + defer m.primary.Stop(context.Background()) + defer m.secondary.Stop(context.Background()) startTimeOfPriority := func(head ec.TipSet, mani *Manifest) time.Time { headEpoch := head.Epoch() @@ -77,7 +77,7 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { return start } - var priorityManifest *Manifest + var primaryManifest *Manifest // create a stopped timer timer := m.clock.Timer(time.Hour) timer.Stop() @@ -90,7 +90,7 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { } select { - case priorityManifest = <-m.priority.ManifestUpdates(): + case primaryManifest = <-m.primary.ManifestUpdates(): case <-m.runningCtx.Done(): // we were stopped, clean exit return nil @@ -102,18 +102,18 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { continue } headEpoch := head.Epoch() - // exit early if priorityManifest is relevant right now - if priorityManifest != nil && headEpoch >= priorityManifest.BootstrapEpoch-priorityManifest.EC.Finality { - m.manifestCh <- priorityManifest + // exit early if primaryManifest is relevant right now + if primaryManifest != nil && headEpoch >= primaryManifest.BootstrapEpoch-primaryManifest.EC.Finality { + m.manifestCh <- primaryManifest return nil } - if priorityManifest == nil { + if primaryManifest == nil { // init with stopped timer break } - startTime := startTimeOfPriority(head, priorityManifest) - log.Infof("starting the fusing manifest provider, will switch to the priority manifest at %s", + startTime := startTimeOfPriority(head, primaryManifest) + log.Infof("starting the fusing manifest provider, will switch to the primary manifest at %s", startTime) if err != nil { log.Errorf("trying to compute start time: %w", err) @@ -131,8 +131,8 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { for m.runningCtx.Err() == nil { select { - case priorityManifest = <-m.priority.ManifestUpdates(): - if priorityManifest == nil { + case primaryManifest = <-m.primary.ManifestUpdates(): + if primaryManifest == nil { timer.Stop() continue } @@ -140,19 +140,19 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { if err != nil { log.Errorf("getting head in fusing manifest: %+v", err) } - startTime := startTimeOfPriority(head, priorityManifest) + startTime := startTimeOfPriority(head, primaryManifest) if err != nil { log.Errorf("trying to compute start time: %+v", err) // set timer in one epoch, shouldn't happen but be defensive - timer.Reset(priorityManifest.EC.Period) + timer.Reset(primaryManifest.EC.Period) continue } - log.Infof("got new priorityManifest, will switch to the priority manifest at %s", + log.Infof("got new primaryManifest, will switch to the primary manifest at %s", startTime) timer.Reset(m.clock.Until(startTime)) case <-timer.C: - if priorityManifest == nil { + if primaryManifest == nil { // just a consistency check, timer might have fired before it was stopped continue } @@ -162,7 +162,7 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { // behind, or we're in a lotus integration test // (https://github.com/filecoin-project/lotus/issues/12557). head, err := m.ec.GetHead(m.runningCtx) - switchEpoch := priorityManifest.BootstrapEpoch - priorityManifest.EC.Finality + switchEpoch := primaryManifest.BootstrapEpoch - primaryManifest.EC.Finality switch { case err != nil: log.Errorw("failed to get head in fusing manifest provider", "error", err) @@ -171,21 +171,21 @@ func (m *FusingManifestProvider) Start(ctx context.Context) error { log.Infow("delaying fusing manifest switch-over because head is behind the target epoch", "head", head.Epoch(), "target epoch", switchEpoch, - "bootstrap epoch", priorityManifest.BootstrapEpoch, + "bootstrap epoch", primaryManifest.BootstrapEpoch, ) - timer.Reset(priorityManifest.EC.Period) + timer.Reset(primaryManifest.EC.Period) continue } log.Infow( - "fusing to the priority manifest, stopping the dynamic manifest provider", - "network", priorityManifest.NetworkName, - "bootstrap epoch", priorityManifest.BootstrapEpoch, + "fusing to the primary manifest, stopping the secondary manifest provider", + "network", primaryManifest.NetworkName, + "bootstrap epoch", primaryManifest.BootstrapEpoch, "current epoch", head.Epoch(), ) - m.updateManifest(priorityManifest) + m.updateManifest(primaryManifest) return nil - case update := <-m.dynamic.ManifestUpdates(): + case update := <-m.secondary.ManifestUpdates(): m.updateManifest(update) case <-m.runningCtx.Done(): }