From f2e275f81878147632518cfd00dc71c35bf281c8 Mon Sep 17 00:00:00 2001
From: rjl493456442 <garyrong0905@gmail.com>
Date: Mon, 3 Oct 2022 20:10:00 +0800
Subject: [PATCH] eth, miner: add timeout for building sealing block (#25407)

* eth, miner: add timeout for building sealing block

* eth, cmd, miner: add newpayloadtimeout flag

* eth, miner, cmd: address comments

* eth, miner: minor fixes

Co-authored-by: Martin Holst Swende <martin@swende.se>
---
 cmd/geth/main.go         |   1 +
 cmd/utils/flags.go       |   9 +++
 eth/catalyst/api_test.go |   7 +--
 eth/ethconfig/config.go  |  16 ++----
 miner/miner.go           |  10 ++++
 miner/worker.go          | 116 +++++++++++++++++++++++++++------------
 miner/worker_test.go     |   8 +--
 7 files changed, 112 insertions(+), 55 deletions(-)

diff --git a/cmd/geth/main.go b/cmd/geth/main.go
index 5d54ee41ca2f..e6d1128ba9b5 100644
--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -127,6 +127,7 @@ var (
 		utils.MinerExtraDataFlag,
 		utils.MinerRecommitIntervalFlag,
 		utils.MinerNoVerifyFlag,
+		utils.MinerNewPayloadTimeout,
 		utils.NATFlag,
 		utils.NoDiscoverFlag,
 		utils.DiscoveryV5Flag,
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index ca6ded475668..5ce244080ff2 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -563,6 +563,12 @@ var (
 		Usage:    "Disable remote sealing verification",
 		Category: flags.MinerCategory,
 	}
+	MinerNewPayloadTimeout = &cli.DurationFlag{
+		Name:     "miner.newpayload-timeout",
+		Usage:    "Specify the maximum time allowance for creating a new payload",
+		Value:    ethconfig.Defaults.Miner.NewPayloadTimeout,
+		Category: flags.MinerCategory,
+	}
 
 	// Account settings
 	UnlockedAccountFlag = &cli.StringFlag{
@@ -1658,6 +1664,9 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) {
 	if ctx.IsSet(MinerNoVerifyFlag.Name) {
 		cfg.Noverify = ctx.Bool(MinerNoVerifyFlag.Name)
 	}
+	if ctx.IsSet(MinerNewPayloadTimeout.Name) {
+		cfg.NewPayloadTimeout = ctx.Duration(MinerNewPayloadTimeout.Name)
+	}
 }
 
 func setRequiredBlocks(ctx *cli.Context, cfg *ethconfig.Config) {
diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go
index 480a30b52dc5..c98a48ea4769 100644
--- a/eth/catalyst/api_test.go
+++ b/eth/catalyst/api_test.go
@@ -476,10 +476,9 @@ func TestExchangeTransitionConfig(t *testing.T) {
 	genesis, preMergeBlocks := generatePreMergeChain(10)
 	n, ethservice := startEthService(t, genesis, preMergeBlocks)
 	defer n.Close()
-	var (
-		api = NewConsensusAPI(ethservice)
-	)
+
 	// invalid ttd
+	api := NewConsensusAPI(ethservice)
 	config := beacon.TransitionConfigurationV1{
 		TerminalTotalDifficulty: (*hexutil.Big)(big.NewInt(0)),
 		TerminalBlockHash:       common.Hash{},
@@ -812,10 +811,8 @@ func TestInvalidBloom(t *testing.T) {
 
 func TestNewPayloadOnInvalidTerminalBlock(t *testing.T) {
 	genesis, preMergeBlocks := generatePreMergeChain(100)
-	fmt.Println(genesis.Config.TerminalTotalDifficulty)
 	genesis.Config.TerminalTotalDifficulty = preMergeBlocks[0].Difficulty() //.Sub(genesis.Config.TerminalTotalDifficulty, preMergeBlocks[len(preMergeBlocks)-1].Difficulty())
 
-	fmt.Println(genesis.Config.TerminalTotalDifficulty)
 	n, ethservice := startEthService(t, genesis, preMergeBlocks)
 	defer n.Close()
 
diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go
index a897294175ea..b5a7837ffda3 100644
--- a/eth/ethconfig/config.go
+++ b/eth/ethconfig/config.go
@@ -84,16 +84,12 @@ var Defaults = Config{
 	TrieTimeout:             60 * time.Minute,
 	SnapshotCache:           102,
 	FilterLogCacheSize:      32,
-	Miner: miner.Config{
-		GasCeil:  30000000,
-		GasPrice: big.NewInt(params.GWei),
-		Recommit: 3 * time.Second,
-	},
-	TxPool:        core.DefaultTxPoolConfig,
-	RPCGasCap:     50000000,
-	RPCEVMTimeout: 5 * time.Second,
-	GPO:           FullNodeGPO,
-	RPCTxFeeCap:   1, // 1 ether
+	Miner:                   miner.DefaultConfig,
+	TxPool:                  core.DefaultTxPoolConfig,
+	RPCGasCap:               50000000,
+	RPCEVMTimeout:           5 * time.Second,
+	GPO:                     FullNodeGPO,
+	RPCTxFeeCap:             1, // 1 ether
 }
 
 func init() {
diff --git a/miner/miner.go b/miner/miner.go
index 1e9607a76ad9..0f644b200bcf 100644
--- a/miner/miner.go
+++ b/miner/miner.go
@@ -53,6 +53,16 @@ type Config struct {
 	GasPrice   *big.Int       // Minimum gas price for mining a transaction
 	Recommit   time.Duration  // The time interval for miner to re-create mining work.
 	Noverify   bool           // Disable remote mining solution verification(only useful in ethash).
+
+	NewPayloadTimeout time.Duration // The maximum time allowance for creating a new payload
+}
+
+// DefaultConfig contains default settings for miner.
+var DefaultConfig = Config{
+	GasCeil:           30000000,
+	GasPrice:          big.NewInt(params.GWei),
+	Recommit:          3 * time.Second,
+	NewPayloadTimeout: 2 * time.Second,
 }
 
 // Miner creates blocks and searches for proof-of-work values.
diff --git a/miner/worker.go b/miner/worker.go
index 93fb6288bb45..bf9434eefe70 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -80,6 +80,7 @@ const (
 var (
 	errBlockInterruptedByNewHead  = errors.New("new head arrived while building block")
 	errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block")
+	errBlockInterruptedByTimeout  = errors.New("timeout while building block")
 )
 
 // environment is the worker's current environment and holds all
@@ -158,6 +159,7 @@ const (
 	commitInterruptNone int32 = iota
 	commitInterruptNewHead
 	commitInterruptResubmit
+	commitInterruptTimeout
 )
 
 // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
@@ -241,6 +243,13 @@ type worker struct {
 	// non-stop and no real transaction will be included.
 	noempty uint32
 
+	// newpayloadTimeout is the maximum timeout allowance for creating payload.
+	// The default value is 2 seconds but node operator can set it to arbitrary
+	// large value. A large timeout allowance may cause Geth to fail creating
+	// a non-empty payload within the specified time and eventually miss the slot
+	// in case there are some computation expensive transactions in txpool.
+	newpayloadTimeout time.Duration
+
 	// External functions
 	isLocalBlock func(header *types.Header) bool // Function used to determine whether the specified block is mined by local miner.
 
@@ -288,6 +297,16 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
 		log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
 		recommit = minRecommitInterval
 	}
+	// Sanitize the timeout config for creating payload.
+	newpayloadTimeout := worker.config.NewPayloadTimeout
+	if newpayloadTimeout == 0 {
+		log.Warn("Sanitizing new payload timeout to default", "provided", newpayloadTimeout, "updated", DefaultConfig.NewPayloadTimeout)
+		newpayloadTimeout = DefaultConfig.NewPayloadTimeout
+	}
+	if newpayloadTimeout < time.Millisecond*100 {
+		log.Warn("Low payload timeout may cause high amount of non-full blocks", "provided", newpayloadTimeout, "default", DefaultConfig.NewPayloadTimeout)
+	}
+	worker.newpayloadTimeout = newpayloadTimeout
 
 	worker.wg.Add(4)
 	go worker.mainLoop()
@@ -844,42 +863,26 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
 	var coalescedLogs []*types.Log
 
 	for {
-		// In the following three cases, we will interrupt the execution of the transaction.
-		// (1) new head block event arrival, the interrupt signal is 1
-		// (2) worker start or restart, the interrupt signal is 1
-		// (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2.
-		// For the first two cases, the semi-finished work will be discarded.
-		// For the third case, the semi-finished work will be submitted to the consensus engine.
-		if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
-			// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
-			if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
-				ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit)
-				if ratio < 0.1 {
-					ratio = 0.1
-				}
-				w.resubmitAdjustCh <- &intervalAdjust{
-					ratio: ratio,
-					inc:   true,
-				}
-				return errBlockInterruptedByRecommit
+		// Check interruption signal and abort building if it's fired.
+		if interrupt != nil {
+			if signal := atomic.LoadInt32(interrupt); signal != commitInterruptNone {
+				return signalToErr(signal)
 			}
-			return errBlockInterruptedByNewHead
 		}
-		// If we don't have enough gas for any further transactions then we're done
+		// If we don't have enough gas for any further transactions then we're done.
 		if env.gasPool.Gas() < params.TxGas {
 			log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
 			break
 		}
-		// Retrieve the next transaction and abort if all done
+		// Retrieve the next transaction and abort if all done.
 		tx := txs.Peek()
 		if tx == nil {
 			break
 		}
 		// Error may be ignored here. The error has already been checked
 		// during transaction acceptance is the transaction pool.
-		//
-		// We use the eip155 signer regardless of the current hf.
 		from, _ := types.Sender(env.signer, tx)
+
 		// Check whether the tx is replay protected. If we're not in the EIP155 hf
 		// phase, start ignoring the sender until we do.
 		if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) {
@@ -926,7 +929,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
 			txs.Shift()
 		}
 	}
-
 	if !w.isRunning() && len(coalescedLogs) > 0 {
 		// We don't push the pendingLogsEvent while we are sealing. The reason is that
 		// when we are sealing, the worker will regenerate a sealing block every 3 seconds.
@@ -942,11 +944,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
 		}
 		w.pendingLogsFeed.Send(cpy)
 	}
-	// Notify resubmit loop to decrease resubmitting interval if current interval is larger
-	// than the user-specified one.
-	if interrupt != nil {
-		w.resubmitAdjustCh <- &intervalAdjust{inc: false}
-	}
 	return nil
 }
 
@@ -986,15 +983,15 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
 		}
 		timestamp = parent.Time() + 1
 	}
-	// Construct the sealing block header, set the extra field if it's allowed
-	num := parent.Number()
+	// Construct the sealing block header.
 	header := &types.Header{
 		ParentHash: parent.Hash(),
-		Number:     num.Add(num, common.Big1),
+		Number:     new(big.Int).Add(parent.Number(), common.Big1),
 		GasLimit:   core.CalcGasLimit(parent.GasLimit(), w.config.GasCeil),
 		Time:       timestamp,
 		Coinbase:   genParams.coinbase,
 	}
+	// Set the extra field if it's allowed.
 	if !genParams.noExtra && len(w.extra) != 0 {
 		header.Extra = w.extra
 	}
@@ -1082,7 +1079,16 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
 	defer work.discard()
 
 	if !params.noTxs {
-		w.fillTransactions(nil, work)
+		interrupt := new(int32)
+		timer := time.AfterFunc(w.newpayloadTimeout, func() {
+			atomic.StoreInt32(interrupt, commitInterruptTimeout)
+		})
+		defer timer.Stop()
+
+		err := w.fillTransactions(interrupt, work)
+		if errors.Is(err, errBlockInterruptedByTimeout) {
+			log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout))
+		}
 	}
 	return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
 }
