-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathdefaults.go
204 lines (172 loc) · 6.12 KB
/
defaults.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package handlers
import (
"context"
"errors"
"fmt"
abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
"cosmossdk.io/server/v2/cometbft/mempool"
consensustypes "cosmossdk.io/x/consensus/types"
)
// AppManager is the slice of application functionality the proposal handlers
// in this file depend on: validating a single transaction and running a query.
type AppManager[T transaction.Tx] interface {
// ValidateTx checks tx and returns its result, or an error when the tx is
// rejected. The handlers in this file only inspect the error.
ValidateTx(ctx context.Context, tx T) (server.TxResult, error)
// Query runs request against the state at version and returns the response.
// NOTE(review): the handlers in this file always pass version 0 — presumably
// "latest"; confirm against the implementation.
Query(ctx context.Context, version uint64, request transaction.Msg) (response transaction.Msg, err error)
}
// DefaultProposalHandler holds the state shared by the default PrepareProposal
// and ProcessProposal handlers built by its PrepareHandler and ProcessHandler
// methods.
type DefaultProposalHandler[T transaction.Tx] struct {
// mempool supplies the candidate txs iterated by PrepareHandler. When it is
// nil or a mempool.NoOpMempool, PrepareHandler uses the request's txs as-is
// and ProcessHandler accepts without validating.
mempool mempool.Mempool[T]
// txSelector accumulates the txs chosen for a proposal; PrepareHandler
// clears it when each call returns.
txSelector TxSelector[T]
}
// NewDefaultProposalHandler builds a DefaultProposalHandler that draws
// candidate transactions from mp and picks them with the selector returned by
// NewDefaultTxSelector.
func NewDefaultProposalHandler[T transaction.Tx](mp mempool.Mempool[T]) *DefaultProposalHandler[T] {
	handler := new(DefaultProposalHandler[T])
	handler.mempool = mp
	handler.txSelector = NewDefaultTxSelector[T]()
	return handler
}
// PrepareHandler returns the default PrepareProposal handler.
//
// The returned handler:
//  1. queries the consensus params through app to learn the block gas limit
//     (left at zero when the params carry no block section);
//  2. decodes req.Txs with codec, skipping any tx that fails to decode;
//  3. offers candidate txs to the tx selector until it signals stop, then
//     returns whatever the selector has picked.
//
// With a nil or NoOp mempool the candidates are the decoded request txs in the
// order received. Otherwise they come from h.mempool.Select, and each one is
// re-validated first; txs that fail validation are removed from the mempool.
func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
	return func(ctx context.Context, app AppManager[T], codec transaction.Codec[T], req *abci.PrepareProposalRequest) ([]T, error) {
		queryRes, err := app.Query(ctx, 0, &consensustypes.QueryParamsRequest{})
		if err != nil {
			return nil, err
		}

		params, isParams := queryRes.(*consensustypes.QueryParamsResponse)
		if !isParams {
			return nil, fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensustypes.QueryParamsResponse{}, queryRes)
		}

		// Stays zero when the params have no block section.
		var gasCap uint64
		if blockParams := params.GetParams().Block; blockParams != nil {
			gasCap = uint64(blockParams.MaxGas)
		}

		decoded := decodeTxs(codec, req.Txs)
		byteCap := uint64(req.MaxTxBytes)

		// The selector is shared across calls, so reset it once this one is done.
		defer h.txSelector.Clear()

		// Without a real mempool (nil or NoOp) the proposal is built straight
		// from the txs CometBFT sent, which by default should be in FIFO order.
		// They still go through the selector so the result respects
		// req.MaxTxBytes.
		if _, noop := h.mempool.(mempool.NoOpMempool[T]); h.mempool == nil || noop {
			for i := range decoded {
				if h.txSelector.SelectTxForProposal(ctx, byteCap, gasCap, decoded[i]) {
					break
				}
			}
			return h.txSelector.SelectedTxs(ctx), nil
		}

		for it := h.mempool.Select(ctx, decoded); it != nil; it = it.Next() {
			candidate := it.Tx()

			// CheckTx already verified everything before mempool.Insert, so in
			// theory the pool only holds valid txs. Some mempool
			// implementations may still insert invalid ones, hence the
			// re-check here.
			if _, invalid := app.ValidateTx(ctx, candidate); invalid != nil {
				// Evict the bad tx; a tx that is already gone is not an error.
				if rmErr := h.mempool.Remove([]T{candidate}); rmErr != nil && !errors.Is(rmErr, mempool.ErrTxNotFound) {
					return nil, rmErr
				}
				continue
			}

			if h.txSelector.SelectTxForProposal(ctx, byteCap, gasCap, candidate) {
				break
			}
		}

		return h.txSelector.SelectedTxs(ctx), nil
	}
}
// ProcessHandler returns the default ProcessProposal handler.
//
// The returned handler accepts the proposal (returns nil) without any checks
// when the handler's mempool is nil or a mempool.NoOpMempool. Otherwise it:
//  1. queries the consensus params through app to learn the block gas limit;
//  2. decodes every tx in req.Txs with codec, failing on the first tx that
//     does not decode;
//  3. runs app.ValidateTx on each decoded tx, failing on the first invalid one;
//  4. when the block gas limit is non-zero, sums the txs' gas limits and fails
//     as soon as the running total exceeds the limit or overflows uint64.
//
// A non-nil error means the proposal must be rejected.
func (h *DefaultProposalHandler[T]) ProcessHandler() ProcessHandler[T] {
	return func(ctx context.Context, app AppManager[T], codec transaction.Codec[T], req *abci.ProcessProposalRequest) error {
		// If the mempool is nil or NoOp we simply return ACCEPT,
		// because PrepareProposal may have included txs that could fail verification.
		_, isNoOp := h.mempool.(mempool.NoOpMempool[T])
		if h.mempool == nil || isNoOp {
			return nil
		}

		res, err := app.Query(ctx, 0, &consensustypes.QueryParamsRequest{})
		if err != nil {
			return err
		}

		paramsResp, ok := res.(*consensustypes.QueryParamsResponse)
		if !ok {
			return fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensustypes.QueryParamsResponse{}, res)
		}

		// Zero means "no block gas limit enforced" below.
		// NOTE(review): a negative MaxGas converts to a very large uint64 here,
		// which effectively disables the limit — confirm that is intended.
		var maxBlockGas uint64
		if b := paramsResp.GetParams().Block; b != nil {
			maxBlockGas = uint64(b.MaxGas)
		}

		// Decode all request txs up front. Unlike PrepareProposal, a tx that
		// fails to decode rejects the whole proposal.
		txs := make([]T, 0, len(req.Txs))
		for _, txBz := range req.Txs {
			decTx, err := codec.Decode(txBz)
			if err != nil {
				return fmt.Errorf("failed to decode tx: %w", err)
			}
			txs = append(txs, decTx)
		}

		var totalTxGas uint64
		for _, tx := range txs {
			if _, err := app.ValidateTx(ctx, tx); err != nil {
				return fmt.Errorf("failed to validate tx: %w", err)
			}

			if maxBlockGas == 0 {
				continue
			}

			gasLimit, err := tx.GetGasLimit()
			if err != nil {
				// Keep the cause in the chain so callers can inspect it.
				return fmt.Errorf("failed to get gas limit: %w", err)
			}

			// Unsigned addition wraps silently; without this check a tx with a
			// huge gas limit could wrap the total back under maxBlockGas.
			newTotal := totalTxGas + gasLimit
			if newTotal < totalTxGas {
				return fmt.Errorf("total tx gas overflows uint64: %d + %d", totalTxGas, gasLimit)
			}
			totalTxGas = newTotal

			if totalTxGas > maxBlockGas {
				return fmt.Errorf("total tx gas %d exceeds max block gas %d", totalTxGas, maxBlockGas)
			}
		}

		return nil
	}
}
// decodeTxs decodes each raw tx in txsBz with codec and returns the decoded
// txs in their original order. A tx that fails to decode is silently left out
// of the result instead of aborting the whole batch. The result is nil when
// nothing decodes.
//
// Used by the PrepareProposal handlers.
func decodeTxs[T transaction.Tx](codec transaction.Codec[T], txsBz [][]byte) []T {
	var decoded []T
	for i := range txsBz {
		if tx, err := codec.Decode(txsBz[i]); err == nil {
			decoded = append(decoded, tx)
		}
	}
	return decoded
}
// NoOpPrepareProposal returns a PrepareProposal handler that performs no
// selection or validation: it hands back the request's transactions as decoded
// by decodeTxs (so any tx that fails to decode is dropped) and never returns an
// error.
func NoOpPrepareProposal[T transaction.Tx]() PrepareHandler[T] {
	return func(_ context.Context, _ AppManager[T], codec transaction.Codec[T], req *abci.PrepareProposalRequest) ([]T, error) {
		txs := decodeTxs(codec, req.Txs)
		return txs, nil
	}
}
// NoOpProcessProposal returns a ProcessProposal handler that ignores all of
// its arguments and always accepts the proposal by returning a nil error.
func NoOpProcessProposal[T transaction.Tx]() ProcessHandler[T] {
	var acceptAll ProcessHandler[T] = func(_ context.Context, _ AppManager[T], _ transaction.Codec[T], _ *abci.ProcessProposalRequest) error {
		return nil
	}
	return acceptAll
}
// NoOpExtendVote returns an ExtendVote handler that ignores its arguments and
// always responds with an empty (non-nil, zero-length) vote extension and no
// error.
func NoOpExtendVote() ExtendVoteHandler {
	return func(_ context.Context, _ store.ReaderMap, _ *abci.ExtendVoteRequest) (*abci.ExtendVoteResponse, error) {
		resp := new(abci.ExtendVoteResponse)
		resp.VoteExtension = []byte{}
		return resp, nil
	}
}
// NoOpVerifyVoteExtensionHandler returns a VerifyVoteExtension handler that
// ignores its arguments and always responds with the ACCEPT status and no
// error.
func NoOpVerifyVoteExtensionHandler() VerifyVoteExtensionhandler {
	return func(_ context.Context, _ store.ReaderMap, _ *abci.VerifyVoteExtensionRequest) (*abci.VerifyVoteExtensionResponse, error) {
		accepted := &abci.VerifyVoteExtensionResponse{}
		accepted.Status = abci.VERIFY_VOTE_EXTENSION_STATUS_ACCEPT
		return accepted, nil
	}
}