@@ -15,10 +15,9 @@ import (
15
15
"github.com/filecoin-project/go-f3/internal/clock"
16
16
"github.com/filecoin-project/go-f3/internal/psutil"
17
17
"github.com/filecoin-project/go-f3/manifest"
18
- "go.opentelemetry.io/otel/metric"
19
-
20
18
pubsub "github.com/libp2p/go-libp2p-pubsub"
21
- peer "github.com/libp2p/go-libp2p/core/peer"
19
+ "github.com/libp2p/go-libp2p/core/peer"
20
+ "go.opentelemetry.io/otel/metric"
22
21
"go.uber.org/multierr"
23
22
"golang.org/x/sync/errgroup"
24
23
)
@@ -109,7 +108,6 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
109
108
}
110
109
111
110
finalityCertificates , unsubCerts := h .certStore .Subscribe ()
112
-
113
111
h .errgrp .Go (func () (_err error ) {
114
112
defer func () {
115
113
unsubCerts ()
@@ -164,6 +162,40 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
164
162
}
165
163
return nil
166
164
})
165
+
166
+ // Asynchronously checkpoint the decided tipset keys by explicitly making a
167
+ // separate subscription to the cert store. This may cause a sync in a case where
168
+ // the finalized tipset is not already stored by the chain store, which is a
169
+ // blocking operation. Hence, the asynchronous checkpointing.
170
+ //
171
+ // Note, there is no guarantee that every finalized tipset will be checkpointed.
172
+ // Because:
173
+ // 1. the subscription only returns the latest certificate, i.e. may
174
+ // miss intermediate certificates, and
175
+ // 2. errors that may occur during checkpointing are silently logged
176
+ // to allow checkpointing of future finality certificates.
177
+ //
178
+ // Triggering the checkpointing here means that certstore remains the sole source
179
+ // of truth in terms of tipsets that have been finalised.
180
+ finalize , unsubFinalize := h .certStore .Subscribe ()
181
+ h .errgrp .Go (func () error {
182
+ defer unsubFinalize ()
183
+ for cert := range finalize {
184
+ key := cert .ECChain .Head ().Key
185
+ select {
186
+ case <- h .runningCtx .Done ():
187
+ return nil
188
+ default :
189
+ if err := h .ec .Finalize (h .runningCtx , key ); err != nil {
190
+ // There is not much we can do here other than logging. The next instance start
191
+ // will effectively retry checkpointing the latest finalized tipset. This error
192
+ // will not impact the selection of next instance chain.
193
+ log .Error (fmt .Errorf ("error while finalizing decision at EC: %w" , err ))
194
+ }
195
+ }
196
+ }
197
+ return nil
198
+ })
167
199
return nil
168
200
}
169
201
@@ -408,7 +440,7 @@ func (h *gpbftHost) collectChain(base ec.TipSet, head ec.TipSet) ([]ec.TipSet, e
408
440
return res [1 :], nil
409
441
}
410
442
411
- func (h * gpbftRunner ) Stop (_ctx context.Context ) error {
443
+ func (h * gpbftRunner ) Stop (context.Context ) error {
412
444
h .ctxCancel ()
413
445
return multierr .Combine (
414
446
h .errgrp .Wait (),
@@ -604,7 +636,7 @@ func (h *gpbftHost) SetAlarm(at time.Time) {
604
636
// we cannot reuse the timer because we don't know if it was read or not
605
637
h .alertTimer .Stop ()
606
638
if at .IsZero () {
607
- // It "at" is zero, we cancel the timer entirely. Unfortunately, we still have to
639
+ // If "at" is zero, we cancel the timer entirely. Unfortunately, we still have to
608
640
// replace it for the reason stated above.
609
641
h .alertTimer = h .clock .Timer (0 )
610
642
if ! h .alertTimer .Stop () {
0 commit comments