@@ -1113,13 +1119,36 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
 	if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
 		w.commit(work.copy(), nil, false, start)
 	}
-
-	// Fill pending transactions from the txpool
+	// Fill pending transactions from the txpool into the block.
 	err = w.fillTransactions(interrupt, work)
-	if errors.Is(err, errBlockInterruptedByNewHead) {
+	switch {
+	case err == nil:
+		// The entire block is filled, decrease resubmit interval in case
+		// of current interval is larger than the user-specified one.
+		w.resubmitAdjustCh <- &intervalAdjust{inc: false}
+
+	case errors.Is(err, errBlockInterruptedByRecommit):
+		// Notify resubmit loop to increase resubmitting interval if the
+		// interruption is due to frequent commits.
+		gaslimit := work.header.GasLimit
+		ratio := float64(gaslimit-work.gasPool.Gas()) / float64(gaslimit)
+		if ratio < 0.1 {
+			ratio = 0.1
+		}
+		w.resubmitAdjustCh <- &intervalAdjust{
+			ratio: ratio,
+			inc:   true,
+		}
+
+	case errors.Is(err, errBlockInterruptedByNewHead):
+		// If the block building is interrupted by newhead event, discard it
+		// totally. Committing the interrupted block introduces unnecessary
+		// delay, and possibly causes miner to mine on the previous head,
+		// which could result in higher uncle rate.
 		work.discard()
 		return
 	}
+	// Submit the generated block for consensus sealing.
 	w.commit(work.copy(), w.fullTaskHook, true, start)
 
 	// Swap out the old work with the new one, terminating any leftover
@@ -1231,3 +1260,18 @@ func totalFees(block *types.Block, receipts []*types.Receipt) *big.Float {
 	}
 	return new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
 }
