feat(server/v2/cometbft): optimistic execution (backport #22560) #22680

Merged
merged 1 commit into from Nov 28, 2024
149 changes: 112 additions & 37 deletions server/v2/cometbft/abci.go
@@ -11,7 +11,7 @@ import (
abci "github.com/cometbft/cometbft/abci/types"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
gogoproto "github.com/cosmos/gogoproto/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"

"cosmossdk.io/collections"
@@ -28,6 +28,7 @@ import (
"cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/server/v2/cometbft/handlers"
"cosmossdk.io/server/v2/cometbft/mempool"
"cosmossdk.io/server/v2/cometbft/oe"
"cosmossdk.io/server/v2/cometbft/types"
cometerrors "cosmossdk.io/server/v2/cometbft/types/errors"
"cosmossdk.io/server/v2/streaming"
@@ -77,6 +78,11 @@ type consensus[T transaction.Tx] struct {
extendVote handlers.ExtendVoteHandler
checkTxHandler handlers.CheckTxHandler[T]

// optimisticExec contains the context required for Optimistic Execution,
// including the goroutine handling. This is experimental and must be enabled
// by developers.
optimisticExec *oe.OptimisticExecution[T]

addrPeerFilter types.PeerFilter // filter peers by address and port
idPeerFilter types.PeerFilter // filter peers by node ID
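
The `optimisticExec` field above stays nil unless the application opts in. The tests in this PR construct one directly via `oe.NewOptimisticExecution`, passing a function with the same shape as `internalFinalizeBlock` (added below). A minimal sketch of that wiring, assuming it sits in the same `cometbft` package; `enableOptimisticExecution` is a hypothetical helper, not part of this PR:

```go
// Sketch only: how an app might turn on Optimistic Execution. The constructor
// and finalize-function signature come from this PR's tests; the helper name
// is invented for illustration.
func enableOptimisticExecution[T transaction.Tx](c *consensus[T], logger log.Logger) {
	// internalFinalizeBlock already has the signature oe expects:
	//   func(context.Context, *abciproto.FinalizeBlockRequest)
	//       (*server.BlockResponse, store.WriterMap, []T, error)
	c.optimisticExec = oe.NewOptimisticExecution[T](logger, c.internalFinalizeBlock)
}
```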

@@ -385,6 +391,14 @@ func (c *consensus[T]) PrepareProposal(
return nil, errors.New("no prepare proposal function was set")
}

// Abort any running OE so it cannot overlap with `PrepareProposal`. This could happen if the optimistic
// `internalFinalizeBlock` from the previous round takes a long time, but consensus has moved on to the next round.
// Overlap is undesirable, since `internalFinalizeBlock` and `PrepareProposal` could share access to
// in-memory structs depending on application implementation.
// No-op if OE is not enabled.
// Similar call to Abort() is done in `ProcessProposal`.
c.optimisticExec.Abort()

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
@@ -421,6 +435,16 @@ func (c *consensus[T]) ProcessProposal(
return nil, errors.New("no process proposal function was set")
}

// Since the application can get access to FinalizeBlock state and write to it,
// we must be sure to reset it in case ProcessProposal times out and is called
// again in a subsequent round. However, we only want to do this after we've
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
if req.Height > int64(c.initialHeight) {
// abort any running OE
c.optimisticExec.Abort()
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
@@ -436,6 +460,17 @@
}, nil
}

// Only execute optimistic execution if the proposal is accepted, OE is
// enabled and the block height is greater than the initial height. During
// the first block we'll be carrying state from InitChain, so it would be
// impossible for us to easily revert.
// After the first block has been processed, the next blocks will get executed
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
// can have a response ready.
if req.Height > int64(c.initialHeight) {
c.optimisticExec.Execute(req)
}

return &abciproto.ProcessProposalResponse{
Status: abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT,
}, nil
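
The `oe` package itself is not in this diff, so `Execute`'s shape must be inferred from how `abci.go` drives it: it records the proposal, kicks off block execution on a goroutine, and lets `FinalizeBlock` later join via `WaitResult`. An illustrative sketch under those assumptions follows; apart from `Resp`, `StateChanges`, and `DecodedTxs`, which appear in the diff, all field and type names are invented:

```go
// Illustrative sketch only; the real implementation lives in
// cosmossdk.io/server/v2/cometbft/oe and may differ in detail.
func (o *OptimisticExecution[T]) Execute(req *abciproto.ProcessProposalRequest) {
	if o == nil {
		return // OE disabled: calls are no-ops, per "No-op if OE is not enabled"
	}
	o.mtx.Lock()
	defer o.mtx.Unlock()

	// Remember the proposal as the FinalizeBlock request we expect to see,
	// so AbortIfNeeded can later compare block hashes.
	o.request = &abciproto.FinalizeBlockRequest{
		Height: req.Height,
		Time:   req.Time,
		Hash:   req.Hash,
		Txs:    req.Txs,
	}

	ctx, cancel := context.WithCancel(context.Background())
	o.cancel = cancel
	o.initialized = true
	o.doneCh = make(chan struct{})

	go func() {
		// o.fn is the finalize function handed to NewOptimisticExecution,
		// e.g. the consensus' internalFinalizeBlock.
		resp, stateChanges, txs, err := o.fn(ctx, o.request)

		o.mtx.Lock()
		o.result = &FinalizeBlockResponse[T]{
			Resp:         resp,
			StateChanges: stateChanges,
			DecodedTxs:   txs,
		}
		o.err = err
		o.mtx.Unlock()

		close(o.doneCh) // WaitResult unblocks here
	}()
}
```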
@@ -447,46 +482,40 @@ func (c *consensus[T]) FinalizeBlock(
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*abciproto.FinalizeBlockResponse, error) {
if err := c.validateFinalizeBlockHeight(req); err != nil {
return nil, err
}

if err := c.checkHalt(req.Height, req.Time); err != nil {
return nil, err
}

// TODO(tip): can we expect some txs to not decode? If so, what do we do in this case? This does not seem to be the case,
// considering that prepare and process always decode txs; assuming they're the ones providing txs, we should never
// have a tx that fails decoding.
decodedTxs, err := decodeTxs(req.Txs, c.txCodec)
if err != nil {
return nil, err
}
var (
resp *server.BlockResponse
newState store.WriterMap
decodedTxs []T
err error
)

if c.optimisticExec.Initialized() {
// check if the hash we got is the same as the one we are executing
aborted := c.optimisticExec.AbortIfNeeded(req.Hash)

// Wait for the OE to finish, regardless of whether it was aborted or not
res, optimistErr := c.optimisticExec.WaitResult()

if !aborted {
if res != nil {
resp = res.Resp
newState = res.StateChanges
decodedTxs = res.DecodedTxs
}

cid, err := c.store.LastCommitID()
if err != nil {
return nil, err
}
if optimistErr != nil {
return nil, optimistErr
}
}

blockReq := &server.BlockRequest[T]{
Height: uint64(req.Height),
Time: req.Time,
Hash: req.Hash,
AppHash: cid.Hash,
ChainId: c.chainID,
Txs: decodedTxs,
c.optimisticExec.Reset()
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
LastCommit: toCoreCommitInfo(req.DecidedLastCommit),
})

resp, newState, err := c.app.DeliverBlock(ciCtx, blockReq)
if err != nil {
return nil, err
if resp == nil { // if we didn't run OE, run the normal finalize block
resp, newState, decodedTxs, err = c.internalFinalizeBlock(ctx, req)
if err != nil {
return nil, err
}
}

// after we get the changeset we can produce the commit hash,
@@ -531,6 +560,52 @@
return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents, c.cfg.AppTomlConfig.Trace)
}

func (c *consensus[T]) internalFinalizeBlock(
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*server.BlockResponse, store.WriterMap, []T, error) {
if err := c.validateFinalizeBlockHeight(req); err != nil {
return nil, nil, nil, err
}

if err := c.checkHalt(req.Height, req.Time); err != nil {
return nil, nil, nil, err
}

// TODO(tip): can we expect some txs to not decode? If so, what do we do in this case? This does not seem to be the case,
// considering that prepare and process always decode txs; assuming they're the ones providing txs, we should never
// have a tx that fails decoding.
decodedTxs, err := decodeTxs(req.Txs, c.txCodec)
if err != nil {
return nil, nil, nil, err
}

cid, err := c.store.LastCommitID()
if err != nil {
return nil, nil, nil, err
}

blockReq := &server.BlockRequest[T]{
Height: uint64(req.Height),
Time: req.Time,
Hash: req.Hash,
AppHash: cid.Hash,
ChainId: c.chainID,
Txs: decodedTxs,
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
LastCommit: toCoreCommitInfo(req.DecidedLastCommit),
})

resp, stateChanges, err := c.app.DeliverBlock(ciCtx, blockReq)

return resp, stateChanges, decodedTxs, err
}

// Commit implements types.Application.
// It is called by cometbft to notify the application that a block was committed.
func (c *consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {
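
To close the loop, here is a matching sketch of the consumer-side methods used by `FinalizeBlock` above (`AbortIfNeeded`, `WaitResult`, `Reset`, `Initialized`), continuing the same assumed struct from the previous sketch. The nil-receiver guard is what plausibly makes calls no-ops when OE is disabled, matching the "No-op if OE is not enabled" comment in `PrepareProposal`:

```go
// Continuation of the illustrative sketch above; not the real oe source.

// AbortIfNeeded cancels the in-flight run when the decided block hash differs
// from the proposal we optimistically executed. Returns true if it aborted.
func (o *OptimisticExecution[T]) AbortIfNeeded(hash []byte) bool {
	o.mtx.Lock()
	defer o.mtx.Unlock()
	if !bytes.Equal(o.request.Hash, hash) {
		o.cancel() // the goroutine observes ctx cancellation and bails out
		return true
	}
	return false
}

// WaitResult blocks until the background run has finished (or aborted) and
// returns whatever it produced.
func (o *OptimisticExecution[T]) WaitResult() (*FinalizeBlockResponse[T], error) {
	<-o.doneCh
	return o.result, o.err
}

// Reset clears all round state so the next ProcessProposal starts fresh.
func (o *OptimisticExecution[T]) Reset() {
	o.mtx.Lock()
	defer o.mtx.Unlock()
	o.request, o.result, o.err = nil, nil, nil
	o.initialized = false
}

// Initialized reports whether an optimistic run was started this round.
// The nil check lets a consensus with OE disabled call this on a nil
// *OptimisticExecution without panicking.
func (o *OptimisticExecution[T]) Initialized() bool {
	if o == nil {
		return false
	}
	o.mtx.Lock()
	defer o.mtx.Unlock()
	return o.initialized
}
```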
99 changes: 91 additions & 8 deletions server/v2/cometbft/abci_test.go
@@ -2,14 +2,18 @@ package cometbft

import (
"context"
"cosmossdk.io/core/server"
"crypto/sha256"
"encoding/json"
"errors"
abci "github.com/cometbft/cometbft/abci/types"
"io"
"strings"
"sync"
"testing"
"time"

"cosmossdk.io/server/v2/cometbft/oe"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
v1 "github.com/cometbft/cometbft/api/cometbft/types/v1"
"github.com/cosmos/gogoproto/proto"
@@ -56,10 +60,10 @@ func getQueryRouterBuilder[T any, PT interface {
*T
proto.Message
},
U any, UT interface {
*U
proto.Message
}](
U any, UT interface {
*U
proto.Message
}](
t *testing.T,
handler func(ctx context.Context, msg PT) (UT, error),
) *stf.MsgRouterBuilder {
@@ -86,10 +90,10 @@ func getMsgRouterBuilder[T any, PT interface {
*T
transaction.Msg
},
U any, UT interface {
*U
transaction.Msg
}](
U any, UT interface {
*U
transaction.Msg
}](
t *testing.T,
handler func(ctx context.Context, msg PT) (UT, error),
) *stf.MsgRouterBuilder {
@@ -514,6 +518,12 @@ func TestConsensus_ProcessProposal(t *testing.T) {
require.Error(t, err)

// NoOp handler
// dummy optimistic execution
optimisticMockFunc := func(context.Context, *abci.FinalizeBlockRequest) (*server.BlockResponse, store.WriterMap, []mock.Tx, error) {
return nil, nil, nil, errors.New("test error")
}
c.optimisticExec = oe.NewOptimisticExecution[mock.Tx](log.NewNopLogger(), optimisticMockFunc)

c.processProposalHandler = DefaultServerOptions[mock.Tx]().ProcessProposalHandler
_, err = c.ProcessProposal(context.Background(), &abciproto.ProcessProposalRequest{
Height: 1,
@@ -724,3 +734,76 @@ func assertStoreLatestVersion(t *testing.T, store types.Store, target uint64) {
require.NoError(t, err)
require.Equal(t, target, commitInfo.Version)
}

func TestOptimisticExecution(t *testing.T) {
c := setUpConsensus(t, 100_000, mempool.NoOpMempool[mock.Tx]{})

// Set up handlers
c.processProposalHandler = DefaultServerOptions[mock.Tx]().ProcessProposalHandler

// mock optimistic execution
calledTimes := 0
optimisticMockFunc := func(context.Context, *abci.FinalizeBlockRequest) (*server.BlockResponse, store.WriterMap, []mock.Tx, error) {
calledTimes++
return nil, nil, nil, errors.New("test error")
}
c.optimisticExec = oe.NewOptimisticExecution[mock.Tx](log.NewNopLogger(), optimisticMockFunc)

_, err := c.InitChain(context.Background(), &abciproto.InitChainRequest{
Time: time.Now(),
ChainId: "test",
InitialHeight: 1,
})
require.NoError(t, err)

_, err = c.FinalizeBlock(context.Background(), &abciproto.FinalizeBlockRequest{
Time: time.Now(),
Height: 1,
Txs: [][]byte{mockTx.Bytes()},
Hash: emptyHash[:],
})
require.NoError(t, err)

theHash := sha256.Sum256([]byte("test"))
ppReq := &abciproto.ProcessProposalRequest{
Height: 2,
Hash: theHash[:],
Time: time.Now(),
Txs: [][]byte{mockTx.Bytes()},
}

// Start optimistic execution
resp, err := c.ProcessProposal(context.Background(), ppReq)
require.NoError(t, err)
require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT)

// Initialize FinalizeBlock with correct hash - should use optimistic result
theHash = sha256.Sum256([]byte("test"))
fbReq := &abciproto.FinalizeBlockRequest{
Height: 2,
Hash: theHash[:],
Time: ppReq.Time,
Txs: ppReq.Txs,
}
fbResp, err := c.FinalizeBlock(context.Background(), fbReq)
require.Error(t, err)
require.ErrorContains(t, err, "test error") // from optimisticMockFunc
require.Equal(t, 1, calledTimes)

resp, err = c.ProcessProposal(context.Background(), ppReq)
require.NoError(t, err)
require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT)

theWrongHash := sha256.Sum256([]byte("wrong_hash"))
fbReq.Hash = theWrongHash[:]

// Initialize FinalizeBlock with wrong hash - should abort optimistic execution
// Because it was aborted, the result comes from the normal execution
fbResp, err = c.FinalizeBlock(context.Background(), fbReq)
require.NotNil(t, fbResp)
require.NoError(t, err)
require.Equal(t, 2, calledTimes)

// Verify optimistic execution was reset
require.False(t, c.optimisticExec.Initialized())
}