From f7a4418f0441b642957fa40ffe518c2a322c8305 Mon Sep 17 00:00:00 2001 From: devopsbo3 <69951731+devopsbo3@users.noreply.github.com> Date: Fri, 10 Nov 2023 12:31:54 -0600 Subject: [PATCH] Revert "core, eth, miner: start propagating and consuming blob txs (#28243)" This reverts commit 723fd06e120a244e7fbff9ce5cf496174021a21d. --- core/txpool/blobpool/blobpool.go | 55 ++++++++-------------------- core/txpool/legacypool/legacypool.go | 18 ++++----- core/txpool/subpool.go | 19 ++-------- core/txpool/txpool.go | 12 +++--- eth/api_backend.go | 2 +- eth/catalyst/simulated_beacon.go | 2 +- eth/handler.go | 41 +++++++++------------ eth/handler_eth.go | 14 +++---- eth/handler_eth_test.go | 4 +- eth/handler_test.go | 6 +-- eth/protocols/eth/handler.go | 4 -- eth/protocols/eth/handlers.go | 4 +- miner/ordering_test.go | 4 -- miner/worker.go | 33 ++++++----------- 14 files changed, 73 insertions(+), 145 deletions(-) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 32c6c0e8feef..36916c3f0b2f 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -97,8 +97,6 @@ type blobTxMeta struct { execTipCap *uint256.Int // Needed to prioritize inclusion order across accounts and validate replacement price bump execFeeCap *uint256.Int // Needed to validate replacement price bump blobFeeCap *uint256.Int // Needed to validate replacement price bump - execGas uint64 // Needed to check inclusion validity before reading the blob - blobGas uint64 // Needed to check inclusion validity before reading the blob basefeeJumps float64 // Absolute number of 1559 fee adjustments needed to reach the tx's fee cap blobfeeJumps float64 // Absolute number of 4844 fee adjustments needed to reach the tx's blob fee cap @@ -120,8 +118,6 @@ func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta { execTipCap: uint256.MustFromBig(tx.GasTipCap()), execFeeCap: uint256.MustFromBig(tx.GasFeeCap()), blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()), - execGas: tx.Gas(), - blobGas: tx.BlobGas(), } meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap) meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap) @@ -311,8 +307,8 @@ type BlobPool struct { spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts evict *evictHeap // Heap of cheapest accounts for eviction when full - discoverFeed event.Feed // Event feed to send out new tx events on pool discovery (reorg excluded) - insertFeed event.Feed // Event feed to send out new tx events on pool inclusion (reorg included) + eventFeed event.Feed // Event feed to send out new tx events on pool inclusion + eventScope event.SubscriptionScope // Event scope to track and mass unsubscribe on termination lock sync.RWMutex // Mutex protecting the pool during reorg handling } @@ -440,6 +436,8 @@ func (p *BlobPool) Close() error { if err := p.store.Close(); err != nil { errs = append(errs, err) } + p.eventScope.Close() + switch { case errs == nil: return nil @@ -760,21 +758,15 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) { // Run the reorg between the old and new head and figure out which accounts // need to be rechecked and which transactions need to be readded if reinject, inclusions := p.reorg(oldHead, newHead); reinject != nil { - var adds []*types.Transaction for addr, txs := range reinject { // Blindly push all the lost transactions back into the pool for _, tx := range txs { - if err := p.reinject(addr, tx.Hash()); err == nil { - adds = append(adds, tx.WithoutBlobTxSidecar()) - } + p.reinject(addr, tx.Hash()) } // Recheck the account's pooled transactions to drop included and // invalidated one p.recheck(addr, inclusions) } - if len(adds) > 0 { - p.insertFeed.Send(core.NewTxsEvent{Txs: adds}) - } } // Flush out any blobs from limbo that are older than the latest finality if p.chain.Config().IsCancun(p.head.Number, p.head.Time) { @@ -929,13 +921,13 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]* // Note, the method will not initialize the eviction cache values as those will // be done once for all transactions belonging to an account after all individual // transactions are injected back into the pool. -func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { +func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) { // Retrieve the associated blob from the limbo. Without the blobs, we cannot // add the transaction back into the pool as it is not mineable. tx, err := p.limbo.pull(txhash) if err != nil { log.Error("Blobs unavailable, dropping reorged tx", "err", err) - return err + return } // TODO: seems like an easy optimization here would be getting the serialized tx // from limbo instead of re-serializing it here. @@ -944,12 +936,12 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { blob, err := rlp.EncodeToBytes(tx) if err != nil { log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err) - return err + return } id, err := p.store.Put(blob) if err != nil { log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err) - return err + return } // Update the indixes and metrics @@ -957,7 +949,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { if _, ok := p.index[addr]; !ok { if err := p.reserve(addr, true); err != nil { log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) - return err + return } p.index[addr] = []*blobTxMeta{meta} p.spent[addr] = meta.costCap @@ -968,7 +960,6 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { } p.lookup[meta.hash] = meta.id p.stored += uint64(meta.size) - return nil } // SetGasTip implements txpool.SubPool, allowing the blob pool's gas requirements @@ -1163,19 +1154,9 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction { // Add inserts a set of blob transactions into the pool if they pass validation (both // consensus validity and pool restictions). func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error { - var ( - adds = make([]*types.Transaction, 0, len(txs)) - errs = make([]error, len(txs)) - ) + errs := make([]error, len(txs)) for i, tx := range txs { errs[i] = p.add(tx) - if errs[i] == nil { - adds = append(adds, tx.WithoutBlobTxSidecar()) - } - } - if len(adds) > 0 { - p.discoverFeed.Send(core.NewTxsEvent{Txs: adds}) - p.insertFeed.Send(core.NewTxsEvent{Txs: adds}) } return errs } @@ -1403,8 +1384,6 @@ func (p *BlobPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTr Time: time.Now(), // TODO(karalabe): Maybe save these and use that? GasFeeCap: tx.execFeeCap.ToBig(), GasTipCap: tx.execTipCap.ToBig(), - Gas: tx.execGas, - BlobGas: tx.blobGas, }) } if len(lazies) > 0 { @@ -1489,14 +1468,10 @@ func (p *BlobPool) updateLimboMetrics() { limboSlotusedGauge.Update(int64(slotused)) } -// SubscribeTransactions registers a subscription for new transaction events, -// supporting feeding only newly seen or also resurrected transactions. -func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { - if reorgs { - return p.insertFeed.Subscribe(ch) - } else { - return p.discoverFeed.Subscribe(ch) - } +// SubscribeTransactions registers a subscription of NewTxsEvent and +// starts sending event to the given channel. +func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription { + return p.eventScope.Track(p.eventFeed.Subscribe(ch)) } // Nonce returns the next nonce of an account, with all transactions executable diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index e71204185f03..2430028f9d99 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -208,6 +208,7 @@ type LegacyPool struct { chain BlockChain gasTip atomic.Pointer[big.Int] txFeed event.Feed + scope event.SubscriptionScope signer types.Signer mu sync.RWMutex @@ -403,6 +404,9 @@ func (pool *LegacyPool) loop() { // Close terminates the transaction pool. func (pool *LegacyPool) Close() error { + // Unsubscribe all subscriptions registered from txpool + pool.scope.Close() + // Terminate the pool reorger and return close(pool.reorgShutdownCh) pool.wg.Wait() @@ -421,14 +425,10 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) { <-wait } -// SubscribeTransactions registers a subscription for new transaction events, -// supporting feeding only newly seen or also resurrected transactions. -func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { - // The legacy pool has a very messed up internal shuffling, so it's kind of - // hard to separate newly discovered transaction from resurrected ones. This - // is because the new txs are added to the queue, resurrected ones too and - // reorgs run lazily, so separating the two would need a marker. - return pool.txFeed.Subscribe(ch) +// SubscribeTransactions registers a subscription of NewTxsEvent and +// starts sending event to the given channel. +func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription { + return pool.scope.Track(pool.txFeed.Subscribe(ch)) } // SetGasTip updates the minimum gas tip required by the transaction pool for a @@ -552,8 +552,6 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.L Time: txs[i].Time(), GasFeeCap: txs[i].GasFeeCap(), GasTipCap: txs[i].GasTipCap(), - Gas: txs[i].Gas(), - BlobGas: txs[i].BlobGas(), } } pending[addr] = lazies diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index de05b38d433d..85312c431807 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -30,16 +30,13 @@ import ( // enough for the miner and other APIs to handle large batches of transactions; // and supports pulling up the entire transaction when really needed. type LazyTransaction struct { - Pool LazyResolver // Transaction resolver to pull the real transaction up + Pool SubPool // Transaction subpool to pull the real transaction up Hash common.Hash // Transaction hash to pull up if needed Tx *types.Transaction // Transaction if already resolved Time time.Time // Time when the transaction was first seen GasFeeCap *big.Int // Maximum fee per gas the transaction may consume GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay - - Gas uint64 // Amount of gas required by the transaction - BlobGas uint64 // Amount of blob gas required by the transaction } // Resolve retrieves the full transaction belonging to a lazy handle if it is still @@ -51,14 +48,6 @@ func (ltx *LazyTransaction) Resolve() *types.Transaction { return ltx.Tx } -// LazyResolver is a minimal interface needed for a transaction pool to satisfy -// resolving lazy transactions. It's mostly a helper to avoid the entire sub- -// pool being injected into the lazy transaction. -type LazyResolver interface { - // Get returns a transaction if it is contained in the pool, or nil otherwise. - Get(hash common.Hash) *types.Transaction -} - // AddressReserver is passed by the main transaction pool to subpools, so they // may request (and relinquish) exclusive access to certain addresses. type AddressReserver func(addr common.Address, reserve bool) error @@ -110,10 +99,8 @@ type SubPool interface { // account and sorted by nonce. Pending(enforceTips bool) map[common.Address][]*LazyTransaction - // SubscribeTransactions subscribes to new transaction events. The subscriber - // can decide whether to receive notifications only for newly seen transactions - // or also for reorged out ones. - SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription + // SubscribeTransactions subscribes to new transaction events. + SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription // Nonce returns the next nonce of an account, with all transactions executable // by the pool already applied on top. diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 0d4e05da4c18..cacae7bc0079 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -155,15 +155,13 @@ func (p *TxPool) Close() error { if err := <-errc; err != nil { errs = append(errs, err) } + // Terminate each subpool for _, subpool := range p.subpools { if err := subpool.Close(); err != nil { errs = append(errs, err) } } - // Unsubscribe anyone still listening for tx events - p.subs.Close() - if len(errs) > 0 { return fmt.Errorf("subpool close errors: %v", errs) } @@ -318,12 +316,12 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction return txs } -// SubscribeTransactions registers a subscription for new transaction events, -// supporting feeding only newly seen or also resurrected transactions. -func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { +// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and starts sending +// events to the given channel. +func (p *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { subs := make([]event.Subscription, len(p.subpools)) for i, subpool := range p.subpools { - subs[i] = subpool.SubscribeTransactions(ch, reorgs) + subs[i] = subpool.SubscribeTransactions(ch) } return p.subs.Track(event.JoinSubscriptions(subs...)) } diff --git a/eth/api_backend.go b/eth/api_backend.go index 601e55515857..a0c14f1338da 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -334,7 +334,7 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool { } func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { - return b.eth.txPool.SubscribeTransactions(ch, true) + return b.eth.txPool.SubscribeNewTxsEvent(ch) } func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress { diff --git a/eth/catalyst/simulated_beacon.go b/eth/catalyst/simulated_beacon.go index a9a2bb4a9a7b..1f7a3266cd1a 100644 --- a/eth/catalyst/simulated_beacon.go +++ b/eth/catalyst/simulated_beacon.go @@ -199,7 +199,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal) error { func (c *SimulatedBeacon) loopOnDemand() { var ( newTxs = make(chan core.NewTxsEvent) - sub = c.eth.TxPool().SubscribeTransactions(newTxs, true) + sub = c.eth.TxPool().SubscribeNewTxsEvent(newTxs) ) defer sub.Unsubscribe() diff --git a/eth/handler.go b/eth/handler.go index 665df7d8cf85..f731efe1b8b1 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -75,10 +75,9 @@ type txPool interface { // The slice should be modifiable by the caller. Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction - // SubscribeTransactions subscribes to new transaction events. The subscriber - // can decide whether to receive notifications only for newly seen transactions - // or also for reorged out ones. - SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription + // SubscribeNewTxsEvent should return an event subscription of + // NewTxsEvent and send events to the given channel. + SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription } // handlerConfig is the collection of initialization parameters to create a full @@ -510,10 +509,10 @@ func (h *handler) unregisterPeer(id string) { func (h *handler) Start(maxPeers int) { h.maxPeers = maxPeers - // broadcast and announce transactions (only new ones, not resurrected ones) + // broadcast transactions h.wg.Add(1) h.txsCh = make(chan core.NewTxsEvent, txChanSize) - h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false) + h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh) go h.txBroadcastLoop() // broadcast mined blocks @@ -593,33 +592,26 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { } // BroadcastTransactions will propagate a batch of transactions -// - To a square root of all peers for non-blob transactions +// - To a square root of all peers // - And, separately, as announcements to all peers which are not known to // already have the given transaction. func (h *handler) BroadcastTransactions(txs types.Transactions) { var ( - blobTxs int // Number of blob transactions to announce only - largeTxs int // Number of large transactions to announce only - - directCount int // Number of transactions sent directly to peers (duplicates included) - directPeers int // Number of peers that were sent transactions directly - annCount int // Number of transactions announced across all peers (duplicates included) - annPeers int // Number of peers announced about transactions + annoCount int // Count of announcements made + annoPeers int + directCount int // Count of the txs sent directly to peers + directPeers int // Count of the peers that were sent transactions directly txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce + ) // Broadcast transactions to a batch of peers not knowing about it for _, tx := range txs { peers := h.peers.peersWithoutTransaction(tx.Hash()) var numDirect int - switch { - case tx.Type() == types.BlobTxType: - blobTxs++ - case tx.Size() > txMaxBroadcastSize: - largeTxs++ - default: + if tx.Size() <= txMaxBroadcastSize { numDirect = int(math.Sqrt(float64(len(peers)))) } // Send the tx unconditionally to a subset of our peers @@ -637,12 +629,13 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { peer.AsyncSendTransactions(hashes) } for peer, hashes := range annos { - annPeers++ - annCount += len(hashes) + annoPeers++ + annoCount += len(hashes) peer.AsyncSendPooledTransactionHashes(hashes) } - log.Debug("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs, - "bcastpeers", directPeers, "bcastcount", directCount, "annpeers", annPeers, "anncount", annCount) + log.Debug("Transaction broadcast", "txs", len(txs), + "announce packs", annoPeers, "announced hashes", annoCount, + "tx packs", directPeers, "broadcast txs", directCount) } // minedBroadcastLoop sends mined blocks to connected peers. diff --git a/eth/handler_eth.go b/eth/handler_eth.go index e844b36cca8b..3a0944640e49 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -17,7 +17,6 @@ package eth import ( - "errors" "fmt" "math/big" "time" @@ -74,11 +73,6 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { return h.txFetcher.Notify(peer.ID(), packet.Hashes) case *eth.TransactionsPacket: - for _, tx := range *packet { - if tx.Type() == types.BlobTxType { - return errors.New("disallowed broadcast blob transaction") - } - } return h.txFetcher.Enqueue(peer.ID(), *packet, false) case *eth.PooledTransactionsResponse: @@ -96,7 +90,9 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, // the chain already entered the pos stage and disconnect the // remote peer. if h.merger.PoSFinalized() { - return errors.New("disallowed block announcement") + // TODO (MariusVanDerWijden) drop non-updated peers after the merge + return nil + // return errors.New("unexpected block announces") } // Schedule all the unknown hashes for retrieval var ( @@ -122,7 +118,9 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td // the chain already entered the pos stage and disconnect the // remote peer. if h.merger.PoSFinalized() { - return errors.New("disallowed block broadcast") + // TODO (MariusVanDerWijden) drop non-updated peers after the merge + return nil + // return errors.New("unexpected block announces") } // Schedule the block for import h.blockFetcher.Enqueue(peer.ID(), block) diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index bb342acc18f7..4cdfdf47b808 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -249,7 +249,7 @@ func testRecvTransactions(t *testing.T, protocol uint) { handler.handler.synced.Store(true) // mark synced to accept transactions txs := make(chan core.NewTxsEvent) - sub := handler.txpool.SubscribeTransactions(txs, false) + sub := handler.txpool.SubscribeNewTxsEvent(txs) defer sub.Unsubscribe() // Create a source peer to send messages through and a sink handler to receive them @@ -424,7 +424,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) { for i := 0; i < len(sinks); i++ { txChs[i] = make(chan core.NewTxsEvent, 1024) - sub := sinks[i].txpool.SubscribeTransactions(txChs[i], false) + sub := sinks[i].txpool.SubscribeNewTxsEvent(txChs[i]) defer sub.Unsubscribe() } // Fill the source pool with transactions and wait for them at the sinks diff --git a/eth/handler_test.go b/eth/handler_test.go index 6d6132ee4ce8..2e0a988452b7 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -113,17 +113,15 @@ func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*txpool.Lazy Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap(), - Gas: tx.Gas(), - BlobGas: tx.BlobGas(), }) } } return pending } -// SubscribeTransactions should return an event subscription of NewTxsEvent and +// SubscribeNewTxsEvent should return an event subscription of NewTxsEvent and // send events to the given channel. -func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { +func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { return p.txFeed.Subscribe(ch) } diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 42d0412a127c..a7d6ed25a9c6 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -93,10 +93,6 @@ type TxPool interface { func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2p.Protocol { protocols := make([]p2p.Protocol, 0, len(ProtocolVersions)) for _, version := range ProtocolVersions { - // Blob transactions require eth/68 announcements, disable everything else - if version <= ETH67 && backend.Chain().Config().CancunTime != nil { - continue - } version := version // Closure protocols = append(protocols, p2p.Protocol{ diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 069e92dadf90..da4ffd327e71 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -426,11 +426,11 @@ func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error if err := msg.Decode(&query); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } - hashes, txs := answerGetPooledTransactions(backend, query.GetPooledTransactionsRequest) + hashes, txs := answerGetPooledTransactions(backend, query.GetPooledTransactionsRequest, peer) return peer.ReplyPooledTransactionsRLP(query.RequestId, hashes, txs) } -func answerGetPooledTransactions(backend Backend, query GetPooledTransactionsRequest) ([]common.Hash, []rlp.RawValue) { +func answerGetPooledTransactions(backend Backend, query GetPooledTransactionsRequest, peer *Peer) ([]common.Hash, []rlp.RawValue) { // Gather transactions until the fetch or network limits is reached var ( bytes int diff --git a/miner/ordering_test.go b/miner/ordering_test.go index 59d478274d74..bdbdc3214851 100644 --- a/miner/ordering_test.go +++ b/miner/ordering_test.go @@ -92,8 +92,6 @@ func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) { Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap(), - Gas: tx.Gas(), - BlobGas: tx.BlobGas(), }) } expectedCount += count @@ -159,8 +157,6 @@ func TestTransactionTimeSort(t *testing.T) { Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap(), - Gas: tx.Gas(), - BlobGas: tx.BlobGas(), }) } // Sort the transactions and cross check the nonce ordering diff --git a/miner/worker.go b/miner/worker.go index f68070281454..711149232ba1 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -263,8 +263,8 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), } - // Subscribe for transaction insertion events (whether from network or resurrects) - worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true) + // Subscribe NewTxsEvent for tx pool + worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) // Subscribe events for blockchain worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) @@ -542,14 +542,11 @@ func (w *worker) mainLoop() { for _, tx := range ev.Txs { acc, _ := types.Sender(w.current.signer, tx) txs[acc] = append(txs[acc], &txpool.LazyTransaction{ - Pool: w.eth.TxPool(), // We don't know where this came from, yolo resolve from everywhere Hash: tx.Hash(), - Tx: nil, // Do *not* set this! We need to resolve it later to pull blobs in + Tx: tx.WithoutBlobTxSidecar(), Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap(), - Gas: tx.Gas(), - BlobGas: tx.BlobGas(), }) } txset := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) @@ -745,6 +742,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]* if tx.Type() == types.BlobTxType { return w.commitBlobTransaction(env, tx) } + receipt, err := w.applyTransaction(env, tx) if err != nil { return nil, err @@ -766,6 +764,7 @@ func (w *worker) commitBlobTransaction(env *environment, tx *types.Transaction) if (env.blobs+len(sc.Blobs))*params.BlobTxBlobGasPerBlob > params.MaxBlobGasPerBlock { return nil, errors.New("max data blobs reached") } + receipt, err := w.applyTransaction(env, tx) if err != nil { return nil, err @@ -816,24 +815,13 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn if ltx == nil { break } - // If we don't have enough space for the next transaction, skip the account. - if env.gasPool.Gas() < ltx.Gas { - log.Trace("Not enough gas left for transaction", "hash", ltx.Hash, "left", env.gasPool.Gas(), "needed", ltx.Gas) - txs.Pop() - continue - } - if left := uint64(params.MaxBlobGasPerBlock - env.blobs*params.BlobTxBlobGasPerBlob); left < ltx.BlobGas { - log.Trace("Not enough blob gas left for transaction", "hash", ltx.Hash, "left", left, "needed", ltx.BlobGas) - txs.Pop() - continue - } - // Transaction seems to fit, pull it up from the pool tx := ltx.Resolve() if tx == nil { - log.Trace("Ignoring evicted transaction", "hash", ltx.Hash) + log.Warn("Ignoring evicted transaction") txs.Pop() continue } + // Error may be ignored here. The error has already been checked // during transaction acceptance is the transaction pool. from, _ := types.Sender(env.signer, tx) @@ -841,10 +829,11 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn // Check whether the tx is replay protected. If we're not in the EIP155 hf // phase, start ignoring the sender until we do. if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { - log.Trace("Ignoring replay protected transaction", "hash", ltx.Hash, "eip155", w.chainConfig.EIP155Block) + log.Trace("Ignoring replay protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) txs.Pop() continue } + // Start executing the transaction env.state.SetTxContext(tx.Hash(), env.tcount) @@ -852,7 +841,7 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn switch { case errors.Is(err, core.ErrNonceTooLow): // New head notification data race between the transaction pool and miner, shift - log.Trace("Skipping transaction with low nonce", "hash", ltx.Hash, "sender", from, "nonce", tx.Nonce()) + log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) txs.Shift() case errors.Is(err, nil): @@ -864,7 +853,7 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn default: // Transaction is regarded as invalid, drop all consecutive transactions from // the same sender because of `nonce-too-high` clause. - log.Debug("Transaction failed, account skipped", "hash", ltx.Hash, "err", err) + log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) txs.Pop() } }