Skip to content

Commit

Permalink
go: Add consensus transaction prioritization
Browse files Browse the repository at this point in the history
  • Loading branch information
abukosek committed Sep 5, 2022
1 parent b64e8b5 commit 3964c45
Show file tree
Hide file tree
Showing 20 changed files with 155 additions and 14 deletions.
1 change: 1 addition & 0 deletions .changelog/4911.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go: Add consensus transaction prioritization
52 changes: 38 additions & 14 deletions go/consensus/tendermint/abci/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,15 +542,31 @@ func (mux *abciMux) decodeTx(ctx *api.Context, rawTx []byte) (*transaction.Trans
return &tx, &sigTx, nil
}

func (mux *abciMux) processTx(ctx *api.Context, tx *transaction.Transaction, txSize int) error {
func (mux *abciMux) processTx(ctx *api.Context, tx *transaction.Transaction, txSize int) (int64, error) {
// Lookup method handler.
app := mux.appsByMethod[tx.Method]
if app == nil {
ctx.Logger().Debug("unknown method",
"tx", tx,
"method", tx.Method,
)
return fmt.Errorf("mux: unknown method: %s", tx.Method)
return 0, fmt.Errorf("mux: unknown method: %s", tx.Method)
}

// Determine transaction's priority.
var (
err error
priority int64
)

if priority, err = app.GetTxPriority(ctx, tx); err != nil {
ctx.Logger().Debug("failed to get transaction's priority",
"tx", tx,
"tx_signer", ctx.TxSigner(),
"method", tx.Method,
"err", err,
)
return 0, err
}

// Pass the transaction through the fee handler if configured.
Expand All @@ -563,16 +579,17 @@ func (mux *abciMux) processTx(ctx *api.Context, tx *transaction.Transaction, txS
"tx", tx,
"tx_signer", ctx.TxSigner(),
"method", tx.Method,
"priority", priority,
"err", err,
)
return err
return 0, err
}
}

// Charge gas based on the size of the transaction.
params := mux.state.ConsensusParameters()
if err := ctx.Gas().UseGas(txSize, consensusGenesis.GasOpTxByte, params.GasCosts); err != nil {
return err
return 0, err
}

// Route to correct handler.
Expand All @@ -582,7 +599,7 @@ func (mux *abciMux) processTx(ctx *api.Context, tx *transaction.Transaction, txS
)

if err := app.ExecuteTx(ctx, tx); err != nil {
return err
return 0, err
}

// Pass the transaction through the PostExecuteTx handler if configured.
Expand All @@ -592,19 +609,20 @@ func (mux *abciMux) processTx(ctx *api.Context, tx *transaction.Transaction, txS
"tx", tx,
"tx_signer", ctx.TxSigner(),
"method", tx.Method,
"priority", priority,
"err", err,
)
return err
return 0, err
}
}

return nil
return priority, nil
}

