@@ -15,10 +15,9 @@ import (
15
15
"github.com/filecoin-project/go-f3/internal/clock"
16
16
"github.com/filecoin-project/go-f3/internal/psutil"
17
17
"github.com/filecoin-project/go-f3/manifest"
18
- "go.opentelemetry.io/otel/metric"
19
-
20
18
pubsub "github.com/libp2p/go-libp2p-pubsub"
21
- peer "github.com/libp2p/go-libp2p/core/peer"
19
+ "github.com/libp2p/go-libp2p/core/peer"
20
+ "go.opentelemetry.io/otel/metric"
22
21
"go.uber.org/multierr"
23
22
"golang.org/x/sync/errgroup"
24
23
)
@@ -109,7 +108,6 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
109
108
}
110
109
111
110
finalityCertificates , unsubCerts := h .certStore .Subscribe ()
112
-
113
111
h .errgrp .Go (func () (_err error ) {
114
112
defer func () {
115
113
unsubCerts ()
@@ -164,6 +162,45 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
164
162
}
165
163
return nil
166
164
})
165
+
166
+ // Asynchronously checkpoint the decided tipset keys by explicitly making a
167
+ // separate subscription to the cert store. This may cause a sync in a case where
168
+ // the finalized tipset is not already stored by the chain store, which is a
169
+ // blocking operation. Hence, the asynchronous checkpointing.
170
+ //
171
+ // Note, there is no guarantee that every finalized tipset will be checkpointed.
172
+ // Because:
173
+ // 1. the subscription only returns the latest certificate, i.e. may
174
+ // miss intermediate certificates, and
175
+ // 2. errors that may occur during checkpointing are logged but not propagated
176
+ // to allow checkpointing of future finality certificates.
177
+ //
178
+ // Triggering the checkpointing here means that certstore remains the sole source
179
+ // of truth in terms of tipsets that have been finalized.
180
+ finalize , unsubFinalize := h .certStore .Subscribe ()
181
+ h .errgrp .Go (func () error {
182
+ defer unsubFinalize ()
183
+ for h .runningCtx .Err () == nil {
184
+ select {
185
+ case <- h .runningCtx .Done ():
186
+ return nil
187
+ case cert , ok := <- finalize :
188
+ if ! ok {
189
+ // This should never happen according to certstore subscribe semantics. If it
190
+ // does, error loudly since the chances are the cause is a programmer error.
191
+ return errors .New ("cert store subscription to finalize tipsets was closed unexpectedly" )
192
+ }
193
+ key := cert .ECChain .Head ().Key
194
+ if err := h .ec .Finalize (h .runningCtx , key ); err != nil {
195
+ // There is not much we can do here other than logging. The next instance start
196
+ // will effectively retry checkpointing the latest finalized tipset. This error
197
+ // will not impact the selection of next instance chain.
198
+ log .Error (fmt .Errorf ("error while finalizing decision at EC: %w" , err ))
199
+ }
200
+ }
201
+ }
202
+ return nil
203
+ })
167
204
return nil
168
205
}
169
206
@@ -408,7 +445,7 @@ func (h *gpbftHost) collectChain(base ec.TipSet, head ec.TipSet) ([]ec.TipSet, e
408
445
return res [1 :], nil
409
446
}
410
447
411
- func (h * gpbftRunner ) Stop (_ctx context.Context ) error {
448
+ func (h * gpbftRunner ) Stop (context.Context ) error {
412
449
h .ctxCancel ()
413
450
return multierr .Combine (
414
451
h .errgrp .Wait (),
@@ -604,7 +641,7 @@ func (h *gpbftHost) SetAlarm(at time.Time) {
604
641
// we cannot reuse the timer because we don't know if it was read or not
605
642
h .alertTimer .Stop ()
606
643
if at .IsZero () {
607
- // It "at" is zero, we cancel the timer entirely. Unfortunately, we still have to
644
+ // If "at" is zero, we cancel the timer entirely. Unfortunately, we still have to
608
645
// replace it for the reason stated above.
609
646
h .alertTimer = h .clock .Timer (0 )
610
647
if ! h .alertTimer .Stop () {
0 commit comments