diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index c19110da0c43..e6a07fd86f07 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -11,7 +11,7 @@ import ( abci "github.com/cometbft/cometbft/abci/types" abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1" gogoproto "github.com/cosmos/gogoproto/proto" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoregistry" "cosmossdk.io/collections" @@ -28,6 +28,7 @@ import ( "cosmossdk.io/server/v2/appmanager" "cosmossdk.io/server/v2/cometbft/handlers" "cosmossdk.io/server/v2/cometbft/mempool" + "cosmossdk.io/server/v2/cometbft/oe" "cosmossdk.io/server/v2/cometbft/types" cometerrors "cosmossdk.io/server/v2/cometbft/types/errors" "cosmossdk.io/server/v2/streaming" @@ -77,6 +78,11 @@ type consensus[T transaction.Tx] struct { extendVote handlers.ExtendVoteHandler checkTxHandler handlers.CheckTxHandler[T] + // optimisticExec contains the context required for Optimistic Execution, + // including the goroutine handling.This is experimental and must be enabled + // by developers. + optimisticExec *oe.OptimisticExecution[T] + addrPeerFilter types.PeerFilter // filter peers by address and port idPeerFilter types.PeerFilter // filter peers by node ID @@ -385,6 +391,14 @@ func (c *consensus[T]) PrepareProposal( return nil, errors.New("no prepare proposal function was set") } + // Abort any running OE so it cannot overlap with `PrepareProposal`. This could happen if optimistic + // `internalFinalizeBlock` from previous round takes a long time, but consensus has moved on to next round. + // Overlap is undesirable, since `internalFinalizeBlock` and `PrepareProoposal` could share access to + // in-memory structs depending on application implementation. + // No-op if OE is not enabled. + // Similar call to Abort() is done in `ProcessProposal`. + c.optimisticExec.Abort() + ciCtx := contextWithCometInfo(ctx, comet.Info{ Evidence: toCoreEvidence(req.Misbehavior), ValidatorsHash: req.NextValidatorsHash, @@ -421,6 +435,16 @@ func (c *consensus[T]) ProcessProposal( return nil, errors.New("no process proposal function was set") } + // Since the application can get access to FinalizeBlock state and write to it, + // we must be sure to reset it in case ProcessProposal timeouts and is called + // again in a subsequent round. However, we only want to do this after we've + // processed the first block, as we want to avoid overwriting the finalizeState + // after state changes during InitChain. + if req.Height > int64(c.initialHeight) { + // abort any running OE + c.optimisticExec.Abort() + } + ciCtx := contextWithCometInfo(ctx, comet.Info{ Evidence: toCoreEvidence(req.Misbehavior), ValidatorsHash: req.NextValidatorsHash, @@ -436,6 +460,17 @@ func (c *consensus[T]) ProcessProposal( }, nil } + // Only execute optimistic execution if the proposal is accepted, OE is + // enabled and the block height is greater than the initial height. During + // the first block we'll be carrying state from InitChain, so it would be + // impossible for us to easily revert. + // After the first block has been processed, the next blocks will get executed + // optimistically, so that when the ABCI client calls `FinalizeBlock` the app + // can have a response ready. + if req.Height > int64(c.initialHeight) { + c.optimisticExec.Execute(req) + } + return &abciproto.ProcessProposalResponse{ Status: abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT, }, nil @@ -447,46 +482,40 @@ func (c *consensus[T]) FinalizeBlock( ctx context.Context, req *abciproto.FinalizeBlockRequest, ) (*abciproto.FinalizeBlockResponse, error) { - if err := c.validateFinalizeBlockHeight(req); err != nil { - return nil, err - } - - if err := c.checkHalt(req.Height, req.Time); err != nil { - return nil, err - } - - // TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case, - // considering that prepare and process always decode txs, assuming they're the ones providing txs we should never - // have a tx that fails decoding. - decodedTxs, err := decodeTxs(req.Txs, c.txCodec) - if err != nil { - return nil, err - } + var ( + resp *server.BlockResponse + newState store.WriterMap + decodedTxs []T + err error + ) + + if c.optimisticExec.Initialized() { + // check if the hash we got is the same as the one we are executing + aborted := c.optimisticExec.AbortIfNeeded(req.Hash) + + // Wait for the OE to finish, regardless of whether it was aborted or not + res, optimistErr := c.optimisticExec.WaitResult() + + if !aborted { + if res != nil { + resp = res.Resp + newState = res.StateChanges + decodedTxs = res.DecodedTxs + } - cid, err := c.store.LastCommitID() - if err != nil { - return nil, err - } + if optimistErr != nil { + return nil, optimistErr + } + } - blockReq := &server.BlockRequest[T]{ - Height: uint64(req.Height), - Time: req.Time, - Hash: req.Hash, - AppHash: cid.Hash, - ChainId: c.chainID, - Txs: decodedTxs, + c.optimisticExec.Reset() } - ciCtx := contextWithCometInfo(ctx, comet.Info{ - Evidence: toCoreEvidence(req.Misbehavior), - ValidatorsHash: req.NextValidatorsHash, - ProposerAddress: req.ProposerAddress, - LastCommit: toCoreCommitInfo(req.DecidedLastCommit), - }) - - resp, newState, err := c.app.DeliverBlock(ciCtx, blockReq) - if err != nil { - return nil, err + if resp == nil { // if we didn't run OE, run the normal finalize block + resp, newState, decodedTxs, err = c.internalFinalizeBlock(ctx, req) + if err != nil { + return nil, err + } } // after we get the changeset we can produce the commit hash, @@ -531,6 +560,52 @@ func (c *consensus[T]) FinalizeBlock( return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents, c.cfg.AppTomlConfig.Trace) } +func (c *consensus[T]) internalFinalizeBlock( + ctx context.Context, + req *abciproto.FinalizeBlockRequest, +) (*server.BlockResponse, store.WriterMap, []T, error) { + if err := c.validateFinalizeBlockHeight(req); err != nil { + return nil, nil, nil, err + } + + if err := c.checkHalt(req.Height, req.Time); err != nil { + return nil, nil, nil, err + } + + // TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case, + // considering that prepare and process always decode txs, assuming they're the ones providing txs we should never + // have a tx that fails decoding. + decodedTxs, err := decodeTxs(req.Txs, c.txCodec) + if err != nil { + return nil, nil, nil, err + } + + cid, err := c.store.LastCommitID() + if err != nil { + return nil, nil, nil, err + } + + blockReq := &server.BlockRequest[T]{ + Height: uint64(req.Height), + Time: req.Time, + Hash: req.Hash, + AppHash: cid.Hash, + ChainId: c.chainID, + Txs: decodedTxs, + } + + ciCtx := contextWithCometInfo(ctx, comet.Info{ + Evidence: toCoreEvidence(req.Misbehavior), + ValidatorsHash: req.NextValidatorsHash, + ProposerAddress: req.ProposerAddress, + LastCommit: toCoreCommitInfo(req.DecidedLastCommit), + }) + + resp, stateChanges, err := c.app.DeliverBlock(ciCtx, blockReq) + + return resp, stateChanges, decodedTxs, err +} + // Commit implements types.Application. // It is called by cometbft to notify the application that a block was committed. func (c *consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) { diff --git a/server/v2/cometbft/abci_test.go b/server/v2/cometbft/abci_test.go index c86ca226391b..7b20c354d786 100644 --- a/server/v2/cometbft/abci_test.go +++ b/server/v2/cometbft/abci_test.go @@ -2,14 +2,18 @@ package cometbft import ( "context" + "cosmossdk.io/core/server" "crypto/sha256" "encoding/json" + "errors" + abci "github.com/cometbft/cometbft/abci/types" "io" "strings" "sync" "testing" "time" + "cosmossdk.io/server/v2/cometbft/oe" abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1" v1 "github.com/cometbft/cometbft/api/cometbft/types/v1" "github.com/cosmos/gogoproto/proto" @@ -56,10 +60,10 @@ func getQueryRouterBuilder[T any, PT interface { *T proto.Message }, - U any, UT interface { - *U - proto.Message - }]( +U any, UT interface { + *U + proto.Message +}]( t *testing.T, handler func(ctx context.Context, msg PT) (UT, error), ) *stf.MsgRouterBuilder { @@ -86,10 +90,10 @@ func getMsgRouterBuilder[T any, PT interface { *T transaction.Msg }, - U any, UT interface { - *U - transaction.Msg - }]( +U any, UT interface { + *U + transaction.Msg +}]( t *testing.T, handler func(ctx context.Context, msg PT) (UT, error), ) *stf.MsgRouterBuilder { @@ -514,6 +518,12 @@ func TestConsensus_ProcessProposal(t *testing.T) { require.Error(t, err) // NoOp handler + // dummy optimistic execution + optimisticMockFunc := func(context.Context, *abci.FinalizeBlockRequest) (*server.BlockResponse, store.WriterMap, []mock.Tx, error) { + return nil, nil, nil, errors.New("test error") + } + c.optimisticExec = oe.NewOptimisticExecution[mock.Tx](log.NewNopLogger(), optimisticMockFunc) + c.processProposalHandler = DefaultServerOptions[mock.Tx]().ProcessProposalHandler _, err = c.ProcessProposal(context.Background(), &abciproto.ProcessProposalRequest{ Height: 1, @@ -724,3 +734,76 @@ func assertStoreLatestVersion(t *testing.T, store types.Store, target uint64) { require.NoError(t, err) require.Equal(t, target, commitInfo.Version) } + +func TestOptimisticExecution(t *testing.T) { + c := setUpConsensus(t, 100_000, mempool.NoOpMempool[mock.Tx]{}) + + // Set up handlers + c.processProposalHandler = DefaultServerOptions[mock.Tx]().ProcessProposalHandler + + // mock optimistic execution + calledTimes := 0 + optimisticMockFunc := func(context.Context, *abci.FinalizeBlockRequest) (*server.BlockResponse, store.WriterMap, []mock.Tx, error) { + calledTimes++ + return nil, nil, nil, errors.New("test error") + } + c.optimisticExec = oe.NewOptimisticExecution[mock.Tx](log.NewNopLogger(), optimisticMockFunc) + + _, err := c.InitChain(context.Background(), &abciproto.InitChainRequest{ + Time: time.Now(), + ChainId: "test", + InitialHeight: 1, + }) + require.NoError(t, err) + + _, err = c.FinalizeBlock(context.Background(), &abciproto.FinalizeBlockRequest{ + Time: time.Now(), + Height: 1, + Txs: [][]byte{mockTx.Bytes()}, + Hash: emptyHash[:], + }) + require.NoError(t, err) + + theHash := sha256.Sum256([]byte("test")) + ppReq := &abciproto.ProcessProposalRequest{ + Height: 2, + Hash: theHash[:], + Time: time.Now(), + Txs: [][]byte{mockTx.Bytes()}, + } + + // Start optimistic execution + resp, err := c.ProcessProposal(context.Background(), ppReq) + require.NoError(t, err) + require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT) + + // Initialize FinalizeBlock with correct hash - should use optimistic result + theHash = sha256.Sum256([]byte("test")) + fbReq := &abciproto.FinalizeBlockRequest{ + Height: 2, + Hash: theHash[:], + Time: ppReq.Time, + Txs: ppReq.Txs, + } + fbResp, err := c.FinalizeBlock(context.Background(), fbReq) + require.Error(t, err) + require.ErrorContains(t, err, "test error") // from optimisticMockFunc + require.Equal(t, 1, calledTimes) + + resp, err = c.ProcessProposal(context.Background(), ppReq) + require.NoError(t, err) + require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT) + + theWrongHash := sha256.Sum256([]byte("wrong_hash")) + fbReq.Hash = theWrongHash[:] + + // Initialize FinalizeBlock with wrong hash - should abort optimistic execution + // Because is aborted, the result comes from the normal execution + fbResp, err = c.FinalizeBlock(context.Background(), fbReq) + require.NotNil(t, fbResp) + require.NoError(t, err) + require.Equal(t, 2, calledTimes) + + // Verify optimistic execution was reset + require.False(t, c.optimisticExec.Initialized()) +} diff --git a/server/v2/cometbft/oe/optimistic_execution.go b/server/v2/cometbft/oe/optimistic_execution.go new file mode 100644 index 000000000000..aee55b552425 --- /dev/null +++ b/server/v2/cometbft/oe/optimistic_execution.go @@ -0,0 +1,169 @@ +package oe + +import ( + "bytes" + "context" + "encoding/hex" + "math/rand" + "sync" + "time" + + abci "github.com/cometbft/cometbft/api/cometbft/abci/v1" + + "cosmossdk.io/core/server" + "cosmossdk.io/core/store" + "cosmossdk.io/core/transaction" + "cosmossdk.io/log" +) + +// FinalizeBlockFunc is the function that is called by the OE to finalize the +// block. It is the same as the one in the ABCI app. +type FinalizeBlockFunc[T transaction.Tx] func(context.Context, *abci.FinalizeBlockRequest) (*server.BlockResponse, store.WriterMap, []T, error) + +// OptimisticExecution is a struct that contains the OE context. It is used to +// run the FinalizeBlock function in a goroutine, and to abort it if needed. +type OptimisticExecution[T transaction.Tx] struct { + finalizeBlockFunc FinalizeBlockFunc[T] // ABCI FinalizeBlock function with a context + logger log.Logger + + mtx sync.Mutex + stopCh chan struct{} + request *abci.FinalizeBlockRequest + response *FinalizeBlockResponse[T] + err error + cancelFunc func() // cancel function for the context + initialized bool // A boolean value indicating whether the struct has been initialized + + // debugging/testing options + abortRate int // number from 0 to 100 that determines the percentage of OE that should be aborted +} + +type FinalizeBlockResponse[T transaction.Tx] struct { + Resp *server.BlockResponse + StateChanges store.WriterMap + DecodedTxs []T +} + +// NewOptimisticExecution initializes the Optimistic Execution context but does not start it. +func NewOptimisticExecution[T transaction.Tx](logger log.Logger, fn FinalizeBlockFunc[T], opts ...func(*OptimisticExecution[T])) *OptimisticExecution[T] { + logger = logger.With(log.ModuleKey, "oe") + oe := &OptimisticExecution[T]{logger: logger, finalizeBlockFunc: fn} + for _, opt := range opts { + opt(oe) + } + return oe +} + +// WithAbortRate sets the abort rate for the OE. The abort rate is a number from +// 0 to 100 that determines the percentage of OE that should be aborted. +// This is for testing purposes only and must not be used in production. +func WithAbortRate[T transaction.Tx](rate int) func(*OptimisticExecution[T]) { + return func(oe *OptimisticExecution[T]) { + oe.abortRate = rate + } +} + +// Reset resets the OE context. Must be called whenever we want to invalidate +// the current OE. +func (oe *OptimisticExecution[T]) Reset() { + oe.mtx.Lock() + defer oe.mtx.Unlock() + oe.request = nil + oe.response = nil + oe.err = nil + oe.initialized = false +} + +// Initialized returns true if the OE was initialized, meaning that it contains +// a request and it was run or it is running. +func (oe *OptimisticExecution[T]) Initialized() bool { + if oe == nil { + return false + } + oe.mtx.Lock() + defer oe.mtx.Unlock() + + return oe.initialized +} + +// Execute initializes the OE and starts it in a goroutine. +func (oe *OptimisticExecution[T]) Execute(req *abci.ProcessProposalRequest) { + oe.mtx.Lock() + defer oe.mtx.Unlock() + + oe.stopCh = make(chan struct{}) + oe.request = &abci.FinalizeBlockRequest{ + Txs: req.Txs, + DecidedLastCommit: req.ProposedLastCommit, + Misbehavior: req.Misbehavior, + Hash: req.Hash, + Height: req.Height, + Time: req.Time, + NextValidatorsHash: req.NextValidatorsHash, + ProposerAddress: req.ProposerAddress, + } + + oe.logger.Debug("OE started", "height", req.Height, "hash", hex.EncodeToString(req.Hash), "time", req.Time.String()) + ctx, cancel := context.WithCancel(context.Background()) + oe.cancelFunc = cancel + oe.initialized = true + + go func() { + start := time.Now() + resp, stateChanges, decodedTxs, err := oe.finalizeBlockFunc(ctx, oe.request) + + oe.mtx.Lock() + + executionTime := time.Since(start) + oe.logger.Debug("OE finished", "duration", executionTime.String(), "height", oe.request.Height, "hash", hex.EncodeToString(oe.request.Hash)) + oe.response, oe.err = &FinalizeBlockResponse[T]{ + Resp: resp, + StateChanges: stateChanges, + DecodedTxs: decodedTxs, + }, err + + close(oe.stopCh) + oe.mtx.Unlock() + }() +} + +// AbortIfNeeded aborts the OE if the request hash is not the same as the one in +// the running OE. Returns true if the OE was aborted. +func (oe *OptimisticExecution[T]) AbortIfNeeded(reqHash []byte) bool { + if oe == nil { + return false + } + + oe.mtx.Lock() + defer oe.mtx.Unlock() + + if !bytes.Equal(oe.request.Hash, reqHash) { + oe.logger.Error("OE aborted due to hash mismatch", "oe_hash", hex.EncodeToString(oe.request.Hash), "req_hash", hex.EncodeToString(reqHash), "oe_height", oe.request.Height, "req_height", oe.request.Height) + oe.cancelFunc() + return true + } else if oe.abortRate > 0 && rand.Intn(100) < oe.abortRate { + // this is for test purposes only, we can emulate a certain percentage of + // OE needed to be aborted. + oe.cancelFunc() + oe.logger.Error("OE aborted due to test abort rate") + return true + } + + return false +} + +// Abort aborts the OE unconditionally and waits for it to finish. +func (oe *OptimisticExecution[T]) Abort() { + if oe == nil || oe.cancelFunc == nil { + return + } + + oe.cancelFunc() + <-oe.stopCh +} + +// WaitResult waits for the OE to finish and returns the result. +func (oe *OptimisticExecution[T]) WaitResult() (*FinalizeBlockResponse[T], error) { + <-oe.stopCh + return oe.response, oe.err +} diff --git a/server/v2/cometbft/oe/optimistic_execution_test.go b/server/v2/cometbft/oe/optimistic_execution_test.go new file mode 100644 index 000000000000..c0eb28c2a5e9 --- /dev/null +++ b/server/v2/cometbft/oe/optimistic_execution_test.go @@ -0,0 +1,35 @@ +package oe + +import ( + "context" + "errors" + "testing" + + "cosmossdk.io/core/server" + "cosmossdk.io/core/store" + "cosmossdk.io/core/transaction" + "cosmossdk.io/log" + abci "github.com/cometbft/cometbft/api/cometbft/abci/v1" + "github.com/stretchr/testify/assert" +) + +func testFinalizeBlock[T transaction.Tx](context.Context, *abci.FinalizeBlockRequest) (*server.BlockResponse, store.WriterMap, []T, error) { + return nil, nil, nil, errors.New("test error") +} + +func TestOptimisticExecution(t *testing.T) { + oe := NewOptimisticExecution[transaction.Tx](log.NewNopLogger(), testFinalizeBlock) + oe.Execute(&abci.ProcessProposalRequest{ + Hash: []byte("test"), + }) + assert.True(t, oe.Initialized()) + + resp, err := oe.WaitResult() + assert.Equal(t, &FinalizeBlockResponse[transaction.Tx]{}, resp) // empty response + assert.EqualError(t, err, "test error") + + assert.False(t, oe.AbortIfNeeded([]byte("test"))) + assert.True(t, oe.AbortIfNeeded([]byte("wrong_hash"))) + + oe.Reset() +} diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index e844df3c80c3..6567e01caef0 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -2,6 +2,7 @@ package cometbft import ( "context" + "cosmossdk.io/server/v2/cometbft/oe" "crypto/sha256" "encoding/json" "fmt" @@ -164,7 +165,7 @@ func New[T transaction.Tx]( } } - srv.Consensus = &consensus[T]{ + c := &consensus[T]{ appName: appName, version: getCometBFTServerVersion(), app: app, @@ -192,6 +193,13 @@ func New[T transaction.Tx]( idPeerFilter: srv.serverOptions.IdPeerFilter, } + c.optimisticExec = oe.NewOptimisticExecution( + logger, + c.internalFinalizeBlock, + ) + + srv.Consensus = c + return srv, nil }