Skip to content

Commit c3e1896

Browse files
committed
Add finalize API to EC backend
Expand the F3 EC backend API to mark a given tipset as final. This translates to checkpointing the tipset in Lotus beyond which no forks are allowed. Part of #603
1 parent f49c874 commit c3e1896

File tree

3 files changed

+50
-7
lines changed

3 files changed

+50
-7
lines changed

ec/ec.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,21 @@ type Backend interface {
1414
GetTipsetByEpoch(ctx context.Context, epoch int64) (TipSet, error)
1515
// GetTipset returns the tipset with the given key.
1616
GetTipset(context.Context, gpbft.TipSetKey) (TipSet, error)
17-
// GetHead returns the current head tipset of the chain
17+
// GetHead returns the current head tipset of the chain, which must be a
18+
// descendant of the latest finalized tipset.
19+
//
20+
// See Finalize.
1821
GetHead(context.Context) (TipSet, error)
1922
// GetParent returns the parent of the current tipset.
2023
GetParent(context.Context, TipSet) (TipSet, error)
2124
// GetPowerTable returns the power table at the tipset given as an argument.
2225
GetPowerTable(context.Context, gpbft.TipSetKey) (gpbft.PowerEntries, error)
26+
// Finalize marks the tipset that corresponds to the given key as finalised
27+
// beyond which no forks are allowed to occur. The finalised tipset overrides the
28+
// head tipset if it is not an ancestor of the current head.
29+
//
30+
// See GetHead.
31+
Finalize(context.Context, gpbft.TipSetKey) error
2332
}
2433

2534
type TipSet interface {

host.go

+38-6
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@ import (
1515
"github.com/filecoin-project/go-f3/internal/clock"
1616
"github.com/filecoin-project/go-f3/internal/psutil"
1717
"github.com/filecoin-project/go-f3/manifest"
18-
"go.opentelemetry.io/otel/metric"
19-
2018
pubsub "github.com/libp2p/go-libp2p-pubsub"
21-
peer "github.com/libp2p/go-libp2p/core/peer"
19+
"github.com/libp2p/go-libp2p/core/peer"
20+
"go.opentelemetry.io/otel/metric"
2221
"go.uber.org/multierr"
2322
"golang.org/x/sync/errgroup"
2423
)
@@ -109,7 +108,6 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
109108
}
110109

111110
finalityCertificates, unsubCerts := h.certStore.Subscribe()
112-
113111
h.errgrp.Go(func() (_err error) {
114112
defer func() {
115113
unsubCerts()
@@ -164,6 +162,40 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
164162
}
165163
return nil
166164
})
165+
166+
// Asynchronously checkpoint the decided tipset keys by explicitly making a
167+
// separate subscription to the cert store. This may cause a sync in a case where
168+
// the finalized tipset is not already stored by the chain store, which is a
169+
// blocking operation. Hence, the asynchronous checkpointing.
170+
//
171+
// Note that not every finalized tipset is guaranteed to be checkpointed.
172+
// Because:
173+
// 1. the subscription only returns the latest certificate, i.e. may
174+
// miss intermediate certificates, and
175+
// 2. errors that may occur during checkpointing are silently logged
176+
// to allow checkpointing of future finality certificates.
177+
//
178+
// Triggering the checkpointing here means that certstore remains the sole source
179+
// of truth in terms of tipset has been finalised.
180+
finalize, unsubFinalize := h.certStore.Subscribe()
181+
h.errgrp.Go(func() error {
182+
defer unsubFinalize()
183+
for cert := range finalize {
184+
key := cert.ECChain.Head().Key
185+
select {
186+
case <-h.runningCtx.Done():
187+
return nil
188+
default:
189+
if err := h.ec.Finalize(h.runningCtx, key); err != nil {
190+
// There is not much we can do here other than logging. The next instance start
191+
// will effectively retry checkpointing the latest finalized tipset. This error
192+
// will not impact the selection of next instance chain.
193+
log.Error(fmt.Errorf("error while finalizing decision at EC: %w", err))
194+
}
195+
}
196+
}
197+
return nil
198+
})
167199
return nil
168200
}
169201

@@ -408,7 +440,7 @@ func (h *gpbftHost) collectChain(base ec.TipSet, head ec.TipSet) ([]ec.TipSet, e
408440
return res[1:], nil
409441
}
410442

411-
func (h *gpbftRunner) Stop(_ctx context.Context) error {
443+
func (h *gpbftRunner) Stop(context.Context) error {
412444
h.ctxCancel()
413445
return multierr.Combine(
414446
h.errgrp.Wait(),
@@ -604,7 +636,7 @@ func (h *gpbftHost) SetAlarm(at time.Time) {
604636
// we cannot reuse the timer because we don't know if it was read or not
605637
h.alertTimer.Stop()
606638
if at.IsZero() {
607-
// It "at" is zero, we cancel the timer entirely. Unfortunately, we still have to
639+
// If "at" is zero, we cancel the timer entirely. Unfortunately, we still have to
608640
// replace it for the reason stated above.
609641
h.alertTimer = h.clock.Timer(0)
610642
if !h.alertTimer.Stop() {

internal/consensus/fake_ec.go

+2
Original file line numberDiff line numberDiff line change
@@ -183,3 +183,5 @@ func (ec *FakeEC) GetTipset(_ context.Context, tsk gpbft.TipSetKey) (ec.TipSet,
183183
epoch := binary.BigEndian.Uint64(tsk[6+32-8 : 6+32])
184184
return ec.genTipset(int64(epoch)), nil
185185
}
186+
187+
func (ec *FakeEC) Finalize(context.Context, gpbft.TipSetKey) error { return nil }

0 commit comments

Comments
 (0)