+
+// signalToErr converts the interruption signal to a concrete error type for return.
+// The given signal must be a valid interruption signal.
+func signalToErr(signal int32) error {
+	switch signal {
+	case commitInterruptNewHead:
+		return errBlockInterruptedByNewHead
+	case commitInterruptResubmit:
+		return errBlockInterruptedByRecommit
+	case commitInterruptTimeout:
+		return errBlockInterruptedByTimeout
+	default:
+		panic(fmt.Errorf("undefined signal %d", signal))
+	}
+}
diff --git a/miner/worker_test.go b/miner/worker_test.go
index 2f1939f75981..0cba7ff9955f 100644
--- a/miner/worker_test.go
+++ b/miner/worker_test.go
@@ -523,21 +523,21 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co
 }
 
 func TestGetSealingWorkEthash(t *testing.T) {
-	testGetSealingWork(t, ethashChainConfig, ethash.NewFaker(), false)
+	testGetSealingWork(t, ethashChainConfig, ethash.NewFaker())
 }
 
 func TestGetSealingWorkClique(t *testing.T) {
-	testGetSealingWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase()), false)
+	testGetSealingWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase()))
 }
 
 func TestGetSealingWorkPostMerge(t *testing.T) {
 	local := new(params.ChainConfig)
 	*local = *ethashChainConfig
 	local.TerminalTotalDifficulty = big.NewInt(0)
-	testGetSealingWork(t, local, ethash.NewFaker(), true)
+	testGetSealingWork(t, local, ethash.NewFaker())
 }
 
-func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, postMerge bool) {
+func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) {
 	defer engine.Close()
 
 	w, b := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0)