-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathf3.go
177 lines (151 loc) · 4.93 KB
/
f3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package lf3
import (
"context"
"errors"
"time"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-f3"
"github.com/filecoin-project/go-f3/blssig"
"github.com/filecoin-project/go-f3/certs"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/manifest"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
)
// F3 bundles the go-f3 module with the helpers this package uses to sign and
// broadcast F3 messages for miners that hold a participation lease.
type F3 struct {
// inner is the underlying go-f3 instance created by New.
inner *f3.F3
// ec is the chain wrapper handed to go-f3; GetPowerTable also queries it directly.
ec *ecWrapper
// signer produces the signatures used by the signing loop; New builds it from the wallet.
signer gpbft.Signer
// newLeases carries lease requests from Participate to the signing loop.
newLeases chan leaseRequest
}
// F3Params is the fx parameter struct (it embeds fx.In) listing everything
// New needs to construct an F3 instance.
type F3Params struct {
fx.In
NetworkName dtypes.NetworkName
// ManifestProvider, Host and PubSub are passed straight through to f3.New.
ManifestProvider manifest.ManifestProvider
PubSub *pubsub.PubSub
Host host.Host
// ChainStore, Syncer and StateManager are combined into the ecWrapper given to go-f3.
ChainStore *store.ChainStore
Syncer *chain.Syncer
StateManager *stmgr.StateManager
// Datastore is wrapped under the "/f3" namespace before being given to go-f3.
Datastore dtypes.MetadataDS
// Wallet backs the signer used by the signing loop.
Wallet api.Wallet
}
// log is the package-level logger, registered under the "f3" subsystem name.
var log = logging.Logger("f3")
// New builds the F3 module and registers its start/stop hooks with the fx
// lifecycle.
//
// The metadata datastore is namespaced under "/f3", chain access goes through
// an ecWrapper assembled from the chain store, state manager and syncer found
// in params, and signing is backed by params.Wallet. A failure of f3.New is
// returned wrapped.
//
// On lifecycle start the inner module is started with a context derived from
// mctx; if that fails the error is only logged and the signing loop is not
// launched. On lifecycle stop the derived context is cancelled.
func New(mctx helpers.MetricsCtx, lc fx.Lifecycle, params F3Params) (*F3, error) {
	chainAccess := &ecWrapper{
		ChainStore:   params.ChainStore,
		StateManager: params.StateManager,
		Syncer:       params.Syncer,
	}
	f3Store := namespace.Wrap(params.Datastore, datastore.NewKey("/f3"))

	inner, err := f3.New(
		mctx, params.ManifestProvider, f3Store,
		params.Host, params.PubSub, blssig.VerifierWithKeyOnG1(), chainAccess,
	)
	if err != nil {
		return nil, xerrors.Errorf("creating F3: %w", err)
	}

	fff := &F3{
		inner:  inner,
		ec:     chainAccess,
		signer: &signer{params.Wallet},
		// a small buffer so that lease requests rarely block the caller
		newLeases: make(chan leaseRequest, 4),
	}

	// runCtx bounds both the inner module and the signing loop; the stop hook
	// cancels it.
	runCtx, stop := context.WithCancel(mctx)
	onStart := func() {
		if startErr := fff.inner.Start(runCtx); startErr != nil {
			log.Errorf("running f3: %+v", startErr)
			return
		}
		go fff.runSigningLoop(runCtx)
	}
	lc.Append(fx.StartStopHook(onStart, stop))

	return fff, nil
}
// leaseRequest is the message sent over F3.newLeases to ask the signing loop
// to create or update a participation lease for a miner.
type leaseRequest struct {
// minerID identifies the miner the lease is for.
minerID uint64
// newExpiration and oldExpiration are handed to leaseManager.UpsertDefensive
// by the signing loop.
newExpiration time.Time
oldExpiration time.Time
// resultCh receives the outcome of the upsert exactly once and is then
// closed by the signing loop.
resultCh chan<- bool
}
// runSigningLoop is the participation loop; it runs until ctx is cancelled.
//
// Each iteration waits for one of:
//   - ctx cancellation, which ends the loop;
//   - a lease request on fff.newLeases, which is applied to the loop-local
//     leaseManager and answered on the request's resultCh;
//   - a message builder from fff.inner.MessagesToSign(), which is signed and
//     broadcast once for every miner the leaseManager reports as active.
//
// The leaseManager is a local of this function, so it is only accessed from
// the goroutine running the loop.
func (fff *F3) runSigningLoop(ctx context.Context) {
	// participateOnce signs mb on behalf of minerID and broadcasts the result.
	// Having no power in F3 is not an error: it is logged at debug level and
	// nil is returned.
	participateOnce := func(ctx context.Context, mb *gpbft.MessageBuilder, minerID uint64) error {
		signatureBuilder, err := mb.PrepareSigningInputs(gpbft.ActorID(minerID))
		if errors.Is(err, gpbft.ErrNoPower) {
			// we don't have any power in F3, continue
			log.Debugf("no power to participate in F3: %+v", err)
			return nil
		}
		if err != nil {
			return xerrors.Errorf("preparing signing inputs: %w", err)
		}
		// If the worker keys are not stored in the node, the signatureBuilder
		// can be sent to wherever they are stored, signed there, and
		// {signatureBuilder, payloadSig, vrfSig} sent back to lotus for
		// broadcast.
		payloadSig, vrfSig, err := signatureBuilder.Sign(ctx, fff.signer)
		if err != nil {
			return xerrors.Errorf("signing message: %w", err)
		}
		log.Debugf("miner with id %d is sending message in F3", minerID)
		fff.inner.Broadcast(ctx, signatureBuilder, payloadSig, vrfSig)
		return nil
	}

	leaseMngr := new(leaseManager)
	msgCh := fff.inner.MessagesToSign()

	for ctx.Err() == nil {
		select {
		case <-ctx.Done():
			return
		case l := <-fff.newLeases:
			// resultCh has only one user and is buffered
			l.resultCh <- leaseMngr.UpsertDefensive(l.minerID, l.newExpiration, l.oldExpiration)
			close(l.resultCh)
		case mb, ok := <-msgCh:
			if !ok {
				// A closed channel is always ready to receive, so leaving it
				// in the select would spin this loop at full speed. A nil
				// channel is never ready, which disables this case while
				// lease requests and cancellation keep being served.
				log.Warn("F3 messages-to-sign channel closed; no further messages will be signed")
				msgCh = nil
				continue
			}
			for _, minerID := range leaseMngr.Active() {
				if err := participateOnce(ctx, mb, minerID); err != nil {
					log.Errorf("while participating for miner f0%d: %+v", minerID, err)
				}
			}
		}
	}
}
// Participate notifies the participation loop about a new lease for minerID.
//
// newLeaseExpiration is the expiration being requested and oldLeaseExpiration
// is the expiration the caller believes is currently in force; both are
// forwarded to the loop, which decides whether to accept the lease.
//
// It returns true if the lease was accepted. It returns false if the lease
// was rejected or if ctx is done before the loop has answered.
func (fff *F3) Participate(ctx context.Context, minerID uint64, newLeaseExpiration, oldLeaseExpiration time.Time) bool {
	// Buffered so the loop can always deliver its answer without blocking,
	// even if this call has already given up because ctx is done.
	resultCh := make(chan bool, 1)
	request := leaseRequest{
		minerID:       minerID,
		newExpiration: newLeaseExpiration,
		oldExpiration: oldLeaseExpiration,
		resultCh:      resultCh,
	}

	select {
	case fff.newLeases <- request:
	case <-ctx.Done():
		return false
	}

	// newLeases is buffered, so the send above can succeed even when nothing
	// is consuming requests; keep honouring ctx while waiting for the answer
	// so the caller cannot be blocked forever.
	select {
	case accepted := <-resultCh:
		return accepted
	case <-ctx.Done():
		return false
	}
}
// GetCert asks the inner F3 module for the finality certificate of the given
// instance and returns its answer unchanged.
func (fff *F3) GetCert(ctx context.Context, instance uint64) (*certs.FinalityCertificate, error) {
	cert, err := fff.inner.GetCert(ctx, instance)
	return cert, err
}
// GetLatestCert asks the inner F3 module for its latest finality certificate
// and returns its answer unchanged.
func (fff *F3) GetLatestCert(ctx context.Context) (*certs.FinalityCertificate, error) {
	latest, err := fff.inner.GetLatestCert(ctx)
	return latest, err
}
// GetPowerTable returns the power entries for tipset key tsk as reported by
// the chain wrapper (ec), without going through the inner F3 module.
func (fff *F3) GetPowerTable(ctx context.Context, tsk types.TipSetKey) (gpbft.PowerEntries, error) {
	entries, err := fff.ec.getPowerTableLotusTSK(ctx, tsk)
	return entries, err
}
// GetF3PowerTable returns the power entries for tipset key tsk as reported by
// the inner F3 module, which is addressed with the key's byte encoding.
func (fff *F3) GetF3PowerTable(ctx context.Context, tsk types.TipSetKey) (gpbft.PowerEntries, error) {
	rawKey := tsk.Bytes()
	return fff.inner.GetPowerTable(ctx, rawKey)
}