func (mux *abciMux) executeTx(ctx *api.Context, rawTx []byte) error {
func (mux *abciMux) executeTx(ctx *api.Context, rawTx []byte) (int64, error) {
tx, sigTx, err := mux.decodeTx(ctx, rawTx)
if err != nil {
return err
return 0, err
}

// Set authenticated transaction signer.
Expand All @@ -615,10 +633,10 @@ func (mux *abciMux) executeTx(ctx *api.Context, rawTx []byte) error {
if upgrader := mux.state.Upgrader(); upgrader != nil && ctx.IsCheckOnly() {
hasUpgrade, err := upgrader.HasPendingUpgradeAt(ctx, ctx.BlockHeight()+1)
if err != nil {
return fmt.Errorf("failed to check for pending upgrades: %w", err)
return 0, fmt.Errorf("failed to check for pending upgrades: %w", err)
}
if hasUpgrade {
return transaction.ErrUpgradePending
return 0, transaction.ErrUpgradePending
}
}

Expand Down Expand Up @@ -662,7 +680,7 @@ func (mux *abciMux) EstimateGas(caller signature.PublicKey, tx *transaction.Tran

// Ignore any errors that occurred during simulation as we only need to estimate gas even if the
// transaction seems like it will fail.
_ = mux.processTx(ctx, tx, txSize)
_, _ = mux.processTx(ctx, tx, txSize)

return ctx.Gas().GasUsed(), nil
}
Expand All @@ -682,7 +700,12 @@ func (mux *abciMux) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
ctx := mux.state.NewContext(api.ContextCheckTx, mux.currentTime)
defer ctx.Close()

if err := mux.executeTx(ctx, req.Tx); err != nil {
var (
err error
priority int64
)

if priority, err = mux.executeTx(ctx, req.Tx); err != nil {
module, code := errors.Code(err)

if req.Type == types.CheckTxType_Recheck {
Expand Down Expand Up @@ -710,14 +733,15 @@ func (mux *abciMux) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
Code: types.CodeTypeOK,
GasWanted: int64(ctx.Gas().GasWanted()),
GasUsed: int64(ctx.Gas().GasUsed()),
Priority: priority,
}
}

func (mux *abciMux) DeliverTx(req types.RequestDeliverTx) types.ResponseDeliverTx {
ctx := mux.state.NewContext(api.ContextDeliverTx, mux.currentTime)
defer ctx.Close()

if err := mux.executeTx(ctx, req.Tx); err != nil {
if _, err := mux.executeTx(ctx, req.Tx); err != nil {
if api.IsUnavailableStateError(err) {
// Make sure to not commit any transactions which include results based on unavailable
// and/or corrupted state -- doing so can further corrupt state.
Expand Down
4 changes: 4 additions & 0 deletions go/consensus/tendermint/api/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ type Application interface {
// ExecuteTx executes a transaction.
ExecuteTx(*Context, *transaction.Transaction) error

// GetTxPriority returns the transaction's priority.
// Higher number means higher priority.
GetTxPriority(*Context, *transaction.Transaction) (int64, error)

// InitChain initializes the blockchain with validators and other
// info from TendermintCore.
//
Expand Down
3 changes: 3 additions & 0 deletions go/consensus/tendermint/apps/beacon/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ const (
// AppName is the ABCI application name.
// Run before all other applications.
AppName string = "000_beacon"

// AppPriority is the base priority for the app's transactions.
AppPriority int64 = 100000
)

var (
Expand Down
4 changes: 4 additions & 0 deletions go/consensus/tendermint/apps/beacon/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ func (app *beaconApplication) ExecuteTx(ctx *api.Context, tx *transaction.Transa
return app.backend.ExecuteTx(ctx, state, params, tx)
}

func (app *beaconApplication) GetTxPriority(ctx *api.Context, tx *transaction.Transaction) (int64, error) {
return AppPriority, nil
}

func (app *beaconApplication) EndBlock(ctx *api.Context, req types.RequestEndBlock) (types.ResponseEndBlock, error) {
return types.ResponseEndBlock{}, nil
}
Expand Down
3 changes: 3 additions & 0 deletions go/consensus/tendermint/apps/governance/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ const (

// AppName is the ABCI application name.
AppName string = "300_governance"

// AppPriority is the base priority for the app's transactions.
AppPriority int64 = 25000
)

var (
Expand Down
11 changes: 11 additions & 0 deletions go/consensus/tendermint/apps/governance/governance.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ func (app *governanceApplication) ExecuteTx(ctx *api.Context, tx *transaction.Tr
}
}

func (app *governanceApplication) GetTxPriority(ctx *api.Context, tx *transaction.Transaction) (int64, error) {
switch tx.Method {
case governance.MethodSubmitProposal:
return AppPriority, nil
case governance.MethodCastVote:
return AppPriority, nil
default:
return 0, governance.ErrInvalidArgument
}
}

func (app *governanceApplication) ExecuteMessage(ctx *api.Context, kind, msg interface{}) (interface{}, error) {
state := governanceState.NewMutableState(ctx.State())

Expand Down
3 changes: 3 additions & 0 deletions go/consensus/tendermint/apps/keymanager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ const (

// AppName is the ABCI application name.
AppName string = "999_keymanager"

// AppPriority is the base priority for the app's transactions.
AppPriority int64 = 50000
)

var (
Expand Down
9 changes: 9 additions & 0 deletions go/consensus/tendermint/apps/keymanager/keymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ func (app *keymanagerApplication) ExecuteTx(ctx *tmapi.Context, tx *transaction.
}
}

func (app *keymanagerApplication) GetTxPriority(ctx *tmapi.Context, tx *transaction.Transaction) (int64, error) {
switch tx.Method {
case api.MethodUpdatePolicy:
return AppPriority, nil
default:
return 0, fmt.Errorf("keymanager: invalid method: %s", tx.Method)
}
}

func (app *keymanagerApplication) EndBlock(ctx *tmapi.Context, request types.RequestEndBlock) (types.ResponseEndBlock, error) {
return types.ResponseEndBlock{}, nil
}
Expand Down
3 changes: 3 additions & 0 deletions go/consensus/tendermint/apps/registry/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ const (

// AppName is the ABCI application name.
AppName string = "200_registry"

// AppPriority is the base priority for the app's transactions.
AppPriority int64 = 50000
)

var (
Expand Down
19 changes: 19 additions & 0 deletions go/consensus/tendermint/apps/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,25 @@ func (app *registryApplication) ExecuteTx(ctx *api.Context, tx *transaction.Tran
}
}

func (app *registryApplication) GetTxPriority(ctx *api.Context, tx *transaction.Transaction) (int64, error) {
switch tx.Method {
case registry.MethodRegisterEntity:
return AppPriority, nil
case registry.MethodDeregisterEntity:
return AppPriority, nil
case registry.MethodRegisterNode:
return AppPriority + 10000, nil
case registry.MethodUnfreezeNode:
return AppPriority, nil
case registry.MethodRegisterRuntime:
return AppPriority, nil
case registry.MethodProveFreshness:
return AppPriority, nil
default:
return 0, registry.ErrInvalidArgument
}
}

func (app *registryApplication) EndBlock(ctx *api.Context, request types.RequestEndBlock) (types.ResponseEndBlock, error) {
return types.ResponseEndBlock{}, nil
}
Expand Down
3 changes: 3 additions & 0 deletions go/consensus/tendermint/apps/roothash/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ const (

// AppName is the ABCI application name.
AppName string = "999_roothash"

// AppPriority is the base priority for the app's transactions.
AppPriority int64 = 15000
)

var (
Expand Down
15 changes: 15 additions & 0 deletions go/consensus/tendermint/apps/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,21 @@ func (app *rootHashApplication) ExecuteTx(ctx *tmapi.Context, tx *transaction.Tr
}
}

func (app *rootHashApplication) GetTxPriority(ctx *tmapi.Context, tx *transaction.Transaction) (int64, error) {
switch tx.Method {
case roothash.MethodExecutorCommit:
return AppPriority, nil
case roothash.MethodExecutorProposerTimeout:
return AppPriority, nil
case roothash.MethodEvidence:
return AppPriority, nil
case roothash.MethodSubmitMsg:
return AppPriority, nil
default:
return 0, roothash.ErrInvalidArgument
}
}

func (app *rootHashApplication) onNewRuntime(ctx *tmapi.Context, runtime *registry.Runtime, genesis *roothash.Genesis, suspended bool) error {
if !runtime.IsCompute() {
ctx.Logger().Debug("onNewRuntime: ignoring non-compute runtime",
Expand Down
3 changes: 3 additions & 0 deletions go/consensus/tendermint/apps/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ const (

// AppName is the ABCI application name.
AppName string = "200_scheduler"

// AppPriority is the base priority for the app's transactions.
AppPriority int64 = 0
)

var (
Expand Down
4 changes: 4 additions & 0 deletions go/consensus/tendermint/apps/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ func (app *schedulerApplication) ExecuteTx(ctx *api.Context, tx *transaction.Tra
return fmt.Errorf("tendermint/scheduler: unexpected transaction")
}

func (app *schedulerApplication) GetTxPriority(ctx *api.Context, tx *transaction.Transaction) (int64, error) {
return AppPriority, nil
}

func diffValidators(logger *logging.Logger, current, pending map[signature.PublicKey]int64) []types.ValidatorUpdate {
var updates []types.ValidatorUpdate
for v := range current {
Expand Down
3 changes: 3 additions & 0 deletions go/consensus/tendermint/apps/staking/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
const (
// AppID is the unique application identifier.
AppID uint8 = 0x05

// AppPriority is the base priority for the app's transactions.
AppPriority int64 = 1000
)

var (
Expand Down
21 changes: 21 additions & 0 deletions go/consensus/tendermint/apps/staking/staking.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,27 @@ func (app *stakingApplication) ExecuteTx(ctx *api.Context, tx *transaction.Trans
}
}

func (app *stakingApplication) GetTxPriority(ctx *api.Context, tx *transaction.Transaction) (int64, error) {
switch tx.Method {
case staking.MethodTransfer:
return AppPriority, nil
case staking.MethodBurn:
return AppPriority, nil
case staking.MethodAddEscrow:
return AppPriority, nil
case staking.MethodReclaimEscrow:
return AppPriority, nil
case staking.MethodAmendCommissionSchedule:
return AppPriority, nil
case staking.MethodAllow:
return AppPriority, nil
case staking.MethodWithdraw:
return AppPriority, nil
default:
return 0, staking.ErrInvalidArgument
}
}

func (app *stakingApplication) EndBlock(ctx *api.Context, request types.RequestEndBlock) (types.ResponseEndBlock, error) {
fees := stakingState.BlockFees(ctx)
if err := app.disburseFeesP(ctx, stakingState.NewMutableState(ctx.State()), stakingState.BlockProposer(ctx), &fees); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions go/consensus/tendermint/apps/supplementarysanity/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ const (

// AppName is the ABCI application name.
AppName string = "999_supplementarysanity"

// AppPriority is the base priority for the app's transactions.
AppPriority int64 = 0
)
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (app *supplementarySanityApplication) ExecuteTx(*api.Context, *transaction.
return fmt.Errorf("supplementarysanity: unexpected transaction")
}

func (app *supplementarySanityApplication) GetTxPriority(ctx *api.Context, tx *transaction.Transaction) (int64, error) {
return AppPriority, nil
}

func (app *supplementarySanityApplication) InitChain(*api.Context, types.RequestInitChain, *genesis.Document) error {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions go/consensus/tendermint/full/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ func (t *fullService) lazyInit() error {
tenderConfig.Consensus.CreateEmptyBlocks = true
tenderConfig.Consensus.CreateEmptyBlocksInterval = emptyBlockInterval
tenderConfig.Consensus.DebugUnsafeReplayRecoverCorruptedWAL = viper.GetBool(CfgDebugUnsafeReplayRecoverCorruptedWAL) && cmflags.DebugDontBlameOasis()
tenderConfig.Mempool.Version = tmconfig.MempoolV1
tenderConfig.Instrumentation.Prometheus = true
tenderConfig.Instrumentation.PrometheusListenAddr = ""
tenderConfig.TxIndex.Indexer = "null"
Expand Down

0 comments on commit 3964c45

Please sign in to comment.